1# https://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6from __future__ import absolute_import, print_function, division 

7 

8import os 

9import logging 

10import time 

11import weakref 

12import copy 

13import re 

14import sys 

15import operator 

16import math 

17import hashlib 

18try: 

19 import cPickle as pickle 

20except ImportError: 

21 import pickle 

22 

23 

24from . import avl 

25from . import trace, io, util 

26from . import config 

27from .trace import degapper 

28 

29 

# True when running on a Windows platform (affects cache-file renaming below).
is_windows = sys.platform.startswith('win')
# Global kill switch for progress bars, regardless of caller preference.
show_progress_force_off = False
# Prefix mixed into cache-file name hashes; bump it to invalidate old caches.
version_salt = 'v1-'

33 

34 

def ehash(s):
    '''
    Return the hex SHA1 digest of *s*, salted with the cache version prefix.
    '''
    salted = (version_salt + s).encode('utf8')
    return hashlib.sha1(salted).hexdigest()

37 

38 

def cmp(a, b):
    '''
    Three-way comparison: -1, 0 or 1 (Py3 replacement for builtin ``cmp``).
    '''
    if a < b:
        return -1
    if a > b:
        return 1
    return 0

41 

42 

def sl(s):
    '''
    Sort *s* and convert every element to its string representation.
    '''
    return list(map(str, sorted(s)))

45 

46 

class Counter(dict):
    '''
    Dict-based multiset: unknown keys read as zero and keys whose count
    drops to zero or below are removed.
    '''

    def __missing__(self, k):
        # Missing keys count as zero without being inserted.
        return 0

    def update(self, other):
        '''
        Add the counts of *other* to this counter.
        '''
        for key, count in other.items():
            self[key] = self[key] + count

    def subtract(self, other):
        '''
        Subtract the counts of *other*; exhausted keys are dropped.
        '''
        for key, count in other.items():
            remaining = self[key] - count
            if remaining > 0:
                self[key] = remaining
            elif key in self:
                del self[key]

    def subtract1(self, k):
        '''
        Decrement the count of *k* by one, dropping the key at zero.
        '''
        self.subtract({k: 1})

66 

67 

def fix_unicode_copy(counter, func):
    '''
    Return a copy of *counter* with every key transformed by *func*.
    '''
    transformed = Counter()
    for key, count in counter.items():
        transformed[func(key)] = count

    return transformed

73 

74 

# Shorthand used when building cache file paths.
pjoin = os.path.join
# Module logger; configure via the 'pyrocko.pile' logging namespace.
logger = logging.getLogger('pyrocko.pile')

77 

78 

def avl_remove_exact(avltree, element):
    '''
    Remove the exact object *element* (identity match) from *avltree*.

    :returns: index the element was removed at
    :raises ValueError: if the object is not contained in the tree
    '''
    lo, hi = avltree.span(element)
    for index in range(lo, hi):
        if avltree[index] is element:
            avltree.remove_at(index)
            return index

    raise ValueError(
        'avl_remove_exact(avltree, element): element not in avltree')

88 

89 

def cmpfunc(key):
    '''
    Build a three-way comparator ordering objects by *key*.

    *key* may be an attribute name or a callable extracting the sort key.
    '''
    if not isinstance(key, str):
        getter = key
        return lambda a, b: cmp(getter(a), getter(b))

    # Fast paths for the two hottest attribute keys; these run about 50%
    # faster than the attrgetter-based generic comparator on old Pythons.
    if key == 'tmin':
        return lambda a, b: cmp(a.tmin, b.tmin)

    if key == 'tmax':
        return lambda a, b: cmp(a.tmax, b.tmax)

    getter = operator.attrgetter(key)
    return lambda a, b: cmp(getter(a), getter(b))

102 

103 

# Cache of generated probe classes, keyed by attribute name.
g_dummys = {}


def get_dummy(key):
    '''
    Get (and cache) a tiny class whose constructor stores its single
    argument under attribute *key*; used to build probe objects for AVL
    span lookups.
    '''
    try:
        return g_dummys[key]
    except KeyError:
        class Dummy(object):
            def __init__(self, k):
                setattr(self, key, k)

        g_dummys[key] = Dummy
        return Dummy

116 

117 

class TooMany(Exception):
    '''
    Raised when a limited span query matches more elements than allowed.
    '''

    def __init__(self, n):
        Exception.__init__(self)
        # Number of elements that actually matched.
        self.n = n

122 

123 

class Sorted(object):
    '''
    Sorted collection of objects, backed by an AVL tree.

    Elements are ordered by *key*, which may be an attribute name or a
    callable extracting the sort key.
    '''

    def __init__(self, values=[], key=None):
        # NOTE(review): the mutable default is harmless here — *values* is
        # only read when building the tree, never mutated.
        self._set_key(key)
        self._avl = avl.new(values, self._cmp)

    def _set_key(self, key):
        # Remember key and derived comparator; for string keys also prepare
        # a dummy-object factory used to build span probes (with_key_in).
        self._key = key
        self._cmp = cmpfunc(key)
        if isinstance(key, str):
            self._dummy = get_dummy(key)

    def __getstate__(self):
        # Pickle as (element list in sort order, key); the AVL tree object
        # itself is rebuilt on unpickling.
        state = list(self._avl.iter()), self._key
        return state

    def __setstate__(self, state):
        l, key = state
        self._set_key(key)
        self._avl = avl.from_iter(iter(l), len(l))

    def insert(self, value):
        self._avl.insert(value)

    def remove(self, value):
        # Identity-based removal; returns the index removed at.
        return avl_remove_exact(self._avl, value)

    def remove_at(self, i):
        return self._avl.remove_at(i)

    def insert_many(self, values):
        for value in values:
            self._avl.insert(value)

    def remove_many(self, values):
        for value in values:
            avl_remove_exact(self._avl, value)

    def __iter__(self):
        return iter(self._avl)

    def with_key_in(self, kmin, kmax):
        '''
        Return list of elements whose key lies within ``[kmin, kmax]``.
        '''
        omin, omax = self._dummy(kmin), self._dummy(kmax)
        ilo, ihi = self._avl.span(omin, omax)
        return self._avl[ilo:ihi]

    def with_key_in_limited(self, kmin, kmax, nmax):
        '''
        Like :py:meth:`with_key_in` but raise :py:class:`TooMany` when more
        than *nmax* elements match.
        '''
        omin, omax = self._dummy(kmin), self._dummy(kmax)
        ilo, ihi = self._avl.span(omin, omax)
        if ihi - ilo > nmax:
            raise TooMany(ihi - ilo)

        return self._avl[ilo:ihi]

    def index(self, value):
        # Identity-based lookup among elements comparing equal to *value*.
        ilo, ihi = self._avl.span(value)
        for i in range(ilo, ihi):
            if self._avl[i] is value:
                return i

        raise ValueError('element is not in avl tree')

    def min(self):
        return self._avl.min()

    def max(self):
        return self._avl.max()

    def __len__(self):
        return len(self._avl)

    def __getitem__(self, i):
        return self._avl[i]

