1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import errno
7import time
8import os
9import struct
10import weakref
11import math
12import shutil
13try:
14 import fcntl
15except ImportError:
16 fcntl = None
17import copy
18import logging
19import re
20import hashlib
21from glob import glob
23import numpy as num
24from scipy import signal
26from . import meta
27from .error import StoreError
28try:
29 from . import store_ext
30except ImportError:
31 store_ext = None
32from pyrocko import util, spit
logger = logging.getLogger('pyrocko.gf.store')

# short alias used throughout this module
op = os.path

# gf store endianness: all on-disk values are little-endian
E = '<'

# in-memory sample dtype
gf_dtype = num.dtype(num.float32)
# on-disk sample dtype (explicit little-endian float32)
gf_dtype_store = num.dtype(E + 'f4')

gf_dtype_nbytes_per_sample = 4

# header of the 'index' file: record count and sampling interval
gf_store_header_dtype = [
    ('nrecords', E + 'u8'),
    ('deltat', E + 'f4'),
]

gf_store_header_fmt = E + 'Qf'
gf_store_header_fmt_size = struct.calcsize(gf_store_header_fmt)

# per-trace record layout of the 'index' file (field semantics are
# documented in the comment block further below)
gf_record_dtype = num.dtype([
    ('data_offset', E + 'u8'),
    ('itmin', E + 'i4'),
    ('nsamples', E + 'u4'),
    ('begin_value', E + 'f4'),
    ('end_value', E + 'f4'),
])

available_stored_tables = ['phase', 'takeoff_angle', 'incidence_angle']

# [m]
km = 1000.
class SeismosizerErrorEnum:
    '''
    Symbolic error codes stored in :py:attr:`GFTrace.err`.

    NOTE(review): presumably mirrors the error codes of the ``store_ext``
    C extension — confirm against the extension sources.
    '''
    SUCCESS = 0
    INVALID_RECORD = 1
    EMPTY_RECORD = 2
    BAD_RECORD = 3
    ALLOC_FAILED = 4
    BAD_REQUEST = 5
    BAD_DATA_OFFSET = 6
    READ_DATA_FAILED = 7
    SEEK_INDEX_FAILED = 8
    READ_INDEX_FAILED = 9
    FSTAT_TRACES_FAILED = 10
    BAD_STORE = 11
    MMAP_INDEX_FAILED = 12
    MMAP_TRACES_FAILED = 13
    INDEX_OUT_OF_BOUNDS = 14
    NTARGETS_OUT_OF_BOUNDS = 15
def valid_string_id(s):
    '''
    Check whether *s* is a valid string ID.

    :returns: the match object if *s* matches
        :py:attr:`meta.StringID.pattern`, ``None`` otherwise (usable as a
        boolean).
    '''
    pattern = meta.StringID.pattern
    return re.match(pattern, s)
def check_string_id(s):
    '''
    Raise :py:exc:`ValueError` if *s* is not a valid string ID.
    '''
    if valid_string_id(s):
        return

    raise ValueError('invalid name %s' % s)
93# - data_offset
94#
95# Data file offset of first sample in bytes (the seek value).
96# Special values:
97#
98# 0x00 - missing trace
99# 0x01 - zero trace (GF trace is physically zero)
100# 0x02 - short trace of 1 or 2 samples (no need for data allocation)
101#
102# - itmin
103#
104# Time of first sample of the trace as a multiple of the sampling interval. All
105# GF samples must be evaluated at multiples of the sampling interval with
106# respect to a global reference time zero. Must be set also for zero traces.
107#
108# - nsamples
109#
110# Number of samples in the GF trace. Should be set to zero for zero traces.
111#
112# - begin_value, end_value
113#
114# Values of first and last sample. These values are included in data[]
# redundantly.
class NotMultipleOfSamplingInterval(Exception):
    '''
    Raised when a requested time is not a multiple of the sampling interval.
    '''
122sampling_check_eps = 1e-5
class GFTrace(object):
    '''
    Green's Function trace class for handling traces from the GF store.
    '''

    @classmethod
    def from_trace(cls, tr):
        '''
        Create a :py:class:`GFTrace` from a :py:class:`pyrocko.trace.Trace`.
        '''
        return cls(data=tr.ydata.copy(), deltat=tr.deltat, tmin=tr.tmin)

    def __init__(self, data=None, itmin=None, deltat=1.0,
                 is_zero=False, begin_value=None, end_value=None, tmin=None):

        # exactly one of tmin / itmin must be supplied
        assert (tmin is None) != (itmin is None), \
            'GFTrace: either tmin or itmin must be given'

        if tmin is not None:
            itmin = int(round(tmin / deltat))
            if abs(itmin*deltat - tmin) > sampling_check_eps*deltat:
                raise NotMultipleOfSamplingInterval(
                    'GFTrace: tmin (%g) is not a multiple of sampling '
                    'interval (%g)' % (tmin, deltat))

        if data is None:
            data = num.array([], dtype=gf_dtype)
            begin_value = 0.0
            end_value = 0.0
        else:
            data = num.asarray(data, dtype=gf_dtype)

        self.data = data
        self.itmin = itmin
        self.deltat = deltat
        self.is_zero = is_zero
        # stacking statistics, filled in by the summation machinery
        self.n_records_stacked = 0.
        self.t_stack = 0.
        self.t_optimize = 0.
        self.err = SeismosizerErrorEnum.SUCCESS

        if data.size > 0:
            if begin_value is None:
                begin_value = data[0]
            if end_value is None:
                end_value = data[-1]

        self.begin_value = begin_value
        self.end_value = end_value

    @property
    def t(self):
        '''
        Time vector of the GF trace.

        :returns: Time vector
        :rtype: :py:class:`numpy.ndarray`
        '''
        n = self.data.size
        return num.linspace(
            self.itmin*self.deltat, (self.itmin+n-1)*self.deltat, n)

    def __str__(self, itmin=0):
        if self.is_zero:
            return 'ZERO'

        ibeg = self.itmin
        iend = self.itmin + self.data.size

        cells = []
        for i in range(itmin, iend):
            if ibeg <= i < iend:
                cells.append('%7.4g' % self.data[i-ibeg])
            else:
                cells.append(' '*7)

        return '|'.join(cells)

    def to_trace(self, net, sta, loc, cha):
        '''
        Convert to a :py:class:`pyrocko.trace.Trace` object.
        '''
        from pyrocko import trace
        return trace.Trace(
            net, sta, loc, cha,
            tmin=self.itmin*self.deltat,
            deltat=self.deltat,
            ydata=self.data)
class GFValue(object):
    '''
    Scalar Green's function value, carrying the same stacking statistics
    attributes as :py:class:`GFTrace`.
    '''

    def __init__(self, value):
        self.value = value
        # stacking statistics, filled in by the summation machinery
        self.t_stack = 0.
        self.t_optimize = 0.
        self.n_records_stacked = 0.
216Zero = GFTrace(is_zero=True, itmin=0)
def make_same_span(tracesdict):
    '''
    Expand all traces in *tracesdict* to a common time span.

    Traces shorter than the common span are padded with their first/last
    sample values; zero traces become zero-filled traces of the common
    length. If all traces are zero, the :py:data:`Zero` singleton is used.
    '''
    nonzero = [tr for tr in tracesdict.values() if not tr.is_zero]
    if not nonzero:
        return dict((key, Zero) for key in tracesdict.keys())

    deltat = nonzero[0].deltat

    itmin = min(tr.itmin for tr in nonzero)
    itmax = max(tr.itmin + tr.data.size for tr in nonzero) - 1
    nsamples = itmax - itmin + 1

    out = {}
    for key, tr in tracesdict.items():
        if tr.itmin == itmin and tr.data.size == nsamples:
            out[key] = tr
            continue

        data = num.zeros(nsamples, dtype=gf_dtype)
        if not tr.is_zero:
            ilo = tr.itmin - itmin
            ihi = ilo + tr.data.size
            # repeat edge values outside the trace's own span
            data[:ilo] = tr.data[0]
            data[ilo:ihi] = tr.data
            data[ihi:] = tr.data[-1]

        out[key] = GFTrace(data, itmin, deltat)

    return out
class CannotCreate(StoreError):
    '''
    Raised when a store, directory or file cannot be created.
    '''
class CannotOpen(StoreError):
    '''
    Raised when a GF store cannot be opened.
    '''
class DuplicateInsert(StoreError):
    '''
    Raised when a record is inserted twice into the same slot.
    '''
class ShortRead(StoreError):
    '''
    Raised when fewer samples than expected could be read from the data file.
    '''

    def __str__(self):
        return 'unexpected end of data (truncated traces file?)'
class NotAllowedToInterpolate(StoreError):
    '''
    Raised when interpolation is requested but not permitted.
    '''

    def __str__(self):
        return 'not allowed to interpolate'
class NoSuchExtra(StoreError):
    '''
    Raised when a requested extra-information key is not in the store.
    '''

    def __init__(self, s):
        StoreError.__init__(self)
        self.value = s

    def __str__(self):
        return 'extra information for key "%s" not found.' % self.value
class NoSuchPhase(StoreError):
    '''
    Raised when a requested phase table is not available in the store.
    '''

    def __init__(self, s):
        StoreError.__init__(self)
        self.value = s

    def __str__(self):
        return 'phase for key "%s" not found. ' \
               'Running "fomosto ttt" may be needed.' % self.value
def remove_if_exists(fn, force=False):
    '''
    Delete file *fn* if it exists.

    Without ``force``, an existing file triggers :py:exc:`CannotCreate`
    instead of being removed. A non-existing file is a no-op.
    '''
    if not os.path.exists(fn):
        return

    if not force:
        raise CannotCreate('file %s already exists' % fn)

    os.remove(fn)
