Source code for grond.targets.waveform.target

from __future__ import print_function

import logging
import math
import numpy as num

from pyrocko import gf, trace, weeding, util
from pyrocko.guts import (Object, String, Float, Bool, Int, StringChoice,
                          Timestamp, List, Dict)
from pyrocko.guts_array import Array

from grond.dataset import NotFound
from grond.meta import GrondError, nslcs_to_patterns

from ..base import (MisfitConfig, MisfitTarget, MisfitResult, TargetGroup)
from grond.meta import has_get_plot_classes

from pyrocko import crust2x2
from string import Template

guts_prefix = 'grond'
logger = logging.getLogger('grond.targets.waveform.target')


class StoreIDSelectorError(GrondError):
    '''
    Raised by store ID selectors when no GF store ID can be determined.
    '''


class StoreIDSelector(Object):
    '''
    Common base class of all GF store selectors.

    Subclasses implement the actual selection of a GF store, e.g. depending
    on station location, source location or other characteristics.
    '''


class Crust2StoreIDSelector(StoreIDSelector):
    '''
    Select the GF store from the CRUST 2.0 profile at the event location.
    '''

    template = String.T(
        help="Template for the GF store ID. For example ``'crust2_${id}'`` "
             "where ``'${id}'`` will be replaced with the corresponding CRUST "
             "2.0 profile identifier for the source location.")

    def get_store_id(self, event, st, cha):
        '''
        Return the store ID for the CRUST 2.0 profile found at the event.

        The lower-cased profile identifier is substituted for ``${id}`` in
        :py:attr:`template`. ``st`` and ``cha`` are not used here.
        '''
        profile = crust2x2.get_profile(event.lat, event.lon)
        profile_id = profile._ident.lower()
        return Template(self.template).substitute(id=profile_id)


class StationDictStoreIDSelector(StoreIDSelector):
    '''
    Select the GF store from a manually given station to store ID table.
    '''

    mapping = Dict.T(
        String.T(), gf.StringID.T(),
        help='Dictionary with station to store ID pairs, keys are NET.STA. '
             "Add a fallback store ID under the key ``'others'``.")

    def get_store_id(self, event, st, cha):
        '''
        Return the store ID registered for station ``st``.

        The lookup key is ``NET.STA``. If the station is not listed, the
        entry stored under ``'others'`` is used.

        :raises: :py:class:`StoreIDSelectorError` if neither the station nor
            the ``'others'`` fallback is present in the mapping.
        '''
        table = self.mapping
        station_key = '%s.%s' % (st.network, st.station)

        if station_key in table:
            return table[station_key]

        if 'others' in table:
            return table['others']

        raise StoreIDSelectorError(
            'No store ID found for station "%s.%s".' % (
                st.network, st.station))


class DepthRangeToStoreID(Object):
    '''
    Association of a depth interval with a GF store ID.
    '''

    # Lower bound of the depth interval.
    depth_min = Float.T()
    # Upper bound of the depth interval.
    depth_max = Float.T()
    # Store ID associated with the interval.
    store_id = gf.StringID.T()


class StationDepthStoreIDSelector(StoreIDSelector):
    '''
    Select the GF store according to the depth range the station falls in.
    '''

    depth_ranges = List.T(DepthRangeToStoreID.T())

    def get_store_id(self, event, st, cha):
        '''
        Return the store ID of the first depth range containing ``st.depth``.

        A range matches when ``depth_min <= st.depth < depth_max``; ranges
        are tried in the order given in :py:attr:`depth_ranges`.

        :raises: :py:class:`StoreIDSelectorError` if no range matches.
        '''
        for depth_range in self.depth_ranges:
            inside = depth_range.depth_min <= st.depth < depth_range.depth_max
            if not inside:
                continue

            return depth_range.store_id

        raise StoreIDSelectorError(
            'No store ID found for station "%s.%s" at %g m depth.' % (
                st.network, st.station, st.depth))


class DomainChoice(StringChoice):
    '''
    Names of the data domains in which a waveform misfit can be evaluated.
    '''

    choices = [
        'time_domain', 'frequency_domain', 'log_frequency_domain',
        'envelope', 'absolute', 'cc_max_norm',
    ]


