Coverage for /usr/local/lib/python3.11/dist-packages/grond/problems/singleforce/problem.py: 94%

81 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-11-27 15:15 +0000

1# https://pyrocko.org/grond - GPLv3 

2# 

3# The Grond Developers, 21st Century 

4import numpy as num 

5import logging 

6 

7from pyrocko import gf, util 

8from pyrocko.guts import String, Float, Dict, StringChoice, Int 

9 

10from grond.meta import expand_template, Parameter, has_get_plot_classes 

11 

12from ..base import Problem, ProblemConfig 

13 

# Prefix under which the guts-based classes of this module are registered.
guts_prefix = 'grond'

# Module logger; the name is spelled out explicitly rather than derived.
logger = logging.getLogger('grond.problems.singleforce.problem')

# Number of metres in one kilometre.
km = 1000.0

# Keyword arguments handed to ``Parameter`` so that values stored in metres
# are scaled by ``km`` and labelled with the unit string 'km'.
as_km = {'scale_factor': km, 'scale_unit': 'km'}

18 

19 

class STFType(StringChoice):
    """String choice naming the source time function (STF) class to use.

    The allowed names are listed in ``choices``; ``cls`` maps each of these
    names to the corresponding class in ``pyrocko.gf``.
    """

    choices = [
        'HalfSinusoidSTF',
        'ResonatorSTF',
        'TremorSTF',
    ]

    # Lookup table from choice name to the STF class it stands for.
    cls = {
        'HalfSinusoidSTF': gf.HalfSinusoidSTF,
        'ResonatorSTF': gf.ResonatorSTF,
        'TremorSTF': gf.TremorSTF,
    }

    @classmethod
    def base_stf(cls, name):
        """Return a new, default-constructed STF instance for *name*.

        :param name: one of the keys of the ``cls`` lookup table.
        :returns: instance of the STF class registered under *name*.
        :raises KeyError: if *name* is not a key of the lookup table.
        """
        # Note: ``cls.cls`` is the lookup dict defined on the class, not the
        # classmethod's own first argument.
        stf_class = cls.cls[name]
        return stf_class()

31 

32 

class SFProblemConfig(ProblemConfig):
    """Configuration from which an :py:class:`SFProblem` is built.

    ``name_template`` and ``norm_exponent`` are read in
    :py:meth:`get_problem` but are not declared here; presumably they are
    provided by ``ProblemConfig`` -- confirm against the base class.
    """

    # Search range for each problem parameter, keyed by parameter name.
    ranges = Dict.T(String.T(), gf.Range.T())
    # Passed through unchanged to the problem (default 0.0).
    distance_min = Float.T(default=0.0)
    # Name of the source time function type (see STFType).
    stf_type = STFType.T(default='HalfSinusoidSTF')
    # Passed through unchanged to the problem (default 1).
    nthreads = Int.T(default=1)

    def get_problem(self, event, target_groups, targets):
        """Create the :py:class:`SFProblem` for *event*.

        :param event: event used to initialise the base source. If its
            ``depth`` is ``None`` it is set to ``0.`` **on the passed
            object** before the source is derived from it.
        :param target_groups: forwarded to the problem as-is.
        :param targets: forwarded to the problem as-is.
        :returns: the configured :py:class:`SFProblem` instance.
        """
        if event.depth is None:
            # In-place default so that the source can be built from the event.
            event.depth = 0.

        base_source = gf.SFSource.from_pyrocko_event(event)

        # Fresh STF of the configured type; a missing or zero event duration
        # ends up as 0.0.
        stf = STFType.base_stf(self.stf_type)
        stf.duration = event.duration or 0.0
        base_source.stf = stf

        # Substitutions available to the problem name template.
        subs = {
            'event_name': event.name,
            'event_time': util.time_to_str(event.time),
        }

        return SFProblem(
            name=expand_template(self.name_template, subs),
            base_source=base_source,
            target_groups=target_groups,
            targets=targets,
            ranges=self.ranges,
            distance_min=self.distance_min,
            stf_type=self.stf_type,
            norm_exponent=self.norm_exponent,
            nthreads=self.nthreads)

    def get_depth_range(self):
        """Return the configured range of the ``'depth'`` parameter.

        :raises KeyError: if ``ranges`` has no ``'depth'`` entry.
        """
        return self.ranges['depth']

70 

71 

