1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6'''This module provides convenience objects to handle frequency responses.''' 

7 

8from __future__ import print_function, division, absolute_import 

9 

10import math 

11import logging 

12 

13import numpy as num 

14from scipy import signal 

15 

16from pyrocko import util, evalresp 

17from pyrocko.guts import Object, Float, Int, String, Complex, Tuple, List, \ 

18 StringChoice, Bool 

19from pyrocko.guts_array import Array 

20 

21try: 

22 newstr = unicode 

23except NameError: 

24 newstr = str 

25 

26 

27guts_prefix = 'pf' 

28 

29logger = logging.getLogger('pyrocko.response') 

30 

31 

32def asarray_1d(x, dtype): 

33 if isinstance(x, (list, tuple)) and x and isinstance(x[0], (str, newstr)): 

34 return num.asarray(list(map(dtype, x)), dtype=dtype) 

35 else: 

36 a = num.asarray(x, dtype=dtype) 

37 if not a.ndim == 1: 

38 raise ValueError('could not convert to 1D array') 

39 return a 

40 

41 

42class FrequencyResponseCheckpoint(Object): 

43 frequency = Float.T() 

44 value = Float.T() 

45 

46 

47class IsNotScalar(Exception): 

48 pass 

49 

50 

51class FrequencyResponse(Object): 

52 ''' 

53 Evaluates frequency response at given frequencies. 

54 ''' 

55 

56 checkpoints = List.T(FrequencyResponseCheckpoint.T()) 

57 

58 def evaluate(self, freqs): 

59 return num.ones(freqs.size, dtype=complex) 

60 

61 def is_scalar(self): 

62 ''' 

63 Check if this is a flat response. 

64 ''' 

65 

66 if type(self) is FrequencyResponse: 

67 return True 

68 else: 

69 return False # default for derived classes 

70 

71 def get_scalar(self): 

72 ''' 

73 Get factor if this is a flat response. 

74 ''' 

75 if type(self) is FrequencyResponse: 

76 return 1.0 

77 else: 

78 raise IsNotScalar() # default for derived classes 

79 

80 def get_fmax(self): 

81 return None 

82 

83 

84class Gain(FrequencyResponse): 

85 ''' 

86 A flat frequency response. 

87 ''' 

88 

89 constant = Complex.T(default=1.0+0j) 

90 

91 def evaluate(self, freqs): 

92 return util.num_full_like(freqs, self.constant, dtype=complex) 

93 

94 def is_scalar(self): 

95 return True 

96 

97 def get_scalar(self): 

98 return self.constant 

99 

100 

101class Evalresp(FrequencyResponse): 

102 ''' 

103 Calls evalresp and generates values of the instrument response transfer 

104 function. 

105 

106 :param respfile: response file in evalresp format 

107 :param trace: trace for which the response is to be extracted from the file 

108 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity 

109 ''' 

110 

111 respfile = String.T() 

112 nslc_id = Tuple.T(4, String.T()) 

113 target = String.T(default='dis') 

114 instant = Float.T() 

115 stages = Tuple.T(2, Int.T(), optional=True) 

116 

117 def __init__( 

118 self, 

119 respfile, 

120 trace=None, 

121 target='dis', 

122 nslc_id=None, 

123 time=None, 

124 stages=None, 

125 **kwargs): 

126 

127 if trace is not None: 

128 nslc_id = trace.nslc_id 

129 time = (trace.tmin + trace.tmax) / 2. 

130 

131 FrequencyResponse.__init__( 

132 self, 

133 respfile=respfile, 

134 nslc_id=nslc_id, 

135 instant=time, 

136 target=target, 

137 stages=stages, 

138 **kwargs) 

139 

140 def evaluate(self, freqs): 

141 network, station, location, channel = self.nslc_id 

142 if self.stages is None: 

143 stages = (-1, 0) 

144 else: 

145 stages = self.stages[0]+1, self.stages[1] 

146 

