1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
5from __future__ import absolute_import, division
7import errno
8import time
9import os
10import struct
11import weakref
12import math
13import shutil
14try:
15 import fcntl
16except ImportError:
17 fcntl = None
18import copy
19import logging
20import re
21import hashlib
22from glob import glob
24import numpy as num
25from scipy import signal
27from . import meta
28from .error import StoreError
29try:
30 from . import store_ext
31except ImportError:
32 store_ext = None
33from pyrocko import util, spit
logger = logging.getLogger('pyrocko.gf.store')

op = os.path

# gf store endianness
E = '<'

# dtype of GF samples in memory and on disk (little-endian float32)
gf_dtype = num.dtype(num.float32)
gf_dtype_store = num.dtype(E + 'f4')

# bytes per stored sample
gf_dtype_nbytes_per_sample = 4

# layout of the fixed-size header at the beginning of the index file
gf_store_header_dtype = [
    ('nrecords', E + 'u8'),
    ('deltat', E + 'f4'),
]

gf_store_header_fmt = E + 'Qf'
gf_store_header_fmt_size = struct.calcsize(gf_store_header_fmt)

# layout of one index record; field semantics are documented in the
# comment block further below in this module
gf_record_dtype = num.dtype([
    ('data_offset', E + 'u8'),
    ('itmin', E + 'i4'),
    ('nsamples', E + 'u4'),
    ('begin_value', E + 'f4'),
    ('end_value', E + 'f4'),
])

available_stored_tables = ['phase', 'takeoff_angle', 'incidence_angle']

km = 1000.
class SeismosizerErrorEnum:
    # Numeric error codes stored on GFTrace.err.
    # NOTE(review): presumably mirrors the error codes of the store_ext C
    # extension — confirm against src/store.c before relying on the values.
    SUCCESS = 0
    INVALID_RECORD = 1
    EMPTY_RECORD = 2
    BAD_RECORD = 3
    ALLOC_FAILED = 4
    BAD_REQUEST = 5
    BAD_DATA_OFFSET = 6
    READ_DATA_FAILED = 7
    SEEK_INDEX_FAILED = 8
    READ_INDEX_FAILED = 9
    FSTAT_TRACES_FAILED = 10
    BAD_STORE = 11
    MMAP_INDEX_FAILED = 12
    MMAP_TRACES_FAILED = 13
    INDEX_OUT_OF_BOUNDS = 14
    NTARGETS_OUT_OF_BOUNDS = 15
def valid_string_id(s):
    '''
    Check *s* against the StringID pattern; returns the match or ``None``.
    '''
    pattern = meta.StringID.pattern
    return re.match(pattern, s)
def check_string_id(s):
    '''
    Raise :py:exc:`ValueError` if *s* is not a valid string ID.
    '''
    if valid_string_id(s):
        return

    raise ValueError('invalid name %s' % s)
94# - data_offset
95#
96# Data file offset of first sample in bytes (the seek value).
97# Special values:
98#
99# 0x00 - missing trace
100# 0x01 - zero trace (GF trace is physically zero)
101# 0x02 - short trace of 1 or 2 samples (no need for data allocation)
102#
103# - itmin
104#
105# Time of first sample of the trace as a multiple of the sampling interval. All
106# GF samples must be evaluated at multiples of the sampling interval with
107# respect to a global reference time zero. Must be set also for zero traces.
108#
109# - nsamples
110#
111# Number of samples in the GF trace. Should be set to zero for zero traces.
112#
113# - begin_value, end_value
114#
115# Values of first and last sample. These values are included in data[]
# redundantly.
class NotMultipleOfSamplingInterval(Exception):
    '''
    Raised when a requested time is not a multiple of the sampling interval.
    '''
    pass
# relative tolerance used when checking that a given time is a multiple of
# the sampling interval
sampling_check_eps = 1e-5
class GFTrace(object):
    '''
    Green's Function trace class for handling traces from the GF store.

    Exactly one of ``tmin`` (absolute begin time [s]) or ``itmin`` (begin
    time as a multiple of ``deltat``) must be given. Stacking statistics
    (``n_records_stacked``, ``t_stack``, ``t_optimize``) are filled in by
    the summation machinery.
    '''

    @classmethod
    def from_trace(cls, tr):
        '''
        Create a :py:class:`GFTrace` from a Pyrocko trace-like object.
        '''
        return cls(data=tr.ydata.copy(), tmin=tr.tmin, deltat=tr.deltat)

    def __init__(self, data=None, itmin=None, deltat=1.0,
                 is_zero=False, begin_value=None, end_value=None, tmin=None):

        # Fixed: the original source carried a stray trailing line
        # continuation backslash after the assert message.
        assert sum((x is None) for x in (tmin, itmin)) == 1, \
            'GFTrace: either tmin or itmin must be given'

        if tmin is not None:
            itmin = int(round(tmin / deltat))
            if abs(itmin*deltat - tmin) > sampling_check_eps*deltat:
                raise NotMultipleOfSamplingInterval(
                    'GFTrace: tmin (%g) is not a multiple of sampling '
                    'interval (%g)' % (tmin, deltat))

        if data is not None:
            data = num.asarray(data, dtype=gf_dtype)
        else:
            data = num.array([], dtype=gf_dtype)
            begin_value = 0.0
            end_value = 0.0

        # weakref.ref(data)() dereferences immediately and thus simply
        # yields data itself while it is alive; kept for compatibility.
        self.data = weakref.ref(data)()
        self.itmin = itmin
        self.deltat = deltat
        self.is_zero = is_zero
        self.n_records_stacked = 0.
        self.t_stack = 0.
        self.t_optimize = 0.
        self.err = SeismosizerErrorEnum.SUCCESS

        if data is not None and data.size > 0:
            # default begin/end values to first/last sample if not given
            if begin_value is None:
                begin_value = data[0]
            if end_value is None:
                end_value = data[-1]

        self.begin_value = begin_value
        self.end_value = end_value

    @property
    def t(self):
        '''
        Time vector of the GF trace.

        :returns: Time vector
        :rtype: :py:class:`numpy.ndarray`
        '''
        return num.linspace(
            self.itmin*self.deltat,
            (self.itmin+self.data.size-1)*self.deltat, self.data.size)

    def __str__(self, itmin=0):
        # Render sample values aligned on a global time grid starting at
        # ``itmin``; cells before the trace's own begin are left blank.
        if self.is_zero:
            return 'ZERO'

        s = []
        for i in range(itmin, self.itmin + self.data.size):
            if i >= self.itmin and i < self.itmin + self.data.size:
                s.append('%7.4g' % self.data[i-self.itmin])
            else:
                s.append(' '*7)

        return '|'.join(s)

    def to_trace(self, net, sta, loc, cha):
        '''
        Convert to a :py:class:`pyrocko.trace.Trace` with the given codes.
        '''
        from pyrocko import trace
        return trace.Trace(
            net, sta, loc, cha,
            ydata=self.data,
            deltat=self.deltat,
            tmin=self.itmin*self.deltat)
class GFValue(object):
    '''
    Scalar Green's function value with attached stacking statistics.
    '''

    def __init__(self, value):
        self.value = value
        self.n_records_stacked = 0.0
        self.t_stack = 0.0
        self.t_optimize = 0.0
# shared singleton representing an all-zero GF trace
Zero = GFTrace(is_zero=True, itmin=0)
def make_same_span(tracesdict):
    '''
    Extend all traces in *tracesdict* to one common time span.

    If every trace is zero, each entry is replaced by the shared ``Zero``
    singleton. Otherwise traces are padded with their constant begin/end
    values to cover the union of all non-zero spans.
    '''

    nonzero = [tr for tr in tracesdict.values() if not tr.is_zero]
    if not nonzero:
        return {k: Zero for k in tracesdict.keys()}

    deltat = nonzero[0].deltat
    itmin_common = min(tr.itmin for tr in nonzero)
    itmax_common = max(tr.itmin + tr.data.size for tr in nonzero) - 1
    nsamples_common = itmax_common - itmin_common + 1

    out = {}
    for key, tr in tracesdict.items():
        if tr.itmin == itmin_common and tr.data.size == nsamples_common:
            out[key] = tr
            continue

        padded = num.zeros(nsamples_common, dtype=gf_dtype)
        if not tr.is_zero:
            ia = tr.itmin - itmin_common
            ib = ia + tr.data.size
            padded[:ia] = tr.data[0]
            padded[ia:ib] = tr.data
            padded[ib:] = tr.data[-1]

        out[key] = GFTrace(padded, itmin_common, deltat)

    return out
class CannotCreate(StoreError):
    '''
    Raised when a store file or directory cannot be created.
    '''
    pass
class CannotOpen(StoreError):
    '''
    Raised when a GF store cannot be opened.
    '''
    pass
class DuplicateInsert(StoreError):
    '''
    Raised when a record index is written to more than once.
    '''
    pass
class ShortRead(StoreError):
    '''
    Raised when the traces file ends before the expected amount of data.
    '''

    def __str__(self):
        return 'unexpected end of data (truncated traces file?)'
class NotAllowedToInterpolate(StoreError):
    '''
    Raised when interpolation is requested but not permitted.
    '''

    def __str__(self):
        return 'not allowed to interpolate'
class NoSuchExtra(StoreError):
    '''
    Raised when a requested extra-information key does not exist.
    '''

    def __init__(self, s):
        StoreError.__init__(self)
        self.value = s

    def __str__(self):
        return 'extra information for key "%s" not found.' % self.value
