1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import sys
9import os
11import math
12import logging
13import threading
14import queue
15from collections import defaultdict
17from pyrocko.guts import Object, Int, List, Tuple, String, Timestamp, Dict
18from pyrocko import util, trace
19from pyrocko.progress import progress
21from . import model, io, cache, dataset
23from .model import to_kind_id, WaveformOrder, to_kind, to_codes, \
24 STATION, CHANNEL, RESPONSE, EVENT, WAVEFORM, codes_patterns_list, \
25 codes_patterns_for_kind
26from .client import fdsn, catalog
27from .selection import Selection, filldocs
28from .database import abspath
29from .operators.base import Operator, CodesPatternFiltering
30from . import client, environment, error
32logger = logging.getLogger('psq.base')
34guts_prefix = 'squirrel'
def make_task(*args):
    '''
    Create a :py:mod:`pyrocko.progress` task bound to this module's logger.
    '''
    kwargs = {'logger': logger}
    return progress.task(*args, **kwargs)
def lpick(condition, seq):
    '''
    Partition *seq* by *condition*.

    Returns a tuple ``(rejected, accepted)`` where ``rejected`` holds the
    elements for which *condition* is falsy and ``accepted`` those for which
    it is truthy. Element order within each list follows *seq*.
    '''
    rejected = []
    accepted = []
    for element in seq:
        if condition(element):
            accepted.append(element)
        else:
            rejected.append(element)

    return rejected, accepted
def len_plural(obj):
    '''
    Return ``(len(obj), suffix)`` where suffix is ``'s'`` unless the length
    is exactly one — convenient for ``'%i thing%s' % len_plural(xs)``.
    '''
    n = len(obj)
    suffix = '' if n == 1 else 's'
    return n, suffix
def blocks(tmin, tmax, deltat, nsamples_block=100000):
    '''
    Yield consecutive ``(tmin, tmax)`` block boundaries of fixed duration
    ``deltat * nsamples_block`` covering the span ``[tmin, tmax]``. Block
    edges are aligned to integer multiples of the block duration.
    '''
    tblock = util.to_time_float(deltat * nsamples_block)
    ifirst = int(math.floor(tmin / tblock))
    ilast = int(math.ceil(tmax / tblock))
    for i in range(ifirst, ilast):
        yield i * tblock, (i + 1) * tblock
def gaps(avail, tmin, tmax):
    '''
    Compute the gaps in ``[tmin, tmax)`` not covered by the intervals in
    *avail* (list of ``(tmin_a, tmax_a)`` pairs), using a sweep over
    interval start/end events. Zero-length gaps are suppressed.
    '''
    assert tmin < tmax

    # Event list: +1 marks an interval start, -1 an interval end. The query
    # span is inserted inverted so that uncovered regions show up as level-0
    # stretches during the sweep.
    events = [(tmax, 1), (tmin, -1)]
    for (tmin_a, tmax_a) in avail:
        assert tmin_a < tmax_a
        events.append((tmin_a, 1))
        events.append((tmax_a, -1))

    events.sort()

    level = 1
    found = []
    gap_start = None
    for t, step in events:
        if level == 1 and step == -1:
            gap_start = t
        elif level == 0 and step == 1 and gap_start is not None:
            if gap_start != t:
                found.append((gap_start, t))

        level += step

    return found
def order_key(order):
    '''
    Sort/grouping key for orders: ``(codes, tmin, tmax)``.
    '''
    return order.codes, order.tmin, order.tmax
91def _is_exact(pat):
92 return not ('*' in pat or '?' in pat or ']' in pat or '[' in pat)
def prefix_tree(tups):
    '''
    Convert a list of equal-length tuples into a sorted, nested
    ``(key, children)`` prefix tree. Leaf nodes have an empty children list.
    '''
    if not tups:
        return []

    # Base case: single-component tuples become sorted leaves.
    if len(tups[0]) == 1:
        return sorted((tup[0], []) for tup in tups)

    # Group by first component and recurse on the remainders.
    groups = defaultdict(list)
    for tup in tups:
        groups[tup[0]].append(tup[1:])

    return [(key, prefix_tree(groups[key])) for key in sorted(groups)]
def match_time_span(tmin, tmax, obj):
    '''
    True if the query span ``[tmin, tmax]`` intersects *obj*'s time span.
    ``None`` on either side means unbounded. Note the asymmetric edge
    treatment: ``obj.tmin <= tmax`` but ``tmin < obj.tmax``.
    '''
    if obj.tmin is not None and tmax is not None and not (obj.tmin <= tmax):
        return False

    if tmin is not None and obj.tmax is not None and not (tmin < obj.tmax):
        return False

    return True
class Batch(object):
    '''
    Batch of waveforms from window-wise data extraction.

    Encapsulates state and results yielded for each window in window-wise
    waveform extraction with the :py:meth:`Squirrel.chopper_waveforms` method.

    *Attributes:*

    .. py:attribute:: tmin

        Start of this time window.

    .. py:attribute:: tmax

        End of this time window.

    .. py:attribute:: i

        Index of this time window in sequence.

    .. py:attribute:: n

        Total number of time windows in sequence.

    .. py:attribute:: igroup

        Index of this time window's sequence group.

    .. py:attribute:: ngroups

        Total number of sequence groups.

    .. py:attribute:: traces

        Extracted waveforms for this time window.
    '''

    def __init__(self, tmin, tmax, i, n, igroup, ngroups, traces):
        # Plain value container — arguments are stored as-is.
        self.tmin, self.tmax = tmin, tmax
        self.i, self.n = i, n
        self.igroup, self.ngroups = igroup, ngroups
        self.traces = traces
