Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/client/fdsn.py: 84%

492 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-25 15:33 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Squirrel client to access FDSN web services for seismic waveforms and metadata. 

8''' 

9 

10import time 

11import os 

12import copy 

13import logging 

14import tempfile 

15import importlib.util 

16from collections import defaultdict 

17try: 

18 import cPickle as pickle 

19except ImportError: 

20 import pickle 

21import os.path as op 

22from .base import Source, Constraint 

23from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \ 

24 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \ 

25 codes_to_str_abbreviated, CodesNSLCE 

26from ..database import ExecuteGet1Error 

27from pyrocko.squirrel.error import SquirrelError 

28from pyrocko.client import fdsn 

29 

30from pyrocko import util, trace, io 

31from pyrocko.io.io_common import FileLoadError 

32from pyrocko.io import stationxml 

33from pyrocko.progress import progress 

34from pyrocko import has_paths 

35 

36from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \ 

37 Duration, Bool, clone 

38 

39guts_prefix = 'squirrel' 

40 

41fdsn.g_timeout = 60. 

42 

43logger = logging.getLogger('psq.client.fdsn') 

44 

45sites_not_supporting = { 

46 'startbefore': ['geonet'], 

47 'includerestricted': ['geonet']} 

48 

49 

50def make_task(*args): 

51 return progress.task(*args, logger=logger) 

52 

53 

54def diff(fn_a, fn_b): 

55 try: 

56 if os.stat(fn_a).st_size != os.stat(fn_b).st_size: 

57 return True 

58 

59 except OSError: 

60 return True 

61 

62 with open(fn_a, 'rb') as fa: 

63 with open(fn_b, 'rb') as fb: 

64 while True: 

65 a = fa.read(1024) 

66 b = fb.read(1024) 

67 if a != b: 

68 return True 

69 

70 if len(a) == 0 or len(b) == 0: 

71 return False 

72 

73 

74def move_or_keep(fn_temp, fn): 

75 if op.exists(fn): 

76 if diff(fn, fn_temp): 

77 os.rename(fn_temp, fn) 

78 status = 'updated' 

79 else: 

80 os.unlink(fn_temp) 

81 status = 'upstream unchanged' 

82 

83 else: 

84 os.rename(fn_temp, fn) 

85 status = 'new' 

86 

87 return status 

88 

89 

90class Archive(Object): 

91 

92 def add(self): 

93 raise NotImplementedError() 

94 

95 

96class MSeedArchive(Archive): 

97 template = String.T(default=op.join( 

98 '%(tmin_year)s', 

99 '%(tmin_month)s', 

100 '%(tmin_day)s', 

101 'trace_%(network)s_%(station)s_%(location)s_%(channel)s' 

102 + '_%(block_tmin_us)s_%(block_tmax_us)s.mseed')) 

103 

104 def __init__(self, **kwargs): 

105 Archive.__init__(self, **kwargs) 

106 self._base_path = None 

107 

108 def set_base_path(self, path): 

109 self._base_path = path 

110 

111 def add(self, order, trs): 

112 path = op.join(self._base_path, self.template) 

113 fmt = '%Y-%m-%d_%H-%M-%S.6FRAC' 

114 return io.save(trs, path, overwrite=True, additional=dict( 

115 block_tmin_us=util.time_to_str(order.tmin, format=fmt), 

116 block_tmax_us=util.time_to_str(order.tmax, format=fmt))) 

117 

118 

119def combine_selections(selection): 

120 out = [] 

121 last = None 

122 for this in selection: 

123 if last and this[:4] == last[:4] and this[4] == last[5]: 

124 last = last[:5] + (this[5],) 

125 else: 

126 if last: 

127 out.append(last) 

128 

129 last = this 

130 

131 if last: 

132 out.append(last) 

133 

134 return out 

135 

136 

137def orders_sort_key(order): 

138 return (order.codes, order.tmin) 

139 

140 

141def orders_to_selection(orders, pad=1.0): 

142 selection = [] 

143 nslc_to_deltat = {} 

144 for order in sorted(orders, key=orders_sort_key): 

145 selection.append( 

146 order.codes.nslc + (order.tmin, order.tmax)) 

147 nslc_to_deltat[order.codes.nslc] = order.deltat 

148 

149 selection = combine_selections(selection) 

150 selection_padded = [] 

151 for (net, sta, loc, cha, tmin, tmax) in selection: 

152 deltat = nslc_to_deltat[net, sta, loc, cha] 

153 selection_padded.append(( 

154 net, sta, loc, cha, tmin-pad*deltat, tmax+pad*deltat)) 

155 

156 return selection_padded 

157 

158 

159class ErrorEntry(Object): 

160 time = Timestamp.T() 

161 order = WaveformOrder.T() 

162 kind = String.T() 

163 details = String.T(optional=True) 

164 

165 

166class ErrorAggregate(Object): 

167 site = String.T() 

168 kind = String.T() 

169 details = String.T() 

170 entries = List.T(ErrorEntry.T()) 

171 codes = List.T(CodesNSLCE.T()) 

172 time_spans = List.T(Tuple.T(2, Timestamp.T())) 

173 

174 def __str__(self): 

175 codes = [str(x) for x in self.codes] 

176 scodes = '\n' + util.ewrap(codes, indent=' ') if codes else '<none>' 

177 tss = self.time_spans 

178 sspans = '\n' + util.ewrap(('%s - %s' % ( 

179 util.time_to_str(ts[0]), util.time_to_str(ts[1])) for ts in tss), 

180 indent=' ') 

181 

182 return ('FDSN "%s": download error summary for "%s" (%i)\n%s ' 

183 'Codes:%s\n Time spans:%s') % ( 

184 self.site, 

185 self.kind, 

186 len(self.entries), 

187 ' Details: %s\n' % self.details if self.details else '', 

188 scodes, 

189 sspans) 

190 

191 

192class ErrorLog(Object): 

193 site = String.T() 

194 entries = List.T(ErrorEntry.T()) 

195 checkpoints = List.T(Int.T()) 

196 

197 def append_checkpoint(self): 

198 self.checkpoints.append(len(self.entries)) 

199 

200 def append(self, time, order, kind, details=''): 

201 entry = ErrorEntry(time=time, order=order, kind=kind, details=details) 

202 self.entries.append(entry) 

203 

204 def iter_aggregates(self): 

205 by_kind_details = defaultdict(list) 

206 for entry in self.entries: 

207 by_kind_details[entry.kind, entry.details].append(entry) 

208 

209 kind_details = sorted(by_kind_details.keys()) 

210 

211 for kind, details in kind_details: 

212 entries = by_kind_details[kind, details] 

213 codes = sorted(set(entry.order.codes for entry in entries)) 

214 selection = orders_to_selection(entry.order for entry in entries) 

215 time_spans = sorted(set(row[-2:] for row in selection)) 

216 yield ErrorAggregate( 

217 site=self.site, 

218 kind=kind, 

219 details=details, 

220 entries=entries, 

221 codes=codes, 

222 time_spans=time_spans) 

223 

224 def summarize_recent(self): 

225 ioff = self.checkpoints[-1] if self.checkpoints else 0 

226 recent = self.entries[ioff:] 

227 kinds = sorted(set(entry.kind for entry in recent)) 

228 if recent: 

229 return '%i error%s (%s)' % ( 

230 len(recent), util.plural_s(recent), '; '.join(kinds)) 

231 else: 

232 return '' 

233 

234 

235class Aborted(SquirrelError): 

236 pass 

237 

238 

239class FDSNSource(Source, has_paths.HasPaths): 

240 

241 ''' 

