1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''This module provides convenience objects to handle frequency responses.'''
8import math
9import logging
11import numpy as num
12from scipy import signal
14from pyrocko import util, evalresp
15from pyrocko.guts import Object, Float, Int, String, Complex, Tuple, List, \
16 StringChoice, Bool
17from pyrocko.guts_array import Array
20guts_prefix = 'pf'
22logger = logging.getLogger('pyrocko.response')
25def asarray_1d(x, dtype):
26 if isinstance(x, (list, tuple)) and x and isinstance(x[0], str):
27 return num.asarray(list(map(dtype, x)), dtype=dtype)
28 else:
29 a = num.asarray(x, dtype=dtype)
30 if not a.ndim == 1:
31 raise ValueError('could not convert to 1D array')
32 return a
35def finalize_construction(breakpoints):
36 breakpoints.sort()
37 breakpoints_out = []
38 f_last = None
39 for f, c in breakpoints:
40 if f_last is not None and f == f_last:
41 breakpoints_out[-1][1] += c
42 else:
43 breakpoints_out.append([f, c])
45 f_last = f
47 breakpoints_out = [(f, c) for (f, c) in breakpoints_out if c != 0]
48 return breakpoints_out
51class FrequencyResponseCheckpoint(Object):
52 frequency = Float.T()
53 value = Float.T()
56class IsNotScalar(Exception):
57 pass
60class FrequencyResponse(Object):
61 '''
62 Evaluates frequency response at given frequencies.
63 '''
65 checkpoints = List.T(FrequencyResponseCheckpoint.T())
67 def evaluate(self, freqs):
68 return num.ones(freqs.size, dtype=complex)
70 def evaluate1(self, freq):
71 return self.evaluate(num.atleast_1d(freq))[0]
73 def is_scalar(self):
74 '''
75 Check if this is a flat response.
76 '''
78 if type(self) is FrequencyResponse:
79 return True
80 else:
81 return False # default for derived classes
83 def get_scalar(self):
84 '''
85 Get factor if this is a flat response.
86 '''
87 if type(self) is FrequencyResponse:
88 return 1.0
89 else:
90 raise IsNotScalar() # default for derived classes
92 def get_fmax(self):
93 return None
95 def construction(self):
96 return []
99class Gain(FrequencyResponse):
100 '''
101 A flat frequency response.
102 '''
104 constant = Complex.T(default=1.0+0j)
106 def evaluate(self, freqs):
107 return util.num_full_like(freqs, self.constant, dtype=complex)
109 def is_scalar(self):
110 return True
112 def get_scalar(self):
113 return self.constant
116class Evalresp(FrequencyResponse):
117 '''
118 Calls evalresp and generates values of the instrument response transfer
119 function.
121 :param respfile: response file in evalresp format
122 :param trace: trace for which the response is to be extracted from the file
123 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity
124 '''
126 respfile = String.T()
127 nslc_id = Tuple.T(4, String.T())
128 target = String.T(default='dis')
129 instant = Float.T()
130 stages = Tuple.T(2, Int.T(), optional=True)
132 def __init__(
133 self,
134 respfile,
135 trace=None,
136 target='dis',
137 nslc_id=None,
138 time=None,
139 stages=None,
140 **kwargs):
142 if trace is not None:
143 nslc_id = trace.nslc_id
144 time = (trace.tmin + trace.tmax) / 2.
146 FrequencyResponse.__init__(
147 self,
148 respfile=respfile,
149 nslc_id=nslc_id,
150 instant=time,
151 target=target,
152 stages=stages,
153 **kwargs)
155 def evaluate(self, freqs):
156 network, station, location, channel = self.nslc_id
157 if self.stages is None:
158 stages = (-1, 0)
159 else:
160 stages = self.stages[0]+1, self.stages[1]
162 x = evalresp.evalresp(
163 sta_list=station,
164 cha_list=channel,
165 net_code=network,
166 locid=location,
167 instant=self.instant,
168 freqs=freqs,
169 units=self.target.upper(),
170 file=self.respfile,
171 start_stage=stages[0],
172 stop_stage=stages[1],
173 rtype='CS')
175 transfer = x[0][4]
176 return transfer
179class InverseEvalresp(FrequencyResponse):
180 '''
181 Calls evalresp and generates values of the inverse instrument response for
182 deconvolution of instrument response.
184 :param respfile: response file in evalresp format
185 :param trace: trace for which the response is to be extracted from the file
186 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity
187 '''
189 respfile = String.T()
190 nslc_id = Tuple.T(4, String.T())
191 target = String.T(default='dis')
192 instant = Float.T()
194 def __init__(self, respfile, trace, target='dis', **kwargs):
195 FrequencyResponse.__init__(
196 self,
197 respfile=respfile,
198 nslc_id=trace.nslc_id,
199 instant=(trace.tmin + trace.tmax)/2.,
200 target=target,
201 **kwargs)
203 def evaluate(self, freqs):
204 network, station, location, channel = self.nslc_id
205 x = evalresp.evalresp(sta_list=station,
206 cha_list=channel,
207 net_code=network,
208 locid=location,
209 instant=self.instant,
210 freqs=freqs,
211 units=self.target.upper(),
212 file=self.respfile,
213 rtype='CS')
215 transfer = x[0][4]
216 return 1./transfer
219def aslist(x):
220 if x is None:
221 return []
223 try:
224 return list(x)
225 except TypeError:
226 return [x]
229class PoleZeroResponse(FrequencyResponse):
230 '''
231 Evaluates frequency response from pole-zero representation.
233 :param zeros: positions of zeros
234 :type zeros: list of complex
235 :param poles: positions of poles
236 :type poles: list of complex
237 :param constant: gain factor
238 :type constant: complex
240 ::
242 (j*2*pi*f - zeros[0]) * (j*2*pi*f - zeros[1]) * ...
243 T(f) = constant * ----------------------------------------------------
244 (j*2*pi*f - poles[0]) * (j*2*pi*f - poles[1]) * ...
247 The poles and zeros should be given as angular frequencies, not in Hz.
248 '''
250 zeros = List.T(Complex.T())
251 poles = List.T(Complex.T())
252 constant = Complex.T(default=1.0+0j)
254 def __init__(
255 self,
256 zeros=None,
257 poles=None,
258 constant=1.0+0j,
259 **kwargs):
261 if zeros is None:
262 zeros = []
263 if poles is None:
264 poles = []
266 FrequencyResponse.__init__(
267 self,
268 zeros=aslist(zeros),
269 poles=aslist(poles),
270 constant=constant,
271 **kwargs)
273 def evaluate(self, freqs):
274 if hasattr(signal, 'freqs_zpk'): # added in scipy 0.19.0
275 return signal.freqs_zpk(
276 self.zeros, self.poles, self.constant, freqs*2.*num.pi)[1]
277 else:
278 jomeg = 1.0j * 2.*num.pi*freqs
280 a = num.ones(freqs.size, dtype=complex)*self.constant
281 for z in self.zeros:
282 a *= jomeg-z
283 for p in self.poles:
284 a /= jomeg-p
286 return a
288 def is_scalar(self):
289 return len(self.zeros) == 0 and len(self.poles) == 0
291 def get_scalar(self):
292 '''
293 Get factor if this is a flat response.
294 '''
295 if self.is_scalar():
296 return self.constant
297 else:
298 raise IsNotScalar()
300 def inverse(self):
301 return PoleZeroResponse(
302 poles=list(self.zeros),
303 zeros=list(self.poles),
304 constant=1.0/self.constant)
306 def to_analog(self):
307 b, a = signal.zpk2tf(self.zeros, self.poles, self.constant)
308 return AnalogFilterResponse(aslist(b), aslist(a))
310 def to_digital(self, deltat, method='bilinear'):
311 from scipy.signal import cont2discrete, zpk2tf
313 z, p, k, _ = cont2discrete(
314 (self.zeros, self.poles, self.constant),
315 deltat, method=method)
317 b, a = zpk2tf(z, p, k)
319 return DigitalFilterResponse(b, a, deltat)
321 def to_digital_polezero(self, deltat, method='bilinear'):
322 from scipy.signal import cont2discrete
324 z, p, k, _ = cont2discrete(
325 (self.zeros, self.poles, self.constant),
326 deltat, method=method)
328 return DigitalPoleZeroResponse(z, p, k, deltat)
330 def construction(self):
331 breakpoints = []
332 for zero in self.zeros:
333 f = abs(zero) / (2.*math.pi)
334 breakpoints.append((f, 1))
336 for pole in self.poles:
337 f = abs(pole) / (2.*math.pi)
338 breakpoints.append((f, -1))
340 return finalize_construction(breakpoints)
343class DigitalPoleZeroResponse(FrequencyResponse):
344 '''
345 Evaluates frequency response from digital filter pole-zero representation.
347 :param zeros: positions of zeros
348 :type zeros: list of complex
349 :param poles: positions of poles
350 :type poles: list of complex
351 :param constant: gain factor
352 :type constant: complex
353 :param deltat: sampling interval
354 :type deltat: float
356 The poles and zeros should be given as angular frequencies, not in Hz.
357 '''
359 zeros = List.T(Complex.T())
360 poles = List.T(Complex.T())
361 constant = Complex.T(default=1.0+0j)
362 deltat = Float.T()
364 def __init__(
365 self,
366 zeros=None,
367 poles=None,
368 constant=1.0+0j,
369 deltat=None,
370 **kwargs):
372 if zeros is None:
373 zeros = []
374 if poles is None:
375 poles = []
376 if deltat is None:
377 raise ValueError(
378 'Sampling interval `deltat` must be given for '
379 'DigitalPoleZeroResponse')
381 FrequencyResponse.__init__(
382 self, zeros=aslist(zeros), poles=aslist(poles), constant=constant,
383 deltat=deltat, **kwargs)
385 def check_sampling_rate(self):
386 if self.deltat == 0.0:
387 raise InvalidResponseError(
388 'Invalid digital response: sampling rate undefined')
390 def get_fmax(self):
391 self.check_sampling_rate()
392 return 0.5 / self.deltat
394 def evaluate(self, freqs):
395 self.check_sampling_rate()
396 return signal.freqz_zpk(
397 self.zeros, self.poles, self.constant,
398 freqs*(2.*math.pi*self.deltat))[1]
400 def is_scalar(self):
401 return len(self.zeros) == 0 and len(self.poles) == 0
403 def get_scalar(self):
404 '''
405 Get factor if this is a flat response.
406 '''
407 if self.is_scalar():
408 return self.constant
409 else:
410 raise IsNotScalar()
412 def to_digital(self, deltat):
413 self.check_sampling_rate()
414 from scipy.signal import zpk2tf
416 b, a = zpk2tf(self.zeros, self.poles, self.constant)
417 return DigitalFilterResponse(b, a, deltat)
420class ButterworthResponse(FrequencyResponse):
421 '''
422 Butterworth frequency response.
424 :param corner: corner frequency of the response
425 :param order: order of the response
426 :param type: either ``high`` or ``low``
427 '''
429 corner = Float.T(default=1.0)
430 order = Int.T(default=4)
431 type = StringChoice.T(choices=['low', 'high'], default='low')
433 def to_polezero(self):
434 z, p, k = signal.butter(
435 self.order, self.corner*2.*math.pi,
436 btype=self.type, analog=True, output='zpk')
438 return PoleZeroResponse(
439 zeros=aslist(z),
440 poles=aslist(p),
441 constant=float(k))
443 def to_digital(self, deltat):
444 b, a = signal.butter(
445 self.order, self.corner*2.*deltat,
446 self.type, analog=False)
448 return DigitalFilterResponse(b, a, deltat)
450 def to_analog(self):
451 b, a = signal.butter(
452 self.order, self.corner*2.*math.pi,
453 self.type, analog=True)
455 return AnalogFilterResponse(b, a)
457 def to_digital_polezero(self, deltat):
458 z, p, k = signal.butter(
459 self.order, self.corner*2*deltat,
460 btype=self.type, analog=False, output='zpk')
462 return DigitalPoleZeroResponse(z, p, k, deltat)
464 def evaluate(self, freqs):
465 b, a = signal.butter(
466 self.order, self.corner*2.*math.pi,
467 self.type, analog=True)
469 return signal.freqs(b, a, freqs*2.*math.pi)[1]
472class SampledResponse(FrequencyResponse):
473 '''
474 Interpolates frequency response given at a set of sampled frequencies.
476 :param frequencies,values: frequencies and values of the sampled response
477 function.
478 :param left,right: values to return when input is out of range. If set to
479 ``None`` (the default) the endpoints are returned.
480 '''
482 frequencies = Array.T(shape=(None,), dtype=float, serialize_as='list')
483 values = Array.T(shape=(None,), dtype=complex, serialize_as='list')
484 left = Complex.T(optional=True)
485 right = Complex.T(optional=True)
487 def __init__(self, frequencies, values, left=None, right=None, **kwargs):
488 FrequencyResponse.__init__(
489 self,
490 frequencies=asarray_1d(frequencies, float),
491 values=asarray_1d(values, complex),
492 **kwargs)
494 def evaluate(self, freqs):
495 ereal = num.interp(
496 freqs, self.frequencies, num.real(self.values),
497 left=self.left, right=self.right)
498 eimag = num.interp(
499 freqs, self.frequencies, num.imag(self.values),
500 left=self.left, right=self.right)
501 transfer = ereal + 1.0j*eimag
502 return transfer
504 def inverse(self):
505 '''
506 Get inverse as a new :py:class:`SampledResponse` object.
507 '''
509 def inv_or_none(x):
510 if x is not None:
511 return 1./x
513 return SampledResponse(
514 self.frequencies, 1./self.values,
515 left=inv_or_none(self.left),
516 right=inv_or_none(self.right))
519class IntegrationResponse(FrequencyResponse):
520 '''
521 The integration response, optionally multiplied by a constant gain.
523 :param n: exponent (integer)
524 :param gain: gain factor (float)
526 ::
528 gain
529 T(f) = --------------
530 (j*2*pi * f)^n
531 '''
533 n = Int.T(optional=True, default=1)
534 gain = Float.T(optional=True, default=1.0)
536 def __init__(self, n=1, gain=1.0, **kwargs):
537 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs)
539 def evaluate(self, freqs):
540 nonzero = freqs != 0.0
541 resp = num.zeros(freqs.size, dtype=complex)
542 resp[nonzero] = self.gain / (1.0j * 2. * num.pi*freqs[nonzero])**self.n
543 return resp
546class DifferentiationResponse(FrequencyResponse):
547 '''
548 The differentiation response, optionally multiplied by a constant gain.
550 :param n: exponent (integer)
551 :param gain: gain factor (float)
553 ::
555 T(f) = gain * (j*2*pi * f)^n
556 '''
558 n = Int.T(optional=True, default=1)
559 gain = Float.T(optional=True, default=1.0)
561 def __init__(self, n=1, gain=1.0, **kwargs):
562 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs)
564 def evaluate(self, freqs):
565 return self.gain * (1.0j * 2. * num.pi * freqs)**self.n
568class DigitalFilterResponse(FrequencyResponse):
569 '''
570 Frequency response of an analog filter.
572 (see :py:func:`scipy.signal.freqz`).
573 '''
575 b = List.T(Float.T())
576 a = List.T(Float.T())
577 deltat = Float.T()
578 drop_phase = Bool.T(default=False)
580 def __init__(self, b, a, deltat, drop_phase=False, **kwargs):
581 FrequencyResponse.__init__(
582 self, b=aslist(b), a=aslist(a), deltat=float(deltat),
583 drop_phase=drop_phase, **kwargs)
585 def check_sampling_rate(self):
586 if self.deltat == 0.0:
587 raise InvalidResponseError(
588 'Invalid digital response: sampling rate undefined')
590 def is_scalar(self):
591 return len(self.a) == 1 and len(self.b) == 1
593 def get_scalar(self):
594 if self.is_scalar():
595 return self.b[0] / self.a[0]
596 else:
597 raise IsNotScalar()
599 def get_fmax(self):
600 if not self.is_scalar():
601 self.check_sampling_rate()
602 return 0.5 / self.deltat
603 else:
604 return None
606 def evaluate(self, freqs):
607 if self.is_scalar():
608 return util.num_full_like(freqs, self.get_scalar(), dtype=complex)
610 self.check_sampling_rate()
612 ok = freqs <= 0.5/self.deltat
613 coeffs = num.zeros(freqs.size, dtype=complex)
615 coeffs[ok] = signal.freqz(
616 self.b, self.a, freqs[ok]*2.*math.pi * self.deltat)[1]
618 coeffs[num.logical_not(ok)] = None
619 if self.drop_phase:
620 return num.abs(coeffs)
621 else:
622 return coeffs
624 def filter(self, tr):
625 self.check_sampling_rate()
627 from pyrocko import trace
628 trace.assert_same_sampling_rate(self, tr)
629 tr_new = tr.copy(data=False)
630 tr_new.set_ydata(signal.lfilter(self.b, self.a, tr.get_ydata()))
631 return tr_new
634class AnalogFilterResponse(FrequencyResponse):
635 '''
636 Frequency response of an analog filter.
638 (see :py:func:`scipy.signal.freqs`).
639 '''
641 b = List.T(Float.T())
642 a = List.T(Float.T())
644 def __init__(self, b, a, **kwargs):
645 FrequencyResponse.__init__(
646 self, b=aslist(b), a=aslist(a), **kwargs)
648 def evaluate(self, freqs):
649 return signal.freqs(self.b, self.a, freqs*2.*math.pi)[1]
651 def to_digital(self, deltat, method='bilinear'):
652 from scipy.signal import cont2discrete
653 b, a, _ = cont2discrete((self.b, self.a), deltat, method=method)
654 if b.ndim == 2:
655 b = b[0]
656 return DigitalFilterResponse(b.tolist(), a.tolist(), deltat)
659class MultiplyResponse(FrequencyResponse):
660 '''
661 Multiplication of several :py:class:`FrequencyResponse` objects.
662 '''
664 responses = List.T(FrequencyResponse.T())
666 def __init__(self, responses=None, **kwargs):
667 if responses is None:
668 responses = []
669 FrequencyResponse.__init__(self, responses=responses, **kwargs)
671 def get_fmax(self):
672 fmaxs = [resp.get_fmax() for resp in self.responses]
673 fmaxs = [fmax for fmax in fmaxs if fmax is not None]
674 if not fmaxs:
675 return None
676 else:
677 return min(fmaxs)
679 def evaluate(self, freqs):
680 a = num.ones(freqs.size, dtype=complex)
681 for resp in self.responses:
682 a *= resp.evaluate(freqs)
684 return a
686 def is_scalar(self):
687 return all(resp.is_scalar() for resp in self.responses)
689 def get_scalar(self):
690 '''
691 Get factor if this is a flat response.
692 '''
693 if self.is_scalar():
694 return num.product(resp.get_scalar() for resp in self.responses)
695 else:
696 raise IsNotScalar()
698 def simplify(self):
699 self.responses = simplify_responses(self.responses)
701 def construction(self):
702 breakpoints = []
703 for resp in self.responses:
704 breakpoints.extend(resp.construction())
706 return finalize_construction(breakpoints)
709class DelayResponse(FrequencyResponse):
711 delay = Float.T()
713 def evaluate(self, freqs):
714 return num.exp(-2.0J * self.delay * num.pi * freqs)
717class InvalidResponseError(Exception):
718 pass
721class InvalidResponse(FrequencyResponse):
723 message = String.T()
725 def __init__(self, message):
726 FrequencyResponse.__init__(self, message=message)
727 self.have_warned = False
729 def evaluate(self, freqs):
730 if not self.have_warned:
731 logger.warning('Invalid response: %s' % self.message)
732 self.have_warned = True
734 return util.num_full_like(freqs, None, dtype=num.complex)
737def simplify_responses(responses):
739 def unpack_multi(responses):
740 for resp in responses:
741 if isinstance(resp, MultiplyResponse):
742 for sub in unpack_multi(resp.responses):
743 yield sub
744 else:
745 yield resp
747 def cancel_pzs(poles, zeros):
748 poles_new = []
749 zeros_new = list(zeros)
750 for p in poles:
751 try:
752 zeros_new.pop(zeros_new.index(p))
753 except ValueError:
754 poles_new.append(p)
756 return poles_new, zeros_new
758 def combine_pzs(responses):
759 poles = []
760 zeros = []
761 constant = 1.0
762 out = []
763 for resp in responses:
764 if isinstance(resp, PoleZeroResponse):
765 poles.extend(resp.poles)
766 zeros.extend(resp.zeros)
767 constant *= resp.constant
768 else:
769 out.append(resp)
771 poles, zeros = cancel_pzs(poles, zeros)
772 if poles or zeros:
773 out.insert(0, PoleZeroResponse(
774 poles=poles, zeros=zeros, constant=constant))
775 elif constant != 1.0:
776 out.insert(0, Gain(constant=constant))
778 return out
780 def split(xs, condition):
781 out = [], []
782 for x in xs:
783 out[condition(x)].append(x)
785 return out
787 def combine_gains(responses):
788 non_scalars, scalars = split(responses, lambda resp: resp.is_scalar())
789 if scalars:
790 factor = num.prod([resp.get_scalar() for resp in scalars])
791 yield Gain(constant=factor)
793 for resp in non_scalars:
794 yield resp
796 return list(combine_gains(combine_pzs(unpack_multi(responses))))