1# https://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import os 

7import logging 

8import time 

9import copy 

10import re 

11import sys 

12import operator 

13import math 

14import hashlib 

15try: 

16 import cPickle as pickle 

17except ImportError: 

18 import pickle 

19 

20 

21from . import avl 

22from . import trace, io, util 

23from . import config 

24from .trace import degapper 

25 

26 

# True when running on Windows; affects cache-file renaming, where renaming
# over an existing file is not allowed (see TracesFileCache._dump_dircache).
is_windows = sys.platform.startswith('win')

# Global switch to force-disable progress bars (used by the loader).
show_progress_force_off = False

# Prefix mixed into the hashed cache file names (see ehash); changing it
# effectively invalidates previously written cache files.
version_salt = 'v1-'

30 

31 

def ehash(s):
    '''
    Hex SHA1 digest of *s*, prefixed with the module's cache version salt.
    '''
    salted = (version_salt + s).encode('utf8')
    return hashlib.sha1(salted).hexdigest()

34 

35 

def cmp(a, b):
    '''
    Three-way comparison, replacement for the removed Py2 builtin.

    Returns -1 if ``a < b``, 1 if ``a > b``, 0 otherwise.
    '''
    if a < b:
        return -1
    if a > b:
        return 1
    return 0

38 

39 

def sl(s):
    '''
    Sort the elements of *s* and return them as a list of strings.
    '''
    return list(map(str, sorted(s)))

42 

43 

class Counter(dict):
    '''
    Multiset: dict mapping keys to positive counts.

    Missing keys read as zero (without being inserted); counts that drop
    to zero or below are removed entirely.
    '''

    def __missing__(self, key):
        # Reading an absent key yields 0 but does not create an entry.
        return 0

    def update(self, other):
        # NOTE: unlike dict.update, this *adds* the counts from *other*.
        for key, count in other.items():
            self[key] += count

    def subtract(self, other):
        # Subtract counts from *other*; keys reaching <= 0 are dropped.
        for key, count in other.items():
            remaining = self[key] - count
            if remaining > 0:
                self[key] = remaining
            else:
                self.pop(key, None)

    def subtract1(self, key):
        # Convenience: decrement a single key's count by one.
        self.subtract({key: 1})

63 

64 

def fix_unicode_copy(counter, func):
    '''
    Return a new :py:class:`Counter` with every key mapped through *func*.

    Used to convert Py2-era unicode keys to plain ``str``.
    '''
    fixed = Counter()
    for key, count in counter.items():
        fixed[func(key)] = count

    return fixed

70 

71 

# Shorthand used throughout this module.
pjoin = os.path.join

# Module-level logger.
logger = logging.getLogger('pyrocko.pile')

74 

75 

def avl_remove_exact(avltree, element):
    '''
    Remove *element* from *avltree* by identity (``is``), not equality.

    Returns the index the element occupied; raises ValueError when the
    exact object is not present.
    '''
    ilo, ihi = avltree.span(element)
    i = ilo
    while i < ihi:
        if avltree[i] is element:
            avltree.remove_at(i)
            return i
        i += 1

    raise ValueError(
        'avl_remove_exact(avltree, element): element not in avltree')

85 

86 

def cmpfunc(key):
    '''
    Build a two-argument comparison function from *key*.

    *key* may be an attribute name (str) or a callable extracting the
    sort key from an object. The attribute names ``'tmin'`` and ``'tmax'``
    get hand-written fast paths.
    '''
    if not isinstance(key, str):
        getter = key
    elif key == 'tmin':
        return lambda a, b: cmp(a.tmin, b.tmin)
    elif key == 'tmax':
        return lambda a, b: cmp(a.tmax, b.tmax)
    else:
        getter = operator.attrgetter(key)

    return lambda a, b: cmp(getter(a), getter(b))

99 

100 

# Cache of dynamically created placeholder classes, keyed by attribute name
# (see get_dummy).
g_dummys = {}

102 

103 

def get_dummy(key):
    '''
    Get (and cache) a placeholder class whose constructor stores its single
    argument under attribute *key*.

    Instances serve as probe objects for key-range searches (see
    ``Sorted.with_key_in``).
    '''
    try:
        return g_dummys[key]
    except KeyError:
        pass

    class Dummy(object):
        def __init__(self, k):
            setattr(self, key, k)

    g_dummys[key] = Dummy
    return Dummy

113 

114 

class TooMany(Exception):
    '''
    Raised when a limited range query matches more elements than allowed.

    The offending element count is available as attribute ``n``.
    '''

    def __init__(self, n):
        super(TooMany, self).__init__()
        self.n = n

119 

120 

class Sorted(object):
    '''
    Sorted collection of objects, backed by an AVL tree.

    :param values: initial contents (any iterable; not modified)
    :param key: attribute name (str) or callable extracting the sort key

    Elements with equal keys are disambiguated by identity where needed
    (see :py:meth:`remove` and :py:meth:`index`).
    '''

    def __init__(self, values=None, key=None):
        # Fix: the previous version used the mutable default ``values=[]``.
        # ``None`` avoids the shared-mutable-default pitfall while staying
        # backward compatible for all callers.
        if values is None:
            values = []

        self._set_key(key)
        self._avl = avl.new(values, self._cmp)

    def _set_key(self, key):
        # Remember the key and derive a two-argument comparison function.
        self._key = key
        self._cmp = cmpfunc(key)
        if isinstance(key, str):
            # Placeholder class used to build probe objects for range
            # queries on attribute keys.
            self._dummy = get_dummy(key)

    def __getstate__(self):
        # The AVL tree itself is not pickled; store sorted contents + key.
        state = list(self._avl.iter()), self._key
        return state

    def __setstate__(self, state):
        it, key = state
        self._set_key(key)
        self._avl = avl.from_iter(iter(it), len(it))

    def insert(self, value):
        self._avl.insert(value)

    def remove(self, value):
        '''
        Remove *value* by identity; return the index it occupied.
        '''
        return avl_remove_exact(self._avl, value)

    def remove_at(self, i):
        return self._avl.remove_at(i)

    def insert_many(self, values):
        for value in values:
            self._avl.insert(value)

    def remove_many(self, values):
        for value in values:
            avl_remove_exact(self._avl, value)

    def __iter__(self):
        return iter(self._avl)

    def with_key_in(self, kmin, kmax):
        '''
        Get all elements whose key lies within ``[kmin, kmax]``.
        '''
        omin, omax = self._dummy(kmin), self._dummy(kmax)
        ilo, ihi = self._avl.span(omin, omax)
        return self._avl[ilo:ihi]

    def with_key_in_limited(self, kmin, kmax, nmax):
        '''
        Like :py:meth:`with_key_in` but raise :py:exc:`TooMany` when more
        than *nmax* elements match.
        '''
        omin, omax = self._dummy(kmin), self._dummy(kmax)
        ilo, ihi = self._avl.span(omin, omax)
        if ihi - ilo > nmax:
            raise TooMany(ihi - ilo)

        return self._avl[ilo:ihi]

    def index(self, value):
        '''
        Index of *value* (matched by identity among equal-keyed elements).
        '''
        ilo, ihi = self._avl.span(value)
        for i in range(ilo, ihi):
            if self._avl[i] is value:
                return i

        raise ValueError('element is not in avl tree')

    def min(self):
        return self._avl.min()

    def max(self):
        return self._avl.max()

    def __len__(self):
        return len(self._avl)

    def __getitem__(self, i):
        return self._avl[i]

