Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/squirrel/base.py: 83%
993 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Squirrel main classes.
8'''
9from __future__ import annotations
11import asyncio
12import re
13import sys
14import os
15import time
16import math
17import logging
18import threading
19import queue
20import sqlite3
21from collections import defaultdict
22from concurrent.futures import ThreadPoolExecutor
24from pyrocko.guts import Object, Int, List, Tuple, String, Timestamp, Dict
25from pyrocko import util, trace
26from pyrocko import progress
27from pyrocko.plot import nice_time_tick_inc_approx_secs
29from . import model, io, cache, dataset
31from .model import to_kind_id, WaveformOrder, to_kind, to_codes, \
32 STATION, CHANNEL, RESPONSE, EVENT, WAVEFORM, codes_patterns_list, \
33 codes_patterns_for_kind
34from .client import fdsn, catalog
35from .selection import Selection, filldocs
36from .database import abspath
37from .operators.base import Operator, CodesPatternFiltering
38from . import client, environment, error
logger = logging.getLogger('psq.base')

# Number of loader threads used by Squirrel when no ``n_threads`` argument is
# given (capped by the number of usable CPU cores, see Squirrel.__init__).
NTHREADS_DEFAULT = 1

# Cache of loader thread pools, keyed by process id (see
# get_loading_executor).
LOADING_EXECUTOR = {}

# NOTE(review): presumably read by pyrocko.guts as the type-tag prefix for
# guts objects defined in this module — confirm against guts.
guts_prefix = 'squirrel'
def get_loading_executor(max_workers=NTHREADS_DEFAULT):
    '''
    Get the thread pool used for parallel data loading.

    Executors are cached per process id, so a different process (e.g. a
    forked child) gets its own pool. If the cached executor of the current
    process was created with a different worker count, it is replaced by a
    fresh one.

    :param max_workers:
        Number of worker threads the returned executor should have.

    :returns:
        :py:class:`concurrent.futures.ThreadPoolExecutor` instance.
    '''
    pid = os.getpid()
    executor = LOADING_EXECUTOR.get(pid)
    if executor is None or executor._max_workers != max_workers:
        executor = ThreadPoolExecutor(max_workers=max_workers)
        LOADING_EXECUTOR[pid] = executor

    return executor
def get_nthreads():
    '''
    Get the number of CPU cores usable by this process.

    Uses the CPU affinity mask of the process where the platform supports
    it, otherwise falls back to the core count reported by the OS.

    :returns:
        :py:class:`int`, at least 1.
    '''
    try:
        affinity = os.sched_getaffinity(0)
    except Exception:
        return os.cpu_count() or 1

    return max(1, len(affinity))
def nonef(f, xs):
    '''
    Apply an aggregate function to the non-``None`` entries of a sequence.

    :param f:
        Callable taking a non-empty list, e.g. :py:func:`min`.
    :param xs:
        Iterable of values, possibly containing ``None``.
    :returns:
        ``f(values)`` for the list of non-``None`` values, or ``None`` if
        there are no such values.
    '''
    values = list(filter(lambda x: x is not None, xs))
    return f(values) if values else None
def make_task(*args):
    '''
    Create a progress task reporting through this module's logger.

    Positional arguments are forwarded to :py:func:`pyrocko.progress.task`.
    '''
    task_kwargs = {'logger': logger}
    return progress.task(*args, **task_kwargs)
def lpick(condition, seq):
    '''
    Partition a sequence by a predicate.

    :returns:
        Tuple ``(rejected, accepted)`` of lists. ``rejected`` holds the
        elements for which ``condition`` is falsy, ``accepted`` those for
        which it is truthy, each in original order.
    '''
    rejected, accepted = [], []
    for element in seq:
        (accepted if condition(element) else rejected).append(element)

    return rejected, accepted
def len_plural(obj):
    '''
    Get the length of ``obj`` and a matching plural suffix.

    :returns:
        ``(n, suffix)`` where ``suffix`` is ``''`` if ``n == 1`` and ``'s'``
        otherwise.
    '''
    n = len(obj)
    suffix = '' if n == 1 else 's'
    return n, suffix
def blocks(tmin, tmax, deltat, n_samples_block=100000):
    '''
    Iterate over aligned time blocks covering a time span.

    The block length is obtained from
    :py:func:`~pyrocko.plot.nice_time_tick_inc_approx_secs` applied to the
    duration of ``n_samples_block`` samples at sampling interval ``deltat``
    (presumably rounded to a "nice" increment). Blocks start at integer
    multiples of the block length.

    :yields:
        ``(block_tmin, block_tmax)`` tuples in ascending order.
    '''
    tblock = nice_time_tick_inc_approx_secs(
        util.to_time_float(deltat * n_samples_block))

    iblock = int(math.floor(tmin / tblock))
    iblock_end = int(math.ceil(tmax / tblock))
    while iblock < iblock_end:
        yield iblock * tblock, (iblock + 1) * tblock
        iblock += 1
def gaps(avail, tmin, tmax):
    '''
    Find the parts of a query time span not covered by given time spans.

    :param avail:
        Iterable of ``(tmin, tmax)`` tuples of available spans, each with
        ``tmin < tmax``. Spans may overlap and may extend beyond the query.
    :param tmin:
        Start of the query span.
    :param tmax:
        End of the query span, must be greater than ``tmin``.
    :returns:
        List of ``(tmin, tmax)`` tuples of uncovered spans in ascending
        order. Zero-length gaps are omitted.
    '''
    assert tmin < tmax

    # Sweep over edge events. The level is 1 outside the query span, entering
    # the query lowers it by one and every active available span raises it by
    # one, so level 0 means "inside the query and uncovered".
    events = [(tmax, 1), (tmin, -1)]
    for span_tmin, span_tmax in avail:
        assert span_tmin < span_tmax
        events.extend(((span_tmin, 1), (span_tmax, -1)))

    events.sort()

    result = []
    level = 1
    gap_start = None
    for t, step in events:
        if level == 1 and step == -1:
            gap_start = t
        elif level == 0 and step == 1 and gap_start is not None:
            if gap_start != t:
                result.append((gap_start, t))

        level += step

    return result
def prefix_tree(tups):
    '''
    Build a nested prefix tree from tuples of equal length.

    :param tups:
        Sequence of tuples. Only the length of the first one is checked; all
        are assumed to have the same length.
    :returns:
        Sorted list of ``(key, children)`` pairs, where ``children`` is the
        prefix tree of the remainders of all tuples starting with ``key``.
        Leaves have an empty ``children`` list; duplicate leaves are kept.
    '''
    if not tups:
        return []

    if len(tups[0]) == 1:
        return sorted([(tup[0], []) for tup in tups])

    groups = defaultdict(list)
    for tup in tups:
        head, rest = tup[0], tup[1:]
        groups[head].append(rest)

    return [(key, prefix_tree(groups[key])) for key in sorted(groups)]
def match_time_span(tmin, tmax, obj):
    '''
    Check whether an object's time span intersects a query time span.

    ``None`` for any bound (of the query or of the object) means unbounded
    on that side. The edges are handled asymmetrically: the object may start
    exactly at ``tmax`` but must end strictly after ``tmin``.

    :param obj:
        Any object with ``tmin`` and ``tmax`` attributes.
    :returns:
        :py:class:`bool`
    '''
    if not (obj.tmin is None or tmax is None or obj.tmin <= tmax):
        return False

    return tmin is None or obj.tmax is None or tmin < obj.tmax
class Batch(object):
    '''
    Batch of waveforms from window-wise data extraction.

    Encapsulates state and results yielded for each window in window-wise
    waveform extraction with the :py:meth:`Squirrel.chopper_waveforms` method.

    *Attributes:*

    .. py:attribute:: tmin

        Start of this time window.

    .. py:attribute:: tmax

        End of this time window.

    .. py:attribute:: tpad

        Padding time given by the producer of this batch (presumably the
        extra time extracted on either side of the window; see
        :py:meth:`Squirrel.chopper_waveforms`).

    .. py:attribute:: i

        Index of this time window in sequence.

    .. py:attribute:: n

        Total number of time windows in sequence.

    .. py:attribute:: igroup

        Index of this time window's sequence group.

    .. py:attribute:: ngroups

        Total number of sequence groups.

    .. py:attribute:: traces

        Extracted waveforms for this time window.
    '''

    def __init__(self, tmin, tmax, tpad, i, n, igroup, ngroups, traces):
        self.tmin = tmin
        self.tmax = tmax
        # Previously accepted but discarded; keep it so consumers can tell how
        # much padding the traces carry.
        self.tpad = tpad
        self.i = i
        self.n = n
        self.igroup = igroup
        self.ngroups = ngroups
        self.traces = traces

    def __str__(self):
        return 'Batch %i/%i, group %i/%i, %i traces, %s - %s' % (
            self.i, self.n, self.igroup, self.ngroups, len(self.traces),
            util.time_to_str(self.tmin),
            util.time_to_str(self.tmax))
199class Squirrel(Selection):
200 '''
201 Prompt, lazy, indexing, caching, dynamic seismological dataset access.
203 :param env:
204 Squirrel environment instance or directory path to use as starting
205 point for its detection. By default, the current directory is used as
206 starting point. When searching for a usable environment the directory
207 ``'.squirrel'`` or ``'squirrel'`` in the current (or starting point)
208 directory is used if it exists, otherwise the parent directories are
209 search upwards for the existence of such a directory. If no such
210 directory is found, the user's global Squirrel environment
211 ``'$HOME/.pyrocko/squirrel'`` is used.
212 :type env:
213 :py:class:`~pyrocko.squirrel.environment.Environment` or
214 :py:class:`str`
216 :param database:
217 Database instance or path to database. By default the
218 database found in the detected Squirrel environment is used.
219 :type database:
220 :py:class:`~pyrocko.squirrel.database.Database` or :py:class:`str`
222 :param cache_path:
223 Directory path to use for data caching. By default, the ``'cache'``
224 directory in the detected Squirrel environment is used.
225 :type cache_path:
226 :py:class:`str`
228 :param persistent:
229 If given a name, create a persistent selection.
230 :type persistent:
231 :py:class:`str`
233 :param n_threads:
234 Number of threads for parallel loading of data. By default a maximum of
235 8 threads are used, or less, depending on how many CPU cores are
236 advertised.
237 :type n_threads:
238 :py:class:`int`
240 This is the central class of the Squirrel framework. It provides a unified
241 interface to query and access seismic waveforms, station meta-data and
242 event information from local file collections and remote data sources. For
243 prompt responses, a profound database setup is used under the hood. To
244 speed up assemblage of ad-hoc data selections, files are indexed on first
245 use and the extracted meta-data is remembered in the database for
246 subsequent accesses. Bulk data is lazily loaded from disk and remote
247 sources, just when requested. Once loaded, data is cached in memory to
248 expedite typical access patterns. Files and data sources can be dynamically
249 added to and removed from the Squirrel selection at runtime.
251 Queries are restricted to the contents of the files currently added to the
252 Squirrel selection (usually a subset of the file meta-information
253 collection in the database). This list of files is referred to here as the
254 "selection". By default, temporary tables are created in the attached
255 database to hold the names of the files in the selection as well as various
256 indices and counters. These tables are only visible inside the application
257 which created them and are deleted when the database connection is closed
258 or the application exits. To create a selection which is not deleted at
259 exit, supply a name to the ``persistent`` argument of the Squirrel
260 constructor. Persistent selections are shared among applications using the
261 same database.
263 **Method summary**
265 Some of the methods are implemented in :py:class:`Squirrel`'s base class
266 :py:class:`~pyrocko.squirrel.selection.Selection`.
268 .. autosummary::
270 ~Squirrel.add
271 ~Squirrel.add_source
272 ~Squirrel.add_fdsn
273 ~Squirrel.add_catalog
274 ~Squirrel.add_dataset
275 ~Squirrel.add_virtual
276 ~Squirrel.update
277 ~Squirrel.update_waveform_promises
278 ~Squirrel.advance_accessor
279 ~Squirrel.clear_accessor
280 ~Squirrel.reload
281 ~pyrocko.squirrel.selection.Selection.iter_paths
282 ~Squirrel.iter_nuts
283 ~Squirrel.iter_kinds
284 ~Squirrel.iter_deltats
285 ~Squirrel.iter_codes
286 ~pyrocko.squirrel.selection.Selection.get_paths
287 ~Squirrel.get_nuts
288 ~Squirrel.get_kinds
289 ~Squirrel.get_deltats
290 ~Squirrel.get_codes
291 ~Squirrel.get_counts
292 ~Squirrel.get_time_span
293 ~Squirrel.get_deltat_span
294 ~Squirrel.get_nfiles
295 ~Squirrel.get_nnuts
296 ~Squirrel.get_total_size
297 ~Squirrel.get_stats
298 ~Squirrel.get_content
299 ~Squirrel.get_stations
300 ~Squirrel.get_channels
301 ~Squirrel.get_responses
302 ~Squirrel.get_events
303 ~Squirrel.get_waveform_nuts
304 ~Squirrel.get_waveforms
305 ~Squirrel.chopper_waveforms
306 ~Squirrel.get_coverage
307 ~Squirrel.pile
308 ~Squirrel.snuffle
309 ~Squirrel.glob_codes
310 ~pyrocko.squirrel.selection.Selection.get_database
311 ~Squirrel.print_tables
312 '''
314 def __init__(
315 self, env=None, database=None, cache_path=None, persistent=None,
316 n_threads=None, n_samples_block=100000):
318 if not isinstance(env, environment.Environment):
319 env = environment.get_environment(env)
321 if database is None:
322 database = env.expand_path(env.database_path)
324 if cache_path is None:
325 cache_path = env.expand_path(env.cache_path)
327 if persistent is None:
328 persistent = env.persistent
330 Selection.__init__(
331 self, database=database, persistent=persistent)
333 self.get_database().set_basepath(os.path.dirname(env.get_basepath()))
335 if n_threads is None:
336 self._n_threads = min(NTHREADS_DEFAULT, get_nthreads())
337 elif n_threads == 0:
338 self._n_threads = get_nthreads()
339 else:
340 self._n_threads = n_threads
342 if sqlite3.threadsafety != 3 and self._n_threads != 1:
343 logger.warning(
344 'Falling back to single-threaded behaviour. The sqlite3 '
345 'module has been compile without support to share the '
346 'connection across threads (sqlite3.threadsafety == %i)'
347 % sqlite3.threadsafety)
349 self._n_threads = 1
351 self._content_caches = {
352 'waveform': cache.ContentCache(),
353 'default': cache.ContentCache()}
355 self._cache_path = cache_path
357 self._sources = []
358 self._operators = []
359 self._operator_registry = {}
360 self._recent_orders = {}
361 self._recent_orders_prune_time = 3600.
363 self._pending_orders = []
365 self._pile = None
366 self._n_choppers_active = 0
368 self.downloads_enabled = True
369 self.n_samples_block = n_samples_block
371 self._names.update({
372 'nuts': self.name + '_nuts',
373 'kind_codes_count': self.name + '_kind_codes_count',
374 'coverage': self.name + '_coverage'})
376 with self.transaction('create tables') as cursor:
377 self._create_tables_squirrel(cursor)
    def _create_tables_squirrel(self, cursor):
        '''
        Create the selection-specific tables, indices and triggers.

        All statements use ``IF NOT EXISTS``, so calling this when the
        objects already exist is harmless.

        Created objects (names are prefixed with the selection name):

        - ``nuts`` table: index entries (nuts) of the selected content.
        - ``kind_codes_count`` table: number of nuts per kind-codes ID,
          maintained by insert/delete triggers on the ``nuts`` table.
        - ``coverage`` table: per kind-codes ID, the change points of the
          number of overlapping nuts (``step`` is +1 at each nut start and
          -1 at each nut end; rows whose net step is 0 are removed),
          maintained by insert/delete triggers on the ``nuts`` table.

        :param cursor:
            Cursor of an open transaction.
        '''

        cursor.execute(self._register_table(self._sql(
            '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(nuts)s (
                    nut_id integer PRIMARY KEY,
                    file_id integer,
                    file_segment integer,
                    file_element integer,
                    kind_id integer,
                    kind_codes_id integer,
                    tmin_seconds integer,
                    tmin_offset integer,
                    tmax_seconds integer,
                    tmax_offset integer,
                    kscale integer)
            ''')))

        cursor.execute(self._register_table(self._sql(
            '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(kind_codes_count)s (
                    kind_codes_id integer PRIMARY KEY,
                    count integer)
            ''')))

        cursor.execute(self._sql(
            '''
                CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(nuts)s_file_element
                    ON %(nuts)s (file_id, file_segment, file_element)
            '''))

        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_file_id
                ON %(nuts)s (file_id)
            '''))

        # Indices used by the time range queries (see _timerange_sql).
        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmin_seconds
                ON %(nuts)s (kind_id, tmin_seconds)
            '''))

        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_tmax_seconds
                ON %(nuts)s (kind_id, tmax_seconds)
            '''))

        cursor.execute(self._sql(
            '''
                CREATE INDEX IF NOT EXISTS %(db)s.%(nuts)s_index_kscale
                ON %(nuts)s (kind_id, kscale, tmin_seconds)
            '''))

        # Drop the selection's nuts of a file when the file is removed from
        # the main database.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts
                BEFORE DELETE ON main.files FOR EACH ROW
                BEGIN
                  DELETE FROM %(nuts)s WHERE file_id == old.file_id;
                END
            '''))

        # trigger only on size to make silent update of mtime possible
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_delete_nuts2
                BEFORE UPDATE OF size ON main.files FOR EACH ROW
                BEGIN
                  DELETE FROM %(nuts)s WHERE file_id == old.file_id;
                END
            '''))

        # Drop the selection's nuts of a file when the file leaves the
        # selection.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS
                    %(db)s.%(file_states)s_delete_files
                BEFORE DELETE ON %(db)s.%(file_states)s FOR EACH ROW
                BEGIN
                    DELETE FROM %(nuts)s WHERE file_id == old.file_id;
                END
            '''))

        # Keep per kind-codes nut counts up to date.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_inc_kind_codes
                BEFORE INSERT ON %(nuts)s FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO %(kind_codes_count)s VALUES
                    (new.kind_codes_id, 0);
                    UPDATE %(kind_codes_count)s
                    SET count = count + 1
                    WHERE new.kind_codes_id
                        == %(kind_codes_count)s.kind_codes_id;
                END
            '''))

        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_dec_kind_codes
                BEFORE DELETE ON %(nuts)s FOR EACH ROW
                BEGIN
                    UPDATE %(kind_codes_count)s
                    SET count = count - 1
                    WHERE old.kind_codes_id
                        == %(kind_codes_count)s.kind_codes_id;
                END
            '''))

        cursor.execute(self._register_table(self._sql(
            '''
                CREATE TABLE IF NOT EXISTS %(db)s.%(coverage)s (
                    kind_codes_id integer,
                    time_seconds integer,
                    time_offset integer,
                    step integer)
            ''')))

        cursor.execute(self._sql(
            '''
                CREATE UNIQUE INDEX IF NOT EXISTS %(db)s.%(coverage)s_time
                    ON %(coverage)s (kind_codes_id, time_seconds, time_offset)
            '''))

        # On insert: +1 step at the nut's start, -1 at its end; change points
        # whose net step drops to 0 are removed.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_add_coverage
                AFTER INSERT ON %(nuts)s FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (new.kind_codes_id, new.tmin_seconds, new.tmin_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step + 1
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmin_seconds == %(coverage)s.time_seconds
                        AND new.tmin_offset == %(coverage)s.time_offset
                    ;
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (new.kind_codes_id, new.tmax_seconds, new.tmax_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step - 1
                    WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                        AND new.tmax_seconds == %(coverage)s.time_seconds
                        AND new.tmax_offset == %(coverage)s.time_offset
                    ;
                    DELETE FROM %(coverage)s
                        WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                            AND new.tmin_seconds == %(coverage)s.time_seconds
                            AND new.tmin_offset == %(coverage)s.time_offset
                            AND step == 0
                    ;
                    DELETE FROM %(coverage)s
                        WHERE new.kind_codes_id == %(coverage)s.kind_codes_id
                            AND new.tmax_seconds == %(coverage)s.time_seconds
                            AND new.tmax_offset == %(coverage)s.time_offset
                            AND step == 0
                    ;
                END
            '''))

        # On delete: the exact inverse of the insert trigger above.
        cursor.execute(self._sql(
            '''
                CREATE TRIGGER IF NOT EXISTS %(db)s.%(nuts)s_remove_coverage
                BEFORE DELETE ON %(nuts)s FOR EACH ROW
                BEGIN
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (old.kind_codes_id, old.tmin_seconds, old.tmin_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step - 1
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmin_seconds == %(coverage)s.time_seconds
                        AND old.tmin_offset == %(coverage)s.time_offset
                    ;
                    INSERT OR IGNORE INTO %(coverage)s VALUES
                    (old.kind_codes_id, old.tmax_seconds, old.tmax_offset, 0)
                    ;
                    UPDATE %(coverage)s
                    SET step = step + 1
                    WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                        AND old.tmax_seconds == %(coverage)s.time_seconds
                        AND old.tmax_offset == %(coverage)s.time_offset
                    ;
                    DELETE FROM %(coverage)s
                        WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                            AND old.tmin_seconds == %(coverage)s.time_seconds
                            AND old.tmin_offset == %(coverage)s.time_offset
                            AND step == 0
                    ;
                    DELETE FROM %(coverage)s
                        WHERE old.kind_codes_id == %(coverage)s.kind_codes_id
                            AND old.tmax_seconds == %(coverage)s.time_seconds
                            AND old.tmax_offset == %(coverage)s.time_offset
                            AND step == 0
                    ;
                END
            '''))
    def _delete(self):
        '''
        Delete database tables associated with this Squirrel.

        Drops the selection-specific triggers and tables created in
        ``_create_tables_squirrel``, then delegates to
        :py:meth:`Selection._delete`.
        '''

        # One SQL statement per line. NOTE(review): the coverage objects use
        # IF EXISTS, presumably so selections created before coverage tracking
        # existed can still be deleted — confirm.
        with self.transaction('delete tables') as cursor:
            for s in '''
                    DROP TRIGGER %(db)s.%(nuts)s_delete_nuts;
                    DROP TRIGGER %(db)s.%(nuts)s_delete_nuts2;
                    DROP TRIGGER %(db)s.%(file_states)s_delete_files;
                    DROP TRIGGER %(db)s.%(nuts)s_inc_kind_codes;
                    DROP TRIGGER %(db)s.%(nuts)s_dec_kind_codes;
                    DROP TABLE %(db)s.%(nuts)s;
                    DROP TABLE %(db)s.%(kind_codes_count)s;
                    DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_add_coverage;
                    DROP TRIGGER IF EXISTS %(db)s.%(nuts)s_remove_coverage;
                    DROP TABLE IF EXISTS %(db)s.%(coverage)s;
                '''.strip().splitlines():

                cursor.execute(self._sql(s))

        Selection._delete(self)
