1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5from __future__ import absolute_import, division, print_function 

6 

7import os 

8import logging 

9import time 

10import weakref 

11import copy 

12import re 

13import sys 

14import operator 

15import math 

16import hashlib 

17try: 

18 import cPickle as pickle 

19except ImportError: 

20 import pickle 

21 

22 

23from . import avl 

24from . import trace, io, util 

25from . import config 

26from .trace import degapper 

27 

28 

# Platform flag: needed for the rename-over-existing-file workaround in
# TracesFileCache._dump_dircache.
is_windows = sys.platform.startswith('win')

# Global override to disable progress bars regardless of caller arguments.
show_progress_force_off = False

# Prefixed to every string hashed by ehash(); bumping it invalidates all
# previously computed hashes (and thereby the cache file names).
version_salt = 'v1-'

32 

33 

def ehash(s):
    '''
    Return the version-salted SHA1 hex digest of string ``s``.
    '''
    salted = (version_salt + s).encode('utf8')
    return hashlib.sha1(salted).hexdigest()

36 

37 

def cmp(a, b):
    '''
    Three-way comparison: -1 if ``a < b``, 1 if ``a > b``, else 0.

    Replacement for the Python 2 builtin ``cmp``.
    '''
    if a < b:
        return -1
    return 1 if a > b else 0

40 

41 

def sl(s):
    '''
    Sort the items of ``s`` and return them as a list of strings.
    '''
    return list(map(str, sorted(s)))

44 

45 

class Counter(dict):
    '''
    Dict-based multiset of counts.

    Missing keys read as zero (without creating an entry); entries whose
    count drops to zero or below are removed.
    '''

    def __missing__(self, k):
        # Unknown keys count as zero; no entry is created on read.
        return 0

    def update(self, other):
        # Add counts from ``other`` (unlike dict.update, which replaces).
        for key, count in other.items():
            self[key] = self[key] + count

    def subtract(self, other):
        # Subtract counts from ``other``, dropping exhausted entries.
        for key, count in other.items():
            self[key] = self[key] - count
            if self[key] <= 0:
                del self[key]

    def subtract1(self, k):
        # Decrement a single key, dropping it when exhausted.
        self[k] = self[k] - 1
        if self[k] <= 0:
            del self[k]

65 

66 

def fix_unicode_copy(counter, func):
    '''
    Return a copy of ``counter`` with every key passed through ``func``.
    '''
    fixed = Counter()
    for key, count in counter.items():
        fixed[func(key)] = count
    return fixed

72 

73 

pjoin = os.path.join  # shorthand used throughout this module
logger = logging.getLogger('pyrocko.pile')

76 

77 

def avl_remove_exact(avltree, element):
    '''
    Remove the exact object ``element`` (matched by identity) from the tree.

    :returns: index at which the element was removed
    :raises ValueError: if the object is not contained in the tree
    '''
    lo, hi = avltree.span(element)
    i = lo
    while i < hi:
        if avltree[i] is element:
            avltree.remove_at(i)
            return i
        i += 1

    raise ValueError(
        'avl_remove_exact(avltree, element): element not in avltree')

87 

88 

def cmpfunc(key):
    '''
    Build a three-way comparison function ordering objects by ``key``.

    ``key`` is either an attribute name (string) or a callable extracting
    the comparison value from an object.
    '''
    if isinstance(key, str):
        # special cases; these run about 50% faster than the generic one on
        # Python 2.5
        if key == 'tmin':
            def compare_tmin(a, b):
                return cmp(a.tmin, b.tmin)
            return compare_tmin

        if key == 'tmax':
            def compare_tmax(a, b):
                return cmp(a.tmax, b.tmax)
            return compare_tmax

        key = operator.attrgetter(key)

    def compare(a, b):
        return cmp(key(a), key(b))

    return compare

101 

102 

g_dummys = {}  # cache of generated placeholder classes, keyed by attribute name

104 

105 

def get_dummy(key):
    '''
    Get (creating and caching on first use) a placeholder class whose
    instances carry a single attribute named ``key``.

    Instances serve as search probes for key-range queries on sorted
    collections.
    '''
    try:
        return g_dummys[key]
    except KeyError:
        class Dummy(object):
            def __init__(self, k):
                setattr(self, key, k)

        g_dummys[key] = Dummy
        return Dummy

115 

116 

class TooMany(Exception):
    '''
    Raised when a size-limited query would yield more elements than allowed.
    '''

    def __init__(self, n):
        Exception.__init__(self)
        self.n = n  # number of elements that would have been returned

121 

122 

class Sorted(object):
    '''
    Collection kept sorted by a comparison key, backed by an AVL tree.

    :param values: initial content (any iterable)
    :param key: attribute name (string) or callable used to extract the
        sort key from contained objects (see :py:func:`cmpfunc`)
    '''

    def __init__(self, values=(), key=None):
        # Immutable default instead of the mutable-default-argument
        # pitfall (``values=[]``); callers passing a list are unaffected.
        self._set_key(key)
        self._avl = avl.new(values, self._cmp)

    def _set_key(self, key):
        self._key = key
        self._cmp = cmpfunc(key)
        if isinstance(key, str):
            # Range-query probe objects are only available for string
            # (attribute-name) keys.
            self._dummy = get_dummy(key)

    def __getstate__(self):
        # Pickle the sorted contents plus the key spec; the tree and the
        # comparison function are rebuilt on unpickling.
        state = list(self._avl.iter()), self._key
        return state

    def __setstate__(self, state):
        l, key = state
        self._set_key(key)
        self._avl = avl.from_iter(iter(l), len(l))

    def insert(self, value):
        self._avl.insert(value)

    def remove(self, value):
        # Removal is by object identity, not by key equality.
        return avl_remove_exact(self._avl, value)

    def remove_at(self, i):
        return self._avl.remove_at(i)

    def insert_many(self, values):
        for value in values:
            self._avl.insert(value)

    def remove_many(self, values):
        for value in values:
            avl_remove_exact(self._avl, value)

    def __iter__(self):
        return iter(self._avl)

    def with_key_in(self, kmin, kmax):
        '''
        Get all elements whose key lies within ``[kmin, kmax]``.
        '''
        omin, omax = self._dummy(kmin), self._dummy(kmax)
        ilo, ihi = self._avl.span(omin, omax)
        return self._avl[ilo:ihi]

    def with_key_in_limited(self, kmin, kmax, nmax):
        '''
        Like :py:meth:`with_key_in` but raise :py:class:`TooMany` when more
        than ``nmax`` elements match.
        '''
        omin, omax = self._dummy(kmin), self._dummy(kmax)
        ilo, ihi = self._avl.span(omin, omax)
        if ihi - ilo > nmax:
            raise TooMany(ihi - ilo)

        return self._avl[ilo:ihi]

    def index(self, value):
        '''
        Get index of ``value`` (matched by identity).

        :raises ValueError: if the object is not contained
        '''
        ilo, ihi = self._avl.span(value)
        for i in range(ilo, ihi):
            if self._avl[i] is value:
                return i

        raise ValueError('element is not in avl tree')

    def min(self):
        return self._avl.min()

    def max(self):
        return self._avl.max()

    def __len__(self):
        return len(self._avl)

    def __getitem__(self, i):
        return self._avl[i]

