1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5from __future__ import absolute_import, division, print_function 

6 

7import os 

8import logging 

9import time 

10import weakref 

11import copy 

12import re 

13import sys 

14import operator 

15import math 

16import hashlib 

17try: 

18 import cPickle as pickle 

19except ImportError: 

20 import pickle 

21 

22 

23from . import avl 

24from . import trace, io, util 

25from . import config 

26from .trace import degapper 

27 

28 

29is_windows = sys.platform.startswith('win') 

30show_progress_force_off = False 

31version_salt = 'v1-' 

32 

33 

34def ehash(s): 

35 return hashlib.sha1((version_salt + s).encode('utf8')).hexdigest() 

36 

37 

38def cmp(a, b): 

39 return int(a > b) - int(a < b) 

40 

41 

42def sl(s): 

43 return [str(x) for x in sorted(s)] 

44 

45 

46class Counter(dict): 

47 

48 def __missing__(self, k): 

49 return 0 

50 

51 def update(self, other): 

52 for k, v in other.items(): 

53 self[k] += v 

54 

55 def subtract(self, other): 

56 for k, v in other.items(): 

57 self[k] -= v 

58 if self[k] <= 0: 

59 del self[k] 

60 

61 def subtract1(self, k): 

62 self[k] -= 1 

63 if self[k] <= 0: 

64 del self[k] 

65 

66 

67def fix_unicode_copy(counter, func): 

68 counter_new = Counter() 

69 for k in counter: 

70 counter_new[func(k)] = counter[k] 

71 return counter_new 

72 

73 

74pjoin = os.path.join 

75logger = logging.getLogger('pyrocko.pile') 

76 

77 

78def avl_remove_exact(avltree, element): 

79 ilo, ihi = avltree.span(element) 

80 for i in range(ilo, ihi): 

81 if avltree[i] is element: 

82 avltree.remove_at(i) 

83 return i 

84 

85 raise ValueError( 

86 'avl_remove_exact(avltree, element): element not in avltree') 

87 

88 

89def cmpfunc(key): 

90 if isinstance(key, str): 

91 # special cases; these run about 50% faster than the generic one on 

92 # Python 2.5 

93 if key == 'tmin': 

94 return lambda a, b: cmp(a.tmin, b.tmin) 

95 if key == 'tmax': 

96 return lambda a, b: cmp(a.tmax, b.tmax) 

97 

98 key = operator.attrgetter(key) 

99 

100 return lambda a, b: cmp(key(a), key(b)) 

101 

102 

103g_dummys = {} 

104 

105 

106def get_dummy(key): 

107 if key not in g_dummys: 

108 class Dummy(object): 

109 def __init__(self, k): 

110 setattr(self, key, k) 

111 

112 g_dummys[key] = Dummy 

113 

114 return g_dummys[key] 

115 

116 

117class TooMany(Exception): 

118 def __init__(self, n): 

119 Exception.__init__(self) 

120 self.n = n 

121 

122 

123class Sorted(object): 

124 def __init__(self, values=[], key=None): 

125 self._set_key(key) 

126 self._avl = avl.new(values, self._cmp) 

127 

128 def _set_key(self, key): 

129 self._key = key 

130 self._cmp = cmpfunc(key) 

131 if isinstance(key, str): 

132 self._dummy = get_dummy(key) 

133 

134 def __getstate__(self): 

135 state = list(self._avl.iter()), self._key 

136 return state 

137 

138 def __setstate__(self, state): 

139 l, key = state 

140 self._set_key(key) 

141 self._avl = avl.from_iter(iter(l), len(l)) 

142 

143 def insert(self, value): 

144 self._avl.insert(value) 

145 

146 def remove(self, value): 

147 return avl_remove_exact(self._avl, value) 

148 

149 def remove_at(self, i): 

150 return self._avl.remove_at(i) 

151 

152 def insert_many(self, values): 

153 for value in values: 

154 self._avl.insert(value) 

155 

156 def remove_many(self, values): 

157 for value in values: 

158 avl_remove_exact(self._avl, value) 

159 

160 def __iter__(self): 

161 return iter(self._avl) 

162 

163 def with_key_in(self, kmin, kmax): 

164 omin, omax = self._dummy(kmin), self._dummy(kmax) 

165 ilo, ihi = self._avl.span(omin, omax) 

166 return self._avl[ilo:ihi] 

167 

168 def with_key_in_limited(self, kmin, kmax, nmax): 

169 omin, omax = self._dummy(kmin), self._dummy(kmax) 

170 ilo, ihi = self._avl.span(omin, omax) 

171 if ihi - ilo > nmax: 

172 raise TooMany(ihi - ilo) 

173 

174 return self._avl[ilo:ihi] 

175 

176 def index(self, value): 

177 ilo, ihi = self._avl.span(value) 

178 for i in range(ilo, ihi): 

179 if self._avl[i] is value: 

180 return i 

181 

182 raise ValueError('element is not in avl tree') 

183 

184 def min(self): 

185 return self._avl.min() 

186 

187 def max(self): 

188 return self._avl.max() 

189 

190 def __len__(self): 

191 return len(self._avl) 

192 

193 def __getitem__(self, i): 

194 return self._avl[i] 

195 

196 

197class TracesFileCache(object): 

198 ''' 

199 Manages trace metainformation cache. 

200 

201 For each directory with files containing traces, one cache file is 

202 maintained to hold the trace metainformation of all files which are 

203 contained in the directory. 

204 ''' 

205 

206 caches = {} 

207 

208 def __init__(self, cachedir): 

209 ''' 

210 Create new cache. 

211 

212 :param cachedir: directory to hold the cache files. 

213 ''' 

214 

215 self.cachedir = cachedir 

216 self.dircaches = {} 

217 self.modified = set() 

218 util.ensuredir(self.cachedir) 

219 

220 def get(self, abspath): 

221 ''' 

222 Try to get an item from the cache. 

223 

224 :param abspath: absolute path of the object to retrieve 

225 

226 :returns: a stored object is returned or None if nothing could be 

227 found. 

228 ''' 

229 

230 dircache = self._get_dircache_for(abspath) 

231 if abspath in dircache: 

232 return dircache[abspath] 

233 return None 

234 

235 def put(self, abspath, tfile): 

236 ''' 

237 Put an item into the cache. 

238 

239 :param abspath: absolute path of the object to be stored 

240 :param tfile: object to be stored 

241 ''' 

242 

243 cachepath = self._dircachepath(abspath) 

244 # get lock on cachepath here 

245 dircache = self._get_dircache(cachepath) 

246 dircache[abspath] = tfile 

247 self.modified.add(cachepath) 

248 

249 def dump_modified(self): 

