1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6'''This module provides convenience objects to handle frequency responses.''' 

7 

8import math 

9import logging 

10 

11import numpy as num 

12from scipy import signal 

13 

14from pyrocko import util, evalresp 

15from pyrocko.guts import Object, Float, Int, String, Complex, Tuple, List, \ 

16 StringChoice, Bool 

17from pyrocko.guts_array import Array 

18 

19 

20guts_prefix = 'pf' 

21 

22logger = logging.getLogger('pyrocko.response') 

23 

24 

25def asarray_1d(x, dtype): 

26 if isinstance(x, (list, tuple)) and x and isinstance(x[0], str): 

27 return num.asarray(list(map(dtype, x)), dtype=dtype) 

28 else: 

29 a = num.asarray(x, dtype=dtype) 

30 if not a.ndim == 1: 

31 raise ValueError('could not convert to 1D array') 

32 return a 

33 

34 

35def finalize_construction(breakpoints): 

36 breakpoints.sort() 

37 breakpoints_out = [] 

38 f_last = None 

39 for f, c in breakpoints: 

40 if f_last is not None and f == f_last: 

41 breakpoints_out[-1][1] += c 

42 else: 

43 breakpoints_out.append([f, c]) 

44 

45 f_last = f 

46 

47 breakpoints_out = [(f, c) for (f, c) in breakpoints_out if c != 0] 

48 return breakpoints_out 

49 

50 

51class FrequencyResponseCheckpoint(Object): 

52 frequency = Float.T() 

53 value = Float.T() 

54 

55 

56class IsNotScalar(Exception): 

57 pass 

58 

59 

60class FrequencyResponse(Object): 

61 ''' 

62 Evaluates frequency response at given frequencies. 

63 ''' 

64 

65 checkpoints = List.T(FrequencyResponseCheckpoint.T()) 

66 

67 def evaluate(self, freqs): 

68 return num.ones(freqs.size, dtype=complex) 

69 

70 def evaluate1(self, freq): 

71 return self.evaluate(num.atleast_1d(freq))[0] 

72 

73 def is_scalar(self): 

74 ''' 

75 Check if this is a flat response. 

76 ''' 

77 

78 if type(self) is FrequencyResponse: 

79 return True 

80 else: 

81 return False # default for derived classes 

82 

83 def get_scalar(self): 

84 ''' 

85 Get factor if this is a flat response. 

86 ''' 

87 if type(self) is FrequencyResponse: 

88 return 1.0 

89 else: 

90 raise IsNotScalar() # default for derived classes 

91 

92 def get_fmax(self): 

93 return None 

94 

95 def construction(self): 

96 return [] 

97 

98 

99class Gain(FrequencyResponse): 

100 ''' 

101 A flat frequency response. 

102 ''' 

103 

104 constant = Complex.T(default=1.0+0j) 

105 

106 def evaluate(self, freqs): 

107 return util.num_full_like(freqs, self.constant, dtype=complex) 

108 

109 def is_scalar(self): 

110 return True 

111 

112 def get_scalar(self): 

113 return self.constant 

114 

115 

116class Evalresp(FrequencyResponse): 

117 ''' 

118 Calls evalresp and generates values of the instrument response transfer 

119 function. 

120 

121 :param respfile: response file in evalresp format 

122 :param trace: trace for which the response is to be extracted from the file 

123 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity 

124 ''' 

125 

126 respfile = String.T() 

127 nslc_id = Tuple.T(4, String.T()) 

128 target = String.T(default='dis') 

129 instant = Float.T() 

130 stages = Tuple.T(2, Int.T(), optional=True) 

131 

132 def __init__( 

133 self, 

134 respfile, 

135 trace=None, 

136 target='dis', 

137 nslc_id=None, 

138 time=None, 

139 stages=None, 

140 **kwargs): 

141 

142 if trace is not None: 

143 nslc_id = trace.nslc_id 

144 time = (trace.tmin + trace.tmax) / 2. 

145 

146 FrequencyResponse.__init__( 

147 self, 

148 respfile=respfile, 

149 nslc_id=nslc_id, 

150 instant=time, 

151 target=target, 

152 stages=stages, 

153 **kwargs) 

154 

