1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''This module provides convenience objects to handle frequency responses.'''
8import math
9import logging
10import uuid
12import numpy as num
13from scipy import signal
15from pyrocko import util, evalresp
16from pyrocko.guts import Object, Float, Int, String, Complex, Tuple, List, \
17 StringChoice, Bool
18from pyrocko.guts_array import Array
21guts_prefix = 'pf'
23logger = logging.getLogger('pyrocko.response')
26def asarray_1d(x, dtype):
27 if isinstance(x, (list, tuple)) and x and isinstance(x[0], str):
28 return num.asarray(list(map(dtype, x)), dtype=dtype)
29 else:
30 a = num.asarray(x, dtype=dtype)
31 if not a.ndim == 1:
32 raise ValueError('could not convert to 1D array')
33 return a
36def finalize_construction(breakpoints):
37 breakpoints.sort()
38 breakpoints_out = []
39 f_last = None
40 for f, c in breakpoints:
41 if f_last is not None and f == f_last:
42 breakpoints_out[-1][1] += c
43 else:
44 breakpoints_out.append([f, c])
46 f_last = f
48 breakpoints_out = [(f, c) for (f, c) in breakpoints_out if c != 0]
49 return breakpoints_out
52class FrequencyResponseCheckpoint(Object):
53 frequency = Float.T()
54 value = Float.T()
57class IsNotScalar(Exception):
58 pass
61def str_fmax_failsafe(resp):
62 try:
63 return '%g' % resp.get_fmax()
64 except InvalidResponseError:
65 return '?'
68class FrequencyResponse(Object):
69 '''
70 Evaluates frequency response at given frequencies.
71 '''
73 checkpoints = List.T(FrequencyResponseCheckpoint.T())
75 def __init__(self, *args, **kwargs):
76 Object.__init__(self, *args, **kwargs)
77 self.uuid = uuid.uuid4()
79 def evaluate(self, freqs):
80 return num.ones(freqs.size, dtype=complex)
82 def evaluate1(self, freq):
83 return self.evaluate(num.atleast_1d(freq))[0]
85 def is_scalar(self):
86 '''
87 Check if this is a flat response.
88 '''
90 if type(self) is FrequencyResponse:
91 return True
92 else:
93 return False # default for derived classes
95 def get_scalar(self):
96 '''
97 Get factor if this is a flat response.
98 '''
99 if type(self) is FrequencyResponse:
100 return 1.0
101 else:
102 raise IsNotScalar() # default for derived classes
104 def get_fmax(self):
105 return None
107 def construction(self):
108 return []
110 @property
111 def summary(self):
112 if type(self) is FrequencyResponse:
113 return 'one'
114 else:
115 return 'unknown'
118def str_gain(gain):
119 if gain == 1.0:
120 return 'one'
121 elif isinstance(gain, complex):
122 return 'gain{%s}' % repr(gain)
123 else:
124 return 'gain{%g}' % gain
127class Gain(FrequencyResponse):
128 '''
129 A flat frequency response.
130 '''
132 constant = Complex.T(default=1.0+0j)
134 def evaluate(self, freqs):
135 return util.num_full_like(freqs, self.constant, dtype=complex)
137 def is_scalar(self):
138 return True
140 def get_scalar(self):
141 return self.constant
143 @property
144 def summary(self):
145 return str_gain(self.constant)
148class Evalresp(FrequencyResponse):
149 '''
150 Calls evalresp and generates values of the instrument response transfer
151 function.
153 :param respfile: response file in evalresp format
154 :param trace: trace for which the response is to be extracted from the file
155 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity
156 '''
158 respfile = String.T()
159 nslc_id = Tuple.T(4, String.T())
160 target = String.T(default='dis')
161 instant = Float.T()
162 stages = Tuple.T(2, Int.T(), optional=True)
164 def __init__(
165 self,
166 respfile,
167 trace=None,
168 target='dis',
169 nslc_id=None,
170 time=None,
171 stages=None,
172 **kwargs):
174 if trace is not None:
175 nslc_id = trace.nslc_id
176 time = (trace.tmin + trace.tmax) / 2.
178 FrequencyResponse.__init__(
179 self,
180 respfile=respfile,
181 nslc_id=nslc_id,
182 instant=time,
183 target=target,
184 stages=stages,
185 **kwargs)
187 def evaluate(self, freqs):
188 network, station, location, channel = self.nslc_id
189 if self.stages is None:
190 stages = (-1, 0)
191 else:
192 stages = self.stages[0]+1, self.stages[1]
194 x = evalresp.evalresp(
195 sta_list=station,
196 cha_list=channel,
197 net_code=network,
198 locid=location,
199 instant=self.instant,
200 freqs=freqs,
201 units=self.target.upper(),
202 file=self.respfile,
203 start_stage=stages[0],
204 stop_stage=stages[1],
205 rtype='CS')
207 transfer = x[0][4]
208 return transfer
210 @property
211 def summary(self):
212 return 'eresp'
215class InverseEvalresp(FrequencyResponse):
216 '''
217 Calls evalresp and generates values of the inverse instrument response for
218 deconvolution of instrument response.
220 :param respfile: response file in evalresp format
221 :param trace: trace for which the response is to be extracted from the file
222 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity
223 '''
225 respfile = String.T()
226 nslc_id = Tuple.T(4, String.T())
227 target = String.T(default='dis')
228 instant = Float.T()
230 def __init__(self, respfile, trace, target='dis', **kwargs):
231 FrequencyResponse.__init__(
232 self,
233 respfile=respfile,
234 nslc_id=trace.nslc_id,
235 instant=(trace.tmin + trace.tmax)/2.,
236 target=target,
237 **kwargs)
239 def evaluate(self, freqs):
240 network, station, location, channel = self.nslc_id
241 x = evalresp.evalresp(sta_list=station,
242 cha_list=channel,
243 net_code=network,
244 locid=location,
245 instant=self.instant,
246 freqs=freqs,
247 units=self.target.upper(),
248 file=self.respfile,
249 rtype='CS')
251 transfer = x[0][4]
252 return 1./transfer
254 @property
255 def summary(self):
256 return 'inv_eresp'
259def aslist(x):
260 if x is None:
261 return []
263 try:
264 return list(x)
265 except TypeError:
266 return [x]
269class PoleZeroResponse(FrequencyResponse):
270 '''
271 Evaluates frequency response from pole-zero representation.
273 :param zeros: positions of zeros
274 :type zeros: list of complex
275 :param poles: positions of poles
276 :type poles: list of complex
277 :param constant: gain factor
278 :type constant: complex
280 ::
282 (j*2*pi*f - zeros[0]) * (j*2*pi*f - zeros[1]) * ...
283 T(f) = constant * ----------------------------------------------------
284 (j*2*pi*f - poles[0]) * (j*2*pi*f - poles[1]) * ...
287 The poles and zeros should be given as angular frequencies, not in Hz.
288 '''
290 zeros = List.T(Complex.T())
291 poles = List.T(Complex.T())
292 constant = Complex.T(default=1.0+0j)
294 def __init__(
295 self,
296 zeros=None,
297 poles=None,
298 constant=1.0+0j,
299 **kwargs):
301 if zeros is None:
302 zeros = []
303 if poles is None:
304 poles = []
306 FrequencyResponse.__init__(
307 self,
308 zeros=aslist(zeros),
309 poles=aslist(poles),
310 constant=constant,
311 **kwargs)
313 def evaluate(self, freqs):
314 if hasattr(signal, 'freqs_zpk'): # added in scipy 0.19.0
315 return signal.freqs_zpk(
316 self.zeros, self.poles, self.constant, freqs*2.*num.pi)[1]
317 else:
318 jomeg = 1.0j * 2.*num.pi*freqs
320 a = num.ones(freqs.size, dtype=complex)*self.constant
321 for z in self.zeros:
322 a *= jomeg-z
323 for p in self.poles:
324 a /= jomeg-p
326 return a
328 def is_scalar(self):
329 return len(self.zeros) == 0 and len(self.poles) == 0
331 def get_scalar(self):
332 '''
333 Get factor if this is a flat response.
334 '''
335 if self.is_scalar():
336 return self.constant
337 else:
338 raise IsNotScalar()
340 def inverse(self):
341 return PoleZeroResponse(
342 poles=list(self.zeros),
343 zeros=list(self.poles),
344 constant=1.0/self.constant)
346 def to_analog(self):
347 b, a = signal.zpk2tf(self.zeros, self.poles, self.constant)
348 return AnalogFilterResponse(aslist(b), aslist(a))
350 def to_digital(self, deltat, method='bilinear'):
351 from scipy.signal import cont2discrete, zpk2tf
353 z, p, k, _ = cont2discrete(
354 (self.zeros, self.poles, self.constant),
355 deltat, method=method)
357 b, a = zpk2tf(z, p, k)
359 return DigitalFilterResponse(b, a, deltat)
361 def to_digital_polezero(self, deltat, method='bilinear'):
362 from scipy.signal import cont2discrete
364 z, p, k, _ = cont2discrete(
365 (self.zeros, self.poles, self.constant),
366 deltat, method=method)
368 return DigitalPoleZeroResponse(z, p, k, deltat)
370 def construction(self):
371 breakpoints = []
372 for zero in self.zeros:
373 f = abs(zero) / (2.*math.pi)
374 breakpoints.append((f, 1))
376 for pole in self.poles:
377 f = abs(pole) / (2.*math.pi)
378 breakpoints.append((f, -1))
380 return finalize_construction(breakpoints)
382 @property
383 def summary(self):
384 if self.is_scalar():
385 return str_gain(self.get_scalar())
387 return 'pz{%i,%i}' % (len(self.poles), len(self.zeros))
390class DigitalPoleZeroResponse(FrequencyResponse):
391 '''
392 Evaluates frequency response from digital filter pole-zero representation.
394 :param zeros: positions of zeros
395 :type zeros: list of complex
396 :param poles: positions of poles
397 :type poles: list of complex
398 :param constant: gain factor
399 :type constant: complex
400 :param deltat: sampling interval
401 :type deltat: float
403 The poles and zeros should be given as angular frequencies, not in Hz.
404 '''
406 zeros = List.T(Complex.T())
407 poles = List.T(Complex.T())
408 constant = Complex.T(default=1.0+0j)
409 deltat = Float.T()
411 def __init__(
412 self,
413 zeros=None,
414 poles=None,
415 constant=1.0+0j,
416 deltat=None,
417 **kwargs):
419 if zeros is None:
420 zeros = []
421 if poles is None:
422 poles = []
423 if deltat is None:
424 raise ValueError(
425 'Sampling interval `deltat` must be given for '
426 'DigitalPoleZeroResponse')
428 FrequencyResponse.__init__(
429 self, zeros=aslist(zeros), poles=aslist(poles), constant=constant,
430 deltat=deltat, **kwargs)
432 def check_sampling_rate(self):
433 if self.deltat == 0.0:
434 raise InvalidResponseError(
435 'Invalid digital response: sampling rate undefined')
437 def get_fmax(self):
438 self.check_sampling_rate()
439 return 0.5 / self.deltat
441 def evaluate(self, freqs):
442 self.check_sampling_rate()
443 return signal.freqz_zpk(
444 self.zeros, self.poles, self.constant,
445 freqs*(2.*math.pi*self.deltat))[1]
447 def is_scalar(self):
448 return len(self.zeros) == 0 and len(self.poles) == 0
450 def get_scalar(self):
451 '''
452 Get factor if this is a flat response.
453 '''
454 if self.is_scalar():
455 return self.constant
456 else:
457 raise IsNotScalar()
459 def to_digital(self, deltat):
460 self.check_sampling_rate()
461 from scipy.signal import zpk2tf
463 b, a = zpk2tf(self.zeros, self.poles, self.constant)
464 return DigitalFilterResponse(b, a, deltat)
466 @property
467 def summary(self):
468 if self.is_scalar():
469 return str_gain(self.get_scalar())
471 return 'dpz{%i,%i,%s}' % (
472 len(self.poles), len(self.zeros), str_fmax_failsafe(self))
475class ButterworthResponse(FrequencyResponse):
476 '''
477 Butterworth frequency response.
479 :param corner: corner frequency of the response
480 :param order: order of the response
481 :param type: either ``high`` or ``low``
482 '''
484 corner = Float.T(default=1.0)
485 order = Int.T(default=4)
486 type = StringChoice.T(choices=['low', 'high'], default='low')
488 def to_polezero(self):
489 z, p, k = signal.butter(
490 self.order, self.corner*2.*math.pi,
491 btype=self.type, analog=True, output='zpk')
493 return PoleZeroResponse(
494 zeros=aslist(z),
495 poles=aslist(p),
496 constant=float(k))
498 def to_digital(self, deltat):
499 b, a = signal.butter(
500 self.order, self.corner*2.*deltat,
501 self.type, analog=False)
503 return DigitalFilterResponse(b, a, deltat)
505 def to_analog(self):
506 b, a = signal.butter(
507 self.order, self.corner*2.*math.pi,
508 self.type, analog=True)
510 return AnalogFilterResponse(b, a)
512 def to_digital_polezero(self, deltat):
513 z, p, k = signal.butter(
514 self.order, self.corner*2*deltat,
515 btype=self.type, analog=False, output='zpk')
517 return DigitalPoleZeroResponse(z, p, k, deltat)
519 def evaluate(self, freqs):
520 b, a = signal.butter(
521 self.order, self.corner*2.*math.pi,
522 self.type, analog=True)
524 return signal.freqs(b, a, freqs*2.*math.pi)[1]
526 @property
527 def summary(self):
528 return 'butter_%s{%i,%g}' % (
529 self.type,
530 self.order,
531 self.corner)
534class SampledResponse(FrequencyResponse):
535 '''
536 Interpolates frequency response given at a set of sampled frequencies.
538 :param frequencies,values: frequencies and values of the sampled response
539 function.
540 :param left,right: values to return when input is out of range. If set to
541 ``None`` (the default) the endpoints are returned.
542 '''
544 frequencies = Array.T(shape=(None,), dtype=float, serialize_as='list')
545 values = Array.T(shape=(None,), dtype=complex, serialize_as='list')
546 left = Complex.T(optional=True)
547 right = Complex.T(optional=True)
549 def __init__(self, frequencies, values, left=None, right=None, **kwargs):
550 FrequencyResponse.__init__(
551 self,
552 frequencies=asarray_1d(frequencies, float),
553 values=asarray_1d(values, complex),
554 **kwargs)
556 def evaluate(self, freqs):
557 ereal = num.interp(
558 freqs, self.frequencies, num.real(self.values),
559 left=self.left, right=self.right)
560 eimag = num.interp(
561 freqs, self.frequencies, num.imag(self.values),
562 left=self.left, right=self.right)
563 transfer = ereal + 1.0j*eimag
564 return transfer
566 def inverse(self):
567 '''
568 Get inverse as a new :py:class:`SampledResponse` object.
569 '''
571 def inv_or_none(x):
572 if x is not None:
573 return 1./x
575 return SampledResponse(
576 self.frequencies, 1./self.values,
577 left=inv_or_none(self.left),
578 right=inv_or_none(self.right))
580 @property
581 def summary(self):
582 return 'sampled'
585class IntegrationResponse(FrequencyResponse):
586 '''
587 The integration response, optionally multiplied by a constant gain.
589 :param n: exponent (integer)
590 :param gain: gain factor (float)
592 ::
594 gain
595 T(f) = --------------
596 (j*2*pi * f)^n
597 '''
599 n = Int.T(optional=True, default=1)
600 gain = Float.T(optional=True, default=1.0)
602 def __init__(self, n=1, gain=1.0, **kwargs):
603 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs)
605 def evaluate(self, freqs):
606 nonzero = freqs != 0.0
607 resp = num.zeros(freqs.size, dtype=complex)
608 resp[nonzero] = self.gain / (1.0j * 2. * num.pi*freqs[nonzero])**self.n
609 return resp
611 @property
612 def summary(self):
613 return 'integration{%i}' % self.n + (
614 '*gain{%g}' % self.gain
615 if self.gain is not None and self.gain != 1.0
616 else '')
619class DifferentiationResponse(FrequencyResponse):
620 '''
621 The differentiation response, optionally multiplied by a constant gain.
623 :param n: exponent (integer)
624 :param gain: gain factor (float)
626 ::
628 T(f) = gain * (j*2*pi * f)^n
629 '''
631 n = Int.T(optional=True, default=1)
632 gain = Float.T(optional=True, default=1.0)
634 def __init__(self, n=1, gain=1.0, **kwargs):
635 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs)
637 def evaluate(self, freqs):
638 return self.gain * (1.0j * 2. * num.pi * freqs)**self.n
640 @property
641 def summary(self):
642 return 'differentiation{%i}' % self.n + (
643 '*gain{%g}' % self.gain
644 if self.gain is not None and self.gain != 1.0
645 else '')
648class DigitalFilterResponse(FrequencyResponse):
649 '''
650 Frequency response of an analog filter.
652 (see :py:func:`scipy.signal.freqz`).
653 '''
655 b = List.T(Float.T())
656 a = List.T(Float.T())
657 deltat = Float.T()
658 drop_phase = Bool.T(default=False)
660 def __init__(self, b, a, deltat, drop_phase=False, **kwargs):
661 FrequencyResponse.__init__(
662 self, b=aslist(b), a=aslist(a), deltat=float(deltat),
663 drop_phase=drop_phase, **kwargs)
665 def check_sampling_rate(self):
666 if self.deltat == 0.0:
667 raise InvalidResponseError(
668 'Invalid digital response: sampling rate undefined')
670 def is_scalar(self):
671 return len(self.a) == 1 and len(self.b) == 1
673 def get_scalar(self):
674 if self.is_scalar():
675 return self.b[0] / self.a[0]
676 else:
677 raise IsNotScalar()
679 def get_fmax(self):
680 if not self.is_scalar():
681 self.check_sampling_rate()
682 return 0.5 / self.deltat
683 else:
684 return None
686 def evaluate(self, freqs):
687 if self.is_scalar():
688 return util.num_full_like(freqs, self.get_scalar(), dtype=complex)
690 self.check_sampling_rate()
692 ok = freqs <= 0.5/self.deltat
693 coeffs = num.zeros(freqs.size, dtype=complex)
695 coeffs[ok] = signal.freqz(
696 self.b, self.a, freqs[ok]*2.*math.pi * self.deltat)[1]
698 coeffs[num.logical_not(ok)] = None
699 if self.drop_phase:
700 return num.abs(coeffs)
701 else:
702 return coeffs
704 def filter(self, tr):
705 self.check_sampling_rate()
707 from pyrocko import trace
708 trace.assert_same_sampling_rate(self, tr)
709 tr_new = tr.copy(data=False)
710 tr_new.set_ydata(signal.lfilter(self.b, self.a, tr.get_ydata()))
711 return tr_new
713 @property
714 def summary(self):
715 if self.is_scalar():
716 return str_gain(self.get_scalar())
718 elif len(self.a) == 1:
719 return 'fir{%i,<=%sHz}' % (
720 len(self.b), str_fmax_failsafe(self))
722 else:
723 return 'iir{%i,%i,<=%sHz}' % (
724 len(self.b), len(self.a), str_fmax_failsafe(self))
727class AnalogFilterResponse(FrequencyResponse):
728 '''
729 Frequency response of an analog filter.
731 (see :py:func:`scipy.signal.freqs`).
732 '''
734 b = List.T(Float.T())
735 a = List.T(Float.T())
737 def __init__(self, b, a, **kwargs):
738 FrequencyResponse.__init__(
739 self, b=aslist(b), a=aslist(a), **kwargs)
741 def is_scalar(self):
742 return len(self.a) == 1 and len(self.b) == 1
744 def get_scalar(self):
745 if self.is_scalar():
746 return self.b[0] / self.a[0]
747 else:
748 raise IsNotScalar()
750 def evaluate(self, freqs):
751 return signal.freqs(self.b, self.a, freqs*2.*math.pi)[1]
753 def to_digital(self, deltat, method='bilinear'):
754 from scipy.signal import cont2discrete
755 b, a, _ = cont2discrete((self.b, self.a), deltat, method=method)
756 if b.ndim == 2:
757 b = b[0]
758 return DigitalFilterResponse(b.tolist(), a.tolist(), deltat)
760 @property
761 def summary(self):
762 if self.is_scalar():
763 return str_gain(self.get_scalar())
765 return 'analog{%i,%i,%g}' % (
766 len(self.b), len(self.a), self.get_fmax())
769class MultiplyResponse(FrequencyResponse):
770 '''
771 Multiplication of several :py:class:`FrequencyResponse` objects.
772 '''
774 responses = List.T(FrequencyResponse.T())
776 def __init__(self, responses=None, **kwargs):
777 if responses is None:
778 responses = []
779 FrequencyResponse.__init__(self, responses=responses, **kwargs)
781 def get_fmax(self):
782 fmaxs = [resp.get_fmax() for resp in self.responses]
783 fmaxs = [fmax for fmax in fmaxs if fmax is not None]
784 if not fmaxs:
785 return None
786 else:
787 return min(fmaxs)
789 def evaluate(self, freqs):
790 a = num.ones(freqs.size, dtype=complex)
791 for resp in self.responses:
792 a *= resp.evaluate(freqs)
794 return a
796 def is_scalar(self):
797 return all(resp.is_scalar() for resp in self.responses)
799 def get_scalar(self):
800 '''
801 Get factor if this is a flat response.
802 '''
803 if self.is_scalar():
804 return num.prod(resp.get_scalar() for resp in self.responses)
805 else:
806 raise IsNotScalar()
808 def simplify(self):
809 self.responses = simplify_responses(self.responses)
811 def construction(self):
812 breakpoints = []
813 for resp in self.responses:
814 breakpoints.extend(resp.construction())
816 return finalize_construction(breakpoints)
818 @property
819 def summary(self):
820 if self.is_scalar(self):
821 return str_gain(self.get_scalar())
822 else:
823 xs = [x.summary for x in self.responses]
824 return '(%s)' % ('*'.join(x for x in xs if x != 'one') or 'one')
827class DelayResponse(FrequencyResponse):
829 delay = Float.T()
831 def evaluate(self, freqs):
832 return num.exp(-2.0J * self.delay * num.pi * freqs)
834 @property
835 def summary(self):
836 return 'delay{%g}' % self.delay
839class InvalidResponseError(Exception):
840 pass
843class InvalidResponse(FrequencyResponse):
845 message = String.T()
847 def __init__(self, message):
848 FrequencyResponse.__init__(self, message=message)
849 self.have_warned = False
851 def evaluate(self, freqs):
852 if not self.have_warned:
853 logger.warning('Invalid response: %s' % self.message)
854 self.have_warned = True
856 return util.num_full_like(freqs, None, dtype=num.complex)
858 @property
859 def summary(self):
860 return 'invalid'
863def simplify_responses(responses):
865 def unpack_multi(responses):
866 for resp in responses:
867 if isinstance(resp, MultiplyResponse):
868 for sub in unpack_multi(resp.responses):
869 yield sub
870 else:
871 yield resp
873 def cancel_pzs(poles, zeros):
874 poles_new = []
875 zeros_new = list(zeros)
876 for p in poles:
877 try:
878 zeros_new.pop(zeros_new.index(p))
879 except ValueError:
880 poles_new.append(p)
882 return poles_new, zeros_new
884 def combine_pzs(responses):
885 poles = []
886 zeros = []
887 constant = 1.0
888 out = []
889 for resp in responses:
890 if isinstance(resp, PoleZeroResponse):
891 poles.extend(resp.poles)
892 zeros.extend(resp.zeros)
893 constant *= resp.constant
894 else:
895 out.append(resp)
897 poles, zeros = cancel_pzs(poles, zeros)
898 if poles or zeros:
899 out.insert(0, PoleZeroResponse(
900 poles=poles, zeros=zeros, constant=constant))
901 elif constant != 1.0:
902 out.insert(0, Gain(constant=constant))
904 return out
906 def split(xs, condition):
907 out = [], []
908 for x in xs:
909 out[condition(x)].append(x)
911 return out
913 def combine_gains(responses):
914 non_scalars, scalars = split(responses, lambda resp: resp.is_scalar())
915 if scalars:
916 factor = num.prod([resp.get_scalar() for resp in scalars])
917 yield Gain(constant=factor)
919 for resp in non_scalars:
920 yield resp
922 return list(combine_gains(combine_pzs(unpack_multi(responses))))