1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import time 

7import os 

8import copy 

9import logging 

10import tempfile 

11import importlib.util 

12from collections import defaultdict 

13try: 

14 import cPickle as pickle 

15except ImportError: 

16 import pickle 

17import os.path as op 

18from .base import Source, Constraint 

19from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \ 

20 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \ 

21 codes_to_str_abbreviated, CodesNSLCE 

22from ..database import ExecuteGet1Error 

23from pyrocko.squirrel.error import SquirrelError 

24from pyrocko.client import fdsn 

25 

26from pyrocko import util, trace, io 

27from pyrocko.io.io_common import FileLoadError 

28from pyrocko.io import stationxml 

29from pyrocko.progress import progress 

30from pyrocko import has_paths 

31 

32from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \ 

33 Duration, Bool, clone 

34 

# NOTE(review): presumably the prefix under which pyrocko.guts registers the
# classes declared in this module -- confirm against the guts documentation.
guts_prefix = 'squirrel'

# Timeout set module-wide on the FDSN client (assumed to be in seconds).
fdsn.g_timeout = 60.0

logger = logging.getLogger('psq.client.fdsn')

# Query features (keys) and the sites for which they are not used when
# building queries in this module.
sites_not_supporting = dict(
    startbefore=['geonet'],
    includerestricted=['geonet'])

44 

45 

