Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/selection.py: 86%
209 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-04 09:52 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Meta-data caching for flexible file selections.
8'''
10import os
11import re
12import threading
13import logging
15from pyrocko import util
16from pyrocko.io.io_common import FileLoadError
17from pyrocko.progress import progress
19from . import error, io, model
20from .database import Database, get_database, execute_get1, abspath
logger = logging.getLogger('psq.selection')

# State backing make_unique_name(): a per-process counter which is read and
# incremented while holding the lock.
g_lock = threading.Lock()
g_icount = 0

# Valid names for persistent selections: an ASCII letter or underscore,
# followed by at most 64 further ASCII letters, digits or underscores.
re_persistent_name = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]{0,64}$')
30def make_unique_name():
31 with g_lock:
32 global g_icount
33 name = '%i_%i' % (os.getpid(), g_icount)
34 g_icount += 1
36 return name
def make_task(*args):
    '''
    Create a progress task which reports to this module's logger.

    All positional arguments are forwarded to ``progress.task``.
    '''
    task = progress.task(*args, logger=logger)
    return task
# Reusable docstring fragments, substituted into docstrings by filldocs().
doc_snippets = {
    'query_args': '''
        :param obj:
            Object providing ``tmin``, ``tmax`` and ``codes`` to be used to
            constrain the query. Direct arguments override those from ``obj``.
        :type obj:
            any object with attributes ``tmin``, ``tmax`` and ``codes``

        :param tmin:
            Start time of query interval.
        :type tmin:
            :py:func:`~pyrocko.util.get_time_float`

        :param tmax:
            End time of query interval.
        :type tmax:
            :py:func:`~pyrocko.util.get_time_float`

        :param time:
            Time instant to query. Equivalent to setting ``tmin`` and ``tmax``
            to the same value.
        :type time:
            :py:func:`~pyrocko.util.get_time_float`

        :param codes:
            Pattern of content codes to query.
        :type codes:
            :class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
            objects appropriate for the queried content type, or anything which
            can be converted to such objects.
