1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''This module provides convenience objects to handle frequency responses.'''
8from __future__ import print_function, division, absolute_import
10import math
11import logging
13import numpy as num
14from scipy import signal
16from pyrocko import util, evalresp
17from pyrocko.guts import Object, Float, Int, String, Complex, Tuple, List, \
18 StringChoice, Bool
19from pyrocko.guts_array import Array
21try:
22 newstr = unicode
23except NameError:
24 newstr = str
27guts_prefix = 'pf'
29logger = logging.getLogger('pyrocko.response')
32def asarray_1d(x, dtype):
33 if isinstance(x, (list, tuple)) and x and isinstance(x[0], (str, newstr)):
34 return num.asarray(list(map(dtype, x)), dtype=dtype)
35 else:
36 a = num.asarray(x, dtype=dtype)
37 if not a.ndim == 1:
38 raise ValueError('could not convert to 1D array')
39 return a
42def finalize_construction(breakpoints):
43 breakpoints.sort()
44 breakpoints_out = []
45 f_last = None
46 for f, c in breakpoints:
47 if f_last is not None and f == f_last:
48 breakpoints_out[-1][1] += c
49 else:
50 breakpoints_out.append([f, c])
52 f_last = f
54 breakpoints_out = [(f, c) for (f, c) in breakpoints_out if c != 0]
55 return breakpoints_out
58class FrequencyResponseCheckpoint(Object):
59 frequency = Float.T()
60 value = Float.T()
63class IsNotScalar(Exception):
64 pass
67class FrequencyResponse(Object):
68 '''
69 Evaluates frequency response at given frequencies.
70 '''
72 checkpoints = List.T(FrequencyResponseCheckpoint.T())
74 def evaluate(self, freqs):
75 return num.ones(freqs.size, dtype=complex)
77 def evaluate1(self, freq):
78 return self.evaluate(num.atleast_1d(freq))[0]
80 def is_scalar(self):
81 '''
82 Check if this is a flat response.
83 '''
85 if type(self) is FrequencyResponse:
86 return True
87 else:
88 return False # default for derived classes
90 def get_scalar(self):
91 '''
92 Get factor if this is a flat response.
93 '''
94 if type(self) is FrequencyResponse:
95 return 1.0
96 else:
97 raise IsNotScalar() # default for derived classes
99 def get_fmax(self):
100 return None
102 def construction(self):
103 return []
106class Gain(FrequencyResponse):
107 '''
108 A flat frequency response.
109 '''
111 constant = Complex.T(default=1.0+0j)
113 def evaluate(self, freqs):
114 return util.num_full_like(freqs, self.constant, dtype=complex)
116 def is_scalar(self):
117 return True
119 def get_scalar(self):
120 return self.constant
123class Evalresp(FrequencyResponse):
124 '''
125 Calls evalresp and generates values of the instrument response transfer
126 function.
128 :param respfile: response file in evalresp format
129 :param trace: trace for which the response is to be extracted from the file
130 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity
131 '''
133 respfile = String.T()
134 nslc_id = Tuple.T(4, String.T())
135 target = String.T(default='dis')
136 instant = Float.T()
137 stages = Tuple.T(2, Int.T(), optional=True)
139 def __init__(
140 self,
141 respfile,
142 trace=None,
143 target='dis',
144 nslc_id=None,
145 time=None,
146 stages=None,
147 **kwargs):
149 if trace is not None:
150 nslc_id = trace.nslc_id
151 time = (trace.tmin + trace.tmax) / 2.
153 FrequencyResponse.__init__(
154 self,
155 respfile=respfile,
156 nslc_id=nslc_id,
157 instant=time,
158 target=target,
159 stages=stages,
160 **kwargs)
162 def evaluate(self, freqs):
163 network, station, location, channel = self.nslc_id
164 if self.stages is None:
165 stages = (-1, 0)
166 else:
167 stages = self.stages[0]+1, self.stages[1]
169 x = evalresp.evalresp(
170 sta_list=station,
171 cha_list=channel,
172 net_code=network,
173 locid=location,
174 instant=self.instant,
175 freqs=freqs,
176 units=self.target.upper(),
177 file=self.respfile,
178 start_stage=stages[0],
179 stop_stage=stages[1],
180 rtype='CS')
182 transfer = x[0][4]
183 return transfer
186class InverseEvalresp(FrequencyResponse):
187 '''
188 Calls evalresp and generates values of the inverse instrument response for
189 deconvolution of instrument response.
191 :param respfile: response file in evalresp format
192 :param trace: trace for which the response is to be extracted from the file
193 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity
194 '''
196 respfile = String.T()
197 nslc_id = Tuple.T(4, String.T())
198 target = String.T(default='dis')
199 instant = Float.T()
201 def __init__(self, respfile, trace, target='dis', **kwargs):
202 FrequencyResponse.__init__(
203 self,
204 respfile=respfile,
205 nslc_id=trace.nslc_id,
206 instant=(trace.tmin + trace.tmax)/2.,
207 target=target,
208 **kwargs)
210 def evaluate(self, freqs):
211 network, station, location, channel = self.nslc_id
212 x = evalresp.evalresp(sta_list=station,
213 cha_list=channel,
214 net_code=network,
215 locid=location,
216 instant=self.instant,
217 freqs=freqs,
218 units=self.target.upper(),
219 file=self.respfile,
220 rtype='CS')
222 transfer = x[0][4]
223 return 1./transfer
226def aslist(x):
227 if x is None:
228 return []
230 try:
231 return list(x)
232 except TypeError:
233 return [x]
236class PoleZeroResponse(FrequencyResponse):
237 '''
238 Evaluates frequency response from pole-zero representation.
240 :param zeros: positions of zeros
241 :type zeros: list of complex
242 :param poles: positions of poles
243 :type poles: list of complex
244 :param constant: gain factor
245 :type constant: complex
247 ::
249 (j*2*pi*f - zeros[0]) * (j*2*pi*f - zeros[1]) * ...
250 T(f) = constant * ----------------------------------------------------
251 (j*2*pi*f - poles[0]) * (j*2*pi*f - poles[1]) * ...
254 The poles and zeros should be given as angular frequencies, not in Hz.
255 '''
257 zeros = List.T(Complex.T())
258 poles = List.T(Complex.T())
259 constant = Complex.T(default=1.0+0j)
261 def __init__(
262 self,
263 zeros=None,
264 poles=None,
265 constant=1.0+0j,
266 **kwargs):
268 if zeros is None:
269 zeros = []
270 if poles is None:
271 poles = []
273 FrequencyResponse.__init__(
274 self,
275 zeros=aslist(zeros),
276 poles=aslist(poles),
277 constant=constant,
278 **kwargs)
280 def evaluate(self, freqs):
281 if hasattr(signal, 'freqs_zpk'): # added in scipy 0.19.0
282 return signal.freqs_zpk(
283 self.zeros, self.poles, self.constant, freqs*2.*num.pi)[1]
284 else:
285 jomeg = 1.0j * 2.*num.pi*freqs
287 a = num.ones(freqs.size, dtype=complex)*self.constant
288 for z in self.zeros:
289 a *= jomeg-z
290 for p in self.poles:
291 a /= jomeg-p
293 return a
295 def is_scalar(self):
296 return len(self.zeros) == 0 and len(self.poles) == 0
298 def get_scalar(self):
299 '''
300 Get factor if this is a flat response.
301 '''
302 if self.is_scalar():
303 return self.constant
304 else:
305 raise IsNotScalar()
307 def inverse(self):
308 return PoleZeroResponse(
309 poles=list(self.zeros),
310 zeros=list(self.poles),
311 constant=1.0/self.constant)
313 def to_analog(self):
314 b, a = signal.zpk2tf(self.zeros, self.poles, self.constant)
315 return AnalogFilterResponse(aslist(b), aslist(a))
317 def to_digital(self, deltat, method='bilinear'):
318 from scipy.signal import cont2discrete, zpk2tf
320 z, p, k, _ = cont2discrete(
321 (self.zeros, self.poles, self.constant),
322 deltat, method=method)
324 b, a = zpk2tf(z, p, k)
326 return DigitalFilterResponse(b, a, deltat)
328 def to_digital_polezero(self, deltat, method='bilinear'):
329 from scipy.signal import cont2discrete
331 z, p, k, _ = cont2discrete(
332 (self.zeros, self.poles, self.constant),
333 deltat, method=method)
335 return DigitalPoleZeroResponse(z, p, k, deltat)
337 def construction(self):
338 breakpoints = []
339 for zero in self.zeros:
340 f = abs(zero) / (2.*math.pi)
341 breakpoints.append((f, 1))
343 for pole in self.poles:
344 f = abs(pole) / (2.*math.pi)
345 breakpoints.append((f, -1))
347 return finalize_construction(breakpoints)
350class DigitalPoleZeroResponse(FrequencyResponse):
351 '''
352 Evaluates frequency response from digital filter pole-zero representation.
354 :param zeros: positions of zeros
355 :type zeros: list of complex
356 :param poles: positions of poles
357 :type poles: list of complex
358 :param constant: gain factor
359 :type constant: complex
360 :param deltat: sampling interval
361 :type deltat: float
363 The poles and zeros should be given as angular frequencies, not in Hz.
364 '''
366 zeros = List.T(Complex.T())
367 poles = List.T(Complex.T())
368 constant = Complex.T(default=1.0+0j)
369 deltat = Float.T()
371 def __init__(
372 self,
373 zeros=None,
374 poles=None,
375 constant=1.0+0j,
376 deltat=None,
377 **kwargs):
379 if zeros is None:
380 zeros = []
381 if poles is None:
382 poles = []
383 if deltat is None:
384 raise ValueError(
385 'Sampling interval `deltat` must be given for '
386 'DigitalPoleZeroResponse')
388 FrequencyResponse.__init__(
389 self, zeros=aslist(zeros), poles=aslist(poles), constant=constant,
390 deltat=deltat, **kwargs)
392 def check_sampling_rate(self):
393 if self.deltat == 0.0:
394 raise InvalidResponseError(
395 'Invalid digital response: sampling rate undefined')
397 def get_fmax(self):
398 self.check_sampling_rate()
399 return 0.5 / self.deltat
401 def evaluate(self, freqs):
402 self.check_sampling_rate()
403 return signal.freqz_zpk(
404 self.zeros, self.poles, self.constant,
405 freqs*(2.*math.pi*self.deltat))[1]
407 def is_scalar(self):
408 return len(self.zeros) == 0 and len(self.poles) == 0
410 def get_scalar(self):
411 '''
412 Get factor if this is a flat response.
413 '''
414 if self.is_scalar():
415 return self.constant
416 else:
417 raise IsNotScalar()
419 def to_digital(self, deltat):
420 self.check_sampling_rate()
421 from scipy.signal import zpk2tf
423 b, a = zpk2tf(self.zeros, self.poles, self.constant)
424 return DigitalFilterResponse(b, a, deltat)
427class ButterworthResponse(FrequencyResponse):
428 '''
429 Butterworth frequency response.
431 :param corner: corner frequency of the response
432 :param order: order of the response
433 :param type: either ``high`` or ``low``
434 '''
436 corner = Float.T(default=1.0)
437 order = Int.T(default=4)
438 type = StringChoice.T(choices=['low', 'high'], default='low')
440 def to_polezero(self):
441 z, p, k = signal.butter(
442 self.order, self.corner*2.*math.pi,
443 btype=self.type, analog=True, output='zpk')
445 return PoleZeroResponse(
446 zeros=aslist(z),
447 poles=aslist(p),
448 constant=float(k))
450 def to_digital(self, deltat):
451 b, a = signal.butter(
452 self.order, self.corner*2.*deltat,
453 self.type, analog=False)
455 return DigitalFilterResponse(b, a, deltat)
457 def to_analog(self):
458 b, a = signal.butter(
459 self.order, self.corner*2.*math.pi,
460 self.type, analog=True)
462 return AnalogFilterResponse(b, a)
464 def to_digital_polezero(self, deltat):
465 z, p, k = signal.butter(
466 self.order, self.corner*2*deltat,
467 btype=self.type, analog=False, output='zpk')
469 return DigitalPoleZeroResponse(z, p, k, deltat)
471 def evaluate(self, freqs):
472 b, a = signal.butter(
473 self.order, self.corner*2.*math.pi,
474 self.type, analog=True)
476 return signal.freqs(b, a, freqs*2.*math.pi)[1]
479class SampledResponse(FrequencyResponse):
480 '''
481 Interpolates frequency response given at a set of sampled frequencies.
483 :param frequencies,values: frequencies and values of the sampled response
484 function.
485 :param left,right: values to return when input is out of range. If set to
486 ``None`` (the default) the endpoints are returned.
487 '''
489 frequencies = Array.T(shape=(None,), dtype=float, serialize_as='list')
490 values = Array.T(shape=(None,), dtype=complex, serialize_as='list')
491 left = Complex.T(optional=True)
492 right = Complex.T(optional=True)
494 def __init__(self, frequencies, values, left=None, right=None, **kwargs):
495 FrequencyResponse.__init__(
496 self,
497 frequencies=asarray_1d(frequencies, float),
498 values=asarray_1d(values, complex),
499 **kwargs)
501 def evaluate(self, freqs):
502 ereal = num.interp(
503 freqs, self.frequencies, num.real(self.values),
504 left=self.left, right=self.right)
505 eimag = num.interp(
506 freqs, self.frequencies, num.imag(self.values),
507 left=self.left, right=self.right)
508 transfer = ereal + 1.0j*eimag
509 return transfer
511 def inverse(self):
512 '''
513 Get inverse as a new :py:class:`SampledResponse` object.
514 '''
516 def inv_or_none(x):
517 if x is not None:
518 return 1./x
520 return SampledResponse(
521 self.frequencies, 1./self.values,
522 left=inv_or_none(self.left),
523 right=inv_or_none(self.right))
526class IntegrationResponse(FrequencyResponse):
527 '''
528 The integration response, optionally multiplied by a constant gain.
530 :param n: exponent (integer)
531 :param gain: gain factor (float)
533 ::
535 gain
536 T(f) = --------------
537 (j*2*pi * f)^n
538 '''
540 n = Int.T(optional=True, default=1)
541 gain = Float.T(optional=True, default=1.0)
543 def __init__(self, n=1, gain=1.0, **kwargs):
544 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs)
546 def evaluate(self, freqs):
547 nonzero = freqs != 0.0
548 resp = num.zeros(freqs.size, dtype=complex)
549 resp[nonzero] = self.gain / (1.0j * 2. * num.pi*freqs[nonzero])**self.n
550 return resp
553class DifferentiationResponse(FrequencyResponse):
554 '''
555 The differentiation response, optionally multiplied by a constant gain.
557 :param n: exponent (integer)
558 :param gain: gain factor (float)
560 ::
562 T(f) = gain * (j*2*pi * f)^n
563 '''
565 n = Int.T(optional=True, default=1)
566 gain = Float.T(optional=True, default=1.0)
568 def __init__(self, n=1, gain=1.0, **kwargs):
569 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs)
571 def evaluate(self, freqs):
572 return self.gain * (1.0j * 2. * num.pi * freqs)**self.n
575class DigitalFilterResponse(FrequencyResponse):
576 '''
577 Frequency response of an analog filter.
579 (see :py:func:`scipy.signal.freqz`).
580 '''
582 b = List.T(Float.T())
583 a = List.T(Float.T())
584 deltat = Float.T()
585 drop_phase = Bool.T(default=False)
587 def __init__(self, b, a, deltat, drop_phase=False, **kwargs):
588 FrequencyResponse.__init__(
589 self, b=aslist(b), a=aslist(a), deltat=float(deltat),
590 drop_phase=drop_phase, **kwargs)
592 def check_sampling_rate(self):
593 if self.deltat == 0.0:
594 raise InvalidResponseError(
595 'Invalid digital response: sampling rate undefined')
597 def is_scalar(self):
598 return len(self.a) == 1 and len(self.b) == 1
600 def get_scalar(self):
601 if self.is_scalar():
602 return self.b[0] / self.a[0]
603 else:
604 raise IsNotScalar()
606 def get_fmax(self):
607 if not self.is_scalar():
608 self.check_sampling_rate()
609 return 0.5 / self.deltat
610 else:
611 return None
613 def evaluate(self, freqs):
614 if self.is_scalar():
615 return util.num_full_like(freqs, self.get_scalar(), dtype=complex)
617 self.check_sampling_rate()
619 ok = freqs <= 0.5/self.deltat
620 coeffs = num.zeros(freqs.size, dtype=complex)
622 coeffs[ok] = signal.freqz(
623 self.b, self.a, freqs[ok]*2.*math.pi * self.deltat)[1]
625 coeffs[num.logical_not(ok)] = None
626 if self.drop_phase:
627 return num.abs(coeffs)
628 else:
629 return coeffs
631 def filter(self, tr):
632 self.check_sampling_rate()
634 from pyrocko import trace
635 trace.assert_same_sampling_rate(self, tr)
636 tr_new = tr.copy(data=False)
637 tr_new.set_ydata(signal.lfilter(self.b, self.a, tr.get_ydata()))
638 return tr_new
641class AnalogFilterResponse(FrequencyResponse):
642 '''
643 Frequency response of an analog filter.
645 (see :py:func:`scipy.signal.freqs`).
646 '''
648 b = List.T(Float.T())
649 a = List.T(Float.T())
651 def __init__(self, b, a, **kwargs):
652 FrequencyResponse.__init__(
653 self, b=aslist(b), a=aslist(a), **kwargs)
655 def evaluate(self, freqs):
656 return signal.freqs(self.b, self.a, freqs*2.*math.pi)[1]
658 def to_digital(self, deltat, method='bilinear'):
659 from scipy.signal import cont2discrete
660 b, a, _ = cont2discrete((self.b, self.a), deltat, method=method)
661 if b.ndim == 2:
662 b = b[0]
663 return DigitalFilterResponse(b.tolist(), a.tolist(), deltat)
666class MultiplyResponse(FrequencyResponse):
667 '''
668 Multiplication of several :py:class:`FrequencyResponse` objects.
669 '''
671 responses = List.T(FrequencyResponse.T())
673 def __init__(self, responses=None, **kwargs):
674 if responses is None:
675 responses = []
676 FrequencyResponse.__init__(self, responses=responses, **kwargs)
678 def get_fmax(self):
679 fmaxs = [resp.get_fmax() for resp in self.responses]
680 fmaxs = [fmax for fmax in fmaxs if fmax is not None]
681 if not fmaxs:
682 return None
683 else:
684 return min(fmaxs)
686 def evaluate(self, freqs):
687 a = num.ones(freqs.size, dtype=complex)
688 for resp in self.responses:
689 a *= resp.evaluate(freqs)
691 return a
693 def is_scalar(self):
694 return all(resp.is_scalar() for resp in self.responses)
696 def get_scalar(self):
697 '''
698 Get factor if this is a flat response.
699 '''
700 if self.is_scalar():
701 return num.product(resp.get_scalar() for resp in self.responses)
702 else:
703 raise IsNotScalar()
705 def simplify(self):
706 self.responses = simplify_responses(self.responses)
708 def construction(self):
709 breakpoints = []
710 for resp in self.responses:
711 breakpoints.extend(resp.construction())
713 return finalize_construction(breakpoints)
716class DelayResponse(FrequencyResponse):
718 delay = Float.T()
720 def evaluate(self, freqs):
721 return num.exp(-2.0J * self.delay * num.pi * freqs)
724class InvalidResponseError(Exception):
725 pass
728class InvalidResponse(FrequencyResponse):
730 message = String.T()
732 def __init__(self, message):
733 FrequencyResponse.__init__(self, message=message)
734 self.have_warned = False
736 def evaluate(self, freqs):
737 if not self.have_warned:
738 logger.warning('Invalid response: %s' % self.message)
739 self.have_warned = True
741 return util.num_full_like(freqs, None, dtype=num.complex)
744def simplify_responses(responses):
746 def unpack_multi(responses):
747 for resp in responses:
748 if isinstance(resp, MultiplyResponse):
749 for sub in unpack_multi(resp.responses):
750 yield sub
751 else:
752 yield resp
754 def cancel_pzs(poles, zeros):
755 poles_new = []
756 zeros_new = list(zeros)
757 for p in poles:
758 try:
759 zeros_new.pop(zeros_new.index(p))
760 except ValueError:
761 poles_new.append(p)
763 return poles_new, zeros_new
765 def combine_pzs(responses):
766 poles = []
767 zeros = []
768 constant = 1.0
769 out = []
770 for resp in responses:
771 if isinstance(resp, PoleZeroResponse):
772 poles.extend(resp.poles)
773 zeros.extend(resp.zeros)
774 constant *= resp.constant
775 else:
776 out.append(resp)
778 poles, zeros = cancel_pzs(poles, zeros)
779 if poles or zeros:
780 out.insert(0, PoleZeroResponse(
781 poles=poles, zeros=zeros, constant=constant))
782 elif constant != 1.0:
783 out.insert(0, Gain(constant=constant))
785 return out
787 def split(xs, condition):
788 out = [], []
789 for x in xs:
790 out[condition(x)].append(x)
792 return out
794 def combine_gains(responses):
795 non_scalars, scalars = split(responses, lambda resp: resp.is_scalar())
796 if scalars:
797 factor = num.prod([resp.get_scalar() for resp in scalars])
798 yield Gain(constant=factor)
800 for resp in non_scalars:
801 yield resp
803 return list(combine_gains(combine_pzs(unpack_multi(responses))))