1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import os 

7import logging 

8import time 

9import weakref 

10import copy 

11import re 

12import sys 

13import operator 

14import math 

15import hashlib 

16try: 

17 import cPickle as pickle 

18except ImportError: 

19 import pickle 

20 

21 

22from . import avl 

23from . import trace, io, util 

24from . import config 

25from .trace import degapper 

26 

27 

28is_windows = sys.platform.startswith('win') 

29show_progress_force_off = False 

30version_salt = 'v1-' 

31 

32 

33def ehash(s): 

34 return hashlib.sha1((version_salt + s).encode('utf8')).hexdigest() 

35 

36 

37def cmp(a, b): 

38 return int(a > b) - int(a < b) 

39 

40 

41def sl(s): 

42 return [str(x) for x in sorted(s)] 

43 

44 

45class Counter(dict): 

46 

47 def __missing__(self, k): 

48 return 0 

49 

50 def update(self, other): 

51 for k, v in other.items(): 

52 self[k] += v 

53 

54 def subtract(self, other): 

55 for k, v in other.items(): 

56 self[k] -= v 

57 if self[k] <= 0: 

58 del self[k] 

59 

60 def subtract1(self, k): 

61 self[k] -= 1 

62 if self[k] <= 0: 

63 del self[k] 

64 

65 

66def fix_unicode_copy(counter, func): 

67 counter_new = Counter() 

68 for k in counter: 

69 counter_new[func(k)] = counter[k] 

70 return counter_new 

71 

72 

73pjoin = os.path.join 

74logger = logging.getLogger('pyrocko.pile') 

75 

76 

77def avl_remove_exact(avltree, element): 

78 ilo, ihi = avltree.span(element) 

79 for i in range(ilo, ihi): 

80 if avltree[i] is element: 

81 avltree.remove_at(i) 

82 return i 

83 

84 raise ValueError( 

85 'avl_remove_exact(avltree, element): element not in avltree') 

86 

87 

88def cmpfunc(key): 

89 if isinstance(key, str): 

90 # special cases; these run about 50% faster than the generic one on 

91 # Python 2.5 

92 if key == 'tmin': 

93 return lambda a, b: cmp(a.tmin, b.tmin) 

94 if key == 'tmax': 

95 return lambda a, b: cmp(a.tmax, b.tmax) 

96 

97 key = operator.attrgetter(key) 

98 

99 return lambda a, b: cmp(key(a), key(b)) 

100 

101 

102g_dummys = {} 

103 

104 

105def get_dummy(key): 

106 if key not in g_dummys: 

107 class Dummy(object): 

108 def __init__(self, k): 

109 setattr(self, key, k) 

110 

111 g_dummys[key] = Dummy 

112 

113 return g_dummys[key] 

114 

115 

116class TooMany(Exception): 

117 def __init__(self, n): 

118 Exception.__init__(self) 

119 self.n = n 

120 

121 

122class Sorted(object): 

123 def __init__(self, values=[], key=None): 

124 self._set_key(key) 

125 self._avl = avl.new(values, self._cmp) 

126 

127 def _set_key(self, key): 

128 self._key = key 

129 self._cmp = cmpfunc(key) 

130 if isinstance(key, str): 

131 self._dummy = get_dummy(key) 

132 

133 def __getstate__(self): 

134 state = list(self._avl.iter()), self._key 

135 return state 

136 

137 def __setstate__(self, state): 

138 l, key = state 

139 self._set_key(key) 

140 self._avl = avl.from_iter(iter(l), len(l)) 

141 

142 def insert(self, value): 

143 self._avl.insert(value) 

144 

145 def remove(self, value): 

146 return avl_remove_exact(self._avl, value) 

147 

148 def remove_at(self, i): 

149 return self._avl.remove_at(i) 

150 

151 def insert_many(self, values): 

152 for value in values: 

153 self._avl.insert(value) 

154 

155 def remove_many(self, values): 

156 for value in values: 

157 avl_remove_exact(self._avl, value) 

158 

159 def __iter__(self): 

160 return iter(self._avl) 

161 

162 def with_key_in(self, kmin, kmax): 

163 omin, omax = self._dummy(kmin), self._dummy(kmax) 

164 ilo, ihi = self._avl.span(omin, omax) 

165 return self._avl[ilo:ihi] 

166 

167 def with_key_in_limited(self, kmin, kmax, nmax): 

168 omin, omax = self._dummy(kmin), self._dummy(kmax) 

169 ilo, ihi = self._avl.span(omin, omax) 

170 if ihi - ilo > nmax: 

171 raise TooMany(ihi - ilo) 

172 

173 return self._avl[ilo:ihi] 

174 

175 def index(self, value): 

176 ilo, ihi = self._avl.span(value) 

177 for i in range(ilo, ihi): 

178 if self._avl[i] is value: 

179 return i 

180 

181 raise ValueError('element is not in avl tree') 

182 

183 def min(self): 

184 return self._avl.min() 

185 

186 def max(self): 

187 return self._avl.max() 

188 

189 def __len__(self): 

190 return len(self._avl) 

191 

192 def __getitem__(self, i): 

193 return self._avl[i] 

194 

195 

196class TracesFileCache(object): 

197 ''' 

198 Manages trace metainformation cache. 

199 

200 For each directory with files containing traces, one cache file is 

201 maintained to hold the trace metainformation of all files which are 

202 contained in the directory. 

203 ''' 

204 

205 caches = {} 

206 

207 def __init__(self, cachedir): 

208 ''' 

209 Create new cache. 

210 

211 :param cachedir: directory to hold the cache files. 

212 ''' 

213 

214 self.cachedir = cachedir 

215 self.dircaches = {} 

216 self.modified = set() 

217 util.ensuredir(self.cachedir) 

218 

219 def get(self, abspath): 

220 ''' 

221 Try to get an item from the cache. 

222 

223 :param abspath: absolute path of the object to retrieve 

224 

225 :returns: a stored object is returned or None if nothing could be 

226 found. 

227 ''' 

228 

229 dircache = self._get_dircache_for(abspath) 

230 if abspath in dircache: 

231 return dircache[abspath] 

232 return None 

233 

234 def put(self, abspath, tfile): 

235 ''' 

236 Put an item into the cache. 

237 

238 :param abspath: absolute path of the object to be stored 

239 :param tfile: object to be stored 

240 ''' 

241 

242 cachepath = self._dircachepath(abspath) 

243 # get lock on cachepath here 

244 dircache = self._get_dircache(cachepath) 

245 dircache[abspath] = tfile 

246 self.modified.add(cachepath) 

247 

248 def dump_modified(self): 

249 ''' 

250 Save any modifications to disk. 

251 ''' 

