1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import time 

7import os 

8import copy 

9import logging 

10import tempfile 

11import importlib.util 

12from collections import defaultdict 

13try: 

14 import cPickle as pickle 

15except ImportError: 

16 import pickle 

17import os.path as op 

18from .base import Source, Constraint 

19from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \ 

20 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \ 

21 codes_to_str_abbreviated, CodesNSLCE 

22from ..database import ExecuteGet1Error 

23from pyrocko.squirrel.error import SquirrelError 

24from pyrocko.client import fdsn 

25 

26from pyrocko import util, trace, io 

27from pyrocko.io.io_common import FileLoadError 

28from pyrocko.io import stationxml 

29from pyrocko.progress import progress 

30from pyrocko import has_paths 

31 

32from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \ 

33 Duration, Bool, clone 

34 

guts_prefix = 'squirrel'

# Override the module-level timeout setting of the FDSN client. Note that
# this is a module attribute assignment and therefore affects all users of
# pyrocko.client.fdsn in this process.
fdsn.g_timeout = 60.0

logger = logging.getLogger('psq.client.fdsn')

# Query parameters not understood by some FDSN sites, mapped to the names of
# the sites for which they must not be sent.
sites_not_supporting = dict(
    startbefore=['geonet'],
    includerestricted=['geonet'],
)

44 

45 