196 

197 

class TracesFileCache(object):
    '''
    Manages trace metainformation cache.

    For each directory with files containing traces, one cache file is
    maintained to hold the trace metainformation of all files which are
    contained in the directory.
    '''

    # Global registry of caches, keyed by cache directory (see get_cache()).
    caches = {}

    def __init__(self, cachedir):
        '''
        Create new cache.

        :param cachedir: directory to hold the cache files.
        '''

        self.cachedir = cachedir
        # Maps cache-file path -> dict of (data file abspath -> TracesFile).
        self.dircaches = {}
        # Cache-file paths with unsaved in-memory modifications.
        self.modified = set()
        util.ensuredir(self.cachedir)

    def get(self, abspath):
        '''
        Try to get an item from the cache.

        :param abspath: absolute path of the object to retrieve

        :returns: a stored object is returned or None if nothing could be
            found.
        '''

        dircache = self._get_dircache_for(abspath)
        if abspath in dircache:
            return dircache[abspath]
        return None

    def put(self, abspath, tfile):
        '''
        Put an item into the cache.

        :param abspath: absolute path of the object to be stored
        :param tfile: object to be stored
        '''

        cachepath = self._dircachepath(abspath)
        # get lock on cachepath here
        dircache = self._get_dircache(cachepath)
        dircache[abspath] = tfile
        self.modified.add(cachepath)

    def dump_modified(self):
        '''
        Save any modifications to disk.
        '''

        for cachepath in self.modified:
            self._dump_dircache(self.dircaches[cachepath], cachepath)
            # unlock

        self.modified = set()

    def clean(self):
        '''
        Weed out missing files from the disk caches.
        '''

        self.dump_modified()

        for fn in os.listdir(self.cachedir):
            # cache files are named by a 40-char hex SHA1 digest (see
            # _dircachepath); load + re-dump drops stale entries
            if len(fn) == 40:
                cache = self._load_dircache(pjoin(self.cachedir, fn))
                self._dump_dircache(cache, pjoin(self.cachedir, fn))

    def _get_dircache_for(self, abspath):
        # Directory cache responsible for the directory containing *abspath*.
        return self._get_dircache(self._dircachepath(abspath))

    def _get_dircache(self, cachepath):
        # Lazily load (or create empty) the in-memory dircache for
        # *cachepath*.
        if cachepath not in self.dircaches:
            if os.path.isfile(cachepath):
                self.dircaches[cachepath] = self._load_dircache(cachepath)
            else:
                self.dircaches[cachepath] = {}

        return self.dircaches[cachepath]

    def _dircachepath(self, abspath):
        # Cache file name is the salted SHA1 of the data file's directory.
        cachefn = ehash(os.path.dirname(abspath))
        return pjoin(self.cachedir, cachefn)

    def _load_dircache(self, cachefilename):
        # Unpickle a directory cache and restore derived state (trees,
        # parent links, load flags) that was stripped before dumping.
        # NOTE(review): unpickling trusts the cache directory contents; do
        # not point cachedir at untrusted data.

        with open(cachefilename, 'rb') as f:
            cache = pickle.load(f)

        # weed out files which no longer exist
        for fn in list(cache.keys()):
            if not os.path.isfile(fn):
                del cache[fn]

        time_float = util.get_time_float()

        for v in cache.values():
            v.trees_from_content(v.traces)
            for tr in v.traces:
                tr.file = v
                # fix Py2 codes to not include unicode when the cache file
                # was created with Py3
                if not isinstance(tr.station, str):
                    tr.prune_from_reuse_cache()
                    tr.set_codes(
                        str(tr.network),
                        str(tr.station),
                        str(tr.location),
                        str(tr.channel))

                tr.tmin = time_float(tr.tmin)
                tr.tmax = time_float(tr.tmax)

            v.data_use_count = 0
            v.data_loaded = False
            v.fix_unicode_codes()

        return cache

    def _dump_dircache(self, cache, cachefilename):
        # Pickle a directory cache atomically (write temp file, then
        # rename); an empty cache removes the cache file instead.

        if not cache:
            if os.path.exists(cachefilename):
                os.remove(cachefilename)
            return

        # make a copy without the parents and the binsearch trees
        cache_copy = {}
        for fn in cache.keys():
            trf = copy.copy(cache[fn])
            trf.parent = None
            trf.by_tmin = None
            trf.by_tmax = None
            trf.by_tlen = None
            trf.by_mtime = None
            trf.data_use_count = 0
            trf.data_loaded = False
            traces = []
            for tr in trf.traces:
                # store header-only copies; sample data is never cached
                tr = tr.copy(data=False)
                tr.ydata = None
                tr.meta = None
                tr.file = trf
                traces.append(tr)

            trf.traces = traces

            cache_copy[fn] = trf

        # pid suffix keeps concurrent writers from clobbering each other's
        # temp files
        tmpfn = cachefilename+'.%i.tmp' % os.getpid()
        with open(tmpfn, 'wb') as f:
            pickle.dump(cache_copy, f, protocol=2)

        if is_windows and os.path.exists(cachefilename):
            # windows doesn't allow to rename over existing file
            os.unlink(cachefilename)

        os.rename(tmpfn, cachefilename)

363 

364 

def get_cache(cachedir):
    '''
    Get global TracesFileCache object for given directory.
    '''
    cache = TracesFileCache.caches.get(cachedir)
    if cache is None:
        cache = TracesFileCache(cachedir)
        TracesFileCache.caches[cachedir] = cache

    return cache

373 

374 