155 def evaluate(self, freqs): 

156 network, station, location, channel = self.nslc_id 

157 if self.stages is None: 

158 stages = (-1, 0) 

159 else: 

160 stages = self.stages[0]+1, self.stages[1] 

161 

162 x = evalresp.evalresp( 

163 sta_list=station, 

164 cha_list=channel, 

165 net_code=network, 

166 locid=location, 

167 instant=self.instant, 

168 freqs=freqs, 

169 units=self.target.upper(), 

170 file=self.respfile, 

171 start_stage=stages[0], 

172 stop_stage=stages[1], 

173 rtype='CS') 

174 

175 transfer = x[0][4] 

176 return transfer 

177 

178 

179class InverseEvalresp(FrequencyResponse): 

180 ''' 

181 Calls evalresp and generates values of the inverse instrument response for 

182 deconvolution of instrument response. 

183 

184 :param respfile: response file in evalresp format 

185 :param trace: trace for which the response is to be extracted from the file 

186 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity 

187 ''' 

188 

189 respfile = String.T() 

190 nslc_id = Tuple.T(4, String.T()) 

191 target = String.T(default='dis') 

192 instant = Float.T() 

193 

194 def __init__(self, respfile, trace, target='dis', **kwargs): 

195 FrequencyResponse.__init__( 

196 self, 

197 respfile=respfile, 

198 nslc_id=trace.nslc_id, 

199 instant=(trace.tmin + trace.tmax)/2., 

200 target=target, 

201 **kwargs) 

202 

203 def evaluate(self, freqs): 

204 network, station, location, channel = self.nslc_id 

205 x = evalresp.evalresp(sta_list=station, 

206 cha_list=channel, 

207 net_code=network, 

208 locid=location, 

209 instant=self.instant, 

210 freqs=freqs, 

211 units=self.target.upper(), 

212 file=self.respfile, 

213 rtype='CS') 

214 

215 transfer = x[0][4] 

216 return 1./transfer 

217 

218 

219def aslist(x): 

220 if x is None: 

221 return [] 

222 

223 try: 

224 return list(x) 

225 except TypeError: 

226 return [x] 

227 

228 

229class PoleZeroResponse(FrequencyResponse): 

230 ''' 

231 Evaluates frequency response from pole-zero representation. 

232 

233 :param zeros: positions of zeros 

234 :type zeros: list of complex 

235 :param poles: positions of poles 

236 :type poles: list of complex 

237 :param constant: gain factor 

238 :type constant: complex 

239 

240 :: 

241 

242 (j*2*pi*f - zeros[0]) * (j*2*pi*f - zeros[1]) * ... 

243 T(f) = constant * ---------------------------------------------------- 

244 (j*2*pi*f - poles[0]) * (j*2*pi*f - poles[1]) * ... 

245 

246 

247 The poles and zeros should be given as angular frequencies, not in Hz. 

248 ''' 

249 

250 zeros = List.T(Complex.T()) 

251 poles = List.T(Complex.T()) 

252 constant = Complex.T(default=1.0+0j) 

253 

254 def __init__( 

255 self, 

256 zeros=None, 

257 poles=None, 

258 constant=1.0+0j, 

259 **kwargs): 

260 

261 if zeros is None: 

262 zeros = [] 

263 if poles is None: 

264 poles = [] 

265 

266 FrequencyResponse.__init__( 

267 self, 

268 zeros=aslist(zeros), 

269 poles=aslist(poles), 

270 constant=constant, 

271 **kwargs) 

272 

273 def evaluate(self, freqs): 

274 if hasattr(signal, 'freqs_zpk'): # added in scipy 0.19.0 

275 return signal.freqs_zpk( 

276 self.zeros, self.poles, self.constant, freqs*2.*num.pi)[1] 

277 else: 

278 jomeg = 1.0j * 2.*num.pi*freqs 

279 

280 a = num.ones(freqs.size, dtype=complex)*self.constant 

281 for z in self.zeros: 

282 a *= jomeg-z 

283 for p in self.poles: 

284 a /= jomeg-p 

285 

286 return a 

287 

288 def is_scalar(self): 

289 return len(self.zeros) == 0 and len(self.poles) == 0 

290 

