1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import errno
7import time
8import os
9import struct
10import weakref
11import math
12import shutil
13try:
14 import fcntl
15except ImportError:
16 fcntl = None
17import copy
18import logging
19import re
20import hashlib
21from glob import glob
23import numpy as num
24from scipy import signal
26from . import meta
27from .error import StoreError
28try:
29 from . import store_ext
30except ImportError:
31 store_ext = None
32from pyrocko import util, spit
logger = logging.getLogger('pyrocko.gf.store')
op = os.path  # shorthand alias used throughout this module

# gf store endianness
E = '<'

# dtype of GF samples in memory and on disk (little-endian float32)
gf_dtype = num.dtype(num.float32)
gf_dtype_store = num.dtype(E + 'f4')

gf_dtype_nbytes_per_sample = 4

# layout of the index file header: record count followed by sampling interval
gf_store_header_dtype = [
    ('nrecords', E + 'u8'),
    ('deltat', E + 'f4'),
]

gf_store_header_fmt = E + 'Qf'
gf_store_header_fmt_size = struct.calcsize(gf_store_header_fmt)

# layout of one index record; see the block comment further down in this
# file for the semantics of the individual fields
gf_record_dtype = num.dtype([
    ('data_offset', E + 'u8'),
    ('itmin', E + 'i4'),
    ('nsamples', E + 'u4'),
    ('begin_value', E + 'f4'),
    ('end_value', E + 'f4'),
])

available_stored_tables = ['phase', 'takeoff_angle', 'incidence_angle']

km = 1000.  # [m]
class SeismosizerErrorEnum:
    '''
    Error codes used by the compiled seismosizer extension.

    Values mirror the corresponding C enum and must stay in this order.
    '''

    (SUCCESS,
     INVALID_RECORD,
     EMPTY_RECORD,
     BAD_RECORD,
     ALLOC_FAILED,
     BAD_REQUEST,
     BAD_DATA_OFFSET,
     READ_DATA_FAILED,
     SEEK_INDEX_FAILED,
     READ_INDEX_FAILED,
     FSTAT_TRACES_FAILED,
     BAD_STORE,
     MMAP_INDEX_FAILED,
     MMAP_TRACES_FAILED,
     INDEX_OUT_OF_BOUNDS,
     NTARGETS_OUT_OF_BOUNDS) = range(16)
def valid_string_id(s):
    '''
    Match *s* against the ``StringID`` pattern.

    :returns: the match object, or ``None`` if *s* is not a valid ID
    '''
    pattern = meta.StringID.pattern
    return re.match(pattern, s)
def check_string_id(s):
    '''
    Raise :py:exc:`ValueError` if *s* is not a valid string ID.
    '''
    if valid_string_id(s):
        return

    raise ValueError('invalid name %s' % s)
93# - data_offset
94#
95# Data file offset of first sample in bytes (the seek value).
96# Special values:
97#
98# 0x00 - missing trace
99# 0x01 - zero trace (GF trace is physically zero)
100# 0x02 - short trace of 1 or 2 samples (no need for data allocation)
101#
102# - itmin
103#
104# Time of first sample of the trace as a multiple of the sampling interval. All
105# GF samples must be evaluated at multiples of the sampling interval with
106# respect to a global reference time zero. Must be set also for zero traces.
107#
108# - nsamples
109#
110# Number of samples in the GF trace. Should be set to zero for zero traces.
111#
112# - begin_value, end_value
113#
114# Values of first and last sample. These values are included in data[]
# redundantly.
class NotMultipleOfSamplingInterval(Exception):
    '''
    Raised when a given time is not a multiple of the sampling interval.
    '''
    pass
122sampling_check_eps = 1e-5
class GFTrace(object):
    '''
    Green's Function trace class for handling traces from the GF store.

    A trace is defined by its sample data, its first sample index ``itmin``
    (a multiple of the sampling interval ``deltat`` with respect to a global
    reference time zero) and the constant values it takes before its first
    and after its last sample (``begin_value``, ``end_value``).
    '''

    @classmethod
    def from_trace(cls, tr):
        '''
        Create a :py:class:`GFTrace` from a :py:class:`pyrocko.trace.Trace`.
        '''
        return cls(data=tr.ydata.copy(), tmin=tr.tmin, deltat=tr.deltat)

    def __init__(self, data=None, itmin=None, deltat=1.0,
                 is_zero=False, begin_value=None, end_value=None, tmin=None):

        # fix: removed a stray line-continuation backslash after the assert
        # message which glued the string onto the following statement
        assert sum((x is None) for x in (tmin, itmin)) == 1, \
            'GFTrace: either tmin or itmin must be given'

        if tmin is not None:
            itmin = int(round(tmin / deltat))
            if abs(itmin*deltat - tmin) > sampling_check_eps*deltat:
                raise NotMultipleOfSamplingInterval(
                    'GFTrace: tmin (%g) is not a multiple of sampling '
                    'interval (%g)' % (tmin, deltat))

        if data is not None:
            data = num.asarray(data, dtype=gf_dtype)
        else:
            data = num.array([], dtype=gf_dtype)
            begin_value = 0.0
            end_value = 0.0

        # weakref.ref(data)() yields the array itself while it is alive,
        # so this is effectively a plain reference (kept as in original)
        self.data = weakref.ref(data)()
        self.itmin = itmin
        self.deltat = deltat
        self.is_zero = is_zero
        # stacking statistics, filled in by the summation machinery
        self.n_records_stacked = 0.
        self.t_stack = 0.
        self.t_optimize = 0.
        self.err = SeismosizerErrorEnum.SUCCESS

        if data is not None and data.size > 0:
            if begin_value is None:
                begin_value = data[0]
            if end_value is None:
                end_value = data[-1]

        self.begin_value = begin_value
        self.end_value = end_value

    @property
    def t(self):
        '''
        Time vector of the GF trace.

        :returns: Time vector
        :rtype: :py:class:`numpy.ndarray`
        '''
        return num.linspace(
            self.itmin*self.deltat,
            (self.itmin+self.data.size-1)*self.deltat, self.data.size)

    def __str__(self, itmin=0):
        '''
        Ascii rendering of the samples, aligned at sample index *itmin*.
        '''
        if self.is_zero:
            return 'ZERO'

        s = []
        for i in range(itmin, self.itmin + self.data.size):
            if i >= self.itmin and i < self.itmin + self.data.size:
                s.append('%7.4g' % self.data[i-self.itmin])
            else:
                s.append(' '*7)

        return '|'.join(s)

    def to_trace(self, net, sta, loc, cha):
        '''
        Convert to a :py:class:`pyrocko.trace.Trace` with the given
        network, station, location and channel codes.
        '''
        from pyrocko import trace
        return trace.Trace(
            net, sta, loc, cha,
            ydata=self.data,
            deltat=self.deltat,
            tmin=self.itmin*self.deltat)
class GFValue(object):
    '''
    Scalar Green's function value with stacking statistics.
    '''

    def __init__(self, value):
        self.value = value
        # stacking statistics, filled in by the summation machinery
        self.n_records_stacked = 0.
        self.t_optimize = 0.
        self.t_stack = 0.
216Zero = GFTrace(is_zero=True, itmin=0)
def make_same_span(tracesdict):
    '''
    Extend all traces in *tracesdict* to a common time span.

    If every trace is zero, all entries map to the :py:data:`Zero`
    singleton; otherwise each trace is padded with its constant begin/end
    values to cover the union of all nonzero traces' spans.
    '''

    traces = tracesdict.values()

    nonzero = [tr for tr in traces if not tr.is_zero]
    if not nonzero:
        return {k: Zero for k in tracesdict.keys()}

    deltat = nonzero[0].deltat

    itmin = min(tr.itmin for tr in nonzero)
    itmax = max(tr.itmin+tr.data.size for tr in nonzero) - 1
    n = itmax - itmin + 1

    out = {}
    for key, tr in tracesdict.items():
        if tr.itmin == itmin and tr.data.size == n:
            # already covers the full span, keep as is
            out[key] = tr
            continue

        data = num.zeros(n, dtype=gf_dtype)
        if not tr.is_zero:
            lo = tr.itmin - itmin
            hi = lo + tr.data.size
            # extend with the constant begin/end values
            data[:lo] = tr.data[0]
            data[lo:hi] = tr.data
            data[hi:] = tr.data[-1]

        out[key] = GFTrace(data, itmin, deltat)

    return out
class CannotCreate(StoreError):
    '''
    Raised when a store file or directory cannot be created.
    '''
    pass
class CannotOpen(StoreError):
    '''
    Raised when a store cannot be opened.
    '''
    pass
class DuplicateInsert(StoreError):
    '''
    Raised when a trace is inserted at an already occupied record index.
    '''
    pass
class ShortRead(StoreError):
    '''
    Raised when fewer samples than expected could be read from disk.
    '''
    def __str__(self):
        return 'unexpected end of data (truncated traces file?)'
class NotAllowedToInterpolate(StoreError):
    '''
    Raised when interpolation would be required but is not permitted.
    '''
    def __str__(self):
        return 'not allowed to interpolate'
class NoSuchExtra(StoreError):
    '''
    Raised when a requested extra-information key is not in the store.
    '''
    def __init__(self, s):
        StoreError.__init__(self)
        self.value = s

    def __str__(self):
        return 'extra information for key "%s" not found.' % self.value
class NoSuchPhase(StoreError):
    '''
    Raised when a requested phase table is not available in the store.
    '''
    def __init__(self, s):
        StoreError.__init__(self)
        self.value = s

    def __str__(self):
        return 'phase for key "%s" not found. ' \
            'Running "fomosto ttt" may be needed.' % self.value
def remove_if_exists(fn, force=False):
    '''
    Delete file *fn* if *force* is set; otherwise raise
    :py:exc:`CannotCreate` when the file exists.
    '''
    if not os.path.exists(fn):
        return

    if not force:
        raise CannotCreate('file %s already exists' % fn)

    os.remove(fn)
