1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import sys
9import os
10import logging
11import sqlite3
12import re
13import time
15from pyrocko.io.io_common import FileLoadError
16from pyrocko import util
17from pyrocko.guts import Object, Int, List, Dict, Tuple, String
18from . import error, io
19from .model import Nut, to_kind_id, to_kind, separator
20from .error import SquirrelError
# Module-level logger used by the database layer in this file.
logger = logging.getLogger('psq.database')

# NOTE(review): presumably picked up by pyrocko.guts as the type-name prefix
# for the Object subclasses declared in this module - confirm.
guts_prefix = 'squirrel'
def abspath(path):
    '''
    Get absolute version of a path, leaving special paths untouched.

    :param path: file system path or a pseudo-path starting with
        ``'virtual:'`` or ``'client:'``.
    :returns: ``path`` unchanged if it carries one of the two special
        prefixes, otherwise the result of :py:func:`os.path.abspath`.
    '''
    # Pseudo-paths do not refer to the file system and must not be resolved
    # against the current working directory.
    if path.startswith(('virtual:', 'client:')):
        return path

    return os.path.abspath(path)
class ExecuteGet1Error(SquirrelError):
    '''
    Raised by :py:func:`execute_get1` when a query does not yield exactly one
    row.
    '''
def execute_get1(connection, sql, args):
    '''
    Run a query which is expected to produce exactly one row.

    :param connection: object providing ``execute(sql, args)`` returning an
        iterable of rows (a connection or cursor).
    :param sql: SQL statement to execute.
    :param args: parameters bound to the placeholders in ``sql``.
    :returns: the single result row.
    :raises ExecuteGet1Error: if the query yields no row or more than one
        row.
    '''
    matches = list(connection.execute(sql, args))
    if len(matches) != 1:
        raise ExecuteGet1Error('Expected database entry not found.')

    return matches[0]
# Registry of open databases, keyed by absolute database file path.
g_databases = {}


def get_database(path=None):
    '''
    Get the shared :py:class:`Database` instance for a database file.

    One instance is kept per absolute path in the module-level registry
    ``g_databases``; it is created on first request and returned on every
    subsequent one.

    :param path: path of the database file. Although the default is
        ``None``, a path must be given: the value is passed straight to
        :py:func:`os.path.abspath`, which rejects ``None``.
    :returns: the :py:class:`Database` registered for ``path``.
    '''
    key = os.path.abspath(path)
    database = g_databases.get(key)
    if database is None:
        database = g_databases[key] = Database(key)

    return database