166class Squirrel(Selection):
167 '''
168 Prompt, lazy, indexing, caching, dynamic seismological dataset access.
170 :param env:
171 Squirrel environment instance or directory path to use as starting
172 point for its detection. By default, the current directory is used as
173 starting point. When searching for a usable environment the directory
174 ``'.squirrel'`` or ``'squirrel'`` in the current (or starting point)
175 directory is used if it exists, otherwise the parent directories are
        searched upwards for the existence of such a directory. If no such
177 directory is found, the user's global Squirrel environment
178 ``'$HOME/.pyrocko/squirrel'`` is used.
179 :type env:
180 :py:class:`~pyrocko.squirrel.environment.Environment` or
181 :py:class:`str`
183 :param database:
184 Database instance or path to database. By default the
185 database found in the detected Squirrel environment is used.
186 :type database:
187 :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`
189 :param cache_path:
190 Directory path to use for data caching. By default, the ``'cache'``
191 directory in the detected Squirrel environment is used.
192 :type cache_path:
193 :py:class:`str`
195 :param persistent:
196 If given a name, create a persistent selection.
197 :type persistent:
198 :py:class:`str`
200 This is the central class of the Squirrel framework. It provides a unified
201 interface to query and access seismic waveforms, station meta-data and
202 event information from local file collections and remote data sources. For
203 prompt responses, a profound database setup is used under the hood. To
204 speed up assemblage of ad-hoc data selections, files are indexed on first
205 use and the extracted meta-data is remembered in the database for
206 subsequent accesses. Bulk data is lazily loaded from disk and remote
207 sources, just when requested. Once loaded, data is cached in memory to
208 expedite typical access patterns. Files and data sources can be dynamically
209 added to and removed from the Squirrel selection at runtime.
211 Queries are restricted to the contents of the files currently added to the
212 Squirrel selection (usually a subset of the file meta-information
213 collection in the database). This list of files is referred to here as the
214 "selection". By default, temporary tables are created in the attached
215 database to hold the names of the files in the selection as well as various
216 indices and counters. These tables are only visible inside the application
217 which created them and are deleted when the database connection is closed
218 or the application exits. To create a selection which is not deleted at
219 exit, supply a name to the ``persistent`` argument of the Squirrel
220 constructor. Persistent selections are shared among applications using the
221 same database.
223 **Method summary**
225 Some of the methods are implemented in :py:class:`Squirrel`'s base class
226 :py:class:`~pyrocko.squirrel.selection.Selection`.
228 .. autosummary::
230 ~Squirrel.add
231 ~Squirrel.add_source
232 ~Squirrel.add_fdsn
233 ~Squirrel.add_catalog
234 ~Squirrel.add_dataset
235 ~Squirrel.add_virtual
236 ~Squirrel.update
237 ~Squirrel.update_waveform_promises
238 ~Squirrel.advance_accessor
239 ~Squirrel.clear_accessor
240 ~Squirrel.reload
241 ~pyrocko.squirrel.selection.Selection.iter_paths
242 ~Squirrel.iter_nuts
243 ~Squirrel.iter_kinds
244 ~Squirrel.iter_deltats
245 ~Squirrel.iter_codes
246 ~pyrocko.squirrel.selection.Selection.get_paths
247 ~Squirrel.get_nuts
248 ~Squirrel.get_kinds
249 ~Squirrel.get_deltats
250 ~Squirrel.get_codes
251 ~Squirrel.get_counts
252 ~Squirrel.get_time_span
253 ~Squirrel.get_deltat_span
254 ~Squirrel.get_nfiles
255 ~Squirrel.get_nnuts
256 ~Squirrel.get_total_size
257 ~Squirrel.get_stats
258 ~Squirrel.get_content
259 ~Squirrel.get_stations
260 ~Squirrel.get_channels
261 ~Squirrel.get_responses
262 ~Squirrel.get_events
263 ~Squirrel.get_waveform_nuts
264 ~Squirrel.get_waveforms
265 ~Squirrel.chopper_waveforms
266 ~Squirrel.get_coverage
267 ~Squirrel.pile
268 ~Squirrel.snuffle
269 ~Squirrel.glob_codes
270 ~pyrocko.squirrel.selection.Selection.get_database
271 ~Squirrel.print_tables
272 '''
    def __init__(
            self, env=None, database=None, cache_path=None, persistent=None):

        # Resolve the environment first; database, cache path and persistent
        # selection name default to the environment's settings.
        if not isinstance(env, environment.Environment):
            env = environment.get_environment(env)

        if database is None:
            database = env.expand_path(env.database_path)

        if cache_path is None:
            cache_path = env.expand_path(env.cache_path)

        if persistent is None:
            persistent = env.persistent

        Selection.__init__(
            self, database=database, persistent=persistent)

        # Paths stored in the database are interpreted relative to the
        # directory containing the environment.
        self.get_database().set_basepath(os.path.dirname(env.get_basepath()))

        # Separate in-memory caches: waveforms tend to be large and churn
        # quickly, other content (stations, events, ...) is longer-lived.
        self._content_caches = {
            'waveform': cache.ContentCache(),
            'default': cache.ContentCache()}

        self._cache_path = cache_path

        self._sources = []
        self._operators = []
        self._operator_registry = {}

        self._pending_orders = []

        self._pile = None
        self._n_choppers_active = 0

        # Per-selection table names, derived from the selection's unique name.
        self._names.update({
            'nuts': self.name + '_nuts',
            'kind_codes_count': self.name + '_kind_codes_count',
            'coverage': self.name + '_coverage'})

        with self.transaction('create tables') as cursor:
            self._create_tables_squirrel(cursor)
    def _create_tables_squirrel(self, cursor):
        '''
        Create this selection's tables, indices and triggers in the database.

        Sets up the ``nuts`` table (one row per indexed content piece), the
        ``kind_codes_count`` table (usage counts per kind/codes combination)
        and the ``coverage`` table (time coverage change-points), all kept
        consistent automatically via SQL triggers.
        '''

        # Main per-selection index: one row ("nut") per content piece.
        cursor.execute(self._register_table(self._sql(
            '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(nuts)s (
                    nut_id integer PRIMARY KEY,
                    file_id integer,
                    file_segment integer,
                    file_element integer,
                    kind_id integer,
                    kind_codes_id integer,
                    tmin_seconds integer,
                    tmin_offset integer,
                    tmax_seconds integer,
                    tmax_offset integer,
                    kscale integer)
            ''')))

        # Reference counts of kind/codes combinations in this selection.
        cursor.execute(self._register_table(self._sql(
            '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(kind_codes_count)s (
                    kind_codes_id integer PRIMARY KEY,
                    count integer)
            ''')))

        cursor.execute(self._sql(
            '''
                CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(nuts)s_file_element
                    ON %(nuts)s (file_id, file_segment, file_element)
            '''))

        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_file_id
                    ON %(nuts)s (file_id)
            '''))

        # Time-range indices; kscale index supports the scale-binned time
        # lookup in _timerange_sql().
        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmin_seconds
                    ON %(nuts)s (kind_id, tmin_seconds)
            '''))

        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmax_seconds
                    ON %(nuts)s (kind_id, tmax_seconds)
            '''))

        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_kscale
                    ON %(nuts)s (kind_id, kscale, tmin_seconds)
            '''))

        # Cascade: removing a file removes its nuts.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts
                BEFORE DELETE ON main.files FOR EACH ROW
                BEGIN
                  DELETE FROM %(nuts)s WHERE file_id == old.file_id;
                END
            '''))

        # trigger only on size to make silent update of mtime possible
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts2
                BEFORE UPDATE OF size ON main.files FOR EACH ROW
                BEGIN
                  DELETE FROM %(nuts)s WHERE file_id == old.file_id;
                END
            '''))

        # Removing a file from the selection removes its nuts here too.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS
                    %(db)s.%(file_states)s_delete_files
                BEFORE DELETE ON %(db)s.%(file_states)s FOR EACH ROW
                BEGIN
                    DELETE FROM %(nuts)s WHERE file_id == old.file_id;
                END
            '''))

        # Keep kind_codes_count in sync with nut inserts/deletes.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_inc_kind_codes
                BEFORE INSERT ON %(nuts)s FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO %(kind_codes_count)s VALUES
                    (new.kind_codes_id, 0);
                    UPDATE %(kind_codes_count)s
                    SET count = count + 1
                    WHERE new.kind_codes_id
                        == %(kind_codes_count)s.kind_codes_id;
                END
            '''))

        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_dec_kind_codes
                BEFORE DELETE ON %(nuts)s FOR EACH ROW
                BEGIN
                    UPDATE %(kind_codes_count)s
                    SET count = count - 1
                    WHERE old.kind_codes_id
                        == %(kind_codes_count)s.kind_codes_id;
                END
            '''))

        # Coverage change-points: step is the net number of content spans
        # starting (+) or ending (-) at a given instant.
        cursor.execute(self._register_table(self._sql(
            '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(coverage)s (
                    kind_codes_id integer,
                    time_seconds integer,
                    time_offset integer,
                    step integer)
            ''')))

        cursor.execute(self._sql(
            '''
                CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(coverage)s_time
                    ON %(coverage)s (kind_codes_id, time_seconds, time_offset)
            '''))

        # On nut insert: bump step at tmin, drop it at tmax, then prune
        # change-points that cancelled out to zero.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_add_coverage
                AFTER INSERT ON %(nuts)s FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (new.kind_codes_id, new.tmin_seconds, new.tmin_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step + 1
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmin_seconds == %(coverage)s.time_seconds
                        AND new.tmin_offset == %(coverage)s.time_offset
                    ;
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (new.kind_codes_id, new.tmax_seconds, new.tmax_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step - 1
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmax_seconds == %(coverage)s.time_seconds
                        AND new.tmax_offset == %(coverage)s.time_offset
                    ;
                    DELETE FROM %(coverage)s
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmin_seconds == %(coverage)s.time_seconds
                        AND new.tmin_offset == %(coverage)s.time_offset
                        AND step == 0
                    ;
                    DELETE FROM %(coverage)s
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmax_seconds == %(coverage)s.time_seconds
                        AND new.tmax_offset == %(coverage)s.time_offset
                        AND step == 0
                    ;
                END
            '''))

        # Mirror image of the add_coverage trigger for nut deletion.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_remove_coverage
                BEFORE DELETE ON %(nuts)s FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (old.kind_codes_id, old.tmin_seconds, old.tmin_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step - 1
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmin_seconds == %(coverage)s.time_seconds
                        AND old.tmin_offset == %(coverage)s.time_offset
                    ;
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (old.kind_codes_id, old.tmax_seconds, old.tmax_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step + 1
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmax_seconds == %(coverage)s.time_seconds
                        AND old.tmax_offset == %(coverage)s.time_offset
                    ;
                    DELETE FROM %(coverage)s
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmin_seconds == %(coverage)s.time_seconds
                        AND old.tmin_offset == %(coverage)s.time_offset
                        AND step == 0
                    ;
                    DELETE FROM %(coverage)s
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmax_seconds == %(coverage)s.time_seconds
                        AND old.tmax_offset == %(coverage)s.time_offset
                        AND step == 0
                    ;
                END
            '''))
    def _delete(self):
        '''Delete database tables associated with this Squirrel.'''

        with self.transaction('delete tables') as cursor:
            # Triggers must be dropped before the tables they reference.
            # IF EXISTS is used for the coverage objects because they were
            # added in a later schema version and may be absent.
            for s in '''
                    DROP TRIGGER %(db)s.%(nuts)s_delete_nuts;
                    DROP TRIGGER %(db)s.%(nuts)s_delete_nuts2;
                    DROP TRIGGER %(db)s.%(file_states)s_delete_files;
                    DROP TRIGGER %(db)s.%(nuts)s_inc_kind_codes;
                    DROP TRIGGER %(db)s.%(nuts)s_dec_kind_codes;
                    DROP TABLE %(db)s.%(nuts)s;
                    DROP TABLE %(db)s.%(kind_codes_count)s;
                    DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_add_coverage;
                    DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_remove_coverage;
                    DROP TABLE IF EXISTS %(db)s.%(coverage)s;
                '''.strip().splitlines():

                cursor.execute(self._sql(s))

        Selection._delete(self)
    @filldocs
    def add(self,
            paths,
            kinds=None,
            format='detect',
            include=None,
            exclude=None,
            check=True):

        '''
        Add files to the selection.

        :param paths:
            Iterator yielding paths to files or directories to be added to the
            selection. Recurses into directories. If given a ``str``, it
            is treated as a single path to be added.
        :type paths:
            :py:class:`list` of :py:class:`str`

        :param kinds:
            Content types to be made available through the Squirrel selection.
            By default, all known content types are accepted.
        :type kinds:
            :py:class:`list` of :py:class:`str`

        :param format:
            File format identifier or ``'detect'`` to enable auto-detection
            (available: %(file_formats)s).
        :type format:
            str

        :param include:
            If not ``None``, files are only included if their paths match the
            given regular expression pattern.
        :type include:
            str

        :param exclude:
            If not ``None``, files are only included if their paths do not
            match the given regular expression pattern.
        :type exclude:
            str

        :param check:
            If ``True``, all file modification times are checked to see if
            cached information has to be updated (slow). If ``False``, only
            previously unknown files are indexed and cached information is used
            for known files, regardless of file state (fast, corresponds to
            Squirrel's ``--optimistic`` mode). File deletions will go
            undetected in the latter case.
        :type check:
            bool

        :Complexity:
            O(log N)
        '''

        if isinstance(kinds, str):
            kinds = (kinds,)

        if isinstance(paths, str):
            paths = [paths]

        kind_mask = model.to_kind_mask(kinds)

        with progress.view():
            # Virtual paths bypass the filesystem scan via pass_through.
            Selection.add(
                self, util.iter_select_files(
                    paths,
                    show_progress=False,
                    include=include,
                    exclude=exclude,
                    pass_through=lambda path: path.startswith('virtual:')
                ), kind_mask, format)

        # Index any new/changed files, then refresh the selection's nuts.
        self._load(check)
        self._update_nuts()
    def reload(self):
        '''
        Check for modifications and reindex modified files.

        Based on file modification times.
        '''

        # Mark all file states so the next load re-checks every file, then
        # reindex and rebuild the selection's nut tables.
        self._set_file_states_force_check()
        self._load(check=True)
        self._update_nuts()
628 def add_virtual(self, nuts, virtual_paths=None):
629 '''
630 Add content which is not backed by files.
632 :param nuts:
633 Content pieces to be added.
634 :type nuts:
635 iterator yielding :py:class:`~pyrocko.squirrel.model.Nut` objects
637 :param virtual_paths:
638 List of virtual paths to prevent creating a temporary list of the
639 nuts while aggregating the file paths for the selection.
640 :type virtual_paths:
641 :py:class:`list` of :py:class:`str`
643 Stores to the main database and the selection.
644 '''
646 if isinstance(virtual_paths, str):
647 virtual_paths = [virtual_paths]
649 if virtual_paths is None:
650 if not isinstance(nuts, list):
651 nuts = list(nuts)
652 virtual_paths = set(nut.file_path for nut in nuts)
654 Selection.add(self, virtual_paths)
655 self.get_database().dig(nuts)
656 self._update_nuts()
658 def add_volatile(self, nuts):
659 if not isinstance(nuts, list):
660 nuts = list(nuts)
662 paths = list(set(nut.file_path for nut in nuts))
663 io.backends.virtual.add_nuts(nuts)
664 self.add_virtual(nuts, paths)
665 self._volatile_paths.extend(paths)
667 def add_volatile_waveforms(self, traces):
668 '''
669 Add in-memory waveforms which will be removed when the app closes.
670 '''
672 name = model.random_name()
674 path = 'virtual:volatile:%s' % name
676 nuts = []
677 for itr, tr in enumerate(traces):
678 assert tr.tmin <= tr.tmax
679 tmin_seconds, tmin_offset = model.tsplit(tr.tmin)
680 tmax_seconds, tmax_offset = model.tsplit(
681 tr.tmin + tr.data_len()*tr.deltat)
683 nuts.append(model.Nut(
684 file_path=path,
685 file_format='virtual',
686 file_segment=itr,
687 file_element=0,
688 file_mtime=0,
689 codes=tr.codes,
690 tmin_seconds=tmin_seconds,
691 tmin_offset=tmin_offset,
692 tmax_seconds=tmax_seconds,
693 tmax_offset=tmax_offset,
694 deltat=tr.deltat,
695 kind_id=to_kind_id('waveform'),
696 content=tr))
698 self.add_volatile(nuts)
699 return path
    def _load(self, check):
        # Drain the iload iterator to index all selected files. With
        # content=[] no actual payload is loaded — only metadata is
        # extracted into the database. skip_unchanged avoids re-reading
        # files already indexed; check controls mtime verification.
        for _ in io.iload(
                self,
                content=[],
                skip_unchanged=True,
                check=check):
            pass
    def _update_nuts(self, transaction=None):
        '''
        Copy nuts of newly added/changed files from the global ``nuts`` table
        into this selection's nut table, respecting each file's kind mask.
        '''
        transaction = transaction or self.transaction('update nuts')
        with make_task('Aggregating selection') as task, \
                transaction as cursor:
            # Report SQLite progress to the task while the bulk insert runs.
            self._conn.set_progress_handler(task.update, 100000)
            nrows = cursor.execute(self._sql(
                '''
                    INSERT INTO %(db)s.%(nuts)s
                    SELECT NULL,
                        nuts.file_id, nuts.file_segment, nuts.file_element,
                        nuts.kind_id, nuts.kind_codes_id,
                        nuts.tmin_seconds, nuts.tmin_offset,
                        nuts.tmax_seconds, nuts.tmax_offset,
                        nuts.kscale
                    FROM %(db)s.%(file_states)s
                    INNER JOIN nuts
                        ON %(db)s.%(file_states)s.file_id == nuts.file_id
                    INNER JOIN kind_codes
                        ON nuts.kind_codes_id ==
                           kind_codes.kind_codes_id
                    WHERE %(db)s.%(file_states)s.file_state != 2
                        AND (((1 << kind_codes.kind_id)
                              & %(db)s.%(file_states)s.kind_mask) != 0)
                ''')).rowcount

            task.update(nrows)
            self._set_file_states_known(transaction)
            self._conn.set_progress_handler(None, 0)
    def add_source(self, source, check=True):
        '''
        Add remote resource.

        :param source:
            Remote data access client instance.
        :type source:
            subclass of :py:class:`~pyrocko.squirrel.client.base.Source`

        :param check:
            Passed through to ``source.setup``; controls whether cached
            information is verified against file modification times.
        :type check:
            bool
        '''

        self._sources.append(source)
        source.setup(self, check=check)
