# Source code for pyrocko.hamster_pile

# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from __future__ import absolute_import

import os
import logging

from . import pile, io
from . import trace as tracemod

# Module-level logger; HamsterPile.insert_trace uses it for debug output
# about every received trace.
logger = logging.getLogger('pyrocko.hamster_pile')


class Processor(object):
    '''
    Pass-through trace processor with one buffered trace per channel.

    Buffers are stored in a dict keyed by the ``nslc_id`` of the trace.
    The buffer helpers are building blocks for subclasses; the default
    :py:meth:`process` does not use them.
    '''

    def __init__(self):
        self._buffers = {}  # keys: nslc_id, values: buffered trace

    def process(self, trace):
        '''
        Process one incoming trace.

        :param trace: trace object to be processed
        :returns: list of resulting traces; here, a list holding the
            unmodified input trace
        '''
        return [trace]

    def get_buffer(self, trace):
        '''
        Get the buffered trace which ``trace`` would seamlessly continue.

        A buffer qualifies if it is stored under the same ``nslc_id``, if
        ``trace.tmin`` lies within a tenth of a sample of the buffer's next
        expected sample time (``tmax + deltat``) and if both the data type
        of ``ydata`` and the sampling interval agree.

        :returns: the matching buffered trace or ``None``
        '''
        try:
            candidate = self._buffers[trace.nslc_id]
        except KeyError:
            return None

        expected_tmin = candidate.tmax + candidate.deltat
        tolerance = 1.0e-1*candidate.deltat

        # The checks are done one after another, in this order, so that
        # later attributes are only touched when the earlier checks pass.
        if not abs(expected_tmin - trace.tmin) < tolerance:
            return None

        if candidate.ydata.dtype != trace.ydata.dtype:
            return None

        if candidate.deltat != trace.deltat:
            return None

        return candidate

    def set_buffer(self, trace):
        '''
        Store ``trace`` as the buffer of its channel.

        A previously stored buffer with the same ``nslc_id`` is replaced.
        '''
        self._buffers[trace.nslc_id] = trace

    def empty_buffer(self, trace):
        '''
        Remove the buffer stored under the ``nslc_id`` of ``trace``.

        :raises KeyError: if no buffer is stored for that channel
        '''
        del self._buffers[trace.nslc_id]

    def flush_buffers(self):
        '''
        Remove all buffers.

        :returns: list of the traces which have been buffered
        '''
        flushed, self._buffers = list(self._buffers.values()), {}
        return flushed
class Renamer(object):
    '''
    Processor which hands out renamed copies of incoming traces.

    :param mapping: callable taking a trace and returning the new codes as
        a sequence suitable for ``trace.set_codes(*codes)``, or ``None`` if
        the trace should be dropped
    '''

    def __init__(self, mapping):
        self._mapping = mapping

    def process(self, trace):
        '''
        Rename a copy of ``trace`` according to the mapping.

        The input trace itself is left untouched.

        :returns: list with the renamed copy, or an empty list if the
            mapping returned ``None``
        '''
        codes = self._mapping(trace)
        if codes is None:
            return []

        renamed = trace.copy()
        renamed.set_codes(*codes)
        return [renamed]
class Chain(Processor):
    '''
    Processor which runs a sequence of processors one after another.

    Every trace emitted by one stage is fed into the next stage; the
    traces emitted by the last stage are the result. A chain without any
    stages hands the input trace through unchanged.

    :param processors: processor objects, in the order in which they
        should be applied
    '''

    def __init__(self, *processors):
        # Initialise the base class so that the inherited buffer helpers
        # (get_buffer, flush_buffers, ...) find their ``_buffers`` dict.
        Processor.__init__(self)
        self._processors = processors

    def process(self, trace):
        '''
        Send ``trace`` through all stages of the chain.

        :returns: list of traces emitted by the final stage (possibly
            empty, if some stage swallowed everything)
        '''
        traces = [trace]
        for p in self._processors:
            xtraces = []
            for tr in traces:
                # Each stage gets single traces, never the whole list.
                xtraces.extend(p.process(tr))

            # Output of this stage is the input of the next one.
            traces = xtraces

        return traces