class WaveformMisfitConfig(MisfitConfig):
    '''
    Settings controlling how waveform misfits are computed.

    Holds the bandpass corner frequencies, the fitting time window and its
    taper, optional pick-based alignment, the misfit domain and norm, and the
    automatic time shift parameters.
    '''

    quantity = gf.QuantityType.T(default='displacement')
    fmin = Float.T(default=0.0, help='minimum frequency of bandpass filter')
    fmax = Float.T(help='maximum frequency of bandpass filter')
    ffactor = Float.T(default=1.5)
    tmin = gf.Timing.T(
        optional=True,
        help='Start of main time window used for waveform fitting.')
    tmax = gf.Timing.T(
        optional=True,
        help='End of main time window used for waveform fitting.')
    tfade = Float.T(
        optional=True,
        help='Decay time of taper prepended and appended to main time window '
             'used for waveform fitting [s].')
    pick_synthetic_traveltime = gf.Timing.T(
        optional=True,
        help='Synthetic phase arrival definition for alignment of observed '
             'and synthetic traces.')
    pick_phasename = String.T(
        optional=True,
        help='Name of picked phase for alignment of observed and synthetic '
             'traces.')
    domain = DomainChoice.T(
        default='time_domain',
        help='Type of data characteristic to be fitted.\n\nAvailable choices '
             'are: %s' % ', '.join("``'%s'``" % s
                                   for s in DomainChoice.choices))
    norm_exponent = Int.T(
        default=2,
        help='Exponent to use in norm (1: L1-norm, 2: L2-norm)')
    tautoshift_max = Float.T(
        default=0.0,
        help='If non-zero, allow synthetic and observed traces to be shifted '
             'against each other by up to +/- the given value [s].')
    autoshift_penalty_max = Float.T(
        default=0.0,
        help='If non-zero, a penalty misfit is added for non-zero shift '
             'values.\n\nThe penalty value is computed as '
             '``autoshift_penalty_max * normalization_factor * tautoshift**2 '
             '/ tautoshift_max**2``')

    ranges = {}

    def get_full_frequency_range(self):
        '''
        Return the frequency band widened by :py:attr:`ffactor`.

        :returns: tuple ``(fmin / ffactor, fmax * ffactor)``
        '''
        lower = self.fmin / self.ffactor
        upper = self.fmax * self.ffactor
        return lower, upper
def log_exclude(target, reason):
    '''
    Emit a debug message stating why a potential target is excluded.

    :param target: target object providing ``string_id()``
    :param reason: short text describing why the target is dropped
    '''
    # Pass the arguments to the logger instead of pre-formatting the message,
    # so the string is only built when debug output is actually enabled.
    logger.debug(
        'Excluding potential target %s: %s', target.string_id(), reason)
