# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from __future__ import absolute_import, print_function
import time
from collections import defaultdict
from pyrocko import trace, util, pz
from pyrocko.io import io_common
from pyrocko.io import stationxml as sxml
from pyrocko.guts import Object, Tuple, String, Timestamp, Float
[docs]class EnhancedSacPzError(io_common.FileLoadError):
pass
[docs]class EnhancedSacPzResponse(Object):
codes = Tuple.T(4, String.T())
tmin = Timestamp.T(optional=True)
tmax = Timestamp.T(optional=True)
lat = Float.T()
lon = Float.T()
elevation = Float.T()
depth = Float.T()
dip = Float.T()
azimuth = Float.T()
input_unit = String.T()
output_unit = String.T()
response = trace.PoleZeroResponse.T()
[docs] def spans(self, *args):
if len(args) == 0:
return True
elif len(args) == 1:
return ((self.tmin is None or
self.tmin <= args[0]) and
(self.tmax is None or
args[0] <= self.tmax))
elif len(args) == 2:
return ((self.tmin is None or
args[1] >= self.tmin) and
(self.tmax is None or
self.tmax >= args[0]))
[docs]def make_stationxml_response(presponse, input_unit, output_unit):
return sxml.Response.from_pyrocko_pz_response(
presponse, input_unit, output_unit, 1.0)
this_year = time.gmtime()[0]
[docs]def dummy_aware_str_to_time(s, time_format='%Y-%m-%dT%H:%M:%S'):
try:
util.str_to_time(s, format=time_format)
except util.TimeStrError:
year = int(s[:4])
if year > this_year + 100:
return None # StationXML contained a dummy end date
raise
[docs]def iload_fh(f, time_format='%Y-%m-%dT%H:%M:%S'):
zeros, poles, constant, comments = pz.read_sac_zpk(file=f,
get_comments=True)
d = {}
for line in comments:
toks = line.split(':', 1)
if len(toks) == 2:
temp = toks[0].strip('* \t')
for k in ('network', 'station', 'location', 'channel', 'start',
'end', 'latitude', 'longitude', 'depth', 'elevation',
'dip', 'azimuth', 'input unit', 'output unit'):
if temp.lower().startswith(k):
d[k] = toks[1].strip()
response = trace.PoleZeroResponse(zeros, poles, constant)
try:
yield EnhancedSacPzResponse(
codes=(d['network'], d['station'], d['location'], d['channel']),
tmin=util.str_to_time(d['start'], format=time_format),
tmax=dummy_aware_str_to_time(d['end']),
lat=float(d['latitude']),
lon=float(d['longitude']),
elevation=float(d['elevation']),
depth=float(d['depth']),
dip=float(d['dip']),
azimuth=float(d['azimuth']),
input_unit=d['input unit'],
output_unit=d['output unit'],
response=response)
except KeyError as e:
raise EnhancedSacPzError(
'cannot get all required information "%s"' % e.args[0])
iload_filename, iload_dirname, iload_glob, iload = util.make_iload_family(
iload_fh, 'SACPZ', ':py:class:`EnhancedSacPzResponse`')
[docs]def make_stationxml(responses, inconsistencies='warn'):
'''
Create stationxml from "enhanced" SACPZ information.
:param responses: iterable yielding
:py:class:`EnhancedSacPzResponse` objects
:returns: :py:class:`pyrocko.fdsn.station.FDSNStationXML` object
'''
networks = {}
stations = {}
station_coords = defaultdict(list)
station_channels = defaultdict(list)
for cr in responses:
net, sta, loc, cha = cr.codes
station_coords[net, sta].append(
(cr.codes, cr.lat, cr.lon, cr.elevation))
station_channels[net, sta].append(sxml.Channel(
code=cha,
location_code=loc,
start_date=cr.tmin,
end_date=cr.tmax,
latitude=sxml.Latitude(cr.lat),
longitude=sxml.Longitude(cr.lon),
elevation=sxml.Distance(cr.elevation),
depth=sxml.Distance(cr.depth),
response=make_stationxml_response(
cr.response, cr.input_unit, cr.output_unit)))
for (net, sta), v in sorted(station_coords.items()):
lat, lon, elevation = util.consistency_merge(
v,
message='channel lat/lon/elevation values differ',
error=inconsistencies)
if net not in networks:
networks[net] = sxml.Network(code=net)
stations[net, sta] = sxml.Station(
code=sta,
latitude=sxml.Latitude(lat),
longitude=sxml.Longitude(lon),
elevation=sxml.Distance(elevation),
channel_list=sorted(
station_channels[net, sta],
key=lambda c: (c.location_code, c.code)))
networks[net].station_list.append(stations[net, sta])
return sxml.FDSNStationXML(
source='Converted from "enhanced" SACPZ information',
created=time.time(),
network_list=[networks[net_] for net_ in sorted(networks.keys())])
if __name__ == '__main__':
import sys
util.setup_logging(__name__)
if len(sys.argv) < 2:
sys.exit('usage: python -m pyrocko.station.enhanced_sacpz <sacpz> ...')
sxml_in = make_stationxml(iload(sys.argv[1:]))
print(sxml_in.dump_xml())