1import numpy as num
2import logging
4from pyrocko import gf, util
5from pyrocko.guts import String, Float, Dict, StringChoice, Int
7from grond.meta import expand_template, Parameter, has_get_plot_classes
9from ..base import Problem, ProblemConfig
# Module-level guts prefix; presumably picked up by pyrocko.guts for the
# classes declared below -- confirm against the guts documentation.
guts_prefix = 'grond'

logger = logging.getLogger('grond.problems.singleforce.problem')

# Scale factor paired with the 'km' unit label below.
km = 1000.0

# Keyword arguments shared by the Parameter definitions which are stored in
# metres but carry a kilometre scale factor and unit label.
as_km = {'scale_factor': km, 'scale_unit': 'km'}
class STFType(StringChoice):
    """String choice naming the source time function (STF) class to use.

    The ``cls`` attribute maps every allowed choice to the ``pyrocko.gf``
    class of the same name.
    """

    choices = ['HalfSinusoidSTF', 'ResonatorSTF']

    cls = dict(
        HalfSinusoidSTF=gf.HalfSinusoidSTF,
        ResonatorSTF=gf.ResonatorSTF)

    @classmethod
    def base_stf(cls, name):
        """Return a new STF object, built without arguments, for *name*.

        :param name: one of the keys of the ``cls`` mapping
        :raises KeyError: if *name* is not a key of the ``cls`` mapping
        """
        stf_class = cls.cls[name]
        return stf_class()
class SFProblemConfig(ProblemConfig):
    """Configuration from which :py:class:`SFProblem` objects are built."""

    ranges = Dict.T(String.T(), gf.Range.T())
    distance_min = Float.T(default=0.0)
    stf_type = STFType.T(default='HalfSinusoidSTF')
    nthreads = Int.T(default=1)

    def get_problem(self, event, target_groups, targets):
        """Create the :py:class:`SFProblem` for *event*.

        The reference source is derived from *event* and gets an STF of the
        configured ``stf_type`` whose duration is the event duration, or
        ``0.0`` when the event duration is unset or zero.

        Note that *event* is modified in place: a depth of ``None`` is
        replaced by ``0.``.

        ``name_template`` and ``norm_exponent`` are read from ``self`` but
        not declared in this class -- presumably inherited from
        ``ProblemConfig``.

        :param event: event providing name, time, depth and duration
        :param target_groups: passed through to the problem
        :param targets: passed through to the problem
        :returns: the configured :py:class:`SFProblem`
        """
        if event.depth is None:
            event.depth = 0.

        reference_source = gf.SFSource.from_pyrocko_event(event)

        source_time_function = STFType.base_stf(self.stf_type)
        source_time_function.duration = event.duration or 0.0
        reference_source.stf = source_time_function

        # Values substituted into the problem's name template.
        substitutions = {
            'event_name': event.name,
            'event_time': util.time_to_str(event.time),
        }
        problem_name = expand_template(self.name_template, substitutions)

        return SFProblem(
            name=problem_name,
            base_source=reference_source,
            target_groups=target_groups,
            targets=targets,
            ranges=self.ranges,
            distance_min=self.distance_min,
            stf_type=self.stf_type,
            norm_exponent=self.norm_exponent,
            nthreads=self.nthreads)