class WaveformTargetGroup(TargetGroup):
    '''Handles seismogram targets or other targets of dynamic ground motion.
    '''
    distance_min = Float.T(
        optional=True,
        help='excludes targets nearer to source, along a great circle')
    distance_max = Float.T(
        optional=True,
        help='excludes targets farther from source, along a great circle')
    distance_3d_min = Float.T(
        optional=True,
        help='excludes targets nearer from source (direct distance)')
    distance_3d_max = Float.T(
        optional=True,
        help='excludes targets farther from source (direct distance)')
    depth_min = Float.T(
        optional=True,
        help='excludes targets with smaller depths')
    depth_max = Float.T(
        optional=True,
        help='excludes targets with larger depths')
    include = List.T(
        String.T(),
        optional=True,
        help='If not None, list of stations/components to include according '
             'to their STA, NET.STA, NET.STA.LOC, or NET.STA.LOC.CHA codes.')
    exclude = List.T(
        String.T(),
        help='Stations/components to be excluded according to their STA, '
             'NET.STA, NET.STA.LOC, or NET.STA.LOC.CHA codes.')
    limit = Int.T(optional=True)
    channels = List.T(
        String.T(),
        optional=True,
        help="set channels to include, e.g. ['Z', 'T']")
    misfit_config = WaveformMisfitConfig.T()
    store_id_selector = StoreIDSelector.T(
        optional=True,
        help='select GF store based on event-station geometry.')

    def get_targets(self, ds, event, default_path='none'):
        '''
        Build the waveform misfit targets of this group for an event.

        One candidate target is created per station of the dataset and per
        configured channel. Candidates are dropped when blacklisted by the
        dataset, when matched by :py:attr:`exclude` or not matched by
        :py:attr:`include`, or when they violate the distance or depth
        limits. Each drop is reported through :py:func:`log_exclude`.

        :param ds: dataset providing ``get_stations()`` and
            ``is_blacklisted()``; it is attached to every returned target
        :param event: event used as origin for distances and azimuths and
            handed to the store ID selector
        :param default_path: path used for the targets when the group has no
            own ``path``
        :returns: list of :py:class:`WaveformMisfitTarget`, reduced with
            :py:func:`weed` if :py:attr:`limit` is set
        '''
        logger.debug('Selecting waveform targets...')
        origin = event
        targets = []

        stations = ds.get_stations()
        if not stations:
            logger.warning(
                'No stations found to create waveform target group.')

        # The patterns do not depend on station or channel, so build them
        # once instead of once per candidate target.
        exclude_patterns = nslcs_to_patterns(self.exclude)
        if self.include is not None:
            include_patterns = nslcs_to_patterns(self.include)
        else:
            include_patterns = None

        for st in stations:
            logger.debug(
                'Selecting waveforms for station %s.%s.%s', *st.nsl())

            for cha in self.channels:
                nslc = st.nsl() + (cha,)

                logger.debug('Selecting waveforms for %s.%s.%s.%s', *nslc)

                if self.store_id_selector:
                    store_id = self.store_id_selector.get_store_id(
                        event, st, cha)
                else:
                    store_id = self.store_id

                target = WaveformMisfitTarget(
                    quantity='displacement',
                    codes=nslc,
                    lat=st.lat,
                    lon=st.lon,
                    north_shift=st.north_shift,
                    east_shift=st.east_shift,
                    depth=st.depth,
                    interpolation=self.interpolation,
                    store_id=store_id,
                    misfit_config=self.misfit_config,
                    manual_weight=self.weight,
                    normalisation_family=self.normalisation_family,
                    path=self.path or default_path)

                if ds.is_blacklisted(nslc):
                    log_exclude(target, 'excluded by dataset')
                    continue

                if util.match_nslc(exclude_patterns, nslc):
                    log_exclude(target, 'excluded by target group')
                    continue

                if include_patterns is not None and not util.match_nslc(
                        include_patterns, nslc):
                    log_exclude(target, 'excluded by target group')
                    continue

                if self.distance_min is not None and \
                        target.distance_to(origin) < self.distance_min:
                    log_exclude(target, 'distance < distance_min')
                    continue

                if self.distance_max is not None and \
                        target.distance_to(origin) > self.distance_max:
                    log_exclude(target, 'distance > distance_max')
                    continue

                if self.distance_3d_min is not None and \
                        target.distance_3d_to(origin) < self.distance_3d_min:
                    log_exclude(target, 'distance_3d < distance_3d_min')
                    continue

                if self.distance_3d_max is not None and \
                        target.distance_3d_to(origin) > self.distance_3d_max:
                    log_exclude(target, 'distance_3d > distance_3d_max')
                    continue

                if self.depth_min is not None and \
                        target.depth < self.depth_min:
                    log_exclude(target, 'depth < depth_min')
                    continue

                if self.depth_max is not None and \
                        target.depth > self.depth_max:
                    log_exclude(target, 'depth > depth_max')
                    continue

                # Orientation of the component; channels other than R, T
                # and Z keep the azimuth/dip the target was created with.
                azi, _ = target.azibazi_to(origin)
                if cha == 'R':
                    target.azimuth = azi - 180.
                    target.dip = 0.
                elif cha == 'T':
                    target.azimuth = azi - 90.
                    target.dip = 0.
                elif cha == 'Z':
                    target.azimuth = 0.
                    target.dip = -90.

                target.set_dataset(ds)
                targets.append(target)

        if self.limit:
            return weed(origin, targets, self.limit)[0]
        else:
            return targets
class TraceSpectrum(Object):
    '''
    Complex spectrum of a trace on a regular frequency axis.
    '''

    network = String.T()
    station = String.T()
    location = String.T()
    channel = String.T()
    deltaf = Float.T(default=1.0)
    fmin = Float.T(default=0.0)
    # ``num.complex`` was only an alias of the builtin ``complex`` and has
    # been removed from NumPy 1.24 on; use the builtin directly.
    ydata = Array.T(shape=(None,), dtype=complex, serialize_as='list')

    def get_ydata(self):
        '''Return the spectral values.'''
        return self.ydata

    def get_xdata(self):
        '''Return the frequencies ``fmin + i * deltaf`` of the samples.'''
        return self.fmin + num.arange(self.ydata.size) * self.deltaf