601 @filldocs
602 def add(self,
603 paths,
604 kinds=None,
605 format='detect',
606 include=None,
607 exclude=None,
608 check=True,
609 transaction=None):
611 '''
612 Add files to the selection.
614 :param paths:
615 Iterator yielding paths to files or directories to be added to the
616 selection. Recurses into directories. If given a ``str``, it
617 is treated as a single path to be added.
618 :type paths:
619 :py:class:`list` of :py:class:`str`
621 :param kinds:
622 Content types to be made available through the Squirrel selection.
623 By default, all known content types are accepted.
624 :type kinds:
625 :py:class:`list` of :py:class:`str`
627 :param format:
628 File format identifier or ``'detect'`` to enable auto-detection
629 (available: %(file_formats)s).
630 :type format:
631 str
633 :param include:
634 If not ``None``, files are only included if their paths match the
635 given regular expression pattern.
636 :type format:
637 str
639 :param exclude:
640 If not ``None``, files are only included if their paths do not
641 match the given regular expression pattern.
642 :type format:
643 str
645 :param check:
646 If ``True``, all file modification times are checked to see if
647 cached information has to be updated (slow). If ``False``, only
648 previously unknown files are indexed and cached information is used
649 for known files, regardless of file state (fast, corrresponds to
650 Squirrel's ``--optimistic`` mode). File deletions will go
651 undetected in the latter case.
652 :type check:
653 bool
655 :Complexity:
656 O(log N)
657 '''
659 if isinstance(kinds, str):
660 kinds = (kinds,)
662 if isinstance(paths, str):
663 paths = [paths]
665 kind_mask = model.to_kind_mask(kinds)
667 Selection.add(
668 self, util.iter_select_files(
669 paths,
670 show_progress=False,
671 include=include,
672 exclude=exclude,
673 pass_through=lambda path: path.startswith('virtual:')
674 ), kind_mask, format, transaction=transaction)
676 self._load(check, transaction=transaction)
677 self._update_nuts(transaction=transaction)
    def reload(self):
        '''
        Check for modifications and reindex modified files.

        Based on file modification times.
        '''

        # Flag the selection's files for checking (presumably all of them —
        # see Selection._set_file_states_force_check), reindex with
        # modification-time checks, then refresh the selection's nuts table.
        self._set_file_states_force_check()
        self._load(check=True)
        self._update_nuts()
    def add_virtual(self, nuts, virtual_paths=None):
        '''
        Add content which is not backed by files.

        :param nuts:
            Content pieces to be added.
        :type nuts:
            iterator yielding :py:class:`~pyrocko.squirrel.model.Nut` objects

        :param virtual_paths:
            List of virtual paths to prevent creating a temporary list of the
            nuts while aggregating the file paths for the selection. A single
            ``str`` is accepted as well.
        :type virtual_paths:
            :py:class:`list` of :py:class:`str`

        Stores to the main database and the selection.
        '''

        if isinstance(virtual_paths, str):
            virtual_paths = [virtual_paths]

        if virtual_paths is None:
            # The paths are collected before the nuts are stored, so an
            # iterator must be materialized to be traversed twice.
            if not isinstance(nuts, list):
                nuts = list(nuts)
            virtual_paths = set(nut.file_path for nut in nuts)

        transaction = self.transaction('add virtual')
        with transaction:
            Selection.add(self, virtual_paths, transaction=transaction)
            if isinstance(nuts, list) and len(nuts) == 0:
                # Nothing to store in the main index.
                self.flag_modified(False, transaction=transaction)
            else:
                self.get_database().dig(nuts, transaction=transaction)
            self._update_nuts(transaction=transaction)