252 

253 for cachepath in self.modified: 

254 self._dump_dircache(self.dircaches[cachepath], cachepath) 

255 # unlock 

256 

257 self.modified = set() 

258 

259 def clean(self): 

260 ''' 

261 Weed out missing files from the disk caches. 

262 ''' 

263 

264 self.dump_modified() 

265 

266 for fn in os.listdir(self.cachedir): 

267 if len(fn) == 40: 

268 cache = self._load_dircache(pjoin(self.cachedir, fn)) 

269 self._dump_dircache(cache, pjoin(self.cachedir, fn)) 

270 

271 def _get_dircache_for(self, abspath): 

272 return self._get_dircache(self._dircachepath(abspath)) 

273 

274 def _get_dircache(self, cachepath): 

275 if cachepath not in self.dircaches: 

276 if os.path.isfile(cachepath): 

277 self.dircaches[cachepath] = self._load_dircache(cachepath) 

278 else: 

279 self.dircaches[cachepath] = {} 

280 

281 return self.dircaches[cachepath] 

282 

283 def _dircachepath(self, abspath): 

284 cachefn = ehash(os.path.dirname(abspath)) 

285 return pjoin(self.cachedir, cachefn) 

286 

287 def _load_dircache(self, cachefilename): 

288 

289 with open(cachefilename, 'rb') as f: 

290 cache = pickle.load(f) 

291 

292 # weed out files which no longer exist 

293 for fn in list(cache.keys()): 

294 if not os.path.isfile(fn): 

295 del cache[fn] 

296 

297 time_float = util.get_time_float() 

298 

299 for v in cache.values(): 

300 v.trees_from_content(v.traces) 

301 for tr in v.traces: 

302 tr.file = v 

303 # fix Py2 codes to not include unicode when the cache file 

304 # was created with Py3 

305 if not isinstance(tr.station, str): 

306 tr.prune_from_reuse_cache() 

307 tr.set_codes( 

308 str(tr.network), 

309 str(tr.station), 

310 str(tr.location), 

311 str(tr.channel)) 

312 

313 tr.tmin = time_float(tr.tmin) 

314 tr.tmax = time_float(tr.tmax) 

315 

316 v.data_use_count = 0 

317 v.data_loaded = False 

318 v.fix_unicode_codes() 

319 

320 return cache 

321 

322 def _dump_dircache(self, cache, cachefilename): 

323 

324 if not cache: 

325 if os.path.exists(cachefilename): 

326 os.remove(cachefilename) 

327 return 

328 

329 # make a copy without the parents and the binsearch trees 

330 cache_copy = {} 

331 for fn in cache.keys(): 

332 trf = copy.copy(cache[fn]) 

333 trf.parent = None 

334 trf.by_tmin = None 

335 trf.by_tmax = None 

336 trf.by_tlen = None 

337 trf.by_mtime = None 

338 trf.data_use_count = 0 

339 trf.data_loaded = False 

340 traces = [] 

341 for tr in trf.traces: 

342 tr = tr.copy(data=False) 

343 tr.ydata = None 

344 tr.meta = None 

345 tr.file = trf 

346 traces.append(tr) 

347 

348 trf.traces = traces 

349 

350 cache_copy[fn] = trf 

351 

352 tmpfn = cachefilename+'.%i.tmp' % os.getpid() 

353 with open(tmpfn, 'wb') as f: 

354 pickle.dump(cache_copy, f, protocol=2) 

355 

356 if is_windows and os.path.exists(cachefilename): 

357 # windows doesn't allow to rename over existing file 

358 os.unlink(cachefilename) 

359 

360 os.rename(tmpfn, cachefilename) 

361 

362 

363def get_cache(cachedir): 

364 ''' 

365 Get global TracesFileCache object for given directory. 

366 ''' 

367 if cachedir not in TracesFileCache.caches: 

368 TracesFileCache.caches[cachedir] = TracesFileCache(cachedir) 

369 

370 return TracesFileCache.caches[cachedir] 

371 

372 

373def loader( 

374 filenames, fileformat, cache, filename_attributes, 

375 show_progress=True, update_progress=None): 

376 

377 if show_progress_force_off: 

378 show_progress = False 

379 

380 class Progress(object): 

381 def __init__(self, label, n): 

382 self._label = label 

383 self._n = n 

384 self._bar = None 

385 if show_progress: 

386 self._bar = util.progressbar(label, self._n) 

387 

388 if update_progress: 

389 update_progress(label, 0, self._n) 

390 

391 def update(self, i): 

392 if self._bar: 

393 if i < self._n-1: 

394 self._bar.update(i) 

395 else: 

396 self._bar.finish() 

397 self._bar = None 

398 

399 abort = False 

400 if update_progress: 

401 abort = update_progress(self._label, i, self._n) 

402 

403 return abort 

404 

405 def finish(self): 

406 if self._bar: 

407 self._bar.finish() 

408 self._bar = None 

409 

410 if not filenames: 

411 logger.warning('No files to load from') 

412 return 

413 

414 regex = None 

415 if filename_attributes: 

416 regex = re.compile(filename_attributes) 

417 

418 try: 

419 progress = Progress('Looking at files', len(filenames)) 

420 

421 failures = [] 

422 to_load = [] 

423 for i, filename in enumerate(filenames): 

424 try: 

425 abspath = os.path.abspath(filename) 

426 

427 substitutions = None 

428 if regex: 

429 m = regex.search(filename) 

430 if not m: 

431 raise FilenameAttributeError( 

432 "Cannot get attributes with pattern '%s' " 

433 "from path '%s'" % (filename_attributes, filename)) 

434 

435 substitutions = {} 

436 for k in m.groupdict(): 

437 if k in ('network', 'station', 'location', 'channel'): 

438 substitutions[k] = m.groupdict()[k] 

439 

440 mtime = os.stat(filename)[8] 

441 tfile = None 

442 if cache: 

443 tfile = cache.get(abspath) 

444 

445 mustload = ( 

446 not tfile or 

447 (tfile.format != fileformat and fileformat != 'detect') or 

448 tfile.mtime != mtime or 

449 substitutions is not None) 

450 

451 to_load.append( 

452 (mustload, mtime, abspath, substitutions, tfile)) 

453 

454 except (OSError, FilenameAttributeError) as xerror: 

455 failures.append(abspath) 

456 logger.warning(xerror) 

457 

458 abort = progress.update(i+1) 

459 if abort: 

460 progress.update(len(filenames)) 

461 return 

462 

463 progress.update(len(filenames)) 

464 

465 to_load.sort(key=lambda x: x[2]) 

466 

467 nload = len([1 for x in to_load if x[0]]) 

468 iload = 0 

469 

