1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6'''This module provides convenience objects to handle frequency responses.''' 

7 

8import math 

9import logging 

10import uuid 

11 

12import numpy as num 

13from scipy import signal 

14 

15from pyrocko import util, evalresp 

16from pyrocko.guts import Object, Float, Int, String, Complex, Tuple, List, \ 

17 StringChoice, Bool 

18from pyrocko.guts_array import Array 

19 

20 

21guts_prefix = 'pf' 

22 

23logger = logging.getLogger('pyrocko.response') 

24 

25 

26def asarray_1d(x, dtype): 

27 if isinstance(x, (list, tuple)) and x and isinstance(x[0], str): 

28 return num.asarray(list(map(dtype, x)), dtype=dtype) 

29 else: 

30 a = num.asarray(x, dtype=dtype) 

31 if not a.ndim == 1: 

32 raise ValueError('could not convert to 1D array') 

33 return a 

34 

35 

36def finalize_construction(breakpoints): 

37 breakpoints.sort() 

38 breakpoints_out = [] 

39 f_last = None 

40 for f, c in breakpoints: 

41 if f_last is not None and f == f_last: 

42 breakpoints_out[-1][1] += c 

43 else: 

44 breakpoints_out.append([f, c]) 

45 

46 f_last = f 

47 

48 breakpoints_out = [(f, c) for (f, c) in breakpoints_out if c != 0] 

49 return breakpoints_out 

50 

51 

52class FrequencyResponseCheckpoint(Object): 

53 frequency = Float.T() 

54 value = Float.T() 

55 

56 

57class IsNotScalar(Exception): 

58 pass 

59 

60 

61def str_fmax_failsafe(resp): 

62 try: 

63 return '%g' % resp.get_fmax() 

64 except InvalidResponseError: 

65 return '?' 

66 

67 

68class FrequencyResponse(Object): 

69 ''' 

70 Evaluates frequency response at given frequencies. 

71 ''' 

72 

73 checkpoints = List.T(FrequencyResponseCheckpoint.T()) 

74 

75 def __init__(self, *args, **kwargs): 

76 Object.__init__(self, *args, **kwargs) 

77 self.uuid = uuid.uuid4() 

78 

79 def evaluate(self, freqs): 

80 return num.ones(freqs.size, dtype=complex) 

81 

82 def evaluate1(self, freq): 

83 return self.evaluate(num.atleast_1d(freq))[0] 

84 

85 def is_scalar(self): 

86 ''' 

87 Check if this is a flat response. 

88 ''' 

89 

90 if type(self) is FrequencyResponse: 

91 return True 

92 else: 

93 return False # default for derived classes 

94 

95 def get_scalar(self): 

96 ''' 

97 Get factor if this is a flat response. 

98 ''' 

99 if type(self) is FrequencyResponse: 

100 return 1.0 

101 else: 

102 raise IsNotScalar() # default for derived classes 

103 

104 def get_fmax(self): 

105 return None 

106 

107 def construction(self): 

108 return [] 

109 

110 @property 

111 def summary(self): 

112 if type(self) is FrequencyResponse: 

113 return 'one' 

114 else: 

115 return 'unknown' 

116 

117 

118def str_gain(gain): 

119 if gain == 1.0: 

120 return 'one' 

121 elif isinstance(gain, complex): 

122 return 'gain{%s}' % repr(gain) 

123 else: 

124 return 'gain{%g}' % gain 

125 

126 

127class Gain(FrequencyResponse): 

128 ''' 

129 A flat frequency response. 

130 ''' 

131 

132 constant = Complex.T(default=1.0+0j) 

133 

134 def evaluate(self, freqs): 

135 return util.num_full_like(freqs, self.constant, dtype=complex) 

136 

137 def is_scalar(self): 

138 return True 

139 

140 def get_scalar(self): 

141 return self.constant 

142 

143 @property 

144 def summary(self): 

145 return str_gain(self.constant) 

146 

147 

148class Evalresp(FrequencyResponse): 

149 ''' 

150 Calls evalresp and generates values of the instrument response transfer 

151 function. 

152 

153 :param respfile: response file in evalresp format 

154 :param trace: trace for which the response is to be extracted from the file 

155 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity 

156 ''' 

