1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5from __future__ import absolute_import, division, print_function 

6 

7import os 

8import logging 

9import time 

10import weakref 

11import copy 

12import re 

13import sys 

14import operator 

15import math 

16import hashlib 

17try: 

18 import cPickle as pickle 

19except ImportError: 

20 import pickle 

21 

22 

23from . import avl 

24from . import trace, io, util 

25from . import config 

26from .trace import degapper 

27 

28 

# True when running on Windows; os.rename cannot overwrite an existing
# file there (see TracesFileCache._dump_dircache).
is_windows = sys.platform.startswith('win')
# Global switch to force-disable progress bars (checked in loader()).
show_progress_force_off = False
# Salt mixed into ehash(); cache file names are derived from it, so
# changing the salt invalidates previously written caches.
version_salt = 'v1-'

32 

33 

def ehash(s):
    '''
    Return the salted SHA1 hex digest of string *s*.

    The module-level ``version_salt`` is prepended, so all derived hashes
    change when the salt is changed.
    '''
    salted = version_salt + s
    return hashlib.sha1(salted.encode('utf8')).hexdigest()

36 

37 

def cmp(a, b):
    '''
    Three-way comparison (replacement for the removed Python 2 builtin).

    :returns: -1, 0 or 1 if *a* is smaller than, equal to, or greater
        than *b*.
    '''
    if a < b:
        return -1
    elif a > b:
        return 1

    return 0

40 

41 

def sl(s):
    '''
    Return string representations of the elements of *s*, in sorted order.
    '''
    return list(map(str, sorted(s)))

44 

45 

class Counter(dict):
    '''
    Dict-based multiset: missing keys read as zero, and keys whose count
    drops to zero or below are removed on subtraction.
    '''

    def __missing__(self, k):
        # Lookups of unknown keys yield 0 without inserting the key.
        return 0

    def update(self, other):
        '''Add the counts from mapping *other* to this counter.'''
        for key, count in other.items():
            self[key] = self[key] + count

    def subtract(self, other):
        '''
        Subtract the counts from mapping *other*; keys whose count ends up
        at zero or below are removed.
        '''
        for key, count in other.items():
            remaining = self[key] - count
            if remaining > 0:
                self[key] = remaining
            else:
                self.pop(key, None)

    def subtract1(self, k):
        '''Decrement count of key *k* by one, removing it at zero or below.'''
        remaining = self[k] - 1
        if remaining > 0:
            self[k] = remaining
        else:
            self.pop(k, None)

65 

66 

def fix_unicode_copy(counter, func):
    '''
    Return a copy of *counter* with every key transformed by *func*.

    Used to convert unicode keys from old Py2 cache files to ``str``
    (see ``TracesGroup.fix_unicode_codes``).
    '''
    fixed = Counter()
    for key, count in counter.items():
        fixed[func(key)] = count

    return fixed

72 

73 

# Shorthand for os.path.join, used throughout this module.
pjoin = os.path.join
logger = logging.getLogger('pyrocko.pile')

76 

77 

def avl_remove_exact(avltree, element):
    '''
    Remove the exact object *element* (compared by identity) from *avltree*.

    :returns: the index at which the element was found.
    :raises ValueError: if the object is not contained in the tree.
    '''
    ilo, ihi = avltree.span(element)
    i = ilo
    while i < ihi:
        if avltree[i] is element:
            avltree.remove_at(i)
            return i

        i += 1

    raise ValueError(
        'avl_remove_exact(avltree, element): element not in avltree')

87 

88 

def cmpfunc(key):
    '''
    Build a two-argument comparison function from *key*.

    *key* may be an attribute name (string) or a callable extracting the
    comparison value from an object.
    '''
    if isinstance(key, str):
        # Special-cased attribute names; these run notably faster than the
        # generic attrgetter-based variant below.
        if key == 'tmin':
            return lambda a, b: cmp(a.tmin, b.tmin)

        if key == 'tmax':
            return lambda a, b: cmp(a.tmax, b.tmax)

        key = operator.attrgetter(key)

    def compare(a, b):
        return cmp(key(a), key(b))

    return compare

101 

102 

# Cache of attribute-carrier classes created by get_dummy(), keyed by
# attribute name.
g_dummys = {}

104 

105 

def get_dummy(key):
    '''
    Get (and cache in ``g_dummys``) a class whose instances carry a single
    attribute named *key*.

    Instances serve as probe objects for key-range queries on
    :py:class:`Sorted` containers.
    '''
    klass = g_dummys.get(key)
    if klass is None:
        class Dummy(object):
            def __init__(self, k):
                setattr(self, key, k)

        g_dummys[key] = klass = Dummy

    return klass

115 

116 

class TooMany(Exception):
    '''
    Raised by :py:meth:`Sorted.with_key_in_limited` when more elements
    match than allowed.
    '''

    def __init__(self, n):
        # n: number of elements that matched.
        super(TooMany, self).__init__()
        self.n = n

121 

122 

class Sorted(object):
    '''
    Collection of objects kept in sorted order by a comparison key.

    Thin wrapper around an AVL tree. *key* may be an attribute name
    (string) or a callable computing the sort key from an element.
    Removal is by object identity (see :py:func:`avl_remove_exact`).
    '''

    def __init__(self, values=None, key=None):
        '''
        :param values: optional iterable of initial elements
        :param key: attribute name (string) or callable used for ordering
        '''
        # Fix: the original used a mutable default argument (values=[]).
        # Use None as sentinel so no list object is shared between calls.
        if values is None:
            values = []

        self._set_key(key)
        self._avl = avl.new(values, self._cmp)

    def _set_key(self, key):
        # Remember key and derive the comparison function; for string keys
        # also prepare the dummy-object factory used for range queries.
        self._key = key
        self._cmp = cmpfunc(key)
        if isinstance(key, str):
            self._dummy = get_dummy(key)

    def __getstate__(self):
        # Pickle as (elements in order, key); the tree is rebuilt on load.
        state = list(self._avl.iter()), self._key
        return state

    def __setstate__(self, state):
        l, key = state
        self._set_key(key)
        self._avl = avl.from_iter(iter(l), len(l))

    def insert(self, value):
        '''Insert a single element.'''
        self._avl.insert(value)

    def remove(self, value):
        '''Remove element by object identity; return its former index.'''
        return avl_remove_exact(self._avl, value)

    def remove_at(self, i):
        '''Remove and return the element at index *i*.'''
        return self._avl.remove_at(i)

    def insert_many(self, values):
        '''Insert all elements from iterable *values*.'''
        for value in values:
            self._avl.insert(value)

    def remove_many(self, values):
        '''Remove all elements in *values* (by object identity).'''
        for value in values:
            avl_remove_exact(self._avl, value)

    def __iter__(self):
        return iter(self._avl)

    def with_key_in(self, kmin, kmax):
        '''Get list of elements with key in range [*kmin*, *kmax*].'''
        omin, omax = self._dummy(kmin), self._dummy(kmax)
        ilo, ihi = self._avl.span(omin, omax)
        return self._avl[ilo:ihi]

    def with_key_in_limited(self, kmin, kmax, nmax):
        '''
        Like :py:meth:`with_key_in` but raise :py:exc:`TooMany` when more
        than *nmax* elements match.
        '''
        omin, omax = self._dummy(kmin), self._dummy(kmax)
        ilo, ihi = self._avl.span(omin, omax)
        if ihi - ilo > nmax:
            raise TooMany(ihi - ilo)

        return self._avl[ilo:ihi]

    def index(self, value):
        '''Get index of *value* (by object identity).'''
        ilo, ihi = self._avl.span(value)
        for i in range(ilo, ihi):
            if self._avl[i] is value:
                return i

        raise ValueError('element is not in avl tree')

    def min(self):
        return self._avl.min()

    def max(self):
        return self._avl.max()

    def __len__(self):
        return len(self._avl)

    def __getitem__(self, i):
        return self._avl[i]

