1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import logging
7from pyrocko import squirrel as psq, trace, util
8from pyrocko import pile as classic_pile
# Module-level logger. It is registered under the fixed channel name
# 'psq.pile' (not ``__name__``), so logging configuration has to refer to
# that name.
logger = logging.getLogger('psq.pile')
def trace_callback_to_nut_callback(trace_callback):
    '''
    Adapt a callback taking a trace into a callback taking a nut.

    The returned function passes the ``dummy_trace`` attribute of the nut it
    receives to ``trace_callback`` and hands back its result.

    :param trace_callback: callable accepting a trace-like object, or ``None``
    :returns: adapted callable, or ``None`` if ``trace_callback`` is ``None``
    '''
    if trace_callback is not None:
        def nut_callback(nut):
            dummy = nut.dummy_trace
            return trace_callback(dummy)

        return nut_callback

    return None
class CodesDummyTrace(object):
    '''
    Minimal trace stand-in carrying only the identifying codes.

    The first four elements of ``codes`` are exposed as the attributes
    ``network``, ``station``, ``location`` and ``channel``, and together (as
    the slice ``codes[0:4]``) as ``nslc_id``.

    :param codes: sliceable sequence with at least four elements
    '''

    def __init__(self, codes):
        nslc = codes[0:4]
        # Unpack first: a sequence which is too short fails here, before
        # ``nslc_id`` gets assigned.
        self.network, self.station, self.location, self.channel = nslc
        self.nslc_id = nslc
def trace_callback_to_codes_callback(trace_callback):
    '''
    Adapt a callback taking a trace into a callback taking codes.

    The returned function wraps the codes it receives in a
    :py:class:`CodesDummyTrace` and passes that to ``trace_callback``,
    handing back its result.

    :param trace_callback: callable accepting a trace-like object, or ``None``
    :returns: adapted callable, or ``None`` if ``trace_callback`` is ``None``
    '''
    if trace_callback is not None:
        def codes_callback(codes):
            dummy = CodesDummyTrace(codes)
            return trace_callback(dummy)

        return codes_callback

    return None