class WaveformPiggybackSubtarget(Object):
    '''
    Base class for subtargets evaluated together with a waveform misfit.

    Subclasses have to implement :py:meth:`evaluate`.
    '''

    piggy_id = Int.T()

    _next_piggy_id = 0

    @classmethod
    def new_piggy_id(cls):
        '''Return a new id from the process-wide running counter.'''
        # The counter is addressed through the base class on purpose, so
        # that all subclasses draw from the same sequence.
        piggy_id = WaveformPiggybackSubtarget._next_piggy_id
        WaveformPiggybackSubtarget._next_piggy_id += 1
        return piggy_id

    def __init__(self, piggy_id=None, **kwargs):
        '''Create the subtarget, drawing a new id if none is given.'''
        if piggy_id is None:
            piggy_id = self.new_piggy_id()

        Object.__init__(self, piggy_id=piggy_id, **kwargs)

    def evaluate(
            self, tr_proc_obs, trspec_proc_obs, tr_proc_syn, trspec_proc_syn):
        '''
        Evaluate the subtarget on processed traces and spectra.

        :raises: :py:exc:`NotImplementedError` in this base class.
        '''
        raise NotImplementedError()


class WaveformPiggybackSubresult(Object):
    '''
    Base class for results produced by piggyback subtargets.
    '''

    piggy_id = Int.T()
class WaveformMisfitResult(gf.Result, MisfitResult):
    '''Carries the observations for a target and corresponding synthetics.

    A number of different waveform or phase representations are possible.
    All of them are optional; which ones are filled in depends on the mode
    in which the misfit was evaluated.
    '''

    # Processed and filtered waveforms.
    processed_obs = trace.Trace.T(optional=True)
    processed_syn = trace.Trace.T(optional=True)
    filtered_obs = trace.Trace.T(optional=True)
    filtered_syn = trace.Trace.T(optional=True)

    # Spectra of the processed waveforms.
    spectrum_obs = TraceSpectrum.T(optional=True)
    spectrum_syn = TraceSpectrum.T(optional=True)

    # Taper, time shifts, picks and cross-correlation trace.
    taper = trace.Taper.T(optional=True)
    tobs_shift = Float.T(optional=True)
    tsyn_pick = Timestamp.T(optional=True)
    tshift = Float.T(optional=True)
    cc = trace.Trace.T(optional=True)

    piggyback_subresults = List.T(WaveformPiggybackSubresult.T())
