Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/pile.py: 71%

928 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-04 09:52 +0000

1# https://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Waveform archive lookup, data loading and caching infrastructure. 

8 

9.. note:: 

10 

11 This module has been superseded by :py:mod:`~pyrocko.squirrel` but will 

12 remain available for backwards compatibility. 

13''' 

14 

15import os 

16import logging 

17import time 

18import copy 

19import re 

20import sys 

21import operator 

22import math 

23import hashlib 

24try: 

25 import cPickle as pickle 

26except ImportError: 

27 import pickle 

28 

29 

30from . import avl 

31from . import trace, io, util 

32from . import config 

33from .trace import degapper 

34 

35 

36is_windows = sys.platform.startswith('win') 

37show_progress_force_off = False 

38version_salt = 'v1-' 

39 

40 

41def ehash(s): 

42 return hashlib.sha1((version_salt + s).encode('utf8')).hexdigest() 

43 

44 

45def cmp(a, b): 

46 return int(a > b) - int(a < b) 

47 

48 

49def sl(s): 

50 return [str(x) for x in sorted(s)] 

51 

52 

53class Counter(dict): 

54 

55 def __missing__(self, k): 

56 return 0 

57 

58 def update(self, other): 

59 for k, v in other.items(): 

60 self[k] += v 

61 

62 def subtract(self, other): 

63 for k, v in other.items(): 

64 self[k] -= v 

65 if self[k] <= 0: 

66 del self[k] 

67 

68 def subtract1(self, k): 

69 self[k] -= 1 

70 if self[k] <= 0: 

71 del self[k] 

72 

73 

74def fix_unicode_copy(counter, func): 

75 counter_new = Counter() 

76 for k in counter: 

77 counter_new[func(k)] = counter[k] 

78 return counter_new 

79 

80 

81pjoin = os.path.join 

82logger = logging.getLogger('pyrocko.pile') 

83 

84 

85def avl_remove_exact(avltree, element): 

86 ilo, ihi = avltree.span(element) 

87 for i in range(ilo, ihi): 

88 if avltree[i] is element: 

89 avltree.remove_at(i) 

90 return i 

91 

92 raise ValueError( 

93 'avl_remove_exact(avltree, element): element not in avltree') 

94 

95 

96def cmpfunc(key): 

97 if isinstance(key, str): 

98 # special cases; these run about 50% faster than the generic one on 

99 # Python 2.5 

100 if key == 'tmin': 

101 return lambda a, b: cmp(a.tmin, b.tmin) 

102 if key == 'tmax': 

103 return lambda a, b: cmp(a.tmax, b.tmax) 

104 

105 key = operator.attrgetter(key) 

106 

107 return lambda a, b: cmp(key(a), key(b)) 

108 

109 

110g_dummys = {} 

111 

112 

113def get_dummy(key): 

114 if key not in g_dummys: 

115 class Dummy(object): 

116 def __init__(self, k): 

117 setattr(self, key, k) 

118 

119 g_dummys[key] = Dummy 

120 

121 return g_dummys[key] 

122 

123 

124class TooMany(Exception): 

125 def __init__(self, n): 

126 Exception.__init__(self) 

127 self.n = n 

128 

129 

130class Sorted(object): 

131 def __init__(self, values=[], key=None): 

132 self._set_key(key) 

133 self._avl = avl.new(values, self._cmp) 

134 

135 def _set_key(self, key): 

136 self._key = key 

137 self._cmp = cmpfunc(key) 

138 if isinstance(key, str): 

139 self._dummy = get_dummy(key) 

140 

141 def __getstate__(self): 

142 state = list(self._avl.iter()), self._key 

143 return state 

144 

145 def __setstate__(self, state): 

146 it, key = state 

147 self._set_key(key) 

148 self._avl = avl.from_iter(iter(it), len(it)) 

149 

150 def insert(self, value): 

151 self._avl.insert(value) 

152 

153 def remove(self, value): 

154 return avl_remove_exact(self._avl, value) 

155 

156 def remove_at(self, i): 

157 return self._avl.remove_at(i) 

158 

159 def insert_many(self, values): 

160 for value in values: 

161 self._avl.insert(value) 

162 

163 def remove_many(self, values): 

164 for value in values: 

165 avl_remove_exact(self._avl, value) 

166 

167 def __iter__(self): 

168 return iter(self._avl) 

169 

170 def with_key_in(self, kmin, kmax): 

171 omin, omax = self._dummy(kmin), self._dummy(kmax) 

172 ilo, ihi = self._avl.span(omin, omax) 

173 return self._avl[ilo:ihi] 

174 

175 def with_key_in_limited(self, kmin, kmax, nmax): 

176 omin, omax = self._dummy(kmin), self._dummy(kmax) 

177 ilo, ihi = self._avl.span(omin, omax) 

178 if ihi - ilo > nmax: 

179 raise TooMany(ihi - ilo) 

180 

181 return self._avl[ilo:ihi] 

182 

183 def index(self, value): 

184 ilo, ihi = self._avl.span(value) 

185 for i in range(ilo, ihi): 

186 if self._avl[i] is value: 

187 return i 

188 

189 raise ValueError('element is not in avl tree') 

190 

191 def min(self): 

192 return self._avl.min() 

193 

194 def max(self): 

195 return self._avl.max() 

196 

197 def __len__(self): 

198 return len(self._avl) 

199 

200 def __getitem__(self, i): 

201 return self._avl[i] 

202 

203 

204class TracesFileCache(object): 

205 ''' 

206 Manages trace metainformation cache. 

207 

208 For each directory with files containing traces, one cache file is 

209 maintained to hold the trace metainformation of all files which are 

210 contained in the directory. 

211 ''' 

212 

213 caches = {} 

214 

215 def __init__(self, cachedir): 

216 ''' 

217 Create new cache. 

218 

219 :param cachedir: directory to hold the cache files. 

220 ''' 

221 

222 self.cachedir = cachedir 

223 self.dircaches = {} 

224 self.modified = set() 

225 util.ensuredir(self.cachedir) 

226 

227 def get(self, abspath): 

228 ''' 

229 Try to get an item from the cache. 

230 

231 :param abspath: absolute path of the object to retrieve 

232 

233 :returns: a stored object is returned or None if nothing could be 

234 found. 

235 ''' 

236 

237 dircache = self._get_dircache_for(abspath) 

238 if abspath in dircache: 

239 return dircache[abspath] 

240 return None 

241 

242 def put(self, abspath, tfile): 

243 ''' 

244 Put an item into the cache. 

245 

246 :param abspath: absolute path of the object to be stored 

247 :param tfile: object to be stored 

