Coverage for /usr/local/lib/python3.11/dist-packages/grond/problems/double_dc/problem.py: 40%

127 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-26 16:25 +0000

1import numpy as num 

2import logging 

3 

4from pyrocko import gf, util 

5from pyrocko.guts import String, Float, Dict 

6 

7from grond.meta import Forbidden, expand_template, Parameter, \ 

8 has_get_plot_classes 

9 

10from ..base import Problem, ProblemConfig 

11 

12 

13guts_prefix = 'grond' 

14logger = logging.getLogger('grond.problems.double_dc.problem') 

15km = 1e3 

16as_km = dict(scale_factor=km, scale_unit='km') 

17 

18 

19class DoubleDCProblemConfig(ProblemConfig): 

20 

21 ranges = Dict.T(String.T(), gf.Range.T()) 

22 distance_min = Float.T(default=0.0) 

23 

24 def get_problem(self, event, target_groups, targets): 

25 self.check_deprecations() 

26 

27 if event.depth is None: 

28 event.depth = 0. 

29 

30 base_source = gf.DoubleDCSource.from_pyrocko_event(event) 

31 base_source.stf = gf.HalfSinusoidSTF(duration=event.duration or 0.0) 

32 

33 subs = dict( 

34 event_name=event.name, 

35 event_time=util.time_to_str(event.time)) 

36 

37 problem = DoubleDCProblem( 

38 name=expand_template(self.name_template, subs), 

39 base_source=base_source, 

40 target_groups=target_groups, 

41 targets=targets, 

42 ranges=self.ranges, 

43 distance_min=self.distance_min, 

44 norm_exponent=self.norm_exponent) 

45 

46 return problem 

47 

48 

49@has_get_plot_classes 

50class DoubleDCProblem(Problem): 

51 

52 problem_parameters = [ 

53 Parameter('time', 's', label='Time'), 

54 Parameter('north_shift', 'm', label='Northing', **as_km), 

55 Parameter('east_shift', 'm', label='Easting', **as_km), 

56 Parameter('depth', 'm', label='Depth', **as_km), 

57 Parameter('magnitude', label='Magnitude'), 

58 Parameter('strike1', 'deg', label='Strike 1'), 

59 Parameter('dip1', 'deg', label='Dip 1'), 

60 Parameter('rake1', 'deg', label='Rake 1'), 

61 Parameter('strike2', 'deg', label='Strike 2'), 

62 Parameter('dip2', 'deg', label='Dip 2'), 

63 Parameter('rake2', 'deg', label='Rake 2'), 

64 Parameter('delta_time', 's', label='$\\Delta$ Time'), 

65 Parameter('delta_depth', 'm', label='$\\Delta$ Depth'), 

66 Parameter('azimuth', 'deg', label='Azimuth'), 

67 Parameter('distance', 'm', label='Distance'), 

68 Parameter('mix', label='Mix'), 

69 Parameter('duration1', 's', label='Duration 1'), 

70 Parameter('duration2', 's', label='Duration 2')] 

71 

72 dependants = [] 

73 distance_min = Float.T(default=0.0) 

74 

75 def get_source(self, x): 

76 d = self.get_parameter_dict(x) 

77 p = {} 

78 for k in self.base_source.keys(): 

79 if k in d: 

80 p[k] = float( 

81 self.ranges[k].make_relative(self.base_source[k], d[k])) 

82 

83 stf1 = gf.HalfSinusoidSTF(duration=float(d.duration1)) 

84 stf2 = gf.HalfSinusoidSTF(duration=float(d.duration2)) 

85 

86 source = self.base_source.clone(stf1=stf1, stf2=stf2, **p) 

87 return source 

88 

89 def make_dependant(self, xs, pname): 

90 if xs.ndim == 1: 

91 return self.make_dependant(xs[num.newaxis, :], pname)[0] 

92 

93 raise KeyError(pname) 

94 

95 def pack(self, source): 

96 arr = self.get_parameter_array(source) 

97 for ip, p in enumerate(self.parameters): 

98 if p.name == 'time': 

99 arr[ip] -= self.base_source.time 

100 if p.name == 'duration1': 

101 arr[ip] = source.stf1.duration if source.stf1 else 0.0 

102 if p.name == 'duration2': 

103 arr[ip] = source.stf2.duration if source.stf2 else 0.0 

104 return arr 

105 

106 def random_uniform(self, xbounds, rstate, fixed_magnitude=None): 

107 x = num.zeros(self.nparameters) 

108 for i in range(self.nparameters): 

109 x[i] = rstate.uniform(xbounds[i, 0], xbounds[i, 1]) 

110 

111 if fixed_magnitude is not None: 

112 x[4] = fixed_magnitude 

113 

114 return x.tolist() 

115 

116 def preconstrain(self, x, optimizer=False): 

117 source = self.get_source(x) 

118 if any(self.distance_min > source.distance_to(t) 

119 for t in self.waveform_targets): 

120 raise Forbidden() 

121 

122 return num.array(x, dtype=float) 

123 

124 @classmethod 

125 def get_plot_classes(cls): 

126 from .. import plot 

127 plots = super(DoubleDCProblem, cls).get_plot_classes() 

128 plots.extend([plot.HudsonPlot, plot.MTDecompositionPlot, 

129 plot.MTLocationPlot, plot.MTFuzzyPlot]) 

130 return plots 

131 

132 

133class TripleDCProblemConfig(ProblemConfig): 

134 

135 ranges = Dict.T(String.T(), gf.Range.T()) 

