1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import logging 

7import weakref 

8from pyrocko import squirrel as psq, trace, util 

9from pyrocko import pile as classic_pile 

10 

11logger = logging.getLogger('psq.pile') 

12 

13 

def trace_callback_to_nut_callback(trace_callback):
    '''
    Adapt a callback written for trace objects so it can be fed with nuts.

    :param trace_callback: callable taking a trace-like object, or ``None``
    :returns: ``None`` if ``trace_callback`` is ``None``, otherwise a callable
        taking a nut, which passes the nut's ``dummy_trace`` attribute on to
        ``trace_callback`` and returns its result
    '''
    if trace_callback is not None:
        def nut_callback(nut):
            dummy = nut.dummy_trace
            return trace_callback(dummy)

        return nut_callback

    return None

22 

23 

class CodesDummyTrace(object):
    '''
    Lightweight stand-in for a trace, exposing only its codes.

    The first four elements of ``codes`` are unpacked into the attributes
    ``network``, ``station``, ``location`` and ``channel``; the same
    four-element slice is also stored as ``nslc_id``.
    '''

    def __init__(self, codes):
        nslc = codes[0:4]

        # Unpack first, then store the slice itself (same order as a chained
        # assignment would use).
        (self.network, self.station, self.location, self.channel) = nslc
        self.nslc_id = nslc

29 

30 

def trace_callback_to_codes_callback(trace_callback):
    '''
    Adapt a callback written for trace objects so it can be fed with codes.

    :param trace_callback: callable taking a trace-like object, or ``None``
    :returns: ``None`` if ``trace_callback`` is ``None``, otherwise a callable
        taking a codes object, which wraps it into a
        :py:class:`CodesDummyTrace` and passes that to ``trace_callback``
    '''
    if trace_callback is not None:
        def codes_callback(codes):
            dummy = CodesDummyTrace(codes)
            return trace_callback(dummy)

        return codes_callback

    return None

39 

40 