291 def get_scalar(self): 

292 ''' 

293 Get factor if this is a flat response. 

294 ''' 

295 if self.is_scalar(): 

296 return self.constant 

297 else: 

298 raise IsNotScalar() 

299 

300 def inverse(self): 

301 return PoleZeroResponse( 

302 poles=list(self.zeros), 

303 zeros=list(self.poles), 

304 constant=1.0/self.constant) 

305 

306 def to_analog(self): 

307 b, a = signal.zpk2tf(self.zeros, self.poles, self.constant) 

308 return AnalogFilterResponse(aslist(b), aslist(a)) 

309 

310 def to_digital(self, deltat, method='bilinear'): 

311 from scipy.signal import cont2discrete, zpk2tf 

312 

313 z, p, k, _ = cont2discrete( 

314 (self.zeros, self.poles, self.constant), 

315 deltat, method=method) 

316 

317 b, a = zpk2tf(z, p, k) 

318 

319 return DigitalFilterResponse(b, a, deltat) 

320 

321 def to_digital_polezero(self, deltat, method='bilinear'): 

322 from scipy.signal import cont2discrete 

323 

324 z, p, k, _ = cont2discrete( 

325 (self.zeros, self.poles, self.constant), 

326 deltat, method=method) 

327 

328 return DigitalPoleZeroResponse(z, p, k, deltat) 

329 

330 def construction(self): 

331 breakpoints = [] 

332 for zero in self.zeros: 

333 f = abs(zero) / (2.*math.pi) 

334 breakpoints.append((f, 1)) 

335 

336 for pole in self.poles: 

337 f = abs(pole) / (2.*math.pi) 

338 breakpoints.append((f, -1)) 

339 

340 return finalize_construction(breakpoints) 

341 

342 

343class DigitalPoleZeroResponse(FrequencyResponse): 

344 ''' 

345 Evaluates frequency response from digital filter pole-zero representation. 

346 

347 :param zeros: positions of zeros 

348 :type zeros: list of complex 

349 :param poles: positions of poles 

350 :type poles: list of complex 

351 :param constant: gain factor 

352 :type constant: complex 

353 :param deltat: sampling interval 

354 :type deltat: float 

355 

356 The poles and zeros should be given as angular frequencies, not in Hz. 

357 ''' 

358 

359 zeros = List.T(Complex.T()) 

360 poles = List.T(Complex.T()) 

361 constant = Complex.T(default=1.0+0j) 

362 deltat = Float.T() 

363 

364 def __init__( 

365 self, 

366 zeros=None, 

367 poles=None, 

368 constant=1.0+0j, 

369 deltat=None, 

370 **kwargs): 

371 

372 if zeros is None: 

373 zeros = [] 

374 if poles is None: 

375 poles = [] 

376 if deltat is None: 

377 raise ValueError( 

378 'Sampling interval `deltat` must be given for ' 

379 'DigitalPoleZeroResponse') 

380 

381 FrequencyResponse.__init__( 

382 self, zeros=aslist(zeros), poles=aslist(poles), constant=constant, 

383 deltat=deltat, **kwargs) 

384 

385 def check_sampling_rate(self): 

386 if self.deltat == 0.0: 

387 raise InvalidResponseError( 

388 'Invalid digital response: sampling rate undefined') 

389 

390 def get_fmax(self): 

391 self.check_sampling_rate() 

392 return 0.5 / self.deltat 

393 

394 def evaluate(self, freqs): 

395 self.check_sampling_rate() 

396 return signal.freqz_zpk( 

397 self.zeros, self.poles, self.constant, 

398 freqs*(2.*math.pi*self.deltat))[1] 

399 

400 def is_scalar(self): 

401 return len(self.zeros) == 0 and len(self.poles) == 0 

402 

403 def get_scalar(self): 

404 ''' 

405 Get factor if this is a flat response. 

406 ''' 

407 if self.is_scalar(): 

408 return self.constant 

409 else: 

410 raise IsNotScalar() 

411 

412 def to_digital(self, deltat): 

413 self.check_sampling_rate() 

414 from scipy.signal import zpk2tf 

415 

416 b, a = zpk2tf(self.zeros, self.poles, self.constant) 