147 x = evalresp.evalresp( 

148 sta_list=station, 

149 cha_list=channel, 

150 net_code=network, 

151 locid=location, 

152 instant=self.instant, 

153 freqs=freqs, 

154 units=self.target.upper(), 

155 file=self.respfile, 

156 start_stage=stages[0], 

157 stop_stage=stages[1], 

158 rtype='CS') 

159 

160 transfer = x[0][4] 

161 return transfer 

162 

163 

164class InverseEvalresp(FrequencyResponse): 

165 ''' 

166 Calls evalresp and generates values of the inverse instrument response for 

167 deconvolution of instrument response. 

168 

169 :param respfile: response file in evalresp format 

170 :param trace: trace for which the response is to be extracted from the file 

171 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity 

172 ''' 

173 

174 respfile = String.T() 

175 nslc_id = Tuple.T(4, String.T()) 

176 target = String.T(default='dis') 

177 instant = Float.T() 

178 

179 def __init__(self, respfile, trace, target='dis', **kwargs): 

180 FrequencyResponse.__init__( 

181 self, 

182 respfile=respfile, 

183 nslc_id=trace.nslc_id, 

184 instant=(trace.tmin + trace.tmax)/2., 

185 target=target, 

186 **kwargs) 

187 

188 def evaluate(self, freqs): 

189 network, station, location, channel = self.nslc_id 

190 x = evalresp.evalresp(sta_list=station, 

191 cha_list=channel, 

192 net_code=network, 

193 locid=location, 

194 instant=self.instant, 

195 freqs=freqs, 

196 units=self.target.upper(), 

197 file=self.respfile, 

198 rtype='CS') 

199 

200 transfer = x[0][4] 

201 return 1./transfer 

202 

203 

204def aslist(x): 

205 if x is None: 

206 return [] 

207 

208 try: 

209 return list(x) 

210 except TypeError: 

211 return [x] 

212 

213 

214class PoleZeroResponse(FrequencyResponse): 

215 ''' 

216 Evaluates frequency response from pole-zero representation. 

217 

218 :param zeros: positions of zeros 

219 :type zeros: list of complex 

220 :param poles: positions of poles 

221 :type poles: list of complex 

222 :param constant: gain factor 

223 :type constant: complex 

224 

225 :: 

226 

227 (j*2*pi*f - zeros[0]) * (j*2*pi*f - zeros[1]) * ... 

228 T(f) = constant * ---------------------------------------------------- 

229 (j*2*pi*f - poles[0]) * (j*2*pi*f - poles[1]) * ... 

230 

231 

232 The poles and zeros should be given as angular frequencies, not in Hz. 

233 ''' 

234 

235 zeros = List.T(Complex.T()) 

236 poles = List.T(Complex.T()) 

237 constant = Complex.T(default=1.0+0j) 

238 

239 def __init__( 

240 self, 

241 zeros=None, 

242 poles=None, 

243 constant=1.0+0j, 

244 **kwargs): 

245 

246 if zeros is None: 

247 zeros = [] 

248 if poles is None: 

249 poles = [] 

250 

251 FrequencyResponse.__init__( 

252 self, 

253 zeros=aslist(zeros), 

254 poles=aslist(poles), 

255 constant=constant, 

256 **kwargs) 

257 

258 def evaluate(self, freqs): 

259 return signal.freqs_zpk( 

260 self.zeros, self.poles, self.constant, freqs*2.*num.pi)[1] 

261 

262 def is_scalar(self): 

263 return len(self.zeros) == 0 and len(self.poles) == 0 

264 

265 def get_scalar(self): 

266 ''' 

267 Get factor if this is a flat response. 

268 ''' 

269 if self.is_scalar(): 

270 return self.constant 

271 else: 

272 raise IsNotScalar() 

273 

274 def inverse(self): 

275 return PoleZeroResponse( 

276 poles=list(self.zeros), 

277 zeros=list(self.poles), 

278 constant=1.0/self.constant) 

279 

280 def to_analog(self): 