class Pile(object):
    '''
    :py:class:`pyrocko.pile.Pile` surrogate: waveform lookup, loading and
    caching.

    This class emulates most of the older :py:class:`pyrocko.pile.Pile` methods
    by using calls to a :py:class:`pyrocko.squirrel.base.Squirrel` instance
    behind the scenes.

    This interface can be used as a drop-in replacement for piles which are
    used in existing scripts and programs for efficient waveform data access.
    The Squirrel-based pile scales better for large datasets. Newer scripts
    should use Squirrel's native methods to avoid the emulation overhead.

    .. note::
        Many methods in the original pile implementation lack documentation,
        as they do here. Read the source, Luke!
    '''

    def __init__(self, squirrel=None):
        '''
        :param squirrel: Squirrel instance to wrap. If ``None``, a new
            ``pyrocko.squirrel.Squirrel`` is created.
        '''
        if squirrel is None:
            squirrel = psq.Squirrel()

        self._squirrel = squirrel

        # Weak references to objects registered through add_listener().
        self._listeners = []

        # Forward change events of the Squirrel's database to our listeners.
        self._squirrel.get_database().add_listener(
            self._notify_squirrel_to_pile)

    def _notify_squirrel_to_pile(self, event, *args):
        '''
        Relay a database event to the pile listeners.

        Only ``event`` is passed on, additional arguments are dropped.
        '''
        self.notify_listeners(event)

    def add_listener(self, obj):
        '''
        Register ``obj`` to be notified about changes.

        Only a weak reference is kept, so registering does not keep ``obj``
        alive. ``obj`` must provide a ``pile_changed(what)`` method.
        '''
        self._listeners.append(weakref.ref(obj))

    def notify_listeners(self, what):
        '''
        Call ``pile_changed(what)`` on every registered listener still alive.

        References to listeners which have been garbage collected are removed
        from the internal list.
        '''
        # Liveness is tested with ``is not None``: a listener may well be
        # falsy (e.g. a container type with zero length) and must still be
        # notified.
        alive = []
        for ref in self._listeners:
            obj = ref()
            if obj is not None:
                alive.append((ref, obj))

        # Prune dead references so the list cannot grow without bound.
        self._listeners[:] = [ref for (ref, _) in alive]

        for _, obj in alive:
            obj.pile_changed(what)

    def get_tmin(self):
        '''Same as the :py:attr:`tmin` property.'''
        return self.tmin

    def get_tmax(self):
        '''Same as the :py:attr:`tmax` property.'''
        return self.tmax

    def get_deltatmin(self):
        '''First element of the Squirrel's deltat span for waveforms.'''
        return self._squirrel.get_deltat_span('waveform')[0]

    def get_deltatmax(self):
        '''Second element of the Squirrel's deltat span for waveforms.'''
        return self._squirrel.get_deltat_span('waveform')[1]

    @property
    def deltatmin(self):
        return self.get_deltatmin()

    @property
    def deltatmax(self):
        return self.get_deltatmax()

    @property
    def tmin(self):
        # First element of the Squirrel's time span for waveforms. chopper()
        # treats a value of None as "pile may be empty".
        return self._squirrel.get_time_span('waveform')[0]

    @property
    def tmax(self):
        # Second element of the Squirrel's time span for waveforms.
        return self._squirrel.get_time_span('waveform')[1]

    @property
    def networks(self):
        '''Set of network codes of the available waveforms.'''
        return {
            codes.network for codes in self._squirrel.get_codes('waveform')}

    @property
    def stations(self):
        '''Set of station codes of the available waveforms.'''
        return {
            codes.station for codes in self._squirrel.get_codes('waveform')}

    @property
    def locations(self):
        '''Set of location codes of the available waveforms.'''
        return {
            codes.location for codes in self._squirrel.get_codes('waveform')}

    @property
    def channels(self):
        '''Set of channel codes of the available waveforms.'''
        return {
            codes.channel for codes in self._squirrel.get_codes('waveform')}

    def is_relevant(self, tmin, tmax):
        '''
        Check whether the time span ``[tmin, tmax]`` intersects the time span
        reported by the Squirrel for waveforms and waveform promises.

        :returns: ``False`` if no time span is available or if the spans do
            not intersect, ``True`` otherwise
        '''
        ptmin, ptmax = self._squirrel.get_time_span(
            ['waveform', 'waveform_promise'])

        if None in (ptmin, ptmax):
            return False

        return tmax >= ptmin and ptmax >= tmin

    def load_files(
            self, filenames,
            filename_attributes=None,
            fileformat='mseed',
            cache=None,
            show_progress=True,
            update_progress=None):

        '''
        Add waveform files to the underlying Squirrel.

        :param filenames: passed to the Squirrel's ``add`` method
        :param fileformat: passed as ``format`` to the Squirrel's ``add``
            method

        The remaining arguments are accepted for signature compatibility with
        the classic pile but are not used here.
        '''
        self._squirrel.add(
            filenames, kinds='waveform', format=fileformat)

    def chop(
            self, tmin, tmax,
            nut_selector=None,
            snap=(round, round),
            include_last=False,
            load_data=True,
            accessor_id='default'):

        '''
        Extract the waveforms intersecting with ``[tmin, tmax]`` and cut them
        to that time span.

        :param tmin: start time of the window
        :param tmax: end time of the window
        :param nut_selector: optional filter callback taking a nut; only nuts
            for which it returns a true value are used
        :param snap: passed to the ``chop`` method of the traces
        :param include_last: passed to the ``chop`` method of the traces
        :param load_data: if ``True``, get the waveform content through the
            Squirrel, otherwise build data-less traces from the nuts'
            ``trace_kwargs``
        :param accessor_id: accessor identifier handed to the Squirrel
        :returns: tuple ``(chopped, used_files)``, where ``chopped`` is a list
            of cut traces and ``used_files`` is a set which is always empty in
            this implementation
        '''
        nuts = self._squirrel.get_waveform_nuts(tmin=tmin, tmax=tmax)

        selected = [
            nut for nut in nuts
            if nut_selector is None or nut_selector(nut)]

        if load_data:
            traces = [
                self._squirrel.get_content(nut, 'waveform', accessor_id)
                for nut in selected]
        else:
            traces = [
                trace.Trace(**nut.trace_kwargs)
                for nut in selected]

        self._squirrel.advance_accessor(accessor_id)

        chopped = []

        # Never filled here; returned only to keep the two-element result
        # which chopper() unpacks.
        used_files = set()

        for tr in traces:
            if not load_data and tr.ydata is not None:
                # Make sure no samples are carried along when only
                # meta-information was requested.
                tr = tr.copy(data=False)
                tr.ydata = None

            try:
                chopped.append(tr.chop(
                    tmin, tmax,
                    inplace=False,
                    snap=snap,
                    include_last=include_last))

            except trace.NoData:
                # Trace has nothing to contribute to the requested window.
                pass

        return chopped, used_files

    def _process_chopped(
            self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
            tpad):

        '''
        Post-process the traces of one window.

        Note the argument order: ``wmax`` comes before ``wmin``.

        :param chopped: list of traces; sorted in place by ``full_id``
        :param degap: if true, run the traces through ``trace.degapper``
        :param maxgap: passed to ``trace.degapper``
        :param maxlap: passed to ``trace.degapper``
        :param want_incomplete: if false, drop traces which do not span the
            padded window ``[wmin-tpad, wmax+tpad]``
        :param wmax: window end time, without padding
        :param wmin: window start time, without padding
        :param tpad: padding time
        :returns: list of traces, each with ``wmin`` and ``wmax`` attributes
            set
        '''
        chopped.sort(key=lambda a: a.full_id)
        if degap:
            chopped = trace.degapper(chopped, maxgap=maxgap, maxlap=maxlap)

        if not want_incomplete:
            chopped_weeded = []
            for tr in chopped:
                # Offsets of the trace's start / end against the padded
                # window's start / end.
                emin = tr.tmin - (wmin-tpad)
                emax = tr.tmax + tr.deltat - (wmax+tpad)
                if (abs(emin) <= 0.5*tr.deltat
                        and abs(emax) <= 0.5*tr.deltat):

                    # Spans the window to within half a sample: complete.
                    chopped_weeded.append(tr)

                elif degap:
                    # Up to five samples short at both ends: extend the trace
                    # in place by repeating values and keep it.
                    if (0. < emin <= 5. * tr.deltat and
                            -5. * tr.deltat <= emax < 0.):

                        tr.extend(
                            wmin-tpad,
                            wmax+tpad-tr.deltat,
                            fillmethod='repeat')

                        chopped_weeded.append(tr)

            chopped = chopped_weeded

        for tr in chopped:
            tr.wmin = wmin
            tr.wmax = wmax

        return chopped

    def chopper(
            self,
            tmin=None, tmax=None, tinc=None, tpad=0.,
            trace_selector=None,
            want_incomplete=True, degap=True, maxgap=5, maxlap=None,
            keep_current_files_open=False, accessor_id='default',
            snap=(round, round), include_last=False, load_data=True,
            style=None):

        '''
        Get iterator for shifting window wise data extraction from waveform
        archive.

        :param tmin: start time (default uses start time of available data)
        :param tmax: end time (default uses end time of available data)
        :param tinc: time increment (window shift time) (default uses
            ``tmax-tmin``)
        :param tpad: padding time appended on either side of the data windows
            (window overlap is ``2*tpad``)
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects
        :param want_incomplete: if set to ``False``, gappy/incomplete traces
            are discarded from the results
        :param degap: whether to try to connect traces and to remove gaps and
            overlaps
        :param maxgap: maximum gap size in samples which is filled with
            interpolated samples when ``degap`` is ``True``
        :param maxlap: maximum overlap size in samples which is removed when
            ``degap`` is ``True``
        :param keep_current_files_open: whether to keep cached trace data in
            memory after the iterator has ended
        :param accessor_id: if given, used as a key to identify different
            points of extraction for the decision of when to release cached
            trace data (should be used when data is alternately extracted from
            more than one region / selection)
        :param snap: replaces Python's :py:func:`round` function which is used
            to determine indices where to start and end the trace data array
        :param include_last: whether to include last sample
        :param load_data: whether to load the waveform data. If set to
            ``False``, traces with no data samples, but with correct
            meta-information are returned
        :param style: set to ``'batch'`` to yield waveforms and information
            about the chopper state as :py:class:`pyrocko.pile.Batch` objects.
            By default lists of :py:class:`pyrocko.trace.Trace` objects are
            yielded.
        :returns: iterator providing extracted waveforms for each extracted
            window. See ``style`` argument for details.
        '''

        if tmin is None:
            # Query the property once; each access asks the Squirrel.
            pile_tmin = self.tmin
            if pile_tmin is None:
                logger.warning('Pile\'s tmin is not set - pile may be empty.')
                return
            tmin = pile_tmin + tpad

        if tmax is None:
            pile_tmax = self.tmax
            if pile_tmax is None:
                logger.warning('Pile\'s tmax is not set - pile may be empty.')
                return
            tmax = pile_tmax - tpad

        if tinc is None:
            tinc = tmax - tmin

        if not self.is_relevant(tmin-tpad, tmax+tpad):
            return

        nut_selector = trace_callback_to_nut_callback(trace_selector)

        # Small tolerance so that a span which is an exact multiple of tinc
        # does not produce an extra, empty window at the end.
        eps = tinc * 1e-6
        if tinc != 0.0:
            nwin = int(((tmax - eps) - tmin) / tinc) + 1
        else:
            nwin = 1

        for iwin in range(nwin):
            wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)

            chopped, used_files = self.chop(
                wmin-tpad, wmax+tpad, nut_selector, snap,
                include_last, load_data, accessor_id)

            processed = self._process_chopped(
                chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
                tpad)

            if style == 'batch':
                yield classic_pile.Batch(
                    tmin=wmin,
                    tmax=wmax,
                    i=iwin,
                    n=nwin,
                    traces=processed)

            else:
                yield processed

        # Only reached when the iterator has been run to its end.
        if not keep_current_files_open:
            self._squirrel.clear_accessor(accessor_id, 'waveform')

    def chopper_grouped(self, gather, progress=None, *args, **kwargs):
        '''
        Run :py:meth:`chopper` once for each group of traces.

        :param gather: callback taking a trace-like object and returning the
            key of the group it belongs to
        :param progress: if not ``None``, passed together with the number of
            groups to ``util.progressbar`` to report progress
        :param args: passed on to :py:meth:`chopper`
        :param kwargs: passed on to :py:meth:`chopper`; a ``trace_selector``
            given here is combined with the group selection
        :returns: iterator yielding whatever :py:meth:`chopper` yields, group
            after group
        '''
        keys = self.gather_keys(gather)
        if len(keys) == 0:
            return

        outer_trace_selector = kwargs.get('trace_selector')

        # the use of this gather-cache makes it impossible to modify the pile
        # during chopping
        pbar = None
        try:
            if progress is not None:
                pbar = util.progressbar(progress, len(keys))

            for ikey, key in enumerate(keys):
                # Bind the current key as a default argument so the selector
                # does not depend on the loop variable's later values.
                def tsel(tr, key=key):
                    return gather(tr) == key and (
                        outer_trace_selector is None or
                        outer_trace_selector(tr))

                kwargs['trace_selector'] = tsel

                for traces in self.chopper(*args, **kwargs):
                    yield traces

                if pbar:
                    pbar.update(ikey+1)

        finally:
            if pbar:
                pbar.finish()

    def reload_modified(self):
        '''Call the Squirrel's ``reload`` method.'''
        self._squirrel.reload()

    def iter_traces(
            self,
            load_data=False,
            return_abspath=False,
            trace_selector=None):

        '''
        Iterate over all traces in pile.

        :param load_data: whether to load the waveform data, by default empty
            traces are yielded. Only ``False`` is supported by this
            implementation.
        :param return_abspath: if ``True`` yield tuples containing absolute
            file path and :py:class:`pyrocko.trace.Trace` objects. Only
            ``False`` is supported by this implementation.
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects

        '''
        assert not load_data, \
            'iter_traces: load_data=True is not supported'
        assert not return_abspath, \
            'iter_traces: return_abspath=True is not supported'

        nut_selector = trace_callback_to_nut_callback(trace_selector)

        for nut in self._squirrel.get_waveform_nuts():
            if nut_selector is None or nut_selector(nut):
                yield trace.Trace(**nut.trace_kwargs)

    def gather_keys(self, gather, selector=None):
        '''
        Get the group keys of the available waveform codes.

        :param gather: callback taking a trace-like object and returning a key
        :param selector: optional filter callback taking a trace-like object
        :returns: result of the Squirrel's ``_gather_codes_keys`` for kind
            ``'waveform'``

        Both callbacks are called with :py:class:`CodesDummyTrace` objects,
        so they can only rely on the codes attributes.
        '''
        codes_gather = trace_callback_to_codes_callback(gather)
        codes_selector = trace_callback_to_codes_callback(selector)
        return self._squirrel._gather_codes_keys(
            'waveform', codes_gather, codes_selector)

    def snuffle(self, **kwargs):
        '''Visualize it.

        :param stations: list of `pyrocko.model.Station` objects or ``None``
        :param events: list of `pyrocko.model.Event` objects or ``None``
        :param markers: list of `pyrocko.gui_util.Marker` objects or ``None``
        :param ntracks: float, number of tracks to be shown initially
            (default: 12)
        :param follow: time interval (in seconds) for real time follow mode or
            ``None``
        :param controls: bool, whether to show the main controls (default:
            ``True``)
        :param opengl: bool, whether to use opengl (default: ``False``)
        '''

        # Imported here so that the GUI is only pulled in when needed.
        from pyrocko.gui.snuffler import snuffle
        snuffle(self, **kwargs)

    def add_file(self, mtf):
        '''
        Add the traces of an in-memory traces file to the Squirrel.

        :param mtf: :py:class:`pyrocko.pile.MemTracesFile` object; other
            types are not supported

        The name returned by the Squirrel is stored on ``mtf`` as
        ``_squirrel_name`` so that :py:meth:`remove_file` can find it again.
        '''
        if isinstance(mtf, classic_pile.MemTracesFile):
            name = self._squirrel.add_volatile_waveforms(mtf.get_traces())
            mtf._squirrel_name = name
        else:
            assert False, 'add_file: only MemTracesFile objects are supported'

    def remove_file(self, mtf):
        '''
        Remove a file previously added with :py:meth:`add_file`.

        Does nothing if ``mtf`` is not a
        :py:class:`pyrocko.pile.MemTracesFile` or carries no
        ``_squirrel_name``.
        '''
        if isinstance(mtf, classic_pile.MemTracesFile) \
                and getattr(mtf, '_squirrel_name', False):

            self._squirrel.remove(mtf._squirrel_name)
            mtf._squirrel_name = None

    def is_empty(self):
        '''Return ``True`` if the Squirrel reports no ``'waveform'`` kind.'''
        return 'waveform' not in self._squirrel.get_kinds()

    def get_update_count(self):
        '''Always return ``0``.'''
        return 0

431 

432 

def get_cache(_):
    '''
    Always return ``None``; the argument is ignored.

    NOTE(review): presumably kept so that code expecting a ``get_cache``
    function in a pile module keeps working - confirm against callers.
    '''
    return None