1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
5from __future__ import absolute_import
7import os
8import logging
10from . import pile, io
11from . import trace as tracemod
13logger = logging.getLogger('pyrocko.hamster_pile')
16class Processor(object):
17 def __init__(self):
18 self._buffers = {}
20 def process(self, trace):
21 return [trace]
23 def get_buffer(self, trace):
25 nslc = trace.nslc_id
26 if nslc not in self._buffers:
27 return None
29 trbuf = self._buffers[nslc]
30 if (abs((trbuf.tmax+trbuf.deltat) - trace.tmin) < 1.0e-1*trbuf.deltat
31 and trbuf.ydata.dtype == trace.ydata.dtype
32 and trbuf.deltat == trace.deltat):
33 return trbuf
34 else:
35 return None
37 def set_buffer(self, trace):
38 nslc = trace.nslc_id
39 self._buffers[nslc] = trace
41 def empty_buffer(self, trace):
42 nslc = trace.nslc_id
43 del self._buffers[nslc]
45 def flush_buffers(self):
46 traces = list(self._buffers.values())
47 self._buffers = {}
48 return traces
51class Renamer(object):
52 def __init__(self, mapping):
53 self._mapping = mapping
55 def process(self, trace):
56 target_id = self._mapping(trace)
57 if target_id is None:
58 return []
60 out = trace.copy()
61 out.set_codes(*target_id)
62 return [out]
65class Chain(Processor):
66 def __init__(self, *processors):
67 self._processors = processors
69 def process(self, trace):
70 traces = [trace]
71 xtraces = []
72 for p in self._processors:
73 for tr in traces:
74 xtraces.extend(p.process(traces))
76 return xtraces
79class Downsampler(Processor):
81 def __init__(self, mapping, deltat):
82 Processor.__init__(self)
83 self._mapping = mapping
84 self._downsampler = tracemod.co_downsample_to(deltat)
86 def process(self, trace):
87 target_id = self._mapping(trace)
88 if target_id is None:
89 return []
91 ds_trace = self._downsampler.send(trace)
92 ds_trace.set_codes(*target_id)
93 if ds_trace.data_len() == 0:
94 return []
96 return [ds_trace]
98 def __del__(self):
99 self._downsampler.close()
102class Grower(Processor):
104 def __init__(self, tflush=None):
105 Processor.__init__(self)
106 self._tflush = tflush
108 def process(self, trace):
109 buffer = self.get_buffer(trace)
110 if buffer is None:
111 buffer = trace
112 self.set_buffer(buffer)
113 else:
114 buffer.append(trace.ydata)
116 if buffer.tmax - buffer.tmin >= self._tflush:
117 self.empty_buffer(buffer)
118 return [buffer]
120 else:
121 return []
124class HamsterPile(pile.Pile):
126 def __init__(
127 self,
128 fixation_length=None,
129 path=None,
130 format='from_extension',
131 forget_fixed=False,
132 processors=None):
134 pile.Pile.__init__(self)
135 self._buffers = {} # keys: nslc, values: MemTracesFile
136 self._fixation_length = fixation_length
137 self._format = format
138 self._path = path
139 self._forget_fixed = forget_fixed
140 if processors is None:
141 self._processors = [Processor()]
142 else:
143 self._processors = []
144 for p in processors:
145 self.add_processor(p)
147 def set_fixation_length(self, length):
148 '''
149 Set length after which the fixation method is called on buffer traces.
151 The length should be given in seconds. Give None to disable.
152 '''
153 self.fixate_all()
154 self._fixation_length = length # in seconds
156 def set_save_path(
157 self,
158 path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
159 '%(tmin)s_%(tmax)s.mseed'):
161 self.fixate_all()
162 self._path = path
164 def add_processor(self, processor):
165 self.fixate_all()
166 self._processors.append(processor)
168 def insert_trace(self, trace):
169 logger.debug('Received a trace: %s' % trace)
171 for p in self._processors:
172 for tr in p.process(trace):
173 self._insert_trace(tr)
175 def _insert_trace(self, trace):
176 buf = self._append_to_buffer(trace)
177 nslc = trace.nslc_id
179 if buf is None: # create new buffer trace
180 if nslc in self._buffers:
181 self._fixate(self._buffers[nslc])
183 trbuf = trace.copy()
184 buf = pile.MemTracesFile(None, [trbuf])
185 self.add_file(buf)
186 self._buffers[nslc] = buf
188 buf.recursive_grow_update([trace])
189 trbuf = buf.get_traces()[0]
190 if self._fixation_length is not None:
191 if trbuf.tmax - trbuf.tmin > self._fixation_length:
192 self._fixate(buf)
193 del self._buffers[nslc]
195 def _append_to_buffer(self, trace):
196 '''
197 Try to append the trace to the active buffer traces.
199 Returns the current buffer trace or None if unsuccessful.
200 '''
202 nslc = trace.nslc_id
203 if nslc not in self._buffers:
204 return None
206 buf = self._buffers[nslc]
207 trbuf = buf.get_traces()[0]
208 if (abs((trbuf.tmax+trbuf.deltat) - trace.tmin) < 1.0e-1*trbuf.deltat
209 and trbuf.ydata.dtype == trace.ydata.dtype
210 and trbuf.deltat == trace.deltat):
212 trbuf.append(trace.ydata)
213 return buf
215 return None
217 def fixate_all(self):
218 for buf in self._buffers.values():
219 self._fixate(buf)
221 self._buffers = {}
223 def _fixate(self, buf):
224 if self._path:
225 trbuf = buf.get_traces()[0]
226 fns = io.save([trbuf], self._path, format=self._format)
228 self.remove_file(buf)
229 if not self._forget_fixed:
230 self.load_files(
231 fns, show_progress=False, fileformat=self._format)
233 def drop_older(self, tmax, delete_disk_files=False):
234 self.drop(
235 condition=lambda file: file.tmax < tmax,
236 delete_disk_files=delete_disk_files)
238 def drop(self, condition, delete_disk_files=False):
239 candidates = []
240 buffers = list(self._buffers.values())
241 for file in self.iter_files():
242 if condition(file) and file not in buffers:
243 candidates.append(file)
245 self.remove_files(candidates)
246 if delete_disk_files:
247 for file in candidates:
248 if file.abspath and os.path.exists(file.abspath):
249 os.unlink(file.abspath)
251 def __del__(self):
252 self.fixate_all()