def loader(
        filenames, fileformat, cache, filename_attributes,
        show_progress=True, update_progress=None):
    '''
    Generate TracesFile objects for *filenames*, scanning headers as needed.

    Files whose metadata is found up-to-date in *cache* are served from the
    cache; all others are read from disk. Progress is reported through a
    terminal bar (*show_progress*) and/or the *update_progress* callback,
    which may abort the iteration by returning a true value.
    '''

    if show_progress_force_off:
        show_progress = False

    class Progress(object):
        # Bundles the terminal progress bar and the update_progress
        # callback behind a single interface.
        def __init__(self, label, n):
            self._label = label
            self._n = n
            self._bar = None
            if show_progress:
                self._bar = util.progressbar(label, self._n)

            if update_progress:
                update_progress(label, 0, self._n)

        def update(self, i):
            # Returns True when the callback requests an abort.
            if self._bar:
                if i < self._n-1:
                    self._bar.update(i)
                else:
                    # last step: close the bar so finish() is idempotent
                    self._bar.finish()
                    self._bar = None

            abort = False
            if update_progress:
                abort = update_progress(self._label, i, self._n)

            return abort

        def finish(self):
            if self._bar:
                self._bar.finish()
                self._bar = None

    if not filenames:
        logger.warning('No files to load from')
        return

    regex = None
    if filename_attributes:
        regex = re.compile(filename_attributes)

    # NOTE(review): if Progress() construction itself raised, 'progress'
    # would be unbound in the finally clause below.
    try:
        progress = Progress('Looking at files', len(filenames))

        failures = []
        to_load = []
        for i, filename in enumerate(filenames):
            try:
                abspath = os.path.abspath(filename)

                substitutions = None
                if regex:
                    m = regex.search(filename)
                    if not m:
                        raise FilenameAttributeError(
                            "Cannot get attributes with pattern '%s' "
                            "from path '%s'" % (filename_attributes, filename))

                    # only NSLC codes may be overridden from the file name
                    substitutions = {}
                    for k in m.groupdict():
                        if k in ('network', 'station', 'location', 'channel'):
                            substitutions[k] = m.groupdict()[k]

                # index 8 of the os.stat() tuple is st_mtime
                mtime = os.stat(filename)[8]
                tfile = None
                if cache:
                    tfile = cache.get(abspath)

                # rescan when format or mtime changed, or when filename
                # substitutions are in effect (cached entries would not
                # reflect them)
                mustload = (
                    not tfile or
                    (tfile.format != fileformat and fileformat != 'detect') or
                    tfile.mtime != mtime or
                    substitutions is not None)

                to_load.append(
                    (mustload, mtime, abspath, substitutions, tfile))

            except (OSError, FilenameAttributeError) as xerror:
                failures.append(abspath)
                logger.warning(xerror)

            abort = progress.update(i+1)
            if abort:
                progress.update(len(filenames))
                return

        progress.update(len(filenames))

        # deterministic scanning order by absolute path
        to_load.sort(key=lambda x: x[2])

        nload = len([1 for x in to_load if x[0]])
        iload = 0

        count_all = False
        if nload < 0.01*len(to_load):
            # nearly everything is cached; count every file so the second
            # progress bar still moves
            nload = len(to_load)
            count_all = True

        if to_load:
            progress = Progress('Scanning files', nload)

        for (mustload, mtime, abspath, substitutions, tfile) in to_load:
            try:
                if mustload:
                    tfile = TracesFile(
                        None, abspath, fileformat,
                        substitutions=substitutions, mtime=mtime)

                    # substituted codes must not poison the shared cache
                    if cache and not substitutions:
                        cache.put(abspath, tfile)

                    if not count_all:
                        iload += 1

                if count_all:
                    iload += 1

            except (io.FileLoadError, OSError) as xerror:
                failures.append(abspath)
                logger.warning(xerror)
            else:
                yield tfile

            abort = progress.update(iload+1)
            if abort:
                break

        progress.update(nload)

        if failures:
            logger.warning(
                'The following file%s caused problems and will be ignored:\n' %
                util.plural_s(len(failures)) + '\n'.join(failures))

        if cache:
            cache.dump_modified()
    finally:
        progress.finish()

517 

518 

def tlen(x):
    '''
    Return the time span ``x.tmax - x.tmin``.
    '''
    tmin, tmax = x.tmin, x.tmax
    return tmax - tmin

521 

522 

class TracesGroup(object):

    '''
    Trace container base class.

    Base class for Pile, SubPile, and TracesFile, i.e. anything containing
    a collection of several traces. A TracesGroup object maintains lookup sets
    of some of the traces meta-information, as well as a combined time-range
    of its contents.
    '''

    def __init__(self, parent):
        self.parent = parent
        self.empty()
        # Incremented on every add/remove so observers can detect changes.
        self.nupdates = 0
        self.abspath = None

    def set_parent(self, parent):
        self.parent = parent

    def get_parent(self):
        return self.parent

    def empty(self):
        '''
        Reset all counters, lookup trees and summary values.
        '''
        self.networks, self.stations, self.locations, self.channels, \
            self.nslc_ids, self.deltats = [Counter() for x in range(6)]
        self.by_tmin = Sorted([], 'tmin')
        self.by_tmax = Sorted([], 'tmax')
        self.by_tlen = Sorted([], tlen)
        self.by_mtime = Sorted([], 'mtime')
        self.tmin, self.tmax = None, None
        self.deltatmin, self.deltatmax = None, None

    def trees_from_content(self, content):
        '''
        Rebuild the sorted lookup trees from *content* (list of traces).
        '''
        self.by_tmin = Sorted(content, 'tmin')
        self.by_tmax = Sorted(content, 'tmax')
        self.by_tlen = Sorted(content, tlen)
        self.by_mtime = Sorted(content, 'mtime')
        self.adjust_minmax()

    def fix_unicode_codes(self):
        '''
        Convert unicode NSLC codes (from Py2-era cache files) to str.
        '''
        for net in self.networks:
            if isinstance(net, str):
                # already native strings; nothing to do
                return

        self.networks = fix_unicode_copy(self.networks, str)
        self.stations = fix_unicode_copy(self.stations, str)
        self.locations = fix_unicode_copy(self.locations, str)
        self.channels = fix_unicode_copy(self.channels, str)
        self.nslc_ids = fix_unicode_copy(
            self.nslc_ids, lambda k: tuple(str(x) for x in k))

    def add(self, content):
        '''
        Add content to traces group and update indices.

        Accepts :py:class:`pyrocko.trace.Trace` objects and
        :py:class:`pyrocko.pile.TracesGroup` objects.
        '''

        if isinstance(content, (trace.Trace, TracesGroup)):
            content = [content]

        for c in content:

            if isinstance(c, TracesGroup):
                # merge the child group's counters and trees wholesale
                self.networks.update(c.networks)
                self.stations.update(c.stations)
                self.locations.update(c.locations)
                self.channels.update(c.channels)
                self.nslc_ids.update(c.nslc_ids)
                self.deltats.update(c.deltats)

                self.by_tmin.insert_many(c.by_tmin)
                self.by_tmax.insert_many(c.by_tmax)
                self.by_tlen.insert_many(c.by_tlen)
                self.by_mtime.insert_many(c.by_mtime)

            elif isinstance(c, trace.Trace):
                self.networks[c.network] += 1
                self.stations[c.station] += 1
                self.locations[c.location] += 1
                self.channels[c.channel] += 1
                self.nslc_ids[c.nslc_id] += 1
                self.deltats[c.deltat] += 1

                self.by_tmin.insert(c)
                self.by_tmax.insert(c)
                self.by_tlen.insert(c)
                self.by_mtime.insert(c)

        self.adjust_minmax()

        self.nupdates += 1
        self.notify_listeners('add')

        # changes propagate up the TracesFile/SubPile/Pile hierarchy
        if self.parent is not None:
            self.parent.add(content)

    def remove(self, content):
        '''
        Remove content to traces group and update indices.
        '''
        if isinstance(content, (trace.Trace, TracesGroup)):
            content = [content]

        for c in content:

            if isinstance(c, TracesGroup):
                self.networks.subtract(c.networks)
                self.stations.subtract(c.stations)
                self.locations.subtract(c.locations)
                self.channels.subtract(c.channels)
                self.nslc_ids.subtract(c.nslc_ids)
                self.deltats.subtract(c.deltats)

                self.by_tmin.remove_many(c.by_tmin)
                self.by_tmax.remove_many(c.by_tmax)
                self.by_tlen.remove_many(c.by_tlen)
                self.by_mtime.remove_many(c.by_mtime)

            elif isinstance(c, trace.Trace):
                self.networks.subtract1(c.network)
                self.stations.subtract1(c.station)
                self.locations.subtract1(c.location)
                self.channels.subtract1(c.channel)
                self.nslc_ids.subtract1(c.nslc_id)
                self.deltats.subtract1(c.deltat)

                self.by_tmin.remove(c)
                self.by_tmax.remove(c)
                self.by_tlen.remove(c)
                self.by_mtime.remove(c)

        self.adjust_minmax()

        self.nupdates += 1
        self.notify_listeners('remove')

        if self.parent is not None:
            self.parent.remove(content)

    def relevant(self, tmin, tmax, group_selector=None, trace_selector=None):
        '''
        Return list of :py:class:`pyrocko.trace.Trace` objects where given
        arguments ``tmin`` and ``tmax`` match.

        :param tmin: start time
        :param tmax: end time
        :param group_selector: lambda expression taking group dict of regex
            match object as a single argument and which returns true or false
            to keep or reject a file (default: ``None``)
        :param trace_selector: lambda expression taking group dict of regex
            match object as a single argument and which returns true or false
            to keep or reject a file (default: ``None``)
        '''

        if not self.by_tmin or not self.is_relevant(
                tmin, tmax, group_selector):

            return []

        # candidates must start no earlier than tmin minus the longest
        # contained trace length; anything earlier cannot reach [tmin, tmax]
        return [tr for tr in self.by_tmin.with_key_in(tmin-self.tlenmax, tmax)
                if tr.is_relevant(tmin, tmax, trace_selector)]

    def adjust_minmax(self):
        '''
        Recompute cached summary values (time range, tlenmax, mtime and
        sampling-interval range) from the lookup structures.
        '''
        if self.by_tmin:
            self.tmin = self.by_tmin.min().tmin
            self.tmax = self.by_tmax.max().tmax
            t = self.by_tlen.max()
            self.tlenmax = t.tmax - t.tmin
            self.mtime = self.by_mtime.max().mtime
            deltats = list(self.deltats.keys())
            self.deltatmin = min(deltats)
            self.deltatmax = max(deltats)
        else:
            self.tmin = None
            self.tmax = None
            self.tlenmax = None
            self.mtime = None
            self.deltatmin = None
            self.deltatmax = None

    def notify_listeners(self, what):
        # No-op here; overridden in Pile to inform registered listeners.
        pass

    def get_update_count(self):
        return self.nupdates

    def overlaps(self, tmin, tmax):
        # Closed-interval overlap test against the group's time range.
        return self.tmin is not None \
            and tmax >= self.tmin and self.tmax >= tmin

    def is_relevant(self, tmin, tmax, group_selector=None):
        # Overlap test plus optional user-supplied group filter.
        if self.tmin is None or self.tmax is None:
            return False
        return tmax >= self.tmin and self.tmax >= tmin and (
            group_selector is None or group_selector(self))