157 

158 respfile = String.T() 

159 nslc_id = Tuple.T(4, String.T()) 

160 target = String.T(default='dis') 

161 instant = Float.T() 

162 stages = Tuple.T(2, Int.T(), optional=True) 

163 

164 def __init__( 

165 self, 

166 respfile, 

167 trace=None, 

168 target='dis', 

169 nslc_id=None, 

170 time=None, 

171 stages=None, 

172 **kwargs): 

173 

174 if trace is not None: 

175 nslc_id = trace.nslc_id 

176 time = (trace.tmin + trace.tmax) / 2. 

177 

178 FrequencyResponse.__init__( 

179 self, 

180 respfile=respfile, 

181 nslc_id=nslc_id, 

182 instant=time, 

183 target=target, 

184 stages=stages, 

185 **kwargs) 

186 

187 def evaluate(self, freqs): 

188 network, station, location, channel = self.nslc_id 

189 if self.stages is None: 

190 stages = (-1, 0) 

191 else: 

192 stages = self.stages[0]+1, self.stages[1] 

193 

194 x = evalresp.evalresp( 

195 sta_list=station, 

196 cha_list=channel, 

197 net_code=network, 

198 locid=location, 

199 instant=self.instant, 

200 freqs=freqs, 

201 units=self.target.upper(), 

202 file=self.respfile, 

203 start_stage=stages[0], 

204 stop_stage=stages[1], 

205 rtype='CS') 

206 

207 transfer = x[0][4] 

208 return transfer 

209 

210 @property 

211 def summary(self): 

212 return 'eresp' 

213 

214 

215class InverseEvalresp(FrequencyResponse): 

216 ''' 

217 Calls evalresp and generates values of the inverse instrument response for 

218 deconvolution of instrument response. 

219 

220 :param respfile: response file in evalresp format 

221 :param trace: trace for which the response is to be extracted from the file 

222 :param target: ``'dis'`` for displacement or ``'vel'`` for velocity 

223 ''' 

224 

225 respfile = String.T() 

226 nslc_id = Tuple.T(4, String.T()) 

227 target = String.T(default='dis') 

228 instant = Float.T() 

229 

230 def __init__(self, respfile, trace, target='dis', **kwargs): 

231 FrequencyResponse.__init__( 

232 self, 

233 respfile=respfile, 

234 nslc_id=trace.nslc_id, 

235 instant=(trace.tmin + trace.tmax)/2., 

236 target=target, 

237 **kwargs) 

238 

239 def evaluate(self, freqs): 

240 network, station, location, channel = self.nslc_id 

241 x = evalresp.evalresp(sta_list=station, 

242 cha_list=channel, 

243 net_code=network, 

244 locid=location, 

245 instant=self.instant, 

246 freqs=freqs, 

247 units=self.target.upper(), 

248 file=self.respfile, 

249 rtype='CS') 

250 

251 transfer = x[0][4] 

252 return 1./transfer 

253 

254 @property 

255 def summary(self): 

256 return 'inv_eresp' 

257 

258 

259def aslist(x): 

260 if x is None: 

261 return [] 

262 

263 try: 

264 return list(x) 

265 except TypeError: 

266 return [x] 

267 

268 

269class PoleZeroResponse(FrequencyResponse): 

270 ''' 

271 Evaluates frequency response from pole-zero representation. 

272 

273 :param zeros: positions of zeros 

274 :type zeros: list of complex 

275 :param poles: positions of poles 

276 :type poles: list of complex 

277 :param constant: gain factor 

278 :type constant: complex 

279 

280 :: 

281 

282 (j*2*pi*f - zeros[0]) * (j*2*pi*f - zeros[1]) * ... 

283 T(f) = constant * ---------------------------------------------------- 

284 (j*2*pi*f - poles[0]) * (j*2*pi*f - poles[1]) * ... 

285 

286 

287 The poles and zeros should be given as angular frequencies, not in Hz. 

288 ''' 

289 

290 zeros = List.T(Complex.T()) 

291 poles = List.T(Complex.T()) 

292 constant = Complex.T(default=1.0+0j) 