725 def add_volatile(self, nuts):
726 if not isinstance(nuts, list):
727 nuts = list(nuts)
729 paths = list(set(nut.file_path for nut in nuts))
730 io.backends.virtual.add_nuts(nuts)
731 self.add_virtual(nuts, paths)
732 self._volatile_paths.extend(paths)
734 def add_volatile_waveforms(self, traces):
735 '''
736 Add in-memory waveforms which will be removed when the app closes.
737 '''
739 name = model.random_name()
741 path = 'virtual:volatile:%s' % name
743 nuts = []
744 for itr, tr in enumerate(traces):
745 assert tr.tmin <= tr.tmax
746 tmin_seconds, tmin_offset = model.tsplit(tr.tmin)
747 tmax_seconds, tmax_offset = model.tsplit(
748 tr.tmin + tr.data_len()*tr.deltat)
750 nuts.append(model.Nut(
751 file_path=path,
752 file_format='virtual',
753 file_segment=itr,
754 file_element=0,
755 file_mtime=0,
756 codes=tr.codes,
757 tmin_seconds=tmin_seconds,
758 tmin_offset=tmin_offset,
759 tmax_seconds=tmax_seconds,
760 tmax_offset=tmax_offset,
761 deltat=tr.deltat,
762 kind_id=to_kind_id('waveform'),
763 content=tr))
765 self.add_volatile(nuts)
766 return path
768 def _load(self, check, transaction=None):
769 for _ in io.iload(
770 self,
771 content=[],
772 skip_unchanged=True,
773 check=check,
774 transaction=transaction):
775 pass
777 def _update_nuts(self, transaction=None):
778 transaction = transaction or self.transaction('update nuts')
779 with make_task('Aggregating selection') as task, \
780 transaction as cursor:
782 self._conn.set_progress_handler(task.update, 100000)
783 nrows = cursor.execute(self._sql(
784 '''
785 INSERT INTO %(db)s.%(nuts)s
786 SELECT NULL,
787 nuts.file_id, nuts.file_segment, nuts.file_element,
788 nuts.kind_id, nuts.kind_codes_id,
789 nuts.tmin_seconds, nuts.tmin_offset,
790 nuts.tmax_seconds, nuts.tmax_offset,
791 nuts.kscale
792 FROM %(db)s.%(file_states)s
793 INNER JOIN nuts
794 ON %(db)s.%(file_states)s.file_id == nuts.file_id
795 INNER JOIN kind_codes
796 ON nuts.kind_codes_id ==
797 kind_codes.kind_codes_id
798 WHERE %(db)s.%(file_states)s.file_state != 2
799 AND (((1 << kind_codes.kind_id)
800 & %(db)s.%(file_states)s.kind_mask) != 0)
801 ''')).rowcount
803 task.update(nrows)
804 self._set_file_states_known(transaction)
805 self._conn.set_progress_handler(None, 0)
807 def add_source(self, source, check=True, upgrade=False):
808 '''
809 Add remote resource.
811 :param source:
812 Remote data access client instance.
813 :type source:
814 subclass of :py:class:`~pyrocko.squirrel.client.base.Source`
815 '''
817 self._sources.append(source)
818 source.setup(self, check=check, upgrade=upgrade)
820 def add_fdsn(self, *args, **kwargs):
821 '''
822 Add FDSN site for transparent remote data access.
824 Arguments are passed to
825 :py:class:`~pyrocko.squirrel.client.fdsn.FDSNSource`.
826 '''
827 source = fdsn.FDSNSource(*args, **kwargs)
828 source.set_basepath('.')
829 self.add_source(source)
831 def add_catalog(self, *args, **kwargs):
832 '''
833 Add online catalog for transparent event data access.
835 Arguments are passed to
836 :py:class:`~pyrocko.squirrel.client.catalog.CatalogSource`.
837 '''
839 self.add_source(catalog.CatalogSource(*args, **kwargs))
841 def add_dataset(self, ds, check=True, upgrade=False):
842 '''
843 Read dataset description from file and add its contents.
845 :param ds:
846 Path to dataset description file, dataset description object
847 or name of a built-in dataset. See
848 :py:mod:`~pyrocko.squirrel.dataset`.
849 :type ds:
850 :py:class:`str` or :py:class:`~pyrocko.squirrel.dataset.Dataset`
852 :param check:
853 If ``True``, all file modification times are checked to see if
854 cached information has to be updated (slow). If ``False``, only
855 previously unknown files are indexed and cached information is used
856 for known files, regardless of file state (fast, corrresponds to
857 Squirrel's ``--optimistic`` mode). File deletions will go
858 undetected in the latter case.
859 :type check:
860 bool
861 '''
862 if isinstance(ds, str):
863 ds = dataset.read_dataset(ds)
865 ds.setup(self, check=check, upgrade=upgrade)
867 def _get_selection_args(
868 self, kind_id,
869 obj=None, tmin=None, tmax=None, time=None, codes=None):
871 if codes is not None:
872 codes = codes_patterns_for_kind(kind_id, codes)
874 if time is not None:
875 tmin = time
876 tmax = time
878 if obj is not None:
879 tmin = tmin if tmin is not None else obj.tmin
880 tmax = tmax if tmax is not None else obj.tmax
881 codes = codes if codes is not None else codes_patterns_for_kind(
882 kind_id, obj.codes)
884 return tmin, tmax, codes
886 def _get_selection_args_str(self, *args, **kwargs):
888 tmin, tmax, codes = self._get_selection_args(*args, **kwargs)
889 return 'tmin: %s, tmax: %s, codes: %s' % (
890 util.time_to_str(tmin) if tmin is not None else 'none',
891 util.time_to_str(tmax) if tmax is not None else 'none',
892 ','.join(str(entry) for entry in codes))
894 def _selection_args_to_kwargs(
895 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
897 return dict(obj=obj, tmin=tmin, tmax=tmax, time=time, codes=codes)
    def _timerange_sql(self, tmin, tmax, kind, cond, args, naiv):
        '''
        Append SQL conditions preselecting nuts which may overlap a time span.

        The conditions only work on whole seconds and are deliberately
        generous; exact interval logic must be applied to the results (as
        :py:meth:`iter_nuts` does).

        :param tmin:
            Start of the query span.
        :param tmax:
            End of the query span.
        :param kind:
            Content kind the conditions are restricted to (non-naive mode).
        :param cond:
            List of SQL condition strings, appended to in place.
        :param args:
            List of SQL parameters, appended to in place, matching ``cond``.
        :param naiv:
            If ``True``, use a plain comparison on ``tmin_seconds`` instead
            of the ``kscale``-bucketed, index-friendly conditions.
        '''

        tmin_seconds, tmin_offset = model.tsplit(tmin)
        tmax_seconds, tmax_offset = model.tsplit(tmax)
        if naiv:
            cond.append('%(db)s.%(nuts)s.tmin_seconds <= ?')
            args.append(tmax_seconds)
        else:
            # Nuts are bucketed by kscale. NOTE(review): assumes nuts with
            # kscale k are shorter than tscale_edges[k] (see model), so an
            # overlapping nut must start within [tmin - tscale, tmax]; the
            # -1/+1 margins absorb the truncation to whole seconds.
            tscale_edges = model.tscale_edges
            tmin_cond = []
            for kscale in range(tscale_edges.size + 1):
                if kscale != tscale_edges.size:
                    tscale = int(tscale_edges[kscale])
                    tmin_cond.append('''
                        (%(db)s.%(nuts)s.kind_id = ?
                         AND %(db)s.%(nuts)s.kscale == ?
                         AND %(db)s.%(nuts)s.tmin_seconds BETWEEN ? AND ?)
                    ''')
                    args.extend(
                        (to_kind_id(kind), kscale,
                         tmin_seconds - tscale - 1, tmax_seconds + 1))

                else:
                    # Last bucket has no upper duration bound: only the
                    # start time can be bounded from above.
                    tmin_cond.append('''
                        (%(db)s.%(nuts)s.kind_id == ?
                         AND %(db)s.%(nuts)s.kscale == ?
                         AND %(db)s.%(nuts)s.tmin_seconds <= ?)
                    ''')

                    args.extend(
                        (to_kind_id(kind), kscale, tmax_seconds + 1))
            if tmin_cond:
                cond.append(' ( ' + ' OR '.join(tmin_cond) + ' ) ')

        cond.append('%(db)s.%(nuts)s.tmax_seconds >= ?')
        args.append(tmin_seconds)
936 def _codes_match_sql(self, positive, kind_id, codes, cond, args):
937 pats = codes_patterns_for_kind(kind_id, codes)
938 if pats is None:
939 return
941 pats_exact, pats_nonexact = model.classify_patterns(pats)
943 codes_cond = []
944 if pats_exact:
945 codes_cond.append(' ( kind_codes.codes IN ( %s ) ) ' % ', '.join(
946 '?'*len(pats_exact)))
948 args.extend(pats_exact)
950 if pats_nonexact:
951 codes_cond.append(' ( %s ) ' % ' OR '.join(
952 ('kind_codes.codes GLOB ?',) * len(pats_nonexact)))
954 args.extend(pats_nonexact)
956 if codes_cond:
957 cond.append('%s ( %s )' % (
958 'NOT' if not positive else '',
959 ' OR '.join(codes_cond)))
961 def iter_nuts(
962 self, kind=None, tmin=None, tmax=None, codes=None,
963 codes_exclude=None, sample_rate_min=None, sample_rate_max=None,
964 naiv=False, kind_codes_ids=None, path=None, limit=None):
966 '''
967 Iterate over content entities matching given constraints.
969 :param kind:
970 Content kind (or kinds) to extract.
971 :type kind:
972 :py:class:`str`, :py:class:`list` of :py:class:`str`
974 :param tmin:
975 Start time of query interval.
976 :type tmin:
977 :py:func:`~pyrocko.util.get_time_float`
979 :param tmax:
980 End time of query interval.
981 :type tmax:
982 :py:func:`~pyrocko.util.get_time_float`
984 :param codes:
985 List of code patterns to query.
986 :type codes:
987 :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
988 objects appropriate for the queried content type, or anything which
989 can be converted to such objects.
991 :param naiv:
992 Bypass time span lookup through indices (slow, for testing).
993 :type naiv:
994 :py:class:`bool`
996 :param kind_codes_ids:
997 Kind-codes IDs of contents to be retrieved (internal use).
998 :type kind_codes_ids:
999 :py:class:`list` of :py:class:`int`
1001 :yields:
1002 :py:class:`~pyrocko.squirrel.model.Nut` objects representing the
1003 intersecting content.
1005 :complexity:
1006 O(log N) for the time selection part due to heavy use of database
1007 indices.
1009 Query time span is treated as a half-open interval ``[tmin, tmax)``.
1010 However, if ``tmin`` equals ``tmax``, the edge logics are modified to
1011 closed-interval so that content intersecting with the time instant ``t
1012 = tmin = tmax`` is returned (otherwise nothing would be returned as
1013 ``[t, t)`` never matches anything).
1015 Time spans of content entities to be matched are also treated as half
1016 open intervals, e.g. content span ``[0, 1)`` is matched by query span
1017 ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``. Also here, logics are
1018 modified to closed-interval when the content time span is an empty
1019 interval, i.e. to indicate a time instant. E.g. time instant 0 is
1020 matched by ``[0, 1)`` but not by ``[-1, 0)`` or ``[1, 2)``.
1021 '''
1023 if not isinstance(kind, str):
1024 if kind is None:
1025 kind = model.g_content_kinds
1026 for kind_ in kind:
1027 for nut in self.iter_nuts(kind_, tmin, tmax, codes):
1028 yield nut
1030 return
1032 if codes is not None and len(codes) == 0:
1033 return
1035 if kind_codes_ids is not None and len(kind_codes_ids) == 0:
1036 return
1038 kind_id = to_kind_id(kind)
1040 cond = []
1041 args = []
1042 if tmin is not None or tmax is not None:
1043 assert kind is not None
1044 if tmin is None:
1045 tmin = self.get_time_span()[0]
1046 if tmax is None:
1047 tmax = self.get_time_span()[1] + 1.0
1049 self._timerange_sql(tmin, tmax, kind, cond, args, naiv)
1051 cond.append('kind_codes.kind_id == ?')
1052 args.append(kind_id)
1054 if codes is not None:
1055 self._codes_match_sql(True, kind_id, codes, cond, args)
1057 if codes_exclude is not None:
1058 self._codes_match_sql(False, kind_id, codes_exclude, cond, args)
1060 if sample_rate_min is not None:
1061 cond.append('kind_codes.deltat <= ?')
1062 args.append(1.0/sample_rate_min)
1064 if sample_rate_max is not None:
1065 cond.append('? <= kind_codes.deltat')
1066 args.append(1.0/sample_rate_max)
1068 if kind_codes_ids is not None:
1069 cond.append(
1070 ' ( kind_codes.kind_codes_id IN ( %s ) ) ' % ', '.join(
1071 '?'*len(kind_codes_ids)))
1073 args.extend(kind_codes_ids)
1075 db = self.get_database()
1076 if path is not None:
1077 cond.append('files.path == ?')
1078 args.append(db.relpath(abspath(path)))
1080 sql = ('''
1081 SELECT
1082 files.path,
1083 files.format,
1084 files.mtime,
1085 files.size,
1086 %(db)s.%(nuts)s.file_segment,
1087 %(db)s.%(nuts)s.file_element,
1088 kind_codes.kind_id,
1089 kind_codes.codes,
1090 %(db)s.%(nuts)s.tmin_seconds,
1091 %(db)s.%(nuts)s.tmin_offset,
1092 %(db)s.%(nuts)s.tmax_seconds,
1093 %(db)s.%(nuts)s.tmax_offset,
1094 kind_codes.deltat
1095 FROM files
1096 INNER JOIN %(db)s.%(nuts)s
1097 ON files.file_id == %(db)s.%(nuts)s.file_id
1098 INNER JOIN kind_codes
1099 ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id
1100 ''')
1102 if cond:
1103 sql += ''' WHERE ''' + ' AND '.join(cond)
1105 if limit is not None:
1106 sql += ''' LIMIT %i''' % limit
1108 sql = self._sql(sql)
1109 if tmin is None and tmax is None:
1110 for row in self._conn.execute(sql, args):
1111 row = (db.abspath(row[0]),) + row[1:]
1112 nut = model.Nut(values_nocheck=row)
1113 yield nut
1114 else:
1115 assert tmin is not None and tmax is not None
1116 if tmin == tmax:
1117 for row in self._conn.execute(sql, args):
1118 row = (db.abspath(row[0]),) + row[1:]
1119 nut = model.Nut(values_nocheck=row)
1120 if (nut.tmin <= tmin < nut.tmax) \
1121 or (nut.tmin == nut.tmax and tmin == nut.tmin):
1123 yield nut
1124 else:
1125 for row in self._conn.execute(sql, args):
1126 row = (db.abspath(row[0]),) + row[1:]
1127 nut = model.Nut(values_nocheck=row)
1128 if (tmin < nut.tmax and nut.tmin < tmax) \
1129 or (nut.tmin == nut.tmax
1130 and tmin <= nut.tmin < tmax):
1132 yield nut
def get_nuts(self, *args, **kwargs):
    '''
    Get content entities matching given constraints.

    Convenience variant of :py:meth:`iter_nuts`: all arguments are
    forwarded unchanged and the yielded entities are collected into a
    :py:class:`list`.
    '''

    collected = []
    for nut in self.iter_nuts(*args, **kwargs):
        collected.append(nut)

    return collected
def _split_nuts(
        self,
        kind,
        tmin=None,
        tmax=None,
        codes=None,
        path=None,
        transaction=None):

    '''
    Cut the time span ``tmin`` to ``tmax`` out of matching index entries.

    Each nut of the given kind which overlaps the span (and matches
    ``codes`` and ``path``, if given) is deleted. Parts of it reaching out
    before ``tmin`` or after ``tmax`` are re-inserted as new nuts which
    copy the file and kind/codes references of the original entry. The
    operation is applied to the selection's nut table and to the main
    database's nut table.

    :param kind:
        Content kind of the nuts to be split.
    :param tmin:
        Start of the span to be removed. Expected to be given: it is
        compared with the nut times.
    :param tmax:
        End of the span to be removed. Expected to be given, like ``tmin``.
    :param codes:
        Optional codes pattern(s) restricting the affected nuts.
    :param path:
        Optional file path restricting the affected nuts.
    :param transaction:
        Transaction to run in. If not given, a new ``'split nuts'``
        transaction is opened.
    '''

    kind_id = to_kind_id(kind)
    cut_tmin_seconds, cut_tmin_offset = model.tsplit(tmin)
    cut_tmax_seconds, cut_tmax_offset = model.tsplit(tmax)

    main_names = dict(self._names)
    main_names.update(db='main', nuts='nuts')

    database = self.get_database()

    def fill_main(template):
        return template % main_names

    select_head = '''
        SELECT
            %(db)s.%(nuts)s.nut_id,
            %(db)s.%(nuts)s.tmin_seconds,
            %(db)s.%(nuts)s.tmin_offset,
            %(db)s.%(nuts)s.tmax_seconds,
            %(db)s.%(nuts)s.tmax_offset,
            kind_codes.deltat
        FROM files
        INNER JOIN %(db)s.%(nuts)s
            ON files.file_id == %(db)s.%(nuts)s.file_id
        INNER JOIN kind_codes
            ON %(db)s.%(nuts)s.kind_codes_id == kind_codes.kind_codes_id
        WHERE '''  # noqa

    insert_template = '''
        INSERT INTO %(db)s.%(nuts)s (
            file_id, file_segment, file_element, kind_id,
            kind_codes_id, tmin_seconds, tmin_offset,
            tmax_seconds, tmax_offset, kscale )
        SELECT
            file_id, file_segment, file_element,
            kind_id, kind_codes_id, ?, ?, ?, ?, ?
        FROM %(db)s.%(nuts)s
        WHERE nut_id == ?
    '''

    delete_template = '''
        DELETE FROM %(db)s.%(nuts)s WHERE nut_id == ?
    '''

    with (transaction or self.transaction('split nuts')) as cursor:
        # Selection tables (names filled in by self._sql) and main database
        # tables are modified in the same way.
        for fill in (self._sql, fill_main):
            where = []
            params = []

            self._timerange_sql(tmin, tmax, kind, where, params, False)

            if codes is not None:
                self._codes_match_sql(True, kind_id, codes, where, params)

            if path is not None:
                where.append('files.path == ?')
                params.append(database.relpath(abspath(path)))

            # Name placeholders inside the generated conditions are
            # substituted together with the rest of the query.
            query = fill(select_head + ' AND '.join(where))

            to_insert = []
            to_delete = []
            for (nut_id, nut_tmin_seconds, nut_tmin_offset,
                    nut_tmax_seconds, nut_tmax_offset, _deltat) \
                    in cursor.execute(query, params):

                nut_tmin = model.tjoin(nut_tmin_seconds, nut_tmin_offset)
                nut_tmax = model.tjoin(nut_tmax_seconds, nut_tmax_offset)

                if not (nut_tmin < tmax and tmin < nut_tmax):
                    continue

                if nut_tmin < tmin:
                    # Keep the leading part from nut start up to tmin.
                    to_insert.append((
                        nut_tmin_seconds, nut_tmin_offset,
                        cut_tmin_seconds, cut_tmin_offset,
                        model.tscale_to_kscale(
                            cut_tmin_seconds - nut_tmin_seconds),
                        nut_id))

                if tmax < nut_tmax:
                    # Keep the trailing part from tmax up to nut end.
                    to_insert.append((
                        cut_tmax_seconds, cut_tmax_offset,
                        nut_tmax_seconds, nut_tmax_offset,
                        model.tscale_to_kscale(
                            nut_tmax_seconds - cut_tmax_seconds),
                        nut_id))

                to_delete.append((nut_id,))

            cursor.executemany(fill(insert_template), to_insert)
            cursor.executemany(fill(delete_template), to_delete)
def get_time_span(self, kinds=None, tight=True, dummy_limits=True):
    '''
    Get time interval over all content in selection.

    :param kinds:
        If not ``None``, restrict query to given content kinds. A single
        kind may also be given as a plain string.
    :type kinds:
        :py:class:`str` or :py:class:`list` of :py:class:`str`

    :param tight:
        How the spans of the individual kinds are combined. Span ends equal
        to the global dummy limits (``model.g_tmin`` / ``model.g_tmax``)
        are treated as open. If ``True``, open ends are ignored and the
        extremes of the remaining ends are returned. If ``False``, the
        result is open on a side as soon as any queried kind is open on
        that side.
    :type tight:
        bool

    :param dummy_limits:
        If ``True``, open ends of the result are reported as
        ``model.g_tmin`` / ``model.g_tmax``, otherwise as ``None``.
    :type dummy_limits:
        bool

    :complexity:
        O(1), independent of the number of nuts.

    :returns:
        ``(tmin, tmax)``, combined time interval of queried content kinds.
    '''

    sql_min = self._sql('''
        SELECT MIN(tmin_seconds), MIN(tmin_offset)
        FROM %(db)s.%(nuts)s
        WHERE kind_id == ?
            AND tmin_seconds == (
                SELECT MIN(tmin_seconds)
                FROM %(db)s.%(nuts)s
                WHERE kind_id == ?)
    ''')

    sql_max = self._sql('''
        SELECT MAX(tmax_seconds), MAX(tmax_offset)
        FROM %(db)s.%(nuts)s
        WHERE kind_id == ?
            AND tmax_seconds == (
                SELECT MAX(tmax_seconds)
                FROM %(db)s.%(nuts)s
                WHERE kind_id == ?)
    ''')

    if isinstance(kinds, str):
        kinds = [kinds]

    if kinds is None:
        kind_ids = model.g_content_kind_ids
    else:
        kind_ids = model.to_kind_ids(kinds)

    tmins = []
    tmaxs = []
    for kind_id in kind_ids:
        for tmin_seconds, tmin_offset in self._conn.execute(
                sql_min, (kind_id, kind_id)):
            tmins.append(model.tjoin(tmin_seconds, tmin_offset))

        for tmax_seconds, tmax_offset in self._conn.execute(
                sql_max, (kind_id, kind_id)):
            tmaxs.append(model.tjoin(tmax_seconds, tmax_offset))

    # Dummy limits mark open-ended content: treat them as missing ends.
    tmins = [tmin if tmin != model.g_tmin else None for tmin in tmins]
    tmaxs = [tmax if tmax != model.g_tmax else None for tmax in tmaxs]

    if tight:
        gtmin = nonef(min, tmins)
        gtmax = nonef(max, tmaxs)
    else:
        gtmin = None if None in tmins else nonef(min, tmins)
        gtmax = None if None in tmaxs else nonef(max, tmaxs)

    if dummy_limits:
        if gtmin is None:
            gtmin = model.g_tmin
        if gtmax is None:
            gtmax = model.g_tmax

    return gtmin, gtmax
def has(self, kinds):
    '''
    Check availability of given content kinds.

    :param kinds:
        Content kinds to query. A single kind may also be given as a plain
        string.
    :type kinds:
        :py:class:`str` or :py:class:`list` of :py:class:`str`

    :returns:
        ``True`` if any of the queried content kinds is available
        in the selection.
    '''

    # Without dummy limits, an empty selection yields (None, None).
    self_tmin, self_tmax = self.get_time_span(kinds, dummy_limits=False)
    return self_tmin is not None or self_tmax is not None
def get_deltat_span(self, kind):
    '''
    Get min and max sampling interval of all content of given kind.

    :param kind:
        Content kind
    :type kind:
        str

    :returns:
        ``(deltat_min, deltat_max)``, or ``(None, None)`` if no sampling
        interval is known for the kind.
    '''

    known = [dt for dt in self.get_deltats(kind) if dt is not None]
    if not known:
        return None, None

    return min(known), max(known)
def iter_kinds(self, codes=None):
    '''
    Iterate over content types available in selection.

    :param codes:
        If given, get kinds only for selected codes identifier.
        Only a single identifier may be given here and no pattern matching
        is done, currently.
    :type codes:
        :py:class:`~pyrocko.squirrel.model.Codes`

    :yields:
        Available content kinds as :py:class:`str`.

    :complexity:
        O(1), independent of number of nuts.
    '''

    # Count table of this selection, qualified with its database name.
    counts_table = '%(db)s.%(kind_codes_count)s' % self._names
    return self._database._iter_kinds(
        codes=codes,
        kind_codes_count=counts_table)
def iter_deltats(self, kind=None):
    '''
    Iterate over sampling intervals available in selection.

    :param kind:
        If given, get sampling intervals only for a given content type.
    :type kind:
        str

    :yields:
        :py:class:`float` values.

    :complexity:
        O(1), independent of number of nuts.
    '''

    counts_table = '%(db)s.%(kind_codes_count)s' % self._names
    return self._database._iter_deltats(
        kind=kind,
        kind_codes_count=counts_table)
def iter_codes(self, kind=None):
    '''
    Iterate over content identifier code sequences available in selection.

    :param kind:
        If given, get codes only for a given content type.
    :type kind:
        str

    :yields:
        :py:class:`tuple` of :py:class:`str`

    :complexity:
        O(1), independent of number of nuts.
    '''

    counts_table = '%(db)s.%(kind_codes_count)s' % self._names
    return self._database._iter_codes(
        kind=kind,
        kind_codes_count=counts_table)
1419 def _iter_codes_info(self, kind=None, codes=None):
1420 '''
1421 Iterate over number of occurrences of any (kind, codes) combination.
1423 :param kind:
1424 If given, get counts only for selected content type.
1425 :type kind:
1426 str
1428 :yields:
1429 Tuples of the form ``(kind, codes, deltat, kind_codes_id, count)``.
1431 :complexity:
1432 O(1), independent of number of nuts.
1433 '''
1434 return self._database._iter_codes_info(
1435 kind=kind,
1436 codes=codes,
1437 kind_codes_count='%(db)s.%(kind_codes_count)s' % self._names)
def get_kinds(self, codes=None):
    '''
    Get content types available in selection.

    :param codes:
        If given, get kinds only for selected codes identifier.
        Only a single identifier may be given here and no pattern matching
        is done, currently.
    :type codes:
        :py:class:`~pyrocko.squirrel.model.Codes`

    :returns:
        Sorted list of available content types.
    :rtype:
        :py:class:`list` of :py:class:`str`

    :complexity:
        O(1), independent of number of nuts.
    '''

    # sorted() already materializes the iterator into a new list.
    return sorted(self.iter_kinds(codes=codes))
def get_deltats(self, kind=None):
    '''
    Get sampling intervals available in selection.

    :param kind:
        If given, get sampling intervals only for selected content type.
    :type kind:
        str

    :complexity:
        O(1), independent of number of nuts.

    :returns: Sorted list of available sampling intervals.
    '''

    deltats = list(self.iter_deltats(kind=kind))
    deltats.sort()
    return deltats
def get_codes(self, kind=None):
    '''
    Get identifier code sequences available in selection.

    :param kind:
        If given, get codes only for selected content type.
    :type kind:
        str

    :complexity:
        O(1), independent of number of nuts.

    :returns: Sorted list of available codes as tuples of strings.
    '''

    all_codes = list(self.iter_codes(kind=kind))
    all_codes.sort()
    return all_codes
def get_counts(self, kind=None):
    '''
    Get number of occurrences of any (kind, codes) combination.

    :param kind:
        If given, get codes only for selected content type.
    :type kind:
        str

    :complexity:
        O(1), independent of number of nuts.

    :returns:
        ``dict`` with ``counts[kind][codes]``, or ``counts[codes]`` if
        ``kind`` is not ``None``. If ``kind`` is given but no content of
        that kind is available, an empty ``dict`` is returned.
    '''

    by_kind_id = {}
    for kind_id, codes, _, _, count in self._iter_codes_info(kind=kind):
        by_codes = by_kind_id.setdefault(kind_id, {})
        by_codes[codes] = by_codes.get(codes, 0) + count

    if kind is not None:
        # Return an empty mapping instead of raising KeyError when the
        # selection holds nothing of the requested kind.
        return by_kind_id.get(to_kind_id(kind), {})

    return {
        to_kind(kind_id): by_codes
        for (kind_id, by_codes) in by_kind_id.items()}
def glob_codes(self, kind, codes):
    '''
    Find codes matching given patterns.

    :param kind:
        Content kind to be queried.
    :type kind:
        str

    :param codes:
        List of code patterns to query.
    :type codes:
        :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
        objects appropriate for the queried content type, or anything which
        can be converted to such objects.

    :returns:
        List of matches of the form ``[kind_codes_id, codes, deltat]``.
        Without any pattern, all codes of the kind are returned.
    '''

    kind_id = to_kind_id(kind)
    params = [kind_id]
    patterns = codes_patterns_for_kind(kind_id, codes)

    codes_filter = ''
    if patterns:
        # One GLOB clause per pattern, any of which may match.
        codes_filter = 'AND ( %s ) ' % ' OR '.join(
            ['kind_codes.codes GLOB ?'] * len(patterns))

        params.extend(pattern.safe_str for pattern in patterns)

    query = self._sql('''
        SELECT kind_codes_id, codes, deltat FROM kind_codes
        WHERE
            kind_id == ? ''' + codes_filter)

    return [list(row) for row in self._conn.execute(query, params)]
def update(
        self,
        constraint=None,
        inventory=('channel', 'event'),
        **kwargs):

    '''
    Update or partially update channel and event inventories.

    :param constraint:
        Selection of times or areas to be brought up to date.
    :type constraint:
        :py:class:`~pyrocko.squirrel.client.base.Constraint`

    :param inventory:
        What to update, ``'channel'`` for channel inventories,
        ``'event'`` for event catalogs. A single name may be given as a
        plain string.
    :type inventory:
        :py:class:`tuple` of :py:class:`str`

    :param \\*\\*kwargs:
        Shortcut for setting ``constraint=Constraint(**kwargs)``.

    This function triggers all attached remote sources, to check for
    updates in the meta-data. The sources will only submit queries when
    their expiration date has passed, or if the selection spans into
    previously unseen times or areas.
    '''

    if isinstance(inventory, str):
        inventory = (inventory,)

    valid_inventories = ('channel', 'event')
    for name in inventory:
        if name not in valid_inventories:
            raise error.SquirrelError(
                'Invalid argument for `inventory`: %s' % name)

    if constraint is None:
        constraint = client.Constraint(**kwargs)

    progress_task = make_task('Updating sources')
    for src in progress_task(self._sources):
        if 'channel' in inventory:
            src.update_channel_inventory(self, constraint)

        if 'event' in inventory:
            src.update_event_inventory(self, constraint)
def update_waveform_promises(self, constraint=None, **kwargs):
    '''
    Permit downloading of remote waveforms.

    :param constraint:
        Remote waveforms compatible with the given constraint are enabled
        for download.
    :type constraint:
        :py:class:`~pyrocko.squirrel.client.base.Constraint`

    :param \\*\\*kwargs:
        Shortcut for setting ``constraint=Constraint(**kwargs)``.

    Calling this method permits Squirrel to download waveforms from remote
    sources when processing subsequent waveform requests. This works by
    inserting so called waveform promises into the database. It will look
    into the available channels for each remote source and create a promise
    for each channel compatible with the given constraint. If the promise
    then matches in a waveform request, Squirrel tries to download the
    waveform. If the download is successful, the downloaded waveform is
    added to the Squirrel and the promise is deleted. If the download
    fails, the promise is kept if the reason of failure looks like being
    temporary, e.g. because of a network failure. If the cause of failure
    however seems to be permanent, the promise is deleted so that no
    further attempts are made to download a waveform which might not be
    available from that server at all. To force re-scheduling after a
    permanent failure, call :py:meth:`update_waveform_promises`
    yet another time.
    '''

    constraint = constraint if constraint is not None \
        else client.Constraint(**kwargs)

    for src in self._sources:
        src.update_waveform_promises(self, constraint)
def remove_waveform_promises(self, from_database='selection'):
    '''
    Remove waveform promises from live selection or global database.

    Every attached source is asked to remove the waveform promises it
    provides.

    :param from_database:
        Remove from live selection ``'selection'`` or global database
        ``'global'``.
    '''

    for src in self._sources:
        src.remove_waveform_promises(self, from_database=from_database)
def update_responses(self, constraint=None, **kwargs):
    '''
    Ask every attached source to update its response inventory.

    :param constraint:
        Selection of times or areas to be brought up to date.
    :type constraint:
        :py:class:`~pyrocko.squirrel.client.base.Constraint`

    :param \\*\\*kwargs:
        Shortcut for setting ``constraint=Constraint(**kwargs)``.
    '''

    constraint = constraint if constraint is not None \
        else client.Constraint(**kwargs)

    for src in self._sources:
        src.update_response_inventory(self, constraint)
def get_nfiles(self):
    '''
    Get number of files in selection.

    Counts the rows of the selection's file state table.
    '''

    query = self._sql('''SELECT COUNT(*) FROM %(db)s.%(file_states)s''')
    first_row = next(iter(self._conn.execute(query)), None)
    return None if first_row is None else first_row[0]
def get_nnuts(self):
    '''
    Get number of nuts in selection.

    Counts the rows of the selection's nut table.
    '''

    query = self._sql('''SELECT COUNT(*) FROM %(db)s.%(nuts)s''')
    first_row = next(iter(self._conn.execute(query)), None)
    return None if first_row is None else first_row[0]
def get_total_size(self):
    '''
    Get aggregated file size available in selection.

    Sums ``files.size`` over all files registered in the selection's file
    state table; ``0`` if there are none.
    '''

    query = self._sql('''
        SELECT SUM(files.size) FROM %(db)s.%(file_states)s
        INNER JOIN files
            ON %(db)s.%(file_states)s.file_id = files.file_id
    ''')

    first_row = next(iter(self._conn.execute(query)), None)
    if first_row is None:
        return None

    # SUM over no rows yields NULL, reported as 0.
    return first_row[0] or 0
def get_stats(self):
    '''
    Get statistics on contents available through this selection.

    :returns:
        ``SquirrelStats`` object with file/nut counts, available kinds and
        codes, total file size, per-kind counts and time spans, and
        descriptions of the attached sources and operators.
    '''

    kinds = self.get_kinds()
    time_spans = {kind: self.get_time_span([kind]) for kind in kinds}

    return SquirrelStats(
        nfiles=self.get_nfiles(),
        nnuts=self.get_nnuts(),
        kinds=kinds,
        codes=self.get_codes(),
        total_size=self.get_total_size(),
        counts=self.get_counts(),
        time_spans=time_spans,
        sources=[src.describe() for src in self._sources],
        operators=[operator.describe() for operator in self._operators])
@filldocs
def check(
        self, obj=None, tmin=None, tmax=None, time=None, codes=None,
        ignore=None):
    '''
    Check for common data/metadata problems.

    %(query_args)s

    :param ignore:
        Problem types to be ignored. By default (``None``), no problem
        type is ignored.
    :type ignore:
        :class:`list` of :class:`str`
        (:py:class:`~pyrocko.squirrel.check.SquirrelCheckProblemType`)

    :returns:
        :py:class:`~pyrocko.squirrel.check.SquirrelCheck` object
        containing the results of the check.

    See :py:func:`~pyrocko.squirrel.check.do_check`.
    '''

    from .check import do_check

    # Fresh list per call instead of a mutable default shared by all calls.
    if ignore is None:
        ignore = []

    tmin, tmax, codes = self._get_selection_args(
        CHANNEL, obj, tmin, tmax, time, codes)

    return do_check(self, tmin=tmin, tmax=tmax, codes=codes, ignore=ignore)
def get_content(
        self,
        nut,
        cache_id='default',
        accessor_id='default',
        show_progress=False,
        model='squirrel'):

    '''
    Get and possibly load full content for a given index entry from file.

    If the content cache ``cache_id`` does not hold the entry yet, the file
    segment containing it is loaded and every loaded sibling entry is put
    into the cache as well. The entry is then fetched from the cache on
    behalf of ``accessor_id`` in the requested object ``model``.

    :raises:
        :py:exc:`~pyrocko.squirrel.error.NotAvailable` if the cache cannot
        provide the content.
    '''

    cache_ = self._content_caches[cache_id]

    if not cache_.has(nut):
        siblings = io.iload(
            nut.file_path,
            segment=nut.file_segment,
            format=nut.file_format,
            database=self._database,
            update_selection=self,
            show_progress=show_progress)

        for sibling in siblings:
            cache_.put(sibling)

    try:
        return cache_.get(nut, accessor_id, model)
    except KeyError:
        raise error.NotAvailable(
            'Unable to retrieve content: %s, %s, %s, %s' % nut.key)
def get_contents_threaded(
        self,
        nuts: list[model.Nut],
        cache_id: str = 'default',
        accessor_id: str = 'default',
        show_progress: bool = False,
        model: str = 'squirrel') -> list:

    '''
    Get and possibly load full content for a list of index entries.

    Like :py:meth:`get_content`, but for many nuts at once. The nuts are
    grouped by file segment. When more than one segment is involved and
    the Squirrel is configured for more than one thread, the segment
    groups are loaded in parallel on the shared loading executor.

    :param nuts:
        Index entries for which to retrieve the content.
    :param cache_id:
        Content cache to use, passed on to :py:meth:`get_content`.
    :param accessor_id:
        Accessing consumer, passed on to :py:meth:`get_content`.
    :param show_progress:
        Passed on to :py:meth:`get_content`.
    :param model:
        Object model of the returned content, passed on to
        :py:meth:`get_content`.

    :returns:
        List of content objects, in the same order as ``nuts``, both for
        the serial and for the threaded code path.
    '''

    if len(nuts) == 0:
        return []

    def load(group):
        return [
            self.get_content(
                nut,
                cache_id=cache_id,
                accessor_id=accessor_id,
                show_progress=show_progress,
                model=model)
            for nut in group]

    # Positions in ``nuts`` grouped by file segment: each segment is read
    # by a single worker and results can be put back in input order.
    indices_by_segment = defaultdict(list)
    for index, nut in enumerate(nuts):
        indices_by_segment[nut.file_path, nut.file_segment].append(index)

    if len(indices_by_segment) == 1 or self._n_threads == 1:
        return load(nuts)

    def load_indices(indices):
        return load([nuts[index] for index in indices])

    index_groups = list(indices_by_segment.values())
    executor = get_loading_executor(max_workers=self._n_threads)

    results = [None] * len(nuts)
    # Executor.map yields results in the order of ``index_groups``.
    for indices, contents in zip(
            index_groups, executor.map(load_indices, index_groups)):

        for index, content in zip(indices, contents):
            results[index] = content

    return results
def advance_accessor(self, accessor_id='default', cache_id=None):
    '''
    Notify memory caches about consumer moving to a new data batch.

    :param accessor_id:
        Name of accessing consumer to be advanced.
    :type accessor_id:
        str

    :param cache_id:
        Name of the cache for which the accessor should be advanced. By
        default the named accessor is advanced in all registered caches.
        By default, two caches named ``'default'`` and ``'waveform'`` are
        available.
    :type cache_id:
        str

    See :py:class:`~pyrocko.squirrel.cache.ContentCache` for details on how
    Squirrel's memory caching works and can be tuned. Default behaviour is
    to release data when it has not been used in the latest data
    window/batch. If the accessor is never advanced, data is cached
    indefinitely - which is often desired e.g. for station meta-data.
    Methods for consecutive data traversal, like
    :py:meth:`chopper_waveforms` automatically advance and clear
    their accessor.
    '''

    if cache_id is None:
        targets = self._content_caches.keys()
    else:
        targets = [cache_id]

    for name in targets:
        self._content_caches[name].advance_accessor(accessor_id)
def clear_accessor(self, accessor_id, cache_id=None):
    '''
    Notify memory caches about a consumer having finished.

    :param accessor_id:
        Name of accessor to be cleared.
    :type accessor_id:
        str

    :param cache_id:
        Name of cache for which the accessor should be cleared. By default
        the named accessor is cleared from all registered caches. By
        default, two caches named ``'default'`` and ``'waveform'`` are
        available.
    :type cache_id:
        str

    Calling this method clears all references to cache entries held by the
    named accessor. Cache entries are then freed if not referenced by any
    other accessor.
    '''

    if cache_id is None:
        targets = self._content_caches.keys()
    else:
        targets = [cache_id]

    for name in targets:
        self._content_caches[name].clear_accessor(accessor_id)
def get_cache_stats(self, cache_id):
    '''
    Get statistics of a named memory content cache.

    :param cache_id:
        Name of the cache, e.g. ``'default'`` or ``'waveform'``.

    :returns:
        Whatever the cache's ``get_stats()`` method reports.
    '''

    cache_ = self._content_caches[cache_id]
    return cache_.get_stats()
@filldocs
def get_stations(
        self, obj=None, tmin=None, tmax=None, time=None, codes=None,
        model='squirrel', on_error='raise'):

    '''
    Get stations matching given constraints.

    %(query_args)s

    :param model:
        Select object model for returned values: ``'squirrel'`` to get
        Squirrel station objects or ``'pyrocko'`` to get Pyrocko station
        objects with channel information attached.
    :type model:
        str

    :param on_error:
        Passed on to the Pyrocko station conversion when
        ``model='pyrocko'``; not used for the other models.
    :type on_error:
        str

    :returns:
        List of :py:class:`pyrocko.squirrel.Station
        <pyrocko.squirrel.model.Station>` objects by default or list of
        :py:class:`pyrocko.model.Station <pyrocko.model.station.Station>`
        objects if ``model='pyrocko'`` is requested.

    See :py:meth:`iter_nuts` for details on time span matching.
    '''

    if model == 'pyrocko':
        return self._get_pyrocko_stations(
            obj, tmin, tmax, time, codes, on_error=on_error)

    if model not in ('squirrel', 'stationxml', 'stationxml+'):
        raise ValueError('Invalid station model: %s' % model)

    selection = self._get_selection_args(
        STATION, obj, tmin, tmax, time, codes)

    def by_dkey(nut):
        return nut.dkey

    matching = sorted(self.iter_nuts('station', *selection), key=by_dkey)
    return [self.get_content(nut, model=model) for nut in matching]
@filldocs
def get_channels(
        self, obj=None, tmin=None, tmax=None, time=None, codes=None,
        model='squirrel'):

    '''
    Get channels matching given constraints.

    %(query_args)s

    :param model:
        Object model for the returned values, passed on to
        :py:meth:`get_content`.
    :type model:
        str

    :returns:
        List of :py:class:`~pyrocko.squirrel.model.Channel` objects.

    See :py:meth:`iter_nuts` for details on time span matching.
    '''

    selection = self._get_selection_args(
        CHANNEL, obj, tmin, tmax, time, codes)

    def by_dkey(nut):
        return nut.dkey

    matching = sorted(self.iter_nuts('channel', *selection), key=by_dkey)
    return [self.get_content(nut, model=model) for nut in matching]
@filldocs
def get_sensors(
        self, obj=None, tmin=None, tmax=None, time=None, codes=None):

    '''
    Get sensors matching given constraints.

    %(query_args)s

    :returns:
        List of :py:class:`~pyrocko.squirrel.model.Sensor` objects.

    See :py:meth:`iter_nuts` for details on time span matching.
    '''

    tmin, tmax, codes = self._get_selection_args(
        CHANNEL, obj, tmin, tmax, time, codes)

    if codes is not None:
        def widen_to_sensor(entry):
            # Wildcard the last channel character so that all components
            # of a sensor are matched.
            if entry.channel == '*':
                return entry

            return entry.replace(channel=entry.channel[:-1] + '?')

        codes = codes_patterns_list(
            widen_to_sensor(entry) for entry in codes)

    def by_dkey(nut):
        return nut.dkey

    channel_nuts = sorted(
        self.iter_nuts('channel', tmin, tmax, codes), key=by_dkey)

    sensors = model.Sensor.from_channels(
        self.get_content(nut) for nut in channel_nuts)

    return [
        sensor for sensor in sensors
        if match_time_span(tmin, tmax, sensor)]
@filldocs
def get_responses(
        self, obj=None, tmin=None, tmax=None, time=None, codes=None,
        model='squirrel'):

    '''
    Get instrument responses matching given constraints.

    %(query_args)s

    :param model:
        Select data model for returned objects. Choices: ``'squirrel'``,
        ``'stationxml'``, ``'stationxml+'``. See return value description.
    :type model:
        str

    :returns:
        List of :py:class:`~pyrocko.squirrel.model.Response` if ``model ==
        'squirrel'`` or list of
        :py:class:`~pyrocko.io.stationxml.FDSNStationXML`
        if ``model == 'stationxml'`` or list of
        (:py:class:`~pyrocko.squirrel.model.Response`,
        :py:class:`~pyrocko.io.stationxml.FDSNStationXML`) if ``model ==
        'stationxml+'``.

    See :py:meth:`iter_nuts` for details on time span matching.
    '''

    selection = self._get_selection_args(
        RESPONSE, obj, tmin, tmax, time, codes)

    def by_dkey(nut):
        return nut.dkey

    matching = sorted(self.iter_nuts('response', *selection), key=by_dkey)
    return [self.get_content(nut, model=model) for nut in matching]
@filldocs
def get_response(
        self, obj=None, tmin=None, tmax=None, time=None, codes=None,
        model='squirrel', on_duplicate='raise'):

    '''
    Get instrument response matching given constraints.

    %(query_args)s

    :param model:
        Select data model for returned object. Choices: ``'squirrel'``,
        ``'stationxml'``, ``'stationxml+'``. See return value description.
    :type model:
        str

    :param on_duplicate:
        Determines how duplicates/multiple matching responses are handled.
        Choices: ``'raise'`` - raise
        :py:exc:`~pyrocko.squirrel.error.Duplicate`, ``'warn'`` - emit a
        warning and return first match, ``'ignore'`` - silently return
        first match. Any other value raises :py:exc:`ValueError` when
        multiple responses match.
    :type on_duplicate:
        str

    :returns:
        :py:class:`~pyrocko.squirrel.model.Response` if
        ``model == 'squirrel'`` or
        :py:class:`~pyrocko.io.stationxml.FDSNStationXML` if ``model ==
        'stationxml'`` or
        (:py:class:`~pyrocko.squirrel.model.Response`,
        :py:class:`~pyrocko.io.stationxml.FDSNStationXML`) if ``model ==
        'stationxml+'``.

    Same as :py:meth:`get_responses` but returning exactly one response.
    Raises :py:exc:`~pyrocko.squirrel.error.NotAvailable` if none is
    available. Duplicates are handled according to the ``on_duplicate``
    argument.

    See :py:meth:`iter_nuts` for details on time span matching.
    '''

    # For 'stationxml', query 'stationxml+' so that the Squirrel response
    # objects are available for the duplicate report; the StationXML part
    # is picked out at the end.
    if model == 'stationxml':
        model_ = 'stationxml+'
    else:
        model_ = model

    responses = self.get_responses(
        obj, tmin, tmax, time, codes, model=model_)
    if len(responses) == 0:
        raise error.NotAvailable(
            'No instrument response available (%s).'
            % self._get_selection_args_str(
                RESPONSE, obj, tmin, tmax, time, codes))

    elif len(responses) > 1:

        if on_duplicate in ('raise', 'warn'):
            if model_ == 'squirrel':
                resps_sq = responses
            elif model_ == 'stationxml+':
                resps_sq = [resp[0] for resp in responses]
            else:
                raise ValueError('Invalid response model: %s' % model)

            rinfo = ':\n' + '\n'.join(
                ' ' + resp.summary for resp in resps_sq)

            message = \
                'Multiple instrument responses matching given ' \
                'constraints (%s)%s%s' % (
                    self._get_selection_args_str(
                        RESPONSE, obj, tmin, tmax, time, codes),
                    ' -> using first' if on_duplicate == 'warn' else '',
                    rinfo)

        if on_duplicate == 'raise':
            raise error.Duplicate(message)

        elif on_duplicate == 'warn':
            logger.warning(message)

        elif on_duplicate == 'ignore':
            pass

        else:
            # Reject unknown modes instead of silently using the first match.
            raise ValueError(
                'Invalid argument for on_duplicate: %s' % on_duplicate)

    if model == 'stationxml':
        return responses[0][1]
    else:
        return responses[0]
@filldocs
def get_events(
        self, obj=None, tmin=None, tmax=None, time=None, codes=None):

    '''
    Get events matching given constraints.

    %(query_args)s

    :returns:
        List of :py:class:`~pyrocko.model.event.Event` objects.

    See :py:meth:`iter_nuts` for details on time span matching.
    '''

    selection = self._get_selection_args(
        EVENT, obj, tmin, tmax, time, codes)

    def by_dkey(nut):
        return nut.dkey

    matching = sorted(self.iter_nuts('event', *selection), key=by_dkey)
    return [self.get_content(nut) for nut in matching]
2144 def _housekeep_recent_orders(self):
2145 now = time.time()
2147 to_delete = []
2148 for order_key, time_created in self._recent_orders.items():
2149 if time_created < now - self._recent_orders_prune_time:
2150 to_delete.append(order_key)
2152 for order_key in to_delete:
2153 del self._recent_orders[order_key]
2155 def _redeem_promises(self, *args, order_only=False):
2157 self._housekeep_recent_orders()
2159 to_be_split = []
2160 to_be_saved = []
2161 transaction = None
2163 source_ids = []
2164 sources = {}
2165 for source in self._sources:
2166 if isinstance(source, fdsn.FDSNSource):
2167 source_ids.append(source._source_id)
2168 sources[source._source_id] = source
2170 source_priority = dict(
2171 (source_id, i) for (i, source_id) in enumerate(source_ids))
2173 def save_execute():
2174 paths = []
2175 for order, trs in to_be_saved:
2176 paths.extend(sources[order.source_id].save_waveforms(trs))
2178 paths = sorted(set(paths))
2179 self.add(paths, transaction=transaction)
2181 to_be_saved[:] = []
2183 def split_promise_execute():
2184 for (tmin, tmax, codes, path) in to_be_split:
2185 self._split_nuts(
2186 'waveform_promise',
2187 tmin,
2188 tmax,
2189 codes=codes,
2190 path=path,
2191 transaction=transaction)
2193 to_be_split[:] = []
2195 def split_promise(order, tmax=None):
2196 this = [
2197 order.tmin,
2198 tmax if tmax is not None else order.tmax,
2199 order.codes,
2200 order.source_id]
2202 if to_be_split:
2203 last = to_be_split[-1]
2204 if last[1] == this[0] and last[2:] == this[2:]:
2205 last[1] = this[1]
2206 else:
2207 split_promise_execute()
2208 to_be_split.append(this)
2209 else:
2210 to_be_split.append(this)
2212 tmin, tmax = args[:2]
2214 waveforms = list(self.iter_nuts('waveform', *args))
2215 promises = list(self.iter_nuts('waveform_promise', *args))
2217 codes_to_avail = defaultdict(list)
2218 for nut in waveforms:
2219 codes_to_avail[nut.codes].append((nut.tmin, nut.tmax))
2221 def tts(x):
2222 if isinstance(x, tuple):
2223 return tuple(tts(e) for e in x)
2224 elif isinstance(x, list):
2225 return list(tts(e) for e in x)
2226 else:
2227 return util.time_to_str(x)
2229 now = time.time()
2230 orders = []
2231 for promise in promises:
2232 waveforms_avail = codes_to_avail[promise.codes]
2233 for block_tmin, block_tmax in blocks(
2234 max(tmin, promise.tmin),
2235 min(tmax, promise.tmax),
2236 promise.deltat,
2237 n_samples_block=self.n_samples_block):
2239 if block_tmin > now:
2240 continue
2242 order = WaveformOrder(
2243 source_id=promise.file_path,
2244 codes=promise.codes,
2245 tmin=block_tmin,
2246 tmax=block_tmax,
2247 anxious=sources[promise.file_path].anxious,
2248 deltat=promise.deltat,
2249 gaps=gaps(waveforms_avail, block_tmin, block_tmax),
2250 time_created=now)
2252 order_key = order.key1()
2253 if order_key in self._recent_orders:
2254 continue
2256 self._recent_orders[order_key] = now
2257 orders.append(order)
2259 orders_noop, orders = lpick(lambda order: order.gaps, orders)
2261 order_keys_noop = set(order.key2() for order in orders_noop)
2262 if len(order_keys_noop) != 0 or len(orders_noop) != 0:
2263 logger.info(
2264 'Waveform orders already satisified with cached/local data: '
2265 '%i (%i)' % (len(order_keys_noop), len(orders_noop)))
2267 for order in orders_noop:
2268 split_promise(order)
2270 if order_only:
2271 if orders:
2272 self._pending_orders.extend(orders)
2273 logger.info(
2274 'Enqueuing %i waveform order%s.'
2275 % len_plural(orders))
2276 return
2277 else:
2278 if self._pending_orders:
2279 orders.extend(self._pending_orders)
2280 logger.info(
2281 'Adding %i previously enqueued order%s.'
2282 % len_plural(self._pending_orders))
2284 self._pending_orders = []
2286 order_groups = defaultdict(list)
2287 for order in orders:
2288 order_groups[order.key2()].append(order)
2290 for k, order_group in order_groups.items():
2291 order_group.sort(
2292 key=lambda order: source_priority[order.source_id])
2294 n_order_groups = len(order_groups)
2296 if len(order_groups) != 0 or len(orders) != 0:
2297 logger.info(
2298 'Waveform orders standing for download: %i (%i)'
2299 % (len(order_groups), len(orders)))
2301 task = make_task('Waveform orders processed', n_order_groups)
2302 else:
2303 task = None
2305 def release_order_group(order):
2306 order_key = order.key2()
2307 for followup in order_groups[order_key]:
2308 if followup is not order:
2309 split_promise(followup)
2311 del order_groups[order_key]
2313 if task:
2314 task.update(n_order_groups - len(order_groups))
2316 def noop(order):
2317 pass
2319 def success(order, trs):
2320 release_order_group(order)
2321 if order.is_near_real_time():
2322 if not trs:
2323 return # keep promise when no data received at real time
2324 else:
2325 tmax = max(tr.tmax+tr.deltat for tr in trs)
2326 tmax = order.tmin \
2327 + round((tmax - order.tmin) / order.deltat) \
2328 * order.deltat
2329 split_promise(order, tmax)
2330 else:
2331 split_promise(order)
2333 to_be_saved.append((order, trs))
2335 calls = queue.Queue()
2337 def enqueue(f):
2338 def wrapper(*args):
2339 calls.put((f, args))
2341 return wrapper
2343 while order_groups:
2345 orders_now = []
2346 empty = []
2347 for k, order_group in order_groups.items():
2348 try:
2349 orders_now.append(order_group.pop(0))
2350 except IndexError:
2351 empty.append(k)
2353 for k in empty:
2354 del order_groups[k]
2356 by_source_id = defaultdict(list)
2357 for order in orders_now:
2358 by_source_id[order.source_id].append(order)
2360 quit_threads = False
2362 def aborted():
2363 return quit_threads
2365 try:
2366 threads = []
2367 for source_id in by_source_id:
2368 def download():
2369 try:
2370 sources[source_id].download_waveforms(
2371 by_source_id[source_id],
2372 success=enqueue(success),
2373 error_permanent=enqueue(split_promise),
2374 error_temporary=noop,
2375 aborted=aborted)
2377 finally:
2378 calls.put(None)
2380 thread = threading.Thread(target=download)
2381 thread.start()
2382 threads.append(thread)
2384 ndone = 0
2385 batch = []
2386 while ndone < len(by_source_id):
2387 try:
2388 ret = calls.get(time, timeout=0.1)
2389 if ret is None:
2390 ndone += 1
2391 else:
2392 batch.append(ret)
2394 except queue.Empty:
2395 if batch:
2396 transaction = self.transaction(
2397 'post receive processing (%i)' % len(batch))
2399 with transaction:
2400 while batch:
2401 func, args = batch.pop(0)
2402 func(*args)
2404 split_promise_execute()
2405 save_execute()
2407 transaction = None
2409 if batch:
2410 transaction = self.transaction(
2411 'post receive processing (finishing, %i)' % len(batch))
2413 with transaction:
2414 while batch:
2415 func, args = batch.pop(0)
2416 func(*args)
2418 split_promise_execute()
2419 save_execute()
2421 transaction = None
2423 finally:
2424 quit_threads = True
2426 for thread in threads:
2427 thread.join()
2429 if task:
2430 task.update(n_order_groups - len(order_groups))
2432 if task:
2433 task.done()
2435 @filldocs
2436 def get_waveform_nuts(
2437 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
2438 codes_exclude=None, sample_rate_min=None, sample_rate_max=None,
2439 order_only=False):
2441 '''
2442 Get waveform content entities matching given constraints.
2444 %(query_args)s
2446 Like :py:meth:`get_nuts` with ``kind='waveform'`` but additionally
2447 resolves matching waveform promises (downloads waveforms from remote
2448 sources).
2450 See :py:meth:`iter_nuts` for details on time span matching.
2451 '''
2453 args = self._get_selection_args(WAVEFORM, obj, tmin, tmax, time, codes)
2455 if self.downloads_enabled:
2456 self._redeem_promises(
2457 *args,
2458 codes_exclude,
2459 sample_rate_min,
2460 sample_rate_max,
2461 order_only=order_only)
2463 nuts = sorted(
2464 self.iter_nuts('waveform', *args), key=lambda nut: nut.dkey)
2466 return nuts
2468 @filldocs
2469 def have_waveforms(
2470 self, obj=None, tmin=None, tmax=None, time=None, codes=None):
2472 '''
2473 Check if any waveforms or waveform promises are available for given
2474 constraints.
2476 %(query_args)s
2477 '''
2479 args = self._get_selection_args(WAVEFORM, obj, tmin, tmax, time, codes)
2480 return bool(list(
2481 self.iter_nuts('waveform', *args, limit=1))) \
2482 or (self.downloads_enabled and bool(list(
2483 self.iter_nuts('waveform_promise', *args, limit=1))))
2485 @filldocs
2486 def get_waveforms(
2487 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
2488 codes_exclude=None, sample_rate_min=None, sample_rate_max=None,
2489 uncut=False, want_incomplete=True, degap=True,
2490 maxgap=5, maxlap=None, snap=None, include_last=False,
2491 load_data=True, accessor_id='default', operator_params=None,
2492 order_only=False, channel_priorities=None):
2494 '''
2495 Get waveforms matching given constraints.
2497 %(query_args)s
2499 :param sample_rate_min:
2500 Consider only waveforms with a sampling rate equal to or greater
2501 than the given value [Hz].
2502 :type sample_rate_min:
2503 float
2505 :param sample_rate_max:
2506 Consider only waveforms with a sampling rate equal to or less than
2507 the given value [Hz].
2508 :type sample_rate_max:
2509 float
2511 :param uncut:
2512 Set to ``True``, to disable cutting traces to [``tmin``, ``tmax``]
2513 and to disable degapping/deoverlapping. Returns untouched traces as
2514 they are read from file segment. File segments are always read in
2515 their entirety.
2516 :type uncut:
2517 bool
2519 :param want_incomplete:
2520 If ``True``, gappy/incomplete traces are included in the result.
2521 :type want_incomplete:
2522 bool
2524 :param degap:
2525 If ``True``, connect traces and remove gaps and overlaps.
2526 :type degap:
2527 bool
2529 :param maxgap:
2530 Maximum gap size in samples which is filled with interpolated
2531 samples when ``degap`` is ``True``.
2532 :type maxgap:
2533 int
2535 :param maxlap:
2536 Maximum overlap size in samples which is removed when ``degap`` is
2537 ``True``.
2538 :type maxlap:
2539 int
2541 :param snap:
2542 Rounding functions used when computing sample index from time
2543 instance, for trace start and trace end, respectively. By default,
2544 ``(round, round)`` is used.
2545 :type snap:
2546 :py:class:`tuple` of 2 callables
2548 :param include_last:
2549 If ``True``, add one more sample to the returned traces (the sample
2550 which would be the first sample of a query with ``tmin`` set to the
2551 current value of ``tmax``).
2552 :type include_last:
2553 bool
2555 :param load_data:
2556 If ``True``, waveform data samples are read from files (or cache).
2557 If ``False``, meta-information-only traces are returned (dummy
2558 traces with no data samples).
2559 :type load_data:
2560 bool
2562 :param accessor_id:
2563 Name of consumer on who's behalf data is accessed. Used in cache
2564 management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key
2565 to distinguish different points of extraction for the decision of
2566 when to release cached waveform data. Should be used when data is
2567 alternately extracted from more than one region / selection.
2568 :type accessor_id:
2569 str
2571 :param channel_priorities:
2572 List of band/instrument code combinations to try. For example,
2573 giving ``['HH', 'BH']`` would first try to get ``HH?`` channels and
2574 then fallback to ``BH?`` if these are not available. The first
2575 matching waveforms are returned. Use in combination with
2576 ``sample_rate_min`` and ``sample_rate_max`` to constrain the sample
2577 rate.
2578 :type channel_priorities:
2579 :py:class:`list` of :py:class:`str`
2581 See :py:meth:`iter_nuts` for details on time span matching.
2583 Loaded data is kept in memory (at least) until
2584 :py:meth:`clear_accessor` has been called or
2585 :py:meth:`advance_accessor` has been called two consecutive times
2586 without data being accessed between the two calls (by this accessor).
2587 Data may still be further kept in the memory cache if held alive by
2588 consumers with a different ``accessor_id``.
2589 '''
2591 tmin, tmax, codes = self._get_selection_args(
2592 WAVEFORM, obj, tmin, tmax, time, codes)
2594 if channel_priorities is not None:
2595 return self._get_waveforms_prioritized(
2596 tmin=tmin, tmax=tmax, codes=codes, codes_exclude=codes_exclude,
2597 sample_rate_min=sample_rate_min,
2598 sample_rate_max=sample_rate_max,
2599 uncut=uncut, want_incomplete=want_incomplete, degap=degap,
2600 maxgap=maxgap, maxlap=maxlap, snap=snap,
2601 include_last=include_last, load_data=load_data,
2602 accessor_id=accessor_id, operator_params=operator_params,
2603 order_only=order_only, channel_priorities=channel_priorities)
2605 kinds = ['waveform']
2606 if self.downloads_enabled:
2607 kinds.append('waveform_promise')
2609 self_tmin, self_tmax = self.get_time_span(kinds)
2611 if None in (self_tmin, self_tmax):
2612 logger.warning(
2613 'No waveforms available.')
2614 return []
2616 tmin = tmin if tmin is not None else self_tmin
2617 tmax = tmax if tmax is not None else self_tmax
2619 if codes is not None and len(codes) == 1:
2620 # TODO: fix for multiple / mixed codes
2621 operator = self.get_operator(codes[0])
2622 if operator is not None:
2623 return operator.get_waveforms(
2624 self, codes[0],
2625 tmin=tmin, tmax=tmax,
2626 uncut=uncut, want_incomplete=want_incomplete, degap=degap,
2627 maxgap=maxgap, maxlap=maxlap, snap=snap,
2628 include_last=include_last, load_data=load_data,
2629 accessor_id=accessor_id, params=operator_params)
2631 nuts = self.get_waveform_nuts(
2632 obj, tmin, tmax, time, codes, codes_exclude, sample_rate_min,
2633 sample_rate_max, order_only=order_only)
2635 if order_only or not nuts:
2636 return []
2638 if load_data:
2639 traces = self.get_contents_threaded(
2640 nuts, 'waveform', accessor_id=accessor_id)
2642 else:
2643 traces = [
2644 trace.Trace(**nut.trace_kwargs) for nut in nuts]
2646 if uncut:
2647 return traces
2649 if snap is None:
2650 snap = (round, round)
2652 chopped = []
2653 for tr in traces:
2654 if not load_data and tr.ydata is not None:
2655 tr = tr.copy(data=False)
2656 tr.ydata = None
2658 try:
2659 chopped.append(tr.chop(
2660 tmin, tmax,
2661 inplace=False,
2662 snap=snap,
2663 include_last=include_last))
2665 except trace.NoData:
2666 pass
2668 processed = self._process_chopped(
2669 chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax)
2671 return processed
2673 def _get_waveforms_prioritized(
2674 self, tmin=None, tmax=None, codes=None, codes_exclude=None,
2675 channel_priorities=None, **kwargs):
2677 trs_all = []
2678 codes_have = set()
2679 for channel in channel_priorities:
2680 assert len(channel) == 2
2682 if codes is not None:
2683 re_channel = re.compile(
2684 r'^([' + channel[0] + r'?][' + channel[1] + r'?]|\*)')
2686 codes_now = []
2687 for codes_ in codes:
2688 if codes_.channel == '*':
2689 channel_now, n = channel + '?', 1
2690 else:
2691 channel_now, n = re_channel.subn(
2692 channel, codes_.channel)
2694 if n == 1:
2695 codes_now.append(codes_.replace(channel=channel_now))
2697 else:
2698 codes_now = model.CodesNSLCE('*', '*', '*', channel+'?')
2700 if not codes_now:
2701 continue
2703 codes_exclude_now = list(set(
2704 codes_.replace(channel=channel+codes_.channel[-1])
2705 for codes_ in codes_have))
2707 if codes_exclude:
2708 codes_exclude_now.extend(codes_exclude)
2710 trs = self.get_waveforms(
2711 tmin=tmin,
2712 tmax=tmax,
2713 codes=codes_now,
2714 codes_exclude=codes_exclude_now,
2715 **kwargs)
2717 codes_have.update(set(tr.codes for tr in trs))
2718 trs_all.extend(trs)
2720 return trs_all
2722 @filldocs
2723 def chopper_waveforms(
2724 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
2725 codes_exclude=None, sample_rate_min=None, sample_rate_max=None,
2726 tinc=None, tpad=0.,
2727 want_incomplete=True, snap_window=False,
2728 degap=True, maxgap=5, maxlap=None,
2729 snap=None, include_last=False, load_data=True,
2730 accessor_id=None, clear_accessor=True, operator_params=None,
2731 grouping=None, channel_priorities=None):
2733 '''
2734 Iterate window-wise over waveform archive.
2736 %(query_args)s
2738 :param tinc:
2739 Time increment (window shift time) (default uses ``tmax-tmin``).
2740 :type tinc:
2741 :py:func:`~pyrocko.util.get_time_float`
2743 :param tpad:
2744 Padding time appended on either side of the data window (window
2745 overlap is ``2*tpad``).
2746 :type tpad:
2747 :py:func:`~pyrocko.util.get_time_float`
2749 :param want_incomplete:
2750 If ``True``, gappy/incomplete traces are included in the result.
2751 :type want_incomplete:
2752 bool
2754 :param snap_window:
2755 If ``True``, start time windows at multiples of tinc with respect
2756 to system time zero.
2757 :type snap_window:
2758 bool
2760 :param degap:
2761 If ``True``, connect traces and remove gaps and overlaps.
2762 :type degap:
2763 bool
2765 :param maxgap:
2766 Maximum gap size in samples which is filled with interpolated
2767 samples when ``degap`` is ``True``.
2768 :type maxgap:
2769 int
2771 :param maxlap:
2772 Maximum overlap size in samples which is removed when ``degap`` is
2773 ``True``.
2774 :type maxlap:
2775 int
2777 :param snap:
2778 Rounding functions used when computing sample index from time
2779 instance, for trace start and trace end, respectively. By default,
2780 ``(round, round)`` is used.
2781 :type snap:
2782 :py:class:`tuple` of 2 callables
2784 :param include_last:
2785 If ``True``, add one more sample to the returned traces (the sample
2786 which would be the first sample of a query with ``tmin`` set to the
2787 current value of ``tmax``).
2788 :type include_last:
2789 bool
2791 :param load_data:
2792 If ``True``, waveform data samples are read from files (or cache).
2793 If ``False``, meta-information-only traces are returned (dummy
2794 traces with no data samples).
2795 :type load_data:
2796 bool
2798 :param accessor_id:
2799 Name of consumer on who's behalf data is accessed. Used in cache
2800 management (see :py:mod:`~pyrocko.squirrel.cache`). Used as a key
2801 to distinguish different points of extraction for the decision of
2802 when to release cached waveform data. Should be used when data is
2803 alternately extracted from more than one region / selection.
2804 :type accessor_id:
2805 str
2807 :param clear_accessor:
2808 If ``True`` (default), :py:meth:`clear_accessor` is called when the
2809 chopper finishes. Set to ``False`` to keep loaded waveforms in
2810 memory when the generator returns.
2811 :type clear_accessor:
2812 bool
2814 :param grouping:
2815 By default, traversal over the data is over time and all matching
2816 traces of a time window are yielded. Using this option, it is
2817 possible to traverse the data first by group (e.g. station or
2818 network) and second by time. This can reduce the number of traces
2819 in each batch and thus reduce the memory footprint of the process.
2820 :type grouping:
2821 :py:class:`~pyrocko.squirrel.operators.base.Grouping`
2823 :yields:
2824 For each extracted time window or waveform group a
2825 :py:class:`Batch` object is yielded.
2827 See :py:meth:`iter_nuts` for details on time span matching.
2828 '''
2830 tmin, tmax, codes = self._get_selection_args(
2831 WAVEFORM, obj, tmin, tmax, time, codes)
2833 kinds = ['waveform']
2834 if self.downloads_enabled:
2835 kinds.append('waveform_promise')
2837 self_tmin, self_tmax = self.get_time_span(kinds)
2839 if None in (self_tmin, self_tmax):
2840 logger.warning(
2841 'Content has undefined time span. No waveforms and no '
2842 'waveform promises?')
2843 return
2845 if snap_window and tinc is not None:
2846 tmin = tmin if tmin is not None else self_tmin
2847 tmax = tmax if tmax is not None else self_tmax
2848 tmin = math.floor(tmin / tinc) * tinc
2849 tmax = math.ceil(tmax / tinc) * tinc
2850 else:
2851 tmin = tmin if tmin is not None else self_tmin + tpad
2852 tmax = tmax if tmax is not None else self_tmax - tpad
2854 if tinc is None:
2855 tinc = tmax - tmin
2856 nwin = 1
2857 elif tinc == 0.0:
2858 nwin = 1
2859 else:
2860 eps = 1e-6
2861 nwin = max(1, int((tmax - tmin) / tinc - eps) + 1)
2863 try:
2864 if accessor_id is None:
2865 accessor_id = 'chopper%i' % self._n_choppers_active
2867 self._n_choppers_active += 1
2869 if grouping is None:
2870 codes_list = [codes]
2871 else:
2872 operator = Operator(
2873 filtering=CodesPatternFiltering(codes=codes),
2874 grouping=grouping)
2876 available = set(self.get_codes(kind='waveform'))
2877 if self.downloads_enabled:
2878 available.update(self.get_codes(kind='waveform_promise'))
2879 operator.update_mappings(sorted(available))
2881 codes_list = [
2882 codes_patterns_list(scl)
2883 for scl in operator.iter_in_codes()]
2885 ngroups = len(codes_list)
2886 for igroup, scl in enumerate(codes_list):
2887 for iwin in range(nwin):
2888 wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)
2890 chopped = self.get_waveforms(
2891 tmin=wmin-tpad,
2892 tmax=wmax+tpad,
2893 codes=scl,
2894 codes_exclude=codes_exclude,
2895 sample_rate_min=sample_rate_min,
2896 sample_rate_max=sample_rate_max,
2897 snap=snap,
2898 include_last=include_last,
2899 load_data=load_data,
2900 want_incomplete=want_incomplete,
2901 degap=degap,
2902 maxgap=maxgap,
2903 maxlap=maxlap,
2904 accessor_id=accessor_id,
2905 operator_params=operator_params,
2906 channel_priorities=channel_priorities)
2908 self.advance_accessor(accessor_id)
2910 yield Batch(
2911 tmin=wmin,
2912 tmax=wmax,
2913 tpad=tpad,
2914 i=iwin,
2915 n=nwin,
2916 igroup=igroup,
2917 ngroups=ngroups,
2918 traces=chopped)
2920 finally:
2921 self._n_choppers_active -= 1
2922 if clear_accessor:
2923 self.clear_accessor(accessor_id, 'waveform')
2925 async def chopper_waveforms_async(
2926 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
2927 codes_exclude=None, sample_rate_min=None, sample_rate_max=None,
2928 tinc=None, tpad=0.,
2929 want_incomplete=True, snap_window=False,
2930 degap=True, maxgap=5, maxlap=None,
2931 snap=None, include_last=False, load_data=True,
2932 accessor_id=None, clear_accessor=True, operator_params=None,
2933 grouping=None, channel_priorities=None):
2935 await asyncio.to_thread(
2936 self.chopper_waveforms, obj, tmin, tmax, time, codes,
2937 codes_exclude, sample_rate_min, sample_rate_max,
2938 tinc, tpad, want_incomplete, snap_window,
2939 degap, maxgap, maxlap, snap, include_last, load_data,
2940 accessor_id, clear_accessor, operator_params, grouping,
2941 channel_priorities)
2943 chopper_waveforms_async.__doc__ = chopper_waveforms.__doc__
2945 def _process_chopped(
2946 self, chopped, degap, maxgap, maxlap, want_incomplete, tmin, tmax):
2948 chopped.sort(key=lambda a: a.full_id)
2949 if degap:
2950 chopped = trace.degapper(chopped, maxgap=maxgap, maxlap=maxlap)
2952 if not want_incomplete:
2953 chopped_weeded = []
2954 for tr in chopped:
2955 emin = tr.tmin - tmin
2956 emax = tr.tmax + tr.deltat - tmax
2957 if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
2958 chopped_weeded.append(tr)
2960 elif degap:
2961 if (0. < emin <= 5. * tr.deltat
2962 and -5. * tr.deltat <= emax < 0.):
2964 tr.extend(tmin, tmax-tr.deltat, fillmethod='repeat')
2965 chopped_weeded.append(tr)
2967 chopped = chopped_weeded
2969 return chopped
2971 def _get_pyrocko_stations(
2972 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
2973 on_error='raise'):
2975 from pyrocko import model as pmodel
2977 if codes is not None:
2978 codes = codes_patterns_for_kind(STATION, codes)
2980 by_nsl = defaultdict(lambda: (list(), list()))
2981 for station in self.get_stations(obj, tmin, tmax, time, codes):
2982 sargs = station._get_pyrocko_station_args()
2983 by_nsl[station.codes.nsl][0].append(sargs)
2985 if codes is not None:
2986 codes = [model.CodesNSLCE(c) for c in codes]
2988 for channel in self.get_channels(obj, tmin, tmax, time, codes):
2989 sargs = channel._get_pyrocko_station_args()
2990 sargs_list, channels_list = by_nsl[channel.codes.nsl]
2991 sargs_list.append(sargs)
2992 channels_list.append(channel)
2994 pstations = []
2995 nsls = list(by_nsl.keys())
2996 nsls.sort()
2997 for nsl in nsls:
2998 sargs_list, channels_list = by_nsl[nsl]
2999 sargs = util.consistency_merge(
3000 [('',) + x for x in sargs_list],
3001 error=on_error)
3003 by_c = defaultdict(list)
3004 for ch in channels_list:
3005 by_c[ch.codes.channel].append(ch._get_pyrocko_channel_args())
3007 chas = list(by_c.keys())
3008 chas.sort()
3009 pchannels = []
3010 for cha in chas:
3011 list_of_cargs = by_c[cha]
3012 cargs = util.consistency_merge(
3013 [('',) + x for x in list_of_cargs],
3014 error=on_error)
3015 pchannels.append(pmodel.Channel(*cargs))
3017 pstations.append(
3018 pmodel.Station(*sargs, channels=pchannels))
3020 return pstations
3022 @property
3023 def pile(self):
3025 '''
3026 Emulates the older :py:class:`pyrocko.pile.Pile` interface.
3028 This property exposes a :py:class:`pyrocko.squirrel.pile.Pile` object,
3029 which emulates most of the older :py:class:`pyrocko.pile.Pile` methods
3030 but uses the fluffy power of the Squirrel under the hood.
3032 This interface can be used as a drop-in replacement for piles which are
3033 used in existing scripts and programs for efficient waveform data
3034 access. The Squirrel-based pile scales better for large datasets. Newer
3035 scripts should use Squirrel's native methods to avoid the emulation
3036 overhead.
3037 '''
3038 from . import pile
3040 if self._pile is None:
3041 self._pile = pile.Pile(self)
3043 return self._pile
3045 def snuffle(self, **kwargs):
3046 '''
3047 Look at dataset in Snuffler.
3048 '''
3049 self.pile.snuffle(**kwargs)
3051 def _gather_codes_keys(self, kind, gather, selector):
3052 return set(
3053 gather(codes)
3054 for codes in self.iter_codes(kind)
3055 if selector is None or selector(codes))
3057 def __str__(self):
3058 return str(self.get_stats())
3060 def get_coverage(
3061 self, kind, tmin=None, tmax=None, codes=None, limit=None):
3063 '''
3064 Get coverage information.
3066 Get information about strips of gapless data coverage.
3068 :param kind:
3069 Content kind to be queried.
3070 :type kind:
3071 str
3073 :param tmin:
3074 Start time of query interval.
3075 :type tmin:
3076 :py:func:`~pyrocko.util.get_time_float`
3078 :param tmax:
3079 End time of query interval.
3080 :type tmax:
3081 :py:func:`~pyrocko.util.get_time_float`
3083 :param codes:
3084 If given, restrict query to given content codes patterns.
3085 :type codes:
3086 :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Codes`
3087 objects appropriate for the queried content type, or anything which
3088 can be converted to such objects.
3090 :param limit:
3091 Limit query to return only up to a given maximum number of entries
3092 per matching time series (without setting this option, very gappy
3093 data could cause the query to execute for a very long time).
3094 :type limit:
3095 int
3097 :returns:
3098 Information about time spans covered by the requested time series
3099 data.
3100 :rtype:
3101 :py:class:`list` of :py:class:`~pyrocko.squirrel.model.Coverage`
3102 '''
3104 tmin_seconds, tmin_offset = model.tsplit(tmin)
3105 tmax_seconds, tmax_offset = model.tsplit(tmax)
3106 kind_id = to_kind_id(kind)
3108 codes_info = list(self._iter_codes_info(kind=kind))
3110 kdata_all = []
3111 if codes is None:
3112 for _, codes_entry, deltat, kind_codes_id, _ in codes_info:
3113 kdata_all.append(
3114 (codes_entry, kind_codes_id, codes_entry, deltat))
3116 else:
3117 for codes_entry in codes:
3118 pattern = to_codes(kind_id, codes_entry)
3119 for _, codes_entry, deltat, kind_codes_id, _ in codes_info:
3120 if model.match_codes(pattern, codes_entry):
3121 kdata_all.append(
3122 (pattern, kind_codes_id, codes_entry, deltat))
3124 kind_codes_ids = [x[1] for x in kdata_all]
3126 counts_at_tmin = {}
3127 if tmin is not None:
3128 for nut in self.iter_nuts(
3129 kind, tmin, tmin, kind_codes_ids=kind_codes_ids):
3131 k = nut.codes, nut.deltat or 0.0
3132 if k not in counts_at_tmin:
3133 counts_at_tmin[k] = 0
3135 counts_at_tmin[k] += 1
3137 coverages = []
3138 for pattern, kind_codes_id, codes_entry, deltat in kdata_all:
3139 entry = [pattern, codes_entry, deltat, None, None, []]
3140 for i, order in [(0, 'ASC'), (1, 'DESC')]:
3141 sql = self._sql('''
3142 SELECT
3143 time_seconds,
3144 time_offset
3145 FROM %(db)s.%(coverage)s
3146 WHERE
3147 kind_codes_id == ?
3148 ORDER BY
3149 kind_codes_id ''' + order + ''',
3150 time_seconds ''' + order + ''',
3151 time_offset ''' + order + '''
3152 LIMIT 1
3153 ''')
3155 for row in self._conn.execute(sql, [kind_codes_id]):
3156 entry[3+i] = model.tjoin(row[0], row[1])
3158 if None in entry[3:5]:
3159 continue
3161 args = [kind_codes_id]
3163 sql_time = ''
3164 if tmin is not None:
3165 # intentionally < because (== tmin) is queried from nuts
3166 sql_time += ' AND ( ? < time_seconds ' \
3167 'OR ( ? == time_seconds AND ? < time_offset ) ) '
3168 args.extend([tmin_seconds, tmin_seconds, tmin_offset])
3170 if tmax is not None:
3171 sql_time += ' AND ( time_seconds < ? ' \
3172 'OR ( ? == time_seconds AND time_offset <= ? ) ) '
3173 args.extend([tmax_seconds, tmax_seconds, tmax_offset])
3175 sql_limit = ''
3176 if limit is not None:
3177 sql_limit = ' LIMIT ?'
3178 args.append(limit)
3180 sql = self._sql('''
3181 SELECT
3182 time_seconds,
3183 time_offset,
3184 step
3185 FROM %(db)s.%(coverage)s
3186 WHERE
3187 kind_codes_id == ?
3188 ''' + sql_time + '''
3189 ORDER BY
3190 kind_codes_id,
3191 time_seconds,
3192 time_offset
3193 ''' + sql_limit)
3195 rows = list(self._conn.execute(sql, args))
3197 if limit is not None and len(rows) == limit:
3198 entry[-1] = None
3199 else:
3200 counts = counts_at_tmin.get((codes_entry, deltat or 0.0), 0)
3201 tlast = None
3202 if tmin is not None:
3203 entry[-1].append((tmin, counts))
3204 tlast = tmin
3206 for row in rows:
3207 t = model.tjoin(row[0], row[1])
3208 counts += row[2]
3209 entry[-1].append((t, counts))
3210 tlast = t
3212 if tmax is not None and (tlast is None or tlast != tmax):
3213 entry[-1].append((tmax, counts))
3215 coverages.append(model.Coverage.from_values(entry + [kind_id]))
3217 return coverages
3219 def get_stationxml(
3220 self, obj=None, tmin=None, tmax=None, time=None, codes=None,
3221 level='response', on_error='raise'):
3223 '''
3224 Get station/channel/response metadata in StationXML representation.
3226 %(query_args)s
3228 :returns:
3229 :py:class:`~pyrocko.io.stationxml.FDSNStationXML` object.
3230 '''
3232 if level not in ('network', 'station', 'channel', 'response'):
3233 raise ValueError('Invalid level: %s' % level)
3235 tmin, tmax, codes = self._get_selection_args(
3236 CHANNEL, obj, tmin, tmax, time, codes)
3238 def tts(t):
3239 if t is None:
3240 return '<none>'
3241 else:
3242 return util.tts(t, format='%Y-%m-%d %H:%M:%S')
3244 if on_error == 'ignore':
3245 def handle_error(exc):
3246 pass
3248 elif on_error == 'warn':
3249 def handle_error(exc):
3250 logger.warning(str(exc))
3252 elif on_error == 'raise':
3253 def handle_error(exc):
3254 raise exc
3256 def use_first(node_type_name, codes, k, group):
3257 if on_error == 'warn' and len(group) > 1:
3258 logger.warning(
3259 'Duplicates for %s %s, %s - %s -> using first' % (
3260 node_type_name,
3261 '.'.join(codes),
3262 tts(k[0]), tts(k[1])))
3264 return group[0]
3266 def deduplicate(node_type_name, codes, nodes):
3267 groups = defaultdict(list)
3268 for node in nodes:
3269 k = (node.start_date, node.end_date)
3270 groups[k].append(node)
3272 return [
3273 use_first(node_type_name, codes, k, group)
3274 for (k, group) in groups.items()]
3276 filtering = CodesPatternFiltering(codes=codes)
3278 nslcs = list(set(
3279 codes.nslc for codes in
3280 filtering.filter(self.get_codes(kind='channel'))))
3282 from pyrocko.io import stationxml as sx
3284 networks = []
3285 task_networks = make_task('StationXML: add networks')
3286 for net, stas in task_networks(prefix_tree(nslcs)):
3287 network = sx.Network(code=net)
3288 networks.append(network)
3290 if level not in ('station', 'channel', 'response'):
3291 continue
3293 task_stations = make_task('StationXML: add stations')
3294 for sta, locs in task_stations(stas):
3295 stations = self.get_stations(
3296 tmin=tmin,
3297 tmax=tmax,
3298 codes=(net, sta, '*'),
3299 model='stationxml')
3301 if on_error != 'raise':
3302 stations = deduplicate(
3303 'Station', (net, sta), stations)
3305 errors = sx.check_overlaps(
3306 'Station', (net, sta), stations)
3308 if errors:
3309 handle_error(error.Duplicate(
3310 'Overlapping/duplicate station info:\n %s'
3311 % '\n '.join(errors)))
3313 network.station_list.extend(stations)
3315 if level not in ('channel', 'response'):
3316 continue
3318 for loc, chas in locs:
3319 for cha, _ in chas:
3320 channels = self.get_channels(
3321 tmin=tmin,
3322 tmax=tmax,
3323 codes=(net, sta, loc, cha),
3324 model='stationxml')
3326 if on_error != 'raise':
3327 channels = deduplicate(
3328 'Channel', (net, sta, loc, cha), channels)
3330 errors = sx.check_overlaps(
3331 'Channel', (net, sta, loc, cha), channels)
3333 if errors:
3334 handle_error(error.Duplicate(
3335 'Overlapping/duplicate channel info:\n %s'
3336 % '\n '.join(errors)))
3338 for channel in channels:
3339 station = sx.find_containing(stations, channel)
3340 if station is not None:
3341 station.channel_list.append(channel)
3342 else:
3343 handle_error(error.NotAvailable(
3344 'No station or station epoch found '
3345 'for channel: %s' % '.'.join(
3346 (net, sta, loc, cha))))
3348 continue
3350 if level != 'response':
3351 continue
3353 try:
3354 response_sq, response_sx = self.get_response(
3355 codes=(net, sta, loc, cha),
3356 tmin=channel.start_date,
3357 tmax=channel.end_date,
3358 model='stationxml+',
3359 on_duplicate=on_error)
3361 except error.NotAvailable as e:
3362 handle_error(e)
3363 continue
3365 if not (
3366 sx.eq_open(
3367 channel.start_date, response_sq.tmin)
3368 and sx.eq_open(
3369 channel.end_date, response_sq.tmax)):
3371 handle_error(error.Inconsistencies(
3372 'Response time span does not match '
3373 'channel time span: %s' % '.'.join(
3374 (net, sta, loc, cha))))
3376 channel.response = response_sx
3378 return sx.FDSNStationXML(
3379 source='Generated by Pyrocko Squirrel.',
3380 network_list=networks)
3382 def add_operator(self, op):
3383 self._operators.append(op)
3385 def update_operator_mappings(self):
3386 available = self.get_codes(kind=('channel'))
3388 for operator in self._operators:
3389 operator.update_mappings(available, self._operator_registry)
3391 def iter_operator_mappings(self):
3392 for operator in self._operators:
3393 for in_codes, out_codes in operator.iter_mappings():
3394 yield operator, in_codes, out_codes
3396 def get_operator_mappings(self):
3397 return list(self.iter_operator_mappings())
3399 def get_operator(self, codes):
3400 try:
3401 return self._operator_registry[codes][0]
3402 except KeyError:
3403 return None
3405 def get_operator_group(self, codes):
3406 try:
3407 return self._operator_registry[codes]
3408 except KeyError:
3409 return None, (None, None, None)
3411 def iter_operator_codes(self):
3412 for _, _, out_codes in self.iter_operator_mappings():
3413 for codes in out_codes:
3414 yield codes
3416 def get_operator_codes(self):
3417 return list(self.iter_operator_codes())
3419 def get_sources(self):
3420 return self._sources
def print_tables(self, table_names=None, stream=None):
    '''
    Dump raw database tables in textual form (for debugging purposes).

    :param table_names:
        Name of a single table, names of tables to be dumped or ``None``
        to dump all. Known names are ``'selection_file_states'``,
        ``'selection_nuts'``, ``'selection_kind_codes_count'``,
        ``'files'``, ``'nuts'``, ``'kind_codes'`` and
        ``'kind_codes_count'``.
    :type table_names:
        :py:class:`str` or :py:class:`list` of :py:class:`str`

    :param stream:
        Open file or ``None`` to dump to standard output.

    :raises KeyError:
        If any requested table name is unknown. All names are checked
        before anything is dumped, so no partial output is written.
    '''

    # Public table name -> SQL table name. The selection tables are
    # qualified with a database name and get their actual table names
    # interpolated from ``self._names``. Insertion order doubles as the
    # default dump order.
    m = {
        'selection_file_states': '%(db)s.%(file_states)s',
        'selection_nuts': '%(db)s.%(nuts)s',
        'selection_kind_codes_count': '%(db)s.%(kind_codes_count)s',
        'files': 'files',
        'nuts': 'nuts',
        'kind_codes': 'kind_codes',
        'kind_codes_count': 'kind_codes_count'}

    if stream is None:
        stream = sys.stdout

    if isinstance(table_names, str):
        table_names = [table_names]

    if table_names is None:
        table_names = list(m)
    else:
        # Materialize so that one-shot iterables survive both the
        # validation pass and the dumping pass.
        table_names = list(table_names)

    # Validate up front so a typo does not leave a half-written dump.
    unknown = [name for name in table_names if name not in m]
    if unknown:
        raise KeyError(
            'Unknown table name(s): %s. Known names: %s.' % (
                ', '.join(repr(name) for name in unknown),
                ', '.join(m)))

    for table_name in table_names:
        self._database.print_table(
            m[table_name] % self._names, stream=stream)