@has_get_plot_classes
class WaveformMisfitTarget(gf.Target, MisfitTarget):
    '''
    Waveform target combining a GF target with misfit evaluation.

    Synthetics delivered for this target are compared against the matching
    observed waveform from the attached dataset in :py:meth:`post_process`.
    '''

    flip_norm = Bool.T(default=False)
    misfit_config = WaveformMisfitConfig.T()

    can_bootstrap_weights = True

    def __init__(self, **kwargs):
        gf.Target.__init__(self, **kwargs)
        MisfitTarget.__init__(self, **kwargs)
        # Subtargets to be evaluated with the next misfit computation.
        self._piggyback_subtargets = []

    def string_id(self):
        '''Return ``path`` and the target codes joined with dots.'''
        return '.'.join((self.path,) + self.codes)

    @classmethod
    def get_plot_classes(cls):
        '''Return the inherited plot classes plus the waveform plots.'''
        # Imported here rather than at module level.
        from . import plot
        plots = super(WaveformMisfitTarget, cls).get_plot_classes()
        plots.extend(plot.get_plot_classes())
        return plots

    def get_combined_weight(self):
        '''
        Return the manual weight multiplied by all analyser weights.

        The value is computed once and cached in ``_combined_weight``.

        :returns: array with a single float element
        '''
        if self._combined_weight is None:
            w = self.manual_weight
            for analyser in self.analyser_results.values():
                w *= analyser.weight

            # ``num.float`` was removed in NumPy 1.24; it was an alias of
            # the builtin ``float``.
            self._combined_weight = num.array([w], dtype=float)

        return self._combined_weight

    def get_taper_params(self, engine, source):
        '''
        Compute the fitting window and fade lengths for ``source``.

        :returns: tuple ``(tmin_fit, tmax_fit, tfade, tfade_taper)`` where
            ``tfade`` is ``1/fmin`` (or ``1/fmax`` if ``fmin`` is not
            positive) and ``tfade_taper`` is the configured ``tfade`` if
            given, else ``tfade``
        '''
        store = engine.get_store(self.store_id)
        config = self.misfit_config
        tmin_fit = source.time + store.t(config.tmin, source, self)
        tmax_fit = source.time + store.t(config.tmax, source, self)
        if config.fmin > 0.0:
            tfade = 1.0/config.fmin
        else:
            tfade = 1.0/config.fmax

        if config.tfade is None:
            tfade_taper = tfade
        else:
            tfade_taper = config.tfade

        return tmin_fit, tmax_fit, tfade, tfade_taper

    def get_backazimuth_for_waveform(self):
        '''Return the backazimuth passed to the dataset for this channel.'''
        return backazimuth_for_waveform(self.azimuth, self.codes)

    @property
    def backazimuth(self):
        '''Target azimuth minus 180 degrees.'''
        return self.azimuth - 180.

    def get_freqlimits(self):
        '''
        Return the four corner frequencies used for filtering.

        :returns: tuple ``(fmin/ffactor, fmin, fmax, fmax*ffactor)``
        '''
        config = self.misfit_config

        return (
            config.fmin/config.ffactor,
            config.fmin, config.fmax,
            config.fmax*config.ffactor)

    def get_pick_shift(self, engine, source):
        '''
        Look up the observed pick time and the synthetic arrival time.

        Both are only determined if ``pick_synthetic_traveltime`` and
        ``pick_phasename`` are configured.

        :returns: tuple ``(tobs, tsyn)``; either entry may be ``None``
        '''
        config = self.misfit_config
        tobs = None
        tsyn = None
        ds = self.get_dataset()

        if config.pick_synthetic_traveltime and config.pick_phasename:
            store = engine.get_store(self.store_id)
            tsyn = source.time + store.t(
                config.pick_synthetic_traveltime, source, self)

            marker = ds.get_pick(
                source.name,
                self.codes[:3],
                config.pick_phasename)

            if marker:
                tobs = marker.tmin

        return tobs, tsyn

    def get_cutout_timespan(self, tmin, tmax, tfade):
        '''
        Return a time span enclosing ``[tmin - tfade, tmax + tfade]``.

        The span is aligned to multiples of an increment (``1/fmin``, or
        ``10/fmax`` if ``fmin`` is not positive) and widened by one
        increment on each side.

        :returns: tuple ``(tmin_obs, tmax_obs)``
        '''
        if self.misfit_config.fmin > 0:
            tinc_obs = 1.0 / self.misfit_config.fmin
        else:
            tinc_obs = 10.0 / self.misfit_config.fmax

        tmin_obs = (math.floor(
            (tmin - tfade) / tinc_obs) - 1.0) * tinc_obs
        tmax_obs = (math.ceil(
            (tmax + tfade) / tinc_obs) + 1.0) * tinc_obs

        return tmin_obs, tmax_obs

    def post_process(self, engine, source, tr_syn):
        '''
        Compute the misfit between the synthetic and the observed trace.

        The synthetic trace is extended, filtered (and differentiated for
        velocity or acceleration), then compared with the observed waveform
        obtained from the dataset. Pending piggyback subtargets are
        evaluated along and cleared afterwards.

        :param engine: engine giving access to the GF store
        :param source: source for which ``tr_syn`` was computed
        :param tr_syn: synthetic result providing ``pyrocko_trace()``
        :returns: result object returned by :py:func:`misfit`
        :raises: :py:class:`GrondError` for an unsupported quantity,
            ``gf.SeismosizerError`` if no observed waveform is found
        '''
        tr_syn = tr_syn.pyrocko_trace()
        nslc = self.codes

        config = self.misfit_config

        tmin_fit, tmax_fit, tfade, tfade_taper = \
            self.get_taper_params(engine, source)

        ds = self.get_dataset()

        tobs, tsyn = self.get_pick_shift(engine, source)
        if None not in (tobs, tsyn):
            tobs_shift = tobs - tsyn
        else:
            tobs_shift = 0.0

        tr_syn.extend(
            tmin_fit - tfade * 2.0,
            tmax_fit + tfade * 2.0,
            fillmethod='repeat')

        freqlimits = self.get_freqlimits()

        if config.quantity == 'displacement':
            syn_resp = None
        elif config.quantity == 'velocity':
            syn_resp = trace.DifferentiationResponse(1)
        elif config.quantity == 'acceleration':
            syn_resp = trace.DifferentiationResponse(2)
        else:
            # The exception has to be raised; merely constructing it would
            # let execution run into an unbound ``syn_resp`` below.
            raise GrondError('Unsupported quantity: %s' % config.quantity)

        tr_syn = tr_syn.transfer(
            freqlimits=freqlimits,
            tfade=tfade,
            transfer_function=syn_resp)

        tr_syn.chop(tmin_fit - 2*tfade, tmax_fit + 2*tfade)

        try:
            tr_obs = ds.get_waveform(
                nslc,
                quantity=config.quantity,
                tinc_cache=1.0/(config.fmin or 0.1*config.fmax),
                tmin=tmin_fit+tobs_shift-tfade,
                tmax=tmax_fit+tobs_shift+tfade,
                tfade=tfade,
                freqlimits=freqlimits,
                deltat=tr_syn.deltat,
                cache=True,
                backazimuth=self.get_backazimuth_for_waveform())

            if tobs_shift != 0.0:
                # Work on a copy so the trace held by the dataset is not
                # shifted in place.
                tr_obs = tr_obs.copy()
                tr_obs.shift(-tobs_shift)

            mr = misfit(
                tr_obs, tr_syn,
                taper=trace.CosTaper(
                    tmin_fit - tfade_taper,
                    tmin_fit,
                    tmax_fit,
                    tmax_fit + tfade_taper),
                domain=config.domain,
                exponent=config.norm_exponent,
                flip=self.flip_norm,
                result_mode=self._result_mode,
                tautoshift_max=config.tautoshift_max,
                autoshift_penalty_max=config.autoshift_penalty_max,
                subtargets=self._piggyback_subtargets)

            self._piggyback_subtargets = []

            mr.tobs_shift = float(tobs_shift)
            mr.tsyn_pick = float_or_none(tsyn)

            return mr

        except NotFound as e:
            logger.debug(str(e))
            raise gf.SeismosizerError('No waveform data: %s' % str(e))

    def get_plain_targets(self, engine, source):
        '''Return a list with one plain ``gf.Target`` copy of this target.'''
        d = {k: getattr(self, k) for k in gf.Target.T.propnames}
        return [gf.Target(**d)]

    def add_piggyback_subtarget(self, subtarget):
        '''Queue ``subtarget`` for evaluation with the next misfit.'''
        self._piggyback_subtargets.append(subtarget)