248 ''' 

249 

250 cachepath = self._dircachepath(abspath) 

251 # get lock on cachepath here 

252 dircache = self._get_dircache(cachepath) 

253 dircache[abspath] = tfile 

254 self.modified.add(cachepath) 

255 

256 def dump_modified(self): 

257 ''' 

258 Save any modifications to disk. 

259 ''' 

260 

261 for cachepath in self.modified: 

262 self._dump_dircache(self.dircaches[cachepath], cachepath) 

263 # unlock 

264 

265 self.modified = set() 

266 

267 def clean(self): 

268 ''' 

269 Weed out missing files from the disk caches. 

270 ''' 

271 

272 self.dump_modified() 

273 

274 for fn in os.listdir(self.cachedir): 

275 if len(fn) == 40: 

276 cache = self._load_dircache(pjoin(self.cachedir, fn)) 

277 self._dump_dircache(cache, pjoin(self.cachedir, fn)) 

278 

279 def _get_dircache_for(self, abspath): 

280 return self._get_dircache(self._dircachepath(abspath)) 

281 

282 def _get_dircache(self, cachepath): 

283 if cachepath not in self.dircaches: 

284 if os.path.isfile(cachepath): 

285 self.dircaches[cachepath] = self._load_dircache(cachepath) 

286 else: 

287 self.dircaches[cachepath] = {} 

288 

289 return self.dircaches[cachepath] 

290 

291 def _dircachepath(self, abspath): 

292 cachefn = ehash(os.path.dirname(abspath)) 

293 return pjoin(self.cachedir, cachefn) 

294 

295 def _load_dircache(self, cachefilename): 

296 

297 with open(cachefilename, 'rb') as f: 

298 cache = pickle.load(f) 

299 

300 # weed out files which no longer exist 

301 for fn in list(cache.keys()): 

302 if not os.path.isfile(fn): 

303 del cache[fn] 

304 

305 time_float = util.get_time_float() 

306 

307 for v in cache.values(): 

308 v.trees_from_content(v.traces) 

309 for tr in v.traces: 

310 tr.file = v 

311 # fix Py2 codes to not include unicode when the cache file 

312 # was created with Py3 

313 if not isinstance(tr.station, str): 

314 tr.prune_from_reuse_cache() 

315 tr.set_codes( 

316 str(tr.network), 

317 str(tr.station), 

318 str(tr.location), 

319 str(tr.channel)) 

320 

321 tr.tmin = time_float(tr.tmin) 

322 tr.tmax = time_float(tr.tmax) 

323 

324 v.data_use_count = 0 

325 v.data_loaded = False 

326 v.fix_unicode_codes() 

327 

328 return cache 

329 

330 def _dump_dircache(self, cache, cachefilename): 

331 

332 if not cache: 

333 if os.path.exists(cachefilename): 

334 os.remove(cachefilename) 

335 return 

336 

337 # make a copy without the parents and the binsearch trees 

338 cache_copy = {} 

339 for fn in cache.keys(): 

340 trf = copy.copy(cache[fn]) 

341 trf.parent = None 

342 trf.by_tmin = None 

343 trf.by_tmax = None 

344 trf.by_tlen = None 

345 trf.by_mtime = None 

346 trf.data_use_count = 0 

347 trf.data_loaded = False 

348 traces = [] 

349 for tr in trf.traces: 

350 tr = tr.copy(data=False) 

351 tr.ydata = None 

352 tr.meta = None 

353 tr.file = trf 

354 traces.append(tr) 

355 

356 trf.traces = traces 

357 

358 cache_copy[fn] = trf 

359 

360 tmpfn = cachefilename+'.%i.tmp' % os.getpid() 

361 with open(tmpfn, 'wb') as f: 

362 pickle.dump(cache_copy, f, protocol=2) 

363 

364 if is_windows and os.path.exists(cachefilename): 

365 # windows doesn't allow to rename over existing file 

366 os.unlink(cachefilename) 

367 

368 os.rename(tmpfn, cachefilename) 

369 

370 

371def get_cache(cachedir): 

372 ''' 

373 Get global TracesFileCache object for given directory. 

374 ''' 

375 if cachedir not in TracesFileCache.caches: 

376 TracesFileCache.caches[cachedir] = TracesFileCache(cachedir) 

377 

378 return TracesFileCache.caches[cachedir] 

379 

380 

381def loader( 

382 filenames, fileformat, cache, filename_attributes, 

383 show_progress=True, update_progress=None): 

384 

385 if show_progress_force_off: 

386 show_progress = False 

387 

388 class Progress(object): 

389 def __init__(self, label, n): 

390 self._label = label 

391 self._n = n 

392 self._bar = None 

393 if show_progress: 

394 self._bar = util.progressbar(label, self._n) 

395 

396 if update_progress: 

397 update_progress(label, 0, self._n) 

398 

399 def update(self, i): 

400 if self._bar: 

401 if i < self._n-1: 

402 self._bar.update(i) 

403 else: 

404 self._bar.finish() 

405 self._bar = None 

406 

407 abort = False 

408 if update_progress: 

409 abort = update_progress(self._label, i, self._n) 

410 

411 return abort 

412 

413 def finish(self): 

414 if self._bar: 

415 self._bar.finish() 

416 self._bar = None 

417 

418 if not filenames: 

419 logger.warning('No files to load from') 

420 return 

421 

422 regex = None 

423 if filename_attributes: 

424 regex = re.compile(filename_attributes) 

425 

426 try: 

427 progress = Progress('Looking at files', len(filenames)) 

428 

429 failures = [] 

430 to_load = [] 

431 for i, filename in enumerate(filenames): 

432 try: 

433 abspath = os.path.abspath(filename) 

434 

435 substitutions = None 

436 if regex: 

437 m = regex.search(filename) 

438 if not m: 

439 raise FilenameAttributeError( 

440 "Cannot get attributes with pattern '%s' " 

441 "from path '%s'" % (filename_attributes, filename)) 

442 

443 substitutions = {} 

444 for k in m.groupdict(): 

445 if k in ('network', 'station', 'location', 'channel'): 

446 substitutions[k] = m.groupdict()[k] 

447 

448 mtime = os.stat(filename)[8] 

449 tfile = None 

450 if cache: 

451 tfile = cache.get(abspath) 

452 

453 mustload = ( 

454 not tfile or 

455 (tfile.format != fileformat and fileformat != 'detect') or 

456 tfile.mtime != mtime or 

457 substitutions is not None) 

458 

459 to_load.append( 

460 (mustload, mtime, abspath, substitutions, tfile)) 

461 

462 except (OSError, FilenameAttributeError) as xerror: 

463 failures.append(abspath) 

464 logger.warning(xerror) 

465 

466 abort = progress.update(i+1) 

467 if abort: 

468 progress.update(len(filenames)) 

469 return 

470 

471 progress.update(len(filenames)) 

472 

473 to_load.sort(key=lambda x: x[2]) 

474 

475 nload = len([1 for x in to_load if x[0]]) 

476 iload = 0 

477 

478 count_all = False 

479 if nload < 0.01*len(to_load): 

480 nload = len(to_load) 

481 count_all = True 

482 

483 if to_load: 

484 progress = Progress('Scanning files', nload) 

485 

486 for (mustload, mtime, abspath, substitutions, tfile) in to_load: 

487 try: 

488 if mustload: 

489 tfile = TracesFile( 

490 None, abspath, fileformat, 

491 substitutions=substitutions, mtime=mtime) 

492 

493 if cache and not substitutions: 

494 cache.put(abspath, tfile) 

495 

496 if not count_all: 

497 iload += 1 

498 

499 if count_all: 

500 iload += 1 

501 

502 except (io.FileLoadError, OSError) as xerror: 

503 failures.append(abspath) 

504 logger.warning(xerror) 

505 else: 

506 yield tfile 

507 

508 abort = progress.update(iload+1) 

509 if abort: 

510 break 

511 

512 progress.update(nload) 

513 

514 if failures: 

515 logger.warning( 

516 'The following file%s caused problems and will be ignored:\n' % 

517 util.plural_s(len(failures)) + '\n'.join(failures)) 

518 

519 if cache: 

520 cache.dump_modified() 

521 finally: 

522 progress.finish() 

523 

524 

525def tlen(x): 

526 return x.tmax-x.tmin 

527 

528 

529class TracesGroup(object): 

530 

531 ''' 

