# https://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
import numpy as num
import logging
from pyrocko import moment_tensor as mt
import pyrocko.guts as guts
from pyrocko.guts import Float, String, Timestamp, Int
from pyrocko.model import Location
from pyrocko.modelling import okada_ext
from pyrocko.util import get_threadpool_limits
guts_prefix = 'modelling'
logger = logging.getLogger(__name__)
d2r = num.pi/180.
r2d = 180./num.pi
km = 1e3
[docs]class AnalyticalSource(Location):
name = String.T(
optional=True,
default='')
time = Timestamp.T(
default=0.,
help='source origin time',
optional=True)
vr = Float.T(
default=0.,
help='Rupture velocity',
optional=True)
@property
def northing(self):
return self.north_shift
@property
def easting(self):
return self.east_shift
clone = guts.clone
[docs]class AnalyticalRectangularSource(AnalyticalSource):
'''
Rectangular analytical source model
'''
strike = Float.T(
default=0.0,
help='strike direction in [deg], measured clockwise from north')
dip = Float.T(
default=90.0,
help='dip angle in [deg], measured downward from horizontal')
rake = Float.T(
default=0.0,
help='rake angle in [deg], '
'measured counter-clockwise from right-horizontal '
'in on-plane view')
al1 = Float.T(
default=0.,
help='Distance "left" side to source point [m]')
al2 = Float.T(
default=0.,
help='Distance "right" side to source point [m]')
aw1 = Float.T(
default=0.,
help='Distance "lower" side to source point [m]')
aw2 = Float.T(
default=0.,
help='Distance "upper" side to source point [m]')
slip = Float.T(
default=0.,
help='Slip on the rectangular source area [m]',
optional=True)
@property
def length(self):
return abs(-self.al1 + self.al2)
@property
def width(self):
return abs(-self.aw1 + self.aw2)
@property
def area(self):
return self.width * self.length
[docs]class OkadaSource(AnalyticalRectangularSource):
'''
Rectangular Okada source model
'''
opening = Float.T(
default=0.,
help='Opening of the plane in [m]',
optional=True)
poisson = Float.T(
default=0.25,
help='Poisson\'s ratio, default 0.25',
optional=True)
shearmod = Float.T(
default=32e9,
help='Shear modulus along the plane [Pa]',
optional=True)
@property
def lamb(self):
'''
Calculation of first Lame's parameter
According to Mueller (2007), the first Lame parameter lambda can be
determined from the formulation for the poisson ration :math:`\\nu`:
:math:`\\nu = \\frac{\\lambda}{2(\\lambda + \\mu)}`
with the shear modulus :math:`\\mu`
'''
return (2. * self.poisson * self.shearmod) / (1. - 2*self.poisson)
@property
def seismic_moment(self):
'''
Scalar Seismic moment
Code copied from Kite
Disregarding the opening (as for now)
We assume a shear modulus of :math:`\\mu = 36 \\mathrm{GPa}`
and :math:`M_0 = mu A D`
.. important ::
We assume a perfect elastic solid with :math:`K=\\frac{5}{3}\\mu`
Through :math:`\\mu = \\frac{3K(1-2\\nu)}{2(1+\\nu)}` this leads to
:math:`\\mu = \\frac{8(1+\\nu)}{1-2\\nu}`
:return: Seismic moment release
:rtype: float
'''
if self.shearmod:
mu = self.shearmod
elif self.poisson:
self.shearmod = (8. * (1. + self.poisson)) / (1. - 2*self.poisson)
mu = self.shearmod
else:
raise ValueError(
'Shear modulus or poisson ratio needed for moment calculation')
disl = 0.
if self.slip:
disl = (disl**2 + self.slip**2)**.5
if self.opening:
disl = (disl**2 + self.opening**2)**.5
return mu * self.area * disl
@property
def moment_magnitude(self):
''' Moment magnitude from Seismic moment
We assume :math:`M_\\mathrm{w} = {\\frac{2}{3}}\\log_{10}(M_0) - 10.7`
:returns: Moment magnitude
:rtype: float
'''
return mt.moment_to_magnitude(self.seismic_moment)
[docs] def source_patch(self):
''' Build source information array for okada_ext.okada input
:return: array of the source data as input for okada_ext.okada
:rtype: :py:class:`numpy.ndarray`, ``(1, 9)``
'''
return num.array([
self.northing,
self.easting,
self.depth,
self.strike,
self.dip,
self.al1,
self.al2,
self.aw1,
self.aw2])
[docs] def source_disloc(self):
''' Build source dislocation for okada_ext.okada input
:return: array of the source dislocation data as input for
okada_ext.okada
:rtype: :py:class:`numpy.ndarray`, ``(1, 3)``
'''
return num.array([
num.cos(self.rake * d2r) * self.slip,
num.sin(self.rake * d2r) * self.slip,
self.opening])
[docs] def discretize(self, nlength, nwidth, *args, **kwargs):
''' Discretize the given fault by ``nlength * nwidth`` fault patches
Discretizing the fault into several sub faults. ``nlength`` is
number of points in strike direction, ``nwidth`` in down dip direction
along the fault. Fault orientation, slip and elastic parameters are
kept.
:param nlength: Number of discrete points in faults strike direction
:type nlength: int
:param nwidth: Number of discrete points in faults down-dip direction
:type nwidth: int
:return: Discrete fault patches
:rtype: list of :py:class:`pyrocko.modelling.okada.OkadaSource` objects
'''
assert nlength > 0
assert nwidth > 0
il = num.repeat(num.arange(nlength), nwidth)
iw = num.tile(num.arange(nwidth), nlength)
patch_length = self.length / nlength
patch_width = self.width / nwidth
al1 = -patch_length / 2.
al2 = patch_length / 2.
aw1 = -patch_width / 2.
aw2 = patch_width / 2.
source_points = num.zeros((nlength * nwidth, 3))
source_points[:, 0] = il * patch_length + patch_length / 2.
source_points[:, 1] = iw * patch_width + patch_width / 2.
source_points[:, 0] += self.al1
source_points[:, 1] -= self.aw2
rotmat = num.asarray(
mt.euler_to_matrix(self.dip*d2r, self.strike*d2r, 0.))
source_points_rot = num.dot(rotmat.T, source_points.T).T
source_points_rot[:, 0] += self.northing
source_points_rot[:, 1] += self.easting
source_points_rot[:, 2] += self.depth
kwargs = {
prop: getattr(self, prop) for prop in self.T.propnames
if prop not in [
'north_shift', 'east_shift', 'depth',
'al1', 'al2', 'aw1', 'aw2']}
return (
[OkadaPatch(
parent=self,
ix=src_point[0],
iy=src_point[1],
north_shift=coord[0],
east_shift=coord[1],
depth=coord[2],
al1=al1, al2=al2, aw1=aw1, aw2=aw2, **kwargs)
for src_point, coord in zip(source_points, source_points_rot)],
source_points)
class OkadaPatch(OkadaSource):
ix = Int.T(help='Relative index of the patch in x')
iy = Int.T(help='Relative index of the patch in y')
def __init__(self, *args, parent=None, **kwargs):
super().__init__(*args, **kwargs)
self.parent = parent
[docs]class DislocationInverter(object):
'''
Toolbox for Boundary Element Method (BEM) and dislocation inversion based
on okada_ext.okada
'''
[docs] @staticmethod
def get_coef_mat(source_patches_list, pure_shear=False,
rotate_sdn=True, nthreads=1):
'''
Build coefficient matrix for given source_patches
The BEM for a fault and the determination of the slip distribution from
the stress drop is based on the relation
:math:`stress = coefmat \\cdot displ`.
Here the coefficient matrix is build and filled based on the
okada_ext.okada displacements and partial displacement
differentiations.
:param source_patches_list: list of all OkadaSources, which shall be
used for BEM
:type source_patches_list: list of
:py:class:`pyrocko.modelling.okada.OkadaSource`
:param pure_shear: Shall only shear forces be taken into account,
or additionally include opening, default False.
:type pure_shear: optional, bool
:param rotate_sdn: Rotation towards strike, dip, normal, default True.
:type rotate_sdn: optional, bool
:param nthreads: Number of threads, default 1
:type nthreads: optional, int
:return: coefficient matrix for all sources
:rtype: :py:class:`numpy.ndarray`,
``(len(source_patches_list) * 3, len(source_patches_list) * 3)``
'''
source_patches = num.array([
src.source_patch() for src in source_patches_list])
receiver_coords = source_patches[:, :3].copy()
npoints = len(source_patches_list)
if pure_shear:
n_eq = 2
else:
n_eq = 3
coefmat = num.zeros((npoints * 3, npoints * 3))
lambda_mean = num.mean([src.lamb for src in source_patches_list])
mu_mean = num.mean([src.shearmod for src in source_patches_list])
unit_disl = 1.
disl_cases = {
'strikeslip': {
'slip': unit_disl,
'opening': 0.,
'rake': 0.},
'dipslip': {
'slip': unit_disl,
'opening': 0.,
'rake': 90.},
'tensileslip': {
'slip': 0.,
'opening': unit_disl,
'rake': 0.}
}
diag_ind = [0, 4, 8]
kron = num.zeros(9)
kron[diag_ind] = 1.
kron = kron[num.newaxis, num.newaxis, :]
for idisl, case_type in enumerate([
'strikeslip', 'dipslip', 'tensileslip'][:n_eq]):
case = disl_cases[case_type]
source_disl = num.array([
case['slip'] * num.cos(case['rake'] * d2r),
case['slip'] * num.sin(case['rake'] * d2r),
case['opening']])
results = okada_ext.okada(
source_patches,
num.tile(source_disl, npoints).reshape(-1, 3),
receiver_coords,
lambda_mean,
mu_mean,
nthreads=nthreads,
rotate_sdn=rotate_sdn,
stack_sources=False)
eps = 0.5 * (results[:, :, 3:] +
results[:, :, (3, 6, 9, 4, 7, 10, 5, 8, 11)])
dilatation = eps[:, :, diag_ind].sum(axis=-1)[:, :, num.newaxis]
stress_sdn = kron*lambda_mean*dilatation + 2.*mu_mean*eps
coefmat[:, idisl::3] = stress_sdn[:, :, (2, 5, 8)]\
.reshape(-1, npoints*3).T
if pure_shear:
coefmat[2::3, :] = 0.
return -coefmat / unit_disl
[docs] @staticmethod
def get_coef_mat_single(source_patches_list, pure_shear=False,
rotate_sdn=True, nthreads=1):
'''
Build coefficient matrix for given source_patches
The BEM for a fault and the determination of the slip distribution from
the stress drop is based on the relation
:math:`stress = coefmat \\cdot displ`.
Here the coefficient matrix is build and filled based on the
okada_ext.okada displacements and partial displacement
differentiations.
:param source_patches_list: list of all OkadaSources, which shall be
used for BEM
:type source_patches_list: list of
:py:class:`pyrocko.modelling.okada.OkadaSource`
:param pure_shear: Flag, if also opening mode shall be taken into
account (False) or the fault is described as pure shear (True).
:type pure_shear: optional, bool
:param rotate_sdn: Rotation towards strike, dip, normal, default True.
:type rotate_sdn: optional, bool
:param nthreads: Number of threads, default 1
:type nthreads: optional, int
:return: coefficient matrix for all sources
:rtype: :py:class:`numpy.ndarray`,
``(len(source_patches_list) * 3, len(source_patches_list) * 3)``
'''
source_patches = num.array([
src.source_patch() for src in source_patches_list])
receiver_coords = source_patches[:, :3].copy()
npoints = len(source_patches_list)
if pure_shear:
n_eq = 2
else:
n_eq = 3
coefmat = num.zeros((npoints * 3, npoints * 3))
lambda_mean = num.mean([src.lamb for src in source_patches_list])
mu_mean = num.mean([src.shearmod for src in source_patches_list])
unit_disl = 1.
disl_cases = {
'strikeslip': {
'slip': unit_disl,
'opening': 0.,
'rake': 0.},
'dipslip': {
'slip': unit_disl,
'opening': 0.,
'rake': 90.},
'tensileslip': {
'slip': 0.,
'opening': unit_disl,
'rake': 0.}
}
diag_ind = [0, 4, 8]
kron = num.zeros(9)
kron[diag_ind] = 1.
kron = kron[num.newaxis, :]
for idisl, case_type in enumerate([
'strikeslip', 'dipslip', 'tensileslip'][:n_eq]):
case = disl_cases[case_type]
source_disl = num.array([
case['slip'] * num.cos(case['rake'] * d2r),
case['slip'] * num.sin(case['rake'] * d2r),
case['opening']])
for isrc, source in enumerate(source_patches):
results = okada_ext.okada(
source[num.newaxis, :],
source_disl[num.newaxis, :],
receiver_coords,
lambda_mean,
mu_mean,
nthreads=nthreads,
rotate_sdn=rotate_sdn)
eps = \
0.5 * (
results[:, 3:] +
results[:, (3, 6, 9, 4, 7, 10, 5, 8, 11)])
dilatation = num.sum(eps[:, diag_ind], axis=1)[:, num.newaxis]
stress_sdn = kron * lambda_mean * dilatation+2. * mu_mean * eps
coefmat[:, isrc*3 + idisl] = stress_sdn[:, (2, 5, 8)].ravel()
if pure_shear:
coefmat[2::3, isrc * 3 + idisl] = 0.
return -coefmat / unit_disl
[docs] @staticmethod
def get_coef_mat_slow(source_patches_list, pure_shear=False,
rotate_sdn=True, nthreads=1):
'''
Build coefficient matrix for given source_patches (Slow version)
The BEM for a fault and the determination of the slip distribution from
the stress drop is based on the relation
:math:`stress = coefmat \\cdot displ`.
Here the coefficient matrix is build and filled based on the
okada_ext.okada displacements and partial displacement
differentiations.
:param source_patches_list: list of all OkadaSources, which shall be
used for BEM
:type source_patches_list: list of
:py:class:`pyrocko.modelling.okada.OkadaSource`
:param pure_shear: Flag, if also opening mode shall be taken into
account (False) or the fault is described as pure shear (True).
:type pure_shear: optional, bool
:param rotate_sdn: Rotation towards strike, dip, normal, default True.
:type rotate_sdn: optional, bool
:param nthreads: Number of threads, default 1
:type nthreads: optional, int
:return: coefficient matrix for all sources
:rtype: :py:class:`numpy.ndarray`,
``(source_patches_list.shape[0] * 3,
source_patches_list.shape[0] * 3(2))``
'''
source_patches = num.array([
src.source_patch() for src in source_patches_list])
receiver_coords = source_patches[:, :3].copy()
npoints = len(source_patches_list)
if pure_shear:
n_eq = 2
else:
n_eq = 3
coefmat = num.zeros((npoints * 3, npoints * 3))
def ned2sdn_rotmat(strike, dip):
rotmat = mt.euler_to_matrix(
(dip + 180.) * d2r, strike * d2r, 0.).A
return rotmat
lambda_mean = num.mean([src.lamb for src in source_patches_list])
shearmod_mean = num.mean([src.shearmod for src in source_patches_list])
unit_disl = 1.
disl_cases = {
'strikeslip': {
'slip': unit_disl,
'opening': 0.,
'rake': 0.},
'dipslip': {
'slip': unit_disl,
'opening': 0.,
'rake': 90.},
'tensileslip': {
'slip': 0.,
'opening': unit_disl,
'rake': 0.}
}
for idisl, case_type in enumerate([
'strikeslip', 'dipslip', 'tensileslip'][:n_eq]):
case = disl_cases[case_type]
source_disl = num.array([
case['slip'] * num.cos(case['rake'] * d2r),
case['slip'] * num.sin(case['rake'] * d2r),
case['opening']])
for isource, source in enumerate(source_patches):
results = okada_ext.okada(
source[num.newaxis, :].copy(),
source_disl[num.newaxis, :].copy(),
receiver_coords,
lambda_mean,
shearmod_mean,
nthreads=nthreads,
rotate_sdn=rotate_sdn)
for irec in range(receiver_coords.shape[0]):
eps = num.zeros((3, 3))
for m in range(3):
for n in range(3):
eps[m, n] = 0.5 * (
results[irec][m * 3 + n + 3] +
results[irec][n * 3 + m + 3])
stress = num.zeros((3, 3))
dilatation = num.sum([eps[i, i] for i in range(3)])
for m, n in zip([0, 0, 0, 1, 1, 2], [0, 1, 2, 1, 2, 2]):
if m == n:
stress[m, n] = \
lambda_mean * \
dilatation + \
2. * shearmod_mean * \
eps[m, n]
else:
stress[m, n] = \
2. * shearmod_mean * \
eps[m, n]
stress[n, m] = stress[m, n]
normal = num.array([0., 0., -1.])
for isig in range(3):
tension = num.sum(stress[isig, :] * normal)
coefmat[irec * n_eq + isig, isource * n_eq + idisl] = \
tension / unit_disl
return coefmat
[docs] @staticmethod
def get_disloc_lsq(
stress_field,
coef_mat=None,
source_list=None,
pure_shear=False,
epsilon=None,
nthreads=1,
**kwargs):
'''
Least square inversion to get displacement from stress
Follows approach for Least-Square Inversion published in Menke (1989)
to calculate displacements on a fault with several segments from a
given stress field. If not done, the coefficient matrix is determined
within the code.
:param stress_field: Array containing the stress change [Pa] for each
source patch (order: [
src1 dstress_Strike, src1 dstress_Dip, src1 dstress_Tensile,
src2 dstress_Strike, ...])
:type stress_field: :py:class:`numpy.ndarray`, ``(n_sources * 3, )``
:param coef_mat: Coefficient matrix to connect source patches
displacement and the resulting stress field
:type coef_mat: optional, :py:class:`numpy.ndarray`,
``(source_patches_list.shape[0] * 3,
source_patches.shape[] * 3(2)``
:param source_list: list of all OkadaSources, which shall be
used for BEM
:type source_list: optional, list of
:py:class:`pyrocko.modelling.okada.OkadaSource`
:param epsilon: regularize small values from the coefficient matrix,
default None.
:type epsilon: optional, float
:param nthreads: number of threads for the least squares inversion,
default 1
:type nthreads: optional, int
:return: inverted displacements (u_strike, u_dip , u_tensile) for each
source patch. order: [
[patch1 u_Strike, patch1 u_Dip, patch1 u_Tensile],
[patch2 u_Strike, patch2 u_Dip, patch2 u_Tensile],
...]
:rtype: :py:class:`numpy.ndarray`, ``(n_sources, 3)``
'''
if source_list is not None and coef_mat is None:
coef_mat = DislocationInverter.get_coef_mat(
source_list, pure_shear=pure_shear, nthreads=nthreads,
**kwargs)
if epsilon is not None:
coef_mat[coef_mat < epsilon] = 0.
idx = num.arange(0, coef_mat.shape[0])
if pure_shear:
idx = idx[idx % 3 != 2]
coef_mat_in = coef_mat[idx, :][:, idx]
disloc_est = num.zeros(coef_mat.shape[0])
if stress_field.ndim == 2:
stress_field = stress_field.ravel()
threadpool_limits = get_threadpool_limits()
with threadpool_limits(limits=nthreads, user_api='blas'):
try:
disloc_est[idx] = num.linalg.multi_dot([
num.linalg.inv(num.dot(coef_mat_in.T, coef_mat_in)),
coef_mat_in.T,
stress_field[idx]])
except num.linalg.LinAlgError as e:
logger.warning('Linear inversion failed!')
logger.warning(
'coef_mat: %s\nstress_field: %s',
coef_mat_in, stress_field[idx])
raise e
return disloc_est.reshape(-1, 3)
__all__ = [
'AnalyticalSource',
'AnalyticalRectangularSource',
'OkadaSource',
'DislocationInverter']