250 ''' 

251 Save any modifications to disk. 

252 ''' 

253 

254 for cachepath in self.modified: 

255 self._dump_dircache(self.dircaches[cachepath], cachepath) 

256 # unlock 

257 

258 self.modified = set() 

259 

260 def clean(self): 

261 ''' 

262 Weed out missing files from the disk caches. 

263 ''' 

264 

265 self.dump_modified() 

266 

267 for fn in os.listdir(self.cachedir): 

268 if len(fn) == 40: 

269 cache = self._load_dircache(pjoin(self.cachedir, fn)) 

270 self._dump_dircache(cache, pjoin(self.cachedir, fn)) 

271 

272 def _get_dircache_for(self, abspath): 

273 return self._get_dircache(self._dircachepath(abspath)) 

274 

275 def _get_dircache(self, cachepath): 

276 if cachepath not in self.dircaches: 

277 if os.path.isfile(cachepath): 

278 self.dircaches[cachepath] = self._load_dircache(cachepath) 

279 else: 

280 self.dircaches[cachepath] = {} 

281 

282 return self.dircaches[cachepath] 

283 

284 def _dircachepath(self, abspath): 

285 cachefn = ehash(os.path.dirname(abspath)) 

286 return pjoin(self.cachedir, cachefn) 

287 

288 def _load_dircache(self, cachefilename): 

289 

290 with open(cachefilename, 'rb') as f: 

291 cache = pickle.load(f) 

292 

293 # weed out files which no longer exist 

294 for fn in list(cache.keys()): 

295 if not os.path.isfile(fn): 

296 del cache[fn] 

297 

298 time_float = util.get_time_float() 

299 

300 for v in cache.values(): 

301 v.trees_from_content(v.traces) 

302 for tr in v.traces: 

303 tr.file = v 

304 # fix Py2 codes to not include unicode when the cache file 

305 # was created with Py3 

306 if not isinstance(tr.station, str): 

307 tr.prune_from_reuse_cache() 

308 tr.set_codes( 

309 str(tr.network), 

310 str(tr.station), 

311 str(tr.location), 

312 str(tr.channel)) 

313 

314 tr.tmin = time_float(tr.tmin) 

315 tr.tmax = time_float(tr.tmax) 

316 

317 v.data_use_count = 0 

318 v.data_loaded = False 

319 v.fix_unicode_codes() 

320 

321 return cache 

322 

323 def _dump_dircache(self, cache, cachefilename): 

324 

325 if not cache: 

326 if os.path.exists(cachefilename): 

327 os.remove(cachefilename) 

328 return 

329 

330 # make a copy without the parents and the binsearch trees 

331 cache_copy = {} 

332 for fn in cache.keys(): 

333 trf = copy.copy(cache[fn]) 

334 trf.parent = None 

335 trf.by_tmin = None 

336 trf.by_tmax = None 

337 trf.by_tlen = None 

338 trf.by_mtime = None 

339 trf.data_use_count = 0 

340 trf.data_loaded = False 

341 traces = [] 

342 for tr in trf.traces: 

343 tr = tr.copy(data=False) 

344 tr.ydata = None 

345 tr.meta = None 

346 tr.file = trf 

347 traces.append(tr) 

348 

349 trf.traces = traces 

350 

351 cache_copy[fn] = trf 

352 

353 tmpfn = cachefilename+'.%i.tmp' % os.getpid() 

354 with open(tmpfn, 'wb') as f: 

355 pickle.dump(cache_copy, f, protocol=2) 

356 

357 if is_windows and os.path.exists(cachefilename): 

358 # windows doesn't allow to rename over existing file 

359 os.unlink(cachefilename) 

360 

361 os.rename(tmpfn, cachefilename) 

362 

363 

364def get_cache(cachedir): 

365 ''' 

366 Get global TracesFileCache object for given directory. 

367 ''' 

368 if cachedir not in TracesFileCache.caches: 

369 TracesFileCache.caches[cachedir] = TracesFileCache(cachedir) 

370 

371 return TracesFileCache.caches[cachedir] 

372 

373 

374def loader( 

375 filenames, fileformat, cache, filename_attributes, 

376 show_progress=True, update_progress=None): 

377 

378 if show_progress_force_off: 

379 show_progress = False 

380 

381 class Progress(object): 

382 def __init__(self, label, n): 

383 self._label = label 

384 self._n = n 

385 self._bar = None 

386 if show_progress: 

387 self._bar = util.progressbar(label, self._n) 

388 

389 if update_progress: 

390 update_progress(label, 0, self._n) 

391 

392 def update(self, i): 

393 if self._bar: 

394 if i < self._n-1: 

395 self._bar.update(i) 

396 else: 

397 self._bar.finish() 

398 self._bar = None 

399 

400 abort = False 

401 if update_progress: 

402 abort = update_progress(self._label, i, self._n) 

403 

404 return abort 

405 

406 if not filenames: 

407 logger.warning('No files to load from') 

408 return 

409 

410 regex = None 

411 if filename_attributes: 

412 regex = re.compile(filename_attributes) 

413 

414 progress = Progress('Looking at files', len(filenames)) 

415 

416 failures = [] 

417 to_load = [] 

418 for i, filename in enumerate(filenames): 

419 try: 

420 abspath = os.path.abspath(filename) 

421 

422 substitutions = None 

423 if regex: 

424 m = regex.search(filename) 

425 if not m: 

426 raise FilenameAttributeError( 

427 "Cannot get attributes with pattern '%s' " 

428 "from path '%s'" % (filename_attributes, filename)) 

429 

430 substitutions = {} 

431 for k in m.groupdict(): 

432 if k in ('network', 'station', 'location', 'channel'): 

433 substitutions[k] = m.groupdict()[k] 

434 

435 mtime = os.stat(filename)[8] 

436 tfile = None 

437 if cache: 

438 tfile = cache.get(abspath) 

439 

440 mustload = ( 

441 not tfile or 

442 (tfile.format != fileformat and fileformat != 'detect') or 

443 tfile.mtime != mtime or 

444 substitutions is not None) 

445 

446 to_load.append((mustload, mtime, abspath, substitutions, tfile)) 

447 

448 except (OSError, FilenameAttributeError) as xerror: 

449 failures.append(abspath) 

450 logger.warning(xerror) 

451 

452 abort = progress.update(i+1) 

453 if abort: 

454 progress.update(len(filenames)) 

455 return 

456 

457 progress.update(len(filenames)) 

458 

459 to_load.sort(key=lambda x: x[2]) 

460 

461 nload = len([1 for x in to_load if x[0]]) 

462 iload = 0 

463 

464 count_all = False 

