1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6'''This module provides convenience objects to handle frequency responses.''' 

7 

8from __future__ import print_function, division, absolute_import 

9 

10import math 

11import logging 

12 

13import numpy as num 

14from scipy import signal 

15 

16from pyrocko import util, evalresp 

17from pyrocko.guts import Object, Float, Int, String, Complex, Tuple, List, \ 

18 StringChoice, Bool 

19from pyrocko.guts_array import Array 

20 

21try: 

22 newstr = unicode 

23except NameError: 

24 newstr = str 

25 

26 

27guts_prefix = 'pf' 

28 

29logger = logging.getLogger('pyrocko.response') 

30 

31 

32def asarray_1d(x, dtype): 

33 if isinstance(x, (list, tuple)) and x and isinstance(x[0], (str, newstr)): 

34 return num.asarray(list(map(dtype, x)), dtype=dtype) 

35 else: 

36 a = num.asarray(x, dtype=dtype) 

37 if not a.ndim == 1: 

38 raise ValueError('could not convert to 1D array') 

39 return a 

40 

41 

42class FrequencyResponseCheckpoint(Object): 

43 frequency = Float.T() 

44 value = Float.T() 

45 

46 

47class IsNotScalar(Exception): 

48 pass 

49 

50 

51class FrequencyResponse(Object): 

52 ''' 

53 Evaluates frequency response at given frequencies. 

54 ''' 

55 

56 checkpoints = List.T(FrequencyResponseCheckpoint.T()) 

57 

58 def evaluate(self, freqs): 

59 return num.ones(freqs.size, dtype=complex) 

60 

61 def is_scalar(self): 

62 ''' 

63 Check if this is a flat response. 

64 ''' 

65 

66 if type(self) is FrequencyResponse: 

67 return True 

68 else: 

69 return False # default for derived classes 

70 

71 def get_scalar(self): 

72 ''' 

73 Get factor if this is a flat response. 

74 ''' 

75 if type(self) is FrequencyResponse: 

76 return 1.0 

77 else: 

78 raise IsNotScalar() # default for derived classes 

79 

80 def get_fmax(self): 

81 return None 

82 

83 

84class Gain(FrequencyResponse): 

85 ''' 

86 A flat frequency response. 

87 ''' 

88 

89 constant = Complex.T(default=1.0+0j) 

90 

91 def evaluate(self, freqs): 

92 return util.num_full_like(freqs, self.constant, dtype=complex) 

93 

94 def is_scalar(self): 

95 return True 

96 

97 def get_scalar(self): 

98 return self.constant 

99 

100 

101class Evalresp(FrequencyResponse): 

102 ''' 

103 Calls evalresp and generates values of the instrument response transfer 

104 function. 

105 

106 :param respfile: response file in evalresp format 

107 :param trace: trace for which the response is to be extracted from the file 

108 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity 

109 ''' 

110 

111 respfile = String.T() 

112 nslc_id = Tuple.T(4, String.T()) 

113 target = String.T(default='dis') 

114 instant = Float.T() 

115 stages = Tuple.T(2, Int.T(), optional=True) 

116 

117 def __init__( 

118 self, 

119 respfile, 

120 trace=None, 

121 target='dis', 

122 nslc_id=None, 

123 time=None, 

124 stages=None, 

125 **kwargs): 

126 

127 if trace is not None: 

128 nslc_id = trace.nslc_id 

129 time = (trace.tmin + trace.tmax) / 2. 

130 

131 FrequencyResponse.__init__( 

132 self, 

133 respfile=respfile, 

134 nslc_id=nslc_id, 

135 instant=time, 

136 target=target, 

137 stages=stages, 

138 **kwargs) 

139 

140 def evaluate(self, freqs): 

141 network, station, location, channel = self.nslc_id 

142 if self.stages is None: 

143 stages = (-1, 0) 

144 else: 

145 stages = self.stages[0]+1, self.stages[1] 

146 