class Downsampler(Processor):
    '''
    Processor which downsamples incoming traces and renames the result.

    :param mapping: callable taking a trace and returning the codes for
        the downsampled trace as a sequence suitable for
        ``trace.set_codes(*codes)``, or ``None`` to drop the trace
    :param deltat: value handed to ``trace.co_downsample_to``
        (presumably the target sampling interval in seconds -- confirm
        against that function)
    '''

    def __init__(self, mapping, deltat):
        Processor.__init__(self)
        self._mapping = mapping
        # NOTE(review): this assumes co_downsample_to(deltat) returns an
        # already started coroutine whose send() hands back the
        # downsampled trace -- confirm against its definition.
        self._downsampler = tracemod.co_downsample_to(deltat)

    def process(self, trace):
        '''
        Downsample ``trace`` and set the mapped codes on the result.

        :returns: list with the downsampled trace; empty list if the
            mapping returned ``None`` or if the downsampled trace has no
            samples
        '''
        target_id = self._mapping(trace)
        if target_id is None:
            return []

        ds_trace = self._downsampler.send(trace)
        ds_trace.set_codes(*target_id)
        if ds_trace.data_len() == 0:
            return []

        return [ds_trace]

    def __del__(self):
        # __del__ also runs on objects whose __init__ raised before the
        # coroutine was created; do not fail on the missing attribute then.
        downsampler = getattr(self, '_downsampler', None)
        if downsampler is not None:
            downsampler.close()
class Grower(Processor):
    '''
    Processor which glues contiguous traces together before emitting them.

    Incoming traces are appended to a per-channel buffer as long as they
    seamlessly continue it (see :py:meth:`Processor.get_buffer`). A buffer
    is emitted once it spans at least ``tflush`` seconds.

    :param tflush: time span in seconds after which a grown trace is
        emitted. With ``None``, buffers are never emitted because of their
        length; they are only handed out when a gap displaces them or via
        :py:meth:`Processor.flush_buffers`.
    '''

    def __init__(self, tflush=None):
        Processor.__init__(self)
        self._tflush = tflush

    def process(self, trace):
        '''
        Append ``trace`` to its channel buffer or start a new buffer.

        When a new buffer is started, the incoming trace object itself
        becomes the buffer and is extended in place by later calls.

        :returns: list of traces which are complete: a previously buffered
            trace which ``trace`` could not be appended to, and/or the
            current buffer if it has reached the flush length
        '''
        finished = []

        grown = self.get_buffer(trace)
        if grown is None:
            # Either there is no buffer yet or the trace does not fit onto
            # the existing one (gap, overlap, different dtype or sampling
            # interval). Hand out the old buffer instead of silently
            # overwriting it.
            stale = self._buffers.get(trace.nslc_id)
            if stale is not None:
                self.empty_buffer(stale)
                finished.append(stale)

            grown = trace
            self.set_buffer(grown)
        else:
            grown.append(trace.ydata)

        # tflush=None disables length based flushing; comparing a float
        # against None would raise TypeError on Python 3.
        if self._tflush is not None \
                and grown.tmax - grown.tmin >= self._tflush:

            self.empty_buffer(grown)
            finished.append(grown)

        return finished