721 

722 

class MemTracesFile(TracesGroup):

    '''
    This is needed to make traces without an actual disc file to be inserted
    into a Pile.
    '''

    def __init__(self, parent, traces):
        TracesGroup.__init__(self, parent)
        self.add(traces)
        self.mtime = time.time()

    def add(self, traces):
        '''
        Add trace(s), marking this object as their containing file.
        '''
        if isinstance(traces, trace.Trace):
            traces = [traces]

        for tr in traces:
            tr.file = self

        TracesGroup.add(self, traces)

    def load_headers(self, mtime=None):
        # in-memory traces: nothing to load from disk
        pass

    def load_data(self):
        pass

    def use_data(self):
        pass

    def drop_data(self):
        pass

    def reload_if_modified(self):
        # memory-backed contents never change behind our back
        return False

    def iter_traces(self):
        for tr in self.by_tmin:
            yield tr

    def get_traces(self):
        return list(self.by_tmin)

    def gather_keys(self, gather, selector=None):
        '''
        Collect the set of ``gather(tr)`` over traces passing *selector*.
        '''
        keys = set()
        for tr in self.by_tmin:
            if selector is None or selector(tr):
                keys.add(gather(tr))

        return keys

    def __str__(self):

        s = 'MemTracesFile\n'
        s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
        s += 'number of traces: %i\n' % len(self.by_tmin)
        s += 'timerange: %s - %s\n' % (
            util.time_to_str(self.tmin), util.time_to_str(self.tmax))
        s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
        s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
        s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
        s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
        s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
        return s

787 

788 

