Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/squirrel/client/fdsn.py: 81%

525 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-12-04 10:41 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Squirrel client to access FDSN web services for seismic waveforms and metadata. 

8''' 

9 

10import time 

11import os 

12import copy 

13import logging 

14import tempfile 

15import importlib.util 

16from collections import defaultdict 

17try: 

18 import cPickle as pickle 

19except ImportError: 

20 import pickle 

21import os.path as op 

22from .base import Source, Constraint 

23from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \ 

24 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \ 

25 codes_to_str_abbreviated, CodesNSLCE 

26from .. import storage 

27from ..database import ExecuteGet1Error 

28from pyrocko.squirrel.error import SquirrelError 

29from pyrocko.client import fdsn 

30 

31from pyrocko import util, trace, io 

32from pyrocko.io.io_common import FileLoadError 

33from pyrocko.io import stationxml 

34from pyrocko import progress 

35from pyrocko import has_paths 

36 

37from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \ 

38 Duration, Bool, clone, dump_all_spickle 

39 

40 

guts_prefix = 'squirrel'

logger = logging.getLogger('psq.client.fdsn')

# Station-query parameters which must not be sent to the listed FDSN sites.
# Used when building channel and response queries.
g_sites_not_supporting = dict(
    startbefore=['geonet'],
    includerestricted=['geonet', 'ncedc', 'scedc'])

# Query arguments which cannot be combined with codes given via POST
# (checked in `FDSNSource.__init__`).
g_keys_conflicting_post_codes = set('''
    network station location channel
    minlatitude maxlatitude minlongitude maxlongitude
    latitude longitude minradius maxradius'''.split())

53 

54 

def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are forwarded unchanged to ``progress.task``.
    '''
    new_task = progress.task(*args, logger=logger)
    return new_task

57 

58 

def diff(fn_a, fn_b):
    '''
    Check whether two files differ.

    :returns:
        ``True`` if the files have different sizes, different content, or
        if either of them cannot be stat'ed; ``False`` if their content is
        identical.
    '''
    try:
        size_a = os.stat(fn_a).st_size
        size_b = os.stat(fn_b).st_size
    except OSError:
        return True

    if size_a != size_b:
        return True

    chunk_size = 1024
    with open(fn_a, 'rb') as file_a, open(fn_b, 'rb') as file_b:
        while True:
            chunk_a = file_a.read(chunk_size)
            chunk_b = file_b.read(chunk_size)
            if chunk_a != chunk_b:
                return True

            # reached end of (at least) one file without a mismatch
            if not chunk_a or not chunk_b:
                return False

77 

78 

def move_or_keep(fn_temp, fn):
    '''
    Move a freshly written temporary file into place unless it is unchanged.

    If ``fn`` exists and has the same content as ``fn_temp``, the temporary
    file is deleted and ``fn`` is left untouched. Otherwise ``fn_temp``
    replaces ``fn``.

    :returns:
        ``'new'`` if ``fn`` did not exist before, ``'updated'`` if its content
        changed, or ``'upstream unchanged'`` if the existing file was kept.
    '''
    if not op.exists(fn):
        # os.replace (unlike os.rename) overwrites an existing destination
        # on all platforms, including Windows.
        os.replace(fn_temp, fn)
        return 'new'

    if diff(fn, fn_temp):
        os.replace(fn_temp, fn)
        return 'updated'

    os.unlink(fn_temp)
    return 'upstream unchanged'

93 

94 

def combine_selections(selection):
    '''
    Merge consecutive, seamlessly adjoining selection rows.

    Each row is ``(net, sta, loc, cha, tmin, tmax)``. A row is merged into
    its predecessor when both have the same codes and the row's ``tmin``
    equals the predecessor's ``tmax``. Rows are expected to be pre-sorted.

    :returns: list of merged rows
    '''
    merged = []
    current = None
    for row in selection:
        adjoins = (
            current
            and row[:4] == current[:4]
            and row[4] == current[5])

        if adjoins:
            # extend the current span up to the end of this row
            current = current[:5] + (row[5],)
            continue

        if current:
            merged.append(current)

        current = row

    if current:
        merged.append(current)

    return merged