''',
    # Comma separated list of the file format identifiers known to the io
    # module, each rendered as an inline literal.
    'file_formats': ', '.join(
        ["``'%s'``" % fmt for fmt in io.supported_formats()]),
}
def filldocs(meth):
    '''
    Decorator substituting ``doc_snippets`` into a docstring.

    Placeholders such as ``%(query_args)s`` or ``%(file_formats)s`` in the
    docstring of ``meth`` are replaced using old-style ``%`` formatting with
    the ``doc_snippets`` mapping.

    If the docstring is absent (``meth.__doc__ is None``), ``meth`` is returned
    unchanged. This happens, for example, when Python runs with ``-OO``, which
    strips docstrings. Without the check, importing the module would fail with
    a ``TypeError``.

    :param meth: function or method to decorate
    :returns: ``meth`` itself
    '''
    if meth.__doc__ is not None:
        meth.__doc__ %= doc_snippets

    return meth
class GeneratorWithLen:

    '''
    Wrap an iterator together with a precomputed length.

    Iterating the wrapper hands out the wrapped object itself. When ``gen`` is
    an iterator or generator, it can therefore be consumed only once.
    ``len()`` always reports the ``length`` given at construction, regardless
    of how much has been consumed.
    '''

    def __init__(self, gen, length):
        self.length = length
        self.gen = gen

    def __iter__(self):
        return self.gen

    def __len__(self):
        return self.length
class Selection(object):

    '''
    Database backed file selection (base class for
    :py:class:`~pyrocko.squirrel.base.Squirrel`).

    :param database:
        Database instance or file path to database.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`

    :param persistent:
        If given a name, create a persistent selection.
    :type persistent:
        :py:class:`str`

    A selection in this context represents the list of files available to the
    application. Instead of using :py:class:`Selection` directly, user
    applications should usually use its subclass
    :py:class:`~pyrocko.squirrel.base.Squirrel` which adds content indices to
    the selection and provides high level data querying.

    By default, a temporary table in the database is created to hold the names
    of the files in the selection. This table is only visible inside the
    application which created it. If a name is given to ``persistent``, a named
    selection is created, which is visible also in other applications using the
    same database.

    Besides the filename references, desired content kind masks and file format
    indications are stored in the selection's database table to make the user
    choice regarding these options persistent on a per-file basis. Book-keeping
    on whether files are unknown, known or if modification checks are forced is
    handled in the selection's file-state table.

    Paths of files can be added to the selection using the :py:meth:`add`
    method and removed with :py:meth:`remove`. :py:meth:`undig_grouped` can be
    used to iterate over all content known to the selection.
    '''

    # File state values used in the file-state table (see flag_modified,
    # _set_file_states_force_check and _set_file_states_known):
    #   0 - unknown or modified file
    #   1 - modification check requested
    #   2 - known file

    def __init__(self, database, persistent=None):
        # Set first, so that __del__ is a no-op if __init__ fails early.
        self._conn = None

        if not isinstance(database, Database):
            database = get_database(database)

        if persistent is not None:
            assert isinstance(persistent, str)
            if not re_persistent_name.match(persistent):
                raise error.SquirrelError(
                    'invalid persistent selection name: %s' % persistent)

            self.name = 'psel_' + persistent
        else:
            self.name = 'sel_' + make_unique_name()

        self._persistent = persistent
        self._database = database
        self._conn = self._database.get_connection()
        self._sources = []
        self._is_new = True
        self._volatile_paths = []

        with self.transaction('init selection') as cursor:

            if persistent is not None:
                # The row is only inserted if the name is not registered yet,
                # so a rowcount of 1 means the selection did not exist before.
                self._is_new = 1 == cursor.execute(
                    '''
                    INSERT OR IGNORE INTO persistent VALUES (?)
                    ''', (persistent,)).rowcount

            # Persistent selections live in the main database, all others in
            # the connection-private temp database.
            self._names = {
                'db': 'main' if self._persistent else 'temp',
                'file_states': self.name + '_file_states',
                'bulkinsert': self.name + '_bulkinsert'}

            cursor.execute(self._register_table(self._sql(
                '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(file_states)s (
                    file_id integer PRIMARY KEY,
                    file_state integer,
                    kind_mask integer,
                    format text)
                ''')))

            cursor.execute(self._sql(
                '''
                CREATE INDEX
                IF NOT EXISTS %(db)s.%(file_states)s_index_file_state
                ON %(file_states)s (file_state)
                '''))

    def __del__(self):
        # _conn is None if __init__ failed before connecting, or after
        # _delete() ran; in both cases there is nothing left to clean up.
        if hasattr(self, '_conn') and self._conn:
            self._cleanup()
            # _names is only assigned inside the 'init selection' transaction,
            # before the table is created; if it is missing, no table exists
            # and calling _delete() would only raise inside __del__.
            if not self._persistent and hasattr(self, '_names'):
                self._delete()

    def _register_table(self, s):
        return self._database._register_table(s)

    def _sql(self, s):
        # Fill %(db)s, %(file_states)s and %(bulkinsert)s placeholders.
        return s % self._names

    def transaction(self, label='', mode='immediate'):
        return self._database.transaction(label, mode)

    def is_new(self):
        '''
        Is this a new selection?

        Always ``True`` for non-persistent selections. Only ``False`` for
        a persistent selection which already existed in the database when
        it was initialized.
        '''
        return self._is_new

    def get_database(self):
        '''
        Get the database to which this selection belongs.

        :returns: :py:class:`~pyrocko.squirrel.database.Database` object
        '''
        return self._database

    def _cleanup(self):
        '''
        Perform cleanup actions before database connection is closed.

        Removes volatile content from database: every path collected in
        ``self._volatile_paths`` is popped and passed to the database's
        ``remove`` method.
        '''

        while self._volatile_paths:
            path = self._volatile_paths.pop()
            self._database.remove(path)

    def _delete(self):
        '''
        Destroy the tables associated with this selection.

        Drops the selection's file-state table and, for a persistent
        selection, removes its name from the ``persistent`` table. Afterwards
        the connection reference is cleared, so that ``__del__`` does not
        attempt the cleanup again.
        '''
        with self.transaction('delete selection') as cursor:
            cursor.execute(self._sql(
                'DROP TABLE %(db)s.%(file_states)s'))

            if self._persistent:
                # Use the stored name instead of slicing the 'psel_' prefix
                # off self.name.
                cursor.execute(
                    '''
                    DELETE FROM persistent WHERE name == ?
                    ''', (self._persistent,))

        self._conn = None

    def delete(self):
        '''
        Destroy the tables associated with this selection.

        For a persistent selection, its registration in the database is
        removed as well.
        '''
        self._delete()

    @filldocs
    def add(
            self,
            paths,
            kind_mask=model.g_kind_mask_all,
            format='detect',
            show_progress=True):

        '''
        Add files to the selection.

        :param paths:
            Paths to files to be added to the selection.
        :type paths:
            iterator yielding :py:class:`str` objects, or a single
            :py:class:`str`

        :param kind_mask:
            Content kinds to be added to the selection.
        :type kind_mask:
            :py:class:`int` (bit mask)

        :param format:
            File format identifier or ``'detect'`` to enable auto-detection
            (available: %(file_formats)s).
        :type format:
            str

        :param show_progress:
            Whether to report progress while gathering file names and
            preparing the database.
        :type show_progress:
            bool
        '''

        if isinstance(paths, str):
            paths = [paths]

        paths = util.short_to_list(200, paths)

        if isinstance(paths, list) and len(paths) == 0:
            return

        if show_progress:
            task = make_task('Gathering file names')
            paths = task(paths)

        db = self.get_database()
        with self.transaction('add files') as cursor:

            # NOTE(review): when show_progress is true, paths has been wrapped
            # by task() above and is presumably no longer a list, so the
            # short path below may only be taken with show_progress=False.
            # Both paths perform the same updates; confirm this is intended.
            if isinstance(paths, list) and len(paths) <= 200:

                paths = [db.relpath(path) for path in paths]

                # short non-iterator paths: can do without temp table

                cursor.executemany(
                    '''
                    INSERT OR IGNORE INTO files
                    VALUES (NULL, ?, NULL, NULL, NULL)
                    ''', ((x,) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 3)
                    task.update(0, condition='pruning stale information')

                # Forget selection entries whose kind mask or format choice
                # differs from the current request; they are re-added below.
                cursor.executemany(self._sql(
                    '''
                    DELETE FROM %(db)s.%(file_states)s
                    WHERE file_id IN (
                        SELECT files.file_id
                            FROM files
                            WHERE files.path == ? )
                        AND ( kind_mask != ? OR format != ? )
                    '''), (
                        (path, kind_mask, format) for path in paths))

                if show_progress:
                    task.update(1, condition='adding file names to selection')

                # New entries start in state 0.
                cursor.executemany(self._sql(
                    '''
                    INSERT OR IGNORE INTO %(db)s.%(file_states)s
                    SELECT files.file_id, 0, ?, ?
                    FROM files
                    WHERE files.path = ?
                    '''), ((kind_mask, format, path) for path in paths))

                if show_progress:
                    task.update(2, condition='updating file states')

                # Entries which were already present get state 1, which
                # requests a modification check.
                cursor.executemany(self._sql(
                    '''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 1
                    WHERE file_id IN (
                        SELECT files.file_id
                            FROM files
                            WHERE files.path == ? )
                        AND file_state != 0
                    '''), ((path,) for path in paths))

                if show_progress:
                    task.update(3)
                    task.done()

            else:

                # Long or lazy input: stage all paths in a temporary table,
                # then apply the same steps as above with set-based statements.
                cursor.execute(self._sql(
                    '''
                    CREATE TEMP TABLE temp.%(bulkinsert)s
                    (path text)
                    '''))

                cursor.executemany(self._sql(
                    'INSERT INTO temp.%(bulkinsert)s VALUES (?)'),
                    ((db.relpath(x),) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 5)
                    task.update(0, condition='adding file names to database')

                cursor.execute(self._sql(
                    '''
                    INSERT OR IGNORE INTO files
                    SELECT NULL, path, NULL, NULL, NULL
                    FROM temp.%(bulkinsert)s
                    '''))

                if show_progress:
                    task.update(1, condition='pruning stale information')

                cursor.execute(self._sql(
                    '''
                    DELETE FROM %(db)s.%(file_states)s
                    WHERE file_id IN (
                        SELECT files.file_id
                            FROM temp.%(bulkinsert)s
                            INNER JOIN files
                            ON temp.%(bulkinsert)s.path == files.path)
                        AND ( kind_mask != ? OR format != ? )
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(2, condition='adding file names to selection')

                cursor.execute(self._sql(
                    '''
                    INSERT OR IGNORE INTO %(db)s.%(file_states)s
                    SELECT files.file_id, 0, ?, ?
                    FROM temp.%(bulkinsert)s
                    INNER JOIN files
                    ON temp.%(bulkinsert)s.path == files.path
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(3, condition='updating file states')

                cursor.execute(self._sql(
                    '''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 1
                    WHERE file_id IN (
                        SELECT files.file_id
                            FROM temp.%(bulkinsert)s
                            INNER JOIN files
                            ON temp.%(bulkinsert)s.path == files.path)
                        AND file_state != 0
                    '''))

                if show_progress:
                    task.update(4, condition='dropping temporary data')

                cursor.execute(self._sql(
                    'DROP TABLE temp.%(bulkinsert)s'))

                if show_progress:
                    task.update(5)
                    task.done()

    def remove(self, paths):
        '''
        Remove files from the selection.

        :param paths:
            Paths to files to be removed from the selection.
        :type paths:
            :py:class:`list` of :py:class:`str`, or a single :py:class:`str`
        '''
        if isinstance(paths, str):
            paths = [paths]

        db = self.get_database()

        def normpath(path):
            return db.relpath(abspath(path))

        with self.transaction('remove files') as cursor:
            cursor.executemany(self._sql(
                '''
                DELETE FROM %(db)s.%(file_states)s
                WHERE %(db)s.%(file_states)s.file_id IN
                    (SELECT files.file_id
                     FROM files
                     WHERE files.path == ?)
                '''), ((normpath(path),) for path in paths))

    def iter_paths(self, raw=False):
        '''
        Iterate over all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are yielded. Set to ``True`` to yield
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :yields: File paths.
        '''

        sql = self._sql('''
            SELECT
                files.path
            FROM %(db)s.%(file_states)s
            INNER JOIN files
            ON files.file_id = %(db)s.%(file_states)s.file_id
            ORDER BY %(db)s.%(file_states)s.file_id
        ''')

        if raw:
            def trans(path):
                return path
        else:
            db = self.get_database()
            trans = db.abspath

        for values in self._conn.execute(sql):
            yield trans(values[0])

    def get_paths(self, raw=False):
        '''
        Get all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are returned. Set to ``True`` to return
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :returns: List of file paths.
        '''
        return list(self.iter_paths(raw=raw))

    def _set_file_states_known(self, transaction=None):
        '''
        Set file states to "known" (2).

        :param transaction:
            Transaction context manager to use. If not given, a new
            transaction is opened.
        '''
        with (transaction or self.transaction('set file states known')) \
                as cursor:
            cursor.execute(self._sql(
                '''
                UPDATE %(db)s.%(file_states)s
                SET file_state = 2
                WHERE file_state < 2
                '''))

    def _set_file_states_force_check(self, paths=None, transaction=None):
        '''
        Set file states to "request force check" (1).

        :param paths:
            If given, only the files with these paths are flagged. Otherwise
            all files in the selection are flagged.
        :param transaction:
            Transaction context manager to use. If not given, a new
            transaction is opened.
        '''

        with (transaction or self.transaction('set file states force check')) \
                as cursor:

            if paths is None:
                cursor.execute(self._sql(
                    '''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 1
                    '''))
            else:
                db = self.get_database()

                def normpath(path):
                    return db.relpath(abspath(path))

                cursor.executemany(self._sql(
                    '''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 1
                    WHERE %(db)s.%(file_states)s.file_id IN
                        (SELECT files.file_id
                         FROM files
                         WHERE files.path == ?)
                    '''), ((normpath(path),) for path in paths))

    def undig_grouped(self, skip_unchanged=False):
        '''
        Get inventory of cached content for all files in the selection.

        :param skip_unchanged:
            If ``True`` only inventory of modified files is
            yielded (:py:meth:`flag_modified` must be called beforehand).
        :type skip_unchanged:
            bool

        Returns a one-shot iterable yielding tuples ``((format, path), nuts)``
        where ``path`` is the absolute path to the file, ``format`` is the
        format assignation or ``'detect'`` and ``nuts`` is a list of
        :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
        contents of the file. ``len()`` of the returned object gives the
        number of files it will yield.
        '''

        if skip_unchanged:
            where = '''
                WHERE %(db)s.%(file_states)s.file_state == 0
            '''
        else:
            where = ''

        nfiles = execute_get1(self._conn, self._sql('''
            SELECT
                COUNT()
            FROM %(db)s.%(file_states)s
        ''' + where), ())[0]

        def gen():
            sql = self._sql('''
                SELECT
                    %(db)s.%(file_states)s.format,
                    files.path,
                    files.format,
                    files.mtime,
                    files.size,
                    nuts.file_segment,
                    nuts.file_element,
                    kind_codes.kind_id,
                    kind_codes.codes,
                    nuts.tmin_seconds,
                    nuts.tmin_offset,
                    nuts.tmax_seconds,
                    nuts.tmax_offset,
                    kind_codes.deltat
                FROM %(db)s.%(file_states)s
                LEFT OUTER JOIN files
                    ON %(db)s.%(file_states)s.file_id = files.file_id
                LEFT OUTER JOIN nuts
                    ON files.file_id = nuts.file_id
                LEFT OUTER JOIN kind_codes
                    ON nuts.kind_codes_id == kind_codes.kind_codes_id
            ''' + where + '''
                ORDER BY %(db)s.%(file_states)s.file_id
            ''')

            # Rows come ordered by file, so all rows of one file are
            # consecutive; emit a group whenever the path changes.
            nuts = []
            format_path = None
            db = self.get_database()
            for values in self._conn.execute(sql):
                apath = db.abspath(values[1])
                if format_path is not None and apath != format_path[1]:
                    yield format_path, nuts
                    nuts = []

                format_path = values[0], apath

                # Rows with NULL files.format contribute no Nut; the file is
                # still yielded, with an empty list if it has no other rows.
                # Nut values: (path, format, mtime, size, segment, element,
                # kind_id, codes, tmin_s, tmin_o, tmax_s, tmax_o, deltat).
                if values[2] is not None:
                    nuts.append(model.Nut(
                        values_nocheck=format_path[1:2] + values[2:]))

            if format_path is not None:
                yield format_path, nuts

        return GeneratorWithLen(gen(), nfiles)

    def flag_modified(self, check=True):
        '''
        Mark files which have been modified.

        :param check:
            If ``True`` query modification times of known files on disk. If
            ``False``, only flag unknown files.
        :type check:
            bool

        Assumes file state is 0 for newly added files, 1 for files added again
        to the selection (forces check), or 2 for all others (no checking is
        done for those).

        Sets file state to 0 for unknown or modified files, 2 for known and not
        modified files.
        '''

        db = self.get_database()
        with self.transaction('flag modified') as cursor:
            # Files awaiting a check (1) without a stored modification time
            # are unknown: set them to 0.
            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = 0
                WHERE (
                    SELECT mtime
                    FROM files
                    WHERE
                    files.file_id == %(db)s.%(file_states)s.file_id) IS NULL
                AND file_state == 1
            ''')

            cursor.execute(sql)

            if not check:

                # No disk checks requested: remaining candidates count as
                # known.
                sql = self._sql('''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state == 1
                ''')

                cursor.execute(sql)

                return

            def iter_file_states():
                # Yields (new_state, file_id) for every file still in
                # state 1, comparing stored (mtime, size) with the
                # backend's get_stats() result.
                sql = self._sql('''
                    SELECT
                        files.file_id,
                        files.path,
                        files.format,
                        files.mtime,
                        files.size
                    FROM %(db)s.%(file_states)s
                    INNER JOIN files
                        ON %(db)s.%(file_states)s.file_id == files.file_id
                    WHERE %(db)s.%(file_states)s.file_state == 1
                    ORDER BY %(db)s.%(file_states)s.file_id
                ''')

                for (file_id, path, fmt, mtime_db,
                        size_db) in self._conn.execute(sql):

                    path = db.abspath(path)
                    try:
                        mod = io.get_backend(fmt)
                        file_stats = mod.get_stats(path)

                    except FileLoadError:
                        # Stats unavailable: treat as modified.
                        yield 0, file_id
                        continue
                    except io.UnknownFormat:
                        # Not updated; the file state stays at 1.
                        continue

                    if (mtime_db, size_db) != file_stats:
                        yield 0, file_id
                    else:
                        yield 2, file_id

            # could better use callback function here...
            # NOTE(review): executemany consumes iter_file_states lazily, so
            # the SELECT above is still being stepped while rows of the same
            # table are updated. SQLite leaves it undefined whether a pending
            # query sees such changes; confirm this is safe here.

            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = ?
                WHERE file_id = ?
            ''')

            cursor.executemany(sql, iter_file_states())
# Public API of this module.
__all__ = ['Selection']