1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6'''This module provides convenience objects to handle frequency responses.''' 

7 

8import math 

9import logging 

10 

11import numpy as num 

12from scipy import signal 

13 

14from pyrocko import util, evalresp 

15from pyrocko.guts import Object, Float, Int, String, Complex, Tuple, List, \ 

16 StringChoice, Bool 

17from pyrocko.guts_array import Array 

18 

19 

20guts_prefix = 'pf' 

21 

22logger = logging.getLogger('pyrocko.response') 

23 

24 

25def asarray_1d(x, dtype): 

26 if isinstance(x, (list, tuple)) and x and isinstance(x[0], str): 

27 return num.asarray(list(map(dtype, x)), dtype=dtype) 

28 else: 

29 a = num.asarray(x, dtype=dtype) 

30 if not a.ndim == 1: 

31 raise ValueError('could not convert to 1D array') 

32 return a 

33 

34 

35def finalize_construction(breakpoints): 

36 breakpoints.sort() 

37 breakpoints_out = [] 

38 f_last = None 

39 for f, c in breakpoints: 

40 if f_last is not None and f == f_last: 

41 breakpoints_out[-1][1] += c 

42 else: 

43 breakpoints_out.append([f, c]) 

44 

45 f_last = f 

46 

47 breakpoints_out = [(f, c) for (f, c) in breakpoints_out if c != 0] 

48 return breakpoints_out 

49 

50 

51class FrequencyResponseCheckpoint(Object): 

52 frequency = Float.T() 

53 value = Float.T() 

54 

55 

56class IsNotScalar(Exception): 

57 pass 

58 

59 

60def str_fmax_failsafe(resp): 

61 try: 

62 return '%g' % resp.get_fmax() 

63 except InvalidResponseError: 

64 return '?' 

65 

66 

67class FrequencyResponse(Object): 

68 ''' 

69 Evaluates frequency response at given frequencies. 

70 ''' 

71 

72 checkpoints = List.T(FrequencyResponseCheckpoint.T()) 

73 

74 def evaluate(self, freqs): 

75 return num.ones(freqs.size, dtype=complex) 

76 

77 def evaluate1(self, freq): 

78 return self.evaluate(num.atleast_1d(freq))[0] 

79 

80 def is_scalar(self): 

81 ''' 

82 Check if this is a flat response. 

83 ''' 

84 

85 if type(self) is FrequencyResponse: 

86 return True 

87 else: 

88 return False # default for derived classes 

89 

90 def get_scalar(self): 

91 ''' 

92 Get factor if this is a flat response. 

93 ''' 

94 if type(self) is FrequencyResponse: 

95 return 1.0 

96 else: 

97 raise IsNotScalar() # default for derived classes 

98 

99 def get_fmax(self): 

100 return None 

101 

102 def construction(self): 

103 return [] 

104 

105 @property 

106 def summary(self): 

107 if type(self) is FrequencyResponse: 

108 return 'one' 

109 else: 

110 return 'unknown' 

111 

112 

113def str_gain(gain): 

114 if gain == 1.0: 

115 return 'one' 

116 elif isinstance(gain, complex): 

117 return 'gain{%s}' % repr(gain) 

118 else: 

119 return 'gain{%g}' % gain 

120 

121 

122class Gain(FrequencyResponse): 

123 ''' 

124 A flat frequency response. 

125 ''' 

126 

127 constant = Complex.T(default=1.0+0j) 

128 

129 def evaluate(self, freqs): 

130 return util.num_full_like(freqs, self.constant, dtype=complex) 

131 

132 def is_scalar(self): 

133 return True 

134 

135 def get_scalar(self): 

136 return self.constant 

137 

138 @property 

139 def summary(self): 

140 return str_gain(self.constant) 

141 

142 

143class Evalresp(FrequencyResponse): 

144 ''' 

145 Calls evalresp and generates values of the instrument response transfer 

146 function. 

147 

148 :param respfile: response file in evalresp format 

149 :param trace: trace for which the response is to be extracted from the file 

150 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity 

151 ''' 

152 

153 respfile = String.T() 

