# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
6import logging
7import weakref
8from pyrocko import squirrel as psq, trace, util
9from pyrocko import pile as classic_pile
11logger = logging.getLogger('psq.pile')
def trace_callback_to_nut_callback(trace_callback):
    '''
    Adapt a trace-based callback so that it can be called with a nut.

    :param trace_callback: callable taking a trace-like object, or ``None``
    :returns: callable which passes ``nut.dummy_trace`` to ``trace_callback``
        and returns its result, or ``None`` if ``trace_callback`` is ``None``
    '''
    if trace_callback is not None:
        def adapted(nut):
            # The callback never sees the nut itself, only its stand-in
            # trace object.
            stand_in = nut.dummy_trace
            return trace_callback(stand_in)

        return adapted

    return None
class CodesDummyTrace(object):
    '''
    Minimal trace-like object carrying only channel codes.

    The first four entries of ``codes`` are exposed as ``network``,
    ``station``, ``location`` and ``channel``, and together as ``nslc_id``.
    '''

    def __init__(self, codes):
        '''
        :param codes: sliceable sequence whose first four entries are the
            network, station, location and channel codes
        '''
        nslc = codes[0:4]

        # Unpack first, then store the slice: with fewer than four entries
        # the unpacking fails before ``nslc_id`` is set.
        self.network, self.station, self.location, self.channel = nslc
        self.nslc_id = nslc
def trace_callback_to_codes_callback(trace_callback):
    '''
    Adapt a trace-based callback so that it can be called with codes.

    :param trace_callback: callable taking a trace-like object, or ``None``
    :returns: callable which wraps its ``codes`` argument in a
        :py:class:`CodesDummyTrace` and hands that to ``trace_callback``, or
        ``None`` if ``trace_callback`` is ``None``
    '''
    if trace_callback is not None:
        return lambda codes: trace_callback(CodesDummyTrace(codes))

    return None
