Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/squirrel/selection.py: 89%
230 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Meta-data caching for flexible file selections.
8'''
10import os
11import re
12import threading
13import logging
15from pyrocko import util
16from pyrocko.io.io_common import FileLoadError
17from pyrocko import progress
19from . import error, io, model
20from .database import Database, get_database, execute_get1, abspath
# Names of persistent selections are restricted to identifier-like strings
# of at most 65 characters.
re_persistent_name = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]{0,64}$')

logger = logging.getLogger('psq.selection')

# Running counter used to build names for temporary selections. Access to
# it is serialized with g_lock.
g_lock = threading.Lock()
g_icount = 0
def make_unique_name():
    '''
    Create a name which is unique within the running process.

    The name has the form ``'<pid>_<counter>'``. The module-level counter is
    read and incremented while holding ``g_lock`` so that concurrent callers
    never obtain the same value.

    :returns: The generated name.
    :rtype: str
    '''
    global g_icount

    with g_lock:
        serial = g_icount
        g_icount = serial + 1

    return '%i_%i' % (os.getpid(), serial)
def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are forwarded unchanged to
    ``progress.task``.
    '''
    task = progress.task(*args, logger=logger)
    return task
# Text fragments which filldocs() substitutes into method docstrings via
# ``%(name)s`` placeholders.
doc_snippets = {
    'query_args': '''
        :param obj:
            Object providing ``tmin``, ``tmax`` and ``codes`` to be used to
            constrain the query. Direct arguments override those from ``obj``.
        :type obj:
            any object with attributes ``tmin``, ``tmax`` and ``codes``

        :param tmin:
            Start time of query interval.
        :type tmin:
            :py:func:`~pyrocko.util.get_time_float`

        :param tmax:
            End time of query interval.
        :type tmax:
            :py:func:`~pyrocko.util.get_time_float`

        :param time:
            Time instant to query. Equivalent to setting ``tmin`` and ``tmax``
            to the same value.
        :type time:
            :py:func:`~pyrocko.util.get_time_float`

        :param codes:
            Pattern of content codes to query.
        :type codes:
            :class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
            objects appropriate for the queried content type, or anything which
            can be converted to such objects.
''',
    'file_formats': ', '.join(
        "``'%s'``" % fmt for fmt in io.supported_formats()),
}
def filldocs(meth):
    '''
    Decorator filling ``%(name)s`` placeholders in a docstring.

    The placeholders in the docstring of ``meth`` are substituted in place
    with the entries of ``doc_snippets``.

    :param meth:
        Function or method whose docstring should be completed.

    :returns: ``meth`` itself.
    '''
    # Docstrings are None for undocumented callables and for everything when
    # running under ``python -OO``; ``None %= dict`` would raise TypeError.
    if meth.__doc__ is not None:
        meth.__doc__ %= doc_snippets

    return meth
class GeneratorWithLen(object):
    '''
    Iterable wrapper which pairs a generator with a known item count.

    This lets consumers call :py:func:`len` on a lazily produced sequence.

    :param gen:
        Generator, iterator or other iterable producing the items.

    :param length:
        Number of items to report from :py:func:`len`. It is stored as given
        and not verified against ``gen``.
    :type length:
        int
    '''

    def __init__(self, gen, length):
        self.gen = gen
        self.length = length

    def __len__(self):
        return self.length

    def __iter__(self):
        # iter() hands back generators and iterators unchanged, but also
        # allows plain iterables (lists, tuples, ...) to be wrapped, which
        # would otherwise fail with "iter() returned non-iterator".
        return iter(self.gen)