417 return DigitalFilterResponse(b, a, deltat) 

418 

419 

420class ButterworthResponse(FrequencyResponse): 

421 ''' 

422 Butterworth frequency response. 

423 

424 :param corner: corner frequency of the response 

425 :param order: order of the response 

426 :param type: either ``high`` or ``low`` 

427 ''' 

428 

429 corner = Float.T(default=1.0) 

430 order = Int.T(default=4) 

431 type = StringChoice.T(choices=['low', 'high'], default='low') 

432 

433 def to_polezero(self): 

434 z, p, k = signal.butter( 

435 self.order, self.corner*2.*math.pi, 

436 btype=self.type, analog=True, output='zpk') 

437 

438 return PoleZeroResponse( 

439 zeros=aslist(z), 

440 poles=aslist(p), 

441 constant=float(k)) 

442 

443 def to_digital(self, deltat): 

444 b, a = signal.butter( 

445 self.order, self.corner*2.*deltat, 

446 self.type, analog=False) 

447 

448 return DigitalFilterResponse(b, a, deltat) 

449 

450 def to_analog(self): 

451 b, a = signal.butter( 

452 self.order, self.corner*2.*math.pi, 

453 self.type, analog=True) 

454 

455 return AnalogFilterResponse(b, a) 

456 

457 def to_digital_polezero(self, deltat): 

458 z, p, k = signal.butter( 

459 self.order, self.corner*2*deltat, 

460 btype=self.type, analog=False, output='zpk') 

461 

462 return DigitalPoleZeroResponse(z, p, k, deltat) 

463 

464 def evaluate(self, freqs): 

465 b, a = signal.butter( 

466 self.order, self.corner*2.*math.pi, 

467 self.type, analog=True) 

468 

469 return signal.freqs(b, a, freqs*2.*math.pi)[1] 

470 

471 

472class SampledResponse(FrequencyResponse): 

473 ''' 

474 Interpolates frequency response given at a set of sampled frequencies. 

475 

476 :param frequencies,values: frequencies and values of the sampled response 

477 function. 

478 :param left,right: values to return when input is out of range. If set to 

479 ``None`` (the default) the endpoints are returned. 

480 ''' 

481 

482 frequencies = Array.T(shape=(None,), dtype=float, serialize_as='list') 

483 values = Array.T(shape=(None,), dtype=complex, serialize_as='list') 

484 left = Complex.T(optional=True) 

485 right = Complex.T(optional=True) 

486 

487 def __init__(self, frequencies, values, left=None, right=None, **kwargs): 

488 FrequencyResponse.__init__( 

489 self, 

490 frequencies=asarray_1d(frequencies, float), 

491 values=asarray_1d(values, complex), 

492 **kwargs) 

493 

494 def evaluate(self, freqs): 

495 ereal = num.interp( 

496 freqs, self.frequencies, num.real(self.values), 

497 left=self.left, right=self.right) 

498 eimag = num.interp( 

499 freqs, self.frequencies, num.imag(self.values), 

500 left=self.left, right=self.right) 

501 transfer = ereal + 1.0j*eimag 

502 return transfer 

503 

504 def inverse(self): 

505 ''' 

506 Get inverse as a new :py:class:`SampledResponse` object. 

507 ''' 

508 

509 def inv_or_none(x): 

510 if x is not None: 

511 return 1./x 

512 

513 return SampledResponse( 

514 self.frequencies, 1./self.values, 

515 left=inv_or_none(self.left), 

516 right=inv_or_none(self.right)) 

517 

518 

519class IntegrationResponse(FrequencyResponse): 

520 ''' 

521 The integration response, optionally multiplied by a constant gain. 

522 

523 :param n: exponent (integer) 

524 :param gain: gain factor (float) 

525 

526 :: 

527 

528 gain 

529 T(f) = -------------- 

530 (j*2*pi * f)^n 

531 ''' 

532 

533 n = Int.T(optional=True, default=1) 

534 gain = Float.T(optional=True, default=1.0) 

535 

536 def __init__(self, n=1, gain=1.0, **kwargs): 

537 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs) 

538 

539 def evaluate(self, freqs): 

540 nonzero = freqs != 0.0 

541 resp = num.zeros(freqs.size, dtype=complex) 