154 nslc_id = Tuple.T(4, String.T()) 

155 target = String.T(default='dis') 

156 instant = Float.T() 

157 stages = Tuple.T(2, Int.T(), optional=True) 

158 

159 def __init__( 

160 self, 

161 respfile, 

162 trace=None, 

163 target='dis', 

164 nslc_id=None, 

165 time=None, 

166 stages=None, 

167 **kwargs): 

168 

169 if trace is not None: 

170 nslc_id = trace.nslc_id 

171 time = (trace.tmin + trace.tmax) / 2. 

172 

173 FrequencyResponse.__init__( 

174 self, 

175 respfile=respfile, 

176 nslc_id=nslc_id, 

177 instant=time, 

178 target=target, 

179 stages=stages, 

180 **kwargs) 

181 

182 def evaluate(self, freqs): 

183 network, station, location, channel = self.nslc_id 

184 if self.stages is None: 

185 stages = (-1, 0) 

186 else: 

187 stages = self.stages[0]+1, self.stages[1] 

188 

189 x = evalresp.evalresp( 

190 sta_list=station, 

191 cha_list=channel, 

192 net_code=network, 

193 locid=location, 

194 instant=self.instant, 

195 freqs=freqs, 

196 units=self.target.upper(), 

197 file=self.respfile, 

198 start_stage=stages[0], 

199 stop_stage=stages[1], 

200 rtype='CS') 

201 

202 transfer = x[0][4] 

203 return transfer 

204 

205 @property 

206 def summary(self): 

207 return 'eresp' 

208 

209 

210class InverseEvalresp(FrequencyResponse): 

211 ''' 

212 Calls evalresp and generates values of the inverse instrument response for 

213 deconvolution of instrument response. 

214 

215 :param respfile: response file in evalresp format 

216 :param trace: trace for which the response is to be extracted from the file 

217 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity 

218 ''' 

219 

220 respfile = String.T() 

221 nslc_id = Tuple.T(4, String.T()) 

222 target = String.T(default='dis') 

223 instant = Float.T() 

224 

225 def __init__(self, respfile, trace, target='dis', **kwargs): 

226 FrequencyResponse.__init__( 

227 self, 

228 respfile=respfile, 

229 nslc_id=trace.nslc_id, 

230 instant=(trace.tmin + trace.tmax)/2., 

231 target=target, 

232 **kwargs) 

233 

234 def evaluate(self, freqs): 

235 network, station, location, channel = self.nslc_id 

236 x = evalresp.evalresp(sta_list=station, 

237 cha_list=channel, 

238 net_code=network, 

239 locid=location, 

240 instant=self.instant, 

241 freqs=freqs, 

242 units=self.target.upper(), 

243 file=self.respfile, 

244 rtype='CS') 

245 

246 transfer = x[0][4] 

247 return 1./transfer 

248 

249 @property 

250 def summary(self): 

251 return 'inv_eresp' 

252 

253 

254def aslist(x): 

255 if x is None: 

256 return [] 

257 

258 try: 

259 return list(x) 

260 except TypeError: 

261 return [x] 

262 

263 

264class PoleZeroResponse(FrequencyResponse): 

265 ''' 

266 Evaluates frequency response from pole-zero representation. 

267 

268 :param zeros: positions of zeros 

269 :type zeros: list of complex 

270 :param poles: positions of poles 

271 :type poles: list of complex 

272 :param constant: gain factor 

273 :type constant: complex 

274 

275 :: 

276 

277 (j*2*pi*f - zeros[0]) * (j*2*pi*f - zeros[1]) * ... 

278 T(f) = constant * ---------------------------------------------------- 

279 (j*2*pi*f - poles[0]) * (j*2*pi*f - poles[1]) * ... 

280 

281 

282 The poles and zeros should be given as angular frequencies, not in Hz. 

283 ''' 

284 

285 zeros = List.T(Complex.T()) 

286 poles = List.T(Complex.T()) 

287 constant = Complex.T(default=1.0+0j) 

288 

289 def __init__( 

290 self, 

291 zeros=None, 

292 poles=None, 

293 constant=1.0+0j, 

294 **kwargs): 