532 Trace container base class. 

533 

534 Base class for Pile, SubPile, and TracesFile, i.e. anything containing 

535 a collection of several traces. A TracesGroup object maintains lookup sets 

536 of some of the traces meta-information, as well as a combined time-range 

537 of its contents. 

538 ''' 

539 

540 def __init__(self, parent): 

541 self.parent = parent 

542 self.empty() 

543 self.nupdates = 0 

544 self.abspath = None 

545 

546 def set_parent(self, parent): 

547 self.parent = parent 

548 

549 def get_parent(self): 

550 return self.parent 

551 

552 def empty(self): 

553 self.networks, self.stations, self.locations, self.channels, \ 

554 self.nslc_ids, self.deltats = [Counter() for x in range(6)] 

555 self.by_tmin = Sorted([], 'tmin') 

556 self.by_tmax = Sorted([], 'tmax') 

557 self.by_tlen = Sorted([], tlen) 

558 self.by_mtime = Sorted([], 'mtime') 

559 self.tmin, self.tmax = None, None 

560 self.deltatmin, self.deltatmax = None, None 

561 

562 def trees_from_content(self, content): 

563 self.by_tmin = Sorted(content, 'tmin') 

564 self.by_tmax = Sorted(content, 'tmax') 

565 self.by_tlen = Sorted(content, tlen) 

566 self.by_mtime = Sorted(content, 'mtime') 

567 self.adjust_minmax() 

568 

569 def fix_unicode_codes(self): 

570 for net in self.networks: 

571 if isinstance(net, str): 

572 return 

573 

574 self.networks = fix_unicode_copy(self.networks, str) 

575 self.stations = fix_unicode_copy(self.stations, str) 

576 self.locations = fix_unicode_copy(self.locations, str) 

577 self.channels = fix_unicode_copy(self.channels, str) 

578 self.nslc_ids = fix_unicode_copy( 

579 self.nslc_ids, lambda k: tuple(str(x) for x in k)) 

580 

581 def add(self, content): 

582 ''' 

583 Add content to traces group and update indices. 

584 

585 Accepts :py:class:`pyrocko.trace.Trace` objects and 

586 :py:class:`pyrocko.pile.TracesGroup` objects. 

587 ''' 

588 

589 if isinstance(content, (trace.Trace, TracesGroup)): 

590 content = [content] 

591 

592 for c in content: 

593 

594 if isinstance(c, TracesGroup): 

595 self.networks.update(c.networks) 

596 self.stations.update(c.stations) 

597 self.locations.update(c.locations) 

598 self.channels.update(c.channels) 

599 self.nslc_ids.update(c.nslc_ids) 

600 self.deltats.update(c.deltats) 

601 

602 self.by_tmin.insert_many(c.by_tmin) 

603 self.by_tmax.insert_many(c.by_tmax) 

604 self.by_tlen.insert_many(c.by_tlen) 

605 self.by_mtime.insert_many(c.by_mtime) 

606 

607 elif isinstance(c, trace.Trace): 

608 self.networks[c.network] += 1 

609 self.stations[c.station] += 1 

610 self.locations[c.location] += 1 

611 self.channels[c.channel] += 1 

612 self.nslc_ids[c.nslc_id] += 1 

613 self.deltats[c.deltat] += 1 

614 

615 self.by_tmin.insert(c) 

616 self.by_tmax.insert(c) 

617 self.by_tlen.insert(c) 

618 self.by_mtime.insert(c) 

619 

620 self.adjust_minmax() 

621 

622 self.nupdates += 1 

623 self.notify_listeners('add', content) 

624 

625 if self.parent is not None: 

626 self.parent.add(content) 

627 

628 def remove(self, content): 

629 ''' 

630 Remove content to traces group and update indices. 

631 ''' 

632 if isinstance(content, (trace.Trace, TracesGroup)): 

633 content = [content] 

634 

635 for c in content: 

636 

637 if isinstance(c, TracesGroup): 

638 self.networks.subtract(c.networks) 

639 self.stations.subtract(c.stations) 

640 self.locations.subtract(c.locations) 

641 self.channels.subtract(c.channels) 

642 self.nslc_ids.subtract(c.nslc_ids) 

643 self.deltats.subtract(c.deltats) 

644 

645 self.by_tmin.remove_many(c.by_tmin) 

646 self.by_tmax.remove_many(c.by_tmax) 

647 self.by_tlen.remove_many(c.by_tlen) 

648 self.by_mtime.remove_many(c.by_mtime) 

649 

650 elif isinstance(c, trace.Trace): 

651 self.networks.subtract1(c.network) 

652 self.stations.subtract1(c.station) 

653 self.locations.subtract1(c.location) 

654 self.channels.subtract1(c.channel) 

655 self.nslc_ids.subtract1(c.nslc_id) 

656 self.deltats.subtract1(c.deltat) 

657 

658 self.by_tmin.remove(c) 

659 self.by_tmax.remove(c) 

660 self.by_tlen.remove(c) 

661 self.by_mtime.remove(c) 

662 

663 self.adjust_minmax() 

664 

665 self.nupdates += 1 

666 self.notify_listeners('remove', content) 

667 

668 if self.parent is not None: 

669 self.parent.remove(content) 

670 

671 def relevant(self, tmin, tmax, group_selector=None, trace_selector=None): 

672 ''' 