195 

196 

class TracesFileCache(object):
    '''
    Manages trace metainformation cache.

    For each directory with files containing traces, one cache file is
    maintained to hold the trace metainformation of all files which are
    contained in the directory.
    '''

    # Global registry of cache objects, keyed by cache directory
    # (see get_cache()).
    caches = {}

    def __init__(self, cachedir):
        '''
        Create new cache.

        :param cachedir: directory to hold the cache files.
        '''

        self.cachedir = cachedir
        self.dircaches = {}    # in-memory dircache dicts, keyed by cache path
        self.modified = set()  # cache paths with unsaved changes
        util.ensuredir(self.cachedir)

    def get(self, abspath):
        '''
        Try to get an item from the cache.

        :param abspath: absolute path of the object to retrieve

        :returns: a stored object is returned or None if nothing could be
            found.
        '''

        dircache = self._get_dircache_for(abspath)
        if abspath in dircache:
            return dircache[abspath]
        return None

    def put(self, abspath, tfile):
        '''
        Put an item into the cache.

        :param abspath: absolute path of the object to be stored
        :param tfile: object to be stored
        '''

        cachepath = self._dircachepath(abspath)
        # get lock on cachepath here
        dircache = self._get_dircache(cachepath)
        dircache[abspath] = tfile
        self.modified.add(cachepath)

    def dump_modified(self):
        '''
        Save any modifications to disk.
        '''

        for cachepath in self.modified:
            self._dump_dircache(self.dircaches[cachepath], cachepath)
            # unlock

        self.modified = set()

    def clean(self):
        '''
        Weed out missing files from the disk caches.
        '''

        self.dump_modified()

        for fn in os.listdir(self.cachedir):
            if len(fn) == 40:
                # 40 hex chars: name matches a SHA1 digest from ehash()
                cache = self._load_dircache(pjoin(self.cachedir, fn))
                self._dump_dircache(cache, pjoin(self.cachedir, fn))

    def _get_dircache_for(self, abspath):
        # Get the dircache responsible for the directory containing abspath.
        return self._get_dircache(self._dircachepath(abspath))

    def _get_dircache(self, cachepath):
        # Load (or create an empty) dircache, memoized in self.dircaches.
        if cachepath not in self.dircaches:
            if os.path.isfile(cachepath):
                self.dircaches[cachepath] = self._load_dircache(cachepath)
            else:
                self.dircaches[cachepath] = {}

        return self.dircaches[cachepath]

    def _dircachepath(self, abspath):
        # Cache file name is the salted hash of the containing directory.
        cachefn = ehash(os.path.dirname(abspath))
        return pjoin(self.cachedir, cachefn)

    def _load_dircache(self, cachefilename):
        # Unpickle a dircache and restore the runtime state of its entries
        # (lookup trees, back-references, load flags).

        with open(cachefilename, 'rb') as f:
            cache = pickle.load(f)

        # weed out files which no longer exist
        for fn in list(cache.keys()):
            if not os.path.isfile(fn):
                del cache[fn]

        time_float = util.get_time_float()

        for v in cache.values():
            v.trees_from_content(v.traces)
            for tr in v.traces:
                tr.file = v
                # fix Py2 codes to not include unicode when the cache file
                # was created with Py3
                if not isinstance(tr.station, str):
                    tr.prune_from_reuse_cache()
                    tr.set_codes(
                        str(tr.network),
                        str(tr.station),
                        str(tr.location),
                        str(tr.channel))

                tr.tmin = time_float(tr.tmin)
                tr.tmax = time_float(tr.tmax)

            v.data_use_count = 0
            v.data_loaded = False
            v.fix_unicode_codes()

        return cache

    def _dump_dircache(self, cache, cachefilename):
        # Pickle a dircache to disk, writing to a temporary file first and
        # renaming it into place.

        if not cache:
            if os.path.exists(cachefilename):
                os.remove(cachefilename)
            return

        # make a copy without the parents and the binsearch trees
        cache_copy = {}
        for fn in cache.keys():
            trf = copy.copy(cache[fn])
            trf.parent = None
            trf.by_tmin = None
            trf.by_tmax = None
            trf.by_tlen = None
            trf.by_mtime = None
            trf.data_use_count = 0
            trf.data_loaded = False
            traces = []
            for tr in trf.traces:
                tr = tr.copy(data=False)
                tr.ydata = None
                tr.meta = None
                tr.file = trf
                traces.append(tr)

            trf.traces = traces

            cache_copy[fn] = trf

        tmpfn = cachefilename+'.%i.tmp' % os.getpid()
        with open(tmpfn, 'wb') as f:
            pickle.dump(cache_copy, f, protocol=2)

        if is_windows and os.path.exists(cachefilename):
            # windows doesn't allow to rename over existing file
            os.unlink(cachefilename)

        os.rename(tmpfn, cachefilename)

362 

363 

def get_cache(cachedir):
    '''
    Get global TracesFileCache object for given directory.
    '''
    try:
        return TracesFileCache.caches[cachedir]
    except KeyError:
        cache = TracesFileCache(cachedir)
        TracesFileCache.caches[cachedir] = cache
        return cache

372 

373 