195 

196 

class TracesFileCache(object):
    '''
    Manages trace metainformation cache.

    For each directory with files containing traces, one cache file is
    maintained to hold the trace metainformation of all files which are
    contained in the directory.
    '''

    # Global registry of TracesFileCache instances, keyed by cache
    # directory (see get_cache()).
    caches = {}

    def __init__(self, cachedir):
        '''
        Create new cache.

        :param cachedir: directory to hold the cache files.
        '''

        self.cachedir = cachedir
        # Maps cache file path -> {abspath: cached object} dict.
        self.dircaches = {}
        # Cache file paths with unsaved modifications.
        self.modified = set()
        util.ensuredir(self.cachedir)

    def get(self, abspath):
        '''
        Try to get an item from the cache.

        :param abspath: absolute path of the object to retrieve

        :returns: a stored object is returned or None if nothing could be
            found.
        '''

        dircache = self._get_dircache_for(abspath)
        if abspath in dircache:
            return dircache[abspath]
        return None

    def put(self, abspath, tfile):
        '''
        Put an item into the cache.

        :param abspath: absolute path of the object to be stored
        :param tfile: object to be stored
        '''

        cachepath = self._dircachepath(abspath)
        # get lock on cachepath here
        dircache = self._get_dircache(cachepath)
        dircache[abspath] = tfile
        self.modified.add(cachepath)

    def dump_modified(self):
        '''
        Save any modifications to disk.
        '''

        for cachepath in self.modified:
            self._dump_dircache(self.dircaches[cachepath], cachepath)
            # unlock

        self.modified = set()

    def clean(self):
        '''
        Weed out missing files from the disk caches.
        '''

        self.dump_modified()

        for fn in os.listdir(self.cachedir):
            # Cache file names are 40-character SHA1 hex digests (ehash).
            if len(fn) == 40:
                # Loading weeds out entries for vanished files; re-dump.
                cache = self._load_dircache(pjoin(self.cachedir, fn))
                self._dump_dircache(cache, pjoin(self.cachedir, fn))

    def _get_dircache_for(self, abspath):
        # Get the directory cache dict responsible for *abspath*.
        return self._get_dircache(self._dircachepath(abspath))

    def _get_dircache(self, cachepath):
        # Lazily load the cache file on first access; a missing file
        # yields an empty dict.
        if cachepath not in self.dircaches:
            if os.path.isfile(cachepath):
                self.dircaches[cachepath] = self._load_dircache(cachepath)
            else:
                self.dircaches[cachepath] = {}

        return self.dircaches[cachepath]

    def _dircachepath(self, abspath):
        # One cache file per directory, named by the salted hash of the
        # directory path.
        cachefn = ehash(os.path.dirname(abspath))
        return pjoin(self.cachedir, cachefn)

    def _load_dircache(self, cachefilename):
        # Unpickle a directory cache and restore the transient state that
        # _dump_dircache() strips before writing.

        with open(cachefilename, 'rb') as f:
            cache = pickle.load(f)

        # weed out files which no longer exist
        for fn in list(cache.keys()):
            if not os.path.isfile(fn):
                del cache[fn]

        time_float = util.get_time_float()

        for v in cache.values():
            # Rebuild the sorted lookup trees which were not pickled.
            v.trees_from_content(v.traces)
            for tr in v.traces:
                tr.file = v
                # fix Py2 codes to not include unicode when the cache file
                # was created with Py3
                if not isinstance(tr.station, str):
                    tr.prune_from_reuse_cache()
                    tr.set_codes(
                        str(tr.network),
                        str(tr.station),
                        str(tr.location),
                        str(tr.channel))

                tr.tmin = time_float(tr.tmin)
                tr.tmax = time_float(tr.tmax)

            v.data_use_count = 0
            v.data_loaded = False
            v.fix_unicode_codes()

        return cache

    def _dump_dircache(self, cache, cachefilename):
        # Pickle *cache* to *cachefilename*; an empty cache deletes the
        # file instead.

        if not cache:
            if os.path.exists(cachefilename):
                os.remove(cachefilename)
            return

        # make a copy without the parents and the binsearch trees
        cache_copy = {}
        for fn in cache.keys():
            trf = copy.copy(cache[fn])
            trf.parent = None
            trf.by_tmin = None
            trf.by_tmax = None
            trf.by_tlen = None
            trf.by_mtime = None
            trf.data_use_count = 0
            trf.data_loaded = False
            traces = []
            for tr in trf.traces:
                # Strip sample data and metadata; headers only are cached.
                tr = tr.copy(data=False)
                tr.ydata = None
                tr.meta = None
                tr.file = trf
                traces.append(tr)

            trf.traces = traces

            cache_copy[fn] = trf

        # Write to a pid-suffixed temporary file, then rename into place.
        tmpfn = cachefilename+'.%i.tmp' % os.getpid()
        with open(tmpfn, 'wb') as f:
            pickle.dump(cache_copy, f, protocol=2)

        if is_windows and os.path.exists(cachefilename):
            # windows doesn't allow to rename over existing file
            os.unlink(cachefilename)

        os.rename(tmpfn, cachefilename)

362 

363 

def get_cache(cachedir):
    '''
    Get global TracesFileCache object for given directory.
    '''
    caches = TracesFileCache.caches
    if cachedir not in caches:
        caches[cachedir] = TracesFileCache(cachedir)

    return caches[cachedir]

372 

373 