193 

194 

class TracesFileCache(object):
    '''
    Manages trace metainformation cache.

    For each directory with files containing traces, one cache file is
    maintained to hold the trace metainformation of all files which are
    contained in the directory.
    '''

    # Global registry of caches, keyed by cache directory (see get_cache()).
    caches = {}

    def __init__(self, cachedir):
        '''
        Create new cache.

        :param cachedir: directory to hold the cache files.
        '''

        self.cachedir = cachedir
        # Maps cache file path -> dict of (abspath -> cached object).
        self.dircaches = {}
        # Cache file paths with in-memory changes not yet written to disk.
        self.modified = set()
        util.ensuredir(self.cachedir)

    def get(self, abspath):
        '''
        Try to get an item from the cache.

        :param abspath: absolute path of the object to retrieve

        :returns: a stored object is returned or None if nothing could be
            found.
        '''

        dircache = self._get_dircache_for(abspath)
        if abspath in dircache:
            return dircache[abspath]
        return None

    def put(self, abspath, tfile):
        '''
        Put an item into the cache.

        :param abspath: absolute path of the object to be stored
        :param tfile: object to be stored
        '''

        cachepath = self._dircachepath(abspath)
        # get lock on cachepath here
        dircache = self._get_dircache(cachepath)
        dircache[abspath] = tfile
        self.modified.add(cachepath)

    def dump_modified(self):
        '''
        Save any modifications to disk.
        '''

        for cachepath in self.modified:
            self._dump_dircache(self.dircaches[cachepath], cachepath)
            # unlock

        self.modified = set()

    def clean(self):
        '''
        Weed out missing files from the disk caches.
        '''

        self.dump_modified()

        for fn in os.listdir(self.cachedir):
            # Cache file names are 40-character SHA1 hex digests (see
            # _dircachepath); skip anything else in the directory.
            if len(fn) == 40:
                # Loading already weeds out entries for vanished files;
                # re-dumping persists the cleaned state.
                cache = self._load_dircache(pjoin(self.cachedir, fn))
                self._dump_dircache(cache, pjoin(self.cachedir, fn))

    def _get_dircache_for(self, abspath):
        # Dircache responsible for the directory containing abspath.
        return self._get_dircache(self._dircachepath(abspath))

    def _get_dircache(self, cachepath):
        # Lazily load the dircache from disk, or start with an empty one.
        if cachepath not in self.dircaches:
            if os.path.isfile(cachepath):
                self.dircaches[cachepath] = self._load_dircache(cachepath)
            else:
                self.dircaches[cachepath] = {}

        return self.dircaches[cachepath]

    def _dircachepath(self, abspath):
        # One cache file per directory, named by the salted SHA1 of the
        # directory path.
        cachefn = ehash(os.path.dirname(abspath))
        return pjoin(self.cachedir, cachefn)

    def _load_dircache(self, cachefilename):
        # Unpickle a dircache and restore the runtime-only state that was
        # stripped by _dump_dircache.

        with open(cachefilename, 'rb') as f:
            cache = pickle.load(f)

        # weed out files which no longer exist
        for fn in list(cache.keys()):
            if not os.path.isfile(fn):
                del cache[fn]

        time_float = util.get_time_float()

        for v in cache.values():
            # Rebuild the sorted lookup trees which were not pickled.
            v.trees_from_content(v.traces)
            for tr in v.traces:
                tr.file = v
                # fix Py2 codes to not include unicode when the cache file
                # was created with Py3
                if not isinstance(tr.station, str):
                    tr.prune_from_reuse_cache()
                    tr.set_codes(
                        str(tr.network),
                        str(tr.station),
                        str(tr.location),
                        str(tr.channel))

                # Normalize stored times to the configured time float type.
                tr.tmin = time_float(tr.tmin)
                tr.tmax = time_float(tr.tmax)

            # Waveform data is never cached; reset load state.
            v.data_use_count = 0
            v.data_loaded = False
            v.fix_unicode_codes()

        return cache

    def _dump_dircache(self, cache, cachefilename):
        # Persist a dircache; an empty cache removes any stale cache file.

        if not cache:
            if os.path.exists(cachefilename):
                os.remove(cachefilename)
            return

        # make a copy without the parents and the binsearch trees
        cache_copy = {}
        for fn in cache.keys():
            trf = copy.copy(cache[fn])
            trf.parent = None
            trf.by_tmin = None
            trf.by_tmax = None
            trf.by_tlen = None
            trf.by_mtime = None
            trf.data_use_count = 0
            trf.data_loaded = False
            traces = []
            for tr in trf.traces:
                # Shallow trace copies without waveform data or metadata.
                tr = tr.copy(data=False)
                tr.ydata = None
                tr.meta = None
                tr.file = trf
                traces.append(tr)

            trf.traces = traces

            cache_copy[fn] = trf

        # Write to a per-process temporary file, then rename into place, so
        # concurrent readers never see a partially written cache file.
        tmpfn = cachefilename+'.%i.tmp' % os.getpid()
        with open(tmpfn, 'wb') as f:
            pickle.dump(cache_copy, f, protocol=2)

        if is_windows and os.path.exists(cachefilename):
            # windows doesn't allow to rename over existing file
            os.unlink(cachefilename)

        os.rename(tmpfn, cachefilename)

360 

361 

def get_cache(cachedir):
    '''
    Get global TracesFileCache object for given directory.
    '''
    caches = TracesFileCache.caches
    if cachedir in caches:
        return caches[cachedir]

    cache = TracesFileCache(cachedir)
    caches[cachedir] = cache
    return cache

370 

371 