class HamsterPile(pile.Pile):
    '''
    Pile which collects incoming traces in growing per-channel buffers.

    Incoming traces are run through the configured processors and the
    results are appended to in-memory buffer files, one per ``nslc_id``.
    When a buffer gets longer than the fixation length (or when a trace
    cannot be appended to it) the buffer is "fixated": if a save path is
    set, it is written to disk, removed from the pile and, unless
    ``forget_fixed`` is set, the saved files are loaded back into the pile.

    :param fixation_length: buffer length in seconds after which a buffer
        is fixated, ``None`` to disable
    :param path: filename template handed to ``io.save`` when fixating
    :param format: file format handed to ``io.save`` and ``load_files``
    :param forget_fixed: if true, saved files are not loaded back
    :param processors: iterable of processor objects; if ``None``, a
        single pass-through :py:class:`Processor` is used
    '''

    def __init__(
            self,
            fixation_length=None,
            path=None,
            format='from_extension',
            forget_fixed=False,
            processors=None):

        pile.Pile.__init__(self)
        self._buffers = {}  # keys: nslc, values: MemTracesFile
        self._fixation_length = fixation_length
        self._format = format
        self._path = path
        self._forget_fixed = forget_fixed

        self._processors = []
        if processors is None:
            self._processors.append(Processor())
        else:
            for proc in processors:
                self.add_processor(proc)

    def set_fixation_length(self, length):
        '''
        Set length after which the fixation method is called on buffer
        traces.

        The length should be given in seconds. Give None to disable.
        All active buffers are fixated before the new length takes effect.
        '''
        self.fixate_all()
        self._fixation_length = length  # in seconds

    def set_save_path(
            self,
            path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
                 '%(tmin)s_%(tmax)s.mseed'):
        '''
        Set the filename template used when saving fixated buffers.

        All active buffers are fixated, still using the previous path,
        before the new path is stored.
        '''
        self.fixate_all()
        self._path = path

    def add_processor(self, processor):
        '''
        Append a processor; all active buffers are fixated beforehand.
        '''
        self.fixate_all()
        self._processors.append(processor)

    def insert_trace(self, trace):
        '''
        Feed a trace into the pile.

        Each processor gets the incoming trace; every trace a processor
        returns is inserted into the buffers.
        '''
        logger.debug('Received a trace: %s' % trace)

        for proc in self._processors:
            for processed in proc.process(trace):
                self._insert_trace(processed)

    def _insert_trace(self, trace):
        '''
        Append a trace to its channel buffer, creating a buffer if needed.
        '''
        memfile = self._append_to_buffer(trace)
        nslc = trace.nslc_id

        if memfile is None:
            # The trace does not continue an active buffer. Fixate the old
            # buffer of this channel, if any, and start a new one.
            if nslc in self._buffers:
                self._fixate(self._buffers[nslc])

            memfile = pile.MemTracesFile(None, [trace.copy()])
            self.add_file(memfile)
            self._buffers[nslc] = memfile

        memfile.recursive_grow_update([trace])

        if self._fixation_length is None:
            return

        grown = memfile.get_traces()[0]
        if grown.tmax - grown.tmin > self._fixation_length:
            self._fixate(memfile)
            del self._buffers[nslc]

    def _append_to_buffer(self, trace):
        '''
        Try to append the trace to the active buffer traces.

        Appending succeeds if a buffer exists for the ``nslc_id`` of the
        trace, if ``trace.tmin`` is within a tenth of a sample of the
        buffer's next expected sample time and if data type and sampling
        interval agree.

        Returns the current buffer trace or None if unsuccessful.
        '''
        if trace.nslc_id not in self._buffers:
            return None

        memfile = self._buffers[trace.nslc_id]
        grown = memfile.get_traces()[0]

        expected_tmin = grown.tmax + grown.deltat
        fits = (
            abs(expected_tmin - trace.tmin) < 1.0e-1*grown.deltat
            and grown.ydata.dtype == trace.ydata.dtype
            and grown.deltat == trace.deltat)

        if not fits:
            return None

        grown.append(trace.ydata)
        return memfile

    def fixate_all(self):
        '''
        Fixate all active buffers and forget them as buffers.
        '''
        for memfile in self._buffers.values():
            self._fixate(memfile)

        self._buffers = {}

    def _fixate(self, buf):
        '''
        Save a buffer to disk and replace it in the pile.

        Does nothing if no save path is set; the buffer then stays in the
        pile as it is.
        '''
        if not self._path:
            return

        fns = io.save([buf.get_traces()[0]], self._path, format=self._format)
        self.remove_file(buf)

        if self._forget_fixed:
            return

        self.load_files(fns, show_progress=False, fileformat=self._format)

    def drop_older(self, tmax, delete_disk_files=False):
        '''
        Drop all files whose ``tmax`` is before the given time.

        :param tmax: time limit; files with ``file.tmax < tmax`` go
        :param delete_disk_files: passed on to :py:meth:`drop`
        '''

        def ends_before(file):
            return file.tmax < tmax

        self.drop(
            condition=ends_before,
            delete_disk_files=delete_disk_files)

    def drop(self, condition, delete_disk_files=False):
        '''
        Remove files matching a condition from the pile.

        Active buffers are never dropped.

        :param condition: callable taking a file object, returning true
            for files which should be removed
        :param delete_disk_files: if true, the dropped files which have an
            existing ``abspath`` are also deleted from disk
        '''
        active = list(self._buffers.values())
        candidates = [
            file for file in self.iter_files()
            if condition(file) and file not in active]

        self.remove_files(candidates)

        if not delete_disk_files:
            return

        for file in candidates:
            if file.abspath and os.path.exists(file.abspath):
                os.unlink(file.abspath)

    def __del__(self):
        # Make sure that buffered data gets fixated when the pile goes away.
        self.fixate_all()