1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import os
7import logging
9from . import pile, io
10from . import trace as tracemod
12logger = logging.getLogger('pyrocko.hamster_pile')
class Processor(object):
    '''
    Base class for trace processors.

    The default :py:meth:`process` hands every trace through unchanged.
    In addition, one pending "buffer" trace can be kept per ``nslc_id``,
    which subclasses may use to accumulate data between calls.
    '''

    def __init__(self):
        # Pending buffer traces, keyed by the ``nslc_id`` of the trace.
        self._buffers = dict()

    def process(self, trace):
        '''
        Process a single trace and return a list of resulting traces.

        The base implementation returns the input trace as the only element.
        '''
        return [trace]

    def get_buffer(self, trace):
        '''
        Get the pending buffer trace which ``trace`` seamlessly continues.

        Returns the buffer stored under the ``nslc_id`` of ``trace`` when
        ``trace`` starts one sampling interval after the end of the buffer
        (within a tolerance of a tenth of the buffer's ``deltat``) and both
        share the same data type and sampling interval. Returns ``None`` in
        all other cases, including when no buffer is stored.
        '''
        try:
            pending = self._buffers[trace.nslc_id]
        except KeyError:
            return None

        # Distance between the expected and the actual start of ``trace``.
        offset = abs((pending.tmax + pending.deltat) - trace.tmin)
        if not offset < 1.0e-1*pending.deltat:
            return None

        if pending.ydata.dtype != trace.ydata.dtype:
            return None

        if pending.deltat != trace.deltat:
            return None

        return pending

    def set_buffer(self, trace):
        '''
        Store ``trace`` as the pending buffer for its ``nslc_id``.

        A previously stored buffer with the same id is replaced.
        '''
        self._buffers[trace.nslc_id] = trace

    def empty_buffer(self, trace):
        '''
        Forget the pending buffer stored under the ``nslc_id`` of ``trace``.

        Raises :py:exc:`KeyError` if there is no such buffer.
        '''
        del self._buffers[trace.nslc_id]

    def flush_buffers(self):
        '''
        Remove all pending buffers and return them as a list.
        '''
        pending, self._buffers = self._buffers, {}
        return list(pending.values())
class Renamer(object):
    '''
    Processor-like object which re-labels traces.

    ``mapping`` is a callable taking a trace and returning either the new
    codes to be passed to the trace's ``set_codes()`` method, or ``None`` to
    discard the trace.
    '''

    def __init__(self, mapping):
        self._mapping = mapping

    def process(self, trace):
        '''
        Return a list with a renamed copy of ``trace``.

        The input trace itself is left untouched. An empty list is returned
        when the mapping yields ``None`` for the trace.
        '''
        codes = self._mapping(trace)
        if codes is not None:
            renamed = trace.copy()
            renamed.set_codes(*codes)
            return [renamed]

        return []
class Chain(Processor):
    '''
    Processor which pipes traces through a sequence of processors.

    The output traces of each processor are fed, one by one, into the next
    processor. The traces produced by the last processor are the result.
    '''

    def __init__(self, *processors):
        # Initialise the base class so that the inherited buffer methods
        # (e.g. ``flush_buffers``) find their ``_buffers`` attribute.
        Processor.__init__(self)
        self._processors = processors

    def process(self, trace):
        '''
        Run ``trace`` through all processors of the chain.

        Returns the list of traces coming out of the last processor. With an
        empty chain, the input trace is passed through unchanged. If any
        stage produces no traces, the following stages are not invoked and
        an empty list is returned.
        '''
        traces = [trace]
        for p in self._processors:
            # Collect the output of this stage separately, so that only the
            # traces produced by it are handed on to the next stage.
            stage_output = []
            for tr in traces:
                stage_output.extend(p.process(tr))

            traces = stage_output

        return traces
