1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import sys
9import os
11import math
12import logging
13import threading
14import queue
15from collections import defaultdict
17from pyrocko.guts import Object, Int, List, Tuple, String, Timestamp, Dict
18from pyrocko import util, trace
19from pyrocko.progress import progress
21from . import model, io, cache, dataset
23from .model import to_kind_id, WaveformOrder, to_kind, to_codes, \
24 STATION, CHANNEL, RESPONSE, EVENT, WAVEFORM, codes_patterns_list, \
25 codes_patterns_for_kind
26from .client import fdsn, catalog
27from .selection import Selection, filldocs
28from .database import abspath
29from .operators.base import Operator, CodesPatternFiltering
30from . import client, environment, error
# Module-level logger within the 'psq' (pyrocko squirrel) logging hierarchy.
logger = logging.getLogger('psq.base')

# Namespace prefix used by guts for serializable classes in this module.
guts_prefix = 'squirrel'
def make_task(*args):
    '''
    Create a progress-reporting task bound to this module's logger.
    '''
    return progress.task(*args, logger=logger)
def lpick(condition, seq):
    '''
    Partition a sequence into two lists according to a predicate.

    :returns:
        Tuple ``(nomatch, match)`` where ``match`` holds the elements for
        which ``condition`` evaluates true and ``nomatch`` the rest.
    '''
    nomatch = []
    match = []
    for item in seq:
        if condition(item):
            match.append(item)
        else:
            nomatch.append(item)

    return nomatch, match
def blocks(tmin, tmax, deltat, nsamples_block=100000):
    '''
    Yield consecutive, aligned time intervals covering a given time span.

    The block duration is ``deltat * nsamples_block``; blocks are aligned
    to integer multiples of that duration.
    '''
    tblock = util.to_time_float(deltat * nsamples_block)
    kmin = int(math.floor(tmin / tblock))
    kmax = int(math.ceil(tmax / tblock))
    for k in range(kmin, kmax):
        yield k * tblock, (k+1) * tblock
def gaps(avail, tmin, tmax):
    '''
    Determine uncovered intervals within a query time span.

    :param avail:
        Available (covered) intervals as ``(tmin, tmax)`` pairs.
    :param tmin:
        Start of the query span (must be less than ``tmax``).
    :param tmax:
        End of the query span.

    :returns:
        List of ``(tmin_gap, tmax_gap)`` pairs of non-empty uncovered
        intervals.
    '''
    assert tmin < tmax

    # Sweep-line over interval edges: +1 opens coverage, -1 closes it.
    # The query span contributes inverted edges so that coverage level 1
    # means "covered or outside query span".
    events = [(tmax, 1), (tmin, -1)]
    for tmin_a, tmax_a in avail:
        assert tmin_a < tmax_a
        events.append((tmin_a, 1))
        events.append((tmax_a, -1))

    events.sort()

    result = []
    level = 1
    gap_start = None
    for t, step in events:
        if level == 1 and step == -1:
            gap_start = t
        elif level == 0 and step == 1 and gap_start is not None:
            if gap_start != t:
                result.append((gap_start, t))

        level += step

    return result
def order_key(order):
    '''
    Key function to sort or group waveform orders by codes and time span.
    '''
    return order.codes, order.tmin, order.tmax
87def _is_exact(pat):
88 return not ('*' in pat or '?' in pat or ']' in pat or '[' in pat)
class Batch(object):
    '''
    Batch of waveforms from window-wise data extraction.

    Encapsulates state and results yielded for each window in window-wise
    waveform extraction with the :py:meth:`Squirrel.chopper_waveforms`
    method.

    .. py:attribute:: tmin

        Start of this time window.

    .. py:attribute:: tmax

        End of this time window.

    .. py:attribute:: i

        Index of this time window in sequence.

    .. py:attribute:: n

        Total number of time windows in sequence.

    .. py:attribute:: traces

        Extracted waveforms for this time window.
    '''

    def __init__(self, tmin, tmax, i, n, traces):
        self.tmin, self.tmax = tmin, tmax
        self.i, self.n = i, n
        self.traces = traces
129class Squirrel(Selection):
130 '''
131 Prompt, lazy, indexing, caching, dynamic seismological dataset access.
133 :param env:
134 Squirrel environment instance or directory path to use as starting
135 point for its detection. By default, the current directory is used as
136 starting point. When searching for a usable environment the directory
137 ``'.squirrel'`` or ``'squirrel'`` in the current (or starting point)
138 directory is used if it exists, otherwise the parent directories are
    searched upwards for the existence of such a directory. If no such
140 directory is found, the user's global Squirrel environment
141 ``'$HOME/.pyrocko/squirrel'`` is used.
142 :type env:
143 :py:class:`~pyrocko.squirrel.environment.Environment` or
144 :py:class:`str`
146 :param database:
147 Database instance or path to database. By default the
148 database found in the detected Squirrel environment is used.
149 :type database:
150 :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`
152 :param cache_path:
153 Directory path to use for data caching. By default, the ``'cache'``
154 directory in the detected Squirrel environment is used.
155 :type cache_path:
156 :py:class:`str`
158 :param persistent:
159 If given a name, create a persistent selection.
160 :type persistent:
161 :py:class:`str`
163 This is the central class of the Squirrel framework. It provides a unified
164 interface to query and access seismic waveforms, station meta-data and
165 event information from local file collections and remote data sources. For
166 prompt responses, a profound database setup is used under the hood. To
167 speed up assemblage of ad-hoc data selections, files are indexed on first
168 use and the extracted meta-data is remembered in the database for
169 subsequent accesses. Bulk data is lazily loaded from disk and remote
170 sources, just when requested. Once loaded, data is cached in memory to
171 expedite typical access patterns. Files and data sources can be dynamically
172 added to and removed from the Squirrel selection at runtime.
174 Queries are restricted to the contents of the files currently added to the
175 Squirrel selection (usually a subset of the file meta-information
176 collection in the database). This list of files is referred to here as the
177 "selection". By default, temporary tables are created in the attached
178 database to hold the names of the files in the selection as well as various
179 indices and counters. These tables are only visible inside the application
180 which created them and are deleted when the database connection is closed
181 or the application exits. To create a selection which is not deleted at
182 exit, supply a name to the ``persistent`` argument of the Squirrel
183 constructor. Persistent selections are shared among applications using the
184 same database.
186 **Method summary**
188 Some of the methods are implemented in :py:class:`Squirrel`'s base class
189 :py:class:`~pyrocko.squirrel.selection.Selection`.
191 .. autosummary::
193 ~Squirrel.add
194 ~Squirrel.add_source
195 ~Squirrel.add_fdsn
196 ~Squirrel.add_catalog
197 ~Squirrel.add_dataset
198 ~Squirrel.add_virtual
199 ~Squirrel.update
200 ~Squirrel.update_waveform_promises
201 ~Squirrel.advance_accessor
202 ~Squirrel.clear_accessor
203 ~Squirrel.reload
204 ~pyrocko.squirrel.selection.Selection.iter_paths
205 ~Squirrel.iter_nuts
206 ~Squirrel.iter_kinds
207 ~Squirrel.iter_deltats
208 ~Squirrel.iter_codes
209 ~pyrocko.squirrel.selection.Selection.get_paths
210 ~Squirrel.get_nuts
211 ~Squirrel.get_kinds
212 ~Squirrel.get_deltats
213 ~Squirrel.get_codes
214 ~Squirrel.get_counts
215 ~Squirrel.get_time_span
216 ~Squirrel.get_deltat_span
217 ~Squirrel.get_nfiles
218 ~Squirrel.get_nnuts
219 ~Squirrel.get_total_size
220 ~Squirrel.get_stats
221 ~Squirrel.get_content
222 ~Squirrel.get_stations
223 ~Squirrel.get_channels
224 ~Squirrel.get_responses
225 ~Squirrel.get_events
226 ~Squirrel.get_waveform_nuts
227 ~Squirrel.get_waveforms
228 ~Squirrel.chopper_waveforms
229 ~Squirrel.get_coverage
230 ~Squirrel.pile
231 ~Squirrel.snuffle
232 ~Squirrel.glob_codes
233 ~pyrocko.squirrel.selection.Selection.get_database
234 ~Squirrel.print_tables
235 '''
    def __init__(
            self, env=None, database=None, cache_path=None, persistent=None):

        # Resolve environment: accept an Environment instance, or a
        # directory path / None to trigger auto-detection.
        if not isinstance(env, environment.Environment):
            env = environment.get_environment(env)

        # Fall back to environment defaults for anything not given
        # explicitly.
        if database is None:
            database = env.expand_path(env.database_path)

        if cache_path is None:
            cache_path = env.expand_path(env.cache_path)

        if persistent is None:
            persistent = env.persistent

        Selection.__init__(
            self, database=database, persistent=persistent)

        # Anchor relative paths stored in the database at the parent
        # directory of the environment's base path.
        self.get_database().set_basepath(os.path.dirname(env.get_basepath()))

        # Separate in-memory caches: waveform content is managed
        # independently of other (metadata) content.
        self._content_caches = {
            'waveform': cache.ContentCache(),
            'default': cache.ContentCache()}

        self._cache_path = cache_path

        self._sources = []
        self._operators = []
        self._operator_registry = {}

        self._pile = None
        self._n_choppers_active = 0

        # Names of the per-selection tables backing this instance.
        self._names.update({
            'nuts': self.name + '_nuts',
            'kind_codes_count': self.name + '_kind_codes_count',
            'coverage': self.name + '_coverage'})

        with self.transaction('create tables') as cursor:
            self._create_tables_squirrel(cursor)
    def _create_tables_squirrel(self, cursor):
        '''
        Create per-selection tables, indices and triggers (if missing).

        Sets up the ``nuts``, ``kind_codes_count`` and ``coverage`` tables
        for this selection together with the triggers keeping them
        consistent with the file tables.
        '''

        # Content index: one row ("nut") per content snippet of a file,
        # with split-time representation (seconds + offset) and a duration
        # scale class (kscale) used for indexed time-range queries.
        cursor.execute(self._register_table(self._sql(
            '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(nuts)s (
                    nut_id integer PRIMARY KEY,
                    file_id integer,
                    file_segment integer,
                    file_element integer,
                    kind_id integer,
                    kind_codes_id integer,
                    tmin_seconds integer,
                    tmin_offset integer,
                    tmax_seconds integer,
                    tmax_offset integer,
                    kscale integer)
            ''')))

        # Per kind/codes combination: number of nuts currently in the
        # selection (maintained by the inc/dec triggers below).
        cursor.execute(self._register_table(self._sql(
            '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(kind_codes_count)s (
                    kind_codes_id integer PRIMARY KEY,
                    count integer)
            ''')))

        # Uniqueness on (file_id, file_segment, file_element) prevents
        # duplicate nut entries per file element.
        cursor.execute(self._sql(
            '''
                CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(nuts)s_file_element
                    ON %(nuts)s (file_id, file_segment, file_element)
            '''))

        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_file_id
                    ON %(nuts)s (file_id)
            '''))

        # Time-range query support (see _timerange_sql).
        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmin_seconds
                    ON %(nuts)s (kind_id, tmin_seconds)
            '''))

        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmax_seconds
                    ON %(nuts)s (kind_id, tmax_seconds)
            '''))

        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_kscale
                    ON %(nuts)s (kind_id, kscale, tmin_seconds)
            '''))

        # Cascade: removing a file removes its nuts from the selection.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts
                BEFORE DELETE ON main.files FOR EACH ROW
                BEGIN
                  DELETE FROM %(nuts)s WHERE file_id == old.file_id;
                END
            '''))

        # trigger only on size to make silent update of mtime possible
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts2
                BEFORE UPDATE OF size ON main.files FOR EACH ROW
                BEGIN
                  DELETE FROM %(nuts)s WHERE file_id == old.file_id;
                END
            '''))

        # Cascade: removing a file from the selection removes its nuts.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS
                    %(db)s.%(file_states)s_delete_files
                BEFORE DELETE ON %(db)s.%(file_states)s FOR EACH ROW
                BEGIN
                    DELETE FROM %(nuts)s WHERE file_id == old.file_id;
                END
            '''))

        # Keep kind_codes_count in sync with nut insertions/deletions.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_inc_kind_codes
                BEFORE INSERT ON %(nuts)s FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO %(kind_codes_count)s VALUES
                    (new.kind_codes_id, 0);
                    UPDATE %(kind_codes_count)s
                    SET count = count + 1
                    WHERE new.kind_codes_id
                        == %(kind_codes_count)s.kind_codes_id;
                END
            '''))

        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_dec_kind_codes
                BEFORE DELETE ON %(nuts)s FOR EACH ROW
                BEGIN
                    UPDATE %(kind_codes_count)s
                    SET count = count - 1
                    WHERE old.kind_codes_id
                        == %(kind_codes_count)s.kind_codes_id;
                END
            '''))

        # Coverage change-point table: per kind/codes, 'step' accumulates
        # +1 at content start times and -1 at content end times.
        cursor.execute(self._register_table(self._sql(
            '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(coverage)s (
                    kind_codes_id integer,
                    time_seconds integer,
                    time_offset integer,
                    step integer)
            ''')))

        cursor.execute(self._sql(
            '''
                CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(coverage)s_time
                    ON %(coverage)s (kind_codes_id, time_seconds, time_offset)
            '''))

        # On nut insert: bump step at tmin (+1) and tmax (-1); drop
        # change-points which cancel out to zero.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_add_coverage
                AFTER INSERT ON %(nuts)s FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (new.kind_codes_id, new.tmin_seconds, new.tmin_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step + 1
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmin_seconds == %(coverage)s.time_seconds
                        AND new.tmin_offset == %(coverage)s.time_offset
                    ;
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (new.kind_codes_id, new.tmax_seconds, new.tmax_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step - 1
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmax_seconds == %(coverage)s.time_seconds
                        AND new.tmax_offset == %(coverage)s.time_offset
                    ;
                    DELETE FROM %(coverage)s
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmin_seconds == %(coverage)s.time_seconds
                        AND new.tmin_offset == %(coverage)s.time_offset
                        AND step == 0
                    ;
                    DELETE FROM %(coverage)s
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmax_seconds == %(coverage)s.time_seconds
                        AND new.tmax_offset == %(coverage)s.time_offset
                        AND step == 0
                    ;
                END
            '''))

        # On nut delete: inverse of the add_coverage trigger.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_remove_coverage
                BEFORE DELETE ON %(nuts)s FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (old.kind_codes_id, old.tmin_seconds, old.tmin_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step - 1
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmin_seconds == %(coverage)s.time_seconds
                        AND old.tmin_offset == %(coverage)s.time_offset
                    ;
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (old.kind_codes_id, old.tmax_seconds, old.tmax_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step + 1
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmax_seconds == %(coverage)s.time_seconds
                        AND old.tmax_offset == %(coverage)s.time_offset
                    ;
                    DELETE FROM %(coverage)s
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmin_seconds == %(coverage)s.time_seconds
                        AND old.tmin_offset == %(coverage)s.time_offset
                        AND step == 0
                    ;
                    DELETE FROM %(coverage)s
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmax_seconds == %(coverage)s.time_seconds
                        AND old.tmax_offset == %(coverage)s.time_offset
                        AND step == 0
                    ;
                END
            '''))
    def _delete(self):
        '''Delete database tables associated with this Squirrel.'''

        with self.transaction('delete tables') as cursor:
            # Drop triggers before their tables. The coverage-related
            # statements use IF EXISTS — presumably because selections
            # created by older versions may lack them (confirm).
            for s in '''
                    DROP TRIGGER %(db)s.%(nuts)s_delete_nuts;
                    DROP TRIGGER %(db)s.%(nuts)s_delete_nuts2;
                    DROP TRIGGER %(db)s.%(file_states)s_delete_files;
                    DROP TRIGGER %(db)s.%(nuts)s_inc_kind_codes;
                    DROP TRIGGER %(db)s.%(nuts)s_dec_kind_codes;
                    DROP TABLE %(db)s.%(nuts)s;
                    DROP TABLE %(db)s.%(kind_codes_count)s;
                    DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_add_coverage;
                    DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_remove_coverage;
                    DROP TABLE IF EXISTS %(db)s.%(coverage)s;
                    '''.strip().splitlines():

                cursor.execute(self._sql(s))

        Selection._delete(self)
    @filldocs
    def add(self,
            paths,
            kinds=None,
            format='detect',
            include=None,
            exclude=None,
            check=True):

        '''
        Add files to the selection.

        :param paths:
            Iterator yielding paths to files or directories to be added to the
            selection. Recurses into directories. If given a ``str``, it
            is treated as a single path to be added.
        :type paths:
            :py:class:`list` of :py:class:`str`

        :param kinds:
            Content types to be made available through the Squirrel selection.
            By default, all known content types are accepted.
        :type kinds:
            :py:class:`list` of :py:class:`str`

        :param format:
            File format identifier or ``'detect'`` to enable auto-detection
            (available: %(file_formats)s).
        :type format:
            str

        :param include:
            If not ``None``, files are only included if their paths match the
            given regular expression pattern.
        :type include:
            str

        :param exclude:
            If not ``None``, files are only included if their paths do not
            match the given regular expression pattern.
        :type exclude:
            str

        :param check:
            If ``True``, all file modification times are checked to see if
            cached information has to be updated (slow). If ``False``, only
            previously unknown files are indexed and cached information is used
            for known files, regardless of file state (fast, corresponds to
            Squirrel's ``--optimistic`` mode). File deletions will go
            undetected in the latter case.
        :type check:
            bool

        :Complexity:
            O(log N)
        '''

        if isinstance(kinds, str):
            kinds = (kinds,)

        if isinstance(paths, str):
            paths = [paths]

        # Bitmask of enabled content kinds, stored per file in the selection.
        kind_mask = model.to_kind_mask(kinds)

        with progress.view():
            Selection.add(
                self, util.iter_select_files(
                    paths,
                    show_progress=False,
                    include=include,
                    exclude=exclude,
                    # Virtual paths are passed through unmodified, they do
                    # not exist on disk.
                    pass_through=lambda path: path.startswith('virtual:')
                ), kind_mask, format)

        self._load(check)
        self._update_nuts()
    def reload(self):
        '''
        Check for modifications and reindex modified files.

        Based on file modification times.
        '''

        # Force modification-time checks on all files in the selection,
        # then rescan and rebuild the selection's nut index.
        self._set_file_states_force_check()
        self._load(check=True)
        self._update_nuts()
