Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/gf/targets.py: 75%

158 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-03-07 11:54 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Data structures representing the receivers for seismogram synthesis. 

8''' 

9 

10import numpy as num 

11import math 

12 

13from . import meta 

14from pyrocko.guts import Timestamp, Tuple, String, Float, Object, \ 

15 StringChoice, Int 

16from pyrocko.guts_array import Array 

17from pyrocko.model import gnss 

18from pyrocko.orthodrome import distance_accurate50m_numpy 

19 

20d2r = num.pi / 180. 

21 

22 

23class BadTarget(Exception): 

24 pass 

25 

26 

27class Filter(Object): 

28 pass 

29 

30 

31class OptimizationMethod(StringChoice): 

32 choices = ['enable', 'disable'] 

33 

34 

35def component_orientation(source, target, component): 

36 ''' 

37 Get component and azimuth for standard components R, T, Z, N, and E. 

38 

39 :param source: :py:class:`pyrocko.model.location.Location` object 

40 :param target: :py:class:`pyrocko.model.location.Location` object 

41 :param component: string ``'R'``, ``'T'``, ``'Z'``, ``'N'`` or ``'E'`` 

42 ''' 

43 

44 _, bazi = source.azibazi_to(target) 

45 

46 azi, dip = { 

47 'T': (bazi + 270., 0.), 

48 'R': (bazi + 180., 0.), 

49 'N': (0., 0.), 

50 'E': (90., 0.), 

51 'Z': (0., -90.)}[component] 

52 

53 return azi, dip 

54 

55 

56class Target(meta.Receiver): 

57 ''' 

58 A seismogram computation request for a single component, including 

59 its post-processing parmeters. 

60 ''' 

61 

62 quantity = meta.QuantityType.T( 

63 optional=True, 

64 help='Measurement quantity type. If not given, it is guessed from the ' 

65 'channel code. For some common cases, derivatives of the stored ' 

66 'quantities are supported by using finite difference ' 

67 'approximations (e.g. displacement to velocity or acceleration). ' 

68 '4th order central FD schemes are used.') 

69 

70 codes = Tuple.T( 

71 4, String.T(), default=('', 'STA', '', 'Z'), 

72 help='network, station, location and channel codes to be set on ' 

73 'the response trace.') 

74 

75 elevation = Float.T( 

76 default=0.0, 

77 help='station surface elevation in [m]') 

78 

79 store_id = meta.StringID.T( 

80 optional=True, 

81 help="ID of Green's function store to use for the computation. " 

82 'If not given, the processor may use a system default.') 

83 

84 sample_rate = Float.T( 

85 optional=True, 

86 help='sample rate to produce. ' 

87 "If not given the GF store's default sample rate is used. " 

88 'GF store specific restrictions may apply.') 

89 

90 interpolation = meta.InterpolationMethod.T( 

91 default='nearest_neighbor', 

92 help="Interpolation method between Green's functions. Supported are" 

93 ' ``nearest_neighbor`` and ``multilinear``') 

94 

95 optimization = OptimizationMethod.T( 

96 default='enable', 

97 optional=True, 

98 help='disable/enable optimizations in weight-delay-and-sum operation') 

99 

100 tmin = Timestamp.T( 

101 optional=True, 

102 help='time of first sample to request in [s]. ' 

103 "If not given, it is determined from the Green's functions.") 

104 

105 tmax = Timestamp.T( 

106 optional=True, 

107 help='time of last sample to request in [s]. ' 

108 "If not given, it is determined from the Green's functions.") 

109 

110 azimuth = Float.T( 

111 optional=True, 

112 help='azimuth of sensor component in [deg], clockwise from north. ' 

113 'If not given, it is guessed from the channel code.') 

114 

115 dip = Float.T( 

116 optional=True, 

117 help='dip of sensor component in [deg], ' 

118 'measured downward from horizontal. ' 

119 'If not given, it is guessed from the channel code.') 

120 

121 filter = Filter.T( 

122 optional=True, 

123 help='frequency response filter.') 

124 

125 def base_key(self): 

126 return (self.store_id, self.sample_rate, self.interpolation, 

127 self.optimization, 

128 self.tmin, self.tmax, 

129 self.elevation, self.depth, self.north_shift, self.east_shift, 

130 self.lat, self.lon) 

131 

132 def effective_quantity(self): 

133 if self.quantity is not None: 

134 return self.quantity 

135 

136 # guess from channel code 

137 cha = self.codes[-1].upper() 

138 if len(cha) == 3: 

139 # use most common SEED conventions here, however units have to be 

140 # guessed, because they are not uniquely defined by the conventions 

141 if cha[-2] in 'HL': # high gain, low gain seismometer 

142 return 'velocity' 

143 if cha[-2] == 'N': # accelerometer 

144 return 'acceleration' 

145 if cha[-2] == 'D': # hydrophone, barometer, ... 

146 return 'pressure' 

147 if cha[-2] == 'A': # tiltmeter 

148 return 'tilt' 

149 elif len(cha) == 2: 

150 if cha[-2] == 'U': 

151 return 'displacement' 

152 if cha[-2] == 'V': 

153 return 'velocity' 

154 elif len(cha) == 1: 

155 if cha in 'NEZ': 

156 return 'displacement' 

157 if cha == 'P': 

158 return 'pressure' 

159 

160 raise BadTarget('cannot guess measurement quantity type from channel ' 

161 'code "%s"' % cha) 

162 

163 def receiver(self, store): 

164 rec = meta.Receiver(**dict(meta.Receiver.T.inamevals(self))) 

165 return rec 

166 

167 def component_code(self): 

168 cha = self.codes[-1].upper() 

169 if cha: 

170 return cha[-1] 

171 else: 

172 return ' ' 

173 

174 def effective_azimuth(self): 

175 if self.azimuth is not None: 

176 return self.azimuth 

177 elif self.component_code() in 'NEZ': 

178 return {'N': 0., 'E': 90., 'Z': 0.}[self.component_code()] 

179 

180 raise BadTarget('cannot determine sensor component azimuth for ' 

181 '%s.%s.%s.%s' % self.codes) 

182 

183 def effective_dip(self): 

184 if self.dip is not None: 

185 return self.dip 

186 elif self.component_code() in 'NEZ': 

187 return {'N': 0., 'E': 0., 'Z': -90.}[self.component_code()] 

188 

189 raise BadTarget('cannot determine sensor component dip') 

190 

191 def get_sin_cos_factors(self): 

192 azi = self.effective_azimuth() 

193 dip = self.effective_dip() 

194 sa = math.sin(azi*d2r) 

195 ca = math.cos(azi*d2r) 

196 sd = math.sin(dip*d2r) 

197 cd = math.cos(dip*d2r) 

198 return sa, ca, sd, cd 

199 

200 def get_factor(self): 

201 return 1.0 

202 

203 def post_process(self, engine, source, tr): 

204 return meta.Result(trace=tr) 

205 

206 

207class StaticTarget(meta.MultiLocation): 

208 ''' 