136 distance_min = Float.T(default=0.0) 

137 

138 def get_problem(self, event, target_groups, targets): 

139 if event.depth is None: 

140 event.depth = 0. 

141 

142 base_source = gf.TripleDCSource.from_pyrocko_event(event) 

143 base_source.stf = gf.HalfSinusoidSTF(duration=event.duration or 0.0) 

144 

145 subs = dict( 

146 event_name=event.name, 

147 event_time=util.time_to_str(event.time)) 

148 

149 problem = TripleDCProblem( 

150 name=expand_template(self.name_template, subs), 

151 base_source=base_source, 

152 target_groups=target_groups, 

153 targets=targets, 

154 ranges=self.ranges, 

155 distance_min=self.distance_min, 

156 norm_exponent=self.norm_exponent) 

157 

158 return problem 

159 

160 

161@has_get_plot_classes 

162class TripleDCProblem(Problem): 

163 

164 problem_parameters = [ 

165 Parameter('delta_time1', 's', label=r'$\Delta$Time 1'), 

166 Parameter('north_shift1', 'm', label='Northing 1', **as_km), 

167 Parameter('east_shift1', 'm', label='Easting 1', **as_km), 

168 Parameter('depth1', 'm', label='Depth 1', **as_km), 

169 Parameter('magnitude1', label='Magnitude 1'), 

170 Parameter('strike1', 'deg', label='Strike 1'), 

171 Parameter('dip1', 'deg', label='Dip 1'), 

172 Parameter('rake1', 'deg', label='Rake 1'), 

173 Parameter('delta_time2', 's', label=r'$\Delta$Time 2'), 

174 Parameter('north_shift2', 'm', label='Northing 2', **as_km), 

175 Parameter('east_shift2', 'm', label='Easting 2', **as_km), 

176 Parameter('depth2', 'm', label='Depth 2', **as_km), 

177 Parameter('magnitude2', label='Magnitude 2'), 

178 Parameter('strike2', 'deg', label='Strike 2'), 

179 Parameter('dip2', 'deg', label='Dip 2'), 

180 Parameter('rake2', 'deg', label='Rake 2'), 

181 Parameter('delta_time3', 's', label=r'$\Delta$Time 3'), 

182 Parameter('north_shift3', 'm', label='Northing 3', **as_km), 

183 Parameter('east_shift3', 'm', label='Easting 3', **as_km), 

184 Parameter('depth3', 'm', label='Depth 3', **as_km), 

185 Parameter('magnitude3', label='Magnitude 3'), 

186 Parameter('strike3', 'deg', label='Strike 3'), 

187 Parameter('dip3', 'deg', label='Dip 3'), 

188 Parameter('rake3', 'deg', label='Rake 3'), 

189 Parameter('duration1', 's', label='Duration 1'), 

190 Parameter('duration2', 's', label='Duration 2'), 

191 Parameter('duration3', 's', label='Duration 3')] 

192 

193 dependants = [] 

194 distance_min = Float.T(default=0.0) 

195 

196 def get_source(self, x): 

197 d = self.get_parameter_dict(x) 

198 p = {} 

199 for k in self.base_source.keys(): 

200 if k in d: 

201 p[k] = float( 

202 self.ranges[k].make_relative(self.base_source[k], d[k])) 

203 

204 stf1 = gf.HalfSinusoidSTF(duration=float(d.duration1)) 

205 stf2 = gf.HalfSinusoidSTF(duration=float(d.duration2)) 

206 stf3 = gf.HalfSinusoidSTF(duration=float(d.duration3)) 

207 

208 source = self.base_source.clone(stf1=stf1, stf2=stf2, stf3=stf3, **p) 

209 return source 

210 

211 def make_dependant(self, xs, pname): 

212 if xs.ndim == 1: 

213 return self.make_dependant(xs[num.newaxis, :], pname)[0] 

214 

215 raise KeyError(pname) 

216 

217 def pack(self, source): 

218 arr = self.get_parameter_array(source) 

219 for ip, p in enumerate(self.parameters): 

220 if p.name == 'duration1': 

221 arr[ip] = source.stf1.duration if source.stf1 else 0.0 

222 if p.name == 'duration2': 

223 arr[ip] = source.stf2.duration if source.stf2 else 0.0 

224 if p.name == 'duration3': 

225 arr[ip] = source.stf3.duration if source.stf3 else 0.0 

226 return arr 

227 

228 def random_uniform(self, xbounds, rstate, **kwargs): 

229 x = num.zeros(self.nparameters) 

230 for i in range(self.nparameters): 

231 x[i] = rstate.uniform(xbounds[i, 0], xbounds[i, 1]) 

232 

233 return x.tolist() 

234 

235 def preconstrain(self, x, optimizer=False): 

236 source = self.get_source(x) 

237 if any(self.distance_min > source.distance_to(t) 

238 for t in self.waveform_targets): 

239 raise Forbidden() 

240 

241 return num.array(x, dtype=float) 

242 

243 @classmethod 

244 def get_plot_classes(cls): 

245 from .. import plot 

246 plots = super(TripleDCProblem, cls).get_plot_classes() 

247 plots.extend([plot.HudsonPlot, plot.MTDecompositionPlot, 

248 plot.MTFuzzyPlot]) 

249 return plots 

250 

251 

252__all__ = ''' 

253 DoubleDCProblem 

254 DoubleDCProblemConfig 

255 TripleDCProblem 

256 TripleDCProblemConfig 

257'''.split()