class TracesFile(TracesGroup):
    '''
    Group of traces loaded from a single file on disk.
    '''

    def __init__(
            self, parent, abspath, format,
            substitutions=None, mtime=None):

        TracesGroup.__init__(self, parent)
        self.abspath = abspath
        self.format = format
        self.traces = []
        self.data_loaded = False
        # reference count of active users of the sample data
        self.data_use_count = 0
        self.substitutions = substitutions
        self.load_headers(mtime=mtime)
        self.mtime = mtime

    def load_headers(self, mtime=None):
        '''
        (Re)read trace metadata from the file, without sample data.
        '''
        logger.debug('loading headers from file: %s' % self.abspath)
        if mtime is None:
            # index 8 of the os.stat() tuple is st_mtime
            self.mtime = os.stat(self.abspath)[8]

        def kgen(tr):
            # key identifying a trace snippet within the file
            return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id

        self.remove(self.traces)
        ks = set()
        for tr in io.load(self.abspath,
                          format=self.format,
                          getdata=False,
                          substitutions=self.substitutions):

            # skip duplicate snippets (same key seen twice)
            k = kgen(tr)
            if k not in ks:
                ks.add(k)
                self.traces.append(tr)
                tr.file = self

        self.add(self.traces)

        self.data_loaded = False
        self.data_use_count = 0

    def load_data(self, force=False):
        '''
        Read sample data from the file, reconciling with known traces.

        :returns: True if the set of contained traces changed (i.e. the
            file appears to have been modified on disk).
        '''
        file_changed = False
        if not self.data_loaded or force:
            logger.debug('loading data from file: %s' % self.abspath)

            def kgen(tr):
                return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id

            traces_ = io.load(self.abspath, format=self.format, getdata=True,
                              substitutions=self.substitutions)

            # prevent adding duplicate snippets from corrupt mseed files
            k_loaded = set()
            traces = []
            for tr in traces_:
                k = kgen(tr)
                if k not in k_loaded:
                    k_loaded.add(k)
                    traces.append(tr)

            k_current_d = dict((kgen(tr), tr) for tr in self.traces)
            k_current = set(k_current_d)
            k_new = k_loaded - k_current
            k_delete = k_current - k_loaded
            k_unchanged = k_current & k_loaded

            # drop traces which have disappeared from the file
            for tr in self.traces[:]:
                if kgen(tr) in k_delete:
                    self.remove(tr)
                    self.traces.remove(tr)
                    tr.file = None
                    file_changed = True

            # register traces which are new in the file
            for tr in traces:
                if kgen(tr) in k_new:
                    tr.file = self
                    self.traces.append(tr)
                    self.add(tr)
                    file_changed = True

            # refresh sample data on traces already known
            for tr in traces:
                if kgen(tr) in k_unchanged:
                    ctr = k_current_d[kgen(tr)]
                    ctr.ydata = tr.ydata

            self.data_loaded = True

        if file_changed:
            logger.debug('reloaded (file may have changed): %s' % self.abspath)

        return file_changed

    def use_data(self):
        '''
        Declare usage of the loaded sample data (reference counting).
        '''
        if not self.data_loaded:
            raise Exception('Data not loaded')
        self.data_use_count += 1

    def drop_data(self):
        '''
        Release one usage reference; forget sample data when unused.
        '''
        if self.data_loaded:
            if self.data_use_count == 1:
                logger.debug('forgetting data of file: %s' % self.abspath)
                for tr in self.traces:
                    tr.drop_data()

                self.data_loaded = False

            self.data_use_count -= 1
        else:
            self.data_use_count = 0

    def reload_if_modified(self):
        '''
        Reload headers or data if the file's mtime has changed.

        :returns: True if a reload happened.
        '''
        mtime = os.stat(self.abspath)[8]
        if mtime != self.mtime:
            logger.debug(
                'mtime=%i, reloading file: %s' % (mtime, self.abspath))

            self.mtime = mtime
            # reload at the same level of detail as currently held
            if self.data_loaded:
                self.load_data(force=True)
            else:
                self.load_headers()

            return True

        return False

    def iter_traces(self):
        for tr in self.traces:
            yield tr

    def gather_keys(self, gather, selector=None):
        '''
        Collect the set of ``gather(tr)`` over traces passing *selector*.
        '''
        keys = set()
        for tr in self.by_tmin:
            if selector is None or selector(tr):
                keys.add(gather(tr))

        return keys

    def __str__(self):
        s = 'TracesFile\n'
        s += 'abspath: %s\n' % self.abspath
        s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
        s += 'number of traces: %i\n' % len(self.traces)
        s += 'timerange: %s - %s\n' % (
            util.time_to_str(self.tmin), util.time_to_str(self.tmax))
        s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
        s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
        s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
        s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
        s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
        return s

941 

942 

class FilenameAttributeError(Exception):
    '''
    Raised when substitution attributes cannot be extracted from a file
    path with the configured regular expression pattern.
    '''

945 

946 

class SubPile(TracesGroup):
    '''
    Pile subdivision holding a set of trace files (see Pile.dispatch_key).
    '''

    def __init__(self, parent):
        TracesGroup.__init__(self, parent)
        self.files = []
        self.empty()

    def add_file(self, file):
        self.files.append(file)
        file.set_parent(self)
        self.add(file)

    def remove_file(self, file):
        self.files.remove(file)
        file.set_parent(None)
        self.remove(file)

    def remove_files(self, files):
        # detach all files first, then update the indices in one go
        for file in files:
            self.files.remove(file)
            file.set_parent(None)
        self.remove(files)

    def gather_keys(self, gather, selector=None):
        '''
        Union of ``gather_keys()`` over all contained files.
        '''
        keys = set()
        for file in self.files:
            keys |= file.gather_keys(gather, selector)

        return keys

    def iter_traces(
            self,
            load_data=False,
            return_abspath=False,
            group_selector=None,
            trace_selector=None):
        '''
        Iterate traces of all contained files.

        :param load_data: load (and afterwards drop) each file's sample data
        :param return_abspath: yield ``(abspath, trace)`` tuples instead of
            bare traces
        :param group_selector: optional per-file filter callback
        :param trace_selector: optional per-trace filter callback
        '''

        for file in self.files:

            if group_selector and not group_selector(file):
                continue

            must_drop = False
            if load_data:
                file.load_data()
                file.use_data()
                must_drop = True

            for tr in file.iter_traces():
                if trace_selector and not trace_selector(tr):
                    continue

                if return_abspath:
                    yield file.abspath, tr
                else:
                    yield tr

            if must_drop:
                file.drop_data()

    def iter_files(self):
        for file in self.files:
            yield file

    def reload_modified(self):
        '''
        Reload all files whose mtime changed; return True if any did.
        '''
        modified = False
        for file in self.files:
            modified |= file.reload_if_modified()

        return modified

    def __str__(self):
        s = 'SubPile\n'
        s += 'number of files: %i\n' % len(self.files)
        s += 'timerange: %s - %s\n' % (
            util.time_to_str(self.tmin), util.time_to_str(self.tmax))
        s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
        s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
        s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
        s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
        s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
        return s

1028 

1029 

class Batch(object):
    '''
    Batch of waveforms from window wise data extraction.

    Encapsulates state and results yielded for each window in window wise
    waveform extraction with the :py:meth:`Pile.chopper` method (when the
    `style='batch'` keyword argument set).

    *Attributes:*

    .. py:attribute:: tmin

        Start of this time window.

    .. py:attribute:: tmax

        End of this time window.

    .. py:attribute:: i

        Index of this time window in sequence.

    .. py:attribute:: n

        Total number of time windows in sequence.

    .. py:attribute:: traces

        Extracted waveforms for this time window.
    '''

    def __init__(self, tmin, tmax, i, n, traces):
        # Plain value container; attributes mirror the constructor args.
        self.tmin = tmin
        self.tmax = tmax
        self.i = i
        self.n = n
        self.traces = traces

1068 

1069class Pile(TracesGroup): 

1070 ''' 

1071 Waveform archive lookup, data loading and caching infrastructure. 

1072 ''' 

1073 

    def __init__(self):
        TracesGroup.__init__(self, None)
        # subpiles, keyed by dispatch_key() (sampling-interval magnitude)
        self.subpiles = {}
        self.open_files = {}
        # weak references to registered change listeners (see add_listener)
        self.listeners = []
        # absolute paths of all files currently held in the pile
        self.abspaths = set()

1080 

    def add_listener(self, obj):
        '''
        Register *obj* for pile change notifications (held weakly; *obj*
        must provide a ``pile_changed(what)`` method).
        '''
        self.listeners.append(weakref.ref(obj))

1083 

    def notify_listeners(self, what):
        '''
        Call ``pile_changed(what)`` on all still-alive listeners.
        '''
        for ref in self.listeners:
            obj = ref()
            # dead weak references dereference to None and are skipped
            if obj:
                obj.pile_changed(what)

1089 

    def load_files(
            self, filenames,
            filename_attributes=None,
            fileformat='mseed',
            cache=None,
            show_progress=True,
            update_progress=None):
        '''
        Scan *filenames* with the module-level loader generator and add the
        resulting files to this pile.
        '''

        load = loader(
            filenames, fileformat, cache, filename_attributes,
            show_progress=show_progress,
            update_progress=update_progress)

        self.add_files(load)