def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are handed on unchanged to ``progress.task``.

    :returns:
        The task object created by ``progress.task``.
    '''
    task = progress.task(*args, logger=logger)
    return task

48 

49 

def diff(fn_a, fn_b):
    '''
    Check whether two files differ.

    :param fn_a:
        Path of the first file.
    :param fn_b:
        Path of the second file.

    :returns:
        ``True`` if the files have different sizes, different contents, or
        if either of them cannot be stat'ed, ``False`` if they are
        byte-for-byte identical.
    '''
    try:
        size_a = os.stat(fn_a).st_size
        size_b = os.stat(fn_b).st_size
    except OSError:
        # A file which cannot be inspected counts as "different".
        return True

    if size_a != size_b:
        return True

    chunk_size = 1024
    with open(fn_a, 'rb') as file_a, open(fn_b, 'rb') as file_b:
        while True:
            chunk_a = file_a.read(chunk_size)
            chunk_b = file_b.read(chunk_size)
            if chunk_a != chunk_b:
                return True

            # Chunks are equal here, so an empty one means both are at EOF.
            if not chunk_a or not chunk_b:
                return False

68 

69 

def move_or_keep(fn_temp, fn):
    '''
    Install a freshly written temporary file, unless nothing has changed.

    If ``fn`` does not exist yet, or if its content differs from
    ``fn_temp`` (as judged by :py:func:`diff`), ``fn_temp`` is moved over
    ``fn``. Otherwise ``fn_temp`` is deleted and ``fn`` is left untouched.

    :param fn_temp:
        Path of the temporary file holding the new content.
    :param fn:
        Path of the destination file.

    :returns:
        One of ``'new'``, ``'updated'`` or ``'upstream unchanged'``.
    '''
    existed = op.exists(fn)
    if existed and not diff(fn, fn_temp):
        os.unlink(fn_temp)
        return 'upstream unchanged'

    # os.replace() overwrites an existing destination on every platform,
    # whereas os.rename() fails on Windows if the destination exists.
    os.replace(fn_temp, fn)
    return 'updated' if existed else 'new'

84 

85 

class Archive(Object):
    '''
    Base class for archives to which downloaded traces can be added.

    Subclasses have to override :py:meth:`add`.
    '''

    def add(self, order=None, trs=None):
        '''
        Add traces belonging to an order to the archive.

        The parameters mirror those of the concrete implementation in this
        module; they default to ``None`` so that existing zero-argument
        calls keep raising :py:exc:`NotImplementedError`.

        :param order:
            Order the traces belong to.
        :param trs:
            Traces to be stored.

        :raises:
            :py:exc:`NotImplementedError`, always.
        '''
        raise NotImplementedError()

90 

91 

class MSeedArchive(Archive):
    '''
    Archive saving traces to files below a configurable base directory.

    File names are generated from :py:gattr:`template`, relative to the
    directory given to :py:meth:`set_base_path`.
    '''

    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        '_%(block_tmin_us)s_%(block_tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)

        # Has to be set with set_base_path() before add() can be used.
        self._base_path = None

    def set_base_path(self, path):
        '''
        Set the directory below which the files are created.
        '''
        self._base_path = path

    def add(self, order, trs):
        '''
        Save the traces of an order.

        :param order:
            Order the traces belong to; its ``tmin`` and ``tmax`` are made
            available to the file name template as ``block_tmin_us`` and
            ``block_tmax_us``.
        :param trs:
            Traces to be saved.

        :returns:
            Whatever ``io.save`` returns (used by the caller in this module
            as a list of file paths).
        '''
        path_template = op.join(self._base_path, self.template)
        time_format = '%Y-%m-%d_%H-%M-%S.6FRAC'
        block_times = dict(
            block_tmin_us=util.time_to_str(order.tmin, format=time_format),
            block_tmax_us=util.time_to_str(order.tmax, format=time_format))

        return io.save(
            trs, path_template, overwrite=True, additional=block_times)

113 

114 

def combine_selections(selection):
    '''
    Merge directly adjacent entries of a selection.

    Consecutive entries are joined into one when their first four elements
    agree and the fifth element of the later entry equals the sixth element
    of the earlier one.

    :param selection:
        Iterable of 6-tuples; only neighbouring entries are compared, so
        the input should already be sorted.

    :returns:
        List of 6-tuples with adjacent entries merged.
    '''
    merged = []
    pending = None
    for entry in selection:
        contiguous = (
            pending
            and entry[:4] == pending[:4]
            and entry[4] == pending[5])

        if contiguous:
            # Extend the pending span up to the end of this entry.
            pending = pending[:5] + (entry[5],)
            continue

        if pending:
            merged.append(pending)

        pending = entry

    if pending:
        merged.append(pending)

    return merged

131 

132 

def orders_sort_key(order):
    '''
    Get the sort key of an order: its codes first, then its start time.
    '''
    codes = order.codes
    tmin = order.tmin
    return codes, tmin

135 

136 

def orders_to_selection(orders, pad=1.0):
    '''
    Convert orders into a padded selection of time spans.

    The orders are sorted with :py:func:`orders_sort_key`, turned into
    ``(net, sta, loc, cha, tmin, tmax)`` rows, merged with
    :py:func:`combine_selections` and finally widened on both sides.

    :param orders:
        Iterable of order objects providing ``codes.nslc``, ``tmin``,
        ``tmax`` and ``deltat``.
    :param pad:
        Padding on each side, as a multiple of the sampling interval
        ``deltat`` last seen for the respective channel.

    :returns:
        List of ``(net, sta, loc, cha, tmin, tmax)`` tuples.
    '''
    rows = []
    deltat_by_nslc = {}
    for order in sorted(orders, key=orders_sort_key):
        nslc = order.codes.nslc
        rows.append(nslc + (order.tmin, order.tmax))
        deltat_by_nslc[nslc] = order.deltat

    padded = []
    for row in combine_selections(rows):
        net, sta, loc, cha, tmin, tmax = row
        deltat = deltat_by_nslc[net, sta, loc, cha]
        padded.append(
            (net, sta, loc, cha, tmin-pad*deltat, tmax+pad*deltat))

    return padded

153 

154 

class ErrorEntry(Object):
    '''
    Record of a single failed or incomplete waveform order.
    '''

    # Time attached to the entry by the code logging the error.
    time = Timestamp.T()

    # The order which could not be (completely) fulfilled.
    order = WaveformOrder.T()

    # Short error category, e.g. 'empty result' or 'http error'.
    kind = String.T()

    # Optional free-text details supplementing ``kind``.
    details = String.T(optional=True)

160 

161 

class ErrorAggregate(Object):
    '''
    Summary of all error entries sharing the same kind and details.
    '''

    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes = List.T(CodesNSLCE.T())
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        '''
        Render a multi-line, human readable summary of the aggregate.
        '''
        code_strings = [str(codes) for codes in self.codes]
        if code_strings:
            scodes = '\n' + util.ewrap(code_strings, indent=' ')
        else:
            scodes = '<none>'

        span_strings = (
            '%s - %s' % (util.time_to_str(span[0]), util.time_to_str(span[1]))
            for span in self.time_spans)

        sspans = '\n' + util.ewrap(span_strings, indent=' ')

        if self.details:
            sdetails = ' Details: %s\n' % self.details
        else:
            sdetails = ''

        template = (
            'FDSN "%s": download error summary for "%s" (%i)\n%s '
            'Codes:%s\n Time spans:%s')

        return template % (
            self.site,
            self.kind,
            len(self.entries),
            sdetails,
            scodes,
            sspans)

186 

187 

class ErrorLog(Object):
    '''
    Collection of download errors which occurred at one FDSN site.

    Checkpoints mark positions in the list of entries, so that the errors
    logged since the latest checkpoint can be summarized separately.
    '''

    site = String.T()
    entries = List.T(ErrorEntry.T())
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        '''
        Remember the current number of entries as a new checkpoint.
        '''
        nentries = len(self.entries)
        self.checkpoints.append(nentries)

    def append(self, time, order, kind, details=''):
        '''
        Log a new error.

        :param time:
            Time to be attached to the entry.
        :param order:
            The affected order.
        :param kind:
            Short error category.
        :param details:
            Optional details on the error.
        '''
        self.entries.append(
            ErrorEntry(time=time, order=order, kind=kind, details=details))

    def iter_aggregates(self):
        '''
        Iterate over the errors, grouped by kind and details.

        Yields one :py:class:`ErrorAggregate` per distinct
        ``(kind, details)`` combination, in sorted order of that key.
        '''
        grouped = defaultdict(list)
        for entry in self.entries:
            grouped[entry.kind, entry.details].append(entry)

        for key in sorted(grouped):
            kind, details = key
            entries = grouped[key]

            codes = sorted({entry.order.codes for entry in entries})
            selection = orders_to_selection(entry.order for entry in entries)

            # The last two elements of each selection row are tmin and tmax.
            time_spans = sorted({row[-2:] for row in selection})

            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=entries,
                codes=codes,
                time_spans=time_spans)

    def summarize_recent(self):
        '''
        Summarize the errors logged since the latest checkpoint.

        :returns:
            One-line summary string, or an empty string if no error has
            been logged since the latest checkpoint.
        '''
        if self.checkpoints:
            ioff = self.checkpoints[-1]
        else:
            ioff = 0

        recent = self.entries[ioff:]
        if not recent:
            return ''

        kinds = sorted({entry.kind for entry in recent})
        return '%i error%s (%s)' % (
            len(recent), util.plural_s(recent), '; '.join(kinds))

229 

230 

class Aborted(SquirrelError):
    '''
    Raised in this module when a running download is deliberately aborted.
    '''

233 

234 

class FDSNSource(Source, has_paths.HasPaths):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel` allows
    the latter to download station and waveform data from an FDSN web service
    should the data not already happen to be available locally.
    '''

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             "environment's cache directory is used.")

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    hotfix_module_path = has_paths.Path.T(
        optional=True,
        help='Path to Python module to locally patch metadata errors.')

    def __init__(self, site, query_args=None, **kwargs):
        Source.__init__(self, site=site, query_args=query_args, **kwargs)

        self._constraint = None
        self._hash = self.make_hash()
        self._source_id = 'client:fdsn:%s' % self._hash
        self._error_infos = []

        # Loaded lazily by _hotfix(); also reset in setup().
        self._hotfix_module = None

    def describe(self):
        '''
        Get the identifier under which this source is registered.
        '''
        return self._source_id

    def make_hash(self):
        '''
        Compute a hash identifying the configuration of this source.

        The hash depends on the site, on whether a token is configured, on
        the user name (not the password) and on the query arguments. It is
        used to name the source's cache sub-directory.
        '''
        s = self.site
        s += 'notoken' \
            if (self.auth_token is None and self.auth_token_path is None) \
            else 'token'

        if self.user_credentials is not None:
            s += self.user_credentials[0]
        else:
            s += 'nocred'

        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        '''
        Get the configuration hash computed at construction time.
        '''
        return self._hash

    def get_auth_token(self):
        '''
        Get the authentication token.

        A non-empty :py:gattr:`auth_token` takes precedence, otherwise the
        token is read from the file at :py:gattr:`auth_token_path`.

        :raises:
            :py:exc:`FileLoadError` if the token file cannot be read,
            :py:exc:`Exception` if no token is configured at all.
        '''
        if self.auth_token:
            return self.auth_token

        elif self.auth_token_path is not None:
            try:
                with open(self.auth_token_path, 'rb') as f:
                    return f.read().decode('ascii')

            except OSError as e:
                raise FileLoadError(
                    'Cannot load auth token file (%s): %s'
                    % (str(e), self.auth_token_path)) from e

        else:
            # This branch is reached when there is nothing to return, not
            # when both settings are given, so say so in the message.
            raise Exception(
                'FDSNSource: no authentication token configured (neither '
                'auth_token nor auth_token_path is set).')

    def setup(self, squirrel, check=True):
        '''
        Prepare the local cache and register cached content.

        Creates the cache directories, loads the stored constraint and adds
        the already cached waveforms, channel metadata and responses to the
        given Squirrel instance.

        :param squirrel:
            The Squirrel instance this source gets attached to.
        :param check:
            Passed on to ``squirrel.add`` for the waveforms directory.
        '''
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()
        self._archive = MSeedArchive()
        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive.set_base_path(waveforms_path)

        squirrel.add(waveforms_path, check=check)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if os.path.exists(responses_path):
            squirrel.add(responses_path, kinds=['response'])

        self._hotfix_module = None

    def _hotfix(self, query_type, sx):
        '''
        Apply the user supplied hotfix hook to freshly queried metadata.

        The module at :py:gattr:`hotfix_module_path` is loaded on first use
        and its function ``stationxml_<query_type>_hook`` is called with
        ``sx``. Nothing happens if no hotfix module is configured.

        :returns:
            The return value of the hook, or ``None``.
        '''
        if self.hotfix_module_path is None:
            return

        if self._hotfix_module is None:
            module_path = self.expand_path(self.hotfix_module_path)
            spec = importlib.util.spec_from_file_location(
                'hotfix_' + self._hash, module_path)

            if spec is None or spec.loader is None:
                raise FileLoadError(
                    'Cannot load hotfix module: %s' % module_path)

            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)

            # Only cache the module once it has been executed successfully,
            # so that a failed load is retried and not silently reused.
            self._hotfix_module = module

        hook = getattr(
            self._hotfix_module, 'stationxml_' + query_type + '_hook')

        return hook(sx)

    def _has_credentials(self):
        '''
        Check whether any kind of authentication is configured.
        '''
        return (
            self.user_credentials is not None
            or self.auth_token is not None
            or self.auth_token_path is not None)

    def _get_constraint_path(self):
        return op.join(self._cache_path, self._hash, 'constraint.pickle')

    def _get_channels_path(self):
        return op.join(self._cache_path, self._hash, 'channels.stationxml')

    def _get_responses_path(self, nslc=None):
        '''
        Get the responses directory or the response file of one channel.

        :param nslc:
            4-tuple of codes, or ``None`` to get the directory path.
        '''
        dirpath = op.join(
            self._cache_path, self._hash, 'responses')

        if nslc is None:
            return dirpath
        else:
            return op.join(
                dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)

    def _get_waveforms_path(self):
        if self.shared_waveforms:
            return op.join(self._cache_path, 'waveforms')
        else:
            return op.join(self._cache_path, self._hash, 'waveforms')

    def _log_meta(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" metadata:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_responses(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" responses:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_info_data(self, *args):
        log_prefix = 'FDSN "%s" waveforms:' % self.site
        logger.info(' '.join((log_prefix,) + args))

    def _str_expires(self, t, now):
        '''
        Format an expiration time for log messages.

        :param t:
            Expiration time, or ``None`` if the data never expires.
        :param now:
            Reference time deciding between "expires" and "expired".
        '''
        if t is None:
            return 'expires: never'
        else:
            expire = 'expires' if t > now else 'expired'
            return '%s: %s' % (
                expire,
                util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))

    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Make sure channel metadata matching a constraint is available.

        The cached metadata is used if it covers the constraint and has not
        expired. Otherwise the site is queried with the union of the stored
        and the requested constraint, and the cache file is replaced if the
        result differs from what is already there.

        :param squirrel:
            The Squirrel instance to which the channel file is added.
        :param constraint:
            Constraint to be fulfilled; a default ``Constraint()`` is used
            if ``None``.
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                fn_temp = fn + '.%i.temp' % os.getpid()
                channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    # Same best-effort handling as used for the response
                    # files in update_response_inventory().
                    try:
                        squirrel.get_database().silent_touch(fn)
                    except ExecuteGet1Error:
                        pass

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

    def _do_channel_query(self, constraint):
        '''
        Query channel level metadata from the site.

        :returns:
            The result of ``fdsn.station``, or an empty
            ``stationxml.FDSNStationXML`` placeholder if the site reports
            an empty result.
        '''
        extra_args = {}

        # Sites listed under 'startbefore' get 'starttime'/'endtime'
        # arguments instead of 'endafter'/'startbefore'.
        if self.site in sites_not_supporting['startbefore']:
            key_tmin, key_tmax = 'starttime', 'endtime'
        else:
            key_tmin, key_tmax = 'endafter', 'startbefore'

        if constraint.tmin is not None and constraint.tmin != g_tmin:
            extra_args[key_tmin] = constraint.tmin

        if constraint.tmax is not None and constraint.tmax != g_tmax:
            extra_args[key_tmax] = constraint.tmax

        if self.site not in sites_not_supporting['includerestricted']:
            extra_args.update(includerestricted=self._has_credentials())

        if self.query_args is not None:
            extra_args.update(self.query_args)

        self._log_meta('querying...')

        try:
            channel_sx = fdsn.station(
                site=self.site,
                format='text',
                level='channel',
                **extra_args)

            self._hotfix('channel', channel_sx)

            return channel_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def _load_constraint(self):
        '''
        Load the constraint stored in the cache directory, if there is one.
        '''
        fn = self._get_constraint_path()
        if op.exists(fn):
            # NOTE(review): unpickling executes arbitrary code if the cache
            # file has been tampered with. The file is only written by
            # _dump_constraint(), but the cache directory must be trusted.
            with open(fn, 'rb') as f:
                self._constraint = pickle.load(f)
        else:
            self._constraint = None

    def _dump_constraint(self):
        '''
        Store the current constraint in the cache directory.
        '''
        with open(self._get_constraint_path(), 'wb') as f:
            pickle.dump(self._constraint, f, protocol=2)

    def _get_expiration_time(self, path):
        '''
        Get the time at which a cached file expires.

        :returns:
            ``None`` if :py:gattr:`expires` is not set, ``0.0`` if the file
            cannot be stat'ed, otherwise the file's modification time plus
            :py:gattr:`expires`.
        '''
        if self.expires is None:
            return None

        try:
            # Index 8 of the stat result is the modification time (whole
            # seconds, see ``stat.ST_MTIME``).
            t = os.stat(path)[8]
            return t + self.expires

        except OSError:
            return 0.0

    def _get_channels_expiration_time(self):
        return self._get_expiration_time(self._get_channels_path())

    def update_waveform_promises(self, squirrel, constraint):
        '''
        Register promises for waveforms which are not available locally.

        For each channel matching the constraint, the time spans not
        covered by waveforms known to ``squirrel`` are determined and added
        as virtual waveform promise nuts under this source's id.

        :param squirrel:
            The Squirrel instance to query and to add the promises to.
        :param constraint:
            Constraint providing ``codes``, ``tmin`` and ``tmax``.
        '''
        from ..base import gaps
        cpath = os.path.abspath(self._get_channels_path())

        ctmin = constraint.tmin
        ctmax = constraint.tmax

        nuts = squirrel.iter_nuts(
            'channel',
            path=cpath,
            codes=constraint.codes,
            tmin=ctmin,
            tmax=ctmax)

        coverages = squirrel.get_coverage(
            'waveform',
            codes=constraint.codes if constraint.codes else None,
            tmin=ctmin,
            tmax=ctmax)

        codes_to_avail = defaultdict(list)
        for coverage in coverages:
            for tmin, tmax, _ in coverage.iter_spans():
                codes_to_avail[coverage.codes].append((tmin, tmax))

        def sgaps(nut):
            # Restrict the channel's time span to the requested one.
            tmin_want = max(ctmin, nut.tmin) if ctmin is not None \
                else nut.tmin
            tmax_want = min(ctmax, nut.tmax) if ctmax is not None \
                else nut.tmax

            for tmin, tmax in gaps(
                    codes_to_avail[nut.codes], tmin_want, tmax_want):

                subnut = clone(nut)
                subnut.tmin = tmin
                subnut.tmax = tmax

                # ignore 1-sample gaps produced by rounding errors
                if subnut.tmax - subnut.tmin < 2*subnut.deltat:
                    continue

                yield subnut

        def wanted(nuts):
            for nut in nuts:
                yield from sgaps(nut)

        path = self._source_id
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
            virtual_paths=[path])

    def remove_waveform_promises(self, squirrel, from_database='selection'):
        '''
        Remove waveform promises from live selection or global database.

        :param from_database:
            Remove from live selection ``'selection'`` or global database
            ``'global'``.
        '''

        path = self._source_id
        if from_database == 'selection':
            squirrel.remove(path)
        elif from_database == 'global':
            squirrel.get_database().remove(path)
        else:
            raise ValueError(
                'Values allowed for from_database: ("selection", "global")')

    def _get_user_credentials(self):
        '''
        Get authentication keyword arguments for the FDSN client calls.

        :returns:
            Dict with ``user`` and ``passwd`` and/or ``token`` entries,
            depending on what is configured; may be empty.
        '''
        d = {}
        if self.user_credentials is not None:
            d['user'], d['passwd'] = self.user_credentials

        if self.auth_token is not None or self.auth_token_path is not None:
            d['token'] = self.get_auth_token()

        return d

    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):
        '''
        Download the waveforms for a list of orders.

        The orders are sorted in place and processed in batches. The outcome
        of each order is reported through the given callbacks and a summary
        of all errors is logged as warnings at the end.

        :param orders:
            List of waveform orders (sorted in place).
        :param success:
            Called as ``success(order, trs)`` for each order for which at
            least one trace has been obtained.
        :param batch_add:
            Called with the list of file paths written for a batch.
        :param error_permanent:
            Called as ``error_permanent(order)`` for failures which are not
            expected to go away.
        :param error_temporary:
            Called as ``error_temporary(order)`` for failures which may be
            worth a retry.
        '''
        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20
        norders = len(orders)
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, norders)

        try:
            for i in range(0, norders, neach):
                self._download_batch(
                    orders[i:i+neach], elog, success, batch_add,
                    error_permanent, error_temporary)

                # The last batch may be shorter than neach: do not report
                # more progress than there are orders.
                task.update(min(i + neach, norders))

            for agg in elog.iter_aggregates():
                logger.warning(str(agg))

        finally:
            # Also finish the progress task if something unexpected
            # propagates out of the loop.
            task.done()

    def _download_batch(
            self, orders_now, elog, success, batch_add, error_permanent,
            error_temporary):
        '''
        Download, validate and archive the waveforms of one batch of orders.

        Parameters are as for :py:meth:`download_waveforms`, except that
        ``orders_now`` is the batch to process and ``elog`` is the
        :py:class:`ErrorLog` to which failures are appended.
        '''

        def report_failure(order):
            # Orders flagged as near-real-time are reported as temporary
            # failures, all others as permanent ones.
            if order.is_near_real_time():
                error_temporary(order)
            else:
                error_permanent(order)

        selection_now = orders_to_selection(orders_now)
        nsamples_estimate = sum(
            order.estimate_nsamples() for order in orders_now)

        # abort if we get way more data than expected
        nbytes_max = max(1024 * 1000, nsamples_estimate * 4 * 10)

        nsuccess = 0
        elog.append_checkpoint()
        self._log_info_data(
            'downloading, %s' % order_summary(orders_now))

        all_paths = []
        with tempfile.TemporaryDirectory() as tmpdir:
            try:
                data = fdsn.dataselect(
                    site=self.site, selection=selection_now,
                    **self._get_user_credentials())

                now = time.time()

                path = op.join(tmpdir, 'tmp.mseed')
                try:
                    with open(path, 'wb') as f:
                        nread = 0
                        while True:
                            buf = data.read(1024)
                            if not buf:
                                break

                            nread += len(buf)
                            f.write(buf)

                            if nread > nbytes_max:
                                raise Aborted('Too much data received.')

                finally:
                    # NOTE(review): assumes the object returned by
                    # fdsn.dataselect is a file-like response; it is
                    # released here if it offers a close() method.
                    close = getattr(data, 'close', None)
                    if close is not None:
                        close()

                trs = io.load(path)

                by_nslc = defaultdict(list)
                for tr in trs:
                    by_nslc[tr.nslc_id].append(tr)

                for order in orders_now:
                    trs_order, err_this = self._cut_traces_for_order(
                        order, by_nslc[order.codes.nslc])

                    if not trs_order:
                        if err_this is None:
                            err_this = ('empty result', '')

                        elog.append(now, order, *err_this)
                        report_failure(order)
                        continue

                    # The result is complete if a single trace spans the
                    # order's time window to within two samples.
                    tolerance = 2 * order.deltat
                    complete = (
                        len(trs_order) == 1
                        and abs(order.tmin - trs_order[0].tmin) < tolerance
                        and abs(order.tmax - trs_order[0].tmax) < tolerance)

                    if not complete:
                        if err_this:
                            elog.append(
                                now, order,
                                'partial result, %s' % err_this[0],
                                err_this[1])
                        else:
                            elog.append(now, order, 'partial result')

                    paths = self._archive.add(order, trs_order)
                    all_paths.extend(paths)

                    nsuccess += 1
                    success(order, trs_order)

            except fdsn.EmptyResult:
                now = time.time()
                for order in orders_now:
                    elog.append(now, order, 'empty result')
                    report_failure(order)

            except Aborted as e:
                now = time.time()
                for order in orders_now:
                    elog.append(now, order, 'aborted', str(e))
                    error_permanent(order)

            except util.HTTPError as e:
                now = time.time()
                for order in orders_now:
                    elog.append(now, order, 'http error', str(e))
                    error_temporary(order)

        emessage = elog.summarize_recent()

        self._log_info_data(
            '%i download%s %ssuccessful' % (
                nsuccess,
                util.plural_s(nsuccess),
                '(partially) ' if emessage else '')
            + (', %s' % emessage if emessage else ''))

        if all_paths:
            batch_add(all_paths)

    def _cut_traces_for_order(self, order, traces):
        '''
        Validate traces against an order and cut them to its time window.

        :returns:
            Tuple ``(trs_order, err_this)``: the list of successfully cut
            traces and either ``None`` or a ``(kind, details)`` tuple
            describing the last problem encountered.
        '''
        trs_order = []
        err_this = None
        for tr in traces:
            try:
                order.validate(tr)
                trs_order.append(tr.chop(
                    order.tmin, order.tmax, inplace=False))

            except trace.NoData:
                err_this = ('empty result', 'empty sub-interval')

            except InvalidWaveform as e:
                err_this = ('invalid waveform', str(e))

        return trs_order, err_this

    def _do_response_query(self, selection):
        '''
        Query response level metadata for a selection of channels.

        :returns:
            The result of ``fdsn.station``, or an empty
            ``stationxml.FDSNStationXML`` placeholder if the site reports
            an empty result.
        '''
        extra_args = {}

        if self.site not in sites_not_supporting['includerestricted']:
            extra_args.update(includerestricted=self._has_credentials())

        self._log_responses('querying...')

        try:
            response_sx = fdsn.station(
                site=self.site,
                level='response',
                selection=selection,
                **extra_args)

            self._hotfix('response', response_sx)
            return response_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def update_response_inventory(self, squirrel, constraint):
        '''
        Make sure instrument responses are available for matching channels.

        For every channel matching the constraint whose response file is
        missing or expired, the response is queried (in batches) and stored
        in one file per channel. A status summary is logged and new or
        refreshed files are added to ``squirrel``.

        :param squirrel:
            The Squirrel instance to query and to add response files to.
        :param constraint:
            Constraint whose ``codes`` select the channels.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts(
            'channel', path=cpath, codes=constraint.codes)

        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes.nslc
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if os.path.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        # Written for channels for which the site returns nothing.
        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        neach = 100
        fns = []
        for i in range(0, len(selection), neach):
            selection_now = selection[i:i+neach]

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                # Deliberately broad: a failing batch is reported in the
                # status summary and must not stop the remaining batches.
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)
                    response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            if 'failed' in k:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(
                        CodesNSLCE(tup) for tup in status[k])),
                target=log_target)

        if fns:
            squirrel.add(fns, kinds=['response'])

881 

882 

# Public names of this module.
__all__ = ['FDSNSource']