def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are forwarded unchanged to ``progress.task``.
    '''
    new_task = progress.task(*args, logger=logger)
    return new_task

48 

49 

def diff(fn_a, fn_b):
    '''
    Check whether two files differ in content.

    :returns: ``True`` if the files have different sizes, different content,
        or if either of them cannot be stat'ed; ``False`` if their contents
        are byte-identical.
    '''
    try:
        sizes_differ = os.stat(fn_a).st_size != os.stat(fn_b).st_size
    except OSError:
        # Missing/inaccessible file: treat as different.
        return True

    if sizes_differ:
        return True

    with open(fn_a, 'rb') as file_a, open(fn_b, 'rb') as file_b:
        while True:
            chunk_a = file_a.read(1024)
            chunk_b = file_b.read(1024)
            if chunk_a != chunk_b:
                return True

            # Equal sizes were checked above, so both hit EOF together.
            if not chunk_a or not chunk_b:
                return False

68 

69 

def move_or_keep(fn_temp, fn):
    '''
    Install a freshly written temporary file at its final location.

    If ``fn`` already exists with identical content, the temporary file is
    deleted and ``fn`` is left untouched. Otherwise ``fn_temp`` is moved to
    ``fn``, replacing any previous file.

    :param fn_temp: path of the newly written temporary file
    :param fn: final destination path
    :returns: ``'new'``, ``'updated'`` or ``'upstream unchanged'``
    '''
    if not op.exists(fn):
        # os.replace, unlike os.rename, also overwrites an existing target on
        # Windows (e.g. when a concurrent process created fn meanwhile).
        os.replace(fn_temp, fn)
        return 'new'

    if diff(fn, fn_temp):
        os.replace(fn_temp, fn)
        return 'updated'

    os.unlink(fn_temp)
    return 'upstream unchanged'

84 

85 

class Archive(Object):
    '''
    Base class for local storage of downloaded waveforms.

    Subclasses must override :py:meth:`add`.
    '''

    def add(self):
        '''
        Store data in the archive; not implemented in the base class.
        '''
        raise NotImplementedError()

90 

91 

class MSeedArchive(Archive):
    '''
    Waveform archive writing traces as MiniSEED files below a base directory.

    File paths are generated from :py:attr:`template`, expanded by
    ``io.save`` with the trace attributes plus the time span of the waveform
    order (``block_tmin_us``, ``block_tmax_us``).
    '''

    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        + '_%(block_tmin_us)s_%(block_tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)
        # add() joins this with the template, so set_base_path() must be
        # called before add().
        self._base_path = None

    def set_base_path(self, path):
        '''
        Set the directory below which files are written.
        '''
        self._base_path = path

    def add(self, order, trs):
        '''
        Save the traces belonging to a waveform order.

        :param order: waveform order providing ``tmin`` and ``tmax``
        :param trs: traces to be saved
        :returns: the return value of ``io.save`` (the caller collects it as
            a sequence of written paths)
        '''
        path_template = op.join(self._base_path, self.template)
        time_format = '%Y-%m-%d_%H-%M-%S.6FRAC'
        block_times = {
            'block_tmin_us': util.time_to_str(order.tmin, format=time_format),
            'block_tmax_us': util.time_to_str(order.tmax, format=time_format),
        }

        return io.save(
            trs, path_template, overwrite=True, additional=block_times)

113 

114 

def combine_selections(selection):
    '''
    Merge consecutive selection rows which continue each other seamlessly.

    Rows are ``(net, sta, loc, cha, tmin, tmax)`` tuples. A row is merged into
    its predecessor when both have the same codes and the row starts exactly
    where the predecessor ends. Only direct neighbours are compared, so the
    input should be sorted by codes and time.

    :returns: list of merged rows
    '''
    merged = []
    current = None
    for row in selection:
        extends_current = (
            current
            and row[:4] == current[:4]
            and row[4] == current[5])

        if extends_current:
            current = current[:5] + (row[5],)
            continue

        if current:
            merged.append(current)

        current = row

    if current:
        merged.append(current)

    return merged

131 

132 

def orders_sort_key(order):
    '''
    Sort key for waveform orders: by codes first, then by start time.
    '''
    key = (order.codes, order.tmin)
    return key

135 

136 

def orders_to_selection(orders, pad=1.0):
    '''
    Convert waveform orders into a merged, padded FDSN selection list.

    Orders are sorted by codes and start time. Contiguous orders of the same
    channel are merged, and each resulting time window is widened by ``pad``
    sampling intervals on both sides.

    :param orders: iterable of waveform orders
    :param pad: padding in units of the channel's sampling interval
    :returns: list of ``(net, sta, loc, cha, tmin, tmax)`` tuples
    '''
    deltats = {}
    rows = []
    for order in sorted(orders, key=orders_sort_key):
        nslc = order.codes.nslc
        rows.append(nslc + (order.tmin, order.tmax))
        # If orders of one channel disagree, the last one wins.
        deltats[nslc] = order.deltat

    padded = []
    for row in combine_selections(rows):
        nslc, tmin, tmax = row[:4], row[4], row[5]
        margin = pad * deltats[nslc]
        padded.append(nslc + (tmin - margin, tmax + margin))

    return padded

153 

154 

class ErrorEntry(Object):
    '''
    Record of a single failed or incomplete waveform download.
    '''

    # Time at which the error was registered.
    time = Timestamp.T()
    # The affected waveform order.
    order = WaveformOrder.T()
    # Short error category, e.g. 'empty result' or 'http error'.
    kind = String.T()
    # Additional free-form information, e.g. an exception message.
    details = String.T(optional=True)

160 

161 

class ErrorAggregate(Object):
    '''
    Summary of all download errors of one kind (and details) at a site.
    '''

    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes = List.T(CodesNSLCE.T())
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        if self.codes:
            codes_block = '\n' + util.ewrap(
                [str(code) for code in self.codes], indent=' ')
        else:
            codes_block = '<none>'

        span_lines = (
            '%s - %s' % (util.time_to_str(span[0]), util.time_to_str(span[1]))
            for span in self.time_spans)

        spans_block = '\n' + util.ewrap(span_lines, indent=' ')

        if self.details:
            details_block = ' Details: %s\n' % self.details
        else:
            details_block = ''

        template = ('FDSN "%s": download error summary for "%s" (%i)\n%s '
                    'Codes:%s\n Time spans:%s')

        return template % (
            self.site,
            self.kind,
            len(self.entries),
            details_block,
            codes_block,
            spans_block)

186 

187 

class ErrorLog(Object):
    '''
    Collection of the download errors of an FDSN site.

    Checkpoints remember positions in the list of entries, so that the errors
    registered since the latest checkpoint can be summarized.
    '''

    site = String.T()
    entries = List.T(ErrorEntry.T())
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        '''
        Mark the current end of the list of entries.
        '''
        self.checkpoints.append(len(self.entries))

    def append(self, time, order, kind, details=''):
        '''
        Register an error for a waveform order.
        '''
        self.entries.append(
            ErrorEntry(time=time, order=order, kind=kind, details=details))

    def iter_aggregates(self):
        '''
        Yield one :py:class:`ErrorAggregate` per distinct (kind, details).

        Aggregates are yielded in sorted order of (kind, details).
        '''
        groups = defaultdict(list)
        for entry in self.entries:
            groups[entry.kind, entry.details].append(entry)

        for key in sorted(groups):
            kind, details = key
            group = groups[key]
            orders = [entry.order for entry in group]
            selection = orders_to_selection(orders)
            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=group,
                codes=sorted(set(order.codes for order in orders)),
                time_spans=sorted(set(row[-2:] for row in selection)))

    def summarize_recent(self):
        '''
        Summarize the errors registered since the latest checkpoint.

        :returns: string like ``'3 errors (empty result; http error)'``, or an
            empty string if there are none
        '''
        start = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[start:]
        if not recent:
            return ''

        kinds = sorted(set(entry.kind for entry in recent))
        return '%i error%s (%s)' % (
            len(recent), util.plural_s(recent), '; '.join(kinds))

229 

230 

class Aborted(SquirrelError):
    '''
    Raised to abort a waveform download, e.g. when far more data than
    expected is received.
    '''

233 

234 

class FDSNSource(Source, has_paths.HasPaths):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel` allows
    the latter to download station and waveform data from an FDSN web service
    should the data not already happen to be available locally.
    '''

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             "environment's cache directory is used.")

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    hotfix_module_path = has_paths.Path.T(
        optional=True,
        help='Path to Python module to locally patch metadata errors.')

    def __init__(self, site, query_args=None, **kwargs):
        Source.__init__(self, site=site, query_args=query_args, **kwargs)

        self._constraint = None
        self._hash = self.make_hash()
        self._source_id = 'client:fdsn:%s' % self._hash
        self._error_infos = []
        # Loaded lazily by _hotfix(). Initialized here so that _hotfix() does
        # not fail with AttributeError if called before setup().
        self._hotfix_module = None

    def describe(self):
        '''
        Get the source identifier (``'client:fdsn:<hash>'``).
        '''
        return self._source_id

    def make_hash(self):
        '''
        Compute a hash of the connection configuration.

        Includes the site, whether a token is configured, the user name (not
        the password) and the query arguments. The hash separates the cache
        directories of differently configured sources.
        '''
        s = self.site
        s += 'notoken' \
            if (self.auth_token is None and self.auth_token_path is None) \
            else 'token'

        if self.user_credentials is not None:
            s += self.user_credentials[0]
        else:
            s += 'nocred'

        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        return self._hash

    def get_auth_token(self):
        '''
        Get the authentication token to be presented to the FDSN server.

        :returns: :py:gattr:`auth_token` if set, otherwise the ASCII-decoded
            content of the file at :py:gattr:`auth_token_path`
        :raises: :py:exc:`FileLoadError` if the token file cannot be read;
            ``Exception`` if neither a token nor a token file is configured
        '''
        if self.auth_token:
            return self.auth_token

        elif self.auth_token_path is not None:
            try:
                with open(self.auth_token_path, 'rb') as f:
                    return f.read().decode('ascii')

            except OSError as e:
                raise FileLoadError(
                    'Cannot load auth token file (%s): %s'
                    % (self.auth_token_path, str(e)))

        else:
            raise Exception(
                'FDSNSource: neither auth_token nor auth_token_path is set.')

    def _has_auth(self):
        '''
        Check whether any kind of authentication is configured.
        '''
        return (
            self.user_credentials is not None
            or self.auth_token is not None
            or self.auth_token_path is not None)

    def _get_includerestricted_args(self):
        '''
        Get the ``includerestricted`` query argument for station queries.

        Restricted data is requested whenever authentication is configured.
        Sites listed in ``sites_not_supporting['includerestricted']`` get no
        such argument.
        '''
        if self.site in sites_not_supporting['includerestricted']:
            return {}

        return dict(includerestricted=self._has_auth())

    def setup(self, squirrel, check=True):
        '''
        Prepare the cache directories and register cached data with Squirrel.
        '''
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()
        self._archive = MSeedArchive()
        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive.set_base_path(waveforms_path)

        squirrel.add(waveforms_path, check=check)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if os.path.exists(responses_path):
            squirrel.add(responses_path, kinds=['response'])

        self._hotfix_module = None

    def _hotfix(self, query_type, sx):
        '''
        Apply the user-supplied metadata hotfix hook to ``sx``.

        If :py:gattr:`hotfix_module_path` is set, the module at that path is
        executed on first use. Its function ``stationxml_<query_type>_hook``
        is then called with ``sx``. Callers ignore the return value, so hooks
        are presumably expected to modify ``sx`` in place.

        Note: this executes arbitrary code from a user-configured path.
        '''
        if self.hotfix_module_path is None:
            return

        if self._hotfix_module is None:
            module_path = self.expand_path(self.hotfix_module_path)
            spec = importlib.util.spec_from_file_location(
                'hotfix_' + self._hash, module_path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            # Only cache the module once it has been executed successfully.
            self._hotfix_module = module

        hook = getattr(
            self._hotfix_module, 'stationxml_' + query_type + '_hook')

        return hook(sx)

    def _get_constraint_path(self):
        return op.join(self._cache_path, self._hash, 'constraint.pickle')

    def _get_channels_path(self):
        return op.join(self._cache_path, self._hash, 'channels.stationxml')

    def _get_responses_path(self, nslc=None):
        '''
        Get the response cache directory, or the file for channel ``nslc``.
        '''
        dirpath = op.join(
            self._cache_path, self._hash, 'responses')

        if nslc is None:
            return dirpath
        else:
            return op.join(
                dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)

    def _get_waveforms_path(self):
        '''
        Get the waveform directory, shared between sources or per-config.
        '''
        if self.shared_waveforms:
            return op.join(self._cache_path, 'waveforms')
        else:
            return op.join(self._cache_path, self._hash, 'waveforms')

    def _log_meta(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" metadata:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_responses(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" responses:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_info_data(self, *args):
        log_prefix = 'FDSN "%s" waveforms:' % self.site
        logger.info(' '.join((log_prefix,) + args))

    def _str_expires(self, t, now):
        '''
        Format the expiration time ``t`` relative to ``now`` for logging.
        '''
        if t is None:
            return 'expires: never'
        else:
            expire = 'expires' if t > now else 'expired'
            return '%s: %s' % (
                expire,
                util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))

    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Refresh the cached channel metadata if needed and add it to Squirrel.

        The cached inventory is reused if the constraint it was obtained with
        contains ``constraint`` and it has not expired. Otherwise the FDSN
        station service is queried with the cached constraint expanded by
        ``constraint``. Local file-system errors are logged, not raised.
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                fn_temp = fn + '.%i.temp' % os.getpid()
                channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    try:
                        squirrel.get_database().silent_touch(fn)
                    except ExecuteGet1Error:
                        # File not yet known to the database; it is added
                        # below in any case.
                        pass

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

    def _do_channel_query(self, constraint):
        '''
        Query channel-level station metadata (text format) for a constraint.

        :returns: the (hotfixed) station metadata, or an empty dummy
            ``FDSNStationXML`` if the server reports an empty result
        '''
        extra_args = {}

        if self.site in sites_not_supporting['startbefore']:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['starttime'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['endtime'] = constraint.tmax

        else:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['endafter'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['startbefore'] = constraint.tmax

        extra_args.update(self._get_includerestricted_args())

        if self.query_args is not None:
            extra_args.update(self.query_args)

        self._log_meta('querying...')

        try:
            channel_sx = fdsn.station(
                site=self.site,
                format='text',
                level='channel',
                **extra_args)

            self._hotfix('channel', channel_sx)

            return channel_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def _load_constraint(self):
        '''
        Load the constraint of the cached channel inventory, if available.

        The pickle is read from this source's own cache directory. An
        unreadable file is ignored (with a warning), which forces a fresh
        channel query instead of making the source unusable.
        '''
        self._constraint = None
        fn = self._get_constraint_path()
        if not op.exists(fn):
            return

        try:
            with open(fn, 'rb') as f:
                self._constraint = pickle.load(f)

        except Exception as e:
            # pickle.load may raise various exception types on damaged data
            # (UnpicklingError, EOFError, AttributeError, ImportError, ...).
            self._log_meta(
                'ignoring unreadable constraint cache file (%s): %s'
                % (fn, str(e)),
                target=logger.warning)

    def _dump_constraint(self):
        '''
        Store the current constraint next to the cached channel inventory.
        '''
        fn = self._get_constraint_path()
        fn_temp = fn + '.%i.temp' % os.getpid()
        with open(fn_temp, 'wb') as f:
            pickle.dump(self._constraint, f, protocol=2)

        # Replace in one step so that an interrupted write can never leave a
        # truncated constraint file behind.
        os.replace(fn_temp, fn)

    def _get_expiration_time(self, path):
        '''
        Get the time at which the file at ``path`` expires.

        :returns: ``None`` if :py:gattr:`expires` is not set, ``0.0`` if the
            file cannot be stat'ed (i.e. treated as expired), otherwise the
            file's modification time plus :py:gattr:`expires`
        '''
        if self.expires is None:
            return None

        try:
            t = os.stat(path).st_mtime
            return t + self.expires

        except OSError:
            return 0.0

    def _get_channels_expiration_time(self):
        return self._get_expiration_time(self._get_channels_path())

    def update_waveform_promises(self, squirrel, constraint):
        '''
        Create waveform promises for data not yet available locally.

        For each channel epoch of the channel inventory matching
        ``constraint``, promises are created for the gaps in the locally
        available waveform coverage, limited to times before now. Gaps
        shorter than two samples are ignored.
        '''
        from ..base import gaps
        now = time.time()
        cpath = os.path.abspath(self._get_channels_path())

        ctmin = constraint.tmin
        ctmax = constraint.tmax

        nuts = squirrel.iter_nuts(
            'channel',
            path=cpath,
            codes=constraint.codes,
            tmin=ctmin,
            tmax=ctmax)

        coverages = squirrel.get_coverage(
            'waveform',
            codes=constraint.codes if constraint.codes else None,
            tmin=ctmin,
            tmax=ctmax)

        codes_to_avail = defaultdict(list)
        for coverage in coverages:
            for tmin, tmax, _ in coverage.iter_spans():
                codes_to_avail[coverage.codes].append((tmin, tmax))

        def sgaps(nut):
            for tmin, tmax in gaps(
                    codes_to_avail[nut.codes],
                    max(ctmin, nut.tmin) if ctmin is not None else nut.tmin,
                    min(ctmax, nut.tmax) if ctmax is not None else nut.tmax):

                subnut = clone(nut)
                subnut.tmin = tmin
                subnut.tmax = tmax

                # ignore 1-sample gaps produced by rounding errors
                if subnut.tmax - subnut.tmin < 2*subnut.deltat:
                    continue

                yield subnut

        def wanted(nuts):
            for nut in nuts:
                if nut.tmin < now:
                    if nut.tmax > now:
                        nut.tmax = now

                    for nut in sgaps(nut):
                        yield nut

        path = self._source_id
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
            virtual_paths=[path])

    def remove_waveform_promises(self, squirrel, from_database='selection'):
        '''
        Remove waveform promises from live selection or global database.

        :param from_database:
            Remove from live selection ``'selection'`` or global database
            ``'global'``.
        '''

        path = self._source_id
        if from_database == 'selection':
            squirrel.remove(path)
        elif from_database == 'global':
            squirrel.get_database().remove(path)
        else:
            raise ValueError(
                'Values allowed for from_database: ("selection", "global")')

    def _get_user_credentials(self):
        '''
        Get the authentication keyword arguments for FDSN requests.
        '''
        d = {}
        if self.user_credentials is not None:
            d['user'], d['passwd'] = self.user_credentials

        if self.auth_token is not None or self.auth_token_path is not None:
            d['token'] = self.get_auth_token()

        return d

    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):

        '''
        Download waveforms for the given orders, in batches of 20 orders.

        ``orders`` is sorted in place. Depending on the outcome, orders are
        reported through ``success`` (also for partial results),
        ``error_permanent`` or ``error_temporary``. Newly archived files are
        passed to ``batch_add``. An error summary is logged at the end.
        '''

        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        try:
            for i in range(0, len(orders), neach):
                self._download_batch(
                    orders[i:i+neach], elog, success, batch_add,
                    error_permanent, error_temporary)

                task.update(i + neach)

            for agg in elog.iter_aggregates():
                logger.warning(str(agg))

        finally:
            # Finish the progress task even if an unexpected error escapes.
            task.done()

    def _download_batch(
            self, orders_now, elog, success, batch_add, error_permanent,
            error_temporary):

        '''
        Download, validate and archive one batch of waveform orders.
        '''

        selection_now = orders_to_selection(orders_now)
        nsamples_estimate = sum(
            order.estimate_nsamples() for order in orders_now)

        nsuccess = 0
        elog.append_checkpoint()
        self._log_info_data(
            'downloading, %s' % order_summary(orders_now))

        all_paths = []
        with tempfile.TemporaryDirectory() as tmpdir:
            try:
                data = fdsn.dataselect(
                    site=self.site, selection=selection_now,
                    **self._get_user_credentials())

                now = time.time()

                path = op.join(tmpdir, 'tmp.mseed')
                try:
                    with open(path, 'wb') as f:
                        nread = 0
                        while True:
                            buf = data.read(1024)
                            nread += len(buf)
                            if not buf:
                                break
                            f.write(buf)

                            # abort if we get way more data than expected
                            if nread > max(
                                    1024 * 1000,
                                    nsamples_estimate * 4 * 10):

                                raise Aborted('Too much data received.')

                finally:
                    # Release the HTTP connection, also when aborting.
                    data.close()

                trs = io.load(path)

                by_nslc = defaultdict(list)
                for tr in trs:
                    by_nslc[tr.nslc_id].append(tr)

                for order in orders_now:
                    trs_order = []
                    err_this = None
                    for tr in by_nslc[order.codes.nslc]:
                        try:
                            order.validate(tr)
                            trs_order.append(tr.chop(
                                order.tmin, order.tmax, inplace=False))

                        except trace.NoData:
                            err_this = (
                                'empty result', 'empty sub-interval')

                        except InvalidWaveform as e:
                            err_this = ('invalid waveform', str(e))

                    if len(trs_order) == 0:
                        if err_this is None:
                            err_this = ('empty result', '')

                        elog.append(now, order, *err_this)
                        error_permanent(order)
                    else:
                        def tsame(ta, tb):
                            return abs(tb - ta) < 2 * order.deltat

                        if len(trs_order) != 1 \
                                or not tsame(
                                    trs_order[0].tmin, order.tmin) \
                                or not tsame(
                                    trs_order[0].tmax, order.tmax):

                            if err_this:
                                elog.append(
                                    now, order,
                                    'partial result, %s' % err_this[0],
                                    err_this[1])
                            else:
                                elog.append(now, order, 'partial result')

                        paths = self._archive.add(order, trs_order)
                        all_paths.extend(paths)

                        nsuccess += 1
                        success(order)

            except fdsn.EmptyResult:
                now = time.time()
                for order in orders_now:
                    elog.append(now, order, 'empty result')
                    error_permanent(order)

            except Aborted as e:
                now = time.time()
                for order in orders_now:
                    elog.append(now, order, 'aborted', str(e))
                    error_permanent(order)

            except util.HTTPError as e:
                now = time.time()
                for order in orders_now:
                    elog.append(now, order, 'http error', str(e))
                    error_temporary(order)

        emessage = elog.summarize_recent()

        self._log_info_data(
            '%i download%s %ssuccessful' % (
                nsuccess,
                util.plural_s(nsuccess),
                '(partially) ' if emessage else '')
            + (', %s' % emessage if emessage else ''))

        if all_paths:
            batch_add(all_paths)

    def _do_response_query(self, selection):
        '''
        Query response-level station metadata for a selection.

        :returns: the (hotfixed) station metadata, or an empty dummy
            ``FDSNStationXML`` if the server reports an empty result
        '''
        extra_args = self._get_includerestricted_args()

        self._log_responses('querying...')

        try:
            response_sx = fdsn.station(
                site=self.site,
                level='response',
                selection=selection,
                **extra_args)

            self._hotfix('response', response_sx)
            return response_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def update_response_inventory(self, squirrel, constraint):
        '''
        Fetch missing or expired instrument responses and add them to Squirrel.

        Covers the channels of the cached channel inventory matching
        ``constraint``. Up to 100 channels are queried per request, and one
        file per channel is kept in the response cache directory. Failed
        queries are logged as errors and not raised.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts(
            'channel', path=cpath, codes=constraint.codes)

        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes.nslc
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if os.path.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        neach = 100
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)
                    response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            if k.find('failed') != -1:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(
                        CodesNSLCE(tup) for tup in status[k])),
                target=log_target)

        if fns:
            squirrel.add(fns, kinds=['response'])

881 

# Names exported by ``from ... import *``.
__all__ = ['FDSNSource']