def loader(
        filenames, fileformat, cache, filename_attributes,
        show_progress=True, update_progress=None):

    '''
    Generator yielding a TracesFile object for each of the *filenames*.

    Header information is taken from *cache* when available and still
    valid; otherwise the file is scanned and the cache is updated.

    :param filenames: paths of the files to look at
    :param fileformat: format name passed to the file scanner; the special
        value ``'detect'`` accepts cached entries of any format
    :param cache: :py:class:`TracesFileCache` object or None
    :param filename_attributes: regular expression with named groups; when
        given, network/station/location/channel codes are substituted from
        matches on the file paths
    :param show_progress: whether to show a progress bar
    :param update_progress: optional callback ``(label, i, n)``; returning
        a true value aborts the operation
    '''

    if show_progress_force_off:
        show_progress = False

    class Progress(object):
        # Unifies reporting through the terminal progress bar and the
        # update_progress callback.
        def __init__(self, label, n):
            self._label = label
            self._n = n
            self._bar = None
            if show_progress:
                self._bar = util.progressbar(label, self._n)

            if update_progress:
                update_progress(label, 0, self._n)

        def update(self, i):
            # Returns True when the update_progress callback requests
            # abortion.
            if self._bar:
                if i < self._n-1:
                    self._bar.update(i)
                else:
                    self._bar.finish()
                    self._bar = None

            abort = False
            if update_progress:
                abort = update_progress(self._label, i, self._n)

            return abort

        def finish(self):
            if self._bar:
                self._bar.finish()
                self._bar = None

    if not filenames:
        logger.warning('No files to load from')
        return

    regex = None
    if filename_attributes:
        regex = re.compile(filename_attributes)

    try:
        # Phase 1: look at each file and decide whether it must be
        # (re)scanned or can be served from the cache.
        progress = Progress('Looking at files', len(filenames))

        failures = []
        to_load = []
        for i, filename in enumerate(filenames):
            try:
                abspath = os.path.abspath(filename)

                substitutions = None
                if regex:
                    m = regex.search(filename)
                    if not m:
                        raise FilenameAttributeError(
                            "Cannot get attributes with pattern '%s' "
                            "from path '%s'" % (filename_attributes, filename))

                    # Only the four trace codes may be substituted.
                    substitutions = {}
                    for k in m.groupdict():
                        if k in ('network', 'station', 'location', 'channel'):
                            substitutions[k] = m.groupdict()[k]

                mtime = os.stat(filename)[8]
                tfile = None
                if cache:
                    tfile = cache.get(abspath)

                # Rescan when not cached, when the cached format disagrees
                # (unless autodetecting), when the file was modified, or
                # when substitutions are in effect (never cached).
                mustload = (
                    not tfile or
                    (tfile.format != fileformat and fileformat != 'detect') or
                    tfile.mtime != mtime or
                    substitutions is not None)

                to_load.append(
                    (mustload, mtime, abspath, substitutions, tfile))

            except (OSError, FilenameAttributeError) as xerror:
                failures.append(abspath)
                logger.warning(xerror)

            abort = progress.update(i+1)
            if abort:
                progress.update(len(filenames))
                return

        progress.update(len(filenames))

        # Sort by absolute path for stable processing order.
        to_load.sort(key=lambda x: x[2])

        nload = len([1 for x in to_load if x[0]])
        iload = 0

        # When less than 1% of the files need scanning, report progress
        # over all files instead of only the ones to be loaded.
        count_all = False
        if nload < 0.01*len(to_load):
            nload = len(to_load)
            count_all = True

        if to_load:
            # Phase 2: scan files where needed and yield results.
            progress = Progress('Scanning files', nload)

            for (mustload, mtime, abspath, substitutions, tfile) in to_load:
                try:
                    if mustload:
                        tfile = TracesFile(
                            None, abspath, fileformat,
                            substitutions=substitutions, mtime=mtime)

                        # Files with substituted codes are not cached.
                        if cache and not substitutions:
                            cache.put(abspath, tfile)

                        if not count_all:
                            iload += 1

                    if count_all:
                        iload += 1

                except (io.FileLoadError, OSError) as xerror:
                    failures.append(abspath)
                    logger.warning(xerror)
                else:
                    yield tfile

                abort = progress.update(iload+1)
                if abort:
                    break

            progress.update(nload)

        if failures:
            logger.warning(
                'The following file%s caused problems and will be ignored:\n' %
                util.plural_s(len(failures)) + '\n'.join(failures))

        if cache:
            cache.dump_modified()
    finally:
        progress.finish()

516 

517 

def tlen(x):
    '''Return the time span ``x.tmax - x.tmin``.'''
    return x.tmax - x.tmin

520 

521 

class TracesGroup(object):

    '''
    Trace container base class.

    Base class for Pile, SubPile, and TracesFile, i.e. anything containing
    a collection of several traces. A TracesGroup object maintains lookup sets
    of some of the traces meta-information, as well as a combined time-range
    of its contents.
    '''

    def __init__(self, parent):
        # parent: enclosing TracesGroup (or None); add/remove operations
        # are propagated upwards through it.
        self.parent = parent
        self.empty()
        # Incremented on every add/remove (see get_update_count()).
        self.nupdates = 0
        self.abspath = None

    def set_parent(self, parent):
        self.parent = parent

    def get_parent(self):
        return self.parent

    def empty(self):
        '''
        Reset all lookup structures to represent an empty group.
        '''
        # Occurrence counters for the meta-information of the contained
        # traces.
        self.networks, self.stations, self.locations, self.channels, \
            self.nslc_ids, self.deltats = [Counter() for x in range(6)]
        # Traces sorted by different keys, for fast range queries.
        self.by_tmin = Sorted([], 'tmin')
        self.by_tmax = Sorted([], 'tmax')
        self.by_tlen = Sorted([], tlen)
        self.by_mtime = Sorted([], 'mtime')
        self.tmin, self.tmax = None, None
        self.deltatmin, self.deltatmax = None, None

    def trees_from_content(self, content):
        '''
        Rebuild the sorted lookup trees from the traces in *content* and
        refresh the cached min/max summary values.
        '''
        self.by_tmin = Sorted(content, 'tmin')
        self.by_tmax = Sorted(content, 'tmax')
        self.by_tlen = Sorted(content, tlen)
        self.by_mtime = Sorted(content, 'mtime')
        self.adjust_minmax()

    def fix_unicode_codes(self):
        '''
        Convert unicode code keys (from old Py2 cache files) to ``str``.
        '''
        for net in self.networks:
            # Keys already are str -> nothing to do.
            if isinstance(net, str):
                return

        self.networks = fix_unicode_copy(self.networks, str)
        self.stations = fix_unicode_copy(self.stations, str)
        self.locations = fix_unicode_copy(self.locations, str)
        self.channels = fix_unicode_copy(self.channels, str)
        self.nslc_ids = fix_unicode_copy(
            self.nslc_ids, lambda k: tuple(str(x) for x in k))

    def add(self, content):
        '''
        Add content to traces group and update indices.

        Accepts :py:class:`pyrocko.trace.Trace` objects and
        :py:class:`pyrocko.pile.TracesGroup` objects.
        '''

        if isinstance(content, (trace.Trace, TracesGroup)):
            content = [content]

        for c in content:

            if isinstance(c, TracesGroup):
                # Merge the other group's counters and sorted trees.
                self.networks.update(c.networks)
                self.stations.update(c.stations)
                self.locations.update(c.locations)
                self.channels.update(c.channels)
                self.nslc_ids.update(c.nslc_ids)
                self.deltats.update(c.deltats)

                self.by_tmin.insert_many(c.by_tmin)
                self.by_tmax.insert_many(c.by_tmax)
                self.by_tlen.insert_many(c.by_tlen)
                self.by_mtime.insert_many(c.by_mtime)

            elif isinstance(c, trace.Trace):
                self.networks[c.network] += 1
                self.stations[c.station] += 1
                self.locations[c.location] += 1
                self.channels[c.channel] += 1
                self.nslc_ids[c.nslc_id] += 1
                self.deltats[c.deltat] += 1

                self.by_tmin.insert(c)
                self.by_tmax.insert(c)
                self.by_tlen.insert(c)
                self.by_mtime.insert(c)

        self.adjust_minmax()

        self.nupdates += 1
        self.notify_listeners('add')

        # Propagate upwards so parent groups stay consistent.
        if self.parent is not None:
            self.parent.add(content)

    def remove(self, content):
        '''
        Remove content to traces group and update indices.
        '''
        if isinstance(content, (trace.Trace, TracesGroup)):
            content = [content]

        for c in content:

            if isinstance(c, TracesGroup):
                # Subtract the other group's counters and tree entries.
                self.networks.subtract(c.networks)
                self.stations.subtract(c.stations)
                self.locations.subtract(c.locations)
                self.channels.subtract(c.channels)
                self.nslc_ids.subtract(c.nslc_ids)
                self.deltats.subtract(c.deltats)

                self.by_tmin.remove_many(c.by_tmin)
                self.by_tmax.remove_many(c.by_tmax)
                self.by_tlen.remove_many(c.by_tlen)
                self.by_mtime.remove_many(c.by_mtime)

            elif isinstance(c, trace.Trace):
                self.networks.subtract1(c.network)
                self.stations.subtract1(c.station)
                self.locations.subtract1(c.location)
                self.channels.subtract1(c.channel)
                self.nslc_ids.subtract1(c.nslc_id)
                self.deltats.subtract1(c.deltat)

                self.by_tmin.remove(c)
                self.by_tmax.remove(c)
                self.by_tlen.remove(c)
                self.by_mtime.remove(c)

        self.adjust_minmax()

        self.nupdates += 1
        self.notify_listeners('remove')

        # Propagate upwards so parent groups stay consistent.
        if self.parent is not None:
            self.parent.remove(content)

    def relevant(self, tmin, tmax, group_selector=None, trace_selector=None):
        '''
        Return list of :py:class:`pyrocko.trace.Trace` objects where given
        arguments ``tmin`` and ``tmax`` match.

        :param tmin: start time
        :param tmax: end time
        :param group_selector: lambda expression taking group dict of regex
            match object as a single argument and which returns true or false
            to keep or reject a file (default: ``None``)
        :param trace_selector: lambda expression taking group dict of regex
            match object as a single argument and which returns true or false
            to keep or reject a trace (default: ``None``)
        '''

        if not self.by_tmin or not self.is_relevant(
                tmin, tmax, group_selector):

            return []

        # Any trace overlapping [tmin, tmax] must start within
        # [tmin - tlenmax, tmax]; the candidates are filtered exactly.
        return [tr for tr in self.by_tmin.with_key_in(tmin-self.tlenmax, tmax)
                if tr.is_relevant(tmin, tmax, trace_selector)]

    def adjust_minmax(self):
        # Refresh cached summary values (time range, max trace length,
        # mtime, deltat range) from the lookup structures.
        if self.by_tmin:
            self.tmin = self.by_tmin.min().tmin
            self.tmax = self.by_tmax.max().tmax
            t = self.by_tlen.max()
            self.tlenmax = t.tmax - t.tmin
            self.mtime = self.by_mtime.max().mtime
            deltats = list(self.deltats.keys())
            self.deltatmin = min(deltats)
            self.deltatmax = max(deltats)
        else:
            self.tmin = None
            self.tmax = None
            self.tlenmax = None
            self.mtime = None
            self.deltatmin = None
            self.deltatmax = None

    def notify_listeners(self, what):
        # Hook, overridden in Pile to inform registered listeners.
        pass

    def get_update_count(self):
        return self.nupdates

    def overlaps(self, tmin, tmax):
        # True when [tmin, tmax] intersects this group's time range.
        return self.tmin is not None \
            and tmax >= self.tmin and self.tmax >= tmin

    def is_relevant(self, tmin, tmax, group_selector=None):
        # Like overlaps() but additionally applies *group_selector*.
        if self.tmin is None or self.tmax is None:
            return False
        return tmax >= self.tmin and self.tmax >= tmin and (
            group_selector is None or group_selector(self))

