1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import sys
9import os
11import math
12import logging
13import threading
14import queue
15from collections import defaultdict
17from pyrocko.guts import Object, Int, List, Tuple, String, Timestamp, Dict
18from pyrocko import util, trace
19from pyrocko.progress import progress
21from . import model, io, cache, dataset
23from .model import to_kind_id, separator, WaveformOrder
24from .client import fdsn, catalog
25from .selection import Selection, filldocs
26from .database import abspath
27from . import client, environment, error
29logger = logging.getLogger('psq.base')
31guts_prefix = 'squirrel'
34def make_task(*args):
35 return progress.task(*args, logger=logger)
38def lpick(condition, seq):
39 ft = [], []
40 for ele in seq:
41 ft[int(bool(condition(ele)))].append(ele)
43 return ft
46def codes_fill(n, codes):
47 return codes[:n] + ('*',) * (n-len(codes))
50c_kind_to_ncodes = {
51 'station': 4,
52 'channel': 6,
53 'response': 6,
54 'waveform': 6,
55 'event': 1,
56 'waveform_promise': 6,
57 'undefined': 1}
60c_inflated = ['', '*', '*', '*', '*', '*']
61c_offsets = [0, 2, 1, 1, 1, 1, 0]
64def codes_inflate(codes):
65 codes = codes[:6]
66 inflated = list(c_inflated)
67 ncodes = len(codes)
68 offset = c_offsets[ncodes]
69 inflated[offset:offset+ncodes] = codes
70 return inflated
73def codes_inflate2(codes):
74 inflated = list(c_inflated)
75 ncodes = len(codes)
76 inflated[:ncodes] = codes
77 return tuple(inflated)
80def codes_patterns_for_kind(kind, codes):
81 if not codes:
82 return []
84 if not isinstance(codes[0], str):
85 out = []
86 for subcodes in codes:
87 out.extend(codes_patterns_for_kind(kind, subcodes))
88 return out
90 if kind in ('event', 'undefined'):
91 return [codes]
93 cfill = codes_inflate(codes)[:c_kind_to_ncodes[kind]]
95 if kind == 'station':
96 cfill2 = list(cfill)
97 cfill2[3] = '[*]'
98 return [cfill, cfill2]
100 return [cfill]
103def group_channels(channels):
104 groups = defaultdict(list)
105 for channel in channels:
106 codes = channel.codes
107 gcodes = codes[:-1] + (codes[-1][:-1],)
108 groups[gcodes].append(channel)
110 return groups
113def pyrocko_station_from_channel_group(group, extra_args):
114 list_of_args = [channel._get_pyrocko_station_args() for channel in group]
115 args = util.consistency_merge(list_of_args + extra_args)
116 from pyrocko import model as pmodel
117 return pmodel.Station(
118 network=args[0],
119 station=args[1],
120 location=args[2],
121 lat=args[3],
122 lon=args[4],
123 elevation=args[5],
124 depth=args[6],
125 channels=[ch.get_pyrocko_channel() for ch in group])
128def blocks(tmin, tmax, deltat, nsamples_block=100000):
129 tblock = deltat * nsamples_block
130 iblock_min = int(math.floor(tmin / tblock))
131 iblock_max = int(math.ceil(tmax / tblock))
132 for iblock in range(iblock_min, iblock_max):
133 yield iblock * tblock, (iblock+1) * tblock
136def gaps(avail, tmin, tmax):
137 assert tmin < tmax
139 data = [(tmax, 1), (tmin, -1)]
140 for (tmin_a, tmax_a) in avail:
141 assert tmin_a < tmax_a
142 data.append((tmin_a, 1))
143 data.append((tmax_a, -1))
145 data.sort()
146 s = 1
147 gaps = []
148 tmin_g = None
149 for t, x in data:
150 if s == 1 and x == -1:
151 tmin_g = t
152 elif s == 0 and x == 1 and tmin_g is not None:
153 tmax_g = t
154 if tmin_g != tmax_g:
155 gaps.append((tmin_g, tmax_g))
157 s += x
159 return gaps
162def order_key(order):
163 return (order.codes, order.tmin, order.tmax)
166class Batch(object):
167 '''
168 Batch of waveforms from window-wise data extraction.
170 Encapsulates state and results yielded for each window in window-wise
171 waveform extraction with the :py:meth:`Squirrel.chopper_waveforms` method.
173 *Attributes:*
175 .. py:attribute:: tmin
177 Start of this time window.
179 .. py:attribute:: tmax
181 End of this time window.
183 .. py:attribute:: i
185 Index of this time window in sequence.
187 .. py:attribute:: n
189 Total number of time windows in sequence.
191 .. py:attribute:: traces
193 Extracted waveforms for this time window.
194 '''
196 def __init__(self, tmin, tmax, i, n, traces):
197 self.tmin = tmin
198 self.tmax = tmax
199 self.i = i
200 self.n = n
201 self.traces = traces
204class Squirrel(Selection):
205 '''
206 Prompt, lazy, indexing, caching, dynamic seismological dataset access.
208 :param env:
209 Squirrel environment instance or directory path to use as starting
210 point for its detection. By default, the current directory is used as
211 starting point. When searching for a usable environment the directory
212 ``'.squirrel'`` or ``'squirrel'`` in the current (or starting point)
213 directory is used if it exists, otherwise the parent directories are
214 search upwards for the existence of such a directory. If no such
215 directory is found, the user's global Squirrel environment
216 ``'$HOME/.pyrocko/squirrel'`` is used.
217 :type env:
218 :py:class:`~pyrocko.squirrel.environment.Environment` or
219 :py:class:`str`
221 :param database:
222 Database instance or path to database. By default the
223 database found in the detected Squirrel environment is used.
224 :type database:
225 :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`
227 :param cache_path:
228 Directory path to use for data caching. By default, the ``'cache'``
229 directory in the detected Squirrel environment is used.
230 :type cache_path:
231 :py:class:`str`
233 :param persistent:
234 If given a name, create a persistent selection.
235 :type persistent:
236 :py:class:`str`
238 This is the central class of the Squirrel framework. It provides a unified
239 interface to query and access seismic waveforms, station meta-data and
240 event information from local file collections and remote data sources. For
241 prompt responses, a profound database setup is used under the hood. To
242 speed up assemblage of ad-hoc data selections, files are indexed on first
243 use and the extracted meta-data is remembered in the database for
244 subsequent accesses. Bulk data is lazily loaded from disk and remote
245 sources, just when requested. Once loaded, data is cached in memory to
246 expedite typical access patterns. Files and data sources can be dynamically
247 added to and removed from the Squirrel selection at runtime.
249 Queries are restricted to the contents of the files currently added to the
250 Squirrel selection (usually a subset of the file meta-information
251 collection in the database). This list of files is referred to here as the
252 "selection". By default, temporary tables are created in the attached
253 database to hold the names of the files in the selection as well as various
254 indices and counters. These tables are only visible inside the application
255 which created them and are deleted when the database connection is closed
256 or the application exits. To create a selection which is not deleted at
257 exit, supply a name to the ``persistent`` argument of the Squirrel
258 constructor. Persistent selections are shared among applications using the
259 same database.
261 **Method summary**
263 Some of the methods are implemented in :py:class:`Squirrel`'s base class
264 :py:class:`~pyrocko.squirrel.selection.Selection`.
266 .. autosummary::
268 ~Squirrel.add
269 ~Squirrel.add_source
270 ~Squirrel.add_fdsn
271 ~Squirrel.add_catalog
272 ~Squirrel.add_dataset
273 ~Squirrel.add_virtual
274 ~Squirrel.update
275 ~Squirrel.update_waveform_promises
276 ~Squirrel.advance_accessor
277 ~Squirrel.clear_accessor
278 ~Squirrel.reload
279 ~pyrocko.squirrel.selection.Selection.iter_paths
280 ~Squirrel.iter_nuts
281 ~Squirrel.iter_kinds
282 ~Squirrel.iter_deltats
283 ~Squirrel.iter_codes
284 ~Squirrel.iter_counts
285 ~pyrocko.squirrel.selection.Selection.get_paths
286 ~Squirrel.get_nuts
287 ~Squirrel.get_kinds
288 ~Squirrel.get_deltats
289 ~Squirrel.get_codes
290 ~Squirrel.get_counts
291 ~Squirrel.get_time_span
292 ~Squirrel.get_deltat_span
293 ~Squirrel.get_nfiles
294 ~Squirrel.get_nnuts
295 ~Squirrel.get_total_size
296 ~Squirrel.get_stats
297 ~Squirrel.get_content
298 ~Squirrel.get_stations
299 ~Squirrel.get_channels
300 ~Squirrel.get_responses
301 ~Squirrel.get_events
302 ~Squirrel.get_waveform_nuts
303 ~Squirrel.get_waveforms
304 ~Squirrel.chopper_waveforms
305 ~Squirrel.get_coverage
306 ~Squirrel.pile
307 ~Squirrel.snuffle
308 ~Squirrel.glob_codes
309 ~pyrocko.squirrel.selection.Selection.get_database
310 ~Squirrel.print_tables
311 '''
313 def __init__(
314 self, env=None, database=None, cache_path=None, persistent=None):
316 if not isinstance(env, environment.Environment):
317 env = environment.get_environment(env)
319 if database is None:
320 database = env.expand_path(env.database_path)
322 if cache_path is None:
323 cache_path = env.expand_path(env.cache_path)
325 if persistent is None:
326 persistent = env.persistent
328 Selection.__init__(
329 self, database=database, persistent=persistent)
331 self.get_database().set_basepath(os.path.dirname(env.get_basepath()))
333 self._content_caches = {
334 'waveform': cache.ContentCache(),
335 'default': cache.ContentCache()}
337 self._cache_path = cache_path
339 self._sources = []
340 self._operators = []
341 self._operator_registry = {}
343 self._pile = None
344 self._n_choppers_active = 0
346 self._names.update({
347 'nuts': self.name + '_nuts',
348 'kind_codes_count': self.name + '_kind_codes_count',
349 'coverage': self.name + '_coverage'})
351 with self.transaction() as cursor:
352 self._create_tables_squirrel(cursor)
354 def _create_tables_squirrel(self, cursor):
356 cursor.execute(self._register_table(self._sql(
357 '''
358 CREATE TABLE IF NOT EXISTS %(db)s.%(nuts)s (
359 nut_id integer PRIMARY KEY,
360 file_id integer,
361 file_segment integer,
362 file_element integer,
363 kind_id integer,
364 kind_codes_id integer,
365 tmin_seconds integer,
366 tmin_offset integer,
367 tmax_seconds integer,
368 tmax_offset integer,
369 kscale integer)
370 ''')))
372 cursor.execute(self._register_table(self._sql(
373 '''
374 CREATE TABLE IF NOT EXISTS %(db)s.%(kind_codes_count)s (
375 kind_codes_id integer PRIMARY KEY,
376 count integer)
377 ''')))
379 cursor.execute(self._sql(
380 '''
381 CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(nuts)s_file_element
382 ON %(nuts)s (file_id, file_segment, file_element)
383 '''))
385 cursor.execute(self._sql(
386 '''
387 CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_file_id
388 ON %(nuts)s (file_id)
389 '''))
391 cursor.execute(self._sql(
392 '''
393 CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmin_seconds
394 ON %(nuts)s (kind_id, tmin_seconds)
395 '''))
397 cursor.execute(self._sql(
398 '''
399 CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmax_seconds
400 ON %(nuts)s (kind_id, tmax_seconds)
401 '''))
403 cursor.execute(self._sql(
404 '''
405 CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_kscale
406 ON %(nuts)s (kind_id, kscale, tmin_seconds)
407 '''))
409 cursor.execute(self._sql(
410 '''
411 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts
412 BEFORE DELETE ON main.files FOR EACH ROW
413 BEGIN
414 DELETE FROM %(nuts)s WHERE file_id == old.file_id;
415 END
416 '''))
418 # trigger only on size to make silent update of mtime possible
419 cursor.execute(self._sql(
420 '''
421 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts2
422 BEFORE UPDATE OF size ON main.files FOR EACH ROW
423 BEGIN
424 DELETE FROM %(nuts)s WHERE file_id == old.file_id;
425 END
426 '''))
428 cursor.execute(self._sql(
429 '''
430 CREATE TRIGGER IF NOT EXISTS
431 %(db)s.%(file_states)s_delete_files
432 BEFORE DELETE ON %(db)s.%(file_states)s FOR EACH ROW
433 BEGIN
434 DELETE FROM %(nuts)s WHERE file_id == old.file_id;
435 END
436 '''))
438 cursor.execute(self._sql(
439 '''
440 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_inc_kind_codes
441 BEFORE INSERT ON %(nuts)s FOR EACH ROW
442 BEGIN
443 INSERT OR IGNORE INTO %(kind_codes_count)s VALUES
444 (new.kind_codes_id, 0);
445 UPDATE %(kind_codes_count)s
446 SET count = count + 1
447 WHERE new.kind_codes_id
448 == %(kind_codes_count)s.kind_codes_id;
449 END
450 '''))
452 cursor.execute(self._sql(
453 '''
454 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_dec_kind_codes
455 BEFORE DELETE ON %(nuts)s FOR EACH ROW
456 BEGIN
457 UPDATE %(kind_codes_count)s
458 SET count = count - 1
459 WHERE old.kind_codes_id
460 == %(kind_codes_count)s.kind_codes_id;
461 END
462 '''))
464 cursor.execute(self._register_table(self._sql(
465 '''
466 CREATE TABLE IF NOT EXISTS %(db)s.%(coverage)s (
467 kind_codes_id integer,
468 time_seconds integer,
469 time_offset integer,
470 step integer)
471 ''')))
473 cursor.execute(self._sql(
474 '''
475 CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(coverage)s_time
476 ON %(coverage)s (kind_codes_id, time_seconds, time_offset)
477 '''))
479 cursor.execute(self._sql(
480 '''
481 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_add_coverage
482 AFTER INSERT ON %(nuts)s FOR EACH ROW
483 BEGIN
484 INSERT OR IGNORE INTO %(coverage)s VALUES
485 (new.kind_codes_id, new.tmin_seconds, new.tmin_offset, 0)
486 ;
487 UPDATE %(coverage)s
488 SET step = step + 1
489 WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
490 AND new.tmin_seconds == %(coverage)s.time_seconds
491 AND new.tmin_offset == %(coverage)s.time_offset
492 ;
493 INSERT OR IGNORE INTO %(coverage)s VALUES
494 (new.kind_codes_id, new.tmax_seconds, new.tmax_offset, 0)
495 ;
496 UPDATE %(coverage)s
497 SET step = step - 1
498 WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
499 AND new.tmax_seconds == %(coverage)s.time_seconds
500 AND new.tmax_offset == %(coverage)s.time_offset
501 ;
502 DELETE FROM %(coverage)s
503 WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
504 AND new.tmin_seconds == %(coverage)s.time_seconds
505 AND new.tmin_offset == %(coverage)s.time_offset
506 AND step == 0
507 ;
508 DELETE FROM %(coverage)s
509 WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
510 AND new.tmax_seconds == %(coverage)s.time_seconds
511 AND new.tmax_offset == %(coverage)s.time_offset
512 AND step == 0
513 ;
514 END
515 '''))
517 cursor.execute(self._sql(
518 '''
519 CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_remove_coverage
520 BEFORE DELETE ON %(nuts)s FOR EACH ROW
521 BEGIN
522 INSERT OR IGNORE INTO %(coverage)s VALUES
523 (old.kind_codes_id, old.tmin_seconds, old.tmin_offset, 0)
524 ;
525 UPDATE %(coverage)s
526 SET step = step - 1
527 WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
528 AND old.tmin_seconds == %(coverage)s.time_seconds
529 AND old.tmin_offset == %(coverage)s.time_offset
530 ;
531 INSERT OR IGNORE INTO %(coverage)s VALUES
532 (old.kind_codes_id, old.tmax_seconds, old.tmax_offset, 0)
533 ;
534 UPDATE %(coverage)s
535 SET step = step + 1
536 WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
537 AND old.tmax_seconds == %(coverage)s.time_seconds
538 AND old.tmax_offset == %(coverage)s.time_offset
539 ;
540 DELETE FROM %(coverage)s
541 WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
542 AND old.tmin_seconds == %(coverage)s.time_seconds
543 AND old.tmin_offset == %(coverage)s.time_offset
544 AND step == 0
545 ;
546 DELETE FROM %(coverage)s
547 WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
548 AND old.tmax_seconds == %(coverage)s.time_seconds
549 AND old.tmax_offset == %(coverage)s.time_offset
550 AND step == 0
551 ;
552 END
553 '''))
555 def _delete(self):
556 '''Delete database tables associated with this Squirrel.'''
558 for s in '''
559 DROP TRIGGER %(db)s.%(nuts)s_delete_nuts;
560 DROP TRIGGER %(db)s.%(nuts)s_delete_nuts2;
561 DROP TRIGGER %(db)s.%(file_states)s_delete_files;
562 DROP TRIGGER %(db)s.%(nuts)s_inc_kind_codes;
563 DROP TRIGGER %(db)s.%(nuts)s_dec_kind_codes;
564 DROP TABLE %(db)s.%(nuts)s;
565 DROP TABLE %(db)s.%(kind_codes_count)s;
566 DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_add_coverage;
567 DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_remove_coverage;
568 DROP TABLE IF EXISTS %(db)s.%(coverage)s;
569 '''.strip().splitlines():
571 self._conn.execute(self._sql(s))
573 Selection._delete(self)
575 @filldocs
576 def add(self,
577 paths,
578 kinds=None,
579 format='detect',
580 include=None,
581 exclude=None,
582 check=True):
584 '''
585 Add files to the selection.
587 :param paths:
588 Iterator yielding paths to files or directories to be added to the
589 selection. Recurses into directories. If given a ``str``, it
590 is treated as a single path to be added.
591 :type paths:
592 :py:class:`list` of :py:class:`str`
594 :param kinds:
595 Content types to be made available through the Squirrel selection.
596 By default, all known content types are accepted.
597 :type kinds:
598 :py:class:`list` of :py:class:`str`
600 :param format:
601 File format identifier or ``'detect'`` to enable auto-detection
602 (available: %(file_formats)s).
603 :type format:
604 str
606 :param include:
607 If not ``None``, files are only included if their paths match the
608 given regular expression pattern.
609 :type format:
610 str
612 :param exclude:
613 If not ``None``, files are only included if their paths do not
614 match the given regular expression pattern.
615 :type format:
616 str
618 :param check:
619 If ``True``, all file modification times are checked to see if
620 cached information has to be updated (slow). If ``False``, only
621 previously unknown files are indexed and cached information is used
622 for known files, regardless of file state (fast, corrresponds to
623 Squirrel's ``--optimistic`` mode). File deletions will go
624 undetected in the latter case.
625 :type check:
626 bool
628 :Complexity:
629 O(log N)
630 '''
632 if isinstance(kinds, str):
633 kinds = (kinds,)
635 if isinstance(paths, str):
636 paths = [paths]
638 kind_mask = model.to_kind_mask(kinds)
640 with progress.view():
641 Selection.add(
642 self, util.iter_select_files(
643 paths,
644 show_progress=False,
645 include=include,
646 exclude=exclude,
647 pass_through=lambda path: path.startswith('virtual:')
648 ), kind_mask, format)
650 self._load(check)
651 self._update_nuts()
653 def reload(self):
654 '''
655 Check for modifications and reindex modified files.
657 Based on file modification times.
658 '''
660 self._set_file_states_force_check()
661 self._load(check=True)
662 self._update_nuts()
664 def add_virtual(self, nuts, virtual_paths=None):
665 '''
666 Add content which is not backed by files.
668 :param nuts:
669 Content pieces to be added.
670 :type nuts:
671 iterator yielding :py:class:`~pyrocko.squirrel.model.Nut` objects
673 :param virtual_paths:
674 List of virtual paths to prevent creating a temporary list of the
675 nuts while aggregating the file paths for the selection.
676 :type virtual_paths:
677 :py:class:`list` of :py:class:`str`
679 Stores to the main database and the selection.
680 '''
682 if isinstance(virtual_paths, str):
683 virtual_paths = [virtual_paths]
685 if virtual_paths is None:
686 if not isinstance(nuts, list):
687 nuts = list(nuts)
688 virtual_paths = set(nut.file_path for nut in nuts)
690 Selection.add(self, virtual_paths)
691 self.get_database().dig(nuts)
692 self._update_nuts()
694 def add_volatile(self, nuts):
695 if not isinstance(nuts, list):
696 nuts = list(nuts)
698 paths = list(set(nut.file_path for nut in nuts))
699 io.backends.virtual.add_nuts(nuts)
700 self.add_virtual(nuts, paths)
701 self._volatile_paths.extend(paths)
703 def add_volatile_waveforms(self, traces):
704 '''
705 Add in-memory waveforms which will be removed when the app closes.
706 '''
708 name = model.random_name()
710 path = 'virtual:volatile:%s' % name
712 nuts = []
713 for itr, tr in enumerate(traces):
714 assert tr.tmin <= tr.tmax
715 tmin_seconds, tmin_offset = model.tsplit(tr.tmin)
716 tmax_seconds, tmax_offset = model.tsplit(
717 tr.tmin + tr.data_len()*tr.deltat)
719 nuts.append(model.Nut(
720 file_path=path,
721 file_format='virtual',
722 file_segment=itr,
723 file_element=0,
724 codes=separator.join(tr.codes),
725 tmin_seconds=tmin_seconds,
726 tmin_offset=tmin_offset,
727 tmax_seconds=tmax_seconds,
728 tmax_offset=tmax_offset,
729 deltat=tr.deltat,
730 kind_id=to_kind_id('waveform'),
731 content=tr))
733 self.add_volatile(nuts)
734 return path
736 def _load(self, check):
737 for _ in io.iload(
738 self,
739 content=[],
740 skip_unchanged=True,
741 check=check):
742 pass
744 def _update_nuts(self):
745 transaction = self.transaction()
746 with make_task('Aggregating selection') as task, \
747 transaction as cursor:
749 self._conn.set_progress_handler(task.update, 100000)
750 nrows = cursor.execute(self._sql(
751 '''
752 INSERT INTO %(db)s.%(nuts)s
753 SELECT NULL,
754 nuts.file_id, nuts.file_segment, nuts.file_element,
755 nuts.kind_id, nuts.kind_codes_id,
756 nuts.tmin_seconds, nuts.tmin_offset,
757 nuts.tmax_seconds, nuts.tmax_offset,
758 nuts.kscale
759 FROM %(db)s.%(file_states)s
760 INNER JOIN nuts
761 ON %(db)s.%(file_states)s.file_id == nuts.file_id
762 INNER JOIN kind_codes
763 ON nuts.kind_codes_id ==
764 kind_codes.kind_codes_id
765 WHERE %(db)s.%(file_states)s.file_state != 2
766 AND (((1 << kind_codes.kind_id)
767 & %(db)s.%(file_states)s.kind_mask) != 0)
768 ''')).rowcount
770 task.update(nrows)
771 self._set_file_states_known(transaction)
772 self._conn.set_progress_handler(None, 0)
774 def add_source(self, source, check=True):
775 '''
776 Add remote resource.
778 :param source:
779 Remote data access client instance.
780 :type source:
781 subclass of :py:class:`~pyrocko.squirrel.client.base.Source`
782 '''
784 self._sources.append(source)
785 source.setup(self, check=check)
787 def add_fdsn(self, *args, **kwargs):
788 '''
789 Add FDSN site for transparent remote data access.
791 Arguments are passed to
792 :py:class:`~pyrocko.squirrel.client.fdsn.FDSNSource`.
793 '''
795 self.add_source(fdsn.FDSNSource(*args, **kwargs))
797 def add_catalog(self, *args, **kwargs):
798 '''
799 Add online catalog for transparent event data access.
801 Arguments are passed to
802 :py:class:`~pyrocko.squirrel.client.catalog.CatalogSource`.
803 '''
805 self.add_source(catalog.CatalogSource(*args, **kwargs))
807 def add_dataset(self, ds, check=True, warn_persistent=True):
808 '''
809 Read dataset description from file and add its contents.
811 :param ds:
812 Path to dataset description file or dataset description object
813 . See :py:mod:`~pyrocko.squirrel.dataset`.
814 :type ds:
815 :py:class:`str` or :py:class:`~pyrocko.squirrel.dataset.Dataset`
817 :param check:
818 If ``True``, all file modification times are checked to see if
819 cached information has to be updated (slow). If ``False``, only
820 previously unknown files are indexed and cached information is used
821 for known files, regardless of file state (fast, corrresponds to
822 Squirrel's ``--optimistic`` mode). File deletions will go
823 undetected in the latter case.
824 :type check:
825 bool
826 '''
827 if isinstance(ds, str):
828 ds = dataset.read_dataset(ds)
829 path = ds
830 else:
831 path = None
833 if warn_persistent and ds.persistent and (
834 not self._persistent or (self._persistent != ds.persistent)):
836 logger.warning(
837 'Dataset `persistent` flag ignored. Can not be set on already '
838 'existing Squirrel instance.%s' % (
839 ' Dataset: %s' % path if path else ''))
841 ds.setup(self, check=check)
843 def _get_selection_args(
844 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
846 if time is not None:
847 tmin = time
848 tmax = time
850 if obj is not None:
851 tmin = tmin if tmin is not None else obj.tmin
852 tmax = tmax if tmax is not None else obj.tmax
853 codes = codes if codes is not None else codes_inflate2(obj.codes)
855 if isinstance(codes, str):
856 codes = tuple(codes.split('.'))
858 return tmin, tmax, codes
860 def _selection_args_to_kwargs(
861 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
863 return dict(obj=obj, tmin=tmin, tmax=tmax, time=time, codes=codes)
865 def _timerange_sql(self, tmin, tmax, kind, cond, args, naiv):
867 tmin_seconds, tmin_offset = model.tsplit(tmin)
868 tmax_seconds, tmax_offset = model.tsplit(tmax)
869 if naiv:
870 cond.append('%(db)s.%(nuts)s.tmin_seconds <= ?')
871 args.append(tmax_seconds)
872 else:
873 tscale_edges = model.tscale_edges
874 tmin_cond = []
875 for kscale in range(tscale_edges.size + 1):
876 if kscale != tscale_edges.size:
877 tscale = int(tscale_edges[kscale])
878 tmin_cond.append('''
879 (%(db)s.%(nuts)s.kind_id = ?
880 AND %(db)s.%(nuts)s.kscale == ?
881 AND %(db)s.%(nuts)s.tmin_seconds BETWEEN ? AND ?)
882 ''')
883 args.extend(
884 (to_kind_id(kind), kscale,
885 tmin_seconds - tscale - 1, tmax_seconds + 1))
887 else:
888 tmin_cond.append('''
889 (%(db)s.%(nuts)s.kind_id == ?
890 AND %(db)s.%(nuts)s.kscale == ?
891 AND %(db)s.%(nuts)s.tmin_seconds <= ?)
892 ''')
894 args.extend(
895 (to_kind_id(kind), kscale, tmax_seconds + 1))
896 if tmin_cond:
897 cond.append(' ( ' + ' OR '.join(tmin_cond) + ' ) ')
899 cond.append('%(db)s.%(nuts)s.tmax_seconds >= ?')
900 args.append(tmin_seconds)
902 def iter_nuts(
903 self, kind=None, tmin=None, tmax=None, codes=None, naiv=False,
904 kind_codes_ids=None, path=None):
906 '''
907 Iterate over content entities matching given constraints.
909 :param kind:
910 Content kind (or kinds) to extract.
911 :type kind:
912 :py:class:`str`, :py:class:`list` of :py:class:`str`
914 :param tmin:
915 Start time of query interval.
916 :type tmin:
917 timestamp
919 :param tmax:
920 End time of query interval.
921 :type tmax:
922 timestamp
924 :param codes:
925 Pattern of content codes to query.
926 :type codes:
927 :py:class:`tuple` of :py:class:`str`
929 :param naiv:
930 Bypass time span lookup through indices (slow, for testing).
931 :type naiv:
932 :py:class:`bool`
934 :param kind_codes_ids:
935 Kind-codes IDs of contents to be retrieved (internal use).
936 :type kind_codes_ids:
937 :py:class:`list` of :py:class:`str`
939 :yields:
940 :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
941 intersecting content.
943 :complexity:
944 O(log N) for the time selection part due to heavy use of database
945 indices.
947 Query time span is treated as a half-open interval ``[tmin, tmax)``.
948 However, if ``tmin`` equals ``tmax``, the edge logics are modified to
949 closed-interval so that content intersecting with the time instant ``t
950 = tmin = tmax`` is returned (otherwise nothing would be returned as
951 ``[t, t)`` never matches anything).
953 Time spans of content entities to be matched are also treated as half
954 open intervals, e.g. content span ``[0, 1)`` is matched by query span
955 ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``. Also here, logics are
956 modified to closed-interval when the content time span is an empty
957 interval, i.e. to indicate a time instant. E.g. time instant 0 is
958 matched by ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``.
959 '''
961 if not isinstance(kind, str):
962 if kind is None:
963 kind = model.g_content_kinds
964 for kind_ in kind:
965 for nut in self.iter_nuts(kind_, tmin, tmax, codes):
966 yield nut
968 return
970 cond = []
971 args = []
972 if tmin is not None or tmax is not None:
973 assert kind is not None
974 if tmin is None:
975 tmin = self.get_time_span()[0]
976 if tmax is None:
977 tmax = self.get_time_span()[1] + 1.0
979 self._timerange_sql(tmin, tmax, kind, cond, args, naiv)
981 elif kind is not None:
982 cond.append('kind_codes.kind_id == ?')
983 args.append(to_kind_id(kind))
985 if codes is not None:
986 pats = codes_patterns_for_kind(kind, codes)
987 if pats:
988 cond.append(
989 ' ( %s ) ' % ' OR '.join(
990 ('kind_codes.codes GLOB ?',) * len(pats)))
991 args.extend(separator.join(pat) for pat in pats)
993 if kind_codes_ids is not None:
994 cond.append(
995 ' ( kind_codes.kind_codes_id IN ( %s ) ) ' % ', '.join(
996 '?'*len(kind_codes_ids)))
998 args.extend(kind_codes_ids)
1000 db = self.get_database()
1001 if path is not None:
1002 cond.append('files.path == ?')
1003 args.append(db.relpath(abspath(path)))
1005 sql = ('''
1006 SELECT
1007 files.path,
1008 files.format,
1009 files.mtime,
1010 files.size,
1011 %(db)s.%(nuts)s.file_segment,
1012 %(db)s.%(nuts)s.file_element,
1013 kind_codes.kind_id,
1014 kind_codes.codes,
1015 %(db)s.%(nuts)s.tmin_seconds,
1016 %(db)s.%(nuts)s.tmin_offset,
1017 %(db)s.%(nuts)s.tmax_seconds,
1018 %(db)s.%(nuts)s.tmax_offset,
1019 kind_codes.deltat
1020 FROM files
1021 INNER JOIN %(db)s.%(nuts)s
1022 ON files.file_id == %(db)s.%(nuts)s.file_id
1023 INNER JOIN kind_codes
1024 ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id
1025 ''')
1027 if cond:
1028 sql += ''' WHERE ''' + ' AND '.join(cond)
1030 sql = self._sql(sql)
1031 if tmin is None and tmax is None:
1032 for row in self._conn.execute(sql, args):
1033 row = (db.abspath(row[0]),) + row[1:]
1034 nut = model.Nut(values_nocheck=row)
1035 yield nut
1036 else:
1037 assert tmin is not None and tmax is not None
1038 if tmin == tmax:
1039 for row in self._conn.execute(sql, args):
1040 row = (db.abspath(row[0]),) + row[1:]
1041 nut = model.Nut(values_nocheck=row)
1042 if (nut.tmin <= tmin < nut.tmax) \
1043 or (nut.tmin == nut.tmax and tmin == nut.tmin):
1045 yield nut
1046 else:
1047 for row in self._conn.execute(sql, args):
1048 row = (db.abspath(row[0]),) + row[1:]
1049 nut = model.Nut(values_nocheck=row)
1050 if (tmin < nut.tmax and nut.tmin < tmax) \
1051 or (nut.tmin == nut.tmax
1052 and tmin <= nut.tmin < tmax):
1054 yield nut
1056 def get_nuts(self, *args, **kwargs):
1057 '''
1058 Get content entities matching given constraints.
1060 Like :py:meth:`iter_nuts` but returns results as a list.
1061 '''
1063 return list(self.iter_nuts(*args, **kwargs))
1065 def _split_nuts(
1066 self, kind, tmin=None, tmax=None, codes=None, path=None):
1068 tmin_seconds, tmin_offset = model.tsplit(tmin)
1069 tmax_seconds, tmax_offset = model.tsplit(tmax)
1071 names_main_nuts = dict(self._names)
1072 names_main_nuts.update(db='main', nuts='nuts')
1074 db = self.get_database()
1076 def main_nuts(s):
1077 return s % names_main_nuts
1079 with self.transaction() as cursor:
1080 # modify selection and main
1081 for sql_subst in [
1082 self._sql, main_nuts]:
1084 cond = []
1085 args = []
1087 self._timerange_sql(tmin, tmax, kind, cond, args, False)
1089 if codes is not None:
1090 pats = codes_patterns_for_kind(kind, codes)
1091 if pats:
1092 cond.append(
1093 ' ( %s ) ' % ' OR '.join(
1094 ('kind_codes.codes GLOB ?',) * len(pats)))
1095 args.extend(separator.join(pat) for pat in pats)
1097 if path is not None:
1098 cond.append('files.path == ?')
1099 args.append(db.relpath(abspath(path)))
1101 sql = sql_subst('''
1102 SELECT
1103 %(db)s.%(nuts)s.nut_id,
1104 %(db)s.%(nuts)s.tmin_seconds,
1105 %(db)s.%(nuts)s.tmin_offset,
1106 %(db)s.%(nuts)s.tmax_seconds,
1107 %(db)s.%(nuts)s.tmax_offset,
1108 kind_codes.deltat
1109 FROM files
1110 INNER JOIN %(db)s.%(nuts)s
1111 ON files.file_id == %(db)s.%(nuts)s.file_id
1112 INNER JOIN kind_codes
1113 ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id
1114 WHERE ''' + ' AND '.join(cond)) # noqa
1116 insert = []
1117 delete = []
1118 for row in cursor.execute(sql, args):
1119 nut_id, nut_tmin_seconds, nut_tmin_offset, \
1120 nut_tmax_seconds, nut_tmax_offset, nut_deltat = row
1122 nut_tmin = model.tjoin(
1123 nut_tmin_seconds, nut_tmin_offset)
1124 nut_tmax = model.tjoin(
1125 nut_tmax_seconds, nut_tmax_offset)
1127 if nut_tmin < tmax and tmin < nut_tmax:
1128 if nut_tmin < tmin:
1129 insert.append((
1130 nut_tmin_seconds, nut_tmin_offset,
1131 tmin_seconds, tmin_offset,
1132 model.tscale_to_kscale(
1133 tmin_seconds - nut_tmin_seconds),
1134 nut_id))
1136 if tmax < nut_tmax:
1137 insert.append((
1138 tmax_seconds, tmax_offset,
1139 nut_tmax_seconds, nut_tmax_offset,
1140 model.tscale_to_kscale(
1141 nut_tmax_seconds - tmax_seconds),
1142 nut_id))
1144 delete.append((nut_id,))
1146 sql_add = '''
1147 INSERT INTO %(db)s.%(nuts)s (
1148 file_id, file_segment, file_element, kind_id,
1149 kind_codes_id, tmin_seconds, tmin_offset,
1150 tmax_seconds, tmax_offset, kscale )
1151 SELECT
1152 file_id, file_segment, file_element,
1153 kind_id, kind_codes_id, ?, ?, ?, ?, ?
1154 FROM %(db)s.%(nuts)s
1155 WHERE nut_id == ?
1156 '''
1157 cursor.executemany(sql_subst(sql_add), insert)
1159 sql_delete = '''
1160 DELETE FROM %(db)s.%(nuts)s WHERE nut_id == ?
1161 '''
1162 cursor.executemany(sql_subst(sql_delete), delete)
1164 def get_time_span(self, kinds=None):
1165 '''
1166 Get time interval over all content in selection.
1168 :param kinds:
1169 If not ``None``, restrict query to given content kinds.
1170 :type kind:
1171 list of str
1173 :complexity:
1174 O(1), independent of the number of nuts.
1176 :returns:
1177 ``(tmin, tmax)``, combined time interval of queried content kinds.
1178 '''
1180 sql_min = self._sql('''
1181 SELECT MIN(tmin_seconds), MIN(tmin_offset)
1182 FROM %(db)s.%(nuts)s
1183 WHERE kind_id == ?
1184 AND tmin_seconds == (
1185 SELECT MIN(tmin_seconds)
1186 FROM %(db)s.%(nuts)s
1187 WHERE kind_id == ?)
1188 ''')
1190 sql_max = self._sql('''
1191 SELECT MAX(tmax_seconds), MAX(tmax_offset)
1192 FROM %(db)s.%(nuts)s
1193 WHERE kind_id == ?
1194 AND tmax_seconds == (
1195 SELECT MAX(tmax_seconds)
1196 FROM %(db)s.%(nuts)s
1197 WHERE kind_id == ?)
1198 ''')
1200 gtmin = None
1201 gtmax = None
1203 if isinstance(kinds, str):
1204 kinds = [kinds]
1206 if kinds is None:
1207 kind_ids = model.g_content_kind_ids
1208 else:
1209 kind_ids = model.to_kind_ids(kinds)
1211 for kind_id in kind_ids:
1212 for tmin_seconds, tmin_offset in self._conn.execute(
1213 sql_min, (kind_id, kind_id)):
1214 tmin = model.tjoin(tmin_seconds, tmin_offset)
1215 if tmin is not None and (gtmin is None or tmin < gtmin):
1216 gtmin = tmin
1218 for (tmax_seconds, tmax_offset) in self._conn.execute(
1219 sql_max, (kind_id, kind_id)):
1220 tmax = model.tjoin(tmax_seconds, tmax_offset)
1221 if tmax is not None and (gtmax is None or tmax > gtmax):
1222 gtmax = tmax
1224 return gtmin, gtmax
1226 def has(self, kinds):
1227 '''
1228 Check availability of given content kinds.
1230 :param kinds:
1231 Content kinds to query.
1232 :type kind:
1233 list of str
1235 :returns:
1236 ``True`` if any of the queried content kinds is available
1237 in the selection.
1238 '''
1239 self_tmin, self_tmax = self.get_time_span(kinds)
1241 return None not in (self_tmin, self_tmax)
1243 def get_deltat_span(self, kind):
1244 '''
1245 Get min and max sampling interval of all content of given kind.
1247 :param kind:
1248 Content kind
1249 :type kind:
1250 str
1252 :returns: (deltat_min, deltat_max)
1253 '''
1255 deltats = [
1256 deltat for deltat in self.get_deltats(kind)
1257 if deltat is not None]
1259 if deltats:
1260 return min(deltats), max(deltats)
1261 else:
1262 return None, None
1264 def iter_kinds(self, codes=None):
1265 '''
1266 Iterate over content types available in selection.
1268 :param codes:
1269 If given, get kinds only for selected codes identifier.
1270 :type codes:
1271 :py:class:`tuple` of :py:class:`str`
1273 :yields:
1274 Available content kinds as :py:class:`str`.
1276 :complexity:
1277 O(1), independent of number of nuts.
1278 '''
1280 return self._database._iter_kinds(
1281 codes=codes,
1282 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1284 def iter_deltats(self, kind=None):
1285 '''
1286 Iterate over sampling intervals available in selection.
1288 :param kind:
1289 If given, get sampling intervals only for a given content type.
1290 :type kind:
1291 str
1293 :yields:
1294 :py:class:`float` values.
1296 :complexity:
1297 O(1), independent of number of nuts.
1298 '''
1299 return self._database._iter_deltats(
1300 kind=kind,
1301 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1303 def iter_codes(self, kind=None):
1304 '''
1305 Iterate over content identifier code sequences available in selection.
1307 :param kind:
1308 If given, get codes only for a given content type.
1309 :type kind:
1310 str
1312 :yields:
1313 :py:class:`tuple` of :py:class:`str`
1315 :complexity:
1316 O(1), independent of number of nuts.
1317 '''
1318 return self._database._iter_codes(
1319 kind=kind,
1320 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1322 def iter_counts(self, kind=None):
1323 '''
1324 Iterate over number of occurrences of any (kind, codes) combination.
1326 :param kind:
1327 If given, get counts only for selected content type.
1328 :type kind:
1329 str
1331 :yields:
1332 Tuples of the form ``((kind, codes), count)``.
1334 :complexity:
1335 O(1), independent of number of nuts.
1336 '''
1337 return self._database._iter_counts(
1338 kind=kind,
1339 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
1341 def get_kinds(self, codes=None):
1342 '''
1343 Get content types available in selection.
1345 :param codes:
1346 If given, get kinds only for selected codes identifier.
1347 :type codes:
1348 :py:class:`tuple` of :py:class:`str`
1350 :returns:
1351 Sorted list of available content types.
1353 :complexity:
1354 O(1), independent of number of nuts.
1356 '''
1357 return sorted(list(self.iter_kinds(codes=codes)))
1359 def get_deltats(self, kind=None):
1360 '''
1361 Get sampling intervals available in selection.
1363 :param kind:
1364 If given, get codes only for selected content type.
1365 :type kind:
1366 str
1368 :complexity:
1369 O(1), independent of number of nuts.
1371 :returns: sorted list of available sampling intervals
1372 '''
1373 return sorted(list(self.iter_deltats(kind=kind)))
1375 def get_codes(self, kind=None):
1376 '''
1377 Get identifier code sequences available in selection.
1379 :param kind:
1380 If given, get codes only for selected content type.
1381 :type kind:
1382 str
1384 :complexity:
1385 O(1), independent of number of nuts.
1387 :returns: sorted list of available codes as tuples of strings
1388 '''
1389 return sorted(list(self.iter_codes(kind=kind)))
1391 def get_counts(self, kind=None):
1392 '''
1393 Get number of occurrences of any (kind, codes) combination.
1395 :param kind:
1396 If given, get codes only for selected content type.
1397 :type kind:
1398 str
1400 :complexity:
1401 O(1), independent of number of nuts.
1403 :returns: ``dict`` with ``counts[kind][codes]`` or ``counts[codes]``
1404 if kind is not ``None``
1405 '''
1406 d = {}
1407 for (k, codes, deltat), count in self.iter_counts():
1408 if k not in d:
1409 v = d[k] = {}
1410 else:
1411 v = d[k]
1413 if codes not in v:
1414 v[codes] = 0
1416 v[codes] += count
1418 if kind is not None:
1419 return d[kind]
1420 else:
1421 return d
1423 def glob_codes(self, kind, codes_list):
1424 '''
1425 Find codes matching given patterns.
1427 :param kind:
1428 Content kind to be queried.
1429 :type kind:
1430 str
1432 :param codes_list:
1433 List of code patterns to query. If not given or empty, an empty
1434 list is returned.
1435 :type codes_list:
1436 :py:class:`list` of :py:class:`tuple` of :py:class:`str`
1438 :returns:
1439 List of matches of the form ``[kind_codes_id, codes, deltat]``.
1440 '''
1442 args = [to_kind_id(kind)]
1443 pats = []
1444 for codes in codes_list:
1445 pats.extend(codes_patterns_for_kind(kind, codes))
1447 codes_cond = ' ( %s ) ' % ' OR '.join(
1448 ('kind_codes.codes GLOB ?',) * len(pats))
1450 args.extend(separator.join(pat) for pat in pats)
1452 sql = self._sql('''
1453 SELECT kind_codes_id, codes, deltat FROM kind_codes
1454 WHERE
1455 kind_id == ?
1456 AND ''' + codes_cond)
1458 return list(map(list, self._conn.execute(sql, args)))
1460 def update(self, constraint=None, **kwargs):
1461 '''
1462 Update or partially update channel and event inventories.
1464 :param constraint:
1465 Selection of times or areas to be brought up to date.
1466 :type constraint:
1467 :py:class:`~pyrocko.squirrel.client.Constraint`
1469 :param \\*\\*kwargs:
1470 Shortcut for setting ``constraint=Constraint(**kwargs)``.
1472 This function triggers all attached remote sources, to check for
1473 updates in the meta-data. The sources will only submit queries when
1474 their expiration date has passed, or if the selection spans into
1475 previously unseen times or areas.
1476 '''
1478 if constraint is None:
1479 constraint = client.Constraint(**kwargs)
1481 for source in self._sources:
1482 source.update_channel_inventory(self, constraint)
1483 source.update_event_inventory(self, constraint)
1485 def update_waveform_promises(self, constraint=None, **kwargs):
1486 '''
1487 Permit downloading of remote waveforms.
1489 :param constraint:
1490 Remote waveforms compatible with the given constraint are enabled
1491 for download.
1492 :type constraint:
1493 :py:class:`~pyrocko.squirrel.client.Constraint`
1495 :param \\*\\*kwargs:
1496 Shortcut for setting ``constraint=Constraint(**kwargs)``.
1498 Calling this method permits Squirrel to download waveforms from remote
1499 sources when processing subsequent waveform requests. This works by
1500 inserting so called waveform promises into the database. It will look
1501 into the available channels for each remote source and create a promise
1502 for each channel compatible with the given constraint. If the promise
1503 then matches in a waveform request, Squirrel tries to download the
1504 waveform. If the download is successful, the downloaded waveform is
1505 added to the Squirrel and the promise is deleted. If the download
1506 fails, the promise is kept if the reason of failure looks like being
1507 temporary, e.g. because of a network failure. If the cause of failure
1508 however seems to be permanent, the promise is deleted so that no
1509 further attempts are made to download a waveform which might not be
1510 available from that server at all. To force re-scheduling after a
1511 permanent failure, call :py:meth:`update_waveform_promises`
1512 yet another time.
1513 '''
1515 if constraint is None:
1516 constraint = client.Constraint(**kwargs)
1518 # TODO
1519 print('contraint ignored atm')
1521 for source in self._sources:
1522 source.update_waveform_promises(self)
1524 def update_responses(self, constraint=None, **kwargs):
1525 # TODO
1526 if constraint is None:
1527 constraint = client.Constraint(**kwargs)
1529 print('contraint ignored atm')
1530 for source in self._sources:
1531 source.update_response_inventory(self, constraint)
1533 def get_nfiles(self):
1534 '''
1535 Get number of files in selection.
1536 '''
1538 sql = self._sql('''SELECT COUNT(*) FROM %(db)s.%(file_states)s''')
1539 for row in self._conn.execute(sql):
1540 return row[0]
1542 def get_nnuts(self):
1543 '''
1544 Get number of nuts in selection.
1545 '''
1547 sql = self._sql('''SELECT COUNT(*) FROM %(db)s.%(nuts)s''')
1548 for row in self._conn.execute(sql):
1549 return row[0]
1551 def get_total_size(self):
1552 '''
1553 Get aggregated file size available in selection.
1554 '''
1556 sql = self._sql('''
1557 SELECT SUM(files.size) FROM %(db)s.%(file_states)s
1558 INNER JOIN files
1559 ON %(db)s.%(file_states)s.file_id = files.file_id
1560 ''')
1562 for row in self._conn.execute(sql):
1563 return row[0] or 0
1565 def get_stats(self):
1566 '''
1567 Get statistics on contents available through this selection.
1568 '''
1570 kinds = self.get_kinds()
1571 time_spans = {}
1572 for kind in kinds:
1573 time_spans[kind] = self.get_time_span([kind])
1575 return SquirrelStats(
1576 nfiles=self.get_nfiles(),
1577 nnuts=self.get_nnuts(),
1578 kinds=kinds,
1579 codes=self.get_codes(),
1580 total_size=self.get_total_size(),
1581 counts=self.get_counts(),
1582 time_spans=time_spans,
1583 sources=[s.describe() for s in self._sources],
1584 operators=[op.describe() for op in self._operators])
1586 def get_content(
1587 self,
1588 nut,
1589 cache_id='default',
1590 accessor_id='default',
1591 show_progress=False):
1593 '''
1594 Get and possibly load full content for a given index entry from file.
1596 Loads the actual content objects (channel, station, waveform, ...) from
1597 file. For efficiency sibling content (all stuff in the same file
1598 segment) will also be loaded as a side effect. The loaded contents are
1599 cached in the Squirrel object.
1600 '''
1602 content_cache = self._content_caches[cache_id]
1603 if not content_cache.has(nut):
1605 for nut_loaded in io.iload(
1606 nut.file_path,
1607 segment=nut.file_segment,
1608 format=nut.file_format,
1609 database=self._database,
1610 show_progress=show_progress):
1612 content_cache.put(nut_loaded)
1614 try:
1615 return content_cache.get(nut, accessor_id)
1616 except KeyError:
1617 raise error.NotAvailable(
1618 'Unable to retrieve content: %s, %s, %s, %s' % nut.key)
1620 def advance_accessor(self, accessor_id, cache_id=None):
1621 '''
1622 Notify memory caches about consumer moving to a new data batch.
1624 :param accessor_id:
1625 Name of accessing consumer to be advanced.
1626 :type accessor_id:
1627 str
1629 :param cache_id:
1630 Name of cache to for which the accessor should be advanced. By
1631 default the named accessor is advanced in all registered caches.
1632 By default, two caches named ``'default'`` and ``'waveforms'`` are
1633 available.
1634 :type cache_id:
1635 str
1637 See :py:class:`~pyrocko.squirrel.cache.ContentCache` for details on how
1638 Squirrel's memory caching works and can be tuned. Default behaviour is
1639 to release data when it has not been used in the latest data
1640 window/batch. If the accessor is never advanced, data is cached
1641 indefinitely - which is often desired e.g. for station meta-data.
1642 Methods for consecutive data traversal, like
1643 :py:meth:`chopper_waveforms` automatically advance and clear
1644 their accessor.
1645 '''
1646 for cache_ in (
1647 self._content_caches.keys()
1648 if cache_id is None
1649 else [cache_id]):
1651 self._content_caches[cache_].advance_accessor(accessor_id)
1653 def clear_accessor(self, accessor_id, cache_id=None):
1654 '''
1655 Notify memory caches about a consumer having finished.
1657 :param accessor_id:
1658 Name of accessor to be cleared.
1659 :type accessor_id:
1660 str
1662 :param cache_id:
1663 Name of cache to for which the accessor should be cleared. By
1664 default the named accessor is cleared from all registered caches.
1665 By default, two caches named ``'default'`` and ``'waveforms'`` are
1666 available.
1667 :type cache_id:
1668 str
1670 Calling this method clears all references to cache entries held by the
1671 named accessor. Cache entries are then freed if not referenced by any
1672 other accessor.
1673 '''
1675 for cache_ in (
1676 self._content_caches.keys()
1677 if cache_id is None
1678 else [cache_id]):
1680 self._content_caches[cache_].clear_accessor(accessor_id)
1682 def _check_duplicates(self, nuts):
1683 d = defaultdict(list)
1684 for nut in nuts:
1685 d[nut.codes].append(nut)
1687 for codes, group in d.items():
1688 if len(group) > 1:
1689 logger.warning(
1690 'Multiple entries matching codes: %s'
1691 % '.'.join(codes.split(separator)))
1693 @filldocs
1694 def get_stations(
1695 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
1696 model='squirrel'):
1698 '''
1699 Get stations matching given constraints.
1701 %(query_args)s
1703 :param model:
1704 Select object model for returned values: ``'squirrel'`` to get
1705 Squirrel station objects or ``'pyrocko'`` to get Pyrocko station
1706 objects with channel information attached.
1707 :type model:
1708 str
1710 :returns:
1711 List of :py:class:`pyrocko.squirrel.Station
1712 <pyrocko.squirrel.model.Station>` objects by default or list of
1713 :py:class:`pyrocko.model.Station <pyrocko.model.station.Station>`
1714 objects if ``model='pyrocko'`` is requested.
1716 See :py:meth:`iter_nuts` for details on time span matching.
1717 '''
1719 if model == 'pyrocko':
1720 return self._get_pyrocko_stations(obj, tmin, tmax, time, codes)
1721 elif model == 'squirrel':
1722 args = self._get_selection_args(obj, tmin, tmax, time, codes)
1723 nuts = sorted(
1724 self.iter_nuts('station', *args), key=lambda nut: nut.dkey)
1725 self._check_duplicates(nuts)
1726 return [self.get_content(nut) for nut in nuts]
1727 else:
1728 raise ValueError('Invalid station model: %s' % model)
1730 @filldocs
1731 def get_channels(
1732 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1734 '''
1735 Get channels matching given constraints.
1737 %(query_args)s
1739 :returns:
1740 List of :py:class:`~pyrocko.squirrel.model.Channel` objects.
1742 See :py:meth:`iter_nuts` for details on time span matching.
1743 '''
1745 args = self._get_selection_args(obj, tmin, tmax, time, codes)
1746 nuts = sorted(
1747 self.iter_nuts('channel', *args), key=lambda nut: nut.dkey)
1748 self._check_duplicates(nuts)
1749 return [self.get_content(nut) for nut in nuts]
1751 @filldocs
1752 def get_sensors(
1753 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1755 '''
1756 Get sensors matching given constraints.
1758 %(query_args)s
1760 :returns:
1761 List of :py:class:`~pyrocko.squirrel.model.Sensor` objects.
1763 See :py:meth:`iter_nuts` for details on time span matching.
1764 '''
1766 tmin, tmax, codes = self._get_selection_args(
1767 obj, tmin, tmax, time, codes)
1769 if codes is not None:
1770 if isinstance(codes, str):
1771 codes = codes.split('.')
1772 codes = tuple(codes_inflate(codes))
1773 if codes[4] != '*':
1774 codes = codes[:4] + (codes[4][:-1] + '?',) + codes[5:]
1776 nuts = sorted(
1777 self.iter_nuts(
1778 'channel', tmin, tmax, codes), key=lambda nut: nut.dkey)
1779 self._check_duplicates(nuts)
1780 return model.Sensor.from_channels(
1781 self.get_content(nut) for nut in nuts)
1783 @filldocs
1784 def get_responses(
1785 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1787 '''
1788 Get instrument responses matching given constraints.
1790 %(query_args)s
1792 :returns:
1793 List of :py:class:`~pyrocko.squirrel.model.Response` objects.
1795 See :py:meth:`iter_nuts` for details on time span matching.
1796 '''
1798 args = self._get_selection_args(obj, tmin, tmax, time, codes)
1799 nuts = sorted(
1800 self.iter_nuts('response', *args), key=lambda nut: nut.dkey)
1801 self._check_duplicates(nuts)
1802 return [self.get_content(nut) for nut in nuts]
1804 @filldocs
1805 def get_response(
1806 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1808 '''
1809 Get instrument response matching given constraints.
1811 %(query_args)s
1813 :returns:
1814 :py:class:`~pyrocko.squirrel.model.Response` object.
1816 Same as :py:meth:`get_responses` but returning exactly one response.
1817 Raises :py:exc:`~pyrocko.squirrel.error.NotAvailable` if zero or more
1818 than one is available.
1820 See :py:meth:`iter_nuts` for details on time span matching.
1821 '''
1823 responses = self.get_responses(obj, tmin, tmax, time, codes)
1824 if len(responses) == 0:
1825 raise error.NotAvailable(
1826 'No instrument response available.')
1827 elif len(responses) > 1:
1828 raise error.NotAvailable(
1829 'Multiple instrument responses matching given constraints.')
1831 return responses[0]
1833 @filldocs
1834 def get_events(
1835 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
1837 '''
1838 Get events matching given constraints.
1840 %(query_args)s
1842 :returns:
1843 List of :py:class:`~pyrocko.model.event.Event` objects.
1845 See :py:meth:`iter_nuts` for details on time span matching.
1846 '''
1848 args = self._get_selection_args(obj, tmin, tmax, time, codes)
1849 nuts = sorted(
1850 self.iter_nuts('event', *args), key=lambda nut: nut.dkey)
1851 self._check_duplicates(nuts)
1852 return [self.get_content(nut) for nut in nuts]
1854 def _redeem_promises(self, *args):
1856 tmin, tmax, _ = args
1858 waveforms = list(self.iter_nuts('waveform', *args))
1859 promises = list(self.iter_nuts('waveform_promise', *args))
1861 codes_to_avail = defaultdict(list)
1862 for nut in waveforms:
1863 codes_to_avail[nut.codes].append((nut.tmin, nut.tmax+nut.deltat))
1865 def tts(x):
1866 if isinstance(x, tuple):
1867 return tuple(tts(e) for e in x)
1868 elif isinstance(x, list):
1869 return list(tts(e) for e in x)
1870 else:
1871 return util.time_to_str(x)
1873 orders = []
1874 for promise in promises:
1875 waveforms_avail = codes_to_avail[promise.codes]
1876 for block_tmin, block_tmax in blocks(
1877 max(tmin, promise.tmin),
1878 min(tmax, promise.tmax),
1879 promise.deltat):
1881 orders.append(
1882 WaveformOrder(
1883 source_id=promise.file_path,
1884 codes=tuple(promise.codes.split(separator)),
1885 tmin=block_tmin,
1886 tmax=block_tmax,
1887 deltat=promise.deltat,
1888 gaps=gaps(waveforms_avail, block_tmin, block_tmax)))
1890 orders_noop, orders = lpick(lambda order: order.gaps, orders)
1892 order_keys_noop = set(order_key(order) for order in orders_noop)
1893 if len(order_keys_noop) != 0 or len(orders_noop) != 0:
1894 logger.info(
1895 'Waveform orders already satisified with cached/local data: '
1896 '%i (%i)' % (len(order_keys_noop), len(orders_noop)))
1898 source_ids = []
1899 sources = {}
1900 for source in self._sources:
1901 if isinstance(source, fdsn.FDSNSource):
1902 source_ids.append(source._source_id)
1903 sources[source._source_id] = source
1905 source_priority = dict(
1906 (source_id, i) for (i, source_id) in enumerate(source_ids))
1908 order_groups = defaultdict(list)
1909 for order in orders:
1910 order_groups[order_key(order)].append(order)
1912 for k, order_group in order_groups.items():
1913 order_group.sort(
1914 key=lambda order: source_priority[order.source_id])
1916 n_order_groups = len(order_groups)
1918 if len(order_groups) != 0 or len(orders) != 0:
1919 logger.info(
1920 'Waveform orders standing for download: %i (%i)'
1921 % (len(order_groups), len(orders)))
1923 task = make_task('Waveform orders processed', n_order_groups)
1924 else:
1925 task = None
1927 def split_promise(order):
1928 self._split_nuts(
1929 'waveform_promise',
1930 order.tmin, order.tmax,
1931 codes=order.codes,
1932 path=order.source_id)
1934 def release_order_group(order):
1935 okey = order_key(order)
1936 for followup in order_groups[okey]:
1937 split_promise(followup)
1939 del order_groups[okey]
1941 if task:
1942 task.update(n_order_groups - len(order_groups))
1944 def noop(order):
1945 pass
1947 def success(order):
1948 release_order_group(order)
1949 split_promise(order)
1951 def batch_add(paths):
1952 self.add(paths)
1954 calls = queue.Queue()
1956 def enqueue(f):
1957 def wrapper(*args):
1958 calls.put((f, args))
1960 return wrapper
1962 for order in orders_noop:
1963 split_promise(order)
1965 while order_groups:
1967 orders_now = []
1968 empty = []
1969 for k, order_group in order_groups.items():
1970 try:
1971 orders_now.append(order_group.pop(0))
1972 except IndexError:
1973 empty.append(k)
1975 for k in empty:
1976 del order_groups[k]
1978 by_source_id = defaultdict(list)
1979 for order in orders_now:
1980 by_source_id[order.source_id].append(order)
1982 threads = []
1983 for source_id in by_source_id:
1984 def download():
1985 try:
1986 sources[source_id].download_waveforms(
1987 by_source_id[source_id],
1988 success=enqueue(success),
1989 error_permanent=enqueue(split_promise),
1990 error_temporary=noop,
1991 batch_add=enqueue(batch_add))
1993 finally:
1994 calls.put(None)
1996 thread = threading.Thread(target=download)
1997 thread.start()
1998 threads.append(thread)
2000 ndone = 0
2001 while ndone < len(threads):
2002 ret = calls.get()
2003 if ret is None:
2004 ndone += 1
2005 else:
2006 ret[0](*ret[1])
2008 for thread in threads:
2009 thread.join()
2011 if task:
2012 task.update(n_order_groups - len(order_groups))
2014 if task:
2015 task.done()
2017 @filldocs
2018 def get_waveform_nuts(
2019 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
2021 '''
2022 Get waveform content entities matching given constraints.
2024 %(query_args)s
2026 Like :py:meth:`get_nuts` with ``kind='waveform'`` but additionally
2027 resolves matching waveform promises (downloads waveforms from remote
2028 sources).
2030 See :py:meth:`iter_nuts` for details on time span matching.
2031 '''
2033 args = self._get_selection_args(obj, tmin, tmax, time, codes)
2034 self._redeem_promises(*args)
2035 return sorted(
2036 self.iter_nuts('waveform', *args), key=lambda nut: nut.dkey)
2038 @filldocs
2039 def get_waveforms(
2040 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
2041 uncut=False, want_incomplete=True, degap=True, maxgap=5,
2042 maxlap=None, snap=None, include_last=False, load_data=True,
2043 accessor_id='default', operator_params=None):
2045 '''
2046 Get waveforms matching given constraints.
2048 %(query_args)s
2050 :param uncut:
2051 Set to ``True``, to disable cutting traces to [``tmin``, ``tmax``]
2052 and to disable degapping/deoverlapping. Returns untouched traces as
2053 they are read from file segment. File segments are always read in
2054 their entirety.
2055 :type uncut:
2056 bool
2058 :param want_incomplete:
2059 If ``True``, gappy/incomplete traces are included in the result.
2060 :type want_incomplete:
2061 bool
2063 :param degap:
2064 If ``True``, connect traces and remove gaps and overlaps.
2065 :type degap:
2066 bool
2068 :param maxgap:
2069 Maximum gap size in samples which is filled with interpolated
2070 samples when ``degap`` is ``True``.
2071 :type maxgap:
2072 int
2074 :param maxlap:
2075 Maximum overlap size in samples which is removed when ``degap`` is
2076 ``True``
2077 :type maxlap:
2078 int
2080 :param snap:
2081 Rounding functions used when computing sample index from time
2082 instance, for trace start and trace end, respectively. By default,
2083 ``(round, round)`` is used.
2084 :type snap:
2085 tuple of 2 callables
2087 :param include_last:
2088 If ``True``, add one more sample to the returned traces (the sample
2089 which would be the first sample of a query with ``tmin`` set to the
2090 current value of ``tmax``).
2091 :type include_last:
2092 bool
2094 :param load_data:
2095 If ``True``, waveform data samples are read from files (or cache).
2096 If ``False``, meta-information-only traces are returned (dummy
2097 traces with no data samples).
2098 :type load_data:
2099 bool
2101 :param accessor_id:
2102 Name of consumer on who's behalf data is accessed. Used in cache
2103 management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key
2104 to distinguish different points of extraction for the decision of
2105 when to release cached waveform data. Should be used when data is
2106 alternately extracted from more than one region / selection.
2107 :type accessor_id:
2108 str
2110 See :py:meth:`iter_nuts` for details on time span matching.
2112 Loaded data is kept in memory (at least) until
2113 :py:meth:`clear_accessor` has been called or
2114 :py:meth:`advance_accessor` has been called two consecutive times
2115 without data being accessed between the two calls (by this accessor).
2116 Data may still be further kept in the memory cache if held alive by
2117 consumers with a different ``accessor_id``.
2118 '''
2120 tmin, tmax, codes = self._get_selection_args(
2121 obj, tmin, tmax, time, codes)
2123 self_tmin, self_tmax = self.get_time_span(
2124 ['waveform', 'waveform_promise'])
2126 if None in (self_tmin, self_tmax):
2127 logger.warning(
2128 'No waveforms available.')
2129 return []
2131 tmin = tmin if tmin is not None else self_tmin
2132 tmax = tmax if tmax is not None else self_tmax
2134 if codes is not None:
2135 operator = self.get_operator(codes)
2136 if operator is not None:
2137 return operator.get_waveforms(
2138 self, codes,
2139 tmin=tmin, tmax=tmax,
2140 uncut=uncut, want_incomplete=want_incomplete, degap=degap,
2141 maxgap=maxgap, maxlap=maxlap, snap=snap,
2142 include_last=include_last, load_data=load_data,
2143 accessor_id=accessor_id, params=operator_params)
2145 nuts = self.get_waveform_nuts(obj, tmin, tmax, time, codes)
2147 if load_data:
2148 traces = [
2149 self.get_content(nut, 'waveform', accessor_id) for nut in nuts]
2151 else:
2152 traces = [
2153 trace.Trace(**nut.trace_kwargs) for nut in nuts]
2155 if uncut:
2156 return traces
2158 if snap is None:
2159 snap = (round, round)
2161 chopped = []
2162 for tr in traces:
2163 if not load_data and tr.ydata is not None:
2164 tr = tr.copy(data=False)
2165 tr.ydata = None
2167 try:
2168 chopped.append(tr.chop(
2169 tmin, tmax,
2170 inplace=False,
2171 snap=snap,
2172 include_last=include_last))
2174 except trace.NoData:
2175 pass
2177 processed = self._process_chopped(
2178 chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax)
2180 return processed
2182 @filldocs
2183 def chopper_waveforms(
2184 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
2185 tinc=None, tpad=0.,
2186 want_incomplete=True, snap_window=False,
2187 degap=True, maxgap=5, maxlap=None,
2188 snap=None, include_last=False, load_data=True,
2189 accessor_id=None, clear_accessor=True, operator_params=None):
2191 '''
2192 Iterate window-wise over waveform archive.
2194 %(query_args)s
2196 :param tinc:
2197 Time increment (window shift time) (default uses ``tmax-tmin``)
2198 :type tinc:
2199 timestamp
2201 :param tpad:
2202 Padding time appended on either side of the data window (window
2203 overlap is ``2*tpad``).
2204 :type tpad:
2205 timestamp
2207 :param want_incomplete:
2208 If ``True``, gappy/incomplete traces are included in the result.
2209 :type want_incomplete:
2210 bool
2212 :param snap_window:
2213 If ``True``, start time windows at multiples of tinc with respect
2214 to system time zero.
2216 :param degap:
2217 If ``True``, connect traces and remove gaps and overlaps.
2218 :type degap:
2219 bool
2221 :param maxgap:
2222 Maximum gap size in samples which is filled with interpolated
2223 samples when ``degap`` is ``True``.
2224 :type maxgap:
2225 int
2227 :param maxlap:
2228 Maximum overlap size in samples which is removed when ``degap`` is
2229 ``True``
2230 :type maxlap:
2231 int
2233 :param snap:
2234 Rounding functions used when computing sample index from time
2235 instance, for trace start and trace end, respectively. By default,
2236 ``(round, round)`` is used.
2237 :type snap:
2238 tuple of 2 callables
2240 :param include_last:
2241 If ``True``, add one more sample to the returned traces (the sample
2242 which would be the first sample of a query with ``tmin`` set to the
2243 current value of ``tmax``).
2244 :type include_last:
2245 bool
2247 :param load_data:
2248 If ``True``, waveform data samples are read from files (or cache).
2249 If ``False``, meta-information-only traces are returned (dummy
2250 traces with no data samples).
2251 :type load_data:
2252 bool
2254 :param accessor_id:
2255 Name of consumer on who's behalf data is accessed. Used in cache
2256 management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key
2257 to distinguish different points of extraction for the decision of
2258 when to release cached waveform data. Should be used when data is
2259 alternately extracted from more than one region / selection.
2260 :type accessor_id:
2261 str
2263 :param clear_accessor:
2264 If ``True`` (default), :py:meth:`clear_accessor` is called when the
2265 chopper finishes. Set to ``False`` to keep loaded waveforms in
2266 memory when the generator returns.
2268 :yields:
2269 A list of :py:class:`~pyrocko.trace.Trace` objects for every
2270 extracted time window.
2272 See :py:meth:`iter_nuts` for details on time span matching.
2273 '''
2275 tmin, tmax, codes = self._get_selection_args(
2276 obj, tmin, tmax, time, codes)
2278 self_tmin, self_tmax = self.get_time_span(
2279 ['waveform', 'waveform_promise'])
2281 if None in (self_tmin, self_tmax):
2282 logger.warning(
2283 'Content has undefined time span. No waveforms and no '
2284 'waveform promises?')
2285 return
2287 if snap_window and tinc is not None:
2288 tmin = tmin if tmin is not None else self_tmin
2289 tmax = tmax if tmax is not None else self_tmax
2290 tmin = math.floor(tmin / tinc) * tinc
2291 tmax = math.ceil(tmax / tinc) * tinc
2292 else:
2293 tmin = tmin if tmin is not None else self_tmin + tpad
2294 tmax = tmax if tmax is not None else self_tmax - tpad
2296 tinc = tinc if tinc is not None else tmax - tmin
2298 try:
2299 if accessor_id is None:
2300 accessor_id = 'chopper%i' % self._n_choppers_active
2302 self._n_choppers_active += 1
2304 eps = tinc * 1e-6
2305 if tinc != 0.0:
2306 nwin = int(((tmax - eps) - tmin) / tinc) + 1
2307 else:
2308 nwin = 1
2310 for iwin in range(nwin):
2311 wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)
2312 chopped = []
2313 wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)
2314 eps = tinc*1e-6
2315 if wmin >= tmax-eps:
2316 break
2318 chopped = self.get_waveforms(
2319 tmin=wmin-tpad,
2320 tmax=wmax+tpad,
2321 codes=codes,
2322 snap=snap,
2323 include_last=include_last,
2324 load_data=load_data,
2325 want_incomplete=want_incomplete,
2326 degap=degap,
2327 maxgap=maxgap,
2328 maxlap=maxlap,
2329 accessor_id=accessor_id,
2330 operator_params=operator_params)
2332 self.advance_accessor(accessor_id)
2334 yield Batch(
2335 tmin=wmin,
2336 tmax=wmax,
2337 i=iwin,
2338 n=nwin,
2339 traces=chopped)
2341 iwin += 1
2343 finally:
2344 self._n_choppers_active -= 1
2345 if clear_accessor:
2346 self.clear_accessor(accessor_id, 'waveform')
2348 def _process_chopped(
2349 self, chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax):
2351 chopped.sort(key=lambda a: a.full_id)
2352 if degap:
2353 chopped = trace.degapper(chopped, maxgap=maxgap, maxlap=maxlap)
2355 if not want_incomplete:
2356 chopped_weeded = []
2357 for tr in chopped:
2358 emin = tr.tmin - tmin
2359 emax = tr.tmax + tr.deltat - tmax
2360 if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
2361 chopped_weeded.append(tr)
2363 elif degap:
2364 if (0. < emin <= 5. * tr.deltat
2365 and -5. * tr.deltat <= emax < 0.):
2367 tr.extend(tmin, tmax-tr.deltat, fillmethod='repeat')
2368 chopped_weeded.append(tr)
2370 chopped = chopped_weeded
2372 return chopped
2374 def _get_pyrocko_stations(
2375 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
2377 from pyrocko import model as pmodel
2379 by_nsl = defaultdict(lambda: (list(), list()))
2380 for station in self.get_stations(obj, tmin, tmax, time, codes):
2381 sargs = station._get_pyrocko_station_args()
2382 nsl = sargs[1:4]
2383 by_nsl[nsl][0].append(sargs)
2385 for channel in self.get_channels(obj, tmin, tmax, time, codes):
2386 sargs = channel._get_pyrocko_station_args()
2387 nsl = sargs[1:4]
2388 sargs_list, channels_list = by_nsl[nsl]
2389 sargs_list.append(sargs)
2390 channels_list.append(channel)
2392 pstations = []
2393 nsls = list(by_nsl.keys())
2394 nsls.sort()
2395 for nsl in nsls:
2396 sargs_list, channels_list = by_nsl[nsl]
2397 sargs = util.consistency_merge(sargs_list)
2399 by_c = defaultdict(list)
2400 for ch in channels_list:
2401 by_c[ch.channel].append(ch._get_pyrocko_channel_args())
2403 chas = list(by_c.keys())
2404 chas.sort()
2405 pchannels = []
2406 for cha in chas:
2407 list_of_cargs = by_c[cha]
2408 cargs = util.consistency_merge(list_of_cargs)
2409 pchannels.append(pmodel.Channel(
2410 name=cargs[0],
2411 azimuth=cargs[1],
2412 dip=cargs[2]))
2414 pstations.append(pmodel.Station(
2415 network=sargs[0],
2416 station=sargs[1],
2417 location=sargs[2],
2418 lat=sargs[3],
2419 lon=sargs[4],
2420 elevation=sargs[5],
2421 depth=sargs[6] or 0.0,
2422 channels=pchannels))
2424 return pstations
2426 @property
2427 def pile(self):
2429 '''
2430 Emulates the older :py:class:`pyrocko.pile.Pile` interface.
2432 This property exposes a :py:class:`pyrocko.squirrel.pile.Pile` object,
2433 which emulates most of the older :py:class:`pyrocko.pile.Pile` methods
2434 but uses the fluffy power of the Squirrel under the hood.
2436 This interface can be used as a drop-in replacement for piles which are
2437 used in existing scripts and programs for efficient waveform data
2438 access. The Squirrel-based pile scales better for large datasets. Newer
2439 scripts should use Squirrel's native methods to avoid the emulation
2440 overhead.
2441 '''
2442 from . import pile
2444 if self._pile is None:
2445 self._pile = pile.Pile(self)
2447 return self._pile
2449 def snuffle(self):
2450 '''
2451 Look at dataset in Snuffler.
2452 '''
2453 self.pile.snuffle()
2455 def _gather_codes_keys(self, kind, gather, selector):
2456 return set(
2457 gather(codes)
2458 for codes in self.iter_codes(kind)
2459 if selector is None or selector(codes))
2461 def __str__(self):
2462 return str(self.get_stats())
2464 def get_coverage(
2465 self, kind, tmin=None, tmax=None, codes_list=None, limit=None,
2466 return_raw=True):
2468 '''
2469 Get coverage information.
2471 Get information about strips of gapless data coverage.
2473 :param kind:
2474 Content kind to be queried.
2475 :type kind:
2476 str
2478 :param tmin:
2479 Start time of query interval.
2480 :type tmin:
2481 timestamp
2483 :param tmax:
2484 End time of query interval.
2485 :type tmax:
2486 timestamp
2488 :param codes_list:
2489 List of code patterns to query. If not given or empty, an empty
2490 list is returned.
2491 :type codes_list:
2492 :py:class:`list` of :py:class:`tuple` of :py:class:`str`
2494 :param limit:
2495 Limit query to return only up to a given maximum number of entries
2496 per matching channel (without setting this option, very gappy data
2497 could cause the query to execute for a very long time).
2498 :type limit:
2499 int
2501 :returns:
2502 List of entries of the form ``(pattern, codes, deltat, tmin, tmax,
2503 data)`` where ``pattern`` is the request code pattern which
2504 yielded this entry, ``codes`` are the matching channel codes,
2505 ``tmin`` and ``tmax`` are the global min and max times for which
2506 data for this channel is available, regardless of any time
2507 restrictions in the query. ``data`` is a list with (up to
2508 ``limit``) change-points of the form ``(time, count)`` where a
2509 ``count`` of zero indicates a data gap, a value of 1 normal data
2510 coverage and higher values indicate duplicate/redundant data.
2511 '''
2513 tmin_seconds, tmin_offset = model.tsplit(tmin)
2514 tmax_seconds, tmax_offset = model.tsplit(tmax)
2515 kind_id = to_kind_id(kind)
2517 if codes_list is None:
2518 codes_list = self.get_codes(kind=kind)
2520 kdata_all = []
2521 for pattern in codes_list:
2522 kdata = self.glob_codes(kind, [pattern])
2523 for row in kdata:
2524 row[0:0] = [pattern]
2526 kdata_all.extend(kdata)
2528 kind_codes_ids = [x[1] for x in kdata_all]
2530 counts_at_tmin = {}
2531 if tmin is not None:
2532 for nut in self.iter_nuts(
2533 kind, tmin, tmin, kind_codes_ids=kind_codes_ids):
2535 k = nut.codes, nut.deltat
2536 if k not in counts_at_tmin:
2537 counts_at_tmin[k] = 0
2539 counts_at_tmin[k] += 1
2541 coverage = []
2542 for pattern, kind_codes_id, codes, deltat in kdata_all:
2543 entry = [pattern, codes, deltat, None, None, []]
2544 for i, order in [(0, 'ASC'), (1, 'DESC')]:
2545 sql = self._sql('''
2546 SELECT
2547 time_seconds,
2548 time_offset
2549 FROM %(db)s.%(coverage)s
2550 WHERE
2551 kind_codes_id == ?
2552 ORDER BY
2553 kind_codes_id ''' + order + ''',
2554 time_seconds ''' + order + ''',
2555 time_offset ''' + order + '''
2556 LIMIT 1
2557 ''')
2559 for row in self._conn.execute(sql, [kind_codes_id]):
2560 entry[3+i] = model.tjoin(row[0], row[1])
2562 if None in entry[3:5]:
2563 continue
2565 args = [kind_codes_id]
2567 sql_time = ''
2568 if tmin is not None:
2569 # intentionally < because (== tmin) is queried from nuts
2570 sql_time += ' AND ( ? < time_seconds ' \
2571 'OR ( ? == time_seconds AND ? < time_offset ) ) '
2572 args.extend([tmin_seconds, tmin_seconds, tmin_offset])
2574 if tmax is not None:
2575 sql_time += ' AND ( time_seconds < ? ' \
2576 'OR ( ? == time_seconds AND time_offset <= ? ) ) '
2577 args.extend([tmax_seconds, tmax_seconds, tmax_offset])
2579 sql_limit = ''
2580 if limit is not None:
2581 sql_limit = ' LIMIT ?'
2582 args.append(limit)
2584 sql = self._sql('''
2585 SELECT
2586 time_seconds,
2587 time_offset,
2588 step
2589 FROM %(db)s.%(coverage)s
2590 WHERE
2591 kind_codes_id == ?
2592 ''' + sql_time + '''
2593 ORDER BY
2594 kind_codes_id,
2595 time_seconds,
2596 time_offset
2597 ''' + sql_limit)
2599 rows = list(self._conn.execute(sql, args))
2601 if limit is not None and len(rows) == limit:
2602 entry[-1] = None
2603 else:
2604 counts = counts_at_tmin.get((codes, deltat), 0)
2605 tlast = None
2606 if tmin is not None:
2607 entry[-1].append((tmin, counts))
2608 tlast = tmin
2610 for row in rows:
2611 t = model.tjoin(row[0], row[1])
2612 counts += row[2]
2613 entry[-1].append((t, counts))
2614 tlast = t
2616 if tmax is not None and (tlast is None or tlast != tmax):
2617 entry[-1].append((tmax, counts))
2619 coverage.append(entry)
2621 if return_raw:
2622 return coverage
2623 else:
2624 return [model.Coverage.from_values(
2625 entry + [kind_id]) for entry in coverage]
2627 def add_operator(self, op):
2628 self._operators.append(op)
2630 def update_operator_mappings(self):
2631 available = [
2632 separator.join(codes)
2633 for codes in self.get_codes(kind=('channel'))]
2635 for operator in self._operators:
2636 operator.update_mappings(available, self._operator_registry)
2638 def iter_operator_mappings(self):
2639 for operator in self._operators:
2640 for in_codes, out_codes in operator.iter_mappings():
2641 yield operator, in_codes, out_codes
2643 def get_operator_mappings(self):
2644 return list(self.iter_operator_mappings())
2646 def get_operator(self, codes):
2647 if isinstance(codes, tuple):
2648 codes = separator.join(codes)
2649 try:
2650 return self._operator_registry[codes][0]
2651 except KeyError:
2652 return None
2654 def get_operator_group(self, codes):
2655 if isinstance(codes, tuple):
2656 codes = separator.join(codes)
2657 try:
2658 return self._operator_registry[codes]
2659 except KeyError:
2660 return None, (None, None, None)
2662 def iter_operator_codes(self):
2663 for _, _, out_codes in self.iter_operator_mappings():
2664 for codes in out_codes:
2665 yield tuple(codes.split(separator))
2667 def get_operator_codes(self):
2668 return list(self.iter_operator_codes())
2670 def print_tables(self, table_names=None, stream=None):
2671 '''
2672 Dump raw database tables in textual form (for debugging purposes).
2674 :param table_names:
2675 Names of tables to be dumped or ``None`` to dump all.
2676 :type table_names:
2677 :py:class:`list` of :py:class:`str`
2679 :param stream:
2680 Open file or ``None`` to dump to standard output.
2681 '''
2683 if stream is None:
2684 stream = sys.stdout
2686 if isinstance(table_names, str):
2687 table_names = [table_names]
2689 if table_names is None:
2690 table_names = [
2691 'selection_file_states',
2692 'selection_nuts',
2693 'selection_kind_codes_count',
2694 'files', 'nuts', 'kind_codes', 'kind_codes_count']
2696 m = {
2697 'selection_file_states': '%(db)s.%(file_states)s',
2698 'selection_nuts': '%(db)s.%(nuts)s',
2699 'selection_kind_codes_count': '%(db)s.%(kind_codes_count)s',
2700 'files': 'files',
2701 'nuts': 'nuts',
2702 'kind_codes': 'kind_codes',
2703 'kind_codes_count': 'kind_codes_count'}
2705 for table_name in table_names:
2706 self._database.print_table(
2707 m[table_name] % self._names, stream=stream)
2710class SquirrelStats(Object):
2711 '''
2712 Container to hold statistics about contents available from a Squirrel.
2714 See also :py:meth:`Squirrel.get_stats`.
2715 '''
2717 nfiles = Int.T(
2718 help='Number of files in selection.')
2719 nnuts = Int.T(
2720 help='Number of index nuts in selection.')
2721 codes = List.T(
2722 Tuple.T(content_t=String.T()),
2723 help='Available code sequences in selection, e.g. '
2724 '(agency, network, station, location) for stations nuts.')
2725 kinds = List.T(
2726 String.T(),
2727 help='Available content types in selection.')
2728 total_size = Int.T(
2729 help='Aggregated file size of files is selection.')
2730 counts = Dict.T(
2731 String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()),
2732 help='Breakdown of how many nuts of any content type and code '
2733 'sequence are available in selection, ``counts[kind][codes]``.')
2734 time_spans = Dict.T(
2735 String.T(), Tuple.T(content_t=Timestamp.T()),
2736 help='Time spans by content type.')
2737 sources = List.T(
2738 String.T(),
2739 help='Descriptions of attached sources.')
2740 operators = List.T(
2741 String.T(),
2742 help='Descriptions of attached operators.')
2744 def __str__(self):
2745 kind_counts = dict(
2746 (kind, sum(self.counts[kind].values())) for kind in self.kinds)
2748 scodes = model.codes_to_str_abbreviated(self.codes)
2750 ssources = '<none>' if not self.sources else '\n' + '\n'.join(
2751 ' ' + s for s in self.sources)
2753 soperators = '<none>' if not self.operators else '\n' + '\n'.join(
2754 ' ' + s for s in self.operators)
2756 def stime(t):
2757 return util.tts(t) if t is not None and t not in (
2758 model.g_tmin, model.g_tmax) else '<none>'
2760 def stable(rows):
2761 ns = [max(len(w) for w in col) for col in zip(*rows)]
2762 return '\n'.join(
2763 ' '.join(w.ljust(n) for n, w in zip(ns, row))
2764 for row in rows)
2766 def indent(s):
2767 return '\n'.join(' '+line for line in s.splitlines())
2769 stspans = '<none>' if not self.kinds else '\n' + indent(stable([(
2770 kind + ':',
2771 str(kind_counts[kind]),
2772 stime(self.time_spans[kind][0]),
2773 '-',
2774 stime(self.time_spans[kind][1])) for kind in sorted(self.kinds)]))
2776 s = '''
2777Number of files: %i
2778Total size of known files: %s
2779Number of index nuts: %i
2780Available content kinds: %s
2781Available codes: %s
2782Sources: %s
2783Operators: %s''' % (
2784 self.nfiles,
2785 util.human_bytesize(self.total_size),
2786 self.nnuts,
2787 stspans, scodes, ssources, soperators)
2789 return s.lstrip()
2792__all__ = [
2793 'Squirrel',
2794 'SquirrelStats',
2795]