Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/ahfullgreen.py: 94%
95 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-03-07 11:54 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-03-07 11:54 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Analytical solutions for elastic wave propagation in a homogeneous full-space.
8'''
10import math
11import logging
12import numpy as num
13from . import trace
14from .guts import Float, Object
15from . import ahfullgreen_ext as ext
17logger = logging.getLogger('pyrocko.fomosto.ahfullgreen')
20guts_prefix = 'pf'
23class AhfullgreenError(Exception):
24 pass
27def make_seismogram(
28 vp, vs, density, qp, qs, x, f, m6,
29 quantity, deltat, stf=None, wanted_components='ned',
30 want_far=True, want_intermediate=True, want_near=True,
31 npad_levelling=40, out_alignment=0.):
33 if stf is None:
34 stf = AhfullgreenSTFImpulse()
36 x = num.asarray(x, float)
37 f = num.asarray(f, float)
38 m6 = num.asarray(m6, float)
40 r = math.sqrt(num.sum(x**2))
42 tp = r / vp
43 ts = r / vs
45 if ts < tp:
46 raise AhfullgreenError('unsupported material properties: ts < tp')
48 tpad = stf.t_cutoff() or deltat * 10.
50 tstart = tp - tpad - npad_levelling * deltat
51 tstart = out_alignment + round((tstart - out_alignment) / deltat) * deltat
53 nt = trace.nextpow2(int(math.ceil(
54 (ts - tp + 2 * tpad + 2*npad_levelling * deltat) / deltat)))
56 nspec = nt // 2 + 1
58 specs = []
59 for component in 'ned':
60 if component in wanted_components:
61 specs.append(num.zeros(nspec, dtype=complex))
62 else:
63 specs.append(None)
65 oc_c = {
66 'displacement': 1, # treated in post processing
67 'velocity': 1,
68 'acceleration': 2}[quantity]
70 out_spec_delta = float(2.0 * math.pi / (nt*deltat))
71 out_spec_offset = 0.0
73 omega = out_spec_offset + out_spec_delta * num.arange(nspec)
74 coeffs_stf = stf(omega/(2.*math.pi)).astype(complex)
75 coeffs_stf *= num.exp(1.0j * omega * tstart)
77 omega_max = 2.0 * math.pi * 0.5 / deltat
78 omega_cut = omega_max * 0.75
79 icut = int(num.ceil((omega_cut - out_spec_offset) / out_spec_delta))
81 coeffs_stf[icut:] *= 0.5 + 0.5 * num.cos(
82 math.pi * num.minimum(
83 1.0, (omega[icut:] - omega_cut) / (omega_max - omega_cut)))
85 if num.all(x == 0.0):
86 logger.warn(
87 'Source and receiver are at the same position -> setting GF for '
88 'this combination to zero.')
89 else:
90 ext.add_seismogram(
91 float(vp), float(vs), float(density), float(qp), float(qs),
92 x, f, m6, oc_c, out_spec_delta, out_spec_offset,
93 specs[0], specs[1], specs[2],
94 want_far, want_intermediate, want_near)
96 outs = []
97 for i, component in enumerate('ned'):
98 if component not in wanted_components:
99 outs.append(None)
101 out = num.fft.irfft(coeffs_stf * specs[i], nt)
102 out /= deltat
103 assert out.size // 2 + 1 == specs[i].size
105 m1 = num.mean(
106 out[:npad_levelling] * num.linspace(1., 0., npad_levelling))
108 out -= m1 * 2.
110 if quantity == 'displacement':
111 out = num.cumsum(out) * deltat
113 outs.append(out)
115 outs_wanted = []
116 for component in wanted_components:
117 i = 'ned'.find(component)
118 if i != -1:
119 outs_wanted.append(outs[i])
120 else:
121 outs_wanted.append(None)
123 return tstart, outs_wanted
126def add_seismogram(
127 vp, vs, density, qp, qs, x, f, m6,
128 quantity, deltat, out_offset,
129 out_n, out_e, out_d, stf=None,
130 want_far=True, want_intermediate=True, want_near=True,
131 npad_levelling=40):
133 ns = [out.size for out in (out_n, out_e, out_d) if out is not None]
135 if not all(n == ns[0] for n in ns):
136 raise AhfullgreenError('Length of component arrays are not identical.')
138 n = ns[0]
140 wanted_components = ''.join(
141 (c if out is not None else '-')
142 for (out, c) in zip((out_n, out_e, out_d), 'ned'))
144 tstart, temps = make_seismogram(
145 vp, vs, density, qp, qs, x, f, m6,
146 quantity, deltat, stf=stf,
147 wanted_components=wanted_components,
148 want_far=want_far,
149 want_intermediate=want_intermediate,
150 want_near=want_near,
151 npad_levelling=npad_levelling, out_alignment=out_offset)
153 for i, out in enumerate((out_n, out_e, out_d)):
154 if out is None:
155 continue
157 temp = temps[i]
159 ntemp = temp.size
161 tmin = max(out_offset, tstart)
162 tmax = min(
163 out_offset + (n-1) * deltat,
164 tstart + (ntemp-1) * deltat)
166 def ind(t, t0):
167 return int(round((t-t0)/deltat))
169 out[ind(tmin, out_offset):ind(tmax, out_offset)+1] \
170 += temp[ind(tmin, tstart):ind(tmax, tstart)+1]
172 out[:ind(tmin, out_offset)] += 0.
173 out[ind(tmax, out_offset)+1:] += temp[ind(tmax, tstart)]
176class AhfullgreenSTF(Object):
177 pass
180class AhfullgreenSTFImpulse(AhfullgreenSTF):
182 def t_cutoff(self):
183 return None
185 def __call__(self, f):
186 return num.ones(f.size, dtype=complex)
189class AhfullgreenSTFGauss(AhfullgreenSTF):
191 tau = Float.T(default=1.0)
193 def t_cutoff(self):
194 return self.tau * 2.
196 def __call__(self, f):
197 omega = f * 2. * math.pi
199 return num.exp(-(omega**2 * self.tau**2 / 8.))