147 x = evalresp.evalresp( 

148 sta_list=station, 

149 cha_list=channel, 

150 net_code=network, 

151 locid=location, 

152 instant=self.instant, 

153 freqs=freqs, 

154 units=self.target.upper(), 

155 file=self.respfile, 

156 start_stage=stages[0], 

157 stop_stage=stages[1], 

158 rtype='CS') 

159 

160 transfer = x[0][4] 

161 return transfer 

162 

163 

164class InverseEvalresp(FrequencyResponse): 

165 ''' 

166 Calls evalresp and generates values of the inverse instrument response for 

167 deconvolution of instrument response. 

168 

169 :param respfile: response file in evalresp format 

170 :param trace: trace for which the response is to be extracted from the file 

171 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity 

172 ''' 

173 

174 respfile = String.T() 

175 nslc_id = Tuple.T(4, String.T()) 

176 target = String.T(default='dis') 

177 instant = Float.T() 

178 

179 def __init__(self, respfile, trace, target='dis', **kwargs): 

180 FrequencyResponse.__init__( 

181 self, 

182 respfile=respfile, 

183 nslc_id=trace.nslc_id, 

184 instant=(trace.tmin + trace.tmax)/2., 

185 target=target, 

186 **kwargs) 

187 

188 def evaluate(self, freqs): 

189 network, station, location, channel = self.nslc_id 

190 x = evalresp.evalresp(sta_list=station, 

191 cha_list=channel, 

192 net_code=network, 

193 locid=location, 

194 instant=self.instant, 

195 freqs=freqs, 

196 units=self.target.upper(), 

197 file=self.respfile, 

198 rtype='CS') 

199 

200 transfer = x[0][4] 

201 return 1./transfer 

202 

203 

204def aslist(x): 

205 if x is None: 

206 return [] 

207 

208 try: 

209 return list(x) 

210 except TypeError: 

211 return [x] 

212 

213 

214class PoleZeroResponse(FrequencyResponse): 

215 ''' 

216 Evaluates frequency response from pole-zero representation. 

217 

218 :param zeros: positions of zeros 

219 :type zeros: list of complex 

220 :param poles: positions of poles 

221 :type poles: list of complex 

222 :param constant: gain factor 

223 :type constant: complex 

224 

225 :: 

226 

227 (j*2*pi*f - zeros[0]) * (j*2*pi*f - zeros[1]) * ... 

228 T(f) = constant * ---------------------------------------------------- 

229 (j*2*pi*f - poles[0]) * (j*2*pi*f - poles[1]) * ... 

230 

231 

232 The poles and zeros should be given as angular frequencies, not in Hz. 

233 ''' 

234 

235 zeros = List.T(Complex.T()) 

236 poles = List.T(Complex.T()) 

237 constant = Complex.T(default=1.0+0j) 

238 

239 def __init__( 

240 self, 

241 zeros=None, 

242 poles=None, 

243 constant=1.0+0j, 

244 **kwargs): 

245 

246 if zeros is None: 

247 zeros = [] 

248 if poles is None: 

249 poles = [] 

250 

251 FrequencyResponse.__init__( 

252 self, 

253 zeros=aslist(zeros), 

254 poles=aslist(poles), 

255 constant=constant, 

256 **kwargs) 

257 

258 def evaluate(self, freqs): 

259 if hasattr(signal, 'freqs_zpk'): # added in scipy 0.19.0 

260 return signal.freqs_zpk( 

261 self.zeros, self.poles, self.constant, freqs*2.*num.pi)[1] 

262 else: 

263 jomeg = 1.0j * 2.*num.pi*freqs 

264 

265 a = num.ones(freqs.size, dtype=complex)*self.constant 

266 for z in self.zeros: 

267 a *= jomeg-z 

268 for p in self.poles: 

269 a /= jomeg-p 

270 

271 return a 

272 

273 def is_scalar(self): 

274 return len(self.zeros) == 0 and len(self.poles) == 0 

275 

276 def get_scalar(self): 

277 ''' 

278 Get factor if this is a flat response. 

279 ''' 

280 if self.is_scalar(): 

281 return self.constant 

