1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import sys
9import os
11import math
12import logging
13import threading
14import queue
15from collections import defaultdict
17from pyrocko.guts import Object, Int, List, Tuple, String, Timestamp, Dict
18from pyrocko import util, trace
19from pyrocko.progress import progress
21from . import model, io, cache, dataset
23from .model import to_kind_id, WaveformOrder, to_kind, to_codes, \
24 STATION, CHANNEL, RESPONSE, EVENT, WAVEFORM
25from .client import fdsn, catalog
26from .selection import Selection, filldocs
27from .database import abspath
28from . import client, environment, error
30logger = logging.getLogger('psq.base')
32guts_prefix = 'squirrel'
def make_task(*args):
    '''
    Create a :py:mod:`pyrocko.progress` task reporting through this module's
    logger.
    '''
    return progress.task(*args, logger=logger)
def lpick(condition, seq):
    '''
    Partition *seq* into two lists by *condition*.

    Returns a tuple ``(nomatch, match)`` where ``nomatch`` holds the elements
    for which *condition* is falsy and ``match`` those for which it is truthy.
    '''
    nomatch, match = [], []
    for element in seq:
        if condition(element):
            match.append(element)
        else:
            nomatch.append(element)

    return (nomatch, match)
def codes_patterns_for_kind(kind, codes):
    '''
    Expand user-given codes into the list of patterns to match in queries.

    Accepts a single codes tuple/string or a sequence of them (handled by
    recursion). For kind ``'station'``, an additional pattern with wildcard
    location is appended.
    '''
    if not codes:
        return []

    # A sequence of codes entries: expand each one recursively.
    if not isinstance(codes[0], str):
        patterns = []
        for subcodes in codes:
            patterns.extend(codes_patterns_for_kind(kind, subcodes))

        return patterns

    # Events and undefined content have no structured codes.
    if kind in ('event', 'undefined'):
        return [codes]

    codes = to_codes(to_kind_id(kind), codes)

    if kind == 'station':
        # Also match channel-style codes of the same station, any location.
        return [codes, codes.replace(location='[*]')]

    return [codes]
def blocks(tmin, tmax, deltat, nsamples_block=100000):
    '''
    Yield consecutive time block intervals covering ``[tmin, tmax]``.

    Block length is ``deltat * nsamples_block``; blocks are aligned to
    integer multiples of that length.
    '''
    tblock = util.to_time_float(deltat * nsamples_block)
    kmin = int(math.floor(tmin / tblock))
    kmax = int(math.ceil(tmax / tblock))
    for k in range(kmin, kmax):
        yield k * tblock, (k + 1) * tblock
def gaps(avail, tmin, tmax):
    '''
    Compute the gaps within ``[tmin, tmax]`` not covered by *avail*.

    :param avail: list of available ``(tmin_a, tmax_a)`` intervals
    :returns: list of ``(tmin_g, tmax_g)`` gap intervals

    Implemented as a sweep over interval start/end events; coverage count
    drops to zero exactly inside a gap.
    '''
    assert tmin < tmax

    # Seed with the query window inverted: leaving it opens a "gap".
    events = [(tmax, 1), (tmin, -1)]
    for tmin_a, tmax_a in avail:
        assert tmin_a < tmax_a
        events.append((tmin_a, 1))
        events.append((tmax_a, -1))

    events.sort()

    coverage = 1
    result = []
    tmin_g = None
    for t, step in events:
        if coverage == 1 and step == -1:
            tmin_g = t
        elif coverage == 0 and step == 1 and tmin_g is not None:
            # Zero-length gaps are dropped.
            if tmin_g != t:
                result.append((tmin_g, t))

        coverage += step

    return result
def order_key(order):
    '''
    Grouping/sorting key for waveform orders: codes, then time span.
    '''
    return order.codes, order.tmin, order.tmax
class Batch(object):
    '''
    Container for the results of one time window in window-wise waveform
    extraction.

    One instance per window is yielded by the
    :py:meth:`Squirrel.chopper_waveforms` method.

    *Attributes:*

    .. py:attribute:: tmin

        Start of this time window.

    .. py:attribute:: tmax

        End of this time window.

    .. py:attribute:: i

        Index of this time window in sequence.

    .. py:attribute:: n

        Total number of time windows in sequence.

    .. py:attribute:: traces

        Extracted waveforms for this time window.
    '''

    def __init__(self, tmin, tmax, i, n, traces):
        # Passive record: plain attribute assignment only.
        self.tmin = tmin
        self.tmax = tmax
        self.i = i
        self.n = n
        self.traces = traces
