Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/model/gnss.py: 86%

144 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-10 09:02 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7GNSS station and campaign data model. 

8''' 

9 

10import logging 

11import math 

12import numpy as num 

13from collections import OrderedDict 

14 

15import pyrocko.orthodrome as od 

16 

17from pyrocko.guts import (Object, Float, String, List, StringChoice, 

18 DateTimestamp) 

19from pyrocko.model import Location 

20 

21guts_prefix = 'pf.gnss' 

22logger = logging.getLogger('pyrocko.model.gnss') 

23 

24 

25class GNSSComponent(Object): 

26 ''' 

27 Component of a GNSSStation 

28 ''' 

29 unit = StringChoice.T( 

30 choices=['mm', 'cm', 'm'], 

31 help='Unit of displacement', 

32 default='m') 

33 

34 shift = Float.T( 

35 default=0., 

36 help="Component's shift in unit") 

37 

38 sigma = Float.T( 

39 default=0., 

40 help='One sigma uncertainty of the measurement') 

41 

42 def __add__(self, other): 

43 if not isinstance(other, self.__class__): 

44 raise AttributeError('Other has to be of instance %s' 

45 % self.__class__) 

46 comp = self.__class__() 

47 comp.shift = self.shift + other.shift 

48 comp.sigma = math.sqrt(self.sigma**2 + other.sigma**2) 

49 return comp 

50 

51 def __iadd__(self, other): 

52 self.shift += other.shift 

53 self.sigma = math.sqrt(self.sigma**2 + other.sigma**2) 

54 return self 

55 

56 

57class GNSSStation(Location): 

58 ''' 

59 Representation of a GNSS station during a campaign measurement 

60 

61 For more information see 

62 http://kb.unavco.org/kb/assets/660/UNAVCO_Campaign_GPS_GNSS_Handbook.pdf 

