# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from __future__ import absolute_import, division

import errno
import time
import os
import struct
import weakref
import math
import shutil
try:
    import fcntl
except ImportError:
    fcntl = None
import copy
import logging
import re
import hashlib

import numpy as num
from scipy import signal

from . import meta
from .error import StoreError
try:
    from . import store_ext
except ImportError:
    store_ext = None
from pyrocko import util, spit

logger = logging.getLogger('pyrocko.gf.store')

op = os.path

# gf store endianness
E = '<'

gf_dtype = num.dtype(num.float32)
gf_dtype_store = num.dtype(E + 'f4')

gf_dtype_nbytes_per_sample = 4

gf_store_header_dtype = [
    ('nrecords', E + 'u8'),
    ('deltat', E + 'f4'),
]

gf_store_header_fmt = E + 'Qf'
gf_store_header_fmt_size = struct.calcsize(gf_store_header_fmt)

gf_record_dtype = num.dtype([
    ('data_offset', E + 'u8'),
    ('itmin', E + 'i4'),
    ('nsamples', E + 'u4'),
    ('begin_value', E + 'f4'),
    ('end_value', E + 'f4'),
])


class SeismosizerErrorEnum:
    SUCCESS = 0
    INVALID_RECORD = 1
    EMPTY_RECORD = 2
    BAD_RECORD = 3
    ALLOC_FAILED = 4
    BAD_REQUEST = 5
    BAD_DATA_OFFSET = 6
    READ_DATA_FAILED = 7
    SEEK_INDEX_FAILED = 8
    READ_INDEX_FAILED = 9
    FSTAT_TRACES_FAILED = 10
    BAD_STORE = 11
    MMAP_INDEX_FAILED = 12
    MMAP_TRACES_FAILED = 13
    INDEX_OUT_OF_BOUNDS = 14
    NTARGETS_OUT_OF_BOUNDS = 15


def valid_string_id(s):
    return re.match(meta.StringID.pattern, s)


def check_string_id(s):
    if not valid_string_id(s):
        raise ValueError('invalid name %s' % s)


# - data_offset
#
#   Data file offset of first sample in bytes (the seek value).
#   Special values:
#
#     0x00 - missing trace
#     0x01 - zero trace (GF trace is physically zero)
#     0x02 - short trace of 1 or 2 samples (no need for data allocation)
#
# - itmin
#
#   Time of first sample of the trace as a multiple of the sampling interval.
#   All GF samples must be evaluated at multiples of the sampling interval
#   with respect to a global reference time zero. Must be set also for zero
#   traces.
#
# - nsamples
#
#   Number of samples in the GF trace. Should be set to zero for zero traces.
#
# - begin_value, end_value
#
#   Values of first and last sample. These values are included in data[]
#   redundantly.
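
# Illustrative sketch (not executed): how a single index record could be
# decoded by hand with the dtype defined above. The 'some_store/index' path
# and the record number `irecord` are hypothetical; the supported access path
# is BaseStore._load_index() further below.
#
#   with open('some_store/index', 'rb') as f:
#       f.seek(gf_store_header_fmt_size + irecord * gf_record_dtype.itemsize)
#       rec = num.fromfile(f, dtype=gf_record_dtype, count=1)[0]
#       # rec['data_offset'], rec['itmin'], rec['nsamples'],
#       # rec['begin_value'], rec['end_value']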


class NotMultipleOfSamplingInterval(Exception):
    pass


sampling_check_eps = 1e-5


class GFTrace(object):
    '''
    Green's Function trace class for handling traces from the GF store.
    '''

    @classmethod
    def from_trace(cls, tr):
        return cls(data=tr.ydata.copy(), tmin=tr.tmin, deltat=tr.deltat)

    def __init__(self, data=None, itmin=None, deltat=1.0,
                 is_zero=False, begin_value=None, end_value=None, tmin=None):

        assert sum((x is None) for x in (tmin, itmin)) == 1, \
            'GFTrace: either tmin or itmin must be given'

        if tmin is not None:
            itmin = int(round(tmin / deltat))
            if abs(itmin*deltat - tmin) > sampling_check_eps*deltat:
                raise NotMultipleOfSamplingInterval(
                    'GFTrace: tmin (%g) is not a multiple of sampling '
                    'interval (%g)' % (tmin, deltat))

        if data is not None:
            data = num.asarray(data, dtype=gf_dtype)
        else:
            data = num.array([], dtype=gf_dtype)
            begin_value = 0.0
            end_value = 0.0

        self.data = weakref.ref(data)()
        self.itmin = itmin
        self.deltat = deltat
        self.is_zero = is_zero
        self.n_records_stacked = 0.
        self.t_stack = 0.
        self.t_optimize = 0.
        self.err = SeismosizerErrorEnum.SUCCESS

        if data is not None and data.size > 0:
            if begin_value is None:
                begin_value = data[0]
            if end_value is None:
                end_value = data[-1]

        self.begin_value = begin_value
        self.end_value = end_value

    @property
    def t(self):
        '''
        Time vector of the GF trace.

        :returns: Time vector
        :rtype: :py:class:`numpy.ndarray`
        '''
        return num.linspace(
            self.itmin*self.deltat,
            (self.itmin+self.data.size-1)*self.deltat, self.data.size)

    def __str__(self, itmin=0):

        if self.is_zero:
            return 'ZERO'

        s = []
        for i in range(itmin, self.itmin + self.data.size):
            if i >= self.itmin and i < self.itmin + self.data.size:
                s.append('%7.4g' % self.data[i-self.itmin])
            else:
                s.append(' '*7)

        return '|'.join(s)

    def to_trace(self, net, sta, loc, cha):
        from pyrocko import trace
        return trace.Trace(
            net, sta, loc, cha,
            ydata=self.data,
            deltat=self.deltat,
            tmin=self.itmin*self.deltat)


class GFValue(object):

    def __init__(self, value):
        self.value = value
        self.n_records_stacked = 0.
        self.t_stack = 0.
        self.t_optimize = 0.


Zero = GFTrace(is_zero=True, itmin=0)


def make_same_span(tracesdict):

    traces = tracesdict.values()

    nonzero = [tr for tr in traces if not tr.is_zero]
    if not nonzero:
        return {k: Zero for k in tracesdict.keys()}

    deltat = nonzero[0].deltat

    itmin = min(tr.itmin for tr in nonzero)
    itmax = max(tr.itmin+tr.data.size for tr in nonzero) - 1

    out = {}
    for k, tr in tracesdict.items():
        n = itmax - itmin + 1
        if tr.itmin != itmin or tr.data.size != n:
            data = num.zeros(n, dtype=gf_dtype)
            if not tr.is_zero:
                lo = tr.itmin-itmin
                hi = lo + tr.data.size
                data[:lo] = tr.data[0]
                data[lo:hi] = tr.data
                data[hi:] = tr.data[-1]

            tr = GFTrace(data, itmin, deltat)

        out[k] = tr

    return out


class CannotCreate(StoreError):
    pass


class CannotOpen(StoreError):
    pass


class DuplicateInsert(StoreError):
    pass


class ShortRead(StoreError):
    def __str__(self):
        return 'unexpected end of data (truncated traces file?)'


class NotAllowedToInterpolate(StoreError):
    def __str__(self):
        return 'not allowed to interpolate'


class NoSuchExtra(StoreError):
    def __init__(self, s):
        StoreError.__init__(self)
        self.value = s

    def __str__(self):
        return 'extra information for key "%s" not found.' % self.value


class NoSuchPhase(StoreError):
    def __init__(self, s):
        StoreError.__init__(self)
        self.value = s

    def __str__(self):
        return 'phase for key "%s" not found. ' \
            'Running "fomosto ttt" may be needed.' % self.value


def remove_if_exists(fn, force=False):
    if os.path.exists(fn):
        if force:
            os.remove(fn)
        else:
            raise CannotCreate('file %s already exists' % fn)


def get_extra_path(store_dir, key):
    check_string_id(key)
    return os.path.join(store_dir, 'extra', key)


class BaseStore(object):

    @staticmethod
    def lock_fn_(store_dir):
        return os.path.join(store_dir, 'lock')

    @staticmethod
    def index_fn_(store_dir):
        return os.path.join(store_dir, 'index')

    @staticmethod
    def data_fn_(store_dir):
        return os.path.join(store_dir, 'traces')

    @staticmethod
    def config_fn_(store_dir):
        return os.path.join(store_dir, 'config')

    @staticmethod
    def create(store_dir, deltat, nrecords, force=False):

        try:
            util.ensuredir(store_dir)
        except Exception:
            raise CannotCreate('cannot create directory %s' % store_dir)

        index_fn = BaseStore.index_fn_(store_dir)
        data_fn = BaseStore.data_fn_(store_dir)

        for fn in (index_fn, data_fn):
            remove_if_exists(fn, force)

        with open(index_fn, 'wb') as f:
            f.write(struct.pack(gf_store_header_fmt, nrecords, deltat))
            records = num.zeros(nrecords, dtype=gf_record_dtype)
            records.tofile(f)

        with open(data_fn, 'wb') as f:
            f.write(b'\0' * 32)

    def __init__(self, store_dir, mode='r', use_memmap=True):
        assert mode in 'rw'
        self.store_dir = store_dir
        self.mode = mode
        self._use_memmap = use_memmap
        self._nrecords = None
        self._deltat = None
        self._f_index = None
        self._f_data = None
        self._records = None
        self.cstore = None

    def open(self):
        assert self._f_index is None
        index_fn = self.index_fn()
        data_fn = self.data_fn()

        if self.mode == 'r':
            fmode = 'rb'
        elif self.mode == 'w':
            fmode = 'r+b'
        else:
            assert False, 'invalid mode: %s' % self.mode

        try:
            self._f_index = open(index_fn, fmode)
            self._f_data = open(data_fn, fmode)
        except Exception:
            self.mode = ''
            raise CannotOpen('cannot open gf store: %s' % self.store_dir)

        try:
            # replace single precision deltat value in store with the double
            # precision value from the config, if available
            self.cstore = store_ext.store_init(
                self._f_index.fileno(), self._f_data.fileno(),
                self.get_deltat() or 0.0)

        except store_ext.StoreExtError as e:
            raise StoreError(str(e))

        while True:
            try:
                dataheader = self._f_index.read(gf_store_header_fmt_size)
                break

            except IOError as e:
                # occasionally got this one on an NFS volume
                if e.errno == errno.EBUSY:
                    time.sleep(0.01)
                else:
                    raise

        nrecords, deltat = struct.unpack(gf_store_header_fmt, dataheader)
        self._nrecords = nrecords
        self._deltat = deltat

        self._load_index()

    def __del__(self):
        if self.mode != '':
            self.close()

    def lock(self):
        if not fcntl:
            lock_fn = self.lock_fn()
            util.create_lockfile(lock_fn)
        else:
            if not self._f_index:
                self.open()

            while True:
                try:
                    fcntl.lockf(self._f_index, fcntl.LOCK_EX)
                    break

                except IOError as e:
                    if e.errno == errno.ENOLCK:
                        time.sleep(0.01)
                    else:
                        raise

    def unlock(self):
        if not fcntl:
            lock_fn = self.lock_fn()
            util.delete_lockfile(lock_fn)
        else:
            self._f_data.flush()
            self._f_index.flush()
            fcntl.lockf(self._f_index, fcntl.LOCK_UN)

    def put(self, irecord, trace):
        self._put(irecord, trace)

    def get_record(self, irecord):
        return self._get_record(irecord)

    def get_span(self, irecord, decimate=1):
        return self._get_span(irecord, decimate=decimate)

    def get(self, irecord, itmin=None, nsamples=None, decimate=1,
            implementation='c'):
        return self._get(irecord, itmin, nsamples, decimate, implementation)

    def sum(self, irecords, delays, weights, itmin=None,
            nsamples=None, decimate=1, implementation='c',
            optimization='enable'):
        return self._sum(irecords, delays, weights, itmin, nsamples, decimate,
                         implementation, optimization)

    def irecord_format(self):
        return util.zfmt(self._nrecords)

    def str_irecord(self, irecord):
        return self.irecord_format() % irecord

    def close(self):
        if self.mode == 'w':
            if not self._f_index:
                self.open()

            self._save_index()

        if self._f_data:
            self._f_data.close()
            self._f_data = None

        if self._f_index:
            self._f_index.close()
            self._f_index = None

        del self._records
        self._records = None

        self.mode = ''

    def _get_record(self, irecord):
        if not self._f_index:
            self.open()

        return self._records[irecord]

    def _get(self, irecord, itmin, nsamples, decimate, implementation):
        '''
        Retrieve complete GF trace from storage.
        '''

        if not self._f_index:
            self.open()

        if not self.mode == 'r':
            raise StoreError('store not open in read mode')

        if implementation == 'c' and decimate == 1:

            if nsamples is None:
                nsamples = -1

            if itmin is None:
                itmin = 0

            try:
                return GFTrace(*store_ext.store_get(
                    self.cstore, int(irecord), int(itmin), int(nsamples)))
            except store_ext.StoreExtError as e:
                raise StoreError(str(e))

        else:
            return self._get_impl_reference(irecord, itmin, nsamples, decimate)

    def get_deltat(self):
        return self._deltat

    def _get_impl_reference(self, irecord, itmin, nsamples, decimate):
        deltat = self.get_deltat()

        if not (0 <= irecord < self._nrecords):
            raise StoreError('invalid record number requested '
                             '(irecord = %i, nrecords = %i)' %
                             (irecord, self._nrecords))

        ipos, itmin_data, nsamples_data, begin_value, end_value = \
            self._records[irecord]

        if None in (itmin, nsamples):
            itmin = itmin_data
            itmax = itmin_data + nsamples_data - 1
            nsamples = nsamples_data
        else:
            itmin = itmin * decimate
            nsamples = nsamples * decimate
            itmax = itmin + nsamples - decimate

        if ipos == 0:
            return None

        elif ipos == 1:
            itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
            return GFTrace(is_zero=True, itmin=itmin_ext//decimate)

        if decimate == 1:
            ilo = max(itmin, itmin_data) - itmin_data
            ihi = min(itmin+nsamples, itmin_data+nsamples_data) - itmin_data
            data = self._get_data(ipos, begin_value, end_value, ilo, ihi)

            return GFTrace(data, itmin=itmin_data+ilo, deltat=deltat,
                           begin_value=begin_value, end_value=end_value)

        else:
            itmax_data = itmin_data + nsamples_data - 1

            # put begin and end to multiples of new sampling rate
            itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
            itmax_ext = -((-min(itmax, itmax_data))//decimate) * decimate
            nsamples_ext = itmax_ext - itmin_ext + 1

            # add some padding for the aa filter
            order = 30
            itmin_ext_pad = itmin_ext - order//2
            itmax_ext_pad = itmax_ext + order//2
            nsamples_ext_pad = itmax_ext_pad - itmin_ext_pad + 1

            itmin_overlap = max(itmin_data, itmin_ext_pad)
            itmax_overlap = min(itmax_data, itmax_ext_pad)

            ilo = itmin_overlap - itmin_ext_pad
            ihi = max(ilo, itmax_overlap - itmin_ext_pad + 1)
            ilo_data = itmin_overlap - itmin_data
            ihi_data = max(ilo_data, itmax_overlap - itmin_data + 1)

            data_ext_pad = num.empty(nsamples_ext_pad, dtype=gf_dtype)
            data_ext_pad[ilo:ihi] = self._get_data(
                ipos, begin_value, end_value, ilo_data, ihi_data)

            data_ext_pad[:ilo] = begin_value
            data_ext_pad[ihi:] = end_value

            b = signal.firwin(order + 1, 1. / decimate, window='hamming')
            a = 1.
            data_filt_pad = signal.lfilter(b, a, data_ext_pad)
            data_deci = data_filt_pad[order:order+nsamples_ext:decimate]
            if data_deci.size >= 1:
                if itmin_ext <= itmin_data:
                    data_deci[0] = begin_value

                if itmax_ext >= itmax_data:
                    data_deci[-1] = end_value

            return GFTrace(data_deci, itmin_ext//decimate,
                           deltat*decimate,
                           begin_value=begin_value, end_value=end_value)

    def _get_span(self, irecord, decimate=1):
        '''
        Get temporal extent of GF trace at given index.
        '''

        if not self._f_index:
            self.open()

        assert 0 <= irecord < self._nrecords, \
            'irecord = %i, nrecords = %i' % (irecord, self._nrecords)

        (_, itmin, nsamples, _, _) = self._records[irecord]

        itmax = itmin + nsamples - 1

        if decimate == 1:
            return itmin, itmax
        else:
            if nsamples == 0:
                return itmin//decimate, itmin//decimate - 1
            else:
                return itmin//decimate, -((-itmax)//decimate)

    def _put(self, irecord, trace):
        '''
        Save GF trace to storage.
        '''

        if not self._f_index:
            self.open()

        deltat = self.get_deltat()

        assert self.mode == 'w'
        assert trace.is_zero or \
            abs(trace.deltat - deltat) < 1e-7 * deltat
        assert 0 <= irecord < self._nrecords, \
            'irecord = %i, nrecords = %i' % (irecord, self._nrecords)

        if self._records[irecord][0] != 0:
            raise DuplicateInsert('record %i already in store' % irecord)

        if trace.is_zero or num.all(trace.data == 0.0):
            self._records[irecord] = (1, trace.itmin, 0, 0., 0.)
            return

        ndata = trace.data.size

        if ndata > 2:
            self._f_data.seek(0, 2)
            ipos = self._f_data.tell()
            trace.data.astype(gf_dtype_store).tofile(self._f_data)
        else:
            ipos = 2

        self._records[irecord] = (ipos, trace.itmin, ndata,
                                  trace.data[0], trace.data[-1])

    def _sum_impl_alternative(self, irecords, delays, weights, itmin,
                              nsamples, decimate):

        '''
        Sum delayed and weighted GF traces.
        '''

        if not self._f_index:
            self.open()

        assert self.mode == 'r'

        deltat = self.get_deltat() * decimate

        if len(irecords) == 0:
            if None in (itmin, nsamples):
                return Zero
            else:
                return GFTrace(
                    num.zeros(nsamples, dtype=gf_dtype), itmin,
                    deltat, is_zero=True)

        assert len(irecords) == len(delays)
        assert len(irecords) == len(weights)

        if None in (itmin, nsamples):
            itmin_delayed, itmax_delayed = [], []
            for irecord, delay in zip(irecords, delays):
                itmin, itmax = self._get_span(irecord, decimate=decimate)
                itmin_delayed.append(itmin + delay/deltat)
                itmax_delayed.append(itmax + delay/deltat)

            itmin = int(math.floor(min(itmin_delayed)))
            nsamples = int(math.ceil(max(itmax_delayed))) - itmin + 1

        out = num.zeros(nsamples, dtype=gf_dtype)
        if nsamples == 0:
            return GFTrace(out, itmin, deltat)

        for ii in range(len(irecords)):
            irecord = irecords[ii]
            delay = delays[ii]
            weight = weights[ii]

            if weight == 0.0:
                continue

            idelay_floor = int(math.floor(delay/deltat))
            idelay_ceil = int(math.ceil(delay/deltat))

            gf_trace = self._get(
                irecord,
                itmin - idelay_ceil,
                nsamples + idelay_ceil - idelay_floor,
                decimate,
                'reference')

            assert gf_trace.itmin >= itmin - idelay_ceil
            assert gf_trace.data.size <= nsamples + idelay_ceil - idelay_floor

            if gf_trace.is_zero:
                continue

            ilo = gf_trace.itmin - itmin + idelay_floor
            ihi = ilo + gf_trace.data.size + (idelay_ceil-idelay_floor)

            data = gf_trace.data

            if idelay_floor == idelay_ceil:
                out[ilo:ihi] += data * weight
            else:
                if data.size:
                    k = 1
                    if ihi <= nsamples:
                        out[ihi-1] += gf_trace.end_value * \
                            ((idelay_ceil-delay/deltat) * weight)
                        k = 0

                    out[ilo+1:ihi-k] += data[:data.size-k] * \
                        ((delay/deltat-idelay_floor) * weight)
                    k = 1
                    if ilo >= 0:
                        out[ilo] += gf_trace.begin_value * \
                            ((delay/deltat-idelay_floor) * weight)
                        k = 0

                    out[ilo+k:ihi-1] += data[k:] * \
                        ((idelay_ceil-delay/deltat) * weight)

            if ilo > 0 and gf_trace.begin_value != 0.0:
                out[:ilo] += gf_trace.begin_value * weight

            if ihi < nsamples and gf_trace.end_value != 0.0:
                out[ihi:] += gf_trace.end_value * weight

        return GFTrace(out, itmin, deltat)

    def _sum_impl_reference(self, irecords, delays, weights, itmin, nsamples,
                            decimate):

        if not self._f_index:
            self.open()

        deltat = self.get_deltat() * decimate

        if len(irecords) == 0:
            if None in (itmin, nsamples):
                return Zero
            else:
                return GFTrace(
                    num.zeros(nsamples, dtype=gf_dtype), itmin,
                    deltat, is_zero=True)

        datas = []
        itmins = []
        for i, delay, weight in zip(irecords, delays, weights):
            tr = self._get(i, None, None, decimate, 'reference')

            idelay_floor = int(math.floor(delay/deltat))
            idelay_ceil = int(math.ceil(delay/deltat))

            if idelay_floor == idelay_ceil:
                itmins.append(tr.itmin + idelay_floor)
                datas.append(tr.data.copy()*weight)
            else:
                itmins.append(tr.itmin + idelay_floor)
                datas.append(tr.data.copy()*weight*(idelay_ceil-delay/deltat))
                itmins.append(tr.itmin + idelay_ceil)
                datas.append(tr.data.copy()*weight*(delay/deltat-idelay_floor))

        itmin_all = min(itmins)

        itmax_all = max(itmin_ + data.size for (itmin_, data) in
                        zip(itmins, datas))

        if itmin is not None:
            itmin_all = min(itmin_all, itmin)
        if nsamples is not None:
            itmax_all = max(itmax_all, itmin+nsamples)

        nsamples_all = itmax_all - itmin_all

        arr = num.zeros((len(datas), nsamples_all), dtype=gf_dtype)
        for i, itmin_, data in zip(num.arange(len(datas)), itmins, datas):
            if data.size > 0:
                ilo = itmin_-itmin_all
                ihi = ilo + data.size
                arr[i, :ilo] = data[0]
                arr[i, ilo:ihi] = data
                arr[i, ihi:] = data[-1]

        sum_arr = arr.sum(axis=0)

        if itmin is not None and nsamples is not None:
            ilo = itmin-itmin_all
            ihi = ilo + nsamples
            sum_arr = sum_arr[ilo:ihi]

        else:
            itmin = itmin_all

        return GFTrace(sum_arr, itmin, deltat)

    def _optimize(self, irecords, delays, weights):
        if num.unique(irecords).size == irecords.size:
            return irecords, delays, weights

        deltat = self.get_deltat()

        delays = delays / deltat
        irecords2 = num.repeat(irecords, 2)
        delays2 = num.empty(irecords2.size, dtype=float)
        delays2[0::2] = num.floor(delays)
        delays2[1::2] = num.ceil(delays)
        weights2 = num.repeat(weights, 2)
        weights2[0::2] *= 1.0 - (delays - delays2[0::2])
        weights2[1::2] *= (1.0 - (delays2[1::2] - delays)) * \
            (delays2[1::2] - delays2[0::2])

        delays2 *= deltat

        iorder = num.lexsort((delays2, irecords2))

        irecords2 = irecords2[iorder]
        delays2 = delays2[iorder]
        weights2 = weights2[iorder]

        ui = num.empty(irecords2.size, dtype=bool)
        ui[1:] = num.logical_or(num.diff(irecords2) != 0,
                                num.diff(delays2) != 0.)

        ui[0] = 0
        ind2 = num.cumsum(ui)
        ui[0] = 1
        ind1 = num.where(ui)[0]

        irecords3 = irecords2[ind1]
        delays3 = delays2[ind1]
        weights3 = num.bincount(ind2, weights2)

        return irecords3, delays3, weights3

    def _sum(self, irecords, delays, weights, itmin, nsamples, decimate,
             implementation, optimization):

        if not self._f_index:
            self.open()

        t0 = time.time()
        if optimization == 'enable':
            irecords, delays, weights = self._optimize(
                irecords, delays, weights)
        else:
            assert optimization == 'disable'

        t1 = time.time()
        deltat = self.get_deltat()

        if implementation == 'c' and decimate == 1:
            if delays.size != 0:
                itoffset = int(num.floor(num.min(delays)/deltat))
            else:
                itoffset = 0

            if nsamples is None:
                nsamples = -1

            if itmin is None:
                itmin = 0
            else:
                itmin -= itoffset

            try:
                tr = GFTrace(*store_ext.store_sum(
                    self.cstore, irecords.astype(num.uint64),
                    delays - itoffset*deltat,
                    weights.astype(num.float32),
                    int(itmin), int(nsamples)))

                tr.itmin += itoffset

            except store_ext.StoreExtError as e:
                raise StoreError(str(e) + ' in store %s' % self.store_dir)

        elif implementation == 'alternative':
            tr = self._sum_impl_alternative(irecords, delays, weights,
                                            itmin, nsamples, decimate)

        else:
            tr = self._sum_impl_reference(irecords, delays, weights,
                                          itmin, nsamples, decimate)

        t2 = time.time()

        tr.n_records_stacked = irecords.size
        tr.t_optimize = t1 - t0
        tr.t_stack = t2 - t1

        return tr

    def _sum_statics(self, irecords, delays, weights, it, ntargets,
                     nthreads):
        if not self._f_index:
            self.open()

        return store_ext.store_sum_static(
            self.cstore, irecords, delays, weights, it, ntargets, nthreads)

    def _load_index(self):
        if self._use_memmap:
            records = num.memmap(
                self._f_index, dtype=gf_record_dtype,
                offset=gf_store_header_fmt_size,
                mode=('r', 'r+')[self.mode == 'w'])

        else:
            self._f_index.seek(gf_store_header_fmt_size)
            records = num.fromfile(self._f_index, dtype=gf_record_dtype)

        assert len(records) == self._nrecords

        self._records = records

    def _save_index(self):
        self._f_index.seek(0)
        self._f_index.write(struct.pack(gf_store_header_fmt, self._nrecords,
                                        self.get_deltat()))

        if self._use_memmap:
            self._records.flush()
        else:
            self._f_index.seek(gf_store_header_fmt_size)
            self._records.tofile(self._f_index)
            self._f_index.flush()

    def _get_data(self, ipos, begin_value, end_value, ilo, ihi):
        if ihi - ilo > 0:
            if ipos == 2:
                data_orig = num.empty(2, dtype=gf_dtype)
                data_orig[0] = begin_value
                data_orig[1] = end_value
                return data_orig[ilo:ihi]
            else:
                self._f_data.seek(
                    int(ipos + ilo*gf_dtype_nbytes_per_sample))
                arr = num.fromfile(
                    self._f_data, gf_dtype_store, ihi-ilo).astype(gf_dtype)

                if arr.size != ihi-ilo:
                    raise ShortRead()
                return arr
        else:
            return num.empty((0,), dtype=gf_dtype)

    def lock_fn(self):
        return BaseStore.lock_fn_(self.store_dir)

    def index_fn(self):
        return BaseStore.index_fn_(self.store_dir)

    def data_fn(self):
        return BaseStore.data_fn_(self.store_dir)

    def config_fn(self):
        return BaseStore.config_fn_(self.store_dir)

    def count_special_records(self):
        if not self._f_index:
            self.open()

        return num.histogram(self._records['data_offset'],
                             bins=[0, 1, 2, 3, num.uint64(-1)])[0]

    @property
    def size_index(self):
        return os.stat(self.index_fn()).st_size

    @property
    def size_data(self):
        return os.stat(self.data_fn()).st_size

    @property
    def size_index_and_data(self):
        return self.size_index + self.size_data

    @property
    def size_index_and_data_human(self):
        return util.human_bytesize(self.size_index_and_data)

    def stats(self):
        counter = self.count_special_records()

        stats = dict(
            total=self._nrecords,
            inserted=(counter[1] + counter[2] + counter[3]),
            empty=counter[0],
            short=counter[2],
            zero=counter[1],
            size_data=self.size_data,
            size_index=self.size_index,
        )

        return stats

    stats_keys = 'total inserted empty short zero size_data size_index'.split()


def remake_dir(dpath, force):
    if os.path.exists(dpath):
        if force:
            shutil.rmtree(dpath)
        else:
            raise CannotCreate('Directory "%s" already exists.' % dpath)

    os.mkdir(dpath)


class MakeTimingParamsFailed(StoreError):
    pass


class Store(BaseStore):

    '''
    Green's function disk storage and summation machine.

    The `Store` can be used to efficiently store, retrieve, and sum Green's
    function traces. A `Store` contains many 1D time traces sampled at even
    multiples of a global sampling rate, where each time trace has an
    individual start and end time. The traces are treated as having repeating
    end points, so the functions they represent can be non-constant only
    between begin and end time. It provides capabilities to retrieve decimated
    traces and to extract parts of the traces. The main purpose of this class
    is to provide a fast, easy to use, and flexible mechanism to compute
    weighted delay-and-sum stacks with many Green's function traces involved.

    Individual Green's functions are accessed through a single integer index
    at low level. On top of that, various indexing schemes can be implemented
    by providing a mapping from physical coordinates to the low level index
    `i`. E.g. for a problem with cylindrical symmetry, one might define a
    mapping from the coordinates (`receiver_depth`, `source_depth`,
    `distance`) to the low level index. Index translation is done in the
    :py:class:`pyrocko.gf.meta.Config` subclass object associated with the
    Store. It is accessible through the store's :py:attr:`config` attribute,
    and contains all meta information about the store, including physical
    extent, geometry, sampling rate, and information about the type of the
    stored Green's functions. Information about the underlying earth model
    can also be found there.

    A GF store can also contain tabulated phase arrivals. In basic cases,
    these can be created with the :py:meth:`make_ttt` and evaluated with the
    :py:func:`t` methods.

    .. attribute:: config

        The :py:class:`pyrocko.gf.meta.Config` derived object associated with
        the store which contains all meta information about the store and
        provides the high-level to low-level index mapping.

    .. attribute:: store_dir

        Path to the store's data directory.

    .. attribute:: mode

        The mode in which the store is opened: ``'r'``: read-only, ``'w'``:
        writeable.
    '''
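
    # Minimal usage sketch (the store path, index values and phase name are
    # hypothetical and must match the store's Config subclass, here assumed
    # to be ConfigTypeA):
    #
    #   from pyrocko import gf
    #   store = gf.Store('gf_stores/my_store')
    #   tr = store.get((10e3, 20e3, 0))   # (source_depth, distance, component)
    #   t_onset = store.t('begin', (10e3, 20e3))
    #
    # gf.Store is re-exported from pyrocko.gf; pyrocko.gf.store.Store can be
    # used directly as well.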

    @classmethod
    def create(cls, store_dir, config, force=False, extra=None):
        '''
        Create new GF store.

        Creates a new GF store at path ``store_dir``. The layout of the GF is
        defined with the parameters given in ``config``, which should be an
        object of a subclass of :py:class:`~pyrocko.gf.meta.Config`. This
        function will refuse to overwrite an existing GF store, unless
        ``force`` is set to ``True``. If more information, e.g. parameters
        used for the modelling code, earth models or other, should be saved
        along with the GF store, these may be provided through a dict given
        to ``extra``. The keys of this dict must be names and the values must
        be *guts* type objects.

        :param store_dir: GF store path
        :type store_dir: str
        :param config: GF store Config
        :type config: :py:class:`~pyrocko.gf.meta.Config`
        :param force: Force overwrite, defaults to ``False``
        :type force: bool
        :param extra: Extra information
        :type extra: dict or None
        '''
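
        # Illustrative sketch (the Config subclass, field values and store
        # name are hypothetical; required fields depend on the application):
        #
        #   from pyrocko import gf
        #   config = gf.ConfigTypeA(
        #       id='my_store', sample_rate=2.0,
        #       source_depth_min=1e3, source_depth_max=10e3,
        #       source_depth_delta=1e3,
        #       distance_min=10e3, distance_max=100e3, distance_delta=1e3,
        #       ncomponents=10)
        #   gf.Store.create('my_store', config=config)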

        cls.create_editables(store_dir, config, force=force, extra=extra)
        cls.create_dependants(store_dir, force=force)

        return cls(store_dir)

    @staticmethod
    def create_editables(store_dir, config, force=False, extra=None):
        try:
            util.ensuredir(store_dir)
        except Exception:
            raise CannotCreate('cannot create directory %s' % store_dir)

        fns = []

        config_fn = os.path.join(store_dir, 'config')
        remove_if_exists(config_fn, force)
        meta.dump(config, filename=config_fn)

        fns.append(config_fn)

        for sub_dir in ['extra']:
            dpath = os.path.join(store_dir, sub_dir)
            remake_dir(dpath, force)

        if extra:
            for k, v in extra.items():
                fn = get_extra_path(store_dir, k)
                remove_if_exists(fn, force)
                meta.dump(v, filename=fn)

                fns.append(fn)

        return fns

    @staticmethod
    def create_dependants(store_dir, force=False):
        config_fn = os.path.join(store_dir, 'config')
        config = meta.load(filename=config_fn)

        BaseStore.create(store_dir, config.deltat, config.nrecords,
                         force=force)

        for sub_dir in ['decimated']:
            dpath = os.path.join(store_dir, sub_dir)
            remake_dir(dpath, force)

    def __init__(self, store_dir, mode='r', use_memmap=True):
        BaseStore.__init__(self, store_dir, mode=mode, use_memmap=use_memmap)
        config_fn = self.config_fn()
        if not os.path.isfile(config_fn):
            raise StoreError(
                'directory "%s" does not seem to contain a GF store '
                '("config" file not found)' % store_dir)
        self.load_config()

        self._decimated = {}
        self._extra = {}
        self._phases = {}
        for decimate in range(2, 9):
            if os.path.isdir(self._decimated_store_dir(decimate)):
                self._decimated[decimate] = None

    def open(self):
        if not self._f_index:
            BaseStore.open(self)
            c = self.config

            mscheme = 'type_' + c.short_type.lower()
            store_ext.store_mapping_init(
                self.cstore, mscheme,
                c.mins, c.maxs, c.deltas, c.ns.astype(num.uint64),
                c.ncomponents)

    def save_config(self, make_backup=False):
        config_fn = self.config_fn()
        if make_backup:
            os.rename(config_fn, config_fn + '~')

        meta.dump(self.config, filename=config_fn)

    def get_deltat(self):
        return self.config.deltat

    def load_config(self):
        self.config = meta.load(filename=self.config_fn())

    def ensure_reference(self, force=False):
        if self.config.reference is not None and not force:
            return
        self.ensure_uuid()
        reference = '%s-%s' % (self.config.id, self.config.uuid[0:6])

        if self.config.reference is not None:
            self.config.reference = reference
            self.save_config()
        else:
            with open(self.config_fn(), 'a') as config:
                config.write('reference: %s\n' % reference)
            self.load_config()

    def ensure_uuid(self, force=False):
        if self.config.uuid is not None and not force:
            return
        uuid = self.create_store_hash()

        if self.config.uuid is not None:
            self.config.uuid = uuid
            self.save_config()
        else:
            with open(self.config_fn(), 'a') as config:
                config.write('\nuuid: %s\n' % uuid)
            self.load_config()

    def create_store_hash(self):
        logger.info('creating hash for store ...')
        m = hashlib.sha1()

        traces_size = op.getsize(self.data_fn())
        with open(self.data_fn(), 'rb') as traces:
            while traces.tell() < traces_size:
                m.update(traces.read(4096))
                traces.seek(1024 * 1024 * 10, 1)

        with open(self.config_fn(), 'rb') as config:
            m.update(config.read())
        return m.hexdigest()

    def get_extra_path(self, key):
        return get_extra_path(self.store_dir, key)

    def get_extra(self, key):
        '''
        Get extra information stored under given key.
        '''

        x = self._extra
        if key not in x:
            fn = self.get_extra_path(key)
            if not os.path.exists(fn):
                raise NoSuchExtra(key)

            x[key] = meta.load(filename=fn)

        return x[key]

    def upgrade(self):
        '''
        Upgrade store config and files to latest Pyrocko version.
        '''
        fns = [os.path.join(self.store_dir, 'config')]
        for key in self.extra_keys():
            fns.append(self.get_extra_path(key))

        i = 0
        for fn in fns:
            i += util.pf_upgrade(fn)

        return i

    def extra_keys(self):
        return [x for x in os.listdir(os.path.join(self.store_dir, 'extra'))
                if valid_string_id(x)]

    def put(self, args, trace):
        '''
        Insert trace into GF store.

        Store a single GF trace at (high-level) index ``args``.

        :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type args: tuple
        '''

        irecord = self.config.irecord(*args)
        self._put(irecord, trace)

    def get_record(self, args):
        irecord = self.config.irecord(*args)
        return self._get_record(irecord)

    def str_irecord(self, args):
        return BaseStore.str_irecord(self, self.config.irecord(*args))

    def get(self, args, itmin=None, nsamples=None, decimate=1,
            interpolation='nearest_neighbor', implementation='c'):
        '''
        Retrieve GF trace from store.

        Retrieve a single GF trace from the store at (high-level) index
        ``args``. By default, the full trace is retrieved. Given ``itmin`` and
        ``nsamples``, only the selected portion of the trace is extracted. If
        ``decimate`` is an integer in the range [2,8], the trace is decimated
        on the fly or, if available, the trace is read from a decimated
        version of the GF store.

        :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type args: tuple
        :param itmin: Start time index (start time is ``itmin * dt``),
            defaults to None
        :type itmin: int or None
        :param nsamples: Number of samples, defaults to None
        :type nsamples: int or None
        :param decimate: Decimation factor, defaults to 1
        :type decimate: int
        :param interpolation: Interpolation method
            ``['nearest_neighbor', 'multilinear', 'off']``, defaults to
            ``'nearest_neighbor'``
        :type interpolation: str
        :param implementation: Implementation to use ``['c', 'reference']``,
            defaults to ``'c'``.
        :type implementation: str

        :returns: GF trace at ``args``
        :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
        '''
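
        # Usage sketch (index values are hypothetical and must match the
        # store's Config subclass, here assumed to be ConfigTypeA):
        #
        #   tr = store.get((10e3, 20e3, 0), decimate=2)
        #   if tr is not None and not tr.is_zero:
        #       print(tr.itmin, tr.deltat, tr.data.shape)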

        store, decimate = self._decimated_store(decimate)
        if interpolation == 'nearest_neighbor':
            irecord = store.config.irecord(*args)
            tr = store._get(irecord, itmin, nsamples, decimate,
                            implementation)

        elif interpolation == 'off':
            irecords, weights = store.config.vicinity(*args)
            if len(irecords) != 1:
                raise NotAllowedToInterpolate()
            else:
                tr = store._get(irecords[0], itmin, nsamples, decimate,
                                implementation)

        elif interpolation == 'multilinear':
            irecords, weights = store.config.vicinity(*args)
            tr = store._sum(irecords, num.zeros(len(irecords)), weights,
                            itmin, nsamples, decimate, implementation,
                            'disable')

        # to prevent problems with rounding errors (BaseStore saves deltat
        # as a 4-byte floating point value, value from YAML config is more
        # accurate)
        tr.deltat = self.config.deltat * decimate
        return tr

    def sum(self, args, delays, weights, itmin=None, nsamples=None,
            decimate=1, interpolation='nearest_neighbor', implementation='c',
            optimization='enable'):
        '''
        Sum delayed and weighted GF traces.

        Calculate sum of delayed and weighted GF traces. ``args`` is a tuple
        of arrays forming the (high-level) indices of the GF traces to be
        selected. Delays and weights for the summation are given in the
        arrays ``delays`` and ``weights``. ``itmin`` and ``nsamples`` can be
        given to restrict the computation to a given time interval. If
        ``decimate`` is an integer in the range [2,8], decimated traces are
        used in the summation.

        :param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type args: tuple(numpy.ndarray)
        :param delays: Delay times
        :type delays: :py:class:`numpy.ndarray`
        :param weights: Trace weights
        :type weights: :py:class:`numpy.ndarray`
        :param itmin: Start time index (start time is ``itmin * dt``),
            defaults to None
        :type itmin: int or None
        :param nsamples: Number of samples, defaults to None
        :type nsamples: int or None
        :param decimate: Decimation factor, defaults to 1
        :type decimate: int
        :param interpolation: Interpolation method
            ``['nearest_neighbor', 'multilinear']``, defaults to
            ``'nearest_neighbor'``
        :type interpolation: str
        :param implementation: Implementation to use,
            ``['c', 'alternative', 'reference']``, where ``'alternative'``
            and ``'reference'`` use a Python implementation, defaults to
            ``'c'``
        :type implementation: str
        :param optimization: Optimization mode ``['enable', 'disable']``,
            defaults to ``'enable'``
        :type optimization: str
        :returns: Stacked GF trace.
        :rtype: :py:class:`~pyrocko.gf.store.GFTrace`
        '''
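
        # Delay-and-sum sketch (values are hypothetical; the index arrays,
        # delays and weights must all have the same length):
        #
        #   depths = num.array([10e3, 10e3])
        #   dists = num.array([20e3, 21e3])
        #   comps = num.array([0, 0])
        #   tr = store.sum((depths, dists, comps),
        #                  delays=num.array([0.0, 0.5]),
        #                  weights=num.array([1.0, -1.0]))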

        store, decimate_ = self._decimated_store(decimate)

        if interpolation == 'nearest_neighbor':
            irecords = store.config.irecords(*args)
        else:
            assert interpolation == 'multilinear'
            irecords, ip_weights = store.config.vicinities(*args)
            neach = irecords.size // args[0].size
            weights = num.repeat(weights, neach) * ip_weights
            delays = num.repeat(delays, neach)

        tr = store._sum(irecords, delays, weights, itmin, nsamples, decimate_,
                        implementation, optimization)

        # to prevent problems with rounding errors (BaseStore saves deltat
        # as a 4-byte floating point value, value from YAML config is more
        # accurate)
        tr.deltat = self.config.deltat * decimate
        return tr

    def make_decimated(self, decimate, config=None, force=False,
                       show_progress=False):
        '''
        Create decimated version of GF store.

        Create a downsampled version of the GF store. Downsampling is done
        for the integer factor ``decimate`` which should be in the range
        [2,8]. If ``config`` is ``None``, all traces of the GF store are
        decimated and held available (i.e. the index mapping of the original
        store is used), otherwise, a different spatial stepping can be
        specified by giving a modified GF store configuration in ``config``
        (see :py:meth:`create`). Decimated GF sub-stores are created under
        the ``decimated`` subdirectory within the GF store directory. Holding
        available decimated versions of the GF store can save computation
        time, IO bandwidth, or decrease memory footprint at the cost of
        increased disk space usage, when computations are done for lower
        frequency signals.

        :param decimate: Decimation factor
        :type decimate: int
        :param config: GF store config object, defaults to None
        :type config: :py:class:`~pyrocko.gf.meta.Config` or None
        :param force: Force overwrite, defaults to ``False``
        :type force: bool
        :param show_progress: Show progress, defaults to ``False``
        :type show_progress: bool
        '''
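
        # Usage sketch (the factor is hypothetical; any value in [2, 8] works,
        # provided the store is opened in read mode):
        #
        #   store = Store('my_store', 'r')
        #   store.make_decimated(2, show_progress=True)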

        if not self._f_index:
            self.open()

        if not (2 <= decimate <= 8):
            raise StoreError('decimate argument must be in the range [2,8]')

        assert self.mode == 'r'

        if config is None:
            config = self.config

        config = copy.deepcopy(config)
        config.sample_rate = self.config.sample_rate / decimate

        if decimate in self._decimated:
            del self._decimated[decimate]

        store_dir = self._decimated_store_dir(decimate)
        if os.path.exists(store_dir):
            if force:
                shutil.rmtree(store_dir)
            else:
                raise CannotCreate('store already exists at %s' % store_dir)

        store_dir_incomplete = store_dir + '-incomplete'
        Store.create(store_dir_incomplete, config, force=force)

        decimated = Store(store_dir_incomplete, 'w')
        if show_progress:
            pbar = util.progressbar('decimating store', self.config.nrecords)

        for i, args in enumerate(decimated.config.iter_nodes()):
            tr = self.get(args, decimate=decimate)
            decimated.put(args, tr)

            if show_progress:
                pbar.update(i+1)

        if show_progress:
            pbar.finish()

        decimated.close()

        shutil.move(store_dir_incomplete, store_dir)

        self._decimated[decimate] = None

    def stats(self):
        stats = BaseStore.stats(self)
        stats['decimated'] = sorted(self._decimated.keys())
        return stats

    stats_keys = BaseStore.stats_keys + ['decimated']

    def check(self, show_progress=False):
        have_holes = []
        for pdef in self.config.tabulated_phases:
            phase_id = pdef.id
            ph = self.get_stored_phase(phase_id)
            if ph.check_holes():
                have_holes.append(phase_id)

        if have_holes:
            for phase_id in have_holes:
                logger.warning(
                    'Travel time table of phase "{}" contains holes. You can '
                    'use `fomosto tttlsd` to fix holes.'.format(phase_id))
        else:
            logger.info('No holes in travel time tables')

        if show_progress:
            pbar = util.progressbar('checking store', self.config.nrecords)

        problems = 0
        for i, args in enumerate(self.config.iter_nodes()):
            tr = self.get(args)
            if tr and not tr.is_zero:
                if not tr.begin_value == tr.data[0]:
                    logger.warning('wrong begin value for trace at %s '
                                   '(data corruption?)' % str(args))
                    problems += 1
                if not tr.end_value == tr.data[-1]:
                    logger.warning('wrong end value for trace at %s '
                                   '(data corruption?)' % str(args))
                    problems += 1
                if not num.all(num.isfinite(tr.data)):
                    logger.warning('nans or infs in trace at %s' % str(args))
                    problems += 1

            if show_progress:
                pbar.update(i+1)

        if show_progress:
            pbar.finish()

        return problems

    def check_earthmodels(self, config):
        if config.earthmodel_receiver_1d.profile('z')[-1] not in \
                config.earthmodel_1d.profile('z'):
            logger.warning('deepest layer of earthmodel_receiver_1d not '
                           'found in earthmodel_1d')

    def _decimated_store_dir(self, decimate):
        return os.path.join(self.store_dir, 'decimated', str(decimate))

    def _decimated_store(self, decimate):
        if decimate == 1 or decimate not in self._decimated:
            return self, decimate
        else:
            store = self._decimated[decimate]
            if store is None:
                store = Store(self._decimated_store_dir(decimate), 'r')
                self._decimated[decimate] = store

            return store, 1

    def phase_filename(self, phase_id):
        check_string_id(phase_id)
        return os.path.join(self.store_dir, 'phases', phase_id + '.phase')

    def get_stored_phase(self, phase_id):
        '''
        Get stored phase from GF store.

        :returns: Phase information
        :rtype: :py:class:`pyrocko.spit.SPTree`
        '''

        if phase_id not in self._phases:
            fn = self.phase_filename(phase_id)
            if not os.path.isfile(fn):
                raise NoSuchPhase(phase_id)

            spt = spit.SPTree(filename=fn)
            self._phases[phase_id] = spt

        return self._phases[phase_id]

    def get_phase(self, phase_def):
        toks = phase_def.split(':', 1)
        if len(toks) == 2:
            provider, phase_def = toks
        else:
            provider, phase_def = 'stored', toks[0]

        if provider == 'stored':
            return self.get_stored_phase(phase_def)

        elif provider == 'vel':
            vel = float(phase_def) * 1000.

            def evaluate(args):
                return self.config.get_distance(args) / vel

            return evaluate

        elif provider == 'vel_surface':
            vel = float(phase_def) * 1000.

            def evaluate(args):
                return self.config.get_surface_distance(args) / vel

            return evaluate

        elif provider in ('cake', 'iaspei'):
            from pyrocko import cake
            mod = self.config.earthmodel_1d
            if provider == 'cake':
                phases = [cake.PhaseDef(phase_def)]
            else:
                phases = cake.PhaseDef.classic(phase_def)

            def evaluate(args):
                if len(args) == 2:
                    zr, zs, x = (self.config.receiver_depth,) + args
                elif len(args) == 3:
                    zr, zs, x = args
                else:
                    assert False

                t = []
                if phases:
                    rays = mod.arrivals(
                        phases=phases,
                        distances=[x*cake.m2d],
                        zstart=zs,
                        zstop=zr)

                    for ray in rays:
                        t.append(ray.t)

                if t:
                    return min(t)
                else:
                    return None

            return evaluate

        raise StoreError('unsupported phase provider: %s' % provider)

    def t(self, timing, *args):
        '''
        Compute interpolated phase arrivals.

        **Examples:**

        If ``test_store`` is of :py:class:`~pyrocko.gf.meta.ConfigTypeA`::

            test_store.t('p', (1000, 10000))

            # the later arrival of P or the diffracted P phase:
            test_store.t('last{P|Pdiff}', (1000, 10000))

        If ``test_store`` is of :py:class:`~pyrocko.gf.meta.ConfigTypeB`::

            test_store.t('S', (1000, 1000, 10000))

            # the first arrival of the given phases is selected:
            test_store.t('first{P|p|Pdiff|sP}', (1000, 1000, 10000))

        :param timing: Timing string as described above
        :type timing: str or :py:class:`~pyrocko.gf.meta.Timing`
        :param \\*args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
            ``(source_depth, distance, component)`` as in
            :py:class:`~pyrocko.gf.meta.ConfigTypeA`.
        :type \\*args: tuple
        :returns: Phase arrival according to ``timing``
        :rtype: float or None
        '''

        if len(args) == 1:
            args = args[0]
        else:
            args = self.config.make_indexing_args1(*args)

        if not isinstance(timing, meta.Timing):
            timing = meta.Timing(timing)

        return timing.evaluate(self.get_phase, args)

    def make_timing_params(self, begin, end, snap_vred=True, force=False):
        '''
        Compute tight parameterized time ranges to include given timings.

        Calculates appropriate time ranges to cover given begin and end
        timings over all GF points in the store. A dict with the following
        keys is returned:

        * ``'tmin'``: time [s], minimum of begin timing over all GF points
        * ``'tmax'``: time [s], maximum of end timing over all GF points
        * ``'vred'``, ``'tmin_vred'``: reduction velocity [m/s] and offset
          [s] appropriate to catch the begin timing over all GF points
        * ``'tlenmax_vred'``: maximum time length needed to cover all end
          timings, when using the linear slope given with (``vred``,
          ``tmin_vred``) as start
        '''
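
        # Usage sketch (the phase names are hypothetical and must exist among
        # the store's tabulated phases, e.g. as set up with `fomosto ttt`):
        #
        #   params = store.make_timing_params('begin', 'end')
        #   print(params['tmin'], params['tmax'], params['vred'])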

        data = []
        warned = set()
        for args in self.config.iter_nodes(level=-1):
            tmin = self.t(begin, args)
            tmax = self.t(end, args)
            if tmin is None:
                warned.add(str(begin))
            if tmax is None:
                warned.add(str(end))

            x = self.config.get_surface_distance(args)
            data.append((x, tmin, tmax))

        if len(warned):
            w = ' | '.join(list(warned))
            msg = '''determination of time window failed using phase
definitions: %s.\n Travel time table contains holes in probed ranges. You can
use `fomosto tttlsd` to fix holes.''' % w
            if force:
                logger.warning(msg)
            else:
                raise MakeTimingParamsFailed(msg)

        xs, tmins, tmaxs = num.array(data, dtype=float).T

        tlens = tmaxs - tmins

        i = num.nanargmin(tmins)
        if not num.isfinite(i):
            raise MakeTimingParamsFailed('determination of time window failed')

        tminmin = tmins[i]
        x_tminmin = xs[i]
        dx = (xs - x_tminmin)
        dx = num.where(dx != 0.0, dx, num.nan)
        s = (tmins - tminmin) / dx
        sred = num.min(num.abs(s[num.isfinite(s)]))

        deltax = self.config.distance_delta

        if snap_vred:
            tdif = sred*deltax
            tdif2 = self.config.deltat * math.floor(tdif / self.config.deltat)
            sred = tdif2/self.config.distance_delta

        tmin_vred = tminmin - sred*x_tminmin
        if snap_vred:
            xe = x_tminmin - int(x_tminmin / deltax) * deltax
            tmin_vred = float(
                self.config.deltat *
                math.floor(tmin_vred / self.config.deltat) - xe * sred)

        tlenmax_vred = num.nanmax(tmaxs - (tmin_vred + sred*xs))
        if sred != 0.0:
            vred = 1.0/sred
        else:
            vred = 0.0

        return dict(
            tmin=tminmin,
            tmax=num.nanmax(tmaxs),
            tlenmax=num.nanmax(tlens),
            tmin_vred=tmin_vred,
            tlenmax_vred=tlenmax_vred,
            vred=vred)

    def make_ttt(self, force=False):
        '''
        Compute travel time tables.

        Travel time tables are computed using the 1D earth model defined in
        :py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined
        phase in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`. The
        accuracy of the tabulated times is adjusted to the sampling rate of
        the store.
        '''
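
        # Usage sketch: travel time tables are typically (re)built either via
        # this method or with the `fomosto ttt` command run on the store
        # directory.
        #
        #   store = Store('my_store')
        #   store.make_ttt(force=True)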

        from pyrocko import cake
        config = self.config

        if not config.tabulated_phases:
            return

        mod = config.earthmodel_1d

        if config.earthmodel_receiver_1d:
            self.check_earthmodels(config)

        if not mod:
            raise StoreError('no earth model found')

        for pdef in config.tabulated_phases:

            phase_id = pdef.id
            phases = pdef.phases
            horvels = pdef.horizontal_velocities

            fn = self.phase_filename(phase_id)

            if os.path.exists(fn) and not force:
                logger.info('file already exists: %s' % fn)
                continue

            def evaluate(args):

                if len(args) == 2:
                    zr, zs, x = (config.receiver_depth,) + args
                elif len(args) == 3:
                    zr, zs, x = args
                else:
                    assert False

                t = []
                if phases:
                    rays = mod.arrivals(
                        phases=phases,
                        distances=[x*cake.m2d],
                        zstart=zs,
                        zstop=zr)

                    for ray in rays:
                        t.append(ray.t)

                for v in horvels:
                    t.append(x/(v*1000.))

                if t:
                    return min(t)
                else:
                    return None

            logger.info('making travel time table for phasegroup "%s"' %
                        phase_id)

            ip = spit.SPTree(
                f=evaluate,
                ftol=config.deltat*0.5,
                xbounds=num.transpose((config.mins, config.maxs)),
                xtols=config.deltas)

            util.ensuredirs(fn)
            ip.dump(fn)

    def get_provided_components(self):

        scheme_desc = meta.component_scheme_to_description[
            self.config.component_scheme]

        quantity = self.config.stored_quantity \
            or scheme_desc.default_stored_quantity

        if not quantity:
            return scheme_desc.provided_components
        else:
            return [
                quantity + '.' + comp
                for comp in scheme_desc.provided_components]

    def fix_ttt_holes(self, phase_id):

        pdef = self.config.get_tabulated_phase(phase_id)
        mode = None
        for phase in pdef.phases:
            for leg in phase.legs():
                if mode is None:
                    mode = leg.mode

                else:
                    if mode != leg.mode:
                        raise StoreError(
                            'Can only fix holes in pure P or pure S phases.')

        sptree = self.get_stored_phase(phase_id)
        sptree_lsd = self.config.fix_ttt_holes(sptree, mode)

        phase_lsd = phase_id + '.lsd'
        fn = self.phase_filename(phase_lsd)
        sptree_lsd.dump(fn)

    def statics(self, source, multi_location, itsnapshot, components,
                interpolation='nearest_neighbor', nthreads=0):
        if not self._f_index:
            self.open()

        out = {}
        ntargets = multi_location.ntargets
        source_terms = source.get_source_terms(self.config.component_scheme)
        # TODO: deal with delays for snapshots > 1 sample

        if itsnapshot is not None:
            delays = source.times

            # Fringe case where we sample at sample 0 and sample 1
            tsnapshot = itsnapshot * self.config.deltat
            if delays.max() == tsnapshot and delays.min() != tsnapshot:
                delays[delays == delays.max()] -= self.config.deltat

        else:
            delays = source.times * 0
            itsnapshot = 1

        if ntargets == 0:
            raise StoreError('MultiLocation.coords5 is empty')

        res = store_ext.store_calc_static(
            self.cstore,
            source.coords5(),
            source_terms,
            delays,
            multi_location.coords5,
            self.config.component_scheme,
            interpolation,
            itsnapshot,
            nthreads)

        out = {}
        for icomp, (comp, comp_res) in enumerate(
                zip(self.get_provided_components(), res)):
            if comp not in components:
                continue
            out[comp] = res[icomp]

        return out

    def calc_seismograms(self, source, receivers, components, deltat=None,
                         itmin=None, nsamples=None,
                         interpolation='nearest_neighbor',
                         optimization='enable', nthreads=1):
        config = self.config

        assert interpolation in ['nearest_neighbor', 'multilinear'], \
            'Unknown interpolation: %s' % interpolation

        if not isinstance(receivers, list):
            receivers = [receivers]

        if deltat is None:
            decimate = 1
        else:
            decimate = int(round(deltat/config.deltat))
            if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
                raise StoreError(
                    'unavailable decimation ratio target.deltat / '
                    'store.deltat = %g / %g' % (deltat, config.deltat))

        store, decimate_ = self._decimated_store(decimate)

        if not store._f_index:
            store.open()

        scheme = config.component_scheme

        source_coords_arr = source.coords5()
        source_terms = source.get_source_terms(scheme)
        delays = source.times.ravel()

        nreceiver = len(receivers)
        receiver_coords_arr = num.empty((nreceiver, 5))
        for irec, rec in enumerate(receivers):
            receiver_coords_arr[irec, :] = rec.coords5

        dt = self.get_deltat()

        itoffset = int(num.floor(delays.min()/dt)) if delays.size != 0 else 0

        if itmin is None:
            itmin = num.zeros(nreceiver, dtype=num.int32)
        else:
            itmin = (itmin-itoffset).astype(num.int32)

        if nsamples is None:
            nsamples = num.zeros(nreceiver, dtype=num.int32) - 1
        else:
            nsamples = nsamples.astype(num.int32)

        try:
            results = store_ext.store_calc_timeseries(
                store.cstore,
                source_coords_arr,
                source_terms,
                (delays - itoffset*dt),
                receiver_coords_arr,
                scheme,
                interpolation,
                itmin,
                nsamples,
                nthreads)
        except Exception as e:
            raise e

        provided_components = self.get_provided_components()
        ncomponents = len(provided_components)

        seismograms = [dict() for _ in range(nreceiver)]
        for ires in range(len(results)):
            res = results.pop(0)
            ireceiver = ires // ncomponents

            comp = provided_components[res[-2]]

            if comp not in components:
                continue

            tr = GFTrace(*res[:-2])
            tr.deltat = config.deltat * decimate
            tr.itmin += itoffset

            tr.n_records_stacked = 0
            tr.t_optimize = 0.
            tr.t_stack = 0.
            tr.err = res[-1]

            seismograms[ireceiver][comp] = tr

        return seismograms

    def seismogram(self, source, receiver, components, deltat=None,
                   itmin=None, nsamples=None,
                   interpolation='nearest_neighbor',
                   optimization='enable', nthreads=1):

        config = self.config

        if deltat is None:
            decimate = 1
        else:
            decimate = int(round(deltat/config.deltat))
            if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
                raise StoreError(
                    'unavailable decimation ratio target.deltat / '
                    'store.deltat = %g / %g' % (deltat, config.deltat))

        store, decimate_ = self._decimated_store(decimate)

        if not store._f_index:
            store.open()

        scheme = config.component_scheme

        source_coords_arr = source.coords5()
        source_terms = source.get_source_terms(scheme)
        receiver_coords_arr = receiver.coords5[num.newaxis, :].copy()

        try:
            params = store_ext.make_sum_params(
                store.cstore,
                source_coords_arr,
                source_terms,
                receiver_coords_arr,
                scheme,
                interpolation, nthreads)

        except store_ext.StoreExtError:
            raise meta.OutOfBounds()

        provided_components = self.get_provided_components()

        out = {}
        for icomp, comp in enumerate(provided_components):
            if comp in components:
                weights, irecords = params[icomp]

                neach = irecords.size // source.times.size
                delays = num.repeat(source.times, neach)

                tr = store._sum(
                    irecords, delays, weights, itmin, nsamples, decimate_,
                    'c', optimization)

                # to prevent problems with rounding errors (BaseStore saves
                # deltat as a 4-byte floating point value, value from YAML
                # config is more accurate)
                tr.deltat = config.deltat * decimate

                out[comp] = tr

        return out


__all__ = '''
gf_dtype
NotMultipleOfSamplingInterval
GFTrace
CannotCreate
CannotOpen
DuplicateInsert
NotAllowedToInterpolate
NoSuchExtra
NoSuchPhase
BaseStore
Store
SeismosizerErrorEnum
'''.split()