144class Squirrel(Selection):
145 '''
146 Prompt, lazy, indexing, caching, dynamic seismological dataset access.
148 :param env:
149 Squirrel environment instance or directory path to use as starting
150 point for its detection. By default, the current directory is used as
151 starting point. When searching for a usable environment the directory
152 ``'.squirrel'`` or ``'squirrel'`` in the current (or starting point)
153 directory is used if it exists, otherwise the parent directories are
        searched upwards for the existence of such a directory. If no such
155 directory is found, the user's global Squirrel environment
156 ``'$HOME/.pyrocko/squirrel'`` is used.
157 :type env:
158 :py:class:`~pyrocko.squirrel.environment.Environment` or
159 :py:class:`str`
161 :param database:
162 Database instance or path to database. By default the
163 database found in the detected Squirrel environment is used.
164 :type database:
165 :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`
167 :param cache_path:
168 Directory path to use for data caching. By default, the ``'cache'``
169 directory in the detected Squirrel environment is used.
170 :type cache_path:
171 :py:class:`str`
173 :param persistent:
174 If given a name, create a persistent selection.
175 :type persistent:
176 :py:class:`str`
178 This is the central class of the Squirrel framework. It provides a unified
179 interface to query and access seismic waveforms, station meta-data and
180 event information from local file collections and remote data sources. For
181 prompt responses, a profound database setup is used under the hood. To
182 speed up assemblage of ad-hoc data selections, files are indexed on first
183 use and the extracted meta-data is remembered in the database for
184 subsequent accesses. Bulk data is lazily loaded from disk and remote
185 sources, just when requested. Once loaded, data is cached in memory to
186 expedite typical access patterns. Files and data sources can be dynamically
187 added to and removed from the Squirrel selection at runtime.
189 Queries are restricted to the contents of the files currently added to the
190 Squirrel selection (usually a subset of the file meta-information
191 collection in the database). This list of files is referred to here as the
192 "selection". By default, temporary tables are created in the attached
193 database to hold the names of the files in the selection as well as various
194 indices and counters. These tables are only visible inside the application
195 which created them and are deleted when the database connection is closed
196 or the application exits. To create a selection which is not deleted at
197 exit, supply a name to the ``persistent`` argument of the Squirrel
198 constructor. Persistent selections are shared among applications using the
199 same database.
201 **Method summary**
203 Some of the methods are implemented in :py:class:`Squirrel`'s base class
204 :py:class:`~pyrocko.squirrel.selection.Selection`.
206 .. autosummary::
208 ~Squirrel.add
209 ~Squirrel.add_source
210 ~Squirrel.add_fdsn
211 ~Squirrel.add_catalog
212 ~Squirrel.add_dataset
213 ~Squirrel.add_virtual
214 ~Squirrel.update
215 ~Squirrel.update_waveform_promises
216 ~Squirrel.advance_accessor
217 ~Squirrel.clear_accessor
218 ~Squirrel.reload
219 ~pyrocko.squirrel.selection.Selection.iter_paths
220 ~Squirrel.iter_nuts
221 ~Squirrel.iter_kinds
222 ~Squirrel.iter_deltats
223 ~Squirrel.iter_codes
224 ~pyrocko.squirrel.selection.Selection.get_paths
225 ~Squirrel.get_nuts
226 ~Squirrel.get_kinds
227 ~Squirrel.get_deltats
228 ~Squirrel.get_codes
229 ~Squirrel.get_counts
230 ~Squirrel.get_time_span
231 ~Squirrel.get_deltat_span
232 ~Squirrel.get_nfiles
233 ~Squirrel.get_nnuts
234 ~Squirrel.get_total_size
235 ~Squirrel.get_stats
236 ~Squirrel.get_content
237 ~Squirrel.get_stations
238 ~Squirrel.get_channels
239 ~Squirrel.get_responses
240 ~Squirrel.get_events
241 ~Squirrel.get_waveform_nuts
242 ~Squirrel.get_waveforms
243 ~Squirrel.chopper_waveforms
244 ~Squirrel.get_coverage
245 ~Squirrel.pile
246 ~Squirrel.snuffle
247 ~Squirrel.glob_codes
248 ~pyrocko.squirrel.selection.Selection.get_database
249 ~Squirrel.print_tables
250 '''
    def __init__(
            self, env=None, database=None, cache_path=None, persistent=None):

        # Resolve the environment first: database path, cache path and the
        # persistent flag all default to the detected environment's settings.
        if not isinstance(env, environment.Environment):
            env = environment.get_environment(env)

        if database is None:
            database = env.expand_path(env.database_path)

        if cache_path is None:
            cache_path = env.expand_path(env.cache_path)

        if persistent is None:
            persistent = env.persistent

        # Base class opens the database and sets up the (possibly
        # persistent) selection tables.
        Selection.__init__(
            self, database=database, persistent=persistent)

        self.get_database().set_basepath(os.path.dirname(env.get_basepath()))

        # In-memory content caches; waveforms are cached separately from
        # the other content kinds.
        self._content_caches = {
            'waveform': cache.ContentCache(),
            'default': cache.ContentCache()}

        self._cache_path = cache_path

        self._sources = []  # remote data access clients (see add_source)
        self._operators = []
        self._operator_registry = {}

        self._pile = None  # created on demand — see pile() in method summary
        self._n_choppers_active = 0

        # Per-selection table names, derived from the selection's name so
        # that multiple selections can coexist in one database.
        self._names.update({
            'nuts': self.name + '_nuts',
            'kind_codes_count': self.name + '_kind_codes_count',
            'coverage': self.name + '_coverage'})

        with self.transaction('create tables') as cursor:
            self._create_tables_squirrel(cursor)
    def _create_tables_squirrel(self, cursor):
        '''
        Create this selection's tables, indices and triggers (idempotent).

        Creates in the attached selection database: the ``nuts`` table
        (index entries in the selection), ``kind_codes_count`` (per
        kind/codes counters) and ``coverage`` (time coverage change points).
        Triggers keep counters and coverage consistent under inserts and
        deletes and cascade file deletions/updates to the nuts table.
        '''

        cursor.execute(self._register_table(self._sql(
            '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(nuts)s (
                    nut_id integer PRIMARY KEY,
                    file_id integer,
                    file_segment integer,
                    file_element integer,
                    kind_id integer,
                    kind_codes_id integer,
                    tmin_seconds integer,
                    tmin_offset integer,
                    tmax_seconds integer,
                    tmax_offset integer,
                    kscale integer)
            ''')))

        cursor.execute(self._register_table(self._sql(
            '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(kind_codes_count)s (
                    kind_codes_id integer PRIMARY KEY,
                    count integer)
            ''')))

        # One nut per (file, segment, element) — enforced unique.
        cursor.execute(self._sql(
            '''
                CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(nuts)s_file_element
                    ON %(nuts)s (file_id, file_segment, file_element)
            '''))

        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_file_id
                    ON %(nuts)s (file_id)
            '''))

        # Time-range lookup indices; see _timerange_sql for how the
        # (kind_id, kscale, tmin_seconds) index is exploited.
        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmin_seconds
                    ON %(nuts)s (kind_id, tmin_seconds)
            '''))

        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmax_seconds
                    ON %(nuts)s (kind_id, tmax_seconds)
            '''))

        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_kscale
                    ON %(nuts)s (kind_id, kscale, tmin_seconds)
            '''))

        # Cascade: removing a file removes its nuts from the selection.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts
                BEFORE DELETE ON main.files FOR EACH ROW
                BEGIN
                  DELETE FROM %(nuts)s WHERE file_id == old.file_id;
                END
            '''))

        # trigger only on size to make silent update of mtime possible
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts2
                BEFORE UPDATE OF size ON main.files FOR EACH ROW
                BEGIN
                  DELETE FROM %(nuts)s WHERE file_id == old.file_id;
                END
            '''))

        # Removing a file from the selection removes its nuts as well.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS
                    %(db)s.%(file_states)s_delete_files
                BEFORE DELETE ON %(db)s.%(file_states)s FOR EACH ROW
                BEGIN
                    DELETE FROM %(nuts)s WHERE file_id == old.file_id;
                END
            '''))

        # Maintain per kind/codes counters under insert/delete of nuts.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_inc_kind_codes
                BEFORE INSERT ON %(nuts)s FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO %(kind_codes_count)s VALUES
                    (new.kind_codes_id, 0);
                    UPDATE %(kind_codes_count)s
                    SET count = count + 1
                    WHERE new.kind_codes_id
                        == %(kind_codes_count)s.kind_codes_id;
                END
            '''))

        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_dec_kind_codes
                BEFORE DELETE ON %(nuts)s FOR EACH ROW
                BEGIN
                    UPDATE %(kind_codes_count)s
                    SET count = count - 1
                    WHERE old.kind_codes_id
                        == %(kind_codes_count)s.kind_codes_id;
                END
            '''))

        # Coverage: step counts at interval start/end change points.
        cursor.execute(self._register_table(self._sql(
            '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(coverage)s (
                    kind_codes_id integer,
                    time_seconds integer,
                    time_offset integer,
                    step integer)
            ''')))

        cursor.execute(self._sql(
            '''
                CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(coverage)s_time
                    ON %(coverage)s (kind_codes_id, time_seconds, time_offset)
            '''))

        # Inserting a nut bumps coverage steps at its tmin/tmax; change
        # points whose step cancels to zero are removed again.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_add_coverage
                AFTER INSERT ON %(nuts)s FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (new.kind_codes_id, new.tmin_seconds, new.tmin_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step + 1
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmin_seconds == %(coverage)s.time_seconds
                        AND new.tmin_offset == %(coverage)s.time_offset
                    ;
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (new.kind_codes_id, new.tmax_seconds, new.tmax_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step - 1
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmax_seconds == %(coverage)s.time_seconds
                        AND new.tmax_offset == %(coverage)s.time_offset
                    ;
                    DELETE FROM %(coverage)s
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmin_seconds == %(coverage)s.time_seconds
                        AND new.tmin_offset == %(coverage)s.time_offset
                        AND step == 0
                    ;
                    DELETE FROM %(coverage)s
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmax_seconds == %(coverage)s.time_seconds
                        AND new.tmax_offset == %(coverage)s.time_offset
                        AND step == 0
                    ;
                END
            '''))

        # Inverse of the trigger above, applied on nut deletion.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_remove_coverage
                BEFORE DELETE ON %(nuts)s FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (old.kind_codes_id, old.tmin_seconds, old.tmin_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step - 1
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmin_seconds == %(coverage)s.time_seconds
                        AND old.tmin_offset == %(coverage)s.time_offset
                    ;
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (old.kind_codes_id, old.tmax_seconds, old.tmax_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step + 1
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmax_seconds == %(coverage)s.time_seconds
                        AND old.tmax_offset == %(coverage)s.time_offset
                    ;
                    DELETE FROM %(coverage)s
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmin_seconds == %(coverage)s.time_seconds
                        AND old.tmin_offset == %(coverage)s.time_offset
                        AND step == 0
                    ;
                    DELETE FROM %(coverage)s
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmax_seconds == %(coverage)s.time_seconds
                        AND old.tmax_offset == %(coverage)s.time_offset
                        AND step == 0
                    ;
                END
            '''))
    def _delete(self):
        '''
        Delete database tables associated with this Squirrel.

        Drops the per-selection triggers and tables created by
        :py:meth:`_create_tables_squirrel`, then delegates to the base class
        to remove the selection itself.
        '''

        with self.transaction('delete tables') as cursor:
            # One DROP statement per line; executed individually.
            for s in '''
                    DROP TRIGGER %(db)s.%(nuts)s_delete_nuts;
                    DROP TRIGGER %(db)s.%(nuts)s_delete_nuts2;
                    DROP TRIGGER %(db)s.%(file_states)s_delete_files;
                    DROP TRIGGER %(db)s.%(nuts)s_inc_kind_codes;
                    DROP TRIGGER %(db)s.%(nuts)s_dec_kind_codes;
                    DROP TABLE %(db)s.%(nuts)s;
                    DROP TABLE %(db)s.%(kind_codes_count)s;
                    DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_add_coverage;
                    DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_remove_coverage;
                    DROP TABLE IF EXISTS %(db)s.%(coverage)s;
                    '''.strip().splitlines():

                cursor.execute(self._sql(s))

        Selection._delete(self)
    @filldocs
    def add(self,
            paths,
            kinds=None,
            format='detect',
            include=None,
            exclude=None,
            check=True):

        '''
        Add files to the selection.

        :param paths:
            Iterator yielding paths to files or directories to be added to the
            selection. Recurses into directories. If given a ``str``, it
            is treated as a single path to be added.
        :type paths:
            :py:class:`list` of :py:class:`str`

        :param kinds:
            Content types to be made available through the Squirrel selection.
            By default, all known content types are accepted.
        :type kinds:
            :py:class:`list` of :py:class:`str`

        :param format:
            File format identifier or ``'detect'`` to enable auto-detection
            (available: %(file_formats)s).
        :type format:
            str

        :param include:
            If not ``None``, files are only included if their paths match the
            given regular expression pattern.
        :type include:
            str

        :param exclude:
            If not ``None``, files are only included if their paths do not
            match the given regular expression pattern.
        :type exclude:
            str

        :param check:
            If ``True``, all file modification times are checked to see if
            cached information has to be updated (slow). If ``False``, only
            previously unknown files are indexed and cached information is used
            for known files, regardless of file state (fast, corresponds to
            Squirrel's ``--optimistic`` mode). File deletions will go
            undetected in the latter case.
        :type check:
            bool

        :Complexity:
            O(log N)
        '''

        if isinstance(kinds, str):
            kinds = (kinds,)

        if isinstance(paths, str):
            paths = [paths]

        kind_mask = model.to_kind_mask(kinds)

        # Register files in the selection; 'virtual:' paths bypass the
        # filesystem scan. Then index new/changed files and aggregate their
        # index entries into the selection's nut table.
        with progress.view():
            Selection.add(
                self, util.iter_select_files(
                    paths,
                    show_progress=False,
                    include=include,
                    exclude=exclude,
                    pass_through=lambda path: path.startswith('virtual:')
                ), kind_mask, format)

        self._load(check)
        self._update_nuts()
    def reload(self):
        '''
        Check for modifications and reindex modified files.

        Based on file modification times.
        '''

        # Force state re-check, reindex changed files, then refresh the
        # selection's aggregated index entries.
        self._set_file_states_force_check()
        self._load(check=True)
        self._update_nuts()
604 def add_virtual(self, nuts, virtual_paths=None):
605 '''
606 Add content which is not backed by files.
608 :param nuts:
609 Content pieces to be added.
610 :type nuts:
611 iterator yielding :py:class:`~pyrocko.squirrel.model.Nut` objects
613 :param virtual_paths:
614 List of virtual paths to prevent creating a temporary list of the
615 nuts while aggregating the file paths for the selection.
616 :type virtual_paths:
617 :py:class:`list` of :py:class:`str`
619 Stores to the main database and the selection.
620 '''
622 if isinstance(virtual_paths, str):
623 virtual_paths = [virtual_paths]
625 if virtual_paths is None:
626 if not isinstance(nuts, list):
627 nuts = list(nuts)
628 virtual_paths = set(nut.file_path for nut in nuts)
630 Selection.add(self, virtual_paths)
631 self.get_database().dig(nuts)
632 self._update_nuts()
634 def add_volatile(self, nuts):
635 if not isinstance(nuts, list):
636 nuts = list(nuts)
638 paths = list(set(nut.file_path for nut in nuts))
639 io.backends.virtual.add_nuts(nuts)
640 self.add_virtual(nuts, paths)
641 self._volatile_paths.extend(paths)
643 def add_volatile_waveforms(self, traces):
644 '''
645 Add in-memory waveforms which will be removed when the app closes.
646 '''
648 name = model.random_name()
650 path = 'virtual:volatile:%s' % name
652 nuts = []
653 for itr, tr in enumerate(traces):
654 assert tr.tmin <= tr.tmax
655 tmin_seconds, tmin_offset = model.tsplit(tr.tmin)
656 tmax_seconds, tmax_offset = model.tsplit(
657 tr.tmin + tr.data_len()*tr.deltat)
659 nuts.append(model.Nut(
660 file_path=path,
661 file_format='virtual',
662 file_segment=itr,
663 file_element=0,
664 file_mtime=0,
665 codes=tr.codes,
666 tmin_seconds=tmin_seconds,
667 tmin_offset=tmin_offset,
668 tmax_seconds=tmax_seconds,
669 tmax_offset=tmax_offset,
670 deltat=tr.deltat,
671 kind_id=to_kind_id('waveform'),
672 content=tr))
674 self.add_volatile(nuts)
675 return path
677 def _load(self, check):
678 for _ in io.iload(
679 self,
680 content=[],
681 skip_unchanged=True,
682 check=check):
683 pass
    def _update_nuts(self, transaction=None):
        '''
        Aggregate index entries of newly added files into the selection.

        Copies entries from the global ``nuts`` table into the selection's
        nut table for all files whose state is not 2 (presumably
        'known/up-to-date' — see Selection) and whose content kind is
        enabled in the selection's kind mask, then marks those files known.
        '''
        transaction = transaction or self.transaction('update nuts')
        with make_task('Aggregating selection') as task, \
                transaction as cursor:

            # Report SQLite progress through the task (every 100k VM steps).
            self._conn.set_progress_handler(task.update, 100000)
            nrows = cursor.execute(self._sql(
                '''
                    INSERT INTO %(db)s.%(nuts)s
                    SELECT NULL,
                        nuts.file_id, nuts.file_segment, nuts.file_element,
                        nuts.kind_id, nuts.kind_codes_id,
                        nuts.tmin_seconds, nuts.tmin_offset,
                        nuts.tmax_seconds, nuts.tmax_offset,
                        nuts.kscale
                    FROM %(db)s.%(file_states)s
                    INNER JOIN nuts
                        ON %(db)s.%(file_states)s.file_id == nuts.file_id
                    INNER JOIN kind_codes
                        ON nuts.kind_codes_id ==
                           kind_codes.kind_codes_id
                    WHERE %(db)s.%(file_states)s.file_state != 2
                        AND (((1 << kind_codes.kind_id)
                            & %(db)s.%(file_states)s.kind_mask) != 0)
                ''')).rowcount

            task.update(nrows)
            self._set_file_states_known(transaction)
            self._conn.set_progress_handler(None, 0)
    def add_source(self, source, check=True):
        '''
        Add remote resource.

        :param source:
            Remote data access client instance.
        :type source:
            subclass of :py:class:`~pyrocko.squirrel.client.base.Source`

        :param check:
            Passed on to the source's ``setup`` method.
        :type check:
            bool
        '''

        self._sources.append(source)
        source.setup(self, check=check)
    def add_fdsn(self, *args, **kwargs):
        '''
        Add FDSN site for transparent remote data access.

        Arguments are passed to
        :py:class:`~pyrocko.squirrel.client.fdsn.FDSNSource`. The source is
        registered via :py:meth:`add_source`.
        '''

        self.add_source(fdsn.FDSNSource(*args, **kwargs))
    def add_catalog(self, *args, **kwargs):
        '''
        Add online catalog for transparent event data access.

        Arguments are passed to
        :py:class:`~pyrocko.squirrel.client.catalog.CatalogSource`. The
        source is registered via :py:meth:`add_source`.
        '''

        self.add_source(catalog.CatalogSource(*args, **kwargs))