465 if nload < 0.01*len(to_load): 

466 nload = len(to_load) 

467 count_all = True 

468 

469 if to_load: 

470 progress = Progress('Scanning files', nload) 

471 

472 for (mustload, mtime, abspath, substitutions, tfile) in to_load: 

473 try: 

474 if mustload: 

475 tfile = TracesFile( 

476 None, abspath, fileformat, 

477 substitutions=substitutions, mtime=mtime) 

478 

479 if cache and not substitutions: 

480 cache.put(abspath, tfile) 

481 

482 if not count_all: 

483 iload += 1 

484 

485 if count_all: 

486 iload += 1 

487 

488 except (io.FileLoadError, OSError) as xerror: 

489 failures.append(abspath) 

490 logger.warning(xerror) 

491 else: 

492 yield tfile 

493 

494 abort = progress.update(iload+1) 

495 if abort: 

496 break 

497 

498 progress.update(nload) 

499 

500 if failures: 

501 logger.warning( 

502 'The following file%s caused problems and will be ignored:\n' % 

503 util.plural_s(len(failures)) + '\n'.join(failures)) 

504 

505 if cache: 

506 cache.dump_modified() 

507 

508 

509def tlen(x): 

510 return x.tmax-x.tmin 

511 

512 

513class TracesGroup(object): 

514 

515 ''' 

516 Trace container base class. 

517 

518 Base class for Pile, SubPile, and TracesFile, i.e. anything containing 

519 a collection of several traces. A TracesGroup object maintains lookup sets 

520 of some of the traces meta-information, as well as a combined time-range 

521 of its contents. 

522 ''' 

523 

524 def __init__(self, parent): 

525 self.parent = parent 

526 self.empty() 

527 self.nupdates = 0 

528 self.abspath = None 

529 

530 def set_parent(self, parent): 

531 self.parent = parent 

532 

533 def get_parent(self): 

534 return self.parent 

535 

536 def empty(self): 

537 self.networks, self.stations, self.locations, self.channels, \ 

538 self.nslc_ids, self.deltats = [Counter() for x in range(6)] 

539 self.by_tmin = Sorted([], 'tmin') 

540 self.by_tmax = Sorted([], 'tmax') 

541 self.by_tlen = Sorted([], tlen) 

542 self.by_mtime = Sorted([], 'mtime') 

543 self.tmin, self.tmax = None, None 

544 self.deltatmin, self.deltatmax = None, None 

545 

546 def trees_from_content(self, content): 

547 self.by_tmin = Sorted(content, 'tmin') 

548 self.by_tmax = Sorted(content, 'tmax') 

549 self.by_tlen = Sorted(content, tlen) 

550 self.by_mtime = Sorted(content, 'mtime') 

551 self.adjust_minmax() 

552 

553 def fix_unicode_codes(self): 

554 for net in self.networks: 

555 if isinstance(net, str): 

556 return 

557 

558 self.networks = fix_unicode_copy(self.networks, str) 

559 self.stations = fix_unicode_copy(self.stations, str) 

560 self.locations = fix_unicode_copy(self.locations, str) 

561 self.channels = fix_unicode_copy(self.channels, str) 

562 self.nslc_ids = fix_unicode_copy( 

563 self.nslc_ids, lambda k: tuple(str(x) for x in k)) 

564 

565 def add(self, content): 

566 ''' 

567 Add content to traces group and update indices. 

568 

569 Accepts :py:class:`pyrocko.trace.Trace` objects and 

570 :py:class:`pyrocko.pile.TracesGroup` objects. 

571 ''' 

572 

573 if isinstance(content, (trace.Trace, TracesGroup)): 

574 content = [content] 

575 

576 for c in content: 

577 

578 if isinstance(c, TracesGroup): 

579 self.networks.update(c.networks) 

580 self.stations.update(c.stations) 

581 self.locations.update(c.locations) 

582 self.channels.update(c.channels) 

583 self.nslc_ids.update(c.nslc_ids) 

584 self.deltats.update(c.deltats) 

585 

586 self.by_tmin.insert_many(c.by_tmin) 

587 self.by_tmax.insert_many(c.by_tmax) 

588 self.by_tlen.insert_many(c.by_tlen) 

589 self.by_mtime.insert_many(c.by_mtime) 

590 

591 elif isinstance(c, trace.Trace): 

592 self.networks[c.network] += 1 

593 self.stations[c.station] += 1 

594 self.locations[c.location] += 1 

595 self.channels[c.channel] += 1 

596 self.nslc_ids[c.nslc_id] += 1 

597 self.deltats[c.deltat] += 1 

598 

599 self.by_tmin.insert(c) 

600 self.by_tmax.insert(c) 

601 self.by_tlen.insert(c) 

602 self.by_mtime.insert(c) 

603 

604 self.adjust_minmax() 

605 

606 self.nupdates += 1 

607 self.notify_listeners('add') 

608 

609 if self.parent is not None: 

610 self.parent.add(content) 

611 

612 def remove(self, content): 

613 ''' 

614 Remove content to traces group and update indices. 

615 ''' 

616 if isinstance(content, (trace.Trace, TracesGroup)): 

617 content = [content] 

618 

619 for c in content: 

620 

621 if isinstance(c, TracesGroup): 

622 self.networks.subtract(c.networks) 

623 self.stations.subtract(c.stations) 

624 self.locations.subtract(c.locations) 

625 self.channels.subtract(c.channels) 

626 self.nslc_ids.subtract(c.nslc_ids) 

627 self.deltats.subtract(c.deltats) 

628 

629 self.by_tmin.remove_many(c.by_tmin) 

630 self.by_tmax.remove_many(c.by_tmax) 

631 self.by_tlen.remove_many(c.by_tlen) 

632 self.by_mtime.remove_many(c.by_mtime) 

633 

634 elif isinstance(c, trace.Trace): 

635 self.networks.subtract1(c.network) 

636 self.stations.subtract1(c.station) 

637 self.locations.subtract1(c.location) 

638 self.channels.subtract1(c.channel) 

639 self.nslc_ids.subtract1(c.nslc_id) 

640 self.deltats.subtract1(c.deltat) 

641 

642 self.by_tmin.remove(c) 

643 self.by_tmax.remove(c) 

644 self.by_tlen.remove(c) 

645 self.by_mtime.remove(c) 

646 

647 self.adjust_minmax() 

648 

649 self.nupdates += 1 

650 self.notify_listeners('remove') 

651 

652 if self.parent is not None: 

653 self.parent.remove(content) 

654 

655 def relevant(self, tmin, tmax, group_selector=None, trace_selector=None): 

