Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/gf/targets.py: 82%

159 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-06 06:59 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Data structures representing the receivers for seismogram synthesis. 

8''' 

9 

10import numpy as num 

11import math 

12 

13from . import meta 

14from pyrocko.guts import Timestamp, Tuple, String, Float, Object,\ 

15 StringChoice, Int 

16from pyrocko.guts_array import Array 

17from pyrocko.model import gnss 

18from pyrocko.orthodrome import distance_accurate50m_numpy 

19from pyrocko.util import num_full_like, num_full 

20 

21d2r = num.pi / 180. 

22 

23 

24class BadTarget(Exception): 

25 pass 

26 

27 

28class Filter(Object): 

29 pass 

30 

31 

32class OptimizationMethod(StringChoice): 

33 choices = ['enable', 'disable'] 

34 

35 

36def component_orientation(source, target, component): 

37 ''' 

38 Get component and azimuth for standard components R, T, Z, N, and E. 

39 

40 :param source: :py:class:`pyrocko.model.location.Location` object 

41 :param target: :py:class:`pyrocko.model.location.Location` object 

42 :param component: string ``'R'``, ``'T'``, ``'Z'``, ``'N'`` or ``'E'`` 

43 ''' 

44 

45 _, bazi = source.azibazi_to(target) 

46 

47 azi, dip = { 

48 'T': (bazi + 270., 0.), 

49 'R': (bazi + 180., 0.), 

50 'N': (0., 0.), 

51 'E': (90., 0.), 

52 'Z': (0., -90.)}[component] 

53 

54 return azi, dip 

55 

56 

57class Target(meta.Receiver): 

58 ''' 

59 A seismogram computation request for a single component, including 

60 its post-processing parmeters. 

61 ''' 

62 

63 quantity = meta.QuantityType.T( 

64 optional=True, 

65 help='Measurement quantity type. If not given, it is guessed from the ' 

66 'channel code. For some common cases, derivatives of the stored ' 

67 'quantities are supported by using finite difference ' 

68 'approximations (e.g. displacement to velocity or acceleration). ' 

69 '4th order central FD schemes are used.') 

70 

71 codes = Tuple.T( 

72 4, String.T(), default=('', 'STA', '', 'Z'), 

73 help='network, station, location and channel codes to be set on ' 

74 'the response trace.') 

75 

76 elevation = Float.T( 

77 default=0.0, 

78 help='station surface elevation in [m]') 

79 

80 store_id = meta.StringID.T( 

81 optional=True, 

82 help="ID of Green's function store to use for the computation. " 

83 'If not given, the processor may use a system default.') 

84 

85 sample_rate = Float.T( 

86 optional=True, 

87 help='sample rate to produce. ' 

88 "If not given the GF store's default sample rate is used. " 

89 'GF store specific restrictions may apply.') 

90 

91 interpolation = meta.InterpolationMethod.T( 

92 default='nearest_neighbor', 

93 help="Interpolation method between Green's functions. Supported are" 

94 ' ``nearest_neighbor`` and ``multilinear``') 

95 

96 optimization = OptimizationMethod.T( 

97 default='enable', 

98 optional=True, 

99 help='disable/enable optimizations in weight-delay-and-sum operation') 

100 

101 tmin = Timestamp.T( 

102 optional=True, 

103 help='time of first sample to request in [s]. ' 

104 "If not given, it is determined from the Green's functions.") 

105 

106 tmax = Timestamp.T( 

107 optional=True, 

108 help='time of last sample to request in [s]. ' 

109 "If not given, it is determined from the Green's functions.") 

110 

111 azimuth = Float.T( 

112 optional=True, 

113 help='azimuth of sensor component in [deg], clockwise from north. ' 

114 'If not given, it is guessed from the channel code.') 

115 

116 dip = Float.T( 

117 optional=True, 

118 help='dip of sensor component in [deg], ' 

119 'measured downward from horizontal. ' 

120 'If not given, it is guessed from the channel code.') 

121 

122 filter = Filter.T( 

123 optional=True, 

124 help='frequency response filter.') 

125 

126 def base_key(self): 

127 return (self.store_id, self.sample_rate, self.interpolation, 

128 self.optimization, 

129 self.tmin, self.tmax, 

130 self.elevation, self.depth, self.north_shift, self.east_shift, 

131 self.lat, self.lon) 

132 

133 def effective_quantity(self): 

134 if self.quantity is not None: 

135 return self.quantity 

136 

137 # guess from channel code 

138 cha = self.codes[-1].upper() 

139 if len(cha) == 3: 

140 # use most common SEED conventions here, however units have to be 

141 # guessed, because they are not uniquely defined by the conventions 

142 if cha[-2] in 'HL': # high gain, low gain seismometer 

143 return 'velocity' 

144 if cha[-2] == 'N': # accelerometer 

145 return 'acceleration' 

146 if cha[-2] == 'D': # hydrophone, barometer, ... 

147 return 'pressure' 

148 if cha[-2] == 'A': # tiltmeter 

149 return 'tilt' 

150 elif len(cha) == 2: 

151 if cha[-2] == 'U': 

152 return 'displacement' 

153 if cha[-2] == 'V': 

154 return 'velocity' 

155 elif len(cha) == 1: 

156 if cha in 'NEZ': 

157 return 'displacement' 

158 if cha == 'P': 

159 return 'pressure' 

160 

161 raise BadTarget('cannot guess measurement quantity type from channel ' 

162 'code "%s"' % cha) 

163 

164 def receiver(self, store): 

165 rec = meta.Receiver(**dict(meta.Receiver.T.inamevals(self))) 

166 return rec 

167 

168 def component_code(self): 

169 cha = self.codes[-1].upper() 

170 if cha: 

171 return cha[-1] 

172 else: 

173 return ' ' 

174 

175 def effective_azimuth(self): 

176 if self.azimuth is not None: 

177 return self.azimuth 

178 elif self.component_code() in 'NEZ': 

179 return {'N': 0., 'E': 90., 'Z': 0.}[self.component_code()] 

180 

181 raise BadTarget('cannot determine sensor component azimuth for ' 

182 '%s.%s.%s.%s' % self.codes) 

183 

184 def effective_dip(self): 

185 if self.dip is not None: 

186 return self.dip 

187 elif self.component_code() in 'NEZ': 

188 return {'N': 0., 'E': 0., 'Z': -90.}[self.component_code()] 

189 

190 raise BadTarget('cannot determine sensor component dip') 

191 

192 def get_sin_cos_factors(self): 

193 azi = self.effective_azimuth() 

194 dip = self.effective_dip() 

195 sa = math.sin(azi*d2r) 

196 ca = math.cos(azi*d2r) 

197 sd = math.sin(dip*d2r) 

198 cd = math.cos(dip*d2r) 

199 return sa, ca, sd, cd 

200 

201 def get_factor(self): 

202 return 1.0 

203 

204 def post_process(self, engine, source, tr): 

205 return meta.Result(trace=tr) 

206 

207 

208class StaticTarget(meta.MultiLocation): 

209 ''' 

