1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import os
7import re
8import threading
9import logging
11from pyrocko import util
12from pyrocko.io.io_common import FileLoadError
13from pyrocko.progress import progress
15from . import error, io, model
16from .database import Database, get_database, execute_get1, abspath
logger = logging.getLogger('psq.selection')

# Process-wide counter used to build unique selection names. It is only
# modified while holding ``g_lock`` (see ``make_unique_name``).
g_icount = 0
g_lock = threading.Lock()

# Names of persistent selections end up in SQL table identifiers, so they are
# restricted to plain identifier characters. The pattern is anchored with
# ``\Z`` instead of ``$`` because ``$`` also matches just before a trailing
# newline, which would let a name like ``'abc\n'`` pass the check.
re_persistent_name = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]{0,64}\Z')
def make_unique_name():
    '''
    Create a name from the process id and a running counter.

    The module-level counter ``g_icount`` is read and incremented while
    holding ``g_lock``.

    :returns: name of the form ``'<pid>_<counter>'``
    '''
    global g_icount

    with g_lock:
        icount = g_icount
        unique_name = '%i_%i' % (os.getpid(), icount)
        g_icount = icount + 1

    return unique_name
def make_task(*args):
    '''
    Create a progress task, passing this module's logger to it.

    All positional arguments are forwarded unchanged to ``progress.task``.

    :returns: whatever ``progress.task`` returns
    '''
    task = progress.task(*args, logger=logger)
    return task
# Documentation fragments shared between docstrings. The ``filldocs``
# decorator substitutes them into docstrings containing ``%(query_args)s`` or
# ``%(file_formats)s`` placeholders.
doc_snippets = dict(
    query_args='''
        :param obj:
            Object providing ``tmin``, ``tmax`` and ``codes`` to be used to
            constrain the query. Direct arguments override those from ``obj``.
        :type obj:
            any object with attributes ``tmin``, ``tmax`` and ``codes``

        :param tmin:
            Start time of query interval.
        :type tmin:
            timestamp

        :param tmax:
            End time of query interval.
        :type tmax:
            timestamp

        :param time:
            Time instant to query. Equivalent to setting ``tmin`` and ``tmax``
            to the same value.
        :type time:
            timestamp

        :param codes:
            Pattern of content codes to query.
        :type codes:
            :class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
            objects appropriate for the queried content type, or anything which
            can be converted to such objects.
''',
    # Comma-separated, reST-quoted list of the format names reported by
    # ``io.supported_formats()``.
    file_formats=', '.join(
        "``'%s'``" % fmt for fmt in io.supported_formats()))
def filldocs(meth):
    '''
    Decorator substituting the shared documentation snippets into a docstring.

    The docstring of ``meth`` is formatted in place with the ``doc_snippets``
    mapping. Objects without a docstring are returned unchanged; this is the
    case for every function when Python runs with ``-OO``, where applying the
    formatting operator to ``None`` would raise :py:exc:`TypeError` at import
    time.

    :param meth:
        Function or method whose docstring contains snippet placeholders.

    :returns: ``meth`` itself.
    '''
    if meth.__doc__ is not None:
        meth.__doc__ %= doc_snippets

    return meth
class GeneratorWithLen(object):
    '''
    Wrap an iterable together with an externally known length.

    :param gen:
        Generator, iterator or other iterable to be wrapped.

    :param length:
        Value to be reported by :py:func:`len`. It is stored as given and not
        checked against the number of items actually produced.
    '''

    def __init__(self, gen, length):
        self.gen = gen
        self.length = length

    def __len__(self):
        return self.length

    def __iter__(self):
        # ``__iter__`` must return an iterator. ``iter()`` hands back
        # generators and iterators unchanged, and additionally makes plain
        # iterables such as lists work instead of raising TypeError.
        return iter(self.gen)
