Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/client/fdsn.py: 83%

510 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-01-02 12:31 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Squirrel client to access FDSN web services for seismic waveforms and metadata. 

8''' 

9 

10import time 

11import os 

12import copy 

13import logging 

14import tempfile 

15import importlib.util 

16from collections import defaultdict 

17try: 

18 import cPickle as pickle 

19except ImportError: 

20 import pickle 

21import os.path as op 

22from .base import Source, Constraint 

23from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \ 

24 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \ 

25 codes_to_str_abbreviated, CodesNSLCE 

26from ..database import ExecuteGet1Error 

27from pyrocko.squirrel.error import SquirrelError 

28from pyrocko.client import fdsn 

29 

30from pyrocko import util, trace, io 

31from pyrocko.io.io_common import FileLoadError 

32from pyrocko.io import stationxml 

33from pyrocko import progress 

34from pyrocko import has_paths 

35 

36from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \ 

37 Duration, Bool, clone, dump_all_spickle 

38 

39 

# Namespace prefix used by guts for serializing this module's classes
# (pyrocko.guts convention -- see `guts_prefix` usage in pyrocko).
guts_prefix = 'squirrel'

logger = logging.getLogger('psq.client.fdsn')

# FDSN sites known to lack support for certain station-service features.
# Queries are degraded gracefully for these (see FDSNSource._do_channel_query
# and FDSNSource._do_response_query).
g_sites_not_supporting = {
    'startbefore': ['geonet'],
    'includerestricted': ['geonet', 'ncedc', 'scedc']}

# Station-service query arguments which must not be combined with POST-style
# `codes` selections, because they would conflict with the posted selection
# lines (checked in FDSNSource.__init__).
g_keys_conflicting_post_codes = {
    'network', 'station', 'location', 'channel', 'minlatitude', 'maxlatitude',
    'minlongitude', 'maxlongitude', 'latitude', 'longitude', 'minradius',
    'maxradius'}

52 

53 

def make_task(*task_args):
    '''
    Create a progress-reporting task bound to this module's logger.
    '''
    task = progress.task(*task_args, logger=logger)
    return task

56 

57 

def diff(fn_a, fn_b):
    '''
    Check whether the contents of two files differ.

    Returns ``True`` if the files have different sizes or contents, or if
    either file cannot be stat'ed; ``False`` if they are byte-identical.
    '''
    try:
        size_a = os.stat(fn_a).st_size
        size_b = os.stat(fn_b).st_size
    except OSError:
        return True

    if size_a != size_b:
        return True

    # Sizes match: compare the contents chunk by chunk.
    with open(fn_a, 'rb') as fa, open(fn_b, 'rb') as fb:
        while True:
            chunk_a = fa.read(1024)
            chunk_b = fb.read(1024)
            if chunk_a != chunk_b:
                return True

            if not chunk_a or not chunk_b:
                return False

76 

77 

def move_or_keep(fn_temp, fn):
    '''
    Install a freshly downloaded file, unless it matches the old version.

    Moves *fn_temp* over *fn* when *fn* does not yet exist or differs from
    *fn_temp*; otherwise discards *fn_temp* and keeps *fn*. Returns a short
    status string: ``'new'``, ``'updated'`` or ``'upstream unchanged'``.
    '''
    if not op.exists(fn):
        os.rename(fn_temp, fn)
        return 'new'

    if diff(fn, fn_temp):
        os.rename(fn_temp, fn)
        return 'updated'

    # Identical content: drop the temporary copy.
    os.unlink(fn_temp)
    return 'upstream unchanged'

92 

93 

class Archive(Object):
    '''
    Base class for storage backends receiving downloaded waveforms.
    '''

    def add(self):
        # Subclasses must implement storing downloaded data.
        raise NotImplementedError()

98 

99 

class MSeedArchive(Archive):
    '''
    Waveform archive writing downloaded data as MiniSEED files.
    '''

    # Filename template below the base path; placeholders are filled from the
    # trace codes and the block time span (see ``add``).
    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        + '_%(block_tmin_us)s_%(block_tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)
        # Base directory; must be set via set_base_path() before add() is
        # usable.
        self._base_path = None

    def set_base_path(self, path):
        self._base_path = path

    def add(self, order, trs):
        '''
        Save traces *trs* downloaded for *order* below the base path.

        Returns the paths written (as returned by :py:func:`pyrocko.io.save`).
        '''
        path = op.join(self._base_path, self.template)
        # '6FRAC' requests sub-second (microsecond) digits in pyrocko's time
        # formatting -- presumably to keep block file names unique; confirm
        # against pyrocko.util.time_to_str.
        fmt = '%Y-%m-%d_%H-%M-%S.6FRAC'
        return io.save(trs, path, overwrite=True, additional=dict(
            block_tmin_us=util.time_to_str(order.tmin, format=fmt),
            block_tmax_us=util.time_to_str(order.tmax, format=fmt)))

121 

122 

def combine_selections(selection):
    '''
    Merge adjacent entries of a dataselect selection list.

    Consecutive entries ``(net, sta, loc, cha, tmin, tmax)`` with identical
    codes are merged into a single entry when one time span starts exactly
    where the previous one ends.
    '''
    combined = []
    pending = None
    for entry in selection:
        if pending and entry[:4] == pending[:4] and entry[4] == pending[5]:
            # Contiguous with the pending entry: extend its span.
            pending = pending[:5] + (entry[5],)
        else:
            if pending:
                combined.append(pending)

            pending = entry

    if pending:
        combined.append(pending)

    return combined

139 

140 

def orders_sort_key(order):
    '''
    Sort key grouping waveform orders by codes, then by start time.
    '''
    return order.codes, order.tmin

143 

144 

def orders_to_selection(orders, pad=1.0):
    '''
    Convert waveform orders into a combined, padded dataselect selection.

    Orders are sorted, adjacent spans with equal codes are merged, and each
    resulting span is widened by *pad* sampling intervals on both sides.
    '''
    raw = []
    deltats = {}
    for order in sorted(orders, key=orders_sort_key):
        nslc = order.codes.nslc
        raw.append(nslc + (order.tmin, order.tmax))
        deltats[nslc] = order.deltat

    padded = []
    for entry in combine_selections(raw):
        nslc = entry[:4]
        tmin, tmax = entry[4], entry[5]
        deltat = deltats[nslc]
        padded.append(nslc + (tmin - pad*deltat, tmax + pad*deltat))

    return padded

161 

162 

def codes_to_selection(codes_list, tmin, tmax):
    '''
    Build a station-service selection from codes patterns and a time span.

    Returns ``None`` when *codes_list* is ``None`` (no POST selection).
    '''
    if codes_list is None:
        return None

    return [codes.nslc + (tmin, tmax) for codes in sorted(codes_list)]

173 

174 

class ErrorEntry(Object):
    '''
    Single download error: what failed, when and why.
    '''

    time = Timestamp.T()               # when the error occurred
    order = WaveformOrder.T()          # the order that failed
    kind = String.T()                  # short category, e.g. 'empty result'
    details = String.T(optional=True)  # free-form extra information

180 

181 

class ErrorAggregate(Object):
    '''
    Group of similar download errors from one FDSN site, for summary output.
    '''

    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes = List.T(CodesNSLCE.T())
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        # Render a multi-line, human-readable summary of this error group.
        codes = [str(x) for x in self.codes]
        scodes = '\n' + util.ewrap(codes, indent='    ') if codes else '<none>'
        tss = self.time_spans
        sspans = '\n' + util.ewrap(('%s - %s' % (
            util.time_to_str(ts[0]), util.time_to_str(ts[1])) for ts in tss),
            indent='    ')

        return ('FDSN "%s": download error summary for "%s" (%i)\n%s '
                'Codes:%s\n Time spans:%s') % (
            self.site,
            self.kind,
            len(self.entries),
            ' Details: %s\n' % self.details if self.details else '',
            scodes,
            sspans)

206 

207 

class ErrorLog(Object):
    '''
    Collection of download errors for one FDSN site, with checkpointing.
    '''

    site = String.T()
    entries = List.T(ErrorEntry.T())
    # Indexes into ``entries`` marking batch boundaries (see
    # append_checkpoint / summarize_recent).
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        # Remember the current length so summarize_recent() reports only
        # errors recorded after this point.
        self.checkpoints.append(len(self.entries))

    def append(self, time, order, kind, details=''):
        # Record a single error occurrence.
        entry = ErrorEntry(time=time, order=order, kind=kind, details=details)
        self.entries.append(entry)

    def iter_aggregates(self):
        '''
        Yield :py:class:`ErrorAggregate` objects, grouping all collected
        entries by ``(kind, details)``.
        '''
        by_kind_details = defaultdict(list)
        for entry in self.entries:
            by_kind_details[entry.kind, entry.details].append(entry)

        kind_details = sorted(by_kind_details.keys())

        for kind, details in kind_details:
            entries = by_kind_details[kind, details]
            codes = sorted(set(entry.order.codes for entry in entries))
            # Derive merged time spans from the affected orders.
            selection = orders_to_selection(entry.order for entry in entries)
            time_spans = sorted(set(row[-2:] for row in selection))
            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=entries,
                codes=codes,
                time_spans=time_spans)

    def summarize_recent(self):
        '''
        One-line summary of errors since the last checkpoint, or ``''``.
        '''
        ioff = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[ioff:]
        kinds = sorted(set(entry.kind for entry in recent))
        if recent:
            return '%i error%s (%s)' % (
                len(recent), util.plural_s(recent), '; '.join(kinds))
        else:
            return ''

249 

250 

class Aborted(SquirrelError):
    '''
    Raised to abort an ongoing download (e.g. too much data received).
    '''
    pass

253 

254 

class FDSNSource(Source, has_paths.HasPaths):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a
    :py:class:`~pyrocko.squirrel.base.Squirrel` allows the latter to download
    station and waveform data from an FDSN web service should the data not
    already happen to be available locally.
    '''

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    codes = List.T(
        CodesNSLCE.T(),
        optional=True,
        help='List of codes patterns to query via POST parameters.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             "environment's cache directory is used.")

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    hotfix_module_path = has_paths.Path.T(
        optional=True,
        help='Path to Python module to locally patch metadata errors.')

    def __init__(self, site, query_args=None, codes=None, **kwargs):
        if codes:
            codes = [CodesNSLCE(codes_) for codes_ in codes]

        if codes is not None and query_args is not None:
            # POST-style codes selections cannot be combined with GET query
            # arguments constraining codes or geometry.
            conflicting = g_keys_conflicting_post_codes \
                & set(query_args.keys())

            if conflicting:
                raise SquirrelError(
                    'Cannot use %s in `query_args` when `codes` are also '
                    'given.' % ' or '.join("'%s'" % k for k in conflicting))

        Source.__init__(
            self, site=site, query_args=query_args, codes=codes, **kwargs)

        self._constraint = None
        # Hash over the caching-relevant configuration; used to separate
        # cache directories of differently-configured sources.
        self._hash = self.make_hash()
        self._source_id = 'client:fdsn:%s' % self._hash
        self._error_infos = []

    def describe(self):
        # Unique identifier of this data-source configuration.
        return self._source_id

    def make_hash(self):
        '''
        Compute hash over the configuration aspects relevant for caching.

        Only presence of authentication (not the secrets themselves), the
        user name, query arguments and POST codes enter the hash.
        '''
        s = self.site
        s += 'notoken' \
            if (self.auth_token is None and self.auth_token_path is None) \
            else 'token'

        if self.user_credentials is not None:
            s += self.user_credentials[0]
        else:
            s += 'nocred'

        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        if self.codes is not None:
            s += 'post_codes:' + ','.join(
                codes.safe_str for codes in self.codes)

        return ehash(s)

    def get_hash(self):
        return self._hash

    def get_auth_token(self):
        '''
        Return the authentication token, reading it from file if configured.

        :raises: :py:exc:`~pyrocko.io.io_common.FileLoadError` if the token
            file cannot be read.
        '''
        if self.auth_token:
            return self.auth_token

        elif self.auth_token_path is not None:
            try:
                with open(self.auth_token_path, 'rb') as f:
                    return f.read().decode('ascii')

            except OSError as e:
                raise FileLoadError(
                    'Cannot load auth token file (%s): %s'
                    % (str(e), self.auth_token_path))

        else:
            # NOTE(review): this branch is reached when *neither* auth_token
            # nor auth_token_path is set; the "mutually exclusive" wording
            # looks misleading -- confirm intended message.
            raise Exception(
                'FDSNSource: auth_token and auth_token_path are mutually '
                'exclusive.')

    def setup(self, squirrel, check=True):
        '''
        Set up cache directories and register cached content with *squirrel*.

        Called when the source is attached to a Squirrel instance.
        '''
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()
        self._archive = MSeedArchive()
        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive.set_base_path(waveforms_path)

        squirrel.add(
            self._get_waveforms_path(),
            check=check)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

        # Virtual path under which waveform promises are later registered.
        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if os.path.exists(responses_path):
            # Exclude in-flight temporary files of concurrent processes.
            squirrel.add(
                responses_path, kinds=['response'], exclude=r'\.temp$')

        self._hotfix_module = None

    def _hotfix(self, query_type, sx):
        '''
        Apply user-supplied local patches to StationXML object *sx*.

        The configured hotfix module must provide a function named
        ``stationxml_<query_type>_hook``. No-op when no module is configured.
        '''
        if self.hotfix_module_path is None:
            return

        if self._hotfix_module is None:
            # Lazily load the hotfix module on first use.
            module_path = self.expand_path(self.hotfix_module_path)
            spec = importlib.util.spec_from_file_location(
                'hotfix_' + self._hash, module_path)
            self._hotfix_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(self._hotfix_module)

        hook = getattr(
            self._hotfix_module, 'stationxml_' + query_type + '_hook')

        return hook(sx)

    def _get_constraint_path(self):
        # Pickled Constraint describing what channel metadata is cached.
        return op.join(self._cache_path, self._hash, 'constraint.pickle')

    def _get_channels_path(self):
        return op.join(self._cache_path, self._hash, 'channels.stationxml')

    def _get_responses_path(self, nslc=None):
        '''
        Responses cache directory, or the file path for a given *nslc* tuple.
        '''
        dirpath = op.join(
            self._cache_path, self._hash, 'responses')

        if nslc is None:
            return dirpath
        else:
            return op.join(
                dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)

    def _get_waveforms_path(self):
        # Shared waveform cache may be reused by other FDSN sources.
        if self.shared_waveforms:
            return op.join(self._cache_path, 'waveforms')
        else:
            return op.join(self._cache_path, self._hash, 'waveforms')

    def _log_meta(self, message, target=logger.info):
        # Log helper with a site-specific prefix for metadata activity.
        log_prefix = 'FDSN "%s" metadata:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_responses(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" responses:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_info_data(self, *args):
        log_prefix = 'FDSN "%s" waveforms:' % self.site
        logger.info(' '.join((log_prefix,) + args))

    def _str_expires(self, t, now):
        # Human-readable expiration info for log messages.
        if t is None:
            return 'expires: never'
        else:
            expire = 'expires' if t > now else 'expired'
            return '%s: %s' % (
                expire,
                util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))

    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Refresh cached channel metadata if needed and register it with
        *squirrel*.

        A new query is made only when the cached constraint does not cover
        *constraint* or the cached metadata has expired.
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                # Grow the cached constraint instead of replacing it, so
                # previously covered queries stay covered.
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                # Write to a per-process temp file, then atomically install.
                fn_temp = fn + '.%i.temp' % os.getpid()

                dump_all_spickle([channel_sx], filename=fn_temp)
                # channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    # Refresh mtime in the database so expiry is pushed back.
                    squirrel.get_database().silent_touch(fn)

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

    def _do_channel_query(self, constraint):
        '''
        Query the FDSN station service at channel level.

        Returns a StationXML object; an empty dummy document on empty
        results. Raises :py:exc:`~pyrocko.squirrel.error.SquirrelError` on
        download errors.
        '''
        extra_args = {}

        tmin = constraint.tmin \
            if constraint.tmin is not None and constraint.tmin != g_tmin \
            else g_tmin_queries

        tmax = constraint.tmax \
            if constraint.tmax is not None and constraint.tmax != g_tmax \
            else g_tmax

        if self.site in g_sites_not_supporting['startbefore']:
            # Fall back to plain start/end time constraints for sites which
            # do not implement startbefore/endafter.
            ktmin = 'starttime'
            ktmax = 'endtime'
        else:
            ktmin = 'endafter'
            ktmax = 'startbefore'

        if self.codes is None:
            # Time constraints conflict with POST selections, so only add
            # them for GET-style queries.
            extra_args[ktmin] = tmin
            extra_args[ktmax] = tmax

        if self.site not in g_sites_not_supporting['includerestricted']:
            # Ask for restricted channels only when credentials are set up.
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        if self.query_args is not None:
            extra_args.update(self.query_args)

        self._log_meta('querying...')

        try:
            channel_sx = fdsn.station(
                site=self.site,
                format='text',
                level='channel',
                selection=codes_to_selection(self.codes, tmin, tmax),
                **extra_args)

            self._hotfix('channel', channel_sx)

            return channel_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

        except fdsn.DownloadError as e:
            raise SquirrelError(str(e))

    def _load_constraint(self):
        # Restore the cached constraint from disk, if present.
        fn = self._get_constraint_path()
        if op.exists(fn):
            with open(fn, 'rb') as f:
                self._constraint = pickle.load(f)
        else:
            self._constraint = None

    def _dump_constraint(self):
        # Persist the current constraint to disk.
        with open(self._get_constraint_path(), 'wb') as f:
            pickle.dump(self._constraint, f, protocol=2)

    def _get_expiration_time(self, path):
        '''
        Expiration timestamp of cached file at *path*, based on its mtime.

        Returns ``None`` when expiry is disabled, ``0.0`` when the file is
        missing (i.e. immediately expired).
        '''
        if self.expires is None:
            return None

        try:
            t = os.stat(path)[8]  # index 8 is st_mtime
            return t + self.expires

        except OSError:
            return 0.0

    def _get_channels_expiration_time(self):
        return self._get_expiration_time(self._get_channels_path())

    def update_waveform_promises(self, squirrel, constraint):
        '''
        Register waveform promises for spans not covered by local data.

        For every channel matching *constraint*, the gaps between locally
        available waveform coverage and the channel's epoch are turned into
        promise nuts under this source's virtual path.
        '''
        from ..base import gaps
        cpath = os.path.abspath(self._get_channels_path())

        ctmin = constraint.tmin
        ctmax = constraint.tmax

        nuts = squirrel.iter_nuts(
            'channel',
            path=cpath,
            codes=constraint.codes,
            tmin=ctmin,
            tmax=ctmax)

        coverages = squirrel.get_coverage(
            'waveform',
            codes=constraint.codes if constraint.codes else None,
            tmin=ctmin,
            tmax=ctmax)

        # Map channel codes to the list of locally available time spans.
        codes_to_avail = defaultdict(list)
        for coverage in coverages:
            for tmin, tmax, _ in coverage.iter_spans():
                codes_to_avail[coverage.codes].append((tmin, tmax))

        def sgaps(nut):
            # Yield clones of nut restricted to spans not locally available.
            for tmin, tmax in gaps(
                    codes_to_avail[nut.codes],
                    max(ctmin, nut.tmin) if ctmin is not None else nut.tmin,
                    min(ctmax, nut.tmax) if ctmax is not None else nut.tmax):

                subnut = clone(nut)
                subnut.tmin = tmin
                subnut.tmax = tmax

                # ignore 1-sample gaps produced by rounding errors
                if subnut.tmax - subnut.tmin < 2*subnut.deltat:
                    continue

                yield subnut

        def wanted(nuts):
            # Flatten: all gap-subnuts of all channel nuts.
            for nut in nuts:
                for nut in sgaps(nut):
                    yield nut

        path = self._source_id
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
            virtual_paths=[path])

    def remove_waveform_promises(self, squirrel, from_database='selection'):
        '''
        Remove waveform promises from live selection or global database.

        :param from_database:
            Remove from live selection ``'selection'`` or global database
            ``'global'``.
        '''

        path = self._source_id
        if from_database == 'selection':
            squirrel.remove(path)
        elif from_database == 'global':
            squirrel.get_database().remove(path)
        else:
            raise ValueError(
                'Values allowed for from_database: ("selection", "global")')

    def _get_user_credentials(self):
        # Keyword arguments to pass credentials/token to pyrocko.client.fdsn.
        d = {}
        if self.user_credentials is not None:
            d['user'], d['passwd'] = self.user_credentials

        if self.auth_token is not None or self.auth_token_path is not None:
            d['token'] = self.get_auth_token()

        return d

    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):
        '''
        Download waveforms for *orders* in batches, reporting via callbacks.

        :param success: called with ``(order, traces)`` per fulfilled order.
        :param batch_add: called with the list of newly archived paths.
        :param error_permanent: called per order which failed permanently.
        :param error_temporary: called per order which may succeed later.
        '''

        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20  # orders per dataselect request
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        while i < len(orders):
            orders_now = orders[i:i+neach]
            selection_now = orders_to_selection(orders_now)
            nsamples_estimate = sum(
                order.estimate_nsamples() for order in orders_now)

            nsuccess = 0
            elog.append_checkpoint()
            self._log_info_data(
                'downloading, %s' % order_summary(orders_now))

            all_paths = []
            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    data = fdsn.dataselect(
                        site=self.site, selection=selection_now,
                        **self._get_user_credentials())

                    now = time.time()

                    path = op.join(tmpdir, 'tmp.mseed')
                    with open(path, 'wb') as f:
                        nread = 0
                        while True:
                            buf = data.read(1024)
                            nread += len(buf)
                            if not buf:
                                break
                            f.write(buf)

                            # abort if we get way more data than expected
                            if nread > max(
                                    1024 * 1000,
                                    nsamples_estimate * 4 * 10):

                                raise Aborted('Too much data received.')

                    trs = io.load(path)

                    # Group received traces by their codes so each order can
                    # pick up its matching pieces.
                    by_nslc = defaultdict(list)
                    for tr in trs:
                        by_nslc[tr.nslc_id].append(tr)

                    for order in orders_now:
                        trs_order = []
                        err_this = None
                        for tr in by_nslc[order.codes.nslc]:
                            try:
                                order.validate(tr)
                                trs_order.append(tr.chop(
                                    order.tmin, order.tmax, inplace=False))

                            except trace.NoData:
                                err_this = (
                                    'empty result', 'empty sub-interval')

                            except InvalidWaveform as e:
                                err_this = ('invalid waveform', str(e))

                        if len(trs_order) == 0:
                            if err_this is None:
                                err_this = ('empty result', '')

                            elog.append(now, order, *err_this)
                            if order.is_near_real_time():
                                # Data may simply not have arrived yet.
                                error_temporary(order)
                            else:
                                error_permanent(order)
                        else:
                            def tsame(ta, tb):
                                # Times within two samples count as equal.
                                return abs(tb - ta) < 2 * order.deltat

                            if len(trs_order) != 1 \
                                    or not tsame(
                                        trs_order[0].tmin, order.tmin) \
                                    or not tsame(
                                        trs_order[0].tmax, order.tmax):

                                if err_this:
                                    elog.append(
                                        now, order,
                                        'partial result, %s' % err_this[0],
                                        err_this[1])
                                else:
                                    elog.append(now, order, 'partial result')

                            paths = self._archive.add(order, trs_order)
                            all_paths.extend(paths)

                            nsuccess += 1
                            success(order, trs_order)

                except fdsn.EmptyResult:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'empty result')
                        if order.is_near_real_time():
                            error_temporary(order)
                        else:
                            error_permanent(order)

                except Aborted as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'aborted', str(e))
                        error_permanent(order)

                except (util.HTTPError, fdsn.DownloadError) as e:
                    # Server/network problems are treated as temporary.
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'http error', str(e))
                        error_temporary(order)

            emessage = elog.summarize_recent()

            self._log_info_data(
                '%i download%s %ssuccessful' % (
                    nsuccess,
                    util.plural_s(nsuccess),
                    '(partially) ' if emessage else '')
                + (', %s' % emessage if emessage else ''))

            if all_paths:
                batch_add(all_paths)

            i += neach
            task.update(i)

        for agg in elog.iter_aggregates():
            logger.warning(str(agg))

        task.done()

    def _do_response_query(self, selection):
        '''
        Query the FDSN station service at response level for *selection*.

        Returns a StationXML object; an empty dummy document on empty
        results. Raises :py:exc:`~pyrocko.squirrel.error.SquirrelError` on
        download errors.
        '''
        extra_args = {}

        if self.site not in g_sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        self._log_responses('querying...')

        try:
            response_sx = fdsn.station(
                site=self.site,
                level='response',
                selection=selection,
                **extra_args)

            self._hotfix('response', response_sx)
            return response_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

        except fdsn.DownloadError as e:
            raise SquirrelError(str(e))

    def update_response_inventory(self, squirrel, constraint):
        '''
        Refresh cached instrument responses for channels matching
        *constraint* and register them with *squirrel*.

        Responses are cached per-channel; only missing or expired entries
        are re-queried, in batches of 100 channels.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts(
            'channel', path=cpath, codes=constraint.codes)

        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes.nslc
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if os.path.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        # Placeholder document stored for channels the server returned
        # nothing for, so they are not re-queried every time.
        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        neach = 100
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    # Write to a per-process temp file, then atomically
                    # install it (see move_or_keep).
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)

                    dump_all_spickle([response_sx], filename=fn_temp)
                    # response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            if k.find('failed') != -1:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(
                        CodesNSLCE(tup) for tup in status[k])),
                target=log_target)

        if fns:
            squirrel.add(fns, kinds=['response'])

943 

944 

# Public API of this module.
__all__ = [
    'FDSNSource',
]