1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import sys
9import os
11import math
12import logging
13import threading
14import queue
15from collections import defaultdict
17from pyrocko.guts import Object, Int, List, Tuple, String, Timestamp, Dict
18from pyrocko import util, trace
19from pyrocko.progress import progress
21from . import model, io, cache, dataset
23from .model import to_kind_id, WaveformOrder, to_kind, to_codes, \
24 STATION, CHANNEL, RESPONSE, EVENT, WAVEFORM, codes_patterns_list, \
25 codes_patterns_for_kind
26from .client import fdsn, catalog
27from .selection import Selection, filldocs
28from .database import abspath
29from .operators.base import Operator, CodesPatternFiltering
30from . import client, environment, error
# Module-level logger for the Squirrel framework base machinery.
logger = logging.getLogger('psq.base')

# Prefix under which classes of this module are registered with Guts.
guts_prefix = 'squirrel'
def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.
    '''
    return progress.task(*args, logger=logger)
def lpick(condition, seq):
    '''
    Partition *seq* into two lists by *condition*.

    :returns:
        Tuple ``(rejected, accepted)`` where ``accepted`` holds the items
        for which ``condition(item)`` is true and ``rejected`` the others.
    '''
    rejected = []
    accepted = []
    for item in seq:
        if condition(item):
            accepted.append(item)
        else:
            rejected.append(item)

    return rejected, accepted
def blocks(tmin, tmax, deltat, nsamples_block=100000):
    '''
    Yield regular time blocks covering the span ``[tmin, tmax]``.

    Block boundaries are aligned to integer multiples of the block length
    ``deltat * nsamples_block``.
    '''
    tblock = util.to_time_float(deltat * nsamples_block)
    iblock_first = int(math.floor(tmin / tblock))
    iblock_last = int(math.ceil(tmax / tblock))
    for iblock in range(iblock_first, iblock_last):
        yield iblock * tblock, (iblock+1) * tblock
def gaps(avail, tmin, tmax):
    '''
    Compute sub-intervals of ``[tmin, tmax]`` not covered by *avail*.

    :param avail:
        Available intervals as ``(tmin_a, tmax_a)`` pairs, each non-empty.

    :returns:
        List of ``(tmin_g, tmax_g)`` gap intervals.
    '''
    assert tmin < tmax

    # Sweep-line: +1 opens coverage, -1 closes it. The query span itself
    # contributes an open at tmin and a close at tmax so that uncovered
    # parts at the edges are reported as gaps, too.
    events = [(tmax, 1), (tmin, -1)]
    for tmin_a, tmax_a in avail:
        assert tmin_a < tmax_a
        events.append((tmin_a, 1))
        events.append((tmax_a, -1))

    events.sort()

    uncovered = []
    level = 1
    tmin_gap = None
    for t, step in events:
        if level == 1 and step == -1:
            # coverage drops to zero here: a gap starts
            tmin_gap = t
        elif level == 0 and step == 1 and tmin_gap is not None:
            # coverage resumes: close the gap, ignoring empty ones
            if tmin_gap != t:
                uncovered.append((tmin_gap, t))

        level += step

    return uncovered
def order_key(order):
    '''
    Key function for sorting/grouping waveform orders by codes and span.
    '''
    return order.codes, order.tmin, order.tmax
87def _is_exact(pat):
88 return not ('*' in pat or '?' in pat or ']' in pat or '[' in pat)
def prefix_tree(tups):
    '''
    Convert a list of equal-length tuples into a sorted prefix tree.

    Each node is a ``(value, children)`` pair, children being nodes of the
    same structure (empty list at the leaves).
    '''
    if not tups:
        return []

    # Leaf level: single-element tuples become terminal nodes.
    if len(tups[0]) == 1:
        return sorted((tup[0], []) for tup in tups)

    grouped = defaultdict(list)
    for tup in tups:
        grouped[tup[0]].append(tup[1:])

    return [(head, prefix_tree(grouped[head])) for head in sorted(grouped)]
class Batch(object):
    '''
    Batch of waveforms from window-wise data extraction.

    Encapsulates state and results yielded for each window in window-wise
    waveform extraction with the :py:meth:`Squirrel.chopper_waveforms` method.

    *Attributes:*

    .. py:attribute:: tmin

        Start of this time window.

    .. py:attribute:: tmax

        End of this time window.

    .. py:attribute:: i

        Index of this time window in sequence.

    .. py:attribute:: n

        Total number of time windows in sequence.

    .. py:attribute:: traces

        Extracted waveforms for this time window.
    '''

    def __init__(self, tmin, tmax, i, n, traces):
        self.tmin, self.tmax = tmin, tmax
        self.i, self.n = i, n
        self.traces = traces
147class Squirrel(Selection):
148 '''
149 Prompt, lazy, indexing, caching, dynamic seismological dataset access.
151 :param env:
152 Squirrel environment instance or directory path to use as starting
153 point for its detection. By default, the current directory is used as
154 starting point. When searching for a usable environment the directory
155 ``'.squirrel'`` or ``'squirrel'`` in the current (or starting point)
156 directory is used if it exists, otherwise the parent directories are
        searched upwards for the existence of such a directory. If no such