293 

294 def __init__( 

295 self, 

296 zeros=None, 

297 poles=None, 

298 constant=1.0+0j, 

299 **kwargs): 

300 

301 if zeros is None: 

302 zeros = [] 

303 if poles is None: 

304 poles = [] 

305 

306 FrequencyResponse.__init__( 

307 self, 

308 zeros=aslist(zeros), 

309 poles=aslist(poles), 

310 constant=constant, 

311 **kwargs) 

312 

313 def evaluate(self, freqs): 

314 if hasattr(signal, 'freqs_zpk'): # added in scipy 0.19.0 

315 return signal.freqs_zpk( 

316 self.zeros, self.poles, self.constant, freqs*2.*num.pi)[1] 

317 else: 

318 jomeg = 1.0j * 2.*num.pi*freqs 

319 

320 a = num.ones(freqs.size, dtype=complex)*self.constant 

321 for z in self.zeros: 

322 a *= jomeg-z 

323 for p in self.poles: 

324 a /= jomeg-p 

325 

326 return a 

327 

328 def is_scalar(self): 

329 return len(self.zeros) == 0 and len(self.poles) == 0 

330 

331 def get_scalar(self): 

332 ''' 

333 Get factor if this is a flat response. 

334 ''' 

335 if self.is_scalar(): 

336 return self.constant 

337 else: 

338 raise IsNotScalar() 

339 

340 def inverse(self): 

341 return PoleZeroResponse( 

342 poles=list(self.zeros), 

343 zeros=list(self.poles), 

344 constant=1.0/self.constant) 

345 

346 def to_analog(self): 

347 b, a = signal.zpk2tf(self.zeros, self.poles, self.constant) 

348 return AnalogFilterResponse(aslist(b), aslist(a)) 

349 

350 def to_digital(self, deltat, method='bilinear'): 

351 from scipy.signal import cont2discrete, zpk2tf 

352 

353 z, p, k, _ = cont2discrete( 

354 (self.zeros, self.poles, self.constant), 

355 deltat, method=method) 

356 

357 b, a = zpk2tf(z, p, k) 

358 

359 return DigitalFilterResponse(b, a, deltat) 

360 

361 def to_digital_polezero(self, deltat, method='bilinear'): 

362 from scipy.signal import cont2discrete 

363 

364 z, p, k, _ = cont2discrete( 

365 (self.zeros, self.poles, self.constant), 

366 deltat, method=method) 

367 

368 return DigitalPoleZeroResponse(z, p, k, deltat) 

369 

370 def construction(self): 

371 breakpoints = [] 

372 for zero in self.zeros: 

373 f = abs(zero) / (2.*math.pi) 

374 breakpoints.append((f, 1)) 

375 

376 for pole in self.poles: 

377 f = abs(pole) / (2.*math.pi) 

378 breakpoints.append((f, -1)) 

379 

380 return finalize_construction(breakpoints) 

381 

382 @property 

383 def summary(self): 

384 if self.is_scalar(): 

385 return str_gain(self.get_scalar()) 

386 

387 return 'pz{%i,%i}' % (len(self.poles), len(self.zeros)) 

388 

389 

390class DigitalPoleZeroResponse(FrequencyResponse): 

391 ''' 

392 Evaluates frequency response from digital filter pole-zero representation. 

393 

394 :param zeros: positions of zeros 

395 :type zeros: list of complex 

396 :param poles: positions of poles 

397 :type poles: list of complex 

398 :param constant: gain factor 

399 :type constant: complex 

400 :param deltat: sampling interval 

401 :type deltat: float 

402 

403 The poles and zeros should be given as angular frequencies, not in Hz. 

404 ''' 

405 

406 zeros = List.T(Complex.T()) 

407 poles = List.T(Complex.T()) 

408 constant = Complex.T(default=1.0+0j) 

409 deltat = Float.T() 

410 

411 def __init__( 

412 self, 

413 zeros=None, 

414 poles=None, 

415 constant=1.0+0j, 

416 deltat=None, 

417 **kwargs): 

418 

419 if zeros is None: 

420 zeros = [] 

421 if poles is None: 