def misfit(
        tr_obs, tr_syn, taper, domain, exponent, tautoshift_max,
        autoshift_penalty_max, flip, result_mode='sparse', subtargets=()):

    '''
    Calculate misfit between observed and synthetic trace.

    :param tr_obs: observed trace as :py:class:`pyrocko.trace.Trace`
    :param tr_syn: synthetic trace as :py:class:`pyrocko.trace.Trace`
    :param taper: taper applied in timedomain as
        :py:class:`pyrocko.trace.Taper`
    :param domain: how to calculate difference, see :py:class:`DomainChoice`
    :param exponent: exponent of Lx type norms
    :param tautoshift_max: if non-zero, return lowest misfit when traces are
        allowed to shift against each other by up to +/- ``tautoshift_max``
    :param autoshift_penalty_max: if non-zero, a penalty misfit is added for
        for non-zero shift values. The penalty value is
        ``autoshift_penalty_max * normalization_factor * tautoshift**2
        / tautoshift_max**2``
    :param flip: ``bool``, if set to ``True``, normalization factor is
        computed against *tr_syn* rather than *tr_obs*
    :param result_mode: ``'full'``, include traces and spectra or ``'sparse'``,
        include only misfit and normalization factor in result
    :param subtargets: iterable of piggyback subtargets; each is evaluated on
        the processed traces and spectra and its result is attached to the
        returned object

    :returns: object of type :py:class:`WaveformMisfitResult`
    :raises: :py:class:`GrondError` for an unknown ``domain``,
        :py:exc:`ValueError` for an unknown ``result_mode``
    '''

    trace.assert_same_sampling_rate(tr_obs, tr_syn)
    tmin, tmax = taper.time_span()

    tr_proc_obs, trspec_proc_obs = _process(tr_obs, tmin, tmax, taper, domain)
    tr_proc_syn, trspec_proc_syn = _process(tr_syn, tmin, tmax, taper, domain)

    piggyback_results = [
        subtarget.evaluate(
            tr_proc_obs, trspec_proc_obs, tr_proc_syn, trspec_proc_syn)
        for subtarget in subtargets]

    tshift = None
    ctr = None
    deltat = tr_proc_obs.deltat

    if domain in ('time_domain', 'envelope', 'absolute'):
        a, b = tr_proc_syn.ydata, tr_proc_obs.ydata
        if flip:
            b, a = a, b

        # Maximum shift in samples, limited by the trace length.
        nshift_max = max(0, min(a.size-1,
                                int(math.floor(tautoshift_max / deltat))))

        if nshift_max == 0:
            m, n = trace.Lx_norm(a, b, norm=exponent)
        else:
            # Try every shift within +/- nshift_max samples on the
            # overlapping parts and keep the one with the lowest misfit.
            mns = []
            for ishift in range(-nshift_max, nshift_max+1):
                if ishift < 0:
                    a_cut = a[-ishift:]
                    b_cut = b[:ishift]
                elif ishift == 0:
                    a_cut = a
                    b_cut = b
                elif ishift > 0:
                    a_cut = a[:-ishift]
                    b_cut = b[ishift:]

                mns.append(trace.Lx_norm(a_cut, b_cut, norm=exponent))

            ms, ns = num.array(mns).T

            iarg = num.argmin(ms)
            tshift = (iarg-nshift_max)*deltat

            m, n = ms[iarg], ns[iarg]
            m += autoshift_penalty_max * n * tshift**2 / tautoshift_max**2

    elif domain == 'cc_max_norm':

        ctr = trace.correlate(
            tr_proc_syn,
            tr_proc_obs,
            mode='same',
            normalization='normal')

        tshift, cc_max = ctr.max()
        m = 0.5 - 0.5 * cc_max
        n = 0.5

    elif domain == 'frequency_domain':
        a, b = trspec_proc_syn.ydata, trspec_proc_obs.ydata
        if flip:
            b, a = a, b

        m, n = trace.Lx_norm(num.abs(a), num.abs(b), norm=exponent)

    elif domain == 'log_frequency_domain':
        a, b = trspec_proc_syn.ydata, trspec_proc_obs.ydata
        if flip:
            b, a = a, b

        a = num.abs(a)
        b = num.abs(b)

        # Small offset keeping the logarithm finite for zero amplitudes.
        eps = (num.mean(a) + num.mean(b)) * 1e-7
        if eps == 0.0:
            eps = 1e-7

        a = num.log(a + eps)
        b = num.log(b + eps)

        m, n = trace.Lx_norm(a, b, norm=exponent)

    else:
        # Without this branch ``m`` and ``n`` would stay unbound.
        raise GrondError('Unsupported misfit domain: %s' % domain)

    # ``num.float`` was removed in NumPy 1.24; it was an alias of the
    # builtin ``float``.
    if result_mode == 'full':
        result = WaveformMisfitResult(
            misfits=num.array([[m, n]], dtype=float),
            processed_obs=tr_proc_obs,
            processed_syn=tr_proc_syn,
            filtered_obs=tr_obs.copy(),
            filtered_syn=tr_syn,
            spectrum_obs=trspec_proc_obs,
            spectrum_syn=trspec_proc_syn,
            taper=taper,
            tshift=tshift,
            cc=ctr)

    elif result_mode == 'sparse':
        result = WaveformMisfitResult(
            misfits=num.array([[m, n]], dtype=float))
    else:
        # Explicit error instead of ``assert False``, which is stripped
        # when Python runs with ``-O``.
        raise ValueError('Invalid result_mode: %s' % result_mode)

    result.piggyback_subresults = piggyback_results

    return result


