Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/client/fdsn.py: 83%

511 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-11-03 12:47 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Squirrel client to access FDSN web services for seismic waveforms and metadata. 

8''' 

9 

10import time 

11import os 

12import copy 

13import logging 

14import tempfile 

15import importlib.util 

16from collections import defaultdict 

17try: 

18 import cPickle as pickle 

19except ImportError: 

20 import pickle 

21import os.path as op 

22from .base import Source, Constraint 

23from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \ 

24 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \ 

25 codes_to_str_abbreviated, CodesNSLCE 

26from ..database import ExecuteGet1Error 

27from pyrocko.squirrel.error import SquirrelError 

28from pyrocko.client import fdsn 

29 

30from pyrocko import util, trace, io 

31from pyrocko.io.io_common import FileLoadError 

32from pyrocko.io import stationxml 

33from pyrocko.progress import progress 

34from pyrocko import has_paths 

35 

36from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \ 

37 Duration, Bool, clone, dump_all_spickle 

38 

39 

guts_prefix = 'squirrel'

# Default network timeout [s] applied to all FDSN requests made through
# pyrocko.client.fdsn from this module.
fdsn.g_timeout = 60.

logger = logging.getLogger('psq.client.fdsn')

# Known site-specific FDSN limitations: maps a query parameter name to the
# list of site aliases which do not accept it (checked in the query methods).
g_sites_not_supporting = {
    'startbefore': ['geonet'],
    'includerestricted': ['geonet', 'ncedc', 'scedc']}

# Query argument keys which must not be combined with a codes-based POST
# selection (validated in FDSNSource.__init__).
g_keys_conflicting_post_codes = {
    'network', 'station', 'location', 'channel', 'minlatitude', 'maxlatitude',
    'minlongitude', 'maxlongitude', 'latitude', 'longitude', 'minradius',
    'maxradius'}

54 

55 

def make_task(*args):
    # Create a progress task which reports through this module's logger.
    return progress.task(*args, logger=logger)

58 

59 

def diff(fn_a, fn_b):
    '''
    Check whether two files differ in content.

    :returns: ``True`` if the files have different sizes, if either file
        cannot be stat'd, or if any byte differs; ``False`` when the
        contents are identical.
    '''
    try:
        if os.stat(fn_a).st_size != os.stat(fn_b).st_size:
            return True

    except OSError:
        return True

    # Sizes match - compare contents chunk by chunk.
    with open(fn_a, 'rb') as file_a, open(fn_b, 'rb') as file_b:
        while True:
            chunk_a = file_a.read(1024)
            chunk_b = file_b.read(1024)
            if chunk_a != chunk_b:
                return True

            if not chunk_a or not chunk_b:
                return False

78 

79 

def move_or_keep(fn_temp, fn):
    '''
    Install a freshly downloaded file, avoiding needless updates.

    If *fn* does not yet exist, *fn_temp* is moved into place. If it exists
    and its content differs from *fn_temp*, it is replaced. Otherwise the
    temporary file is discarded, leaving the existing file untouched.

    :returns: status string: ``'new'``, ``'updated'`` or
        ``'upstream unchanged'``.
    '''
    if not op.exists(fn):
        os.rename(fn_temp, fn)
        return 'new'

    if diff(fn, fn_temp):
        os.rename(fn_temp, fn)
        return 'updated'

    os.unlink(fn_temp)
    return 'upstream unchanged'

94 

95 

class Archive(Object):
    '''
    Base class for archives used to store downloaded waveforms.
    '''

    def add(self):
        '''
        Store waveforms in the archive. To be implemented in subclasses.
        '''
        raise NotImplementedError()

100 

101 

class MSeedArchive(Archive):
    '''
    Waveform archive which stores traces as MiniSEED files in a directory
    tree derived from trace codes and time span.
    '''

    # Filename template below the base path. The placeholders are filled by
    # pyrocko.io.save from trace metadata, plus the block_* values passed
    # via `additional` in add().
    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        + '_%(block_tmin_us)s_%(block_tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)
        # Base directory, set later via set_base_path().
        self._base_path = None

    def set_base_path(self, path):
        # Directory below which template paths are created.
        self._base_path = path

    def add(self, order, trs):
        '''
        Save traces *trs* downloaded for *order* below the base path.

        :returns: paths of the files written (as returned by
            :py:func:`pyrocko.io.save`).
        '''
        path = op.join(self._base_path, self.template)
        # '6FRAC' is expanded by pyrocko.util.time_to_str to six fractional
        # second digits (microsecond precision).
        fmt = '%Y-%m-%d_%H-%M-%S.6FRAC'
        return io.save(trs, path, overwrite=True, additional=dict(
            block_tmin_us=util.time_to_str(order.tmin, format=fmt),
            block_tmax_us=util.time_to_str(order.tmax, format=fmt)))

123 

124 

def combine_selections(selection):
    '''
    Merge adjacent entries of a selection list into contiguous spans.

    Consecutive entries sharing the same (network, station, location,
    channel) codes, where one entry's end time equals the next entry's
    start time, are fused into a single entry.
    '''
    merged = []
    pending = None

    for entry in selection:
        can_extend = (
            pending is not None
            and entry[:4] == pending[:4]
            and entry[4] == pending[5])

        if can_extend:
            # Extend the pending span's end time.
            pending = pending[:5] + (entry[5],)
        else:
            if pending is not None:
                merged.append(pending)

            pending = entry

    if pending is not None:
        merged.append(pending)

    return merged

141 

142 

def orders_sort_key(order):
    # Stable ordering for waveform orders: group by codes, then start time.
    return (order.codes, order.tmin)

145 

146 

def orders_to_selection(orders, pad=1.0):
    '''
    Convert waveform orders into a merged FDSN dataselect selection list.

    Time spans of orders with identical codes are combined where contiguous
    and each resulting span is widened by *pad* sampling intervals on both
    sides to avoid losing samples at the boundaries.
    '''
    deltats = {}
    raw = []
    for order in sorted(orders, key=orders_sort_key):
        raw.append(order.codes.nslc + (order.tmin, order.tmax))
        deltats[order.codes.nslc] = order.deltat

    padded = []
    for entry in combine_selections(raw):
        nslc = entry[:4]
        tmin, tmax = entry[4], entry[5]
        deltat = deltats[nslc]
        padded.append(nslc + (tmin - pad*deltat, tmax + pad*deltat))

    return padded

163 

164 

def codes_to_selection(codes_list, tmin, tmax):
    '''
    Build a selection list from channel codes and a common time span.

    :returns: ``None`` if *codes_list* is ``None``, otherwise a sorted list
        of ``(net, sta, loc, cha, tmin, tmax)`` tuples.
    '''
    if codes_list is None:
        return None

    return [
        codes.nslc + (tmin, tmax)
        for codes in sorted(codes_list)]

175 

176 

class ErrorEntry(Object):
    '''
    Record of a single failed waveform download attempt.
    '''

    time = Timestamp.T()  # when the error occurred
    order = WaveformOrder.T()  # the order which failed
    kind = String.T()  # short error category, e.g. 'empty result'
    details = String.T(optional=True)  # free-form error details

182 

183 

class ErrorAggregate(Object):
    '''
    Group of download errors of the same (kind, details), for summary
    reporting (see :py:meth:`ErrorLog.iter_aggregates`).
    '''

    site = String.T()  # FDSN site alias the errors belong to
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes = List.T(CodesNSLCE.T())  # distinct channel codes affected
    time_spans = List.T(Tuple.T(2, Timestamp.T()))  # affected (tmin, tmax)

    def __str__(self):
        # Multi-line human-readable summary for logging.
        codes = [str(x) for x in self.codes]
        scodes = '\n' + util.ewrap(codes, indent='    ') if codes else '<none>'
        tss = self.time_spans
        sspans = '\n' + util.ewrap(('%s - %s' % (
            util.time_to_str(ts[0]), util.time_to_str(ts[1])) for ts in tss),
            indent='    ')

        return ('FDSN "%s": download error summary for "%s" (%i)\n%s  '
                'Codes:%s\n  Time spans:%s') % (
            self.site,
            self.kind,
            len(self.entries),
            '  Details: %s\n' % self.details if self.details else '',
            scodes,
            sspans)

208 

209 

class ErrorLog(Object):
    '''
    Collection of download errors accumulated for one FDSN site.
    '''

    site = String.T()
    entries = List.T(ErrorEntry.T())
    # Indices into `entries` marking the start of each download batch,
    # used by summarize_recent().
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        # Remember the current position so summarize_recent() reports only
        # errors added since this point.
        self.checkpoints.append(len(self.entries))

    def append(self, time, order, kind, details=''):
        # Record a single failed order.
        entry = ErrorEntry(time=time, order=order, kind=kind, details=details)
        self.entries.append(entry)

    def iter_aggregates(self):
        '''
        Yield :py:class:`ErrorAggregate` objects, grouping entries by
        (kind, details), sorted by that key.
        '''
        by_kind_details = defaultdict(list)
        for entry in self.entries:
            by_kind_details[entry.kind, entry.details].append(entry)

        kind_details = sorted(by_kind_details.keys())

        for kind, details in kind_details:
            entries = by_kind_details[kind, details]
            codes = sorted(set(entry.order.codes for entry in entries))
            selection = orders_to_selection(entry.order for entry in entries)
            # The last two elements of each selection row are (tmin, tmax).
            time_spans = sorted(set(row[-2:] for row in selection))
            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=entries,
                codes=codes,
                time_spans=time_spans)

    def summarize_recent(self):
        '''
        One-line summary of errors since the last checkpoint, or ``''``
        if there were none.
        '''
        ioff = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[ioff:]
        kinds = sorted(set(entry.kind for entry in recent))
        if recent:
            return '%i error%s (%s)' % (
                len(recent), util.plural_s(recent), '; '.join(kinds))
        else:
            return ''

251 

252 

class Aborted(SquirrelError):
    '''
    Raised to abort an ongoing download, e.g. when much more data than
    expected is received.
    '''
    pass

255 

256 

class FDSNSource(Source, has_paths.HasPaths):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a
    :py:class:`~pyrocko.squirrel.base.Squirrel` allows the latter to download
    station and waveform data from an FDSN web service should the data not
    already happen to be available locally.
    '''

    # Guts schema: the attributes below define the persistent configuration
    # of the source; the `help` strings double as user documentation.

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    codes = List.T(
        CodesNSLCE.T(),
        optional=True,
        help='List of codes patterns to query via POST parameters.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             "environment's cache directory is used.")

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    hotfix_module_path = has_paths.Path.T(
        optional=True,
        help='Path to Python module to locally patch metadata errors.')

318 

319 def __init__(self, site, query_args=None, codes=None, **kwargs): 

320 if codes: 

321 codes = [CodesNSLCE(codes_) for codes_ in codes] 

322 

323 if codes is not None and query_args is not None: 

324 conflicting = g_keys_conflicting_post_codes \ 

325 & set(query_args.keys()) 

326 

327 if conflicting: 

328 raise SquirrelError( 

329 'Cannot use %s in `query_args` when `codes` are also ' 

330 'given.' % ' or '.join("'%s'" % k for k in conflicting)) 

331 

332 Source.__init__( 

333 self, site=site, query_args=query_args, codes=codes, **kwargs) 

334 

335 self._constraint = None 

336 self._hash = self.make_hash() 

337 self._source_id = 'client:fdsn:%s' % self._hash 

338 self._error_infos = [] 

339 

    def describe(self):
        '''
        Return a unique identifier string for this data-source.
        '''
        return self._source_id

342 

343 def make_hash(self): 

344 s = self.site 

345 s += 'notoken' \ 

346 if (self.auth_token is None and self.auth_token_path is None) \ 

347 else 'token' 

348 

349 if self.user_credentials is not None: 

350 s += self.user_credentials[0] 

351 else: 

352 s += 'nocred' 

353 

354 if self.query_args is not None: 

355 s += ','.join( 

356 '%s:%s' % (k, self.query_args[k]) 

357 for k in sorted(self.query_args.keys())) 

358 else: 

359 s += 'noqueryargs' 

360 

361 if self.codes is not None: 

362 s += 'post_codes:' + ','.join( 

363 codes.safe_str for codes in self.codes) 

364 

365 return ehash(s) 

366 

    def get_hash(self):
        '''
        Return the configuration hash computed in :py:meth:`make_hash`.
        '''
        return self._hash

369 

370 def get_auth_token(self): 

371 if self.auth_token: 

372 return self.auth_token 

373 

374 elif self.auth_token_path is not None: 

375 try: 

376 with open(self.auth_token_path, 'rb') as f: 

377 return f.read().decode('ascii') 

378 

379 except OSError as e: 

380 raise FileLoadError( 

381 'Cannot load auth token file (%s): %s' 

382 % (str(e), self.auth_token_path)) 

383 

384 else: 

385 raise Exception( 

386 'FDSNSource: auth_token and auth_token_path are mutually ' 

387 'exclusive.') 

388 

    def setup(self, squirrel, check=True):
        '''
        Hook called by Squirrel when the source is attached.

        Prepares the cache directory structure and registers any previously
        cached channel metadata, responses and waveforms with *squirrel*.
        '''
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()
        self._archive = MSeedArchive()
        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive.set_base_path(waveforms_path)

        squirrel.add(
            self._get_waveforms_path(),
            check=check)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

        # Register the virtual path under which waveform promises will be
        # added later (see update_waveform_promises).
        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if os.path.exists(responses_path):
            # In-progress downloads use '.temp' suffixes - skip those.
            squirrel.add(
                responses_path, kinds=['response'], exclude=r'\.temp$')

        # Hotfix module is loaded lazily in _hotfix().
        self._hotfix_module = None

417 

    def _hotfix(self, query_type, sx):
        '''
        Apply a user-provided metadata patch hook to StationXML object *sx*.

        If :py:attr:`hotfix_module_path` is set, the module is loaded on
        first use and its ``stationxml_<query_type>_hook`` function is
        called with *sx*; *sx* may be modified in place by the hook.
        '''
        if self.hotfix_module_path is None:
            return

        if self._hotfix_module is None:
            module_path = self.expand_path(self.hotfix_module_path)
            # Module name includes the source hash to avoid clashes when
            # multiple differently configured sources load hotfix modules.
            spec = importlib.util.spec_from_file_location(
                'hotfix_' + self._hash, module_path)
            self._hotfix_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(self._hotfix_module)

        hook = getattr(
            self._hotfix_module, 'stationxml_' + query_type + '_hook')

        return hook(sx)

433 

434 def _get_constraint_path(self): 

435 return op.join(self._cache_path, self._hash, 'constraint.pickle') 

436 

437 def _get_channels_path(self): 

438 return op.join(self._cache_path, self._hash, 'channels.stationxml') 

439 

440 def _get_responses_path(self, nslc=None): 

441 dirpath = op.join( 

442 self._cache_path, self._hash, 'responses') 

443 

444 if nslc is None: 

445 return dirpath 

446 else: 

447 return op.join( 

448 dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc) 

449 

450 def _get_waveforms_path(self): 

451 if self.shared_waveforms: 

452 return op.join(self._cache_path, 'waveforms') 

453 else: 

454 return op.join(self._cache_path, self._hash, 'waveforms') 

455 

456 def _log_meta(self, message, target=logger.info): 

457 log_prefix = 'FDSN "%s" metadata:' % self.site 

458 target(' '.join((log_prefix, message))) 

459 

460 def _log_responses(self, message, target=logger.info): 

461 log_prefix = 'FDSN "%s" responses:' % self.site 

462 target(' '.join((log_prefix, message))) 

463 

464 def _log_info_data(self, *args): 

465 log_prefix = 'FDSN "%s" waveforms:' % self.site 

466 logger.info(' '.join((log_prefix,) + args)) 

467 

468 def _str_expires(self, t, now): 

469 if t is None: 

470 return 'expires: never' 

471 else: 

472 expire = 'expires' if t > now else 'expired' 

473 return '%s: %s' % ( 

474 expire, 

475 util.time_to_str(t, format='%Y-%m-%d %H:%M:%S')) 

476 

    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Refresh cached channel metadata if needed and register it.

        A new query is made only if the cached metadata does not cover
        *constraint* or has expired (see :py:attr:`expires`).
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                # Expand the cached constraint so the new query covers both
                # the previously cached and the newly requested extent.
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                # Per-process temp name avoids clashes between concurrent
                # downloads into the same cache.
                fn_temp = fn + '.%i.temp' % os.getpid()

                dump_all_spickle([channel_sx], filename=fn_temp)
                # channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    # File content unchanged - presumably this refreshes the
                    # database's record of the file's mtime so it is not
                    # re-indexed; TODO confirm silent_touch semantics.
                    squirrel.get_database().silent_touch(fn)

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

528 

    def _do_channel_query(self, constraint):
        '''
        Query the FDSN station service at channel level.

        :returns: :py:class:`~pyrocko.io.stationxml.FDSNStationXML` object
            (an empty dummy document if the service reported no results).
        :raises: :py:exc:`~pyrocko.squirrel.error.SquirrelError` on download
            errors.
        '''
        extra_args = {}

        # Replace open/global time bounds with values suitable for queries.
        tmin = constraint.tmin \
            if constraint.tmin is not None and constraint.tmin != g_tmin \
            else g_tmin_queries

        tmax = constraint.tmax \
            if constraint.tmax is not None and constraint.tmax != g_tmax \
            else g_tmax

        if self.site in g_sites_not_supporting['startbefore']:
            # Fall back to plain start/end time arguments for sites which
            # do not support the startbefore/endafter parameters.
            ktmin = 'starttime'
            ktmax = 'endtime'
        else:
            # Select channel epochs overlapping [tmin, tmax].
            ktmin = 'endafter'
            ktmax = 'startbefore'

        if self.codes is None:
            # With POST codes, the time span is part of the selection lines
            # instead of the GET arguments.
            extra_args[ktmin] = tmin
            extra_args[ktmax] = tmax

        if self.site not in g_sites_not_supporting['includerestricted']:
            # Ask for restricted channels only if we can authenticate.
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        if self.query_args is not None:
            extra_args.update(self.query_args)

        self._log_meta('querying...')

        try:
            channel_sx = fdsn.station(
                site=self.site,
                format='text',
                level='channel',
                selection=codes_to_selection(self.codes, tmin, tmax),
                **extra_args)

            self._hotfix('channel', channel_sx)

            return channel_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

        except fdsn.DownloadError as e:
            raise SquirrelError(str(e))

580 

581 def _load_constraint(self): 

582 fn = self._get_constraint_path() 

583 if op.exists(fn): 

584 with open(fn, 'rb') as f: 

585 self._constraint = pickle.load(f) 

586 else: 

587 self._constraint = None 

588 

589 def _dump_constraint(self): 

590 with open(self._get_constraint_path(), 'wb') as f: 

591 pickle.dump(self._constraint, f, protocol=2) 

592 

593 def _get_expiration_time(self, path): 

594 if self.expires is None: 

595 return None 

596 

597 try: 

598 t = os.stat(path)[8] 

599 return t + self.expires 

600 

601 except OSError: 

602 return 0.0 

603 

    def _get_channels_expiration_time(self):
        # Expiration time of the cached channel metadata file.
        return self._get_expiration_time(self._get_channels_path())

606 

    def update_waveform_promises(self, squirrel, constraint):
        '''
        Insert waveform promises for data not yet available locally.

        For every channel matching *constraint*, the time spans not covered
        by locally available waveforms are determined, and a promise nut is
        added for each gap so the data can be downloaded on demand.
        '''
        from ..base import gaps
        cpath = os.path.abspath(self._get_channels_path())

        ctmin = constraint.tmin
        ctmax = constraint.tmax

        nuts = squirrel.iter_nuts(
            'channel',
            path=cpath,
            codes=constraint.codes,
            tmin=ctmin,
            tmax=ctmax)

        coverages = squirrel.get_coverage(
            'waveform',
            codes=constraint.codes if constraint.codes else None,
            tmin=ctmin,
            tmax=ctmax)

        # Map channel codes to the list of locally covered time spans.
        codes_to_avail = defaultdict(list)
        for coverage in coverages:
            for tmin, tmax, _ in coverage.iter_spans():
                codes_to_avail[coverage.codes].append((tmin, tmax))

        def sgaps(nut):
            # Yield one sub-nut per uncovered time span of the channel nut,
            # clipped to the constraint's time interval.
            for tmin, tmax in gaps(
                    codes_to_avail[nut.codes],
                    max(ctmin, nut.tmin) if ctmin is not None else nut.tmin,
                    min(ctmax, nut.tmax) if ctmax is not None else nut.tmax):

                subnut = clone(nut)
                subnut.tmin = tmin
                subnut.tmax = tmax

                # ignore 1-sample gaps produced by rounding errors
                if subnut.tmax - subnut.tmin < 2*subnut.deltat:
                    continue

                yield subnut

        def wanted(nuts):
            # Flatten: all gap sub-nuts of all channel nuts.
            for nut in nuts:
                for nut in sgaps(nut):
                    yield nut

        # Promises live under this source's virtual path.
        path = self._source_id
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
            virtual_paths=[path])