295 

296 if zeros is None: 

297 zeros = [] 

298 if poles is None: 

299 poles = [] 

300 

301 FrequencyResponse.__init__( 

302 self, 

303 zeros=aslist(zeros), 

304 poles=aslist(poles), 

305 constant=constant, 

306 **kwargs) 

307 

308 def evaluate(self, freqs): 

309 if hasattr(signal, 'freqs_zpk'): # added in scipy 0.19.0 

310 return signal.freqs_zpk( 

311 self.zeros, self.poles, self.constant, freqs*2.*num.pi)[1] 

312 else: 

313 jomeg = 1.0j * 2.*num.pi*freqs 

314 

315 a = num.ones(freqs.size, dtype=complex)*self.constant 

316 for z in self.zeros: 

317 a *= jomeg-z 

318 for p in self.poles: 

319 a /= jomeg-p 

320 

321 return a 

322 

323 def is_scalar(self): 

324 return len(self.zeros) == 0 and len(self.poles) == 0 

325 

326 def get_scalar(self): 

327 ''' 

328 Get factor if this is a flat response. 

329 ''' 

330 if self.is_scalar(): 

331 return self.constant 

332 else: 

333 raise IsNotScalar() 

334 

335 def inverse(self): 

336 return PoleZeroResponse( 

337 poles=list(self.zeros), 

338 zeros=list(self.poles), 

339 constant=1.0/self.constant) 

340 

341 def to_analog(self): 

342 b, a = signal.zpk2tf(self.zeros, self.poles, self.constant) 

343 return AnalogFilterResponse(aslist(b), aslist(a)) 

344 

345 def to_digital(self, deltat, method='bilinear'): 

346 from scipy.signal import cont2discrete, zpk2tf 

347 

348 z, p, k, _ = cont2discrete( 

349 (self.zeros, self.poles, self.constant), 

350 deltat, method=method) 

351 

352 b, a = zpk2tf(z, p, k) 

353 

354 return DigitalFilterResponse(b, a, deltat) 

355 

356 def to_digital_polezero(self, deltat, method='bilinear'): 

357 from scipy.signal import cont2discrete 

358 

359 z, p, k, _ = cont2discrete( 

360 (self.zeros, self.poles, self.constant), 

361 deltat, method=method) 

362 

363 return DigitalPoleZeroResponse(z, p, k, deltat) 

364 

365 def construction(self): 

366 breakpoints = [] 

367 for zero in self.zeros: 

368 f = abs(zero) / (2.*math.pi) 

369 breakpoints.append((f, 1)) 

370 

371 for pole in self.poles: 

372 f = abs(pole) / (2.*math.pi) 

373 breakpoints.append((f, -1)) 

374 

375 return finalize_construction(breakpoints) 

376 

377 @property 

378 def summary(self): 

379 if self.is_scalar(): 

380 return str_gain(self.get_scalar()) 

381 

382 return 'pz{%i,%i}' % (len(self.poles), len(self.zeros)) 

383 

384 

385class DigitalPoleZeroResponse(FrequencyResponse): 

386 ''' 

387 Evaluates frequency response from digital filter pole-zero representation. 

388 

389 :param zeros: positions of zeros 

390 :type zeros: list of complex 

391 :param poles: positions of poles 

392 :type poles: list of complex 

393 :param constant: gain factor 

394 :type constant: complex 

395 :param deltat: sampling interval 

396 :type deltat: float 

397 

398 The poles and zeros should be given as angular frequencies, not in Hz. 

399 ''' 

400 

401 zeros = List.T(Complex.T()) 

402 poles = List.T(Complex.T()) 

403 constant = Complex.T(default=1.0+0j) 

404 deltat = Float.T() 

405 

406 def __init__( 

407 self, 

408 zeros=None, 

409 poles=None, 

410 constant=1.0+0j, 

411 deltat=None, 

412 **kwargs): 

413 

414 if zeros is None: 

415 zeros = [] 

416 if poles is None: 

417 poles = [] 

418 if deltat is None: 