class SquirrelStats(Object):
    '''
    Container to hold statistics about contents available from a Squirrel.

    See also :py:meth:`Squirrel.get_stats`.
    '''

    nfiles = Int.T(
        help='Number of files in selection.')
    nnuts = Int.T(
        help='Number of index nuts in selection.')
    codes = List.T(
        Tuple.T(content_t=String.T()),
        help='Available code sequences in selection, e.g. '
             '(agency, network, station, location) for station nuts.')
    kinds = List.T(
        String.T(),
        help='Available content types in selection.')
    total_size = Int.T(
        help='Aggregated file size of files in selection.')
    counts = Dict.T(
        String.T(), Dict.T(Tuple.T(content_t=String.T()), Int.T()),
        help='Breakdown of how many nuts of any content type and code '
             'sequence are available in selection, ``counts[kind][codes]``.')
    time_spans = Dict.T(
        String.T(), Tuple.T(content_t=Timestamp.T()),
        help='Time spans by content type.')
    sources = List.T(
        String.T(),
        help='Descriptions of attached sources.')
    operators = List.T(
        String.T(),
        help='Descriptions of attached operators.')

    def __str__(self):
        '''
        Format the statistics as a human-readable multi-line summary.

        Expects ``counts`` and ``time_spans`` to have an entry for every
        kind listed in ``kinds``.
        '''
        # Total number of nuts per content kind, summed over all codes.
        kind_counts = {
            kind: sum(self.counts[kind].values()) for kind in self.kinds}

        scodes = model.codes_to_str_abbreviated(self.codes)

        ssources = '<none>' if not self.sources else '\n' + '\n'.join(
            ' ' + source for source in self.sources)

        soperators = '<none>' if not self.operators else '\n' + '\n'.join(
            ' ' + operator for operator in self.operators)

        def stime(t):
            # Missing times and the global extreme times model.g_tmin /
            # model.g_tmax (presumably open-span sentinels) print as
            # '<none>'.
            return util.tts(t) if t is not None and t not in (
                model.g_tmin, model.g_tmax) else '<none>'

        def stable(rows):
            # Left-justify every column to the width of its widest entry.
            ns = [max(len(w) for w in col) for col in zip(*rows)]
            return '\n'.join(
                ' '.join(w.ljust(n) for n, w in zip(ns, row))
                for row in rows)

        def indent(s):
            return '\n'.join(' '+line for line in s.splitlines())

        # One table row per kind: "kind: count tmin - tmax".
        stspans = '<none>' if not self.kinds else '\n' + indent(stable([(
            kind + ':',
            str(kind_counts[kind]),
            stime(self.time_spans[kind][0]),
            '-',
            stime(self.time_spans[kind][1])) for kind in sorted(self.kinds)]))

        s = '''
Number of files: %i
Total size of known files: %s
Number of index nuts: %i
Available content kinds: %s
Available codes: %s
Sources: %s
Operators: %s''' % (
            self.nfiles,
            util.human_bytesize(self.total_size),
            self.nnuts,
            stspans, scodes, ssources, soperators)

        return s.lstrip()
# Public API of this module, i.e. the names bound by a star-import.
__all__ = ['Batch', 'Squirrel', 'SquirrelStats']