282 else: 

283 raise IsNotScalar() 

284 

285 def inverse(self): 

286 return PoleZeroResponse( 

287 poles=list(self.zeros), 

288 zeros=list(self.poles), 

289 constant=1.0/self.constant) 

290 

291 def to_analog(self): 

292 b, a = signal.zpk2tf(self.zeros, self.poles, self.constant) 

293 return AnalogFilterResponse(aslist(b), aslist(a)) 

294 

295 def to_digital(self, deltat, method='bilinear'): 

296 from scipy.signal import cont2discrete, zpk2tf 

297 

298 z, p, k, _ = cont2discrete( 

299 (self.zeros, self.poles, self.constant), 

300 deltat, method=method) 

301 

302 b, a = zpk2tf(z, p, k) 

303 

304 return DigitalFilterResponse(b, a, deltat) 

305 

306 def to_digital_polezero(self, deltat, method='bilinear'): 

307 from scipy.signal import cont2discrete 

308 

309 z, p, k, _ = cont2discrete( 

310 (self.zeros, self.poles, self.constant), 

311 deltat, method=method) 

312 

313 return DigitalPoleZeroResponse(z, p, k, deltat) 

314 

315 

316class DigitalPoleZeroResponse(FrequencyResponse): 

317 ''' 

318 Evaluates frequency response from digital filter pole-zero representation. 

319 

320 :param zeros: positions of zeros 

321 :type zeros: list of complex 

322 :param poles: positions of poles 

323 :type poles: list of complex 

324 :param constant: gain factor 

325 :type constant: complex 

326 :param deltat: sampling interval 

327 :type deltat: float 

328 

329 The poles and zeros should be given as angular frequencies, not in Hz. 

330 ''' 

331 

332 zeros = List.T(Complex.T()) 

333 poles = List.T(Complex.T()) 

334 constant = Complex.T(default=1.0+0j) 

335 deltat = Float.T() 

336 

337 def __init__( 

338 self, 

339 zeros=None, 

340 poles=None, 

341 constant=1.0+0j, 

342 deltat=None, 

343 **kwargs): 

344 

345 if zeros is None: 

346 zeros = [] 

347 if poles is None: 

348 poles = [] 

349 if deltat is None: 

350 raise ValueError( 

351 'Sampling interval `deltat` must be given for ' 

352 'DigitalPoleZeroResponse') 

353 

354 FrequencyResponse.__init__( 

355 self, zeros=aslist(zeros), poles=aslist(poles), constant=constant, 

356 deltat=deltat, **kwargs) 

357 

358 def check_sampling_rate(self): 

359 if self.deltat == 0.0: 

360 raise InvalidResponseError( 

361 'Invalid digital response: sampling rate undefined') 

362 

363 def get_fmax(self): 

364 self.check_sampling_rate() 

365 return 0.5 / self.deltat 

366 

367 def evaluate(self, freqs): 

368 self.check_sampling_rate() 

369 return signal.freqz_zpk( 

370 self.zeros, self.poles, self.constant, 

371 freqs*(2.*math.pi*self.deltat))[1] 

372 

373 def is_scalar(self): 

374 return len(self.zeros) == 0 and len(self.poles) == 0 

375 

376 def get_scalar(self): 

377 ''' 

378 Get factor if this is a flat response. 

379 ''' 

380 if self.is_scalar(): 

381 return self.constant 

382 else: 

383 raise IsNotScalar() 

384 

385 def to_digital(self, deltat): 

386 self.check_sampling_rate() 

387 from scipy.signal import zpk2tf 

388 

389 b, a = zpk2tf(self.zeros, self.poles, self.constant) 

390 return DigitalFilterResponse(b, a, deltat) 

391 

392 

393class ButterworthResponse(FrequencyResponse): 

394 ''' 

395 Butterworth frequency response. 

396 

397 :param corner: corner frequency of the response 

398 :param order: order of the response 

399 :param type: either ``high`` or ``low`` 

400 ''' 

401 

402 corner = Float.T(default=1.0) 