def loader(
        filenames, fileformat, cache, filename_attributes,
        show_progress=True, update_progress=None):
    '''
    Iterate over trace-file objects for the given file names.

    Uses ``cache`` (if given) to skip re-scanning of unmodified files.
    Files which fail to load are collected and reported in a single warning
    at the end.

    :param filenames: paths of the files to look at
    :param fileformat: format name handed to :py:class:`TracesFile`
    :param cache: :py:class:`TracesFileCache` object or ``None``
    :param filename_attributes: regex with named groups used to extract
        codes (network, station, location, channel) from the file path,
        or ``None``
    :param show_progress: whether to display a progress bar
    :param update_progress: optional callback ``(label, i, n)``; a true
        return value aborts the iteration
    '''

    if show_progress_force_off:
        show_progress = False

    class Progress(object):
        # Thin wrapper driving both the optional progress bar and the
        # optional update_progress callback.
        def __init__(self, label, n):
            self._label = label
            self._n = n
            self._bar = None
            if show_progress:
                self._bar = util.progressbar(label, self._n)

            if update_progress:
                update_progress(label, 0, self._n)

        def update(self, i):
            # Returns True when the callback requested an abort.
            if self._bar:
                if i < self._n-1:
                    self._bar.update(i)
                else:
                    self._bar.finish()
                    self._bar = None

            abort = False
            if update_progress:
                abort = update_progress(self._label, i, self._n)

            return abort

    if not filenames:
        logger.warning('No files to load from')
        return

    regex = None
    if filename_attributes:
        regex = re.compile(filename_attributes)

    # Pass 1: stat all files and decide which must be (re)scanned.
    progress = Progress('Looking at files', len(filenames))

    failures = []
    to_load = []
    for i, filename in enumerate(filenames):
        try:
            abspath = os.path.abspath(filename)

            substitutions = None
            if regex:
                m = regex.search(filename)
                if not m:
                    raise FilenameAttributeError(
                        "Cannot get attributes with pattern '%s' "
                        "from path '%s'" % (filename_attributes, filename))

                substitutions = {}
                for k in m.groupdict():
                    if k in ('network', 'station', 'location', 'channel'):
                        substitutions[k] = m.groupdict()[k]

            mtime = os.stat(filename)[8]  # st_mtime
            tfile = None
            if cache:
                tfile = cache.get(abspath)

            # A cached entry is only reusable when format and mtime match
            # and no filename substitutions are in effect.
            mustload = (
                not tfile or
                (tfile.format != fileformat and fileformat != 'detect') or
                tfile.mtime != mtime or
                substitutions is not None)

            to_load.append((mustload, mtime, abspath, substitutions, tfile))

        except (OSError, FilenameAttributeError) as xerror:
            failures.append(abspath)
            logger.warning(xerror)

        abort = progress.update(i+1)
        if abort:
            progress.update(len(filenames))
            return

    progress.update(len(filenames))

    to_load.sort(key=lambda x: x[2])  # sort by absolute path

    nload = len([1 for x in to_load if x[0]])
    iload = 0

    # When almost nothing needs rescanning, count every file in the
    # progress display instead of only the rescanned ones.
    count_all = False
    if nload < 0.01*len(to_load):
        nload = len(to_load)
        count_all = True

    # Pass 2: scan files (or reuse cached headers) and yield them.
    if to_load:
        progress = Progress('Scanning files', nload)

        for (mustload, mtime, abspath, substitutions, tfile) in to_load:
            try:
                if mustload:
                    tfile = TracesFile(
                        None, abspath, fileformat,
                        substitutions=substitutions, mtime=mtime)

                    if cache and not substitutions:
                        cache.put(abspath, tfile)

                    if not count_all:
                        iload += 1

                if count_all:
                    iload += 1

            except (io.FileLoadError, OSError) as xerror:
                failures.append(abspath)
                logger.warning(xerror)
            else:
                yield tfile

            abort = progress.update(iload+1)
            if abort:
                break

        progress.update(nload)

    if failures:
        logger.warning(
            'The following file%s caused problems and will be ignored:\n' %
            util.plural_s(len(failures)) + '\n'.join(failures))

    if cache:
        cache.dump_modified()

507 

508 

def tlen(x):
    '''
    Time length (``tmax - tmin``) of an object carrying a time span.
    '''
    return x.tmax - x.tmin

511 

512 

class TracesGroup(object):

    '''
    Trace container base class.

    Base class for Pile, SubPile, and TracesFile, i.e. anything containing
    a collection of several traces. A TracesGroup object maintains lookup sets
    of some of the traces meta-information, as well as a combined time-range
    of its contents.
    '''

    def __init__(self, parent):
        self.parent = parent
        self.empty()
        self.nupdates = 0   # bumped on every add/remove
        self.abspath = None

    def set_parent(self, parent):
        self.parent = parent

    def get_parent(self):
        return self.parent

    def empty(self):
        # Reset all lookup indices and summary attributes to the empty
        # state.
        self.networks, self.stations, self.locations, self.channels, \
            self.nslc_ids, self.deltats = [Counter() for x in range(6)]
        self.by_tmin = Sorted([], 'tmin')
        self.by_tmax = Sorted([], 'tmax')
        self.by_tlen = Sorted([], tlen)
        self.by_mtime = Sorted([], 'mtime')
        self.tmin, self.tmax = None, None
        self.deltatmin, self.deltatmax = None, None

    def trees_from_content(self, content):
        # Rebuild the sorted lookup trees from the given traces and refresh
        # the summary attributes.
        self.by_tmin = Sorted(content, 'tmin')
        self.by_tmax = Sorted(content, 'tmax')
        self.by_tlen = Sorted(content, tlen)
        self.by_mtime = Sorted(content, 'mtime')
        self.adjust_minmax()

    def fix_unicode_codes(self):
        # Convert unicode codes (from Py2-era cache files) to str.  If the
        # first network code already is a str, all codes are assumed fine.
        for net in self.networks:
            if isinstance(net, str):
                return

        self.networks = fix_unicode_copy(self.networks, str)
        self.stations = fix_unicode_copy(self.stations, str)
        self.locations = fix_unicode_copy(self.locations, str)
        self.channels = fix_unicode_copy(self.channels, str)
        self.nslc_ids = fix_unicode_copy(
            self.nslc_ids, lambda k: tuple(str(x) for x in k))

    def add(self, content):
        '''
        Add content to traces group and update indices.

        Accepts :py:class:`pyrocko.trace.Trace` objects and
        :py:class:`pyrocko.pile.TracesGroup` objects.
        '''

        if isinstance(content, (trace.Trace, TracesGroup)):
            content = [content]

        for c in content:

            if isinstance(c, TracesGroup):
                self.networks.update(c.networks)
                self.stations.update(c.stations)
                self.locations.update(c.locations)
                self.channels.update(c.channels)
                self.nslc_ids.update(c.nslc_ids)
                self.deltats.update(c.deltats)

                self.by_tmin.insert_many(c.by_tmin)
                self.by_tmax.insert_many(c.by_tmax)
                self.by_tlen.insert_many(c.by_tlen)
                self.by_mtime.insert_many(c.by_mtime)

            elif isinstance(c, trace.Trace):
                self.networks[c.network] += 1
                self.stations[c.station] += 1
                self.locations[c.location] += 1
                self.channels[c.channel] += 1
                self.nslc_ids[c.nslc_id] += 1
                self.deltats[c.deltat] += 1

                self.by_tmin.insert(c)
                self.by_tmax.insert(c)
                self.by_tlen.insert(c)
                self.by_mtime.insert(c)

        self.adjust_minmax()

        self.nupdates += 1
        self.notify_listeners('add')

        # propagate the change up the containment hierarchy
        if self.parent is not None:
            self.parent.add(content)

    def remove(self, content):
        '''
        Remove content from traces group and update indices.
        '''
        if isinstance(content, (trace.Trace, TracesGroup)):
            content = [content]

        for c in content:

            if isinstance(c, TracesGroup):
                self.networks.subtract(c.networks)
                self.stations.subtract(c.stations)
                self.locations.subtract(c.locations)
                self.channels.subtract(c.channels)
                self.nslc_ids.subtract(c.nslc_ids)
                self.deltats.subtract(c.deltats)

                self.by_tmin.remove_many(c.by_tmin)
                self.by_tmax.remove_many(c.by_tmax)
                self.by_tlen.remove_many(c.by_tlen)
                self.by_mtime.remove_many(c.by_mtime)

            elif isinstance(c, trace.Trace):
                self.networks.subtract1(c.network)
                self.stations.subtract1(c.station)
                self.locations.subtract1(c.location)
                self.channels.subtract1(c.channel)
                self.nslc_ids.subtract1(c.nslc_id)
                self.deltats.subtract1(c.deltat)

                self.by_tmin.remove(c)
                self.by_tmax.remove(c)
                self.by_tlen.remove(c)
                self.by_mtime.remove(c)

        self.adjust_minmax()

        self.nupdates += 1
        self.notify_listeners('remove')

        # propagate the change up the containment hierarchy
        if self.parent is not None:
            self.parent.remove(content)

    def relevant(self, tmin, tmax, group_selector=None, trace_selector=None):
        '''
        Return list of :py:class:`pyrocko.trace.Trace` objects where given
        arguments ``tmin`` and ``tmax`` match.

        :param tmin: start time
        :param tmax: end time
        :param group_selector: lambda expression taking this
            :py:class:`TracesGroup` object as a single argument and which
            returns true or false to keep or reject the group
            (default: ``None``)
        :param trace_selector: lambda expression taking a
            :py:class:`pyrocko.trace.Trace` object as a single argument and
            which returns true or false to keep or reject a trace
            (default: ``None``)
        '''

        if not self.by_tmin or not self.is_relevant(
                tmin, tmax, group_selector):

            return []

        # Any overlapping trace must start no more than tlenmax (length of
        # the longest contained trace) before the window.
        return [tr for tr in self.by_tmin.with_key_in(tmin-self.tlenmax, tmax)
                if tr.is_relevant(tmin, tmax, trace_selector)]

    def adjust_minmax(self):
        # Recompute summary attributes (combined time range, longest trace
        # length, newest mtime, sampling interval bounds) from the indices.
        if self.by_tmin:
            self.tmin = self.by_tmin.min().tmin
            self.tmax = self.by_tmax.max().tmax
            t = self.by_tlen.max()
            self.tlenmax = t.tmax - t.tmin
            self.mtime = self.by_mtime.max().mtime
            deltats = list(self.deltats.keys())
            self.deltatmin = min(deltats)
            self.deltatmax = max(deltats)
        else:
            self.tmin = None
            self.tmax = None
            self.tlenmax = None
            self.mtime = None
            self.deltatmin = None
            self.deltatmax = None

    def notify_listeners(self, what):
        # Hook for subclasses; overridden in Pile to inform registered
        # listeners.
        pass

    def get_update_count(self):
        return self.nupdates

    def overlaps(self, tmin, tmax):
        # True when the group's combined time range intersects [tmin, tmax].
        return self.tmin is not None \
            and tmax >= self.tmin and self.tmax >= tmin

    def is_relevant(self, tmin, tmax, group_selector=None):
        # Like overlaps() but additionally applies the optional group
        # filter.
        if self.tmin is None or self.tmax is None:
            return False
        return tmax >= self.tmin and self.tmax >= tmin and (
            group_selector is None or group_selector(self))