720 

721 

class MemTracesFile(TracesGroup):

    '''
    Pseudo-file holding traces which do not come from an actual file on
    disk, so that they can be inserted into a Pile.
    '''

    def __init__(self, parent, traces):
        TracesGroup.__init__(self, parent)
        self.add(traces)
        self.mtime = time.time()

    def add(self, traces):
        '''Add trace(s), claiming ownership, and update the indices.'''
        if isinstance(traces, trace.Trace):
            traces = [traces]

        for tr in traces:
            tr.file = self

        TracesGroup.add(self, traces)

    def load_headers(self, mtime=None):
        # Nothing to do: everything already lives in memory.
        pass

    def load_data(self):
        pass

    def use_data(self):
        pass

    def drop_data(self):
        # In-memory traces are never dropped.
        pass

    def reload_if_modified(self):
        return False

    def iter_traces(self):
        return iter(self.by_tmin)

    def get_traces(self):
        return list(self.by_tmin)

    def gather_keys(self, gather, selector=None):
        '''Collect ``gather(tr)`` over all traces passing *selector*.'''
        return set(
            gather(tr) for tr in self.by_tmin
            if selector is None or selector(tr))

    def __str__(self):
        parts = [
            'MemTracesFile',
            'file mtime: %s' % util.time_to_str(self.mtime),
            'number of traces: %i' % len(self.by_tmin),
            'timerange: %s - %s' % (
                util.time_to_str(self.tmin), util.time_to_str(self.tmax)),
            'networks: %s' % ', '.join(sl(self.networks.keys())),
            'stations: %s' % ', '.join(sl(self.stations.keys())),
            'locations: %s' % ', '.join(sl(self.locations.keys())),
            'channels: %s' % ', '.join(sl(self.channels.keys())),
            'deltats: %s' % ', '.join(sl(self.deltats.keys())),
        ]
        return '\n'.join(parts) + '\n'

786 

787 