class Selection(object):

    '''
    Database backed file selection (base class for
    :py:class:`~pyrocko.squirrel.base.Squirrel`).

    :param database:
        Database instance or file path to database.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`

    :param persistent:
        If given a name, create a persistent selection.
    :type persistent:
        :py:class:`str`

    :raises TypeError:
        If ``persistent`` is given but is not a :py:class:`str`.

    :raises pyrocko.squirrel.error.SquirrelError:
        If ``persistent`` is not a valid selection name.

    A selection in this context represents the list of files available to the
    application. Instead of using :py:class:`Selection` directly, user
    applications should usually use its subclass
    :py:class:`~pyrocko.squirrel.base.Squirrel` which adds content indices to
    the selection and provides high level data querying.

    By default, a temporary table in the database is created to hold the names
    of the files in the selection. This table is only visible inside the
    application which created it. If a name is given to ``persistent``, a named
    selection is created, which is visible also in other applications using the
    same database.

    Besides the filename references, desired content kind masks and file format
    indications are stored in the selection's database table to make the user
    choice regarding these options persistent on a per-file basis. Book-keeping
    on whether files are unknown, known or if modification checks are forced is
    handled in the selection's file-state table.

    Paths of files can be added to the selection using the :py:meth:`add`
    method and removed with :py:meth:`remove`. :py:meth:`undig_grouped` can be
    used to iterate over all content known to the selection.
    '''

    def __init__(self, database, persistent=None):
        # Set first so that close() (called from __del__) is a no-op if
        # initialization fails early.
        self._conn = None

        if not isinstance(database, Database):
            database = get_database(database)

        if persistent is not None:
            # Explicit check instead of ``assert``, which is stripped when
            # running under ``python -O``.
            if not isinstance(persistent, str):
                raise TypeError(
                    'persistent selection name must be a str, got %s'
                    % type(persistent).__name__)

            if not re_persistent_name.match(persistent):
                raise error.SquirrelError(
                    'invalid persistent selection name: %s' % persistent)

            self.name = 'psel_' + persistent
        else:
            self.name = 'sel_' + make_unique_name()

        self._persistent = persistent
        self._database = database
        self._database.add_selection(self)

        self._conn = self._database.get_connection()
        self._sources = []
        self._is_new = True
        self._volatile_paths = []

        with self.transaction('init selection') as cursor:

            if persistent is not None:
                # rowcount is 1 only if the name was not registered before.
                self._is_new = 1 == cursor.execute(
                    '''
                        INSERT OR IGNORE INTO persistent VALUES (?)
                    ''', (persistent,)).rowcount

            # Substitution values for the SQL templates passed to _sql().
            self._names = {
                'db': 'main' if self._persistent else 'temp',
                'file_states': self.name + '_file_states',
                'bulkinsert': self.name + '_bulkinsert'}

            cursor.execute(self._register_table(self._sql(
                '''
                    CREATE TABLE IF NOT EXISTS %(db)s.%(file_states)s (
                        file_id integer PRIMARY KEY,
                        file_state integer,
                        kind_mask integer,
                        format text)
                ''')))

            cursor.execute(self._sql(
                '''
                    CREATE INDEX
                    IF NOT EXISTS %(db)s.%(file_states)s_index_file_state
                    ON %(file_states)s (file_state)
                '''))

    def __del__(self):
        self.close()

    def close(self, delete_persistent=False):
        '''
        Close the selection and release its database connection.

        Volatile content is removed from the database. The selection's tables
        are dropped if the selection is not persistent or if
        ``delete_persistent`` is set. Finally, the selection is deregistered
        from its database. Calling this on an already closed selection does
        nothing.

        :param delete_persistent:
            If ``True``, also delete the tables of a persistent selection.
        :type delete_persistent:
            bool
        '''
        # getattr guards against partially constructed objects.
        if getattr(self, '_conn', None):
            self._cleanup()
            if not self._persistent or delete_persistent:
                self._delete()

            self._conn = None
            self._database.remove_selection(self)

    def delete(self):
        '''
        Close the selection and destroy its tables, even if it is persistent.

        :raises pyrocko.squirrel.error.SquirrelError:
            If the selection has no database connection.
        '''
        if self._conn is None:
            raise error.SquirrelError(
                'Cannot delete selection, no database connection.')

        self.close(delete_persistent=True)

    def _cleanup(self):
        '''
        Perform cleanup actions before database connection is closed.

        Removes volatile content from database.
        '''

        while self._volatile_paths:
            path = self._volatile_paths.pop()
            self._database.remove(path)

    def _delete(self):
        '''
        Destroy the tables associated with this selection.

        For a persistent selection, its name is also removed from the
        ``persistent`` table.
        '''
        with self.transaction('delete selection') as cursor:
            cursor.execute(self._sql(
                'DROP TABLE %(db)s.%(file_states)s'))

            if self._persistent:
                cursor.execute(
                    '''
                        DELETE FROM persistent WHERE name == ?
                    ''', (self._persistent,))

    def _register_table(self, s):
        '''
        Pass the SQL statement ``s`` to the database's table registration.
        '''
        return self._database._register_table(s)

    def _sql(self, s):
        '''
        Fill the selection specific names into the SQL template ``s``.

        Available placeholders are ``%(db)s``, ``%(file_states)s`` and
        ``%(bulkinsert)s``.
        '''
        return s % self._names

    def transaction(self, label='', mode='immediate'):
        '''
        Open a transaction on the selection's database.

        The arguments are forwarded to the ``transaction`` method of the
        database object.
        '''
        return self._database.transaction(label, mode)

    def is_new(self):
        '''
        Is this a new selection?

        Always ``True`` for non-persistent selections. Only ``False`` for
        a persistent selection which already existed in the database when
        it was initialized.
        '''
        return self._is_new

    def get_database(self):
        '''
        Get the database to which this selection belongs.

        :returns: :py:class:`~pyrocko.squirrel.database.Database` object
        '''
        return self._database

    @filldocs
    def add(
            self,
            paths,
            kind_mask=model.g_kind_mask_all,
            format='detect',
            show_progress=True,
            transaction=None):

        '''
        Add files to the selection.

        :param paths:
            Paths to files to be added to the selection.
        :type paths:
            iterator yielding :py:class:`str` objects

        :param kind_mask:
            Content kinds to be added to the selection.
        :type kind_mask:
            :py:class:`int` (bit mask)

        :param format:
            File format identifier or ``'detect'`` to enable auto-detection
            (available: %(file_formats)s).
        :type format:
            str

        :param show_progress:
            If ``True``, report the progress of the operation through
            progress tasks.
        :type show_progress:
            bool

        :param transaction:
            Transaction to run the database statements in. If ``None``, a
            new transaction is opened.
        :type transaction:
            context manager yielding a database cursor, optional

        Entries of the given files which are already in the selection but
        with a different ``kind_mask`` or ``format`` are replaced. Files
        which remain in the selection from before get file state 1 (force
        modification check), newly inserted entries get file state 0.
        '''

        if isinstance(paths, str):
            paths = [paths]

        paths = util.short_to_list(200, paths)

        if isinstance(paths, list) and not paths:
            return

        # Anything which is not a short list goes through a temporary table.
        use_temp_table = not isinstance(paths, list) or len(paths) > 200

        if show_progress:
            task = make_task('Gathering file names')
            paths = task(paths)

        db = self.get_database()
        with (transaction or self.transaction('add files')) as cursor:
            if not use_temp_table:
                paths = [db.relpath(path) for path in paths]

                # short non-iterator paths: can do without temp table

                cursor.executemany(
                    '''
                        INSERT OR IGNORE INTO files
                        VALUES (NULL, ?, NULL, NULL, NULL)
                    ''', ((x,) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 3)
                    task.update(0, condition='pruning stale information')

                cursor.executemany(self._sql(
                    '''
                        DELETE FROM %(db)s.%(file_states)s
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM files
                                WHERE files.path == ? )
                            AND ( kind_mask != ? OR format != ? )
                    '''), (
                        (path, kind_mask, format) for path in paths))

                if show_progress:
                    task.update(1, condition='adding file names to selection')

                cursor.executemany(self._sql(
                    '''
                        INSERT OR IGNORE INTO %(db)s.%(file_states)s
                        SELECT files.file_id, 0, ?, ?
                        FROM files
                        WHERE files.path = ?
                    '''), ((kind_mask, format, path) for path in paths))

                if show_progress:
                    task.update(2, condition='updating file states')

                cursor.executemany(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM files
                                WHERE files.path == ? )
                            AND file_state != 0
                    '''), ((path,) for path in paths))

                if show_progress:
                    task.update(3)
                    task.done()

            else:

                cursor.execute(self._sql(
                    '''
                        CREATE TEMP TABLE temp.%(bulkinsert)s
                        (path text)
                    '''))

                cursor.executemany(self._sql(
                    'INSERT INTO temp.%(bulkinsert)s VALUES (?)'),
                    ((db.relpath(x),) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 5)
                    task.update(0, condition='adding file names to database')

                cursor.execute(self._sql(
                    '''
                        INSERT OR IGNORE INTO files
                        SELECT NULL, path, NULL, NULL, NULL
                        FROM temp.%(bulkinsert)s
                    '''))

                if show_progress:
                    task.update(1, condition='pruning stale information')

                cursor.execute(self._sql(
                    '''
                        DELETE FROM %(db)s.%(file_states)s
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM temp.%(bulkinsert)s
                                INNER JOIN files
                                ON temp.%(bulkinsert)s.path == files.path)
                            AND ( kind_mask != ? OR format != ? )
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(2, condition='adding file names to selection')

                cursor.execute(self._sql(
                    '''
                        INSERT OR IGNORE INTO %(db)s.%(file_states)s
                        SELECT files.file_id, 0, ?, ?
                        FROM temp.%(bulkinsert)s
                        INNER JOIN files
                        ON temp.%(bulkinsert)s.path == files.path
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(3, condition='updating file states')

                cursor.execute(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM temp.%(bulkinsert)s
                                INNER JOIN files
                                ON temp.%(bulkinsert)s.path == files.path)
                            AND file_state != 0
                    '''))

                if show_progress:
                    task.update(4, condition='dropping temporary data')

                cursor.execute(self._sql(
                    'DROP TABLE temp.%(bulkinsert)s'))

                if show_progress:
                    task.update(5)
                    task.done()

    def remove(self, paths):
        '''
        Remove files from the selection.

        :param paths:
            Paths to files to be removed from the selection.
        :type paths:
            :py:class:`list` of :py:class:`str`
        '''
        if isinstance(paths, str):
            paths = [paths]

        db = self.get_database()

        def normpath(path):
            return db.relpath(abspath(path))

        with self.transaction('remove files') as cursor:
            cursor.executemany(self._sql(
                '''
                    DELETE FROM %(db)s.%(file_states)s
                    WHERE %(db)s.%(file_states)s.file_id IN
                        (SELECT files.file_id
                         FROM files
                         WHERE files.path == ?)
                '''), ((normpath(path),) for path in paths))

    def iter_paths(self, raw=False, format=None):
        '''
        Iterate over all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are yielded. Set to ``True`` to yield
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :param format:
            If given, only yield paths of files whose format entry in the
            database's ``files`` table equals this value.
        :type format:
            str, optional

        :yields: File paths.
        '''

        conditions = []
        args = []
        if format is not None:
            conditions.append('files.format = ?')
            args.append(format)

        if conditions:
            condition = 'WHERE %s' % ' AND '.join(conditions)
        else:
            condition = ''

        sql = self._sql('''
            SELECT
                files.path
            FROM %(db)s.%(file_states)s
            INNER JOIN files
            ON files.file_id = %(db)s.%(file_states)s.file_id
            ''' + condition + '''
            ORDER BY %(db)s.%(file_states)s.file_id
        ''')

        if raw:
            def trans(path):
                return path
        else:
            db = self.get_database()
            trans = db.abspath

        for values in self._conn.execute(sql, args):
            yield trans(values[0])

    def get_paths(self, raw=False, format=None):
        '''
        Get all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are returned. Set to ``True`` to return
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :param format:
            If given, only return paths of files whose format entry in the
            database's ``files`` table equals this value.
        :type format:
            str, optional

        :returns: List of file paths.
        '''
        return list(self.iter_paths(raw=raw, format=format))

    def _set_file_states_known(self, transaction=None):
        '''
        Set file states to "known" (2).

        :param transaction:
            Transaction to run the statement in. If ``None``, a new
            transaction is opened.
        '''
        with (transaction or self.transaction('set file states known')) \
                as cursor:
            cursor.execute(self._sql(
                '''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state < 2
                '''))

    def _set_file_states_force_check(self, paths=None, transaction=None):
        '''
        Set file states to "request force check" (1).

        :param paths:
            Paths of the files to be flagged. If ``None``, all files in the
            selection are flagged.
        :type paths:
            iterable of :py:class:`str`, optional

        :param transaction:
            Transaction to run the statements in. If ``None``, a new
            transaction is opened.
        '''

        with (transaction or self.transaction('set file states force check')) \
                as cursor:

            if paths is None:
                cursor.execute(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                    '''))
            else:
                db = self.get_database()

                def normpath(path):
                    return db.relpath(abspath(path))

                cursor.executemany(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE %(db)s.%(file_states)s.file_id IN
                            (SELECT files.file_id
                             FROM files
                             WHERE files.path == ?)
                    '''), ((normpath(path),) for path in paths))

    def undig_grouped(self, skip_unchanged=False):
        '''
        Get inventory of cached content for all files in the selection.

        :param skip_unchanged:
            If ``True`` only inventory of modified files is
            yielded (:py:meth:`flag_modified` must be called beforehand).
        :type skip_unchanged:
            bool

        This generator yields tuples ``((format, path), nuts)`` where ``path``
        is the path to the file, ``format`` is the format assignation or
        ``'detect'`` and ``nuts`` is a list of
        :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
        contents of the file.

        The returned object additionally supports :py:func:`len`, giving the
        number of files counted when this method was called.
        '''

        where = ''
        if skip_unchanged:
            where = '''
                WHERE file_states_b.file_state == 0
            '''

        nfiles = execute_get1(self._conn, self._sql('''
            SELECT
                COUNT()
            FROM %(db)s.%(file_states)s file_states_b
        ''' + where), ())[0]

        def gen():
            sql = self._sql('''
                SELECT
                    file_states_a.format,
                    files.path,
                    files.format,
                    files.mtime,
                    files.size,
                    nuts.file_segment,
                    nuts.file_element,
                    kind_codes.kind_id,
                    kind_codes.codes,
                    nuts.tmin_seconds,
                    nuts.tmin_offset,
                    nuts.tmax_seconds,
                    nuts.tmax_offset,
                    kind_codes.deltat
                FROM %(db)s.%(file_states)s file_states_a
                LEFT OUTER JOIN files
                    ON file_states_a.file_id = files.file_id
                LEFT OUTER JOIN nuts
                    ON files.file_id = nuts.file_id
                LEFT OUTER JOIN kind_codes
                    ON nuts.kind_codes_id == kind_codes.kind_codes_id
                WHERE file_states_a.file_id IN (
                    SELECT file_id FROM %(db)s.%(file_states)s file_states_b
                    ''' + where + '''
                    ORDER BY file_states_b.file_id
                    LIMIT ? OFFSET ?
                )
                ORDER BY file_states_a.file_id
            ''')

            db = self.get_database()

            # The query is paged over files: each pass handles up to `limit`
            # files with all of their rows.
            limit = 100
            offset = 0
            while True:
                nuts = []
                format_path = None
                at_end = True
                for values in self._conn.execute(
                        sql, (limit, offset)):

                    at_end = False
                    apath = db.abspath(values[1])
                    # Rows are ordered by file; a change of path marks the
                    # start of the next file's rows.
                    if format_path is not None and apath != format_path[1]:
                        yield format_path, nuts
                        nuts = []

                    format_path = values[0], apath

                    if values[2] is not None:
                        nuts.append(model.Nut(
                            values_nocheck=format_path[1:2] + values[2:]))

                # Flush the last file of this page.
                if format_path is not None:
                    yield format_path, nuts

                offset += limit
                if at_end:
                    break

        return GeneratorWithLen(gen(), nfiles)

    def flag_modified(self, check=True, transaction=None):
        '''
        Mark files which have been modified.

        :param check:
            If ``True`` query modification times of known files on disk. If
            ``False``, only flag unknown files.
        :type check:
            bool

        :param transaction:
            Transaction to run the statements in. If ``None``, a new
            transaction is opened.

        Assumes file state is 0 for newly added files, 1 for files added again
        to the selection (forces check), or 2 for all others (no checking is
        done for those).

        Sets file state to 0 for unknown or modified files, 2 for known and not
        modified files.
        '''

        db = self.get_database()
        with (transaction or self.transaction('flag modified')) as cursor:
            # Files flagged for checking which have no mtime recorded in the
            # database are treated as unknown.
            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = 0
                WHERE (
                    SELECT mtime
                    FROM files
                    WHERE
                      files.file_id == %(db)s.%(file_states)s.file_id) IS NULL
                    AND file_state == 1
            ''')

            cursor.execute(sql)

            if not check:

                sql = self._sql('''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state == 1
                ''')

                cursor.execute(sql)

                return

            def iter_file_states():
                sql = self._sql('''
                    SELECT
                        files.file_id,
                        files.path,
                        files.format,
                        files.mtime,
                        files.size
                    FROM %(db)s.%(file_states)s
                    INNER JOIN files
                        ON %(db)s.%(file_states)s.file_id == files.file_id
                    WHERE %(db)s.%(file_states)s.file_state == 1
                    ORDER BY %(db)s.%(file_states)s.file_id
                ''')

                for (file_id, path, fmt, mtime_db,
                        size_db) in self._conn.execute(sql):

                    path = db.abspath(path)
                    try:
                        mod = io.get_backend(fmt)
                        file_stats = mod.get_stats(path)

                    except FileLoadError:
                        # Stats could not be obtained: flag as modified.
                        yield 0, file_id
                        continue
                    except io.UnknownFormat:
                        # No backend for this format: leave state untouched.
                        continue

                    if (mtime_db, size_db) != file_stats:
                        yield 0, file_id
                    else:
                        yield 2, file_id

            # could better use callback function here...

            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = ?
                WHERE file_id = ?
            ''')

            cursor.executemany(sql, iter_file_states())
# Public API of this module.
__all__ = ['Selection']