711 

712 

class MemTracesFile(TracesGroup):

    '''
    This is needed to make traces without an actual disc file to be inserted
    into a Pile.
    '''

    def __init__(self, parent, traces):
        TracesGroup.__init__(self, parent)
        self.add(traces)
        self.mtime = time.time()

    def add(self, traces):
        # Accept a single trace or a sequence; link every trace back to
        # this container before indexing.
        if isinstance(traces, trace.Trace):
            traces = [traces]

        for tr in traces:
            tr.file = self

        TracesGroup.add(self, traces)

    def load_headers(self, mtime=None):
        # No-op: headers live in memory.
        pass

    def load_data(self):
        # No-op: data lives in memory.
        pass

    def use_data(self):
        # No-op: no data reference counting needed for in-memory traces.
        pass

    def drop_data(self):
        # No-op: in-memory data is never dropped.
        pass

    def reload_if_modified(self):
        # In-memory content cannot change behind our back.
        return False

    def iter_traces(self):
        for tr in self.by_tmin:
            yield tr

    def get_traces(self):
        return list(self.by_tmin)

    def gather_keys(self, gather, selector=None):
        # Collect distinct values of gather(tr) over (optionally filtered)
        # traces.
        keys = set()
        for tr in self.by_tmin:
            if selector is None or selector(tr):
                keys.add(gather(tr))

        return keys

    def __str__(self):

        s = 'MemTracesFile\n'
        s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
        s += 'number of traces: %i\n' % len(self.by_tmin)
        s += 'timerange: %s - %s\n' % (
            util.time_to_str(self.tmin), util.time_to_str(self.tmax))
        s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
        s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
        s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
        s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
        s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
        return s

777 

778 

class TracesFile(TracesGroup):
    '''
    Representation of a single trace file on disk.

    Trace metadata is held permanently; waveform data is loaded on demand
    and its lifetime is reference-counted via use_data()/drop_data().
    '''

    def __init__(
            self, parent, abspath, format,
            substitutions=None, mtime=None):

        TracesGroup.__init__(self, parent)
        self.abspath = abspath
        self.format = format
        self.traces = []
        self.data_loaded = False
        self.data_use_count = 0   # reference count of current data users
        self.substitutions = substitutions
        self.load_headers(mtime=mtime)
        self.mtime = mtime

    def load_headers(self, mtime=None):
        # (Re)read trace metadata from the file, without waveform data.
        logger.debug('loading headers from file: %s' % self.abspath)
        if mtime is None:
            self.mtime = os.stat(self.abspath)[8]

        def kgen(tr):
            # key identifying a trace snippet within the file
            return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id

        self.remove(self.traces)
        ks = set()
        for tr in io.load(self.abspath,
                          format=self.format,
                          getdata=False,
                          substitutions=self.substitutions):

            k = kgen(tr)
            if k not in ks:
                ks.add(k)
                self.traces.append(tr)
                tr.file = self

        self.add(self.traces)

        self.data_loaded = False
        self.data_use_count = 0

    def load_data(self, force=False):
        '''
        Load waveform data, reconciling with the known trace headers.

        :returns: ``True`` if the file's content changed on disk
        '''
        file_changed = False
        if not self.data_loaded or force:
            logger.debug('loading data from file: %s' % self.abspath)

            def kgen(tr):
                return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id

            traces_ = io.load(self.abspath, format=self.format, getdata=True,
                              substitutions=self.substitutions)

            # prevent adding duplicate snippets from corrupt mseed files
            k_loaded = set()
            traces = []
            for tr in traces_:
                k = kgen(tr)
                if k not in k_loaded:
                    k_loaded.add(k)
                    traces.append(tr)

            k_current_d = dict((kgen(tr), tr) for tr in self.traces)
            k_current = set(k_current_d)
            k_new = k_loaded - k_current
            k_delete = k_current - k_loaded
            k_unchanged = k_current & k_loaded

            # drop traces which disappeared from the file
            for tr in self.traces[:]:
                if kgen(tr) in k_delete:
                    self.remove(tr)
                    self.traces.remove(tr)
                    tr.file = None
                    file_changed = True

            # add newly appeared traces
            for tr in traces:
                if kgen(tr) in k_new:
                    tr.file = self
                    self.traces.append(tr)
                    self.add(tr)
                    file_changed = True

            # attach freshly loaded waveform data to unchanged traces
            for tr in traces:
                if kgen(tr) in k_unchanged:
                    ctr = k_current_d[kgen(tr)]
                    ctr.ydata = tr.ydata

            self.data_loaded = True

        if file_changed:
            logger.debug('reloaded (file may have changed): %s' % self.abspath)

        return file_changed

    def use_data(self):
        # Increment the data reference count; data must be loaded first.
        if not self.data_loaded:
            raise Exception('Data not loaded')
        self.data_use_count += 1

    def drop_data(self):
        # Decrement the data reference count; forget the waveform data when
        # the last user drops it.
        if self.data_loaded:
            if self.data_use_count == 1:
                logger.debug('forgetting data of file: %s' % self.abspath)
                for tr in self.traces:
                    tr.drop_data()

                self.data_loaded = False

            self.data_use_count -= 1
        else:
            self.data_use_count = 0

    def reload_if_modified(self):
        '''
        Reload headers (and data, if loaded) when the file's mtime changed.

        :returns: ``True`` if the file was reloaded
        '''
        mtime = os.stat(self.abspath)[8]
        if mtime != self.mtime:
            logger.debug(
                'mtime=%i, reloading file: %s' % (mtime, self.abspath))

            self.mtime = mtime
            if self.data_loaded:
                self.load_data(force=True)
            else:
                self.load_headers()

            return True

        return False

    def iter_traces(self):
        for tr in self.traces:
            yield tr

    def gather_keys(self, gather, selector=None):
        # Collect distinct values of gather(tr) over (optionally filtered)
        # traces.
        keys = set()
        for tr in self.by_tmin:
            if selector is None or selector(tr):
                keys.add(gather(tr))

        return keys

    def __str__(self):
        s = 'TracesFile\n'
        s += 'abspath: %s\n' % self.abspath
        s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
        s += 'number of traces: %i\n' % len(self.traces)
        s += 'timerange: %s - %s\n' % (
            util.time_to_str(self.tmin), util.time_to_str(self.tmax))
        s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
        s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
        s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
        s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
        s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
        return s