403 order = Int.T(default=4) 

404 type = StringChoice.T(choices=['low', 'high'], default='low') 

405 

406 def to_polezero(self): 

407 z, p, k = signal.butter( 

408 self.order, self.corner*2.*math.pi, 

409 btype=self.type, analog=True, output='zpk') 

410 

411 return PoleZeroResponse( 

412 zeros=aslist(z), 

413 poles=aslist(p), 

414 constant=float(k)) 

415 

416 def to_digital(self, deltat): 

417 b, a = signal.butter( 

418 self.order, self.corner*2.*deltat, 

419 self.type, analog=False) 

420 

421 return DigitalFilterResponse(b, a, deltat) 

422 

423 def to_analog(self): 

424 b, a = signal.butter( 

425 self.order, self.corner*2.*math.pi, 

426 self.type, analog=True) 

427 

428 return AnalogFilterResponse(b, a) 

429 

430 def to_digital_polezero(self, deltat): 

431 z, p, k = signal.butter( 

432 self.order, self.corner*2*deltat, 

433 btype=self.type, analog=False, output='zpk') 

434 

435 return DigitalPoleZeroResponse(z, p, k, deltat) 

436 

437 def evaluate(self, freqs): 

438 b, a = signal.butter( 

439 self.order, self.corner*2.*math.pi, 

440 self.type, analog=True) 

441 

442 return signal.freqs(b, a, freqs*2.*math.pi)[1] 

443 

444 

445class SampledResponse(FrequencyResponse): 

446 ''' 

447 Interpolates frequency response given at a set of sampled frequencies. 

448 

449 :param frequencies,values: frequencies and values of the sampled response 

450 function. 

451 :param left,right: values to return when input is out of range. If set to 

452 ``None`` (the default) the endpoints are returned. 

453 ''' 

454 

455 frequencies = Array.T(shape=(None,), dtype=float, serialize_as='list') 

456 values = Array.T(shape=(None,), dtype=complex, serialize_as='list') 

457 left = Complex.T(optional=True) 

458 right = Complex.T(optional=True) 

459 

460 def __init__(self, frequencies, values, left=None, right=None, **kwargs): 

461 FrequencyResponse.__init__( 

462 self, 

463 frequencies=asarray_1d(frequencies, float), 

464 values=asarray_1d(values, complex), 

465 **kwargs) 

466 

467 def evaluate(self, freqs): 

468 ereal = num.interp( 

469 freqs, self.frequencies, num.real(self.values), 

470 left=self.left, right=self.right) 

471 eimag = num.interp( 

472 freqs, self.frequencies, num.imag(self.values), 

473 left=self.left, right=self.right) 

474 transfer = ereal + 1.0j*eimag 

475 return transfer 

476 

477 def inverse(self): 

478 ''' 

479 Get inverse as a new :py:class:`SampledResponse` object. 

480 ''' 

481 

482 def inv_or_none(x): 

483 if x is not None: 

484 return 1./x 

485 

486 return SampledResponse( 

487 self.frequencies, 1./self.values, 

488 left=inv_or_none(self.left), 

489 right=inv_or_none(self.right)) 

490 

491 

492class IntegrationResponse(FrequencyResponse): 

493 ''' 

494 The integration response, optionally multiplied by a constant gain. 

495 

496 :param n: exponent (integer) 

497 :param gain: gain factor (float) 

498 

499 :: 

500 

501 gain 

502 T(f) = -------------- 

503 (j*2*pi * f)^n 

504 ''' 

505 

506 n = Int.T(optional=True, default=1) 

507 gain = Float.T(optional=True, default=1.0) 

508 

509 def __init__(self, n=1, gain=1.0, **kwargs): 

510 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs) 

511 

512 def evaluate(self, freqs): 

513 nonzero = freqs != 0.0 

514 resp = num.zeros(freqs.size, dtype=complex) 

515 resp[nonzero] = self.gain / (1.0j * 2. * num.pi*freqs[nonzero])**self.n 

516 return resp 

517 

518 

519class DifferentiationResponse(FrequencyResponse): 

