1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import sys
9import os
10import logging
11import sqlite3
12import re
13import time
15from pyrocko.io.io_common import FileLoadError
16from pyrocko import util
17from pyrocko.guts import Object, Int, List, Dict, Tuple, String
18from . import error, io
19from .model import Nut, to_kind_id, to_kind, separator
20from .error import SquirrelError
# Module logger; Transaction and Database log lock retries, deferred
# rollbacks and (optionally) SQL statements through it.
logger = logging.getLogger('psq.database')

# NOTE(review): presumably the guts namespace prefix for the guts Object
# subclasses defined in this module (DatabaseStats) — confirm against guts.
guts_prefix = 'squirrel'
def abspath(path):
    '''
    Make a file path absolute, leaving pseudo-paths untouched.

    Paths starting with ``virtual:`` or ``client:`` do not refer to files on
    disk and are returned as-is; any other path is passed through
    :py:func:`os.path.abspath`.
    '''
    is_pseudo_path = path.startswith(('virtual:', 'client:'))
    return path if is_pseudo_path else os.path.abspath(path)
class ExecuteGet1Error(SquirrelError):
    '''
    Raised by :py:func:`execute_get1` when a query does not yield exactly
    one row.
    '''
def execute_get1(connection, sql, args):
    '''
    Execute a query which is expected to yield exactly one row.

    :param connection: ``sqlite3`` connection or cursor used to run the
        query (anything whose ``execute`` returns a DB-API cursor).
    :param sql: SQL statement.
    :param args: Query parameters.
    :returns: The single result row.
    :raises: :py:exc:`ExecuteGet1Error` if the query yields no row or more
        than one row.
    '''

    # Fetching two rows is enough to tell "exactly one" from "none" and
    # "several" without materializing a potentially large result set.
    rows = connection.execute(sql, args).fetchmany(2)

    if len(rows) == 1:
        return rows[0]

    if not rows:
        raise ExecuteGet1Error('Expected database entry not found.')

    raise ExecuteGet1Error(
        'Expected a single database entry but found multiple.')
# Registry of Database instances created by get_database(), keyed by path,
# so that repeated requests for the same path share one instance.
g_databases = {}
def get_database(path=None):
    '''
    Get a shared :py:class:`Database` instance for a given database path.

    Instances are cached in the module-level ``g_databases`` registry, so
    repeated calls with the same path return the same object.

    :param path: Path of the SQLite database file. ``None`` (the default) or
        ``':memory:'`` select a shared in-memory database, matching the
        default of :py:class:`Database`.
    :returns: :py:class:`Database` instance.
    '''

    if path is None:
        path = ':memory:'

    # ':memory:' is SQLite's special name for an in-memory database; making
    # it absolute would silently create a regular file with that name.
    if path != ':memory:':
        path = os.path.abspath(path)

    if path not in g_databases:
        g_databases[path] = Database(path)

    return g_databases[path]