def loader(
        filenames, fileformat, cache, filename_attributes,
        show_progress=True, update_progress=None):
    '''
    Generator: scan *filenames* and yield one traces-file object per file.

    Uses *cache* (a :py:class:`TracesFileCache` or ``None``) to avoid
    re-reading files whose format, mtime and substitutions are unchanged.
    *filename_attributes* is an optional regex whose named groups
    (network/station/location/channel) override trace codes.
    *update_progress*, if given, is called as ``(label, i, n)`` and may
    return a true value to abort the scan.
    '''

    if show_progress_force_off:
        show_progress = False

    class Progress(object):
        # Small adapter unifying the terminal progress bar and the
        # caller-supplied update_progress callback (which can abort).
        def __init__(self, label, n):
            self._label = label
            self._n = n
            self._bar = None
            if show_progress:
                self._bar = util.progressbar(label, self._n)

            if update_progress:
                update_progress(label, 0, self._n)

        def update(self, i):
            if self._bar:
                if i < self._n-1:
                    self._bar.update(i)
                else:
                    # Last step: finish and drop the bar.
                    self._bar.finish()
                    self._bar = None

            abort = False
            if update_progress:
                abort = update_progress(self._label, i, self._n)

            return abort

        def finish(self):
            if self._bar:
                self._bar.finish()
                self._bar = None

    if not filenames:
        logger.warning('No files to load from')
        return

    regex = None
    if filename_attributes:
        regex = re.compile(filename_attributes)

    try:
        # Phase 1: stat all files and decide which must actually be read.
        progress = Progress('Looking at files', len(filenames))

        failures = []
        to_load = []
        for i, filename in enumerate(filenames):
            try:
                abspath = os.path.abspath(filename)

                substitutions = None
                if regex:
                    m = regex.search(filename)
                    if not m:
                        raise FilenameAttributeError(
                            "Cannot get attributes with pattern '%s' "
                            "from path '%s'" % (filename_attributes, filename))

                    substitutions = {}
                    for k in m.groupdict():
                        if k in ('network', 'station', 'location', 'channel'):
                            substitutions[k] = m.groupdict()[k]

                # index 8 of os.stat is st_mtime
                mtime = os.stat(filename)[8]
                tfile = None
                if cache:
                    tfile = cache.get(abspath)

                # Re-read when uncached, format mismatch, modified on disk,
                # or when substitutions apply (those are never cached).
                mustload = (
                    not tfile or
                    (tfile.format != fileformat and fileformat != 'detect') or
                    tfile.mtime != mtime or
                    substitutions is not None)

                to_load.append(
                    (mustload, mtime, abspath, substitutions, tfile))

            except (OSError, FilenameAttributeError) as xerror:
                # NOTE(review): if os.path.abspath itself raised, abspath
                # here refers to the previous iteration's value — confirm.
                failures.append(abspath)
                logger.warning(xerror)

            abort = progress.update(i+1)
            if abort:
                progress.update(len(filenames))
                return

        progress.update(len(filenames))

        # Process in path order for stable, locality-friendly access.
        to_load.sort(key=lambda x: x[2])

        nload = len([1 for x in to_load if x[0]])
        iload = 0

        # When almost everything is cached, count every file in the
        # progress total instead of only the ones being re-read.
        count_all = False
        if nload < 0.01*len(to_load):
            nload = len(to_load)
            count_all = True

        # Phase 2: read files needing it, reuse cached entries otherwise.
        if to_load:
            progress = Progress('Scanning files', nload)

            for (mustload, mtime, abspath, substitutions, tfile) in to_load:
                try:
                    if mustload:
                        tfile = TracesFile(
                            None, abspath, fileformat,
                            substitutions=substitutions, mtime=mtime)

                        if cache and not substitutions:
                            cache.put(abspath, tfile)

                        if not count_all:
                            iload += 1

                    if count_all:
                        iload += 1

                except (io.FileLoadError, OSError) as xerror:
                    failures.append(abspath)
                    logger.warning(xerror)
                else:
                    yield tfile

                abort = progress.update(iload+1)
                if abort:
                    break

            progress.update(nload)

        if failures:
            logger.warning(
                'The following file%s caused problems and will be ignored:\n' %
                util.plural_s(len(failures)) + '\n'.join(failures))

        if cache:
            cache.dump_modified()
    finally:
        progress.finish()

514 

515 

def tlen(x):
    '''
    Time length of *x*: difference of its ``tmax`` and ``tmin`` attributes.
    '''
    tmin, tmax = x.tmin, x.tmax
    return tmax - tmin

518 

519 

class TracesGroup(object):

    '''
    Trace container base class.

    Base class for Pile, SubPile, and TracesFile, i.e. anything containing
    a collection of several traces. A TracesGroup object maintains lookup sets
    of some of the traces meta-information, as well as a combined time-range
    of its contents.
    '''

    def __init__(self, parent):
        # Enclosing group, if any; add/remove operations propagate upward.
        self.parent = parent
        self.empty()
        # Incremented on every add/remove; lets clients detect changes.
        self.nupdates = 0
        self.abspath = None

    def set_parent(self, parent):
        self.parent = parent

    def get_parent(self):
        return self.parent

    def empty(self):
        # Reset all lookup structures to the empty state.
        # Multiset counters over the meta-information of contained traces.
        self.networks, self.stations, self.locations, self.channels, \
            self.nslc_ids, self.deltats = [Counter() for x in range(6)]
        # Sorted (AVL-backed) views of the contained traces.
        self.by_tmin = Sorted([], 'tmin')
        self.by_tmax = Sorted([], 'tmax')
        self.by_tlen = Sorted([], tlen)
        self.by_mtime = Sorted([], 'mtime')
        self.tmin, self.tmax = None, None
        self.deltatmin, self.deltatmax = None, None

    def trees_from_content(self, content):
        # Rebuild the sorted lookup trees from *content* (a list of
        # traces), e.g. after unpickling (trees are not pickled).
        self.by_tmin = Sorted(content, 'tmin')
        self.by_tmax = Sorted(content, 'tmax')
        self.by_tlen = Sorted(content, tlen)
        self.by_mtime = Sorted(content, 'mtime')
        self.adjust_minmax()

    def fix_unicode_codes(self):
        # Convert Py2-era unicode codes to str. If any network key is
        # already a str, all codes are assumed fine and nothing is done.
        for net in self.networks:
            if isinstance(net, str):
                return

        self.networks = fix_unicode_copy(self.networks, str)
        self.stations = fix_unicode_copy(self.stations, str)
        self.locations = fix_unicode_copy(self.locations, str)
        self.channels = fix_unicode_copy(self.channels, str)
        self.nslc_ids = fix_unicode_copy(
            self.nslc_ids, lambda k: tuple(str(x) for x in k))

    def add(self, content):
        '''
        Add content to traces group and update indices.

        Accepts :py:class:`pyrocko.trace.Trace` objects and
        :py:class:`pyrocko.pile.TracesGroup` objects.
        '''

        if isinstance(content, (trace.Trace, TracesGroup)):
            content = [content]

        for c in content:

            if isinstance(c, TracesGroup):
                # Merge the child group's precomputed counters and trees.
                self.networks.update(c.networks)
                self.stations.update(c.stations)
                self.locations.update(c.locations)
                self.channels.update(c.channels)
                self.nslc_ids.update(c.nslc_ids)
                self.deltats.update(c.deltats)

                self.by_tmin.insert_many(c.by_tmin)
                self.by_tmax.insert_many(c.by_tmax)
                self.by_tlen.insert_many(c.by_tlen)
                self.by_mtime.insert_many(c.by_mtime)

            elif isinstance(c, trace.Trace):
                self.networks[c.network] += 1
                self.stations[c.station] += 1
                self.locations[c.location] += 1
                self.channels[c.channel] += 1
                self.nslc_ids[c.nslc_id] += 1
                self.deltats[c.deltat] += 1

                self.by_tmin.insert(c)
                self.by_tmax.insert(c)
                self.by_tlen.insert(c)
                self.by_mtime.insert(c)

        self.adjust_minmax()

        self.nupdates += 1
        self.notify_listeners('add', content)

        # Propagate upward so enclosing groups stay consistent.
        if self.parent is not None:
            self.parent.add(content)

    def remove(self, content):
        '''
        Remove content to traces group and update indices.
        '''
        if isinstance(content, (trace.Trace, TracesGroup)):
            content = [content]

        for c in content:

            if isinstance(c, TracesGroup):
                self.networks.subtract(c.networks)
                self.stations.subtract(c.stations)
                self.locations.subtract(c.locations)
                self.channels.subtract(c.channels)
                self.nslc_ids.subtract(c.nslc_ids)
                self.deltats.subtract(c.deltats)

                self.by_tmin.remove_many(c.by_tmin)
                self.by_tmax.remove_many(c.by_tmax)
                self.by_tlen.remove_many(c.by_tlen)
                self.by_mtime.remove_many(c.by_mtime)

            elif isinstance(c, trace.Trace):
                self.networks.subtract1(c.network)
                self.stations.subtract1(c.station)
                self.locations.subtract1(c.location)
                self.channels.subtract1(c.channel)
                self.nslc_ids.subtract1(c.nslc_id)
                self.deltats.subtract1(c.deltat)

                self.by_tmin.remove(c)
                self.by_tmax.remove(c)
                self.by_tlen.remove(c)
                self.by_mtime.remove(c)

        self.adjust_minmax()

        self.nupdates += 1
        self.notify_listeners('remove', content)

        if self.parent is not None:
            self.parent.remove(content)

    def relevant(self, tmin, tmax, group_selector=None, trace_selector=None):
        '''
        Return list of :py:class:`pyrocko.trace.Trace` objects where given
        arguments ``tmin`` and ``tmax`` match.

        :param tmin: start time
        :param tmax: end time
        :param group_selector: lambda expression taking group dict of regex
            match object as a single argument and which returns true or false
            to keep or reject a file (default: ``None``)
        :param trace_selector: lambda expression taking group dict of regex
            match object as a single argument and which returns true or false
            to keep or reject a file (default: ``None``)
        '''

        if not self.by_tmin or not self.is_relevant(
                tmin, tmax, group_selector):

            return []

        # Candidates can start up to tlenmax before tmin and still overlap
        # the window; exact filtering is done per trace below.
        return [tr for tr in self.by_tmin.with_key_in(tmin-self.tlenmax, tmax)
                if tr.is_relevant(tmin, tmax, trace_selector)]

    def adjust_minmax(self):
        # Recompute the cached aggregate values from the sorted trees.
        if self.by_tmin:
            self.tmin = self.by_tmin.min().tmin
            self.tmax = self.by_tmax.max().tmax
            t = self.by_tlen.max()
            self.tlenmax = t.tmax - t.tmin
            self.mtime = self.by_mtime.max().mtime
            deltats = list(self.deltats.keys())
            self.deltatmin = min(deltats)
            self.deltatmax = max(deltats)
        else:
            self.tmin = None
            self.tmax = None
            self.tlenmax = None
            self.mtime = None
            self.deltatmin = None
            self.deltatmax = None

    def notify_listeners(self, what, content):
        # Hook for subclasses (see Pile.notify_listeners); default no-op.
        pass

    def get_update_count(self):
        return self.nupdates

    def overlaps(self, tmin, tmax):
        # True when [tmin, tmax] intersects the group's combined range.
        return self.tmin is not None \
            and tmax >= self.tmin and self.tmax >= tmin

    def is_relevant(self, tmin, tmax, group_selector=None):
        if self.tmin is None or self.tmax is None:
            return False
        return tmax >= self.tmin and self.tmax >= tmin and (
            group_selector is None or group_selector(self))

