1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import errno
7import time
8import os
9import struct
10import weakref
11import math
12import shutil
13try:
14 import fcntl
15except ImportError:
16 fcntl = None
17import copy
18import logging
19import re
20import hashlib
21from glob import glob
23import numpy as num
24from scipy import signal
26from . import meta
27from .error import StoreError
28try:
29 from . import store_ext
30except ImportError:
31 store_ext = None
32from pyrocko import util, spit
34logger = logging.getLogger('pyrocko.gf.store')
35op = os.path
37# gf store endianness
38E = '<'
40gf_dtype = num.dtype(num.float32)
41gf_dtype_store = num.dtype(E + 'f4')
43gf_dtype_nbytes_per_sample = 4
45gf_store_header_dtype = [
46 ('nrecords', E + 'u8'),
47 ('deltat', E + 'f4'),
48]
50gf_store_header_fmt = E + 'Qf'
51gf_store_header_fmt_size = struct.calcsize(gf_store_header_fmt)
53gf_record_dtype = num.dtype([
54 ('data_offset', E + 'u8'),
55 ('itmin', E + 'i4'),
56 ('nsamples', E + 'u4'),
57 ('begin_value', E + 'f4'),
58 ('end_value', E + 'f4'),
59])
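
# The index file consists of the packed header defined by gf_store_header_fmt,
# immediately followed by nrecords entries of gf_record_dtype (see
# BaseStore.create, BaseStore.open and BaseStore._load_index below). A minimal
# reading sketch (illustrative only; ``index_path`` is a placeholder, normal
# code should go through BaseStore/Store):
#
#     with open(index_path, 'rb') as f:
#         nrecords, deltat = struct.unpack(
#             gf_store_header_fmt, f.read(gf_store_header_fmt_size))
#         records = num.fromfile(f, dtype=gf_record_dtype, count=nrecords)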
61available_stored_tables = ['phase', 'takeoff_angle', 'incidence_angle']
63km = 1000.
66class SeismosizerErrorEnum:
67 SUCCESS = 0
68 INVALID_RECORD = 1
69 EMPTY_RECORD = 2
70 BAD_RECORD = 3
71 ALLOC_FAILED = 4
72 BAD_REQUEST = 5
73 BAD_DATA_OFFSET = 6
74 READ_DATA_FAILED = 7
75 SEEK_INDEX_FAILED = 8
76 READ_INDEX_FAILED = 9
77 FSTAT_TRACES_FAILED = 10
78 BAD_STORE = 11
79 MMAP_INDEX_FAILED = 12
80 MMAP_TRACES_FAILED = 13
81 INDEX_OUT_OF_BOUNDS = 14
82 NTARGETS_OUT_OF_BOUNDS = 15
85def valid_string_id(s):
86 return re.match(meta.StringID.pattern, s)
89def check_string_id(s):
90 if not valid_string_id(s):
91 raise ValueError('invalid name %s' % s)
93# - data_offset
94#
95# Data file offset of first sample in bytes (the seek value).
96# Special values:
97#
98# 0x00 - missing trace
99# 0x01 - zero trace (GF trace is physically zero)
100# 0x02 - short trace of 1 or 2 samples (no need for data allocation)
101#
102# - itmin
103#
104# Time of first sample of the trace as a multiple of the sampling interval. All
105# GF samples must be evaluated at multiples of the sampling interval with
106# respect to a global reference time zero. Must be set also for zero traces.
107#
108# - nsamples
109#
110# Number of samples in the GF trace. Should be set to zero for zero traces.
111#
112# - begin_value, end_value
113#
114# Values of first and last sample. These values are included in data[]
# redundantly.
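#
# A reading sketch for a single record (illustrative only; the authoritative
# logic lives in BaseStore._get_impl_reference and BaseStore._get_data below):
#
#     ipos, itmin, nsamples, begin_value, end_value = records[irecord]
#     if ipos == 0:
#         tr = None                                  # missing trace
#     elif ipos == 1:
#         tr = GFTrace(is_zero=True, itmin=itmin)    # zero trace
#     elif ipos == 2:
#         data = num.array([begin_value, end_value][:nsamples])  # short trace
#     else:
#         pass  # seek to byte offset ipos in the traces file and read
#               # nsamples values of gf_dtype_store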
118class NotMultipleOfSamplingInterval(Exception):
119 pass
122sampling_check_eps = 1e-5
125class GFTrace(object):
126 '''
127 Green's Function trace class for handling traces from the GF store.
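
    Example (a minimal sketch; the sample values are illustrative)::

        tr = GFTrace(data=num.array([0., 1., 0.5, 0.]), tmin=10.0, deltat=0.5)
        tr.itmin   # -> 20, first sample time in multiples of deltat
        tr.t       # -> array([10. , 10.5, 11. , 11.5])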
128 '''
130 @classmethod
131 def from_trace(cls, tr):
132 return cls(data=tr.ydata.copy(), tmin=tr.tmin, deltat=tr.deltat)
134 def __init__(self, data=None, itmin=None, deltat=1.0,
135 is_zero=False, begin_value=None, end_value=None, tmin=None):
        assert sum((x is None) for x in (tmin, itmin)) == 1, \
            'GFTrace: either tmin or itmin must be given'
140 if tmin is not None:
141 itmin = int(round(tmin / deltat))
142 if abs(itmin*deltat - tmin) > sampling_check_eps*deltat:
143 raise NotMultipleOfSamplingInterval(
144 'GFTrace: tmin (%g) is not a multiple of sampling '
145 'interval (%g)' % (tmin, deltat))
147 if data is not None:
148 data = num.asarray(data, dtype=gf_dtype)
149 else:
150 data = num.array([], dtype=gf_dtype)
151 begin_value = 0.0
152 end_value = 0.0
154 self.data = weakref.ref(data)()
155 self.itmin = itmin
156 self.deltat = deltat
157 self.is_zero = is_zero
158 self.n_records_stacked = 0.
159 self.t_stack = 0.
160 self.t_optimize = 0.
161 self.err = SeismosizerErrorEnum.SUCCESS
163 if data is not None and data.size > 0:
164 if begin_value is None:
165 begin_value = data[0]
166 if end_value is None:
167 end_value = data[-1]
169 self.begin_value = begin_value
170 self.end_value = end_value
172 @property
173 def t(self):
174 '''
175 Time vector of the GF trace.
177 :returns: Time vector
178 :rtype: :py:class:`numpy.ndarray`
179 '''
180 return num.linspace(
181 self.itmin*self.deltat,
182 (self.itmin+self.data.size-1)*self.deltat, self.data.size)
184 def __str__(self, itmin=0):
186 if self.is_zero:
187 return 'ZERO'
189 s = []
190 for i in range(itmin, self.itmin + self.data.size):
191 if i >= self.itmin and i < self.itmin + self.data.size:
192 s.append('%7.4g' % self.data[i-self.itmin])
193 else:
194 s.append(' '*7)
196 return '|'.join(s)
198 def to_trace(self, net, sta, loc, cha):
199 from pyrocko import trace
200 return trace.Trace(
201 net, sta, loc, cha,
202 ydata=self.data,
203 deltat=self.deltat,
204 tmin=self.itmin*self.deltat)
207class GFValue(object):
209 def __init__(self, value):
210 self.value = value
211 self.n_records_stacked = 0.
212 self.t_stack = 0.
213 self.t_optimize = 0.
216Zero = GFTrace(is_zero=True, itmin=0)
219def make_same_span(tracesdict):
221 traces = tracesdict.values()
223 nonzero = [tr for tr in traces if not tr.is_zero]
224 if not nonzero:
225 return {k: Zero for k in tracesdict.keys()}
227 deltat = nonzero[0].deltat
229 itmin = min(tr.itmin for tr in nonzero)
230 itmax = max(tr.itmin+tr.data.size for tr in nonzero) - 1
232 out = {}
233 for k, tr in tracesdict.items():
234 n = itmax - itmin + 1
235 if tr.itmin != itmin or tr.data.size != n:
236 data = num.zeros(n, dtype=gf_dtype)
237 if not tr.is_zero:
238 lo = tr.itmin-itmin
239 hi = lo + tr.data.size
240 data[:lo] = tr.data[0]
241 data[lo:hi] = tr.data
242 data[hi:] = tr.data[-1]
244 tr = GFTrace(data, itmin, deltat)
246 out[k] = tr
248 return out
251class CannotCreate(StoreError):
252 pass
255class CannotOpen(StoreError):
256 pass
259class DuplicateInsert(StoreError):
260 pass
263class ShortRead(StoreError):
264 def __str__(self):
265 return 'unexpected end of data (truncated traces file?)'
268class NotAllowedToInterpolate(StoreError):
269 def __str__(self):
270 return 'not allowed to interpolate'
273class NoSuchExtra(StoreError):
274 def __init__(self, s):
275 StoreError.__init__(self)
276 self.value = s
278 def __str__(self):
279 return 'extra information for key "%s" not found.' % self.value
282class NoSuchPhase(StoreError):
283 def __init__(self, s):
284 StoreError.__init__(self)
285 self.value = s
287 def __str__(self):
288 return 'phase for key "%s" not found. ' \
289 'Running "fomosto ttt" or "fomosto sat" may be needed.' \
290 % self.value
293def remove_if_exists(fn, force=False):
294 if os.path.exists(fn):
295 if force:
296 os.remove(fn)
297 else:
298 raise CannotCreate('file %s already exists' % fn)
301def get_extra_path(store_dir, key):
302 check_string_id(key)
303 return os.path.join(store_dir, 'extra', key)
306class BaseStore(object):
308 @staticmethod
309 def lock_fn_(store_dir):
310 return os.path.join(store_dir, 'lock')
312 @staticmethod
313 def index_fn_(store_dir):
314 return os.path.join(store_dir, 'index')
316 @staticmethod
317 def data_fn_(store_dir):
318 return os.path.join(store_dir, 'traces')
320 @staticmethod
321 def config_fn_(store_dir):
322 return os.path.join(store_dir, 'config')
324 @staticmethod
325 def create(store_dir, deltat, nrecords, force=False):
327 try:
328 util.ensuredir(store_dir)
329 except Exception:
330 raise CannotCreate('cannot create directory %s' % store_dir)
332 index_fn = BaseStore.index_fn_(store_dir)
333 data_fn = BaseStore.data_fn_(store_dir)
335 for fn in (index_fn, data_fn):
336 remove_if_exists(fn, force)
338 with open(index_fn, 'wb') as f:
339 f.write(struct.pack(gf_store_header_fmt, nrecords, deltat))
340 records = num.zeros(nrecords, dtype=gf_record_dtype)
341 records.tofile(f)
343 with open(data_fn, 'wb') as f:
344 f.write(b'\0' * 32)
346 def __init__(self, store_dir, mode='r', use_memmap=True):
347 assert mode in 'rw'
348 self.store_dir = store_dir
349 self.mode = mode
350 self._use_memmap = use_memmap
351 self._nrecords = None
352 self._deltat = None
353 self._f_index = None
354 self._f_data = None
355 self._records = None
356 self.cstore = None
358 def open(self):
359 assert self._f_index is None
360 index_fn = self.index_fn()
361 data_fn = self.data_fn()
363 if self.mode == 'r':
364 fmode = 'rb'
365 elif self.mode == 'w':
366 fmode = 'r+b'
367 else:
368 assert False, 'invalid mode: %s' % self.mode
370 try:
371 self._f_index = open(index_fn, fmode)
372 self._f_data = open(data_fn, fmode)
373 except Exception:
374 self.mode = ''
375 raise CannotOpen('cannot open gf store: %s' % self.store_dir)
377 try:
378 # replace single precision deltat value in store with the double
379 # precision value from the config, if available
380 self.cstore = store_ext.store_init(
381 self._f_index.fileno(), self._f_data.fileno(),
382 self.get_deltat() or 0.0)
384 except store_ext.StoreExtError as e:
385 raise StoreError(str(e))
387 while True:
388 try:
389 dataheader = self._f_index.read(gf_store_header_fmt_size)
390 break
392 except IOError as e:
393 # occasionally got this one on an NFS volume
394 if e.errno == errno.EBUSY:
395 time.sleep(0.01)
396 else:
397 raise
399 nrecords, deltat = struct.unpack(gf_store_header_fmt, dataheader)
400 self._nrecords = nrecords
401 self._deltat = deltat
403 self._load_index()
405 def __del__(self):
406 if self.mode != '':
407 self.close()
409 def lock(self):
410 if not fcntl:
411 lock_fn = self.lock_fn()
412 util.create_lockfile(lock_fn)
413 else:
414 if not self._f_index:
415 self.open()
417 while True:
418 try:
419 fcntl.lockf(self._f_index, fcntl.LOCK_EX)
420 break
422 except IOError as e:
423 if e.errno == errno.ENOLCK:
424 time.sleep(0.01)
425 else:
426 raise
428 def unlock(self):
429 if not fcntl:
430 lock_fn = self.lock_fn()
431 util.delete_lockfile(lock_fn)
432 else:
433 self._f_data.flush()
434 self._f_index.flush()
435 fcntl.lockf(self._f_index, fcntl.LOCK_UN)
437 def put(self, irecord, trace):
438 self._put(irecord, trace)
440 def get_record(self, irecord):
441 return self._get_record(irecord)
443 def get_span(self, irecord, decimate=1):
444 return self._get_span(irecord, decimate=decimate)
446 def get(self, irecord, itmin=None, nsamples=None, decimate=1,
447 implementation='c'):
448 return self._get(irecord, itmin, nsamples, decimate, implementation)
450 def sum(self, irecords, delays, weights, itmin=None,
451 nsamples=None, decimate=1, implementation='c',
452 optimization='enable'):
453 return self._sum(irecords, delays, weights, itmin, nsamples, decimate,
454 implementation, optimization)
456 def irecord_format(self):
457 return util.zfmt(self._nrecords)
459 def str_irecord(self, irecord):
460 return self.irecord_format() % irecord
462 def close(self):
463 if self.mode == 'w':
464 if not self._f_index:
465 self.open()
466 self._save_index()
468 if self._f_data:
469 self._f_data.close()
470 self._f_data = None
472 if self._f_index:
473 self._f_index.close()
474 self._f_index = None
476 del self._records
477 self._records = None
479 self.mode = ''
481 def _get_record(self, irecord):
482 if not self._f_index:
483 self.open()
485 return self._records[irecord]
487 def _get(self, irecord, itmin, nsamples, decimate, implementation):
488 '''
489 Retrieve complete GF trace from storage.
490 '''
492 if not self._f_index:
493 self.open()
495 if not self.mode == 'r':
496 raise StoreError('store not open in read mode')
498 if implementation == 'c' and decimate == 1:
500 if nsamples is None:
501 nsamples = -1
503 if itmin is None:
504 itmin = 0
506 try:
507 return GFTrace(*store_ext.store_get(
508 self.cstore, int(irecord), int(itmin), int(nsamples)))
509 except store_ext.StoreExtError as e:
510 raise StoreError(str(e))
512 else:
513 return self._get_impl_reference(irecord, itmin, nsamples, decimate)
515 def get_deltat(self):
516 return self._deltat
518 def _get_impl_reference(self, irecord, itmin, nsamples, decimate):
519 deltat = self.get_deltat()
521 if not (0 <= irecord < self._nrecords):
522 raise StoreError('invalid record number requested '
523 '(irecord = %i, nrecords = %i)' %
524 (irecord, self._nrecords))
526 ipos, itmin_data, nsamples_data, begin_value, end_value = \
527 self._records[irecord]
529 if None in (itmin, nsamples):
530 itmin = itmin_data
531 itmax = itmin_data + nsamples_data - 1
532 nsamples = nsamples_data
533 else:
534 itmin = itmin * decimate
535 nsamples = nsamples * decimate
536 itmax = itmin + nsamples - decimate
538 if ipos == 0:
539 return None
541 elif ipos == 1:
542 itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
543 return GFTrace(is_zero=True, itmin=itmin_ext//decimate)
545 if decimate == 1:
546 ilo = max(itmin, itmin_data) - itmin_data
547 ihi = min(itmin+nsamples, itmin_data+nsamples_data) - itmin_data
548 data = self._get_data(ipos, begin_value, end_value, ilo, ihi)
550 return GFTrace(data, itmin=itmin_data+ilo, deltat=deltat,
551 begin_value=begin_value, end_value=end_value)
553 else:
554 itmax_data = itmin_data + nsamples_data - 1
556 # put begin and end to multiples of new sampling rate
557 itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
558 itmax_ext = -((-min(itmax, itmax_data))//decimate) * decimate
559 nsamples_ext = itmax_ext - itmin_ext + 1
561 # add some padding for the aa filter
562 order = 30
563 itmin_ext_pad = itmin_ext - order//2
564 itmax_ext_pad = itmax_ext + order//2
565 nsamples_ext_pad = itmax_ext_pad - itmin_ext_pad + 1
567 itmin_overlap = max(itmin_data, itmin_ext_pad)
568 itmax_overlap = min(itmax_data, itmax_ext_pad)
570 ilo = itmin_overlap - itmin_ext_pad
571 ihi = max(ilo, itmax_overlap - itmin_ext_pad + 1)
572 ilo_data = itmin_overlap - itmin_data
573 ihi_data = max(ilo_data, itmax_overlap - itmin_data + 1)
575 data_ext_pad = num.empty(nsamples_ext_pad, dtype=gf_dtype)
576 data_ext_pad[ilo:ihi] = self._get_data(
577 ipos, begin_value, end_value, ilo_data, ihi_data)
579 data_ext_pad[:ilo] = begin_value
580 data_ext_pad[ihi:] = end_value
582 b = signal.firwin(order + 1, 1. / decimate, window='hamming')
583 a = 1.
584 data_filt_pad = signal.lfilter(b, a, data_ext_pad)
585 data_deci = data_filt_pad[order:order+nsamples_ext:decimate]
586 if data_deci.size >= 1:
587 if itmin_ext <= itmin_data:
588 data_deci[0] = begin_value
590 if itmax_ext >= itmax_data:
591 data_deci[-1] = end_value
593 return GFTrace(data_deci, itmin_ext//decimate,
594 deltat*decimate,
595 begin_value=begin_value, end_value=end_value)
597 def _get_span(self, irecord, decimate=1):
598 '''
599 Get temporal extent of GF trace at given index.
600 '''
602 if not self._f_index:
603 self.open()
605 assert 0 <= irecord < self._nrecords, \
606 'irecord = %i, nrecords = %i' % (irecord, self._nrecords)
608 (_, itmin, nsamples, _, _) = self._records[irecord]
610 itmax = itmin + nsamples - 1
612 if decimate == 1:
613 return itmin, itmax
614 else:
615 if nsamples == 0:
616 return itmin//decimate, itmin//decimate - 1
617 else:
618 return itmin//decimate, -((-itmax)//decimate)
620 def _put(self, irecord, trace):
621 '''
622 Save GF trace to storage.
623 '''
625 if not self._f_index:
626 self.open()
628 deltat = self.get_deltat()
630 assert self.mode == 'w'
631 assert trace.is_zero or \
632 abs(trace.deltat - deltat) < 1e-7 * deltat
633 assert 0 <= irecord < self._nrecords, \
634 'irecord = %i, nrecords = %i' % (irecord, self._nrecords)
636 if self._records[irecord][0] != 0:
637 raise DuplicateInsert('record %i already in store' % irecord)
639 if trace.is_zero or num.all(trace.data == 0.0):
640 self._records[irecord] = (1, trace.itmin, 0, 0., 0.)
641 return
643 ndata = trace.data.size
645 if ndata > 2:
646 self._f_data.seek(0, 2)
647 ipos = self._f_data.tell()
648 trace.data.astype(gf_dtype_store).tofile(self._f_data)
649 else:
650 ipos = 2
652 self._records[irecord] = (ipos, trace.itmin, ndata,
653 trace.data[0], trace.data[-1])
655 def _sum_impl_alternative(self, irecords, delays, weights, itmin, nsamples,
656 decimate):
658 '''
659 Sum delayed and weighted GF traces.
660 '''
662 if not self._f_index:
663 self.open()
665 assert self.mode == 'r'
667 deltat = self.get_deltat() * decimate
669 if len(irecords) == 0:
670 if None in (itmin, nsamples):
671 return Zero
672 else:
673 return GFTrace(
674 num.zeros(nsamples, dtype=gf_dtype), itmin,
675 deltat, is_zero=True)
677 assert len(irecords) == len(delays)
678 assert len(irecords) == len(weights)
680 if None in (itmin, nsamples):
681 itmin_delayed, itmax_delayed = [], []
682 for irecord, delay in zip(irecords, delays):
683 itmin, itmax = self._get_span(irecord, decimate=decimate)
684 itmin_delayed.append(itmin + delay/deltat)
685 itmax_delayed.append(itmax + delay/deltat)
687 itmin = int(math.floor(min(itmin_delayed)))
688 nsamples = int(math.ceil(max(itmax_delayed))) - itmin + 1
690 out = num.zeros(nsamples, dtype=gf_dtype)
691 if nsamples == 0:
692 return GFTrace(out, itmin, deltat)
694 for ii in range(len(irecords)):
695 irecord = irecords[ii]
696 delay = delays[ii]
697 weight = weights[ii]
699 if weight == 0.0:
700 continue
702 idelay_floor = int(math.floor(delay/deltat))
703 idelay_ceil = int(math.ceil(delay/deltat))
705 gf_trace = self._get(
706 irecord,
707 itmin - idelay_ceil,
708 nsamples + idelay_ceil - idelay_floor,
709 decimate,
710 'reference')
712 assert gf_trace.itmin >= itmin - idelay_ceil
713 assert gf_trace.data.size <= nsamples + idelay_ceil - idelay_floor
715 if gf_trace.is_zero:
716 continue
718 ilo = gf_trace.itmin - itmin + idelay_floor
719 ihi = ilo + gf_trace.data.size + (idelay_ceil-idelay_floor)
721 data = gf_trace.data
723 if idelay_floor == idelay_ceil:
724 out[ilo:ihi] += data * weight
725 else:
726 if data.size:
727 k = 1
728 if ihi <= nsamples:
729 out[ihi-1] += gf_trace.end_value * \
730 ((idelay_ceil-delay/deltat) * weight)
731 k = 0
733 out[ilo+1:ihi-k] += data[:data.size-k] * \
734 ((delay/deltat-idelay_floor) * weight)
735 k = 1
736 if ilo >= 0:
737 out[ilo] += gf_trace.begin_value * \
738 ((delay/deltat-idelay_floor) * weight)
739 k = 0
741 out[ilo+k:ihi-1] += data[k:] * \
742 ((idelay_ceil-delay/deltat) * weight)
744 if ilo > 0 and gf_trace.begin_value != 0.0:
745 out[:ilo] += gf_trace.begin_value * weight
747 if ihi < nsamples and gf_trace.end_value != 0.0:
748 out[ihi:] += gf_trace.end_value * weight
750 return GFTrace(out, itmin, deltat)
752 def _sum_impl_reference(self, irecords, delays, weights, itmin, nsamples,
753 decimate):
755 if not self._f_index:
756 self.open()
758 deltat = self.get_deltat() * decimate
760 if len(irecords) == 0:
761 if None in (itmin, nsamples):
762 return Zero
763 else:
764 return GFTrace(
765 num.zeros(nsamples, dtype=gf_dtype), itmin,
766 deltat, is_zero=True)
768 datas = []
769 itmins = []
770 for i, delay, weight in zip(irecords, delays, weights):
771 tr = self._get(i, None, None, decimate, 'reference')
773 idelay_floor = int(math.floor(delay/deltat))
774 idelay_ceil = int(math.ceil(delay/deltat))
776 if idelay_floor == idelay_ceil:
777 itmins.append(tr.itmin + idelay_floor)
778 datas.append(tr.data.copy()*weight)
779 else:
780 itmins.append(tr.itmin + idelay_floor)
781 datas.append(tr.data.copy()*weight*(idelay_ceil-delay/deltat))
782 itmins.append(tr.itmin + idelay_ceil)
783 datas.append(tr.data.copy()*weight*(delay/deltat-idelay_floor))
785 itmin_all = min(itmins)
787 itmax_all = max(itmin_ + data.size for (itmin_, data) in
788 zip(itmins, datas))
790 if itmin is not None:
791 itmin_all = min(itmin_all, itmin)
792 if nsamples is not None:
793 itmax_all = max(itmax_all, itmin+nsamples)
795 nsamples_all = itmax_all - itmin_all
797 arr = num.zeros((len(datas), nsamples_all), dtype=gf_dtype)
798 for i, itmin_, data in zip(num.arange(len(datas)), itmins, datas):
799 if data.size > 0:
800 ilo = itmin_-itmin_all
801 ihi = ilo + data.size
802 arr[i, :ilo] = data[0]
803 arr[i, ilo:ihi] = data
804 arr[i, ihi:] = data[-1]
806 sum_arr = arr.sum(axis=0)
808 if itmin is not None and nsamples is not None:
809 ilo = itmin-itmin_all
810 ihi = ilo + nsamples
811 sum_arr = sum_arr[ilo:ihi]
813 else:
814 itmin = itmin_all
816 return GFTrace(sum_arr, itmin, deltat)
818 def _optimize(self, irecords, delays, weights):
819 if num.unique(irecords).size == irecords.size:
820 return irecords, delays, weights
822 deltat = self.get_deltat()
824 delays = delays / deltat
825 irecords2 = num.repeat(irecords, 2)
826 delays2 = num.empty(irecords2.size, dtype=float)
827 delays2[0::2] = num.floor(delays)
828 delays2[1::2] = num.ceil(delays)
829 weights2 = num.repeat(weights, 2)
830 weights2[0::2] *= 1.0 - (delays - delays2[0::2])
831 weights2[1::2] *= (1.0 - (delays2[1::2] - delays)) * \
832 (delays2[1::2] - delays2[0::2])
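        # Worked example (illustrative numbers): a delay of 2.3 samples
        # becomes two entries, an integer delay of 2 samples with weight
        # factor 0.7 and an integer delay of 3 samples with weight factor
        # 0.3, preserving the linear interpolation between neighbouring
        # samples. For an exactly integer delay the second entry gets weight
        # factor 0.0 and is merged away by the deduplication below.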
834 delays2 *= deltat
836 iorder = num.lexsort((delays2, irecords2))
838 irecords2 = irecords2[iorder]
839 delays2 = delays2[iorder]
840 weights2 = weights2[iorder]
842 ui = num.empty(irecords2.size, dtype=bool)
843 ui[1:] = num.logical_or(num.diff(irecords2) != 0,
844 num.diff(delays2) != 0.)
846 ui[0] = 0
847 ind2 = num.cumsum(ui)
848 ui[0] = 1
849 ind1 = num.where(ui)[0]
851 irecords3 = irecords2[ind1]
852 delays3 = delays2[ind1]
853 weights3 = num.bincount(ind2, weights2)
855 return irecords3, delays3, weights3
857 def _sum(self, irecords, delays, weights, itmin, nsamples, decimate,
858 implementation, optimization):
860 if not self._f_index:
861 self.open()
863 t0 = time.time()
864 if optimization == 'enable':
865 irecords, delays, weights = self._optimize(
866 irecords, delays, weights)
867 else:
868 assert optimization == 'disable'
870 t1 = time.time()
871 deltat = self.get_deltat()
873 if implementation == 'c' and decimate == 1:
874 if delays.size != 0:
875 itoffset = int(num.floor(num.min(delays)/deltat))
876 else:
877 itoffset = 0
879 if nsamples is None:
880 nsamples = -1
882 if itmin is None:
883 itmin = 0
884 else:
885 itmin -= itoffset
887 try:
888 tr = GFTrace(*store_ext.store_sum(
889 self.cstore, irecords.astype(num.uint64),
890 delays - itoffset*deltat,
891 weights.astype(num.float32),
892 int(itmin), int(nsamples)))
894 tr.itmin += itoffset
896 except store_ext.StoreExtError as e:
897 raise StoreError(str(e) + ' in store %s' % self.store_dir)
899 elif implementation == 'alternative':
900 tr = self._sum_impl_alternative(irecords, delays, weights,
901 itmin, nsamples, decimate)
903 else:
904 tr = self._sum_impl_reference(irecords, delays, weights,
905 itmin, nsamples, decimate)
907 t2 = time.time()
909 tr.n_records_stacked = irecords.size
910 tr.t_optimize = t1 - t0
911 tr.t_stack = t2 - t1
913 return tr
915 def _sum_statics(self, irecords, delays, weights, it, ntargets,
916 nthreads):
917 if not self._f_index:
918 self.open()
920 return store_ext.store_sum_static(
921 self.cstore, irecords, delays, weights, it, ntargets, nthreads)
923 def _load_index(self):
924 if self._use_memmap:
925 records = num.memmap(
926 self._f_index, dtype=gf_record_dtype,
927 offset=gf_store_header_fmt_size,
928 mode=('r', 'r+')[self.mode == 'w'])
930 else:
931 self._f_index.seek(gf_store_header_fmt_size)
932 records = num.fromfile(self._f_index, dtype=gf_record_dtype)
934 assert len(records) == self._nrecords
936 self._records = records
938 def _save_index(self):
939 self._f_index.seek(0)
940 self._f_index.write(struct.pack(gf_store_header_fmt, self._nrecords,
941 self.get_deltat()))
943 if self._use_memmap:
944 self._records.flush()
945 else:
946 self._f_index.seek(gf_store_header_fmt_size)
947 self._records.tofile(self._f_index)
948 self._f_index.flush()
950 def _get_data(self, ipos, begin_value, end_value, ilo, ihi):
951 if ihi - ilo > 0:
952 if ipos == 2:
953 data_orig = num.empty(2, dtype=gf_dtype)
954 data_orig[0] = begin_value
955 data_orig[1] = end_value
956 return data_orig[ilo:ihi]
957 else:
958 self._f_data.seek(
959 int(ipos + ilo*gf_dtype_nbytes_per_sample))
960 arr = num.fromfile(
961 self._f_data, gf_dtype_store, ihi-ilo).astype(gf_dtype)
963 if arr.size != ihi-ilo:
964 raise ShortRead()
965 return arr
966 else:
967 return num.empty((0,), dtype=gf_dtype)
969 def lock_fn(self):
970 return BaseStore.lock_fn_(self.store_dir)
972 def index_fn(self):
973 return BaseStore.index_fn_(self.store_dir)
975 def data_fn(self):
976 return BaseStore.data_fn_(self.store_dir)
978 def config_fn(self):
979 return BaseStore.config_fn_(self.store_dir)
981 def count_special_records(self):
982 if not self._f_index:
983 self.open()
985 return num.histogram(
986 self._records['data_offset'],
987 bins=[0, 1, 2, 3, num.array(-1).astype(num.uint64)])[0]
989 @property
990 def size_index(self):
991 return os.stat(self.index_fn()).st_size
993 @property
994 def size_data(self):
995 return os.stat(self.data_fn()).st_size
997 @property
998 def size_index_and_data(self):
999 return self.size_index + self.size_data
1001 @property
1002 def size_index_and_data_human(self):
1003 return util.human_bytesize(self.size_index_and_data)
1005 def stats(self):
1006 counter = self.count_special_records()
1008 stats = dict(
1009 total=self._nrecords,
1010 inserted=(counter[1] + counter[2] + counter[3]),
1011 empty=counter[0],
1012 short=counter[2],
1013 zero=counter[1],
1014 size_data=self.size_data,
1015 size_index=self.size_index,
1016 )
1018 return stats
1020 stats_keys = 'total inserted empty short zero size_data size_index'.split()
1023def remake_dir(dpath, force):
1024 if os.path.exists(dpath):
1025 if force:
1026 shutil.rmtree(dpath)
1027 else:
1028 raise CannotCreate('Directory "%s" already exists.' % dpath)
1030 os.mkdir(dpath)
1033class MakeTimingParamsFailed(StoreError):
1034 pass
1037class Store(BaseStore):
1039 '''
1040 Green's function disk storage and summation machine.
1042 The `Store` can be used to efficiently store, retrieve, and sum Green's
1043 function traces. A `Store` contains many 1D time traces sampled at even
1044 multiples of a global sampling rate, where each time trace has an
1045 individual start and end time. The traces are treated as having repeating
1046 end points, so the functions they represent can be non-constant only
1047 between begin and end time. It provides capabilities to retrieve decimated
1048 traces and to extract parts of the traces. The main purpose of this class
    is to provide a fast, easy to use, and flexible mechanism to compute
1050 weighted delay-and-sum stacks with many Green's function traces involved.
1052 Individual Green's functions are accessed through a single integer index at
1053 low level. On top of that, various indexing schemes can be implemented by
1054 providing a mapping from physical coordinates to the low level index `i`.
1055 E.g. for a problem with cylindrical symmetry, one might define a mapping
1056 from the coordinates (`receiver_depth`, `source_depth`, `distance`) to the
1057 low level index. Index translation is done in the
1058 :py:class:`pyrocko.gf.meta.Config` subclass object associated with the
1059 Store. It is accessible through the store's :py:attr:`config` attribute,
1060 and contains all meta information about the store, including physical
1061 extent, geometry, sampling rate, and information about the type of the
1062 stored Green's functions. Information about the underlying earth model
1063 can also be found there.
1065 A GF store can also contain tabulated phase arrivals. In basic cases, these
    can be created with :py:meth:`make_travel_time_tables` and evaluated
    with the :py:meth:`t` method.
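
    Usage sketch (store path, phase id and index values are illustrative)::

        store = Store('gf_stores/my_store')              # open in read mode
        tr = store.get((10*km, 2000*km, 0))              # single GF trace
        t_p = store.t('stored:any_P', (10*km, 2000*km))  # phase arrival [s]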
1069 .. attribute:: config
1071 The :py:class:`pyrocko.gf.meta.Config` derived object associated with
1072 the store which contains all meta information about the store and
1073 provides the high-level to low-level index mapping.
1075 .. attribute:: store_dir
1077 Path to the store's data directory.
1079 .. attribute:: mode
1081 The mode in which the store is opened: ``'r'``: read-only, ``'w'``:
1082 writeable.
1083 '''
1085 @classmethod
1086 def create(cls, store_dir, config, force=False, extra=None):
1087 '''
1088 Create new GF store.
1090 Creates a new GF store at path ``store_dir``. The layout of the GF is
1091 defined with the parameters given in ``config``, which should be an
1092 object of a subclass of :py:class:`~pyrocko.gf.meta.Config`. This
1093 function will refuse to overwrite an existing GF store, unless
1094 ``force`` is set to ``True``. If more information, e.g. parameters
1095 used for the modelling code, earth models or other, should be saved
        along with the GF store, these may be provided through a dict given to
1097 ``extra``. The keys of this dict must be names and the values must be
1098 *guts* type objects.
1100 :param store_dir: GF store path
1101 :type store_dir: str
1102 :param config: GF store Config
1103 :type config: :py:class:`~pyrocko.gf.meta.Config`
1104 :param force: Force overwrite, defaults to ``False``
1105 :type force: bool
1106 :param extra: Extra information
1107 :type extra: dict or None
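
        Usage sketch (the config values are illustrative and not a complete
        parameterisation)::

            from pyrocko import gf

            km = 1000.
            config = gf.ConfigTypeA(
                id='my_gf_store',
                source_depth_min=1*km, source_depth_max=20*km,
                source_depth_delta=1*km,
                distance_min=100*km, distance_max=1000*km,
                distance_delta=10*km,
                sample_rate=2.0,
                ncomponents=10)

            store = gf.Store.create('gf_stores/my_gf_store', config=config)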
1108 '''
1110 cls.create_editables(store_dir, config, force=force, extra=extra)
1111 cls.create_dependants(store_dir, force=force)
1113 return cls(store_dir)
1115 @staticmethod
1116 def create_editables(store_dir, config, force=False, extra=None):
1117 try:
1118 util.ensuredir(store_dir)
1119 except Exception:
1120 raise CannotCreate('cannot create directory %s' % store_dir)
1122 fns = []
1124 config_fn = os.path.join(store_dir, 'config')
1125 remove_if_exists(config_fn, force)
1126 meta.dump(config, filename=config_fn)
1128 fns.append(config_fn)
1130 for sub_dir in ['extra']:
1131 dpath = os.path.join(store_dir, sub_dir)
1132 remake_dir(dpath, force)
1134 if extra:
1135 for k, v in extra.items():
1136 fn = get_extra_path(store_dir, k)
1137 remove_if_exists(fn, force)
1138 meta.dump(v, filename=fn)
1140 fns.append(fn)
1142 return fns
1144 @staticmethod
1145 def create_dependants(store_dir, force=False):
1146 config_fn = os.path.join(store_dir, 'config')
1147 config = meta.load(filename=config_fn)
1149 BaseStore.create(store_dir, config.deltat, config.nrecords,
1150 force=force)
1152 for sub_dir in ['decimated']:
1153 dpath = os.path.join(store_dir, sub_dir)
1154 remake_dir(dpath, force)
1156 def __init__(self, store_dir, mode='r', use_memmap=True):
1157 BaseStore.__init__(self, store_dir, mode=mode, use_memmap=use_memmap)
1158 config_fn = self.config_fn()
1159 if not os.path.isfile(config_fn):
1160 raise StoreError(
1161 'directory "%s" does not seem to contain a GF store '
1162 '("config" file not found)' % store_dir)
1163 self.load_config()
1165 self._decimated = {}
1166 self._extra = {}
1167 self._phases = {}
1168 for decimate in range(2, 9):
1169 if os.path.isdir(self._decimated_store_dir(decimate)):
1170 self._decimated[decimate] = None
1172 def open(self):
1173 if not self._f_index:
1174 BaseStore.open(self)
1175 c = self.config
1177 mscheme = 'type_' + c.short_type.lower()
1178 store_ext.store_mapping_init(
1179 self.cstore, mscheme,
1180 c.mins, c.maxs, c.deltas, c.ns.astype(num.uint64),
1181 c.ncomponents)
1183 def save_config(self, make_backup=False):
1184 config_fn = self.config_fn()
1185 if make_backup:
1186 os.rename(config_fn, config_fn + '~')
1188 meta.dump(self.config, filename=config_fn)
1190 def get_deltat(self):
1191 return self.config.deltat
1193 def load_config(self):
1194 self.config = meta.load(filename=self.config_fn())
1196 def ensure_reference(self, force=False):
1197 if self.config.reference is not None and not force:
1198 return
1199 self.ensure_uuid()
1200 reference = '%s-%s' % (self.config.id, self.config.uuid[0:6])
1202 if self.config.reference is not None:
1203 self.config.reference = reference
1204 self.save_config()
1205 else:
1206 with open(self.config_fn(), 'a') as config:
1207 config.write('reference: %s\n' % reference)
1208 self.load_config()
1210 def ensure_uuid(self, force=False):
1211 if self.config.uuid is not None and not force:
1212 return
1213 uuid = self.create_store_hash()
1215 if self.config.uuid is not None:
1216 self.config.uuid = uuid
1217 self.save_config()
1218 else:
1219 with open(self.config_fn(), 'a') as config:
1220 config.write('\nuuid: %s\n' % uuid)
1221 self.load_config()
1223 def create_store_hash(self):
1224 logger.info('creating hash for store ...')
1225 m = hashlib.sha1()
1227 traces_size = op.getsize(self.data_fn())
1228 with open(self.data_fn(), 'rb') as traces:
1229 while traces.tell() < traces_size:
1230 m.update(traces.read(4096))
1231 traces.seek(1024 * 1024 * 10, 1)
1233 with open(self.config_fn(), 'rb') as config:
1234 m.update(config.read())
1235 return m.hexdigest()
1237 def get_extra_path(self, key):
1238 return get_extra_path(self.store_dir, key)
1240 def get_extra(self, key):
1241 '''
1242 Get extra information stored under given key.
1243 '''
1245 x = self._extra
1246 if key not in x:
1247 fn = self.get_extra_path(key)
1248 if not os.path.exists(fn):
1249 raise NoSuchExtra(key)
1251 x[key] = meta.load(filename=fn)
1253 return x[key]
1255 def upgrade(self):
1256 '''
1257 Upgrade store config and files to latest Pyrocko version.
1258 '''
1259 fns = [os.path.join(self.store_dir, 'config')]
1260 for key in self.extra_keys():
1261 fns.append(self.get_extra_path(key))
1263 i = 0
1264 for fn in fns:
1265 i += util.pf_upgrade(fn)
1267 return i
1269 def extra_keys(self):
1270 return [x for x in os.listdir(os.path.join(self.store_dir, 'extra'))
1271 if valid_string_id(x)]
1273 def put(self, args, trace):
1274 '''
1275 Insert trace into GF store.
1277 Store a single GF trace at (high-level) index ``args``.
1279 :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1280 ``(source_depth, distance, component)`` as in
1281 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1282 :type args: tuple
1283 :returns: GF trace at ``args``
1284 :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
1285 '''
1287 irecord = self.config.irecord(*args)
1288 self._put(irecord, trace)
1290 def get_record(self, args):
1291 irecord = self.config.irecord(*args)
1292 return self._get_record(irecord)
1294 def str_irecord(self, args):
1295 return BaseStore.str_irecord(self, self.config.irecord(*args))
1297 def get(self, args, itmin=None, nsamples=None, decimate=1,
1298 interpolation='nearest_neighbor', implementation='c'):
1299 '''
1300 Retrieve GF trace from store.
1302 Retrieve a single GF trace from the store at (high-level) index
1303 ``args``. By default, the full trace is retrieved. Given ``itmin`` and
1304 ``nsamples``, only the selected portion of the trace is extracted. If
1305 ``decimate`` is an integer in the range [2,8], the trace is decimated
1306 on the fly or, if available, the trace is read from a decimated version
1307 of the GF store.
1309 :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1310 ``(source_depth, distance, component)`` as in
1311 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1312 :type args: tuple
1313 :param itmin: Start time index (start time is ``itmin * dt``),
1314 defaults to None
1315 :type itmin: int or None
1316 :param nsamples: Number of samples, defaults to None
1317 :type nsamples: int or None
        :param decimate: Decimation factor, defaults to 1
1319 :type decimate: int
1320 :param interpolation: Interpolation method
1321 ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
1322 ``'nearest_neighbor'``
1323 :type interpolation: str
1324 :param implementation: Implementation to use ``['c', 'reference']``,
1325 defaults to ``'c'``.
1326 :type implementation: str
1328 :returns: GF trace at ``args``
1329 :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
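
        Usage sketch (index values are illustrative)::

            # full trace at a (source_depth, distance, component) node
            tr = store.get((10000., 200000., 0))

            # a 100-sample window starting at time index 50, read from the
            # 2x decimated sub-store if it has been built (see
            # :py:meth:`make_decimated`), otherwise decimated on the fly
            tr = store.get((10000., 200000., 0), itmin=50, nsamples=100,
                           decimate=2)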
1330 '''
1332 store, decimate = self._decimated_store(decimate)
1333 if interpolation == 'nearest_neighbor':
1334 irecord = store.config.irecord(*args)
1335 tr = store._get(irecord, itmin, nsamples, decimate,
1336 implementation)
1338 elif interpolation == 'off':
1339 irecords, weights = store.config.vicinity(*args)
1340 if len(irecords) != 1:
1341 raise NotAllowedToInterpolate()
1342 else:
1343 tr = store._get(irecords[0], itmin, nsamples, decimate,
1344 implementation)
1346 elif interpolation == 'multilinear':
1347 tr = store._sum(irecords, num.zeros(len(irecords)), weights,
1348 itmin, nsamples, decimate, implementation,
1349 'disable')
1351 # to prevent problems with rounding errors (BaseStore saves deltat
1352 # as a 4-byte floating point value, value from YAML config is more
1353 # accurate)
1354 tr.deltat = self.config.deltat * decimate
1355 return tr
1357 def sum(self, args, delays, weights, itmin=None, nsamples=None,
1358 decimate=1, interpolation='nearest_neighbor', implementation='c',
1359 optimization='enable'):
1360 '''
1361 Sum delayed and weighted GF traces.
1363 Calculate sum of delayed and weighted GF traces. ``args`` is a tuple of
1364 arrays forming the (high-level) indices of the GF traces to be
1365 selected. Delays and weights for the summation are given in the arrays
1366 ``delays`` and ``weights``. ``itmin`` and ``nsamples`` can be given to
        restrict the computation to a given time interval. If ``decimate`` is
1368 an integer in the range [2,8], decimated traces are used in the
1369 summation.
1371 :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1372 ``(source_depth, distance, component)`` as in
1373 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1374 :type args: tuple(numpy.ndarray)
1375 :param delays: Delay times
1376 :type delays: :py:class:`numpy.ndarray`
1377 :param weights: Trace weights
1378 :type weights: :py:class:`numpy.ndarray`
1379 :param itmin: Start time index (start time is ``itmin * dt``),
1380 defaults to None
1381 :type itmin: int or None
1382 :param nsamples: Number of samples, defaults to None
1383 :type nsamples: int or None
        :param decimate: Decimation factor, defaults to 1
1385 :type decimate: int
1386 :param interpolation: Interpolation method
1387 ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
1388 ``'nearest_neighbor'``
1389 :type interpolation: str
1390 :param implementation: Implementation to use,
1391 ``['c', 'alternative', 'reference']``, where ``'alternative'``
1392 and ``'reference'`` use a Python implementation, defaults to `'c'`
1393 :type implementation: str
1394 :param optimization: Optimization mode ``['enable', 'disable']``,
1395 defaults to ``'enable'``
1396 :type optimization: str
1397 :returns: Stacked GF trace.
1398 :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
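
        Usage sketch (index tuples, delays and weights are illustrative)::

            args = (num.array([10000., 10000.]),
                    num.array([200000., 210000.]),
                    num.array([0, 0]))
            delays = num.array([0.0, 1.5])
            weights = num.array([0.5, 0.5])

            tr = store.sum(args, delays, weights)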
1399 '''
1401 store, decimate_ = self._decimated_store(decimate)
1403 if interpolation == 'nearest_neighbor':
1404 irecords = store.config.irecords(*args)
1405 else:
1406 assert interpolation == 'multilinear'
1407 irecords, ip_weights = store.config.vicinities(*args)
1408 neach = irecords.size // args[0].size
1409 weights = num.repeat(weights, neach) * ip_weights
1410 delays = num.repeat(delays, neach)
1412 tr = store._sum(irecords, delays, weights, itmin, nsamples, decimate_,
1413 implementation, optimization)
1415 # to prevent problems with rounding errors (BaseStore saves deltat
1416 # as a 4-byte floating point value, value from YAML config is more
1417 # accurate)
1418 tr.deltat = self.config.deltat * decimate
1419 return tr
1421 def make_decimated(self, decimate, config=None, force=False,
1422 show_progress=False):
1423 '''
1424 Create decimated version of GF store.
1426 Create a downsampled version of the GF store. Downsampling is done for
1427 the integer factor ``decimate`` which should be in the range [2,8]. If
1428 ``config`` is ``None``, all traces of the GF store are decimated and
1429 held available (i.e. the index mapping of the original store is used),
        otherwise, a different spatial stepping can be specified by giving a
1431 modified GF store configuration in ``config`` (see :py:meth:`create`).
1432 Decimated GF sub-stores are created under the ``decimated``
        subdirectory within the GF store directory. Keeping decimated versions
        of the GF store available can save computation time and IO bandwidth
        and reduce the memory footprint, at the cost of increased disk space
        usage, when computations are done for lower frequency signals.
1438 :param decimate: Decimate factor
1439 :type decimate: int
1440 :param config: GF store config object, defaults to None
1441 :type config: :py:class:`~pyrocko.gf.meta.Config` or None
1442 :param force: Force overwrite, defaults to ``False``
1443 :type force: bool
1444 :param show_progress: Show progress, defaults to ``False``
1445 :type show_progress: bool
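
        Usage sketch (store path is illustrative)::

            store = Store('gf_stores/my_store', 'r')
            store.make_decimated(2)   # creates <store_dir>/decimated/2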
1446 '''
1448 if not self._f_index:
1449 self.open()
1451 if not (2 <= decimate <= 8):
1452 raise StoreError('decimate argument must be in the range [2,8]')
1454 assert self.mode == 'r'
1456 if config is None:
1457 config = self.config
1459 config = copy.deepcopy(config)
1460 config.sample_rate = self.config.sample_rate / decimate
1462 if decimate in self._decimated:
1463 del self._decimated[decimate]
1465 store_dir = self._decimated_store_dir(decimate)
1466 if os.path.exists(store_dir):
1467 if force:
1468 shutil.rmtree(store_dir)
1469 else:
1470 raise CannotCreate('store already exists at %s' % store_dir)
1472 store_dir_incomplete = store_dir + '-incomplete'
1473 Store.create(store_dir_incomplete, config, force=force)
1475 decimated = Store(store_dir_incomplete, 'w')
1476 try:
1477 if show_progress:
1478 pbar = util.progressbar(
1479 'decimating store', self.config.nrecords)
1481 for i, args in enumerate(decimated.config.iter_nodes()):
1482 tr = self.get(args, decimate=decimate)
1483 decimated.put(args, tr)
1485 if show_progress:
1486 pbar.update(i+1)
1488 finally:
1489 if show_progress:
1490 pbar.finish()
1492 decimated.close()
1494 shutil.move(store_dir_incomplete, store_dir)
1496 self._decimated[decimate] = None
1498 def stats(self):
1499 stats = BaseStore.stats(self)
1500 stats['decimated'] = sorted(self._decimated.keys())
1501 return stats
1503 stats_keys = BaseStore.stats_keys + ['decimated']
1505 def check(self, show_progress=False):
1506 have_holes = []
1507 for pdef in self.config.tabulated_phases:
1508 phase_id = pdef.id
1509 ph = self.get_stored_phase(phase_id)
1510 if ph.check_holes():
1511 have_holes.append(phase_id)
1513 if have_holes:
1514 for phase_id in have_holes:
1515 logger.warning(
                    'Travel time table of phase "{}" contains holes. You can '
                    'use `fomosto tttlsd` to fix holes.'.format(
1518 phase_id))
1519 else:
1520 logger.info('No holes in travel time tables')
1522 try:
1523 if show_progress:
1524 pbar = util.progressbar('checking store', self.config.nrecords)
1526 problems = 0
1527 for i, args in enumerate(self.config.iter_nodes()):
1528 tr = self.get(args)
1529 if tr and not tr.is_zero:
1530 if not tr.begin_value == tr.data[0]:
1531 logger.warning('wrong begin value for trace at %s '
1532 '(data corruption?)' % str(args))
1533 problems += 1
1534 if not tr.end_value == tr.data[-1]:
1535 logger.warning('wrong end value for trace at %s '
1536 '(data corruption?)' % str(args))
1537 problems += 1
1538 if not num.all(num.isfinite(tr.data)):
1539 logger.warning(
1540 'nans or infs in trace at %s' % str(args))
1541 problems += 1
1543 if show_progress:
1544 pbar.update(i+1)
1546 finally:
1547 if show_progress:
1548 pbar.finish()
1550 return problems
1552 def check_earthmodels(self, config):
1553 if config.earthmodel_receiver_1d.profile('z')[-1] not in\
1554 config.earthmodel_1d.profile('z'):
1555 logger.warning('deepest layer of earthmodel_receiver_1d not '
1556 'found in earthmodel_1d')
1558 def _decimated_store_dir(self, decimate):
1559 return os.path.join(self.store_dir, 'decimated', str(decimate))
1561 def _decimated_store(self, decimate):
1562 if decimate == 1 or decimate not in self._decimated:
1563 return self, decimate
1564 else:
1565 store = self._decimated[decimate]
1566 if store is None:
1567 store = Store(self._decimated_store_dir(decimate), 'r')
1568 self._decimated[decimate] = store
1570 return store, 1
1572 def phase_filename(self, phase_id, attribute='phase'):
1573 check_string_id(phase_id)
1574 return os.path.join(
1575 self.store_dir, 'phases', phase_id + '.%s' % attribute)
1577 def get_phase_identifier(self, phase_id, attribute):
1578 return '{}.{}'.format(phase_id, attribute)
1580 def get_stored_phase(self, phase_def, attribute='phase'):
1581 '''
1582 Get stored phase from GF store.
1584 :returns: Phase information
1585 :rtype: :py:class:`pyrocko.spit.SPTree`
1586 '''
1588 phase_id = self.get_phase_identifier(phase_def, attribute)
1589 if phase_id not in self._phases:
1590 fn = self.phase_filename(phase_def, attribute)
1591 if not os.path.isfile(fn):
1592 raise NoSuchPhase(phase_id)
1594 spt = spit.SPTree(filename=fn)
1595 self._phases[phase_id] = spt
1597 return self._phases[phase_id]
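    # Phase definitions passed to get_phase() (and hence to t()) may carry a
    # provider prefix. Illustrative forms (the available phase ids depend on
    # what has been tabulated for the store):
    #
    #   'stored:P'         interpolate the stored travel time table 'P'
    #   'P'                no prefix: same as 'stored:P'
    #   'vel:8.0'          straight-line propagation at 8.0 km/s
    #   'vel_surface:3.5'  propagation at 3.5 km/s along the surface distance
    #   'cake:P'           computed on the fly with pyrocko.cake and the
    #                      store's 1D earth model
    #   'iaspei:P'         like 'cake:' but using classic phase definitions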
1599 def get_phase(self, phase_def):
1600 toks = phase_def.split(':', 1)
1601 if len(toks) == 2:
1602 provider, phase_def = toks
1603 else:
1604 provider, phase_def = 'stored', toks[0]
1606 if provider == 'stored':
1607 return self.get_stored_phase(phase_def)
1609 elif provider == 'vel':
1610 vel = float(phase_def) * 1000.
1612 def evaluate(args):
1613 return self.config.get_distance(args) / vel
1615 return evaluate
1617 elif provider == 'vel_surface':
1618 vel = float(phase_def) * 1000.
1620 def evaluate(args):
1621 return self.config.get_surface_distance(args) / vel
1623 return evaluate
1625 elif provider in ('cake', 'iaspei'):
1626 from pyrocko import cake
1627 mod = self.config.earthmodel_1d
1628 if provider == 'cake':
1629 phases = [cake.PhaseDef(phase_def)]
1630 else:
1631 phases = cake.PhaseDef.classic(phase_def)
1633 def evaluate(args):
1634 if len(args) == 2:
1635 zr, zs, x = (self.config.receiver_depth,) + args
1636 elif len(args) == 3:
1637 zr, zs, x = args
1638 else:
1639 assert False
1641 t = []
1642 if phases:
1643 rays = mod.arrivals(
1644 phases=phases,
1645 distances=[x*cake.m2d],
1646 zstart=zs,
1647 zstop=zr)
1649 for ray in rays:
1650 t.append(ray.t)
1652 if t:
1653 return min(t)
1654 else:
1655 return None
1657 return evaluate
1659 raise StoreError('unsupported phase provider: %s' % provider)
1661 def t(self, timing, *args):
1662 '''
1663 Compute interpolated phase arrivals.
1665 **Examples:**
1667 If ``test_store`` is a :py:class:`Type A<pyrocko.gf.meta.ConfigTypeA>`
1668 store::
1670 test_store.t('stored:p', (1000, 10000))
1671 test_store.t('last{stored:P|stored:Pdiff}', (1000, 10000))
1672 # The latter arrival
1673 # of P or diffracted
1674 # P phase
1676 If ``test_store`` is a :py:class:`Type B<pyrocko.gf.meta.ConfigTypeB>`
1677 store::
1679 test_store.t('S', (1000, 1000, 10000))
1680 test_store.t('first{P|p|Pdiff|sP}', (1000, 1000, 10000))
1681 # The first arrival of
1682 # the given phases is
1683 # selected
1685 Independent of the store type, it is also possible to specify two
1686 location objects and the GF index tuple is calculated internally::
1688 test_store.t('p', source, target)
1690 :param timing: travel-time definition
1691 :type timing: str or :py:class:`~pyrocko.gf.meta.Timing`
1692 :param \\*args: if ``len(args) == 1``, ``args[0]`` must be a
1693 :py:class:`GF index tuple <pyrocko.gf.meta.Config>`, e.g.
1694 ``(source_depth, distance, component)`` for a
1695 :py:class:`Type A<pyrocko.gf.meta.ConfigTypeA>` store. If
1696 ``len(args) == 2``, two location objects are expected, e.g.
1697 ``(source, receiver)`` and the appropriate GF index is computed
1698 internally.
1699 :type \\*args: (:py:class:`tuple`,) or
1700 (:py:class:`~pyrocko.model.Location`,
1701 :py:class:`~pyrocko.model.Location`)
1702 :returns: Phase arrival according to ``timing``
1703 :rtype: float or None
1704 '''
1706 if len(args) == 1:
1707 args = args[0]
1708 else:
1709 args = self.config.make_indexing_args1(*args)
1711 if not isinstance(timing, meta.Timing):
1712 timing = meta.Timing(timing)
1714 return timing.evaluate(self.get_phase, args)
1716 def get_available_interpolation_tables(self):
1717 filenames = glob(op.join(self.store_dir, 'phases', '*'))
1718 return [op.basename(file) for file in filenames]
1720 def get_stored_attribute(self, phase_def, attribute, *args):
1721 '''
1722 Return interpolated store attribute
1724 :param attribute: takeoff_angle / incidence_angle [deg]
1725 :type attribute: str
1726 :param \\*args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1727 ``(source_depth, distance, component)`` as in
1728 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1729 :type \\*args: tuple
1730 '''
1731 try:
1732 return self.get_stored_phase(phase_def, attribute)(*args)
1733 except NoSuchPhase:
1734 raise StoreError(
1735 'Interpolation table for {} of {} does not exist! '
1736 'Available tables: {}'.format(
1737 attribute, phase_def,
1738 self.get_available_interpolation_tables()))
1740 def get_many_stored_attributes(self, phase_def, attribute, coords):
1741 '''
1742 Return interpolated store attribute
1744 :param attribute: takeoff_angle / incidence_angle [deg]
1745 :type attribute: str
        :param coords: :py:class:`numpy.ndarray`, with columns being
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type coords: :py:class:`numpy.ndarray`
1750 '''
1751 try:
1752 return self.get_stored_phase(
1753 phase_def, attribute).interpolate_many(coords)
1754 except NoSuchPhase:
1755 raise StoreError(
1756 'Interpolation table for {} of {} does not exist! '
1757 'Available tables: {}'.format(
1758 attribute, phase_def,
1759 self.get_available_interpolation_tables()))
1761 def make_stored_table(self, attribute, force=False):
1762 '''
1763 Compute tables for selected ray attributes.
1765 :param attribute: phase / takeoff_angle [deg]/ incidence_angle [deg]
1766 :type attribute: str
1768 Tables are computed using the 1D earth model defined in
1769 :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined phase
1770 in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
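
        Usage sketch::

            store.make_stored_table('takeoff_angle', force=True)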
1771 '''
1773 if attribute not in available_stored_tables:
1774 raise StoreError(
1775 'Cannot create attribute table for {}! '
1776 'Supported attribute tables: {}'.format(
1777 attribute, available_stored_tables))
1779 from pyrocko import cake
1780 config = self.config
1782 if not config.tabulated_phases:
1783 return
1785 mod = config.earthmodel_1d
1787 if not mod:
1788 raise StoreError('no earth model found')
1790 if config.earthmodel_receiver_1d:
1791 self.check_earthmodels(config)
1793 for pdef in config.tabulated_phases:
1795 phase_id = pdef.id
1796 phases = pdef.phases
1798 if attribute == 'phase':
1799 ftol = config.deltat * 0.5
1800 horvels = pdef.horizontal_velocities
1801 else:
1802 ftol = config.deltat * 0.01
1804 fn = self.phase_filename(phase_id, attribute)
1806 if os.path.exists(fn) and not force:
1807 logger.info('file already exists: %s' % fn)
1808 continue
1810 def evaluate(args):
1812 nargs = len(args)
1813 if nargs == 2:
1814 receiver_depth, source_depth, distance = (
1815 config.receiver_depth,) + args
1816 elif nargs == 3:
1817 receiver_depth, source_depth, distance = args
1818 else:
1819 raise ValueError(
                        'Number of input arguments %i is not supported! '
                        'Supported number of arguments: 2 or 3' % nargs)
1823 ray_attribute_values = []
1824 arrival_times = []
1825 if phases:
1826 rays = mod.arrivals(
1827 phases=phases,
1828 distances=[distance * cake.m2d],
1829 zstart=source_depth,
1830 zstop=receiver_depth)
1832 for ray in rays:
1833 arrival_times.append(ray.t)
1834 if attribute != 'phase':
1835 ray_attribute_values.append(
1836 getattr(ray, attribute)())
1838 if attribute == 'phase':
1839 for v in horvels:
1840 arrival_times.append(distance / (v * km))
1842 if arrival_times:
1843 if attribute == 'phase':
1844 return min(arrival_times)
1845 else:
1846 earliest_idx = num.argmin(arrival_times)
1847 return ray_attribute_values[earliest_idx]
1848 else:
1849 return None
1851 logger.info(
1852 'making "%s" table for phasegroup "%s"', attribute, phase_id)
1854 ip = spit.SPTree(
1855 f=evaluate,
1856 ftol=ftol,
1857 xbounds=num.transpose((config.mins, config.maxs)),
1858 xtols=config.deltas)
1860 util.ensuredirs(fn)
1861 ip.dump(fn)
1863 def make_timing_params(self, begin, end, snap_vred=True, force=False):
1864 '''
1865 Compute tight parameterized time ranges to include given timings.
1867 Calculates appropriate time ranges to cover given begin and end timings
1868 over all GF points in the store. A dict with the following keys is
1869 returned:
1871 * ``'tmin'``: time [s], minimum of begin timing over all GF points
1872 * ``'tmax'``: time [s], maximum of end timing over all GF points
1873 * ``'vred'``, ``'tmin_vred'``: slope [m/s] and offset [s] of reduction
1874 velocity [m/s] appropriate to catch begin timing over all GF points
1875 * ``'tlenmax_vred'``: maximum time length needed to cover all end
1876 timings, when using linear slope given with (``vred``, ``tmin_vred``)
1877 as start
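
        Usage sketch (assumes phases named ``begin`` and ``end`` have been
        tabulated for this store; the names are illustrative)::

            params = store.make_timing_params('begin', 'end', snap_vred=True)
            tmin, vred = params['tmin'], params['vred']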
1878 '''
1880 data = []
1881 warned = set()
1882 for args in self.config.iter_nodes(level=-1):
1883 tmin = self.t(begin, args)
1884 tmax = self.t(end, args)
1885 if tmin is None:
1886 warned.add(str(begin))
1887 if tmax is None:
1888 warned.add(str(end))
1890 x = self.config.get_surface_distance(args)
1891 data.append((x, tmin, tmax))
1893 if len(warned):
1894 w = ' | '.join(list(warned))
1895 msg = '''determination of time window failed using phase
1896definitions: %s.\n Travel time table contains holes in probed ranges. You can
1897use `fomosto tttlsd` to fix holes.''' % w
1898 if force:
1899 logger.warning(msg)
1900 else:
1901 raise MakeTimingParamsFailed(msg)
1903 xs, tmins, tmaxs = num.array(data, dtype=float).T
1905 tlens = tmaxs - tmins
        if num.all(num.isnan(tmins)):
            raise MakeTimingParamsFailed(
                'determination of time window failed')

        i = num.nanargmin(tmins)
1911 tminmin = tmins[i]
1912 x_tminmin = xs[i]
1913 dx = (xs - x_tminmin)
1914 dx = num.where(dx != 0.0, dx, num.nan)
1915 s = (tmins - tminmin) / dx
1916 sred = num.min(num.abs(s[num.isfinite(s)]))
1918 deltax = self.config.distance_delta
1920 if snap_vred:
1921 tdif = sred*deltax
1922 tdif2 = self.config.deltat * math.floor(tdif / self.config.deltat)
1923 sred = tdif2/self.config.distance_delta
1925 tmin_vred = tminmin - sred*x_tminmin
1926 if snap_vred:
1927 xe = x_tminmin - int(x_tminmin / deltax) * deltax
1928 tmin_vred = float(
1929 self.config.deltat *
1930 math.floor(tmin_vred / self.config.deltat) - xe * sred)
        tlenmax_vred = num.nanmax(tmaxs - (tmin_vred + sred*xs))
1933 if sred != 0.0:
1934 vred = 1.0/sred
1935 else:
1936 vred = 0.0
1938 return dict(
1939 tmin=tminmin,
1940 tmax=num.nanmax(tmaxs),
1941 tlenmax=num.nanmax(tlens),
1942 tmin_vred=tmin_vred,
1943 tlenmax_vred=tlenmax_vred,
1944 vred=vred)
1946 def make_travel_time_tables(self, force=False):
1947 '''
1948 Compute travel time tables.
1950 Travel time tables are computed using the 1D earth model defined in
1951 :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined phase
1952 in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`. The accuracy of
        the tabulated times is adjusted to the sampling rate of the store.
1954 '''
1955 self.make_stored_table(attribute='phase', force=force)
1957 def make_ttt(self, force=False):
1958 self.make_travel_time_tables(force=force)
1960 def make_takeoff_angle_tables(self, force=False):
1961 '''
1962 Compute takeoff-angle tables.
1964 Takeoff-angle tables [deg] are computed using the 1D earth model
1965 defined in :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each
1966 defined phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
        The accuracy of the tabulated times is adjusted to 0.01 times the
1968 sampling rate of the store.
1969 '''
1970 self.make_stored_table(attribute='takeoff_angle', force=force)
1972 def make_incidence_angle_tables(self, force=False):
1973 '''
1974 Compute incidence-angle tables.
1976 Incidence-angle tables [deg] are computed using the 1D earth model
1977 defined in :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each
1978 defined phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
        The accuracy of the tabulated times is adjusted to 0.01 times the
1980 sampling rate of the store.
1981 '''
1982 self.make_stored_table(attribute='incidence_angle', force=force)
1984 def get_provided_components(self):
1986 scheme_desc = meta.component_scheme_to_description[
1987 self.config.component_scheme]
1989 quantity = self.config.stored_quantity \
1990 or scheme_desc.default_stored_quantity
1992 if not quantity:
1993 return scheme_desc.provided_components
1994 else:
1995 return [
1996 quantity + '.' + comp
1997 for comp in scheme_desc.provided_components]
1999 def fix_ttt_holes(self, phase_id):
2001 pdef = self.config.get_tabulated_phase(phase_id)
2002 mode = None
2003 for phase in pdef.phases:
2004 for leg in phase.legs():
2005 if mode is None:
2006 mode = leg.mode
2008 else:
2009 if mode != leg.mode:
2010 raise StoreError(
2011 'Can only fix holes in pure P or pure S phases.')
2013 sptree = self.get_stored_phase(phase_id)
2014 sptree_lsd = self.config.fix_ttt_holes(sptree, mode)
2016 phase_lsd = phase_id + '.lsd'
2017 fn = self.phase_filename(phase_lsd)
2018 sptree_lsd.dump(fn)
2020 def statics(self, source, multi_location, itsnapshot, components,
2021 interpolation='nearest_neighbor', nthreads=0):
2022 if not self._f_index:
2023 self.open()
2025 out = {}
2026 ntargets = multi_location.ntargets
2027 source_terms = source.get_source_terms(self.config.component_scheme)
2028 # TODO: deal with delays for snapshots > 1 sample
2030 if itsnapshot is not None:
2031 delays = source.times
2033 # Fringe case where we sample at sample 0 and sample 1
2034 tsnapshot = itsnapshot * self.config.deltat
2035 if delays.max() == tsnapshot and delays.min() != tsnapshot:
2036 delays[delays == delays.max()] -= self.config.deltat
2038 else:
2039 delays = source.times * 0
2040 itsnapshot = 1
2042 if ntargets == 0:
2043 raise StoreError('MultiLocation.coords5 is empty')
2045 res = store_ext.store_calc_static(
2046 self.cstore,
2047 source.coords5(),
2048 source_terms,
2049 delays,
2050 multi_location.coords5,
2051 self.config.component_scheme,
2052 interpolation,
2053 itsnapshot,
2054 nthreads)
2056 out = {}
2057 for icomp, (comp, comp_res) in enumerate(
2058 zip(self.get_provided_components(), res)):
2059 if comp not in components:
2060 continue
2061 out[comp] = res[icomp]
2063 return out
2065 def calc_seismograms(self, source, receivers, components, deltat=None,
2066 itmin=None, nsamples=None,
2067 interpolation='nearest_neighbor',
2068 optimization='enable', nthreads=1):
2069 config = self.config
2071 assert interpolation in ['nearest_neighbor', 'multilinear'], \
2072 'Unknown interpolation: %s' % interpolation
2074 if not isinstance(receivers, list):
2075 receivers = [receivers]
2077 if deltat is None:
2078 decimate = 1
2079 else:
2080 decimate = int(round(deltat/config.deltat))
2081 if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
2082 raise StoreError(
2083 'unavailable decimation ratio target.deltat / store.deltat'
2084 ' = %g / %g' % (deltat, config.deltat))
2086 store, decimate_ = self._decimated_store(decimate)
2088 if not store._f_index:
2089 store.open()
2091 scheme = config.component_scheme
2093 source_coords_arr = source.coords5()
2094 source_terms = source.get_source_terms(scheme)
2095 delays = source.times.ravel()
2097 nreceiver = len(receivers)
2098 receiver_coords_arr = num.empty((nreceiver, 5))
2099 for irec, rec in enumerate(receivers):
2100 receiver_coords_arr[irec, :] = rec.coords5
2102 dt = self.get_deltat()
2104 itoffset = int(num.floor(delays.min()/dt)) if delays.size != 0 else 0
2106 if itmin is None:
2107 itmin = num.zeros(nreceiver, dtype=num.int32)
2108 else:
2109 itmin = (itmin-itoffset).astype(num.int32)
2111 if nsamples is None:
2112 nsamples = num.zeros(nreceiver, dtype=num.int32) - 1
2113 else:
2114 nsamples = nsamples.astype(num.int32)
2116 try:
2117 results = store_ext.store_calc_timeseries(
2118 store.cstore,
2119 source_coords_arr,
2120 source_terms,
2121 (delays - itoffset*dt),
2122 receiver_coords_arr,
2123 scheme,
2124 interpolation,
2125 itmin,
2126 nsamples,
2127 nthreads)
2128 except Exception as e:
2129 raise e
2131 provided_components = self.get_provided_components()
2132 ncomponents = len(provided_components)
2134 seismograms = [dict() for _ in range(nreceiver)]
2135 for ires in range(len(results)):
2136 res = results.pop(0)
2137 ireceiver = ires // ncomponents
2139 comp = provided_components[res[-2]]
2141 if comp not in components:
2142 continue
2144 tr = GFTrace(*res[:-2])
2145 tr.deltat = config.deltat * decimate
2146 tr.itmin += itoffset
2148 tr.n_records_stacked = 0
2149 tr.t_optimize = 0.
2150 tr.t_stack = 0.
2151 tr.err = res[-1]
2153 seismograms[ireceiver][comp] = tr
2155 return seismograms
2157 def seismogram(self, source, receiver, components, deltat=None,
2158 itmin=None, nsamples=None,
2159 interpolation='nearest_neighbor',
2160 optimization='enable', nthreads=1):
2162 config = self.config
2164 if deltat is None:
2165 decimate = 1
2166 else:
2167 decimate = int(round(deltat/config.deltat))
2168 if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
2169 raise StoreError(
2170 'unavailable decimation ratio target.deltat / store.deltat'
2171 ' = %g / %g' % (deltat, config.deltat))
2173 store, decimate_ = self._decimated_store(decimate)
2175 if not store._f_index:
2176 store.open()
2178 scheme = config.component_scheme
2180 source_coords_arr = source.coords5()
2181 source_terms = source.get_source_terms(scheme)
2182 receiver_coords_arr = receiver.coords5[num.newaxis, :].copy()
2184 try:
2185 params = store_ext.make_sum_params(
2186 store.cstore,
2187 source_coords_arr,
2188 source_terms,
2189 receiver_coords_arr,
2190 scheme,
2191 interpolation, nthreads)
2193 except store_ext.StoreExtError:
2194 raise meta.OutOfBounds()
2196 provided_components = self.get_provided_components()
2198 out = {}
2199 for icomp, comp in enumerate(provided_components):
2200 if comp in components:
2201 weights, irecords = params[icomp]
2203 neach = irecords.size // source.times.size
2204 delays = num.repeat(source.times, neach)
2206 tr = store._sum(
2207 irecords, delays, weights, itmin, nsamples, decimate_,
2208 'c', optimization)
2210 # to prevent problems with rounding errors (BaseStore saves
2211 # deltat as a 4-byte floating point value, value from YAML
2212 # config is more accurate)
2213 tr.deltat = config.deltat * decimate
2215 out[comp] = tr
2217 return out
2220__all__ = '''
2221gf_dtype
2222NotMultipleOfSamplingInterval
2223GFTrace
2224CannotCreate
2225CannotOpen
2226DuplicateInsert
2227NotAllowedToInterpolate
2228NoSuchExtra
2229NoSuchPhase
2230BaseStore
2231Store
2232SeismosizerErrorEnum
2233'''.split()