422 poles = [] 

423 if deltat is None: 

424 raise ValueError( 

425 'Sampling interval `deltat` must be given for ' 

426 'DigitalPoleZeroResponse') 

427 

428 FrequencyResponse.__init__( 

429 self, zeros=aslist(zeros), poles=aslist(poles), constant=constant, 

430 deltat=deltat, **kwargs) 

431 

432 def check_sampling_rate(self): 

433 if self.deltat == 0.0: 

434 raise InvalidResponseError( 

435 'Invalid digital response: sampling rate undefined') 

436 

437 def get_fmax(self): 

438 self.check_sampling_rate() 

439 return 0.5 / self.deltat 

440 

441 def evaluate(self, freqs): 

442 self.check_sampling_rate() 

443 return signal.freqz_zpk( 

444 self.zeros, self.poles, self.constant, 

445 freqs*(2.*math.pi*self.deltat))[1] 

446 

447 def is_scalar(self): 

448 return len(self.zeros) == 0 and len(self.poles) == 0 

449 

450 def get_scalar(self): 

451 ''' 

452 Get factor if this is a flat response. 

453 ''' 

454 if self.is_scalar(): 

455 return self.constant 

456 else: 

457 raise IsNotScalar() 

458 

459 def to_digital(self, deltat): 

460 self.check_sampling_rate() 

461 from scipy.signal import zpk2tf 

462 

463 b, a = zpk2tf(self.zeros, self.poles, self.constant) 

464 return DigitalFilterResponse(b, a, deltat) 

465 

466 @property 

467 def summary(self): 

468 if self.is_scalar(): 

469 return str_gain(self.get_scalar()) 

470 

471 return 'dpz{%i,%i,%s}' % ( 

472 len(self.poles), len(self.zeros), str_fmax_failsafe(self)) 

473 

474 

475class ButterworthResponse(FrequencyResponse): 

476 ''' 

477 Butterworth frequency response. 

478 

479 :param corner: corner frequency of the response 

480 :param order: order of the response 

481 :param type: either ``high`` or ``low`` 

482 ''' 

483 

484 corner = Float.T(default=1.0) 

485 order = Int.T(default=4) 

486 type = StringChoice.T(choices=['low', 'high'], default='low') 

487 

488 def to_polezero(self): 

489 z, p, k = signal.butter( 

490 self.order, self.corner*2.*math.pi, 

491 btype=self.type, analog=True, output='zpk') 

492 

493 return PoleZeroResponse( 

494 zeros=aslist(z), 

495 poles=aslist(p), 

496 constant=float(k)) 

497 

498 def to_digital(self, deltat): 

499 b, a = signal.butter( 

500 self.order, self.corner*2.*deltat, 

501 self.type, analog=False) 

502 

503 return DigitalFilterResponse(b, a, deltat) 

504 

505 def to_analog(self): 

506 b, a = signal.butter( 

507 self.order, self.corner*2.*math.pi, 

508 self.type, analog=True) 

509 

510 return AnalogFilterResponse(b, a) 

511 

512 def to_digital_polezero(self, deltat): 

513 z, p, k = signal.butter( 

514 self.order, self.corner*2*deltat, 

515 btype=self.type, analog=False, output='zpk') 

516 

517 return DigitalPoleZeroResponse(z, p, k, deltat) 

518 

519 def evaluate(self, freqs): 

520 b, a = signal.butter( 

521 self.order, self.corner*2.*math.pi, 

522 self.type, analog=True) 

523 

524 return signal.freqs(b, a, freqs*2.*math.pi)[1] 

525 

526 @property 

527 def summary(self): 

528 return 'butter_%s{%i,%g}' % ( 

529 self.type, 

530 self.order, 

531 self.corner) 

532 

533 

534class SampledResponse(FrequencyResponse): 

535 ''' 

536 Interpolates frequency response given at a set of sampled frequencies. 

537 

538 :param frequencies,values: frequencies and values of the sampled response 

539 function. 

540 :param left,right: values to return when input is out of range. If set to 

541 ``None`` (the default) the endpoints are returned. 

542 ''' 

543 

544 frequencies = Array.T(shape=(None,), dtype=float, serialize_as='list') 