673 Return list of :py:class:`pyrocko.trace.Trace` objects where given 

674 arguments ``tmin`` and ``tmax`` match. 

675 

676 :param tmin: start time 

677 :param tmax: end time 

678 :param group_selector: lambda expression taking group dict of regex 

679 match object as a single argument and which returns true or false 

680 to keep or reject a file (default: ``None``) 

681 :param trace_selector: lambda expression taking group dict of regex 

682 match object as a single argument and which returns true or false 

683 to keep or reject a file (default: ``None``) 

684 ''' 

685 

686 if not self.by_tmin or not self.is_relevant( 

687 tmin, tmax, group_selector): 

688 

689 return [] 

690 

691 return [tr for tr in self.by_tmin.with_key_in(tmin-self.tlenmax, tmax) 

692 if tr.is_relevant(tmin, tmax, trace_selector)] 

693 

694 def adjust_minmax(self): 

695 if self.by_tmin: 

696 self.tmin = self.by_tmin.min().tmin 

697 self.tmax = self.by_tmax.max().tmax 

698 t = self.by_tlen.max() 

699 self.tlenmax = t.tmax - t.tmin 

700 self.mtime = self.by_mtime.max().mtime 

701 deltats = list(self.deltats.keys()) 

702 self.deltatmin = min(deltats) 

703 self.deltatmax = max(deltats) 

704 else: 

705 self.tmin = None 

706 self.tmax = None 

707 self.tlenmax = None 

708 self.mtime = None 

709 self.deltatmin = None 

710 self.deltatmax = None 

711 

712 def notify_listeners(self, what, content): 

713 pass 

714 

715 def get_update_count(self): 

716 return self.nupdates 

717 

718 def overlaps(self, tmin, tmax): 

719 return self.tmin is not None \ 

720 and tmax >= self.tmin and self.tmax >= tmin 

721 

722 def is_relevant(self, tmin, tmax, group_selector=None): 

723 if self.tmin is None or self.tmax is None: 

724 return False 

725 return tmax >= self.tmin and self.tmax >= tmin and ( 

726 group_selector is None or group_selector(self)) 

727 

728 

729class MemTracesFile(TracesGroup): 

730 

731 ''' 

732 This is needed to make traces without an actual disc file to be inserted 

733 into a Pile. 

734 ''' 

735 

736 def __init__(self, parent, traces): 

737 TracesGroup.__init__(self, parent) 

738 self.add(traces) 

739 self.mtime = time.time() 

740 

741 def add(self, traces): 

742 if isinstance(traces, trace.Trace): 

743 traces = [traces] 

744 

745 for tr in traces: 

746 tr.file = self 

747 

748 TracesGroup.add(self, traces) 

749 

750 def load_headers(self, mtime=None): 

751 pass 

752 

753 def load_data(self): 

754 pass 

755 

756 def use_data(self): 

757 pass 

758 

759 def drop_data(self): 

760 pass 

761 

762 def reload_if_modified(self): 

763 return False 

764 

765 def iter_traces(self): 

766 for tr in self.by_tmin: 

767 yield tr 

768 

769 def get_traces(self): 

770 return list(self.by_tmin) 

771 

772 def gather_keys(self, gather, selector=None): 

773 keys = set() 

774 for tr in self.by_tmin: 

775 if selector is None or selector(tr): 

776 keys.add(gather(tr)) 

777 

778 return keys 

779 

780 def __str__(self): 

781 

782 s = 'MemTracesFile\n' 

783 s += 'file mtime: %s\n' % util.time_to_str(self.mtime) 

784 s += 'number of traces: %i\n' % len(self.by_tmin) 

785 s += 'timerange: %s - %s\n' % ( 

786 util.time_to_str(self.tmin), util.time_to_str(self.tmax)) 

787 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys())) 

788 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys())) 

789 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys())) 

790 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys())) 

791 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys())) 

792 return s 

793 

794 

795class TracesFile(TracesGroup): 

796 def __init__( 

797 self, parent, abspath, format, 

798 substitutions=None, mtime=None): 

799 

800 TracesGroup.__init__(self, parent) 

801 self.abspath = abspath 

802 self.format = format 

803 self.traces = [] 

804 self.data_loaded = False 

805 self.data_use_count = 0 

806 self.substitutions = substitutions 

807 self.load_headers(mtime=mtime) 

808 self.mtime = mtime 

809 

810 def load_headers(self, mtime=None): 

811 logger.debug('loading headers from file: %s' % self.abspath) 

812 if mtime is None: 

813 self.mtime = os.stat(self.abspath)[8] 

814 

815 def kgen(tr): 

816 return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id 

817 

818 self.remove(self.traces) 

819 ks = set() 

820 for tr in io.load(self.abspath, 

821 format=self.format, 

822 getdata=False, 

823 substitutions=self.substitutions): 

824 

825 k = kgen(tr) 

826 if k not in ks: 

827 ks.add(k) 

828 self.traces.append(tr) 

829 tr.file = self 

830 

831 self.add(self.traces) 

832 

833 self.data_loaded = False 

834 self.data_use_count = 0 

835 

836 def load_data(self, force=False): 

837 file_changed = False 

838 if not self.data_loaded or force: 

839 logger.debug('loading data from file: %s' % self.abspath) 

840 

841 def kgen(tr): 

842 return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id 

843 

844 traces_ = io.load(self.abspath, format=self.format, getdata=True, 

845 substitutions=self.substitutions) 

846 

847 # prevent adding duplicate snippets from corrupt mseed files 

848 k_loaded = set() 

849 traces = [] 

850 for tr in traces_: 

851 k = kgen(tr) 

852 if k not in k_loaded: 

853 k_loaded.add(k) 

854 traces.append(tr) 

855 

856 k_current_d = dict((kgen(tr), tr) for tr in self.traces) 

857 k_current = set(k_current_d) 

858 k_new = k_loaded - k_current 

859 k_delete = k_current - k_loaded 

860 k_unchanged = k_current & k_loaded 

861 

862 for tr in self.traces[:]: 

863 if kgen(tr) in k_delete: 

864 self.remove(tr) 

865 self.traces.remove(tr) 

866 tr.file = None 

867 file_changed = True 

868 

869 for tr in traces: 

870 if kgen(tr) in k_new: 

871 tr.file = self 

872 self.traces.append(tr) 

873 self.add(tr) 

874 file_changed = True 

875 

876 for tr in traces: 

877 if kgen(tr) in k_unchanged: 

878 ctr = k_current_d[kgen(tr)] 

879 ctr.ydata = tr.ydata 

880 

881 self.data_loaded = True 

882 

883 if file_changed: 

884 logger.debug('reloaded (file may have changed): %s' % self.abspath) 

885 

886 return file_changed 

887 

888 def use_data(self): 

889 if not self.data_loaded: 

890 raise Exception('Data not loaded') 

891 self.data_use_count += 1 

892 

893 def drop_data(self): 

894 if self.data_loaded: 

895 if self.data_use_count == 1: 

896 logger.debug('forgetting data of file: %s' % self.abspath) 

897 for tr in self.traces: 

898 tr.drop_data() 

899 

900 self.data_loaded = False 

901 

902 self.data_use_count -= 1 

903 else: 

904 self.data_use_count = 0 

905 

906 def reload_if_modified(self): 

907 mtime = os.stat(self.abspath)[8] 

908 if mtime != self.mtime: 

909 logger.debug( 

910 'mtime=%i, reloading file: %s' % (mtime, self.abspath)) 

911 

912 self.mtime = mtime 

913 if self.data_loaded: 

914 self.load_data(force=True) 

915 else: 

916 self.load_headers() 

917 

918 return True 

919 

920 return False 

921 

922 def iter_traces(self): 

923 for tr in self.traces: 

924 yield tr 

925 

926 def gather_keys(self, gather, selector=None): 

927 keys = set() 

928 for tr in self.by_tmin: 

929 if selector is None or selector(tr): 

930 keys.add(gather(tr)) 

931 

932 return keys 

933 

934 def __str__(self): 

935 s = 'TracesFile\n' 

936 s += 'abspath: %s\n' % self.abspath 

937 s += 'file mtime: %s\n' % util.time_to_str(self.mtime) 

938 s += 'number of traces: %i\n' % len(self.traces) 

939 s += 'timerange: %s - %s\n' % ( 

940 util.time_to_str(self.tmin), util.time_to_str(self.tmax)) 

941 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys())) 

942 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys())) 

943 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys())) 

944 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys())) 

945 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys())) 

946 return s 

947 

948 

949class FilenameAttributeError(Exception): 

950 pass 

951 

952 

953class SubPile(TracesGroup): 

954 def __init__(self, parent): 

955 TracesGroup.__init__(self, parent) 

956 self.files = [] 

957 self.empty() 

958 

959 def add_file(self, file): 

960 self.files.append(file) 

961 file.set_parent(self) 

962 self.add(file) 

963 

964 def remove_file(self, file): 

965 self.files.remove(file) 

966 file.set_parent(None) 

967 self.remove(file) 

968 

969 def remove_files(self, files): 

970 for file in files: 

971 self.files.remove(file) 

972 file.set_parent(None) 

973 self.remove(files) 

974 

975 def gather_keys(self, gather, selector=None): 

976 keys = set() 

977 for file in self.files: 

978 keys |= file.gather_keys(gather, selector) 

979 

980 return keys 

981 

982 def iter_traces( 

983 self, 

984 load_data=False, 

985 return_abspath=False, 

986 group_selector=None, 

987 trace_selector=None): 

988 

989 for file in self.files: 

990 

991 if group_selector and not group_selector(file): 

992 continue 

993 

994 must_drop = False 

995 if load_data: 

996 file.load_data() 

997 file.use_data() 

998 must_drop = True 

999 

1000 for tr in file.iter_traces(): 

1001 if trace_selector and not trace_selector(tr): 

1002 continue 

1003 

1004 if return_abspath: 

1005 yield file.abspath, tr 

1006 else: 

1007 yield tr 

1008 

1009 if must_drop: 

1010 file.drop_data() 

1011 

1012 def iter_files(self): 

1013 for file in self.files: 

1014 yield file 

1015 

1016 def reload_modified(self): 

1017 modified = False 

1018 for file in self.files: 

1019 modified |= file.reload_if_modified() 

1020 

1021 return modified 

1022 

1023 def __str__(self): 

1024 s = 'SubPile\n' 

1025 s += 'number of files: %i\n' % len(self.files) 

1026 s += 'timerange: %s - %s\n' % ( 

1027 util.time_to_str(self.tmin), util.time_to_str(self.tmax)) 

1028 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys())) 

1029 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys())) 

1030 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys())) 

1031 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys())) 

1032 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys())) 

1033 return s 

1034 

1035 

1036class Batch(object): 

1037 ''' 