class Downsampler(Processor):
    '''
    Processor which downsamples traces and re-labels the result.

    ``mapping`` is a callable taking a trace and returning the codes for the
    downsampled trace, or ``None`` to discard the trace. ``deltat`` is
    handed to ``trace.co_downsample_to()`` to set up the downsampling
    coroutine.
    '''

    def __init__(self, mapping, deltat):
        Processor.__init__(self)
        self._mapping = mapping
        self._downsampler = tracemod.co_downsample_to(deltat)

    def process(self, trace):
        '''
        Downsample ``trace`` and return the result as a one-element list.

        An empty list is returned when the mapping yields ``None`` for the
        trace or when the downsampled trace contains no samples.
        '''
        codes = self._mapping(trace)
        if codes is None:
            return []

        # NOTE(review): relies on ``send()`` of the downsampling coroutine
        # handing back a trace object - confirm against
        # ``trace.co_downsample_to``.
        decimated = self._downsampler.send(trace)
        decimated.set_codes(*codes)
        return [decimated] if decimated.data_len() != 0 else []

    def __del__(self):
        # Shut down the downsampling coroutine together with this object.
        self._downsampler.close()
class Grower(Processor):
    '''
    Processor which joins contiguous traces into longer ones.

    Incoming traces are appended to a pending buffer trace (one per
    ``nslc_id``) until the buffer spans at least ``tflush``, measured as
    ``tmax - tmin`` of the buffer trace. The buffer is then emitted and
    forgotten.

    With ``tflush=None``, nothing is accumulated and every trace is emitted
    immediately.
    '''

    def __init__(self, tflush=None):
        Processor.__init__(self)
        self._tflush = tflush

    def process(self, trace):
        '''
        Feed ``trace`` into the grower.

        Returns a list with the traces which are ready to be passed on. The
        list may be empty (still accumulating), or contain a completed
        buffer and/or a pending buffer which had to be released because
        ``trace`` does not seamlessly continue it.

        Note that the first trace of a buffer is used as the buffer itself,
        so it is modified in place when subsequent traces are appended.
        '''
        ready = []
        grown = self.get_buffer(trace)
        if grown is None:
            # Either there is no pending buffer, or ``trace`` does not
            # continue it (gap, overlap, changed dtype or sampling rate).
            # Release the pending buffer instead of silently overwriting it,
            # so that its data is not lost.
            stale = self._buffers.pop(trace.nslc_id, None)
            if stale is not None:
                ready.append(stale)

            grown = trace
            self.set_buffer(grown)
        else:
            grown.append(trace.ydata)

        # ``None`` cannot be ordered against numbers on Python 3, so it is
        # handled explicitly and treated as "flush right away".
        tflush = self._tflush
        if tflush is None or grown.tmax - grown.tmin >= tflush:
            self.empty_buffer(grown)
            ready.append(grown)

        return ready