def _extend_extract(tr, tmin, tmax):
    '''
    Return a copy of ``tr`` covering the sample frame around [tmin, tmax].

    Samples of the frame lying before the start or after the end of ``tr``
    are filled with the first or last sample value of ``tr``, respectively.
    '''
    deltat = tr.deltat
    itmin_frame = int(math.floor(tmin/deltat))
    itmax_frame = int(math.ceil(tmax/deltat))
    nframe = itmax_frame - itmin_frame + 1
    n = tr.data_len()
    a = num.empty(nframe, dtype=float)
    itmin_tr = int(round(tr.tmin / deltat))
    itmax_tr = itmin_tr + n

    # Part of the frame covered by the trace, and the matching trace slice.
    icut1 = min(max(0, itmin_tr - itmin_frame), nframe)
    icut2 = min(max(0, itmax_tr - itmin_frame), nframe)
    icut1_tr = min(max(0, icut1 + itmin_frame - itmin_tr), n)
    icut2_tr = min(max(0, icut2 + itmin_frame - itmin_tr), n)
    a[:icut1] = tr.ydata[0]
    a[icut1:icut2] = tr.ydata[icut1_tr:icut2_tr]
    a[icut2:] = tr.ydata[-1]

    tr = tr.copy(data=False)
    tr.tmin = itmin_frame * deltat
    tr.set_ydata(a)
    return tr


