1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''This module provides convenience objects to handle frequency responses.'''
8from __future__ import print_function, division, absolute_import
10import math
11import logging
13import numpy as num
14from scipy import signal
16from pyrocko import util, evalresp
17from pyrocko.guts import Object, Float, Int, String, Complex, Tuple, List, \
18 StringChoice, Bool
19from pyrocko.guts_array import Array
21try:
22 newstr = unicode
23except NameError:
24 newstr = str
27guts_prefix = 'pf'
29logger = logging.getLogger('pyrocko.response')
32def asarray_1d(x, dtype):
33 if isinstance(x, (list, tuple)) and x and isinstance(x[0], (str, newstr)):
34 return num.asarray(list(map(dtype, x)), dtype=dtype)
35 else:
36 a = num.asarray(x, dtype=dtype)
37 if not a.ndim == 1:
38 raise ValueError('could not convert to 1D array')
39 return a
42class FrequencyResponseCheckpoint(Object):
43 frequency = Float.T()
44 value = Float.T()
47class IsNotScalar(Exception):
48 pass
51class FrequencyResponse(Object):
52 '''
53 Evaluates frequency response at given frequencies.
54 '''
56 checkpoints = List.T(FrequencyResponseCheckpoint.T())
58 def evaluate(self, freqs):
59 return num.ones(freqs.size, dtype=complex)
61 def is_scalar(self):
62 '''
63 Check if this is a flat response.
64 '''
66 if type(self) is FrequencyResponse:
67 return True
68 else:
69 return False # default for derived classes
71 def get_scalar(self):
72 '''
73 Get factor if this is a flat response.
74 '''
75 if type(self) is FrequencyResponse:
76 return 1.0
77 else:
78 raise IsNotScalar() # default for derived classes
80 def get_fmax(self):
81 return None
84class Gain(FrequencyResponse):
85 '''
86 A flat frequency response.
87 '''
89 constant = Complex.T(default=1.0+0j)
91 def evaluate(self, freqs):
92 return util.num_full_like(freqs, self.constant, dtype=complex)
94 def is_scalar(self):
95 return True
97 def get_scalar(self):
98 return self.constant
101class Evalresp(FrequencyResponse):
102 '''
103 Calls evalresp and generates values of the instrument response transfer
104 function.
106 :param respfile: response file in evalresp format
107 :param trace: trace for which the response is to be extracted from the file
108 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity
109 '''
111 respfile = String.T()
112 nslc_id = Tuple.T(4, String.T())
113 target = String.T(default='dis')
114 instant = Float.T()
115 stages = Tuple.T(2, Int.T(), optional=True)
117 def __init__(
118 self,
119 respfile,
120 trace=None,
121 target='dis',
122 nslc_id=None,
123 time=None,
124 stages=None,
125 **kwargs):
127 if trace is not None:
128 nslc_id = trace.nslc_id
129 time = (trace.tmin + trace.tmax) / 2.
131 FrequencyResponse.__init__(
132 self,
133 respfile=respfile,
134 nslc_id=nslc_id,
135 instant=time,
136 target=target,
137 stages=stages,
138 **kwargs)
140 def evaluate(self, freqs):
141 network, station, location, channel = self.nslc_id
142 if self.stages is None:
143 stages = (-1, 0)
144 else:
145 stages = self.stages[0]+1, self.stages[1]
147 x = evalresp.evalresp(
148 sta_list=station,
149 cha_list=channel,
150 net_code=network,
151 locid=location,
152 instant=self.instant,
153 freqs=freqs,
154 units=self.target.upper(),
155 file=self.respfile,
156 start_stage=stages[0],
157 stop_stage=stages[1],
158 rtype='CS')
160 transfer = x[0][4]
161 return transfer
164class InverseEvalresp(FrequencyResponse):
165 '''
166 Calls evalresp and generates values of the inverse instrument response for
167 deconvolution of instrument response.
169 :param respfile: response file in evalresp format
170 :param trace: trace for which the response is to be extracted from the file
171 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity
172 '''
174 respfile = String.T()
175 nslc_id = Tuple.T(4, String.T())
176 target = String.T(default='dis')
177 instant = Float.T()
179 def __init__(self, respfile, trace, target='dis', **kwargs):
180 FrequencyResponse.__init__(
181 self,
182 respfile=respfile,
183 nslc_id=trace.nslc_id,
184 instant=(trace.tmin + trace.tmax)/2.,
185 target=target,
186 **kwargs)
188 def evaluate(self, freqs):
189 network, station, location, channel = self.nslc_id
190 x = evalresp.evalresp(sta_list=station,
191 cha_list=channel,
192 net_code=network,
193 locid=location,
194 instant=self.instant,
195 freqs=freqs,
196 units=self.target.upper(),
197 file=self.respfile,
198 rtype='CS')
200 transfer = x[0][4]
201 return 1./transfer
204def aslist(x):
205 if x is None:
206 return []
208 try:
209 return list(x)
210 except TypeError:
211 return [x]
214class PoleZeroResponse(FrequencyResponse):
215 '''
216 Evaluates frequency response from pole-zero representation.
218 :param zeros: positions of zeros
219 :type zeros: list of complex
220 :param poles: positions of poles
221 :type poles: list of complex
222 :param constant: gain factor
223 :type constant: complex
225 ::
227 (j*2*pi*f - zeros[0]) * (j*2*pi*f - zeros[1]) * ...
228 T(f) = constant * ----------------------------------------------------
229 (j*2*pi*f - poles[0]) * (j*2*pi*f - poles[1]) * ...
232 The poles and zeros should be given as angular frequencies, not in Hz.
233 '''
235 zeros = List.T(Complex.T())
236 poles = List.T(Complex.T())
237 constant = Complex.T(default=1.0+0j)
239 def __init__(
240 self,
241 zeros=None,
242 poles=None,
243 constant=1.0+0j,
244 **kwargs):
246 if zeros is None:
247 zeros = []
248 if poles is None:
249 poles = []
251 FrequencyResponse.__init__(
252 self,
253 zeros=aslist(zeros),
254 poles=aslist(poles),
255 constant=constant,
256 **kwargs)
258 def evaluate(self, freqs):
259 return signal.freqs_zpk(
260 self.zeros, self.poles, self.constant, freqs*2.*num.pi)[1]
262 def is_scalar(self):
263 return len(self.zeros) == 0 and len(self.poles) == 0
265 def get_scalar(self):
266 '''
267 Get factor if this is a flat response.
268 '''
269 if self.is_scalar():
270 return self.constant
271 else:
272 raise IsNotScalar()
274 def inverse(self):
275 return PoleZeroResponse(
276 poles=list(self.zeros),
277 zeros=list(self.poles),
278 constant=1.0/self.constant)
280 def to_analog(self):
281 b, a = signal.zpk2tf(self.zeros, self.poles, self.constant)
282 return AnalogFilterResponse(aslist(b), aslist(a))
284 def to_digital(self, deltat, method='bilinear'):
285 from scipy.signal import cont2discrete, zpk2tf
287 z, p, k, _ = cont2discrete(
288 (self.zeros, self.poles, self.constant),
289 deltat, method=method)
291 b, a = zpk2tf(z, p, k)
293 return DigitalFilterResponse(b, a, deltat)
295 def to_digital_polezero(self, deltat, method='bilinear'):
296 from scipy.signal import cont2discrete
298 z, p, k, _ = cont2discrete(
299 (self.zeros, self.poles, self.constant),
300 deltat, method=method)
302 return DigitalPoleZeroResponse(z, p, k, deltat)
305class DigitalPoleZeroResponse(FrequencyResponse):
306 '''
307 Evaluates frequency response from digital filter pole-zero representation.
309 :param zeros: positions of zeros
310 :type zeros: list of complex
311 :param poles: positions of poles
312 :type poles: list of complex
313 :param constant: gain factor
314 :type constant: complex
315 :param deltat: sampling interval
316 :type deltat: float
318 The poles and zeros should be given as angular frequencies, not in Hz.
319 '''
321 zeros = List.T(Complex.T())
322 poles = List.T(Complex.T())
323 constant = Complex.T(default=1.0+0j)
324 deltat = Float.T()
326 def __init__(
327 self,
328 zeros=None,
329 poles=None,
330 constant=1.0+0j,
331 deltat=None,
332 **kwargs):
334 if zeros is None:
335 zeros = []
336 if poles is None:
337 poles = []
338 if deltat is None:
339 raise ValueError(
340 'Sampling interval `deltat` must be given for '
341 'DigitalPoleZeroResponse')
343 FrequencyResponse.__init__(
344 self, zeros=aslist(zeros), poles=aslist(poles), constant=constant,
345 deltat=deltat, **kwargs)
347 def check_sampling_rate(self):
348 if self.deltat == 0.0:
349 raise InvalidResponseError(
350 'Invalid digital response: sampling rate undefined')
352 def get_fmax(self):
353 self.check_sampling_rate()
354 return 0.5 / self.deltat
356 def evaluate(self, freqs):
357 self.check_sampling_rate()
358 return signal.freqz_zpk(
359 self.zeros, self.poles, self.constant,
360 freqs*(2.*math.pi*self.deltat))[1]
362 def is_scalar(self):
363 return len(self.zeros) == 0 and len(self.poles) == 0
365 def get_scalar(self):
366 '''
367 Get factor if this is a flat response.
368 '''
369 if self.is_scalar():
370 return self.constant
371 else:
372 raise IsNotScalar()
374 def to_digital(self, deltat):
375 self.check_sampling_rate()
376 from scipy.signal import zpk2tf
378 b, a = zpk2tf(self.zeros, self.poles, self.constant)
379 return DigitalFilterResponse(b, a, deltat)
382class ButterworthResponse(FrequencyResponse):
383 '''
384 Butterworth frequency response.
386 :param corner: corner frequency of the response
387 :param order: order of the response
388 :param type: either ``high`` or ``low``
389 '''
391 corner = Float.T(default=1.0)
392 order = Int.T(default=4)
393 type = StringChoice.T(choices=['low', 'high'], default='low')
395 def to_polezero(self):
396 z, p, k = signal.butter(
397 self.order, self.corner*2.*math.pi,
398 btype=self.type, analog=True, output='zpk')
400 return PoleZeroResponse(
401 zeros=aslist(z),
402 poles=aslist(p),
403 constant=float(k))
405 def to_digital(self, deltat):
406 b, a = signal.butter(
407 self.order, self.corner*2.*deltat,
408 self.type, analog=False)
410 return DigitalFilterResponse(b, a, deltat)
412 def to_analog(self):
413 b, a = signal.butter(
414 self.order, self.corner*2.*math.pi,
415 self.type, analog=True)
417 return AnalogFilterResponse(b, a)
419 def to_digital_polezero(self, deltat):
420 z, p, k = signal.butter(
421 self.order, self.corner*2*deltat,
422 btype=self.type, analog=False, output='zpk')
424 return DigitalPoleZeroResponse(z, p, k, deltat)
426 def evaluate(self, freqs):
427 b, a = signal.butter(
428 self.order, self.corner*2.*math.pi,
429 self.type, analog=True)
431 return signal.freqs(b, a, freqs*2.*math.pi)[1]
434class SampledResponse(FrequencyResponse):
435 '''
436 Interpolates frequency response given at a set of sampled frequencies.
438 :param frequencies,values: frequencies and values of the sampled response
439 function.
440 :param left,right: values to return when input is out of range. If set to
441 ``None`` (the default) the endpoints are returned.
442 '''
444 frequencies = Array.T(shape=(None,), dtype=float, serialize_as='list')
445 values = Array.T(shape=(None,), dtype=complex, serialize_as='list')
446 left = Complex.T(optional=True)
447 right = Complex.T(optional=True)
449 def __init__(self, frequencies, values, left=None, right=None, **kwargs):
450 FrequencyResponse.__init__(
451 self,
452 frequencies=asarray_1d(frequencies, float),
453 values=asarray_1d(values, complex),
454 **kwargs)
456 def evaluate(self, freqs):
457 ereal = num.interp(
458 freqs, self.frequencies, num.real(self.values),
459 left=self.left, right=self.right)
460 eimag = num.interp(
461 freqs, self.frequencies, num.imag(self.values),
462 left=self.left, right=self.right)
463 transfer = ereal + 1.0j*eimag
464 return transfer
466 def inverse(self):
467 '''
468 Get inverse as a new :py:class:`SampledResponse` object.
469 '''
471 def inv_or_none(x):
472 if x is not None:
473 return 1./x
475 return SampledResponse(
476 self.frequencies, 1./self.values,
477 left=inv_or_none(self.left),
478 right=inv_or_none(self.right))
481class IntegrationResponse(FrequencyResponse):
482 '''
483 The integration response, optionally multiplied by a constant gain.
485 :param n: exponent (integer)
486 :param gain: gain factor (float)
488 ::
490 gain
491 T(f) = --------------
492 (j*2*pi * f)^n
493 '''
495 n = Int.T(optional=True, default=1)
496 gain = Float.T(optional=True, default=1.0)
498 def __init__(self, n=1, gain=1.0, **kwargs):
499 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs)
501 def evaluate(self, freqs):
502 nonzero = freqs != 0.0
503 resp = num.zeros(freqs.size, dtype=complex)
504 resp[nonzero] = self.gain / (1.0j * 2. * num.pi*freqs[nonzero])**self.n
505 return resp
508class DifferentiationResponse(FrequencyResponse):
509 '''
510 The differentiation response, optionally multiplied by a constant gain.
512 :param n: exponent (integer)
513 :param gain: gain factor (float)
515 ::
517 T(f) = gain * (j*2*pi * f)^n
518 '''
520 n = Int.T(optional=True, default=1)
521 gain = Float.T(optional=True, default=1.0)
523 def __init__(self, n=1, gain=1.0, **kwargs):
524 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs)
526 def evaluate(self, freqs):
527 return self.gain * (1.0j * 2. * num.pi * freqs)**self.n
530class DigitalFilterResponse(FrequencyResponse):
531 '''
532 Frequency response of an analog filter.
534 (see :py:func:`scipy.signal.freqz`).
535 '''
537 b = List.T(Float.T())
538 a = List.T(Float.T())
539 deltat = Float.T()
540 drop_phase = Bool.T(default=False)
542 def __init__(self, b, a, deltat, drop_phase=False, **kwargs):
543 FrequencyResponse.__init__(
544 self, b=aslist(b), a=aslist(a), deltat=float(deltat),
545 drop_phase=drop_phase, **kwargs)
547 def check_sampling_rate(self):
548 if self.deltat == 0.0:
549 raise InvalidResponseError(
550 'Invalid digital response: sampling rate undefined')
552 def get_fmax(self):
553 self.check_sampling_rate()
554 return 0.5 / self.deltat
556 def evaluate(self, freqs):
557 self.check_sampling_rate()
559 ok = freqs <= 0.5/self.deltat
560 coeffs = num.zeros(freqs.size, dtype=complex)
562 coeffs[ok] = signal.freqz(
563 self.b, self.a, freqs[ok]*2.*math.pi * self.deltat)[1]
565 coeffs[num.logical_not(ok)] = None
566 if self.drop_phase:
567 return num.abs(coeffs)
568 else:
569 return coeffs
571 def filter(self, tr):
572 self.check_sampling_rate()
574 from pyrocko import trace
575 trace.assert_same_sampling_rate(self, tr)
576 tr_new = tr.copy(data=False)
577 tr_new.set_ydata(signal.lfilter(self.b, self.a, tr.get_ydata()))
578 return tr_new
581class AnalogFilterResponse(FrequencyResponse):
582 '''
583 Frequency response of an analog filter.
585 (see :py:func:`scipy.signal.freqs`).
586 '''
588 b = List.T(Float.T())
589 a = List.T(Float.T())
591 def __init__(self, b, a, **kwargs):
592 FrequencyResponse.__init__(
593 self, b=aslist(b), a=aslist(a), **kwargs)
595 def evaluate(self, freqs):
596 return signal.freqs(self.b, self.a, freqs*2.*math.pi)[1]
598 def to_digital(self, deltat, method='bilinear'):
599 from scipy.signal import cont2discrete
600 b, a, _ = cont2discrete((self.b, self.a), deltat, method=method)
601 if b.ndim == 2:
602 b = b[0]
603 return DigitalFilterResponse(b.tolist(), a.tolist(), deltat)
606class MultiplyResponse(FrequencyResponse):
607 '''
608 Multiplication of several :py:class:`FrequencyResponse` objects.
609 '''
611 responses = List.T(FrequencyResponse.T())
613 def __init__(self, responses=None, **kwargs):
614 if responses is None:
615 responses = []
616 FrequencyResponse.__init__(self, responses=responses, **kwargs)
618 def get_fmax(self):
619 fmaxs = [resp.get_fmax() for resp in self.responses]
620 fmaxs = [fmax for fmax in fmaxs if fmax is not None]
621 if not fmaxs:
622 return None
623 else:
624 return min(fmaxs)
626 def evaluate(self, freqs):
627 a = num.ones(freqs.size, dtype=complex)
628 for resp in self.responses:
629 a *= resp.evaluate(freqs)
631 return a
633 def is_scalar(self):
634 return all(resp.is_scalar() for resp in self.responses)
636 def get_scalar(self):
637 '''
638 Get factor if this is a flat response.
639 '''
640 if self.is_scalar():
641 return num.product(resp.get_scalar() for resp in self.responses)
642 else:
643 raise IsNotScalar()
645 def simplify(self):
646 poles = []
647 zeros = []
648 constant = 1.0
649 responses = []
650 for resp in self.responses:
651 if isinstance(resp, PoleZeroResponse):
652 poles.extend(resp.poles)
653 zeros.extend(resp.zeros)
654 constant *= resp.constant
655 else:
656 responses.append(resp)
658 if poles or zeros or constant != 1.0:
659 responses[0:0] = [
660 PoleZeroResponse(poles=poles, zeros=zeros, constant=constant)]
662 self.responses = responses
665class DelayResponse(FrequencyResponse):
667 delay = Float.T()
669 def evaluate(self, freqs):
670 return num.exp(-2.0J * self.delay * num.pi * freqs)
673class InvalidResponseError(Exception):
674 pass
677class InvalidResponse(FrequencyResponse):
679 message = String.T()
681 def __init__(self, message):
682 FrequencyResponse.__init__(self, message=message)
683 self.have_warned = False
685 def evaluate(self, freqs):
686 if not self.have_warned:
687 logger.warning('Invalid response: %s' % self.message)
688 self.have_warned = True
690 return util.num_full_like(freqs, None, dtype=num.complex)
693def simplify_responses(responses):
695 def unpack_multi(responses):
696 for resp in responses:
697 if isinstance(resp, MultiplyResponse):
698 for sub in unpack_multi(resp.responses):
699 yield sub
700 else:
701 yield resp
703 def cancel_pzs(poles, zeros):
704 poles_new = []
705 zeros_new = list(zeros)
706 for p in poles:
707 try:
708 zeros_new.pop(zeros_new.index(p))
709 except ValueError:
710 poles_new.append(p)
712 return poles_new, zeros_new
714 def combine_pzs(responses):
715 poles = []
716 zeros = []
717 constant = 1.0
718 out = []
719 for resp in responses:
720 if isinstance(resp, PoleZeroResponse):
721 poles.extend(resp.poles)
722 zeros.extend(resp.zeros)
723 constant *= resp.constant
724 else:
725 out.append(resp)
727 poles, zeros = cancel_pzs(poles, zeros)
728 if poles or zeros:
729 out.insert(0, PoleZeroResponse(
730 poles=poles, zeros=zeros, constant=constant))
731 elif constant != 1.0:
732 out.insert(0, Gain(constant=constant))
734 return out
736 def split(xs, condition):
737 out = [], []
738 for x in xs:
739 out[condition(x)].append(x)
741 return out
743 def combine_gains(responses):
744 non_scalars, scalars = split(responses, lambda resp: resp.is_scalar())
745 if scalars:
746 factor = num.prod([resp.get_scalar() for resp in scalars])
747 yield Gain(constant=factor)
749 for resp in non_scalars:
750 yield resp
752 return list(combine_gains(combine_pzs(unpack_multi(responses))))