931 

932 

class FilenameAttributeError(Exception):
    '''
    Raised when trace codes cannot be extracted from a file path with the
    given filename-attributes pattern.
    '''

935 

936 

class SubPile(TracesGroup):
    '''
    Intermediate container grouping several trace files within a Pile.
    '''

    def __init__(self, parent):
        TracesGroup.__init__(self, parent)
        self.files = []
        self.empty()

    def add_file(self, file):
        self.files.append(file)
        file.set_parent(self)
        self.add(file)

    def remove_file(self, file):
        self.files.remove(file)
        file.set_parent(None)
        self.remove(file)

    def remove_files(self, files):
        # Remove several files; the indices are updated once for the whole
        # batch.
        for file in files:
            self.files.remove(file)
            file.set_parent(None)
        self.remove(files)

    def gather_keys(self, gather, selector=None):
        # Union of gather keys over all contained files.
        keys = set()
        for file in self.files:
            keys |= file.gather_keys(gather, selector)

        return keys

    def iter_traces(
            self,
            load_data=False,
            return_abspath=False,
            group_selector=None,
            trace_selector=None):
        '''
        Iterate over the traces of all contained files.

        :param load_data: if true, load (and afterwards drop) waveform data
            while a file's traces are visited
        :param return_abspath: if true, yield ``(abspath, trace)`` tuples
        :param group_selector: optional filter callback on file objects
        :param trace_selector: optional filter callback on trace objects
        '''

        for file in self.files:

            if group_selector and not group_selector(file):
                continue

            must_drop = False
            if load_data:
                file.load_data()
                file.use_data()
                must_drop = True

            for tr in file.iter_traces():
                if trace_selector and not trace_selector(tr):
                    continue

                if return_abspath:
                    yield file.abspath, tr
                else:
                    yield tr

            if must_drop:
                file.drop_data()

    def iter_files(self):
        for file in self.files:
            yield file

    def reload_modified(self):
        # Reload all files whose mtime changed; True when any was reloaded.
        modified = False
        for file in self.files:
            modified |= file.reload_if_modified()

        return modified

    def __str__(self):
        s = 'SubPile\n'
        s += 'number of files: %i\n' % len(self.files)
        s += 'timerange: %s - %s\n' % (
            util.time_to_str(self.tmin), util.time_to_str(self.tmax))
        s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
        s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
        s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
        s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
        s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
        return s

1018 

1019 

class Batch(object):
    '''
    Result container for one time window of window-wise waveform extraction.

    Instances are yielded per window by :py:meth:`Pile.chopper` when it is
    called with the ``style='batch'`` keyword argument.

    *Attributes:*

    .. py:attribute:: tmin

        Start of this time window.

    .. py:attribute:: tmax

        End of this time window.

    .. py:attribute:: i

        Index of this time window in sequence.

    .. py:attribute:: n

        Total number of time windows in sequence.

    .. py:attribute:: traces

        Extracted waveforms for this time window.
    '''

    def __init__(self, tmin, tmax, i, n, traces):
        self.tmin = tmin
        self.tmax = tmax
        self.i = i
        self.n = n
        self.traces = traces

1057 

1058 

1059class Pile(TracesGroup): 

1060 ''' 

1061 Waveform archive lookup, data loading and caching infrastructure. 

1062 ''' 

1063 

    def __init__(self):
        TracesGroup.__init__(self, None)
        self.subpiles = {}     # subpiles keyed by dispatch_key()
        self.open_files = {}
        self.listeners = []    # weak references to change listeners
        self.abspaths = set()  # paths of all files currently in the pile

1070 

    def add_listener(self, obj):
        '''
        Add object to be notified on changes (held by weak reference).
        '''
        self.listeners.append(weakref.ref(obj))

1073 

    def notify_listeners(self, what):
        # Call pile_changed() on every listener which is still alive.
        for ref in self.listeners:
            obj = ref()
            if obj:
                obj.pile_changed(what)

1079 

    def load_files(
            self, filenames,
            filename_attributes=None,
            fileformat='mseed',
            cache=None,
            show_progress=True,
            update_progress=None):
        '''
        Load and add files to the pile (see the module-level
        :py:func:`loader` for the parameter semantics).
        '''

        load = loader(
            filenames, fileformat, cache, filename_attributes,
            show_progress=show_progress,
            update_progress=update_progress)

        self.add_files(load)

1094 

    def add_files(self, files):
        '''
        Add several file objects to the pile.
        '''
        for file in files:
            self.add_file(file)