718 

719 

class MemTracesFile(TracesGroup):

    '''
    In-memory stand-in for a file of traces.

    Allows traces which are not backed by an actual file on disk to be
    inserted into a Pile.
    '''

    def __init__(self, parent, traces):
        TracesGroup.__init__(self, parent)
        self.add(traces)
        self.mtime = time.time()

    def add(self, traces):
        '''
        Add a single trace or a sequence of traces to this group.
        '''
        if isinstance(traces, trace.Trace):
            traces = [traces]

        for tr in traces:
            # Back-reference so each trace knows its container.
            tr.file = self

        TracesGroup.add(self, traces)

    def load_headers(self, mtime=None):
        # Nothing to do: headers live in memory already.
        pass

    def load_data(self):
        # Nothing to do: data lives in memory already.
        pass

    def use_data(self):
        pass

    def drop_data(self):
        pass

    def reload_if_modified(self):
        # Memory-backed contents never change behind our back.
        return False

    def iter_traces(self):
        '''
        Iterate over contained traces, ordered by tmin.
        '''
        return iter(self.by_tmin)

    def get_traces(self):
        '''
        Get contained traces as a list, ordered by tmin.
        '''
        return list(self.by_tmin)

    def gather_keys(self, gather, selector=None):
        '''
        Apply *gather* to selected traces; return the set of results.
        '''
        if selector is None:
            return set(gather(tr) for tr in self.by_tmin)

        return set(gather(tr) for tr in self.by_tmin if selector(tr))

    def __str__(self):
        lines = [
            'MemTracesFile',
            'file mtime: %s' % util.time_to_str(self.mtime),
            'number of traces: %i' % len(self.by_tmin),
            'timerange: %s - %s' % (
                util.time_to_str(self.tmin), util.time_to_str(self.tmax)),
            'networks: %s' % ', '.join(sl(self.networks.keys())),
            'stations: %s' % ', '.join(sl(self.stations.keys())),
            'locations: %s' % ', '.join(sl(self.locations.keys())),
            'channels: %s' % ', '.join(sl(self.channels.keys())),
            'deltats: %s' % ', '.join(sl(self.deltats.keys())),
        ]
        return '\n'.join(lines) + '\n'

784 

785 