209 A computation request for a spatial multi-location target of 

210 static/geodetic quantities. 

211 ''' 

212 quantity = meta.QuantityType.T( 

213 optional=True, 

214 default='displacement', 

215 help='Measurement quantity type, for now only `displacement` is' 

216 'supported.') 

217 

218 interpolation = meta.InterpolationMethod.T( 

219 default='nearest_neighbor', 

220 help="Interpolation method between Green's functions. Supported are" 

221 ' ``nearest_neighbor`` and ``multilinear``') 

222 

223 tsnapshot = Timestamp.T( 

224 optional=True, 

225 help='time of the desired snapshot in [s], ' 

226 'If not given, the first sample is taken. If the desired sample' 

227 " exceeds the length of the Green's function store," 

228 ' the last sample is taken.') 

229 

230 store_id = meta.StringID.T( 

231 optional=True, 

232 help="ID of Green's function store to use for the computation. " 

233 'If not given, the processor may use a system default.') 

234 

235 def base_key(self): 

236 return (self.store_id, 

237 self.coords5.shape, 

238 self.quantity, 

239 self.tsnapshot, 

240 self.interpolation) 

241 

242 @property 

243 def ntargets(self): 

244 ''' 

245 Number of targets held by instance. 

246 ''' 

247 return self.ncoords 

248 

249 def get_targets(self): 

250 ''' 

251 Discretizes the multilocation target into a list of 

252 :py:class:`Target` objects. 

253 

254 :returns: 

255 Individual point locations. 

256 :rtype: 

257 :py:class:`list` of :py:class:`Target` 

258 ''' 

259 targets = [] 

260 for i in range(self.ntargets): 

261 targets.append( 

262 Target( 

263 lat=float(self.coords5[i, 0]), 

264 lon=float(self.coords5[i, 1]), 

265 north_shift=float(self.coords5[i, 2]), 

266 east_shift=float(self.coords5[i, 3]), 

267 elevation=float(self.coords5[i, 4]))) 

268 return targets 

269 

270 def distance_to(self, source): 

271 src_lats = num.full_like(self.lats, fill_value=source.lat) 

272 src_lons = num.full_like(self.lons, fill_value=source.lon) 

273 

274 target_coords = self.get_latlon() 

275 target_lats = target_coords[:, 0] 

276 target_lons = target_coords[:, 1] 

277 return distance_accurate50m_numpy( 

278 src_lats, src_lons, target_lats, target_lons) 

279 

280 def post_process(self, engine, source, statics): 

281 return meta.StaticResult(result=statics) 

282 

283 

284class SatelliteTarget(StaticTarget): 

285 ''' 