281 b, a = signal.zpk2tf(self.zeros, self.poles, self.constant) 

282 return AnalogFilterResponse(aslist(b), aslist(a)) 

283 

284 def to_digital(self, deltat, method='bilinear'): 

285 from scipy.signal import cont2discrete, zpk2tf 

286 

287 z, p, k, _ = cont2discrete( 

288 (self.zeros, self.poles, self.constant), 

289 deltat, method=method) 

290 

291 b, a = zpk2tf(z, p, k) 

292 

293 return DigitalFilterResponse(b, a, deltat) 

294 

295 def to_digital_polezero(self, deltat, method='bilinear'): 

296 from scipy.signal import cont2discrete 

297 

298 z, p, k, _ = cont2discrete( 

299 (self.zeros, self.poles, self.constant), 

300 deltat, method=method) 

301 

302 return DigitalPoleZeroResponse(z, p, k, deltat) 

303 

304 

305class DigitalPoleZeroResponse(FrequencyResponse): 

306 ''' 

307 Evaluates frequency response from digital filter pole-zero representation. 

308 

309 :param zeros: positions of zeros 

310 :type zeros: list of complex 

311 :param poles: positions of poles 

312 :type poles: list of complex 

313 :param constant: gain factor 

314 :type constant: complex 

315 :param deltat: sampling interval 

316 :type deltat: float 

317 

318 The poles and zeros should be given as angular frequencies, not in Hz. 

319 ''' 

320 

321 zeros = List.T(Complex.T()) 

322 poles = List.T(Complex.T()) 

323 constant = Complex.T(default=1.0+0j) 

324 deltat = Float.T() 

325 

326 def __init__( 

327 self, 

328 zeros=None, 

329 poles=None, 

330 constant=1.0+0j, 

331 deltat=None, 

332 **kwargs): 

333 

334 if zeros is None: 

335 zeros = [] 

336 if poles is None: 

337 poles = [] 

338 if deltat is None: 

339 raise ValueError( 

340 'Sampling interval `deltat` must be given for ' 

341 'DigitalPoleZeroResponse') 

342 

343 FrequencyResponse.__init__( 

344 self, zeros=aslist(zeros), poles=aslist(poles), constant=constant, 

345 deltat=deltat, **kwargs) 

346 

347 def check_sampling_rate(self): 

348 if self.deltat == 0.0: 

349 raise InvalidResponseError( 

350 'Invalid digital response: sampling rate undefined') 

351 

352 def get_fmax(self): 

353 self.check_sampling_rate() 

354 return 0.5 / self.deltat 

355 

356 def evaluate(self, freqs): 

357 self.check_sampling_rate() 

358 return signal.freqz_zpk( 

359 self.zeros, self.poles, self.constant, 

360 freqs*(2.*math.pi*self.deltat))[1] 

361 

362 def is_scalar(self): 

363 return len(self.zeros) == 0 and len(self.poles) == 0 

364 

365 def get_scalar(self): 

366 ''' 

367 Get factor if this is a flat response. 

368 ''' 

369 if self.is_scalar(): 

370 return self.constant 

371 else: 

372 raise IsNotScalar() 

373 

374 def to_digital(self, deltat): 

375 self.check_sampling_rate() 

376 from scipy.signal import zpk2tf 

377 

378 b, a = zpk2tf(self.zeros, self.poles, self.constant) 

379 return DigitalFilterResponse(b, a, deltat) 

380 

381 

382class ButterworthResponse(FrequencyResponse): 

383 ''' 

384 Butterworth frequency response. 

385 

386 :param corner: corner frequency of the response 

387 :param order: order of the response 

388 :param type: either ``high`` or ``low`` 

389 ''' 

390 

391 corner = Float.T(default=1.0) 

392 order = Int.T(default=4) 

393 type = StringChoice.T(choices=['low', 'high'], default='low') 

394 

395 def to_polezero(self): 