@has_get_plot_classes
class SFProblem(Problem):
    """Problem whose model vector describes a single-force source.

    The model vector consists of the entries of ``problem_parameters``
    (time, position and the three force components) followed by the
    STF-specific parameters selected through ``stf_type``.
    """

    # Parameters common to all STF types; order defines the model vector
    # layout used by pack().
    problem_parameters = [
        Parameter('time', 's', label='Time'),
        Parameter('north_shift', 'm', label='Northing', **as_km),
        Parameter('east_shift', 'm', label='Easting', **as_km),
        Parameter('depth', 'm', label='Depth', **as_km),
        Parameter('fn', 'N', label='$F_{n}$'),
        Parameter('fe', 'N', label='$F_{e}$'),
        Parameter('fd', 'N', label='$F_{d}$')]

    # Additional parameters, keyed by STF type name, appended to
    # problem_parameters in __init__.
    problem_parameters_stf = {
        'HalfSinusoidSTF': [
            Parameter('duration', 's', label='Duration')],
        'ResonatorSTF': [
            Parameter('duration', 's', label='Duration'),
            Parameter('frequency', 'Hz', label='Frequency')]}

    # This problem defines no dependant parameters.
    dependants = []

    distance_min = Float.T(default=0.0)
    stf_type = STFType.T(default='HalfSinusoidSTF')

    def __init__(self, **kwargs):
        """Initialise the problem; *kwargs* are forwarded to ``Problem``."""
        Problem.__init__(self, **kwargs)
        self.deps_cache = {}

        # Concatenation builds a new list stored on the instance, so the
        # class-level parameter list is left untouched.
        self.problem_parameters = self.problem_parameters \
            + self.problem_parameters_stf[self.stf_type]

        # Template STF object, cloned with concrete values in get_stf().
        self._base_stf = STFType.base_stf(self.stf_type)

    def get_stf(self, d):
        """Return a clone of the base STF carrying the STF values from *d*.

        :param d: mapping from parameter name to value; must contain every
            STF parameter of the configured ``stf_type``
        """
        d_stf = {}
        for p in self.problem_parameters_stf[self.stf_type]:
            d_stf[p.name] = float(d[p.name])

        return self._base_stf.clone(**d_stf)

    def get_source(self, x):
        """Return the source corresponding to model vector *x*.

        Every key of the base source which is also a problem parameter is
        set to the result of ``ranges[key].make_relative(base_value,
        value)``; the STF is built by :py:meth:`get_stf`.
        """
        d = self.get_parameter_dict(x)

        p = {}
        for k in self.base_source.keys():
            if k in d:
                p[k] = float(
                    self.ranges[k].make_relative(self.base_source[k], d[k]))

        source = self.base_source.clone(stf=self.get_stf(d), **p)
        return source

    def make_dependant(self, xs, pname):
        """Do nothing; this problem has no dependant parameters."""
        pass

    def pack_stf(self, stf):
        """Return the STF parameter values of *stf* as a list.

        The order follows ``problem_parameters_stf`` for the configured
        ``stf_type``.
        """
        return [
            stf[p.name] for p in self.problem_parameters_stf[self.stf_type]]

    def pack(self, source):
        """Return the model vector for *source* as a float array.

        The time entry is stored relative to the base source time; the
        remaining entries are read from *source* and its STF unchanged.
        """
        # The builtin ``float`` replaces the ``numpy.float`` alias, which was
        # deprecated in NumPy 1.20 and removed in NumPy 1.24. The alias
        # pointed at the builtin, so the resulting dtype is unchanged.
        x = num.array([
            source.time - self.base_source.time,
            source.north_shift,
            source.east_shift,
            source.depth,
            source.fn,
            source.fe,
            source.fd,
            ] + self.pack_stf(source.stf), dtype=float)

        return x

    def random_uniform(self, xbounds, rstate, fixed_magnitude=None):
        """Draw a model uniformly within *xbounds*.

        :param xbounds: array indexed as ``xbounds[i, 0]`` (lower) and
            ``xbounds[i, 1]`` (upper) for parameter ``i``
        :param rstate: random state object providing ``uniform(low, high)``
        :param fixed_magnitude: accepted but not used by this problem
        :returns: list with ``self.nparameters`` values
        """
        x = num.zeros(self.nparameters)
        # One draw per parameter, in parameter order, so the sequence of
        # calls on rstate stays the same.
        for i in range(self.nparameters):
            x[i] = rstate.uniform(xbounds[i, 0], xbounds[i, 1])

        return x.tolist()

    def preconstrain(self, x):
        """Return *x* after a round trip through the parameter dict.

        No additional constraints are applied here.
        """
        d = self.get_parameter_dict(x)
        x = self.get_parameter_array(d)

        return x

    @classmethod
    def get_plot_classes(cls):
        """Return the parent's plot classes plus the single-force plots."""
        # Imported here rather than at module level; presumably to avoid a
        # circular import or the import cost -- not visible from this file.
        from . import plot
        plots = super(SFProblem, cls).get_plot_classes()
        plots.extend([plot.SFLocationPlot, plot.SFForcePlot])
        return plots
# Public names exported by this module.
__all__ = [
    'SFProblem',
    'SFProblemConfig',
]