1098 

    def add_file(self, file):
        '''
        Add a single file object to the appropriate subpile.

        Files already contained in the pile, or without any usable sampling
        interval, are skipped with a warning.
        '''
        if file.abspath is not None and file.abspath in self.abspaths:
            logger.warning('File already in pile: %s' % file.abspath)
            return

        if file.deltatmin is None:
            logger.warning('Sampling rate of all traces are zero in file: %s' %
                           file.abspath)
            return

        subpile = self.dispatch(file)
        subpile.add_file(file)
        if file.abspath is not None:
            self.abspaths.add(file.abspath)

1113 

    def remove_file(self, file):
        '''
        Remove a single file object from the pile.
        '''
        subpile = file.get_parent()
        if subpile is not None:
            subpile.remove_file(file)
        if file.abspath is not None:
            self.abspaths.remove(file.abspath)

1120 

    def remove_files(self, files):
        '''
        Remove several file objects, grouping the removals by subpile.
        '''
        # group the files by their owning subpile
        subpile_files = {}
        for file in files:
            subpile = file.get_parent()
            if subpile not in subpile_files:
                subpile_files[subpile] = []

            subpile_files[subpile].append(file)

        # batch-remove per subpile and forget the paths
        for subpile, files in subpile_files.items():
            subpile.remove_files(files)
            for file in files:
                if file.abspath is not None:
                    self.abspaths.remove(file.abspath)

1135 

    def dispatch_key(self, file):
        # Files are grouped into subpiles by the order of magnitude
        # (floored natural log) of their minimal sampling interval.
        dt = int(math.floor(math.log(file.deltatmin)))
        return dt

1139 

    def dispatch(self, file):
        # Get (creating on demand) the subpile responsible for this file.
        k = self.dispatch_key(file)
        if k not in self.subpiles:
            self.subpiles[k] = SubPile(self)

        return self.subpiles[k]

1146 

    def get_deltats(self):
        '''
        Get list of distinct sampling intervals in the pile.
        '''
        return list(self.deltats.keys())

1149 

    def chop(
            self, tmin, tmax,
            group_selector=None,
            trace_selector=None,
            snap=(round, round),
            include_last=False,
            load_data=True):
        '''
        Cut out trace snippets overlapping the given time interval.

        :returns: tuple ``(chopped, used_files)`` where ``chopped`` is a
            list of newly created trace snippets and ``used_files`` is the
            set of file objects whose data was accessed
        '''

        chopped = []
        used_files = set()

        traces = self.relevant(tmin, tmax, group_selector, trace_selector)
        if load_data:
            files_changed = False
            for tr in traces:
                if tr.file and tr.file not in used_files:
                    if tr.file.load_data():
                        files_changed = True

                    if tr.file is not None:
                        used_files.add(tr.file)

            if files_changed:
                # file contents changed on disk - redo the lookup
                traces = self.relevant(
                    tmin, tmax, group_selector, trace_selector)

        for tr in traces:
            if not load_data and tr.ydata is not None:
                # hand out data-less copies when data was not requested
                tr = tr.copy(data=False)
                tr.ydata = None

            try:
                chopped.append(tr.chop(
                    tmin, tmax,
                    inplace=False,
                    snap=snap,
                    include_last=include_last))

            except trace.NoData:
                pass

        return chopped, used_files

1192 

    def _process_chopped(
            self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
            tpad):

        '''
        Post-process traces returned by :py:meth:`chop` for one extraction
        window: sort, optionally degap, and optionally weed out traces which
        do not completely cover the padded window ``[wmin-tpad, wmax+tpad]``.
        '''

        chopped.sort(key=lambda a: a.full_id)
        if degap:
            chopped = degapper(chopped, maxgap=maxgap, maxlap=maxlap)

        if not want_incomplete:
            chopped_weeded = []
            for tr in chopped:
                # deviation of trace start/end from the padded window limits
                emin = tr.tmin - (wmin-tpad)
                emax = tr.tmax + tr.deltat - (wmax+tpad)
                if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
                    # trace matches the window to within half a sample
                    chopped_weeded.append(tr)

                elif degap:
                    if (0. < emin <= 5. * tr.deltat and
                            -5. * tr.deltat <= emax < 0.):

                        # nearly complete (at most 5 samples short at either
                        # end): repair by repeating the edge samples
                        tr.extend(
                            wmin-tpad,
                            wmax+tpad-tr.deltat,
                            fillmethod='repeat')

                        chopped_weeded.append(tr)

            chopped = chopped_weeded

        # annotate each trace with the window it was extracted for
        for tr in chopped:
            tr.wmin = wmin
            tr.wmax = wmax

        return chopped

1227 

    def chopper(
            self,
            tmin=None, tmax=None, tinc=None, tpad=0.,
            group_selector=None, trace_selector=None,
            want_incomplete=True, degap=True, maxgap=5, maxlap=None,
            keep_current_files_open=False, accessor_id=None,
            snap=(round, round), include_last=False, load_data=True,
            style=None):

        '''
        Get iterator for shifting window wise data extraction from waveform
        archive.

        :param tmin: start time (default uses start time of available data)
        :param tmax: end time (default uses end time of available data)
        :param tinc: time increment (window shift time) (default uses
            ``tmax-tmin``)
        :param tpad: padding time appended on either side of the data windows
            (window overlap is ``2*tpad``)
        :param group_selector: filter callback taking :py:class:`TracesGroup`
            objects
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects
        :param want_incomplete: if set to ``False``, gappy/incomplete traces
            are discarded from the results
        :param degap: whether to try to connect traces and to remove gaps and
            overlaps
        :param maxgap: maximum gap size in samples which is filled with
            interpolated samples when ``degap`` is ``True``
        :param maxlap: maximum overlap size in samples which is removed when
            ``degap`` is ``True``
        :param keep_current_files_open: whether to keep cached trace data in
            memory after the iterator has ended
        :param accessor_id: if given, used as a key to identify different
            points of extraction for the decision of when to release cached
            trace data (should be used when data is alternately extracted from
            more than one region / selection)
        :param snap: replaces Python's :py:func:`round` function which is used
            to determine indices where to start and end the trace data array
        :param include_last: whether to include last sample
        :param load_data: whether to load the waveform data. If set to
            ``False``, traces with no data samples, but with correct
            meta-information are returned
        :param style: set to ``'batch'`` to yield waveforms and information
            about the chopper state as :py:class:`Batch` objects. By default
            lists of :py:class:`pyrocko.trace.Trace` objects are yielded.
        :returns: iterator providing extracted waveforms for each extracted
            window. See ``style`` argument for details.
        '''
        if tmin is None:
            if self.tmin is None:
                logger.warning('Pile\'s tmin is not set - pile may be empty.')
                return
            tmin = self.tmin + tpad

        if tmax is None:
            if self.tmax is None:
                logger.warning('Pile\'s tmax is not set - pile may be empty.')
                return
            tmax = self.tmax - tpad

        # nothing to do if no group matches the padded time span
        if not self.is_relevant(tmin-tpad, tmax+tpad, group_selector):
            return

        if accessor_id not in self.open_files:
            self.open_files[accessor_id] = set()

        # files kept open by this accessor across windows
        open_files = self.open_files[accessor_id]

        # number of extraction windows; eps guards against a spurious extra
        # window when (tmax-tmin) is an exact multiple of tinc
        if tinc is None:
            tinc = tmax - tmin
            nwin = 1
        else:
            eps = tinc * 1e-6
            if tinc != 0.0:
                nwin = int(((tmax - eps) - tmin) / tinc) + 1
            else:
                nwin = 1

        for iwin in range(nwin):
            chopped = []
            wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)

            chopped, used_files = self.chop(
                wmin-tpad, wmax+tpad, group_selector, trace_selector, snap,
                include_last, load_data)

            for file in used_files - open_files:
                # increment datause counter on newly opened files
                file.use_data()

            open_files.update(used_files)

            processed = self._process_chopped(
                chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
                tpad)

            if style == 'batch':
                yield Batch(
                    tmin=wmin,
                    tmax=wmax,
                    i=iwin,
                    n=nwin,
                    traces=processed)

            else:
                yield processed

            # release cached data of files not touched by this window
            unused_files = open_files - used_files

            while unused_files:
                file = unused_files.pop()
                file.drop_data()
                open_files.remove(file)

        # final cleanup: release everything unless caching was requested
        if not keep_current_files_open:
            while open_files:
                file = open_files.pop()
                file.drop_data()