class TracesFile(TracesGroup):
    '''
    Group of traces loaded from a single file on disk.

    Holds trace headers permanently; sample data is loaded and dropped on
    demand, guarded by a use counter.
    '''

    def __init__(
            self, parent, abspath, format,
            substitutions=None, mtime=None):

        TracesGroup.__init__(self, parent)
        self.abspath = abspath
        self.format = format
        self.traces = []
        self.data_loaded = False
        # Number of active users of the sample data (see use_data()).
        self.data_use_count = 0
        self.substitutions = substitutions
        self.load_headers(mtime=mtime)
        self.mtime = mtime

    def load_headers(self, mtime=None):
        '''
        (Re)read trace metainformation (no sample data) from the file.

        :param mtime: known file modification time; when None, it is
            queried with os.stat.
        '''
        logger.debug('loading headers from file: %s' % self.abspath)
        if mtime is None:
            self.mtime = os.stat(self.abspath)[8]

        def kgen(tr):
            # Key identifying a trace snippet within the file.
            return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id

        self.remove(self.traces)
        ks = set()
        for tr in io.load(self.abspath,
                          format=self.format,
                          getdata=False,
                          substitutions=self.substitutions):

            k = kgen(tr)
            # Skip duplicate snippets.
            if k not in ks:
                ks.add(k)
                self.traces.append(tr)
                tr.file = self

        self.add(self.traces)

        self.data_loaded = False
        self.data_use_count = 0

    def load_data(self, force=False):
        '''
        Read sample data from the file and merge with the known traces.

        :param force: reload even if data was already loaded
        :returns: True if the set of traces changed (i.e. the file content
            on disk differed from what was previously known)
        '''
        file_changed = False
        if not self.data_loaded or force:
            logger.debug('loading data from file: %s' % self.abspath)

            def kgen(tr):
                # Key identifying a trace snippet within the file.
                return (tr.mtime, tr.tmin, tr.tmax) + tr.nslc_id

            traces_ = io.load(self.abspath, format=self.format, getdata=True,
                              substitutions=self.substitutions)

            # prevent adding duplicate snippets from corrupt mseed files
            k_loaded = set()
            traces = []
            for tr in traces_:
                k = kgen(tr)
                if k not in k_loaded:
                    k_loaded.add(k)
                    traces.append(tr)

            # Diff freshly loaded snippets against the known ones.
            k_current_d = dict((kgen(tr), tr) for tr in self.traces)
            k_current = set(k_current_d)
            k_new = k_loaded - k_current
            k_delete = k_current - k_loaded
            k_unchanged = k_current & k_loaded

            # Drop traces which vanished from the file (iterate a copy
            # because self.traces is mutated).
            for tr in self.traces[:]:
                if kgen(tr) in k_delete:
                    self.remove(tr)
                    self.traces.remove(tr)
                    tr.file = None
                    file_changed = True

            # Adopt traces which appeared in the file.
            for tr in traces:
                if kgen(tr) in k_new:
                    tr.file = self
                    self.traces.append(tr)
                    self.add(tr)
                    file_changed = True

            # For unchanged snippets, transfer the sample data onto the
            # existing trace objects.
            for tr in traces:
                if kgen(tr) in k_unchanged:
                    ctr = k_current_d[kgen(tr)]
                    ctr.ydata = tr.ydata

            self.data_loaded = True

        if file_changed:
            logger.debug('reloaded (file may have changed): %s' % self.abspath)

        return file_changed

    def use_data(self):
        # Register a user of the sample data; pairs with drop_data().
        if not self.data_loaded:
            raise Exception('Data not loaded')
        self.data_use_count += 1

    def drop_data(self):
        # Unregister a data user; forget the samples when the last user
        # is gone.
        if self.data_loaded:
            if self.data_use_count == 1:
                logger.debug('forgetting data of file: %s' % self.abspath)
                for tr in self.traces:
                    tr.drop_data()

                self.data_loaded = False

            self.data_use_count -= 1
        else:
            self.data_use_count = 0

    def reload_if_modified(self):
        '''
        Reload headers (or data, if loaded) when the file's mtime changed.

        :returns: True if the file was reloaded.
        '''
        mtime = os.stat(self.abspath)[8]
        if mtime != self.mtime:
            logger.debug(
                'mtime=%i, reloading file: %s' % (mtime, self.abspath))

            self.mtime = mtime
            if self.data_loaded:
                self.load_data(force=True)
            else:
                self.load_headers()

            return True

        return False

    def iter_traces(self):
        for tr in self.traces:
            yield tr

    def gather_keys(self, gather, selector=None):
        '''Collect set of ``gather(tr)`` over all traces passing *selector*.'''
        keys = set()
        for tr in self.by_tmin:
            if selector is None or selector(tr):
                keys.add(gather(tr))

        return keys

    def __str__(self):
        s = 'TracesFile\n'
        s += 'abspath: %s\n' % self.abspath
        s += 'file mtime: %s\n' % util.time_to_str(self.mtime)
        s += 'number of traces: %i\n' % len(self.traces)
        s += 'timerange: %s - %s\n' % (
            util.time_to_str(self.tmin), util.time_to_str(self.tmax))
        s += 'networks: %s\n' % ', '.join(sl(self.networks.keys()))
        s += 'stations: %s\n' % ', '.join(sl(self.stations.keys()))
        s += 'locations: %s\n' % ', '.join(sl(self.locations.keys()))
        s += 'channels: %s\n' % ', '.join(sl(self.channels.keys()))
        s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys()))
        return s

940 

941 

class FilenameAttributeError(Exception):
    '''
    Raised by :py:func:`loader` when the filename-attributes regex pattern
    does not match a given file path.
    '''
    pass

944 

945 

class SubPile(TracesGroup):
    '''
    Collection of TracesFile objects forming one partition of a Pile.
    '''

    def __init__(self, parent):
        TracesGroup.__init__(self, parent)
        self.files = []
        self.empty()

    def add_file(self, file):
        '''Attach *file* to this subpile and index its contents.'''
        self.files.append(file)
        file.set_parent(self)
        self.add(file)

    def remove_file(self, file):
        '''Detach *file* from this subpile and drop it from the indices.'''
        self.files.remove(file)
        file.set_parent(None)
        self.remove(file)

    def remove_files(self, files):
        '''Detach several files at once.'''
        for f in files:
            self.files.remove(f)
            f.set_parent(None)

        self.remove(files)

    def gather_keys(self, gather, selector=None):
        '''Union of ``gather_keys`` results over all contained files.'''
        keys = set()
        for f in self.files:
            keys.update(f.gather_keys(gather, selector))

        return keys

    def iter_traces(
            self,
            load_data=False,
            return_abspath=False,
            group_selector=None,
            trace_selector=None):
        '''
        Iterate over the traces of all contained files, optionally loading
        (and afterwards dropping) their sample data.
        '''

        for f in self.files:

            if group_selector and not group_selector(f):
                continue

            must_drop = False
            if load_data:
                f.load_data()
                f.use_data()
                must_drop = True

            for tr in f.iter_traces():
                if trace_selector and not trace_selector(tr):
                    continue

                if return_abspath:
                    yield f.abspath, tr
                else:
                    yield tr

            if must_drop:
                f.drop_data()

    def iter_files(self):
        return iter(self.files)

    def reload_modified(self):
        '''Reload all files whose mtime changed; return True if any did.'''
        modified = False
        for f in self.files:
            if f.reload_if_modified():
                modified = True

        return modified

    def __str__(self):
        parts = [
            'SubPile',
            'number of files: %i' % len(self.files),
            'timerange: %s - %s' % (
                util.time_to_str(self.tmin), util.time_to_str(self.tmax)),
            'networks: %s' % ', '.join(sl(self.networks.keys())),
            'stations: %s' % ', '.join(sl(self.stations.keys())),
            'locations: %s' % ', '.join(sl(self.locations.keys())),
            'channels: %s' % ', '.join(sl(self.channels.keys())),
            'deltats: %s' % ', '.join(sl(self.deltats.keys())),
        ]
        return '\n'.join(parts) + '\n'

1027 

1028 

class Batch(object):
    '''
    Batch of waveforms from window wise data extraction.

    Encapsulates state and results yielded for each window in window wise
    waveform extraction with the :py:meth:`Pile.chopper` method (when the
    `style='batch'` keyword argument set).

    *Attributes:*

    .. py:attribute:: tmin

        Start of this time window.

    .. py:attribute:: tmax

        End of this time window.

    .. py:attribute:: i

        Index of this time window in sequence.

    .. py:attribute:: n

        Total number of time windows in sequence.

    .. py:attribute:: traces

        Extracted waveforms for this time window.
    '''

    def __init__(self, tmin, tmax, i, n, traces):
        self.tmin, self.tmax = tmin, tmax
        self.i, self.n = i, n
        self.traces = traces

1066 

1067 

1068class Pile(TracesGroup): 

1069 ''' 

1070 Waveform archive lookup, data loading and caching infrastructure. 

1071 ''' 

1072 

1073 def __init__(self): 

1074 TracesGroup.__init__(self, None) 

1075 self.subpiles = {} 

1076 self.open_files = {} 

1077 self.listeners = [] 

1078 self.abspaths = set() 

1079 

1080 def add_listener(self, obj): 

1081 self.listeners.append(weakref.ref(obj)) 

1082 

1083 def notify_listeners(self, what): 

1084 for ref in self.listeners: 

1085 obj = ref() 

1086 if obj: 

1087 obj.pile_changed(what) 

1088 

1089 def load_files( 

1090 self, filenames, 

1091 filename_attributes=None, 

1092 fileformat='mseed', 

1093 cache=None, 

1094 show_progress=True, 

1095 update_progress=None): 

1096 

1097 load = loader( 

1098 filenames, fileformat, cache, filename_attributes, 

1099 show_progress=show_progress, 

1100 update_progress=update_progress) 

1101 

1102 self.add_files(load) 

1103 

1104 def add_files(self, files): 

1105 for file in files: 

1106 self.add_file(file) 