656 ''' 

657 Return list of :py:class:`pyrocko.trace.Trace` objects where given 

658 arguments ``tmin`` and ``tmax`` match. 

659 

660 :param tmin: start time 

661 :param tmax: end time 

662 :param group_selector: lambda expression taking group dict of regex 

663 match object as a single argument and which returns true or false 

664 to keep or reject a file (default: ``None``) 

665 :param trace_selector: lambda expression taking group dict of regex 

666 match object as a single argument and which returns true or false 

667 to keep or reject a file (default: ``None``) 

668 ''' 

669 

670 if not self.by_tmin or not self.is_relevant( 

671 tmin, tmax, group_selector): 

672 

673 return [] 

674 

675 return [tr for tr in self.by_tmin.with_key_in(tmin-self.tlenmax, tmax) 

676 if tr.is_relevant(tmin, tmax, trace_selector)] 

677 

678 def adjust_minmax(self): 

679 if self.by_tmin: 

680 self.tmin = self.by_tmin.min().tmin 

681 self.tmax = self.by_tmax.max().tmax 

682 t = self.by_tlen.max() 

683 self.tlenmax = t.tmax - t.tmin 

684 self.mtime = self.by_mtime.max().mtime 

685 deltats = list(self.deltats.keys()) 

686 self.deltatmin = min(deltats) 

687 self.deltatmax = max(deltats) 

688 else: 

689 self.tmin = None 

690 self.tmax = None 

691 self.tlenmax = None 

692 self.mtime = None 

693 self.deltatmin = None 

694 self.deltatmax = None 

695 

696 def notify_listeners(self, what): 

697 pass 

698 

699 def get_update_count(self): 

700 return self.nupdates 

701 

702 def overlaps(self, tmin, tmax): 

703 return self.tmin is not None \ 

704 and tmax >= self.tmin and self.tmax >= tmin 

705 

706 def is_relevant(self, tmin, tmax, group_selector=None): 

707 if self.tmin is None or self.tmax is None: 

708 return False 

709 return tmax >= self.tmin and self.tmax >= tmin and ( 

710 group_selector is None or group_selector(self)) 

711 

712 

713class MemTracesFile(TracesGroup): 

714 

715 ''' 

716 This is needed to make traces without an actual disc file to be inserted 

717 into a Pile. 

718 ''' 

719 

720 def __init__(self, parent, traces): 

721 TracesGroup.__init__(self, parent) 

722 self.add(traces) 

723 self.mtime = time.time() 

724 

725 def add(self, traces): 

726 if isinstance(traces, trace.Trace): 

727 traces = [traces] 

728 

729 for tr in traces: 

730 tr.file = self 

731 

732 TracesGroup.add(self, traces) 

733 

734 def load_headers(self, mtime=None): 

735 pass 

736 

737 def load_data(self): 

738 pass 

739 

740 def use_data(self): 

741 pass 

742 

743 def drop_data(self): 

744 pass 

745 

746 def reload_if_modified(self): 

747 return False 

748 

749 def iter_traces(self): 

750 for tr in self.by_tmin: 

751 yield tr 

752 

753 def get_traces(self): 

754 return list(self.by_tmin) 

755 

756 def gather_keys(self, gather, selector=None): 

757 keys = set() 

758 for tr in self.by_tmin: 

759 if selector is None or selector(tr): 

760 keys.add(gather(tr)) 

761 

762 return keys 

763 

764 def __str__(self): 

765 

766 s = 'MemTracesFile\n' 

767 s += 'file mtime: %s\n' % util.time_to_str(self.mtime) 

768 s += 'number of traces: %i\n' % len(self.by_tmin) 

769 s += 'timerange: %s - %s\n' % ( 

770 util.time_to_str(self.tmin), util.time_to_str(self.tmax)) 

771 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys())) 

772 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys())) 

773 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys())) 

774 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys())) 

775 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys())) 

776 return s 

777 

778 

779class TracesFile(TracesGroup): 

780 def __init__( 

781 self, parent, abspath, format, 

782 substitutions=None, mtime=None): 

783 

784 TracesGroup.__init__(self, parent) 

785 self.abspath = abspath 

786 self.format = format 

787 self.traces = [] 

788 self.data_loaded = False 

789 self.data_use_count = 0 

790 self.substitutions = substitutions 

791 self.load_headers(mtime=mtime) 

792 self.mtime = mtime 

793 

794 def load_headers(self, mtime=None): 

795 logger.debug('loading headers from file: %s' % self.abspath) 

796 if mtime is None: 

797 self.mtime = os.stat(self.abspath)[8] 

798 

799 def kgen(tr): 

800 return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id 

801 

802 self.remove(self.traces) 

803 ks = set() 

804 for tr in io.load(self.abspath, 

805 format=self.format, 

806 getdata=False, 

807 substitutions=self.substitutions): 

808 

809 k = kgen(tr) 

810 if k not in ks: 

811 ks.add(k) 

812 self.traces.append(tr) 

813 tr.file = self 

814 

815 self.add(self.traces) 

816 

817 self.data_loaded = False 

818 self.data_use_count = 0 

819 

820 def load_data(self, force=False): 

821 file_changed = False 

822 if not self.data_loaded or force: 

823 logger.debug('loading data from file: %s' % self.abspath) 

824 

825 def kgen(tr): 

826 return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id 

827 

828 traces_ = io.load(self.abspath, format=self.format, getdata=True, 

829 substitutions=self.substitutions) 

830 

831 # prevent adding duplicate snippets from corrupt mseed files 

832 k_loaded = set() 

833 traces = [] 

834 for tr in traces_: 

835 k = kgen(tr) 

836 if k not in k_loaded: 

837 k_loaded.add(k) 

838 traces.append(tr) 

839 

840 k_current_d = dict((kgen(tr), tr) for tr in self.traces) 

841 k_current = set(k_current_d) 

842 k_new = k_loaded - k_current 

843 k_delete = k_current - k_loaded 

844 k_unchanged = k_current & k_loaded 

845 

846 for tr in self.traces[:]: 

847 if kgen(tr) in k_delete: 

848 self.remove(tr) 

849 self.traces.remove(tr) 

850 tr.file = None 

851 file_changed = True 

852 

853 for tr in traces: 

854 if kgen(tr) in k_new: 

855 tr.file = self 

856 self.traces.append(tr) 

857 self.add(tr) 

858 file_changed = True 

859 

860 for tr in traces: 

861 if kgen(tr) in k_unchanged: 

862 ctr = k_current_d[kgen(tr)] 

863 ctr.ydata = tr.ydata 

864 

865 self.data_loaded = True 

866 

867 if file_changed: 

868 logger.debug('reloaded (file may have changed): %s' % self.abspath) 

