# Source code for pyrocko.squirrel.pile

# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

import logging
import weakref
from pyrocko import squirrel as psq, trace, util
from pyrocko import pile as classic_pile

logger = logging.getLogger('psq.pile')


def trace_callback_to_nut_callback(trace_callback):
    '''
    Adapt a callback taking a trace into a callback taking a nut.

    :param trace_callback:
        callable accepting a trace-like object, or ``None``
    :returns:
        ``None`` if ``trace_callback`` is ``None``, otherwise a callable which
        takes a nut and returns the result of calling ``trace_callback`` with
        the nut's ``dummy_trace`` attribute
    '''
    if trace_callback is not None:
        def call_with_dummy_trace(nut):
            # The wrapped callback never sees the nut itself, only its
            # ``dummy_trace`` stand-in.
            dummy = nut.dummy_trace
            return trace_callback(dummy)

        return call_with_dummy_trace

    return None


class CodesDummyTrace(object):
    '''
    Minimal trace stand-in carrying only channel identification codes.

    The first four elements of ``codes`` are exposed as the attributes
    ``network``, ``station``, ``location`` and ``channel`` and, as a whole, as
    ``nslc_id``.
    '''

    def __init__(self, codes):
        # Slice once; the same object is unpacked into the individual
        # attributes first and then stored as ``nslc_id``.
        nslc = codes[0:4]
        (self.network,
         self.station,
         self.location,
         self.channel) = nslc
        self.nslc_id = nslc


def trace_callback_to_codes_callback(trace_callback):
    '''
    Adapt a callback taking a trace into a callback taking a codes object.

    :param trace_callback:
        callable accepting a trace-like object, or ``None``
    :returns:
        ``None`` if ``trace_callback`` is ``None``, otherwise a callable which
        wraps its ``codes`` argument into a :py:class:`CodesDummyTrace` and
        returns the result of calling ``trace_callback`` with it
    '''
    if trace_callback is not None:
        def call_with_codes_dummy(codes):
            # A fresh dummy trace is built on every call.
            dummy = CodesDummyTrace(codes)
            return trace_callback(dummy)

        return call_with_codes_dummy

    return None


