# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
'''
Frequency response parameterizations useful as transfer functions in signal
processing.
'''
import math
import logging
import uuid
import numpy as num
from scipy import signal
from pyrocko import evalresp
from pyrocko.guts import Object, Float, Int, String, Complex, Tuple, List, \
StringChoice, Bool
from pyrocko.guts_array import Array
guts_prefix = 'pf'
logger = logging.getLogger('pyrocko.response')
def asarray_1d(x, dtype):
if isinstance(x, (list, tuple)) and x and isinstance(x[0], str):
return num.asarray(list(map(dtype, x)), dtype=dtype)
else:
a = num.asarray(x, dtype=dtype)
if not a.ndim == 1:
raise ValueError('Could not convert to 1D array.')
return a
def finalize_construction(breakpoints):
breakpoints.sort()
breakpoints_out = []
f_last = None
for f, c in breakpoints:
if f_last is not None and f == f_last:
breakpoints_out[-1][1] += c
else:
breakpoints_out.append([f, c])
f_last = f
breakpoints_out = [(f, c) for (f, c) in breakpoints_out if c != 0]
return breakpoints_out
[docs]class FrequencyResponseCheckpoint(Object):
frequency = Float.T()
value = Float.T()
class IsNotScalar(Exception):
pass
def str_fmax_failsafe(resp):
try:
return '%g' % resp.get_fmax()
except InvalidResponseError:
return '?'
[docs]class FrequencyResponse(Object):
'''
Base class for parameterized frequency responses.
'''
checkpoints = List.T(
FrequencyResponseCheckpoint.T())
def __init__(self, *args, **kwargs):
Object.__init__(self, *args, **kwargs)
self.uuid = uuid.uuid4()
[docs] def evaluate(self, freqs):
'''
Evaluate the response at given frequencies.
:param freqs:
Frequencies [Hz].
:type freqs:
:py:class:`numpy.ndarray` of shape ``(N,)`` and dtype
:py:class:`float`
:returns:
Complex coefficients of the response.
:rtype:
:py:class:`numpy.ndarray` of shape ``(N,)`` and dtype
:py:class:`complex`
'''
return num.ones(freqs.size, dtype=complex)
[docs] def evaluate1(self, freq):
'''
Evaluate the response at a single frequency.
:param freq:
Frequency [Hz].
:type freqs:
float
:returns:
Complex response coefficient.
:rtype:
complex
'''
return self.evaluate(num.atleast_1d(freq))[0]
[docs] def is_scalar(self):
'''
Check if this is a flat response.
'''
if type(self) is FrequencyResponse:
return True
else:
return False # default for derived classes
[docs] def get_scalar(self):
'''
Get factor if this is a flat response.
'''
if type(self) is FrequencyResponse:
return 1.0
else:
raise IsNotScalar() # default for derived classes
[docs] def get_fmax(self):
'''
Get maximum frequency for which the response is defined.
:returns:
``None`` if the response has no upper limit, otherwise the maximum
frequency in [Hz] for which the response is valid is returned.
:rtype:
float or None
'''
return None
def construction(self):
return []
@property
def summary(self):
'''
Short summary with key information about the response object.
'''
if type(self) is FrequencyResponse:
return 'one'
else:
return 'unknown'
def str_gain(gain):
if gain == 1.0:
return 'one'
elif isinstance(gain, complex):
return 'gain{%s}' % repr(gain)
else:
return 'gain{%g}' % gain
[docs]class Gain(FrequencyResponse):
'''
A flat frequency response.
'''
constant = Complex.T(default=1.0+0j)
def evaluate(self, freqs):
return num.full_like(freqs, self.constant, dtype=complex)
def is_scalar(self):
return True
def get_scalar(self):
return self.constant
@property
def summary(self):
return str_gain(self.constant)
[docs]class Evalresp(FrequencyResponse):
'''
Calls evalresp and generates values of the instrument response transfer
function.
:param respfile: response file in evalresp format
:param trace: trace for which the response is to be extracted from the file
:param target: ``'dis'`` for displacement or ``'vel'`` for velocity
'''
respfile = String.T()
nslc_id = Tuple.T(4, String.T())
target = String.T(default='dis')
instant = Float.T()
stages = Tuple.T(2, Int.T(), optional=True)
def __init__(
self,
respfile,
trace=None,
target='dis',
nslc_id=None,
time=None,
stages=None,
**kwargs):
if trace is not None:
nslc_id = trace.nslc_id
time = (trace.tmin + trace.tmax) / 2.
FrequencyResponse.__init__(
self,
respfile=respfile,
nslc_id=nslc_id,
instant=time,
target=target,
stages=stages,
**kwargs)
def evaluate(self, freqs):
network, station, location, channel = self.nslc_id
if self.stages is None:
stages = (-1, 0)
else:
stages = self.stages[0]+1, self.stages[1]
x = evalresp.evalresp(
sta_list=station,
cha_list=channel,
net_code=network,
locid=location,
instant=self.instant,
freqs=freqs,
units=self.target.upper(),
file=self.respfile,
start_stage=stages[0],
stop_stage=stages[1],
rtype='CS')
transfer = x[0][4]
return transfer
@property
def summary(self):
return 'eresp'
[docs]class InverseEvalresp(FrequencyResponse):
'''
Calls evalresp and generates values of the inverse instrument response for
deconvolution of instrument response.
:param respfile: response file in evalresp format
:param trace: trace for which the response is to be extracted from the file
:param target: ``'dis'`` for displacement or ``'vel'`` for velocity
'''
respfile = String.T()
nslc_id = Tuple.T(4, String.T())
target = String.T(default='dis')
instant = Float.T()
def __init__(self, respfile, trace, target='dis', **kwargs):
FrequencyResponse.__init__(
self,
respfile=respfile,
nslc_id=trace.nslc_id,
instant=(trace.tmin + trace.tmax)/2.,
target=target,
**kwargs)
def evaluate(self, freqs):
network, station, location, channel = self.nslc_id
x = evalresp.evalresp(sta_list=station,
cha_list=channel,
net_code=network,
locid=location,
instant=self.instant,
freqs=freqs,
units=self.target.upper(),
file=self.respfile,
rtype='CS')
transfer = x[0][4]
return 1./transfer
@property
def summary(self):
return 'inv_eresp'
def aslist(x):
if x is None:
return []
try:
return list(x)
except TypeError:
return [x]
[docs]class PoleZeroResponse(FrequencyResponse):
'''
Evaluates frequency response from pole-zero representation.
:param zeros: positions of zeros
:type zeros: :py:class:`list` of :py:class:`complex`
:param poles: positions of poles
:type poles: :py:class:`list` of :py:class:`complex`
:param constant: gain factor
:type constant: complex
::
(j*2*pi*f - zeros[0]) * (j*2*pi*f - zeros[1]) * ...
T(f) = constant * ----------------------------------------------------
(j*2*pi*f - poles[0]) * (j*2*pi*f - poles[1]) * ...
The poles and zeros should be given as angular frequencies, not in Hz.
'''
zeros = List.T(Complex.T())
poles = List.T(Complex.T())
constant = Complex.T(default=1.0+0j)
def __init__(
self,
zeros=None,
poles=None,
constant=1.0+0j,
**kwargs):
if zeros is None:
zeros = []
if poles is None:
poles = []
FrequencyResponse.__init__(
self,
zeros=aslist(zeros),
poles=aslist(poles),
constant=constant,
**kwargs)
def evaluate(self, freqs):
if hasattr(signal, 'freqs_zpk'): # added in scipy 0.19.0
return signal.freqs_zpk(
self.zeros, self.poles, self.constant, freqs*2.*num.pi)[1]
else:
jomeg = 1.0j * 2.*num.pi*freqs
a = num.ones(freqs.size, dtype=complex)*self.constant
for z in self.zeros:
a *= jomeg-z
for p in self.poles:
a /= jomeg-p
return a
def is_scalar(self):
return len(self.zeros) == 0 and len(self.poles) == 0
[docs] def get_scalar(self):
'''
Get factor if this is a flat response.
'''
if self.is_scalar():
return self.constant
else:
raise IsNotScalar()
def inverse(self):
return PoleZeroResponse(
poles=list(self.zeros),
zeros=list(self.poles),
constant=1.0/self.constant)
def to_analog(self):
b, a = signal.zpk2tf(self.zeros, self.poles, self.constant)
return AnalogFilterResponse(aslist(b), aslist(a))
def to_digital(self, deltat, method='bilinear'):
from scipy.signal import cont2discrete, zpk2tf
z, p, k, _ = cont2discrete(
(self.zeros, self.poles, self.constant),
deltat, method=method)
b, a = zpk2tf(z, p, k)
return DigitalFilterResponse(b, a, deltat)
def to_digital_polezero(self, deltat, method='bilinear'):
from scipy.signal import cont2discrete
z, p, k, _ = cont2discrete(
(self.zeros, self.poles, self.constant),
deltat, method=method)
return DigitalPoleZeroResponse(z, p, k, deltat)
def construction(self):
breakpoints = []
for zero in self.zeros:
f = abs(zero) / (2.*math.pi)
breakpoints.append((f, 1))
for pole in self.poles:
f = abs(pole) / (2.*math.pi)
breakpoints.append((f, -1))
return finalize_construction(breakpoints)
@property
def summary(self):
if self.is_scalar():
return str_gain(self.get_scalar())
return 'pz{%i,%i}' % (len(self.poles), len(self.zeros))
[docs]class DigitalPoleZeroResponse(FrequencyResponse):
'''
Evaluates frequency response from digital filter pole-zero representation.
:param zeros: positions of zeros
:type zeros: :py:class:`list` of :py:class:`complex`
:param poles: positions of poles
:type poles: :py:class:`list` of :py:class:`complex`
:param constant: gain factor
:type constant: complex
:param deltat: sampling interval
:type deltat: float
The poles and zeros should be given as angular frequencies, not in Hz.
'''
zeros = List.T(Complex.T())
poles = List.T(Complex.T())
constant = Complex.T(default=1.0+0j)
deltat = Float.T()
def __init__(
self,
zeros=None,
poles=None,
constant=1.0+0j,
deltat=None,
**kwargs):
if zeros is None:
zeros = []
if poles is None:
poles = []
if deltat is None:
raise ValueError(
'Sampling interval `deltat` must be given for '
'DigitalPoleZeroResponse.')
FrequencyResponse.__init__(
self, zeros=aslist(zeros), poles=aslist(poles), constant=constant,
deltat=deltat, **kwargs)
def check_sampling_rate(self):
if self.deltat == 0.0:
raise InvalidResponseError(
'Invalid digital response: sampling rate undefined.')
def get_fmax(self):
self.check_sampling_rate()
return 0.5 / self.deltat
def evaluate(self, freqs):
self.check_sampling_rate()
return signal.freqz_zpk(
self.zeros, self.poles, self.constant,
freqs*(2.*math.pi*self.deltat))[1]
def is_scalar(self):
return len(self.zeros) == 0 and len(self.poles) == 0
[docs] def get_scalar(self):
'''
Get factor if this is a flat response.
'''
if self.is_scalar():
return self.constant
else:
raise IsNotScalar()
def to_digital(self, deltat):
self.check_sampling_rate()
from scipy.signal import zpk2tf
b, a = zpk2tf(self.zeros, self.poles, self.constant)
return DigitalFilterResponse(b, a, deltat)
@property
def summary(self):
if self.is_scalar():
return str_gain(self.get_scalar())
return 'dpz{%i,%i,%s}' % (
len(self.poles), len(self.zeros), str_fmax_failsafe(self))
[docs]class ButterworthResponse(FrequencyResponse):
'''
Butterworth frequency response.
:param corner: corner frequency of the response
:param order: order of the response
:param type: either ``high`` or ``low``
'''
corner = Float.T(default=1.0)
order = Int.T(default=4)
type = StringChoice.T(choices=['low', 'high'], default='low')
def to_polezero(self):
z, p, k = signal.butter(
self.order, self.corner*2.*math.pi,
btype=self.type, analog=True, output='zpk')
return PoleZeroResponse(
zeros=aslist(z),
poles=aslist(p),
constant=float(k))
def to_digital(self, deltat):
b, a = signal.butter(
self.order, self.corner*2.*deltat,
self.type, analog=False)
return DigitalFilterResponse(b, a, deltat)
def to_analog(self):
b, a = signal.butter(
self.order, self.corner*2.*math.pi,
self.type, analog=True)
return AnalogFilterResponse(b, a)
def to_digital_polezero(self, deltat):
z, p, k = signal.butter(
self.order, self.corner*2*deltat,
btype=self.type, analog=False, output='zpk')
return DigitalPoleZeroResponse(z, p, k, deltat)
def evaluate(self, freqs):
b, a = signal.butter(
self.order, self.corner*2.*math.pi,
self.type, analog=True)
return signal.freqs(b, a, freqs*2.*math.pi)[1]
@property
def summary(self):
return 'butter_%s{%i,%g}' % (
self.type,
self.order,
self.corner)
[docs]class SampledResponse(FrequencyResponse):
'''
Interpolates frequency response given at a set of sampled frequencies.
:param frequencies,values: frequencies and values of the sampled response
function.
:param left,right: values to return when input is out of range. If set to
``None`` (the default) the endpoints are returned.
'''
frequencies = Array.T(shape=(None,), dtype=float, serialize_as='list')
values = Array.T(shape=(None,), dtype=complex, serialize_as='list')
left = Complex.T(optional=True)
right = Complex.T(optional=True)
def __init__(self, frequencies, values, left=None, right=None, **kwargs):
FrequencyResponse.__init__(
self,
frequencies=asarray_1d(frequencies, float),
values=asarray_1d(values, complex),
**kwargs)
def evaluate(self, freqs):
ereal = num.interp(
freqs, self.frequencies, num.real(self.values),
left=self.left, right=self.right)
eimag = num.interp(
freqs, self.frequencies, num.imag(self.values),
left=self.left, right=self.right)
transfer = ereal + 1.0j*eimag
return transfer
[docs] def inverse(self):
'''
Get inverse as a new :py:class:`SampledResponse` object.
'''
def inv_or_none(x):
if x is not None:
return 1./x
return SampledResponse(
self.frequencies, 1./self.values,
left=inv_or_none(self.left),
right=inv_or_none(self.right))
@property
def summary(self):
return 'sampled'
[docs]class IntegrationResponse(FrequencyResponse):
'''
The integration response, optionally multiplied by a constant gain.
:param n: exponent (integer)
:param gain: gain factor (float)
::
gain
T(f) = --------------
(j*2*pi * f)^n
'''
n = Int.T(optional=True, default=1)
gain = Float.T(optional=True, default=1.0)
def __init__(self, n=1, gain=1.0, **kwargs):
FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs)
def evaluate(self, freqs):
nonzero = freqs != 0.0
resp = num.zeros(freqs.size, dtype=complex)
resp[nonzero] = self.gain / (1.0j * 2. * num.pi*freqs[nonzero])**self.n
return resp
@property
def summary(self):
return 'integration{%i}' % self.n + (
'*gain{%g}' % self.gain
if self.gain is not None and self.gain != 1.0
else '')
[docs]class DifferentiationResponse(FrequencyResponse):
'''
The differentiation response, optionally multiplied by a constant gain.
:param n: exponent (integer)
:param gain: gain factor (float)
::
T(f) = gain * (j*2*pi * f)^n
'''
n = Int.T(optional=True, default=1)
gain = Float.T(optional=True, default=1.0)
def __init__(self, n=1, gain=1.0, **kwargs):
FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs)
def evaluate(self, freqs):
return self.gain * (1.0j * 2. * num.pi * freqs)**self.n
@property
def summary(self):
return 'differentiation{%i}' % self.n + (
'*gain{%g}' % self.gain
if self.gain is not None and self.gain != 1.0
else '')
[docs]class DigitalFilterResponse(FrequencyResponse):
'''
Frequency response of an analog filter.
(see :py:func:`scipy.signal.freqz`).
'''
b = List.T(Float.T())
a = List.T(Float.T())
deltat = Float.T()
drop_phase = Bool.T(default=False)
def __init__(self, b, a, deltat, drop_phase=False, **kwargs):
FrequencyResponse.__init__(
self, b=aslist(b), a=aslist(a), deltat=float(deltat),
drop_phase=drop_phase, **kwargs)
def check_sampling_rate(self):
if self.deltat == 0.0:
raise InvalidResponseError(
'Invalid digital response: sampling rate undefined.')
def is_scalar(self):
return len(self.a) == 1 and len(self.b) == 1
def get_scalar(self):
if self.is_scalar():
return self.b[0] / self.a[0]
else:
raise IsNotScalar()
def get_fmax(self):
if not self.is_scalar():
self.check_sampling_rate()
return 0.5 / self.deltat
else:
return None
def evaluate(self, freqs):
if self.is_scalar():
return num.full_like(freqs, self.get_scalar(), dtype=complex)
self.check_sampling_rate()
ok = freqs <= 0.5/self.deltat
coeffs = num.zeros(freqs.size, dtype=complex)
coeffs[ok] = signal.freqz(
self.b, self.a, freqs[ok]*2.*math.pi * self.deltat)[1]
coeffs[num.logical_not(ok)] = None
if self.drop_phase:
return num.abs(coeffs)
else:
return coeffs
def filter(self, tr):
self.check_sampling_rate()
from pyrocko import trace
trace.assert_same_sampling_rate(self, tr)
tr_new = tr.copy(data=False)
tr_new.set_ydata(signal.lfilter(self.b, self.a, tr.get_ydata()))
return tr_new
@property
def summary(self):
if self.is_scalar():
return str_gain(self.get_scalar())
elif len(self.a) == 1:
return 'fir{%i,<=%sHz}' % (
len(self.b), str_fmax_failsafe(self))
else:
return 'iir{%i,%i,<=%sHz}' % (
len(self.b), len(self.a), str_fmax_failsafe(self))
[docs]class AnalogFilterResponse(FrequencyResponse):
'''
Frequency response of an analog filter.
(see :py:func:`scipy.signal.freqs`).
'''
b = List.T(Float.T())
a = List.T(Float.T())
def __init__(self, b, a, **kwargs):
FrequencyResponse.__init__(
self, b=aslist(b), a=aslist(a), **kwargs)
def is_scalar(self):
return len(self.a) == 1 and len(self.b) == 1
def get_scalar(self):
if self.is_scalar():
return self.b[0] / self.a[0]
else:
raise IsNotScalar()
def evaluate(self, freqs):
return signal.freqs(self.b, self.a, freqs*2.*math.pi)[1]
def to_digital(self, deltat, method='bilinear'):
from scipy.signal import cont2discrete
b, a, _ = cont2discrete((self.b, self.a), deltat, method=method)
if b.ndim == 2:
b = b[0]
return DigitalFilterResponse(b.tolist(), a.tolist(), deltat)
@property
def summary(self):
if self.is_scalar():
return str_gain(self.get_scalar())
return 'analog{%i,%i,%g}' % (
len(self.b), len(self.a), self.get_fmax())
[docs]class MultiplyResponse(FrequencyResponse):
'''
Multiplication of several :py:class:`FrequencyResponse` objects.
'''
responses = List.T(FrequencyResponse.T())
def __init__(self, responses=None, **kwargs):
if responses is None:
responses = []
FrequencyResponse.__init__(self, responses=responses, **kwargs)
def get_fmax(self):
fmaxs = [resp.get_fmax() for resp in self.responses]
fmaxs = [fmax for fmax in fmaxs if fmax is not None]
if not fmaxs:
return None
else:
return min(fmaxs)
def evaluate(self, freqs):
a = num.ones(freqs.size, dtype=complex)
for resp in self.responses:
a *= resp.evaluate(freqs)
return a
def is_scalar(self):
return all(resp.is_scalar() for resp in self.responses)
[docs] def get_scalar(self):
'''
Get factor if this is a flat response.
'''
if self.is_scalar():
return num.prod(resp.get_scalar() for resp in self.responses)
else:
raise IsNotScalar()
def simplify(self):
self.responses = simplify_responses(self.responses)
def construction(self):
breakpoints = []
for resp in self.responses:
breakpoints.extend(resp.construction())
return finalize_construction(breakpoints)
@property
def summary(self):
if self.is_scalar():
return str_gain(self.get_scalar())
else:
xs = [x.summary for x in self.responses]
return '(%s)' % ('*'.join(x for x in xs if x != 'one') or 'one')
[docs]class DelayResponse(FrequencyResponse):
'''
Frequency response of a time delay.
'''
delay = Float.T(
help='Time delay [s]')
def evaluate(self, freqs):
return num.exp(-2.0J * self.delay * num.pi * freqs)
@property
def summary(self):
return 'delay{%g}' % self.delay
class InvalidResponseError(Exception):
pass
[docs]class InvalidResponse(FrequencyResponse):
'''
Frequency response returning NaN for all frequencies.
When using :py:meth:`FrequencyResponse.evaluate` for the first time after
instantiation, the user supplied warning :py:gattr:`message` is emitted.
'''
message = String.T(
help='Warning message to be emitted when the response is used.')
def __init__(self, message):
FrequencyResponse.__init__(self, message=message)
self.have_warned = False
def evaluate(self, freqs):
if not self.have_warned:
logger.warning('Invalid response: %s' % self.message)
self.have_warned = True
return num.full_like(freqs, None, dtype=num.complex)
@property
def summary(self):
return 'invalid'
def simplify_responses(responses):
def unpack_multi(responses):
for resp in responses:
if isinstance(resp, MultiplyResponse):
for sub in unpack_multi(resp.responses):
yield sub
else:
yield resp
def cancel_pzs(poles, zeros):
poles_new = []
zeros_new = list(zeros)
for p in poles:
try:
zeros_new.pop(zeros_new.index(p))
except ValueError:
poles_new.append(p)
return poles_new, zeros_new
def combine_pzs(responses):
poles = []
zeros = []
constant = 1.0
out = []
for resp in responses:
if isinstance(resp, PoleZeroResponse):
poles.extend(resp.poles)
zeros.extend(resp.zeros)
constant *= resp.constant
else:
out.append(resp)
poles, zeros = cancel_pzs(poles, zeros)
if poles or zeros:
out.insert(0, PoleZeroResponse(
poles=poles, zeros=zeros, constant=constant))
elif constant != 1.0:
out.insert(0, Gain(constant=constant))
return out
def split(xs, condition):
out = [], []
for x in xs:
out[condition(x)].append(x)
return out
def combine_gains(responses):
non_scalars, scalars = split(responses, lambda resp: resp.is_scalar())
if scalars:
factor = num.prod([resp.get_scalar() for resp in scalars])
yield Gain(constant=factor)
for resp in non_scalars:
yield resp
return list(combine_gains(combine_pzs(unpack_multi(responses))))