import logging
import numpy as num
import matplotlib.pyplot as plt
from matplotlib.cm import ScalarMappable
from matplotlib.ticker import FuncFormatter
from pyrocko.plot import beachball
from pyrocko.gf.meta import Timing
from pyrocko.gf import LocalEngine, Target, RectangularSource
from pyrocko.gf.seismosizer import map_anchor
km = 1e3
r2d = 180. / num.pi
d2r = num.pi / 180.
logger = logging.getLogger(__name__)
QUANTITY_LABEL = {
'displacement': 'Displacement [m]',
'velocity': 'Velocity [m/s]',
'acceleration': 'Acceleration [m/s²]'
}
def get_azimuthal_targets(
store_id, source, radius,
azi_begin=0., azi_end=360., dazi=1.,
interpolation='multilinear',
components='RTZ', quantity='displacement'):
assert dazi > 0.
assert azi_begin < azi_end
nstations = int((azi_end - azi_begin) // dazi)
assert nstations > 0
azimuths = num.linspace(azi_begin, azi_end, nstations)
coords = num.zeros((2, nstations))
coords[0, :] = num.cos(azimuths*d2r)
coords[1, :] = num.sin(azimuths*d2r)
coords *= radius
dips = {'R': 0., 'T': 0., 'Z': -90.}
for comp in components:
assert comp in dips.keys()
target_kwargs = dict(
quantity='displacement',
interpolation=interpolation,
store_id=store_id)
targets = [
Target(
lat=source.lat,
lon=source.lon,
north_shift=coords[0, iazi] + source.north_shift,
east_shift=coords[1, iazi] + source.east_shift,
azimuth={
'R': azi,
'T': azi+90.,
'Z': 0.
}[channel],
dip=dips[channel],
codes=('', 'S%01d' % iazi, '', channel),
**target_kwargs)
for iazi, azi in enumerate(azimuths)
for channel in components]
for target, azi in zip(targets, azimuths):
target.azimuth = azi
target.dazi = dazi
return targets, azimuths
def get_seismogram_array(
response, fmin=None, fmax=None,
component='R', envelope=False):
resp = response
assert len(resp.request.sources) == 1, 'more than one source in response'
tmin = None
tmax = None
traces = []
for _, target, tr in response.iter_results():
if target.codes[-1] != component:
continue
assert hasattr(target, 'azimuth')
assert target.dazi
if fmin and fmax:
tr.bandpass(2, fmin, fmax)
elif fmin:
tr.highpass(4, fmin)
elif fmax:
tr.lowpass(4, fmax)
tmin = min(tmin, tr.tmin) if tmin else tr.tmin
tmax = max(tmax, tr.tmax) if tmax else tr.tmax
traces.append(tr)
for tr in traces:
tr.extend(tmin, tmax, fillmethod='repeat')
if envelope:
tr.abshilbert()
data = num.array([tr.get_ydata() for tr in traces])
data -= data.mean()
nsamples = data.shape[1]
return data, num.linspace(tmin, tmax, nsamples)
def hillshade(array, azimuth, angle_altitude):
azimuth = 360.0 - azimuth
azi = azimuth * r2d
alt = angle_altitude * r2d
x, y = num.gradient(array)
slope = num.pi/2. - num.arctan(num.sqrt(x*x + y*y))
aspect = num.arctan2(-x, y)
shaded = num.sin(alt)*num.sin(slope) \
+ num.cos(alt)*num.cos(slope)*num.cos((azi - num.pi/2.) - aspect)
return (shaded + 1.)/2.
def hillshade_seismogram_array(
seismogram_array, rgba_map,
shad_lim=(.4, .98), contrast=1., blend_mode='multiply'):
assert blend_mode in ('multiply', 'screen'), 'unknown blend mode'
assert shad_lim[0] < shad_lim[1], 'bad shading limits'
from scipy.ndimage import convolve as im_conv
# Light source from somewhere above - psychologically the best choice
# from upper left
ramp = num.array([[1., 0.], [0., -1.]]) * contrast
# convolution of two 2D arrays
shad = im_conv(seismogram_array, ramp.T).ravel()
shad *= -1.
# if there are strong artifical edges in the data, shades get
# dominated by them. Cutting off the largest and smallest 2% of
# # shades helps
percentile2 = num.quantile(shad, 0.02)
percentile98 = num.quantile(shad, 0.98)
shad[shad > percentile98] = percentile98
shad[shad < percentile2] = percentile2
# # normalize shading
shad -= num.nanmin(shad)
shad /= num.nanmax(shad)
# # reduce range to balance gray color
shad *= shad_lim[1] - shad_lim[0]
shad += shad_lim[0]
if blend_mode == 'screen':
rgba_map[:, :3] = 1. - ((1. - rgba_map[:, :3])*(shad[:, num.newaxis]))
elif blend_mode == 'multiply':
rgba_map[:, :3] *= shad[:, num.newaxis]
return rgba_map
[docs]def plot_directivity(
engine, source, store_id,
distance=300*km, azi_begin=0., azi_end=360., dazi=1.,
phase_begin='first{stored:any_P}-10%',
phase_end='last{stored:any_S}+50',
quantity='displacement', envelope=False,
component='R', fmin=0.01, fmax=0.1,
hillshade=True, cmap=None,
plot_mt='full', show_phases=True, show_description=True,
reverse_time=False, show_nucleations=True, axes=None, nthreads=0):
'''Plot the directivity and radiation characteristics of source models
Synthetic seismic traces (R, T or Z) are forward-modelled at a defined
radius, covering the full or partial azimuthal range and projected on a
polar plot. Difference in the amplitude are enhanced by hillshading
the data.
:param engine: Forward modelling engine
:type engine: :py:class:`~pyrocko.gf.seismosizer.Engine`
:param source: Parametrized source model
:type source: :py:class:`~pyrocko.gf.seismosizer.Source`
:param store_id: Store ID used for forward modelling
:type store_id: str
:param distance: Distance in [m]
:type distance: float
:param azi_begin: Begin azimuth in [deg]
:type azi_begin: float
:param azi_end: End azimuth in [deg]
:type azi_end: float
:param dazi: Delta azimuth, bin size [deg]
:type dazi: float
:param phase_begin: Start time of the window
:type phase_begin: :py:class:`~pyrocko.gf.meta.Timing`
:param phase_end: End time of the window
:type phase_end: :py:class:`~pyrocko.gf.meta.Timing`
:param quantity: Seismogram quantity, default ``displacement``
:type quantity: str
:param envelope: Plot envelop instead of seismic trace
:type envelope: bool
:param component: Forward modelled component, default ``R``. Choose from
`RTZ`
:type component: str
:param fmin: Bandpass lower frequency [Hz], default ``0.01``
:type fmin: float
:param fmax: Bandpass upper frequency [Hz], default ``0.1``
:type fmax: float
:param hillshade: Enable hillshading, default ``True``
:type hillshade: bool
:param cmap: Matplotlit colormap to use, default ``seismic``.
When ``envelope`` is ``True`` the default colormap will be ``Reds``.
:type cmap: str
:param plot_mt: Plot a centered moment tensor, default ``full``.
Choose from ``full, deviatoric, dc or False``
:type plot_mt: str, bool
:param show_phases: Show annotations, default ``True``
:type show_phases: bool
:param show_description: Show desciption, default ``True``
:type show_description: bool
:param reverse_time: Reverse time axis. First phases arrive at the center,
default ``False``
:type reverse_time: bool
:param show_nucleations: Show nucleation piercing points on the moment
tensor, default ``True``
:type show_nucleations: bool
:param axes: Give axes to plot into
:type axes: :py:class:`matplotlib.axes.Axes`
:param nthreads: Number of threads used for forward modelling,
default ``0`` - all available cores
:type nthreads: int
'''
if axes is None:
fig = plt.figure()
ax = fig.add_subplot(111, polar=True)
else:
fig = axes.figure
ax = axes
if envelope and cmap is None:
cmap = 'Reds'
elif cmap is None:
cmap = 'seismic'
targets, azimuths = get_azimuthal_targets(
store_id, source, distance, azi_begin, azi_end, dazi,
components='R', quantity=quantity)
store = engine.get_store(store_id)
mt = source.pyrocko_moment_tensor(store=store, target=targets[0])
resp = engine.process(source, targets, nthreads=nthreads)
data, times = get_seismogram_array(
resp, fmin, fmax,
component=component, envelope=envelope)
timing_begin = Timing(phase_begin)
timing_end = Timing(phase_end)
src_depth = source.depth
src_distance = distance
anch_x, anch_y = map_anchor[source.anchor]
if hasattr(source, 'nucleation_x') and hasattr(source, 'nucleation_y'):
try:
iter(source.nucleation_x)
nx = float(source.nucleation_x[0])
ny = float(source.nucleation_y[0])
except TypeError:
nx = source.nucleation_x
ny = source.nucleation_y
src_distance += nx * source.length/2.
src_depth += (ny - anch_y) * num.sin(source.dip*d2r) * source.width/2.
tbegin = store.t(timing_begin, (src_depth, src_distance))
tend = store.t(timing_end, (src_depth, src_distance))
tsel = num.logical_and(times >= tbegin, times <= tend)
data = data[:, tsel].T
times = times[tsel]
duration = times[-1] - times[0]
vmax = num.abs(data).max()
cmw = ScalarMappable(cmap=cmap)
cmw.set_array(data)
cmw.set_clim(-vmax, vmax)
if envelope:
cmw.set_clim(0., vmax)
ax.set_theta_zero_location("N")
ax.set_theta_direction(-1)
ax.set_rlabel_position(mt.strike2)
for line in ax.xaxis.get_gridlines():
line.set_zorder(10)
def r_fmt(v, p):
if v < tbegin or v > tend:
return ''
return '%g s' % v
ax.yaxis.set_major_formatter(FuncFormatter(r_fmt))
if reverse_time:
ax.set_rlim(times[0] - .3*duration, times[-1])
else:
ax.set_rlim(times[-1] + .3*duration, times[0])
ax.grid(zorder=20)
if isinstance(plot_mt, str):
mt_size = .15
beachball.plot_beachball_mpl(
mt, ax,
beachball_type=plot_mt, size=mt_size,
size_units='axes', color_t=(0.7, 0.4, 0.4),
position=(.5, .5), linewidth=1.)
if hasattr(source, 'nucleation_x') and hasattr(source, 'nucleation_y')\
and show_nucleations:
try:
iter(source.nucleation_x)
nucleation_x = source.nucleation_x
nucleation_y = source.nucleation_y
except TypeError:
nucleation_x = [source.nucleation_x]
nucleation_y = [source.nucleation_y]
for nx, ny in zip(nucleation_x, nucleation_y):
angle = float(num.arctan2(ny, nx))
rtp = num.array([[1., angle, (90.-source.strike)*d2r]])
points = beachball.numpy_rtp2xyz(rtp)
x, y = beachball.project(points, projection='lambert').T
x *= mt_size
y *= mt_size
ax.plot(x+.5, y+.5, 'x', ms=6, mew=2, mec='darkred', mfc='red',
transform=ax.transAxes, zorder=10)
mesh = ax.pcolormesh(
azimuths * d2r, times, data,
cmap=cmw.cmap, shading='gouraud', zorder=0)
if hillshade:
mesh.update_scalarmappable()
color = mesh.get_facecolor()
color = hillshade_seismogram_array(
data, color, shad_lim=(.85, 1.), blend_mode='multiply')
mesh.set_facecolor(color)
if show_phases:
_phase_begin = Timing(phase_begin)
_phase_end = Timing(phase_end)
for p in (_phase_begin, _phase_end):
p.offset = 0.
p.offset_is_slowness = False
p.offset_is_percent = False
tphase_first = store.t(_phase_begin, (source.depth, distance))
tphase_last = store.t(_phase_end, (source.depth, distance))
theta = num.linspace(0, 2*num.pi, 360)
tfirst = num.full_like(theta, tphase_first)
tlast = num.full_like(theta, tphase_last)
ax.plot(theta, tfirst, color='k', alpha=.3, lw=1.)
ax.plot(theta, tlast, color='k', alpha=.3, lw=1.)
ax.text(
num.pi*7/5, tphase_first, '|'.join(_phase_begin.phase_defs),
ha='left', color='k', fontsize='small')
ax.text(
num.pi*6/5, tphase_last, '|'.join(_phase_end.phase_defs),
ha='left', color='k', fontsize='small')
description = ('Component {component:s}\n'
'Distance {distance:g} km').format(
component=component, distance=distance / km)
if show_description:
if fmin and fmax:
description += '\nBandpass {fmin:g} - {fmax:g} Hz'.format(
fmin=fmin, fmax=fmax)
elif fmin:
description += '\nHighpass {fmin:g} Hz'.format(fmin=fmin)
elif fmax:
description += '\nLowpass {fmax:g} Hz'.format(fmax=fmax)
ax.text(
-.05, -.05, description,
fontsize='small',
ha='left', va='bottom', transform=ax.transAxes)
cbar_label = QUANTITY_LABEL[quantity]
if envelope:
cbar_label = 'Envelope ' + cbar_label
fig.colorbar(
cmw, ax=ax,
orientation='vertical', shrink=.8, pad=0.11,
label=cbar_label)
if axes is None:
plt.show()
return resp
__all__ = ['plot_directivity']
if __name__ == '__main__':
engine = LocalEngine(store_superdirs=['.'], use_config=True)
rect_source = RectangularSource(
depth=2.6*km,
strike=240.,
dip=76.6,
rake=-.4,
anchor='top',
nucleation_x=-.57,
nucleation_y=-.59,
velocity=2070.,
length=27*km,
width=9.4*km,
slip=1.4)
resp = plot_directivity(
engine, rect_source, 'crust2_ib',
dazi=5, component='R', quantity='displacement', envelope=True)