Source code for pyrocko.hamster_pile

# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from __future__ import absolute_import

import os
import logging

from . import pile, io
from . import trace as tracemod

logger = logging.getLogger('pyrocko.hamster_pile')


class Processor(object):
    def __init__(self):
        self._buffers = {}

    def process(self, trace):
        return [trace]

    def get_buffer(self, trace):

        nslc = trace.nslc_id
        if nslc not in self._buffers:
            return None

        trbuf = self._buffers[nslc]
        if (abs((trbuf.tmax+trbuf.deltat) - trace.tmin) < 1.0e-1*trbuf.deltat
                and trbuf.ydata.dtype == trace.ydata.dtype
                and trbuf.deltat == trace.deltat):
            return trbuf
        else:
            return None

    def set_buffer(self, trace):
        nslc = trace.nslc_id
        self._buffers[nslc] = trace

    def empty_buffer(self, trace):
        nslc = trace.nslc_id
        del self._buffers[nslc]

    def flush_buffers(self):
        traces = list(self._buffers.values())
        self._buffers = {}
        return traces


class Renamer(object):
    def __init__(self, mapping):
        self._mapping = mapping

    def process(self, trace):
        target_id = self._mapping(trace)
        if target_id is None:
            return []

        out = trace.copy()
        out.set_codes(*target_id)
        return [out]


class Chain(Processor):
    def __init__(self, *processors):
        self._processors = processors

    def process(self, trace):
        traces = [trace]
        xtraces = []
        for p in self._processors:
            for tr in traces:
                xtraces.extend(p.process(traces))

        return xtraces


class Downsampler(Processor):

    def __init__(self, mapping, deltat):
        Processor.__init__(self)
        self._mapping = mapping
        self._downsampler = tracemod.co_downsample_to(deltat)

    def process(self, trace):
        target_id = self._mapping(trace)
        if target_id is None:
            return []

        ds_trace = self._downsampler.send(trace)
        ds_trace.set_codes(*target_id)
        if ds_trace.data_len() == 0:
            return []

        return [ds_trace]

    def __del__(self):
        self._downsampler.close()


class Grower(Processor):

    def __init__(self, tflush=None):
        Processor.__init__(self)
        self._tflush = tflush

    def process(self, trace):
        buffer = self.get_buffer(trace)
        if buffer is None:
            buffer = trace
            self.set_buffer(buffer)
        else:
            buffer.append(trace.ydata)

        if buffer.tmax - buffer.tmin >= self._tflush:
            self.empty_buffer(buffer)
            return [buffer]

        else:
            return []


[docs]class HamsterPile(pile.Pile): def __init__( self, fixation_length=None, path=None, format='from_extension', forget_fixed=False, processors=None): pile.Pile.__init__(self) self._buffers = {} # keys: nslc, values: MemTracesFile self._fixation_length = fixation_length self._format = format self._path = path self._forget_fixed = forget_fixed if processors is None: self._processors = [Processor()] else: self._processors = [] for p in processors: self.add_processor(p)
[docs] def set_fixation_length(self, length): '''Set length after which the fixation method is called on buffer traces. The length should be given in seconds. Give None to disable. ''' self.fixate_all() self._fixation_length = length # in seconds
def set_save_path( self, path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_' '%(tmin)s_%(tmax)s.mseed'): self.fixate_all() self._path = path def add_processor(self, processor): self.fixate_all() self._processors.append(processor) def insert_trace(self, trace): logger.debug('Received a trace: %s' % trace) for p in self._processors: for tr in p.process(trace): self._insert_trace(tr) def _insert_trace(self, trace): buf = self._append_to_buffer(trace) nslc = trace.nslc_id if buf is None: # create new buffer trace if nslc in self._buffers: self._fixate(self._buffers[nslc]) trbuf = trace.copy() buf = pile.MemTracesFile(None, [trbuf]) self.add_file(buf) self._buffers[nslc] = buf buf.recursive_grow_update([trace]) trbuf = buf.get_traces()[0] if self._fixation_length is not None: if trbuf.tmax - trbuf.tmin > self._fixation_length: self._fixate(buf) del self._buffers[nslc] def _append_to_buffer(self, trace): '''Try to append the trace to the active buffer traces. Returns the current buffer trace or None if unsuccessful. ''' nslc = trace.nslc_id if nslc not in self._buffers: return None buf = self._buffers[nslc] trbuf = buf.get_traces()[0] if (abs((trbuf.tmax+trbuf.deltat) - trace.tmin) < 1.0e-1*trbuf.deltat and trbuf.ydata.dtype == trace.ydata.dtype and trbuf.deltat == trace.deltat): trbuf.append(trace.ydata) return buf return None def fixate_all(self): for buf in self._buffers.values(): self._fixate(buf) self._buffers = {} def _fixate(self, buf): if self._path: trbuf = buf.get_traces()[0] fns = io.save([trbuf], self._path, format=self._format) self.remove_file(buf) if not self._forget_fixed: self.load_files( fns, show_progress=False, fileformat=self._format) def drop_older(self, tmax, delete_disk_files=False): self.drop( condition=lambda file: file.tmax < tmax, delete_disk_files=delete_disk_files) def drop(self, condition, delete_disk_files=False): candidates = [] buffers = list(self._buffers.values()) for file in self.iter_files(): if condition(file) and file not in buffers: candidates.append(file) self.remove_files(candidates) if delete_disk_files: for file in candidates: if file.abspath and os.path.exists(file.abspath): os.unlink(file.abspath) def __del__(self): self.fixate_all()