1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import logging
7import math
8import numpy as num
9from collections import OrderedDict
11import pyrocko.orthodrome as od
13from pyrocko.guts import (Object, Float, String, List, StringChoice,
14 DateTimestamp)
15from pyrocko.model import Location
17guts_prefix = 'pf.gnss'
18logger = logging.getLogger('pyrocko.model.gnss')
21class GNSSComponent(Object):
22 '''
23 Component of a GNSSStation
24 '''
25 unit = StringChoice.T(
26 choices=['mm', 'cm', 'm'],
27 help='Unit of displacement',
28 default='m')
30 shift = Float.T(
31 default=0.,
32 help='Component\'s shift in unit')
34 sigma = Float.T(
35 default=0.,
36 help='One sigma uncertainty of the measurement')
38 def __add__(self, other):
39 if not isinstance(other, self.__class__):
40 raise AttributeError('Other has to be of instance %s'
41 % self.__class__)
42 comp = self.__class__()
43 comp.shift = self.shift + other.shift
44 comp.sigma = math.sqrt(self.sigma**2 + other.sigma**2)
45 return comp
47 def __iadd__(self, other):
48 self.shift += other.shift
49 self.sigma = math.sqrt(self.sigma**2 + other.sigma**2)
50 return self
53class GNSSStation(Location):
54 '''
55 Representation of a GNSS station during a campaign measurement
57 For more information see
58 http://kb.unavco.org/kb/assets/660/UNAVCO_Campaign_GPS_GNSS_Handbook.pdf
59 '''
61 code = String.T(
62 help='Four letter station code',
63 optional=True)
65 style = StringChoice.T(
66 choices=['static', 'rapid_static', 'kinematic'],
67 default='static')
69 survey_start = DateTimestamp.T(
70 optional=True,
71 help='Survey start time')
73 survey_end = DateTimestamp.T(
74 optional=True,
75 help='Survey end time')
77 correlation_ne = Float.T(
78 default=0.,
79 help='North-East component correlation')
81 correlation_eu = Float.T(
82 default=0.,
83 help='East-Up component correlation')
85 correlation_nu = Float.T(
86 default=0.,
87 help='North-Up component correlation')
89 north = GNSSComponent.T(
90 optional=True)
92 east = GNSSComponent.T(
93 optional=True)
95 up = GNSSComponent.T(
96 optional=True)
98 def __eq__(self, other):
99 try:
100 return self.code == other.code
101 except AttributeError:
102 return False
104 def get_covariance_matrix(self):
105 components = self.components.values()
106 ncomponents = self.ncomponents
108 covar = num.zeros((ncomponents, ncomponents))
109 for ic1, comp1 in enumerate(components):
110 for ic2, comp2 in enumerate(components):
111 corr = self._get_comp_correlation(comp1, comp2)
112 covar[ic1, ic2] = corr * comp1.sigma * comp2.sigma
114 # This floating point operation is inaccurate:
115 # corr * comp1.sigma * comp2.sigma != corr * comp2.sigma * comp1.sigma
116 #
117 # Hence this identity
118 covar[num.tril_indices_from(covar, k=-1)] = \
119 covar[num.triu_indices_from(covar, k=1)]
121 return covar
123 def get_correlation_matrix(self):
124 components = self.components.values()
125 ncomponents = self.ncomponents
127 corr = num.zeros((ncomponents, ncomponents))
128 corr[num.diag_indices_from(corr)] = num.array(
129 [c.sigma for c in components])
131 for ic1, comp1 in enumerate(components):
132 for ic2, comp2 in enumerate(components):
133 if comp1 is comp2:
134 continue
135 corr[ic1, ic2] = self._get_comp_correlation(comp1, comp2)
137 # See comment at get_covariance_matrix
138 corr[num.tril_indices_from(corr, k=-1)] = \
139 corr[num.triu_indices_from(corr, k=1)]
141 return corr
143 def get_displacement_data(self):
144 return num.array([c.shift for c in self.components.values()])
146 def get_component_mask(self):
147 return num.array(
148 [False if self.__getattribute__(name) is None else True
149 for name in ('north', 'east', 'up')], dtype=num.bool)
151 @property
152 def components(self):
153 return OrderedDict(
154 [(name, self.__getattribute__(name))
155 for name in ('north', 'east', 'up')
156 if self.__getattribute__(name) is not None])
158 @property
159 def ncomponents(self):
160 return len(self.components)
162 def _get_comp_correlation(self, comp1, comp2):
163 if comp1 is comp2:
164 return 1.
166 s = self
168 correlation_map = {
169 (s.north, s.east): s.correlation_ne,
170 (s.east, s.up): s.correlation_eu,
171 (s.north, s.up): s.correlation_nu
172 }
174 return correlation_map.get(
175 (comp1, comp2),
176 correlation_map.get((comp2, comp1), False))
179class GNSSCampaign(Object):
181 stations = List.T(
182 GNSSStation.T(),
183 help='List of GNSS campaign measurements')
185 name = String.T(
186 help='Campaign name',
187 default='Unnamed campaign')
189 survey_start = DateTimestamp.T(
190 optional=True)
192 survey_end = DateTimestamp.T(
193 optional=True)
195 def __init__(self, *args, **kwargs):
196 Object.__init__(self, *args, **kwargs)
197 self._cov_mat = None
198 self._cor_mat = None
200 def add_station(self, station):
201 self._cor_mat = None
202 self._cov_mat = None
203 return self.stations.append(station)
205 def remove_station(self, station_code):
206 try:
207 station = self.get_station(station_code)
208 self.stations.remove(station)
209 self._cor_mat = None
210 self._cov_mat = None
211 except ValueError:
212 logger.warn('Station {} does not exist in campaign, '
213 'do nothing.'.format(station_code))
215 def get_station(self, station_code):
216 for sta in self.stations:
217 if sta.code == station_code:
218 return sta
219 raise ValueError('Could not find station %s' % station_code)
221 def get_center_latlon(self):
222 return od.geographic_midpoint_locations(self.stations)
224 def get_radius(self):
225 coords = self.coordinates
226 return od.distance_accurate50m(
227 coords[:, 0].min(), coords[:, 1].min(),
228 coords[:, 0].max(), coords[:, 1].max()) / 2.
230 def get_covariance_matrix(self):
231 if self._cov_mat is None:
232 ncomponents = self.ncomponents
233 cov_arr = num.zeros((ncomponents, ncomponents))
235 idx = 0
236 for ista, sta in enumerate(self.stations):
237 ncomp = sta.ncomponents
238 cov_arr[idx:idx+ncomp, idx:idx+ncomp] = \
239 sta.get_covariance_matrix()
240 idx += ncomp
242 self._cov_mat = cov_arr
243 return self._cov_mat
245 def get_correlation_matrix(self):
246 if self._cor_mat is None:
247 ncomponents = self.ncomponents
248 cor_arr = num.zeros((ncomponents, ncomponents))
250 idx = 0
251 for ista, sta in enumerate(self.stations):
252 ncomp = sta.ncomponents
253 cor_arr[idx:idx+ncomp, idx:idx+ncomp] = \
254 sta.get_correlation_matrix()
255 idx += ncomp
257 self._cor_mat = cor_arr
258 return self._cor_mat
260 def get_component_mask(self):
261 return num.concatenate(
262 [s.get_component_mask() for s in self.stations])
264 def dump(self, *args, **kwargs):
265 self.regularize()
266 return Object.dump(self, *args, **kwargs)
268 @property
269 def coordinates(self):
270 return num.array([loc.effective_latlon for loc in self.stations])
272 @property
273 def nstations(self):
274 return len(self.stations)
276 @property
277 def ncomponents(self):
278 return sum([s.ncomponents for s in self.stations])