class Transaction(object):
    '''
    Nestable transaction on a single SQLite cursor.

    Only the outermost :py:meth:`begin` issues ``BEGIN`` and only the
    outermost :py:meth:`commit`/:py:meth:`rollback` issues ``COMMIT`` or
    ``ROLLBACK``; inner levels just adjust a depth counter. A rollback
    requested at an inner level is remembered and executed instead of the
    final commit. Used as a context manager, the cursor is returned on entry
    and closed when the outermost level is left.

    :param conn: ``sqlite3`` connection.
    :param mode: SQLite transaction type: ``'deferred'``, ``'immediate'`` or
        ``'exclusive'``.
    :param retry_interval: Seconds to wait between attempts to start the
        transaction while the database is locked.
    '''

    def __init__(self, conn, mode='immediate', retry_interval=0.1):
        self.cursor = conn.cursor()
        assert mode in ('deferred', 'immediate', 'exclusive')
        self.mode = mode
        self.depth = 0
        self.rollback_wanted = False
        self.retry_interval = retry_interval

    def begin(self):
        '''
        Enter a (possibly nested) transaction level.

        At the outermost level, ``BEGIN`` is retried indefinitely for as long
        as SQLite reports the database as locked; other errors propagate.
        '''
        if self.depth != 0:
            self.depth += 1
            return

        attempt = 0
        while True:
            attempt += 1
            try:
                self.cursor.execute('BEGIN %s' % self.mode.upper())
            except sqlite3.OperationalError as exc:
                if str(exc) != 'database is locked':
                    raise

                logger.info(
                    'Database is locked retrying in %s s. Tries: %i'
                    % (self.retry_interval, attempt))

                time.sleep(self.retry_interval)
            else:
                break

        self.depth += 1

    def commit(self):
        '''
        Leave a transaction level, committing at the outermost level.

        If a rollback was scheduled by an inner level, the outermost level
        rolls back instead of committing.
        '''
        self.depth -= 1
        if self.depth != 0:
            return

        if self.rollback_wanted:
            self.cursor.execute('ROLLBACK')
            logger.warning('Deferred rollback executed.')
            self.rollback_wanted = False
        else:
            self.cursor.execute('COMMIT')

    def rollback(self):
        '''
        Leave a transaction level, rolling back.

        At the outermost level ``ROLLBACK`` is executed immediately; at inner
        levels it is scheduled for when the outermost level finishes.
        '''
        self.depth -= 1
        outermost = self.depth == 0
        if not outermost:
            logger.warning('Deferred rollback scheduled.')
            self.rollback_wanted = True
            return

        self.cursor.execute('ROLLBACK')
        self.rollback_wanted = False

    def close(self):
        '''
        Close the underlying cursor.
        '''
        self.cursor.close()

    def __enter__(self):
        self.begin()
        return self.cursor

    def __exit__(self, exc_type, exc_value, traceback):
        # Commit on a clean exit, roll back when an exception is propagating
        # (the exception itself is not suppressed).
        finish = self.commit if exc_type is None else self.rollback
        finish()

        if self.depth == 0:
            self.close()