542 resp[nonzero] = self.gain / (1.0j * 2. * num.pi*freqs[nonzero])**self.n 

543 return resp 

544 

545 

546class DifferentiationResponse(FrequencyResponse): 

547 ''' 

548 The differentiation response, optionally multiplied by a constant gain. 

549 

550 :param n: exponent (integer) 

551 :param gain: gain factor (float) 

552 

553 :: 

554 

555 T(f) = gain * (j*2*pi * f)^n 

556 ''' 

557 

558 n = Int.T(optional=True, default=1) 

559 gain = Float.T(optional=True, default=1.0) 

560 

561 def __init__(self, n=1, gain=1.0, **kwargs): 

562 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs) 

563 

564 def evaluate(self, freqs): 

565 return self.gain * (1.0j * 2. * num.pi * freqs)**self.n 

566 

567 

568class DigitalFilterResponse(FrequencyResponse): 

569 ''' 

570 Frequency response of an analog filter. 

571 

572 (see :py:func:`scipy.signal.freqz`). 

573 ''' 

574 

575 b = List.T(Float.T()) 

576 a = List.T(Float.T()) 

577 deltat = Float.T() 

578 drop_phase = Bool.T(default=False) 

579 

580 def __init__(self, b, a, deltat, drop_phase=False, **kwargs): 

581 FrequencyResponse.__init__( 

582 self, b=aslist(b), a=aslist(a), deltat=float(deltat), 

583 drop_phase=drop_phase, **kwargs) 

584 

585 def check_sampling_rate(self): 

586 if self.deltat == 0.0: 

587 raise InvalidResponseError( 

588 'Invalid digital response: sampling rate undefined') 

589 

590 def is_scalar(self): 

591 return len(self.a) == 1 and len(self.b) == 1 

592 

593 def get_scalar(self): 

594 if self.is_scalar(): 

595 return self.b[0] / self.a[0] 

596 else: 

597 raise IsNotScalar() 

598 

599 def get_fmax(self): 

600 if not self.is_scalar(): 

601 self.check_sampling_rate() 

602 return 0.5 / self.deltat 

603 else: 

604 return None 

605 

606 def evaluate(self, freqs): 

607 if self.is_scalar(): 

608 return util.num_full_like(freqs, self.get_scalar(), dtype=complex) 

609 

610 self.check_sampling_rate() 

611 

612 ok = freqs <= 0.5/self.deltat 

613 coeffs = num.zeros(freqs.size, dtype=complex) 

614 

615 coeffs[ok] = signal.freqz( 

616 self.b, self.a, freqs[ok]*2.*math.pi * self.deltat)[1] 

617 

618 coeffs[num.logical_not(ok)] = None 

619 if self.drop_phase: 

620 return num.abs(coeffs) 

621 else: 

622 return coeffs 

623 

624 def filter(self, tr): 

625 self.check_sampling_rate() 

626 

627 from pyrocko import trace 

628 trace.assert_same_sampling_rate(self, tr) 

629 tr_new = tr.copy(data=False) 

630 tr_new.set_ydata(signal.lfilter(self.b, self.a, tr.get_ydata())) 

631 return tr_new 

632 

633 

634class AnalogFilterResponse(FrequencyResponse): 

635 ''' 

636 Frequency response of an analog filter. 

637 

638 (see :py:func:`scipy.signal.freqs`). 

639 ''' 

640 

641 b = List.T(Float.T()) 

642 a = List.T(Float.T()) 

643 

644 def __init__(self, b, a, **kwargs): 

645 FrequencyResponse.__init__( 

646 self, b=aslist(b), a=aslist(a), **kwargs) 

647 

648 def evaluate(self, freqs): 

649 return signal.freqs(self.b, self.a, freqs*2.*math.pi)[1] 

650 

651 def to_digital(self, deltat, method='bilinear'): 

652 from scipy.signal import cont2discrete 

653 b, a, _ = cont2discrete((self.b, self.a), deltat, method=method) 

654 if b.ndim == 2: 

655 b = b[0] 

656 return DigitalFilterResponse(b.tolist(), a.tolist(), deltat) 

657 

658 

659class MultiplyResponse(FrequencyResponse): 