869 

870 return file_changed 

871 

872 def use_data(self): 

873 if not self.data_loaded: 

874 raise Exception('Data not loaded') 

875 self.data_use_count += 1 

876 

877 def drop_data(self): 

878 if self.data_loaded: 

879 if self.data_use_count == 1: 

880 logger.debug('forgetting data of file: %s' % self.abspath) 

881 for tr in self.traces: 

882 tr.drop_data() 

883 

884 self.data_loaded = False 

885 

886 self.data_use_count -= 1 

887 else: 

888 self.data_use_count = 0 

889 

890 def reload_if_modified(self): 

891 mtime = os.stat(self.abspath)[8] 

892 if mtime != self.mtime: 

893 logger.debug( 

894 'mtime=%i, reloading file: %s' % (mtime, self.abspath)) 

895 

896 self.mtime = mtime 

897 if self.data_loaded: 

898 self.load_data(force=True) 

899 else: 

900 self.load_headers() 

901 

902 return True 

903 

904 return False 

905 

906 def iter_traces(self): 

907 for tr in self.traces: 

908 yield tr 

909 

910 def gather_keys(self, gather, selector=None): 

911 keys = set() 

912 for tr in self.by_tmin: 

913 if selector is None or selector(tr): 

914 keys.add(gather(tr)) 

915 

916 return keys 

917 

918 def __str__(self): 

919 s = 'TracesFile\n' 

920 s += 'abspath: %s\n' % self.abspath 

921 s += 'file mtime: %s\n' % util.time_to_str(self.mtime) 

922 s += 'number of traces: %i\n' % len(self.traces) 

923 s += 'timerange: %s - %s\n' % ( 

924 util.time_to_str(self.tmin), util.time_to_str(self.tmax)) 

925 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys())) 

926 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys())) 

927 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys())) 

928 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys())) 

929 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys())) 

930 return s 

931 

932 

933class FilenameAttributeError(Exception): 

934 pass 

935 

936 

937class SubPile(TracesGroup): 

938 def __init__(self, parent): 

939 TracesGroup.__init__(self, parent) 

940 self.files = [] 

941 self.empty() 

942 

943 def add_file(self, file): 

944 self.files.append(file) 

945 file.set_parent(self) 

946 self.add(file) 

947 

948 def remove_file(self, file): 

949 self.files.remove(file) 

950 file.set_parent(None) 

951 self.remove(file) 

952 

953 def remove_files(self, files): 

954 for file in files: 

955 self.files.remove(file) 

956 file.set_parent(None) 

957 self.remove(files) 

958 

959 def gather_keys(self, gather, selector=None): 

960 keys = set() 

961 for file in self.files: 

962 keys |= file.gather_keys(gather, selector) 

963 

964 return keys 

965 

966 def iter_traces( 

967 self, 

968 load_data=False, 

969 return_abspath=False, 

970 group_selector=None, 

971 trace_selector=None): 

972 

973 for file in self.files: 

974 

975 if group_selector and not group_selector(file): 

976 continue 

977 

978 must_drop = False 

979 if load_data: 

980 file.load_data() 

981 file.use_data() 

982 must_drop = True 

983 

984 for tr in file.iter_traces(): 

985 if trace_selector and not trace_selector(tr): 

986 continue 

987 

988 if return_abspath: 

989 yield file.abspath, tr 

990 else: 

991 yield tr 

992 

993 if must_drop: 

994 file.drop_data() 

995 

996 def iter_files(self): 

997 for file in self.files: 

998 yield file 

999 

1000 def reload_modified(self): 

1001 modified = False 

1002 for file in self.files: 

1003 modified |= file.reload_if_modified() 

1004 

1005 return modified 

1006 

1007 def __str__(self): 

1008 s = 'SubPile\n' 

1009 s += 'number of files: %i\n' % len(self.files) 

1010 s += 'timerange: %s - %s\n' % ( 

1011 util.time_to_str(self.tmin), util.time_to_str(self.tmax)) 

1012 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys())) 

1013 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys())) 

1014 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys())) 

1015 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys())) 

1016 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys())) 

1017 return s 

1018 

1019 

1020class Batch(object): 

1021 ''' 

1022 Batch of waveforms from window wise data extraction. 

1023 

1024 Encapsulates state and results yielded for each window in window wise 

1025 waveform extraction with the :py:meth:`Pile.chopper` method (when the 

1026 `style='batch'` keyword argument set). 

1027 

1028 *Attributes:* 

1029 

1030 .. py:attribute:: tmin 

1031 

1032 Start of this time window. 

1033 

1034 .. py:attribute:: tmax 

1035 

1036 End of this time window. 

1037 

1038 .. py:attribute:: i 

1039 

1040 Index of this time window in sequence. 

1041 

1042 .. py:attribute:: n 

1043 

1044 Total number of time windows in sequence. 

1045 

1046 .. py:attribute:: traces 

1047 

1048 Extracted waveforms for this time window. 

1049 ''' 

1050 

1051 def __init__(self, tmin, tmax, i, n, traces): 

1052 self.tmin = tmin 

1053 self.tmax = tmax 

1054 self.i = i 

1055 self.n = n 

1056 self.traces = traces 

1057 

1058 

1059class Pile(TracesGroup): 

1060 ''' 

1061 Waveform archive lookup, data loading and caching infrastructure. 

1062 ''' 

1063 

1064 def __init__(self): 

1065 TracesGroup.__init__(self, None) 

1066 self.subpiles = {} 

1067 self.open_files = {} 

1068 self.listeners = [] 

1069 self.abspaths = set() 

1070 

1071 def add_listener(self, obj): 

1072 self.listeners.append(weakref.ref(obj)) 

1073 

1074 def notify_listeners(self, what): 

1075 for ref in self.listeners: 

1076 obj = ref() 

1077 if obj: 

1078 obj.pile_changed(what) 

1079 

1080 def load_files( 

1081 self, filenames, 

1082 filename_attributes=None, 

1083 fileformat='mseed', 

1084 cache=None, 

1085 show_progress=True, 

1086 update_progress=None): 

1087 

1088 load = loader( 

1089 filenames, fileformat, cache, filename_attributes, 

1090 show_progress=show_progress, 

1091 update_progress=update_progress) 

1092 

1093 self.add_files(load) 

1094 

1095 def add_files(self, files): 

1096 for file in files: 

1097 self.add_file(file) 

1098 

1099 def add_file(self, file): 

1100 if file.abspath is not None and file.abspath in self.abspaths: 

1101 logger.warning('File already in pile: %s' % file.abspath) 

1102 return 

1103 

1104 if file.deltatmin is None: 