1104 

    def add_files(self, files):
        '''
        Add an iterable of trace file objects to the pile.
        '''
        for file in files:
            self.add_file(file)

1108 

    def add_file(self, file):
        '''
        Add a single trace file object, unless it is already present or
        carries no sampling information.
        '''
        if file.abspath is not None and file.abspath in self.abspaths:
            logger.warning('File already in pile: %s' % file.abspath)
            return

        if file.deltatmin is None:
            # deltatmin is None when the file contributed no usable traces
            logger.warning('Sampling rate of all traces are zero in file: %s' %
                           file.abspath)
            return

        subpile = self.dispatch(file)
        subpile.add_file(file)
        if file.abspath is not None:
            self.abspaths.add(file.abspath)

1123 

    def remove_file(self, file):
        '''
        Remove a single trace file object (and its abspath bookkeeping).
        '''
        subpile = file.get_parent()
        if subpile is not None:
            subpile.remove_file(file)
        if file.abspath is not None:
            self.abspaths.remove(file.abspath)

1130 

1131 def remove_files(self, files): 

1132 subpile_files = {} 

1133 for file in files: 

1134 subpile = file.get_parent() 

1135 if subpile not in subpile_files: 

1136 subpile_files[subpile] = [] 

1137 

1138 subpile_files[subpile].append(file) 

1139 

1140 for subpile, files in subpile_files.items(): 

1141 subpile.remove_files(files) 

1142 for file in files: 

1143 if file.abspath is not None: 

1144 self.abspaths.remove(file.abspath) 

1145 

1146 def dispatch_key(self, file): 

1147 dt = int(math.floor(math.log(file.deltatmin))) 

1148 return dt 

1149 

1150 def dispatch(self, file): 

1151 k = self.dispatch_key(file) 

1152 if k not in self.subpiles: 

1153 self.subpiles[k] = SubPile(self) 

1154 

1155 return self.subpiles[k] 

1156 

    def get_deltats(self):
        '''
        Return list of distinct sampling intervals present in the pile.
        '''
        return list(self.deltats.keys())

1159 

    def chop(
            self, tmin, tmax,
            group_selector=None,
            trace_selector=None,
            snap=(round, round),
            include_last=False,
            load_data=True):
        '''
        Cut out traces intersecting the window ``[tmin, tmax]``.

        :returns: tuple ``(chopped, used_files)`` where *chopped* is a list
            of trace snippets and *used_files* is the set of file objects
            whose data was accessed.
        '''

        chopped = []
        used_files = set()

        traces = self.relevant(tmin, tmax, group_selector, trace_selector)
        if load_data:
            files_changed = False
            for tr in traces:
                if tr.file and tr.file not in used_files:
                    if tr.file.load_data():
                        files_changed = True

                if tr.file is not None:
                    used_files.add(tr.file)

            if files_changed:
                # files were modified on disk; re-query with fresh metadata
                traces = self.relevant(
                    tmin, tmax, group_selector, trace_selector)

        for tr in traces:
            if not load_data and tr.ydata is not None:
                # caller wants metadata only; hand out a data-less copy
                tr = tr.copy(data=False)
                tr.ydata = None

            try:
                chopped.append(tr.chop(
                    tmin, tmax,
                    inplace=False,
                    snap=snap,
                    include_last=include_last))

            except trace.NoData:
                # trace contributes no samples within the window
                pass

        return chopped, used_files

1202 

1203 def _process_chopped( 

1204 self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin, 

1205 tpad): 

1206 

1207 chopped.sort(key=lambda a: a.full_id) 

1208 if degap: 

1209 chopped = degapper(chopped, maxgap=maxgap, maxlap=maxlap) 

1210 

1211 if not want_incomplete: 

1212 chopped_weeded = [] 

1213 for tr in chopped: 

1214 emin = tr.tmin - (wmin-tpad) 

1215 emax = tr.tmax + tr.deltat - (wmax+tpad) 

1216 if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat): 

1217 chopped_weeded.append(tr) 

1218 

1219 elif degap: 

1220 if (0. < emin <= 5. * tr.deltat and 

1221 -5. * tr.deltat <= emax < 0.): 

1222 

1223 tr.extend( 

1224 wmin-tpad, 

1225 wmax+tpad-tr.deltat, 

1226 fillmethod='repeat') 

1227 

1228 chopped_weeded.append(tr) 

1229 

1230 chopped = chopped_weeded 

1231 

1232 for tr in chopped: 

1233 tr.wmin = wmin 

1234 tr.wmax = wmax 

1235 

1236 return chopped 

1237 

    def chopper(
            self,
            tmin=None, tmax=None, tinc=None, tpad=0.,
            group_selector=None, trace_selector=None,
            want_incomplete=True, degap=True, maxgap=5, maxlap=None,
            keep_current_files_open=False, accessor_id=None,
            snap=(round, round), include_last=False, load_data=True,
            style=None):

        '''
        Get iterator for shifting window wise data extraction from waveform
        archive.

        :param tmin: start time (default uses start time of available data)
        :param tmax: end time (default uses end time of available data)
        :param tinc: time increment (window shift time) (default uses
            ``tmax-tmin``)
        :param tpad: padding time appended on either side of the data windows
            (window overlap is ``2*tpad``)
        :param group_selector: filter callback taking :py:class:`TracesGroup`
            objects
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects
        :param want_incomplete: if set to ``False``, gappy/incomplete traces
            are discarded from the results
        :param degap: whether to try to connect traces and to remove gaps and
            overlaps
        :param maxgap: maximum gap size in samples which is filled with
            interpolated samples when ``degap`` is ``True``
        :param maxlap: maximum overlap size in samples which is removed when
            ``degap`` is ``True``
        :param keep_current_files_open: whether to keep cached trace data in
            memory after the iterator has ended
        :param accessor_id: if given, used as a key to identify different
            points of extraction for the decision of when to release cached
            trace data (should be used when data is alternately extracted from
            more than one region / selection)
        :param snap: replaces Python's :py:func:`round` function which is used
            to determine indices where to start and end the trace data array
        :param include_last: whether to include last sample
        :param load_data: whether to load the waveform data. If set to
            ``False``, traces with no data samples, but with correct
            meta-information are returned
        :param style: set to ``'batch'`` to yield waveforms and information
            about the chopper state as :py:class:`Batch` objects. By default
            lists of :py:class:`pyrocko.trace.Trace` objects are yielded.
        :returns: iterator providing extracted waveforms for each extracted
            window. See ``style`` argument for details.
        '''
        if tmin is None:
            if self.tmin is None:
                logger.warning('Pile\'s tmin is not set - pile may be empty.')
                return
            tmin = self.tmin + tpad

        if tmax is None:
            if self.tmax is None:
                logger.warning('Pile\'s tmax is not set - pile may be empty.')
                return
            tmax = self.tmax - tpad

        if not self.is_relevant(tmin-tpad, tmax+tpad, group_selector):
            return

        # per-accessor set of files whose data is currently held in memory
        if accessor_id not in self.open_files:
            self.open_files[accessor_id] = set()

        open_files = self.open_files[accessor_id]

        # number of extraction windows; eps guards against a spurious extra
        # window caused by floating point roundoff at the interval end
        if tinc is None:
            tinc = tmax - tmin
            nwin = 1
        else:
            eps = tinc * 1e-6
            if tinc != 0.0:
                nwin = int(((tmax - eps) - tmin) / tinc) + 1
            else:
                nwin = 1

        for iwin in range(nwin):
            # window bounds; the last window is clipped at tmax
            wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)

            chopped, used_files = self.chop(
                wmin-tpad, wmax+tpad, group_selector, trace_selector, snap,
                include_last, load_data)

            for file in used_files - open_files:
                # increment datause counter on newly opened files
                file.use_data()

            open_files.update(used_files)

            processed = self._process_chopped(
                chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
                tpad)

            if style == 'batch':
                yield Batch(
                    tmin=wmin,
                    tmax=wmax,
                    i=iwin,
                    n=nwin,
                    traces=processed)

            else:
                yield processed

            # release data of files not touched by the current window
            unused_files = open_files - used_files

            while unused_files:
                file = unused_files.pop()
                file.drop_data()
                open_files.remove(file)

        # unless caching was requested, release all remaining file data
        if not keep_current_files_open:
            while open_files:
                file = open_files.pop()
                file.drop_data()

