1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import os
9import re
10import threading
11import logging
13from pyrocko import util
14from pyrocko.io.io_common import FileLoadError
15from pyrocko.progress import progress
17from . import error, io, model
18from .database import Database, get_database, execute_get1, abspath
# Logger used by this module (also passed to progress tasks, see make_task).
logger = logging.getLogger('psq.selection')

# Counter for make_unique_name(); increments are guarded by g_lock.
g_icount = 0
g_lock = threading.Lock()

# Valid names for persistent selections. Such names are interpolated into SQL
# table identifiers, so they are restricted to ASCII letters, digits and
# underscores and must not start with a digit. ``\Z`` is used instead of
# ``$`` because ``$`` also matches right before a trailing newline, which
# would let a name like ``'abc\n'`` pass validation.
re_persistent_name = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]{0,64}\Z')
def make_unique_name():
    '''
    Get a new name, unique within this process, for a temporary selection.

    The name is built from the current process ID and a module-level counter
    which is incremented under ``g_lock``.

    :returns: :py:class:`str` of the form ``'<pid>_<counter>'``
    '''
    global g_icount

    with g_lock:
        counter = g_icount
        g_icount = counter + 1

    return '%i_%i' % (os.getpid(), counter)
def make_task(*args):
    '''
    Create a progress task reporting through this module's logger.

    All positional arguments are forwarded unchanged to ``progress.task``.
    '''
    new_task = progress.task(*args, logger=logger)
    return new_task
# Reusable docstring fragments, substituted into docstrings with %-formatting
# by the ``filldocs`` decorator (e.g. ``%(file_formats)s`` in the docstring of
# ``Selection.add``).
#
# ``query_args``: reST documentation of the query arguments ``obj``, ``tmin``,
#     ``tmax``, ``time`` and ``codes``.
# ``file_formats``: comma-separated, quoted list of the format names returned
#     by ``io.supported_formats()``; evaluated once, at import time.
doc_snippets = dict(
    query_args='''
        :param obj:
            Object providing ``tmin``, ``tmax`` and ``codes`` to be used to
            constrain the query. Direct arguments override those from ``obj``.
        :type obj:
            any object with attributes ``tmin``, ``tmax`` and ``codes``

        :param tmin:
            Start time of query interval.
        :type tmin:
            timestamp

        :param tmax:
            End time of query interval.
        :type tmax:
            timestamp

        :param time:
            Time instant to query. Equivalent to setting ``tmin`` and ``tmax``
            to the same value.
        :type time:
            timestamp

        :param codes:
            Pattern of content codes to query.
        :type codes:
            :class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
            objects appropriate for the queried content type, or anything which
            can be converted to such objects.
''',
    file_formats=', '.join(
        "``'%s'``" % fmt for fmt in io.supported_formats()))
def filldocs(meth):
    '''
    Decorator expanding ``%(name)s`` placeholders in a function's docstring.

    Placeholders are filled from the module-level ``doc_snippets`` dict. If
    the function has no docstring (e.g. because Python runs with ``-OO``,
    which strips docstrings), it is returned unchanged instead of failing
    with a ``TypeError`` at import time.

    :param meth: Function or method whose docstring is to be expanded.
    :returns: ``meth`` itself.
    '''
    if meth.__doc__ is not None:
        meth.__doc__ %= doc_snippets

    return meth
class GeneratorWithLen(object):
    '''
    Single-pass iterable pairing a generator with a precomputed length.

    Allows :py:func:`len` to be used on the output of a generator whose
    number of items is known in advance.

    :param gen: Generator (or other iterator) handed out on iteration.
    :param length: Value reported by :py:func:`len`.
    '''

    def __init__(self, gen, length):
        self.gen, self.length = gen, length

    def __len__(self):
        return self.length

    def __iter__(self):
        # The wrapped generator itself is returned, so the items can only be
        # consumed once.
        return self.gen
