1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
5from __future__ import absolute_import, division
7import errno
8import time
9import os
10import struct
11import weakref
12import math
13import shutil
14try:
15 import fcntl
16except ImportError:
17 fcntl = None
18import copy
19import logging
20import re
21import hashlib
22from glob import glob
24import numpy as num
25from scipy import signal
27from . import meta
28from .error import StoreError
29try:
30 from . import store_ext
31except ImportError:
32 store_ext = None
33from pyrocko import util, spit
logger = logging.getLogger('pyrocko.gf.store')

# shorthand for os.path
op = os.path

# gf store endianness
E = '<'

# dtype used for GF samples in memory
gf_dtype = num.dtype(num.float32)

# dtype used for GF samples on disk: little-endian float32
gf_dtype_store = num.dtype(E + 'f4')

gf_dtype_nbytes_per_sample = 4

# layout of the index file header
gf_store_header_dtype = [
    ('nrecords', E + 'u8'),
    ('deltat', E + 'f4'),
]

# struct format equivalent of gf_store_header_dtype
gf_store_header_fmt = E + 'Qf'
gf_store_header_fmt_size = struct.calcsize(gf_store_header_fmt)

# layout of one record in the index file (see the format notes further down
# in this module for the meaning of the fields and special values)
gf_record_dtype = num.dtype([
    ('data_offset', E + 'u8'),
    ('itmin', E + 'i4'),
    ('nsamples', E + 'u4'),
    ('begin_value', E + 'f4'),
    ('end_value', E + 'f4'),
])

available_stored_tables = ['phase', 'takeoff_angle', 'incidence_angle']

km = 1000.  # [m]
class SeismosizerErrorEnum:
    '''
    Numeric error codes used in :py:attr:`GFTrace.err`.

    NOTE(review): these values appear to mirror the error enumeration of the
    ``store_ext`` C extension — confirm against the C source when changing.
    '''
    SUCCESS = 0
    INVALID_RECORD = 1
    EMPTY_RECORD = 2
    BAD_RECORD = 3
    ALLOC_FAILED = 4
    BAD_REQUEST = 5
    BAD_DATA_OFFSET = 6
    READ_DATA_FAILED = 7
    SEEK_INDEX_FAILED = 8
    READ_INDEX_FAILED = 9
    FSTAT_TRACES_FAILED = 10
    BAD_STORE = 11
    MMAP_INDEX_FAILED = 12
    MMAP_TRACES_FAILED = 13
    INDEX_OUT_OF_BOUNDS = 14
    NTARGETS_OUT_OF_BOUNDS = 15
def valid_string_id(s):
    '''
    Check if ``s`` is a valid string ID.

    :returns: a truthy :py:class:`re.Match` object if ``s`` matches
        ``meta.StringID.pattern``, ``None`` otherwise
    '''
    return re.match(meta.StringID.pattern, s)
def check_string_id(s):
    '''
    Raise :py:exc:`ValueError` if ``s`` is not a valid string ID.
    '''
    if valid_string_id(s):
        return

    raise ValueError('invalid name %s' % s)
94# - data_offset
95#
96# Data file offset of first sample in bytes (the seek value).
97# Special values:
98#
99# 0x00 - missing trace
100# 0x01 - zero trace (GF trace is physically zero)
101# 0x02 - short trace of 1 or 2 samples (no need for data allocation)
102#
103# - itmin
104#
105# Time of first sample of the trace as a multiple of the sampling interval. All
106# GF samples must be evaluated at multiples of the sampling interval with
107# respect to a global reference time zero. Must be set also for zero traces.
108#
109# - nsamples
110#
111# Number of samples in the GF trace. Should be set to zero for zero traces.
112#
113# - begin_value, end_value
114#
# Values of first and last sample. These values are included in data[]
# redundantly.
class NotMultipleOfSamplingInterval(Exception):
    '''
    Raised when a given time is not a multiple of the sampling interval.
    '''
    pass
# relative tolerance used when checking that a given tmin is a multiple of
# the sampling interval
sampling_check_eps = 1e-5
class GFTrace(object):
    '''
    Green's Function trace class for handling traces from the GF store.
    '''

    @classmethod
    def from_trace(cls, tr):
        '''
        Create a :py:class:`GFTrace` from a :py:class:`pyrocko.trace.Trace`.
        '''
        return cls(data=tr.ydata.copy(), tmin=tr.tmin, deltat=tr.deltat)

    def __init__(self, data=None, itmin=None, deltat=1.0,
                 is_zero=False, begin_value=None, end_value=None, tmin=None):
        '''
        :param data: sample values (converted to ``gf_dtype``); ``None``
            yields an empty trace with zero begin/end values
        :param itmin: time of first sample as multiple of ``deltat``
        :param tmin: time of first sample [s]; exactly one of ``tmin`` and
            ``itmin`` must be given
        :raises NotMultipleOfSamplingInterval: if ``tmin`` is not (within
            ``sampling_check_eps``) a multiple of ``deltat``
        '''

        assert sum((x is None) for x in (tmin, itmin)) == 1, \
            'GFTrace: either tmin or itmin must be given'

        if tmin is not None:
            itmin = int(round(tmin / deltat))
            if abs(itmin*deltat - tmin) > sampling_check_eps*deltat:
                raise NotMultipleOfSamplingInterval(
                    'GFTrace: tmin (%g) is not a multiple of sampling '
                    'interval (%g)' % (tmin, deltat))

        if data is not None:
            data = num.asarray(data, dtype=gf_dtype)
        else:
            data = num.array([], dtype=gf_dtype)
            begin_value = 0.0
            end_value = 0.0

        # NOTE(review): weakref.ref(data)() immediately dereferences the
        # fresh weak reference, so this is effectively ``self.data = data``
        # — confirm intent before simplifying.
        self.data = weakref.ref(data)()
        self.itmin = itmin
        self.deltat = deltat
        self.is_zero = is_zero
        # stacking statistics, filled in by BaseStore._sum()
        self.n_records_stacked = 0.
        self.t_stack = 0.
        self.t_optimize = 0.
        self.err = SeismosizerErrorEnum.SUCCESS

        if data is not None and data.size > 0:
            # default begin/end values are the first/last samples
            if begin_value is None:
                begin_value = data[0]
            if end_value is None:
                end_value = data[-1]

        self.begin_value = begin_value
        self.end_value = end_value

    @property
    def t(self):
        '''
        Time vector of the GF trace.

        :returns: Time vector
        :rtype: :py:class:`numpy.ndarray`
        '''
        return num.linspace(
            self.itmin*self.deltat,
            (self.itmin+self.data.size-1)*self.deltat, self.data.size)

    def __str__(self, itmin=0):
        # columns before itmin-aligned data are padded with blanks, so
        # several traces printed with the same itmin line up
        if self.is_zero:
            return 'ZERO'

        s = []
        for i in range(itmin, self.itmin + self.data.size):
            if i >= self.itmin and i < self.itmin + self.data.size:
                s.append('%7.4g' % self.data[i-self.itmin])
            else:
                s.append(' '*7)

        return '|'.join(s)

    def to_trace(self, net, sta, loc, cha):
        '''
        Convert to a :py:class:`pyrocko.trace.Trace` with the given codes.
        '''
        from pyrocko import trace
        return trace.Trace(
            net, sta, loc, cha,
            ydata=self.data,
            deltat=self.deltat,
            tmin=self.itmin*self.deltat)
class GFValue(object):
    '''
    Container for a scalar Green's function value.

    Carries the same stacking-statistics attributes as :py:class:`GFTrace`
    so both result types can be handled uniformly.
    '''

    def __init__(self, value):
        self.value = value
        for stat_attr in ('n_records_stacked', 't_stack', 't_optimize'):
            setattr(self, stat_attr, 0.)
# shared singleton representing an all-zero GF trace
Zero = GFTrace(is_zero=True, itmin=0)
def make_same_span(tracesdict):
    '''
    Extend traces in a dict so that they all cover a common time span.

    Traces shorter than the common span are padded with their repeating
    first/last sample values; zero traces become zero-filled traces of the
    common span. If all traces are zero, the :py:data:`Zero` singleton is
    returned for every key.

    :param tracesdict: mapping of keys to :py:class:`GFTrace` objects
    :returns: new dict with same keys and span-aligned traces
    '''

    traces = tracesdict.values()

    nonzero = [tr for tr in traces if not tr.is_zero]
    if not nonzero:
        return {k: Zero for k in tracesdict.keys()}

    # common span is derived from the non-zero traces only
    deltat = nonzero[0].deltat

    itmin = min(tr.itmin for tr in nonzero)
    itmax = max(tr.itmin+tr.data.size for tr in nonzero) - 1

    out = {}
    for k, tr in tracesdict.items():
        n = itmax - itmin + 1
        if tr.itmin != itmin or tr.data.size != n:
            data = num.zeros(n, dtype=gf_dtype)
            if not tr.is_zero:
                lo = tr.itmin-itmin
                hi = lo + tr.data.size
                # pad with the repeating end point values
                data[:lo] = tr.data[0]
                data[lo:hi] = tr.data
                data[hi:] = tr.data[-1]

            tr = GFTrace(data, itmin, deltat)

        out[k] = tr

    return out
class CannotCreate(StoreError):
    '''
    Raised when creation of a store file or directory fails.
    '''
    pass
class CannotOpen(StoreError):
    '''
    Raised when a GF store cannot be opened.
    '''
    pass
class DuplicateInsert(StoreError):
    '''
    Raised when inserting into a record which is already occupied.
    '''
    pass
class ShortRead(StoreError):
    '''
    Raised when fewer samples than expected could be read from disk.
    '''
    def __str__(self):
        return 'unexpected end of data (truncated traces file?)'