class TracesFile(TracesGroup):
    '''
    Group of traces read from a single file on disk.

    Headers are loaded eagerly at construction; waveform data is loaded
    lazily and reference-counted (see use_data / drop_data).
    '''

    def __init__(
            self, parent, abspath, format,
            substitutions=None, mtime=None):

        TracesGroup.__init__(self, parent)
        self.abspath = abspath
        self.format = format
        self.traces = []
        self.data_loaded = False
        self.data_use_count = 0
        self.substitutions = substitutions
        self.load_headers(mtime=mtime)
        self.mtime = mtime

    def load_headers(self, mtime=None):
        # (Re)read trace meta-information from the file, without data.
        logger.debug('loading headers from file: %s' % self.abspath)
        if mtime is None:
            # index 8 of os.stat is st_mtime
            self.mtime = os.stat(self.abspath)[8]

        def kgen(tr):
            # Identity key of a trace within this file; used to deduplicate.
            return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id

        self.remove(self.traces)
        ks = set()
        for tr in io.load(self.abspath,
                          format=self.format,
                          getdata=False,
                          substitutions=self.substitutions):

            k = kgen(tr)
            if k not in ks:
                ks.add(k)
                self.traces.append(tr)
                tr.file = self

        self.add(self.traces)

        self.data_loaded = False
        self.data_use_count = 0

    def load_data(self, force=False):
        # Read waveform data; returns True when the file's set of traces
        # changed compared to what was known from the headers.
        file_changed = False
        if not self.data_loaded or force:
            logger.debug('loading data from file: %s' % self.abspath)

            def kgen(tr):
                return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id

            traces_ = io.load(self.abspath, format=self.format, getdata=True,
                              substitutions=self.substitutions)

            # prevent adding duplicate snippets from corrupt mseed files
            k_loaded = set()
            traces = []
            for tr in traces_:
                k = kgen(tr)
                if k not in k_loaded:
                    k_loaded.add(k)
                    traces.append(tr)

            # Diff freshly loaded traces against the currently known set.
            k_current_d = dict((kgen(tr), tr) for tr in self.traces)
            k_current = set(k_current_d)
            k_new = k_loaded - k_current
            k_delete = k_current - k_loaded
            k_unchanged = k_current & k_loaded

            # Drop traces which disappeared from the file (iterate over a
            # copy because self.traces is mutated).
            for tr in self.traces[:]:
                if kgen(tr) in k_delete:
                    self.remove(tr)
                    self.traces.remove(tr)
                    tr.file = None
                    file_changed = True

            # Register traces which are new in the file.
            for tr in traces:
                if kgen(tr) in k_new:
                    tr.file = self
                    self.traces.append(tr)
                    self.add(tr)
                    file_changed = True

            # Attach fresh data to traces that were already known.
            for tr in traces:
                if kgen(tr) in k_unchanged:
                    ctr = k_current_d[kgen(tr)]
                    ctr.ydata = tr.ydata

            self.data_loaded = True

        if file_changed:
            logger.debug('reloaded (file may have changed): %s' % self.abspath)

        return file_changed

    def use_data(self):
        # Increase the data reference count; requires loaded data.
        if not self.data_loaded:
            raise Exception('Data not loaded')
        self.data_use_count += 1

    def drop_data(self):
        # Decrease the reference count; free data when the last user lets go.
        if self.data_loaded:
            if self.data_use_count == 1:
                logger.debug('forgetting data of file: %s' % self.abspath)
                for tr in self.traces:
                    tr.drop_data()

                self.data_loaded = False

            self.data_use_count -= 1
        else:
            self.data_use_count = 0

    def reload_if_modified(self):
        # Reload headers (or data, if loaded) when the file's mtime changed;
        # returns True when a reload happened.
        mtime = os.stat(self.abspath)[8]
        if mtime != self.mtime:
            logger.debug(
                'mtime=%i, reloading file: %s' % (mtime, self.abspath))

            self.mtime = mtime
            if self.data_loaded:
                self.load_data(force=True)
            else:
                self.load_headers()

            return True

        return False

    def iter_traces(self):
        for tr in self.traces:
            yield tr

    def gather_keys(self, gather, selector=None):
        # Apply *gather* to selected traces and collect distinct results.
        keys = set()
        for tr in self.by_tmin:
            if selector is None or selector(tr):
                keys.add(gather(tr))

        return keys

    def __str__(self):
        s = 'TracesFile\n'
        s += 'abspath: %s\n' % self.abspath
        s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
        s += 'number of traces: %i\n' % len(self.traces)
        s += 'timerange: %s - %s\n' % (
            util.time_to_str(self.tmin), util.time_to_str(self.tmax))
        s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
        s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
        s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
        s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
        s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
        return s

939 

class FilenameAttributeError(Exception):
    '''
    Raised by ``loader`` when the ``filename_attributes`` regex pattern
    does not match a given file path.
    '''
    pass

942 

943 

class SubPile(TracesGroup):
    '''
    Collection of TracesFile objects forming one partition of a Pile
    (files are partitioned by Pile.dispatch_key).
    '''

    def __init__(self, parent):
        TracesGroup.__init__(self, parent)
        self.files = []
        self.empty()

    def add_file(self, file):
        self.files.append(file)
        file.set_parent(self)
        # Merges the file's counters and trees into this group.
        self.add(file)

    def remove_file(self, file):
        self.files.remove(file)
        file.set_parent(None)
        self.remove(file)

    def remove_files(self, files):
        for file in files:
            self.files.remove(file)
            file.set_parent(None)
        # Single bulk index update for all removed files.
        self.remove(files)

    def gather_keys(self, gather, selector=None):
        # Apply *gather* to all traces of all files and collect the
        # distinct results.
        keys = set()
        for file in self.files:
            keys |= file.gather_keys(gather, selector)

        return keys

    def iter_traces(
            self,
            load_data=False,
            return_abspath=False,
            group_selector=None,
            trace_selector=None):
        # Iterate over contained traces; with load_data=True, waveform data
        # of each file is loaded before and released after its traces are
        # yielded.

        for file in self.files:

            if group_selector and not group_selector(file):
                continue

            must_drop = False
            if load_data:
                file.load_data()
                file.use_data()
                must_drop = True

            for tr in file.iter_traces():
                if trace_selector and not trace_selector(tr):
                    continue

                if return_abspath:
                    yield file.abspath, tr
                else:
                    yield tr

            if must_drop:
                file.drop_data()

    def iter_files(self):
        for file in self.files:
            yield file

    def reload_modified(self):
        # Reload any file whose mtime changed; True if anything changed.
        modified = False
        for file in self.files:
            modified |= file.reload_if_modified()

        return modified

    def __str__(self):
        s = 'SubPile\n'
        s += 'number of files: %i\n' % len(self.files)
        s += 'timerange: %s - %s\n' % (
            util.time_to_str(self.tmin), util.time_to_str(self.tmax))
        s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
        s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
        s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
        s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
        s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
        return s

1025 

1026 

class Batch(object):
    '''
    Batch of waveforms from window wise data extraction.

    Encapsulates state and results yielded for each window in window wise
    waveform extraction with the :py:meth:`Pile.chopper` method (when the
    `style='batch'` keyword argument set).

    *Attributes:*

    .. py:attribute:: tmin

        Start of this time window.

    .. py:attribute:: tmax

        End of this time window.

    .. py:attribute:: i

        Index of this time window in sequence.

    .. py:attribute:: n

        Total number of time windows in sequence.

    .. py:attribute:: traces

        Extracted waveforms for this time window.
    '''

    def __init__(self, tmin, tmax, i, n, traces):
        # Plain record type: store everything as given.
        self.tmin, self.tmax = tmin, tmax
        self.i, self.n = i, n
        self.traces = traces

1064 

1065 

1066class Pile(TracesGroup): 

1067 ''' 

1068 Waveform archive lookup, data loading and caching infrastructure. 

1069 ''' 

1070 

    def __init__(self):
        # Top-level group: no parent.
        TracesGroup.__init__(self, None)
        self.subpiles = {}     # dispatch_key(file) -> SubPile
        self.open_files = {}
        self.listeners = []    # weak references to change-listener callables
        self.abspaths = set()  # paths of all files currently in the pile

1077 

1078 def add_listener(self, obj): 

1079 self.listeners.append(util.smart_weakref(obj)) 

1080 

1081 def notify_listeners(self, what, content): 

1082 for ref in self.listeners: 

1083 obj = ref() 

1084 if obj: 

1085 obj(what, content) 

1086 

1087 def load_files( 

1088 self, filenames, 

1089 filename_attributes=None, 

1090 fileformat='mseed', 

1091 cache=None, 

1092 show_progress=True, 

1093 update_progress=None): 

1094 

1095 load = loader( 

1096 filenames, fileformat, cache, filename_attributes, 

1097 show_progress=show_progress, 

1098 update_progress=update_progress) 