470 count_all = False 

471 if nload < 0.01*len(to_load): 

472 nload = len(to_load) 

473 count_all = True 

474 

475 if to_load: 

476 progress = Progress('Scanning files', nload) 

477 

478 for (mustload, mtime, abspath, substitutions, tfile) in to_load: 

479 try: 

480 if mustload: 

481 tfile = TracesFile( 

482 None, abspath, fileformat, 

483 substitutions=substitutions, mtime=mtime) 

484 

485 if cache and not substitutions: 

486 cache.put(abspath, tfile) 

487 

488 if not count_all: 

489 iload += 1 

490 

491 if count_all: 

492 iload += 1 

493 

494 except (io.FileLoadError, OSError) as xerror: 

495 failures.append(abspath) 

496 logger.warning(xerror) 

497 else: 

498 yield tfile 

499 

500 abort = progress.update(iload+1) 

501 if abort: 

502 break 

503 

504 progress.update(nload) 

505 

506 if failures: 

507 logger.warning( 

508 'The following file%s caused problems and will be ignored:\n' % 

509 util.plural_s(len(failures)) + '\n'.join(failures)) 

510 

511 if cache: 

512 cache.dump_modified() 

513 finally: 

514 progress.finish() 

515 

516 

517def tlen(x): 

518 return x.tmax-x.tmin 

519 

520 

521class TracesGroup(object): 

522 

523 ''' 

524 Trace container base class. 

525 

526 Base class for Pile, SubPile, and TracesFile, i.e. anything containing 

527 a collection of several traces. A TracesGroup object maintains lookup sets 

528 of some of the traces meta-information, as well as a combined time-range 

529 of its contents. 

530 ''' 

531 

532 def __init__(self, parent): 

533 self.parent = parent 

534 self.empty() 

535 self.nupdates = 0 

536 self.abspath = None 

537 

538 def set_parent(self, parent): 

539 self.parent = parent 

540 

541 def get_parent(self): 

542 return self.parent 

543 

544 def empty(self): 

545 self.networks, self.stations, self.locations, self.channels, \ 

546 self.nslc_ids, self.deltats = [Counter() for x in range(6)] 

547 self.by_tmin = Sorted([], 'tmin') 

548 self.by_tmax = Sorted([], 'tmax') 

549 self.by_tlen = Sorted([], tlen) 

550 self.by_mtime = Sorted([], 'mtime') 

551 self.tmin, self.tmax = None, None 

552 self.deltatmin, self.deltatmax = None, None 

553 

554 def trees_from_content(self, content): 

555 self.by_tmin = Sorted(content, 'tmin') 

556 self.by_tmax = Sorted(content, 'tmax') 

557 self.by_tlen = Sorted(content, tlen) 

558 self.by_mtime = Sorted(content, 'mtime') 

559 self.adjust_minmax() 

560 

561 def fix_unicode_codes(self): 

562 for net in self.networks: 

563 if isinstance(net, str): 

564 return 

565 

566 self.networks = fix_unicode_copy(self.networks, str) 

567 self.stations = fix_unicode_copy(self.stations, str) 

568 self.locations = fix_unicode_copy(self.locations, str) 

569 self.channels = fix_unicode_copy(self.channels, str) 

570 self.nslc_ids = fix_unicode_copy( 

571 self.nslc_ids, lambda k: tuple(str(x) for x in k)) 

572 

573 def add(self, content): 

574 ''' 

575 Add content to traces group and update indices. 

576 

577 Accepts :py:class:`pyrocko.trace.Trace` objects and 

578 :py:class:`pyrocko.pile.TracesGroup` objects. 

579 ''' 

580 

581 if isinstance(content, (trace.Trace, TracesGroup)): 

582 content = [content] 

583 

584 for c in content: 

585 

586 if isinstance(c, TracesGroup): 

587 self.networks.update(c.networks) 

588 self.stations.update(c.stations) 

589 self.locations.update(c.locations) 

590 self.channels.update(c.channels) 

591 self.nslc_ids.update(c.nslc_ids) 

592 self.deltats.update(c.deltats) 

593 

594 self.by_tmin.insert_many(c.by_tmin) 

595 self.by_tmax.insert_many(c.by_tmax) 

596 self.by_tlen.insert_many(c.by_tlen) 

597 self.by_mtime.insert_many(c.by_mtime) 

598 

599 elif isinstance(c, trace.Trace): 

600 self.networks[c.network] += 1 

601 self.stations[c.station] += 1 

602 self.locations[c.location] += 1 

603 self.channels[c.channel] += 1 

604 self.nslc_ids[c.nslc_id] += 1 

605 self.deltats[c.deltat] += 1 

606 

607 self.by_tmin.insert(c) 

608 self.by_tmax.insert(c) 

609 self.by_tlen.insert(c) 

610 self.by_mtime.insert(c) 

611 

612 self.adjust_minmax() 

613 

614 self.nupdates += 1 

615 self.notify_listeners('add') 

616 

617 if self.parent is not None: 

618 self.parent.add(content) 

619 

620 def remove(self, content): 

621 ''' 

622 Remove content to traces group and update indices. 

623 ''' 

624 if isinstance(content, (trace.Trace, TracesGroup)): 

625 content = [content] 

626 

627 for c in content: 

628 

629 if isinstance(c, TracesGroup): 

630 self.networks.subtract(c.networks) 

631 self.stations.subtract(c.stations) 

632 self.locations.subtract(c.locations) 

633 self.channels.subtract(c.channels) 

634 self.nslc_ids.subtract(c.nslc_ids) 

635 self.deltats.subtract(c.deltats) 

636 

637 self.by_tmin.remove_many(c.by_tmin) 

638 self.by_tmax.remove_many(c.by_tmax) 

639 self.by_tlen.remove_many(c.by_tlen) 

640 self.by_mtime.remove_many(c.by_mtime) 

641 

642 elif isinstance(c, trace.Trace): 

643 self.networks.subtract1(c.network) 

644 self.stations.subtract1(c.station) 

645 self.locations.subtract1(c.location) 

646 self.channels.subtract1(c.channel) 

647 self.nslc_ids.subtract1(c.nslc_id) 

648 self.deltats.subtract1(c.deltat) 

649 

650 self.by_tmin.remove(c) 

651 self.by_tmax.remove(c) 

652 self.by_tlen.remove(c) 

653 self.by_mtime.remove(c) 

654 

655 self.adjust_minmax() 

656 

657 self.nupdates += 1 

658 self.notify_listeners('remove') 

659 

660 if self.parent is not None: 

661 self.parent.remove(content) 

662 

663 def relevant(self, tmin, tmax, group_selector=None, trace_selector=None): 

