Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/pile.py: 83%
193 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 06:59 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 06:59 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Emulation of the older :py:mod:`pyrocko.pile` interface.
8'''
10import logging
11from pyrocko import squirrel as psq, trace, util
12from pyrocko import pile as classic_pile
# Module-level logger; the name 'psq.pile' places it below the 'psq' logger in
# the logging hierarchy.
logger = logging.getLogger('psq.pile')
def trace_callback_to_nut_callback(trace_callback):
    '''
    Adapt a trace-based callback so that it can be called with nuts.

    :param trace_callback: callable taking a trace-like object, or ``None``
    :returns: callable taking a nut and forwarding the nut's ``dummy_trace``
        attribute to ``trace_callback``, or ``None`` if no callback was given
    '''
    if trace_callback is not None:
        def call_with_dummy_trace(nut):
            return trace_callback(nut.dummy_trace)

        return call_with_dummy_trace

    return None
class CodesDummyTrace(object):
    '''
    Minimal trace stand-in carrying only network/station/location/channel.

    The first four elements of ``codes`` are exposed as the attributes
    ``network``, ``station``, ``location`` and ``channel`` and, together, as
    ``nslc_id``.
    '''

    def __init__(self, codes):
        nslc = codes[0:4]
        # Unpack first, so that a too short ``codes`` fails before ``nslc_id``
        # is set.
        self.network, self.station, self.location, self.channel = nslc
        self.nslc_id = nslc
def trace_callback_to_codes_callback(trace_callback):
    '''
    Adapt a trace-based callback so that it can be called with codes.

    :param trace_callback: callable taking a trace-like object, or ``None``
    :returns: callable taking a codes object, wrapping it into a
        :py:class:`CodesDummyTrace` and passing that to ``trace_callback``, or
        ``None`` if no callback was given
    '''
    if trace_callback is not None:
        def call_with_codes_dummy(codes):
            return trace_callback(CodesDummyTrace(codes))

        return call_with_codes_dummy

    return None
class Pile(object):
    '''
    :py:class:`pyrocko.pile.Pile` surrogate: waveform lookup, loading and
    caching.

    This class emulates most of the older :py:class:`pyrocko.pile.Pile` methods
    by using calls to a :py:class:`pyrocko.squirrel.base.Squirrel` instance
    behind the scenes.

    This interface can be used as a drop-in replacement for piles which are
    used in existing scripts and programs for efficient waveform data access.
    The Squirrel-based pile scales better for large datasets. Newer scripts
    should use Squirrel's native methods to avoid the emulation overhead.

    .. note::
        Not every option of the original pile is emulated. Arguments which are
        only accepted for compatibility, and options which are rejected, are
        mentioned in the documentation of the respective methods.
    '''

    def __init__(self, squirrel=None):
        '''
        :param squirrel: squirrel instance to wrap. If ``None``, a new
            ``psq.Squirrel()`` is created.
        '''
        if squirrel is None:
            squirrel = psq.Squirrel()

        self._squirrel = squirrel
        self._listeners = []

        # Relay events of the squirrel's database to the listeners of this
        # pile.
        self._squirrel.get_database().add_listener(
            self._notify_squirrel_to_pile)

    def _notify_squirrel_to_pile(self, event, *args):
        '''
        Database listener: forward ``event`` to the pile's listeners.

        Additional arguments are not passed on.
        '''
        self.notify_listeners(event)

    def add_listener(self, obj):
        '''
        Register callable ``obj`` to be called by :py:meth:`notify_listeners`.

        Only the reference returned by ``util.smart_weakref(obj)`` is stored
        (presumably a weak reference, so the caller has to keep ``obj`` alive
        - confirm against ``pyrocko.util``).
        '''
        self._listeners.append(util.smart_weakref(obj))

    def notify_listeners(self, what):
        '''
        Call every registered listener as ``listener(what, [])``.

        References which resolve to a false value are skipped.
        '''
        for ref in self._listeners:
            obj = ref()
            if obj:
                obj(what, [])

    def get_tmin(self):
        '''Method form of the :py:attr:`tmin` property.'''
        return self.tmin

    def get_tmax(self):
        '''Method form of the :py:attr:`tmax` property.'''
        return self.tmax

    def get_deltatmin(self):
        '''First element of the squirrel's deltat span for waveforms.'''
        return self._squirrel.get_deltat_span('waveform')[0]

    def get_deltatmax(self):
        '''Second element of the squirrel's deltat span for waveforms.'''
        return self._squirrel.get_deltat_span('waveform')[1]

    @property
    def deltatmin(self):
        '''Property form of :py:meth:`get_deltatmin`.'''
        return self.get_deltatmin()

    @property
    def deltatmax(self):
        '''Property form of :py:meth:`get_deltatmax`.'''
        return self.get_deltatmax()

    @property
    def tmin(self):
        '''
        First element of the squirrel's time span for waveforms.

        Queried with ``dummy_limits=False``; callers in this class treat
        ``None`` as "not set".
        '''
        return self._squirrel.get_time_span('waveform', dummy_limits=False)[0]

    @property
    def tmax(self):
        '''
        Second element of the squirrel's time span for waveforms.

        Queried with ``dummy_limits=False``; callers in this class treat
        ``None`` as "not set".
        '''
        return self._squirrel.get_time_span('waveform', dummy_limits=False)[1]

    @property
    def networks(self):
        '''Set of network codes of the squirrel's waveform codes.'''
        return {
            codes.network for codes in self._squirrel.get_codes('waveform')}

    @property
    def stations(self):
        '''Set of station codes of the squirrel's waveform codes.'''
        return {
            codes.station for codes in self._squirrel.get_codes('waveform')}

    @property
    def locations(self):
        '''Set of location codes of the squirrel's waveform codes.'''
        return {
            codes.location for codes in self._squirrel.get_codes('waveform')}

    @property
    def channels(self):
        '''Set of channel codes of the squirrel's waveform codes.'''
        return {
            codes.channel for codes in self._squirrel.get_codes('waveform')}

    def is_relevant(self, tmin, tmax):
        '''
        Check whether the interval ``[tmin, tmax]`` touches the available data.

        The comparison is made against the squirrel's combined time span of
        the kinds ``'waveform'`` and ``'waveform_promise'``.

        :returns: ``False`` if that time span is not defined, otherwise
            whether the two intervals overlap (touching ends count as
            overlap)
        '''
        ptmin, ptmax = self._squirrel.get_time_span(
            ['waveform', 'waveform_promise'], dummy_limits=False)

        if None in (ptmin, ptmax):
            return False

        return tmax >= ptmin and ptmax >= tmin

    def load_files(
            self, filenames,
            filename_attributes=None,
            fileformat='mseed',
            cache=None,
            show_progress=True,
            update_progress=None):
        '''
        Add waveform files to the underlying squirrel.

        :param filenames: passed on to the squirrel's ``add`` method
        :param fileformat: passed on as ``format``

        The arguments ``filename_attributes``, ``cache``, ``show_progress``
        and ``update_progress`` are accepted for compatibility with the
        signature of the original pile but are not used here.
        '''
        self._squirrel.add(
            filenames, kinds='waveform', format=fileformat)

    def chop(
            self, tmin, tmax,
            nut_selector=None,
            snap=(round, round),
            include_last=False,
            load_data=True,
            accessor_id='default'):
        '''
        Extract waveforms cut to the time window ``[tmin, tmax]``.

        :param tmin: start time of the window
        :param tmax: end time of the window
        :param nut_selector: if not ``None``, only nuts for which this
            callable returns a true value are used
        :param snap: passed on to the ``chop`` method of the traces
        :param include_last: passed on to the ``chop`` method of the traces
        :param load_data: if ``True``, traces are obtained through the
            squirrel's ``get_content``; otherwise traces are built from the
            nuts' ``trace_kwargs`` only
        :param accessor_id: passed on to the squirrel's ``get_content`` and
            ``advance_accessor``
        :returns: tuple ``(chopped, used_files)`` where ``chopped`` is the
            list of cut traces and ``used_files`` is always an empty set
            (kept for compatibility with the return value of the original
            pile)
        '''
        nuts = self._squirrel.get_waveform_nuts(tmin=tmin, tmax=tmax)

        # Lazy filter shared by both branches: the selector is called just
        # before the respective trace is obtained.
        selected = (
            nut for nut in nuts
            if nut_selector is None or nut_selector(nut))

        if load_data:
            traces = [
                self._squirrel.get_content(nut, 'waveform', accessor_id)
                for nut in selected]
        else:
            traces = [
                trace.Trace(**nut.trace_kwargs)
                for nut in selected]

        self._squirrel.advance_accessor(accessor_id)

        chopped = []
        used_files = set()
        for tr in traces:
            if not load_data and tr.ydata is not None:
                # Never hand out samples when the caller asked for
                # meta-information only.
                tr = tr.copy(data=False)
                tr.ydata = None

            try:
                chopped.append(tr.chop(
                    tmin, tmax,
                    inplace=False,
                    snap=snap,
                    include_last=include_last))

            except trace.NoData:
                # Nothing left of this trace inside the window: skip it.
                pass

        return chopped, used_files

    def _process_chopped(
            self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
            tpad):
        '''
        Post-process the traces of one chopper window.

        Sorts ``chopped`` in place by ``full_id``, optionally degaps, and
        optionally drops traces which do not span the padded window. Finally,
        the attributes ``wmin`` and ``wmax`` are set on the resulting traces.

        Note the parameter order: ``wmax`` comes before ``wmin``.

        :param chopped: list of traces of the current window
        :param degap: whether to run ``trace.degapper`` on the traces
        :param maxgap: passed on to ``trace.degapper``
        :param maxlap: passed on to ``trace.degapper``
        :param want_incomplete: if ``False``, weed out incomplete traces
        :param wmax: end time of the window (without padding)
        :param wmin: start time of the window (without padding)
        :param tpad: padding time on either side of the window
        :returns: list of processed traces
        '''
        chopped.sort(key=lambda a: a.full_id)
        if degap:
            chopped = trace.degapper(chopped, maxgap=maxgap, maxlap=maxlap)

        if not want_incomplete:
            chopped_weeded = []
            for tr in chopped:
                # Offsets of the trace's begin and end with respect to the
                # padded window.
                emin = tr.tmin - (wmin-tpad)
                emax = tr.tmax + tr.deltat - (wmax+tpad)
                if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
                    # Matches the window to within half a sample.
                    chopped_weeded.append(tr)

                elif degap:
                    # Only traces which are short on both sides, by at most
                    # five samples each, are extended by repeating values.
                    if (0. < emin <= 5. * tr.deltat and
                            -5. * tr.deltat <= emax < 0.):

                        tr.extend(
                            wmin-tpad,
                            wmax+tpad-tr.deltat,
                            fillmethod='repeat')

                        chopped_weeded.append(tr)

            chopped = chopped_weeded

        for tr in chopped:
            tr.wmin = wmin
            tr.wmax = wmax

        return chopped

    def chopper(
            self,
            tmin=None, tmax=None, tinc=None, tpad=0.,
            trace_selector=None,
            want_incomplete=True, degap=True, maxgap=5, maxlap=None,
            keep_current_files_open=False, accessor_id='default',
            snap=(round, round), include_last=False, load_data=True,
            style=None):

        '''
        Get iterator for shifting window wise data extraction from waveform
        archive.

        :param tmin: start time (default uses start time of available data)
        :param tmax: end time (default uses end time of available data)
        :param tinc: time increment (window shift time) (default uses
            ``tmax-tmin``)
        :param tpad: padding time appended on either side of the data windows
            (window overlap is ``2*tpad``)
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects
        :param want_incomplete: if set to ``False``, gappy/incomplete traces
            are discarded from the results
        :param degap: whether to try to connect traces and to remove gaps and
            overlaps
        :param maxgap: maximum gap size in samples which is filled with
            interpolated samples when ``degap`` is ``True``
        :param maxlap: maximum overlap size in samples which is removed when
            ``degap`` is ``True``
        :param keep_current_files_open: whether to keep cached trace data in
            memory after the iterator has ended
        :param accessor_id: if given, used as a key to identify different
            points of extraction for the decision of when to release cached
            trace data (should be used when data is alternately extracted from
            more than one region / selection)
        :param snap: replaces Python's :py:func:`round` function which is used
            to determine indices where to start and end the trace data array
        :param include_last: whether to include last sample
        :param load_data: whether to load the waveform data. If set to
            ``False``, traces with no data samples, but with correct
            meta-information are returned
        :param style: set to ``'batch'`` to yield waveforms and information
            about the chopper state as :py:class:`pyrocko.pile.Batch` objects.
            By default lists of :py:class:`pyrocko.trace.Trace` objects are
            yielded.
        :returns: iterator providing extracted waveforms for each extracted
            window. See ``style`` argument for details.
        '''

        if tmin is None:
            pile_tmin = self.tmin
            if pile_tmin is None:
                logger.warning("Pile's tmin is not set - pile may be empty.")
                return
            tmin = pile_tmin + tpad

        if tmax is None:
            pile_tmax = self.tmax
            if pile_tmax is None:
                logger.warning("Pile's tmax is not set - pile may be empty.")
                return
            tmax = pile_tmax - tpad

        if tinc is None:
            tinc = tmax - tmin

        if not self.is_relevant(tmin-tpad, tmax+tpad):
            return

        nut_selector = trace_callback_to_nut_callback(trace_selector)

        # The small epsilon prevents an extra window from being generated
        # when ``tmax - tmin`` is an exact multiple of ``tinc``.
        eps = tinc * 1e-6
        if tinc != 0.0:
            nwin = int(((tmax - eps) - tmin) / tinc) + 1
        else:
            nwin = 1

        for iwin in range(nwin):
            # The last window is clipped to ``tmax``.
            wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)

            chopped, used_files = self.chop(
                wmin-tpad, wmax+tpad, nut_selector, snap,
                include_last, load_data, accessor_id)

            # Note the argument order: ``wmax`` before ``wmin``.
            processed = self._process_chopped(
                chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
                tpad)

            if style == 'batch':
                yield classic_pile.Batch(
                    tmin=wmin,
                    tmax=wmax,
                    i=iwin,
                    n=nwin,
                    traces=processed)

            else:
                yield processed

        # Only reached when the iterator is run to its end.
        if not keep_current_files_open:
            self._squirrel.clear_accessor(accessor_id, 'waveform')

    def chopper_grouped(self, gather, progress=None, *args, **kwargs):
        '''
        Run :py:meth:`chopper` once for each group of traces.

        :param gather: callable taking a trace-like object and returning the
            key of the group it belongs to
        :param progress: if not ``None``, passed as first argument to
            ``util.progressbar`` to report progress over the groups
        :param args: passed on to :py:meth:`chopper`
        :param kwargs: passed on to :py:meth:`chopper`. A ``trace_selector``
            given here is combined with the selection of the group.
        :returns: iterator yielding whatever :py:meth:`chopper` yields, group
            after group
        '''
        keys = self.gather_keys(gather)
        if len(keys) == 0:
            return

        outer_trace_selector = kwargs.get('trace_selector')

        # The keys are gathered once, up front. The pile must therefore not
        # be modified during chopping.
        pbar = None
        try:
            if progress is not None:
                pbar = util.progressbar(progress, len(keys))

            for ikey, key in enumerate(keys):
                # ``key`` is looked up when ``tsel`` is called. This is safe
                # because the chopper below is exhausted before the loop
                # moves on to the next key.
                def tsel(tr):
                    return gather(tr) == key and (
                        outer_trace_selector is None or
                        outer_trace_selector(tr))

                kwargs['trace_selector'] = tsel

                for traces in self.chopper(*args, **kwargs):
                    yield traces

                if pbar:
                    pbar.update(ikey+1)

        finally:
            if pbar:
                pbar.finish()

    def reload_modified(self):
        '''Call the squirrel's ``reload`` method.'''
        self._squirrel.reload()

    def iter_traces(
            self,
            load_data=False,
            return_abspath=False,
            trace_selector=None):

        '''
        Iterate over all traces in pile.

        :param load_data: must be false: loading the waveform data is not
            supported by this emulation, empty traces are yielded
        :param return_abspath: must be false: yielding tuples of absolute
            file path and trace is not supported by this emulation
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects
        :raises AssertionError: if ``load_data`` or ``return_abspath`` is true
            (raised when the iteration is started)
        '''
        # Raised explicitly instead of using ``assert`` statements, which
        # would be stripped when running Python with ``-O``. The exception
        # type is kept for backward compatibility.
        if load_data:
            raise AssertionError(
                'Pile.iter_traces(): load_data=True is not supported by the '
                'Squirrel-based pile emulation.')

        if return_abspath:
            raise AssertionError(
                'Pile.iter_traces(): return_abspath=True is not supported by '
                'the Squirrel-based pile emulation.')

        nut_selector = trace_callback_to_nut_callback(trace_selector)

        for nut in self._squirrel.get_waveform_nuts():
            if nut_selector is None or nut_selector(nut):
                yield trace.Trace(**nut.trace_kwargs)

    def gather_keys(self, gather, selector=None):
        '''
        Get the group keys of the squirrel's waveform codes.

        :param gather: callable taking a trace-like object (here: a
            :py:class:`CodesDummyTrace`) and returning a key
        :param selector: optional filter callback taking the same kind of
            object
        :returns: result of the squirrel's ``_gather_codes_keys`` for kind
            ``'waveform'``
        '''
        codes_gather = trace_callback_to_codes_callback(gather)
        codes_selector = trace_callback_to_codes_callback(selector)
        return self._squirrel._gather_codes_keys(
            'waveform', codes_gather, codes_selector)

    def snuffle(self, **kwargs):
        '''Visualize it.

        :param stations: list of `pyrocko.model.Station` objects or ``None``
        :param events: list of `pyrocko.model.Event` objects or ``None``
        :param markers: list of `pyrocko.gui_util.Marker` objects or ``None``
        :param ntracks: float, number of tracks to be shown initially
            (default: 12)
        :param follow: time interval (in seconds) for real time follow mode or
            ``None``
        :param controls: bool, whether to show the main controls (default:
            ``True``)
        :param opengl: bool, whether to use opengl (default: ``False``)
        '''

        # Imported here, so that the GUI modules are only needed when this
        # method is used.
        from pyrocko.gui.snuffler import snuffle
        snuffle(self, **kwargs)

    def add_file(self, mtf):
        '''
        Add the traces of an in-memory traces file to the squirrel.

        The name returned by the squirrel's ``add_volatile_waveforms`` is
        stored as ``mtf._squirrel_name`` for use in :py:meth:`remove_file`.

        :param mtf: :py:class:`pyrocko.pile.MemTracesFile` object
        :raises AssertionError: if ``mtf`` is of any other type
        '''
        if not isinstance(mtf, classic_pile.MemTracesFile):
            # Raised explicitly instead of ``assert False``, which would be
            # stripped when running Python with ``-O`` and would then let
            # unsupported objects be ignored silently. The exception type is
            # kept for backward compatibility.
            raise AssertionError(
                'Pile.add_file(): only pyrocko.pile.MemTracesFile objects are '
                'supported, got object of type %s.' % type(mtf).__name__)

        name = self._squirrel.add_volatile_waveforms(mtf.get_traces())
        mtf._squirrel_name = name

    def remove_file(self, mtf):
        '''
        Remove a file previously added with :py:meth:`add_file`.

        Objects which are not :py:class:`pyrocko.pile.MemTracesFile`
        instances, or which carry no (or a false) ``_squirrel_name``, are
        ignored.
        '''
        if isinstance(mtf, classic_pile.MemTracesFile) \
                and getattr(mtf, '_squirrel_name', False):

            self._squirrel.remove(mtf._squirrel_name)
            mtf._squirrel_name = None

    def is_empty(self):
        '''
        :returns: ``True`` if ``'waveform'`` is not among the kinds reported
            by the squirrel
        '''
        return 'waveform' not in self._squirrel.get_kinds()

    def get_update_count(self):
        '''Always returns ``0`` in this emulation.'''
        return 0
def get_cache(_):
    '''
    Placeholder which ignores its argument and always returns ``None``.

    NOTE(review): presumably provided for compatibility with a function of
    the same name in the classic pile module - confirm.
    '''
    return None