Coverage for /usr/local/lib/python3.11/dist-packages/grond/problems/rectangular/problem.py: 87%

69 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-25 10:12 +0000

1import numpy as num 

2import logging 

3 

4from pyrocko import gf, util 

5from pyrocko.guts import String, Float, Dict, Int, Bool 

6 

7from grond.meta import expand_template, Parameter, has_get_plot_classes 

8 

9from ..base import Problem, ProblemConfig 

10from .. import CMTProblem 

11 

12guts_prefix = 'grond' 

13logger = logging.getLogger('grond.problems.rectangular.problem') 

14km = 1e3 

15as_km = dict(scale_factor=km, scale_unit='km') 

16 

17 

18class RectangularProblemConfig(ProblemConfig): 

19 

20 ranges = Dict.T(String.T(), gf.Range.T()) 

21 decimation_factor = Int.T(default=1) 

22 distance_min = Float.T(default=0.) 

23 nthreads = Int.T(default=4) 

24 point_source_target_balancing = Bool.T( 

25 default=False, 

26 help='If ``True``, target balancing (if used) is performed on a ' 

27 'moment tensor point source at the events location. It increases ' 

28 'the speed, but might lead to not fully optimized target weights.' 

29 ) 

30 

31 def get_problem(self, event, target_groups, targets): 

32 if self.decimation_factor != 1: 

33 logger.warning( 

34 'Decimation factor for rectangular source set to %i. Results ' 

35 'may be inaccurate.' % self.decimation_factor) 

36 

37 base_source = gf.RectangularSource.from_pyrocko_event( 

38 event, 

39 anchor='top', 

40 decimation_factor=self.decimation_factor) 

41 

42 subs = dict( 

43 event_name=event.name, 

44 event_time=util.time_to_str(event.time)) 

45 

46 cmt_problem = None 

47 if self.point_source_target_balancing: 

48 base_source_cmt = gf.MTSource.from_pyrocko_event(event) 

49 

50 stf = gf.HalfSinusoidSTF() 

51 stf.duration = event.duration or 0.0 

52 

53 base_source_cmt.stf = stf 

54 

55 ranges = dict( 

56 time=self.ranges['time'], 

57 north_shift=self.ranges['north_shift'], 

58 east_shift=self.ranges['east_shift'], 

59 depth=self.ranges['depth'], 

60 magnitude=gf.Range( 

61 start=event.magnitude - 1., 

62 stop=event.magnitude + 1.), 

63 duration=gf.Range(start=0., stop=stf.duration * 2.), 

64 rmnn=gf.Range(start=-1.41421, stop=1.41421), 

65 rmee=gf.Range(start=-1.41421, stop=1.41421), 

66 rmdd=gf.Range(start=-1.41421, stop=1.41421), 

67 rmne=gf.Range(start=-1., stop=1.), 

68 rmnd=gf.Range(start=-1., stop=1.), 

69 rmed=gf.Range(start=-1., stop=1.)) 

70 

71 cmt_problem = CMTProblem( 

72 name=expand_template(self.name_template, subs), 

73 base_source=base_source_cmt, 

74 distance_min=self.distance_min, 

75 target_groups=target_groups, 

76 targets=targets, 

77 ranges=ranges, 

78 mt_type='dc', 

79 stf_type='HalfSinusoidSTF', 

80 norm_exponent=self.norm_exponent, 

81 nthreads=self.nthreads) 

82 

83 problem = RectangularProblem( 

84 name=expand_template(self.name_template, subs), 

85 base_source=base_source, 

86 distance_min=self.distance_min, 

87 target_groups=target_groups, 

88 targets=targets, 

89 ranges=self.ranges, 

90 norm_exponent=self.norm_exponent, 

91 nthreads=self.nthreads, 

92 cmt_problem=cmt_problem) 

93 

94 return problem 

95 

96 

97@has_get_plot_classes 

98class RectangularProblem(Problem): 

99 

100 problem_parameters = [ 

101 Parameter('east_shift', 'm', label='Easting', **as_km), 

102 Parameter('north_shift', 'm', label='Northing', **as_km), 

103 Parameter('depth', 'm', label='Depth', **as_km), 

104 Parameter('length', 'm', label='Length', optional=False, **as_km), 

105 Parameter('width', 'm', label='Width', optional=False, **as_km), 

106 Parameter('slip', 'm', label='Slip', optional=False), 

107 Parameter('strike', 'deg', label='Strike'), 

108 Parameter('dip', 'deg', label='Dip'), 

109 Parameter('rake', 'deg', label='Rake') 

110 ] 

111 

112 problem_waveform_parameters = [ 

113 Parameter('nucleation_x', 'offset', label='Nucleation X'), 

114 Parameter('nucleation_y', 'offset', label='Nucleation Y'), 

115 Parameter('time', 's', label='Time'), 

116 Parameter('velocity', 'm/s', label='Rupture Velocity') 

117 ] 

118 

119 dependants = [] 

120 

121 distance_min = Float.T(default=0.0) 

122 

123 cmt_problem = Problem.T(optional=True) 

124 

125 def set_engine(self, engine): 

126 self._engine = engine 

127 

128 if self.cmt_problem is not None: 

129 self.cmt_problem.set_engine(engine) 

130 

131 def pack(self, source): 

132 arr = self.get_parameter_array(source) 

133 for ip, p in enumerate(self.parameters): 

134 if p.name == 'time': 

135 arr[ip] -= self.base_source.time 

136 return arr 

137 

138 def get_source(self, x): 

139 d = self.get_parameter_dict(x) 

140 p = {} 

141 for k in self.base_source.keys(): 

142 if k in d: 

143 p[k] = float( 

144 self.ranges[k].make_relative(self.base_source[k], d[k])) 

145 

146 source = self.base_source.clone(**p) 

147 

148 return source 

149 

150 def random_uniform(self, xbounds, rstate, fixed_magnitude=None): 

151 x = num.zeros(self.nparameters) 

152 for i in range(self.nparameters): 

153 x[i] = rstate.uniform(xbounds[i, 0], xbounds[i, 1]) 

154 

155 return x 

156 

157 def preconstrain(self, x, optimizer=False): 

158 # source = self.get_source(x) 

159 # if any(self.distance_min > source.distance_to(t) 

160 # for t in self.targets): 

161 # raise Forbidden() 

162 return x 

163 

164 @classmethod 

165 def get_plot_classes(cls): 

166 plots = super(RectangularProblem, cls).get_plot_classes() 

167 return plots 

168 

169 

170__all__ = ''' 

171 RectangularProblem 

172 RectangularProblemConfig 

173'''.split()