Coverage for /usr/local/lib/python3.11/dist-packages/grond/problems/double_sf/problem.py: 31%

108 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-04-03 09:31 +0000

1# https://pyrocko.org/grond - GPLv3 

2# 

3# The Grond Developers, 21st Century 

4import numpy as num 

5import logging 

6 

7from pyrocko import gf, util 

8from pyrocko.guts import String, Float, Dict, StringChoice 

9 

10from grond.meta import Forbidden, expand_template, Parameter, \ 

11 has_get_plot_classes 

12 

13from ..base import Problem, ProblemConfig 

14 

15guts_prefix = 'grond' 

16logger = logging.getLogger('grond.problems.double_sf.problem') 

17km = 1e3 

18as_km = dict(scale_factor=km, scale_unit='km') 

19 

20 

21class DoubleSFProblemConfig(ProblemConfig): 

22 

23 ranges = Dict.T(String.T(), gf.Range.T()) 

24 distance_min = Float.T(default=0.0) 

25 force_directions = StringChoice.T( 

26 choices=('off', 'unidirectional', 'counterdirectional'), 

27 default='off') 

28 

29 def get_problem(self, event, target_groups, targets): 

30 if event.depth is None: 

31 event.depth = 0. 

32 

33 source = gf.SFSource.from_pyrocko_event(event) 

34 source.stf = gf.HalfSinusoidSTF(duration=event.duration or 0.0) 

35 

36 base_source = gf.CombiSFSource( 

37 name=source.name, 

38 subsources=[ 

39 source.clone(name=''), 

40 source.clone(name='')]) 

41 

42 source.lat, source.lon = event.effective_latlon 

43 

44 subs = dict( 

45 event_name=event.name, 

46 event_time=util.time_to_str(event.time)) 

47 

48 problem = DoubleSFProblem( 

49 name=expand_template(self.name_template, subs), 

50 base_source=base_source, 

51 target_groups=target_groups, 

52 targets=targets, 

53 ranges=self.ranges, 

54 distance_min=self.distance_min, 

55 norm_exponent=self.norm_exponent, 

56 force_directions=self.force_directions) 

57 

58 return problem 

59 

60 

61@has_get_plot_classes 

62class DoubleSFProblem(Problem): 

63 

64 problem_parameters = [ 

65 Parameter('time1', 's', label='Time'), 

66 Parameter('north_shift1', 'm', label='Northing', **as_km), 

67 Parameter('east_shift1', 'm', label='Easting', **as_km), 

68 Parameter('depth1', 'm', label='Depth', **as_km), 

69 Parameter('time2', 's', label='Time'), 

70 Parameter('north_shift2', 'm', label='Northing', **as_km), 

71 Parameter('east_shift2', 'm', label='Easting', **as_km), 

72 Parameter('depth2', 'm', label='Depth', **as_km), 

73 Parameter('fn1', 'N', label='$F_{n1}$'), 

74 Parameter('fe1', 'N', label='$F_{e1}$'), 

75 Parameter('fd1', 'N', label='$F_{d1}$'), 

76 Parameter('fn2', 'N', label='$F_{n2}$'), 

77 Parameter('fe2', 'N', label='$F_{e2}$'), 

78 Parameter('fd2', 'N', label='$F_{d2}$'), 

79 Parameter('duration1', 's', label='Duration 1'), 

80 Parameter('duration2', 's', label='Duration 2')] 

81 

82 dependants = [ 

83 Parameter('force1', 'N', label='$||F_1||$'), 

84 Parameter('force2', 'N', label='$||F_2||$')] 

85 

86 distance_min = Float.T(default=0.0) 

87 force_directions = String.T() 

88 

89 def __init__(self, **kwargs): 

90 Problem.__init__(self, **kwargs) 

91 self.deps_cache = {} 

92 

93 def get_source(self, x): 

94 d = self.get_parameter_dict(x) 

95 

96 sources = [] 

97 

98 for i, subsource in enumerate(self.base_source.subsources): 

99 p = {} 

100 

101 for k in subsource.keys(): 

102 key = '%s%d' % (k, i+1) 

103 if key in d: 

104 p[k] = float( 

105 self.ranges[key].make_relative(subsource[k], d[key])) 

