Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/model/gnss.py: 86%
144 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-03-07 11:54 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-03-07 11:54 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7GNSS station and campaign data model.
8'''
10import logging
11import math
12import numpy as num
13from collections import OrderedDict
15import pyrocko.orthodrome as od
17from pyrocko.guts import (Object, Float, String, List, StringChoice,
18 DateTimestamp)
19from pyrocko.model import Location
21guts_prefix = 'pf.gnss'
22logger = logging.getLogger('pyrocko.model.gnss')
25class GNSSComponent(Object):
26 '''
27 Component of a GNSSStation
28 '''
29 unit = StringChoice.T(
30 choices=['mm', 'cm', 'm'],
31 help='Unit of displacement',
32 default='m')
34 shift = Float.T(
35 default=0.,
36 help="Component's shift in unit")
38 sigma = Float.T(
39 default=0.,
40 help='One sigma uncertainty of the measurement')
42 def __add__(self, other):
43 if not isinstance(other, self.__class__):
44 raise AttributeError('Other has to be of instance %s'
45 % self.__class__)
46 comp = self.__class__()
47 comp.shift = self.shift + other.shift
48 comp.sigma = math.sqrt(self.sigma**2 + other.sigma**2)
49 return comp
51 def __iadd__(self, other):
52 self.shift += other.shift
53 self.sigma = math.sqrt(self.sigma**2 + other.sigma**2)
54 return self
57class GNSSStation(Location):
58 '''
59 Representation of a GNSS station during a campaign measurement
61 For more information see
62 http://kb.unavco.org/kb/assets/660/UNAVCO_Campaign_GPS_GNSS_Handbook.pdf
63 '''
65 code = String.T(
66 help='Four letter station code',
67 optional=True)
69 style = StringChoice.T(
70 choices=['static', 'rapid_static', 'kinematic'],
71 default='static')
73 survey_start = DateTimestamp.T(
74 optional=True,
75 help='Survey start time')
77 survey_end = DateTimestamp.T(
78 optional=True,
79 help='Survey end time')
81 correlation_ne = Float.T(
82 default=0.,
83 help='North-East component correlation')
85 correlation_eu = Float.T(
86 default=0.,
87 help='East-Up component correlation')
89 correlation_nu = Float.T(
90 default=0.,
91 help='North-Up component correlation')
93 north = GNSSComponent.T(
94 optional=True)
96 east = GNSSComponent.T(
97 optional=True)
99 up = GNSSComponent.T(
100 optional=True)
102 def __eq__(self, other):
103 try:
104 return self.code == other.code
105 except AttributeError:
106 return False
108 def get_covariance_matrix(self):
109 components = self.components.values()
110 ncomponents = self.ncomponents
112 covar = num.zeros((ncomponents, ncomponents))
113 for ic1, comp1 in enumerate(components):
114 for ic2, comp2 in enumerate(components):
115 corr = self._get_comp_correlation(comp1, comp2)
116 covar[ic1, ic2] = corr * comp1.sigma * comp2.sigma
118 # This floating point operation is inaccurate:
119 # corr * comp1.sigma * comp2.sigma != corr * comp2.sigma * comp1.sigma
120 #
121 # Hence this identity
122 covar[num.tril_indices_from(covar, k=-1)] = \
123 covar[num.triu_indices_from(covar, k=1)]
125 return covar
127 def get_correlation_matrix(self):
128 components = self.components.values()
129 ncomponents = self.ncomponents
131 corr = num.zeros((ncomponents, ncomponents))
132 corr[num.diag_indices_from(corr)] = num.array(
133 [c.sigma for c in components])
135 for ic1, comp1 in enumerate(components):
136 for ic2, comp2 in enumerate(components):
137 if comp1 is comp2:
138 continue
139 corr[ic1, ic2] = self._get_comp_correlation(comp1, comp2)
141 # See comment at get_covariance_matrix
142 corr[num.tril_indices_from(corr, k=-1)] = \
143 corr[num.triu_indices_from(corr, k=1)]
145 return corr
147 def get_displacement_data(self):
148 return num.array([c.shift for c in self.components.values()])
150 def get_component_mask(self):
151 return num.array(
152 [False if self.__getattribute__(name) is None else True
153 for name in ('north', 'east', 'up')], dtype=bool)
155 @property
156 def components(self):
157 return OrderedDict(
158 [(name, self.__getattribute__(name))
159 for name in ('north', 'east', 'up')
160 if self.__getattribute__(name) is not None])
162 @property
163 def ncomponents(self):
164 return len(self.components)
166 def _get_comp_correlation(self, comp1, comp2):
167 if comp1 is comp2:
168 return 1.
170 s = self
172 correlation_map = {
173 (s.north, s.east): s.correlation_ne,
174 (s.east, s.up): s.correlation_eu,
175 (s.north, s.up): s.correlation_nu
176 }
178 return correlation_map.get(
179 (comp1, comp2),
180 correlation_map.get((comp2, comp1), False))
183class GNSSCampaign(Object):
185 stations = List.T(
186 GNSSStation.T(),
187 help='List of GNSS campaign measurements')
189 name = String.T(
190 help='Campaign name',
191 default='Unnamed campaign')
193 survey_start = DateTimestamp.T(
194 optional=True)
196 survey_end = DateTimestamp.T(
197 optional=True)
199 def __init__(self, *args, **kwargs):
200 Object.__init__(self, *args, **kwargs)
201 self._cov_mat = None
202 self._cor_mat = None
204 def add_station(self, station):
205 self._cor_mat = None
206 self._cov_mat = None
207 return self.stations.append(station)
209 def remove_station(self, station_code):
210 try:
211 station = self.get_station(station_code)
212 self.stations.remove(station)
213 self._cor_mat = None
214 self._cov_mat = None
215 except ValueError:
216 logger.warning('Station {} does not exist in campaign, '
217 'do nothing.'.format(station_code))
219 def get_station(self, station_code):
220 for sta in self.stations:
221 if sta.code == station_code:
222 return sta
223 raise ValueError('Could not find station %s' % station_code)
225 def get_center_latlon(self):
226 return od.geographic_midpoint_locations(self.stations)
228 def get_radius(self):
229 coords = self.coordinates
230 return od.distance_accurate50m(
231 coords[:, 0].min(), coords[:, 1].min(),
232 coords[:, 0].max(), coords[:, 1].max()) / 2.
234 def get_covariance_matrix(self):
235 if self._cov_mat is None:
236 ncomponents = self.ncomponents
237 cov_arr = num.zeros((ncomponents, ncomponents))
239 idx = 0
240 for ista, sta in enumerate(self.stations):
241 ncomp = sta.ncomponents
242 cov_arr[idx:idx+ncomp, idx:idx+ncomp] = \
243 sta.get_covariance_matrix()
244 idx += ncomp
246 self._cov_mat = cov_arr
247 return self._cov_mat
249 def get_correlation_matrix(self):
250 if self._cor_mat is None:
251 ncomponents = self.ncomponents
252 cor_arr = num.zeros((ncomponents, ncomponents))
254 idx = 0
255 for ista, sta in enumerate(self.stations):
256 ncomp = sta.ncomponents
257 cor_arr[idx:idx+ncomp, idx:idx+ncomp] = \
258 sta.get_correlation_matrix()
259 idx += ncomp
261 self._cor_mat = cor_arr
262 return self._cor_mat
264 def get_component_mask(self):
265 return num.concatenate(
266 [s.get_component_mask() for s in self.stations])
268 def dump(self, *args, **kwargs):
269 self.regularize()
270 return Object.dump(self, *args, **kwargs)
272 @property
273 def coordinates(self):
274 return num.array([loc.effective_latlon for loc in self.stations])
276 @property
277 def nstations(self):
278 return len(self.stations)
280 @property
281 def ncomponents(self):
282 return sum([s.ncomponents for s in self.stations])