1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''This module provides convenience objects to handle frequency responses.'''
8from __future__ import print_function, division, absolute_import
10import math
11import logging
13import numpy as num
14from scipy import signal
16from pyrocko import util, evalresp
17from pyrocko.guts import Object, Float, Int, String, Complex, Tuple, List, \
18 StringChoice, Bool
19from pyrocko.guts_array import Array
21try:
22 newstr = unicode
23except NameError:
24 newstr = str
27guts_prefix = 'pf'
29logger = logging.getLogger('pyrocko.response')
32def asarray_1d(x, dtype):
33 if isinstance(x, (list, tuple)) and x and isinstance(x[0], (str, newstr)):
34 return num.asarray(list(map(dtype, x)), dtype=dtype)
35 else:
36 a = num.asarray(x, dtype=dtype)
37 if not a.ndim == 1:
38 raise ValueError('could not convert to 1D array')
39 return a
42class FrequencyResponseCheckpoint(Object):
43 frequency = Float.T()
44 value = Float.T()
47class IsNotScalar(Exception):
48 pass
51class FrequencyResponse(Object):
52 '''
53 Evaluates frequency response at given frequencies.
54 '''
56 checkpoints = List.T(FrequencyResponseCheckpoint.T())
58 def evaluate(self, freqs):
59 return num.ones(freqs.size, dtype=complex)
61 def is_scalar(self):
62 '''
63 Check if this is a flat response.
64 '''
66 if type(self) is FrequencyResponse:
67 return True
68 else:
69 return False # default for derived classes
71 def get_scalar(self):
72 '''
73 Get factor if this is a flat response.
74 '''
75 if type(self) is FrequencyResponse:
76 return 1.0
77 else:
78 raise IsNotScalar() # default for derived classes
80 def get_fmax(self):
81 return None
84class Gain(FrequencyResponse):
85 '''
86 A flat frequency response.
87 '''
89 constant = Complex.T(default=1.0+0j)
91 def evaluate(self, freqs):
92 return util.num_full_like(freqs, self.constant, dtype=complex)
94 def is_scalar(self):
95 return True
97 def get_scalar(self):
98 return self.constant
101class Evalresp(FrequencyResponse):
102 '''
103 Calls evalresp and generates values of the instrument response transfer
104 function.
106 :param respfile: response file in evalresp format
107 :param trace: trace for which the response is to be extracted from the file
108 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity
109 '''
111 respfile = String.T()
112 nslc_id = Tuple.T(4, String.T())
113 target = String.T(default='dis')
114 instant = Float.T()
115 stages = Tuple.T(2, Int.T(), optional=True)
117 def __init__(
118 self,
119 respfile,
120 trace=None,
121 target='dis',
122 nslc_id=None,
123 time=None,
124 stages=None,
125 **kwargs):
127 if trace is not None:
128 nslc_id = trace.nslc_id
129 time = (trace.tmin + trace.tmax) / 2.
131 FrequencyResponse.__init__(
132 self,
133 respfile=respfile,
134 nslc_id=nslc_id,
135 instant=time,
136 target=target,
137 stages=stages,
138 **kwargs)
140 def evaluate(self, freqs):
141 network, station, location, channel = self.nslc_id
142 if self.stages is None:
143 stages = (-1, 0)
144 else:
145 stages = self.stages[0]+1, self.stages[1]
147 x = evalresp.evalresp(
148 sta_list=station,
149 cha_list=channel,
150 net_code=network,
151 locid=location,
152 instant=self.instant,
153 freqs=freqs,
154 units=self.target.upper(),
155 file=self.respfile,
156 start_stage=stages[0],
157 stop_stage=stages[1],
158 rtype='CS')
160 transfer = x[0][4]
161 return transfer
164class InverseEvalresp(FrequencyResponse):
165 '''
166 Calls evalresp and generates values of the inverse instrument response for
167 deconvolution of instrument response.
169 :param respfile: response file in evalresp format
170 :param trace: trace for which the response is to be extracted from the file
171 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity
172 '''
174 respfile = String.T()
175 nslc_id = Tuple.T(4, String.T())
176 target = String.T(default='dis')
177 instant = Float.T()
179 def __init__(self, respfile, trace, target='dis', **kwargs):
180 FrequencyResponse.__init__(
181 self,
182 respfile=respfile,
183 nslc_id=trace.nslc_id,
184 instant=(trace.tmin + trace.tmax)/2.,
185 target=target,
186 **kwargs)
188 def evaluate(self, freqs):
189 network, station, location, channel = self.nslc_id
190 x = evalresp.evalresp(sta_list=station,
191 cha_list=channel,
192 net_code=network,
193 locid=location,
194 instant=self.instant,
195 freqs=freqs,
196 units=self.target.upper(),
197 file=self.respfile,
198 rtype='CS')
200 transfer = x[0][4]
201 return 1./transfer
204def aslist(x):
205 if x is None:
206 return []
208 try:
209 return list(x)
210 except TypeError:
211 return [x]
214class PoleZeroResponse(FrequencyResponse):
215 '''
216 Evaluates frequency response from pole-zero representation.
218 :param zeros: positions of zeros
219 :type zeros: list of complex
220 :param poles: positions of poles
221 :type poles: list of complex
222 :param constant: gain factor
223 :type constant: complex
225 ::
227 (j*2*pi*f - zeros[0]) * (j*2*pi*f - zeros[1]) * ...
228 T(f) = constant * ----------------------------------------------------
229 (j*2*pi*f - poles[0]) * (j*2*pi*f - poles[1]) * ...
232 The poles and zeros should be given as angular frequencies, not in Hz.
233 '''
235 zeros = List.T(Complex.T())
236 poles = List.T(Complex.T())
237 constant = Complex.T(default=1.0+0j)
239 def __init__(
240 self,
241 zeros=None,
242 poles=None,
243 constant=1.0+0j,
244 **kwargs):
246 if zeros is None:
247 zeros = []
248 if poles is None:
249 poles = []
251 FrequencyResponse.__init__(
252 self,
253 zeros=aslist(zeros),
254 poles=aslist(poles),
255 constant=constant,
256 **kwargs)
258 def evaluate(self, freqs):
259 if hasattr(signal, 'freqs_zpk'): # added in scipy 0.19.0
260 return signal.freqs_zpk(
261 self.zeros, self.poles, self.constant, freqs*2.*num.pi)[1]
262 else:
263 jomeg = 1.0j * 2.*num.pi*freqs
265 a = num.ones(freqs.size, dtype=complex)*self.constant
266 for z in self.zeros:
267 a *= jomeg-z
268 for p in self.poles:
269 a /= jomeg-p
271 return a
273 def is_scalar(self):
274 return len(self.zeros) == 0 and len(self.poles) == 0
276 def get_scalar(self):
277 '''
278 Get factor if this is a flat response.
279 '''
280 if self.is_scalar():
281 return self.constant
282 else:
283 raise IsNotScalar()
285 def inverse(self):
286 return PoleZeroResponse(
287 poles=list(self.zeros),
288 zeros=list(self.poles),
289 constant=1.0/self.constant)
291 def to_analog(self):
292 b, a = signal.zpk2tf(self.zeros, self.poles, self.constant)
293 return AnalogFilterResponse(aslist(b), aslist(a))
295 def to_digital(self, deltat, method='bilinear'):
296 from scipy.signal import cont2discrete, zpk2tf
298 z, p, k, _ = cont2discrete(
299 (self.zeros, self.poles, self.constant),
300 deltat, method=method)
302 b, a = zpk2tf(z, p, k)
304 return DigitalFilterResponse(b, a, deltat)
306 def to_digital_polezero(self, deltat, method='bilinear'):
307 from scipy.signal import cont2discrete
309 z, p, k, _ = cont2discrete(
310 (self.zeros, self.poles, self.constant),
311 deltat, method=method)
313 return DigitalPoleZeroResponse(z, p, k, deltat)
316class DigitalPoleZeroResponse(FrequencyResponse):
317 '''
318 Evaluates frequency response from digital filter pole-zero representation.
320 :param zeros: positions of zeros
321 :type zeros: list of complex
322 :param poles: positions of poles
323 :type poles: list of complex
324 :param constant: gain factor
325 :type constant: complex
326 :param deltat: sampling interval
327 :type deltat: float
329 The poles and zeros should be given as angular frequencies, not in Hz.
330 '''
332 zeros = List.T(Complex.T())
333 poles = List.T(Complex.T())
334 constant = Complex.T(default=1.0+0j)
335 deltat = Float.T()
337 def __init__(
338 self,
339 zeros=None,
340 poles=None,
341 constant=1.0+0j,
342 deltat=None,
343 **kwargs):
345 if zeros is None:
346 zeros = []
347 if poles is None:
348 poles = []
349 if deltat is None:
350 raise ValueError(
351 'Sampling interval `deltat` must be given for '
352 'DigitalPoleZeroResponse')
354 FrequencyResponse.__init__(
355 self, zeros=aslist(zeros), poles=aslist(poles), constant=constant,
356 deltat=deltat, **kwargs)
358 def check_sampling_rate(self):
359 if self.deltat == 0.0:
360 raise InvalidResponseError(
361 'Invalid digital response: sampling rate undefined')
363 def get_fmax(self):
364 self.check_sampling_rate()
365 return 0.5 / self.deltat
367 def evaluate(self, freqs):
368 self.check_sampling_rate()
369 return signal.freqz_zpk(
370 self.zeros, self.poles, self.constant,
371 freqs*(2.*math.pi*self.deltat))[1]
373 def is_scalar(self):
374 return len(self.zeros) == 0 and len(self.poles) == 0
376 def get_scalar(self):
377 '''
378 Get factor if this is a flat response.
379 '''
380 if self.is_scalar():
381 return self.constant
382 else:
383 raise IsNotScalar()
385 def to_digital(self, deltat):
386 self.check_sampling_rate()
387 from scipy.signal import zpk2tf
389 b, a = zpk2tf(self.zeros, self.poles, self.constant)
390 return DigitalFilterResponse(b, a, deltat)
393class ButterworthResponse(FrequencyResponse):
394 '''
395 Butterworth frequency response.
397 :param corner: corner frequency of the response
398 :param order: order of the response
399 :param type: either ``high`` or ``low``
400 '''
402 corner = Float.T(default=1.0)
403 order = Int.T(default=4)
404 type = StringChoice.T(choices=['low', 'high'], default='low')
406 def to_polezero(self):
407 z, p, k = signal.butter(
408 self.order, self.corner*2.*math.pi,
409 btype=self.type, analog=True, output='zpk')
411 return PoleZeroResponse(
412 zeros=aslist(z),
413 poles=aslist(p),
414 constant=float(k))
416 def to_digital(self, deltat):
417 b, a = signal.butter(
418 self.order, self.corner*2.*deltat,
419 self.type, analog=False)
421 return DigitalFilterResponse(b, a, deltat)
423 def to_analog(self):
424 b, a = signal.butter(
425 self.order, self.corner*2.*math.pi,
426 self.type, analog=True)
428 return AnalogFilterResponse(b, a)
430 def to_digital_polezero(self, deltat):
431 z, p, k = signal.butter(
432 self.order, self.corner*2*deltat,
433 btype=self.type, analog=False, output='zpk')
435 return DigitalPoleZeroResponse(z, p, k, deltat)
437 def evaluate(self, freqs):
438 b, a = signal.butter(
439 self.order, self.corner*2.*math.pi,
440 self.type, analog=True)
442 return signal.freqs(b, a, freqs*2.*math.pi)[1]
445class SampledResponse(FrequencyResponse):
446 '''
447 Interpolates frequency response given at a set of sampled frequencies.
449 :param frequencies,values: frequencies and values of the sampled response
450 function.
451 :param left,right: values to return when input is out of range. If set to
452 ``None`` (the default) the endpoints are returned.
453 '''
455 frequencies = Array.T(shape=(None,), dtype=float, serialize_as='list')
456 values = Array.T(shape=(None,), dtype=complex, serialize_as='list')
457 left = Complex.T(optional=True)
458 right = Complex.T(optional=True)
460 def __init__(self, frequencies, values, left=None, right=None, **kwargs):
461 FrequencyResponse.__init__(
462 self,
463 frequencies=asarray_1d(frequencies, float),
464 values=asarray_1d(values, complex),
465 **kwargs)
467 def evaluate(self, freqs):
468 ereal = num.interp(
469 freqs, self.frequencies, num.real(self.values),
470 left=self.left, right=self.right)
471 eimag = num.interp(
472 freqs, self.frequencies, num.imag(self.values),
473 left=self.left, right=self.right)
474 transfer = ereal + 1.0j*eimag
475 return transfer
477 def inverse(self):
478 '''
479 Get inverse as a new :py:class:`SampledResponse` object.
480 '''
482 def inv_or_none(x):
483 if x is not None:
484 return 1./x
486 return SampledResponse(
487 self.frequencies, 1./self.values,
488 left=inv_or_none(self.left),
489 right=inv_or_none(self.right))
492class IntegrationResponse(FrequencyResponse):
493 '''
494 The integration response, optionally multiplied by a constant gain.
496 :param n: exponent (integer)
497 :param gain: gain factor (float)
499 ::
501 gain
502 T(f) = --------------
503 (j*2*pi * f)^n
504 '''
506 n = Int.T(optional=True, default=1)
507 gain = Float.T(optional=True, default=1.0)
509 def __init__(self, n=1, gain=1.0, **kwargs):
510 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs)
512 def evaluate(self, freqs):
513 nonzero = freqs != 0.0
514 resp = num.zeros(freqs.size, dtype=complex)
515 resp[nonzero] = self.gain / (1.0j * 2. * num.pi*freqs[nonzero])**self.n
516 return resp
519class DifferentiationResponse(FrequencyResponse):
520 '''
521 The differentiation response, optionally multiplied by a constant gain.
523 :param n: exponent (integer)
524 :param gain: gain factor (float)
526 ::
528 T(f) = gain * (j*2*pi * f)^n
529 '''
531 n = Int.T(optional=True, default=1)
532 gain = Float.T(optional=True, default=1.0)
534 def __init__(self, n=1, gain=1.0, **kwargs):
535 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs)
537 def evaluate(self, freqs):
538 return self.gain * (1.0j * 2. * num.pi * freqs)**self.n
541class DigitalFilterResponse(FrequencyResponse):
542 '''
543 Frequency response of an analog filter.
545 (see :py:func:`scipy.signal.freqz`).
546 '''
548 b = List.T(Float.T())
549 a = List.T(Float.T())
550 deltat = Float.T()
551 drop_phase = Bool.T(default=False)
553 def __init__(self, b, a, deltat, drop_phase=False, **kwargs):
554 FrequencyResponse.__init__(
555 self, b=aslist(b), a=aslist(a), deltat=float(deltat),
556 drop_phase=drop_phase, **kwargs)
558 def check_sampling_rate(self):
559 if self.deltat == 0.0:
560 raise InvalidResponseError(
561 'Invalid digital response: sampling rate undefined')
563 def get_fmax(self):
564 self.check_sampling_rate()
565 return 0.5 / self.deltat
567 def evaluate(self, freqs):
568 self.check_sampling_rate()
570 ok = freqs <= 0.5/self.deltat
571 coeffs = num.zeros(freqs.size, dtype=complex)
573 coeffs[ok] = signal.freqz(
574 self.b, self.a, freqs[ok]*2.*math.pi * self.deltat)[1]
576 coeffs[num.logical_not(ok)] = None
577 if self.drop_phase:
578 return num.abs(coeffs)
579 else:
580 return coeffs
582 def filter(self, tr):
583 self.check_sampling_rate()
585 from pyrocko import trace
586 trace.assert_same_sampling_rate(self, tr)
587 tr_new = tr.copy(data=False)
588 tr_new.set_ydata(signal.lfilter(self.b, self.a, tr.get_ydata()))
589 return tr_new
592class AnalogFilterResponse(FrequencyResponse):
593 '''
594 Frequency response of an analog filter.
596 (see :py:func:`scipy.signal.freqs`).
597 '''
599 b = List.T(Float.T())
600 a = List.T(Float.T())
602 def __init__(self, b, a, **kwargs):
603 FrequencyResponse.__init__(
604 self, b=aslist(b), a=aslist(a), **kwargs)
606 def evaluate(self, freqs):
607 return signal.freqs(self.b, self.a, freqs*2.*math.pi)[1]
609 def to_digital(self, deltat, method='bilinear'):
610 from scipy.signal import cont2discrete
611 b, a, _ = cont2discrete((self.b, self.a), deltat, method=method)
612 if b.ndim == 2:
613 b = b[0]
614 return DigitalFilterResponse(b.tolist(), a.tolist(), deltat)
617class MultiplyResponse(FrequencyResponse):
618 '''
619 Multiplication of several :py:class:`FrequencyResponse` objects.
620 '''
622 responses = List.T(FrequencyResponse.T())
624 def __init__(self, responses=None, **kwargs):
625 if responses is None:
626 responses = []
627 FrequencyResponse.__init__(self, responses=responses, **kwargs)
629 def get_fmax(self):
630 fmaxs = [resp.get_fmax() for resp in self.responses]
631 fmaxs = [fmax for fmax in fmaxs if fmax is not None]
632 if not fmaxs:
633 return None
634 else:
635 return min(fmaxs)
637 def evaluate(self, freqs):
638 a = num.ones(freqs.size, dtype=complex)
639 for resp in self.responses:
640 a *= resp.evaluate(freqs)
642 return a
644 def is_scalar(self):
645 return all(resp.is_scalar() for resp in self.responses)
647 def get_scalar(self):
648 '''
649 Get factor if this is a flat response.
650 '''
651 if self.is_scalar():
652 return num.product(resp.get_scalar() for resp in self.responses)
653 else:
654 raise IsNotScalar()
656 def simplify(self):
657 poles = []
658 zeros = []
659 constant = 1.0
660 responses = []
661 for resp in self.responses:
662 if isinstance(resp, PoleZeroResponse):
663 poles.extend(resp.poles)
664 zeros.extend(resp.zeros)
665 constant *= resp.constant
666 else:
667 responses.append(resp)
669 if poles or zeros or constant != 1.0:
670 responses[0:0] = [
671 PoleZeroResponse(poles=poles, zeros=zeros, constant=constant)]
673 self.responses = responses
676class DelayResponse(FrequencyResponse):
678 delay = Float.T()
680 def evaluate(self, freqs):
681 return num.exp(-2.0J * self.delay * num.pi * freqs)
684class InvalidResponseError(Exception):
685 pass
688class InvalidResponse(FrequencyResponse):
690 message = String.T()
692 def __init__(self, message):
693 FrequencyResponse.__init__(self, message=message)
694 self.have_warned = False
696 def evaluate(self, freqs):
697 if not self.have_warned:
698 logger.warning('Invalid response: %s' % self.message)
699 self.have_warned = True
701 return util.num_full_like(freqs, None, dtype=num.complex)
704def simplify_responses(responses):
706 def unpack_multi(responses):
707 for resp in responses:
708 if isinstance(resp, MultiplyResponse):
709 for sub in unpack_multi(resp.responses):
710 yield sub
711 else:
712 yield resp
714 def cancel_pzs(poles, zeros):
715 poles_new = []
716 zeros_new = list(zeros)
717 for p in poles:
718 try:
719 zeros_new.pop(zeros_new.index(p))
720 except ValueError:
721 poles_new.append(p)
723 return poles_new, zeros_new
725 def combine_pzs(responses):
726 poles = []
727 zeros = []
728 constant = 1.0
729 out = []
730 for resp in responses:
731 if isinstance(resp, PoleZeroResponse):
732 poles.extend(resp.poles)
733 zeros.extend(resp.zeros)
734 constant *= resp.constant
735 else:
736 out.append(resp)
738 poles, zeros = cancel_pzs(poles, zeros)
739 if poles or zeros:
740 out.insert(0, PoleZeroResponse(
741 poles=poles, zeros=zeros, constant=constant))
742 elif constant != 1.0:
743 out.insert(0, Gain(constant=constant))
745 return out
747 def split(xs, condition):
748 out = [], []
749 for x in xs:
750 out[condition(x)].append(x)
752 return out
754 def combine_gains(responses):
755 non_scalars, scalars = split(responses, lambda resp: resp.is_scalar())
756 if scalars:
757 factor = num.prod([resp.get_scalar() for resp in scalars])
758 yield Gain(constant=factor)
760 for resp in non_scalars:
761 yield resp
763 return list(combine_gains(combine_pzs(unpack_multi(responses))))