class Transaction(object):
    '''
    Nestable SQLite transaction, usable as a context manager.

    Only the outermost :py:meth:`begin` / :py:meth:`commit` pair talks to the
    database. A :py:meth:`rollback` requested at an inner nesting level is
    remembered and carried out when the outermost level is left.

    :param conn: database connection; a dedicated cursor is obtained from it.
    :param mode: ``'deferred'``, ``'immediate'`` or ``'exclusive'``, used in
        the ``BEGIN`` statement.
    :param retry_interval: seconds to wait between attempts to begin the
        transaction while the database is locked.
    '''

    def __init__(self, conn, mode='immediate', retry_interval=0.1):
        self.cursor = conn.cursor()
        assert mode in ('deferred', 'immediate', 'exclusive')
        self.mode = mode
        self.depth = 0
        self.rollback_wanted = False
        self.retry_interval = retry_interval

    def begin(self):
        '''
        Enter one nesting level, starting the transaction at the outermost.

        While SQLite reports ``'database is locked'``, the ``BEGIN`` is
        retried after sleeping ``retry_interval`` seconds; the number of
        retries is not limited. Any other operational error is re-raised.
        '''
        if self.depth == 0:
            statement = 'BEGIN %s' % self.mode.upper()
            attempt = 0
            started = False
            while not started:
                attempt += 1
                try:
                    self.cursor.execute(statement)
                    started = True

                except sqlite3.OperationalError as exc:
                    if str(exc) != 'database is locked':
                        raise

                    logger.info(
                        'Database is locked retrying in %s s. Tries: %i'
                        % (self.retry_interval, attempt))

                    time.sleep(self.retry_interval)

        self.depth += 1

    def commit(self):
        '''
        Leave one nesting level; finish the transaction at the outermost.

        At the outermost level ``COMMIT`` is issued, unless a rollback was
        requested at an inner level, in which case ``ROLLBACK`` is issued
        instead.
        '''
        self.depth -= 1
        if self.depth != 0:
            return

        if self.rollback_wanted:
            self.cursor.execute('ROLLBACK')
            logger.warning('Deferred rollback executed.')
            self.rollback_wanted = False
        else:
            self.cursor.execute('COMMIT')

    def rollback(self):
        '''
        Leave one nesting level, discarding the changes of the transaction.

        At the outermost level ``ROLLBACK`` is issued immediately. At inner
        levels the rollback is only scheduled and performed by the outermost
        :py:meth:`commit` or :py:meth:`rollback`.
        '''
        self.depth -= 1
        outermost = self.depth == 0
        if outermost:
            self.cursor.execute('ROLLBACK')
        else:
            logger.warning('Deferred rollback scheduled.')

        self.rollback_wanted = not outermost

    def close(self):
        '''
        Close the cursor owned by this transaction.
        '''
        self.cursor.close()

    def __enter__(self):
        self.begin()
        return self.cursor

    def __exit__(self, exc_type, exc_value, traceback):
        # Commit on clean exit, roll back when the block raised.
        finish = self.commit if exc_type is None else self.rollback
        finish()

        # The cursor is only released once the outermost level is left.
        if self.depth == 0:
            self.close()
