Coverage for /usr/local/lib/python3.11/dist-packages/grond/problems/double_sf/problem.py: 31%
108 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-04-03 09:31 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-04-03 09:31 +0000
1# https://pyrocko.org/grond - GPLv3
2#
3# The Grond Developers, 21st Century
4import numpy as num
5import logging
7from pyrocko import gf, util
8from pyrocko.guts import String, Float, Dict, StringChoice
10from grond.meta import Forbidden, expand_template, Parameter, \
11 has_get_plot_classes
13from ..base import Problem, ProblemConfig
15guts_prefix = 'grond'
16logger = logging.getLogger('grond.problems.double_sf.problem')
17km = 1e3
18as_km = dict(scale_factor=km, scale_unit='km')
21class DoubleSFProblemConfig(ProblemConfig):
23 ranges = Dict.T(String.T(), gf.Range.T())
24 distance_min = Float.T(default=0.0)
25 force_directions = StringChoice.T(
26 choices=('off', 'unidirectional', 'counterdirectional'),
27 default='off')
29 def get_problem(self, event, target_groups, targets):
30 if event.depth is None:
31 event.depth = 0.
33 source = gf.SFSource.from_pyrocko_event(event)
34 source.stf = gf.HalfSinusoidSTF(duration=event.duration or 0.0)
36 base_source = gf.CombiSFSource(
37 name=source.name,
38 subsources=[
39 source.clone(name=''),
40 source.clone(name='')])
42 source.lat, source.lon = event.effective_latlon
44 subs = dict(
45 event_name=event.name,
46 event_time=util.time_to_str(event.time))
48 problem = DoubleSFProblem(
49 name=expand_template(self.name_template, subs),
50 base_source=base_source,
51 target_groups=target_groups,
52 targets=targets,
53 ranges=self.ranges,
54 distance_min=self.distance_min,
55 norm_exponent=self.norm_exponent,
56 force_directions=self.force_directions)
58 return problem
61@has_get_plot_classes
62class DoubleSFProblem(Problem):
64 problem_parameters = [
65 Parameter('time1', 's', label='Time'),
66 Parameter('north_shift1', 'm', label='Northing', **as_km),
67 Parameter('east_shift1', 'm', label='Easting', **as_km),
68 Parameter('depth1', 'm', label='Depth', **as_km),
69 Parameter('time2', 's', label='Time'),
70 Parameter('north_shift2', 'm', label='Northing', **as_km),
71 Parameter('east_shift2', 'm', label='Easting', **as_km),
72 Parameter('depth2', 'm', label='Depth', **as_km),
73 Parameter('fn1', 'N', label='$F_{n1}$'),
74 Parameter('fe1', 'N', label='$F_{e1}$'),
75 Parameter('fd1', 'N', label='$F_{d1}$'),
76 Parameter('fn2', 'N', label='$F_{n2}$'),
77 Parameter('fe2', 'N', label='$F_{e2}$'),
78 Parameter('fd2', 'N', label='$F_{d2}$'),
79 Parameter('duration1', 's', label='Duration 1'),
80 Parameter('duration2', 's', label='Duration 2')]
82 dependants = [
83 Parameter('force1', 'N', label='$||F_1||$'),
84 Parameter('force2', 'N', label='$||F_2||$')]
86 distance_min = Float.T(default=0.0)
87 force_directions = String.T()
89 def __init__(self, **kwargs):
90 Problem.__init__(self, **kwargs)
91 self.deps_cache = {}
93 def get_source(self, x):
94 d = self.get_parameter_dict(x)
96 sources = []
98 for i, subsource in enumerate(self.base_source.subsources):
99 p = {}
101 for k in subsource.keys():
102 key = '%s%d' % (k, i+1)
103 if key in d:
104 p[k] = float(
105 self.ranges[key].make_relative(subsource[k], d[key]))
107 sources.append(self.base_source.subsources[i].clone(**p))
109 sources[0].stf = gf.HalfSinusoidSTF(duration=float(d.duration1))
110 sources[1].stf = gf.HalfSinusoidSTF(duration=float(d.duration2))
112 return self.base_source.clone(subsources=sources)
114 def make_dependant(self, xs, pname):
115 cache = self.deps_cache
116 if xs.ndim == 1:
117 return self.make_dependant(xs[num.newaxis, :], pname)[0]
119 if pname not in self.dependant_names:
120 raise KeyError(pname)
122 y = num.zeros(xs.shape[0])
123 for i, x in enumerate(xs):
124 k = tuple(x.tolist())
125 if k not in cache:
126 source = self.get_source(x)
127 cache[k] = source
129 source = cache[k]
131 y[i] = getattr(source.subsources[int(pname[-1]) - 1], pname[:-1])
133 return y
135 def pack(self, source):
136 arr = self.get_parameter_array(source)
137 subsrcs = source.subsources
138 for ip, p in enumerate(self.parameters):
139 # if p.name == 'time':
140 # arr[ip] -= self.base_source.time
141 if p.name == 'duration1':
142 arr[ip] = subsrcs[0].stf.duration if subsrcs[0].stf else 0.0
143 if p.name == 'duration2':
144 arr[ip] = subsrcs[1].stf.duration if subsrcs[1].stf else 0.0
145 return arr
147 def preconstrain(self, x):
148 source = self.get_source(x)
149 if any(self.distance_min > source.distance_to(t)
150 for t in self.waveform_targets + self.phase_pick_targets):
151 raise Forbidden()
153 if self.force_directions == 'unidirectional':
154 force1 = source.subsources[0].force
155 force2 = source.subsources[1].force
157 ratio = force2 / force1
159 idx_fn2 = self.get_parameter_index('fn2')
160 idx_fe2 = self.get_parameter_index('fe2')
161 idx_fd2 = self.get_parameter_index('fd2')
163 x[idx_fn2] = source.subsources[0].fn * ratio
164 x[idx_fe2] = source.subsources[0].fe * ratio
165 x[idx_fd2] = source.subsources[0].fd * ratio
167 elif self.force_directions == 'counterdirectional':
168 force1 = source.subsources[0].force
169 force2 = source.subsources[1].force
171 ratio = force2 / force1
173 idx_fn2 = self.get_parameter_index('fn2')
174 idx_fe2 = self.get_parameter_index('fe2')
175 idx_fd2 = self.get_parameter_index('fd2')
177 x[idx_fn2] = -source.subsources[0].fn * ratio
178 x[idx_fe2] = -source.subsources[0].fe * ratio
179 x[idx_fd2] = -source.subsources[0].fd * ratio
181 return x
183 def get_dependant_bounds(self):
184 range_start = num.min([
185 (self.ranges['f{}1'.format(f)].start,
186 self.ranges['f{}2'.format(f)].start) for f in 'n e d'.split()],
187 axis=0)
189 range_stop = num.max([
190 (self.ranges['f{}1'.format(f)].stop,
191 self.ranges['f{}2'.format(f)].stop) for f in 'n e d'.split()],
192 axis=0)
194 force_range = (
195 -num.linalg.norm(range_start),
196 num.linalg.norm(range_stop))
198 out = [force_range, force_range]
200 return out
202 @classmethod
203 def get_plot_classes(cls):
204 from . import plot
205 plots = super(DoubleSFProblem, cls).get_plot_classes()
206 plots.extend([
207 plot.DoubleSFForcePlot,
208 plot.DoubleSFDecompositionPlot])
209 return plots
212__all__ = '''
213 DoubleSFProblem
214 DoubleSFProblemConfig
215'''.split()