class Database(object):
    '''
    Shared meta-information database used by Squirrel.

    Wraps an SQLite connection holding the index tables ``files``, ``nuts``,
    ``kind_codes``, ``kind_codes_count`` and ``persistent``. File paths may
    be stored relative to a base path, see :py:meth:`set_basepath`.
    '''

    def __init__(self, database_path=':memory:', log_statements=False):
        '''
        Open (and initialize, if needed) the database.

        :param database_path: Path of the SQLite database file or
            ``':memory:'`` for an in-memory database.
        :param log_statements: If ``True``, every executed SQL statement is
            logged at debug level.
        :raises: ``error.SquirrelError`` if the connection cannot be opened.
        '''
        self._database_path = database_path
        if database_path != ':memory:':
            util.ensuredirs(database_path)

        try:
            logger.debug('Opening connection to database: %s' % database_path)
            # isolation_level=None: sqlite3 does not open transactions
            # implicitly; they are managed explicitly by Transaction.
            self._conn = sqlite3.connect(database_path, isolation_level=None)
        except sqlite3.OperationalError:
            raise error.SquirrelError(
                'Cannot connect to database: %s' % database_path)

        self._conn.text_factory = str
        self._tables = {}
        self._basepath = None

        if log_statements:
            self._conn.set_trace_callback(self._log_statement)

        self._initialize_db()

    def set_basepath(self, basepath):
        '''
        Set the base path used to store file paths in relative form.

        :param basepath: Directory (made absolute) or ``None`` to disable
            relative path handling.
        '''
        if basepath is not None:
            self._basepath = os.path.abspath(basepath)
        else:
            self._basepath = None

    def relpath(self, path):
        '''
        Convert a path to the form stored in the database.

        Paths below the base path are returned relative to it; all other
        paths are returned unchanged.
        '''
        if self._basepath is not None and path.startswith(
                self._basepath + os.path.sep):
            return path[len(self._basepath) + 1:]
        else:
            return path

    def abspath(self, path):
        '''
        Convert a path as stored in the database back to its full form.

        Relative paths are joined to the base path if one is set;
        ``virtual:``/``client:`` paths and absolute paths are returned
        unchanged.
        '''
        if self._basepath is not None and not path.startswith('virtual:') \
                and not path.startswith('client:') \
                and not os.path.isabs(path):
            return os.path.join(self._basepath, path)
        else:
            return path

    def _log_statement(self, statement):
        logger.debug(statement)

    def get_connection(self):
        '''
        Get the underlying ``sqlite3`` connection.
        '''
        return self._conn

    def transaction(self, mode='immediate'):
        '''
        Create a new :py:class:`Transaction` on this database's connection.

        :param mode: ``'deferred'``, ``'immediate'`` or ``'exclusive'``.
        '''
        return Transaction(self._conn, mode)

    def _register_table(self, s):
        '''
        Record column names and types of a ``CREATE TABLE`` statement.

        The parsed header is stored in ``self._tables`` (used by
        :py:meth:`print_table`) and the statement is returned unchanged.
        '''
        m = re.search(r'(\S+)\s*\(([^)]+)\)', s)
        table_name = m.group(1)
        dtypes = m.group(2)
        table_header = []
        for dele in dtypes.split(','):
            # Keep column name and type, drop constraints like PRIMARY KEY.
            table_header.append(dele.split()[:2])

        self._tables[table_name] = table_header

        return s

    def _initialize_db(self):
        '''
        Create tables, indices and triggers if they do not exist yet.
        '''

        # Register all table schemas before the early-return check below, so
        # that print_table() shows headers also for pre-existing databases.
        sql_files = self._register_table(
            '''
            CREATE TABLE IF NOT EXISTS files (
                file_id integer PRIMARY KEY,
                path text,
                format text,
                mtime float,
                size integer)
            ''')

        sql_nuts = self._register_table(
            '''
            CREATE TABLE IF NOT EXISTS nuts (
                nut_id integer PRIMARY KEY AUTOINCREMENT,
                file_id integer,
                file_segment integer,
                file_element integer,
                kind_id integer,
                kind_codes_id integer,
                tmin_seconds integer,
                tmin_offset integer,
                tmax_seconds integer,
                tmax_offset integer,
                kscale integer)
            ''')

        sql_kind_codes = self._register_table(
            '''
            CREATE TABLE IF NOT EXISTS kind_codes (
                kind_codes_id integer PRIMARY KEY,
                kind_id integer,
                codes text,
                deltat float)
            ''')

        sql_kind_codes_count = self._register_table(
            '''
            CREATE TABLE IF NOT EXISTS kind_codes_count (
                kind_codes_id integer PRIMARY KEY,
                count integer)
            ''')

        sql_persistent = self._register_table(
            '''
            CREATE TABLE IF NOT EXISTS persistent (
                name text UNIQUE)
            ''')

        with self.transaction() as cursor:
            cursor.execute(
                '''PRAGMA recursive_triggers = true''')

            # Assume the schema is in place if both the first ('files') and
            # the last ('persistent') created table already exist.
            if 2 == len(list(
                    cursor.execute(
                        '''
                        SELECT name FROM sqlite_master
                        WHERE type = 'table' AND name IN (
                            'files',
                            'persistent')
                        '''))):

                return

            cursor.execute(sql_files)

            cursor.execute(
                '''
                CREATE UNIQUE INDEX IF NOT EXISTS index_files_path
                ON files (path)
                ''')

            cursor.execute(sql_nuts)

            cursor.execute(
                '''
                CREATE UNIQUE INDEX IF NOT EXISTS index_nuts_file_element
                ON nuts (file_id, file_segment, file_element)
                ''')

            cursor.execute(sql_kind_codes)

            cursor.execute(
                '''
                CREATE UNIQUE INDEX IF NOT EXISTS index_kind_codes
                ON kind_codes (kind_id, codes, deltat)
                ''')

            cursor.execute(sql_kind_codes_count)

            cursor.execute(
                '''
                CREATE INDEX IF NOT EXISTS index_nuts_file_id
                ON nuts (file_id)
                ''')

            # Deleting a file removes its nuts.
            cursor.execute(
                '''
                CREATE TRIGGER IF NOT EXISTS delete_nuts_on_delete_file
                BEFORE DELETE ON files FOR EACH ROW
                BEGIN
                    DELETE FROM nuts where file_id == old.file_id;
                END
                ''')

            # trigger only on size to make silent update of mtime possible
            cursor.execute(
                '''
                CREATE TRIGGER IF NOT EXISTS delete_nuts_on_update_file
                BEFORE UPDATE OF size ON files FOR EACH ROW
                BEGIN
                    DELETE FROM nuts where file_id == old.file_id;
                END
                ''')

            # Keep kind_codes_count in sync with inserted/deleted nuts.
            cursor.execute(
                '''
                CREATE TRIGGER IF NOT EXISTS increment_kind_codes
                BEFORE INSERT ON nuts FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO kind_codes_count
                    VALUES (new.kind_codes_id, 0);
                    UPDATE kind_codes_count
                    SET count = count + 1
                    WHERE new.kind_codes_id == kind_codes_id;
                END
                ''')

            cursor.execute(
                '''
                CREATE TRIGGER IF NOT EXISTS decrement_kind_codes
                BEFORE DELETE ON nuts FOR EACH ROW
                BEGIN
                    UPDATE kind_codes_count
                    SET count = count - 1
                    WHERE old.kind_codes_id == kind_codes_id;
                END
                ''')

            cursor.execute(sql_persistent)

    def dig(self, nuts, transaction=None):
        '''
        Store or update content meta-information.

        Given ``nuts`` are assumed to represent an up-to-date and complete
        inventory of a set of files. Any old information about these files is
        first pruned from the database (via database triggers). If such content
        is part of a live selection, it is also removed there. Then the new
        content meta-information is inserted into the main database. The
        content is not automatically inserted into the live selections again.
        It is the responsibility of the selection object to perform this
        step.

        :param nuts: Iterable of :py:class:`Nut` objects.
        :param transaction: Optional enclosing :py:class:`Transaction`; a
            new one is used if not given.
        '''

        nuts = list(nuts)

        if not nuts:
            return

        files = set()
        kind_codes = set()
        for nut in nuts:
            files.add((
                self.relpath(nut.file_path),
                nut.file_format,
                nut.file_mtime,
                nut.file_size))
            kind_codes.add((nut.kind_id, nut.codes, nut.deltat or 0.0))

        with (transaction or self.transaction()) as c:

            c.executemany(
                'INSERT OR IGNORE INTO files VALUES (NULL,?,?,?,?)', files)

            # Setting size fires delete_nuts_on_update_file, which prunes
            # the old nuts of these files.
            c.executemany(
                '''UPDATE files SET
                    format = ?, mtime = ?, size = ?
                    WHERE path == ?
                ''',
                ((x[1], x[2], x[3], x[0]) for x in files))

            c.executemany(
                'INSERT OR IGNORE INTO kind_codes VALUES (NULL,?,?,?)',
                kind_codes)

            c.executemany(
                '''
                    INSERT INTO nuts VALUES
                        (NULL, (
                            SELECT file_id FROM files
                            WHERE path == ?
                         ),?,?,?,
                         (
                            SELECT kind_codes_id FROM kind_codes
                            WHERE kind_id == ? AND codes == ? AND deltat == ?
                         ), ?,?,?,?,?)
                ''',
                ((self.relpath(nut.file_path),
                  nut.file_segment, nut.file_element,
                  nut.kind_id,
                  nut.kind_id, nut.codes, nut.deltat or 0.0,
                  nut.tmin_seconds, nut.tmin_offset,
                  nut.tmax_seconds, nut.tmax_offset,
                  nut.kscale) for nut in nuts))

    def undig(self, path):
        '''
        Get content meta-information stored for a single file.

        :param path: File path.
        :returns: List of :py:class:`Nut` objects (with full file paths).
        '''

        path = self.relpath(abspath(path))

        sql = '''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                nuts.file_segment,
                nuts.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                nuts.tmin_seconds,
                nuts.tmin_offset,
                nuts.tmax_seconds,
                nuts.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN nuts ON files.file_id = nuts.file_id
            INNER JOIN kind_codes
                ON nuts.kind_codes_id == kind_codes.kind_codes_id
            WHERE path == ?
        '''

        return [Nut(values_nocheck=(self.abspath(row[0]),) + row[1:])
                for row in self._conn.execute(sql, (path,))]

    def undig_all(self):
        '''
        Generator yielding ``(path, nuts)`` for every file having content.

        Only files with at least one nut are reported (inner join). Paths
        are returned in full form (see :py:meth:`abspath`).
        '''
        # ORDER BY guarantees that all rows of a file are adjacent, which the
        # grouping below relies on.
        sql = '''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                nuts.file_segment,
                nuts.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                nuts.tmin_seconds,
                nuts.tmin_offset,
                nuts.tmax_seconds,
                nuts.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN nuts ON files.file_id == nuts.file_id
            INNER JOIN kind_codes
                ON nuts.kind_codes_id == kind_codes.kind_codes_id
            ORDER BY files.file_id
        '''

        nuts = []
        path = None
        for values in self._conn.execute(sql):
            # Compare in the same (full) form that is yielded; comparing the
            # stored relative path with the previous full path would split
            # every file into one group per nut when a basepath is set.
            row_path = self.abspath(values[0])
            if path is not None and row_path != path:
                yield path, nuts
                nuts = []

            path = row_path

            if values[1] is not None:
                nuts.append(Nut(values_nocheck=(path,) + values[1:]))

        if path is not None:
            yield path, nuts

    def undig_many(self, paths, show_progress=True):
        '''
        Generator yielding ``(path, nuts)`` for the given files.

        Uses a temporary selection (see :py:meth:`new_selection`).
        '''
        selection = self.new_selection(paths, show_progress=show_progress)

        for (_, path), nuts in selection.undig_grouped():
            yield path, nuts

        del selection

    def new_selection(self, paths=None, show_progress=True):
        '''
        Create a new selection attached to this database.

        :param paths: Optional file paths to add to the selection.
        :param show_progress: Passed on to the selection's ``add`` method.
        '''
        from .selection import Selection
        selection = Selection(self)
        if paths:
            selection.add(paths, show_progress=show_progress)
        return selection

    def undig_content(self, nut):
        '''
        Placeholder; currently always returns ``None``.
        '''
        return None

    def remove(self, path):
        '''
        Prune content meta-information about a given file.

        All content pieces belonging to file ``path`` are removed from the
        main database and any attached live selections (via database triggers).
        '''

        path = self.relpath(abspath(path))

        with self.transaction() as cursor:
            cursor.execute(
                'DELETE FROM files WHERE path = ?', (path,))

    def remove_glob(self, pattern):
        '''
        Prune content meta-information about files matching given pattern.

        All content pieces belonging to files whose paths match the given
        ``pattern`` are removed from the main database and any attached live
        selections (via database triggers).

        :param pattern: SQLite ``GLOB`` pattern, matched against the paths as
            stored in the database.
        :returns: Number of removed file entries.
        '''

        with self.transaction() as cursor:
            return cursor.execute(
                'DELETE FROM files WHERE path GLOB ?', (pattern,)).rowcount

    def _remove_volatile(self):
        '''
        Prune leftover volatile content from database.

        If the cleanup handler of an attached selection is not called, e.g. due
        to a crash or terminated process, volatile content will not be removed
        properly. This method will delete such leftover entries.

        This is a maintenance operation which should only be called when no
        apps are using the database because it would remove volatile content
        currently used by the apps.

        :returns: Number of removed file entries.
        '''

        # The pattern is passed as a parameter: the previous inline literal
        # was malformed and made the statement a syntax error.
        with self.transaction() as cursor:
            return cursor.execute(
                '''
                    DELETE FROM files
                    WHERE path LIKE ?
                ''', ('virtual:volatile:%',)).rowcount

    def reset(self, path, transaction=None):
        '''
        Prune information associated with a given file, but keep the file path.

        This method is called when reading a file failed. File attributes,
        format, size and modification time are set to NULL. File content
        meta-information is removed from the database and any attached live
        selections (via database triggers).

        :param path: File path.
        :param transaction: Optional enclosing :py:class:`Transaction`.
        '''

        path = self.relpath(abspath(path))

        with (transaction or self.transaction()) as cursor:
            cursor.execute(
                '''
                    UPDATE files SET
                        format = NULL,
                        mtime = NULL,
                        size = NULL
                    WHERE path = ?
                ''', (path,))

    def silent_touch(self, path):
        '''
        Update modification time of file without initiating reindexing.

        Useful to prolong validity period of data with expiration date.

        :raises: ``FileLoadError`` if the file size changed.
        '''

        apath = abspath(path)
        path = self.relpath(apath)

        with self.transaction() as cursor:

            sql = 'SELECT format, size FROM files WHERE path = ?'
            fmt, size = execute_get1(cursor, sql, (path,))

            mod = io.get_backend(fmt)
            mod.touch(apath)
            file_stats = mod.get_stats(apath)

            if file_stats[1] != size:
                raise FileLoadError(
                    'Silent update for file "%s" failed: size has changed.'
                    % apath)

            # Only mtime is updated, so the size-triggered pruning of nuts
            # does not fire.
            sql = '''
                UPDATE files
                SET mtime = ?
                WHERE path = ?
            '''
            cursor.execute(sql, (file_stats[0], path))

    def _iter_counts(self, kind=None, kind_codes_count='kind_codes_count'):
        '''
        Generator yielding ``((kind, codes, deltat), count)`` for entries
        with a positive count in table ``kind_codes_count``.
        '''
        args = []
        sel = ''
        if kind is not None:
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT
                kind_codes.kind_id,
                kind_codes.codes,
                kind_codes.deltat,
                %(kind_codes_count)s.count
            FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
        ''') % {'kind_codes_count': kind_codes_count}

        for kind_id, codes, deltat, count in self._conn.execute(sql, args):
            yield (
                to_kind(kind_id),
                tuple(codes.split(separator)),
                deltat), count

    def _iter_deltats(self, kind=None, kind_codes_count='kind_codes_count'):
        '''
        Generator yielding the distinct sampling intervals in use, sorted.
        '''
        args = []
        sel = ''
        if kind is not None:
            assert isinstance(kind, str)
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT DISTINCT kind_codes.deltat FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.deltat
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield row[0]

    def _iter_codes(self, kind=None, kind_codes_count='kind_codes_count'):
        '''
        Generator yielding the distinct code tuples in use, sorted.
        '''
        args = []
        sel = ''
        if kind is not None:
            assert isinstance(kind, str)
            sel = 'AND kind_codes.kind_id == ?'
            args.append(to_kind_id(kind))

        sql = ('''
            SELECT DISTINCT kind_codes.codes FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.codes
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield tuple(row[0].split(separator))

    def _iter_kinds(self, codes=None, kind_codes_count='kind_codes_count'):
        '''
        Generator yielding the distinct content kinds in use, sorted by id.
        '''
        args = []
        sel = ''
        if codes is not None:
            assert isinstance(codes, tuple)
            sel = 'AND kind_codes.codes == ?'
            args.append(separator.join(codes))

        sql = ('''
            SELECT DISTINCT kind_codes.kind_id FROM %(kind_codes_count)s
            INNER JOIN kind_codes
                ON %(kind_codes_count)s.kind_codes_id
                    == kind_codes.kind_codes_id
            WHERE %(kind_codes_count)s.count > 0
                ''' + sel + '''
            ORDER BY kind_codes.kind_id
        ''') % {'kind_codes_count': kind_codes_count}

        for row in self._conn.execute(sql, args):
            yield to_kind(row[0])

    def iter_paths(self):
        '''
        Generator yielding the full paths of all known files.
        '''
        for row in self._conn.execute('''SELECT path FROM files'''):
            yield self.abspath(row[0])

    def iter_nnuts_by_file(self):
        '''
        Generator yielding ``(path, number_of_nuts)`` for all known files.
        '''
        sql = '''
            SELECT
                path,
                (SELECT COUNT(*) FROM nuts WHERE nuts.file_id = files.file_id)
            FROM files
        '''
        for row in self._conn.execute(sql):
            yield (self.abspath(row[0]),) + row[1:]

    def iter_kinds(self, codes=None):
        return self._iter_kinds(codes=codes)

    def iter_codes(self, kind=None):
        return self._iter_codes(kind=kind)

    def iter_counts(self, kind=None):
        return self._iter_counts(kind=kind)

    def get_paths(self):
        return list(self.iter_paths())

    def get_kinds(self, codes=None):
        return list(self.iter_kinds(codes=codes))

    def get_codes(self, kind=None):
        return list(self.iter_codes(kind=kind))

    def get_counts(self, kind=None):
        '''
        Get number of nuts per content kind and code tuple.

        Counts of entries differing only in sampling interval are summed.

        :param kind: If given, only the mapping for this content kind is
            returned (empty if there is no such content).
        :returns: ``{kind: {codes: count}}`` or, if ``kind`` is given,
            ``{codes: count}``.
        '''
        d = {}
        for (k, codes, _), count in self.iter_counts():
            v = d.setdefault(k, {})
            v[codes] = v.get(codes, 0) + count

        if kind is not None:
            return d.get(kind, {})
        else:
            return d

    def get_nfiles(self):
        sql = '''SELECT COUNT(*) FROM files'''
        for row in self._conn.execute(sql):
            return row[0]

    def get_nnuts(self):
        sql = '''SELECT COUNT(*) FROM nuts'''
        for row in self._conn.execute(sql):
            return row[0]

    def get_nnuts_by_file(self):
        return list(self.iter_nnuts_by_file())

    def get_total_size(self):
        '''
        Get the summed size of all known files (``0`` if there are none).
        '''
        sql = '''
            SELECT SUM(files.size) FROM files
        '''

        for row in self._conn.execute(sql):
            return row[0] or 0

    def get_persistent_names(self):
        '''
        Get the names stored in the ``persistent`` table.
        '''
        sql = '''
            SELECT name FROM persistent
        '''
        return [row[0] for row in self._conn.execute(sql)]

    def get_stats(self):
        '''
        Get a :py:class:`DatabaseStats` summary of the database contents.
        '''
        return DatabaseStats(
            nfiles=self.get_nfiles(),
            nnuts=self.get_nnuts(),
            kinds=self.get_kinds(),
            codes=self.get_codes(),
            counts=self.get_counts(),
            total_size=self.get_total_size(),
            persistent=self.get_persistent_names())

    def __str__(self):
        return str(self.get_stats())

    def print_tables(self, stream=None):
        '''
        Dump all index tables, see :py:meth:`print_table`.
        '''
        for table in [
                'persistent',
                'files',
                'nuts',
                'kind_codes',
                'kind_codes_count']:

            self.print_table(table, stream=stream)

    def print_table(self, name, stream=None):
        '''
        Dump the contents of a table as text (debugging aid).

        :param name: Table name (interpolated into the SQL, so it must be
            trusted).
        :param stream: Output stream, defaults to ``sys.stdout``.
        '''

        if stream is None:
            stream = sys.stdout

        # str whose repr() is the string itself, so that header cells are
        # printed without quotes.
        class hstr(str):
            def __repr__(self):
                return self

        w = stream.write
        w('\n')
        w('\n')
        w(name)
        w('\n')
        sql = 'SELECT * FROM %s' % name
        tab = []
        if name in self._tables:
            headers = self._tables[name]
            # None rows are rendered as separator lines of dashes.
            tab.append([None for _ in headers])
            tab.append([hstr(x[0]) for x in headers])
            tab.append([hstr(x[1]) for x in headers])
            tab.append([None for _ in headers])

        for row in self._conn.execute(sql):
            tab.append(list(row))

        widths = [
            max((len(repr(x)) if x is not None else 0) for x in col)
            for col in zip(*tab)]

        for row in tab:
            w(' '.join(
                (repr(x).ljust(wid) if x is not None else ''.ljust(wid, '-'))
                for (x, wid) in zip(row, widths)))

            w('\n')

        w('\n')