419 raise ValueError( 

420 'Sampling interval `deltat` must be given for ' 

421 'DigitalPoleZeroResponse') 

422 

423 FrequencyResponse.__init__( 

424 self, zeros=aslist(zeros), poles=aslist(poles), constant=constant, 

425 deltat=deltat, **kwargs) 

426 

427 def check_sampling_rate(self): 

428 if self.deltat == 0.0: 

429 raise InvalidResponseError( 

430 'Invalid digital response: sampling rate undefined') 

431 

432 def get_fmax(self): 

433 self.check_sampling_rate() 

434 return 0.5 / self.deltat 

435 

436 def evaluate(self, freqs): 

437 self.check_sampling_rate() 

438 return signal.freqz_zpk( 

439 self.zeros, self.poles, self.constant, 

440 freqs*(2.*math.pi*self.deltat))[1] 

441 

442 def is_scalar(self): 

443 return len(self.zeros) == 0 and len(self.poles) == 0 

444 

445 def get_scalar(self): 

446 ''' 

447 Get factor if this is a flat response. 

448 ''' 

449 if self.is_scalar(): 

450 return self.constant 

451 else: 

452 raise IsNotScalar() 

453 

454 def to_digital(self, deltat): 

455 self.check_sampling_rate() 

456 from scipy.signal import zpk2tf 

457 

458 b, a = zpk2tf(self.zeros, self.poles, self.constant) 

459 return DigitalFilterResponse(b, a, deltat) 

460 

461 @property 

462 def summary(self): 

463 if self.is_scalar(): 

464 return str_gain(self.get_scalar()) 

465 

466 return 'dpz{%i,%i,%s}' % ( 

467 len(self.poles), len(self.zeros), str_fmax_failsafe(self)) 

468 

469 

470class ButterworthResponse(FrequencyResponse): 

471 ''' 

472 Butterworth frequency response. 

473 

474 :param corner: corner frequency of the response 

475 :param order: order of the response 

476 :param type: either ``high`` or ``low`` 

477 ''' 

478 

479 corner = Float.T(default=1.0) 

480 order = Int.T(default=4) 

481 type = StringChoice.T(choices=['low', 'high'], default='low') 

482 

483 def to_polezero(self): 

484 z, p, k = signal.butter( 

485 self.order, self.corner*2.*math.pi, 

486 btype=self.type, analog=True, output='zpk') 

487 

488 return PoleZeroResponse( 

489 zeros=aslist(z), 

490 poles=aslist(p), 

491 constant=float(k)) 

492 

493 def to_digital(self, deltat): 

494 b, a = signal.butter( 

495 self.order, self.corner*2.*deltat, 

496 self.type, analog=False) 

497 

498 return DigitalFilterResponse(b, a, deltat) 

499 

500 def to_analog(self): 

501 b, a = signal.butter( 

502 self.order, self.corner*2.*math.pi, 

503 self.type, analog=True) 

504 

505 return AnalogFilterResponse(b, a) 

506 

507 def to_digital_polezero(self, deltat): 

508 z, p, k = signal.butter( 

509 self.order, self.corner*2*deltat, 

510 btype=self.type, analog=False, output='zpk') 

511 

512 return DigitalPoleZeroResponse(z, p, k, deltat) 

513 

514 def evaluate(self, freqs): 

515 b, a = signal.butter( 

516 self.order, self.corner*2.*math.pi, 

517 self.type, analog=True) 

518 

519 return signal.freqs(b, a, freqs*2.*math.pi)[1] 

520 

521 @property 

522 def summary(self): 

523 return 'butter_%s{%i,%g}' % ( 

524 self.type, 

525 self.order, 

526 self.corner) 

527 

528 

529class SampledResponse(FrequencyResponse): 

530 ''' 

531 Interpolates frequency response given at a set of sampled frequencies. 

532 

533 :param frequencies,values: frequencies and values of the sampled response 

534 function. 

535 :param left,right: values to return when input is out of range. If set to 

536 ``None`` (the default) the endpoints are returned. 

537 ''' 

538 

539 frequencies = Array.T(shape=(None,), dtype=float, serialize_as='list') 