1105 logger.warning('Sampling rate of all traces are zero in file: %s' % 

1106 file.abspath) 

1107 return 

1108 

1109 subpile = self.dispatch(file) 

1110 subpile.add_file(file) 

1111 if file.abspath is not None: 

1112 self.abspaths.add(file.abspath) 

1113 

1114 def remove_file(self, file): 

1115 subpile = file.get_parent() 

1116 if subpile is not None: 

1117 subpile.remove_file(file) 

1118 if file.abspath is not None: 

1119 self.abspaths.remove(file.abspath) 

1120 

1121 def remove_files(self, files): 

1122 subpile_files = {} 

1123 for file in files: 

1124 subpile = file.get_parent() 

1125 if subpile not in subpile_files: 

1126 subpile_files[subpile] = [] 

1127 

1128 subpile_files[subpile].append(file) 

1129 

1130 for subpile, files in subpile_files.items(): 

1131 subpile.remove_files(files) 

1132 for file in files: 

1133 if file.abspath is not None: 

1134 self.abspaths.remove(file.abspath) 

1135 

1136 def dispatch_key(self, file): 

1137 dt = int(math.floor(math.log(file.deltatmin))) 

1138 return dt 

1139 

1140 def dispatch(self, file): 

1141 k = self.dispatch_key(file) 

1142 if k not in self.subpiles: 

1143 self.subpiles[k] = SubPile(self) 

1144 

1145 return self.subpiles[k] 

1146 

1147 def get_deltats(self): 

1148 return list(self.deltats.keys()) 

1149 

1150 def chop( 

1151 self, tmin, tmax, 

1152 group_selector=None, 

1153 trace_selector=None, 

1154 snap=(round, round), 

1155 include_last=False, 

1156 load_data=True): 

1157 

1158 chopped = [] 

1159 used_files = set() 

1160 

1161 traces = self.relevant(tmin, tmax, group_selector, trace_selector) 

1162 if load_data: 

1163 files_changed = False 

1164 for tr in traces: 

1165 if tr.file and tr.file not in used_files: 

1166 if tr.file.load_data(): 

1167 files_changed = True 

1168 

1169 if tr.file is not None: 

1170 used_files.add(tr.file) 

1171 

1172 if files_changed: 

1173 traces = self.relevant( 

1174 tmin, tmax, group_selector, trace_selector) 

1175 

1176 for tr in traces: 

1177 if not load_data and tr.ydata is not None: 

1178 tr = tr.copy(data=False) 

1179 tr.ydata = None 

1180 

1181 try: 

1182 chopped.append(tr.chop( 

1183 tmin, tmax, 

1184 inplace=False, 

1185 snap=snap, 

1186 include_last=include_last)) 

1187 

1188 except trace.NoData: 

1189 pass 

1190 

1191 return chopped, used_files 

1192 

1193 def _process_chopped( 

1194 self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin, 

1195 tpad): 

1196 

1197 chopped.sort(key=lambda a: a.full_id) 

1198 if degap: 

1199 chopped = degapper(chopped, maxgap=maxgap, maxlap=maxlap) 

1200 

1201 if not want_incomplete: 

1202 chopped_weeded = [] 

1203 for tr in chopped: 

1204 emin = tr.tmin - (wmin-tpad) 

1205 emax = tr.tmax + tr.deltat - (wmax+tpad) 

1206 if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat): 

1207 chopped_weeded.append(tr) 

1208 

1209 elif degap: 

1210 if (0. < emin <= 5. * tr.deltat and 

1211 -5. * tr.deltat <= emax < 0.): 

1212 

1213 tr.extend( 

1214 wmin-tpad, 

1215 wmax+tpad-tr.deltat, 

1216 fillmethod='repeat') 

1217 

1218 chopped_weeded.append(tr) 

1219 

1220 chopped = chopped_weeded 

1221 

1222 for tr in chopped: 

1223 tr.wmin = wmin 

1224 tr.wmax = wmax 

1225 

1226 return chopped 

1227 

1228 def chopper( 

1229 self, 

1230 tmin=None, tmax=None, tinc=None, tpad=0., 

1231 group_selector=None, trace_selector=None, 

1232 want_incomplete=True, degap=True, maxgap=5, maxlap=None, 

1233 keep_current_files_open=False, accessor_id=None, 

1234 snap=(round, round), include_last=False, load_data=True, 

1235 style=None): 

1236 

1237 ''' 

1238 Get iterator for shifting window wise data extraction from waveform 

1239 archive. 

1240 

1241 :param tmin: start time (default uses start time of available data) 

1242 :param tmax: end time (default uses end time of available data) 

1243 :param tinc: time increment (window shift time) (default uses 

1244 ``tmax-tmin``) 

1245 :param tpad: padding time appended on either side of the data windows 

1246 (window overlap is ``2*tpad``) 

1247 :param group_selector: filter callback taking :py:class:`TracesGroup` 

1248 objects 

1249 :param trace_selector: filter callback taking 

1250 :py:class:`pyrocko.trace.Trace` objects 

1251 :param want_incomplete: if set to ``False``, gappy/incomplete traces 

1252 are discarded from the results 

1253 :param degap: whether to try to connect traces and to remove gaps and 

1254 overlaps 

1255 :param maxgap: maximum gap size in samples which is filled with 

1256 interpolated samples when ``degap`` is ``True`` 

1257 :param maxlap: maximum overlap size in samples which is removed when 

1258 ``degap`` is ``True`` 

1259 :param keep_current_files_open: whether to keep cached trace data in 

1260 memory after the iterator has ended 

1261 :param accessor_id: if given, used as a key to identify different 

1262 points of extraction for the decision of when to release cached 

1263 trace data (should be used when data is alternately extracted from 

1264 more than one region / selection) 

1265 :param snap: replaces Python's :py:func:`round` function which is used 

1266 to determine indices where to start and end the trace data array 

1267 :param include_last: whether to include last sample 

1268 :param load_data: whether to load the waveform data. If set to 

1269 ``False``, traces with no data samples, but with correct 

1270 meta-information are returned 

1271 :param style: set to ``'batch'`` to yield waveforms and information 

1272 about the chopper state as :py:class:`Batch` objects. By default 

1273 lists of :py:class:`pyrocko.trace.Trace` objects are yielded. 

1274 :returns: iterator providing extracted waveforms for each extracted 

1275 window. See ``style`` argument for details. 

1276 ''' 

1277 if tmin is None: 

1278 if self.tmin is None: 

1279 logger.warning('Pile\'s tmin is not set - pile may be empty.') 

1280 return 

1281 tmin = self.tmin + tpad 

1282 

1283 if tmax is None: 