748 def add_dataset(self, ds, check=True, warn_persistent=True):
749 '''
750 Read dataset description from file and add its contents.
752 :param ds:
753 Path to dataset description file or dataset description object
754 . See :py:mod:`~pyrocko.squirrel.dataset`.
755 :type ds:
756 :py:class:`str` or :py:class:`~pyrocko.squirrel.dataset.Dataset`
758 :param check:
759 If ``True``, all file modification times are checked to see if
760 cached information has to be updated (slow). If ``False``, only
761 previously unknown files are indexed and cached information is used
762 for known files, regardless of file state (fast, corrresponds to
763 Squirrel's ``--optimistic`` mode). File deletions will go
764 undetected in the latter case.
765 :type check:
766 bool
767 '''
768 if isinstance(ds, str):
769 ds = dataset.read_dataset(ds)
770 path = ds
771 else:
772 path = None
774 if warn_persistent and ds.persistent and (
775 not self._persistent or (self._persistent != ds.persistent)):
777 logger.warning(
778 'Dataset `persistent` flag ignored. Can not be set on already '
779 'existing Squirrel instance.%s' % (
780 ' Dataset: %s' % path if path else ''))
782 ds.setup(self, check=check)
784 def _get_selection_args(
785 self, kind_id,
786 obj=None, tmin=None, tmax=None, time=None, codes=None):
788 if codes is not None:
789 codes = to_codes(kind_id, codes)
791 if time is not None:
792 tmin = time
793 tmax = time
795 if obj is not None:
796 tmin = tmin if tmin is not None else obj.tmin
797 tmax = tmax if tmax is not None else obj.tmax
798 codes = codes if codes is not None else obj.codes
800 return tmin, tmax, codes
802 def _selection_args_to_kwargs(
803 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
805 return dict(obj=obj, tmin=tmin, tmax=tmax, time=time, codes=codes)
    def _timerange_sql(self, tmin, tmax, kind, cond, args, naiv):
        '''
        Append SQL conditions and arguments for a time-range query in place.

        :param cond: list of SQL condition fragments, extended in place
        :param args: list of SQL parameters, extended in place
        :param naiv: if ``True``, use a plain (slow) tmin/tmax comparison,
            bypassing the kscale bucketing (for testing)

        In the non-naiv case, one BETWEEN-condition per time-scale bucket
        (``kscale``) is emitted so that the ``(kind_id, kscale,
        tmin_seconds)`` index can be used; bucket size bounds the maximum
        entry duration, which bounds how far back tmin has to be searched.
        '''

        tmin_seconds, tmin_offset = model.tsplit(tmin)
        tmax_seconds, tmax_offset = model.tsplit(tmax)
        if naiv:
            cond.append('%(db)s.%(nuts)s.tmin_seconds <= ?')
            args.append(tmax_seconds)
        else:
            tscale_edges = model.tscale_edges
            tmin_cond = []
            for kscale in range(tscale_edges.size + 1):
                if kscale != tscale_edges.size:
                    tscale = int(tscale_edges[kscale])
                    tmin_cond.append('''
                        (%(db)s.%(nuts)s.kind_id = ?
                         AND %(db)s.%(nuts)s.kscale == ?
                         AND %(db)s.%(nuts)s.tmin_seconds BETWEEN ? AND ?)
                    ''')
                    # Entries in this bucket are at most `tscale` long, so
                    # their tmin cannot be earlier than tmin - tscale - 1.
                    args.extend(
                        (to_kind_id(kind), kscale,
                         tmin_seconds - tscale - 1, tmax_seconds + 1))

                else:
                    # Last bucket is unbounded in duration.
                    tmin_cond.append('''
                        (%(db)s.%(nuts)s.kind_id == ?
                         AND %(db)s.%(nuts)s.kscale == ?
                         AND %(db)s.%(nuts)s.tmin_seconds <= ?)
                    ''')

                    args.extend(
                        (to_kind_id(kind), kscale, tmax_seconds + 1))

            if tmin_cond:
                cond.append(' ( ' + ' OR '.join(tmin_cond) + ' ) ')

        cond.append('%(db)s.%(nuts)s.tmax_seconds >= ?')
        args.append(tmin_seconds)
844 def iter_nuts(
845 self, kind=None, tmin=None, tmax=None, codes=None, naiv=False,
846 kind_codes_ids=None, path=None):
848 '''
849 Iterate over content entities matching given constraints.
851 :param kind:
852 Content kind (or kinds) to extract.
853 :type kind:
854 :py:class:`str`, :py:class:`list` of :py:class:`str`
856 :param tmin:
857 Start time of query interval.
858 :type tmin:
859 timestamp
861 :param tmax:
862 End time of query interval.
863 :type tmax:
864 timestamp
866 :param codes:
867 Pattern of content codes to query.
868 :type codes:
869 :py:class:`tuple` of :py:class:`str`
871 :param naiv:
872 Bypass time span lookup through indices (slow, for testing).
873 :type naiv:
874 :py:class:`bool`
876 :param kind_codes_ids:
877 Kind-codes IDs of contents to be retrieved (internal use).
878 :type kind_codes_ids:
879 :py:class:`list` of :py:class:`int`
881 :yields:
882 :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
883 intersecting content.
885 :complexity:
886 O(log N) for the time selection part due to heavy use of database
887 indices.
889 Query time span is treated as a half-open interval ``[tmin, tmax)``.
890 However, if ``tmin`` equals ``tmax``, the edge logics are modified to
891 closed-interval so that content intersecting with the time instant ``t
892 = tmin = tmax`` is returned (otherwise nothing would be returned as
893 ``[t, t)`` never matches anything).
895 Time spans of content entities to be matched are also treated as half
896 open intervals, e.g. content span ``[0, 1)`` is matched by query span
897 ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``. Also here, logics are
898 modified to closed-interval when the content time span is an empty
899 interval, i.e. to indicate a time instant. E.g. time instant 0 is
900 matched by ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``.
901 '''
903 if not isinstance(kind, str):
904 if kind is None:
905 kind = model.g_content_kinds
906 for kind_ in kind:
907 for nut in self.iter_nuts(kind_, tmin, tmax, codes):
908 yield nut
910 return
912 cond = []
913 args = []
914 if tmin is not None or tmax is not None:
915 assert kind is not None
916 if tmin is None:
917 tmin = self.get_time_span()[0]
918 if tmax is None:
919 tmax = self.get_time_span()[1] + 1.0
921 self._timerange_sql(tmin, tmax, kind, cond, args, naiv)
923 elif kind is not None:
924 cond.append('kind_codes.kind_id == ?')
925 args.append(to_kind_id(kind))
927 if codes is not None:
928 pats = codes_patterns_for_kind(kind, codes)
929 if pats:
930 cond.append(
931 ' ( %s ) ' % ' OR '.join(
932 ('kind_codes.codes GLOB ?',) * len(pats)))
933 args.extend(pat.safe_str for pat in pats)
935 if kind_codes_ids is not None:
936 cond.append(
937 ' ( kind_codes.kind_codes_id IN ( %s ) ) ' % ', '.join(
938 '?'*len(kind_codes_ids)))
940 args.extend(kind_codes_ids)
942 db = self.get_database()
943 if path is not None:
944 cond.append('files.path == ?')
945 args.append(db.relpath(abspath(path)))
947 sql = ('''
948 SELECT
949 files.path,
950 files.format,
951 files.mtime,
952 files.size,
953 %(db)s.%(nuts)s.file_segment,
954 %(db)s.%(nuts)s.file_element,
955 kind_codes.kind_id,
956 kind_codes.codes,
957 %(db)s.%(nuts)s.tmin_seconds,
958 %(db)s.%(nuts)s.tmin_offset,
959 %(db)s.%(nuts)s.tmax_seconds,
960 %(db)s.%(nuts)s.tmax_offset,
961 kind_codes.deltat
962 FROM files
963 INNER JOIN %(db)s.%(nuts)s
964 ON files.file_id == %(db)s.%(nuts)s.file_id
965 INNER JOIN kind_codes
966 ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id
967 ''')
969 if cond:
970 sql += ''' WHERE ''' + ' AND '.join(cond)
972 sql = self._sql(sql)
973 if tmin is None and tmax is None:
974 for row in self._conn.execute(sql, args):
975 row = (db.abspath(row[0]),) + row[1:]
976 nut = model.Nut(values_nocheck=row)
977 yield nut
978 else:
979 assert tmin is not None and tmax is not None
980 if tmin == tmax:
981 for row in self._conn.execute(sql, args):
982 row = (db.abspath(row[0]),) + row[1:]
983 nut = model.Nut(values_nocheck=row)
984 if (nut.tmin <= tmin < nut.tmax) \
985 or (nut.tmin == nut.tmax and tmin == nut.tmin):
987 yield nut
988 else:
989 for row in self._conn.execute(sql, args):
990 row = (db.abspath(row[0]),) + row[1:]
991 nut = model.Nut(values_nocheck=row)
992 if (tmin < nut.tmax and nut.tmin < tmax) \
993 or (nut.tmin == nut.tmax
994 and tmin <= nut.tmin < tmax):
996 yield nut
998 def get_nuts(self, *args, **kwargs):
999 '''
1000 Get content entities matching given constraints.
1002 Like :py:meth:`iter_nuts` but returns results as a list.
1003 '''
1005 return list(self.iter_nuts(*args, **kwargs))
    def _split_nuts(
            self, kind, tmin=None, tmax=None, codes=None, path=None):
        '''
        Cut index entries intersecting ``[tmin, tmax]`` out of the index.

        Matching entries (optionally restricted by ``codes`` and file
        ``path``) are deleted; for entries extending beyond the span,
        trimmed replacement entries covering only the parts outside the
        span are inserted. Applied to both the selection's nut table and
        the global ``main.nuts`` table.
        '''

        tmin_seconds, tmin_offset = model.tsplit(tmin)
        tmax_seconds, tmax_offset = model.tsplit(tmax)

        names_main_nuts = dict(self._names)
        names_main_nuts.update(db='main', nuts='nuts')

        db = self.get_database()

        def main_nuts(s):
            # Substitution targeting the global index in the main database.
            return s % names_main_nuts

        with self.transaction('split nuts') as cursor:
            # modify selection and main
            for sql_subst in [
                    self._sql, main_nuts]:

                cond = []
                args = []

                self._timerange_sql(tmin, tmax, kind, cond, args, False)

                if codes is not None:
                    pats = codes_patterns_for_kind(kind, codes)
                    if pats:
                        cond.append(
                            ' ( %s ) ' % ' OR '.join(
                                ('kind_codes.codes GLOB ?',) * len(pats)))
                        args.extend(pat.safe_str for pat in pats)

                if path is not None:
                    cond.append('files.path == ?')
                    args.append(db.relpath(abspath(path)))

                sql = sql_subst('''
                    SELECT
                        %(db)s.%(nuts)s.nut_id,
                        %(db)s.%(nuts)s.tmin_seconds,
                        %(db)s.%(nuts)s.tmin_offset,
                        %(db)s.%(nuts)s.tmax_seconds,
                        %(db)s.%(nuts)s.tmax_offset,
                        kind_codes.deltat
                    FROM files
                    INNER JOIN %(db)s.%(nuts)s
                        ON files.file_id == %(db)s.%(nuts)s.file_id
                    INNER JOIN kind_codes
                        ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id
                    WHERE ''' + ' AND '.join(cond))  # noqa

                insert = []
                delete = []
                for row in cursor.execute(sql, args):
                    nut_id, nut_tmin_seconds, nut_tmin_offset, \
                        nut_tmax_seconds, nut_tmax_offset, nut_deltat = row

                    nut_tmin = model.tjoin(
                        nut_tmin_seconds, nut_tmin_offset)
                    nut_tmax = model.tjoin(
                        nut_tmax_seconds, nut_tmax_offset)

                    if nut_tmin < tmax and tmin < nut_tmax:
                        # Entry intersects the span: keep the leading and/or
                        # trailing remainder as new (trimmed) entries.
                        if nut_tmin < tmin:
                            insert.append((
                                nut_tmin_seconds, nut_tmin_offset,
                                tmin_seconds, tmin_offset,
                                model.tscale_to_kscale(
                                    tmin_seconds - nut_tmin_seconds),
                                nut_id))

                        if tmax < nut_tmax:
                            insert.append((
                                tmax_seconds, tmax_offset,
                                nut_tmax_seconds, nut_tmax_offset,
                                model.tscale_to_kscale(
                                    nut_tmax_seconds - tmax_seconds),
                                nut_id))

                        delete.append((nut_id,))

                # Trimmed copies inherit all other columns from the
                # original entry (selected by nut_id).
                sql_add = '''
                    INSERT INTO %(db)s.%(nuts)s (
                            file_id, file_segment, file_element, kind_id,
                            kind_codes_id, tmin_seconds, tmin_offset,
                            tmax_seconds, tmax_offset, kscale )
                    SELECT
                            file_id, file_segment, file_element,
                            kind_id, kind_codes_id, ?, ?, ?, ?, ?
                    FROM %(db)s.%(nuts)s
                    WHERE nut_id == ?
                '''
                cursor.executemany(sql_subst(sql_add), insert)

                sql_delete = '''
                    DELETE FROM %(db)s.%(nuts)s WHERE nut_id == ?
                '''
                cursor.executemany(sql_subst(sql_delete), delete)
    def get_time_span(self, kinds=None):
        '''
        Get time interval over all content in selection.

        :param kinds:
            If not ``None``, restrict query to given content kinds.
        :type kinds:
            list of str

        :complexity:
            O(1), independent of the number of nuts.

        :returns:
            ``(tmin, tmax)``, combined time interval of queried content kinds.
        '''

        # The inner subquery finds the extreme integer-seconds value; the
        # outer query resolves the sub-second offset at that second, so the
        # index on (kind_id, t{min,max}_seconds) can be used throughout.
        sql_min = self._sql('''
            SELECT MIN(tmin_seconds), MIN(tmin_offset)
            FROM %(db)s.%(nuts)s
            WHERE kind_id == ?
                AND tmin_seconds == (
                    SELECT MIN(tmin_seconds)
                    FROM %(db)s.%(nuts)s
                    WHERE kind_id == ?)
        ''')

        sql_max = self._sql('''
            SELECT MAX(tmax_seconds), MAX(tmax_offset)
            FROM %(db)s.%(nuts)s
            WHERE kind_id == ?
                AND tmax_seconds == (
                    SELECT MAX(tmax_seconds)
                    FROM %(db)s.%(nuts)s
                    WHERE kind_id == ?)
        ''')

        gtmin = None
        gtmax = None

        if isinstance(kinds, str):
            kinds = [kinds]

        if kinds is None:
            kind_ids = model.g_content_kind_ids
        else:
            kind_ids = model.to_kind_ids(kinds)

        # Combine per-kind extremes into the global span.
        for kind_id in kind_ids:
            for tmin_seconds, tmin_offset in self._conn.execute(
                    sql_min, (kind_id, kind_id)):

                tmin = model.tjoin(tmin_seconds, tmin_offset)
                if tmin is not None and (gtmin is None or tmin < gtmin):
                    gtmin = tmin

            for (tmax_seconds, tmax_offset) in self._conn.execute(
                    sql_max, (kind_id, kind_id)):

                tmax = model.tjoin(tmax_seconds, tmax_offset)
                if tmax is not None and (gtmax is None or tmax > gtmax):
                    gtmax = tmax

        return gtmin, gtmax
1168 def has(self, kinds):
1169 '''
1170 Check availability of given content kinds.
1172 :param kinds:
1173 Content kinds to query.
1174 :type kind:
1175 list of str
1177 :returns:
1178 ``True`` if any of the queried content kinds is available
1179 in the selection.
1180 '''
1181 self_tmin, self_tmax = self.get_time_span(kinds)
1183 return None not in (self_tmin, self_tmax)
1185 def get_deltat_span(self, kind):
1186 '''
1187 Get min and max sampling interval of all content of given kind.
1189 :param kind:
1190 Content kind
1191 :type kind:
1192 str
1194 :returns: ``(deltat_min, deltat_max)``
1195 '''
1197 deltats = [
1198 deltat for deltat in self.get_deltats(kind)
1199 if deltat is not None]
1201 if deltats:
1202 return min(deltats), max(deltats)
1203 else:
1204 return None, None
1206 def iter_kinds(self, codes=None):
1207 '''
1208 Iterate over content types available in selection.
1210 :param codes:
1211 If given, get kinds only for selected codes identifier.
1212 :type codes:
1213 :py:class:`tuple` of :py:class:`str`
1215 :yields:
1216 Available content kinds as :py:class:`str`.
1218 :complexity:
1219 O(1), independent of number of nuts.
1220 '''
1222 return self._database._iter_kinds(
1223 codes=codes,
1224 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1226 def iter_deltats(self, kind=None):
1227 '''
1228 Iterate over sampling intervals available in selection.
1230 :param kind:
1231 If given, get sampling intervals only for a given content type.
1232 :type kind:
1233 str
1235 :yields:
1236 :py:class:`float` values.
1238 :complexity:
1239 O(1), independent of number of nuts.
1240 '''
1241 return self._database._iter_deltats(
1242 kind=kind,
1243 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1245 def iter_codes(self, kind=None):
1246 '''
1247 Iterate over content identifier code sequences available in selection.
1249 :param kind:
1250 If given, get codes only for a given content type.
1251 :type kind:
1252 str
1254 :yields:
1255 :py:class:`tuple` of :py:class:`str`
1257 :complexity:
1258 O(1), independent of number of nuts.
1259 '''
1260 return self._database._iter_codes(
1261 kind=kind,
1262 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1264 def _iter_codes_info(self, kind=None):
1265 '''
1266 Iterate over number of occurrences of any (kind, codes) combination.
1268 :param kind:
1269 If given, get counts only for selected content type.
1270 :type kind:
1271 str
1273 :yields:
1274 Tuples of the form ``(kind, codes, deltat, kind_codes_id, count)``.
1276 :complexity:
1277 O(1), independent of number of nuts.
1278 '''
1279 return self._database._iter_codes_info(
1280 kind=kind,
1281 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1283 def get_kinds(self, codes=None):
1284 '''
1285 Get content types available in selection.
1287 :param codes:
1288 If given, get kinds only for selected codes identifier.
1289 :type codes:
1290 :py:class:`tuple` of :py:class:`str`
1292 :returns:
1293 Sorted list of available content types.
1295 :complexity:
1296 O(1), independent of number of nuts.
1298 '''
1299 return sorted(list(self.iter_kinds(codes=codes)))
1301 def get_deltats(self, kind=None):
1302 '''
1303 Get sampling intervals available in selection.
1305 :param kind:
1306 If given, get sampling intervals only for selected content type.
1307 :type kind:
1308 str
1310 :complexity:
1311 O(1), independent of number of nuts.
1313 :returns: Sorted list of available sampling intervals.
1314 '''
1315 return sorted(list(self.iter_deltats(kind=kind)))
1317 def get_codes(self, kind=None):
1318 '''
1319 Get identifier code sequences available in selection.
1321 :param kind:
1322 If given, get codes only for selected content type.
1323 :type kind:
1324 str
1326 :complexity:
1327 O(1), independent of number of nuts.
1329 :returns: Sorted list of available codes as tuples of strings.
1330 '''
1331 return sorted(list(self.iter_codes(kind=kind)))
1333 def get_counts(self, kind=None):
1334 '''
1335 Get number of occurrences of any (kind, codes) combination.
1337 :param kind:
1338 If given, get codes only for selected content type.
1339 :type kind:
1340 str
1342 :complexity:
1343 O(1), independent of number of nuts.
1345 :returns: ``dict`` with ``counts[kind][codes]`` or ``counts[codes]``
1346 if kind is not ``None``
1347 '''
1348 d = {}
1349 for kind_id, codes, _, _, count in self._iter_codes_info(kind=kind):
1350 if kind_id not in d:
1351 v = d[kind_id] = {}
1352 else:
1353 v = d[kind_id]
1355 if codes not in v:
1356 v[codes] = 0
1358 v[codes] += count
1360 if kind is not None:
1361 return d[to_kind_id(kind)]
1362 else:
1363 return dict((to_kind(kind_id), v) for (kind_id, v) in d.items())
1365 def glob_codes(self, kind, codes_list):
1366 '''
1367 Find codes matching given patterns.
1369 :param kind:
1370 Content kind to be queried.
1371 :type kind:
1372 str
1374 :param codes_list:
1375 List of code patterns to query. If not given or empty, an empty
1376 list is returned.
1377 :type codes_list:
1378 :py:class:`list` of :py:class:`tuple` of :py:class:`str`
1380 :returns:
1381 List of matches of the form ``[kind_codes_id, codes, deltat]``.
1382 '''
1384 args = [to_kind_id(kind)]
1385 pats = []
1386 for codes in codes_list:
1387 pats.extend(codes_patterns_for_kind(kind, codes))
1389 if pats:
1390 codes_cond = 'AND ( %s ) ' % ' OR '.join(
1391 ('kind_codes.codes GLOB ?',) * len(pats))
1393 args.extend(pat.safe_str for pat in pats)
1394 else:
1395 codes_cond = ''
1397 sql = self._sql('''
1398 SELECT kind_codes_id, codes, deltat FROM kind_codes
1399 WHERE
1400 kind_id == ? ''' + codes_cond)
1402 return list(map(list, self._conn.execute(sql, args)))
1404 def update(self, constraint=None, **kwargs):
1405 '''
1406 Update or partially update channel and event inventories.
1408 :param constraint:
1409 Selection of times or areas to be brought up to date.
1410 :type constraint:
1411 :py:class:`~pyrocko.squirrel.client.base.Constraint`
1413 :param \\*\\*kwargs:
1414 Shortcut for setting ``constraint=Constraint(**kwargs)``.
1416 This function triggers all attached remote sources, to check for
1417 updates in the meta-data. The sources will only submit queries when
1418 their expiration date has passed, or if the selection spans into
1419 previously unseen times or areas.
1420 '''
1422 if constraint is None:
1423 constraint = client.Constraint(**kwargs)
1425 for source in self._sources:
1426 source.update_channel_inventory(self, constraint)
1427 source.update_event_inventory(self, constraint)
1429 def update_waveform_promises(self, constraint=None, **kwargs):
1430 '''
1431 Permit downloading of remote waveforms.
1433 :param constraint:
1434 Remote waveforms compatible with the given constraint are enabled
1435 for download.
1436 :type constraint:
1437 :py:class:`~pyrocko.squirrel.client.base.Constraint`
1439 :param \\*\\*kwargs:
1440 Shortcut for setting ``constraint=Constraint(**kwargs)``.
1442 Calling this method permits Squirrel to download waveforms from remote
1443 sources when processing subsequent waveform requests. This works by
1444 inserting so called waveform promises into the database. It will look
1445 into the available channels for each remote source and create a promise
1446 for each channel compatible with the given constraint. If the promise
1447 then matches in a waveform request, Squirrel tries to download the
1448 waveform. If the download is successful, the downloaded waveform is
1449 added to the Squirrel and the promise is deleted. If the download
1450 fails, the promise is kept if the reason of failure looks like being
1451 temporary, e.g. because of a network failure. If the cause of failure
1452 however seems to be permanent, the promise is deleted so that no
1453 further attempts are made to download a waveform which might not be
1454 available from that server at all. To force re-scheduling after a
1455 permanent failure, call :py:meth:`update_waveform_promises`
1456 yet another time.
1457 '''
1459 if constraint is None:
1460 constraint = client.Constraint(**kwargs)
1462 for source in self._sources:
1463 source.update_waveform_promises(self, constraint)
1465 def update_responses(self, constraint=None, **kwargs):
1466 # TODO
1467 if constraint is None:
1468 constraint = client.Constraint(**kwargs)
1470 print('contraint ignored atm')
1471 for source in self._sources:
1472 source.update_response_inventory(self, constraint)
1474 def get_nfiles(self):
1475 '''
1476 Get number of files in selection.
1477 '''
1479 sql = self._sql('''SELECT COUNT(*) FROM %(db)s.%(file_states)s''')
1480 for row in self._conn.execute(sql):
1481 return row[0]
1483 def get_nnuts(self):
1484 '''
1485 Get number of nuts in selection.
1486 '''
1488 sql = self._sql('''SELECT COUNT(*) FROM %(db)s.%(nuts)s''')
1489 for row in self._conn.execute(sql):
1490 return row[0]
1492 def get_total_size(self):
1493 '''
1494 Get aggregated file size available in selection.
1495 '''
1497 sql = self._sql('''
1498 SELECT SUM(files.size) FROM %(db)s.%(file_states)s
1499 INNER JOIN files
1500 ON %(db)s.%(file_states)s.file_id = files.file_id
1501 ''')
1503 for row in self._conn.execute(sql):
1504 return row[0] or 0
1506 def get_stats(self):
1507 '''
1508 Get statistics on contents available through this selection.
1509 '''
1511 kinds = self.get_kinds()
1512 time_spans = {}
1513 for kind in kinds:
1514 time_spans[kind] = self.get_time_span([kind])
1516 return SquirrelStats(
1517 nfiles=self.get_nfiles(),
1518 nnuts=self.get_nnuts(),
1519 kinds=kinds,
1520 codes=self.get_codes(),
1521 total_size=self.get_total_size(),
1522 counts=self.get_counts(),
1523 time_spans=time_spans,
1524 sources=[s.describe() for s in self._sources],
1525 operators=[op.describe() for op in self._operators])
1527 def get_content(
1528 self,
1529 nut,
1530 cache_id='default',
1531 accessor_id='default',
1532 show_progress=False):
1534 '''
1535 Get and possibly load full content for a given index entry from file.
1537 Loads the actual content objects (channel, station, waveform, ...) from
1538 file. For efficiency, sibling content (all stuff in the same file
1539 segment) will also be loaded as a side effect. The loaded contents are
1540 cached in the Squirrel object.
1541 '''
1543 content_cache = self._content_caches[cache_id]
1544 if not content_cache.has(nut):
1546 for nut_loaded in io.iload(
1547 nut.file_path,
1548 segment=nut.file_segment,
1549 format=nut.file_format,
1550 database=self._database,
1551 update_selection=self,
1552 show_progress=show_progress):
1554 content_cache.put(nut_loaded)
1556 try:
1557 return content_cache.get(nut, accessor_id)
1558 except KeyError:
1559 raise error.NotAvailable(
1560 'Unable to retrieve content: %s, %s, %s, %s' % nut.key)
1562 def advance_accessor(self, accessor_id, cache_id=None):
1563 '''
1564 Notify memory caches about consumer moving to a new data batch.
1566 :param accessor_id:
1567 Name of accessing consumer to be advanced.
1568 :type accessor_id:
1569 str
1571 :param cache_id:
1572 Name of cache to for which the accessor should be advanced. By
1573 default the named accessor is advanced in all registered caches.
1574 By default, two caches named ``'default'`` and ``'waveforms'`` are
1575 available.
1576 :type cache_id:
1577 str
1579 See :py:class:`~pyrocko.squirrel.cache.ContentCache` for details on how
1580 Squirrel's memory caching works and can be tuned. Default behaviour is
1581 to release data when it has not been used in the latest data
1582 window/batch. If the accessor is never advanced, data is cached
1583 indefinitely - which is often desired e.g. for station meta-data.
1584 Methods for consecutive data traversal, like
1585 :py:meth:`chopper_waveforms` automatically advance and clear
1586 their accessor.
1587 '''
1588 for cache_ in (
1589 self._content_caches.keys()
1590 if cache_id is None
1591 else [cache_id]):
1593 self._content_caches[cache_].advance_accessor(accessor_id)
1595 def clear_accessor(self, accessor_id, cache_id=None):
1596 '''
1597 Notify memory caches about a consumer having finished.
1599 :param accessor_id:
1600 Name of accessor to be cleared.
1601 :type accessor_id:
1602 str
1604 :param cache_id:
1605 Name of cache for which the accessor should be cleared. By default
1606 the named accessor is cleared from all registered caches. By
1607 default, two caches named ``'default'`` and ``'waveforms'`` are
1608 available.
1609 :type cache_id:
1610 str
1612 Calling this method clears all references to cache entries held by the
1613 named accessor. Cache entries are then freed if not referenced by any
1614 other accessor.
1615 '''
1617 for cache_ in (
1618 self._content_caches.keys()
1619 if cache_id is None
1620 else [cache_id]):
1622 self._content_caches[cache_].clear_accessor(accessor_id)
1624 def get_cache_stats(self, cache_id):
1625 return self._content_caches[cache_id].get_stats()
1627 def _check_duplicates(self, nuts):
1628 d = defaultdict(list)
1629 for nut in nuts:
1630 d[nut.codes].append(nut)
1632 for codes, group in d.items():
1633 if len(group) > 1:
1634 logger.warning(
1635 'Multiple entries matching codes: %s' % str(codes))
1637 @filldocs
1638 def get_stations(
1639 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
1640 model='squirrel'):
1642 '''
1643 Get stations matching given constraints.
1645 %(query_args)s
1647 :param model:
1648 Select object model for returned values: ``'squirrel'`` to get
1649 Squirrel station objects or ``'pyrocko'`` to get Pyrocko station
1650 objects with channel information attached.
1651 :type model:
1652 str
1654 :returns:
1655 List of :py:class:`pyrocko.squirrel.Station
1656 <pyrocko.squirrel.model.Station>` objects by default or list of
1657 :py:class:`pyrocko.model.Station <pyrocko.model.station.Station>`
1658 objects if ``model='pyrocko'`` is requested.
1660 See :py:meth:`iter_nuts` for details on time span matching.
1661 '''
1663 if model == 'pyrocko':
1664 return self._get_pyrocko_stations(obj, tmin, tmax, time, codes)
1665 elif model == 'squirrel':
1666 args = self._get_selection_args(
1667 STATION, obj, tmin, tmax, time, codes)
1669 nuts = sorted(
1670 self.iter_nuts('station', *args), key=lambda nut: nut.dkey)
1671 self._check_duplicates(nuts)
1672 return [self.get_content(nut) for nut in nuts]
1673 else:
1674 raise ValueError('Invalid station model: %s' % model)
1676 @filldocs
1677 def get_channels(
1678 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1680 '''
1681 Get channels matching given constraints.
1683 %(query_args)s
1685 :returns:
1686 List of :py:class:`~pyrocko.squirrel.model.Channel` objects.
1688 See :py:meth:`iter_nuts` for details on time span matching.
1689 '''
1691 args = self._get_selection_args(
1692 CHANNEL, obj, tmin, tmax, time, codes)
1694 nuts = sorted(
1695 self.iter_nuts('channel', *args), key=lambda nut: nut.dkey)
1696 self._check_duplicates(nuts)
1697 return [self.get_content(nut) for nut in nuts]
1699 @filldocs
1700 def get_sensors(
1701 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1703 '''
1704 Get sensors matching given constraints.
1706 %(query_args)s
1708 :returns:
1709 List of :py:class:`~pyrocko.squirrel.model.Sensor` objects.
1711 See :py:meth:`iter_nuts` for details on time span matching.
1712 '''
1714 tmin, tmax, codes = self._get_selection_args(
1715 CHANNEL, obj, tmin, tmax, time, codes)
1717 if codes is not None:
1718 if codes.channel != '*':
1719 codes = codes.replace(codes.channel[:-1] + '?')
1721 nuts = sorted(
1722 self.iter_nuts(
1723 'channel', tmin, tmax, codes), key=lambda nut: nut.dkey)
1724 self._check_duplicates(nuts)
1725 return model.Sensor.from_channels(
1726 self.get_content(nut) for nut in nuts)
1728 @filldocs
1729 def get_responses(
1730 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1732 '''
1733 Get instrument responses matching given constraints.
1735 %(query_args)s
1737 :returns:
1738 List of :py:class:`~pyrocko.squirrel.model.Response` objects.
1740 See :py:meth:`iter_nuts` for details on time span matching.
1741 '''
1743 args = self._get_selection_args(
1744 RESPONSE, obj, tmin, tmax, time, codes)
1746 nuts = sorted(
1747 self.iter_nuts('response', *args), key=lambda nut: nut.dkey)
1748 self._check_duplicates(nuts)
1749 return [self.get_content(nut) for nut in nuts]
1751 @filldocs
1752 def get_response(
1753 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1755 '''
1756 Get instrument response matching given constraints.
1758 %(query_args)s
1760 :returns:
1761 :py:class:`~pyrocko.squirrel.model.Response` object.
1763 Same as :py:meth:`get_responses` but returning exactly one response.
1764 Raises :py:exc:`~pyrocko.squirrel.error.NotAvailable` if zero or more
1765 than one is available.
1767 See :py:meth:`iter_nuts` for details on time span matching.
1768 '''
1770 responses = self.get_responses(obj, tmin, tmax, time, codes)
1771 if len(responses) == 0:
1772 raise error.NotAvailable(
1773 'No instrument response available.')
1774 elif len(responses) > 1:
1775 raise error.NotAvailable(
1776 'Multiple instrument responses matching given constraints.')
1778 return responses[0]
1780 @filldocs
1781 def get_events(
1782 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1784 '''
1785 Get events matching given constraints.
1787 %(query_args)s
1789 :returns:
1790 List of :py:class:`~pyrocko.model.event.Event` objects.
1792 See :py:meth:`iter_nuts` for details on time span matching.
1793 '''
1795 args = self._get_selection_args(EVENT, obj, tmin, tmax, time, codes)
1796 nuts = sorted(
1797 self.iter_nuts('event', *args), key=lambda nut: nut.dkey)
1798 self._check_duplicates(nuts)
1799 return [self.get_content(nut) for nut in nuts]
    def _redeem_promises(self, *args):
        '''
        Resolve waveform promises in the given selection by downloading.

        ``args`` are the already-resolved selection arguments
        ``(tmin, tmax, codes)`` as produced by ``_get_selection_args``.
        Promises overlapping the requested span are converted into
        :py:class:`WaveformOrder` objects, grouped, and handed to the FDSN
        sources for threaded download. Fulfilled or permanently failed
        promises are removed from the database via ``_split_nuts``.
        '''

        tmin, tmax, _ = args

        waveforms = list(self.iter_nuts('waveform', *args))
        promises = list(self.iter_nuts('waveform_promise', *args))

        # Map codes -> list of locally available waveform time spans, used
        # to compute the remaining gaps per order.
        codes_to_avail = defaultdict(list)
        for nut in waveforms:
            codes_to_avail[nut.codes].append((nut.tmin, nut.tmax))

        # Debug helper: recursively convert time floats to strings.
        def tts(x):
            if isinstance(x, tuple):
                return tuple(tts(e) for e in x)
            elif isinstance(x, list):
                return list(tts(e) for e in x)
            else:
                return util.time_to_str(x)

        # Split each promise into fixed-size download blocks and create one
        # order per block, annotated with the data gaps still to be filled.
        orders = []
        for promise in promises:
            waveforms_avail = codes_to_avail[promise.codes]
            for block_tmin, block_tmax in blocks(
                    max(tmin, promise.tmin),
                    min(tmax, promise.tmax),
                    promise.deltat):

                orders.append(
                    WaveformOrder(
                        source_id=promise.file_path,
                        codes=promise.codes,
                        tmin=block_tmin,
                        tmax=block_tmax,
                        deltat=promise.deltat,
                        gaps=gaps(waveforms_avail, block_tmin, block_tmax)))

        # Orders without gaps are already satisfied by local data (noop).
        orders_noop, orders = lpick(lambda order: order.gaps, orders)

        order_keys_noop = set(order_key(order) for order in orders_noop)
        # NOTE(review): both operands of this ``or`` are empty/non-empty
        # together, so the condition is equivalent to ``if orders_noop:``.
        # Also 'satisified' in the message below is a typo (left unchanged
        # here; runtime strings are not altered in a doc-only pass).
        if len(order_keys_noop) != 0 or len(orders_noop) != 0:
            logger.info(
                'Waveform orders already satisified with cached/local data: '
                '%i (%i)' % (len(order_keys_noop), len(orders_noop)))

        # Collect FDSN sources; list order defines download priority.
        source_ids = []
        sources = {}
        for source in self._sources:
            if isinstance(source, fdsn.FDSNSource):
                source_ids.append(source._source_id)
                sources[source._source_id] = source

        source_priority = dict(
            (source_id, i) for (i, source_id) in enumerate(source_ids))

        # Group orders by order_key; within each group, try higher-priority
        # sources first.
        order_groups = defaultdict(list)
        for order in orders:
            order_groups[order_key(order)].append(order)

        for k, order_group in order_groups.items():
            order_group.sort(
                key=lambda order: source_priority[order.source_id])

        n_order_groups = len(order_groups)

        if len(order_groups) != 0 or len(orders) != 0:
            logger.info(
                'Waveform orders standing for download: %i (%i)'
                % (len(order_groups), len(orders)))

            task = make_task('Waveform orders processed', n_order_groups)
        else:
            task = None

        # Remove the promise span covered by a fulfilled (or permanently
        # failed) order from the database.
        def split_promise(order):
            self._split_nuts(
                'waveform_promise',
                order.tmin, order.tmax,
                codes=order.codes,
                path=order.source_id)

        # One success in a group satisfies the whole group: drop all
        # fallback orders for the same key.
        def release_order_group(order):
            okey = order_key(order)
            for followup in order_groups[okey]:
                split_promise(followup)

            del order_groups[okey]

            if task:
                task.update(n_order_groups - len(order_groups))

        def noop(order):
            pass

        def success(order):
            release_order_group(order)
            split_promise(order)

        def batch_add(paths):
            self.add(paths)

        # Callbacks from download threads are marshalled through this queue
        # and executed in the main thread (the database is not shared
        # across threads).
        calls = queue.Queue()

        def enqueue(f):
            def wrapper(*args):
                calls.put((f, args))

            return wrapper

        for order in orders_noop:
            split_promise(order)

        # Round-based download: in each round, take the next (highest
        # priority remaining) order of every group and fan out one thread
        # per source.
        while order_groups:

            orders_now = []
            empty = []
            for k, order_group in order_groups.items():
                try:
                    orders_now.append(order_group.pop(0))
                except IndexError:
                    empty.append(k)

            for k in empty:
                del order_groups[k]

            by_source_id = defaultdict(list)
            for order in orders_now:
                by_source_id[order.source_id].append(order)

            threads = []
            for source_id in by_source_id:
                # NOTE(review): ``download`` closes over the loop variable
                # ``source_id`` (late binding). If a thread starts running
                # after the loop has advanced, it may pick up a different
                # source's orders — consider binding via a default argument
                # (``def download(source_id=source_id):``). TODO confirm.
                def download():
                    try:
                        sources[source_id].download_waveforms(
                            by_source_id[source_id],
                            success=enqueue(success),
                            error_permanent=enqueue(split_promise),
                            error_temporary=noop,
                            batch_add=enqueue(batch_add))

                    finally:
                        # Sentinel: this thread is done.
                        calls.put(None)

                thread = threading.Thread(target=download)
                thread.start()
                threads.append(thread)

            # Drain the call queue until every thread has posted its
            # sentinel, executing callbacks in this (main) thread.
            ndone = 0
            while ndone < len(threads):
                ret = calls.get()
                if ret is None:
                    ndone += 1
                else:
                    ret[0](*ret[1])

            for thread in threads:
                thread.join()

            if task:
                task.update(n_order_groups - len(order_groups))

        if task:
            task.done()
1964 @filldocs
1965 def get_waveform_nuts(
1966 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1968 '''
1969 Get waveform content entities matching given constraints.
1971 %(query_args)s
1973 Like :py:meth:`get_nuts` with ``kind='waveform'`` but additionally
1974 resolves matching waveform promises (downloads waveforms from remote
1975 sources).
1977 See :py:meth:`iter_nuts` for details on time span matching.
1978 '''
1980 args = self._get_selection_args(WAVEFORM, obj, tmin, tmax, time, codes)
1981 self._redeem_promises(*args)
1982 return sorted(
1983 self.iter_nuts('waveform', *args), key=lambda nut: nut.dkey)
    @filldocs
    def get_waveforms(
            self, obj=None, tmin=None, tmax=None, time=None, codes=None,
            uncut=False, want_incomplete=True, degap=True, maxgap=5,
            maxlap=None, snap=None, include_last=False, load_data=True,
            accessor_id='default', operator_params=None):

        '''
        Get waveforms matching given constraints.

        %(query_args)s

        :param uncut:
            Set to ``True``, to disable cutting traces to [``tmin``, ``tmax``]
            and to disable degapping/deoverlapping. Returns untouched traces as
            they are read from file segment. File segments are always read in
            their entirety.
        :type uncut:
            bool

        :param want_incomplete:
            If ``True``, gappy/incomplete traces are included in the result.
        :type want_incomplete:
            bool

        :param degap:
            If ``True``, connect traces and remove gaps and overlaps.
        :type degap:
            bool

        :param maxgap:
            Maximum gap size in samples which is filled with interpolated
            samples when ``degap`` is ``True``.
        :type maxgap:
            int

        :param maxlap:
            Maximum overlap size in samples which is removed when ``degap`` is
            ``True``.
        :type maxlap:
            int

        :param snap:
            Rounding functions used when computing sample index from time
            instance, for trace start and trace end, respectively. By default,
            ``(round, round)`` is used.
        :type snap:
            tuple of 2 callables

        :param include_last:
            If ``True``, add one more sample to the returned traces (the sample
            which would be the first sample of a query with ``tmin`` set to the
            current value of ``tmax``).
        :type include_last:
            bool

        :param load_data:
            If ``True``, waveform data samples are read from files (or cache).
            If ``False``, meta-information-only traces are returned (dummy
            traces with no data samples).
        :type load_data:
            bool

        :param accessor_id:
            Name of consumer on who's behalf data is accessed. Used in cache
            management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key
            to distinguish different points of extraction for the decision of
            when to release cached waveform data. Should be used when data is
            alternately extracted from more than one region / selection.
        :type accessor_id:
            str

        See :py:meth:`iter_nuts` for details on time span matching.

        Loaded data is kept in memory (at least) until
        :py:meth:`clear_accessor` has been called or
        :py:meth:`advance_accessor` has been called two consecutive times
        without data being accessed between the two calls (by this accessor).
        Data may still be further kept in the memory cache if held alive by
        consumers with a different ``accessor_id``.
        '''

        tmin, tmax, codes = self._get_selection_args(
            WAVEFORM, obj, tmin, tmax, time, codes)

        # Bail out early when the selection holds neither waveforms nor
        # waveform promises.
        self_tmin, self_tmax = self.get_time_span(
            ['waveform', 'waveform_promise'])

        if None in (self_tmin, self_tmax):
            logger.warning(
                'No waveforms available.')
            return []

        # Fall back to the selection's full time span for open bounds.
        tmin = tmin if tmin is not None else self_tmin
        tmax = tmax if tmax is not None else self_tmax

        # If the codes refer to a registered operator (derived/virtual
        # channels), delegate the whole request to it.
        if codes is not None:
            operator = self.get_operator(codes)
            if operator is not None:
                return operator.get_waveforms(
                    self, codes,
                    tmin=tmin, tmax=tmax,
                    uncut=uncut, want_incomplete=want_incomplete, degap=degap,
                    maxgap=maxgap, maxlap=maxlap, snap=snap,
                    include_last=include_last, load_data=load_data,
                    accessor_id=accessor_id, params=operator_params)

        # Resolves waveform promises (may trigger downloads) and returns
        # matching index entries.
        nuts = self.get_waveform_nuts(obj, tmin, tmax, time, codes)

        if load_data:
            # Content is fetched from the 'waveform' memory cache on behalf
            # of the given accessor.
            traces = [
                self.get_content(nut, 'waveform', accessor_id) for nut in nuts]

        else:
            # Meta-only: build dummy traces from the index entries.
            traces = [
                trace.Trace(**nut.trace_kwargs) for nut in nuts]

        if uncut:
            return traces

        if snap is None:
            snap = (round, round)

        chopped = []
        for tr in traces:
            if not load_data and tr.ydata is not None:
                # Strip samples from cached traces when only meta-data was
                # requested; copy first so the cached trace stays intact.
                tr = tr.copy(data=False)
                tr.ydata = None

            try:
                chopped.append(tr.chop(
                    tmin, tmax,
                    inplace=False,
                    snap=snap,
                    include_last=include_last))

            except trace.NoData:
                # Trace does not overlap the requested span after snapping.
                pass

        processed = self._process_chopped(
            chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax)

        return processed
    @filldocs
    def chopper_waveforms(
            self, obj=None, tmin=None, tmax=None, time=None, codes=None,
            tinc=None, tpad=0.,
            want_incomplete=True, snap_window=False,
            degap=True, maxgap=5, maxlap=None,
            snap=None, include_last=False, load_data=True,
            accessor_id=None, clear_accessor=True, operator_params=None):

        '''
        Iterate window-wise over waveform archive.

        %(query_args)s

        :param tinc:
            Time increment (window shift time) (default uses ``tmax-tmin``).
        :type tinc:
            timestamp

        :param tpad:
            Padding time appended on either side of the data window (window
            overlap is ``2*tpad``).
        :type tpad:
            timestamp

        :param want_incomplete:
            If ``True``, gappy/incomplete traces are included in the result.
        :type want_incomplete:
            bool

        :param snap_window:
            If ``True``, start time windows at multiples of tinc with respect
            to system time zero.
        :type snap_window:
            bool

        :param degap:
            If ``True``, connect traces and remove gaps and overlaps.
        :type degap:
            bool

        :param maxgap:
            Maximum gap size in samples which is filled with interpolated
            samples when ``degap`` is ``True``.
        :type maxgap:
            int

        :param maxlap:
            Maximum overlap size in samples which is removed when ``degap`` is
            ``True``.
        :type maxlap:
            int

        :param snap:
            Rounding functions used when computing sample index from time
            instance, for trace start and trace end, respectively. By default,
            ``(round, round)`` is used.
        :type snap:
            tuple of 2 callables

        :param include_last:
            If ``True``, add one more sample to the returned traces (the sample
            which would be the first sample of a query with ``tmin`` set to the
            current value of ``tmax``).
        :type include_last:
            bool

        :param load_data:
            If ``True``, waveform data samples are read from files (or cache).
            If ``False``, meta-information-only traces are returned (dummy
            traces with no data samples).
        :type load_data:
            bool

        :param accessor_id:
            Name of consumer on who's behalf data is accessed. Used in cache
            management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key
            to distinguish different points of extraction for the decision of
            when to release cached waveform data. Should be used when data is
            alternately extracted from more than one region / selection.
        :type accessor_id:
            str

        :param clear_accessor:
            If ``True`` (default), :py:meth:`clear_accessor` is called when the
            chopper finishes. Set to ``False`` to keep loaded waveforms in
            memory when the generator returns.
        :type clear_accessor:
            bool

        :yields:
            A list of :py:class:`~pyrocko.trace.Trace` objects for every
            extracted time window.

        See :py:meth:`iter_nuts` for details on time span matching.
        '''

        tmin, tmax, codes = self._get_selection_args(
            WAVEFORM, obj, tmin, tmax, time, codes)

        self_tmin, self_tmax = self.get_time_span(
            ['waveform', 'waveform_promise'])

        if None in (self_tmin, self_tmax):
            logger.warning(
                'Content has undefined time span. No waveforms and no '
                'waveform promises?')
            return

        if snap_window and tinc is not None:
            # Align window boundaries to multiples of tinc.
            tmin = tmin if tmin is not None else self_tmin
            tmax = tmax if tmax is not None else self_tmax
            tmin = math.floor(tmin / tinc) * tinc
            tmax = math.ceil(tmax / tinc) * tinc
        else:
            # Shrink open bounds by tpad so padded windows stay inside the
            # available data span.
            tmin = tmin if tmin is not None else self_tmin + tpad
            tmax = tmax if tmax is not None else self_tmax - tpad

        tinc = tinc if tinc is not None else tmax - tmin

        try:
            if accessor_id is None:
                # Auto-generate a cache accessor name unique among the
                # currently active choppers.
                accessor_id = 'chopper%i' % self._n_choppers_active

            self._n_choppers_active += 1

            # Epsilon guards against spawning an extra window when tmax
            # falls exactly on a window boundary (float rounding).
            eps = tinc * 1e-6
            if tinc != 0.0:
                nwin = int(((tmax - eps) - tmin) / tinc) + 1
            else:
                nwin = 1

            for iwin in range(nwin):
                # Last window is clipped at tmax.
                wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)

                chopped = self.get_waveforms(
                    tmin=wmin-tpad,
                    tmax=wmax+tpad,
                    codes=codes,
                    snap=snap,
                    include_last=include_last,
                    load_data=load_data,
                    want_incomplete=want_incomplete,
                    degap=degap,
                    maxgap=maxgap,
                    maxlap=maxlap,
                    accessor_id=accessor_id,
                    operator_params=operator_params)

                # Let the cache release data not touched in this window.
                self.advance_accessor(accessor_id)

                yield Batch(
                    tmin=wmin,
                    tmax=wmax,
                    i=iwin,
                    n=nwin,
                    traces=chopped)

                # NOTE(review): this increment is a no-op — ``iwin`` is
                # reassigned by the ``for`` loop on the next iteration.
                # Probably a leftover from a former ``while`` loop.
                iwin += 1

        finally:
            self._n_choppers_active -= 1
            if clear_accessor:
                # NOTE(review): cache id ``'waveform'`` here, while the
                # docstrings above mention a cache named ``'waveforms'`` —
                # verify which name the cache is registered under.
                self.clear_accessor(accessor_id, 'waveform')
    def _process_chopped(
            self, chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax):
        '''
        Post-process chopped traces: sort, optionally degap/deoverlap and
        optionally weed out traces not fully covering ``[tmin, tmax]``.
        '''

        chopped.sort(key=lambda a: a.full_id)
        if degap:
            chopped = trace.degapper(chopped, maxgap=maxgap, maxlap=maxlap)

        if not want_incomplete:
            chopped_weeded = []
            for tr in chopped:
                # Deviation of trace start/end from the requested window, in
                # seconds; emax is measured at the end of the last sample.
                emin = tr.tmin - tmin
                emax = tr.tmax + tr.deltat - tmax
                # Keep traces matching the window to within half a sample.
                if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
                    chopped_weeded.append(tr)

                elif degap:
                    # Traces short by up to 5 samples on either side are
                    # padded by repeating edge samples, then kept.
                    if (0. < emin <= 5. * tr.deltat
                            and -5. * tr.deltat <= emax < 0.):

                        tr.extend(tmin, tmax-tr.deltat, fillmethod='repeat')
                        chopped_weeded.append(tr)

            chopped = chopped_weeded

        return chopped
2320 def _get_pyrocko_stations(
2321 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
2323 from pyrocko import model as pmodel
2325 by_nsl = defaultdict(lambda: (list(), list()))
2326 for station in self.get_stations(obj, tmin, tmax, time, codes):
2327 sargs = station._get_pyrocko_station_args()
2328 by_nsl[station.codes.nsl][0].append(sargs)
2330 for channel in self.get_channels(obj, tmin, tmax, time, codes):
2331 sargs = channel._get_pyrocko_station_args()
2332 sargs_list, channels_list = by_nsl[channel.codes.nsl]
2333 sargs_list.append(sargs)
2334 channels_list.append(channel)
2336 pstations = []
2337 nsls = list(by_nsl.keys())
2338 nsls.sort()
2339 for nsl in nsls:
2340 sargs_list, channels_list = by_nsl[nsl]
2341 sargs = util.consistency_merge(
2342 [('',) + x for x in sargs_list])
2344 by_c = defaultdict(list)
2345 for ch in channels_list:
2346 by_c[ch.codes.channel].append(ch._get_pyrocko_channel_args())
2348 chas = list(by_c.keys())
2349 chas.sort()
2350 pchannels = []
2351 for cha in chas:
2352 list_of_cargs = by_c[cha]
2353 cargs = util.consistency_merge(
2354 [('',) + x for x in list_of_cargs])
2355 pchannels.append(pmodel.Channel(*cargs))
2357 pstations.append(
2358 pmodel.Station(*sargs, channels=pchannels))
2360 return pstations
2362 @property
2363 def pile(self):
2365 '''
2366 Emulates the older :py:class:`pyrocko.pile.Pile` interface.
2368 This property exposes a :py:class:`pyrocko.squirrel.pile.Pile` object,
2369 which emulates most of the older :py:class:`pyrocko.pile.Pile` methods
2370 but uses the fluffy power of the Squirrel under the hood.
2372 This interface can be used as a drop-in replacement for piles which are
2373 used in existing scripts and programs for efficient waveform data
2374 access. The Squirrel-based pile scales better for large datasets. Newer
2375 scripts should use Squirrel's native methods to avoid the emulation
2376 overhead.
2377 '''
2378 from . import pile
2380 if self._pile is None:
2381 self._pile = pile.Pile(self)
2383 return self._pile
    def snuffle(self):
        '''
        Look at dataset in Snuffler.
        '''
        # Delegates to the pile emulation; opens the Snuffler GUI on this
        # Squirrel's contents.
        self.pile.snuffle()
2391 def _gather_codes_keys(self, kind, gather, selector):
2392 return set(
2393 gather(codes)
2394 for codes in self.iter_codes(kind)
2395 if selector is None or selector(codes))
    def __str__(self):
        # Human-readable summary: delegate to the statistics object's
        # string representation (see SquirrelStats.__str__).
        return str(self.get_stats())
    def get_coverage(
            self, kind, tmin=None, tmax=None, codes_list=None, limit=None):

        '''
        Get coverage information.

        Get information about strips of gapless data coverage.

        :param kind:
            Content kind to be queried.
        :type kind:
            str

        :param tmin:
            Start time of query interval.
        :type tmin:
            timestamp

        :param tmax:
            End time of query interval.
        :type tmax:
            timestamp

        :param codes_list:
            If given, restrict query to given content codes patterns.
        :type codes_list:
            :py:class:`list` of :py:class:`Codes` objects appropriate for the
            queried content type, or anything which can be converted to
            such objects.

        :param limit:
            Limit query to return only up to a given maximum number of entries
            per matching time series (without setting this option, very gappy
            data could cause the query to execute for a very long time).
        :type limit:
            int

        :returns:
            Information about time spans covered by the requested time series
            data.
        :rtype:
            :py:class:`list` of :py:class:`Coverage` objects
        '''

        # Times are stored split into integer seconds plus a residual
        # offset (see model.tsplit/tjoin) for lossless comparison in SQL.
        tmin_seconds, tmin_offset = model.tsplit(tmin)
        tmax_seconds, tmax_offset = model.tsplit(tmax)
        kind_id = to_kind_id(kind)

        codes_info = list(self._iter_codes_info(kind=kind))

        # Select the series to process as (pattern, kind_codes_id, codes,
        # deltat) tuples: all known series, or only those matching one of
        # the given codes patterns.
        kdata_all = []
        if codes_list is None:
            for _, codes, deltat, kind_codes_id, _ in codes_info:
                kdata_all.append((codes, kind_codes_id, codes, deltat))

        else:
            for pattern in codes_list:
                pattern = to_codes(kind_id, pattern)
                for _, codes, deltat, kind_codes_id, _ in codes_info:
                    if model.match_codes(pattern, codes):
                        kdata_all.append(
                            (pattern, kind_codes_id, codes, deltat))

        kind_codes_ids = [x[1] for x in kdata_all]

        # Count nuts open exactly at tmin, per (codes, deltat). These seed
        # the running counters below, because the coverage-change query
        # deliberately excludes changes at t == tmin (see sql_time).
        counts_at_tmin = {}
        if tmin is not None:
            for nut in self.iter_nuts(
                    kind, tmin, tmin, kind_codes_ids=kind_codes_ids):

                k = nut.codes, nut.deltat
                if k not in counts_at_tmin:
                    counts_at_tmin[k] = 0

                counts_at_tmin[k] += 1

        coverages = []
        for pattern, kind_codes_id, codes, deltat in kdata_all:
            # entry layout: [pattern, codes, deltat, tmin_total, tmax_total,
            # changes]; converted into a Coverage object at the end.
            entry = [pattern, codes, deltat, None, None, []]

            # Overall extent of the series: first (ASC) and last (DESC)
            # coverage change time.
            for i, order in [(0, 'ASC'), (1, 'DESC')]:
                sql = self._sql('''
                    SELECT
                        time_seconds,
                        time_offset
                    FROM %(db)s.%(coverage)s
                    WHERE
                        kind_codes_id == ?
                    ORDER BY
                        kind_codes_id ''' + order + ''',
                        time_seconds ''' + order + ''',
                        time_offset ''' + order + '''
                    LIMIT 1
                ''')

                for row in self._conn.execute(sql, [kind_codes_id]):
                    entry[3+i] = model.tjoin(row[0], row[1])

            # No coverage information available for this series at all.
            if None in entry[3:5]:
                continue

            args = [kind_codes_id]

            sql_time = ''
            if tmin is not None:
                # intentionally < because (== tmin) is queried from nuts
                sql_time += ' AND ( ? < time_seconds ' \
                    'OR ( ? == time_seconds AND ? < time_offset ) ) '
                args.extend([tmin_seconds, tmin_seconds, tmin_offset])

            if tmax is not None:
                sql_time += ' AND ( time_seconds < ? ' \
                    'OR ( ? == time_seconds AND time_offset <= ? ) ) '
                args.extend([tmax_seconds, tmax_seconds, tmax_offset])

            sql_limit = ''
            if limit is not None:
                sql_limit = ' LIMIT ?'
                args.append(limit)

            # Fetch the coverage change points (time, step) inside the
            # query interval, in time order.
            sql = self._sql('''
                SELECT
                    time_seconds,
                    time_offset,
                    step
                FROM %(db)s.%(coverage)s
                WHERE
                    kind_codes_id == ?
                    ''' + sql_time + '''
                ORDER BY
                    kind_codes_id,
                    time_seconds,
                    time_offset
            ''' + sql_limit)

            rows = list(self._conn.execute(sql, args))

            if limit is not None and len(rows) == limit:
                # Limit hit: mark the change list as undefined (None)
                # rather than returning a truncated, misleading list.
                entry[-1] = None
            else:
                # Accumulate a running count of overlapping data strips,
                # starting from the count already open at tmin.
                counts = counts_at_tmin.get((codes, deltat), 0)
                tlast = None
                if tmin is not None:
                    entry[-1].append((tmin, counts))
                    tlast = tmin

                for row in rows:
                    t = model.tjoin(row[0], row[1])
                    counts += row[2]
                    entry[-1].append((t, counts))
                    tlast = t

                # Close the interval at tmax with the final count.
                if tmax is not None and (tlast is None or tlast != tmax):
                    entry[-1].append((tmax, counts))

            coverages.append(model.Coverage.from_values(entry + [kind_id]))

        return coverages
    def add_operator(self, op):
        '''
        Attach an operator to this Squirrel instance.
        '''
        self._operators.append(op)
2561 def update_operator_mappings(self):
2562 available = self.get_codes(kind=('channel'))
2564 for operator in self._operators:
2565 operator.update_mappings(available, self._operator_registry)
2567 def iter_operator_mappings(self):
2568 for operator in self._operators:
2569 for in_codes, out_codes in operator.iter_mappings():
2570 yield operator, in_codes, out_codes
    def get_operator_mappings(self):
        '''
        Get list of ``(operator, in_codes, out_codes)`` triples of all
        attached operators (see :py:meth:`iter_operator_mappings`).
        '''
        return list(self.iter_operator_mappings())
2575 def get_operator(self, codes):
2576 try:
2577 return self._operator_registry[codes][0]
2578 except KeyError:
2579 return None
2581 def get_operator_group(self, codes):
2582 try:
2583 return self._operator_registry[codes]
2584 except KeyError:
2585 return None, (None, None, None)
2587 def iter_operator_codes(self):
2588 for _, _, out_codes in self.iter_operator_mappings():
2589 for codes in out_codes:
2590 yield codes
    def get_operator_codes(self):
        '''
        Get list of all output codes produced by attached operators (see
        :py:meth:`iter_operator_codes`).
        '''
        return list(self.iter_operator_codes())
2595 def print_tables(self, table_names=None, stream=None):
2596 '''
2597 Dump raw database tables in textual form (for debugging purposes).
2599 :param table_names:
2600 Names of tables to be dumped or ``None`` to dump all.
2601 :type table_names:
2602 :py:class:`list` of :py:class:`str`
2604 :param stream:
2605 Open file or ``None`` to dump to standard output.
2606 '''
2608 if stream is None:
2609 stream = sys.stdout
2611 if isinstance(table_names, str):
2612 table_names = [table_names]
2614 if table_names is None:
2615 table_names = [
2616 'selection_file_states',
2617 'selection_nuts',
2618 'selection_kind_codes_count',
2619 'files', 'nuts', 'kind_codes', 'kind_codes_count']
2621 m = {
2622 'selection_file_states': '%(db)s.%(file_states)s',
2623 'selection_nuts': '%(db)s.%(nuts)s',
2624 'selection_kind_codes_count': '%(db)s.%(kind_codes_count)s',
2625 'files': 'files',
2626 'nuts': 'nuts',
2627 'kind_codes': 'kind_codes',
2628 'kind_codes_count': 'kind_codes_count'}
2630 for table_name in table_names:
2631 self._database.print_table(
2632 m[table_name] % self._names, stream=stream)
class SquirrelStats(Object):
    '''
    Container to hold statistics about contents available from a Squirrel.

    See also :py:meth:`Squirrel.get_stats`.
    '''

    nfiles = Int.T(
        help='Number of files in selection.')
    nnuts = Int.T(
        help='Number of index nuts in selection.')
    codes = List.T(
        Tuple.T(content_t=String.T()),
        help='Available code sequences in selection, e.g. '
             '(agency, network, station, location) for stations nuts.')
    kinds = List.T(
        String.T(),
        help='Available content types in selection.')
    total_size = Int.T(
        help='Aggregated file size of files is selection.')
    counts = Dict.T(
        String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()),
        help='Breakdown of how many nuts of any content type and code '
             'sequence are available in selection, ``counts[kind][codes]``.')
    time_spans = Dict.T(
        String.T(), Tuple.T(content_t=Timestamp.T()),
        help='Time spans by content type.')
    sources = List.T(
        String.T(),
        help='Descriptions of attached sources.')
    operators = List.T(
        String.T(),
        help='Descriptions of attached operators.')

    def __str__(self):
        '''
        Render the statistics as a multi-line, human-readable summary.
        '''
        # Total number of nuts per content kind.
        kind_counts = dict(
            (kind, sum(self.counts[kind].values())) for kind in self.kinds)

        scodes = model.codes_to_str_abbreviated(self.codes)

        ssources = '<none>' if not self.sources else '\n' + '\n'.join(
            ' ' + s for s in self.sources)

        soperators = '<none>' if not self.operators else '\n' + '\n'.join(
            ' ' + s for s in self.operators)

        def stime(t):
            # Hide undefined times and the global "all time" sentinels.
            return util.tts(t) if t is not None and t not in (
                model.g_tmin, model.g_tmax) else '<none>'

        def stable(rows):
            # Column-aligned plain-text table formatting.
            ns = [max(len(w) for w in col) for col in zip(*rows)]
            return '\n'.join(
                ' '.join(w.ljust(n) for n, w in zip(ns, row))
                for row in rows)

        def indent(s):
            return '\n'.join(' '+line for line in s.splitlines())

        # One line per kind: nut count and covered time span.
        stspans = '<none>' if not self.kinds else '\n' + indent(stable([(
            kind + ':',
            str(kind_counts[kind]),
            stime(self.time_spans[kind][0]),
            '-',
            stime(self.time_spans[kind][1])) for kind in sorted(self.kinds)]))

        s = '''
Number of files: %i
Total size of known files: %s
Number of index nuts: %i
Available content kinds: %s
Available codes: %s
Sources: %s
Operators: %s''' % (
            self.nfiles,
            util.human_bytesize(self.total_size),
            self.nnuts,
            stspans, scodes, ssources, soperators)

        return s.lstrip()
# Explicit public API of this module.
__all__ = [
    'Squirrel',
    'SquirrelStats',
]