210 A computation request for a spatial multi-location target of 

211 static/geodetic quantities. 

212 ''' 

213 quantity = meta.QuantityType.T( 

214 optional=True, 

215 default='displacement', 

216 help='Measurement quantity type, for now only `displacement` is' 

217 'supported.') 

218 

219 interpolation = meta.InterpolationMethod.T( 

220 default='nearest_neighbor', 

221 help="Interpolation method between Green's functions. Supported are" 

222 ' ``nearest_neighbor`` and ``multilinear``') 

223 

224 tsnapshot = Timestamp.T( 

225 optional=True, 

226 help='time of the desired snapshot in [s], ' 

227 'If not given, the first sample is taken. If the desired sample' 

228 " exceeds the length of the Green's function store," 

229 ' the last sample is taken.') 

230 

231 store_id = meta.StringID.T( 

232 optional=True, 

233 help="ID of Green's function store to use for the computation. " 

234 'If not given, the processor may use a system default.') 

235 

236 def base_key(self): 

237 return (self.store_id, 

238 self.coords5.shape, 

239 self.quantity, 

240 self.tsnapshot, 

241 self.interpolation) 

242 

243 @property 

244 def ntargets(self): 

245 ''' 

246 Number of targets held by instance. 

247 ''' 

248 return self.ncoords 

249 

250 def get_targets(self): 

251 ''' 

252 Discretizes the multilocation target into a list of 

253 :py:class:`Target` objects. 

254 

255 :returns: 

256 Individual point locations. 

257 :rtype: 

258 :py:class:`list` of :py:class:`Target` 

259 ''' 

260 targets = [] 

261 for i in range(self.ntargets): 

262 targets.append( 

263 Target( 

264 lat=float(self.coords5[i, 0]), 

265 lon=float(self.coords5[i, 1]), 

266 north_shift=float(self.coords5[i, 2]), 

267 east_shift=float(self.coords5[i, 3]), 

268 elevation=float(self.coords5[i, 4]))) 

269 return targets 

270 

271 def distance_to(self, source): 

272 src_lats = num_full_like(self.lats, fill_value=source.lat) 

273 src_lons = num_full_like(self.lons, fill_value=source.lon) 

274 

275 target_coords = self.get_latlon() 

276 target_lats = target_coords[:, 0] 

277 target_lons = target_coords[:, 1] 

278 return distance_accurate50m_numpy( 

279 src_lats, src_lons, target_lats, target_lons) 

280 

281 def post_process(self, engine, source, statics): 

282 return meta.StaticResult(result=statics) 

283 

284 

285class SatelliteTarget(StaticTarget): 

286 ''' 