class HamsterPile(pile.Pile):
    '''
    Pile which collects incoming traces in growing in-memory buffers.

    Inserted traces are run through the configured processors and appended
    to a buffer trace per ``nslc_id``. Buffers are "fixated" when they
    exceed the fixation length, when a trace arrives which does not continue
    the buffer, or when :py:meth:`fixate_all` is called. If a save path is
    set, fixating a buffer writes it to disk and removes it from the pile;
    unless ``forget_fixed`` is set, the written files are loaded back into
    the pile. Without a save path, fixated buffers just stay in the pile.
    '''

    def __init__(
            self,
            fixation_length=None,
            path=None,
            format='from_extension',
            forget_fixed=False,
            processors=None):

        pile.Pile.__init__(self)

        # Active buffers; keys: nslc, values: MemTracesFile
        self._buffers = {}
        self._fixation_length = fixation_length
        self._format = format
        self._path = path
        self._forget_fixed = forget_fixed

        if processors is not None:
            self._processors = []
            for proc in processors:
                self.add_processor(proc)
        else:
            # By default, traces are inserted as they come in.
            self._processors = [Processor()]

    def set_fixation_length(self, length):
        '''
        Set length after which the fixation method is called on buffer traces.

        The length should be given in seconds. Give None to disable.
        All currently active buffers are fixated before the change.
        '''
        self.fixate_all()
        self._fixation_length = length  # in seconds

    def set_save_path(
            self,
            path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
                 '%(tmin)s_%(tmax)s.mseed'):
        '''
        Set the filename template used when fixated buffers are saved.

        All currently active buffers are fixated (with the previous path
        setting) before the change.
        '''
        self.fixate_all()
        self._path = path

    def add_processor(self, processor):
        '''
        Append a processor; active buffers are fixated first.
        '''
        self.fixate_all()
        self._processors.append(processor)

    def insert_trace(self, trace):
        '''
        Feed a trace to every processor and insert what they return.
        '''
        logger.debug('Received a trace: %s' % trace)

        for proc in self._processors:
            for processed in proc.process(trace):
                self._insert_trace(processed)

    def _insert_trace(self, trace):
        '''
        Add an already processed trace to the buffer of its ``nslc_id``.

        A new buffer is started when the trace cannot be appended to the
        active one; in that case the active one is fixated first.
        '''
        buf = self._append_to_buffer(trace)
        nslc = trace.nslc_id

        if buf is None:  # create new buffer trace
            if nslc in self._buffers:
                stale = self._buffers[nslc]
                self._fixate(stale)

            # The buffer holds a copy, so that the inserted trace is not
            # modified when the buffer grows.
            buf = pile.MemTracesFile(None, [trace.copy()])
            self.add_file(buf)
            self._buffers[nslc] = buf

        buf.recursive_grow_update([trace])
        trbuf = buf.get_traces()[0]
        limit = self._fixation_length
        if limit is not None and trbuf.tmax - trbuf.tmin > limit:
            self._fixate(buf)
            del self._buffers[nslc]

    def _append_to_buffer(self, trace):
        '''
        Try to append the trace to the active buffer traces.

        Returns the current buffer trace or None if unsuccessful.
        '''
        try:
            buf = self._buffers[trace.nslc_id]
        except KeyError:
            return None

        trbuf = buf.get_traces()[0]

        # The trace must start one sample after the end of the buffer trace,
        # within a tolerance of a tenth of the sampling interval.
        offset = abs((trbuf.tmax + trbuf.deltat) - trace.tmin)
        if not offset < 1.0e-1*trbuf.deltat:
            return None

        if trbuf.ydata.dtype != trace.ydata.dtype:
            return None

        if trbuf.deltat != trace.deltat:
            return None

        trbuf.append(trace.ydata)
        return buf

    def fixate_all(self):
        '''
        Fixate all active buffers and forget about them.
        '''
        for buf in self._buffers.values():
            self._fixate(buf)

        self._buffers = {}

    def _fixate(self, buf):
        '''
        Write a buffer to disk, if a save path is set.

        The buffer is then removed from the pile and, unless fixated data is
        to be forgotten, replaced by the files which have been written.
        Nothing is done when no save path is set.
        '''
        if not self._path:
            return

        trbuf = buf.get_traces()[0]
        fns = io.save([trbuf], self._path, format=self._format)

        self.remove_file(buf)
        if self._forget_fixed:
            return

        self.load_files(
            fns, show_progress=False, fileformat=self._format)

    def drop_older(self, tmax, delete_disk_files=False):
        '''
        Drop all non-buffer files which end before ``tmax``.
        '''
        def is_older(candidate):
            return candidate.tmax < tmax

        self.drop(
            condition=is_older,
            delete_disk_files=delete_disk_files)

    def drop(self, condition, delete_disk_files=False):
        '''
        Remove files for which ``condition(file)`` is true from the pile.

        Active buffers are never dropped. With ``delete_disk_files`` set,
        the files on disk belonging to the dropped entries are deleted, too.
        '''
        active = list(self._buffers.values())
        candidates = [
            entry for entry in self.iter_files()
            if condition(entry) and entry not in active]

        self.remove_files(candidates)
        if delete_disk_files:
            for entry in candidates:
                if entry.abspath and os.path.exists(entry.abspath):
                    os.unlink(entry.abspath)

    def __del__(self):
        # Make sure that pending buffers get fixated when the pile goes away.
        self.fixate_all()