1107 

1108 def add_file(self, file): 

1109 if file.abspath is not None and file.abspath in self.abspaths: 

1110 logger.warning('File already in pile: %s' % file.abspath) 

1111 return 

1112 

1113 if file.deltatmin is None: 

1114 logger.warning('Sampling rate of all traces are zero in file: %s' % 

1115 file.abspath) 

1116 return 

1117 

1118 subpile = self.dispatch(file) 

1119 subpile.add_file(file) 

1120 if file.abspath is not None: 

1121 self.abspaths.add(file.abspath) 

1122 

1123 def remove_file(self, file): 

1124 subpile = file.get_parent() 

1125 if subpile is not None: 

1126 subpile.remove_file(file) 

1127 if file.abspath is not None: 

1128 self.abspaths.remove(file.abspath) 

1129 

1130 def remove_files(self, files): 

1131 subpile_files = {} 

1132 for file in files: 

1133 subpile = file.get_parent() 

1134 if subpile not in subpile_files: 

1135 subpile_files[subpile] = [] 

1136 

1137 subpile_files[subpile].append(file) 

1138 

1139 for subpile, files in subpile_files.items(): 

1140 subpile.remove_files(files) 

1141 for file in files: 

1142 if file.abspath is not None: 

1143 self.abspaths.remove(file.abspath) 

1144 

1145 def dispatch_key(self, file): 

1146 dt = int(math.floor(math.log(file.deltatmin))) 

1147 return dt 

1148 

1149 def dispatch(self, file): 

1150 k = self.dispatch_key(file) 

1151 if k not in self.subpiles: 

1152 self.subpiles[k] = SubPile(self) 

1153 

1154 return self.subpiles[k] 

1155 

1156 def get_deltats(self): 

1157 return list(self.deltats.keys()) 

1158 

1159 def chop( 

1160 self, tmin, tmax, 

1161 group_selector=None, 

1162 trace_selector=None, 

1163 snap=(round, round), 

1164 include_last=False, 

1165 load_data=True): 

1166 

1167 chopped = [] 

1168 used_files = set() 

1169 

1170 traces = self.relevant(tmin, tmax, group_selector, trace_selector) 

1171 if load_data: 

1172 files_changed = False 

1173 for tr in traces: 

1174 if tr.file and tr.file not in used_files: 

1175 if tr.file.load_data(): 

1176 files_changed = True 

1177 

1178 if tr.file is not None: 

1179 used_files.add(tr.file) 

1180 

1181 if files_changed: 

1182 traces = self.relevant( 

1183 tmin, tmax, group_selector, trace_selector) 

1184 

1185 for tr in traces: 

1186 if not load_data and tr.ydata is not None: 

1187 tr = tr.copy(data=False) 

1188 tr.ydata = None 

1189 

1190 try: 

1191 chopped.append(tr.chop( 

1192 tmin, tmax, 

1193 inplace=False, 

1194 snap=snap, 

1195 include_last=include_last)) 

1196 

1197 except trace.NoData: 

1198 pass 

1199 

1200 return chopped, used_files 

1201 

    def _process_chopped(
            self, chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
            tpad):

        '''
        Post-process traces chopped for one extraction window.

        Sorts traces by identifier, optionally degaps them, optionally
        weeds out traces which do not exactly span the padded window
        ``[wmin-tpad, wmax+tpad]``, and attaches the (unpadded) window
        bounds to each surviving trace as ``wmin``/``wmax`` attributes.

        :param chopped: list of chopped traces (modified by sorting)
        :param degap: whether to connect adjacent traces, removing gaps
            and overlaps
        :param maxgap: maximum gap size in samples filled when degapping
        :param maxlap: maximum overlap size in samples removed when
            degapping
        :param want_incomplete: if ``False``, discard traces not exactly
            spanning the padded window
        :param wmax: window end time
        :param wmin: window start time
        :param tpad: padding applied on both sides of the window
        :returns: list of processed traces
        '''

        chopped.sort(key=lambda a: a.full_id)
        if degap:
            chopped = degapper(chopped, maxgap=maxgap, maxlap=maxlap)

        if not want_incomplete:
            chopped_weeded = []
            for tr in chopped:
                # emin/emax: misfit of trace begin/end against the padded
                # window bounds
                emin = tr.tmin - (wmin-tpad)
                emax = tr.tmax + tr.deltat - (wmax+tpad)
                if (abs(emin) <= 0.5*tr.deltat and abs(emax) <= 0.5*tr.deltat):
                    # trace fits the window to within half a sample
                    chopped_weeded.append(tr)

                elif degap:
                    if (0. < emin <= 5. * tr.deltat and
                            -5. * tr.deltat <= emax < 0.):

                        # nearly complete (short by at most 5 samples on
                        # each side): pad by repeating edge samples rather
                        # than discarding
                        tr.extend(
                            wmin-tpad,
                            wmax+tpad-tr.deltat,
                            fillmethod='repeat')

                        chopped_weeded.append(tr)

            chopped = chopped_weeded

        for tr in chopped:
            tr.wmin = wmin
            tr.wmax = wmax

        return chopped

1236 

    def chopper(
            self,
            tmin=None, tmax=None, tinc=None, tpad=0.,
            group_selector=None, trace_selector=None,
            want_incomplete=True, degap=True, maxgap=5, maxlap=None,
            keep_current_files_open=False, accessor_id=None,
            snap=(round, round), include_last=False, load_data=True,
            style=None):

        '''
        Get iterator for shifting window wise data extraction from waveform
        archive.

        :param tmin: start time (default uses start time of available data)
        :param tmax: end time (default uses end time of available data)
        :param tinc: time increment (window shift time) (default uses
            ``tmax-tmin``)
        :param tpad: padding time appended on either side of the data windows
            (window overlap is ``2*tpad``)
        :param group_selector: filter callback taking :py:class:`TracesGroup`
            objects
        :param trace_selector: filter callback taking
            :py:class:`pyrocko.trace.Trace` objects
        :param want_incomplete: if set to ``False``, gappy/incomplete traces
            are discarded from the results
        :param degap: whether to try to connect traces and to remove gaps and
            overlaps
        :param maxgap: maximum gap size in samples which is filled with
            interpolated samples when ``degap`` is ``True``
        :param maxlap: maximum overlap size in samples which is removed when
            ``degap`` is ``True``
        :param keep_current_files_open: whether to keep cached trace data in
            memory after the iterator has ended
        :param accessor_id: if given, used as a key to identify different
            points of extraction for the decision of when to release cached
            trace data (should be used when data is alternately extracted from
            more than one region / selection)
        :param snap: replaces Python's :py:func:`round` function which is used
            to determine indices where to start and end the trace data array
        :param include_last: whether to include last sample
        :param load_data: whether to load the waveform data. If set to
            ``False``, traces with no data samples, but with correct
            meta-information are returned
        :param style: set to ``'batch'`` to yield waveforms and information
            about the chopper state as :py:class:`Batch` objects. By default
            lists of :py:class:`pyrocko.trace.Trace` objects are yielded.
        :returns: iterator providing extracted waveforms for each extracted
            window. See ``style`` argument for details.
        '''
        if tmin is None:
            if self.tmin is None:
                logger.warning('Pile\'s tmin is not set - pile may be empty.')
                return
            tmin = self.tmin + tpad

        if tmax is None:
            if self.tmax is None:
                logger.warning('Pile\'s tmax is not set - pile may be empty.')
                return
            tmax = self.tmax - tpad

        if not self.is_relevant(tmin-tpad, tmax+tpad, group_selector):
            return

        # per-accessor bookkeeping of files whose data is currently cached
        if accessor_id not in self.open_files:
            self.open_files[accessor_id] = set()

        open_files = self.open_files[accessor_id]

        if tinc is None:
            tinc = tmax - tmin
            nwin = 1
        else:
            eps = tinc * 1e-6
            if tinc != 0.0:
                # eps guards against a spurious extra window when
                # tmax - tmin is an exact multiple of tinc
                nwin = int(((tmax - eps) - tmin) / tinc) + 1
            else:
                nwin = 1

        for iwin in range(nwin):
            # current window; the last one may be shortened to end at tmax
            wmin, wmax = tmin+iwin*tinc, min(tmin+(iwin+1)*tinc, tmax)

            chopped, used_files = self.chop(
                wmin-tpad, wmax+tpad, group_selector, trace_selector, snap,
                include_last, load_data)

            for file in used_files - open_files:
                # increment datause counter on newly opened files
                file.use_data()

            open_files.update(used_files)

            processed = self._process_chopped(
                chopped, degap, maxgap, maxlap, want_incomplete, wmax, wmin,
                tpad)

            if style == 'batch':
                yield Batch(
                    tmin=wmin,
                    tmax=wmax,
                    i=iwin,
                    n=nwin,
                    traces=processed)

            else:
                yield processed

            # release cached data of files not touched by this window
            unused_files = open_files - used_files

            while unused_files:
                file = unused_files.pop()
                file.drop_data()
                open_files.remove(file)

        if not keep_current_files_open:
            while open_files:
                file = open_files.pop()
                file.drop_data()