1099 

1100 self.add_files(load) 

1101 

1102 def add_files(self, files): 

1103 for file in files: 

1104 self.add_file(file) 

1105 

1106 def add_file(self, file): 

1107 if file.abspath is not None and file.abspath in self.abspaths: 

1108 logger.warning('File already in pile: %s' % file.abspath) 

1109 return 

1110 

1111 if file.deltatmin is None: 

1112 logger.warning('Sampling rate of all traces are zero in file: %s' % 

1113 file.abspath) 

1114 return 

1115 

1116 subpile = self.dispatch(file) 

1117 subpile.add_file(file) 

1118 if file.abspath is not None: 

1119 self.abspaths.add(file.abspath) 

1120 

1121 def remove_file(self, file): 

1122 subpile = file.get_parent() 

1123 if subpile is not None: 

1124 subpile.remove_file(file) 

1125 if file.abspath is not None: 

1126 self.abspaths.remove(file.abspath) 

1127 

1128 def remove_files(self, files): 

1129 subpile_files = {} 

1130 for file in files: 

1131 subpile = file.get_parent() 

1132 if subpile not in subpile_files: 

1133 subpile_files[subpile] = [] 

1134 

1135 subpile_files[subpile].append(file) 

1136 

1137 for subpile, files in subpile_files.items(): 

1138 subpile.remove_files(files) 

1139 for file in files: 

1140 if file.abspath is not None: 

1141 self.abspaths.remove(file.abspath) 

1142 

1143 def dispatch_key(self, file): 

1144 dt = int(math.floor(math.log(file.deltatmin))) 

1145 return dt 

1146 

1147 def dispatch(self, file): 

1148 k = self.dispatch_key(file) 

1149 if k not in self.subpiles: 

1150 self.subpiles[k] = SubPile(self) 

1151 

1152 return self.subpiles[k] 

1153 

1154 def get_deltats(self): 

1155 return list(self.deltats.keys()) 

1156 

    def chop(
            self, tmin, tmax,
            group_selector=None,
            trace_selector=None,
            snap=(round, round),
            include_last=False,
            load_data=True):
        '''
        Cut out traces overlapping the window [tmin, tmax].

        Returns ``(chopped, used_files)`` where *chopped* is a list of
        trace snippets and *used_files* the set of file objects whose data
        was touched (so the caller can later drop it).
        '''

        chopped = []
        used_files = set()

        traces = self.relevant(tmin, tmax, group_selector, trace_selector)
        if load_data:
            files_changed = False
            for tr in traces:
                if tr.file and tr.file not in used_files:
                    if tr.file.load_data():
                        files_changed = True

                    if tr.file is not None:
                        used_files.add(tr.file)

            if files_changed:
                # Loading may have detected changed files and replaced
                # trace objects; redo the lookup on the fresh state.
                traces = self.relevant(
                    tmin, tmax, group_selector, trace_selector)

        for tr in traces:
            if not load_data and tr.ydata is not None:
                # Caller wants no data: hand out a data-less copy.
                tr = tr.copy(data=False)
                tr.ydata = None

            try:
                chopped.append(tr.chop(
                    tmin, tmax,
                    inplace=False,
                    snap=snap,
                    include_last=include_last))

            except trace.NoData:
                # Trace has no samples in the window; skip silently.
                pass

        return chopped, used_files

1199 

    def _process_chopped(
            self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
            tpad):

        '''
        Post-process traces chopped for a single extraction window.

        Sorts the traces, optionally connects them across gaps/overlaps
        and, unless ``want_incomplete`` is set, weeds out traces which do
        not fully cover the padded window ``wmin - tpad`` .. ``wmax + tpad``.
        '''

        # deterministic order by full trace identifier
        chopped.sort(key=lambda a: a.full_id)
        if degap:
            chopped = degapper(chopped, maxgap=maxgap, maxlap=maxlap)

        if not want_incomplete:
            chopped_weeded = []
            for tr in chopped:
                # emin/emax: signed coverage errors of the trace span against
                # the padded window; zero means exact coverage
                emin = tr.tmin - (wmin-tpad)
                emax = tr.tmax + tr.deltat - (wmax+tpad)
                if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
                    # complete within half a sample: keep as-is
                    chopped_weeded.append(tr)

                elif degap:
                    if (0. < emin <= 5. * tr.deltat and
                            -5. * tr.deltat <= emax < 0.):

                        # nearly complete (within 5 samples at each edge):
                        # pad by repeating edge samples instead of discarding
                        tr.extend(
                            wmin-tpad,
                            wmax+tpad-tr.deltat,
                            fillmethod='repeat')

                        chopped_weeded.append(tr)

            chopped = chopped_weeded

        # attach the nominal (unpadded) window limits to each trace
        for tr in chopped:
            tr.wmin = wmin
            tr.wmax = wmax

        return chopped

1234 

    def chopper(
            self,
            tmin=None, tmax=None, tinc=None, tpad=0.,
            group_selector=None, trace_selector=None,
            want_incomplete=True, degap=True, maxgap=5, maxlap=None,
            keep_current_files_open=False, accessor_id=None,
            snap=(round, round), include_last=False, load_data=True,
            style=None):

        '''
        Get iterator for shifting window wise data extraction from waveform
        archive.

        :param tmin: start time (default uses start time of available data)
        :param tmax: end time (default uses end time of available data)
        :param tinc: time increment (window shift time) (default uses
            ``tmax-tmin``)
        :param tpad: padding time appended on either side of the data windows
            (window overlap is ``2*tpad``)
        :param group_selector: filter callback taking :py:class:`TracesGroup`
            objects
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects
        :param want_incomplete: if set to ``False``, gappy/incomplete traces
            are discarded from the results
        :param degap: whether to try to connect traces and to remove gaps and
            overlaps
        :param maxgap: maximum gap size in samples which is filled with
            interpolated samples when ``degap`` is ``True``
        :param maxlap: maximum overlap size in samples which is removed when
            ``degap`` is ``True``
        :param keep_current_files_open: whether to keep cached trace data in
            memory after the iterator has ended
        :param accessor_id: if given, used as a key to identify different
            points of extraction for the decision of when to release cached
            trace data (should be used when data is alternately extracted from
            more than one region / selection)
        :param snap: replaces Python's :py:func:`round` function which is used
            to determine indices where to start and end the trace data array
        :param include_last: whether to include last sample
        :param load_data: whether to load the waveform data. If set to
            ``False``, traces with no data samples, but with correct
            meta-information are returned
        :param style: set to ``'batch'`` to yield waveforms and information
            about the chopper state as :py:class:`Batch` objects. By default
            lists of :py:class:`pyrocko.trace.Trace` objects are yielded.
        :returns: iterator providing extracted waveforms for each extracted
            window. See ``style`` argument for details.
        '''
        if tmin is None:
            if self.tmin is None:
                logger.warning("Pile's tmin is not set - pile may be empty.")
                return
            tmin = self.tmin + tpad

        if tmax is None:
            if self.tmax is None:
                logger.warning("Pile's tmax is not set - pile may be empty.")
                return
            tmax = self.tmax - tpad

        # nothing relevant in the requested (padded) span -> empty iterator
        if not self.is_relevant(tmin-tpad, tmax+tpad, group_selector):
            return

        # each accessor gets its own bookkeeping set of files whose data is
        # currently held in memory
        if accessor_id not in self.open_files:
            self.open_files[accessor_id] = set()

        open_files = self.open_files[accessor_id]

        if tinc is None:
            # single window covering the whole span
            tinc = tmax - tmin
            nwin = 1
        else:
            eps = tinc * 1e-6
            if tinc != 0.0:
                # number of windows needed to cover [tmin, tmax); eps guards
                # against a spurious extra window from float rounding
                nwin = int(((tmax - eps) - tmin) / tinc) + 1
            else:
                nwin = 1

        for iwin in range(nwin):
            # current window, clipped to the overall span at the far end
            wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)

            chopped, used_files = self.chop(
                wmin-tpad, wmax+tpad, group_selector, trace_selector, snap,
                include_last, load_data)

            for file in used_files - open_files:
                # increment datause counter on newly opened files
                file.use_data()

            open_files.update(used_files)

            processed = self._process_chopped(
                chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
                tpad)

            if style == 'batch':
                yield Batch(
                    tmin=wmin,
                    tmax=wmax,
                    i=iwin,
                    n=nwin,
                    traces=processed)

            else:
                yield processed

            # release cached data of files not touched by this window
            unused_files = open_files - used_files

            while unused_files:
                file = unused_files.pop()
                file.drop_data()
                open_files.remove(file)

        if not keep_current_files_open:
            # final cleanup: drop everything still cached for this accessor
            while open_files:
                file = open_files.pop()
                file.drop_data()