752 def add_fdsn(self, *args, **kwargs):
753 '''
754 Add FDSN site for transparent remote data access.
756 Arguments are passed to
757 :py:class:`~pyrocko.squirrel.client.fdsn.FDSNSource`.
758 '''
760 self.add_source(fdsn.FDSNSource(*args, **kwargs))
762 def add_catalog(self, *args, **kwargs):
763 '''
764 Add online catalog for transparent event data access.
766 Arguments are passed to
767 :py:class:`~pyrocko.squirrel.client.catalog.CatalogSource`.
768 '''
770 self.add_source(catalog.CatalogSource(*args, **kwargs))
772 def add_dataset(self, ds, check=True):
773 '''
774 Read dataset description from file and add its contents.
776 :param ds:
777 Path to dataset description file or dataset description object
778 . See :py:mod:`~pyrocko.squirrel.dataset`.
779 :type ds:
780 :py:class:`str` or :py:class:`~pyrocko.squirrel.dataset.Dataset`
782 :param check:
783 If ``True``, all file modification times are checked to see if
784 cached information has to be updated (slow). If ``False``, only
785 previously unknown files are indexed and cached information is used
786 for known files, regardless of file state (fast, corrresponds to
787 Squirrel's ``--optimistic`` mode). File deletions will go
788 undetected in the latter case.
789 :type check:
790 bool
791 '''
792 if isinstance(ds, str):
793 ds = dataset.read_dataset(ds)
795 ds.setup(self, check=check)
797 def _get_selection_args(
798 self, kind_id,
799 obj=None, tmin=None, tmax=None, time=None, codes=None):
801 if codes is not None:
802 codes = codes_patterns_for_kind(kind_id, codes)
804 if time is not None:
805 tmin = time
806 tmax = time
808 if obj is not None:
809 tmin = tmin if tmin is not None else obj.tmin
810 tmax = tmax if tmax is not None else obj.tmax
811 codes = codes if codes is not None else codes_patterns_for_kind(
812 kind_id, obj.codes)
814 return tmin, tmax, codes
816 def _get_selection_args_str(self, *args, **kwargs):
818 tmin, tmax, codes = self._get_selection_args(*args, **kwargs)
819 return 'tmin: %s, tmax: %s, codes: %s' % (
820 util.time_to_str(tmin) if tmin is not None else 'none',
821 util.time_to_str(tmax) if tmax is not None else 'none',
822 ','.join(str(entry) for entry in codes))
824 def _selection_args_to_kwargs(
825 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
827 return dict(obj=obj, tmin=tmin, tmax=tmax, time=time, codes=codes)
    def _timerange_sql(self, tmin, tmax, kind, cond, args, naiv):
        '''
        Append SQL conditions and bind arguments for a time-range query on
        the nuts table to *cond* and *args* (both modified in place).

        With ``naiv=True`` only a simple ``tmin_seconds <= tmax`` filter is
        used; otherwise one OR-branch per kscale bin limits the scanned
        tmin range, exploiting the (kind_id, kscale, tmin_seconds) index.
        '''

        tmin_seconds, tmin_offset = model.tsplit(tmin)
        tmax_seconds, tmax_offset = model.tsplit(tmax)
        if naiv:
            cond.append('%(db)s.%(nuts)s.tmin_seconds <= ?')
            args.append(tmax_seconds)
        else:
            tscale_edges = model.tscale_edges
            tmin_cond = []
            for kscale in range(tscale_edges.size + 1):
                if kscale != tscale_edges.size:
                    # Bounded scale bin: content of this kscale has duration
                    # at most tscale, so its tmin lies within
                    # [tmin - tscale - 1, tmax + 1].
                    tscale = int(tscale_edges[kscale])
                    tmin_cond.append('''
                        (%(db)s.%(nuts)s.kind_id = ?
                         AND %(db)s.%(nuts)s.kscale == ?
                         AND %(db)s.%(nuts)s.tmin_seconds BETWEEN ? AND ?)
                    ''')
                    args.extend(
                        (to_kind_id(kind), kscale,
                         tmin_seconds - tscale - 1, tmax_seconds + 1))

                else:
                    # Unbounded top bin: no lower limit on tmin.
                    tmin_cond.append('''
                        (%(db)s.%(nuts)s.kind_id == ?
                         AND %(db)s.%(nuts)s.kscale == ?
                         AND %(db)s.%(nuts)s.tmin_seconds <= ?)
                    ''')

                    args.extend(
                        (to_kind_id(kind), kscale, tmax_seconds + 1))
            if tmin_cond:
                cond.append(' ( ' + ' OR '.join(tmin_cond) + ' ) ')

        # Common upper-edge condition: content must end at or after tmin.
        cond.append('%(db)s.%(nuts)s.tmax_seconds >= ?')
        args.append(tmin_seconds)
866 def _codes_match_sql(self, kind_id, codes, cond, args):
867 pats = codes_patterns_for_kind(kind_id, codes)
868 if pats is None:
869 return
871 pats_exact = []
872 pats_nonexact = []
873 for pat in pats:
874 spat = pat.safe_str
875 (pats_exact if _is_exact(spat) else pats_nonexact).append(spat)
877 cond_exact = None
878 if pats_exact:
879 cond_exact = ' ( kind_codes.codes IN ( %s ) ) ' % ', '.join(
880 '?'*len(pats_exact))
882 args.extend(pats_exact)
884 cond_nonexact = None
885 if pats_nonexact:
886 cond_nonexact = ' ( %s ) ' % ' OR '.join(
887 ('kind_codes.codes GLOB ?',) * len(pats_nonexact))
889 args.extend(pats_nonexact)
891 if cond_exact and cond_nonexact:
892 cond.append(' ( %s OR %s ) ' % (cond_exact, cond_nonexact))
894 elif cond_exact:
895 cond.append(cond_exact)
897 elif cond_nonexact:
898 cond.append(cond_nonexact)
    def iter_nuts(
            self, kind=None, tmin=None, tmax=None, codes=None, naiv=False,
            kind_codes_ids=None, path=None, limit=None):

        '''
        Iterate over content entities matching given constraints.

        :param kind:
            Content kind (or kinds) to extract.
        :type kind:
            :py:class:`str`, :py:class:`list` of :py:class:`str`

        :param tmin:
            Start time of query interval.
        :type tmin:
            timestamp

        :param tmax:
            End time of query interval.
        :type tmax:
            timestamp

        :param codes:
            List of code patterns to query.
        :type codes:
            :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
            objects appropriate for the queried content type, or anything which
            can be converted to such objects.

        :param naiv:
            Bypass time span lookup through indices (slow, for testing).
        :type naiv:
            :py:class:`bool`

        :param kind_codes_ids:
            Kind-codes IDs of contents to be retrieved (internal use).
        :type kind_codes_ids:
            :py:class:`list` of :py:class:`int`

        :yields:
            :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
            intersecting content.

        :complexity:
            O(log N) for the time selection part due to heavy use of database
            indices.

        Query time span is treated as a half-open interval ``[tmin, tmax)``.
        However, if ``tmin`` equals ``tmax``, the edge logics are modified to
        closed-interval so that content intersecting with the time instant ``t
        = tmin = tmax`` is returned (otherwise nothing would be returned as
        ``[t, t)`` never matches anything).

        Time spans of content entities to be matched are also treated as half
        open intervals, e.g. content span ``[0, 1)`` is matched by query span
        ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``. Also here, logics are
        modified to closed-interval when the content time span is an empty
        interval, i.e. to indicate a time instant. E.g. time instant 0 is
        matched by ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``.
        '''

        # Multiple kinds (or all kinds): recurse once per kind and merge.
        if not isinstance(kind, str):
            if kind is None:
                kind = model.g_content_kinds
            for kind_ in kind:
                for nut in self.iter_nuts(kind_, tmin, tmax, codes):
                    yield nut

            return

        kind_id = to_kind_id(kind)

        cond = []
        args = []
        if tmin is not None or tmax is not None:
            assert kind is not None
            # Open-ended queries fall back to the selection's full span.
            if tmin is None:
                tmin = self.get_time_span()[0]
            if tmax is None:
                tmax = self.get_time_span()[1] + 1.0

            self._timerange_sql(tmin, tmax, kind, cond, args, naiv)

        cond.append('kind_codes.kind_id == ?')
        args.append(kind_id)

        if codes is not None:
            self._codes_match_sql(kind_id, codes, cond, args)

        if kind_codes_ids is not None:
            cond.append(
                ' ( kind_codes.kind_codes_id IN ( %s ) ) ' % ', '.join(
                    '?'*len(kind_codes_ids)))

            args.extend(kind_codes_ids)

        db = self.get_database()
        if path is not None:
            cond.append('files.path == ?')
            args.append(db.relpath(abspath(path)))

        sql = ('''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                %(db)s.%(nuts)s.file_segment,
                %(db)s.%(nuts)s.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                %(db)s.%(nuts)s.tmin_seconds,
                %(db)s.%(nuts)s.tmin_offset,
                %(db)s.%(nuts)s.tmax_seconds,
                %(db)s.%(nuts)s.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN %(db)s.%(nuts)s
                ON files.file_id == %(db)s.%(nuts)s.file_id
            INNER JOIN kind_codes
                ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id
            ''')

        if cond:
            sql += ''' WHERE ''' + ' AND '.join(cond)

        if limit is not None:
            sql += ''' LIMIT %i''' % limit

        sql = self._sql(sql)
        if tmin is None and tmax is None:
            # No time constraint: yield everything the SQL query returns.
            for row in self._conn.execute(sql, args):
                row = (db.abspath(row[0]),) + row[1:]
                nut = model.Nut(values_nocheck=row)
                yield nut
        else:
            assert tmin is not None and tmax is not None
            if tmin == tmax:
                # Time-instant query: closed-interval edge logics (see
                # docstring), also matching zero-length content at tmin.
                for row in self._conn.execute(sql, args):
                    row = (db.abspath(row[0]),) + row[1:]
                    nut = model.Nut(values_nocheck=row)
                    if (nut.tmin <= tmin < nut.tmax) \
                            or (nut.tmin == nut.tmax and tmin == nut.tmin):

                        yield nut
            else:
                # Half-open interval intersection; zero-length content is
                # matched when its instant lies within [tmin, tmax).
                for row in self._conn.execute(sql, args):
                    row = (db.abspath(row[0]),) + row[1:]
                    nut = model.Nut(values_nocheck=row)
                    if (tmin < nut.tmax and nut.tmin < tmax) \
                            or (nut.tmin == nut.tmax
                                and tmin <= nut.tmin < tmax):

                        yield nut
1055 def get_nuts(self, *args, **kwargs):
1056 '''
1057 Get content entities matching given constraints.
1059 Like :py:meth:`iter_nuts` but returns results as a list.
1060 '''
1062 return list(self.iter_nuts(*args, **kwargs))
    def _split_nuts(
            self, kind, tmin=None, tmax=None, codes=None, path=None):
        '''
        Cut the time span ``[tmin, tmax)`` out of matching nuts.

        Nuts intersecting the span are deleted; any parts protruding before
        *tmin* or after *tmax* are re-inserted as new, trimmed nuts. Applied
        to both the selection's nut table and the global ``nuts`` table.
        '''

        kind_id = to_kind_id(kind)
        tmin_seconds, tmin_offset = model.tsplit(tmin)
        tmax_seconds, tmax_offset = model.tsplit(tmax)

        # Name mapping to run the same SQL against the global tables.
        names_main_nuts = dict(self._names)
        names_main_nuts.update(db='main', nuts='nuts')

        db = self.get_database()

        def main_nuts(s):
            return s % names_main_nuts

        with self.transaction('split nuts') as cursor:
            # modify selection and main
            for sql_subst in [
                    self._sql, main_nuts]:

                cond = []
                args = []

                self._timerange_sql(tmin, tmax, kind, cond, args, False)

                if codes is not None:
                    self._codes_match_sql(kind_id, codes, cond, args)

                if path is not None:
                    cond.append('files.path == ?')
                    args.append(db.relpath(abspath(path)))

                sql = sql_subst('''
                    SELECT
                        %(db)s.%(nuts)s.nut_id,
                        %(db)s.%(nuts)s.tmin_seconds,
                        %(db)s.%(nuts)s.tmin_offset,
                        %(db)s.%(nuts)s.tmax_seconds,
                        %(db)s.%(nuts)s.tmax_offset,
                        kind_codes.deltat
                    FROM files
                    INNER JOIN %(db)s.%(nuts)s
                        ON files.file_id == %(db)s.%(nuts)s.file_id
                    INNER JOIN kind_codes
                        ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id
                    WHERE ''' + ' AND '.join(cond))  # noqa

                insert = []
                delete = []
                for row in cursor.execute(sql, args):
                    nut_id, nut_tmin_seconds, nut_tmin_offset, \
                        nut_tmax_seconds, nut_tmax_offset, nut_deltat = row

                    nut_tmin = model.tjoin(
                        nut_tmin_seconds, nut_tmin_offset)
                    nut_tmax = model.tjoin(
                        nut_tmax_seconds, nut_tmax_offset)

                    if nut_tmin < tmax and tmin < nut_tmax:
                        # Keep the leading remainder [nut_tmin, tmin).
                        if nut_tmin < tmin:
                            insert.append((
                                nut_tmin_seconds, nut_tmin_offset,
                                tmin_seconds, tmin_offset,
                                model.tscale_to_kscale(
                                    tmin_seconds - nut_tmin_seconds),
                                nut_id))

                        # Keep the trailing remainder [tmax, nut_tmax).
                        if tmax < nut_tmax:
                            insert.append((
                                tmax_seconds, tmax_offset,
                                nut_tmax_seconds, nut_tmax_offset,
                                model.tscale_to_kscale(
                                    nut_tmax_seconds - tmax_seconds),
                                nut_id))

                    delete.append((nut_id,))

                # Clone the original nut's row with trimmed time bounds and
                # recomputed kscale; then remove the originals.
                sql_add = '''
                    INSERT INTO %(db)s.%(nuts)s (
                            file_id, file_segment, file_element, kind_id,
                            kind_codes_id, tmin_seconds, tmin_offset,
                            tmax_seconds, tmax_offset, kscale )
                    SELECT
                        file_id, file_segment, file_element,
                        kind_id, kind_codes_id, ?, ?, ?, ?, ?
                    FROM %(db)s.%(nuts)s
                    WHERE nut_id == ?
                '''
                cursor.executemany(sql_subst(sql_add), insert)

                sql_delete = '''
                    DELETE FROM %(db)s.%(nuts)s WHERE nut_id == ?
                '''
                cursor.executemany(sql_subst(sql_delete), delete)
    def get_time_span(self, kinds=None):
        '''
        Get time interval over all content in selection.

        :param kinds:
            If not ``None``, restrict query to given content kinds.
        :type kinds:
            list of str

        :complexity:
            O(1), independent of the number of nuts.

        :returns:
            ``(tmin, tmax)``, combined time interval of queried content kinds.
            Either value may be ``None`` if no matching content is available.
        '''

        # Times are stored split into integer seconds plus a sub-second
        # offset. The inner SELECT finds the extremal seconds value; the
        # outer SELECT then picks the extremal offset among rows sharing
        # that seconds value, so the combined (seconds, offset) pair is
        # truly minimal/maximal.
        sql_min = self._sql('''
            SELECT MIN(tmin_seconds), MIN(tmin_offset)
            FROM %(db)s.%(nuts)s
            WHERE kind_id == ?
                AND tmin_seconds == (
                    SELECT MIN(tmin_seconds)
                    FROM %(db)s.%(nuts)s
                    WHERE kind_id == ?)
        ''')

        sql_max = self._sql('''
            SELECT MAX(tmax_seconds), MAX(tmax_offset)
            FROM %(db)s.%(nuts)s
            WHERE kind_id == ?
                AND tmax_seconds == (
                    SELECT MAX(tmax_seconds)
                    FROM %(db)s.%(nuts)s
                    WHERE kind_id == ?)
        ''')

        gtmin = None
        gtmax = None

        if isinstance(kinds, str):
            kinds = [kinds]

        if kinds is None:
            kind_ids = model.g_content_kind_ids
        else:
            kind_ids = model.to_kind_ids(kinds)

        # Combine per-kind extremes into a global (gtmin, gtmax) interval.
        for kind_id in kind_ids:
            for tmin_seconds, tmin_offset in self._conn.execute(
                    sql_min, (kind_id, kind_id)):

                tmin = model.tjoin(tmin_seconds, tmin_offset)
                if tmin is not None and (gtmin is None or tmin < gtmin):
                    gtmin = tmin

            for (tmax_seconds, tmax_offset) in self._conn.execute(
                    sql_max, (kind_id, kind_id)):

                tmax = model.tjoin(tmax_seconds, tmax_offset)
                if tmax is not None and (gtmax is None or tmax > gtmax):
                    gtmax = tmax

        return gtmin, gtmax
1221 def has(self, kinds):
1222 '''
1223 Check availability of given content kinds.
1225 :param kinds:
1226 Content kinds to query.
1227 :type kind:
1228 list of str
1230 :returns:
1231 ``True`` if any of the queried content kinds is available
1232 in the selection.
1233 '''
1234 self_tmin, self_tmax = self.get_time_span(kinds)
1236 return None not in (self_tmin, self_tmax)
1238 def get_deltat_span(self, kind):
1239 '''
1240 Get min and max sampling interval of all content of given kind.
1242 :param kind:
1243 Content kind
1244 :type kind:
1245 str
1247 :returns: ``(deltat_min, deltat_max)``
1248 '''
1250 deltats = [
1251 deltat for deltat in self.get_deltats(kind)
1252 if deltat is not None]
1254 if deltats:
1255 return min(deltats), max(deltats)
1256 else:
1257 return None, None
1259 def iter_kinds(self, codes=None):
1260 '''
1261 Iterate over content types available in selection.
1263 :param codes:
1264 If given, get kinds only for selected codes identifier.
1265 Only a single identifier may be given here and no pattern matching
1266 is done, currently.
1267 :type codes:
1268 :py:class:`~pyrocko.squirrel.model.Codes`
1270 :yields:
1271 Available content kinds as :py:class:`str`.
1273 :complexity:
1274 O(1), independent of number of nuts.
1275 '''
1277 return self._database._iter_kinds(
1278 codes=codes,
1279 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1281 def iter_deltats(self, kind=None):
1282 '''
1283 Iterate over sampling intervals available in selection.
1285 :param kind:
1286 If given, get sampling intervals only for a given content type.
1287 :type kind:
1288 str
1290 :yields:
1291 :py:class:`float` values.
1293 :complexity:
1294 O(1), independent of number of nuts.
1295 '''
1296 return self._database._iter_deltats(
1297 kind=kind,
1298 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1300 def iter_codes(self, kind=None):
1301 '''
1302 Iterate over content identifier code sequences available in selection.
1304 :param kind:
1305 If given, get codes only for a given content type.
1306 :type kind:
1307 str
1309 :yields:
1310 :py:class:`tuple` of :py:class:`str`
1312 :complexity:
1313 O(1), independent of number of nuts.
1314 '''
1315 return self._database._iter_codes(
1316 kind=kind,
1317 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1319 def _iter_codes_info(self, kind=None, codes=None):
1320 '''
1321 Iterate over number of occurrences of any (kind, codes) combination.
1323 :param kind:
1324 If given, get counts only for selected content type.
1325 :type kind:
1326 str
1328 :yields:
1329 Tuples of the form ``(kind, codes, deltat, kind_codes_id, count)``.
1331 :complexity:
1332 O(1), independent of number of nuts.
1333 '''
1334 return self._database._iter_codes_info(
1335 kind=kind,
1336 codes=codes,
1337 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1339 def get_kinds(self, codes=None):
1340 '''
1341 Get content types available in selection.
1343 :param codes:
1344 If given, get kinds only for selected codes identifier.
1345 Only a single identifier may be given here and no pattern matching
1346 is done, currently.
1347 :type codes:
1348 :py:class:`~pyrocko.squirrel.model.Codes`
1350 :returns:
1351 Sorted list of available content types.
1352 :rtype:
1353 py:class:`list` of :py:class:`str`
1355 :complexity:
1356 O(1), independent of number of nuts.
1358 '''
1359 return sorted(list(self.iter_kinds(codes=codes)))
1361 def get_deltats(self, kind=None):
1362 '''
1363 Get sampling intervals available in selection.
1365 :param kind:
1366 If given, get sampling intervals only for selected content type.
1367 :type kind:
1368 str
1370 :complexity:
1371 O(1), independent of number of nuts.
1373 :returns: Sorted list of available sampling intervals.
1374 '''
1375 return sorted(list(self.iter_deltats(kind=kind)))
1377 def get_codes(self, kind=None):
1378 '''
1379 Get identifier code sequences available in selection.
1381 :param kind:
1382 If given, get codes only for selected content type.
1383 :type kind:
1384 str
1386 :complexity:
1387 O(1), independent of number of nuts.
1389 :returns: Sorted list of available codes as tuples of strings.
1390 '''
1391 return sorted(list(self.iter_codes(kind=kind)))
1393 def get_counts(self, kind=None):
1394 '''
1395 Get number of occurrences of any (kind, codes) combination.
1397 :param kind:
1398 If given, get codes only for selected content type.
1399 :type kind:
1400 str
1402 :complexity:
1403 O(1), independent of number of nuts.
1405 :returns: ``dict`` with ``counts[kind][codes]`` or ``counts[codes]``
1406 if kind is not ``None``
1407 '''
1408 d = {}
1409 for kind_id, codes, _, _, count in self._iter_codes_info(kind=kind):
1410 if kind_id not in d:
1411 v = d[kind_id] = {}
1412 else:
1413 v = d[kind_id]
1415 if codes not in v:
1416 v[codes] = 0
1418 v[codes] += count
1420 if kind is not None:
1421 return d[to_kind_id(kind)]
1422 else:
1423 return dict((to_kind(kind_id), v) for (kind_id, v) in d.items())
1425 def glob_codes(self, kind, codes):
1426 '''
1427 Find codes matching given patterns.
1429 :param kind:
1430 Content kind to be queried.
1431 :type kind:
1432 str
1434 :param codes:
1435 List of code patterns to query.
1436 :type codes:
1437 :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
1438 objects appropriate for the queried content type, or anything which
1439 can be converted to such objects.
1441 :returns:
1442 List of matches of the form ``[kind_codes_id, codes, deltat]``.
1443 '''
1445 kind_id = to_kind_id(kind)
1446 args = [kind_id]
1447 pats = codes_patterns_for_kind(kind_id, codes)
1449 if pats:
1450 codes_cond = 'AND ( %s ) ' % ' OR '.join(
1451 ('kind_codes.codes GLOB ?',) * len(pats))
1453 args.extend(pat.safe_str for pat in pats)
1454 else:
1455 codes_cond = ''
1457 sql = self._sql('''
1458 SELECT kind_codes_id, codes, deltat FROM kind_codes
1459 WHERE
1460 kind_id == ? ''' + codes_cond)
1462 return list(map(list, self._conn.execute(sql, args)))
1464 def update(self, constraint=None, **kwargs):
1465 '''
1466 Update or partially update channel and event inventories.
1468 :param constraint:
1469 Selection of times or areas to be brought up to date.
1470 :type constraint:
1471 :py:class:`~pyrocko.squirrel.client.base.Constraint`
1473 :param \\*\\*kwargs:
1474 Shortcut for setting ``constraint=Constraint(**kwargs)``.
1476 This function triggers all attached remote sources, to check for
1477 updates in the meta-data. The sources will only submit queries when
1478 their expiration date has passed, or if the selection spans into
1479 previously unseen times or areas.
1480 '''
1482 if constraint is None:
1483 constraint = client.Constraint(**kwargs)
1485 for source in self._sources:
1486 source.update_channel_inventory(self, constraint)
1487 source.update_event_inventory(self, constraint)
1489 def update_waveform_promises(self, constraint=None, **kwargs):
1490 '''
1491 Permit downloading of remote waveforms.
1493 :param constraint:
1494 Remote waveforms compatible with the given constraint are enabled
1495 for download.
1496 :type constraint:
1497 :py:class:`~pyrocko.squirrel.client.base.Constraint`
1499 :param \\*\\*kwargs:
1500 Shortcut for setting ``constraint=Constraint(**kwargs)``.
1502 Calling this method permits Squirrel to download waveforms from remote
1503 sources when processing subsequent waveform requests. This works by
1504 inserting so called waveform promises into the database. It will look
1505 into the available channels for each remote source and create a promise
1506 for each channel compatible with the given constraint. If the promise
1507 then matches in a waveform request, Squirrel tries to download the
1508 waveform. If the download is successful, the downloaded waveform is
1509 added to the Squirrel and the promise is deleted. If the download
1510 fails, the promise is kept if the reason of failure looks like being
1511 temporary, e.g. because of a network failure. If the cause of failure
1512 however seems to be permanent, the promise is deleted so that no
1513 further attempts are made to download a waveform which might not be
1514 available from that server at all. To force re-scheduling after a
1515 permanent failure, call :py:meth:`update_waveform_promises`
1516 yet another time.
1517 '''
1519 if constraint is None:
1520 constraint = client.Constraint(**kwargs)
1522 for source in self._sources:
1523 source.update_waveform_promises(self, constraint)
1525 def remove_waveform_promises(self, from_database='selection'):
1526 '''
1527 Remove waveform promises from live selection or global database.
1529 Calling this function removes all waveform promises provided by the
1530 attached sources.
1532 :param from_database:
1533 Remove from live selection ``'selection'`` or global database
1534 ``'global'``.
1535 '''
1536 for source in self._sources:
1537 source.remove_waveform_promises(self, from_database=from_database)
1539 def update_responses(self, constraint=None, **kwargs):
1540 if constraint is None:
1541 constraint = client.Constraint(**kwargs)
1543 for source in self._sources:
1544 source.update_response_inventory(self, constraint)
1546 def get_nfiles(self):
1547 '''
1548 Get number of files in selection.
1549 '''
1551 sql = self._sql('''SELECT COUNT(*) FROM %(db)s.%(file_states)s''')
1552 for row in self._conn.execute(sql):
1553 return row[0]
1555 def get_nnuts(self):
1556 '''
1557 Get number of nuts in selection.
1558 '''
1560 sql = self._sql('''SELECT COUNT(*) FROM %(db)s.%(nuts)s''')
1561 for row in self._conn.execute(sql):
1562 return row[0]
1564 def get_total_size(self):
1565 '''
1566 Get aggregated file size available in selection.
1567 '''
1569 sql = self._sql('''
1570 SELECT SUM(files.size) FROM %(db)s.%(file_states)s
1571 INNER JOIN files
1572 ON %(db)s.%(file_states)s.file_id = files.file_id
1573 ''')
1575 for row in self._conn.execute(sql):
1576 return row[0] or 0
1578 def get_stats(self):
1579 '''
1580 Get statistics on contents available through this selection.
1581 '''
1583 kinds = self.get_kinds()
1584 time_spans = {}
1585 for kind in kinds:
1586 time_spans[kind] = self.get_time_span([kind])
1588 return SquirrelStats(
1589 nfiles=self.get_nfiles(),
1590 nnuts=self.get_nnuts(),
1591 kinds=kinds,
1592 codes=self.get_codes(),
1593 total_size=self.get_total_size(),
1594 counts=self.get_counts(),
1595 time_spans=time_spans,
1596 sources=[s.describe() for s in self._sources],
1597 operators=[op.describe() for op in self._operators])
    def get_content(
            self,
            nut,
            cache_id='default',
            accessor_id='default',
            show_progress=False,
            model='squirrel'):

        '''
        Get and possibly load full content for a given index entry from file.

        Loads the actual content objects (channel, station, waveform, ...) from
        file. For efficiency, sibling content (all stuff in the same file
        segment) will also be loaded as a side effect. The loaded contents are
        cached in the Squirrel object.

        :param nut:
            Index entry for which content should be retrieved.
        :param cache_id:
            Name of the content cache to use.
        :param accessor_id:
            Name of consumer on whose behalf content is accessed (used in
            cache management).
        :param show_progress:
            If ``True``, show progress while loading from file.
        :param model:
            Object model passed through to the cache lookup. NOTE: this
            parameter shadows the module-level ``model`` import inside this
            method; the module is not used here.

        :raises:
            :py:exc:`~pyrocko.squirrel.error.NotAvailable` if the content
            cannot be retrieved after loading.
        '''

        content_cache = self._content_caches[cache_id]
        if not content_cache.has(nut):

            # Load the whole file segment containing the nut; all sibling
            # contents end up in the cache as a side effect.
            for nut_loaded in io.iload(
                    nut.file_path,
                    segment=nut.file_segment,
                    format=nut.file_format,
                    database=self._database,
                    update_selection=self,
                    show_progress=show_progress):

                content_cache.put(nut_loaded)

        try:
            return content_cache.get(nut, accessor_id, model)

        except KeyError:
            raise error.NotAvailable(
                'Unable to retrieve content: %s, %s, %s, %s' % nut.key)
1636 def advance_accessor(self, accessor_id='default', cache_id=None):
1637 '''
1638 Notify memory caches about consumer moving to a new data batch.
1640 :param accessor_id:
1641 Name of accessing consumer to be advanced.
1642 :type accessor_id:
1643 str
1645 :param cache_id:
1646 Name of cache to for which the accessor should be advanced. By
1647 default the named accessor is advanced in all registered caches.
1648 By default, two caches named ``'default'`` and ``'waveform'`` are
1649 available.
1650 :type cache_id:
1651 str
1653 See :py:class:`~pyrocko.squirrel.cache.ContentCache` for details on how
1654 Squirrel's memory caching works and can be tuned. Default behaviour is
1655 to release data when it has not been used in the latest data
1656 window/batch. If the accessor is never advanced, data is cached
1657 indefinitely - which is often desired e.g. for station meta-data.
1658 Methods for consecutive data traversal, like
1659 :py:meth:`chopper_waveforms` automatically advance and clear
1660 their accessor.
1661 '''
1662 for cache_ in (
1663 self._content_caches.keys()
1664 if cache_id is None
1665 else [cache_id]):
1667 self._content_caches[cache_].advance_accessor(accessor_id)
1669 def clear_accessor(self, accessor_id, cache_id=None):
1670 '''
1671 Notify memory caches about a consumer having finished.
1673 :param accessor_id:
1674 Name of accessor to be cleared.
1675 :type accessor_id:
1676 str
1678 :param cache_id:
1679 Name of cache for which the accessor should be cleared. By default
1680 the named accessor is cleared from all registered caches. By
1681 default, two caches named ``'default'`` and ``'waveform'`` are
1682 available.
1683 :type cache_id:
1684 str
1686 Calling this method clears all references to cache entries held by the
1687 named accessor. Cache entries are then freed if not referenced by any
1688 other accessor.
1689 '''
1691 for cache_ in (
1692 self._content_caches.keys()
1693 if cache_id is None
1694 else [cache_id]):
1696 self._content_caches[cache_].clear_accessor(accessor_id)
1698 def get_cache_stats(self, cache_id):
1699 return self._content_caches[cache_id].get_stats()
1701 @filldocs
1702 def get_stations(
1703 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
1704 model='squirrel'):
1706 '''
1707 Get stations matching given constraints.
1709 %(query_args)s
1711 :param model:
1712 Select object model for returned values: ``'squirrel'`` to get
1713 Squirrel station objects or ``'pyrocko'`` to get Pyrocko station
1714 objects with channel information attached.
1715 :type model:
1716 str
1718 :returns:
1719 List of :py:class:`pyrocko.squirrel.Station
1720 <pyrocko.squirrel.model.Station>` objects by default or list of
1721 :py:class:`pyrocko.model.Station <pyrocko.model.station.Station>`
1722 objects if ``model='pyrocko'`` is requested.
1724 See :py:meth:`iter_nuts` for details on time span matching.
1725 '''
1727 if model == 'pyrocko':
1728 return self._get_pyrocko_stations(obj, tmin, tmax, time, codes)
1729 elif model in ('squirrel', 'stationxml', 'stationxml+'):
1730 args = self._get_selection_args(
1731 STATION, obj, tmin, tmax, time, codes)
1733 nuts = sorted(
1734 self.iter_nuts('station', *args), key=lambda nut: nut.dkey)
1736 return [self.get_content(nut, model=model) for nut in nuts]
1737 else:
1738 raise ValueError('Invalid station model: %s' % model)
1740 @filldocs
1741 def get_channels(
1742 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
1743 model='squirrel'):
1745 '''
1746 Get channels matching given constraints.
1748 %(query_args)s
1750 :returns:
1751 List of :py:class:`~pyrocko.squirrel.model.Channel` objects.
1753 See :py:meth:`iter_nuts` for details on time span matching.
1754 '''
1756 args = self._get_selection_args(
1757 CHANNEL, obj, tmin, tmax, time, codes)
1759 nuts = sorted(
1760 self.iter_nuts('channel', *args), key=lambda nut: nut.dkey)
1762 return [self.get_content(nut, model=model) for nut in nuts]
    @filldocs
    def get_sensors(
            self, obj=None, tmin=None, tmax=None, time=None, codes=None):

        '''
        Get sensors matching given constraints.

        %(query_args)s

        :returns:
            List of :py:class:`~pyrocko.squirrel.model.Sensor` objects.

        See :py:meth:`iter_nuts` for details on time span matching.
        '''

        tmin, tmax, codes = self._get_selection_args(
            CHANNEL, obj, tmin, tmax, time, codes)

        if codes is not None:
            # Widen channel patterns so that all components of a sensor are
            # matched: the last character of the channel code (the component
            # letter) is replaced by the '?' glob, unless the pattern is the
            # catch-all '*'.
            codes = codes_patterns_list(
                (entry.replace(channel=entry.channel[:-1] + '?')
                 if entry.channel != '*' else entry)
                for entry in codes)

        nuts = sorted(
            self.iter_nuts(
                'channel', tmin, tmax, codes), key=lambda nut: nut.dkey)

        # Group loaded channels into sensors, then keep only sensors
        # overlapping the queried time span.
        return [
            sensor for sensor in model.Sensor.from_channels(
                self.get_content(nut) for nut in nuts)
            if match_time_span(tmin, tmax, sensor)]
1797 @filldocs
1798 def get_responses(
1799 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
1800 model='squirrel'):
1802 '''
1803 Get instrument responses matching given constraints.
1805 %(query_args)s
1807 :returns:
1808 List of :py:class:`~pyrocko.squirrel.model.Response` objects.
1810 See :py:meth:`iter_nuts` for details on time span matching.
1811 '''
1813 args = self._get_selection_args(
1814 RESPONSE, obj, tmin, tmax, time, codes)
1816 nuts = sorted(
1817 self.iter_nuts('response', *args), key=lambda nut: nut.dkey)
1819 return [self.get_content(nut, model=model) for nut in nuts]
    @filldocs
    def get_response(
            self, obj=None, tmin=None, tmax=None, time=None, codes=None,
            model='squirrel'):

        '''
        Get instrument response matching given constraints.

        %(query_args)s

        :returns:
            :py:class:`~pyrocko.squirrel.model.Response` object.

        Same as :py:meth:`get_responses` but returning exactly one response.
        Raises :py:exc:`~pyrocko.squirrel.error.NotAvailable` if zero or more
        than one is available.

        See :py:meth:`iter_nuts` for details on time span matching.
        '''

        # The 'stationxml' model is queried as 'stationxml+' internally, so
        # the summary of the Squirrel-side response (element 0 of the pair)
        # is available for error reporting; the plain StationXML object
        # (element 1) is extracted at the end.
        if model == 'stationxml':
            model_ = 'stationxml+'
        else:
            model_ = model

        responses = self.get_responses(
            obj, tmin, tmax, time, codes, model=model_)
        if len(responses) == 0:
            raise error.NotAvailable(
                'No instrument response available (%s).'
                % self._get_selection_args_str(
                    RESPONSE, obj, tmin, tmax, time, codes))

        elif len(responses) > 1:
            # Build a per-response summary listing for the error message.
            if model_ == 'squirrel':
                resps_sq = responses
            elif model_ == 'stationxml+':
                resps_sq = [resp[0] for resp in responses]
            else:
                raise ValueError('Invalid response model: %s' % model)

            rinfo = ':\n' + '\n'.join(
                '  ' + resp.summary for resp in resps_sq)

            raise error.NotAvailable(
                'Multiple instrument responses matching given constraints '
                '(%s)%s' % (
                    self._get_selection_args_str(
                        RESPONSE, obj, tmin, tmax, time, codes), rinfo))

        if model == 'stationxml':
            return responses[0][1]
        else:
            return responses[0]
1876 @filldocs
1877 def get_events(
1878 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1880 '''
1881 Get events matching given constraints.
1883 %(query_args)s
1885 :returns:
1886 List of :py:class:`~pyrocko.model.event.Event` objects.
1888 See :py:meth:`iter_nuts` for details on time span matching.
1889 '''
1891 args = self._get_selection_args(EVENT, obj, tmin, tmax, time, codes)
1892 nuts = sorted(
1893 self.iter_nuts('event', *args), key=lambda nut: nut.dkey)
1895 return [self.get_content(nut) for nut in nuts]
1897 def _redeem_promises(self, *args, order_only=False):
1899 def split_promise(order):
1900 self._split_nuts(
1901 'waveform_promise',
1902 order.tmin, order.tmax,
1903 codes=order.codes,
1904 path=order.source_id)
1906 tmin, tmax, _ = args
1908 waveforms = list(self.iter_nuts('waveform', *args))
1909 promises = list(self.iter_nuts('waveform_promise', *args))
1911 codes_to_avail = defaultdict(list)
1912 for nut in waveforms:
1913 codes_to_avail[nut.codes].append((nut.tmin, nut.tmax))
1915 def tts(x):
1916 if isinstance(x, tuple):
1917 return tuple(tts(e) for e in x)
1918 elif isinstance(x, list):
1919 return list(tts(e) for e in x)
1920 else:
1921 return util.time_to_str(x)
1923 orders = []
1924 for promise in promises:
1925 waveforms_avail = codes_to_avail[promise.codes]
1926 for block_tmin, block_tmax in blocks(
1927 max(tmin, promise.tmin),
1928 min(tmax, promise.tmax),
1929 promise.deltat):
1931 orders.append(
1932 WaveformOrder(
1933 source_id=promise.file_path,
1934 codes=promise.codes,
1935 tmin=block_tmin,
1936 tmax=block_tmax,
1937 deltat=promise.deltat,
1938 gaps=gaps(waveforms_avail, block_tmin, block_tmax)))
1940 orders_noop, orders = lpick(lambda order: order.gaps, orders)
1942 order_keys_noop = set(order_key(order) for order in orders_noop)
1943 if len(order_keys_noop) != 0 or len(orders_noop) != 0:
1944 logger.info(
1945 'Waveform orders already satisified with cached/local data: '
1946 '%i (%i)' % (len(order_keys_noop), len(orders_noop)))
1948 for order in orders_noop:
1949 split_promise(order)
1951 if order_only:
1952 if orders:
1953 self._pending_orders.extend(orders)
1954 logger.info(
1955 'Enqueuing %i waveform order%s.'
1956 % len_plural(orders))
1957 return
1958 else:
1959 if self._pending_orders:
1960 orders.extend(self._pending_orders)
1961 logger.info(
1962 'Adding %i previously enqueued order%s.'
1963 % len_plural(self._pending_orders))
1965 self._pending_orders = []
1967 source_ids = []
1968 sources = {}
1969 for source in self._sources:
1970 if isinstance(source, fdsn.FDSNSource):
1971 source_ids.append(source._source_id)
1972 sources[source._source_id] = source
1974 source_priority = dict(
1975 (source_id, i) for (i, source_id) in enumerate(source_ids))
1977 order_groups = defaultdict(list)
1978 for order in orders:
1979 order_groups[order_key(order)].append(order)
1981 for k, order_group in order_groups.items():
1982 order_group.sort(
1983 key=lambda order: source_priority[order.source_id])
1985 n_order_groups = len(order_groups)
1987 if len(order_groups) != 0 or len(orders) != 0:
1988 logger.info(
1989 'Waveform orders standing for download: %i (%i)'
1990 % (len(order_groups), len(orders)))
1992 task = make_task('Waveform orders processed', n_order_groups)
1993 else:
1994 task = None
1996 def release_order_group(order):
1997 okey = order_key(order)
1998 for followup in order_groups[okey]:
1999 split_promise(followup)
2001 del order_groups[okey]
2003 if task:
2004 task.update(n_order_groups - len(order_groups))
2006 def noop(order):
2007 pass
2009 def success(order):
2010 release_order_group(order)
2011 split_promise(order)
2013 def batch_add(paths):
2014 self.add(paths)
2016 calls = queue.Queue()
2018 def enqueue(f):
2019 def wrapper(*args):
2020 calls.put((f, args))
2022 return wrapper
2024 while order_groups:
2026 orders_now = []
2027 empty = []
2028 for k, order_group in order_groups.items():
2029 try:
2030 orders_now.append(order_group.pop(0))
2031 except IndexError:
2032 empty.append(k)
2034 for k in empty:
2035 del order_groups[k]
2037 by_source_id = defaultdict(list)
2038 for order in orders_now:
2039 by_source_id[order.source_id].append(order)
2041 threads = []
2042 for source_id in by_source_id:
2043 def download():
2044 try:
2045 sources[source_id].download_waveforms(
2046 by_source_id[source_id],
2047 success=enqueue(success),
2048 error_permanent=enqueue(split_promise),
2049 error_temporary=noop,
2050 batch_add=enqueue(batch_add))
2052 finally:
2053 calls.put(None)
2055 thread = threading.Thread(target=download)
2056 thread.start()
2057 threads.append(thread)
2059 ndone = 0
2060 while ndone < len(threads):
2061 ret = calls.get()
2062 if ret is None:
2063 ndone += 1
2064 else:
2065 ret[0](*ret[1])
2067 for thread in threads:
2068 thread.join()
2070 if task:
2071 task.update(n_order_groups - len(order_groups))
2073 if task:
2074 task.done()
2076 @filldocs
2077 def get_waveform_nuts(
2078 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
2079 order_only=False):
2081 '''
2082 Get waveform content entities matching given constraints.
2084 %(query_args)s
2086 Like :py:meth:`get_nuts` with ``kind='waveform'`` but additionally
2087 resolves matching waveform promises (downloads waveforms from remote
2088 sources).
2090 See :py:meth:`iter_nuts` for details on time span matching.
2091 '''
2093 args = self._get_selection_args(WAVEFORM, obj, tmin, tmax, time, codes)
2094 self._redeem_promises(*args, order_only=order_only)
2095 return sorted(
2096 self.iter_nuts('waveform', *args), key=lambda nut: nut.dkey)
2098 @filldocs
2099 def have_waveforms(
2100 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
2102 '''
2103 Check if any waveforms or waveform promises are available for given
2104 constraints.
2106 %(query_args)s
2107 '''
2109 args = self._get_selection_args(WAVEFORM, obj, tmin, tmax, time, codes)
2110 return bool(list(
2111 self.iter_nuts('waveform', *args, limit=1))) \
2112 or bool(list(
2113 self.iter_nuts('waveform_promise', *args, limit=1)))
    @filldocs
    def get_waveforms(
            self, obj=None, tmin=None, tmax=None, time=None, codes=None,
            uncut=False, want_incomplete=True, degap=True, maxgap=5,
            maxlap=None, snap=None, include_last=False, load_data=True,
            accessor_id='default', operator_params=None, order_only=False):

        '''
        Get waveforms matching given constraints.

        %(query_args)s

        :param uncut:
            Set to ``True``, to disable cutting traces to [``tmin``, ``tmax``]
            and to disable degapping/deoverlapping. Returns untouched traces as
            they are read from file segment. File segments are always read in
            their entirety.
        :type uncut:
            bool

        :param want_incomplete:
            If ``True``, gappy/incomplete traces are included in the result.
        :type want_incomplete:
            bool

        :param degap:
            If ``True``, connect traces and remove gaps and overlaps.
        :type degap:
            bool

        :param maxgap:
            Maximum gap size in samples which is filled with interpolated
            samples when ``degap`` is ``True``.
        :type maxgap:
            int

        :param maxlap:
            Maximum overlap size in samples which is removed when ``degap`` is
            ``True``.
        :type maxlap:
            int

        :param snap:
            Rounding functions used when computing sample index from time
            instance, for trace start and trace end, respectively. By default,
            ``(round, round)`` is used.
        :type snap:
            tuple of 2 callables

        :param include_last:
            If ``True``, add one more sample to the returned traces (the sample
            which would be the first sample of a query with ``tmin`` set to the
            current value of ``tmax``).
        :type include_last:
            bool

        :param load_data:
            If ``True``, waveform data samples are read from files (or cache).
            If ``False``, meta-information-only traces are returned (dummy
            traces with no data samples).
        :type load_data:
            bool

        :param accessor_id:
            Name of consumer on who's behalf data is accessed. Used in cache
            management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key
            to distinguish different points of extraction for the decision of
            when to release cached waveform data. Should be used when data is
            alternately extracted from more than one region / selection.
        :type accessor_id:
            str

        See :py:meth:`iter_nuts` for details on time span matching.

        Loaded data is kept in memory (at least) until
        :py:meth:`clear_accessor` has been called or
        :py:meth:`advance_accessor` has been called two consecutive times
        without data being accessed between the two calls (by this accessor).
        Data may still be further kept in the memory cache if held alive by
        consumers with a different ``accessor_id``.
        '''

        tmin, tmax, codes = self._get_selection_args(
            WAVEFORM, obj, tmin, tmax, time, codes)

        self_tmin, self_tmax = self.get_time_span(
            ['waveform', 'waveform_promise'])

        if None in (self_tmin, self_tmax):
            logger.warning(
                'No waveforms available.')
            return []

        # Open-ended queries default to the full available time span.
        tmin = tmin if tmin is not None else self_tmin
        tmax = tmax if tmax is not None else self_tmax

        if codes is not None and len(codes) == 1:
            # TODO: fix for multiple / mixed codes
            operator = self.get_operator(codes[0])
            if operator is not None:
                # Derived (operator-generated) channel: delegate entirely.
                return operator.get_waveforms(
                    self, codes[0],
                    tmin=tmin, tmax=tmax,
                    uncut=uncut, want_incomplete=want_incomplete, degap=degap,
                    maxgap=maxgap, maxlap=maxlap, snap=snap,
                    include_last=include_last, load_data=load_data,
                    accessor_id=accessor_id, params=operator_params)

        nuts = self.get_waveform_nuts(
            obj, tmin, tmax, time, codes, order_only=order_only)

        if order_only:
            return []

        if load_data:
            traces = [
                self.get_content(nut, 'waveform', accessor_id) for nut in nuts]

        else:
            traces = [
                trace.Trace(**nut.trace_kwargs) for nut in nuts]

        if uncut:
            return traces

        if snap is None:
            snap = (round, round)

        chopped = []
        for tr in traces:
            # A cached trace may still carry data even when data was not
            # requested; strip it from a copy so the cache entry stays intact.
            if not load_data and tr.ydata is not None:
                tr = tr.copy(data=False)
                tr.ydata = None

            try:
                chopped.append(tr.chop(
                    tmin, tmax,
                    inplace=False,
                    snap=snap,
                    include_last=include_last))

            except trace.NoData:
                pass

        processed = self._process_chopped(
            chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax)

        return processed
    @filldocs
    def chopper_waveforms(
            self, obj=None, tmin=None, tmax=None, time=None, codes=None,
            tinc=None, tpad=0.,
            want_incomplete=True, snap_window=False,
            degap=True, maxgap=5, maxlap=None,
            snap=None, include_last=False, load_data=True,
            accessor_id=None, clear_accessor=True, operator_params=None,
            grouping=None):

        '''
        Iterate window-wise over waveform archive.

        %(query_args)s

        :param tinc:
            Time increment (window shift time) (default uses ``tmax-tmin``).
        :type tinc:
            timestamp

        :param tpad:
            Padding time appended on either side of the data window (window
            overlap is ``2*tpad``).
        :type tpad:
            timestamp

        :param want_incomplete:
            If ``True``, gappy/incomplete traces are included in the result.
        :type want_incomplete:
            bool

        :param snap_window:
            If ``True``, start time windows at multiples of tinc with respect
            to system time zero.
        :type snap_window:
            bool

        :param degap:
            If ``True``, connect traces and remove gaps and overlaps.
        :type degap:
            bool

        :param maxgap:
            Maximum gap size in samples which is filled with interpolated
            samples when ``degap`` is ``True``.
        :type maxgap:
            int

        :param maxlap:
            Maximum overlap size in samples which is removed when ``degap`` is
            ``True``.
        :type maxlap:
            int

        :param snap:
            Rounding functions used when computing sample index from time
            instance, for trace start and trace end, respectively. By default,
            ``(round, round)`` is used.
        :type snap:
            tuple of 2 callables

        :param include_last:
            If ``True``, add one more sample to the returned traces (the sample
            which would be the first sample of a query with ``tmin`` set to the
            current value of ``tmax``).
        :type include_last:
            bool

        :param load_data:
            If ``True``, waveform data samples are read from files (or cache).
            If ``False``, meta-information-only traces are returned (dummy
            traces with no data samples).
        :type load_data:
            bool

        :param accessor_id:
            Name of consumer on who's behalf data is accessed. Used in cache
            management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key
            to distinguish different points of extraction for the decision of
            when to release cached waveform data. Should be used when data is
            alternately extracted from more than one region / selection.
        :type accessor_id:
            str

        :param clear_accessor:
            If ``True`` (default), :py:meth:`clear_accessor` is called when the
            chopper finishes. Set to ``False`` to keep loaded waveforms in
            memory when the generator returns.
        :type clear_accessor:
            bool

        :param grouping:
            By default, traversal over the data is over time and all matching
            traces of a time window are yielded. Using this option, it is
            possible to traverse the data first by group (e.g. station or
            network) and second by time. This can reduce the number of traces
            in each batch and thus reduce the memory footprint of the process.
        :type grouping:
            :py:class:`~pyrocko.squirrel.operator.Grouping`

        :yields:
            A list of :py:class:`~pyrocko.trace.Trace` objects for every
            extracted time window.

        See :py:meth:`iter_nuts` for details on time span matching.
        '''

        tmin, tmax, codes = self._get_selection_args(
            WAVEFORM, obj, tmin, tmax, time, codes)

        self_tmin, self_tmax = self.get_time_span(
            ['waveform', 'waveform_promise'])

        if None in (self_tmin, self_tmax):
            logger.warning(
                'Content has undefined time span. No waveforms and no '
                'waveform promises?')
            return

        if snap_window and tinc is not None:
            # Align window boundaries to multiples of tinc with respect to
            # system time zero; unset bounds fall back to the archive's span.
            tmin = tmin if tmin is not None else self_tmin
            tmax = tmax if tmax is not None else self_tmax
            tmin = math.floor(tmin / tinc) * tinc
            tmax = math.ceil(tmax / tinc) * tinc
        else:
            # Without snapping, shrink the default span by tpad so padded
            # windows stay inside the available data.
            tmin = tmin if tmin is not None else self_tmin + tpad
            tmax = tmax if tmax is not None else self_tmax - tpad

        tinc = tinc if tinc is not None else tmax - tmin

        try:
            # Accessor id distinguishes cache consumers; auto-generate a
            # unique one per concurrently active chopper.
            if accessor_id is None:
                accessor_id = 'chopper%i' % self._n_choppers_active

            self._n_choppers_active += 1

            # eps guards against getting a spurious extra window when
            # (tmax - tmin) is an exact multiple of tinc.
            eps = tinc * 1e-6
            if tinc != 0.0:
                nwin = int(((tmax - eps) - tmin) / tinc) + 1
            else:
                nwin = 1

            if grouping is None:
                # Single group: all matching codes handled together.
                codes_list = [codes]
            else:
                # Build an operator to partition the available codes into
                # groups; iterate groups first, time windows second.
                operator = Operator(
                    filtering=CodesPatternFiltering(codes=codes),
                    grouping=grouping)

                available = set(self.get_codes(kind='waveform'))
                available.update(self.get_codes(kind='waveform_promise'))
                operator.update_mappings(sorted(available))

                codes_list = [
                    codes_patterns_list(scl)
                    for scl in operator.iter_in_codes()]

            ngroups = len(codes_list)
            for igroup, scl in enumerate(codes_list):
                for iwin in range(nwin):
                    # Last window is clipped at tmax.
                    wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)

                    chopped = self.get_waveforms(
                        tmin=wmin-tpad,
                        tmax=wmax+tpad,
                        codes=scl,
                        snap=snap,
                        include_last=include_last,
                        load_data=load_data,
                        want_incomplete=want_incomplete,
                        degap=degap,
                        maxgap=maxgap,
                        maxlap=maxlap,
                        accessor_id=accessor_id,
                        operator_params=operator_params)

                    # Tell cache management that this consumer has moved on,
                    # so data from previous windows may be released.
                    self.advance_accessor(accessor_id)

                    yield Batch(
                        tmin=wmin,
                        tmax=wmax,
                        i=iwin,
                        n=nwin,
                        igroup=igroup,
                        ngroups=ngroups,
                        traces=chopped)

        finally:
            # Runs also when the generator is closed early by the consumer.
            self._n_choppers_active -= 1
            if clear_accessor:
                self.clear_accessor(accessor_id, 'waveform')
2456 def _process_chopped(
2457 self, chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax):
2459 chopped.sort(key=lambda a: a.full_id)
2460 if degap:
2461 chopped = trace.degapper(chopped, maxgap=maxgap, maxlap=maxlap)
2463 if not want_incomplete:
2464 chopped_weeded = []
2465 for tr in chopped:
2466 emin = tr.tmin - tmin
2467 emax = tr.tmax + tr.deltat - tmax
2468 if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
2469 chopped_weeded.append(tr)
2471 elif degap:
2472 if (0. < emin <= 5. * tr.deltat
2473 and -5. * tr.deltat <= emax < 0.):
2475 tr.extend(tmin, tmax-tr.deltat, fillmethod='repeat')
2476 chopped_weeded.append(tr)
2478 chopped = chopped_weeded
2480 return chopped
2482 def _get_pyrocko_stations(
2483 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
2485 from pyrocko import model as pmodel
2487 if codes is not None:
2488 codes = codes_patterns_for_kind(STATION, codes)
2490 by_nsl = defaultdict(lambda: (list(), list()))
2491 for station in self.get_stations(obj, tmin, tmax, time, codes):
2492 sargs = station._get_pyrocko_station_args()
2493 by_nsl[station.codes.nsl][0].append(sargs)
2495 if codes is not None:
2496 codes = [model.CodesNSLCE(c) for c in codes]
2498 for channel in self.get_channels(obj, tmin, tmax, time, codes):
2499 sargs = channel._get_pyrocko_station_args()
2500 sargs_list, channels_list = by_nsl[channel.codes.nsl]
2501 sargs_list.append(sargs)
2502 channels_list.append(channel)
2504 pstations = []
2505 nsls = list(by_nsl.keys())
2506 nsls.sort()
2507 for nsl in nsls:
2508 sargs_list, channels_list = by_nsl[nsl]
2509 sargs = util.consistency_merge(
2510 [('',) + x for x in sargs_list])
2512 by_c = defaultdict(list)
2513 for ch in channels_list:
2514 by_c[ch.codes.channel].append(ch._get_pyrocko_channel_args())
2516 chas = list(by_c.keys())
2517 chas.sort()
2518 pchannels = []
2519 for cha in chas:
2520 list_of_cargs = by_c[cha]
2521 cargs = util.consistency_merge(
2522 [('',) + x for x in list_of_cargs])
2523 pchannels.append(pmodel.Channel(*cargs))
2525 pstations.append(
2526 pmodel.Station(*sargs, channels=pchannels))
2528 return pstations
    @property
    def pile(self):

        '''
        Emulates the older :py:class:`pyrocko.pile.Pile` interface.

        This property exposes a :py:class:`pyrocko.squirrel.pile.Pile` object,
        which emulates most of the older :py:class:`pyrocko.pile.Pile` methods
        but uses the fluffy power of the Squirrel under the hood.

        This interface can be used as a drop-in replacement for piles which are
        used in existing scripts and programs for efficient waveform data
        access. The Squirrel-based pile scales better for large datasets. Newer
        scripts should use Squirrel's native methods to avoid the emulation
        overhead.
        '''
        from . import pile

        # Lazily instantiate the pile emulation on first access and cache it
        # for the lifetime of this Squirrel instance.
        if self._pile is None:
            self._pile = pile.Pile(self)

        return self._pile
    def snuffle(self):
        '''
        Look at dataset in Snuffler.
        '''
        # Delegate to the pile emulation's snuffle method (opens the GUI).
        self.pile.snuffle()
2559 def _gather_codes_keys(self, kind, gather, selector):
2560 return set(
2561 gather(codes)
2562 for codes in self.iter_codes(kind)
2563 if selector is None or selector(codes))
    def __str__(self):
        # Human-readable summary of the archive contents, delegated to the
        # statistics object returned by get_stats().
        return str(self.get_stats())
    def get_coverage(
            self, kind, tmin=None, tmax=None, codes=None, limit=None):

        '''
        Get coverage information.

        Get information about strips of gapless data coverage.

        :param kind:
            Content kind to be queried.
        :type kind:
            str

        :param tmin:
            Start time of query interval.
        :type tmin:
            timestamp

        :param tmax:
            End time of query interval.
        :type tmax:
            timestamp

        :param codes:
            If given, restrict query to given content codes patterns.
        :type codes:
            :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
            objects appropriate for the queried content type, or anything which
            can be converted to such objects.

        :param limit:
            Limit query to return only up to a given maximum number of entries
            per matching time series (without setting this option, very gappy
            data could cause the query to execute for a very long time).
        :type limit:
            int

        :returns:
            Information about time spans covered by the requested time series
            data.
        :rtype:
            :py:class:`list` of :py:class:`Coverage` objects
        '''

        # Times are stored split into integer seconds plus float offset.
        tmin_seconds, tmin_offset = model.tsplit(tmin)
        tmax_seconds, tmax_offset = model.tsplit(tmax)
        kind_id = to_kind_id(kind)

        codes_info = list(self._iter_codes_info(kind=kind))

        # kdata_all entries: (pattern, kind_codes_id, codes_entry, deltat).
        kdata_all = []
        if codes is None:
            for _, codes_entry, deltat, kind_codes_id, _ in codes_info:
                kdata_all.append(
                    (codes_entry, kind_codes_id, codes_entry, deltat))

        else:
            for codes_entry in codes:
                pattern = to_codes(kind_id, codes_entry)
                for _, codes_entry, deltat, kind_codes_id, _ in codes_info:
                    if model.match_codes(pattern, codes_entry):
                        kdata_all.append(
                            (pattern, kind_codes_id, codes_entry, deltat))

        kind_codes_ids = [x[1] for x in kdata_all]

        # Number of nuts exactly at tmin, per (codes, deltat) — seeds the
        # running counts for coverage changes strictly after tmin.
        counts_at_tmin = {}
        if tmin is not None:
            for nut in self.iter_nuts(
                    kind, tmin, tmin, kind_codes_ids=kind_codes_ids):

                k = nut.codes, nut.deltat
                if k not in counts_at_tmin:
                    counts_at_tmin[k] = 0

                counts_at_tmin[k] += 1

        coverages = []
        for pattern, kind_codes_id, codes_entry, deltat in kdata_all:
            # entry layout: [pattern, codes, deltat, tmin, tmax, changes]
            entry = [pattern, codes_entry, deltat, None, None, []]

            # First/last coverage change time for this series (ASC/DESC).
            for i, order in [(0, 'ASC'), (1, 'DESC')]:
                sql = self._sql('''
                    SELECT
                        time_seconds,
                        time_offset
                    FROM %(db)s.%(coverage)s
                    WHERE
                        kind_codes_id == ?
                    ORDER BY
                        kind_codes_id ''' + order + ''',
                        time_seconds ''' + order + ''',
                        time_offset ''' + order + '''
                    LIMIT 1
                ''')

                for row in self._conn.execute(sql, [kind_codes_id]):
                    entry[3+i] = model.tjoin(row[0], row[1])

            # No coverage information at all for this series.
            if None in entry[3:5]:
                continue

            args = [kind_codes_id]

            sql_time = ''
            if tmin is not None:
                # intentionally < because (== tmin) is queried from nuts
                sql_time += ' AND ( ? < time_seconds ' \
                    'OR ( ? == time_seconds AND ? < time_offset ) ) '
                args.extend([tmin_seconds, tmin_seconds, tmin_offset])

            if tmax is not None:
                sql_time += ' AND ( time_seconds < ? ' \
                    'OR ( ? == time_seconds AND time_offset <= ? ) ) '
                args.extend([tmax_seconds, tmax_seconds, tmax_offset])

            sql_limit = ''
            if limit is not None:
                sql_limit = ' LIMIT ?'
                args.append(limit)

            sql = self._sql('''
                SELECT
                    time_seconds,
                    time_offset,
                    step
                FROM %(db)s.%(coverage)s
                WHERE
                    kind_codes_id == ?
                    ''' + sql_time + '''
                ORDER BY
                    kind_codes_id,
                    time_seconds,
                    time_offset
            ''' + sql_limit)

            rows = list(self._conn.execute(sql, args))

            if limit is not None and len(rows) == limit:
                # Too gappy: signal truncation by dropping the change list.
                entry[-1] = None
            else:
                # Accumulate running counts over coverage change events.
                counts = counts_at_tmin.get((codes_entry, deltat), 0)
                tlast = None
                if tmin is not None:
                    entry[-1].append((tmin, counts))
                    tlast = tmin

                for row in rows:
                    t = model.tjoin(row[0], row[1])
                    counts += row[2]
                    entry[-1].append((t, counts))
                    tlast = t

                if tmax is not None and (tlast is None or tlast != tmax):
                    entry[-1].append((tmax, counts))

            coverages.append(model.Coverage.from_values(entry + [kind_id]))

        return coverages
    def get_stationxml(
            self, obj=None, tmin=None, tmax=None, time=None, codes=None,
            level='response'):

        '''
        Get station/channel/response metadata in StationXML representation.

        %(query_args)s

        :param level:
            Detail to include: ``'network'``, ``'station'``, ``'channel'`` or
            ``'response'`` (default).
        :type level:
            str

        :returns:
            :py:class:`~pyrocko.io.stationxml.FDSNStationXML` object.

        :raises:
            :py:exc:`ValueError` on invalid ``level``;
            ``stationxml.Inconsistencies`` when overlapping epochs or
            mismatched channel/response time spans are detected.
        '''

        if level not in ('network', 'station', 'channel', 'response'):
            raise ValueError('Invalid level: %s' % level)

        tmin, tmax, codes = self._get_selection_args(
            CHANNEL, obj, tmin, tmax, time, codes)

        filtering = CodesPatternFiltering(codes=codes)

        # Unique (net, sta, loc, cha) tuples of all matching channels.
        nslcs = list(set(
            codes.nslc for codes in
            filtering.filter(self.get_codes(kind='channel'))))

        from pyrocko.io import stationxml as sx

        networks = []
        # Walk the code hierarchy: network -> station -> location -> channel.
        for net, stas in prefix_tree(nslcs):
            network = sx.Network(code=net)
            networks.append(network)

            if level not in ('station', 'channel', 'response'):
                continue

            for sta, locs in stas:
                stations = self.get_stations(
                    tmin=tmin,
                    tmax=tmax,
                    codes=(net, sta, '*'),
                    model='stationxml')

                # Overlapping station epochs would make the output ambiguous.
                errors = sx.check_overlaps(
                    'Station', (net, sta), stations)

                if errors:
                    raise sx.Inconsistencies(
                        'Inconsistencies found:\n  %s'
                        % '\n  '.join(errors))

                network.station_list.extend(stations)

                if level not in ('channel', 'response'):
                    continue

                for loc, chas in locs:
                    for cha, _ in chas:
                        channels = self.get_channels(
                            tmin=tmin,
                            tmax=tmax,
                            codes=(net, sta, loc, cha),
                            model='stationxml')

                        errors = sx.check_overlaps(
                            'Channel', (net, sta, loc, cha), channels)

                        if errors:
                            raise sx.Inconsistencies(
                                'Inconsistencies found:\n  %s'
                                % '\n  '.join(errors))

                        for channel in channels:
                            # Attach each channel epoch to the station epoch
                            # containing it.
                            station = sx.find_containing(stations, channel)
                            if station is not None:
                                station.channel_list.append(channel)
                            else:
                                raise sx.Inconsistencies(
                                    'No station or station epoch found for '
                                    'channel: %s' % '.'.join(
                                        (net, sta, loc, cha)))

                            if level != 'response':
                                continue

                            response_sq, response_sx = self.get_response(
                                codes=(net, sta, loc, cha),
                                tmin=channel.start_date,
                                tmax=channel.end_date,
                                model='stationxml+')

                            # Response epoch must cover the channel epoch
                            # exactly (open ends compare equal).
                            if not (
                                    sx.eq_open(
                                        channel.start_date, response_sq.tmin)
                                    and sx.eq_open(
                                        channel.end_date, response_sq.tmax)):

                                raise sx.Inconsistencies(
                                    'Response time span does not match '
                                    'channel time span: %s' % '.'.join(
                                        (net, sta, loc, cha)))

                            channel.response = response_sx

        return sx.FDSNStationXML(
            source='Generated by Pyrocko Squirrel.',
            network_list=networks)
    def add_operator(self, op):
        '''
        Attach an operator to this Squirrel instance.

        :param op:
            Operator to attach (e.g. a
            :py:class:`~pyrocko.squirrel.operators.base.Operator`).
        '''
        self._operators.append(op)
2837 def update_operator_mappings(self):
2838 available = self.get_codes(kind=('channel'))
2840 for operator in self._operators:
2841 operator.update_mappings(available, self._operator_registry)
2843 def iter_operator_mappings(self):
2844 for operator in self._operators:
2845 for in_codes, out_codes in operator.iter_mappings():
2846 yield operator, in_codes, out_codes
    def get_operator_mappings(self):
        '''
        Get list of all codes mappings of attached operators (see
        :py:meth:`iter_operator_mappings`).
        '''
        return list(self.iter_operator_mappings())
2851 def get_operator(self, codes):
2852 try:
2853 return self._operator_registry[codes][0]
2854 except KeyError:
2855 return None
2857 def get_operator_group(self, codes):
2858 try:
2859 return self._operator_registry[codes]
2860 except KeyError:
2861 return None, (None, None, None)
2863 def iter_operator_codes(self):
2864 for _, _, out_codes in self.iter_operator_mappings():
2865 for codes in out_codes:
2866 yield codes
    def get_operator_codes(self):
        '''
        Get list of all codes produced by attached operators (see
        :py:meth:`iter_operator_codes`).
        '''
        return list(self.iter_operator_codes())
2871 def print_tables(self, table_names=None, stream=None):
2872 '''
2873 Dump raw database tables in textual form (for debugging purposes).
2875 :param table_names:
2876 Names of tables to be dumped or ``None`` to dump all.
2877 :type table_names:
2878 :py:class:`list` of :py:class:`str`
2880 :param stream:
2881 Open file or ``None`` to dump to standard output.
2882 '''
2884 if stream is None:
2885 stream = sys.stdout
2887 if isinstance(table_names, str):
2888 table_names = [table_names]
2890 if table_names is None:
2891 table_names = [
2892 'selection_file_states',
2893 'selection_nuts',
2894 'selection_kind_codes_count',
2895 'files', 'nuts', 'kind_codes', 'kind_codes_count']
2897 m = {
2898 'selection_file_states': '%(db)s.%(file_states)s',
2899 'selection_nuts': '%(db)s.%(nuts)s',
2900 'selection_kind_codes_count': '%(db)s.%(kind_codes_count)s',
2901 'files': 'files',
2902 'nuts': 'nuts',
2903 'kind_codes': 'kind_codes',
2904 'kind_codes_count': 'kind_codes_count'}
2906 for table_name in table_names:
2907 self._database.print_table(
2908 m[table_name] % self._names, stream=stream)
class SquirrelStats(Object):
    '''
    Container to hold statistics about contents available from a Squirrel.

    See also :py:meth:`Squirrel.get_stats`.
    '''

    nfiles = Int.T(
        help='Number of files in selection.')
    nnuts = Int.T(
        help='Number of index nuts in selection.')
    codes = List.T(
        Tuple.T(content_t=String.T()),
        help='Available code sequences in selection, e.g. '
             '(agency, network, station, location) for stations nuts.')
    kinds = List.T(
        String.T(),
        help='Available content types in selection.')
    total_size = Int.T(
        help='Aggregated file size of files is selection.')
    counts = Dict.T(
        String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()),
        help='Breakdown of how many nuts of any content type and code '
             'sequence are available in selection, ``counts[kind][codes]``.')
    time_spans = Dict.T(
        String.T(), Tuple.T(content_t=Timestamp.T()),
        help='Time spans by content type.')
    sources = List.T(
        String.T(),
        help='Descriptions of attached sources.')
    operators = List.T(
        String.T(),
        help='Descriptions of attached operators.')

    def __str__(self):
        # Total nut count per content kind.
        kind_counts = dict(
            (kind, sum(self.counts[kind].values())) for kind in self.kinds)

        scodes = model.codes_to_str_abbreviated(self.codes)

        ssources = '<none>' if not self.sources else '\n' + '\n'.join(
            '  ' + s for s in self.sources)

        soperators = '<none>' if not self.operators else '\n' + '\n'.join(
            '  ' + s for s in self.operators)

        def stime(t):
            # Render a timestamp; global open-ended bounds show as '<none>'.
            return util.tts(t) if t is not None and t not in (
                model.g_tmin, model.g_tmax) else '<none>'

        def stable(rows):
            # Column-align rows of strings by padding to the widest cell.
            ns = [max(len(w) for w in col) for col in zip(*rows)]
            return '\n'.join(
                ' '.join(w.ljust(n) for n, w in zip(ns, row))
                for row in rows)

        def indent(s):
            return '\n'.join('  '+line for line in s.splitlines())

        # Per-kind table: name, count, time span start - end.
        stspans = '<none>' if not self.kinds else '\n' + indent(stable([(
            kind + ':',
            str(kind_counts[kind]),
            stime(self.time_spans[kind][0]),
            '-',
            stime(self.time_spans[kind][1])) for kind in sorted(self.kinds)]))

        s = '''
Number of files:               %i
Total size of known files:     %s
Number of index nuts:          %i
Available content kinds:       %s
Available codes:               %s
Sources:                       %s
Operators:                     %s''' % (
            self.nfiles,
            util.human_bytesize(self.total_size),
            self.nnuts,
            stspans, scodes, ssources, soperators)

        return s.lstrip()
2993__all__ = [
2994 'Squirrel',
2995 'SquirrelStats',
2996]