class Database(object):
    '''
    Shared meta-information database used by Squirrel.

    Wraps an SQLite connection holding the tables ``files``, ``nuts``,
    ``kind_codes``, ``kind_codes_count`` and ``persistent``. File paths can be
    stored relative to an optional base path (see :py:meth:`set_basepath`);
    paths starting with ``'virtual:'`` or ``'client:'`` are never resolved
    against it.
    '''

    def __init__(self, database_path=':memory:', log_statements=False):
        '''
        Open (and if necessary initialize) the database.

        :param database_path: path of the SQLite file or ``':memory:'``.
            Missing parent directories of a file path are created.
        :param log_statements: if true, every SQL statement executed on the
            connection is logged at debug level.
        :raises SquirrelError: if the connection cannot be opened.
        '''
        self._database_path = database_path
        if database_path != ':memory:':
            util.ensuredirs(database_path)

        try:
            logger.debug('Opening connection to database: %s' % database_path)
            # isolation_level=None: no implicit transactions, they are
            # controlled explicitly through Transaction objects.
            self._conn = sqlite3.connect(database_path, isolation_level=None)
        except sqlite3.OperationalError:
            raise error.SquirrelError(
                'Cannot connect to database: %s' % database_path)

        self._conn.text_factory = str
        self._tables = {}
        self._basepath = None

        if log_statements:
            self._conn.set_trace_callback(self._log_statement)

        self._initialize_db()

    def set_basepath(self, basepath):
        '''
        Set or clear the directory against which stored paths are relative.

        :param basepath: directory path (made absolute) or ``None`` to
            disable relative path handling.
        '''
        if basepath is not None:
            self._basepath = os.path.abspath(basepath)
        else:
            self._basepath = None

    def relpath(self, path):
        '''
        Convert a path to the form in which it is stored in the database.

        :param path: path to convert.
        :returns: ``path`` with the base path and the following separator
            stripped if it lies below the base path, otherwise ``path``
            unchanged.
        '''
        if self._basepath is not None and path.startswith(
                self._basepath + os.path.sep):
            return path[len(self._basepath) + 1:]
        else:
            return path

    def abspath(self, path):
        '''
        Convert a path as stored in the database to its absolute form.

        :param path: path as stored in the database.
        :returns: ``path`` joined to the base path if a base path is set and
            ``path`` is a relative file system path; otherwise (no base path,
            already absolute, or ``'virtual:'`` / ``'client:'`` prefix)
            ``path`` unchanged.
        '''
        if self._basepath is not None and not path.startswith('virtual:') \
                and not path.startswith('client:') \
                and not os.path.isabs(path):
            return os.path.join(self._basepath, path)
        else:
            return path

    def _log_statement(self, statement):
        # Trace callback installed when log_statements is enabled.
        logger.debug(statement)

    def get_connection(self):
        '''
        Get the underlying SQLite connection object.
        '''
        return self._conn

    def transaction(self, mode='immediate'):
        '''
        Create a new :py:class:`Transaction` on this database's connection.

        :param mode: ``'deferred'``, ``'immediate'`` or ``'exclusive'``.
        '''
        return Transaction(self._conn, mode)

    def _register_table(self, s):
        '''
        Record the column names and types of a ``CREATE TABLE`` statement.

        The table name and the first two words of every comma separated
        column definition are stored in ``self._tables`` for use as headers
        by :py:meth:`print_table`.

        :param s: the ``CREATE TABLE`` statement.
        :returns: ``s`` unchanged, so the call can wrap the statement inline.
        '''
        m = re.search(r'(\S+)\s*\(([^)]+)\)', s)
        table_name = m.group(1)
        dtypes = m.group(2)
        table_header = []
        for dele in dtypes.split(','):
            table_header.append(dele.split()[:2])

        self._tables[table_name] = table_header

        return s

    def _initialize_db(self):
        '''
        Set connection pragmas and create the schema if it does not exist.

        The schema is considered present when both the ``files`` and the
        ``persistent`` tables exist; in that case no ``CREATE`` statement is
        executed.
        '''
        register = self._register_table

        # Statements in execution order. Table definitions are registered
        # right here, before the existence check below, so that column
        # headers are available to print_table() also when an already
        # initialized database is opened.
        schema = [
            register(
                '''
                    CREATE TABLE IF NOT EXISTS files (
                        file_id integer PRIMARY KEY,
                        path text,
                        format text,
                        mtime float,
                        size integer)
                '''),

            '''
                CREATE UNIQUE INDEX IF NOT EXISTS index_files_path
                ON files (path)
            ''',

            register(
                '''
                    CREATE TABLE IF NOT EXISTS nuts (
                        nut_id integer PRIMARY KEY AUTOINCREMENT,
                        file_id integer,
                        file_segment integer,
                        file_element integer,
                        kind_id integer,
                        kind_codes_id integer,
                        tmin_seconds integer,
                        tmin_offset integer,
                        tmax_seconds integer,
                        tmax_offset integer,
                        kscale integer)
                '''),

            '''
                CREATE UNIQUE INDEX IF NOT EXISTS index_nuts_file_element
                ON nuts (file_id, file_segment, file_element)
            ''',

            register(
                '''
                    CREATE TABLE IF NOT EXISTS kind_codes (
                        kind_codes_id integer PRIMARY KEY,
                        kind_id integer,
                        codes text,
                        deltat float)
                '''),

            '''
                CREATE UNIQUE INDEX IF NOT EXISTS index_kind_codes
                ON kind_codes (kind_id, codes, deltat)
            ''',

            register(
                '''
                    CREATE TABLE IF NOT EXISTS kind_codes_count (
                        kind_codes_id integer PRIMARY KEY,
                        count integer)
                '''),

            '''
                CREATE INDEX IF NOT EXISTS index_nuts_file_id
                ON nuts (file_id)
            ''',

            '''
                CREATE TRIGGER IF NOT EXISTS delete_nuts_on_delete_file
                BEFORE DELETE ON files FOR EACH ROW
                BEGIN
                    DELETE FROM nuts where file_id == old.file_id;
                END
            ''',

            # trigger only on size to make silent update of mtime possible
            '''
                CREATE TRIGGER IF NOT EXISTS delete_nuts_on_update_file
                BEFORE UPDATE OF size ON files FOR EACH ROW
                BEGIN
                    DELETE FROM nuts where file_id == old.file_id;
                END
            ''',

            '''
                CREATE TRIGGER IF NOT EXISTS increment_kind_codes
                BEFORE INSERT ON nuts FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO kind_codes_count
                    VALUES (new.kind_codes_id, 0);
                    UPDATE kind_codes_count
                    SET count = count + 1
                    WHERE new.kind_codes_id == kind_codes_id;
                END
            ''',

            '''
                CREATE TRIGGER IF NOT EXISTS decrement_kind_codes
                BEFORE DELETE ON nuts FOR EACH ROW
                BEGIN
                    UPDATE kind_codes_count
                    SET count = count - 1
                    WHERE old.kind_codes_id == kind_codes_id;
                END
            ''',

            register(
                '''
                    CREATE TABLE IF NOT EXISTS persistent (
                        name text UNIQUE)
                '''),
        ]

        with self.transaction() as cursor:
            cursor.execute(
                '''PRAGMA recursive_triggers = true''')

            cursor.execute(
                '''PRAGMA busy_timeout = 30000''')

            existing = list(cursor.execute(
                '''
                    SELECT name FROM sqlite_master
                        WHERE type = 'table' AND name IN (
                            'files',
                            'persistent')
                '''))

            if len(existing) == 2:
                # Schema already in place, nothing to create.
                return

            for statement in schema:
                cursor.execute(statement)

    def dig(self, nuts, transaction=None):
        '''
        Store or update content meta-information.

        Given ``nuts`` are assumed to represent an up-to-date and complete
        inventory of a set of files. The file entries are inserted or
        updated; updating the ``size`` column fires the database trigger
        which prunes any old content information about these files. Then the
        new content meta-information is inserted into the main database.

        NOTE(review): removal from and re-insertion into live selections is
        handled outside this class (selection objects and their triggers) -
        confirm against the selection implementation.

        :param nuts: iterable of nut objects to store. Nothing is done if it
            is empty.
        :param transaction: transaction to take part in; a new one is created
            when not given.
        '''

        nuts = list(nuts)

        if not nuts:
            return

        files = set()
        kind_codes = set()
        for nut in nuts:
            files.add((
                self.relpath(nut.file_path),
                nut.file_format,
                nut.file_mtime,
                nut.file_size))
            # deltat is stored as 0.0 when unset so that the lookup by
            # equality in the INSERT below can match.
            kind_codes.add((nut.kind_id, nut.codes, nut.deltat or 0.0))

        with (transaction or self.transaction()) as c:

            c.executemany(
                'INSERT OR IGNORE INTO files VALUES (NULL,?,?,?,?)', files)

            c.executemany(
                '''UPDATE files SET
                    format = ?, mtime = ?, size = ?
                    WHERE path == ?
                ''',
                ((x[1], x[2], x[3], x[0]) for x in files))

            c.executemany(
                'INSERT OR IGNORE INTO kind_codes VALUES (NULL,?,?,?)',
                kind_codes)

            c.executemany(
                '''
                    INSERT INTO nuts VALUES
                        (NULL, (
                            SELECT file_id FROM files
                            WHERE path == ?
                         ),?,?,?,
                         (
                            SELECT kind_codes_id FROM kind_codes
                            WHERE kind_id == ? AND codes == ? AND deltat == ?
                         ), ?,?,?,?,?)
                ''',
                ((self.relpath(nut.file_path),
                  nut.file_segment, nut.file_element,
                  nut.kind_id,
                  nut.kind_id, nut.codes, nut.deltat or 0.0,
                  nut.tmin_seconds, nut.tmin_offset,
                  nut.tmax_seconds, nut.tmax_offset,
                  nut.kscale) for nut in nuts))

    def undig(self, path):
        '''
        Get content meta-information stored for a given file.

        :param path: path of the file.
        :returns: list of :py:class:`Nut` objects, with absolute file paths.
        '''

        path = self.relpath(abspath(path))

        sql = '''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                nuts.file_segment,
                nuts.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                nuts.tmin_seconds,
                nuts.tmin_offset,
                nuts.tmax_seconds,
                nuts.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN nuts ON files.file_id = nuts.file_id
            INNER JOIN kind_codes
                ON nuts.kind_codes_id == kind_codes.kind_codes_id
            WHERE path == ?
        '''

        return [Nut(values_nocheck=(self.abspath(row[0]),) + row[1:])
                for row in self._conn.execute(sql, (path,))]

    def undig_all(self):
        '''
        Iterate over the content meta-information of all files.

        :yields: tuples ``(path, nuts)`` with the absolute file path and the
            list of :py:class:`Nut` objects stored for that file. Files
            without any nuts are not reported.
        '''
        # ORDER BY keeps the rows of each file contiguous, which the grouping
        # loop below depends on; SQL gives no ordering guarantee without it.
        sql = '''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                nuts.file_segment,
                nuts.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                nuts.tmin_seconds,
                nuts.tmin_offset,
                nuts.tmax_seconds,
                nuts.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN nuts ON files.file_id == nuts.file_id
            INNER JOIN kind_codes
                ON nuts.kind_codes_id == kind_codes.kind_codes_id
            ORDER BY files.file_id
        '''

        nuts = []
        path = None
        stored_path = None
        for values in self._conn.execute(sql):
            # Group changes are detected on the path as stored in the
            # database. Comparing against the absolute path would flag every
            # row as a new file whenever a base path is set.
            if values[0] != stored_path:
                if stored_path is not None:
                    yield path, nuts
                    nuts = []

                stored_path = values[0]
                path = self.abspath(stored_path)

            if values[1] is not None:
                nuts.append(Nut(values_nocheck=(path,) + values[1:]))

        if path is not None:
            yield path, nuts

    def undig_many(self, paths, show_progress=True):
        '''
        Iterate over the content meta-information of a set of files.

        A temporary selection is created from ``paths`` and dropped again
        when the iteration has completed.

        :param paths: paths of the files of interest.
        :param show_progress: passed on to the selection when adding paths.
        :yields: tuples ``(path, nuts)``.
        '''
        selection = self.new_selection(paths, show_progress=show_progress)

        for (_, path), nuts in selection.undig_grouped():
            yield path, nuts

        del selection

    def new_selection(self, paths=None, show_progress=True):
        '''
        Create a new selection attached to this database.

        :param paths: optional paths to add to the selection right away.
        :param show_progress: passed on to the selection when adding paths.
        :returns: the new ``Selection`` object.
        '''
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import - confirm.
        from .selection import Selection
        selection = Selection(self)
        if paths:
            selection.add(paths, show_progress=show_progress)
        return selection

    def undig_content(self, nut):
        '''
        Placeholder, always returns ``None``.
        '''
        return None

    def remove(self, path):
        '''
        Prune content meta-information about a given file.

        The entry of file ``path`` is deleted from the ``files`` table; its
        content pieces are removed from the ``nuts`` table by a database
        trigger.

        :param path: path of the file.
        '''

        path = self.relpath(abspath(path))

        with self.transaction() as cursor:
            cursor.execute(
                'DELETE FROM files WHERE path = ?', (path,))

    def remove_glob(self, pattern):
        '''
        Prune content meta-information about files matching given pattern.

        The entries of all files whose stored paths match ``pattern`` are
        deleted from the ``files`` table; their content pieces are removed
        from the ``nuts`` table by a database trigger.

        :param pattern: SQLite ``GLOB`` pattern, matched against the paths as
            stored in the database.
        :returns: number of file entries deleted.
        '''

        with self.transaction() as cursor:
            return cursor.execute(
                'DELETE FROM files WHERE path GLOB ?', (pattern,)).rowcount

    def _remove_volatile(self):
        '''
        Prune leftover volatile content from database.

        If the cleanup handler of an attached selection is not called, e.g. due
        to a crash or terminated process, volatile content will not be removed
        properly. This method will delete such leftover entries.

        This is a maintenance operation which should only be called when no
        apps are using the database because it would remove volatile content
        currently used by the apps.

        :returns: number of file entries deleted.
        '''

        with self.transaction() as cursor:
            # Single quotes: standard SQL string literal. The statement must
            # contain nothing but the DELETE itself.
            return cursor.execute(
                '''
                    DELETE FROM files
                    WHERE path LIKE 'virtual:volatile:%'
                ''').rowcount

    def reset(self, path, transaction=None):
        '''
        Prune information associated with a given file, but keep the file path.

        This method is called when reading a file failed. File attributes,
        format, size and modification time are set to NULL. Updating the size
        fires the database trigger removing the file's content
        meta-information.

        :param path: path of the file.
        :param transaction: transaction to take part in; a new one is created
            when not given.
        '''

        path = self.relpath(abspath(path))

        with (transaction or self.transaction()) as cursor:
            cursor.execute(
                '''
                    UPDATE files SET
                        format = NULL,
                        mtime = NULL,
                        size = NULL
                    WHERE path = ?
                ''', (path,))

    def silent_touch(self, path):
        '''
        Update modification time of file without initiating reindexing.

        Useful to prolong validity period of data with expiration date.

        :param path: path of the file.
        :raises ExecuteGet1Error: if the file is not known to the database.
        :raises FileLoadError: if the size of the file has changed.
        '''

        apath = abspath(path)
        path = self.relpath(apath)

        with self.transaction() as cursor:

            sql = 'SELECT format, size FROM files WHERE path = ?'
            fmt, size = execute_get1(cursor, sql, (path,))

            mod = io.get_backend(fmt)
            mod.touch(apath)
            file_stats = mod.get_stats(apath)

            if file_stats[1] != size:
                raise FileLoadError(
                    'Silent update for file "%s" failed: size has changed.'
                    % apath)

            # Only mtime is written; the reindexing trigger is bound to
            # updates of the size column and therefore does not fire.
            sql = '''
                UPDATE files
                SET mtime = ?
                WHERE path = ?
            '''
            cursor.execute(sql, (file_stats[0], path))

    def _iter_counts(self, kind=None, kind_codes_count='kind_codes_count'):
        '''
        Iterate over the number of nuts per kind, codes and sampling interval.

        :param kind: if given, restrict to this content kind.
        :param kind_codes_count: name of the count table to query.
        :yields: tuples ``((kind, codes, deltat), count)`` for all entries
            with a count greater than zero.
        '''
        args = []
        sel = ''
        if kind is not None:
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT
                kind_codes.kind_id,
                kind_codes.codes,
                kind_codes.deltat,
                %(kind_codes_count)s.count
            FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
        ''') % {'kind_codes_count': kind_codes_count}

        for kind_id, codes, deltat, count in self._conn.execute(sql, args):
            yield (
                to_kind(kind_id),
                tuple(codes.split(separator)),
                deltat), count

    def _iter_deltats(self, kind=None, kind_codes_count='kind_codes_count'):
        '''
        Iterate over the distinct sampling intervals in use, in sorted order.

        :param kind: if given (as a string), restrict to this content kind.
        :param kind_codes_count: name of the count table to query.
        '''
        args = []
        sel = ''
        if kind is not None:
            assert isinstance(kind, str)
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT DISTINCT kind_codes.deltat FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.deltat
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield row[0]

    def _iter_codes(self, kind=None, kind_codes_count='kind_codes_count'):
        '''
        Iterate over the distinct code sequences in use, in sorted order.

        :param kind: if given (as a string), restrict to this content kind.
        :param kind_codes_count: name of the count table to query.
        :yields: code sequences as tuples of strings.
        '''
        args = []
        sel = ''
        if kind is not None:
            assert isinstance(kind, str)
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT DISTINCT kind_codes.codes FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.codes
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield tuple(row[0].split(separator))

    def _iter_kinds(self, codes=None, kind_codes_count='kind_codes_count'):
        '''
        Iterate over the distinct content kinds in use, ordered by kind id.

        :param codes: if given (as a tuple), restrict to this code sequence.
        :param kind_codes_count: name of the count table to query.
        '''
        args = []
        sel = ''
        if codes is not None:
            assert isinstance(codes, tuple)
            sel = 'AND kind_codes.codes == ?'
            args.append(separator.join(codes))

        sql = ('''
            SELECT DISTINCT kind_codes.kind_id FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.kind_id
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield to_kind(row[0])

    def iter_paths(self):
        '''
        Iterate over the absolute paths of all files known to the database.
        '''
        for row in self._conn.execute('''SELECT path FROM files'''):
            yield self.abspath(row[0])

    def iter_nnuts_by_file(self):
        '''
        Iterate over all known files and their number of nuts.

        :yields: tuples ``(path, nnuts)`` with absolute paths.
        '''
        sql = '''
            SELECT
                path,
                (SELECT COUNT(*) FROM nuts WHERE nuts.file_id = files.file_id)
            FROM files
        '''
        for row in self._conn.execute(sql):
            yield (self.abspath(row[0]),) + row[1:]

    def iter_kinds(self, codes=None):
        '''
        Iterate over the content kinds available in the database.

        :param codes: if given (as a tuple), restrict to this code sequence.
        '''
        return self._iter_kinds(codes=codes)

    def iter_codes(self, kind=None):
        '''
        Iterate over the code sequences available in the database.

        :param kind: if given, restrict to this content kind.
        '''
        return self._iter_codes(kind=kind)

    def iter_counts(self, kind=None):
        '''
        Iterate over ``((kind, codes, deltat), count)`` for the whole database.

        :param kind: if given, restrict to this content kind.
        '''
        return self._iter_counts(kind=kind)

    def get_paths(self):
        '''
        Get list of the absolute paths of all files known to the database.
        '''
        return list(self.iter_paths())

    def get_kinds(self, codes=None):
        '''
        Get list of the content kinds available in the database.
        '''
        return list(self.iter_kinds(codes=codes))

    def get_codes(self, kind=None):
        '''
        Get list of the code sequences available in the database.
        '''
        return list(self.iter_codes(kind=kind))

    def get_counts(self, kind=None):
        '''
        Get number of nuts, grouped by content kind and code sequence.

        Counts of entries differing only in sampling interval are summed.

        :param kind: if given, return only the breakdown for this kind.
        :returns: ``counts[kind][codes]`` or, if ``kind`` is given,
            ``counts[codes]``.
        :raises KeyError: if ``kind`` is given but no content of that kind is
            available.
        '''
        d = {}
        for (k, codes, deltat), count in self.iter_counts():
            v = d.setdefault(k, {})
            v[codes] = v.get(codes, 0) + count

        if kind is not None:
            return d[kind]
        else:
            return d

    def get_nfiles(self):
        '''
        Get number of files known to the database.
        '''
        sql = '''SELECT COUNT(*) FROM files'''
        for row in self._conn.execute(sql):
            return row[0]

    def get_nnuts(self):
        '''
        Get number of nuts stored in the database.
        '''
        sql = '''SELECT COUNT(*) FROM nuts'''
        for row in self._conn.execute(sql):
            return row[0]

    def get_nnuts_by_file(self):
        '''
        Get list of ``(path, nnuts)`` tuples for all known files.
        '''
        return list(self.iter_nnuts_by_file())

    def get_total_size(self):
        '''
        Get sum of the stored sizes of all known files (0 if there are none).
        '''
        sql = '''
            SELECT SUM(files.size) FROM files
        '''

        for row in self._conn.execute(sql):
            # SUM() yields NULL when there is nothing to sum.
            return row[0] or 0

    def get_persistent_names(self):
        '''
        Get list of the names stored in the ``persistent`` table.
        '''
        sql = '''
            SELECT name FROM persistent
        '''
        return [row[0] for row in self._conn.execute(sql)]

    def get_stats(self):
        '''
        Get statistics on the contents of the database.

        :returns: :py:class:`DatabaseStats` object.
        '''
        return DatabaseStats(
            nfiles=self.get_nfiles(),
            nnuts=self.get_nnuts(),
            kinds=self.get_kinds(),
            codes=self.get_codes(),
            counts=self.get_counts(),
            total_size=self.get_total_size(),
            persistent=self.get_persistent_names())

    def __str__(self):
        return str(self.get_stats())

    def print_tables(self, stream=None):
        '''
        Print the contents of all main tables, for debugging.

        :param stream: object with a ``write`` method, default: standard
            output.
        '''
        for table in [
                'persistent',
                'files',
                'nuts',
                'kind_codes',
                'kind_codes_count']:

            self.print_table(table, stream=stream)

    def print_table(self, name, stream=None):
        '''
        Print the contents of one table as aligned text columns.

        If the table has been registered through :py:meth:`_register_table`,
        a header with column names and types is printed above the rows.

        :param name: name of the table. It is inserted into the SQL statement
            as is, so it must be a trusted identifier.
        :param stream: object with a ``write`` method, default: standard
            output.
        '''

        if stream is None:
            stream = sys.stdout

        # String type which is shown without quotes by repr(), used for the
        # header cells.
        class hstr(str):
            def __repr__(self):
                return self

        w = stream.write
        w('\n')
        w('\n')
        w(name)
        w('\n')
        sql = 'SELECT * FROM %s' % name
        tab = []
        if name in self._tables:
            headers = self._tables[name]
            # Rows of None are rendered as horizontal rules.
            tab.append([None for _ in headers])
            tab.append([hstr(x[0]) for x in headers])
            tab.append([hstr(x[1]) for x in headers])
            tab.append([None for _ in headers])

        for row in self._conn.execute(sql):
            tab.append([x for x in row])

        widths = [
            max((len(repr(x)) if x is not None else 0) for x in col)
            for col in zip(*tab)]

        for row in tab:
            w(' '.join(
                (repr(x).ljust(wid) if x is not None else ''.ljust(wid, '-'))
                for (x, wid) in zip(row, widths)))

            w('\n')

        w('\n')