class NotAllowedToInterpolate(StoreError):
    '''
    Raised when interpolation would be required but is not permitted.
    '''
    def __str__(self):
        return 'not allowed to interpolate'
class NoSuchExtra(StoreError):
    '''
    Raised when a requested extra-information key does not exist.
    '''
    def __init__(self, s):
        StoreError.__init__(self)
        self.value = s  # the missing key

    def __str__(self):
        return 'extra information for key "%s" not found.' % self.value
class NoSuchPhase(StoreError):
    '''
    Raised when a requested stored phase table does not exist.
    '''
    def __init__(self, s):
        StoreError.__init__(self)
        self.value = s  # the missing phase key

    def __str__(self):
        return 'phase for key "%s" not found. ' \
            'Running "fomosto ttt" may be needed.' % self.value
def remove_if_exists(fn, force=False):
    '''
    Delete file ``fn`` if present, or complain about its existence.

    :param fn: path of the file
    :param force: if ``True``, silently remove an existing file; otherwise
        raise :py:exc:`CannotCreate` when the file is already there
    '''
    if not os.path.exists(fn):
        return

    if not force:
        raise CannotCreate('file %s already exists' % fn)

    os.remove(fn)
def get_extra_path(store_dir, key):
    '''
    Get path of the file holding extra information stored under ``key``.

    :raises ValueError: if ``key`` is not a valid string ID
    '''
    check_string_id(key)
    return os.path.join(store_dir, 'extra', key)