1355 

1356 def all(self, *args, **kwargs): 

1357 ''' 

1358 Shortcut to aggregate :py:meth:`chopper` output into a single list. 

1359 ''' 

1360 

1361 alltraces = [] 

1362 for traces in self.chopper(*args, **kwargs): 

1363 alltraces.extend(traces) 

1364 

1365 return alltraces 

1366 

1367 def iter_all(self, *args, **kwargs): 

1368 for traces in self.chopper(*args, **kwargs): 

1369 for tr in traces: 

1370 yield tr 

1371 

    def chopper_grouped(self, gather, progress=None, *args, **kwargs):
        '''
        Iterate over :py:meth:`chopper` output, restricted to one grouping
        key at a time.

        :param gather: callback mapping a trace to its grouping key; the
            chopper is run once per distinct key, with selection limited to
            matching traces and groups
        :param progress: optional label; when given, a progress bar tracks
            the iteration over the keys
        :param args: positional arguments passed through to
            :py:meth:`chopper`
        :param kwargs: keyword arguments passed through to
            :py:meth:`chopper`; any given ``group_selector`` /
            ``trace_selector`` entries are AND-combined with the per-key
            selection (and overwritten in the passed-in dict)
        '''
        keys = self.gather_keys(gather)
        if len(keys) == 0:
            return

        outer_group_selector = None
        if 'group_selector' in kwargs:
            outer_group_selector = kwargs['group_selector']

        outer_trace_selector = None
        if 'trace_selector' in kwargs:
            outer_trace_selector = kwargs['trace_selector']

        # the use of this gather-cache makes it impossible to modify the pile
        # during chopping
        gather_cache = {}
        pbar = None
        try:
            if progress is not None:
                pbar = util.progressbar(progress, len(keys))

            for ikey, key in enumerate(keys):
                # per-key trace filter, combined with the caller's filter
                def tsel(tr):
                    return gather(tr) == key and (
                        outer_trace_selector is None
                        or outer_trace_selector(tr))

                # per-key group filter; group keys are computed once per
                # group and memoized in gather_cache
                def gsel(gr):
                    if gr not in gather_cache:
                        gather_cache[gr] = gr.gather_keys(gather)

                    return key in gather_cache[gr] and (
                        outer_group_selector is None
                        or outer_group_selector(gr))

                kwargs['trace_selector'] = tsel
                kwargs['group_selector'] = gsel

                for traces in self.chopper(*args, **kwargs):
                    yield traces

                if pbar:
                    pbar.update(ikey+1)

        finally:
            # make sure the progress bar is closed even if the consumer
            # abandons the generator early
            if pbar:
                pbar.finish()

1419 

1420 def gather_keys(self, gather, selector=None): 

1421 keys = set() 

1422 for subpile in self.subpiles.values(): 

1423 keys |= subpile.gather_keys(gather, selector) 

1424 

1425 return sorted(keys) 

1426 

1427 def iter_traces( 

1428 self, 

1429 load_data=False, 

1430 return_abspath=False, 

1431 group_selector=None, 

1432 trace_selector=None): 

1433 

1434 ''' 

1435 Iterate over all traces in pile. 

1436 

1437 :param load_data: whether to load the waveform data, by default empty 

1438 traces are yielded 

1439 :param return_abspath: if ``True`` yield tuples containing absolute 

1440 file path and :py:class:`pyrocko.trace.Trace` objects 

1441 :param group_selector: filter callback taking :py:class:`TracesGroup` 

1442 objects 

1443 :param trace_selector: filter callback taking 

1444 :py:class:`pyrocko.trace.Trace` objects 

1445 

1446 Example; yields only traces, where the station code is 'HH1':: 

1447 

1448 test_pile = pile.make_pile('/local/test_trace_directory') 

1449 for t in test_pile.iter_traces( 

1450 trace_selector=lambda tr: tr.station=='HH1'): 

1451 

1452 print t 

1453 ''' 

1454 

1455 for subpile in self.subpiles.values(): 

1456 if not group_selector or group_selector(subpile): 

1457 for tr in subpile.iter_traces(load_data, return_abspath, 

1458 group_selector, trace_selector): 

1459 yield tr 

1460 

1461 def iter_files(self): 

1462 for subpile in self.subpiles.values(): 

1463 for file in subpile.iter_files(): 

1464 yield file 

1465 

1466 def reload_modified(self): 

1467 modified = False 

1468 for subpile in self.subpiles.values(): 

1469 modified |= subpile.reload_modified() 

1470 

1471 return modified 

1472 

    def get_tmin(self):
        '''
        Get time of first sample in the pile (``None`` when empty).
        '''
        return self.tmin

1475 

    def get_tmax(self):
        '''
        Get time of last sample in the pile (``None`` when empty).
        '''
        return self.tmax

1478 

    def get_deltatmin(self):
        '''
        Get smallest sampling interval of all traces in the pile.
        '''
        return self.deltatmin

1481 

    def get_deltatmax(self):
        '''
        Get largest sampling interval of all traces in the pile.
        '''
        return self.deltatmax

1484 

    def is_empty(self):
        '''
        Check whether the pile contains no data (both time bounds unset).
        '''
        return self.tmin is None and self.tmax is None

1487 

1488 def __str__(self): 

1489 if self.tmin is not None and self.tmax is not None: 

1490 tmin = util.time_to_str(self.tmin) 

1491 tmax = util.time_to_str(self.tmax) 

1492 s = 'Pile\n' 

1493 s += 'number of subpiles: %i\n' % len(self.subpiles) 

1494 s += 'timerange: %s - %s\n' % (tmin, tmax) 

1495 s += 'networks: %s\n' % ', '.join(sl(self.networks.keys())) 

1496 s += 'stations: %s\n' % ', '.join(sl(self.stations.keys())) 

1497 s += 'locations: %s\n' % ', '.join(sl(self.locations.keys())) 

