1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import sys
9import os
11import math
12import logging
13import threading
14import queue
15from collections import defaultdict
17from pyrocko.guts import Object, Int, List, Tuple, String, Timestamp, Dict
18from pyrocko import util, trace
19from pyrocko.progress import progress
21from . import model, io, cache, dataset
23from .model import to_kind_id, WaveformOrder, to_kind, to_codes, \
24 STATION, CHANNEL, RESPONSE, EVENT, WAVEFORM
25from .client import fdsn, catalog
26from .selection import Selection, filldocs
27from .database import abspath
28from . import client, environment, error
30logger = logging.getLogger('psq.base')
32guts_prefix = 'squirrel'
35def make_task(*args):
36 return progress.task(*args, logger=logger)
39def lpick(condition, seq):
40 ft = [], []
41 for ele in seq:
42 ft[int(bool(condition(ele)))].append(ele)
44 return ft
47def codes_patterns_for_kind(kind_id, codes):
48 if isinstance(codes, list):
49 lcodes = []
50 for sc in codes:
51 lcodes.extend(codes_patterns_for_kind(kind_id, sc))
53 return lcodes
55 codes = to_codes(kind_id, codes)
57 if kind_id == model.STATION:
58 return [codes, codes.replace(location='[*]')]
59 else:
60 return [codes]
63def blocks(tmin, tmax, deltat, nsamples_block=100000):
64 tblock = util.to_time_float(deltat * nsamples_block)
65 iblock_min = int(math.floor(tmin / tblock))
66 iblock_max = int(math.ceil(tmax / tblock))
67 for iblock in range(iblock_min, iblock_max):
68 yield iblock * tblock, (iblock+1) * tblock
71def gaps(avail, tmin, tmax):
72 assert tmin < tmax
74 data = [(tmax, 1), (tmin, -1)]
75 for (tmin_a, tmax_a) in avail:
76 assert tmin_a < tmax_a
77 data.append((tmin_a, 1))
78 data.append((tmax_a, -1))
80 data.sort()
81 s = 1
82 gaps = []
83 tmin_g = None
84 for t, x in data:
85 if s == 1 and x == -1:
86 tmin_g = t
87 elif s == 0 and x == 1 and tmin_g is not None:
88 tmax_g = t
89 if tmin_g != tmax_g:
90 gaps.append((tmin_g, tmax_g))
92 s += x
94 return gaps
97def order_key(order):
98 return (order.codes, order.tmin, order.tmax)
101class Batch(object):
102 '''
103 Batch of waveforms from window-wise data extraction.
105 Encapsulates state and results yielded for each window in window-wise
106 waveform extraction with the :py:meth:`Squirrel.chopper_waveforms` method.
108 *Attributes:*
110 .. py:attribute:: tmin
112 Start of this time window.
114 .. py:attribute:: tmax
116 End of this time window.
118 .. py:attribute:: i
120 Index of this time window in sequence.
122 .. py:attribute:: n
124 Total number of time windows in sequence.
126 .. py:attribute:: traces
128 Extracted waveforms for this time window.
129 '''
131 def __init__(self, tmin, tmax, i, n, traces):
132 self.tmin = tmin
133 self.tmax = tmax
134 self.i = i
135 self.n = n
136 self.traces = traces
139class Squirrel(Selection):
140 '''
141 Prompt, lazy, indexing, caching, dynamic seismological dataset access.
143 :param env:
144 Squirrel environment instance or directory path to use as starting
145 point for its detection. By default, the current directory is used as
146 starting point. When searching for a usable environment the directory
147 ``'.squirrel'`` or ``'squirrel'`` in the current (or starting point)
148 directory is used if it exists, otherwise the parent directories are
149 search upwards for the existence of such a directory. If no such
150 directory is found, the user's global Squirrel environment
151 ``'$HOME/.pyrocko/squirrel'`` is used.
152 :type env:
153 :py:class:`~pyrocko.squirrel.environment.Environment` or
154 :py:class:`str`
156 :param database:
157 Database instance or path to database. By default the
158 database found in the detected Squirrel environment is used.
159 :type database:
160 :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`
162 :param cache_path:
163 Directory path to use for data caching. By default, the ``'cache'``
164 directory in the detected Squirrel environment is used.
165 :type cache_path:
166 :py:class:`str`
168 :param persistent:
169 If given a name, create a persistent selection.
170 :type persistent:
171 :py:class:`str`
173 This is the central class of the Squirrel framework. It provides a unified
174 interface to query and access seismic waveforms, station meta-data and
175 event information from local file collections and remote data sources. For
176 prompt responses, a profound database setup is used under the hood. To
177 speed up assemblage of ad-hoc data selections, files are indexed on first
178 use and the extracted meta-data is remembered in the database for
179 subsequent accesses. Bulk data is lazily loaded from disk and remote
180 sources, just when requested. Once loaded, data is cached in memory to
181 expedite typical access patterns. Files and data sources can be dynamically
182 added to and removed from the Squirrel selection at runtime.
184 Queries are restricted to the contents of the files currently added to the
185 Squirrel selection (usually a subset of the file meta-information
186 collection in the database). This list of files is referred to here as the
187 "selection". By default, temporary tables are created in the attached
188 database to hold the names of the files in the selection as well as various
189 indices and counters. These tables are only visible inside the application
190 which created them and are deleted when the database connection is closed
191 or the application exits. To create a selection which is not deleted at
192 exit, supply a name to the ``persistent`` argument of the Squirrel
193 constructor. Persistent selections are shared among applications using the
194 same database.
196 **Method summary**
198 Some of the methods are implemented in :py:class:`Squirrel`'s base class
199 :py:class:`~pyrocko.squirrel.selection.Selection`.
201 .. autosummary::
203 ~Squirrel.add
204 ~Squirrel.add_source
205 ~Squirrel.add_fdsn
206 ~Squirrel.add_catalog
207 ~Squirrel.add_dataset
208 ~Squirrel.add_virtual
209 ~Squirrel.update
210 ~Squirrel.update_waveform_promises
211 ~Squirrel.advance_accessor
212 ~Squirrel.clear_accessor
213 ~Squirrel.reload
214 ~pyrocko.squirrel.selection.Selection.iter_paths
215 ~Squirrel.iter_nuts
216 ~Squirrel.iter_kinds
217 ~Squirrel.iter_deltats
218 ~Squirrel.iter_codes
219 ~pyrocko.squirrel.selection.Selection.get_paths
220 ~Squirrel.get_nuts
221 ~Squirrel.get_kinds
222 ~Squirrel.get_deltats
223 ~Squirrel.get_codes
224 ~Squirrel.get_counts
225 ~Squirrel.get_time_span
226 ~Squirrel.get_deltat_span
227 ~Squirrel.get_nfiles
228 ~Squirrel.get_nnuts
229 ~Squirrel.get_total_size
230 ~Squirrel.get_stats
231 ~Squirrel.get_content
232 ~Squirrel.get_stations
233 ~Squirrel.get_channels
234 ~Squirrel.get_responses
235 ~Squirrel.get_events
236 ~Squirrel.get_waveform_nuts
237 ~Squirrel.get_waveforms
238 ~Squirrel.chopper_waveforms
239 ~Squirrel.get_coverage
240 ~Squirrel.pile
241 ~Squirrel.snuffle
242 ~Squirrel.glob_codes
243 ~pyrocko.squirrel.selection.Selection.get_database
244 ~Squirrel.print_tables
245 '''
247 def __init__(
248 self, env=None, database=None, cache_path=None, persistent=None):
250 if not isinstance(env, environment.Environment):
251 env = environment.get_environment(env)
253 if database is None:
254 database = env.expand_path(env.database_path)
256 if cache_path is None:
257 cache_path = env.expand_path(env.cache_path)
259 if persistent is None:
260 persistent = env.persistent
262 Selection.__init__(
263 self, database=database, persistent=persistent)
265 self.get_database().set_basepath(os.path.dirname(env.get_basepath()))
267 self._content_caches = {
268 'waveform': cache.ContentCache(),
269 'default': cache.ContentCache()}
271 self._cache_path = cache_path
273 self._sources = []
274 self._operators = []
275 self._operator_registry = {}
277 self._pile = None
278 self._n_choppers_active = 0
280 self._names.update({
281 'nuts': self.name + '_nuts',
282 'kind_codes_count': self.name + '_kind_codes_count',
283 'coverage': self.name + '_coverage'})
285 with self.transaction('create tables') as cursor:
286 self._create_tables_squirrel(cursor)
288 def _create_tables_squirrel(self, cursor):
290 cursor.execute(self._register_table(self._sql(
291 '''
292 CREATE TABLE IF NOT EXISTS %(db)s.%(nuts)s (
293 nut_id integer PRIMARY KEY,
294 file_id integer,
295 file_segment integer,
296 file_element integer,
297 kind_id integer,
298 kind_codes_id integer,
299 tmin_seconds integer,
300 tmin_offset integer,
301 tmax_seconds integer,
302 tmax_offset integer,
303 kscale integer)
304 ''')))
306 cursor.execute(self._register_table(self._sql(
307 '''
308 CREATE TABLE IF NOT EXISTS %(db)s.%(kind_codes_count)s (
309 kind_codes_id integer PRIMARY KEY,
310 count integer)
311 ''')))
313 cursor.execute(self._sql(
314 '''
315 CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(nuts)s_file_element
316 ON %(nuts)s (file_id, file_segment, file_element)
317 '''))
319 cursor.execute(self._sql(
320 '''
321 CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_file_id
322 ON %(nuts)s (file_id)
323 '''))
325 cursor.execute(self._sql(
326 '''
327 CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmin_seconds
328 ON %(nuts)s (kind_id, tmin_seconds)
329 '''))
331 cursor.execute(self._sql(
332 '''
333 CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmax_seconds
334 ON %(nuts)s (kind_id, tmax_seconds)
335 '''))
337 cursor.execute(self._sql(
338 '''
339 CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_kscale
340 ON %(nuts)s (kind_id, kscale, tmin_seconds)
341 '''))
343 cursor.execute(self._sql(
344 '''
345 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts
346 BEFORE DELETE ON main.files FOR EACH ROW
347 BEGIN
348 DELETE FROM %(nuts)s WHERE file_id == old.file_id;
349 END
350 '''))
352 # trigger only on size to make silent update of mtime possible
353 cursor.execute(self._sql(
354 '''
355 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts2
356 BEFORE UPDATE OF size ON main.files FOR EACH ROW
357 BEGIN
358 DELETE FROM %(nuts)s WHERE file_id == old.file_id;
359 END
360 '''))
362 cursor.execute(self._sql(
363 '''
364 CREATE TRIGGER IF NOT EXISTS
365 %(db)s.%(file_states)s_delete_files
366 BEFORE DELETE ON %(db)s.%(file_states)s FOR EACH ROW
367 BEGIN
368 DELETE FROM %(nuts)s WHERE file_id == old.file_id;
369 END
370 '''))
372 cursor.execute(self._sql(
373 '''
374 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_inc_kind_codes
375 BEFORE INSERT ON %(nuts)s FOR EACH ROW
376 BEGIN
377 INSERT OR IGNORE INTO %(kind_codes_count)s VALUES
378 (new.kind_codes_id, 0);
379 UPDATE %(kind_codes_count)s
380 SET count = count + 1
381 WHERE new.kind_codes_id
382 == %(kind_codes_count)s.kind_codes_id;
383 END
384 '''))
386 cursor.execute(self._sql(
387 '''
388 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_dec_kind_codes
389 BEFORE DELETE ON %(nuts)s FOR EACH ROW
390 BEGIN
391 UPDATE %(kind_codes_count)s
392 SET count = count - 1
393 WHERE old.kind_codes_id
394 == %(kind_codes_count)s.kind_codes_id;
395 END
396 '''))
398 cursor.execute(self._register_table(self._sql(
399 '''
400 CREATE TABLE IF NOT EXISTS %(db)s.%(coverage)s (
401 kind_codes_id integer,
402 time_seconds integer,
403 time_offset integer,
404 step integer)
405 ''')))
407 cursor.execute(self._sql(
408 '''
409 CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(coverage)s_time
410 ON %(coverage)s (kind_codes_id, time_seconds, time_offset)
411 '''))
413 cursor.execute(self._sql(
414 '''
415 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_add_coverage
416 AFTER INSERT ON %(nuts)s FOR EACH ROW
417 BEGIN
418 INSERT OR IGNORE INTO %(coverage)s VALUES
419 (new.kind_codes_id, new.tmin_seconds, new.tmin_offset, 0)
420 ;
421 UPDATE %(coverage)s
422 SET step = step + 1
423 WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
424 AND new.tmin_seconds == %(coverage)s.time_seconds
425 AND new.tmin_offset == %(coverage)s.time_offset
426 ;
427 INSERT OR IGNORE INTO %(coverage)s VALUES
428 (new.kind_codes_id, new.tmax_seconds, new.tmax_offset, 0)
429 ;
430 UPDATE %(coverage)s
431 SET step = step - 1
432 WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
433 AND new.tmax_seconds == %(coverage)s.time_seconds
434 AND new.tmax_offset == %(coverage)s.time_offset
435 ;
436 DELETE FROM %(coverage)s
437 WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
438 AND new.tmin_seconds == %(coverage)s.time_seconds
439 AND new.tmin_offset == %(coverage)s.time_offset
440 AND step == 0
441 ;
442 DELETE FROM %(coverage)s
443 WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
444 AND new.tmax_seconds == %(coverage)s.time_seconds
445 AND new.tmax_offset == %(coverage)s.time_offset
446 AND step == 0
447 ;
448 END
449 '''))
451 cursor.execute(self._sql(
452 '''
453 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_remove_coverage
454 BEFORE DELETE ON %(nuts)s FOR EACH ROW
455 BEGIN
456 INSERT OR IGNORE INTO %(coverage)s VALUES
457 (old.kind_codes_id, old.tmin_seconds, old.tmin_offset, 0)
458 ;
459 UPDATE %(coverage)s
460 SET step = step - 1
461 WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
462 AND old.tmin_seconds == %(coverage)s.time_seconds
463 AND old.tmin_offset == %(coverage)s.time_offset
464 ;
465 INSERT OR IGNORE INTO %(coverage)s VALUES
466 (old.kind_codes_id, old.tmax_seconds, old.tmax_offset, 0)
467 ;
468 UPDATE %(coverage)s
469 SET step = step + 1
470 WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
471 AND old.tmax_seconds == %(coverage)s.time_seconds
472 AND old.tmax_offset == %(coverage)s.time_offset
473 ;
474 DELETE FROM %(coverage)s
475 WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
476 AND old.tmin_seconds == %(coverage)s.time_seconds
477 AND old.tmin_offset == %(coverage)s.time_offset
478 AND step == 0
479 ;
480 DELETE FROM %(coverage)s
481 WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
482 AND old.tmax_seconds == %(coverage)s.time_seconds
483 AND old.tmax_offset == %(coverage)s.time_offset
484 AND step == 0
485 ;
486 END
487 '''))
489 def _delete(self):
490 '''Delete database tables associated with this Squirrel.'''
492 with self.transaction('delete tables') as cursor:
493 for s in '''
494 DROP TRIGGER %(db)s.%(nuts)s_delete_nuts;
495 DROP TRIGGER %(db)s.%(nuts)s_delete_nuts2;
496 DROP TRIGGER %(db)s.%(file_states)s_delete_files;
497 DROP TRIGGER %(db)s.%(nuts)s_inc_kind_codes;
498 DROP TRIGGER %(db)s.%(nuts)s_dec_kind_codes;
499 DROP TABLE %(db)s.%(nuts)s;
500 DROP TABLE %(db)s.%(kind_codes_count)s;
501 DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_add_coverage;
502 DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_remove_coverage;
503 DROP TABLE IF EXISTS %(db)s.%(coverage)s;
504 '''.strip().splitlines():
506 cursor.execute(self._sql(s))
508 Selection._delete(self)
510 @filldocs
511 def add(self,
512 paths,
513 kinds=None,
514 format='detect',
515 include=None,
516 exclude=None,
517 check=True):
519 '''
520 Add files to the selection.
522 :param paths:
523 Iterator yielding paths to files or directories to be added to the
524 selection. Recurses into directories. If given a ``str``, it
525 is treated as a single path to be added.
526 :type paths:
527 :py:class:`list` of :py:class:`str`
529 :param kinds:
530 Content types to be made available through the Squirrel selection.
531 By default, all known content types are accepted.
532 :type kinds:
533 :py:class:`list` of :py:class:`str`
535 :param format:
536 File format identifier or ``'detect'`` to enable auto-detection
537 (available: %(file_formats)s).
538 :type format:
539 str
541 :param include:
542 If not ``None``, files are only included if their paths match the
543 given regular expression pattern.
544 :type format:
545 str
547 :param exclude:
548 If not ``None``, files are only included if their paths do not
549 match the given regular expression pattern.
550 :type format:
551 str
553 :param check:
554 If ``True``, all file modification times are checked to see if
555 cached information has to be updated (slow). If ``False``, only
556 previously unknown files are indexed and cached information is used
557 for known files, regardless of file state (fast, corrresponds to
558 Squirrel's ``--optimistic`` mode). File deletions will go
559 undetected in the latter case.
560 :type check:
561 bool
563 :Complexity:
564 O(log N)
565 '''
567 if isinstance(kinds, str):
568 kinds = (kinds,)
570 if isinstance(paths, str):
571 paths = [paths]
573 kind_mask = model.to_kind_mask(kinds)
575 with progress.view():
576 Selection.add(
577 self, util.iter_select_files(
578 paths,
579 show_progress=False,
580 include=include,
581 exclude=exclude,
582 pass_through=lambda path: path.startswith('virtual:')
583 ), kind_mask, format)
585 self._load(check)
586 self._update_nuts()
588 def reload(self):
589 '''
590 Check for modifications and reindex modified files.
592 Based on file modification times.
593 '''
595 self._set_file_states_force_check()
596 self._load(check=True)
597 self._update_nuts()
599 def add_virtual(self, nuts, virtual_paths=None):
600 '''
601 Add content which is not backed by files.
603 :param nuts:
604 Content pieces to be added.
605 :type nuts:
606 iterator yielding :py:class:`~pyrocko.squirrel.model.Nut` objects
608 :param virtual_paths:
609 List of virtual paths to prevent creating a temporary list of the
610 nuts while aggregating the file paths for the selection.
611 :type virtual_paths:
612 :py:class:`list` of :py:class:`str`
614 Stores to the main database and the selection.
615 '''
617 if isinstance(virtual_paths, str):
618 virtual_paths = [virtual_paths]
620 if virtual_paths is None:
621 if not isinstance(nuts, list):
622 nuts = list(nuts)
623 virtual_paths = set(nut.file_path for nut in nuts)
625 Selection.add(self, virtual_paths)
626 self.get_database().dig(nuts)
627 self._update_nuts()
629 def add_volatile(self, nuts):
630 if not isinstance(nuts, list):
631 nuts = list(nuts)
633 paths = list(set(nut.file_path for nut in nuts))
634 io.backends.virtual.add_nuts(nuts)
635 self.add_virtual(nuts, paths)
636 self._volatile_paths.extend(paths)
638 def add_volatile_waveforms(self, traces):
639 '''
640 Add in-memory waveforms which will be removed when the app closes.
641 '''
643 name = model.random_name()
645 path = 'virtual:volatile:%s' % name
647 nuts = []
648 for itr, tr in enumerate(traces):
649 assert tr.tmin <= tr.tmax
650 tmin_seconds, tmin_offset = model.tsplit(tr.tmin)
651 tmax_seconds, tmax_offset = model.tsplit(
652 tr.tmin + tr.data_len()*tr.deltat)
654 nuts.append(model.Nut(
655 file_path=path,
656 file_format='virtual',
657 file_segment=itr,
658 file_element=0,
659 file_mtime=0,
660 codes=tr.codes,
661 tmin_seconds=tmin_seconds,
662 tmin_offset=tmin_offset,
663 tmax_seconds=tmax_seconds,
664 tmax_offset=tmax_offset,
665 deltat=tr.deltat,
666 kind_id=to_kind_id('waveform'),
667 content=tr))
669 self.add_volatile(nuts)
670 return path
672 def _load(self, check):
673 for _ in io.iload(
674 self,
675 content=[],
676 skip_unchanged=True,
677 check=check):
678 pass
680 def _update_nuts(self, transaction=None):
681 transaction = transaction or self.transaction('update nuts')
682 with make_task('Aggregating selection') as task, \
683 transaction as cursor:
685 self._conn.set_progress_handler(task.update, 100000)
686 nrows = cursor.execute(self._sql(
687 '''
688 INSERT INTO %(db)s.%(nuts)s
689 SELECT NULL,
690 nuts.file_id, nuts.file_segment, nuts.file_element,
691 nuts.kind_id, nuts.kind_codes_id,
692 nuts.tmin_seconds, nuts.tmin_offset,
693 nuts.tmax_seconds, nuts.tmax_offset,
694 nuts.kscale
695 FROM %(db)s.%(file_states)s
696 INNER JOIN nuts
697 ON %(db)s.%(file_states)s.file_id == nuts.file_id
698 INNER JOIN kind_codes
699 ON nuts.kind_codes_id ==
700 kind_codes.kind_codes_id
701 WHERE %(db)s.%(file_states)s.file_state != 2
702 AND (((1 << kind_codes.kind_id)
703 & %(db)s.%(file_states)s.kind_mask) != 0)
704 ''')).rowcount
706 task.update(nrows)
707 self._set_file_states_known(transaction)
708 self._conn.set_progress_handler(None, 0)
710 def add_source(self, source, check=True):
711 '''
712 Add remote resource.
714 :param source:
715 Remote data access client instance.
716 :type source:
717 subclass of :py:class:`~pyrocko.squirrel.client.base.Source`
718 '''
720 self._sources.append(source)
721 source.setup(self, check=check)
723 def add_fdsn(self, *args, **kwargs):
724 '''
725 Add FDSN site for transparent remote data access.
727 Arguments are passed to
728 :py:class:`~pyrocko.squirrel.client.fdsn.FDSNSource`.
729 '''
731 self.add_source(fdsn.FDSNSource(*args, **kwargs))
733 def add_catalog(self, *args, **kwargs):
734 '''
735 Add online catalog for transparent event data access.
737 Arguments are passed to
738 :py:class:`~pyrocko.squirrel.client.catalog.CatalogSource`.
739 '''
741 self.add_source(catalog.CatalogSource(*args, **kwargs))
743 def add_dataset(self, ds, check=True, warn_persistent=True):
744 '''
745 Read dataset description from file and add its contents.
747 :param ds:
748 Path to dataset description file or dataset description object
749 . See :py:mod:`~pyrocko.squirrel.dataset`.
750 :type ds:
751 :py:class:`str` or :py:class:`~pyrocko.squirrel.dataset.Dataset`
753 :param check:
754 If ``True``, all file modification times are checked to see if
755 cached information has to be updated (slow). If ``False``, only
756 previously unknown files are indexed and cached information is used
757 for known files, regardless of file state (fast, corrresponds to
758 Squirrel's ``--optimistic`` mode). File deletions will go
759 undetected in the latter case.
760 :type check:
761 bool
762 '''
763 if isinstance(ds, str):
764 ds = dataset.read_dataset(ds)
765 path = ds
766 else:
767 path = None
769 if warn_persistent and ds.persistent and (
770 not self._persistent or (self._persistent != ds.persistent)):
772 logger.warning(
773 'Dataset `persistent` flag ignored. Can not be set on already '
774 'existing Squirrel instance.%s' % (
775 ' Dataset: %s' % path if path else ''))
777 ds.setup(self, check=check)
779 def _get_selection_args(
780 self, kind_id,
781 obj=None, tmin=None, tmax=None, time=None, codes=None):
783 if codes is not None:
784 codes = to_codes(kind_id, codes)
786 if time is not None:
787 tmin = time
788 tmax = time
790 if obj is not None:
791 tmin = tmin if tmin is not None else obj.tmin
792 tmax = tmax if tmax is not None else obj.tmax
793 codes = codes if codes is not None else obj.codes
795 return tmin, tmax, codes
797 def _get_selection_args_str(self, *args, **kwargs):
799 tmin, tmax, codes = self._get_selection_args(*args, **kwargs)
800 return 'tmin: %s, tmax: %s, codes: %s' % (
801 util.time_to_str(tmin) if tmin is not None else 'none',
802 util.time_to_str(tmax) if tmin is not None else 'none',
803 str(codes))
805 def _selection_args_to_kwargs(
806 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
808 return dict(obj=obj, tmin=tmin, tmax=tmax, time=time, codes=codes)
810 def _timerange_sql(self, tmin, tmax, kind, cond, args, naiv):
812 tmin_seconds, tmin_offset = model.tsplit(tmin)
813 tmax_seconds, tmax_offset = model.tsplit(tmax)
814 if naiv:
815 cond.append('%(db)s.%(nuts)s.tmin_seconds <= ?')
816 args.append(tmax_seconds)
817 else:
818 tscale_edges = model.tscale_edges
819 tmin_cond = []
820 for kscale in range(tscale_edges.size + 1):
821 if kscale != tscale_edges.size:
822 tscale = int(tscale_edges[kscale])
823 tmin_cond.append('''
824 (%(db)s.%(nuts)s.kind_id = ?
825 AND %(db)s.%(nuts)s.kscale == ?
826 AND %(db)s.%(nuts)s.tmin_seconds BETWEEN ? AND ?)
827 ''')
828 args.extend(
829 (to_kind_id(kind), kscale,
830 tmin_seconds - tscale - 1, tmax_seconds + 1))
832 else:
833 tmin_cond.append('''
834 (%(db)s.%(nuts)s.kind_id == ?
835 AND %(db)s.%(nuts)s.kscale == ?
836 AND %(db)s.%(nuts)s.tmin_seconds <= ?)
837 ''')
839 args.extend(
840 (to_kind_id(kind), kscale, tmax_seconds + 1))
841 if tmin_cond:
842 cond.append(' ( ' + ' OR '.join(tmin_cond) + ' ) ')
844 cond.append('%(db)s.%(nuts)s.tmax_seconds >= ?')
845 args.append(tmin_seconds)
847 def iter_nuts(
848 self, kind=None, tmin=None, tmax=None, codes=None, naiv=False,
849 kind_codes_ids=None, path=None):
851 '''
852 Iterate over content entities matching given constraints.
854 :param kind:
855 Content kind (or kinds) to extract.
856 :type kind:
857 :py:class:`str`, :py:class:`list` of :py:class:`str`
859 :param tmin:
860 Start time of query interval.
861 :type tmin:
862 timestamp
864 :param tmax:
865 End time of query interval.
866 :type tmax:
867 timestamp
869 :param codes:
870 Pattern of content codes to query.
871 :type codes:
872 :py:class:`tuple` of :py:class:`str`
874 :param naiv:
875 Bypass time span lookup through indices (slow, for testing).
876 :type naiv:
877 :py:class:`bool`
879 :param kind_codes_ids:
880 Kind-codes IDs of contents to be retrieved (internal use).
881 :type kind_codes_ids:
882 :py:class:`list` of :py:class:`int`
884 :yields:
885 :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
886 intersecting content.
888 :complexity:
889 O(log N) for the time selection part due to heavy use of database
890 indices.
892 Query time span is treated as a half-open interval ``[tmin, tmax)``.
893 However, if ``tmin`` equals ``tmax``, the edge logics are modified to
894 closed-interval so that content intersecting with the time instant ``t
895 = tmin = tmax`` is returned (otherwise nothing would be returned as
896 ``[t, t)`` never matches anything).
898 Time spans of content entities to be matched are also treated as half
899 open intervals, e.g. content span ``[0, 1)`` is matched by query span
900 ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``. Also here, logics are
901 modified to closed-interval when the content time span is an empty
902 interval, i.e. to indicate a time instant. E.g. time instant 0 is
903 matched by ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``.
904 '''
906 if not isinstance(kind, str):
907 if kind is None:
908 kind = model.g_content_kinds
909 for kind_ in kind:
910 for nut in self.iter_nuts(kind_, tmin, tmax, codes):
911 yield nut
913 return
915 kind_id = to_kind_id(kind)
917 cond = []
918 args = []
919 if tmin is not None or tmax is not None:
920 assert kind is not None
921 if tmin is None:
922 tmin = self.get_time_span()[0]
923 if tmax is None:
924 tmax = self.get_time_span()[1] + 1.0
926 self._timerange_sql(tmin, tmax, kind, cond, args, naiv)
928 cond.append('kind_codes.kind_id == ?')
929 args.append(kind_id)
931 if codes is not None:
932 pats = codes_patterns_for_kind(kind_id, codes)
933 if pats:
934 cond.append(
935 ' ( %s ) ' % ' OR '.join(
936 ('kind_codes.codes GLOB ?',) * len(pats)))
937 args.extend(pat.safe_str for pat in pats)
939 if kind_codes_ids is not None:
940 cond.append(
941 ' ( kind_codes.kind_codes_id IN ( %s ) ) ' % ', '.join(
942 '?'*len(kind_codes_ids)))
944 args.extend(kind_codes_ids)
946 db = self.get_database()
947 if path is not None:
948 cond.append('files.path == ?')
949 args.append(db.relpath(abspath(path)))
951 sql = ('''
952 SELECT
953 files.path,
954 files.format,
955 files.mtime,
956 files.size,
957 %(db)s.%(nuts)s.file_segment,
958 %(db)s.%(nuts)s.file_element,
959 kind_codes.kind_id,
960 kind_codes.codes,
961 %(db)s.%(nuts)s.tmin_seconds,
962 %(db)s.%(nuts)s.tmin_offset,
963 %(db)s.%(nuts)s.tmax_seconds,
964 %(db)s.%(nuts)s.tmax_offset,
965 kind_codes.deltat
966 FROM files
967 INNER JOIN %(db)s.%(nuts)s
968 ON files.file_id == %(db)s.%(nuts)s.file_id
969 INNER JOIN kind_codes
970 ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id
971 ''')
973 if cond:
974 sql += ''' WHERE ''' + ' AND '.join(cond)
976 sql = self._sql(sql)
977 if tmin is None and tmax is None:
978 for row in self._conn.execute(sql, args):
979 row = (db.abspath(row[0]),) + row[1:]
980 nut = model.Nut(values_nocheck=row)
981 yield nut
982 else:
983 assert tmin is not None and tmax is not None
984 if tmin == tmax:
985 for row in self._conn.execute(sql, args):
986 row = (db.abspath(row[0]),) + row[1:]
987 nut = model.Nut(values_nocheck=row)
988 if (nut.tmin <= tmin < nut.tmax) \
989 or (nut.tmin == nut.tmax and tmin == nut.tmin):
991 yield nut
992 else:
993 for row in self._conn.execute(sql, args):
994 row = (db.abspath(row[0]),) + row[1:]
995 nut = model.Nut(values_nocheck=row)
996 if (tmin < nut.tmax and nut.tmin < tmax) \
997 or (nut.tmin == nut.tmax
998 and tmin <= nut.tmin < tmax):
1000 yield nut
1002 def get_nuts(self, *args, **kwargs):
1003 '''
1004 Get content entities matching given constraints.
1006 Like :py:meth:`iter_nuts` but returns results as a list.
1007 '''
1009 return list(self.iter_nuts(*args, **kwargs))
1011 def _split_nuts(
1012 self, kind, tmin=None, tmax=None, codes=None, path=None):
1014 kind_id = to_kind_id(kind)
1015 tmin_seconds, tmin_offset = model.tsplit(tmin)
1016 tmax_seconds, tmax_offset = model.tsplit(tmax)
1018 names_main_nuts = dict(self._names)
1019 names_main_nuts.update(db='main', nuts='nuts')
1021 db = self.get_database()
1023 def main_nuts(s):
1024 return s % names_main_nuts
1026 with self.transaction('split nuts') as cursor:
1027 # modify selection and main
1028 for sql_subst in [
1029 self._sql, main_nuts]:
1031 cond = []
1032 args = []
1034 self._timerange_sql(tmin, tmax, kind, cond, args, False)
1036 if codes is not None:
1037 pats = codes_patterns_for_kind(kind_id, codes)
1038 if pats:
1039 cond.append(
1040 ' ( %s ) ' % ' OR '.join(
1041 ('kind_codes.codes GLOB ?',) * len(pats)))
1042 args.extend(pat.safe_str for pat in pats)
1044 if path is not None:
1045 cond.append('files.path == ?')
1046 args.append(db.relpath(abspath(path)))
1048 sql = sql_subst('''
1049 SELECT
1050 %(db)s.%(nuts)s.nut_id,
1051 %(db)s.%(nuts)s.tmin_seconds,
1052 %(db)s.%(nuts)s.tmin_offset,
1053 %(db)s.%(nuts)s.tmax_seconds,
1054 %(db)s.%(nuts)s.tmax_offset,
1055 kind_codes.deltat
1056 FROM files
1057 INNER JOIN %(db)s.%(nuts)s
1058 ON files.file_id == %(db)s.%(nuts)s.file_id
1059 INNER JOIN kind_codes
1060 ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id
1061 WHERE ''' + ' AND '.join(cond)) # noqa
1063 insert = []
1064 delete = []
1065 for row in cursor.execute(sql, args):
1066 nut_id, nut_tmin_seconds, nut_tmin_offset, \
1067 nut_tmax_seconds, nut_tmax_offset, nut_deltat = row
1069 nut_tmin = model.tjoin(
1070 nut_tmin_seconds, nut_tmin_offset)
1071 nut_tmax = model.tjoin(
1072 nut_tmax_seconds, nut_tmax_offset)
1074 if nut_tmin < tmax and tmin < nut_tmax:
1075 if nut_tmin < tmin:
1076 insert.append((
1077 nut_tmin_seconds, nut_tmin_offset,
1078 tmin_seconds, tmin_offset,
1079 model.tscale_to_kscale(
1080 tmin_seconds - nut_tmin_seconds),
1081 nut_id))
1083 if tmax < nut_tmax:
1084 insert.append((
1085 tmax_seconds, tmax_offset,
1086 nut_tmax_seconds, nut_tmax_offset,
1087 model.tscale_to_kscale(
1088 nut_tmax_seconds - tmax_seconds),
1089 nut_id))
1091 delete.append((nut_id,))
1093 sql_add = '''
1094 INSERT INTO %(db)s.%(nuts)s (
1095 file_id, file_segment, file_element, kind_id,
1096 kind_codes_id, tmin_seconds, tmin_offset,
1097 tmax_seconds, tmax_offset, kscale )
1098 SELECT
1099 file_id, file_segment, file_element,
1100 kind_id, kind_codes_id, ?, ?, ?, ?, ?
1101 FROM %(db)s.%(nuts)s
1102 WHERE nut_id == ?
1103 '''
1104 cursor.executemany(sql_subst(sql_add), insert)
1106 sql_delete = '''
1107 DELETE FROM %(db)s.%(nuts)s WHERE nut_id == ?
1108 '''
1109 cursor.executemany(sql_subst(sql_delete), delete)
1111 def get_time_span(self, kinds=None):
1112 '''
1113 Get time interval over all content in selection.
1115 :param kinds:
1116 If not ``None``, restrict query to given content kinds.
1117 :type kind:
1118 list of str
1120 :complexity:
1121 O(1), independent of the number of nuts.
1123 :returns:
1124 ``(tmin, tmax)``, combined time interval of queried content kinds.
1125 '''
1127 sql_min = self._sql('''
1128 SELECT MIN(tmin_seconds), MIN(tmin_offset)
1129 FROM %(db)s.%(nuts)s
1130 WHERE kind_id == ?
1131 AND tmin_seconds == (
1132 SELECT MIN(tmin_seconds)
1133 FROM %(db)s.%(nuts)s
1134 WHERE kind_id == ?)
1135 ''')
1137 sql_max = self._sql('''
1138 SELECT MAX(tmax_seconds), MAX(tmax_offset)
1139 FROM %(db)s.%(nuts)s
1140 WHERE kind_id == ?
1141 AND tmax_seconds == (
1142 SELECT MAX(tmax_seconds)
1143 FROM %(db)s.%(nuts)s
1144 WHERE kind_id == ?)
1145 ''')
1147 gtmin = None
1148 gtmax = None
1150 if isinstance(kinds, str):
1151 kinds = [kinds]
1153 if kinds is None:
1154 kind_ids = model.g_content_kind_ids
1155 else:
1156 kind_ids = model.to_kind_ids(kinds)
1158 for kind_id in kind_ids:
1159 for tmin_seconds, tmin_offset in self._conn.execute(
1160 sql_min, (kind_id, kind_id)):
1161 tmin = model.tjoin(tmin_seconds, tmin_offset)
1162 if tmin is not None and (gtmin is None or tmin < gtmin):
1163 gtmin = tmin
1165 for (tmax_seconds, tmax_offset) in self._conn.execute(
1166 sql_max, (kind_id, kind_id)):
1167 tmax = model.tjoin(tmax_seconds, tmax_offset)
1168 if tmax is not None and (gtmax is None or tmax > gtmax):
1169 gtmax = tmax
1171 return gtmin, gtmax
1173 def has(self, kinds):
1174 '''
1175 Check availability of given content kinds.
1177 :param kinds:
1178 Content kinds to query.
1179 :type kind:
1180 list of str
1182 :returns:
1183 ``True`` if any of the queried content kinds is available
1184 in the selection.
1185 '''
1186 self_tmin, self_tmax = self.get_time_span(kinds)
1188 return None not in (self_tmin, self_tmax)
1190 def get_deltat_span(self, kind):
1191 '''
1192 Get min and max sampling interval of all content of given kind.
1194 :param kind:
1195 Content kind
1196 :type kind:
1197 str
1199 :returns: ``(deltat_min, deltat_max)``
1200 '''
1202 deltats = [
1203 deltat for deltat in self.get_deltats(kind)
1204 if deltat is not None]
1206 if deltats:
1207 return min(deltats), max(deltats)
1208 else:
1209 return None, None
1211 def iter_kinds(self, codes=None):
1212 '''
1213 Iterate over content types available in selection.
1215 :param codes:
1216 If given, get kinds only for selected codes identifier.
1217 :type codes:
1218 :py:class:`tuple` of :py:class:`str`
1220 :yields:
1221 Available content kinds as :py:class:`str`.
1223 :complexity:
1224 O(1), independent of number of nuts.
1225 '''
1227 return self._database._iter_kinds(
1228 codes=codes,
1229 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1231 def iter_deltats(self, kind=None):
1232 '''
1233 Iterate over sampling intervals available in selection.
1235 :param kind:
1236 If given, get sampling intervals only for a given content type.
1237 :type kind:
1238 str
1240 :yields:
1241 :py:class:`float` values.
1243 :complexity:
1244 O(1), independent of number of nuts.
1245 '''
1246 return self._database._iter_deltats(
1247 kind=kind,
1248 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1250 def iter_codes(self, kind=None):
1251 '''
1252 Iterate over content identifier code sequences available in selection.
1254 :param kind:
1255 If given, get codes only for a given content type.
1256 :type kind:
1257 str
1259 :yields:
1260 :py:class:`tuple` of :py:class:`str`
1262 :complexity:
1263 O(1), independent of number of nuts.
1264 '''
1265 return self._database._iter_codes(
1266 kind=kind,
1267 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1269 def _iter_codes_info(self, kind=None):
1270 '''
1271 Iterate over number of occurrences of any (kind, codes) combination.
1273 :param kind:
1274 If given, get counts only for selected content type.
1275 :type kind:
1276 str
1278 :yields:
1279 Tuples of the form ``(kind, codes, deltat, kind_codes_id, count)``.
1281 :complexity:
1282 O(1), independent of number of nuts.
1283 '''
1284 return self._database._iter_codes_info(
1285 kind=kind,
1286 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1288 def get_kinds(self, codes=None):
1289 '''
1290 Get content types available in selection.
1292 :param codes:
1293 If given, get kinds only for selected codes identifier.
1294 :type codes:
1295 :py:class:`tuple` of :py:class:`str`
1297 :returns:
1298 Sorted list of available content types.
1300 :complexity:
1301 O(1), independent of number of nuts.
1303 '''
1304 return sorted(list(self.iter_kinds(codes=codes)))
1306 def get_deltats(self, kind=None):
1307 '''
1308 Get sampling intervals available in selection.
1310 :param kind:
1311 If given, get sampling intervals only for selected content type.
1312 :type kind:
1313 str
1315 :complexity:
1316 O(1), independent of number of nuts.
1318 :returns: Sorted list of available sampling intervals.
1319 '''
1320 return sorted(list(self.iter_deltats(kind=kind)))
1322 def get_codes(self, kind=None):
1323 '''
1324 Get identifier code sequences available in selection.
1326 :param kind:
1327 If given, get codes only for selected content type.
1328 :type kind:
1329 str
1331 :complexity:
1332 O(1), independent of number of nuts.
1334 :returns: Sorted list of available codes as tuples of strings.
1335 '''
1336 return sorted(list(self.iter_codes(kind=kind)))
1338 def get_counts(self, kind=None):
1339 '''
1340 Get number of occurrences of any (kind, codes) combination.
1342 :param kind:
1343 If given, get codes only for selected content type.
1344 :type kind:
1345 str
1347 :complexity:
1348 O(1), independent of number of nuts.
1350 :returns: ``dict`` with ``counts[kind][codes]`` or ``counts[codes]``
1351 if kind is not ``None``
1352 '''
1353 d = {}
1354 for kind_id, codes, _, _, count in self._iter_codes_info(kind=kind):
1355 if kind_id not in d:
1356 v = d[kind_id] = {}
1357 else:
1358 v = d[kind_id]
1360 if codes not in v:
1361 v[codes] = 0
1363 v[codes] += count
1365 if kind is not None:
1366 return d[to_kind_id(kind)]
1367 else:
1368 return dict((to_kind(kind_id), v) for (kind_id, v) in d.items())
1370 def glob_codes(self, kind, codes_list):
1371 '''
1372 Find codes matching given patterns.
1374 :param kind:
1375 Content kind to be queried.
1376 :type kind:
1377 str
1379 :param codes_list:
1380 List of code patterns to query. If not given or empty, an empty
1381 list is returned.
1382 :type codes_list:
1383 :py:class:`list` of :py:class:`tuple` of :py:class:`str`
1385 :returns:
1386 List of matches of the form ``[kind_codes_id, codes, deltat]``.
1387 '''
1389 kind_id = to_kind_id(kind)
1390 args = [kind_id]
1391 pats = []
1392 for codes in codes_list:
1393 pats.extend(codes_patterns_for_kind(kind_id, codes))
1395 if pats:
1396 codes_cond = 'AND ( %s ) ' % ' OR '.join(
1397 ('kind_codes.codes GLOB ?',) * len(pats))
1399 args.extend(pat.safe_str for pat in pats)
1400 else:
1401 codes_cond = ''
1403 sql = self._sql('''
1404 SELECT kind_codes_id, codes, deltat FROM kind_codes
1405 WHERE
1406 kind_id == ? ''' + codes_cond)
1408 return list(map(list, self._conn.execute(sql, args)))
1410 def update(self, constraint=None, **kwargs):
1411 '''
1412 Update or partially update channel and event inventories.
1414 :param constraint:
1415 Selection of times or areas to be brought up to date.
1416 :type constraint:
1417 :py:class:`~pyrocko.squirrel.client.base.Constraint`
1419 :param \\*\\*kwargs:
1420 Shortcut for setting ``constraint=Constraint(**kwargs)``.
1422 This function triggers all attached remote sources, to check for
1423 updates in the meta-data. The sources will only submit queries when
1424 their expiration date has passed, or if the selection spans into
1425 previously unseen times or areas.
1426 '''
1428 if constraint is None:
1429 constraint = client.Constraint(**kwargs)
1431 for source in self._sources:
1432 source.update_channel_inventory(self, constraint)
1433 source.update_event_inventory(self, constraint)
1435 def update_waveform_promises(self, constraint=None, **kwargs):
1436 '''
1437 Permit downloading of remote waveforms.
1439 :param constraint:
1440 Remote waveforms compatible with the given constraint are enabled
1441 for download.
1442 :type constraint:
1443 :py:class:`~pyrocko.squirrel.client.base.Constraint`
1445 :param \\*\\*kwargs:
1446 Shortcut for setting ``constraint=Constraint(**kwargs)``.
1448 Calling this method permits Squirrel to download waveforms from remote
1449 sources when processing subsequent waveform requests. This works by
1450 inserting so called waveform promises into the database. It will look
1451 into the available channels for each remote source and create a promise
1452 for each channel compatible with the given constraint. If the promise
1453 then matches in a waveform request, Squirrel tries to download the
1454 waveform. If the download is successful, the downloaded waveform is
1455 added to the Squirrel and the promise is deleted. If the download
1456 fails, the promise is kept if the reason of failure looks like being
1457 temporary, e.g. because of a network failure. If the cause of failure
1458 however seems to be permanent, the promise is deleted so that no
1459 further attempts are made to download a waveform which might not be
1460 available from that server at all. To force re-scheduling after a
1461 permanent failure, call :py:meth:`update_waveform_promises`
1462 yet another time.
1463 '''
1465 if constraint is None:
1466 constraint = client.Constraint(**kwargs)
1468 for source in self._sources:
1469 source.update_waveform_promises(self, constraint)
1471 def update_responses(self, constraint=None, **kwargs):
1472 if constraint is None:
1473 constraint = client.Constraint(**kwargs)
1475 for source in self._sources:
1476 source.update_response_inventory(self, constraint)
1478 def get_nfiles(self):
1479 '''
1480 Get number of files in selection.
1481 '''
1483 sql = self._sql('''SELECT COUNT(*) FROM %(db)s.%(file_states)s''')
1484 for row in self._conn.execute(sql):
1485 return row[0]
1487 def get_nnuts(self):
1488 '''
1489 Get number of nuts in selection.
1490 '''
1492 sql = self._sql('''SELECT COUNT(*) FROM %(db)s.%(nuts)s''')
1493 for row in self._conn.execute(sql):
1494 return row[0]
1496 def get_total_size(self):
1497 '''
1498 Get aggregated file size available in selection.
1499 '''
1501 sql = self._sql('''
1502 SELECT SUM(files.size) FROM %(db)s.%(file_states)s
1503 INNER JOIN files
1504 ON %(db)s.%(file_states)s.file_id = files.file_id
1505 ''')
1507 for row in self._conn.execute(sql):
1508 return row[0] or 0
1510 def get_stats(self):
1511 '''
1512 Get statistics on contents available through this selection.
1513 '''
1515 kinds = self.get_kinds()
1516 time_spans = {}
1517 for kind in kinds:
1518 time_spans[kind] = self.get_time_span([kind])
1520 return SquirrelStats(
1521 nfiles=self.get_nfiles(),
1522 nnuts=self.get_nnuts(),
1523 kinds=kinds,
1524 codes=self.get_codes(),
1525 total_size=self.get_total_size(),
1526 counts=self.get_counts(),
1527 time_spans=time_spans,
1528 sources=[s.describe() for s in self._sources],
1529 operators=[op.describe() for op in self._operators])
1531 def get_content(
1532 self,
1533 nut,
1534 cache_id='default',
1535 accessor_id='default',
1536 show_progress=False):
1538 '''
1539 Get and possibly load full content for a given index entry from file.
1541 Loads the actual content objects (channel, station, waveform, ...) from
1542 file. For efficiency, sibling content (all stuff in the same file
1543 segment) will also be loaded as a side effect. The loaded contents are
1544 cached in the Squirrel object.
1545 '''
1547 content_cache = self._content_caches[cache_id]
1548 if not content_cache.has(nut):
1550 for nut_loaded in io.iload(
1551 nut.file_path,
1552 segment=nut.file_segment,
1553 format=nut.file_format,
1554 database=self._database,
1555 update_selection=self,
1556 show_progress=show_progress):
1558 content_cache.put(nut_loaded)
1560 try:
1561 return content_cache.get(nut, accessor_id)
1562 except KeyError:
1563 raise error.NotAvailable(
1564 'Unable to retrieve content: %s, %s, %s, %s' % nut.key)
1566 def advance_accessor(self, accessor_id, cache_id=None):
1567 '''
1568 Notify memory caches about consumer moving to a new data batch.
1570 :param accessor_id:
1571 Name of accessing consumer to be advanced.
1572 :type accessor_id:
1573 str
1575 :param cache_id:
1576 Name of cache to for which the accessor should be advanced. By
1577 default the named accessor is advanced in all registered caches.
1578 By default, two caches named ``'default'`` and ``'waveforms'`` are
1579 available.
1580 :type cache_id:
1581 str
1583 See :py:class:`~pyrocko.squirrel.cache.ContentCache` for details on how
1584 Squirrel's memory caching works and can be tuned. Default behaviour is
1585 to release data when it has not been used in the latest data
1586 window/batch. If the accessor is never advanced, data is cached
1587 indefinitely - which is often desired e.g. for station meta-data.
1588 Methods for consecutive data traversal, like
1589 :py:meth:`chopper_waveforms` automatically advance and clear
1590 their accessor.
1591 '''
1592 for cache_ in (
1593 self._content_caches.keys()
1594 if cache_id is None
1595 else [cache_id]):
1597 self._content_caches[cache_].advance_accessor(accessor_id)
1599 def clear_accessor(self, accessor_id, cache_id=None):
1600 '''
1601 Notify memory caches about a consumer having finished.
1603 :param accessor_id:
1604 Name of accessor to be cleared.
1605 :type accessor_id:
1606 str
1608 :param cache_id:
1609 Name of cache for which the accessor should be cleared. By default
1610 the named accessor is cleared from all registered caches. By
1611 default, two caches named ``'default'`` and ``'waveforms'`` are
1612 available.
1613 :type cache_id:
1614 str
1616 Calling this method clears all references to cache entries held by the
1617 named accessor. Cache entries are then freed if not referenced by any
1618 other accessor.
1619 '''
1621 for cache_ in (
1622 self._content_caches.keys()
1623 if cache_id is None
1624 else [cache_id]):
1626 self._content_caches[cache_].clear_accessor(accessor_id)
1628 def get_cache_stats(self, cache_id):
1629 return self._content_caches[cache_id].get_stats()
1631 def _check_duplicates(self, nuts):
1632 d = defaultdict(list)
1633 for nut in nuts:
1634 d[nut.codes].append(nut)
1636 for codes, group in d.items():
1637 if len(group) > 1:
1638 logger.warning(
1639 'Multiple entries matching codes: %s' % str(codes))
1641 @filldocs
1642 def get_stations(
1643 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
1644 model='squirrel'):
1646 '''
1647 Get stations matching given constraints.
1649 %(query_args)s
1651 :param model:
1652 Select object model for returned values: ``'squirrel'`` to get
1653 Squirrel station objects or ``'pyrocko'`` to get Pyrocko station
1654 objects with channel information attached.
1655 :type model:
1656 str
1658 :returns:
1659 List of :py:class:`pyrocko.squirrel.Station
1660 <pyrocko.squirrel.model.Station>` objects by default or list of
1661 :py:class:`pyrocko.model.Station <pyrocko.model.station.Station>`
1662 objects if ``model='pyrocko'`` is requested.
1664 See :py:meth:`iter_nuts` for details on time span matching.
1665 '''
1667 if model == 'pyrocko':
1668 return self._get_pyrocko_stations(obj, tmin, tmax, time, codes)
1669 elif model == 'squirrel':
1670 args = self._get_selection_args(
1671 STATION, obj, tmin, tmax, time, codes)
1673 nuts = sorted(
1674 self.iter_nuts('station', *args), key=lambda nut: nut.dkey)
1675 self._check_duplicates(nuts)
1676 return [self.get_content(nut) for nut in nuts]
1677 else:
1678 raise ValueError('Invalid station model: %s' % model)
1680 @filldocs
1681 def get_channels(
1682 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1684 '''
1685 Get channels matching given constraints.
1687 %(query_args)s
1689 :returns:
1690 List of :py:class:`~pyrocko.squirrel.model.Channel` objects.
1692 See :py:meth:`iter_nuts` for details on time span matching.
1693 '''
1695 args = self._get_selection_args(
1696 CHANNEL, obj, tmin, tmax, time, codes)
1698 nuts = sorted(
1699 self.iter_nuts('channel', *args), key=lambda nut: nut.dkey)
1700 self._check_duplicates(nuts)
1701 return [self.get_content(nut) for nut in nuts]
1703 @filldocs
1704 def get_sensors(
1705 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1707 '''
1708 Get sensors matching given constraints.
1710 %(query_args)s
1712 :returns:
1713 List of :py:class:`~pyrocko.squirrel.model.Sensor` objects.
1715 See :py:meth:`iter_nuts` for details on time span matching.
1716 '''
1718 tmin, tmax, codes = self._get_selection_args(
1719 CHANNEL, obj, tmin, tmax, time, codes)
1721 if codes is not None:
1722 if codes.channel != '*':
1723 codes = codes.replace(codes.channel[:-1] + '?')
1725 nuts = sorted(
1726 self.iter_nuts(
1727 'channel', tmin, tmax, codes), key=lambda nut: nut.dkey)
1728 self._check_duplicates(nuts)
1729 return model.Sensor.from_channels(
1730 self.get_content(nut) for nut in nuts)
1732 @filldocs
1733 def get_responses(
1734 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1736 '''
1737 Get instrument responses matching given constraints.
1739 %(query_args)s
1741 :returns:
1742 List of :py:class:`~pyrocko.squirrel.model.Response` objects.
1744 See :py:meth:`iter_nuts` for details on time span matching.
1745 '''
1747 args = self._get_selection_args(
1748 RESPONSE, obj, tmin, tmax, time, codes)
1750 nuts = sorted(
1751 self.iter_nuts('response', *args), key=lambda nut: nut.dkey)
1752 self._check_duplicates(nuts)
1753 return [self.get_content(nut) for nut in nuts]
1755 @filldocs
1756 def get_response(
1757 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1759 '''
1760 Get instrument response matching given constraints.
1762 %(query_args)s
1764 :returns:
1765 :py:class:`~pyrocko.squirrel.model.Response` object.
1767 Same as :py:meth:`get_responses` but returning exactly one response.
1768 Raises :py:exc:`~pyrocko.squirrel.error.NotAvailable` if zero or more
1769 than one is available.
1771 See :py:meth:`iter_nuts` for details on time span matching.
1772 '''
1774 responses = self.get_responses(obj, tmin, tmax, time, codes)
1775 if len(responses) == 0:
1776 raise error.NotAvailable(
1777 'No instrument response available (%s).'
1778 % self._get_selection_args_str(
1779 RESPONSE, obj, tmin, tmax, time, codes))
1781 elif len(responses) > 1:
1782 raise error.NotAvailable(
1783 'Multiple instrument responses matching given constraints '
1784 '(%s):\n%s' % (
1785 self._get_selection_args_str(
1786 RESPONSE, obj, tmin, tmax, time, codes),
1787 '\n'.join(
1788 ' ' + resp.summary for resp in responses)))
1790 return responses[0]
1792 @filldocs
1793 def get_events(
1794 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1796 '''
1797 Get events matching given constraints.
1799 %(query_args)s
1801 :returns:
1802 List of :py:class:`~pyrocko.model.event.Event` objects.
1804 See :py:meth:`iter_nuts` for details on time span matching.
1805 '''
1807 args = self._get_selection_args(EVENT, obj, tmin, tmax, time, codes)
1808 nuts = sorted(
1809 self.iter_nuts('event', *args), key=lambda nut: nut.dkey)
1810 self._check_duplicates(nuts)
1811 return [self.get_content(nut) for nut in nuts]
1813 def _redeem_promises(self, *args):
1815 tmin, tmax, _ = args
1817 waveforms = list(self.iter_nuts('waveform', *args))
1818 promises = list(self.iter_nuts('waveform_promise', *args))
1820 codes_to_avail = defaultdict(list)
1821 for nut in waveforms:
1822 codes_to_avail[nut.codes].append((nut.tmin, nut.tmax))
1824 def tts(x):
1825 if isinstance(x, tuple):
1826 return tuple(tts(e) for e in x)
1827 elif isinstance(x, list):
1828 return list(tts(e) for e in x)
1829 else:
1830 return util.time_to_str(x)
1832 orders = []
1833 for promise in promises:
1834 waveforms_avail = codes_to_avail[promise.codes]
1835 for block_tmin, block_tmax in blocks(
1836 max(tmin, promise.tmin),
1837 min(tmax, promise.tmax),
1838 promise.deltat):
1840 orders.append(
1841 WaveformOrder(
1842 source_id=promise.file_path,
1843 codes=promise.codes,
1844 tmin=block_tmin,
1845 tmax=block_tmax,
1846 deltat=promise.deltat,
1847 gaps=gaps(waveforms_avail, block_tmin, block_tmax)))
1849 orders_noop, orders = lpick(lambda order: order.gaps, orders)
1851 order_keys_noop = set(order_key(order) for order in orders_noop)
1852 if len(order_keys_noop) != 0 or len(orders_noop) != 0:
1853 logger.info(
1854 'Waveform orders already satisified with cached/local data: '
1855 '%i (%i)' % (len(order_keys_noop), len(orders_noop)))
1857 source_ids = []
1858 sources = {}
1859 for source in self._sources:
1860 if isinstance(source, fdsn.FDSNSource):
1861 source_ids.append(source._source_id)
1862 sources[source._source_id] = source
1864 source_priority = dict(
1865 (source_id, i) for (i, source_id) in enumerate(source_ids))
1867 order_groups = defaultdict(list)
1868 for order in orders:
1869 order_groups[order_key(order)].append(order)
1871 for k, order_group in order_groups.items():
1872 order_group.sort(
1873 key=lambda order: source_priority[order.source_id])
1875 n_order_groups = len(order_groups)
1877 if len(order_groups) != 0 or len(orders) != 0:
1878 logger.info(
1879 'Waveform orders standing for download: %i (%i)'
1880 % (len(order_groups), len(orders)))
1882 task = make_task('Waveform orders processed', n_order_groups)
1883 else:
1884 task = None
1886 def split_promise(order):
1887 self._split_nuts(
1888 'waveform_promise',
1889 order.tmin, order.tmax,
1890 codes=order.codes,
1891 path=order.source_id)
1893 def release_order_group(order):
1894 okey = order_key(order)
1895 for followup in order_groups[okey]:
1896 split_promise(followup)
1898 del order_groups[okey]
1900 if task:
1901 task.update(n_order_groups - len(order_groups))
1903 def noop(order):
1904 pass
1906 def success(order):
1907 release_order_group(order)
1908 split_promise(order)
1910 def batch_add(paths):
1911 self.add(paths)
1913 calls = queue.Queue()
1915 def enqueue(f):
1916 def wrapper(*args):
1917 calls.put((f, args))
1919 return wrapper
1921 for order in orders_noop:
1922 split_promise(order)
1924 while order_groups:
1926 orders_now = []
1927 empty = []
1928 for k, order_group in order_groups.items():
1929 try:
1930 orders_now.append(order_group.pop(0))
1931 except IndexError:
1932 empty.append(k)
1934 for k in empty:
1935 del order_groups[k]
1937 by_source_id = defaultdict(list)
1938 for order in orders_now:
1939 by_source_id[order.source_id].append(order)
1941 threads = []
1942 for source_id in by_source_id:
1943 def download():
1944 try:
1945 sources[source_id].download_waveforms(
1946 by_source_id[source_id],
1947 success=enqueue(success),
1948 error_permanent=enqueue(split_promise),
1949 error_temporary=noop,
1950 batch_add=enqueue(batch_add))
1952 finally:
1953 calls.put(None)
1955 thread = threading.Thread(target=download)
1956 thread.start()
1957 threads.append(thread)
1959 ndone = 0
1960 while ndone < len(threads):
1961 ret = calls.get()
1962 if ret is None:
1963 ndone += 1
1964 else:
1965 ret[0](*ret[1])
1967 for thread in threads:
1968 thread.join()
1970 if task:
1971 task.update(n_order_groups - len(order_groups))
1973 if task:
1974 task.done()
1976 @filldocs
1977 def get_waveform_nuts(
1978 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1980 '''
1981 Get waveform content entities matching given constraints.
1983 %(query_args)s
1985 Like :py:meth:`get_nuts` with ``kind='waveform'`` but additionally
1986 resolves matching waveform promises (downloads waveforms from remote
1987 sources).
1989 See :py:meth:`iter_nuts` for details on time span matching.
1990 '''
1992 args = self._get_selection_args(WAVEFORM, obj, tmin, tmax, time, codes)
1993 self._redeem_promises(*args)
1994 return sorted(
1995 self.iter_nuts('waveform', *args), key=lambda nut: nut.dkey)
1997 @filldocs
1998 def get_waveforms(
1999 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
2000 uncut=False, want_incomplete=True, degap=True, maxgap=5,
2001 maxlap=None, snap=None, include_last=False, load_data=True,
2002 accessor_id='default', operator_params=None):
2004 '''
2005 Get waveforms matching given constraints.
2007 %(query_args)s
2009 :param uncut:
2010 Set to ``True``, to disable cutting traces to [``tmin``, ``tmax``]
2011 and to disable degapping/deoverlapping. Returns untouched traces as
2012 they are read from file segment. File segments are always read in
2013 their entirety.
2014 :type uncut:
2015 bool
2017 :param want_incomplete:
2018 If ``True``, gappy/incomplete traces are included in the result.
2019 :type want_incomplete:
2020 bool
2022 :param degap:
2023 If ``True``, connect traces and remove gaps and overlaps.
2024 :type degap:
2025 bool
2027 :param maxgap:
2028 Maximum gap size in samples which is filled with interpolated
2029 samples when ``degap`` is ``True``.
2030 :type maxgap:
2031 int
2033 :param maxlap:
2034 Maximum overlap size in samples which is removed when ``degap`` is
2035 ``True``.
2036 :type maxlap:
2037 int
2039 :param snap:
2040 Rounding functions used when computing sample index from time
2041 instance, for trace start and trace end, respectively. By default,
2042 ``(round, round)`` is used.
2043 :type snap:
2044 tuple of 2 callables
2046 :param include_last:
2047 If ``True``, add one more sample to the returned traces (the sample
2048 which would be the first sample of a query with ``tmin`` set to the
2049 current value of ``tmax``).
2050 :type include_last:
2051 bool
2053 :param load_data:
2054 If ``True``, waveform data samples are read from files (or cache).
2055 If ``False``, meta-information-only traces are returned (dummy
2056 traces with no data samples).
2057 :type load_data:
2058 bool
2060 :param accessor_id:
2061 Name of consumer on who's behalf data is accessed. Used in cache
2062 management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key
2063 to distinguish different points of extraction for the decision of
2064 when to release cached waveform data. Should be used when data is
2065 alternately extracted from more than one region / selection.
2066 :type accessor_id:
2067 str
2069 See :py:meth:`iter_nuts` for details on time span matching.
2071 Loaded data is kept in memory (at least) until
2072 :py:meth:`clear_accessor` has been called or
2073 :py:meth:`advance_accessor` has been called two consecutive times
2074 without data being accessed between the two calls (by this accessor).
2075 Data may still be further kept in the memory cache if held alive by
2076 consumers with a different ``accessor_id``.
2077 '''
2079 tmin, tmax, codes = self._get_selection_args(
2080 WAVEFORM, obj, tmin, tmax, time, codes)
2082 self_tmin, self_tmax = self.get_time_span(
2083 ['waveform', 'waveform_promise'])
2085 if None in (self_tmin, self_tmax):
2086 logger.warning(
2087 'No waveforms available.')
2088 return []
2090 tmin = tmin if tmin is not None else self_tmin
2091 tmax = tmax if tmax is not None else self_tmax
2093 if codes is not None:
2094 operator = self.get_operator(codes)
2095 if operator is not None:
2096 return operator.get_waveforms(
2097 self, codes,
2098 tmin=tmin, tmax=tmax,
2099 uncut=uncut, want_incomplete=want_incomplete, degap=degap,
2100 maxgap=maxgap, maxlap=maxlap, snap=snap,
2101 include_last=include_last, load_data=load_data,
2102 accessor_id=accessor_id, params=operator_params)
2104 nuts = self.get_waveform_nuts(obj, tmin, tmax, time, codes)
2106 if load_data:
2107 traces = [
2108 self.get_content(nut, 'waveform', accessor_id) for nut in nuts]
2110 else:
2111 traces = [
2112 trace.Trace(**nut.trace_kwargs) for nut in nuts]
2114 if uncut:
2115 return traces
2117 if snap is None:
2118 snap = (round, round)
2120 chopped = []
2121 for tr in traces:
2122 if not load_data and tr.ydata is not None:
2123 tr = tr.copy(data=False)
2124 tr.ydata = None
2126 try:
2127 chopped.append(tr.chop(
2128 tmin, tmax,
2129 inplace=False,
2130 snap=snap,
2131 include_last=include_last))
2133 except trace.NoData:
2134 pass
2136 processed = self._process_chopped(
2137 chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax)
2139 return processed
2141 @filldocs
2142 def chopper_waveforms(
2143 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
2144 tinc=None, tpad=0.,
2145 want_incomplete=True, snap_window=False,
2146 degap=True, maxgap=5, maxlap=None,
2147 snap=None, include_last=False, load_data=True,
2148 accessor_id=None, clear_accessor=True, operator_params=None):
2150 '''
2151 Iterate window-wise over waveform archive.
2153 %(query_args)s
2155 :param tinc:
2156 Time increment (window shift time) (default uses ``tmax-tmin``).
2157 :type tinc:
2158 timestamp
2160 :param tpad:
2161 Padding time appended on either side of the data window (window
2162 overlap is ``2*tpad``).
2163 :type tpad:
2164 timestamp
2166 :param want_incomplete:
2167 If ``True``, gappy/incomplete traces are included in the result.
2168 :type want_incomplete:
2169 bool
2171 :param snap_window:
2172 If ``True``, start time windows at multiples of tinc with respect
2173 to system time zero.
2174 :type snap_window:
2175 bool
2177 :param degap:
2178 If ``True``, connect traces and remove gaps and overlaps.
2179 :type degap:
2180 bool
2182 :param maxgap:
2183 Maximum gap size in samples which is filled with interpolated
2184 samples when ``degap`` is ``True``.
2185 :type maxgap:
2186 int
2188 :param maxlap:
2189 Maximum overlap size in samples which is removed when ``degap`` is
2190 ``True``.
2191 :type maxlap:
2192 int
2194 :param snap:
2195 Rounding functions used when computing sample index from time
2196 instance, for trace start and trace end, respectively. By default,
2197 ``(round, round)`` is used.
2198 :type snap:
2199 tuple of 2 callables
2201 :param include_last:
2202 If ``True``, add one more sample to the returned traces (the sample
2203 which would be the first sample of a query with ``tmin`` set to the
2204 current value of ``tmax``).
2205 :type include_last:
2206 bool
2208 :param load_data:
2209 If ``True``, waveform data samples are read from files (or cache).
2210 If ``False``, meta-information-only traces are returned (dummy
2211 traces with no data samples).
2212 :type load_data:
2213 bool
2215 :param accessor_id:
2216 Name of consumer on who's behalf data is accessed. Used in cache
2217 management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key
2218 to distinguish different points of extraction for the decision of
2219 when to release cached waveform data. Should be used when data is
2220 alternately extracted from more than one region / selection.
2221 :type accessor_id:
2222 str
2224 :param clear_accessor:
2225 If ``True`` (default), :py:meth:`clear_accessor` is called when the
2226 chopper finishes. Set to ``False`` to keep loaded waveforms in
2227 memory when the generator returns.
2228 :type clear_accessor:
2229 bool
2231 :yields:
2232 A list of :py:class:`~pyrocko.trace.Trace` objects for every
2233 extracted time window.
2235 See :py:meth:`iter_nuts` for details on time span matching.
2236 '''
2238 tmin, tmax, codes = self._get_selection_args(
2239 WAVEFORM, obj, tmin, tmax, time, codes)
2241 self_tmin, self_tmax = self.get_time_span(
2242 ['waveform', 'waveform_promise'])
2244 if None in (self_tmin, self_tmax):
2245 logger.warning(
2246 'Content has undefined time span. No waveforms and no '
2247 'waveform promises?')
2248 return
2250 if snap_window and tinc is not None:
2251 tmin = tmin if tmin is not None else self_tmin
2252 tmax = tmax if tmax is not None else self_tmax
2253 tmin = math.floor(tmin / tinc) * tinc
2254 tmax = math.ceil(tmax / tinc) * tinc
2255 else:
2256 tmin = tmin if tmin is not None else self_tmin + tpad
2257 tmax = tmax if tmax is not None else self_tmax - tpad
2259 tinc = tinc if tinc is not None else tmax - tmin
2261 try:
2262 if accessor_id is None:
2263 accessor_id = 'chopper%i' % self._n_choppers_active
2265 self._n_choppers_active += 1
2267 eps = tinc * 1e-6
2268 if tinc != 0.0:
2269 nwin = int(((tmax - eps) - tmin) / tinc) + 1
2270 else:
2271 nwin = 1
2273 for iwin in range(nwin):
2274 wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)
2276 chopped = self.get_waveforms(
2277 tmin=wmin-tpad,
2278 tmax=wmax+tpad,
2279 codes=codes,
2280 snap=snap,
2281 include_last=include_last,
2282 load_data=load_data,
2283 want_incomplete=want_incomplete,
2284 degap=degap,
2285 maxgap=maxgap,
2286 maxlap=maxlap,
2287 accessor_id=accessor_id,
2288 operator_params=operator_params)
2290 self.advance_accessor(accessor_id)
2292 yield Batch(
2293 tmin=wmin,
2294 tmax=wmax,
2295 i=iwin,
2296 n=nwin,
2297 traces=chopped)
2299 iwin += 1
2301 finally:
2302 self._n_choppers_active -= 1
2303 if clear_accessor:
2304 self.clear_accessor(accessor_id, 'waveform')
2306 def _process_chopped(
2307 self, chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax):
2309 chopped.sort(key=lambda a: a.full_id)
2310 if degap:
2311 chopped = trace.degapper(chopped, maxgap=maxgap, maxlap=maxlap)
2313 if not want_incomplete:
2314 chopped_weeded = []
2315 for tr in chopped:
2316 emin = tr.tmin - tmin
2317 emax = tr.tmax + tr.deltat - tmax
2318 if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
2319 chopped_weeded.append(tr)
2321 elif degap:
2322 if (0. < emin <= 5. * tr.deltat
2323 and -5. * tr.deltat <= emax < 0.):
2325 tr.extend(tmin, tmax-tr.deltat, fillmethod='repeat')
2326 chopped_weeded.append(tr)
2328 chopped = chopped_weeded
2330 return chopped
2332 def _get_pyrocko_stations(
2333 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
2335 from pyrocko import model as pmodel
2337 by_nsl = defaultdict(lambda: (list(), list()))
2338 for station in self.get_stations(obj, tmin, tmax, time, codes):
2339 sargs = station._get_pyrocko_station_args()
2340 by_nsl[station.codes.nsl][0].append(sargs)
2342 for channel in self.get_channels(obj, tmin, tmax, time, codes):
2343 sargs = channel._get_pyrocko_station_args()
2344 sargs_list, channels_list = by_nsl[channel.codes.nsl]
2345 sargs_list.append(sargs)
2346 channels_list.append(channel)
2348 pstations = []
2349 nsls = list(by_nsl.keys())
2350 nsls.sort()
2351 for nsl in nsls:
2352 sargs_list, channels_list = by_nsl[nsl]
2353 sargs = util.consistency_merge(
2354 [('',) + x for x in sargs_list])
2356 by_c = defaultdict(list)
2357 for ch in channels_list:
2358 by_c[ch.codes.channel].append(ch._get_pyrocko_channel_args())
2360 chas = list(by_c.keys())
2361 chas.sort()
2362 pchannels = []
2363 for cha in chas:
2364 list_of_cargs = by_c[cha]
2365 cargs = util.consistency_merge(
2366 [('',) + x for x in list_of_cargs])
2367 pchannels.append(pmodel.Channel(*cargs))
2369 pstations.append(
2370 pmodel.Station(*sargs, channels=pchannels))
2372 return pstations
2374 @property
2375 def pile(self):
2377 '''
2378 Emulates the older :py:class:`pyrocko.pile.Pile` interface.
2380 This property exposes a :py:class:`pyrocko.squirrel.pile.Pile` object,
2381 which emulates most of the older :py:class:`pyrocko.pile.Pile` methods
2382 but uses the fluffy power of the Squirrel under the hood.
2384 This interface can be used as a drop-in replacement for piles which are
2385 used in existing scripts and programs for efficient waveform data
2386 access. The Squirrel-based pile scales better for large datasets. Newer
2387 scripts should use Squirrel's native methods to avoid the emulation
2388 overhead.
2389 '''
2390 from . import pile
2392 if self._pile is None:
2393 self._pile = pile.Pile(self)
2395 return self._pile
2397 def snuffle(self):
2398 '''
2399 Look at dataset in Snuffler.
2400 '''
2401 self.pile.snuffle()
2403 def _gather_codes_keys(self, kind, gather, selector):
2404 return set(
2405 gather(codes)
2406 for codes in self.iter_codes(kind)
2407 if selector is None or selector(codes))
2409 def __str__(self):
2410 return str(self.get_stats())
2412 def get_coverage(
2413 self, kind, tmin=None, tmax=None, codes_list=None, limit=None):
2415 '''
2416 Get coverage information.
2418 Get information about strips of gapless data coverage.
2420 :param kind:
2421 Content kind to be queried.
2422 :type kind:
2423 str
2425 :param tmin:
2426 Start time of query interval.
2427 :type tmin:
2428 timestamp
2430 :param tmax:
2431 End time of query interval.
2432 :type tmax:
2433 timestamp
2435 :param codes_list:
2436 If given, restrict query to given content codes patterns.
2437 :type codes_list:
2438 :py:class:`list` of :py:class:`Codes` objects appropriate for the
2439 queried content type, or anything which can be converted to
2440 such objects.
2442 :param limit:
2443 Limit query to return only up to a given maximum number of entries
2444 per matching time series (without setting this option, very gappy
2445 data could cause the query to execute for a very long time).
2446 :type limit:
2447 int
2449 :returns:
2450 Information about time spans covered by the requested time series
2451 data.
2452 :rtype:
2453 :py:class:`list` of :py:class:`Coverage` objects
2454 '''
2456 tmin_seconds, tmin_offset = model.tsplit(tmin)
2457 tmax_seconds, tmax_offset = model.tsplit(tmax)
2458 kind_id = to_kind_id(kind)
2460 codes_info = list(self._iter_codes_info(kind=kind))
2462 kdata_all = []
2463 if codes_list is None:
2464 for _, codes, deltat, kind_codes_id, _ in codes_info:
2465 kdata_all.append((codes, kind_codes_id, codes, deltat))
2467 else:
2468 for pattern in codes_list:
2469 pattern = to_codes(kind_id, pattern)
2470 for _, codes, deltat, kind_codes_id, _ in codes_info:
2471 if model.match_codes(pattern, codes):
2472 kdata_all.append(
2473 (pattern, kind_codes_id, codes, deltat))
2475 kind_codes_ids = [x[1] for x in kdata_all]
2477 counts_at_tmin = {}
2478 if tmin is not None:
2479 for nut in self.iter_nuts(
2480 kind, tmin, tmin, kind_codes_ids=kind_codes_ids):
2482 k = nut.codes, nut.deltat
2483 if k not in counts_at_tmin:
2484 counts_at_tmin[k] = 0
2486 counts_at_tmin[k] += 1
2488 coverages = []
2489 for pattern, kind_codes_id, codes, deltat in kdata_all:
2490 entry = [pattern, codes, deltat, None, None, []]
2491 for i, order in [(0, 'ASC'), (1, 'DESC')]:
2492 sql = self._sql('''
2493 SELECT
2494 time_seconds,
2495 time_offset
2496 FROM %(db)s.%(coverage)s
2497 WHERE
2498 kind_codes_id == ?
2499 ORDER BY
2500 kind_codes_id ''' + order + ''',
2501 time_seconds ''' + order + ''',
2502 time_offset ''' + order + '''
2503 LIMIT 1
2504 ''')
2506 for row in self._conn.execute(sql, [kind_codes_id]):
2507 entry[3+i] = model.tjoin(row[0], row[1])
2509 if None in entry[3:5]:
2510 continue
2512 args = [kind_codes_id]
2514 sql_time = ''
2515 if tmin is not None:
2516 # intentionally < because (== tmin) is queried from nuts
2517 sql_time += ' AND ( ? < time_seconds ' \
2518 'OR ( ? == time_seconds AND ? < time_offset ) ) '
2519 args.extend([tmin_seconds, tmin_seconds, tmin_offset])
2521 if tmax is not None:
2522 sql_time += ' AND ( time_seconds < ? ' \
2523 'OR ( ? == time_seconds AND time_offset <= ? ) ) '
2524 args.extend([tmax_seconds, tmax_seconds, tmax_offset])
2526 sql_limit = ''
2527 if limit is not None:
2528 sql_limit = ' LIMIT ?'
2529 args.append(limit)
2531 sql = self._sql('''
2532 SELECT
2533 time_seconds,
2534 time_offset,
2535 step
2536 FROM %(db)s.%(coverage)s
2537 WHERE
2538 kind_codes_id == ?
2539 ''' + sql_time + '''
2540 ORDER BY
2541 kind_codes_id,
2542 time_seconds,
2543 time_offset
2544 ''' + sql_limit)
2546 rows = list(self._conn.execute(sql, args))
2548 if limit is not None and len(rows) == limit:
2549 entry[-1] = None
2550 else:
2551 counts = counts_at_tmin.get((codes, deltat), 0)
2552 tlast = None
2553 if tmin is not None:
2554 entry[-1].append((tmin, counts))
2555 tlast = tmin
2557 for row in rows:
2558 t = model.tjoin(row[0], row[1])
2559 counts += row[2]
2560 entry[-1].append((t, counts))
2561 tlast = t
2563 if tmax is not None and (tlast is None or tlast != tmax):
2564 entry[-1].append((tmax, counts))
2566 coverages.append(model.Coverage.from_values(entry + [kind_id]))
2568 return coverages
2570 def add_operator(self, op):
2571 self._operators.append(op)
2573 def update_operator_mappings(self):
2574 available = self.get_codes(kind=('channel'))
2576 for operator in self._operators:
2577 operator.update_mappings(available, self._operator_registry)
2579 def iter_operator_mappings(self):
2580 for operator in self._operators:
2581 for in_codes, out_codes in operator.iter_mappings():
2582 yield operator, in_codes, out_codes
2584 def get_operator_mappings(self):
2585 return list(self.iter_operator_mappings())
2587 def get_operator(self, codes):
2588 try:
2589 return self._operator_registry[codes][0]
2590 except KeyError:
2591 return None
2593 def get_operator_group(self, codes):
2594 try:
2595 return self._operator_registry[codes]
2596 except KeyError:
2597 return None, (None, None, None)
2599 def iter_operator_codes(self):
2600 for _, _, out_codes in self.iter_operator_mappings():
2601 for codes in out_codes:
2602 yield codes
2604 def get_operator_codes(self):
2605 return list(self.iter_operator_codes())
2607 def print_tables(self, table_names=None, stream=None):
2608 '''
2609 Dump raw database tables in textual form (for debugging purposes).
2611 :param table_names:
2612 Names of tables to be dumped or ``None`` to dump all.
2613 :type table_names:
2614 :py:class:`list` of :py:class:`str`
2616 :param stream:
2617 Open file or ``None`` to dump to standard output.
2618 '''
2620 if stream is None:
2621 stream = sys.stdout
2623 if isinstance(table_names, str):
2624 table_names = [table_names]
2626 if table_names is None:
2627 table_names = [
2628 'selection_file_states',
2629 'selection_nuts',
2630 'selection_kind_codes_count',
2631 'files', 'nuts', 'kind_codes', 'kind_codes_count']
2633 m = {
2634 'selection_file_states': '%(db)s.%(file_states)s',
2635 'selection_nuts': '%(db)s.%(nuts)s',
2636 'selection_kind_codes_count': '%(db)s.%(kind_codes_count)s',
2637 'files': 'files',
2638 'nuts': 'nuts',
2639 'kind_codes': 'kind_codes',
2640 'kind_codes_count': 'kind_codes_count'}
2642 for table_name in table_names:
2643 self._database.print_table(
2644 m[table_name] % self._names, stream=stream)
2647class SquirrelStats(Object):
2648 '''
2649 Container to hold statistics about contents available from a Squirrel.
2651 See also :py:meth:`Squirrel.get_stats`.
2652 '''
2654 nfiles = Int.T(
2655 help='Number of files in selection.')
2656 nnuts = Int.T(
2657 help='Number of index nuts in selection.')
2658 codes = List.T(
2659 Tuple.T(content_t=String.T()),
2660 help='Available code sequences in selection, e.g. '
2661 '(agency, network, station, location) for stations nuts.')
2662 kinds = List.T(
2663 String.T(),
2664 help='Available content types in selection.')
2665 total_size = Int.T(
2666 help='Aggregated file size of files is selection.')
2667 counts = Dict.T(
2668 String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()),
2669 help='Breakdown of how many nuts of any content type and code '
2670 'sequence are available in selection, ``counts[kind][codes]``.')
2671 time_spans = Dict.T(
2672 String.T(), Tuple.T(content_t=Timestamp.T()),
2673 help='Time spans by content type.')
2674 sources = List.T(
2675 String.T(),
2676 help='Descriptions of attached sources.')
2677 operators = List.T(
2678 String.T(),
2679 help='Descriptions of attached operators.')
2681 def __str__(self):
2682 kind_counts = dict(
2683 (kind, sum(self.counts[kind].values())) for kind in self.kinds)
2685 scodes = model.codes_to_str_abbreviated(self.codes)
2687 ssources = '<none>' if not self.sources else '\n' + '\n'.join(
2688 ' ' + s for s in self.sources)
2690 soperators = '<none>' if not self.operators else '\n' + '\n'.join(
2691 ' ' + s for s in self.operators)
2693 def stime(t):
2694 return util.tts(t) if t is not None and t not in (
2695 model.g_tmin, model.g_tmax) else '<none>'
2697 def stable(rows):
2698 ns = [max(len(w) for w in col) for col in zip(*rows)]
2699 return '\n'.join(
2700 ' '.join(w.ljust(n) for n, w in zip(ns, row))
2701 for row in rows)
2703 def indent(s):
2704 return '\n'.join(' '+line for line in s.splitlines())
2706 stspans = '<none>' if not self.kinds else '\n' + indent(stable([(
2707 kind + ':',
2708 str(kind_counts[kind]),
2709 stime(self.time_spans[kind][0]),
2710 '-',
2711 stime(self.time_spans[kind][1])) for kind in sorted(self.kinds)]))
2713 s = '''
2714Number of files: %i
2715Total size of known files: %s
2716Number of index nuts: %i
2717Available content kinds: %s
2718Available codes: %s
2719Sources: %s
2720Operators: %s''' % (
2721 self.nfiles,
2722 util.human_bytesize(self.total_size),
2723 self.nnuts,
2724 stspans, scodes, ssources, soperators)
2726 return s.lstrip()
2729__all__ = [
2730 'Squirrel',
2731 'SquirrelStats',
2732]