1284 if self.tmax is None: 

1285 logger.warning('Pile\'s tmax is not set - pile may be empty.') 

1286 return 

1287 tmax = self.tmax - tpad 

1288 

1289 if not self.is_relevant(tmin-tpad, tmax+tpad, group_selector): 

1290 return 

1291 

1292 if accessor_id not in self.open_files: 

1293 self.open_files[accessor_id] = set() 

1294 

1295 open_files = self.open_files[accessor_id] 

1296 

1297 if tinc is None: 

1298 tinc = tmax - tmin 

1299 nwin = 1 

1300 else: 

1301 eps = tinc * 1e-6 

1302 if tinc != 0.0: 

1303 nwin = int(((tmax - eps) - tmin) / tinc) + 1 

1304 else: 

1305 nwin = 1 

1306 

1307 for iwin in range(nwin): 

1308 wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax) 

1309 

1310 chopped, used_files = self.chop( 

1311 wmin-tpad, wmax+tpad, group_selector, trace_selector, snap, 

1312 include_last, load_data) 

1313 

1314 for file in used_files - open_files: 

1315 # increment datause counter on newly opened files 

1316 file.use_data() 

1317 

1318 open_files.update(used_files) 

1319 

1320 processed = self._process_chopped( 

1321 chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin, 

1322 tpad) 

1323 

1324 if style == 'batch': 

1325 yield Batch( 

1326 tmin=wmin, 

1327 tmax=wmax, 

1328 i=iwin, 

1329 n=nwin, 

1330 traces=processed) 

1331 

1332 else: 

1333 yield processed 

1334 

1335 unused_files = open_files - used_files 

1336 

1337 while unused_files: 

1338 file = unused_files.pop() 

1339 file.drop_data() 

1340 open_files.remove(file) 

1341 

1342 if not keep_current_files_open: 

1343 while open_files: 

1344 file = open_files.pop() 

1345 file.drop_data() 

1346 

1347 def all(self, *args, **kwargs): 

1348 ''' 

1349 Shortcut to aggregate :py:meth:`chopper` output into a single list. 

1350 ''' 

1351 

1352 alltraces = [] 

1353 for traces in self.chopper(*args, **kwargs): 

1354 alltraces.extend(traces) 

1355 

1356 return alltraces 

1357 

1358 def iter_all(self, *args, **kwargs): 

1359 for traces in self.chopper(*args, **kwargs): 

1360 for tr in traces: 

1361 yield tr 

1362 

1363 def chopper_grouped(self, gather, progress=None, *args, **kwargs): 

1364 keys = self.gather_keys(gather) 

1365 if len(keys) == 0: 

1366 return 

1367 

1368 outer_group_selector = None 

1369 if 'group_selector' in kwargs: 

1370 outer_group_selector = kwargs['group_selector'] 

1371 

1372 outer_trace_selector = None 

1373 if 'trace_selector' in kwargs: 

1374 outer_trace_selector = kwargs['trace_selector'] 

1375 

1376 # the use of this gather-cache makes it impossible to modify the pile 

1377 # during chopping 

1378 gather_cache = {} 

1379 pbar = None 

1380 if progress is not None: 

1381 pbar = util.progressbar(progress, len(keys)) 

1382 

1383 for ikey, key in enumerate(keys): 

1384 def tsel(tr): 

1385 return gather(tr) == key and (outer_trace_selector is None or 

1386 outer_trace_selector(tr)) 

1387 

1388 def gsel(gr): 

1389 if gr not in gather_cache: 

1390 gather_cache[gr] = gr.gather_keys(gather) 

1391 

1392 return key in gather_cache[gr] and ( 

1393 outer_group_selector is None or outer_group_selector(gr)) 

1394 

1395 kwargs['trace_selector'] = tsel 

1396 kwargs['group_selector'] = gsel 

1397 

1398 for traces in self.chopper(*args, **kwargs): 

1399 yield traces 

1400 

1401 if pbar: 

1402 pbar.update(ikey+1) 

1403 

1404 if pbar: 

1405 pbar.finish() 

1406 

1407 def gather_keys(self, gather, selector=None): 

1408 keys = set() 

1409 for subpile in self.subpiles.values(): 

1410 keys |= subpile.gather_keys(gather, selector) 

1411 

1412 return sorted(keys) 

1413 

1414 def iter_traces( 

1415 self, 

1416 load_data=False, 

1417 return_abspath=False, 

1418 group_selector=None, 

1419 trace_selector=None): 

1420 

1421 ''' 

1422 Iterate over all traces in pile. 

1423 

1424 :param load_data: whether to load the waveform data, by default empty 

1425 traces are yielded 

1426 :param return_abspath: if ``True`` yield tuples containing absolute 

1427 file path and :py:class:`pyrocko.trace.Trace` objects 

1428 :param group_selector: filter callback taking :py:class:`TracesGroup` 

1429 objects 

1430 :param trace_selector: filter callback taking 

1431 :py:class:`pyrocko.trace.Trace` objects 

1432 

1433 Example; yields only traces, where the station code is 'HH1':: 

1434 

1435 test_pile = pile.make_pile('/local/test_trace_directory') 

1436 for t in test_pile.iter_traces( 

1437 trace_selector=lambda tr: tr.station=='HH1'): 

1438 

1439 print t 

1440 ''' 

1441 

1442 for subpile in self.subpiles.values(): 

1443 if not group_selector or group_selector(subpile): 

1444 for tr in subpile.iter_traces(load_data, return_abspath, 

1445 group_selector, trace_selector): 

1446 yield tr 

1447 

1448 def iter_files(self): 

1449 for subpile in self.subpiles.values(): 

1450 for file in subpile.iter_files(): 

1451 yield file 

1452 

1453 def reload_modified(self): 

1454 modified = False 

1455 for subpile in self.subpiles.values(): 

1456 modified |= subpile.reload_modified() 

1457 

1458 return modified 

1459 

1460 def get_tmin(self): 

1461 return self.tmin 

1462 

1463 def get_tmax(self): 

1464 return self.tmax 

1465 

1466 def get_deltatmin(self): 

1467 return self.deltatmin 

1468 

1469 def get_deltatmax(self): 

1470 return self.deltatmax 

1471 

1472 def is_empty(self): 

1473 return self.tmin is None and self.tmax is None 

1474 

1475 def __str__(self): 

1476 if self.tmin is not None and self.tmax is not None: 

1477 tmin = util.time_to_str(self.tmin) 

1478 tmax = util.time_to_str(self.tmax) 

1479 s = 'Pile\n' 

1480 s += 'number of subpiles: %i\n' % len(self.subpiles) 

1481 s += 'timerange: %s - %s\n' % (tmin, tmax) 

