Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/model/station.py: 85%

291 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-04 09:52 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Simple representation of a seismic station (Pyrocko classic). 

8 

9.. note:: 

10 

11 An improved data model for seismic stations is available in 

12 :py:mod:`pyrocko.squirrel.model`. This module will stay available for 

13 backward compatibility. Conversion from Squirrel-based to Pyrocko 

14 classic can be achieved with 

15 :py:meth:`pyrocko.squirrel.model.Station.get_pyrocko_station`. ''' 

16 

17import math 

18import copy 

19import logging 

20import numpy as num 

21 

22from pyrocko import orthodrome 

23from pyrocko.orthodrome import wrap 

24from pyrocko.guts import Object, Float, String, List, dump_all 

25 

26from .location import Location 

27 

28logger = logging.getLogger('pyrocko.model.station') 

29 

30guts_prefix = 'pf' 

31 

32d2r = num.pi / 180. 

33 

34 

35class ChannelsNotOrthogonal(Exception): 

36 pass 

37 

38 

39def guess_azimuth_from_name(channel_name): 

40 if channel_name.endswith('N'): 

41 return 0. 

42 elif channel_name.endswith('E'): 

43 return 90. 

44 elif channel_name.endswith('Z'): 

45 return 0. 

46 

47 return None 

48 

49 

50def guess_dip_from_name(channel_name): 

51 if channel_name.endswith('N'): 

52 return 0. 

53 elif channel_name.endswith('E'): 

54 return 0. 

55 elif channel_name.endswith('Z'): 

56 return -90. 

57 

58 return None 

59 

60 

61def guess_azimuth_dip_from_name(channel_name): 

62 return guess_azimuth_from_name(channel_name), \ 

63 guess_dip_from_name(channel_name) 

64 

65 

66def mkvec(x, y, z): 

67 return num.array([x, y, z], dtype=float) 

68 

69 

70def are_orthogonal(enus, eps=0.05): 

71 return all(abs(x) < eps for x in [ 

72 num.dot(enus[0], enus[1]), 

73 num.dot(enus[1], enus[2]), 

74 num.dot(enus[2], enus[0])]) 

75 

76 

77def fill_orthogonal(enus): 

78 

79 nmiss = sum(x is None for x in enus) 

80 

81 if nmiss == 1: 

82 for ic in range(len(enus)): 

83 if enus[ic] is None: 

84 enus[ic] = num.cross(enus[(ic-2) % 3], enus[(ic-1) % 3]) 

85 

86 if nmiss == 2: 

87 for ic in range(len(enus)): 

88 if enus[ic] is not None: 

89 xenu = enus[ic] + mkvec(1, 1, 1) 

90 enus[(ic+1) % 3] = num.cross(enus[ic], xenu) 

91 enus[(ic+2) % 3] = num.cross(enus[ic], enus[(ic+1) % 3]) 

92 

93 if nmiss == 3: 

94 # holy camoly.. 

95 enus[0] = mkvec(1, 0, 0) 

96 enus[1] = mkvec(0, 1, 0) 

97 enus[2] = mkvec(0, 0, 1) 

98 

99 

100class Channel(Object): 

101 name = String.T() 

102 azimuth = Float.T(optional=True) 

103 dip = Float.T(optional=True) 

104 gain = Float.T(default=1.0) 

105 

106 def __init__(self, name, azimuth=None, dip=None, gain=1.0): 

107 if azimuth is None: 

108 azimuth = guess_azimuth_from_name(name) 

109 if dip is None: 

110 dip = guess_dip_from_name(name) 

111 

112 Object.__init__( 

113 self, 

114 name=name, 

115 azimuth=float_or_none(azimuth), 

116 dip=float_or_none(dip), 

117 gain=float(gain)) 

118 

119 @property 

120 def ned(self): 

121 if None in (self.azimuth, self.dip): 

122 return None 

123 

124 n = math.cos(self.azimuth*d2r)*math.cos(self.dip*d2r) 

125 e = math.sin(self.azimuth*d2r)*math.cos(self.dip*d2r) 

126 d = math.sin(self.dip*d2r) 

127 return mkvec(n, e, d) 

128 

129 @property 

130 def enu(self): 

131 if None in (self.azimuth, self.dip): 

132 return None 

133 

134 n = math.cos(self.azimuth*d2r)*math.cos(self.dip*d2r) 

135 e = math.sin(self.azimuth*d2r)*math.cos(self.dip*d2r) 

136 d = math.sin(self.dip*d2r) 

137 return mkvec(e, n, -d) 

138 

139 def __str__(self): 

140 return '%s %f %f %g' % (self.name, self.azimuth, self.dip, self.gain) 

141 

142 

143class Station(Location): 

144 network = String.T() 

145 station = String.T() 

146 location = String.T() 

147 name = String.T(default='') 

148 channels = List.T(Channel.T()) 

149 

150 def __init__(self, network='', station='', location='', 

151 lat=0.0, lon=0.0, 

152 elevation=0.0, depth=0.0, 

153 north_shift=0.0, east_shift=0.0, 

154 name='', channels=None): 

155 

156 Location.__init__( 

157 self, 

158 network=network, station=station, location=location, 

159 lat=float(lat), lon=float(lon), 

160 elevation=float(elevation), 

161 depth=float(depth), 

162 north_shift=float(north_shift), 

163 east_shift=float(east_shift), 

164 name=name or '', 

165 channels=channels or []) 

166 

167 self.dist_deg = None 

168 self.dist_m = None 

169 self.azimuth = None 

170 self.backazimuth = None 

171 

172 def copy(self): 

173 return copy.deepcopy(self) 

174 

175 def set_event_relative_data(self, event, distance_3d=False): 

176 surface_dist = self.distance_to(event) 

177 if distance_3d: 

178 if event.depth is None: 

179 logger.warning('No event depth given: using 0 m.') 

180 dd = 0.0 - self.depth 

181 else: 

182 dd = event.depth - self.depth 

183 

184 self.dist_m = math.sqrt(dd**2 + surface_dist**2) 

185 else: 

186 self.dist_m = surface_dist 

187 

188 self.dist_deg = surface_dist / orthodrome.earthradius_equator * \ 

189 orthodrome.r2d 

190 

191 self.azimuth, self.backazimuth = event.azibazi_to(self) 

192 

193 def set_channels_by_name(self, *args): 

194 self.set_channels([]) 

195 for name in args: 

196 self.add_channel(Channel(name)) 

197 

198 def set_channels(self, channels): 

199 self.channels = [] 

200 for ch in channels: 

201 self.add_channel(ch) 

202 

203 def get_channels(self): 

204 return list(self.channels) 

205 

206 def get_channel_names(self): 

207 return set(ch.name for ch in self.channels) 

208 

209 def remove_channel_by_name(self, name): 

210 todel = [ch for ch in self.channels if ch.name == name] 

211 for ch in todel: 

212 self.channels.remove(ch) 

213 

214 def add_channel(self, channel): 

215 self.remove_channel_by_name(channel.name) 

216 self.channels.append(channel) 

217 self.channels.sort(key=lambda ch: ch.name) 

218 

219 def get_channel(self, name): 

220 for ch in self.channels: 

221 if ch.name == name: 

222 return ch 

223 

224 return None 

225 

226 def rotation_ne_to_rt(self, in_channel_names, out_channel_names): 

227 

228 angle = wrap(self.backazimuth + 180., -180., 180.) 

229 in_channels = [self.get_channel(name) for name in in_channel_names] 

230 out_channels = [ 

231 Channel(out_channel_names[0], 

232 wrap(self.backazimuth+180., -180., 180.), 0., 1.), 

233 Channel(out_channel_names[1], 

234 wrap(self.backazimuth+270., -180., 180.), 0., 1.)] 

235 return angle, in_channels, out_channels 

236 

237 def _projection_to( 

238 self, to, in_channel_names, out_channel_names, use_gains=False): 

239 

240 in_channels = [self.get_channel(name) for name in in_channel_names] 

241 

242 # create orthogonal vectors for missing components, such that this 

243 # won't break projections when components are missing. 

244 

245 vecs = [] 

246 for ch in in_channels: 

247 if ch is None: 

248 vecs.append(None) 

249 else: 

250 vec = getattr(ch, to) 

251 if use_gains: 

252 vec /= ch.gain 

253 vecs.append(vec) 

254 

255 fill_orthogonal(vecs) 

256 if not are_orthogonal(vecs): 

257 raise ChannelsNotOrthogonal( 

258 'components are not orthogonal: station %s.%s.%s, ' 

259 'channels %s, %s, %s' 

260 % (self.nsl() + tuple(in_channel_names))) 

261 

262 m = num.hstack([vec2[:, num.newaxis] for vec2 in vecs]) 

263 

264 m = num.where(num.abs(m) < num.max(num.abs(m))*1e-16, 0., m) 

265 

266 if to == 'ned': 

267 out_channels = [ 

268 Channel(out_channel_names[0], 0., 0., 1.), 

269 Channel(out_channel_names[1], 90., 0., 1.), 

270 Channel(out_channel_names[2], 0., 90., 1.)] 

271 

272 elif to == 'enu': 

273 out_channels = [ 

274 Channel(out_channel_names[0], 90., 0., 1.), 

275 Channel(out_channel_names[1], 0., 0., 1.), 

276 Channel(out_channel_names[2], 0., -90., 1.)] 

277 

278 return m, in_channels, out_channels 

279 

280 def guess_channel_groups(self): 

281 cg = {} 

282 for channel in self.get_channels(): 

283 if len(channel.name) >= 1: 

284 kind = channel.name[:-1] 

285 if kind not in cg: 

286 cg[kind] = [] 

287 cg[kind].append(channel.name[-1]) 

288 

289 def allin(a, b): 

290 return all(x in b for x in a) 

291 

292 out_groups = [] 

293 for kind, components in cg.items(): 

294 for sys in ('ENZ', '12Z', 'XYZ', 'RTZ', '123'): 

295 if allin(sys, components): 

296 out_groups.append(tuple([kind+c for c in sys])) 

297 

298 return out_groups 

299 

300 def guess_projections_to_enu(self, out_channels=('E', 'N', 'U'), **kwargs): 

301 proj = [] 

302 for cg in self.guess_channel_groups(): 

303 try: 

304 proj.append(self.projection_to_enu( 

305 cg, out_channels=out_channels, **kwargs)) 

306 

307 except ChannelsNotOrthogonal as e: 

308 logger.warning(str(e)) 

309 

310 return proj 

311 

312 def guess_projections_to_rtu( 

313 self, out_channels=('R', 'T', 'U'), backazimuth=None, **kwargs): 

314 

315 if backazimuth is None: 

316 backazimuth = self.backazimuth 

317 out_channels_ = [ 

318 Channel( 

319 out_channels[0], wrap(backazimuth+180., -180., 180.), 0., 1.), 

320 Channel( 

321 out_channels[1], wrap(backazimuth+270., -180., 180.), 0., 1.), 

322 Channel( 

323 out_channels[2], 0., -90., 1.)] 

324 

325 proj = [] 

326 for (m, in_channels, _) in self.guess_projections_to_enu(**kwargs): 

327 phi = (backazimuth + 180.)*d2r 

328 r = num.array([[math.sin(phi), math.cos(phi), 0.0], 

329 [math.cos(phi), -math.sin(phi), 0.0], 

330 [0.0, 0.0, 1.0]]) 

331 proj.append((num.dot(r, m), in_channels, out_channels_)) 

332 

333 return proj 

334 

335 def projection_to_enu( 

336 self, 

337 in_channels, 

338 out_channels=('E', 'N', 'U'), 

339 **kwargs): 

340 

341 return self._projection_to('enu', in_channels, out_channels, **kwargs) 

342 

343 def projection_to_ned( 

344 self, 

345 in_channels, 

346 out_channels=('N', 'E', 'D'), 

347 **kwargs): 

348 

349 return self._projection_to('ned', in_channels, out_channels, **kwargs) 

350 

351 def projection_from_enu( 

352 self, 

353 in_channels=('E', 'N', 'U'), 

354 out_channels=('X', 'Y', 'Z'), 

355 **kwargs): 

356 

357 m, out_channels, in_channels = self._projection_to( 

358 'enu', out_channels, in_channels, **kwargs) 

359 

360 return num.linalg.inv(m), in_channels, out_channels 

361 

362 def projection_from_ned( 

363 self, 

364 in_channels=('N', 'E', 'D'), 

365 out_channels=('X', 'Y', 'Z'), 

366 **kwargs): 

367 

368 m, out_channels, in_channels = self._projection_to( 

369 'ned', out_channels, in_channels, **kwargs) 

370 

371 return num.linalg.inv(m), in_channels, out_channels 

372 

373 def nsl_string(self): 

374 return '.'.join((self.network, self.station, self.location)) 

375 

376 def nsl(self): 

377 return self.network, self.station, self.location 

378 

379 def cannot_handle_offsets(self): 

380 if self.north_shift != 0.0 or self.east_shift != 0.0: 

381 logger.warning( 

382 'Station %s.%s.%s has a non-zero Cartesian offset. Such ' 

383 'offsets cannot be saved in the basic station file format. ' 

384 'Effective lat/lons are saved only. Please save the stations ' 

385 'in YAML format to preserve the reference-and-offset ' 

386 'coordinates.' % self.nsl()) 

387 

388 def oldstr(self): 

389 self.cannot_handle_offsets() 

390 nsl = '%s.%s.%s' % (self.network, self.station, self.location) 

391 s = '%-15s %14.5f %14.5f %14.1f %14.1f %s' % ( 

392 nsl, self.effective_lat, self.effective_lon, self.elevation, 

393 self.depth, self.name) 

394 return s 

395 

396 

397def dump_stations(stations, filename): 

398 ''' 