540 values = Array.T(shape=(None,), dtype=complex, serialize_as='list') 

541 left = Complex.T(optional=True) 

542 right = Complex.T(optional=True) 

543 

544 def __init__(self, frequencies, values, left=None, right=None, **kwargs): 

545 FrequencyResponse.__init__( 

546 self, 

547 frequencies=asarray_1d(frequencies, float), 

548 values=asarray_1d(values, complex), 

549 **kwargs) 

550 

551 def evaluate(self, freqs): 

552 ereal = num.interp( 

553 freqs, self.frequencies, num.real(self.values), 

554 left=self.left, right=self.right) 

555 eimag = num.interp( 

556 freqs, self.frequencies, num.imag(self.values), 

557 left=self.left, right=self.right) 

558 transfer = ereal + 1.0j*eimag 

559 return transfer 

560 

561 def inverse(self): 

562 ''' 

563 Get inverse as a new :py:class:`SampledResponse` object. 

564 ''' 

565 

566 def inv_or_none(x): 

567 if x is not None: 

568 return 1./x 

569 

570 return SampledResponse( 

571 self.frequencies, 1./self.values, 

572 left=inv_or_none(self.left), 

573 right=inv_or_none(self.right)) 

574 

575 @property 

576 def summary(self): 

577 return 'sampled' 

578 

579 

580class IntegrationResponse(FrequencyResponse): 

581 ''' 

582 The integration response, optionally multiplied by a constant gain. 

583 

584 :param n: exponent (integer) 

585 :param gain: gain factor (float) 

586 

587 :: 

588 

589 gain 

590 T(f) = -------------- 

591 (j*2*pi * f)^n 

592 ''' 

593 

594 n = Int.T(optional=True, default=1) 

595 gain = Float.T(optional=True, default=1.0) 

596 

597 def __init__(self, n=1, gain=1.0, **kwargs): 

598 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs) 

599 

600 def evaluate(self, freqs): 

601 nonzero = freqs != 0.0 

602 resp = num.zeros(freqs.size, dtype=complex) 

603 resp[nonzero] = self.gain / (1.0j * 2. * num.pi*freqs[nonzero])**self.n 

604 return resp 

605 

606 @property 

607 def summary(self): 

608 return 'integration{%i}' % self.n + ( 

609 '*gain{%g}' % self.gain 

610 if self.gain is not None and self.gain != 1.0 

611 else '') 

612 

613 

614class DifferentiationResponse(FrequencyResponse): 

615 ''' 

616 The differentiation response, optionally multiplied by a constant gain. 

617 

618 :param n: exponent (integer) 

619 :param gain: gain factor (float) 

620 

621 :: 

622 

623 T(f) = gain * (j*2*pi * f)^n 

624 ''' 

625 

626 n = Int.T(optional=True, default=1) 

627 gain = Float.T(optional=True, default=1.0) 

628 

629 def __init__(self, n=1, gain=1.0, **kwargs): 

630 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs) 

631 

632 def evaluate(self, freqs): 

633 return self.gain * (1.0j * 2. * num.pi * freqs)**self.n 

634 

635 @property 

636 def summary(self): 

637 return 'differentiation{%i}' % self.n + ( 

638 '*gain{%g}' % self.gain 

639 if self.gain is not None and self.gain != 1.0 

640 else '') 

641 

642 

643class DigitalFilterResponse(FrequencyResponse): 

644 ''' 

645 Frequency response of an analog filter. 

646 

647 (see :py:func:`scipy.signal.freqz`). 

648 ''' 

649 

650 b = List.T(Float.T()) 

651 a = List.T(Float.T()) 

652 deltat = Float.T() 

653 drop_phase = Bool.T(default=False) 

654 

655 def __init__(self, b, a, deltat, drop_phase=False, **kwargs): 

656 FrequencyResponse.__init__( 

657 self, b=aslist(b), a=aslist(a), deltat=float(deltat), 

658 drop_phase=drop_phase, **kwargs) 

659 

660 def check_sampling_rate(self): 

661 if self.deltat == 0.0: 

662 raise InvalidResponseError( 

663 'Invalid digital response: sampling rate undefined') 