class Pile(object):
    '''
    :py:class:`pyrocko.pile.Pile` surrogate: waveform lookup, loading and
    caching.

    This class emulates most of the older :py:class:`pyrocko.pile.Pile` methods
    by using calls to a :py:class:`pyrocko.squirrel.base.Squirrel` instance
    behind the scenes.

    This interface can be used as a drop-in replacement for piles which are
    used in existing scripts and programs for efficient waveform data access.
    The Squirrel-based pile scales better for large datasets. Newer scripts
    should use Squirrel's native methods to avoid the emulation overhead.

    .. note::
        Many methods in the original pile implementation lack documentation,
        and so do their counterparts here. Read the source, Luke!
    '''

    def __init__(self, squirrel=None):
        '''
        :param squirrel: Squirrel instance to wrap. If ``None``, a new
            ``psq.Squirrel()`` is created.
        '''
        if squirrel is None:
            squirrel = psq.Squirrel()

        self._squirrel = squirrel
        self._listeners = []

        # Forward database events to the pile's own listeners.
        self._squirrel.get_database().add_listener(
            self._notify_squirrel_to_pile)

    def _notify_squirrel_to_pile(self, event, *args):
        '''
        Database listener: relay ``event`` to the pile's listeners.

        Additional positional arguments are ignored.
        '''
        self.notify_listeners(event)

    def add_listener(self, obj):
        '''
        Register an object to be notified via its ``pile_changed`` method.

        Only a weak reference to ``obj`` is stored, so registration does not
        keep ``obj`` alive.
        '''
        self._listeners.append(weakref.ref(obj))

    def notify_listeners(self, what):
        '''
        Call ``pile_changed(what)`` on every registered listener still alive.
        '''
        # Drop references whose target is gone so the list cannot grow
        # without bound. Done in place to keep the same list object.
        self._listeners[:] = [
            ref for ref in self._listeners if ref() is not None]

        for ref in self._listeners:
            obj = ref()
            # Test identity, not truthiness: a live listener defining
            # ``__len__`` or ``__bool__`` may evaluate as false.
            if obj is not None:
                obj.pile_changed(what)

    def get_tmin(self):
        '''Return :py:attr:`tmin`.'''
        return self.tmin

    def get_tmax(self):
        '''Return :py:attr:`tmax`.'''
        return self.tmax

    def get_deltatmin(self):
        '''
        Return the first element of the Squirrel's ``'waveform'`` deltat span.
        '''
        return self._squirrel.get_deltat_span('waveform')[0]

    def get_deltatmax(self):
        '''
        Return the second element of the Squirrel's ``'waveform'`` deltat
        span.
        '''
        return self._squirrel.get_deltat_span('waveform')[1]

    @property
    def deltatmin(self):
        '''Same as :py:meth:`get_deltatmin`.'''
        return self.get_deltatmin()

    @property
    def deltatmax(self):
        '''Same as :py:meth:`get_deltatmax`.'''
        return self.get_deltatmax()

    @property
    def tmin(self):
        '''First element of the Squirrel's ``'waveform'`` time span.'''
        return self._squirrel.get_time_span('waveform')[0]

    @property
    def tmax(self):
        '''Second element of the Squirrel's ``'waveform'`` time span.'''
        return self._squirrel.get_time_span('waveform')[1]

    @property
    def networks(self):
        '''Set of network codes of the Squirrel's ``'waveform'`` codes.'''
        return {
            codes.network for codes in self._squirrel.get_codes('waveform')}

    @property
    def stations(self):
        '''Set of station codes of the Squirrel's ``'waveform'`` codes.'''
        return {
            codes.station for codes in self._squirrel.get_codes('waveform')}

    @property
    def locations(self):
        '''Set of location codes of the Squirrel's ``'waveform'`` codes.'''
        return {
            codes.location for codes in self._squirrel.get_codes('waveform')}

    @property
    def channels(self):
        '''Set of channel codes of the Squirrel's ``'waveform'`` codes.'''
        return {
            codes.channel for codes in self._squirrel.get_codes('waveform')}

    def is_relevant(self, tmin, tmax):
        '''
        Check whether the interval ``[tmin, tmax]`` touches available data.

        The time span queried covers the kinds ``'waveform'`` and
        ``'waveform_promise'``.

        :returns: ``False`` if either end of that time span is ``None``,
            otherwise whether the given interval overlaps it
        '''
        ptmin, ptmax = self._squirrel.get_time_span(
            ['waveform', 'waveform_promise'])

        if None in (ptmin, ptmax):
            return False

        return tmax >= ptmin and ptmax >= tmin

    def load_files(
            self, filenames,
            filename_attributes=None,
            fileformat='mseed',
            cache=None,
            show_progress=True,
            update_progress=None):
        '''
        Add waveform files to the underlying Squirrel.

        Only ``filenames`` and ``fileformat`` are forwarded; the remaining
        parameters are accepted for signature compatibility and are unused.
        '''
        self._squirrel.add(
            filenames, kinds='waveform', format=fileformat)

    def chop(
            self, tmin, tmax,
            nut_selector=None,
            snap=(round, round),
            include_last=False,
            load_data=True,
            accessor_id='default'):
        '''
        Extract waveforms cut to the time window ``[tmin, tmax]``.

        :param tmin: start time of the window
        :param tmax: end time of the window
        :param nut_selector: filter callback taking a nut, or ``None`` to
            accept all
        :param snap: passed on to the traces' ``chop`` method
        :param include_last: passed on to the traces' ``chop`` method
        :param load_data: if ``True``, content is obtained via the Squirrel's
            ``get_content``; otherwise traces are built from the nuts'
            ``trace_kwargs``
        :param accessor_id: accessor key passed to the Squirrel
        :returns: tuple ``(chopped, used_files)`` where ``chopped`` is a list
            of chopped traces and ``used_files`` is always an empty set here
        '''
        nuts = self._squirrel.get_waveform_nuts(tmin=tmin, tmax=tmax)

        # The selection is the same whether or not data is loaded.
        selected = [
            nut for nut in nuts
            if nut_selector is None or nut_selector(nut)]

        if load_data:
            traces = [
                self._squirrel.get_content(nut, 'waveform', accessor_id)
                for nut in selected]
        else:
            traces = [trace.Trace(**nut.trace_kwargs) for nut in selected]

        self._squirrel.advance_accessor(accessor_id)

        chopped = []
        used_files = set()
        for tr in traces:
            if not load_data and tr.ydata is not None:
                tr = tr.copy(data=False)
                tr.ydata = None

            try:
                chopped.append(tr.chop(
                    tmin, tmax,
                    inplace=False,
                    snap=snap,
                    include_last=include_last))

            except trace.NoData:
                # Nothing to extract from this trace; skip it.
                pass

        return chopped, used_files

    def _process_chopped(
            self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
            tpad):
        '''
        Post-process the traces extracted for one window.

        Sorts ``chopped`` in place by ``full_id``, optionally degaps,
        optionally discards incomplete traces, and tags every remaining trace
        with ``wmin`` and ``wmax`` attributes.

        Note the parameter order: ``wmax`` comes before ``wmin``.

        :returns: list of processed traces
        '''
        chopped.sort(key=lambda a: a.full_id)
        if degap:
            chopped = trace.degapper(chopped, maxgap=maxgap, maxlap=maxlap)

        if not want_incomplete:
            chopped_weeded = []
            for tr in chopped:
                # Mismatch between trace span and padded window at each end.
                emin = tr.tmin - (wmin-tpad)
                emax = tr.tmax + tr.deltat - (wmax+tpad)
                if (abs(emin) <= 0.5*tr.deltat
                        and abs(emax) <= 0.5*tr.deltat):
                    chopped_weeded.append(tr)

                elif degap:
                    # Traces short by at most five samples at both ends are
                    # extended by repeating the end samples.
                    if (0. < emin <= 5. * tr.deltat and
                            -5. * tr.deltat <= emax < 0.):

                        tr.extend(
                            wmin-tpad,
                            wmax+tpad-tr.deltat,
                            fillmethod='repeat')

                        chopped_weeded.append(tr)

            chopped = chopped_weeded

        for tr in chopped:
            tr.wmin = wmin
            tr.wmax = wmax

        return chopped

    def chopper(
            self,
            tmin=None, tmax=None, tinc=None, tpad=0.,
            trace_selector=None,
            want_incomplete=True, degap=True, maxgap=5, maxlap=None,
            keep_current_files_open=False, accessor_id='default',
            snap=(round, round), include_last=False, load_data=True,
            style=None):

        '''
        Get iterator for shifting window wise data extraction from waveform
        archive.

        :param tmin: start time (default uses start time of available data)
        :param tmax: end time (default uses end time of available data)
        :param tinc: time increment (window shift time) (default uses
            ``tmax-tmin``)
        :param tpad: padding time appended on either side of the data windows
            (window overlap is ``2*tpad``)
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects
        :param want_incomplete: if set to ``False``, gappy/incomplete traces
            are discarded from the results
        :param degap: whether to try to connect traces and to remove gaps and
            overlaps
        :param maxgap: maximum gap size in samples which is filled with
            interpolated samples when ``degap`` is ``True``
        :param maxlap: maximum overlap size in samples which is removed when
            ``degap`` is ``True``
        :param keep_current_files_open: whether to keep cached trace data in
            memory after the iterator has ended
        :param accessor_id: if given, used as a key to identify different
            points of extraction for the decision of when to release cached
            trace data (should be used when data is alternately extracted from
            more than one region / selection)
        :param snap: replaces Python's :py:func:`round` function which is used
            to determine indices where to start and end the trace data array
        :param include_last: whether to include last sample
        :param load_data: whether to load the waveform data. If set to
            ``False``, traces with no data samples, but with correct
            meta-information are returned
        :param style: set to ``'batch'`` to yield waveforms and information
            about the chopper state as :py:class:`pyrocko.pile.Batch` objects.
            By default lists of :py:class:`pyrocko.trace.Trace` objects are
            yielded.
        :returns: iterator providing extracted waveforms for each extracted
            window. See ``style`` argument for details.
        '''

        if tmin is None:
            # Read the property once; each access queries the Squirrel.
            pile_tmin = self.tmin
            if pile_tmin is None:
                logger.warning('Pile\'s tmin is not set - pile may be empty.')
                return
            tmin = pile_tmin + tpad

        if tmax is None:
            pile_tmax = self.tmax
            if pile_tmax is None:
                logger.warning('Pile\'s tmax is not set - pile may be empty.')
                return
            tmax = pile_tmax - tpad

        if tinc is None:
            tinc = tmax - tmin

        if not self.is_relevant(tmin-tpad, tmax+tpad):
            return

        nut_selector = trace_callback_to_nut_callback(trace_selector)

        # Small tolerance so that a span which is an exact multiple of tinc
        # does not produce an extra, empty window.
        eps = tinc * 1e-6
        if tinc != 0.0:
            nwin = int(((tmax - eps) - tmin) / tinc) + 1
        else:
            nwin = 1

        try:
            for iwin in range(nwin):
                wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)

                chopped, used_files = self.chop(
                    wmin-tpad, wmax+tpad, nut_selector, snap,
                    include_last, load_data, accessor_id)

                processed = self._process_chopped(
                    chopped, degap, maxgap, maxlap, want_incomplete, wmax,
                    wmin, tpad)

                if style == 'batch':
                    yield classic_pile.Batch(
                        tmin=wmin,
                        tmax=wmax,
                        i=iwin,
                        n=nwin,
                        traces=processed)

                else:
                    yield processed

        finally:
            # Also runs when the consumer stops iterating early or an error
            # occurs, so cached data is not left held by the accessor.
            if not keep_current_files_open:
                self._squirrel.clear_accessor(accessor_id, 'waveform')

    def chopper_grouped(self, gather, progress=None, *args, **kwargs):
        '''
        Run :py:meth:`chopper` once for each group of traces.

        :param gather: callback taking a trace-like object and returning the
            key of the group it belongs to
        :param progress: if not ``None``, passed as first argument to
            ``util.progressbar`` to report progress over the groups
        :param args: passed on to :py:meth:`chopper`
        :param kwargs: passed on to :py:meth:`chopper`; a given
            ``trace_selector`` is combined with the group selection
        :returns: iterator yielding whatever :py:meth:`chopper` yields, group
            after group
        '''
        keys = self.gather_keys(gather)
        if len(keys) == 0:
            return

        outer_trace_selector = kwargs.get('trace_selector')

        # the use of this gather-cache makes it impossible to modify the pile
        # during chopping
        pbar = None
        if progress is not None:
            pbar = util.progressbar(progress, len(keys))

        for ikey, key in enumerate(keys):
            # ``tsel`` is only used by the chopper run of this iteration, so
            # referring to the loop variable ``key`` is safe here.
            def tsel(tr):
                return gather(tr) == key and (outer_trace_selector is None or
                                              outer_trace_selector(tr))

            kwargs['trace_selector'] = tsel

            for traces in self.chopper(*args, **kwargs):
                yield traces

            if pbar:
                pbar.update(ikey+1)

        if pbar:
            pbar.finish()

    def reload_modified(self):
        '''Call the underlying Squirrel's ``reload`` method.'''
        self._squirrel.reload()

    def iter_traces(
            self,
            load_data=False,
            return_abspath=False,
            trace_selector=None):

        '''
        Iterate over all traces in pile.

        :param load_data: whether to load the waveform data, by default empty
            traces are yielded
        :param return_abspath: if ``True`` yield tuples containing absolute
            file path and :py:class:`pyrocko.trace.Trace` objects
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects
        :raises AssertionError: if ``load_data`` or ``return_abspath`` is
            set; neither is supported by this implementation

        '''
        # Raised explicitly rather than via ``assert`` so the checks are not
        # stripped when Python runs with ``-O``.
        if load_data:
            raise AssertionError(
                'iter_traces: load_data=True is not supported')

        if return_abspath:
            raise AssertionError(
                'iter_traces: return_abspath=True is not supported')

        nut_selector = trace_callback_to_nut_callback(trace_selector)

        for nut in self._squirrel.get_waveform_nuts():
            if nut_selector is None or nut_selector(nut):
                yield trace.Trace(**nut.trace_kwargs)

    def gather_keys(self, gather, selector=None):
        '''
        Get the group keys of the available waveform codes.

        ``gather`` and ``selector`` are trace-based callbacks; they are
        adapted to work on codes and handed to the Squirrel's
        ``_gather_codes_keys`` for kind ``'waveform'``.
        '''
        codes_gather = trace_callback_to_codes_callback(gather)
        codes_selector = trace_callback_to_codes_callback(selector)
        return self._squirrel._gather_codes_keys(
            'waveform', codes_gather, codes_selector)

    def snuffle(self, **kwargs):
        '''Visualize it.

        :param stations: list of `pyrocko.model.Station` objects or ``None``
        :param events: list of `pyrocko.model.Event` objects or ``None``
        :param markers: list of `pyrocko.gui_util.Marker` objects or ``None``
        :param ntracks: float, number of tracks to be shown initially
            (default: 12)
        :param follow: time interval (in seconds) for real time follow mode or
            ``None``
        :param controls: bool, whether to show the main controls (default:
            ``True``)
        :param opengl: bool, whether to use opengl (default: ``False``)
        '''

        # Imported here so the GUI is only loaded when actually used.
        from pyrocko.gui.snuffler import snuffle
        snuffle(self, **kwargs)

    def add_file(self, mtf):
        '''
        Add the traces of a :py:class:`pyrocko.pile.MemTracesFile`.

        The name returned by the Squirrel's ``add_volatile_waveforms`` is
        stored on ``mtf`` as ``_squirrel_name`` for later removal.

        :raises AssertionError: if ``mtf`` is not a ``MemTracesFile``
        '''
        if not isinstance(mtf, classic_pile.MemTracesFile):
            # Raised explicitly so the check survives ``python -O``.
            raise AssertionError(
                'add_file: only MemTracesFile objects are supported')

        name = self._squirrel.add_volatile_waveforms(mtf.get_traces())
        mtf._squirrel_name = name

    def remove_file(self, mtf):
        '''
        Remove a file previously added with :py:meth:`add_file`.

        Nothing happens if ``mtf`` is not a ``MemTracesFile`` or carries no
        ``_squirrel_name``.
        '''
        if isinstance(mtf, classic_pile.MemTracesFile) \
                and getattr(mtf, '_squirrel_name', False):

            self._squirrel.remove(mtf._squirrel_name)
            mtf._squirrel_name = None

    def is_empty(self):
        '''
        Return ``True`` if ``'waveform'`` is not among the Squirrel's kinds.
        '''
        return 'waveform' not in self._squirrel.get_kinds()

    def get_update_count(self):
        '''Always returns ``0``.'''
        return 0
def get_cache(_):
    '''
    Stub which ignores its argument and always returns ``None``.

    NOTE(review): presumably kept so that code written against the classic
    pile module's ``get_cache`` keeps working - confirm against callers.
    '''
    return None