1356 

1357 def all(self, *args, **kwargs): 

1358 ''' 

1359 Shortcut to aggregate :py:meth:`chopper` output into a single list. 

1360 ''' 

1361 

1362 alltraces = [] 

1363 for traces in self.chopper(*args, **kwargs): 

1364 alltraces.extend(traces) 

1365 

1366 return alltraces 

1367 

1368 def iter_all(self, *args, **kwargs): 

1369 for traces in self.chopper(*args, **kwargs): 

1370 for tr in traces: 

1371 yield tr 

1372 

    def chopper_grouped(self, gather, progress=None, *args, **kwargs):
        '''
        Iterate over the pile window-wise, one gather key at a time.

        For each key produced by ``gather``, a full :py:meth:`chopper` pass
        restricted to traces matching that key is run; remaining arguments
        are forwarded to :py:meth:`chopper`.

        :param gather: callback mapping a :py:class:`pyrocko.trace.Trace`
            to a grouping key (also passed to ``gather_keys``)
        :param progress: if given, label used to display a progress bar over
            the keys
        '''
        keys = self.gather_keys(gather)
        if len(keys) == 0:
            return

        # remember caller-supplied selectors so they can be combined with
        # the per-key selectors installed below
        outer_group_selector = None
        if 'group_selector' in kwargs:
            outer_group_selector = kwargs['group_selector']

        outer_trace_selector = None
        if 'trace_selector' in kwargs:
            outer_trace_selector = kwargs['trace_selector']

        # the use of this gather-cache makes it impossible to modify the pile
        # during chopping
        gather_cache = {}
        pbar = None
        try:
            if progress is not None:
                pbar = util.progressbar(progress, len(keys))

            for ikey, key in enumerate(keys):
                def tsel(tr):
                    # accept only traces of the current key, then apply the
                    # caller's trace selector on top
                    return gather(tr) == key and (
                        outer_trace_selector is None
                        or outer_trace_selector(tr))

                def gsel(gr):
                    if gr not in gather_cache:
                        gather_cache[gr] = gr.gather_keys(gather)

                    return key in gather_cache[gr] and (
                        outer_group_selector is None
                        or outer_group_selector(gr))

                kwargs['trace_selector'] = tsel
                kwargs['group_selector'] = gsel

                for traces in self.chopper(*args, **kwargs):
                    yield traces

                if pbar:
                    pbar.update(ikey+1)

        finally:
            if pbar:
                pbar.finish()

1420 

1421 def gather_keys(self, gather, selector=None): 

1422 keys = set() 

1423 for subpile in self.subpiles.values(): 

1424 keys |= subpile.gather_keys(gather, selector) 

1425 

1426 return sorted(keys) 

1427 

1428 def iter_traces( 

1429 self, 

1430 load_data=False, 

1431 return_abspath=False, 

1432 group_selector=None, 

1433 trace_selector=None): 

1434 

1435 ''' 

1436 Iterate over all traces in pile. 

1437 

1438 :param load_data: whether to load the waveform data, by default empty 

1439 traces are yielded 

1440 :param return_abspath: if ``True`` yield tuples containing absolute 

1441 file path and :py:class:`pyrocko.trace.Trace` objects 

1442 :param group_selector: filter callback taking :py:class:`TracesGroup` 

1443 objects 

1444 :param trace_selector: filter callback taking 

1445 :py:class:`pyrocko.trace.Trace` objects 

1446 

1447 Example; yields only traces, where the station code is 'HH1':: 

1448 

1449 test_pile = pile.make_pile('/local/test_trace_directory') 

1450 for t in test_pile.iter_traces( 

1451 trace_selector=lambda tr: tr.station=='HH1'): 

1452 

1453 print t 

1454 ''' 

1455 

1456 for subpile in self.subpiles.values(): 

1457 if not group_selector or group_selector(subpile): 

1458 for tr in subpile.iter_traces(load_data, return_abspath, 

1459 group_selector, trace_selector): 

1460 yield tr 

1461 

1462 def iter_files(self): 

1463 for subpile in self.subpiles.values(): 

1464 for file in subpile.iter_files(): 

1465 yield file 

1466 

1467 def reload_modified(self): 

1468 modified = False 

1469 for subpile in self.subpiles.values(): 

1470 modified |= subpile.reload_modified() 

1471 

1472 return modified 

1473 

    def get_tmin(self):
        '''
        Get begin of time span covered by the pile (``None`` if empty).
        '''
        return self.tmin

1476 

    def get_tmax(self):
        '''
        Get end of time span covered by the pile (``None`` if empty).
        '''
        return self.tmax

1479 

    def get_deltatmin(self):
        '''
        Get the pile's ``deltatmin`` attribute (presumably the smallest
        sampling interval of the contained traces).
        '''
        return self.deltatmin

1482 

    def get_deltatmax(self):
        '''
        Get the pile's ``deltatmax`` attribute (presumably the largest
        sampling interval of the contained traces).
        '''
        return self.deltatmax

1485 

    def is_empty(self):
        '''
        Check if the pile contains no data (no time span is set).
        '''
        return self.tmin is None and self.tmax is None

1488 

1489 def __str__(self): 

1490 if self.tmin is not None and self.tmax is not None: 

1491 tmin = util.time_to_str(self.tmin) 

1492 tmax = util.time_to_str(self.tmax) 

1493 s = 'Pile\n' 

