# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from subprocess import check_call, CalledProcessError
import os
import os.path as op
import re
import logging
import tempfile
import shutil
import inspect
import numpy as num
from scipy.interpolate import RegularGridInterpolator as scrgi
from matplotlib import cm, pyplot as plt, patheffects
from matplotlib.ticker import FuncFormatter
from pyrocko import orthodrome as pod
from pyrocko.guts import Object
from pyrocko.plot import mpl_init, mpl_papersize, mpl_color, AutoScaler, gmtpy
from pyrocko.plot.automap import Map, NoTopo
from pyrocko.gf import PseudoDynamicRupture
from pyrocko.gf.seismosizer import map_anchor
from pyrocko.dataset.topo.tile import Tile
logger = logging.getLogger(__name__)
km = 1e3
d2m = 111180.
m2d = 1. / d2m
cm2inch = gmtpy.cm/gmtpy.inch
d2r = num.pi / 180.
r2d = 1. / d2r
def km_formatter(v, p):
return '%g' % (v / km)
def get_kwargs(cls):
sig = inspect.signature(cls.__init__)
kwargs = {}
if cls.T.cls == RuptureMap:
for p in Map.T.properties:
kwargs[p.name] = p._default
for par in sig.parameters.values():
if par.default is inspect._empty:
continue
kwargs[par.name] = par.default
return kwargs
def _save_grid(lats, lons, data, filename):
'''
Save lat-lon gridded data as gmt .grd file.
:param lats:
Grid latitude coordinates in [deg].
:type lats:
iterable
:param lons:
Grid longitude coordinates in [deg].
:type lons:
iterable
:param data:
Grid data of any kind.
:type data:
:py:class:`~numpy.ndarray`, ``(n_lons, n_lats)``
:param filename:
Filename of the written grid file.
:type filename:
str
'''
gmtpy.savegrd(lons, lats, data, filename=filename, naming='lonlat')
def _mplcmap_to_gmtcpt_code(mplcmap, steps=256):
'''
Get gmt readable R/G/A code from a given matplotlib colormap.
:param mplcmap:
Name of the demanded matplotlib colormap.
:type mplcmap:
str
:returns:
Series of comma seperate R/G/B values for gmtpy usage.
:rtype:
str
'''
cmap = cm.get_cmap(mplcmap)
rgbas = [cmap(i) for i in num.linspace(0, 255, steps).astype(num.int64)]
return ','.join(['%d/%d/%d' % (
c[0] * 255, c[1] * 255, c[2] * 255) for c in rgbas])
[docs]def make_colormap(
gmt,
vmin,
vmax,
C=None,
cmap=None,
space=False):
'''
Create gmt-readable colormap cpt file called my_<cmap>.cpt.
:param vmin:
Minimum value covered by the colormap.
:type vmin:
float
:param vmax:
Maximum value covered by the colormap.
:type vmax:
float
:param C:
Comma seperated R/G/B values for cmap definition.
:type C:
optional, str
:param cmap:
Name of the colormap. Colormap is stored as "my_<cmap>.cpt".
If name is equivalent to a matplotlib colormap, R/G/B strings are
extracted from this colormap.
:type cmap:
optional, str
:param space:
If ``True``, the range of the colormap is broadened below vmin
and above vmax.
:type space: optional, bool
'''
scaler = AutoScaler(mode='min-max')
scale = scaler.make_scale((vmin, vmax))
incr = scale[2]
margin = 0.
if vmin == vmax:
space = True
if space:
margin = incr
msg = ('Please give either a valid color code or a'
' valid matplotlib colormap name.')
if C is None and cmap is None:
raise ValueError(msg)
if C is None:
try:
C = _mplcmap_to_gmtcpt_code(cmap)
except ValueError:
raise ValueError(msg)
if cmap is None:
logger.warning(
'No colormap name given. Uses temporary filename instead')
cmap = 'temp_cmap'
return gmt.makecpt(
C=C,
D='o',
T='%g/%g/%g' % (
vmin - margin, vmax + margin, incr),
Z=True,
out_filename='my_%s.cpt' % cmap,
suppress_defaults=True)
[docs]def clear_temp(gridfiles=[], cpts=[]):
'''
Clear all temporary needed grid and colormap cpt files.
:param gridfiles:
List of all "...grd" files, which shall be deleted.
:type gridfiles:
optional, list
:param cpts:
Cmaps, whose corresponding "my_<cmap>.cpt" file shall be deleted.
:type cpts:
optional, list
'''
for fil in gridfiles:
try:
os.remove(fil)
except OSError:
continue
for fil in cpts:
try:
os.remove('my_%s.cpt' % fil)
except OSError:
continue
[docs]def xy_to_latlon(source, x, y):
'''
Convert x and y relative coordinates on extended ruptures into latlon.
:param source:
Extended source class, on which the given point is located.
:type source:
:py:class:`~pyrocko.gf.seismosizer.RectangularSource` or
:py:class:`~pyrocko.gf.seismosizer.PseudoDynamicRupture`
:param x:
Relative point coordinate along strike (range: -1:1).
:type x:
float or :py:class:`~numpy.ndarray`
:param y:
Relative downdip point coordinate (range: -1:1).
:type y:
float or :py:class:`~numpy.ndarray`
:returns:
Latitude and Longitude of the given point in [deg].
:rtype:
tuple of float
'''
s = source
ax, ay = map_anchor[s.anchor]
length, width = (x - ax) / 2. * s.length, (y - ay) / 2. * s.width
strike, dip = s.strike * d2r, s.dip * d2r
northing = num.cos(strike)*length - num.cos(dip) * num.sin(strike)*width
easting = num.sin(strike)*length + num.cos(dip) * num.cos(strike)*width
northing += source.north_shift
easting += source.east_shift
return pod.ne_to_latlon(s.lat, s.lon, northing, easting)
[docs]def xy_to_lw(source, x, y):
'''
Convert relative coordinates on extended ruptures into length and width.
:param source:
Extended source, on which the given points are located.
:type source:
:py:class:`~pyrocko.gf.seismosizer.RectangularSource` or
:py:class:`~pyrocko.gf.seismosizer.PseudoDynamicRupture`
:param x:
Relative point coordinates along strike (range: -1:1).
:type x:
float or :py:class:`~numpy.ndarray`
:param y:
Relative downdip point coordinates (range: -1:1).
:type y:
float or :py:class:`~numpy.ndarray`
:returns:
Length and downdip width of the given points from the anchor in [m].
:rtype:
tuple of float
'''
length, width = source.length, source.width
ax, ay = map_anchor[source.anchor]
lengths = (x - ax) / 2. * length
widths = (y - ay) / 2. * width
return lengths, widths
cbar_anchor = {
'center': 'MC',
'center_left': 'ML',
'center_right': 'MR',
'top': 'TC',
'top_left': 'TL',
'top_right': 'TR',
'bottom': 'BC',
'bottom_left': 'BL',
'bottom_right': 'BR'}
cbar_helper = {
'traction': {
'unit': 'MPa',
'factor': 1e-6},
'tx': {
'unit': 'MPa',
'factor': 1e-6},
'ty': {
'unit': 'MPa',
'factor': 1e-6},
'tz': {
'unit': 'MPa',
'factor': 1e-6},
'time': {
'unit': 's',
'factor': 1.},
'strike': {
'unit': '°',
'factor': 1.},
'dip': {
'unit': '°',
'factor': 1.},
'vr': {
'unit': 'km/s',
'factor': 1e-3},
'length': {
'unit': 'km',
'factor': 1e-3},
'width': {
'unit': 'km',
'factor': 1e-3}
}
fonttype = 'Helvetica'
c2disl = dict([('x', 0), ('y', 1), ('z', 2)])
def _make_gmt_conf(fontcolor, size):
'''
Update different gmt parameters depending on figure size and fontcolor.
:param fontcolor:
GMT readable colorcode or colorstring for the font.
:type fontcolor:
str
:param size:
Tuple of the figure size (width, height) in [cm].
:type size:
tuple of float
:returns:
Best fitting fontsize, GMT configuration parameter
:rtype:
float, dict
'''
color = fontcolor
fontsize = num.max(size)
font = '%gp,%s' % (fontsize, fonttype)
pen_size = fontsize / 16.
tick_size = num.min(size) / 200.
return fontsize, dict(
MAP_FRAME_PEN='%s' % color,
MAP_TICK_PEN_PRIMARY='%gp,%s' % (pen_size, color),
MAP_TICK_PEN_SECONDARY='%gp,%s' % (pen_size, color),
MAP_TICK_LENGTH_PRIMARY='%gc' % tick_size,
MAP_TICK_LENGTH_SECONDARY='%gc' % (tick_size * 3),
FONT_ANNOT_PRIMARY='%s-Bold,%s' % (font, color),
FONT_LABEL='%s-Bold,%s' % (font, color),
FONT_TITLE='%s-Bold,%s' % (font, color),
PS_CHAR_ENCODING='ISOLatin1+',
MAP_FRAME_TYPE='fancy',
FORMAT_GEO_MAP='D',
PS_PAGE_ORIENTATION='portrait',
MAP_GRID_PEN_PRIMARY='thinnest,%s' % color,
MAP_ANNOT_OBLIQUE='6')
[docs]class SourceError(Exception):
pass
[docs]class RuptureMap(Map):
'''
Map plotting of attributes and results of the
:py:class:`~pyrocko.gf.seismosizer.PseudoDynamicRupture`.
'''
def __init__(
self,
source=None,
fontcolor='darkslategrey',
width=20.,
height=14.,
margins=None,
color_wet=(216, 242, 254),
color_dry=(238, 236, 230),
topo_cpt_wet='light_sea_uniform',
topo_cpt_dry='light_land_uniform',
show_cities=False,
*args,
**kwargs):
size = (width, height)
fontsize, gmt_config = _make_gmt_conf(fontcolor, size)
if margins is None:
margins = [
fontsize * 0.15, num.min(size) / 200.,
num.min(size) / 200., fontsize * 0.05]
Map.__init__(self, *args, margins=margins, width=width, height=height,
gmt_config=gmt_config,
topo_cpt_dry=topo_cpt_dry, topo_cpt_wet=topo_cpt_wet,
color_wet=color_wet, color_dry=color_dry,
**kwargs)
if show_cities:
self.draw_cities()
self._source = source
self._fontcolor = fontcolor
self._fontsize = fontsize
self.axes_layout = 'WeSn'
@property
def size(self):
'''
Figure size in [cm].
'''
return (self.width, self.height)
@property
def font(self):
'''
Font style (size and type).
'''
return '%sp,%s' % (self._fontsize, fonttype)
@property
def source(self):
'''
PseudoDynamicRupture whose attributes are plotted.
Note, that source.patches attribute needs to be calculated in advance.
'''
if self._source is None:
raise SourceError('No source given. Please define it!')
if not isinstance(self._source, PseudoDynamicRupture):
raise SourceError('This function is only capable for a source of'
' type: %s' % type(PseudoDynamicRupture()))
if not self._source.patches:
raise TypeError('No source patches are defined. Please run'
'"source.discretize_patches()" on your source')
return self._source
@source.setter
def source(self, source):
self._source = source
def _get_topotile(self):
if self._dems is None:
self._setup()
try:
t, _ = self._get_topo_tile('land')
except NoTopo:
wesn = self.wesn
nx = int(num.ceil(
self.width * cm2inch * self.topo_resolution_max))
ny = int(num.ceil(
self.height * cm2inch * self.topo_resolution_max))
data = num.zeros((nx, ny))
t = Tile(wesn[0], wesn[2],
(wesn[1] - wesn[0]) / nx, (wesn[3] - wesn[2]) / ny,
data)
return t
def _patches_to_lw(self):
'''
Generate regular rect. length-width grid based on the patch distance.
Prerequesite is a regular grid of patches (constant lengths and widths)
Both coordinates are given relative to the source anchor point in [m]
The grid is extended from the patch centres to the edges of the source.
:returns:
Lengths along strike, widths downdip in [m].
:rtype:
:py:class:`~numpy.ndarray`, :py:class:`~numpy.ndarray`
'''
source = self.source
patches = source.patches
patch_l, patch_w = patches[0].length, patches[0].width
patch_lengths = num.concatenate((
num.array([0.]),
num.array([il*patch_l+patch_l/2. for il in range(source.nx)]),
num.array([patch_l * source.nx])))
patch_widths = num.concatenate((
num.array([0.]),
num.array([iw*patch_w+patch_w/2. for iw in range(source.ny)]),
num.array([patch_w * source.ny])))
ax, ay = map_anchor[source.anchor]
patch_lengths -= source.length * (ax + 1.) / 2.
patch_widths -= source.width * (ay + 1.) / 2.
return patch_lengths, patch_widths
def _xy_to_lw(self, x, y):
'''
Generate regular rect. length-width grid based on the xy coordinates.
Prerequesite is a regular grid with constant dx and dy. x and y are
relative coordinates on the rupture plane (range -1:1) along strike and
downdip.
Length and width are obtained relative to the source anchor point
in [m].
:returns:
Lengths along strike, widths downdip in [m].
:rtype:
:py:class:`~numpy.ndarray`, :py:class:`~numpy.ndarray`
'''
x, y = num.unique(x), num.unique(y)
dx, dy = x[1] - x[0], y[1] - y[0]
if any(num.abs(num.diff(x) - dx) >= 1e-6):
raise ValueError('Regular grid with constant spacing needed.'
' Please check the x coordinates.')
if any(num.abs(num.diff(y) - dy) >= 1e-6):
raise ValueError('Regular grid with constant spacing needed.'
' Please check the y coordinates.')
return xy_to_lw(self.source, x, y)
def _tile_to_lw(self, ref_lat, ref_lon,
north_shift=0., east_shift=0., strike=0.):
'''
Coordinate transformation from the topo tile grid into length-width.
The topotile latlon grid is rotated into the length width grid. The
width is defined here as its horizontal component. The resulting grid
is used for interpolation of grid data.
:param ref_lat:
Reference latitude, from which length-width relative coordinates
grid are calculated.
:type ref_lat:
float
:param ref_lon:
Reference longitude, from which length-width relative coordinates
grid are calculated.
:type ref_lon:
float
:param north_shift:
North shift of the reference point in [m].
:type north_shift:
optional, float
:param east_shift:
East shift of the reference point [m].
:type east_shift:
optional, float
:param strike:
striking of the length axis compared to the North axis in [deg].
:type strike:
optional, float
:returns:
Topotile grid nodes as array of length-width coordinates.
:rtype:
:py:class:`~numpy.ndarray`, ``(n_nodes, 2)``
'''
t = self._get_topotile()
grid_lats = t.y()
grid_lons = t.x()
meshgrid_lons, meshgrid_lats = num.meshgrid(grid_lons, grid_lats)
grid_northing, grid_easting = pod.latlon_to_ne_numpy(
ref_lat, ref_lon, meshgrid_lats.flatten(), meshgrid_lons.flatten())
grid_northing -= north_shift
grid_easting -= east_shift
strike *= d2r
sin, cos = num.sin(strike), num.cos(strike)
points_out = num.zeros((grid_northing.shape[0], 2))
points_out[:, 0] = -sin * grid_northing + cos * grid_easting
points_out[:, 1] = cos * grid_northing + sin * grid_easting
return points_out
def _prep_patch_grid_data(self, data):
'''
Extend patch data from patch centres to the outer source edges.
Patch data is always defined in the centre of the patches. For
interpolation the data is extended here to the edges of the rupture
plane.
:param data:
Patch wise data.
:type data:
:py:class:`~numpy.ndarray`
:returns:
Extended data array.
:rtype:
:py:class:`~numpy.ndarray`
'''
shape = (self.source.nx + 2, self.source.ny + 2)
data = data.reshape(self.source.nx, self.source.ny).copy()
data_new = num.zeros(shape)
data_new[1:-1, 1:-1] = data
data_new[1:-1, 0] = data[:, 0]
data_new[1:-1, -1] = data[:, -1]
data_new[0, 1:-1] = data[0, :]
data_new[-1, 1:-1] = data[-1, :]
for i, j in zip([-1, -1, 0, 0], [-1, 0, -1, 0]):
data_new[i, j] = data[i, j]
return data_new
def _regular_data_to_grid(self, lengths, widths, data, filename):
'''
Interpolate regular data onto topotile grid.
Regular gridded data is interpolated onto the latlon grid of the
topotile. It is then stored as a gmt-readable .grd-file.
:param lengths:
Grid coordinates along strike relative to anchor in [m].
:type lengths:
:py:class:`~numpy.ndarray`
:param widths:
Grid coordinates downdip relative to anchor in [m].
:type widths:
:py:class:`~numpy.ndarray`
:param data:
Data grid array.
:type data:
:py:class:`~numpy.ndarray`
:param filename:
Filename, where grid is stored.
:type filename:
str
'''
source = self.source
interpolator = scrgi(
(widths * num.cos(d2r * source.dip), lengths),
data.T,
bounds_error=False,
method='nearest')
points_out = self._tile_to_lw(
ref_lat=source.lat,
ref_lon=source.lon,
north_shift=source.north_shift,
east_shift=source.east_shift,
strike=source.strike)
t = self._get_topotile()
t.data = num.zeros_like(t.data, dtype=float)
t.data[:] = num.nan
t.data = interpolator(points_out).reshape(t.data.shape)
_save_grid(t.y(), t.x(), t.data, filename=filename)
[docs] def patch_data_to_grid(self, data, *args, **kwargs):
'''
Generate a grid file based on regular patch wise data.
:param data:
Patchwise grid data.
:type data:
:py:class:`~numpy.ndarray`
'''
lengths, widths = self._patches_to_lw()
data_new = self._prep_patch_grid_data(data)
self._regular_data_to_grid(lengths, widths, data_new, *args, **kwargs)
[docs] def xy_data_to_grid(self, x, y, data, *args, **kwargs):
'''
Generate a grid file based on gridded data using xy coordinates.
Convert a grid based on relative fault coordinates (range -1:1) along
strike (x) and downdip (y) into a .grd file.
:param x:
Relative point coordinate along strike (range: -1:1).
:type x:
float or :py:class:`~numpy.ndarray`
:param y:
Relative downdip point coordinate (range: -1:1).
:type y:
float or :py:class:`~numpy.ndarray`
:param data:
Patchwise grid data.
:type data:
:py:class:`~numpy.ndarray`
'''
lengths, widths = self._xy_to_lw(x, y)
self._regular_data_to_grid(
lengths, widths, data.reshape((lengths.shape[0], widths.shape[0])),
*args, **kwargs)
[docs] def draw_image(self, gridfile, cmap, cbar=True, **kwargs):
'''
Draw grid data as image and include, if whished, a colorbar.
:param gridfile:
File of the grid which shall be plotted.
:type gridfile:
str
:param cmap:
Name of the colormap, which shall be used. A .cpt-file
"my_<cmap>.cpt" must exist.
:type cmap:
str
:param cbar:
If ``True``, a colorbar corresponding to the grid data is
added. Keyword arguments are parsed to it.
:type cbar:
optional, bool
'''
self.gmt.grdimage(
gridfile,
C='my_%s.cpt' % cmap,
E='200',
Q=True,
n='+t0.0',
*self.jxyr)
if cbar:
self.draw_colorbar(cmap=cmap, **kwargs)
[docs] def draw_contour(
self,
gridfile,
contour_int,
anot_int,
angle=None,
unit='',
color='',
style='',
**kwargs):
'''
Draw grid data as contour lines.
:param gridfile:
File of the grid which shall be plotted.
:type gridfile:
str
:param contour_int:
Interval of contour lines in units of the gridfile.
:type contour_int:
float
:param anot_int:
Interval of labelled contour lines in units of the gridfile. Must
be a integer multiple of contour_int.
:type anot_int:
float
:param angle:
Rotation angle of the labels in [deg].
:type angle:
optional, float
:param unit:
Name of the unit in the grid file. It will be displayed behind the
label on labelled contour lines.
:type unit:
optional, str
:param color:
GMT readable color code or string of the contour lines.
:type color:
optional, str
:param style:
Line style of the contour lines. If not given, solid lines are
plotted.
:type style:
optional, str
'''
pen_size = self._fontsize / 40.
if not color:
color = self._fontcolor
a_string = '%g+f%s,%s+r%gc+u%s' % (
anot_int, self.font, color, pen_size*4, unit)
if angle:
a_string += '+a%g' % angle
c_string = '%g' % contour_int
if kwargs:
kwargs['A'], kwargs['C'] = a_string, c_string
else:
kwargs = dict(A=a_string, C=c_string)
if style:
style = ',' + style
args = ['-Wc%gp,%s%s+s' % (pen_size, color, style)]
self.gmt.grdcontour(
gridfile,
S='10',
W='a%gp,%s%s+s' % (pen_size*4, color, style),
*self.jxyr + args,
**kwargs)
[docs] def draw_colorbar(self, cmap, label='', anchor='top_right', **kwargs):
'''
Draw a colorbar based on a existing colormap.
:param cmap:
Name of the colormap, which shall be used. A .cpt-file
"my_<cmap>.cpt" must exist.
:type cmap:
str
:param label:
Title of the colorbar.
:type label:
optional, str
:param anchor:
Placement of the colorbar. Combine 'top', 'center' and 'bottom'
with 'left', None for middle and 'right'.
:type anchor:
optional, str
'''
if not kwargs:
kwargs = {}
if label:
kwargs['B'] = 'af+l%s' % label
kwargs['C'] = 'my_%s.cpt' % cmap
a_str = cbar_anchor[anchor]
w = self.width / 3.
h = w / 10.
lgap = rgap = w / 10.
bgap, tgap = h, h / 10.
dx, dy = 2.5 * lgap, 2. * tgap
if 'bottom' in anchor:
dy += 4 * h
self.gmt.psscale(
D='j%s+w%gc/%gc+h+o%gc/%gc' % (a_str, w, h, dx, dy),
F='+g238/236/230+c%g/%g/%g/%g' % (lgap, rgap, bgap, tgap),
*self.jxyr,
**kwargs)
[docs] def draw_vector(self, x_gridfile, y_gridfile, vcolor='', **kwargs):
'''
Draw vectors based on two grid files.
Two grid files for vector lengths in x and y need to be given. The
function calls gmt.grdvector. All arguments defined for this function
in gmt can be passed as keyword arguments. Different standard settings
are applied if not defined differently.
:param x_gridfile:
File of the grid defining vector lengths in x.
:type x_gridfile:
str
:param y_gridfile:
File of the grid defining vector lengths in y.
:type y_gridfile:
str
:param vcolor:
Vector face color as defined in "G" option.
:type vcolor:
str
'''
kwargs['S'] = kwargs.get('S', 'il1.')
kwargs['I'] = kwargs.get('I', 'x20')
kwargs['W'] = kwargs.get('W', '0.3p,%s' % 'black')
kwargs['Q'] = kwargs.get('Q', '4c+e+n5c+h1')
self.gmt.grdvector(
x_gridfile, y_gridfile,
G='%s' % 'lightgrey' if not vcolor else vcolor,
*self.jxyr,
**kwargs)
[docs] def draw_dynamic_data(self, data, **kwargs):
'''
Draw an image of any data gridded on the patches e.g dislocation.
:param data:
Patchwise grid data.
:type data:
:py:class:`~numpy.ndarray`
'''
plot_data = data
kwargs['cmap'] = kwargs.get('cmap', 'afmhot_r')
clim = kwargs.pop('clim', (plot_data.min(), plot_data.max()))
cpt = []
if not op.exists('my_%s.cpt' % kwargs['cmap']):
make_colormap(self.gmt, clim[0], clim[1],
cmap=kwargs['cmap'], space=False)
cpt = [kwargs['cmap']]
tmp_grd_file = 'tmpdata.grd'
self.patch_data_to_grid(plot_data, tmp_grd_file)
self.draw_image(tmp_grd_file, **kwargs)
clear_temp(gridfiles=[tmp_grd_file], cpts=cpt)
[docs] def draw_patch_parameter(self, attribute, **kwargs):
'''
Draw an image of a chosen patch attribute e.g traction.
:param attribute:
Patch attribute, which is plotted. All patch attributes can be
taken (see doc of :py:class:`~pyrocko.modelling.okada.OkadaSource`)
and also ``traction``, ``tx``, ``ty`` or ``tz`` to display the
length or the single components of the traction vector.
:type attribute:
str
'''
a = attribute
source = self.source
if a == 'traction':
data = num.linalg.norm(source.get_tractions(), axis=1)
elif a == 'tx':
data = source.get_tractions()[:, 0]
elif a == 'ty':
data = source.get_tractions()[:, 1]
elif a == 'tz':
data = source.get_tractions()[:, 2]
else:
data = source.get_patch_attribute(attribute)
factor = 1. if 'label' in kwargs else cbar_helper[a]['factor']
data *= factor
kwargs['label'] = kwargs.get(
'label',
'%s [%s]' % (a, cbar_helper[a]['unit']))
self.draw_dynamic_data(data, **kwargs)
[docs] def draw_time_contour(self, store, clevel=[], **kwargs):
'''
Draw high contour lines of the rupture front propgation time.
:param store:
Greens function store, which is used for time calculation.
:type store:
:py:class:`~pyrocko.gf.store.Store`
:param clevel:
List of times, for which contour lines are drawn.
:type clevel:
optional, list of float
'''
_, _, _, _, points_xy = self.source._discretize_points(store, cs='xyz')
_, _, times, _, _, _ = self.source.get_vr_time_interpolators(store)
scaler = AutoScaler(mode='0-max', approx_ticks=8)
scale = scaler.make_scale([num.min(times), num.max(times)])
if clevel:
if len(clevel) > 1:
kwargs['anot_int'] = num.min(num.diff(clevel))
else:
kwargs['anot_int'] = clevel[0]
kwargs['contour_int'] = kwargs['anot_int']
kwargs['L'] = '0/%g' % num.max(clevel)
kwargs['anot_int'] = kwargs.get('anot_int', scale[2] * 2.)
kwargs['contour_int'] = kwargs.get('contour_int', scale[2])
kwargs['unit'] = kwargs.get('unit', cbar_helper['time']['unit'])
kwargs['L'] = kwargs.get('L', '0/%g' % (num.max(times) + 1.))
kwargs['G'] = kwargs.get('G', 'n2/3c')
tmp_grd_file = 'tmpdata.grd'
self.xy_data_to_grid(points_xy[:, 0], points_xy[:, 1],
times, tmp_grd_file)
self.draw_contour(tmp_grd_file, **kwargs)
clear_temp(gridfiles=[tmp_grd_file], cpts=[])
[docs] def draw_points(self, lats, lons, symbol='point', size=None, **kwargs):
'''
Draw points at given locations.
:param lats:
Point latitude coordinates in [deg].
:type lats:
iterable
:param lons:
Point longitude coordinates in [deg].
:type lons:
iterable
:param symbol:
Define symbol of the points (``'star', 'circle', 'point',
'triangle'``) - default is ``point``.
:type symbol:
optional, str
:param size:
Size of the points in [points].
:type size:
optional, float
'''
sym_to_gmt = dict(
star='a',
circle='c',
point='p',
triangle='t')
lats = num.atleast_1d(lats)
lons = num.atleast_1d(lons)
if lats.shape[0] != lons.shape[0]:
raise IndexError('lats and lons do not have the same shape!')
if size is None:
size = self._fontsize / 3.
kwargs['S'] = kwargs.get('S', sym_to_gmt[symbol] + '%gp' % size)
kwargs['G'] = kwargs.get('G', gmtpy.color('scarletred2'))
kwargs['W'] = kwargs.get('W', '2p,%s' % self._fontcolor)
self.gmt.psxy(
in_columns=[lons, lats],
*self.jxyr,
**kwargs)
[docs] def draw_nucleation_point(self, **kwargs):
'''
Plot the nucleation point onto the map.
'''
nlat, nlon = xy_to_latlon(
self.source, self.source.nucleation_x, self.source.nucleation_y)
self.draw_points(nlat, nlon, **kwargs)
[docs] def draw_dislocation(self, time=None, component='', **kwargs):
'''
Draw dislocation onto map at any time.
For a given time (if ``time`` is ``None``, ``tmax`` is used) and given
component the patchwise dislocation is plotted onto the map.
:param time:
Time after origin, for which dislocation is computed. If ``None``,
``tmax`` is taken.
:type time:
optional, float
:param component:
Dislocation component, which shall be plotted: ``x`` along strike,
``y`` along updip, ``z`` normal. If ``None``, the
length of the dislocation vector is plotted.
:type component: optional, str
'''
disl = self.source.get_slip(time=time)
if component:
data = disl[:, c2disl[component]]
else:
data = num.linalg.norm(disl, axis=1)
kwargs['label'] = kwargs.get(
'label', 'u%s [m]' % (component))
self.draw_dynamic_data(data, **kwargs)
[docs] def draw_dislocation_contour(
self, time=None, component=None, clevel=[], **kwargs):
'''
Draw dislocation contour onto map at any time.
For a given time (if ``time`` is ``None``, ``tmax`` is used) and given
component the patchwise dislocation is plotted as contour onto the map.
:param time:
Time after origin, for which dislocation is computed. If ``None``,
``tmax`` is taken.
:type time:
optional, float
:param component:
Dislocation component, which shall be plotted: ``x`` along strike,
``y`` along updip, ``z`` normal``. If ``None``, the length of the
dislocation vector is plotted.
:type component: optional, str
:param clevel:
Times, for which contour lines are drawn.
:type clevel:
optional, list of float
'''
disl = self.source.get_slip(time=time)
if component:
data = disl[:, c2disl[component]]
else:
data = num.linalg.norm(disl, axis=1)
scaler = AutoScaler(mode='min-max', approx_ticks=7)
scale = scaler.make_scale([num.min(data), num.max(data)])
if clevel:
if len(clevel) > 1:
kwargs['anot_int'] = num.min(num.diff(clevel))
else:
kwargs['anot_int'] = clevel[0]
kwargs['contour_int'] = kwargs['anot_int']
kwargs['L'] = '%g/%g' % (
num.min(clevel) - kwargs['contour_int'],
num.max(clevel) + kwargs['contour_int'])
else:
kwargs['anot_int'] = kwargs.get('anot_int', scale[2] * 2.)
kwargs['contour_int'] = kwargs.get('contour_int', scale[2])
kwargs['L'] = kwargs.get('L', '%g/%g' % (
num.min(data) - 1., num.max(data) + 1.))
kwargs['unit'] = kwargs.get('unit', ' m')
kwargs['G'] = kwargs.get('G', 'n2/3c')
tmp_grd_file = 'tmpdata.grd'
self.patch_data_to_grid(data, tmp_grd_file)
self.draw_contour(tmp_grd_file, **kwargs)
clear_temp(gridfiles=[tmp_grd_file], cpts=[])
[docs] def draw_dislocation_vector(self, time=None, **kwargs):
'''
Draw vector arrows onto map indicating direction of dislocation.
For a given time (if ``time`` is ``None``, ``tmax`` is used)
and given component the dislocation is plotted as vectors onto the map.
:param time:
Time after origin [s], for which dislocation is computed. If
``None``, ``tmax`` is used.
:type time:
optional, float
'''
disl = self.source.get_slip(time=time)
p_strike = self.source.get_patch_attribute('strike') * d2r
p_dip = self.source.get_patch_attribute('dip') * d2r
disl_dh = num.cos(p_dip) * disl[:, 1]
disl_n = num.cos(p_strike) * disl[:, 0] + num.sin(p_strike) * disl_dh
disl_e = num.sin(p_strike) * disl[:, 0] - num.cos(p_strike) * disl_dh
tmp_grd_files = ['tmpdata_%s.grd' % c for c in ('n', 'e')]
self.patch_data_to_grid(disl_n, tmp_grd_files[0])
self.patch_data_to_grid(disl_e, tmp_grd_files[1])
self.draw_vector(
tmp_grd_files[1], tmp_grd_files[0],
**kwargs)
clear_temp(gridfiles=tmp_grd_files, cpts=[])
[docs] def draw_top_edge(self, **kwargs):
'''
Indicate rupture top edge on map.
'''
outline = self.source.outline(cs='latlondepth')
top_edge = outline[:2, :]
kwargs = kwargs or {}
kwargs['W'] = kwargs.get(
'W', '%gp,%s' % (self._fontsize / 10., gmtpy.color('orange3')))
self.gmt.psxy(
in_columns=[top_edge[:, 1], top_edge[:, 0]],
*self.jxyr,
**kwargs)
[docs]class RuptureView(Object):
'''
Plot of attributes and results of the
:py:class:`~pyrocko.gf.seismosizer.PseudoDynamicRupture`.
'''
_patches_to_lw = RuptureMap._patches_to_lw
def __init__(self, source=None, figsize=None, fontsize=None):
self._source = source
self._axes = None
self.figsize = figsize or mpl_papersize('halfletter', 'landscape')
self.fontsize = fontsize or 10
mpl_init(fontsize=self.fontsize)
self._fig = None
self._axes = None
self._is_1d = False
@property
def source(self):
'''
PseudoDynamicRupture whose attributes are plotted.
Note, that source.patches attribute needs to be calculated for
:type source: :py:class:`~pyrocko.gf.seismosizer.PseudoDynamicRupture`.
'''
if self._source is None:
raise SourceError('No source given. Please define it!')
if not isinstance(self._source, PseudoDynamicRupture):
raise SourceError('This function is only capable for a source of'
' type: %s' % type(PseudoDynamicRupture()))
if not self._source.patches:
raise TypeError('No source patches are defined. Please run'
'\"discretize_patches\" on your source')
return self._source
@source.setter
def source(self, source):
self._source = source
def _setup(
self,
title='',
xlabel='',
ylabel='',
aspect=1.,
spatial_plot=True,
**kwargs):
if self._fig is not None and self._axes is not None:
return
self._fig = plt.figure(figsize=self.figsize)
self._axes = self._fig.add_subplot(1, 1, 1, aspect=aspect)
ax = self._axes
if ax is not None:
ax.set_title(title)
ax.grid(alpha=.3)
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
if spatial_plot:
ax.xaxis.set_major_formatter(FuncFormatter(km_formatter))
ax.yaxis.set_major_formatter(FuncFormatter(km_formatter))
def _clear_all(self):
plt.cla()
plt.clf()
plt.close()
self._fig, self._axes = None, None
def _draw_scatter(self, x, y, *args, **kwargs):
default_kwargs = dict(
linewidth=0,
marker='o',
markerfacecolor=mpl_color('skyblue2'),
markersize=6.,
markeredgecolor=mpl_color('skyblue3'))
default_kwargs.update(kwargs)
if self._axes is not None:
self._axes.plot(x, y, *args, **default_kwargs)
def _draw_image(self, length, width, data, *args, **kwargs):
if self._axes is not None:
if 'extent' not in kwargs:
kwargs['extent'] = [
num.min(length), num.max(length),
num.max(width), num.min(width)]
im = self._axes.imshow(
data,
*args,
interpolation='none',
vmin=kwargs.get('clim', [None])[0],
vmax=kwargs.get('clim', [None, None])[1],
**kwargs)
del kwargs['extent']
if 'aspect' in kwargs:
del kwargs['aspect']
if 'clim' in kwargs:
del kwargs['clim']
if 'cmap' in kwargs:
del kwargs['cmap']
plt.colorbar(
im, *args, shrink=0.9, pad=0.03, aspect=15., **kwargs)
def _draw_contour(self, x, y, data, clevel=None, unit='', *args, **kwargs):
setup_kwargs = dict(
xlabel=kwargs.pop('xlabel', 'along strike [km]'),
ylabel=kwargs.pop('ylabel', 'down dip [km]'),
title=kwargs.pop('title', ''),
aspect=kwargs.pop('aspect', 1))
self._setup(**setup_kwargs)
if self._axes is not None:
if clevel is None:
scaler = AutoScaler(mode='min-max')
scale = scaler.make_scale([data.min(), data.max()])
clevel = num.arange(scale[0], scale[1] + scale[2], scale[2])
if not isinstance(clevel, num.ndarray):
clevel = num.array([clevel])
clevel = clevel[clevel < data.max()]
cont = self._axes.contour(
x, y, data, clevel, *args,
linewidths=1.5, **kwargs)
plt.setp(cont.collections, path_effects=[
patheffects.withStroke(linewidth=2.0, foreground="beige"),
patheffects.Normal()])
clabels = self._axes.clabel(
cont, clevel[::2], *args,
inline=1, fmt='%g' + '%s' % unit,
inline_spacing=15, rightside_up=True, use_clabeltext=True,
**kwargs)
plt.setp(clabels, path_effects=[
patheffects.withStroke(linewidth=1.25, foreground="beige"),
patheffects.Normal()])
[docs] def draw_points(self, length, width, *args, **kwargs):
'''
Draw a point onto the figure.
Args and kwargs can be defined according to
:py:func:`matplotlib.pyplot.scatter`.
:param length:
Point(s) coordinate on the rupture plane along strike relative to
the anchor point in [m].
:type length:
float, :py:class:`~numpy.ndarray`
:param width:
Point(s) coordinate on the rupture plane along downdip relative to
the anchor point in [m].
:type width:
float, :py:class:`~numpy.ndarray`
'''
if self._axes is not None:
kwargs['color'] = kwargs.get('color', mpl_color('scarletred2'))
kwargs['s'] = kwargs.get('s', 100.)
self._axes.scatter(length, width, *args, **kwargs)
[docs] def draw_dynamic_data(self, data, **kwargs):
'''
Draw an image of any data gridded on the patches e.g dislocation.
:param data:
Patchwise grid data.
:type data:
:py:class:`~numpy.ndarray`
'''
plot_data = data
anchor_x, anchor_y = map_anchor[self.source.anchor]
length, width = xy_to_lw(
self.source, num.array([-1., 1.]), num.array([-1., 1.]))
data = plot_data.reshape(self.source.nx, self.source.ny).T
kwargs['cmap'] = kwargs.get('cmap', 'afmhot_r')
setup_kwargs = dict(
xlabel='along strike [km]',
ylabel='down dip [km]',
title='',
aspect=1)
setup_kwargs.update(kwargs)
kwargs = {k: v for k, v in kwargs.items() if k not in
('xlabel', 'ylabel', 'title')}
self._setup(**setup_kwargs)
self._draw_image(length=length, width=width, data=data, **kwargs)
[docs] def draw_patch_parameter(self, attribute, **kwargs):
'''
Draw an image of a chosen patch attribute e.g traction.
:param attribute:
Patch attribute, which is plotted. All patch attributes can be
taken (see doc of :py:class:`~pyrocko.modelling.okada.OkadaSource`)
and also ``'traction', 'tx', 'ty', 'tz'`` to display the length
or the single components of the traction vector.
:type attribute:
str
'''
a = attribute
source = self.source
if a == 'traction':
data = num.linalg.norm(source.get_tractions(), axis=1)
elif a == 'tx':
data = source.get_tractions()[:, 0]
elif a == 'ty':
data = source.get_tractions()[:, 1]
elif a == 'tz':
data = source.get_tractions()[:, 2]
else:
data = source.get_patch_attribute(attribute)
factor = 1. if 'label' in kwargs else cbar_helper[a]['factor']
data *= factor
kwargs['label'] = kwargs.get(
'label',
'%s [%s]' % (a, cbar_helper[a]['unit']))
return self.draw_dynamic_data(data=data, **kwargs)
[docs] def draw_time_contour(self, store, clevel=[], **kwargs):
'''
Draw high resolution contours of the rupture front propgation time
:param store:
Greens function store, which is used for time calculation.
:type store:
:py:class:`~pyrocko.gf.store.Store`
:param clevel:
Levels of the contour lines. If no levels are given, they are
automatically computed based on ``tmin`` and ``tmax``.
:type clevel:
optional, list
'''
source = self.source
default_kwargs = dict(
colors='#474747ff'
)
default_kwargs.update(kwargs)
_, _, _, _, points_xy = source._discretize_points(store, cs='xyz')
_, _, _, _, time_interpolator, _ = source.get_vr_time_interpolators(
store)
times = time_interpolator.values
scaler = AutoScaler(mode='min-max', approx_ticks=8)
scale = scaler.make_scale([times.min(), times.max()])
if len(clevel) == 0:
clevel = num.arange(scale[0] + scale[2], scale[1], scale[2])
points_x = time_interpolator.grid[0]
points_y = time_interpolator.grid[1]
self._draw_contour(points_x, points_y, data=times.T,
clevel=clevel, unit='s', **default_kwargs)
[docs] def draw_nucleation_point(self, **kwargs):
'''
Draw the nucleation point onto the map.
'''
nuc_x, nuc_y = self.source.nucleation_x, self.source.nucleation_y
length, width = xy_to_lw(self.source, nuc_x, nuc_y)
self.draw_points(length, width, marker='o', **kwargs)
[docs] def draw_dislocation(self, time=None, component='', **kwargs):
'''
Draw dislocation onto map at any time.
For a given time (if ``time`` is ``None``, ``tmax`` is used)
and given component the patchwise dislocation is plotted onto the map.
:param time:
Time after origin [s], for which dislocation is computed.
If ``None``, ``tmax`` is taken.
:type time:
optional, float
:param component:
Dislocation component, which shall be plotted: ``x``
along strike, ``y`` along updip, ``z`` normal. If ``None``, the
length of the dislocation vector is plotted
:type component: optional, str
'''
disl = self.source.get_slip(time=time)
if component:
data = disl[:, c2disl[component]]
else:
data = num.linalg.norm(disl, axis=1)
kwargs['label'] = kwargs.get(
'label', 'u%s [m]' % (component))
self.draw_dynamic_data(data, **kwargs)
[docs] def draw_dislocation_contour(
self, time=None, component=None, clevel=[], **kwargs):
'''
Draw dislocation contour onto map at any time.
For a given time (if time is ``None``, ``tmax`` is used) and given
component the patchwise dislocation is plotted as contour onto the map.
:param time:
Time after origin, for which dislocation is computed. If
``None``, ``tmax`` is taken.
:type time:
optional, float
:param component:
Dislocation component, which shall be plotted. ``x``
along strike, ``y`` along updip, ``z`` - normal. If ``None``
is given, the length of the dislocation vector is plotted.
:type component:
optional, str
'''
disl = self.source.get_slip(time=time)
if component:
data = disl[:, c2disl[component]]
else:
data = num.linalg.norm(disl, axis=1)
data = data.reshape(self.source.ny, self.source.nx, order='F')
scaler = AutoScaler(mode='min-max', approx_ticks=7)
scale = scaler.make_scale([data.min(), data.max()])
if len(clevel) == 0:
clevel = num.arange(scale[0] + scale[2], scale[1], scale[2])
anchor_x, anchor_y = map_anchor[self.source.anchor]
length, width = self._patches_to_lw()
length, width = length[1:-1], width[1:-1]
kwargs['colors'] = kwargs.get('colors', '#474747ff')
self._setup(**kwargs)
self._draw_contour(
length, width, data=data, clevel=clevel, unit='m', **kwargs)
[docs] def draw_source_dynamics(
self, variable, store, deltat=None, *args, **kwargs):
'''
Display dynamic source parameter.
Fast inspection possibility for the cumulative moment and the source
time function approximation (assuming equal paths between different
patches and observation point - valid for an observation point in the
far field perpendicular to the source strike), so the cumulative moment
rate function.
:param variable:#
Dynamic parameter, which shall be plotted. Choose
between 'moment_rate' ('stf') or 'cumulative_moment' ('moment')
:type variable:
str
:param store:
Greens function store, whose store.config.deltat defines
the time increment between two parameter snapshots. If ``store`` is
not given, the time increment is defined is taken from ``deltat``.
:type store:
:py:class:`~pyrocko.gf.store.Store`
:param deltat:
Time increment between two parameter snapshots. If not
given, store.config.deltat is used to define ``deltat``.
:type deltat:
optional, float
'''
v = variable
data, times = self.source.get_moment_rate(store=store, deltat=deltat)
deltat = num.concatenate([(num.diff(times)[0], ), num.diff(times)])
if v in ('moment_rate', 'stf'):
name, unit = 'dM/dt', 'Nm/s'
elif v in ('cumulative_moment', 'moment'):
data = num.cumsum(data * deltat)
name, unit = 'M', 'Nm'
else:
raise ValueError('No dynamic data for given variable %s found' % v)
self._setup(xlabel='time [s]',
ylabel='%s / %.2g %s' % (name, data.max(), unit),
aspect='auto',
spatial_plot=False)
self._draw_scatter(times, data/num.max(data), *args, **kwargs)
self._is_1d = True
[docs] def draw_patch_dynamics(
self, variable, nx, ny, store=None, deltat=None, *args, **kwargs):
'''
Display dynamic boundary element / patch parameter.
Fast inspection possibility for different dynamic parameter for a
single patch / boundary element. The chosen parameter is plotted for
the chosen patch.
:param variable:
Dynamic parameter, which shall be plotted. Choose
between 'moment_rate' ('stf') or 'cumulative_moment' ('moment').
:type variable:
str
:param nx:
Patch index along strike (range: 0:source.nx - 1).
:type nx:
int
:param nx:
Patch index downdip (range: 0:source.ny - 1).
:type nx:
int
:param store:
Greens function store, whose store.config.deltat defines
the time increment between two parameter snapshots. If ``store`` is
not given, the time increment is defined is taken from ``deltat``.
:type store:
optional, :py:class:`~pyrocko.gf.store.Store`
:param deltat:
Time increment between two parameter snapshots. If not given,
store.config.deltat is used to define ``deltat``.
:type deltat:
optional, float
'''
v = variable
source = self.source
idx = nx * source.ny + nx
m = re.match(r'dislocation_([xyz])', v)
if v in ('moment_rate', 'cumulative_moment', 'moment', 'stf'):
data, times = source.get_moment_rate_patches(
store=store, deltat=deltat)
elif 'dislocation' in v or 'slip_rate' == v:
ddisloc, times = source.get_delta_slip(store=store, deltat=deltat)
if v in ('moment_rate', 'stf'):
data, times = source.get_moment_rate_patches(
store=store, deltat=deltat)
name, unit = 'dM/dt', 'Nm/s'
elif v in ('cumulative_moment', 'moment'):
data, times = source.get_moment_rate_patches(
store=store, deltat=deltat)
data = num.cumsum(data, axis=1)
name, unit = 'M', 'Nm'
elif v == 'slip_rate':
data, times = source.get_slip_rate(store=store, deltat=deltat)
name, unit = 'du/dt', 'm/s'
elif v == 'dislocation':
data, times = source.get_delta_slip(store=store, deltat=deltat)
data = num.linalg.norm(num.cumsum(data, axis=2), axis=1)
name, unit = 'du', 'm'
elif m:
data, times = source.get_delta_slip(store=store, deltat=deltat)
data = num.cumsum(data, axis=2)[:, c2disl[m.group(1)], :]
name, unit = 'du%s' % m.group(1), 'm'
else:
raise ValueError('No dynamic data for given variable %s found' % v)
self._setup(xlabel='time [s]',
ylabel='%s / %.2g %s' % (name, num.max(data), unit),
aspect='auto',
spatial_plot=False)
self._draw_scatter(times, data[idx, :]/num.max(data),
*args, **kwargs)
self._is_1d = True
def finalize(self):
if self._is_1d:
return
length, width = xy_to_lw(
self.source, num.array([-1., 1.]), num.array([1., -1.]))
self._axes.set_xlim(length)
self._axes.set_ylim(width)
def gcf(self):
self.finalize()
return self._fig
[docs] def save(self, filename, dpi=None):
'''
Save plot to file.
:param filename:
Filename and path, where the plot is stored.
:type filename:
str
:param dpi:
Resolution of the output plot in [dpi].
:type dpi:
int
'''
self.finalize()
try:
self._fig.savefig(fname=filename, dpi=dpi, bbox_inches='tight')
except TypeError:
self._fig.savefig(filename=filename, dpi=dpi, bbox_inches='tight')
self._clear_all()
[docs] def show_plot(self):
'''
Show plot.
'''
self.finalize()
plt.show()
self._clear_all()
[docs]def render_movie(fn_path, output_path, framerate=20):
'''
Generate a mp4 movie based on given png files using
`ffmpeg <https://ffmpeg.org>`_.
Render a movie based on a set of given .png files in fn_path. All files
must have a filename specified by ``fn_path`` (e.g. giving ``fn_path`` with
``/temp/f%04.png`` a valid png filename would be ``/temp/f0001.png``). The
files must have a numbering, indicating their order in the movie.
:param fn_path:
Path and fileformat specification of the input .png files.
:type fn_path:
str
:param output_path:
Path and filename of the output ``.mp4`` movie file.
:type output_path:
str
:param deltat:
Time between individual frames (``1 / framerate``) in [s].
:type deltat:
optional, float
'''
try:
check_call(['ffmpeg', '-loglevel', 'panic'])
except CalledProcessError:
pass
except (TypeError, IOError):
logger.warning(
'Package ffmpeg needed for movie rendering. Please install it '
'(e.g. on linux distr. via sudo apt-get ffmpeg) and retry.')
return False
try:
check_call([
'ffmpeg', '-loglevel', 'info', '-y',
'-framerate', '%g' % framerate,
'-i', fn_path,
'-vcodec', 'libx264',
'-preset', 'medium',
'-tune', 'animation',
'-pix_fmt', 'yuv420p',
'-movflags', '+faststart',
'-filter:v', "crop='floor(in_w/2)*2:floor(in_h/2)*2'",
'-crf', '15',
output_path])
return True
except CalledProcessError as e:
logger.warning(e)
return False
def render_gif(fn, output_path, loops=-1):
'''
Generate a gif based on a given mp4 using ffmpeg.
Render a gif based on a given .mp4 movie file in ``fn`` path.
:param fn:
Path and file name of the input .mp4 file.
:type fn:
str
:param output_path:
Path and filename of the output animated gif file.
:type output_path:
str
:param loops:
Number of gif repeats (loops). ``-1`` is not repetition, ``0``
infinite.
:type loops:
optional, integer
'''
try:
check_call(['ffmpeg', '-loglevel', 'panic'])
except CalledProcessError:
pass
except (TypeError, IOError):
logger.warning(
'Package ffmpeg needed for movie rendering. Please install it '
'(e.g. on linux distr. via sudo apt-get ffmpeg.) and retry.')
return False
try:
check_call([
'ffmpeg', '-hide_banner', '-loglevel', 'panic', '-y', '-i',
fn,
'-filter_complex', 'palettegen[v1];[0:v][v1]paletteuse',
'-loop', '%d' % loops,
output_path])
return True
except CalledProcessError as e:
logger.warning(e)
return False
[docs]def rupture_movie(
source,
store,
variable='dislocation',
draw_time_contours=False,
fn_path='.',
prefix='',
plot_type='map',
deltat=None,
framerate=None,
store_images=False,
render_as_gif=False,
gif_loops=-1,
**kwargs):
'''
Generate a movie based on a given source for dynamic parameter.
Create a MPEG-4 movie or gif of one of the following dynamic parameters
(``dislocation``, ``dislocation_x`` (along strike), ``dislocation_y``
(along updip), ``dislocation_z`` (normal), ``slip_rate``, ``moment_rate``).
If desired, the single snap shots can be stored as images as well.
``kwargs`` have to be given according to the chosen ``plot_type``.
:param source:
Pseudo dynamic rupture, for which the movie is produced.
:type source:
:py:class:`~pyrocko.gf.seismosizer.PseudoDynamicRupture`
:param store:
Greens function store, which is used for time calculation. If
``deltat`` is not given, it is taken from the store.config.deltat
:type store:
:py:class:`~pyrocko.gf.store.Store`
:param variable:
Dynamic parameter, which shall be plotted. Choose between
``dislocation``, ``dislocation_x`` (along strike), ``dislocation_y``
(along updip), ``dislocation_z`` (normal), ``slip_rate`` and
``moment_rate``, default ``dislocation``.
:type variable:
optional, str
:param draw_time_contours:
If ``True``, corresponding isochrones are drawn on the each plots.
:type draw_time_contours:
optional, bool
:param fn_path:
Absolut or relative path, where movie (and optional images) are stored.
:type fn_path:
optional, str
:param prefix:
File prefix used for the movie (and optional image) files.
:type prefix:
optional, str
:param plot_type:
Choice of plot type: ``map``, ``view`` (map plot using
:py:class:`~pyrocko.plot.dynamic_rupture.RuptureMap`
or plane view using
:py:class:`~pyrocko.plot.dynamic_rupture.RuptureView`).
:type plot_type:
optional, str
:param deltat:
Time between parameter snapshots. If not given, store.config.deltat is
used to define ``deltat``.
:type deltat:
optional, float
:param store_images:
Choice to store the single .png parameter snapshots in ``fn_path`` or
not.
:type store_images:
optional, bool
:param render_as_gif:
If ``True``, the movie is converted into a gif. If ``False``, the movie
is returned as mp4.
:type render_as_gif:
optional, bool
:param gif_loops:
If ``render_as_gif`` is ``True``, a gif with ``gif_loops`` number of
loops (repetitions) is returned. ``-1`` is no repetition, ``0``
infinite.
:type gif_loops:
optional, integer
'''
v = variable
assert plot_type in ('map', 'view')
if not source.patches:
source.discretize_patches(store, interpolation='multilinear')
if source.coef_mat is None:
source.calc_coef_mat()
prefix = prefix or v
deltat = deltat or store.config.deltat
framerate = max(framerate or int(1./deltat), 1)
if v == 'moment_rate':
data, times = source.get_moment_rate_patches(deltat=deltat)
name, unit = 'dM/dt', 'Nm/s'
elif 'dislocation' in v or 'slip_rate' == v:
ddisloc, times = source.get_delta_slip(deltat=deltat)
else:
raise ValueError('No dynamic data for given variable %s found' % v)
deltat = times[1] - times[0]
m = re.match(r'dislocation_([xyz])', v)
if m:
data = num.cumsum(ddisloc, axis=1)[:, :, c2disl[m.group(1)]]
name, unit = 'du%s' % m.group(1), 'm'
elif v == 'dislocation':
data = num.linalg.norm(num.cumsum(ddisloc, axis=1), axis=2)
name, unit = 'du', 'm'
elif v == 'slip_rate':
data = num.linalg.norm(ddisloc, axis=2) / deltat
name, unit = 'du/dt', 'm/s'
if plot_type == 'map':
plt_base = RuptureMap
elif plot_type == 'view':
plt_base = RuptureView
else:
raise AttributeError('invalid type: %s' % plot_type)
attrs_base = get_kwargs(plt_base)
kwargs_base = dict([k for k in kwargs.items() if k[0] in attrs_base])
kwargs_plt = dict([k for k in kwargs.items() if k[0] not in kwargs_base])
if 'clim' in kwargs_plt:
data = num.clip(data, kwargs_plt['clim'][0], kwargs_plt['clim'][1])
else:
kwargs_plt['clim'] = [num.min(data), num.max(data)]
if 'label' not in kwargs_plt:
vmax = num.max(num.abs(kwargs_plt['clim']))
data /= vmax
kwargs_plt['label'] = '%s / %.2g %s' % (name, vmax, unit)
kwargs_plt['clim'] = [i / vmax for i in kwargs_plt['clim']]
with tempfile.TemporaryDirectory(suffix='pyrocko') as temp_path:
fns_temp = [op.join(temp_path, 'f%09d.png' % (it + 1))
for it, _ in enumerate(times)]
fn_temp_path = op.join(temp_path, 'f%09d.png')
for it, (t, ft) in enumerate(zip(times, fns_temp)):
plot_data = data[:, it]
plt = plt_base(source=source, **kwargs_base)
plt.draw_dynamic_data(plot_data, **kwargs_plt)
plt.draw_nucleation_point()
if draw_time_contours:
plt.draw_time_contour(store, clevel=[t])
plt.save(ft)
fn_mp4 = op.join(temp_path, 'movie.mp4')
return_code = render_movie(
fn_temp_path,
output_path=fn_mp4,
framerate=framerate)
if render_as_gif and return_code:
render_gif(fn=fn_mp4, output_path=op.join(
fn_path, '%s_%s_gif.gif' % (prefix, plot_type)),
loops=gif_loops)
elif return_code:
shutil.move(fn_mp4, op.join(
fn_path, '%s_%s_movie.mp4' % (prefix, plot_type)))
else:
logger.error('ffmpeg failed. Exit')
if store_images:
fns = [op.join(fn_path, '%s_%s_%g.png' % (prefix, plot_type, t))
for t in times]
for ft, f in zip(fns_temp, fns):
shutil.move(ft, f)
__all__ = [
'make_colormap',
'clear_temp',
'xy_to_latlon',
'xy_to_lw',
'SourceError',
'RuptureMap',
'RuptureView',
'rupture_movie',
'render_movie']