664 ''' 

665 Return list of :py:class:`pyrocko.trace.Trace` objects where given 

666 arguments ``tmin`` and ``tmax`` match. 

667 

668 :param tmin: start time 

669 :param tmax: end time 

670 :param group_selector: lambda expression taking group dict of regex 

671 match object as a single argument and which returns true or false 

672 to keep or reject a file (default: ``None``) 

673 :param trace_selector: lambda expression taking group dict of regex 

674 match object as a single argument and which returns true or false 

675 to keep or reject a file (default: ``None``) 

676 ''' 

677 

678 if not self.by_tmin or not self.is_relevant( 

679 tmin, tmax, group_selector): 

680 

681 return [] 

682 

683 return [tr for tr in self.by_tmin.with_key_in(tmin-self.tlenmax, tmax) 

684 if tr.is_relevant(tmin, tmax, trace_selector)] 

685 

686 def adjust_minmax(self): 

687 if self.by_tmin: 

688 self.tmin = self.by_tmin.min().tmin 

689 self.tmax = self.by_tmax.max().tmax 

690 t = self.by_tlen.max() 

691 self.tlenmax = t.tmax - t.tmin 

692 self.mtime = self.by_mtime.max().mtime 

693 deltats = list(self.deltats.keys()) 

694 self.deltatmin = min(deltats) 

695 self.deltatmax = max(deltats) 

696 else: 

697 self.tmin = None 

698 self.tmax = None 

699 self.tlenmax = None 

700 self.mtime = None 

701 self.deltatmin = None 

702 self.deltatmax = None 

703 

704 def notify_listeners(self, what): 

705 pass 

706 

707 def get_update_count(self): 

708 return self.nupdates 

709 

710 def overlaps(self, tmin, tmax): 

711 return self.tmin is not None \ 

712 and tmax >= self.tmin and self.tmax >= tmin 

713 

714 def is_relevant(self, tmin, tmax, group_selector=None): 

715 if self.tmin is None or self.tmax is None: 

716 return False 

717 return tmax >= self.tmin and self.tmax >= tmin and ( 

718 group_selector is None or group_selector(self)) 

719 

720 

721class MemTracesFile(TracesGroup): 

722 

723 ''' 

724 This is needed to make traces without an actual disc file to be inserted 

725 into a Pile. 

726 ''' 

727 

728 def __init__(self, parent, traces): 

729 TracesGroup.__init__(self, parent) 

730 self.add(traces) 

731 self.mtime = time.time() 

732 

733 def add(self, traces): 

734 if isinstance(traces, trace.Trace): 

735 traces = [traces] 

736 

737 for tr in traces: 

738 tr.file = self 

739 

740 TracesGroup.add(self, traces) 

741 

742 def load_headers(self, mtime=None): 

743 pass 

744 

745 def load_data(self): 

746 pass 

747 

748 def use_data(self): 

749 pass 

750 

751 def drop_data(self): 

752 pass 

753 

754 def reload_if_modified(self): 

755 return False 

756 

757 def iter_traces(self): 

758 for tr in self.by_tmin: 

759 yield tr 

760 

761 def get_traces(self): 

762 return list(self.by_tmin) 

763 

764 def gather_keys(self, gather, selector=None): 

765 keys = set() 

766 for tr in self.by_tmin: 

767 if selector is None or selector(tr): 

768 keys.add(gather(tr)) 

769 

770 return keys 

771 

772 def __str__(self): 

773 

774 s = 'MemTracesFile\n' 

775 s += 'file mtime: %s\n' % util.time_to_str(self.mtime) 

776 s += 'number of traces: %i\n' % len(self.by_tmin) 

777 s += 'timerange: %s - %s\n' % ( 

778 util.time_to_str(self.tmin), util.time_to_str(self.tmax)) 

779 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys())) 

780 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys())) 

781 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys())) 

782 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys())) 

783 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys())) 

784 return s 

785 

786 

787class TracesFile(TracesGroup): 

788 def __init__( 

789 self, parent, abspath, format, 

790 substitutions=None, mtime=None): 

791 

792 TracesGroup.__init__(self, parent) 

793 self.abspath = abspath 

794 self.format = format 

795 self.traces = [] 

796 self.data_loaded = False 

797 self.data_use_count = 0 

798 self.substitutions = substitutions 

799 self.load_headers(mtime=mtime) 

800 self.mtime = mtime 

801 

802 def load_headers(self, mtime=None): 

803 logger.debug('loading headers from file: %s' % self.abspath) 

804 if mtime is None: 

805 self.mtime = os.stat(self.abspath)[8] 

806 

807 def kgen(tr): 

808 return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id 

809 

810 self.remove(self.traces) 

811 ks = set() 

812 for tr in io.load(self.abspath, 

813 format=self.format, 

814 getdata=False, 

815 substitutions=self.substitutions): 

816 

817 k = kgen(tr) 

818 if k not in ks: 

819 ks.add(k) 

820 self.traces.append(tr) 

821 tr.file = self 

822 

823 self.add(self.traces) 

824 

825 self.data_loaded = False 

826 self.data_use_count = 0 

827 

828 def load_data(self, force=False): 

829 file_changed = False 

830 if not self.data_loaded or force: 

831 logger.debug('loading data from file: %s' % self.abspath) 

832 

833 def kgen(tr): 

834 return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id 

835 

836 traces_ = io.load(self.abspath, format=self.format, getdata=True, 

837 substitutions=self.substitutions) 

838 

839 # prevent adding duplicate snippets from corrupt mseed files 

840 k_loaded = set() 

841 traces = [] 

842 for tr in traces_: 

843 k = kgen(tr) 

844 if k not in k_loaded: 

845 k_loaded.add(k) 

846 traces.append(tr) 

847 

848 k_current_d = dict((kgen(tr), tr) for tr in self.traces) 

849 k_current = set(k_current_d) 

850 k_new = k_loaded - k_current 

851 k_delete = k_current - k_loaded 

852 k_unchanged = k_current & k_loaded 

853 

854 for tr in self.traces[:]: 

855 if kgen(tr) in k_delete: 

856 self.remove(tr) 

857 self.traces.remove(tr) 

858 tr.file = None 

859 file_changed = True 

860 

861 for tr in traces: 

862 if kgen(tr) in k_new: 

863 tr.file = self 

864 self.traces.append(tr) 

865 self.add(tr) 

866 file_changed = True 

867 

868 for tr in traces: 

869 if kgen(tr) in k_unchanged: 

870 ctr = k_current_d[kgen(tr)] 

871 ctr.ydata = tr.ydata 

872 

873 self.data_loaded = True 

874 

875 if file_changed: 

876 logger.debug('reloaded (file may have changed): %s' % self.abspath) 