1353 

1354 def all(self, *args, **kwargs): 

1355 ''' 

1356 Shortcut to aggregate :py:meth:`chopper` output into a single list. 

1357 ''' 

1358 

1359 alltraces = [] 

1360 for traces in self.chopper(*args, **kwargs): 

1361 alltraces.extend(traces) 

1362 

1363 return alltraces 

1364 

1365 def iter_all(self, *args, **kwargs): 

1366 for traces in self.chopper(*args, **kwargs): 

1367 for tr in traces: 

1368 yield tr 

1369 

    def chopper_grouped(self, gather, progress=None, *args, **kwargs):
        '''
        Iterate like :py:meth:`chopper`, but process one gather-group at a
        time.

        :param gather: callable mapping a trace to its grouping key
        :param progress: optional label; when given, a progress bar is shown
            while iterating over the groups

        Remaining arguments are passed on to :py:meth:`chopper`; any given
        ``group_selector``/``trace_selector`` are combined with the
        per-group filters.
        '''
        keys = self.gather_keys(gather)
        if len(keys) == 0:
            return

        # remember caller-supplied selectors; they are combined with the
        # per-group selectors installed below
        outer_group_selector = None
        if 'group_selector' in kwargs:
            outer_group_selector = kwargs['group_selector']

        outer_trace_selector = None
        if 'trace_selector' in kwargs:
            outer_trace_selector = kwargs['trace_selector']

        # the use of this gather-cache makes it impossible to modify the pile
        # during chopping
        gather_cache = {}
        pbar = None
        try:
            if progress is not None:
                pbar = util.progressbar(progress, len(keys))

            for ikey, key in enumerate(keys):
                # note: these closures capture the loop variable ``key``;
                # this is safe because each pair is fully consumed by the
                # inner chopper loop before ``key`` is rebound
                def tsel(tr):
                    return gather(tr) == key and (
                        outer_trace_selector is None
                        or outer_trace_selector(tr))

                def gsel(gr):
                    if gr not in gather_cache:
                        gather_cache[gr] = gr.gather_keys(gather)

                    return key in gather_cache[gr] and (
                        outer_group_selector is None
                        or outer_group_selector(gr))

                # kwargs is mutated here; the caller's original selectors
                # were saved in outer_* above
                kwargs['trace_selector'] = tsel
                kwargs['group_selector'] = gsel

                for traces in self.chopper(*args, **kwargs):
                    yield traces

                if pbar:
                    pbar.update(ikey+1)

        finally:
            if pbar:
                pbar.finish()

1417 

1418 def gather_keys(self, gather, selector=None): 

1419 keys = set() 

1420 for subpile in self.subpiles.values(): 

1421 keys |= subpile.gather_keys(gather, selector) 

1422 

1423 return sorted(keys) 

1424 

1425 def iter_traces( 

1426 self, 

1427 load_data=False, 

1428 return_abspath=False, 

1429 group_selector=None, 

1430 trace_selector=None): 

1431 

1432 ''' 

1433 Iterate over all traces in pile. 

1434 

1435 :param load_data: whether to load the waveform data, by default empty 

1436 traces are yielded 

1437 :param return_abspath: if ``True`` yield tuples containing absolute 

1438 file path and :py:class:`pyrocko.trace.Trace` objects 

1439 :param group_selector: filter callback taking :py:class:`TracesGroup` 

1440 objects 

1441 :param trace_selector: filter callback taking 

1442 :py:class:`pyrocko.trace.Trace` objects 

1443 

1444 Example; yields only traces, where the station code is 'HH1':: 

1445 

1446 test_pile = pile.make_pile('/local/test_trace_directory') 

1447 for t in test_pile.iter_traces( 

1448 trace_selector=lambda tr: tr.station=='HH1'): 

1449 

1450 print t 

1451 ''' 

1452 

1453 for subpile in self.subpiles.values(): 

1454 if not group_selector or group_selector(subpile): 

1455 for tr in subpile.iter_traces(load_data, return_abspath, 

1456 group_selector, trace_selector): 

1457 yield tr 

1458 

1459 def iter_files(self): 

1460 for subpile in self.subpiles.values(): 

1461 for file in subpile.iter_files(): 

1462 yield file 

1463 

1464 def reload_modified(self): 

1465 modified = False 

1466 for subpile in self.subpiles.values(): 

1467 modified |= subpile.reload_modified() 

1468 

1469 return modified 

1470 

1471 def get_tmin(self): 

1472 return self.tmin 

1473 

1474 def get_tmax(self): 

1475 return self.tmax 

1476 

1477 def get_deltatmin(self): 

1478 return self.deltatmin 

1479 

1480 def get_deltatmax(self): 

1481 return self.deltatmax 

1482 

1483 def is_empty(self): 

1484 return self.tmin is None and self.tmax is None 

1485 

1486 def __str__(self): 

1487 if self.tmin is not None and self.tmax is not None: 

1488 tmin = util.time_to_str(self.tmin) 

1489 tmax = util.time_to_str(self.tmax) 

1490 s = 'Pile\n' 

1491 s += 'number of subpiles: %i\n' % len(self.subpiles) 

1492 s += 'timerange: %s - %s\n' % (tmin, tmax) 

1493 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys())) 

1494 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys())) 

1495 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys())) 

1496 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys())) 

1497 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys())) 

1498 

1499 else: 

1500 s = 'empty Pile' 

1501 

1502 return s 