1347 

1348 def all(self, *args, **kwargs): 

1349 ''' 

1350 Shortcut to aggregate :py:meth:`chopper` output into a single list. 

1351 ''' 

1352 

1353 alltraces = [] 

1354 for traces in self.chopper(*args, **kwargs): 

1355 alltraces.extend(traces) 

1356 

1357 return alltraces 

1358 

1359 def iter_all(self, *args, **kwargs): 

1360 for traces in self.chopper(*args, **kwargs): 

1361 for tr in traces: 

1362 yield tr 

1363 

    def chopper_grouped(self, gather, progress=None, *args, **kwargs):
        '''
        Iterate over the pile's data like :py:meth:`chopper`, but one gather
        key at a time.

        :param gather: callable mapping a trace to its grouping key; one
            chopper pass is made per distinct key
        :param progress: optional label; if given, a progress bar is shown
            while iterating over the keys

        Remaining arguments are passed on to :py:meth:`chopper`. Any given
        ``group_selector`` / ``trace_selector`` is combined (AND) with the
        per-key selection.
        '''
        keys = self.gather_keys(gather)
        if len(keys) == 0:
            return

        outer_group_selector = None
        if 'group_selector' in kwargs:
            outer_group_selector = kwargs['group_selector']

        outer_trace_selector = None
        if 'trace_selector' in kwargs:
            outer_trace_selector = kwargs['trace_selector']

        # the use of this gather-cache makes it impossible to modify the pile
        # during chopping
        gather_cache = {}
        pbar = None
        if progress is not None:
            pbar = util.progressbar(progress, len(keys))

        for ikey, key in enumerate(keys):
            # trace passes if it belongs to the current key and survives the
            # caller-supplied selector (closures rebind per iteration below)
            def tsel(tr):
                return gather(tr) == key and (outer_trace_selector is None or
                                              outer_trace_selector(tr))

            # group passes if any of its traces carries the current key
            def gsel(gr):
                if gr not in gather_cache:
                    gather_cache[gr] = gr.gather_keys(gather)

                return key in gather_cache[gr] and (
                    outer_group_selector is None or outer_group_selector(gr))

            kwargs['trace_selector'] = tsel
            kwargs['group_selector'] = gsel

            for traces in self.chopper(*args, **kwargs):
                yield traces

            if pbar:
                pbar.update(ikey+1)

        if pbar:
            pbar.finish()

1407 

1408 def gather_keys(self, gather, selector=None): 

1409 keys = set() 

1410 for subpile in self.subpiles.values(): 

1411 keys |= subpile.gather_keys(gather, selector) 

1412 

1413 return sorted(keys) 

1414 

1415 def iter_traces( 

1416 self, 

1417 load_data=False, 

1418 return_abspath=False, 

1419 group_selector=None, 

1420 trace_selector=None): 

1421 

1422 ''' 

1423 Iterate over all traces in pile. 

1424 

1425 :param load_data: whether to load the waveform data, by default empty 

1426 traces are yielded 

1427 :param return_abspath: if ``True`` yield tuples containing absolute 

1428 file path and :py:class:`pyrocko.trace.Trace` objects 

1429 :param group_selector: filter callback taking :py:class:`TracesGroup` 

1430 objects 

1431 :param trace_selector: filter callback taking 

1432 :py:class:`pyrocko.trace.Trace` objects 

1433 

1434 Example; yields only traces, where the station code is 'HH1':: 

1435 

1436 test_pile = pile.make_pile('/local/test_trace_directory') 

1437 for t in test_pile.iter_traces( 

1438 trace_selector=lambda tr: tr.station=='HH1'): 

1439 

1440 print t 

1441 ''' 

1442 

1443 for subpile in self.subpiles.values(): 

1444 if not group_selector or group_selector(subpile): 

1445 for tr in subpile.iter_traces(load_data, return_abspath, 

1446 group_selector, trace_selector): 

1447 yield tr 

1448 

1449 def iter_files(self): 

1450 for subpile in self.subpiles.values(): 

1451 for file in subpile.iter_files(): 

1452 yield file 

1453 

1454 def reload_modified(self): 

1455 modified = False 

1456 for subpile in self.subpiles.values(): 

1457 modified |= subpile.reload_modified() 

1458 

1459 return modified 

1460 

    def get_tmin(self):
        '''
        Get the pile's global start time (``None`` if the pile is empty).
        '''
        return self.tmin

1463 

    def get_tmax(self):
        '''
        Get the pile's global end time (``None`` if the pile is empty).
        '''
        return self.tmax

1466 

    def get_deltatmin(self):
        '''
        Get the smallest sampling interval of all traces in the pile.
        '''
        return self.deltatmin

1469 

    def get_deltatmax(self):
        '''
        Get the largest sampling interval of all traces in the pile.
        '''
        return self.deltatmax

1472 

1473 def is_empty(self): 

1474 return self.tmin is None and self.tmax is None 

1475 

1476 def __str__(self): 

1477 if self.tmin is not None and self.tmax is not None: 

1478 tmin = util.time_to_str(self.tmin) 

1479 tmax = util.time_to_str(self.tmax) 

1480 s = 'Pile\n' 

1481 s += 'number of subpiles: %i\n' % len(self.subpiles) 

1482 s += 'timerange: %s - %s\n' % (tmin, tmax) 

1483 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys())) 

1484 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys())) 

1485 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys())) 

1486 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys())) 

1487 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys())) 

1488 

1489 else: 

1490 s = 'empty Pile' 

1491 

1492 return s 

