Source code for pyrocko.scenario.targets.waveform
# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from __future__ import absolute_import, division, print_function
import hashlib
import math
import logging
import numpy as num
from os import path as op
from functools import reduce
from pyrocko.guts import StringChoice, Float, List, Bool
from pyrocko.gui.marker import PhaseMarker, EventMarker
from pyrocko import gf, model, util, trace, io
from pyrocko.io_common import FileSaveError
from pyrocko import pile
from ..station import StationGenerator, RandomStationGenerator
from .base import TargetGenerator, NoiseGenerator
from ..error import ScenarioError
DEFAULT_STORE_ID = 'global_2s'
logger = logging.getLogger('pyrocko.scenario.targets.waveform')
guts_prefix = 'pf.scenario'
[docs]class WaveformNoiseGenerator(NoiseGenerator):
def get_time_increment(self, deltat):
return deltat * 1024
def get_intersecting_snippets(self, deltat, codes, tmin, tmax):
raise NotImplementedError()
def add_noise(self, tr):
for ntr in self.get_intersecting_snippets(
tr.deltat, tr.nslc_id, tr.tmin, tr.tmax):
tr.add(ntr)
[docs]class RealNoiseGenerator(WaveformNoiseGenerator):
waveform_paths = List.T(
optional=True,
help='List of files with waveforms.')
def get_intersecting_snippets(self, deltat, codes, tmin, tmax):
tinc = self.get_time_increment(deltat)
noise_waveforms = self.get_noise_waveforms()
trs = []
for noise_waveform in noise_waveforms:
if noise_waveform.network == codes[0] and noise_waveform.station == codes[1] and noise_waveform.location == codes[2] and noise_waveform.channel == codes[3]:
if noise_waveform.deltat != deltat:
noise_waveform.resample(deltat)
noise_waveform.shift(tmin - noise_waveform.tmin+1200)
noise_waveform.chop(tmin, tmax)
trs.append(trace.Trace(
codes[0], codes[1], codes[2], codes[3],
deltat=deltat,
tmin=tmin,
ydata=noise_waveform.ydata))
return trs
def get_noise_waveforms(self):
noise_waveforms = []
if self.waveform_paths:
for filename in self.waveform_paths:
noise_waveforms.extend(
io.load(filename))
self._noise_waveforms = noise_waveforms
return self._noise_waveforms
[docs]class WhiteNoiseGenerator(WaveformNoiseGenerator):
scale = Float.T(default=1e-6)
def get_seed_offset2(self, deltat, iw, codes):
m = hashlib.sha1(('%e %i %s.%s.%s.%s' % ((deltat, iw) + codes))
.encode('utf8'))
return int(m.hexdigest(), base=16) % 10000000
def get_intersecting_snippets(self, deltat, codes, tmin, tmax):
tinc = self.get_time_increment(deltat)
iwmin = int(math.floor(tmin / tinc))
iwmax = int(math.floor(tmax / tinc))
trs = []
for iw in range(iwmin, iwmax+1):
seed_offset = self.get_seed_offset2(deltat, iw, codes)
rstate = self.get_rstate(seed_offset)
n = int(round(tinc // deltat))
trs.append(trace.Trace(
codes[0], codes[1], codes[2], codes[3],
deltat=deltat,
tmin=iw*tinc,
ydata=rstate.normal(loc=0, scale=self.scale, size=n)))
return trs
[docs]class WaveformGenerator(TargetGenerator):
station_generator = StationGenerator.T(
default=RandomStationGenerator.D(),
help='The StationGenerator for creating the stations.')
noise_generator = WaveformNoiseGenerator.T(
default=WhiteNoiseGenerator.D(),
help='Add Synthetic noise on the waveforms.')
store_id = gf.StringID.T(
default=DEFAULT_STORE_ID,
help='The GF store to use for forward-calculations.')
seismogram_quantity = StringChoice.T(
choices=['displacement', 'velocity', 'acceleration', 'counts'],
default='displacement')
vmin_cut = Float.T(
default=2000.,
help='Minimum velocity to seismic velicty to consider in the model.')
vmax_cut = Float.T(
default=8000.,
help='Maximum velocity to seismic velicty to consider in the model.')
fmin = Float.T(
default=0.01,
help='Minimum frequency/wavelength to resolve in the'
' synthetic waveforms.')
tabulated_phases = List.T(
gf.meta.TPDef.T(), optional=True,
help='Define seismic phases to be calculated.')
tabulated_phases_from_store = Bool.T(
default=False,
help='Calculate seismic phase arrivals for all travel-time tables '
'defined in GF store.')
tabulated_phases_noise_scale = Float.T(
default=0.0,
help='Standard deviation of normally distributed noise added to '
'calculated phase arrivals.')
taper = trace.Taper.T(
optional=True,
help='Time domain taper applied to synthetic waveforms.')
compensate_synthetic_offsets = Bool.T(
default=False,
help='Center synthetic trace amplitudes using mean of waveform tips.')
tinc = Float.T(
optional=True,
help='Time increment of waveforms.')
continuous = Bool.T(
default=True,
help='Only produce traces that intersect with events.')
def __init__(self, *args, **kwargs):
super(WaveformGenerator, self).__init__(*args, **kwargs)
self._targets = []
self._piles = {}
def _get_pile(self, path):
apath = op.abspath(path)
assert op.isdir(apath)
if apath not in self._piles:
fns = util.select_files(
[apath], show_progress=False)
p = pile.Pile()
if fns:
p.load_files(fns, fileformat='mseed', show_progress=False)
self._piles[apath] = p
return self._piles[apath]
def get_stations(self):
return self.station_generator.get_stations()
[docs] def get_targets(self):
if self._targets:
return self._targets
for station in self.get_stations():
channel_data = []
channels = station.get_channels()
if channels:
for channel in channels:
channel_data.append([
channel.name,
channel.azimuth,
channel.dip])
else:
for c_name in ['BHZ', 'BHE', 'BHN']:
channel_data.append([
c_name,
model.guess_azimuth_from_name(c_name),
model.guess_dip_from_name(c_name)])
for c_name, c_azi, c_dip in channel_data:
target = gf.Target(
codes=(
station.network,
station.station,
station.location,
c_name),
quantity='displacement',
lat=station.lat,
lon=station.lon,
north_shift=station.north_shift,
east_shift=station.east_shift,
depth=station.depth,
store_id=self.store_id,
optimization='enable',
interpolation='nearest_neighbor',
azimuth=c_azi,
dip=c_dip)
self._targets.append(target)
return self._targets
[docs] def get_time_range(self, sources):
dmin, dmax = self.station_generator.get_distance_range(sources)
times = num.array([source.time for source in sources],
dtype=num.float)
tmin_events = num.min(times)
tmax_events = num.max(times)
tmin = tmin_events + dmin / self.vmax_cut - 10.0 / self.fmin
tmax = tmax_events + dmax / self.vmin_cut + 10.0 / self.fmin
return tmin, tmax
def get_codes_to_deltat(self, engine, sources):
deltats = {}
targets = self.get_targets()
for source in sources:
for target in targets:
deltats[target.codes] = engine.get_store(
target.store_id).config.deltat
return deltats
def get_useful_time_increment(self, engine, sources):
_, dmax = self.station_generator.get_distance_range(sources)
tinc = dmax / self.vmin_cut + 2.0 / self.fmin
deltats = set(self.get_codes_to_deltat(engine, sources).values())
deltat = reduce(util.lcm, deltats)
tinc = int(round(tinc / deltat)) * deltat
return tinc
def get_relevant_sources(self, sources, tmin, tmax):
dmin, dmax = self.station_generator.get_distance_range(sources)
trange = tmax - tmin
tmax_pad = trange + tmax + dmin / self.vmax_cut
tmin_pad = tmin - (dmax / self.vmin_cut + trange)
return [s for s in sources if s.time < tmax_pad and s.time > tmin_pad]
def get_waveforms(self, engine, sources, tmin, tmax):
sources_relevant = self.get_relevant_sources(sources, tmin, tmax)
if not (self.continuous or sources_relevant):
return []
trs = {}
tts = util.time_to_str
for nslc, deltat in self.get_codes_to_deltat(engine, sources).items():
tr_tmin = int(round(tmin / deltat)) * deltat
tr_tmax = (int(round(tmax / deltat))-1) * deltat
nsamples = int(round((tr_tmax - tr_tmin) / deltat)) + 1
tr = trace.Trace(
*nslc,
tmin=tr_tmin,
ydata=num.zeros(nsamples),
deltat=deltat)
self.noise_generator.add_noise(tr)
trs[nslc] = tr
logger.debug('Forward modelling waveforms between %s - %s...'
% (tts(tmin, format='%Y-%m-%d_%H-%M-%S'),
tts(tmax, format='%Y-%m-%d_%H-%M-%S')))
if not sources_relevant:
return list(trs.values())
targets = self.get_targets()
response = engine.process(sources_relevant, targets)
for source, target, res in response.iter_results(
get='results'):
if isinstance(res, gf.SeismosizerError):
logger.warning(
'Out of bounds! \nTarget: %s\nSource: %s\n' % (
'.'.join(target.codes)), source)
continue
tr = res.trace.pyrocko_trace()
candidate = trs[target.codes]
if not candidate.overlaps(tr.tmin, tr.tmax):
continue
if self.compensate_synthetic_offsets:
tr.ydata -= (num.mean(tr.ydata[-3:-1]) +
num.mean(tr.ydata[1:3])) / 2.
if self.taper:
tr.taper(self.taper)
resp = self.get_transfer_function(target.codes)
if resp:
tr = tr.transfer(transfer_function=resp)
candidate.add(tr)
trs[target.codes] = candidate
return list(trs.values())
def get_onsets(self, engine, sources, *args, **kwargs):
targets = {t.codes[:3]: t for t in self.get_targets()}
markers = []
for source in sources:
ev = source.pyrocko_event()
markers.append(EventMarker(ev))
for nsl, target in targets.items():
store = engine.get_store(target.store_id)
if self.tabulated_phases:
tabulated_phases = self.tabulated_phases
elif self.tabulated_phases_from_store:
tabulated_phases = store.config.tabulated_phases
else:
tabulated_phases = []
for phase in tabulated_phases:
t = store.t(phase.id, source, target)
if not t:
continue
noise_scale = self.tabulated_phases_noise_scale
if noise_scale != 0.0:
t += num.random.normal(scale=noise_scale)
t += source.time
markers.append(
PhaseMarker(
phasename=phase.id,
tmin=t,
tmax=t,
event=source.pyrocko_event(),
nslc_ids=(nsl+('*',),)
)
)
return markers
def get_transfer_function(self, codes):
if self.seismogram_quantity == 'displacement':
return None
elif self.seismogram_quantity == 'velocity':
return trace.DifferentiationResponse(1)
elif self.seismogram_quantity == 'acceleration':
return trace.DifferentiationResponse(2)
elif self.seismogram_quantity == 'counts':
raise NotImplementedError()
def ensure_data(self, engine, sources, path, tmin=None, tmax=None):
self.ensure_waveforms(engine, sources, path, tmin, tmax)
self.ensure_responses(path)
def ensure_waveforms(self, engine, sources, path, tmin=None, tmax=None):
path_waveforms = op.join(path, 'waveforms')
util.ensuredir(path_waveforms)
p = self._get_pile(path_waveforms)
nslc_ids = set(target.codes for target in self.get_targets())
def have_waveforms(tmin, tmax):
trs_have = p.all(
tmin=tmin, tmax=tmax,
load_data=False, degap=False,
trace_selector=lambda tr: tr.nslc_id in nslc_ids)
return any(tr.data_len() > 0 for tr in trs_have)
def add_files(paths):
p.load_files(paths, fileformat='mseed', show_progress=False)
path_traces = op.join(
path_waveforms,
'%(wmin_year)s',
'%(wmin_month)s',
'%(wmin_day)s',
'waveform_%(network)s_%(station)s_' +
'%(location)s_%(channel)s_%(tmin)s_%(tmax)s.mseed')
tmin_all, tmax_all = self.get_time_range(sources)
tmin = tmin if tmin is not None else tmin_all
tmax = tmax if tmax is not None else tmax_all
tts = util.time_to_str
tinc = self.tinc or self.get_useful_time_increment(engine, sources)
tmin = math.floor(tmin / tinc) * tinc
tmax = math.ceil(tmax / tinc) * tinc
nwin = int(round((tmax - tmin) / tinc))
pbar = None
for iwin in range(nwin):
tmin_win = tmin + iwin*tinc
tmax_win = tmin + (iwin+1)*tinc
if have_waveforms(tmin_win, tmax_win):
continue
if pbar is None:
pbar = util.progressbar('Generating waveforms', (nwin-iwin))
pbar.update(iwin)
trs = self.get_waveforms(engine, sources, tmin_win, tmax_win)
try:
wpaths = io.save(
trs, path_traces,
additional=dict(
wmin_year=tts(tmin_win, format='%Y'),
wmin_month=tts(tmin_win, format='%m'),
wmin_day=tts(tmin_win, format='%d'),
wmin=tts(tmin_win, format='%Y-%m-%d_%H-%M-%S'),
wmax_year=tts(tmax_win, format='%Y'),
wmax_month=tts(tmax_win, format='%m'),
wmax_day=tts(tmax_win, format='%d'),
wmax=tts(tmax_win, format='%Y-%m-%d_%H-%M-%S')))
for wpath in wpaths:
logger.debug('Generated file: %s' % wpath)
add_files(wpaths)
except FileSaveError as e:
raise ScenarioError(str(e))
if pbar is not None:
pbar.finish()
def ensure_responses(self, path):
from pyrocko.io import stationxml
path_responses = op.join(path, 'meta')
util.ensuredir(path_responses)
fn_stationxml = op.join(path_responses, 'stations.xml')
if op.exists(fn_stationxml):
return
logger.debug('Writing waveform meta information to StationXML...')
stations = self.station_generator.get_stations()
sxml = stationxml.FDSNStationXML.from_pyrocko_stations(stations)
sunit = {
'displacement': 'M',
'velocity': 'M/S',
'acceleration': 'M/S**2',
'counts': 'COUNTS'}[self.seismogram_quantity]
response = stationxml.Response(
instrument_sensitivity=stationxml.Sensitivity(
value=1.,
frequency=1.,
input_units=stationxml.Units(sunit),
output_units=stationxml.Units('COUNTS')),
stage_list=[])
for net, station, channel in sxml.iter_network_station_channels():
channel.response = response
sxml.dump_xml(filename=fn_stationxml)
def add_map_artists(self, engine, sources, automap):
automap.add_stations(self.get_stations())