399 Write stations file. 

400 

401 :param stations: list of :py:class:`Station` objects 

402 :param filename: filename as str 

403 ''' 

404 f = open(filename, 'w') 

405 for sta in stations: 

406 f.write(sta.oldstr()+'\n') 

407 for cha in sta.get_channels(): 

408 azimuth = 'NaN' 

409 if cha.azimuth is not None: 

410 azimuth = '%7g' % cha.azimuth 

411 

412 dip = 'NaN' 

413 if cha.dip is not None: 

414 dip = '%7g' % cha.dip 

415 

416 f.write('%5s %14s %14s %14g\n' % ( 

417 cha.name, azimuth, dip, cha.gain)) 

418 

419 f.close() 

420 

421 

422def dump_stations_yaml(stations, filename): 

423 ''' 

424 Write stations file in YAML format. 

425 

426 :param stations: list of :py:class:`Station` objects 

427 :param filename: filename as str 

428 ''' 

429 

430 dump_all(stations, filename=filename) 

431 

432 

433def float_or_none(s): 

434 if s is None: 

435 return None 

436 elif isinstance(s, str) and s.lower() == 'nan': 

437 return None 

438 else: 

439 return float(s) 

440 

441 

442def detect_format(filename): 

443 with open(filename, 'r') as f: 

444 for line in f: 

445 line = line.strip() 

446 if not line or line.startswith('#') or line.startswith('%'): 

447 continue 

448 if line.startswith('--- !pf.Station'): 

449 return 'yaml' 

450 else: 

451 return 'basic' 

452 

453 return 'basic' 

454 

455 

456def load_stations(filename, format='detect'): 

457 ''' 

