1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import sys
9import os
10import logging
11import sqlite3
12import re
13import time
15from pyrocko.io.io_common import FileLoadError
16from pyrocko import util
17from pyrocko.guts import Object, Int, List, Dict, Tuple, String
18from . import error, io
19from .model import Nut, to_kind_id, to_kind, separator
20from .error import SquirrelError
# Prefix under which the guts types declared in this module are registered.
guts_prefix = 'squirrel'

# Module-level logger shared by all functions and classes below.
logger = logging.getLogger('psq.database')
def abspath(path):
    '''
    Make ``path`` absolute, leaving virtual and client paths untouched.

    Paths starting with ``'virtual:'`` or ``'client:'`` are returned
    unchanged; any other path is passed through :py:func:`os.path.abspath`.
    '''
    if path.startswith(('virtual:', 'client:')):
        return path

    return os.path.abspath(path)
class ExecuteGet1Error(SquirrelError):
    '''
    Raised by :py:func:`execute_get1` when a query does not yield exactly one
    row.
    '''
def execute_get1(connection, sql, args):
    '''
    Run a query which must produce exactly one row and return that row.

    :param connection: object providing ``execute(sql, args)``, e.g. a
        connection or a cursor.
    :param sql: SQL statement to run.
    :param args: parameters bound to the placeholders in ``sql``.
    :returns: the single result row.
    :raises ExecuteGet1Error: if the query yields no row or more than one.
    '''
    result = list(connection.execute(sql, args))
    if len(result) != 1:
        raise ExecuteGet1Error('Expected database entry not found.')

    return result[0]
# Registry of open Database objects, keyed by absolute database path.
g_databases = dict()
def get_database(path):
    '''
    Get the shared :py:class:`Database` object for the database at ``path``.

    The database is opened on first request and registered in
    ``g_databases`` under its absolute path; later requests for the same
    path return the registered object.
    '''
    key = os.path.abspath(path)
    try:
        return g_databases[key]
    except KeyError:
        database = Database(key)
        g_databases[key] = database
        return database
def close_database(database):
    '''
    Close the connection of ``database`` and drop it from the registry.

    The entry in ``g_databases`` is removed if one exists for the
    database's absolute path; a database which was never registered is
    closed all the same.
    '''
    key = os.path.abspath(database._database_path)
    database._conn.close()
    g_databases.pop(key, None)
class Transaction(object):
    '''
    Nestable transaction on a dedicated cursor of an SQLite connection.

    Only the outermost :py:meth:`begin` / :py:meth:`commit` /
    :py:meth:`rollback` pair issues SQL statements; inner levels merely
    adjust a depth counter. A rollback requested at an inner level is
    remembered and carried out when the outermost level finishes.

    Used as a context manager, the transaction is begun on entry and the
    cursor is handed to the ``with`` body. On exit it is committed, or rolled
    back if the body raised.

    :param conn: connection object providing ``cursor()``.
    :param mode: ``'deferred'``, ``'immediate'`` or ``'exclusive'``.
    :param retry_interval: seconds to wait before retrying ``BEGIN`` while
        the database is locked.
    '''

    def __init__(self, conn, mode='immediate', retry_interval=0.1):
        self.cursor = conn.cursor()
        assert mode in ('deferred', 'immediate', 'exclusive')
        self.mode = mode
        self.retry_interval = retry_interval
        self.rollback_wanted = False
        self.depth = 0

    def _begin_outermost(self):
        # Keep trying for as long as the only problem is a locked database;
        # any other operational error is passed on to the caller.
        attempt = 0
        while True:
            attempt += 1
            try:
                self.cursor.execute('BEGIN %s' % self.mode.upper())
            except sqlite3.OperationalError as e:
                if str(e) != 'database is locked':
                    raise

                logger.info(
                    'Database is locked retrying in %s s. Tries: %i'
                    % (self.retry_interval, attempt))

                time.sleep(self.retry_interval)
            else:
                return

    def begin(self):
        '''
        Enter one nesting level, issuing ``BEGIN`` at the outermost level.
        '''
        if self.depth == 0:
            self._begin_outermost()

        self.depth += 1

    def commit(self):
        '''
        Leave one nesting level.

        At the outermost level, ``COMMIT`` is issued, unless an inner level
        has asked for a rollback, in which case ``ROLLBACK`` is issued.
        '''
        self.depth -= 1
        if self.depth != 0:
            return

        if self.rollback_wanted:
            self.cursor.execute('ROLLBACK')
            logger.warning('Deferred rollback executed.')
            self.rollback_wanted = False
        else:
            self.cursor.execute('COMMIT')

    def rollback(self):
        '''
        Leave one nesting level, discarding the changes.

        At the outermost level ``ROLLBACK`` is issued immediately. At an
        inner level the rollback is only scheduled.
        '''
        self.depth -= 1
        outermost = self.depth == 0
        if outermost:
            self.cursor.execute('ROLLBACK')
        else:
            logger.warning('Deferred rollback scheduled.')

        self.rollback_wanted = not outermost

    def close(self):
        '''
        Close the cursor of this transaction.
        '''
        self.cursor.close()

    def __enter__(self):
        self.begin()
        return self.cursor

    def __exit__(self, exc_type, exc_value, traceback):
        finish = self.commit if exc_type is None else self.rollback
        finish()

        # The cursor is only released once the outermost level is done.
        if self.depth == 0:
            self.close()