def get_extra_path(store_dir, key):
    '''
    Path of the extra-information file *key* inside *store_dir*.

    Raises :py:exc:`ValueError` via :py:func:`check_string_id` for
    invalid keys.
    '''
    check_string_id(key)
    extra_dir = os.path.join(store_dir, 'extra')
    return os.path.join(extra_dir, key)
class BaseStore(object):
    '''
    Low-level interface to the GF store's ``index`` and ``traces`` files.

    The index file holds a small header (record count, sampling interval)
    followed by one fixed-size record per trace (see ``gf_record_dtype``);
    the traces file holds the raw float32 sample data. Retrieval and
    delay-and-sum stacking are delegated to the ``store_ext`` C extension
    when possible, with pure-Python reference implementations kept for
    verification.
    '''

    @staticmethod
    def lock_fn_(store_dir):
        # path of the fallback lock file (used when fcntl is unavailable)
        return os.path.join(store_dir, 'lock')

    @staticmethod
    def index_fn_(store_dir):
        return os.path.join(store_dir, 'index')

    @staticmethod
    def data_fn_(store_dir):
        return os.path.join(store_dir, 'traces')

    @staticmethod
    def config_fn_(store_dir):
        return os.path.join(store_dir, 'config')

    @staticmethod
    def create(store_dir, deltat, nrecords, force=False):
        '''
        Create empty index and traces files for a new store.

        :param store_dir: store directory (created if needed)
        :param deltat: sampling interval [s]
        :param nrecords: number of (initially empty) records to allocate
        :param force: overwrite existing index/traces files
        :raises CannotCreate: if the directory cannot be created or the
            files already exist and ``force`` is not set
        '''
        try:
            util.ensuredir(store_dir)
        except Exception:
            raise CannotCreate('cannot create directory %s' % store_dir)

        index_fn = BaseStore.index_fn_(store_dir)
        data_fn = BaseStore.data_fn_(store_dir)

        for fn in (index_fn, data_fn):
            remove_if_exists(fn, force)

        with open(index_fn, 'wb') as f:
            f.write(struct.pack(gf_store_header_fmt, nrecords, deltat))
            records = num.zeros(nrecords, dtype=gf_record_dtype)
            records.tofile(f)

        with open(data_fn, 'wb') as f:
            # reserve the first bytes so that real data offsets never
            # collide with the special values 0, 1, 2
            f.write(b'\0' * 32)

    def __init__(self, store_dir, mode='r', use_memmap=True):
        # NOTE(review): the substring test also accepts '' and 'rw';
        # intended values are 'r' (read-only) and 'w' (writeable)
        assert mode in 'rw'
        self.store_dir = store_dir
        self.mode = mode
        self._use_memmap = use_memmap
        self._nrecords = None
        self._deltat = None
        self._f_index = None
        self._f_data = None
        self._records = None
        self.cstore = None

    def open(self):
        '''
        Open index and traces files and initialize the C extension store.
        '''
        assert self._f_index is None
        index_fn = self.index_fn()
        data_fn = self.data_fn()

        if self.mode == 'r':
            fmode = 'rb'
        elif self.mode == 'w':
            # files must already exist (created by create()); open for update
            fmode = 'r+b'
        else:
            assert False, 'invalid mode: %s' % self.mode

        try:
            self._f_index = open(index_fn, fmode)
            self._f_data = open(data_fn, fmode)
        except Exception:
            # mark as unusable so __del__ will not try to close
            self.mode = ''
            raise CannotOpen('cannot open gf store: %s' % self.store_dir)

        try:
            # replace single precision deltat value in store with the double
            # precision value from the config, if available
            self.cstore = store_ext.store_init(
                self._f_index.fileno(), self._f_data.fileno(),
                self.get_deltat() or 0.0)

        except store_ext.StoreExtError as e:
            raise StoreError(str(e))

        while True:
            try:
                dataheader = self._f_index.read(gf_store_header_fmt_size)
                break

            except IOError as e:
                # occasionally got this one on an NFS volume
                if e.errno == errno.EBUSY:
                    time.sleep(0.01)
                else:
                    raise

        nrecords, deltat = struct.unpack(gf_store_header_fmt, dataheader)
        self._nrecords = nrecords
        self._deltat = deltat

        self._load_index()

    def __del__(self):
        # mode == '' signals "never opened successfully / already closed"
        if self.mode != '':
            self.close()

    def lock(self):
        '''
        Acquire an exclusive lock on the store (blocking, with retry).
        '''
        if not fcntl:
            # fallback on platforms without fcntl (e.g. Windows)
            lock_fn = self.lock_fn()
            util.create_lockfile(lock_fn)
        else:
            if not self._f_index:
                self.open()

            while True:
                try:
                    fcntl.lockf(self._f_index, fcntl.LOCK_EX)
                    break

                except IOError as e:
                    # no locks available, retry
                    if e.errno == errno.ENOLCK:
                        time.sleep(0.01)
                    else:
                        raise

    def unlock(self):
        '''
        Flush pending writes and release the store lock.
        '''
        if not fcntl:
            lock_fn = self.lock_fn()
            util.delete_lockfile(lock_fn)
        else:
            self._f_data.flush()
            self._f_index.flush()
            fcntl.lockf(self._f_index, fcntl.LOCK_UN)

    def put(self, irecord, trace):
        # public wrapper around _put (low-level integer index)
        self._put(irecord, trace)

    def get_record(self, irecord):
        return self._get_record(irecord)

    def get_span(self, irecord, decimate=1):
        return self._get_span(irecord, decimate=decimate)

    def get(self, irecord, itmin=None, nsamples=None, decimate=1,
            implementation='c'):
        return self._get(irecord, itmin, nsamples, decimate, implementation)

    def sum(self, irecords, delays, weights, itmin=None,
            nsamples=None, decimate=1, implementation='c',
            optimization='enable'):
        return self._sum(irecords, delays, weights, itmin, nsamples, decimate,
                         implementation, optimization)

    def irecord_format(self):
        # zero-padded format string wide enough for all record numbers
        return util.zfmt(self._nrecords)

    def str_irecord(self, irecord):
        return self.irecord_format() % irecord

    def close(self):
        '''
        Write back the index (in write mode) and close both files.
        '''
        if self.mode == 'w':
            if not self._f_index:
                self.open()
            self._save_index()

        if self._f_data:
            self._f_data.close()
            self._f_data = None

        if self._f_index:
            self._f_index.close()
            self._f_index = None

        del self._records
        self._records = None

        self.mode = ''

    def _get_record(self, irecord):
        if not self._f_index:
            self.open()

        return self._records[irecord]

    def _get(self, irecord, itmin, nsamples, decimate, implementation):
        '''
        Retrieve complete GF trace from storage.
        '''

        if not self._f_index:
            self.open()

        if not self.mode == 'r':
            raise StoreError('store not open in read mode')

        # fast path: C extension handles the undecimated case
        if implementation == 'c' and decimate == 1:

            if nsamples is None:
                nsamples = -1

            if itmin is None:
                itmin = 0

            try:
                return GFTrace(*store_ext.store_get(
                    self.cstore, int(irecord), int(itmin), int(nsamples)))
            except store_ext.StoreExtError as e:
                raise StoreError(str(e))

        else:
            return self._get_impl_reference(irecord, itmin, nsamples, decimate)

    def get_deltat(self):
        # sampling interval [s]; overridden in Store to use the config value
        return self._deltat

    def _get_impl_reference(self, irecord, itmin, nsamples, decimate):
        '''
        Pure-Python trace retrieval, with optional decimation.

        Returns ``None`` for missing traces and a zero :py:class:`GFTrace`
        for physically-zero traces.
        '''
        deltat = self.get_deltat()

        if not (0 <= irecord < self._nrecords):
            raise StoreError('invalid record number requested '
                             '(irecord = %i, nrecords = %i)' %
                             (irecord, self._nrecords))

        ipos, itmin_data, nsamples_data, begin_value, end_value = \
            self._records[irecord]

        if None in (itmin, nsamples):
            itmin = itmin_data
            itmax = itmin_data + nsamples_data - 1
            nsamples = nsamples_data
        else:
            # caller's itmin/nsamples are in decimated samples; convert to
            # the store's native sampling
            itmin = itmin * decimate
            nsamples = nsamples * decimate
            itmax = itmin + nsamples - decimate

        if ipos == 0:
            # special data offset 0: missing trace
            return None

        elif ipos == 1:
            # special data offset 1: zero trace
            itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
            return GFTrace(is_zero=True, itmin=itmin_ext//decimate)

        if decimate == 1:
            ilo = max(itmin, itmin_data) - itmin_data
            ihi = min(itmin+nsamples, itmin_data+nsamples_data) - itmin_data
            data = self._get_data(ipos, begin_value, end_value, ilo, ihi)

            return GFTrace(data, itmin=itmin_data+ilo, deltat=deltat,
                           begin_value=begin_value, end_value=end_value)

        else:
            itmax_data = itmin_data + nsamples_data - 1

            # put begin and end to multiples of new sampling rate
            itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
            itmax_ext = -((-min(itmax, itmax_data))//decimate) * decimate
            nsamples_ext = itmax_ext - itmin_ext + 1

            # add some padding for the aa filter
            order = 30
            itmin_ext_pad = itmin_ext - order//2
            itmax_ext_pad = itmax_ext + order//2
            nsamples_ext_pad = itmax_ext_pad - itmin_ext_pad + 1

            itmin_overlap = max(itmin_data, itmin_ext_pad)
            itmax_overlap = min(itmax_data, itmax_ext_pad)

            ilo = itmin_overlap - itmin_ext_pad
            ihi = max(ilo, itmax_overlap - itmin_ext_pad + 1)
            ilo_data = itmin_overlap - itmin_data
            ihi_data = max(ilo_data, itmax_overlap - itmin_data + 1)

            data_ext_pad = num.empty(nsamples_ext_pad, dtype=gf_dtype)
            data_ext_pad[ilo:ihi] = self._get_data(
                ipos, begin_value, end_value, ilo_data, ihi_data)

            # extend with the constant edge values outside the stored span
            data_ext_pad[:ilo] = begin_value
            data_ext_pad[ihi:] = end_value

            # anti-alias lowpass, then pick every decimate-th sample
            b = signal.firwin(order + 1, 1. / decimate, window='hamming')
            a = 1.
            data_filt_pad = signal.lfilter(b, a, data_ext_pad)
            data_deci = data_filt_pad[order:order+nsamples_ext:decimate]
            if data_deci.size >= 1:
                # pin edge samples to the exact stored edge values
                if itmin_ext <= itmin_data:
                    data_deci[0] = begin_value

                if itmax_ext >= itmax_data:
                    data_deci[-1] = end_value

            return GFTrace(data_deci, itmin_ext//decimate,
                           deltat*decimate,
                           begin_value=begin_value, end_value=end_value)

    def _get_span(self, irecord, decimate=1):
        '''
        Get temporal extent of GF trace at given index.
        '''

        if not self._f_index:
            self.open()

        assert 0 <= irecord < self._nrecords, \
            'irecord = %i, nrecords = %i' % (irecord, self._nrecords)

        (_, itmin, nsamples, _, _) = self._records[irecord]

        itmax = itmin + nsamples - 1

        if decimate == 1:
            return itmin, itmax
        else:
            if nsamples == 0:
                # empty trace: return an empty span (end < begin)
                return itmin//decimate, itmin//decimate - 1
            else:
                return itmin//decimate, -((-itmax)//decimate)

    def _put(self, irecord, trace):
        '''
        Save GF trace to storage.
        '''

        if not self._f_index:
            self.open()

        deltat = self.get_deltat()

        assert self.mode == 'w'
        assert trace.is_zero or \
            abs(trace.deltat - deltat) < 1e-7 * deltat
        assert 0 <= irecord < self._nrecords, \
            'irecord = %i, nrecords = %i' % (irecord, self._nrecords)

        if self._records[irecord][0] != 0:
            raise DuplicateInsert('record %i already in store' % irecord)

        if trace.is_zero or num.all(trace.data == 0.0):
            # special data offset 1: zero trace, no data stored
            self._records[irecord] = (1, trace.itmin, 0, 0., 0.)
            return

        ndata = trace.data.size

        if ndata > 2:
            # append samples at the end of the traces file
            self._f_data.seek(0, 2)
            ipos = self._f_data.tell()
            trace.data.astype(gf_dtype_store).tofile(self._f_data)
        else:
            # special data offset 2: 1-2 samples are fully represented by
            # begin_value/end_value, no data stored
            ipos = 2

        self._records[irecord] = (ipos, trace.itmin, ndata,
                                  trace.data[0], trace.data[-1])

    def _sum_impl_alternative(self, irecords, delays, weights, itmin, nsamples,
                              decimate):

        '''
        Sum delayed and weighted GF traces.

        Sub-sample delays are handled by linear interpolation between the
        floor- and ceil-shifted contributions.
        '''

        if not self._f_index:
            self.open()

        assert self.mode == 'r'

        deltat = self.get_deltat() * decimate

        if len(irecords) == 0:
            if None in (itmin, nsamples):
                return Zero
            else:
                return GFTrace(
                    num.zeros(nsamples, dtype=gf_dtype), itmin,
                    deltat, is_zero=True)

        assert len(irecords) == len(delays)
        assert len(irecords) == len(weights)

        if None in (itmin, nsamples):
            # derive output span from the delayed spans of all inputs
            itmin_delayed, itmax_delayed = [], []
            for irecord, delay in zip(irecords, delays):
                itmin, itmax = self._get_span(irecord, decimate=decimate)
                itmin_delayed.append(itmin + delay/deltat)
                itmax_delayed.append(itmax + delay/deltat)

            itmin = int(math.floor(min(itmin_delayed)))
            nsamples = int(math.ceil(max(itmax_delayed))) - itmin + 1

        out = num.zeros(nsamples, dtype=gf_dtype)
        if nsamples == 0:
            return GFTrace(out, itmin, deltat)

        for ii in range(len(irecords)):
            irecord = irecords[ii]
            delay = delays[ii]
            weight = weights[ii]

            if weight == 0.0:
                continue

            idelay_floor = int(math.floor(delay/deltat))
            idelay_ceil = int(math.ceil(delay/deltat))

            gf_trace = self._get(
                irecord,
                itmin - idelay_ceil,
                nsamples + idelay_ceil - idelay_floor,
                decimate,
                'reference')

            assert gf_trace.itmin >= itmin - idelay_ceil
            assert gf_trace.data.size <= nsamples + idelay_ceil - idelay_floor

            if gf_trace.is_zero:
                continue

            ilo = gf_trace.itmin - itmin + idelay_floor
            ihi = ilo + gf_trace.data.size + (idelay_ceil-idelay_floor)

            data = gf_trace.data

            if idelay_floor == idelay_ceil:
                # integer delay: single shifted contribution
                out[ilo:ihi] += data * weight
            else:
                # fractional delay: blend floor- and ceil-shifted copies,
                # taking care of edge samples at the output boundaries
                if data.size:
                    k = 1
                    if ihi <= nsamples:
                        out[ihi-1] += gf_trace.end_value * \
                            ((idelay_ceil-delay/deltat) * weight)
                        k = 0

                    out[ilo+1:ihi-k] += data[:data.size-k] * \
                        ((delay/deltat-idelay_floor) * weight)
                    k = 1
                    if ilo >= 0:
                        out[ilo] += gf_trace.begin_value * \
                            ((delay/deltat-idelay_floor) * weight)
                        k = 0

                    out[ilo+k:ihi-1] += data[k:] * \
                        ((idelay_ceil-delay/deltat) * weight)

            # traces repeat their edge values outside their own span
            if ilo > 0 and gf_trace.begin_value != 0.0:
                out[:ilo] += gf_trace.begin_value * weight

            if ihi < nsamples and gf_trace.end_value != 0.0:
                out[ihi:] += gf_trace.end_value * weight

        return GFTrace(out, itmin, deltat)

    def _sum_impl_reference(self, irecords, delays, weights, itmin, nsamples,
                            decimate):
        '''
        Reference delay-and-sum: materializes one padded row per
        contribution in a 2D array, then sums along axis 0. Simple but
        memory-hungry; used for verification.
        '''

        if not self._f_index:
            self.open()

        deltat = self.get_deltat() * decimate

        if len(irecords) == 0:
            if None in (itmin, nsamples):
                return Zero
            else:
                return GFTrace(
                    num.zeros(nsamples, dtype=gf_dtype), itmin,
                    deltat, is_zero=True)

        datas = []
        itmins = []
        for i, delay, weight in zip(irecords, delays, weights):
            tr = self._get(i, None, None, decimate, 'reference')

            idelay_floor = int(math.floor(delay/deltat))
            idelay_ceil = int(math.ceil(delay/deltat))

            if idelay_floor == idelay_ceil:
                itmins.append(tr.itmin + idelay_floor)
                datas.append(tr.data.copy()*weight)
            else:
                # fractional delay: two linearly-weighted contributions
                itmins.append(tr.itmin + idelay_floor)
                datas.append(tr.data.copy()*weight*(idelay_ceil-delay/deltat))
                itmins.append(tr.itmin + idelay_ceil)
                datas.append(tr.data.copy()*weight*(delay/deltat-idelay_floor))

        itmin_all = min(itmins)

        itmax_all = max(itmin_ + data.size for (itmin_, data) in
                        zip(itmins, datas))

        if itmin is not None:
            itmin_all = min(itmin_all, itmin)
        if nsamples is not None:
            itmax_all = max(itmax_all, itmin+nsamples)

        nsamples_all = itmax_all - itmin_all

        arr = num.zeros((len(datas), nsamples_all), dtype=gf_dtype)
        for i, itmin_, data in zip(num.arange(len(datas)), itmins, datas):
            if data.size > 0:
                ilo = itmin_-itmin_all
                ihi = ilo + data.size
                # repeat edge values outside each contribution's span
                arr[i, :ilo] = data[0]
                arr[i, ilo:ihi] = data
                arr[i, ihi:] = data[-1]

        sum_arr = arr.sum(axis=0)

        if itmin is not None and nsamples is not None:
            ilo = itmin-itmin_all
            ihi = ilo + nsamples
            sum_arr = sum_arr[ilo:ihi]

        else:
            itmin = itmin_all

        return GFTrace(sum_arr, itmin, deltat)

    def _optimize(self, irecords, delays, weights):
        '''
        Combine duplicate (irecord, delay) contributions.

        Each delay is split into its floor/ceil integer-sample parts with
        linear weights, duplicates are merged by summing their weights.
        '''
        if num.unique(irecords).size == irecords.size:
            # no duplicates, nothing to gain
            return irecords, delays, weights

        deltat = self.get_deltat()

        delays = delays / deltat
        irecords2 = num.repeat(irecords, 2)
        delays2 = num.empty(irecords2.size, dtype=float)
        delays2[0::2] = num.floor(delays)
        delays2[1::2] = num.ceil(delays)
        weights2 = num.repeat(weights, 2)
        weights2[0::2] *= 1.0 - (delays - delays2[0::2])
        # second factor zeroes the ceil-part when the delay is integral
        weights2[1::2] *= (1.0 - (delays2[1::2] - delays)) * \
            (delays2[1::2] - delays2[0::2])

        delays2 *= deltat

        iorder = num.lexsort((delays2, irecords2))

        irecords2 = irecords2[iorder]
        delays2 = delays2[iorder]
        weights2 = weights2[iorder]

        # mark the first element of each run of equal (irecord, delay)
        ui = num.empty(irecords2.size, dtype=bool)
        ui[1:] = num.logical_or(num.diff(irecords2) != 0,
                                num.diff(delays2) != 0.)

        ui[0] = 0
        ind2 = num.cumsum(ui)
        ui[0] = 1
        ind1 = num.where(ui)[0]

        irecords3 = irecords2[ind1]
        delays3 = delays2[ind1]
        # per-group weight sums
        weights3 = num.bincount(ind2, weights2)

        return irecords3, delays3, weights3

    def _sum(self, irecords, delays, weights, itmin, nsamples, decimate,
             implementation, optimization):
        '''
        Delay-and-sum dispatcher; also records timing statistics on the
        returned trace.
        '''

        if not self._f_index:
            self.open()

        t0 = time.time()
        if optimization == 'enable':
            irecords, delays, weights = self._optimize(
                irecords, delays, weights)
        else:
            assert optimization == 'disable'

        t1 = time.time()
        deltat = self.get_deltat()

        if implementation == 'c' and decimate == 1:
            # shift all delays to be non-negative for the C extension,
            # undo the shift on the result's itmin afterwards
            if delays.size != 0:
                itoffset = int(num.floor(num.min(delays)/deltat))
            else:
                itoffset = 0

            if nsamples is None:
                nsamples = -1

            if itmin is None:
                itmin = 0
            else:
                itmin -= itoffset

            try:
                tr = GFTrace(*store_ext.store_sum(
                    self.cstore, irecords.astype(num.uint64),
                    delays - itoffset*deltat,
                    weights.astype(num.float32),
                    int(itmin), int(nsamples)))

                tr.itmin += itoffset

            except store_ext.StoreExtError as e:
                raise StoreError(str(e) + ' in store %s' % self.store_dir)

        elif implementation == 'alternative':
            tr = self._sum_impl_alternative(irecords, delays, weights,
                                            itmin, nsamples, decimate)

        else:
            tr = self._sum_impl_reference(irecords, delays, weights,
                                          itmin, nsamples, decimate)

        t2 = time.time()

        tr.n_records_stacked = irecords.size
        tr.t_optimize = t1 - t0
        tr.t_stack = t2 - t1

        return tr

    def _sum_statics(self, irecords, delays, weights, it, ntargets,
                     nthreads):
        # static (single-sample) stacking, fully delegated to the C extension
        if not self._f_index:
            self.open()

        return store_ext.store_sum_static(
            self.cstore, irecords, delays, weights, it, ntargets, nthreads)

    def _load_index(self):
        '''
        Map or read the record table of the index file into self._records.
        '''
        if self._use_memmap:
            records = num.memmap(
                self._f_index, dtype=gf_record_dtype,
                offset=gf_store_header_fmt_size,
                mode=('r', 'r+')[self.mode == 'w'])

        else:
            self._f_index.seek(gf_store_header_fmt_size)
            records = num.fromfile(self._f_index, dtype=gf_record_dtype)

        assert len(records) == self._nrecords

        self._records = records

    def _save_index(self):
        '''
        Write header and record table back to the index file.
        '''
        self._f_index.seek(0)
        self._f_index.write(struct.pack(gf_store_header_fmt, self._nrecords,
                                        self.get_deltat()))

        if self._use_memmap:
            self._records.flush()
        else:
            self._f_index.seek(gf_store_header_fmt_size)
            self._records.tofile(self._f_index)
            self._f_index.flush()

    def _get_data(self, ipos, begin_value, end_value, ilo, ihi):
        '''
        Read samples [ilo, ihi) of the trace whose data starts at file
        offset *ipos*; short traces (ipos == 2) are reconstructed from the
        stored begin/end values.
        '''
        if ihi - ilo > 0:
            if ipos == 2:
                data_orig = num.empty(2, dtype=gf_dtype)
                data_orig[0] = begin_value
                data_orig[1] = end_value
                return data_orig[ilo:ihi]
            else:
                self._f_data.seek(
                    int(ipos + ilo*gf_dtype_nbytes_per_sample))
                arr = num.fromfile(
                    self._f_data, gf_dtype_store, ihi-ilo).astype(gf_dtype)

                if arr.size != ihi-ilo:
                    raise ShortRead()
                return arr
        else:
            return num.empty((0,), dtype=gf_dtype)

    def lock_fn(self):
        return BaseStore.lock_fn_(self.store_dir)

    def index_fn(self):
        return BaseStore.index_fn_(self.store_dir)

    def data_fn(self):
        return BaseStore.data_fn_(self.store_dir)

    def config_fn(self):
        return BaseStore.config_fn_(self.store_dir)

    def count_special_records(self):
        '''
        Histogram of data_offset values: [missing, zero, short, normal].
        '''
        if not self._f_index:
            self.open()

        return num.histogram(
            self._records['data_offset'],
            bins=[0, 1, 2, 3, num.array(-1).astype(num.uint64)])[0]

    @property
    def size_index(self):
        # size of the index file [bytes]
        return os.stat(self.index_fn()).st_size

    @property
    def size_data(self):
        # size of the traces file [bytes]
        return os.stat(self.data_fn()).st_size

    @property
    def size_index_and_data(self):
        return self.size_index + self.size_data

    @property
    def size_index_and_data_human(self):
        return util.human_bytesize(self.size_index_and_data)

    def stats(self):
        '''
        Summary statistics of record fill state and file sizes as a dict.
        '''
        counter = self.count_special_records()

        stats = dict(
            total=self._nrecords,
            inserted=(counter[1] + counter[2] + counter[3]),
            empty=counter[0],
            short=counter[2],
            zero=counter[1],
            size_data=self.size_data,
            size_index=self.size_index,
        )

        return stats

    stats_keys = 'total inserted empty short zero size_data size_index'.split()
def remake_dir(dpath, force):
    '''
    Create directory *dpath*, replacing an existing one when *force* is set.

    :raises CannotCreate: if the directory exists and ``force`` is false
    '''
    if os.path.exists(dpath):
        if not force:
            raise CannotCreate('Directory "%s" already exists.' % dpath)

        shutil.rmtree(dpath)

    os.mkdir(dpath)
class MakeTimingParamsFailed(StoreError):
    '''
    Raised when computation of timing parameters fails.
    '''
1036class Store(BaseStore):
1038 '''
1039 Green's function disk storage and summation machine.
1041 The `Store` can be used to efficiently store, retrieve, and sum Green's
1042 function traces. A `Store` contains many 1D time traces sampled at even
1043 multiples of a global sampling rate, where each time trace has an
1044 individual start and end time. The traces are treated as having repeating
1045 end points, so the functions they represent can be non-constant only
1046 between begin and end time. It provides capabilities to retrieve decimated
1047 traces and to extract parts of the traces. The main purpose of this class
    is to provide a fast, easy to use, and flexible mechanism to compute
1049 weighted delay-and-sum stacks with many Green's function traces involved.
1051 Individual Green's functions are accessed through a single integer index at
1052 low level. On top of that, various indexing schemes can be implemented by
1053 providing a mapping from physical coordinates to the low level index `i`.
1054 E.g. for a problem with cylindrical symmetry, one might define a mapping
1055 from the coordinates (`receiver_depth`, `source_depth`, `distance`) to the
1056 low level index. Index translation is done in the
1057 :py:class:`pyrocko.gf.meta.Config` subclass object associated with the
1058 Store. It is accessible through the store's :py:attr:`config` attribute,
1059 and contains all meta information about the store, including physical
1060 extent, geometry, sampling rate, and information about the type of the
1061 stored Green's functions. Information about the underlying earth model
1062 can also be found there.
1064 A GF store can also contain tabulated phase arrivals. In basic cases, these
1065 can be created with the :py:meth:`make_travel_time_tables` and evaluated
1066 with the :py:func:`t` methods.
1068 .. attribute:: config
1070 The :py:class:`pyrocko.gf.meta.Config` derived object associated with
1071 the store which contains all meta information about the store and
1072 provides the high-level to low-level index mapping.
1074 .. attribute:: store_dir
1076 Path to the store's data directory.
1078 .. attribute:: mode
1080 The mode in which the store is opened: ``'r'``: read-only, ``'w'``:
1081 writeable.
1082 '''
    @classmethod
    def create(cls, store_dir, config, force=False, extra=None):
        '''
        Create new GF store.

        Creates a new GF store at path ``store_dir``. The layout of the GF is
        defined with the parameters given in ``config``, which should be an
        object of a subclass of :py:class:`~pyrocko.gf.meta.Config`. This
        function will refuse to overwrite an existing GF store, unless
        ``force`` is set to ``True``. If more information, e.g. parameters
        used for the modelling code, earth models or other, should be saved
        along with the GF store, these may be provided through a dict given to
        ``extra``. The keys of this dict must be names and the values must be
        *guts* type objects.

        :param store_dir: GF store path
        :type store_dir: str
        :param config: GF store Config
        :type config: :py:class:`~pyrocko.gf.meta.Config`
        :param force: Force overwrite, defaults to ``False``
        :type force: bool
        :param extra: Extra information
        :type extra: dict or None
        :returns: the newly created store, opened
        :rtype: :py:class:`Store`
        '''

        cls.create_editables(store_dir, config, force=force, extra=extra)
        cls.create_dependants(store_dir, force=force)

        return cls(store_dir)
1114 @staticmethod
1115 def create_editables(store_dir, config, force=False, extra=None):
1116 try:
1117 util.ensuredir(store_dir)
1118 except Exception:
1119 raise CannotCreate('cannot create directory %s' % store_dir)
1121 fns = []
1123 config_fn = os.path.join(store_dir, 'config')
1124 remove_if_exists(config_fn, force)
1125 meta.dump(config, filename=config_fn)
1127 fns.append(config_fn)
1129 for sub_dir in ['extra']:
1130 dpath = os.path.join(store_dir, sub_dir)
1131 remake_dir(dpath, force)
1133 if extra:
1134 for k, v in extra.items():
1135 fn = get_extra_path(store_dir, k)
1136 remove_if_exists(fn, force)
1137 meta.dump(v, filename=fn)
1139 fns.append(fn)
1141 return fns
1143 @staticmethod
1144 def create_dependants(store_dir, force=False):
1145 config_fn = os.path.join(store_dir, 'config')
1146 config = meta.load(filename=config_fn)
1148 BaseStore.create(store_dir, config.deltat, config.nrecords,
1149 force=force)
1151 for sub_dir in ['decimated']:
1152 dpath = os.path.join(store_dir, sub_dir)
1153 remake_dir(dpath, force)
1155 def __init__(self, store_dir, mode='r', use_memmap=True):
1156 BaseStore.__init__(self, store_dir, mode=mode, use_memmap=use_memmap)
1157 config_fn = self.config_fn()
1158 if not os.path.isfile(config_fn):
1159 raise StoreError(
1160 'directory "%s" does not seem to contain a GF store '
1161 '("config" file not found)' % store_dir)
1162 self.load_config()
1164 self._decimated = {}
1165 self._extra = {}
1166 self._phases = {}
1167 for decimate in range(2, 9):
1168 if os.path.isdir(self._decimated_store_dir(decimate)):
1169 self._decimated[decimate] = None
1171 def open(self):
1172 if not self._f_index:
1173 BaseStore.open(self)
1174 c = self.config
1176 mscheme = 'type_' + c.short_type.lower()
1177 store_ext.store_mapping_init(
1178 self.cstore, mscheme,
1179 c.mins, c.maxs, c.deltas, c.ns.astype(num.uint64),
1180 c.ncomponents)
1182 def save_config(self, make_backup=False):
1183 config_fn = self.config_fn()
1184 if make_backup:
1185 os.rename(config_fn, config_fn + '~')
1187 meta.dump(self.config, filename=config_fn)
    def get_deltat(self):
        # use the (double precision) value from the config rather than the
        # single precision value stored in the index file header
        return self.config.deltat
    def load_config(self):
        # (re)read the store's config file into self.config
        self.config = meta.load(filename=self.config_fn())
    def ensure_reference(self, force=False):
        '''
        Make sure the config contains a ``reference`` identifier.

        The reference is composed of the store id and the first six
        characters of the store's uuid. With ``force``, an existing
        reference is regenerated.
        '''
        if self.config.reference is not None and not force:
            return
        self.ensure_uuid()
        reference = '%s-%s' % (self.config.id, self.config.uuid[0:6])

        if self.config.reference is not None:
            # only reachable with force=True: rewrite the whole config
            self.config.reference = reference
            self.save_config()
        else:
            # append as a raw line to avoid rewriting (and reformatting)
            # the existing config file
            with open(self.config_fn(), 'a') as config:
                config.write('reference: %s\n' % reference)
            self.load_config()
    def ensure_uuid(self, force=False):
        '''
        Make sure the config contains a ``uuid`` (content hash of the
        store, see :py:meth:`create_store_hash`). With ``force``, an
        existing uuid is recomputed.
        '''
        if self.config.uuid is not None and not force:
            return
        uuid = self.create_store_hash()

        if self.config.uuid is not None:
            # only reachable with force=True: rewrite the whole config
            self.config.uuid = uuid
            self.save_config()
        else:
            # append as a raw line to avoid rewriting (and reformatting)
            # the existing config file
            with open(self.config_fn(), 'a') as config:
                config.write('\nuuid: %s\n' % uuid)
            self.load_config()
1222 def create_store_hash(self):
1223 logger.info('creating hash for store ...')
1224 m = hashlib.sha1()
1226 traces_size = op.getsize(self.data_fn())
1227 with open(self.data_fn(), 'rb') as traces:
1228 while traces.tell() < traces_size:
1229 m.update(traces.read(4096))
1230 traces.seek(1024 * 1024 * 10, 1)
1232 with open(self.config_fn(), 'rb') as config:
1233 m.update(config.read())
1234 return m.hexdigest()
    def get_extra_path(self, key):
        '''
        Get file path for extra information stored under given key.

        Delegates to the module-level ``get_extra_path`` function.
        '''
        return get_extra_path(self.store_dir, key)
1239 def get_extra(self, key):
1240 '''
1241 Get extra information stored under given key.
1242 '''
1244 x = self._extra
1245 if key not in x:
1246 fn = self.get_extra_path(key)
1247 if not os.path.exists(fn):
1248 raise NoSuchExtra(key)
1250 x[key] = meta.load(filename=fn)
1252 return x[key]
1254 def upgrade(self):
1255 '''
1256 Upgrade store config and files to latest Pyrocko version.
1257 '''
1258 fns = [os.path.join(self.store_dir, 'config')]
1259 for key in self.extra_keys():
1260 fns.append(self.get_extra_path(key))
1262 i = 0
1263 for fn in fns:
1264 i += util.pf_upgrade(fn)
1266 return i
1268 def extra_keys(self):
1269 return [x for x in os.listdir(os.path.join(self.store_dir, 'extra'))
1270 if valid_string_id(x)]
1272 def put(self, args, trace):
1273 '''
1274 Insert trace into GF store.
1276 Store a single GF trace at (high-level) index ``args``.
1278 :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1279 ``(source_depth, distance, component)`` as in
1280 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1281 :type args: tuple
1282 :returns: GF trace at ``args``
1283 :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
1284 '''
1286 irecord = self.config.irecord(*args)
1287 self._put(irecord, trace)
1289 def get_record(self, args):
1290 irecord = self.config.irecord(*args)
1291 return self._get_record(irecord)
    def str_irecord(self, args):
        '''
        Get string representation of the record number for index ``args``.
        '''
        return BaseStore.str_irecord(self, self.config.irecord(*args))
1296 def get(self, args, itmin=None, nsamples=None, decimate=1,
1297 interpolation='nearest_neighbor', implementation='c'):
1298 '''
1299 Retrieve GF trace from store.
1301 Retrieve a single GF trace from the store at (high-level) index
1302 ``args``. By default, the full trace is retrieved. Given ``itmin`` and
1303 ``nsamples``, only the selected portion of the trace is extracted. If
1304 ``decimate`` is an integer in the range [2,8], the trace is decimated
1305 on the fly or, if available, the trace is read from a decimated version
1306 of the GF store.
1308 :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1309 ``(source_depth, distance, component)`` as in
1310 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1311 :type args: tuple
1312 :param itmin: Start time index (start time is ``itmin * dt``),
1313 defaults to None
1314 :type itmin: int or None
1315 :param nsamples: Number of samples, defaults to None
1316 :type nsamples: int or None
1317 :param decimate: Decimatation factor, defaults to 1
1318 :type decimate: int
1319 :param interpolation: Interpolation method
1320 ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
1321 ``'nearest_neighbor'``
1322 :type interpolation: str
1323 :param implementation: Implementation to use ``['c', 'reference']``,
1324 defaults to ``'c'``.
1325 :type implementation: str
1327 :returns: GF trace at ``args``
1328 :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
1329 '''
1331 store, decimate = self._decimated_store(decimate)
1332 if interpolation == 'nearest_neighbor':
1333 irecord = store.config.irecord(*args)
1334 tr = store._get(irecord, itmin, nsamples, decimate,
1335 implementation)
1337 elif interpolation == 'off':
1338 irecords, weights = store.config.vicinity(*args)
1339 if len(irecords) != 1:
1340 raise NotAllowedToInterpolate()
1341 else:
1342 tr = store._get(irecords[0], itmin, nsamples, decimate,
1343 implementation)
1345 elif interpolation == 'multilinear':
1346 tr = store._sum(irecords, num.zeros(len(irecords)), weights,
1347 itmin, nsamples, decimate, implementation,
1348 'disable')
1350 # to prevent problems with rounding errors (BaseStore saves deltat
1351 # as a 4-byte floating point value, value from YAML config is more
1352 # accurate)
1353 tr.deltat = self.config.deltat * decimate
1354 return tr
1356 def sum(self, args, delays, weights, itmin=None, nsamples=None,
1357 decimate=1, interpolation='nearest_neighbor', implementation='c',
1358 optimization='enable'):
1359 '''
1360 Sum delayed and weighted GF traces.
1362 Calculate sum of delayed and weighted GF traces. ``args`` is a tuple of
1363 arrays forming the (high-level) indices of the GF traces to be
1364 selected. Delays and weights for the summation are given in the arrays
1365 ``delays`` and ``weights``. ``itmin`` and ``nsamples`` can be given to
1366 restrict to computation to a given time interval. If ``decimate`` is
1367 an integer in the range [2,8], decimated traces are used in the
1368 summation.
1370 :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1371 ``(source_depth, distance, component)`` as in
1372 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1373 :type args: tuple(numpy.ndarray)
1374 :param delays: Delay times
1375 :type delays: :py:class:`numpy.ndarray`
1376 :param weights: Trace weights
1377 :type weights: :py:class:`numpy.ndarray`
1378 :param itmin: Start time index (start time is ``itmin * dt``),
1379 defaults to None
1380 :type itmin: int or None
1381 :param nsamples: Number of samples, defaults to None
1382 :type nsamples: int or None
1383 :param decimate: Decimatation factor, defaults to 1
1384 :type decimate: int
1385 :param interpolation: Interpolation method
1386 ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
1387 ``'nearest_neighbor'``
1388 :type interpolation: str
1389 :param implementation: Implementation to use,
1390 ``['c', 'alternative', 'reference']``, where ``'alternative'``
1391 and ``'reference'`` use a Python implementation, defaults to `'c'`
1392 :type implementation: str
1393 :param optimization: Optimization mode ``['enable', 'disable']``,
1394 defaults to ``'enable'``
1395 :type optimization: str
1396 :returns: Stacked GF trace.
1397 :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
1398 '''
1400 store, decimate_ = self._decimated_store(decimate)
1402 if interpolation == 'nearest_neighbor':
1403 irecords = store.config.irecords(*args)
1404 else:
1405 assert interpolation == 'multilinear'
1406 irecords, ip_weights = store.config.vicinities(*args)
1407 neach = irecords.size // args[0].size
1408 weights = num.repeat(weights, neach) * ip_weights
1409 delays = num.repeat(delays, neach)
1411 tr = store._sum(irecords, delays, weights, itmin, nsamples, decimate_,
1412 implementation, optimization)
1414 # to prevent problems with rounding errors (BaseStore saves deltat
1415 # as a 4-byte floating point value, value from YAML config is more
1416 # accurate)
1417 tr.deltat = self.config.deltat * decimate
1418 return tr
    def make_decimated(self, decimate, config=None, force=False,
                       show_progress=False):
        '''
        Create decimated version of GF store.

        Create a downsampled version of the GF store. Downsampling is done for
        the integer factor ``decimate`` which should be in the range [2,8]. If
        ``config`` is ``None``, all traces of the GF store are decimated and
        held available (i.e. the index mapping of the original store is used),
        otherwise, a different spacial stepping can be specified by giving a
        modified GF store configuration in ``config`` (see :py:meth:`create`).
        Decimated GF sub-stores are created under the ``decimated``
        subdirectory within the GF store directory. Holding available decimated
        versions of the GF store can save computation time, IO bandwidth, or
        decrease memory footprint at the cost of increased disk space usage,
        when computation are done for lower frequency signals.

        :param decimate: Decimate factor
        :type decimate: int
        :param config: GF store config object, defaults to None
        :type config: :py:class:`~pyrocko.gf.meta.Config` or None
        :param force: Force overwrite, defaults to ``False``
        :type force: bool
        :param show_progress: Show progress, defaults to ``False``
        :type show_progress: bool
        '''

        if not self._f_index:
            self.open()

        if not (2 <= decimate <= 8):
            raise StoreError('decimate argument must be in the range [2,8]')

        assert self.mode == 'r'

        if config is None:
            config = self.config

        # work on a copy so the original store's config is never modified
        config = copy.deepcopy(config)
        config.sample_rate = self.config.sample_rate / decimate

        # drop any cached handle for this decimation level
        if decimate in self._decimated:
            del self._decimated[decimate]

        store_dir = self._decimated_store_dir(decimate)
        if os.path.exists(store_dir):
            if force:
                shutil.rmtree(store_dir)
            else:
                raise CannotCreate('store already exists at %s' % store_dir)

        # build under a temporary name first; only a fully written sub-store
        # is moved to its final location below
        store_dir_incomplete = store_dir + '-incomplete'
        Store.create(store_dir_incomplete, config, force=force)

        decimated = Store(store_dir_incomplete, 'w')
        try:
            if show_progress:
                pbar = util.progressbar(
                    'decimating store', self.config.nrecords)

            # pull each trace through self.get with on-the-fly decimation and
            # store it in the new sub-store
            for i, args in enumerate(decimated.config.iter_nodes()):
                tr = self.get(args, decimate=decimate)
                decimated.put(args, tr)

                if show_progress:
                    pbar.update(i+1)

        finally:
            if show_progress:
                pbar.finish()

        decimated.close()

        # rename marks the sub-store as complete
        shutil.move(store_dir_incomplete, store_dir)

        # register for lazy opening by _decimated_store
        self._decimated[decimate] = None
1497 def stats(self):
1498 stats = BaseStore.stats(self)
1499 stats['decimated'] = sorted(self._decimated.keys())
1500 return stats
    # keys reported by :py:meth:`stats`: the base class keys plus 'decimated'
    stats_keys = BaseStore.stats_keys + ['decimated']
1504 def check(self, show_progress=False):
1505 have_holes = []
1506 for pdef in self.config.tabulated_phases:
1507 phase_id = pdef.id
1508 ph = self.get_stored_phase(phase_id)
1509 if ph.check_holes():
1510 have_holes.append(phase_id)
1512 if have_holes:
1513 for phase_id in have_holes:
1514 logger.warning(
1515 'Travel time table of phase "{}" contains holes. You can '
1516 ' use `fomosto tttlsd` to fix holes.'.format(
1517 phase_id))
1518 else:
1519 logger.info('No holes in travel time tables')
1521 try:
1522 if show_progress:
1523 pbar = util.progressbar('checking store', self.config.nrecords)
1525 problems = 0
1526 for i, args in enumerate(self.config.iter_nodes()):
1527 tr = self.get(args)
1528 if tr and not tr.is_zero:
1529 if not tr.begin_value == tr.data[0]:
1530 logger.warning('wrong begin value for trace at %s '
1531 '(data corruption?)' % str(args))
1532 problems += 1
1533 if not tr.end_value == tr.data[-1]:
1534 logger.warning('wrong end value for trace at %s '
1535 '(data corruption?)' % str(args))
1536 problems += 1
1537 if not num.all(num.isfinite(tr.data)):
1538 logger.warning(
1539 'nans or infs in trace at %s' % str(args))
1540 problems += 1
1542 if show_progress:
1543 pbar.update(i+1)
1545 finally:
1546 if show_progress:
1547 pbar.finish()
1549 return problems
1551 def check_earthmodels(self, config):
1552 if config.earthmodel_receiver_1d.profile('z')[-1] not in\
1553 config.earthmodel_1d.profile('z'):
1554 logger.warning('deepest layer of earthmodel_receiver_1d not '
1555 'found in earthmodel_1d')
1557 def _decimated_store_dir(self, decimate):
1558 return os.path.join(self.store_dir, 'decimated', str(decimate))
1560 def _decimated_store(self, decimate):
1561 if decimate == 1 or decimate not in self._decimated:
1562 return self, decimate
1563 else:
1564 store = self._decimated[decimate]
1565 if store is None:
1566 store = Store(self._decimated_store_dir(decimate), 'r')
1567 self._decimated[decimate] = store
1569 return store, 1
1571 def phase_filename(self, phase_id, attribute='phase'):
1572 check_string_id(phase_id)
1573 return os.path.join(
1574 self.store_dir, 'phases', phase_id + '.%s' % attribute)
1576 def get_phase_identifier(self, phase_id, attribute):
1577 return '{}.{}'.format(phase_id, attribute)
1579 def get_stored_phase(self, phase_def, attribute='phase'):
1580 '''
1581 Get stored phase from GF store.
1583 :returns: Phase information
1584 :rtype: :py:class:`pyrocko.spit.SPTree`
1585 '''
1587 phase_id = self.get_phase_identifier(phase_def, attribute)
1588 if phase_id not in self._phases:
1589 fn = self.phase_filename(phase_def, attribute)
1590 if not os.path.isfile(fn):
1591 raise NoSuchPhase(phase_id)
1593 spt = spit.SPTree(filename=fn)
1594 self._phases[phase_id] = spt
1596 return self._phases[phase_id]
    def get_phase(self, phase_def):
        '''
        Get a callable computing phase arrival times.

        ``phase_def`` has the form ``'provider:definition'``; without a
        provider prefix, ``'stored'`` is assumed. Providers:

        * ``'stored'``: interpolate a precomputed travel-time table
        * ``'vel'``: straight-line arrival; definition is a velocity [km/s]
        * ``'vel_surface'``: like ``'vel'``, but using surface distance
        * ``'cake'`` / ``'iaspei'``: compute arrivals with the cake ray
          tracer in the store's 1D earth model

        :returns: function mapping a GF index tuple to an arrival time [s]
            (``None`` when no arrival is found)
        :raises StoreError: for an unknown provider prefix
        '''
        toks = phase_def.split(':', 1)
        if len(toks) == 2:
            provider, phase_def = toks
        else:
            provider, phase_def = 'stored', toks[0]

        if provider == 'stored':
            return self.get_stored_phase(phase_def)

        elif provider == 'vel':
            # definition is a velocity in km/s; convert to m/s
            vel = float(phase_def) * 1000.

            def evaluate(args):
                return self.config.get_distance(args) / vel

            return evaluate

        elif provider == 'vel_surface':
            # definition is a velocity in km/s; convert to m/s
            vel = float(phase_def) * 1000.

            def evaluate(args):
                return self.config.get_surface_distance(args) / vel

            return evaluate

        elif provider in ('cake', 'iaspei'):
            from pyrocko import cake
            mod = self.config.earthmodel_1d
            if provider == 'cake':
                phases = [cake.PhaseDef(phase_def)]
            else:
                phases = cake.PhaseDef.classic(phase_def)

            def evaluate(args):
                # accept 2-tuples (source_depth, distance) with the
                # receiver depth taken from the config, or full 3-tuples
                if len(args) == 2:
                    zr, zs, x = (self.config.receiver_depth,) + args
                elif len(args) == 3:
                    zr, zs, x = args
                else:
                    assert False

                t = []
                if phases:
                    rays = mod.arrivals(
                        phases=phases,
                        distances=[x*cake.m2d],
                        zstart=zs,
                        zstop=zr)

                    for ray in rays:
                        t.append(ray.t)

                # earliest arrival wins; None when nothing arrives
                if t:
                    return min(t)
                else:
                    return None

            return evaluate

        raise StoreError('unsupported phase provider: %s' % provider)
1660 def t(self, timing, *args):
1661 '''
1662 Compute interpolated phase arrivals.
1664 **Examples:**
1666 If ``test_store`` is a :py:class:`Type A<pyrocko.gf.meta.ConfigTypeA>`
1667 store::
1669 test_store.t('stored:p', (1000, 10000))
1670 test_store.t('last{stored:P|stored:Pdiff}', (1000, 10000))
1671 # The latter arrival
1672 # of P or diffracted
1673 # P phase
1675 If ``test_store`` is a :py:class:`Type B<pyrocko.gf.meta.ConfigTypeB>`
1676 store::
1678 test_store.t('S', (1000, 1000, 10000))
1679 test_store.t('first{P|p|Pdiff|sP}', (1000, 1000, 10000))
1680 # The first arrival of
1681 # the given phases is
1682 # selected
1684 Independent of the store type, it is also possible to specify two
1685 location objects and the GF index tuple is calculated internally::
1687 test_store.t('p', source, target)
1689 :param timing: travel-time definition
1690 :type timing: str or :py:class:`~pyrocko.gf.meta.Timing`
1691 :param \\*args: if ``len(args) == 1``, ``args[0]`` must be a
1692 :py:class:`GF index tuple <pyrocko.gf.meta.Config>`, e.g.
1693 ``(source_depth, distance, component)`` for a
1694 :py:class:`Type A<pyrocko.gf.meta.ConfigTypeA>` store. If
1695 ``len(args) == 2``, two location objects are expected, e.g.
1696 ``(source, receiver)`` and the appropriate GF index is computed
1697 internally.
1698 :type \\*args: (:py:class:`tuple`,) or
1699 (:py:class:`~pyrocko.model.Location`,
1700 :py:class:`~pyrocko.model.Location`)
1701 :returns: Phase arrival according to ``timing``
1702 :rtype: float or None
1703 '''
1705 if len(args) == 1:
1706 args = args[0]
1707 else:
1708 args = self.config.make_indexing_args1(*args)
1710 if not isinstance(timing, meta.Timing):
1711 timing = meta.Timing(timing)
1713 return timing.evaluate(self.get_phase, args)
1715 def get_available_interpolation_tables(self):
1716 filenames = glob(op.join(self.store_dir, 'phases', '*'))
1717 return [op.basename(file) for file in filenames]
1719 def get_stored_attribute(self, phase_def, attribute, *args):
1720 '''
1721 Return interpolated store attribute
1723 :param attribute: takeoff_angle / incidence_angle [deg]
1724 :type attribute: str
1725 :param \\*args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1726 ``(source_depth, distance, component)`` as in
1727 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1728 :type \\*args: tuple
1729 '''
1730 try:
1731 return self.get_stored_phase(phase_def, attribute)(*args)
1732 except NoSuchPhase:
1733 raise StoreError(
1734 'Interpolation table for {} of {} does not exist! '
1735 'Available tables: {}'.format(
1736 attribute, phase_def,
1737 self.get_available_interpolation_tables()))
1739 def get_many_stored_attributes(self, phase_def, attribute, coords):
1740 '''
1741 Return interpolated store attribute
1743 :param attribute: takeoff_angle / incidence_angle [deg]
1744 :type attribute: str
1745 :param \\coords: :py:class:`num.array.Array`, with columns being
1746 ``(source_depth, distance, component)`` as in
1747 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1748 :type \\coords: :py:class:`num.array.Array`
1749 '''
1750 try:
1751 return self.get_stored_phase(
1752 phase_def, attribute).interpolate_many(coords)
1753 except NoSuchPhase:
1754 raise StoreError(
1755 'Interpolation table for {} of {} does not exist! '
1756 'Available tables: {}'.format(
1757 attribute, phase_def,
1758 self.get_available_interpolation_tables()))
    def make_stored_table(self, attribute, force=False):
        '''
        Compute tables for selected ray attributes.

        :param attribute: phase / takeoff_angle [deg] / incidence_angle [deg]
        :type attribute: str
        :param force: recompute and overwrite existing table files,
            defaults to ``False``
        :type force: bool

        Tables are computed using the 1D earth model defined in
        :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined
        phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
        '''

        if attribute not in available_stored_tables:
            raise StoreError(
                'Cannot create attribute table for {}! '
                'Supported attribute tables: {}'.format(
                    attribute, available_stored_tables))

        from pyrocko import cake
        config = self.config

        if not config.tabulated_phases:
            return

        mod = config.earthmodel_1d

        if not mod:
            raise StoreError('no earth model found')

        if config.earthmodel_receiver_1d:
            self.check_earthmodels(config)

        for pdef in config.tabulated_phases:

            phase_id = pdef.id
            phases = pdef.phases

            # travel-time tables get a tolerance of half a sample; angle
            # tables a tighter one (NOTE: ``horvels`` is only defined in the
            # 'phase' case and only used there, see closure below)
            if attribute == 'phase':
                ftol = config.deltat * 0.5
                horvels = pdef.horizontal_velocities
            else:
                ftol = config.deltat * 0.01

            fn = self.phase_filename(phase_id, attribute)

            if os.path.exists(fn) and not force:
                logger.info('file already exists: %s' % fn)
                continue

            # closure over the current loop iteration's ``phases``,
            # ``horvels`` and ``attribute``; safe because the SPTree is built
            # within the same iteration, before these names are rebound
            def evaluate(args):

                nargs = len(args)
                if nargs == 2:
                    # receiver depth taken from the config for 2-tuples
                    receiver_depth, source_depth, distance = (
                        config.receiver_depth,) + args
                elif nargs == 3:
                    receiver_depth, source_depth, distance = args
                else:
                    raise ValueError(
                        'Number of input arguments %i is not supported!'
                        'Supported number of arguments: 2 or 3' % nargs)

                ray_attribute_values = []
                arrival_times = []
                if phases:
                    rays = mod.arrivals(
                        phases=phases,
                        distances=[distance * cake.m2d],
                        zstart=source_depth,
                        zstop=receiver_depth)

                    for ray in rays:
                        arrival_times.append(ray.t)
                        if attribute != 'phase':
                            ray_attribute_values.append(
                                getattr(ray, attribute)())

                # phase tables additionally consider plain horizontal
                # velocities as arrival candidates
                if attribute == 'phase':
                    for v in horvels:
                        arrival_times.append(distance / (v * km))

                if arrival_times:
                    if attribute == 'phase':
                        return min(arrival_times)
                    else:
                        # report the attribute of the earliest arrival
                        earliest_idx = num.argmin(arrival_times)
                        return ray_attribute_values[earliest_idx]
                else:
                    return None

            logger.info(
                'making "%s" table for phasegroup "%s"', attribute, phase_id)

            ip = spit.SPTree(
                f=evaluate,
                ftol=ftol,
                xbounds=num.transpose((config.mins, config.maxs)),
                xtols=config.deltas)

            util.ensuredirs(fn)
            ip.dump(fn)
1862 def make_timing_params(self, begin, end, snap_vred=True, force=False):
1863 '''
1864 Compute tight parameterized time ranges to include given timings.
1866 Calculates appropriate time ranges to cover given begin and end timings
1867 over all GF points in the store. A dict with the following keys is
1868 returned:
1870 * ``'tmin'``: time [s], minimum of begin timing over all GF points
1871 * ``'tmax'``: time [s], maximum of end timing over all GF points
1872 * ``'vred'``, ``'tmin_vred'``: slope [m/s] and offset [s] of reduction
1873 velocity [m/s] appropriate to catch begin timing over all GF points
1874 * ``'tlenmax_vred'``: maximum time length needed to cover all end
1875 timings, when using linear slope given with (``vred``, ``tmin_vred``)
1876 as start
1877 '''
1879 data = []
1880 warned = set()
1881 for args in self.config.iter_nodes(level=-1):
1882 tmin = self.t(begin, args)
1883 tmax = self.t(end, args)
1884 if tmin is None:
1885 warned.add(str(begin))
1886 if tmax is None:
1887 warned.add(str(end))
1889 x = self.config.get_surface_distance(args)
1890 data.append((x, tmin, tmax))
1892 if len(warned):
1893 w = ' | '.join(list(warned))
1894 msg = '''determination of time window failed using phase
1895definitions: %s.\n Travel time table contains holes in probed ranges. You can
1896use `fomosto tttlsd` to fix holes.''' % w
1897 if force:
1898 logger.warning(msg)
1899 else:
1900 raise MakeTimingParamsFailed(msg)
1902 xs, tmins, tmaxs = num.array(data, dtype=float).T
1904 tlens = tmaxs - tmins
1906 i = num.nanargmin(tmins)
1907 if not num.isfinite(i):
1908 raise MakeTimingParamsFailed('determination of time window failed')
1910 tminmin = tmins[i]
1911 x_tminmin = xs[i]
1912 dx = (xs - x_tminmin)
1913 dx = num.where(dx != 0.0, dx, num.nan)
1914 s = (tmins - tminmin) / dx
1915 sred = num.min(num.abs(s[num.isfinite(s)]))
1917 deltax = self.config.distance_delta
1919 if snap_vred:
1920 tdif = sred*deltax
1921 tdif2 = self.config.deltat * math.floor(tdif / self.config.deltat)
1922 sred = tdif2/self.config.distance_delta
1924 tmin_vred = tminmin - sred*x_tminmin
1925 if snap_vred:
1926 xe = x_tminmin - int(x_tminmin / deltax) * deltax
1927 tmin_vred = float(
1928 self.config.deltat *
1929 math.floor(tmin_vred / self.config.deltat) - xe * sred)
1931 tlenmax_vred = num.nanmax(tmax - (tmin_vred + sred*x))
1932 if sred != 0.0:
1933 vred = 1.0/sred
1934 else:
1935 vred = 0.0
1937 return dict(
1938 tmin=tminmin,
1939 tmax=num.nanmax(tmaxs),
1940 tlenmax=num.nanmax(tlens),
1941 tmin_vred=tmin_vred,
1942 tlenmax_vred=tlenmax_vred,
1943 vred=vred)
    def make_travel_time_tables(self, force=False):
        '''
        Compute travel time tables.

        Travel time tables are computed using the 1D earth model defined in
        :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined phase
        in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`. The accuracy of
        the tabulated times is adjusted to the sampling rate of the store.

        :param force: recompute and overwrite existing tables,
            defaults to ``False``
        :type force: bool
        '''
        self.make_stored_table(attribute='phase', force=force)
    def make_ttt(self, force=False):
        '''
        Alias for :py:meth:`make_travel_time_tables`.
        '''
        self.make_travel_time_tables(force=force)
    def make_takeoff_angle_tables(self, force=False):
        '''
        Compute takeoff-angle tables.

        Takeoff-angle tables [deg] are computed using the 1D earth model
        defined in :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each
        defined phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
        The accuracy of the tabulated values is adjusted to 0.01 times the
        sampling rate of the store.

        :param force: recompute and overwrite existing tables,
            defaults to ``False``
        :type force: bool
        '''
        self.make_stored_table(attribute='takeoff_angle', force=force)
    def make_incidence_angle_tables(self, force=False):
        '''
        Compute incidence-angle tables.

        Incidence-angle tables [deg] are computed using the 1D earth model
        defined in :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each
        defined phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
        The accuracy of the tabulated values is adjusted to 0.01 times the
        sampling rate of the store.

        :param force: recompute and overwrite existing tables,
            defaults to ``False``
        :type force: bool
        '''
        self.make_stored_table(attribute='incidence_angle', force=force)
1983 def get_provided_components(self):
1985 scheme_desc = meta.component_scheme_to_description[
1986 self.config.component_scheme]
1988 quantity = self.config.stored_quantity \
1989 or scheme_desc.default_stored_quantity
1991 if not quantity:
1992 return scheme_desc.provided_components
1993 else:
1994 return [
1995 quantity + '.' + comp
1996 for comp in scheme_desc.provided_components]
1998 def fix_ttt_holes(self, phase_id):
2000 pdef = self.config.get_tabulated_phase(phase_id)
2001 mode = None
2002 for phase in pdef.phases:
2003 for leg in phase.legs():
2004 if mode is None:
2005 mode = leg.mode
2007 else:
2008 if mode != leg.mode:
2009 raise StoreError(
2010 'Can only fix holes in pure P or pure S phases.')
2012 sptree = self.get_stored_phase(phase_id)
2013 sptree_lsd = self.config.fix_ttt_holes(sptree, mode)
2015 phase_lsd = phase_id + '.lsd'
2016 fn = self.phase_filename(phase_lsd)
2017 sptree_lsd.dump(fn)
2019 def statics(self, source, multi_location, itsnapshot, components,
2020 interpolation='nearest_neighbor', nthreads=0):
2021 if not self._f_index:
2022 self.open()
2024 out = {}
2025 ntargets = multi_location.ntargets
2026 source_terms = source.get_source_terms(self.config.component_scheme)
2027 # TODO: deal with delays for snapshots > 1 sample
2029 if itsnapshot is not None:
2030 delays = source.times
2032 # Fringe case where we sample at sample 0 and sample 1
2033 tsnapshot = itsnapshot * self.config.deltat
2034 if delays.max() == tsnapshot and delays.min() != tsnapshot:
2035 delays[delays == delays.max()] -= self.config.deltat
2037 else:
2038 delays = source.times * 0
2039 itsnapshot = 1
2041 if ntargets == 0:
2042 raise StoreError('MultiLocation.coords5 is empty')
2044 res = store_ext.store_calc_static(
2045 self.cstore,
2046 source.coords5(),
2047 source_terms,
2048 delays,
2049 multi_location.coords5,
2050 self.config.component_scheme,
2051 interpolation,
2052 itsnapshot,
2053 nthreads)
2055 out = {}
2056 for icomp, (comp, comp_res) in enumerate(
2057 zip(self.get_provided_components(), res)):
2058 if comp not in components:
2059 continue
2060 out[comp] = res[icomp]
2062 return out
2064 def calc_seismograms(self, source, receivers, components, deltat=None,
2065 itmin=None, nsamples=None,
2066 interpolation='nearest_neighbor',
2067 optimization='enable', nthreads=1):
2068 config = self.config
2070 assert interpolation in ['nearest_neighbor', 'multilinear'], \
2071 'Unknown interpolation: %s' % interpolation
2073 if not isinstance(receivers, list):
2074 receivers = [receivers]
2076 if deltat is None:
2077 decimate = 1
2078 else:
2079 decimate = int(round(deltat/config.deltat))
2080 if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
2081 raise StoreError(
2082 'unavailable decimation ratio target.deltat / store.deltat'
2083 ' = %g / %g' % (deltat, config.deltat))
2085 store, decimate_ = self._decimated_store(decimate)
2087 if not store._f_index:
2088 store.open()
2090 scheme = config.component_scheme
2092 source_coords_arr = source.coords5()
2093 source_terms = source.get_source_terms(scheme)
2094 delays = source.times.ravel()
2096 nreceiver = len(receivers)
2097 receiver_coords_arr = num.empty((nreceiver, 5))
2098 for irec, rec in enumerate(receivers):
2099 receiver_coords_arr[irec, :] = rec.coords5
2101 dt = self.get_deltat()
2103 itoffset = int(num.floor(delays.min()/dt)) if delays.size != 0 else 0
2105 if itmin is None:
2106 itmin = num.zeros(nreceiver, dtype=num.int32)
2107 else:
2108 itmin = (itmin-itoffset).astype(num.int32)
2110 if nsamples is None:
2111 nsamples = num.zeros(nreceiver, dtype=num.int32) - 1
2112 else:
2113 nsamples = nsamples.astype(num.int32)
2115 try:
2116 results = store_ext.store_calc_timeseries(
2117 store.cstore,
2118 source_coords_arr,
2119 source_terms,
2120 (delays - itoffset*dt),
2121 receiver_coords_arr,
2122 scheme,
2123 interpolation,
2124 itmin,
2125 nsamples,
2126 nthreads)
2127 except Exception as e:
2128 raise e
2130 provided_components = self.get_provided_components()
2131 ncomponents = len(provided_components)
2133 seismograms = [dict() for _ in range(nreceiver)]
2134 for ires in range(len(results)):
2135 res = results.pop(0)
2136 ireceiver = ires // ncomponents
2138 comp = provided_components[res[-2]]
2140 if comp not in components:
2141 continue
2143 tr = GFTrace(*res[:-2])
2144 tr.deltat = config.deltat * decimate
2145 tr.itmin += itoffset
2147 tr.n_records_stacked = 0
2148 tr.t_optimize = 0.
2149 tr.t_stack = 0.
2150 tr.err = res[-1]
2152 seismograms[ireceiver][comp] = tr
2154 return seismograms
    def seismogram(self, source, receiver, components, deltat=None,
                   itmin=None, nsamples=None,
                   interpolation='nearest_neighbor',
                   optimization='enable', nthreads=1):
        '''
        Compute a seismogram for a single receiver.

        :param source: source model providing ``coords5()``,
            ``get_source_terms()`` and ``times``
        :param receiver: receiver providing ``coords5``
        :param components: component names to keep in the output
        :param deltat: desired sampling interval [s]; must correspond to an
            integer decimation of the store's interval, defaults to None
            (native sampling)
        :param interpolation: interpolation method, defaults to
            ``'nearest_neighbor'``
        :param optimization: ``'enable'`` or ``'disable'``, passed on to the
            stacking routine
        :param nthreads: number of threads for the extension call
        :returns: dict mapping component name to :py:class:`GFTrace`
        :raises StoreError: if ``deltat`` is not an integer multiple of the
            store's sampling interval
        :raises pyrocko.gf.meta.OutOfBounds: if source/receiver geometry is
            outside the store's coverage
        '''

        config = self.config

        if deltat is None:
            decimate = 1
        else:
            decimate = int(round(deltat/config.deltat))
            if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
                raise StoreError(
                    'unavailable decimation ratio target.deltat / store.deltat'
                    ' = %g / %g' % (deltat, config.deltat))

        store, decimate_ = self._decimated_store(decimate)

        if not store._f_index:
            store.open()

        scheme = config.component_scheme

        source_coords_arr = source.coords5()
        source_terms = source.get_source_terms(scheme)
        receiver_coords_arr = receiver.coords5[num.newaxis, :].copy()

        try:
            params = store_ext.make_sum_params(
                store.cstore,
                source_coords_arr,
                source_terms,
                receiver_coords_arr,
                scheme,
                interpolation, nthreads)

        except store_ext.StoreExtError:
            raise meta.OutOfBounds()

        provided_components = self.get_provided_components()

        out = {}
        for icomp, comp in enumerate(provided_components):
            if comp in components:
                weights, irecords = params[icomp]

                # expand per-source delays to one entry per selected record
                neach = irecords.size // source.times.size
                delays = num.repeat(source.times, neach)

                tr = store._sum(
                    irecords, delays, weights, itmin, nsamples, decimate_,
                    'c', optimization)

                # to prevent problems with rounding errors (BaseStore saves
                # deltat as a 4-byte floating point value, value from YAML
                # config is more accurate)
                tr.deltat = config.deltat * decimate

                out[comp] = tr

        return out
# Public API of this module, as exposed by ``from ... import *``.
__all__ = '''
gf_dtype
NotMultipleOfSamplingInterval
GFTrace
CannotCreate
CannotOpen
DuplicateInsert
NotAllowedToInterpolate
NoSuchExtra
NoSuchPhase
BaseStore
Store
SeismosizerErrorEnum
'''.split()