589 def add_virtual(self, nuts, virtual_paths=None):
590 '''
591 Add content which is not backed by files.
593 :param nuts:
594 Content pieces to be added.
595 :type nuts:
596 iterator yielding :py:class:`~pyrocko.squirrel.model.Nut` objects
598 :param virtual_paths:
599 List of virtual paths to prevent creating a temporary list of the
600 nuts while aggregating the file paths for the selection.
601 :type virtual_paths:
602 :py:class:`list` of :py:class:`str`
604 Stores to the main database and the selection.
605 '''
607 if isinstance(virtual_paths, str):
608 virtual_paths = [virtual_paths]
610 if virtual_paths is None:
611 if not isinstance(nuts, list):
612 nuts = list(nuts)
613 virtual_paths = set(nut.file_path for nut in nuts)
615 Selection.add(self, virtual_paths)
616 self.get_database().dig(nuts)
617 self._update_nuts()
619 def add_volatile(self, nuts):
620 if not isinstance(nuts, list):
621 nuts = list(nuts)
623 paths = list(set(nut.file_path for nut in nuts))
624 io.backends.virtual.add_nuts(nuts)
625 self.add_virtual(nuts, paths)
626 self._volatile_paths.extend(paths)
    def add_volatile_waveforms(self, traces):
        '''
        Add in-memory waveforms which will be removed when the app closes.

        :param traces:
            Waveforms to add. Each object must provide ``tmin``, ``tmax``,
            ``deltat``, ``codes`` and ``data_len()`` (e.g.
            :py:class:`pyrocko.trace.Trace`).

        :returns:
            Virtual path under which the waveforms were registered.
        '''

        name = model.random_name()

        path = 'virtual:volatile:%s' % name

        nuts = []
        for itr, tr in enumerate(traces):
            assert tr.tmin <= tr.tmax
            # Split times into (seconds, offset) components via
            # model.tsplit, matching the nut table's time representation.
            tmin_seconds, tmin_offset = model.tsplit(tr.tmin)
            tmax_seconds, tmax_offset = model.tsplit(
                tr.tmin + tr.data_len()*tr.deltat)

            nuts.append(model.Nut(
                file_path=path,
                file_format='virtual',
                file_segment=itr,
                file_element=0,
                file_mtime=0,
                codes=tr.codes,
                tmin_seconds=tmin_seconds,
                tmin_offset=tmin_offset,
                tmax_seconds=tmax_seconds,
                tmax_offset=tmax_offset,
                deltat=tr.deltat,
                kind_id=to_kind_id('waveform'),
                content=tr))

        self.add_volatile(nuts)
        return path
    def _load(self, check):
        # Index all files in the selection: iterate the loader with no
        # content kinds requested so that only metadata is
        # extracted/updated; the yielded values are irrelevant.
        for _ in io.iload(
                self,
                content=[],
                skip_unchanged=True,
                check=check):
            pass
    def _update_nuts(self, transaction=None):
        '''
        Copy new nut rows from the global index into the selection's table.

        Only files whose state is not yet marked as known
        (``file_state != 2``) contribute, and only nuts whose content kind
        is enabled in the file's kind mask.
        '''
        transaction = transaction or self.transaction('update nuts')
        with make_task('Aggregating selection') as task, \
                transaction as cursor:

            # Report progress via SQLite's progress handler (fires about
            # every 100000 VM instructions).
            self._conn.set_progress_handler(task.update, 100000)
            nrows = cursor.execute(self._sql(
                '''
                    INSERT INTO %(db)s.%(nuts)s
                    SELECT NULL,
                        nuts.file_id, nuts.file_segment, nuts.file_element,
                        nuts.kind_id, nuts.kind_codes_id,
                        nuts.tmin_seconds, nuts.tmin_offset,
                        nuts.tmax_seconds, nuts.tmax_offset,
                        nuts.kscale
                    FROM %(db)s.%(file_states)s
                    INNER JOIN nuts
                        ON %(db)s.%(file_states)s.file_id == nuts.file_id
                    INNER JOIN kind_codes
                        ON nuts.kind_codes_id ==
                           kind_codes.kind_codes_id
                    WHERE %(db)s.%(file_states)s.file_state != 2
                        AND (((1 << kind_codes.kind_id)
                              & %(db)s.%(file_states)s.kind_mask) != 0)
                ''')).rowcount

            task.update(nrows)
            self._set_file_states_known(transaction)
            self._conn.set_progress_handler(None, 0)
    def add_source(self, source, check=True):
        '''
        Add remote resource.

        :param source:
            Remote data access client instance.
        :type source:
            subclass of :py:class:`~pyrocko.squirrel.client.base.Source`

        :param check:
            Forwarded to the source's ``setup`` method — presumably
            controls checking of cached files' modification times (confirm
            with the source implementation).
        :type check:
            bool
        '''

        self._sources.append(source)
        source.setup(self, check=check)
    def add_fdsn(self, *args, **kwargs):
        '''
        Add FDSN site for transparent remote data access.

        Arguments are passed to
        :py:class:`~pyrocko.squirrel.client.fdsn.FDSNSource`.
        '''

        # Convenience shortcut for add_source with an FDSN client.
        self.add_source(fdsn.FDSNSource(*args, **kwargs))
    def add_catalog(self, *args, **kwargs):
        '''
        Add online catalog for transparent event data access.

        Arguments are passed to
        :py:class:`~pyrocko.squirrel.client.catalog.CatalogSource`.
        '''

        # Convenience shortcut for add_source with a catalog client.
        self.add_source(catalog.CatalogSource(*args, **kwargs))