877 

878 return file_changed 

879 

880 def use_data(self): 

881 if not self.data_loaded: 

882 raise Exception('Data not loaded') 

883 self.data_use_count += 1 

884 

885 def drop_data(self): 

886 if self.data_loaded: 

887 if self.data_use_count == 1: 

888 logger.debug('forgetting data of file: %s' % self.abspath) 

889 for tr in self.traces: 

890 tr.drop_data() 

891 

892 self.data_loaded = False 

893 

894 self.data_use_count -= 1 

895 else: 

896 self.data_use_count = 0 

897 

898 def reload_if_modified(self): 

899 mtime = os.stat(self.abspath)[8] 

900 if mtime != self.mtime: 

901 logger.debug( 

902 'mtime=%i, reloading file: %s' % (mtime, self.abspath)) 

903 

904 self.mtime = mtime 

905 if self.data_loaded: 

906 self.load_data(force=True) 

907 else: 

908 self.load_headers() 

909 

910 return True 

911 

912 return False 

913 

914 def iter_traces(self): 

915 for tr in self.traces: 

916 yield tr 

917 

918 def gather_keys(self, gather, selector=None): 

919 keys = set() 

920 for tr in self.by_tmin: 

921 if selector is None or selector(tr): 

922 keys.add(gather(tr)) 

923 

924 return keys 

925 

926 def __str__(self): 

927 s = 'TracesFile\n' 

928 s += 'abspath: %s\n' % self.abspath 

929 s += 'file mtime: %s\n' % util.time_to_str(self.mtime) 

930 s += 'number of traces: %i\n' % len(self.traces) 

931 s += 'timerange: %s - %s\n' % ( 

932 util.time_to_str(self.tmin), util.time_to_str(self.tmax)) 

933 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys())) 

934 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys())) 

935 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys())) 

936 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys())) 

937 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys())) 

938 return s 

939 

940 

941class FilenameAttributeError(Exception): 

942 pass 

943 

944 

945class SubPile(TracesGroup): 

946 def __init__(self, parent): 

947 TracesGroup.__init__(self, parent) 

948 self.files = [] 

949 self.empty() 

950 

951 def add_file(self, file): 

952 self.files.append(file) 

953 file.set_parent(self) 

954 self.add(file) 

955 

956 def remove_file(self, file): 

957 self.files.remove(file) 

958 file.set_parent(None) 

959 self.remove(file) 

960 

961 def remove_files(self, files): 

962 for file in files: 

963 self.files.remove(file) 

964 file.set_parent(None) 

965 self.remove(files) 

966 

967 def gather_keys(self, gather, selector=None): 

968 keys = set() 

969 for file in self.files: 

970 keys |= file.gather_keys(gather, selector) 

971 

972 return keys 

973 

974 def iter_traces( 

975 self, 

976 load_data=False, 

977 return_abspath=False, 

978 group_selector=None, 

979 trace_selector=None): 

980 

981 for file in self.files: 

982 

983 if group_selector and not group_selector(file): 

984 continue 

985 

986 must_drop = False 

987 if load_data: 

988 file.load_data() 

989 file.use_data() 

990 must_drop = True 

991 

992 for tr in file.iter_traces(): 

993 if trace_selector and not trace_selector(tr): 

994 continue 

995 

996 if return_abspath: 

997 yield file.abspath, tr 

998 else: 

999 yield tr 

1000 

1001 if must_drop: 

1002 file.drop_data() 

1003 

1004 def iter_files(self): 

1005 for file in self.files: 

1006 yield file 

1007 

1008 def reload_modified(self): 

1009 modified = False 

1010 for file in self.files: 

1011 modified |= file.reload_if_modified() 

1012 

1013 return modified 

1014 

1015 def __str__(self): 

1016 s = 'SubPile\n' 

1017 s += 'number of files: %i\n' % len(self.files) 

1018 s += 'timerange: %s - %s\n' % ( 

1019 util.time_to_str(self.tmin), util.time_to_str(self.tmax)) 

1020 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys())) 

1021 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys())) 

1022 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys())) 

1023 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys())) 

1024 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys())) 

1025 return s 

1026 

1027 

1028class Batch(object): 

1029 ''' 

1030 Batch of waveforms from window wise data extraction. 

1031 

1032 Encapsulates state and results yielded for each window in window wise 

1033 waveform extraction with the :py:meth:`Pile.chopper` method (when the 

1034 `style='batch'` keyword argument set). 

1035 

1036 *Attributes:* 

1037 

1038 .. py:attribute:: tmin 

1039 

1040 Start of this time window. 

1041 

1042 .. py:attribute:: tmax 

1043 

1044 End of this time window. 

1045 

1046 .. py:attribute:: i 

1047 

1048 Index of this time window in sequence. 

1049 

1050 .. py:attribute:: n 

1051 

1052 Total number of time windows in sequence. 

1053 

1054 .. py:attribute:: traces 

1055 

1056 Extracted waveforms for this time window. 

1057 ''' 

1058 

1059 def __init__(self, tmin, tmax, i, n, traces): 

1060 self.tmin = tmin 

1061 self.tmax = tmax 

1062 self.i = i 

1063 self.n = n 

1064 self.traces = traces 

1065 

1066 

1067class Pile(TracesGroup): 

1068 ''' 

1069 Waveform archive lookup, data loading and caching infrastructure. 

1070 ''' 

1071 

1072 def __init__(self): 

1073 TracesGroup.__init__(self, None) 

1074 self.subpiles = {} 

1075 self.open_files = {} 

1076 self.listeners = [] 

1077 self.abspaths = set() 

1078 

1079 def add_listener(self, obj): 

1080 self.listeners.append(weakref.ref(obj)) 

1081 

1082 def notify_listeners(self, what): 

1083 for ref in self.listeners: 

1084 obj = ref() 

1085 if obj: 

1086 obj.pile_changed(what) 

1087 

1088 def load_files( 

1089 self, filenames, 

1090 filename_attributes=None, 

1091 fileformat='mseed', 

1092 cache=None, 

1093 show_progress=True, 

1094 update_progress=None): 

1095 

1096 load = loader( 

1097 filenames, fileformat, cache, filename_attributes, 

1098 show_progress=show_progress, 

1099 update_progress=update_progress) 

1100 

1101 self.add_files(load) 

1102 

1103 def add_files(self, files): 

1104 for file in files: 

1105 self.add_file(file) 

1106 

1107 def add_file(self, file): 

1108 if file.abspath is not None and file.abspath in self.abspaths: 

1109 logger.warning('File already in pile: %s' % file.abspath) 

1110 return 

1111 

1112 if file.deltatmin is None: 

