Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/model/station.py: 85%
291 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 06:59 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 06:59 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Simple representation of a seismic station (Pyrocko classic).
9.. note::
11 An improved data model for seismic stations is available in
12 :py:mod:`pyrocko.squirrel.model`. This module will stay available for
13 backward compatibility. Conversion from Squirrel-based to Pyrocko
14 classic can be achieved with
15 :py:meth:`pyrocko.squirrel.model.Station.get_pyrocko_station`. '''
17import math
18import copy
19import logging
20import numpy as num
22from pyrocko import orthodrome
23from pyrocko.orthodrome import wrap
24from pyrocko.guts import Object, Float, String, List, dump_all
26from .location import Location
28logger = logging.getLogger('pyrocko.model.station')
30guts_prefix = 'pf'
32d2r = num.pi / 180.
35class ChannelsNotOrthogonal(Exception):
36 pass
39def guess_azimuth_from_name(channel_name):
40 if channel_name.endswith('N'):
41 return 0.
42 elif channel_name.endswith('E'):
43 return 90.
44 elif channel_name.endswith('Z'):
45 return 0.
47 return None
50def guess_dip_from_name(channel_name):
51 if channel_name.endswith('N'):
52 return 0.
53 elif channel_name.endswith('E'):
54 return 0.
55 elif channel_name.endswith('Z'):
56 return -90.
58 return None
61def guess_azimuth_dip_from_name(channel_name):
62 return guess_azimuth_from_name(channel_name), \
63 guess_dip_from_name(channel_name)
66def mkvec(x, y, z):
67 return num.array([x, y, z], dtype=float)
70def are_orthogonal(enus, eps=0.05):
71 return all(abs(x) < eps for x in [
72 num.dot(enus[0], enus[1]),
73 num.dot(enus[1], enus[2]),
74 num.dot(enus[2], enus[0])])
77def fill_orthogonal(enus):
79 nmiss = sum(x is None for x in enus)
81 if nmiss == 1:
82 for ic in range(len(enus)):
83 if enus[ic] is None:
84 enus[ic] = num.cross(enus[(ic-2) % 3], enus[(ic-1) % 3])
86 if nmiss == 2:
87 for ic in range(len(enus)):
88 if enus[ic] is not None:
89 xenu = enus[ic] + mkvec(1, 1, 1)
90 enus[(ic+1) % 3] = num.cross(enus[ic], xenu)
91 enus[(ic+2) % 3] = num.cross(enus[ic], enus[(ic+1) % 3])
93 if nmiss == 3:
94 # holy camoly..
95 enus[0] = mkvec(1, 0, 0)
96 enus[1] = mkvec(0, 1, 0)
97 enus[2] = mkvec(0, 0, 1)
100class Channel(Object):
101 name = String.T()
102 azimuth = Float.T(optional=True)
103 dip = Float.T(optional=True)
104 gain = Float.T(default=1.0)
106 def __init__(self, name, azimuth=None, dip=None, gain=1.0):
107 if azimuth is None:
108 azimuth = guess_azimuth_from_name(name)
109 if dip is None:
110 dip = guess_dip_from_name(name)
112 Object.__init__(
113 self,
114 name=name,
115 azimuth=float_or_none(azimuth),
116 dip=float_or_none(dip),
117 gain=float(gain))
119 @property
120 def ned(self):
121 if None in (self.azimuth, self.dip):
122 return None
124 n = math.cos(self.azimuth*d2r)*math.cos(self.dip*d2r)
125 e = math.sin(self.azimuth*d2r)*math.cos(self.dip*d2r)
126 d = math.sin(self.dip*d2r)
127 return mkvec(n, e, d)
129 @property
130 def enu(self):
131 if None in (self.azimuth, self.dip):
132 return None
134 n = math.cos(self.azimuth*d2r)*math.cos(self.dip*d2r)
135 e = math.sin(self.azimuth*d2r)*math.cos(self.dip*d2r)
136 d = math.sin(self.dip*d2r)
137 return mkvec(e, n, -d)
139 def __str__(self):
140 return '%s %f %f %g' % (self.name, self.azimuth, self.dip, self.gain)
143class Station(Location):
144 network = String.T()
145 station = String.T()
146 location = String.T()
147 name = String.T(default='')
148 channels = List.T(Channel.T())
150 def __init__(self, network='', station='', location='',
151 lat=0.0, lon=0.0,
152 elevation=0.0, depth=0.0,
153 north_shift=0.0, east_shift=0.0,
154 name='', channels=None):
156 Location.__init__(
157 self,
158 network=network, station=station, location=location,
159 lat=float(lat), lon=float(lon),
160 elevation=float(elevation),
161 depth=float(depth),
162 north_shift=float(north_shift),
163 east_shift=float(east_shift),
164 name=name or '',
165 channels=channels or [])
167 self.dist_deg = None
168 self.dist_m = None
169 self.azimuth = None
170 self.backazimuth = None
172 def copy(self):
173 return copy.deepcopy(self)
175 def set_event_relative_data(self, event, distance_3d=False):
176 surface_dist = self.distance_to(event)
177 if distance_3d:
178 if event.depth is None:
179 logger.warning('No event depth given: using 0 m.')
180 dd = 0.0 - self.depth
181 else:
182 dd = event.depth - self.depth
184 self.dist_m = math.sqrt(dd**2 + surface_dist**2)
185 else:
186 self.dist_m = surface_dist
188 self.dist_deg = surface_dist / orthodrome.earthradius_equator * \
189 orthodrome.r2d
191 self.azimuth, self.backazimuth = event.azibazi_to(self)
193 def set_channels_by_name(self, *args):
194 self.set_channels([])
195 for name in args:
196 self.add_channel(Channel(name))
198 def set_channels(self, channels):
199 self.channels = []
200 for ch in channels:
201 self.add_channel(ch)
203 def get_channels(self):
204 return list(self.channels)
206 def get_channel_names(self):
207 return set(ch.name for ch in self.channels)
209 def remove_channel_by_name(self, name):
210 todel = [ch for ch in self.channels if ch.name == name]
211 for ch in todel:
212 self.channels.remove(ch)
214 def add_channel(self, channel):
215 self.remove_channel_by_name(channel.name)
216 self.channels.append(channel)
217 self.channels.sort(key=lambda ch: ch.name)
219 def get_channel(self, name):
220 for ch in self.channels:
221 if ch.name == name:
222 return ch
224 return None
226 def rotation_ne_to_rt(self, in_channel_names, out_channel_names):
228 angle = wrap(self.backazimuth + 180., -180., 180.)
229 in_channels = [self.get_channel(name) for name in in_channel_names]
230 out_channels = [
231 Channel(out_channel_names[0],
232 wrap(self.backazimuth+180., -180., 180.), 0., 1.),
233 Channel(out_channel_names[1],
234 wrap(self.backazimuth+270., -180., 180.), 0., 1.)]
235 return angle, in_channels, out_channels
237 def _projection_to(
238 self, to, in_channel_names, out_channel_names, use_gains=False):
240 in_channels = [self.get_channel(name) for name in in_channel_names]
242 # create orthogonal vectors for missing components, such that this
243 # won't break projections when components are missing.
245 vecs = []
246 for ch in in_channels:
247 if ch is None:
248 vecs.append(None)
249 else:
250 vec = getattr(ch, to)
251 if use_gains:
252 vec /= ch.gain
253 vecs.append(vec)
255 fill_orthogonal(vecs)
256 if not are_orthogonal(vecs):
257 raise ChannelsNotOrthogonal(
258 'components are not orthogonal: station %s.%s.%s, '
259 'channels %s, %s, %s'
260 % (self.nsl() + tuple(in_channel_names)))
262 m = num.hstack([vec2[:, num.newaxis] for vec2 in vecs])
264 m = num.where(num.abs(m) < num.max(num.abs(m))*1e-16, 0., m)
266 if to == 'ned':
267 out_channels = [
268 Channel(out_channel_names[0], 0., 0., 1.),
269 Channel(out_channel_names[1], 90., 0., 1.),
270 Channel(out_channel_names[2], 0., 90., 1.)]
272 elif to == 'enu':
273 out_channels = [
274 Channel(out_channel_names[0], 90., 0., 1.),
275 Channel(out_channel_names[1], 0., 0., 1.),
276 Channel(out_channel_names[2], 0., -90., 1.)]
278 return m, in_channels, out_channels
280 def guess_channel_groups(self):
281 cg = {}
282 for channel in self.get_channels():
283 if len(channel.name) >= 1:
284 kind = channel.name[:-1]
285 if kind not in cg:
286 cg[kind] = []
287 cg[kind].append(channel.name[-1])
289 def allin(a, b):
290 return all(x in b for x in a)
292 out_groups = []
293 for kind, components in cg.items():
294 for sys in ('ENZ', '12Z', 'XYZ', 'RTZ', '123'):
295 if allin(sys, components):
296 out_groups.append(tuple([kind+c for c in sys]))
298 return out_groups
300 def guess_projections_to_enu(self, out_channels=('E', 'N', 'U'), **kwargs):
301 proj = []
302 for cg in self.guess_channel_groups():
303 try:
304 proj.append(self.projection_to_enu(
305 cg, out_channels=out_channels, **kwargs))
307 except ChannelsNotOrthogonal as e:
308 logger.warning(str(e))
310 return proj
312 def guess_projections_to_rtu(
313 self, out_channels=('R', 'T', 'U'), backazimuth=None, **kwargs):
315 if backazimuth is None:
316 backazimuth = self.backazimuth
317 out_channels_ = [
318 Channel(
319 out_channels[0], wrap(backazimuth+180., -180., 180.), 0., 1.),
320 Channel(
321 out_channels[1], wrap(backazimuth+270., -180., 180.), 0., 1.),
322 Channel(
323 out_channels[2], 0., -90., 1.)]
325 proj = []
326 for (m, in_channels, _) in self.guess_projections_to_enu(**kwargs):
327 phi = (backazimuth + 180.)*d2r
328 r = num.array([[math.sin(phi), math.cos(phi), 0.0],
329 [math.cos(phi), -math.sin(phi), 0.0],
330 [0.0, 0.0, 1.0]])
331 proj.append((num.dot(r, m), in_channels, out_channels_))
333 return proj
335 def projection_to_enu(
336 self,
337 in_channels,
338 out_channels=('E', 'N', 'U'),
339 **kwargs):
341 return self._projection_to('enu', in_channels, out_channels, **kwargs)
343 def projection_to_ned(
344 self,
345 in_channels,
346 out_channels=('N', 'E', 'D'),
347 **kwargs):
349 return self._projection_to('ned', in_channels, out_channels, **kwargs)
351 def projection_from_enu(
352 self,
353 in_channels=('E', 'N', 'U'),
354 out_channels=('X', 'Y', 'Z'),
355 **kwargs):
357 m, out_channels, in_channels = self._projection_to(
358 'enu', out_channels, in_channels, **kwargs)
360 return num.linalg.inv(m), in_channels, out_channels
362 def projection_from_ned(
363 self,
364 in_channels=('N', 'E', 'D'),
365 out_channels=('X', 'Y', 'Z'),
366 **kwargs):
368 m, out_channels, in_channels = self._projection_to(
369 'ned', out_channels, in_channels, **kwargs)
371 return num.linalg.inv(m), in_channels, out_channels
373 def nsl_string(self):
374 return '.'.join((self.network, self.station, self.location))
376 def nsl(self):
377 return self.network, self.station, self.location
379 def cannot_handle_offsets(self):
380 if self.north_shift != 0.0 or self.east_shift != 0.0:
381 logger.warning(
382 'Station %s.%s.%s has a non-zero Cartesian offset. Such '
383 'offsets cannot be saved in the basic station file format. '
384 'Effective lat/lons are saved only. Please save the stations '
385 'in YAML format to preserve the reference-and-offset '
386 'coordinates.' % self.nsl())
388 def oldstr(self):
389 self.cannot_handle_offsets()
390 nsl = '%s.%s.%s' % (self.network, self.station, self.location)
391 s = '%-15s %14.5f %14.5f %14.1f %14.1f %s' % (
392 nsl, self.effective_lat, self.effective_lon, self.elevation,
393 self.depth, self.name)
394 return s
397def dump_stations(stations, filename):
398 '''
399 Write stations file.
401 :param stations: list of :py:class:`Station` objects
402 :param filename: filename as str
403 '''
404 f = open(filename, 'w')
405 for sta in stations:
406 f.write(sta.oldstr()+'\n')
407 for cha in sta.get_channels():
408 azimuth = 'NaN'
409 if cha.azimuth is not None:
410 azimuth = '%7g' % cha.azimuth
412 dip = 'NaN'
413 if cha.dip is not None:
414 dip = '%7g' % cha.dip
416 f.write('%5s %14s %14s %14g\n' % (
417 cha.name, azimuth, dip, cha.gain))
419 f.close()
422def dump_stations_yaml(stations, filename):
423 '''
424 Write stations file in YAML format.
426 :param stations: list of :py:class:`Station` objects
427 :param filename: filename as str
428 '''
430 dump_all(stations, filename=filename)
433def float_or_none(s):
434 if s is None:
435 return None
436 elif isinstance(s, str) and s.lower() == 'nan':
437 return None
438 else:
439 return float(s)
442def detect_format(filename):
443 with open(filename, 'r') as f:
444 for line in f:
445 line = line.strip()
446 if not line or line.startswith('#') or line.startswith('%'):
447 continue
448 if line.startswith('--- !pf.Station'):
449 return 'yaml'
450 else:
451 return 'basic'
453 return 'basic'
456def load_stations(filename, format='detect'):
457 '''
458 Read stations file.
460 :param filename: filename
461 :returns: list of :py:class:`Station` objects
462 '''
464 if format == 'detect':
465 format = detect_format(filename)
467 if format == 'yaml':
468 from pyrocko import guts
469 stations = [
470 st for st in guts.load_all(filename=filename)
471 if isinstance(st, Station)]
473 return stations
475 elif format == 'basic':
476 stations = []
477 f = open(filename, 'r')
478 station = None
479 channel_names = []
480 for (iline, line) in enumerate(f):
481 toks = line.split(None, 5)
482 if line.strip().startswith('#') or line.strip() == '':
483 continue
485 if len(toks) == 5 or len(toks) == 6:
486 net, sta, loc = toks[0].split('.')
487 lat, lon, elevation, depth = [float(x) for x in toks[1:5]]
488 if len(toks) == 5:
489 name = ''
490 else:
491 name = toks[5].rstrip()
493 station = Station(
494 net, sta, loc, lat, lon,
495 elevation=elevation, depth=depth, name=name)
497 stations.append(station)
498 channel_names = []
500 elif len(toks) == 4 and station is not None:
501 name, azi, dip, gain = (
502 toks[0],
503 float_or_none(toks[1]),
504 float_or_none(toks[2]),
505 float(toks[3]))
506 if name in channel_names:
507 logger.warning(
508 'redefined channel! (line: %i, file: %s)' %
509 (iline + 1, filename))
510 else:
511 channel_names.append(name)
513 channel = Channel(name, azimuth=azi, dip=dip, gain=gain)
514 station.add_channel(channel)
516 else:
517 logger.warning('skipping invalid station/channel definition '
518 '(line: %i, file: %s' % (iline + 1, filename))
520 f.close()
521 return stations
523 else:
524 from pyrocko.io.io_common import FileLoadError
525 raise FileLoadError('unknown event file format: %s' % format)
528def dump_kml(objects, filename):
529 station_template = '''
530 <Placemark>
531 <name>%(network)s.%(station)s.%(location)s</name>
532 <description></description>
533 <styleUrl>#msn_S</styleUrl>
534 <Point>
535 <coordinates>%(elon)f,%(elat)f,%(elevation)f</coordinates>
536 </Point>
537 </Placemark>
538'''
540 f = open(filename, 'w')
541 f.write('<?xml version="1.0" encoding="UTF-8"?>\n')
542 f.write('<kml xmlns="http://www.opengis.net/kml/2.2">\n')
543 f.write('<Document>\n')
544 f.write(''' <Style id="sh_S">
545 <IconStyle>
546 <scale>1.3</scale>
547 <Icon>
548 <href>http://maps.google.com/mapfiles/kml/paddle/S.png</href>
549 </Icon>
550 <hotSpot x="32" y="1" xunits="pixels" yunits="pixels"/>
551 </IconStyle>
552 <ListStyle>
553 <ItemIcon>
554 <href>http://maps.google.com/mapfiles/kml/paddle/S-lv.png</href>
555 </ItemIcon>
556 </ListStyle>
557 </Style>
558 <Style id="sn_S">
559 <IconStyle>
560 <scale>1.1</scale>
561 <Icon>
562 <href>http://maps.google.com/mapfiles/kml/paddle/S.png</href>
563 </Icon>
564 <hotSpot x="32" y="1" xunits="pixels" yunits="pixels"/>
565 </IconStyle>
566 <ListStyle>
567 <ItemIcon>
568 <href>http://maps.google.com/mapfiles/kml/paddle/S-lv.png</href>
569 </ItemIcon>
570 </ListStyle>
571 </Style>
572 <StyleMap id="msn_S">
573 <Pair>
574 <key>normal</key>
575 <styleUrl>#sn_S</styleUrl>
576 </Pair>
577 <Pair>
578 <key>highlight</key>
579 <styleUrl>#sh_S</styleUrl>
580 </Pair>
581 </StyleMap>
582''')
583 for obj in objects:
585 if isinstance(obj, Station):
586 d = obj.__dict__.copy()
587 d['elat'], d['elon'] = obj.effective_latlon
588 f.write(station_template % d)
590 f.write('</Document>')
591 f.write('</kml>\n')
592 f.close()