1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import os
9import re
10import threading
11import logging
13from pyrocko import util
14from pyrocko.io.io_common import FileLoadError
15from pyrocko.progress import progress
17from . import error, io, model
18from .database import Database, get_database, execute_get1, abspath
# Module-level logger; progress tasks created by make_task() report to it.
logger = logging.getLogger('psq.selection')

# Process-wide counter consumed by make_unique_name(). Access is serialized
# through g_lock.
g_icount = 0
g_lock = threading.Lock()

# Valid names for persistent selections: an identifier-like string of at most
# 65 characters (one leading letter or underscore plus up to 64 letters,
# digits or underscores).
#
# The pattern is anchored with ``\Z`` rather than ``$``: ``$`` also matches
# just before a trailing newline, so a name like ``'abc\n'`` would be accepted
# and end up inside SQL table names.
re_persistent_name = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]{0,64}\Z')
def make_unique_name():
    '''
    Create a name which is unique within the running process.

    The name is built from the process id and the current value of the
    module-level counter ``g_icount``. The counter is read and incremented
    while holding ``g_lock``.

    :returns: Name of the form ``'<pid>_<counter>'``.
    '''
    global g_icount

    pid = os.getpid()
    with g_lock:
        icount = g_icount
        g_icount = icount + 1

    return '%i_%i' % (pid, icount)
def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are forwarded unchanged to
    ``progress.task``.

    :returns: Whatever ``progress.task`` returns.
    '''
    task = progress.task(*args, logger=logger)
    return task
# Reusable documentation fragments. filldocs() substitutes them into
# docstrings containing ``%(query_args)s`` or ``%(file_formats)s``.
doc_snippets = {
    'query_args': '''
        :param obj:
            Object providing ``tmin``, ``tmax`` and ``codes`` to be used to
            constrain the query. Direct arguments override those from ``obj``.
        :type obj:
            any object with attributes ``tmin``, ``tmax`` and ``codes``

        :param tmin:
            Start time of query interval.
        :type tmin:
            timestamp

        :param tmax:
            End time of query interval.
        :type tmax:
            timestamp

        :param time:
            Time instant to query. Equivalent to setting ``tmin`` and ``tmax``
            to the same value.
        :type time:
            timestamp

        :param codes:
            Pattern of content codes to query.
        :type codes:
            :py:class:`tuple` of :py:class:`str`