396 z, p, k = signal.butter( 

397 self.order, self.corner*2.*math.pi, 

398 btype=self.type, analog=True, output='zpk') 

399 

400 return PoleZeroResponse( 

401 zeros=aslist(z), 

402 poles=aslist(p), 

403 constant=float(k)) 

404 

405 def to_digital(self, deltat): 

406 b, a = signal.butter( 

407 self.order, self.corner*2.*deltat, 

408 self.type, analog=False) 

409 

410 return DigitalFilterResponse(b, a, deltat) 

411 

412 def to_analog(self): 

413 b, a = signal.butter( 

414 self.order, self.corner*2.*math.pi, 

415 self.type, analog=True) 

416 

417 return AnalogFilterResponse(b, a) 

418 

419 def to_digital_polezero(self, deltat): 

420 z, p, k = signal.butter( 

421 self.order, self.corner*2*deltat, 

422 btype=self.type, analog=False, output='zpk') 

423 

424 return DigitalPoleZeroResponse(z, p, k, deltat) 

425 

426 def evaluate(self, freqs): 

427 b, a = signal.butter( 

428 self.order, self.corner*2.*math.pi, 

429 self.type, analog=True) 

430 

431 return signal.freqs(b, a, freqs*2.*math.pi)[1] 

432 

433 

434class SampledResponse(FrequencyResponse): 

435 ''' 

436 Interpolates frequency response given at a set of sampled frequencies. 

437 

438 :param frequencies,values: frequencies and values of the sampled response 

439 function. 

440 :param left,right: values to return when input is out of range. If set to 

441 ``None`` (the default) the endpoints are returned. 

442 ''' 

443 

444 frequencies = Array.T(shape=(None,), dtype=float, serialize_as='list') 

445 values = Array.T(shape=(None,), dtype=complex, serialize_as='list') 

446 left = Complex.T(optional=True) 

447 right = Complex.T(optional=True) 

448 

449 def __init__(self, frequencies, values, left=None, right=None, **kwargs): 

450 FrequencyResponse.__init__( 

451 self, 

452 frequencies=asarray_1d(frequencies, float), 

453 values=asarray_1d(values, complex), 

454 **kwargs) 

455 

456 def evaluate(self, freqs): 

457 ereal = num.interp( 

458 freqs, self.frequencies, num.real(self.values), 

459 left=self.left, right=self.right) 

460 eimag = num.interp( 

461 freqs, self.frequencies, num.imag(self.values), 

462 left=self.left, right=self.right) 

463 transfer = ereal + 1.0j*eimag 

464 return transfer 

465 

466 def inverse(self): 

467 ''' 

468 Get inverse as a new :py:class:`SampledResponse` object. 

469 ''' 

470 

471 def inv_or_none(x): 

472 if x is not None: 

473 return 1./x 

474 

475 return SampledResponse( 

476 self.frequencies, 1./self.values, 

477 left=inv_or_none(self.left), 

478 right=inv_or_none(self.right)) 

479 

480 

481class IntegrationResponse(FrequencyResponse): 

482 ''' 

483 The integration response, optionally multiplied by a constant gain. 

484 

485 :param n: exponent (integer) 

486 :param gain: gain factor (float) 

487 

488 :: 

489 

490 gain 

491 T(f) = -------------- 

492 (j*2*pi * f)^n 

493 ''' 

494 

495 n = Int.T(optional=True, default=1) 

496 gain = Float.T(optional=True, default=1.0) 

497 

498 def __init__(self, n=1, gain=1.0, **kwargs): 

499 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs) 

500 

501 def evaluate(self, freqs): 

502 nonzero = freqs != 0.0 

503 resp = num.zeros(freqs.size, dtype=complex) 

504 resp[nonzero] = self.gain / (1.0j * 2. * num.pi*freqs[nonzero])**self.n 

505 return resp 

506 

507 

508class DifferentiationResponse(FrequencyResponse): 