1038 Batch of waveforms from window wise data extraction. 

1039 

1040 Encapsulates state and results yielded for each window in window wise 

1041 waveform extraction with the :py:meth:`Pile.chopper` method (when the 

1042 `style='batch'` keyword argument set). 

1043 

1044 *Attributes:* 

1045 

1046 .. py:attribute:: tmin 

1047 

1048 Start of this time window. 

1049 

1050 .. py:attribute:: tmax 

1051 

1052 End of this time window. 

1053 

1054 .. py:attribute:: i 

1055 

1056 Index of this time window in sequence. 

1057 

1058 .. py:attribute:: n 

1059 

1060 Total number of time windows in sequence. 

1061 

1062 .. py:attribute:: traces 

1063 

1064 Extracted waveforms for this time window. 

1065 ''' 

1066 

1067 def __init__(self, tmin, tmax, i, n, traces): 

1068 self.tmin = tmin 

1069 self.tmax = tmax 

1070 self.i = i 

1071 self.n = n 

1072 self.traces = traces 

1073 

1074 

1075class Pile(TracesGroup): 

1076 ''' 

1077 Waveform archive lookup, data loading and caching infrastructure. 

1078 ''' 

1079 

1080 def __init__(self): 

1081 TracesGroup.__init__(self, None) 

1082 self.subpiles = {} 

1083 self.open_files = {} 

1084 self.listeners = [] 

1085 self.abspaths = set() 

1086 

1087 def add_listener(self, obj): 

1088 self.listeners.append(util.smart_weakref(obj)) 

1089 

1090 def notify_listeners(self, what, content): 

1091 for ref in self.listeners: 

1092 obj = ref() 

1093 if obj: 

1094 obj(what, content) 

1095 

1096 def load_files( 

1097 self, filenames, 

1098 filename_attributes=None, 

1099 fileformat='mseed', 

1100 cache=None, 

1101 show_progress=True, 

1102 update_progress=None): 

1103 

1104 load = loader( 

1105 filenames, fileformat, cache, filename_attributes, 

1106 show_progress=show_progress, 

1107 update_progress=update_progress) 

1108 

1109 self.add_files(load) 

1110 

1111 def add_files(self, files): 

1112 for file in files: 

1113 self.add_file(file) 

1114 

1115 def add_file(self, file): 

1116 if file.abspath is not None and file.abspath in self.abspaths: 

1117 logger.warning('File already in pile: %s' % file.abspath) 

1118 return 

1119 

1120 if file.deltatmin is None: 

1121 logger.warning('Sampling rate of all traces are zero in file: %s' % 

1122 file.abspath) 

1123 return 

1124 

1125 subpile = self.dispatch(file) 

1126 subpile.add_file(file) 

1127 if file.abspath is not None: 

1128 self.abspaths.add(file.abspath) 

1129 

1130 def remove_file(self, file): 

1131 subpile = file.get_parent() 

1132 if subpile is not None: 

1133 subpile.remove_file(file) 

1134 if file.abspath is not None: 

1135 self.abspaths.remove(file.abspath) 

1136 

1137 def remove_files(self, files): 

1138 subpile_files = {} 

1139 for file in files: 

1140 subpile = file.get_parent() 

1141 if subpile not in subpile_files: 

1142 subpile_files[subpile] = [] 

1143 

1144 subpile_files[subpile].append(file) 

1145 

1146 for subpile, files in subpile_files.items(): 

1147 subpile.remove_files(files) 

1148 for file in files: 

1149 if file.abspath is not None: 

1150 self.abspaths.remove(file.abspath) 

1151 

1152 def dispatch_key(self, file): 

1153 dt = int(math.floor(math.log(file.deltatmin))) 

1154 return dt 

1155 

1156 def dispatch(self, file): 

1157 k = self.dispatch_key(file) 

1158 if k not in self.subpiles: 

1159 self.subpiles[k] = SubPile(self) 

1160 

1161 return self.subpiles[k] 

1162 

1163 def get_deltats(self): 

1164 return list(self.deltats.keys()) 

1165 

1166 def chop( 

1167 self, tmin, tmax, 

1168 group_selector=None, 

1169 trace_selector=None, 

1170 snap=(round, round), 

1171 include_last=False, 

1172 load_data=True): 

1173 

1174 chopped = [] 

1175 used_files = set() 

1176 

1177 traces = self.relevant(tmin, tmax, group_selector, trace_selector) 

1178 if load_data: 

1179 files_changed = False 

1180 for tr in traces: 

1181 if tr.file and tr.file not in used_files: 

1182 if tr.file.load_data(): 

1183 files_changed = True 

1184 

1185 if tr.file is not None: 

1186 used_files.add(tr.file) 

1187 

1188 if files_changed: 

1189 traces = self.relevant( 

1190 tmin, tmax, group_selector, trace_selector) 

1191 

1192 for tr in traces: 

1193 if not load_data and tr.ydata is not None: 

1194 tr = tr.copy(data=False) 

1195 tr.ydata = None 

1196 

1197 try: 

1198 chopped.append(tr.chop( 

1199 tmin, tmax, 

1200 inplace=False, 

1201 snap=snap, 

1202 include_last=include_last)) 

1203 

1204 except trace.NoData: 

1205 pass 

1206 

1207 return chopped, used_files 

1208 

1209 def _process_chopped( 

1210 self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin, 

1211 tpad): 

1212 

1213 chopped.sort(key=lambda a: a.full_id) 

1214 if degap: 

1215 chopped = degapper(chopped, maxgap=maxgap, maxlap=maxlap) 

1216 

1217 if not want_incomplete: 

1218 chopped_weeded = [] 

1219 for tr in chopped: 

1220 emin = tr.tmin - (wmin-tpad) 

1221 emax = tr.tmax + tr.deltat - (wmax+tpad) 

1222 if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat): 

1223 chopped_weeded.append(tr) 

1224 

1225 elif degap: 

1226 if (0. < emin <= 5. * tr.deltat and 

1227 -5. * tr.deltat <= emax < 0.): 

1228 

1229 tr.extend( 

1230 wmin-tpad, 

1231 wmax+tpad-tr.deltat, 

1232 fillmethod='repeat') 

1233 

1234 chopped_weeded.append(tr) 

1235 

1236 chopped = chopped_weeded 

1237 

1238 for tr in chopped: 

1239 tr.wmin = wmin 

1240 tr.wmax = wmax 

1241 

1242 return chopped 

1243 

1244 def chopper( 

1245 self, 

1246 tmin=None, tmax=None, tinc=None, tpad=0., 

1247 group_selector=None, trace_selector=None, 

1248 want_incomplete=True, degap=True, maxgap=5, maxlap=None, 

1249 keep_current_files_open=False, accessor_id=None, 

1250 snap=(round, round), include_last=False, load_data=True, 

1251 style=None): 

1252 

1253 ''' 