''',
    # Comma separated, quoted list of the format names reported by the io
    # subpackage.
    'file_formats': ', '.join(
        "``'%s'``" % fmt for fmt in io.supported_formats()),
}
def filldocs(meth):
    '''
    Decorator substituting ``doc_snippets`` into the docstring of ``meth``.

    The docstring is used as a ``%``-format template with the entries of the
    module-level ``doc_snippets`` mapping, so any literal percent sign in a
    decorated docstring must be written as ``%%``.

    :param meth:
        Function or method whose docstring should be completed.

    :returns: ``meth`` itself, with its ``__doc__`` updated in place.
    '''
    # The docstring is None for undocumented functions and when running under
    # ``python -OO``; formatting None would raise TypeError at import time.
    if meth.__doc__ is not None:
        meth.__doc__ %= doc_snippets

    return meth
class GeneratorWithLen(object):

    '''
    Wrap an iterable together with an externally known length.

    This allows consumers to call :py:func:`len` on something which is
    otherwise only iterable, e.g. a generator.

    :param gen:
        Iterable or iterator to be wrapped.

    :param length:
        Value to be reported by :py:func:`len`. It is not checked against the
        actual number of elements.
    :type length:
        int
    '''

    def __init__(self, gen, length):
        self.gen = gen
        self.length = length

    def __len__(self):
        return self.length

    def __iter__(self):
        # __iter__ must return an iterator. iter() hands back generators and
        # other iterators unchanged but also makes plain iterables (e.g.
        # lists) work instead of failing with TypeError.
        return iter(self.gen)
class Selection(object):

    '''
    Database backed file selection (base class for
    :py:class:`~pyrocko.squirrel.base.Squirrel`).

    :param database:
        Database instance or file path to database.
    :type database:
        :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`

    :param persistent:
        If given a name, create a persistent selection.
    :type persistent:
        :py:class:`str`

    :raises:
        :py:exc:`TypeError` if ``persistent`` is given but is not a
        :py:class:`str`, ``SquirrelError`` if it is not a valid selection
        name.

    A selection in this context represents the list of files available to the
    application. Instead of using :py:class:`Selection` directly, user
    applications should usually use its subclass
    :py:class:`~pyrocko.squirrel.base.Squirrel` which adds content indices to
    the selection and provides high level data querying.

    By default, a temporary table in the database is created to hold the names
    of the files in the selection. This table is only visible inside the
    application which created it. If a name is given to ``persistent``, a named
    selection is created, which is visible also in other applications using the
    same database.

    Besides the filename references, desired content kind masks and file format
    indications are stored in the selection's database table to make the user
    choice regarding these options persistent on a per-file basis. Book-keeping
    on whether files are unknown, known or if modification checks are forced is
    handled in the selection's file-state table.

    Paths of files can be added to the selection using the :py:meth:`add`
    method and removed with :py:meth:`remove`. :py:meth:`undig_grouped` can be
    used to iterate over all content known to the selection.
    '''

    def __init__(self, database, persistent=None):
        # Set first, so that __del__ finds the attribute even when
        # initialization fails early.
        self._conn = None

        if not isinstance(database, Database):
            database = get_database(database)

        if persistent is not None:
            # Explicit check rather than ``assert``: assertions are stripped
            # when running with ``python -O``.
            if not isinstance(persistent, str):
                raise TypeError(
                    'persistent selection name must be a str, not %s'
                    % type(persistent).__name__)

            if not re_persistent_name.match(persistent):
                raise error.SquirrelError(
                    'invalid persistent selection name: %s' % persistent)

            self.name = 'psel_' + persistent
        else:
            self.name = 'sel_' + make_unique_name()

        self._persistent = persistent
        self._database = database

        # Names substituted into the SQL templates by _sql(). Built before the
        # connection is obtained so that they are available to _delete() even
        # if the init transaction below fails.
        self._names = {
            'db': 'main' if self._persistent else 'temp',
            'file_states': self.name + '_file_states',
            'bulkinsert': self.name + '_bulkinsert'}

        self._conn = self._database.get_connection()
        self._sources = []
        self._is_new = True
        self._volatile_paths = []

        with self.transaction('init selection') as cursor:

            if persistent is not None:
                # INSERT OR IGNORE affects one row only if the name has not
                # been registered before.
                self._is_new = 1 == cursor.execute(
                    '''
                        INSERT OR IGNORE INTO persistent VALUES (?)
                    ''', (persistent,)).rowcount

            cursor.execute(self._register_table(self._sql(
                '''
                    CREATE TABLE IF NOT EXISTS %(db)s.%(file_states)s (
                        file_id integer PRIMARY KEY,
                        file_state integer,
                        kind_mask integer,
                        format text)
                ''')))

            cursor.execute(self._sql(
                '''
                    CREATE INDEX
                    IF NOT EXISTS %(db)s.%(file_states)s_index_file_state
                    ON %(file_states)s (file_state)
                '''))

    def __del__(self):
        # Only act if initialization got as far as obtaining a connection and
        # the selection has not been deleted already (_delete() resets
        # _conn to None).
        if hasattr(self, '_conn') and self._conn:
            self._cleanup()
            if not self._persistent:
                self._delete()

    def _register_table(self, s):
        '''
        Pass an SQL statement through the database's ``_register_table``.

        :returns: The return value of ``Database._register_table``.
        '''
        return self._database._register_table(s)

    def _sql(self, s):
        '''
        Fill the selection's table names into an SQL template.

        Replaces ``%(db)s``, ``%(file_states)s`` and ``%(bulkinsert)s`` in
        ``s``.
        '''
        return s % self._names

    def transaction(self, label='', mode='immediate'):
        '''
        Open a transaction on the selection's database.

        Arguments are forwarded to the ``transaction`` method of the database
        object. The result is used as a context manager yielding a cursor.
        '''
        return self._database.transaction(label, mode)

    def is_new(self):
        '''
        Is this a new selection?

        Always ``True`` for non-persistent selections. Only ``False`` for
        a persistent selection which already existed in the database when
        it was initialized.
        '''
        return self._is_new

    def get_database(self):
        '''
        Get the database to which this selection belongs.

        :returns: :py:class:`~pyrocko.squirrel.database.Database` object
        '''
        return self._database

    def _cleanup(self):
        '''
        Perform cleanup actions before database connection is closed.

        Removes volatile content from database.
        '''

        while self._volatile_paths:
            path = self._volatile_paths.pop()
            self._database.remove(path)

    def _delete(self):
        '''
        Destroy the tables associated with this selection.

        For persistent selections, the name is also removed from the
        ``persistent`` table.
        '''
        with self.transaction('delete selection') as cursor:
            cursor.execute(self._sql(
                'DROP TABLE %(db)s.%(file_states)s'))

            if self._persistent:
                cursor.execute(
                    '''
                        DELETE FROM persistent WHERE name == ?
                    ''', (self._persistent,))

        # Marks the selection as deleted, see __del__.
        self._conn = None

    def delete(self):
        '''
        Destroy the tables associated with this selection.
        '''
        self._delete()

    @filldocs
    def add(
            self,
            paths,
            kind_mask=model.g_kind_mask_all,
            format='detect',
            show_progress=True):

        '''
        Add files to the selection.

        :param paths:
            Paths to files to be added to the selection.
        :type paths:
            iterator yielding :py:class:`str` objects

        :param kind_mask:
            Content kinds to be added to the selection.
        :type kind_mask:
            :py:class:`int` (bit mask)

        :param format:
            File format identifier or ``'detect'`` to enable auto-detection
            (available: %(file_formats)s).
        :type format:
            str

        :param show_progress:
            If ``True``, report progress through progress tasks.
        :type show_progress:
            bool
        '''

        # Up to this many paths are handled with per-path statements, longer
        # inputs go through a temporary bulk-insert table.
        nshort = 200

        if isinstance(paths, str):
            paths = [paths]

        if show_progress:
            task = make_task('Gathering file names')
            paths = task(paths)

        # NOTE(review): presumably returns a list when at most nshort paths
        # are given and an iterator otherwise - confirm in pyrocko.util.
        paths = util.short_to_list(nshort, paths)

        db = self.get_database()
        with self.transaction('add files') as cursor:

            if isinstance(paths, list) and len(paths) <= nshort:

                paths = [db.relpath(path) for path in paths]

                # short non-iterator paths: can do without temp table

                cursor.executemany(
                    '''
                        INSERT OR IGNORE INTO files
                        VALUES (NULL, ?, NULL, NULL, NULL)
                    ''', ((x,) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 3)
                    task.update(0, condition='pruning stale information')

                # Drop entries which were added before with a different kind
                # mask or format, they are re-inserted below.
                cursor.executemany(self._sql(
                    '''
                        DELETE FROM %(db)s.%(file_states)s
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM files
                                WHERE files.path == ? )
                            AND ( kind_mask != ? OR format != ? )
                    '''), (
                        (path, kind_mask, format) for path in paths))

                if show_progress:
                    task.update(1, condition='adding file names to selection')

                # New entries start with file state 0, existing entries are
                # left untouched (OR IGNORE).
                cursor.executemany(self._sql(
                    '''
                        INSERT OR IGNORE INTO %(db)s.%(file_states)s
                        SELECT files.file_id, 0, ?, ?
                            FROM files
                            WHERE files.path = ?
                    '''), ((kind_mask, format, path) for path in paths))

                if show_progress:
                    task.update(2, condition='updating file states')

                # Entries which already were in the selection (state != 0)
                # get state 1.
                cursor.executemany(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM files
                                WHERE files.path == ? )
                            AND file_state != 0
                    '''), ((path,) for path in paths))

                if show_progress:
                    task.update(3)
                    task.done()

            else:

                # Same steps as above, but paths are first collected in a
                # temporary table so that each step is a single statement.
                cursor.execute(self._sql(
                    '''
                        CREATE TEMP TABLE temp.%(bulkinsert)s
                        (path text)
                    '''))

                cursor.executemany(self._sql(
                    'INSERT INTO temp.%(bulkinsert)s VALUES (?)'),
                    ((db.relpath(x),) for x in paths))

                if show_progress:
                    task = make_task('Preparing database', 5)
                    task.update(0, condition='adding file names to database')

                cursor.execute(self._sql(
                    '''
                        INSERT OR IGNORE INTO files
                        SELECT NULL, path, NULL, NULL, NULL
                        FROM temp.%(bulkinsert)s
                    '''))

                if show_progress:
                    task.update(1, condition='pruning stale information')

                cursor.execute(self._sql(
                    '''
                        DELETE FROM %(db)s.%(file_states)s
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM temp.%(bulkinsert)s
                                INNER JOIN files
                                ON temp.%(bulkinsert)s.path == files.path)
                            AND ( kind_mask != ? OR format != ? )
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(2, condition='adding file names to selection')

                cursor.execute(self._sql(
                    '''
                        INSERT OR IGNORE INTO %(db)s.%(file_states)s
                        SELECT files.file_id, 0, ?, ?
                        FROM temp.%(bulkinsert)s
                        INNER JOIN files
                        ON temp.%(bulkinsert)s.path == files.path
                    '''), (kind_mask, format))

                if show_progress:
                    task.update(3, condition='updating file states')

                cursor.execute(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE file_id IN (
                            SELECT files.file_id
                                FROM temp.%(bulkinsert)s
                                INNER JOIN files
                                ON temp.%(bulkinsert)s.path == files.path)
                            AND file_state != 0
                    '''))

                if show_progress:
                    task.update(4, condition='dropping temporary data')

                cursor.execute(self._sql(
                    'DROP TABLE temp.%(bulkinsert)s'))

                if show_progress:
                    task.update(5)
                    task.done()

    def remove(self, paths):
        '''
        Remove files from the selection.

        :param paths:
            Paths to files to be removed from the selection.
        :type paths:
            :py:class:`list` of :py:class:`str`
        '''
        if isinstance(paths, str):
            paths = [paths]

        db = self.get_database()

        def normpath(path):
            # Convert to the representation used in the ``files`` table.
            return db.relpath(abspath(path))

        with self.transaction('remove files') as cursor:
            cursor.executemany(self._sql(
                '''
                    DELETE FROM %(db)s.%(file_states)s
                    WHERE %(db)s.%(file_states)s.file_id IN
                        (SELECT files.file_id
                         FROM files
                         WHERE files.path == ?)
                '''), ((normpath(path),) for path in paths))

    def iter_paths(self, raw=False):
        '''
        Iterate over all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are yielded. Set to ``True`` to yield
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :yields: File paths.
        '''

        sql = self._sql('''
            SELECT
                files.path
            FROM %(db)s.%(file_states)s
            INNER JOIN files
            ON files.file_id = %(db)s.%(file_states)s.file_id
            ORDER BY %(db)s.%(file_states)s.file_id
        ''')

        if raw:
            def trans(path):
                return path
        else:
            db = self.get_database()
            trans = db.abspath

        for values in self._conn.execute(sql):
            yield trans(values[0])

    def get_paths(self, raw=False):
        '''
        Get all file paths currently belonging to the selection.

        :param raw:
            By default absolute paths are returned. Set to ``True`` to return
            the path as it is stored in the database, which can be relative or
            absolute, depending on whether the file is within a Squirrel
            environment or outside.
        :type raw:
            bool

        :returns: List of file paths.
        '''
        return list(self.iter_paths(raw=raw))

    def _set_file_states_known(self, transaction=None):
        '''
        Set file states to "known" (2).

        :param transaction:
            Transaction to be used. If not given, a new one is started.
        '''
        with (transaction or self.transaction('set file states known')) \
                as cursor:
            cursor.execute(self._sql(
                '''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state < 2
                '''))

    def _set_file_states_force_check(self, paths=None, transaction=None):
        '''
        Set file states to "request force check" (1).

        :param paths:
            Restrict the change to these files. By default, all files in the
            selection are affected.

        :param transaction:
            Transaction to be used. If not given, a new one is started.
        '''

        with (transaction or self.transaction('set file states force check')) \
                as cursor:

            if paths is None:
                cursor.execute(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                    '''))
            else:
                db = self.get_database()

                def normpath(path):
                    # Convert to the representation used in the ``files``
                    # table.
                    return db.relpath(abspath(path))

                cursor.executemany(self._sql(
                    '''
                        UPDATE %(db)s.%(file_states)s
                        SET file_state = 1
                        WHERE %(db)s.%(file_states)s.file_id IN
                            (SELECT files.file_id
                             FROM files
                             WHERE files.path == ?)
                    '''), ((normpath(path),) for path in paths))

    def undig_grouped(self, skip_unchanged=False):
        '''
        Get inventory of cached content for all files in the selection.

        :param skip_unchanged:
            If ``True`` only inventory of modified files is
            yielded (:py:meth:`flag_modified` must be called beforehand).
        :type skip_unchanged:
            bool

        This generator yields tuples ``((format, path), nuts)`` where ``path``
        is the path to the file, ``format`` is the format assignation or
        ``'detect'`` and ``nuts`` is a list of
        :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
        contents of the file.
        '''

        if skip_unchanged:
            where = '''
                WHERE %(db)s.%(file_states)s.file_state == 0
            '''
        else:
            where = ''

        # Number of files to be visited, reported as length of the returned
        # object.
        nfiles = execute_get1(self._conn, self._sql('''
            SELECT
                COUNT()
            FROM %(db)s.%(file_states)s
        ''' + where), ())[0]

        def gen():
            sql = self._sql('''
                SELECT
                    %(db)s.%(file_states)s.format,
                    files.path,
                    files.format,
                    files.mtime,
                    files.size,
                    nuts.file_segment,
                    nuts.file_element,
                    kind_codes.kind_id,
                    kind_codes.codes,
                    nuts.tmin_seconds,
                    nuts.tmin_offset,
                    nuts.tmax_seconds,
                    nuts.tmax_offset,
                    kind_codes.deltat
                FROM %(db)s.%(file_states)s
                LEFT OUTER JOIN files
                    ON %(db)s.%(file_states)s.file_id = files.file_id
                LEFT OUTER JOIN nuts
                    ON files.file_id = nuts.file_id
                LEFT OUTER JOIN kind_codes
                    ON nuts.kind_codes_id == kind_codes.kind_codes_id
            ''' + where + '''
                ORDER BY %(db)s.%(file_states)s.file_id
            ''')

            nuts = []
            format_path = None
            db = self.get_database()

            # Rows are ordered by file, so all rows of one file are
            # consecutive. A group is emitted whenever the path changes.
            for values in self._conn.execute(sql):
                apath = db.abspath(values[1])
                if format_path is not None and apath != format_path[1]:
                    yield format_path, nuts
                    nuts = []

                format_path = values[0], apath

                # files.format is NULL for files without cached content, no
                # nut is created in that case.
                if values[2] is not None:
                    nuts.append(model.Nut(
                        values_nocheck=format_path[1:2] + values[2:]))

            # Emit the last group.
            if format_path is not None:
                yield format_path, nuts

        return GeneratorWithLen(gen(), nfiles)

    def flag_modified(self, check=True):
        '''
        Mark files which have been modified.

        :param check:
            If ``True`` query modification times of known files on disk. If
            ``False``, only flag unknown files.
        :type check:
            bool

        Assumes file state is 0 for newly added files, 1 for files added again
        to the selection (forces check), or 2 for all others (no checking is
        done for those).

        Sets file state to 0 for unknown or modified files, 2 for known and not
        modified files.
        '''

        db = self.get_database()
        with self.transaction('flag modified') as cursor:

            # Files requested to be checked (1) but without a recorded
            # modification time are treated as unknown (0).
            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = 0
                WHERE (
                    SELECT mtime
                    FROM files
                    WHERE
                      files.file_id == %(db)s.%(file_states)s.file_id) IS NULL
                    AND file_state == 1
            ''')

            cursor.execute(sql)

            if not check:

                # Without checking, the remaining candidates are considered
                # known and unmodified.
                sql = self._sql('''
                    UPDATE %(db)s.%(file_states)s
                    SET file_state = 2
                    WHERE file_state == 1
                ''')

                cursor.execute(sql)

                return

            def iter_file_states():
                # Yields (new_state, file_id) for the files in state 1.
                sql = self._sql('''
                    SELECT
                        files.file_id,
                        files.path,
                        files.format,
                        files.mtime,
                        files.size
                    FROM %(db)s.%(file_states)s
                    INNER JOIN files
                        ON %(db)s.%(file_states)s.file_id == files.file_id
                    WHERE %(db)s.%(file_states)s.file_state == 1
                    ORDER BY %(db)s.%(file_states)s.file_id
                ''')

                for (file_id, path, fmt, mtime_db,
                        size_db) in self._conn.execute(sql):

                    path = db.abspath(path)
                    try:
                        mod = io.get_backend(fmt)
                        file_stats = mod.get_stats(path)

                    except FileLoadError:
                        # Stats could not be obtained: flag as modified.
                        yield 0, file_id
                        continue
                    except io.UnknownFormat:
                        # Nothing is yielded, so the file state is left
                        # unchanged.
                        continue

                    if (mtime_db, size_db) != file_stats:
                        yield 0, file_id
                    else:
                        yield 2, file_id

            # could better use callback function here...

            sql = self._sql('''
                UPDATE %(db)s.%(file_states)s
                SET file_state = ?
                WHERE file_id = ?
            ''')

            cursor.executemany(sql, iter_file_states())
# Names exported by ``from <this module> import *``.
__all__ = [
    'Selection',
]