Source code for pyrocko.plot.directivity

# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

import logging
import numpy as num
import matplotlib.pyplot as plt

from matplotlib.cm import ScalarMappable
from matplotlib.ticker import FuncFormatter

from pyrocko.plot import beachball
from pyrocko.gf.meta import Timing
from pyrocko.gf import LocalEngine, Target, RectangularSource


km = 1e3
r2d = 180. / num.pi
d2r = num.pi / 180.

logger = logging.getLogger(__name__)


QUANTITY_LABEL = {
    'displacement': 'Displacement [m]',
    'velocity': 'Velocity [m/s]',
    'acceleration': 'Acceleration [m/s²]'
}


def get_azimuthal_targets(
        store_id, source, radius,
        azi_begin=0., azi_end=360., dazi=1.,
        depth=0.0,
        interpolation='multilinear',
        components='RTZ', quantity='displacement'):

    assert dazi > 0.
    assert azi_begin < azi_end

    nstations = int((azi_end - azi_begin) // dazi)
    assert nstations > 0

    azimuths = num.linspace(azi_begin, azi_end, nstations)

    coords = num.zeros((2, nstations))
    coords[0, :] = num.cos(azimuths*d2r)
    coords[1, :] = num.sin(azimuths*d2r)
    coords *= radius

    dips = {'R': 0., 'T': 0., 'Z': -90.}
    for comp in components:
        assert comp in dips.keys()

    target_kwargs = dict(
        quantity=quantity,
        interpolation=interpolation,
        store_id=store_id)

    targets = [
        Target(
            lat=source.lat,
            lon=source.lon,
            north_shift=coords[0, iazi] + source.north_shift,
            east_shift=coords[1, iazi] + source.east_shift,
            depth=depth,
            azimuth={
                'R': azi,
                'T': azi+90.,
                'Z': 0.
            }[channel],
            dip=dips[channel],
            codes=('', 'S%01d' % iazi, '', channel),
            **target_kwargs)
        for iazi, azi in enumerate(azimuths)
        for channel in components]

    for target, azi in zip(targets, azimuths):
        target.azimuth = azi
        target.dazi = dazi

    return targets, azimuths


def get_seismogram_array(
        response, fmin=None, fmax=None,
        component='R', envelope=False):
    resp = response
    assert len(resp.request.sources) == 1, 'more than one source in response'

    tmin = None
    tmax = None
    traces = []

    for _, target, tr in response.iter_results():
        if target.codes[-1] != component:
            continue
        assert hasattr(target, 'azimuth')
        assert target.dazi

        if fmin is not None:
            tr.highpass(4, fmin, demean=False)

        if fmax is not None:
            tr.lowpass(4, fmax, demean=False)

        tmin = min(tmin, tr.tmin) if tmin else tr.tmin
        tmax = max(tmax, tr.tmax) if tmax else tr.tmax
        traces.append(tr)

    for tr in traces:
        tr.extend(tmin, tmax, fillmethod='repeat')
        if envelope:
            tr.envelope()

    data = num.array([tr.get_ydata() for tr in traces])
    nsamples = data.shape[1]
    return data, num.linspace(tmin, tmax, nsamples)


def hillshade(array, azimuth, angle_altitude):
    azimuth = 360.0 - azimuth
    azi = azimuth * r2d
    alt = angle_altitude * r2d

    x, y = num.gradient(array)
    slope = num.pi/2. - num.arctan(num.sqrt(x*x + y*y))
    aspect = num.arctan2(-x, y)

    shaded = num.sin(alt)*num.sin(slope) \
        + num.cos(alt)*num.cos(slope)*num.cos((azi - num.pi/2.) - aspect)

    return (shaded + 1.)/2.


def hillshade_seismogram_array(
        seismogram_array, rgba_map,
        shad_lim=(.4, .98), contrast=1., blend_mode='multiply'):
    assert blend_mode in ('multiply', 'screen'), 'unknown blend mode'
    assert shad_lim[0] < shad_lim[1], 'bad shading limits'
    from scipy.ndimage import convolve as im_conv
    # Light source from somewhere above - psychologically the best choice
    # from upper left
    ramp = num.array([[1., 0.], [0., -1.]]) * contrast

    # convolution of two 2D arrays
    shad = im_conv(seismogram_array, ramp.T).ravel()
    shad *= -1.

    # if there are strong artifical edges in the data, shades get
    # dominated by them. Cutting off the largest and smallest 2% of
    # # shades helps
    percentile2 = num.percentile(shad, 2.0)
    percentile98 = num.percentile(shad, 98.0)

    shad[shad > percentile98] = percentile98
    shad[shad < percentile2] = percentile2

    # # normalize shading
    shad -= num.nanmin(shad)
    shad /= num.nanmax(shad)

    # # reduce range to balance gray color
    shad *= shad_lim[1] - shad_lim[0]
    shad += shad_lim[0]

    if blend_mode == 'screen':
        rgba_map[:, :3] = 1. - ((1. - rgba_map[:, :3])*(shad[:, num.newaxis]))
    elif blend_mode == 'multiply':
        rgba_map[:, :3] *= shad[:, num.newaxis]

    return rgba_map


[docs]def plot_directivity( engine, source, store_id, distance=300*km, azi_begin=0., azi_end=360., dazi=1., phases={'P': 'first{stored:any_P}-10%', 'S': 'last{stored:any_S}+50'}, interpolation='multilinear', target_depth=0.0, quantity='displacement', envelope=False, component='R', fmin=0.01, fmax=0.1, show_trace=None, hillshade=True, cmap=None, plot_mt='full', show_phases=True, show_description=True, show_colorbar=True, reverse_time=False, show_nucleations=True, axes=None, nthreads=0): ''' Plot the directivity and radiation characteristics of source models. Synthetic seismic traces (R, T or Z) are forward-modelled at a defined radius, covering the full or partial azimuthal range and projected on a polar plot. Difference in the amplitude are enhanced by hillshading the data. :param engine: Forward modelling engine :type engine: :py:class:`~pyrocko.gf.seismosizer.Engine` :param source: Parametrized source model :type source: :py:class:`~pyrocko.gf.seismosizer.Source` :param store_id: Store ID used for forward modelling :type store_id: str :param distance: Distance in [m] :type distance: float :param azi_begin: Begin azimuth in [deg] :type azi_begin: float :param azi_end: End azimuth in [deg] :type azi_end: float :param dazi: Delta azimuth, bin size [deg] :type dazi: float :param phases: Phases to define start and end of time window :type phases: :py:class:`dict` with :py:class:`str` keys and :py:class:`~pyrocko.gf.meta.Timing` phase definitions as values. :param quantity: Seismogram quantity, default ``displacement`` :type quantity: str :param envelope: Plot envelope instead of seismic trace :type envelope: bool :param component: Forward modelled component, default ``R``. Choose from `RTZ` :type component: str :param fmin: Bandpass lower frequency [Hz], default ``0.01`` :type fmin: float :param fmax: Bandpass upper frequency [Hz], default ``0.1`` :type fmax: float :param hillshade: Enable hillshading, default ``True`` :type hillshade: bool :param cmap: Matplotlib colormap to use, default ``seismic``. When ``envelope`` is ``True`` the default colormap will be ``Reds``. :type cmap: str :param plot_mt: Plot a centered moment tensor, default ``full``. Choose from ``full, deviatoric, dc or False`` :type plot_mt: str, bool :param show_phases: Show annotations, default ``True`` :type show_phases: bool :param show_description: Show description, default ``True`` :type show_description: bool :param reverse_time: Reverse time axis. First phases arrive at the center, default ``False`` :type reverse_time: bool :param show_nucleations: Show nucleation piercing points on the moment tensor, default ``True`` :type show_nucleations: bool :param axes: Give axes to plot into :type axes: :py:class:`matplotlib.axes.Axes` :param nthreads: Number of threads used for forward modelling, default ``0`` - all available cores :type nthreads: int ''' ax_tr = None if axes is None: fig = plt.figure(figsize=(10, 8)) if show_trace is not None: ax = fig.add_axes((0.05, 0.3, 0.9, 0.65), polar=True) ax_tr = fig.add_axes((0.1, 0.05, 0.85, 0.2)) else: ax = fig.add_axes((0.05, 0.05, 0.9, 0.9), polar=True) else: fig = axes.figure ax = axes if envelope and cmap is None: cmap = 'Reds' elif cmap is None: cmap = 'seismic' targets, azimuths = get_azimuthal_targets( store_id, source, distance, azi_begin, azi_end, dazi, depth=target_depth, interpolation=interpolation, components=component, quantity=quantity) ref_target = targets[0] store = engine.get_store(store_id) mt = source.pyrocko_moment_tensor(store=store, target=ref_target) resp = engine.process(source, targets, nthreads=nthreads) data, times = get_seismogram_array( resp, fmin, fmax, component=component, envelope=envelope) times -= source.time timings = {label: Timing(phase) for label, phase in phases.items()} discretized_source = source.discretize_basesource(store) target_timings = {} for timing in timings.values(): target_timings[timing] = num.fromiter( (store.t(timing, discretized_source, target) for target in targets), float) all_times = num.array(tuple(t for t in target_timings.values())) tbegin = all_times.min() tend = all_times.max() tsel = num.logical_and(times >= tbegin, times <= tend) data = data[:, tsel].T times = times[tsel] duration = times[-1] - times[0] vmax = num.abs(data).max() cmw = ScalarMappable(cmap=cmap) cmw.set_array(data) cmw.set_clim(-vmax, vmax) if envelope: cmw.set_clim(0., vmax) ax.set_theta_zero_location('N') ax.set_theta_direction(-1) strike_label = mt.strike1 if hasattr(source, 'strike'): strike_label = source.strike try: ax.set_rlabel_position(strike_label % 180. - 180.) except AttributeError: logger.warn('Old matplotlib version: cannot set label positions') def r_fmt(v, p): if v < tbegin or v > tend: return '' return '%g s' % v ax.yaxis.set_major_formatter(FuncFormatter(r_fmt)) if reverse_time: r_lim = (times[0] - 0.3*duration, times[-1]) else: r_lim = (times[-1] + 0.3*duration, times[0]) ax.set_rlim(r_lim) ax.grid(zorder=20) if isinstance(plot_mt, str): mt_size = .15 beachball.plot_beachball_mpl( mt, ax, beachball_type=plot_mt, size=mt_size, size_units='axes', color_t=(0.7, 0.4, 0.4), position=(.5, .5), linewidth=1.) if hasattr(source, 'nucleation_x') and hasattr(source, 'nucleation_y')\ and show_nucleations: try: iter(source.nucleation_x) nucleation_x = source.nucleation_x nucleation_y = source.nucleation_y except TypeError: nucleation_x = [source.nucleation_x] nucleation_y = [source.nucleation_y] for nx, ny in zip(nucleation_x, nucleation_y): angle = float(num.arctan2(ny, nx)) rtp = num.array([[1., angle, (90.-source.strike)*d2r]]) points = beachball.numpy_rtp2xyz(rtp) x, y = beachball.project(points, projection='lambert').T norm = num.sqrt(x**2 + y**2) x = x / norm * mt_size/2. y = y / norm * mt_size/2. ax.plot(x+.5, y+.5, 'x', ms=6, mew=2, mec='darkred', mfc='red', transform=ax.transAxes, zorder=10) mesh = ax.pcolormesh( azimuths * d2r, times, data, cmap=cmw.cmap, norm=cmw.norm, shading='gouraud', zorder=0) if hillshade: mesh.update_scalarmappable() color = mesh.get_facecolor() color = hillshade_seismogram_array( data, color, shad_lim=(.85, 1.), blend_mode='multiply') mesh.set_facecolor(color) if show_phases: label_theta = 270. theta = num.fromiter( (target.azimuth for target in targets), float) * d2r for label, phase_str in phases.items(): timing = Timing(phase_str) timing.offset = 0. timing.offset_is_slowness = False timing.offset_is_percent = False label_times = num.fromiter( (store.t(timing, discretized_source, target) for target in targets), float) ax.plot(theta, label_times, color='k', alpha=.3, lw=1., ls='--') ax.text( theta[0], label_times[0], label, ha='left', color='k', fontsize='small') label_theta += 30. if show_description: description = ( 'Component {component:s}\n' 'Distance {distance:g} km').format( component=component, distance=distance / km) if fmin and fmax: description += '\nBandpass {fmin:g} - {fmax:g} Hz'.format( fmin=fmin, fmax=fmax) elif fmin: description += '\nHighpass {fmin:g} Hz'.format(fmin=fmin) elif fmax: description += '\nLowpass {fmax:g} Hz'.format(fmax=fmax) ax.text( -.05, -.05, description, fontsize='small', ha='left', va='bottom', transform=ax.transAxes) data_label = "Part. " + QUANTITY_LABEL[quantity] if show_colorbar: if envelope: data_label = 'Envelope ' + data_label cb = fig.colorbar( cmw, ax=ax, orientation='vertical', shrink=.8, pad=0.11) cb.set_label(data_label) if show_trace is not None and ax_tr is not None: itr = (num.abs(azimuths - show_trace)).argmin() tr_data = data[:, itr] tr_time = times ax.plot( num.array([show_trace, show_trace]) * d2r, [times[0], times[-1]], ls="--", c="k", alpha=0.5) ax_tr.plot(tr_time, tr_data, c='k', alpha=.5) ax_tr.grid(alpha=0.3) ax_tr.spines.top.set_visible(False) ax_tr.spines.right.set_visible(False) ax_tr.set_xlabel("Time [s]") ax_tr.set_ylabel(data_label) ax_tr.set_xlim([times[0], times[-1]]) for label, phase_str in phases.items(): timing = Timing(phase_str) timing.offset = 0. timing.offset_is_slowness = False timing.offset_is_percent = False label_time = store.t(timing, discretized_source, targets[itr]) ax_tr.axvline(label_time, c='k', alpha=.5, ls='--') ax_tr.annotate( label, xy=(label_time, ax_tr.get_ylim()[-1]), xycoords='data', xytext=(5, -5), textcoords='offset points', fontsize='small', color='k') if axes is None: plt.show() return resp
__all__ = ['plot_directivity'] if __name__ == '__main__': engine = LocalEngine(store_superdirs=['.'], use_config=True) rect_source = RectangularSource( depth=2.6*km, strike=240., dip=76.6, rake=-.4, anchor='top', nucleation_x=-.57, nucleation_y=-.59, velocity=2070., length=27*km, width=9.4*km, slip=1.4) resp = plot_directivity( engine, rect_source, 'crust2_ib', dazi=5, component='R', quantity='displacement', envelope=True)