520 ''' 

521 The differentiation response, optionally multiplied by a constant gain. 

522 

523 :param n: exponent (integer) 

524 :param gain: gain factor (float) 

525 

526 :: 

527 

528 T(f) = gain * (j*2*pi * f)^n 

529 ''' 

530 

531 n = Int.T(optional=True, default=1) 

532 gain = Float.T(optional=True, default=1.0) 

533 

534 def __init__(self, n=1, gain=1.0, **kwargs): 

535 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs) 

536 

537 def evaluate(self, freqs): 

538 return self.gain * (1.0j * 2. * num.pi * freqs)**self.n 

539 

540 

541class DigitalFilterResponse(FrequencyResponse): 

542 ''' 

543 Frequency response of an analog filter. 

544 

545 (see :py:func:`scipy.signal.freqz`). 

546 ''' 

547 

548 b = List.T(Float.T()) 

549 a = List.T(Float.T()) 

550 deltat = Float.T() 

551 drop_phase = Bool.T(default=False) 

552 

553 def __init__(self, b, a, deltat, drop_phase=False, **kwargs): 

554 FrequencyResponse.__init__( 

555 self, b=aslist(b), a=aslist(a), deltat=float(deltat), 

556 drop_phase=drop_phase, **kwargs) 

557 

558 def check_sampling_rate(self): 

559 if self.deltat == 0.0: 

560 raise InvalidResponseError( 

561 'Invalid digital response: sampling rate undefined') 

562 

563 def get_fmax(self): 

564 self.check_sampling_rate() 

565 return 0.5 / self.deltat 

566 

567 def evaluate(self, freqs): 

568 self.check_sampling_rate() 

569 

570 ok = freqs <= 0.5/self.deltat 

571 coeffs = num.zeros(freqs.size, dtype=complex) 

572 

573 coeffs[ok] = signal.freqz( 

574 self.b, self.a, freqs[ok]*2.*math.pi * self.deltat)[1] 

575 

576 coeffs[num.logical_not(ok)] = None 

577 if self.drop_phase: 

578 return num.abs(coeffs) 

579 else: 

580 return coeffs 

581 

582 def filter(self, tr): 

583 self.check_sampling_rate() 

584 

585 from pyrocko import trace 

586 trace.assert_same_sampling_rate(self, tr) 

587 tr_new = tr.copy(data=False) 

588 tr_new.set_ydata(signal.lfilter(self.b, self.a, tr.get_ydata())) 

589 return tr_new 

590 

591 

592class AnalogFilterResponse(FrequencyResponse): 

593 ''' 

594 Frequency response of an analog filter. 

595 

596 (see :py:func:`scipy.signal.freqs`). 

597 ''' 

598 

599 b = List.T(Float.T()) 

600 a = List.T(Float.T()) 

601 

602 def __init__(self, b, a, **kwargs): 

603 FrequencyResponse.__init__( 

604 self, b=aslist(b), a=aslist(a), **kwargs) 

605 

606 def evaluate(self, freqs): 

607 return signal.freqs(self.b, self.a, freqs*2.*math.pi)[1] 

608 

609 def to_digital(self, deltat, method='bilinear'): 

610 from scipy.signal import cont2discrete 

611 b, a, _ = cont2discrete((self.b, self.a), deltat, method=method) 

612 if b.ndim == 2: 

613 b = b[0] 

614 return DigitalFilterResponse(b.tolist(), a.tolist(), deltat) 

615 

616 

617class MultiplyResponse(FrequencyResponse): 

618 ''' 

619 Multiplication of several :py:class:`FrequencyResponse` objects. 

620 ''' 

621 

622 responses = List.T(FrequencyResponse.T()) 

623 

624 def __init__(self, responses=None, **kwargs): 

625 if responses is None: 

626 responses = [] 

627 FrequencyResponse.__init__(self, responses=responses, **kwargs) 

628 

629 def get_fmax(self): 

630 fmaxs = [resp.get_fmax() for resp in self.responses] 