class NoSuchPhase(StoreError):
    '''
    Raised when a requested tabulated phase is not available in the store.
    '''

    def __init__(self, s):
        StoreError.__init__(self)
        self.value = s

    def __str__(self):
        return 'phase for key "%s" not found. ' \
            'Running "fomosto ttt" may be needed.' % self.value
def remove_if_exists(fn, force=False):
    '''
    Delete file *fn* if it exists.

    Without ``force``, an existing file raises :py:exc:`CannotCreate`
    instead of being removed; a missing file is never an error.
    '''
    if not os.path.exists(fn):
        return

    if not force:
        raise CannotCreate('file %s already exists' % fn)

    os.remove(fn)
def get_extra_path(store_dir, key):
    '''
    Return path of the extra-information file *key* inside *store_dir*.

    Raises :py:exc:`ValueError` (via :py:func:`check_string_id`) for
    invalid keys.
    '''
    check_string_id(key)
    extra_dir = os.path.join(store_dir, 'extra')
    return os.path.join(extra_dir, key)
class BaseStore(object):
    '''
    Low-level access to the ``index`` and ``traces`` files of a GF store.

    Provides record-level reading/writing, locking, and the pure-Python
    reference implementations of trace retrieval, decimation and
    delay-and-sum stacking; the fast paths delegate to the ``store_ext``
    C extension.
    '''

    # -- path helpers for the fixed file names inside a store directory --

    @staticmethod
    def lock_fn_(store_dir):
        return os.path.join(store_dir, 'lock')

    @staticmethod
    def index_fn_(store_dir):
        return os.path.join(store_dir, 'index')

    @staticmethod
    def data_fn_(store_dir):
        return os.path.join(store_dir, 'traces')

    @staticmethod
    def config_fn_(store_dir):
        return os.path.join(store_dir, 'config')

    @staticmethod
    def create(store_dir, deltat, nrecords, force=False):
        '''
        Create empty index and traces files for a new store.

        :param deltat: sampling interval written to the index header
        :param nrecords: number of (initially empty) index records
        :param force: overwrite existing files if ``True``
        '''

        try:
            util.ensuredir(store_dir)
        except Exception:
            raise CannotCreate('cannot create directory %s' % store_dir)

        index_fn = BaseStore.index_fn_(store_dir)
        data_fn = BaseStore.data_fn_(store_dir)

        for fn in (index_fn, data_fn):
            remove_if_exists(fn, force)

        with open(index_fn, 'wb') as f:
            f.write(struct.pack(gf_store_header_fmt, nrecords, deltat))
            records = num.zeros(nrecords, dtype=gf_record_dtype)
            records.tofile(f)

        with open(data_fn, 'wb') as f:
            # reserve the first 32 bytes so real data offsets never collide
            # with the special data_offset values 0, 1 and 2
            f.write(b'\0' * 32)

    def __init__(self, store_dir, mode='r', use_memmap=True):
        # ``mode``: 'r' read-only, 'w' writeable; files are opened lazily
        # on first access (see open()).
        assert mode in 'rw'
        self.store_dir = store_dir
        self.mode = mode
        self._use_memmap = use_memmap
        self._nrecords = None
        self._deltat = None
        self._f_index = None
        self._f_data = None
        self._records = None
        self.cstore = None

    def open(self):
        '''
        Open index and traces files, read the header and load the index.
        '''
        assert self._f_index is None
        index_fn = self.index_fn()
        data_fn = self.data_fn()

        if self.mode == 'r':
            fmode = 'rb'
        elif self.mode == 'w':
            fmode = 'r+b'
        else:
            assert False, 'invalid mode: %s' % self.mode

        try:
            self._f_index = open(index_fn, fmode)
            self._f_data = open(data_fn, fmode)
        except Exception:
            self.mode = ''
            raise CannotOpen('cannot open gf store: %s' % self.store_dir)

        try:
            # replace single precision deltat value in store with the double
            # precision value from the config, if available
            self.cstore = store_ext.store_init(
                self._f_index.fileno(), self._f_data.fileno(),
                self.get_deltat() or 0.0)

        except store_ext.StoreExtError as e:
            raise StoreError(str(e))

        while True:
            try:
                dataheader = self._f_index.read(gf_store_header_fmt_size)
                break

            except IOError as e:
                # occasionally got this one on an NFS volume
                if e.errno == errno.EBUSY:
                    time.sleep(0.01)
                else:
                    raise

        nrecords, deltat = struct.unpack(gf_store_header_fmt, dataheader)
        self._nrecords = nrecords
        self._deltat = deltat

        self._load_index()

    def __del__(self):
        # mode == '' marks an already-closed or failed-to-open store
        if self.mode != '':
            self.close()

    def lock(self):
        '''
        Acquire an exclusive lock, via fcntl or a lock file as fallback.
        '''
        if not fcntl:
            lock_fn = self.lock_fn()
            util.create_lockfile(lock_fn)
        else:
            if not self._f_index:
                self.open()

            while True:
                try:
                    fcntl.lockf(self._f_index, fcntl.LOCK_EX)
                    break

                except IOError as e:
                    # retry while the kernel reports no locks available
                    if e.errno == errno.ENOLCK:
                        time.sleep(0.01)
                    else:
                        raise

    def unlock(self):
        '''
        Flush pending writes and release the lock taken by :py:meth:`lock`.
        '''
        if not fcntl:
            lock_fn = self.lock_fn()
            util.delete_lockfile(lock_fn)
        else:
            self._f_data.flush()
            self._f_index.flush()
            fcntl.lockf(self._f_index, fcntl.LOCK_UN)

    def put(self, irecord, trace):
        '''Store GF trace at low-level index ``irecord``.'''
        self._put(irecord, trace)

    def get_record(self, irecord):
        '''Return raw index record at ``irecord``.'''
        return self._get_record(irecord)

    def get_span(self, irecord, decimate=1):
        '''Return (itmin, itmax) span of the trace at ``irecord``.'''
        return self._get_span(irecord, decimate=decimate)

    def get(self, irecord, itmin=None, nsamples=None, decimate=1,
            implementation='c'):
        '''Retrieve (a part of) the GF trace at ``irecord``.'''
        return self._get(irecord, itmin, nsamples, decimate, implementation)

    def sum(self, irecords, delays, weights, itmin=None,
            nsamples=None, decimate=1, implementation='c',
            optimization='enable'):
        '''Compute a delayed, weighted sum over several GF traces.'''
        return self._sum(irecords, delays, weights, itmin, nsamples, decimate,
                         implementation, optimization)

    def irecord_format(self):
        # zero-padded format string wide enough for all record numbers
        return util.zfmt(self._nrecords)

    def str_irecord(self, irecord):
        return self.irecord_format() % irecord

    def close(self):
        '''
        Flush the index (in write mode) and close both files.
        '''
        if self.mode == 'w':
            if not self._f_index:
                self.open()

            self._save_index()

        if self._f_data:
            self._f_data.close()
            self._f_data = None

        if self._f_index:
            self._f_index.close()
            self._f_index = None

        del self._records
        self._records = None

        self.mode = ''

    def _get_record(self, irecord):
        if not self._f_index:
            self.open()

        return self._records[irecord]

    def _get(self, irecord, itmin, nsamples, decimate, implementation):
        '''
        Retrieve complete GF trace from storage.
        '''

        if not self._f_index:
            self.open()

        if not self.mode == 'r':
            raise StoreError('store not open in read mode')

        if implementation == 'c' and decimate == 1:

            # the C extension encodes "full trace" as nsamples=-1, itmin=0
            if nsamples is None:
                nsamples = -1

            if itmin is None:
                itmin = 0

            try:
                return GFTrace(*store_ext.store_get(
                    self.cstore, int(irecord), int(itmin), int(nsamples)))
            except store_ext.StoreExtError as e:
                raise StoreError(str(e))

        else:
            return self._get_impl_reference(irecord, itmin, nsamples, decimate)

    def get_deltat(self):
        '''Sampling interval as stored in the index header.'''
        return self._deltat

    def _get_impl_reference(self, irecord, itmin, nsamples, decimate):
        '''
        Pure-Python trace retrieval, with optional decimation.
        '''
        deltat = self.get_deltat()

        if not (0 <= irecord < self._nrecords):
            raise StoreError('invalid record number requested '
                             '(irecord = %i, nrecords = %i)' %
                             (irecord, self._nrecords))

        ipos, itmin_data, nsamples_data, begin_value, end_value = \
            self._records[irecord]

        if None in (itmin, nsamples):
            # default to the full stored span
            itmin = itmin_data
            itmax = itmin_data + nsamples_data - 1
            nsamples = nsamples_data
        else:
            # caller gives the window in decimated samples
            itmin = itmin * decimate
            nsamples = nsamples * decimate
            itmax = itmin + nsamples - decimate

        if ipos == 0:
            # special data_offset 0: missing trace
            return None

        elif ipos == 1:
            # special data_offset 1: physically zero trace
            itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
            return GFTrace(is_zero=True, itmin=itmin_ext//decimate)

        if decimate == 1:
            ilo = max(itmin, itmin_data) - itmin_data
            ihi = min(itmin+nsamples, itmin_data+nsamples_data) - itmin_data
            data = self._get_data(ipos, begin_value, end_value, ilo, ihi)

            return GFTrace(data, itmin=itmin_data+ilo, deltat=deltat,
                           begin_value=begin_value, end_value=end_value)

        else:
            itmax_data = itmin_data + nsamples_data - 1

            # put begin and end to multiples of new sampling rate
            itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
            itmax_ext = -((-min(itmax, itmax_data))//decimate) * decimate
            nsamples_ext = itmax_ext - itmin_ext + 1

            # add some padding for the aa filter
            order = 30
            itmin_ext_pad = itmin_ext - order//2
            itmax_ext_pad = itmax_ext + order//2
            nsamples_ext_pad = itmax_ext_pad - itmin_ext_pad + 1

            itmin_overlap = max(itmin_data, itmin_ext_pad)
            itmax_overlap = min(itmax_data, itmax_ext_pad)

            ilo = itmin_overlap - itmin_ext_pad
            ihi = max(ilo, itmax_overlap - itmin_ext_pad + 1)
            ilo_data = itmin_overlap - itmin_data
            ihi_data = max(ilo_data, itmax_overlap - itmin_data + 1)

            data_ext_pad = num.empty(nsamples_ext_pad, dtype=gf_dtype)
            data_ext_pad[ilo:ihi] = self._get_data(
                ipos, begin_value, end_value, ilo_data, ihi_data)

            # extend with the constant begin/end values beyond the data
            data_ext_pad[:ilo] = begin_value
            data_ext_pad[ihi:] = end_value

            # anti-alias lowpass, then pick every ``decimate``-th sample
            b = signal.firwin(order + 1, 1. / decimate, window='hamming')
            a = 1.
            data_filt_pad = signal.lfilter(b, a, data_ext_pad)
            data_deci = data_filt_pad[order:order+nsamples_ext:decimate]
            if data_deci.size >= 1:
                # pin the exact begin/end values where the span is complete
                if itmin_ext <= itmin_data:
                    data_deci[0] = begin_value

                if itmax_ext >= itmax_data:
                    data_deci[-1] = end_value

            return GFTrace(data_deci, itmin_ext//decimate,
                           deltat*decimate,
                           begin_value=begin_value, end_value=end_value)

    def _get_span(self, irecord, decimate=1):
        '''
        Get temporal extent of GF trace at given index.
        '''

        if not self._f_index:
            self.open()

        assert 0 <= irecord < self._nrecords, \
            'irecord = %i, nrecords = %i' % (irecord, self._nrecords)

        (_, itmin, nsamples, _, _) = self._records[irecord]

        itmax = itmin + nsamples - 1

        if decimate == 1:
            return itmin, itmax
        else:
            if nsamples == 0:
                # empty trace: return an empty (inverted) span
                return itmin//decimate, itmin//decimate - 1
            else:
                return itmin//decimate, -((-itmax)//decimate)

    def _put(self, irecord, trace):
        '''
        Save GF trace to storage.
        '''

        if not self._f_index:
            self.open()

        deltat = self.get_deltat()

        assert self.mode == 'w'
        assert trace.is_zero or \
            abs(trace.deltat - deltat) < 1e-7 * deltat
        assert 0 <= irecord < self._nrecords, \
            'irecord = %i, nrecords = %i' % (irecord, self._nrecords)

        if self._records[irecord][0] != 0:
            raise DuplicateInsert('record %i already in store' % irecord)

        if trace.is_zero or num.all(trace.data == 0.0):
            # special data_offset 1: zero trace, no data stored
            self._records[irecord] = (1, trace.itmin, 0, 0., 0.)
            return

        ndata = trace.data.size

        if ndata > 2:
            # append samples at the end of the traces file
            self._f_data.seek(0, 2)
            ipos = self._f_data.tell()
            trace.data.astype(gf_dtype_store).tofile(self._f_data)
        else:
            # special data_offset 2: 1-2 samples live in the index record
            ipos = 2

        self._records[irecord] = (ipos, trace.itmin, ndata,
                                  trace.data[0], trace.data[-1])

    def _sum_impl_alternative(self, irecords, delays, weights, itmin, nsamples,
                              decimate):

        '''
        Sum delayed and weighted GF traces.
        '''

        if not self._f_index:
            self.open()

        assert self.mode == 'r'

        deltat = self.get_deltat() * decimate

        if len(irecords) == 0:
            if None in (itmin, nsamples):
                return Zero
            else:
                return GFTrace(
                    num.zeros(nsamples, dtype=gf_dtype), itmin,
                    deltat, is_zero=True)

        assert len(irecords) == len(delays)
        assert len(irecords) == len(weights)

        if None in (itmin, nsamples):
            # derive the output span from the union of all delayed spans
            itmin_delayed, itmax_delayed = [], []
            for irecord, delay in zip(irecords, delays):
                itmin, itmax = self._get_span(irecord, decimate=decimate)
                itmin_delayed.append(itmin + delay/deltat)
                itmax_delayed.append(itmax + delay/deltat)

            itmin = int(math.floor(min(itmin_delayed)))
            nsamples = int(math.ceil(max(itmax_delayed))) - itmin + 1

        out = num.zeros(nsamples, dtype=gf_dtype)
        if nsamples == 0:
            return GFTrace(out, itmin, deltat)

        for ii in range(len(irecords)):
            irecord = irecords[ii]
            delay = delays[ii]
            weight = weights[ii]

            if weight == 0.0:
                continue

            # sub-sample delays are handled by linear interpolation between
            # the floor- and ceil-shifted contributions
            idelay_floor = int(math.floor(delay/deltat))
            idelay_ceil = int(math.ceil(delay/deltat))

            gf_trace = self._get(
                irecord,
                itmin - idelay_ceil,
                nsamples + idelay_ceil - idelay_floor,
                decimate,
                'reference')

            assert gf_trace.itmin >= itmin - idelay_ceil
            assert gf_trace.data.size <= nsamples + idelay_ceil - idelay_floor

            if gf_trace.is_zero:
                continue

            ilo = gf_trace.itmin - itmin + idelay_floor
            ihi = ilo + gf_trace.data.size + (idelay_ceil-idelay_floor)

            data = gf_trace.data

            if idelay_floor == idelay_ceil:
                out[ilo:ihi] += data * weight
            else:
                if data.size:
                    k = 1
                    if ihi <= nsamples:
                        out[ihi-1] += gf_trace.end_value * \
                            ((idelay_ceil-delay/deltat) * weight)
                        k = 0

                    out[ilo+1:ihi-k] += data[:data.size-k] * \
                        ((delay/deltat-idelay_floor) * weight)
                    k = 1
                    if ilo >= 0:
                        out[ilo] += gf_trace.begin_value * \
                            ((delay/deltat-idelay_floor) * weight)
                        k = 0

                    out[ilo+k:ihi-1] += data[k:] * \
                        ((idelay_ceil-delay/deltat) * weight)

            # traces are treated as constant beyond their span
            if ilo > 0 and gf_trace.begin_value != 0.0:
                out[:ilo] += gf_trace.begin_value * weight

            if ihi < nsamples and gf_trace.end_value != 0.0:
                out[ihi:] += gf_trace.end_value * weight

        return GFTrace(out, itmin, deltat)

    def _sum_impl_reference(self, irecords, delays, weights, itmin, nsamples,
                            decimate):
        '''
        Reference implementation of delay-and-sum stacking (array based).
        '''

        if not self._f_index:
            self.open()

        deltat = self.get_deltat() * decimate

        if len(irecords) == 0:
            if None in (itmin, nsamples):
                return Zero
            else:
                return GFTrace(
                    num.zeros(nsamples, dtype=gf_dtype), itmin,
                    deltat, is_zero=True)

        datas = []
        itmins = []
        for i, delay, weight in zip(irecords, delays, weights):
            tr = self._get(i, None, None, decimate, 'reference')

            idelay_floor = int(math.floor(delay/deltat))
            idelay_ceil = int(math.ceil(delay/deltat))

            if idelay_floor == idelay_ceil:
                itmins.append(tr.itmin + idelay_floor)
                datas.append(tr.data.copy()*weight)
            else:
                # split a sub-sample delay into two integer-shifted parts
                itmins.append(tr.itmin + idelay_floor)
                datas.append(tr.data.copy()*weight*(idelay_ceil-delay/deltat))
                itmins.append(tr.itmin + idelay_ceil)
                datas.append(tr.data.copy()*weight*(delay/deltat-idelay_floor))

        itmin_all = min(itmins)

        itmax_all = max(itmin_ + data.size for (itmin_, data) in
                        zip(itmins, datas))

        if itmin is not None:
            itmin_all = min(itmin_all, itmin)
        if nsamples is not None:
            itmax_all = max(itmax_all, itmin+nsamples)

        nsamples_all = itmax_all - itmin_all

        # expand every contribution onto the common span, padding with the
        # constant first/last values, then sum over all contributions
        arr = num.zeros((len(datas), nsamples_all), dtype=gf_dtype)
        for i, itmin_, data in zip(num.arange(len(datas)), itmins, datas):
            if data.size > 0:
                ilo = itmin_-itmin_all
                ihi = ilo + data.size
                arr[i, :ilo] = data[0]
                arr[i, ilo:ihi] = data
                arr[i, ihi:] = data[-1]

        sum_arr = arr.sum(axis=0)

        if itmin is not None and nsamples is not None:
            ilo = itmin-itmin_all
            ihi = ilo + nsamples
            sum_arr = sum_arr[ilo:ihi]

        else:
            itmin = itmin_all

        return GFTrace(sum_arr, itmin, deltat)

    def _optimize(self, irecords, delays, weights):
        '''
        Merge duplicate (irecord, integer-delay) pairs to reduce stack size.
        '''
        if num.unique(irecords).size == irecords.size:
            return irecords, delays, weights

        deltat = self.get_deltat()

        # split every entry into floor/ceil integer-delay contributions
        delays = delays / deltat
        irecords2 = num.repeat(irecords, 2)
        delays2 = num.empty(irecords2.size, dtype=float)
        delays2[0::2] = num.floor(delays)
        delays2[1::2] = num.ceil(delays)
        weights2 = num.repeat(weights, 2)
        weights2[0::2] *= 1.0 - (delays - delays2[0::2])
        weights2[1::2] *= (1.0 - (delays2[1::2] - delays)) * \
                          (delays2[1::2] - delays2[0::2])

        delays2 *= deltat

        iorder = num.lexsort((delays2, irecords2))

        irecords2 = irecords2[iorder]
        delays2 = delays2[iorder]
        weights2 = weights2[iorder]

        # mark the first element of each (irecord, delay) group ...
        ui = num.empty(irecords2.size, dtype=bool)
        ui[1:] = num.logical_or(num.diff(irecords2) != 0,
                                num.diff(delays2) != 0.)

        ui[0] = 0
        ind2 = num.cumsum(ui)
        ui[0] = 1
        ind1 = num.where(ui)[0]

        # ... and accumulate the weights within each group
        irecords3 = irecords2[ind1]
        delays3 = delays2[ind1]
        weights3 = num.bincount(ind2, weights2)

        return irecords3, delays3, weights3

    def _sum(self, irecords, delays, weights, itmin, nsamples, decimate,
             implementation, optimization):
        '''
        Dispatch delay-and-sum stacking to the chosen implementation.
        '''

        if not self._f_index:
            self.open()

        t0 = time.time()
        if optimization == 'enable':
            irecords, delays, weights = self._optimize(
                irecords, delays, weights)
        else:
            assert optimization == 'disable'

        t1 = time.time()
        deltat = self.get_deltat()

        if implementation == 'c' and decimate == 1:
            # shift all delays to be non-negative for the C extension and
            # undo the shift on the result's itmin afterwards
            if delays.size != 0:
                itoffset = int(num.floor(num.min(delays)/deltat))
            else:
                itoffset = 0

            if nsamples is None:
                nsamples = -1

            if itmin is None:
                itmin = 0
            else:
                itmin -= itoffset

            try:
                tr = GFTrace(*store_ext.store_sum(
                    self.cstore, irecords.astype(num.uint64),
                    delays - itoffset*deltat,
                    weights.astype(num.float32),
                    int(itmin), int(nsamples)))

                tr.itmin += itoffset

            except store_ext.StoreExtError as e:
                raise StoreError(str(e) + ' in store %s' % self.store_dir)

        elif implementation == 'alternative':
            tr = self._sum_impl_alternative(irecords, delays, weights,
                                            itmin, nsamples, decimate)

        else:
            tr = self._sum_impl_reference(irecords, delays, weights,
                                          itmin, nsamples, decimate)

        t2 = time.time()

        # attach timing/stacking statistics to the result
        tr.n_records_stacked = irecords.size
        tr.t_optimize = t1 - t0
        tr.t_stack = t2 - t1

        return tr

    def _sum_statics(self, irecords, delays, weights, it, ntargets,
                     nthreads):
        '''Static (single-time) stacking via the C extension.'''
        if not self._f_index:
            self.open()

        return store_ext.store_sum_static(
            self.cstore, irecords, delays, weights, it, ntargets, nthreads)

    def _load_index(self):
        '''Load the record table, memory-mapped if configured.'''
        if self._use_memmap:
            records = num.memmap(
                self._f_index, dtype=gf_record_dtype,
                offset=gf_store_header_fmt_size,
                mode=('r', 'r+')[self.mode == 'w'])

        else:
            self._f_index.seek(gf_store_header_fmt_size)
            records = num.fromfile(self._f_index, dtype=gf_record_dtype)

        assert len(records) == self._nrecords

        self._records = records

    def _save_index(self):
        '''Write header and record table back to the index file.'''
        self._f_index.seek(0)
        self._f_index.write(struct.pack(gf_store_header_fmt, self._nrecords,
                                        self.get_deltat()))

        if self._use_memmap:
            self._records.flush()
        else:
            self._f_index.seek(gf_store_header_fmt_size)
            self._records.tofile(self._f_index)
            self._f_index.flush()

    def _get_data(self, ipos, begin_value, end_value, ilo, ihi):
        '''
        Read samples ``ilo:ihi`` of a trace stored at data offset ``ipos``.
        '''
        if ihi - ilo > 0:
            if ipos == 2:
                # short trace: the (up to two) samples live in the index
                data_orig = num.empty(2, dtype=gf_dtype)
                data_orig[0] = begin_value
                data_orig[1] = end_value
                return data_orig[ilo:ihi]
            else:
                self._f_data.seek(
                    int(ipos + ilo*gf_dtype_nbytes_per_sample))
                arr = num.fromfile(
                    self._f_data, gf_dtype_store, ihi-ilo).astype(gf_dtype)

                if arr.size != ihi-ilo:
                    raise ShortRead()
                return arr
        else:
            return num.empty((0,), dtype=gf_dtype)

    def lock_fn(self):
        return BaseStore.lock_fn_(self.store_dir)

    def index_fn(self):
        return BaseStore.index_fn_(self.store_dir)

    def data_fn(self):
        return BaseStore.data_fn_(self.store_dir)

    def config_fn(self):
        return BaseStore.config_fn_(self.store_dir)

    def count_special_records(self):
        '''
        Histogram of data_offset classes: [missing, zero, short, regular].
        '''
        if not self._f_index:
            self.open()

        return num.histogram(self._records['data_offset'],
                             bins=[0, 1, 2, 3, num.uint64(-1)])[0]

    @property
    def size_index(self):
        # size of the index file in bytes
        return os.stat(self.index_fn()).st_size

    @property
    def size_data(self):
        # size of the traces file in bytes
        return os.stat(self.data_fn()).st_size

    @property
    def size_index_and_data(self):
        return self.size_index + self.size_data

    @property
    def size_index_and_data_human(self):
        return util.human_bytesize(self.size_index_and_data)

    def stats(self):
        '''
        Return dict with record counts and file sizes (see ``stats_keys``).
        '''
        counter = self.count_special_records()

        stats = dict(
            total=self._nrecords,
            inserted=(counter[1] + counter[2] + counter[3]),
            empty=counter[0],
            short=counter[2],
            zero=counter[1],
            size_data=self.size_data,
            size_index=self.size_index,
        )

        return stats

    stats_keys = 'total inserted empty short zero size_data size_index'.split()
def remake_dir(dpath, force):
    '''
    Create directory *dpath*, optionally replacing an existing one.

    Without ``force``, an existing directory raises
    :py:exc:`CannotCreate`; with ``force``, it is wiped and recreated.
    '''
    if os.path.exists(dpath):
        if not force:
            raise CannotCreate('Directory "%s" already exists.' % dpath)

        shutil.rmtree(dpath)

    os.mkdir(dpath)
class MakeTimingParamsFailed(StoreError):
    '''
    Raised when timing parameters could not be computed for a store.
    '''
    pass
1036class Store(BaseStore):
1038 '''
1039 Green's function disk storage and summation machine.
1041 The `Store` can be used to efficiently store, retrieve, and sum Green's
1042 function traces. A `Store` contains many 1D time traces sampled at even
1043 multiples of a global sampling rate, where each time trace has an
1044 individual start and end time. The traces are treated as having repeating
1045 end points, so the functions they represent can be non-constant only
1046 between begin and end time. It provides capabilities to retrieve decimated
1047 traces and to extract parts of the traces. The main purpose of this class
    is to provide a fast, easy to use, and flexible mechanism to compute
1049 weighted delay-and-sum stacks with many Green's function traces involved.
1051 Individual Green's functions are accessed through a single integer index at
1052 low level. On top of that, various indexing schemes can be implemented by
1053 providing a mapping from physical coordinates to the low level index `i`.
1054 E.g. for a problem with cylindrical symmetry, one might define a mapping
1055 from the coordinates (`receiver_depth`, `source_depth`, `distance`) to the
1056 low level index. Index translation is done in the
1057 :py:class:`pyrocko.gf.meta.Config` subclass object associated with the
1058 Store. It is accessible through the store's :py:attr:`config` attribute,
1059 and contains all meta information about the store, including physical
1060 extent, geometry, sampling rate, and information about the type of the
1061 stored Green's functions. Information about the underlying earth model
1062 can also be found there.
1064 A GF store can also contain tabulated phase arrivals. In basic cases, these
1065 can be created with the :py:meth:`make_travel_time_tables` and evaluated
1066 with the :py:func:`t` methods.
1068 .. attribute:: config
1070 The :py:class:`pyrocko.gf.meta.Config` derived object associated with
1071 the store which contains all meta information about the store and
1072 provides the high-level to low-level index mapping.
1074 .. attribute:: store_dir
1076 Path to the store's data directory.
1078 .. attribute:: mode
1080 The mode in which the store is opened: ``'r'``: read-only, ``'w'``:
1081 writeable.
1082 '''
    @classmethod
    def create(cls, store_dir, config, force=False, extra=None):
        '''
        Create new GF store.

        Creates a new GF store at path ``store_dir``. The layout of the GF is
        defined with the parameters given in ``config``, which should be an
        object of a subclass of :py:class:`~pyrocko.gf.meta.Config`. This
        function will refuse to overwrite an existing GF store, unless
        ``force`` is set to ``True``. If more information, e.g. parameters
        used for the modelling code, earth models or other, should be saved
        along with the GF store, these may be provided through a dict given
        to ``extra``. The keys of this dict must be names and the values must
        be *guts* type objects.

        :param store_dir: GF store path
        :type store_dir: str
        :param config: GF store Config
        :type config: :py:class:`~pyrocko.gf.meta.Config`
        :param force: Force overwrite, defaults to ``False``
        :type force: bool
        :param extra: Extra information
        :type extra: dict or None
        '''

        cls.create_editables(store_dir, config, force=force, extra=extra)
        cls.create_dependants(store_dir, force=force)

        return cls(store_dir)
1114 @staticmethod
1115 def create_editables(store_dir, config, force=False, extra=None):
1116 try:
1117 util.ensuredir(store_dir)
1118 except Exception:
1119 raise CannotCreate('cannot create directory %s' % store_dir)
1121 fns = []
1123 config_fn = os.path.join(store_dir, 'config')
1124 remove_if_exists(config_fn, force)
1125 meta.dump(config, filename=config_fn)
1127 fns.append(config_fn)
1129 for sub_dir in ['extra']:
1130 dpath = os.path.join(store_dir, sub_dir)
1131 remake_dir(dpath, force)
1133 if extra:
1134 for k, v in extra.items():
1135 fn = get_extra_path(store_dir, k)
1136 remove_if_exists(fn, force)
1137 meta.dump(v, filename=fn)
1139 fns.append(fn)
1141 return fns
1143 @staticmethod
1144 def create_dependants(store_dir, force=False):
1145 config_fn = os.path.join(store_dir, 'config')
1146 config = meta.load(filename=config_fn)
1148 BaseStore.create(store_dir, config.deltat, config.nrecords,
1149 force=force)
1151 for sub_dir in ['decimated']:
1152 dpath = os.path.join(store_dir, sub_dir)
1153 remake_dir(dpath, force)
    def __init__(self, store_dir, mode='r', use_memmap=True):
        '''
        Open existing GF store at ``store_dir``.

        :param store_dir: path to the store directory
        :param mode: ``'r'`` read-only, ``'w'`` writeable
        :param use_memmap: memory-map the index file if ``True``
        '''
        BaseStore.__init__(self, store_dir, mode=mode, use_memmap=use_memmap)
        config_fn = self.config_fn()
        if not os.path.isfile(config_fn):
            raise StoreError(
                'directory "%s" does not seem to contain a GF store '
                '("config" file not found)' % store_dir)
        self.load_config()

        self._decimated = {}
        self._extra = {}
        self._phases = {}
        # register pre-built decimated variants found on disk (factors 2-8);
        # None marks "exists but not yet opened"
        for decimate in range(2, 9):
            if os.path.isdir(self._decimated_store_dir(decimate)):
                self._decimated[decimate] = None
    def open(self):
        '''
        Open index/data files and initialize the C-extension index mapping
        from the store configuration.
        '''
        if not self._f_index:
            BaseStore.open(self)
            c = self.config

            mscheme = 'type_' + c.short_type.lower()
            store_ext.store_mapping_init(
                self.cstore, mscheme,
                c.mins, c.maxs, c.deltas, c.ns.astype(num.uint64),
                c.ncomponents)
    def save_config(self, make_backup=False):
        '''
        Write the current configuration to the store's ``config`` file.

        :param make_backup: keep the previous file as ``config~`` if ``True``
        '''
        config_fn = self.config_fn()
        if make_backup:
            os.rename(config_fn, config_fn + '~')

        meta.dump(self.config, filename=config_fn)
    def get_deltat(self):
        '''Return the sampling interval from the store configuration.'''
        return self.config.deltat
    def load_config(self):
        '''(Re)load the store configuration from the ``config`` file.'''
        self.config = meta.load(filename=self.config_fn())
    def ensure_reference(self, force=False):
        '''
        Make sure the configuration carries a ``reference`` label
        (store id plus uuid prefix); regenerate it when ``force`` is set.
        '''
        if self.config.reference is not None and not force:
            return
        self.ensure_uuid()
        reference = '%s-%s' % (self.config.id, self.config.uuid[0:6])

        if self.config.reference is not None:
            # only reachable with force=True: overwrite and rewrite config
            self.config.reference = reference
            self.save_config()
        else:
            # append to the raw file instead of rewriting the whole config
            with open(self.config_fn(), 'a') as config:
                config.write('reference: %s\n' % reference)
            self.load_config()
    def ensure_uuid(self, force=False):
        '''
        Make sure the configuration carries a ``uuid`` (content hash);
        regenerate it when ``force`` is set.
        '''
        if self.config.uuid is not None and not force:
            return
        uuid = self.create_store_hash()

        if self.config.uuid is not None:
            # only reachable with force=True: overwrite and rewrite config
            self.config.uuid = uuid
            self.save_config()
        else:
            # append to the raw file instead of rewriting the whole config
            with open(self.config_fn(), 'a') as config:
                config.write('\nuuid: %s\n' % uuid)
            self.load_config()
1222 def create_store_hash(self):
1223 logger.info('creating hash for store ...')
1224 m = hashlib.sha1()
1226 traces_size = op.getsize(self.data_fn())
1227 with open(self.data_fn(), 'rb') as traces:
1228 while traces.tell() < traces_size:
1229 m.update(traces.read(4096))
1230 traces.seek(1024 * 1024 * 10, 1)
1232 with open(self.config_fn(), 'rb') as config:
1233 m.update(config.read())
1234 return m.hexdigest()
1236 def get_extra_path(self, key):
1237 return get_extra_path(self.store_dir, key)
1239 def get_extra(self, key):
1240 '''
1241 Get extra information stored under given key.
1242 '''
1244 x = self._extra
1245 if key not in x:
1246 fn = self.get_extra_path(key)
1247 if not os.path.exists(fn):
1248 raise NoSuchExtra(key)
1250 x[key] = meta.load(filename=fn)
1252 return x[key]
1254 def upgrade(self):
1255 '''
1256 Upgrade store config and files to latest Pyrocko version.
1257 '''
1258 fns = [os.path.join(self.store_dir, 'config')]
1259 for key in self.extra_keys():
1260 fns.append(self.get_extra_path(key))
1262 i = 0
1263 for fn in fns:
1264 i += util.pf_upgrade(fn)
1266 return i
1268 def extra_keys(self):
1269 return [x for x in os.listdir(os.path.join(self.store_dir, 'extra'))
1270 if valid_string_id(x)]
    def put(self, args, trace):
        '''
        Insert trace into GF store.

        Store a single GF trace at (high-level) index ``args``.

        :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type args: tuple
        :param trace: GF trace to store
        :type trace: :py:class:`~pyrocko.gf.store.GFTrace`
        '''
        irecord = self.config.irecord(*args)
        self._put(irecord, trace)
1289 def get_record(self, args):
1290 irecord = self.config.irecord(*args)
1291 return self._get_record(irecord)
1293 def str_irecord(self, args):
1294 return BaseStore.str_irecord(self, self.config.irecord(*args))
1296 def get(self, args, itmin=None, nsamples=None, decimate=1,
1297 interpolation='nearest_neighbor', implementation='c'):
1298 '''
1299 Retrieve GF trace from store.
1301 Retrieve a single GF trace from the store at (high-level) index
1302 ``args``. By default, the full trace is retrieved. Given ``itmin`` and
1303 ``nsamples``, only the selected portion of the trace is extracted. If
1304 ``decimate`` is an integer in the range [2,8], the trace is decimated
1305 on the fly or, if available, the trace is read from a decimated version
1306 of the GF store.
1308 :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1309 ``(source_depth, distance, component)`` as in
1310 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1311 :type args: tuple
1312 :param itmin: Start time index (start time is ``itmin * dt``),
1313 defaults to None
1314 :type itmin: int or None
1315 :param nsamples: Number of samples, defaults to None
1316 :type nsamples: int or None
1317 :param decimate: Decimatation factor, defaults to 1
1318 :type decimate: int
1319 :param interpolation: Interpolation method
1320 ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
1321 ``'nearest_neighbor'``
1322 :type interpolation: str
1323 :param implementation: Implementation to use ``['c', 'reference']``,
1324 defaults to ``'c'``.
1325 :type implementation: str
1327 :returns: GF trace at ``args``
1328 :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
1329 '''
1331 store, decimate = self._decimated_store(decimate)
1332 if interpolation == 'nearest_neighbor':
1333 irecord = store.config.irecord(*args)
1334 tr = store._get(irecord, itmin, nsamples, decimate,
1335 implementation)
1337 elif interpolation == 'off':
1338 irecords, weights = store.config.vicinity(*args)
1339 if len(irecords) != 1:
1340 raise NotAllowedToInterpolate()
1341 else:
1342 tr = store._get(irecords[0], itmin, nsamples, decimate,
1343 implementation)
1345 elif interpolation == 'multilinear':
1346 tr = store._sum(irecords, num.zeros(len(irecords)), weights,
1347 itmin, nsamples, decimate, implementation,
1348 'disable')
1350 # to prevent problems with rounding errors (BaseStore saves deltat
1351 # as a 4-byte floating point value, value from YAML config is more
1352 # accurate)
1353 tr.deltat = self.config.deltat * decimate
1354 return tr
    def sum(self, args, delays, weights, itmin=None, nsamples=None,
            decimate=1, interpolation='nearest_neighbor', implementation='c',
            optimization='enable'):
        '''
        Sum delayed and weighted GF traces.

        Calculate sum of delayed and weighted GF traces. ``args`` is a tuple
        of arrays forming the (high-level) indices of the GF traces to be
        selected. Delays and weights for the summation are given in the
        arrays ``delays`` and ``weights``. ``itmin`` and ``nsamples`` can be
        given to restrict the computation to a given time interval. If
        ``decimate`` is an integer in the range [2,8], decimated traces are
        used in the summation.

        :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type args: tuple(numpy.ndarray)
        :param delays: Delay times
        :type delays: :py:class:`numpy.ndarray`
        :param weights: Trace weights
        :type weights: :py:class:`numpy.ndarray`
        :param itmin: Start time index (start time is ``itmin * dt``),
            defaults to None
        :type itmin: int or None
        :param nsamples: Number of samples, defaults to None
        :type nsamples: int or None
        :param decimate: Decimation factor, defaults to 1
        :type decimate: int
        :param interpolation: Interpolation method
            ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
            ``'nearest_neighbor'``
        :type interpolation: str
        :param implementation: Implementation to use,
            ``['c', 'alternative', 'reference']``, where ``'alternative'``
            and ``'reference'`` use a Python implementation, defaults to
            `'c'`
        :type implementation: str
        :param optimization: Optimization mode ``['enable', 'disable']``,
            defaults to ``'enable'``
        :type optimization: str
        :returns: Stacked GF trace.
        :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
        '''

        store, decimate_ = self._decimated_store(decimate)

        if interpolation == 'nearest_neighbor':
            irecords = store.config.irecords(*args)
        else:
            assert interpolation == 'multilinear'
            # expand per-source delays/weights to one entry per vicinity
            # record and fold in the interpolation weights
            irecords, ip_weights = store.config.vicinities(*args)
            neach = irecords.size // args[0].size
            weights = num.repeat(weights, neach) * ip_weights
            delays = num.repeat(delays, neach)

        tr = store._sum(irecords, delays, weights, itmin, nsamples, decimate_,
                        implementation, optimization)

        # to prevent problems with rounding errors (BaseStore saves deltat
        # as a 4-byte floating point value, value from YAML config is more
        # accurate)
        tr.deltat = self.config.deltat * decimate
        return tr
    def make_decimated(self, decimate, config=None, force=False,
                       show_progress=False):
        '''
        Create decimated version of GF store.

        Create a downsampled version of the GF store. Downsampling is done
        for the integer factor ``decimate`` which should be in the range
        [2,8]. If ``config`` is ``None``, all traces of the GF store are
        decimated and held available (i.e. the index mapping of the original
        store is used), otherwise, a different spatial stepping can be
        specified by giving a modified GF store configuration in ``config``
        (see :py:meth:`create`). Decimated GF sub-stores are created under
        the ``decimated`` subdirectory within the GF store directory.
        Holding available decimated versions of the GF store can save
        computation time, IO bandwidth, or decrease memory footprint at the
        cost of increased disk space usage, when computations are done for
        lower frequency signals.

        :param decimate: Decimate factor
        :type decimate: int
        :param config: GF store config object, defaults to None
        :type config: :py:class:`~pyrocko.gf.meta.Config` or None
        :param force: Force overwrite, defaults to ``False``
        :type force: bool
        :param show_progress: Show progress, defaults to ``False``
        :type show_progress: bool
        '''
        if not self._f_index:
            self.open()

        if not (2 <= decimate <= 8):
            raise StoreError('decimate argument must be in the range [2,8]')

        assert self.mode == 'r'

        if config is None:
            config = self.config

        config = copy.deepcopy(config)
        config.sample_rate = self.config.sample_rate / decimate

        # drop any cached handle for this factor; it will be recreated
        if decimate in self._decimated:
            del self._decimated[decimate]

        store_dir = self._decimated_store_dir(decimate)
        if os.path.exists(store_dir):
            if force:
                shutil.rmtree(store_dir)
            else:
                raise CannotCreate('store already exists at %s' % store_dir)

        # build into a temporary '-incomplete' directory and move into
        # place only when done, so an interrupted run cannot leave a
        # half-built store behind under the final name
        store_dir_incomplete = store_dir + '-incomplete'
        Store.create(store_dir_incomplete, config, force=force)

        decimated = Store(store_dir_incomplete, 'w')
        if show_progress:
            pbar = util.progressbar('decimating store', self.config.nrecords)

        for i, args in enumerate(decimated.config.iter_nodes()):
            tr = self.get(args, decimate=decimate)
            decimated.put(args, tr)

            if show_progress:
                pbar.update(i+1)

        if show_progress:
            pbar.finish()

        decimated.close()

        shutil.move(store_dir_incomplete, store_dir)

        # register availability; handle is opened lazily by _decimated_store
        self._decimated[decimate] = None
1494 def stats(self):
1495 stats = BaseStore.stats(self)
1496 stats['decimated'] = sorted(self._decimated.keys())
1497 return stats
1499 stats_keys = BaseStore.stats_keys + ['decimated']
    def check(self, show_progress=False):
        '''
        Check store data integrity.

        Checks all travel time tables for holes and verifies, for every
        stored trace, that the begin/end marker values match the trace data
        and that all samples are finite.

        :param show_progress: Show progress bar, defaults to ``False``
        :type show_progress: bool
        :returns: Number of problems found in the trace data
        :rtype: int
        '''
        have_holes = []
        for pdef in self.config.tabulated_phases:
            phase_id = pdef.id
            ph = self.get_stored_phase(phase_id)
            if ph.check_holes():
                have_holes.append(phase_id)

        if have_holes:
            for phase_id in have_holes:
                logger.warning(
                    'Travel time table of phase "{}" contains holes. You can '
                    ' use `fomosto tttlsd` to fix holes.'.format(
                        phase_id))
        else:
            logger.info('No holes in travel time tables')

        if show_progress:
            pbar = util.progressbar('checking store', self.config.nrecords)

        problems = 0
        for i, args in enumerate(self.config.iter_nodes()):
            tr = self.get(args)
            if tr and not tr.is_zero:
                # begin_value/end_value are stored redundantly in the index;
                # mismatch with the actual data indicates corruption
                if not tr.begin_value == tr.data[0]:
                    logger.warning('wrong begin value for trace at %s '
                                   '(data corruption?)' % str(args))
                    problems += 1
                if not tr.end_value == tr.data[-1]:
                    logger.warning('wrong end value for trace at %s '
                                   '(data corruption?)' % str(args))
                    problems += 1
                if not num.all(num.isfinite(tr.data)):
                    logger.warning('nans or infs in trace at %s' % str(args))
                    problems += 1

            if show_progress:
                pbar.update(i+1)

        if show_progress:
            pbar.finish()

        return problems
1545 def check_earthmodels(self, config):
1546 if config.earthmodel_receiver_1d.profile('z')[-1] not in\
1547 config.earthmodel_1d.profile('z'):
1548 logger.warning('deepest layer of earthmodel_receiver_1d not '
1549 'found in earthmodel_1d')
1551 def _decimated_store_dir(self, decimate):
1552 return os.path.join(self.store_dir, 'decimated', str(decimate))
1554 def _decimated_store(self, decimate):
1555 if decimate == 1 or decimate not in self._decimated:
1556 return self, decimate
1557 else:
1558 store = self._decimated[decimate]
1559 if store is None:
1560 store = Store(self._decimated_store_dir(decimate), 'r')
1561 self._decimated[decimate] = store
1563 return store, 1
1565 def phase_filename(self, phase_id, attribute='phase'):
1566 check_string_id(phase_id)
1567 return os.path.join(
1568 self.store_dir, 'phases', phase_id + '.%s' % attribute)
1570 def get_phase_identifier(self, phase_id, attribute):
1571 return '{}.{}'.format(phase_id, attribute)
1573 def get_stored_phase(self, phase_def, attribute='phase'):
1574 '''
1575 Get stored phase from GF store.
1577 :returns: Phase information
1578 :rtype: :py:class:`pyrocko.spit.SPTree`
1579 '''
1581 phase_id = self.get_phase_identifier(phase_def, attribute)
1582 if phase_id not in self._phases:
1583 fn = self.phase_filename(phase_def, attribute)
1584 if not os.path.isfile(fn):
1585 raise NoSuchPhase(phase_id)
1587 spt = spit.SPTree(filename=fn)
1588 self._phases[phase_id] = spt
1590 return self._phases[phase_id]
    def get_phase(self, phase_def):
        '''
        Get a phase timing function for a phase definition string.

        ``phase_def`` has the form ``'<provider>:<definition>'``; without a
        provider prefix, ``'stored'`` is assumed. Providers handled here:

        * ``'stored'``: interpolate a stored travel time table
        * ``'vel'``: travel time from source-receiver distance divided by a
          constant velocity (``float(definition) * 1000.``)
        * ``'vel_surface'``: like ``'vel'`` but using surface distance
        * ``'cake'`` / ``'iaspei'``: compute arrivals from the store's 1D
          earth model using :py:mod:`pyrocko.cake`

        :returns: callable mapping an index tuple to an arrival time [s],
            or ``None`` if no arrival is found
        :raises StoreError: for an unknown provider prefix
        '''
        toks = phase_def.split(':', 1)
        if len(toks) == 2:
            provider, phase_def = toks
        else:
            provider, phase_def = 'stored', toks[0]

        if provider == 'stored':
            return self.get_stored_phase(phase_def)

        elif provider == 'vel':
            vel = float(phase_def) * 1000.

            def evaluate(args):
                return self.config.get_distance(args) / vel

            return evaluate

        elif provider == 'vel_surface':
            vel = float(phase_def) * 1000.

            def evaluate(args):
                return self.config.get_surface_distance(args) / vel

            return evaluate

        elif provider in ('cake', 'iaspei'):
            from pyrocko import cake
            mod = self.config.earthmodel_1d
            if provider == 'cake':
                phases = [cake.PhaseDef(phase_def)]
            else:
                phases = cake.PhaseDef.classic(phase_def)

            def evaluate(args):
                # args may omit receiver depth; fall back to the store's
                # configured receiver depth for 2-element tuples
                if len(args) == 2:
                    zr, zs, x = (self.config.receiver_depth,) + args
                elif len(args) == 3:
                    zr, zs, x = args
                else:
                    assert False

                t = []
                if phases:
                    rays = mod.arrivals(
                        phases=phases,
                        distances=[x*cake.m2d],
                        zstart=zs,
                        zstop=zr)

                    for ray in rays:
                        t.append(ray.t)

                # earliest arrival wins; None signals "no arrival"
                if t:
                    return min(t)
                else:
                    return None

            return evaluate

        raise StoreError('unsupported phase provider: %s' % provider)
1654 def t(self, timing, *args):
1655 '''
1656 Compute interpolated phase arrivals.
1658 **Examples:**
1660 If ``test_store`` is of :py:class:`~pyrocko.gf.meta.ConfigTypeA`::
1662 test_store.t('p', (1000, 10000))
1663 test_store.t('last{P|Pdiff}', (1000, 10000)) # The latter arrival
1664 # of P or diffracted
1665 # P phase
1667 If ``test_store`` is of :py:class:`~pyrocko.gf.meta.ConfigTypeB`::
1669 test_store.t('S', (1000, 1000, 10000))
1670 test_store.t('first{P|p|Pdiff|sP}', (1000, 1000, 10000)) # The
1671 ` # first arrival of
1672 # the given phases is
1673 # selected
1675 :param timing: Timing string as described above
1676 :type timing: str or :py:class:`~pyrocko.gf.meta.Timing`
1677 :param \\*args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1678 ``(source_depth, distance, component)`` as in
1679 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1680 :type \\*args: tuple
1681 :returns: Phase arrival according to ``timing``
1682 :rtype: float or None
1683 '''
1685 if len(args) == 1:
1686 args = args[0]
1687 else:
1688 args = self.config.make_indexing_args1(*args)
1690 if not isinstance(timing, meta.Timing):
1691 timing = meta.Timing(timing)
1693 return timing.evaluate(self.get_phase, args)
1695 def get_available_interpolation_tables(self):
1696 filenames = glob(op.join(self.store_dir, 'phases', '*'))
1697 return [op.basename(file) for file in filenames]
1699 def get_stored_attribute(self, phase_def, attribute, *args):
1700 '''
1701 Return interpolated store attribute
1703 :param attribute: takeoff_angle / incidence_angle [deg]
1704 :type attribute: str
1705 :param \\*args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1706 ``(source_depth, distance, component)`` as in
1707 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1708 :type \\*args: tuple
1709 '''
1710 try:
1711 return self.get_stored_phase(phase_def, attribute)(*args)
1712 except NoSuchPhase:
1713 raise StoreError(
1714 'Interpolation table for {} of {} does not exist! '
1715 'Available tables: {}'.format(
1716 attribute, phase_def,
1717 self.get_available_interpolation_tables()))
    def get_many_stored_attributes(self, phase_def, attribute, coords):
        '''
        Return interpolated store attribute

        :param attribute: takeoff_angle / incidence_angle [deg]
        :type attribute: str
        :param coords: coordinate array with columns being
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type coords: :py:class:`numpy.ndarray`
        :raises StoreError: if no table exists for the requested attribute
        '''
        try:
            return self.get_stored_phase(
                phase_def, attribute).interpolate_many(coords)
        except NoSuchPhase:
            raise StoreError(
                'Interpolation table for {} of {} does not exist! '
                'Available tables: {}'.format(
                    attribute, phase_def,
                    self.get_available_interpolation_tables()))
    def make_stored_table(self, attribute, force=False):
        '''
        Compute tables for selected ray attributes.

        :param attribute: phase / takeoff_angle [deg] / incidence_angle
            [deg]
        :type attribute: str
        :param force: recompute tables whose files already exist
        :type force: bool

        Tables are computed using the 1D earth model defined in
        :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined
        phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
        '''
        if attribute not in available_stored_tables:
            raise StoreError(
                'Cannot create attribute table for {}! '
                'Supported attribute tables: {}'.format(
                    attribute, available_stored_tables))

        from pyrocko import cake
        config = self.config

        if not config.tabulated_phases:
            return

        mod = config.earthmodel_1d

        if not mod:
            raise StoreError('no earth model found')

        if config.earthmodel_receiver_1d:
            self.check_earthmodels(config)

        for pdef in config.tabulated_phases:

            phase_id = pdef.id
            phases = pdef.phases

            if attribute == 'phase':
                # travel times: tolerance of half a sample interval
                ftol = config.deltat * 0.5
                horvels = pdef.horizontal_velocities
            else:
                # angles: tighter tolerance relative to the sampling rate
                ftol = config.deltat * 0.01

            fn = self.phase_filename(phase_id, attribute)

            if os.path.exists(fn) and not force:
                logger.info('file already exists: %s' % fn)
                continue

            def evaluate(args):
                # NOTE: closes over the current iteration's phase_id,
                # phases, horvels and config; safe because the SPTree
                # below consumes it before the next iteration.

                nargs = len(args)
                if nargs == 2:
                    receiver_depth, source_depth, distance = (
                        config.receiver_depth,) + args
                elif nargs == 3:
                    receiver_depth, source_depth, distance = args
                else:
                    raise ValueError(
                        'Number of input arguments %i is not supported!'
                        'Supported number of arguments: 2 or 3' % nargs)

                ray_attribute_values = []
                arrival_times = []
                if phases:
                    rays = mod.arrivals(
                        phases=phases,
                        distances=[distance * cake.m2d],
                        zstart=source_depth,
                        zstop=receiver_depth)

                    for ray in rays:
                        arrival_times.append(ray.t)
                        if attribute != 'phase':
                            ray_attribute_values.append(
                                getattr(ray, attribute)())

                if attribute == 'phase':
                    # fallback arrivals from constant horizontal velocities
                    for v in horvels:
                        arrival_times.append(distance / (v * km))

                if arrival_times:
                    if attribute == 'phase':
                        return min(arrival_times)
                    else:
                        # attribute value belonging to the earliest arrival
                        earliest_idx = num.argmin(arrival_times)
                        return ray_attribute_values[earliest_idx]
                else:
                    return None

            logger.info(
                'making "%s" table for phasegroup "%s"', attribute, phase_id)

            ip = spit.SPTree(
                f=evaluate,
                ftol=ftol,
                xbounds=num.transpose((config.mins, config.maxs)),
                xtols=config.deltas)

            util.ensuredirs(fn)
            ip.dump(fn)
1842 def make_timing_params(self, begin, end, snap_vred=True, force=False):
1843 '''
1844 Compute tight parameterized time ranges to include given timings.
1846 Calculates appropriate time ranges to cover given begin and end timings
1847 over all GF points in the store. A dict with the following keys is
1848 returned:
1850 * ``'tmin'``: time [s], minimum of begin timing over all GF points
1851 * ``'tmax'``: time [s], maximum of end timing over all GF points
1852 * ``'vred'``, ``'tmin_vred'``: slope [m/s] and offset [s] of reduction
1853 velocity [m/s] appropriate to catch begin timing over all GF points
1854 * ``'tlenmax_vred'``: maximum time length needed to cover all end
1855 timings, when using linear slope given with (``vred``, ``tmin_vred``)
1856 as start
1857 '''
1859 data = []
1860 warned = set()
1861 for args in self.config.iter_nodes(level=-1):
1862 tmin = self.t(begin, args)
1863 tmax = self.t(end, args)
1864 if tmin is None:
1865 warned.add(str(begin))
1866 if tmax is None:
1867 warned.add(str(end))
1869 x = self.config.get_surface_distance(args)
1870 data.append((x, tmin, tmax))
1872 if len(warned):
1873 w = ' | '.join(list(warned))
1874 msg = '''determination of time window failed using phase
1875definitions: %s.\n Travel time table contains holes in probed ranges. You can
1876use `fomosto tttlsd` to fix holes.''' % w
1877 if force:
1878 logger.warning(msg)
1879 else:
1880 raise MakeTimingParamsFailed(msg)
1882 xs, tmins, tmaxs = num.array(data, dtype=float).T
1884 tlens = tmaxs - tmins
1886 i = num.nanargmin(tmins)
1887 if not num.isfinite(i):
1888 raise MakeTimingParamsFailed('determination of time window failed')
1890 tminmin = tmins[i]
1891 x_tminmin = xs[i]
1892 dx = (xs - x_tminmin)
1893 dx = num.where(dx != 0.0, dx, num.nan)
1894 s = (tmins - tminmin) / dx
1895 sred = num.min(num.abs(s[num.isfinite(s)]))
1897 deltax = self.config.distance_delta
1899 if snap_vred:
1900 tdif = sred*deltax
1901 tdif2 = self.config.deltat * math.floor(tdif / self.config.deltat)
1902 sred = tdif2/self.config.distance_delta
1904 tmin_vred = tminmin - sred*x_tminmin
1905 if snap_vred:
1906 xe = x_tminmin - int(x_tminmin / deltax) * deltax
1907 tmin_vred = float(
1908 self.config.deltat *
1909 math.floor(tmin_vred / self.config.deltat) - xe * sred)
1911 tlenmax_vred = num.nanmax(tmax - (tmin_vred + sred*x))
1912 if sred != 0.0:
1913 vred = 1.0/sred
1914 else:
1915 vred = 0.0
1917 return dict(
1918 tmin=tminmin,
1919 tmax=num.nanmax(tmaxs),
1920 tlenmax=num.nanmax(tlens),
1921 tmin_vred=tmin_vred,
1922 tlenmax_vred=tlenmax_vred,
1923 vred=vred)
1925 def make_travel_time_tables(self, force=False):
1926 '''
1927 Compute travel time tables.
1929 Travel time tables are computed using the 1D earth model defined in
1930 :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined phase
1931 in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`. The accuracy of
1932 the tablulated times is adjusted to the sampling rate of the store.
1933 '''
1934 self.make_stored_table(attribute='phase', force=force)
    def make_ttt(self, force=False):
        '''
        Shorthand for :py:meth:`make_travel_time_tables`.
        '''
        self.make_travel_time_tables(force=force)
    def make_takeoff_angle_tables(self, force=False):
        '''
        Compute takeoff-angle tables.

        Takeoff-angle tables [deg] are computed using the 1D earth model
        defined in :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each
        defined phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
        The accuracy of the tabulated times is adjusted to 0.01 times the
        sampling rate of the store.

        :param force: recompute tables whose files already exist
        :type force: bool
        '''
        self.make_stored_table(attribute='takeoff_angle', force=force)
    def make_incidence_angle_tables(self, force=False):
        '''
        Compute incidence-angle tables.

        Incidence-angle tables [deg] are computed using the 1D earth model
        defined in :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each
        defined phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
        The accuracy of the tabulated times is adjusted to 0.01 times the
        sampling rate of the store.

        :param force: recompute tables whose files already exist
        :type force: bool
        '''
        self.make_stored_table(attribute='incidence_angle', force=force)
1963 def get_provided_components(self):
1965 scheme_desc = meta.component_scheme_to_description[
1966 self.config.component_scheme]
1968 quantity = self.config.stored_quantity \
1969 or scheme_desc.default_stored_quantity
1971 if not quantity:
1972 return scheme_desc.provided_components
1973 else:
1974 return [
1975 quantity + '.' + comp
1976 for comp in scheme_desc.provided_components]
1978 def fix_ttt_holes(self, phase_id):
1980 pdef = self.config.get_tabulated_phase(phase_id)
1981 mode = None
1982 for phase in pdef.phases:
1983 for leg in phase.legs():
1984 if mode is None:
1985 mode = leg.mode
1987 else:
1988 if mode != leg.mode:
1989 raise StoreError(
1990 'Can only fix holes in pure P or pure S phases.')
1992 sptree = self.get_stored_phase(phase_id)
1993 sptree_lsd = self.config.fix_ttt_holes(sptree, mode)
1995 phase_lsd = phase_id + '.lsd'
1996 fn = self.phase_filename(phase_lsd)
1997 sptree_lsd.dump(fn)
1999 def statics(self, source, multi_location, itsnapshot, components,
2000 interpolation='nearest_neighbor', nthreads=0):
2001 if not self._f_index:
2002 self.open()
2004 out = {}
2005 ntargets = multi_location.ntargets
2006 source_terms = source.get_source_terms(self.config.component_scheme)
2007 # TODO: deal with delays for snapshots > 1 sample
2009 if itsnapshot is not None:
2010 delays = source.times
2012 # Fringe case where we sample at sample 0 and sample 1
2013 tsnapshot = itsnapshot * self.config.deltat
2014 if delays.max() == tsnapshot and delays.min() != tsnapshot:
2015 delays[delays == delays.max()] -= self.config.deltat
2017 else:
2018 delays = source.times * 0
2019 itsnapshot = 1
2021 if ntargets == 0:
2022 raise StoreError('MultiLocation.coords5 is empty')
2024 res = store_ext.store_calc_static(
2025 self.cstore,
2026 source.coords5(),
2027 source_terms,
2028 delays,
2029 multi_location.coords5,
2030 self.config.component_scheme,
2031 interpolation,
2032 itsnapshot,
2033 nthreads)
2035 out = {}
2036 for icomp, (comp, comp_res) in enumerate(
2037 zip(self.get_provided_components(), res)):
2038 if comp not in components:
2039 continue
2040 out[comp] = res[icomp]
2042 return out
2044 def calc_seismograms(self, source, receivers, components, deltat=None,
2045 itmin=None, nsamples=None,
2046 interpolation='nearest_neighbor',
2047 optimization='enable', nthreads=1):
2048 config = self.config
2050 assert interpolation in ['nearest_neighbor', 'multilinear'], \
2051 'Unknown interpolation: %s' % interpolation
2053 if not isinstance(receivers, list):
2054 receivers = [receivers]
2056 if deltat is None:
2057 decimate = 1
2058 else:
2059 decimate = int(round(deltat/config.deltat))
2060 if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
2061 raise StoreError(
2062 'unavailable decimation ratio target.deltat / store.deltat'
2063 ' = %g / %g' % (deltat, config.deltat))
2065 store, decimate_ = self._decimated_store(decimate)
2067 if not store._f_index:
2068 store.open()
2070 scheme = config.component_scheme
2072 source_coords_arr = source.coords5()
2073 source_terms = source.get_source_terms(scheme)
2074 delays = source.times.ravel()
2076 nreceiver = len(receivers)
2077 receiver_coords_arr = num.empty((nreceiver, 5))
2078 for irec, rec in enumerate(receivers):
2079 receiver_coords_arr[irec, :] = rec.coords5
2081 dt = self.get_deltat()
2083 itoffset = int(num.floor(delays.min()/dt)) if delays.size != 0 else 0
2085 if itmin is None:
2086 itmin = num.zeros(nreceiver, dtype=num.int32)
2087 else:
2088 itmin = (itmin-itoffset).astype(num.int32)
2090 if nsamples is None:
2091 nsamples = num.zeros(nreceiver, dtype=num.int32) - 1
2092 else:
2093 nsamples = nsamples.astype(num.int32)
2095 try:
2096 results = store_ext.store_calc_timeseries(
2097 store.cstore,
2098 source_coords_arr,
2099 source_terms,
2100 (delays - itoffset*dt),
2101 receiver_coords_arr,
2102 scheme,
2103 interpolation,
2104 itmin,
2105 nsamples,
2106 nthreads)
2107 except Exception as e:
2108 raise e
2110 provided_components = self.get_provided_components()
2111 ncomponents = len(provided_components)
2113 seismograms = [dict() for _ in range(nreceiver)]
2114 for ires in range(len(results)):
2115 res = results.pop(0)
2116 ireceiver = ires // ncomponents
2118 comp = provided_components[res[-2]]
2120 if comp not in components:
2121 continue
2123 tr = GFTrace(*res[:-2])
2124 tr.deltat = config.deltat * decimate
2125 tr.itmin += itoffset
2127 tr.n_records_stacked = 0
2128 tr.t_optimize = 0.
2129 tr.t_stack = 0.
2130 tr.err = res[-1]
2132 seismograms[ireceiver][comp] = tr
2134 return seismograms
    def seismogram(self, source, receiver, components, deltat=None,
                   itmin=None, nsamples=None,
                   interpolation='nearest_neighbor',
                   optimization='enable', nthreads=1):
        '''
        Compute seismogram traces for a single receiver.

        :param source: discretized source providing ``coords5()``, ``times``
            and source terms
        :param receiver: receiver object with a ``coords5`` attribute
        :param components: names of components to return
        :param deltat: target sampling interval [s]; must be close to an
            integer multiple of the store's sampling interval; defaults to
            the store's own
        :param itmin: start time index (start time is ``itmin * dt``) or
            ``None``
        :param nsamples: number of samples or ``None``
        :param interpolation: interpolation method passed to the C layer
        :param optimization: optimization mode ``['enable', 'disable']``
        :param nthreads: number of threads for ``make_sum_params``
        :returns: dict mapping component name to stacked GF trace
        :raises StoreError: on unavailable decimation ratio
        :raises pyrocko.gf.meta.OutOfBounds: if the source/receiver
            configuration is outside the store's coverage
        '''
        config = self.config

        if deltat is None:
            decimate = 1
        else:
            decimate = int(round(deltat/config.deltat))
            if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
                raise StoreError(
                    'unavailable decimation ratio target.deltat / store.deltat'
                    ' = %g / %g' % (deltat, config.deltat))

        store, decimate_ = self._decimated_store(decimate)

        if not store._f_index:
            store.open()

        scheme = config.component_scheme

        source_coords_arr = source.coords5()
        source_terms = source.get_source_terms(scheme)
        receiver_coords_arr = receiver.coords5[num.newaxis, :].copy()

        try:
            params = store_ext.make_sum_params(
                store.cstore,
                source_coords_arr,
                source_terms,
                receiver_coords_arr,
                scheme,
                interpolation, nthreads)

        except store_ext.StoreExtError:
            raise meta.OutOfBounds()

        provided_components = self.get_provided_components()

        out = {}
        for icomp, comp in enumerate(provided_components):
            if comp in components:
                weights, irecords = params[icomp]

                # one delay entry per record contributing to this component
                neach = irecords.size // source.times.size
                delays = num.repeat(source.times, neach)

                tr = store._sum(
                    irecords, delays, weights, itmin, nsamples, decimate_,
                    'c', optimization)

                # to prevent problems with rounding errors (BaseStore saves
                # deltat as a 4-byte floating point value, value from YAML
                # config is more accurate)
                tr.deltat = config.deltat * decimate

                out[comp] = tr

        return out
# public API of this module
__all__ = '''
gf_dtype
NotMultipleOfSamplingInterval
GFTrace
CannotCreate
CannotOpen
DuplicateInsert
NotAllowedToInterpolate
NoSuchExtra
NoSuchPhase
BaseStore
Store
SeismosizerErrorEnum
'''.split()