1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import numpy as num
7import math
9from . import meta
10from pyrocko.guts import Timestamp, Tuple, String, Float, Object,\
11 StringChoice, Int
12from pyrocko.guts_array import Array
13from pyrocko.model import gnss
14from pyrocko.orthodrome import distance_accurate50m_numpy
15from pyrocko.util import num_full_like, num_full
17d2r = num.pi / 180.
20class BadTarget(Exception):
21 pass
24class Filter(Object):
25 pass
28class OptimizationMethod(StringChoice):
29 choices = ['enable', 'disable']
32def component_orientation(source, target, component):
33 '''
34 Get component and azimuth for standard components R, T, Z, N, and E.
36 :param source: :py:class:`pyrocko.gf.Location` object
37 :param target: :py:class:`pyrocko.gf.Location` object
38 :param component: string ``'R'``, ``'T'``, ``'Z'``, ``'N'`` or ``'E'``
39 '''
41 _, bazi = source.azibazi_to(target)
43 azi, dip = {
44 'T': (bazi + 270., 0.),
45 'R': (bazi + 180., 0.),
46 'N': (0., 0.),
47 'E': (90., 0.),
48 'Z': (0., -90.)}[component]
50 return azi, dip
53class Target(meta.Receiver):
54 '''
55 A seismogram computation request for a single component, including
56 its post-processing parmeters.
57 '''
59 quantity = meta.QuantityType.T(
60 optional=True,
61 help='Measurement quantity type. If not given, it is guessed from the '
62 'channel code. For some common cases, derivatives of the stored '
63 'quantities are supported by using finite difference '
64 'approximations (e.g. displacement to velocity or acceleration). '
65 '4th order central FD schemes are used.')
67 codes = Tuple.T(
68 4, String.T(), default=('', 'STA', '', 'Z'),
69 help='network, station, location and channel codes to be set on '
70 'the response trace.')
72 elevation = Float.T(
73 default=0.0,
74 help='station surface elevation in [m]')
76 store_id = meta.StringID.T(
77 optional=True,
78 help='ID of Green\'s function store to use for the computation. '
79 'If not given, the processor may use a system default.')
81 sample_rate = Float.T(
82 optional=True,
83 help='sample rate to produce. '
84 'If not given the GF store\'s default sample rate is used. '
85 'GF store specific restrictions may apply.')
87 interpolation = meta.InterpolationMethod.T(
88 default='nearest_neighbor',
89 help='Interpolation method between Green\'s functions. Supported are'
90 ' ``nearest_neighbor`` and ``multilinear``')
92 optimization = OptimizationMethod.T(
93 default='enable',
94 optional=True,
95 help='disable/enable optimizations in weight-delay-and-sum operation')
97 tmin = Timestamp.T(
98 optional=True,
99 help='time of first sample to request in [s]. '
100 'If not given, it is determined from the Green\'s functions.')
102 tmax = Timestamp.T(
103 optional=True,
104 help='time of last sample to request in [s]. '
105 'If not given, it is determined from the Green\'s functions.')
107 azimuth = Float.T(
108 optional=True,
109 help='azimuth of sensor component in [deg], clockwise from north. '
110 'If not given, it is guessed from the channel code.')
112 dip = Float.T(
113 optional=True,
114 help='dip of sensor component in [deg], '
115 'measured downward from horizontal. '
116 'If not given, it is guessed from the channel code.')
118 filter = Filter.T(
119 optional=True,
120 help='frequency response filter.')
122 def base_key(self):
123 return (self.store_id, self.sample_rate, self.interpolation,
124 self.optimization,
125 self.tmin, self.tmax,
126 self.elevation, self.depth, self.north_shift, self.east_shift,
127 self.lat, self.lon)
129 def effective_quantity(self):
130 if self.quantity is not None:
131 return self.quantity
133 # guess from channel code
134 cha = self.codes[-1].upper()
135 if len(cha) == 3:
136 # use most common SEED conventions here, however units have to be
137 # guessed, because they are not uniquely defined by the conventions
138 if cha[-2] in 'HL': # high gain, low gain seismometer
139 return 'velocity'
140 if cha[-2] == 'N': # accelerometer
141 return 'acceleration'
142 if cha[-2] == 'D': # hydrophone, barometer, ...
143 return 'pressure'
144 if cha[-2] == 'A': # tiltmeter
145 return 'tilt'
146 elif len(cha) == 2:
147 if cha[-2] == 'U':
148 return 'displacement'
149 if cha[-2] == 'V':
150 return 'velocity'
151 elif len(cha) == 1:
152 if cha in 'NEZ':
153 return 'displacement'
154 if cha == 'P':
155 return 'pressure'
157 raise BadTarget('cannot guess measurement quantity type from channel '
158 'code "%s"' % cha)
160 def receiver(self, store):
161 rec = meta.Receiver(**dict(meta.Receiver.T.inamevals(self)))
162 return rec
164 def component_code(self):
165 cha = self.codes[-1].upper()
166 if cha:
167 return cha[-1]
168 else:
169 return ' '
171 def effective_azimuth(self):
172 if self.azimuth is not None:
173 return self.azimuth
174 elif self.component_code() in 'NEZ':
175 return {'N': 0., 'E': 90., 'Z': 0.}[self.component_code()]
177 raise BadTarget('cannot determine sensor component azimuth for '
178 '%s.%s.%s.%s' % self.codes)
180 def effective_dip(self):
181 if self.dip is not None:
182 return self.dip
183 elif self.component_code() in 'NEZ':
184 return {'N': 0., 'E': 0., 'Z': -90.}[self.component_code()]
186 raise BadTarget('cannot determine sensor component dip')
188 def get_sin_cos_factors(self):
189 azi = self.effective_azimuth()
190 dip = self.effective_dip()
191 sa = math.sin(azi*d2r)
192 ca = math.cos(azi*d2r)
193 sd = math.sin(dip*d2r)
194 cd = math.cos(dip*d2r)
195 return sa, ca, sd, cd
197 def get_factor(self):
198 return 1.0
200 def post_process(self, engine, source, tr):
201 return meta.Result(trace=tr)
204class StaticTarget(meta.MultiLocation):
205 '''
206 A computation request for a spatial multi-location target of
207 static/geodetic quantities.
208 '''
209 quantity = meta.QuantityType.T(
210 optional=True,
211 default='displacement',
212 help='Measurement quantity type, for now only `displacement` is'
213 'supported.')
215 interpolation = meta.InterpolationMethod.T(
216 default='nearest_neighbor',
217 help='Interpolation method between Green\'s functions. Supported are'
218 ' ``nearest_neighbor`` and ``multilinear``')
220 tsnapshot = Timestamp.T(
221 optional=True,
222 help='time of the desired snapshot in [s], '
223 'If not given, the first sample is taken. If the desired sample'
224 ' exceeds the length of the Green\'s function store,'
225 ' the last sample is taken.')
227 store_id = meta.StringID.T(
228 optional=True,
229 help='ID of Green\'s function store to use for the computation. '
230 'If not given, the processor may use a system default.')
232 def base_key(self):
233 return (self.store_id,
234 self.coords5.shape,
235 self.quantity,
236 self.tsnapshot,
237 self.interpolation)
239 @property
240 def ntargets(self):
241 '''
242 Number of targets held by instance.
243 '''
244 return self.ncoords
246 def get_targets(self):
247 '''
248 Discretizes the multilocation target into a list of
249 :class:`Target:`
251 :returns: :class:`Target`
252 :rtype: list
253 '''
254 targets = []
255 for i in range(self.ntargets):
256 targets.append(
257 Target(
258 lat=float(self.coords5[i, 0]),
259 lon=float(self.coords5[i, 1]),
260 north_shift=float(self.coords5[i, 2]),
261 east_shift=float(self.coords5[i, 3]),
262 elevation=float(self.coords5[i, 4])))
263 return targets
265 def distance_to(self, source):
266 src_lats = num_full_like(self.lats, fill_value=source.lat)
267 src_lons = num_full_like(self.lons, fill_value=source.lon)
269 target_coords = self.get_latlon()
270 target_lats = target_coords[:, 0]
271 target_lons = target_coords[:, 1]
272 return distance_accurate50m_numpy(
273 src_lats, src_lons, target_lats, target_lons)
275 def post_process(self, engine, source, statics):
276 return meta.StaticResult(result=statics)
279class SatelliteTarget(StaticTarget):
280 '''
281 A computation request for a spatial multi-location target of
282 static/geodetic quantities measured from a satellite instrument.
283 The line of sight angles are provided and projecting
284 post-processing is applied.
285 '''
286 theta = Array.T(
287 shape=(None,),
288 dtype=float,
289 serialize_as='base64-compat',
290 help='Horizontal angle towards satellite\'s line of sight in radians.'
291 '\n\n .. important::\n\n'
292 ' :math:`0` is **east** and'
293 ' :math:`\\frac{\\pi}{2}` is **north**.\n\n')
295 phi = Array.T(
296 shape=(None,),
297 dtype=float,
298 serialize_as='base64-compat',
299 help='Theta is look vector elevation angle towards satellite from'
300 ' horizon in radians. Matrix of theta towards satellite\'s'
301 ' line of sight.'
302 '\n\n .. important::\n\n'
303 ' :math:`-\\frac{\\pi}{2}` is **down** and'
304 ' :math:`\\frac{\\pi}{2}` is **up**.\n\n')
306 def __init__(self, *args, **kwargs):
307 super(SatelliteTarget, self).__init__(*args, **kwargs)
308 self._los_factors = None
310 def get_los_factors(self):
311 if (self.theta.size != self.phi.size != self.lats.size):
312 raise AttributeError('LOS angles inconsistent with provided'
313 ' coordinate shape.')
314 if self._los_factors is None:
315 self._los_factors = num.empty((self.theta.shape[0], 3))
316 self._los_factors[:, 0] = num.sin(self.theta)
317 self._los_factors[:, 1] = num.cos(self.theta) * num.cos(self.phi)
318 self._los_factors[:, 2] = num.cos(self.theta) * num.sin(self.phi)
319 return self._los_factors
321 def post_process(self, engine, source, statics):
322 return meta.SatelliteResult(
323 result=statics,
324 theta=self.theta, phi=self.phi)
327class KiteSceneTarget(SatelliteTarget):
329 shape = Tuple.T(
330 2, Int.T(),
331 optional=False,
332 help='Shape of the displacement vectors.')
334 def __init__(self, scene, **kwargs):
335 size = scene.displacement.size
337 if scene.frame.spacing == 'meter':
338 lats = num_full(size, scene.frame.llLat)
339 lons = num_full(size, scene.frame.llLon)
340 north_shifts = scene.frame.gridN.data.flatten()
341 east_shifts = scene.frame.gridE.data.flatten()
343 elif scene.frame.spacing == 'degree':
344 lats = scene.frame.gridN.data.flatten() + scene.frame.llLat
345 lons = scene.frame.gridE.data.flatten() + scene.frame.llLon
346 north_shifts = num.zeros(size)
347 east_shifts = num.zeros(size)
349 self.scene = scene
351 super(KiteSceneTarget, self).__init__(
352 lats=lats, lons=lons,
353 north_shifts=north_shifts, east_shifts=east_shifts,
354 theta=scene.theta.flatten(),
355 phi=scene.phi.flatten(),
356 shape=scene.shape, **kwargs)
358 def post_process(self, engine, source, statics):
359 res = meta.KiteSceneResult(
360 result=statics,
361 theta=self.theta, phi=self.phi,
362 shape=self.scene.shape)
363 res.config = self.scene.config
364 return res
367class GNSSCampaignTarget(StaticTarget):
369 def post_process(self, engine, source, statics):
370 campaign = gnss.GNSSCampaign()
372 for ista in range(self.ntargets):
373 north = gnss.GNSSComponent(
374 shift=float(statics['displacement.n'][ista]))
375 east = gnss.GNSSComponent(
376 shift=float(statics['displacement.e'][ista]))
377 up = gnss.GNSSComponent(
378 shift=-float(statics['displacement.d'][ista]))
380 coords = self.coords5
381 station = gnss.GNSSStation(
382 lat=float(coords[ista, 0]),
383 lon=float(coords[ista, 1]),
384 east_shift=float(coords[ista, 2]),
385 north_shift=float(coords[ista, 3]),
386 elevation=float(coords[ista, 4]),
387 north=north,
388 east=east,
389 up=up)
391 campaign.add_station(station)
393 return meta.GNSSCampaignResult(result=statics, campaign=campaign)