class Pile(object):
    '''
    :py:class:`pyrocko.pile.Pile` surrogate: waveform lookup, loading and
    caching.

    This class emulates most of the older :py:class:`pyrocko.pile.Pile` methods
    by using calls to a :py:class:`pyrocko.squirrel.base.Squirrel` instance
    behind the scenes.

    This interface can be used as a drop-in replacement for piles which are
    used in existing scripts and programs for efficient waveform data access.
    The Squirrel-based pile scales better for large datasets. Newer scripts
    should use Squirrel's native methods to avoid the emulation overhead.

    .. note::
        Many methods in the original pile implementation lack documentation,
        as do some here. Read the source, Luke!
    '''

    def __init__(self, squirrel=None):
        '''
        :param squirrel: squirrel instance to wrap; a new
            ``psq.Squirrel()`` is created when ``None``
        '''
        if squirrel is None:
            squirrel = psq.Squirrel()

        self._squirrel = squirrel
        self._listeners = []

        # Forward change events of the squirrel's database to our listeners.
        self._squirrel.get_database().add_listener(
            self._notify_squirrel_to_pile)

    def _notify_squirrel_to_pile(self, event, *args):
        '''
        Relay a database event to the pile listeners.

        Extra positional arguments of the event are dropped.
        '''
        self.notify_listeners(event)

    def add_listener(self, obj):
        '''
        Register ``obj`` to be notified through ``obj.pile_changed(...)``.

        The reference is stored via ``util.smart_weakref``.
        '''
        self._listeners.append(util.smart_weakref(obj))

    def notify_listeners(self, what):
        '''
        Call ``pile_changed(what, [])`` on every listener still alive.

        :param what: event identifier handed through to the listeners
        '''
        for ref in self._listeners:
            obj = ref()
            # Compare against None explicitly: a dead reference yields None,
            # while a live listener may itself evaluate to False (e.g. when
            # it defines ``__len__``) and must still be notified.
            if obj is not None:
                obj.pile_changed(what, [])

    def get_tmin(self):
        '''
        Same as the :py:attr:`tmin` property.
        '''
        return self.tmin

    def get_tmax(self):
        '''
        Same as the :py:attr:`tmax` property.
        '''
        return self.tmax

    def get_deltatmin(self):
        '''
        Get first element of the squirrel's ``'waveform'`` deltat span.
        '''
        return self._squirrel.get_deltat_span('waveform')[0]

    def get_deltatmax(self):
        '''
        Get second element of the squirrel's ``'waveform'`` deltat span.
        '''
        return self._squirrel.get_deltat_span('waveform')[1]

    @property
    def deltatmin(self):
        '''
        Same as :py:meth:`get_deltatmin`.
        '''
        return self.get_deltatmin()

    @property
    def deltatmax(self):
        '''
        Same as :py:meth:`get_deltatmax`.
        '''
        return self.get_deltatmax()

    @property
    def tmin(self):
        '''
        First element of the squirrel's ``'waveform'`` time span.
        '''
        return self._squirrel.get_time_span('waveform')[0]

    @property
    def tmax(self):
        '''
        Second element of the squirrel's ``'waveform'`` time span.
        '''
        return self._squirrel.get_time_span('waveform')[1]

    @property
    def networks(self):
        '''
        Set of network codes of all ``'waveform'`` codes in the squirrel.
        '''
        return {
            codes.network for codes in self._squirrel.get_codes('waveform')}

    @property
    def stations(self):
        '''
        Set of station codes of all ``'waveform'`` codes in the squirrel.
        '''
        return {
            codes.station for codes in self._squirrel.get_codes('waveform')}

    @property
    def locations(self):
        '''
        Set of location codes of all ``'waveform'`` codes in the squirrel.
        '''
        return {
            codes.location for codes in self._squirrel.get_codes('waveform')}

    @property
    def channels(self):
        '''
        Set of channel codes of all ``'waveform'`` codes in the squirrel.
        '''
        return {
            codes.channel for codes in self._squirrel.get_codes('waveform')}

    def is_relevant(self, tmin, tmax):
        '''
        Check whether ``[tmin, tmax]`` touches the available time span.

        The time span is queried for the kinds ``'waveform'`` and
        ``'waveform_promise'``.

        :returns: ``False`` if either bound of the time span is ``None``,
            otherwise whether the two intervals overlap (bounds inclusive)
        '''
        ptmin, ptmax = self._squirrel.get_time_span(
            ['waveform', 'waveform_promise'])

        if None in (ptmin, ptmax):
            return False

        return tmax >= ptmin and ptmax >= tmin

    def load_files(
            self, filenames,
            filename_attributes=None,
            fileformat='mseed',
            cache=None,
            show_progress=True,
            update_progress=None):
        '''
        Add waveform files to the underlying squirrel.

        Only ``filenames`` and ``fileformat`` are used. The remaining
        arguments are accepted but ignored by this implementation.

        :param filenames: passed on to the squirrel's ``add()``
        :param fileformat: passed on as ``format`` to the squirrel's ``add()``
        '''
        self._squirrel.add(
            filenames, kinds='waveform', format=fileformat)

    def chop(
            self, tmin, tmax,
            nut_selector=None,
            snap=(round, round),
            include_last=False,
            load_data=True,
            accessor_id='default'):
        '''
        Cut out the waveforms intersecting with a time window.

        :param tmin: start time of the window
        :param tmax: end time of the window
        :param nut_selector: filter callback taking nuts, or ``None`` to
            accept all
        :param snap: passed on to the ``chop()`` method of the traces
        :param include_last: passed on to the ``chop()`` method of the traces
        :param load_data: if ``True``, trace contents are obtained through the
            squirrel, otherwise traces are built from the nuts' meta
            information only
        :param accessor_id: accessor key handed to the squirrel
        :returns: tuple ``(chopped, used_files)``, where ``chopped`` is a list
            of cut traces and ``used_files`` is a set which is never
            populated in this implementation
        '''
        nuts = self._squirrel.get_waveform_nuts(tmin=tmin, tmax=tmax)

        if load_data:
            traces = [
                self._squirrel.get_content(nut, 'waveform', accessor_id)
                for nut in nuts if nut_selector is None or nut_selector(nut)]

        else:
            traces = [
                trace.Trace(**nut.trace_kwargs)
                for nut in nuts if nut_selector is None or nut_selector(nut)]

        self._squirrel.advance_accessor(accessor_id)

        chopped = []
        used_files = set()
        for tr in traces:
            if not load_data and tr.ydata is not None:
                # Make sure no sample data is carried along when only meta
                # information has been requested.
                tr = tr.copy(data=False)
                tr.ydata = None

            try:
                chopped.append(tr.chop(
                    tmin, tmax,
                    inplace=False,
                    snap=snap,
                    include_last=include_last))

            except trace.NoData:
                # Nothing of this trace is left inside the window: skip it.
                pass

        return chopped, used_files

    def _process_chopped(
            self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
            tpad):
        '''
        Post-process the traces cut out for one window.

        Sorts ``chopped`` in place by ``full_id``, optionally degaps,
        optionally drops traces not covering the padded window and finally
        tags every remaining trace with ``wmin`` and ``wmax`` attributes.

        Note the argument order: ``wmax`` comes before ``wmin``.

        :returns: list of processed traces
        '''
        chopped.sort(key=lambda a: a.full_id)
        if degap:
            chopped = trace.degapper(chopped, maxgap=maxgap, maxlap=maxlap)

        if not want_incomplete:
            chopped_weeded = []
            for tr in chopped:
                # Mismatch between trace span and padded window at both ends.
                emin = tr.tmin - (wmin-tpad)
                emax = tr.tmax + tr.deltat - (wmax+tpad)
                if (abs(emin) <= 0.5*tr.deltat
                        and abs(emax) <= 0.5*tr.deltat):

                    # Complete within half a sample on either side.
                    chopped_weeded.append(tr)

                elif degap:
                    if (0. < emin <= 5. * tr.deltat and
                            -5. * tr.deltat <= emax < 0.):

                        # Up to five samples missing on each side: fill up
                        # by repeating the end values.
                        tr.extend(
                            wmin-tpad,
                            wmax+tpad-tr.deltat,
                            fillmethod='repeat')

                        chopped_weeded.append(tr)

            chopped = chopped_weeded

        for tr in chopped:
            tr.wmin = wmin
            tr.wmax = wmax

        return chopped

    def chopper(
            self,
            tmin=None, tmax=None, tinc=None, tpad=0.,
            trace_selector=None,
            want_incomplete=True, degap=True, maxgap=5, maxlap=None,
            keep_current_files_open=False, accessor_id='default',
            snap=(round, round), include_last=False, load_data=True,
            style=None):

        '''
        Get iterator for shifting window wise data extraction from waveform
        archive.

        :param tmin: start time (default uses start time of available data)
        :param tmax: end time (default uses end time of available data)
        :param tinc: time increment (window shift time) (default uses
            ``tmax-tmin``)
        :param tpad: padding time appended on either side of the data windows
            (window overlap is ``2*tpad``)
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects
        :param want_incomplete: if set to ``False``, gappy/incomplete traces
            are discarded from the results
        :param degap: whether to try to connect traces and to remove gaps and
            overlaps
        :param maxgap: maximum gap size in samples which is filled with
            interpolated samples when ``degap`` is ``True``
        :param maxlap: maximum overlap size in samples which is removed when
            ``degap`` is ``True``
        :param keep_current_files_open: whether to keep cached trace data in
            memory after the iterator has ended
        :param accessor_id: if given, used as a key to identify different
            points of extraction for the decision of when to release cached
            trace data (should be used when data is alternately extracted from
            more than one region / selection)
        :param snap: replaces Python's :py:func:`round` function which is used
            to determine indices where to start and end the trace data array
        :param include_last: whether to include last sample
        :param load_data: whether to load the waveform data. If set to
            ``False``, traces with no data samples, but with correct
            meta-information are returned
        :param style: set to ``'batch'`` to yield waveforms and information
            about the chopper state as :py:class:`pyrocko.pile.Batch` objects.
            By default lists of :py:class:`pyrocko.trace.Trace` objects are
            yielded.
        :returns: iterator providing extracted waveforms for each extracted
            window. See ``style`` argument for details.
        '''

        if tmin is None:
            # Query the property only once, each access asks the squirrel.
            pile_tmin = self.tmin
            if pile_tmin is None:
                logger.warning("Pile's tmin is not set - pile may be empty.")
                return

            # Shift inwards so that the padded window starts at the data.
            tmin = pile_tmin + tpad

        if tmax is None:
            pile_tmax = self.tmax
            if pile_tmax is None:
                logger.warning("Pile's tmax is not set - pile may be empty.")
                return

            tmax = pile_tmax - tpad

        if tinc is None:
            tinc = tmax - tmin

        if not self.is_relevant(tmin-tpad, tmax+tpad):
            return

        nut_selector = trace_callback_to_nut_callback(trace_selector)

        # Small tolerance, so that no extra window is started when the time
        # range is an exact multiple of tinc.
        eps = tinc * 1e-6
        if tinc != 0.0:
            nwin = int(((tmax - eps) - tmin) / tinc) + 1
        else:
            nwin = 1

        for iwin in range(nwin):
            wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)

            chopped, used_files = self.chop(
                wmin-tpad, wmax+tpad, nut_selector, snap,
                include_last, load_data, accessor_id)

            processed = self._process_chopped(
                chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
                tpad)

            if style == 'batch':
                yield classic_pile.Batch(
                    tmin=wmin,
                    tmax=wmax,
                    i=iwin,
                    n=nwin,
                    traces=processed)

            else:
                yield processed

        # Only reached when the iterator has been consumed completely.
        if not keep_current_files_open:
            self._squirrel.clear_accessor(accessor_id, 'waveform')

    def chopper_grouped(self, gather, progress=None, *args, **kwargs):
        '''
        Run :py:meth:`chopper` once per group of traces.

        Groups are determined by the keys obtained from
        :py:meth:`gather_keys`. For every key, :py:meth:`chopper` is run with
        a trace selector accepting only traces for which ``gather(tr)`` equals
        that key (and which pass the ``trace_selector`` given by the caller,
        if any).

        :param gather: callback mapping a trace to its group key
        :param progress: if not ``None``, handed to ``util.progressbar`` as
            first argument, with the number of keys as second argument
        :param args: passed on to :py:meth:`chopper`
        :param kwargs: passed on to :py:meth:`chopper`
        :returns: iterator yielding whatever :py:meth:`chopper` yields
        '''
        keys = self.gather_keys(gather)
        if len(keys) == 0:
            return

        outer_trace_selector = kwargs.get('trace_selector')

        # the use of this gather-cache makes it impossible to modify the pile
        # during chopping
        pbar = None
        try:
            if progress is not None:
                pbar = util.progressbar(progress, len(keys))

            for ikey, key in enumerate(keys):
                # ``key`` is looked up when tsel is called, which is fine
                # because the chopper below is exhausted within this
                # iteration.
                def tsel(tr):
                    return gather(tr) == key and (
                        outer_trace_selector is None or
                        outer_trace_selector(tr))

                kwargs['trace_selector'] = tsel

                for traces in self.chopper(*args, **kwargs):
                    yield traces

                if pbar:
                    pbar.update(ikey+1)

        finally:
            if pbar:
                pbar.finish()

    def reload_modified(self):
        '''
        Call ``reload()`` on the underlying squirrel.
        '''
        self._squirrel.reload()

    def iter_traces(
            self,
            load_data=False,
            return_abspath=False,
            trace_selector=None):

        '''
        Iterate over all traces in pile.

        :param load_data: whether to load the waveform data, by default empty
            traces are yielded. Only ``False`` is supported by this
            implementation.
        :param return_abspath: if ``True`` yield tuples containing absolute
            file path and :py:class:`pyrocko.trace.Trace` objects. Only
            ``False`` is supported by this implementation.
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects
        :raises AssertionError: when iteration starts, if ``load_data`` or
            ``return_abspath`` is set

        '''
        # Raised explicitly rather than via ``assert``, so that the checks
        # are still in effect when Python runs with ``-O``.
        if load_data:
            raise AssertionError(
                'iter_traces() does not support load_data=True.')

        if return_abspath:
            raise AssertionError(
                'iter_traces() does not support return_abspath=True.')

        nut_selector = trace_callback_to_nut_callback(trace_selector)

        for nut in self._squirrel.get_waveform_nuts():
            if nut_selector is None or nut_selector(nut):
                yield trace.Trace(**nut.trace_kwargs)

    def gather_keys(self, gather, selector=None):
        '''
        Get the group keys of the ``'waveform'`` codes in the squirrel.

        Both callbacks are written for trace objects. They are adapted to
        receive :py:class:`CodesDummyTrace` objects built from codes.

        :param gather: callback mapping a trace-like object to a key
        :param selector: filter callback, or ``None``
        :returns: result of the squirrel's ``_gather_codes_keys()``
        '''
        codes_gather = trace_callback_to_codes_callback(gather)
        codes_selector = trace_callback_to_codes_callback(selector)
        return self._squirrel._gather_codes_keys(
            'waveform', codes_gather, codes_selector)

    def snuffle(self, **kwargs):
        '''Visualize it.

        :param stations: list of `pyrocko.model.Station` objects or ``None``
        :param events: list of `pyrocko.model.Event` objects or ``None``
        :param markers: list of `pyrocko.gui_util.Marker` objects or ``None``
        :param ntracks: float, number of tracks to be shown initially
            (default: 12)
        :param follow: time interval (in seconds) for real time follow mode or
            ``None``
        :param controls: bool, whether to show the main controls (default:
            ``True``)
        :param opengl: bool, whether to use opengl (default: ``False``)
        '''

        # Imported here, so that the GUI is only pulled in when it is used.
        from pyrocko.gui.snuffler import snuffle
        snuffle(self, **kwargs)

    def add_file(self, mtf):
        '''
        Add the traces of an in-memory traces file to the squirrel.

        The name returned by the squirrel is remembered on the object as
        ``mtf._squirrel_name``, where :py:meth:`remove_file` looks it up.

        :param mtf: ``pyrocko.pile.MemTracesFile`` object
        :raises AssertionError: if ``mtf`` is of any other type
        '''
        if not isinstance(mtf, classic_pile.MemTracesFile):
            # Raised explicitly rather than via ``assert False``, which is
            # removed when Python runs with ``-O``.
            raise AssertionError(
                'Only MemTracesFile objects can be added to this pile.')

        name = self._squirrel.add_volatile_waveforms(mtf.get_traces())
        mtf._squirrel_name = name

    def remove_file(self, mtf):
        '''
        Remove a file previously added with :py:meth:`add_file`.

        Does nothing if ``mtf`` is not a ``pyrocko.pile.MemTracesFile`` or
        carries no (truthy) ``_squirrel_name``.
        '''
        if isinstance(mtf, classic_pile.MemTracesFile) \
                and getattr(mtf, '_squirrel_name', False):

            self._squirrel.remove(mtf._squirrel_name)
            mtf._squirrel_name = None

    def is_empty(self):
        '''
        Check whether ``'waveform'`` is missing from the squirrel's kinds.
        '''
        return 'waveform' not in self._squirrel.get_kinds()

    def get_update_count(self):
        '''
        Always returns ``0`` in this implementation.
        '''
        return 0
def get_cache(_):
    '''
    Placeholder: accepts one argument, ignores it and returns ``None``.
    '''
    return