# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

import errno
import time
import os
import struct
import math
import shutil
try:
    import fcntl
except ImportError:
    fcntl = None
import copy
import logging
import re
import hashlib
from glob import glob

import numpy as num
from scipy import signal

from . import meta
from .error import StoreError
try:
    from . import store_ext
except ImportError:
    store_ext = None
from pyrocko import util, spit

logger = logging.getLogger('pyrocko.gf.store')
op = os.path

# gf store endianness
E = '<'

gf_dtype = num.dtype(num.float32)
gf_dtype_store = num.dtype(E + 'f4')

gf_dtype_nbytes_per_sample = 4

gf_store_header_dtype = [
    ('nrecords', E + 'u8'),
    ('deltat', E + 'f4'),
]

gf_store_header_fmt = E + 'Qf'
gf_store_header_fmt_size = struct.calcsize(gf_store_header_fmt)

gf_record_dtype = num.dtype([
    ('data_offset', E + 'u8'),
    ('itmin', E + 'i4'),
    ('nsamples', E + 'u4'),
    ('begin_value', E + 'f4'),
    ('end_value', E + 'f4'),
])

available_stored_tables = ['phase', 'takeoff_angle', 'incidence_angle']

km = 1000.
65class SeismosizerErrorEnum:
66 SUCCESS = 0
67 INVALID_RECORD = 1
68 EMPTY_RECORD = 2
69 BAD_RECORD = 3
70 ALLOC_FAILED = 4
71 BAD_REQUEST = 5
72 BAD_DATA_OFFSET = 6
73 READ_DATA_FAILED = 7
74 SEEK_INDEX_FAILED = 8
75 READ_INDEX_FAILED = 9
76 FSTAT_TRACES_FAILED = 10
77 BAD_STORE = 11
78 MMAP_INDEX_FAILED = 12
79 MMAP_TRACES_FAILED = 13
80 INDEX_OUT_OF_BOUNDS = 14
81 NTARGETS_OUT_OF_BOUNDS = 15
84def valid_string_id(s):
85 return re.match(meta.StringID.pattern, s)
88def check_string_id(s):
89 if not valid_string_id(s):
90 raise ValueError('invalid name %s' % s)
# - data_offset
#
#   Data file offset of first sample in bytes (the seek value).
#   Special values:
#
#    0x00 - missing trace
#    0x01 - zero trace (GF trace is physically zero)
#    0x02 - short trace of 1 or 2 samples (no need for data allocation)
#
# - itmin
#
#   Time of first sample of the trace as a multiple of the sampling interval.
#   All GF samples must be evaluated at multiples of the sampling interval
#   with respect to a global reference time zero. Must be set also for zero
#   traces.
#
# - nsamples
#
#   Number of samples in the GF trace. Should be set to zero for zero traces.
#
# - begin_value, end_value
#
#   Values of first and last sample. These values are included in data[]
#   redundantly.
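
# The index records defined above can be inspected directly with numpy. A
# minimal reading sketch (``store_dir`` is an assumed path to an existing GF
# store; the BaseStore/Store classes below do this for you):
#
#   with open(os.path.join(store_dir, 'index'), 'rb') as f:
#       nrecords, deltat = struct.unpack(
#           gf_store_header_fmt, f.read(gf_store_header_fmt_size))
#       records = num.fromfile(f, dtype=gf_record_dtype, count=nrecords)
#
#   # records['data_offset'], records['itmin'], records['nsamples'],
#   # records['begin_value'] and records['end_value'] follow the layout
#   # documented above.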
117class NotMultipleOfSamplingInterval(Exception):
118 pass
121sampling_check_eps = 1e-5
124class GFTrace(object):
125 '''
126 Green's Function trace class for handling traces from the GF store.
127 '''
129 @classmethod
130 def from_trace(cls, tr):
131 return cls(data=tr.ydata.copy(), tmin=tr.tmin, deltat=tr.deltat)
133 def __init__(self, data=None, itmin=None, deltat=1.0,
134 is_zero=False, begin_value=None, end_value=None, tmin=None):
        assert sum((x is None) for x in (tmin, itmin)) == 1, \
            'GFTrace: either tmin or itmin must be given'
139 if tmin is not None:
140 itmin = int(round(tmin / deltat))
141 if abs(itmin*deltat - tmin) > sampling_check_eps*deltat:
142 raise NotMultipleOfSamplingInterval(
143 'GFTrace: tmin (%g) is not a multiple of sampling '
144 'interval (%g)' % (tmin, deltat))
146 if data is not None:
147 data = num.asarray(data, dtype=gf_dtype)
148 else:
149 data = num.array([], dtype=gf_dtype)
150 begin_value = 0.0
151 end_value = 0.0
153 self.data = data
154 self.itmin = itmin
155 self.deltat = deltat
156 self.is_zero = is_zero
157 self.n_records_stacked = 0.
158 self.t_stack = 0.
159 self.t_optimize = 0.
160 self.err = SeismosizerErrorEnum.SUCCESS
162 if data is not None and data.size > 0:
163 if begin_value is None:
164 begin_value = data[0]
165 if end_value is None:
166 end_value = data[-1]
168 self.begin_value = begin_value
169 self.end_value = end_value
171 @property
172 def t(self):
173 '''
174 Time vector of the GF trace.
176 :returns: Time vector
177 :rtype: :py:class:`numpy.ndarray`
178 '''
179 return num.linspace(
180 self.itmin*self.deltat,
181 (self.itmin+self.data.size-1)*self.deltat, self.data.size)
183 def __str__(self, itmin=0):
185 if self.is_zero:
186 return 'ZERO'
188 s = []
189 for i in range(itmin, self.itmin + self.data.size):
190 if i >= self.itmin and i < self.itmin + self.data.size:
191 s.append('%7.4g' % self.data[i-self.itmin])
192 else:
193 s.append(' '*7)
195 return '|'.join(s)
197 def to_trace(self, net, sta, loc, cha):
198 from pyrocko import trace
199 return trace.Trace(
200 net, sta, loc, cha,
201 ydata=self.data,
202 deltat=self.deltat,
203 tmin=self.itmin*self.deltat)
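
# A small usage sketch for GFTrace (hypothetical values): a trace can be
# built from a plain numpy array and converted to a pyrocko trace object,
# e.g. for plotting or export:
#
#   gftr = GFTrace(data=num.sin(num.arange(100)*0.1), itmin=-10, deltat=0.5)
#   tr = gftr.to_trace('', 'STA', '', 'Z')   # net, sta, loc, cha
#   # tr.tmin == -10 * 0.5 == -5.0 s, tr.deltat == 0.5 s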
206class GFValue(object):
208 def __init__(self, value):
209 self.value = value
210 self.n_records_stacked = 0.
211 self.t_stack = 0.
212 self.t_optimize = 0.
215Zero = GFTrace(is_zero=True, itmin=0)
218def make_same_span(tracesdict):
220 traces = tracesdict.values()
222 nonzero = [tr for tr in traces if not tr.is_zero]
223 if not nonzero:
224 return {k: Zero for k in tracesdict.keys()}
226 deltat = nonzero[0].deltat
228 itmin = min(tr.itmin for tr in nonzero)
229 itmax = max(tr.itmin+tr.data.size for tr in nonzero) - 1
231 out = {}
232 for k, tr in tracesdict.items():
233 n = itmax - itmin + 1
234 if tr.itmin != itmin or tr.data.size != n:
235 data = num.zeros(n, dtype=gf_dtype)
236 if not tr.is_zero:
237 lo = tr.itmin-itmin
238 hi = lo + tr.data.size
239 data[:lo] = tr.data[0]
240 data[lo:hi] = tr.data
241 data[hi:] = tr.data[-1]
243 tr = GFTrace(data, itmin, deltat)
245 out[k] = tr
247 return out
250class CannotCreate(StoreError):
251 pass
254class CannotOpen(StoreError):
255 pass
258class DuplicateInsert(StoreError):
259 pass
262class ShortRead(StoreError):
263 def __str__(self):
264 return 'unexpected end of data (truncated traces file?)'
267class NotAllowedToInterpolate(StoreError):
268 def __str__(self):
269 return 'not allowed to interpolate'
272class NoSuchExtra(StoreError):
273 def __init__(self, s):
274 StoreError.__init__(self)
275 self.value = s
277 def __str__(self):
278 return 'extra information for key "%s" not found.' % self.value
281class NoSuchPhase(StoreError):
282 def __init__(self, s):
283 StoreError.__init__(self)
284 self.value = s
286 def __str__(self):
287 return 'phase for key "%s" not found. ' \
288 'Running "fomosto ttt" may be needed.' % self.value
291def remove_if_exists(fn, force=False):
292 if os.path.exists(fn):
293 if force:
294 os.remove(fn)
295 else:
296 raise CannotCreate('file %s already exists' % fn)
299def get_extra_path(store_dir, key):
300 check_string_id(key)
301 return os.path.join(store_dir, 'extra', key)
304class BaseStore(object):
306 @staticmethod
307 def lock_fn_(store_dir):
308 return os.path.join(store_dir, 'lock')
310 @staticmethod
311 def index_fn_(store_dir):
312 return os.path.join(store_dir, 'index')
314 @staticmethod
315 def data_fn_(store_dir):
316 return os.path.join(store_dir, 'traces')
318 @staticmethod
319 def config_fn_(store_dir):
320 return os.path.join(store_dir, 'config')
322 @staticmethod
323 def create(store_dir, deltat, nrecords, force=False):
325 try:
326 util.ensuredir(store_dir)
327 except Exception:
328 raise CannotCreate('cannot create directory %s' % store_dir)
330 index_fn = BaseStore.index_fn_(store_dir)
331 data_fn = BaseStore.data_fn_(store_dir)
333 for fn in (index_fn, data_fn):
334 remove_if_exists(fn, force)
336 with open(index_fn, 'wb') as f:
337 f.write(struct.pack(gf_store_header_fmt, nrecords, deltat))
338 records = num.zeros(nrecords, dtype=gf_record_dtype)
339 records.tofile(f)
341 with open(data_fn, 'wb') as f:
342 f.write(b'\0' * 32)
344 def __init__(self, store_dir, mode='r', use_memmap=True):
345 assert mode in 'rw'
346 self.store_dir = store_dir
347 self.mode = mode
348 self._use_memmap = use_memmap
349 self._nrecords = None
350 self._deltat = None
351 self._f_index = None
352 self._f_data = None
353 self._records = None
354 self.cstore = None
356 def open(self):
357 assert self._f_index is None
358 index_fn = self.index_fn()
359 data_fn = self.data_fn()
361 if self.mode == 'r':
362 fmode = 'rb'
363 elif self.mode == 'w':
364 fmode = 'r+b'
365 else:
366 assert False, 'invalid mode: %s' % self.mode
368 try:
369 self._f_index = open(index_fn, fmode)
370 self._f_data = open(data_fn, fmode)
371 except Exception:
372 self.mode = ''
373 raise CannotOpen('cannot open gf store: %s' % self.store_dir)
375 try:
376 # replace single precision deltat value in store with the double
377 # precision value from the config, if available
378 self.cstore = store_ext.store_init(
379 self._f_index.fileno(), self._f_data.fileno(),
380 self.get_deltat() or 0.0)
382 except store_ext.StoreExtError as e:
383 raise StoreError(str(e))
385 while True:
386 try:
387 dataheader = self._f_index.read(gf_store_header_fmt_size)
388 break
390 except IOError as e:
391 # occasionally got this one on an NFS volume
392 if e.errno == errno.EBUSY:
393 time.sleep(0.01)
394 else:
395 raise
397 nrecords, deltat = struct.unpack(gf_store_header_fmt, dataheader)
398 self._nrecords = nrecords
399 self._deltat = deltat
401 self._load_index()
403 def __del__(self):
404 if self.mode != '':
405 self.close()
407 def lock(self):
408 if not fcntl:
409 lock_fn = self.lock_fn()
410 util.create_lockfile(lock_fn)
411 else:
412 if not self._f_index:
413 self.open()
415 while True:
416 try:
417 fcntl.lockf(self._f_index, fcntl.LOCK_EX)
418 break
420 except IOError as e:
421 if e.errno == errno.ENOLCK:
422 time.sleep(0.01)
423 else:
424 raise
426 def unlock(self):
427 if not fcntl:
428 lock_fn = self.lock_fn()
429 util.delete_lockfile(lock_fn)
430 else:
431 self._f_data.flush()
432 self._f_index.flush()
433 fcntl.lockf(self._f_index, fcntl.LOCK_UN)
435 def put(self, irecord, trace):
436 self._put(irecord, trace)
438 def get_record(self, irecord):
439 return self._get_record(irecord)
441 def get_span(self, irecord, decimate=1):
442 return self._get_span(irecord, decimate=decimate)
444 def get(self, irecord, itmin=None, nsamples=None, decimate=1,
445 implementation='c'):
446 return self._get(irecord, itmin, nsamples, decimate, implementation)
448 def sum(self, irecords, delays, weights, itmin=None,
449 nsamples=None, decimate=1, implementation='c',
450 optimization='enable'):
451 return self._sum(irecords, delays, weights, itmin, nsamples, decimate,
452 implementation, optimization)
454 def irecord_format(self):
455 return util.zfmt(self._nrecords)
457 def str_irecord(self, irecord):
458 return self.irecord_format() % irecord
460 def close(self):
461 if self.mode == 'w':
462 if not self._f_index:
463 self.open()
464 self._save_index()
466 if self._f_data:
467 self._f_data.close()
468 self._f_data = None
470 if self._f_index:
471 self._f_index.close()
472 self._f_index = None
474 del self._records
475 self._records = None
477 self.mode = ''
479 def _get_record(self, irecord):
480 if not self._f_index:
481 self.open()
483 return self._records[irecord]
485 def _get(self, irecord, itmin, nsamples, decimate, implementation):
486 '''
487 Retrieve complete GF trace from storage.
488 '''
490 if not self._f_index:
491 self.open()
493 if not self.mode == 'r':
494 raise StoreError('store not open in read mode')
496 if implementation == 'c' and decimate == 1:
498 if nsamples is None:
499 nsamples = -1
501 if itmin is None:
502 itmin = 0
504 try:
505 return GFTrace(*store_ext.store_get(
506 self.cstore, int(irecord), int(itmin), int(nsamples)))
507 except store_ext.StoreExtError as e:
508 raise StoreError(str(e))
510 else:
511 return self._get_impl_reference(irecord, itmin, nsamples, decimate)
513 def get_deltat(self):
514 return self._deltat
516 def _get_impl_reference(self, irecord, itmin, nsamples, decimate):
517 deltat = self.get_deltat()
519 if not (0 <= irecord < self._nrecords):
520 raise StoreError('invalid record number requested '
521 '(irecord = %i, nrecords = %i)' %
522 (irecord, self._nrecords))
524 ipos, itmin_data, nsamples_data, begin_value, end_value = \
525 self._records[irecord]
527 if None in (itmin, nsamples):
528 itmin = itmin_data
529 itmax = itmin_data + nsamples_data - 1
530 nsamples = nsamples_data
531 else:
532 itmin = itmin * decimate
533 nsamples = nsamples * decimate
534 itmax = itmin + nsamples - decimate
536 if ipos == 0:
537 return None
539 elif ipos == 1:
540 itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
541 return GFTrace(is_zero=True, itmin=itmin_ext//decimate)
543 if decimate == 1:
544 ilo = max(itmin, itmin_data) - itmin_data
545 ihi = min(itmin+nsamples, itmin_data+nsamples_data) - itmin_data
546 data = self._get_data(ipos, begin_value, end_value, ilo, ihi)
548 return GFTrace(data, itmin=itmin_data+ilo, deltat=deltat,
549 begin_value=begin_value, end_value=end_value)
551 else:
552 itmax_data = itmin_data + nsamples_data - 1
554 # put begin and end to multiples of new sampling rate
555 itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
556 itmax_ext = -((-min(itmax, itmax_data))//decimate) * decimate
557 nsamples_ext = itmax_ext - itmin_ext + 1
559 # add some padding for the aa filter
560 order = 30
561 itmin_ext_pad = itmin_ext - order//2
562 itmax_ext_pad = itmax_ext + order//2
563 nsamples_ext_pad = itmax_ext_pad - itmin_ext_pad + 1
565 itmin_overlap = max(itmin_data, itmin_ext_pad)
566 itmax_overlap = min(itmax_data, itmax_ext_pad)
568 ilo = itmin_overlap - itmin_ext_pad
569 ihi = max(ilo, itmax_overlap - itmin_ext_pad + 1)
570 ilo_data = itmin_overlap - itmin_data
571 ihi_data = max(ilo_data, itmax_overlap - itmin_data + 1)
573 data_ext_pad = num.empty(nsamples_ext_pad, dtype=gf_dtype)
574 data_ext_pad[ilo:ihi] = self._get_data(
575 ipos, begin_value, end_value, ilo_data, ihi_data)
577 data_ext_pad[:ilo] = begin_value
578 data_ext_pad[ihi:] = end_value
580 b = signal.firwin(order + 1, 1. / decimate, window='hamming')
581 a = 1.
582 data_filt_pad = signal.lfilter(b, a, data_ext_pad)
583 data_deci = data_filt_pad[order:order+nsamples_ext:decimate]
584 if data_deci.size >= 1:
585 if itmin_ext <= itmin_data:
586 data_deci[0] = begin_value
588 if itmax_ext >= itmax_data:
589 data_deci[-1] = end_value
591 return GFTrace(data_deci, itmin_ext//decimate,
592 deltat*decimate,
593 begin_value=begin_value, end_value=end_value)
595 def _get_span(self, irecord, decimate=1):
596 '''
597 Get temporal extent of GF trace at given index.
598 '''
600 if not self._f_index:
601 self.open()
603 assert 0 <= irecord < self._nrecords, \
604 'irecord = %i, nrecords = %i' % (irecord, self._nrecords)
606 (_, itmin, nsamples, _, _) = self._records[irecord]
608 itmax = itmin + nsamples - 1
610 if decimate == 1:
611 return itmin, itmax
612 else:
613 if nsamples == 0:
614 return itmin//decimate, itmin//decimate - 1
615 else:
616 return itmin//decimate, -((-itmax)//decimate)
618 def _put(self, irecord, trace):
619 '''
620 Save GF trace to storage.
621 '''
623 if not self._f_index:
624 self.open()
626 deltat = self.get_deltat()
628 assert self.mode == 'w'
629 assert trace.is_zero or \
630 abs(trace.deltat - deltat) < 1e-7 * deltat
631 assert 0 <= irecord < self._nrecords, \
632 'irecord = %i, nrecords = %i' % (irecord, self._nrecords)
634 if self._records[irecord][0] != 0:
635 raise DuplicateInsert('record %i already in store' % irecord)
637 if trace.is_zero or num.all(trace.data == 0.0):
638 self._records[irecord] = (1, trace.itmin, 0, 0., 0.)
639 return
641 ndata = trace.data.size
643 if ndata > 2:
644 self._f_data.seek(0, 2)
645 ipos = self._f_data.tell()
646 trace.data.astype(gf_dtype_store).tofile(self._f_data)
647 else:
648 ipos = 2
650 self._records[irecord] = (ipos, trace.itmin, ndata,
651 trace.data[0], trace.data[-1])
653 def _sum_impl_alternative(self, irecords, delays, weights, itmin, nsamples,
654 decimate):
656 '''
657 Sum delayed and weighted GF traces.
658 '''
660 if not self._f_index:
661 self.open()
663 assert self.mode == 'r'
665 deltat = self.get_deltat() * decimate
667 if len(irecords) == 0:
668 if None in (itmin, nsamples):
669 return Zero
670 else:
671 return GFTrace(
672 num.zeros(nsamples, dtype=gf_dtype), itmin,
673 deltat, is_zero=True)
675 assert len(irecords) == len(delays)
676 assert len(irecords) == len(weights)
678 if None in (itmin, nsamples):
679 itmin_delayed, itmax_delayed = [], []
680 for irecord, delay in zip(irecords, delays):
681 itmin, itmax = self._get_span(irecord, decimate=decimate)
682 itmin_delayed.append(itmin + delay/deltat)
683 itmax_delayed.append(itmax + delay/deltat)
685 itmin = int(math.floor(min(itmin_delayed)))
686 nsamples = int(math.ceil(max(itmax_delayed))) - itmin + 1
688 out = num.zeros(nsamples, dtype=gf_dtype)
689 if nsamples == 0:
690 return GFTrace(out, itmin, deltat)
692 for ii in range(len(irecords)):
693 irecord = irecords[ii]
694 delay = delays[ii]
695 weight = weights[ii]
697 if weight == 0.0:
698 continue
700 idelay_floor = int(math.floor(delay/deltat))
701 idelay_ceil = int(math.ceil(delay/deltat))
703 gf_trace = self._get(
704 irecord,
705 itmin - idelay_ceil,
706 nsamples + idelay_ceil - idelay_floor,
707 decimate,
708 'reference')
710 assert gf_trace.itmin >= itmin - idelay_ceil
711 assert gf_trace.data.size <= nsamples + idelay_ceil - idelay_floor
713 if gf_trace.is_zero:
714 continue
716 ilo = gf_trace.itmin - itmin + idelay_floor
717 ihi = ilo + gf_trace.data.size + (idelay_ceil-idelay_floor)
719 data = gf_trace.data
721 if idelay_floor == idelay_ceil:
722 out[ilo:ihi] += data * weight
723 else:
724 if data.size:
725 k = 1
726 if ihi <= nsamples:
727 out[ihi-1] += gf_trace.end_value * \
728 ((idelay_ceil-delay/deltat) * weight)
729 k = 0
731 out[ilo+1:ihi-k] += data[:data.size-k] * \
732 ((delay/deltat-idelay_floor) * weight)
733 k = 1
734 if ilo >= 0:
735 out[ilo] += gf_trace.begin_value * \
736 ((delay/deltat-idelay_floor) * weight)
737 k = 0
739 out[ilo+k:ihi-1] += data[k:] * \
740 ((idelay_ceil-delay/deltat) * weight)
742 if ilo > 0 and gf_trace.begin_value != 0.0:
743 out[:ilo] += gf_trace.begin_value * weight
745 if ihi < nsamples and gf_trace.end_value != 0.0:
746 out[ihi:] += gf_trace.end_value * weight
748 return GFTrace(out, itmin, deltat)
750 def _sum_impl_reference(self, irecords, delays, weights, itmin, nsamples,
751 decimate):
753 if not self._f_index:
754 self.open()
756 deltat = self.get_deltat() * decimate
758 if len(irecords) == 0:
759 if None in (itmin, nsamples):
760 return Zero
761 else:
762 return GFTrace(
763 num.zeros(nsamples, dtype=gf_dtype), itmin,
764 deltat, is_zero=True)
766 datas = []
767 itmins = []
768 for i, delay, weight in zip(irecords, delays, weights):
769 tr = self._get(i, None, None, decimate, 'reference')
771 idelay_floor = int(math.floor(delay/deltat))
772 idelay_ceil = int(math.ceil(delay/deltat))
774 if idelay_floor == idelay_ceil:
775 itmins.append(tr.itmin + idelay_floor)
776 datas.append(tr.data.copy()*weight)
777 else:
778 itmins.append(tr.itmin + idelay_floor)
779 datas.append(tr.data.copy()*weight*(idelay_ceil-delay/deltat))
780 itmins.append(tr.itmin + idelay_ceil)
781 datas.append(tr.data.copy()*weight*(delay/deltat-idelay_floor))
783 itmin_all = min(itmins)
785 itmax_all = max(itmin_ + data.size for (itmin_, data) in
786 zip(itmins, datas))
788 if itmin is not None:
789 itmin_all = min(itmin_all, itmin)
790 if nsamples is not None:
791 itmax_all = max(itmax_all, itmin+nsamples)
793 nsamples_all = itmax_all - itmin_all
795 arr = num.zeros((len(datas), nsamples_all), dtype=gf_dtype)
796 for i, itmin_, data in zip(num.arange(len(datas)), itmins, datas):
797 if data.size > 0:
798 ilo = itmin_-itmin_all
799 ihi = ilo + data.size
800 arr[i, :ilo] = data[0]
801 arr[i, ilo:ihi] = data
802 arr[i, ihi:] = data[-1]
804 sum_arr = arr.sum(axis=0)
806 if itmin is not None and nsamples is not None:
807 ilo = itmin-itmin_all
808 ihi = ilo + nsamples
809 sum_arr = sum_arr[ilo:ihi]
811 else:
812 itmin = itmin_all
814 return GFTrace(sum_arr, itmin, deltat)
816 def _optimize(self, irecords, delays, weights):
817 if num.unique(irecords).size == irecords.size:
818 return irecords, delays, weights
820 deltat = self.get_deltat()
822 delays = delays / deltat
823 irecords2 = num.repeat(irecords, 2)
824 delays2 = num.empty(irecords2.size, dtype=float)
825 delays2[0::2] = num.floor(delays)
826 delays2[1::2] = num.ceil(delays)
827 weights2 = num.repeat(weights, 2)
828 weights2[0::2] *= 1.0 - (delays - delays2[0::2])
829 weights2[1::2] *= (1.0 - (delays2[1::2] - delays)) * \
830 (delays2[1::2] - delays2[0::2])
832 delays2 *= deltat
834 iorder = num.lexsort((delays2, irecords2))
836 irecords2 = irecords2[iorder]
837 delays2 = delays2[iorder]
838 weights2 = weights2[iorder]
840 ui = num.empty(irecords2.size, dtype=bool)
841 ui[1:] = num.logical_or(num.diff(irecords2) != 0,
842 num.diff(delays2) != 0.)
844 ui[0] = 0
845 ind2 = num.cumsum(ui)
846 ui[0] = 1
847 ind1 = num.where(ui)[0]
849 irecords3 = irecords2[ind1]
850 delays3 = delays2[ind1]
851 weights3 = num.bincount(ind2, weights2)
853 return irecords3, delays3, weights3
855 def _sum(self, irecords, delays, weights, itmin, nsamples, decimate,
856 implementation, optimization):
858 if not self._f_index:
859 self.open()
861 t0 = time.time()
862 if optimization == 'enable':
863 irecords, delays, weights = self._optimize(
864 irecords, delays, weights)
865 else:
866 assert optimization == 'disable'
868 t1 = time.time()
869 deltat = self.get_deltat()
871 if implementation == 'c' and decimate == 1:
872 if delays.size != 0:
873 itoffset = int(num.floor(num.min(delays)/deltat))
874 else:
875 itoffset = 0
877 if nsamples is None:
878 nsamples = -1
880 if itmin is None:
881 itmin = 0
882 else:
883 itmin -= itoffset
885 try:
886 tr = GFTrace(*store_ext.store_sum(
887 self.cstore, irecords.astype(num.uint64),
888 delays - itoffset*deltat,
889 weights.astype(num.float32),
890 int(itmin), int(nsamples)))
892 tr.itmin += itoffset
894 except store_ext.StoreExtError as e:
895 raise StoreError(str(e) + ' in store %s' % self.store_dir)
897 elif implementation == 'alternative':
898 tr = self._sum_impl_alternative(irecords, delays, weights,
899 itmin, nsamples, decimate)
901 else:
902 tr = self._sum_impl_reference(irecords, delays, weights,
903 itmin, nsamples, decimate)
905 t2 = time.time()
907 tr.n_records_stacked = irecords.size
908 tr.t_optimize = t1 - t0
909 tr.t_stack = t2 - t1
911 return tr
913 def _sum_statics(self, irecords, delays, weights, it, ntargets,
914 nthreads):
915 if not self._f_index:
916 self.open()
918 return store_ext.store_sum_static(
919 self.cstore, irecords, delays, weights, it, ntargets, nthreads)
921 def _load_index(self):
922 if self._use_memmap:
923 records = num.memmap(
924 self._f_index, dtype=gf_record_dtype,
925 offset=gf_store_header_fmt_size,
926 mode=('r', 'r+')[self.mode == 'w'])
928 else:
929 self._f_index.seek(gf_store_header_fmt_size)
930 records = num.fromfile(self._f_index, dtype=gf_record_dtype)
932 assert len(records) == self._nrecords
934 self._records = records
936 def _save_index(self):
937 self._f_index.seek(0)
938 self._f_index.write(struct.pack(gf_store_header_fmt, self._nrecords,
939 self.get_deltat()))
941 if self._use_memmap:
942 self._records.flush()
943 else:
944 self._f_index.seek(gf_store_header_fmt_size)
945 self._records.tofile(self._f_index)
946 self._f_index.flush()
948 def _get_data(self, ipos, begin_value, end_value, ilo, ihi):
949 if ihi - ilo > 0:
950 if ipos == 2:
951 data_orig = num.empty(2, dtype=gf_dtype)
952 data_orig[0] = begin_value
953 data_orig[1] = end_value
954 return data_orig[ilo:ihi]
955 else:
956 self._f_data.seek(
957 int(ipos + ilo*gf_dtype_nbytes_per_sample))
958 arr = num.fromfile(
959 self._f_data, gf_dtype_store, ihi-ilo).astype(gf_dtype)
961 if arr.size != ihi-ilo:
962 raise ShortRead()
963 return arr
964 else:
965 return num.empty((0,), dtype=gf_dtype)
967 def lock_fn(self):
968 return BaseStore.lock_fn_(self.store_dir)
970 def index_fn(self):
971 return BaseStore.index_fn_(self.store_dir)
973 def data_fn(self):
974 return BaseStore.data_fn_(self.store_dir)
976 def config_fn(self):
977 return BaseStore.config_fn_(self.store_dir)
979 def count_special_records(self):
980 if not self._f_index:
981 self.open()
983 return num.histogram(
984 self._records['data_offset'],
985 bins=[0, 1, 2, 3, num.array(-1).astype(num.uint64)])[0]
987 @property
988 def size_index(self):
989 return os.stat(self.index_fn()).st_size
991 @property
992 def size_data(self):
993 return os.stat(self.data_fn()).st_size
995 @property
996 def size_index_and_data(self):
997 return self.size_index + self.size_data
999 @property
1000 def size_index_and_data_human(self):
1001 return util.human_bytesize(self.size_index_and_data)
1003 def stats(self):
1004 counter = self.count_special_records()
1006 stats = dict(
1007 total=self._nrecords,
1008 inserted=(counter[1] + counter[2] + counter[3]),
1009 empty=counter[0],
1010 short=counter[2],
1011 zero=counter[1],
1012 size_data=self.size_data,
1013 size_index=self.size_index,
1014 )
1016 return stats
1018 stats_keys = 'total inserted empty short zero size_data size_index'.split()
1021def remake_dir(dpath, force):
1022 if os.path.exists(dpath):
1023 if force:
1024 shutil.rmtree(dpath)
1025 else:
1026 raise CannotCreate('Directory "%s" already exists.' % dpath)
1028 os.mkdir(dpath)
1031class MakeTimingParamsFailed(StoreError):
1032 pass
1035class Store(BaseStore):
1037 '''
1038 Green's function disk storage and summation machine.
1040 The `Store` can be used to efficiently store, retrieve, and sum Green's
1041 function traces. A `Store` contains many 1D time traces sampled at even
1042 multiples of a global sampling rate, where each time trace has an
1043 individual start and end time. The traces are treated as having repeating
1044 end points, so the functions they represent can be non-constant only
1045 between begin and end time. It provides capabilities to retrieve decimated
1046 traces and to extract parts of the traces. The main purpose of this class
    is to provide a fast, easy to use, and flexible mechanism to compute
1048 weighted delay-and-sum stacks with many Green's function traces involved.
1050 Individual Green's functions are accessed through a single integer index at
1051 low level. On top of that, various indexing schemes can be implemented by
1052 providing a mapping from physical coordinates to the low level index `i`.
1053 E.g. for a problem with cylindrical symmetry, one might define a mapping
1054 from the coordinates (`receiver_depth`, `source_depth`, `distance`) to the
1055 low level index. Index translation is done in the
1056 :py:class:`pyrocko.gf.meta.Config` subclass object associated with the
1057 Store. It is accessible through the store's :py:attr:`config` attribute,
1058 and contains all meta information about the store, including physical
1059 extent, geometry, sampling rate, and information about the type of the
1060 stored Green's functions. Information about the underlying earth model
1061 can also be found there.
    A GF store can also contain tabulated phase arrivals. In basic cases,
    these can be created with the :py:meth:`make_travel_time_tables` method
    and evaluated with the :py:meth:`t` method.
1067 .. attribute:: config
1069 The :py:class:`pyrocko.gf.meta.Config` derived object associated with
1070 the store which contains all meta information about the store and
1071 provides the high-level to low-level index mapping.
1073 .. attribute:: store_dir
1075 Path to the store's data directory.
1077 .. attribute:: mode
1079 The mode in which the store is opened: ``'r'``: read-only, ``'w'``:
1080 writeable.
1081 '''
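
    # Typical read-side usage sketch (``store_dir`` is an assumed path to an
    # existing type A store; index values and the phase name are
    # hypothetical):
    #
    #   store = Store(store_dir)
    #   tr = store.get((10*km, 1500*km, 0))              # one GF trace
    #   t_p = store.t('stored:any_P', (10*km, 1500*km))  # arrival time [s]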
1083 @classmethod
1084 def create(cls, store_dir, config, force=False, extra=None):
1085 '''
1086 Create new GF store.
1088 Creates a new GF store at path ``store_dir``. The layout of the GF is
1089 defined with the parameters given in ``config``, which should be an
1090 object of a subclass of :py:class:`~pyrocko.gf.meta.Config`. This
1091 function will refuse to overwrite an existing GF store, unless
1092 ``force`` is set to ``True``. If more information, e.g. parameters
1093 used for the modelling code, earth models or other, should be saved
        along with the GF store, these may be provided through a dict given to
1095 ``extra``. The keys of this dict must be names and the values must be
1096 *guts* type objects.
1098 :param store_dir: GF store path
1099 :type store_dir: str
1100 :param config: GF store Config
1101 :type config: :py:class:`~pyrocko.gf.meta.Config`
1102 :param force: Force overwrite, defaults to ``False``
1103 :type force: bool
1104 :param extra: Extra information
1105 :type extra: dict or None
1106 '''
1108 cls.create_editables(store_dir, config, force=force, extra=extra)
1109 cls.create_dependants(store_dir, force=force)
1111 return cls(store_dir)
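
    # Creation sketch (assumes a :py:class:`~pyrocko.gf.meta.ConfigTypeA`
    # instance ``config`` has been set up; the path is hypothetical):
    #
    #   Store.create('/tmp/my_gf_store', config, force=True)
    #   store = Store('/tmp/my_gf_store', 'w')  # reopen writable for filling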
1113 @staticmethod
1114 def create_editables(store_dir, config, force=False, extra=None):
1115 try:
1116 util.ensuredir(store_dir)
1117 except Exception:
1118 raise CannotCreate('cannot create directory %s' % store_dir)
1120 fns = []
1122 config_fn = os.path.join(store_dir, 'config')
1123 remove_if_exists(config_fn, force)
1124 meta.dump(config, filename=config_fn)
1126 fns.append(config_fn)
1128 for sub_dir in ['extra']:
1129 dpath = os.path.join(store_dir, sub_dir)
1130 remake_dir(dpath, force)
1132 if extra:
1133 for k, v in extra.items():
1134 fn = get_extra_path(store_dir, k)
1135 remove_if_exists(fn, force)
1136 meta.dump(v, filename=fn)
1138 fns.append(fn)
1140 return fns
1142 @staticmethod
1143 def create_dependants(store_dir, force=False):
1144 config_fn = os.path.join(store_dir, 'config')
1145 config = meta.load(filename=config_fn)
1147 BaseStore.create(store_dir, config.deltat, config.nrecords,
1148 force=force)
1150 for sub_dir in ['decimated']:
1151 dpath = os.path.join(store_dir, sub_dir)
1152 remake_dir(dpath, force)
1154 def __init__(self, store_dir, mode='r', use_memmap=True):
1155 BaseStore.__init__(self, store_dir, mode=mode, use_memmap=use_memmap)
1156 config_fn = self.config_fn()
1157 if not os.path.isfile(config_fn):
1158 raise StoreError(
1159 'directory "%s" does not seem to contain a GF store '
1160 '("config" file not found)' % store_dir)
1161 self.load_config()
1163 self._decimated = {}
1164 self._extra = {}
1165 self._phases = {}
1166 for decimate in range(2, 9):
1167 if os.path.isdir(self._decimated_store_dir(decimate)):
1168 self._decimated[decimate] = None
1170 def open(self):
1171 if not self._f_index:
1172 BaseStore.open(self)
1173 c = self.config
1175 mscheme = 'type_' + c.short_type.lower()
1176 store_ext.store_mapping_init(
1177 self.cstore, mscheme,
1178 c.mins, c.maxs, c.deltas, c.ns.astype(num.uint64),
1179 c.ncomponents)
1181 def save_config(self, make_backup=False):
1182 config_fn = self.config_fn()
1183 if make_backup:
1184 os.rename(config_fn, config_fn + '~')
1186 meta.dump(self.config, filename=config_fn)
1188 def get_deltat(self):
1189 return self.config.deltat
1191 def load_config(self):
1192 self.config = meta.load(filename=self.config_fn())
1194 def ensure_reference(self, force=False):
1195 if self.config.reference is not None and not force:
1196 return
1197 self.ensure_uuid()
1198 reference = '%s-%s' % (self.config.id, self.config.uuid[0:6])
1200 if self.config.reference is not None:
1201 self.config.reference = reference
1202 self.save_config()
1203 else:
1204 with open(self.config_fn(), 'a') as config:
1205 config.write('reference: %s\n' % reference)
1206 self.load_config()
1208 def ensure_uuid(self, force=False):
1209 if self.config.uuid is not None and not force:
1210 return
1211 uuid = self.create_store_hash()
1213 if self.config.uuid is not None:
1214 self.config.uuid = uuid
1215 self.save_config()
1216 else:
1217 with open(self.config_fn(), 'a') as config:
1218 config.write('\nuuid: %s\n' % uuid)
1219 self.load_config()
1221 def create_store_hash(self):
1222 logger.info('creating hash for store ...')
1223 m = hashlib.sha1()
1225 traces_size = op.getsize(self.data_fn())
1226 with open(self.data_fn(), 'rb') as traces:
1227 while traces.tell() < traces_size:
1228 m.update(traces.read(4096))
1229 traces.seek(1024 * 1024 * 10, 1)
1231 with open(self.config_fn(), 'rb') as config:
1232 m.update(config.read())
1233 return m.hexdigest()
1235 def get_extra_path(self, key):
1236 return get_extra_path(self.store_dir, key)
1238 def get_extra(self, key):
1239 '''
1240 Get extra information stored under given key.
1241 '''
1243 x = self._extra
1244 if key not in x:
1245 fn = self.get_extra_path(key)
1246 if not os.path.exists(fn):
1247 raise NoSuchExtra(key)
1249 x[key] = meta.load(filename=fn)
1251 return x[key]
1253 def upgrade(self):
1254 '''
1255 Upgrade store config and files to latest Pyrocko version.
1256 '''
1257 fns = [os.path.join(self.store_dir, 'config')]
1258 for key in self.extra_keys():
1259 fns.append(self.get_extra_path(key))
1261 i = 0
1262 for fn in fns:
1263 i += util.pf_upgrade(fn)
1265 return i
1267 def extra_keys(self):
1268 return [x for x in os.listdir(os.path.join(self.store_dir, 'extra'))
1269 if valid_string_id(x)]
1271 def put(self, args, trace):
1272 '''
1273 Insert trace into GF store.
1275 Store a single GF trace at (high-level) index ``args``.
1277 :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1278 ``(source_depth, distance, component)`` as in
1279 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1280 :type args: tuple
        :param trace: GF trace to be stored
        :type trace: :py:class:`~pyrocko.gf.store.GFTrace`
1283 '''
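        # Filling sketch (hypothetical type A index and data array
        # ``samples``; the store must have been opened with mode='w'):
        #
        #   store.put(
        #       (10*km, 1500*km, 0),
        #       GFTrace(data=samples, itmin=0, deltat=store.config.deltat))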
1285 irecord = self.config.irecord(*args)
1286 self._put(irecord, trace)
1288 def get_record(self, args):
1289 irecord = self.config.irecord(*args)
1290 return self._get_record(irecord)
1292 def str_irecord(self, args):
1293 return BaseStore.str_irecord(self, self.config.irecord(*args))
1295 def get(self, args, itmin=None, nsamples=None, decimate=1,
1296 interpolation='nearest_neighbor', implementation='c'):
1297 '''
1298 Retrieve GF trace from store.
1300 Retrieve a single GF trace from the store at (high-level) index
1301 ``args``. By default, the full trace is retrieved. Given ``itmin`` and
1302 ``nsamples``, only the selected portion of the trace is extracted. If
1303 ``decimate`` is an integer in the range [2,8], the trace is decimated
1304 on the fly or, if available, the trace is read from a decimated version
1305 of the GF store.
1307 :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1308 ``(source_depth, distance, component)`` as in
1309 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1310 :type args: tuple
1311 :param itmin: Start time index (start time is ``itmin * dt``),
1312 defaults to None
1313 :type itmin: int or None
1314 :param nsamples: Number of samples, defaults to None
1315 :type nsamples: int or None
        :param decimate: Decimation factor, defaults to 1
1317 :type decimate: int
1318 :param interpolation: Interpolation method
1319 ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
1320 ``'nearest_neighbor'``
1321 :type interpolation: str
1322 :param implementation: Implementation to use ``['c', 'reference']``,
1323 defaults to ``'c'``.
1324 :type implementation: str
1326 :returns: GF trace at ``args``
1327 :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
1328 '''
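        # Retrieval sketch (hypothetical type A index; ``itmin`` and
        # ``nsamples`` select a sub-span in units of the store's sampling
        # interval):
        #
        #   tr = store.get((10*km, 1500*km, 0), itmin=0, nsamples=500,
        #                  interpolation='nearest_neighbor')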
        store, decimate_ = self._decimated_store(decimate)
        if interpolation == 'nearest_neighbor':
            irecord = store.config.irecord(*args)
            tr = store._get(irecord, itmin, nsamples, decimate_,
                            implementation)

        elif interpolation == 'off':
            irecords, weights = store.config.vicinity(*args)
            if len(irecords) != 1:
                raise NotAllowedToInterpolate()
            else:
                tr = store._get(irecords[0], itmin, nsamples, decimate_,
                                implementation)

        elif interpolation == 'multilinear':
            irecords, weights = store.config.vicinity(*args)
            tr = store._sum(irecords, num.zeros(len(irecords)), weights,
                            itmin, nsamples, decimate_, implementation,
                            'disable')
1349 # to prevent problems with rounding errors (BaseStore saves deltat
1350 # as a 4-byte floating point value, value from YAML config is more
1351 # accurate)
1352 tr.deltat = self.config.deltat * decimate
1353 return tr
1355 def sum(self, args, delays, weights, itmin=None, nsamples=None,
1356 decimate=1, interpolation='nearest_neighbor', implementation='c',
1357 optimization='enable'):
1358 '''
1359 Sum delayed and weighted GF traces.
1361 Calculate sum of delayed and weighted GF traces. ``args`` is a tuple of
1362 arrays forming the (high-level) indices of the GF traces to be
1363 selected. Delays and weights for the summation are given in the arrays
1364 ``delays`` and ``weights``. ``itmin`` and ``nsamples`` can be given to
        restrict the computation to a given time interval. If ``decimate`` is
1366 an integer in the range [2,8], decimated traces are used in the
1367 summation.
1369 :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1370 ``(source_depth, distance, component)`` as in
1371 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1372 :type args: tuple(numpy.ndarray)
1373 :param delays: Delay times
1374 :type delays: :py:class:`numpy.ndarray`
1375 :param weights: Trace weights
1376 :type weights: :py:class:`numpy.ndarray`
1377 :param itmin: Start time index (start time is ``itmin * dt``),
1378 defaults to None
1379 :type itmin: int or None
1380 :param nsamples: Number of samples, defaults to None
1381 :type nsamples: int or None
        :param decimate: Decimation factor, defaults to 1
1383 :type decimate: int
1384 :param interpolation: Interpolation method
1385 ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
1386 ``'nearest_neighbor'``
1387 :type interpolation: str
1388 :param implementation: Implementation to use,
1389 ``['c', 'alternative', 'reference']``, where ``'alternative'``
1390 and ``'reference'`` use a Python implementation, defaults to `'c'`
1391 :type implementation: str
1392 :param optimization: Optimization mode ``['enable', 'disable']``,
1393 defaults to ``'enable'``
1394 :type optimization: str
1395 :returns: Stacked GF trace.
1396 :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
1397 '''
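        # Stacking sketch (``depths``, ``distances``, ``components``,
        # ``delays`` and ``weights`` are hypothetical arrays of equal
        # length):
        #
        #   tr = store.sum(
        #       (depths, distances, components), delays, weights,
        #       interpolation='nearest_neighbor')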
1399 store, decimate_ = self._decimated_store(decimate)
1401 if interpolation == 'nearest_neighbor':
1402 irecords = store.config.irecords(*args)
1403 else:
1404 assert interpolation == 'multilinear'
1405 irecords, ip_weights = store.config.vicinities(*args)
1406 neach = irecords.size // args[0].size
1407 weights = num.repeat(weights, neach) * ip_weights
1408 delays = num.repeat(delays, neach)
1410 tr = store._sum(irecords, delays, weights, itmin, nsamples, decimate_,
1411 implementation, optimization)
1413 # to prevent problems with rounding errors (BaseStore saves deltat
1414 # as a 4-byte floating point value, value from YAML config is more
1415 # accurate)
1416 tr.deltat = self.config.deltat * decimate
1417 return tr
1419 def make_decimated(self, decimate, config=None, force=False,
1420 show_progress=False):
1421 '''
1422 Create decimated version of GF store.
1424 Create a downsampled version of the GF store. Downsampling is done for
1425 the integer factor ``decimate`` which should be in the range [2,8]. If
1426 ``config`` is ``None``, all traces of the GF store are decimated and
1427 held available (i.e. the index mapping of the original store is used),
        otherwise, a different spatial stepping can be specified by giving a
1429 modified GF store configuration in ``config`` (see :py:meth:`create`).
1430 Decimated GF sub-stores are created under the ``decimated``
1431 subdirectory within the GF store directory. Holding available decimated
1432 versions of the GF store can save computation time, IO bandwidth, or
1433 decrease memory footprint at the cost of increased disk space usage,
        when computations are done for lower frequency signals.
1436 :param decimate: Decimate factor
1437 :type decimate: int
1438 :param config: GF store config object, defaults to None
1439 :type config: :py:class:`~pyrocko.gf.meta.Config` or None
1440 :param force: Force overwrite, defaults to ``False``
1441 :type force: bool
1442 :param show_progress: Show progress, defaults to ``False``
1443 :type show_progress: bool
1444 '''
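        # Sketch: prepare 2x and 4x decimated sub-stores, so that later calls
        # with ``decimate=2`` or ``decimate=4`` read them directly instead of
        # filtering on the fly:
        #
        #   store.make_decimated(2)
        #   store.make_decimated(4, show_progress=True)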
1446 if not self._f_index:
1447 self.open()
1449 if not (2 <= decimate <= 8):
1450 raise StoreError('decimate argument must be in the range [2,8]')
1452 assert self.mode == 'r'
1454 if config is None:
1455 config = self.config
1457 config = copy.deepcopy(config)
1458 config.sample_rate = self.config.sample_rate / decimate
1460 if decimate in self._decimated:
1461 del self._decimated[decimate]
1463 store_dir = self._decimated_store_dir(decimate)
1464 if os.path.exists(store_dir):
1465 if force:
1466 shutil.rmtree(store_dir)
1467 else:
1468 raise CannotCreate('store already exists at %s' % store_dir)
1470 store_dir_incomplete = store_dir + '-incomplete'
1471 Store.create(store_dir_incomplete, config, force=force)
1473 decimated = Store(store_dir_incomplete, 'w')
1474 try:
1475 if show_progress:
1476 pbar = util.progressbar(
1477 'decimating store', self.config.nrecords)
1479 for i, args in enumerate(decimated.config.iter_nodes()):
1480 tr = self.get(args, decimate=decimate)
1481 decimated.put(args, tr)
1483 if show_progress:
1484 pbar.update(i+1)
1486 finally:
1487 if show_progress:
1488 pbar.finish()
1490 decimated.close()
1492 shutil.move(store_dir_incomplete, store_dir)
1494 self._decimated[decimate] = None
1496 def stats(self):
1497 stats = BaseStore.stats(self)
1498 stats['decimated'] = sorted(self._decimated.keys())
1499 return stats
1501 stats_keys = BaseStore.stats_keys + ['decimated']
1503 def check(self, show_progress=False):
1504 have_holes = []
1505 for pdef in self.config.tabulated_phases:
1506 phase_id = pdef.id
1507 ph = self.get_stored_phase(phase_id)
1508 if ph.check_holes():
1509 have_holes.append(phase_id)
1511 if have_holes:
1512 for phase_id in have_holes:
1513 logger.warning(
1514 'Travel time table of phase "{}" contains holes. You can '
                    'use `fomosto tttlsd` to fix holes.'.format(
1516 phase_id))
1517 else:
1518 logger.info('No holes in travel time tables')
1520 try:
1521 if show_progress:
1522 pbar = util.progressbar('checking store', self.config.nrecords)
1524 problems = 0
1525 for i, args in enumerate(self.config.iter_nodes()):
1526 tr = self.get(args)
1527 if tr and not tr.is_zero:
1528 if not tr.begin_value == tr.data[0]:
1529 logger.warning('wrong begin value for trace at %s '
1530 '(data corruption?)' % str(args))
1531 problems += 1
1532 if not tr.end_value == tr.data[-1]:
1533 logger.warning('wrong end value for trace at %s '
1534 '(data corruption?)' % str(args))
1535 problems += 1
1536 if not num.all(num.isfinite(tr.data)):
1537 logger.warning(
1538 'nans or infs in trace at %s' % str(args))
1539 problems += 1
1541 if show_progress:
1542 pbar.update(i+1)
1544 finally:
1545 if show_progress:
1546 pbar.finish()
1548 return problems
1550 def check_earthmodels(self, config):
1551 if config.earthmodel_receiver_1d.profile('z')[-1] not in\
1552 config.earthmodel_1d.profile('z'):
1553 logger.warning('deepest layer of earthmodel_receiver_1d not '
1554 'found in earthmodel_1d')
1556 def _decimated_store_dir(self, decimate):
1557 return os.path.join(self.store_dir, 'decimated', str(decimate))
1559 def _decimated_store(self, decimate):
1560 if decimate == 1 or decimate not in self._decimated:
1561 return self, decimate
1562 else:
1563 store = self._decimated[decimate]
1564 if store is None:
1565 store = Store(self._decimated_store_dir(decimate), 'r')
1566 self._decimated[decimate] = store
1568 return store, 1
1570 def phase_filename(self, phase_id, attribute='phase'):
1571 check_string_id(phase_id)
1572 return os.path.join(
1573 self.store_dir, 'phases', phase_id + '.%s' % attribute)
1575 def get_phase_identifier(self, phase_id, attribute):
1576 return '{}.{}'.format(phase_id, attribute)
1578 def get_stored_phase(self, phase_def, attribute='phase'):
1579 '''
1580 Get stored phase from GF store.
1582 :returns: Phase information
1583 :rtype: :py:class:`pyrocko.spit.SPTree`
1584 '''
1586 phase_id = self.get_phase_identifier(phase_def, attribute)
1587 if phase_id not in self._phases:
1588 fn = self.phase_filename(phase_def, attribute)
1589 if not os.path.isfile(fn):
1590 raise NoSuchPhase(phase_id)
1592 spt = spit.SPTree(filename=fn)
1593 self._phases[phase_id] = spt
1595 return self._phases[phase_id]
1597 def get_phase(self, phase_def):
1598 toks = phase_def.split(':', 1)
1599 if len(toks) == 2:
1600 provider, phase_def = toks
1601 else:
1602 provider, phase_def = 'stored', toks[0]
1604 if provider == 'stored':
1605 return self.get_stored_phase(phase_def)
1607 elif provider == 'vel':
1608 vel = float(phase_def) * 1000.
1610 def evaluate(args):
1611 return self.config.get_distance(args) / vel
1613 return evaluate
1615 elif provider == 'vel_surface':
1616 vel = float(phase_def) * 1000.
1618 def evaluate(args):
1619 return self.config.get_surface_distance(args) / vel
1621 return evaluate
1623 elif provider in ('cake', 'iaspei'):
1624 from pyrocko import cake
1625 mod = self.config.earthmodel_1d
1626 if provider == 'cake':
1627 phases = [cake.PhaseDef(phase_def)]
1628 else:
1629 phases = cake.PhaseDef.classic(phase_def)
1631 def evaluate(args):
1632 if len(args) == 2:
1633 zr, zs, x = (self.config.receiver_depth,) + args
1634 elif len(args) == 3:
1635 zr, zs, x = args
1636 else:
1637 assert False
1639 t = []
1640 if phases:
1641 rays = mod.arrivals(
1642 phases=phases,
1643 distances=[x*cake.m2d],
1644 zstart=zs,
1645 zstop=zr)
1647 for ray in rays:
1648 t.append(ray.t)
1650 if t:
1651 return min(t)
1652 else:
1653 return None
1655 return evaluate
1657 raise StoreError('unsupported phase provider: %s' % provider)
1659 def t(self, timing, *args):
1660 '''
1661 Compute interpolated phase arrivals.
1663 **Examples:**
1665 If ``test_store`` is a :py:class:`Type A<pyrocko.gf.meta.ConfigTypeA>`
1666 store::
1668 test_store.t('stored:p', (1000, 10000))
1669 test_store.t('last{stored:P|stored:Pdiff}', (1000, 10000))
1670 # The latter arrival
1671 # of P or diffracted
1672 # P phase
1674 If ``test_store`` is a :py:class:`Type B<pyrocko.gf.meta.ConfigTypeB>`
1675 store::
1677 test_store.t('S', (1000, 1000, 10000))
1678 test_store.t('first{P|p|Pdiff|sP}', (1000, 1000, 10000))
1679 # The first arrival of
1680 # the given phases is
1681 # selected
1683 Independent of the store type, it is also possible to specify two
1684 location objects and the GF index tuple is calculated internally::
1686 test_store.t('p', source, target)
1688 :param timing: travel-time definition
1689 :type timing: str or :py:class:`~pyrocko.gf.meta.Timing`
1690 :param \\*args: if ``len(args) == 1``, ``args[0]`` must be a
1691 :py:class:`GF index tuple <pyrocko.gf.meta.Config>`, e.g.
1692 ``(source_depth, distance, component)`` for a
1693 :py:class:`Type A<pyrocko.gf.meta.ConfigTypeA>` store. If
1694 ``len(args) == 2``, two location objects are expected, e.g.
1695 ``(source, receiver)`` and the appropriate GF index is computed
1696 internally.
1697 :type \\*args: (:py:class:`tuple`,) or
1698 (:py:class:`~pyrocko.model.Location`,
1699 :py:class:`~pyrocko.model.Location`)
1700 :returns: Phase arrival according to ``timing``
1701 :rtype: float or None
1702 '''
1704 if len(args) == 1:
1705 args = args[0]
1706 else:
1707 args = self.config.make_indexing_args1(*args)
1709 if not isinstance(timing, meta.Timing):
1710 timing = meta.Timing(timing)
1712 return timing.evaluate(self.get_phase, args)
1714 def get_available_interpolation_tables(self):
1715 filenames = glob(op.join(self.store_dir, 'phases', '*'))
1716 return [op.basename(file) for file in filenames]
1718 def get_stored_attribute(self, phase_def, attribute, *args):
1719 '''
        Return interpolated store attribute.
1722 :param attribute: takeoff_angle / incidence_angle [deg]
1723 :type attribute: str
1724 :param \\*args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
1725 ``(source_depth, distance, component)`` as in
1726 :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
1727 :type \\*args: tuple
1728 '''
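        # Sketch (assumes the 'takeoff_angle' table for the hypothetical
        # phase group 'any_P' has been created with
        # make_takeoff_angle_tables(); index values are hypothetical):
        #
        #   toa = store.get_stored_attribute(
        #       'any_P', 'takeoff_angle', (10*km, 1500*km))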
1729 try:
1730 return self.get_stored_phase(phase_def, attribute)(*args)
1731 except NoSuchPhase:
1732 raise StoreError(
1733 'Interpolation table for {} of {} does not exist! '
1734 'Available tables: {}'.format(
1735 attribute, phase_def,
1736 self.get_available_interpolation_tables()))
1738 def get_many_stored_attributes(self, phase_def, attribute, coords):
1739 '''
        Return interpolated store attributes at multiple coordinates.

        :param attribute: takeoff_angle / incidence_angle [deg]
        :type attribute: str
        :param coords: array with columns being ``(source_depth, distance,
            component)`` as in :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type coords: :py:class:`numpy.ndarray`
1748 '''
1749 try:
1750 return self.get_stored_phase(
1751 phase_def, attribute).interpolate_many(coords)
1752 except NoSuchPhase:
1753 raise StoreError(
1754 'Interpolation table for {} of {} does not exist! '
1755 'Available tables: {}'.format(
1756 attribute, phase_def,
1757 self.get_available_interpolation_tables()))
1759 def make_stored_table(self, attribute, force=False):
1760 '''
1761 Compute tables for selected ray attributes.
1763 :param attribute: phase / takeoff_angle [deg]/ incidence_angle [deg]
1764 :type attribute: str
1766 Tables are computed using the 1D earth model defined in
1767 :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined phase
1768 in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
1769 '''
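        # Sketch: tables for all phases defined in
        # ``config.tabulated_phases`` can be (re)built like this; the
        # convenience wrappers below (e.g. make_travel_time_tables()) call
        # this method with the appropriate attribute:
        #
        #   store.make_stored_table('phase', force=True)
        #   store.make_stored_table('takeoff_angle')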
1771 if attribute not in available_stored_tables:
1772 raise StoreError(
1773 'Cannot create attribute table for {}! '
1774 'Supported attribute tables: {}'.format(
1775 attribute, available_stored_tables))
1777 from pyrocko import cake
1778 config = self.config
1780 if not config.tabulated_phases:
1781 return
1783 mod = config.earthmodel_1d
1785 if not mod:
1786 raise StoreError('no earth model found')
1788 if config.earthmodel_receiver_1d:
1789 self.check_earthmodels(config)
1791 for pdef in config.tabulated_phases:
1793 phase_id = pdef.id
1794 phases = pdef.phases
1796 if attribute == 'phase':
1797 ftol = config.deltat * 0.5
1798 horvels = pdef.horizontal_velocities
1799 else:
1800 ftol = config.deltat * 0.01
1802 fn = self.phase_filename(phase_id, attribute)
1804 if os.path.exists(fn) and not force:
1805 logger.info('file already exists: %s' % fn)
1806 continue
1808 def evaluate(args):
1810 nargs = len(args)
1811 if nargs == 2:
1812 receiver_depth, source_depth, distance = (
1813 config.receiver_depth,) + args
1814 elif nargs == 3:
1815 receiver_depth, source_depth, distance = args
1816 else:
1817 raise ValueError(
                        'Number of input arguments %i is not supported! '
1819 'Supported number of arguments: 2 or 3' % nargs)
1821 ray_attribute_values = []
1822 arrival_times = []
1823 if phases:
1824 rays = mod.arrivals(
1825 phases=phases,
1826 distances=[distance * cake.m2d],
1827 zstart=source_depth,
1828 zstop=receiver_depth)
1830 for ray in rays:
1831 arrival_times.append(ray.t)
1832 if attribute != 'phase':
1833 ray_attribute_values.append(
1834 getattr(ray, attribute)())
1836 if attribute == 'phase':
1837 for v in horvels:
1838 arrival_times.append(distance / (v * km))
1840 if arrival_times:
1841 if attribute == 'phase':
1842 return min(arrival_times)
1843 else:
1844 earliest_idx = num.argmin(arrival_times)
1845 return ray_attribute_values[earliest_idx]
1846 else:
1847 return None
1849 logger.info(
1850 'making "%s" table for phasegroup "%s"', attribute, phase_id)
1852 ip = spit.SPTree(
1853 f=evaluate,
1854 ftol=ftol,
1855 xbounds=num.transpose((config.mins, config.maxs)),
1856 xtols=config.deltas)
1858 util.ensuredirs(fn)
1859 ip.dump(fn)
1861 def make_timing_params(self, begin, end, snap_vred=True, force=False):
1862 '''
1863 Compute tight parameterized time ranges to include given timings.
1865 Calculates appropriate time ranges to cover given begin and end timings
1866 over all GF points in the store. A dict with the following keys is
1867 returned:
1869 * ``'tmin'``: time [s], minimum of begin timing over all GF points
1870 * ``'tmax'``: time [s], maximum of end timing over all GF points
1871 * ``'vred'``, ``'tmin_vred'``: slope [m/s] and offset [s] of reduction
1872 velocity [m/s] appropriate to catch begin timing over all GF points
1873 * ``'tlenmax_vred'``: maximum time length needed to cover all end
1874 timings, when using linear slope given with (``vred``, ``tmin_vred``)
1875 as start
1876 '''
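        # Sketch (the phase definitions are hypothetical; any timing strings
        # accepted by t() can be used):
        #
        #   p = store.make_timing_params(
        #       'stored:any_P', 'stored:any_S', snap_vred=True)
        #   # p['tmin'], p['tmax'], p['vred'], p['tmin_vred'], p['tlenmax_vred']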
1878 data = []
1879 warned = set()
1880 for args in self.config.iter_nodes(level=-1):
1881 tmin = self.t(begin, args)
1882 tmax = self.t(end, args)
1883 if tmin is None:
1884 warned.add(str(begin))
1885 if tmax is None:
1886 warned.add(str(end))
1888 x = self.config.get_surface_distance(args)
1889 data.append((x, tmin, tmax))
1891 if len(warned):
1892 w = ' | '.join(list(warned))
1893 msg = '''determination of time window failed using phase
1894definitions: %s.\n Travel time table contains holes in probed ranges. You can
1895use `fomosto tttlsd` to fix holes.''' % w
1896 if force:
1897 logger.warning(msg)
1898 else:
1899 raise MakeTimingParamsFailed(msg)
1901 xs, tmins, tmaxs = num.array(data, dtype=float).T
1903 tlens = tmaxs - tmins
1905 i = num.nanargmin(tmins)
1906 if not num.isfinite(i):
1907 raise MakeTimingParamsFailed('determination of time window failed')
1909 tminmin = tmins[i]
1910 x_tminmin = xs[i]
1911 dx = (xs - x_tminmin)
1912 dx = num.where(dx != 0.0, dx, num.nan)
1913 s = (tmins - tminmin) / dx
1914 sred = num.min(num.abs(s[num.isfinite(s)]))
1916 deltax = self.config.distance_delta
1918 if snap_vred:
1919 tdif = sred*deltax
1920 tdif2 = self.config.deltat * math.floor(tdif / self.config.deltat)
1921 sred = tdif2/self.config.distance_delta
1923 tmin_vred = tminmin - sred*x_tminmin
1924 if snap_vred:
1925 xe = x_tminmin - int(x_tminmin / deltax) * deltax
1926 tmin_vred = float(
1927 self.config.deltat *
1928 math.floor(tmin_vred / self.config.deltat) - xe * sred)
        tlenmax_vred = num.nanmax(tmaxs - (tmin_vred + sred*xs))
1931 if sred != 0.0:
1932 vred = 1.0/sred
1933 else:
1934 vred = 0.0
1936 return dict(
1937 tmin=tminmin,
1938 tmax=num.nanmax(tmaxs),
1939 tlenmax=num.nanmax(tlens),
1940 tmin_vred=tmin_vred,
1941 tlenmax_vred=tlenmax_vred,
1942 vred=vred)
1944 def make_travel_time_tables(self, force=False):
1945 '''
1946 Compute travel time tables.
1948 Travel time tables are computed using the 1D earth model defined in
1949 :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined phase
1950 in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`. The accuracy of
        the tabulated times is adjusted to the sampling rate of the store.
1952 '''
1953 self.make_stored_table(attribute='phase', force=force)
1955 def make_ttt(self, force=False):
1956 self.make_travel_time_tables(force=force)
1958 def make_takeoff_angle_tables(self, force=False):
1959 '''
1960 Compute takeoff-angle tables.
1962 Takeoff-angle tables [deg] are computed using the 1D earth model
1963 defined in :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each
1964 defined phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
        The accuracy of the tabulated times is adjusted to 0.01 times the
1966 sampling rate of the store.
1967 '''
1968 self.make_stored_table(attribute='takeoff_angle', force=force)
1970 def make_incidence_angle_tables(self, force=False):
1971 '''
1972 Compute incidence-angle tables.
1974 Incidence-angle tables [deg] are computed using the 1D earth model
1975 defined in :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each
1976 defined phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`.
        The accuracy of the tabulated times is adjusted to 0.01 times the
1978 sampling rate of the store.
1979 '''
1980 self.make_stored_table(attribute='incidence_angle', force=force)
1982 def get_provided_components(self):
1984 scheme_desc = meta.component_scheme_to_description[
1985 self.config.component_scheme]
1987 quantity = self.config.stored_quantity \
1988 or scheme_desc.default_stored_quantity
1990 if not quantity:
1991 return scheme_desc.provided_components
1992 else:
1993 return [
1994 quantity + '.' + comp
1995 for comp in scheme_desc.provided_components]
1997 def fix_ttt_holes(self, phase_id):
1999 pdef = self.config.get_tabulated_phase(phase_id)
2000 mode = None
2001 for phase in pdef.phases:
2002 for leg in phase.legs():
2003 if mode is None:
2004 mode = leg.mode
2006 else:
2007 if mode != leg.mode:
2008 raise StoreError(
2009 'Can only fix holes in pure P or pure S phases.')
2011 sptree = self.get_stored_phase(phase_id)
2012 sptree_lsd = self.config.fix_ttt_holes(sptree, mode)
2014 phase_lsd = phase_id + '.lsd'
2015 fn = self.phase_filename(phase_lsd)
2016 sptree_lsd.dump(fn)
2018 def statics(self, source, multi_location, itsnapshot, components,
2019 interpolation='nearest_neighbor', nthreads=0):
2020 if not self._f_index:
2021 self.open()
2023 out = {}
2024 ntargets = multi_location.ntargets
2025 source_terms = source.get_source_terms(self.config.component_scheme)
2026 # TODO: deal with delays for snapshots > 1 sample
2028 if itsnapshot is not None:
2029 delays = source.times
2031 # Fringe case where we sample at sample 0 and sample 1
2032 tsnapshot = itsnapshot * self.config.deltat
2033 if delays.max() == tsnapshot and delays.min() != tsnapshot:
2034 delays[delays == delays.max()] -= self.config.deltat
2036 else:
2037 delays = source.times * 0
2038 itsnapshot = 1
2040 if ntargets == 0:
2041 raise StoreError('MultiLocation.coords5 is empty')
2043 res = store_ext.store_calc_static(
2044 self.cstore,
2045 source.coords5(),
2046 source_terms,
2047 delays,
2048 multi_location.coords5,
2049 self.config.component_scheme,
2050 interpolation,
2051 itsnapshot,
2052 nthreads)
2054 out = {}
2055 for icomp, (comp, comp_res) in enumerate(
2056 zip(self.get_provided_components(), res)):
2057 if comp not in components:
2058 continue
2059 out[comp] = res[icomp]
2061 return out
2063 def calc_seismograms(self, source, receivers, components, deltat=None,
2064 itmin=None, nsamples=None,
2065 interpolation='nearest_neighbor',
2066 optimization='enable', nthreads=1):
2067 config = self.config
2069 assert interpolation in ['nearest_neighbor', 'multilinear'], \
2070 'Unknown interpolation: %s' % interpolation
2072 if not isinstance(receivers, list):
2073 receivers = [receivers]
2075 if deltat is None:
2076 decimate = 1
2077 else:
2078 decimate = int(round(deltat/config.deltat))
2079 if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
2080 raise StoreError(
2081 'unavailable decimation ratio target.deltat / store.deltat'
2082 ' = %g / %g' % (deltat, config.deltat))
2084 store, decimate_ = self._decimated_store(decimate)
2086 if not store._f_index:
2087 store.open()
2089 scheme = config.component_scheme
2091 source_coords_arr = source.coords5()
2092 source_terms = source.get_source_terms(scheme)
2093 delays = source.times.ravel()
2095 nreceiver = len(receivers)
2096 receiver_coords_arr = num.empty((nreceiver, 5))
2097 for irec, rec in enumerate(receivers):
2098 receiver_coords_arr[irec, :] = rec.coords5
2100 dt = self.get_deltat()
2102 itoffset = int(num.floor(delays.min()/dt)) if delays.size != 0 else 0
2104 if itmin is None:
2105 itmin = num.zeros(nreceiver, dtype=num.int32)
2106 else:
2107 itmin = (itmin-itoffset).astype(num.int32)
2109 if nsamples is None:
2110 nsamples = num.zeros(nreceiver, dtype=num.int32) - 1
2111 else:
2112 nsamples = nsamples.astype(num.int32)
2114 try:
2115 results = store_ext.store_calc_timeseries(
2116 store.cstore,
2117 source_coords_arr,
2118 source_terms,
2119 (delays - itoffset*dt),
2120 receiver_coords_arr,
2121 scheme,
2122 interpolation,
2123 itmin,
2124 nsamples,
2125 nthreads)
2126 except Exception as e:
2127 raise e
2129 provided_components = self.get_provided_components()
2130 ncomponents = len(provided_components)
2132 seismograms = [dict() for _ in range(nreceiver)]
2133 for ires in range(len(results)):
2134 res = results.pop(0)
2135 ireceiver = ires // ncomponents
2137 comp = provided_components[res[-2]]
2139 if comp not in components:
2140 continue
2142 tr = GFTrace(*res[:-2])
2143 tr.deltat = config.deltat * decimate
2144 tr.itmin += itoffset
2146 tr.n_records_stacked = 0
2147 tr.t_optimize = 0.
2148 tr.t_stack = 0.
2149 tr.err = res[-1]
2151 seismograms[ireceiver][comp] = tr
2153 return seismograms
2155 def seismogram(self, source, receiver, components, deltat=None,
2156 itmin=None, nsamples=None,
2157 interpolation='nearest_neighbor',
2158 optimization='enable', nthreads=1):
2160 config = self.config
2162 if deltat is None:
2163 decimate = 1
2164 else:
2165 decimate = int(round(deltat/config.deltat))
2166 if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
2167 raise StoreError(
2168 'unavailable decimation ratio target.deltat / store.deltat'
2169 ' = %g / %g' % (deltat, config.deltat))
2171 store, decimate_ = self._decimated_store(decimate)
2173 if not store._f_index:
2174 store.open()
2176 scheme = config.component_scheme
2178 source_coords_arr = source.coords5()
2179 source_terms = source.get_source_terms(scheme)
2180 receiver_coords_arr = receiver.coords5[num.newaxis, :].copy()
2182 try:
2183 params = store_ext.make_sum_params(
2184 store.cstore,
2185 source_coords_arr,
2186 source_terms,
2187 receiver_coords_arr,
2188 scheme,
2189 interpolation, nthreads)
2191 except store_ext.StoreExtError:
2192 raise meta.OutOfBounds()
2194 provided_components = self.get_provided_components()
2196 out = {}
2197 for icomp, comp in enumerate(provided_components):
2198 if comp in components:
2199 weights, irecords = params[icomp]
2201 neach = irecords.size // source.times.size
2202 delays = num.repeat(source.times, neach)
2204 tr = store._sum(
2205 irecords, delays, weights, itmin, nsamples, decimate_,
2206 'c', optimization)
2208 # to prevent problems with rounding errors (BaseStore saves
2209 # deltat as a 4-byte floating point value, value from YAML
2210 # config is more accurate)
2211 tr.deltat = config.deltat * decimate
2213 out[comp] = tr
2215 return out
2218__all__ = '''
2219gf_dtype
2220NotMultipleOfSamplingInterval
2221GFTrace
2222CannotCreate
2223CannotOpen
2224DuplicateInsert
2225NotAllowedToInterpolate
2226NoSuchExtra
2227NoSuchPhase
2228BaseStore
2229Store
2230SeismosizerErrorEnum
2231'''.split()