1493 

    def snuffle(self, **kwargs):
        '''
        Visualize it.

        :param stations: list of `pyrocko.model.Station` objects or ``None``
        :param events: list of `pyrocko.model.Event` objects or ``None``
        :param markers: list of `pyrocko.gui_util.Marker` objects or ``None``
        :param ntracks: float, number of tracks to be shown initially
            (default: 12)
        :param follow: time interval (in seconds) for real time follow mode or
            ``None``
        :param controls: bool, whether to show the main controls (default:
            ``True``)
        :param opengl: bool, whether to use opengl (default: ``False``)
        '''

        # imported lazily so that GUI dependencies are only pulled in when
        # visualization is actually requested
        from pyrocko.gui.snuffler import snuffle
        snuffle(self, **kwargs)

1512 

1513 

def make_pile(
        paths=None, selector=None, regex=None,
        fileformat='mseed',
        cachedirname=None, show_progress=True):

    '''
    Create pile from given file and directory names.

    :param paths: filenames and/or directories to look for traces. If paths
        is ``None`` ``sys.argv[1:]`` is used.
    :param selector: lambda expression taking group dict of regex match
        object as a single argument and which returns true or false to keep
        or reject a file
    :param regex: regular expression which filenames have to match
    :param fileformat: format of the files ('mseed', 'sac', 'kan',
        'from_extension', 'detect')
    :param cachedirname: loader cache is stored under this directory. It is
        created as necessary.
    :param show_progress: show progress bar and other progress information
    '''

    if show_progress_force_off:
        show_progress = False

    if isinstance(paths, str):
        paths = [paths]

    if paths is None:
        paths = sys.argv[1:]

    if cachedirname is None:
        cachedirname = config.config().cache_dir

    filenames = util.select_files(
        paths, selector, regex, show_progress=show_progress)

    loader_cache = get_cache(cachedirname)
    result = Pile()
    result.load_files(
        sorted(filenames),
        cache=loader_cache,
        fileformat=fileformat,
        show_progress=show_progress)

    return result

1559 

1560 

class Injector(trace.States):

    '''
    Live-feed new trace data into a :py:class:`Pile`.

    Injected trace snippets (see :py:meth:`inject`) are appended to
    per-channel buffer traces held in the connected pile. When a buffer
    grows longer than ``fixation_length``, it is "fixated": optionally
    saved to file(s) under ``path`` and, unless ``forget_fixed`` is set,
    reloaded into the pile as regular file-backed data.
    '''

    def __init__(
            self, pile,
            fixation_length=None,
            path=None,
            format='from_extension',
            forget_fixed=False):

        '''
        :param pile: pile which receives the injected data
        :param fixation_length: buffer length in seconds after which data is
            fixated; ``None`` disables automatic fixation
        :param path: filename template used when fixated data is saved; if
            falsy, fixated buffers are simply dropped from the pile
        :param format: file format used when saving fixated data
        :param forget_fixed: if ``True``, fixated data is not reloaded into
            the pile after saving
        '''
        trace.States.__init__(self)
        self._pile = pile
        self._fixation_length = fixation_length
        self._format = format
        self._path = path
        self._forget_fixed = forget_fixed

    def set_fixation_length(self, length):
        '''
        Set length after which the fixation method is called on buffer traces.

        The length should be given in seconds. Give None to disable.
        '''
        self.fixate_all()
        self._fixation_length = length   # in seconds

    def set_save_path(
            self,
            path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
                 '%(tmin)s_%(tmax)s.mseed'):

        '''
        Set the filename template used when fixated data is saved.

        Existing buffers are fixated first, so already-buffered data still
        goes to the old path.
        '''
        self.fixate_all()
        self._path = path

    def inject(self, trace):
        '''
        Inject a new trace snippet into the pile's buffer for its channel.

        NOTE: the parameter name shadows the module-level ``trace`` import
        within this method.
        '''
        logger.debug('Received a trace: %s' % trace)

        buf = self.get(trace)
        if buf is None:
            # no buffer yet for this channel: start one from a copy
            trbuf = trace.copy()
            buf = MemTracesFile(None, [trbuf])
            self._pile.add_file(buf)
            self.set(trace, buf)

        else:
            # append to the existing buffer; the file is removed from and
            # re-added to the pile so the pile's lookup structures stay
            # consistent with the modified content
            self._pile.remove_file(buf)
            trbuf = buf.get_traces()[0]
            buf.remove(trbuf)
            trbuf.append(trace.ydata)
            buf.add(trbuf)
            self._pile.add_file(buf)
            self.set(trace, buf)

        trbuf = buf.get_traces()[0]
        if self._fixation_length is not None:
            if trbuf.tmax - trbuf.tmin > self._fixation_length:
                # buffer grew beyond the fixation length: fixate the full
                # chunks, keeping any incomplete tail buffered
                self._fixate(buf, complete=False)

    def fixate_all(self):
        '''
        Fixate all per-channel buffers and reset the internal state.
        '''
        for state in list(self._states.values()):
            self._fixate(state[-1])

        self._states = {}

    def free(self, buf):
        # called when a buffer is released; fixate whatever it still holds
        self._fixate(buf)

    def _fixate(self, buf, complete=True):
        '''
        Turn buffered data into fixed data.

        If a save ``path`` is configured, the buffer is chopped into chunks
        of ``fixation_length`` (aligned to the start of the year), saved and
        optionally reloaded into the pile. With ``complete=False`` an
        incomplete trailing chunk is kept buffered.
        '''
        trbuf = buf.get_traces()[0]
        del_state = True
        if self._path:
            if self._fixation_length is not None:
                # align chunk boundaries to a grid anchored at year start
                ttmin = trbuf.tmin
                ytmin = util.year_start(ttmin)
                n = int(math.floor((ttmin - ytmin) / self._fixation_length))
                tmin = ytmin + n*self._fixation_length
                traces = []
                t = tmin
                while t <= trbuf.tmax:
                    try:
                        traces.append(
                            trbuf.chop(
                                t,
                                t+self._fixation_length,
                                inplace=False,
                                snap=(math.ceil, math.ceil)))

                    except trace.NoData:
                        pass
                    t += self._fixation_length

                # NOTE(review): assumes at least one chunk was produced;
                # an empty `traces` list would raise IndexError here -
                # confirm callers only fixate non-empty buffers
                if abs(traces[-1].tmax - (t - trbuf.deltat)) < \
                        trbuf.deltat/100. or complete:

                    self._pile.remove_file(buf)

                else:  # reinsert incomplete last part
                    new_trbuf = traces.pop()
                    self._pile.remove_file(buf)
                    buf.remove(trbuf)
                    buf.add(new_trbuf)
                    self._pile.add_file(buf)
                    del_state = False

            else:
                # no fixation length: save the whole buffer as one trace
                traces = [trbuf]
                self._pile.remove_file(buf)

            fns = io.save(traces, self._path, format=self._format)

            if not self._forget_fixed:
                self._pile.load_files(
                    fns, show_progress=False, fileformat=self._format)

        if del_state:
            # presumably keyed like trace.States internals by channel id -
            # verify against the trace.States implementation
            del self._states[trbuf.nslc_id]

    def __del__(self):
        # best effort: flush remaining buffered data on garbage collection
        self.fixate_all()