1254 Get iterator for shifting window wise data extraction from waveform 

1255 archive. 

1256 

1257 :param tmin: start time (default uses start time of available data) 

1258 :param tmax: end time (default uses end time of available data) 

1259 :param tinc: time increment (window shift time) (default uses 

1260 ``tmax-tmin``) 

1261 :param tpad: padding time appended on either side of the data windows 

1262 (window overlap is ``2*tpad``) 

1263 :param group_selector: filter callback taking :py:class:`TracesGroup` 

1264 objects 

1265 :param trace_selector: filter callback taking 

1266 :py:class:`pyrocko.trace.Trace` objects 

1267 :param want_incomplete: if set to ``False``, gappy/incomplete traces 

1268 are discarded from the results 

1269 :param degap: whether to try to connect traces and to remove gaps and 

1270 overlaps 

1271 :param maxgap: maximum gap size in samples which is filled with 

1272 interpolated samples when ``degap`` is ``True`` 

1273 :param maxlap: maximum overlap size in samples which is removed when 

1274 ``degap`` is ``True`` 

1275 :param keep_current_files_open: whether to keep cached trace data in 

1276 memory after the iterator has ended 

1277 :param accessor_id: if given, used as a key to identify different 

1278 points of extraction for the decision of when to release cached 

1279 trace data (should be used when data is alternately extracted from 

1280 more than one region / selection) 

1281 :param snap: replaces Python's :py:func:`round` function which is used 

1282 to determine indices where to start and end the trace data array 

1283 :param include_last: whether to include last sample 

1284 :param load_data: whether to load the waveform data. If set to 

1285 ``False``, traces with no data samples, but with correct 

1286 meta-information are returned 

1287 :param style: set to ``'batch'`` to yield waveforms and information 

1288 about the chopper state as :py:class:`Batch` objects. By default 

1289 lists of :py:class:`pyrocko.trace.Trace` objects are yielded. 

1290 :returns: iterator providing extracted waveforms for each extracted 

1291 window. See ``style`` argument for details. 