545 values = Array.T(shape=(None,), dtype=complex, serialize_as='list') 

546 left = Complex.T(optional=True) 

547 right = Complex.T(optional=True) 

548 

549 def __init__(self, frequencies, values, left=None, right=None, **kwargs): 

550 FrequencyResponse.__init__( 

551 self, 

552 frequencies=asarray_1d(frequencies, float), 

553 values=asarray_1d(values, complex), 

554 **kwargs) 

555 

556 def evaluate(self, freqs): 

557 ereal = num.interp( 

558 freqs, self.frequencies, num.real(self.values), 

559 left=self.left, right=self.right) 

560 eimag = num.interp( 

561 freqs, self.frequencies, num.imag(self.values), 

562 left=self.left, right=self.right) 

563 transfer = ereal + 1.0j*eimag 

564 return transfer 

565 

566 def inverse(self): 

567 ''' 

568 Get inverse as a new :py:class:`SampledResponse` object. 

569 ''' 

570 

571 def inv_or_none(x): 

572 if x is not None: 

573 return 1./x 

574 

575 return SampledResponse( 

576 self.frequencies, 1./self.values, 

577 left=inv_or_none(self.left), 

578 right=inv_or_none(self.right)) 

579 

580 @property 

581 def summary(self): 

582 return 'sampled' 

583 

584 

585class IntegrationResponse(FrequencyResponse): 

586 ''' 

587 The integration response, optionally multiplied by a constant gain. 

588 

589 :param n: exponent (integer) 

590 :param gain: gain factor (float) 

591 

592 :: 

593 

594 gain 

595 T(f) = -------------- 

596 (j*2*pi * f)^n 

597 ''' 

598 

599 n = Int.T(optional=True, default=1) 

600 gain = Float.T(optional=True, default=1.0) 

601 

602 def __init__(self, n=1, gain=1.0, **kwargs): 

603 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs) 

604 

605 def evaluate(self, freqs): 

606 nonzero = freqs != 0.0 

607 resp = num.zeros(freqs.size, dtype=complex) 

608 resp[nonzero] = self.gain / (1.0j * 2. * num.pi*freqs[nonzero])**self.n 

609 return resp 

610 

611 @property 

612 def summary(self): 

613 return 'integration{%i}' % self.n + ( 

614 '*gain{%g}' % self.gain 

615 if self.gain is not None and self.gain != 1.0 

616 else '') 

617 

618 

619class DifferentiationResponse(FrequencyResponse): 

620 ''' 

621 The differentiation response, optionally multiplied by a constant gain. 

622 

623 :param n: exponent (integer) 

624 :param gain: gain factor (float) 

625 

626 :: 

627 

628 T(f) = gain * (j*2*pi * f)^n 

629 ''' 

630 

631 n = Int.T(optional=True, default=1) 

632 gain = Float.T(optional=True, default=1.0) 

633 

634 def __init__(self, n=1, gain=1.0, **kwargs): 

635 FrequencyResponse.__init__(self, n=n, gain=gain, **kwargs) 

636 

637 def evaluate(self, freqs): 

638 return self.gain * (1.0j * 2. * num.pi * freqs)**self.n 

639 

640 @property 

641 def summary(self): 

642 return 'differentiation{%i}' % self.n + ( 

643 '*gain{%g}' % self.gain 

644 if self.gain is not None and self.gain != 1.0 

645 else '') 

646 

647 

648class DigitalFilterResponse(FrequencyResponse): 

649 ''' 

650 Frequency response of an analog filter. 

651 

652 (see :py:func:`scipy.signal.freqz`). 

653 ''' 

654 

655 b = List.T(Float.T()) 

656 a = List.T(Float.T()) 

657 deltat = Float.T() 

658 drop_phase = Bool.T(default=False) 

659 

660 def __init__(self, b, a, deltat, drop_phase=False, **kwargs): 

661 FrequencyResponse.__init__( 

662 self, b=aslist(b), a=aslist(a), deltat=float(deltat), 

663 drop_phase=drop_phase, **kwargs) 

664 

665 def check_sampling_rate(self): 

666 if self.deltat == 0.0: 