1113 logger.warning('Sampling rate of all traces are zero in file: %s' % 

1114 file.abspath) 

1115 return 

1116 

1117 subpile = self.dispatch(file) 

1118 subpile.add_file(file) 

1119 if file.abspath is not None: 

1120 self.abspaths.add(file.abspath) 

1121 

1122 def remove_file(self, file): 

1123 subpile = file.get_parent() 

1124 if subpile is not None: 

1125 subpile.remove_file(file) 

1126 if file.abspath is not None: 

1127 self.abspaths.remove(file.abspath) 

1128 

1129 def remove_files(self, files): 

1130 subpile_files = {} 

1131 for file in files: 

1132 subpile = file.get_parent() 

1133 if subpile not in subpile_files: 

1134 subpile_files[subpile] = [] 

1135 

1136 subpile_files[subpile].append(file) 

1137 

1138 for subpile, files in subpile_files.items(): 

1139 subpile.remove_files(files) 

1140 for file in files: 

1141 if file.abspath is not None: 

1142 self.abspaths.remove(file.abspath) 

1143 

1144 def dispatch_key(self, file): 

1145 dt = int(math.floor(math.log(file.deltatmin))) 

1146 return dt 

1147 

1148 def dispatch(self, file): 

1149 k = self.dispatch_key(file) 

1150 if k not in self.subpiles: 

1151 self.subpiles[k] = SubPile(self) 

1152 

1153 return self.subpiles[k] 

1154 

1155 def get_deltats(self): 

1156 return list(self.deltats.keys()) 

1157 

1158 def chop( 

1159 self, tmin, tmax, 

1160 group_selector=None, 

1161 trace_selector=None, 

1162 snap=(round, round), 

1163 include_last=False, 

1164 load_data=True): 

1165 

1166 chopped = [] 

1167 used_files = set() 

1168 

1169 traces = self.relevant(tmin, tmax, group_selector, trace_selector) 

1170 if load_data: 

1171 files_changed = False 

1172 for tr in traces: 

1173 if tr.file and tr.file not in used_files: 

1174 if tr.file.load_data(): 

1175 files_changed = True 

1176 

1177 if tr.file is not None: 

1178 used_files.add(tr.file) 

1179 

1180 if files_changed: 

1181 traces = self.relevant( 

1182 tmin, tmax, group_selector, trace_selector) 

1183 

1184 for tr in traces: 

1185 if not load_data and tr.ydata is not None: 

1186 tr = tr.copy(data=False) 

1187 tr.ydata = None 

1188 

1189 try: 

1190 chopped.append(tr.chop( 

1191 tmin, tmax, 

1192 inplace=False, 

1193 snap=snap, 

1194 include_last=include_last)) 

1195 

1196 except trace.NoData: 

1197 pass 

1198 

1199 return chopped, used_files 

1200 

1201 def _process_chopped( 

1202 self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin, 

1203 tpad): 

1204 

1205 chopped.sort(key=lambda a: a.full_id) 

1206 if degap: 

1207 chopped = degapper(chopped, maxgap=maxgap, maxlap=maxlap) 

1208 

1209 if not want_incomplete: 

1210 chopped_weeded = [] 

1211 for tr in chopped: 

1212 emin = tr.tmin - (wmin-tpad) 

1213 emax = tr.tmax + tr.deltat - (wmax+tpad) 

1214 if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat): 

1215 chopped_weeded.append(tr) 

1216 

1217 elif degap: 

1218 if (0. < emin <= 5. * tr.deltat and 

1219 -5. * tr.deltat <= emax < 0.): 

1220 

1221 tr.extend( 

1222 wmin-tpad, 

1223 wmax+tpad-tr.deltat, 

1224 fillmethod='repeat') 

1225 

1226 chopped_weeded.append(tr) 

1227 

1228 chopped = chopped_weeded 

1229 

1230 for tr in chopped: 

1231 tr.wmin = wmin 

1232 tr.wmax = wmax 

1233 

1234 return chopped 

1235 

1236 def chopper( 

1237 self, 

1238 tmin=None, tmax=None, tinc=None, tpad=0., 

1239 group_selector=None, trace_selector=None, 

1240 want_incomplete=True, degap=True, maxgap=5, maxlap=None, 

1241 keep_current_files_open=False, accessor_id=None, 

1242 snap=(round, round), include_last=False, load_data=True, 

1243 style=None): 

1244 

1245 ''' 

1246 Get iterator for shifting window wise data extraction from waveform 

1247 archive. 

1248 

1249 :param tmin: start time (default uses start time of available data) 

1250 :param tmax: end time (default uses end time of available data) 

1251 :param tinc: time increment (window shift time) (default uses 

1252 ``tmax-tmin``) 

1253 :param tpad: padding time appended on either side of the data windows 

1254 (window overlap is ``2*tpad``) 

1255 :param group_selector: filter callback taking :py:class:`TracesGroup` 

1256 objects 

1257 :param trace_selector: filter callback taking 

1258 :py:class:`pyrocko.trace.Trace` objects 

1259 :param want_incomplete: if set to ``False``, gappy/incomplete traces 

1260 are discarded from the results 

1261 :param degap: whether to try to connect traces and to remove gaps and 

1262 overlaps 

1263 :param maxgap: maximum gap size in samples which is filled with 

1264 interpolated samples when ``degap`` is ``True`` 

1265 :param maxlap: maximum overlap size in samples which is removed when 

1266 ``degap`` is ``True`` 

1267 :param keep_current_files_open: whether to keep cached trace data in 

1268 memory after the iterator has ended 

1269 :param accessor_id: if given, used as a key to identify different 

1270 points of extraction for the decision of when to release cached 

1271 trace data (should be used when data is alternately extracted from 

1272 more than one region / selection) 

1273 :param snap: replaces Python's :py:func:`round` function which is used 

1274 to determine indices where to start and end the trace data array 

1275 :param include_last: whether to include last sample 

1276 :param load_data: whether to load the waveform data. If set to 

1277 ``False``, traces with no data samples, but with correct 

1278 meta-information are returned 

1279 :param style: set to ``'batch'`` to yield waveforms and information 

1280 about the chopper state as :py:class:`Batch` objects. By default 

1281 lists of :py:class:`pyrocko.trace.Trace` objects are yielded. 

1282 :returns: iterator providing extracted waveforms for each extracted 

1283 window. See ``style`` argument for details. 

1284 ''' 

1285 if tmin is None: 

1286 if self.tmin is None: 

1287 logger.warning('Pile\'s tmin is not set - pile may be empty.') 

1288 return 

1289 tmin = self.tmin + tpad 

1290 

1291 if tmax is None: 

1292 if self.tmax is None: 