111 

112 

def orders_sort_key(order):
    '''
    Sort key for waveform orders: by codes first, then by start time.
    '''
    key = (order.codes, order.tmin)
    return key

115 

116 

def orders_to_selection(orders, pad=1.0):
    '''
    Convert waveform orders into a padded FDSN dataselect selection.

    Orders are sorted, adjoining time spans of the same channel are merged,
    and each resulting span is widened on both sides by ``pad`` times the
    channel's sampling interval (``deltat`` of the last order seen for that
    channel).

    :returns: list of ``(net, sta, loc, cha, tmin, tmax)`` tuples
    '''
    ordered = sorted(orders, key=orders_sort_key)
    deltat_by_nslc = {o.codes.nslc: o.deltat for o in ordered}
    raw_rows = [o.codes.nslc + (o.tmin, o.tmax) for o in ordered]

    padded_rows = []
    for net, sta, loc, cha, tmin, tmax in combine_selections(raw_rows):
        margin = pad * deltat_by_nslc[net, sta, loc, cha]
        padded_rows.append((net, sta, loc, cha, tmin - margin, tmax + margin))

    return padded_rows

133 

134 

def codes_to_selection(codes_list, tmin, tmax):
    '''
    Build a selection covering ``[tmin, tmax]`` for each given codes entry.

    :returns:
        ``None`` if ``codes_list`` is ``None``, otherwise a list of
        ``nslc + (tmin, tmax)`` tuples in sorted codes order.
    '''
    if codes_list is None:
        return None

    return [codes.nslc + (tmin, tmax) for codes in sorted(codes_list)]

145 

146 

class ErrorEntry(Object):
    '''
    Single record of a failed or incomplete waveform download.
    '''

    # Time at which the error was recorded.
    time = Timestamp.T()
    # The waveform order affected by the error.
    order = WaveformOrder.T()
    # Short error category, e.g. ``'empty result'`` or ``'http error'``.
    kind = String.T()
    # Optional free-text details, e.g. an exception message.
    details = String.T(optional=True)

152 

153 

class ErrorAggregate(Object):
    '''
    Summary of all download errors sharing the same kind and details.
    '''

    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes = List.T(CodesNSLCE.T())
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        code_strings = [str(c) for c in self.codes]
        if code_strings:
            codes_text = '\n' + util.ewrap(code_strings, indent=' ')
        else:
            codes_text = '<none>'

        span_strings = (
            '%s - %s' % (
                util.time_to_str(span[0]), util.time_to_str(span[1]))
            for span in self.time_spans)
        spans_text = '\n' + util.ewrap(span_strings, indent=' ')

        if self.details:
            details_text = ' Details: %s\n' % self.details
        else:
            details_text = ''

        template = (
            'FDSN "%s": download error summary for "%s" (%i)\n%s '
            'Codes:%s\n Time spans:%s')

        return template % (
            self.site,
            self.kind,
            len(self.entries),
            details_text,
            codes_text,
            spans_text)

178 

179 

class ErrorLog(Object):
    '''
    Collection of download errors for one FDSN site.

    Checkpoints record positions in the entry list so that the errors of the
    most recent batch can be summarized separately.
    '''

    site = String.T()
    entries = List.T(ErrorEntry.T())
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        '''
        Remember the current number of entries as start of a new batch.
        '''
        self.checkpoints.append(len(self.entries))

    def append(self, time, order, kind, details=''):
        '''
        Record a new error entry.
        '''
        self.entries.append(
            ErrorEntry(time=time, order=order, kind=kind, details=details))

    def iter_aggregates(self):
        '''
        Yield one :py:class:`ErrorAggregate` per distinct (kind, details).

        Aggregates are yielded in sorted (kind, details) order.
        '''
        groups = defaultdict(list)
        for entry in self.entries:
            groups[entry.kind, entry.details].append(entry)

        for kind, details in sorted(groups):
            group = groups[kind, details]
            group_codes = sorted({e.order.codes for e in group})
            selection = orders_to_selection(e.order for e in group)
            spans = sorted({row[-2:] for row in selection})
            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=group,
                codes=group_codes,
                time_spans=spans)

    def summarize_recent(self):
        '''
        One-line summary of errors recorded since the last checkpoint.

        :returns: summary string, or ``''`` if there were no such errors
        '''
        start = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[start:]
        if not recent:
            return ''

        kinds = sorted({e.kind for e in recent})
        return '%i error%s (%s)' % (
            len(recent), util.plural_s(recent), '; '.join(kinds))

