# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from __future__ import absolute_import, division

import errno
import time
import os
import struct
import weakref
import math
import shutil
try:
    import fcntl
except ImportError:
    fcntl = None
import copy
import logging
import re
import hashlib

import numpy as num
from scipy import signal

from . import meta
from .error import StoreError
try:
    from . import store_ext
except ImportError:
    store_ext = None
from pyrocko import util, spit

logger = logging.getLogger('pyrocko.gf.store')
op = os.path

# gf store endianness
E = '<'

gf_dtype = num.dtype(num.float32)
gf_dtype_store = num.dtype(E + 'f4')

gf_dtype_nbytes_per_sample = 4

gf_store_header_dtype = [
    ('nrecords', E + 'u8'),
    ('deltat', E + 'f4'),
]

gf_store_header_fmt = E + 'Qf'
gf_store_header_fmt_size = struct.calcsize(gf_store_header_fmt)

gf_record_dtype = num.dtype([
    ('data_offset', E + 'u8'),
    ('itmin', E + 'i4'),
    ('nsamples', E + 'u4'),
    ('begin_value', E + 'f4'),
    ('end_value', E + 'f4'),
])


class SeismosizerErrorEnum:
    SUCCESS = 0
    INVALID_RECORD = 1
    EMPTY_RECORD = 2
    BAD_RECORD = 3
    ALLOC_FAILED = 4
    BAD_REQUEST = 5
    BAD_DATA_OFFSET = 6
    READ_DATA_FAILED = 7
    SEEK_INDEX_FAILED = 8
    READ_INDEX_FAILED = 9
    FSTAT_TRACES_FAILED = 10
    BAD_STORE = 11
    MMAP_INDEX_FAILED = 12
    MMAP_TRACES_FAILED = 13
    INDEX_OUT_OF_BOUNDS = 14
    NTARGETS_OUT_OF_BOUNDS = 15


def valid_string_id(s):
    return re.match(meta.StringID.pattern, s)


def check_string_id(s):
    if not valid_string_id(s):
        raise ValueError('invalid name %s' % s)

# - data_offset
#
#    Data file offset of first sample in bytes (the seek value).
#    Special values:
#
#       0x00 - missing trace
#       0x01 - zero trace (GF trace is physically zero)
#       0x02 - short trace of 1 or 2 samples (no need for data allocation)
#
# - itmin
#
#    Time of first sample of the trace as a multiple of the sampling
#    interval. All GF samples must be evaluated at multiples of the sampling
#    interval with respect to a global reference time zero. Must be set also
#    for zero traces.
#
# - nsamples
#
#    Number of samples in the GF trace. Should be set to zero for zero
#    traces.
#
# - begin_value, end_value
#
#    Values of first and last sample. These values are included in data[]
#    redundantly.
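
# A minimal sketch of how the index file can be decoded by hand, using the
# layout defined above (header, then ``nrecords`` fixed-size records). This
# mirrors what BaseStore.open() and _load_index() do internally and is for
# illustration only; ``store_dir`` is a placeholder path:
#
#     with open(os.path.join(store_dir, 'index'), 'rb') as f:
#         nrecords, deltat = struct.unpack(
#             gf_store_header_fmt, f.read(gf_store_header_fmt_size))
#         records = num.fromfile(f, dtype=gf_record_dtype, count=nrecords)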


class NotMultipleOfSamplingInterval(Exception):
    pass


sampling_check_eps = 1e-5


class GFTrace(object):
    '''
    Green's Function trace class for handling traces from the GF store.
    '''

    @classmethod
    def from_trace(cls, tr):
        return cls(data=tr.ydata.copy(), tmin=tr.tmin, deltat=tr.deltat)

    def __init__(self, data=None, itmin=None, deltat=1.0,
                 is_zero=False, begin_value=None, end_value=None, tmin=None):

        assert sum((x is None) for x in (tmin, itmin)) == 1, \
            'GFTrace: either tmin or itmin must be given'

        if tmin is not None:
            itmin = int(round(tmin / deltat))
            if abs(itmin*deltat - tmin) > sampling_check_eps*deltat:
                raise NotMultipleOfSamplingInterval(
                    'GFTrace: tmin (%g) is not a multiple of sampling '
                    'interval (%g)' % (tmin, deltat))

        if data is not None:
            data = num.asarray(data, dtype=gf_dtype)
        else:
            data = num.array([], dtype=gf_dtype)
            begin_value = 0.0
            end_value = 0.0

        self.data = weakref.ref(data)()
        self.itmin = itmin
        self.deltat = deltat
        self.is_zero = is_zero
        self.n_records_stacked = 0.
        self.t_stack = 0.
        self.t_optimize = 0.
        self.err = SeismosizerErrorEnum.SUCCESS

        if data is not None and data.size > 0:
            if begin_value is None:
                begin_value = data[0]
            if end_value is None:
                end_value = data[-1]

        self.begin_value = begin_value
        self.end_value = end_value

    @property
    def t(self):
        '''
        Time vector of the GF trace.

        :returns: Time vector
        :rtype: :py:class:`numpy.ndarray`
        '''
        return num.linspace(
            self.itmin*self.deltat,
            (self.itmin+self.data.size-1)*self.deltat, self.data.size)

    def __str__(self, itmin=0):

        if self.is_zero:
            return 'ZERO'

        s = []
        for i in range(itmin, self.itmin + self.data.size):
            if i >= self.itmin and i < self.itmin + self.data.size:
                s.append('%7.4g' % self.data[i-self.itmin])
            else:
                s.append(' '*7)

        return '|'.join(s)

    def to_trace(self, net, sta, loc, cha):
        from pyrocko import trace
        return trace.Trace(
            net, sta, loc, cha,
            ydata=self.data,
            deltat=self.deltat,
            tmin=self.itmin*self.deltat)
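
# Usage sketch (values are made up for illustration): a GFTrace can be built
# from raw samples and converted to a standard Pyrocko trace object.
#
#     gftr = GFTrace(data=num.zeros(100), tmin=0.0, deltat=0.5)
#     print(gftr.t)                            # time vector
#     ptr = gftr.to_trace('', 'STA', '', 'Z')  # pyrocko.trace.Trace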


class GFValue(object):

    def __init__(self, value):
        self.value = value
        self.n_records_stacked = 0.
        self.t_stack = 0.
        self.t_optimize = 0.


Zero = GFTrace(is_zero=True, itmin=0)


def make_same_span(tracesdict):

    traces = tracesdict.values()

    nonzero = [tr for tr in traces if not tr.is_zero]
    if not nonzero:
        return {k: Zero for k in tracesdict.keys()}

    deltat = nonzero[0].deltat

    itmin = min(tr.itmin for tr in nonzero)
    itmax = max(tr.itmin+tr.data.size for tr in nonzero) - 1

    out = {}
    for k, tr in tracesdict.items():
        n = itmax - itmin + 1
        if tr.itmin != itmin or tr.data.size != n:
            data = num.zeros(n, dtype=gf_dtype)
            if not tr.is_zero:
                lo = tr.itmin-itmin
                hi = lo + tr.data.size
                data[:lo] = tr.data[0]
                data[lo:hi] = tr.data
                data[hi:] = tr.data[-1]

            tr = GFTrace(data, itmin, deltat)

        out[k] = tr

    return out
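
# Usage sketch (hypothetical input): make_same_span() pads every trace in a
# dict to the common time span, repeating the first/last samples, so the
# results can be combined sample-by-sample.
#
#     aligned = make_same_span({
#         'a': GFTrace(num.array([1., 2.]), itmin=0),
#         'b': GFTrace(num.array([3., 4.]), itmin=2)})
#     # both traces now have itmin == 0 and 4 samples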


class CannotCreate(StoreError):
    pass


class CannotOpen(StoreError):
    pass


class DuplicateInsert(StoreError):
    pass


class ShortRead(StoreError):
    def __str__(self):
        return 'unexpected end of data (truncated traces file?)'


class NotAllowedToInterpolate(StoreError):
    def __str__(self):
        return 'not allowed to interpolate'


class NoSuchExtra(StoreError):
    def __init__(self, s):
        StoreError.__init__(self)
        self.value = s

    def __str__(self):
        return 'extra information for key "%s" not found.' % self.value


class NoSuchPhase(StoreError):
    def __init__(self, s):
        StoreError.__init__(self)
        self.value = s

    def __str__(self):
        return 'phase for key "%s" not found. ' \
            'Running "fomosto ttt" may be needed.' % self.value


def remove_if_exists(fn, force=False):
    if os.path.exists(fn):
        if force:
            os.remove(fn)
        else:
            raise CannotCreate('file %s already exists' % fn)


def get_extra_path(store_dir, key):
    check_string_id(key)
    return os.path.join(store_dir, 'extra', key)


class BaseStore(object):

    @staticmethod
    def lock_fn_(store_dir):
        return os.path.join(store_dir, 'lock')

    @staticmethod
    def index_fn_(store_dir):
        return os.path.join(store_dir, 'index')

    @staticmethod
    def data_fn_(store_dir):
        return os.path.join(store_dir, 'traces')

    @staticmethod
    def config_fn_(store_dir):
        return os.path.join(store_dir, 'config')

    @staticmethod
    def create(store_dir, deltat, nrecords, force=False):

        try:
            util.ensuredir(store_dir)
        except Exception:
            raise CannotCreate('cannot create directory %s' % store_dir)

        index_fn = BaseStore.index_fn_(store_dir)
        data_fn = BaseStore.data_fn_(store_dir)

        for fn in (index_fn, data_fn):
            remove_if_exists(fn, force)

        with open(index_fn, 'wb') as f:
            f.write(struct.pack(gf_store_header_fmt, nrecords, deltat))
            records = num.zeros(nrecords, dtype=gf_record_dtype)
            records.tofile(f)

        with open(data_fn, 'wb') as f:
            f.write(b'\0' * 32)

    def __init__(self, store_dir, mode='r', use_memmap=True):
        assert mode in 'rw'
        self.store_dir = store_dir
        self.mode = mode
        self._use_memmap = use_memmap
        self._nrecords = None
        self._deltat = None
        self._f_index = None
        self._f_data = None
        self._records = None
        self.cstore = None

    def open(self):
        assert self._f_index is None
        index_fn = self.index_fn()
        data_fn = self.data_fn()

        if self.mode == 'r':
            fmode = 'rb'
        elif self.mode == 'w':
            fmode = 'r+b'
        else:
            assert False, 'invalid mode: %s' % self.mode

        try:
            self._f_index = open(index_fn, fmode)
            self._f_data = open(data_fn, fmode)
        except Exception:
            self.mode = ''
            raise CannotOpen('cannot open gf store: %s' % self.store_dir)

        try:
            # replace single precision deltat value in store with the double
            # precision value from the config, if available
            self.cstore = store_ext.store_init(
                self._f_index.fileno(), self._f_data.fileno(),
                self.get_deltat() or 0.0)

        except store_ext.StoreExtError as e:
            raise StoreError(str(e))

        while True:
            try:
                dataheader = self._f_index.read(gf_store_header_fmt_size)
                break

            except IOError as e:
                # occasionally got this one on an NFS volume
                if e.errno == errno.EBUSY:
                    time.sleep(0.01)
                else:
                    raise

        nrecords, deltat = struct.unpack(gf_store_header_fmt, dataheader)
        self._nrecords = nrecords
        self._deltat = deltat

        self._load_index()

    def __del__(self):
        if self.mode != '':
            self.close()

    def lock(self):
        if not fcntl:
            lock_fn = self.lock_fn()
            util.create_lockfile(lock_fn)
        else:
            if not self._f_index:
                self.open()

            while True:
                try:
                    fcntl.lockf(self._f_index, fcntl.LOCK_EX)
                    break

                except IOError as e:
                    if e.errno == errno.ENOLCK:
                        time.sleep(0.01)
                    else:
                        raise

    def unlock(self):
        if not fcntl:
            lock_fn = self.lock_fn()
            util.delete_lockfile(lock_fn)
        else:
            self._f_data.flush()
            self._f_index.flush()
            fcntl.lockf(self._f_index, fcntl.LOCK_UN)

    def put(self, irecord, trace):
        self._put(irecord, trace)

    def get_record(self, irecord):
        return self._get_record(irecord)

    def get_span(self, irecord, decimate=1):
        return self._get_span(irecord, decimate=decimate)

    def get(self, irecord, itmin=None, nsamples=None, decimate=1,
            implementation='c'):
        return self._get(irecord, itmin, nsamples, decimate, implementation)

    def sum(self, irecords, delays, weights, itmin=None,
            nsamples=None, decimate=1, implementation='c',
            optimization='enable'):
        return self._sum(irecords, delays, weights, itmin, nsamples, decimate,
                         implementation, optimization)

    def irecord_format(self):
        return util.zfmt(self._nrecords)

    def str_irecord(self, irecord):
        return self.irecord_format() % irecord

    def close(self):
        if self.mode == 'w':
            if not self._f_index:
                self.open()
            self._save_index()

        if self._f_data:
            self._f_data.close()
            self._f_data = None

        if self._f_index:
            self._f_index.close()
            self._f_index = None

        del self._records
        self._records = None

        self.mode = ''

    def _get_record(self, irecord):
        if not self._f_index:
            self.open()

        return self._records[irecord]

    def _get(self, irecord, itmin, nsamples, decimate, implementation):
        '''
        Retrieve complete GF trace from storage.
        '''

        if not self._f_index:
            self.open()

        if not self.mode == 'r':
            raise StoreError('store not open in read mode')

        if implementation == 'c' and decimate == 1:

            if nsamples is None:
                nsamples = -1

            if itmin is None:
                itmin = 0

            try:
                return GFTrace(*store_ext.store_get(
                    self.cstore, int(irecord), int(itmin), int(nsamples)))
            except store_ext.StoreExtError as e:
                raise StoreError(str(e))

        else:
            return self._get_impl_reference(
                irecord, itmin, nsamples, decimate)

    def get_deltat(self):
        return self._deltat

    def _get_impl_reference(self, irecord, itmin, nsamples, decimate):
        deltat = self.get_deltat()

        if not (0 <= irecord < self._nrecords):
            raise StoreError('invalid record number requested '
                             '(irecord = %i, nrecords = %i)' %
                             (irecord, self._nrecords))

        ipos, itmin_data, nsamples_data, begin_value, end_value = \
            self._records[irecord]

        if None in (itmin, nsamples):
            itmin = itmin_data
            itmax = itmin_data + nsamples_data - 1
            nsamples = nsamples_data
        else:
            itmin = itmin * decimate
            nsamples = nsamples * decimate
            itmax = itmin + nsamples - decimate

        if ipos == 0:
            return None

        elif ipos == 1:
            itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
            return GFTrace(is_zero=True, itmin=itmin_ext//decimate)

        if decimate == 1:
            ilo = max(itmin, itmin_data) - itmin_data
            ihi = min(itmin+nsamples, itmin_data+nsamples_data) - itmin_data
            data = self._get_data(ipos, begin_value, end_value, ilo, ihi)

            return GFTrace(data, itmin=itmin_data+ilo, deltat=deltat,
                           begin_value=begin_value, end_value=end_value)

        else:
            itmax_data = itmin_data + nsamples_data - 1

            # put begin and end to multiples of new sampling rate
            itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
            itmax_ext = -((-min(itmax, itmax_data))//decimate) * decimate
            nsamples_ext = itmax_ext - itmin_ext + 1

            # add some padding for the aa filter
            order = 30
            itmin_ext_pad = itmin_ext - order//2
            itmax_ext_pad = itmax_ext + order//2
            nsamples_ext_pad = itmax_ext_pad - itmin_ext_pad + 1

            itmin_overlap = max(itmin_data, itmin_ext_pad)
            itmax_overlap = min(itmax_data, itmax_ext_pad)

            ilo = itmin_overlap - itmin_ext_pad
            ihi = max(ilo, itmax_overlap - itmin_ext_pad + 1)
            ilo_data = itmin_overlap - itmin_data
            ihi_data = max(ilo_data, itmax_overlap - itmin_data + 1)

            data_ext_pad = num.empty(nsamples_ext_pad, dtype=gf_dtype)
            data_ext_pad[ilo:ihi] = self._get_data(
                ipos, begin_value, end_value, ilo_data, ihi_data)

            data_ext_pad[:ilo] = begin_value
            data_ext_pad[ihi:] = end_value

            b = signal.firwin(order + 1, 1. / decimate, window='hamming')
            a = 1.
            data_filt_pad = signal.lfilter(b, a, data_ext_pad)
            data_deci = data_filt_pad[order:order+nsamples_ext:decimate]
            if data_deci.size >= 1:
                if itmin_ext <= itmin_data:
                    data_deci[0] = begin_value

                if itmax_ext >= itmax_data:
                    data_deci[-1] = end_value

            return GFTrace(data_deci, itmin_ext//decimate,
                           deltat*decimate,
                           begin_value=begin_value, end_value=end_value)

    def _get_span(self, irecord, decimate=1):
        '''
        Get temporal extent of GF trace at given index.
        '''

        if not self._f_index:
            self.open()

        assert 0 <= irecord < self._nrecords, \
            'irecord = %i, nrecords = %i' % (irecord, self._nrecords)

        (_, itmin, nsamples, _, _) = self._records[irecord]

        itmax = itmin + nsamples - 1

        if decimate == 1:
            return itmin, itmax
        else:
            if nsamples == 0:
                return itmin//decimate, itmin//decimate - 1
            else:
                return itmin//decimate, -((-itmax)//decimate)

    def _put(self, irecord, trace):
        '''
        Save GF trace to storage.
        '''

        if not self._f_index:
            self.open()

        deltat = self.get_deltat()

        assert self.mode == 'w'
        assert trace.is_zero or \
            abs(trace.deltat - deltat) < 1e-7 * deltat
        assert 0 <= irecord < self._nrecords, \
            'irecord = %i, nrecords = %i' % (irecord, self._nrecords)

        if self._records[irecord][0] != 0:
            raise DuplicateInsert('record %i already in store' % irecord)

        if trace.is_zero or num.all(trace.data == 0.0):
            self._records[irecord] = (1, trace.itmin, 0, 0., 0.)
            return

        ndata = trace.data.size

        if ndata > 2:
            self._f_data.seek(0, 2)
            ipos = self._f_data.tell()
            trace.data.astype(gf_dtype_store).tofile(self._f_data)
        else:
            ipos = 2

        self._records[irecord] = (ipos, trace.itmin, ndata,
                                  trace.data[0], trace.data[-1])

    def _sum_impl_alternative(self, irecords, delays, weights, itmin,
                              nsamples, decimate):

        '''
        Sum delayed and weighted GF traces.
        '''

        if not self._f_index:
            self.open()

        assert self.mode == 'r'

        deltat = self.get_deltat() * decimate

        if len(irecords) == 0:
            if None in (itmin, nsamples):
                return Zero
            else:
                return GFTrace(
                    num.zeros(nsamples, dtype=gf_dtype), itmin,
                    deltat, is_zero=True)

        assert len(irecords) == len(delays)
        assert len(irecords) == len(weights)

        if None in (itmin, nsamples):
            itmin_delayed, itmax_delayed = [], []
            for irecord, delay in zip(irecords, delays):
                itmin, itmax = self._get_span(irecord, decimate=decimate)
                itmin_delayed.append(itmin + delay/deltat)
                itmax_delayed.append(itmax + delay/deltat)

            itmin = int(math.floor(min(itmin_delayed)))
            nsamples = int(math.ceil(max(itmax_delayed))) - itmin + 1

        out = num.zeros(nsamples, dtype=gf_dtype)
        if nsamples == 0:
            return GFTrace(out, itmin, deltat)

        for ii in range(len(irecords)):
            irecord = irecords[ii]
            delay = delays[ii]
            weight = weights[ii]

            if weight == 0.0:
                continue

            idelay_floor = int(math.floor(delay/deltat))
            idelay_ceil = int(math.ceil(delay/deltat))

            gf_trace = self._get(
                irecord,
                itmin - idelay_ceil,
                nsamples + idelay_ceil - idelay_floor,
                decimate,
                'reference')

            assert gf_trace.itmin >= itmin - idelay_ceil
            assert gf_trace.data.size <= nsamples + idelay_ceil - idelay_floor

            if gf_trace.is_zero:
                continue

            ilo = gf_trace.itmin - itmin + idelay_floor
            ihi = ilo + gf_trace.data.size + (idelay_ceil-idelay_floor)

            data = gf_trace.data

            if idelay_floor == idelay_ceil:
                out[ilo:ihi] += data * weight
            else:
                if data.size:
                    k = 1
                    if ihi <= nsamples:
                        out[ihi-1] += gf_trace.end_value * \
                            ((idelay_ceil-delay/deltat) * weight)
                        k = 0

                    out[ilo+1:ihi-k] += data[:data.size-k] * \
                        ((delay/deltat-idelay_floor) * weight)
                    k = 1
                    if ilo >= 0:
                        out[ilo] += gf_trace.begin_value * \
                            ((delay/deltat-idelay_floor) * weight)
                        k = 0

                    out[ilo+k:ihi-1] += data[k:] * \
                        ((idelay_ceil-delay/deltat) * weight)

            if ilo > 0 and gf_trace.begin_value != 0.0:
                out[:ilo] += gf_trace.begin_value * weight

            if ihi < nsamples and gf_trace.end_value != 0.0:
                out[ihi:] += gf_trace.end_value * weight

        return GFTrace(out, itmin, deltat)

    def _sum_impl_reference(self, irecords, delays, weights, itmin, nsamples,
                            decimate):

        if not self._f_index:
            self.open()

        deltat = self.get_deltat() * decimate

        if len(irecords) == 0:
            if None in (itmin, nsamples):
                return Zero
            else:
                return GFTrace(
                    num.zeros(nsamples, dtype=gf_dtype), itmin,
                    deltat, is_zero=True)

        datas = []
        itmins = []
        for i, delay, weight in zip(irecords, delays, weights):
            tr = self._get(i, None, None, decimate, 'reference')

            idelay_floor = int(math.floor(delay/deltat))
            idelay_ceil = int(math.ceil(delay/deltat))

            if idelay_floor == idelay_ceil:
                itmins.append(tr.itmin + idelay_floor)
                datas.append(tr.data.copy()*weight)
            else:
                itmins.append(tr.itmin + idelay_floor)
                datas.append(tr.data.copy()*weight*(idelay_ceil-delay/deltat))
                itmins.append(tr.itmin + idelay_ceil)
                datas.append(tr.data.copy()*weight*(delay/deltat-idelay_floor))

        itmin_all = min(itmins)

        itmax_all = max(itmin_ + data.size for (itmin_, data) in
                        zip(itmins, datas))

        if itmin is not None:
            itmin_all = min(itmin_all, itmin)
        if nsamples is not None:
            itmax_all = max(itmax_all, itmin+nsamples)

        nsamples_all = itmax_all - itmin_all

        arr = num.zeros((len(datas), nsamples_all), dtype=gf_dtype)
        for i, itmin_, data in zip(num.arange(len(datas)), itmins, datas):
            if data.size > 0:
                ilo = itmin_-itmin_all
                ihi = ilo + data.size
                arr[i, :ilo] = data[0]
                arr[i, ilo:ihi] = data
                arr[i, ihi:] = data[-1]

        sum_arr = arr.sum(axis=0)

        if itmin is not None and nsamples is not None:
            ilo = itmin-itmin_all
            ihi = ilo + nsamples
            sum_arr = sum_arr[ilo:ihi]

        else:
            itmin = itmin_all

        return GFTrace(sum_arr, itmin, deltat)

    def _optimize(self, irecords, delays, weights):
        if num.unique(irecords).size == irecords.size:
            return irecords, delays, weights

        deltat = self.get_deltat()

        delays = delays / deltat
        irecords2 = num.repeat(irecords, 2)
        delays2 = num.empty(irecords2.size, dtype=float)
        delays2[0::2] = num.floor(delays)
        delays2[1::2] = num.ceil(delays)
        weights2 = num.repeat(weights, 2)
        weights2[0::2] *= 1.0 - (delays - delays2[0::2])
        weights2[1::2] *= (1.0 - (delays2[1::2] - delays)) * \
            (delays2[1::2] - delays2[0::2])

        delays2 *= deltat

        iorder = num.lexsort((delays2, irecords2))

        irecords2 = irecords2[iorder]
        delays2 = delays2[iorder]
        weights2 = weights2[iorder]

        ui = num.empty(irecords2.size, dtype=bool)
        ui[1:] = num.logical_or(num.diff(irecords2) != 0,
                                num.diff(delays2) != 0.)

        ui[0] = 0
        ind2 = num.cumsum(ui)
        ui[0] = 1
        ind1 = num.where(ui)[0]

        irecords3 = irecords2[ind1]
        delays3 = delays2[ind1]
        weights3 = num.bincount(ind2, weights2)

        return irecords3, delays3, weights3
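
    # Note (added explanation): _optimize() replaces each (irecord, delay,
    # weight) triple by two entries whose delays are integer multiples of
    # deltat (floor and ceil of the original delay) and then merges duplicate
    # (irecord, delay) pairs by summing their weights. E.g. with deltat=1.0,
    # a delay of 0.25 and weight w becomes (delay 0.0, weight 0.75*w) and
    # (delay 1.0, weight 0.25*w), so each record needs to be read only once
    # during stacking.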

    def _sum(self, irecords, delays, weights, itmin, nsamples, decimate,
             implementation, optimization):

        if not self._f_index:
            self.open()

        t0 = time.time()
        if optimization == 'enable':
            irecords, delays, weights = self._optimize(
                irecords, delays, weights)
        else:
            assert optimization == 'disable'

        t1 = time.time()
        deltat = self.get_deltat()

        if implementation == 'c' and decimate == 1:
            if delays.size != 0:
                itoffset = int(num.floor(num.min(delays)/deltat))
            else:
                itoffset = 0

            if nsamples is None:
                nsamples = -1

            if itmin is None:
                itmin = 0
            else:
                itmin -= itoffset

            try:
                tr = GFTrace(*store_ext.store_sum(
                    self.cstore, irecords.astype(num.uint64),
                    delays - itoffset*deltat,
                    weights.astype(num.float32),
                    int(itmin), int(nsamples)))

                tr.itmin += itoffset

            except store_ext.StoreExtError as e:
                raise StoreError(str(e) + ' in store %s' % self.store_dir)

        elif implementation == 'alternative':
            tr = self._sum_impl_alternative(irecords, delays, weights,
                                            itmin, nsamples, decimate)

        else:
            tr = self._sum_impl_reference(irecords, delays, weights,
                                          itmin, nsamples, decimate)

        t2 = time.time()

        tr.n_records_stacked = irecords.size
        tr.t_optimize = t1 - t0
        tr.t_stack = t2 - t1

        return tr

    def _sum_statics(self, irecords, delays, weights, it, ntargets,
                     nthreads):
        if not self._f_index:
            self.open()

        return store_ext.store_sum_static(
            self.cstore, irecords, delays, weights, it, ntargets, nthreads)

    def _load_index(self):
        if self._use_memmap:
            records = num.memmap(
                self._f_index, dtype=gf_record_dtype,
                offset=gf_store_header_fmt_size,
                mode=('r', 'r+')[self.mode == 'w'])

        else:
            self._f_index.seek(gf_store_header_fmt_size)
            records = num.fromfile(self._f_index, dtype=gf_record_dtype)

        assert len(records) == self._nrecords

        self._records = records

    def _save_index(self):
        self._f_index.seek(0)
        self._f_index.write(struct.pack(gf_store_header_fmt, self._nrecords,
                                        self.get_deltat()))

        if self._use_memmap:
            self._records.flush()
        else:
            self._f_index.seek(gf_store_header_fmt_size)
            self._records.tofile(self._f_index)
            self._f_index.flush()

    def _get_data(self, ipos, begin_value, end_value, ilo, ihi):
        if ihi - ilo > 0:
            if ipos == 2:
                data_orig = num.empty(2, dtype=gf_dtype)
                data_orig[0] = begin_value
                data_orig[1] = end_value
                return data_orig[ilo:ihi]
            else:
                self._f_data.seek(
                    int(ipos + ilo*gf_dtype_nbytes_per_sample))
                arr = num.fromfile(
                    self._f_data, gf_dtype_store, ihi-ilo).astype(gf_dtype)

                if arr.size != ihi-ilo:
                    raise ShortRead()
                return arr
        else:
            return num.empty((0,), dtype=gf_dtype)

    def lock_fn(self):
        return BaseStore.lock_fn_(self.store_dir)

    def index_fn(self):
        return BaseStore.index_fn_(self.store_dir)

    def data_fn(self):
        return BaseStore.data_fn_(self.store_dir)

    def config_fn(self):
        return BaseStore.config_fn_(self.store_dir)

    def count_special_records(self):
        if not self._f_index:
            self.open()

        return num.histogram(self._records['data_offset'],
                             bins=[0, 1, 2, 3, num.uint64(-1)])[0]

    @property
    def size_index(self):
        return os.stat(self.index_fn()).st_size

    @property
    def size_data(self):
        return os.stat(self.data_fn()).st_size

    @property
    def size_index_and_data(self):
        return self.size_index + self.size_data

    @property
    def size_index_and_data_human(self):
        return util.human_bytesize(self.size_index_and_data)

    def stats(self):
        counter = self.count_special_records()

        stats = dict(
            total=self._nrecords,
            inserted=(counter[1] + counter[2] + counter[3]),
            empty=counter[0],
            short=counter[2],
            zero=counter[1],
            size_data=self.size_data,
            size_index=self.size_index,
        )

        return stats

    stats_keys = 'total inserted empty short zero size_data size_index'.split()


def remake_dir(dpath, force):
    if os.path.exists(dpath):
        if force:
            shutil.rmtree(dpath)
        else:
            raise CannotCreate('Directory "%s" already exists.' % dpath)

    os.mkdir(dpath)


class MakeTimingParamsFailed(StoreError):
    pass


class Store(BaseStore):

    '''
    Green's function disk storage and summation machine.

    The `Store` can be used to efficiently store, retrieve, and sum Green's
    function traces. A `Store` contains many 1D time traces sampled at even
    multiples of a global sampling rate, where each time trace has an
    individual start and end time. The traces are treated as having repeating
    end points, so the functions they represent can be non-constant only
    between begin and end time. It provides capabilities to retrieve
    decimated traces and to extract parts of the traces. The main purpose of
    this class is to provide a fast, easy to use, and flexible mechanism to
    compute weighted delay-and-sum stacks with many Green's function traces
    involved.

    Individual Green's functions are accessed through a single integer index
    at low level. On top of that, various indexing schemes can be implemented
    by providing a mapping from physical coordinates to the low level index
    `i`. E.g. for a problem with cylindrical symmetry, one might define a
    mapping from the coordinates (`receiver_depth`, `source_depth`,
    `distance`) to the low level index. Index translation is done in the
    :py:class:`pyrocko.gf.meta.Config` subclass object associated with the
    Store. It is accessible through the store's :py:attr:`config` attribute,
    and contains all meta information about the store, including physical
    extent, geometry, sampling rate, and information about the type of the
    stored Green's functions. Information about the underlying earth model
    can also be found there.

    A GF store can also contain tabulated phase arrivals. In basic cases,
    these can be created with the :py:meth:`make_ttt` and evaluated with the
    :py:meth:`t` methods.

    .. attribute:: config

        The :py:class:`pyrocko.gf.meta.Config` derived object associated with
        the store which contains all meta information about the store and
        provides the high-level to low-level index mapping.

    .. attribute:: store_dir

        Path to the store's data directory.

    .. attribute:: mode

        The mode in which the store is opened: ``'r'``: read-only, ``'w'``:
        writeable.
    '''
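
    # Usage sketch (paths and index values are placeholders): open an
    # existing store read-only, query a tabulated phase arrival and pull a
    # single GF trace, as described in the docstring above.
    #
    #     store = Store('my_store_dir')
    #     t_p = store.t('stored:p', (source_depth, distance))
    #     tr = store.get((source_depth, distance, component))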

    @classmethod
    def create(cls, store_dir, config, force=False, extra=None):
        '''
        Create new GF store.

        Creates a new GF store at path ``store_dir``. The layout of the GF is
        defined with the parameters given in ``config``, which should be an
        object of a subclass of :py:class:`~pyrocko.gf.meta.Config`. This
        function will refuse to overwrite an existing GF store, unless
        ``force`` is set to ``True``. If more information, e.g. parameters
        used for the modelling code, earth models or other, should be saved
        along with the GF store, these may be provided through a dict given
        to ``extra``. The keys of this dict must be names and the values must
        be *guts* type objects.

        :param store_dir: GF store path
        :type store_dir: str
        :param config: GF store Config
        :type config: :py:class:`~pyrocko.gf.meta.Config`
        :param force: Force overwrite, defaults to ``False``
        :type force: bool
        :param extra: Extra information
        :type extra: dict or None
        '''

        cls.create_editables(store_dir, config, force=force, extra=extra)
        cls.create_dependants(store_dir, force=force)

        return cls(store_dir)

    @staticmethod
    def create_editables(store_dir, config, force=False, extra=None):
        try:
            util.ensuredir(store_dir)
        except Exception:
            raise CannotCreate('cannot create directory %s' % store_dir)

        fns = []

        config_fn = os.path.join(store_dir, 'config')
        remove_if_exists(config_fn, force)
        meta.dump(config, filename=config_fn)

        fns.append(config_fn)

        for sub_dir in ['extra']:
            dpath = os.path.join(store_dir, sub_dir)
            remake_dir(dpath, force)

        if extra:
            for k, v in extra.items():
                fn = get_extra_path(store_dir, k)
                remove_if_exists(fn, force)
                meta.dump(v, filename=fn)

                fns.append(fn)

        return fns

    @staticmethod
    def create_dependants(store_dir, force=False):
        config_fn = os.path.join(store_dir, 'config')
        config = meta.load(filename=config_fn)

        BaseStore.create(store_dir, config.deltat, config.nrecords,
                         force=force)

        for sub_dir in ['decimated']:
            dpath = os.path.join(store_dir, sub_dir)
            remake_dir(dpath, force)

    def __init__(self, store_dir, mode='r', use_memmap=True):
        BaseStore.__init__(self, store_dir, mode=mode, use_memmap=use_memmap)
        config_fn = self.config_fn()
        if not os.path.isfile(config_fn):
            raise StoreError(
                'directory "%s" does not seem to contain a GF store '
                '("config" file not found)' % store_dir)
        self.load_config()

        self._decimated = {}
        self._extra = {}
        self._phases = {}
        for decimate in range(2, 9):
            if os.path.isdir(self._decimated_store_dir(decimate)):
                self._decimated[decimate] = None

    def open(self):
        if not self._f_index:
            BaseStore.open(self)
            c = self.config

            mscheme = 'type_' + c.short_type.lower()
            store_ext.store_mapping_init(
                self.cstore, mscheme,
                c.mins, c.maxs, c.deltas, c.ns.astype(num.uint64),
                c.ncomponents)

    def save_config(self, make_backup=False):
        config_fn = self.config_fn()
        if make_backup:
            os.rename(config_fn, config_fn + '~')

        meta.dump(self.config, filename=config_fn)

    def get_deltat(self):
        return self.config.deltat

    def load_config(self):
        self.config = meta.load(filename=self.config_fn())

    def ensure_reference(self, force=False):
        if self.config.reference is not None and not force:
            return
        self.ensure_uuid()
        reference = '%s-%s' % (self.config.id, self.config.uuid[0:6])

        if self.config.reference is not None:
            self.config.reference = reference
            self.save_config()
        else:
            with open(self.config_fn(), 'a') as config:
                config.write('reference: %s\n' % reference)
            self.load_config()

    def ensure_uuid(self, force=False):
        if self.config.uuid is not None and not force:
            return
        uuid = self.create_store_hash()

        if self.config.uuid is not None:
            self.config.uuid = uuid
            self.save_config()
        else:
            with open(self.config_fn(), 'a') as config:
                config.write('\nuuid: %s\n' % uuid)
            self.load_config()

    def create_store_hash(self):
        logger.info('creating hash for store ...')
        m = hashlib.sha1()

        traces_size = op.getsize(self.data_fn())
        with open(self.data_fn(), 'rb') as traces:
            while traces.tell() < traces_size:
                m.update(traces.read(4096))
                traces.seek(1024 * 1024 * 10, 1)

        with open(self.config_fn(), 'rb') as config:
            m.update(config.read())
        return m.hexdigest()

    def get_extra_path(self, key):
        return get_extra_path(self.store_dir, key)

    def get_extra(self, key):
        '''
        Get extra information stored under given key.
        '''

        x = self._extra
        if key not in x:
            fn = self.get_extra_path(key)
            if not os.path.exists(fn):
                raise NoSuchExtra(key)

            x[key] = meta.load(filename=fn)

        return x[key]

    def upgrade(self):
        '''
        Upgrade store config and files to latest Pyrocko version.
        '''
        fns = [os.path.join(self.store_dir, 'config')]
        for key in self.extra_keys():
            fns.append(self.get_extra_path(key))

        i = 0
        for fn in fns:
            i += util.pf_upgrade(fn)

        return i

    def extra_keys(self):
        return [x for x in os.listdir(os.path.join(self.store_dir, 'extra'))
                if valid_string_id(x)]

    def put(self, args, trace):
        '''
        Insert trace into GF store.

        Store a single GF trace at (high-level) index ``args``.

        :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type args: tuple
        :returns: GF trace at ``args``
        :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
        '''

        irecord = self.config.irecord(*args)
        self._put(irecord, trace)

    def get_record(self, args):
        irecord = self.config.irecord(*args)
        return self._get_record(irecord)

    def str_irecord(self, args):
        return BaseStore.str_irecord(self, self.config.irecord(*args))

    def get(self, args, itmin=None, nsamples=None, decimate=1,
            interpolation='nearest_neighbor', implementation='c'):
        '''
        Retrieve GF trace from store.

        Retrieve a single GF trace from the store at (high-level) index
        ``args``. By default, the full trace is retrieved. Given ``itmin``
        and ``nsamples``, only the selected portion of the trace is
        extracted. If ``decimate`` is an integer in the range [2,8], the
        trace is decimated on the fly or, if available, the trace is read
        from a decimated version of the GF store.

        :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type args: tuple
        :param itmin: Start time index (start time is ``itmin * dt``),
            defaults to None
        :type itmin: int or None
        :param nsamples: Number of samples, defaults to None
        :type nsamples: int or None
        :param decimate: Decimation factor, defaults to 1
        :type decimate: int
        :param interpolation: Interpolation method
            ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
            ``'nearest_neighbor'``
        :type interpolation: str
        :param implementation: Implementation to use ``['c', 'reference']``,
            defaults to ``'c'``.
        :type implementation: str

        :returns: GF trace at ``args``
        :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
        '''

        store, decimate = self._decimated_store(decimate)
        if interpolation == 'nearest_neighbor':
            irecord = store.config.irecord(*args)
            tr = store._get(irecord, itmin, nsamples, decimate,
                            implementation)

        elif interpolation == 'off':
            irecords, weights = store.config.vicinity(*args)
            if len(irecords) != 1:
                raise NotAllowedToInterpolate()
            else:
                tr = store._get(irecords[0], itmin, nsamples, decimate,
                                implementation)

        elif interpolation == 'multilinear':
            # bugfix: the vicinity lookup was missing here, leaving irecords
            # and weights undefined in this branch
            irecords, weights = store.config.vicinity(*args)
            tr = store._sum(irecords, num.zeros(len(irecords)), weights,
                            itmin, nsamples, decimate, implementation,
                            'disable')

        # to prevent problems with rounding errors (BaseStore saves deltat
        # as a 4-byte floating point value, value from YAML config is more
        # accurate)
        tr.deltat = self.config.deltat * decimate
        return tr

    def sum(self, args, delays, weights, itmin=None, nsamples=None,
            decimate=1, interpolation='nearest_neighbor', implementation='c',
            optimization='enable'):
        '''
        Sum delayed and weighted GF traces.

        Calculate sum of delayed and weighted GF traces. ``args`` is a tuple
        of arrays forming the (high-level) indices of the GF traces to be
        selected. Delays and weights for the summation are given in the
        arrays ``delays`` and ``weights``. ``itmin`` and ``nsamples`` can be
        given to restrict the computation to a given time interval. If
        ``decimate`` is an integer in the range [2,8], decimated traces are
        used in the summation.

        :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type args: tuple(numpy.ndarray)
        :param delays: Delay times
        :type delays: :py:class:`numpy.ndarray`
        :param weights: Trace weights
        :type weights: :py:class:`numpy.ndarray`
        :param itmin: Start time index (start time is ``itmin * dt``),
            defaults to None
        :type itmin: int or None
        :param nsamples: Number of samples, defaults to None
        :type nsamples: int or None
        :param decimate: Decimation factor, defaults to 1
        :type decimate: int
        :param interpolation: Interpolation method
            ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
            ``'nearest_neighbor'``
        :type interpolation: str
        :param implementation: Implementation to use,
            ``['c', 'alternative', 'reference']``, where ``'alternative'``
            and ``'reference'`` use a Python implementation, defaults to
            ``'c'``
        :type implementation: str
        :param optimization: Optimization mode ``['enable', 'disable']``,
            defaults to ``'enable'``
        :type optimization: str
        :returns: Stacked GF trace.
        :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
        '''

        store, decimate_ = self._decimated_store(decimate)

        if interpolation == 'nearest_neighbor':
            irecords = store.config.irecords(*args)
        else:
            assert interpolation == 'multilinear'
            irecords, ip_weights = store.config.vicinities(*args)
            neach = irecords.size // args[0].size
            weights = num.repeat(weights, neach) * ip_weights
            delays = num.repeat(delays, neach)

        tr = store._sum(irecords, delays, weights, itmin, nsamples, decimate_,
                        implementation, optimization)

        # to prevent problems with rounding errors (BaseStore saves deltat
        # as a 4-byte floating point value, value from YAML config is more
        # accurate)
        tr.deltat = self.config.deltat * decimate
        return tr
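
    # Usage sketch (index and weight values are made up): weighted
    # delay-and-sum over three GF traces of a ConfigTypeA store.
    #
    #     args = (num.array([1000., 1000., 1000.]),  # source depths [m]
    #             num.array([10e3, 11e3, 12e3]),     # distances [m]
    #             num.array([0, 0, 0]))              # components
    #     tr = store.sum(args,
    #                    delays=num.array([0.0, 0.1, 0.2]),
    #                    weights=num.array([1.0, 0.5, 0.25]))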

    def make_decimated(self, decimate, config=None, force=False,
                       show_progress=False):
        '''
        Create decimated version of GF store.

        Create a downsampled version of the GF store. Downsampling is done
        for the integer factor ``decimate`` which should be in the range
        [2,8]. If ``config`` is ``None``, all traces of the GF store are
        decimated and held available (i.e. the index mapping of the original
        store is used), otherwise, a different spatial stepping can be
        specified by giving a modified GF store configuration in ``config``
        (see :py:meth:`create`). Decimated GF sub-stores are created under
        the ``decimated`` subdirectory within the GF store directory. Holding
        available decimated versions of the GF store can save computation
        time, IO bandwidth, or decrease memory footprint at the cost of
        increased disk space usage, when computations are done for lower
        frequency signals.

        :param decimate: Decimate factor
        :type decimate: int
        :param config: GF store config object, defaults to None
        :type config: :py:class:`~pyrocko.gf.meta.Config` or None
        :param force: Force overwrite, defaults to ``False``
        :type force: bool
        :param show_progress: Show progress, defaults to ``False``
        :type show_progress: bool
        '''

        if not self._f_index:
            self.open()

        if not (2 <= decimate <= 8):
            raise StoreError('decimate argument must be in the range [2,8]')

        assert self.mode == 'r'

        if config is None:
            config = self.config

        config = copy.deepcopy(config)
        config.sample_rate = self.config.sample_rate / decimate

        if decimate in self._decimated:
            del self._decimated[decimate]

        store_dir = self._decimated_store_dir(decimate)
        if os.path.exists(store_dir):
            if force:
                shutil.rmtree(store_dir)
            else:
                raise CannotCreate('store already exists at %s' % store_dir)

        store_dir_incomplete = store_dir + '-incomplete'
        Store.create(store_dir_incomplete, config, force=force)

        decimated = Store(store_dir_incomplete, 'w')
        if show_progress:
            pbar = util.progressbar('decimating store', self.config.nrecords)

        for i, args in enumerate(decimated.config.iter_nodes()):
            tr = self.get(args, decimate=decimate)
            decimated.put(args, tr)

            if show_progress:
                pbar.update(i+1)

        if show_progress:
            pbar.finish()

        decimated.close()

        shutil.move(store_dir_incomplete, store_dir)

        self._decimated[decimate] = None

    def stats(self):
        stats = BaseStore.stats(self)
        stats['decimated'] = sorted(self._decimated.keys())
        return stats

    stats_keys = BaseStore.stats_keys + ['decimated']

    def check(self, show_progress=False):
        have_holes = []
        for pdef in self.config.tabulated_phases:
            phase_id = pdef.id
            ph = self.get_stored_phase(phase_id)
            if ph.check_holes():
                have_holes.append(phase_id)

        if have_holes:
            for phase_id in have_holes:
                logger.warning(
                    'Travel time table of phase "{}" contains holes. You can '
                    'use `fomosto tttlsd` to fix holes.'.format(phase_id))
        else:
            logger.info('No holes in travel time tables')

        if show_progress:
            pbar = util.progressbar('checking store', self.config.nrecords)

        problems = 0
        for i, args in enumerate(self.config.iter_nodes()):
            tr = self.get(args)
            if tr and not tr.is_zero:
                if not tr.begin_value == tr.data[0]:
                    logger.warning('wrong begin value for trace at %s '
                                   '(data corruption?)' % str(args))
                    problems += 1
                if not tr.end_value == tr.data[-1]:
                    logger.warning('wrong end value for trace at %s '
                                   '(data corruption?)' % str(args))
                    problems += 1
                if not num.all(num.isfinite(tr.data)):
                    logger.warning('nans or infs in trace at %s' % str(args))
                    problems += 1

            if show_progress:
                pbar.update(i+1)

        if show_progress:
            pbar.finish()

        return problems

    def check_earthmodels(self, config):
        if config.earthmodel_receiver_1d.profile('z')[-1] not in \
                config.earthmodel_1d.profile('z'):
            logger.warning('deepest layer of earthmodel_receiver_1d not '
                           'found in earthmodel_1d')

    def _decimated_store_dir(self, decimate):
        return os.path.join(self.store_dir, 'decimated', str(decimate))

    def _decimated_store(self, decimate):
        if decimate == 1 or decimate not in self._decimated:
            return self, decimate
        else:
            store = self._decimated[decimate]
            if store is None:
                store = Store(self._decimated_store_dir(decimate), 'r')
                self._decimated[decimate] = store

            return store, 1

    def phase_filename(self, phase_id):
        check_string_id(phase_id)
        return os.path.join(self.store_dir, 'phases', phase_id + '.phase')

    def get_stored_phase(self, phase_id):
        '''
        Get stored phase from GF store.

        :returns: Phase information
        :rtype: :py:class:`pyrocko.spit.SPTree`
        '''

        if phase_id not in self._phases:
            fn = self.phase_filename(phase_id)
            if not os.path.isfile(fn):
                raise NoSuchPhase(phase_id)

            spt = spit.SPTree(filename=fn)
            self._phases[phase_id] = spt

        return self._phases[phase_id]

    def get_phase(self, phase_def):
        toks = phase_def.split(':', 1)
        if len(toks) == 2:
            provider, phase_def = toks
        else:
            provider, phase_def = 'stored', toks[0]

        if provider == 'stored':
            return self.get_stored_phase(phase_def)

        elif provider == 'vel':
            vel = float(phase_def) * 1000.

            def evaluate(args):
                return self.config.get_distance(args) / vel

            return evaluate

        elif provider == 'vel_surface':
            vel = float(phase_def) * 1000.

            def evaluate(args):
                return self.config.get_surface_distance(args) / vel

            return evaluate

        elif provider in ('cake', 'iaspei'):
            from pyrocko import cake
            mod = self.config.earthmodel_1d
            if provider == 'cake':
                phases = [cake.PhaseDef(phase_def)]
            else:
                phases = cake.PhaseDef.classic(phase_def)

            def evaluate(args):
                if len(args) == 2:
                    zr, zs, x = (self.config.receiver_depth,) + args
                elif len(args) == 3:
                    zr, zs, x = args
                else:
                    assert False

                t = []
                if phases:
                    rays = mod.arrivals(
                        phases=phases,
                        distances=[x*cake.m2d],
                        zstart=zs,
                        zstop=zr)

                    for ray in rays:
                        t.append(ray.t)

                if t:
                    return min(t)
                else:
                    return None

            return evaluate

        raise StoreError('unsupported phase provider: %s' % provider)
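
    # Phase definitions accepted by get_phase() (and thus by t()), as
    # dispatched above:
    #
    #     'stored:p'        -> tabulated phase 'p' from this store
    #     'vel:5.8'         -> constant-velocity arrival at 5.8 km/s
    #     'vel_surface:3.0' -> surface-distance arrival at 3.0 km/s
    #     'cake:P'          -> computed with cake from earthmodel_1d
    #     'iaspei:P'        -> classic phase names via cake.PhaseDef.classic
    #
    # A bare name without provider prefix, e.g. 'p', implies 'stored:p'.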

    def t(self, timing, *args):
        '''
        Compute interpolated phase arrivals.

        **Examples:**

        If ``test_store`` is of :py:class:`~pyrocko.gf.meta.ConfigTypeA`::

            test_store.t('p', (1000, 10000))
            test_store.t('last{P|Pdiff}', (1000, 10000))  # The latter arrival
                                                          # of P or diffracted
                                                          # P phase

        If ``test_store`` is of :py:class:`~pyrocko.gf.meta.ConfigTypeB`::

            test_store.t('S', (1000, 1000, 10000))
            test_store.t('first{P|p|Pdiff|sP}', (1000, 1000, 10000))  # The
                                                      # first arrival of
                                                      # the given phases is
                                                      # selected

        :param timing: Timing string as described above
        :type timing: str or :py:class:`~pyrocko.gf.meta.Timing`
        :param \\*args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type \\*args: tuple
        :returns: Phase arrival according to ``timing``
        :rtype: float or None
        '''

        if len(args) == 1:
            args = args[0]
        else:
            args = self.config.make_indexing_args1(*args)

        if not isinstance(timing, meta.Timing):
            timing = meta.Timing(timing)

        return timing.evaluate(self.get_phase, args)

    def make_timing_params(self, begin, end, snap_vred=True, force=False):

        '''
        Compute tight parameterized time ranges to include given timings.

        Calculates appropriate time ranges to cover given begin and end
        timings over all GF points in the store. A dict with the following
        keys is returned:

        * ``'tmin'``: time [s], minimum of begin timing over all GF points
        * ``'tmax'``: time [s], maximum of end timing over all GF points
        * ``'vred'``, ``'tmin_vred'``: slope [m/s] and offset [s] of
          reduction velocity [m/s] appropriate to catch begin timing over
          all GF points
        * ``'tlenmax_vred'``: maximum time length needed to cover all end
          timings, when using linear slope given with (``vred``,
          ``tmin_vred``) as start
        '''

        data = []
        warned = set()
        for args in self.config.iter_nodes(level=-1):
            tmin = self.t(begin, args)
            tmax = self.t(end, args)
            if tmin is None:
                warned.add(str(begin))
            if tmax is None:
                warned.add(str(end))

            x = self.config.get_surface_distance(args)
            data.append((x, tmin, tmax))

        if len(warned):
            w = ' | '.join(list(warned))
            msg = '''determination of time window failed using phase
definitions: %s.\n Travel time table contains holes in probed ranges. You can
use `fomosto tttlsd` to fix holes.''' % w
            if force:
                logger.warning(msg)
            else:
                raise MakeTimingParamsFailed(msg)

        xs, tmins, tmaxs = num.array(data, dtype=float).T

        tlens = tmaxs - tmins

        i = num.nanargmin(tmins)
        if not num.isfinite(i):
            raise MakeTimingParamsFailed(
                'determination of time window failed')

        tminmin = tmins[i]
        x_tminmin = xs[i]
        dx = (xs - x_tminmin)
        dx = num.where(dx != 0.0, dx, num.nan)
        s = (tmins - tminmin) / dx
        sred = num.min(num.abs(s[num.isfinite(s)]))

        deltax = self.config.distance_delta

        if snap_vred:
            tdif = sred*deltax
            tdif2 = self.config.deltat * math.floor(tdif / self.config.deltat)
            sred = tdif2/self.config.distance_delta

        tmin_vred = tminmin - sred*x_tminmin
        if snap_vred:
            xe = x_tminmin - int(x_tminmin / deltax) * deltax
            tmin_vred = float(
                self.config.deltat *
                math.floor(tmin_vred / self.config.deltat) - xe * sred)

        # bugfix: use the arrays tmaxs and xs here; the scalars tmax and x
        # left over from the loop above would not give the maximum over all
        # GF points
        tlenmax_vred = num.nanmax(tmaxs - (tmin_vred + sred*xs))
        if sred != 0.0:
            vred = 1.0/sred
        else:
            vred = 0.0

        return dict(
            tmin=tminmin,
            tmax=num.nanmax(tmaxs),
            tlenmax=num.nanmax(tlens),
            tmin_vred=tmin_vred,
            tlenmax_vred=tlenmax_vred,
            vred=vred)

    def make_ttt(self, force=False):
        '''
        Compute travel time tables.

        Travel time tables are computed using the 1D earth model defined in
        :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined
        phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`. The
        accuracy of the tabulated times is adjusted to the sampling rate of
        the store.
        '''

        from pyrocko import cake
        config = self.config

        if not config.tabulated_phases:
            return

        mod = config.earthmodel_1d

        if config.earthmodel_receiver_1d:
            self.check_earthmodels(config)

        if not mod:
            raise StoreError('no earth model found')

        for pdef in config.tabulated_phases:

            phase_id = pdef.id
            phases = pdef.phases
            horvels = pdef.horizontal_velocities

            fn = self.phase_filename(phase_id)

            if os.path.exists(fn) and not force:
                logger.info('file already exists: %s' % fn)
                continue

            def evaluate(args):

                if len(args) == 2:
                    zr, zs, x = (config.receiver_depth,) + args
                elif len(args) == 3:
                    zr, zs, x = args
                else:
                    assert False

                t = []
                if phases:
                    rays = mod.arrivals(
                        phases=phases,
                        distances=[x*cake.m2d],
                        zstart=zs,
                        zstop=zr)

                    for ray in rays:
                        t.append(ray.t)

                for v in horvels:
                    t.append(x/(v*1000.))

                if t:
                    return min(t)
                else:
                    return None

            logger.info('making travel time table for phasegroup "%s"' %
                        phase_id)

            ip = spit.SPTree(
                f=evaluate,
                ftol=config.deltat*0.5,
                xbounds=num.transpose((config.mins, config.maxs)),
                xtols=config.deltas)

            util.ensuredirs(fn)
            ip.dump(fn)

    def get_provided_components(self):

        scheme_desc = meta.component_scheme_to_description[
            self.config.component_scheme]

        quantity = self.config.stored_quantity \
            or scheme_desc.default_stored_quantity

        if not quantity:
            return scheme_desc.provided_components
        else:
            return [
                quantity + '.' + comp
                for comp in scheme_desc.provided_components]

    def fix_ttt_holes(self, phase_id):

        pdef = self.config.get_tabulated_phase(phase_id)
        mode = None
        for phase in pdef.phases:
            for leg in phase.legs():
                if mode is None:
                    mode = leg.mode

                else:
                    if mode != leg.mode:
                        raise StoreError(
                            'Can only fix holes in pure P or pure S phases.')

        sptree = self.get_stored_phase(phase_id)
        sptree_lsd = self.config.fix_ttt_holes(sptree, mode)

        phase_lsd = phase_id + '.lsd'
        fn = self.phase_filename(phase_lsd)
        sptree_lsd.dump(fn)

    def statics(self, source, multi_location, itsnapshot, components,
                interpolation='nearest_neighbor', nthreads=0):
        if not self._f_index:
            self.open()

        out = {}
        ntargets = multi_location.ntargets
        source_terms = source.get_source_terms(self.config.component_scheme)
        # TODO: deal with delays for snapshots > 1 sample

        if itsnapshot is not None:
            delays = source.times

            # Fringe case where we sample at sample 0 and sample 1
            tsnapshot = itsnapshot * self.config.deltat
            if delays.max() == tsnapshot and delays.min() != tsnapshot:
                delays[delays == delays.max()] -= self.config.deltat

        else:
            delays = source.times * 0
            itsnapshot = 1

        if ntargets == 0:
            raise StoreError('MultiLocation.coords5 is empty')

        res = store_ext.store_calc_static(
            self.cstore,
            source.coords5(),
            source_terms,
            delays,
            multi_location.coords5,
            self.config.component_scheme,
            interpolation,
            itsnapshot,
            nthreads)

        out = {}
        for icomp, (comp, comp_res) in enumerate(
                zip(self.get_provided_components(), res)):
            if comp not in components:
                continue
            out[comp] = res[icomp]

        return out

    def calc_seismograms(self, source, receivers, components, deltat=None,
                         itmin=None, nsamples=None,
                         interpolation='nearest_neighbor',
                         optimization='enable', nthreads=1):
        config = self.config

        assert interpolation in ['nearest_neighbor', 'multilinear'], \
            'Unknown interpolation: %s' % interpolation

        if not isinstance(receivers, list):
            receivers = [receivers]

        if deltat is None:
            decimate = 1
        else:
            decimate = int(round(deltat/config.deltat))
            if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
                raise StoreError(
                    'unavailable decimation ratio target.deltat / '
                    'store.deltat = %g / %g' % (deltat, config.deltat))

        store, decimate_ = self._decimated_store(decimate)

        if not store._f_index:
            store.open()

        scheme = config.component_scheme

        source_coords_arr = source.coords5()
        source_terms = source.get_source_terms(scheme)
        delays = source.times.ravel()

        nreceiver = len(receivers)
        receiver_coords_arr = num.empty((nreceiver, 5))
        for irec, rec in enumerate(receivers):
            receiver_coords_arr[irec, :] = rec.coords5

        dt = self.get_deltat()

        itoffset = int(num.floor(delays.min()/dt)) if delays.size != 0 else 0

        if itmin is None:
            itmin = num.zeros(nreceiver, dtype=num.int32)
        else:
            itmin = (itmin-itoffset).astype(num.int32)

        if nsamples is None:
            nsamples = num.zeros(nreceiver, dtype=num.int32) - 1
        else:
            nsamples = nsamples.astype(num.int32)

        try:
            results = store_ext.store_calc_timeseries(
                store.cstore,
                source_coords_arr,
                source_terms,
                (delays - itoffset*dt),
                receiver_coords_arr,
                scheme,
                interpolation,
                itmin,
                nsamples,
                nthreads)
        except Exception as e:
            raise e

        provided_components = self.get_provided_components()
        ncomponents = len(provided_components)

        seismograms = [dict() for _ in range(nreceiver)]
        for ires in range(len(results)):
            res = results.pop(0)
            ireceiver = ires // ncomponents

            comp = provided_components[res[-2]]

            if comp not in components:
                continue

            tr = GFTrace(*res[:-2])
            tr.deltat = config.deltat * decimate
            tr.itmin += itoffset

            tr.n_records_stacked = 0
            tr.t_optimize = 0.
            tr.t_stack = 0.
            tr.err = res[-1]

            seismograms[ireceiver][comp] = tr

        return seismograms

    def seismogram(self, source, receiver, components, deltat=None,
                   itmin=None, nsamples=None,
                   interpolation='nearest_neighbor',
                   optimization='enable', nthreads=1):

        config = self.config

        if deltat is None:
            decimate = 1
        else:
            decimate = int(round(deltat/config.deltat))
            if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
                raise StoreError(
                    'unavailable decimation ratio target.deltat / '
                    'store.deltat = %g / %g' % (deltat, config.deltat))

        store, decimate_ = self._decimated_store(decimate)

        if not store._f_index:
            store.open()

        scheme = config.component_scheme

        source_coords_arr = source.coords5()
        source_terms = source.get_source_terms(scheme)
        receiver_coords_arr = receiver.coords5[num.newaxis, :].copy()

        try:
            params = store_ext.make_sum_params(
                store.cstore,
                source_coords_arr,
                source_terms,
                receiver_coords_arr,
                scheme,
                interpolation, nthreads)

        except store_ext.StoreExtError:
            raise meta.OutOfBounds()

        provided_components = self.get_provided_components()

        out = {}
        for icomp, comp in enumerate(provided_components):
            if comp in components:
                weights, irecords = params[icomp]

                neach = irecords.size // source.times.size
                delays = num.repeat(source.times, neach)

                tr = store._sum(
                    irecords, delays, weights, itmin, nsamples, decimate_,
                    'c', optimization)

                # to prevent problems with rounding errors (BaseStore saves
                # deltat as a 4-byte floating point value, value from YAML
                # config is more accurate)
                tr.deltat = config.deltat * decimate

                out[comp] = tr

        return out


__all__ = '''
gf_dtype
NotMultipleOfSamplingInterval
GFTrace
CannotCreate
CannotOpen
DuplicateInsert
NotAllowedToInterpolate
NoSuchExtra
NoSuchPhase
BaseStore
Store
SeismosizerErrorEnum
'''.split()