1293 logger.warning('Pile\'s tmax is not set - pile may be empty.') 

1294 return 

1295 tmax = self.tmax - tpad 

1296 

1297 if not self.is_relevant(tmin-tpad, tmax+tpad, group_selector): 

1298 return 

1299 

1300 if accessor_id not in self.open_files: 

1301 self.open_files[accessor_id] = set() 

1302 

1303 open_files = self.open_files[accessor_id] 

1304 

1305 if tinc is None: 

1306 tinc = tmax - tmin 

1307 nwin = 1 

1308 else: 

1309 eps = tinc * 1e-6 

1310 if tinc != 0.0: 

1311 nwin = int(((tmax - eps) - tmin) / tinc) + 1 

1312 else: 

1313 nwin = 1 

1314 

1315 for iwin in range(nwin): 

1316 wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax) 

1317 

1318 chopped, used_files = self.chop( 

1319 wmin-tpad, wmax+tpad, group_selector, trace_selector, snap, 

1320 include_last, load_data) 

1321 

1322 for file in used_files - open_files: 

1323 # increment datause counter on newly opened files 

1324 file.use_data() 

1325 

1326 open_files.update(used_files) 

1327 

1328 processed = self._process_chopped( 

1329 chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin, 

1330 tpad) 

1331 

1332 if style == 'batch': 

1333 yield Batch( 

1334 tmin=wmin, 

1335 tmax=wmax, 

1336 i=iwin, 

1337 n=nwin, 

1338 traces=processed) 

1339 

1340 else: 

1341 yield processed 

1342 

1343 unused_files = open_files - used_files 

1344 

1345 while unused_files: 

1346 file = unused_files.pop() 

1347 file.drop_data() 

1348 open_files.remove(file) 

1349 

1350 if not keep_current_files_open: 

1351 while open_files: 

1352 file = open_files.pop() 

1353 file.drop_data() 

1354 

1355 def all(self, *args, **kwargs): 

1356 ''' 

1357 Shortcut to aggregate :py:meth:`chopper` output into a single list. 

1358 ''' 

1359 

1360 alltraces = [] 

1361 for traces in self.chopper(*args, **kwargs): 

1362 alltraces.extend(traces) 

1363 

1364 return alltraces 

1365 

1366 def iter_all(self, *args, **kwargs): 

1367 for traces in self.chopper(*args, **kwargs): 

1368 for tr in traces: 

1369 yield tr 

1370 

1371 def chopper_grouped(self, gather, progress=None, *args, **kwargs): 

1372 keys = self.gather_keys(gather) 

1373 if len(keys) == 0: 

1374 return 

1375 

1376 outer_group_selector = None 

1377 if 'group_selector' in kwargs: 

1378 outer_group_selector = kwargs['group_selector'] 

1379 

1380 outer_trace_selector = None 

1381 if 'trace_selector' in kwargs: 

1382 outer_trace_selector = kwargs['trace_selector'] 

1383 

1384 # the use of this gather-cache makes it impossible to modify the pile 

1385 # during chopping 

1386 gather_cache = {} 

1387 pbar = None 

1388 try: 

1389 if progress is not None: 

1390 pbar = util.progressbar(progress, len(keys)) 

1391 

1392 for ikey, key in enumerate(keys): 

1393 def tsel(tr): 

1394 return gather(tr) == key and ( 

1395 outer_trace_selector is None 

1396 or outer_trace_selector(tr)) 

1397 

1398 def gsel(gr): 

1399 if gr not in gather_cache: 

1400 gather_cache[gr] = gr.gather_keys(gather) 

1401 

1402 return key in gather_cache[gr] and ( 

1403 outer_group_selector is None 

1404 or outer_group_selector(gr)) 

1405 

1406 kwargs['trace_selector'] = tsel 

1407 kwargs['group_selector'] = gsel 

1408 

1409 for traces in self.chopper(*args, **kwargs): 

1410 yield traces 

1411 

1412 if pbar: 

1413 pbar.update(ikey+1) 

1414 

1415 finally: 

1416 if pbar: 

1417 pbar.finish() 

1418 

1419 def gather_keys(self, gather, selector=None): 

1420 keys = set() 

1421 for subpile in self.subpiles.values(): 

1422 keys |= subpile.gather_keys(gather, selector) 

1423 

1424 return sorted(keys) 

1425 

1426 def iter_traces( 

1427 self, 

1428 load_data=False, 

1429 return_abspath=False, 

1430 group_selector=None, 

1431 trace_selector=None): 

1432 

1433 ''' 

1434 Iterate over all traces in pile. 

1435 

1436 :param load_data: whether to load the waveform data, by default empty 

1437 traces are yielded 

1438 :param return_abspath: if ``True`` yield tuples containing absolute 

1439 file path and :py:class:`pyrocko.trace.Trace` objects 

1440 :param group_selector: filter callback taking :py:class:`TracesGroup` 

1441 objects 

1442 :param trace_selector: filter callback taking 

1443 :py:class:`pyrocko.trace.Trace` objects 

1444 

1445 Example; yields only traces, where the station code is 'HH1':: 

1446 

1447 test_pile = pile.make_pile('/local/test_trace_directory') 

1448 for t in test_pile.iter_traces( 

1449 trace_selector=lambda tr: tr.station=='HH1'): 

1450 

1451 print t 

1452 ''' 

1453 

1454 for subpile in self.subpiles.values(): 

1455 if not group_selector or group_selector(subpile): 

1456 for tr in subpile.iter_traces(load_data, return_abspath, 

1457 group_selector, trace_selector): 

1458 yield tr 

1459 

1460 def iter_files(self): 

1461 for subpile in self.subpiles.values(): 

1462 for file in subpile.iter_files(): 

1463 yield file 

1464 

1465 def reload_modified(self): 

1466 modified = False 

1467 for subpile in self.subpiles.values(): 

1468 modified |= subpile.reload_modified() 

1469 

1470 return modified 

1471 

1472 def get_tmin(self): 

1473 return self.tmin 

1474 

1475 def get_tmax(self): 

1476 return self.tmax 

1477 

1478 def get_deltatmin(self): 

1479 return self.deltatmin 

1480 

1481 def get_deltatmax(self): 

1482 return self.deltatmax 

1483 

1484 def is_empty(self): 

1485 return self.tmin is None and self.tmax is None 

1486 

1487 def __str__(self): 

1488 if self.tmin is not None and self.tmax is not None: 

1489 tmin = util.time_to_str(self.tmin) 

1490 tmax = util.time_to_str(self.tmax) 

1491 s = 'Pile\n' 

1492 s += 'number of subpiles: %i\n' % len(self.subpiles) 