286 A computation request for a spatial multi-location target of 

287 static/geodetic quantities measured from a satellite instrument. 

288 The line of sight angles are provided and projecting 

289 post-processing is applied. 

290 ''' 

291 theta = Array.T( 

292 shape=(None,), 

293 dtype=float, 

294 serialize_as='base64-compat', 

295 help="Horizontal angle towards satellite's line of sight in radians." 

296 '\n\n .. important::\n\n' 

297 ' :math:`0` is **east** and' 

298 ' :math:`\\frac{\\pi}{2}` is **north**.\n\n') 

299 

300 phi = Array.T( 

301 shape=(None,), 

302 dtype=float, 

303 serialize_as='base64-compat', 

304 help='Theta is look vector elevation angle towards satellite from' 

305 " horizon in radians. Matrix of theta towards satellite's" 

306 ' line of sight.' 

307 '\n\n .. important::\n\n' 

308 ' :math:`-\\frac{\\pi}{2}` is **down** and' 

309 ' :math:`\\frac{\\pi}{2}` is **up**.\n\n') 

310 

311 def __init__(self, *args, **kwargs): 

312 super(SatelliteTarget, self).__init__(*args, **kwargs) 

313 self._los_factors = None 

314 

315 def get_los_factors(self): 

316 if (self.theta.size != self.phi.size != self.lats.size): 

317 raise AttributeError('LOS angles inconsistent with provided' 

318 ' coordinate shape.') 

319 if self._los_factors is None: 

320 self._los_factors = num.empty((self.theta.shape[0], 3)) 

321 self._los_factors[:, 0] = num.sin(self.theta) 

322 self._los_factors[:, 1] = num.cos(self.theta) * num.cos(self.phi) 

323 self._los_factors[:, 2] = num.cos(self.theta) * num.sin(self.phi) 

324 return self._los_factors 

325 

326 def post_process(self, engine, source, statics): 

327 return meta.SatelliteResult( 

328 result=statics, 

329 theta=self.theta, phi=self.phi) 

330 

331 

332class KiteSceneTarget(SatelliteTarget): 

333 

334 shape = Tuple.T( 

335 2, Int.T(), 

336 optional=False, 

337 help='Shape of the displacement vectors.') 

338 

339 def __init__(self, scene, **kwargs): 

340 size = scene.displacement.size 

341 

342 if scene.frame.spacing == 'meter': 

343 lats = num.full(size, scene.frame.llLat) 

344 lons = num.full(size, scene.frame.llLon) 

345 north_shifts = scene.frame.gridN.data.flatten() 

346 east_shifts = scene.frame.gridE.data.flatten() 

347 

348 elif scene.frame.spacing == 'degree': 

349 lats = scene.frame.gridN.data.flatten() + scene.frame.llLat 

350 lons = scene.frame.gridE.data.flatten() + scene.frame.llLon 

351 north_shifts = num.zeros(size) 

352 east_shifts = num.zeros(size) 

353 

354 self.scene = scene 

355 

356 super(KiteSceneTarget, self).__init__( 

357 lats=lats, lons=lons, 

358 north_shifts=north_shifts, east_shifts=east_shifts, 

359 theta=scene.theta.flatten(), 

360 phi=scene.phi.flatten(), 

361 shape=scene.shape, **kwargs) 

362 

363 def post_process(self, engine, source, statics): 

364 res = meta.KiteSceneResult( 

365 result=statics, 

366 theta=self.theta, phi=self.phi, 

367 shape=self.scene.shape) 

368 res.config = self.scene.config 

369 return res 

370 

371 

372class GNSSCampaignTarget(StaticTarget): 

373 

374 def post_process(self, engine, source, statics): 

375 campaign = gnss.GNSSCampaign() 

376 

377 for ista in range(self.ntargets): 

378 north = gnss.GNSSComponent( 

379 shift=float(statics['displacement.n'][ista])) 

380 east = gnss.GNSSComponent( 

381 shift=float(statics['displacement.e'][ista])) 

382 up = gnss.GNSSComponent( 

383 shift=-float(statics['displacement.d'][ista])) 

384 

385 coords = self.coords5 

386 station = gnss.GNSSStation( 

387 lat=float(coords[ista, 0]), 

388 lon=float(coords[ista, 1]), 

389 east_shift=float(coords[ista, 2]), 

390 north_shift=float(coords[ista, 3]), 

391 elevation=float(coords[ista, 4]), 

392 north=north, 

393 east=east, 

394 up=up) 

395 

396 campaign.add_station(station) 

397 

398 return meta.GNSSCampaignResult(result=statics, campaign=campaign)