def _process(tr, tmin, tmax, taper, domain):
    '''
    Cut, taper and transform a trace according to the misfit ``domain``.

    :returns: tuple ``(tr_proc, trspec_proc)``; ``trspec_proc`` is a
        :py:class:`TraceSpectrum` for the frequency domains, else ``None``
    '''
    tr_proc = _extend_extract(tr, tmin, tmax)
    tr_proc.taper(taper)

    df = None
    trspec_proc = None

    if domain == 'envelope':
        tr_proc = tr_proc.envelope(inplace=False)
        tr_proc.set_ydata(num.abs(tr_proc.get_ydata()))

    elif domain == 'absolute':
        tr_proc.set_ydata(num.abs(tr_proc.get_ydata()))

    elif domain in ('frequency_domain', 'log_frequency_domain'):
        # Zero-pad to the length given by ``trace.nextpow2`` before the FFT.
        ndata = tr_proc.ydata.size
        nfft = trace.nextpow2(ndata)
        padded = num.zeros(nfft, dtype=float)
        padded[:ndata] = tr_proc.ydata
        spectrum = num.fft.rfft(padded)
        df = 1.0 / (tr_proc.deltat * nfft)

        trspec_proc = TraceSpectrum(
            network=tr_proc.network,
            station=tr_proc.station,
            location=tr_proc.location,
            channel=tr_proc.channel,
            deltaf=df,
            fmin=0.0,
            ydata=spectrum)

    return tr_proc, trspec_proc


def backazimuth_for_waveform(azimuth, nslc):
    '''
    Derive the backazimuth from a component azimuth.

    :returns: ``azimuth + 180`` if the last element of ``nslc`` is ``'R'``,
        ``azimuth + 90`` if it is ``'T'``, else ``None``
    '''
    if nslc[-1] == 'R':
        backazimuth = azimuth + 180.
    elif nslc[-1] == 'T':
        backazimuth = azimuth + 90.
    else:
        backazimuth = None

    return backazimuth


def float_or_none(x):
    '''Return ``float(x)``, or ``None`` if ``x`` is ``None``.'''
    if x is None:
        return x
    else:
        return float(x)


def weed(origin, targets, limit, neighborhood=3):
    '''
    Reduce ``targets`` to about ``limit`` entries with ``weeding.weed``.

    Selection is based on backazimuth and distance of each target with
    respect to ``origin``; all targets get the same badness.

    :returns: tuple ``(targets_weeded, meandists_kept, deleted)``
    '''
    azimuths = num.zeros(len(targets))
    dists = num.zeros(len(targets))
    for i, target in enumerate(targets):
        _, azimuths[i] = target.azibazi_to(origin)
        dists[i] = target.distance_to(origin)

    badnesses = num.ones(len(targets), dtype=float)
    deleted, meandists_kept = weeding.weed(
        azimuths, dists, badnesses,
        nwanted=limit,
        neighborhood=neighborhood)

    targets_weeded = [
        target for (delete, target) in zip(deleted, targets) if not delete]

    return targets_weeded, meandists_kept, deleted


__all__ = '''
    StoreIDSelectorError
    StoreIDSelector
    Crust2StoreIDSelector
    StationDictStoreIDSelector
    DepthRangeToStoreID
    StationDepthStoreIDSelector
    WaveformTargetGroup
    WaveformMisfitConfig
    WaveformMisfitTarget
    WaveformMisfitResult
    WaveformPiggybackSubtarget
    WaveformPiggybackSubresult
'''.split()