1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''This module provides convenience objects to handle frequency responses.'''
8import math
9import logging
11import numpy as num
12from scipy import signal
14from pyrocko import util, evalresp
15from pyrocko.guts import Object, Float, Int, String, Complex, Tuple, List, \
16 StringChoice, Bool
17from pyrocko.guts_array import Array
20guts_prefix = 'pf'
22logger = logging.getLogger('pyrocko.response')
25def asarray_1d(x, dtype):
26 if isinstance(x, (list, tuple)) and x and isinstance(x[0], str):
27 return num.asarray(list(map(dtype, x)), dtype=dtype)
28 else:
29 a = num.asarray(x, dtype=dtype)
30 if not a.ndim == 1:
31 raise ValueError('could not convert to 1D array')
32 return a
35def finalize_construction(breakpoints):
36 breakpoints.sort()
37 breakpoints_out = []
38 f_last = None
39 for f, c in breakpoints:
40 if f_last is not None and f == f_last:
41 breakpoints_out[-1][1] += c
42 else:
43 breakpoints_out.append([f, c])
45 f_last = f
47 breakpoints_out = [(f, c) for (f, c) in breakpoints_out if c != 0]
48 return breakpoints_out
51class FrequencyResponseCheckpoint(Object):
52 frequency = Float.T()
53 value = Float.T()
56class IsNotScalar(Exception):
57 pass
60def str_fmax_failsafe(resp):
61 try:
62 return '%g' % resp.get_fmax()
63 except InvalidResponseError:
64 return '?'
67class FrequencyResponse(Object):
68 '''
69 Evaluates frequency response at given frequencies.
70 '''
72 checkpoints = List.T(FrequencyResponseCheckpoint.T())
74 def evaluate(self, freqs):
75 return num.ones(freqs.size, dtype=complex)
77 def evaluate1(self, freq):
78 return self.evaluate(num.atleast_1d(freq))[0]
80 def is_scalar(self):
81 '''
82 Check if this is a flat response.
83 '''
85 if type(self) is FrequencyResponse:
86 return True
87 else:
88 return False # default for derived classes
90 def get_scalar(self):
91 '''
92 Get factor if this is a flat response.
93 '''
94 if type(self) is FrequencyResponse:
95 return 1.0
96 else:
97 raise IsNotScalar() # default for derived classes
99 def get_fmax(self):
100 return None
102 def construction(self):
103 return []
105 @property
106 def summary(self):
107 if type(self) is FrequencyResponse:
108 return 'one'
109 else:
110 return 'unknown'
113def str_gain(gain):
114 if gain == 1.0:
115 return 'one'
116 elif isinstance(gain, complex):
117 return 'gain{%s}' % repr(gain)
118 else:
119 return 'gain{%g}' % gain
122class Gain(FrequencyResponse):
123 '''
124 A flat frequency response.
125 '''
127 constant = Complex.T(default=1.0+0j)
129 def evaluate(self, freqs):
130 return util.num_full_like(freqs, self.constant, dtype=complex)
132 def is_scalar(self):
133 return True
135 def get_scalar(self):
136 return self.constant
138 @property
139 def summary(self):
140 return str_gain(self.constant)
143class Evalresp(FrequencyResponse):
144 '''
145 Calls evalresp and generates values of the instrument response transfer
146 function.
148 :param respfile: response file in evalresp format
149 :param trace: trace for which the response is to be extracted from the file
150 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity
151 '''
153 respfile = String.T()
154 nslc_id = Tuple.T(4, String.T())
155 target = String.T(default='dis')
156 instant = Float.T()
157 stages = Tuple.T(2, Int.T(), optional=True)
159 def __init__(
160 self,
161 respfile,
162 trace=None,
163 target='dis',
164 nslc_id=None,
165 time=None,
166 stages=None,
167 **kwargs):
169 if trace is not None:
170 nslc_id = trace.nslc_id
171 time = (trace.tmin + trace.tmax) / 2.
173 FrequencyResponse.__init__(
174 self,
175 respfile=respfile,
176 nslc_id=nslc_id,
177 instant=time,
178 target=target,
179 stages=stages,
180 **kwargs)
182 def evaluate(self, freqs):
183 network, station, location, channel = self.nslc_id
184 if self.stages is None:
185 stages = (-1, 0)
186 else:
187 stages = self.stages[0]+1, self.stages[1]
189 x = evalresp.evalresp(
190 sta_list=station,
191 cha_list=channel,
192 net_code=network,
193 locid=location,
194 instant=self.instant,
195 freqs=freqs,
196 units=self.target.upper(),
197 file=self.respfile,
198 start_stage=stages[0],
199 stop_stage=stages[1],
200 rtype='CS')
202 transfer = x[0][4]
203 return transfer
205 @property
206 def summary(self):
207 return 'eresp'
210class InverseEvalresp(FrequencyResponse):
211 '''
212 Calls evalresp and generates values of the inverse instrument response for
213 deconvolution of instrument response.
215 :param respfile: response file in evalresp format
216 :param trace: trace for which the response is to be extracted from the file
217 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity
218 '''
220 respfile = String.T()
221 nslc_id = Tuple.T(4, String.T())
222 target = String.T(default='dis')
223 instant = Float.T()
225 def __init__(self, respfile, trace, target='dis', **kwargs):
226 FrequencyResponse.__init__(
227 self,
228 respfile=respfile,
229 nslc_id=trace.nslc_id,
230 instant=(trace.tmin + trace.tmax)/2.,
231 target=target,
232 **kwargs)
234 def evaluate(self, freqs):
235 network, station, location, channel = self.nslc_id
236 x = evalresp.evalresp(sta_list=station,
237 cha_list=channel,
238 net_code=network,
239 locid=location,
240 instant=self.instant,
241 freqs=freqs,
242 units=self.target.upper(),
243 file=self.respfile,
244 rtype='CS')
246 transfer = x[0][4]
247 return 1./transfer
249 @property
250 def summary(self):
251 return 'inv_eresp'
254def aslist(x):
255 if x is None:
256 return []
258 try:
259 return list(x)
260 except TypeError:
261 return [x]
264class PoleZeroResponse(FrequencyResponse):
265 '''
266 Evaluates frequency response from pole-zero representation.
268 :param zeros: positions of zeros
269 :type zeros: list of complex
270 :param poles: positions of poles
271 :type poles: list of complex
272 :param constant: gain factor
273 :type constant: complex
275 ::
277 (j*2*pi*f - zeros[0]) * (j*2*pi*f - zeros[1]) * ...
278 T(f) = constant * ----------------------------------------------------
279 (j*2*pi*f - poles[0]) * (j*2*pi*f - poles[1]) * ...
282 The poles and zeros should be given as angular frequencies, not in Hz.
283 '''
285 zeros = List.T(Complex.T())
286 poles = List.T(Complex.T())
287 constant = Complex.T(default=1.0+0j)
289 def __init__(
290 self,
291 zeros=None,
292 poles=None,
293 constant=1.0+0j,
294 **kwargs):
296 if zeros is None:
297 zeros = []
298 if poles is None:
299 poles = []
301 FrequencyResponse.__init__(
302 self,
303 zeros=aslist(zeros),
304 poles=aslist(poles),
305 constant=constant,
306 **kwargs)
308 def evaluate(self, freqs):
309 if hasattr(signal, 'freqs_zpk'): # added in scipy 0.19.0
310 return signal.freqs_zpk(
311 self.zeros, self.poles, self.constant, freqs*2.*num.pi)[1]
312 else:
313 jomeg = 1.0j * 2.*num.pi*freqs
315 a = num.ones(freqs.size, dtype=complex)*self.constant
316 for z in self.zeros:
317 a *= jomeg-z
318 for p in self.poles:
319 a /= jomeg-p
321 return a
323 def is_scalar(self):
324 return len(self.zeros) == 0 and len(self.poles) == 0
326 def get_scalar(self):
327 '''
328 Get factor if this is a flat response.
329 '''
330 if self.is_scalar():
331 return self.constant
332 else:
333 raise IsNotScalar()
335 def inverse(self):
336 return PoleZeroResponse(
337 poles=list(self.zeros),
338 zeros=list(self.poles),
339 constant=1.0/self.constant)
341 def to_analog(self):
342 b, a = signal.zpk2tf(self.zeros, self.poles, self.constant)
343 return AnalogFilterResponse(aslist(b), aslist(a))
345 def to_digital(self, deltat, method='bilinear'):
346 from scipy.signal import cont2discrete, zpk2tf
348 z, p, k, _ = cont2discrete(
349 (self.zeros, self.poles, self.constant),
350 deltat, method=method)
352 b, a = zpk2tf(z, p, k)
354 return DigitalFilterResponse(b, a, deltat)
356 def to_digital_polezero(self, deltat, method='bilinear'):
357 from scipy.signal import cont2discrete
359 z, p, k, _ = cont2discrete(
360 (self.zeros, self.poles, self.constant),
361 deltat, method=method)
363 return DigitalPoleZeroResponse(z, p, k, deltat)
365 def construction(self):
366 breakpoints = []
367 for zero in self.zeros:
368 f = abs(zero) / (2.*math.pi)
369 breakpoints.append((f, 1))
371 for pole in self.poles:
372 f = abs(pole) / (2.*math.pi)
373 breakpoints.append((f, -1))
375 return finalize_construction(breakpoints)
377 @property
378 def summary(self):
379 if self.is_scalar():
380 return str_gain(self.get_scalar())
382 return 'pz{%i,%i}' % (len(self.poles), len(self.zeros))
385class DigitalPoleZeroResponse(FrequencyResponse):
386 '''
387 Evaluates frequency response from digital filter pole-zero representation.
389 :param zeros: positions of zeros
390 :type zeros: list of complex
391 :param poles: positions of poles
392 :type poles: list of complex
393 :param constant: gain factor
394 :type constant: complex
395 :param deltat: sampling interval
396 :type deltat: float
398 The poles and zeros should be given as angular frequencies, not in Hz.
399 '''
401 zeros = List.T(Complex.T())
402 poles = List.T(Complex.T())
403 constant = Complex.T(default=1.0+0j)
404 deltat = Float.T()
406 def __init__(
407 self,
408 zeros=None,
409 poles=None,
410 constant=1.0+0j,
411 deltat=None,
412 **kwargs):
414 if zeros is None:
415 zeros = []
416 if poles is None:
417 poles = []
418 if deltat is None:
419 raise ValueError(
420 'Sampling interval `deltat` must be given for '
421 'DigitalPoleZeroResponse')
423 FrequencyResponse.__init__(
424 self, zeros=aslist(zeros), poles=aslist(poles), constant=constant,
425 deltat=deltat, **kwargs)
427 def check_sampling_rate(self):
428 if self.deltat == 0.0:
429 raise InvalidResponseError(
430 'Invalid digital response: sampling rate undefined')
432 def get_fmax(self):
433 self.check_sampling_rate()
434 return 0.5 / self.deltat
436 def evaluate(self, freqs):
437 self.check_sampling_rate()
438 return signal.freqz_zpk(
439 self.zeros, self.poles, self.constant,
440 freqs*(2.*math.pi*self.deltat))[1]
442 def is_scalar(self):
443 return len(self.zeros) == 0 and len(self.poles) == 0
445 def get_scalar(self):
446 '''
447 Get factor if this is a flat response.
448 '''
449 if self.is_scalar():
450 return self.constant
451 else:
452 raise IsNotScalar()
454 def to_digital(self, deltat):
455 self.check_sampling_rate()
456 from scipy.signal import zpk2tf
458 b, a = zpk2tf(self.zeros, self.poles, self.constant)
459 return DigitalFilterResponse(b, a, deltat)
461 @property
462 def summary(self):
463 if self.is_scalar():
464 return str_gain(self.get_scalar())
466 return 'dpz{%i,%i,%s}' % (
467 len(self.poles), len(self.zeros), str_fmax_failsafe(self))
470class ButterworthResponse(FrequencyResponse):
471 '''
472 Butterworth frequency response.
474 :param corner: corner frequency of the response
475 :param order: order of the response
476 :param type: either ``high`` or ``low``
477 '''
479 corner = Float.T(default=1.0)
480 order = Int.T(default=4)
481 type = StringChoice.T(choices=['low', 'high'], default='low')
483 def to_polezero(self):
484 z, p, k = signal.butter(
485 self.order, self.corner*2.*math.pi,
486 btype=self.type, analog=True, output='zpk')
488 return PoleZeroResponse(
489 zeros=aslist(z),
490 poles=aslist(p),
491 constant=float(k))
493 def to_digital(self, deltat):
494 b, a = signal.butter(
495 self.order, self.corner*2.*deltat,
496 self.type, analog=False)
498 return DigitalFilterResponse(b, a, deltat)
500 def to_analog(self):
501 b, a = signal.butter(
502 self.order, self.corner*2.*math.pi,
503 self.type, analog=True)
505 return AnalogFilterResponse(b, a)
507 def to_digital_polezero(self, deltat):
508 z, p, k = signal.butter(
509 self.order, self.corner*2*deltat,
510 btype=self.type, analog=False, output='zpk')
512 return DigitalPoleZeroResponse(z, p, k, deltat)
514 def evaluate(self, freqs):
515 b, a = signal.butter(
516 self.order, self.corner*2.*math.pi,
517 self.type, analog=True)
519 return signal.freqs(b, a, freqs*2.*math.pi)[1]
521 @property
522 def summary(self):
523 return 'butter_%s{%i,%g}' % (
524 self.type,
525 self.order,
526 self.corner)
529class SampledResponse(FrequencyResponse):
530 '''
531 Interpolates frequency response given at a set of sampled frequencies.
533 :param frequencies,values: frequencies and values of the sampled response
534 function.
535 :param left,right: values to return when input is out of range. If set to
536 ``None`` (the default) the endpoints are returned.
537 '''
539 frequencies = Array.T(shape=(None,), dtype=float, serialize_as='list')
540 values = Array.T(shape=(None,), dtype=complex, serialize_as='list')
541 left = Complex.T(optional=True)
542 right = Complex.T(optional=True)
544 def __init__(self, frequencies, values, left=None, right=None, **kwargs):
545 FrequencyResponse.__init__(
546 self,
547 frequencies=asarray_1d(frequencies, float),
548 values=asarray_1d(values, complex),
549 **kwargs)
551 def evaluate(self, freqs):
552 ereal = num.interp(
553 freqs, self.frequencies, num.real(self.values),
554 left=self.left, right=self.right)
555 eimag = num.interp(
556 freqs, self.frequencies, num.imag(self.values),
557 left=self.left, right=self.right)
558 transfer = ereal + 1.0j*eimag
559 return transfer
561 def inverse(self):
562 '''
563 Get inverse as a new :py:class:`SampledResponse` object.
564 '''
566 def inv_or_none(x):
567 if x is not None:
568 return 1./x
570 return SampledResponse(
571 self.frequencies, 1./self.values,
572 left=inv_or_none(self.left),
573 right=inv_or_none(self.right))
575 @property
576 def summary(self):
577 return 'sampled'
580class IntegrationResponse(FrequencyResponse):
581 '''
582 The integration response, optionally multiplied by a constant gain.
584 :param n: exponent (integer)
585 :param gain: gain factor (float)
587 ::
589 gain
590 T(f) = --------------
591 (j*2*pi * f)^n
592 '''
594 n = Int.T(optional=True, default=1)
595 gain = Float.T(optional=True, default=1.0)
597 def __init__(self, n=1, gain=1.0, **kwargs):
598 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs)
600 def evaluate(self, freqs):
601 nonzero = freqs != 0.0
602 resp = num.zeros(freqs.size, dtype=complex)
603 resp[nonzero] = self.gain / (1.0j * 2. * num.pi*freqs[nonzero])**self.n
604 return resp
606 @property
607 def summary(self):
608 return 'integration{%i}' % self.n + (
609 '*gain{%g}' % self.gain
610 if self.gain is not None and self.gain != 1.0
611 else '')
614class DifferentiationResponse(FrequencyResponse):
615 '''
616 The differentiation response, optionally multiplied by a constant gain.
618 :param n: exponent (integer)
619 :param gain: gain factor (float)
621 ::
623 T(f) = gain * (j*2*pi * f)^n
624 '''
626 n = Int.T(optional=True, default=1)
627 gain = Float.T(optional=True, default=1.0)
629 def __init__(self, n=1, gain=1.0, **kwargs):
630 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs)
632 def evaluate(self, freqs):
633 return self.gain * (1.0j * 2. * num.pi * freqs)**self.n
635 @property
636 def summary(self):
637 return 'differentiation{%i}' % self.n + (
638 '*gain{%g}' % self.gain
639 if self.gain is not None and self.gain != 1.0
640 else '')
643class DigitalFilterResponse(FrequencyResponse):
644 '''
645 Frequency response of an analog filter.
647 (see :py:func:`scipy.signal.freqz`).
648 '''
650 b = List.T(Float.T())
651 a = List.T(Float.T())
652 deltat = Float.T()
653 drop_phase = Bool.T(default=False)
655 def __init__(self, b, a, deltat, drop_phase=False, **kwargs):
656 FrequencyResponse.__init__(
657 self, b=aslist(b), a=aslist(a), deltat=float(deltat),
658 drop_phase=drop_phase, **kwargs)
660 def check_sampling_rate(self):
661 if self.deltat == 0.0:
662 raise InvalidResponseError(
663 'Invalid digital response: sampling rate undefined')
665 def is_scalar(self):
666 return len(self.a) == 1 and len(self.b) == 1
668 def get_scalar(self):
669 if self.is_scalar():
670 return self.b[0] / self.a[0]
671 else:
672 raise IsNotScalar()
674 def get_fmax(self):
675 if not self.is_scalar():
676 self.check_sampling_rate()
677 return 0.5 / self.deltat
678 else:
679 return None
681 def evaluate(self, freqs):
682 if self.is_scalar():
683 return util.num_full_like(freqs, self.get_scalar(), dtype=complex)
685 self.check_sampling_rate()
687 ok = freqs <= 0.5/self.deltat
688 coeffs = num.zeros(freqs.size, dtype=complex)
690 coeffs[ok] = signal.freqz(
691 self.b, self.a, freqs[ok]*2.*math.pi * self.deltat)[1]
693 coeffs[num.logical_not(ok)] = None
694 if self.drop_phase:
695 return num.abs(coeffs)
696 else:
697 return coeffs
699 def filter(self, tr):
700 self.check_sampling_rate()
702 from pyrocko import trace
703 trace.assert_same_sampling_rate(self, tr)
704 tr_new = tr.copy(data=False)
705 tr_new.set_ydata(signal.lfilter(self.b, self.a, tr.get_ydata()))
706 return tr_new
708 @property
709 def summary(self):
710 if self.is_scalar():
711 return str_gain(self.get_scalar())
713 elif len(self.a) == 1:
714 return 'fir{%i,<=%sHz}' % (
715 len(self.b), str_fmax_failsafe(self))
717 else:
718 return 'iir{%i,%i,<=%sHz}' % (
719 len(self.b), len(self.a), str_fmax_failsafe(self))
722class AnalogFilterResponse(FrequencyResponse):
723 '''
724 Frequency response of an analog filter.
726 (see :py:func:`scipy.signal.freqs`).
727 '''
729 b = List.T(Float.T())
730 a = List.T(Float.T())
732 def __init__(self, b, a, **kwargs):
733 FrequencyResponse.__init__(
734 self, b=aslist(b), a=aslist(a), **kwargs)
736 def is_scalar(self):
737 return len(self.a) == 1 and len(self.b) == 1
739 def get_scalar(self):
740 if self.is_scalar():
741 return self.b[0] / self.a[0]
742 else:
743 raise IsNotScalar()
745 def evaluate(self, freqs):
746 return signal.freqs(self.b, self.a, freqs*2.*math.pi)[1]
748 def to_digital(self, deltat, method='bilinear'):
749 from scipy.signal import cont2discrete
750 b, a, _ = cont2discrete((self.b, self.a), deltat, method=method)
751 if b.ndim == 2:
752 b = b[0]
753 return DigitalFilterResponse(b.tolist(), a.tolist(), deltat)
755 @property
756 def summary(self):
757 if self.is_scalar():
758 return str_gain(self.get_scalar())
760 return 'analog{%i,%i,%g}' % (
761 len(self.b), len(self.a), self.get_fmax())
764class MultiplyResponse(FrequencyResponse):
765 '''
766 Multiplication of several :py:class:`FrequencyResponse` objects.
767 '''
769 responses = List.T(FrequencyResponse.T())
771 def __init__(self, responses=None, **kwargs):
772 if responses is None:
773 responses = []
774 FrequencyResponse.__init__(self, responses=responses, **kwargs)
776 def get_fmax(self):
777 fmaxs = [resp.get_fmax() for resp in self.responses]
778 fmaxs = [fmax for fmax in fmaxs if fmax is not None]
779 if not fmaxs:
780 return None
781 else:
782 return min(fmaxs)
784 def evaluate(self, freqs):
785 a = num.ones(freqs.size, dtype=complex)
786 for resp in self.responses:
787 a *= resp.evaluate(freqs)
789 return a
791 def is_scalar(self):
792 return all(resp.is_scalar() for resp in self.responses)
794 def get_scalar(self):
795 '''
796 Get factor if this is a flat response.
797 '''
798 if self.is_scalar():
799 return num.product(resp.get_scalar() for resp in self.responses)
800 else:
801 raise IsNotScalar()
803 def simplify(self):
804 self.responses = simplify_responses(self.responses)
806 def construction(self):
807 breakpoints = []
808 for resp in self.responses:
809 breakpoints.extend(resp.construction())
811 return finalize_construction(breakpoints)
813 @property
814 def summary(self):
815 if self.is_scalar(self):
816 return str_gain(self.get_scalar())
817 else:
818 xs = [x.summary for x in self.responses]
819 return '(%s)' % ('*'.join(x for x in xs if x != 'one') or 'one')
822class DelayResponse(FrequencyResponse):
824 delay = Float.T()
826 def evaluate(self, freqs):
827 return num.exp(-2.0J * self.delay * num.pi * freqs)
829 @property
830 def summary(self):
831 return 'delay{%g}' % self.delay
834class InvalidResponseError(Exception):
835 pass
838class InvalidResponse(FrequencyResponse):
840 message = String.T()
842 def __init__(self, message):
843 FrequencyResponse.__init__(self, message=message)
844 self.have_warned = False
846 def evaluate(self, freqs):
847 if not self.have_warned:
848 logger.warning('Invalid response: %s' % self.message)
849 self.have_warned = True
851 return util.num_full_like(freqs, None, dtype=num.complex)
853 @property
854 def summary(self):
855 return 'invalid'
858def simplify_responses(responses):
860 def unpack_multi(responses):
861 for resp in responses:
862 if isinstance(resp, MultiplyResponse):
863 for sub in unpack_multi(resp.responses):
864 yield sub
865 else:
866 yield resp
868 def cancel_pzs(poles, zeros):
869 poles_new = []
870 zeros_new = list(zeros)
871 for p in poles:
872 try:
873 zeros_new.pop(zeros_new.index(p))
874 except ValueError:
875 poles_new.append(p)
877 return poles_new, zeros_new
879 def combine_pzs(responses):
880 poles = []
881 zeros = []
882 constant = 1.0
883 out = []
884 for resp in responses:
885 if isinstance(resp, PoleZeroResponse):
886 poles.extend(resp.poles)
887 zeros.extend(resp.zeros)
888 constant *= resp.constant
889 else:
890 out.append(resp)
892 poles, zeros = cancel_pzs(poles, zeros)
893 if poles or zeros:
894 out.insert(0, PoleZeroResponse(
895 poles=poles, zeros=zeros, constant=constant))
896 elif constant != 1.0:
897 out.insert(0, Gain(constant=constant))
899 return out
901 def split(xs, condition):
902 out = [], []
903 for x in xs:
904 out[condition(x)].append(x)
906 return out
908 def combine_gains(responses):
909 non_scalars, scalars = split(responses, lambda resp: resp.is_scalar())
910 if scalars:
911 factor = num.prod([resp.get_scalar() for resp in scalars])
912 yield Gain(constant=factor)
914 for resp in non_scalars:
915 yield resp
917 return list(combine_gains(combine_pzs(unpack_multi(responses))))