667 raise InvalidResponseError( 

668 'Invalid digital response: sampling rate undefined') 

669 

670 def is_scalar(self): 

671 return len(self.a) == 1 and len(self.b) == 1 

672 

673 def get_scalar(self): 

674 if self.is_scalar(): 

675 return self.b[0] / self.a[0] 

676 else: 

677 raise IsNotScalar() 

678 

679 def get_fmax(self): 

680 if not self.is_scalar(): 

681 self.check_sampling_rate() 

682 return 0.5 / self.deltat 

683 else: 

684 return None 

685 

686 def evaluate(self, freqs): 

687 if self.is_scalar(): 

688 return util.num_full_like(freqs, self.get_scalar(), dtype=complex) 

689 

690 self.check_sampling_rate() 

691 

692 ok = freqs <= 0.5/self.deltat 

693 coeffs = num.zeros(freqs.size, dtype=complex) 

694 

695 coeffs[ok] = signal.freqz( 

696 self.b, self.a, freqs[ok]*2.*math.pi * self.deltat)[1] 

697 

698 coeffs[num.logical_not(ok)] = None 

699 if self.drop_phase: 

700 return num.abs(coeffs) 

701 else: 

702 return coeffs 

703 

704 def filter(self, tr): 

705 self.check_sampling_rate() 

706 

707 from pyrocko import trace 

708 trace.assert_same_sampling_rate(self, tr) 

709 tr_new = tr.copy(data=False) 

710 tr_new.set_ydata(signal.lfilter(self.b, self.a, tr.get_ydata())) 

711 return tr_new 

712 

713 @property 

714 def summary(self): 

715 if self.is_scalar(): 

716 return str_gain(self.get_scalar()) 

717 

718 elif len(self.a) == 1: 

719 return 'fir{%i,<=%sHz}' % ( 

720 len(self.b), str_fmax_failsafe(self)) 

721 

722 else: 

723 return 'iir{%i,%i,<=%sHz}' % ( 

724 len(self.b), len(self.a), str_fmax_failsafe(self)) 

725 

726 

727class AnalogFilterResponse(FrequencyResponse): 

728 ''' 

729 Frequency response of an analog filter. 

730 

731 (see :py:func:`scipy.signal.freqs`). 

732 ''' 

733 

734 b = List.T(Float.T()) 

735 a = List.T(Float.T()) 

736 

737 def __init__(self, b, a, **kwargs): 

738 FrequencyResponse.__init__( 

739 self, b=aslist(b), a=aslist(a), **kwargs) 

740 

741 def is_scalar(self): 

742 return len(self.a) == 1 and len(self.b) == 1 

743 

744 def get_scalar(self): 

745 if self.is_scalar(): 

746 return self.b[0] / self.a[0] 

747 else: 

748 raise IsNotScalar() 

749 

750 def evaluate(self, freqs): 

751 return signal.freqs(self.b, self.a, freqs*2.*math.pi)[1] 

752 

753 def to_digital(self, deltat, method='bilinear'): 

754 from scipy.signal import cont2discrete 

755 b, a, _ = cont2discrete((self.b, self.a), deltat, method=method) 

756 if b.ndim == 2: 

757 b = b[0] 

758 return DigitalFilterResponse(b.tolist(), a.tolist(), deltat) 

759 

760 @property 

761 def summary(self): 

762 if self.is_scalar(): 

763 return str_gain(self.get_scalar()) 

764 

765 return 'analog{%i,%i,%g}' % ( 

766 len(self.b), len(self.a), self.get_fmax()) 

767 

768 

769class MultiplyResponse(FrequencyResponse): 

770 ''' 

771 Multiplication of several :py:class:`FrequencyResponse` objects. 

772 ''' 

773 

774 responses = List.T(FrequencyResponse.T()) 

775 

776 def __init__(self, responses=None, **kwargs): 

777 if responses is None: 

778 responses = [] 

779 FrequencyResponse.__init__(self, responses=responses, **kwargs) 

780 

781 def get_fmax(self): 

782 fmaxs = [resp.get_fmax() for resp in self.responses] 

783 fmaxs = [fmax for fmax in fmaxs if fmax is not None] 

