1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import errno
7import time
8import os
9import struct
10import math
11import shutil
12try:
13 import fcntl
14except ImportError:
15 fcntl = None
16import copy
17import logging
18import re
19import hashlib
20from glob import glob
22import numpy as num
23from scipy import signal
25from . import meta
26from .error import StoreError
27try:
28 from . import store_ext
29except ImportError:
30 store_ext = None
31from pyrocko import util, spit
33logger = logging.getLogger('pyrocko.gf.store')
34op = os.path
# gf store endianness (little-endian on disk, independent of host byte order)
E = '<'

# in-memory dtype for GF samples; on-disk dtype is the explicit little-endian
# variant below
gf_dtype = num.dtype(num.float32)
gf_dtype_store = num.dtype(E + 'f4')

# bytes per stored sample (float32)
gf_dtype_nbytes_per_sample = 4

# layout of the index file header (must agree with gf_store_header_fmt)
gf_store_header_dtype = [
    ('nrecords', E + 'u8'),
    ('deltat', E + 'f4'),
]

gf_store_header_fmt = E + 'Qf'
gf_store_header_fmt_size = struct.calcsize(gf_store_header_fmt)

# per-trace index record layout; see the commented field documentation below
gf_record_dtype = num.dtype([
    ('data_offset', E + 'u8'),
    ('itmin', E + 'i4'),
    ('nsamples', E + 'u4'),
    ('begin_value', E + 'f4'),
    ('end_value', E + 'f4'),
])

available_stored_tables = ['phase', 'takeoff_angle', 'incidence_angle']

# meters per kilometer
km = 1000.
class SeismosizerErrorEnum:
    # Error codes reported by the C extension (store_ext); values must stay
    # in sync with the C side — do not renumber.
    SUCCESS = 0
    INVALID_RECORD = 1
    EMPTY_RECORD = 2
    BAD_RECORD = 3
    ALLOC_FAILED = 4
    BAD_REQUEST = 5
    BAD_DATA_OFFSET = 6
    READ_DATA_FAILED = 7
    SEEK_INDEX_FAILED = 8
    READ_INDEX_FAILED = 9
    FSTAT_TRACES_FAILED = 10
    BAD_STORE = 11
    MMAP_INDEX_FAILED = 12
    MMAP_TRACES_FAILED = 13
    INDEX_OUT_OF_BOUNDS = 14
    NTARGETS_OUT_OF_BOUNDS = 15
def valid_string_id(s):
    '''
    Match ``s`` against the pattern for valid string IDs.

    Returns the match object (truthy) on success, ``None`` otherwise.
    '''
    pattern = meta.StringID.pattern
    return re.match(pattern, s)
def check_string_id(s):
    '''
    Raise :py:exc:`ValueError` if ``s`` is not a valid string ID.
    '''
    if valid_string_id(s):
        return

    raise ValueError('invalid name %s' % s)
92# - data_offset
93#
94# Data file offset of first sample in bytes (the seek value).
95# Special values:
96#
97# 0x00 - missing trace
98# 0x01 - zero trace (GF trace is physically zero)
99# 0x02 - short trace of 1 or 2 samples (no need for data allocation)
100#
101# - itmin
102#
103# Time of first sample of the trace as a multiple of the sampling interval. All
104# GF samples must be evaluated at multiples of the sampling interval with
105# respect to a global reference time zero. Must be set also for zero traces.
106#
107# - nsamples
108#
109# Number of samples in the GF trace. Should be set to zero for zero traces.
110#
111# - begin_value, end_value
112#
113# Values of first and last sample. These values are included in data[]
# redundantly.
class NotMultipleOfSamplingInterval(Exception):
    '''
    Raised when a given time is not a multiple of the sampling interval.
    '''
# relative tolerance (fraction of deltat) when checking that a given tmin
# falls on the sampling grid
sampling_check_eps = 1e-5
class GFTrace(object):
    '''
    Green's Function trace class for handling traces from the GF store.

    A trace is a short snippet of evenly sampled data with a start time
    given as a multiple of the sampling interval (``itmin``). Traces are
    treated as having repeating end points, so ``begin_value`` and
    ``end_value`` describe the function outside the sampled span.
    '''

    @classmethod
    def from_trace(cls, tr):
        '''
        Create a :py:class:`GFTrace` from a Pyrocko trace-like object.

        ``tr`` must provide ``ydata``, ``tmin`` and ``deltat``; the sample
        data is copied.
        '''
        return cls(data=tr.ydata.copy(), tmin=tr.tmin, deltat=tr.deltat)

    def __init__(self, data=None, itmin=None, deltat=1.0,
                 is_zero=False, begin_value=None, end_value=None, tmin=None):

        # Fix: the original had a stray line-continuation backslash after
        # the assert message which joined this statement into the following
        # ``if``, producing a SyntaxError.
        assert sum((x is None) for x in (tmin, itmin)) == 1, \
            'GFTrace: either tmin or itmin must be given'

        if tmin is not None:
            # snap tmin onto the sampling grid; reject off-grid times
            itmin = int(round(tmin / deltat))
            if abs(itmin*deltat - tmin) > sampling_check_eps*deltat:
                raise NotMultipleOfSamplingInterval(
                    'GFTrace: tmin (%g) is not a multiple of sampling '
                    'interval (%g)' % (tmin, deltat))

        if data is not None:
            data = num.asarray(data, dtype=gf_dtype)
        else:
            data = num.array([], dtype=gf_dtype)
            begin_value = 0.0
            end_value = 0.0

        self.data = data
        self.itmin = itmin
        self.deltat = deltat
        self.is_zero = is_zero
        # stacking statistics, filled in by the store's sum() machinery
        self.n_records_stacked = 0.
        self.t_stack = 0.
        self.t_optimize = 0.
        self.err = SeismosizerErrorEnum.SUCCESS

        # default boundary values to the actual first/last samples
        if data is not None and data.size > 0:
            if begin_value is None:
                begin_value = data[0]

            if end_value is None:
                end_value = data[-1]

        self.begin_value = begin_value
        self.end_value = end_value

    @property
    def t(self):
        '''
        Time vector of the GF trace.

        :returns: Time vector
        :rtype: :py:class:`numpy.ndarray`
        '''
        return num.linspace(
            self.itmin*self.deltat,
            (self.itmin+self.data.size-1)*self.deltat, self.data.size)

    def __str__(self, itmin=0):
        '''
        Render samples as a ``|``-separated column string, left-padded so
        that column positions line up for traces starting at ``itmin``.
        '''
        if self.is_zero:
            return 'ZERO'

        s = []
        for i in range(itmin, self.itmin + self.data.size):
            if self.itmin <= i < self.itmin + self.data.size:
                s.append('%7.4g' % self.data[i-self.itmin])
            else:
                s.append(' '*7)

        return '|'.join(s)

    def to_trace(self, net, sta, loc, cha):
        '''
        Convert to a :py:class:`pyrocko.trace.Trace` with the given
        network, station, location and channel codes.
        '''
        from pyrocko import trace
        return trace.Trace(
            net, sta, loc, cha,
            ydata=self.data,
            deltat=self.deltat,
            tmin=self.itmin*self.deltat)
class GFValue(object):
    '''
    Scalar Green's function value with stacking statistics.

    Carries the same bookkeeping counters as :py:class:`GFTrace`.
    '''

    def __init__(self, value):
        self.value = value
        # stacking statistics start at zero, mirroring GFTrace
        for counter in ('n_records_stacked', 't_stack', 't_optimize'):
            setattr(self, counter, 0.)
# shared singleton representing a zero GF trace
Zero = GFTrace(is_zero=True, itmin=0)
def make_same_span(tracesdict):
    '''
    Extend all traces in ``tracesdict`` to a common time span.

    Traces are padded with their repeating begin/end values; zero traces
    become all-zero traces of the common length. Returns a new dict with
    the same keys.
    '''
    traces = tracesdict.values()

    nonzero = [tr for tr in traces if not tr.is_zero]
    if not nonzero:
        return {k: Zero for k in tracesdict.keys()}

    deltat = nonzero[0].deltat
    itmin = min(tr.itmin for tr in nonzero)
    itmax = max(tr.itmin + tr.data.size for tr in nonzero) - 1
    nsamples = itmax - itmin + 1

    out = {}
    for key, tr in tracesdict.items():
        if tr.itmin == itmin and tr.data.size == nsamples:
            # already spans the full window; keep as-is
            out[key] = tr
            continue

        padded = num.zeros(nsamples, dtype=gf_dtype)
        if not tr.is_zero:
            ilo = tr.itmin - itmin
            ihi = ilo + tr.data.size
            # pad with the repeating boundary samples
            padded[:ilo] = tr.data[0]
            padded[ilo:ihi] = tr.data
            padded[ihi:] = tr.data[-1]

        out[key] = GFTrace(padded, itmin, deltat)

    return out
class CannotCreate(StoreError):
    '''
    Raised when a GF store or one of its files cannot be created.
    '''
class CannotOpen(StoreError):
    '''
    Raised when a GF store cannot be opened.
    '''
class DuplicateInsert(StoreError):
    '''
    Raised when inserting a trace into a record slot already occupied.
    '''
class ShortRead(StoreError):
    '''
    Raised when fewer bytes than expected could be read from the traces
    file.
    '''

    def __str__(self):
        message = 'unexpected end of data (truncated traces file?)'
        return message
class NotAllowedToInterpolate(StoreError):
    '''
    Raised when interpolation would be required but is not permitted.
    '''

    def __str__(self):
        message = 'not allowed to interpolate'
        return message
class NoSuchExtra(StoreError):
    '''
    Raised when no extra information is stored under a requested key.
    '''

    def __str__(self):
        return 'extra information for key "%s" not found.' % self.value

    def __init__(self, s):
        StoreError.__init__(self)
        self.value = s