63 ''' 

64 

65 code = String.T( 

66 help='Four letter station code', 

67 optional=True) 

68 

69 style = StringChoice.T( 

70 choices=['static', 'rapid_static', 'kinematic'], 

71 default='static') 

72 

73 survey_start = DateTimestamp.T( 

74 optional=True, 

75 help='Survey start time') 

76 

77 survey_end = DateTimestamp.T( 

78 optional=True, 

79 help='Survey end time') 

80 

81 correlation_ne = Float.T( 

82 default=0., 

83 help='North-East component correlation') 

84 

85 correlation_eu = Float.T( 

86 default=0., 

87 help='East-Up component correlation') 

88 

89 correlation_nu = Float.T( 

90 default=0., 

91 help='North-Up component correlation') 

92 

93 north = GNSSComponent.T( 

94 optional=True) 

95 

96 east = GNSSComponent.T( 

97 optional=True) 

98 

99 up = GNSSComponent.T( 

100 optional=True) 

101 

102 def __eq__(self, other): 

103 try: 

104 return self.code == other.code 

105 except AttributeError: 

106 return False 

107 

108 def get_covariance_matrix(self): 

109 components = self.components.values() 

110 ncomponents = self.ncomponents 

111 

112 covar = num.zeros((ncomponents, ncomponents)) 

113 for ic1, comp1 in enumerate(components): 

114 for ic2, comp2 in enumerate(components): 

115 corr = self._get_comp_correlation(comp1, comp2) 

116 covar[ic1, ic2] = corr * comp1.sigma * comp2.sigma 

117 

118 # This floating point operation is inaccurate: 

119 # corr * comp1.sigma * comp2.sigma != corr * comp2.sigma * comp1.sigma 

120 # 

121 # Hence this identity 

122 covar[num.tril_indices_from(covar, k=-1)] = \ 

123 covar[num.triu_indices_from(covar, k=1)] 

124 

125 return covar 

126 

127 def get_correlation_matrix(self): 

128 components = self.components.values() 

129 ncomponents = self.ncomponents 

130 

131 corr = num.zeros((ncomponents, ncomponents)) 

132 corr[num.diag_indices_from(corr)] = num.array( 

133 [c.sigma for c in components]) 

134 

135 for ic1, comp1 in enumerate(components): 

136 for ic2, comp2 in enumerate(components): 

137 if comp1 is comp2: 

138 continue 

139 corr[ic1, ic2] = self._get_comp_correlation(comp1, comp2) 

140 

141 # See comment at get_covariance_matrix 

142 corr[num.tril_indices_from(corr, k=-1)] = \ 

143 corr[num.triu_indices_from(corr, k=1)] 

144 

145 return corr 

146 

147 def get_displacement_data(self): 

148 return num.array([c.shift for c in self.components.values()]) 

149 

150 def get_component_mask(self): 

151 return num.array( 

152 [False if self.__getattribute__(name) is None else True 

153 for name in ('north', 'east', 'up')], dtype=bool) 

154 

155 @property 

156 def components(self): 

157 return OrderedDict( 

158 [(name, self.__getattribute__(name)) 

159 for name in ('north', 'east', 'up') 

160 if self.__getattribute__(name) is not None]) 

161 

162 @property 

163 def ncomponents(self): 

164 return len(self.components) 

165 

166 def _get_comp_correlation(self, comp1, comp2): 

167 if comp1 is comp2: 

168 return 1. 

169 

170 s = self 

171 

172 correlation_map = { 

173 (s.north, s.east): s.correlation_ne, 

174 (s.east, s.up): s.correlation_eu, 

175 (s.north, s.up): s.correlation_nu 

176 } 

177 

178 return correlation_map.get( 

179 (comp1, comp2), 

180 correlation_map.get((comp2, comp1), False)) 

181 

182 

183class GNSSCampaign(Object): 

184 

185 stations = List.T( 

186 GNSSStation.T(), 

187 help='List of GNSS campaign measurements') 

188 

189 name = String.T( 

190 help='Campaign name', 

191 default='Unnamed campaign') 

192 

193 survey_start = DateTimestamp.T( 

194 optional=True) 

195 

196 survey_end = DateTimestamp.T( 

197 optional=True) 

198 

199 def __init__(self, *args, **kwargs): 

200 Object.__init__(self, *args, **kwargs) 

201 self._cov_mat = None 

202 self._cor_mat = None 

203 

204 def add_station(self, station): 

205 self._cor_mat = None 

206 self._cov_mat = None 

207 return self.stations.append(station) 

208 

209 def remove_station(self, station_code): 

210 try: 

211 station = self.get_station(station_code) 

212 self.stations.remove(station) 

213 self._cor_mat = None 

214 self._cov_mat = None 

215 except ValueError: 

216 logger.warning('Station {} does not exist in campaign, ' 

217 'do nothing.'.format(station_code)) 

218 

219 def get_station(self, station_code): 

220 for sta in self.stations: 

221 if sta.code == station_code: 

222 return sta 

223 raise ValueError('Could not find station %s' % station_code) 

224 

225 def get_center_latlon(self): 

226 return od.geographic_midpoint_locations(self.stations) 

227 

228 def get_radius(self): 

229 coords = self.coordinates 

230 return od.distance_accurate50m( 

231 coords[:, 0].min(), coords[:, 1].min(), 

232 coords[:, 0].max(), coords[:, 1].max()) / 2. 

233 

234 def get_covariance_matrix(self): 

235 if self._cov_mat is None: 

236 ncomponents = self.ncomponents 

237 cov_arr = num.zeros((ncomponents, ncomponents)) 

238 

239 idx = 0 

240 for ista, sta in enumerate(self.stations): 

241 ncomp = sta.ncomponents 

242 cov_arr[idx:idx+ncomp, idx:idx+ncomp] = \ 

243 sta.get_covariance_matrix() 

244 idx += ncomp 

245 

246 self._cov_mat = cov_arr 

247 return self._cov_mat 

248 

249 def get_correlation_matrix(self): 

250 if self._cor_mat is None: 

251 ncomponents = self.ncomponents 

252 cor_arr = num.zeros((ncomponents, ncomponents)) 

253 

254 idx = 0 

255 for ista, sta in enumerate(self.stations): 

256 ncomp = sta.ncomponents 

257 cor_arr[idx:idx+ncomp, idx:idx+ncomp] = \ 

258 sta.get_correlation_matrix() 

259 idx += ncomp 

260 

261 self._cor_mat = cor_arr 

262 return self._cor_mat 

263 

264 def get_component_mask(self): 

265 return num.concatenate( 

266 [s.get_component_mask() for s in self.stations]) 

267 

268 def dump(self, *args, **kwargs): 

269 self.regularize() 

270 return Object.dump(self, *args, **kwargs) 

271 

272 @property 

273 def coordinates(self): 

274 return num.array([loc.effective_latlon for loc in self.stations]) 

275 

276 @property 

277 def nstations(self): 

278 return len(self.stations) 

279 

280 @property 

281 def ncomponents(self): 

282 return sum([s.ncomponents for s in self.stations])