660 ''' 

661 Multiplication of several :py:class:`FrequencyResponse` objects. 

662 ''' 

663 

664 responses = List.T(FrequencyResponse.T()) 

665 

666 def __init__(self, responses=None, **kwargs): 

667 if responses is None: 

668 responses = [] 

669 FrequencyResponse.__init__(self, responses=responses, **kwargs) 

670 

671 def get_fmax(self): 

672 fmaxs = [resp.get_fmax() for resp in self.responses] 

673 fmaxs = [fmax for fmax in fmaxs if fmax is not None] 

674 if not fmaxs: 

675 return None 

676 else: 

677 return min(fmaxs) 

678 

679 def evaluate(self, freqs): 

680 a = num.ones(freqs.size, dtype=complex) 

681 for resp in self.responses: 

682 a *= resp.evaluate(freqs) 

683 

684 return a 

685 

686 def is_scalar(self): 

687 return all(resp.is_scalar() for resp in self.responses) 

688 

689 def get_scalar(self): 

690 ''' 

691 Get factor if this is a flat response. 

692 ''' 

693 if self.is_scalar(): 

694 return num.product(resp.get_scalar() for resp in self.responses) 

695 else: 

696 raise IsNotScalar() 

697 

698 def simplify(self): 

699 self.responses = simplify_responses(self.responses) 

700 

701 def construction(self): 

702 breakpoints = [] 

703 for resp in self.responses: 

704 breakpoints.extend(resp.construction()) 

705 

706 return finalize_construction(breakpoints) 

707 

708 

709class DelayResponse(FrequencyResponse): 

710 

711 delay = Float.T() 

712 

713 def evaluate(self, freqs): 

714 return num.exp(-2.0J * self.delay * num.pi * freqs) 

715 

716 

717class InvalidResponseError(Exception): 

718 pass 

719 

720 

721class InvalidResponse(FrequencyResponse): 

722 

723 message = String.T() 

724 

725 def __init__(self, message): 

726 FrequencyResponse.__init__(self, message=message) 

727 self.have_warned = False 

728 

729 def evaluate(self, freqs): 

730 if not self.have_warned: 

731 logger.warning('Invalid response: %s' % self.message) 

732 self.have_warned = True 

733 

734 return util.num_full_like(freqs, None, dtype=num.complex) 

735 

736 

737def simplify_responses(responses): 

738 

739 def unpack_multi(responses): 

740 for resp in responses: 

741 if isinstance(resp, MultiplyResponse): 

742 for sub in unpack_multi(resp.responses): 

743 yield sub 

744 else: 

745 yield resp 

746 

747 def cancel_pzs(poles, zeros): 

748 poles_new = [] 

749 zeros_new = list(zeros) 

750 for p in poles: 

751 try: 

752 zeros_new.pop(zeros_new.index(p)) 

753 except ValueError: 

754 poles_new.append(p) 

755 

756 return poles_new, zeros_new 

757 

758 def combine_pzs(responses): 

759 poles = [] 

760 zeros = [] 

761 constant = 1.0 

762 out = [] 

763 for resp in responses: 

764 if isinstance(resp, PoleZeroResponse): 

765 poles.extend(resp.poles) 

766 zeros.extend(resp.zeros) 

767 constant *= resp.constant 

768 else: 

769 out.append(resp) 

770 

771 poles, zeros = cancel_pzs(poles, zeros) 

772 if poles or zeros: 

773 out.insert(0, PoleZeroResponse( 

774 poles=poles, zeros=zeros, constant=constant)) 

775 elif constant != 1.0: 

776 out.insert(0, Gain(constant=constant)) 

777 

778 return out 

779 

780 def split(xs, condition): 

781 out = [], [] 

782 for x in xs: 

783 out[condition(x)].append(x) 

784 

785 return out 

786 

787 def combine_gains(responses): 

788 non_scalars, scalars = split(responses, lambda resp: resp.is_scalar()) 

789 if scalars: 

790 factor = num.prod([resp.get_scalar() for resp in scalars]) 

791 yield Gain(constant=factor) 

792 

793 for resp in non_scalars: 

794 yield resp 

795 

796 return list(combine_gains(combine_pzs(unpack_multi(responses))))