664 

665 def is_scalar(self): 

666 return len(self.a) == 1 and len(self.b) == 1 

667 

668 def get_scalar(self): 

669 if self.is_scalar(): 

670 return self.b[0] / self.a[0] 

671 else: 

672 raise IsNotScalar() 

673 

674 def get_fmax(self): 

675 if not self.is_scalar(): 

676 self.check_sampling_rate() 

677 return 0.5 / self.deltat 

678 else: 

679 return None 

680 

681 def evaluate(self, freqs): 

682 if self.is_scalar(): 

683 return util.num_full_like(freqs, self.get_scalar(), dtype=complex) 

684 

685 self.check_sampling_rate() 

686 

687 ok = freqs <= 0.5/self.deltat 

688 coeffs = num.zeros(freqs.size, dtype=complex) 

689 

690 coeffs[ok] = signal.freqz( 

691 self.b, self.a, freqs[ok]*2.*math.pi * self.deltat)[1] 

692 

693 coeffs[num.logical_not(ok)] = None 

694 if self.drop_phase: 

695 return num.abs(coeffs) 

696 else: 

697 return coeffs 

698 

699 def filter(self, tr): 

700 self.check_sampling_rate() 

701 

702 from pyrocko import trace 

703 trace.assert_same_sampling_rate(self, tr) 

704 tr_new = tr.copy(data=False) 

705 tr_new.set_ydata(signal.lfilter(self.b, self.a, tr.get_ydata())) 

706 return tr_new 

707 

708 @property 

709 def summary(self): 

710 if self.is_scalar(): 

711 return str_gain(self.get_scalar()) 

712 

713 elif len(self.a) == 1: 

714 return 'fir{%i,<=%sHz}' % ( 

715 len(self.b), str_fmax_failsafe(self)) 

716 

717 else: 

718 return 'iir{%i,%i,<=%sHz}' % ( 

719 len(self.b), len(self.a), str_fmax_failsafe(self)) 

720 

721 

722class AnalogFilterResponse(FrequencyResponse): 

723 ''' 

724 Frequency response of an analog filter. 

725 

726 (see :py:func:`scipy.signal.freqs`). 

727 ''' 

728 

729 b = List.T(Float.T()) 

730 a = List.T(Float.T()) 

731 

732 def __init__(self, b, a, **kwargs): 

733 FrequencyResponse.__init__( 

734 self, b=aslist(b), a=aslist(a), **kwargs) 

735 

736 def is_scalar(self): 

737 return len(self.a) == 1 and len(self.b) == 1 

738 

739 def get_scalar(self): 

740 if self.is_scalar(): 

741 return self.b[0] / self.a[0] 

742 else: 

743 raise IsNotScalar() 

744 

745 def evaluate(self, freqs): 

746 return signal.freqs(self.b, self.a, freqs*2.*math.pi)[1] 

747 

748 def to_digital(self, deltat, method='bilinear'): 

749 from scipy.signal import cont2discrete 

750 b, a, _ = cont2discrete((self.b, self.a), deltat, method=method) 

751 if b.ndim == 2: 

752 b = b[0] 

753 return DigitalFilterResponse(b.tolist(), a.tolist(), deltat) 

754 

755 @property 

756 def summary(self): 

757 if self.is_scalar(): 

758 return str_gain(self.get_scalar()) 

759 

760 return 'analog{%i,%i,%g}' % ( 

761 len(self.b), len(self.a), self.get_fmax()) 

762 

763 

764class MultiplyResponse(FrequencyResponse): 

765 ''' 

766 Multiplication of several :py:class:`FrequencyResponse` objects. 

767 ''' 

768 

769 responses = List.T(FrequencyResponse.T()) 

770 

771 def __init__(self, responses=None, **kwargs): 

772 if responses is None: 

773 responses = [] 

774 FrequencyResponse.__init__(self, responses=responses, **kwargs) 

775 

776 def get_fmax(self): 

777 fmaxs = [resp.get_fmax() for resp in self.responses] 

778 fmaxs = [fmax for fmax in fmaxs if fmax is not None] 