158 directory is found, the user's global Squirrel environment
159 ``'$HOME/.pyrocko/squirrel'`` is used.
160 :type env:
161 :py:class:`~pyrocko.squirrel.environment.Environment` or
162 :py:class:`str`
164 :param database:
165 Database instance or path to database. By default the
166 database found in the detected Squirrel environment is used.
167 :type database:
168 :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`
170 :param cache_path:
171 Directory path to use for data caching. By default, the ``'cache'``
172 directory in the detected Squirrel environment is used.
173 :type cache_path:
174 :py:class:`str`
176 :param persistent:
177 If given a name, create a persistent selection.
178 :type persistent:
179 :py:class:`str`
181 This is the central class of the Squirrel framework. It provides a unified
182 interface to query and access seismic waveforms, station meta-data and
183 event information from local file collections and remote data sources. For
184 prompt responses, a profound database setup is used under the hood. To
185 speed up assemblage of ad-hoc data selections, files are indexed on first
186 use and the extracted meta-data is remembered in the database for
187 subsequent accesses. Bulk data is lazily loaded from disk and remote
188 sources, just when requested. Once loaded, data is cached in memory to
189 expedite typical access patterns. Files and data sources can be dynamically
190 added to and removed from the Squirrel selection at runtime.
192 Queries are restricted to the contents of the files currently added to the
193 Squirrel selection (usually a subset of the file meta-information
194 collection in the database). This list of files is referred to here as the
195 "selection". By default, temporary tables are created in the attached
196 database to hold the names of the files in the selection as well as various
197 indices and counters. These tables are only visible inside the application
198 which created them and are deleted when the database connection is closed
199 or the application exits. To create a selection which is not deleted at
200 exit, supply a name to the ``persistent`` argument of the Squirrel
201 constructor. Persistent selections are shared among applications using the
202 same database.
204 **Method summary**
206 Some of the methods are implemented in :py:class:`Squirrel`'s base class
207 :py:class:`~pyrocko.squirrel.selection.Selection`.
209 .. autosummary::
211 ~Squirrel.add
212 ~Squirrel.add_source
213 ~Squirrel.add_fdsn
214 ~Squirrel.add_catalog
215 ~Squirrel.add_dataset
216 ~Squirrel.add_virtual
217 ~Squirrel.update
218 ~Squirrel.update_waveform_promises
219 ~Squirrel.advance_accessor
220 ~Squirrel.clear_accessor
221 ~Squirrel.reload
222 ~pyrocko.squirrel.selection.Selection.iter_paths
223 ~Squirrel.iter_nuts
224 ~Squirrel.iter_kinds
225 ~Squirrel.iter_deltats
226 ~Squirrel.iter_codes
227 ~pyrocko.squirrel.selection.Selection.get_paths
228 ~Squirrel.get_nuts
229 ~Squirrel.get_kinds
230 ~Squirrel.get_deltats
231 ~Squirrel.get_codes
232 ~Squirrel.get_counts
233 ~Squirrel.get_time_span
234 ~Squirrel.get_deltat_span
235 ~Squirrel.get_nfiles
236 ~Squirrel.get_nnuts
237 ~Squirrel.get_total_size
238 ~Squirrel.get_stats
239 ~Squirrel.get_content
240 ~Squirrel.get_stations
241 ~Squirrel.get_channels
242 ~Squirrel.get_responses
243 ~Squirrel.get_events
244 ~Squirrel.get_waveform_nuts
245 ~Squirrel.get_waveforms
246 ~Squirrel.chopper_waveforms
247 ~Squirrel.get_coverage
248 ~Squirrel.pile
249 ~Squirrel.snuffle
250 ~Squirrel.glob_codes
251 ~pyrocko.squirrel.selection.Selection.get_database
252 ~Squirrel.print_tables
253 '''
    def __init__(
            self, env=None, database=None, cache_path=None, persistent=None):

        # Resolve environment first; database and cache locations default
        # to the paths configured in it.
        if not isinstance(env, environment.Environment):
            env = environment.get_environment(env)

        if database is None:
            database = env.expand_path(env.database_path)

        if cache_path is None:
            cache_path = env.expand_path(env.cache_path)

        if persistent is None:
            persistent = env.persistent

        Selection.__init__(
            self, database=database, persistent=persistent)

        self.get_database().set_basepath(os.path.dirname(env.get_basepath()))

        # Separate in-memory caches so waveform content can be managed
        # independently of other content kinds.
        self._content_caches = {
            'waveform': cache.ContentCache(),
            'default': cache.ContentCache()}

        self._cache_path = cache_path

        self._sources = []          # remote data access clients
        self._operators = []
        self._operator_registry = {}

        self._pile = None           # lazily created pile emulation
        self._n_choppers_active = 0

        # Names of the selection-specific database tables/triggers.
        self._names.update({
            'nuts': self.name + '_nuts',
            'kind_codes_count': self.name + '_kind_codes_count',
            'coverage': self.name + '_coverage'})

        with self.transaction('create tables') as cursor:
            self._create_tables_squirrel(cursor)
    def _create_tables_squirrel(self, cursor):
        '''
        Create selection-specific tables, indices and triggers.

        Sets up the ``nuts`` content-index table, the ``kind_codes_count``
        statistics table and the ``coverage`` change-point table. The
        auxiliary tables are kept consistent through SQL triggers which fire
        on inserts/deletes of nuts and on file removals/updates.
        '''

        # Content index for the files in this selection. Time values are
        # split into integer seconds and offset parts.
        cursor.execute(self._register_table(self._sql(
            '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(nuts)s (
                    nut_id integer PRIMARY KEY,
                    file_id integer,
                    file_segment integer,
                    file_element integer,
                    kind_id integer,
                    kind_codes_id integer,
                    tmin_seconds integer,
                    tmin_offset integer,
                    tmax_seconds integer,
                    tmax_offset integer,
                    kscale integer)
            ''')))

        # Number of nuts per kind/codes combination, maintained by the
        # inc/dec triggers below.
        cursor.execute(self._register_table(self._sql(
            '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(kind_codes_count)s (
                    kind_codes_id integer PRIMARY KEY,
                    count integer)
            ''')))

        cursor.execute(self._sql(
            '''
                CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(nuts)s_file_element
                    ON %(nuts)s (file_id, file_segment, file_element)
            '''))

        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_file_id
                    ON %(nuts)s (file_id)
            '''))

        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmin_seconds
                    ON %(nuts)s (kind_id, tmin_seconds)
            '''))

        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmax_seconds
                    ON %(nuts)s (kind_id, tmax_seconds)
            '''))

        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_kscale
                    ON %(nuts)s (kind_id, kscale, tmin_seconds)
            '''))

        # Remove nuts when their file is deleted from the main files table.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts
                BEFORE DELETE ON main.files FOR EACH ROW
                BEGIN
                  DELETE FROM %(nuts)s WHERE file_id == old.file_id;
                END
            '''))

        # trigger only on size to make silent update of mtime possible
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts2
                BEFORE UPDATE OF size ON main.files FOR EACH ROW
                BEGIN
                  DELETE FROM %(nuts)s WHERE file_id == old.file_id;
                END
            '''))

        # Remove nuts when a file leaves this selection.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS
                    %(db)s.%(file_states)s_delete_files
                BEFORE DELETE ON %(db)s.%(file_states)s FOR EACH ROW
                BEGIN
                    DELETE FROM %(nuts)s WHERE file_id == old.file_id;
                END
            '''))

        # Keep kind_codes_count in sync when nuts are added/removed.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_inc_kind_codes
                BEFORE INSERT ON %(nuts)s FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO %(kind_codes_count)s VALUES
                    (new.kind_codes_id, 0);
                    UPDATE %(kind_codes_count)s
                    SET count = count + 1
                    WHERE new.kind_codes_id
                        == %(kind_codes_count)s.kind_codes_id;
                END
            '''))

        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_dec_kind_codes
                BEFORE DELETE ON %(nuts)s FOR EACH ROW
                BEGIN
                    UPDATE %(kind_codes_count)s
                    SET count = count - 1
                    WHERE old.kind_codes_id
                        == %(kind_codes_count)s.kind_codes_id;
                END
            '''))

        # Coverage change-points: ``step`` accumulates +1 at content start
        # times and -1 at content end times per kind/codes combination.
        cursor.execute(self._register_table(self._sql(
            '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(coverage)s (
                    kind_codes_id integer,
                    time_seconds integer,
                    time_offset integer,
                    step integer)
            ''')))

        cursor.execute(self._sql(
            '''
                CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(coverage)s_time
                    ON %(coverage)s (kind_codes_id, time_seconds, time_offset)
            '''))

        # On nut insertion: increment step at tmin, decrement at tmax, and
        # drop change-point rows which net out to zero.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_add_coverage
                AFTER INSERT ON %(nuts)s FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (new.kind_codes_id, new.tmin_seconds, new.tmin_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step + 1
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmin_seconds == %(coverage)s.time_seconds
                        AND new.tmin_offset == %(coverage)s.time_offset
                    ;
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (new.kind_codes_id, new.tmax_seconds, new.tmax_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step - 1
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmax_seconds == %(coverage)s.time_seconds
                        AND new.tmax_offset == %(coverage)s.time_offset
                    ;
                    DELETE FROM %(coverage)s
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmin_seconds == %(coverage)s.time_seconds
                        AND new.tmin_offset == %(coverage)s.time_offset
                        AND step == 0
                    ;
                    DELETE FROM %(coverage)s
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmax_seconds == %(coverage)s.time_seconds
                        AND new.tmax_offset == %(coverage)s.time_offset
                        AND step == 0
                    ;
                END
            '''))

        # Inverse bookkeeping when nuts are removed.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_remove_coverage
                BEFORE DELETE ON %(nuts)s FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (old.kind_codes_id, old.tmin_seconds, old.tmin_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step - 1
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmin_seconds == %(coverage)s.time_seconds
                        AND old.tmin_offset == %(coverage)s.time_offset
                    ;
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (old.kind_codes_id, old.tmax_seconds, old.tmax_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step + 1
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmax_seconds == %(coverage)s.time_seconds
                        AND old.tmax_offset == %(coverage)s.time_offset
                    ;
                    DELETE FROM %(coverage)s
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmin_seconds == %(coverage)s.time_seconds
                        AND old.tmin_offset == %(coverage)s.time_offset
                        AND step == 0
                    ;
                    DELETE FROM %(coverage)s
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmax_seconds == %(coverage)s.time_seconds
                        AND old.tmax_offset == %(coverage)s.time_offset
                        AND step == 0
                    ;
                END
            '''))
    def _delete(self):
        '''Delete database tables associated with this Squirrel.'''

        # Drop triggers first, then the tables they reference. The coverage
        # objects use IF EXISTS because older selections may predate them.
        with self.transaction('delete tables') as cursor:
            for s in '''
                    DROP TRIGGER %(db)s.%(nuts)s_delete_nuts;
                    DROP TRIGGER %(db)s.%(nuts)s_delete_nuts2;
                    DROP TRIGGER %(db)s.%(file_states)s_delete_files;
                    DROP TRIGGER %(db)s.%(nuts)s_inc_kind_codes;
                    DROP TRIGGER %(db)s.%(nuts)s_dec_kind_codes;
                    DROP TABLE %(db)s.%(nuts)s;
                    DROP TABLE %(db)s.%(kind_codes_count)s;
                    DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_add_coverage;
                    DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_remove_coverage;
                    DROP TABLE IF EXISTS %(db)s.%(coverage)s;
                    '''.strip().splitlines():

                cursor.execute(self._sql(s))

        Selection._delete(self)
    @filldocs
    def add(self,
            paths,
            kinds=None,
            format='detect',
            include=None,
            exclude=None,
            check=True):

        '''
        Add files to the selection.

        :param paths:
            Iterator yielding paths to files or directories to be added to the
            selection. Recurses into directories. If given a ``str``, it
            is treated as a single path to be added.
        :type paths:
            :py:class:`list` of :py:class:`str`

        :param kinds:
            Content types to be made available through the Squirrel selection.
            By default, all known content types are accepted.
        :type kinds:
            :py:class:`list` of :py:class:`str`

        :param format:
            File format identifier or ``'detect'`` to enable auto-detection
            (available: %(file_formats)s).
        :type format:
            str

        :param include:
            If not ``None``, files are only included if their paths match the
            given regular expression pattern.
        :type include:
            str

        :param exclude:
            If not ``None``, files are only included if their paths do not
            match the given regular expression pattern.
        :type exclude:
            str

        :param check:
            If ``True``, all file modification times are checked to see if
            cached information has to be updated (slow). If ``False``, only
            previously unknown files are indexed and cached information is used
            for known files, regardless of file state (fast, corresponds to
            Squirrel's ``--optimistic`` mode). File deletions will go
            undetected in the latter case.
        :type check:
            bool

        :Complexity:
            O(log N)
        '''

        if isinstance(kinds, str):
            kinds = (kinds,)

        if isinstance(paths, str):
            paths = [paths]

        kind_mask = model.to_kind_mask(kinds)

        # File discovery progress is silenced here (show_progress=False);
        # overall progress is reported through the progress view instead.
        # Paths starting with 'virtual:' are passed through unmodified.
        with progress.view():
            Selection.add(
                self, util.iter_select_files(
                    paths,
                    show_progress=False,
                    include=include,
                    exclude=exclude,
                    pass_through=lambda path: path.startswith('virtual:')
                ), kind_mask, format)

        self._load(check)
        self._update_nuts()
    def reload(self):
        '''
        Check for modifications and reindex modified files.

        Based on file modification times.
        '''

        # Force re-checking of all files in the selection, then re-run the
        # indexing pass with modification-time checking enabled.
        self._set_file_states_force_check()
        self._load(check=True)
        self._update_nuts()
607 def add_virtual(self, nuts, virtual_paths=None):
608 '''
609 Add content which is not backed by files.
611 :param nuts:
612 Content pieces to be added.
613 :type nuts:
614 iterator yielding :py:class:`~pyrocko.squirrel.model.Nut` objects
616 :param virtual_paths:
617 List of virtual paths to prevent creating a temporary list of the
618 nuts while aggregating the file paths for the selection.
619 :type virtual_paths:
620 :py:class:`list` of :py:class:`str`
622 Stores to the main database and the selection.
623 '''
625 if isinstance(virtual_paths, str):
626 virtual_paths = [virtual_paths]
628 if virtual_paths is None:
629 if not isinstance(nuts, list):
630 nuts = list(nuts)
631 virtual_paths = set(nut.file_path for nut in nuts)
633 Selection.add(self, virtual_paths)
634 self.get_database().dig(nuts)
635 self._update_nuts()
637 def add_volatile(self, nuts):
638 if not isinstance(nuts, list):
639 nuts = list(nuts)
641 paths = list(set(nut.file_path for nut in nuts))
642 io.backends.virtual.add_nuts(nuts)
643 self.add_virtual(nuts, paths)
644 self._volatile_paths.extend(paths)
    def add_volatile_waveforms(self, traces):
        '''
        Add in-memory waveforms which will be removed when the app closes.

        :param traces:
            Waveform objects to be added (presumably
            :py:class:`~pyrocko.trace.Trace` objects — they must provide
            ``tmin``, ``tmax``, ``deltat``, ``codes`` and ``data_len()``).

        :returns:
            Virtual path under which the waveforms were registered.
        '''

        name = model.random_name()

        path = 'virtual:volatile:%s' % name

        nuts = []
        for itr, tr in enumerate(traces):
            assert tr.tmin <= tr.tmax
            tmin_seconds, tmin_offset = model.tsplit(tr.tmin)
            # End time is recomputed from the sample count, not taken from
            # tr.tmax directly.
            tmax_seconds, tmax_offset = model.tsplit(
                tr.tmin + tr.data_len()*tr.deltat)

            nuts.append(model.Nut(
                file_path=path,
                file_format='virtual',
                file_segment=itr,
                file_element=0,
                file_mtime=0,
                codes=tr.codes,
                tmin_seconds=tmin_seconds,
                tmin_offset=tmin_offset,
                tmax_seconds=tmax_seconds,
                tmax_offset=tmax_offset,
                deltat=tr.deltat,
                kind_id=to_kind_id('waveform'),
                content=tr))

        self.add_volatile(nuts)
        return path
680 def _load(self, check):
681 for _ in io.iload(
682 self,
683 content=[],
684 skip_unchanged=True,
685 check=check):
686 pass
    def _update_nuts(self, transaction=None):
        '''
        Copy index entries of newly added files into the selection's
        nuts table, honoring each file's kind mask.
        '''
        transaction = transaction or self.transaction('update nuts')
        with make_task('Aggregating selection') as task, \
                transaction as cursor:

            # Report SQL progress through the task while the bulk insert
            # runs; handler is removed again below.
            self._conn.set_progress_handler(task.update, 100000)
            nrows = cursor.execute(self._sql(
                '''
                    INSERT INTO %(db)s.%(nuts)s
                    SELECT NULL,
                        nuts.file_id, nuts.file_segment, nuts.file_element,
                        nuts.kind_id, nuts.kind_codes_id,
                        nuts.tmin_seconds, nuts.tmin_offset,
                        nuts.tmax_seconds, nuts.tmax_offset,
                        nuts.kscale
                    FROM %(db)s.%(file_states)s
                    INNER JOIN nuts
                        ON %(db)s.%(file_states)s.file_id == nuts.file_id
                    INNER JOIN kind_codes
                        ON nuts.kind_codes_id ==
                           kind_codes.kind_codes_id
                    WHERE %(db)s.%(file_states)s.file_state != 2
                        AND (((1 << kind_codes.kind_id)
                            & %(db)s.%(file_states)s.kind_mask) != 0)
                ''')).rowcount

            task.update(nrows)
            self._set_file_states_known(transaction)
            self._conn.set_progress_handler(None, 0)
    def add_source(self, source, check=True):
        '''
        Add remote resource.

        :param source:
            Remote data access client instance.
        :type source:
            subclass of :py:class:`~pyrocko.squirrel.client.base.Source`

        :param check:
            Passed on to the source's ``setup`` method.
        :type check:
            bool
        '''

        self._sources.append(source)
        source.setup(self, check=check)
    def add_fdsn(self, *args, **kwargs):
        '''
        Add FDSN site for transparent remote data access.

        Arguments are passed to
        :py:class:`~pyrocko.squirrel.client.fdsn.FDSNSource`. The created
        source is registered via :py:meth:`add_source`.
        '''

        self.add_source(fdsn.FDSNSource(*args, **kwargs))
    def add_catalog(self, *args, **kwargs):
        '''
        Add online catalog for transparent event data access.

        Arguments are passed to
        :py:class:`~pyrocko.squirrel.client.catalog.CatalogSource`. The
        created source is registered via :py:meth:`add_source`.
        '''

        self.add_source(catalog.CatalogSource(*args, **kwargs))
    def add_dataset(self, ds, check=True):
        '''
        Read dataset description from file and add its contents.

        :param ds:
            Path to dataset description file or dataset description object
            . See :py:mod:`~pyrocko.squirrel.dataset`.
        :type ds:
            :py:class:`str` or :py:class:`~pyrocko.squirrel.dataset.Dataset`

        :param check:
            If ``True``, all file modification times are checked to see if
            cached information has to be updated (slow). If ``False``, only
            previously unknown files are indexed and cached information is used
            for known files, regardless of file state (fast, corresponds to
            Squirrel's ``--optimistic`` mode). File deletions will go
            undetected in the latter case.
        :type check:
            bool
        '''
        if isinstance(ds, str):
            ds = dataset.read_dataset(ds)

        ds.setup(self, check=check)
776 def _get_selection_args(
777 self, kind_id,
778 obj=None, tmin=None, tmax=None, time=None, codes=None):
780 if codes is not None:
781 codes = codes_patterns_for_kind(kind_id, codes)
783 if time is not None:
784 tmin = time
785 tmax = time
787 if obj is not None:
788 tmin = tmin if tmin is not None else obj.tmin
789 tmax = tmax if tmax is not None else obj.tmax
790 codes = codes if codes is not None else codes_patterns_for_kind(
791 kind_id, obj.codes)
793 return tmin, tmax, codes
795 def _get_selection_args_str(self, *args, **kwargs):
797 tmin, tmax, codes = self._get_selection_args(*args, **kwargs)
798 return 'tmin: %s, tmax: %s, codes: %s' % (
799 util.time_to_str(tmin) if tmin is not None else 'none',
800 util.time_to_str(tmax) if tmin is not None else 'none',
801 ','.join(str(entry) for entry in codes))
803 def _selection_args_to_kwargs(
804 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
806 return dict(obj=obj, tmin=tmin, tmax=tmax, time=time, codes=codes)
    def _timerange_sql(self, tmin, tmax, kind, cond, args, naiv):
        '''
        Append SQL conditions/arguments for a time-range query on nuts.

        Conditions are appended to *cond*, their placeholder values to
        *args* (order-sensitive). With ``naiv=True`` the kscale-based index
        optimization is bypassed.
        '''

        tmin_seconds, tmin_offset = model.tsplit(tmin)
        tmax_seconds, tmax_offset = model.tsplit(tmax)
        if naiv:
            cond.append('%(db)s.%(nuts)s.tmin_seconds <= ?')
            args.append(tmax_seconds)
        else:
            # Content durations are bucketed by kscale; for each bucket,
            # tmin of a matching nut can lie at most ``tscale`` before the
            # query interval, which keeps the BETWEEN ranges index-friendly.
            tscale_edges = model.tscale_edges
            tmin_cond = []
            for kscale in range(tscale_edges.size + 1):
                if kscale != tscale_edges.size:
                    tscale = int(tscale_edges[kscale])
                    tmin_cond.append('''
                        (%(db)s.%(nuts)s.kind_id = ?
                         AND %(db)s.%(nuts)s.kscale == ?
                         AND %(db)s.%(nuts)s.tmin_seconds BETWEEN ? AND ?)
                    ''')
                    args.extend(
                        (to_kind_id(kind), kscale,
                         tmin_seconds - tscale - 1, tmax_seconds + 1))

                else:
                    # Last bucket is unbounded in duration: no lower limit
                    # on tmin_seconds.
                    tmin_cond.append('''
                        (%(db)s.%(nuts)s.kind_id == ?
                         AND %(db)s.%(nuts)s.kscale == ?
                         AND %(db)s.%(nuts)s.tmin_seconds <= ?)
                    ''')

                    args.extend(
                        (to_kind_id(kind), kscale, tmax_seconds + 1))
            if tmin_cond:
                cond.append(' ( ' + ' OR '.join(tmin_cond) + ' ) ')

        cond.append('%(db)s.%(nuts)s.tmax_seconds >= ?')
        args.append(tmin_seconds)
    def _codes_match_sql(self, kind_id, codes, cond, args):
        '''
        Append SQL conditions/arguments matching the given codes patterns.

        Exact patterns are matched with an efficient ``IN`` expression,
        wildcard patterns with ``GLOB``. Appends to *cond* and *args*.
        '''
        pats = codes_patterns_for_kind(kind_id, codes)
        if pats is None:
            return

        pats_exact = []
        pats_nonexact = []
        for pat in pats:
            spat = pat.safe_str
            (pats_exact if _is_exact(spat) else pats_nonexact).append(spat)

        cond_exact = None
        if pats_exact:
            cond_exact = ' ( kind_codes.codes IN ( %s ) ) ' % ', '.join(
                '?'*len(pats_exact))

            args.extend(pats_exact)

        cond_nonexact = None
        if pats_nonexact:
            cond_nonexact = ' ( %s ) ' % ' OR '.join(
                ('kind_codes.codes GLOB ?',) * len(pats_nonexact))

            args.extend(pats_nonexact)

        if cond_exact and cond_nonexact:
            cond.append(' ( %s OR %s ) ' % (cond_exact, cond_nonexact))

        elif cond_exact:
            cond.append(cond_exact)

        elif cond_nonexact:
            cond.append(cond_nonexact)
    def iter_nuts(
            self, kind=None, tmin=None, tmax=None, codes=None, naiv=False,
            kind_codes_ids=None, path=None):

        '''
        Iterate over content entities matching given constraints.

        :param kind:
            Content kind (or kinds) to extract.
        :type kind:
            :py:class:`str`, :py:class:`list` of :py:class:`str`

        :param tmin:
            Start time of query interval.
        :type tmin:
            timestamp

        :param tmax:
            End time of query interval.
        :type tmax:
            timestamp

        :param codes:
            List of code patterns to query.
        :type codes:
            :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
            objects appropriate for the queried content type, or anything which
            can be converted to such objects.

        :param naiv:
            Bypass time span lookup through indices (slow, for testing).
        :type naiv:
            :py:class:`bool`

        :param kind_codes_ids:
            Kind-codes IDs of contents to be retrieved (internal use).
        :type kind_codes_ids:
            :py:class:`list` of :py:class:`int`

        :yields:
            :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
            intersecting content.

        :complexity:
            O(log N) for the time selection part due to heavy use of database
            indices.

        Query time span is treated as a half-open interval ``[tmin, tmax)``.
        However, if ``tmin`` equals ``tmax``, the edge logics are modified to
        closed-interval so that content intersecting with the time instant ``t
        = tmin = tmax`` is returned (otherwise nothing would be returned as
        ``[t, t)`` never matches anything).

        Time spans of content entities to be matched are also treated as half
        open intervals, e.g. content span ``[0, 1)`` is matched by query span
        ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``. Also here, logics are
        modified to closed-interval when the content time span is an empty
        interval, i.e. to indicate a time instant. E.g. time instant 0 is
        matched by ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``.
        '''

        if not isinstance(kind, str):
            # Multiple kinds (or all known kinds): recurse once per kind.
            if kind is None:
                kind = model.g_content_kinds
            for kind_ in kind:
                for nut in self.iter_nuts(kind_, tmin, tmax, codes):
                    yield nut

            return

        kind_id = to_kind_id(kind)

        cond = []
        args = []
        if tmin is not None or tmax is not None:
            assert kind is not None
            if tmin is None:
                tmin = self.get_time_span()[0]
            if tmax is None:
                tmax = self.get_time_span()[1] + 1.0

            self._timerange_sql(tmin, tmax, kind, cond, args, naiv)

        cond.append('kind_codes.kind_id == ?')
        args.append(kind_id)

        if codes is not None:
            self._codes_match_sql(kind_id, codes, cond, args)

        if kind_codes_ids is not None:
            cond.append(
                ' ( kind_codes.kind_codes_id IN ( %s ) ) ' % ', '.join(
                    '?'*len(kind_codes_ids)))

            args.extend(kind_codes_ids)

        db = self.get_database()
        if path is not None:
            cond.append('files.path == ?')
            args.append(db.relpath(abspath(path)))

        sql = ('''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                %(db)s.%(nuts)s.file_segment,
                %(db)s.%(nuts)s.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                %(db)s.%(nuts)s.tmin_seconds,
                %(db)s.%(nuts)s.tmin_offset,
                %(db)s.%(nuts)s.tmax_seconds,
                %(db)s.%(nuts)s.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN %(db)s.%(nuts)s
                ON files.file_id == %(db)s.%(nuts)s.file_id
            INNER JOIN kind_codes
                ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id
            ''')

        if cond:
            sql += ''' WHERE ''' + ' AND '.join(cond)

        sql = self._sql(sql)
        if tmin is None and tmax is None:
            for row in self._conn.execute(sql, args):
                # stored paths are relative to the database's basepath
                row = (db.abspath(row[0]),) + row[1:]
                nut = model.Nut(values_nocheck=row)
                yield nut
        else:
            assert tmin is not None and tmax is not None
            if tmin == tmax:
                # Time-instant query: closed-interval edge logics (see
                # docstring). The DB query over-selects; filter here.
                for row in self._conn.execute(sql, args):
                    row = (db.abspath(row[0]),) + row[1:]
                    nut = model.Nut(values_nocheck=row)
                    if (nut.tmin <= tmin < nut.tmax) \
                            or (nut.tmin == nut.tmax and tmin == nut.tmin):

                        yield nut
            else:
                # Half-open interval intersection, with special handling of
                # empty content spans (time instants).
                for row in self._conn.execute(sql, args):
                    row = (db.abspath(row[0]),) + row[1:]
                    nut = model.Nut(values_nocheck=row)
                    if (tmin < nut.tmax and nut.tmin < tmax) \
                            or (nut.tmin == nut.tmax
                                and tmin <= nut.tmin < tmax):

                        yield nut
    def get_nuts(self, *args, **kwargs):
        '''
        Get content entities matching given constraints.

        Like :py:meth:`iter_nuts` but returns results as a list.
        '''

        return list(self.iter_nuts(*args, **kwargs))
    def _split_nuts(
            self, kind, tmin=None, tmax=None, codes=None, path=None):
        '''
        Cut out the given time interval from matching nuts.

        Nuts overlapping ``[tmin, tmax]`` are deleted; any parts of them
        sticking out of the interval are re-inserted as new nuts. Applied to
        both the selection's nuts table and the main database's.
        '''

        kind_id = to_kind_id(kind)
        tmin_seconds, tmin_offset = model.tsplit(tmin)
        tmax_seconds, tmax_offset = model.tsplit(tmax)

        # Substitution names pointing at the main database's nuts table.
        names_main_nuts = dict(self._names)
        names_main_nuts.update(db='main', nuts='nuts')

        db = self.get_database()

        def main_nuts(s):
            return s % names_main_nuts

        with self.transaction('split nuts') as cursor:
            # modify selection and main
            for sql_subst in [
                    self._sql, main_nuts]:

                cond = []
                args = []

                self._timerange_sql(tmin, tmax, kind, cond, args, False)

                if codes is not None:
                    self._codes_match_sql(kind_id, codes, cond, args)

                if path is not None:
                    cond.append('files.path == ?')
                    args.append(db.relpath(abspath(path)))

                sql = sql_subst('''
                    SELECT
                        %(db)s.%(nuts)s.nut_id,
                        %(db)s.%(nuts)s.tmin_seconds,
                        %(db)s.%(nuts)s.tmin_offset,
                        %(db)s.%(nuts)s.tmax_seconds,
                        %(db)s.%(nuts)s.tmax_offset,
                        kind_codes.deltat
                    FROM files
                    INNER JOIN %(db)s.%(nuts)s
                        ON files.file_id == %(db)s.%(nuts)s.file_id
                    INNER JOIN kind_codes
                        ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id
                    WHERE ''' + ' AND '.join(cond))  # noqa

                insert = []
                delete = []
                for row in cursor.execute(sql, args):
                    nut_id, nut_tmin_seconds, nut_tmin_offset, \
                        nut_tmax_seconds, nut_tmax_offset, nut_deltat = row

                    nut_tmin = model.tjoin(
                        nut_tmin_seconds, nut_tmin_offset)
                    nut_tmax = model.tjoin(
                        nut_tmax_seconds, nut_tmax_offset)

                    if nut_tmin < tmax and tmin < nut_tmax:
                        if nut_tmin < tmin:
                            # keep the part before the cut interval
                            insert.append((
                                nut_tmin_seconds, nut_tmin_offset,
                                tmin_seconds, tmin_offset,
                                model.tscale_to_kscale(
                                    tmin_seconds - nut_tmin_seconds),
                                nut_id))

                        if tmax < nut_tmax:
                            # keep the part after the cut interval
                            insert.append((
                                tmax_seconds, tmax_offset,
                                nut_tmax_seconds, nut_tmax_offset,
                                model.tscale_to_kscale(
                                    nut_tmax_seconds - tmax_seconds),
                                nut_id))

                        delete.append((nut_id,))

                # Re-insert the remainders, copying all non-time attributes
                # from the original nut before deleting it.
                sql_add = '''
                    INSERT INTO %(db)s.%(nuts)s (
                            file_id, file_segment, file_element, kind_id,
                            kind_codes_id, tmin_seconds, tmin_offset,
                            tmax_seconds, tmax_offset, kscale )
                        SELECT
                            file_id, file_segment, file_element,
                            kind_id, kind_codes_id, ?, ?, ?, ?, ?
                        FROM %(db)s.%(nuts)s
                        WHERE nut_id == ?
                '''
                cursor.executemany(sql_subst(sql_add), insert)

                sql_delete = '''
                    DELETE FROM %(db)s.%(nuts)s WHERE nut_id == ?
                '''
                cursor.executemany(sql_subst(sql_delete), delete)
    def get_time_span(self, kinds=None):
        '''
        Get time interval over all content in selection.

        :param kinds:
            If not ``None``, restrict query to given content kinds.
        :type kind:
            list of str

        :complexity:
            O(1), independent of the number of nuts.

        :returns:
            ``(tmin, tmax)``, combined time interval of queried content kinds.
            Either entry may be ``None`` when no content is available.
        '''

        # Two-step lookup: first find the extreme integer seconds value,
        # then the extreme offset among rows sharing that seconds value.
        sql_min = self._sql('''
            SELECT MIN(tmin_seconds), MIN(tmin_offset)
            FROM %(db)s.%(nuts)s
            WHERE kind_id == ?
                AND tmin_seconds == (
                    SELECT MIN(tmin_seconds)
                    FROM %(db)s.%(nuts)s
                    WHERE kind_id == ?)
        ''')

        sql_max = self._sql('''
            SELECT MAX(tmax_seconds), MAX(tmax_offset)
            FROM %(db)s.%(nuts)s
            WHERE kind_id == ?
                AND tmax_seconds == (
                    SELECT MAX(tmax_seconds)
                    FROM %(db)s.%(nuts)s
                    WHERE kind_id == ?)
        ''')

        gtmin = None
        gtmax = None

        if isinstance(kinds, str):
            kinds = [kinds]

        if kinds is None:
            kind_ids = model.g_content_kind_ids
        else:
            kind_ids = model.to_kind_ids(kinds)

        # Combine per-kind extrema into the global span.
        for kind_id in kind_ids:
            for tmin_seconds, tmin_offset in self._conn.execute(
                    sql_min, (kind_id, kind_id)):

                tmin = model.tjoin(tmin_seconds, tmin_offset)
                if tmin is not None and (gtmin is None or tmin < gtmin):
                    gtmin = tmin

            for (tmax_seconds, tmax_offset) in self._conn.execute(
                    sql_max, (kind_id, kind_id)):

                tmax = model.tjoin(tmax_seconds, tmax_offset)
                if tmax is not None and (gtmax is None or tmax > gtmax):
                    gtmax = tmax

        return gtmin, gtmax
1197 def has(self, kinds):
1198 '''
1199 Check availability of given content kinds.
1201 :param kinds:
1202 Content kinds to query.
1203 :type kind:
1204 list of str
1206 :returns:
1207 ``True`` if any of the queried content kinds is available
1208 in the selection.
1209 '''
1210 self_tmin, self_tmax = self.get_time_span(kinds)
1212 return None not in (self_tmin, self_tmax)
1214 def get_deltat_span(self, kind):
1215 '''
1216 Get min and max sampling interval of all content of given kind.
1218 :param kind:
1219 Content kind
1220 :type kind:
1221 str
1223 :returns: ``(deltat_min, deltat_max)``
1224 '''
1226 deltats = [
1227 deltat for deltat in self.get_deltats(kind)
1228 if deltat is not None]
1230 if deltats:
1231 return min(deltats), max(deltats)
1232 else:
1233 return None, None
1235 def iter_kinds(self, codes=None):
1236 '''
1237 Iterate over content types available in selection.
1239 :param codes:
1240 If given, get kinds only for selected codes identifier.
1241 Only a single identifier may be given here and no pattern matching
1242 is done, currently.
1243 :type codes:
1244 :py:class:`~pyrocko.squirrel.model.Codes`
1246 :yields:
1247 Available content kinds as :py:class:`str`.
1249 :complexity:
1250 O(1), independent of number of nuts.
1251 '''
1253 return self._database._iter_kinds(
1254 codes=codes,
1255 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1257 def iter_deltats(self, kind=None):
1258 '''
1259 Iterate over sampling intervals available in selection.
1261 :param kind:
1262 If given, get sampling intervals only for a given content type.
1263 :type kind:
1264 str
1266 :yields:
1267 :py:class:`float` values.
1269 :complexity:
1270 O(1), independent of number of nuts.
1271 '''
1272 return self._database._iter_deltats(
1273 kind=kind,
1274 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1276 def iter_codes(self, kind=None):
1277 '''
1278 Iterate over content identifier code sequences available in selection.
1280 :param kind:
1281 If given, get codes only for a given content type.
1282 :type kind:
1283 str
1285 :yields:
1286 :py:class:`tuple` of :py:class:`str`
1288 :complexity:
1289 O(1), independent of number of nuts.
1290 '''
1291 return self._database._iter_codes(
1292 kind=kind,
1293 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1295 def _iter_codes_info(self, kind=None, codes=None):
1296 '''
1297 Iterate over number of occurrences of any (kind, codes) combination.
1299 :param kind:
1300 If given, get counts only for selected content type.
1301 :type kind:
1302 str
1304 :yields:
1305 Tuples of the form ``(kind, codes, deltat, kind_codes_id, count)``.
1307 :complexity:
1308 O(1), independent of number of nuts.
1309 '''
1310 return self._database._iter_codes_info(
1311 kind=kind,
1312 codes=codes,
1313 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1315 def get_kinds(self, codes=None):
1316 '''
1317 Get content types available in selection.
1319 :param codes:
1320 If given, get kinds only for selected codes identifier.
1321 Only a single identifier may be given here and no pattern matching
1322 is done, currently.
1323 :type codes:
1324 :py:class:`~pyrocko.squirrel.model.Codes`
1326 :returns:
1327 Sorted list of available content types.
1328 :rtype:
1329 py:class:`list` of :py:class:`str`
1331 :complexity:
1332 O(1), independent of number of nuts.
1334 '''
1335 return sorted(list(self.iter_kinds(codes=codes)))
1337 def get_deltats(self, kind=None):
1338 '''
1339 Get sampling intervals available in selection.
1341 :param kind:
1342 If given, get sampling intervals only for selected content type.
1343 :type kind:
1344 str
1346 :complexity:
1347 O(1), independent of number of nuts.
1349 :returns: Sorted list of available sampling intervals.
1350 '''
1351 return sorted(list(self.iter_deltats(kind=kind)))
1353 def get_codes(self, kind=None):
1354 '''
1355 Get identifier code sequences available in selection.
1357 :param kind:
1358 If given, get codes only for selected content type.
1359 :type kind:
1360 str
1362 :complexity:
1363 O(1), independent of number of nuts.
1365 :returns: Sorted list of available codes as tuples of strings.
1366 '''
1367 return sorted(list(self.iter_codes(kind=kind)))
1369 def get_counts(self, kind=None):
1370 '''
1371 Get number of occurrences of any (kind, codes) combination.
1373 :param kind:
1374 If given, get codes only for selected content type.
1375 :type kind:
1376 str
1378 :complexity:
1379 O(1), independent of number of nuts.
1381 :returns: ``dict`` with ``counts[kind][codes]`` or ``counts[codes]``
1382 if kind is not ``None``
1383 '''
1384 d = {}
1385 for kind_id, codes, _, _, count in self._iter_codes_info(kind=kind):
1386 if kind_id not in d:
1387 v = d[kind_id] = {}
1388 else:
1389 v = d[kind_id]
1391 if codes not in v:
1392 v[codes] = 0
1394 v[codes] += count
1396 if kind is not None:
1397 return d[to_kind_id(kind)]
1398 else:
1399 return dict((to_kind(kind_id), v) for (kind_id, v) in d.items())
1401 def glob_codes(self, kind, codes):
1402 '''
1403 Find codes matching given patterns.
1405 :param kind:
1406 Content kind to be queried.
1407 :type kind:
1408 str
1410 :param codes:
1411 List of code patterns to query.
1412 :type codes:
1413 :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
1414 objects appropriate for the queried content type, or anything which
1415 can be converted to such objects.
1417 :returns:
1418 List of matches of the form ``[kind_codes_id, codes, deltat]``.
1419 '''
1421 kind_id = to_kind_id(kind)
1422 args = [kind_id]
1423 pats = codes_patterns_for_kind(kind_id, codes)
1425 if pats:
1426 codes_cond = 'AND ( %s ) ' % ' OR '.join(
1427 ('kind_codes.codes GLOB ?',) * len(pats))
1429 args.extend(pat.safe_str for pat in pats)
1430 else:
1431 codes_cond = ''
1433 sql = self._sql('''
1434 SELECT kind_codes_id, codes, deltat FROM kind_codes
1435 WHERE
1436 kind_id == ? ''' + codes_cond)
1438 return list(map(list, self._conn.execute(sql, args)))
1440 def update(self, constraint=None, **kwargs):
1441 '''
1442 Update or partially update channel and event inventories.
1444 :param constraint:
1445 Selection of times or areas to be brought up to date.
1446 :type constraint:
1447 :py:class:`~pyrocko.squirrel.client.base.Constraint`
1449 :param \\*\\*kwargs:
1450 Shortcut for setting ``constraint=Constraint(**kwargs)``.
1452 This function triggers all attached remote sources, to check for
1453 updates in the meta-data. The sources will only submit queries when
1454 their expiration date has passed, or if the selection spans into
1455 previously unseen times or areas.
1456 '''
1458 if constraint is None:
1459 constraint = client.Constraint(**kwargs)
1461 for source in self._sources:
1462 source.update_channel_inventory(self, constraint)
1463 source.update_event_inventory(self, constraint)
1465 def update_waveform_promises(self, constraint=None, **kwargs):
1466 '''
1467 Permit downloading of remote waveforms.
1469 :param constraint:
1470 Remote waveforms compatible with the given constraint are enabled
1471 for download.
1472 :type constraint:
1473 :py:class:`~pyrocko.squirrel.client.base.Constraint`
1475 :param \\*\\*kwargs:
1476 Shortcut for setting ``constraint=Constraint(**kwargs)``.
1478 Calling this method permits Squirrel to download waveforms from remote
1479 sources when processing subsequent waveform requests. This works by
1480 inserting so called waveform promises into the database. It will look
1481 into the available channels for each remote source and create a promise
1482 for each channel compatible with the given constraint. If the promise
1483 then matches in a waveform request, Squirrel tries to download the
1484 waveform. If the download is successful, the downloaded waveform is
1485 added to the Squirrel and the promise is deleted. If the download
1486 fails, the promise is kept if the reason of failure looks like being
1487 temporary, e.g. because of a network failure. If the cause of failure
1488 however seems to be permanent, the promise is deleted so that no
1489 further attempts are made to download a waveform which might not be
1490 available from that server at all. To force re-scheduling after a
1491 permanent failure, call :py:meth:`update_waveform_promises`
1492 yet another time.
1493 '''
1495 if constraint is None:
1496 constraint = client.Constraint(**kwargs)
1498 for source in self._sources:
1499 source.update_waveform_promises(self, constraint)
1501 def remove_waveform_promises(self, from_database='selection'):
1502 '''
1503 Remove waveform promises from live selection or global database.
1505 Calling this function removes all waveform promises provided by the
1506 attached sources.
1508 :param from_database:
1509 Remove from live selection ``'selection'`` or global database
1510 ``'global'``.
1511 '''
1512 for source in self._sources:
1513 source.remove_waveform_promises(self, from_database=from_database)
1515 def update_responses(self, constraint=None, **kwargs):
1516 if constraint is None:
1517 constraint = client.Constraint(**kwargs)
1519 for source in self._sources:
1520 source.update_response_inventory(self, constraint)
1522 def get_nfiles(self):
1523 '''
1524 Get number of files in selection.
1525 '''
1527 sql = self._sql('''SELECT COUNT(*) FROM %(db)s.%(file_states)s''')
1528 for row in self._conn.execute(sql):
1529 return row[0]
1531 def get_nnuts(self):
1532 '''
1533 Get number of nuts in selection.
1534 '''
1536 sql = self._sql('''SELECT COUNT(*) FROM %(db)s.%(nuts)s''')
1537 for row in self._conn.execute(sql):
1538 return row[0]
1540 def get_total_size(self):
1541 '''
1542 Get aggregated file size available in selection.
1543 '''
1545 sql = self._sql('''
1546 SELECT SUM(files.size) FROM %(db)s.%(file_states)s
1547 INNER JOIN files
1548 ON %(db)s.%(file_states)s.file_id = files.file_id
1549 ''')
1551 for row in self._conn.execute(sql):
1552 return row[0] or 0
1554 def get_stats(self):
1555 '''
1556 Get statistics on contents available through this selection.
1557 '''
1559 kinds = self.get_kinds()
1560 time_spans = {}
1561 for kind in kinds:
1562 time_spans[kind] = self.get_time_span([kind])
1564 return SquirrelStats(
1565 nfiles=self.get_nfiles(),
1566 nnuts=self.get_nnuts(),
1567 kinds=kinds,
1568 codes=self.get_codes(),
1569 total_size=self.get_total_size(),
1570 counts=self.get_counts(),
1571 time_spans=time_spans,
1572 sources=[s.describe() for s in self._sources],
1573 operators=[op.describe() for op in self._operators])
    def get_content(
            self,
            nut,
            cache_id='default',
            accessor_id='default',
            show_progress=False,
            model='squirrel'):

        '''
        Get and possibly load full content for a given index entry from file.

        Loads the actual content objects (channel, station, waveform, ...) from
        file. For efficiency, sibling content (all stuff in the same file
        segment) will also be loaded as a side effect. The loaded contents are
        cached in the Squirrel object.

        :param nut:
            Index entry pointing at the content to be loaded.

        :param cache_id:
            Name of the content cache to use (by default ``'default'``).

        :param accessor_id:
            Name of the consumer accessing the content, for cache-release
            bookkeeping.

        :param show_progress:
            If ``True``, show progress while loading the file.

        :param model:
            Object model of the returned content (passed through to the
            cache's ``get``).

        :raises:
            :py:exc:`~pyrocko.squirrel.error.NotAvailable` if the content
            cannot be retrieved after (re)loading the file.
        '''

        content_cache = self._content_caches[cache_id]
        if not content_cache.has(nut):

            # Cache miss: (re)load the whole file segment; all sibling nuts
            # found there are put into the cache as a side effect.
            for nut_loaded in io.iload(
                    nut.file_path,
                    segment=nut.file_segment,
                    format=nut.file_format,
                    database=self._database,
                    update_selection=self,
                    show_progress=show_progress):

                content_cache.put(nut_loaded)

        try:
            return content_cache.get(nut, accessor_id, model)

        except KeyError:
            # File did not (or no longer does) provide the expected content.
            raise error.NotAvailable(
                'Unable to retrieve content: %s, %s, %s, %s' % nut.key)
1612 def advance_accessor(self, accessor_id='default', cache_id=None):
1613 '''
1614 Notify memory caches about consumer moving to a new data batch.
1616 :param accessor_id:
1617 Name of accessing consumer to be advanced.
1618 :type accessor_id:
1619 str
1621 :param cache_id:
1622 Name of cache to for which the accessor should be advanced. By
1623 default the named accessor is advanced in all registered caches.
1624 By default, two caches named ``'default'`` and ``'waveform'`` are
1625 available.
1626 :type cache_id:
1627 str
1629 See :py:class:`~pyrocko.squirrel.cache.ContentCache` for details on how
1630 Squirrel's memory caching works and can be tuned. Default behaviour is
1631 to release data when it has not been used in the latest data
1632 window/batch. If the accessor is never advanced, data is cached
1633 indefinitely - which is often desired e.g. for station meta-data.
1634 Methods for consecutive data traversal, like
1635 :py:meth:`chopper_waveforms` automatically advance and clear
1636 their accessor.
1637 '''
1638 for cache_ in (
1639 self._content_caches.keys()
1640 if cache_id is None
1641 else [cache_id]):
1643 self._content_caches[cache_].advance_accessor(accessor_id)
1645 def clear_accessor(self, accessor_id, cache_id=None):
1646 '''
1647 Notify memory caches about a consumer having finished.
1649 :param accessor_id:
1650 Name of accessor to be cleared.
1651 :type accessor_id:
1652 str
1654 :param cache_id:
1655 Name of cache for which the accessor should be cleared. By default
1656 the named accessor is cleared from all registered caches. By
1657 default, two caches named ``'default'`` and ``'waveform'`` are
1658 available.
1659 :type cache_id:
1660 str
1662 Calling this method clears all references to cache entries held by the
1663 named accessor. Cache entries are then freed if not referenced by any
1664 other accessor.
1665 '''
1667 for cache_ in (
1668 self._content_caches.keys()
1669 if cache_id is None
1670 else [cache_id]):
1672 self._content_caches[cache_].clear_accessor(accessor_id)
    def get_cache_stats(self, cache_id):
        '''
        Get usage statistics of a named content cache.

        :param cache_id:
            Name of the cache to query, e.g. ``'default'`` or ``'waveform'``.
        :type cache_id:
            str

        :returns:
            Statistics object as produced by the cache's ``get_stats``.
        '''
        return self._content_caches[cache_id].get_stats()
1677 @filldocs
1678 def get_stations(
1679 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
1680 model='squirrel'):
1682 '''
1683 Get stations matching given constraints.
1685 %(query_args)s
1687 :param model:
1688 Select object model for returned values: ``'squirrel'`` to get
1689 Squirrel station objects or ``'pyrocko'`` to get Pyrocko station
1690 objects with channel information attached.
1691 :type model:
1692 str
1694 :returns:
1695 List of :py:class:`pyrocko.squirrel.Station
1696 <pyrocko.squirrel.model.Station>` objects by default or list of
1697 :py:class:`pyrocko.model.Station <pyrocko.model.station.Station>`
1698 objects if ``model='pyrocko'`` is requested.
1700 See :py:meth:`iter_nuts` for details on time span matching.
1701 '''
1703 if model == 'pyrocko':
1704 return self._get_pyrocko_stations(obj, tmin, tmax, time, codes)
1705 elif model in ('squirrel', 'stationxml', 'stationxml+'):
1706 args = self._get_selection_args(
1707 STATION, obj, tmin, tmax, time, codes)
1709 nuts = sorted(
1710 self.iter_nuts('station', *args), key=lambda nut: nut.dkey)
1712 return [self.get_content(nut, model=model) for nut in nuts]
1713 else:
1714 raise ValueError('Invalid station model: %s' % model)
1716 @filldocs
1717 def get_channels(
1718 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
1719 model='squirrel'):
1721 '''
1722 Get channels matching given constraints.
1724 %(query_args)s
1726 :returns:
1727 List of :py:class:`~pyrocko.squirrel.model.Channel` objects.
1729 See :py:meth:`iter_nuts` for details on time span matching.
1730 '''
1732 args = self._get_selection_args(
1733 CHANNEL, obj, tmin, tmax, time, codes)
1735 nuts = sorted(
1736 self.iter_nuts('channel', *args), key=lambda nut: nut.dkey)
1738 return [self.get_content(nut, model=model) for nut in nuts]
1740 @filldocs
1741 def get_sensors(
1742 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1744 '''
1745 Get sensors matching given constraints.
1747 %(query_args)s
1749 :returns:
1750 List of :py:class:`~pyrocko.squirrel.model.Sensor` objects.
1752 See :py:meth:`iter_nuts` for details on time span matching.
1753 '''
1755 tmin, tmax, codes = self._get_selection_args(
1756 CHANNEL, obj, tmin, tmax, time, codes)
1758 if codes is not None:
1759 codes = codes_patterns_list(
1760 (entry.replace(channel=entry.channel[:-1] + '?')
1761 if entry != '*' else entry)
1762 for entry in codes)
1764 nuts = sorted(
1765 self.iter_nuts(
1766 'channel', tmin, tmax, codes), key=lambda nut: nut.dkey)
1768 return model.Sensor.from_channels(
1769 self.get_content(nut) for nut in nuts)
1771 @filldocs
1772 def get_responses(
1773 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
1774 model='squirrel'):
1776 '''
1777 Get instrument responses matching given constraints.
1779 %(query_args)s
1781 :returns:
1782 List of :py:class:`~pyrocko.squirrel.model.Response` objects.
1784 See :py:meth:`iter_nuts` for details on time span matching.
1785 '''
1787 args = self._get_selection_args(
1788 RESPONSE, obj, tmin, tmax, time, codes)
1790 nuts = sorted(
1791 self.iter_nuts('response', *args), key=lambda nut: nut.dkey)
1793 return [self.get_content(nut, model=model) for nut in nuts]
1795 @filldocs
1796 def get_response(
1797 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
1798 model='squirrel'):
1800 '''
1801 Get instrument response matching given constraints.
1803 %(query_args)s
1805 :returns:
1806 :py:class:`~pyrocko.squirrel.model.Response` object.
1808 Same as :py:meth:`get_responses` but returning exactly one response.
1809 Raises :py:exc:`~pyrocko.squirrel.error.NotAvailable` if zero or more
1810 than one is available.
1812 See :py:meth:`iter_nuts` for details on time span matching.
1813 '''
1815 if model == 'stationxml':
1816 model_ = 'stationxml+'
1817 else:
1818 model_ = model
1820 responses = self.get_responses(
1821 obj, tmin, tmax, time, codes, model=model_)
1822 if len(responses) == 0:
1823 raise error.NotAvailable(
1824 'No instrument response available (%s).'
1825 % self._get_selection_args_str(
1826 RESPONSE, obj, tmin, tmax, time, codes))
1828 elif len(responses) > 1:
1829 if model_ == 'squirrel':
1830 resps_sq = responses
1831 elif model_ == 'stationxml+':
1832 resps_sq = [resp[0] for resp in responses]
1833 else:
1834 raise ValueError('Invalid response model: %s' % model)
1836 rinfo = ':\n' + '\n'.join(
1837 ' ' + resp.summary for resp in resps_sq)
1839 raise error.NotAvailable(
1840 'Multiple instrument responses matching given constraints '
1841 '(%s)%s' % (
1842 self._get_selection_args_str(
1843 RESPONSE, obj, tmin, tmax, time, codes), rinfo))
1845 if model == 'stationxml':
1846 return responses[0][1]
1847 else:
1848 return responses[0]
1850 @filldocs
1851 def get_events(
1852 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1854 '''
1855 Get events matching given constraints.
1857 %(query_args)s
1859 :returns:
1860 List of :py:class:`~pyrocko.model.event.Event` objects.
1862 See :py:meth:`iter_nuts` for details on time span matching.
1863 '''
1865 args = self._get_selection_args(EVENT, obj, tmin, tmax, time, codes)
1866 nuts = sorted(
1867 self.iter_nuts('event', *args), key=lambda nut: nut.dkey)
1869 return [self.get_content(nut) for nut in nuts]
1871 def _redeem_promises(self, *args):
1873 tmin, tmax, _ = args
1875 waveforms = list(self.iter_nuts('waveform', *args))
1876 promises = list(self.iter_nuts('waveform_promise', *args))
1878 codes_to_avail = defaultdict(list)
1879 for nut in waveforms:
1880 codes_to_avail[nut.codes].append((nut.tmin, nut.tmax))
1882 def tts(x):
1883 if isinstance(x, tuple):
1884 return tuple(tts(e) for e in x)
1885 elif isinstance(x, list):
1886 return list(tts(e) for e in x)
1887 else:
1888 return util.time_to_str(x)
1890 orders = []
1891 for promise in promises:
1892 waveforms_avail = codes_to_avail[promise.codes]
1893 for block_tmin, block_tmax in blocks(
1894 max(tmin, promise.tmin),
1895 min(tmax, promise.tmax),
1896 promise.deltat):
1898 orders.append(
1899 WaveformOrder(
1900 source_id=promise.file_path,
1901 codes=promise.codes,
1902 tmin=block_tmin,
1903 tmax=block_tmax,
1904 deltat=promise.deltat,
1905 gaps=gaps(waveforms_avail, block_tmin, block_tmax)))
1907 orders_noop, orders = lpick(lambda order: order.gaps, orders)
1909 order_keys_noop = set(order_key(order) for order in orders_noop)
1910 if len(order_keys_noop) != 0 or len(orders_noop) != 0:
1911 logger.info(
1912 'Waveform orders already satisified with cached/local data: '
1913 '%i (%i)' % (len(order_keys_noop), len(orders_noop)))
1915 source_ids = []
1916 sources = {}
1917 for source in self._sources:
1918 if isinstance(source, fdsn.FDSNSource):
1919 source_ids.append(source._source_id)
1920 sources[source._source_id] = source
1922 source_priority = dict(
1923 (source_id, i) for (i, source_id) in enumerate(source_ids))
1925 order_groups = defaultdict(list)
1926 for order in orders:
1927 order_groups[order_key(order)].append(order)
1929 for k, order_group in order_groups.items():
1930 order_group.sort(
1931 key=lambda order: source_priority[order.source_id])
1933 n_order_groups = len(order_groups)
1935 if len(order_groups) != 0 or len(orders) != 0:
1936 logger.info(
1937 'Waveform orders standing for download: %i (%i)'
1938 % (len(order_groups), len(orders)))
1940 task = make_task('Waveform orders processed', n_order_groups)
1941 else:
1942 task = None
1944 def split_promise(order):
1945 self._split_nuts(
1946 'waveform_promise',
1947 order.tmin, order.tmax,
1948 codes=order.codes,
1949 path=order.source_id)
1951 def release_order_group(order):
1952 okey = order_key(order)
1953 for followup in order_groups[okey]:
1954 split_promise(followup)
1956 del order_groups[okey]
1958 if task:
1959 task.update(n_order_groups - len(order_groups))
1961 def noop(order):
1962 pass
1964 def success(order):
1965 release_order_group(order)
1966 split_promise(order)
1968 def batch_add(paths):
1969 self.add(paths)
1971 calls = queue.Queue()
1973 def enqueue(f):
1974 def wrapper(*args):
1975 calls.put((f, args))
1977 return wrapper
1979 for order in orders_noop:
1980 split_promise(order)
1982 while order_groups:
1984 orders_now = []
1985 empty = []
1986 for k, order_group in order_groups.items():
1987 try:
1988 orders_now.append(order_group.pop(0))
1989 except IndexError:
1990 empty.append(k)
1992 for k in empty:
1993 del order_groups[k]
1995 by_source_id = defaultdict(list)
1996 for order in orders_now:
1997 by_source_id[order.source_id].append(order)
1999 threads = []
2000 for source_id in by_source_id:
2001 def download():
2002 try:
2003 sources[source_id].download_waveforms(
2004 by_source_id[source_id],
2005 success=enqueue(success),
2006 error_permanent=enqueue(split_promise),
2007 error_temporary=noop,
2008 batch_add=enqueue(batch_add))
2010 finally:
2011 calls.put(None)
2013 thread = threading.Thread(target=download)
2014 thread.start()
2015 threads.append(thread)
2017 ndone = 0
2018 while ndone < len(threads):
2019 ret = calls.get()
2020 if ret is None:
2021 ndone += 1
2022 else:
2023 ret[0](*ret[1])
2025 for thread in threads:
2026 thread.join()
2028 if task:
2029 task.update(n_order_groups - len(order_groups))
2031 if task:
2032 task.done()
2034 @filldocs
2035 def get_waveform_nuts(
2036 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
2038 '''
2039 Get waveform content entities matching given constraints.
2041 %(query_args)s
2043 Like :py:meth:`get_nuts` with ``kind='waveform'`` but additionally
2044 resolves matching waveform promises (downloads waveforms from remote
2045 sources).
2047 See :py:meth:`iter_nuts` for details on time span matching.
2048 '''
2050 args = self._get_selection_args(WAVEFORM, obj, tmin, tmax, time, codes)
2051 self._redeem_promises(*args)
2052 return sorted(
2053 self.iter_nuts('waveform', *args), key=lambda nut: nut.dkey)
    @filldocs
    def get_waveforms(
            self, obj=None, tmin=None, tmax=None, time=None, codes=None,
            uncut=False, want_incomplete=True, degap=True, maxgap=5,
            maxlap=None, snap=None, include_last=False, load_data=True,
            accessor_id='default', operator_params=None):

        '''
        Get waveforms matching given constraints.

        %(query_args)s

        :param uncut:
            Set to ``True``, to disable cutting traces to [``tmin``, ``tmax``]
            and to disable degapping/deoverlapping. Returns untouched traces as
            they are read from file segment. File segments are always read in
            their entirety.
        :type uncut:
            bool

        :param want_incomplete:
            If ``True``, gappy/incomplete traces are included in the result.
        :type want_incomplete:
            bool

        :param degap:
            If ``True``, connect traces and remove gaps and overlaps.
        :type degap:
            bool

        :param maxgap:
            Maximum gap size in samples which is filled with interpolated
            samples when ``degap`` is ``True``.
        :type maxgap:
            int

        :param maxlap:
            Maximum overlap size in samples which is removed when ``degap`` is
            ``True``.
        :type maxlap:
            int

        :param snap:
            Rounding functions used when computing sample index from time
            instance, for trace start and trace end, respectively. By default,
            ``(round, round)`` is used.
        :type snap:
            tuple of 2 callables

        :param include_last:
            If ``True``, add one more sample to the returned traces (the sample
            which would be the first sample of a query with ``tmin`` set to the
            current value of ``tmax``).
        :type include_last:
            bool

        :param load_data:
            If ``True``, waveform data samples are read from files (or cache).
            If ``False``, meta-information-only traces are returned (dummy
            traces with no data samples).
        :type load_data:
            bool

        :param accessor_id:
            Name of consumer on who's behalf data is accessed. Used in cache
            management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key
            to distinguish different points of extraction for the decision of
            when to release cached waveform data. Should be used when data is
            alternately extracted from more than one region / selection.
        :type accessor_id:
            str

        See :py:meth:`iter_nuts` for details on time span matching.

        Loaded data is kept in memory (at least) until
        :py:meth:`clear_accessor` has been called or
        :py:meth:`advance_accessor` has been called two consecutive times
        without data being accessed between the two calls (by this accessor).
        Data may still be further kept in the memory cache if held alive by
        consumers with a different ``accessor_id``.
        '''

        tmin, tmax, codes = self._get_selection_args(
            WAVEFORM, obj, tmin, tmax, time, codes)

        # Clamp open-ended queries to the span covered by waveforms and
        # waveform promises.
        self_tmin, self_tmax = self.get_time_span(
            ['waveform', 'waveform_promise'])

        if None in (self_tmin, self_tmax):
            logger.warning(
                'No waveforms available.')
            return []

        tmin = tmin if tmin is not None else self_tmin
        tmax = tmax if tmax is not None else self_tmax

        if codes is not None and len(codes) == 1:
            # If the codes are handled by a registered operator (e.g. derived
            # or restituted channels), delegate the whole request to it.
            # TODO: fix for multiple / mixed codes
            operator = self.get_operator(codes[0])
            if operator is not None:
                return operator.get_waveforms(
                    self, codes[0],
                    tmin=tmin, tmax=tmax,
                    uncut=uncut, want_incomplete=want_incomplete, degap=degap,
                    maxgap=maxgap, maxlap=maxlap, snap=snap,
                    include_last=include_last, load_data=load_data,
                    accessor_id=accessor_id, params=operator_params)

        # This also triggers downloads for matching waveform promises.
        nuts = self.get_waveform_nuts(obj, tmin, tmax, time, codes)

        if load_data:
            traces = [
                self.get_content(nut, 'waveform', accessor_id) for nut in nuts]

        else:
            # Meta-only request: build dummy traces from index information.
            traces = [
                trace.Trace(**nut.trace_kwargs) for nut in nuts]

        if uncut:
            return traces

        if snap is None:
            snap = (round, round)

        chopped = []
        for tr in traces:
            if not load_data and tr.ydata is not None:
                # Trace came from the cache with data attached although only
                # meta information was requested; strip the samples.
                tr = tr.copy(data=False)
                tr.ydata = None

            try:
                chopped.append(tr.chop(
                    tmin, tmax,
                    inplace=False,
                    snap=snap,
                    include_last=include_last))

            except trace.NoData:
                # Trace does not overlap the requested span after snapping.
                pass

        processed = self._process_chopped(
            chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax)

        return processed
2200 @filldocs
2201 def chopper_waveforms(
2202 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
2203 tinc=None, tpad=0.,
2204 want_incomplete=True, snap_window=False,
2205 degap=True, maxgap=5, maxlap=None,
2206 snap=None, include_last=False, load_data=True,
2207 accessor_id=None, clear_accessor=True, operator_params=None,
2208 grouping=None):
2210 '''
2211 Iterate window-wise over waveform archive.
2213 %(query_args)s
2215 :param tinc:
2216 Time increment (window shift time) (default uses ``tmax-tmin``).
2217 :type tinc:
2218 timestamp
2220 :param tpad:
2221 Padding time appended on either side of the data window (window
2222 overlap is ``2*tpad``).
2223 :type tpad:
2224 timestamp
2226 :param want_incomplete:
2227 If ``True``, gappy/incomplete traces are included in the result.
2228 :type want_incomplete:
2229 bool
2231 :param snap_window:
2232 If ``True``, start time windows at multiples of tinc with respect
2233 to system time zero.
2234 :type snap_window:
2235 bool
2237 :param degap:
2238 If ``True``, connect traces and remove gaps and overlaps.
2239 :type degap:
2240 bool
2242 :param maxgap:
2243 Maximum gap size in samples which is filled with interpolated
2244 samples when ``degap`` is ``True``.
2245 :type maxgap:
2246 int
2248 :param maxlap:
2249 Maximum overlap size in samples which is removed when ``degap`` is
2250 ``True``.
2251 :type maxlap:
2252 int
2254 :param snap:
2255 Rounding functions used when computing sample index from time
2256 instance, for trace start and trace end, respectively. By default,
2257 ``(round, round)`` is used.
2258 :type snap:
2259 tuple of 2 callables
2261 :param include_last:
2262 If ``True``, add one more sample to the returned traces (the sample
2263 which would be the first sample of a query with ``tmin`` set to the
2264 current value of ``tmax``).
2265 :type include_last:
2266 bool
2268 :param load_data:
2269 If ``True``, waveform data samples are read from files (or cache).
2270 If ``False``, meta-information-only traces are returned (dummy
2271 traces with no data samples).
2272 :type load_data:
2273 bool
2275 :param accessor_id:
2276 Name of consumer on who's behalf data is accessed. Used in cache
2277 management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key
2278 to distinguish different points of extraction for the decision of
2279 when to release cached waveform data. Should be used when data is
2280 alternately extracted from more than one region / selection.
2281 :type accessor_id:
2282 str
2284 :param clear_accessor:
2285 If ``True`` (default), :py:meth:`clear_accessor` is called when the
2286 chopper finishes. Set to ``False`` to keep loaded waveforms in
2287 memory when the generator returns.
2288 :type clear_accessor:
2289 bool
2291 :param grouping:
2292 By default, traversal over the data is over time and all matching
2293 traces of a time window are yielded. Using this option, it is
2294 possible to traverse the data first by group (e.g. station or
2295 network) and second by time. This can reduce the number of traces
2296 in each batch and thus reduce the memory footprint of the process.
2297 :type grouping:
2298 :py:class:`~pyrocko.squirrel.operator.Grouping`
2300 :yields:
2301 A list of :py:class:`~pyrocko.trace.Trace` objects for every
2302 extracted time window.
2304 See :py:meth:`iter_nuts` for details on time span matching.
2305 '''
2307 tmin, tmax, codes = self._get_selection_args(
2308 WAVEFORM, obj, tmin, tmax, time, codes)
2310 self_tmin, self_tmax = self.get_time_span(
2311 ['waveform', 'waveform_promise'])
2313 if None in (self_tmin, self_tmax):
2314 logger.warning(
2315 'Content has undefined time span. No waveforms and no '
2316 'waveform promises?')
2317 return
2319 if snap_window and tinc is not None:
2320 tmin = tmin if tmin is not None else self_tmin
2321 tmax = tmax if tmax is not None else self_tmax
2322 tmin = math.floor(tmin / tinc) * tinc
2323 tmax = math.ceil(tmax / tinc) * tinc
2324 else:
2325 tmin = tmin if tmin is not None else self_tmin + tpad
2326 tmax = tmax if tmax is not None else self_tmax - tpad
2328 tinc = tinc if tinc is not None else tmax - tmin
2330 try:
2331 if accessor_id is None:
2332 accessor_id = 'chopper%i' % self._n_choppers_active
2334 self._n_choppers_active += 1
2336 eps = tinc * 1e-6
2337 if tinc != 0.0:
2338 nwin = int(((tmax - eps) - tmin) / tinc) + 1
2339 else:
2340 nwin = 1
2342 if grouping is None:
2343 codes_list = [codes]
2344 else:
2345 operator = Operator(
2346 filtering=CodesPatternFiltering(codes=codes),
2347 grouping=grouping)
2349 available = set(self.get_codes(kind='waveform'))
2350 available.update(self.get_codes(kind='waveform_promise'))
2351 operator.update_mappings(sorted(available))
2353 def iter_codes_list():
2354 for scl in operator.iter_in_codes():
2355 yield codes_patterns_list(scl)
2357 codes_list = iter_codes_list()
2359 for scl in codes_list:
2360 for iwin in range(nwin):
2361 wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)
2363 chopped = self.get_waveforms(
2364 tmin=wmin-tpad,
2365 tmax=wmax+tpad,
2366 codes=scl,
2367 snap=snap,
2368 include_last=include_last,
2369 load_data=load_data,
2370 want_incomplete=want_incomplete,
2371 degap=degap,
2372 maxgap=maxgap,
2373 maxlap=maxlap,
2374 accessor_id=accessor_id,
2375 operator_params=operator_params)
2377 self.advance_accessor(accessor_id)
2379 yield Batch(
2380 tmin=wmin,
2381 tmax=wmax,
2382 i=iwin,
2383 n=nwin,
2384 traces=chopped)
2386 finally:
2387 self._n_choppers_active -= 1
2388 if clear_accessor:
2389 self.clear_accessor(accessor_id, 'waveform')
2391 def _process_chopped(
2392 self, chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax):
2394 chopped.sort(key=lambda a: a.full_id)
2395 if degap:
2396 chopped = trace.degapper(chopped, maxgap=maxgap, maxlap=maxlap)
2398 if not want_incomplete:
2399 chopped_weeded = []
2400 for tr in chopped:
2401 emin = tr.tmin - tmin
2402 emax = tr.tmax + tr.deltat - tmax
2403 if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
2404 chopped_weeded.append(tr)
2406 elif degap:
2407 if (0. < emin <= 5. * tr.deltat
2408 and -5. * tr.deltat <= emax < 0.):
2410 tr.extend(tmin, tmax-tr.deltat, fillmethod='repeat')
2411 chopped_weeded.append(tr)
2413 chopped = chopped_weeded
2415 return chopped
2417 def _get_pyrocko_stations(
2418 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
2420 from pyrocko import model as pmodel
2422 by_nsl = defaultdict(lambda: (list(), list()))
2423 for station in self.get_stations(obj, tmin, tmax, time, codes):
2424 sargs = station._get_pyrocko_station_args()
2425 by_nsl[station.codes.nsl][0].append(sargs)
2427 for channel in self.get_channels(obj, tmin, tmax, time, codes):
2428 sargs = channel._get_pyrocko_station_args()
2429 sargs_list, channels_list = by_nsl[channel.codes.nsl]
2430 sargs_list.append(sargs)
2431 channels_list.append(channel)
2433 pstations = []
2434 nsls = list(by_nsl.keys())
2435 nsls.sort()
2436 for nsl in nsls:
2437 sargs_list, channels_list = by_nsl[nsl]
2438 sargs = util.consistency_merge(
2439 [('',) + x for x in sargs_list])
2441 by_c = defaultdict(list)
2442 for ch in channels_list:
2443 by_c[ch.codes.channel].append(ch._get_pyrocko_channel_args())
2445 chas = list(by_c.keys())
2446 chas.sort()
2447 pchannels = []
2448 for cha in chas:
2449 list_of_cargs = by_c[cha]
2450 cargs = util.consistency_merge(
2451 [('',) + x for x in list_of_cargs])
2452 pchannels.append(pmodel.Channel(*cargs))
2454 pstations.append(
2455 pmodel.Station(*sargs, channels=pchannels))
2457 return pstations
2459 @property
2460 def pile(self):
2462 '''
2463 Emulates the older :py:class:`pyrocko.pile.Pile` interface.
2465 This property exposes a :py:class:`pyrocko.squirrel.pile.Pile` object,
2466 which emulates most of the older :py:class:`pyrocko.pile.Pile` methods
2467 but uses the fluffy power of the Squirrel under the hood.
2469 This interface can be used as a drop-in replacement for piles which are
2470 used in existing scripts and programs for efficient waveform data
2471 access. The Squirrel-based pile scales better for large datasets. Newer
2472 scripts should use Squirrel's native methods to avoid the emulation
2473 overhead.
2474 '''
2475 from . import pile
2477 if self._pile is None:
2478 self._pile = pile.Pile(self)
2480 return self._pile
2482 def snuffle(self):
2483 '''
2484 Look at dataset in Snuffler.
2485 '''
2486 self.pile.snuffle()
2488 def _gather_codes_keys(self, kind, gather, selector):
2489 return set(
2490 gather(codes)
2491 for codes in self.iter_codes(kind)
2492 if selector is None or selector(codes))
    def __str__(self):
        # Human-readable summary of the selection contents, rendered by
        # the stats object returned from get_stats().
        return str(self.get_stats())
    def get_coverage(
            self, kind, tmin=None, tmax=None, codes=None, limit=None):

        '''
        Get coverage information.

        Get information about strips of gapless data coverage.

        :param kind:
            Content kind to be queried.
        :type kind:
            str

        :param tmin:
            Start time of query interval.
        :type tmin:
            timestamp

        :param tmax:
            End time of query interval.
        :type tmax:
            timestamp

        :param codes:
            If given, restrict query to given content codes patterns.
        :type codes:
            :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
            objects appropriate for the queried content type, or anything
            which can be converted to such objects.

        :param limit:
            Limit query to return only up to a given maximum number of
            entries per matching time series (without setting this option,
            very gappy data could cause the query to execute for a very
            long time).
        :type limit:
            int

        :returns:
            Information about time spans covered by the requested time
            series data.
        :rtype:
            :py:class:`list` of :py:class:`Coverage` objects
        '''

        # Times are stored split into (integer seconds, float offset)
        # pairs; model.tjoin recombines them below.
        tmin_seconds, tmin_offset = model.tsplit(tmin)
        tmax_seconds, tmax_offset = model.tsplit(tmax)
        kind_id = to_kind_id(kind)

        codes_info = list(self._iter_codes_info(kind=kind))

        # kdata_all entries: (pattern, kind_codes_id, codes, deltat).
        kdata_all = []
        if codes is None:
            for _, codes_entry, deltat, kind_codes_id, _ in codes_info:
                kdata_all.append(
                    (codes_entry, kind_codes_id, codes_entry, deltat))

        else:
            for codes_entry in codes:
                pattern = to_codes(kind_id, codes_entry)
                for _, codes_entry, deltat, kind_codes_id, _ in codes_info:
                    if model.match_codes(pattern, codes_entry):
                        kdata_all.append(
                            (pattern, kind_codes_id, codes_entry, deltat))

        kind_codes_ids = [x[1] for x in kdata_all]

        # Number of nuts touching exactly tmin, per (codes, deltat);
        # needed because the coverage-change query below deliberately
        # excludes entries at t == tmin (see 'intentionally <' note).
        counts_at_tmin = {}
        if tmin is not None:
            for nut in self.iter_nuts(
                    kind, tmin, tmin, kind_codes_ids=kind_codes_ids):

                k = nut.codes, nut.deltat

                if k not in counts_at_tmin:
                    counts_at_tmin[k] = 0

                counts_at_tmin[k] += 1

        coverages = []
        for pattern, kind_codes_id, codes_entry, deltat in kdata_all:
            # entry layout: [pattern, codes, deltat, tmin, tmax, changes]
            entry = [pattern, codes_entry, deltat, None, None, []]

            # Query first (ASC) and last (DESC) coverage change times to
            # obtain the overall time span of this series.
            for i, order in [(0, 'ASC'), (1, 'DESC')]:
                sql = self._sql('''
                    SELECT
                        time_seconds,
                        time_offset
                    FROM %(db)s.%(coverage)s
                    WHERE
                        kind_codes_id == ?
                    ORDER BY
                        kind_codes_id ''' + order + ''',
                        time_seconds ''' + order + ''',
                        time_offset ''' + order + '''
                    LIMIT 1
                ''')

                for row in self._conn.execute(sql, [kind_codes_id]):
                    entry[3+i] = model.tjoin(row[0], row[1])

            # No coverage entries at all for this series: skip.
            if None in entry[3:5]:
                continue

            args = [kind_codes_id]

            sql_time = ''
            if tmin is not None:
                # intentionally < because (== tmin) is queried from nuts
                sql_time += ' AND ( ? < time_seconds ' \
                    'OR ( ? == time_seconds AND ? < time_offset ) ) '
                args.extend([tmin_seconds, tmin_seconds, tmin_offset])

            if tmax is not None:
                sql_time += ' AND ( time_seconds < ? ' \
                    'OR ( ? == time_seconds AND time_offset <= ? ) ) '
                args.extend([tmax_seconds, tmax_seconds, tmax_offset])

            sql_limit = ''
            if limit is not None:
                sql_limit = ' LIMIT ?'
                args.append(limit)

            # Coverage change points (time, step) within the query span.
            sql = self._sql('''
                SELECT
                    time_seconds,
                    time_offset,
                    step
                FROM %(db)s.%(coverage)s
                WHERE
                    kind_codes_id == ?
                    ''' + sql_time + '''
                ORDER BY
                    kind_codes_id,
                    time_seconds,
                    time_offset
            ''' + sql_limit)

            rows = list(self._conn.execute(sql, args))

            if limit is not None and len(rows) == limit:
                # Truncated by LIMIT: mark change list as unavailable
                # rather than returning a misleading partial list.
                entry[-1] = None
            else:
                # Accumulate running counts, starting from the number of
                # nuts open exactly at tmin.
                counts = counts_at_tmin.get((codes_entry, deltat), 0)
                tlast = None
                if tmin is not None:
                    entry[-1].append((tmin, counts))
                    tlast = tmin

                for row in rows:
                    t = model.tjoin(row[0], row[1])
                    counts += row[2]
                    entry[-1].append((t, counts))
                    tlast = t

                if tmax is not None and (tlast is None or tlast != tmax):
                    entry[-1].append((tmax, counts))

            coverages.append(model.Coverage.from_values(entry + [kind_id]))

        return coverages
    def get_stationxml(
            self, obj=None, tmin=None, tmax=None, time=None, codes=None,
            level='response'):

        '''
        Get station/channel/response metadata in StationXML representation.

        %(query_args)s

        :returns:
            :py:class:`~pyrocko.io.stationxml.FDSNStationXML` object.
        '''

        if level not in ('network', 'station', 'channel', 'response'):
            raise ValueError('Invalid level: %s' % level)

        tmin, tmax, codes = self._get_selection_args(
            CHANNEL, obj, tmin, tmax, time, codes)

        filtering = CodesPatternFiltering(codes=codes)

        # Distinct (net, sta, loc, cha) tuples matching the selection.
        nslcs = list(set(
            codes.nslc for codes in
            filtering.filter(self.get_codes(kind='channel'))))

        from pyrocko.io import stationxml as sx

        networks = []
        # prefix_tree groups the flat NSLC tuples hierarchically
        # (net -> sta -> loc -> cha), matching the StationXML nesting.
        for net, stas in prefix_tree(nslcs):
            network = sx.Network(code=net)
            networks.append(network)

            # Levels below 'network' requested? Otherwise skip detail.
            if level not in ('station', 'channel', 'response'):
                continue

            for sta, locs in stas:
                stations = self.get_stations(
                    tmin=tmin,
                    tmax=tmax,
                    codes=(net, sta, '*'),
                    model='stationxml')

                errors = sx.check_overlaps(
                    'Station', (net, sta), stations)

                if errors:
                    raise sx.Inconsistencies(
                        'Inconsistencies found:\n  %s'
                        % '\n  '.join(errors))

                network.station_list.extend(stations)

                if level not in ('channel', 'response'):
                    continue

                for loc, chas in locs:
                    for cha, _ in chas:
                        channels = self.get_channels(
                            tmin=tmin,
                            tmax=tmax,
                            codes=(net, sta, loc, cha),
                            model='stationxml')

                        errors = sx.check_overlaps(
                            'Channel', (net, sta, loc, cha), channels)

                        if errors:
                            raise sx.Inconsistencies(
                                'Inconsistencies found:\n  %s'
                                % '\n  '.join(errors))

                        # Attach each channel epoch to the station epoch
                        # which contains it in time.
                        for channel in channels:
                            station = sx.find_containing(stations, channel)
                            if station is not None:
                                station.channel_list.append(channel)
                            else:
                                raise sx.Inconsistencies(
                                    'No station or station epoch found for '
                                    'channel: %s' % '.'.join(
                                        (net, sta, loc, cha)))

                            if level != 'response':
                                continue

                            response_sq, response_sx = self.get_response(
                                codes=(net, sta, loc, cha),
                                tmin=channel.start_date,
                                tmax=channel.end_date,
                                model='stationxml+')

                            # Response epoch must cover exactly the
                            # channel epoch (open ends compare equal).
                            if not (
                                    sx.eq_open(
                                        channel.start_date, response_sq.tmin)
                                    and sx.eq_open(
                                        channel.end_date, response_sq.tmax)):

                                raise sx.Inconsistencies(
                                    'Response time span does not match '
                                    'channel time span: %s' % '.'.join(
                                        (net, sta, loc, cha)))

                            channel.response = response_sx

        return sx.FDSNStationXML(
            source='Generated by Pyrocko Squirrel.',
            network_list=networks)
    def add_operator(self, op):
        '''
        Register an operator instance with this Squirrel.
        '''
        self._operators.append(op)
2766 def update_operator_mappings(self):
2767 available = self.get_codes(kind=('channel'))
2769 for operator in self._operators:
2770 operator.update_mappings(available, self._operator_registry)
2772 def iter_operator_mappings(self):
2773 for operator in self._operators:
2774 for in_codes, out_codes in operator.iter_mappings():
2775 yield operator, in_codes, out_codes
    def get_operator_mappings(self):
        '''
        Get all operator mappings as a list.

        See :py:meth:`iter_operator_mappings`.
        '''
        return list(self.iter_operator_mappings())
2780 def get_operator(self, codes):
2781 try:
2782 return self._operator_registry[codes][0]
2783 except KeyError:
2784 return None
2786 def get_operator_group(self, codes):
2787 try:
2788 return self._operator_registry[codes]
2789 except KeyError:
2790 return None, (None, None, None)
2792 def iter_operator_codes(self):
2793 for _, _, out_codes in self.iter_operator_mappings():
2794 for codes in out_codes:
2795 yield codes
    def get_operator_codes(self):
        '''
        Get all codes produced by registered operators as a list.

        See :py:meth:`iter_operator_codes`.
        '''
        return list(self.iter_operator_codes())
2800 def print_tables(self, table_names=None, stream=None):
2801 '''
2802 Dump raw database tables in textual form (for debugging purposes).
2804 :param table_names:
2805 Names of tables to be dumped or ``None`` to dump all.
2806 :type table_names:
2807 :py:class:`list` of :py:class:`str`
2809 :param stream:
2810 Open file or ``None`` to dump to standard output.
2811 '''
2813 if stream is None:
2814 stream = sys.stdout
2816 if isinstance(table_names, str):
2817 table_names = [table_names]
2819 if table_names is None:
2820 table_names = [
2821 'selection_file_states',
2822 'selection_nuts',
2823 'selection_kind_codes_count',
2824 'files', 'nuts', 'kind_codes', 'kind_codes_count']
2826 m = {
2827 'selection_file_states': '%(db)s.%(file_states)s',
2828 'selection_nuts': '%(db)s.%(nuts)s',
2829 'selection_kind_codes_count': '%(db)s.%(kind_codes_count)s',
2830 'files': 'files',
2831 'nuts': 'nuts',
2832 'kind_codes': 'kind_codes',
2833 'kind_codes_count': 'kind_codes_count'}
2835 for table_name in table_names:
2836 self._database.print_table(
2837 m[table_name] % self._names, stream=stream)
class SquirrelStats(Object):
    '''
    Container to hold statistics about contents available from a Squirrel.

    See also :py:meth:`Squirrel.get_stats`.
    '''

    nfiles = Int.T(
        help='Number of files in selection.')
    nnuts = Int.T(
        help='Number of index nuts in selection.')
    codes = List.T(
        Tuple.T(content_t=String.T()),
        help='Available code sequences in selection, e.g. '
             '(agency, network, station, location) for stations nuts.')
    kinds = List.T(
        String.T(),
        help='Available content types in selection.')
    total_size = Int.T(
        help='Aggregated file size of files is selection.')
    counts = Dict.T(
        String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()),
        help='Breakdown of how many nuts of any content type and code '
             'sequence are available in selection, ``counts[kind][codes]``.')
    time_spans = Dict.T(
        String.T(), Tuple.T(content_t=Timestamp.T()),
        help='Time spans by content type.')
    sources = List.T(
        String.T(),
        help='Descriptions of attached sources.')
    operators = List.T(
        String.T(),
        help='Descriptions of attached operators.')

    def __str__(self):
        # Total number of nuts per kind, summed over all code sequences.
        kind_counts = dict(
            (kind, sum(self.counts[kind].values())) for kind in self.kinds)

        scodes = model.codes_to_str_abbreviated(self.codes)

        ssources = '<none>' if not self.sources else '\n' + '\n'.join(
            '  ' + s for s in self.sources)

        soperators = '<none>' if not self.operators else '\n' + '\n'.join(
            '  ' + s for s in self.operators)

        def stime(t):
            # Suppress the sentinel global time bounds in the output.
            return util.tts(t) if t is not None and t not in (
                model.g_tmin, model.g_tmax) else '<none>'

        def stable(rows):
            # Render rows as a left-justified, column-aligned table.
            ns = [max(len(w) for w in col) for col in zip(*rows)]
            return '\n'.join(
                ' '.join(w.ljust(n) for n, w in zip(ns, row))
                for row in rows)

        def indent(s):
            return '\n'.join('  '+line for line in s.splitlines())

        stspans = '<none>' if not self.kinds else '\n' + indent(stable([(
            kind + ':',
            str(kind_counts[kind]),
            stime(self.time_spans[kind][0]),
            '-',
            stime(self.time_spans[kind][1])) for kind in sorted(self.kinds)]))

        s = '''
Number of files:               %i
Total size of known files:     %s
Number of index nuts:          %i
Available content kinds:       %s
Available codes:               %s
Sources:                       %s
Operators:                     %s''' % (
            self.nfiles,
            util.human_bytesize(self.total_size),
            self.nnuts,
            stspans, scodes, ssources, soperators)

        return s.lstrip()
# Public API of this module.
__all__ = [
    'Squirrel',
    'SquirrelStats',
]