458 Read stations file. 

459 

460 :param filename: filename 

461 :returns: list of :py:class:`Station` objects 

462 ''' 

463 

464 if format == 'detect': 

465 format = detect_format(filename) 

466 

467 if format == 'yaml': 

468 from pyrocko import guts 

469 stations = [ 

470 st for st in guts.load_all(filename=filename) 

471 if isinstance(st, Station)] 

472 

473 return stations 

474 

475 elif format == 'basic': 

476 stations = [] 

477 f = open(filename, 'r') 

478 station = None 

479 channel_names = [] 

480 for (iline, line) in enumerate(f): 

481 toks = line.split(None, 5) 

482 if line.strip().startswith('#') or line.strip() == '': 

483 continue 

484 

485 if len(toks) == 5 or len(toks) == 6: 

486 net, sta, loc = toks[0].split('.') 

487 lat, lon, elevation, depth = [float(x) for x in toks[1:5]] 

488 if len(toks) == 5: 

489 name = '' 

490 else: 

491 name = toks[5].rstrip() 

492 

493 station = Station( 

494 net, sta, loc, lat, lon, 

495 elevation=elevation, depth=depth, name=name) 

496 

497 stations.append(station) 

498 channel_names = [] 

499 

500 elif len(toks) == 4 and station is not None: 

501 name, azi, dip, gain = ( 

502 toks[0], 

503 float_or_none(toks[1]), 

504 float_or_none(toks[2]), 

505 float(toks[3])) 

506 if name in channel_names: 

507 logger.warning( 

508 'redefined channel! (line: %i, file: %s)' % 

509 (iline + 1, filename)) 

510 else: 

511 channel_names.append(name) 

512 

513 channel = Channel(name, azimuth=azi, dip=dip, gain=gain) 

514 station.add_channel(channel) 

515 

516 else: 

517 logger.warning('skipping invalid station/channel definition ' 

518 '(line: %i, file: %s' % (iline + 1, filename)) 

519 

520 f.close() 

521 return stations 

522 

523 else: 

524 from pyrocko.io.io_common import FileLoadError 

525 raise FileLoadError('unknown event file format: %s' % format) 

526 

527 

528def dump_kml(objects, filename): 

529 station_template = ''' 

