1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import sys
9import os
11import math
12import logging
13import threading
14import queue
15from collections import defaultdict
17from pyrocko.guts import Object, Int, List, Tuple, String, Timestamp, Dict
18from pyrocko import util, trace
19from pyrocko.progress import progress
21from . import model, io, cache, dataset
23from .model import to_kind_id, separator, WaveformOrder
24from .client import fdsn, catalog
25from .selection import Selection, filldocs
26from .database import abspath
27from . import client, environment, error
29logger = logging.getLogger('psq.base')
31guts_prefix = 'squirrel'
34def make_task(*args):
35 return progress.task(*args, logger=logger)
38def lpick(condition, seq):
39 ft = [], []
40 for ele in seq:
41 ft[int(bool(condition(ele)))].append(ele)
43 return ft
46def codes_fill(n, codes):
47 return codes[:n] + ('*',) * (n-len(codes))
50c_kind_to_ncodes = {
51 'station': 4,
52 'channel': 6,
53 'response': 6,
54 'waveform': 6,
55 'event': 1,
56 'waveform_promise': 6,
57 'undefined': 1}
60c_inflated = ['', '*', '*', '*', '*', '*']
61c_offsets = [0, 2, 1, 1, 1, 1, 0]
64def codes_inflate(codes):
65 codes = codes[:6]
66 inflated = list(c_inflated)
67 ncodes = len(codes)
68 offset = c_offsets[ncodes]
69 inflated[offset:offset+ncodes] = codes
70 return inflated
73def codes_inflate2(codes):
74 inflated = list(c_inflated)
75 ncodes = len(codes)
76 inflated[:ncodes] = codes
77 return tuple(inflated)
80def codes_patterns_for_kind(kind, codes):
81 if not codes:
82 return []
84 if not isinstance(codes[0], str):
85 out = []
86 for subcodes in codes:
87 out.extend(codes_patterns_for_kind(kind, subcodes))
88 return out
90 if kind in ('event', 'undefined'):
91 return [codes]
93 cfill = codes_inflate(codes)[:c_kind_to_ncodes[kind]]
95 if kind == 'station':
96 cfill2 = list(cfill)
97 cfill2[3] = '[*]'
98 return [cfill, cfill2]
100 return [cfill]
103def group_channels(channels):
104 groups = defaultdict(list)
105 for channel in channels:
106 codes = channel.codes
107 gcodes = codes[:-1] + (codes[-1][:-1],)
108 groups[gcodes].append(channel)
110 return groups
113def pyrocko_station_from_channel_group(group, extra_args):
114 list_of_args = [channel._get_pyrocko_station_args() for channel in group]
115 args = util.consistency_merge(list_of_args + extra_args)
116 from pyrocko import model as pmodel
117 return pmodel.Station(
118 network=args[0],
119 station=args[1],
120 location=args[2],
121 lat=args[3],
122 lon=args[4],
123 elevation=args[5],
124 depth=args[6],
125 channels=[ch.get_pyrocko_channel() for ch in group])
128def blocks(tmin, tmax, deltat, nsamples_block=100000):
129 tblock = deltat * nsamples_block
130 iblock_min = int(math.floor(tmin / tblock))
131 iblock_max = int(math.ceil(tmax / tblock))
132 for iblock in range(iblock_min, iblock_max):
133 yield iblock * tblock, (iblock+1) * tblock
136def gaps(avail, tmin, tmax):
137 assert tmin < tmax
139 data = [(tmax, 1), (tmin, -1)]
140 for (tmin_a, tmax_a) in avail:
141 assert tmin_a < tmax_a
142 data.append((tmin_a, 1))
143 data.append((tmax_a, -1))
145 data.sort()
146 s = 1
147 gaps = []
148 tmin_g = None
149 for t, x in data:
150 if s == 1 and x == -1:
151 tmin_g = t
152 elif s == 0 and x == 1 and tmin_g is not None:
153 tmax_g = t
154 if tmin_g != tmax_g:
155 gaps.append((tmin_g, tmax_g))
157 s += x
159 return gaps
162def order_key(order):
163 return (order.codes, order.tmin, order.tmax)
166class Batch(object):
167 '''
168 Batch of waveforms from window-wise data extraction.
170 Encapsulates state and results yielded for each window in window-wise
171 waveform extraction with the :py:meth:`Squirrel.chopper_waveforms` method.
173 *Attributes:*
175 .. py:attribute:: tmin
177 Start of this time window.
179 .. py:attribute:: tmax
181 End of this time window.
183 .. py:attribute:: i
185 Index of this time window in sequence.
187 .. py:attribute:: n
189 Total number of time windows in sequence.
191 .. py:attribute:: traces
193 Extracted waveforms for this time window.
194 '''
196 def __init__(self, tmin, tmax, i, n, traces):
197 self.tmin = tmin
198 self.tmax = tmax
199 self.i = i
200 self.n = n
201 self.traces = traces
204class Squirrel(Selection):
205 '''
206 Prompt, lazy, indexing, caching, dynamic seismological dataset access.
208 :param env:
209 Squirrel environment instance or directory path to use as starting
210 point for its detection. By default, the current directory is used as
211 starting point. When searching for a usable environment the directory
212 ``'.squirrel'`` or ``'squirrel'`` in the current (or starting point)
213 directory is used if it exists, otherwise the parent directories are
214 search upwards for the existence of such a directory. If no such
215 directory is found, the user's global Squirrel environment
216 ``'$HOME/.pyrocko/squirrel'`` is used.
217 :type env:
218 :py:class:`~pyrocko.squirrel.environment.Environment` or
219 :py:class:`str`
221 :param database:
222 Database instance or path to database. By default the
223 database found in the detected Squirrel environment is used.
224 :type database:
225 :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`
227 :param cache_path:
228 Directory path to use for data caching. By default, the ``'cache'``
229 directory in the detected Squirrel environment is used.
230 :type cache_path:
231 :py:class:`str`
233 :param persistent:
234 If given a name, create a persistent selection.
235 :type persistent:
236 :py:class:`str`
238 This is the central class of the Squirrel framework. It provides a unified
239 interface to query and access seismic waveforms, station meta-data and
240 event information from local file collections and remote data sources. For
241 prompt responses, a profound database setup is used under the hood. To
242 speed up assemblage of ad-hoc data selections, files are indexed on first
243 use and the extracted meta-data is remembered in the database for
244 subsequent accesses. Bulk data is lazily loaded from disk and remote
245 sources, just when requested. Once loaded, data is cached in memory to
246 expedite typical access patterns. Files and data sources can be dynamically
247 added to and removed from the Squirrel selection at runtime.
249 Queries are restricted to the contents of the files currently added to the
250 Squirrel selection (usually a subset of the file meta-information
251 collection in the database). This list of files is referred to here as the
252 "selection". By default, temporary tables are created in the attached
253 database to hold the names of the files in the selection as well as various
254 indices and counters. These tables are only visible inside the application
255 which created them and are deleted when the database connection is closed
256 or the application exits. To create a selection which is not deleted at
257 exit, supply a name to the ``persistent`` argument of the Squirrel
258 constructor. Persistent selections are shared among applications using the
259 same database.
261 **Method summary**
263 Some of the methods are implemented in :py:class:`Squirrel`'s base class
264 :py:class:`~pyrocko.squirrel.selection.Selection`.
266 .. autosummary::
268 ~Squirrel.add
269 ~Squirrel.add_source
270 ~Squirrel.add_fdsn
271 ~Squirrel.add_catalog
272 ~Squirrel.add_dataset
273 ~Squirrel.add_virtual
274 ~Squirrel.update
275 ~Squirrel.update_waveform_promises
276 ~Squirrel.advance_accessor
277 ~Squirrel.clear_accessor
278 ~Squirrel.reload
279 ~pyrocko.squirrel.selection.Selection.iter_paths
280 ~Squirrel.iter_nuts
281 ~Squirrel.iter_kinds
282 ~Squirrel.iter_deltats
283 ~Squirrel.iter_codes
284 ~Squirrel.iter_counts
285 ~pyrocko.squirrel.selection.Selection.get_paths
286 ~Squirrel.get_nuts
287 ~Squirrel.get_kinds
288 ~Squirrel.get_deltats
289 ~Squirrel.get_codes
290 ~Squirrel.get_counts
291 ~Squirrel.get_time_span
292 ~Squirrel.get_deltat_span
293 ~Squirrel.get_nfiles
294 ~Squirrel.get_nnuts
295 ~Squirrel.get_total_size
296 ~Squirrel.get_stats
297 ~Squirrel.get_content
298 ~Squirrel.get_stations
299 ~Squirrel.get_channels
300 ~Squirrel.get_responses
301 ~Squirrel.get_events
302 ~Squirrel.get_waveform_nuts
303 ~Squirrel.get_waveforms
304 ~Squirrel.chopper_waveforms
305 ~Squirrel.get_coverage
306 ~Squirrel.pile
307 ~Squirrel.snuffle
308 ~Squirrel.glob_codes
309 ~pyrocko.squirrel.selection.Selection.get_database
310 ~Squirrel.print_tables
311 '''
313 def __init__(
314 self, env=None, database=None, cache_path=None, persistent=None):
316 if not isinstance(env, environment.Environment):
317 env = environment.get_environment(env)
319 if database is None:
320 database = env.expand_path(env.database_path)
322 if cache_path is None:
323 cache_path = env.expand_path(env.cache_path)
325 if persistent is None:
326 persistent = env.persistent
328 Selection.__init__(
329 self, database=database, persistent=persistent)
331 self.get_database().set_basepath(os.path.dirname(env.get_basepath()))
333 self._content_caches = {
334 'waveform': cache.ContentCache(),
335 'default': cache.ContentCache()}
337 self._cache_path = cache_path
339 self._sources = []
340 self._operators = []
341 self._operator_registry = {}
343 self._pile = None
344 self._n_choppers_active = 0
346 self._names.update({
347 'nuts': self.name + '_nuts',
348 'kind_codes_count': self.name + '_kind_codes_count',
349 'coverage': self.name + '_coverage'})
351 with self.transaction() as cursor:
352 self._create_tables_squirrel(cursor)
354 def _create_tables_squirrel(self, cursor):
356 cursor.execute(self._register_table(self._sql(
357 '''
358 CREATE TABLE IF NOT EXISTS %(db)s.%(nuts)s (
359 nut_id integer PRIMARY KEY,
360 file_id integer,
361 file_segment integer,
362 file_element integer,
363 kind_id integer,
364 kind_codes_id integer,
365 tmin_seconds integer,
366 tmin_offset integer,
367 tmax_seconds integer,
368 tmax_offset integer,
369 kscale integer)
370 ''')))
372 cursor.execute(self._register_table(self._sql(
373 '''
374 CREATE TABLE IF NOT EXISTS %(db)s.%(kind_codes_count)s (
375 kind_codes_id integer PRIMARY KEY,
376 count integer)
377 ''')))
379 cursor.execute(self._sql(
380 '''
381 CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(nuts)s_file_element
382 ON %(nuts)s (file_id, file_segment, file_element)
383 '''))
385 cursor.execute(self._sql(
386 '''
387 CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_file_id
388 ON %(nuts)s (file_id)
389 '''))
391 cursor.execute(self._sql(
392 '''
393 CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmin_seconds
394 ON %(nuts)s (kind_id, tmin_seconds)
395 '''))
397 cursor.execute(self._sql(
398 '''
399 CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmax_seconds
400 ON %(nuts)s (kind_id, tmax_seconds)
401 '''))
403 cursor.execute(self._sql(
404 '''
405 CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_kscale
406 ON %(nuts)s (kind_id, kscale, tmin_seconds)
407 '''))
409 cursor.execute(self._sql(
410 '''
411 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts
412 BEFORE DELETE ON main.files FOR EACH ROW
413 BEGIN
414 DELETE FROM %(nuts)s WHERE file_id == old.file_id;
415 END
416 '''))
418 # trigger only on size to make silent update of mtime possible
419 cursor.execute(self._sql(
420 '''
421 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts2
422 BEFORE UPDATE OF size ON main.files FOR EACH ROW
423 BEGIN
424 DELETE FROM %(nuts)s WHERE file_id == old.file_id;
425 END
426 '''))
428 cursor.execute(self._sql(
429 '''
430 CREATE TRIGGER IF NOT EXISTS
431 %(db)s.%(file_states)s_delete_files
432 BEFORE DELETE ON %(db)s.%(file_states)s FOR EACH ROW
433 BEGIN
434 DELETE FROM %(nuts)s WHERE file_id == old.file_id;
435 END
436 '''))
438 cursor.execute(self._sql(
439 '''
440 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_inc_kind_codes
441 BEFORE INSERT ON %(nuts)s FOR EACH ROW
442 BEGIN
443 INSERT OR IGNORE INTO %(kind_codes_count)s VALUES
444 (new.kind_codes_id, 0);
445 UPDATE %(kind_codes_count)s
446 SET count = count + 1
447 WHERE new.kind_codes_id
448 == %(kind_codes_count)s.kind_codes_id;
449 END
450 '''))
452 cursor.execute(self._sql(
453 '''
454 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_dec_kind_codes
455 BEFORE DELETE ON %(nuts)s FOR EACH ROW
456 BEGIN
457 UPDATE %(kind_codes_count)s
458 SET count = count - 1
459 WHERE old.kind_codes_id
460 == %(kind_codes_count)s.kind_codes_id;
461 END
462 '''))
464 cursor.execute(self._register_table(self._sql(
465 '''
466 CREATE TABLE IF NOT EXISTS %(db)s.%(coverage)s (
467 kind_codes_id integer,
468 time_seconds integer,
469 time_offset integer,
470 step integer)
471 ''')))
473 cursor.execute(self._sql(
474 '''
475 CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(coverage)s_time
476 ON %(coverage)s (kind_codes_id, time_seconds, time_offset)
477 '''))
479 cursor.execute(self._sql(
480 '''
481 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_add_coverage
482 AFTER INSERT ON %(nuts)s FOR EACH ROW
483 BEGIN
484 INSERT OR IGNORE INTO %(coverage)s VALUES
485 (new.kind_codes_id, new.tmin_seconds, new.tmin_offset, 0)
486 ;
487 UPDATE %(coverage)s
488 SET step = step + 1
489 WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
490 AND new.tmin_seconds == %(coverage)s.time_seconds
491 AND new.tmin_offset == %(coverage)s.time_offset
492 ;
493 INSERT OR IGNORE INTO %(coverage)s VALUES
494 (new.kind_codes_id, new.tmax_seconds, new.tmax_offset, 0)
495 ;
496 UPDATE %(coverage)s
497 SET step = step - 1
498 WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
499 AND new.tmax_seconds == %(coverage)s.time_seconds
500 AND new.tmax_offset == %(coverage)s.time_offset
501 ;
502 DELETE FROM %(coverage)s
503 WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
504 AND new.tmin_seconds == %(coverage)s.time_seconds
505 AND new.tmin_offset == %(coverage)s.time_offset
506 AND step == 0
507 ;
508 DELETE FROM %(coverage)s
509 WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
510 AND new.tmax_seconds == %(coverage)s.time_seconds
511 AND new.tmax_offset == %(coverage)s.time_offset
512 AND step == 0
513 ;
514 END
515 '''))
517 cursor.execute(self._sql(
518 '''
519 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_remove_coverage
520 BEFORE DELETE ON %(nuts)s FOR EACH ROW
521 BEGIN
522 INSERT OR IGNORE INTO %(coverage)s VALUES
523 (old.kind_codes_id, old.tmin_seconds, old.tmin_offset, 0)
524 ;
525 UPDATE %(coverage)s
526 SET step = step - 1
527 WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
528 AND old.tmin_seconds == %(coverage)s.time_seconds
529 AND old.tmin_offset == %(coverage)s.time_offset
530 ;
531 INSERT OR IGNORE INTO %(coverage)s VALUES
532 (old.kind_codes_id, old.tmax_seconds, old.tmax_offset, 0)
533 ;
534 UPDATE %(coverage)s
535 SET step = step + 1
536 WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
537 AND old.tmax_seconds == %(coverage)s.time_seconds
538 AND old.tmax_offset == %(coverage)s.time_offset
539 ;
540 DELETE FROM %(coverage)s
541 WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
542 AND old.tmin_seconds == %(coverage)s.time_seconds
543 AND old.tmin_offset == %(coverage)s.time_offset
544 AND step == 0
545 ;
546 DELETE FROM %(coverage)s
547 WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
548 AND old.tmax_seconds == %(coverage)s.time_seconds
549 AND old.tmax_offset == %(coverage)s.time_offset
550 AND step == 0
551 ;
552 END
553 '''))
555 def _delete(self):
556 '''Delete database tables associated with this Squirrel.'''
558 for s in '''
559 DROP TRIGGER %(db)s.%(nuts)s_delete_nuts;
560 DROP TRIGGER %(db)s.%(nuts)s_delete_nuts2;
561 DROP TRIGGER %(db)s.%(file_states)s_delete_files;
562 DROP TRIGGER %(db)s.%(nuts)s_inc_kind_codes;
563 DROP TRIGGER %(db)s.%(nuts)s_dec_kind_codes;
564 DROP TABLE %(db)s.%(nuts)s;
565 DROP TABLE %(db)s.%(kind_codes_count)s;
566 DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_add_coverage;
567 DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_remove_coverage;
568 DROP TABLE IF EXISTS %(db)s.%(coverage)s;
569 '''.strip().splitlines():
571 self._conn.execute(self._sql(s))
573 Selection._delete(self)
575 @filldocs
576 def add(self,
577 paths,
578 kinds=None,
579 format='detect',
580 include=None,
581 exclude=None,
582 check=True):
584 '''
585 Add files to the selection.
587 :param paths:
588 Iterator yielding paths to files or directories to be added to the
589 selection. Recurses into directories. If given a ``str``, it
590 is treated as a single path to be added.
591 :type paths:
592 :py:class:`list` of :py:class:`str`
594 :param kinds:
595 Content types to be made available through the Squirrel selection.
596 By default, all known content types are accepted.
597 :type kinds:
598 :py:class:`list` of :py:class:`str`
600 :param format:
601 File format identifier or ``'detect'`` to enable auto-detection
602 (available: %(file_formats)s).
603 :type format:
604 str
606 :param include:
607 If not ``None``, files are only included if their paths match the
608 given regular expression pattern.
609 :type format:
610 str
612 :param exclude:
613 If not ``None``, files are only included if their paths do not
614 match the given regular expression pattern.
615 :type format:
616 str
618 :param check:
619 If ``True``, all file modification times are checked to see if
620 cached information has to be updated (slow). If ``False``, only
621 previously unknown files are indexed and cached information is used
622 for known files, regardless of file state (fast, corrresponds to
623 Squirrel's ``--optimistic`` mode). File deletions will go
624 undetected in the latter case.
625 :type check:
626 bool
628 :Complexity:
629 O(log N)
630 '''
632 if isinstance(kinds, str):
633 kinds = (kinds,)
635 if isinstance(paths, str):
636 paths = [paths]
638 kind_mask = model.to_kind_mask(kinds)
640 with progress.view():
641 Selection.add(
642 self, util.iter_select_files(
643 paths,
644 show_progress=False,
645 include=include,
646 exclude=exclude,
647 pass_through=lambda path: path.startswith('virtual:')
648 ), kind_mask, format)
650 self._load(check)
651 self._update_nuts()
653 def reload(self):
654 '''
655 Check for modifications and reindex modified files.
657 Based on file modification times.
658 '''
660 self._set_file_states_force_check()
661 self._load(check=True)
662 self._update_nuts()
664 def add_virtual(self, nuts, virtual_paths=None):
665 '''
666 Add content which is not backed by files.
668 :param nuts:
669 Content pieces to be added.
670 :type nuts:
671 iterator yielding :py:class:`~pyrocko.squirrel.model.Nut` objects
673 :param virtual_paths:
674 List of virtual paths to prevent creating a temporary list of the
675 nuts while aggregating the file paths for the selection.
676 :type virtual_paths:
677 :py:class:`list` of :py:class:`str`
679 Stores to the main database and the selection.
680 '''
682 if isinstance(virtual_paths, str):
683 virtual_paths = [virtual_paths]
685 if virtual_paths is None:
686 if not isinstance(nuts, list):
687 nuts = list(nuts)
688 virtual_paths = set(nut.file_path for nut in nuts)
690 Selection.add(self, virtual_paths)
691 self.get_database().dig(nuts)
692 self._update_nuts()
694 def add_volatile(self, nuts):
695 if not isinstance(nuts, list):
696 nuts = list(nuts)
698 paths = list(set(nut.file_path for nut in nuts))
699 io.backends.virtual.add_nuts(nuts)
700 self.add_virtual(nuts, paths)
701 self._volatile_paths.extend(paths)
703 def add_volatile_waveforms(self, traces):
704 '''
705 Add in-memory waveforms which will be removed when the app closes.
706 '''
708 name = model.random_name()
710 path = 'virtual:volatile:%s' % name
712 nuts = []
713 for itr, tr in enumerate(traces):
714 assert tr.tmin <= tr.tmax
715 tmin_seconds, tmin_offset = model.tsplit(tr.tmin)
716 tmax_seconds, tmax_offset = model.tsplit(
717 tr.tmin + tr.data_len()*tr.deltat)
719 nuts.append(model.Nut(
720 file_path=path,
721 file_format='virtual',
722 file_segment=itr,
723 file_element=0,
724 file_mtime=0,
725 codes=separator.join(tr.codes),
726 tmin_seconds=tmin_seconds,
727 tmin_offset=tmin_offset,
728 tmax_seconds=tmax_seconds,
729 tmax_offset=tmax_offset,
730 deltat=tr.deltat,
731 kind_id=to_kind_id('waveform'),
732 content=tr))
734 self.add_volatile(nuts)
735 return path
737 def _load(self, check):
738 for _ in io.iload(
739 self,
740 content=[],
741 skip_unchanged=True,
742 check=check):
743 pass
745 def _update_nuts(self):
746 transaction = self.transaction()
747 with make_task('Aggregating selection') as task, \
748 transaction as cursor:
750 self._conn.set_progress_handler(task.update, 100000)
751 nrows = cursor.execute(self._sql(
752 '''
753 INSERT INTO %(db)s.%(nuts)s
754 SELECT NULL,
755 nuts.file_id, nuts.file_segment, nuts.file_element,
756 nuts.kind_id, nuts.kind_codes_id,
757 nuts.tmin_seconds, nuts.tmin_offset,
758 nuts.tmax_seconds, nuts.tmax_offset,
759 nuts.kscale
760 FROM %(db)s.%(file_states)s
761 INNER JOIN nuts
762 ON %(db)s.%(file_states)s.file_id == nuts.file_id
763 INNER JOIN kind_codes
764 ON nuts.kind_codes_id ==
765 kind_codes.kind_codes_id
766 WHERE %(db)s.%(file_states)s.file_state != 2
767 AND (((1 << kind_codes.kind_id)
768 & %(db)s.%(file_states)s.kind_mask) != 0)
769 ''')).rowcount
771 task.update(nrows)
772 self._set_file_states_known(transaction)
773 self._conn.set_progress_handler(None, 0)
775 def add_source(self, source, check=True):
776 '''
777 Add remote resource.
779 :param source:
780 Remote data access client instance.
781 :type source:
782 subclass of :py:class:`~pyrocko.squirrel.client.base.Source`
783 '''
785 self._sources.append(source)
786 source.setup(self, check=check)
788 def add_fdsn(self, *args, **kwargs):
789 '''
790 Add FDSN site for transparent remote data access.
792 Arguments are passed to
793 :py:class:`~pyrocko.squirrel.client.fdsn.FDSNSource`.
794 '''
796 self.add_source(fdsn.FDSNSource(*args, **kwargs))
798 def add_catalog(self, *args, **kwargs):
799 '''
800 Add online catalog for transparent event data access.
802 Arguments are passed to
803 :py:class:`~pyrocko.squirrel.client.catalog.CatalogSource`.
804 '''
806 self.add_source(catalog.CatalogSource(*args, **kwargs))
808 def add_dataset(self, ds, check=True, warn_persistent=True):
809 '''
810 Read dataset description from file and add its contents.
812 :param ds:
813 Path to dataset description file or dataset description object
814 . See :py:mod:`~pyrocko.squirrel.dataset`.
815 :type ds:
816 :py:class:`str` or :py:class:`~pyrocko.squirrel.dataset.Dataset`
818 :param check:
819 If ``True``, all file modification times are checked to see if
820 cached information has to be updated (slow). If ``False``, only
821 previously unknown files are indexed and cached information is used
822 for known files, regardless of file state (fast, corrresponds to
823 Squirrel's ``--optimistic`` mode). File deletions will go
824 undetected in the latter case.
825 :type check:
826 bool
827 '''
828 if isinstance(ds, str):
829 ds = dataset.read_dataset(ds)
830 path = ds
831 else:
832 path = None
834 if warn_persistent and ds.persistent and (
835 not self._persistent or (self._persistent != ds.persistent)):
837 logger.warning(
838 'Dataset `persistent` flag ignored. Can not be set on already '
839 'existing Squirrel instance.%s' % (
840 ' Dataset: %s' % path if path else ''))
842 ds.setup(self, check=check)
844 def _get_selection_args(
845 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
847 if time is not None:
848 tmin = time
849 tmax = time
851 if obj is not None:
852 tmin = tmin if tmin is not None else obj.tmin
853 tmax = tmax if tmax is not None else obj.tmax
854 codes = codes if codes is not None else codes_inflate2(obj.codes)
856 if isinstance(codes, str):
857 codes = tuple(codes.split('.'))
859 return tmin, tmax, codes
861 def _selection_args_to_kwargs(
862 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
864 return dict(obj=obj, tmin=tmin, tmax=tmax, time=time, codes=codes)
866 def _timerange_sql(self, tmin, tmax, kind, cond, args, naiv):
868 tmin_seconds, tmin_offset = model.tsplit(tmin)
869 tmax_seconds, tmax_offset = model.tsplit(tmax)
870 if naiv:
871 cond.append('%(db)s.%(nuts)s.tmin_seconds <= ?')
872 args.append(tmax_seconds)
873 else:
874 tscale_edges = model.tscale_edges
875 tmin_cond = []
876 for kscale in range(tscale_edges.size + 1):
877 if kscale != tscale_edges.size:
878 tscale = int(tscale_edges[kscale])
879 tmin_cond.append('''
880 (%(db)s.%(nuts)s.kind_id = ?
881 AND %(db)s.%(nuts)s.kscale == ?
882 AND %(db)s.%(nuts)s.tmin_seconds BETWEEN ? AND ?)
883 ''')
884 args.extend(
885 (to_kind_id(kind), kscale,
886 tmin_seconds - tscale - 1, tmax_seconds + 1))
888 else:
889 tmin_cond.append('''
890 (%(db)s.%(nuts)s.kind_id == ?
891 AND %(db)s.%(nuts)s.kscale == ?
892 AND %(db)s.%(nuts)s.tmin_seconds <= ?)
893 ''')
895 args.extend(
896 (to_kind_id(kind), kscale, tmax_seconds + 1))
897 if tmin_cond:
898 cond.append(' ( ' + ' OR '.join(tmin_cond) + ' ) ')
900 cond.append('%(db)s.%(nuts)s.tmax_seconds >= ?')
901 args.append(tmin_seconds)
903 def iter_nuts(
904 self, kind=None, tmin=None, tmax=None, codes=None, naiv=False,
905 kind_codes_ids=None, path=None):
907 '''
908 Iterate over content entities matching given constraints.
910 :param kind:
911 Content kind (or kinds) to extract.
912 :type kind:
913 :py:class:`str`, :py:class:`list` of :py:class:`str`
915 :param tmin:
916 Start time of query interval.
917 :type tmin:
918 timestamp
920 :param tmax:
921 End time of query interval.
922 :type tmax:
923 timestamp
925 :param codes:
926 Pattern of content codes to query.
927 :type codes:
928 :py:class:`tuple` of :py:class:`str`
930 :param naiv:
931 Bypass time span lookup through indices (slow, for testing).
932 :type naiv:
933 :py:class:`bool`
935 :param kind_codes_ids:
936 Kind-codes IDs of contents to be retrieved (internal use).
937 :type kind_codes_ids:
938 :py:class:`list` of :py:class:`str`
940 :yields:
941 :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
942 intersecting content.
944 :complexity:
945 O(log N) for the time selection part due to heavy use of database
946 indices.
948 Query time span is treated as a half-open interval ``[tmin, tmax)``.
949 However, if ``tmin`` equals ``tmax``, the edge logics are modified to
950 closed-interval so that content intersecting with the time instant ``t
951 = tmin = tmax`` is returned (otherwise nothing would be returned as
952 ``[t, t)`` never matches anything).
954 Time spans of content entities to be matched are also treated as half
955 open intervals, e.g. content span ``[0, 1)`` is matched by query span
956 ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``. Also here, logics are
957 modified to closed-interval when the content time span is an empty
958 interval, i.e. to indicate a time instant. E.g. time instant 0 is
959 matched by ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``.
960 '''
962 if not isinstance(kind, str):
963 if kind is None:
964 kind = model.g_content_kinds
965 for kind_ in kind:
966 for nut in self.iter_nuts(kind_, tmin, tmax, codes):
967 yield nut
969 return
971 cond = []
972 args = []
973 if tmin is not None or tmax is not None:
974 assert kind is not None
975 if tmin is None:
976 tmin = self.get_time_span()[0]
977 if tmax is None:
978 tmax = self.get_time_span()[1] + 1.0
980 self._timerange_sql(tmin, tmax, kind, cond, args, naiv)
982 elif kind is not None:
983 cond.append('kind_codes.kind_id == ?')
984 args.append(to_kind_id(kind))
986 if codes is not None:
987 pats = codes_patterns_for_kind(kind, codes)
988 if pats:
989 cond.append(
990 ' ( %s ) ' % ' OR '.join(
991 ('kind_codes.codes GLOB ?',) * len(pats)))
992 args.extend(separator.join(pat) for pat in pats)
994 if kind_codes_ids is not None:
995 cond.append(
996 ' ( kind_codes.kind_codes_id IN ( %s ) ) ' % ', '.join(
997 '?'*len(kind_codes_ids)))
999 args.extend(kind_codes_ids)
1001 db = self.get_database()
1002 if path is not None:
1003 cond.append('files.path == ?')
1004 args.append(db.relpath(abspath(path)))
1006 sql = ('''
1007 SELECT
1008 files.path,
1009 files.format,
1010 files.mtime,
1011 files.size,
1012 %(db)s.%(nuts)s.file_segment,
1013 %(db)s.%(nuts)s.file_element,
1014 kind_codes.kind_id,
1015 kind_codes.codes,
1016 %(db)s.%(nuts)s.tmin_seconds,
1017 %(db)s.%(nuts)s.tmin_offset,
1018 %(db)s.%(nuts)s.tmax_seconds,
1019 %(db)s.%(nuts)s.tmax_offset,
1020 kind_codes.deltat
1021 FROM files
1022 INNER JOIN %(db)s.%(nuts)s
1023 ON files.file_id == %(db)s.%(nuts)s.file_id
1024 INNER JOIN kind_codes
1025 ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id
1026 ''')
1028 if cond:
1029 sql += ''' WHERE ''' + ' AND '.join(cond)
1031 sql = self._sql(sql)
1032 if tmin is None and tmax is None:
1033 for row in self._conn.execute(sql, args):
1034 row = (db.abspath(row[0]),) + row[1:]
1035 nut = model.Nut(values_nocheck=row)
1036 yield nut
1037 else:
1038 assert tmin is not None and tmax is not None
1039 if tmin == tmax:
1040 for row in self._conn.execute(sql, args):
1041 row = (db.abspath(row[0]),) + row[1:]
1042 nut = model.Nut(values_nocheck=row)
1043 if (nut.tmin <= tmin < nut.tmax) \
1044 or (nut.tmin == nut.tmax and tmin == nut.tmin):
1046 yield nut
1047 else:
1048 for row in self._conn.execute(sql, args):
1049 row = (db.abspath(row[0]),) + row[1:]
1050 nut = model.Nut(values_nocheck=row)
1051 if (tmin < nut.tmax and nut.tmin < tmax) \
1052 or (nut.tmin == nut.tmax
1053 and tmin <= nut.tmin < tmax):
1055 yield nut
1057 def get_nuts(self, *args, **kwargs):
1058 '''
1059 Get content entities matching given constraints.
1061 Like :py:meth:`iter_nuts` but returns results as a list.
1062 '''
1064 return list(self.iter_nuts(*args, **kwargs))
1066 def _split_nuts(
1067 self, kind, tmin=None, tmax=None, codes=None, path=None):
1069 tmin_seconds, tmin_offset = model.tsplit(tmin)
1070 tmax_seconds, tmax_offset = model.tsplit(tmax)
1072 names_main_nuts = dict(self._names)
1073 names_main_nuts.update(db='main', nuts='nuts')
1075 db = self.get_database()
1077 def main_nuts(s):
1078 return s % names_main_nuts
1080 with self.transaction() as cursor:
1081 # modify selection and main
1082 for sql_subst in [
1083 self._sql, main_nuts]:
1085 cond = []
1086 args = []
1088 self._timerange_sql(tmin, tmax, kind, cond, args, False)
1090 if codes is not None:
1091 pats = codes_patterns_for_kind(kind, codes)
1092 if pats:
1093 cond.append(
1094 ' ( %s ) ' % ' OR '.join(
1095 ('kind_codes.codes GLOB ?',) * len(pats)))
1096 args.extend(separator.join(pat) for pat in pats)
1098 if path is not None:
1099 cond.append('files.path == ?')
1100 args.append(db.relpath(abspath(path)))
1102 sql = sql_subst('''
1103 SELECT
1104 %(db)s.%(nuts)s.nut_id,
1105 %(db)s.%(nuts)s.tmin_seconds,
1106 %(db)s.%(nuts)s.tmin_offset,
1107 %(db)s.%(nuts)s.tmax_seconds,
1108 %(db)s.%(nuts)s.tmax_offset,
1109 kind_codes.deltat
1110 FROM files
1111 INNER JOIN %(db)s.%(nuts)s
1112 ON files.file_id == %(db)s.%(nuts)s.file_id
1113 INNER JOIN kind_codes
1114 ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id
1115 WHERE ''' + ' AND '.join(cond)) # noqa
1117 insert = []
1118 delete = []
1119 for row in cursor.execute(sql, args):
1120 nut_id, nut_tmin_seconds, nut_tmin_offset, \
1121 nut_tmax_seconds, nut_tmax_offset, nut_deltat = row
1123 nut_tmin = model.tjoin(
1124 nut_tmin_seconds, nut_tmin_offset)
1125 nut_tmax = model.tjoin(
1126 nut_tmax_seconds, nut_tmax_offset)
1128 if nut_tmin < tmax and tmin < nut_tmax:
1129 if nut_tmin < tmin:
1130 insert.append((
1131 nut_tmin_seconds, nut_tmin_offset,
1132 tmin_seconds, tmin_offset,
1133 model.tscale_to_kscale(
1134 tmin_seconds - nut_tmin_seconds),
1135 nut_id))
1137 if tmax < nut_tmax:
1138 insert.append((
1139 tmax_seconds, tmax_offset,
1140 nut_tmax_seconds, nut_tmax_offset,
1141 model.tscale_to_kscale(
1142 nut_tmax_seconds - tmax_seconds),
1143 nut_id))
1145 delete.append((nut_id,))
1147 sql_add = '''
1148 INSERT INTO %(db)s.%(nuts)s (
1149 file_id, file_segment, file_element, kind_id,
1150 kind_codes_id, tmin_seconds, tmin_offset,
1151 tmax_seconds, tmax_offset, kscale )
1152 SELECT
1153 file_id, file_segment, file_element,
1154 kind_id, kind_codes_id, ?, ?, ?, ?, ?
1155 FROM %(db)s.%(nuts)s
1156 WHERE nut_id == ?
1157 '''
1158 cursor.executemany(sql_subst(sql_add), insert)
1160 sql_delete = '''
1161 DELETE FROM %(db)s.%(nuts)s WHERE nut_id == ?
1162 '''
1163 cursor.executemany(sql_subst(sql_delete), delete)
1165 def get_time_span(self, kinds=None):
1166 '''
1167 Get time interval over all content in selection.
1169 :param kinds:
1170 If not ``None``, restrict query to given content kinds.
1171 :type kind:
1172 list of str
1174 :complexity:
1175 O(1), independent of the number of nuts.
1177 :returns:
1178 ``(tmin, tmax)``, combined time interval of queried content kinds.
1179 '''
1181 sql_min = self._sql('''
1182 SELECT MIN(tmin_seconds), MIN(tmin_offset)
1183 FROM %(db)s.%(nuts)s
1184 WHERE kind_id == ?
1185 AND tmin_seconds == (
1186 SELECT MIN(tmin_seconds)
1187 FROM %(db)s.%(nuts)s
1188 WHERE kind_id == ?)
1189 ''')
1191 sql_max = self._sql('''
1192 SELECT MAX(tmax_seconds), MAX(tmax_offset)
1193 FROM %(db)s.%(nuts)s
1194 WHERE kind_id == ?
1195 AND tmax_seconds == (
1196 SELECT MAX(tmax_seconds)
1197 FROM %(db)s.%(nuts)s
1198 WHERE kind_id == ?)
1199 ''')
1201 gtmin = None
1202 gtmax = None
1204 if isinstance(kinds, str):
1205 kinds = [kinds]
1207 if kinds is None:
1208 kind_ids = model.g_content_kind_ids
1209 else:
1210 kind_ids = model.to_kind_ids(kinds)
1212 for kind_id in kind_ids:
1213 for tmin_seconds, tmin_offset in self._conn.execute(
1214 sql_min, (kind_id, kind_id)):
1215 tmin = model.tjoin(tmin_seconds, tmin_offset)
1216 if tmin is not None and (gtmin is None or tmin < gtmin):
1217 gtmin = tmin
1219 for (tmax_seconds, tmax_offset) in self._conn.execute(
1220 sql_max, (kind_id, kind_id)):
1221 tmax = model.tjoin(tmax_seconds, tmax_offset)
1222 if tmax is not None and (gtmax is None or tmax > gtmax):
1223 gtmax = tmax
1225 return gtmin, gtmax
1227 def has(self, kinds):
1228 '''
1229 Check availability of given content kinds.
1231 :param kinds:
1232 Content kinds to query.
1233 :type kind:
1234 list of str
1236 :returns:
1237 ``True`` if any of the queried content kinds is available
1238 in the selection.
1239 '''
1240 self_tmin, self_tmax = self.get_time_span(kinds)
1242 return None not in (self_tmin, self_tmax)
1244 def get_deltat_span(self, kind):
1245 '''
1246 Get min and max sampling interval of all content of given kind.
1248 :param kind:
1249 Content kind
1250 :type kind:
1251 str
1253 :returns: ``(deltat_min, deltat_max)``
1254 '''
1256 deltats = [
1257 deltat for deltat in self.get_deltats(kind)
1258 if deltat is not None]
1260 if deltats:
1261 return min(deltats), max(deltats)
1262 else:
1263 return None, None
1265 def iter_kinds(self, codes=None):
1266 '''
1267 Iterate over content types available in selection.
1269 :param codes:
1270 If given, get kinds only for selected codes identifier.
1271 :type codes:
1272 :py:class:`tuple` of :py:class:`str`
1274 :yields:
1275 Available content kinds as :py:class:`str`.
1277 :complexity:
1278 O(1), independent of number of nuts.
1279 '''
1281 return self._database._iter_kinds(
1282 codes=codes,
1283 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1285 def iter_deltats(self, kind=None):
1286 '''
1287 Iterate over sampling intervals available in selection.
1289 :param kind:
1290 If given, get sampling intervals only for a given content type.
1291 :type kind:
1292 str
1294 :yields:
1295 :py:class:`float` values.
1297 :complexity:
1298 O(1), independent of number of nuts.
1299 '''
1300 return self._database._iter_deltats(
1301 kind=kind,
1302 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1304 def iter_codes(self, kind=None):
1305 '''
1306 Iterate over content identifier code sequences available in selection.
1308 :param kind:
1309 If given, get codes only for a given content type.
1310 :type kind:
1311 str
1313 :yields:
1314 :py:class:`tuple` of :py:class:`str`
1316 :complexity:
1317 O(1), independent of number of nuts.
1318 '''
1319 return self._database._iter_codes(
1320 kind=kind,
1321 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1323 def iter_counts(self, kind=None):
1324 '''
1325 Iterate over number of occurrences of any (kind, codes) combination.
1327 :param kind:
1328 If given, get counts only for selected content type.
1329 :type kind:
1330 str
1332 :yields:
1333 Tuples of the form ``((kind, codes), count)``.
1335 :complexity:
1336 O(1), independent of number of nuts.
1337 '''
1338 return self._database._iter_counts(
1339 kind=kind,
1340 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1342 def get_kinds(self, codes=None):
1343 '''
1344 Get content types available in selection.
1346 :param codes:
1347 If given, get kinds only for selected codes identifier.
1348 :type codes:
1349 :py:class:`tuple` of :py:class:`str`
1351 :returns:
1352 Sorted list of available content types.
1354 :complexity:
1355 O(1), independent of number of nuts.
1357 '''
1358 return sorted(list(self.iter_kinds(codes=codes)))
1360 def get_deltats(self, kind=None):
1361 '''
1362 Get sampling intervals available in selection.
1364 :param kind:
1365 If given, get sampling intervals only for selected content type.
1366 :type kind:
1367 str
1369 :complexity:
1370 O(1), independent of number of nuts.
1372 :returns: Sorted list of available sampling intervals.
1373 '''
1374 return sorted(list(self.iter_deltats(kind=kind)))
1376 def get_codes(self, kind=None):
1377 '''
1378 Get identifier code sequences available in selection.
1380 :param kind:
1381 If given, get codes only for selected content type.
1382 :type kind:
1383 str
1385 :complexity:
1386 O(1), independent of number of nuts.
1388 :returns: Sorted list of available codes as tuples of strings.
1389 '''
1390 return sorted(list(self.iter_codes(kind=kind)))
1392 def get_counts(self, kind=None):
1393 '''
1394 Get number of occurrences of any (kind, codes) combination.
1396 :param kind:
1397 If given, get codes only for selected content type.
1398 :type kind:
1399 str
1401 :complexity:
1402 O(1), independent of number of nuts.
1404 :returns: ``dict`` with ``counts[kind][codes]`` or ``counts[codes]``
1405 if kind is not ``None``
1406 '''
1407 d = {}
1408 for (k, codes, deltat), count in self.iter_counts():
1409 if k not in d:
1410 v = d[k] = {}
1411 else:
1412 v = d[k]
1414 if codes not in v:
1415 v[codes] = 0
1417 v[codes] += count
1419 if kind is not None:
1420 return d[kind]
1421 else:
1422 return d
1424 def glob_codes(self, kind, codes_list):
1425 '''
1426 Find codes matching given patterns.
1428 :param kind:
1429 Content kind to be queried.
1430 :type kind:
1431 str
1433 :param codes_list:
1434 List of code patterns to query. If not given or empty, an empty
1435 list is returned.
1436 :type codes_list:
1437 :py:class:`list` of :py:class:`tuple` of :py:class:`str`
1439 :returns:
1440 List of matches of the form ``[kind_codes_id, codes, deltat]``.
1441 '''
1443 args = [to_kind_id(kind)]
1444 pats = []
1445 for codes in codes_list:
1446 pats.extend(codes_patterns_for_kind(kind, codes))
1448 if pats:
1449 codes_cond = 'AND ( %s ) ' % ' OR '.join(
1450 ('kind_codes.codes GLOB ?',) * len(pats))
1452 args.extend(separator.join(pat) for pat in pats)
1453 else:
1454 codes_cond = ''
1456 sql = self._sql('''
1457 SELECT kind_codes_id, codes, deltat FROM kind_codes
1458 WHERE
1459 kind_id == ? ''' + codes_cond)
1461 return list(map(list, self._conn.execute(sql, args)))
1463 def update(self, constraint=None, **kwargs):
1464 '''
1465 Update or partially update channel and event inventories.
1467 :param constraint:
1468 Selection of times or areas to be brought up to date.
1469 :type constraint:
1470 :py:class:`~pyrocko.squirrel.client.base.Constraint`
1472 :param \\*\\*kwargs:
1473 Shortcut for setting ``constraint=Constraint(**kwargs)``.
1475 This function triggers all attached remote sources, to check for
1476 updates in the meta-data. The sources will only submit queries when
1477 their expiration date has passed, or if the selection spans into
1478 previously unseen times or areas.
1479 '''
1481 if constraint is None:
1482 constraint = client.Constraint(**kwargs)
1484 for source in self._sources:
1485 source.update_channel_inventory(self, constraint)
1486 source.update_event_inventory(self, constraint)
1488 def update_waveform_promises(self, constraint=None, **kwargs):
1489 '''
1490 Permit downloading of remote waveforms.
1492 :param constraint:
1493 Remote waveforms compatible with the given constraint are enabled
1494 for download.
1495 :type constraint:
1496 :py:class:`~pyrocko.squirrel.client.base.Constraint`
1498 :param \\*\\*kwargs:
1499 Shortcut for setting ``constraint=Constraint(**kwargs)``.
1501 Calling this method permits Squirrel to download waveforms from remote
1502 sources when processing subsequent waveform requests. This works by
1503 inserting so called waveform promises into the database. It will look
1504 into the available channels for each remote source and create a promise
1505 for each channel compatible with the given constraint. If the promise
1506 then matches in a waveform request, Squirrel tries to download the
1507 waveform. If the download is successful, the downloaded waveform is
1508 added to the Squirrel and the promise is deleted. If the download
1509 fails, the promise is kept if the reason of failure looks like being
1510 temporary, e.g. because of a network failure. If the cause of failure
1511 however seems to be permanent, the promise is deleted so that no
1512 further attempts are made to download a waveform which might not be
1513 available from that server at all. To force re-scheduling after a
1514 permanent failure, call :py:meth:`update_waveform_promises`
1515 yet another time.
1516 '''
1518 if constraint is None:
1519 constraint = client.Constraint(**kwargs)
1521 # TODO
1522 print('contraint ignored atm')
1524 for source in self._sources:
1525 source.update_waveform_promises(self, constraint)
1527 def update_responses(self, constraint=None, **kwargs):
1528 # TODO
1529 if constraint is None:
1530 constraint = client.Constraint(**kwargs)
1532 print('contraint ignored atm')
1533 for source in self._sources:
1534 source.update_response_inventory(self, constraint)
1536 def get_nfiles(self):
1537 '''
1538 Get number of files in selection.
1539 '''
1541 sql = self._sql('''SELECT COUNT(*) FROM %(db)s.%(file_states)s''')
1542 for row in self._conn.execute(sql):
1543 return row[0]
1545 def get_nnuts(self):
1546 '''
1547 Get number of nuts in selection.
1548 '''
1550 sql = self._sql('''SELECT COUNT(*) FROM %(db)s.%(nuts)s''')
1551 for row in self._conn.execute(sql):
1552 return row[0]
1554 def get_total_size(self):
1555 '''
1556 Get aggregated file size available in selection.
1557 '''
1559 sql = self._sql('''
1560 SELECT SUM(files.size) FROM %(db)s.%(file_states)s
1561 INNER JOIN files
1562 ON %(db)s.%(file_states)s.file_id = files.file_id
1563 ''')
1565 for row in self._conn.execute(sql):
1566 return row[0] or 0
1568 def get_stats(self):
1569 '''
1570 Get statistics on contents available through this selection.
1571 '''
1573 kinds = self.get_kinds()
1574 time_spans = {}
1575 for kind in kinds:
1576 time_spans[kind] = self.get_time_span([kind])
1578 return SquirrelStats(
1579 nfiles=self.get_nfiles(),
1580 nnuts=self.get_nnuts(),
1581 kinds=kinds,
1582 codes=self.get_codes(),
1583 total_size=self.get_total_size(),
1584 counts=self.get_counts(),
1585 time_spans=time_spans,
1586 sources=[s.describe() for s in self._sources],
1587 operators=[op.describe() for op in self._operators])
1589 def get_content(
1590 self,
1591 nut,
1592 cache_id='default',
1593 accessor_id='default',
1594 show_progress=False):
1596 '''
1597 Get and possibly load full content for a given index entry from file.
1599 Loads the actual content objects (channel, station, waveform, ...) from
1600 file. For efficiency sibling content (all stuff in the same file
1601 segment) will also be loaded as a side effect. The loaded contents are
1602 cached in the Squirrel object.
1603 '''
1605 content_cache = self._content_caches[cache_id]
1606 if not content_cache.has(nut):
1608 for nut_loaded in io.iload(
1609 nut.file_path,
1610 segment=nut.file_segment,
1611 format=nut.file_format,
1612 database=self._database,
1613 show_progress=show_progress):
1615 content_cache.put(nut_loaded)
1617 try:
1618 return content_cache.get(nut, accessor_id)
1619 except KeyError:
1620 raise error.NotAvailable(
1621 'Unable to retrieve content: %s, %s, %s, %s' % nut.key)
1623 def advance_accessor(self, accessor_id, cache_id=None):
1624 '''
1625 Notify memory caches about consumer moving to a new data batch.
1627 :param accessor_id:
1628 Name of accessing consumer to be advanced.
1629 :type accessor_id:
1630 str
1632 :param cache_id:
1633 Name of cache to for which the accessor should be advanced. By
1634 default the named accessor is advanced in all registered caches.
1635 By default, two caches named ``'default'`` and ``'waveforms'`` are
1636 available.
1637 :type cache_id:
1638 str
1640 See :py:class:`~pyrocko.squirrel.cache.ContentCache` for details on how
1641 Squirrel's memory caching works and can be tuned. Default behaviour is
1642 to release data when it has not been used in the latest data
1643 window/batch. If the accessor is never advanced, data is cached
1644 indefinitely - which is often desired e.g. for station meta-data.
1645 Methods for consecutive data traversal, like
1646 :py:meth:`chopper_waveforms` automatically advance and clear
1647 their accessor.
1648 '''
1649 for cache_ in (
1650 self._content_caches.keys()
1651 if cache_id is None
1652 else [cache_id]):
1654 self._content_caches[cache_].advance_accessor(accessor_id)
1656 def clear_accessor(self, accessor_id, cache_id=None):
1657 '''
1658 Notify memory caches about a consumer having finished.
1660 :param accessor_id:
1661 Name of accessor to be cleared.
1662 :type accessor_id:
1663 str
1665 :param cache_id:
1666 Name of cache for which the accessor should be cleared. By default
1667 the named accessor is cleared from all registered caches. By
1668 default, two caches named ``'default'`` and ``'waveforms'`` are
1669 available.
1670 :type cache_id:
1671 str
1673 Calling this method clears all references to cache entries held by the
1674 named accessor. Cache entries are then freed if not referenced by any
1675 other accessor.
1676 '''
1678 for cache_ in (
1679 self._content_caches.keys()
1680 if cache_id is None
1681 else [cache_id]):
1683 self._content_caches[cache_].clear_accessor(accessor_id)
1685 def _check_duplicates(self, nuts):
1686 d = defaultdict(list)
1687 for nut in nuts:
1688 d[nut.codes].append(nut)
1690 for codes, group in d.items():
1691 if len(group) > 1:
1692 logger.warning(
1693 'Multiple entries matching codes: %s'
1694 % '.'.join(codes.split(separator)))
1696 @filldocs
1697 def get_stations(
1698 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
1699 model='squirrel'):
1701 '''
1702 Get stations matching given constraints.
1704 %(query_args)s
1706 :param model:
1707 Select object model for returned values: ``'squirrel'`` to get
1708 Squirrel station objects or ``'pyrocko'`` to get Pyrocko station
1709 objects with channel information attached.
1710 :type model:
1711 str
1713 :returns:
1714 List of :py:class:`pyrocko.squirrel.Station
1715 <pyrocko.squirrel.model.Station>` objects by default or list of
1716 :py:class:`pyrocko.model.Station <pyrocko.model.station.Station>`
1717 objects if ``model='pyrocko'`` is requested.
1719 See :py:meth:`iter_nuts` for details on time span matching.
1720 '''
1722 if model == 'pyrocko':
1723 return self._get_pyrocko_stations(obj, tmin, tmax, time, codes)
1724 elif model == 'squirrel':
1725 args = self._get_selection_args(obj, tmin, tmax, time, codes)
1726 nuts = sorted(
1727 self.iter_nuts('station', *args), key=lambda nut: nut.dkey)
1728 self._check_duplicates(nuts)
1729 return [self.get_content(nut) for nut in nuts]
1730 else:
1731 raise ValueError('Invalid station model: %s' % model)
1733 @filldocs
1734 def get_channels(
1735 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1737 '''
1738 Get channels matching given constraints.
1740 %(query_args)s
1742 :returns:
1743 List of :py:class:`~pyrocko.squirrel.model.Channel` objects.
1745 See :py:meth:`iter_nuts` for details on time span matching.
1746 '''
1748 args = self._get_selection_args(obj, tmin, tmax, time, codes)
1749 nuts = sorted(
1750 self.iter_nuts('channel', *args), key=lambda nut: nut.dkey)
1751 self._check_duplicates(nuts)
1752 return [self.get_content(nut) for nut in nuts]
1754 @filldocs
1755 def get_sensors(
1756 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1758 '''
1759 Get sensors matching given constraints.
1761 %(query_args)s
1763 :returns:
1764 List of :py:class:`~pyrocko.squirrel.model.Sensor` objects.
1766 See :py:meth:`iter_nuts` for details on time span matching.
1767 '''
1769 tmin, tmax, codes = self._get_selection_args(
1770 obj, tmin, tmax, time, codes)
1772 if codes is not None:
1773 if isinstance(codes, str):
1774 codes = codes.split('.')
1775 codes = tuple(codes_inflate(codes))
1776 if codes[4] != '*':
1777 codes = codes[:4] + (codes[4][:-1] + '?',) + codes[5:]
1779 nuts = sorted(
1780 self.iter_nuts(
1781 'channel', tmin, tmax, codes), key=lambda nut: nut.dkey)
1782 self._check_duplicates(nuts)
1783 return model.Sensor.from_channels(
1784 self.get_content(nut) for nut in nuts)
1786 @filldocs
1787 def get_responses(
1788 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1790 '''
1791 Get instrument responses matching given constraints.
1793 %(query_args)s
1795 :returns:
1796 List of :py:class:`~pyrocko.squirrel.model.Response` objects.
1798 See :py:meth:`iter_nuts` for details on time span matching.
1799 '''
1801 args = self._get_selection_args(obj, tmin, tmax, time, codes)
1802 nuts = sorted(
1803 self.iter_nuts('response', *args), key=lambda nut: nut.dkey)
1804 self._check_duplicates(nuts)
1805 return [self.get_content(nut) for nut in nuts]
1807 @filldocs
1808 def get_response(
1809 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1811 '''
1812 Get instrument response matching given constraints.
1814 %(query_args)s
1816 :returns:
1817 :py:class:`~pyrocko.squirrel.model.Response` object.
1819 Same as :py:meth:`get_responses` but returning exactly one response.
1820 Raises :py:exc:`~pyrocko.squirrel.error.NotAvailable` if zero or more
1821 than one is available.
1823 See :py:meth:`iter_nuts` for details on time span matching.
1824 '''
1826 responses = self.get_responses(obj, tmin, tmax, time, codes)
1827 if len(responses) == 0:
1828 raise error.NotAvailable(
1829 'No instrument response available.')
1830 elif len(responses) > 1:
1831 raise error.NotAvailable(
1832 'Multiple instrument responses matching given constraints.')
1834 return responses[0]
1836 @filldocs
1837 def get_events(
1838 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1840 '''
1841 Get events matching given constraints.
1843 %(query_args)s
1845 :returns:
1846 List of :py:class:`~pyrocko.model.event.Event` objects.
1848 See :py:meth:`iter_nuts` for details on time span matching.
1849 '''
1851 args = self._get_selection_args(obj, tmin, tmax, time, codes)
1852 nuts = sorted(
1853 self.iter_nuts('event', *args), key=lambda nut: nut.dkey)
1854 self._check_duplicates(nuts)
1855 return [self.get_content(nut) for nut in nuts]
1857 def _redeem_promises(self, *args):
1859 tmin, tmax, _ = args
1861 waveforms = list(self.iter_nuts('waveform', *args))
1862 promises = list(self.iter_nuts('waveform_promise', *args))
1864 codes_to_avail = defaultdict(list)
1865 for nut in waveforms:
1866 codes_to_avail[nut.codes].append((nut.tmin, nut.tmax))
1868 def tts(x):
1869 if isinstance(x, tuple):
1870 return tuple(tts(e) for e in x)
1871 elif isinstance(x, list):
1872 return list(tts(e) for e in x)
1873 else:
1874 return util.time_to_str(x)
1876 orders = []
1877 for promise in promises:
1878 waveforms_avail = codes_to_avail[promise.codes]
1879 for block_tmin, block_tmax in blocks(
1880 max(tmin, promise.tmin),
1881 min(tmax, promise.tmax),
1882 promise.deltat):
1884 orders.append(
1885 WaveformOrder(
1886 source_id=promise.file_path,
1887 codes=tuple(promise.codes.split(separator)),
1888 tmin=block_tmin,
1889 tmax=block_tmax,
1890 deltat=promise.deltat,
1891 gaps=gaps(waveforms_avail, block_tmin, block_tmax)))
1893 orders_noop, orders = lpick(lambda order: order.gaps, orders)
1895 order_keys_noop = set(order_key(order) for order in orders_noop)
1896 if len(order_keys_noop) != 0 or len(orders_noop) != 0:
1897 logger.info(
1898 'Waveform orders already satisified with cached/local data: '
1899 '%i (%i)' % (len(order_keys_noop), len(orders_noop)))
1901 source_ids = []
1902 sources = {}
1903 for source in self._sources:
1904 if isinstance(source, fdsn.FDSNSource):
1905 source_ids.append(source._source_id)
1906 sources[source._source_id] = source
1908 source_priority = dict(
1909 (source_id, i) for (i, source_id) in enumerate(source_ids))
1911 order_groups = defaultdict(list)
1912 for order in orders:
1913 order_groups[order_key(order)].append(order)
1915 for k, order_group in order_groups.items():
1916 order_group.sort(
1917 key=lambda order: source_priority[order.source_id])
1919 n_order_groups = len(order_groups)
1921 if len(order_groups) != 0 or len(orders) != 0:
1922 logger.info(
1923 'Waveform orders standing for download: %i (%i)'
1924 % (len(order_groups), len(orders)))
1926 task = make_task('Waveform orders processed', n_order_groups)
1927 else:
1928 task = None
1930 def split_promise(order):
1931 self._split_nuts(
1932 'waveform_promise',
1933 order.tmin, order.tmax,
1934 codes=order.codes,
1935 path=order.source_id)
1937 def release_order_group(order):
1938 okey = order_key(order)
1939 for followup in order_groups[okey]:
1940 split_promise(followup)
1942 del order_groups[okey]
1944 if task:
1945 task.update(n_order_groups - len(order_groups))
1947 def noop(order):
1948 pass
1950 def success(order):
1951 release_order_group(order)
1952 split_promise(order)
1954 def batch_add(paths):
1955 self.add(paths)
1957 calls = queue.Queue()
1959 def enqueue(f):
1960 def wrapper(*args):
1961 calls.put((f, args))
1963 return wrapper
1965 for order in orders_noop:
1966 split_promise(order)
1968 while order_groups:
1970 orders_now = []
1971 empty = []
1972 for k, order_group in order_groups.items():
1973 try:
1974 orders_now.append(order_group.pop(0))
1975 except IndexError:
1976 empty.append(k)
1978 for k in empty:
1979 del order_groups[k]
1981 by_source_id = defaultdict(list)
1982 for order in orders_now:
1983 by_source_id[order.source_id].append(order)
1985 threads = []
1986 for source_id in by_source_id:
1987 def download():
1988 try:
1989 sources[source_id].download_waveforms(
1990 by_source_id[source_id],
1991 success=enqueue(success),
1992 error_permanent=enqueue(split_promise),
1993 error_temporary=noop,
1994 batch_add=enqueue(batch_add))
1996 finally:
1997 calls.put(None)
1999 thread = threading.Thread(target=download)
2000 thread.start()
2001 threads.append(thread)
2003 ndone = 0
2004 while ndone < len(threads):
2005 ret = calls.get()
2006 if ret is None:
2007 ndone += 1
2008 else:
2009 ret[0](*ret[1])
2011 for thread in threads:
2012 thread.join()
2014 if task:
2015 task.update(n_order_groups - len(order_groups))
2017 if task:
2018 task.done()
2020 @filldocs
2021 def get_waveform_nuts(
2022 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
2024 '''
2025 Get waveform content entities matching given constraints.
2027 %(query_args)s
2029 Like :py:meth:`get_nuts` with ``kind='waveform'`` but additionally
2030 resolves matching waveform promises (downloads waveforms from remote
2031 sources).
2033 See :py:meth:`iter_nuts` for details on time span matching.
2034 '''
2036 args = self._get_selection_args(obj, tmin, tmax, time, codes)
2037 self._redeem_promises(*args)
2038 return sorted(
2039 self.iter_nuts('waveform', *args), key=lambda nut: nut.dkey)
2041 @filldocs
2042 def get_waveforms(
2043 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
2044 uncut=False, want_incomplete=True, degap=True, maxgap=5,
2045 maxlap=None, snap=None, include_last=False, load_data=True,
2046 accessor_id='default', operator_params=None):
2048 '''
2049 Get waveforms matching given constraints.
2051 %(query_args)s
2053 :param uncut:
2054 Set to ``True``, to disable cutting traces to [``tmin``, ``tmax``]
2055 and to disable degapping/deoverlapping. Returns untouched traces as
2056 they are read from file segment. File segments are always read in
2057 their entirety.
2058 :type uncut:
2059 bool
2061 :param want_incomplete:
2062 If ``True``, gappy/incomplete traces are included in the result.
2063 :type want_incomplete:
2064 bool
2066 :param degap:
2067 If ``True``, connect traces and remove gaps and overlaps.
2068 :type degap:
2069 bool
2071 :param maxgap:
2072 Maximum gap size in samples which is filled with interpolated
2073 samples when ``degap`` is ``True``.
2074 :type maxgap:
2075 int
2077 :param maxlap:
2078 Maximum overlap size in samples which is removed when ``degap`` is
2079 ``True``.
2080 :type maxlap:
2081 int
2083 :param snap:
2084 Rounding functions used when computing sample index from time
2085 instance, for trace start and trace end, respectively. By default,
2086 ``(round, round)`` is used.
2087 :type snap:
2088 tuple of 2 callables
2090 :param include_last:
2091 If ``True``, add one more sample to the returned traces (the sample
2092 which would be the first sample of a query with ``tmin`` set to the
2093 current value of ``tmax``).
2094 :type include_last:
2095 bool
2097 :param load_data:
2098 If ``True``, waveform data samples are read from files (or cache).
2099 If ``False``, meta-information-only traces are returned (dummy
2100 traces with no data samples).
2101 :type load_data:
2102 bool
2104 :param accessor_id:
2105 Name of consumer on who's behalf data is accessed. Used in cache
2106 management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key
2107 to distinguish different points of extraction for the decision of
2108 when to release cached waveform data. Should be used when data is
2109 alternately extracted from more than one region / selection.
2110 :type accessor_id:
2111 str
2113 See :py:meth:`iter_nuts` for details on time span matching.
2115 Loaded data is kept in memory (at least) until
2116 :py:meth:`clear_accessor` has been called or
2117 :py:meth:`advance_accessor` has been called two consecutive times
2118 without data being accessed between the two calls (by this accessor).
2119 Data may still be further kept in the memory cache if held alive by
2120 consumers with a different ``accessor_id``.
2121 '''
2123 tmin, tmax, codes = self._get_selection_args(
2124 obj, tmin, tmax, time, codes)
2126 self_tmin, self_tmax = self.get_time_span(
2127 ['waveform', 'waveform_promise'])
2129 if None in (self_tmin, self_tmax):
2130 logger.warning(
2131 'No waveforms available.')
2132 return []
2134 tmin = tmin if tmin is not None else self_tmin
2135 tmax = tmax if tmax is not None else self_tmax
2137 if codes is not None:
2138 operator = self.get_operator(codes)
2139 if operator is not None:
2140 return operator.get_waveforms(
2141 self, codes,
2142 tmin=tmin, tmax=tmax,
2143 uncut=uncut, want_incomplete=want_incomplete, degap=degap,
2144 maxgap=maxgap, maxlap=maxlap, snap=snap,
2145 include_last=include_last, load_data=load_data,
2146 accessor_id=accessor_id, params=operator_params)
2148 nuts = self.get_waveform_nuts(obj, tmin, tmax, time, codes)
2150 if load_data:
2151 traces = [
2152 self.get_content(nut, 'waveform', accessor_id) for nut in nuts]
2154 else:
2155 traces = [
2156 trace.Trace(**nut.trace_kwargs) for nut in nuts]
2158 if uncut:
2159 return traces
2161 if snap is None:
2162 snap = (round, round)
2164 chopped = []
2165 for tr in traces:
2166 if not load_data and tr.ydata is not None:
2167 tr = tr.copy(data=False)
2168 tr.ydata = None
2170 try:
2171 chopped.append(tr.chop(
2172 tmin, tmax,
2173 inplace=False,
2174 snap=snap,
2175 include_last=include_last))
2177 except trace.NoData:
2178 pass
2180 processed = self._process_chopped(
2181 chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax)
2183 return processed
2185 @filldocs
2186 def chopper_waveforms(
2187 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
2188 tinc=None, tpad=0.,
2189 want_incomplete=True, snap_window=False,
2190 degap=True, maxgap=5, maxlap=None,
2191 snap=None, include_last=False, load_data=True,
2192 accessor_id=None, clear_accessor=True, operator_params=None):
2194 '''
2195 Iterate window-wise over waveform archive.
2197 %(query_args)s
2199 :param tinc:
2200 Time increment (window shift time) (default uses ``tmax-tmin``).
2201 :type tinc:
2202 timestamp
2204 :param tpad:
2205 Padding time appended on either side of the data window (window
2206 overlap is ``2*tpad``).
2207 :type tpad:
2208 timestamp
2210 :param want_incomplete:
2211 If ``True``, gappy/incomplete traces are included in the result.
2212 :type want_incomplete:
2213 bool
2215 :param snap_window:
2216 If ``True``, start time windows at multiples of tinc with respect
2217 to system time zero.
2218 :type snap_window:
2219 bool
2221 :param degap:
2222 If ``True``, connect traces and remove gaps and overlaps.
2223 :type degap:
2224 bool
2226 :param maxgap:
2227 Maximum gap size in samples which is filled with interpolated
2228 samples when ``degap`` is ``True``.
2229 :type maxgap:
2230 int
2232 :param maxlap:
2233 Maximum overlap size in samples which is removed when ``degap`` is
2234 ``True``.
2235 :type maxlap:
2236 int
2238 :param snap:
2239 Rounding functions used when computing sample index from time
2240 instance, for trace start and trace end, respectively. By default,
2241 ``(round, round)`` is used.
2242 :type snap:
2243 tuple of 2 callables
2245 :param include_last:
2246 If ``True``, add one more sample to the returned traces (the sample
2247 which would be the first sample of a query with ``tmin`` set to the
2248 current value of ``tmax``).
2249 :type include_last:
2250 bool
2252 :param load_data:
2253 If ``True``, waveform data samples are read from files (or cache).
2254 If ``False``, meta-information-only traces are returned (dummy
2255 traces with no data samples).
2256 :type load_data:
2257 bool
2259 :param accessor_id:
2260 Name of consumer on who's behalf data is accessed. Used in cache
2261 management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key
2262 to distinguish different points of extraction for the decision of
2263 when to release cached waveform data. Should be used when data is
2264 alternately extracted from more than one region / selection.
2265 :type accessor_id:
2266 str
2268 :param clear_accessor:
2269 If ``True`` (default), :py:meth:`clear_accessor` is called when the
2270 chopper finishes. Set to ``False`` to keep loaded waveforms in
2271 memory when the generator returns.
2272 :type clear_accessor:
2273 bool
2275 :yields:
2276 A list of :py:class:`~pyrocko.trace.Trace` objects for every
2277 extracted time window.
2279 See :py:meth:`iter_nuts` for details on time span matching.
2280 '''
2282 tmin, tmax, codes = self._get_selection_args(
2283 obj, tmin, tmax, time, codes)
2285 self_tmin, self_tmax = self.get_time_span(
2286 ['waveform', 'waveform_promise'])
2288 if None in (self_tmin, self_tmax):
2289 logger.warning(
2290 'Content has undefined time span. No waveforms and no '
2291 'waveform promises?')
2292 return
2294 if snap_window and tinc is not None:
2295 tmin = tmin if tmin is not None else self_tmin
2296 tmax = tmax if tmax is not None else self_tmax
2297 tmin = math.floor(tmin / tinc) * tinc
2298 tmax = math.ceil(tmax / tinc) * tinc
2299 else:
2300 tmin = tmin if tmin is not None else self_tmin + tpad
2301 tmax = tmax if tmax is not None else self_tmax - tpad
2303 tinc = tinc if tinc is not None else tmax - tmin
2305 try:
2306 if accessor_id is None:
2307 accessor_id = 'chopper%i' % self._n_choppers_active
2309 self._n_choppers_active += 1
2311 eps = tinc * 1e-6
2312 if tinc != 0.0:
2313 nwin = int(((tmax - eps) - tmin) / tinc) + 1
2314 else:
2315 nwin = 1
2317 for iwin in range(nwin):
2318 wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)
2319 chopped = []
2320 wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)
2321 eps = tinc*1e-6
2322 if wmin >= tmax-eps:
2323 break
2325 chopped = self.get_waveforms(
2326 tmin=wmin-tpad,
2327 tmax=wmax+tpad,
2328 codes=codes,
2329 snap=snap,
2330 include_last=include_last,
2331 load_data=load_data,
2332 want_incomplete=want_incomplete,
2333 degap=degap,
2334 maxgap=maxgap,
2335 maxlap=maxlap,
2336 accessor_id=accessor_id,
2337 operator_params=operator_params)
2339 self.advance_accessor(accessor_id)
2341 yield Batch(
2342 tmin=wmin,
2343 tmax=wmax,
2344 i=iwin,
2345 n=nwin,
2346 traces=chopped)
2348 iwin += 1
2350 finally:
2351 self._n_choppers_active -= 1
2352 if clear_accessor:
2353 self.clear_accessor(accessor_id, 'waveform')
2355 def _process_chopped(
2356 self, chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax):
2358 chopped.sort(key=lambda a: a.full_id)
2359 if degap:
2360 chopped = trace.degapper(chopped, maxgap=maxgap, maxlap=maxlap)
2362 if not want_incomplete:
2363 chopped_weeded = []
2364 for tr in chopped:
2365 emin = tr.tmin - tmin
2366 emax = tr.tmax + tr.deltat - tmax
2367 if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
2368 chopped_weeded.append(tr)
2370 elif degap:
2371 if (0. < emin <= 5. * tr.deltat
2372 and -5. * tr.deltat <= emax < 0.):
2374 tr.extend(tmin, tmax-tr.deltat, fillmethod='repeat')
2375 chopped_weeded.append(tr)
2377 chopped = chopped_weeded
2379 return chopped
2381 def _get_pyrocko_stations(
2382 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
2384 from pyrocko import model as pmodel
2386 by_nsl = defaultdict(lambda: (list(), list()))
2387 for station in self.get_stations(obj, tmin, tmax, time, codes):
2388 sargs = station._get_pyrocko_station_args()
2389 nsl = sargs[1:4]
2390 by_nsl[nsl][0].append(sargs)
2392 for channel in self.get_channels(obj, tmin, tmax, time, codes):
2393 sargs = channel._get_pyrocko_station_args()
2394 nsl = sargs[1:4]
2395 sargs_list, channels_list = by_nsl[nsl]
2396 sargs_list.append(sargs)
2397 channels_list.append(channel)
2399 pstations = []
2400 nsls = list(by_nsl.keys())
2401 nsls.sort()
2402 for nsl in nsls:
2403 sargs_list, channels_list = by_nsl[nsl]
2404 sargs = util.consistency_merge(sargs_list)
2406 by_c = defaultdict(list)
2407 for ch in channels_list:
2408 by_c[ch.channel].append(ch._get_pyrocko_channel_args())
2410 chas = list(by_c.keys())
2411 chas.sort()
2412 pchannels = []
2413 for cha in chas:
2414 list_of_cargs = by_c[cha]
2415 cargs = util.consistency_merge(list_of_cargs)
2416 pchannels.append(pmodel.Channel(
2417 name=cargs[0],
2418 azimuth=cargs[1],
2419 dip=cargs[2]))
2421 pstations.append(pmodel.Station(
2422 network=sargs[0],
2423 station=sargs[1],
2424 location=sargs[2],
2425 lat=sargs[3],
2426 lon=sargs[4],
2427 elevation=sargs[5],
2428 depth=sargs[6] or 0.0,
2429 channels=pchannels))
2431 return pstations
2433 @property
2434 def pile(self):
2436 '''
2437 Emulates the older :py:class:`pyrocko.pile.Pile` interface.
2439 This property exposes a :py:class:`pyrocko.squirrel.pile.Pile` object,
2440 which emulates most of the older :py:class:`pyrocko.pile.Pile` methods
2441 but uses the fluffy power of the Squirrel under the hood.
2443 This interface can be used as a drop-in replacement for piles which are
2444 used in existing scripts and programs for efficient waveform data
2445 access. The Squirrel-based pile scales better for large datasets. Newer
2446 scripts should use Squirrel's native methods to avoid the emulation
2447 overhead.
2448 '''
2449 from . import pile
2451 if self._pile is None:
2452 self._pile = pile.Pile(self)
2454 return self._pile
2456 def snuffle(self):
2457 '''
2458 Look at dataset in Snuffler.
2459 '''
2460 self.pile.snuffle()
2462 def _gather_codes_keys(self, kind, gather, selector):
2463 return set(
2464 gather(codes)
2465 for codes in self.iter_codes(kind)
2466 if selector is None or selector(codes))
2468 def __str__(self):
2469 return str(self.get_stats())
2471 def get_coverage(
2472 self, kind, tmin=None, tmax=None, codes_list=None, limit=None,
2473 return_raw=True):
2475 '''
2476 Get coverage information.
2478 Get information about strips of gapless data coverage.
2480 :param kind:
2481 Content kind to be queried.
2482 :type kind:
2483 str
2485 :param tmin:
2486 Start time of query interval.
2487 :type tmin:
2488 timestamp
2490 :param tmax:
2491 End time of query interval.
2492 :type tmax:
2493 timestamp
2495 :param codes_list:
2496 List of code patterns to query. If not given or empty, an empty
2497 list is returned.
2498 :type codes_list:
2499 :py:class:`list` of :py:class:`tuple` of :py:class:`str`
2501 :param limit:
2502 Limit query to return only up to a given maximum number of entries
2503 per matching channel (without setting this option, very gappy data
2504 could cause the query to execute for a very long time).
2505 :type limit:
2506 int
2508 :returns:
2509 List of entries of the form ``(pattern, codes, deltat, tmin, tmax,
2510 data)`` where ``pattern`` is the request code pattern which
2511 yielded this entry, ``codes`` are the matching channel codes,
2512 ``tmin`` and ``tmax`` are the global min and max times for which
2513 data for this channel is available, regardless of any time
2514 restrictions in the query. ``data`` is a list with (up to
2515 ``limit``) change-points of the form ``(time, count)`` where a
2516 ``count`` of zero indicates a data gap, a value of 1 normal data
2517 coverage and higher values indicate duplicate/redundant data.
2518 '''
2520 tmin_seconds, tmin_offset = model.tsplit(tmin)
2521 tmax_seconds, tmax_offset = model.tsplit(tmax)
2522 kind_id = to_kind_id(kind)
2524 if codes_list is None:
2525 codes_list = self.get_codes(kind=kind)
2527 kdata_all = []
2528 for pattern in codes_list:
2529 kdata = self.glob_codes(kind, [pattern])
2530 for row in kdata:
2531 row[0:0] = [pattern]
2533 kdata_all.extend(kdata)
2535 kind_codes_ids = [x[1] for x in kdata_all]
2537 counts_at_tmin = {}
2538 if tmin is not None:
2539 for nut in self.iter_nuts(
2540 kind, tmin, tmin, kind_codes_ids=kind_codes_ids):
2542 k = nut.codes, nut.deltat
2543 if k not in counts_at_tmin:
2544 counts_at_tmin[k] = 0
2546 counts_at_tmin[k] += 1
2548 coverage = []
2549 for pattern, kind_codes_id, codes, deltat in kdata_all:
2550 entry = [pattern, codes, deltat, None, None, []]
2551 for i, order in [(0, 'ASC'), (1, 'DESC')]:
2552 sql = self._sql('''
2553 SELECT
2554 time_seconds,
2555 time_offset
2556 FROM %(db)s.%(coverage)s
2557 WHERE
2558 kind_codes_id == ?
2559 ORDER BY
2560 kind_codes_id ''' + order + ''',
2561 time_seconds ''' + order + ''',
2562 time_offset ''' + order + '''
2563 LIMIT 1
2564 ''')
2566 for row in self._conn.execute(sql, [kind_codes_id]):
2567 entry[3+i] = model.tjoin(row[0], row[1])
2569 if None in entry[3:5]:
2570 continue
2572 args = [kind_codes_id]
2574 sql_time = ''
2575 if tmin is not None:
2576 # intentionally < because (== tmin) is queried from nuts
2577 sql_time += ' AND ( ? < time_seconds ' \
2578 'OR ( ? == time_seconds AND ? < time_offset ) ) '
2579 args.extend([tmin_seconds, tmin_seconds, tmin_offset])
2581 if tmax is not None:
2582 sql_time += ' AND ( time_seconds < ? ' \
2583 'OR ( ? == time_seconds AND time_offset <= ? ) ) '
2584 args.extend([tmax_seconds, tmax_seconds, tmax_offset])
2586 sql_limit = ''
2587 if limit is not None:
2588 sql_limit = ' LIMIT ?'
2589 args.append(limit)
2591 sql = self._sql('''
2592 SELECT
2593 time_seconds,
2594 time_offset,
2595 step
2596 FROM %(db)s.%(coverage)s
2597 WHERE
2598 kind_codes_id == ?
2599 ''' + sql_time + '''
2600 ORDER BY
2601 kind_codes_id,
2602 time_seconds,
2603 time_offset
2604 ''' + sql_limit)
2606 rows = list(self._conn.execute(sql, args))
2608 if limit is not None and len(rows) == limit:
2609 entry[-1] = None
2610 else:
2611 counts = counts_at_tmin.get((codes, deltat), 0)
2612 tlast = None
2613 if tmin is not None:
2614 entry[-1].append((tmin, counts))
2615 tlast = tmin
2617 for row in rows:
2618 t = model.tjoin(row[0], row[1])
2619 counts += row[2]
2620 entry[-1].append((t, counts))
2621 tlast = t
2623 if tmax is not None and (tlast is None or tlast != tmax):
2624 entry[-1].append((tmax, counts))
2626 coverage.append(entry)
2628 if return_raw:
2629 return coverage
2630 else:
2631 return [model.Coverage.from_values(
2632 entry + [kind_id]) for entry in coverage]
2634 def add_operator(self, op):
2635 self._operators.append(op)
2637 def update_operator_mappings(self):
2638 available = [
2639 separator.join(codes)
2640 for codes in self.get_codes(kind=('channel'))]
2642 for operator in self._operators:
2643 operator.update_mappings(available, self._operator_registry)
2645 def iter_operator_mappings(self):
2646 for operator in self._operators:
2647 for in_codes, out_codes in operator.iter_mappings():
2648 yield operator, in_codes, out_codes
2650 def get_operator_mappings(self):
2651 return list(self.iter_operator_mappings())
2653 def get_operator(self, codes):
2654 if isinstance(codes, tuple):
2655 codes = separator.join(codes)
2656 try:
2657 return self._operator_registry[codes][0]
2658 except KeyError:
2659 return None
2661 def get_operator_group(self, codes):
2662 if isinstance(codes, tuple):
2663 codes = separator.join(codes)
2664 try:
2665 return self._operator_registry[codes]
2666 except KeyError:
2667 return None, (None, None, None)
2669 def iter_operator_codes(self):
2670 for _, _, out_codes in self.iter_operator_mappings():
2671 for codes in out_codes:
2672 yield tuple(codes.split(separator))
2674 def get_operator_codes(self):
2675 return list(self.iter_operator_codes())
2677 def print_tables(self, table_names=None, stream=None):
2678 '''
2679 Dump raw database tables in textual form (for debugging purposes).
2681 :param table_names:
2682 Names of tables to be dumped or ``None`` to dump all.
2683 :type table_names:
2684 :py:class:`list` of :py:class:`str`
2686 :param stream:
2687 Open file or ``None`` to dump to standard output.
2688 '''
2690 if stream is None:
2691 stream = sys.stdout
2693 if isinstance(table_names, str):
2694 table_names = [table_names]
2696 if table_names is None:
2697 table_names = [
2698 'selection_file_states',
2699 'selection_nuts',
2700 'selection_kind_codes_count',
2701 'files', 'nuts', 'kind_codes', 'kind_codes_count']
2703 m = {
2704 'selection_file_states': '%(db)s.%(file_states)s',
2705 'selection_nuts': '%(db)s.%(nuts)s',
2706 'selection_kind_codes_count': '%(db)s.%(kind_codes_count)s',
2707 'files': 'files',
2708 'nuts': 'nuts',
2709 'kind_codes': 'kind_codes',
2710 'kind_codes_count': 'kind_codes_count'}
2712 for table_name in table_names:
2713 self._database.print_table(
2714 m[table_name] % self._names, stream=stream)
2717class SquirrelStats(Object):
2718 '''
2719 Container to hold statistics about contents available from a Squirrel.
2721 See also :py:meth:`Squirrel.get_stats`.
2722 '''
2724 nfiles = Int.T(
2725 help='Number of files in selection.')
2726 nnuts = Int.T(
2727 help='Number of index nuts in selection.')
2728 codes = List.T(
2729 Tuple.T(content_t=String.T()),
2730 help='Available code sequences in selection, e.g. '
2731 '(agency, network, station, location) for stations nuts.')
2732 kinds = List.T(
2733 String.T(),
2734 help='Available content types in selection.')
2735 total_size = Int.T(
2736 help='Aggregated file size of files is selection.')
2737 counts = Dict.T(
2738 String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()),
2739 help='Breakdown of how many nuts of any content type and code '
2740 'sequence are available in selection, ``counts[kind][codes]``.')
2741 time_spans = Dict.T(
2742 String.T(), Tuple.T(content_t=Timestamp.T()),
2743 help='Time spans by content type.')
2744 sources = List.T(
2745 String.T(),
2746 help='Descriptions of attached sources.')
2747 operators = List.T(
2748 String.T(),
2749 help='Descriptions of attached operators.')
2751 def __str__(self):
2752 kind_counts = dict(
2753 (kind, sum(self.counts[kind].values())) for kind in self.kinds)
2755 scodes = model.codes_to_str_abbreviated(self.codes)
2757 ssources = '<none>' if not self.sources else '\n' + '\n'.join(
2758 ' ' + s for s in self.sources)
2760 soperators = '<none>' if not self.operators else '\n' + '\n'.join(
2761 ' ' + s for s in self.operators)
2763 def stime(t):
2764 return util.tts(t) if t is not None and t not in (
2765 model.g_tmin, model.g_tmax) else '<none>'
2767 def stable(rows):
2768 ns = [max(len(w) for w in col) for col in zip(*rows)]
2769 return '\n'.join(
2770 ' '.join(w.ljust(n) for n, w in zip(ns, row))
2771 for row in rows)
2773 def indent(s):
2774 return '\n'.join(' '+line for line in s.splitlines())
2776 stspans = '<none>' if not self.kinds else '\n' + indent(stable([(
2777 kind + ':',
2778 str(kind_counts[kind]),
2779 stime(self.time_spans[kind][0]),
2780 '-',
2781 stime(self.time_spans[kind][1])) for kind in sorted(self.kinds)]))
2783 s = '''
2784Number of files: %i
2785Total size of known files: %s
2786Number of index nuts: %i
2787Available content kinds: %s
2788Available codes: %s
2789Sources: %s
2790Operators: %s''' % (
2791 self.nfiles,
2792 util.human_bytesize(self.total_size),
2793 self.nnuts,
2794 stspans, scodes, ssources, soperators)
2796 return s.lstrip()
2799__all__ = [
2800 'Squirrel',
2801 'SquirrelStats',
2802]