# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from __future__ import absolute_import, division
from builtins import zip, range
import errno
import time
import os
import struct
import weakref
import math
import shutil
import fcntl
import copy
import logging
import re
import hashlib
import numpy as num
from scipy import signal
from . import meta
from .error import StoreError
from . import store_ext
from pyrocko import util, spit
logger = logging.getLogger('pyrocko.gf.store')
op = os.path
# gf store endianness
E = '<'
gf_dtype = num.dtype(num.float32)
gf_dtype_store = num.dtype(E + 'f4')
gf_dtype_nbytes_per_sample = 4
gf_store_header_dtype = [
('nrecords', E + 'u8'),
('deltat', E + 'f4'),
]
gf_store_header_fmt = E + 'Qf'
gf_store_header_fmt_size = struct.calcsize(gf_store_header_fmt)
gf_record_dtype = num.dtype([
('data_offset', E + 'u8'),
('itmin', E + 'i4'),
('nsamples', E + 'u4'),
('begin_value', E + 'f4'),
('end_value', E + 'f4'),
])
class SeismosizerErrorEnum:
SUCCESS = 0
INVALID_RECORD = 1
EMPTY_RECORD = 2
BAD_RECORD = 3
ALLOC_FAILED = 4
BAD_REQUEST = 5
BAD_DATA_OFFSET = 6
READ_DATA_FAILED = 7
SEEK_INDEX_FAILED = 8
READ_INDEX_FAILED = 9
FSTAT_TRACES_FAILED = 10
BAD_STORE = 11
MMAP_INDEX_FAILED = 12
MMAP_TRACES_FAILED = 13
INDEX_OUT_OF_BOUNDS = 14
NTARGETS_OUT_OF_BOUNDS = 15
def valid_string_id(s):
return re.match(meta.StringID.pattern, s)
def check_string_id(s):
if not valid_string_id(s):
raise ValueError('invalid name %s' % s)
# - data_offset
#
# Data file offset of first sample in bytes (the seek value).
# Special values:
#
# 0x00 - missing trace
# 0x01 - zero trace (GF trace is physically zero)
# 0x02 - short trace of 1 or 2 samples (no need for data allocation)
#
# - itmin
#
# Time of first sample of the trace as a multiple of the sampling interval. All
# GF samples must be evaluated at multiples of the sampling interval with
# respect to a global reference time zero. Must be set also for zero traces.
#
# - nsamples
#
# Number of samples in the GF trace. Should be set to zero for zero traces.
#
# - begin_value, end_value
#
# Values of first and last sample. These values are included in data[]
# redunantly.
[docs]class NotMultipleOfSamplingInterval(Exception):
pass
sampling_check_eps = 1e-5
[docs]class GFTrace(object):
'''
Green's Function trace class for handling traces from the GF store.
'''
@classmethod
def from_trace(cls, tr):
return cls(data=tr.ydata.copy(), tmin=tr.tmin, deltat=tr.deltat)
def __init__(self, data=None, itmin=None, deltat=1.0,
is_zero=False, begin_value=None, end_value=None, tmin=None):
assert sum((x is None) for x in (tmin, itmin)) == 1, \
'GFTrace: either tmin or itmin must be given'\
if tmin is not None:
itmin = int(round(tmin / deltat))
if abs(itmin*deltat - tmin) > sampling_check_eps*deltat:
raise NotMultipleOfSamplingInterval(
'GFTrace: tmin (%g) is not a multiple of sampling '
'interval (%g)' % (tmin, deltat))
if data is not None:
data = num.asarray(data, dtype=gf_dtype)
else:
data = num.array([], dtype=gf_dtype)
begin_value = 0.0
end_value = 0.0
self.data = weakref.ref(data)()
self.itmin = itmin
self.deltat = deltat
self.is_zero = is_zero
self.n_records_stacked = 0.
self.t_stack = 0.
self.t_optimize = 0.
self.err = SeismosizerErrorEnum.SUCCESS
if data is not None and data.size > 0:
if begin_value is None:
begin_value = data[0]
if end_value is None:
end_value = data[-1]
self.begin_value = begin_value
self.end_value = end_value
@property
def t(self):
'''
Time vector of the GF trace.
:returns: Time vector
:rtype: :py:class:`numpy.ndarray`
'''
return num.linspace(
self.itmin*self.deltat,
(self.itmin+self.data.size-1)*self.deltat, self.data.size)
def __str__(self, itmin=0):
if self.is_zero:
return 'ZERO'
s = []
for i in range(itmin, self.itmin + self.data.size):
if i >= self.itmin and i < self.itmin + self.data.size:
s.append('%7.4g' % self.data[i-self.itmin])
else:
s.append(' '*7)
return '|'.join(s)
def to_trace(self, net, sta, loc, cha):
from pyrocko import trace
return trace.Trace(
net, sta, loc, cha,
ydata=self.data,
deltat=self.deltat,
tmin=self.itmin*self.deltat)
class GFValue(object):
def __init__(self, value):
self.value = value
self.n_records_stacked = 0.
self.t_stack = 0.
self.t_optimize = 0.
Zero = GFTrace(is_zero=True, itmin=0)
def make_same_span(tracesdict):
traces = tracesdict.values()
nonzero = [tr for tr in traces if not tr.is_zero]
if not nonzero:
return {k: Zero for k in tracesdict.keys()}
deltat = nonzero[0].deltat
itmin = min(tr.itmin for tr in nonzero)
itmax = max(tr.itmin+tr.data.size for tr in nonzero) - 1
out = {}
for k, tr in tracesdict.items():
n = itmax - itmin + 1
if tr.itmin != itmin or tr.data.size != n:
data = num.zeros(n, dtype=gf_dtype)
if not tr.is_zero:
lo = tr.itmin-itmin
hi = lo + tr.data.size
data[:lo] = tr.data[0]
data[lo:hi] = tr.data
data[hi:] = tr.data[-1]
tr = GFTrace(data, itmin, deltat)
out[k] = tr
return out
[docs]class CannotCreate(StoreError):
pass
[docs]class CannotOpen(StoreError):
pass
[docs]class DuplicateInsert(StoreError):
pass
class ShortRead(StoreError):
def __str__(self):
return 'unexpected end of data (truncated traces file?)'
[docs]class NotAllowedToInterpolate(StoreError):
def __str__(self):
return 'not allowed to interpolate'
[docs]class NoSuchPhase(StoreError):
def __init__(self, s):
StoreError.__init__(self)
self.value = s
def __str__(self):
return 'phase for key "%s" not found. ' \
'Running "fomosto ttt" may be needed.' % self.value
def remove_if_exists(fn, force=False):
if os.path.exists(fn):
if force:
os.remove(fn)
else:
raise CannotCreate('file %s already exists' % fn)
def get_extra_path(store_dir, key):
check_string_id(key)
return os.path.join(store_dir, 'extra', key)
class BaseStore(object):
@staticmethod
def index_fn_(store_dir):
return os.path.join(store_dir, 'index')
@staticmethod
def data_fn_(store_dir):
return os.path.join(store_dir, 'traces')
@staticmethod
def config_fn_(store_dir):
return os.path.join(store_dir, 'config')
@staticmethod
def create(store_dir, deltat, nrecords, force=False):
try:
util.ensuredir(store_dir)
except Exception:
raise CannotCreate('cannot create directory %s' % store_dir)
index_fn = BaseStore.index_fn_(store_dir)
data_fn = BaseStore.data_fn_(store_dir)
for fn in (index_fn, data_fn):
remove_if_exists(fn, force)
with open(index_fn, 'wb') as f:
f.write(struct.pack(gf_store_header_fmt, nrecords, deltat))
records = num.zeros(nrecords, dtype=gf_record_dtype)
records.tofile(f)
with open(data_fn, 'wb') as f:
f.write(b'\0' * 32)
def __init__(self, store_dir, mode='r', use_memmap=True):
assert mode in 'rw'
self.store_dir = store_dir
self.mode = mode
self._use_memmap = use_memmap
self._nrecords = None
self._deltat = None
self._f_index = None
self._f_data = None
self._end_values = None
self.cstore = None
def open(self):
index_fn = self.index_fn()
data_fn = self.data_fn()
if self.mode == 'r':
fmode = 'rb'
elif self.mode == 'w':
fmode = 'r+b'
else:
assert False, 'invalid mode: %s' % self.mode
try:
self._f_index = open(index_fn, fmode)
self._f_data = open(data_fn, fmode)
except Exception:
self.mode = ''
raise CannotOpen('cannot open gf store: %s' % self.store_dir)
try:
# replace single precision deltat value in store with the double
# precision value from the config, if available
self.cstore = store_ext.store_init(
self._f_index.fileno(), self._f_data.fileno(),
self.get_deltat() or 0.0)
except store_ext.StoreExtError as e:
raise StoreError(str(e))
while True:
try:
dataheader = self._f_index.read(gf_store_header_fmt_size)
break
except IOError as e:
# occasionally got this one on an NFS volume
if e.errno == errno.EBUSY:
time.sleep(0.01)
else:
raise
nrecords, deltat = struct.unpack(gf_store_header_fmt, dataheader)
self._nrecords = nrecords
self._deltat = deltat
self._load_index()
def __del__(self):
if self.mode != '':
self.close()
def lock(self):
if not self._f_index:
self.open()
while True:
try:
fcntl.lockf(self._f_index, fcntl.LOCK_EX)
break
except IOError as e:
if e.errno == errno.ENOLCK:
time.sleep(0.01)
else:
raise
def unlock(self):
self._f_data.flush()
fcntl.lockf(self._f_index, fcntl.LOCK_UN)
def put(self, irecord, trace):
self._put(irecord, trace)
def get_record(self, irecord):
return self._get_record(irecord)
def get_span(self, irecord, decimate=1):
return self._get_span(irecord, decimate=decimate)
def get(self, irecord, itmin=None, nsamples=None, decimate=1,
implementation='c'):
return self._get(irecord, itmin, nsamples, decimate, implementation)
def sum(self, irecords, delays, weights, itmin=None,
nsamples=None, decimate=1, implementation='c',
optimization='enable'):
return self._sum(irecords, delays, weights, itmin, nsamples, decimate,
implementation, optimization)
def irecord_format(self):
return util.zfmt(self._nrecords)
def str_irecord(self, irecord):
return self.irecord_format() % irecord
def close(self):
if self.mode == 'w':
if not self._f_index:
self.open()
self._save_index()
if self._f_data:
self._f_data.close()
self._f_data = None
if self._f_index:
self._f_index.close()
self._f_index = None
self.mode = ''
def _get_record(self, irecord):
if not self._f_index:
self.open()
return self._records[irecord]
def _get(self, irecord, itmin, nsamples, decimate, implementation):
'''Retrieve complete GF trace from storage.'''
if not self._f_index:
self.open()
if not self.mode == 'r':
raise StoreError('store not open in read mode')
if implementation == 'c' and decimate == 1:
if nsamples is None:
nsamples = -1
if itmin is None:
itmin = 0
try:
return GFTrace(*store_ext.store_get(
self.cstore, int(irecord), int(itmin), int(nsamples)))
except store_ext.StoreExtError as e:
raise StoreError(str(e))
else:
return self._get_impl_reference(irecord, itmin, nsamples, decimate)
def get_deltat(self):
return self._deltat
def _get_impl_reference(self, irecord, itmin, nsamples, decimate):
deltat = self.get_deltat()
if not (0 <= irecord < self._nrecords):
raise StoreError('invalid record number requested '
'(irecord = %i, nrecords = %i)' %
(irecord, self._nrecords))
ipos, itmin_data, nsamples_data, begin_value, end_value = \
self._records[irecord]
if None in (itmin, nsamples):
itmin = itmin_data
itmax = itmin_data + nsamples_data - 1
nsamples = nsamples_data
else:
itmin = itmin * decimate
nsamples = nsamples * decimate
itmax = itmin + nsamples - decimate
if ipos == 0:
return None
elif ipos == 1:
itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
return GFTrace(is_zero=True, itmin=itmin_ext//decimate)
if decimate == 1:
ilo = max(itmin, itmin_data) - itmin_data
ihi = min(itmin+nsamples, itmin_data+nsamples_data) - itmin_data
data = self._get_data(ipos, begin_value, end_value, ilo, ihi)
return GFTrace(data, itmin=itmin_data+ilo, deltat=deltat,
begin_value=begin_value, end_value=end_value)
else:
itmax_data = itmin_data + nsamples_data - 1
# put begin and end to multiples of new sampling rate
itmin_ext = (max(itmin, itmin_data)//decimate) * decimate
itmax_ext = -((-min(itmax, itmax_data))//decimate) * decimate
nsamples_ext = itmax_ext - itmin_ext + 1
# add some padding for the aa filter
order = 30
itmin_ext_pad = itmin_ext - order//2
itmax_ext_pad = itmax_ext + order//2
nsamples_ext_pad = itmax_ext_pad - itmin_ext_pad + 1
itmin_overlap = max(itmin_data, itmin_ext_pad)
itmax_overlap = min(itmax_data, itmax_ext_pad)
ilo = itmin_overlap - itmin_ext_pad
ihi = max(ilo, itmax_overlap - itmin_ext_pad + 1)
ilo_data = itmin_overlap - itmin_data
ihi_data = max(ilo_data, itmax_overlap - itmin_data + 1)
data_ext_pad = num.empty(nsamples_ext_pad, dtype=gf_dtype)
data_ext_pad[ilo:ihi] = self._get_data(
ipos, begin_value, end_value, ilo_data, ihi_data)
data_ext_pad[:ilo] = begin_value
data_ext_pad[ihi:] = end_value
b = signal.firwin(order + 1, 1. / decimate, window='hamming')
a = 1.
data_filt_pad = signal.lfilter(b, a, data_ext_pad)
data_deci = data_filt_pad[order:order+nsamples_ext:decimate]
if data_deci.size >= 1:
if itmin_ext <= itmin_data:
data_deci[0] = begin_value
if itmax_ext >= itmax_data:
data_deci[-1] = end_value
return GFTrace(data_deci, itmin_ext//decimate,
deltat*decimate,
begin_value=begin_value, end_value=end_value)
def _get_span(self, irecord, decimate=1):
'''
Get temporal extent of GF trace at given index.
'''
if not self._f_index:
self.open()
assert 0 <= irecord < self._nrecords, \
'irecord = %i, nrecords = %i' % (irecord, self._nrecords)
(_, itmin, nsamples, _, _) = self._records[irecord]
itmax = itmin + nsamples - 1
if decimate == 1:
return itmin, itmax
else:
if nsamples == 0:
return itmin//decimate, itmin//decimate - 1
else:
return itmin//decimate, -((-itmax)//decimate)
def _put(self, irecord, trace):
'''
Save GF trace to storage.
'''
if not self._f_index:
self.open()
deltat = self.get_deltat()
assert self.mode == 'w'
assert trace.is_zero or \
abs(trace.deltat - deltat) < 1e-7 * deltat
assert 0 <= irecord < self._nrecords, \
'irecord = %i, nrecords = %i' % (irecord, self._nrecords)
if self._records[irecord][0] != 0:
raise DuplicateInsert('record %i already in store' % irecord)
if trace.is_zero or num.all(trace.data == 0.0):
self._records[irecord] = (1, trace.itmin, 0, 0., 0.)
return
ndata = trace.data.size
if ndata > 2:
self._f_data.seek(0, 2)
ipos = self._f_data.tell()
trace.data.astype(gf_dtype_store).tofile(self._f_data)
else:
ipos = 2
self._records[irecord] = (ipos, trace.itmin, ndata,
trace.data[0], trace.data[-1])
def _sum_impl_alternative(self, irecords, delays, weights, itmin, nsamples,
decimate):
'''
Sum delayed and weighted GF traces.
'''
if not self._f_index:
self.open()
assert self.mode == 'r'
deltat = self.get_deltat() * decimate
if len(irecords) == 0:
if None in (itmin, nsamples):
return Zero
else:
return GFTrace(
num.zeros(nsamples, dtype=gf_dtype), itmin,
deltat, is_zero=True)
assert len(irecords) == len(delays)
assert len(irecords) == len(weights)
if None in (itmin, nsamples):
itmin_delayed, itmax_delayed = [], []
for irecord, delay in zip(irecords, delays):
itmin, itmax = self._get_span(irecord, decimate=decimate)
itmin_delayed.append(itmin + delay/deltat)
itmax_delayed.append(itmax + delay/deltat)
itmin = int(math.floor(min(itmin_delayed)))
nsamples = int(math.ceil(max(itmax_delayed))) - itmin + 1
out = num.zeros(nsamples, dtype=gf_dtype)
if nsamples == 0:
return GFTrace(out, itmin, deltat)
for ii in range(len(irecords)):
irecord = irecords[ii]
delay = delays[ii]
weight = weights[ii]
if weight == 0.0:
continue
idelay_floor = int(math.floor(delay/deltat))
idelay_ceil = int(math.ceil(delay/deltat))
gf_trace = self._get(
irecord,
itmin - idelay_ceil,
nsamples + idelay_ceil - idelay_floor,
decimate,
'reference')
assert gf_trace.itmin >= itmin - idelay_ceil
assert gf_trace.data.size <= nsamples + idelay_ceil - idelay_floor
if gf_trace.is_zero:
continue
ilo = gf_trace.itmin - itmin + idelay_floor
ihi = ilo + gf_trace.data.size + (idelay_ceil-idelay_floor)
data = gf_trace.data
if idelay_floor == idelay_ceil:
out[ilo:ihi] += data * weight
else:
if data.size:
k = 1
if ihi <= nsamples:
out[ihi-1] += gf_trace.end_value * \
((idelay_ceil-delay/deltat) * weight)
k = 0
out[ilo+1:ihi-k] += data[:data.size-k] * \
((delay/deltat-idelay_floor) * weight)
k = 1
if ilo >= 0:
out[ilo] += gf_trace.begin_value * \
((delay/deltat-idelay_floor) * weight)
k = 0
out[ilo+k:ihi-1] += data[k:] * \
((idelay_ceil-delay/deltat) * weight)
if ilo > 0 and gf_trace.begin_value != 0.0:
out[:ilo] += gf_trace.begin_value * weight
if ihi < nsamples and gf_trace.end_value != 0.0:
out[ihi:] += gf_trace.end_value * weight
return GFTrace(out, itmin, deltat)
def _sum_impl_reference(self, irecords, delays, weights, itmin, nsamples,
decimate):
if not self._f_index:
self.open()
deltat = self.get_deltat() * decimate
if len(irecords) == 0:
if None in (itmin, nsamples):
return Zero
else:
return GFTrace(
num.zeros(nsamples, dtype=gf_dtype), itmin,
deltat, is_zero=True)
datas = []
itmins = []
for i, delay, weight in zip(irecords, delays, weights):
tr = self._get(i, None, None, decimate, 'reference')
idelay_floor = int(math.floor(delay/deltat))
idelay_ceil = int(math.ceil(delay/deltat))
if idelay_floor == idelay_ceil:
itmins.append(tr.itmin + idelay_floor)
datas.append(tr.data.copy()*weight)
else:
itmins.append(tr.itmin + idelay_floor)
datas.append(tr.data.copy()*weight*(idelay_ceil-delay/deltat))
itmins.append(tr.itmin + idelay_ceil)
datas.append(tr.data.copy()*weight*(delay/deltat-idelay_floor))
itmin_all = min(itmins)
itmax_all = max(itmin_ + data.size for (itmin_, data) in
zip(itmins, datas))
if itmin is not None:
itmin_all = min(itmin_all, itmin)
if nsamples is not None:
itmax_all = max(itmax_all, itmin+nsamples)
nsamples_all = itmax_all - itmin_all
arr = num.zeros((len(datas), nsamples_all), dtype=gf_dtype)
for i, itmin_, data in zip(num.arange(len(datas)), itmins, datas):
if data.size > 0:
ilo = itmin_-itmin_all
ihi = ilo + data.size
arr[i, :ilo] = data[0]
arr[i, ilo:ihi] = data
arr[i, ihi:] = data[-1]
sum_arr = arr.sum(axis=0)
if itmin is not None and nsamples is not None:
ilo = itmin-itmin_all
ihi = ilo + nsamples
sum_arr = sum_arr[ilo:ihi]
else:
itmin = itmin_all
return GFTrace(sum_arr, itmin, deltat)
def _optimize(self, irecords, delays, weights):
if num.unique(irecords).size == irecords.size:
return irecords, delays, weights
deltat = self.get_deltat()
delays = delays / deltat
irecords2 = num.repeat(irecords, 2)
delays2 = num.empty(irecords2.size, dtype=num.float)
delays2[0::2] = num.floor(delays)
delays2[1::2] = num.ceil(delays)
weights2 = num.repeat(weights, 2)
weights2[0::2] *= 1.0 - (delays - delays2[0::2])
weights2[1::2] *= (1.0 - (delays2[1::2] - delays)) * \
(delays2[1::2] - delays2[0::2])
delays2 *= deltat
iorder = num.lexsort((delays2, irecords2))
irecords2 = irecords2[iorder]
delays2 = delays2[iorder]
weights2 = weights2[iorder]
ui = num.empty(irecords2.size, dtype=num.bool)
ui[1:] = num.logical_or(num.diff(irecords2) != 0,
num.diff(delays2) != 0.)
ui[0] = 0
ind2 = num.cumsum(ui)
ui[0] = 1
ind1 = num.where(ui)[0]
irecords3 = irecords2[ind1]
delays3 = delays2[ind1]
weights3 = num.bincount(ind2, weights2)
return irecords3, delays3, weights3
def _sum(self, irecords, delays, weights, itmin, nsamples, decimate,
implementation, optimization):
if not self._f_index:
self.open()
t0 = time.time()
if optimization == 'enable':
irecords, delays, weights = self._optimize(
irecords, delays, weights)
else:
assert optimization == 'disable'
t1 = time.time()
deltat = self.get_deltat()
if implementation == 'c' and decimate == 1:
if delays.size != 0:
itoffset = int(num.floor(num.min(delays)/deltat))
else:
itoffset = 0
if nsamples is None:
nsamples = -1
if itmin is None:
itmin = 0
else:
itmin -= itoffset
try:
tr = GFTrace(*store_ext.store_sum(
self.cstore, irecords.astype(num.uint64),
delays - itoffset*deltat,
weights.astype(num.float32),
int(itmin), int(nsamples)))
tr.itmin += itoffset
except store_ext.StoreExtError as e:
raise StoreError(str(e) + ' in store %s' % self.store_dir)
elif implementation == 'alternative':
tr = self._sum_impl_alternative(irecords, delays, weights,
itmin, nsamples, decimate)
else:
tr = self._sum_impl_reference(irecords, delays, weights,
itmin, nsamples, decimate)
t2 = time.time()
tr.n_records_stacked = irecords.size
tr.t_optimize = t1 - t0
tr.t_stack = t2 - t1
return tr
def _sum_statics(self, irecords, delays, weights, it, ntargets,
nthreads):
if not self._f_index:
self.open()
return store_ext.store_sum_static(
self.cstore, irecords, delays, weights, it, ntargets, nthreads)
def _load_index(self):
if self._use_memmap:
records = num.memmap(
self._f_index, dtype=gf_record_dtype,
offset=gf_store_header_fmt_size,
mode=('r', 'r+')[self.mode == 'w'])
else:
self._f_index.seek(gf_store_header_fmt_size)
records = num.fromfile(self._f_index, dtype=gf_record_dtype)
assert len(records) == self._nrecords
self._records = records
self._end_values = self._records['end_value']
def _save_index(self):
self._f_index.seek(0)
self._f_index.write(struct.pack(gf_store_header_fmt, self._nrecords,
self.get_deltat()))
if self._use_memmap:
del self._records
else:
self._f_index.seek(gf_store_header_fmt_size)
self._records.tofile(self._f_index)
self._f_index.flush()
def _get_data(self, ipos, begin_value, end_value, ilo, ihi):
if ihi - ilo > 0:
if ipos == 2:
data_orig = num.empty(2, dtype=gf_dtype)
data_orig[0] = begin_value
data_orig[1] = end_value
return data_orig[ilo:ihi]
else:
self._f_data.seek(
int(ipos + ilo*gf_dtype_nbytes_per_sample))
arr = num.fromfile(
self._f_data, gf_dtype_store, ihi-ilo).astype(gf_dtype)
if arr.size != ihi-ilo:
raise ShortRead()
return arr
else:
return num.empty((0,), dtype=gf_dtype)
def index_fn(self):
return BaseStore.index_fn_(self.store_dir)
def data_fn(self):
return BaseStore.data_fn_(self.store_dir)
def config_fn(self):
return BaseStore.config_fn_(self.store_dir)
def count_special_records(self):
if not self._f_index:
self.open()
return num.histogram(self._records['data_offset'],
bins=[0, 1, 2, 3, num.uint64(-1)])[0]
@property
def size_index(self):
return os.stat(self.index_fn()).st_size
@property
def size_data(self):
return os.stat(self.data_fn()).st_size
@property
def size_index_and_data(self):
return self.size_index + self.size_data
@property
def size_index_and_data_human(self):
return util.human_bytesize(self.size_index_and_data)
def stats(self):
counter = self.count_special_records()
stats = dict(
total=self._nrecords,
inserted=(counter[1] + counter[2] + counter[3]),
empty=counter[0],
short=counter[2],
zero=counter[1],
size_data=self.size_data,
size_index=self.size_index,
)
return stats
stats_keys = 'total inserted empty short zero size_data size_index'.split()
def remake_dir(dpath, force):
if os.path.exists(dpath):
if force:
shutil.rmtree(dpath)
else:
raise CannotCreate('Directory "%s" already exists.' % dpath)
os.mkdir(dpath)
class MakeTimingParamsFailed(StoreError):
pass
[docs]class Store(BaseStore):
'''
Green's function disk storage and summation machine.
The `Store` can be used to efficiently store, retrieve, and sum Green's
function traces. A `Store` contains many 1D time traces sampled at even
multiples of a global sampling rate, where each time trace has an
individual start and end time. The traces are treated as having repeating
end points, so the functions they represent can be non-constant only
between begin and end time. It provides capabilities to retrieve decimated
traces and to extract parts of the traces. The main purpose of this class
is to provide a fast, easy to use, and flexible machanism to compute
weighted delay-and-sum stacks with many Green's function traces involved.
Individual Green's functions are accessed through a single integer index at
low level. On top of that, various indexing schemes can be implemented by
providing a mapping from physical coordinates to the low level index `i`.
E.g. for a problem with cylindrical symmetry, one might define a mapping
from the coordinates (`receiver_depth`, `source_depth`, `distance`) to the
low level index. Index translation is done in the
:py:class:`pyrocko.gf.meta.Config` subclass object associated with the
Store. It is accessible through the store's :py:attr:`config` attribute,
and contains all meta information about the store, including physical
extent, geometry, sampling rate, and information about the type of the
stored Green's functions. Information about the underlying earth model
can also be found there.
A GF store can also contain tabulated phase arrivals. In basic cases, these
can be created with the :py:meth:`make_ttt` and evaluated with the
:py:func:`t` methods.
.. attribute:: config
The :py:class:`pyrocko.gf.meta.Config` derived object associated with
the store which contains all meta information about the store and
provides the high-level to low-level index mapping.
.. attribute:: store_dir
Path to the store's data directory.
.. attribute:: mode
The mode in which the store is opened: ``'r'``: read-only, ``'w'``:
writeable.
'''
[docs] @classmethod
def create(cls, store_dir, config, force=False, extra=None):
'''
Create new GF store.
Creates a new GF store at path ``store_dir``. The layout of the GF is
defined with the parameters given in ``config``, which should be an
object of a subclass of :py:class:`~pyrocko.gf.meta.Config`. This
function will refuse to overwrite an existing GF store, unless
``force`` is set to ``True``. If more information, e.g. parameters
used for the modelling code, earth models or other, should be saved
along with the GF store, these may be provided though a dict given to
``extra``. The keys of this dict must be names and the values must be
*guts* type objects.
:param store_dir: GF store path
:type store_dir: str
:param config: GF store Config
:type config: :py:class:`~pyrocko.gf.meta.Config`
:param force: Force overwrite, defaults to ``False``
:type force: bool
:param extra: Extra information
:type extra: dict or None
'''
cls.create_editables(store_dir, config, force=force, extra=extra)
cls.create_dependants(store_dir, force=force)
return cls(store_dir)
@staticmethod
def create_editables(store_dir, config, force=False, extra=None):
try:
util.ensuredir(store_dir)
except Exception:
raise CannotCreate('cannot create directory %s' % store_dir)
fns = []
config_fn = os.path.join(store_dir, 'config')
remove_if_exists(config_fn, force)
meta.dump(config, filename=config_fn)
fns.append(config_fn)
for sub_dir in ['extra']:
dpath = os.path.join(store_dir, sub_dir)
remake_dir(dpath, force)
if extra:
for k, v in extra.items():
fn = get_extra_path(store_dir, k)
remove_if_exists(fn, force)
meta.dump(v, filename=fn)
fns.append(fn)
return fns
@staticmethod
def create_dependants(store_dir, force=False):
config_fn = os.path.join(store_dir, 'config')
config = meta.load(filename=config_fn)
BaseStore.create(store_dir, config.deltat, config.nrecords,
force=force)
for sub_dir in ['decimated']:
dpath = os.path.join(store_dir, sub_dir)
remake_dir(dpath, force)
def __init__(self, store_dir, mode='r', use_memmap=True):
BaseStore.__init__(self, store_dir, mode=mode, use_memmap=use_memmap)
config_fn = self.config_fn()
if not os.path.isfile(config_fn):
raise StoreError(
'directory "%s" does not seem to contain a GF store '
'("config" file not found)' % store_dir)
self.load_config()
self._decimated = {}
self._extra = {}
self._phases = {}
for decimate in range(2, 9):
if os.path.isdir(self._decimated_store_dir(decimate)):
self._decimated[decimate] = None
def open(self):
if not self._f_index:
BaseStore.open(self)
c = self.config
mscheme = 'type_' + c.short_type.lower()
store_ext.store_mapping_init(
self.cstore, mscheme,
c.mins, c.maxs, c.deltas, c.ns.astype(num.uint64),
c.ncomponents)
def save_config(self, make_backup=False):
config_fn = self.config_fn()
if make_backup:
os.rename(config_fn, config_fn + '~')
meta.dump(self.config, filename=config_fn)
def get_deltat(self):
return self.config.deltat
def load_config(self):
self.config = meta.load(filename=self.config_fn())
def ensure_reference(self, force=False):
if self.config.reference is not None and not force:
return
self.ensure_uuid()
reference = '%s-%s' % (self.config.id, self.config.uuid[0:6])
if self.config.reference is not None:
self.config.reference = reference
self.save_config()
else:
with open(self.config_fn(), 'a') as config:
config.write('reference: %s\n' % reference)
self.load_config()
def ensure_uuid(self, force=False):
if self.config.uuid is not None and not force:
return
uuid = self.create_store_hash()
if self.config.uuid is not None:
self.config.uuid = uuid
self.save_config()
else:
with open(self.config_fn(), 'a') as config:
config.write('\nuuid: %s\n' % uuid)
self.load_config()
def create_store_hash(self):
logger.info('creating hash for store ...')
m = hashlib.sha1()
traces_size = op.getsize(self.data_fn())
with open(self.data_fn(), 'rb') as traces:
while traces.tell() < traces_size:
m.update(traces.read(4096))
traces.seek(1024 * 1024 * 10, 1)
with open(self.config_fn(), 'rb') as config:
m.update(config.read())
return m.hexdigest()
def get_extra_path(self, key):
return get_extra_path(self.store_dir, key)
[docs] def upgrade(self):
'''
Upgrade store config and files to latest Pyrocko version.
'''
fns = [os.path.join(self.store_dir, 'config')]
for key in self.extra_keys():
fns.append(self.get_extra_path(key))
i = 0
for fn in fns:
i += util.pf_upgrade(fn)
return i
def extra_keys(self):
return [x for x in os.listdir(os.path.join(self.store_dir, 'extra'))
if valid_string_id(x)]
[docs] def put(self, args, trace):
'''
Insert trace into GF store.
Store a single GF trace at (high-level) index ``args``.
:param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
``(source_depth, distance, component)`` as in
:py:class:`~pyrocko.gf.meta.ConfigTypeA`.
:type args: tuple
:returns: GF trace at ``args``
:rtype: :py:class:`~pyrocko.gf.store.GFTrace`
'''
irecord = self.config.irecord(*args)
self._put(irecord, trace)
def get_record(self, args):
irecord = self.config.irecord(*args)
return self._get_record(irecord)
def str_irecord(self, args):
return BaseStore.str_irecord(self, self.config.irecord(*args))
[docs] def get(self, args, itmin=None, nsamples=None, decimate=1,
interpolation='nearest_neighbor', implementation='c'):
'''
Retrieve GF trace from store.
Retrieve a single GF trace from the store at (high-level) index
``args``. By default, the full trace is retrieved. Given ``itmin`` and
``nsamples``, only the selected portion of the trace is extracted. If
``decimate`` is an integer in the range [2,8], the trace is decimated
on the fly or, if available, the trace is read from a decimated version
of the GF store.
:param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
``(source_depth, distance, component)`` as in
:py:class:`~pyrocko.gf.meta.ConfigTypeA`.
:type args: tuple
:param itmin: Start time index (start time is ``itmin * dt``),
defaults to None
:type itmin: int or None
:param nsamples: Number of samples, defaults to None
:type nsamples: int or None
:param decimate: Decimatation factor, defaults to 1
:type decimate: int
:param interpolation: Interpolation method
``['nearest_neighbor', 'multilinear', 'off']``, defaults to
``'nearest_neighbor'``
:type interpolation: str
:param implementation: Implementation to use ``['c', 'reference']``,
defaults to ``'c'``.
:type implementation: str
:returns: GF trace at ``args``
:rtype: :py:class:`~pyrocko.gf.store.GFTrace`
'''
store, decimate = self._decimated_store(decimate)
if interpolation == 'nearest_neighbor':
irecord = store.config.irecord(*args)
tr = store._get(irecord, itmin, nsamples, decimate,
implementation)
elif interpolation == 'off':
irecords, weights = store.config.vicinity(*args)
if len(irecords) != 1:
raise NotAllowedToInterpolate()
else:
tr = store._get(irecords[0], itmin, nsamples, decimate,
implementation)
elif interpolation == 'multilinear':
tr = store._sum(irecords, num.zeros(len(irecords)), weights,
itmin, nsamples, decimate, implementation,
'disable')
# to prevent problems with rounding errors (BaseStore saves deltat
# as a 4-byte floating point value, value from YAML config is more
# accurate)
tr.deltat = self.config.deltat * decimate
return tr
[docs] def sum(self, args, delays, weights, itmin=None, nsamples=None,
decimate=1, interpolation='nearest_neighbor', implementation='c',
optimization='enable'):
'''
Sum delayed and weighted GF traces.
Calculate sum of delayed and weighted GF traces. ``args`` is a tuple of
arrays forming the (high-level) indices of the GF traces to be
selected. Delays and weights for the summation are given in the arrays
``delays`` and ``weights``. ``itmin`` and ``nsamples`` can be given to
restrict to computation to a given time interval. If ``decimate`` is
an integer in the range [2,8], decimated traces are used in the
summation.
:param args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
``(source_depth, distance, component)`` as in
:py:class:`~pyrocko.gf.meta.ConfigTypeA`.
:type args: tuple(numpy.ndarray)
:param delays: Delay times
:type delays: :py:class:`numpy.ndarray`
:param weights: Trace weights
:type weights: :py:class:`numpy.ndarray`
:param itmin: Start time index (start time is ``itmin * dt``),
defaults to None
:type itmin: int or None
:param nsamples: Number of samples, defaults to None
:type nsamples: int or None
:param decimate: Decimatation factor, defaults to 1
:type decimate: int
:param interpolation: Interpolation method
``['nearest_neighbor', 'multilinear', 'off']``, defaults to
``'nearest_neighbor'``
:type interpolation: str
:param implementation: Implementation to use,
``['c', 'alternative', 'reference']``, where ``'alternative'``
and ``'reference'`` use a Python implementation, defaults to `'c'`
:type implementation: str
:param optimization: Optimization mode ``['enable', 'disable']``,
defaults to ``'enable'``
:type optimization: str
:returns: Stacked GF trace.
:rtype: :py:class:`~pyrocko.gf.store.GFTrace`
'''
store, decimate_ = self._decimated_store(decimate)
if interpolation == 'nearest_neighbor':
irecords = store.config.irecords(*args)
else:
assert interpolation == 'multilinear'
irecords, ip_weights = store.config.vicinities(*args)
neach = irecords.size // args[0].size
weights = num.repeat(weights, neach) * ip_weights
delays = num.repeat(delays, neach)
tr = store._sum(irecords, delays, weights, itmin, nsamples, decimate_,
implementation, optimization)
# to prevent problems with rounding errors (BaseStore saves deltat
# as a 4-byte floating point value, value from YAML config is more
# accurate)
tr.deltat = self.config.deltat * decimate
return tr
[docs] def make_decimated(self, decimate, config=None, force=False,
show_progress=False):
'''
Create decimated version of GF store.
Create a downsampled version of the GF store. Downsampling is done for
the integer factor ``decimate`` which should be in the range [2,8]. If
``config`` is ``None``, all traces of the GF store are decimated and
held available (i.e. the index mapping of the original store is used),
otherwise, a different spacial stepping can be specified by giving a
modified GF store configuration in ``config`` (see :py:meth:`create`).
Decimated GF sub-stores are created under the ``decimated``
subdirectory within the GF store directory. Holding available decimated
versions of the GF store can save computation time, IO bandwidth, or
decrease memory footprint at the cost of increased disk space usage,
when computation are done for lower frequency signals.
:param decimate: Decimate factor
:type decimate: int
:param config: GF store config object, defaults to None
:type config: :py:class:`~pyrocko.gf.meta.Config` or None
:param force: Force overwrite, defaults to ``False``
:type force: bool
:param show_progress: Show progress, defaults to ``False``
:type show_progress: bool
'''
if not self._f_index:
self.open()
if not (2 <= decimate <= 8):
raise StoreError('decimate argument must be in the range [2,8]')
assert self.mode == 'r'
if config is None:
config = self.config
config = copy.deepcopy(config)
config.sample_rate = self.config.sample_rate / decimate
if decimate in self._decimated:
del self._decimated[decimate]
store_dir = self._decimated_store_dir(decimate)
if os.path.exists(store_dir):
if force:
shutil.rmtree(store_dir)
else:
raise CannotCreate('store already exists at %s' % store_dir)
store_dir_incomplete = store_dir + '-incomplete'
Store.create(store_dir_incomplete, config, force=force)
decimated = Store(store_dir_incomplete, 'w')
if show_progress:
pbar = util.progressbar('decimating store', self.config.nrecords)
for i, args in enumerate(decimated.config.iter_nodes()):
tr = self.get(args, decimate=decimate)
decimated.put(args, tr)
if show_progress:
pbar.update(i+1)
if show_progress:
pbar.finish()
decimated.close()
shutil.move(store_dir_incomplete, store_dir)
self._decimated[decimate] = None
def stats(self):
stats = BaseStore.stats(self)
stats['decimated'] = sorted(self._decimated.keys())
return stats
stats_keys = BaseStore.stats_keys + ['decimated']
def check(self, show_progress=False):
have_holes = []
for pdef in self.config.tabulated_phases:
phase_id = pdef.id
ph = self.get_stored_phase(phase_id)
if ph.check_holes():
have_holes.append(phase_id)
if have_holes:
for phase_id in have_holes:
logger.warn(
'Travel time table of phase "{}" contains holes. You can '
' use `fomosto tttlsd` to fix holes.'.format(
phase_id))
else:
logger.info('No holes in travel time tables')
if show_progress:
pbar = util.progressbar('checking store', self.config.nrecords)
problems = 0
for i, args in enumerate(self.config.iter_nodes()):
tr = self.get(args)
if tr and not tr.is_zero:
if not tr.begin_value == tr.data[0]:
logger.warn('wrong begin value for trace at %s '
'(data corruption?)' % str(args))
problems += 1
if not tr.end_value == tr.data[-1]:
logger.warn('wrong end value for trace at %s '
'(data corruption?)' % str(args))
problems += 1
if not num.all(num.isfinite(tr.data)):
logger.warn('nans or infs in trace at %s' % str(args))
problems += 1
if show_progress:
pbar.update(i+1)
if show_progress:
pbar.finish()
return problems
def check_earthmodels(self, config):
if config.earthmodel_receiver_1d.profile('z')[-1] not in\
config.earthmodel_1d.profile('z'):
logger.warn('deepest layer of earthmodel_receiver_1d not '
'found in earthmodel_1d')
def _decimated_store_dir(self, decimate):
return os.path.join(self.store_dir, 'decimated', str(decimate))
def _decimated_store(self, decimate):
if decimate == 1 or decimate not in self._decimated:
return self, decimate
else:
store = self._decimated[decimate]
if store is None:
store = Store(self._decimated_store_dir(decimate), 'r')
self._decimated[decimate] = store
return store, 1
def phase_filename(self, phase_id):
check_string_id(phase_id)
return os.path.join(self.store_dir, 'phases', phase_id + '.phase')
[docs] def get_stored_phase(self, phase_id):
"""Get stored phase from GF store
:returns: Phase information
:rtype: :py:class:`pyrocko.spit.SPTree`
"""
if phase_id not in self._phases:
fn = self.phase_filename(phase_id)
if not os.path.isfile(fn):
raise NoSuchPhase(phase_id)
spt = spit.SPTree(filename=fn)
self._phases[phase_id] = spt
return self._phases[phase_id]
def get_phase(self, phase_def):
toks = phase_def.split(':', 1)
if len(toks) == 2:
provider, phase_def = toks
else:
provider, phase_def = 'stored', toks[0]
if provider == 'stored':
return self.get_stored_phase(phase_def)
elif provider == 'vel':
vel = float(phase_def) * 1000.
def evaluate(args):
return self.config.get_distance(args) / vel
return evaluate
elif provider == 'vel_surface':
vel = float(phase_def) * 1000.
def evaluate(args):
return self.config.get_surface_distance(args) / vel
return evaluate
elif provider in ('cake', 'iaspei'):
from pyrocko import cake
mod = self.config.earthmodel_1d
if provider == 'cake':
phases = [cake.PhaseDef(phase_def)]
else:
phases = cake.PhaseDef.classic(phase_def)
def evaluate(args):
if len(args) == 2:
zr, zs, x = (self.config.receiver_depth,) + args
elif len(args) == 3:
zr, zs, x = args
else:
assert False
t = []
if phases:
rays = mod.arrivals(
phases=phases,
distances=[x*cake.m2d],
zstart=zs,
zstop=zr)
for ray in rays:
t.append(ray.t)
if t:
return min(t)
else:
return None
return evaluate
raise StoreError('unsupported phase provider: %s' % provider)
[docs] def t(self, timing, *args):
'''
Compute interpolated phase arrivals.
**Examples:**
If ``test_store`` is of :py:class:`~pyrocko.gf.meta.ConfigTypeA`::
test_store.t('p', (1000, 10000))
test_store.t('last{P|Pdiff}', (1000, 10000)) # The latter arrival
# of P or diffracted
# P phase
If ``test_store`` is of :py:class:`~pyrocko.gf.meta.ConfigTypeB`::
test_store.t('S', (1000, 1000, 10000))
test_store.t('first{P|p|Pdiff|sP}', (1000, 1000, 10000)) # The
` # first arrival of
# the given phases is
# selected
:param timing: Timing string as described above
:type timing: str or :py:class:`~pyrocko.gf.meta.Timing`
:param \\*args: :py:class:`~pyrocko.gf.meta.Config` index tuple, e.g.
``(source_depth, distance, component)`` as in
:py:class:`~pyrocko.gf.meta.ConfigTypeA`.
:type \\*args: tuple
:returns: Phase arrival according to ``timing``
:rtype: float or None
'''
if len(args) == 1:
args = args[0]
else:
args = self.config.make_indexing_args1(*args)
if not isinstance(timing, meta.Timing):
timing = meta.Timing(timing)
return timing.evaluate(self.get_phase, args)
[docs] def make_timing_params(self, begin, end, snap_vred=True, force=False):
'''
Compute tight parameterized time ranges to include given timings.
Calculates appropriate time ranges to cover given begin and end timings
over all GF points in the store. A dict with the following keys is
returned:
* ``'tmin'``: time [s], minimum of begin timing over all GF points
* ``'tmax'``: time [s], maximum of end timing over all GF points
* ``'vred'``, ``'tmin_vred'``: slope [m/s] and offset [s] of reduction
velocity [m/s] appropriate to catch begin timing over all GF points
* ``'tlenmax_vred'``: maximum time length needed to cover all end
timings, when using linear slope given with (``vred``, ``tmin_vred``)
as start
'''
data = []
warned = set()
for args in self.config.iter_nodes(level=-1):
tmin = self.t(begin, args)
tmax = self.t(end, args)
if tmin is None:
warned.add(str(begin))
if tmax is None:
warned.add(str(end))
x = self.config.get_surface_distance(args)
data.append((x, tmin, tmax))
if len(warned):
w = ' | '.join(list(warned))
msg = '''determination of time window failed using phase
definitions: %s.\n Travel time table contains holes in probed ranges. You can
use `fomosto tttlsd` to fix holes.''' % w
if force:
logger.warn(msg)
else:
raise MakeTimingParamsFailed(msg)
xs, tmins, tmaxs = num.array(data, dtype=num.float).T
tlens = tmaxs - tmins
i = num.nanargmin(tmins)
if not num.isfinite(i):
raise MakeTimingParamsFailed('determination of time window failed')
tminmin = tmins[i]
x_tminmin = xs[i]
dx = (xs - x_tminmin)
dx = num.where(dx != 0.0, dx, num.nan)
s = (tmins - tminmin) / dx
sred = num.min(num.abs(s[num.isfinite(s)]))
deltax = self.config.distance_delta
if snap_vred:
tdif = sred*deltax
tdif2 = self.config.deltat * math.floor(tdif / self.config.deltat)
sred = tdif2/self.config.distance_delta
tmin_vred = tminmin - sred*x_tminmin
if snap_vred:
xe = x_tminmin - int(x_tminmin / deltax) * deltax
tmin_vred = float(
self.config.deltat *
math.floor(tmin_vred / self.config.deltat) - xe * sred)
tlenmax_vred = num.nanmax(tmax - (tmin_vred + sred*x))
if sred != 0.0:
vred = 1.0/sred
else:
vred = 0.0
return dict(
tmin=tminmin,
tmax=num.nanmax(tmaxs),
tlenmax=num.nanmax(tlens),
tmin_vred=tmin_vred,
tlenmax_vred=tlenmax_vred,
vred=vred)
[docs] def make_ttt(self, force=False):
'''Compute travel time tables.
Travel time tables are computed using the 1D earth model defined in
:py:attr:`~pyrocko.gf.meta.Config.earthmodel_1d` for each defined phase
in :py:attr:`~pyrocko.gf.meta.Config.tabulated_phases`. The accuracy of
the tablulated times is adjusted to the sampling rate of the store.
'''
from pyrocko import cake
config = self.config
if not config.tabulated_phases:
return
mod = config.earthmodel_1d
if config.earthmodel_receiver_1d:
self.check_earthmodels(config)
if not mod:
raise StoreError('no earth model found')
for pdef in config.tabulated_phases:
phase_id = pdef.id
phases = pdef.phases
horvels = pdef.horizontal_velocities
fn = self.phase_filename(phase_id)
if os.path.exists(fn) and not force:
logger.info('file already exists: %s' % fn)
continue
def evaluate(args):
if len(args) == 2:
zr, zs, x = (config.receiver_depth,) + args
elif len(args) == 3:
zr, zs, x = args
else:
assert False
t = []
if phases:
rays = mod.arrivals(
phases=phases,
distances=[x*cake.m2d],
zstart=zs,
zstop=zr)
for ray in rays:
t.append(ray.t)
for v in horvels:
t.append(x/(v*1000.))
if t:
return min(t)
else:
return None
logger.info('making travel time table for phasegroup "%s"' %
phase_id)
ip = spit.SPTree(
f=evaluate,
ftol=config.deltat*0.5,
xbounds=num.transpose((config.mins, config.maxs)),
xtols=config.deltas)
util.ensuredirs(fn)
ip.dump(fn)
def get_provided_components(self):
scheme_desc = meta.component_scheme_to_description[
self.config.component_scheme]
quantity = self.config.stored_quantity \
or scheme_desc.default_stored_quantity
if not quantity:
return scheme_desc.provided_components
else:
return [
quantity + '.' + comp
for comp in scheme_desc.provided_components]
def fix_ttt_holes(self, phase_id):
pdef = self.config.get_tabulated_phase(phase_id)
mode = None
for phase in pdef.phases:
for leg in phase.legs():
if mode is None:
mode = leg.mode
else:
if mode != leg.mode:
raise StoreError(
'Can only fix holes in pure P or pure S phases.')
sptree = self.get_stored_phase(phase_id)
sptree_lsd = self.config.fix_ttt_holes(sptree, mode)
phase_lsd = phase_id + '.lsd'
fn = self.phase_filename(phase_lsd)
sptree_lsd.dump(fn)
def statics(self, source, multi_location, itsnapshot, components,
interpolation='nearest_neighbor', nthreads=0):
if not self._f_index:
self.open()
out = {}
ntargets = multi_location.ntargets
source_terms = source.get_source_terms(self.config.component_scheme)
# TODO: deal with delays for snapshots > 1 sample
if itsnapshot is not None:
delays = source.times
else:
delays = source.times*0
itsnapshot = 1
if ntargets == 0:
raise StoreError('MultiLocation.coords5 is empty')
res = store_ext.store_calc_static(
self.cstore,
source.coords5(),
source_terms,
delays,
multi_location.coords5,
self.config.component_scheme,
interpolation,
itsnapshot,
nthreads)
out = {}
for icomp, (comp, comp_res) in enumerate(
zip(self.get_provided_components(), res)):
if comp not in components:
continue
out[comp] = res[icomp]
return out
def calc_seismograms(self, source, receivers, components, deltat=None,
itmin=None, nsamples=None,
interpolation='nearest_neighbor',
optimization='enable', nthreads=1):
config = self.config
assert interpolation in ['nearest_neighbor', 'multilinear'], \
'Unknown interpolation: %s' % interpolation
if not isinstance(receivers, list):
receivers = [receivers]
if deltat is None:
decimate = 1
else:
decimate = int(round(deltat/config.deltat))
if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
raise StoreError(
'unavailable decimation ratio target.deltat / store.deltat'
' = %g / %g' % (deltat, config.deltat))
store, decimate_ = self._decimated_store(decimate)
if not store._f_index:
store.open()
scheme = config.component_scheme
source_coords_arr = source.coords5()
source_terms = source.get_source_terms(scheme)
delays = source.times.ravel()
nreceiver = len(receivers)
receiver_coords_arr = num.empty((nreceiver, 5))
for irec, rec in enumerate(receivers):
receiver_coords_arr[irec, :] = rec.coords5
dt = self.get_deltat()
itoffset = int(num.floor(delays.min()/dt)) if delays.size != 0 else 0
if itmin is None:
itmin = num.zeros(nreceiver, dtype=num.int32)
else:
itmin = (itmin-itoffset).astype(num.int32)
if nsamples is None:
nsamples = num.zeros(nreceiver, dtype=num.int32) - 1
else:
nsamples = nsamples.astype(num.int32)
try:
results = store_ext.store_calc_timeseries(
store.cstore,
source_coords_arr,
source_terms,
(delays - itoffset*dt),
receiver_coords_arr,
scheme,
interpolation,
itmin,
nsamples,
nthreads)
except Exception as e:
raise e
provided_components = self.get_provided_components()
ncomponents = len(provided_components)
seismograms = [dict() for _ in range(nreceiver)]
for ires in range(len(results)):
res = results.pop(0)
ireceiver = ires // ncomponents
comp = provided_components[res[-2]]
if comp not in components:
continue
tr = GFTrace(*res[:-2])
tr.deltat = config.deltat * decimate
tr.itmin += itoffset
tr.n_records_stacked = 0
tr.t_optimize = 0.
tr.t_stack = 0.
tr.err = res[-1]
seismograms[ireceiver][comp] = tr
return seismograms
def seismogram(self, source, receiver, components, deltat=None,
itmin=None, nsamples=None,
interpolation='nearest_neighbor',
optimization='enable', nthreads=1):
config = self.config
if deltat is None:
decimate = 1
else:
decimate = int(round(deltat/config.deltat))
if abs(deltat / (decimate * config.deltat) - 1.0) > 0.001:
raise StoreError(
'unavailable decimation ratio target.deltat / store.deltat'
' = %g / %g' % (deltat, config.deltat))
store, decimate_ = self._decimated_store(decimate)
if not store._f_index:
store.open()
scheme = config.component_scheme
source_coords_arr = source.coords5()
source_terms = source.get_source_terms(scheme)
receiver_coords_arr = receiver.coords5[num.newaxis, :].copy()
try:
params = store_ext.make_sum_params(
store.cstore,
source_coords_arr,
source_terms,
receiver_coords_arr,
scheme,
interpolation, nthreads)
except store_ext.StoreExtError:
raise meta.OutOfBounds()
provided_components = self.get_provided_components()
out = {}
for icomp, comp in enumerate(provided_components):
if comp in components:
weights, irecords = params[icomp]
neach = irecords.size // source.times.size
delays = num.repeat(source.times, neach)
tr = store._sum(
irecords, delays, weights, itmin, nsamples, decimate_,
'c', optimization)
# to prevent problems with rounding errors (BaseStore saves
# deltat as a 4-byte floating point value, value from YAML
# config is more accurate)
tr.deltat = config.deltat * decimate
out[comp] = tr
return out
__all__ = '''
gf_dtype
NotMultipleOfSamplingInterval
GFTrace
CannotCreate
CannotOpen
DuplicateInsert
NotAllowedToInterpolate
NoSuchExtra
NoSuchPhase
BaseStore
Store
SeismosizerErrorEnum
'''.split()