1493 s += 'timerange: %s - %s\n' % (tmin, tmax) 

1494 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys())) 

1495 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys())) 

1496 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys())) 

1497 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys())) 

1498 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys())) 

1499 

1500 else: 

1501 s = 'empty Pile' 

1502 

1503 return s 

1504 

1505 def snuffle(self, **kwargs): 

1506 ''' 

1507 Visualize it. 

1508 

1509 :param stations: list of `pyrocko.model.Station` objects or ``None`` 

1510 :param events: list of `pyrocko.model.Event` objects or ``None`` 

1511 :param markers: list of `pyrocko.gui_util.Marker` objects or ``None`` 

1512 :param ntracks: float, number of tracks to be shown initially 

1513 (default: 12) 

1514 :param follow: time interval (in seconds) for real time follow mode or 

1515 ``None`` 

1516 :param controls: bool, whether to show the main controls (default: 

1517 ``True``) 

1518 :param opengl: bool, whether to use opengl (default: ``False``) 

1519 ''' 

1520 

1521 from pyrocko.gui.snuffler import snuffle 

1522 snuffle(self, **kwargs) 

1523 

1524 

1525def make_pile( 

1526 paths=None, selector=None, regex=None, 

1527 fileformat='mseed', 

1528 cachedirname=None, show_progress=True): 

1529 

1530 ''' 

1531 Create pile from given file and directory names. 

1532 

1533 :param paths: filenames and/or directories to look for traces. If paths is 

1534 ``None`` ``sys.argv[1:]`` is used. 

1535 :param selector: lambda expression taking group dict of regex match object 

1536 as a single argument and which returns true or false to keep or reject 

1537 a file 

1538 :param regex: regular expression which filenames have to match 

1539 :param fileformat: format of the files ('mseed', 'sac', 'kan', 

1540 'from_extension', 'detect') 

1541 :param cachedirname: loader cache is stored under this directory. It is 

1542 created as neccessary. 

1543 :param show_progress: show progress bar and other progress information 

1544 ''' 

1545 

1546 if show_progress_force_off: 

1547 show_progress = False 

1548 

1549 if isinstance(paths, str): 

1550 paths = [paths] 

1551 

1552 if paths is None: 

1553 paths = sys.argv[1:] 

1554 

1555 if cachedirname is None: 

1556 cachedirname = config.config().cache_dir 

1557 

1558 fns = util.select_files( 

1559 paths, include=regex, selector=selector, show_progress=show_progress) 

1560 

1561 cache = get_cache(cachedirname) 

1562 p = Pile() 

1563 p.load_files( 

1564 sorted(fns), 

1565 cache=cache, 

1566 fileformat=fileformat, 

1567 show_progress=show_progress) 

1568 

1569 return p 

1570 

1571 

1572class Injector(trace.States): 

1573 

1574 def __init__( 

1575 self, pile, 

1576 fixation_length=None, 

1577 path=None, 

1578 format='from_extension', 

1579 forget_fixed=False): 

1580 

1581 trace.States.__init__(self) 

1582 self._pile = pile 

1583 self._fixation_length = fixation_length 

1584 self._format = format 

1585 self._path = path 

1586 self._forget_fixed = forget_fixed 

1587 

1588 def set_fixation_length(self, length): 

1589 ''' 

1590 Set length after which the fixation method is called on buffer traces. 

1591 

1592 The length should be given in seconds. Give None to disable. 

1593 ''' 

1594 self.fixate_all() 

1595 self._fixation_length = length # in seconds 

1596 

1597 def set_save_path( 

1598 self, 

1599 path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_' 

1600 '%(tmin)s_%(tmax)s.mseed'): 

1601 

1602 self.fixate_all() 

1603 self._path = path 

1604 

1605 def inject(self, trace): 

1606 logger.debug('Received a trace: %s' % trace) 

1607 

1608 buf = self.get(trace) 

1609 if buf is None: 

1610 trbuf = trace.copy() 

1611 buf = MemTracesFile(None, [trbuf]) 

1612 self._pile.add_file(buf) 

1613 self.set(trace, buf) 

1614 

1615 else: 

1616 self._pile.remove_file(buf) 

1617 trbuf = buf.get_traces()[0] 

1618 buf.remove(trbuf) 

1619 trbuf.append(trace.ydata) 

1620 buf.add(trbuf) 

1621 self._pile.add_file(buf) 

1622 self.set(trace, buf) 

1623 

1624 trbuf = buf.get_traces()[0] 

1625 if self._fixation_length is not None: 

1626 if trbuf.tmax - trbuf.tmin > self._fixation_length: 

1627 self._fixate(buf, complete=False) 

1628 

1629 def fixate_all(self): 

1630 for state in list(self._states.values()): 

1631 self._fixate(state[-1]) 

1632 

1633 self._states = {} 

1634 

1635 def free(self, buf): 

1636 self._fixate(buf) 

1637 

1638 def _fixate(self, buf, complete=True): 

1639 trbuf = buf.get_traces()[0] 

1640 del_state = True 

1641 if self._path: 

1642 if self._fixation_length is not None: 

1643 ttmin = trbuf.tmin 

1644 ytmin = util.year_start(ttmin) 

1645 n = int(math.floor((ttmin - ytmin) / self._fixation_length)) 

1646 tmin = ytmin + n*self._fixation_length 

1647 traces = [] 

1648 t = tmin 

1649 while t <= trbuf.tmax: 

1650 try: 

1651 traces.append( 

1652 trbuf.chop( 

1653 t, 

1654 t+self._fixation_length, 

1655 inplace=False, 

1656 snap=(math.ceil, math.ceil))) 

1657 

1658 except trace.NoData: 

1659 pass 

1660 t += self._fixation_length 

1661 

1662 if abs(traces[-1].tmax - (t - trbuf.deltat)) < \ 

1663 trbuf.deltat/100. or complete: 

1664 

1665 self._pile.remove_file(buf) 

1666 

1667 else: # reinsert incomplete last part 

1668 new_trbuf = traces.pop() 

1669 self._pile.remove_file(buf) 

1670 buf.remove(trbuf) 

1671 buf.add(new_trbuf) 

1672 self._pile.add_file(buf) 

1673 del_state = False 

1674 

1675 else: 

1676 traces = [trbuf] 

1677 self._pile.remove_file(buf) 

1678 

1679 fns = io.save(traces, self._path, format=self._format) 

1680 

1681 if not self._forget_fixed: 

1682 self._pile.load_files( 

1683 fns, show_progress=False, fileformat=self._format) 

1684 

1685 if del_state: 

1686 del self._states[trbuf.nslc_id] 

1687 

1688 def __del__(self): 

1689 self.fixate_all()