659 

660 def remove_waveform_promises(self, squirrel, from_database='selection'): 

661 ''' 

662 Remove waveform promises from live selection or global database. 

663 

664 :param from_database: 

665 Remove from live selection ``'selection'`` or global database 

666 ``'global'``. 

667 ''' 

668 

669 path = self._source_id 

670 if from_database == 'selection': 

671 squirrel.remove(path) 

672 elif from_database == 'global': 

673 squirrel.get_database().remove(path) 

674 else: 

675 raise ValueError( 

676 'Values allowed for from_database: ("selection", "global")') 

677 

678 def _get_user_credentials(self): 

679 d = {} 

680 if self.user_credentials is not None: 

681 d['user'], d['passwd'] = self.user_credentials 

682 

683 if self.auth_token is not None or self.auth_token_path is not None: 

684 d['token'] = self.get_auth_token() 

685 

686 return d 

687 

    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):
        '''
        Download waveforms for the given orders in batches.

        :param orders: list of waveform orders to fulfill (sorted in place).
        :param success: callback ``success(order, traces)`` for fulfilled
            orders.
        :param batch_add: callback receiving the list of newly archived
            file paths after each batch.
        :param error_permanent: callback for orders which failed and should
            not be retried.
        :param error_temporary: callback for orders which may succeed when
            retried later.
        '''
        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20  # number of orders bundled into one dataselect request
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        while i < len(orders):
            orders_now = orders[i:i+neach]
            selection_now = orders_to_selection(orders_now)
            nsamples_estimate = sum(
                order.estimate_nsamples() for order in orders_now)

            nsuccess = 0
            elog.append_checkpoint()
            self._log_info_data(
                'downloading, %s' % order_summary(orders_now))

            all_paths = []
            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    data = fdsn.dataselect(
                        site=self.site, selection=selection_now,
                        **self._get_user_credentials())

                    now = time.time()

                    # Spool the response to a temporary file, guarding
                    # against unexpectedly large responses.
                    path = op.join(tmpdir, 'tmp.mseed')
                    with open(path, 'wb') as f:
                        nread = 0
                        while True:
                            buf = data.read(1024)
                            nread += len(buf)
                            if not buf:
                                break
                            f.write(buf)

                            # abort if we get way more data than expected
                            if nread > max(
                                    1024 * 1000,
                                    nsamples_estimate * 4 * 10):

                                raise Aborted('Too much data received.')

                    trs = io.load(path)

                    # Group received traces by their nslc codes so each
                    # order can pick up its matching traces.
                    by_nslc = defaultdict(list)
                    for tr in trs:
                        by_nslc[tr.nslc_id].append(tr)

                    for order in orders_now:
                        trs_order = []
                        err_this = None
                        for tr in by_nslc[order.codes.nslc]:
                            try:
                                order.validate(tr)
                                trs_order.append(tr.chop(
                                    order.tmin, order.tmax, inplace=False))

                            except trace.NoData:
                                err_this = (
                                    'empty result', 'empty sub-interval')

                            except InvalidWaveform as e:
                                err_this = ('invalid waveform', str(e))

                        if len(trs_order) == 0:
                            if err_this is None:
                                err_this = ('empty result', '')

                            elog.append(now, order, *err_this)
                            # Near-real-time orders are retried later via
                            # error_temporary; others fail permanently.
                            if order.is_near_real_time():
                                error_temporary(order)
                            else:
                                error_permanent(order)
                        else:
                            def tsame(ta, tb):
                                # times equal within two sample intervals
                                return abs(tb - ta) < 2 * order.deltat

                            # Log results which do not fully cover the
                            # requested time span as partial, but still
                            # archive and report them as successes.
                            if len(trs_order) != 1 \
                                    or not tsame(
                                        trs_order[0].tmin, order.tmin) \
                                    or not tsame(
                                        trs_order[0].tmax, order.tmax):

                                if err_this:
                                    elog.append(
                                        now, order,
                                        'partial result, %s' % err_this[0],
                                        err_this[1])
                                else:
                                    elog.append(now, order, 'partial result')

                            paths = self._archive.add(order, trs_order)
                            all_paths.extend(paths)

                            nsuccess += 1
                            success(order, trs_order)

                except fdsn.EmptyResult:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'empty result')
                        if order.is_near_real_time():
                            error_temporary(order)
                        else:
                            error_permanent(order)

                except Aborted as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'aborted', str(e))
                        error_permanent(order)

                except (util.HTTPError, fdsn.DownloadError) as e:
                    # Network/server errors are treated as temporary.
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'http error', str(e))
                        error_temporary(order)

            emessage = elog.summarize_recent()

            self._log_info_data(
                '%i download%s %ssuccessful' % (
                    nsuccess,
                    util.plural_s(nsuccess),
                    '(partially) ' if emessage else '')
                + (', %s' % emessage if emessage else ''))

            if all_paths:
                batch_add(all_paths)

            i += neach
            task.update(i)

        # Emit one aggregated warning per (kind, details) group.
        for agg in elog.iter_aggregates():
            logger.warning(str(agg))

        task.done()