1503 

    def snuffle(self, **kwargs):
        '''
        Visualize it.

        :param stations: list of :py:class:`pyrocko.model.Station` objects or
            ``None``
        :param events: list of :py:class:`pyrocko.model.Event` objects or
            ``None``
        :param markers: list of :py:class:`pyrocko.gui.snuffler.marker.Marker`
            objects or ``None``
        :param ntracks: float, number of tracks to be shown initially
            (default: 12)
        :param follow: time interval (in seconds) for real time follow mode or
            ``None``
        :param controls: bool, whether to show the main controls (default:
            ``True``)
        :param opengl: bool, whether to use opengl (default: ``False``)
        '''

        # deferred import: keeps the GUI stack an optional dependency for
        # non-interactive use of this module
        from pyrocko.gui.snuffler.snuffler import snuffle
        snuffle(self, **kwargs)

1525 

1526 

def make_pile(
        paths=None, selector=None, regex=None,
        fileformat='mseed',
        cachedirname=None, show_progress=True):

    '''
    Create pile from given file and directory names.

    :param paths: filenames and/or directories to look for traces. If paths is
        ``None`` ``sys.argv[1:]`` is used.
    :param selector: lambda expression taking group dict of regex match object
        as a single argument and which returns true or false to keep or reject
        a file
    :param regex: regular expression which filenames have to match
    :param fileformat: format of the files ('mseed', 'sac', 'kan',
        'from_extension', 'detect')
    :param cachedirname: loader cache is stored under this directory. It is
        created as necessary.
    :param show_progress: show progress bar and other progress information
    '''

    # global kill-switch for progress output wins over the argument
    show_progress = show_progress and not show_progress_force_off

    if isinstance(paths, str):
        paths = [paths]
    elif paths is None:
        paths = sys.argv[1:]

    if cachedirname is None:
        cachedirname = config.config().cache_dir

    fns = util.select_files(
        paths, include=regex, selector=selector, show_progress=show_progress)

    cache = get_cache(cachedirname)

    p = Pile()
    p.load_files(
        sorted(fns),
        cache=cache,
        fileformat=fileformat,
        show_progress=show_progress)

    return p

1572 

1573 

class Injector(trace.States):

    '''
    Live-insert traces into a :py:class:`Pile`.

    Incoming trace snippets are appended to per-channel buffer traces held
    in memory (:py:class:`MemTracesFile` objects registered with the pile).
    Once a buffer grows beyond ``fixation_length`` it is "fixated": saved
    to disk under ``path`` (if set) and optionally reloaded into the pile
    as a regular file.
    '''

    def __init__(
            self, pile,
            fixation_length=None,
            path=None,
            format='from_extension',
            forget_fixed=False):

        '''
        :param pile: :py:class:`Pile` receiving the injected traces
        :param fixation_length: buffer length in seconds after which the
            fixation method is applied; ``None`` disables fixation
        :param path: filename template used when fixated traces are saved
        :param format: file format passed to :py:func:`pyrocko.io.save`
        :param forget_fixed: if ``True``, fixated traces are not reloaded
            into the pile after saving
        '''
        trace.States.__init__(self)
        self._pile = pile
        self._fixation_length = fixation_length
        self._format = format
        self._path = path
        self._forget_fixed = forget_fixed

    def set_fixation_length(self, length):
        '''
        Set length after which the fixation method is called on buffer traces.

        The length should be given in seconds. Give None to disable.
        '''
        # fixate pending buffers first so they obey the old setting
        self.fixate_all()
        self._fixation_length = length  # in seconds

    def set_save_path(
            self,
            path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
                 '%(tmin)s_%(tmax)s.mseed'):

        '''
        Set the filename template used when fixated traces are saved.
        '''
        self.fixate_all()
        self._path = path

    def inject(self, trace):
        # NOTE: the parameter ``trace`` shadows the module-level ``trace``
        # import within this method; here it is the injected trace object.
        logger.debug('Received a trace: %s' % trace)

        buf = self.get(trace)
        if buf is None:
            # first snippet for this channel: start a new in-memory buffer
            trbuf = trace.copy()
            buf = MemTracesFile(None, [trbuf])
            self._pile.add_file(buf)
            self.set(trace, buf)

        else:
            # append to the existing buffer trace; the buffer file is
            # temporarily detached from the pile while it is being modified
            self._pile.remove_file(buf)
            trbuf = buf.get_traces()[0]
            buf.remove(trbuf)
            trbuf.append(trace.ydata)
            buf.add(trbuf)
            self._pile.add_file(buf)
            self.set(trace, buf)

        trbuf = buf.get_traces()[0]
        if self._fixation_length is not None:
            if trbuf.tmax - trbuf.tmin > self._fixation_length:
                # buffer grew beyond the configured length: write it out,
                # keeping any incomplete tail in memory
                self._fixate(buf, complete=False)

    def fixate_all(self):
        '''
        Fixate all pending buffer traces and reset the internal state.
        '''
        # NOTE(review): ``self._states`` is managed by the trace.States base
        # class - confirm the ``state[-1]`` element is the buffer file.
        for state in list(self._states.values()):
            self._fixate(state[-1])

        self._states = {}

    def free(self, buf):
        # callback from trace.States when a state is released
        self._fixate(buf)

    def _fixate(self, buf, complete=True):
        '''
        Write out a buffer file's trace, chunked to ``fixation_length``.

        With ``complete=False`` an incomplete last chunk is kept in the
        buffer instead of being written out.
        '''
        trbuf = buf.get_traces()[0]
        del_state = True
        if self._path:
            if self._fixation_length is not None:
                # align chunk boundaries to multiples of fixation_length
                # counted from the start of the year
                ttmin = trbuf.tmin
                ytmin = util.year_start(ttmin)
                n = int(math.floor((ttmin - ytmin) / self._fixation_length))
                tmin = ytmin + n*self._fixation_length
                traces = []
                t = tmin
                while t <= trbuf.tmax:
                    try:
                        traces.append(
                            trbuf.chop(
                                t,
                                t+self._fixation_length,
                                inplace=False,
                                snap=(math.ceil, math.ceil)))

                    except trace.NoData:
                        pass
                    t += self._fixation_length

                # NOTE(review): if every chop raised NoData, ``traces`` is
                # empty and ``traces[-1]`` below raises IndexError - confirm
                # this cannot happen for a non-empty buffer trace.
                if abs(traces[-1].tmax - (t - trbuf.deltat)) < \
                        trbuf.deltat/100. or complete:

                    # last chunk is complete (or caller wants everything):
                    # buffer can be discarded entirely
                    self._pile.remove_file(buf)

                else:  # reinsert incomplete last part
                    new_trbuf = traces.pop()
                    self._pile.remove_file(buf)
                    buf.remove(trbuf)
                    buf.add(new_trbuf)
                    self._pile.add_file(buf)
                    del_state = False

            else:
                # no chunking configured: write the whole buffer trace
                traces = [trbuf]
                self._pile.remove_file(buf)

            fns = io.save(traces, self._path, format=self._format)

            if not self._forget_fixed:
                # make the newly written files part of the pile again
                self._pile.load_files(
                    fns, show_progress=False, fileformat=self._format)

        if del_state:
            del self._states[trbuf.nslc_id]

    def __del__(self):
        # flush any buffered data when the injector is garbage-collected
        self.fixate_all()