class DatabaseStats(Object):
    '''
    Summary of the contents held in a meta-information database.

    The textual form (``str(stats)``) is a human readable report listing the
    available codes, file and nut counts, total file size, content kinds and
    persistent selections.
    '''

    nfiles = Int.T(
        help='Number of files in database.')
    nnuts = Int.T(
        help='Number of index nuts in database.')
    codes = List.T(
        Tuple.T(content_t=String.T()),
        help='Available code sequences in database, e.g. '
             '(agency, network, station, location) for stations nuts.')
    kinds = List.T(
        String.T(),
        help='Available content types in database.')
    total_size = Int.T(
        help='Aggregated file size of files referenced in database.')
    counts = Dict.T(
        String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()),
        help='Breakdown of how many nuts of any content type and code '
             'sequence are available in database, ``counts[kind][codes]``.')
    persistent = List.T(
        String.T(),
        help='Names of persistent selections stored in database.')

    def __str__(self):
        # Total number of nuts per content kind, summed over all codes.
        kind_counts = {}
        for kind in self.kinds:
            kind_counts[kind] = sum(self.counts[kind].values())

        codes = ['.'.join(x) for x in self.codes]

        # Long code listings are abbreviated to the first and last ten
        # entries.
        if len(codes) > 20:
            head = util.ewrap(codes[:10], indent=' ')
            tail = util.ewrap(codes[-10:], indent=' ')
            scodes = ''.join((
                '\n', head, '\n [%i more]\n' % (len(codes) - 20), tail))
        elif codes:
            scodes = '\n' + util.ewrap(codes, indent=' ')
        else:
            scodes = '<none>'

        skinds = ', '.join(
            '%s: %i' % (kind, kind_counts[kind])
            for kind in sorted(self.kinds))

        values = (
            scodes,
            self.nfiles,
            util.human_bytesize(self.total_size),
            self.nnuts,
            skinds,
            ', '.join(self.persistent))

        template = '''
Available codes: %s
Number of files: %i
Total size of known files: %s
Number of index nuts: %i
Available content kinds: %s
Persistent selections: %s'''

        return template % values
# Names exported by a star-import of this module.
__all__ = [
    'Database',
    'DatabaseStats',
]