# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

'''
Storage, retrieval and summation of Green's functions.
'''

import errno
import time
import os
import struct
import math
import shutil
try:
    import fcntl
except ImportError:
    fcntl = None
import copy
import logging
import re
import hashlib
from glob import glob

import numpy as num
from scipy import signal

from . import meta
from .error import StoreError
try:
    from . import store_ext
except ImportError:
    store_ext = None
from pyrocko import util, spit

logger = logging.getLogger('pyrocko.gf.store')

op = os.path

# gf store endianness
E = '<'

gf_dtype = num.dtype(num.float32)
gf_dtype_store = num.dtype(E + 'f4')

gf_dtype_nbytes_per_sample = 4

gf_store_header_dtype = [
    ('nrecords', E + 'u8'),
    ('deltat', E + 'f4'),
]

gf_store_header_fmt = E + 'Qf'
gf_store_header_fmt_size = struct.calcsize(gf_store_header_fmt)

gf_record_dtype = num.dtype([
    ('data_offset', E + 'u8'),
    ('itmin', E + 'i4'),
    ('nsamples', E + 'u4'),
    ('begin_value', E + 'f4'),
    ('end_value', E + 'f4'),
])
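

# Illustrative sketch (not part of the original module): the index file
# layout defined above (header followed by a flat array of records) can be
# read back with plain struct/numpy calls. The path used here is
# hypothetical.
def _example_read_index(index_path='some_store/index'):
    with open(index_path, 'rb') as f:
        nrecords, deltat = struct.unpack(
            gf_store_header_fmt, f.read(gf_store_header_fmt_size))
        records = num.fromfile(f, dtype=gf_record_dtype, count=nrecords)

    return nrecords, deltat, records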
available_stored_tables = ['phase', 'takeoff_angle', 'incidence_angle']

km = 1000.


class SeismosizerErrorEnum:
    SUCCESS = 0
    INVALID_RECORD = 1
    EMPTY_RECORD = 2
    BAD_RECORD = 3
    ALLOC_FAILED = 4
    BAD_REQUEST = 5
    BAD_DATA_OFFSET = 6
    READ_DATA_FAILED = 7
    SEEK_INDEX_FAILED = 8
    READ_INDEX_FAILED = 9
    FSTAT_TRACES_FAILED = 10
    BAD_STORE = 11
    MMAP_INDEX_FAILED = 12
    MMAP_TRACES_FAILED = 13
    INDEX_OUT_OF_BOUNDS = 14
    NTARGETS_OUT_OF_BOUNDS = 15


def valid_string_id(s):
    return re.match(meta.StringID.pattern, s)


def check_string_id(s):
    if not valid_string_id(s):
        raise ValueError('invalid name %s' % s)

# - data_offset
#
#   Data file offset of first sample in bytes (the seek value).
#   Special values:
#
#    0x00 - missing trace
#    0x01 - zero trace (GF trace is physically zero)
#    0x02 - short trace of 1 or 2 samples (no need for data allocation)
#
# - itmin
#
#   Time of first sample of the trace as a multiple of the sampling interval.
#   All GF samples must be evaluated at multiples of the sampling interval
#   with respect to a global reference time zero. Must be set also for zero
#   traces.
#
# - nsamples
#
#   Number of samples in the GF trace. Should be set to zero for zero traces.
#
# - begin_value, end_value
#
#   Values of first and last sample. These values are included in data[]
#   redundantly.


class NotMultipleOfSamplingInterval(Exception):
    pass


sampling_check_eps = 1e-5


class GFTrace(object):
    '''
    Green's Function trace class for handling traces from the GF store.
    '''

    @classmethod
    def from_trace(cls, tr):
        return cls(data=tr.ydata.copy(), tmin=tr.tmin, deltat=tr.deltat)

    def __init__(self, data=None, itmin=None, deltat=1.0,
                 is_zero=False, begin_value=None, end_value=None, tmin=None):

        assert sum((x is None) for x in (tmin, itmin)) == 1, \
            'GFTrace: either tmin or itmin must be given'

        if tmin is not None:
            itmin = int(round(tmin / deltat))
            if abs(itmin*deltat - tmin) > sampling_check_eps*deltat:
                raise NotMultipleOfSamplingInterval(
                    'GFTrace: tmin (%g) is not a multiple of sampling '
                    'interval (%g)' % (tmin, deltat))

        if data is not None:
            data = num.asarray(data, dtype=gf_dtype)
        else:
            data = num.array([], dtype=gf_dtype)
            begin_value = 0.0
            end_value = 0.0

        self.data = data
        self.itmin = itmin
        self.deltat = deltat
        self.is_zero = is_zero
        self.n_records_stacked = 0.
        self.t_stack = 0.
        self.t_optimize = 0.
        self.err = SeismosizerErrorEnum.SUCCESS

        if data is not None and data.size > 0:
            if begin_value is None:
                begin_value = data[0]
            if end_value is None:
                end_value = data[-1]

        self.begin_value = begin_value
        self.end_value = end_value

    @property
    def t(self):
        '''
        Time vector of the GF trace.

        :returns: Time vector
        :rtype: :py:class:`numpy.ndarray`
        '''
        return num.linspace(
            self.itmin*self.deltat,
            (self.itmin+self.data.size-1)*self.deltat, self.data.size)

    def __str__(self, itmin=0):

        if self.is_zero:
            return 'ZERO'

        s = []
        for i in range(itmin, self.itmin + self.data.size):
            if i >= self.itmin and i < self.itmin + self.data.size:
                s.append('%7.4g' % self.data[i-self.itmin])
            else:
                s.append(' '*7)

        return '|'.join(s)

    def to_trace(self, net, sta, loc, cha):
        from pyrocko import trace
        return trace.Trace(
            net, sta, loc, cha,
            ydata=self.data,
            deltat=self.deltat,
            tmin=self.itmin*self.deltat)


class GFValue(object):

    def __init__(self, value):
        self.value = value
        self.n_records_stacked = 0.
        self.t_stack = 0.
        self.t_optimize = 0.


Zero = GFTrace(is_zero=True, itmin=0)
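

# Illustrative sketch (not part of the original module): constructing a
# GFTrace directly from an array and inspecting its time vector. The values
# used here are hypothetical.
def _example_gftrace():
    gftr = GFTrace(data=num.array([0., 1., 0.5, 0.]), itmin=5, deltat=0.5)
    # gftr.t == [2.5, 3.0, 3.5, 4.0]; begin_value/end_value default to the
    # first and last samples.
    return gftr.t, gftr.begin_value, gftr.end_value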
def make_same_span(tracesdict):

    traces = tracesdict.values()

    nonzero = [tr for tr in traces if not tr.is_zero]
    if not nonzero:
        return {k: Zero for k in tracesdict.keys()}

    deltat = nonzero[0].deltat

    itmin = min(tr.itmin for tr in nonzero)
    itmax = max(tr.itmin+tr.data.size for tr in nonzero) - 1

    out = {}
    for k, tr in tracesdict.items():
        n = itmax - itmin + 1
        if tr.itmin != itmin or tr.data.size != n:
            data = num.zeros(n, dtype=gf_dtype)
            if not tr.is_zero:
                lo = tr.itmin-itmin
                hi = lo + tr.data.size
                data[:lo] = tr.data[0]
                data[lo:hi] = tr.data
                data[hi:] = tr.data[-1]

            tr = GFTrace(data, itmin, deltat)

        out[k] = tr

    return out
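

# Illustrative sketch (not part of the original module): make_same_span pads
# all traces of a dict to a common time span, repeating the end points. The
# trace values are hypothetical.
def _example_make_same_span():
    traces = {
        'a': GFTrace(data=num.array([1., 2.]), itmin=0, deltat=1.0),
        'b': GFTrace(data=num.array([3.]), itmin=2, deltat=1.0)}
    padded = make_same_span(traces)
    # both padded traces now start at itmin=0 and have 3 samples:
    # 'a' -> [1, 2, 2], 'b' -> [3, 3, 3]
    return padded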
class CannotCreate(StoreError):
    pass


class CannotOpen(StoreError):
    pass


class DuplicateInsert(StoreError):
    pass


class ShortRead(StoreError):
    def __str__(self):
        return 'unexpected end of data (truncated traces file?)'


class NotAllowedToInterpolate(StoreError):
    def __str__(self):
        return 'not allowed to interpolate'


class NoSuchExtra(StoreError):
    def __init__(self, s):
        StoreError.__init__(self)
        self.value = s

    def __str__(self):
        return 'extra information for key "%s" not found.' % self.value


class NoSuchPhase(StoreError):
    def __init__(self, s):
        StoreError.__init__(self)
        self.value = s

    def __str__(self):
        return 'phase for key "%s" not found. ' \
            'Running "fomosto ttt" or "fomosto sat" may be needed.' \
            % self.value


def remove_if_exists(fn, force=False):
    if os.path.exists(fn):
        if force:
            os.remove(fn)
        else:
            raise CannotCreate('file %s already exists' % fn)


def get_extra_path(store_dir, key):
    check_string_id(key)
    return os.path.join(store_dir, 'extra', key)


class BaseStore(object):
    '''
    Low-level part of Green's function store implementation.

    See :py:class:`Store`.
    '''

    @staticmethod
    def lock_fn_(store_dir):
        return os.path.join(store_dir, 'lock')

    @staticmethod
    def index_fn_(store_dir):
        return os.path.join(store_dir, 'index')

    @staticmethod
    def data_fn_(store_dir):
        return os.path.join(store_dir, 'traces')

    @staticmethod
    def config_fn_(store_dir):
        return os.path.join(store_dir, 'config')

    @staticmethod
    def create(store_dir, deltat, nrecords, force=False):

        try:
            util.ensuredir(store_dir)
        except Exception:
            raise CannotCreate('cannot create directory %s' % store_dir)

        index_fn = BaseStore.index_fn_(store_dir)
        data_fn = BaseStore.data_fn_(store_dir)

        for fn in (index_fn, data_fn):
            remove_if_exists(fn, force)

        with open(index_fn, 'wb') as f:
            f.write(struct.pack(gf_store_header_fmt, nrecords, deltat))
            records = num.zeros(nrecords, dtype=gf_record_dtype)
            records.tofile(f)

        with open(data_fn, 'wb') as f:
            f.write(b'\0' * 32)

    def __init__(self, store_dir, mode='r', use_memmap=True):
        assert mode in 'rw'
        self.store_dir = store_dir
        self.mode = mode
        self._use_memmap = use_memmap
        self._nrecords = None
        self._deltat = None
        self._f_index = None
        self._f_data = None
        self._records = None
        self.cstore = None

    def open(self):
        assert self._f_index is None
        index_fn = self.index_fn()
        data_fn = self.data_fn()

        if self.mode == 'r':
            fmode = 'rb'
        elif self.mode == 'w':
            fmode = 'r+b'
        else:
            assert False, 'invalid mode: %s' % self.mode

        try:
            self._f_index = open(index_fn, fmode)
            self._f_data = open(data_fn, fmode)
        except Exception:
            self.mode = ''
            raise CannotOpen('cannot open gf store: %s' % self.store_dir)

        try:
            # replace single precision deltat value in store with the double
            # precision value from the config, if available
            self.cstore = store_ext.store_init(
                self._f_index.fileno(), self._f_data.fileno(),
                self.get_deltat() or 0.0)

        except store_ext.StoreExtError as e:
            raise StoreError(str(e))

        while True:
            try:
                dataheader = self._f_index.read(gf_store_header_fmt_size)
                break

            except IOError as e:
                # occasionally got this one on an NFS volume
                if e.errno == errno.EBUSY:
                    time.sleep(0.01)
                else:
                    raise

        nrecords, deltat = struct.unpack(gf_store_header_fmt, dataheader)
        self._nrecords = nrecords
        self._deltat = deltat

        self._load_index()

    def __del__(self):
        if self.mode != '':
            self.close()

    def lock(self):
        if not fcntl:
            lock_fn = self.lock_fn()
            util.create_lockfile(lock_fn)
        else:
            if not self._f_index:
                self.open()

            while True:
                try:
                    fcntl.lockf(self._f_index, fcntl.LOCK_EX)
                    break

                except IOError as e:
                    if e.errno == errno.ENOLCK:
                        time.sleep(0.01)
                    else:
                        raise

    def unlock(self):
        if not fcntl:
            lock_fn = self.lock_fn()
            util.delete_lockfile(lock_fn)
        else:
            self._f_data.flush()
            self._f_index.flush()
            fcntl.lockf(self._f_index, fcntl.LOCK_UN)

    def put(self, irecord, trace):
        self._put(irecord, trace)

    def get_record(self, irecord):
        return self._get_record(irecord)

    def get_span(self, irecord, decimate=1):
        return self._get_span(irecord, decimate=decimate)

    def get(self, irecord, itmin=None, nsamples=None, decimate=1,
            implementation='c'):
        return self._get(irecord, itmin, nsamples, decimate, implementation)

    def sum(self, irecords, delays, weights, itmin=None,
            nsamples=None, decimate=1, implementation='c',
            optimization='enable'):
        return self._sum(irecords, delays, weights, itmin, nsamples, decimate,
                         implementation, optimization)

    def irecord_format(self):
        return util.zfmt(self._nrecords)

    def str_irecord(self, irecord):
        return self.irecord_format() % irecord

    def close(self):
        if self.mode == 'w':
            if not self._f_index:
                self.open()
            self._save_index()

        if self._f_data:
            self._f_data.close()
            self._f_data = None

        if self._f_index:
            self._f_index.close()
            self._f_index = None

        del self._records
        self._records = None

        self.mode = ''

    def _get_record(self, irecord):
        if not self._f_index:
            self.open()

        return self._records[irecord]

    def _get(self, irecord, itmin, nsamples, decimate, implementation):
        '''
        Retrieve complete GF trace from storage.
        '''

        if not self._f_index:
            self.open()

        if not self.mode == 'r':
            raise StoreError('store not open in read mode')

        if implementation == 'c' and decimate == 1:

            if nsamples is None:
                nsamples = -1

            if itmin is None:
                itmin = 0

            try:
                return GFTrace(*store_ext.store_get(
                    self.cstore, int(irecord), int(itmin), int(nsamples)))
            except store_ext.StoreExtError as e:
                raise StoreError(str(e))

        else:
            return self._get_impl_reference(
                irecord, itmin, nsamples, decimate)

    def get_deltat(self):
        return self._deltat

    def _get_impl_reference(self, irecord, itmin, nsamples, decimate):
        deltat = self.get_deltat()

        if not (0 <= irecord < self._nrecords):
            raise StoreError('invalid record number requested '
                             '(irecord = %i, nrecords = %i)' %
                             (irecord, self._nrecords))

        ipos, itmin_data, nsamples_data, begin_value, end_value = \
            self._records[irecord]

        if None in (itmin, nsamples):
            itmin = itmin_data
            itmax = itmin_data + nsamples_data - 1
            nsamples = nsamples_data
        else:
            itmin = itmin * decimate
            nsamples = nsamples * decimate
            itmax = itmin + nsamples - decimate

        if ipos == 0:
            return None

        elif ipos == 1:
            itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
            return GFTrace(is_zero=True, itmin=itmin_ext//decimate)

        if decimate == 1:
            ilo = max(itmin, itmin_data) - itmin_data
            ihi = min(itmin+nsamples, itmin_data+nsamples_data) - itmin_data
            data = self._get_data(ipos, begin_value, end_value, ilo, ihi)

            return GFTrace(data, itmin=itmin_data+ilo, deltat=deltat,
                           begin_value=begin_value, end_value=end_value)

        else:
            itmax_data = itmin_data + nsamples_data - 1

            # put begin and end to multiples of new sampling rate
            itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
            itmax_ext = -((-min(itmax, itmax_data))//decimate) * decimate
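            # note: -((-x)//d) * d rounds x up to the next multiple of d
            # (ceiling division), mirroring the floor-snap of itmin_ext
            # above.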
            nsamples_ext = itmax_ext - itmin_ext + 1

            # add some padding for the aa filter
            order = 30
            itmin_ext_pad = itmin_ext - order//2
            itmax_ext_pad = itmax_ext + order//2
            nsamples_ext_pad = itmax_ext_pad - itmin_ext_pad + 1

            itmin_overlap = max(itmin_data, itmin_ext_pad)
            itmax_overlap = min(itmax_data, itmax_ext_pad)

            ilo = itmin_overlap - itmin_ext_pad
            ihi = max(ilo, itmax_overlap - itmin_ext_pad + 1)
            ilo_data = itmin_overlap - itmin_data
            ihi_data = max(ilo_data, itmax_overlap - itmin_data + 1)

            data_ext_pad = num.empty(nsamples_ext_pad, dtype=gf_dtype)
            data_ext_pad[ilo:ihi] = self._get_data(
                ipos, begin_value, end_value, ilo_data, ihi_data)

            data_ext_pad[:ilo] = begin_value
            data_ext_pad[ihi:] = end_value

            b = signal.firwin(order + 1, 1. / decimate, window='hamming')
            a = 1.
            data_filt_pad = signal.lfilter(b, a, data_ext_pad)
            data_deci = data_filt_pad[order:order+nsamples_ext:decimate]
            if data_deci.size >= 1:
                if itmin_ext <= itmin_data:
                    data_deci[0] = begin_value

                if itmax_ext >= itmax_data:
                    data_deci[-1] = end_value

            return GFTrace(data_deci, itmin_ext//decimate,
                           deltat*decimate,
                           begin_value=begin_value, end_value=end_value)

    def _get_span(self, irecord, decimate=1):
        '''
        Get temporal extent of GF trace at given index.
        '''

        if not self._f_index:
            self.open()

        assert 0 <= irecord < self._nrecords, \
            'irecord = %i, nrecords = %i' % (irecord, self._nrecords)

        (_, itmin, nsamples, _, _) = self._records[irecord]

        itmax = itmin + nsamples - 1

        if decimate == 1:
            return itmin, itmax
        else:
            if nsamples == 0:
                return itmin//decimate, itmin//decimate - 1
            else:
                return itmin//decimate, -((-itmax)//decimate)

    def _put(self, irecord, trace):
        '''
        Save GF trace to storage.
        '''

        if not self._f_index:
            self.open()

        deltat = self.get_deltat()

        assert self.mode == 'w'
        assert trace.is_zero or \
            abs(trace.deltat - deltat) < 1e-7 * deltat
        assert 0 <= irecord < self._nrecords, \
            'irecord = %i, nrecords = %i' % (irecord, self._nrecords)

        if self._records[irecord][0] != 0:
            raise DuplicateInsert('record %i already in store' % irecord)

        if trace.is_zero or num.all(trace.data == 0.0):
            self._records[irecord] = (1, trace.itmin, 0, 0., 0.)
            return

        ndata = trace.data.size

        if ndata > 2:
            self._f_data.seek(0, 2)
            ipos = self._f_data.tell()
            trace.data.astype(gf_dtype_store).tofile(self._f_data)
        else:
            ipos = 2

        self._records[irecord] = (ipos, trace.itmin, ndata,
                                  trace.data[0], trace.data[-1])

    def _sum_impl_alternative(self, irecords, delays, weights, itmin,
                              nsamples, decimate):

        '''
        Sum delayed and weighted GF traces.
        '''

        if not self._f_index:
            self.open()

        assert self.mode == 'r'

        deltat = self.get_deltat() * decimate

        if len(irecords) == 0:
            if None in (itmin, nsamples):
                return Zero
            else:
                return GFTrace(
                    num.zeros(nsamples, dtype=gf_dtype), itmin,
                    deltat, is_zero=True)

        assert len(irecords) == len(delays)
        assert len(irecords) == len(weights)

        if None in (itmin, nsamples):
            itmin_delayed, itmax_delayed = [], []
            for irecord, delay in zip(irecords, delays):
                itmin, itmax = self._get_span(irecord, decimate=decimate)
                itmin_delayed.append(itmin + delay/deltat)
                itmax_delayed.append(itmax + delay/deltat)

            itmin = int(math.floor(min(itmin_delayed)))
            nsamples = int(math.ceil(max(itmax_delayed))) - itmin + 1

        out = num.zeros(nsamples, dtype=gf_dtype)
        if nsamples == 0:
            return GFTrace(out, itmin, deltat)

        for ii in range(len(irecords)):
            irecord = irecords[ii]
            delay = delays[ii]
            weight = weights[ii]

            if weight == 0.0:
                continue

            idelay_floor = int(math.floor(delay/deltat))
            idelay_ceil = int(math.ceil(delay/deltat))

            gf_trace = self._get(
                irecord,
                itmin - idelay_ceil,
                nsamples + idelay_ceil - idelay_floor,
                decimate,
                'reference')

            assert gf_trace.itmin >= itmin - idelay_ceil
            assert gf_trace.data.size <= nsamples + idelay_ceil - idelay_floor

            if gf_trace.is_zero:
                continue

            ilo = gf_trace.itmin - itmin + idelay_floor
            ihi = ilo + gf_trace.data.size + (idelay_ceil-idelay_floor)

            data = gf_trace.data

            if idelay_floor == idelay_ceil:
                out[ilo:ihi] += data * weight
            else:
                if data.size:
                    k = 1
                    if ihi <= nsamples:
                        out[ihi-1] += gf_trace.end_value * \
                            ((idelay_ceil-delay/deltat) * weight)
                        k = 0

                    out[ilo+1:ihi-k] += data[:data.size-k] * \
                        ((delay/deltat-idelay_floor) * weight)
                    k = 1
                    if ilo >= 0:
                        out[ilo] += gf_trace.begin_value * \
                            ((delay/deltat-idelay_floor) * weight)
                        k = 0

                    out[ilo+k:ihi-1] += data[k:] * \
                        ((idelay_ceil-delay/deltat) * weight)

            if ilo > 0 and gf_trace.begin_value != 0.0:
                out[:ilo] += gf_trace.begin_value * weight

            if ihi < nsamples and gf_trace.end_value != 0.0:
                out[ihi:] += gf_trace.end_value * weight

        return GFTrace(out, itmin, deltat)

    def _sum_impl_reference(self, irecords, delays, weights, itmin, nsamples,
                            decimate):

        if not self._f_index:
            self.open()

        deltat = self.get_deltat() * decimate

        if len(irecords) == 0:
            if None in (itmin, nsamples):
                return Zero
            else:
                return GFTrace(
                    num.zeros(nsamples, dtype=gf_dtype), itmin,
                    deltat, is_zero=True)

        datas = []
        itmins = []
        for i, delay, weight in zip(irecords, delays, weights):
            tr = self._get(i, None, None, decimate, 'reference')

            idelay_floor = int(math.floor(delay/deltat))
            idelay_ceil = int(math.ceil(delay/deltat))

            if idelay_floor == idelay_ceil:
                itmins.append(tr.itmin + idelay_floor)
                datas.append(tr.data.copy()*weight)
            else:
                itmins.append(tr.itmin + idelay_floor)
                datas.append(tr.data.copy()*weight*(idelay_ceil-delay/deltat))
                itmins.append(tr.itmin + idelay_ceil)
                datas.append(tr.data.copy()*weight*(delay/deltat-idelay_floor))

        itmin_all = min(itmins)

        itmax_all = max(itmin_ + data.size for (itmin_, data) in
                        zip(itmins, datas))

        if itmin is not None:
            itmin_all = min(itmin_all, itmin)
        if nsamples is not None:
            itmax_all = max(itmax_all, itmin+nsamples)

        nsamples_all = itmax_all - itmin_all

        arr = num.zeros((len(datas), nsamples_all), dtype=gf_dtype)
        for i, itmin_, data in zip(num.arange(len(datas)), itmins, datas):
            if data.size > 0:
                ilo = itmin_-itmin_all
                ihi = ilo + data.size
                arr[i, :ilo] = data[0]
                arr[i, ilo:ihi] = data
                arr[i, ihi:] = data[-1]

        sum_arr = arr.sum(axis=0)

        if itmin is not None and nsamples is not None:
            ilo = itmin-itmin_all
            ihi = ilo + nsamples
            sum_arr = sum_arr[ilo:ihi]

        else:
            itmin = itmin_all

        return GFTrace(sum_arr, itmin, deltat)

    def _optimize(self, irecords, delays, weights):
        if num.unique(irecords).size == irecords.size:
            return irecords, delays, weights

        deltat = self.get_deltat()

        delays = delays / deltat
        irecords2 = num.repeat(irecords, 2)
        delays2 = num.empty(irecords2.size, dtype=float)
        delays2[0::2] = num.floor(delays)
        delays2[1::2] = num.ceil(delays)
        weights2 = num.repeat(weights, 2)
        weights2[0::2] *= 1.0 - (delays - delays2[0::2])
        weights2[1::2] *= (1.0 - (delays2[1::2] - delays)) * \
            (delays2[1::2] - delays2[0::2])

        delays2 *= deltat

        iorder = num.lexsort((delays2, irecords2))

        irecords2 = irecords2[iorder]
        delays2 = delays2[iorder]
        weights2 = weights2[iorder]

        ui = num.empty(irecords2.size, dtype=bool)
        ui[1:] = num.logical_or(num.diff(irecords2) != 0,
                                num.diff(delays2) != 0.)

        ui[0] = 0
        ind2 = num.cumsum(ui)
        ui[0] = 1
        ind1 = num.where(ui)[0]

        irecords3 = irecords2[ind1]
        delays3 = delays2[ind1]
        weights3 = num.bincount(ind2, weights2)

        return irecords3, delays3, weights3
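
    # Worked example for _optimize (illustrative, with hypothetical numbers):
    # a delay of 0.3*deltat on record 7 is split into an integer-delay pair
    # (floor 0, ceil 1) with weights scaled by 0.7 and 0.3. Pairs that land
    # on the same (record, integer delay) are then merged by summing weights,
    # so repeated records cost only one trace read in the stacking loop.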

    def _sum(self, irecords, delays, weights, itmin, nsamples, decimate,
             implementation, optimization):

        if not self._f_index:
            self.open()

        t0 = time.time()
        if optimization == 'enable':
            irecords, delays, weights = self._optimize(
                irecords, delays, weights)
        else:
            assert optimization == 'disable'

        t1 = time.time()
        deltat = self.get_deltat()

        if implementation == 'c' and decimate == 1:
            if delays.size != 0:
                itoffset = int(num.floor(num.min(delays)/deltat))
            else:
                itoffset = 0

            if nsamples is None:
                nsamples = -1

            if itmin is None:
                itmin = 0
            else:
                itmin -= itoffset

            try:
                tr = GFTrace(*store_ext.store_sum(
                    self.cstore, irecords.astype(num.uint64),
                    delays - itoffset*deltat,
                    weights.astype(num.float32),
                    int(itmin), int(nsamples)))

                tr.itmin += itoffset

            except store_ext.StoreExtError as e:
                raise StoreError(str(e) + ' in store %s' % self.store_dir)

        elif implementation == 'alternative':
            tr = self._sum_impl_alternative(irecords, delays, weights,
                                            itmin, nsamples, decimate)

        else:
            tr = self._sum_impl_reference(irecords, delays, weights,
                                          itmin, nsamples, decimate)

        t2 = time.time()

        tr.n_records_stacked = irecords.size
        tr.t_optimize = t1 - t0
        tr.t_stack = t2 - t1

        return tr

    def _sum_statics(self, irecords, delays, weights, it, ntargets,
                     nthreads):
        if not self._f_index:
            self.open()

        return store_ext.store_sum_static(
            self.cstore, irecords, delays, weights, it, ntargets, nthreads)

    def _load_index(self):
        if self._use_memmap:
            records = num.memmap(
                self._f_index, dtype=gf_record_dtype,
                offset=gf_store_header_fmt_size,
                mode=('r', 'r+')[self.mode == 'w'])

        else:
            self._f_index.seek(gf_store_header_fmt_size)
            records = num.fromfile(self._f_index, dtype=gf_record_dtype)

        assert len(records) == self._nrecords

        self._records = records

    def _save_index(self):
        self._f_index.seek(0)
        self._f_index.write(struct.pack(gf_store_header_fmt, self._nrecords,
                                        self.get_deltat()))

        if self._use_memmap:
            self._records.flush()
        else:
            self._f_index.seek(gf_store_header_fmt_size)
            self._records.tofile(self._f_index)
            self._f_index.flush()

    def _get_data(self, ipos, begin_value, end_value, ilo, ihi):
        if ihi - ilo > 0:
            if ipos == 2:
                data_orig = num.empty(2, dtype=gf_dtype)
                data_orig[0] = begin_value
                data_orig[1] = end_value
                return data_orig[ilo:ihi]
            else:
                self._f_data.seek(
                    int(ipos + ilo*gf_dtype_nbytes_per_sample))
                arr = num.fromfile(
                    self._f_data, gf_dtype_store, ihi-ilo).astype(gf_dtype)

                if arr.size != ihi-ilo:
                    raise ShortRead()
                return arr
        else:
            return num.empty((0,), dtype=gf_dtype)

    def lock_fn(self):
        return BaseStore.lock_fn_(self.store_dir)

    def index_fn(self):
        return BaseStore.index_fn_(self.store_dir)

    def data_fn(self):
        return BaseStore.data_fn_(self.store_dir)

    def config_fn(self):
        return BaseStore.config_fn_(self.store_dir)

    def count_special_records(self):
        if not self._f_index:
            self.open()

        return num.histogram(
            self._records['data_offset'],
            bins=[0, 1, 2, 3, num.array(-1).astype(num.uint64)])[0]

    @property
    def size_index(self):
        return os.stat(self.index_fn()).st_size

    @property
    def size_data(self):
        return os.stat(self.data_fn()).st_size

    @property
    def size_index_and_data(self):
        return self.size_index + self.size_data

    @property
    def size_index_and_data_human(self):
        return util.human_bytesize(self.size_index_and_data)

    def stats(self):
        counter = self.count_special_records()

        stats = dict(
            total=self._nrecords,
            inserted=(counter[1] + counter[2] + counter[3]),
            empty=counter[0],
            short=counter[2],
            zero=counter[1],
            size_data=self.size_data,
            size_index=self.size_index,
        )

        return stats

    stats_keys = 'total inserted empty short zero size_data size_index'.split()
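

# Illustrative sketch (not part of the original module): minimal low-level
# round trip through BaseStore, bypassing the high-level index mapping.
# Requires the compiled store_ext extension; the directory name is
# hypothetical.
def _example_basestore_roundtrip(store_dir='/tmp/example_gf_store'):
    BaseStore.create(store_dir, deltat=0.5, nrecords=2, force=True)

    ws = BaseStore(store_dir, 'w')
    ws.put(0, GFTrace(data=num.array([0., 1., 0.5, 0.]), itmin=4, deltat=0.5))
    ws.close()

    rs = BaseStore(store_dir, 'r')
    tr = rs.get(0)          # full trace at flat record index 0
    span = rs.get_span(1)   # record 1 was never inserted
    rs.close()
    return tr, span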


def remake_dir(dpath, force):
    if os.path.exists(dpath):
        if force:
            shutil.rmtree(dpath)
        else:
            raise CannotCreate('Directory "%s" already exists.' % dpath)

    os.mkdir(dpath)


class MakeTimingParamsFailed(StoreError):
    pass


class Store(BaseStore):

    '''
    Green's function disk storage and summation machine.

    The `Store` can be used to efficiently store, retrieve, and sum Green's
    function traces. A `Store` contains many 1D time traces sampled at even
    multiples of a global sampling rate, where each time trace has an
    individual start and end time. The traces are treated as having repeating
    end points, so the functions they represent can be non-constant only
    between begin and end time. It provides capabilities to retrieve
    decimated traces and to extract parts of the traces. The main purpose of
    this class is to provide a fast, easy to use, and flexible mechanism to
    compute weighted delay-and-sum stacks with many Green's function traces
    involved.

    Individual Green's functions are accessed through a single integer index
    at low level. On top of that, various indexing schemes can be implemented
    by providing a mapping from physical coordinates to the low level index
    `i`. E.g. for a problem with cylindrical symmetry, one might define a
    mapping from the coordinates (`receiver_depth`, `source_depth`,
    `distance`) to the low level index. Index translation is done in the
    :py:class:`pyrocko.gf.meta.Config` subclass object associated with the
    Store. It is accessible through the store's :py:attr:`config` attribute,
    and contains all meta information about the store, including physical
    extent, geometry, sampling rate, and information about the type of the
    stored Green's functions. Information about the underlying earth model
    can also be found there.

    A GF store can also contain tabulated phase arrivals. In basic cases,
    these can be created with :py:meth:`make_travel_time_tables` and
    evaluated with the :py:meth:`t` method.

    .. attribute:: config

        The :py:class:`pyrocko.gf.meta.Config` derived object associated with
        the store which contains all meta information about the store and
        provides the high-level to low-level index mapping.

    .. attribute:: store_dir

        Path to the store's data directory.

    .. attribute:: mode

        The mode in which the store is opened: ``'r'``: read-only, ``'w'``:
        writeable.
    '''

    @classmethod
    def create(cls, store_dir, config, force=False, extra=None):
        '''
        Create new GF store.

        Creates a new GF store at path ``store_dir``. The layout of the GF is
        defined with the parameters given in ``config``, which should be an
        object of a subclass of :py:class:`~pyrocko.gf.meta.Config`. This
        function will refuse to overwrite an existing GF store, unless
        ``force`` is set to ``True``. If more information, e.g. parameters
        used for the modelling code, earth models or other, should be saved
        along with the GF store, these may be provided through a dict given
        to ``extra``. The keys of this dict must be names and the values must
        be *guts* type objects.

        :param store_dir: GF store path
        :type store_dir: str
        :param config: GF store Config
        :type config: :py:class:`~pyrocko.gf.meta.Config`
        :param force: Force overwrite, defaults to ``False``
        :type force: bool
        :param extra: Extra information
        :type extra: dict or None
        '''

        cls.create_editables(store_dir, config, force=force, extra=extra)
        cls.create_dependants(store_dir, force=force)

        return cls(store_dir)

    @staticmethod
    def create_editables(store_dir, config, force=False, extra=None):
        try:
            util.ensuredir(store_dir)
        except Exception:
            raise CannotCreate('cannot create directory %s' % store_dir)

        fns = []

        config_fn = os.path.join(store_dir, 'config')
        remove_if_exists(config_fn, force)
        meta.dump(config, filename=config_fn)

        fns.append(config_fn)

        for sub_dir in ['extra']:
            dpath = os.path.join(store_dir, sub_dir)
            remake_dir(dpath, force)

        if extra:
            for k, v in extra.items():
                fn = get_extra_path(store_dir, k)
                remove_if_exists(fn, force)
                meta.dump(v, filename=fn)

                fns.append(fn)

        return fns

    @staticmethod
    def create_dependants(store_dir, force=False):
        config_fn = os.path.join(store_dir, 'config')
        config = meta.load(filename=config_fn)

        BaseStore.create(store_dir, config.deltat, config.nrecords,
                         force=force)

        for sub_dir in ['decimated']:
            dpath = os.path.join(store_dir, sub_dir)
            remake_dir(dpath, force)

    def __init__(self, store_dir, mode='r', use_memmap=True):
        BaseStore.__init__(self, store_dir, mode=mode, use_memmap=use_memmap)
        config_fn = self.config_fn()
        if not os.path.isfile(config_fn):
            raise StoreError(
                'directory "%s" does not seem to contain a GF store '
                '("config" file not found)' % store_dir)
        self.load_config()

        self._decimated = {}
        self._extra = {}
        self._phases = {}
        for decimate in range(2, 9):
            if os.path.isdir(self._decimated_store_dir(decimate)):
                self._decimated[decimate] = None

    def open(self):
        if not self._f_index:
            BaseStore.open(self)
            c = self.config

            mscheme = 'type_' + c.short_type.lower()
            store_ext.store_mapping_init(
                self.cstore, mscheme,
                c.mins, c.maxs, c.deltas, c.ns.astype(num.uint64),
                c.ncomponents)

    def save_config(self, make_backup=False):
        config_fn = self.config_fn()
        if make_backup:
            os.rename(config_fn, config_fn + '~')

        meta.dump(self.config, filename=config_fn)

    def get_deltat(self):
        return self.config.deltat

    def load_config(self):
        self.config = meta.load(filename=self.config_fn())

    def ensure_reference(self, force=False):
        if self.config.reference is not None and not force:
            return
        self.ensure_uuid()
        reference = '%s-%s' % (self.config.id, self.config.uuid[0:6])

        if self.config.reference is not None:
            self.config.reference = reference
            self.save_config()
        else:
            with open(self.config_fn(), 'a') as config:
                config.write('reference: %s\n' % reference)
            self.load_config()

    def ensure_uuid(self, force=False):
        if self.config.uuid is not None and not force:
            return
        uuid = self.create_store_hash()

        if self.config.uuid is not None:
            self.config.uuid = uuid
            self.save_config()
        else:
            with open(self.config_fn(), 'a') as config:
                config.write('\nuuid: %s\n' % uuid)
            self.load_config()

    def create_store_hash(self):
        logger.info('creating hash for store ...')
        m = hashlib.sha1()

        traces_size = op.getsize(self.data_fn())
        with open(self.data_fn(), 'rb') as traces:
            while traces.tell() < traces_size:
                m.update(traces.read(4096))
                traces.seek(1024 * 1024 * 10, 1)

        with open(self.config_fn(), 'rb') as config:
            m.update(config.read())
        return m.hexdigest()

    def get_extra_path(self, key):
        return get_extra_path(self.store_dir, key)

    def get_extra(self, key):
        '''
        Get extra information stored under given key.
        '''

        x = self._extra
        if key not in x:
            fn = self.get_extra_path(key)
            if not os.path.exists(fn):
                raise NoSuchExtra(key)

            x[key] = meta.load(filename=fn)

        return x[key]

    def upgrade(self):
        '''
        Upgrade store config and files to latest Pyrocko version.
        '''
        fns = [os.path.join(self.store_dir, 'config')]
        for key in self.extra_keys():
            fns.append(self.get_extra_path(key))

        i = 0
        for fn in fns:
            i += util.pf_upgrade(fn)

        return i

    def extra_keys(self):
        return [x for x in os.listdir(os.path.join(self.store_dir, 'extra'))
                if valid_string_id(x)]

    def put(self, args, trace):
        '''
        Insert trace into GF store.

        Store a single GF trace at (high-level) index ``args``.

        :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type args: tuple
        :param trace: GF trace to be stored
        :type trace: :py:class:`~pyrocko.gf.store.GFTrace`
        '''

        irecord = self.config.irecord(*args)
        self._put(irecord, trace)

    def get_record(self, args):
        irecord = self.config.irecord(*args)
        return self._get_record(irecord)

    def str_irecord(self, args):
        return BaseStore.str_irecord(self, self.config.irecord(*args))

    def get(self, args, itmin=None, nsamples=None, decimate=1,
            interpolation='nearest_neighbor', implementation='c'):
        '''
        Retrieve GF trace from store.

        Retrieve a single GF trace from the store at (high-level) index
        ``args``. By default, the full trace is retrieved. Given ``itmin``
        and ``nsamples``, only the selected portion of the trace is
        extracted. If ``decimate`` is an integer in the range [2,8], the
        trace is decimated on the fly or, if available, the trace is read
        from a decimated version of the GF store.

        :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type args: tuple
        :param itmin: Start time index (start time is ``itmin * dt``),
            defaults to None
        :type itmin: int or None
        :param nsamples: Number of samples, defaults to None
        :type nsamples: int or None
        :param decimate: Decimation factor, defaults to 1
        :type decimate: int
        :param interpolation: Interpolation method
            ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
            ``'nearest_neighbor'``
        :type interpolation: str
        :param implementation: Implementation to use ``['c', 'reference']``,
            defaults to ``'c'``.
        :type implementation: str

        :returns: GF trace at ``args``
        :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
        '''

        store, decimate = self._decimated_store(decimate)
        if interpolation == 'nearest_neighbor':
            irecord = store.config.irecord(*args)
            tr = store._get(irecord, itmin, nsamples, decimate,
                            implementation)

        elif interpolation == 'off':
            irecords, weights = store.config.vicinity(*args)
            if len(irecords) != 1:
                raise NotAllowedToInterpolate()
            else:
                tr = store._get(irecords[0], itmin, nsamples, decimate,
                                implementation)

        elif interpolation == 'multilinear':
            irecords, weights = store.config.vicinity(*args)
            tr = store._sum(irecords, num.zeros(len(irecords)), weights,
                            itmin, nsamples, decimate, implementation,
                            'disable')

        # to prevent problems with rounding errors (BaseStore saves deltat
        # as a 4-byte floating point value, value from YAML config is more
        # accurate)
        tr.deltat = self.config.deltat * decimate
        return tr

    def sum(self, args, delays, weights, itmin=None, nsamples=None,
            decimate=1, interpolation='nearest_neighbor', implementation='c',
            optimization='enable'):
        '''
        Sum delayed and weighted GF traces.

        Calculate sum of delayed and weighted GF traces. ``args`` is a tuple
        of arrays forming the (high-level) indices of the GF traces to be
        selected. Delays and weights for the summation are given in the
        arrays ``delays`` and ``weights``. ``itmin`` and ``nsamples`` can be
        given to restrict the computation to a given time interval. If
        ``decimate`` is an integer in the range [2,8], decimated traces are
        used in the summation.

        :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type args: tuple(numpy.ndarray)
        :param delays: Delay times
        :type delays: :py:class:`numpy.ndarray`
        :param weights: Trace weights
        :type weights: :py:class:`numpy.ndarray`
        :param itmin: Start time index (start time is ``itmin * dt``),
            defaults to None
        :type itmin: int or None
        :param nsamples: Number of samples, defaults to None
        :type nsamples: int or None
        :param decimate: Decimation factor, defaults to 1
        :type decimate: int
        :param interpolation: Interpolation method
            ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
            ``'nearest_neighbor'``
        :type interpolation: str
        :param implementation: Implementation to use,
            ``['c', 'alternative', 'reference']``, where ``'alternative'``
            and ``'reference'`` use a Python implementation, defaults to
            ``'c'``
        :type implementation: str
        :param optimization: Optimization mode ``['enable', 'disable']``,
            defaults to ``'enable'``
        :type optimization: str

        :returns: Stacked GF trace.
        :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
        '''

        store, decimate_ = self._decimated_store(decimate)

        if interpolation == 'nearest_neighbor':
            irecords = store.config.irecords(*args)
        else:
            assert interpolation == 'multilinear'
            irecords, ip_weights = store.config.vicinities(*args)
            neach = irecords.size // args[0].size
            weights = num.repeat(weights, neach) * ip_weights
            delays = num.repeat(delays, neach)

        tr = store._sum(irecords, delays, weights, itmin, nsamples, decimate_,
                        implementation, optimization)

        # to prevent problems with rounding errors (BaseStore saves deltat
        # as a 4-byte floating point value, value from YAML config is more
        # accurate)
        tr.deltat = self.config.deltat * decimate
        return tr
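
    # Illustrative call (hypothetical arrays, ConfigTypeA layout): stack n
    # traces with individual delays and unit weights:
    #
    #   tr = store.sum(
    #       (source_depths, distances, components),
    #       delays=delays, weights=num.ones(n))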

    def make_decimated(self, decimate, config=None, force=False,
                       show_progress=False):
        '''
        Create decimated version of GF store.

        Create a downsampled version of the GF store. Downsampling is done
        by the integer factor ``decimate`` which should be in the range
        [2,8]. If ``config`` is ``None``, all traces of the GF store are
        decimated and held available (i.e. the index mapping of the original
        store is used), otherwise, a different spatial stepping can be
        specified by giving a modified GF store configuration in ``config``
        (see :py:meth:`create`). Decimated GF sub-stores are created under
        the ``decimated`` subdirectory within the GF store directory. Holding
        available decimated versions of the GF store can save computation
        time and IO bandwidth, or decrease memory footprint at the cost of
        increased disk space usage, when computations are done for
        lower-frequency signals.

        :param decimate: Decimate factor
        :type decimate: int
        :param config: GF store config object, defaults to None
        :type config: :py:class:`~pyrocko.gf.meta.Config` or None
        :param force: Force overwrite, defaults to ``False``
        :type force: bool
        :param show_progress: Show progress, defaults to ``False``
        :type show_progress: bool
        '''

        if not self._f_index:
            self.open()

        if not (2 <= decimate <= 8):
            raise StoreError('decimate argument must be in the range [2,8]')

        assert self.mode == 'r'

        if config is None:
            config = self.config

        config = copy.deepcopy(config)
        config.sample_rate = self.config.sample_rate / decimate

        if decimate in self._decimated:
            del self._decimated[decimate]

        store_dir = self._decimated_store_dir(decimate)
        if os.path.exists(store_dir):
            if force:
                shutil.rmtree(store_dir)
            else:
                raise CannotCreate('store already exists at %s' % store_dir)

        store_dir_incomplete = store_dir + '-incomplete'
        Store.create(store_dir_incomplete, config, force=force)

        decimated = Store(store_dir_incomplete, 'w')
        try:
            if show_progress:
                pbar = util.progressbar(
                    'decimating store', self.config.nrecords)

            for i, args in enumerate(decimated.config.iter_nodes()):
                tr = self.get(args, decimate=decimate)
                decimated.put(args, tr)

                if show_progress:
                    pbar.update(i+1)

        finally:
            if show_progress:
                pbar.finish()

        decimated.close()

        shutil.move(store_dir_incomplete, store_dir)

        self._decimated[decimate] = None
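
    # Usage note (illustrative): store.make_decimated(2) creates a half-rate
    # sub-store under 'decimated/2'; subsequent get()/sum() calls with
    # decimate=2 read from it transparently via _decimated_store().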

    def stats(self):
        stats = BaseStore.stats(self)
        stats['decimated'] = sorted(self._decimated.keys())
        return stats

    stats_keys = BaseStore.stats_keys + ['decimated']

    def check(self, show_progress=False):
        have_holes = []
        for pdef in self.config.tabulated_phases:
            phase_id = pdef.id
            ph = self.get_stored_phase(phase_id)
            if ph.check_holes():
                have_holes.append(phase_id)

        if have_holes:
            for phase_id in have_holes:
                logger.warning(
                    'Travel time table of phase "{}" contains holes. You '
                    'can use `fomosto tttlsd` to fix holes.'.format(
                        phase_id))
        else:
            logger.info('No holes in travel time tables')

        try:
            if show_progress:
                pbar = util.progressbar(
                    'checking store', self.config.nrecords)

            problems = 0
            for i, args in enumerate(self.config.iter_nodes()):
                tr = self.get(args)
                if tr and not tr.is_zero:
                    if not tr.begin_value == tr.data[0]:
                        logger.warning('wrong begin value for trace at %s '
                                       '(data corruption?)' % str(args))
                        problems += 1
                    if not tr.end_value == tr.data[-1]:
                        logger.warning('wrong end value for trace at %s '
                                       '(data corruption?)' % str(args))
                        problems += 1
                    if not num.all(num.isfinite(tr.data)):
                        logger.warning(
                            'nans or infs in trace at %s' % str(args))
                        problems += 1

                if show_progress:
                    pbar.update(i+1)

        finally:
            if show_progress:
                pbar.finish()

        return problems

    def check_earthmodels(self, config):
        if config.earthmodel_receiver_1d.profile('z')[-1] not in \
                config.earthmodel_1d.profile('z'):
            logger.warning('deepest layer of earthmodel_receiver_1d not '
                           'found in earthmodel_1d')

    def _decimated_store_dir(self, decimate):
        return os.path.join(self.store_dir, 'decimated', str(decimate))

    def _decimated_store(self, decimate):
        if decimate == 1 or decimate not in self._decimated:
            return self, decimate
        else:
            store = self._decimated[decimate]
            if store is None:
                store = Store(self._decimated_store_dir(decimate), 'r')
                self._decimated[decimate] = store

            return store, 1

    def phase_filename(self, phase_id, attribute='phase'):
        check_string_id(phase_id)
        return os.path.join(
            self.store_dir, 'phases', phase_id + '.%s' % attribute)

    def get_phase_identifier(self, phase_id, attribute):
        return '{}.{}'.format(phase_id, attribute)

    def get_stored_phase(self, phase_def, attribute='phase'):
        '''
        Get stored phase from GF store.

        :returns: Phase information
        :rtype: :py:class:`pyrocko.spit.SPTree`
        '''

        phase_id = self.get_phase_identifier(phase_def, attribute)
        if phase_id not in self._phases:
            fn = self.phase_filename(phase_def, attribute)
            if not os.path.isfile(fn):
                raise NoSuchPhase(phase_id)

            spt = spit.SPTree(filename=fn)
            self._phases[phase_id] = spt

        return self._phases[phase_id]

    def get_phase(self, phase_def, attributes=None):
        toks = phase_def.split(':', 1)
        if len(toks) == 2:
            provider, phase_def = toks
        else:
            provider, phase_def = 'stored', toks[0]

        if attributes and provider not in ['stored', 'cake', 'iaspei']:
            raise meta.TimingAttributeError(
                'Attributes (%s) not supported for provider \'%s\'.' % (
                    ', '.join("'%s'" % attribute for attribute in attributes),
                    provider))

        if provider == 'stored':
            if attributes is None:
                return self.get_stored_phase(phase_def)
            else:
                def evaluate(args):
                    return tuple(
                        self.get_stored_phase(phase_def, attribute)(args)
                        for attribute in ['phase'] + attributes)

                return evaluate

        elif provider == 'vel':
            vel = float(phase_def) * 1000.

            def evaluate(args):
                return self.config.get_distance(args) / vel

            return evaluate

        elif provider == 'vel_surface':
            vel = float(phase_def) * 1000.

            def evaluate(args):
                return self.config.get_surface_distance(args) / vel

            return evaluate

        elif provider in ('cake', 'iaspei'):
            from pyrocko import cake
            mod = self.config.earthmodel_1d
            if provider == 'cake':
                phases = [cake.PhaseDef(phase_def)]
            else:
                phases = cake.PhaseDef.classic(phase_def)

            def evaluate(args):
                if len(args) == 2:
                    zr, zs, x = (self.config.receiver_depth,) + args
                elif len(args) == 3:
                    zr, zs, x = args
                else:
                    assert False

                if phases:
                    rays = mod.arrivals(
                        phases=phases,
                        distances=[x*cake.m2d],
                        zstart=zs,
                        zstop=zr)
                else:
                    rays = []

                rays.sort(key=lambda ray: ray.t)

                if rays:
                    if attributes is None:
                        return rays[0].t
                    else:
                        try:
                            return rays[0].t_and_attributes(attributes)
                        except KeyError as e:
                            raise meta.TimingAttributeError(
                                'Attribute %s not supported for provider '
                                '\'%s\'.' % (str(e), provider)) from None
                else:
                    if attributes is None:
                        return None
                    else:
                        return (None,) * (len(attributes) + 1)

            return evaluate

        raise StoreError('unsupported phase provider: %s' % provider)

    def t(self, timing, *args, attributes=None):
        '''
        Compute interpolated phase arrivals.

        **Examples:**

        If ``test_store`` is a :py:class:`Type A<pyrocko.gf.meta.ConfigTypeA>`
        store::

            test_store.t('stored:p', (1000, 10000))
            test_store.t('last{stored:P|stored:Pdiff}', (1000, 10000))
            # The latter arrival of P or diffracted P phase

        If ``test_store`` is a :py:class:`Type B<pyrocko.gf.meta.ConfigTypeB>`
        store::

            test_store.t('S', (1000, 1000, 10000))
            test_store.t('first{P|p|Pdiff|sP}', (1000, 1000, 10000))
            # The first arrival of the given phases is selected

        Independent of the store type, it is also possible to specify two
        location objects and the GF index tuple is calculated internally::

            test_store.t('p', source, target)

        :param timing: travel-time definition
        :type timing: str or :py:class:`~pyrocko.gf.meta.Timing`
        :param \\*args: if ``len(args) == 1``, ``args[0]`` must be a
            :py:class:`GF index tuple <pyrocko.gf.meta.Config>`, e.g.
            ``(source_depth, distance, component)`` for a
            :py:class:`Type A<pyrocko.gf.meta.ConfigTypeA>` store. If
            ``len(args) == 2``, two location objects are expected, e.g.
            ``(source, receiver)`` and the appropriate GF index is computed
            internally.
        :type \\*args: (:py:class:`tuple`,) or
            (:py:class:`~pyrocko.model.location.Location`,
            :py:class:`~pyrocko.model.location.Location`)

        :param attributes: additional attributes to return along with the
            time. Requires the attribute to be either stored or it must be
            supported by the phase calculation backend. E.g.
            ``['takeoff_angle']``.
        :type attributes: :py:class:`list` of :py:class:`str`

        :returns: Phase arrival according to ``timing``
        :rtype: float or None
        '''

        if len(args) == 1:
            args = args[0]
        else:
            args = self.config.make_indexing_args1(*args)

        if not isinstance(timing, meta.Timing):
            timing = meta.Timing(timing)

        return timing.evaluate(self.get_phase, args, attributes=attributes)

    def get_available_interpolation_tables(self):
        filenames = glob(op.join(self.store_dir, 'phases', '*'))
        return [op.basename(file) for file in filenames]

    def get_stored_attribute(self, phase_def, attribute, *args):
        '''
        Return interpolated store attribute.

        :param attribute: takeoff_angle / incidence_angle [deg]
        :type attribute: str
        :param \\*args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type \\*args: tuple
        '''
        try:
            return self.get_stored_phase(phase_def, attribute)(*args)
        except NoSuchPhase:
            raise StoreError(
                'Interpolation table for {} of {} does not exist! '
                'Available tables: {}'.format(
                    attribute, phase_def,
                    self.get_available_interpolation_tables()))

    def get_many_stored_attributes(self, phase_def, attribute, coords):
        '''
        Return interpolated store attribute.

        :param attribute: takeoff_angle / incidence_angle [deg]
        :type attribute: str
        :param coords: :py:class:`numpy.ndarray`, with columns being
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type coords: :py:class:`numpy.ndarray`
        '''
        try:
            return self.get_stored_phase(
                phase_def, attribute).interpolate_many(coords)
        except NoSuchPhase:
            raise StoreError(
                'Interpolation table for {} of {} does not exist! '
                'Available tables: {}'.format(
                    attribute, phase_def,
                    self.get_available_interpolation_tables()))

    def make_stored_table(self, attribute, force=False):
        '''
        Compute tables for selected ray attributes.

        :param attribute: phase / takeoff_angle [deg] / incidence_angle [deg]
        :type attribute: str

        Tables are computed using the 1D earth model defined in
        :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined
        phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
        '''

        if attribute not in available_stored_tables:
            raise StoreError(
                'Cannot create attribute table for {}! '
                'Supported attribute tables: {}'.format(
                    attribute, available_stored_tables))

        from pyrocko import cake
        config = self.config

        if not config.tabulated_phases:
            return

        mod = config.earthmodel_1d

        if not mod:
            raise StoreError('no earth model found')

        if config.earthmodel_receiver_1d:
            self.check_earthmodels(config)

        for pdef in config.tabulated_phases:

            phase_id = pdef.id
            phases = pdef.phases

            if attribute == 'phase':
                ftol = config.deltat * 0.5
                horvels = pdef.horizontal_velocities
            else:
                ftol = config.deltat * 0.01

            fn = self.phase_filename(phase_id, attribute)

            if os.path.exists(fn) and not force:
                logger.info('file already exists: %s' % fn)
                continue

            def evaluate(args):

                nargs = len(args)
                if nargs == 2:
                    receiver_depth, source_depth, distance = (
                        config.receiver_depth,) + args
                elif nargs == 3:
                    receiver_depth, source_depth, distance = args
                else:
                    raise ValueError(
                        'Number of input arguments %i is not supported! '
                        'Supported number of arguments: 2 or 3' % nargs)

                ray_attribute_values = []
                arrival_times = []
                if phases:
                    rays = mod.arrivals(
                        phases=phases,
                        distances=[distance * cake.m2d],
                        zstart=source_depth,
                        zstop=receiver_depth)

                    for ray in rays:
                        arrival_times.append(ray.t)
                        if attribute != 'phase':
                            ray_attribute_values.append(
                                getattr(ray, attribute)())

                if attribute == 'phase':
                    for v in horvels:
                        arrival_times.append(distance / (v * km))

                if arrival_times:
                    if attribute == 'phase':
                        return min(arrival_times)
                    else:
                        earliest_idx = num.argmin(arrival_times)
                        return ray_attribute_values[earliest_idx]
                else:
                    return None

            logger.info(
                'making "%s" table for phasegroup "%s"', attribute, phase_id)

            ip = spit.SPTree(
                f=evaluate,
                ftol=ftol,
                xbounds=num.transpose((config.mins, config.maxs)),
                xtols=config.deltas)

            util.ensuredirs(fn)
            ip.dump(fn)

    def make_timing_params(self, begin, end, snap_vred=True, force=False):
        '''
        Compute tight parameterized time ranges to include given timings.

        Calculates appropriate time ranges to cover given begin and end
        timings over all GF points in the store. A dict with the following
        keys is returned:

        * ``'tmin'``: time [s], minimum of begin timing over all GF points
        * ``'tmax'``: time [s], maximum of end timing over all GF points
        * ``'vred'``, ``'tmin_vred'``: slope [m/s] and offset [s] of
          reduction velocity [m/s] appropriate to catch begin timing over all
          GF points
        * ``'tlenmax_vred'``: maximum time length needed to cover all end
          timings, when using linear slope given with (``vred``,
          ``tmin_vred``) as start
        '''

        data = []
        warned = set()
        for args in self.config.iter_nodes(level=-1):
            tmin = self.t(begin, args)
            tmax = self.t(end, args)
            if tmin is None:
                warned.add(str(begin))
            if tmax is None:
                warned.add(str(end))

            x = self.config.get_surface_distance(args)
            data.append((x, tmin, tmax))

        if len(warned):
            w = ' | '.join(list(warned))
            msg = '''determination of time window failed using phase
definitions: %s.\n Travel time table contains holes in probed ranges. You can
use `fomosto tttlsd` to fix holes.''' % w
            if force:
                logger.warning(msg)
            else:
                raise MakeTimingParamsFailed(msg)

        xs, tmins, tmaxs = num.array(data, dtype=float).T

        tlens = tmaxs - tmins

        i = num.nanargmin(tmins)
        if not num.isfinite(i):
            raise MakeTimingParamsFailed(
                'determination of time window failed')

        tminmin = tmins[i]
        x_tminmin = xs[i]
        dx = (xs - x_tminmin)
        dx = num.where(dx != 0.0, dx, num.nan)
        s = (tmins - tminmin) / dx
        sred = num.min(num.abs(s[num.isfinite(s)]))

        deltax = self.config.distance_delta

        if snap_vred:
            tdif = sred*deltax
            tdif2 = self.config.deltat * math.floor(tdif / self.config.deltat)
            sred = tdif2/self.config.distance_delta

        tmin_vred = tminmin - sred*x_tminmin
        if snap_vred:
            xe = x_tminmin - int(x_tminmin / deltax) * deltax
            tmin_vred = float(
                self.config.deltat *
                math.floor(tmin_vred / self.config.deltat) - xe * sred)

        tlenmax_vred = num.nanmax(tmaxs - (tmin_vred + sred*xs))
        if sred != 0.0:
            vred = 1.0/sred
        else:
            vred = 0.0

        return dict(
            tmin=tminmin,
            tmax=num.nanmax(tmaxs),
            tlenmax=num.nanmax(tlens),
            tmin_vred=tmin_vred,
            tlenmax_vred=tlenmax_vred,
            vred=vred)

    def make_travel_time_tables(self, force=False):
        '''
        Compute travel time tables.

        Travel time tables are computed using the 1D earth model defined in
        :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined
        phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`. The
        accuracy of the tabulated times is adjusted to the sampling rate of
        the store.
        '''
        self.make_stored_table(attribute='phase', force=force)

    def make_ttt(self, force=False):
        self.make_travel_time_tables(force=force)

    def make_takeoff_angle_tables(self, force=False):
        '''
        Compute takeoff-angle tables.

        Takeoff-angle tables [deg] are computed using the 1D earth model
        defined in :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each
        defined phase in
        :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`. The accuracy of
        the tabulated angles is adjusted to 0.01 times the sampling rate of
        the store.
        '''
        self.make_stored_table(attribute='takeoff_angle', force=force)

    def make_incidence_angle_tables(self, force=False):
        '''
        Compute incidence-angle tables.

        Incidence-angle tables [deg] are computed using the 1D earth model
        defined in :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each
        defined phase in
        :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`. The accuracy of
        the tabulated angles is adjusted to 0.01 times the sampling rate of
        the store.
        '''
        self.make_stored_table(attribute='incidence_angle', force=force)

    def get_provided_components(self):

        scheme_desc = meta.component_scheme_to_description[
            self.config.component_scheme]

        quantity = self.config.stored_quantity \
            or scheme_desc.default_stored_quantity

        if not quantity:
            return scheme_desc.provided_components
        else:
            return [
                quantity + '.' + comp
                for comp in scheme_desc.provided_components]

    def fix_ttt_holes(self, phase_id):

        pdef = self.config.get_tabulated_phase(phase_id)
        mode = None
        for phase in pdef.phases:
            for leg in phase.legs():
                if mode is None:
                    mode = leg.mode

                else:
                    if mode != leg.mode:
                        raise StoreError(
                            'Can only fix holes in pure P or pure S phases.')

        sptree = self.get_stored_phase(phase_id)
        sptree_lsd = self.config.fix_ttt_holes(sptree, mode)

        phase_lsd = phase_id + '.lsd'
        fn = self.phase_filename(phase_lsd)
        sptree_lsd.dump(fn)

    def statics(self, source, multi_location, itsnapshot, components,
                interpolation='nearest_neighbor', nthreads=0):
        if not self._f_index:
            self.open()

        ntargets = multi_location.ntargets
        source_terms = source.get_source_terms(self.config.component_scheme)
        # TODO: deal with delays for snapshots > 1 sample

        if itsnapshot is not None:
            delays = source.times

            # Fringe case where we sample at sample 0 and sample 1
            tsnapshot = itsnapshot * self.config.deltat
            if delays.max() == tsnapshot and delays.min() != tsnapshot:
                delays[delays == delays.max()] -= self.config.deltat

        else:
            delays = source.times * 0
            itsnapshot = 1

        if ntargets == 0:
            raise StoreError('MultiLocation.coords5 is empty')

        res = store_ext.store_calc_static(
            self.cstore,
            source.coords5(),
            source_terms,
            delays,
            multi_location.coords5,
            self.config.component_scheme,
            interpolation,
            itsnapshot,
            nthreads)

        out = {}
        for icomp, (comp, comp_res) in enumerate(
                zip(self.get_provided_components(), res)):
            if comp not in components:
                continue
            out[comp] = res[icomp]

        return out

    def calc_seismograms(self, source, receivers, components, deltat=None,
                         itmin=None, nsamples=None,
                         interpolation='nearest_neighbor',
                         optimization='enable', nthreads=1):
        config = self.config

        assert interpolation in ['nearest_neighbor', 'multilinear'], \
            'Unknown interpolation: %s' % interpolation

        if not isinstance(receivers, list):
            receivers = [receivers]

        if deltat is None:
            decimate = 1
        else:
            decimate = int(round(deltat/config.deltat))
            if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
                raise StoreError(
                    'unavailable decimation ratio target.deltat / '
                    'store.deltat = %g / %g' % (deltat, config.deltat))

        store, decimate_ = self._decimated_store(decimate)

        if not store._f_index:
            store.open()

        scheme = config.component_scheme

        source_coords_arr = source.coords5()
        source_terms = source.get_source_terms(scheme)
        delays = source.times.ravel()

        nreceiver = len(receivers)
        receiver_coords_arr = num.empty((nreceiver, 5))
        for irec, rec in enumerate(receivers):
            receiver_coords_arr[irec, :] = rec.coords5

        dt = self.get_deltat()

        itoffset = int(num.floor(delays.min()/dt)) if delays.size != 0 else 0

        if itmin is None:
            itmin = num.zeros(nreceiver, dtype=num.int32)
        else:
            itmin = (itmin-itoffset).astype(num.int32)

        if nsamples is None:
            nsamples = num.zeros(nreceiver, dtype=num.int32) - 1
        else:
            nsamples = nsamples.astype(num.int32)

        try:
            results = store_ext.store_calc_timeseries(
                store.cstore,
                source_coords_arr,
                source_terms,
                (delays - itoffset*dt),
                receiver_coords_arr,
                scheme,
                interpolation,
                itmin,
                nsamples,
                nthreads)
        except Exception as e:
            raise e

        provided_components = self.get_provided_components()
        ncomponents = len(provided_components)

        seismograms = [dict() for _ in range(nreceiver)]
        for ires in range(len(results)):
            res = results.pop(0)
            ireceiver = ires // ncomponents

            comp = provided_components[res[-2]]

            if comp not in components:
                continue

            tr = GFTrace(*res[:-2])
            tr.deltat = config.deltat * decimate
            tr.itmin += itoffset

            tr.n_records_stacked = 0
            tr.t_optimize = 0.
            tr.t_stack = 0.
            tr.err = res[-1]

            seismograms[ireceiver][comp] = tr

        return seismograms

    def seismogram(self, source, receiver, components, deltat=None,
                   itmin=None, nsamples=None,
                   interpolation='nearest_neighbor',
                   optimization='enable', nthreads=1):

        config = self.config

        if deltat is None:
            decimate = 1
        else:
            decimate = int(round(deltat/config.deltat))
            if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
                raise StoreError(
                    'unavailable decimation ratio target.deltat / '
                    'store.deltat = %g / %g' % (deltat, config.deltat))

        store, decimate_ = self._decimated_store(decimate)

        if not store._f_index:
            store.open()

        scheme = config.component_scheme

        source_coords_arr = source.coords5()
        source_terms = source.get_source_terms(scheme)
        receiver_coords_arr = receiver.coords5[num.newaxis, :].copy()

        try:
            params = store_ext.make_sum_params(
                store.cstore,
                source_coords_arr,
                source_terms,
                receiver_coords_arr,
                scheme,
                interpolation, nthreads)

        except store_ext.StoreExtError:
            raise meta.OutOfBounds()

        provided_components = self.get_provided_components()

        out = {}
        for icomp, comp in enumerate(provided_components):
            if comp in components:
                weights, irecords = params[icomp]

                neach = irecords.size // source.times.size
                delays = num.repeat(source.times, neach)

                tr = store._sum(
                    irecords, delays, weights, itmin, nsamples, decimate_,
                    'c', optimization)

                # to prevent problems with rounding errors (BaseStore saves
                # deltat as a 4-byte floating point value, value from YAML
                # config is more accurate)
                tr.deltat = config.deltat * decimate

                out[comp] = tr

        return out
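

# Illustrative sketch (not part of the original module): typical high-level
# use of Store with a ConfigTypeA layout. The store path, phase group id
# 'begin' and all index values are hypothetical.
def _example_store_usage(store_dir='/path/to/my_gf_store'):
    store = Store(store_dir)

    # single trace at (source_depth [m], distance [m], component index)
    tr = store.get((10*km, 100*km, 0))

    # delay-and-sum stack over three source depths at a fixed distance
    args = (
        num.array([5*km, 10*km, 15*km]),
        num.array([100*km]*3),
        num.array([0, 0, 0]))
    stacked = store.sum(
        args,
        delays=num.array([0., 0.5, 1.0]),
        weights=num.ones(3))

    # interpolated stored travel time, if a 'begin' phase table exists
    t_begin = store.t('stored:begin', (10*km, 100*km))
    return tr, stacked, t_begin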


__all__ = '''
gf_dtype
NotMultipleOfSamplingInterval
GFTrace
CannotCreate
CannotOpen
DuplicateInsert
NotAllowedToInterpolate
NoSuchExtra
NoSuchPhase
BaseStore
Store
SeismosizerErrorEnum
'''.split()