class Database(object):
    '''
    Shared meta-information database used by Squirrel.

    Holds one SQLite connection together with the schema living in it: the
    tables ``files``, ``nuts``, ``kind_codes``, ``kind_codes_count`` and
    ``persistent``, plus the indices and triggers created in
    :py:meth:`_initialize_db`.

    File paths can be stored relative to an optional base path, see
    :py:meth:`set_basepath`. Paths starting with ``'virtual:'`` or
    ``'client:'`` are never made absolute.

    :param database_path: path of the SQLite database file, or
        ``':memory:'`` for an in-memory database.
    :param log_statements: if true, every SQL statement executed on the
        connection is sent to the module logger at debug level.
    :raises SquirrelError: if the connection cannot be opened.
    '''

    def __init__(self, database_path=':memory:', log_statements=False):
        self._database_path = database_path
        self._basepath = None

        # Maps table name to a list of [column name, column type] pairs.
        # Filled by _register_table(), read by print_table().
        self._tables = {}

        if database_path != ':memory:':
            # NOTE(review): presumably creates missing parent directories of
            # the database file -- confirm against util.ensuredirs.
            util.ensuredirs(database_path)

        try:
            logger.debug('Opening connection to database: %s' % database_path)
            # isolation_level=None: transactions are controlled explicitly
            # with BEGIN/COMMIT/ROLLBACK statements (see Transaction).
            self._conn = sqlite3.connect(database_path, isolation_level=None)
        except sqlite3.OperationalError:
            raise error.SquirrelError(
                'Cannot connect to database: %s' % database_path)

        self._conn.text_factory = str

        if log_statements:
            self._conn.set_trace_callback(self._log_statement)

        self._initialize_db()

    def set_basepath(self, basepath):
        '''
        Set or clear the directory relative to which paths are stored.

        :param basepath: directory path, made absolute before it is stored,
            or ``None`` to switch relative path handling off.
        '''
        if basepath is not None:
            self._basepath = os.path.abspath(basepath)
        else:
            self._basepath = None

    def relpath(self, path):
        '''
        Convert ``path`` to the form under which it is stored.

        If a base path is set and ``path`` lies below it, the base path and
        the path separator following it are stripped. Any other path is
        returned unchanged.
        '''
        if self._basepath is not None and path.startswith(
                self._basepath + os.path.sep):
            return path[len(self._basepath) + 1:]
        else:
            return path

    def abspath(self, path):
        '''
        Convert a stored path back to its absolute form.

        Relative paths are joined onto the base path, if one is set. Paths
        starting with ``'virtual:'`` or ``'client:'`` and paths which are
        already absolute are returned unchanged.
        '''
        if self._basepath is not None and not path.startswith('virtual:') \
                and not path.startswith('client:') \
                and not os.path.isabs(path):
            return os.path.join(self._basepath, path)
        else:
            return path

    def _log_statement(self, statement):
        # Trace callback installed on the connection when statement logging
        # has been requested.
        logger.debug(statement)

    def get_connection(self):
        '''
        Get the underlying SQLite connection object.
        '''
        return self._conn

    def transaction(self, mode='immediate'):
        '''
        Create a new :py:class:`Transaction` on the database connection.

        :param mode: ``'deferred'``, ``'immediate'`` or ``'exclusive'``.
        :returns: transaction object, usable as a context manager yielding a
            cursor.
        '''
        return Transaction(self._conn, mode)

    def _register_table(self, s):
        '''
        Remember the column layout declared by a ``CREATE TABLE`` statement.

        The table name and the first two words (name and type) of each
        comma-separated column declaration are extracted from ``s`` and
        stored for use as table header in :py:meth:`print_table`.

        :param s: SQL text of the form ``... name (col type ..., ...)``.
        :returns: ``s``, unchanged, so that the call can be wrapped around
            the argument of ``cursor.execute()``.
        '''
        m = re.search(r'(\S+)\s*\(([^)]+)\)', s)
        table_name = m.group(1)
        dtypes = m.group(2)
        table_header = []
        for dele in dtypes.split(','):
            table_header.append(dele.split()[:2])

        self._tables[table_name] = table_header

        return s

    def _initialize_db(self):
        '''
        Configure the connection and create the schema if it is missing.

        If the tables ``files`` and ``persistent`` both exist already, the
        schema is taken to be complete and nothing is created. Note that in
        this case no table layouts get registered, so :py:meth:`print_table`
        prints such tables without header.
        '''
        with self.transaction() as cursor:
            cursor.execute(
                '''PRAGMA recursive_triggers = true''')

            cursor.execute(
                '''PRAGMA busy_timeout = 30000''')

            if 2 == len(list(
                    cursor.execute(
                        '''
                            SELECT name FROM sqlite_master
                                WHERE type = 'table' AND name IN (
                                    'files',
                                    'persistent')
                        '''))):

                return

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS files (
                        file_id integer PRIMARY KEY,
                        path text,
                        format text,
                        mtime float,
                        size integer)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_files_path
                    ON files (path)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS nuts (
                        nut_id integer PRIMARY KEY AUTOINCREMENT,
                        file_id integer,
                        file_segment integer,
                        file_element integer,
                        kind_id integer,
                        kind_codes_id integer,
                        tmin_seconds integer,
                        tmin_offset integer,
                        tmax_seconds integer,
                        tmax_offset integer,
                        kscale integer)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_nuts_file_element
                    ON nuts (file_id, file_segment, file_element)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS kind_codes (
                        kind_codes_id integer PRIMARY KEY,
                        kind_id integer,
                        codes text,
                        deltat float)
                '''))

            cursor.execute(
                '''
                    CREATE UNIQUE INDEX IF NOT EXISTS index_kind_codes
                    ON kind_codes (kind_id, codes, deltat)
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS kind_codes_count (
                        kind_codes_id integer PRIMARY KEY,
                        count integer)
                '''))

            cursor.execute(
                '''
                    CREATE INDEX IF NOT EXISTS index_nuts_file_id
                    ON nuts (file_id)
                ''')

            # Removing a file entry takes its nuts with it.
            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS delete_nuts_on_delete_file
                    BEFORE DELETE ON files FOR EACH ROW
                    BEGIN
                        DELETE FROM nuts where file_id == old.file_id;
                    END
                ''')

            # trigger only on size to make silent update of mtime possible
            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS delete_nuts_on_update_file
                    BEFORE UPDATE OF size ON files FOR EACH ROW
                    BEGIN
                        DELETE FROM nuts where file_id == old.file_id;
                    END
                ''')

            # The two triggers below keep kind_codes_count in step with the
            # rows of the nuts table.
            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS increment_kind_codes
                    BEFORE INSERT ON nuts FOR EACH ROW
                    BEGIN
                        INSERT OR IGNORE INTO kind_codes_count
                        VALUES (new.kind_codes_id, 0);
                        UPDATE kind_codes_count
                        SET count = count + 1
                        WHERE new.kind_codes_id == kind_codes_id;
                    END
                ''')

            cursor.execute(
                '''
                    CREATE TRIGGER IF NOT EXISTS decrement_kind_codes
                    BEFORE DELETE ON nuts FOR EACH ROW
                    BEGIN
                        UPDATE kind_codes_count
                        SET count = count - 1
                        WHERE old.kind_codes_id == kind_codes_id;
                    END
                ''')

            cursor.execute(self._register_table(
                '''
                    CREATE TABLE IF NOT EXISTS persistent (
                        name text UNIQUE)
                '''))

    def dig(self, nuts, transaction=None):
        '''
        Store or update content meta-information.

        Given ``nuts`` are assumed to represent an up-to-date and complete
        inventory of a set of files. Any old information about these files is
        first pruned from the database (via database triggers). If such content
        is part of a live selection, it is also removed there. Then the new
        content meta-information is inserted into the main database. The
        content is not automatically inserted into the live selections again.
        It is in the responsibility of the selection object to perform this
        step.

        :param nuts: iterable of nut objects; nothing is done if it is empty.
        :param transaction: transaction to take part in. If not given, a new
            one is created.
        '''

        nuts = list(nuts)

        if not nuts:
            return

        files = set()
        kind_codes = set()
        for nut in nuts:
            files.add((
                self.relpath(nut.file_path),
                nut.file_format,
                nut.file_mtime,
                nut.file_size))
            kind_codes.add((nut.kind_id, nut.codes, nut.deltat or 0.0))

        with (transaction or self.transaction()) as c:

            c.executemany(
                'INSERT OR IGNORE INTO files VALUES (NULL,?,?,?,?)', files)

            # Updating the size column fires the trigger which prunes the
            # old nuts of the file.
            c.executemany(
                '''UPDATE files SET
                    format = ?, mtime = ?, size = ?
                    WHERE path == ?
                ''',
                ((x[1], x[2], x[3], x[0]) for x in files))

            c.executemany(
                'INSERT OR IGNORE INTO kind_codes VALUES (NULL,?,?,?)',
                kind_codes)

            c.executemany(
                '''
                    INSERT INTO nuts VALUES
                        (NULL, (
                            SELECT file_id FROM files
                            WHERE path == ?
                         ),?,?,?,
                         (
                            SELECT kind_codes_id FROM kind_codes
                            WHERE kind_id == ? AND codes == ? AND deltat == ?
                         ), ?,?,?,?,?)
                ''',
                ((self.relpath(nut.file_path),
                  nut.file_segment, nut.file_element,
                  nut.kind_id,
                  nut.kind_id, nut.codes, nut.deltat or 0.0,
                  nut.tmin_seconds, nut.tmin_offset,
                  nut.tmax_seconds, nut.tmax_offset,
                  nut.kscale) for nut in nuts))

    def undig(self, path):
        '''
        Get the content meta-information stored for a single file.

        :param path: path of the file.
        :returns: list of nut objects, one per stored content piece, with
            their file path in absolute form.
        '''

        path = self.relpath(abspath(path))

        sql = '''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                nuts.file_segment,
                nuts.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                nuts.tmin_seconds,
                nuts.tmin_offset,
                nuts.tmax_seconds,
                nuts.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN nuts ON files.file_id = nuts.file_id
            INNER JOIN kind_codes
                ON nuts.kind_codes_id == kind_codes.kind_codes_id
            WHERE path == ?
        '''

        return [Nut(values_nocheck=(self.abspath(row[0]),) + row[1:])
                for row in self._conn.execute(sql, (path,))]

    def undig_all(self):
        '''
        Iterate over the content meta-information of all files.

        Consecutive result rows belonging to the same file are collected
        into one group. The query has no ``ORDER BY`` clause, so the grouping
        relies on the database delivering the rows of a file contiguously.

        :yields: tuples ``(path, nuts)`` where ``path`` is the absolute file
            path and ``nuts`` the list of nut objects of that file.
        '''
        sql = '''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                nuts.file_segment,
                nuts.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                nuts.tmin_seconds,
                nuts.tmin_offset,
                nuts.tmax_seconds,
                nuts.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN nuts ON files.file_id == nuts.file_id
            INNER JOIN kind_codes
                ON nuts.kind_codes_id == kind_codes.kind_codes_id
        '''

        nuts = []
        stored_path = None
        path = None
        for values in self._conn.execute(sql):
            # Detect the start of a new file by comparing paths as stored in
            # the database. Comparing against the absolute path would split
            # every row into its own group when a base path is set.
            if stored_path is not None and values[0] != stored_path:
                yield path, nuts
                nuts = []

            stored_path = values[0]
            path = self.abspath(stored_path)

            if values[1] is not None:
                nuts.append(Nut(values_nocheck=(path,) + values[1:]))

        if path is not None:
            yield path, nuts

    def undig_many(self, paths, show_progress=True):
        '''
        Iterate over the content meta-information of the given files.

        A temporary selection holding ``paths`` is created for the query.

        :param paths: file paths to look up.
        :param show_progress: passed on to the selection when adding paths.
        :yields: tuples ``(path, nuts)``.
        '''
        selection = self.new_selection(paths, show_progress=show_progress)

        for (_, path), nuts in selection.undig_grouped():
            yield path, nuts

        del selection

    def new_selection(self, paths=None, show_progress=True):
        '''
        Create a new selection object attached to this database.

        :param paths: file paths to add to the selection right away.
        :param show_progress: passed on to the selection when adding paths.
        :returns: the new selection object.
        '''
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from .selection import Selection
        selection = Selection(self)
        if paths:
            selection.add(paths, show_progress=show_progress)
        return selection

    def undig_content(self, nut):
        '''
        Placeholder, always returns ``None``.
        '''
        return None

    def remove(self, path):
        '''
        Prune content meta-information about a given file.

        All content pieces belonging to file ``path`` are removed from the
        main database and any attached live selections (via database triggers).
        '''

        path = self.relpath(abspath(path))

        with self.transaction() as cursor:
            cursor.execute(
                'DELETE FROM files WHERE path = ?', (path,))

    def remove_glob(self, pattern):
        '''
        Prune content meta-information about files matching given pattern.

        All content pieces belonging to files whose paths match the given
        ``pattern`` are removed from the main database and any attached live
        selections (via database triggers).

        :param pattern: pattern for the SQLite ``GLOB`` operator, matched
            against the paths as they are stored in the database.
        :returns: row count reported for the delete statement.
        '''

        with self.transaction() as cursor:
            return cursor.execute(
                'DELETE FROM files WHERE path GLOB ?', (pattern,)).rowcount

    def _remove_volatile(self):
        '''
        Prune leftover volatile content from database.

        If the cleanup handler of an attached selection is not called, e.g. due
        to a crash or terminated process, volatile content will not be removed
        properly. This method will delete such leftover entries.

        This is a maintenance operation which should only be called when no
        apps are using the database because it would remove volatile content
        currently used by the apps.

        :returns: row count reported for the delete statement.
        '''

        with self.transaction() as cursor:
            # Single quotes: the pattern is a string literal, not an
            # identifier.
            return cursor.execute(
                '''
                    DELETE FROM files
                    WHERE path LIKE 'virtual:volatile:%'
                ''').rowcount

    def reset(self, path, transaction=None):
        '''
        Prune information associated with a given file, but keep the file path.

        This method is called when reading a file failed. File attributes,
        format, size and modification time are set to NULL. File content
        meta-information is removed from the database and any attached live
        selections (via database triggers).

        :param path: path of the file.
        :param transaction: transaction to take part in. If not given, a new
            one is created.
        '''

        path = self.relpath(abspath(path))

        with (transaction or self.transaction()) as cursor:
            cursor.execute(
                '''
                    UPDATE files SET
                        format = NULL,
                        mtime = NULL,
                        size = NULL
                    WHERE path = ?
                ''', (path,))

    def silent_touch(self, path):
        '''
        Update modification time of file without initiating reindexing.

        Useful to prolong validity period of data with expiration date.

        :param path: path of the file.
        :raises ExecuteGet1Error: if there is not exactly one entry for the
            file in the database.
        :raises FileLoadError: if the size reported for the file after
            touching differs from the size stored in the database.
        '''

        apath = abspath(path)
        path = self.relpath(apath)

        with self.transaction() as cursor:

            sql = 'SELECT format, size FROM files WHERE path = ?'
            fmt, size = execute_get1(cursor, sql, (path,))

            mod = io.get_backend(fmt)
            mod.touch(apath)
            file_stats = mod.get_stats(apath)

            # file_stats[0] is stored as the new mtime below, file_stats[1]
            # is compared with the stored size.
            if file_stats[1] != size:
                raise FileLoadError(
                    'Silent update for file "%s" failed: size has changed.'
                    % apath)

            # Only mtime is written, so the nut-pruning trigger on the size
            # column does not fire.
            sql = '''
                UPDATE files
                SET mtime = ?
                WHERE path = ?
            '''
            cursor.execute(sql, (file_stats[0], path))

    def _iter_counts(self, kind=None, kind_codes_count='kind_codes_count'):
        '''
        Iterate over the nut counts per kind, codes and sampling interval.

        :param kind: if given, restrict the result to this content kind.
        :param kind_codes_count: name of the counting table to query. It is
            interpolated into the SQL text and must therefore be a trusted
            identifier.
        :yields: tuples ``((kind, codes, deltat), count)`` for all entries
            with a count greater than zero.
        '''
        args = []
        sel = ''
        if kind is not None:
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT
                kind_codes.kind_id,
                kind_codes.codes,
                kind_codes.deltat,
                %(kind_codes_count)s.count
            FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
        ''') % {'kind_codes_count': kind_codes_count}

        for kind_id, codes, deltat, count in self._conn.execute(sql, args):
            yield (
                to_kind(kind_id),
                tuple(codes.split(separator)),
                deltat), count

    def _iter_deltats(self, kind=None, kind_codes_count='kind_codes_count'):
        '''
        Iterate over the distinct sampling intervals with a non-zero count.

        :param kind: if given, restrict the result to this content kind.
        :param kind_codes_count: name of the counting table to query, see
            :py:meth:`_iter_counts`.
        :yields: ``deltat`` values in ascending order.
        '''
        args = []
        sel = ''
        if kind is not None:
            assert isinstance(kind, str)
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT DISTINCT kind_codes.deltat FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.deltat
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield row[0]

    def _iter_codes(self, kind=None, kind_codes_count='kind_codes_count'):
        '''
        Iterate over the distinct code sequences with a non-zero count.

        :param kind: if given, restrict the result to this content kind.
        :param kind_codes_count: name of the counting table to query, see
            :py:meth:`_iter_counts`.
        :yields: code sequences as tuples of strings, ordered by their
            stored string form.
        '''
        args = []
        sel = ''
        if kind is not None:
            assert isinstance(kind, str)
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT DISTINCT kind_codes.codes FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.codes
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield tuple(row[0].split(separator))

    def _iter_kinds(self, codes=None, kind_codes_count='kind_codes_count'):
        '''
        Iterate over the distinct content kinds with a non-zero count.

        :param codes: if given, a tuple of strings; the result is restricted
            to entries with exactly this code sequence.
        :param kind_codes_count: name of the counting table to query, see
            :py:meth:`_iter_counts`.
        :yields: content kinds, ordered by kind id.
        '''
        args = []
        sel = ''
        if codes is not None:
            assert isinstance(codes, tuple)
            sel = 'AND kind_codes.codes == ?'
            args.append(separator.join(codes))

        sql = ('''
            SELECT DISTINCT kind_codes.kind_id FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.kind_id
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield to_kind(row[0])

    def iter_paths(self):
        '''
        Iterate over the absolute paths of all files in the database.
        '''
        for row in self._conn.execute('''SELECT path FROM files'''):
            yield self.abspath(row[0])

    def iter_nnuts_by_file(self):
        '''
        Iterate over ``(path, nnuts)`` tuples for all files in the database.
        '''
        sql = '''
            SELECT
                path,
                (SELECT COUNT(*) FROM nuts WHERE nuts.file_id = files.file_id)
            FROM files
        '''
        for row in self._conn.execute(sql):
            yield (self.abspath(row[0]),) + row[1:]

    def iter_kinds(self, codes=None):
        '''
        Iterate over the available content kinds, see :py:meth:`_iter_kinds`.
        '''
        return self._iter_kinds(codes=codes)

    def iter_codes(self, kind=None):
        '''
        Iterate over the available code sequences, see :py:meth:`_iter_codes`.
        '''
        return self._iter_codes(kind=kind)

    def iter_counts(self, kind=None):
        '''
        Iterate over the nut counts, see :py:meth:`_iter_counts`.
        '''
        return self._iter_counts(kind=kind)

    def get_paths(self):
        '''
        Get the absolute paths of all files in the database as a list.
        '''
        return list(self.iter_paths())

    def get_kinds(self, codes=None):
        '''
        Get the available content kinds as a list.
        '''
        return list(self.iter_kinds(codes=codes))

    def get_codes(self, kind=None):
        '''
        Get the available code sequences as a list.
        '''
        return list(self.iter_codes(kind=kind))

    def get_counts(self, kind=None):
        '''
        Get the number of nuts per content kind and code sequence.

        Counts of entries differing only in sampling interval are summed.

        :param kind: if given, only the counts of this kind are returned.
        :returns: ``counts[kind][codes]`` if ``kind`` is ``None``, else
            ``counts[codes]`` for the given kind.
        :raises KeyError: if ``kind`` is given but there are no nuts of that
            kind.
        '''
        d = {}
        for (k, codes, deltat), count in self.iter_counts():
            v = d.setdefault(k, {})
            v[codes] = v.get(codes, 0) + count

        if kind is not None:
            return d[kind]
        else:
            return d

    def get_nfiles(self):
        '''
        Get the number of files known to the database.
        '''
        sql = '''SELECT COUNT(*) FROM files'''
        for row in self._conn.execute(sql):
            return row[0]

    def get_nnuts(self):
        '''
        Get the number of nuts in the database.
        '''
        sql = '''SELECT COUNT(*) FROM nuts'''
        for row in self._conn.execute(sql):
            return row[0]

    def get_nnuts_by_file(self):
        '''
        Get a list of ``(path, nnuts)`` tuples for all files in the database.
        '''
        return list(self.iter_nnuts_by_file())

    def get_total_size(self):
        '''
        Get the sum of the stored sizes of all files, 0 if there is none.
        '''
        sql = '''
            SELECT SUM(files.size) FROM files
        '''

        for row in self._conn.execute(sql):
            # SUM() gives NULL when there are no non-NULL sizes.
            return row[0] or 0

    def get_persistent_names(self):
        '''
        Get the names listed in the ``persistent`` table.
        '''
        sql = '''
            SELECT name FROM persistent
        '''
        return [row[0] for row in self._conn.execute(sql)]

    def get_stats(self):
        '''
        Collect statistics about the contents of the database.

        :returns: :py:class:`DatabaseStats` object.
        '''
        return DatabaseStats(
            nfiles=self.get_nfiles(),
            nnuts=self.get_nnuts(),
            kinds=self.get_kinds(),
            codes=self.get_codes(),
            counts=self.get_counts(),
            total_size=self.get_total_size(),
            persistent=self.get_persistent_names())

    def __str__(self):
        return str(self.get_stats())

    def print_tables(self, stream=None):
        '''
        Print the contents of all tables of the schema.

        :param stream: object with a ``write`` method, defaults to
            ``sys.stdout``.
        '''
        for table in [
                'persistent',
                'files',
                'nuts',
                'kind_codes',
                'kind_codes_count']:

            self.print_table(table, stream=stream)

    def print_table(self, name, stream=None):
        '''
        Print the contents of one table as aligned text columns.

        If the layout of the table has been registered, column names and
        types are printed as header, framed by rule lines.

        :param name: name of the table. It is interpolated into the SQL text
            and must therefore be a trusted identifier.
        :param stream: object with a ``write`` method, defaults to
            ``sys.stdout``.
        '''

        if stream is None:
            stream = sys.stdout

        # String whose repr() carries no quotes, so that header cells are
        # printed bare while data cells are printed through repr().
        class hstr(str):
            def __repr__(self):
                return self

        w = stream.write
        w('\n')
        w('\n')
        w(name)
        w('\n')
        sql = 'SELECT * FROM %s' % name
        tab = []
        if name in self._tables:
            headers = self._tables[name]
            # Rows made of None are rendered as rule lines further down.
            tab.append([None for _ in headers])
            tab.append([hstr(x[0]) for x in headers])
            tab.append([hstr(x[1]) for x in headers])
            tab.append([None for _ in headers])

        for row in self._conn.execute(sql):
            tab.append(list(row))

        widths = [
            max((len(repr(x)) if x is not None else 0) for x in col)
            for col in zip(*tab)]

        for row in tab:
            w(' '.join(
                (repr(x).ljust(wid) if x is not None else ''.ljust(wid, '-'))
                for (x, wid) in zip(row, widths)))

            w('\n')

        w('\n')