784 if not fmaxs: 

785 return None 

786 else: 

787 return min(fmaxs) 

788 

789 def evaluate(self, freqs): 

790 a = num.ones(freqs.size, dtype=complex) 

791 for resp in self.responses: 

792 a *= resp.evaluate(freqs) 

793 

794 return a 

795 

796 def is_scalar(self): 

797 return all(resp.is_scalar() for resp in self.responses) 

798 

799 def get_scalar(self): 

800 ''' 

801 Get factor if this is a flat response. 

802 ''' 

803 if self.is_scalar(): 

804 return num.prod(resp.get_scalar() for resp in self.responses) 

805 else: 

806 raise IsNotScalar() 

807 

808 def simplify(self): 

809 self.responses = simplify_responses(self.responses) 

810 

811 def construction(self): 

812 breakpoints = [] 

813 for resp in self.responses: 

814 breakpoints.extend(resp.construction()) 

815 

816 return finalize_construction(breakpoints) 

817 

818 @property 

819 def summary(self): 

820 if self.is_scalar(self): 

821 return str_gain(self.get_scalar()) 

822 else: 

823 xs = [x.summary for x in self.responses] 

824 return '(%s)' % ('*'.join(x for x in xs if x != 'one') or 'one') 

825 

826 

827class DelayResponse(FrequencyResponse): 

828 

829 delay = Float.T() 

830 

831 def evaluate(self, freqs): 

832 return num.exp(-2.0J * self.delay * num.pi * freqs) 

833 

834 @property 

835 def summary(self): 

836 return 'delay{%g}' % self.delay 

837 

838 

839class InvalidResponseError(Exception): 

840 pass 

841 

842 

843class InvalidResponse(FrequencyResponse): 

844 

845 message = String.T() 

846 

847 def __init__(self, message): 

848 FrequencyResponse.__init__(self, message=message) 

849 self.have_warned = False 

850 

851 def evaluate(self, freqs): 

852 if not self.have_warned: 

853 logger.warning('Invalid response: %s' % self.message) 

854 self.have_warned = True 

855 

856 return util.num_full_like(freqs, None, dtype=num.complex) 

857 

858 @property 

859 def summary(self): 

860 return 'invalid' 

861 

862 

863def simplify_responses(responses): 

864 

865 def unpack_multi(responses): 

866 for resp in responses: 

867 if isinstance(resp, MultiplyResponse): 

868 for sub in unpack_multi(resp.responses): 

869 yield sub 

870 else: 

871 yield resp 

872 

873 def cancel_pzs(poles, zeros): 

874 poles_new = [] 

875 zeros_new = list(zeros) 

876 for p in poles: 

877 try: 

878 zeros_new.pop(zeros_new.index(p)) 

879 except ValueError: 

880 poles_new.append(p) 

881 

882 return poles_new, zeros_new 

883 

884 def combine_pzs(responses): 

885 poles = [] 

886 zeros = [] 

887 constant = 1.0 

888 out = [] 

889 for resp in responses: 

890 if isinstance(resp, PoleZeroResponse): 

891 poles.extend(resp.poles) 

892 zeros.extend(resp.zeros) 

893 constant *= resp.constant 

894 else: 

895 out.append(resp) 

896 

897 poles, zeros = cancel_pzs(poles, zeros) 

898 if poles or zeros: 

899 out.insert(0, PoleZeroResponse( 

900 poles=poles, zeros=zeros, constant=constant)) 

901 elif constant != 1.0: 

902 out.insert(0, Gain(constant=constant)) 

903 

904 return out 

905 

906 def split(xs, condition): 

907 out = [], [] 

908 for x in xs: 

909 out[condition(x)].append(x) 

910 

911 return out 

912 

913 def combine_gains(responses): 

914 non_scalars, scalars = split(responses, lambda resp: resp.is_scalar()) 

915 if scalars: 

916 factor = num.prod([resp.get_scalar() for resp in scalars]) 

917 yield Gain(constant=factor) 

918 

919 for resp in non_scalars: 

920 yield resp 

921 

922 return list(combine_gains(combine_pzs(unpack_multi(responses))))