831 

    def _do_response_query(self, selection):
        '''
        Query the FDSN station service at response level for *selection*.

        :returns: :py:class:`~pyrocko.io.stationxml.FDSNStationXML` object
            (an empty dummy document if the service reported no results).
        :raises: :py:exc:`~pyrocko.squirrel.error.SquirrelError` on download
            errors.
        '''
        extra_args = {}

        if self.site not in g_sites_not_supporting['includerestricted']:
            # Ask for restricted channels only if we can authenticate.
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        self._log_responses('querying...')

        try:
            response_sx = fdsn.station(
                site=self.site,
                level='response',
                selection=selection,
                **extra_args)

            self._hotfix('response', response_sx)
            return response_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

        except fdsn.DownloadError as e:
            raise SquirrelError(str(e))

859 

    def update_response_inventory(self, squirrel, constraint):
        '''
        Refresh cached instrument response information.

        For every channel matching *constraint*, a response-level StationXML
        snippet is downloaded and cached in a per-channel file, unless a
        valid (non-expired) cached copy already exists.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts(
            'channel', path=cpath, codes=constraint.codes)

        tmin = g_tmin_queries
        tmax = g_tmax

        # Collect one selection entry per unique channel which needs an
        # update; track per-channel status strings for the summary log.
        selection = []
        now = time.time()
        have = set()
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes.nslc
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if os.path.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        neach = 100  # number of channels per station query
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                # Channels missing from the result get the empty dummy
                # document, so the negative result is cached as well.
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)

                    dump_all_spickle([response_sx], filename=fn_temp)
                    # response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            # NOTE(review): presumably the file is not yet
                            # known to the database; deliberately ignored.
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            if k.find('failed') != -1:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(
                        CodesNSLCE(tup) for tup in status[k])),
                target=log_target)

        if fns:
            squirrel.add(fns, kinds=['response'])

945 

946 

# Public API of this module.
__all__ = [
    'FDSNSource',
]