Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/gf/targets.py: 82%
159 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 06:59 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 06:59 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Data structures representing the receivers for seismogram synthesis.
8'''
10import numpy as num
11import math
13from . import meta
14from pyrocko.guts import Timestamp, Tuple, String, Float, Object,\
15 StringChoice, Int
16from pyrocko.guts_array import Array
17from pyrocko.model import gnss
18from pyrocko.orthodrome import distance_accurate50m_numpy
19from pyrocko.util import num_full_like, num_full
21d2r = num.pi / 180.
24class BadTarget(Exception):
25 pass
28class Filter(Object):
29 pass
32class OptimizationMethod(StringChoice):
33 choices = ['enable', 'disable']
36def component_orientation(source, target, component):
37 '''
38 Get component and azimuth for standard components R, T, Z, N, and E.
40 :param source: :py:class:`pyrocko.model.location.Location` object
41 :param target: :py:class:`pyrocko.model.location.Location` object
42 :param component: string ``'R'``, ``'T'``, ``'Z'``, ``'N'`` or ``'E'``
43 '''
45 _, bazi = source.azibazi_to(target)
47 azi, dip = {
48 'T': (bazi + 270., 0.),
49 'R': (bazi + 180., 0.),
50 'N': (0., 0.),
51 'E': (90., 0.),
52 'Z': (0., -90.)}[component]
54 return azi, dip
57class Target(meta.Receiver):
58 '''
59 A seismogram computation request for a single component, including
60 its post-processing parmeters.
61 '''
63 quantity = meta.QuantityType.T(
64 optional=True,
65 help='Measurement quantity type. If not given, it is guessed from the '
66 'channel code. For some common cases, derivatives of the stored '
67 'quantities are supported by using finite difference '
68 'approximations (e.g. displacement to velocity or acceleration). '
69 '4th order central FD schemes are used.')
71 codes = Tuple.T(
72 4, String.T(), default=('', 'STA', '', 'Z'),
73 help='network, station, location and channel codes to be set on '
74 'the response trace.')
76 elevation = Float.T(
77 default=0.0,
78 help='station surface elevation in [m]')
80 store_id = meta.StringID.T(
81 optional=True,
82 help="ID of Green's function store to use for the computation. "
83 'If not given, the processor may use a system default.')
85 sample_rate = Float.T(
86 optional=True,
87 help='sample rate to produce. '
88 "If not given the GF store's default sample rate is used. "
89 'GF store specific restrictions may apply.')
91 interpolation = meta.InterpolationMethod.T(
92 default='nearest_neighbor',
93 help="Interpolation method between Green's functions. Supported are"
94 ' ``nearest_neighbor`` and ``multilinear``')
96 optimization = OptimizationMethod.T(
97 default='enable',
98 optional=True,
99 help='disable/enable optimizations in weight-delay-and-sum operation')
101 tmin = Timestamp.T(
102 optional=True,
103 help='time of first sample to request in [s]. '
104 "If not given, it is determined from the Green's functions.")
106 tmax = Timestamp.T(
107 optional=True,
108 help='time of last sample to request in [s]. '
109 "If not given, it is determined from the Green's functions.")
111 azimuth = Float.T(
112 optional=True,
113 help='azimuth of sensor component in [deg], clockwise from north. '
114 'If not given, it is guessed from the channel code.')
116 dip = Float.T(
117 optional=True,
118 help='dip of sensor component in [deg], '
119 'measured downward from horizontal. '
120 'If not given, it is guessed from the channel code.')
122 filter = Filter.T(
123 optional=True,
124 help='frequency response filter.')
126 def base_key(self):
127 return (self.store_id, self.sample_rate, self.interpolation,
128 self.optimization,
129 self.tmin, self.tmax,
130 self.elevation, self.depth, self.north_shift, self.east_shift,
131 self.lat, self.lon)
133 def effective_quantity(self):
134 if self.quantity is not None:
135 return self.quantity
137 # guess from channel code
138 cha = self.codes[-1].upper()
139 if len(cha) == 3:
140 # use most common SEED conventions here, however units have to be
141 # guessed, because they are not uniquely defined by the conventions
142 if cha[-2] in 'HL': # high gain, low gain seismometer
143 return 'velocity'
144 if cha[-2] == 'N': # accelerometer
145 return 'acceleration'
146 if cha[-2] == 'D': # hydrophone, barometer, ...
147 return 'pressure'
148 if cha[-2] == 'A': # tiltmeter
149 return 'tilt'
150 elif len(cha) == 2:
151 if cha[-2] == 'U':
152 return 'displacement'
153 if cha[-2] == 'V':
154 return 'velocity'
155 elif len(cha) == 1:
156 if cha in 'NEZ':
157 return 'displacement'
158 if cha == 'P':
159 return 'pressure'
161 raise BadTarget('cannot guess measurement quantity type from channel '
162 'code "%s"' % cha)
164 def receiver(self, store):
165 rec = meta.Receiver(**dict(meta.Receiver.T.inamevals(self)))
166 return rec
168 def component_code(self):
169 cha = self.codes[-1].upper()
170 if cha:
171 return cha[-1]
172 else:
173 return ' '
175 def effective_azimuth(self):
176 if self.azimuth is not None:
177 return self.azimuth
178 elif self.component_code() in 'NEZ':
179 return {'N': 0., 'E': 90., 'Z': 0.}[self.component_code()]
181 raise BadTarget('cannot determine sensor component azimuth for '
182 '%s.%s.%s.%s' % self.codes)
184 def effective_dip(self):
185 if self.dip is not None:
186 return self.dip
187 elif self.component_code() in 'NEZ':
188 return {'N': 0., 'E': 0., 'Z': -90.}[self.component_code()]
190 raise BadTarget('cannot determine sensor component dip')
192 def get_sin_cos_factors(self):
193 azi = self.effective_azimuth()
194 dip = self.effective_dip()
195 sa = math.sin(azi*d2r)
196 ca = math.cos(azi*d2r)
197 sd = math.sin(dip*d2r)
198 cd = math.cos(dip*d2r)
199 return sa, ca, sd, cd
201 def get_factor(self):
202 return 1.0
204 def post_process(self, engine, source, tr):
205 return meta.Result(trace=tr)
208class StaticTarget(meta.MultiLocation):
209 '''
210 A computation request for a spatial multi-location target of
211 static/geodetic quantities.
212 '''
213 quantity = meta.QuantityType.T(
214 optional=True,
215 default='displacement',
216 help='Measurement quantity type, for now only `displacement` is'
217 'supported.')
219 interpolation = meta.InterpolationMethod.T(
220 default='nearest_neighbor',
221 help="Interpolation method between Green's functions. Supported are"
222 ' ``nearest_neighbor`` and ``multilinear``')
224 tsnapshot = Timestamp.T(
225 optional=True,
226 help='time of the desired snapshot in [s], '
227 'If not given, the first sample is taken. If the desired sample'
228 " exceeds the length of the Green's function store,"
229 ' the last sample is taken.')
231 store_id = meta.StringID.T(
232 optional=True,
233 help="ID of Green's function store to use for the computation. "
234 'If not given, the processor may use a system default.')
236 def base_key(self):
237 return (self.store_id,
238 self.coords5.shape,
239 self.quantity,
240 self.tsnapshot,
241 self.interpolation)
243 @property
244 def ntargets(self):
245 '''
246 Number of targets held by instance.
247 '''
248 return self.ncoords
250 def get_targets(self):
251 '''
252 Discretizes the multilocation target into a list of
253 :py:class:`Target` objects.
255 :returns:
256 Individual point locations.
257 :rtype:
258 :py:class:`list` of :py:class:`Target`
259 '''
260 targets = []
261 for i in range(self.ntargets):
262 targets.append(
263 Target(
264 lat=float(self.coords5[i, 0]),
265 lon=float(self.coords5[i, 1]),
266 north_shift=float(self.coords5[i, 2]),
267 east_shift=float(self.coords5[i, 3]),
268 elevation=float(self.coords5[i, 4])))
269 return targets
271 def distance_to(self, source):
272 src_lats = num_full_like(self.lats, fill_value=source.lat)
273 src_lons = num_full_like(self.lons, fill_value=source.lon)
275 target_coords = self.get_latlon()
276 target_lats = target_coords[:, 0]
277 target_lons = target_coords[:, 1]
278 return distance_accurate50m_numpy(
279 src_lats, src_lons, target_lats, target_lons)
281 def post_process(self, engine, source, statics):
282 return meta.StaticResult(result=statics)
285class SatelliteTarget(StaticTarget):
286 '''
287 A computation request for a spatial multi-location target of
288 static/geodetic quantities measured from a satellite instrument.
289 The line of sight angles are provided and projecting
290 post-processing is applied.
291 '''
292 theta = Array.T(
293 shape=(None,),
294 dtype=float,
295 serialize_as='base64-compat',
296 help="Horizontal angle towards satellite's line of sight in radians."
297 '\n\n .. important::\n\n'
298 ' :math:`0` is **east** and'
299 ' :math:`\\frac{\\pi}{2}` is **north**.\n\n')
301 phi = Array.T(
302 shape=(None,),
303 dtype=float,
304 serialize_as='base64-compat',
305 help='Theta is look vector elevation angle towards satellite from'
306 " horizon in radians. Matrix of theta towards satellite's"
307 ' line of sight.'
308 '\n\n .. important::\n\n'
309 ' :math:`-\\frac{\\pi}{2}` is **down** and'
310 ' :math:`\\frac{\\pi}{2}` is **up**.\n\n')
312 def __init__(self, *args, **kwargs):
313 super(SatelliteTarget, self).__init__(*args, **kwargs)
314 self._los_factors = None
316 def get_los_factors(self):
317 if (self.theta.size != self.phi.size != self.lats.size):
318 raise AttributeError('LOS angles inconsistent with provided'
319 ' coordinate shape.')
320 if self._los_factors is None:
321 self._los_factors = num.empty((self.theta.shape[0], 3))
322 self._los_factors[:, 0] = num.sin(self.theta)
323 self._los_factors[:, 1] = num.cos(self.theta) * num.cos(self.phi)
324 self._los_factors[:, 2] = num.cos(self.theta) * num.sin(self.phi)
325 return self._los_factors
327 def post_process(self, engine, source, statics):
328 return meta.SatelliteResult(
329 result=statics,
330 theta=self.theta, phi=self.phi)
333class KiteSceneTarget(SatelliteTarget):
335 shape = Tuple.T(
336 2, Int.T(),
337 optional=False,
338 help='Shape of the displacement vectors.')
340 def __init__(self, scene, **kwargs):
341 size = scene.displacement.size
343 if scene.frame.spacing == 'meter':
344 lats = num_full(size, scene.frame.llLat)
345 lons = num_full(size, scene.frame.llLon)
346 north_shifts = scene.frame.gridN.data.flatten()
347 east_shifts = scene.frame.gridE.data.flatten()
349 elif scene.frame.spacing == 'degree':
350 lats = scene.frame.gridN.data.flatten() + scene.frame.llLat
351 lons = scene.frame.gridE.data.flatten() + scene.frame.llLon
352 north_shifts = num.zeros(size)
353 east_shifts = num.zeros(size)
355 self.scene = scene
357 super(KiteSceneTarget, self).__init__(
358 lats=lats, lons=lons,
359 north_shifts=north_shifts, east_shifts=east_shifts,
360 theta=scene.theta.flatten(),
361 phi=scene.phi.flatten(),
362 shape=scene.shape, **kwargs)
364 def post_process(self, engine, source, statics):
365 res = meta.KiteSceneResult(
366 result=statics,
367 theta=self.theta, phi=self.phi,
368 shape=self.scene.shape)
369 res.config = self.scene.config
370 return res
373class GNSSCampaignTarget(StaticTarget):
375 def post_process(self, engine, source, statics):
376 campaign = gnss.GNSSCampaign()
378 for ista in range(self.ntargets):
379 north = gnss.GNSSComponent(
380 shift=float(statics['displacement.n'][ista]))
381 east = gnss.GNSSComponent(
382 shift=float(statics['displacement.e'][ista]))
383 up = gnss.GNSSComponent(
384 shift=-float(statics['displacement.d'][ista]))
386 coords = self.coords5
387 station = gnss.GNSSStation(
388 lat=float(coords[ista, 0]),
389 lon=float(coords[ista, 1]),
390 east_shift=float(coords[ista, 2]),
391 north_shift=float(coords[ista, 3]),
392 elevation=float(coords[ista, 4]),
393 north=north,
394 east=east,
395 up=up)
397 campaign.add_station(station)
399 return meta.GNSSCampaignResult(result=statics, campaign=campaign)