1482 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys())) 

1483 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys())) 

1484 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys())) 

1485 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys())) 

1486 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys())) 

1487 

1488 else: 

1489 s = 'empty Pile' 

1490 

1491 return s 

1492 

1493 def snuffle(self, **kwargs): 

1494 ''' 

1495 Visualize it. 

1496 

1497 :param stations: list of `pyrocko.model.Station` objects or ``None`` 

1498 :param events: list of `pyrocko.model.Event` objects or ``None`` 

1499 :param markers: list of `pyrocko.gui_util.Marker` objects or ``None`` 

1500 :param ntracks: float, number of tracks to be shown initially 

1501 (default: 12) 

1502 :param follow: time interval (in seconds) for real time follow mode or 

1503 ``None`` 

1504 :param controls: bool, whether to show the main controls (default: 

1505 ``True``) 

1506 :param opengl: bool, whether to use opengl (default: ``False``) 

1507 ''' 

1508 

1509 from pyrocko.gui.snuffler import snuffle 

1510 snuffle(self, **kwargs) 

1511 

1512 

1513def make_pile( 

1514 paths=None, selector=None, regex=None, 

1515 fileformat='mseed', 

1516 cachedirname=None, show_progress=True): 

1517 

1518 ''' 

1519 Create pile from given file and directory names. 

1520 

1521 :param paths: filenames and/or directories to look for traces. If paths is 

1522 ``None`` ``sys.argv[1:]`` is used. 

1523 :param selector: lambda expression taking group dict of regex match object 

1524 as a single argument and which returns true or false to keep or reject 

1525 a file 

1526 :param regex: regular expression which filenames have to match 

1527 :param fileformat: format of the files ('mseed', 'sac', 'kan', 

1528 'from_extension', 'detect') 

1529 :param cachedirname: loader cache is stored under this directory. It is 

1530 created as neccessary. 

1531 :param show_progress: show progress bar and other progress information 

1532 ''' 

1533 

1534 if show_progress_force_off: 

1535 show_progress = False 

1536 

1537 if isinstance(paths, str): 

1538 paths = [paths] 

1539 

1540 if paths is None: 

1541 paths = sys.argv[1:] 

1542 

1543 if cachedirname is None: 

1544 cachedirname = config.config().cache_dir 

1545 

1546 fns = util.select_files( 

1547 paths, include=regex, selector=selector, show_progress=show_progress) 

1548 

1549 cache = get_cache(cachedirname) 

1550 p = Pile() 

1551 p.load_files( 

1552 sorted(fns), 

1553 cache=cache, 

1554 fileformat=fileformat, 

1555 show_progress=show_progress) 

1556 

1557 return p 

1558 

1559 

1560class Injector(trace.States): 

1561 

1562 def __init__( 

1563 self, pile, 

1564 fixation_length=None, 

1565 path=None, 

1566 format='from_extension', 

1567 forget_fixed=False): 

1568 

1569 trace.States.__init__(self) 

1570 self._pile = pile 

1571 self._fixation_length = fixation_length 

1572 self._format = format 

1573 self._path = path 

1574 self._forget_fixed = forget_fixed 

1575 

1576 def set_fixation_length(self, length): 

1577 ''' 

1578 Set length after which the fixation method is called on buffer traces. 

1579 

1580 The length should be given in seconds. Give None to disable. 

1581 ''' 

1582 self.fixate_all() 

1583 self._fixation_length = length # in seconds 

1584 

1585 def set_save_path( 

1586 self, 

1587 path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_' 

1588 '%(tmin)s_%(tmax)s.mseed'): 

1589 

1590 self.fixate_all() 

1591 self._path = path 

1592 

1593 def inject(self, trace): 

1594 logger.debug('Received a trace: %s' % trace) 

1595 

1596 buf = self.get(trace) 

1597 if buf is None: 

1598 trbuf = trace.copy() 

1599 buf = MemTracesFile(None, [trbuf]) 

1600 self._pile.add_file(buf) 

1601 self.set(trace, buf) 

1602 

1603 else: 

1604 self._pile.remove_file(buf) 

1605 trbuf = buf.get_traces()[0] 

1606 buf.remove(trbuf) 

1607 trbuf.append(trace.ydata) 

1608 buf.add(trbuf) 

1609 self._pile.add_file(buf) 

1610 self.set(trace, buf) 

1611 

1612 trbuf = buf.get_traces()[0] 

1613 if self._fixation_length is not None: 

1614 if trbuf.tmax - trbuf.tmin > self._fixation_length: 

1615 self._fixate(buf, complete=False) 

1616 

1617 def fixate_all(self): 

1618 for state in list(self._states.values()): 

1619 self._fixate(state[-1]) 

1620 

1621 self._states = {} 

1622 

1623 def free(self, buf): 

1624 self._fixate(buf) 

1625 

1626 def _fixate(self, buf, complete=True): 

1627 trbuf = buf.get_traces()[0] 

1628 del_state = True 

1629 if self._path: 

1630 if self._fixation_length is not None: 

1631 ttmin = trbuf.tmin 

1632 ytmin = util.year_start(ttmin) 

1633 n = int(math.floor((ttmin - ytmin) / self._fixation_length)) 

1634 tmin = ytmin + n*self._fixation_length 

1635 traces = [] 

1636 t = tmin 

1637 while t <= trbuf.tmax: 

1638 try: 

1639 traces.append( 

1640 trbuf.chop( 

1641 t, 

1642 t+self._fixation_length, 

1643 inplace=False, 

1644 snap=(math.ceil, math.ceil))) 

1645 

1646 except trace.NoData: 

1647 pass 

1648 t += self._fixation_length 

1649 

1650 if abs(traces[-1].tmax - (t - trbuf.deltat)) < \ 

1651 trbuf.deltat/100. or complete: 

1652 

1653 self._pile.remove_file(buf) 

1654 

1655 else: # reinsert incomplete last part 

1656 new_trbuf = traces.pop() 

1657 self._pile.remove_file(buf) 

1658 buf.remove(trbuf) 

1659 buf.add(new_trbuf) 

1660 self._pile.add_file(buf) 

1661 del_state = False 

1662 

1663 else: 

1664 traces = [trbuf] 

1665 self._pile.remove_file(buf) 

1666 

1667 fns = io.save(traces, self._path, format=self._format) 

1668 

1669 if not self._forget_fixed: 

1670 self._pile.load_files( 

1671 fns, show_progress=False, fileformat=self._format) 

1672 

1673 if del_state: 

1674 del self._states[trbuf.nslc_id] 

1675 

1676 def __del__(self): 

1677 self.fixate_all()