class DatabaseStats(Object):
    '''
    Summary of the contents held in the meta-information database.
    '''

    nfiles = Int.T(
        help='Number of files in database.')
    nnuts = Int.T(
        help='Number of index nuts in database.')
    codes = List.T(
        Tuple.T(content_t=String.T()),
        help='Available code sequences in database, e.g. '
             '(agency, network, station, location) for stations nuts.')
    kinds = List.T(
        String.T(),
        help='Available content types in database.')
    total_size = Int.T(
        help='Aggregated file size of files referenced in database.')
    counts = Dict.T(
        String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()),
        help='Breakdown of how many nuts of any content type and code '
             'sequence are available in database, ``counts[kind][codes]``.')
    persistent = List.T(
        String.T(),
        help='Names of persistent selections stored in database.')

    def __str__(self):
        # Total number of nuts per content kind.
        kind_counts = {}
        for kind in self.kinds:
            kind_counts[kind] = sum(self.counts[kind].values())

        dotted = ['.'.join(x) for x in self.codes]
        ncodes = len(dotted)

        # Long listings are cut down to the first and last ten entries.
        if ncodes > 20:
            head = util.ewrap(dotted[:10], indent=' ')
            tail = util.ewrap(dotted[-10:], indent=' ')
            scodes = '\n' + head + '\n [%i more]\n' % (ncodes - 20) + tail
        elif dotted:
            scodes = '\n' + util.ewrap(dotted, indent=' ')
        else:
            scodes = '<none>'

        skinds = ', '.join(
            '%s: %i' % (kind, kind_counts[kind])
            for kind in sorted(self.kinds))

        spersistent = ', '.join(self.persistent)

        s = '''
Available codes: %s
Number of files: %i
Total size of known files: %s
Number of index nuts: %i
Available content kinds: %s
Persistent selections: %s''' % (
            scodes,
            self.nfiles,
            util.human_bytesize(self.total_size),
            self.nnuts,
            skinds,
            spersistent)

        return s
# Public names of this module.
__all__ = ['Database', 'DatabaseStats']