class DatabaseStats(Object):
    '''
    Container to hold statistics about contents cached in meta-information db.
    '''

    nfiles = Int.T(
        help='Number of files in database.')
    nnuts = Int.T(
        help='Number of index nuts in database.')
    codes = List.T(
        Tuple.T(content_t=String.T()),
        help='Available code sequences in database, e.g. '
             '(agency, network, station, location) for stations nuts.')
    kinds = List.T(
        String.T(),
        help='Available content types in database.')
    total_size = Int.T(
        help='Aggregated file size of files referenced in database.')
    counts = Dict.T(
        String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()),
        help='Breakdown of how many nuts of any content type and code '
             'sequence are available in database, ``counts[kind][codes]``.')
    persistent = List.T(
        String.T(),
        help='Names of persistent selections stored in database.')

    def __str__(self):
        '''
        Human-readable multi-line summary.

        With more than 20 code sequences, only the first and last 10 are
        listed.
        '''
        kind_counts = {}
        for kind in self.kinds:
            kind_counts[kind] = sum(self.counts[kind].values())

        codes = ['.'.join(x) for x in self.codes]
        ncodes = len(codes)

        if ncodes > 20:
            head = util.ewrap(codes[:10], indent=' ')
            tail = util.ewrap(codes[-10:], indent=' ')
            scodes = '\n' + head + '\n [%i more]\n' % (ncodes - 20) + tail
        elif codes:
            scodes = '\n' + util.ewrap(codes, indent=' ')
        else:
            scodes = '<none>'

        skinds = ', '.join(
            '%s: %i' % (kind, kind_counts[kind])
            for kind in sorted(self.kinds))

        return '''
Available codes: %s
Number of files: %i
Total size of known files: %s
Number of index nuts: %i
Available content kinds: %s
Persistent selections: %s''' % (
            scodes,
            self.nfiles,
            util.human_bytesize(self.total_size),
            self.nnuts,
            skinds,
            ', '.join(self.persistent))
# Public names exported by ``from ... import *``.
__all__ = [
    'Database',
    'DatabaseStats',
]