1498 s += 'channels: %s\n' % ', '.join(sl(self.channels.keys())) 

1499 s += 'deltats: %s\n' % ', '.join(sl(self.deltats.keys())) 

1500 

1501 else: 

1502 s = 'empty Pile' 

1503 

1504 return s 

1505 

    def snuffle(self, **kwargs):
        '''
        Visualize it.

        Opens this pile in the Snuffler waveform browser GUI.

        :param stations: list of `pyrocko.model.Station` objects or ``None``
        :param events: list of `pyrocko.model.Event` objects or ``None``
        :param markers: list of `pyrocko.gui_util.Marker` objects or ``None``
        :param ntracks: float, number of tracks to be shown initially
            (default: 12)
        :param follow: time interval (in seconds) for real time follow mode or
            ``None``
        :param controls: bool, whether to show the main controls (default:
            ``True``)
        :param opengl: bool, whether to use opengl (default: ``False``)
        '''

        # imported lazily to avoid pulling in GUI dependencies on module load
        from pyrocko.gui.snuffler import snuffle
        snuffle(self, **kwargs)

1524 

1525 

def make_pile(
        paths=None, selector=None, regex=None,
        fileformat='mseed',
        cachedirname=None, show_progress=True):

    '''
    Create pile from given file and directory names.

    :param paths: filenames and/or directories to look for traces; when
        ``None``, ``sys.argv[1:]`` is used
    :param selector: lambda expression taking group dict of regex match
        object as a single argument and which returns true or false to keep
        or reject a file
    :param regex: regular expression which filenames have to match
    :param fileformat: format of the files (``'mseed'``, ``'sac'``,
        ``'kan'``, ``'from_extension'``, ``'detect'``)
    :param cachedirname: loader cache is stored under this directory; it is
        created as necessary
    :param show_progress: show progress bar and other progress information
    '''

    if show_progress_force_off:
        show_progress = False

    if paths is None:
        paths = sys.argv[1:]
    elif isinstance(paths, str):
        paths = [paths]

    if cachedirname is None:
        cachedirname = config.config().cache_dir

    filenames = util.select_files(
        paths, include=regex, selector=selector, show_progress=show_progress)

    loader_cache = get_cache(cachedirname)
    result = Pile()
    result.load_files(
        sorted(filenames),
        cache=loader_cache,
        fileformat=fileformat,
        show_progress=show_progress)

    return result

1571 

1572 

class Injector(trace.States):

    '''
    Live insertion of trace snippets into a pile.

    Snippets handed to :py:meth:`inject` are accumulated per channel in
    in-memory buffer files (:py:class:`MemTracesFile`) which are kept
    inside the associated pile.  When a buffer grows beyond the
    configured fixation length, it is "fixated": saved to disk via
    :py:func:`pyrocko.io.save` and, unless ``forget_fixed`` is set,
    reloaded into the pile as a regular file.
    '''

    def __init__(
            self, pile,
            fixation_length=None,
            path=None,
            format='from_extension',
            forget_fixed=False):

        '''
        :param pile: pile to insert traces into
        :param fixation_length: length in seconds after which buffers are
            fixated (``None`` disables length-triggered fixation)
        :param path: filename template for saving fixated traces; when
            unset, fixated buffers are only removed from the pile
        :param format: file format used when saving fixated traces
        :param forget_fixed: if ``True``, do not reload saved traces back
            into the pile
        '''
        trace.States.__init__(self)
        self._pile = pile
        self._fixation_length = fixation_length
        self._format = format
        self._path = path
        self._forget_fixed = forget_fixed

    def set_fixation_length(self, length):
        '''
        Set length after which the fixation method is called on buffer traces.

        The length should be given in seconds. Give None to disable.
        '''
        self.fixate_all()
        self._fixation_length = length  # in seconds

    def set_save_path(
            self,
            path='dump_%(network)s.%(station)s.%(location)s.%(channel)s_'
                 '%(tmin)s_%(tmax)s.mseed'):

        '''
        Set the filename template used when fixated traces are saved.

        Pending buffers are fixated first, so the new path only affects
        subsequently injected data.
        '''
        self.fixate_all()
        self._path = path

    def inject(self, trace):
        '''
        Insert a trace snippet into the pile.

        The snippet is either copied into a new in-memory buffer file or
        appended to the existing buffer of the same channel; the buffer
        is fixated once it exceeds the configured fixation length.
        '''
        logger.debug('Received a trace: %s' % trace)

        buf = self.get(trace)
        if buf is None:
            # first snippet for this channel: start a new buffer file
            trbuf = trace.copy()
            buf = MemTracesFile(None, [trbuf])
            self._pile.add_file(buf)
            self.set(trace, buf)

        else:
            # append to the existing buffer; remove/re-add the file so
            # the pile's lookup structures stay consistent
            self._pile.remove_file(buf)
            trbuf = buf.get_traces()[0]
            buf.remove(trbuf)
            trbuf.append(trace.ydata)
            buf.add(trbuf)
            self._pile.add_file(buf)
            self.set(trace, buf)

        trbuf = buf.get_traces()[0]
        if self._fixation_length is not None:
            if trbuf.tmax - trbuf.tmin > self._fixation_length:
                self._fixate(buf, complete=False)

    def fixate_all(self):
        '''
        Fixate all pending buffers and reset the per-channel state.
        '''
        for state in list(self._states.values()):
            self._fixate(state[-1])

        self._states = {}

    def free(self, buf):
        # called by trace.States when a channel's state is released
        self._fixate(buf)

    def _fixate(self, buf, complete=True):
        '''
        Finalize a buffer file: save its data to disk (when a save path
        is set) and remove the buffer from the pile.

        :param buf: buffer file to fixate
        :param complete: if ``False``, an incomplete trailing segment may
            be kept in the buffer for further appending
        '''
        trbuf = buf.get_traces()[0]
        del_state = True
        if self._path:
            if self._fixation_length is not None:
                # chop boundaries are aligned to a fixation-length grid
                # anchored at the start of the year
                ttmin = trbuf.tmin
                ytmin = util.year_start(ttmin)
                n = int(math.floor((ttmin - ytmin) / self._fixation_length))
                tmin = ytmin + n*self._fixation_length
                traces = []
                t = tmin
                while t <= trbuf.tmax:
                    try:
                        traces.append(
                            trbuf.chop(
                                t,
                                t+self._fixation_length,
                                inplace=False,
                                snap=(math.ceil, math.ceil)))

                    except trace.NoData:
                        pass
                    t += self._fixation_length

                # NOTE(review): assumes at least one chop succeeded;
                # traces[-1] would raise IndexError on an all-gap buffer
                # — confirm upstream guarantees
                if abs(traces[-1].tmax - (t - trbuf.deltat)) < \
                        trbuf.deltat/100. or complete:

                    self._pile.remove_file(buf)

                else:  # reinsert incomplete last part
                    new_trbuf = traces.pop()
                    self._pile.remove_file(buf)
                    buf.remove(trbuf)
                    buf.add(new_trbuf)
                    self._pile.add_file(buf)
                    del_state = False

            else:
                traces = [trbuf]
                self._pile.remove_file(buf)

            fns = io.save(traces, self._path, format=self._format)

            if not self._forget_fixed:
                # reload the saved files so the data stays available
                self._pile.load_files(
                    fns, show_progress=False, fileformat=self._format)

        if del_state:
            del self._states[trbuf.nslc_id]

    def __del__(self):
        self.fixate_all()