def get_extra_path(store_dir, key):
    '''
    Get path of the file with extra information *key* inside *store_dir*.

    :raises ValueError: if *key* is not a valid string ID
    '''
    check_string_id(key)
    return os.path.join(store_dir, 'extra', key)
305class BaseStore(object):
    @staticmethod
    def lock_fn_(store_dir):
        # path of the lock file used when fcntl locking is unavailable
        return os.path.join(store_dir, 'lock')
    @staticmethod
    def index_fn_(store_dir):
        # path of the index file holding the header and record table
        return os.path.join(store_dir, 'index')
    @staticmethod
    def data_fn_(store_dir):
        # path of the file holding the raw GF trace samples
        return os.path.join(store_dir, 'traces')
    @staticmethod
    def config_fn_(store_dir):
        # path of the store's config file
        return os.path.join(store_dir, 'config')
    @staticmethod
    def create(store_dir, deltat, nrecords, force=False):
        '''
        Create index and data files for a new low-level store.

        Writes an index file with an all-zero record table for *nrecords*
        records, and a data file with reserved padding at the beginning.

        :param store_dir: destination directory
        :param deltat: sampling interval [s]
        :param nrecords: number of GF traces the store will hold
        :param force: overwrite pre-existing files
        :raises CannotCreate: if the directory cannot be created or files
            exist and *force* is not set
        '''

        try:
            util.ensuredir(store_dir)
        except Exception:
            raise CannotCreate('cannot create directory %s' % store_dir)

        index_fn = BaseStore.index_fn_(store_dir)
        data_fn = BaseStore.data_fn_(store_dir)

        for fn in (index_fn, data_fn):
            remove_if_exists(fn, force)

        with open(index_fn, 'wb') as f:
            f.write(struct.pack(gf_store_header_fmt, nrecords, deltat))
            records = num.zeros(nrecords, dtype=gf_record_dtype)
            records.tofile(f)

        with open(data_fn, 'wb') as f:
            # reserve space at the beginning so that real data offsets never
            # collide with the special marker offsets 0, 1 and 2 (see the
            # record-format comment block above)
            f.write(b'\0' * 32)
    def __init__(self, store_dir, mode='r', use_memmap=True):
        '''
        :param store_dir: path of the store directory
        :param mode: ``'r'`` for read-only, ``'w'`` for writeable
        :param use_memmap: memory-map the record table instead of reading
            it into memory
        '''
        assert mode in 'rw'
        self.store_dir = store_dir
        self.mode = mode
        self._use_memmap = use_memmap
        # the following are filled in lazily by open():
        self._nrecords = None
        self._deltat = None
        self._f_index = None
        self._f_data = None
        self._records = None
        self.cstore = None
    def open(self):
        '''
        Open index and data files, initialize the C extension store and
        load the record table.

        :raises CannotOpen: if the store files cannot be opened
        :raises StoreError: if the C extension fails to initialize
        '''
        assert self._f_index is None
        index_fn = self.index_fn()
        data_fn = self.data_fn()

        if self.mode == 'r':
            fmode = 'rb'
        elif self.mode == 'w':
            # update in place: do not truncate existing files
            fmode = 'r+b'
        else:
            assert False, 'invalid mode: %s' % self.mode

        try:
            self._f_index = open(index_fn, fmode)
            self._f_data = open(data_fn, fmode)
        except Exception:
            self.mode = ''
            raise CannotOpen('cannot open gf store: %s' % self.store_dir)

        try:
            # replace single precision deltat value in store with the double
            # precision value from the config, if available
            self.cstore = store_ext.store_init(
                self._f_index.fileno(), self._f_data.fileno(),
                self.get_deltat() or 0.0)

        except store_ext.StoreExtError as e:
            raise StoreError(str(e))

        while True:
            try:
                dataheader = self._f_index.read(gf_store_header_fmt_size)
                break

            except IOError as e:
                # occasionally got this one on an NFS volume
                if e.errno == errno.EBUSY:
                    time.sleep(0.01)
                else:
                    raise

        nrecords, deltat = struct.unpack(gf_store_header_fmt, dataheader)
        self._nrecords = nrecords
        self._deltat = deltat

        self._load_index()
    def __del__(self):
        # close() flushes the index when the store was opened for writing;
        # mode == '' marks an already-closed or failed-to-open store
        if self.mode != '':
            self.close()
    def lock(self):
        '''
        Acquire an exclusive lock on the store.

        Uses ``fcntl`` file locking where available, a lock file otherwise
        (e.g. on platforms without the ``fcntl`` module).
        '''
        if not fcntl:
            lock_fn = self.lock_fn()
            util.create_lockfile(lock_fn)
        else:
            if not self._f_index:
                self.open()

            while True:
                try:
                    fcntl.lockf(self._f_index, fcntl.LOCK_EX)
                    break

                except IOError as e:
                    # retry when the lock table is temporarily exhausted
                    if e.errno == errno.ENOLCK:
                        time.sleep(0.01)
                    else:
                        raise
    def unlock(self):
        '''
        Release the lock acquired with :py:meth:`lock`.
        '''
        if not fcntl:
            lock_fn = self.lock_fn()
            util.delete_lockfile(lock_fn)
        else:
            # flush pending writes before releasing the lock
            self._f_data.flush()
            self._f_index.flush()
            fcntl.lockf(self._f_index, fcntl.LOCK_UN)
    def put(self, irecord, trace):
        # store GF trace *trace* at low-level record index *irecord*
        self._put(irecord, trace)
    def get_record(self, irecord):
        # return the raw index record at *irecord*
        return self._get_record(irecord)
    def get_span(self, irecord, decimate=1):
        # return (itmin, itmax) sample index span of trace *irecord*
        return self._get_span(irecord, decimate=decimate)
    def get(self, irecord, itmin=None, nsamples=None, decimate=1,
            implementation='c'):
        # retrieve a single GF trace; see _get() for details
        return self._get(irecord, itmin, nsamples, decimate, implementation)
    def sum(self, irecords, delays, weights, itmin=None,
            nsamples=None, decimate=1, implementation='c',
            optimization='enable'):
        # delay-and-sum the given GF traces; see _sum() for details
        return self._sum(irecords, delays, weights, itmin, nsamples, decimate,
                         implementation, optimization)
    def irecord_format(self):
        # zero-padded integer format string wide enough for all record
        # indices of this store
        return util.zfmt(self._nrecords)
    def str_irecord(self, irecord):
        # record index rendered as a zero-padded string of constant width
        return self.irecord_format() % irecord
    def close(self):
        '''
        Flush pending index changes (write mode) and close the store's
        files.
        '''
        if self.mode == 'w':
            if not self._f_index:
                self.open()
            self._save_index()

        if self._f_data:
            self._f_data.close()
            self._f_data = None

        if self._f_index:
            self._f_index.close()
            self._f_index = None

        # drop the (possibly memory-mapped) record table
        del self._records
        self._records = None

        self.mode = ''
    def _get_record(self, irecord):
        # lazily open the store on first access
        if not self._f_index:
            self.open()

        return self._records[irecord]
    def _get(self, irecord, itmin, nsamples, decimate, implementation):
        '''
        Retrieve complete GF trace from storage.

        :raises StoreError: if the store is not open in read mode or the
            C extension reports an error
        '''

        if not self._f_index:
            self.open()

        if not self.mode == 'r':
            raise StoreError('store not open in read mode')

        # fast path: undecimated reads are served by the C extension
        if implementation == 'c' and decimate == 1:

            # -1 / 0 are the extension's markers for "use stored values"
            if nsamples is None:
                nsamples = -1

            if itmin is None:
                itmin = 0

            try:
                return GFTrace(*store_ext.store_get(
                    self.cstore, int(irecord), int(itmin), int(nsamples)))
            except store_ext.StoreExtError as e:
                raise StoreError(str(e))

        else:
            return self._get_impl_reference(irecord, itmin, nsamples, decimate)
    def get_deltat(self):
        # sampling interval as read from the index header (single
        # precision; overridden by Store with the config's double
        # precision value)
        return self._deltat
    def _get_impl_reference(self, irecord, itmin, nsamples, decimate):
        '''
        Pure Python reference implementation of trace retrieval.

        Handles the special data offsets (0: missing, 1: zero trace) and,
        for ``decimate > 1``, anti-alias filters and downsamples the trace.
        '''
        deltat = self.get_deltat()

        if not (0 <= irecord < self._nrecords):
            raise StoreError('invalid record number requested '
                             '(irecord = %i, nrecords = %i)' %
                             (irecord, self._nrecords))

        ipos, itmin_data, nsamples_data, begin_value, end_value = \
            self._records[irecord]

        if None in (itmin, nsamples):
            # no window requested: use the stored extent
            itmin = itmin_data
            itmax = itmin_data + nsamples_data - 1
            nsamples = nsamples_data
        else:
            # itmin/nsamples are given in decimated sampling units;
            # convert to undecimated sample indices
            itmin = itmin * decimate
            nsamples = nsamples * decimate
            itmax = itmin + nsamples - decimate

        if ipos == 0:
            # special marker: trace is missing
            return None

        elif ipos == 1:
            # special marker: trace is physically zero
            itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
            return GFTrace(is_zero=True, itmin=itmin_ext//decimate)

        if decimate == 1:
            ilo = max(itmin, itmin_data) - itmin_data
            ihi = min(itmin+nsamples, itmin_data+nsamples_data) - itmin_data
            data = self._get_data(ipos, begin_value, end_value, ilo, ihi)

            return GFTrace(data, itmin=itmin_data+ilo, deltat=deltat,
                           begin_value=begin_value, end_value=end_value)

        else:
            itmax_data = itmin_data + nsamples_data - 1

            # put begin and end to multiples of new sampling rate
            itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
            itmax_ext = -((-min(itmax, itmax_data))//decimate) * decimate
            nsamples_ext = itmax_ext - itmin_ext + 1

            # add some padding for the aa filter
            order = 30
            itmin_ext_pad = itmin_ext - order//2
            itmax_ext_pad = itmax_ext + order//2
            nsamples_ext_pad = itmax_ext_pad - itmin_ext_pad + 1

            itmin_overlap = max(itmin_data, itmin_ext_pad)
            itmax_overlap = min(itmax_data, itmax_ext_pad)

            ilo = itmin_overlap - itmin_ext_pad
            ihi = max(ilo, itmax_overlap - itmin_ext_pad + 1)
            ilo_data = itmin_overlap - itmin_data
            ihi_data = max(ilo_data, itmax_overlap - itmin_data + 1)

            data_ext_pad = num.empty(nsamples_ext_pad, dtype=gf_dtype)
            data_ext_pad[ilo:ihi] = self._get_data(
                ipos, begin_value, end_value, ilo_data, ihi_data)

            # extend with the constant begin/end values outside the
            # stored range
            data_ext_pad[:ilo] = begin_value
            data_ext_pad[ihi:] = end_value

            # anti-alias lowpass; the slice offset 'order' below accounts
            # for the pad (order//2) plus the FIR filter delay (order//2)
            b = signal.firwin(order + 1, 1. / decimate, window='hamming')
            a = 1.
            data_filt_pad = signal.lfilter(b, a, data_ext_pad)
            data_deci = data_filt_pad[order:order+nsamples_ext:decimate]
            if data_deci.size >= 1:
                # pin the edge samples to the exact constant extension
                # values where the window touches the stored extent
                if itmin_ext <= itmin_data:
                    data_deci[0] = begin_value

                if itmax_ext >= itmax_data:
                    data_deci[-1] = end_value

            return GFTrace(data_deci, itmin_ext//decimate,
                           deltat*decimate,
                           begin_value=begin_value, end_value=end_value)
    def _get_span(self, irecord, decimate=1):
        '''
        Get temporal extent of GF trace at given index.

        :returns: ``(itmin, itmax)`` sample index span (in decimated
            sampling units when ``decimate > 1``)
        '''

        if not self._f_index:
            self.open()

        assert 0 <= irecord < self._nrecords, \
            'irecord = %i, nrecords = %i' % (irecord, self._nrecords)

        (_, itmin, nsamples, _, _) = self._records[irecord]

        itmax = itmin + nsamples - 1

        if decimate == 1:
            return itmin, itmax
        else:
            if nsamples == 0:
                # empty trace: return an empty span (itmax < itmin)
                return itmin//decimate, itmin//decimate - 1
            else:
                # floor-divide itmin, ceil-divide itmax
                return itmin//decimate, -((-itmax)//decimate)
    def _put(self, irecord, trace):
        '''
        Save GF trace to storage.

        :raises DuplicateInsert: if record *irecord* is already occupied
        '''

        if not self._f_index:
            self.open()

        deltat = self.get_deltat()

        assert self.mode == 'w'
        assert trace.is_zero or \
            abs(trace.deltat - deltat) < 1e-7 * deltat
        assert 0 <= irecord < self._nrecords, \
            'irecord = %i, nrecords = %i' % (irecord, self._nrecords)

        if self._records[irecord][0] != 0:
            raise DuplicateInsert('record %i already in store' % irecord)

        if trace.is_zero or num.all(trace.data == 0.0):
            # data offset 1 marks a physically zero trace
            self._records[irecord] = (1, trace.itmin, 0, 0., 0.)
            return

        ndata = trace.data.size

        if ndata > 2:
            # append samples at the end of the data file
            self._f_data.seek(0, 2)
            ipos = self._f_data.tell()
            trace.data.astype(gf_dtype_store).tofile(self._f_data)
        else:
            # data offset 2 marks a short trace fully described by its
            # begin/end values stored in the index record
            ipos = 2

        self._records[irecord] = (ipos, trace.itmin, ndata,
                                  trace.data[0], trace.data[-1])
    def _sum_impl_alternative(self, irecords, delays, weights, itmin, nsamples,
                              decimate):

        '''
        Sum delayed and weighted GF traces.

        Alternative pure Python implementation which accumulates directly
        into the output array; non-integer delays are handled by linear
        interpolation between the floor- and ceil-shifted contributions.
        '''

        if not self._f_index:
            self.open()

        assert self.mode == 'r'

        deltat = self.get_deltat() * decimate

        if len(irecords) == 0:
            if None in (itmin, nsamples):
                return Zero
            else:
                return GFTrace(
                    num.zeros(nsamples, dtype=gf_dtype), itmin,
                    deltat, is_zero=True)

        assert len(irecords) == len(delays)
        assert len(irecords) == len(weights)

        if None in (itmin, nsamples):
            # no output window given: compute the union of all delayed spans
            itmin_delayed, itmax_delayed = [], []
            for irecord, delay in zip(irecords, delays):
                itmin, itmax = self._get_span(irecord, decimate=decimate)
                itmin_delayed.append(itmin + delay/deltat)
                itmax_delayed.append(itmax + delay/deltat)

            itmin = int(math.floor(min(itmin_delayed)))
            nsamples = int(math.ceil(max(itmax_delayed))) - itmin + 1

        out = num.zeros(nsamples, dtype=gf_dtype)
        if nsamples == 0:
            return GFTrace(out, itmin, deltat)

        for ii in range(len(irecords)):
            irecord = irecords[ii]
            delay = delays[ii]
            weight = weights[ii]

            if weight == 0.0:
                continue

            idelay_floor = int(math.floor(delay/deltat))
            idelay_ceil = int(math.ceil(delay/deltat))

            gf_trace = self._get(
                irecord,
                itmin - idelay_ceil,
                nsamples + idelay_ceil - idelay_floor,
                decimate,
                'reference')

            assert gf_trace.itmin >= itmin - idelay_ceil
            assert gf_trace.data.size <= nsamples + idelay_ceil - idelay_floor

            if gf_trace.is_zero:
                continue

            ilo = gf_trace.itmin - itmin + idelay_floor
            ihi = ilo + gf_trace.data.size + (idelay_ceil-idelay_floor)

            data = gf_trace.data

            if idelay_floor == idelay_ceil:
                # integer delay: plain weighted accumulation
                out[ilo:ihi] += data * weight
            else:
                # fractional delay: distribute linearly onto the two
                # neighboring integer shifts, clipping at the array edges
                if data.size:
                    k = 1
                    if ihi <= nsamples:
                        out[ihi-1] += gf_trace.end_value * \
                            ((idelay_ceil-delay/deltat) * weight)
                        k = 0

                    out[ilo+1:ihi-k] += data[:data.size-k] * \
                        ((delay/deltat-idelay_floor) * weight)
                    k = 1
                    if ilo >= 0:
                        out[ilo] += gf_trace.begin_value * \
                            ((delay/deltat-idelay_floor) * weight)
                        k = 0

                    out[ilo+k:ihi-1] += data[k:] * \
                        ((idelay_ceil-delay/deltat) * weight)

            # outside the returned window each trace continues with its
            # constant begin/end values
            if ilo > 0 and gf_trace.begin_value != 0.0:
                out[:ilo] += gf_trace.begin_value * weight

            if ihi < nsamples and gf_trace.end_value != 0.0:
                out[ihi:] += gf_trace.end_value * weight

        return GFTrace(out, itmin, deltat)
    def _sum_impl_reference(self, irecords, delays, weights, itmin, nsamples,
                            decimate):
        '''
        Pure Python reference implementation of delay-and-sum stacking.

        Collects all weighted, shifted traces into a 2D array padded with
        their constant begin/end values, then sums along the first axis.
        '''

        if not self._f_index:
            self.open()

        deltat = self.get_deltat() * decimate

        if len(irecords) == 0:
            if None in (itmin, nsamples):
                return Zero
            else:
                return GFTrace(
                    num.zeros(nsamples, dtype=gf_dtype), itmin,
                    deltat, is_zero=True)

        datas = []
        itmins = []
        for i, delay, weight in zip(irecords, delays, weights):
            tr = self._get(i, None, None, decimate, 'reference')

            idelay_floor = int(math.floor(delay/deltat))
            idelay_ceil = int(math.ceil(delay/deltat))

            if idelay_floor == idelay_ceil:
                itmins.append(tr.itmin + idelay_floor)
                datas.append(tr.data.copy()*weight)
            else:
                # fractional delay: distribute the trace linearly onto the
                # two neighboring integer shifts
                itmins.append(tr.itmin + idelay_floor)
                datas.append(tr.data.copy()*weight*(idelay_ceil-delay/deltat))
                itmins.append(tr.itmin + idelay_ceil)
                datas.append(tr.data.copy()*weight*(delay/deltat-idelay_floor))

        itmin_all = min(itmins)

        itmax_all = max(itmin_ + data.size for (itmin_, data) in
                        zip(itmins, datas))

        if itmin is not None:
            itmin_all = min(itmin_all, itmin)
        if nsamples is not None:
            itmax_all = max(itmax_all, itmin+nsamples)

        nsamples_all = itmax_all - itmin_all

        arr = num.zeros((len(datas), nsamples_all), dtype=gf_dtype)
        for i, itmin_, data in zip(num.arange(len(datas)), itmins, datas):
            if data.size > 0:
                ilo = itmin_-itmin_all
                ihi = ilo + data.size
                # pad with the constant begin/end values
                arr[i, :ilo] = data[0]
                arr[i, ilo:ihi] = data
                arr[i, ihi:] = data[-1]

        sum_arr = arr.sum(axis=0)

        if itmin is not None and nsamples is not None:
            # cut to the requested window
            ilo = itmin-itmin_all
            ihi = ilo + nsamples
            sum_arr = sum_arr[ilo:ihi]

        else:
            itmin = itmin_all

        return GFTrace(sum_arr, itmin, deltat)
    def _optimize(self, irecords, delays, weights):
        '''
        Merge duplicate ``(irecord, delay)`` stacking terms.

        Each delay is split into floor/ceil integer shifts with linear
        interpolation weights; terms with identical record index and shift
        are then combined by summing their weights.

        :returns: ``(irecords, delays, weights)`` with duplicates merged
        '''
        if num.unique(irecords).size == irecords.size:
            # no duplicate records: nothing to gain
            return irecords, delays, weights

        deltat = self.get_deltat()

        # work in units of the sampling interval
        delays = delays / deltat
        irecords2 = num.repeat(irecords, 2)
        delays2 = num.empty(irecords2.size, dtype=float)
        delays2[0::2] = num.floor(delays)
        delays2[1::2] = num.ceil(delays)
        weights2 = num.repeat(weights, 2)
        weights2[0::2] *= 1.0 - (delays - delays2[0::2])
        # the trailing factor zeroes the ceil term for integer delays
        # (floor == ceil) to avoid double-counting
        weights2[1::2] *= (1.0 - (delays2[1::2] - delays)) * \
            (delays2[1::2] - delays2[0::2])

        delays2 *= deltat

        # sort by (irecord, delay) so duplicates become adjacent
        iorder = num.lexsort((delays2, irecords2))

        irecords2 = irecords2[iorder]
        delays2 = delays2[iorder]
        weights2 = weights2[iorder]

        # ui marks the first element of each run of identical terms
        ui = num.empty(irecords2.size, dtype=bool)
        ui[1:] = num.logical_or(num.diff(irecords2) != 0,
                                num.diff(delays2) != 0.)

        ui[0] = 0
        ind2 = num.cumsum(ui)  # group index of each term
        ui[0] = 1
        ind1 = num.where(ui)[0]  # index of each group's representative

        irecords3 = irecords2[ind1]
        delays3 = delays2[ind1]
        # sum the weights within each group
        weights3 = num.bincount(ind2, weights2)

        return irecords3, delays3, weights3
    def _sum(self, irecords, delays, weights, itmin, nsamples, decimate,
             implementation, optimization):
        '''
        Dispatch delay-and-sum stacking to the chosen implementation and
        record timing statistics on the result.
        '''

        if not self._f_index:
            self.open()

        t0 = time.time()
        if optimization == 'enable':
            irecords, delays, weights = self._optimize(
                irecords, delays, weights)
        else:
            assert optimization == 'disable'

        t1 = time.time()
        deltat = self.get_deltat()

        if implementation == 'c' and decimate == 1:
            # remove the common integer part of the delays before calling
            # the C extension; it is re-added to itmin afterwards
            if delays.size != 0:
                itoffset = int(num.floor(num.min(delays)/deltat))
            else:
                itoffset = 0

            if nsamples is None:
                nsamples = -1

            if itmin is None:
                itmin = 0
            else:
                itmin -= itoffset

            try:
                tr = GFTrace(*store_ext.store_sum(
                    self.cstore, irecords.astype(num.uint64),
                    delays - itoffset*deltat,
                    weights.astype(num.float32),
                    int(itmin), int(nsamples)))

                tr.itmin += itoffset

            except store_ext.StoreExtError as e:
                raise StoreError(str(e) + ' in store %s' % self.store_dir)

        elif implementation == 'alternative':
            tr = self._sum_impl_alternative(irecords, delays, weights,
                                            itmin, nsamples, decimate)

        else:
            tr = self._sum_impl_reference(irecords, delays, weights,
                                          itmin, nsamples, decimate)

        t2 = time.time()

        # attach stacking statistics to the returned trace
        tr.n_records_stacked = irecords.size
        tr.t_optimize = t1 - t0
        tr.t_stack = t2 - t1

        return tr
    def _sum_statics(self, irecords, delays, weights, it, ntargets,
                     nthreads):
        # static (single time sample) stacking, delegated to the C
        # extension
        if not self._f_index:
            self.open()

        return store_ext.store_sum_static(
            self.cstore, irecords, delays, weights, it, ntargets, nthreads)
    def _load_index(self):
        '''
        Load the record table from the index file, memory-mapped if
        enabled.
        '''
        if self._use_memmap:
            records = num.memmap(
                self._f_index, dtype=gf_record_dtype,
                offset=gf_store_header_fmt_size,
                # writeable mapping only when the store is open for writing
                mode=('r', 'r+')[self.mode == 'w'])

        else:
            self._f_index.seek(gf_store_header_fmt_size)
            records = num.fromfile(self._f_index, dtype=gf_record_dtype)

        assert len(records) == self._nrecords

        self._records = records
    def _save_index(self):
        '''
        Write header and record table back to the index file.
        '''
        self._f_index.seek(0)
        self._f_index.write(struct.pack(gf_store_header_fmt, self._nrecords,
                                        self.get_deltat()))

        if self._use_memmap:
            # memory-mapped table: flushing writes it through to disk
            self._records.flush()
        else:
            self._f_index.seek(gf_store_header_fmt_size)
            self._records.tofile(self._f_index)
            self._f_index.flush()
    def _get_data(self, ipos, begin_value, end_value, ilo, ihi):
        '''
        Read samples ``[ilo, ihi)`` of the trace whose data starts at file
        offset *ipos*.

        Short traces (``ipos == 2``) are reconstructed from their
        begin/end values without touching the data file.

        :raises ShortRead: if fewer samples than requested could be read
        '''
        if ihi - ilo > 0:
            if ipos == 2:
                data_orig = num.empty(2, dtype=gf_dtype)
                data_orig[0] = begin_value
                data_orig[1] = end_value
                return data_orig[ilo:ihi]
            else:
                self._f_data.seek(
                    int(ipos + ilo*gf_dtype_nbytes_per_sample))
                arr = num.fromfile(
                    self._f_data, gf_dtype_store, ihi-ilo).astype(gf_dtype)

                if arr.size != ihi-ilo:
                    raise ShortRead()
                return arr
        else:
            return num.empty((0,), dtype=gf_dtype)
    def lock_fn(self):
        # lock file path of this store
        return BaseStore.lock_fn_(self.store_dir)
    def index_fn(self):
        # index file path of this store
        return BaseStore.index_fn_(self.store_dir)
    def data_fn(self):
        # data (traces) file path of this store
        return BaseStore.data_fn_(self.store_dir)
    def config_fn(self):
        # config file path of this store
        return BaseStore.config_fn_(self.store_dir)
    def count_special_records(self):
        # histogram of data offsets: counts of [missing (0), zero (1),
        # short (2), regular (>= 3)] records
        if not self._f_index:
            self.open()

        return num.histogram(
            self._records['data_offset'],
            bins=[0, 1, 2, 3, num.array(-1).astype(num.uint64)])[0]
    @property
    def size_index(self):
        # size of the index file [bytes]
        return os.stat(self.index_fn()).st_size
    @property
    def size_data(self):
        # size of the data file [bytes]
        return os.stat(self.data_fn()).st_size
    @property
    def size_index_and_data(self):
        # combined size of index and data files [bytes]
        return self.size_index + self.size_data
    @property
    def size_index_and_data_human(self):
        # combined size as a human-readable string
        return util.human_bytesize(self.size_index_and_data)
    def stats(self):
        '''
        Summary statistics of the store (record counts and file sizes).

        :returns: dict with the keys listed in :py:attr:`stats_keys`
        '''
        counter = self.count_special_records()

        stats = dict(
            total=self._nrecords,
            inserted=(counter[1] + counter[2] + counter[3]),
            empty=counter[0],
            short=counter[2],
            zero=counter[1],
            size_data=self.size_data,
            size_index=self.size_index,
        )

        return stats
1019 stats_keys = 'total inserted empty short zero size_data size_index'.split()
def remake_dir(dpath, force):
    '''
    Create directory *dpath*, replacing any existing one if *force* is set.

    :raises CannotCreate: if the directory exists and *force* is not set
    '''
    if os.path.exists(dpath):
        if not force:
            raise CannotCreate('Directory "%s" already exists.' % dpath)

        shutil.rmtree(dpath)

    os.mkdir(dpath)
class MakeTimingParamsFailed(StoreError):
    '''
    Raised when timing parameters could not be created for a store.
    '''
    pass
1036class Store(BaseStore):
1038 '''
1039 Green's function disk storage and summation machine.
1041 The `Store` can be used to efficiently store, retrieve, and sum Green's
1042 function traces. A `Store` contains many 1D time traces sampled at even
1043 multiples of a global sampling rate, where each time trace has an
1044 individual start and end time. The traces are treated as having repeating
1045 end points, so the functions they represent can be non-constant only
1046 between begin and end time. It provides capabilities to retrieve decimated
1047 traces and to extract parts of the traces. The main purpose of this class
    is to provide a fast, easy to use, and flexible mechanism to compute
1049 weighted delay-and-sum stacks with many Green's function traces involved.
1051 Individual Green's functions are accessed through a single integer index at
1052 low level. On top of that, various indexing schemes can be implemented by
1053 providing a mapping from physical coordinates to the low level index `i`.
1054 E.g. for a problem with cylindrical symmetry, one might define a mapping
1055 from the coordinates (`receiver_depth`, `source_depth`, `distance`) to the
1056 low level index. Index translation is done in the
1057 :py:class:`pyrocko.gf.meta.Config` subclass object associated with the
1058 Store. It is accessible through the store's :py:attr:`config` attribute,
1059 and contains all meta information about the store, including physical
1060 extent, geometry, sampling rate, and information about the type of the
1061 stored Green's functions. Information about the underlying earth model
1062 can also be found there.
1064 A GF store can also contain tabulated phase arrivals. In basic cases, these
1065 can be created with the :py:meth:`make_travel_time_tables` and evaluated
1066 with the :py:func:`t` methods.
1068 .. attribute:: config
1070 The :py:class:`pyrocko.gf.meta.Config` derived object associated with
1071 the store which contains all meta information about the store and
1072 provides the high-level to low-level index mapping.
1074 .. attribute:: store_dir
1076 Path to the store's data directory.
1078 .. attribute:: mode
1080 The mode in which the store is opened: ``'r'``: read-only, ``'w'``:
1081 writeable.
1082 '''
    @classmethod
    def create(cls, store_dir, config, force=False, extra=None):
        '''
        Create new GF store.

        Creates a new GF store at path ``store_dir``. The layout of the GF is
        defined with the parameters given in ``config``, which should be an
        object of a subclass of :py:class:`~pyrocko.gf.meta.Config`. This
        function will refuse to overwrite an existing GF store, unless
        ``force`` is set to ``True``. If more information, e.g. parameters
        used for the modelling code, earth models or other, should be saved
        along with the GF store, these may be provided through a dict given to
        ``extra``. The keys of this dict must be names and the values must be
        *guts* type objects.

        :param store_dir: GF store path
        :type store_dir: str
        :param config: GF store Config
        :type config: :py:class:`~pyrocko.gf.meta.Config`
        :param force: Force overwrite, defaults to ``False``
        :type force: bool
        :param extra: Extra information
        :type extra: dict or None
        :returns: the newly created store, opened
        '''

        cls.create_editables(store_dir, config, force=force, extra=extra)
        cls.create_dependants(store_dir, force=force)

        return cls(store_dir)
    @staticmethod
    def create_editables(store_dir, config, force=False, extra=None):
        '''
        Write the user-editable files of a new store: the config file and
        any extra-information files.

        :returns: list of paths of the files written
        '''
        try:
            util.ensuredir(store_dir)
        except Exception:
            raise CannotCreate('cannot create directory %s' % store_dir)

        fns = []

        config_fn = os.path.join(store_dir, 'config')
        remove_if_exists(config_fn, force)
        meta.dump(config, filename=config_fn)

        fns.append(config_fn)

        for sub_dir in ['extra']:
            dpath = os.path.join(store_dir, sub_dir)
            remake_dir(dpath, force)

        if extra:
            for k, v in extra.items():
                fn = get_extra_path(store_dir, k)
                remove_if_exists(fn, force)
                meta.dump(v, filename=fn)

                fns.append(fn)

        return fns
    @staticmethod
    def create_dependants(store_dir, force=False):
        '''
        Create the files derived from the config: index and data files and
        the directory for decimated variants.
        '''
        config_fn = os.path.join(store_dir, 'config')
        config = meta.load(filename=config_fn)

        BaseStore.create(store_dir, config.deltat, config.nrecords,
                         force=force)

        for sub_dir in ['decimated']:
            dpath = os.path.join(store_dir, sub_dir)
            remake_dir(dpath, force)
    def __init__(self, store_dir, mode='r', use_memmap=True):
        '''
        :param store_dir: path to the store directory
        :param mode: ``'r'`` for read-only, ``'w'`` for writeable
        :param use_memmap: memory-map the record table
        :raises StoreError: if *store_dir* does not contain a config file
        '''
        BaseStore.__init__(self, store_dir, mode=mode, use_memmap=use_memmap)
        config_fn = self.config_fn()
        if not os.path.isfile(config_fn):
            raise StoreError(
                'directory "%s" does not seem to contain a GF store '
                '("config" file not found)' % store_dir)
        self.load_config()

        self._decimated = {}
        self._extra = {}
        self._phases = {}
        # register pre-computed decimated variants found on disk
        for decimate in range(2, 9):
            if os.path.isdir(self._decimated_store_dir(decimate)):
                self._decimated[decimate] = None
    def open(self):
        # open the low-level store and set up the high-level to low-level
        # index mapping of the C extension from the config's geometry
        if not self._f_index:
            BaseStore.open(self)
            c = self.config

            mscheme = 'type_' + c.short_type.lower()
            store_ext.store_mapping_init(
                self.cstore, mscheme,
                c.mins, c.maxs, c.deltas, c.ns.astype(num.uint64),
                c.ncomponents)
    def save_config(self, make_backup=False):
        '''
        Write the current config back to the store's config file.

        :param make_backup: keep the previous config as ``config~``
        '''
        config_fn = self.config_fn()
        if make_backup:
            os.rename(config_fn, config_fn + '~')

        meta.dump(self.config, filename=config_fn)
    def get_deltat(self):
        # prefer the double precision value from the config over the
        # single precision value stored in the index header
        return self.config.deltat
    def load_config(self):
        # (re-)read the store's config file
        self.config = meta.load(filename=self.config_fn())
    def ensure_reference(self, force=False):
        '''
        Make sure the config contains a ``reference`` entry (store id plus
        short uuid); create it if missing or if *force* is set.
        '''
        if self.config.reference is not None and not force:
            return
        self.ensure_uuid()
        reference = '%s-%s' % (self.config.id, self.config.uuid[0:6])

        if self.config.reference is not None:
            self.config.reference = reference
            self.save_config()
        else:
            # config has no reference field yet: append to the file to
            # preserve its existing formatting, then reload
            with open(self.config_fn(), 'a') as config:
                config.write('reference: %s\n' % reference)
            self.load_config()
    def ensure_uuid(self, force=False):
        '''
        Make sure the config contains a ``uuid`` entry (content hash of
        the store); create it if missing or if *force* is set.
        '''
        if self.config.uuid is not None and not force:
            return
        uuid = self.create_store_hash()

        if self.config.uuid is not None:
            self.config.uuid = uuid
            self.save_config()
        else:
            # config has no uuid field yet: append to the file to preserve
            # its existing formatting, then reload
            with open(self.config_fn(), 'a') as config:
                config.write('\nuuid: %s\n' % uuid)
            self.load_config()
    def create_store_hash(self):
        '''
        Compute a sha1 hash over sparsely sampled trace data plus the full
        config file.

        :returns: hex digest string
        '''
        logger.info('creating hash for store ...')
        m = hashlib.sha1()

        traces_size = op.getsize(self.data_fn())
        with open(self.data_fn(), 'rb') as traces:
            while traces.tell() < traces_size:
                m.update(traces.read(4096))
                # skip ahead 10 MB between samples to keep hashing fast on
                # large stores
                traces.seek(1024 * 1024 * 10, 1)

        with open(self.config_fn(), 'rb') as config:
            m.update(config.read())
        return m.hexdigest()
1236 def get_extra_path(self, key):
1237 return get_extra_path(self.store_dir, key)
1239 def get_extra(self, key):
1240 '''
1241 Get extra information stored under given key.
1242 '''
1244 x = self._extra
1245 if key not in x:
1246 fn = self.get_extra_path(key)
1247 if not os.path.exists(fn):
1248 raise NoSuchExtra(key)
1250 x[key] = meta.load(filename=fn)
1252 return x[key]
1254 def upgrade(self):
1255 '''
1256 Upgrade store config and files to latest Pyrocko version.
1257 '''
1258 fns = [os.path.join(self.store_dir, 'config')]
1259 for key in self.extra_keys():
1260 fns.append(self.get_extra_path(key))
1262 i = 0
1263 for fn in fns:
1264 i += util.pf_upgrade(fn)
1266 return i
1268 def extra_keys(self):
1269 return [x for x in os.listdir(os.path.join(self.store_dir, 'extra'))
1270 if valid_string_id(x)]
    def put(self, args, trace):
        '''
        Insert trace into GF store.

        Store a single GF trace at (high-level) index ``args``.

        :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type args: tuple
        :param trace: GF trace to be stored
        '''

        # Map the high-level index tuple to the flat record number used by
        # the low-level storage layer.
        irecord = self.config.irecord(*args)
        self._put(irecord, trace)
1289 def get_record(self, args):
1290 irecord = self.config.irecord(*args)
1291 return self._get_record(irecord)
1293 def str_irecord(self, args):
1294 return BaseStore.str_irecord(self, self.config.irecord(*args))
1296 def get(self, args, itmin=None, nsamples=None, decimate=1,
1297 interpolation='nearest_neighbor', implementation='c'):
1298 '''
1299 Retrieve GF trace from store.
1301 Retrieve a single GF trace from the store at (high-level) index
1302 ``args``. By default, the full trace is retrieved. Given ``itmin`` and
1303 ``nsamples``, only the selected portion of the trace is extracted. If
1304 ``decimate`` is an integer in the range [2,8], the trace is decimated
1305 on the fly or, if available, the trace is read from a decimated version
1306 of the GF store.
1308 :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1309 ``(source_depth, distance, component)`` as in
1310 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1311 :type args: tuple
1312 :param itmin: Start time index (start time is ``itmin * dt``),
1313 defaults to None
1314 :type itmin: int or None
1315 :param nsamples: Number of samples, defaults to None
1316 :type nsamples: int or None
1317 :param decimate: Decimatation factor, defaults to 1
1318 :type decimate: int
1319 :param interpolation: Interpolation method
1320 ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
1321 ``'nearest_neighbor'``
1322 :type interpolation: str
1323 :param implementation: Implementation to use ``['c', 'reference']``,
1324 defaults to ``'c'``.
1325 :type implementation: str
1327 :returns: GF trace at ``args``
1328 :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
1329 '''
1331 store, decimate = self._decimated_store(decimate)
1332 if interpolation == 'nearest_neighbor':
1333 irecord = store.config.irecord(*args)
1334 tr = store._get(irecord, itmin, nsamples, decimate,
1335 implementation)
1337 elif interpolation == 'off':
1338 irecords, weights = store.config.vicinity(*args)
1339 if len(irecords) != 1:
1340 raise NotAllowedToInterpolate()
1341 else:
1342 tr = store._get(irecords[0], itmin, nsamples, decimate,
1343 implementation)
1345 elif interpolation == 'multilinear':
1346 tr = store._sum(irecords, num.zeros(len(irecords)), weights,
1347 itmin, nsamples, decimate, implementation,
1348 'disable')
1350 # to prevent problems with rounding errors (BaseStore saves deltat
1351 # as a 4-byte floating point value, value from YAML config is more
1352 # accurate)
1353 tr.deltat = self.config.deltat * decimate
1354 return tr
1356 def sum(self, args, delays, weights, itmin=None, nsamples=None,
1357 decimate=1, interpolation='nearest_neighbor', implementation='c',
1358 optimization='enable'):
1359 '''
1360 Sum delayed and weighted GF traces.
1362 Calculate sum of delayed and weighted GF traces. ``args`` is a tuple of
1363 arrays forming the (high-level) indices of the GF traces to be
1364 selected. Delays and weights for the summation are given in the arrays
1365 ``delays`` and ``weights``. ``itmin`` and ``nsamples`` can be given to
1366 restrict to computation to a given time interval. If ``decimate`` is
1367 an integer in the range [2,8], decimated traces are used in the
1368 summation.
1370 :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1371 ``(source_depth, distance, component)`` as in
1372 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1373 :type args: tuple(numpy.ndarray)
1374 :param delays: Delay times
1375 :type delays: :py:class:`numpy.ndarray`
1376 :param weights: Trace weights
1377 :type weights: :py:class:`numpy.ndarray`
1378 :param itmin: Start time index (start time is ``itmin * dt``),
1379 defaults to None
1380 :type itmin: int or None
1381 :param nsamples: Number of samples, defaults to None
1382 :type nsamples: int or None
1383 :param decimate: Decimatation factor, defaults to 1
1384 :type decimate: int
1385 :param interpolation: Interpolation method
1386 ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
1387 ``'nearest_neighbor'``
1388 :type interpolation: str
1389 :param implementation: Implementation to use,
1390 ``['c', 'alternative', 'reference']``, where ``'alternative'``
1391 and ``'reference'`` use a Python implementation, defaults to `'c'`
1392 :type implementation: str
1393 :param optimization: Optimization mode ``['enable', 'disable']``,
1394 defaults to ``'enable'``
1395 :type optimization: str
1396 :returns: Stacked GF trace.
1397 :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
1398 '''
1400 store, decimate_ = self._decimated_store(decimate)
1402 if interpolation == 'nearest_neighbor':
1403 irecords = store.config.irecords(*args)
1404 else:
1405 assert interpolation == 'multilinear'
1406 irecords, ip_weights = store.config.vicinities(*args)
1407 neach = irecords.size // args[0].size
1408 weights = num.repeat(weights, neach) * ip_weights
1409 delays = num.repeat(delays, neach)
1411 tr = store._sum(irecords, delays, weights, itmin, nsamples, decimate_,
1412 implementation, optimization)
1414 # to prevent problems with rounding errors (BaseStore saves deltat
1415 # as a 4-byte floating point value, value from YAML config is more
1416 # accurate)
1417 tr.deltat = self.config.deltat * decimate
1418 return tr
    def make_decimated(self, decimate, config=None, force=False,
                       show_progress=False):
        '''
        Create decimated version of GF store.

        Create a downsampled version of the GF store. Downsampling is done for
        the integer factor ``decimate`` which should be in the range [2,8]. If
        ``config`` is ``None``, all traces of the GF store are decimated and
        held available (i.e. the index mapping of the original store is used),
        otherwise, a different spacial stepping can be specified by giving a
        modified GF store configuration in ``config`` (see :py:meth:`create`).
        Decimated GF sub-stores are created under the ``decimated``
        subdirectory within the GF store directory. Holding available decimated
        versions of the GF store can save computation time, IO bandwidth, or
        decrease memory footprint at the cost of increased disk space usage,
        when computation are done for lower frequency signals.

        :param decimate: Decimate factor
        :type decimate: int
        :param config: GF store config object, defaults to None
        :type config: :py:class:`~pyrocko.gf.meta.Config` or None
        :param force: Force overwrite, defaults to ``False``
        :type force: bool
        :param show_progress: Show progress, defaults to ``False``
        :type show_progress: bool
        '''

        if not self._f_index:
            self.open()

        if not (2 <= decimate <= 8):
            raise StoreError('decimate argument must be in the range [2,8]')

        assert self.mode == 'r'

        if config is None:
            config = self.config

        # Work on a copy; the decimated sub-store gets a reduced sample rate.
        config = copy.deepcopy(config)
        config.sample_rate = self.config.sample_rate / decimate

        # Drop any cached handle for this decimation level; it is re-created
        # below.
        if decimate in self._decimated:
            del self._decimated[decimate]

        store_dir = self._decimated_store_dir(decimate)
        if os.path.exists(store_dir):
            if force:
                shutil.rmtree(store_dir)
            else:
                raise CannotCreate('store already exists at %s' % store_dir)

        # Build under a temporary name and move into place only when
        # complete, so an interrupted run cannot leave a half-built store
        # under the final name.
        store_dir_incomplete = store_dir + '-incomplete'
        Store.create(store_dir_incomplete, config, force=force)

        decimated = Store(store_dir_incomplete, 'w')
        try:
            if show_progress:
                pbar = util.progressbar(
                    'decimating store', self.config.nrecords)

            for i, args in enumerate(decimated.config.iter_nodes()):
                tr = self.get(args, decimate=decimate)
                decimated.put(args, tr)

                if show_progress:
                    pbar.update(i+1)

        finally:
            if show_progress:
                pbar.finish()

        decimated.close()

        shutil.move(store_dir_incomplete, store_dir)

        # Register availability; the Store object itself is opened lazily by
        # _decimated_store().
        self._decimated[decimate] = None
1497 def stats(self):
1498 stats = BaseStore.stats(self)
1499 stats['decimated'] = sorted(self._decimated.keys())
1500 return stats
    # Extends the base statistics keys with the list of available
    # decimations reported by Store.stats().
    stats_keys = BaseStore.stats_keys + ['decimated']
1504 def check(self, show_progress=False):
1505 have_holes = []
1506 for pdef in self.config.tabulated_phases:
1507 phase_id = pdef.id
1508 ph = self.get_stored_phase(phase_id)
1509 if ph.check_holes():
1510 have_holes.append(phase_id)
1512 if have_holes:
1513 for phase_id in have_holes:
1514 logger.warning(
1515 'Travel time table of phase "{}" contains holes. You can '
1516 ' use `fomosto tttlsd` to fix holes.'.format(
1517 phase_id))
1518 else:
1519 logger.info('No holes in travel time tables')
1521 try:
1522 if show_progress:
1523 pbar = util.progressbar('checking store', self.config.nrecords)
1525 problems = 0
1526 for i, args in enumerate(self.config.iter_nodes()):
1527 tr = self.get(args)
1528 if tr and not tr.is_zero:
1529 if not tr.begin_value == tr.data[0]:
1530 logger.warning('wrong begin value for trace at %s '
1531 '(data corruption?)' % str(args))
1532 problems += 1
1533 if not tr.end_value == tr.data[-1]:
1534 logger.warning('wrong end value for trace at %s '
1535 '(data corruption?)' % str(args))
1536 problems += 1
1537 if not num.all(num.isfinite(tr.data)):
1538 logger.warning(
1539 'nans or infs in trace at %s' % str(args))
1540 problems += 1
1542 if show_progress:
1543 pbar.update(i+1)
1545 finally:
1546 if show_progress:
1547 pbar.finish()
1549 return problems
1551 def check_earthmodels(self, config):
1552 if config.earthmodel_receiver_1d.profile('z')[-1] not in\
1553 config.earthmodel_1d.profile('z'):
1554 logger.warning('deepest layer of earthmodel_receiver_1d not '
1555 'found in earthmodel_1d')
1557 def _decimated_store_dir(self, decimate):
1558 return os.path.join(self.store_dir, 'decimated', str(decimate))
1560 def _decimated_store(self, decimate):
1561 if decimate == 1 or decimate not in self._decimated:
1562 return self, decimate
1563 else:
1564 store = self._decimated[decimate]
1565 if store is None:
1566 store = Store(self._decimated_store_dir(decimate), 'r')
1567 self._decimated[decimate] = store
1569 return store, 1
1571 def phase_filename(self, phase_id, attribute='phase'):
1572 check_string_id(phase_id)
1573 return os.path.join(
1574 self.store_dir, 'phases', phase_id + '.%s' % attribute)
1576 def get_phase_identifier(self, phase_id, attribute):
1577 return '{}.{}'.format(phase_id, attribute)
1579 def get_stored_phase(self, phase_def, attribute='phase'):
1580 '''
1581 Get stored phase from GF store.
1583 :returns: Phase information
1584 :rtype: :py:class:`pyrocko.spit.SPTree`
1585 '''
1587 phase_id = self.get_phase_identifier(phase_def, attribute)
1588 if phase_id not in self._phases:
1589 fn = self.phase_filename(phase_def, attribute)
1590 if not os.path.isfile(fn):
1591 raise NoSuchPhase(phase_id)
1593 spt = spit.SPTree(filename=fn)
1594 self._phases[phase_id] = spt
1596 return self._phases[phase_id]
    def get_phase(self, phase_def):
        '''
        Get a callable which computes phase arrivals for this store.

        ``phase_def`` may carry a provider prefix, separated by ``':'``:

        * ``'stored:<phase_id>'`` (default when no prefix is given) --
          interpolate a stored travel time table
        * ``'vel:<v>'`` -- straight-line arrival over source-receiver
          distance at velocity ``v`` (scaled by 1000, i.e. given in km/s)
        * ``'vel_surface:<v>'`` -- like ``'vel'`` but using surface distance
        * ``'cake:<phase>'`` / ``'iaspei:<phase>'`` -- compute arrivals with
          :py:mod:`pyrocko.cake` using the store's 1D earth model

        :param phase_def: phase definition, optionally with provider prefix
        :type phase_def: str
        :returns: callable taking a config index tuple and returning the
            arrival time [s] or ``None``
        :raises StoreError: if the provider prefix is not supported
        '''
        toks = phase_def.split(':', 1)
        if len(toks) == 2:
            provider, phase_def = toks
        else:
            provider, phase_def = 'stored', toks[0]

        if provider == 'stored':
            return self.get_stored_phase(phase_def)

        elif provider == 'vel':
            vel = float(phase_def) * 1000.  # input is scaled by 1000

            def evaluate(args):
                return self.config.get_distance(args) / vel

            return evaluate

        elif provider == 'vel_surface':
            vel = float(phase_def) * 1000.  # input is scaled by 1000

            def evaluate(args):
                return self.config.get_surface_distance(args) / vel

            return evaluate

        elif provider in ('cake', 'iaspei'):
            from pyrocko import cake
            mod = self.config.earthmodel_1d
            if provider == 'cake':
                phases = [cake.PhaseDef(phase_def)]
            else:
                # 'iaspei' uses the classic standard phase definitions.
                phases = cake.PhaseDef.classic(phase_def)

            def evaluate(args):
                # Accept (source_depth, distance), taking the receiver depth
                # from the config, or explicit (receiver_depth, source_depth,
                # distance).
                if len(args) == 2:
                    zr, zs, x = (self.config.receiver_depth,) + args
                elif len(args) == 3:
                    zr, zs, x = args
                else:
                    assert False

                t = []
                if phases:
                    rays = mod.arrivals(
                        phases=phases,
                        distances=[x*cake.m2d],
                        zstart=zs,
                        zstop=zr)

                    for ray in rays:
                        t.append(ray.t)

                # Earliest arrival, or None if no ray connects.
                if t:
                    return min(t)
                else:
                    return None

            return evaluate

        raise StoreError('unsupported phase provider: %s' % provider)
1660 def t(self, timing, *args):
1661 '''
1662 Compute interpolated phase arrivals.
1664 **Examples:**
1666 If ``test_store`` is of :py:class:`~pyrocko.gf.meta.ConfigTypeA`::
1668 test_store.t('p', (1000, 10000))
1669 test_store.t('last{P|Pdiff}', (1000, 10000)) # The latter arrival
1670 # of P or diffracted
1671 # P phase
1673 If ``test_store`` is of :py:class:`~pyrocko.gf.meta.ConfigTypeB`::
1675 test_store.t('S', (1000, 1000, 10000))
1676 test_store.t('first{P|p|Pdiff|sP}', (1000, 1000, 10000)) # The
1677 ` # first arrival of
1678 # the given phases is
1679 # selected
1681 :param timing: Timing string as described above
1682 :type timing: str or :py:class:`~pyrocko.gf.meta.Timing`
1683 :param \\*args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1684 ``(source_depth, distance, component)`` as in
1685 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1686 :type \\*args: tuple
1687 :returns: Phase arrival according to ``timing``
1688 :rtype: float or None
1689 '''
1691 if len(args) == 1:
1692 args = args[0]
1693 else:
1694 args = self.config.make_indexing_args1(*args)
1696 if not isinstance(timing, meta.Timing):
1697 timing = meta.Timing(timing)
1699 return timing.evaluate(self.get_phase, args)
1701 def get_available_interpolation_tables(self):
1702 filenames = glob(op.join(self.store_dir, 'phases', '*'))
1703 return [op.basename(file) for file in filenames]
1705 def get_stored_attribute(self, phase_def, attribute, *args):
1706 '''
1707 Return interpolated store attribute
1709 :param attribute: takeoff_angle / incidence_angle [deg]
1710 :type attribute: str
1711 :param \\*args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1712 ``(source_depth, distance, component)`` as in
1713 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1714 :type \\*args: tuple
1715 '''
1716 try:
1717 return self.get_stored_phase(phase_def, attribute)(*args)
1718 except NoSuchPhase:
1719 raise StoreError(
1720 'Interpolation table for {} of {} does not exist! '
1721 'Available tables: {}'.format(
1722 attribute, phase_def,
1723 self.get_available_interpolation_tables()))
1725 def get_many_stored_attributes(self, phase_def, attribute, coords):
1726 '''
1727 Return interpolated store attribute
1729 :param attribute: takeoff_angle / incidence_angle [deg]
1730 :type attribute: str
1731 :param \\coords: :py:class:`num.array.Array`, with columns being
1732 ``(source_depth, distance, component)`` as in
1733 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1734 :type \\coords: :py:class:`num.array.Array`
1735 '''
1736 try:
1737 return self.get_stored_phase(
1738 phase_def, attribute).interpolate_many(coords)
1739 except NoSuchPhase:
1740 raise StoreError(
1741 'Interpolation table for {} of {} does not exist! '
1742 'Available tables: {}'.format(
1743 attribute, phase_def,
1744 self.get_available_interpolation_tables()))
    def make_stored_table(self, attribute, force=False):
        '''
        Compute tables for selected ray attributes.

        :param attribute: phase / takeoff_angle [deg]/ incidence_angle [deg]
        :type attribute: str
        :param force: recompute and overwrite existing table files
        :type force: bool

        Tables are computed using the 1D earth model defined in
        :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined phase
        in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
        '''

        if attribute not in available_stored_tables:
            raise StoreError(
                'Cannot create attribute table for {}! '
                'Supported attribute tables: {}'.format(
                    attribute, available_stored_tables))

        from pyrocko import cake
        config = self.config

        # Nothing to do when no phases are configured.
        if not config.tabulated_phases:
            return

        mod = config.earthmodel_1d

        if not mod:
            raise StoreError('no earth model found')

        if config.earthmodel_receiver_1d:
            self.check_earthmodels(config)

        for pdef in config.tabulated_phases:

            phase_id = pdef.id
            phases = pdef.phases

            if attribute == 'phase':
                # Travel times are tabulated to half-sample accuracy; other
                # ray attributes use a tighter tolerance.
                ftol = config.deltat * 0.5
                horvels = pdef.horizontal_velocities
            else:
                ftol = config.deltat * 0.01

            fn = self.phase_filename(phase_id, attribute)

            if os.path.exists(fn) and not force:
                logger.info('file already exists: %s' % fn)
                continue

            def evaluate(args):
                # Accept (source_depth, distance), taking the receiver
                # depth from the config, or explicit
                # (receiver_depth, source_depth, distance).
                nargs = len(args)
                if nargs == 2:
                    receiver_depth, source_depth, distance = (
                        config.receiver_depth,) + args
                elif nargs == 3:
                    receiver_depth, source_depth, distance = args
                else:
                    raise ValueError(
                        'Number of input arguments %i is not supported!'
                        'Supported number of arguments: 2 or 3' % nargs)

                ray_attribute_values = []
                arrival_times = []
                if phases:
                    rays = mod.arrivals(
                        phases=phases,
                        distances=[distance * cake.m2d],
                        zstart=source_depth,
                        zstop=receiver_depth)

                    for ray in rays:
                        arrival_times.append(ray.t)
                        if attribute != 'phase':
                            ray_attribute_values.append(
                                getattr(ray, attribute)())

                # For travel times, horizontal velocities define additional
                # straight-line arrivals.
                if attribute == 'phase':
                    for v in horvels:
                        arrival_times.append(distance / (v * km))

                if arrival_times:
                    if attribute == 'phase':
                        return min(arrival_times)
                    else:
                        # Attribute of the earliest arrival.
                        earliest_idx = num.argmin(arrival_times)
                        return ray_attribute_values[earliest_idx]
                else:
                    # No connecting ray: leave a hole in the table.
                    return None

            logger.info(
                'making "%s" table for phasegroup "%s"', attribute, phase_id)

            ip = spit.SPTree(
                f=evaluate,
                ftol=ftol,
                xbounds=num.transpose((config.mins, config.maxs)),
                xtols=config.deltas)

            util.ensuredirs(fn)
            ip.dump(fn)
1848 def make_timing_params(self, begin, end, snap_vred=True, force=False):
1849 '''
1850 Compute tight parameterized time ranges to include given timings.
1852 Calculates appropriate time ranges to cover given begin and end timings
1853 over all GF points in the store. A dict with the following keys is
1854 returned:
1856 * ``'tmin'``: time [s], minimum of begin timing over all GF points
1857 * ``'tmax'``: time [s], maximum of end timing over all GF points
1858 * ``'vred'``, ``'tmin_vred'``: slope [m/s] and offset [s] of reduction
1859 velocity [m/s] appropriate to catch begin timing over all GF points
1860 * ``'tlenmax_vred'``: maximum time length needed to cover all end
1861 timings, when using linear slope given with (``vred``, ``tmin_vred``)
1862 as start
1863 '''
1865 data = []
1866 warned = set()
1867 for args in self.config.iter_nodes(level=-1):
1868 tmin = self.t(begin, args)
1869 tmax = self.t(end, args)
1870 if tmin is None:
1871 warned.add(str(begin))
1872 if tmax is None:
1873 warned.add(str(end))
1875 x = self.config.get_surface_distance(args)
1876 data.append((x, tmin, tmax))
1878 if len(warned):
1879 w = ' | '.join(list(warned))
1880 msg = '''determination of time window failed using phase
1881definitions: %s.\n Travel time table contains holes in probed ranges. You can
1882use `fomosto tttlsd` to fix holes.''' % w
1883 if force:
1884 logger.warning(msg)
1885 else:
1886 raise MakeTimingParamsFailed(msg)
1888 xs, tmins, tmaxs = num.array(data, dtype=float).T
1890 tlens = tmaxs - tmins
1892 i = num.nanargmin(tmins)
1893 if not num.isfinite(i):
1894 raise MakeTimingParamsFailed('determination of time window failed')
1896 tminmin = tmins[i]
1897 x_tminmin = xs[i]
1898 dx = (xs - x_tminmin)
1899 dx = num.where(dx != 0.0, dx, num.nan)
1900 s = (tmins - tminmin) / dx
1901 sred = num.min(num.abs(s[num.isfinite(s)]))
1903 deltax = self.config.distance_delta
1905 if snap_vred:
1906 tdif = sred*deltax
1907 tdif2 = self.config.deltat * math.floor(tdif / self.config.deltat)
1908 sred = tdif2/self.config.distance_delta
1910 tmin_vred = tminmin - sred*x_tminmin
1911 if snap_vred:
1912 xe = x_tminmin - int(x_tminmin / deltax) * deltax
1913 tmin_vred = float(
1914 self.config.deltat *
1915 math.floor(tmin_vred / self.config.deltat) - xe * sred)
1917 tlenmax_vred = num.nanmax(tmax - (tmin_vred + sred*x))
1918 if sred != 0.0:
1919 vred = 1.0/sred
1920 else:
1921 vred = 0.0
1923 return dict(
1924 tmin=tminmin,
1925 tmax=num.nanmax(tmaxs),
1926 tlenmax=num.nanmax(tlens),
1927 tmin_vred=tmin_vred,
1928 tlenmax_vred=tlenmax_vred,
1929 vred=vred)
1931 def make_travel_time_tables(self, force=False):
1932 '''
1933 Compute travel time tables.
1935 Travel time tables are computed using the 1D earth model defined in
1936 :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined phase
1937 in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`. The accuracy of
1938 the tablulated times is adjusted to the sampling rate of the store.
1939 '''
1940 self.make_stored_table(attribute='phase', force=force)
1942 def make_ttt(self, force=False):
1943 self.make_travel_time_tables(force=force)
1945 def make_takeoff_angle_tables(self, force=False):
1946 '''
1947 Compute takeoff-angle tables.
1949 Takeoff-angle tables [deg] are computed using the 1D earth model
1950 defined in :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each
1951 defined phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
1952 The accuracy of the tablulated times is adjusted to 0.01 times the
1953 sampling rate of the store.
1954 '''
1955 self.make_stored_table(attribute='takeoff_angle', force=force)
1957 def make_incidence_angle_tables(self, force=False):
1958 '''
1959 Compute incidence-angle tables.
1961 Incidence-angle tables [deg] are computed using the 1D earth model
1962 defined in :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each
1963 defined phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
1964 The accuracy of the tablulated times is adjusted to 0.01 times the
1965 sampling rate of the store.
1966 '''
1967 self.make_stored_table(attribute='incidence_angle', force=force)
1969 def get_provided_components(self):
1971 scheme_desc = meta.component_scheme_to_description[
1972 self.config.component_scheme]
1974 quantity = self.config.stored_quantity \
1975 or scheme_desc.default_stored_quantity
1977 if not quantity:
1978 return scheme_desc.provided_components
1979 else:
1980 return [
1981 quantity + '.' + comp
1982 for comp in scheme_desc.provided_components]
1984 def fix_ttt_holes(self, phase_id):
1986 pdef = self.config.get_tabulated_phase(phase_id)
1987 mode = None
1988 for phase in pdef.phases:
1989 for leg in phase.legs():
1990 if mode is None:
1991 mode = leg.mode
1993 else:
1994 if mode != leg.mode:
1995 raise StoreError(
1996 'Can only fix holes in pure P or pure S phases.')
1998 sptree = self.get_stored_phase(phase_id)
1999 sptree_lsd = self.config.fix_ttt_holes(sptree, mode)
2001 phase_lsd = phase_id + '.lsd'
2002 fn = self.phase_filename(phase_lsd)
2003 sptree_lsd.dump(fn)
2005 def statics(self, source, multi_location, itsnapshot, components,
2006 interpolation='nearest_neighbor', nthreads=0):
2007 if not self._f_index:
2008 self.open()
2010 out = {}
2011 ntargets = multi_location.ntargets
2012 source_terms = source.get_source_terms(self.config.component_scheme)
2013 # TODO: deal with delays for snapshots > 1 sample
2015 if itsnapshot is not None:
2016 delays = source.times
2018 # Fringe case where we sample at sample 0 and sample 1
2019 tsnapshot = itsnapshot * self.config.deltat
2020 if delays.max() == tsnapshot and delays.min() != tsnapshot:
2021 delays[delays == delays.max()] -= self.config.deltat
2023 else:
2024 delays = source.times * 0
2025 itsnapshot = 1
2027 if ntargets == 0:
2028 raise StoreError('MultiLocation.coords5 is empty')
2030 res = store_ext.store_calc_static(
2031 self.cstore,
2032 source.coords5(),
2033 source_terms,
2034 delays,
2035 multi_location.coords5,
2036 self.config.component_scheme,
2037 interpolation,
2038 itsnapshot,
2039 nthreads)
2041 out = {}
2042 for icomp, (comp, comp_res) in enumerate(
2043 zip(self.get_provided_components(), res)):
2044 if comp not in components:
2045 continue
2046 out[comp] = res[icomp]
2048 return out
2050 def calc_seismograms(self, source, receivers, components, deltat=None,
2051 itmin=None, nsamples=None,
2052 interpolation='nearest_neighbor',
2053 optimization='enable', nthreads=1):
2054 config = self.config
2056 assert interpolation in ['nearest_neighbor', 'multilinear'], \
2057 'Unknown interpolation: %s' % interpolation
2059 if not isinstance(receivers, list):
2060 receivers = [receivers]
2062 if deltat is None:
2063 decimate = 1
2064 else:
2065 decimate = int(round(deltat/config.deltat))
2066 if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
2067 raise StoreError(
2068 'unavailable decimation ratio target.deltat / store.deltat'
2069 ' = %g / %g' % (deltat, config.deltat))
2071 store, decimate_ = self._decimated_store(decimate)
2073 if not store._f_index:
2074 store.open()
2076 scheme = config.component_scheme
2078 source_coords_arr = source.coords5()
2079 source_terms = source.get_source_terms(scheme)
2080 delays = source.times.ravel()
2082 nreceiver = len(receivers)
2083 receiver_coords_arr = num.empty((nreceiver, 5))
2084 for irec, rec in enumerate(receivers):
2085 receiver_coords_arr[irec, :] = rec.coords5
2087 dt = self.get_deltat()
2089 itoffset = int(num.floor(delays.min()/dt)) if delays.size != 0 else 0
2091 if itmin is None:
2092 itmin = num.zeros(nreceiver, dtype=num.int32)
2093 else:
2094 itmin = (itmin-itoffset).astype(num.int32)
2096 if nsamples is None:
2097 nsamples = num.zeros(nreceiver, dtype=num.int32) - 1
2098 else:
2099 nsamples = nsamples.astype(num.int32)
2101 try:
2102 results = store_ext.store_calc_timeseries(
2103 store.cstore,
2104 source_coords_arr,
2105 source_terms,
2106 (delays - itoffset*dt),
2107 receiver_coords_arr,
2108 scheme,
2109 interpolation,
2110 itmin,
2111 nsamples,
2112 nthreads)
2113 except Exception as e:
2114 raise e
2116 provided_components = self.get_provided_components()
2117 ncomponents = len(provided_components)
2119 seismograms = [dict() for _ in range(nreceiver)]
2120 for ires in range(len(results)):
2121 res = results.pop(0)
2122 ireceiver = ires // ncomponents
2124 comp = provided_components[res[-2]]
2126 if comp not in components:
2127 continue
2129 tr = GFTrace(*res[:-2])
2130 tr.deltat = config.deltat * decimate
2131 tr.itmin += itoffset
2133 tr.n_records_stacked = 0
2134 tr.t_optimize = 0.
2135 tr.t_stack = 0.
2136 tr.err = res[-1]
2138 seismograms[ireceiver][comp] = tr
2140 return seismograms
    def seismogram(self, source, receiver, components, deltat=None,
                   itmin=None, nsamples=None,
                   interpolation='nearest_neighbor',
                   optimization='enable', nthreads=1):
        '''
        Compute seismogram components for a single source-receiver pair.

        :param source: source model providing ``coords5()``,
            ``get_source_terms()`` and ``times``
        :param receiver: receiver providing ``coords5``
        :param components: iterable of component names to compute
        :param deltat: desired sampling interval [s]; ``None`` uses the
            store's native interval
        :param itmin: start time index, or ``None`` for the full trace
        :param nsamples: number of samples, or ``None`` for the full trace
        :param interpolation: interpolation method passed to the extension
        :param optimization: optimization mode passed to the summation
        :param nthreads: number of threads for the summation setup
        :returns: dict mapping component name to stacked GF trace
        :raises: :py:exc:`~pyrocko.gf.meta.OutOfBounds` if the extension
            fails to build summation parameters for this geometry
        '''

        config = self.config

        if deltat is None:
            decimate = 1
        else:
            # Only integer decimation ratios of the store's sampling are
            # supported (within 0.1% tolerance).
            decimate = int(round(deltat/config.deltat))
            if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
                raise StoreError(
                    'unavailable decimation ratio target.deltat / store.deltat'
                    ' = %g / %g' % (deltat, config.deltat))

        store, decimate_ = self._decimated_store(decimate)

        if not store._f_index:
            store.open()

        scheme = config.component_scheme

        source_coords_arr = source.coords5()
        source_terms = source.get_source_terms(scheme)
        receiver_coords_arr = receiver.coords5[num.newaxis, :].copy()

        try:
            params = store_ext.make_sum_params(
                store.cstore,
                source_coords_arr,
                source_terms,
                receiver_coords_arr,
                scheme,
                interpolation, nthreads)

        except store_ext.StoreExtError:
            raise meta.OutOfBounds()

        provided_components = self.get_provided_components()

        out = {}
        for icomp, comp in enumerate(provided_components):
            if comp in components:
                weights, irecords = params[icomp]

                # Each source point contributes `neach` records.
                neach = irecords.size // source.times.size
                delays = num.repeat(source.times, neach)

                tr = store._sum(
                    irecords, delays, weights, itmin, nsamples, decimate_,
                    'c', optimization)

                # to prevent problems with rounding errors (BaseStore saves
                # deltat as a 4-byte floating point value, value from YAML
                # config is more accurate)
                tr.deltat = config.deltat * decimate

                out[comp] = tr

        return out
# Public API of this module.
__all__ = '''
gf_dtype
NotMultipleOfSamplingInterval
GFTrace
CannotCreate
CannotOpen
DuplicateInsert
NotAllowedToInterpolate
NoSuchExtra
NoSuchPhase
BaseStore
Store
SeismosizerErrorEnum
'''.split()