242 Squirrel data-source to transparently get data from FDSN web services. 

243 

244 Attaching an :py:class:`FDSNSource` object to a 

245 :py:class:`~pyrocko.squirrel.base.Squirrel` allows the latter to download 

246 station and waveform data from an FDSN web service should the data not 

247 already happen to be available locally. 

248 ''' 

249 

250 site = String.T( 

251 help='FDSN site url or alias name (see ' 

252 ':py:mod:`pyrocko.client.fdsn`).') 

253 

254 query_args = Dict.T( 

255 String.T(), String.T(), 

256 optional=True, 

257 help='Common query arguments, which are appended to all queries.') 

258 

259 expires = Duration.T( 

260 optional=True, 

261 help='Expiration time [s]. Information older than this will be ' 

262 'refreshed. This only applies to station-metadata. Waveforms do ' 

263 'not expire. If set to ``None`` neither type of data expires.') 

264 

265 cache_path = String.T( 

266 optional=True, 

267 help='Directory path where any downloaded waveforms and station ' 

268 'meta-data are to be kept. By default the Squirrel ' 

269 "environment's cache directory is used.") 

270 

271 shared_waveforms = Bool.T( 

272 default=False, 

273 help='If ``True``, waveforms are shared with other FDSN sources in ' 

274 'the same Squirrel environment. If ``False``, they are kept ' 

275 'separate.') 

276 

277 user_credentials = Tuple.T( 

278 2, String.T(), 

279 optional=True, 

280 help='User and password for FDSN servers requiring password ' 

281 'authentication') 

282 

283 auth_token = String.T( 

284 optional=True, 

285 help='Authentication token to be presented to the FDSN server.') 

286 

287 auth_token_path = String.T( 

288 optional=True, 

289 help='Path to file containing the authentication token to be ' 

290 'presented to the FDSN server.') 

291 

292 hotfix_module_path = has_paths.Path.T( 

293 optional=True, 

294 help='Path to Python module to locally patch metadata errors.') 

295 

296 def __init__(self, site, query_args=None, **kwargs): 

297 Source.__init__(self, site=site, query_args=query_args, **kwargs) 

298 

299 self._constraint = None 

300 self._hash = self.make_hash() 

301 self._source_id = 'client:fdsn:%s' % self._hash 

302 self._error_infos = [] 

303 

304 def describe(self): 

305 return self._source_id 

306 

307 def make_hash(self): 

308 s = self.site 

309 s += 'notoken' \ 

310 if (self.auth_token is None and self.auth_token_path is None) \ 

311 else 'token' 

312 

313 if self.user_credentials is not None: 

314 s += self.user_credentials[0] 

315 else: 

316 s += 'nocred' 

317 

318 if self.query_args is not None: 

319 s += ','.join( 

320 '%s:%s' % (k, self.query_args[k]) 

321 for k in sorted(self.query_args.keys())) 

322 else: 

323 s += 'noqueryargs' 

324 

325 return ehash(s) 

326 

327 def get_hash(self): 

328 return self._hash 

329 

330 def get_auth_token(self): 

331 if self.auth_token: 

332 return self.auth_token 

333 

334 elif self.auth_token_path is not None: 

335 try: 

336 with open(self.auth_token_path, 'rb') as f: 

337 return f.read().decode('ascii') 

338 

339 except OSError as e: 

340 raise FileLoadError( 

341 'Cannot load auth token file (%s): %s' 

342 % (str(e), self.auth_token_path)) 

343 

344 else: 

345 raise Exception( 

346 'FDSNSource: auth_token and auth_token_path are mutually ' 

347 'exclusive.') 

348 

349 def setup(self, squirrel, check=True): 

350 self._cache_path = op.join( 

351 self.cache_path or squirrel._cache_path, 'fdsn') 

352 

353 util.ensuredir(self._cache_path) 

354 self._load_constraint() 

355 self._archive = MSeedArchive() 

356 waveforms_path = self._get_waveforms_path() 

357 util.ensuredir(waveforms_path) 

358 self._archive.set_base_path(waveforms_path) 

359 

360 squirrel.add( 

361 self._get_waveforms_path(), 

362 check=check) 

363 

364 fn = self._get_channels_path() 

365 if os.path.exists(fn): 

366 squirrel.add(fn) 

367 

368 squirrel.add_virtual( 

369 [], virtual_paths=[self._source_id]) 

370 

371 responses_path = self._get_responses_path() 

372 if os.path.exists(responses_path): 

373 squirrel.add(responses_path, kinds=['response']) 

374 

375 self._hotfix_module = None 

376 

377 def _hotfix(self, query_type, sx): 

378 if self.hotfix_module_path is None: 

379 return 

380 

381 if self._hotfix_module is None: 

382 module_path = self.expand_path(self.hotfix_module_path) 

383 spec = importlib.util.spec_from_file_location( 

384 'hotfix_' + self._hash, module_path) 

385 self._hotfix_module = importlib.util.module_from_spec(spec) 

386 spec.loader.exec_module(self._hotfix_module) 

387 

388 hook = getattr( 

389 self._hotfix_module, 'stationxml_' + query_type + '_hook') 

390 

391 return hook(sx) 

392 

393 def _get_constraint_path(self): 

394 return op.join(self._cache_path, self._hash, 'constraint.pickle') 

395 

396 def _get_channels_path(self): 

397 return op.join(self._cache_path, self._hash, 'channels.stationxml') 

398 

399 def _get_responses_path(self, nslc=None): 

400 dirpath = op.join( 

401 self._cache_path, self._hash, 'responses') 

402 

403 if nslc is None: 

404 return dirpath 

405 else: 

406 return op.join( 

407 dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc) 

408 

409 def _get_waveforms_path(self): 

410 if self.shared_waveforms: 

411 return op.join(self._cache_path, 'waveforms') 

412 else: 

413 return op.join(self._cache_path, self._hash, 'waveforms') 

414 

415 def _log_meta(self, message, target=logger.info): 

416 log_prefix = 'FDSN "%s" metadata:' % self.site 

417 target(' '.join((log_prefix, message))) 

418 

419 def _log_responses(self, message, target=logger.info): 

420 log_prefix = 'FDSN "%s" responses:' % self.site 

421 target(' '.join((log_prefix, message))) 

422 

423 def _log_info_data(self, *args): 

424 log_prefix = 'FDSN "%s" waveforms:' % self.site 

425 logger.info(' '.join((log_prefix,) + args)) 

426 

427 def _str_expires(self, t, now): 

428 if t is None: 

429 return 'expires: never' 

430 else: 

431 expire = 'expires' if t > now else 'expired' 

432 return '%s: %s' % ( 

433 expire, 

434 util.time_to_str(t, format='%Y-%m-%d %H:%M:%S')) 

435 

436 def update_channel_inventory(self, squirrel, constraint=None): 

437 if constraint is None: 

438 constraint = Constraint() 

439 

440 expiration_time = self._get_channels_expiration_time() 

441 now = time.time() 

442 

443 log_target = logger.info 

444 if self._constraint and self._constraint.contains(constraint) \ 

445 and (expiration_time is None or now < expiration_time): 

446 

447 status = 'using cached' 

448 

449 else: 

450 if self._constraint: 

451 constraint_temp = copy.deepcopy(self._constraint) 

452 constraint_temp.expand(constraint) 

453 constraint = constraint_temp 

454 

455 try: 

456 channel_sx = self._do_channel_query(constraint) 

457 

458 channel_sx.created = None # timestamp would ruin diff 

459 

460 fn = self._get_channels_path() 

461 util.ensuredirs(fn) 

462 fn_temp = fn + '.%i.temp' % os.getpid() 

463 channel_sx.dump_xml(filename=fn_temp) 

464 

465 status = move_or_keep(fn_temp, fn) 

466 

467 if status == 'upstream unchanged': 

468 squirrel.get_database().silent_touch(fn) 

469 

470 self._constraint = constraint 

471 self._dump_constraint() 

472 

473 except OSError as e: 

474 status = 'update failed (%s)' % str(e) 

475 log_target = logger.error 

476 

477 expiration_time = self._get_channels_expiration_time() 

478 self._log_meta( 

479 '%s (%s)' % (status, self._str_expires(expiration_time, now)), 

480 target=log_target) 

481 

482 fn = self._get_channels_path() 

483 if os.path.exists(fn): 

484 squirrel.add(fn) 

485 

486 def _do_channel_query(self, constraint): 

487 extra_args = {} 

488 

489 if self.site in sites_not_supporting['startbefore']: 

490 if constraint.tmin is not None and constraint.tmin != g_tmin: 

491 extra_args['starttime'] = constraint.tmin 

492 if constraint.tmax is not None and constraint.tmax != g_tmax: 

493 extra_args['endtime'] = constraint.tmax 

494 

495 else: 

496 if constraint.tmin is not None and constraint.tmin != g_tmin: 

497 extra_args['endafter'] = constraint.tmin 

498 if constraint.tmax is not None and constraint.tmax != g_tmax: 

499 extra_args['startbefore'] = constraint.tmax 

500 

501 if self.site not in sites_not_supporting['includerestricted']: 

502 extra_args.update( 

503 includerestricted=( 

504 self.user_credentials is not None 

505 or self.auth_token is not None 

506 or self.auth_token_path is not None)) 

507 

508 if self.query_args is not None: 

509 extra_args.update(self.query_args) 

510 

511 self._log_meta('querying...') 

512 

513 try: 

514 channel_sx = fdsn.station( 

515 site=self.site, 

516 format='text', 

517 level='channel', 

518 **extra_args) 

519 

520 self._hotfix('channel', channel_sx) 

521 

522 return channel_sx 

523 

524 except fdsn.EmptyResult: 

525 return stationxml.FDSNStationXML(source='dummy-empty-result') 

526 

527 def _load_constraint(self): 

528 fn = self._get_constraint_path() 

529 if op.exists(fn): 

530 with open(fn, 'rb') as f: 

531 self._constraint = pickle.load(f) 

532 else: 

533 self._constraint = None 

534 

535 def _dump_constraint(self): 

536 with open(self._get_constraint_path(), 'wb') as f: 

537 pickle.dump(self._constraint, f, protocol=2) 

538 

539 def _get_expiration_time(self, path): 

540 if self.expires is None: 

541 return None 

542 

543 try: 

544 t = os.stat(path)[8] 

545 return t + self.expires 

546 

547 except OSError: 

548 return 0.0 

549 

550 def _get_channels_expiration_time(self): 

551 return self._get_expiration_time(self._get_channels_path()) 

552 

553 def update_waveform_promises(self, squirrel, constraint): 

554 from ..base import gaps 

555 cpath = os.path.abspath(self._get_channels_path()) 

556 

557 ctmin = constraint.tmin 

558 ctmax = constraint.tmax 

559 

560 nuts = squirrel.iter_nuts( 

561 'channel', 

562 path=cpath, 

563 codes=constraint.codes, 

564 tmin=ctmin, 

565 tmax=ctmax) 

566 

567 coverages = squirrel.get_coverage( 

568 'waveform', 

569 codes=constraint.codes if constraint.codes else None, 

570 tmin=ctmin, 

571 tmax=ctmax) 

572 

573 codes_to_avail = defaultdict(list) 

574 for coverage in coverages: 

575 for tmin, tmax, _ in coverage.iter_spans(): 

576 codes_to_avail[coverage.codes].append((tmin, tmax)) 

577 

578 def sgaps(nut): 

579 for tmin, tmax in gaps( 

580 codes_to_avail[nut.codes], 

581 max(ctmin, nut.tmin) if ctmin is not None else nut.tmin, 

582 min(ctmax, nut.tmax) if ctmax is not None else nut.tmax): 

583 

584 subnut = clone(nut) 

585 subnut.tmin = tmin 

586 subnut.tmax = tmax 

587 

588 # ignore 1-sample gaps produced by rounding errors 

589 if subnut.tmax - subnut.tmin < 2*subnut.deltat: 

590 continue 

591 

592 yield subnut 

593 

594 def wanted(nuts): 

595 for nut in nuts: 

596 for nut in sgaps(nut): 

597 yield nut 

598 

599 path = self._source_id 

600 squirrel.add_virtual( 

601 (make_waveform_promise_nut( 

602 file_path=path, 

603 **nut.waveform_promise_kwargs) for nut in wanted(nuts)), 

604 virtual_paths=[path]) 

605 

606 def remove_waveform_promises(self, squirrel, from_database='selection'): 

607 ''' 