1292 ''' 

1293 if tmin is None: 

1294 if self.tmin is None: 

1295 logger.warning("Pile's tmin is not set - pile may be empty.") 

1296 return 

1297 tmin = self.tmin + tpad 

1298 

1299 if tmax is None: 

1300 if self.tmax is None: 

1301 logger.warning("Pile's tmax is not set - pile may be empty.") 

1302 return 

1303 tmax = self.tmax - tpad 

1304 

1305 if not self.is_relevant(tmin-tpad, tmax+tpad, group_selector): 

1306 return 

1307 

1308 if accessor_id not in self.open_files: 

1309 self.open_files[accessor_id] = set() 

1310 

1311 open_files = self.open_files[accessor_id] 

1312 

1313 if tinc is None: 

1314 tinc = tmax - tmin 

1315 nwin = 1 

1316 else: 

1317 eps = tinc * 1e-6 

1318 if tinc != 0.0: 

1319 nwin = int(((tmax - eps) - tmin) / tinc) + 1 

1320 else: 

1321 nwin = 1 

1322 

1323 for iwin in range(nwin): 

1324 wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax) 

1325 

1326 chopped, used_files = self.chop( 

1327 wmin-tpad, wmax+tpad, group_selector, trace_selector, snap, 

1328 include_last, load_data) 

1329 

1330 for file in used_files - open_files: 

1331 # increment datause counter on newly opened files 

1332 file.use_data() 

1333 

1334 open_files.update(used_files) 

1335 

1336 processed = self._process_chopped( 

1337 chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin, 

1338 tpad) 

1339 

1340 if style == 'batch': 

1341 yield Batch( 

1342 tmin=wmin, 

1343 tmax=wmax, 

1344 i=iwin, 

1345 n=nwin, 

1346 traces=processed) 

1347 

1348 else: 

1349 yield processed 

1350 

1351 unused_files = open_files - used_files 

1352 

1353 while unused_files: 

1354 file = unused_files.pop() 

1355 file.drop_data() 

1356 open_files.remove(file) 

1357 

1358 if not keep_current_files_open: 

1359 while open_files: 

1360 file = open_files.pop() 

1361 file.drop_data() 

1362 

1363 def all(self, *args, **kwargs): 

1364 ''' 

1365 Shortcut to aggregate :py:meth:`chopper` output into a single list. 

1366 ''' 

1367 

1368 alltraces = [] 

1369 for traces in self.chopper(*args, **kwargs): 

1370 alltraces.extend(traces) 

1371 

1372 return alltraces 

1373 

1374 def iter_all(self, *args, **kwargs): 

1375 for traces in self.chopper(*args, **kwargs): 

1376 for tr in traces: 

1377 yield tr 

1378 

1379 def chopper_grouped(self, gather, progress=None, *args, **kwargs): 

1380 keys = self.gather_keys(gather) 

1381 if len(keys) == 0: 

1382 return 

1383 

1384 outer_group_selector = None 

1385 if 'group_selector' in kwargs: 

1386 outer_group_selector = kwargs['group_selector'] 

1387 

1388 outer_trace_selector = None 

1389 if 'trace_selector' in kwargs: 

1390 outer_trace_selector = kwargs['trace_selector'] 

1391 

1392 # the use of this gather-cache makes it impossible to modify the pile 

1393 # during chopping 

1394 gather_cache = {} 

1395 pbar = None 

1396 try: 

1397 if progress is not None: 

1398 pbar = util.progressbar(progress, len(keys)) 

1399 

1400 for ikey, key in enumerate(keys): 

1401 def tsel(tr): 

1402 return gather(tr) == key and ( 

1403 outer_trace_selector is None 

1404 or outer_trace_selector(tr)) 

1405 

1406 def gsel(gr): 

1407 if gr not in gather_cache: 

1408 gather_cache[gr] = gr.gather_keys(gather) 

1409 

1410 return key in gather_cache[gr] and ( 

1411 outer_group_selector is None 

1412 or outer_group_selector(gr)) 

1413 

1414 kwargs['trace_selector'] = tsel 

1415 kwargs['group_selector'] = gsel 

1416 

1417 for traces in self.chopper(*args, **kwargs): 

1418 yield traces 

1419 

1420 if pbar: 

1421 pbar.update(ikey+1) 

1422 

1423 finally: 

1424 if pbar: 

1425 pbar.finish() 

1426 

1427 def gather_keys(self, gather, selector=None): 

1428 keys = set() 

1429 for subpile in self.subpiles.values(): 

1430 keys |= subpile.gather_keys(gather, selector) 

1431 

1432 return sorted(keys) 

1433 

1434 def iter_traces( 

1435 self, 

1436 load_data=False, 

1437 return_abspath=False, 

1438 group_selector=None, 

1439 trace_selector=None): 

1440 

1441 ''' 

1442 Iterate over all traces in pile. 

1443 

1444 :param load_data: whether to load the waveform data, by default empty 

1445 traces are yielded 

1446 :param return_abspath: if ``True`` yield tuples containing absolute 

1447 file path and :py:class:`pyrocko.trace.Trace` objects 

1448 :param group_selector: filter callback taking :py:class:`TracesGroup` 

1449 objects 

1450 :param trace_selector: filter callback taking 

1451 :py:class:`pyrocko.trace.Trace` objects 

1452 

1453 Example; yields only traces, where the station code is 'HH1':: 

1454 

1455 test_pile = pile.make_pile('/local/test_trace_directory') 

1456 for t in test_pile.iter_traces( 

1457 trace_selector=lambda tr: tr.station=='HH1'): 

1458 

1459 print t 

1460 ''' 

1461 

1462 for subpile in self.subpiles.values(): 

1463 if not group_selector or group_selector(subpile): 

1464 for tr in subpile.iter_traces(load_data, return_abspath, 

1465 group_selector, trace_selector): 

1466 yield tr 

1467 

1468 def iter_files(self): 

1469 for subpile in self.subpiles.values(): 

1470 for file in subpile.iter_files(): 

1471 yield file 

1472 

1473 def reload_modified(self): 

1474 modified = False 

1475 for subpile in self.subpiles.values(): 

1476 modified |= subpile.reload_modified() 

1477 

1478 return modified 

1479 

1480 def get_tmin(self): 

1481 return self.tmin 

1482 

1483 def get_tmax(self): 

1484 return self.tmax 

1485 

1486 def get_deltatmin(self): 

1487 return self.deltatmin 

1488 

1489 def get_deltatmax(self): 

1490 return self.deltatmax 

1491 

1492 def is_empty(self): 

1493 return self.tmin is None and self.tmax is None 

1494 

1495 def __str__(self): 

1496 if self.tmin is not None and self.tmax is not None: 

1497 tmin = util.time_to_str(self.tmin) 

1498 tmax = util.time_to_str(self.tmax) 

1499 s = 'Pile\n' 

1500 s += 'number of subpiles: %i\n' % len(self.subpiles) 

1501 s += 'timerange: %s - %s\n' % (tmin, tmax) 

1502 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys())) 

1503 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys())) 

1504 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys())) 

1505 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys())) 

1506 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys())) 

1507 

1508 else: 

1509 s = 'empty Pile' 

1510 

1511 return s 

1512 

1513 def snuffle(self, **kwargs): 

1514 ''' 

