Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/pile.py: 83%

193 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-06 06:59 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Emulation of the older :py:mod:`pyrocko.pile` interface. 

8''' 

9 

10import logging 

11from pyrocko import squirrel as psq, trace, util 

12from pyrocko import pile as classic_pile 

13 

14logger = logging.getLogger('psq.pile') 

15 

16 

def trace_callback_to_nut_callback(trace_callback):
    '''
    Adapt a trace-based callback so that it can be called with a nut.

    :param trace_callback: callable taking a trace-like object, or ``None``
    :returns: ``None`` if ``trace_callback`` is ``None``, otherwise a callable
        which hands the ``dummy_trace`` attribute of its argument to
        ``trace_callback`` and returns the result
    '''
    if trace_callback is not None:
        def call_with_dummy_trace(nut):
            # The wrapped callback only ever sees the nut's stand-in trace.
            dummy = nut.dummy_trace
            return trace_callback(dummy)

        return call_with_dummy_trace

    return None

25 

26 

class CodesDummyTrace(object):
    '''
    Minimal trace stand-in which only carries network, station, location and
    channel codes.

    The first four elements of ``codes`` are exposed as the attributes
    ``network``, ``station``, ``location`` and ``channel``; the four-element
    slice itself is available as ``nslc_id``.
    '''

    def __init__(self, codes):
        nslc = codes[0:4]
        # Unpack first, then store the slice: same order as a chained
        # assignment, so a too-short ``codes`` fails before ``nslc_id`` is set.
        self.network, self.station, self.location, self.channel = nslc
        self.nslc_id = nslc

32 

33 

def trace_callback_to_codes_callback(trace_callback):
    '''
    Adapt a trace-based callback so that it can be called with codes.

    :param trace_callback: callable taking a trace-like object, or ``None``
    :returns: ``None`` if ``trace_callback`` is ``None``, otherwise a callable
        which wraps its argument in a :py:class:`CodesDummyTrace`, passes that
        to ``trace_callback`` and returns the result
    '''
    if trace_callback is not None:
        def call_with_codes_trace(codes):
            # The wrapped callback can only rely on the code attributes which
            # CodesDummyTrace provides.
            stand_in = CodesDummyTrace(codes)
            return trace_callback(stand_in)

        return call_with_codes_trace

    return None

42 

43 

class Pile(object):
    '''
    :py:class:`pyrocko.pile.Pile` surrogate: waveform lookup, loading and
    caching.

    This class emulates most of the older :py:class:`pyrocko.pile.Pile` methods
    by using calls to a :py:class:`pyrocko.squirrel.base.Squirrel` instance
    behind the scenes.

    This interface can be used as a drop-in replacement for piles which are
    used in existing scripts and programs for efficient waveform data access.
    The Squirrel-based pile scales better for large datasets. Newer scripts
    should use Squirrel's native methods to avoid the emulation overhead.

    .. note::
        Many methods in the original pile implementation lack documentation,
        and so do many of the methods here. Read the source, Luke!
    '''

    def __init__(self, squirrel=None):
        '''
        :param squirrel: Squirrel instance to wrap. If ``None``, a new one is
            created with ``psq.Squirrel()``.
        '''
        if squirrel is None:
            squirrel = psq.Squirrel()

        self._squirrel = squirrel
        self._listeners = []

        # Forward notifications from the squirrel's database to the listeners
        # registered on this pile.
        self._squirrel.get_database().add_listener(
            self._notify_squirrel_to_pile)

    def _notify_squirrel_to_pile(self, event, *args):
        '''
        Database listener hook: relay ``event`` to the pile's listeners.

        Any additional arguments are ignored.
        '''
        self.notify_listeners(event)

    def add_listener(self, obj):
        '''
        Register callable ``obj`` to be notified by :py:meth:`notify_listeners`.

        The listener is stored through ``util.smart_weakref``.
        '''
        self._listeners.append(util.smart_weakref(obj))

    def notify_listeners(self, what):
        '''
        Call every listener which is still resolvable as ``obj(what, [])``.
        '''
        for ref in self._listeners:
            obj = ref()
            if obj:
                obj(what, [])

    def get_tmin(self):
        '''
        Return :py:attr:`tmin`.
        '''
        return self.tmin

    def get_tmax(self):
        '''
        Return :py:attr:`tmax`.
        '''
        return self.tmax

    def get_deltatmin(self):
        '''
        Return the first element of the squirrel's ``'waveform'`` deltat span.
        '''
        return self._squirrel.get_deltat_span('waveform')[0]

    def get_deltatmax(self):
        '''
        Return the second element of the squirrel's ``'waveform'`` deltat span.
        '''
        return self._squirrel.get_deltat_span('waveform')[1]

    @property
    def deltatmin(self):
        '''
        Same as :py:meth:`get_deltatmin`.
        '''
        return self.get_deltatmin()

    @property
    def deltatmax(self):
        '''
        Same as :py:meth:`get_deltatmax`.
        '''
        return self.get_deltatmax()

    @property
    def tmin(self):
        '''
        First element of the squirrel's ``'waveform'`` time span (queried with
        ``dummy_limits=False``). :py:meth:`chopper` treats ``None`` as "not
        set".
        '''
        return self._squirrel.get_time_span('waveform', dummy_limits=False)[0]

    @property
    def tmax(self):
        '''
        Second element of the squirrel's ``'waveform'`` time span (queried with
        ``dummy_limits=False``). :py:meth:`chopper` treats ``None`` as "not
        set".
        '''
        return self._squirrel.get_time_span('waveform', dummy_limits=False)[1]

    @property
    def networks(self):
        '''
        Set of the ``network`` attributes of all ``'waveform'`` codes.
        '''
        return {
            codes.network for codes in self._squirrel.get_codes('waveform')}

    @property
    def stations(self):
        '''
        Set of the ``station`` attributes of all ``'waveform'`` codes.
        '''
        return {
            codes.station for codes in self._squirrel.get_codes('waveform')}

    @property
    def locations(self):
        '''
        Set of the ``location`` attributes of all ``'waveform'`` codes.
        '''
        return {
            codes.location for codes in self._squirrel.get_codes('waveform')}

    @property
    def channels(self):
        '''
        Set of the ``channel`` attributes of all ``'waveform'`` codes.
        '''
        return {
            codes.channel for codes in self._squirrel.get_codes('waveform')}

    def is_relevant(self, tmin, tmax):
        '''
        Check whether the interval ``[tmin, tmax]`` touches the time span
        covered by waveforms and waveform promises.

        :returns: ``False`` if either end of the covered time span is
            ``None``, otherwise whether the two intervals overlap (touching
            end points count as overlap)
        '''
        ptmin, ptmax = self._squirrel.get_time_span(
            ['waveform', 'waveform_promise'], dummy_limits=False)

        if None in (ptmin, ptmax):
            return False

        return tmax >= ptmin and ptmax >= tmin

    def load_files(
            self, filenames,
            filename_attributes=None,
            fileformat='mseed',
            cache=None,
            show_progress=True,
            update_progress=None):
        '''
        Add waveform files to the underlying squirrel.

        Only ``filenames`` and ``fileformat`` are forwarded (to the squirrel's
        ``add`` method, restricted to kind ``'waveform'``). The remaining
        parameters are accepted for compatibility with the classic pile
        signature but are not used by this method.
        '''
        self._squirrel.add(
            filenames, kinds='waveform', format=fileformat)

    def chop(
            self, tmin, tmax,
            nut_selector=None,
            snap=(round, round),
            include_last=False,
            load_data=True,
            accessor_id='default'):
        '''
        Extract the waveforms of one time window.

        :param tmin: start time of the window
        :param tmax: end time of the window
        :param nut_selector: filter callback taking a nut, or ``None`` to
            accept all nuts
        :param snap: passed on to the traces' ``chop`` method
        :param include_last: passed on to the traces' ``chop`` method
        :param load_data: if ``True``, waveforms are obtained from the
            squirrel via ``get_content``; otherwise traces are constructed
            from the nuts' ``trace_kwargs``
        :param accessor_id: accessor key handed to the squirrel
        :returns: tuple ``(chopped, used_files)``. ``used_files`` is always an
            empty set in this emulation.
        '''
        nuts = self._squirrel.get_waveform_nuts(tmin=tmin, tmax=tmax)

        if load_data:
            traces = [
                self._squirrel.get_content(nut, 'waveform', accessor_id)
                for nut in nuts if nut_selector is None or nut_selector(nut)]

        else:
            traces = [
                trace.Trace(**nut.trace_kwargs)
                for nut in nuts if nut_selector is None or nut_selector(nut)]

        self._squirrel.advance_accessor(accessor_id)

        chopped = []
        used_files = set()
        for tr in traces:
            if not load_data and tr.ydata is not None:
                # Meta-data only requested: make sure no samples are passed on.
                tr = tr.copy(data=False)
                tr.ydata = None

            try:
                chopped.append(tr.chop(
                    tmin, tmax,
                    inplace=False,
                    snap=snap,
                    include_last=include_last))

            except trace.NoData:
                # Trace has no samples inside the window, skip it.
                pass

        return chopped, used_files

    def _process_chopped(
            self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
            tpad):
        '''
        Post-process the traces extracted for one window.

        Sorts ``chopped`` in place by ``full_id``, optionally degaps the
        traces, optionally weeds out traces which do not span the padded
        window, and tags every returned trace with ``wmin`` and ``wmax``
        attributes.

        Note the argument order: ``wmax`` comes before ``wmin``.

        :returns: list of processed traces
        '''
        chopped.sort(key=lambda a: a.full_id)
        if degap:
            chopped = trace.degapper(chopped, maxgap=maxgap, maxlap=maxlap)

        if not want_incomplete:
            chopped_weeded = []
            for tr in chopped:
                # Offsets of the trace's start and end from the bounds of the
                # padded window.
                emin = tr.tmin - (wmin-tpad)
                emax = tr.tmax + tr.deltat - (wmax+tpad)
                if (abs(emin) <= 0.5*tr.deltat
                        and abs(emax) <= 0.5*tr.deltat):

                    chopped_weeded.append(tr)

                elif degap:
                    # Traces starting late and ending early by no more than
                    # five sampling intervals each are extended to the padded
                    # window and kept.
                    if (0. < emin <= 5. * tr.deltat and
                            -5. * tr.deltat <= emax < 0.):

                        tr.extend(
                            wmin-tpad,
                            wmax+tpad-tr.deltat,
                            fillmethod='repeat')

                        chopped_weeded.append(tr)

            chopped = chopped_weeded

        for tr in chopped:
            tr.wmin = wmin
            tr.wmax = wmax

        return chopped

    def chopper(
            self,
            tmin=None, tmax=None, tinc=None, tpad=0.,
            trace_selector=None,
            want_incomplete=True, degap=True, maxgap=5, maxlap=None,
            keep_current_files_open=False, accessor_id='default',
            snap=(round, round), include_last=False, load_data=True,
            style=None):

        '''
        Get iterator for shifting window wise data extraction from waveform
        archive.

        :param tmin: start time (default uses start time of available data)
        :param tmax: end time (default uses end time of available data)
        :param tinc: time increment (window shift time) (default uses
            ``tmax-tmin``)
        :param tpad: padding time appended on either side of the data windows
            (window overlap is ``2*tpad``)
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects
        :param want_incomplete: if set to ``False``, gappy/incomplete traces
            are discarded from the results
        :param degap: whether to try to connect traces and to remove gaps and
            overlaps
        :param maxgap: maximum gap size in samples which is filled with
            interpolated samples when ``degap`` is ``True``
        :param maxlap: maximum overlap size in samples which is removed when
            ``degap`` is ``True``
        :param keep_current_files_open: whether to keep cached trace data in
            memory after the iterator has ended. Unless set, the accessor is
            cleared when the iterator is exhausted and also when it is closed
            or fails after window extraction has started.
        :param accessor_id: if given, used as a key to identify different
            points of extraction for the decision of when to release cached
            trace data (should be used when data is alternately extracted from
            more than one region / selection)
        :param snap: replaces Python's :py:func:`round` function which is used
            to determine indices where to start and end the trace data array
        :param include_last: whether to include last sample
        :param load_data: whether to load the waveform data. If set to
            ``False``, traces with no data samples, but with correct
            meta-information are returned
        :param style: set to ``'batch'`` to yield waveforms and information
            about the chopper state as :py:class:`pyrocko.pile.Batch` objects.
            By default lists of :py:class:`pyrocko.trace.Trace` objects are
            yielded.
        :returns: iterator providing extracted waveforms for each extracted
            window. See ``style`` argument for details.
        '''

        if tmin is None:
            # Query once: the property hits the squirrel on every access.
            pile_tmin = self.tmin
            if pile_tmin is None:
                logger.warning("Pile's tmin is not set - pile may be empty.")
                return
            tmin = pile_tmin + tpad

        if tmax is None:
            pile_tmax = self.tmax
            if pile_tmax is None:
                logger.warning("Pile's tmax is not set - pile may be empty.")
                return
            tmax = pile_tmax - tpad

        if tinc is None:
            tinc = tmax - tmin

        if not self.is_relevant(tmin-tpad, tmax+tpad):
            return

        nut_selector = trace_callback_to_nut_callback(trace_selector)

        # Small tolerance so that a span which is an exact multiple of tinc
        # does not produce an extra, empty window at the end.
        eps = tinc * 1e-6
        if tinc != 0.0:
            nwin = int(((tmax - eps) - tmin) / tinc) + 1
        else:
            nwin = 1

        try:
            for iwin in range(nwin):
                wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)

                chopped, used_files = self.chop(
                    wmin-tpad, wmax+tpad, nut_selector, snap,
                    include_last, load_data, accessor_id)

                processed = self._process_chopped(
                    chopped, degap, maxgap, maxlap, want_incomplete, wmax,
                    wmin, tpad)

                if style == 'batch':
                    yield classic_pile.Batch(
                        tmin=wmin,
                        tmax=wmax,
                        i=iwin,
                        n=nwin,
                        traces=processed)

                else:
                    yield processed

        finally:
            # Runs on exhaustion as well as when the consumer stops early
            # (break, close(), exception), so that the accessor is cleared
            # even if not all windows have been consumed.
            if not keep_current_files_open:
                self._squirrel.clear_accessor(accessor_id, 'waveform')

    def chopper_grouped(self, gather, progress=None, *args, **kwargs):
        '''
        Run :py:meth:`chopper` once per group of traces.

        :param gather: callback taking a trace-like object and returning the
            key of the group it belongs to
        :param progress: if not ``None``, passed as first argument to
            ``util.progressbar`` to report progress over the groups
        :param args: passed on to :py:meth:`chopper`
        :param kwargs: passed on to :py:meth:`chopper`. A ``trace_selector``
            given here is combined with the per-group selection.
        :returns: iterator yielding what :py:meth:`chopper` yields, group
            after group
        '''
        keys = self.gather_keys(gather)
        if len(keys) == 0:
            return

        outer_trace_selector = kwargs.get('trace_selector')

        # the use of this gather-cache makes it impossible to modify the pile
        # during chopping
        pbar = None
        try:
            if progress is not None:
                pbar = util.progressbar(progress, len(keys))

            for ikey, key in enumerate(keys):
                def tsel(tr):
                    return gather(tr) == key and (
                        outer_trace_selector is None or
                        outer_trace_selector(tr))

                kwargs['trace_selector'] = tsel

                for traces in self.chopper(*args, **kwargs):
                    yield traces

                if pbar:
                    pbar.update(ikey+1)

        finally:
            if pbar:
                pbar.finish()

    def reload_modified(self):
        '''
        Call the squirrel's ``reload`` method.
        '''
        self._squirrel.reload()

    def iter_traces(
            self,
            load_data=False,
            return_abspath=False,
            trace_selector=None):

        '''
        Iterate over all traces in pile.

        :param load_data: whether to load the waveform data, by default empty
            traces are yielded. Not supported by this emulation: must be
            false (checked with ``assert``).
        :param return_abspath: if ``True`` yield tuples containing absolute
            file path and :py:class:`pyrocko.trace.Trace` objects. Not
            supported by this emulation: must be false (checked with
            ``assert``).
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects

        '''
        assert not load_data, \
            'load_data=True is not supported by the Squirrel-based pile'
        assert not return_abspath, \
            'return_abspath=True is not supported by the Squirrel-based pile'

        nut_selector = trace_callback_to_nut_callback(trace_selector)

        for nut in self._squirrel.get_waveform_nuts():
            if nut_selector is None or nut_selector(nut):
                yield trace.Trace(**nut.trace_kwargs)

    def gather_keys(self, gather, selector=None):
        '''
        Get the group keys of the available waveforms.

        :param gather: callback taking a trace-like object and returning its
            group key
        :param selector: optional filter callback taking a trace-like object
        :returns: result of the squirrel's ``_gather_codes_keys`` for kind
            ``'waveform'``

        Both callbacks are wrapped with
        :py:func:`trace_callback_to_codes_callback` before being handed to the
        squirrel.
        '''
        codes_gather = trace_callback_to_codes_callback(gather)
        codes_selector = trace_callback_to_codes_callback(selector)
        return self._squirrel._gather_codes_keys(
            'waveform', codes_gather, codes_selector)

    def snuffle(self, **kwargs):
        '''Visualize it.

        :param stations: list of `pyrocko.model.Station` objects or ``None``
        :param events: list of `pyrocko.model.Event` objects or ``None``
        :param markers: list of `pyrocko.gui_util.Marker` objects or ``None``
        :param ntracks: float, number of tracks to be shown initially
            (default: 12)
        :param follow: time interval (in seconds) for real time follow mode or
            ``None``
        :param controls: bool, whether to show the main controls (default:
            ``True``)
        :param opengl: bool, whether to use opengl (default: ``False``)
        '''

        # Imported here so that the GUI is only needed when it is used.
        from pyrocko.gui.snuffler import snuffle
        snuffle(self, **kwargs)

    def add_file(self, mtf):
        '''
        Add the traces of an in-memory traces file to the squirrel.

        :param mtf: :py:class:`pyrocko.pile.MemTracesFile` object. Other
            types are not supported (checked with ``assert``).

        The name returned by the squirrel is stored on ``mtf`` as
        ``_squirrel_name`` so that :py:meth:`remove_file` can find it again.
        '''
        if isinstance(mtf, classic_pile.MemTracesFile):
            name = self._squirrel.add_volatile_waveforms(mtf.get_traces())
            mtf._squirrel_name = name
        else:
            assert False, \
                'only MemTracesFile objects can be added to this pile'

    def remove_file(self, mtf):
        '''
        Remove a file previously added with :py:meth:`add_file`.

        Does nothing if ``mtf`` is not a
        :py:class:`pyrocko.pile.MemTracesFile` or carries no (truthy)
        ``_squirrel_name``.
        '''
        if isinstance(mtf, classic_pile.MemTracesFile) \
                and getattr(mtf, '_squirrel_name', False):

            self._squirrel.remove(mtf._squirrel_name)
            mtf._squirrel_name = None

    def is_empty(self):
        '''
        Return ``True`` if ``'waveform'`` is not among the squirrel's kinds.
        '''
        return 'waveform' not in self._squirrel.get_kinds()

    def get_update_count(self):
        '''
        Always returns ``0`` in this emulation.
        '''
        return 0

434 

435 

def get_cache(_):
    '''
    Stub which ignores its argument and always returns ``None``.
    '''
    return None