221 

222 

class Aborted(SquirrelError):
    '''
    Raised to abort a waveform download, e.g. when far more data than
    expected is received.
    '''

225 

226 

class FDSNSource(Source, has_paths.HasPaths):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a
    :py:class:`~pyrocko.squirrel.base.Squirrel` allows the latter to download
    station and waveform data from an FDSN web service should the data not
    already happen to be available locally.
    '''

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    codes = List.T(
        CodesNSLCE.T(),
        optional=True,
        help='List of codes patterns to query via POST parameters.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    anxious = Duration.T(
        default=600.,
        help='Anxiety period [s]. Missing waveforms will not be treated as '
             'permanently missing for orders spanning into times later than '
             '(current time - anxious). Default: 600.')

    cache_path = has_paths.Path.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             "environment's cache directory is used.")

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    storage_path = has_paths.Path.T(
        optional=True,
        help='If set, manage waveforms in the given directory rather than '
             'in the Squirrel cache.')

    storage_scheme = storage.StorageSchemeChoice.T(
        default='default',
        help="Set layout of waveform storage. Available: %s. "
             "Default: ``'default'``." % ', '.join(
                 "``'%s'``" % name
                 for name in storage.StorageSchemeChoice.choices))

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = has_paths.Path.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    hotfix_module_path = has_paths.Path.T(
        optional=True,
        help='Path to Python module to locally patch metadata errors.')

    def __init__(self, site, query_args=None, codes=None, **kwargs):
        '''
        Create the source.

        :raises SquirrelError:
            if ``query_args`` contains keys which conflict with ``codes``
            given for POST queries.
        '''
        if codes:
            codes = [CodesNSLCE(codes_) for codes_ in codes]

        if codes is not None and query_args is not None:
            conflicting = g_keys_conflicting_post_codes \
                & set(query_args.keys())

            if conflicting:
                raise SquirrelError(
                    'Cannot use %s in `query_args` when `codes` are also '
                    'given.' % ' or '.join("'%s'" % k for k in conflicting))

        Source.__init__(
            self, site=site, query_args=query_args, codes=codes, **kwargs)

        self._constraint = None
        self._hash = self.make_hash()
        self._source_id = 'client:fdsn:%s' % self._hash
        self._error_infos = []
        self._cache_path = None
        # Loaded lazily by _hotfix; initialized here so that the attribute
        # exists even before setup() has been called.
        self._hotfix_module = None

    def get_cache_path(self):
        '''
        Directory holding this source's cached data (valid after setup).
        '''
        return op.join(self._cache_path, self._hash)

    def describe(self):
        '''
        Identifier string of this source, ``'client:fdsn:<hash>'``.
        '''
        return self._source_id

    def make_hash(self):
        '''
        Compute a hash identifying this source's configuration.

        Includes the site, whether a token is used, the user name (not the
        password), query arguments, POST codes and the hotfix module path.
        '''
        s = self.site
        s += 'notoken' \
            if (self.auth_token is None and self.auth_token_path is None) \
            else 'token'

        if self.user_credentials is not None:
            s += self.user_credentials[0]
        else:
            s += 'nocred'

        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        if self.codes is not None:
            s += 'post_codes:' + ','.join(
                codes.safe_str for codes in self.codes)

        if self.hotfix_module_path is not None:
            s += 'hotfix:' + self.hotfix_module_path

        return ehash(s)

    def get_hash(self):
        '''
        Hash computed by :py:meth:`make_hash` at construction time.
        '''
        return self._hash

    def get_auth_token(self):
        '''
        Get the authentication token to present to the FDSN server.

        :returns:
            Token string, taken from ``auth_token`` if that is non-empty,
            otherwise read (ASCII-decoded) from the file ``auth_token_path``.

        :raises:
            :py:exc:`~pyrocko.io.io_common.FileLoadError` if the token file
            cannot be read; :py:exc:`Exception` if no token is available.
        '''
        if self.auth_token:
            return self.auth_token

        elif self.auth_token_path is not None:
            try:
                with open(self.auth_token_path, 'rb') as f:
                    return f.read().decode('ascii')

            except OSError as e:
                # file path first, then the reason, as the template reads
                raise FileLoadError(
                    'Cannot load auth token file (%s): %s'
                    % (self.auth_token_path, str(e))) from e

        else:
            raise Exception(
                'FDSNSource: no auth token available (auth_token is empty '
                'and auth_token_path is not set).')

    def setup(self, squirrel, check=True, upgrade=False):
        '''
        Prepare cache directories and register local data with ``squirrel``.

        Archives in the old waveform layout are either converted and
        removed (``upgrade=True``) or added as-is with a warning.
        '''
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()
        self._archive = storage.get_storage_scheme(self.storage_scheme)

        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive.set_base_path(waveforms_path)

        for waveforms_path_compat in self._get_waveforms_paths_compat():
            if os.path.exists(waveforms_path_compat):

                if upgrade:
                    import shutil
                    from pyrocko.squirrel.tool.commands.jackseis \
                        import Converter

                    logger.info(
                        'Upgrading waveform archive.\n old: %s\n new: %s',
                        waveforms_path_compat,
                        waveforms_path)

                    converter = Converter(
                        in_path=waveforms_path_compat,
                        out_storage_path=waveforms_path,
                        tinc=3600.)
                    converter.set_basepath('.')

                    converter.convert(
                        squirrel_factory=lambda: squirrel,
                        append=True)

                    shutil.rmtree(waveforms_path_compat)

                else:
                    logger.warning(
                        'Waveform archive with old layout: %s\n💡 Use '
                        '`squirrel ... --upgrade-storage` or '
                        '`.add_dataset(..., upgrade=True)` '
                        'to upgrade.' % waveforms_path_compat)
                    squirrel.add(waveforms_path_compat, check=check)

        squirrel.add(
            self._get_waveforms_path(),
            check=check)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if os.path.exists(responses_path):
            squirrel.add(
                responses_path, kinds=['response'], exclude=r'\.temp$')

        self._hotfix_module = None

    def _hotfix(self, query_type, sx):
        '''
        Apply the user-supplied hotfix hook to StationXML ``sx``.

        The module at ``hotfix_module_path`` is loaded on first use and its
        function ``stationxml_<query_type>_hook`` is called with ``sx``.
        Does nothing if no hotfix module is configured.
        '''
        if self.hotfix_module_path is None:
            return

        if self._hotfix_module is None:
            module_path = self.expand_path(self.hotfix_module_path)
            spec = importlib.util.spec_from_file_location(
                'hotfix_' + self._hash, module_path)
            self._hotfix_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(self._hotfix_module)

        hook = getattr(
            self._hotfix_module, 'stationxml_' + query_type + '_hook')

        return hook(sx)

    def _get_constraint_path(self):
        return op.join(self._cache_path, self._hash, 'constraint.pickle')

    def _get_channels_path(self):
        return op.join(self._cache_path, self._hash, 'channels.stationxml')

    def _get_responses_path(self, nslc=None):
        '''
        Responses cache directory, or the file for a single ``nslc`` tuple.
        '''
        dirpath = op.join(
            self._cache_path, self._hash, 'responses')

        if nslc is None:
            return dirpath
        else:
            return op.join(
                dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)

    def _get_waveforms_path(self):
        '''
        Base directory of the (current layout) waveform archive.
        '''
        if self.storage_path:
            return self.expand_path(self.storage_path)

        if self.shared_waveforms:
            return op.join(self._cache_path, 'waveforms-v2')
        else:
            return op.join(self._cache_path, self._hash, 'waveforms-v2')

    def _get_waveforms_paths_compat(self):
        '''
        Directories where waveform archives with the old layout may exist.
        '''
        if self.shared_waveforms:
            return [op.join(self._cache_path, 'waveforms')]
        else:
            return [op.join(self._cache_path, self._hash, 'waveforms')]

    def _log_meta(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" metadata:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_responses(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" responses:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_info_data(self, *args):
        log_prefix = 'FDSN "%s" waveforms:' % self.site
        logger.info(' '.join((log_prefix,) + args))

    def _str_expires(self, t, now):
        '''
        Human-readable expiration status of expiration time ``t``.
        '''
        if t is None:
            return 'expires: never'
        else:
            expire = 'expires' if t > now else 'expired'
            return '%s: %s' % (
                expire,
                util.time_to_str(t, format='%Y-%m-%d %H:%M:%S')) 

    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Refresh the cached channel metadata if needed and add it to
        ``squirrel``.

        The cache is reused if the stored constraint contains ``constraint``
        and the cache has not expired. Otherwise the stored constraint is
        expanded by ``constraint``, channels are re-queried and the cache
        file is replaced only if its content changed.
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                fn_temp = fn + '.%i.temp' % os.getpid()

                dump_all_spickle([channel_sx], filename=fn_temp)
                # channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    squirrel.get_database().silent_touch(fn)

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

    def _uses_authentication(self):
        '''
        Whether user credentials or an auth token are configured.
        '''
        return (
            self.user_credentials is not None
            or self.auth_token is not None
            or self.auth_token_path is not None)

    def _do_channel_query(self, constraint):
        '''
        Query channel-level station metadata for the time span of
        ``constraint``.

        :returns:
            StationXML object; an empty dummy if the server reports an empty
            result.
        :raises SquirrelError: on download errors.
        '''
        extra_args = {}

        tmin = constraint.tmin \
            if constraint.tmin is not None and constraint.tmin != g_tmin \
            else g_tmin_queries

        tmax = constraint.tmax \
            if constraint.tmax is not None and constraint.tmax != g_tmax \
            else g_tmax

        if self.site in g_sites_not_supporting['startbefore']:
            ktmin = 'starttime'
            ktmax = 'endtime'
        else:
            ktmin = 'endafter'
            ktmax = 'startbefore'

        if self.codes is None:
            # with POST codes, the time span goes into the selection instead
            extra_args[ktmin] = tmin
            extra_args[ktmax] = tmax

        if self.site not in g_sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=self._uses_authentication())

        if self.query_args is not None:
            extra_args.update(self.query_args)

        self._log_meta('querying...')

        try:
            channel_sx = fdsn.station(
                site=self.site,
                format='text',
                level='channel',
                selection=codes_to_selection(self.codes, tmin, tmax),
                **extra_args)

            self._hotfix('channel', channel_sx)

            return channel_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

        except fdsn.DownloadError as e:
            raise SquirrelError(str(e))

    def _load_constraint(self):
        '''
        Restore the constraint of the cached channel query, if any.
        '''
        fn = self._get_constraint_path()
        if op.exists(fn):
            # NOTE: unpickles a file from this source's own cache directory.
            with open(fn, 'rb') as f:
                self._constraint = pickle.load(f)
        else:
            self._constraint = None

    def _dump_constraint(self):
        with open(self._get_constraint_path(), 'wb') as f:
            pickle.dump(self._constraint, f, protocol=2)

    def _get_expiration_time(self, path):
        '''
        Expiration time of the cached file ``path``.

        :returns:
            ``None`` if ``expires`` is not set (never expires), ``0.0`` if
            the file cannot be stat'ed, otherwise modification time plus
            ``expires``.
        '''
        if self.expires is None:
            return None

        try:
            t = os.stat(path).st_mtime
            return t + self.expires

        except OSError:
            return 0.0

    def _get_channels_expiration_time(self):
        return self._get_expiration_time(self._get_channels_path())

    def update_waveform_promises(self, squirrel, constraint):
        '''
        Register virtual waveform promises for data gaps.

        For every channel from the cached channel inventory matching
        ``constraint``, promises are created for the time spans not yet
        covered by locally available waveforms. Gaps shorter than two
        samples, and channels without sampling interval, are ignored.
        '''
        from ..base import gaps
        cpath = os.path.abspath(self._get_channels_path())

        ctmin = constraint.tmin
        ctmax = constraint.tmax

        nuts = squirrel.iter_nuts(
            'channel',
            path=cpath,
            codes=constraint.codes,
            tmin=ctmin,
            tmax=ctmax)

        coverages = squirrel.get_coverage(
            'waveform',
            codes=constraint.codes if constraint.codes else None,
            tmin=ctmin,
            tmax=ctmax)

        codes_to_avail = defaultdict(list)
        for coverage in coverages:
            for tmin, tmax, _ in coverage.iter_spans():
                codes_to_avail[coverage.codes].append((tmin, tmax))

        def sgaps(nut):
            for tmin, tmax in gaps(
                    codes_to_avail[nut.codes],
                    max(ctmin, nut.tmin) if ctmin is not None else nut.tmin,
                    min(ctmax, nut.tmax) if ctmax is not None else nut.tmax):

                subnut = clone(nut)
                subnut.tmin = tmin
                subnut.tmax = tmax

                if subnut.deltat is None:
                    continue

                # ignore 1-sample gaps produced by rounding errors
                if subnut.tmax - subnut.tmin < 2*subnut.deltat:
                    continue

                yield subnut

        def wanted(nuts):
            for nut in nuts:
                if nut.deltat is None:
                    logger.warning(
                        'Ignoring channel with unknown sampling rate: %s'
                        % str(nut.codes))
                    continue

                yield from sgaps(nut)

        path = self._source_id
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                file_format='virtual',
                file_mtime=0.0,
                file_size=0,
                **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
            virtual_paths=[path])

    def remove_waveform_promises(self, squirrel, from_database='selection'):
        '''
        Remove waveform promises from live selection or global database.

        :param from_database:
            Remove from live selection ``'selection'`` or global database
            ``'global'``.
        '''

        path = self._source_id
        if from_database == 'selection':
            squirrel.remove(path)
        elif from_database == 'global':
            squirrel.get_database().remove(path)
        else:
            raise ValueError(
                'Values allowed for from_database: ("selection", "global")')

    def _get_user_credentials(self):
        '''
        Keyword arguments with credentials/token for the FDSN client calls.
        '''
        d = {}
        if self.user_credentials is not None:
            d['user'], d['passwd'] = self.user_credentials

        if self.auth_token is not None or self.auth_token_path is not None:
            d['token'] = self.get_auth_token()

        return d

    def save_waveforms(self, trs):
        '''
        Store traces in the waveform archive, merging with existing data.
        '''
        return self._archive.save(trs, check_append_merge=True)

    def download_waveforms(
            self, orders, success, error_permanent, error_temporary, aborted):
        '''
        Download waveform data for ``orders`` in batches of 20.

        For each order, ``success(order, traces)`` is called on (possibly
        partial) success; otherwise ``error_temporary(order)`` or
        ``error_permanent(order)`` is called. ``aborted()`` is checked
        before each batch. An error summary is logged at the end.
        '''

        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        while i < len(orders) and not aborted():
            orders_now = orders[i:i+neach]
            selection_now = orders_to_selection(orders_now)
            nsamples_estimate = sum(
                order.estimate_nsamples() for order in orders_now)

            nsuccess = 0
            elog.append_checkpoint()
            self._log_info_data(
                'downloading, %s' % order_summary(orders_now))

            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    data = fdsn.dataselect(
                        site=self.site, selection=selection_now,
                        **self._get_user_credentials())

                    now = time.time()

                    path = op.join(tmpdir, 'tmp.mseed')
                    with open(path, 'wb') as f:
                        nread = 0
                        while True:
                            buf = data.read(1024)
                            nread += len(buf)
                            if not buf:
                                break
                            f.write(buf)

                            # abort if we get way more data than expected
                            if nread > max(
                                    1024 * 1000,
                                    nsamples_estimate * 4 * 10):

                                raise Aborted('Too much data received.')

                    trs = io.load(path)

                    by_nslc = defaultdict(list)
                    for tr in trs:
                        by_nslc[tr.nslc_id].append(tr)

                    for order in orders_now:
                        trs_order = []
                        err_this = None
                        for tr in by_nslc[order.codes.nslc]:
                            try:
                                order.validate(tr)
                                trs_order.append(tr.chop(
                                    order.tmin, order.tmax, inplace=False))

                            except trace.NoData:
                                err_this = (
                                    'empty result', 'empty sub-interval')

                            except InvalidWaveform as e:
                                err_this = ('invalid waveform', str(e))

                        if len(trs_order) == 0:
                            if err_this is None:
                                err_this = ('empty result', '')

                            elog.append(now, order, *err_this)
                            # data near real time may still arrive later
                            if order.is_near_real_time():
                                error_temporary(order)
                            else:
                                error_permanent(order)
                        else:
                            def tsame(ta, tb):
                                return abs(tb - ta) < 2 * order.deltat

                            if len(trs_order) != 1 \
                                    or not tsame(
                                        trs_order[0].tmin, order.tmin) \
                                    or not tsame(
                                        trs_order[0].tmax, order.tmax):

                                if err_this:
                                    elog.append(
                                        now, order,
                                        'partial result, %s' % err_this[0],
                                        err_this[1])
                                else:
                                    elog.append(now, order, 'partial result')

                            nsuccess += 1
                            success(order, trs_order)

                except fdsn.EmptyResult:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'empty result')
                        if order.is_near_real_time():
                            error_temporary(order)
                        else:
                            error_permanent(order)

                except Aborted as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'aborted', str(e))
                        error_permanent(order)

                except (util.HTTPError, fdsn.DownloadError) as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'http error', str(e))
                        error_temporary(order)

            emessage = elog.summarize_recent()

            self._log_info_data(
                '%i download%s %ssuccessful' % (
                    nsuccess,
                    util.plural_s(nsuccess),
                    '(partially) ' if emessage else '')
                + (', %s' % emessage if emessage else ''))

            i += neach
            task.update(i)

        for agg in elog.iter_aggregates():
            logger.warning(str(agg))

        task.done()

    def _do_response_query(self, selection):
        '''
        Query response-level station metadata for ``selection``.

        :returns:
            StationXML object; an empty dummy if the server reports an empty
            result.
        :raises SquirrelError: on download errors.
        '''
        extra_args = {}

        if self.site not in g_sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=self._uses_authentication())

        self._log_responses('querying...')

        try:
            response_sx = fdsn.station(
                site=self.site,
                level='response',
                selection=selection,
                **extra_args)

            self._hotfix('response', response_sx)
            return response_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

        except fdsn.DownloadError as e:
            raise SquirrelError(str(e))

    def update_response_inventory(self, squirrel, constraint):
        '''
        Refresh cached per-channel response files and add them to
        ``squirrel``.

        Channels come from the cached channel inventory matching
        ``constraint``. Responses whose cache file exists and has not
        expired are reused; the others are queried in batches of 100.
        Failures are logged (best effort) rather than raised.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts(
            'channel', path=cpath, codes=constraint.codes)

        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes.nslc
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if os.path.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        neach = 100
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                # channels missing from the reply get an empty placeholder
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)

                    dump_all_spickle([response_sx], filename=fn_temp)
                    # response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            if k.find('failed') != -1:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(
                        CodesNSLCE(tup) for tup in status[k])),
                target=log_target)

        if fns:
            squirrel.add(fns, kinds=['response'])

990 

991 

# Public API of this module.
__all__ = ['FDSNSource']