733 def add_dataset(self, ds, check=True, warn_persistent=True):
734 '''
735 Read dataset description from file and add its contents.
737 :param ds:
738 Path to dataset description file or dataset description object
739 . See :py:mod:`~pyrocko.squirrel.dataset`.
740 :type ds:
741 :py:class:`str` or :py:class:`~pyrocko.squirrel.dataset.Dataset`
743 :param check:
744 If ``True``, all file modification times are checked to see if
745 cached information has to be updated (slow). If ``False``, only
746 previously unknown files are indexed and cached information is used
747 for known files, regardless of file state (fast, corrresponds to
748 Squirrel's ``--optimistic`` mode). File deletions will go
749 undetected in the latter case.
750 :type check:
751 bool
752 '''
753 if isinstance(ds, str):
754 ds = dataset.read_dataset(ds)
755 path = ds
756 else:
757 path = None
759 if warn_persistent and ds.persistent and (
760 not self._persistent or (self._persistent != ds.persistent)):
762 logger.warning(
763 'Dataset `persistent` flag ignored. Can not be set on already '
764 'existing Squirrel instance.%s' % (
765 ' Dataset: %s' % path if path else ''))
767 ds.setup(self, check=check)
769 def _get_selection_args(
770 self, kind_id,
771 obj=None, tmin=None, tmax=None, time=None, codes=None):
773 if codes is not None:
774 codes = codes_patterns_for_kind(kind_id, codes)
776 if time is not None:
777 tmin = time
778 tmax = time
780 if obj is not None:
781 tmin = tmin if tmin is not None else obj.tmin
782 tmax = tmax if tmax is not None else obj.tmax
783 codes = codes if codes is not None else codes_patterns_for_kind(
784 kind_id, obj.codes)
786 return tmin, tmax, codes
788 def _get_selection_args_str(self, *args, **kwargs):
790 tmin, tmax, codes = self._get_selection_args(*args, **kwargs)
791 return 'tmin: %s, tmax: %s, codes: %s' % (
792 util.time_to_str(tmin) if tmin is not None else 'none',
793 util.time_to_str(tmax) if tmin is not None else 'none',
794 ','.join(str(entry) for entry in codes))
796 def _selection_args_to_kwargs(
797 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
799 return dict(obj=obj, tmin=tmin, tmax=tmax, time=time, codes=codes)
    def _timerange_sql(self, tmin, tmax, kind, cond, args, naiv):
        '''
        Append SQL conditions and bind arguments for a time-range query.

        Extends ``cond`` and ``args`` in place. With ``naiv=True`` a
        simple start-time condition is used (slow, for testing); otherwise
        the query is split by ``kscale`` duration classes so the
        ``(kind_id, kscale, tmin_seconds)`` index can be used.
        '''

        tmin_seconds, tmin_offset = model.tsplit(tmin)
        tmax_seconds, tmax_offset = model.tsplit(tmax)
        if naiv:
            cond.append('%(db)s.%(nuts)s.tmin_seconds <= ?')
            args.append(tmax_seconds)
        else:
            tscale_edges = model.tscale_edges
            tmin_cond = []
            for kscale in range(tscale_edges.size + 1):
                if kscale != tscale_edges.size:
                    # Bounded duration class: nuts here span at most
                    # ``tscale`` seconds, so their start must lie within
                    # the query window extended by that much at the lower
                    # end (the +/-1 margins cover the offset components).
                    tscale = int(tscale_edges[kscale])
                    tmin_cond.append('''
                        (%(db)s.%(nuts)s.kind_id = ?
                         AND %(db)s.%(nuts)s.kscale == ?
                         AND %(db)s.%(nuts)s.tmin_seconds BETWEEN ? AND ?)
                    ''')
                    args.extend(
                        (to_kind_id(kind), kscale,
                         tmin_seconds - tscale - 1, tmax_seconds + 1))

                else:
                    # Largest (unbounded) duration class: no lower limit
                    # can be placed on the start time.
                    tmin_cond.append('''
                        (%(db)s.%(nuts)s.kind_id == ?
                         AND %(db)s.%(nuts)s.kscale == ?
                         AND %(db)s.%(nuts)s.tmin_seconds <= ?)
                    ''')

                    args.extend(
                        (to_kind_id(kind), kscale, tmax_seconds + 1))
            if tmin_cond:
                cond.append(' ( ' + ' OR '.join(tmin_cond) + ' ) ')

        # Common upper-edge condition: content must end at or after the
        # query start.
        cond.append('%(db)s.%(nuts)s.tmax_seconds >= ?')
        args.append(tmin_seconds)
838 def _codes_match_sql(self, kind_id, codes, cond, args):
839 pats = codes_patterns_for_kind(kind_id, codes)
840 if pats is None:
841 return
843 pats_exact = []
844 pats_nonexact = []
845 for pat in pats:
846 spat = pat.safe_str
847 (pats_exact if _is_exact(spat) else pats_nonexact).append(spat)
849 cond_exact = None
850 if pats_exact:
851 cond_exact = ' ( kind_codes.codes IN ( %s ) ) ' % ', '.join(
852 '?'*len(pats_exact))
854 args.extend(pats_exact)
856 cond_nonexact = None
857 if pats_nonexact:
858 cond_nonexact = ' ( %s ) ' % ' OR '.join(
859 ('kind_codes.codes GLOB ?',) * len(pats_nonexact))
861 args.extend(pats_nonexact)
863 if cond_exact and cond_nonexact:
864 cond.append(' ( %s OR %s ) ' % (cond_exact, cond_nonexact))
866 elif cond_exact:
867 cond.append(cond_exact)
869 elif cond_nonexact:
870 cond.append(cond_nonexact)
    def iter_nuts(
            self, kind=None, tmin=None, tmax=None, codes=None, naiv=False,
            kind_codes_ids=None, path=None):

        '''
        Iterate over content entities matching given constraints.

        :param kind:
            Content kind (or kinds) to extract.
        :type kind:
            :py:class:`str`, :py:class:`list` of :py:class:`str`

        :param tmin:
            Start time of query interval.
        :type tmin:
            timestamp

        :param tmax:
            End time of query interval.
        :type tmax:
            timestamp

        :param codes:
            List of code patterns to query.
        :type codes:
            :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
            objects appropriate for the queried content type, or anything which
            can be converted to such objects.

        :param naiv:
            Bypass time span lookup through indices (slow, for testing).
        :type naiv:
            :py:class:`bool`

        :param kind_codes_ids:
            Kind-codes IDs of contents to be retrieved (internal use).
        :type kind_codes_ids:
            :py:class:`list` of :py:class:`int`

        :param path:
            If given, restrict query to contents of the file at this path
            (internal use).
        :type path:
            :py:class:`str`

        :yields:
            :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
            intersecting content.

        :complexity:
            O(log N) for the time selection part due to heavy use of database
            indices.

        Query time span is treated as a half-open interval ``[tmin, tmax)``.
        However, if ``tmin`` equals ``tmax``, the edge logics are modified to
        closed-interval so that content intersecting with the time instant ``t
        = tmin = tmax`` is returned (otherwise nothing would be returned as
        ``[t, t)`` never matches anything).

        Time spans of content entities to be matched are also treated as half
        open intervals, e.g. content span ``[0, 1)`` is matched by query span
        ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``. Also here, logics are
        modified to closed-interval when the content time span is an empty
        interval, i.e. to indicate a time instant. E.g. time instant 0 is
        matched by ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``.
        '''

        if not isinstance(kind, str):
            # Multiple (or all) kinds requested: recurse once per kind.
            # NOTE(review): ``naiv``, ``kind_codes_ids`` and ``path`` are
            # not forwarded on this path — confirm this is intended.
            if kind is None:
                kind = model.g_content_kinds
            for kind_ in kind:
                for nut in self.iter_nuts(kind_, tmin, tmax, codes):
                    yield nut

            return

        kind_id = to_kind_id(kind)

        # Build WHERE conditions and bind arguments incrementally.
        cond = []
        args = []
        if tmin is not None or tmax is not None:
            assert kind is not None
            if tmin is None:
                tmin = self.get_time_span()[0]
            if tmax is None:
                tmax = self.get_time_span()[1] + 1.0

            self._timerange_sql(tmin, tmax, kind, cond, args, naiv)

        cond.append('kind_codes.kind_id == ?')
        args.append(kind_id)

        if codes is not None:
            self._codes_match_sql(kind_id, codes, cond, args)

        if kind_codes_ids is not None:
            cond.append(
                ' ( kind_codes.kind_codes_id IN ( %s ) ) ' % ', '.join(
                    '?'*len(kind_codes_ids)))

            args.extend(kind_codes_ids)

        db = self.get_database()
        if path is not None:
            cond.append('files.path == ?')
            args.append(db.relpath(abspath(path)))

        sql = ('''
            SELECT
                files.path,
                files.format,
                files.mtime,
                files.size,
                %(db)s.%(nuts)s.file_segment,
                %(db)s.%(nuts)s.file_element,
                kind_codes.kind_id,
                kind_codes.codes,
                %(db)s.%(nuts)s.tmin_seconds,
                %(db)s.%(nuts)s.tmin_offset,
                %(db)s.%(nuts)s.tmax_seconds,
                %(db)s.%(nuts)s.tmax_offset,
                kind_codes.deltat
            FROM files
            INNER JOIN %(db)s.%(nuts)s
                ON files.file_id == %(db)s.%(nuts)s.file_id
            INNER JOIN kind_codes
                ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id
            ''')

        if cond:
            sql += ''' WHERE ''' + ' AND '.join(cond)

        sql = self._sql(sql)
        if tmin is None and tmax is None:
            # No time constraint: yield everything matching the other
            # conditions.
            for row in self._conn.execute(sql, args):
                row = (db.abspath(row[0]),) + row[1:]
                nut = model.Nut(values_nocheck=row)
                yield nut
        else:
            assert tmin is not None and tmax is not None
            if tmin == tmax:
                # Time-instant query: closed-interval edge logics (see
                # docstring).
                for row in self._conn.execute(sql, args):
                    row = (db.abspath(row[0]),) + row[1:]
                    nut = model.Nut(values_nocheck=row)
                    if (nut.tmin <= tmin < nut.tmax) \
                            or (nut.tmin == nut.tmax and tmin == nut.tmin):

                        yield nut
            else:
                # Half-open interval query; content time instants
                # (tmin == tmax) get closed-interval treatment.
                for row in self._conn.execute(sql, args):
                    row = (db.abspath(row[0]),) + row[1:]
                    nut = model.Nut(values_nocheck=row)
                    if (tmin < nut.tmax and nut.tmin < tmax) \
                            or (nut.tmin == nut.tmax
                                and tmin <= nut.tmin < tmax):

                        yield nut
1024 def get_nuts(self, *args, **kwargs):
1025 '''
1026 Get content entities matching given constraints.
1028 Like :py:meth:`iter_nuts` but returns results as a list.
1029 '''
1031 return list(self.iter_nuts(*args, **kwargs))
    def _split_nuts(
            self, kind, tmin=None, tmax=None, codes=None, path=None):
        '''
        Cut a time interval out of matching nut index entries.

        Entries intersecting ``[tmin, tmax)`` are deleted; where an entry
        sticks out of the interval, replacement entries covering only the
        left-over part(s) are inserted. Both the selection's nut table and
        the main database's nut table are modified.
        '''

        kind_id = to_kind_id(kind)
        tmin_seconds, tmin_offset = model.tsplit(tmin)
        tmax_seconds, tmax_offset = model.tsplit(tmax)

        # Template substitutions to run the same SQL against the main
        # database's nut table.
        names_main_nuts = dict(self._names)
        names_main_nuts.update(db='main', nuts='nuts')

        db = self.get_database()

        def main_nuts(s):
            return s % names_main_nuts

        with self.transaction('split nuts') as cursor:
            # modify selection and main
            for sql_subst in [
                    self._sql, main_nuts]:

                cond = []
                args = []

                self._timerange_sql(tmin, tmax, kind, cond, args, False)

                if codes is not None:
                    self._codes_match_sql(kind_id, codes, cond, args)

                if path is not None:
                    cond.append('files.path == ?')
                    args.append(db.relpath(abspath(path)))

                sql = sql_subst('''
                    SELECT
                        %(db)s.%(nuts)s.nut_id,
                        %(db)s.%(nuts)s.tmin_seconds,
                        %(db)s.%(nuts)s.tmin_offset,
                        %(db)s.%(nuts)s.tmax_seconds,
                        %(db)s.%(nuts)s.tmax_offset,
                        kind_codes.deltat
                    FROM files
                    INNER JOIN %(db)s.%(nuts)s
                        ON files.file_id == %(db)s.%(nuts)s.file_id
                    INNER JOIN kind_codes
                        ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id
                    WHERE ''' + ' AND '.join(cond))  # noqa

                insert = []
                delete = []
                for row in cursor.execute(sql, args):
                    nut_id, nut_tmin_seconds, nut_tmin_offset, \
                        nut_tmax_seconds, nut_tmax_offset, nut_deltat = row

                    nut_tmin = model.tjoin(
                        nut_tmin_seconds, nut_tmin_offset)
                    nut_tmax = model.tjoin(
                        nut_tmax_seconds, nut_tmax_offset)

                    if nut_tmin < tmax and tmin < nut_tmax:
                        # Keep the part of the nut before the cut interval.
                        if nut_tmin < tmin:
                            insert.append((
                                nut_tmin_seconds, nut_tmin_offset,
                                tmin_seconds, tmin_offset,
                                model.tscale_to_kscale(
                                    tmin_seconds - nut_tmin_seconds),
                                nut_id))

                        # Keep the part of the nut after the cut interval.
                        if tmax < nut_tmax:
                            insert.append((
                                tmax_seconds, tmax_offset,
                                nut_tmax_seconds, nut_tmax_offset,
                                model.tscale_to_kscale(
                                    nut_tmax_seconds - tmax_seconds),
                                nut_id))

                        delete.append((nut_id,))

                # Replacement entries clone the original row's file and
                # kind attributes, with the adjusted time span and kscale.
                sql_add = '''
                    INSERT INTO %(db)s.%(nuts)s (
                            file_id, file_segment, file_element, kind_id,
                            kind_codes_id, tmin_seconds, tmin_offset,
                            tmax_seconds, tmax_offset, kscale )
                    SELECT
                            file_id, file_segment, file_element,
                            kind_id, kind_codes_id, ?, ?, ?, ?, ?
                    FROM %(db)s.%(nuts)s
                    WHERE nut_id == ?
                '''
                cursor.executemany(sql_subst(sql_add), insert)

                sql_delete = '''
                    DELETE FROM %(db)s.%(nuts)s WHERE nut_id == ?
                '''
                cursor.executemany(sql_subst(sql_delete), delete)
    def get_time_span(self, kinds=None):
        '''
        Get time interval over all content in selection.

        :param kinds:
            If not ``None``, restrict query to given content kinds.
        :type kinds:
            list of str

        :complexity:
            O(1), independent of the number of nuts.

        :returns:
            ``(tmin, tmax)``, combined time interval of queried content kinds.
        '''

        # Two-stage queries: the subselect finds the extreme seconds value,
        # the outer query picks the extreme offset component among rows
        # sharing that seconds value.
        sql_min = self._sql('''
            SELECT MIN(tmin_seconds), MIN(tmin_offset)
            FROM %(db)s.%(nuts)s
            WHERE kind_id == ?
                AND tmin_seconds == (
                    SELECT MIN(tmin_seconds)
                    FROM %(db)s.%(nuts)s
                    WHERE kind_id == ?)
        ''')

        sql_max = self._sql('''
            SELECT MAX(tmax_seconds), MAX(tmax_offset)
            FROM %(db)s.%(nuts)s
            WHERE kind_id == ?
                AND tmax_seconds == (
                    SELECT MAX(tmax_seconds)
                    FROM %(db)s.%(nuts)s
                    WHERE kind_id == ?)
        ''')

        gtmin = None
        gtmax = None

        if isinstance(kinds, str):
            kinds = [kinds]

        if kinds is None:
            kind_ids = model.g_content_kind_ids
        else:
            kind_ids = model.to_kind_ids(kinds)

        # Combine per-kind extrema into a global span.
        for kind_id in kind_ids:
            for tmin_seconds, tmin_offset in self._conn.execute(
                    sql_min, (kind_id, kind_id)):

                tmin = model.tjoin(tmin_seconds, tmin_offset)
                if tmin is not None and (gtmin is None or tmin < gtmin):
                    gtmin = tmin

            for (tmax_seconds, tmax_offset) in self._conn.execute(
                    sql_max, (kind_id, kind_id)):

                tmax = model.tjoin(tmax_seconds, tmax_offset)
                if tmax is not None and (gtmax is None or tmax > gtmax):
                    gtmax = tmax

        return gtmin, gtmax
1190 def has(self, kinds):
1191 '''
1192 Check availability of given content kinds.
1194 :param kinds:
1195 Content kinds to query.
1196 :type kind:
1197 list of str
1199 :returns:
1200 ``True`` if any of the queried content kinds is available
1201 in the selection.
1202 '''
1203 self_tmin, self_tmax = self.get_time_span(kinds)
1205 return None not in (self_tmin, self_tmax)
1207 def get_deltat_span(self, kind):
1208 '''
1209 Get min and max sampling interval of all content of given kind.
1211 :param kind:
1212 Content kind
1213 :type kind:
1214 str
1216 :returns: ``(deltat_min, deltat_max)``
1217 '''
1219 deltats = [
1220 deltat for deltat in self.get_deltats(kind)
1221 if deltat is not None]
1223 if deltats:
1224 return min(deltats), max(deltats)
1225 else:
1226 return None, None
1228 def iter_kinds(self, codes=None):
1229 '''
1230 Iterate over content types available in selection.
1232 :param codes:
1233 If given, get kinds only for selected codes identifier.
1234 Only a single identifier may be given here and no pattern matching
1235 is done, currently.
1236 :type codes:
1237 :py:class:`~pyrocko.squirrel.model.Codes`
1239 :yields:
1240 Available content kinds as :py:class:`str`.
1242 :complexity:
1243 O(1), independent of number of nuts.
1244 '''
1246 return self._database._iter_kinds(
1247 codes=codes,
1248 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1250 def iter_deltats(self, kind=None):
1251 '''
1252 Iterate over sampling intervals available in selection.
1254 :param kind:
1255 If given, get sampling intervals only for a given content type.
1256 :type kind:
1257 str
1259 :yields:
1260 :py:class:`float` values.
1262 :complexity:
1263 O(1), independent of number of nuts.
1264 '''
1265 return self._database._iter_deltats(
1266 kind=kind,
1267 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1269 def iter_codes(self, kind=None):
1270 '''
1271 Iterate over content identifier code sequences available in selection.
1273 :param kind:
1274 If given, get codes only for a given content type.
1275 :type kind:
1276 str
1278 :yields:
1279 :py:class:`tuple` of :py:class:`str`
1281 :complexity:
1282 O(1), independent of number of nuts.
1283 '''
1284 return self._database._iter_codes(
1285 kind=kind,
1286 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
    def _iter_codes_info(self, kind=None, codes=None):
        '''
        Iterate over number of occurrences of any (kind, codes) combination.

        :param kind:
            If given, get counts only for selected content type.
        :type kind:
            str

        :param codes:
            If given, restrict to matching codes (passed through to the
            database query).
        :type codes:
            :py:class:`~pyrocko.squirrel.model.Codes`

        :yields:
            Tuples of the form ``(kind, codes, deltat, kind_codes_id, count)``.

        :complexity:
            O(1), independent of number of nuts.
        '''
        # Delegate to the database, pointing it at this selection's
        # kind_codes_count table.
        return self._database._iter_codes_info(
            kind=kind,
            codes=codes,
            kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1308 def get_kinds(self, codes=None):
1309 '''
1310 Get content types available in selection.
1312 :param codes:
1313 If given, get kinds only for selected codes identifier.
1314 Only a single identifier may be given here and no pattern matching
1315 is done, currently.
1316 :type codes:
1317 :py:class:`~pyrocko.squirrel.model.Codes`
1319 :returns:
1320 Sorted list of available content types.
1321 :rtype:
1322 py:class:`list` of :py:class:`str`
1324 :complexity:
1325 O(1), independent of number of nuts.
1327 '''
1328 return sorted(list(self.iter_kinds(codes=codes)))
1330 def get_deltats(self, kind=None):
1331 '''
1332 Get sampling intervals available in selection.
1334 :param kind:
1335 If given, get sampling intervals only for selected content type.
1336 :type kind:
1337 str
1339 :complexity:
1340 O(1), independent of number of nuts.
1342 :returns: Sorted list of available sampling intervals.
1343 '''
1344 return sorted(list(self.iter_deltats(kind=kind)))
1346 def get_codes(self, kind=None):
1347 '''
1348 Get identifier code sequences available in selection.
1350 :param kind:
1351 If given, get codes only for selected content type.
1352 :type kind:
1353 str
1355 :complexity:
1356 O(1), independent of number of nuts.
1358 :returns: Sorted list of available codes as tuples of strings.
1359 '''
1360 return sorted(list(self.iter_codes(kind=kind)))
1362 def get_counts(self, kind=None):
1363 '''
1364 Get number of occurrences of any (kind, codes) combination.
1366 :param kind:
1367 If given, get codes only for selected content type.
1368 :type kind:
1369 str
1371 :complexity:
1372 O(1), independent of number of nuts.
1374 :returns: ``dict`` with ``counts[kind][codes]`` or ``counts[codes]``
1375 if kind is not ``None``
1376 '''
1377 d = {}
1378 for kind_id, codes, _, _, count in self._iter_codes_info(kind=kind):
1379 if kind_id not in d:
1380 v = d[kind_id] = {}
1381 else:
1382 v = d[kind_id]
1384 if codes not in v:
1385 v[codes] = 0
1387 v[codes] += count
1389 if kind is not None:
1390 return d[to_kind_id(kind)]
1391 else:
1392 return dict((to_kind(kind_id), v) for (kind_id, v) in d.items())
1394 def glob_codes(self, kind, codes):
1395 '''
1396 Find codes matching given patterns.
1398 :param kind:
1399 Content kind to be queried.
1400 :type kind:
1401 str
1403 :param codes:
1404 List of code patterns to query.
1405 :type codes:
1406 :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
1407 objects appropriate for the queried content type, or anything which
1408 can be converted to such objects.
1410 :returns:
1411 List of matches of the form ``[kind_codes_id, codes, deltat]``.
1412 '''
1414 kind_id = to_kind_id(kind)
1415 args = [kind_id]
1416 pats = codes_patterns_for_kind(kind_id, codes)
1418 if pats:
1419 codes_cond = 'AND ( %s ) ' % ' OR '.join(
1420 ('kind_codes.codes GLOB ?',) * len(pats))
1422 args.extend(pat.safe_str for pat in pats)
1423 else:
1424 codes_cond = ''
1426 sql = self._sql('''
1427 SELECT kind_codes_id, codes, deltat FROM kind_codes
1428 WHERE
1429 kind_id == ? ''' + codes_cond)
1431 return list(map(list, self._conn.execute(sql, args)))
1433 def update(self, constraint=None, **kwargs):
1434 '''
1435 Update or partially update channel and event inventories.
1437 :param constraint:
1438 Selection of times or areas to be brought up to date.
1439 :type constraint:
1440 :py:class:`~pyrocko.squirrel.client.base.Constraint`
1442 :param \\*\\*kwargs:
1443 Shortcut for setting ``constraint=Constraint(**kwargs)``.
1445 This function triggers all attached remote sources, to check for
1446 updates in the meta-data. The sources will only submit queries when
1447 their expiration date has passed, or if the selection spans into
1448 previously unseen times or areas.
1449 '''
1451 if constraint is None:
1452 constraint = client.Constraint(**kwargs)
1454 for source in self._sources:
1455 source.update_channel_inventory(self, constraint)
1456 source.update_event_inventory(self, constraint)
1458 def update_waveform_promises(self, constraint=None, **kwargs):
1459 '''
1460 Permit downloading of remote waveforms.
1462 :param constraint:
1463 Remote waveforms compatible with the given constraint are enabled
1464 for download.
1465 :type constraint:
1466 :py:class:`~pyrocko.squirrel.client.base.Constraint`
1468 :param \\*\\*kwargs:
1469 Shortcut for setting ``constraint=Constraint(**kwargs)``.
1471 Calling this method permits Squirrel to download waveforms from remote
1472 sources when processing subsequent waveform requests. This works by
1473 inserting so called waveform promises into the database. It will look
1474 into the available channels for each remote source and create a promise
1475 for each channel compatible with the given constraint. If the promise
1476 then matches in a waveform request, Squirrel tries to download the
1477 waveform. If the download is successful, the downloaded waveform is
1478 added to the Squirrel and the promise is deleted. If the download
1479 fails, the promise is kept if the reason of failure looks like being
1480 temporary, e.g. because of a network failure. If the cause of failure
1481 however seems to be permanent, the promise is deleted so that no
1482 further attempts are made to download a waveform which might not be
1483 available from that server at all. To force re-scheduling after a
1484 permanent failure, call :py:meth:`update_waveform_promises`
1485 yet another time.
1486 '''
1488 if constraint is None:
1489 constraint = client.Constraint(**kwargs)
1491 for source in self._sources:
1492 source.update_waveform_promises(self, constraint)
1494 def remove_waveform_promises(self, from_database='selection'):
1495 '''
1496 Remove waveform promises from live selection or global database.
1498 Calling this function removes all waveform promises provided by the
1499 attached sources.
1501 :param from_database:
1502 Remove from live selection ``'selection'`` or global database
1503 ``'global'``.
1504 '''
1505 for source in self._sources:
1506 source.remove_waveform_promises(self, from_database=from_database)
    def update_responses(self, constraint=None, **kwargs):
        '''
        Update or partially update instrument response inventories.

        :param constraint:
            Selection of times or areas to be brought up to date.
        :type constraint:
            :py:class:`~pyrocko.squirrel.client.base.Constraint`

        :param \\*\\*kwargs:
            Shortcut for setting ``constraint=Constraint(**kwargs)``.
        '''
        if constraint is None:
            constraint = client.Constraint(**kwargs)

        for source in self._sources:
            source.update_response_inventory(self, constraint)
1515 def get_nfiles(self):
1516 '''
1517 Get number of files in selection.
1518 '''
1520 sql = self._sql('''SELECT COUNT(*) FROM %(db)s.%(file_states)s''')
1521 for row in self._conn.execute(sql):
1522 return row[0]
1524 def get_nnuts(self):
1525 '''
1526 Get number of nuts in selection.
1527 '''
1529 sql = self._sql('''SELECT COUNT(*) FROM %(db)s.%(nuts)s''')
1530 for row in self._conn.execute(sql):
1531 return row[0]
1533 def get_total_size(self):
1534 '''
1535 Get aggregated file size available in selection.
1536 '''
1538 sql = self._sql('''
1539 SELECT SUM(files.size) FROM %(db)s.%(file_states)s
1540 INNER JOIN files
1541 ON %(db)s.%(file_states)s.file_id = files.file_id
1542 ''')
1544 for row in self._conn.execute(sql):
1545 return row[0] or 0
1547 def get_stats(self):
1548 '''
1549 Get statistics on contents available through this selection.
1550 '''
1552 kinds = self.get_kinds()
1553 time_spans = {}
1554 for kind in kinds:
1555 time_spans[kind] = self.get_time_span([kind])
1557 return SquirrelStats(
1558 nfiles=self.get_nfiles(),
1559 nnuts=self.get_nnuts(),
1560 kinds=kinds,
1561 codes=self.get_codes(),
1562 total_size=self.get_total_size(),
1563 counts=self.get_counts(),
1564 time_spans=time_spans,
1565 sources=[s.describe() for s in self._sources],
1566 operators=[op.describe() for op in self._operators])
    def get_content(
            self,
            nut,
            cache_id='default',
            accessor_id='default',
            show_progress=False,
            model='squirrel'):

        '''
        Get and possibly load full content for a given index entry from file.

        Loads the actual content objects (channel, station, waveform, ...) from
        file. For efficiency, sibling content (all stuff in the same file
        segment) will also be loaded as a side effect. The loaded contents are
        cached in the Squirrel object.

        :param nut:
            Index entry of the content to be retrieved.

        :param cache_id:
            Name of the content cache to use, e.g. ``'default'`` or
            ``'waveform'``.
        :type cache_id:
            str

        :param accessor_id:
            Name of accessing consumer, used in cache management.
        :type accessor_id:
            str

        :param show_progress:
            If ``True``, show progress when loading from file.
        :type show_progress:
            bool

        :param model:
            Object model of the returned content, passed through to the cache
            lookup.
        :type model:
            str

        :raises:
            :py:exc:`~pyrocko.squirrel.error.NotAvailable` if the content
            cannot be retrieved.
        '''

        content_cache = self._content_caches[cache_id]
        if not content_cache.has(nut):

            # Loading the file segment indexes all of its contents into the
            # cache, not only the requested nut.
            for nut_loaded in io.iload(
                    nut.file_path,
                    segment=nut.file_segment,
                    format=nut.file_format,
                    database=self._database,
                    update_selection=self,
                    show_progress=show_progress):

                content_cache.put(nut_loaded)

        try:
            return content_cache.get(nut, accessor_id, model)

        except KeyError:
            raise error.NotAvailable(
                'Unable to retrieve content: %s, %s, %s, %s' % nut.key)
1604 def advance_accessor(self, accessor_id='default', cache_id=None):
1605 '''
1606 Notify memory caches about consumer moving to a new data batch.
1608 :param accessor_id:
1609 Name of accessing consumer to be advanced.
1610 :type accessor_id:
1611 str
1613 :param cache_id:
1614 Name of cache to for which the accessor should be advanced. By
1615 default the named accessor is advanced in all registered caches.
1616 By default, two caches named ``'default'`` and ``'waveform'`` are
1617 available.
1618 :type cache_id:
1619 str
1621 See :py:class:`~pyrocko.squirrel.cache.ContentCache` for details on how
1622 Squirrel's memory caching works and can be tuned. Default behaviour is
1623 to release data when it has not been used in the latest data
1624 window/batch. If the accessor is never advanced, data is cached
1625 indefinitely - which is often desired e.g. for station meta-data.
1626 Methods for consecutive data traversal, like
1627 :py:meth:`chopper_waveforms` automatically advance and clear
1628 their accessor.
1629 '''
1630 for cache_ in (
1631 self._content_caches.keys()
1632 if cache_id is None
1633 else [cache_id]):
1635 self._content_caches[cache_].advance_accessor(accessor_id)
1637 def clear_accessor(self, accessor_id, cache_id=None):
1638 '''
1639 Notify memory caches about a consumer having finished.
1641 :param accessor_id:
1642 Name of accessor to be cleared.
1643 :type accessor_id:
1644 str
1646 :param cache_id:
1647 Name of cache for which the accessor should be cleared. By default
1648 the named accessor is cleared from all registered caches. By
1649 default, two caches named ``'default'`` and ``'waveform'`` are
1650 available.
1651 :type cache_id:
1652 str
1654 Calling this method clears all references to cache entries held by the
1655 named accessor. Cache entries are then freed if not referenced by any
1656 other accessor.
1657 '''
1659 for cache_ in (
1660 self._content_caches.keys()
1661 if cache_id is None
1662 else [cache_id]):
1664 self._content_caches[cache_].clear_accessor(accessor_id)
    def get_cache_stats(self, cache_id):
        '''
        Get statistics of a named content cache.

        :param cache_id:
            Name of the cache to query, e.g. ``'default'`` or ``'waveform'``.
        :type cache_id:
            str
        '''
        return self._content_caches[cache_id].get_stats()
1669 def _check_duplicates(self, nuts):
1670 d = defaultdict(list)
1671 for nut in nuts:
1672 d[nut.codes].append(nut)
1674 for codes, group in d.items():
1675 if len(group) > 1:
1676 logger.warning(
1677 'Multiple entries matching codes: %s' % str(codes))
1679 @filldocs
1680 def get_stations(
1681 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
1682 model='squirrel'):
1684 '''
1685 Get stations matching given constraints.
1687 %(query_args)s
1689 :param model:
1690 Select object model for returned values: ``'squirrel'`` to get
1691 Squirrel station objects or ``'pyrocko'`` to get Pyrocko station
1692 objects with channel information attached.
1693 :type model:
1694 str
1696 :returns:
1697 List of :py:class:`pyrocko.squirrel.Station
1698 <pyrocko.squirrel.model.Station>` objects by default or list of
1699 :py:class:`pyrocko.model.Station <pyrocko.model.station.Station>`
1700 objects if ``model='pyrocko'`` is requested.
1702 See :py:meth:`iter_nuts` for details on time span matching.
1703 '''
1705 if model == 'pyrocko':
1706 return self._get_pyrocko_stations(obj, tmin, tmax, time, codes)
1707 elif model in ('squirrel', 'stationxml'):
1708 args = self._get_selection_args(
1709 STATION, obj, tmin, tmax, time, codes)
1711 nuts = sorted(
1712 self.iter_nuts('station', *args), key=lambda nut: nut.dkey)
1713 self._check_duplicates(nuts)
1714 return [self.get_content(nut, model=model) for nut in nuts]
1715 else:
1716 raise ValueError('Invalid station model: %s' % model)
1718 @filldocs
1719 def get_channels(
1720 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
1721 model='squirrel'):
1723 '''
1724 Get channels matching given constraints.
1726 %(query_args)s
1728 :returns:
1729 List of :py:class:`~pyrocko.squirrel.model.Channel` objects.
1731 See :py:meth:`iter_nuts` for details on time span matching.
1732 '''
1734 args = self._get_selection_args(
1735 CHANNEL, obj, tmin, tmax, time, codes)
1737 nuts = sorted(
1738 self.iter_nuts('channel', *args), key=lambda nut: nut.dkey)
1739 self._check_duplicates(nuts)
1740 return [self.get_content(nut, model=model) for nut in nuts]
    @filldocs
    def get_sensors(
            self, obj=None, tmin=None, tmax=None, time=None, codes=None):

        '''
        Get sensors matching given constraints.

        %(query_args)s

        :returns:
            List of :py:class:`~pyrocko.squirrel.model.Sensor` objects.

        See :py:meth:`iter_nuts` for details on time span matching.
        '''

        tmin, tmax, codes = self._get_selection_args(
            CHANNEL, obj, tmin, tmax, time, codes)

        if codes is not None:
            # Widen the channel patterns: replace the last character of the
            # channel code (the component) with '?' so that all components
            # belonging to a sensor are matched by the query.
            codes = codes_patterns_list(
                (entry.replace(channel=entry.channel[:-1] + '?')
                 if entry != '*' else entry)
                for entry in codes)

        nuts = sorted(
            self.iter_nuts(
                'channel', tmin, tmax, codes), key=lambda nut: nut.dkey)

        self._check_duplicates(nuts)
        # Group the matched channels into sensor objects.
        return model.Sensor.from_channels(
            self.get_content(nut) for nut in nuts)
1773 @filldocs
1774 def get_responses(
1775 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
1776 model='squirrel'):
1778 '''
1779 Get instrument responses matching given constraints.
1781 %(query_args)s
1783 :returns:
1784 List of :py:class:`~pyrocko.squirrel.model.Response` objects.
1786 See :py:meth:`iter_nuts` for details on time span matching.
1787 '''
1789 args = self._get_selection_args(
1790 RESPONSE, obj, tmin, tmax, time, codes)
1792 nuts = sorted(
1793 self.iter_nuts('response', *args), key=lambda nut: nut.dkey)
1794 self._check_duplicates(nuts)
1795 return [self.get_content(nut, model=model) for nut in nuts]
1797 @filldocs
1798 def get_response(
1799 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
1800 model='squirrel'):
1802 '''
1803 Get instrument response matching given constraints.
1805 %(query_args)s
1807 :returns:
1808 :py:class:`~pyrocko.squirrel.model.Response` object.
1810 Same as :py:meth:`get_responses` but returning exactly one response.
1811 Raises :py:exc:`~pyrocko.squirrel.error.NotAvailable` if zero or more
1812 than one is available.
1814 See :py:meth:`iter_nuts` for details on time span matching.
1815 '''
1817 responses = self.get_responses(
1818 obj, tmin, tmax, time, codes, model=model)
1819 if len(responses) == 0:
1820 raise error.NotAvailable(
1821 'No instrument response available (%s).'
1822 % self._get_selection_args_str(
1823 RESPONSE, obj, tmin, tmax, time, codes))
1825 elif len(responses) > 1:
1826 if model == 'squirrel':
1827 rinfo = ':\n' + '\n'.join(
1828 ' ' + resp.summary for resp in responses)
1829 else:
1830 rinfo = '.'
1832 raise error.NotAvailable(
1833 'Multiple instrument responses matching given constraints '
1834 '(%s)%s' % (
1835 self._get_selection_args_str(
1836 RESPONSE, obj, tmin, tmax, time, codes), rinfo))
1838 return responses[0]
1840 @filldocs
1841 def get_events(
1842 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1844 '''
1845 Get events matching given constraints.
1847 %(query_args)s
1849 :returns:
1850 List of :py:class:`~pyrocko.model.event.Event` objects.
1852 See :py:meth:`iter_nuts` for details on time span matching.
1853 '''
1855 args = self._get_selection_args(EVENT, obj, tmin, tmax, time, codes)
1856 nuts = sorted(
1857 self.iter_nuts('event', *args), key=lambda nut: nut.dkey)
1858 self._check_duplicates(nuts)
1859 return [self.get_content(nut) for nut in nuts]
1861 def _redeem_promises(self, *args):
1863 tmin, tmax, _ = args
1865 waveforms = list(self.iter_nuts('waveform', *args))
1866 promises = list(self.iter_nuts('waveform_promise', *args))
1868 codes_to_avail = defaultdict(list)
1869 for nut in waveforms:
1870 codes_to_avail[nut.codes].append((nut.tmin, nut.tmax))
1872 def tts(x):
1873 if isinstance(x, tuple):
1874 return tuple(tts(e) for e in x)
1875 elif isinstance(x, list):
1876 return list(tts(e) for e in x)
1877 else:
1878 return util.time_to_str(x)
1880 orders = []
1881 for promise in promises:
1882 waveforms_avail = codes_to_avail[promise.codes]
1883 for block_tmin, block_tmax in blocks(
1884 max(tmin, promise.tmin),
1885 min(tmax, promise.tmax),
1886 promise.deltat):
1888 orders.append(
1889 WaveformOrder(
1890 source_id=promise.file_path,
1891 codes=promise.codes,
1892 tmin=block_tmin,
1893 tmax=block_tmax,
1894 deltat=promise.deltat,
1895 gaps=gaps(waveforms_avail, block_tmin, block_tmax)))
1897 orders_noop, orders = lpick(lambda order: order.gaps, orders)
1899 order_keys_noop = set(order_key(order) for order in orders_noop)
1900 if len(order_keys_noop) != 0 or len(orders_noop) != 0:
1901 logger.info(
1902 'Waveform orders already satisified with cached/local data: '
1903 '%i (%i)' % (len(order_keys_noop), len(orders_noop)))
1905 source_ids = []
1906 sources = {}
1907 for source in self._sources:
1908 if isinstance(source, fdsn.FDSNSource):
1909 source_ids.append(source._source_id)
1910 sources[source._source_id] = source
1912 source_priority = dict(
1913 (source_id, i) for (i, source_id) in enumerate(source_ids))
1915 order_groups = defaultdict(list)
1916 for order in orders:
1917 order_groups[order_key(order)].append(order)
1919 for k, order_group in order_groups.items():
1920 order_group.sort(
1921 key=lambda order: source_priority[order.source_id])
1923 n_order_groups = len(order_groups)
1925 if len(order_groups) != 0 or len(orders) != 0:
1926 logger.info(
1927 'Waveform orders standing for download: %i (%i)'
1928 % (len(order_groups), len(orders)))
1930 task = make_task('Waveform orders processed', n_order_groups)
1931 else:
1932 task = None
1934 def split_promise(order):
1935 self._split_nuts(
1936 'waveform_promise',
1937 order.tmin, order.tmax,
1938 codes=order.codes,
1939 path=order.source_id)
1941 def release_order_group(order):
1942 okey = order_key(order)
1943 for followup in order_groups[okey]:
1944 split_promise(followup)
1946 del order_groups[okey]
1948 if task:
1949 task.update(n_order_groups - len(order_groups))
1951 def noop(order):
1952 pass
1954 def success(order):
1955 release_order_group(order)
1956 split_promise(order)
1958 def batch_add(paths):
1959 self.add(paths)
1961 calls = queue.Queue()
1963 def enqueue(f):
1964 def wrapper(*args):
1965 calls.put((f, args))
1967 return wrapper
1969 for order in orders_noop:
1970 split_promise(order)
1972 while order_groups:
1974 orders_now = []
1975 empty = []
1976 for k, order_group in order_groups.items():
1977 try:
1978 orders_now.append(order_group.pop(0))
1979 except IndexError:
1980 empty.append(k)
1982 for k in empty:
1983 del order_groups[k]
1985 by_source_id = defaultdict(list)
1986 for order in orders_now:
1987 by_source_id[order.source_id].append(order)
1989 threads = []
1990 for source_id in by_source_id:
1991 def download():
1992 try:
1993 sources[source_id].download_waveforms(
1994 by_source_id[source_id],
1995 success=enqueue(success),
1996 error_permanent=enqueue(split_promise),
1997 error_temporary=noop,
1998 batch_add=enqueue(batch_add))
2000 finally:
2001 calls.put(None)
2003 thread = threading.Thread(target=download)
2004 thread.start()
2005 threads.append(thread)
2007 ndone = 0
2008 while ndone < len(threads):
2009 ret = calls.get()
2010 if ret is None:
2011 ndone += 1
2012 else:
2013 ret[0](*ret[1])
2015 for thread in threads:
2016 thread.join()
2018 if task:
2019 task.update(n_order_groups - len(order_groups))
2021 if task:
2022 task.done()
2024 @filldocs
2025 def get_waveform_nuts(
2026 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
2028 '''
2029 Get waveform content entities matching given constraints.
2031 %(query_args)s
2033 Like :py:meth:`get_nuts` with ``kind='waveform'`` but additionally
2034 resolves matching waveform promises (downloads waveforms from remote
2035 sources).
2037 See :py:meth:`iter_nuts` for details on time span matching.
2038 '''
2040 args = self._get_selection_args(WAVEFORM, obj, tmin, tmax, time, codes)
2041 self._redeem_promises(*args)
2042 return sorted(
2043 self.iter_nuts('waveform', *args), key=lambda nut: nut.dkey)
    @filldocs
    def get_waveforms(
            self, obj=None, tmin=None, tmax=None, time=None, codes=None,
            uncut=False, want_incomplete=True, degap=True, maxgap=5,
            maxlap=None, snap=None, include_last=False, load_data=True,
            accessor_id='default', operator_params=None):

        '''
        Get waveforms matching given constraints.

        %(query_args)s

        :param uncut:
            Set to ``True``, to disable cutting traces to [``tmin``, ``tmax``]
            and to disable degapping/deoverlapping. Returns untouched traces as
            they are read from file segment. File segments are always read in
            their entirety.
        :type uncut:
            bool

        :param want_incomplete:
            If ``True``, gappy/incomplete traces are included in the result.
        :type want_incomplete:
            bool

        :param degap:
            If ``True``, connect traces and remove gaps and overlaps.
        :type degap:
            bool

        :param maxgap:
            Maximum gap size in samples which is filled with interpolated
            samples when ``degap`` is ``True``.
        :type maxgap:
            int

        :param maxlap:
            Maximum overlap size in samples which is removed when ``degap`` is
            ``True``.
        :type maxlap:
            int

        :param snap:
            Rounding functions used when computing sample index from time
            instance, for trace start and trace end, respectively. By default,
            ``(round, round)`` is used.
        :type snap:
            tuple of 2 callables

        :param include_last:
            If ``True``, add one more sample to the returned traces (the sample
            which would be the first sample of a query with ``tmin`` set to the
            current value of ``tmax``).
        :type include_last:
            bool

        :param load_data:
            If ``True``, waveform data samples are read from files (or cache).
            If ``False``, meta-information-only traces are returned (dummy
            traces with no data samples).
        :type load_data:
            bool

        :param accessor_id:
            Name of consumer on who's behalf data is accessed. Used in cache
            management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key
            to distinguish different points of extraction for the decision of
            when to release cached waveform data. Should be used when data is
            alternately extracted from more than one region / selection.
        :type accessor_id:
            str

        :param operator_params:
            Passed as ``params`` to the operator when the query is handled by
            an operator (see below).

        See :py:meth:`iter_nuts` for details on time span matching.

        Loaded data is kept in memory (at least) until
        :py:meth:`clear_accessor` has been called or
        :py:meth:`advance_accessor` has been called two consecutive times
        without data being accessed between the two calls (by this accessor).
        Data may still be further kept in the memory cache if held alive by
        consumers with a different ``accessor_id``.
        '''

        tmin, tmax, codes = self._get_selection_args(
            WAVEFORM, obj, tmin, tmax, time, codes)

        self_tmin, self_tmax = self.get_time_span(
            ['waveform', 'waveform_promise'])

        if None in (self_tmin, self_tmax):
            logger.warning(
                'No waveforms available.')
            return []

        tmin = tmin if tmin is not None else self_tmin
        tmax = tmax if tmax is not None else self_tmax

        # If an operator is registered for the queried codes, delegate the
        # whole request to it.
        if codes is not None and len(codes) == 1:
            # TODO: fix for multiple / mixed codes
            operator = self.get_operator(codes[0])
            if operator is not None:
                return operator.get_waveforms(
                    self, codes[0],
                    tmin=tmin, tmax=tmax,
                    uncut=uncut, want_incomplete=want_incomplete, degap=degap,
                    maxgap=maxgap, maxlap=maxlap, snap=snap,
                    include_last=include_last, load_data=load_data,
                    accessor_id=accessor_id, params=operator_params)

        # Resolves waveform promises (may trigger downloads).
        nuts = self.get_waveform_nuts(obj, tmin, tmax, time, codes)

        if load_data:
            traces = [
                self.get_content(nut, 'waveform', accessor_id) for nut in nuts]

        else:
            # Meta-information-only traces, no samples loaded.
            traces = [
                trace.Trace(**nut.trace_kwargs) for nut in nuts]

        if uncut:
            return traces

        if snap is None:
            snap = (round, round)

        chopped = []
        for tr in traces:
            # A cached trace may still carry data even when load_data is
            # False - strip it from a copy so the cache entry is untouched.
            if not load_data and tr.ydata is not None:
                tr = tr.copy(data=False)
                tr.ydata = None

            try:
                chopped.append(tr.chop(
                    tmin, tmax,
                    inplace=False,
                    snap=snap,
                    include_last=include_last))

            except trace.NoData:
                pass

        processed = self._process_chopped(
            chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax)

        return processed
2190 @filldocs
2191 def chopper_waveforms(
2192 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
2193 tinc=None, tpad=0.,
2194 want_incomplete=True, snap_window=False,
2195 degap=True, maxgap=5, maxlap=None,
2196 snap=None, include_last=False, load_data=True,
2197 accessor_id=None, clear_accessor=True, operator_params=None,
2198 grouping=None):
2200 '''
2201 Iterate window-wise over waveform archive.
2203 %(query_args)s
2205 :param tinc:
2206 Time increment (window shift time) (default uses ``tmax-tmin``).
2207 :type tinc:
2208 timestamp
2210 :param tpad:
2211 Padding time appended on either side of the data window (window
2212 overlap is ``2*tpad``).
2213 :type tpad:
2214 timestamp
2216 :param want_incomplete:
2217 If ``True``, gappy/incomplete traces are included in the result.
2218 :type want_incomplete:
2219 bool
2221 :param snap_window:
2222 If ``True``, start time windows at multiples of tinc with respect
2223 to system time zero.
2224 :type snap_window:
2225 bool
2227 :param degap:
2228 If ``True``, connect traces and remove gaps and overlaps.
2229 :type degap:
2230 bool
2232 :param maxgap:
2233 Maximum gap size in samples which is filled with interpolated
2234 samples when ``degap`` is ``True``.
2235 :type maxgap:
2236 int
2238 :param maxlap:
2239 Maximum overlap size in samples which is removed when ``degap`` is
2240 ``True``.
2241 :type maxlap:
2242 int
2244 :param snap:
2245 Rounding functions used when computing sample index from time
2246 instance, for trace start and trace end, respectively. By default,
2247 ``(round, round)`` is used.
2248 :type snap:
2249 tuple of 2 callables
2251 :param include_last:
2252 If ``True``, add one more sample to the returned traces (the sample
2253 which would be the first sample of a query with ``tmin`` set to the
2254 current value of ``tmax``).
2255 :type include_last:
2256 bool
2258 :param load_data:
2259 If ``True``, waveform data samples are read from files (or cache).
2260 If ``False``, meta-information-only traces are returned (dummy
2261 traces with no data samples).
2262 :type load_data:
2263 bool
2265 :param accessor_id:
2266 Name of consumer on who's behalf data is accessed. Used in cache
2267 management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key
2268 to distinguish different points of extraction for the decision of
2269 when to release cached waveform data. Should be used when data is
2270 alternately extracted from more than one region / selection.
2271 :type accessor_id:
2272 str
2274 :param clear_accessor:
2275 If ``True`` (default), :py:meth:`clear_accessor` is called when the
2276 chopper finishes. Set to ``False`` to keep loaded waveforms in
2277 memory when the generator returns.
2278 :type clear_accessor:
2279 bool
2281 :param grouping:
2282 By default, traversal over the data is over time and all matching
2283 traces of a time window are yielded. Using this option, it is
2284 possible to traverse the data first by group (e.g. station or
2285 network) and second by time. This can reduce the number of traces
2286 in each batch and thus reduce the memory footprint of the process.
2287 :type grouping:
2288 :py:class:`~pyrocko.squirrel.operator.Grouping`
2290 :yields:
2291 A list of :py:class:`~pyrocko.trace.Trace` objects for every
2292 extracted time window.
2294 See :py:meth:`iter_nuts` for details on time span matching.
2295 '''
2297 tmin, tmax, codes = self._get_selection_args(
2298 WAVEFORM, obj, tmin, tmax, time, codes)
2300 self_tmin, self_tmax = self.get_time_span(
2301 ['waveform', 'waveform_promise'])
2303 if None in (self_tmin, self_tmax):
2304 logger.warning(
2305 'Content has undefined time span. No waveforms and no '
2306 'waveform promises?')
2307 return
2309 if snap_window and tinc is not None:
2310 tmin = tmin if tmin is not None else self_tmin
2311 tmax = tmax if tmax is not None else self_tmax
2312 tmin = math.floor(tmin / tinc) * tinc
2313 tmax = math.ceil(tmax / tinc) * tinc
2314 else:
2315 tmin = tmin if tmin is not None else self_tmin + tpad
2316 tmax = tmax if tmax is not None else self_tmax - tpad
2318 tinc = tinc if tinc is not None else tmax - tmin
2320 try:
2321 if accessor_id is None:
2322 accessor_id = 'chopper%i' % self._n_choppers_active
2324 self._n_choppers_active += 1
2326 eps = tinc * 1e-6
2327 if tinc != 0.0:
2328 nwin = int(((tmax - eps) - tmin) / tinc) + 1
2329 else:
2330 nwin = 1
2332 if grouping is None:
2333 codes_list = [codes]
2334 else:
2335 operator = Operator(
2336 filtering=CodesPatternFiltering(codes=codes),
2337 grouping=grouping)
2339 available = set(self.get_codes(kind='waveform'))
2340 available.update(self.get_codes(kind='waveform_promise'))
2341 operator.update_mappings(sorted(available))
2343 def iter_codes_list():
2344 for scl in operator.iter_in_codes():
2345 yield codes_patterns_list(scl)
2347 codes_list = iter_codes_list()
2349 for scl in codes_list:
2350 for iwin in range(nwin):
2351 wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)
2353 chopped = self.get_waveforms(
2354 tmin=wmin-tpad,
2355 tmax=wmax+tpad,
2356 codes=scl,
2357 snap=snap,
2358 include_last=include_last,
2359 load_data=load_data,
2360 want_incomplete=want_incomplete,
2361 degap=degap,
2362 maxgap=maxgap,
2363 maxlap=maxlap,
2364 accessor_id=accessor_id,
2365 operator_params=operator_params)
2367 self.advance_accessor(accessor_id)
2369 yield Batch(
2370 tmin=wmin,
2371 tmax=wmax,
2372 i=iwin,
2373 n=nwin,
2374 traces=chopped)
2376 finally:
2377 self._n_choppers_active -= 1
2378 if clear_accessor:
2379 self.clear_accessor(accessor_id, 'waveform')
2381 def _process_chopped(
2382 self, chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax):
2384 chopped.sort(key=lambda a: a.full_id)
2385 if degap:
2386 chopped = trace.degapper(chopped, maxgap=maxgap, maxlap=maxlap)
2388 if not want_incomplete:
2389 chopped_weeded = []
2390 for tr in chopped:
2391 emin = tr.tmin - tmin
2392 emax = tr.tmax + tr.deltat - tmax
2393 if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
2394 chopped_weeded.append(tr)
2396 elif degap:
2397 if (0. < emin <= 5. * tr.deltat
2398 and -5. * tr.deltat <= emax < 0.):
2400 tr.extend(tmin, tmax-tr.deltat, fillmethod='repeat')
2401 chopped_weeded.append(tr)
2403 chopped = chopped_weeded
2405 return chopped
2407 def _get_pyrocko_stations(
2408 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
2410 from pyrocko import model as pmodel
2412 by_nsl = defaultdict(lambda: (list(), list()))
2413 for station in self.get_stations(obj, tmin, tmax, time, codes):
2414 sargs = station._get_pyrocko_station_args()
2415 by_nsl[station.codes.nsl][0].append(sargs)
2417 for channel in self.get_channels(obj, tmin, tmax, time, codes):
2418 sargs = channel._get_pyrocko_station_args()
2419 sargs_list, channels_list = by_nsl[channel.codes.nsl]
2420 sargs_list.append(sargs)
2421 channels_list.append(channel)
2423 pstations = []
2424 nsls = list(by_nsl.keys())
2425 nsls.sort()
2426 for nsl in nsls:
2427 sargs_list, channels_list = by_nsl[nsl]
2428 sargs = util.consistency_merge(
2429 [('',) + x for x in sargs_list])
2431 by_c = defaultdict(list)
2432 for ch in channels_list:
2433 by_c[ch.codes.channel].append(ch._get_pyrocko_channel_args())
2435 chas = list(by_c.keys())
2436 chas.sort()
2437 pchannels = []
2438 for cha in chas:
2439 list_of_cargs = by_c[cha]
2440 cargs = util.consistency_merge(
2441 [('',) + x for x in list_of_cargs])
2442 pchannels.append(pmodel.Channel(*cargs))
2444 pstations.append(
2445 pmodel.Station(*sargs, channels=pchannels))
2447 return pstations
2449 @property
2450 def pile(self):
2452 '''
2453 Emulates the older :py:class:`pyrocko.pile.Pile` interface.
2455 This property exposes a :py:class:`pyrocko.squirrel.pile.Pile` object,
2456 which emulates most of the older :py:class:`pyrocko.pile.Pile` methods
2457 but uses the fluffy power of the Squirrel under the hood.
2459 This interface can be used as a drop-in replacement for piles which are
2460 used in existing scripts and programs for efficient waveform data
2461 access. The Squirrel-based pile scales better for large datasets. Newer
2462 scripts should use Squirrel's native methods to avoid the emulation
2463 overhead.
2464 '''
2465 from . import pile
2467 if self._pile is None:
2468 self._pile = pile.Pile(self)
2470 return self._pile
2472 def snuffle(self):
2473 '''
2474 Look at dataset in Snuffler.
2475 '''
2476 self.pile.snuffle()
2478 def _gather_codes_keys(self, kind, gather, selector):
2479 return set(
2480 gather(codes)
2481 for codes in self.iter_codes(kind)
2482 if selector is None or selector(codes))
2484 def __str__(self):
2485 return str(self.get_stats())
    def get_coverage(
            self, kind, tmin=None, tmax=None, codes=None, limit=None):

        '''
        Get coverage information.

        Get information about strips of gapless data coverage.

        :param kind:
            Content kind to be queried.
        :type kind:
            str

        :param tmin:
            Start time of query interval.
        :type tmin:
            timestamp

        :param tmax:
            End time of query interval.
        :type tmax:
            timestamp

        :param codes:
            If given, restrict query to given content codes patterns.
        :type codes:
            :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
            objects appropriate for the queried content type, or anything which
            can be converted to such objects.

        :param limit:
            Limit query to return only up to a given maximum number of entries
            per matching time series (without setting this option, very gappy
            data could cause the query to execute for a very long time).
        :type limit:
            int

        :returns:
            Information about time spans covered by the requested time series
            data.
        :rtype:
            :py:class:`list` of :py:class:`Coverage` objects
        '''

        # Times are stored split into (seconds, offset) pairs in the database
        # (columns time_seconds / time_offset); split the query bounds the
        # same way for comparison in SQL.
        tmin_seconds, tmin_offset = model.tsplit(tmin)
        tmax_seconds, tmax_offset = model.tsplit(tmax)
        kind_id = to_kind_id(kind)

        codes_info = list(self._iter_codes_info(kind=kind))

        # Collect (pattern, kind_codes_id, codes, deltat) for each time
        # series to report: all known series when no codes are given,
        # otherwise only those matching any of the given patterns.
        kdata_all = []
        if codes is None:
            for _, codes_entry, deltat, kind_codes_id, _ in codes_info:
                kdata_all.append(
                    (codes_entry, kind_codes_id, codes_entry, deltat))

        else:
            for codes_entry in codes:
                pattern = to_codes(kind_id, codes_entry)
                for _, codes_entry, deltat, kind_codes_id, _ in codes_info:
                    if model.match_codes(pattern, codes_entry):
                        kdata_all.append(
                            (pattern, kind_codes_id, codes_entry, deltat))

        kind_codes_ids = [x[1] for x in kdata_all]

        # Number of contents intersecting exactly at tmin, per
        # (codes, deltat); used below as the initial count of each series.
        counts_at_tmin = {}
        if tmin is not None:
            for nut in self.iter_nuts(
                    kind, tmin, tmin, kind_codes_ids=kind_codes_ids):

                k = nut.codes, nut.deltat
                if k not in counts_at_tmin:
                    counts_at_tmin[k] = 0

                counts_at_tmin[k] += 1

        coverages = []
        for pattern, kind_codes_id, codes_entry, deltat in kdata_all:
            # entry layout: [pattern, codes, deltat, tmin_total, tmax_total,
            # changes] where changes is a list of (time, count) points, or
            # None when truncated by the limit option.
            entry = [pattern, codes_entry, deltat, None, None, []]
            # First (ASC) and last (DESC) coverage change give the total time
            # span of this series.
            for i, order in [(0, 'ASC'), (1, 'DESC')]:
                sql = self._sql('''
                    SELECT
                        time_seconds,
                        time_offset
                    FROM %(db)s.%(coverage)s
                    WHERE
                        kind_codes_id == ?
                    ORDER BY
                        kind_codes_id ''' + order + ''',
                        time_seconds ''' + order + ''',
                        time_offset ''' + order + '''
                    LIMIT 1
                ''')

                for row in self._conn.execute(sql, [kind_codes_id]):
                    entry[3+i] = model.tjoin(row[0], row[1])

            # No coverage entries at all for this time series: skip it.
            if None in entry[3:5]:
                continue

            args = [kind_codes_id]

            sql_time = ''
            if tmin is not None:
                # intentionally < because (== tmin) is queried from nuts
                sql_time += ' AND ( ? < time_seconds ' \
                    'OR ( ? == time_seconds AND ? < time_offset ) ) '
                args.extend([tmin_seconds, tmin_seconds, tmin_offset])

            if tmax is not None:
                sql_time += ' AND ( time_seconds < ? ' \
                    'OR ( ? == time_seconds AND time_offset <= ? ) ) '
                args.extend([tmax_seconds, tmax_seconds, tmax_offset])

            sql_limit = ''
            if limit is not None:
                sql_limit = ' LIMIT ?'
                args.append(limit)

            # All coverage change points within the query interval, in time
            # order.
            sql = self._sql('''
                SELECT
                    time_seconds,
                    time_offset,
                    step
                FROM %(db)s.%(coverage)s
                WHERE
                    kind_codes_id == ?
                    ''' + sql_time + '''
                ORDER BY
                    kind_codes_id,
                    time_seconds,
                    time_offset
            ''' + sql_limit)

            rows = list(self._conn.execute(sql, args))

            if limit is not None and len(rows) == limit:
                # Result may be truncated: signal with changes = None rather
                # than returning incomplete information.
                entry[-1] = None
            else:
                # Accumulate the per-row step values into absolute counts,
                # starting from the count of contents open at tmin.
                counts = counts_at_tmin.get((codes_entry, deltat), 0)
                tlast = None
                if tmin is not None:
                    entry[-1].append((tmin, counts))
                    tlast = tmin

                for row in rows:
                    t = model.tjoin(row[0], row[1])
                    counts += row[2]
                    entry[-1].append((t, counts))
                    tlast = t

                # Close the change list at tmax if not already there.
                if tmax is not None and (tlast is None or tlast != tmax):
                    entry[-1].append((tmax, counts))

            coverages.append(model.Coverage.from_values(entry + [kind_id]))

        return coverages
    def add_operator(self, op):
        # Register an operator; its code mappings are established later by
        # update_operator_mappings().
        self._operators.append(op)
2649 def update_operator_mappings(self):
2650 available = self.get_codes(kind=('channel'))
2652 for operator in self._operators:
2653 operator.update_mappings(available, self._operator_registry)
2655 def iter_operator_mappings(self):
2656 for operator in self._operators:
2657 for in_codes, out_codes in operator.iter_mappings():
2658 yield operator, in_codes, out_codes
    def get_operator_mappings(self):
        # Materialized list version of iter_operator_mappings().
        return list(self.iter_operator_mappings())
2663 def get_operator(self, codes):
2664 try:
2665 return self._operator_registry[codes][0]
2666 except KeyError:
2667 return None
2669 def get_operator_group(self, codes):
2670 try:
2671 return self._operator_registry[codes]
2672 except KeyError:
2673 return None, (None, None, None)
2675 def iter_operator_codes(self):
2676 for _, _, out_codes in self.iter_operator_mappings():
2677 for codes in out_codes:
2678 yield codes
    def get_operator_codes(self):
        # Materialized list version of iter_operator_codes().
        return list(self.iter_operator_codes())
2683 def print_tables(self, table_names=None, stream=None):
2684 '''
2685 Dump raw database tables in textual form (for debugging purposes).
2687 :param table_names:
2688 Names of tables to be dumped or ``None`` to dump all.
2689 :type table_names:
2690 :py:class:`list` of :py:class:`str`
2692 :param stream:
2693 Open file or ``None`` to dump to standard output.
2694 '''
2696 if stream is None:
2697 stream = sys.stdout
2699 if isinstance(table_names, str):
2700 table_names = [table_names]
2702 if table_names is None:
2703 table_names = [
2704 'selection_file_states',
2705 'selection_nuts',
2706 'selection_kind_codes_count',
2707 'files', 'nuts', 'kind_codes', 'kind_codes_count']
2709 m = {
2710 'selection_file_states': '%(db)s.%(file_states)s',
2711 'selection_nuts': '%(db)s.%(nuts)s',
2712 'selection_kind_codes_count': '%(db)s.%(kind_codes_count)s',
2713 'files': 'files',
2714 'nuts': 'nuts',
2715 'kind_codes': 'kind_codes',
2716 'kind_codes_count': 'kind_codes_count'}
2718 for table_name in table_names:
2719 self._database.print_table(
2720 m[table_name] % self._names, stream=stream)
class SquirrelStats(Object):
    '''
    Container to hold statistics about contents available from a Squirrel.

    See also :py:meth:`Squirrel.get_stats`.
    '''

    nfiles = Int.T(
        help='Number of files in selection.')
    nnuts = Int.T(
        help='Number of index nuts in selection.')
    codes = List.T(
        Tuple.T(content_t=String.T()),
        help='Available code sequences in selection, e.g. '
             '(agency, network, station, location) for stations nuts.')
    kinds = List.T(
        String.T(),
        help='Available content types in selection.')
    total_size = Int.T(
        help='Aggregated file size of files is selection.')
    counts = Dict.T(
        String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()),
        help='Breakdown of how many nuts of any content type and code '
             'sequence are available in selection, ``counts[kind][codes]``.')
    time_spans = Dict.T(
        String.T(), Tuple.T(content_t=Timestamp.T()),
        help='Time spans by content type.')
    sources = List.T(
        String.T(),
        help='Descriptions of attached sources.')
    operators = List.T(
        String.T(),
        help='Descriptions of attached operators.')

    def __str__(self):
        # Total nut count per content kind, summed over all code sequences.
        kind_counts = dict(
            (kind, sum(self.counts[kind].values())) for kind in self.kinds)

        scodes = model.codes_to_str_abbreviated(self.codes)

        # Sources / operators rendered as indented multi-line lists, or a
        # '<none>' placeholder when empty.
        ssources = '<none>' if not self.sources else '\n' + '\n'.join(
            ' ' + s for s in self.sources)

        soperators = '<none>' if not self.operators else '\n' + '\n'.join(
            ' ' + s for s in self.operators)

        def stime(t):
            # Format a timestamp, mapping unset values and the open-interval
            # sentinels (g_tmin/g_tmax) to '<none>'.
            return util.tts(t) if t is not None and t not in (
                model.g_tmin, model.g_tmax) else '<none>'

        def stable(rows):
            # Render rows as a text table, each column left-justified to the
            # width of its widest cell.
            ns = [max(len(w) for w in col) for col in zip(*rows)]
            return '\n'.join(
                ' '.join(w.ljust(n) for n, w in zip(ns, row))
                for row in rows)

        def indent(s):
            # Prefix every line of a multi-line string with one space.
            return '\n'.join(' '+line for line in s.splitlines())

        # One line per kind: name, nut count and covered time span.
        stspans = '<none>' if not self.kinds else '\n' + indent(stable([(
            kind + ':',
            str(kind_counts[kind]),
            stime(self.time_spans[kind][0]),
            '-',
            stime(self.time_spans[kind][1])) for kind in sorted(self.kinds)]))

        s = '''
Number of files: %i
Total size of known files: %s
Number of index nuts: %i
Available content kinds: %s
Available codes: %s
Sources: %s
Operators: %s''' % (
            self.nfiles,
            util.human_bytesize(self.total_size),
            self.nnuts,
            stspans, scodes, ssources, soperators)

        return s.lstrip()
# Public API of this module.
__all__ = [
    'Squirrel',
    'SquirrelStats',
]