1515 Visualize it. 

1516 

1517 :param stations: list of :py:class:`pyrocko.model.station.Station` 

1518 objects or ``None`` 

1519 :param events: list of :py:class:`pyrocko.model.event.Event` objects or 

1520 ``None`` 

1521 :param markers: list of :py:class:`pyrocko.gui.snuffler.marker.Marker` 

1522 objects or ``None`` 

1523 :param ntracks: float, number of tracks to be shown initially 

1524 (default: 12) 

1525 :param follow: time interval (in seconds) for real time follow mode or 

1526 ``None`` 

1527 :param controls: bool, whether to show the main controls (default: 

1528 ``True``) 

1529 :param opengl: bool, whether to use opengl (default: ``False``) 

1530 ''' 

1531 

1532 from pyrocko.gui.snuffler.snuffler import snuffle 

1533 snuffle(self, **kwargs) 

1534 

1535 

1536def make_pile( 

1537 paths=None, selector=None, regex=None, 

1538 fileformat='mseed', 

1539 cachedirname=None, show_progress=True): 

1540 

1541 ''' 

1542 Create pile from given file and directory names. 

1543 

1544 :param paths: filenames and/or directories to look for traces. If paths is 

1545 ``None`` ``sys.argv[1:]`` is used. 

1546 :param selector: lambda expression taking group dict of regex match object 

1547 as a single argument and which returns true or false to keep or reject 

1548 a file 

1549 :param regex: regular expression which filenames have to match 

1550 :param fileformat: format of the files ('mseed', 'sac', 'kan', 

1551 'from_extension', 'detect') 

1552 :param cachedirname: loader cache is stored under this directory. It is 

1553 created as neccessary. 

1554 :param show_progress: show progress bar and other progress information 

1555 ''' 

1556 

1557 if show_progress_force_off: 

1558 show_progress = False 

1559 

1560 if isinstance(paths, str): 

1561 paths = [paths] 

1562 

1563 if paths is None: 

1564 paths = sys.argv[1:] 

1565 

1566 if cachedirname is None: 

1567 cachedirname = config.config().cache_dir 

1568 

1569 fns = util.select_files( 

1570 paths, include=regex, selector=selector, show_progress=show_progress) 

1571 

1572 cache = get_cache(cachedirname) 

1573 p = Pile() 

1574 p.load_files( 

1575 sorted(fns), 

1576 cache=cache, 

1577 fileformat=fileformat, 

1578 show_progress=show_progress) 

1579 

1580 return p 

1581 

1582 

1583class Injector(trace.States): 

1584 

1585 def __init__( 

1586 self, pile, 

1587 fixation_length=None, 

1588 path=None, 

1589 format='from_extension', 

1590 forget_fixed=False): 

1591 

1592 trace.States.__init__(self) 

1593 self._pile = pile 

1594 self._fixation_length = fixation_length 

1595 self._format = format 

1596 self._path = path 

1597 self._forget_fixed = forget_fixed 

1598 

1599 def set_fixation_length(self, length): 

1600 ''' 

1601 Set length after which the fixation method is called on buffer traces. 

1602 

1603 The length should be given in seconds. Give None to disable. 

1604 ''' 

1605 self.fixate_all() 

1606 self._fixation_length = length # in seconds 

1607 

1608 def set_save_path( 

1609 self, 

1610 path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_' 

1611 '%(tmin)s_%(tmax)s.mseed'): 

1612 

1613 self.fixate_all() 

1614 self._path = path 

1615 

1616 def inject(self, trace): 

1617 logger.debug('Received a trace: %s' % trace) 

1618 

1619 buf = self.get(trace) 

1620 if buf is None: 

1621 trbuf = trace.copy() 

1622 buf = MemTracesFile(None, [trbuf]) 

1623 self._pile.add_file(buf) 

1624 self.set(trace, buf) 

1625 

1626 else: 

1627 self._pile.remove_file(buf) 

1628 trbuf = buf.get_traces()[0] 

1629 buf.remove(trbuf) 

1630 trbuf.append(trace.ydata) 

1631 buf.add(trbuf) 

1632 self._pile.add_file(buf) 

1633 self.set(trace, buf) 

1634 

1635 trbuf = buf.get_traces()[0] 

1636 if self._fixation_length is not None: 

1637 if trbuf.tmax - trbuf.tmin > self._fixation_length: 

1638 self._fixate(buf, complete=False) 

1639 

1640 def fixate_all(self): 

1641 for state in list(self._states.values()): 

1642 self._fixate(state[-1]) 

1643 

1644 self._states = {} 

1645 

1646 def free(self, buf): 

1647 self._fixate(buf) 

1648 

1649 def _fixate(self, buf, complete=True): 

1650 trbuf = buf.get_traces()[0] 

1651 del_state = True 

1652 if self._path: 

1653 if self._fixation_length is not None: 

1654 ttmin = trbuf.tmin 

1655 ytmin = util.year_start(ttmin) 

1656 n = int(math.floor((ttmin - ytmin) / self._fixation_length)) 

1657 tmin = ytmin + n*self._fixation_length 

1658 traces = [] 

1659 t = tmin 

1660 while t <= trbuf.tmax: 

1661 try: 

1662 traces.append( 

1663 trbuf.chop( 

1664 t, 

1665 t+self._fixation_length, 

1666 inplace=False, 

1667 snap=(math.ceil, math.ceil))) 

1668 

1669 except trace.NoData: 

1670 pass 

1671 t += self._fixation_length 

1672 

1673 if abs(traces[-1].tmax - (t - trbuf.deltat)) < \ 

1674 trbuf.deltat/100. or complete: 

1675 

1676 self._pile.remove_file(buf) 

1677 

1678 else: # reinsert incomplete last part 

1679 new_trbuf = traces.pop() 

1680 self._pile.remove_file(buf) 

1681 buf.remove(trbuf) 

1682 buf.add(new_trbuf) 

1683 self._pile.add_file(buf) 

1684 del_state = False 

1685 

1686 else: 

1687 traces = [trbuf] 

1688 self._pile.remove_file(buf) 

1689 

1690 fns = io.save(traces, self._path, format=self._format) 

1691 

1692 if not self._forget_fixed: 

1693 self._pile.load_files( 

1694 fns, show_progress=False, fileformat=self._format) 

1695 

1696 if del_state: 

1697 del self._states[trbuf.nslc_id] 

1698 

1699 def __del__(self): 

1700 self.fixate_all()