@has_get_plot_classes
class SFProblem(Problem):
    """Optimisation problem for a single-force source.

    A model vector consists of the seven entries listed in
    ``problem_parameters`` followed by the parameters of the selected source
    time function (``problem_parameters_stf[stf_type]``).
    """

    # Parameters shared by all STF types, in model-vector order.
    problem_parameters = [
        Parameter('time', 's', label='Time'),
        Parameter('north_shift', 'm', label='Northing', **as_km),
        Parameter('east_shift', 'm', label='Easting', **as_km),
        Parameter('depth', 'm', label='Depth', **as_km),
        Parameter('fn', 'N', label='$F_{n}$'),
        Parameter('fe', 'N', label='$F_{e}$'),
        Parameter('fd', 'N', label='$F_{d}$'),
    ]

    # Additional parameters, keyed by STF type name.
    problem_parameters_stf = {
        'HalfSinusoidSTF': [
            Parameter('duration', 's', label='Duration'),
        ],
        'ResonatorSTF': [
            Parameter('duration', 's', label='Duration'),
            Parameter('frequency', 'Hz', label='Frequency'),
        ],
        'TremorSTF': [
            Parameter('duration', 's', label='Duration'),
            Parameter('frequency', 'Hz', label='Frequency'),
        ],
    }

    # This problem defines no dependant parameters.
    dependants = []

    distance_min = Float.T(default=0.0)
    stf_type = STFType.T(default='HalfSinusoidSTF')

    def __init__(self, **kwargs):
        """Initialise the problem and append the STF-specific parameters."""
        Problem.__init__(self, **kwargs)
        self.deps_cache = {}

        # Build a *new* list on the instance; the class-level list must not
        # be modified in place.
        stf_parameters = self.problem_parameters_stf[self.stf_type]
        self.problem_parameters = self.problem_parameters + stf_parameters

        # Template STF which get_stf() clones for every model.
        self._base_stf = STFType.base_stf(self.stf_type)

    def get_stf(self, d):
        """Return a clone of the base STF with its parameters taken from *d*.

        :param d: mapping from parameter name to value; must contain every
            STF parameter of the current ``stf_type``.
        :returns: STF instance with those values applied as floats.
        """
        stf_values = {
            param.name: float(d[param.name])
            for param in self.problem_parameters_stf[self.stf_type]}

        return self._base_stf.clone(**stf_values)

    def get_source(self, x):
        """Build the source corresponding to model vector *x*.

        Every key of the base source that also occurs in the parameter
        dict is replaced by ``ranges[key].make_relative(base, value)``
        (presumably resolving range-relative values against the base source
        -- see ``gf.Range``). The STF is generated by :py:meth:`get_stf`.

        :param x: model vector.
        :returns: clone of ``base_source`` with the model values applied.
        """
        params = self.get_parameter_dict(x)

        overrides = {
            key: float(
                self.ranges[key].make_relative(
                    self.base_source[key], params[key]))
            for key in self.base_source.keys()
            if key in params}

        return self.base_source.clone(stf=self.get_stf(params), **overrides)

    def make_dependant(self, xs, pname):
        """Do nothing; there are no dependants (``dependants`` is empty)."""
        pass

    def pack_stf(self, stf):
        """Return the values of the STF parameters of *stf* as a list.

        The order follows ``problem_parameters_stf[stf_type]``.
        """
        stf_names = [
            param.name for param in self.problem_parameters_stf[self.stf_type]]

        return [stf[name] for name in stf_names]

    def pack(self, source):
        """Convert *source* into a model vector.

        The time entry is stored relative to ``base_source.time``; all other
        entries are taken from *source* directly, followed by the STF
        parameters from :py:meth:`pack_stf`.

        :returns: 1-D float array.
        """
        values = [
            source.time - self.base_source.time,
            source.north_shift,
            source.east_shift,
            source.depth,
            source.fn,
            source.fe,
            source.fd,
        ]
        values = values + self.pack_stf(source.stf)

        return num.array(values, dtype=float)

    def random_uniform(self, xbounds, rstate, fixed_magnitude=None):
        """Draw a model with every parameter uniform within its bounds.

        :param xbounds: array indexed as ``xbounds[i, 0]`` (lower) and
            ``xbounds[i, 1]`` (upper) for parameter ``i``.
        :param rstate: random state object providing ``uniform(low, high)``.
        :param fixed_magnitude: accepted but not used by this implementation.
        :returns: list of drawn parameter values.
        """
        nparameters = self.nparameters
        model = num.zeros(nparameters)

        # One draw per parameter, in parameter order, so that the sequence of
        # random numbers consumed from rstate stays well defined.
        for iparam in range(nparameters):
            low = xbounds[iparam, 0]
            high = xbounds[iparam, 1]
            model[iparam] = rstate.uniform(low, high)

        return model.tolist()

    def preconstrain(self, x):
        """Return *x* after a round trip through the parameter dict.

        No constraint specific to this problem is applied here.
        """
        return self.get_parameter_array(self.get_parameter_dict(x))

    @classmethod
    def get_plot_classes(cls):
        """Return the inherited plot classes plus the single-force plots."""
        # Imported here rather than at module level (presumably to avoid a
        # circular import or to defer loading the plot module -- confirm).
        from . import plot

        plot_classes = super(SFProblem, cls).get_plot_classes()
        plot_classes.extend([plot.SFLocationPlot, plot.SFForcePlot])
        return plot_classes

165 

166 

# Names exported by ``from ... import *``.
__all__ = [
    'SFProblem',
    'SFProblemConfig',
]