509 ''' 

510 The differentiation response, optionally multiplied by a constant gain. 

511 

512 :param n: exponent (integer) 

513 :param gain: gain factor (float) 

514 

515 :: 

516 

517 T(f) = gain * (j*2*pi * f)^n 

518 ''' 

519 

520 n = Int.T(optional=True, default=1) 

521 gain = Float.T(optional=True, default=1.0) 

522 

523 def __init__(self, n=1, gain=1.0, **kwargs): 

524 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs) 

525 

526 def evaluate(self, freqs): 

527 return self.gain * (1.0j * 2. * num.pi * freqs)**self.n 

528 

529 

530class DigitalFilterResponse(FrequencyResponse): 

531 ''' 

532 Frequency response of an analog filter. 

533 

534 (see :py:func:`scipy.signal.freqz`). 

535 ''' 

536 

537 b = List.T(Float.T()) 

538 a = List.T(Float.T()) 

539 deltat = Float.T() 

540 drop_phase = Bool.T(default=False) 

541 

542 def __init__(self, b, a, deltat, drop_phase=False, **kwargs): 

543 FrequencyResponse.__init__( 

544 self, b=aslist(b), a=aslist(a), deltat=float(deltat), 

545 drop_phase=drop_phase, **kwargs) 

546 

547 def check_sampling_rate(self): 

548 if self.deltat == 0.0: 

549 raise InvalidResponseError( 

550 'Invalid digital response: sampling rate undefined') 

551 

552 def get_fmax(self): 

553 self.check_sampling_rate() 

554 return 0.5 / self.deltat 

555 

556 def evaluate(self, freqs): 

557 self.check_sampling_rate() 

558 

559 ok = freqs <= 0.5/self.deltat 

560 coeffs = num.zeros(freqs.size, dtype=complex) 

561 

562 coeffs[ok] = signal.freqz( 

563 self.b, self.a, freqs[ok]*2.*math.pi * self.deltat)[1] 

564 

565 coeffs[num.logical_not(ok)] = None 

566 if self.drop_phase: 

567 return num.abs(coeffs) 

568 else: 

569 return coeffs 

570 

571 def filter(self, tr): 

572 self.check_sampling_rate() 

573 

574 from pyrocko import trace 

575 trace.assert_same_sampling_rate(self, tr) 

576 tr_new = tr.copy(data=False) 

577 tr_new.set_ydata(signal.lfilter(self.b, self.a, tr.get_ydata())) 

578 return tr_new 

579 

580 

581class AnalogFilterResponse(FrequencyResponse): 

582 ''' 

583 Frequency response of an analog filter. 

584 

585 (see :py:func:`scipy.signal.freqs`). 

586 ''' 

587 

588 b = List.T(Float.T()) 

589 a = List.T(Float.T()) 

590 

591 def __init__(self, b, a, **kwargs): 

592 FrequencyResponse.__init__( 

593 self, b=aslist(b), a=aslist(a), **kwargs) 

594 

595 def evaluate(self, freqs): 

596 return signal.freqs(self.b, self.a, freqs*2.*math.pi)[1] 

597 

598 def to_digital(self, deltat, method='bilinear'): 

599 from scipy.signal import cont2discrete 

600 b, a, _ = cont2discrete((self.b, self.a), deltat, method=method) 

601 if b.ndim == 2: 

602 b = b[0] 

603 return DigitalFilterResponse(b.tolist(), a.tolist(), deltat) 

604 

605 

606class MultiplyResponse(FrequencyResponse): 

607 ''' 

608 Multiplication of several :py:class:`FrequencyResponse` objects. 

609 ''' 

610 

611 responses = List.T(FrequencyResponse.T()) 

612 

613 def __init__(self, responses=None, **kwargs): 

614 if responses is None: 

615 responses = [] 

616 FrequencyResponse.__init__(self, responses=responses, **kwargs) 

617 

618 def get_fmax(self): 

619 fmaxs = [resp.get_fmax() for resp in self.responses] 

620 fmaxs = [fmax for fmax in fmaxs if fmax is not None] 

