Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/squirrel/database.py: 83%
532 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Database interface code.
8'''
10import sys
11import os
12import logging
13import sqlite3
14import re
15import time
16import threading
18from pyrocko.io.io_common import FileLoadError
19from pyrocko import util
20from pyrocko.guts import Object, Int, List, Dict, Tuple, String
21from . import error, io
22from .model import Nut, to_kind_id, to_kind, to_codes_simple, \
23 codes_patterns_for_kind
24from .error import SquirrelError
# Logger used by everything in this module.
logger = logging.getLogger('psq.database')

# NOTE(review): presumably the guts type name prefix for the Object classes
# declared in this module - confirm against pyrocko.guts.
guts_prefix = 'squirrel'

# When true, get_RLock() returns a lock which logs acquire/release calls.
RLOCK_DEBUG = False
def tid():
    '''
    Get the identifier of the calling thread.
    '''
    ident = threading.get_ident()
    return ident
def abspath(path):
    '''
    Make ``path`` absolute, leaving ``virtual:`` and ``client:`` paths as is.
    '''
    if path.startswith(('virtual:', 'client:')):
        return path

    return os.path.abspath(path)
def versiontuple(s):
    '''
    Convert a dotted version string into a tuple of at least three integers.

    Missing components are filled with zeros, e.g. ``'1.0'`` gives
    ``(1, 0, 0)``. Strings with more than three components are converted
    without truncation.
    '''
    numbers = [int(part) for part in s.split('.')]
    padding = [0] * max(0, 3 - len(numbers))
    return tuple(numbers + padding)
class ExecuteGet1Error(SquirrelError):
    '''
    Raised by :py:func:`execute_get1` when a query does not give exactly one
    row.
    '''
def execute_get1(connection, sql, args=()):
    '''
    Run a query which is expected to give exactly one row and return that row.

    :param connection: Object with an ``execute(sql, args)`` method yielding
        rows (connection or cursor).
    :param sql: SQL statement.
    :param args: Parameters bound to the statement.

    :raises: :py:exc:`ExecuteGet1Error` if the number of rows is not one.
    '''
    rows = list(connection.execute(sql, args))
    if len(rows) != 1:
        raise ExecuteGet1Error('Expected database entry not found.')

    return rows[0]
# Registry of open databases, keyed by absolute database path.
g_databases = {}


def get_database(path):
    '''
    Get the shared :py:class:`Database` instance for ``path``.

    The instance is created on first use and cached in ``g_databases``.
    '''
    key = os.path.abspath(path)
    database = g_databases.get(key)
    if database is None:
        database = g_databases[key] = Database(key)

    return database


def close_database(database):
    '''
    Close the connection of ``database`` and drop it from ``g_databases``.
    '''
    key = os.path.abspath(database._database_path)
    database._conn.close()
    g_databases.pop(key, None)
# Number of colors handed out so far by next_color().
icolor = 0

# ANSI escape sequences selecting foreground colors 31 to 37.
ansi_colors = ['\033[%im' % code for code in range(31, 38)]

# ANSI escape sequence resetting the foreground color.
ansi_reset = '\033[39m'


def next_color():
    '''
    Get the next entry of ``ansi_colors``, cycling through them round-robin.
    '''
    global icolor
    index = icolor % len(ansi_colors)
    icolor += 1
    return ansi_colors[index]
# Colors assigned to threads so far, keyed by thread identifier.
thread_colors = {}
process_colors = {}


def thread_color():
    '''
    Get the color of the calling thread, assigning a new one on first use.
    '''
    ident = tid()
    try:
        return thread_colors[ident]
    except KeyError:
        color = thread_colors[ident] = next_color()
        return color
def process_color():
    '''
    Get the color of the current process, selected by its process id.
    '''
    return ansi_colors[os.getpid() % len(ansi_colors)]
def color_tid_pid():
    '''
    Get a colored ``'tid: ..., pid: ...'`` string for use in log messages.
    '''
    thread_part = '%stid: %s%s' % (thread_color(), tid(), ansi_reset)
    process_part = '%spid: %s%s' % (process_color(), os.getpid(), ansi_reset)
    return ', '.join((thread_part, process_part))
def get_RLock():
    '''
    Create the reentrant lock used to serialize database access.

    If ``RLOCK_DEBUG`` is set, a lock is returned which logs explicit
    ``acquire()`` and ``release()`` calls at debug level, otherwise a plain
    :py:func:`threading.RLock`.
    '''
    if not RLOCK_DEBUG:
        return threading.RLock()

    # threading.RLock is a factory function, so get the class from an
    # instance.
    RLockBase = threading.RLock().__class__

    class RLockDebug(RLockBase):
        # Same signature as the base class, so that the debug lock can be
        # used wherever a regular RLock is accepted.
        def acquire(self, blocking=True, timeout=-1):
            logger.debug('Waiting for lock (%s).' % color_tid_pid())
            ret = RLockBase.acquire(self, blocking, timeout)
            if ret:
                logger.debug('Got lock (%s).' % color_tid_pid())

            return ret

        def release(self):
            logger.debug(
                'Releasing lock (%s).' % color_tid_pid())
            return RLockBase.release(self)

    return RLockDebug()


# Module-wide lock shared by all transactions and cursors.
LOCK = get_RLock()
class Transaction(object):
    '''
    Nestable database transaction guarded by the module-wide ``LOCK``.

    The SQLite transaction is started on the outermost :py:meth:`begin` and
    finished on the matching outermost :py:meth:`commit` or
    :py:meth:`rollback`. A rollback requested at an inner nesting level is
    deferred until the outermost level is left. Used as a context manager, the
    transaction is committed on normal exit and rolled back when an exception
    leaves the block; the value bound by ``with`` is the cursor.

    :param conn: Database connection, must provide ``cursor()``.
    :param label: Name of the transaction for log and error messages.
    :param mode: ``'deferred'``, ``'immediate'`` or ``'exclusive'``.
    :param retry_interval: Time [s] to wait before retrying when the database
        is locked.
    :param callback: If given, called as ``callback('modified', nchanges)``
        after a commit which changed the database.
    '''

    def __init__(
            self, conn,
            label='',
            mode='immediate',
            retry_interval=0.1,
            callback=None):

        self.cursor = conn.cursor()
        assert mode in ('deferred', 'immediate', 'exclusive')
        self.mode = mode
        self.depth = 0
        self.rollback_wanted = False
        self.retry_interval = retry_interval
        self.callback = callback
        self.label = label
        self.started = False
        self.closed = False
        # Value of connection.total_changes when the transaction started.
        self.total_changes_begin = None
        self.debug = logger.isEnabledFor(logging.DEBUG)

    def begin(self):
        '''
        Enter the transaction, starting it in SQLite at the outermost level.

        The lock acquired here is released by the matching :py:meth:`commit`
        or :py:meth:`rollback`.
        '''
        LOCK.acquire()
        try:
            if self.depth == 0:
                self._begin_outermost()

            self.depth += 1

        except BaseException:
            # There will be no matching commit() or rollback() to release the
            # lock when the transaction could not be entered.
            LOCK.release()
            raise

    def _begin_outermost(self):
        # Issue BEGIN, retrying for as long as the database is locked.
        tries = 0
        while True:
            try:
                tries += 1
                if self.debug:
                    logger.debug(
                        'Waiting for transaction: %-30s '
                        '(%s, mode: %s)',
                        self.label,
                        color_tid_pid(),
                        self.mode)

                self.cursor.execute('BEGIN %s' % self.mode.upper())
                self.started = True
                if self.debug:
                    logger.debug(
                        'Transaction started: %-30s '
                        '(%s, mode: %s)',
                        self.label,
                        color_tid_pid(),
                        self.mode)

                self.total_changes_begin \
                    = self.cursor.connection.total_changes
                break

            except sqlite3.OperationalError as e:
                if not str(e) == 'database is locked':
                    raise

                logger.info(
                    'Database is locked retrying in %s s: %s '
                    '(%s, tries: %i)',
                    self.retry_interval, self.label,
                    color_tid_pid(), tries)

                time.sleep(self.retry_interval)

    def commit(self):
        '''
        Leave one nesting level, committing at the outermost level.

        If a rollback has been requested at an inner level, the transaction is
        rolled back instead of committed.
        '''
        try:
            if not self.started:
                raise Exception(
                    'Trying to commit without having started a transaction '
                    '(%s, %s)' % (
                        self.label,
                        color_tid_pid()))

            self.depth -= 1
            if self.depth == 0:
                if not self.rollback_wanted:
                    self.cursor.execute('COMMIT')
                    self.started = False
                    if self.total_changes_begin is not None:
                        total_changes = self.cursor.connection.total_changes \
                            - self.total_changes_begin
                    else:
                        total_changes = None

                    if self.callback is not None and total_changes:
                        self.callback('modified', total_changes)

                    if self.debug:
                        logger.debug(
                            'Transaction completed: %-30s '
                            '(%s, changes: %i)',
                            self.label,
                            color_tid_pid(),
                            total_changes or 0)

                else:
                    self.cursor.execute('ROLLBACK')
                    self.started = False
                    logger.warning('Deferred rollback executed.')
                    if self.debug:
                        logger.debug(
                            'Transaction failed: %-30s (%s)',
                            self.label,
                            color_tid_pid())

                    self.rollback_wanted = False
        finally:
            LOCK.release()

    def rollback(self):
        '''
        Leave one nesting level, rolling back at the outermost level.

        At inner levels, the rollback is only scheduled and gets executed when
        the outermost level is left.
        '''
        try:
            if not self.started:
                raise Exception(
                    'Trying to rollback without having started a transaction.')

            self.depth -= 1
            if self.depth == 0:
                self.cursor.execute('ROLLBACK')
                self.started = False

                if self.debug:
                    logger.debug(
                        'Transaction failed: %-30s (pid: %s)',
                        self.label,
                        os.getpid())

                self.rollback_wanted = False
            else:
                logger.warning('Deferred rollback scheduled.')
                self.rollback_wanted = True

        finally:
            LOCK.release()

    def close(self):
        '''
        Close the cursor and mark the transaction object as closed.
        '''
        self.cursor.close()
        self.closed = True

    def __enter__(self):
        LOCK.acquire()
        try:
            self.begin()
        except BaseException:
            # __exit__ is not called when __enter__ fails.
            LOCK.release()
            raise

        return self.cursor

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            if exc_type is None:
                self.commit()
            else:
                self.rollback()

            if self.depth == 0:
                self.close()
                self.callback = None

        finally:
            # Release the lock taken in __enter__, also if finishing the
            # transaction failed.
            LOCK.release()
class xlist(list):
    '''
    List which can carry extra attributes, like the ``rowcount`` of a query.
    '''
class Cursor(sqlite3.Cursor):
    '''
    Cursor which runs its statements while holding the module-wide ``LOCK``.
    '''

    def execute(self, *args, **kwargs):
        '''
        Execute statement under lock and return all result rows.

        The rows are fetched while the lock is held and returned as an
        :py:class:`xlist` with the cursor's ``rowcount`` attached.
        '''
        with LOCK:
            result = super().execute(*args, **kwargs)
            rows = xlist(result)
            rows.rowcount = result.rowcount

        return rows

    def execute_nolock(self, *args, **kwargs):
        '''
        Execute statement without taking the lock, like a plain cursor.
        '''
        return super().execute(*args, **kwargs)

    def executemany(self, *args, **kwargs):
        '''
        Locked variant of :py:meth:`sqlite3.Cursor.executemany`.
        '''
        with LOCK:
            result = super().executemany(*args, **kwargs)

        return result

    def executescript(self, *args, **kwargs):
        '''
        Locked variant of :py:meth:`sqlite3.Cursor.executescript`.
        '''
        with LOCK:
            result = super().executescript(*args, **kwargs)

        return result
class Connection(sqlite3.Connection):
    '''
    Connection creating :py:class:`Cursor` objects and supporting close
    handlers.
    '''

    def cursor(self, factory=Cursor):
        return factory(self)

    def execute(self, *args, **kwargs):
        return self.cursor().execute(*args, **kwargs)

    def execute_nolock(self, *args, **kwargs):
        return self.cursor().execute_nolock(*args, **kwargs)

    def executemany(self, *args, **kwargs):
        return self.cursor().executemany(*args, **kwargs)

    def executescript(self, *args, **kwargs):
        return self.cursor().executescript(*args, **kwargs)

    def close(self):
        '''
        Call the registered close handlers, then close the connection.
        '''
        for handler in getattr(self, '_close_handlers', ()):
            handler()

        super().close()

    def on_close(self, handler):
        '''
        Register a callable to be invoked, without arguments, on close.
        '''
        try:
            handlers = self._close_handlers
        except AttributeError:
            handlers = self._close_handlers = []

        handlers.append(handler)
class Database(object):
    '''
    Shared meta-information database used by Squirrel.

    :param database_path: Path of the SQLite database file or ``':memory:'``.
    :param log_statements: If true, log every executed SQL statement at debug
        level.

    After initialization, attribute ``version`` holds the version of the
    database format as a tuple of integers.
    '''

    def __init__(self, database_path=':memory:', log_statements=False):
        # Plain attributes are set up first, so that close() and __del__()
        # also work when connecting or initializing fails below.
        self._database_path = database_path
        self._selections = []
        self._tables = {}
        self._transaction = {}
        self._listeners = []
        self._basepath = None
        self.version = None

        if database_path != ':memory:':
            util.ensuredirs(database_path)

        try:
            logger.debug(
                'Opening connection to database (threadsafety: %i): %s',
                sqlite3.threadsafety,
                database_path)

            self._conn = sqlite3.connect(
                database_path,
                isolation_level=None,
                factory=Connection,
                check_same_thread=sqlite3.threadsafety != 3)

        except sqlite3.OperationalError as e:
            raise error.SquirrelError(
                'Cannot connect to database: %s' % database_path) from e

        self._conn.on_close(self.close)

        self._conn.text_factory = str

        if log_statements:
            self._conn.set_trace_callback(self._log_statement)

        # Sets self.version, so it must come after the defaults above.
        self._initialize_db()

    def __del__(self):
        self.close()

    def close(self):
        '''
        Close all selections attached to this database.
        '''
        for selection in self._selections[:]:
            selection.close()

        self._selections[:] = []

    def add_selection(self, selection):
        self._selections.append(selection)

    def remove_selection(self, selection):
        self._selections.remove(selection)

    def set_basepath(self, basepath):
        '''
        Set or unset (``None``) the directory used by :py:meth:`relpath` and
        :py:meth:`abspath`.
        '''
        if basepath is not None:
            self._basepath = os.path.abspath(basepath)
        else:
            self._basepath = None

    def relpath(self, path):
        '''
        Strip the base path from ``path`` if ``path`` is located below it.
        '''
        if self._basepath is not None and path.startswith(
                self._basepath + os.path.sep):
            return path[len(self._basepath) + 1:]
        else:
            return path

    def abspath(self, path):
        '''
        Prepend the base path to relative paths.

        Absolute paths and ``virtual:`` or ``client:`` paths are returned
        unchanged, as is any path when no base path is set.
        '''
        if self._basepath is not None \
                and not path.startswith(('virtual:', 'client:')) \
                and not os.path.isabs(path):
            return os.path.join(self._basepath, path)
        else:
            return path

    def _log_statement(self, statement):
        logger.debug(statement)

    def get_connection(self):
        return self._conn

    def transaction(self, label='', mode='immediate'):
        '''
        Create a new :py:class:`Transaction` for the calling thread.

        :raises: :py:exc:`~pyrocko.squirrel.error.SquirrelError` if the
            previous transaction of the calling thread has not been closed.
        '''
        current_transaction = self._transaction.get(tid())
        if current_transaction and not current_transaction.closed:
            # Only one transaction can operate on the db at a time.
            # Synchronized in Transaction.begin and the Transaction context
            # manager, therefore we allow one Transaction object per thread.
            # The following exception is only to catch programming errors.
            raise error.SquirrelError(
                'Cannot start transaction "%s". A transaction "%s" is already '
                'in progress.' % (label, current_transaction.label))

        transaction = self._transaction[tid()] = Transaction(
            self._conn,
            label=label,
            mode=mode,
            callback=self._notify_listeners)

        return transaction

    def add_listener(self, listener):
        '''
        Register a callable to be notified about database events.

        Only a weak reference to the listener is kept. The reference is
        returned and can be passed to :py:meth:`remove_listener`.
        '''
        listener_ref = util.smart_weakref(listener)
        self._listeners.append(listener_ref)
        return listener_ref

    def remove_listener(self, listener_ref):
        self._listeners.remove(listener_ref)

    def _notify_listeners(self, event, *args):
        # Call listeners which are still alive, forget about the others.
        dead = []
        for listener_ref in self._listeners:
            listener = listener_ref()
            if listener is not None:
                listener(event, *args)
            else:
                dead.append(listener_ref)

        for listener_ref in dead:
            self.remove_listener(listener_ref)

    def _register_table(self, s):
        # Extract table name and (column name, column type) pairs from a
        # CREATE TABLE statement, for use in print_table(). The statement is
        # returned unchanged.
        m = re.search(r'(\S+)\s*\(([^)]+)\)', s)
        table_name = m.group(1)
        dtypes = m.group(2)
        self._tables[table_name] = [
            dele.split()[:2] for dele in dtypes.split(',')]

        return s

    def optimize(self):
        logger.info('Optimizing database...')
        with self.transaction('optimize') as cursor:
            cursor.execute('''PRAGMA optimize''')
        logger.info('Done optimizing database.')

    def activate_wal_mode(self):
        '''
        Switch the journal mode of the database to WAL, if not already active.

        :raises: :py:exc:`~pyrocko.squirrel.error.SquirrelError` if the mode
            could not be activated.
        '''
        result = self.get_connection().execute(
            '''PRAGMA journal_mode''')[0][0]

        if result == 'wal':
            logger.info('WAL mode already active.')
            return

        logger.info('Activating WAL mode on database.')
        result = self.get_connection().execute(
            '''PRAGMA journal_mode=WAL''')[0][0]
        if result != 'wal':
            raise error.SquirrelError(
                'Could not activate WAL mode. Result from '
                '"PRAGMA journal_mode=WAL" was "%s".' % result)

    def _initialize_db(self):
        # Single-quoted 'version': double-quoted string literals are a
        # compatibility quirk which can be disabled in SQLite builds.
        sql_version = "SELECT value FROM settings WHERE key = 'version'"

        with self.transaction('initialize') as cursor:
            cursor.execute(
                '''PRAGMA recursive_triggers = true''')

            cursor.execute(
                '''PRAGMA busy_timeout = 30000''')

            cursor.execute(
                '''PRAGMA cache_size = -10000''')

            existing = list(cursor.execute(
                '''
                    SELECT name FROM sqlite_master
                        WHERE type = 'table' AND name IN (
                            'files',
                            'persistent')
                '''))

            if len(existing) == 2:
                # Database has been initialized before, only check version.
                try:
                    self.version = versiontuple(
                        execute_get1(cursor, sql_version)[0])

                except sqlite3.OperationalError:
                    raise error.SquirrelError(
                        'Squirrel database in pre-release format found: %s\n'
                        'Please remove the database file and reindex.'
                        % self._database_path)

                if self.version >= (1, 1, 0):
                    raise error.SquirrelError(
                        'Squirrel database "%s" is of version %i.%i.%i which '
                        'is not supported by this version of Pyrocko. Please '
                        'upgrade the Pyrocko library.'
                        % ((self._database_path, ) + self.version[:3]))

                return

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS settings (
                        key text PRIMARY KEY,
                        value text)
                '''))

            cursor.execute(
                "INSERT OR IGNORE INTO settings VALUES ('version', '1.0')")

            self.version = versiontuple(
                execute_get1(cursor, sql_version)[0])

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS files (
                        file_id integer PRIMARY KEY,
                        path text,
                        format text,
                        mtime float,
                        size integer)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_files_path
                    ON files (path)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS nuts (
                        nut_id integer PRIMARY KEY AUTOINCREMENT,
                        file_id integer,
                        file_segment integer,
                        file_element integer,
                        kind_id integer,
                        kind_codes_id integer,
                        tmin_seconds integer,
                        tmin_offset integer,
                        tmax_seconds integer,
                        tmax_offset integer,
                        kscale integer)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_nuts_file_element
                    ON nuts (file_id, file_segment, file_element)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS kind_codes (
                        kind_codes_id integer PRIMARY KEY,
                        kind_id integer,
                        codes text,
                        deltat float)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_kind_codes
                    ON kind_codes (kind_id, codes, deltat)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS kind_codes_count (
                        kind_codes_id integer PRIMARY KEY,
                        count integer)
                '''))

            cursor.execute(
                '''
                    CREATE INDEX IF NOT EXISTS index_nuts_file_id
                    ON nuts (file_id)
                ''')

            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS delete_nuts_on_delete_file
                    BEFORE DELETE ON files FOR EACH ROW
                    BEGIN
                      DELETE FROM nuts where file_id = old.file_id;
                    END
                ''')

            # trigger only on size to make silent update of mtime possible
            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS delete_nuts_on_update_file
                    BEFORE UPDATE OF size ON files FOR EACH ROW
                    BEGIN
                      DELETE FROM nuts where file_id = old.file_id;
                    END
                ''')

            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS increment_kind_codes
                    BEFORE INSERT ON nuts FOR EACH ROW
                    BEGIN
                        INSERT OR IGNORE INTO kind_codes_count
                        VALUES (new.kind_codes_id, 0);
                        UPDATE kind_codes_count
                        SET count = count + 1
                        WHERE new.kind_codes_id = kind_codes_id;
                    END
                ''')

            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS decrement_kind_codes
                    BEFORE DELETE ON nuts FOR EACH ROW
                    BEGIN
                        UPDATE kind_codes_count
                        SET count = count - 1
                        WHERE old.kind_codes_id = kind_codes_id;
                    END
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS persistent (
                        name text UNIQUE)
                '''))

    def dig(self, nuts, transaction=None):
        '''
        Store or update content meta-information.

        Given ``nuts`` are assumed to represent an up-to-date and complete
        inventory of a set of files. Any old information about these files is
        first pruned from the database (via database triggers). If such content
        is part of a live selection, it is also removed there. Then the new
        content meta-information is inserted into the main database. The
        content is not automatically inserted into the live selections again.
        It is in the responsibility of the selection object to perform this
        step.
        '''

        nuts = list(nuts)

        if not nuts:
            return

        files = set()
        kind_codes = set()
        for nut in nuts:
            files.add((
                self.relpath(nut.file_path),
                nut.file_format,
                nut.file_mtime,
                nut.file_size))
            kind_codes.add(
                (nut.kind_id, nut.codes.safe_str, nut.deltat or 0.0))

        with (transaction or self.transaction('dig')) as c:

            c.executemany(
                'INSERT OR IGNORE INTO files VALUES (NULL,?,?,?,?)', files)

            c.executemany(
                '''UPDATE files SET
                    format = ?, mtime = ?, size = ?
                    WHERE path = ?
                ''',
                ((x[1], x[2], x[3], x[0]) for x in files))

            c.executemany(
                'INSERT OR IGNORE INTO kind_codes VALUES (NULL,?,?,?)',
                kind_codes)

            c.executemany(
                '''
                    INSERT INTO nuts VALUES
                        (NULL, (
                            SELECT file_id FROM files
                            WHERE path = ?
                         ),?,?,?,
                         (
                            SELECT kind_codes_id FROM kind_codes
                            WHERE kind_id = ? AND codes = ? AND deltat = ?
                         ), ?,?,?,?,?)
                ''',
                ((self.relpath(nut.file_path),
                  nut.file_segment, nut.file_element,
                  nut.kind_id,
                  nut.kind_id, nut.codes.safe_str, nut.deltat or 0.0,
                  nut.tmin_seconds, nut.tmin_offset,
                  nut.tmax_seconds, nut.tmax_offset,
                  nut.kscale) for nut in nuts))

    def undig(self, path, segment=None):
        '''
        Get the nuts stored for file ``path``.

        :param segment: If given, only nuts of this file segment are returned.
        :returns: List of :py:class:`~pyrocko.squirrel.model.Nut` objects.
        '''

        path = self.relpath(abspath(path))

        sql = '''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                nuts.file_segment,
                nuts.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                nuts.tmin_seconds,
                nuts.tmin_offset,
                nuts.tmax_seconds,
                nuts.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN nuts ON files.file_id = nuts.file_id
            INNER JOIN kind_codes
                ON nuts.kind_codes_id = kind_codes.kind_codes_id
            WHERE files.path = ?
        '''
        args = [path]
        if segment is not None:
            sql += ' AND nuts.file_segment = ?'
            args.append(segment)

        return [
            Nut(values_nocheck=(self.abspath(row[0]),) + row[1:])
            for row in self._conn.execute(sql, args)]

    def undig_all(self):
        '''
        Iterate over all nuts in the database, grouped by file.

        Yields tuples ``(path, nuts)``, where ``path`` is the absolute file
        path. Consecutive rows belonging to the same file form one group.
        '''
        sql = '''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                nuts.file_segment,
                nuts.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                nuts.tmin_seconds,
                nuts.tmin_offset,
                nuts.tmax_seconds,
                nuts.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN nuts ON files.file_id = nuts.file_id
            INNER JOIN kind_codes
                ON nuts.kind_codes_id = kind_codes.kind_codes_id
        '''

        nuts = []
        stored_path = None
        path = None
        for values in self._conn.execute(sql):
            # Group boundaries must be detected on the path as stored in the
            # database: the absolute path differs from it when a base path is
            # set.
            if stored_path is not None and values[0] != stored_path:
                yield path, nuts
                nuts = []

            stored_path = values[0]
            path = self.abspath(stored_path)

            if values[1] is not None:
                nuts.append(Nut(values_nocheck=(path,) + values[1:]))

        if path is not None:
            yield path, nuts

    def undig_few(self, paths, format='detect', segment=None):
        '''
        Iterate over the nuts of the given files, using one query per file.

        Yields tuples ``((format, path), nuts)``. For files without nuts in
        the database, the given ``format`` and an empty list are yielded.
        '''
        for path in paths:
            nuts = self.undig(path, segment=segment)
            if nuts:
                yield (nuts[0].file_format, path), nuts
            else:
                yield (format, path), []

    def undig_many(self, paths, show_progress=True):
        '''
        Iterate over the nuts of the given files, using a temporary selection.

        Yields tuples ``(path, nuts)``.
        '''
        selection = self.new_selection(paths, show_progress=show_progress)

        for (_, path), nuts in selection.undig_grouped():
            yield path, nuts

        del selection

    def new_selection(self, paths=None, format='detect', show_progress=True):
        '''
        Create a new selection on this database, optionally adding ``paths``.
        '''
        from .selection import Selection
        selection = Selection(self)
        if paths:
            selection.add(paths, format=format, show_progress=show_progress)
        return selection

    def undig_content(self, nut):
        return None

    def remove(self, path):
        '''
        Prune content meta-information about a given file.

        All content pieces belonging to file ``path`` are removed from the
        main database and any attached live selections (via database triggers).
        '''

        path = self.relpath(abspath(path))

        with self.transaction('remove file') as cursor:
            cursor.execute(
                'DELETE FROM files WHERE path = ?', (path,))

    def remove_glob(self, pattern):
        '''
        Prune content meta-information about files matching given pattern.

        All content pieces belonging to files whose paths match the given
        ``pattern`` are removed from the main database and any attached live
        selections (via database triggers).

        :returns: Number of deleted file entries.
        '''

        with self.transaction('remove file glob') as cursor:
            return cursor.execute(
                'DELETE FROM files WHERE path GLOB ?', (pattern,)).rowcount

    def _remove_volatile(self):
        '''
        Prune leftover volatile content from database.

        If the cleanup handler of an attached selection is not called, e.g. due
        to a crash or terminated process, volatile content will not be removed
        properly. This method will delete such leftover entries.

        This is a maintenance operation which should only be called when no
        apps are using the database because it would remove volatile content
        currently used by the apps.
        '''

        with self.transaction('remove volatile') as cursor:
            return cursor.execute(
                '''
                    DELETE FROM files
                    WHERE path LIKE 'virtual:volatile:%'
                ''').rowcount

    def reset(self, path, transaction=None):
        '''
        Prune information associated with a given file, but keep the file path.

        This method is called when reading a file failed. File attributes,
        format, size and modification time are set to NULL. File content
        meta-information is removed from the database and any attached live
        selections (via database triggers).
        '''

        path = self.relpath(abspath(path))

        with (transaction or self.transaction('reset file')) as cursor:
            cursor.execute(
                '''
                    UPDATE files SET
                        format = NULL,
                        mtime = NULL,
                        size = NULL
                    WHERE path = ?
                ''', (path,))

    def silent_touch(self, path):
        '''
        Update modification time of file without initiating reindexing.

        Useful to prolong validity period of data with expiration date.
        '''

        apath = abspath(path)
        path = self.relpath(apath)

        with self.transaction('silent touch') as cursor:

            sql = 'SELECT format, size FROM files WHERE path = ?'
            fmt, size = execute_get1(cursor, sql, (path,))

            mod = io.get_backend(fmt)
            mod.touch(apath)
            file_stats = mod.get_stats(apath)

            if file_stats[1] != size:
                raise FileLoadError(
                    'Silent update for file "%s" failed: size has changed.'
                    % apath)

            sql = '''
                UPDATE files
                SET mtime = ?
                WHERE path = ?
            '''
            cursor.execute(sql, (file_stats[0], path))

    def _iter_codes_info(
            self, kind=None, codes=None, kind_codes_count='kind_codes_count'):

        # Yields (kind_id, codes, deltat, kind_codes_id, count) for all
        # kind/codes combinations with a non-zero count in the given count
        # table, optionally restricted by kind and codes patterns.
        args = []
        sel = ''
        kind_id = None
        if kind is not None:
            kind_id = to_kind_id(kind)

            sel = 'AND kind_codes.kind_id = ?'
            args.append(kind_id)

        if codes is not None:
            assert kind is not None  # TODO supp by recursing possible kinds
            pats = codes_patterns_for_kind(kind_id, codes)

            if pats:
                # could optimize this by using IN for non-patterns
                sel += ' AND ( %s ) ' % ' OR '.join(
                    ('kind_codes.codes GLOB ?',) * len(pats))

                args.extend(pat.safe_str for pat in pats)

        sql = ('''
            SELECT
                kind_codes.kind_id,
                kind_codes.codes,
                kind_codes.deltat,
                kind_codes.kind_codes_id,
                %(kind_codes_count)s.count
            FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    = kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
        ''') % {'kind_codes_count': kind_codes_count}

        for row_kind_id, scodes, deltat, kcid, count in self._conn.execute(
                sql, args):

            yield (
                row_kind_id, to_codes_simple(row_kind_id, scodes), deltat,
                kcid, count)

    def _iter_deltats(self, kind=None, kind_codes_count='kind_codes_count'):
        # Yields the distinct deltat values with a non-zero count, sorted.
        args = []
        sel = ''
        if kind is not None:
            assert isinstance(kind, str)
            sel = 'AND kind_codes.kind_id = ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT DISTINCT kind_codes.deltat FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    = kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.deltat
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield row[0]

    def _iter_codes(self, kind=None, kind_codes_count='kind_codes_count'):
        # Yields the distinct codes with a non-zero count, sorted by codes.
        args = []
        sel = ''
        if kind is not None:
            assert isinstance(kind, str)
            sel = 'AND kind_codes.kind_id = ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT DISTINCT kind_codes.kind_id, kind_codes.codes
                FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    = kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.codes

        ''') % dict(kind_codes_count=kind_codes_count)

        for row in self._conn.execute(sql, args):
            yield to_codes_simple(*row)

    def _iter_kinds(self, codes=None, kind_codes_count='kind_codes_count'):
        # Yields the distinct content kinds with a non-zero count.
        args = []
        sel = ''
        if codes is not None:
            sel = 'AND kind_codes.codes = ?'
            args.append(codes.safe_str)

        sql = ('''
            SELECT DISTINCT kind_codes.kind_id FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    = kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.kind_id
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield to_kind(row[0])

    def iter_paths(self):
        '''
        Iterate over the (absolute) paths of all files in the database.
        '''
        for row in self._conn.execute('''SELECT path FROM files'''):
            yield self.abspath(row[0])

    def iter_nnuts_by_file(self):
        '''
        Iterate over ``(path, nnuts)`` for all files in the database.
        '''
        sql = '''
            SELECT
                path,
                (SELECT COUNT(*) FROM nuts WHERE nuts.file_id = files.file_id)
            FROM files
        '''
        for row in self._conn.execute(sql):
            yield (self.abspath(row[0]),) + row[1:]

    def iter_kinds(self, codes=None):
        return self._iter_kinds(codes=codes)

    def iter_codes(self, kind=None):
        return self._iter_codes(kind=kind)

    def get_paths(self):
        return list(self.iter_paths())

    def get_kinds(self, codes=None):
        return list(self.iter_kinds(codes=codes))

    def get_codes(self, kind=None):
        return list(self.iter_codes(kind=kind))

    def get_counts(self, kind=None):
        '''
        Get number of nuts by content kind and codes.

        :returns: If ``kind`` is given, a dict mapping codes to counts (empty
            if there is no content of that kind), otherwise a dict of such
            dicts, keyed by content kind.
        '''
        d = {}
        for kind_id, codes, _, _, count in self._iter_codes_info(kind=kind):
            v = d.setdefault(kind_id, {})
            v[codes] = v.get(codes, 0) + count

        if kind is not None:
            return d.get(to_kind_id(kind), {})
        else:
            return dict((to_kind(kind_id), v) for (kind_id, v) in d.items())

    def get_nfiles(self):
        sql = '''SELECT COUNT(*) FROM files'''
        for row in self._conn.execute(sql):
            return row[0]

    def get_nnuts(self):
        sql = '''SELECT COUNT(*) FROM nuts'''
        for row in self._conn.execute(sql):
            return row[0]

    def get_nnuts_by_file(self):
        return list(self.iter_nnuts_by_file())

    def get_total_size(self):
        '''
        Get the sum of the sizes of all files in the database (0 if empty).
        '''
        sql = '''
            SELECT SUM(files.size) FROM files
        '''

        for row in self._conn.execute(sql):
            return row[0] or 0

    def get_persistent_names(self):
        sql = '''
            SELECT name FROM persistent
        '''
        return [row[0] for row in self._conn.execute(sql)]

    def vacuum(self):

        logger.info('Vacuuming database...')
        sql = '''
            VACUUM
        '''
        self._conn.execute(sql)
        logger.info('Done vacuuming database.')

    def get_stats(self):
        '''
        Get statistics about the database contents as
        :py:class:`DatabaseStats`.
        '''
        return DatabaseStats(
            nfiles=self.get_nfiles(),
            nnuts=self.get_nnuts(),
            kinds=self.get_kinds(),
            codes=self.get_codes(),
            counts=self.get_counts(),
            total_size=self.get_total_size(),
            persistent=self.get_persistent_names())

    def __str__(self):
        return str(self.get_stats())

    def print_tables(self, stream=None):
        '''
        Dump the main database tables to ``stream`` (default: stdout).
        '''
        for table in [
                'persistent',
                'files',
                'nuts',
                'kind_codes',
                'kind_codes_count']:

            self.print_table(table, stream=stream)

    def print_table(self, name, stream=None):
        '''
        Dump table ``name`` to ``stream`` (default: stdout).

        ``name`` is inserted verbatim into the SQL statement, so it must be a
        trusted table name.
        '''

        if stream is None:
            stream = sys.stdout

        # String which is shown without quotes by repr(), for header cells.
        class hstr(str):
            def __repr__(self):
                return self

        w = stream.write
        w('\n')
        w('\n')
        w(name)
        w('\n')
        sql = 'SELECT * FROM %s' % name
        tab = []
        if name in self._tables:
            # Rows of None are rendered as horizontal rules.
            headers = self._tables[name]
            tab.append([None for _ in headers])
            tab.append([hstr(x[0]) for x in headers])
            tab.append([hstr(x[1]) for x in headers])
            tab.append([None for _ in headers])

        for row in self._conn.execute(sql):
            tab.append([x for x in row])

        widths = [
            max((len(repr(x)) if x is not None else 0) for x in col)
            for col in zip(*tab)]

        for row in tab:
            w(' '.join(
                (repr(x).ljust(wid) if x is not None else ''.ljust(wid, '-'))
                for (x, wid) in zip(row, widths)))

            w('\n')

        w('\n')
class DatabaseStats(Object):
    '''
    Container to hold statistics about contents cached in meta-information db.
    '''

    nfiles = Int.T(
        help='Number of files in database.')
    nnuts = Int.T(
        help='Number of index nuts in database.')
    codes = List.T(
        Tuple.T(content_t=String.T()),
        help='Available code sequences in database, e.g. '
             '(agency, network, station, location) for stations nuts.')
    kinds = List.T(
        String.T(),
        help='Available content types in database.')
    total_size = Int.T(
        help='Aggregated file size [bytes] of files referenced in database.')
    counts = Dict.T(
        String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()),
        help='Breakdown of how many nuts of any content type and code '
             'sequence are available in database, ``counts[kind][codes]``.')
    persistent = List.T(
        String.T(),
        help='Names of persistent selections stored in database.')

    def __str__(self):
        # Total number of nuts per content kind.
        kind_counts = {
            kind: sum(self.counts[kind].values()) for kind in self.kinds}

        codes = [c.safe_str for c in self.codes]

        if len(codes) > 20:
            # Long listings are abbreviated to the first and last ten codes.
            head = util.ewrap(codes[:10], indent=' ')
            tail = util.ewrap(codes[-10:], indent=' ')
            scodes = '\n' + head \
                + '\n [%i more]\n' % (len(codes) - 20) \
                + tail
        elif codes:
            scodes = '\n' + util.ewrap(codes, indent=' ')
        else:
            scodes = '<none>'

        skinds = ', '.join(
            '%s: %i' % (kind, kind_counts[kind])
            for kind in sorted(self.kinds))

        s = '''
Available codes: %s
Number of files: %i
Total size of known files: %s
Number of index nuts: %i
Available content kinds: %s
Persistent selections: %s''' % (
            scodes,
            self.nfiles,
            util.human_bytesize(self.total_size),
            self.nnuts,
            skinds,
            ', '.join(self.persistent))

        return s
# Names exported by ``from ... import *``.
__all__ = ['Database', 'DatabaseStats']