class Selection(object):

    '''
    Database backed file selection (base class for
    :py:class:`~pyrocko.squirrel.base.Squirrel`).

    :param database:
        Database instance or file path to database.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`

    :param persistent:
        If given a name, create a persistent selection.
    :type persistent:
        :py:class:`str`

    A selection in this context represents the list of files available to the
    application. Instead of using :py:class:`Selection` directly, user
    applications should usually use its subclass
    :py:class:`~pyrocko.squirrel.base.Squirrel` which adds content indices to
    the selection and provides high level data querying.

    By default, a temporary table in the database is created to hold the names
    of the files in the selection. This table is only visible inside the
    application which created it. If a name is given to ``persistent``, a named
    selection is created, which is visible also in other applications using the
    same database.

    Besides the filename references, desired content kind masks and file format
    indications are stored in the selection's database table to make the user
    choice regarding these options persistent on a per-file basis. Book-keeping
    on whether files are unknown, known or if modification checks are forced is
    handled in the selection's file-state table.

    Paths of files can be added to the selection using the :py:meth:`add`
    method and removed with :py:meth:`remove`. :py:meth:`undig_grouped` can be
    used to iterate over all content known to the selection.
    '''

    def __init__(self, database, persistent=None):
        # Set first, so that __del__ finds the attribute even when
        # initialization fails early.
        self._conn = None

        if not isinstance(database, Database):
            database = get_database(database)

        if persistent is not None:
            assert isinstance(persistent, str)

            # The name becomes part of SQL table identifiers below, so the
            # whole string must match. ``fullmatch`` also rejects a trailing
            # newline, which a ``$``-anchored ``match`` would accept.
            if not re_persistent_name.fullmatch(persistent):
                raise error.SquirrelError(
                    'invalid persistent selection name: %s' % persistent)

            self.name = 'psel_' + persistent
        else:
            self.name = 'sel_' + make_unique_name()

        self._persistent = persistent
        self._database = database
        self._conn = self._database.get_connection()
        self._sources = []
        self._is_new = True
        self._volatile_paths = []

        with self.transaction('init selection') as cursor:

            if persistent is not None:
                # The insert is ignored if the name is already registered, so
                # the row count tells whether the selection is new.
                self._is_new = 1 == cursor.execute(
                    '''
                        INSERT OR IGNORE INTO persistent VALUES (?)
                    ''', (persistent,)).rowcount

            # Substitution values for the SQL templates used with _sql().
            self._names = {
                'db': 'main' if self._persistent else 'temp',
                'file_states': self.name + '_file_states',
                'bulkinsert': self.name + '_bulkinsert'}

            cursor.execute(self._register_table(self._sql(
                '''
                    CREATE TABLE IF NOT EXISTS %(db)s.%(file_states)s (
                        file_id integer PRIMARY KEY,
                        file_state integer,
                        kind_mask integer,
                        format text)
                ''')))

            cursor.execute(self._sql(
                '''
                    CREATE INDEX
                    IF NOT EXISTS %(db)s.%(file_states)s_index_file_state
                    ON %(file_states)s (file_state)
                '''))

    def __del__(self):
        # ``_conn`` is None if initialization did not get far enough or if
        # the selection has already been deleted.
        if hasattr(self, '_conn') and self._conn:
            self._cleanup()
            if not self._persistent:
                self._delete()

    def _register_table(self, s):
        '''
        Forward an SQL statement to the database's ``_register_table``.
        '''
        return self._database._register_table(s)

    def _sql(self, s):
        '''
        Fill this selection's schema and table names into an SQL template.
        '''
        return s % self._names

    def transaction(self, label='', mode='immediate'):
        '''
        Open a transaction on the underlying database.

        The arguments are passed on to the database's ``transaction`` method.
        '''
        return self._database.transaction(label, mode)

    def is_new(self):
        '''
        Is this a new selection?

        Always ``True`` for non-persistent selections. Only ``False`` for
        a persistent selection which already existed in the database when
        it was initialized.
        '''
        return self._is_new

    def get_database(self):
        '''
        Get the database to which this selection belongs.

        :returns: :py:class:`~pyrocko.squirrel.database.Database` object
        '''
        return self._database

    def _cleanup(self):
        '''
        Perform cleanup actions before database connection is closed.

        Removes volatile content from database.
        '''

        while self._volatile_paths:
            path = self._volatile_paths.pop()
            self._database.remove(path)

    def _delete(self):
        '''
        Destroy the tables associated with this selection.

        For a persistent selection, its entry in the ``persistent`` table is
        removed as well. Afterwards the selection holds no connection.
        '''
        with self.transaction('delete selection') as cursor:
            cursor.execute(self._sql(
                'DROP TABLE %(db)s.%(file_states)s'))

            if self._persistent:
                # Same value as ``self.name`` without its ``'psel_'`` prefix.
                cursor.execute(
                    '''
                        DELETE FROM persistent WHERE name == ?
                    ''', (self._persistent,))

        self._conn = None

    def delete(self):
        '''
        Destroy the tables associated with this selection.
        '''
        self._delete()

    @filldocs
    def add(
            self,
            paths,
            kind_mask=model.g_kind_mask_all,
            format='detect',
            show_progress=True):

        '''
        Add files to the selection.

        :param paths:
            Paths to files to be added to the selection.
        :type paths:
            iterator yielding :py:class:`str` objects

        :param kind_mask:
            Content kinds to be added to the selection.
        :type kind_mask:
            :py:class:`int` (bit mask)

        :param format:
            File format identifier or ``'detect'`` to enable auto-detection
            (available: %(file_formats)s).
        :type format:
            str

        :param show_progress:
            If ``True``, report progress through progress tasks.
        :type show_progress:
            bool
        '''

        if isinstance(paths, str):
            paths = [paths]

        paths = util.short_to_list(200, paths)

        if isinstance(paths, list) and len(paths) == 0:
            return

        # Choose the insertion strategy before ``paths`` is wrapped by the
        # progress task below. The wrapper returned by the task is presumably
        # a generator (TODO confirm in pyrocko.progress), in which case a
        # check made afterwards would never select the short-list path.
        is_short_list = isinstance(paths, list) and len(paths) <= 200

        if show_progress:
            task = make_task('Gathering file names')
            paths = task(paths)

        db = self.get_database()
        with self.transaction('add files') as cursor:

            if is_short_list:

                # short non-iterator paths: can do without temp table

                # Materialize once; the list is iterated several times below.
                paths = [db.relpath(path) for path in paths]

                cursor.executemany(
                    '''
                        INSERT OR IGNORE INTO files
                        VALUES (NULL, ?, NULL, NULL, NULL)
                    ''', ((x,) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 3)
                    task.update(0, condition='pruning stale information')

                # Drop entries whose kind mask or format differs from the
                # requested one; they are re-added below.
                cursor.executemany(self._sql(
                    '''
                        DELETE FROM %(db)s.%(file_states)s
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM files
                                WHERE files.path == ? )
                            AND ( kind_mask != ? OR format != ? )
                    '''), (
                        (path, kind_mask, format) for path in paths))

                if show_progress:
                    task.update(1, condition='adding file names to selection')

                cursor.executemany(self._sql(
                    '''
                        INSERT OR IGNORE INTO %(db)s.%(file_states)s
                        SELECT files.file_id, 0, ?, ?
                            FROM files
                            WHERE files.path = ?
                    '''), ((kind_mask, format, path) for path in paths))

                if show_progress:
                    task.update(2, condition='updating file states')

                # Files which were already in the selection (state != 0) get
                # state 1.
                cursor.executemany(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM files
                                WHERE files.path == ? )
                            AND file_state != 0
                    '''), ((path,) for path in paths))

                if show_progress:
                    task.update(3)
                    task.done()

            else:

                # Long or lazily generated path lists: stage the paths in a
                # temporary table and process them with set based statements.
                cursor.execute(self._sql(
                    '''
                        CREATE TEMP TABLE temp.%(bulkinsert)s
                        (path text)
                    '''))

                cursor.executemany(self._sql(
                    'INSERT INTO temp.%(bulkinsert)s VALUES (?)'),
                    ((db.relpath(x),) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 5)
                    task.update(0, condition='adding file names to database')

                cursor.execute(self._sql(
                    '''
                        INSERT OR IGNORE INTO files
                        SELECT NULL, path, NULL, NULL, NULL
                        FROM temp.%(bulkinsert)s
                    '''))

                if show_progress:
                    task.update(1, condition='pruning stale information')

                cursor.execute(self._sql(
                    '''
                        DELETE FROM %(db)s.%(file_states)s
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM temp.%(bulkinsert)s
                                INNER JOIN files
                                ON temp.%(bulkinsert)s.path == files.path)
                            AND ( kind_mask != ? OR format != ? )
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(2, condition='adding file names to selection')

                cursor.execute(self._sql(
                    '''
                        INSERT OR IGNORE INTO %(db)s.%(file_states)s
                        SELECT files.file_id, 0, ?, ?
                        FROM temp.%(bulkinsert)s
                        INNER JOIN files
                        ON temp.%(bulkinsert)s.path == files.path
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(3, condition='updating file states')

                cursor.execute(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM temp.%(bulkinsert)s
                                INNER JOIN files
                                ON temp.%(bulkinsert)s.path == files.path)
                            AND file_state != 0
                    '''))

                if show_progress:
                    task.update(4, condition='dropping temporary data')

                cursor.execute(self._sql(
                    'DROP TABLE temp.%(bulkinsert)s'))

                if show_progress:
                    task.update(5)
                    task.done()

    def remove(self, paths):
        '''
        Remove files from the selection.

        :param paths:
            Paths to files to be removed from the selection.
        :type paths:
            :py:class:`list` of :py:class:`str`
        '''
        if isinstance(paths, str):
            paths = [paths]

        db = self.get_database()

        def normpath(path):
            # Convert to the form in which paths are stored in the database.
            return db.relpath(abspath(path))

        with self.transaction('remove files') as cursor:
            cursor.executemany(self._sql(
                '''
                    DELETE FROM %(db)s.%(file_states)s
                    WHERE %(db)s.%(file_states)s.file_id IN
                        (SELECT files.file_id
                         FROM files
                         WHERE files.path == ?)
                '''), ((normpath(path),) for path in paths))

    def iter_paths(self, raw=False):
        '''
        Iterate over all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are yielded. Set to ``True`` to yield
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :yields: File paths.
        '''

        sql = self._sql('''
            SELECT
                files.path
            FROM %(db)s.%(file_states)s
            INNER JOIN files
            ON files.file_id = %(db)s.%(file_states)s.file_id
            ORDER BY %(db)s.%(file_states)s.file_id
        ''')

        if raw:
            def trans(path):
                return path
        else:
            db = self.get_database()
            trans = db.abspath

        for values in self._conn.execute(sql):
            yield trans(values[0])

    def get_paths(self, raw=False):
        '''
        Get all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are returned. Set to ``True`` to return
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :returns: List of file paths.
        '''
        return list(self.iter_paths(raw=raw))

    def _set_file_states_known(self, transaction=None):
        '''
        Set file states to "known" (2).

        :param transaction:
            Transaction to be used. If not given, a new one is opened.
        '''
        with (transaction or self.transaction('set file states known')) \
                as cursor:
            cursor.execute(self._sql(
                '''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state < 2
                '''))

    def _set_file_states_force_check(self, paths=None, transaction=None):
        '''
        Set file states to "request force check" (1).

        :param paths:
            Paths of the files to be flagged. If ``None``, all files in the
            selection are flagged.

        :param transaction:
            Transaction to be used. If not given, a new one is opened.
        '''

        with (transaction or self.transaction('set file states force check')) \
                as cursor:

            if paths is None:
                cursor.execute(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                    '''))
            else:
                db = self.get_database()

                def normpath(path):
                    # Convert to the form in which paths are stored in the
                    # database.
                    return db.relpath(abspath(path))

                cursor.executemany(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE %(db)s.%(file_states)s.file_id IN
                            (SELECT files.file_id
                             FROM files
                             WHERE files.path == ?)
                    '''), ((normpath(path),) for path in paths))

    def undig_grouped(self, skip_unchanged=False):
        '''
        Get inventory of cached content for all files in the selection.

        :param skip_unchanged:
            If ``True`` only inventory of modified files is
            yielded (:py:meth:`flag_modified` must be called beforehand).
        :type skip_unchanged:
            bool

        This generator yields tuples ``((format, path), nuts)`` where ``path``
        is the path to the file, ``format`` is the format assignation or
        ``'detect'`` and ``nuts`` is a list of
        :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
        contents of the file.
        '''

        if skip_unchanged:
            where = '''
                WHERE %(db)s.%(file_states)s.file_state == 0
            '''
        else:
            where = ''

        # Number of files to be visited, reported as length of the returned
        # generator wrapper.
        nfiles = execute_get1(self._conn, self._sql('''
            SELECT
                COUNT()
            FROM %(db)s.%(file_states)s
        ''' + where), ())[0]

        def gen():
            sql = self._sql('''
                SELECT
                    %(db)s.%(file_states)s.format,
                    files.path,
                    files.format,
                    files.mtime,
                    files.size,
                    nuts.file_segment,
                    nuts.file_element,
                    kind_codes.kind_id,
                    kind_codes.codes,
                    nuts.tmin_seconds,
                    nuts.tmin_offset,
                    nuts.tmax_seconds,
                    nuts.tmax_offset,
                    kind_codes.deltat
                FROM %(db)s.%(file_states)s
                LEFT OUTER JOIN files
                    ON %(db)s.%(file_states)s.file_id = files.file_id
                LEFT OUTER JOIN nuts
                    ON files.file_id = nuts.file_id
                LEFT OUTER JOIN kind_codes
                    ON nuts.kind_codes_id == kind_codes.kind_codes_id
            ''' + where + '''
                ORDER BY %(db)s.%(file_states)s.file_id
            ''')

            nuts = []
            format_path = None
            db = self.get_database()
            for values in self._conn.execute(sql):
                apath = db.abspath(values[1])

                # Rows are ordered by file; a change of path means that the
                # previous file's group is complete.
                if format_path is not None and apath != format_path[1]:
                    yield format_path, nuts
                    nuts = []

                format_path = values[0], apath

                # ``files.format`` is NULL when nothing has been indexed for
                # the file; such rows contribute no nut.
                if values[2] is not None:
                    nuts.append(model.Nut(
                        values_nocheck=format_path[1:2] + values[2:]))

            if format_path is not None:
                yield format_path, nuts

        return GeneratorWithLen(gen(), nfiles)

    def flag_modified(self, check=True):
        '''
        Mark files which have been modified.

        :param check:
            If ``True`` query modification times of known files on disk. If
            ``False``, only flag unknown files.
        :type check:
            bool

        Assumes file state is 0 for newly added files, 1 for files added again
        to the selection (forces check), or 2 for all others (no checking is
        done for those).

        Sets file state to 0 for unknown or modified files, 2 for known and not
        modified files.
        '''

        db = self.get_database()
        with self.transaction('flag modified') as cursor:
            # Files flagged for checking which have no modification time
            # stored are treated as unknown.
            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = 0
                WHERE (
                    SELECT mtime
                    FROM files
                    WHERE
                        files.file_id == %(db)s.%(file_states)s.file_id
                    ) IS NULL
                    AND file_state == 1
            ''')

            cursor.execute(sql)

            if not check:

                # Without checking, the remaining flagged files are assumed
                # to be unmodified.
                sql = self._sql('''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state == 1
                ''')

                cursor.execute(sql)

                return

            def iter_file_states():
                sql = self._sql('''
                    SELECT
                        files.file_id,
                        files.path,
                        files.format,
                        files.mtime,
                        files.size
                    FROM %(db)s.%(file_states)s
                    INNER JOIN files
                        ON %(db)s.%(file_states)s.file_id == files.file_id
                    WHERE %(db)s.%(file_states)s.file_state == 1
                    ORDER BY %(db)s.%(file_states)s.file_id
                ''')

                for (file_id, path, fmt, mtime_db,
                        size_db) in self._conn.execute(sql):

                    path = db.abspath(path)
                    try:
                        mod = io.get_backend(fmt)
                        file_stats = mod.get_stats(path)

                    except FileLoadError:
                        # Stats could not be obtained: flag as modified.
                        yield 0, file_id
                        continue
                    except io.UnknownFormat:
                        # No backend for this format: leave state untouched.
                        continue

                    if (mtime_db, size_db) != file_stats:
                        yield 0, file_id
                    else:
                        yield 2, file_id

            # could better use callback function here...

            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = ?
                WHERE file_id = ?
            ''')

            cursor.executemany(sql, iter_file_states())
# Names exported by ``from <module> import *``.
__all__ = ['Selection']