621 if not fmaxs: 

622 return None 

623 else: 

624 return min(fmaxs) 

625 

626 def evaluate(self, freqs): 

627 a = num.ones(freqs.size, dtype=complex) 

628 for resp in self.responses: 

629 a *= resp.evaluate(freqs) 

630 

631 return a 

632 

633 def is_scalar(self): 

634 return all(resp.is_scalar() for resp in self.responses) 

635 

636 def get_scalar(self): 

637 ''' 

638 Get factor if this is a flat response. 

639 ''' 

640 if self.is_scalar(): 

641 return num.product(resp.get_scalar() for resp in self.responses) 

642 else: 

643 raise IsNotScalar() 

644 

645 def simplify(self): 

646 poles = [] 

647 zeros = [] 

648 constant = 1.0 

649 responses = [] 

650 for resp in self.responses: 

651 if isinstance(resp, PoleZeroResponse): 

652 poles.extend(resp.poles) 

653 zeros.extend(resp.zeros) 

654 constant *= resp.constant 

655 else: 

656 responses.append(resp) 

657 

658 if poles or zeros or constant != 1.0: 

659 responses[0:0] = [ 

660 PoleZeroResponse(poles=poles, zeros=zeros, constant=constant)] 

661 

662 self.responses = responses 

663 

664 

665class DelayResponse(FrequencyResponse): 

666 

667 delay = Float.T() 

668 

669 def evaluate(self, freqs): 

670 return num.exp(-2.0J * self.delay * num.pi * freqs) 

671 

672 

673class InvalidResponseError(Exception): 

674 pass 

675 

676 

677class InvalidResponse(FrequencyResponse): 

678 

679 message = String.T() 

680 

681 def __init__(self, message): 

682 FrequencyResponse.__init__(self, message=message) 

683 self.have_warned = False 

684 

685 def evaluate(self, freqs): 

686 if not self.have_warned: 

687 logger.warning('Invalid response: %s' % self.message) 

688 self.have_warned = True 

689 

690 return util.num_full_like(freqs, None, dtype=num.complex) 

691 

692 

693def simplify_responses(responses): 

694 

695 def unpack_multi(responses): 

696 for resp in responses: 

697 if isinstance(resp, MultiplyResponse): 

698 for sub in unpack_multi(resp.responses): 

699 yield sub 

700 else: 

701 yield resp 

702 

703 def cancel_pzs(poles, zeros): 

704 poles_new = [] 

705 zeros_new = list(zeros) 

706 for p in poles: 

707 try: 

708 zeros_new.pop(zeros_new.index(p)) 

709 except ValueError: 

710 poles_new.append(p) 

711 

712 return poles_new, zeros_new 

713 

714 def combine_pzs(responses): 

715 poles = [] 

716 zeros = [] 

717 constant = 1.0 

718 out = [] 

719 for resp in responses: 

720 if isinstance(resp, PoleZeroResponse): 

721 poles.extend(resp.poles) 

722 zeros.extend(resp.zeros) 

723 constant *= resp.constant 

724 else: 

725 out.append(resp) 

726 

727 poles, zeros = cancel_pzs(poles, zeros) 

728 if poles or zeros: 

729 out.insert(0, PoleZeroResponse( 

730 poles=poles, zeros=zeros, constant=constant)) 

731 elif constant != 1.0: 

732 out.insert(0, Gain(constant=constant)) 

733 

734 return out 

735 

736 def split(xs, condition): 

737 out = [], [] 

738 for x in xs: 

739 out[condition(x)].append(x) 

740 

741 return out 

742 

743 def combine_gains(responses): 

744 non_scalars, scalars = split(responses, lambda resp: resp.is_scalar()) 

745 if scalars: 

746 factor = num.prod([resp.get_scalar() for resp in scalars]) 

747 yield Gain(constant=factor) 

748 

749 for resp in non_scalars: 

750 yield resp 

751 

752 return list(combine_gains(combine_pzs(unpack_multi(responses))))