class Selection(object):

    '''
    Database backed file selection (base class for
    :py:class:`~pyrocko.squirrel.base.Squirrel`).

    :param database:
        Database instance or file path to database.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`

    :param persistent:
        If given a name, create a persistent selection. The name must start
        with a letter or an underscore and may only contain ASCII letters,
        digits and underscores.
    :type persistent:
        :py:class:`str`

    :raises:
        :py:exc:`~pyrocko.squirrel.error.SquirrelError` if ``persistent`` is
        not a valid selection name.

    A selection in this context represents the list of files available to the
    application. Instead of using :py:class:`Selection` directly, user
    applications should usually use its subclass
    :py:class:`~pyrocko.squirrel.base.Squirrel` which adds content indices to
    the selection and provides high level data querying.

    By default, a temporary table in the database is created to hold the names
    of the files in the selection. This table is only visible inside the
    application which created it. If a name is given to ``persistent``, a named
    selection is created, which is visible also in other applications using the
    same database.

    Besides the filename references, desired content kind masks and file format
    indications are stored in the selection's database table to make the user
    choice regarding these options persistent on a per-file basis. Book-keeping
    on whether files are unknown, known or if modification checks are forced is
    handled in the selection's file-state table.

    Paths of files can be added to the selection using the :py:meth:`add`
    method and removed with :py:meth:`remove`. :py:meth:`undig_grouped` can be
    used to iterate over all content known to the selection.
    '''

    def __init__(self, database, persistent=None):
        # Set first, so that __del__ is harmless if initialization fails early.
        self._conn = None

        if not isinstance(database, Database):
            database = get_database(database)

        if persistent is not None:
            # Explicit check instead of ``assert`` (which is stripped under
            # ``python -O``): the name becomes part of SQL table identifiers
            # below, so it must always be validated.
            if not isinstance(persistent, str) \
                    or not re_persistent_name.match(persistent):
                raise error.SquirrelError(
                    'invalid persistent selection name: %s' % (persistent,))

            self.name = 'psel_' + persistent
        else:
            self.name = 'sel_' + make_unique_name()

        self._persistent = persistent
        self._database = database
        self._conn = self._database.get_connection()
        self._sources = []
        self._is_new = True
        self._volatile_paths = []

        with self.transaction('init selection') as cursor:

            if persistent is not None:
                # INSERT OR IGNORE affects one row only if the name is not yet
                # registered (assumes a uniqueness constraint on the
                # ``persistent`` table -- defined elsewhere).
                self._is_new = 1 == cursor.execute(
                    '''
                    INSERT OR IGNORE INTO persistent VALUES (?)
                    ''', (persistent,)).rowcount

            # Persistent selections live in the main database, all others in
            # the temporary database.
            self._names = {
                'db': 'main' if self._persistent else 'temp',
                'file_states': self.name + '_file_states',
                'bulkinsert': self.name + '_bulkinsert'}

            cursor.execute(self._register_table(self._sql(
                '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(file_states)s (
                    file_id integer PRIMARY KEY,
                    file_state integer,
                    kind_mask integer,
                    format text)
                ''')))

            cursor.execute(self._sql(
                '''
                CREATE INDEX
                IF NOT EXISTS %(db)s.%(file_states)s_index_file_state
                ON %(file_states)s (file_state)
                '''))

    def __del__(self):
        # Volatile content is always removed; the selection's tables are only
        # dropped for non-persistent selections.
        if hasattr(self, '_conn') and self._conn:
            self._cleanup()
            if not self._persistent:
                self._delete()

    def _register_table(self, s):
        # Delegates to the database; returns the (possibly processed) SQL.
        return self._database._register_table(s)

    def _sql(self, s):
        # Fill the %(db)s, %(file_states)s and %(bulkinsert)s placeholders.
        return s % self._names

    def transaction(self, label='', mode='immediate'):
        '''
        Get a transaction context on the selection's database.

        :param label: Label for the transaction.
        :param mode: Transaction mode passed on to the database.
        '''
        return self._database.transaction(label, mode)

    def is_new(self):
        '''
        Is this a new selection?

        Always ``True`` for non-persistent selections. Only ``False`` for
        a persistent selection which already existed in the database when
        it was initialized.
        '''
        return self._is_new

    def get_database(self):
        '''
        Get the database to which this selection belongs.

        :returns: :py:class:`~pyrocko.squirrel.database.Database` object
        '''
        return self._database

    def _cleanup(self):
        '''
        Perform cleanup actions before database connection is closed.

        Removes volatile content from database.
        '''

        while self._volatile_paths:
            path = self._volatile_paths.pop()
            self._database.remove(path)

    def _delete(self):
        '''
        Destroy the tables associated with this selection.

        For a persistent selection, its name is also removed from the
        ``persistent`` table. Afterwards ``_conn`` is reset to ``None``, so
        that :py:meth:`__del__` does not act on the selection again.
        '''
        with self.transaction('delete selection') as cursor:
            cursor.execute(self._sql(
                'DROP TABLE %(db)s.%(file_states)s'))

            if self._persistent:
                cursor.execute(
                    '''
                    DELETE FROM persistent WHERE name == ?
                    ''', (self._persistent,))

        self._conn = None

    def delete(self):
        '''
        Destroy the selection.

        Removes volatile content (paths in ``_volatile_paths``) from the
        database and drops the selection's tables. For a persistent selection,
        its registration in the database is removed as well. The selection
        must not be used afterwards.
        '''
        # _delete() resets _conn, which disables the cleanup in __del__, so
        # the volatile content has to be removed here.
        self._cleanup()
        self._delete()

    @filldocs
    def add(
            self,
            paths,
            kind_mask=model.g_kind_mask_all,
            format='detect',
            show_progress=True):

        '''
        Add files to the selection.

        :param paths:
            Paths to files to be added to the selection.
        :type paths:
            iterator yielding :py:class:`str` objects

        :param kind_mask:
            Content kinds to be added to the selection.
        :type kind_mask:
            :py:class:`int` (bit mask)

        :param format:
            File format identifier or ``'detect'`` to enable auto-detection
            (available: %(file_formats)s).
        :type format:
            str

        :param show_progress:
            Whether to report progress while adding the files.
        :type show_progress:
            bool
        '''

        if isinstance(paths, str):
            paths = [paths]

        if show_progress:
            task = make_task('Gathering file names')
            paths = task(paths)

        paths = util.short_to_list(200, paths)

        db = self.get_database()
        with self.transaction('add files') as cursor:

            if isinstance(paths, list) and len(paths) <= 200:

                paths = [db.relpath(path) for path in paths]

                # short non-iterator paths: can do without temp table

                cursor.executemany(
                    '''
                    INSERT OR IGNORE INTO files
                    VALUES (NULL, ?, NULL, NULL, NULL)
                    ''', ((x,) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 3)
                    task.update(0, condition='pruning stale information')

                cursor.executemany(self._sql(
                    '''
                    DELETE FROM %(db)s.%(file_states)s
                    WHERE file_id IN (
                        SELECT files.file_id
                            FROM files
                            WHERE files.path == ? )
                        AND ( kind_mask != ? OR format != ? )
                    '''), (
                        (path, kind_mask, format) for path in paths))

                if show_progress:
                    task.update(1, condition='adding file names to selection')

                cursor.executemany(self._sql(
                    '''
                    INSERT OR IGNORE INTO %(db)s.%(file_states)s
                    SELECT files.file_id, 0, ?, ?
                    FROM files
                    WHERE files.path = ?
                    '''), ((kind_mask, format, path) for path in paths))

                if show_progress:
                    task.update(2, condition='updating file states')

                cursor.executemany(self._sql(
                    '''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 1
                    WHERE file_id IN (
                        SELECT files.file_id
                            FROM files
                            WHERE files.path == ? )
                        AND file_state != 0
                    '''), ((path,) for path in paths))

                if show_progress:
                    task.update(3)
                    task.done()

            else:

                cursor.execute(self._sql(
                    '''
                    CREATE TEMP TABLE temp.%(bulkinsert)s
                    (path text)
                    '''))

                cursor.executemany(self._sql(
                    'INSERT INTO temp.%(bulkinsert)s VALUES (?)'),
                    ((db.relpath(x),) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 5)
                    task.update(0, condition='adding file names to database')

                cursor.execute(self._sql(
                    '''
                    INSERT OR IGNORE INTO files
                    SELECT NULL, path, NULL, NULL, NULL
                    FROM temp.%(bulkinsert)s
                    '''))

                if show_progress:
                    task.update(1, condition='pruning stale information')

                cursor.execute(self._sql(
                    '''
                    DELETE FROM %(db)s.%(file_states)s
                    WHERE file_id IN (
                        SELECT files.file_id
                            FROM temp.%(bulkinsert)s
                            INNER JOIN files
                            ON temp.%(bulkinsert)s.path == files.path)
                        AND ( kind_mask != ? OR format != ? )
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(2, condition='adding file names to selection')

                cursor.execute(self._sql(
                    '''
                    INSERT OR IGNORE INTO %(db)s.%(file_states)s
                    SELECT files.file_id, 0, ?, ?
                    FROM temp.%(bulkinsert)s
                    INNER JOIN files
                    ON temp.%(bulkinsert)s.path == files.path
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(3, condition='updating file states')

                cursor.execute(self._sql(
                    '''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 1
                    WHERE file_id IN (
                        SELECT files.file_id
                            FROM temp.%(bulkinsert)s
                            INNER JOIN files
                            ON temp.%(bulkinsert)s.path == files.path)
                        AND file_state != 0
                    '''))

                if show_progress:
                    task.update(4, condition='dropping temporary data')

                cursor.execute(self._sql(
                    'DROP TABLE temp.%(bulkinsert)s'))

                if show_progress:
                    task.update(5)
                    task.done()

    def remove(self, paths):
        '''
        Remove files from the selection.

        :param paths:
            Paths to files to be removed from the selection.
        :type paths:
            :py:class:`list` of :py:class:`str`, or a single :py:class:`str`
        '''
        if isinstance(paths, str):
            paths = [paths]

        db = self.get_database()

        def normpath(path):
            return db.relpath(abspath(path))

        with self.transaction('remove files') as cursor:
            cursor.executemany(self._sql(
                '''
                DELETE FROM %(db)s.%(file_states)s
                WHERE %(db)s.%(file_states)s.file_id IN
                    (SELECT files.file_id
                     FROM files
                     WHERE files.path == ?)
                '''), ((normpath(path),) for path in paths))

    def iter_paths(self, raw=False):
        '''
        Iterate over all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are yielded. Set to ``True`` to yield
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :yields: File paths.
        '''

        sql = self._sql('''
            SELECT
                files.path
            FROM %(db)s.%(file_states)s
            INNER JOIN files
            ON files.file_id = %(db)s.%(file_states)s.file_id
            ORDER BY %(db)s.%(file_states)s.file_id
        ''')

        if raw:
            def trans(path):
                return path
        else:
            db = self.get_database()
            trans = db.abspath

        for values in self._conn.execute(sql):
            yield trans(values[0])

    def get_paths(self, raw=False):
        '''
        Get all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are returned. Set to ``True`` to return
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :returns: List of file paths.
        '''
        return list(self.iter_paths(raw=raw))

    def _set_file_states_known(self, transaction=None):
        '''
        Set file states to "known" (2).

        :param transaction:
            Transaction context to use. If ``None``, a new one is opened.
        '''
        with (transaction or self.transaction('set file states known')) \
                as cursor:
            cursor.execute(self._sql(
                '''
                UPDATE %(db)s.%(file_states)s
                SET file_state = 2
                WHERE file_state < 2
                '''))

    def _set_file_states_force_check(self, paths=None, transaction=None):
        '''
        Set file states to "request force check" (1).

        :param paths:
            Paths of the files to flag. If ``None``, all files in the
            selection are flagged.
        :param transaction:
            Transaction context to use. If ``None``, a new one is opened.
        '''

        with (transaction or self.transaction('set file states force check')) \
                as cursor:

            if paths is None:
                cursor.execute(self._sql(
                    '''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 1
                    '''))
            else:
                db = self.get_database()

                def normpath(path):
                    return db.relpath(abspath(path))

                cursor.executemany(self._sql(
                    '''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 1
                    WHERE %(db)s.%(file_states)s.file_id IN
                        (SELECT files.file_id
                         FROM files
                         WHERE files.path == ?)
                    '''), ((normpath(path),) for path in paths))

    def undig_grouped(self, skip_unchanged=False):
        '''
        Get inventory of cached content for all files in the selection.

        :param skip_unchanged:
            If ``True`` only inventory of modified files is
            yielded (:py:meth:`flag_modified` must be called beforehand).
        :type skip_unchanged:
            bool

        :returns:
            Single-pass iterable supporting :py:func:`len` (the number of
            files), yielding tuples ``((format, path), nuts)`` where ``path``
            is the path to the file, ``format`` is the format assignation or
            ``'detect'`` and ``nuts`` is a list of
            :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
            contents of the file.
        '''

        if skip_unchanged:
            where = '''
                WHERE %(db)s.%(file_states)s.file_state == 0
            '''
        else:
            where = ''

        nfiles = execute_get1(self._conn, self._sql('''
            SELECT
                COUNT()
            FROM %(db)s.%(file_states)s
        ''' + where), ())[0]

        def gen():
            sql = self._sql('''
                SELECT
                    %(db)s.%(file_states)s.format,
                    files.path,
                    files.format,
                    files.mtime,
                    files.size,
                    nuts.file_segment,
                    nuts.file_element,
                    kind_codes.kind_id,
                    kind_codes.codes,
                    nuts.tmin_seconds,
                    nuts.tmin_offset,
                    nuts.tmax_seconds,
                    nuts.tmax_offset,
                    kind_codes.deltat
                FROM %(db)s.%(file_states)s
                LEFT OUTER JOIN files
                    ON %(db)s.%(file_states)s.file_id = files.file_id
                LEFT OUTER JOIN nuts
                    ON files.file_id = nuts.file_id
                LEFT OUTER JOIN kind_codes
                    ON nuts.kind_codes_id == kind_codes.kind_codes_id
            ''' + where + '''
                ORDER BY %(db)s.%(file_states)s.file_id
            ''')

            nuts = []
            format_path = None
            db = self.get_database()
            for values in self._conn.execute(sql):
                apath = db.abspath(values[1])
                # Rows are ordered by file, so a change of path closes the
                # group of the previous file.
                if format_path is not None and apath != format_path[1]:
                    yield format_path, nuts
                    nuts = []

                format_path = values[0], apath

                # Rows with a NULL files.format add no nut (presumably files
                # not indexed yet); the file itself is still yielded.
                if values[2] is not None:
                    nuts.append(model.Nut(
                        values_nocheck=format_path[1:2] + values[2:]))

            if format_path is not None:
                yield format_path, nuts

        return GeneratorWithLen(gen(), nfiles)

    def flag_modified(self, check=True):
        '''
        Mark files which have been modified.

        :param check:
            If ``True`` query modification times of known files on disk. If
            ``False``, only flag unknown files.
        :type check:
            bool

        Assumes file state is 0 for newly added files, 1 for files added again
        to the selection (forces check), or 2 for all others (no checking is
        done for those).

        Sets file state to 0 for unknown or modified files, 2 for known and not
        modified files.
        '''

        db = self.get_database()
        with self.transaction('flag modified') as cursor:
            # Files flagged for checking but never indexed (no mtime stored)
            # are treated as modified.
            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = 0
                WHERE (
                    SELECT mtime
                    FROM files
                    WHERE
                        files.file_id == %(db)s.%(file_states)s.file_id) IS NULL
                    AND file_state == 1
            ''')

            cursor.execute(sql)

            if not check:

                sql = self._sql('''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state == 1
                ''')

                cursor.execute(sql)

                return

            def iter_file_states():
                sql = self._sql('''
                    SELECT
                        files.file_id,
                        files.path,
                        files.format,
                        files.mtime,
                        files.size
                    FROM %(db)s.%(file_states)s
                    INNER JOIN files
                        ON %(db)s.%(file_states)s.file_id == files.file_id
                    WHERE %(db)s.%(file_states)s.file_state == 1
                    ORDER BY %(db)s.%(file_states)s.file_id
                ''')

                for (file_id, path, fmt, mtime_db,
                        size_db) in self._conn.execute(sql):

                    path = db.abspath(path)
                    try:
                        mod = io.get_backend(fmt)
                        file_stats = mod.get_stats(path)

                    except FileLoadError:
                        yield 0, file_id
                        continue
                    except io.UnknownFormat:
                        continue

                    if (mtime_db, size_db) != file_stats:
                        yield 0, file_id
                    else:
                        yield 2, file_id

            # could better use callback function here...

            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = ?
                WHERE file_id = ?
            ''')

            cursor.executemany(sql, iter_file_states())
# Public API of this module.
__all__ = ['Selection']