106 

107 sources.append(self.base_source.subsources[i].clone(**p)) 

108 

109 sources[0].stf = gf.HalfSinusoidSTF(duration=float(d.duration1)) 

110 sources[1].stf = gf.HalfSinusoidSTF(duration=float(d.duration2)) 

111 

112 return self.base_source.clone(subsources=sources) 

113 

114 def make_dependant(self, xs, pname): 

115 cache = self.deps_cache 

116 if xs.ndim == 1: 

117 return self.make_dependant(xs[num.newaxis, :], pname)[0] 

118 

119 if pname not in self.dependant_names: 

120 raise KeyError(pname) 

121 

122 y = num.zeros(xs.shape[0]) 

123 for i, x in enumerate(xs): 

124 k = tuple(x.tolist()) 

125 if k not in cache: 

126 source = self.get_source(x) 

127 cache[k] = source 

128 

129 source = cache[k] 

130 

131 y[i] = getattr(source.subsources[int(pname[-1]) - 1], pname[:-1]) 

132 

133 return y 

134 

135 def pack(self, source): 

136 arr = self.get_parameter_array(source) 

137 subsrcs = source.subsources 

138 for ip, p in enumerate(self.parameters): 

139 # if p.name == 'time': 

140 # arr[ip] -= self.base_source.time 

141 if p.name == 'duration1': 

142 arr[ip] = subsrcs[0].stf.duration if subsrcs[0].stf else 0.0 

143 if p.name == 'duration2': 

144 arr[ip] = subsrcs[1].stf.duration if subsrcs[1].stf else 0.0 

145 return arr 

146 

147 def preconstrain(self, x): 

148 source = self.get_source(x) 

149 if any(self.distance_min > source.distance_to(t) 

150 for t in self.waveform_targets + self.phase_pick_targets): 

151 raise Forbidden() 

152 

153 if self.force_directions == 'unidirectional': 

154 force1 = source.subsources[0].force 

155 force2 = source.subsources[1].force 

156 

157 ratio = force2 / force1 

158 

159 idx_fn2 = self.get_parameter_index('fn2') 

160 idx_fe2 = self.get_parameter_index('fe2') 

161 idx_fd2 = self.get_parameter_index('fd2') 

162 

163 x[idx_fn2] = source.subsources[0].fn * ratio 

164 x[idx_fe2] = source.subsources[0].fe * ratio 

165 x[idx_fd2] = source.subsources[0].fd * ratio 

166 

167 elif self.force_directions == 'counterdirectional': 

168 force1 = source.subsources[0].force 

169 force2 = source.subsources[1].force 

170 

171 ratio = force2 / force1 

172 

173 idx_fn2 = self.get_parameter_index('fn2') 

174 idx_fe2 = self.get_parameter_index('fe2') 

175 idx_fd2 = self.get_parameter_index('fd2') 

176 

177 x[idx_fn2] = -source.subsources[0].fn * ratio 

178 x[idx_fe2] = -source.subsources[0].fe * ratio 

179 x[idx_fd2] = -source.subsources[0].fd * ratio 

180 

181 return x 

182 

183 def get_dependant_bounds(self): 

184 range_start = num.min([ 

185 (self.ranges['f{}1'.format(f)].start, 

186 self.ranges['f{}2'.format(f)].start) for f in 'n e d'.split()], 

187 axis=0) 

188 

189 range_stop = num.max([ 

190 (self.ranges['f{}1'.format(f)].stop, 

191 self.ranges['f{}2'.format(f)].stop) for f in 'n e d'.split()], 

192 axis=0) 

193 

194 force_range = ( 

195 -num.linalg.norm(range_start), 

196 num.linalg.norm(range_stop)) 

197 

198 out = [force_range, force_range] 

199 

200 return out 

201 

202 @classmethod 

203 def get_plot_classes(cls): 

204 from . import plot 

205 plots = super(DoubleSFProblem, cls).get_plot_classes() 

206 plots.extend([ 

207 plot.DoubleSFForcePlot, 

208 plot.DoubleSFDecompositionPlot]) 

209 return plots 

210 

211 

212__all__ = ''' 

213 DoubleSFProblem 

214 DoubleSFProblemConfig 

215'''.split()