287 A computation request for a spatial multi-location target of 

288 static/geodetic quantities measured from a satellite instrument. 

289 The line of sight angles are provided and projecting 

290 post-processing is applied. 

291 ''' 

292 theta = Array.T( 

293 shape=(None,), 

294 dtype=float, 

295 serialize_as='base64-compat', 

296 help="Horizontal angle towards satellite's line of sight in radians." 

297 '\n\n .. important::\n\n' 

298 ' :math:`0` is **east** and' 

299 ' :math:`\\frac{\\pi}{2}` is **north**.\n\n') 

300 

301 phi = Array.T( 

302 shape=(None,), 

303 dtype=float, 

304 serialize_as='base64-compat', 

305 help='Theta is look vector elevation angle towards satellite from' 

306 " horizon in radians. Matrix of theta towards satellite's" 

307 ' line of sight.' 

308 '\n\n .. important::\n\n' 

309 ' :math:`-\\frac{\\pi}{2}` is **down** and' 

310 ' :math:`\\frac{\\pi}{2}` is **up**.\n\n') 

311 

312 def __init__(self, *args, **kwargs): 

313 super(SatelliteTarget, self).__init__(*args, **kwargs) 

314 self._los_factors = None 

315 

316 def get_los_factors(self): 

317 if (self.theta.size != self.phi.size != self.lats.size): 

318 raise AttributeError('LOS angles inconsistent with provided' 

319 ' coordinate shape.') 

320 if self._los_factors is None: 

321 self._los_factors = num.empty((self.theta.shape[0], 3)) 

322 self._los_factors[:, 0] = num.sin(self.theta) 

323 self._los_factors[:, 1] = num.cos(self.theta) * num.cos(self.phi) 

324 self._los_factors[:, 2] = num.cos(self.theta) * num.sin(self.phi) 

325 return self._los_factors 

326 

327 def post_process(self, engine, source, statics): 

328 return meta.SatelliteResult( 

329 result=statics, 

330 theta=self.theta, phi=self.phi) 

331 

332 

333class KiteSceneTarget(SatelliteTarget): 

334 

335 shape = Tuple.T( 

336 2, Int.T(), 

337 optional=False, 

338 help='Shape of the displacement vectors.') 

339 

340 def __init__(self, scene, **kwargs): 

341 size = scene.displacement.size 

342 

343 if scene.frame.spacing == 'meter': 

344 lats = num_full(size, scene.frame.llLat) 

345 lons = num_full(size, scene.frame.llLon) 

346 north_shifts = scene.frame.gridN.data.flatten() 

347 east_shifts = scene.frame.gridE.data.flatten() 

348 

349 elif scene.frame.spacing == 'degree': 

350 lats = scene.frame.gridN.data.flatten() + scene.frame.llLat 

351 lons = scene.frame.gridE.data.flatten() + scene.frame.llLon 

352 north_shifts = num.zeros(size) 

353 east_shifts = num.zeros(size) 

354 

355 self.scene = scene 

356 

357 super(KiteSceneTarget, self).__init__( 

358 lats=lats, lons=lons, 

359 north_shifts=north_shifts, east_shifts=east_shifts, 

360 theta=scene.theta.flatten(), 

361 phi=scene.phi.flatten(), 

362 shape=scene.shape, **kwargs) 

363 

364 def post_process(self, engine, source, statics): 

365 res = meta.KiteSceneResult( 

366 result=statics, 

367 theta=self.theta, phi=self.phi, 

368 shape=self.scene.shape) 

369 res.config = self.scene.config 

370 return res 

371 

372 

373class GNSSCampaignTarget(StaticTarget): 

374 

375 def post_process(self, engine, source, statics): 

376 campaign = gnss.GNSSCampaign() 

377 

378 for ista in range(self.ntargets): 

379 north = gnss.GNSSComponent( 

380 shift=float(statics['displacement.n'][ista])) 

381 east = gnss.GNSSComponent( 

382 shift=float(statics['displacement.e'][ista])) 

383 up = gnss.GNSSComponent( 

384 shift=-float(statics['displacement.d'][ista])) 

385 

386 coords = self.coords5 

387 station = gnss.GNSSStation( 

388 lat=float(coords[ista, 0]), 

389 lon=float(coords[ista, 1]), 

390 east_shift=float(coords[ista, 2]), 

391 north_shift=float(coords[ista, 3]), 

392 elevation=float(coords[ista, 4]), 

393 north=north, 

394 east=east, 

395 up=up) 

396 

397 campaign.add_station(station) 

398 

399 return meta.GNSSCampaignResult(result=statics, campaign=campaign)