530 <Placemark> 

531 <name>%(network)s.%(station)s.%(location)s</name> 

532 <description></description> 

533 <styleUrl>#msn_S</styleUrl> 

534 <Point> 

535 <coordinates>%(elon)f,%(elat)f,%(elevation)f</coordinates> 

536 </Point> 

537 </Placemark> 

538''' 

539 

540 f = open(filename, 'w') 

541 f.write('<?xml version="1.0" encoding="UTF-8"?>\n') 

542 f.write('<kml xmlns="http://www.opengis.net/kml/2.2">\n') 

543 f.write('<Document>\n') 

544 f.write(''' <Style id="sh_S"> 

545 <IconStyle> 

546 <scale>1.3</scale> 

547 <Icon> 

548 <href>http://maps.google.com/mapfiles/kml/paddle/S.png</href> 

549 </Icon> 

550 <hotSpot x="32" y="1" xunits="pixels" yunits="pixels"/> 

551 </IconStyle> 

552 <ListStyle> 

553 <ItemIcon> 

554 <href>http://maps.google.com/mapfiles/kml/paddle/S-lv.png</href> 

555 </ItemIcon> 

556 </ListStyle> 

557 </Style> 

558 <Style id="sn_S"> 

559 <IconStyle> 

560 <scale>1.1</scale> 

561 <Icon> 

562 <href>http://maps.google.com/mapfiles/kml/paddle/S.png</href> 

563 </Icon> 

564 <hotSpot x="32" y="1" xunits="pixels" yunits="pixels"/> 

565 </IconStyle> 

566 <ListStyle> 

567 <ItemIcon> 

568 <href>http://maps.google.com/mapfiles/kml/paddle/S-lv.png</href> 

569 </ItemIcon> 

570 </ListStyle> 

571 </Style> 

572 <StyleMap id="msn_S"> 

573 <Pair> 

574 <key>normal</key> 

575 <styleUrl>#sn_S</styleUrl> 

576 </Pair> 

577 <Pair> 

578 <key>highlight</key> 

579 <styleUrl>#sh_S</styleUrl> 

580 </Pair> 

581 </StyleMap> 

582''') 

583 for obj in objects: 

584 

585 if isinstance(obj, Station): 

586 d = obj.__dict__.copy() 

587 d['elat'], d['elon'] = obj.effective_latlon 

588 f.write(station_template % d) 

589 

590 f.write('</Document>') 

591 f.write('</kml>\n') 

592 f.close()