779 if not fmaxs: 

780 return None 

781 else: 

782 return min(fmaxs) 

783 

784 def evaluate(self, freqs): 

785 a = num.ones(freqs.size, dtype=complex) 

786 for resp in self.responses: 

787 a *= resp.evaluate(freqs) 

788 

789 return a 

790 

791 def is_scalar(self): 

792 return all(resp.is_scalar() for resp in self.responses) 

793 

794 def get_scalar(self): 

795 ''' 

796 Get factor if this is a flat response. 

797 ''' 

798 if self.is_scalar(): 

799 return num.product(resp.get_scalar() for resp in self.responses) 

800 else: 

801 raise IsNotScalar() 

802 

803 def simplify(self): 

804 self.responses = simplify_responses(self.responses) 

805 

806 def construction(self): 

807 breakpoints = [] 

808 for resp in self.responses: 

809 breakpoints.extend(resp.construction()) 

810 

811 return finalize_construction(breakpoints) 

812 

813 @property 

814 def summary(self): 

815 if self.is_scalar(self): 

816 return str_gain(self.get_scalar()) 

817 else: 

818 xs = [x.summary for x in self.responses] 

819 return '(%s)' % ('*'.join(x for x in xs if x != 'one') or 'one') 

820 

821 

822class DelayResponse(FrequencyResponse): 

823 

824 delay = Float.T() 

825 

826 def evaluate(self, freqs): 

827 return num.exp(-2.0J * self.delay * num.pi * freqs) 

828 

829 @property 

830 def summary(self): 

831 return 'delay{%g}' % self.delay 

832 

833 

834class InvalidResponseError(Exception): 

835 pass 

836 

837 

838class InvalidResponse(FrequencyResponse): 

839 

840 message = String.T() 

841 

842 def __init__(self, message): 

843 FrequencyResponse.__init__(self, message=message) 

844 self.have_warned = False 

845 

846 def evaluate(self, freqs): 

847 if not self.have_warned: 

848 logger.warning('Invalid response: %s' % self.message) 

849 self.have_warned = True 

850 

851 return util.num_full_like(freqs, None, dtype=num.complex) 

852 

853 @property 

854 def summary(self): 

855 return 'invalid' 

856 

857 

858def simplify_responses(responses): 

859 

860 def unpack_multi(responses): 

861 for resp in responses: 

862 if isinstance(resp, MultiplyResponse): 

863 for sub in unpack_multi(resp.responses): 

864 yield sub 

865 else: 

866 yield resp 

867 

868 def cancel_pzs(poles, zeros): 

869 poles_new = [] 

870 zeros_new = list(zeros) 

871 for p in poles: 

872 try: 

873 zeros_new.pop(zeros_new.index(p)) 

874 except ValueError: 

875 poles_new.append(p) 

876 

877 return poles_new, zeros_new 

878 

879 def combine_pzs(responses): 

880 poles = [] 

881 zeros = [] 

882 constant = 1.0 

883 out = [] 

884 for resp in responses: 

885 if isinstance(resp, PoleZeroResponse): 

886 poles.extend(resp.poles) 

887 zeros.extend(resp.zeros) 

888 constant *= resp.constant 

889 else: 

890 out.append(resp) 

891 

892 poles, zeros = cancel_pzs(poles, zeros) 

893 if poles or zeros: 

894 out.insert(0, PoleZeroResponse( 

895 poles=poles, zeros=zeros, constant=constant)) 

896 elif constant != 1.0: 

897 out.insert(0, Gain(constant=constant)) 

898 

899 return out 

900 

901 def split(xs, condition): 

902 out = [], [] 

903 for x in xs: 

904 out[condition(x)].append(x) 

905 

906 return out 

907 

908 def combine_gains(responses): 

909 non_scalars, scalars = split(responses, lambda resp: resp.is_scalar()) 

910 if scalars: 

911 factor = num.prod([resp.get_scalar() for resp in scalars]) 

912 yield Gain(constant=factor) 

913 

914 for resp in non_scalars: 

915 yield resp 

916 

917 return list(combine_gains(combine_pzs(unpack_multi(responses))))