631 fmaxs = [fmax for fmax in fmaxs if fmax is not None] 

632 if not fmaxs: 

633 return None 

634 else: 

635 return min(fmaxs) 

636 

637 def evaluate(self, freqs): 

638 a = num.ones(freqs.size, dtype=complex) 

639 for resp in self.responses: 

640 a *= resp.evaluate(freqs) 

641 

642 return a 

643 

644 def is_scalar(self): 

645 return all(resp.is_scalar() for resp in self.responses) 

646 

647 def get_scalar(self): 

648 ''' 

649 Get factor if this is a flat response. 

650 ''' 

651 if self.is_scalar(): 

652 return num.product(resp.get_scalar() for resp in self.responses) 

653 else: 

654 raise IsNotScalar() 

655 

656 def simplify(self): 

657 poles = [] 

658 zeros = [] 

659 constant = 1.0 

660 responses = [] 

661 for resp in self.responses: 

662 if isinstance(resp, PoleZeroResponse): 

663 poles.extend(resp.poles) 

664 zeros.extend(resp.zeros) 

665 constant *= resp.constant 

666 else: 

667 responses.append(resp) 

668 

669 if poles or zeros or constant != 1.0: 

670 responses[0:0] = [ 

671 PoleZeroResponse(poles=poles, zeros=zeros, constant=constant)] 

672 

673 self.responses = responses 

674 

675 

676class DelayResponse(FrequencyResponse): 

677 

678 delay = Float.T() 

679 

680 def evaluate(self, freqs): 

681 return num.exp(-2.0J * self.delay * num.pi * freqs) 

682 

683 

684class InvalidResponseError(Exception): 

685 pass 

686 

687 

688class InvalidResponse(FrequencyResponse): 

689 

690 message = String.T() 

691 

692 def __init__(self, message): 

693 FrequencyResponse.__init__(self, message=message) 

694 self.have_warned = False 

695 

696 def evaluate(self, freqs): 

697 if not self.have_warned: 

698 logger.warning('Invalid response: %s' % self.message) 

699 self.have_warned = True 

700 

701 return util.num_full_like(freqs, None, dtype=num.complex) 

702 

703 

704def simplify_responses(responses): 

705 

706 def unpack_multi(responses): 

707 for resp in responses: 

708 if isinstance(resp, MultiplyResponse): 

709 for sub in unpack_multi(resp.responses): 

710 yield sub 

711 else: 

712 yield resp 

713 

714 def cancel_pzs(poles, zeros): 

715 poles_new = [] 

716 zeros_new = list(zeros) 

717 for p in poles: 

718 try: 

719 zeros_new.pop(zeros_new.index(p)) 

720 except ValueError: 

721 poles_new.append(p) 

722 

723 return poles_new, zeros_new 

724 

725 def combine_pzs(responses): 

726 poles = [] 

727 zeros = [] 

728 constant = 1.0 

729 out = [] 

730 for resp in responses: 

731 if isinstance(resp, PoleZeroResponse): 

732 poles.extend(resp.poles) 

733 zeros.extend(resp.zeros) 

734 constant *= resp.constant 

735 else: 

736 out.append(resp) 

737 

738 poles, zeros = cancel_pzs(poles, zeros) 

739 if poles or zeros: 

740 out.insert(0, PoleZeroResponse( 

741 poles=poles, zeros=zeros, constant=constant)) 

742 elif constant != 1.0: 

743 out.insert(0, Gain(constant=constant)) 

744 

745 return out 

746 

747 def split(xs, condition): 

748 out = [], [] 

749 for x in xs: 

750 out[condition(x)].append(x) 

751 

752 return out 

753 

754 def combine_gains(responses): 

755 non_scalars, scalars = split(responses, lambda resp: resp.is_scalar()) 

756 if scalars: 

757 factor = num.prod([resp.get_scalar() for resp in scalars]) 

758 yield Gain(constant=factor) 

759 

760 for resp in non_scalars: 

761 yield resp 

762 

763 return list(combine_gains(combine_pzs(unpack_multi(responses))))