class NoSuchPhase(StoreError):
    '''
    Raised when no stored phase is available for a requested key.
    '''

    def __str__(self):
        return 'phase for key "%s" not found. ' \
            'Running "fomosto ttt" or "fomosto sat" may be needed.' \
            % self.value

    def __init__(self, s):
        StoreError.__init__(self)
        self.value = s
def remove_if_exists(fn, force=False):
    '''
    Remove file ``fn`` if it exists and ``force`` is set.

    Raises :py:exc:`CannotCreate` when the file exists and ``force`` is
    not set; does nothing when the file does not exist.
    '''
    if not os.path.exists(fn):
        return

    if not force:
        raise CannotCreate('file %s already exists' % fn)

    os.remove(fn)
def get_extra_path(store_dir, key):
    '''
    Path of the extra-information file for ``key`` inside ``store_dir``.

    Raises :py:exc:`ValueError` when ``key`` is not a valid string ID.
    '''
    check_string_id(key)
    path = os.path.join(store_dir, 'extra', key)
    return path
class BaseStore(object):
    '''
    Low-level storage and summation backend for Green's function traces.

    A store on disk consists of an ``index`` file (header: number of
    records and sampling interval, followed by one fixed-size record per
    trace) and a ``traces`` file holding the raw float32 sample data.
    Special ``data_offset`` values in the index (0: missing, 1: zero,
    2: short trace) are documented in the module-level comment above.
    Retrieval/summation is normally delegated to the ``store_ext`` C
    extension; pure-Python reference implementations are kept for
    verification.
    '''

    @staticmethod
    def lock_fn_(store_dir):
        # path of the fallback lock file (used where fcntl is unavailable)
        return os.path.join(store_dir, 'lock')

    @staticmethod
    def index_fn_(store_dir):
        return os.path.join(store_dir, 'index')

    @staticmethod
    def data_fn_(store_dir):
        return os.path.join(store_dir, 'traces')

    @staticmethod
    def config_fn_(store_dir):
        return os.path.join(store_dir, 'config')

    @staticmethod
    def create(store_dir, deltat, nrecords, force=False):
        '''
        Create empty index and traces files for a new store.

        Existing files are only overwritten when ``force`` is set,
        otherwise :py:exc:`CannotCreate` is raised.
        '''
        try:
            util.ensuredir(store_dir)
        except Exception:
            raise CannotCreate('cannot create directory %s' % store_dir)

        index_fn = BaseStore.index_fn_(store_dir)
        data_fn = BaseStore.data_fn_(store_dir)

        for fn in (index_fn, data_fn):
            remove_if_exists(fn, force)

        with open(index_fn, 'wb') as f:
            f.write(struct.pack(gf_store_header_fmt, nrecords, deltat))
            records = num.zeros(nrecords, dtype=gf_record_dtype)
            records.tofile(f)

        with open(data_fn, 'wb') as f:
            # reserve the low offsets; offsets 0-2 are special markers, so
            # real data must never start at the beginning of the file
            f.write(b'\0' * 32)

    def __init__(self, store_dir, mode='r', use_memmap=True):
        # mode 'r': read-only, 'w': writeable; '' means closed
        assert mode in 'rw'
        self.store_dir = store_dir
        self.mode = mode
        self._use_memmap = use_memmap
        self._nrecords = None
        self._deltat = None
        self._f_index = None
        self._f_data = None
        self._records = None
        self.cstore = None

    def open(self):
        '''
        Open index and traces files and initialize the C-extension store.

        Called lazily by most accessors; must not be called twice.
        '''
        assert self._f_index is None
        index_fn = self.index_fn()
        data_fn = self.data_fn()

        if self.mode == 'r':
            fmode = 'rb'
        elif self.mode == 'w':
            fmode = 'r+b'
        else:
            assert False, 'invalid mode: %s' % self.mode

        try:
            self._f_index = open(index_fn, fmode)
            self._f_data = open(data_fn, fmode)
        except Exception:
            self.mode = ''
            raise CannotOpen('cannot open gf store: %s' % self.store_dir)

        try:
            # replace single precision deltat value in store with the double
            # precision value from the config, if available
            self.cstore = store_ext.store_init(
                self._f_index.fileno(), self._f_data.fileno(),
                self.get_deltat() or 0.0)

        except store_ext.StoreExtError as e:
            raise StoreError(str(e))

        while True:
            try:
                dataheader = self._f_index.read(gf_store_header_fmt_size)
                break

            except IOError as e:
                # occasionally got this one on an NFS volume
                if e.errno == errno.EBUSY:
                    time.sleep(0.01)
                else:
                    raise

        nrecords, deltat = struct.unpack(gf_store_header_fmt, dataheader)
        self._nrecords = nrecords
        self._deltat = deltat

        self._load_index()

    def __del__(self):
        if self.mode != '':
            self.close()

    def lock(self):
        '''
        Acquire an exclusive lock on the store.

        Uses ``fcntl.lockf`` on the index file where available, otherwise
        falls back to a lock file in the store directory.
        '''
        if not fcntl:
            lock_fn = self.lock_fn()
            util.create_lockfile(lock_fn)
        else:
            if not self._f_index:
                self.open()

            while True:
                try:
                    fcntl.lockf(self._f_index, fcntl.LOCK_EX)
                    break

                except IOError as e:
                    # ENOLCK: lock table exhausted (e.g. on NFS); retry
                    if e.errno == errno.ENOLCK:
                        time.sleep(0.01)
                    else:
                        raise

    def unlock(self):
        '''
        Release the lock acquired with :py:meth:`lock`, flushing buffers
        first so other processes see a consistent state.
        '''
        if not fcntl:
            lock_fn = self.lock_fn()
            util.delete_lockfile(lock_fn)
        else:
            self._f_data.flush()
            self._f_index.flush()
            fcntl.lockf(self._f_index, fcntl.LOCK_UN)

    def put(self, irecord, trace):
        # public wrapper around _put (subclasses map high-level indices)
        self._put(irecord, trace)

    def get_record(self, irecord):
        return self._get_record(irecord)

    def get_span(self, irecord, decimate=1):
        return self._get_span(irecord, decimate=decimate)

    def get(self, irecord, itmin=None, nsamples=None, decimate=1,
            implementation='c'):
        return self._get(irecord, itmin, nsamples, decimate, implementation)

    def sum(self, irecords, delays, weights, itmin=None,
            nsamples=None, decimate=1, implementation='c',
            optimization='enable'):
        return self._sum(irecords, delays, weights, itmin, nsamples, decimate,
                         implementation, optimization)

    def irecord_format(self):
        # zero-padded integer format wide enough for all record numbers
        return util.zfmt(self._nrecords)

    def str_irecord(self, irecord):
        return self.irecord_format() % irecord

    def close(self):
        '''
        Flush the index (write mode), close files and mark store closed.
        '''
        if self.mode == 'w':
            if not self._f_index:
                self.open()
            self._save_index()

        if self._f_data:
            self._f_data.close()
            self._f_data = None

        if self._f_index:
            self._f_index.close()
            self._f_index = None

        # drop the (possibly memmapped) records array before reuse
        del self._records
        self._records = None

        self.mode = ''

    def _get_record(self, irecord):
        if not self._f_index:
            self.open()

        return self._records[irecord]

    def _get(self, irecord, itmin, nsamples, decimate, implementation):
        '''
        Retrieve complete GF trace from storage.
        '''

        if not self._f_index:
            self.open()

        if not self.mode == 'r':
            raise StoreError('store not open in read mode')

        # fast path: C extension handles the undecimated case
        if implementation == 'c' and decimate == 1:

            if nsamples is None:
                nsamples = -1

            if itmin is None:
                itmin = 0

            try:
                return GFTrace(*store_ext.store_get(
                    self.cstore, int(irecord), int(itmin), int(nsamples)))
            except store_ext.StoreExtError as e:
                raise StoreError(str(e))

        else:
            return self._get_impl_reference(irecord, itmin, nsamples, decimate)

    def get_deltat(self):
        return self._deltat

    def _get_impl_reference(self, irecord, itmin, nsamples, decimate):
        '''
        Pure-Python trace retrieval, with optional decimation.

        ``itmin``/``nsamples`` are given in decimated samples; returns
        ``None`` for missing traces and a zero GFTrace for zero traces.
        '''
        deltat = self.get_deltat()

        if not (0 <= irecord < self._nrecords):
            raise StoreError('invalid record number requested '
                             '(irecord = %i, nrecords = %i)' %
                             (irecord, self._nrecords))

        ipos, itmin_data, nsamples_data, begin_value, end_value = \
            self._records[irecord]

        if None in (itmin, nsamples):
            itmin = itmin_data
            itmax = itmin_data + nsamples_data - 1
            nsamples = nsamples_data
        else:
            # convert requested window to undecimated sample indices
            itmin = itmin * decimate
            nsamples = nsamples * decimate
            itmax = itmin + nsamples - decimate

        if ipos == 0:
            # special data_offset: missing trace
            return None

        elif ipos == 1:
            # special data_offset: physically zero trace
            itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
            return GFTrace(is_zero=True, itmin=itmin_ext//decimate)

        if decimate == 1:
            ilo = max(itmin, itmin_data) - itmin_data
            ihi = min(itmin+nsamples, itmin_data+nsamples_data) - itmin_data
            data = self._get_data(ipos, begin_value, end_value, ilo, ihi)

            return GFTrace(data, itmin=itmin_data+ilo, deltat=deltat,
                           begin_value=begin_value, end_value=end_value)

        else:
            itmax_data = itmin_data + nsamples_data - 1

            # put begin and end to multiples of new sampling rate
            itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
            itmax_ext = -((-min(itmax, itmax_data))//decimate) * decimate
            nsamples_ext = itmax_ext - itmin_ext + 1

            # add some padding for the aa filter
            order = 30
            itmin_ext_pad = itmin_ext - order//2
            itmax_ext_pad = itmax_ext + order//2
            nsamples_ext_pad = itmax_ext_pad - itmin_ext_pad + 1

            itmin_overlap = max(itmin_data, itmin_ext_pad)
            itmax_overlap = min(itmax_data, itmax_ext_pad)

            ilo = itmin_overlap - itmin_ext_pad
            ihi = max(ilo, itmax_overlap - itmin_ext_pad + 1)
            ilo_data = itmin_overlap - itmin_data
            ihi_data = max(ilo_data, itmax_overlap - itmin_data + 1)

            data_ext_pad = num.empty(nsamples_ext_pad, dtype=gf_dtype)
            data_ext_pad[ilo:ihi] = self._get_data(
                ipos, begin_value, end_value, ilo_data, ihi_data)

            # extend with the repeating boundary values
            data_ext_pad[:ilo] = begin_value
            data_ext_pad[ihi:] = end_value

            # FIR anti-alias lowpass before subsampling
            b = signal.firwin(order + 1, 1. / decimate, window='hamming')
            a = 1.
            data_filt_pad = signal.lfilter(b, a, data_ext_pad)
            data_deci = data_filt_pad[order:order+nsamples_ext:decimate]
            if data_deci.size >= 1:
                # pin boundary samples to the exact stored boundary values
                if itmin_ext <= itmin_data:
                    data_deci[0] = begin_value

                if itmax_ext >= itmax_data:
                    data_deci[-1] = end_value

            return GFTrace(data_deci, itmin_ext//decimate,
                           deltat*decimate,
                           begin_value=begin_value, end_value=end_value)

    def _get_span(self, irecord, decimate=1):
        '''
        Get temporal extent of GF trace at given index.
        '''

        if not self._f_index:
            self.open()

        assert 0 <= irecord < self._nrecords, \
            'irecord = %i, nrecords = %i' % (irecord, self._nrecords)

        (_, itmin, nsamples, _, _) = self._records[irecord]

        itmax = itmin + nsamples - 1

        if decimate == 1:
            return itmin, itmax
        else:
            if nsamples == 0:
                return itmin//decimate, itmin//decimate - 1
            else:
                return itmin//decimate, -((-itmax)//decimate)

    def _put(self, irecord, trace):
        '''
        Save GF trace to storage.
        '''

        if not self._f_index:
            self.open()

        deltat = self.get_deltat()

        assert self.mode == 'w'
        assert trace.is_zero or \
            abs(trace.deltat - deltat) < 1e-7 * deltat
        assert 0 <= irecord < self._nrecords, \
            'irecord = %i, nrecords = %i' % (irecord, self._nrecords)

        if self._records[irecord][0] != 0:
            raise DuplicateInsert('record %i already in store' % irecord)

        if trace.is_zero or num.all(trace.data == 0.0):
            # special data_offset 1: zero trace, no data stored
            self._records[irecord] = (1, trace.itmin, 0, 0., 0.)
            return

        ndata = trace.data.size

        if ndata > 2:
            # append samples at the end of the traces file
            self._f_data.seek(0, 2)
            ipos = self._f_data.tell()
            trace.data.astype(gf_dtype_store).tofile(self._f_data)
        else:
            # special data_offset 2: 1-2 samples fit in the index record
            # itself (begin_value/end_value), no data file allocation
            ipos = 2

        self._records[irecord] = (ipos, trace.itmin, ndata,
                                  trace.data[0], trace.data[-1])

    def _sum_impl_alternative(self, irecords, delays, weights, itmin, nsamples,
                              decimate):

        '''
        Sum delayed and weighted GF traces.
        '''

        if not self._f_index:
            self.open()

        assert self.mode == 'r'

        deltat = self.get_deltat() * decimate

        if len(irecords) == 0:
            if None in (itmin, nsamples):
                return Zero
            else:
                return GFTrace(
                    num.zeros(nsamples, dtype=gf_dtype), itmin,
                    deltat, is_zero=True)

        assert len(irecords) == len(delays)
        assert len(irecords) == len(weights)

        if None in (itmin, nsamples):
            # derive the output window from the delayed spans of all traces
            itmin_delayed, itmax_delayed = [], []
            for irecord, delay in zip(irecords, delays):
                itmin, itmax = self._get_span(irecord, decimate=decimate)
                itmin_delayed.append(itmin + delay/deltat)
                itmax_delayed.append(itmax + delay/deltat)

            itmin = int(math.floor(min(itmin_delayed)))
            nsamples = int(math.ceil(max(itmax_delayed))) - itmin + 1

        out = num.zeros(nsamples, dtype=gf_dtype)
        if nsamples == 0:
            return GFTrace(out, itmin, deltat)

        for ii in range(len(irecords)):
            irecord = irecords[ii]
            delay = delays[ii]
            weight = weights[ii]

            if weight == 0.0:
                continue

            # non-integer delays are applied by linear interpolation
            # between the floor- and ceil-shifted copies of the trace
            idelay_floor = int(math.floor(delay/deltat))
            idelay_ceil = int(math.ceil(delay/deltat))

            gf_trace = self._get(
                irecord,
                itmin - idelay_ceil,
                nsamples + idelay_ceil - idelay_floor,
                decimate,
                'reference')

            assert gf_trace.itmin >= itmin - idelay_ceil
            assert gf_trace.data.size <= nsamples + idelay_ceil - idelay_floor

            if gf_trace.is_zero:
                continue

            ilo = gf_trace.itmin - itmin + idelay_floor
            ihi = ilo + gf_trace.data.size + (idelay_ceil-idelay_floor)

            data = gf_trace.data

            if idelay_floor == idelay_ceil:
                out[ilo:ihi] += data * weight
            else:
                if data.size:
                    # k toggles inclusion of the boundary sample depending
                    # on whether it falls inside the output window
                    k = 1
                    if ihi <= nsamples:
                        out[ihi-1] += gf_trace.end_value * \
                            ((idelay_ceil-delay/deltat) * weight)
                        k = 0

                    out[ilo+1:ihi-k] += data[:data.size-k] * \
                        ((delay/deltat-idelay_floor) * weight)
                    k = 1
                    if ilo >= 0:
                        out[ilo] += gf_trace.begin_value * \
                            ((delay/deltat-idelay_floor) * weight)
                        k = 0

                    out[ilo+k:ihi-1] += data[k:] * \
                        ((idelay_ceil-delay/deltat) * weight)

            # extend with the repeating boundary values outside the trace
            if ilo > 0 and gf_trace.begin_value != 0.0:
                out[:ilo] += gf_trace.begin_value * weight

            if ihi < nsamples and gf_trace.end_value != 0.0:
                out[ihi:] += gf_trace.end_value * weight

        return GFTrace(out, itmin, deltat)

    def _sum_impl_reference(self, irecords, delays, weights, itmin, nsamples,
                            decimate):
        '''
        Reference implementation of delay-and-sum stacking.

        Builds one padded row per (possibly split) trace and sums them;
        simple but memory-hungry, used for verification.
        '''
        if not self._f_index:
            self.open()

        deltat = self.get_deltat() * decimate

        if len(irecords) == 0:
            if None in (itmin, nsamples):
                return Zero
            else:
                return GFTrace(
                    num.zeros(nsamples, dtype=gf_dtype), itmin,
                    deltat, is_zero=True)

        datas = []
        itmins = []
        for i, delay, weight in zip(irecords, delays, weights):
            tr = self._get(i, None, None, decimate, 'reference')

            idelay_floor = int(math.floor(delay/deltat))
            idelay_ceil = int(math.ceil(delay/deltat))

            if idelay_floor == idelay_ceil:
                itmins.append(tr.itmin + idelay_floor)
                datas.append(tr.data.copy()*weight)
            else:
                # split into two integer-delayed copies with interpolation
                # weights
                itmins.append(tr.itmin + idelay_floor)
                datas.append(tr.data.copy()*weight*(idelay_ceil-delay/deltat))
                itmins.append(tr.itmin + idelay_ceil)
                datas.append(tr.data.copy()*weight*(delay/deltat-idelay_floor))

        itmin_all = min(itmins)

        itmax_all = max(itmin_ + data.size for (itmin_, data) in
                        zip(itmins, datas))

        if itmin is not None:
            itmin_all = min(itmin_all, itmin)
        if nsamples is not None:
            itmax_all = max(itmax_all, itmin+nsamples)

        nsamples_all = itmax_all - itmin_all

        arr = num.zeros((len(datas), nsamples_all), dtype=gf_dtype)
        for i, itmin_, data in zip(num.arange(len(datas)), itmins, datas):
            if data.size > 0:
                ilo = itmin_-itmin_all
                ihi = ilo + data.size
                # pad rows with the repeating boundary samples
                arr[i, :ilo] = data[0]
                arr[i, ilo:ihi] = data
                arr[i, ihi:] = data[-1]

        sum_arr = arr.sum(axis=0)

        if itmin is not None and nsamples is not None:
            ilo = itmin-itmin_all
            ihi = ilo + nsamples
            sum_arr = sum_arr[ilo:ihi]

        else:
            itmin = itmin_all

        return GFTrace(sum_arr, itmin, deltat)

    def _optimize(self, irecords, delays, weights):
        '''
        Merge contributions hitting the same record.

        Splits each contribution into floor/ceil integer-delay parts, then
        combines duplicates of (irecord, delay) by summing their weights.
        Returns the inputs unchanged when all records are distinct.
        '''
        if num.unique(irecords).size == irecords.size:
            return irecords, delays, weights

        deltat = self.get_deltat()

        delays = delays / deltat
        irecords2 = num.repeat(irecords, 2)
        delays2 = num.empty(irecords2.size, dtype=float)
        delays2[0::2] = num.floor(delays)
        delays2[1::2] = num.ceil(delays)
        weights2 = num.repeat(weights, 2)
        weights2[0::2] *= 1.0 - (delays - delays2[0::2])
        weights2[1::2] *= (1.0 - (delays2[1::2] - delays)) * \
                          (delays2[1::2] - delays2[0::2])

        delays2 *= deltat

        iorder = num.lexsort((delays2, irecords2))

        irecords2 = irecords2[iorder]
        delays2 = delays2[iorder]
        weights2 = weights2[iorder]

        # ui marks the first entry of each distinct (irecord, delay) group
        ui = num.empty(irecords2.size, dtype=bool)
        ui[1:] = num.logical_or(num.diff(irecords2) != 0,
                                num.diff(delays2) != 0.)

        ui[0] = 0
        ind2 = num.cumsum(ui)
        ui[0] = 1
        ind1 = num.where(ui)[0]

        irecords3 = irecords2[ind1]
        delays3 = delays2[ind1]
        weights3 = num.bincount(ind2, weights2)

        return irecords3, delays3, weights3

    def _sum(self, irecords, delays, weights, itmin, nsamples, decimate,
             implementation, optimization):
        '''
        Dispatch delay-and-sum stacking to the chosen implementation and
        record timing statistics on the result.
        '''
        if not self._f_index:
            self.open()

        t0 = time.time()
        if optimization == 'enable':
            irecords, delays, weights = self._optimize(
                irecords, delays, weights)
        else:
            assert optimization == 'disable'

        t1 = time.time()
        deltat = self.get_deltat()

        if implementation == 'c' and decimate == 1:
            # shift delays so the C extension works with small itmin values
            if delays.size != 0:
                itoffset = int(num.floor(num.min(delays)/deltat))
            else:
                itoffset = 0

            if nsamples is None:
                nsamples = -1

            if itmin is None:
                itmin = 0
            else:
                itmin -= itoffset

            try:
                tr = GFTrace(*store_ext.store_sum(
                    self.cstore, irecords.astype(num.uint64),
                    delays - itoffset*deltat,
                    weights.astype(num.float32),
                    int(itmin), int(nsamples)))

                tr.itmin += itoffset

            except store_ext.StoreExtError as e:
                raise StoreError(str(e) + ' in store %s' % self.store_dir)

        elif implementation == 'alternative':
            tr = self._sum_impl_alternative(irecords, delays, weights,
                                            itmin, nsamples, decimate)

        else:
            tr = self._sum_impl_reference(irecords, delays, weights,
                                          itmin, nsamples, decimate)

        t2 = time.time()

        tr.n_records_stacked = irecords.size
        tr.t_optimize = t1 - t0
        tr.t_stack = t2 - t1

        return tr

    def _sum_statics(self, irecords, delays, weights, it, ntargets,
                     nthreads):
        # static (single-sample, multi-target) stacking in the C extension
        if not self._f_index:
            self.open()

        return store_ext.store_sum_static(
            self.cstore, irecords, delays, weights, it, ntargets, nthreads)

    def _load_index(self):
        '''
        Map or read the records array following the index file header.
        '''
        if self._use_memmap:
            records = num.memmap(
                self._f_index, dtype=gf_record_dtype,
                offset=gf_store_header_fmt_size,
                mode=('r', 'r+')[self.mode == 'w'])

        else:
            self._f_index.seek(gf_store_header_fmt_size)
            records = num.fromfile(self._f_index, dtype=gf_record_dtype)

        assert len(records) == self._nrecords

        self._records = records

    def _save_index(self):
        '''
        Write header and (when not memmapped) records back to the index
        file.
        '''
        self._f_index.seek(0)
        self._f_index.write(struct.pack(gf_store_header_fmt, self._nrecords,
                                        self.get_deltat()))

        if self._use_memmap:
            self._records.flush()
        else:
            self._f_index.seek(gf_store_header_fmt_size)
            self._records.tofile(self._f_index)
            self._f_index.flush()

    def _get_data(self, ipos, begin_value, end_value, ilo, ihi):
        '''
        Read samples ``[ilo:ihi]`` of a trace stored at data offset
        ``ipos``; short traces (``ipos == 2``) are reconstructed from the
        index record's boundary values.
        '''
        if ihi - ilo > 0:
            if ipos == 2:
                data_orig = num.empty(2, dtype=gf_dtype)
                data_orig[0] = begin_value
                data_orig[1] = end_value
                return data_orig[ilo:ihi]
            else:
                self._f_data.seek(
                    int(ipos + ilo*gf_dtype_nbytes_per_sample))
                arr = num.fromfile(
                    self._f_data, gf_dtype_store, ihi-ilo).astype(gf_dtype)

                if arr.size != ihi-ilo:
                    raise ShortRead()
                return arr
        else:
            return num.empty((0,), dtype=gf_dtype)

    def lock_fn(self):
        return BaseStore.lock_fn_(self.store_dir)

    def index_fn(self):
        return BaseStore.index_fn_(self.store_dir)

    def data_fn(self):
        return BaseStore.data_fn_(self.store_dir)

    def config_fn(self):
        return BaseStore.config_fn_(self.store_dir)

    def count_special_records(self):
        '''
        Histogram of record kinds: [missing, zero, short, normal].
        '''
        if not self._f_index:
            self.open()

        return num.histogram(
            self._records['data_offset'],
            bins=[0, 1, 2, 3, num.array(-1).astype(num.uint64)])[0]

    @property
    def size_index(self):
        return os.stat(self.index_fn()).st_size

    @property
    def size_data(self):
        return os.stat(self.data_fn()).st_size

    @property
    def size_index_and_data(self):
        return self.size_index + self.size_data

    @property
    def size_index_and_data_human(self):
        return util.human_bytesize(self.size_index_and_data)

    def stats(self):
        '''
        Summary statistics about record usage and file sizes as a dict.
        '''
        counter = self.count_special_records()

        stats = dict(
            total=self._nrecords,
            inserted=(counter[1] + counter[2] + counter[3]),
            empty=counter[0],
            short=counter[2],
            zero=counter[1],
            size_data=self.size_data,
            size_index=self.size_index,
        )

        return stats

    stats_keys = 'total inserted empty short zero size_data size_index'.split()
def remake_dir(dpath, force):
    '''
    Create directory ``dpath``, wiping an existing one when ``force`` is
    set; otherwise an existing directory raises :py:exc:`CannotCreate`.
    '''
    exists = os.path.exists(dpath)
    if exists and not force:
        raise CannotCreate('Directory "%s" already exists.' % dpath)

    if exists:
        shutil.rmtree(dpath)

    os.mkdir(dpath)
class MakeTimingParamsFailed(StoreError):
    '''
    Raised when computation of timing parameters fails.
    '''
1036class Store(BaseStore):
1038 '''
1039 Green's function disk storage and summation machine.
1041 The `Store` can be used to efficiently store, retrieve, and sum Green's
1042 function traces. A `Store` contains many 1D time traces sampled at even
1043 multiples of a global sampling rate, where each time trace has an
1044 individual start and end time. The traces are treated as having repeating
1045 end points, so the functions they represent can be non-constant only
1046 between begin and end time. It provides capabilities to retrieve decimated
1047 traces and to extract parts of the traces. The main purpose of this class
    is to provide a fast, easy to use, and flexible mechanism to compute
1049 weighted delay-and-sum stacks with many Green's function traces involved.
1051 Individual Green's functions are accessed through a single integer index at
1052 low level. On top of that, various indexing schemes can be implemented by
1053 providing a mapping from physical coordinates to the low level index `i`.
1054 E.g. for a problem with cylindrical symmetry, one might define a mapping
1055 from the coordinates (`receiver_depth`, `source_depth`, `distance`) to the
1056 low level index. Index translation is done in the
1057 :py:class:`pyrocko.gf.meta.Config` subclass object associated with the
1058 Store. It is accessible through the store's :py:attr:`config` attribute,
1059 and contains all meta information about the store, including physical
1060 extent, geometry, sampling rate, and information about the type of the
1061 stored Green's functions. Information about the underlying earth model
1062 can also be found there.
1064 A GF store can also contain tabulated phase arrivals. In basic cases, these
1065 can be created with the :py:meth:`make_travel_time_tables` and evaluated
1066 with the :py:func:`t` methods.
1068 .. attribute:: config
1070 The :py:class:`pyrocko.gf.meta.Config` derived object associated with
1071 the store which contains all meta information about the store and
1072 provides the high-level to low-level index mapping.
1074 .. attribute:: store_dir
1076 Path to the store's data directory.
1078 .. attribute:: mode
1080 The mode in which the store is opened: ``'r'``: read-only, ``'w'``:
1081 writeable.
1082 '''
    @classmethod
    def create(cls, store_dir, config, force=False, extra=None):
        '''
        Create new GF store.

        Creates a new GF store at path ``store_dir``. The layout of the GF is
        defined with the parameters given in ``config``, which should be an
        object of a subclass of :py:class:`~pyrocko.gf.meta.Config`. This
        function will refuse to overwrite an existing GF store, unless
        ``force`` is set to ``True``. If more information, e.g. parameters
        used for the modelling code, earth models or other, should be saved
        along with the GF store, these may be provided though a dict given to
        ``extra``. The keys of this dict must be names and the values must be
        *guts* type objects.

        :param store_dir: GF store path
        :type store_dir: str
        :param config: GF store Config
        :type config: :py:class:`~pyrocko.gf.meta.Config`
        :param force: Force overwrite, defaults to ``False``
        :type force: bool
        :param extra: Extra information
        :type extra: dict or None
        :returns: the newly created, opened store
        :rtype: :py:class:`Store`
        '''

        cls.create_editables(store_dir, config, force=force, extra=extra)
        cls.create_dependants(store_dir, force=force)

        return cls(store_dir)
1114 @staticmethod
1115 def create_editables(store_dir, config, force=False, extra=None):
1116 try:
1117 util.ensuredir(store_dir)
1118 except Exception:
1119 raise CannotCreate('cannot create directory %s' % store_dir)
1121 fns = []
1123 config_fn = os.path.join(store_dir, 'config')
1124 remove_if_exists(config_fn, force)
1125 meta.dump(config, filename=config_fn)
1127 fns.append(config_fn)
1129 for sub_dir in ['extra']:
1130 dpath = os.path.join(store_dir, sub_dir)
1131 remake_dir(dpath, force)
1133 if extra:
1134 for k, v in extra.items():
1135 fn = get_extra_path(store_dir, k)
1136 remove_if_exists(fn, force)
1137 meta.dump(v, filename=fn)
1139 fns.append(fn)
1141 return fns
1143 @staticmethod
1144 def create_dependants(store_dir, force=False):
1145 config_fn = os.path.join(store_dir, 'config')
1146 config = meta.load(filename=config_fn)
1148 BaseStore.create(store_dir, config.deltat, config.nrecords,
1149 force=force)
1151 for sub_dir in ['decimated']:
1152 dpath = os.path.join(store_dir, sub_dir)
1153 remake_dir(dpath, force)
    def __init__(self, store_dir, mode='r', use_memmap=True):
        '''
        Open an existing GF store at ``store_dir``.

        :param mode: ``'r'`` read-only or ``'w'`` writeable
        :raises StoreError: when the directory holds no ``config`` file
        '''
        BaseStore.__init__(self, store_dir, mode=mode, use_memmap=use_memmap)
        config_fn = self.config_fn()
        if not os.path.isfile(config_fn):
            raise StoreError(
                'directory "%s" does not seem to contain a GF store '
                '("config" file not found)' % store_dir)
        self.load_config()

        # caches for decimated sub-stores, extra information and phase
        # tables; decimated stores are opened lazily (value None)
        self._decimated = {}
        self._extra = {}
        self._phases = {}
        for decimate in range(2, 9):
            if os.path.isdir(self._decimated_store_dir(decimate)):
                self._decimated[decimate] = None
    def open(self):
        '''
        Open the underlying files (if not yet open) and register the
        config's index mapping scheme with the C-extension store.
        '''
        if not self._f_index:
            BaseStore.open(self)
            c = self.config

            # mapping scheme name is derived from the config subclass type
            mscheme = 'type_' + c.short_type.lower()
            store_ext.store_mapping_init(
                self.cstore, mscheme,
                c.mins, c.maxs, c.deltas, c.ns.astype(num.uint64),
                c.ncomponents)
1182 def save_config(self, make_backup=False):
1183 config_fn = self.config_fn()
1184 if make_backup:
1185 os.rename(config_fn, config_fn + '~')
1187 meta.dump(self.config, filename=config_fn)
1189 def get_deltat(self):
1190 return self.config.deltat
1192 def load_config(self):
1193 self.config = meta.load(filename=self.config_fn())
    def ensure_reference(self, force=False):
        '''
        Ensure the store config carries a ``reference`` string (store id
        plus the first six characters of its uuid). With ``force``, an
        existing reference is regenerated.
        '''
        if self.config.reference is not None and not force:
            return
        self.ensure_uuid()
        reference = '%s-%s' % (self.config.id, self.config.uuid[0:6])

        if self.config.reference is not None:
            # only reachable with force: replace via full config rewrite
            self.config.reference = reference
            self.save_config()
        else:
            # append to the file instead of rewriting, to preserve the
            # existing formatting of the hand-edited config
            with open(self.config_fn(), 'a') as config:
                config.write('reference: %s\n' % reference)
            self.load_config()
    def ensure_uuid(self, force=False):
        '''
        Ensure the store config carries a ``uuid`` (content hash of the
        store, see :py:meth:`create_store_hash`). With ``force``, an
        existing uuid is regenerated.
        '''
        if self.config.uuid is not None and not force:
            return
        uuid = self.create_store_hash()

        if self.config.uuid is not None:
            # only reachable with force: replace via full config rewrite
            self.config.uuid = uuid
            self.save_config()
        else:
            # append to the file instead of rewriting, to preserve the
            # existing formatting of the hand-edited config
            with open(self.config_fn(), 'a') as config:
                config.write('\nuuid: %s\n' % uuid)
            self.load_config()
    def create_store_hash(self):
        '''
        Compute a SHA1 identifier for this store.

        For speed, the traces file is only sampled: 4 KiB are hashed every
        ~10 MiB. The full config file is included in the hash.

        :returns: hex digest
        :rtype: str
        '''
        logger.info('creating hash for store ...')
        m = hashlib.sha1()

        traces_size = op.getsize(self.data_fn())
        with open(self.data_fn(), 'rb') as traces:
            while traces.tell() < traces_size:
                m.update(traces.read(4096))
                traces.seek(1024 * 1024 * 10, 1)

        with open(self.config_fn(), 'rb') as config:
            m.update(config.read())
        return m.hexdigest()
    def get_extra_path(self, key):
        '''
        Return the file path for extra information stored under ``key``.
        '''
        return get_extra_path(self.store_dir, key)
1239 def get_extra(self, key):
1240 '''
1241 Get extra information stored under given key.
1242 '''
1244 x = self._extra
1245 if key not in x:
1246 fn = self.get_extra_path(key)
1247 if not os.path.exists(fn):
1248 raise NoSuchExtra(key)
1250 x[key] = meta.load(filename=fn)
1252 return x[key]
1254 def upgrade(self):
1255 '''
1256 Upgrade store config and files to latest Pyrocko version.
1257 '''
1258 fns = [os.path.join(self.store_dir, 'config')]
1259 for key in self.extra_keys():
1260 fns.append(self.get_extra_path(key))
1262 i = 0
1263 for fn in fns:
1264 i += util.pf_upgrade(fn)
1266 return i
1268 def extra_keys(self):
1269 return [x for x in os.listdir(os.path.join(self.store_dir, 'extra'))
1270 if valid_string_id(x)]
    def put(self, args, trace):
        '''
        Insert trace into GF store.

        Store a single GF trace at (high-level) index ``args``.

        :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type args: tuple
        :param trace: GF trace to be stored
        :type trace: :py:class:`~pyrocko.gf.store.GFTrace`
        '''

        irecord = self.config.irecord(*args)
        self._put(irecord, trace)
1289 def get_record(self, args):
1290 irecord = self.config.irecord(*args)
1291 return self._get_record(irecord)
1293 def str_irecord(self, args):
1294 return BaseStore.str_irecord(self, self.config.irecord(*args))
1296 def get(self, args, itmin=None, nsamples=None, decimate=1,
1297 interpolation='nearest_neighbor', implementation='c'):
1298 '''
1299 Retrieve GF trace from store.
1301 Retrieve a single GF trace from the store at (high-level) index
1302 ``args``. By default, the full trace is retrieved. Given ``itmin`` and
1303 ``nsamples``, only the selected portion of the trace is extracted. If
1304 ``decimate`` is an integer in the range [2,8], the trace is decimated
1305 on the fly or, if available, the trace is read from a decimated version
1306 of the GF store.
1308 :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1309 ``(source_depth, distance, component)`` as in
1310 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1311 :type args: tuple
1312 :param itmin: Start time index (start time is ``itmin * dt``),
1313 defaults to None
1314 :type itmin: int or None
1315 :param nsamples: Number of samples, defaults to None
1316 :type nsamples: int or None
1317 :param decimate: Decimatation factor, defaults to 1
1318 :type decimate: int
1319 :param interpolation: Interpolation method
1320 ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
1321 ``'nearest_neighbor'``
1322 :type interpolation: str
1323 :param implementation: Implementation to use ``['c', 'reference']``,
1324 defaults to ``'c'``.
1325 :type implementation: str
1327 :returns: GF trace at ``args``
1328 :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
1329 '''
1331 store, decimate = self._decimated_store(decimate)
1332 if interpolation == 'nearest_neighbor':
1333 irecord = store.config.irecord(*args)
1334 tr = store._get(irecord, itmin, nsamples, decimate,
1335 implementation)
1337 elif interpolation == 'off':
1338 irecords, weights = store.config.vicinity(*args)
1339 if len(irecords) != 1:
1340 raise NotAllowedToInterpolate()
1341 else:
1342 tr = store._get(irecords[0], itmin, nsamples, decimate,
1343 implementation)
1345 elif interpolation == 'multilinear':
1346 tr = store._sum(irecords, num.zeros(len(irecords)), weights,
1347 itmin, nsamples, decimate, implementation,
1348 'disable')
1350 # to prevent problems with rounding errors (BaseStore saves deltat
1351 # as a 4-byte floating point value, value from YAML config is more
1352 # accurate)
1353 tr.deltat = self.config.deltat * decimate
1354 return tr
    def sum(self, args, delays, weights, itmin=None, nsamples=None,
            decimate=1, interpolation='nearest_neighbor', implementation='c',
            optimization='enable'):
        '''
        Sum delayed and weighted GF traces.

        Calculate sum of delayed and weighted GF traces. ``args`` is a tuple of
        arrays forming the (high-level) indices of the GF traces to be
        selected. Delays and weights for the summation are given in the arrays
        ``delays`` and ``weights``. ``itmin`` and ``nsamples`` can be given to
        restrict the computation to a given time interval. If ``decimate`` is
        an integer in the range [2,8], decimated traces are used in the
        summation.

        :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type args: tuple(numpy.ndarray)
        :param delays: Delay times
        :type delays: :py:class:`numpy.ndarray`
        :param weights: Trace weights
        :type weights: :py:class:`numpy.ndarray`
        :param itmin: Start time index (start time is ``itmin * dt``),
            defaults to None
        :type itmin: int or None
        :param nsamples: Number of samples, defaults to None
        :type nsamples: int or None
        :param decimate: Decimation factor, defaults to 1
        :type decimate: int
        :param interpolation: Interpolation method
            ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
            ``'nearest_neighbor'``
        :type interpolation: str
        :param implementation: Implementation to use,
            ``['c', 'alternative', 'reference']``, where ``'alternative'``
            and ``'reference'`` use a Python implementation, defaults to `'c'`
        :type implementation: str
        :param optimization: Optimization mode ``['enable', 'disable']``,
            defaults to ``'enable'``
        :type optimization: str
        :returns: Stacked GF trace.
        :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
        '''

        store, decimate_ = self._decimated_store(decimate)

        if interpolation == 'nearest_neighbor':
            irecords = store.config.irecords(*args)
        else:
            assert interpolation == 'multilinear'
            irecords, ip_weights = store.config.vicinities(*args)
            # each requested node expands to neach neighboring records; repeat
            # the user-supplied weights/delays accordingly and fold in the
            # interpolation weights
            neach = irecords.size // args[0].size
            weights = num.repeat(weights, neach) * ip_weights
            delays = num.repeat(delays, neach)

        tr = store._sum(irecords, delays, weights, itmin, nsamples, decimate_,
                        implementation, optimization)

        # to prevent problems with rounding errors (BaseStore saves deltat
        # as a 4-byte floating point value, value from YAML config is more
        # accurate)
        tr.deltat = self.config.deltat * decimate
        return tr
    def make_decimated(self, decimate, config=None, force=False,
                       show_progress=False):
        '''
        Create decimated version of GF store.

        Create a downsampled version of the GF store. Downsampling is done for
        the integer factor ``decimate`` which should be in the range [2,8]. If
        ``config`` is ``None``, all traces of the GF store are decimated and
        held available (i.e. the index mapping of the original store is used),
        otherwise, a different spatial stepping can be specified by giving a
        modified GF store configuration in ``config`` (see :py:meth:`create`).
        Decimated GF sub-stores are created under the ``decimated``
        subdirectory within the GF store directory. Holding available decimated
        versions of the GF store can save computation time, IO bandwidth, or
        decrease memory footprint at the cost of increased disk space usage,
        when computations are done for lower frequency signals.

        :param decimate: Decimate factor
        :type decimate: int
        :param config: GF store config object, defaults to None
        :type config: :py:class:`~pyrocko.gf.meta.Config` or None
        :param force: Force overwrite, defaults to ``False``
        :type force: bool
        :param show_progress: Show progress, defaults to ``False``
        :type show_progress: bool
        '''

        if not self._f_index:
            self.open()

        if not (2 <= decimate <= 8):
            raise StoreError('decimate argument must be in the range [2,8]')

        assert self.mode == 'r'

        if config is None:
            config = self.config

        # work on a copy so the original store's config is left untouched
        config = copy.deepcopy(config)
        config.sample_rate = self.config.sample_rate / decimate

        # drop any cached handle for this factor before recreating
        if decimate in self._decimated:
            del self._decimated[decimate]

        store_dir = self._decimated_store_dir(decimate)
        if os.path.exists(store_dir):
            if force:
                shutil.rmtree(store_dir)
            else:
                raise CannotCreate('store already exists at %s' % store_dir)

        # build into a temporary "-incomplete" directory; only move it to
        # the final location once completely filled
        store_dir_incomplete = store_dir + '-incomplete'
        Store.create(store_dir_incomplete, config, force=force)

        decimated = Store(store_dir_incomplete, 'w')
        try:
            if show_progress:
                pbar = util.progressbar(
                    'decimating store', self.config.nrecords)

            for i, args in enumerate(decimated.config.iter_nodes()):
                tr = self.get(args, decimate=decimate)
                decimated.put(args, tr)

                if show_progress:
                    pbar.update(i+1)

        finally:
            if show_progress:
                pbar.finish()

        decimated.close()

        # promote the completed sub-store to its final location
        shutil.move(store_dir_incomplete, store_dir)

        # register for lazy opening in _decimated_store()
        self._decimated[decimate] = None
1497 def stats(self):
1498 stats = BaseStore.stats(self)
1499 stats['decimated'] = sorted(self._decimated.keys())
1500 return stats
    # extend the inherited stats keys with the list of available decimations
    stats_keys = BaseStore.stats_keys + ['decimated']
1504 def check(self, show_progress=False):
1505 have_holes = []
1506 for pdef in self.config.tabulated_phases:
1507 phase_id = pdef.id
1508 ph = self.get_stored_phase(phase_id)
1509 if ph.check_holes():
1510 have_holes.append(phase_id)
1512 if have_holes:
1513 for phase_id in have_holes:
1514 logger.warning(
1515 'Travel time table of phase "{}" contains holes. You can '
1516 ' use `fomosto tttlsd` to fix holes.'.format(
1517 phase_id))
1518 else:
1519 logger.info('No holes in travel time tables')
1521 try:
1522 if show_progress:
1523 pbar = util.progressbar('checking store', self.config.nrecords)
1525 problems = 0
1526 for i, args in enumerate(self.config.iter_nodes()):
1527 tr = self.get(args)
1528 if tr and not tr.is_zero:
1529 if not tr.begin_value == tr.data[0]:
1530 logger.warning('wrong begin value for trace at %s '
1531 '(data corruption?)' % str(args))
1532 problems += 1
1533 if not tr.end_value == tr.data[-1]:
1534 logger.warning('wrong end value for trace at %s '
1535 '(data corruption?)' % str(args))
1536 problems += 1
1537 if not num.all(num.isfinite(tr.data)):
1538 logger.warning(
1539 'nans or infs in trace at %s' % str(args))
1540 problems += 1
1542 if show_progress:
1543 pbar.update(i+1)
1545 finally:
1546 if show_progress:
1547 pbar.finish()
1549 return problems
1551 def check_earthmodels(self, config):
1552 if config.earthmodel_receiver_1d.profile('z')[-1] not in\
1553 config.earthmodel_1d.profile('z'):
1554 logger.warning('deepest layer of earthmodel_receiver_1d not '
1555 'found in earthmodel_1d')
1557 def _decimated_store_dir(self, decimate):
1558 return os.path.join(self.store_dir, 'decimated', str(decimate))
1560 def _decimated_store(self, decimate):
1561 if decimate == 1 or decimate not in self._decimated:
1562 return self, decimate
1563 else:
1564 store = self._decimated[decimate]
1565 if store is None:
1566 store = Store(self._decimated_store_dir(decimate), 'r')
1567 self._decimated[decimate] = store
1569 return store, 1
1571 def phase_filename(self, phase_id, attribute='phase'):
1572 check_string_id(phase_id)
1573 return os.path.join(
1574 self.store_dir, 'phases', phase_id + '.%s' % attribute)
1576 def get_phase_identifier(self, phase_id, attribute):
1577 return '{}.{}'.format(phase_id, attribute)
1579 def get_stored_phase(self, phase_def, attribute='phase'):
1580 '''
1581 Get stored phase from GF store.
1583 :returns: Phase information
1584 :rtype: :py:class:`pyrocko.spit.SPTree`
1585 '''
1587 phase_id = self.get_phase_identifier(phase_def, attribute)
1588 if phase_id not in self._phases:
1589 fn = self.phase_filename(phase_def, attribute)
1590 if not os.path.isfile(fn):
1591 raise NoSuchPhase(phase_id)
1593 spt = spit.SPTree(filename=fn)
1594 self._phases[phase_id] = spt
1596 return self._phases[phase_id]
    def get_phase(self, phase_def, attributes=None):
        '''
        Get a travel-time evaluator for a phase definition.

        ``phase_def`` may carry a provider prefix, one of ``'stored'``,
        ``'vel'``, ``'vel_surface'``, ``'cake'`` or ``'iaspei'``; without a
        prefix, ``'stored'`` is assumed.

        :param phase_def: phase definition, e.g. ``'stored:P'`` or
            ``'vel_surface:3.5'``
        :type phase_def: str
        :param attributes: additional attributes to evaluate along with the
            phase time (supported for providers ``'stored'``, ``'cake'`` and
            ``'iaspei'`` only)
        :type attributes: list of str or None
        :returns: callable evaluating arrival time (and attributes) for a GF
            index tuple, or a stored :py:class:`pyrocko.spit.SPTree` directly
        :raises StoreError: for an unknown provider
        '''
        toks = phase_def.split(':', 1)
        if len(toks) == 2:
            provider, phase_def = toks
        else:
            provider, phase_def = 'stored', toks[0]

        if attributes and provider not in ['stored', 'cake', 'iaspei']:
            raise meta.TimingAttributeError(
                'Attributes (%s) not supported for provider \'%s\'.' % (
                    ', '.join("'%s'" % attribute for attribute in attributes),
                    provider))

        if provider == 'stored':
            if attributes is None:
                return self.get_stored_phase(phase_def)
            else:
                def evaluate(args):
                    return tuple(
                        self.get_stored_phase(phase_def, attribute)(args)
                        for attribute in ['phase'] + attributes)

                return evaluate

        elif provider == 'vel':
            # constant-velocity provider: time = 3D distance / velocity
            vel = float(phase_def) * 1000.

            def evaluate(args):
                return self.config.get_distance(args) / vel

            return evaluate

        elif provider == 'vel_surface':
            # constant-velocity provider using surface distance
            vel = float(phase_def) * 1000.

            def evaluate(args):
                return self.config.get_surface_distance(args) / vel

            return evaluate

        elif provider in ('cake', 'iaspei'):
            from pyrocko import cake
            mod = self.config.earthmodel_1d
            if provider == 'cake':
                phases = [cake.PhaseDef(phase_def)]
            else:
                phases = cake.PhaseDef.classic(phase_def)

            def evaluate(args):
                # accept (source_depth, distance) or
                # (receiver_depth, source_depth, distance)
                if len(args) == 2:
                    zr, zs, x = (self.config.receiver_depth,) + args
                elif len(args) == 3:
                    zr, zs, x = args
                else:
                    assert False

                if phases:
                    rays = mod.arrivals(
                        phases=phases,
                        distances=[x*cake.m2d],
                        zstart=zs,
                        zstop=zr)
                else:
                    rays = []

                # earliest arrival wins
                rays.sort(key=lambda ray: ray.t)

                if rays:
                    if attributes is None:
                        return rays[0].t
                    else:
                        try:
                            return rays[0].t_and_attributes(attributes)
                        except KeyError as e:
                            raise meta.TimingAttributeError(
                                'Attribute %s not supported for provider '
                                '\'%s\'.' % (str(e), provider)) from None
                else:
                    if attributes is None:
                        return None
                    else:
                        return (None,) * (len(attributes) + 1)

            return evaluate

        raise StoreError('unsupported phase provider: %s' % provider)
    def t(self, timing, *args, attributes=None):
        '''
        Compute interpolated phase arrivals.

        **Examples:**

        If ``test_store`` is a :py:class:`Type A<pyrocko.gf.meta.ConfigTypeA>`
        store::

            test_store.t('stored:p', (1000, 10000))
            test_store.t('last{stored:P|stored:Pdiff}', (1000, 10000))
                        # The latter arrival
                        # of P or diffracted
                        # P phase

        If ``test_store`` is a :py:class:`Type B<pyrocko.gf.meta.ConfigTypeB>`
        store::

            test_store.t('S', (1000, 1000, 10000))
            test_store.t('first{P|p|Pdiff|sP}', (1000, 1000, 10000))
                        # The first arrival of
                        # the given phases is
                        # selected

        Independent of the store type, it is also possible to specify two
        location objects and the GF index tuple is calculated internally::

            test_store.t('p', source, target)

        :param timing: travel-time definition
        :type timing: str or :py:class:`~pyrocko.gf.meta.Timing`
        :param \\*args: if ``len(args) == 1``, ``args[0]`` must be a
            :py:class:`GF index tuple <pyrocko.gf.meta.Config>`, e.g.
            ``(source_depth, distance, component)`` for a
            :py:class:`Type A<pyrocko.gf.meta.ConfigTypeA>` store. If
            ``len(args) == 2``, two location objects are expected, e.g.
            ``(source, receiver)`` and the appropriate GF index is computed
            internally.
        :type \\*args: (:py:class:`tuple`,) or
            (:py:class:`~pyrocko.model.Location`,
            :py:class:`~pyrocko.model.Location`)

        :param attributes: additional attributes to return along with the time.
            Requires the attribute to be either stored or it must be supported
            by the phase calculation backend. E.g. ``['takeoff_angle']``.
        :type attributes: :py:class:`list` of :py:class:`str`

        :returns: Phase arrival according to ``timing``
        :rtype: float or None
        '''

        # accept either a ready-made GF index tuple or two location objects
        if len(args) == 1:
            args = args[0]
        else:
            args = self.config.make_indexing_args1(*args)

        if not isinstance(timing, meta.Timing):
            timing = meta.Timing(timing)

        return timing.evaluate(self.get_phase, args, attributes=attributes)
1746 def get_available_interpolation_tables(self):
1747 filenames = glob(op.join(self.store_dir, 'phases', '*'))
1748 return [op.basename(file) for file in filenames]
    def get_stored_attribute(self, phase_def, attribute, *args):
        '''
        Return interpolated store attribute

        :param attribute: takeoff_angle / incidence_angle [deg]
        :type attribute: str
        :param \\*args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type \\*args: tuple
        :raises StoreError: if no interpolation table exists for the
            requested phase/attribute combination
        '''
        try:
            # evaluate the stored SPTree at the given index tuple
            return self.get_stored_phase(phase_def, attribute)(*args)
        except NoSuchPhase:
            raise StoreError(
                'Interpolation table for {} of {} does not exist! '
                'Available tables: {}'.format(
                    attribute, phase_def,
                    self.get_available_interpolation_tables()))
    def get_many_stored_attributes(self, phase_def, attribute, coords):
        '''
        Return interpolated store attribute

        :param attribute: takeoff_angle / incidence_angle [deg]
        :type attribute: str
        :param coords: array with columns being
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type coords: :py:class:`numpy.ndarray`
        :raises StoreError: if no interpolation table exists for the
            requested phase/attribute combination
        '''
        try:
            # vectorized evaluation of the stored SPTree over all coordinates
            return self.get_stored_phase(
                phase_def, attribute).interpolate_many(coords)
        except NoSuchPhase:
            raise StoreError(
                'Interpolation table for {} of {} does not exist! '
                'Available tables: {}'.format(
                    attribute, phase_def,
                    self.get_available_interpolation_tables()))
    def make_stored_table(self, attribute, force=False):
        '''
        Compute tables for selected ray attributes.

        :param attribute: phase / takeoff_angle [deg] / incidence_angle [deg]
        :type attribute: str
        :param force: overwrite existing table files
        :type force: bool

        Tables are computed using the 1D earth model defined in
        :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined phase
        in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
        '''

        if attribute not in available_stored_tables:
            raise StoreError(
                'Cannot create attribute table for {}! '
                'Supported attribute tables: {}'.format(
                    attribute, available_stored_tables))

        from pyrocko import cake
        config = self.config

        if not config.tabulated_phases:
            return

        mod = config.earthmodel_1d

        if not mod:
            raise StoreError('no earth model found')

        if config.earthmodel_receiver_1d:
            self.check_earthmodels(config)

        for pdef in config.tabulated_phases:

            phase_id = pdef.id
            phases = pdef.phases

            # travel times are tabulated to half a sample of accuracy; other
            # attributes get a tighter tolerance
            if attribute == 'phase':
                ftol = config.deltat * 0.5
                horvels = pdef.horizontal_velocities
            else:
                ftol = config.deltat * 0.01

            fn = self.phase_filename(phase_id, attribute)

            if os.path.exists(fn) and not force:
                logger.info('file already exists: %s' % fn)
                continue

            # closure over the current iteration's phases/horvels; safe
            # because the SPTree is built within the same loop iteration
            def evaluate(args):

                nargs = len(args)
                if nargs == 2:
                    receiver_depth, source_depth, distance = (
                        config.receiver_depth,) + args
                elif nargs == 3:
                    receiver_depth, source_depth, distance = args
                else:
                    raise ValueError(
                        'Number of input arguments %i is not supported!'
                        'Supported number of arguments: 2 or 3' % nargs)

                ray_attribute_values = []
                arrival_times = []
                if phases:
                    rays = mod.arrivals(
                        phases=phases,
                        distances=[distance * cake.m2d],
                        zstart=source_depth,
                        zstop=receiver_depth)

                    for ray in rays:
                        arrival_times.append(ray.t)
                        if attribute != 'phase':
                            ray_attribute_values.append(
                                getattr(ray, attribute)())

                # fallback arrivals from constant horizontal velocities
                # (travel-time tables only)
                if attribute == 'phase':
                    for v in horvels:
                        arrival_times.append(distance / (v * km))

                if arrival_times:
                    if attribute == 'phase':
                        return min(arrival_times)
                    else:
                        # attribute of the earliest ray arrival
                        earliest_idx = num.argmin(arrival_times)
                        return ray_attribute_values[earliest_idx]
                else:
                    return None

            logger.info(
                'making "%s" table for phasegroup "%s"', attribute, phase_id)

            ip = spit.SPTree(
                f=evaluate,
                ftol=ftol,
                xbounds=num.transpose((config.mins, config.maxs)),
                xtols=config.deltas)

            util.ensuredirs(fn)
            ip.dump(fn)
    def make_timing_params(self, begin, end, snap_vred=True, force=False):
        '''
        Compute tight parameterized time ranges to include given timings.

        Calculates appropriate time ranges to cover given begin and end timings
        over all GF points in the store. A dict with the following keys is
        returned:

        * ``'tmin'``: time [s], minimum of begin timing over all GF points
        * ``'tmax'``: time [s], maximum of end timing over all GF points
        * ``'vred'``, ``'tmin_vred'``: slope [m/s] and offset [s] of reduction
          velocity [m/s] appropriate to catch begin timing over all GF points
        * ``'tlenmax_vred'``: maximum time length needed to cover all end
          timings, when using linear slope given with (``vred``, ``tmin_vred``)
          as start
        '''

        data = []
        warned = set()
        for args in self.config.iter_nodes(level=-1):
            tmin = self.t(begin, args)
            tmax = self.t(end, args)
            if tmin is None:
                warned.add(str(begin))
            if tmax is None:
                warned.add(str(end))

            x = self.config.get_surface_distance(args)
            data.append((x, tmin, tmax))

        if len(warned):
            w = ' | '.join(list(warned))
            msg = '''determination of time window failed using phase
definitions: %s.\n Travel time table contains holes in probed ranges. You can
use `fomosto tttlsd` to fix holes.''' % w
            if force:
                logger.warning(msg)
            else:
                raise MakeTimingParamsFailed(msg)

        xs, tmins, tmaxs = num.array(data, dtype=float).T

        tlens = tmaxs - tmins

        # NOTE(review): nanargmin returns an integer index, so
        # num.isfinite(i) is always True; an all-NaN tmins raises ValueError
        # inside nanargmin before this check — verify intent
        i = num.nanargmin(tmins)
        if not num.isfinite(i):
            raise MakeTimingParamsFailed('determination of time window failed')

        tminmin = tmins[i]
        x_tminmin = xs[i]
        dx = (xs - x_tminmin)
        # avoid division by zero at the minimum's own node
        dx = num.where(dx != 0.0, dx, num.nan)
        s = (tmins - tminmin) / dx
        sred = num.min(num.abs(s[num.isfinite(s)]))

        deltax = self.config.distance_delta

        if snap_vred:
            # snap the reduction slope to a multiple of deltat per deltax
            tdif = sred*deltax
            tdif2 = self.config.deltat * math.floor(tdif / self.config.deltat)
            sred = tdif2/self.config.distance_delta

        tmin_vred = tminmin - sred*x_tminmin
        if snap_vred:
            xe = x_tminmin - int(x_tminmin / deltax) * deltax
            tmin_vred = float(
                self.config.deltat *
                math.floor(tmin_vred / self.config.deltat) - xe * sred)

        # NOTE(review): tmax and x here are scalars left over from the loop
        # above (last node only); the arrays tmaxs/xs may have been intended
        # to cover all GF points — confirm before changing
        tlenmax_vred = num.nanmax(tmax - (tmin_vred + sred*x))
        if sred != 0.0:
            vred = 1.0/sred
        else:
            vred = 0.0

        return dict(
            tmin=tminmin,
            tmax=num.nanmax(tmaxs),
            tlenmax=num.nanmax(tlens),
            tmin_vred=tmin_vred,
            tlenmax_vred=tlenmax_vred,
            vred=vred)
1976 def make_travel_time_tables(self, force=False):
1977 '''
1978 Compute travel time tables.
1980 Travel time tables are computed using the 1D earth model defined in
1981 :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined phase
1982 in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`. The accuracy of
1983 the tablulated times is adjusted to the sampling rate of the store.
1984 '''
1985 self.make_stored_table(attribute='phase', force=force)
    def make_ttt(self, force=False):
        '''
        Shorthand alias for :py:meth:`make_travel_time_tables`.
        '''
        self.make_travel_time_tables(force=force)
1990 def make_takeoff_angle_tables(self, force=False):
1991 '''
1992 Compute takeoff-angle tables.
1994 Takeoff-angle tables [deg] are computed using the 1D earth model
1995 defined in :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each
1996 defined phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
1997 The accuracy of the tablulated times is adjusted to 0.01 times the
1998 sampling rate of the store.
1999 '''
2000 self.make_stored_table(attribute='takeoff_angle', force=force)
2002 def make_incidence_angle_tables(self, force=False):
2003 '''
2004 Compute incidence-angle tables.
2006 Incidence-angle tables [deg] are computed using the 1D earth model
2007 defined in :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each
2008 defined phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
2009 The accuracy of the tablulated times is adjusted to 0.01 times the
2010 sampling rate of the store.
2011 '''
2012 self.make_stored_table(attribute='incidence_angle', force=force)
2014 def get_provided_components(self):
2016 scheme_desc = meta.component_scheme_to_description[
2017 self.config.component_scheme]
2019 quantity = self.config.stored_quantity \
2020 or scheme_desc.default_stored_quantity
2022 if not quantity:
2023 return scheme_desc.provided_components
2024 else:
2025 return [
2026 quantity + '.' + comp
2027 for comp in scheme_desc.provided_components]
2029 def fix_ttt_holes(self, phase_id):
2031 pdef = self.config.get_tabulated_phase(phase_id)
2032 mode = None
2033 for phase in pdef.phases:
2034 for leg in phase.legs():
2035 if mode is None:
2036 mode = leg.mode
2038 else:
2039 if mode != leg.mode:
2040 raise StoreError(
2041 'Can only fix holes in pure P or pure S phases.')
2043 sptree = self.get_stored_phase(phase_id)
2044 sptree_lsd = self.config.fix_ttt_holes(sptree, mode)
2046 phase_lsd = phase_id + '.lsd'
2047 fn = self.phase_filename(phase_lsd)
2048 sptree_lsd.dump(fn)
2050 def statics(self, source, multi_location, itsnapshot, components,
2051 interpolation='nearest_neighbor', nthreads=0):
2052 if not self._f_index:
2053 self.open()
2055 out = {}
2056 ntargets = multi_location.ntargets
2057 source_terms = source.get_source_terms(self.config.component_scheme)
2058 # TODO: deal with delays for snapshots > 1 sample
2060 if itsnapshot is not None:
2061 delays = source.times
2063 # Fringe case where we sample at sample 0 and sample 1
2064 tsnapshot = itsnapshot * self.config.deltat
2065 if delays.max() == tsnapshot and delays.min() != tsnapshot:
2066 delays[delays == delays.max()] -= self.config.deltat
2068 else:
2069 delays = source.times * 0
2070 itsnapshot = 1
2072 if ntargets == 0:
2073 raise StoreError('MultiLocation.coords5 is empty')
2075 res = store_ext.store_calc_static(
2076 self.cstore,
2077 source.coords5(),
2078 source_terms,
2079 delays,
2080 multi_location.coords5,
2081 self.config.component_scheme,
2082 interpolation,
2083 itsnapshot,
2084 nthreads)
2086 out = {}
2087 for icomp, (comp, comp_res) in enumerate(
2088 zip(self.get_provided_components(), res)):
2089 if comp not in components:
2090 continue
2091 out[comp] = res[icomp]
2093 return out
2095 def calc_seismograms(self, source, receivers, components, deltat=None,
2096 itmin=None, nsamples=None,
2097 interpolation='nearest_neighbor',
2098 optimization='enable', nthreads=1):
2099 config = self.config
2101 assert interpolation in ['nearest_neighbor', 'multilinear'], \
2102 'Unknown interpolation: %s' % interpolation
2104 if not isinstance(receivers, list):
2105 receivers = [receivers]
2107 if deltat is None:
2108 decimate = 1
2109 else:
2110 decimate = int(round(deltat/config.deltat))
2111 if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
2112 raise StoreError(
2113 'unavailable decimation ratio target.deltat / store.deltat'
2114 ' = %g / %g' % (deltat, config.deltat))
2116 store, decimate_ = self._decimated_store(decimate)
2118 if not store._f_index:
2119 store.open()
2121 scheme = config.component_scheme
2123 source_coords_arr = source.coords5()
2124 source_terms = source.get_source_terms(scheme)
2125 delays = source.times.ravel()
2127 nreceiver = len(receivers)
2128 receiver_coords_arr = num.empty((nreceiver, 5))
2129 for irec, rec in enumerate(receivers):
2130 receiver_coords_arr[irec, :] = rec.coords5
2132 dt = self.get_deltat()
2134 itoffset = int(num.floor(delays.min()/dt)) if delays.size != 0 else 0
2136 if itmin is None:
2137 itmin = num.zeros(nreceiver, dtype=num.int32)
2138 else:
2139 itmin = (itmin-itoffset).astype(num.int32)
2141 if nsamples is None:
2142 nsamples = num.zeros(nreceiver, dtype=num.int32) - 1
2143 else:
2144 nsamples = nsamples.astype(num.int32)
2146 try:
2147 results = store_ext.store_calc_timeseries(
2148 store.cstore,
2149 source_coords_arr,
2150 source_terms,
2151 (delays - itoffset*dt),
2152 receiver_coords_arr,
2153 scheme,
2154 interpolation,
2155 itmin,
2156 nsamples,
2157 nthreads)
2158 except Exception as e:
2159 raise e
2161 provided_components = self.get_provided_components()
2162 ncomponents = len(provided_components)
2164 seismograms = [dict() for _ in range(nreceiver)]
2165 for ires in range(len(results)):
2166 res = results.pop(0)
2167 ireceiver = ires // ncomponents
2169 comp = provided_components[res[-2]]
2171 if comp not in components:
2172 continue
2174 tr = GFTrace(*res[:-2])
2175 tr.deltat = config.deltat * decimate
2176 tr.itmin += itoffset
2178 tr.n_records_stacked = 0
2179 tr.t_optimize = 0.
2180 tr.t_stack = 0.
2181 tr.err = res[-1]
2183 seismograms[ireceiver][comp] = tr
2185 return seismograms
    def seismogram(self, source, receiver, components, deltat=None,
                   itmin=None, nsamples=None,
                   interpolation='nearest_neighbor',
                   optimization='enable', nthreads=1):
        '''
        Compute seismogram traces for a single receiver.

        :param source: source providing ``coords5()``, ``get_source_terms()``
            and ``times``
        :param receiver: receiver object providing ``coords5``
        :param components: components to include in the result
        :param deltat: desired sampling interval [s]; must correspond to an
            integer decimation of the store's interval, defaults to the
            store's own interval
        :type deltat: float or None
        :param itmin: start time index or None
        :param nsamples: number of samples or None
        :param interpolation: interpolation method
        :type interpolation: str
        :param optimization: optimization mode passed to the stacking routine
        :param nthreads: number of threads for the C implementation
        :type nthreads: int
        :returns: dict mapping component name to :py:class:`GFTrace`
        :raises pyrocko.gf.meta.OutOfBounds: if the source/receiver
            configuration is outside the store's coverage
        '''
        config = self.config

        if deltat is None:
            decimate = 1
        else:
            decimate = int(round(deltat/config.deltat))
            if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
                raise StoreError(
                    'unavailable decimation ratio target.deltat / store.deltat'
                    ' = %g / %g' % (deltat, config.deltat))

        store, decimate_ = self._decimated_store(decimate)

        if not store._f_index:
            store.open()

        scheme = config.component_scheme

        source_coords_arr = source.coords5()
        source_terms = source.get_source_terms(scheme)
        receiver_coords_arr = receiver.coords5[num.newaxis, :].copy()

        try:
            params = store_ext.make_sum_params(
                store.cstore,
                source_coords_arr,
                source_terms,
                receiver_coords_arr,
                scheme,
                interpolation, nthreads)

        except store_ext.StoreExtError:
            raise meta.OutOfBounds()

        provided_components = self.get_provided_components()

        out = {}
        for icomp, comp in enumerate(provided_components):
            if comp in components:
                weights, irecords = params[icomp]

                # each source subfault expands to neach records; repeat
                # the source times accordingly for the stacking call
                neach = irecords.size // source.times.size
                delays = num.repeat(source.times, neach)

                tr = store._sum(
                    irecords, delays, weights, itmin, nsamples, decimate_,
                    'c', optimization)

                # to prevent problems with rounding errors (BaseStore saves
                # deltat as a 4-byte floating point value, value from YAML
                # config is more accurate)
                tr.deltat = config.deltat * decimate

                out[comp] = tr

        return out
# Explicit public API of this module.
__all__ = '''
gf_dtype
NotMultipleOfSamplingInterval
GFTrace
CannotCreate
CannotOpen
DuplicateInsert
NotAllowedToInterpolate
NoSuchExtra
NoSuchPhase
BaseStore
Store
SeismosizerErrorEnum
'''.split()