class BaseStore(object):
    '''
    Low-level part of the GF storage machinery.

    Handles the on-disk index and traces files, record-level get/put, and
    delayed-and-weighted summation of traces, optionally delegating to the
    ``store_ext`` C extension.
    '''

    @staticmethod
    def lock_fn_(store_dir):
        return os.path.join(store_dir, 'lock')

    @staticmethod
    def index_fn_(store_dir):
        return os.path.join(store_dir, 'index')

    @staticmethod
    def data_fn_(store_dir):
        return os.path.join(store_dir, 'traces')

    @staticmethod
    def config_fn_(store_dir):
        return os.path.join(store_dir, 'config')

    @staticmethod
    def create(store_dir, deltat, nrecords, force=False):
        '''
        Create empty index and traces files for a new store.

        :param deltat: sampling interval [s]
        :param nrecords: number of (initially empty) records to allocate
        :param force: overwrite existing files if ``True``
        '''
        try:
            util.ensuredir(store_dir)
        except Exception:
            raise CannotCreate('cannot create directory %s' % store_dir)

        index_fn = BaseStore.index_fn_(store_dir)
        data_fn = BaseStore.data_fn_(store_dir)

        for fn in (index_fn, data_fn):
            remove_if_exists(fn, force)

        with open(index_fn, 'wb') as f:
            f.write(struct.pack(gf_store_header_fmt, nrecords, deltat))
            records = num.zeros(nrecords, dtype=gf_record_dtype)
            records.tofile(f)

        with open(data_fn, 'wb') as f:
            # reserve space so that real data offsets never collide with the
            # special data_offset values 0, 1 and 2
            f.write(b'\0' * 32)

    def __init__(self, store_dir, mode='r', use_memmap=True):
        # NOTE(review): ``mode in 'rw'`` is a substring test, it also
        # accepts '' and 'rw' — confirm whether that is intended.
        assert mode in 'rw'
        self.store_dir = store_dir
        self.mode = mode
        self._use_memmap = use_memmap
        self._nrecords = None
        self._deltat = None
        self._f_index = None
        self._f_data = None
        self._records = None
        self.cstore = None

    def open(self):
        '''
        Open index and traces files and read the index header.
        '''
        assert self._f_index is None
        index_fn = self.index_fn()
        data_fn = self.data_fn()

        if self.mode == 'r':
            fmode = 'rb'
        elif self.mode == 'w':
            fmode = 'r+b'
        else:
            assert False, 'invalid mode: %s' % self.mode

        try:
            self._f_index = open(index_fn, fmode)
            self._f_data = open(data_fn, fmode)
        except Exception:
            self.mode = ''
            raise CannotOpen('cannot open gf store: %s' % self.store_dir)

        try:
            # replace single precision deltat value in store with the double
            # precision value from the config, if available
            self.cstore = store_ext.store_init(
                self._f_index.fileno(), self._f_data.fileno(),
                self.get_deltat() or 0.0)

        except store_ext.StoreExtError as e:
            raise StoreError(str(e))

        while True:
            try:
                dataheader = self._f_index.read(gf_store_header_fmt_size)
                break

            except IOError as e:
                # occasionally got this one on an NFS volume
                if e.errno == errno.EBUSY:
                    time.sleep(0.01)
                else:
                    raise

        nrecords, deltat = struct.unpack(gf_store_header_fmt, dataheader)
        self._nrecords = nrecords
        self._deltat = deltat

        self._load_index()

    def __del__(self):
        if self.mode != '':
            self.close()

    def lock(self):
        '''
        Acquire an exclusive lock on the store.

        Uses ``fcntl.lockf`` on the index file where available, a lock file
        otherwise.
        '''
        if not fcntl:
            lock_fn = self.lock_fn()
            util.create_lockfile(lock_fn)
        else:
            if not self._f_index:
                self.open()

            while True:
                try:
                    fcntl.lockf(self._f_index, fcntl.LOCK_EX)
                    break

                except IOError as e:
                    # retry if the lock table is temporarily exhausted
                    if e.errno == errno.ENOLCK:
                        time.sleep(0.01)
                    else:
                        raise

    def unlock(self):
        '''
        Release the lock acquired with :py:meth:`lock`.
        '''
        if not fcntl:
            lock_fn = self.lock_fn()
            util.delete_lockfile(lock_fn)
        else:
            # flush before unlocking so other processes see consistent data
            self._f_data.flush()
            self._f_index.flush()
            fcntl.lockf(self._f_index, fcntl.LOCK_UN)

    def put(self, irecord, trace):
        self._put(irecord, trace)

    def get_record(self, irecord):
        return self._get_record(irecord)

    def get_span(self, irecord, decimate=1):
        return self._get_span(irecord, decimate=decimate)

    def get(self, irecord, itmin=None, nsamples=None, decimate=1,
            implementation='c'):
        return self._get(irecord, itmin, nsamples, decimate, implementation)

    def sum(self, irecords, delays, weights, itmin=None,
            nsamples=None, decimate=1, implementation='c',
            optimization='enable'):
        return self._sum(irecords, delays, weights, itmin, nsamples, decimate,
                         implementation, optimization)

    def irecord_format(self):
        # zero-padded format string wide enough for all record numbers
        return util.zfmt(self._nrecords)

    def str_irecord(self, irecord):
        return self.irecord_format() % irecord

    def close(self):
        '''
        Write back the index (in write mode) and close all files.
        '''
        if self.mode == 'w':
            if not self._f_index:
                self.open()
            self._save_index()

        if self._f_data:
            self._f_data.close()
            self._f_data = None

        if self._f_index:
            self._f_index.close()
            self._f_index = None

        del self._records
        self._records = None

        self.mode = ''

    def _get_record(self, irecord):
        if not self._f_index:
            self.open()

        return self._records[irecord]

    def _get(self, irecord, itmin, nsamples, decimate, implementation):
        '''
        Retrieve complete GF trace from storage.
        '''

        if not self._f_index:
            self.open()

        if not self.mode == 'r':
            raise StoreError('store not open in read mode')

        # fast path: C extension (only for undecimated access)
        if implementation == 'c' and decimate == 1:

            if nsamples is None:
                nsamples = -1

            if itmin is None:
                itmin = 0

            try:
                return GFTrace(*store_ext.store_get(
                    self.cstore, int(irecord), int(itmin), int(nsamples)))
            except store_ext.StoreExtError as e:
                raise StoreError(str(e))

        else:
            return self._get_impl_reference(irecord, itmin, nsamples, decimate)

    def get_deltat(self):
        return self._deltat

    def _get_impl_reference(self, irecord, itmin, nsamples, decimate):
        '''
        Pure-Python reference implementation of :py:meth:`_get`.
        '''
        deltat = self.get_deltat()

        if not (0 <= irecord < self._nrecords):
            raise StoreError('invalid record number requested '
                             '(irecord = %i, nrecords = %i)' %
                             (irecord, self._nrecords))

        ipos, itmin_data, nsamples_data, begin_value, end_value = \
            self._records[irecord]

        if None in (itmin, nsamples):
            # no window requested: use the full stored span
            itmin = itmin_data
            itmax = itmin_data + nsamples_data - 1
            nsamples = nsamples_data
        else:
            # window given in decimated sampling; convert to store sampling
            itmin = itmin * decimate
            nsamples = nsamples * decimate
            itmax = itmin + nsamples - decimate

        if ipos == 0:
            # special data_offset value 0: missing trace
            return None

        elif ipos == 1:
            # special data_offset value 1: physically zero trace
            itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
            return GFTrace(is_zero=True, itmin=itmin_ext//decimate)

        if decimate == 1:
            ilo = max(itmin, itmin_data) - itmin_data
            ihi = min(itmin+nsamples, itmin_data+nsamples_data) - itmin_data
            data = self._get_data(ipos, begin_value, end_value, ilo, ihi)

            return GFTrace(data, itmin=itmin_data+ilo, deltat=deltat,
                           begin_value=begin_value, end_value=end_value)

        else:
            itmax_data = itmin_data + nsamples_data - 1

            # put begin and end to multiples of new sampling rate
            itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
            itmax_ext = -((-min(itmax, itmax_data))//decimate) * decimate
            nsamples_ext = itmax_ext - itmin_ext + 1

            # add some padding for the aa filter
            order = 30
            itmin_ext_pad = itmin_ext - order//2
            itmax_ext_pad = itmax_ext + order//2
            nsamples_ext_pad = itmax_ext_pad - itmin_ext_pad + 1

            itmin_overlap = max(itmin_data, itmin_ext_pad)
            itmax_overlap = min(itmax_data, itmax_ext_pad)

            ilo = itmin_overlap - itmin_ext_pad
            ihi = max(ilo, itmax_overlap - itmin_ext_pad + 1)
            ilo_data = itmin_overlap - itmin_data
            ihi_data = max(ilo_data, itmax_overlap - itmin_data + 1)

            data_ext_pad = num.empty(nsamples_ext_pad, dtype=gf_dtype)
            data_ext_pad[ilo:ihi] = self._get_data(
                ipos, begin_value, end_value, ilo_data, ihi_data)

            # extend with the repeating end point values
            data_ext_pad[:ilo] = begin_value
            data_ext_pad[ihi:] = end_value

            # anti-alias lowpass, then pick every decimate-th sample
            b = signal.firwin(order + 1, 1. / decimate, window='hamming')
            a = 1.
            data_filt_pad = signal.lfilter(b, a, data_ext_pad)
            data_deci = data_filt_pad[order:order+nsamples_ext:decimate]
            if data_deci.size >= 1:
                # pin boundary samples to the exact stored begin/end values
                if itmin_ext <= itmin_data:
                    data_deci[0] = begin_value

                if itmax_ext >= itmax_data:
                    data_deci[-1] = end_value

            return GFTrace(data_deci, itmin_ext//decimate,
                           deltat*decimate,
                           begin_value=begin_value, end_value=end_value)

    def _get_span(self, irecord, decimate=1):
        '''
        Get temporal extent of GF trace at given index.
        '''

        if not self._f_index:
            self.open()

        assert 0 <= irecord < self._nrecords, \
            'irecord = %i, nrecords = %i' % (irecord, self._nrecords)

        (_, itmin, nsamples, _, _) = self._records[irecord]

        itmax = itmin + nsamples - 1

        if decimate == 1:
            return itmin, itmax
        else:
            if nsamples == 0:
                # empty trace: return an empty (inverted) span
                return itmin//decimate, itmin//decimate - 1
            else:
                return itmin//decimate, -((-itmax)//decimate)

    def _put(self, irecord, trace):
        '''
        Save GF trace to storage.
        '''

        if not self._f_index:
            self.open()

        deltat = self.get_deltat()

        assert self.mode == 'w'
        assert trace.is_zero or \
            abs(trace.deltat - deltat) < 1e-7 * deltat
        assert 0 <= irecord < self._nrecords, \
            'irecord = %i, nrecords = %i' % (irecord, self._nrecords)

        if self._records[irecord][0] != 0:
            raise DuplicateInsert('record %i already in store' % irecord)

        if trace.is_zero or num.all(trace.data == 0.0):
            # special data_offset value 1: zero trace, no data written
            self._records[irecord] = (1, trace.itmin, 0, 0., 0.)
            return

        ndata = trace.data.size

        if ndata > 2:
            # append samples at end of traces file
            self._f_data.seek(0, 2)
            ipos = self._f_data.tell()
            trace.data.astype(gf_dtype_store).tofile(self._f_data)
        else:
            # special data_offset value 2: 1-2 samples are fully contained
            # in the record's begin_value/end_value fields
            ipos = 2

        self._records[irecord] = (ipos, trace.itmin, ndata,
                                  trace.data[0], trace.data[-1])

    def _sum_impl_alternative(self, irecords, delays, weights, itmin, nsamples,
                              decimate):

        '''
        Sum delayed and weighted GF traces.
        '''

        if not self._f_index:
            self.open()

        assert self.mode == 'r'

        deltat = self.get_deltat() * decimate

        if len(irecords) == 0:
            if None in (itmin, nsamples):
                return Zero
            else:
                return GFTrace(
                    num.zeros(nsamples, dtype=gf_dtype), itmin,
                    deltat, is_zero=True)

        assert len(irecords) == len(delays)
        assert len(irecords) == len(weights)

        if None in (itmin, nsamples):
            # derive output span from the delayed spans of all inputs
            itmin_delayed, itmax_delayed = [], []
            for irecord, delay in zip(irecords, delays):
                itmin, itmax = self._get_span(irecord, decimate=decimate)
                itmin_delayed.append(itmin + delay/deltat)
                itmax_delayed.append(itmax + delay/deltat)

            itmin = int(math.floor(min(itmin_delayed)))
            nsamples = int(math.ceil(max(itmax_delayed))) - itmin + 1

        out = num.zeros(nsamples, dtype=gf_dtype)
        if nsamples == 0:
            return GFTrace(out, itmin, deltat)

        for ii in range(len(irecords)):
            irecord = irecords[ii]
            delay = delays[ii]
            weight = weights[ii]

            if weight == 0.0:
                continue

            # non-integer delays are handled by linear interpolation between
            # the floor- and ceil-shifted contributions
            idelay_floor = int(math.floor(delay/deltat))
            idelay_ceil = int(math.ceil(delay/deltat))

            gf_trace = self._get(
                irecord,
                itmin - idelay_ceil,
                nsamples + idelay_ceil - idelay_floor,
                decimate,
                'reference')

            assert gf_trace.itmin >= itmin - idelay_ceil
            assert gf_trace.data.size <= nsamples + idelay_ceil - idelay_floor

            if gf_trace.is_zero:
                continue

            ilo = gf_trace.itmin - itmin + idelay_floor
            ihi = ilo + gf_trace.data.size + (idelay_ceil-idelay_floor)

            data = gf_trace.data

            if idelay_floor == idelay_ceil:
                out[ilo:ihi] += data * weight
            else:
                if data.size:
                    k = 1
                    if ihi <= nsamples:
                        out[ihi-1] += gf_trace.end_value * \
                            ((idelay_ceil-delay/deltat) * weight)
                        k = 0

                    out[ilo+1:ihi-k] += data[:data.size-k] * \
                        ((delay/deltat-idelay_floor) * weight)
                    k = 1
                    if ilo >= 0:
                        out[ilo] += gf_trace.begin_value * \
                            ((delay/deltat-idelay_floor) * weight)
                        k = 0

                    out[ilo+k:ihi-1] += data[k:] * \
                        ((idelay_ceil-delay/deltat) * weight)

            # account for the repeating end point values outside the
            # stored span of the trace
            if ilo > 0 and gf_trace.begin_value != 0.0:
                out[:ilo] += gf_trace.begin_value * weight

            if ihi < nsamples and gf_trace.end_value != 0.0:
                out[ihi:] += gf_trace.end_value * weight

        return GFTrace(out, itmin, deltat)

    def _sum_impl_reference(self, irecords, delays, weights, itmin, nsamples,
                            decimate):
        '''
        Reference implementation of delayed, weighted summation.

        Builds a 2D array with one padded row per contribution and sums
        over rows; simple but memory-hungry compared to the alternatives.
        '''
        if not self._f_index:
            self.open()

        deltat = self.get_deltat() * decimate

        if len(irecords) == 0:
            if None in (itmin, nsamples):
                return Zero
            else:
                return GFTrace(
                    num.zeros(nsamples, dtype=gf_dtype), itmin,
                    deltat, is_zero=True)

        datas = []
        itmins = []
        for i, delay, weight in zip(irecords, delays, weights):
            tr = self._get(i, None, None, decimate, 'reference')

            idelay_floor = int(math.floor(delay/deltat))
            idelay_ceil = int(math.ceil(delay/deltat))

            if idelay_floor == idelay_ceil:
                itmins.append(tr.itmin + idelay_floor)
                datas.append(tr.data.copy()*weight)
            else:
                # non-integer delay: split into two interpolated parts
                itmins.append(tr.itmin + idelay_floor)
                datas.append(tr.data.copy()*weight*(idelay_ceil-delay/deltat))
                itmins.append(tr.itmin + idelay_ceil)
                datas.append(tr.data.copy()*weight*(delay/deltat-idelay_floor))

        itmin_all = min(itmins)

        itmax_all = max(itmin_ + data.size for (itmin_, data) in
                        zip(itmins, datas))

        if itmin is not None:
            itmin_all = min(itmin_all, itmin)
        if nsamples is not None:
            itmax_all = max(itmax_all, itmin+nsamples)

        nsamples_all = itmax_all - itmin_all

        arr = num.zeros((len(datas), nsamples_all), dtype=gf_dtype)
        for i, itmin_, data in zip(num.arange(len(datas)), itmins, datas):
            if data.size > 0:
                ilo = itmin_-itmin_all
                ihi = ilo + data.size
                # pad each row with its repeating end point values
                arr[i, :ilo] = data[0]
                arr[i, ilo:ihi] = data
                arr[i, ihi:] = data[-1]

        sum_arr = arr.sum(axis=0)

        if itmin is not None and nsamples is not None:
            ilo = itmin-itmin_all
            ihi = ilo + nsamples
            sum_arr = sum_arr[ilo:ihi]

        else:
            itmin = itmin_all

        return GFTrace(sum_arr, itmin, deltat)

    def _optimize(self, irecords, delays, weights):
        '''
        Merge duplicate (record, integer delay) contributions.

        Each entry is split into its floor- and ceil-delay neighbours with
        linearly interpolated weights, then equal (record, delay) pairs are
        combined by summing their weights.
        '''
        if num.unique(irecords).size == irecords.size:
            # all records distinct: nothing to merge
            return irecords, delays, weights

        deltat = self.get_deltat()

        delays = delays / deltat
        irecords2 = num.repeat(irecords, 2)
        delays2 = num.empty(irecords2.size, dtype=float)
        delays2[0::2] = num.floor(delays)
        delays2[1::2] = num.ceil(delays)
        weights2 = num.repeat(weights, 2)
        weights2[0::2] *= 1.0 - (delays - delays2[0::2])
        # second factor zeroes the ceil part when the delay is integral
        weights2[1::2] *= (1.0 - (delays2[1::2] - delays)) * \
            (delays2[1::2] - delays2[0::2])

        delays2 *= deltat

        iorder = num.lexsort((delays2, irecords2))

        irecords2 = irecords2[iorder]
        delays2 = delays2[iorder]
        weights2 = weights2[iorder]

        # mark the first element of each group of equal (record, delay)
        ui = num.empty(irecords2.size, dtype=bool)
        ui[1:] = num.logical_or(num.diff(irecords2) != 0,
                                num.diff(delays2) != 0.)

        ui[0] = 0
        ind2 = num.cumsum(ui)
        ui[0] = 1
        ind1 = num.where(ui)[0]

        irecords3 = irecords2[ind1]
        delays3 = delays2[ind1]
        weights3 = num.bincount(ind2, weights2)

        return irecords3, delays3, weights3

    def _sum(self, irecords, delays, weights, itmin, nsamples, decimate,
             implementation, optimization):
        '''
        Dispatch delayed, weighted summation to the chosen implementation
        and record timing statistics on the result.
        '''
        if not self._f_index:
            self.open()

        t0 = time.time()
        if optimization == 'enable':
            irecords, delays, weights = self._optimize(
                irecords, delays, weights)
        else:
            assert optimization == 'disable'

        t1 = time.time()
        deltat = self.get_deltat()

        if implementation == 'c' and decimate == 1:
            # shift delays so the C extension sees non-negative offsets
            if delays.size != 0:
                itoffset = int(num.floor(num.min(delays)/deltat))
            else:
                itoffset = 0

            if nsamples is None:
                nsamples = -1

            if itmin is None:
                itmin = 0
            else:
                itmin -= itoffset

            try:
                tr = GFTrace(*store_ext.store_sum(
                    self.cstore, irecords.astype(num.uint64),
                    delays - itoffset*deltat,
                    weights.astype(num.float32),
                    int(itmin), int(nsamples)))

                tr.itmin += itoffset

            except store_ext.StoreExtError as e:
                raise StoreError(str(e) + ' in store %s' % self.store_dir)

        elif implementation == 'alternative':
            tr = self._sum_impl_alternative(irecords, delays, weights,
                                            itmin, nsamples, decimate)

        else:
            tr = self._sum_impl_reference(irecords, delays, weights,
                                          itmin, nsamples, decimate)

        t2 = time.time()

        tr.n_records_stacked = irecords.size
        tr.t_optimize = t1 - t0
        tr.t_stack = t2 - t1

        return tr

    def _sum_statics(self, irecords, delays, weights, it, ntargets,
                     nthreads):
        # static (single-sample) summation, delegated to the C extension
        if not self._f_index:
            self.open()

        return store_ext.store_sum_static(
            self.cstore, irecords, delays, weights, it, ntargets, nthreads)

    def _load_index(self):
        # map or read the record table following the index file header
        if self._use_memmap:
            records = num.memmap(
                self._f_index, dtype=gf_record_dtype,
                offset=gf_store_header_fmt_size,
                mode=('r', 'r+')[self.mode == 'w'])

        else:
            self._f_index.seek(gf_store_header_fmt_size)
            records = num.fromfile(self._f_index, dtype=gf_record_dtype)

        assert len(records) == self._nrecords

        self._records = records

    def _save_index(self):
        # rewrite header and flush the record table to disk
        self._f_index.seek(0)
        self._f_index.write(struct.pack(gf_store_header_fmt, self._nrecords,
                                        self.get_deltat()))

        if self._use_memmap:
            self._records.flush()
        else:
            self._f_index.seek(gf_store_header_fmt_size)
            self._records.tofile(self._f_index)
            self._f_index.flush()

    def _get_data(self, ipos, begin_value, end_value, ilo, ihi):
        '''
        Read samples ``ilo:ihi`` of the trace whose data starts at file
        offset ``ipos`` (``ipos == 2`` means a short trace stored entirely
        in the record's begin/end values).
        '''
        if ihi - ilo > 0:
            if ipos == 2:
                data_orig = num.empty(2, dtype=gf_dtype)
                data_orig[0] = begin_value
                data_orig[1] = end_value
                return data_orig[ilo:ihi]
            else:
                self._f_data.seek(
                    int(ipos + ilo*gf_dtype_nbytes_per_sample))
                arr = num.fromfile(
                    self._f_data, gf_dtype_store, ihi-ilo).astype(gf_dtype)

                if arr.size != ihi-ilo:
                    raise ShortRead()
                return arr
        else:
            return num.empty((0,), dtype=gf_dtype)

    def lock_fn(self):
        return BaseStore.lock_fn_(self.store_dir)

    def index_fn(self):
        return BaseStore.index_fn_(self.store_dir)

    def data_fn(self):
        return BaseStore.data_fn_(self.store_dir)

    def config_fn(self):
        return BaseStore.config_fn_(self.store_dir)

    def count_special_records(self):
        '''
        Count records by kind of their ``data_offset``:
        [missing, zero, short, normal].
        '''
        if not self._f_index:
            self.open()

        return num.histogram(
            self._records['data_offset'],
            bins=[0, 1, 2, 3, num.array(-1).astype(num.uint64)])[0]

    @property
    def size_index(self):
        # size of the index file [bytes]
        return os.stat(self.index_fn()).st_size

    @property
    def size_data(self):
        # size of the traces file [bytes]
        return os.stat(self.data_fn()).st_size

    @property
    def size_index_and_data(self):
        return self.size_index + self.size_data

    @property
    def size_index_and_data_human(self):
        return util.human_bytesize(self.size_index_and_data)

    def stats(self):
        '''
        Gather record counts and file sizes (see :py:attr:`stats_keys`).
        '''
        counter = self.count_special_records()

        stats = dict(
            total=self._nrecords,
            inserted=(counter[1] + counter[2] + counter[3]),
            empty=counter[0],
            short=counter[2],
            zero=counter[1],
            size_data=self.size_data,
            size_index=self.size_index,
        )

        return stats

    stats_keys = 'total inserted empty short zero size_data size_index'.split()
def remake_dir(dpath, force):
    '''
    Create directory ``dpath``, replacing any existing one if ``force``.

    :raises CannotCreate: if the directory exists and ``force`` is false
    '''
    if os.path.exists(dpath):
        if not force:
            raise CannotCreate('Directory "%s" already exists.' % dpath)

        shutil.rmtree(dpath)

    os.mkdir(dpath)
class MakeTimingParamsFailed(StoreError):
    '''
    Raised when computation of timing parameters fails.
    '''
    pass
1037class Store(BaseStore):
1039 '''
1040 Green's function disk storage and summation machine.
1042 The `Store` can be used to efficiently store, retrieve, and sum Green's
1043 function traces. A `Store` contains many 1D time traces sampled at even
1044 multiples of a global sampling rate, where each time trace has an
1045 individual start and end time. The traces are treated as having repeating
1046 end points, so the functions they represent can be non-constant only
1047 between begin and end time. It provides capabilities to retrieve decimated
1048 traces and to extract parts of the traces. The main purpose of this class
    is to provide a fast, easy to use, and flexible mechanism to compute
1050 weighted delay-and-sum stacks with many Green's function traces involved.
1052 Individual Green's functions are accessed through a single integer index at
1053 low level. On top of that, various indexing schemes can be implemented by
1054 providing a mapping from physical coordinates to the low level index `i`.
1055 E.g. for a problem with cylindrical symmetry, one might define a mapping
1056 from the coordinates (`receiver_depth`, `source_depth`, `distance`) to the
1057 low level index. Index translation is done in the
1058 :py:class:`pyrocko.gf.meta.Config` subclass object associated with the
1059 Store. It is accessible through the store's :py:attr:`config` attribute,
1060 and contains all meta information about the store, including physical
1061 extent, geometry, sampling rate, and information about the type of the
1062 stored Green's functions. Information about the underlying earth model
1063 can also be found there.
1065 A GF store can also contain tabulated phase arrivals. In basic cases, these
1066 can be created with the :py:meth:`make_travel_time_tables` and evaluated
1067 with the :py:func:`t` methods.
1069 .. attribute:: config
1071 The :py:class:`pyrocko.gf.meta.Config` derived object associated with
1072 the store which contains all meta information about the store and
1073 provides the high-level to low-level index mapping.
1075 .. attribute:: store_dir
1077 Path to the store's data directory.
1079 .. attribute:: mode
1081 The mode in which the store is opened: ``'r'``: read-only, ``'w'``:
1082 writeable.
1083 '''
    @classmethod
    def create(cls, store_dir, config, force=False, extra=None):
        '''
        Create new GF store.

        Creates a new GF store at path ``store_dir``. The layout of the GF is
        defined with the parameters given in ``config``, which should be an
        object of a subclass of :py:class:`~pyrocko.gf.meta.Config`. This
        function will refuse to overwrite an existing GF store, unless
        ``force`` is set to ``True``. If more information, e.g. parameters
        used for the modelling code, earth models or other, should be saved
        along with the GF store, these may be provided through a dict given
        to ``extra``. The keys of this dict must be names and the values must
        be *guts* type objects.

        :param store_dir: GF store path
        :type store_dir: str
        :param config: GF store Config
        :type config: :py:class:`~pyrocko.gf.meta.Config`
        :param force: Force overwrite, defaults to ``False``
        :type force: bool
        :param extra: Extra information
        :type extra: dict or None
        '''

        cls.create_editables(store_dir, config, force=force, extra=extra)
        cls.create_dependants(store_dir, force=force)

        return cls(store_dir)
    @staticmethod
    def create_editables(store_dir, config, force=False, extra=None):
        '''
        Create the hand-editable files of a new GF store (config, extras).

        :returns: list of paths of the files created
        '''
        try:
            util.ensuredir(store_dir)
        except Exception:
            raise CannotCreate('cannot create directory %s' % store_dir)

        fns = []

        config_fn = os.path.join(store_dir, 'config')
        remove_if_exists(config_fn, force)
        meta.dump(config, filename=config_fn)

        fns.append(config_fn)

        for sub_dir in ['extra']:
            dpath = os.path.join(store_dir, sub_dir)
            remake_dir(dpath, force)

        if extra:
            for k, v in extra.items():
                fn = get_extra_path(store_dir, k)
                remove_if_exists(fn, force)
                meta.dump(v, filename=fn)

                fns.append(fn)

        return fns
    @staticmethod
    def create_dependants(store_dir, force=False):
        '''
        Create the index/traces files and work directories derived from the
        store's config.
        '''
        config_fn = os.path.join(store_dir, 'config')
        config = meta.load(filename=config_fn)

        BaseStore.create(store_dir, config.deltat, config.nrecords,
                         force=force)

        for sub_dir in ['decimated']:
            dpath = os.path.join(store_dir, sub_dir)
            remake_dir(dpath, force)
    def __init__(self, store_dir, mode='r', use_memmap=True):
        '''
        Open existing GF store at path ``store_dir``.

        :param mode: ``'r'`` for read-only, ``'w'`` for writeable access
        :raises StoreError: if no config file is found in ``store_dir``
        '''
        BaseStore.__init__(self, store_dir, mode=mode, use_memmap=use_memmap)
        config_fn = self.config_fn()
        if not os.path.isfile(config_fn):
            raise StoreError(
                'directory "%s" does not seem to contain a GF store '
                '("config" file not found)' % store_dir)
        self.load_config()

        self._decimated = {}
        self._extra = {}
        self._phases = {}
        # register already existing decimated variants of this store
        for decimate in range(2, 9):
            if os.path.isdir(self._decimated_store_dir(decimate)):
                self._decimated[decimate] = None
    def open(self):
        # open index/data files (if not yet open) and initialize the index
        # mapping scheme of the C extension from the store's config
        if not self._f_index:
            BaseStore.open(self)
            c = self.config

            mscheme = 'type_' + c.short_type.lower()
            store_ext.store_mapping_init(
                self.cstore, mscheme,
                c.mins, c.maxs, c.deltas, c.ns.astype(num.uint64),
                c.ncomponents)
1183 def save_config(self, make_backup=False):
1184 config_fn = self.config_fn()
1185 if make_backup:
1186 os.rename(config_fn, config_fn + '~')
1188 meta.dump(self.config, filename=config_fn)
    def get_deltat(self):
        # deltat from the config (double precision) overrides the single
        # precision value stored in the index file header
        return self.config.deltat
    def load_config(self):
        # (re)read the store's config file from disk
        self.config = meta.load(filename=self.config_fn())
    def ensure_reference(self, force=False):
        '''
        Make sure the store's config contains a ``reference`` entry.

        The reference is built from the store id and the first six characters
        of its uuid. With ``force``, an existing reference is regenerated.
        '''
        if self.config.reference is not None and not force:
            return
        self.ensure_uuid()
        reference = '%s-%s' % (self.config.id, self.config.uuid[0:6])

        if self.config.reference is not None:
            self.config.reference = reference
            self.save_config()
        else:
            # append to the config file to preserve its hand-edited layout
            with open(self.config_fn(), 'a') as config:
                config.write('reference: %s\n' % reference)
            self.load_config()
    def ensure_uuid(self, force=False):
        '''
        Make sure the store's config contains a ``uuid`` entry.

        The uuid is a hash over the store's contents (see
        :py:meth:`create_store_hash`). With ``force``, an existing uuid is
        regenerated.
        '''
        if self.config.uuid is not None and not force:
            return
        uuid = self.create_store_hash()

        if self.config.uuid is not None:
            self.config.uuid = uuid
            self.save_config()
        else:
            # append to the config file to preserve its hand-edited layout
            with open(self.config_fn(), 'a') as config:
                config.write('\nuuid: %s\n' % uuid)
            self.load_config()
    def create_store_hash(self):
        '''
        Compute a sha1 hash over samples of the traces file and the config.

        Only 4 kB out of every ~10 MB of the traces file are hashed, to keep
        this fast even for very large stores.
        '''
        logger.info('creating hash for store ...')
        m = hashlib.sha1()

        traces_size = op.getsize(self.data_fn())
        with open(self.data_fn(), 'rb') as traces:
            while traces.tell() < traces_size:
                m.update(traces.read(4096))
                traces.seek(1024 * 1024 * 10, 1)

        with open(self.config_fn(), 'rb') as config:
            m.update(config.read())
        return m.hexdigest()
1237 def get_extra_path(self, key):
1238 return get_extra_path(self.store_dir, key)
1240 def get_extra(self, key):
1241 '''
1242 Get extra information stored under given key.
1243 '''
1245 x = self._extra
1246 if key not in x:
1247 fn = self.get_extra_path(key)
1248 if not os.path.exists(fn):
1249 raise NoSuchExtra(key)
1251 x[key] = meta.load(filename=fn)
1253 return x[key]
1255 def upgrade(self):
1256 '''
1257 Upgrade store config and files to latest Pyrocko version.
1258 '''
1259 fns = [os.path.join(self.store_dir, 'config')]
1260 for key in self.extra_keys():
1261 fns.append(self.get_extra_path(key))
1263 i = 0
1264 for fn in fns:
1265 i += util.pf_upgrade(fn)
1267 return i
1269 def extra_keys(self):
1270 return [x for x in os.listdir(os.path.join(self.store_dir, 'extra'))
1271 if valid_string_id(x)]
1273 def put(self, args, trace):
1274 '''
1275 Insert trace into GF store.
1277 Store a single GF trace at (high-level) index ``args``.
1279 :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1280 ``(source_depth, distance, component)`` as in
1281 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1282 :type args: tuple
1283 :returns: GF trace at ``args``
1284 :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
1285 '''
1287 irecord = self.config.irecord(*args)
1288 self._put(irecord, trace)
1290 def get_record(self, args):
1291 irecord = self.config.irecord(*args)
1292 return self._get_record(irecord)
1294 def str_irecord(self, args):
1295 return BaseStore.str_irecord(self, self.config.irecord(*args))
1297 def get(self, args, itmin=None, nsamples=None, decimate=1,
1298 interpolation='nearest_neighbor', implementation='c'):
1299 '''
1300 Retrieve GF trace from store.
1302 Retrieve a single GF trace from the store at (high-level) index
1303 ``args``. By default, the full trace is retrieved. Given ``itmin`` and
1304 ``nsamples``, only the selected portion of the trace is extracted. If
1305 ``decimate`` is an integer in the range [2,8], the trace is decimated
1306 on the fly or, if available, the trace is read from a decimated version
1307 of the GF store.
1309 :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1310 ``(source_depth, distance, component)`` as in
1311 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1312 :type args: tuple
1313 :param itmin: Start time index (start time is ``itmin * dt``),
1314 defaults to None
1315 :type itmin: int or None
1316 :param nsamples: Number of samples, defaults to None
1317 :type nsamples: int or None
1318 :param decimate: Decimatation factor, defaults to 1
1319 :type decimate: int
1320 :param interpolation: Interpolation method
1321 ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
1322 ``'nearest_neighbor'``
1323 :type interpolation: str
1324 :param implementation: Implementation to use ``['c', 'reference']``,
1325 defaults to ``'c'``.
1326 :type implementation: str
1328 :returns: GF trace at ``args``
1329 :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
1330 '''
1332 store, decimate = self._decimated_store(decimate)
1333 if interpolation == 'nearest_neighbor':
1334 irecord = store.config.irecord(*args)
1335 tr = store._get(irecord, itmin, nsamples, decimate,
1336 implementation)
1338 elif interpolation == 'off':
1339 irecords, weights = store.config.vicinity(*args)
1340 if len(irecords) != 1:
1341 raise NotAllowedToInterpolate()
1342 else:
1343 tr = store._get(irecords[0], itmin, nsamples, decimate,
1344 implementation)
1346 elif interpolation == 'multilinear':
1347 tr = store._sum(irecords, num.zeros(len(irecords)), weights,
1348 itmin, nsamples, decimate, implementation,
1349 'disable')
1351 # to prevent problems with rounding errors (BaseStore saves deltat
1352 # as a 4-byte floating point value, value from YAML config is more
1353 # accurate)
1354 tr.deltat = self.config.deltat * decimate
1355 return tr
    def sum(self, args, delays, weights, itmin=None, nsamples=None,
            decimate=1, interpolation='nearest_neighbor', implementation='c',
            optimization='enable'):
        '''
        Sum delayed and weighted GF traces.

        Calculate sum of delayed and weighted GF traces. ``args`` is a tuple of
        arrays forming the (high-level) indices of the GF traces to be
        selected. Delays and weights for the summation are given in the arrays
        ``delays`` and ``weights``. ``itmin`` and ``nsamples`` can be given to
        restrict the computation to a given time interval. If ``decimate`` is
        an integer in the range [2,8], decimated traces are used in the
        summation.

        :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type args: tuple(numpy.ndarray)
        :param delays: Delay times
        :type delays: :py:class:`numpy.ndarray`
        :param weights: Trace weights
        :type weights: :py:class:`numpy.ndarray`
        :param itmin: Start time index (start time is ``itmin * dt``),
            defaults to None
        :type itmin: int or None
        :param nsamples: Number of samples, defaults to None
        :type nsamples: int or None
        :param decimate: Decimation factor, defaults to 1
        :type decimate: int
        :param interpolation: Interpolation method
            ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
            ``'nearest_neighbor'``
        :type interpolation: str
        :param implementation: Implementation to use,
            ``['c', 'alternative', 'reference']``, where ``'alternative'``
            and ``'reference'`` use a Python implementation, defaults to `'c'`
        :type implementation: str
        :param optimization: Optimization mode ``['enable', 'disable']``,
            defaults to ``'enable'``
        :type optimization: str
        :returns: Stacked GF trace.
        :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
        '''

        # decimate_ is the residual on-the-fly decimation: 1 if a
        # pre-decimated sub-store is used, otherwise the full factor.
        store, decimate_ = self._decimated_store(decimate)

        if interpolation == 'nearest_neighbor':
            irecords = store.config.irecords(*args)
        else:
            assert interpolation == 'multilinear'
            irecords, ip_weights = store.config.vicinities(*args)
            # Each requested node expands to ``neach`` neighboring records;
            # replicate user delays/weights accordingly and fold in the
            # interpolation weights.
            neach = irecords.size // args[0].size
            weights = num.repeat(weights, neach) * ip_weights
            delays = num.repeat(delays, neach)

        tr = store._sum(irecords, delays, weights, itmin, nsamples, decimate_,
                        implementation, optimization)

        # to prevent problems with rounding errors (BaseStore saves deltat
        # as a 4-byte floating point value, value from YAML config is more
        # accurate)
        tr.deltat = self.config.deltat * decimate
        return tr
    def make_decimated(self, decimate, config=None, force=False,
                       show_progress=False):
        '''
        Create decimated version of GF store.

        Create a downsampled version of the GF store. Downsampling is done for
        the integer factor ``decimate`` which should be in the range [2,8]. If
        ``config`` is ``None``, all traces of the GF store are decimated and
        held available (i.e. the index mapping of the original store is used),
        otherwise, a different spacial stepping can be specified by giving a
        modified GF store configuration in ``config`` (see :py:meth:`create`).
        Decimated GF sub-stores are created under the ``decimated``
        subdirectory within the GF store directory. Holding available decimated
        versions of the GF store can save computation time, IO bandwidth, or
        decrease memory footprint at the cost of increased disk space usage,
        when computation are done for lower frequency signals.

        :param decimate: Decimate factor
        :type decimate: int
        :param config: GF store config object, defaults to None
        :type config: :py:class:`~pyrocko.gf.meta.Config` or None
        :param force: Force overwrite, defaults to ``False``
        :type force: bool
        :param show_progress: Show progress, defaults to ``False``
        :type show_progress: bool
        '''

        if not self._f_index:
            self.open()

        if not (2 <= decimate <= 8):
            raise StoreError('decimate argument must be in the range [2,8]')

        assert self.mode == 'r'

        if config is None:
            config = self.config

        # Work on a copy; the sub-store gets a reduced sample rate.
        config = copy.deepcopy(config)
        config.sample_rate = self.config.sample_rate / decimate

        # Invalidate any cached sub-store handle for this factor.
        if decimate in self._decimated:
            del self._decimated[decimate]

        store_dir = self._decimated_store_dir(decimate)
        if os.path.exists(store_dir):
            if force:
                shutil.rmtree(store_dir)
            else:
                raise CannotCreate('store already exists at %s' % store_dir)

        # Build into a '-incomplete' directory first; only a fully written
        # sub-store is moved into place (atomic on same filesystem).
        store_dir_incomplete = store_dir + '-incomplete'
        Store.create(store_dir_incomplete, config, force=force)

        decimated = Store(store_dir_incomplete, 'w')
        try:
            if show_progress:
                pbar = util.progressbar(
                    'decimating store', self.config.nrecords)

            for i, args in enumerate(decimated.config.iter_nodes()):
                tr = self.get(args, decimate=decimate)
                decimated.put(args, tr)

                if show_progress:
                    pbar.update(i+1)

        finally:
            if show_progress:
                pbar.finish()

        decimated.close()

        shutil.move(store_dir_incomplete, store_dir)

        # Register availability; handle is opened lazily on first use.
        self._decimated[decimate] = None
1498 def stats(self):
1499 stats = BaseStore.stats(self)
1500 stats['decimated'] = sorted(self._decimated.keys())
1501 return stats
    # Extend the base class' statistics keys with the 'decimated' entry
    # reported by Store.stats().
    stats_keys = BaseStore.stats_keys + ['decimated']
    def check(self, show_progress=False):
        '''
        Check travel time tables for holes and trace data for corruption.

        :param show_progress: Show progress, defaults to ``False``
        :type show_progress: bool
        :returns: number of problems found in the trace data
        :rtype: int
        '''
        have_holes = []
        for pdef in self.config.tabulated_phases:
            phase_id = pdef.id
            ph = self.get_stored_phase(phase_id)
            if ph.check_holes():
                have_holes.append(phase_id)

        if have_holes:
            for phase_id in have_holes:
                logger.warning(
                    'Travel time table of phase "{}" contains holes. You can '
                    ' use `fomosto tttlsd` to fix holes.'.format(
                        phase_id))
        else:
            logger.info('No holes in travel time tables')

        try:
            if show_progress:
                pbar = util.progressbar('checking store', self.config.nrecords)

            problems = 0
            for i, args in enumerate(self.config.iter_nodes()):
                tr = self.get(args)
                if tr and not tr.is_zero:
                    # The stored begin/end values must match the first/last
                    # samples; a mismatch hints at data corruption.
                    if not tr.begin_value == tr.data[0]:
                        logger.warning('wrong begin value for trace at %s '
                                       '(data corruption?)' % str(args))
                        problems += 1
                    if not tr.end_value == tr.data[-1]:
                        logger.warning('wrong end value for trace at %s '
                                       '(data corruption?)' % str(args))
                        problems += 1
                    if not num.all(num.isfinite(tr.data)):
                        logger.warning(
                            'nans or infs in trace at %s' % str(args))
                        problems += 1

                if show_progress:
                    pbar.update(i+1)

        finally:
            if show_progress:
                pbar.finish()

        return problems
1552 def check_earthmodels(self, config):
1553 if config.earthmodel_receiver_1d.profile('z')[-1] not in\
1554 config.earthmodel_1d.profile('z'):
1555 logger.warning('deepest layer of earthmodel_receiver_1d not '
1556 'found in earthmodel_1d')
1558 def _decimated_store_dir(self, decimate):
1559 return os.path.join(self.store_dir, 'decimated', str(decimate))
1561 def _decimated_store(self, decimate):
1562 if decimate == 1 or decimate not in self._decimated:
1563 return self, decimate
1564 else:
1565 store = self._decimated[decimate]
1566 if store is None:
1567 store = Store(self._decimated_store_dir(decimate), 'r')
1568 self._decimated[decimate] = store
1570 return store, 1
1572 def phase_filename(self, phase_id, attribute='phase'):
1573 check_string_id(phase_id)
1574 return os.path.join(
1575 self.store_dir, 'phases', phase_id + '.%s' % attribute)
1577 def get_phase_identifier(self, phase_id, attribute):
1578 return '{}.{}'.format(phase_id, attribute)
1580 def get_stored_phase(self, phase_def, attribute='phase'):
1581 '''
1582 Get stored phase from GF store.
1584 :returns: Phase information
1585 :rtype: :py:class:`pyrocko.spit.SPTree`
1586 '''
1588 phase_id = self.get_phase_identifier(phase_def, attribute)
1589 if phase_id not in self._phases:
1590 fn = self.phase_filename(phase_def, attribute)
1591 if not os.path.isfile(fn):
1592 raise NoSuchPhase(phase_id)
1594 spt = spit.SPTree(filename=fn)
1595 self._phases[phase_id] = spt
1597 return self._phases[phase_id]
    def get_phase(self, phase_def):
        '''
        Get an arrival-time evaluator for a phase definition.

        ``phase_def`` may carry a provider prefix: ``'stored:<id>'``,
        ``'vel:<km/s>'``, ``'vel_surface:<km/s>'``, ``'cake:<def>'`` or
        ``'iaspei:<def>'``. Without a prefix, ``'stored'`` is assumed.

        :returns: callable mapping a config index tuple to an arrival time
            [s] (or ``None`` if no arrival is found), or the stored SPTree
            for the ``'stored'`` provider
        '''
        toks = phase_def.split(':', 1)
        if len(toks) == 2:
            provider, phase_def = toks
        else:
            provider, phase_def = 'stored', toks[0]

        if provider == 'stored':
            return self.get_stored_phase(phase_def)

        elif provider == 'vel':
            # Constant velocity, given in km/s; converted to m/s here.
            vel = float(phase_def) * 1000.

            def evaluate(args):
                return self.config.get_distance(args) / vel

            return evaluate

        elif provider == 'vel_surface':
            # Constant velocity applied to surface distance [km/s -> m/s].
            vel = float(phase_def) * 1000.

            def evaluate(args):
                return self.config.get_surface_distance(args) / vel

            return evaluate

        elif provider in ('cake', 'iaspei'):
            from pyrocko import cake
            mod = self.config.earthmodel_1d
            if provider == 'cake':
                phases = [cake.PhaseDef(phase_def)]
            else:
                phases = cake.PhaseDef.classic(phase_def)

            def evaluate(args):
                # Accept (source_depth, distance) with the configured
                # receiver depth, or a full (zr, zs, x) triple.
                if len(args) == 2:
                    zr, zs, x = (self.config.receiver_depth,) + args
                elif len(args) == 3:
                    zr, zs, x = args
                else:
                    assert False

                t = []
                if phases:
                    rays = mod.arrivals(
                        phases=phases,
                        distances=[x*cake.m2d],
                        zstart=zs,
                        zstop=zr)

                    for ray in rays:
                        t.append(ray.t)

                # Earliest arrival of all matching rays; None if no ray.
                if t:
                    return min(t)
                else:
                    return None

            return evaluate

        raise StoreError('unsupported phase provider: %s' % provider)
1661 def t(self, timing, *args):
1662 '''
1663 Compute interpolated phase arrivals.
1665 **Examples:**
1667 If ``test_store`` is of :py:class:`~pyrocko.gf.meta.ConfigTypeA`::
1669 test_store.t('p', (1000, 10000))
1670 test_store.t('last{P|Pdiff}', (1000, 10000)) # The latter arrival
1671 # of P or diffracted
1672 # P phase
1674 If ``test_store`` is of :py:class:`~pyrocko.gf.meta.ConfigTypeB`::
1676 test_store.t('S', (1000, 1000, 10000))
1677 test_store.t('first{P|p|Pdiff|sP}', (1000, 1000, 10000)) # The
1678 ` # first arrival of
1679 # the given phases is
1680 # selected
1682 :param timing: Timing string as described above
1683 :type timing: str or :py:class:`~pyrocko.gf.meta.Timing`
1684 :param \\*args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1685 ``(source_depth, distance, component)`` as in
1686 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1687 :type \\*args: tuple
1688 :returns: Phase arrival according to ``timing``
1689 :rtype: float or None
1690 '''
1692 if len(args) == 1:
1693 args = args[0]
1694 else:
1695 args = self.config.make_indexing_args1(*args)
1697 if not isinstance(timing, meta.Timing):
1698 timing = meta.Timing(timing)
1700 return timing.evaluate(self.get_phase, args)
1702 def get_available_interpolation_tables(self):
1703 filenames = glob(op.join(self.store_dir, 'phases', '*'))
1704 return [op.basename(file) for file in filenames]
1706 def get_stored_attribute(self, phase_def, attribute, *args):
1707 '''
1708 Return interpolated store attribute
1710 :param attribute: takeoff_angle / incidence_angle [deg]
1711 :type attribute: str
1712 :param \\*args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1713 ``(source_depth, distance, component)`` as in
1714 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1715 :type \\*args: tuple
1716 '''
1717 try:
1718 return self.get_stored_phase(phase_def, attribute)(*args)
1719 except NoSuchPhase:
1720 raise StoreError(
1721 'Interpolation table for {} of {} does not exist! '
1722 'Available tables: {}'.format(
1723 attribute, phase_def,
1724 self.get_available_interpolation_tables()))
1726 def get_many_stored_attributes(self, phase_def, attribute, coords):
1727 '''
1728 Return interpolated store attribute
1730 :param attribute: takeoff_angle / incidence_angle [deg]
1731 :type attribute: str
1732 :param \\coords: :py:class:`num.array.Array`, with columns being
1733 ``(source_depth, distance, component)`` as in
1734 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1735 :type \\coords: :py:class:`num.array.Array`
1736 '''
1737 try:
1738 return self.get_stored_phase(
1739 phase_def, attribute).interpolate_many(coords)
1740 except NoSuchPhase:
1741 raise StoreError(
1742 'Interpolation table for {} of {} does not exist! '
1743 'Available tables: {}'.format(
1744 attribute, phase_def,
1745 self.get_available_interpolation_tables()))
    def make_stored_table(self, attribute, force=False):
        '''
        Compute tables for selected ray attributes.

        :param attribute: phase / takeoff_angle [deg]/ incidence_angle [deg]
        :type attribute: str
        :param force: recompute and overwrite existing table files
        :type force: bool

        Tables are computed using the 1D earth model defined in
        :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined phase
        in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
        '''

        if attribute not in available_stored_tables:
            raise StoreError(
                'Cannot create attribute table for {}! '
                'Supported attribute tables: {}'.format(
                    attribute, available_stored_tables))

        from pyrocko import cake
        config = self.config

        if not config.tabulated_phases:
            return

        mod = config.earthmodel_1d

        if not mod:
            raise StoreError('no earth model found')

        if config.earthmodel_receiver_1d:
            self.check_earthmodels(config)

        for pdef in config.tabulated_phases:

            phase_id = pdef.id
            phases = pdef.phases

            # Interpolation tolerance: half a sample for travel times,
            # a much tighter tolerance for angle tables.
            if attribute == 'phase':
                ftol = config.deltat * 0.5
                horvels = pdef.horizontal_velocities
            else:
                ftol = config.deltat * 0.01

            fn = self.phase_filename(phase_id, attribute)

            if os.path.exists(fn) and not force:
                logger.info('file already exists: %s' % fn)
                continue

            # NOTE: evaluate closes over the current loop iteration's
            # phases/horvels; it is consumed by SPTree within this
            # iteration, so late binding is not a problem here.
            def evaluate(args):

                nargs = len(args)
                if nargs == 2:
                    receiver_depth, source_depth, distance = (
                        config.receiver_depth,) + args
                elif nargs == 3:
                    receiver_depth, source_depth, distance = args
                else:
                    raise ValueError(
                        'Number of input arguments %i is not supported!'
                        'Supported number of arguments: 2 or 3' % nargs)

                ray_attribute_values = []
                arrival_times = []
                if phases:
                    rays = mod.arrivals(
                        phases=phases,
                        distances=[distance * cake.m2d],
                        zstart=source_depth,
                        zstop=receiver_depth)

                    for ray in rays:
                        arrival_times.append(ray.t)
                        if attribute != 'phase':
                            ray_attribute_values.append(
                                getattr(ray, attribute)())

                # Travel time tables also consider plain horizontal
                # velocities given in the phase definition.
                if attribute == 'phase':
                    for v in horvels:
                        arrival_times.append(distance / (v * km))

                # Report the attribute of the earliest arrival; None if
                # no arrival was found at this node.
                if arrival_times:
                    if attribute == 'phase':
                        return min(arrival_times)
                    else:
                        earliest_idx = num.argmin(arrival_times)
                        return ray_attribute_values[earliest_idx]
                else:
                    return None

            logger.info(
                'making "%s" table for phasegroup "%s"', attribute, phase_id)

            ip = spit.SPTree(
                f=evaluate,
                ftol=ftol,
                xbounds=num.transpose((config.mins, config.maxs)),
                xtols=config.deltas)

            util.ensuredirs(fn)
            ip.dump(fn)
1849 def make_timing_params(self, begin, end, snap_vred=True, force=False):
1850 '''
1851 Compute tight parameterized time ranges to include given timings.
1853 Calculates appropriate time ranges to cover given begin and end timings
1854 over all GF points in the store. A dict with the following keys is
1855 returned:
1857 * ``'tmin'``: time [s], minimum of begin timing over all GF points
1858 * ``'tmax'``: time [s], maximum of end timing over all GF points
1859 * ``'vred'``, ``'tmin_vred'``: slope [m/s] and offset [s] of reduction
1860 velocity [m/s] appropriate to catch begin timing over all GF points
1861 * ``'tlenmax_vred'``: maximum time length needed to cover all end
1862 timings, when using linear slope given with (``vred``, ``tmin_vred``)
1863 as start
1864 '''
1866 data = []
1867 warned = set()
1868 for args in self.config.iter_nodes(level=-1):
1869 tmin = self.t(begin, args)
1870 tmax = self.t(end, args)
1871 if tmin is None:
1872 warned.add(str(begin))
1873 if tmax is None:
1874 warned.add(str(end))
1876 x = self.config.get_surface_distance(args)
1877 data.append((x, tmin, tmax))
1879 if len(warned):
1880 w = ' | '.join(list(warned))
1881 msg = '''determination of time window failed using phase
1882definitions: %s.\n Travel time table contains holes in probed ranges. You can
1883use `fomosto tttlsd` to fix holes.''' % w
1884 if force:
1885 logger.warning(msg)
1886 else:
1887 raise MakeTimingParamsFailed(msg)
1889 xs, tmins, tmaxs = num.array(data, dtype=float).T
1891 tlens = tmaxs - tmins
1893 i = num.nanargmin(tmins)
1894 if not num.isfinite(i):
1895 raise MakeTimingParamsFailed('determination of time window failed')
1897 tminmin = tmins[i]
1898 x_tminmin = xs[i]
1899 dx = (xs - x_tminmin)
1900 dx = num.where(dx != 0.0, dx, num.nan)
1901 s = (tmins - tminmin) / dx
1902 sred = num.min(num.abs(s[num.isfinite(s)]))
1904 deltax = self.config.distance_delta
1906 if snap_vred:
1907 tdif = sred*deltax
1908 tdif2 = self.config.deltat * math.floor(tdif / self.config.deltat)
1909 sred = tdif2/self.config.distance_delta
1911 tmin_vred = tminmin - sred*x_tminmin
1912 if snap_vred:
1913 xe = x_tminmin - int(x_tminmin / deltax) * deltax
1914 tmin_vred = float(
1915 self.config.deltat *
1916 math.floor(tmin_vred / self.config.deltat) - xe * sred)
1918 tlenmax_vred = num.nanmax(tmax - (tmin_vred + sred*x))
1919 if sred != 0.0:
1920 vred = 1.0/sred
1921 else:
1922 vred = 0.0
1924 return dict(
1925 tmin=tminmin,
1926 tmax=num.nanmax(tmaxs),
1927 tlenmax=num.nanmax(tlens),
1928 tmin_vred=tmin_vred,
1929 tlenmax_vred=tlenmax_vred,
1930 vred=vred)
1932 def make_travel_time_tables(self, force=False):
1933 '''
1934 Compute travel time tables.
1936 Travel time tables are computed using the 1D earth model defined in
1937 :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined phase
1938 in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`. The accuracy of
1939 the tablulated times is adjusted to the sampling rate of the store.
1940 '''
1941 self.make_stored_table(attribute='phase', force=force)
1943 def make_ttt(self, force=False):
1944 self.make_travel_time_tables(force=force)
1946 def make_takeoff_angle_tables(self, force=False):
1947 '''
1948 Compute takeoff-angle tables.
1950 Takeoff-angle tables [deg] are computed using the 1D earth model
1951 defined in :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each
1952 defined phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
1953 The accuracy of the tablulated times is adjusted to 0.01 times the
1954 sampling rate of the store.
1955 '''
1956 self.make_stored_table(attribute='takeoff_angle', force=force)
1958 def make_incidence_angle_tables(self, force=False):
1959 '''
1960 Compute incidence-angle tables.
1962 Incidence-angle tables [deg] are computed using the 1D earth model
1963 defined in :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each
1964 defined phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
1965 The accuracy of the tablulated times is adjusted to 0.01 times the
1966 sampling rate of the store.
1967 '''
1968 self.make_stored_table(attribute='incidence_angle', force=force)
1970 def get_provided_components(self):
1972 scheme_desc = meta.component_scheme_to_description[
1973 self.config.component_scheme]
1975 quantity = self.config.stored_quantity \
1976 or scheme_desc.default_stored_quantity
1978 if not quantity:
1979 return scheme_desc.provided_components
1980 else:
1981 return [
1982 quantity + '.' + comp
1983 for comp in scheme_desc.provided_components]
1985 def fix_ttt_holes(self, phase_id):
1987 pdef = self.config.get_tabulated_phase(phase_id)
1988 mode = None
1989 for phase in pdef.phases:
1990 for leg in phase.legs():
1991 if mode is None:
1992 mode = leg.mode
1994 else:
1995 if mode != leg.mode:
1996 raise StoreError(
1997 'Can only fix holes in pure P or pure S phases.')
1999 sptree = self.get_stored_phase(phase_id)
2000 sptree_lsd = self.config.fix_ttt_holes(sptree, mode)
2002 phase_lsd = phase_id + '.lsd'
2003 fn = self.phase_filename(phase_lsd)
2004 sptree_lsd.dump(fn)
2006 def statics(self, source, multi_location, itsnapshot, components,
2007 interpolation='nearest_neighbor', nthreads=0):
2008 if not self._f_index:
2009 self.open()
2011 out = {}
2012 ntargets = multi_location.ntargets
2013 source_terms = source.get_source_terms(self.config.component_scheme)
2014 # TODO: deal with delays for snapshots > 1 sample
2016 if itsnapshot is not None:
2017 delays = source.times
2019 # Fringe case where we sample at sample 0 and sample 1
2020 tsnapshot = itsnapshot * self.config.deltat
2021 if delays.max() == tsnapshot and delays.min() != tsnapshot:
2022 delays[delays == delays.max()] -= self.config.deltat
2024 else:
2025 delays = source.times * 0
2026 itsnapshot = 1
2028 if ntargets == 0:
2029 raise StoreError('MultiLocation.coords5 is empty')
2031 res = store_ext.store_calc_static(
2032 self.cstore,
2033 source.coords5(),
2034 source_terms,
2035 delays,
2036 multi_location.coords5,
2037 self.config.component_scheme,
2038 interpolation,
2039 itsnapshot,
2040 nthreads)
2042 out = {}
2043 for icomp, (comp, comp_res) in enumerate(
2044 zip(self.get_provided_components(), res)):
2045 if comp not in components:
2046 continue
2047 out[comp] = res[icomp]
2049 return out
2051 def calc_seismograms(self, source, receivers, components, deltat=None,
2052 itmin=None, nsamples=None,
2053 interpolation='nearest_neighbor',
2054 optimization='enable', nthreads=1):
2055 config = self.config
2057 assert interpolation in ['nearest_neighbor', 'multilinear'], \
2058 'Unknown interpolation: %s' % interpolation
2060 if not isinstance(receivers, list):
2061 receivers = [receivers]
2063 if deltat is None:
2064 decimate = 1
2065 else:
2066 decimate = int(round(deltat/config.deltat))
2067 if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
2068 raise StoreError(
2069 'unavailable decimation ratio target.deltat / store.deltat'
2070 ' = %g / %g' % (deltat, config.deltat))
2072 store, decimate_ = self._decimated_store(decimate)
2074 if not store._f_index:
2075 store.open()
2077 scheme = config.component_scheme
2079 source_coords_arr = source.coords5()
2080 source_terms = source.get_source_terms(scheme)
2081 delays = source.times.ravel()
2083 nreceiver = len(receivers)
2084 receiver_coords_arr = num.empty((nreceiver, 5))
2085 for irec, rec in enumerate(receivers):
2086 receiver_coords_arr[irec, :] = rec.coords5
2088 dt = self.get_deltat()
2090 itoffset = int(num.floor(delays.min()/dt)) if delays.size != 0 else 0
2092 if itmin is None:
2093 itmin = num.zeros(nreceiver, dtype=num.int32)
2094 else:
2095 itmin = (itmin-itoffset).astype(num.int32)
2097 if nsamples is None:
2098 nsamples = num.zeros(nreceiver, dtype=num.int32) - 1
2099 else:
2100 nsamples = nsamples.astype(num.int32)
2102 try:
2103 results = store_ext.store_calc_timeseries(
2104 store.cstore,
2105 source_coords_arr,
2106 source_terms,
2107 (delays - itoffset*dt),
2108 receiver_coords_arr,
2109 scheme,
2110 interpolation,
2111 itmin,
2112 nsamples,
2113 nthreads)
2114 except Exception as e:
2115 raise e
2117 provided_components = self.get_provided_components()
2118 ncomponents = len(provided_components)
2120 seismograms = [dict() for _ in range(nreceiver)]
2121 for ires in range(len(results)):
2122 res = results.pop(0)
2123 ireceiver = ires // ncomponents
2125 comp = provided_components[res[-2]]
2127 if comp not in components:
2128 continue
2130 tr = GFTrace(*res[:-2])
2131 tr.deltat = config.deltat * decimate
2132 tr.itmin += itoffset
2134 tr.n_records_stacked = 0
2135 tr.t_optimize = 0.
2136 tr.t_stack = 0.
2137 tr.err = res[-1]
2139 seismograms[ireceiver][comp] = tr
2141 return seismograms
    def seismogram(self, source, receiver, components, deltat=None,
                   itmin=None, nsamples=None,
                   interpolation='nearest_neighbor',
                   optimization='enable', nthreads=1):
        '''
        Compute seismogram traces at a single receiver.

        :param source: source providing ``coords5()``, ``times`` and
            ``get_source_terms``
        :param receiver: receiver providing ``coords5``
        :param components: iterable of component names to keep in the output
        :param deltat: target sampling interval [s]; must (approximately) be
            an integer multiple of the store's interval, defaults to the
            store's own interval
        :type deltat: float or None
        :returns: dict mapping component name to
            :py:class:`~pyrocko.gf.store.GFTrace`
        :raises pyrocko.gf.meta.OutOfBounds: if the source/receiver
            configuration is outside the stored volume
        '''

        config = self.config

        if deltat is None:
            decimate = 1
        else:
            decimate = int(round(deltat/config.deltat))
            if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
                raise StoreError(
                    'unavailable decimation ratio target.deltat / store.deltat'
                    ' = %g / %g' % (deltat, config.deltat))

        # decimate_ is the residual on-the-fly decimation (1 if a
        # pre-decimated sub-store is used).
        store, decimate_ = self._decimated_store(decimate)

        if not store._f_index:
            store.open()

        scheme = config.component_scheme

        source_coords_arr = source.coords5()
        source_terms = source.get_source_terms(scheme)
        receiver_coords_arr = receiver.coords5[num.newaxis, :].copy()

        try:
            params = store_ext.make_sum_params(
                store.cstore,
                source_coords_arr,
                source_terms,
                receiver_coords_arr,
                scheme,
                interpolation, nthreads)

        except store_ext.StoreExtError:
            raise meta.OutOfBounds()

        provided_components = self.get_provided_components()

        out = {}
        for icomp, comp in enumerate(provided_components):
            if comp in components:
                weights, irecords = params[icomp]

                # Each source time expands to ``neach`` records; replicate
                # delays accordingly.
                neach = irecords.size // source.times.size
                delays = num.repeat(source.times, neach)

                tr = store._sum(
                    irecords, delays, weights, itmin, nsamples, decimate_,
                    'c', optimization)

                # to prevent problems with rounding errors (BaseStore saves
                # deltat as a 4-byte floating point value, value from YAML
                # config is more accurate)
                tr.deltat = config.deltat * decimate

                out[comp] = tr

        return out
# Explicit public API of this module.
__all__ = '''
gf_dtype
NotMultipleOfSamplingInterval
GFTrace
CannotCreate
CannotOpen
DuplicateInsert
NotAllowedToInterpolate
NoSuchExtra
NoSuchPhase
BaseStore
Store
SeismosizerErrorEnum
'''.split()