1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import logging
7import weakref
8from pyrocko import squirrel as psq, trace
9from pyrocko import pile as classic_pile
11logger = logging.getLogger('psq.pile')
14def trace_callback_to_nut_callback(trace_callback):
15 if trace_callback is None:
16 return None
18 def nut_callback(nut):
19 return trace_callback(nut.dummy_trace)
21 return nut_callback
24class CodesDummyTrace(object):
25 def __init__(self, codes):
26 self.network, self.station, self.location, self.channel \
27 = self.nslc_id \
28 = codes[1:5]
31def trace_callback_to_codes_callback(trace_callback):
32 if trace_callback is None:
33 return None
35 def codes_callback(codes):
36 return trace_callback(CodesDummyTrace(codes))
38 return codes_callback
41class Pile(object):
42 '''
43 :py:class:`pyrocko.pile.Pile` surrogate: waveform lookup, loading and
44 caching.
46 This class emulates most of the older :py:class:`pyrocko.pile.Pile` methods
47 by using calls to a :py:class:`pyrocko.squirrel.base.Squirrel` instance
48 behind the scenes.
50 This interface can be used as a drop-in replacement for piles which are
51 used in existing scripts and programs for efficient waveform data access.
52 The Squirrel-based pile scales better for large datasets. Newer scripts
53 should use Squirrel's native methods to avoid the emulation overhead.
55 .. note::
56 Many methods in the original pile implementation lack documentation, as
57 do here. Read the source, Luke!
58 '''
59 def __init__(self, squirrel=None):
60 if squirrel is None:
61 squirrel = psq.Squirrel()
63 self._squirrel = squirrel
64 self._listeners = []
66 def add_listener(self, obj):
67 self._listeners.append(weakref.ref(obj))
69 def notify_listeners(self, what):
70 for ref in self._listeners:
71 obj = ref()
72 if obj:
73 obj.pile_changed(what)
75 def get_tmin(self):
76 return self.tmin
78 def get_tmax(self):
79 return self.tmax
81 def get_deltatmin(self):
82 return self._squirrel.get_deltat_span('waveform')[0]
84 def get_deltatmax(self):
85 return self._squirrel.get_deltat_span('waveform')[1]
87 @property
88 def deltatmin(self):
89 return self.get_deltatmin()
91 @property
92 def deltatmax(self):
93 return self.get_deltatmax()
95 @property
96 def tmin(self):
97 return self._squirrel.get_time_span('waveform')[0]
99 @property
100 def tmax(self):
101 return self._squirrel.get_time_span('waveform')[1]
103 @property
104 def networks(self):
105 return set(codes[1] for codes in self._squirrel.get_codes('waveform'))
107 @property
108 def stations(self):
109 return set(codes[2] for codes in self._squirrel.get_codes('waveform'))
111 @property
112 def locations(self):
113 return set(codes[3] for codes in self._squirrel.get_codes('waveform'))
115 @property
116 def channels(self):
117 return set(codes[4] for codes in self._squirrel.get_codes('waveform'))
119 def is_relevant(self, tmin, tmax):
120 ptmin, ptmax = self._squirrel.get_time_span(
121 ['waveform', 'waveform_promise'])
123 if None in (ptmin, ptmax):
124 return False
126 return tmax >= ptmin and ptmax >= tmin
128 def load_files(
129 self, filenames,
130 filename_attributes=None,
131 fileformat='mseed',
132 cache=None,
133 show_progress=True,
134 update_progress=None):
136 self._squirrel.add(
137 filenames, kinds='waveform', format=fileformat)
139 def chop(
140 self, tmin, tmax,
141 nut_selector=None,
142 snap=(round, round),
143 include_last=False,
144 load_data=True,
145 accessor_id='default'):
147 nuts = self._squirrel.get_waveform_nuts(tmin=tmin, tmax=tmax)
149 if load_data:
150 traces = [
151 self._squirrel.get_content(nut, 'waveform', accessor_id)
153 for nut in nuts if nut_selector is None or nut_selector(nut)]
155 else:
156 traces = [
157 trace.Trace(**nut.trace_kwargs)
158 for nut in nuts if nut_selector is None or nut_selector(nut)]
160 self._squirrel.advance_accessor(accessor_id)
162 chopped = []
163 used_files = set()
164 for tr in traces:
165 if not load_data and tr.ydata is not None:
166 tr = tr.copy(data=False)
167 tr.ydata = None
169 try:
170 chopped.append(tr.chop(
171 tmin, tmax,
172 inplace=False,
173 snap=snap,
174 include_last=include_last))
176 except trace.NoData:
177 pass
179 return chopped, used_files
181 def _process_chopped(
182 self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
183 tpad):
185 chopped.sort(key=lambda a: a.full_id)
186 if degap:
187 chopped = trace.degapper(chopped, maxgap=maxgap, maxlap=maxlap)
189 if not want_incomplete:
190 chopped_weeded = []
191 for tr in chopped:
192 emin = tr.tmin - (wmin-tpad)
193 emax = tr.tmax + tr.deltat - (wmax+tpad)
194 if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
195 chopped_weeded.append(tr)
197 elif degap:
198 if (0. < emin <= 5. * tr.deltat and
199 -5. * tr.deltat <= emax < 0.):
201 tr.extend(
202 wmin-tpad,
203 wmax+tpad-tr.deltat,
204 fillmethod='repeat')
206 chopped_weeded.append(tr)
208 chopped = chopped_weeded
210 for tr in chopped:
211 tr.wmin = wmin
212 tr.wmax = wmax
214 return chopped
216 def chopper(
217 self,
218 tmin=None, tmax=None, tinc=None, tpad=0.,
219 group_selector=None, trace_selector=None,
220 want_incomplete=True, degap=True, maxgap=5, maxlap=None,
221 keep_current_files_open=False, accessor_id='default',
222 snap=(round, round), include_last=False, load_data=True,
223 style=None):
225 '''
226 Get iterator for shifting window wise data extraction from waveform
227 archive.
229 :param tmin: start time (default uses start time of available data)
230 :param tmax: end time (default uses end time of available data)
231 :param tinc: time increment (window shift time) (default uses
232 ``tmax-tmin``)
233 :param tpad: padding time appended on either side of the data windows
234 (window overlap is ``2*tpad``)
235 :param group_selector: *ignored in squirrel-based pile*
236 :param trace_selector: filter callback taking
237 :py:class:`pyrocko.trace.Trace` objects
238 :param want_incomplete: if set to ``False``, gappy/incomplete traces
239 are discarded from the results
240 :param degap: whether to try to connect traces and to remove gaps and
241 overlaps
242 :param maxgap: maximum gap size in samples which is filled with
243 interpolated samples when ``degap`` is ``True``
244 :param maxlap: maximum overlap size in samples which is removed when
245 ``degap`` is ``True``
246 :param keep_current_files_open: whether to keep cached trace data in
247 memory after the iterator has ended
248 :param accessor_id: if given, used as a key to identify different
249 points of extraction for the decision of when to release cached
250 trace data (should be used when data is alternately extracted from
251 more than one region / selection)
252 :param snap: replaces Python's :py:func:`round` function which is used
253 to determine indices where to start and end the trace data array
254 :param include_last: whether to include last sample
255 :param load_data: whether to load the waveform data. If set to
256 ``False``, traces with no data samples, but with correct
257 meta-information are returned
258 :param style: set to ``'batch'`` to yield waveforms and information
259 about the chopper state as :py:class:`pyrocko.pile.Batch` objects. By
260 default lists of :py:class:`pyrocko.trace.Trace` objects are yielded.
261 :returns: iterator providing extracted waveforms for each extracted
262 window. See ``style`` argument for details.
263 '''
265 if tmin is None:
266 if self.tmin is None:
267 logger.warning('Pile\'s tmin is not set - pile may be empty.')
268 return
269 tmin = self.tmin + tpad
271 if tmax is None:
272 if self.tmax is None:
273 logger.warning('Pile\'s tmax is not set - pile may be empty.')
274 return
275 tmax = self.tmax - tpad
277 if tinc is None:
278 tinc = tmax - tmin
280 if not self.is_relevant(tmin-tpad, tmax+tpad):
281 return
283 nut_selector = trace_callback_to_nut_callback(trace_selector)
285 eps = tinc * 1e-6
286 if tinc != 0.0:
287 nwin = int(((tmax - eps) - tmin) / tinc) + 1
288 else:
289 nwin = 1
291 for iwin in range(nwin):
292 wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)
294 chopped, used_files = self.chop(
295 wmin-tpad, wmax+tpad, nut_selector, snap,
296 include_last, load_data, accessor_id)
298 processed = self._process_chopped(
299 chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
300 tpad)
302 if style == 'batch':
303 yield classic_pile.Batch(
304 tmin=wmin,
305 tmax=wmax,
306 i=iwin,
307 n=nwin,
308 traces=processed)
310 else:
311 yield processed
313 if not keep_current_files_open:
314 self._squirrel.clear_accessor(accessor_id, 'waveform')
316 def chopper_grouped(self, gather, progress=None, *args, **kwargs):
317 raise NotImplementedError
319 def reload_modified(self):
320 self._squirrel.reload()
322 def iter_traces(
323 self,
324 load_data=False,
325 return_abspath=False,
326 group_selector=None,
327 trace_selector=None):
329 '''
330 Iterate over all traces in pile.
332 :param load_data: whether to load the waveform data, by default empty
333 traces are yielded
334 :param return_abspath: if ``True`` yield tuples containing absolute
335 file path and :py:class:`pyrocko.trace.Trace` objects
336 :param group_selector: *ignored in squirre-based pile*
337 :param trace_selector: filter callback taking
338 :py:class:`pyrocko.trace.Trace` objects
340 '''
341 assert not load_data
342 assert not return_abspath
344 nut_selector = trace_callback_to_nut_callback(trace_selector)
346 for nut in self._squirrel.get_waveform_nuts():
347 if nut_selector is None or nut_selector(nut):
348 yield trace.Trace(**nut.trace_kwargs)
350 def gather_keys(self, gather, selector=None):
351 codes_gather = trace_callback_to_codes_callback(gather)
352 codes_selector = trace_callback_to_codes_callback(selector)
353 return self._squirrel._gather_codes_keys(
354 'waveform', codes_gather, codes_selector)
356 def snuffle(self, **kwargs):
357 '''Visualize it.
359 :param stations: list of `pyrocko.model.Station` objects or ``None``
360 :param events: list of `pyrocko.model.Event` objects or ``None``
361 :param markers: list of `pyrocko.gui_util.Marker` objects or ``None``
362 :param ntracks: float, number of tracks to be shown initially
363 (default: 12)
364 :param follow: time interval (in seconds) for real time follow mode or
365 ``None``
366 :param controls: bool, whether to show the main controls (default:
367 ``True``)
368 :param opengl: bool, whether to use opengl (default: ``False``)
369 '''
371 from pyrocko.gui.snuffler import snuffle
372 snuffle(self, **kwargs)
374 def add_file(self, mtf):
375 if isinstance(mtf, classic_pile.MemTracesFile):
376 name = self._squirrel.add_volatile_waveforms(mtf.get_traces())
377 mtf._squirrel_name = name
378 else:
379 assert False
381 self.notify_listeners('add')
383 def remove_file(self, mtf):
384 if isinstance(mtf, classic_pile.MemTracesFile) \
385 and getattr(mtf, '_squirrel_name', False):
387 self._squirrel.remove(mtf._squirrel_name)
388 mtf._squirrel_name = None
390 self.notify_listeners('remove')
392 def is_empty(self):
393 return 'waveform' not in self._squirrel.get_kinds()
395 def get_update_count(self):
396 return 0
399def get_cache(_):
400 return None