# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from __future__ import absolute_import, division, print_function
from builtins import map
import time
import re
import logging
from pyrocko import util, guts
from pyrocko.io import io_common
from pyrocko.io import stationxml as sxml
logger = logging.getLogger('pyrocko.io.resp')
[docs]class RespError(io_common.FileLoadError):
pass
[docs]def ppolezero(s):
v = s.split()
return sxml.PoleZero(
number=int(v[0]),
real=sxml.FloatNoUnit(
value=float(v[1]),
plus_error=float(v[3]) or None,
minus_error=float(v[3]) or None),
imaginary=sxml.FloatNoUnit(
value=float(v[2]),
plus_error=float(v[4]) or None,
minus_error=float(v[4]) or None))
[docs]def pcfu(s):
v = list(map(float, s.split()))
return sxml.FloatWithUnit(
value=float(v[-2]),
plus_error=float(v[-1]) or None,
minus_error=float(v[-1]) or None)
[docs]def pnc(s):
v = list(map(float, s.split()))
return sxml.NumeratorCoefficient(i=int(v[0]), value=float(v[1]))
[docs]def punit(s):
return str(s.split()[0].decode('ascii'))
[docs]def psymmetry(s):
return {
b'A': 'NONE',
b'B': 'ODD',
b'C': 'EVEN'}[s.upper()]
[docs]def ptftype(s):
if s.startswith(b'A'):
return 'LAPLACE (RADIANS/SECOND)'
elif s.startswith(b'B'):
return 'LAPLACE (HERTZ)'
elif s.startswith(b'D'):
return 'DIGITAL (Z-TRANSFORM)'
else:
raise RespError('unknown pz transfer function type')
[docs]def pcftype(s):
if s.startswith(b'A'):
return 'ANALOG (RADIANS/SECOND)'
elif s.startswith(b'B'):
return 'ANALOG (HERTZ)'
elif s.startswith(b'D'):
return 'DIGITAL'
else:
raise RespError('unknown cf transfer function type')
[docs]def pblock_060(content):
stage_number = int(get1(content, b'04'))
return stage_number, None
[docs]def pblock_053(content):
stage_number = int(get1(content, b'04'))
pzs = sxml.PolesZeros(
pz_transfer_function_type=ptftype(get1(content, b'03')),
input_units=sxml.Units(name=punit(get1(content, b'05'))),
output_units=sxml.Units(name=punit(get1(content, b'06'))),
normalization_factor=float(get1(content, b'07')),
normalization_frequency=sxml.Frequency(
value=float(get1(content, b'08'))),
zero_list=list(map(ppolezero, getn(content, b'10-13'))),
pole_list=list(map(ppolezero, getn(content, b'15-18'))))
for i, x in enumerate(pzs.zero_list):
x.number = i
for i, x in enumerate(pzs.pole_list):
x.number = i
return stage_number, pzs
[docs]def pblock_043(content):
stage_number = -1
pzs = sxml.PolesZeros(
pz_transfer_function_type=ptftype(get1(content, b'05')),
input_units=sxml.Units(name=punit(get1(content, b'06'))),
output_units=sxml.Units(name=punit(get1(content, b'07'))),
normalization_factor=float(get1(content, b'08')),
normalization_frequency=sxml.Frequency(
value=float(get1(content, b'09'))),
zero_list=list(map(ppolezero, getn(content, b'11-14'))),
pole_list=list(map(ppolezero, getn(content, b'16-19'))))
for i, x in enumerate(pzs.zero_list):
x.number = i
for i, x in enumerate(pzs.pole_list):
x.number = i
return stage_number, pzs
[docs]def pblock_058(content):
stage_number = int(get1(content, b'03'))
gain = sxml.Gain(
value=float(get1(content, b'04')),
frequency=float(get1(content, b'05').split()[0]))
return stage_number, gain
[docs]def pblock_048(content):
stage_number = -1
gain = sxml.Gain(
value=float(get1(content, b'05')),
frequency=float(get1(content, b'06').split()[0]))
return stage_number, gain
[docs]def pblock_054(content):
stage_number = int(get1(content, b'04'))
cfs = sxml.Coefficients(
cf_transfer_function_type=pcftype(get1(content, b'03')),
input_units=sxml.Units(name=punit(get1(content, b'05'))),
output_units=sxml.Units(name=punit(get1(content, b'06'))),
numerator_list=list(map(pcfu, getn(content, b'08-09'))),
denominator_list=list(map(pcfu, getn(content, b'11-12'))))
return stage_number, cfs
[docs]def pblock_044(content):
stage_number = -1
cfs = sxml.Coefficients(
cf_transfer_function_type=pcftype(get1(content, b'05')),
input_units=sxml.Units(name=punit(get1(content, b'06'))),
output_units=sxml.Units(name=punit(get1(content, b'07'))),
numerator_list=list(map(pcfu, getn(content, b'09-10'))),
denominator_list=list(map(pcfu, getn(content, b'12-13'))))
return stage_number, cfs
[docs]def pblock_057(content):
stage_number = int(get1(content, b'03'))
deci = sxml.Decimation(
input_sample_rate=sxml.Frequency(value=float(get1(content, b'04'))),
factor=int(get1(content, b'05')),
offset=int(get1(content, b'06')),
delay=sxml.FloatWithUnit(value=float(get1(content, b'07'))),
correction=sxml.FloatWithUnit(value=float(get1(content, b'08'))))
return stage_number, deci
[docs]def pblock_047(content):
stage_number = -1
deci = sxml.Decimation(
input_sample_rate=sxml.Frequency(value=float(get1(content, b'05'))),
factor=int(get1(content, b'06')),
offset=int(get1(content, b'07')),
delay=sxml.FloatWithUnit(value=float(get1(content, b'08'))),
correction=sxml.FloatWithUnit(value=float(get1(content, b'09'))))
return stage_number, deci
[docs]def pblock_061(content):
stage_number = int(get1(content, b'03'))
fir = sxml.FIR(
name=get1(content, b'04', optional=True),
input_units=sxml.Units(name=punit(get1(content, b'06'))),
output_units=sxml.Units(name=punit(get1(content, b'07'))),
symmetry=psymmetry(get1(content, b'05')),
numerator_coefficient_list=list(map(pnc, getn(content, b'09'))))
return stage_number, fir
[docs]def pblock_041(content):
stage_number = -1
fir = sxml.FIR(
name=get1(content, b'04', optional=True),
input_units=sxml.Units(name=punit(get1(content, b'06'))),
output_units=sxml.Units(name=punit(get1(content, b'07'))),
symmetry=psymmetry(get1(content, b'05')),
numerator_coefficient_list=list(map(pnc, getn(content, b'09'))))
return stage_number, fir
bdefs = {
b'050': {
'name': 'Station Identifier Blockette',
},
b'052': {
'name': 'Channel Identifier Blockette',
},
b'060': {
'name': 'Response Reference Information',
'parse': pblock_060,
},
b'053': {
'name': 'Response (Poles & Zeros) Blockette',
'parse': pblock_053,
},
b'043': {
'name': 'Response (Poles & Zeros) Dictionary Blockette',
'parse': pblock_043,
},
b'054': {
'name': 'Response (Coefficients) Blockette',
'parse': pblock_054,
},
b'044': {
'name': 'Response (Coefficients) Dictionary Blockette',
'parse': pblock_044,
},
b'057': {
'name': 'Decimation Blockette',
'parse': pblock_057,
},
b'047': {
'name': 'Decimation Dictionary Blockette',
'parse': pblock_047,
},
b'058': {
'name': 'Channel Sensitivity/Gain Blockette',
'parse': pblock_058,
},
b'048': {
'name': 'Channel Sensitivity/Gain Dictionary Blockette',
'parse': pblock_048,
},
b'061': {
'name': 'FIR Response Blockette',
'parse': pblock_061,
},
b'041': {
'name': 'FIR Dictionary Blockette',
'parse': pblock_041,
},
}
[docs]def parse1(f):
for line in f:
line = line.rstrip(b'\r\n')
m = re.match(
br'\s*(#(.+)|B(\d\d\d)F(\d\d(-\d\d)?)\s+(([^:]+):\s*)?(.*))', line)
if m:
if m.group(2):
pass
elif m.group(3):
block = m.group(3)
field = m.group(4)
key = m.group(7)
value = m.group(8)
yield block, field, key, value
[docs]def parse2(f):
current_b = None
content = []
for block, field, key, value in parse1(f):
if current_b != block or field == b'03':
if current_b is not None:
yield current_b, content
current_b = block
content = []
content.append((field, key, value))
if current_b is not None:
yield current_b, content
[docs]def parse3(f):
state = [None, None, []]
for block, content in parse2(f):
if block == b'050' and state[0] and state[1]:
yield state
state = [None, None, []]
if block == b'050':
state[0] = content
elif block == b'052':
state[1] = content
else:
state[2].append((block, content))
if state[0] and state[1]:
yield state
[docs]def get1(content, field, default=None, optional=False):
for field_, _, value in content:
if field_ == field:
return value
else:
if optional:
return None
elif default is not None:
return default
else:
raise RespError('key not found: %s' % field)
[docs]def getn(content, field):
lst = []
for field_, _, value in content:
if field_ == field:
lst.append(value)
return lst
[docs]def pdate(s):
if len(s) < 17:
s += b'0000,001,00:00:00'[len(s):]
if s.startswith(b'2599') or s.startswith(b'2999'):
return None
elif s.lower().startswith(b'no'):
return None
else:
t = s.split(b',')
if len(t) > 2 and t[1] == b'000':
s = b','.join([t[0], b'001'] + t[2:])
return util.str_to_time(
str(s.decode('ascii')), format='%Y,%j,%H:%M:%S.OPTFRAC')
[docs]def ploc(s):
if s == b'??':
return ''
else:
return str(s.decode('ascii'))
[docs]def pcode(s):
return str(s.decode('ascii'))
[docs]def gett(lst, t):
return [x for x in lst if isinstance(x, t)]
[docs]def gett1o(lst, t):
lst = [x for x in lst if isinstance(x, t)]
if len(lst) == 0:
return None
elif len(lst) == 1:
return lst[0]
else:
raise RespError('duplicate entry')
[docs]def gett1(lst, t):
lst = [x for x in lst if isinstance(x, t)]
if len(lst) == 0:
raise RespError('entry not found')
elif len(lst) == 1:
return lst[0]
else:
raise RespError('duplicate entry')
[docs]class ChannelResponse(guts.Object):
'''Response information + channel codes and time span.'''
codes = guts.Tuple.T(4, guts.String.T(default=''))
start_date = guts.Timestamp.T()
end_date = guts.Timestamp.T()
response = sxml.Response.T()
[docs]def iload_fh(f):
'''Read RESP information from open file handle.'''
for sc, cc, rcs in parse3(f):
nslc = (
pcode(get1(sc, b'16')),
pcode(get1(sc, b'03')),
ploc(get1(cc, b'03', b'')),
pcode(get1(cc, b'04')))
try:
tmin = pdate(get1(cc, b'22'))
tmax = pdate(get1(cc, b'23'))
except util.TimeStrError as e:
raise RespError('invalid date in RESP information. (%s)' % str(e))
stage_elements = {}
istage = -1
for block, content in rcs:
if block not in bdefs:
raise RespError('unknown block type found: %s' % block)
istage_temp, x = bdefs[block]['parse'](content)
if istage_temp != -1:
istage = istage_temp
if x is None:
continue
x.validate()
if istage not in stage_elements:
stage_elements[istage] = []
stage_elements[istage].append(x)
istages = sorted(stage_elements.keys())
stages = []
totalgain = None
for istage in istages:
elements = stage_elements[istage]
if istage == 0:
totalgain = gett1(elements, sxml.Gain)
else:
stage = sxml.ResponseStage(
number=istage,
poles_zeros_list=gett(elements, sxml.PolesZeros),
coefficients_list=gett(elements, sxml.Coefficients),
fir=gett1o(elements, sxml.FIR),
decimation=gett1o(elements, sxml.Decimation),
stage_gain=gett1o(elements, sxml.Gain))
stages.append(stage)
if stages:
resp = sxml.Response(
stage_list=stages)
if totalgain:
totalgain_value = totalgain.value
totalgain_frequency = totalgain.frequency
else:
totalgain_value = 1.
gain_frequencies = []
for stage in stages:
totalgain_value *= stage.stage_gain.value
gain_frequencies.append(stage.stage_gain.frequency)
totalgain_frequency = gain_frequencies[0]
if not all(f == totalgain_frequency for f in gain_frequencies):
logger.warn(
'no total gain reported and inconsistent gain '
'frequency values found in resp file for %s.%s.%s.%s: '
'omitting total gain and frequency from created '
'instrument sensitivity object' % nslc)
totalgain_value = None
totalgain_frequency = None
resp.instrument_sensitivity = sxml.Sensitivity(
value=totalgain_value,
frequency=totalgain_frequency,
input_units=stages[0].input_units,
output_units=stages[-1].output_units)
yield ChannelResponse(
codes=nslc,
start_date=tmin,
end_date=tmax,
response=resp)
else:
raise RespError('incomplete response information')
iload_filename, iload_dirname, iload_glob, iload = util.make_iload_family(
iload_fh, 'RESP', ':py:class:`ChannelResponse`')
[docs]def make_stationxml(pyrocko_stations, channel_responses):
'''Create stationxml from pyrocko station list and RESP information.
:param pyrocko_stations: list of :py:class:`pyrocko.model.Station` objects
:param channel_responses: iterable yielding :py:class:`ChannelResponse`
objects
:returns: :py:class:`pyrocko.fdsn.station.FDSNStationXML` object with
merged information
If no station information is available for any response information, it
is skipped and a warning is emitted.
'''
pstations = dict((s.nsl(), s) for s in pyrocko_stations)
networks = {}
stations = {}
for (net, sta, loc) in sorted(pstations.keys()):
pstation = pstations[net, sta, loc]
if net not in networks:
networks[net] = sxml.Network(code=net)
if (net, sta) not in stations:
stations[net, sta] = sxml.Station(
code=sta,
latitude=sxml.Latitude(pstation.lat),
longitude=sxml.Longitude(pstation.lon),
elevation=sxml.Distance(pstation.elevation))
networks[net].station_list.append(stations[net, sta])
for cr in channel_responses:
net, sta, loc, cha = cr.codes
if (net, sta, loc) in pstations:
pstation = pstations[net, sta, loc]
channel = sxml.Channel(
code=cha,
location_code=loc,
start_date=cr.start_date,
end_date=cr.end_date,
latitude=sxml.Latitude(pstation.lat),
longitude=sxml.Longitude(pstation.lon),
elevation=sxml.Distance(pstation.elevation),
depth=sxml.Distance(pstation.depth),
response=cr.response)
stations[net, sta].channel_list.append(channel)
else:
logger.warning('no station information for %s.%s.%s' %
(net, sta, loc))
for station in stations.values():
station.channel_list.sort(key=lambda c: (c.location_code, c.code))
return sxml.FDSNStationXML(
source='Converted from Pyrocko stations file and RESP information',
created=time.time(),
network_list=[networks[net_] for net_ in sorted(networks.keys())])
if __name__ == '__main__':
import sys
from pyrocko.model.station import load_stations
util.setup_logging(__name__)
if len(sys.argv) < 2:
sys.exit('usage: python -m pyrocko.fdsn.resp <stations> <resp> ...')
stations = load_stations(sys.argv[1])
sxml = make_stationxml(stations, iload(sys.argv[2:]))
print(sxml.dump_xml())