1494 s += 'number of subpiles: %i\n' % len(self.subpiles) 

1495 s += 'timerange: %s - %s\n' % (tmin, tmax) 

1496 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys())) 

1497 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys())) 

1498 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys())) 

1499 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys())) 

1500 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys())) 

1501 

1502 else: 

1503 s = 'empty Pile' 

1504 

1505 return s 

1506 

    def snuffle(self, **kwargs):
        '''
        Visualize it.

        :param stations: list of :py:class:`pyrocko.model.Station` objects or
            ``None``
        :param events: list of :py:class:`pyrocko.model.Event` objects or
            ``None``
        :param markers: list of :py:class:`pyrocko.gui.snuffler.marker.Marker`
            objects or ``None``
        :param ntracks: float, number of tracks to be shown initially
            (default: 12)
        :param follow: time interval (in seconds) for real time follow mode or
            ``None``
        :param controls: bool, whether to show the main controls (default:
            ``True``)
        :param opengl: bool, whether to use opengl (default: ``False``)
        '''

        # imported lazily so GUI dependencies are only pulled in when needed
        from pyrocko.gui.snuffler.snuffler import snuffle
        snuffle(self, **kwargs)

1528 

1529 

def make_pile(
        paths=None, selector=None, regex=None,
        fileformat='mseed',
        cachedirname=None, show_progress=True):

    '''
    Create pile from given file and directory names.

    :param paths: filenames and/or directories to look for traces. If paths is
        ``None`` ``sys.argv[1:]`` is used.
    :param selector: lambda expression taking group dict of regex match object
        as a single argument and which returns true or false to keep or reject
        a file
    :param regex: regular expression which filenames have to match
    :param fileformat: format of the files ('mseed', 'sac', 'kan',
        'from_extension', 'detect')
    :param cachedirname: loader cache is stored under this directory. It is
        created as necessary.
    :param show_progress: show progress bar and other progress information
    '''

    if show_progress_force_off:
        show_progress = False

    # normalize the paths argument: single string -> list, None -> argv
    if paths is None:
        paths = sys.argv[1:]
    elif isinstance(paths, str):
        paths = [paths]

    if cachedirname is None:
        cachedirname = config.config().cache_dir

    fns = util.select_files(
        paths, include=regex, selector=selector, show_progress=show_progress)

    cache = get_cache(cachedirname)

    result = Pile()
    result.load_files(
        sorted(fns),
        cache=cache,
        fileformat=fileformat,
        show_progress=show_progress)

    return result

1575 

1576 

1577class Injector(trace.States): 

1578 

1579 def __init__( 

1580 self, pile, 

1581 fixation_length=None, 

1582 path=None, 

1583 format='from_extension', 

1584 forget_fixed=False): 

1585 

1586 trace.States.__init__(self) 

1587 self._pile = pile 

1588 self._fixation_length = fixation_length 

1589 self._format = format 

1590 self._path = path 

1591 self._forget_fixed = forget_fixed 

1592 

1593 def set_fixation_length(self, length): 

1594 ''' 

1595 Set length after which the fixation method is called on buffer traces. 

1596 

1597 The length should be given in seconds. Give None to disable. 

1598 ''' 

1599 self.fixate_all() 

1600 self._fixation_length = length # in seconds 

1601 

1602 def set_save_path( 

1603 self, 

1604 path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_' 

1605 '%(tmin)s_%(tmax)s.mseed'): 

1606 

1607 self.fixate_all() 

1608 self._path = path 

1609 

    def inject(self, trace):
        '''
        Append a newly received trace to its associated in-memory buffer
        file, fixating the buffer when it grows beyond the fixation length.

        NOTE: the ``trace`` parameter shadows the module-level ``trace``
        import inside this method.
        '''
        logger.debug('Received a trace: %s' % trace)

        buf = self.get(trace)
        if buf is None:
            # first trace for this state: start a new in-memory buffer file
            trbuf = trace.copy()
            buf = MemTracesFile(None, [trbuf])
            self._pile.add_file(buf)
            self.set(trace, buf)

        else:
            # grow the existing buffer: detach it from the pile, append the
            # new samples, then re-add it so the pile's index stays valid
            self._pile.remove_file(buf)
            trbuf = buf.get_traces()[0]
            buf.remove(trbuf)
            trbuf.append(trace.ydata)
            buf.add(trbuf)
            self._pile.add_file(buf)
            self.set(trace, buf)

        trbuf = buf.get_traces()[0]
        if self._fixation_length is not None:
            if trbuf.tmax - trbuf.tmin > self._fixation_length:
                # buffer grew beyond the fixation length: write it out
                self._fixate(buf, complete=False)

1633 

1634 def fixate_all(self): 

1635 for state in list(self._states.values()): 

1636 self._fixate(state[-1]) 

1637 

1638 self._states = {} 

1639 

    def free(self, buf):
        '''
        Fixate (write out and release) a single buffer file.
        '''
        self._fixate(buf)

1642 

    def _fixate(self, buf, complete=True):
        '''
        Write out a buffer file and (usually) remove it from the pile.

        :param buf: buffer file to fixate
        :param complete: if ``True``, the whole buffer is written out;
            otherwise an incomplete last chunk may be kept buffered
        '''
        trbuf = buf.get_traces()[0]
        del_state = True
        if self._path:
            if self._fixation_length is not None:
                # align chunk boundaries to multiples of fixation_length,
                # counted from the start of the year
                ttmin = trbuf.tmin
                ytmin = util.year_start(ttmin)
                n = int(math.floor((ttmin - ytmin) / self._fixation_length))
                tmin = ytmin + n*self._fixation_length
                traces = []
                t = tmin
                while t <= trbuf.tmax:
                    try:
                        traces.append(
                            trbuf.chop(
                                t,
                                t+self._fixation_length,
                                inplace=False,
                                snap=(math.ceil, math.ceil)))

                    except trace.NoData:
                        pass
                    t += self._fixation_length

                # NOTE(review): if every chop raised NoData, ``traces`` is
                # empty and ``traces[-1]`` below raises IndexError — confirm
                # buffers handed in here always yield at least one chunk.
                if abs(traces[-1].tmax - (t - trbuf.deltat)) < \
                        trbuf.deltat/100. or complete:

                    self._pile.remove_file(buf)

                else:  # reinsert incomplete last part
                    new_trbuf = traces.pop()
                    self._pile.remove_file(buf)
                    buf.remove(trbuf)
                    buf.add(new_trbuf)
                    self._pile.add_file(buf)
                    del_state = False

            else:
                traces = [trbuf]
                self._pile.remove_file(buf)

            fns = io.save(traces, self._path, format=self._format)

            if not self._forget_fixed:
                # load the freshly written files back into the pile
                self._pile.load_files(
                    fns, show_progress=False, fileformat=self._format)

        if del_state:
            del self._states[trbuf.nslc_id]

1692 

    def __del__(self):
        # best-effort flush of remaining buffers when the injector is
        # garbage-collected
        self.fixate_all()