608 Remove waveform promises from live selection or global database. 

609 

610 :param from_database: 

611 Remove from live selection ``'selection'`` or global database 

612 ``'global'``. 

613 ''' 

614 

615 path = self._source_id 

616 if from_database == 'selection': 

617 squirrel.remove(path) 

618 elif from_database == 'global': 

619 squirrel.get_database().remove(path) 

620 else: 

621 raise ValueError( 

622 'Values allowed for from_database: ("selection", "global")') 

623 

624 def _get_user_credentials(self): 

625 d = {} 

626 if self.user_credentials is not None: 

627 d['user'], d['passwd'] = self.user_credentials 

628 

629 if self.auth_token is not None or self.auth_token_path is not None: 

630 d['token'] = self.get_auth_token() 

631 

632 return d 

633 

634 def download_waveforms( 

635 self, orders, success, batch_add, error_permanent, 

636 error_temporary): 

637 

638 elog = ErrorLog(site=self.site) 

639 orders.sort(key=orders_sort_key) 

640 neach = 20 

641 i = 0 

642 task = make_task( 

643 'FDSN "%s" waveforms: downloading' % self.site, len(orders)) 

644 

645 while i < len(orders): 

646 orders_now = orders[i:i+neach] 

647 selection_now = orders_to_selection(orders_now) 

648 nsamples_estimate = sum( 

649 order.estimate_nsamples() for order in orders_now) 

650 

651 nsuccess = 0 

652 elog.append_checkpoint() 

653 self._log_info_data( 

654 'downloading, %s' % order_summary(orders_now)) 

655 

656 all_paths = [] 

657 with tempfile.TemporaryDirectory() as tmpdir: 

658 try: 

659 data = fdsn.dataselect( 

660 site=self.site, selection=selection_now, 

661 **self._get_user_credentials()) 

662 

663 now = time.time() 

664 

665 path = op.join(tmpdir, 'tmp.mseed') 

666 with open(path, 'wb') as f: 

667 nread = 0 

668 while True: 

669 buf = data.read(1024) 

670 nread += len(buf) 

671 if not buf: 

672 break 

673 f.write(buf) 

674 

675 # abort if we get way more data than expected 

676 if nread > max( 

677 1024 * 1000, 

678 nsamples_estimate * 4 * 10): 

679 

680 raise Aborted('Too much data received.') 

681 

682 trs = io.load(path) 

683 

684 by_nslc = defaultdict(list) 

685 for tr in trs: 

686 by_nslc[tr.nslc_id].append(tr) 

687 

688 for order in orders_now: 

689 trs_order = [] 

690 err_this = None 

691 for tr in by_nslc[order.codes.nslc]: 

692 try: 

693 order.validate(tr) 

694 trs_order.append(tr.chop( 

695 order.tmin, order.tmax, inplace=False)) 

696 

697 except trace.NoData: 

698 err_this = ( 

699 'empty result', 'empty sub-interval') 

700 

701 except InvalidWaveform as e: 

702 err_this = ('invalid waveform', str(e)) 

703 

704 if len(trs_order) == 0: 

705 if err_this is None: 

706 err_this = ('empty result', '') 

707 

708 elog.append(now, order, *err_this) 

709 if order.is_near_real_time(): 

710 error_temporary(order) 

711 else: 

712 error_permanent(order) 

713 else: 

714 def tsame(ta, tb): 

715 return abs(tb - ta) < 2 * order.deltat 

716 

717 if len(trs_order) != 1 \ 

718 or not tsame( 

719 trs_order[0].tmin, order.tmin) \ 

720 or not tsame( 

721 trs_order[0].tmax, order.tmax): 

722 

723 if err_this: 

724 elog.append( 

725 now, order, 

726 'partial result, %s' % err_this[0], 

727 err_this[1]) 

728 else: 

729 elog.append(now, order, 'partial result') 

730 

731 paths = self._archive.add(order, trs_order) 

732 all_paths.extend(paths) 

733 

734 nsuccess += 1 

735 success(order, trs_order) 

736 

737 except fdsn.EmptyResult: 

738 now = time.time() 

739 for order in orders_now: 

740 elog.append(now, order, 'empty result') 

741 if order.is_near_real_time(): 

742 error_temporary(order) 

743 else: 

744 error_permanent(order) 

745 

746 except Aborted as e: 

747 now = time.time() 

748 for order in orders_now: 

749 elog.append(now, order, 'aborted', str(e)) 

750 error_permanent(order) 

751 

752 except util.HTTPError as e: 

753 now = time.time() 

754 for order in orders_now: 

755 elog.append(now, order, 'http error', str(e)) 

756 error_temporary(order) 

757 

758 emessage = elog.summarize_recent() 

759 

760 self._log_info_data( 

761 '%i download%s %ssuccessful' % ( 

762 nsuccess, 

763 util.plural_s(nsuccess), 

764 '(partially) ' if emessage else '') 

765 + (', %s' % emessage if emessage else '')) 

766 

767 if all_paths: 

768 batch_add(all_paths) 

769 

770 i += neach 

771 task.update(i) 

772 

773 for agg in elog.iter_aggregates(): 

774 logger.warning(str(agg)) 

775 

776 task.done() 

777 

778 def _do_response_query(self, selection): 

779 extra_args = {} 

780 

781 if self.site not in sites_not_supporting['includerestricted']: 

782 extra_args.update( 

783 includerestricted=( 

784 self.user_credentials is not None 

785 or self.auth_token is not None 

786 or self.auth_token_path is not None)) 

787 

788 self._log_responses('querying...') 

789 

790 try: 

791 response_sx = fdsn.station( 

792 site=self.site, 

793 level='response', 

794 selection=selection, 

795 **extra_args) 

796 

797 self._hotfix('response', response_sx) 

798 return response_sx 

799 

800 except fdsn.EmptyResult: 

801 return stationxml.FDSNStationXML(source='dummy-empty-result') 

802 

803 def update_response_inventory(self, squirrel, constraint): 

804 cpath = os.path.abspath(self._get_channels_path()) 

805 nuts = squirrel.iter_nuts( 

806 'channel', path=cpath, codes=constraint.codes) 

807 

808 tmin = g_tmin_queries 

809 tmax = g_tmax 

810 

811 selection = [] 

812 now = time.time() 

813 have = set() 

814 status = defaultdict(list) 

815 for nut in nuts: 

816 nslc = nut.codes.nslc 

817 if nslc in have: 

818 continue 

819 have.add(nslc) 

820 

821 fn = self._get_responses_path(nslc) 

822 expiration_time = self._get_expiration_time(fn) 

823 if os.path.exists(fn) \ 

824 and (expiration_time is None or now < expiration_time): 

825 status['using cached'].append(nslc) 

826 else: 

827 selection.append(nslc + (tmin, tmax)) 

828 

829 dummy = stationxml.FDSNStationXML(source='dummy-empty') 

830 neach = 100 

831 i = 0 

832 fns = [] 

833 while i < len(selection): 

834 selection_now = selection[i:i+neach] 

835 i += neach 

836 

837 try: 

838 sx = self._do_response_query(selection_now) 

839 except Exception as e: 

840 status['update failed (%s)' % str(e)].extend( 

841 entry[:4] for entry in selection_now) 

842 continue 

843 

844 sx.created = None # timestamp would ruin diff 

845 

846 by_nslc = dict(stationxml.split_channels(sx)) 

847 

848 for entry in selection_now: 

849 nslc = entry[:4] 

850 response_sx = by_nslc.get(nslc, dummy) 

851 try: 

852 fn = self._get_responses_path(nslc) 

853 fn_temp = fn + '.%i.temp' % os.getpid() 

854 

855 util.ensuredirs(fn_temp) 

856 response_sx.dump_xml(filename=fn_temp) 

857 

858 status_this = move_or_keep(fn_temp, fn) 

859 

860 if status_this == 'upstream unchanged': 

861 try: 

862 squirrel.get_database().silent_touch(fn) 

863 except ExecuteGet1Error: 

864 pass 

865 

866 status[status_this].append(nslc) 

867 fns.append(fn) 

868 

869 except OSError as e: 

870 status['update failed (%s)' % str(e)].append(nslc) 

871 

872 for k in sorted(status): 

873 if k.find('failed') != -1: 

874 log_target = logger.error 

875 else: 

876 log_target = logger.info 

877 

878 self._log_responses( 

879 '%s: %s' % ( 

880 k, codes_to_str_abbreviated( 

881 CodesNSLCE(tup) for tup in status[k])), 

882 target=log_target) 

883 

884 if fns: 

885 squirrel.add(fns, kinds=['response']) 

886 

887 

888__all__ = [ 

889 'FDSNSource', 

890]