class Pile(object):
    '''
    :py:class:`pyrocko.pile.Pile` surrogate: waveform lookup, loading and
    caching.

    This class emulates most of the older :py:class:`pyrocko.pile.Pile` methods
    by using calls to a :py:class:`pyrocko.squirrel.base.Squirrel` instance
    behind the scenes.

    This interface can be used as a drop-in replacement for piles which are
    used in existing scripts and programs for efficient waveform data access.
    The Squirrel-based pile scales better for large datasets. Newer scripts
    should use Squirrel's native methods to avoid the emulation overhead.

    .. note::

        Many methods in the original pile implementation lack documentation, as
        do here. Read the source, Luke!
    '''

    def __init__(self, squirrel=None):
        '''
        :param squirrel:
            squirrel instance to delegate to. If ``None``, a new
            ``psq.Squirrel()`` is created.
        '''
        if squirrel is None:
            squirrel = psq.Squirrel()

        self._squirrel = squirrel

        # Weak references to objects registered through add_listener().
        self._listeners = []

        # Forward change notifications of the squirrel's database to the
        # listeners of this pile.
        self._squirrel.get_database().add_listener(
            self._notify_squirrel_to_pile)

    def _notify_squirrel_to_pile(self, event, *args):
        '''
        Database listener: relay ``event`` to the pile's own listeners.

        Any extra positional arguments are dropped.
        '''
        self.notify_listeners(event)

    def add_listener(self, obj):
        '''
        Register ``obj`` to be notified through ``obj.pile_changed(what)``.

        Only a weak reference to ``obj`` is stored, so registering does not
        keep ``obj`` alive.
        '''
        self._listeners.append(weakref.ref(obj))

    def notify_listeners(self, what):
        '''
        Call ``pile_changed(what)`` on every registered listener still alive.
        '''
        for ref in self._listeners:
            obj = ref()
            # A dead weak reference yields None. Test for exactly that instead
            # of truthiness, so that live listeners which happen to evaluate
            # as false (e.g. objects with a zero ``__len__``) are not skipped.
            if obj is not None:
                obj.pile_changed(what)

    def get_tmin(self):
        '''
        Return the value of the :py:attr:`tmin` property.
        '''
        return self.tmin

    def get_tmax(self):
        '''
        Return the value of the :py:attr:`tmax` property.
        '''
        return self.tmax

    def get_deltatmin(self):
        '''
        Return the first element of the squirrel's ``'waveform'`` deltat span.
        '''
        return self._squirrel.get_deltat_span('waveform')[0]

    def get_deltatmax(self):
        '''
        Return the second element of the squirrel's ``'waveform'`` deltat span.
        '''
        return self._squirrel.get_deltat_span('waveform')[1]

    @property
    def deltatmin(self):
        '''
        Same as :py:meth:`get_deltatmin`.
        '''
        return self.get_deltatmin()

    @property
    def deltatmax(self):
        '''
        Same as :py:meth:`get_deltatmax`.
        '''
        return self.get_deltatmax()

    @property
    def tmin(self):
        '''
        First element of the squirrel's ``'waveform'`` time span.

        :py:meth:`chopper` treats a value of ``None`` as "pile may be empty".
        '''
        return self._squirrel.get_time_span('waveform')[0]

    @property
    def tmax(self):
        '''
        Second element of the squirrel's ``'waveform'`` time span.

        :py:meth:`chopper` treats a value of ``None`` as "pile may be empty".
        '''
        return self._squirrel.get_time_span('waveform')[1]

    @property
    def networks(self):
        '''
        Set of ``network`` attributes of the squirrel's ``'waveform'`` codes.
        '''
        return {
            codes.network
            for codes in self._squirrel.get_codes('waveform')}

    @property
    def stations(self):
        '''
        Set of ``station`` attributes of the squirrel's ``'waveform'`` codes.
        '''
        return {
            codes.station
            for codes in self._squirrel.get_codes('waveform')}

    @property
    def locations(self):
        '''
        Set of ``location`` attributes of the squirrel's ``'waveform'`` codes.
        '''
        return {
            codes.location
            for codes in self._squirrel.get_codes('waveform')}

    @property
    def channels(self):
        '''
        Set of ``channel`` attributes of the squirrel's ``'waveform'`` codes.
        '''
        return {
            codes.channel
            for codes in self._squirrel.get_codes('waveform')}

    def is_relevant(self, tmin, tmax):
        '''
        Check whether the interval ``[tmin, tmax]`` touches available data.

        The interval is compared against the squirrel's combined time span of
        the kinds ``'waveform'`` and ``'waveform_promise'``.

        :returns:
            ``False`` if either end of that time span is ``None``, otherwise
            whether the two intervals overlap (touching ends count as overlap)
        '''
        ptmin, ptmax = self._squirrel.get_time_span(
            ['waveform', 'waveform_promise'])

        if None in (ptmin, ptmax):
            return False

        return tmax >= ptmin and ptmax >= tmin

    def load_files(
            self,
            filenames,
            filename_attributes=None,
            fileformat='mseed',
            cache=None,
            show_progress=True,
            update_progress=None):
        '''
        Add waveform files to the underlying squirrel.

        Only ``filenames`` and ``fileformat`` are used. The remaining
        arguments are accepted but ignored by this implementation.
        '''
        self._squirrel.add(
            filenames, kinds='waveform', format=fileformat)

    def chop(
            self, tmin, tmax,
            nut_selector=None,
            snap=(round, round),
            include_last=False,
            load_data=True,
            accessor_id='default'):
        '''
        Extract waveforms for the time window ``[tmin, tmax]``.

        :param tmin: start time of the window
        :param tmax: end time of the window
        :param nut_selector:
            filter callback taking a nut; nuts for which it returns a false
            value are skipped. ``None`` selects everything.
        :param snap: passed on to the ``chop`` method of each trace
        :param include_last: passed on to the ``chop`` method of each trace
        :param load_data:
            if true, trace contents are obtained from the squirrel, otherwise
            traces are built from the nuts' ``trace_kwargs`` and any sample
            data is stripped before chopping
        :param accessor_id:
            accessor key handed to the squirrel's content access and
            ``advance_accessor`` calls
        :returns:
            tuple ``(chopped, used_files)``, where ``chopped`` is the list of
            chopped traces and ``used_files`` is a set which is always empty
            in this implementation
        '''
        nuts = self._squirrel.get_waveform_nuts(tmin=tmin, tmax=tmax)

        if load_data:
            traces = [
                self._squirrel.get_content(nut, 'waveform', accessor_id)
                for nut in nuts
                if nut_selector is None or nut_selector(nut)]
        else:
            traces = [
                trace.Trace(**nut.trace_kwargs)
                for nut in nuts
                if nut_selector is None or nut_selector(nut)]

        self._squirrel.advance_accessor(accessor_id)

        chopped = []
        used_files = set()
        for tr in traces:
            if not load_data and tr.ydata is not None:
                # Work on a data-less copy so that the original trace keeps
                # its samples.
                tr = tr.copy(data=False)
                tr.ydata = None

            try:
                chopped.append(tr.chop(
                    tmin, tmax,
                    inplace=False,
                    snap=snap,
                    include_last=include_last))

            except trace.NoData:
                # Traces without data in the requested window are left out.
                pass

        return chopped, used_files

    def _process_chopped(
            self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
            tpad):
        '''
        Post-process the traces extracted for one window.

        Note the argument order: ``wmax`` comes before ``wmin``.

        * ``chopped`` is sorted in place by ``full_id``.
        * If ``degap`` is set, the traces are passed through
          ``trace.degapper`` with ``maxgap`` and ``maxlap``.
        * If ``want_incomplete`` is false, only traces are kept whose start
          and end match the padded window ``[wmin-tpad, wmax+tpad]`` to within
          half a sampling interval. With ``degap`` set, traces which are short
          by at most five sampling intervals at both ends are extended to the
          padded window using ``fillmethod='repeat'`` and kept as well.
        * The attributes ``wmin`` and ``wmax`` are set on every returned
          trace.

        :returns: list of processed traces
        '''
        chopped.sort(key=lambda a: a.full_id)
        if degap:
            chopped = trace.degapper(chopped, maxgap=maxgap, maxlap=maxlap)

        if not want_incomplete:
            chopped_weeded = []
            for tr in chopped:
                # Offsets of the trace's span against the padded window.
                emin = tr.tmin - (wmin-tpad)
                emax = tr.tmax + tr.deltat - (wmax+tpad)
                if (abs(emin) <= 0.5*tr.deltat
                        and abs(emax) <= 0.5*tr.deltat):

                    chopped_weeded.append(tr)

                elif degap:
                    if (0. < emin <= 5. * tr.deltat
                            and -5. * tr.deltat <= emax < 0.):

                        tr.extend(
                            wmin-tpad,
                            wmax+tpad-tr.deltat,
                            fillmethod='repeat')

                        chopped_weeded.append(tr)

            chopped = chopped_weeded

        for tr in chopped:
            tr.wmin = wmin
            tr.wmax = wmax

        return chopped

    def chopper(
            self,
            tmin=None, tmax=None, tinc=None, tpad=0.,
            trace_selector=None,
            want_incomplete=True, degap=True, maxgap=5, maxlap=None,
            keep_current_files_open=False, accessor_id='default',
            snap=(round, round), include_last=False, load_data=True,
            style=None):
        '''
        Get iterator for shifting window wise data extraction from waveform
        archive.

        :param tmin: start time (default uses start time of available data)
        :param tmax: end time (default uses end time of available data)
        :param tinc: time increment (window shift time) (default uses
            ``tmax-tmin``)
        :param tpad: padding time appended on either side of the data windows
            (window overlap is ``2*tpad``)
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects
        :param want_incomplete: if set to ``False``, gappy/incomplete traces
            are discarded from the results
        :param degap: whether to try to connect traces and to remove gaps and
            overlaps
        :param maxgap: maximum gap size in samples which is filled with
            interpolated samples when ``degap`` is ``True``
        :param maxlap: maximum overlap size in samples which is removed when
            ``degap`` is ``True``
        :param keep_current_files_open: whether to keep cached trace data in
            memory after the iterator has ended
        :param accessor_id: if given, used as a key to identify different
            points of extraction for the decision of when to release cached
            trace data (should be used when data is alternately extracted from
            more than one region / selection)
        :param snap: replaces Python's :py:func:`round` function which is used
            to determine indices where to start and end the trace data array
        :param include_last: whether to include last sample
        :param load_data: whether to load the waveform data. If set to
            ``False``, traces with no data samples, but with correct
            meta-information are returned
        :param style: set to ``'batch'`` to yield waveforms and information
            about the chopper state as :py:class:`pyrocko.pile.Batch` objects.
            By default lists of :py:class:`pyrocko.trace.Trace` objects are
            yielded.
        :returns: iterator providing extracted waveforms for each extracted
            window. See ``style`` argument for details.
        '''
        if tmin is None:
            if self.tmin is None:
                logger.warning('Pile\'s tmin is not set - pile may be empty.')
                return
            tmin = self.tmin + tpad

        if tmax is None:
            if self.tmax is None:
                logger.warning('Pile\'s tmax is not set - pile may be empty.')
                return
            tmax = self.tmax - tpad

        if tinc is None:
            tinc = tmax - tmin

        if not self.is_relevant(tmin-tpad, tmax+tpad):
            return

        nut_selector = trace_callback_to_nut_callback(trace_selector)

        # Small tolerance so that a span which is an exact multiple of tinc
        # does not produce an extra, empty window at the end.
        eps = tinc * 1e-6
        if tinc != 0.0:
            nwin = int(((tmax - eps) - tmin) / tinc) + 1
        else:
            nwin = 1

        for iwin in range(nwin):
            # The last window is clipped to tmax.
            wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)

            chopped, used_files = self.chop(
                wmin-tpad, wmax+tpad, nut_selector, snap,
                include_last, load_data, accessor_id)

            processed = self._process_chopped(
                chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
                tpad)

            if style == 'batch':
                yield classic_pile.Batch(
                    tmin=wmin,
                    tmax=wmax,
                    i=iwin,
                    n=nwin,
                    traces=processed)

            else:
                yield processed

        # Only reached when the iterator has been run to its end.
        if not keep_current_files_open:
            self._squirrel.clear_accessor(accessor_id, 'waveform')

    def chopper_grouped(self, gather, progress=None, *args, **kwargs):
        '''
        Run :py:meth:`chopper` once per group of traces.

        :param gather:
            callback taking a trace-like object and returning its group key
        :param progress:
            if not ``None``, passed with the number of keys to
            ``util.progressbar``; the resulting progress bar is updated after
            each group
        :param args: positional arguments passed on to :py:meth:`chopper`
        :param kwargs:
            keyword arguments passed on to :py:meth:`chopper`. A given
            ``trace_selector`` is combined with the group selection.
        :returns:
            iterator yielding whatever :py:meth:`chopper` yields, group after
            group
        '''
        keys = self.gather_keys(gather)
        if len(keys) == 0:
            return

        outer_trace_selector = None
        if 'trace_selector' in kwargs:
            outer_trace_selector = kwargs['trace_selector']

        # the use of this gather-cache makes it impossible to modify the pile
        # during chopping
        pbar = None
        if progress is not None:
            pbar = util.progressbar(progress, len(keys))

        for ikey, key in enumerate(keys):
            # tsel is only used while this iteration's chopper is being
            # consumed, so closing over the loop variable ``key`` is safe.
            def tsel(tr):
                return gather(tr) == key and (
                    outer_trace_selector is None
                    or outer_trace_selector(tr))

            kwargs['trace_selector'] = tsel

            for traces in self.chopper(*args, **kwargs):
                yield traces

            if pbar:
                pbar.update(ikey+1)

        if pbar:
            pbar.finish()

    def reload_modified(self):
        '''
        Delegate to the squirrel's ``reload`` method.
        '''
        self._squirrel.reload()

    def iter_traces(
            self,
            load_data=False,
            return_abspath=False,
            trace_selector=None):
        '''
        Iterate over all traces in pile.

        :param load_data: whether to load the waveform data, by default empty
            traces are yielded
        :param return_abspath: if ``True`` yield tuples containing absolute
            file path and :py:class:`pyrocko.trace.Trace` objects
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects

        In this implementation, ``load_data`` and ``return_abspath`` must
        both be false; this is enforced with assertions.
        '''
        assert not load_data
        assert not return_abspath

        nut_selector = trace_callback_to_nut_callback(trace_selector)

        for nut in self._squirrel.get_waveform_nuts():
            if nut_selector is None or nut_selector(nut):
                yield trace.Trace(**nut.trace_kwargs)

    def gather_keys(self, gather, selector=None):
        '''
        Get the group keys of the available waveform codes.

        :param gather:
            callback taking a trace-like object and returning its group key
        :param selector:
            optional filter callback taking a trace-like object
        :returns:
            result of the squirrel's ``_gather_codes_keys`` for kind
            ``'waveform'``, with both callbacks adapted to take codes objects
        '''
        codes_gather = trace_callback_to_codes_callback(gather)
        codes_selector = trace_callback_to_codes_callback(selector)
        return self._squirrel._gather_codes_keys(
            'waveform', codes_gather, codes_selector)

    def snuffle(self, **kwargs):
        '''Visualize it.

        :param stations: list of `pyrocko.model.Station` objects or ``None``
        :param events: list of `pyrocko.model.Event` objects or ``None``
        :param markers: list of `pyrocko.gui_util.Marker` objects or ``None``
        :param ntracks: float, number of tracks to be shown initially
            (default: 12)
        :param follow: time interval (in seconds) for real time follow mode or
            ``None``
        :param controls: bool, whether to show the main controls (default:
            ``True``)
        :param opengl: bool, whether to use opengl (default: ``False``)
        '''
        # Imported here so that the GUI is only pulled in when it is used.
        from pyrocko.gui.snuffler import snuffle
        snuffle(self, **kwargs)

    def add_file(self, mtf):
        '''
        Add the traces of an in-memory traces file to the squirrel.

        :param mtf:
            :py:class:`pyrocko.pile.MemTracesFile` object. Other types are
            rejected with an assertion. The name returned by the squirrel is
            stored on ``mtf`` as ``_squirrel_name`` for later removal.
        '''
        if isinstance(mtf, classic_pile.MemTracesFile):
            name = self._squirrel.add_volatile_waveforms(mtf.get_traces())
            mtf._squirrel_name = name
        else:
            assert False

    def remove_file(self, mtf):
        '''
        Remove an in-memory traces file previously added with
        :py:meth:`add_file`.

        Does nothing if ``mtf`` is not a
        :py:class:`pyrocko.pile.MemTracesFile` or carries no
        ``_squirrel_name``.
        '''
        if isinstance(mtf, classic_pile.MemTracesFile) \
                and getattr(mtf, '_squirrel_name', False):

            self._squirrel.remove(mtf._squirrel_name)
            mtf._squirrel_name = None

    def is_empty(self):
        '''
        Return ``True`` if ``'waveform'`` is not among the squirrel's kinds.
        '''
        return 'waveform' not in self._squirrel.get_kinds()

    def get_update_count(self):
        '''
        Always return ``0``.
        '''
        return 0
def get_cache(_):
    '''
    Always return ``None``; the single argument is ignored.
    '''
    return None