Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/hamster_pile.py: 25%
158 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 15:01 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 15:01 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Real-time data acquisition and processing helpers for :py:mod:`pyrocko.pile`.
8'''
10import os
11import logging
13from . import pile, io
14from . import trace as tracemod
16logger = logging.getLogger('pyrocko.hamster_pile')
class Processor(object):
    '''
    Pass-through trace processor with a per-channel buffer store.

    Buffers are kept in a dict keyed by the ``nslc_id`` of the trace. The
    base implementation of :py:meth:`process` hands every trace back
    unchanged; the buffer helpers are there for subclasses.
    '''

    def __init__(self):
        self._buffers = {}

    def process(self, trace):
        '''
        Return the traces resulting from ``trace`` (here: ``[trace]``).
        '''
        return [trace]

    def get_buffer(self, trace):
        '''
        Look up the buffered trace which ``trace`` directly continues.

        Returns the buffered trace stored under the ``nslc_id`` of ``trace``
        if ``trace`` starts within a tenth of a sample of the expected next
        sample time and both have equal data type and sampling interval.
        Returns ``None`` in all other cases.
        '''
        try:
            pending = self._buffers[trace.nslc_id]
        except KeyError:
            return None

        # Offset between where the buffer expects its next sample and where
        # the new trace actually begins.
        mismatch = abs((pending.tmax + pending.deltat) - trace.tmin)
        if not (mismatch < 1.0e-1 * pending.deltat):
            return None

        if not (pending.ydata.dtype == trace.ydata.dtype):
            return None

        if not (pending.deltat == trace.deltat):
            return None

        return pending

    def set_buffer(self, trace):
        '''
        Store ``trace`` as the buffer for its ``nslc_id``.
        '''
        self._buffers[trace.nslc_id] = trace

    def empty_buffer(self, trace):
        '''
        Remove the buffer stored under the ``nslc_id`` of ``trace``.

        Raises :py:exc:`KeyError` if there is no such buffer.
        '''
        self._buffers.pop(trace.nslc_id)

    def flush_buffers(self):
        '''
        Return all buffered traces as a list and forget them.
        '''
        flushed, self._buffers = list(self._buffers.values()), {}
        return flushed
class Renamer(object):
    '''
    Processor which hands out renamed copies of incoming traces.

    ``mapping`` is a callable taking a trace and returning the codes to be
    passed to ``set_codes()`` of the copy, or ``None`` to discard the trace.
    '''

    def __init__(self, mapping):
        self._mapping = mapping

    def process(self, trace):
        '''
        Return ``[renamed_copy]``, or ``[]`` if the mapping gives ``None``.

        The incoming trace itself is left untouched.
        '''
        new_codes = self._mapping(trace)
        if new_codes is not None:
            renamed = trace.copy()
            renamed.set_codes(*new_codes)
            return [renamed]

        return []
class Chain(Processor):
    '''
    Run several processors one after another.

    Every trace emitted by one processor is fed into the next one; the
    traces emitted by the last processor are the result of the chain.
    '''

    def __init__(self, *processors):
        # Initialise the base class so that the inherited buffer helpers
        # (e.g. flush_buffers) find their ``_buffers`` dict.
        Processor.__init__(self)
        self._processors = processors

    def process(self, trace):
        '''
        Pass ``trace`` through all processors of the chain.

        Returns the list of traces produced by the last processor. A chain
        without processors returns ``[trace]``. If a stage emits nothing,
        the remaining stages receive no input and the result is ``[]``.
        '''
        traces = [trace]
        for p in self._processors:
            # Collect the output of this stage separately, so that it (and
            # only it) becomes the input of the following stage.
            stage_output = []
            for tr in traces:
                stage_output.extend(p.process(tr))

            traces = stage_output

        return traces
class Downsampler(Processor):
    '''
    Processor emitting downsampled and renamed traces.

    ``mapping`` is a callable taking a trace and returning the codes to be
    set on the downsampled trace, or ``None`` to discard the trace.
    ``deltat`` is handed to ``tracemod.co_downsample_to()``.
    '''

    def __init__(self, mapping, deltat):
        Processor.__init__(self)
        self._mapping = mapping
        # NOTE(review): this assumes co_downsample_to(deltat) returns a
        # generator which is ready to receive traces through send() and
        # answers each send() with a trace - confirm against pyrocko.trace.
        self._downsampler = tracemod.co_downsample_to(deltat)

    def process(self, trace):
        '''
        Return ``[downsampled_trace]``, or ``[]`` if nothing is to be emitted.

        Nothing is emitted when the mapping returns ``None`` for ``trace`` or
        when the downsampled trace holds no samples.
        '''
        target_id = self._mapping(trace)
        if target_id is None:
            return []

        ds_trace = self._downsampler.send(trace)
        ds_trace.set_codes(*target_id)
        if ds_trace.data_len() == 0:
            return []

        return [ds_trace]

    def __del__(self):
        # __del__ also runs for objects whose __init__ raised before
        # ``_downsampler`` was assigned; do not fail with an AttributeError
        # in that case.
        downsampler = getattr(self, '_downsampler', None)
        if downsampler is not None:
            downsampler.close()
class Grower(Processor):
    '''
    Processor joining contiguous traces into longer ones.

    Incoming traces are collected in one buffer per ``nslc_id``. A buffer is
    emitted once its time span ``tmax - tmin`` has reached ``tflush``, or
    when an incoming trace of the same ``nslc_id`` does not directly
    continue it. With ``tflush=None``, no flushing based on the time span is
    done; buffered traces can then be collected with ``flush_buffers()``.

    The first trace of every buffer is used as the buffer itself, so it is
    modified in place when later traces are appended.
    '''

    def __init__(self, tflush=None):
        Processor.__init__(self)
        self._tflush = tflush

    def process(self, trace):
        '''
        Buffer ``trace`` and return the list of traces which are complete.
        '''
        finished = []
        buffer = self.get_buffer(trace)
        if buffer is None:
            # Either there is no buffer yet or ``trace`` does not continue
            # it. In the latter case hand out the old buffer instead of
            # silently overwriting (and losing) it.
            stale = self._buffers.pop(trace.nslc_id, None)
            if stale is not None and stale is not trace:
                finished.append(stale)

            buffer = trace
            self.set_buffer(buffer)
        else:
            buffer.append(trace.ydata)

        # ``None`` disables flushing by time span; comparing a float with
        # None would raise a TypeError.
        if self._tflush is not None \
                and buffer.tmax - buffer.tmin >= self._tflush:

            self.empty_buffer(buffer)
            finished.append(buffer)

        return finished
class HamsterPile(pile.Pile):
    '''
    Pile which is fed with traces arriving piece by piece.

    Inserted traces are run through the configured processors. The resulting
    traces are collected in one in-memory buffer file per ``nslc_id``. When
    a buffer is "fixated" and a save path is set, its trace is written to
    disk, the buffer is removed from the pile and, unless ``forget_fixed``
    is set, the written files are loaded into the pile.
    '''

    def __init__(
            self,
            fixation_length=None,
            path=None,
            format='from_extension',
            forget_fixed=False,
            processors=None):

        pile.Pile.__init__(self)
        self._buffers = {}  # nslc id -> pile.MemTracesFile being grown
        self._path = path
        self._format = format
        self._fixation_length = fixation_length
        self._forget_fixed = forget_fixed
        if processors is not None:
            self._processors = []
            for processor in processors:
                self.add_processor(processor)
        else:
            # Without explicit processors, traces are passed through as-is.
            self._processors = [Processor()]

    def set_fixation_length(self, length):
        '''
        Set the time span a buffer trace may exceed before it is fixated.

        ``length`` is given in seconds; ``None`` turns automatic fixation
        off. All current buffers are fixated before the new value is set.
        '''
        self.fixate_all()
        self._fixation_length = length

    def set_save_path(
            self,
            path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
                 '%(tmin)s_%(tmax)s.mseed'):
        '''
        Set the path template used when saving fixated buffers.

        All current buffers are fixated before the new path is set.
        '''
        self.fixate_all()
        self._path = path

    def add_processor(self, processor):
        '''
        Fixate all current buffers, then append ``processor``.
        '''
        self.fixate_all()
        self._processors.append(processor)

    def insert_trace(self, trace):
        '''
        Feed ``trace`` to every processor and insert whatever they emit.

        Each processor receives the original ``trace``; the processors are
        not chained.
        '''
        logger.debug('Received a trace: %s' % trace)

        for processor in self._processors:
            for processed in processor.process(trace):
                self._insert_trace(processed)

    def _insert_trace(self, trace):
        '''
        Append ``trace`` to its buffer, starting a new buffer if needed.
        '''
        nslc = trace.nslc_id
        buf = self._append_to_buffer(trace)

        if buf is None:
            # The trace could not be appended: fixate a possibly existing
            # buffer and start a new one from a copy of the trace.
            if nslc in self._buffers:
                self._fixate(self._buffers[nslc])

            buf = pile.MemTracesFile(None, [trace.copy()])
            self.add_file(buf)
            self._buffers[nslc] = buf

        buf.recursive_grow_update([trace])
        grown = buf.get_traces()[0]
        if self._fixation_length is None:
            return

        if grown.tmax - grown.tmin > self._fixation_length:
            self._fixate(buf)
            del self._buffers[nslc]

    def _append_to_buffer(self, trace):
        '''
        Try to append ``trace`` to the active buffer of its ``nslc_id``.

        Returns the buffer file on success. Returns ``None`` if there is no
        buffer or if ``trace`` does not continue the buffered trace (start
        time within a tenth of a sample of the expected one, same data type
        and same sampling interval).
        '''
        nslc = trace.nslc_id
        if nslc not in self._buffers:
            return None

        buf = self._buffers[nslc]
        head = buf.get_traces()[0]

        mismatch = abs((head.tmax + head.deltat) - trace.tmin)
        fits = (
            mismatch < 1.0e-1 * head.deltat
            and head.ydata.dtype == trace.ydata.dtype
            and head.deltat == trace.deltat)

        if not fits:
            return None

        head.append(trace.ydata)
        return buf

    def fixate_all(self):
        '''
        Fixate every active buffer and forget all of them.
        '''
        for buf in self._buffers.values():
            self._fixate(buf)

        self._buffers = {}

    def _fixate(self, buf):
        '''
        Write the trace of ``buf`` to disk and replace ``buf`` in the pile.

        Does nothing if no save path is set.
        '''
        if not self._path:
            return

        saved = io.save(
            [buf.get_traces()[0]], self._path, format=self._format)

        self.remove_file(buf)
        if self._forget_fixed:
            return

        self.load_files(
            saved, show_progress=False, fileformat=self._format)

    def drop_older(self, tmax, delete_disk_files=False):
        '''
        Drop all files whose ``tmax`` is less than the given ``tmax``.

        See :py:meth:`drop` for the meaning of ``delete_disk_files``.
        '''
        self.drop(
            condition=lambda file: file.tmax < tmax,
            delete_disk_files=delete_disk_files)

    def drop(self, condition, delete_disk_files=False):
        '''
        Remove the files for which ``condition(file)`` is true.

        Active buffers are never removed. With ``delete_disk_files`` set,
        removed files which have an existing ``abspath`` are also deleted
        from disk.
        '''
        active = list(self._buffers.values())
        candidates = [
            file for file in self.iter_files()
            if condition(file) and file not in active]

        self.remove_files(candidates)
        if not delete_disk_files:
            return

        for file in candidates:
            if file.abspath and os.path.exists(file.abspath):
                os.unlink(file.abspath)

    def __del__(self):
        # Make sure buffered data is fixated before the pile goes away.
        self.fixate_all()