1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import time 

7import os 

8import copy 

9import logging 

10import tempfile 

11import importlib.util 

12from collections import defaultdict 

13try: 

14 import cPickle as pickle 

15except ImportError: 

16 import pickle 

17import os.path as op 

18from .base import Source, Constraint 

19from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \ 

20 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \ 

21 codes_to_str_abbreviated, CodesNSLCE 

22from ..database import ExecuteGet1Error 

23from pyrocko.squirrel.error import SquirrelError 

24from pyrocko.client import fdsn 

25 

26from pyrocko import util, trace, io 

27from pyrocko.io.io_common import FileLoadError 

28from pyrocko.io import stationxml 

29from pyrocko.progress import progress 

30from pyrocko import has_paths 

31 

32from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \ 

33 Duration, Bool, clone 

34 

35guts_prefix = 'squirrel' 

36 

37fdsn.g_timeout = 60. 

38 

39logger = logging.getLogger('psq.client.fdsn') 

40 

41sites_not_supporting = { 

42 'startbefore': ['geonet'], 

43 'includerestricted': ['geonet']} 

44 

45 

46def make_task(*args): 

47 return progress.task(*args, logger=logger) 

48 

49 

50def diff(fn_a, fn_b): 

51 try: 

52 if os.stat(fn_a).st_size != os.stat(fn_b).st_size: 

53 return True 

54 

55 except OSError: 

56 return True 

57 

58 with open(fn_a, 'rb') as fa: 

59 with open(fn_b, 'rb') as fb: 

60 while True: 

61 a = fa.read(1024) 

62 b = fb.read(1024) 

63 if a != b: 

64 return True 

65 

66 if len(a) == 0 or len(b) == 0: 

67 return False 

68 

69 

70def move_or_keep(fn_temp, fn): 

71 if op.exists(fn): 

72 if diff(fn, fn_temp): 

73 os.rename(fn_temp, fn) 

74 status = 'updated' 

75 else: 

76 os.unlink(fn_temp) 

77 status = 'upstream unchanged' 

78 

79 else: 

80 os.rename(fn_temp, fn) 

81 status = 'new' 

82 

83 return status 

84 

85 

86class Archive(Object): 

87 

88 def add(self): 

89 raise NotImplementedError() 

90 

91 

92class MSeedArchive(Archive): 

93 template = String.T(default=op.join( 

94 '%(tmin_year)s', 

95 '%(tmin_month)s', 

96 '%(tmin_day)s', 

97 'trace_%(network)s_%(station)s_%(location)s_%(channel)s' 

98 + '_%(block_tmin_us)s_%(block_tmax_us)s.mseed')) 

99 

100 def __init__(self, **kwargs): 

101 Archive.__init__(self, **kwargs) 

102 self._base_path = None 

103 

104 def set_base_path(self, path): 

105 self._base_path = path 

106 

107 def add(self, order, trs): 

108 path = op.join(self._base_path, self.template) 

109 fmt = '%Y-%m-%d_%H-%M-%S.6FRAC' 

110 return io.save(trs, path, overwrite=True, additional=dict( 

111 block_tmin_us=util.time_to_str(order.tmin, format=fmt), 

112 block_tmax_us=util.time_to_str(order.tmax, format=fmt))) 

113 

114 

115def combine_selections(selection): 

116 out = [] 

117 last = None 

118 for this in selection: 

119 if last and this[:4] == last[:4] and this[4] == last[5]: 

120 last = last[:5] + (this[5],) 

121 else: 

122 if last: 

123 out.append(last) 

124 

125 last = this 

126 

127 if last: 

128 out.append(last) 

129 

130 return out 

131 

132 

133def orders_sort_key(order): 

134 return (order.codes, order.tmin) 

135 

136 

137def orders_to_selection(orders): 

138 selection = [] 

139 for order in sorted(orders, key=orders_sort_key): 

140 selection.append( 

141 order.codes.nslc + ( 

142 order.tmin-1.0*order.deltat, 

143 order.tmax+1.0*order.deltat)) 

144 

145 return combine_selections(selection) 

146 

147 

148class ErrorEntry(Object): 

149 time = Timestamp.T() 

150 order = WaveformOrder.T() 

151 kind = String.T() 

152 details = String.T(optional=True) 

153 

154 

155class ErrorAggregate(Object): 

156 site = String.T() 

157 kind = String.T() 

158 details = String.T() 

159 entries = List.T(ErrorEntry.T()) 

160 codes = List.T(CodesNSLCE.T()) 

161 time_spans = List.T(Tuple.T(2, Timestamp.T())) 

162 

163 def __str__(self): 

164 codes = [str(x) for x in self.codes] 

165 scodes = '\n' + util.ewrap(codes, indent=' ') if codes else '<none>' 

166 tss = self.time_spans 

167 sspans = '\n' + util.ewrap(('%s - %s' % ( 

168 util.time_to_str(ts[0]), util.time_to_str(ts[1])) for ts in tss), 

169 indent=' ') 

170 

171 return ('FDSN "%s": download error summary for "%s" (%i)\n%s ' 

172 'Codes:%s\n Time spans:%s') % ( 

173 self.site, 

174 self.kind, 

175 len(self.entries), 

176 ' Details: %s\n' % self.details if self.details else '', 

177 scodes, 

178 sspans) 

179 

180 

181class ErrorLog(Object): 

182 site = String.T() 

183 entries = List.T(ErrorEntry.T()) 

184 checkpoints = List.T(Int.T()) 

185 

186 def append_checkpoint(self): 

187 self.checkpoints.append(len(self.entries)) 

188 

189 def append(self, time, order, kind, details=''): 

190 entry = ErrorEntry(time=time, order=order, kind=kind, details=details) 

191 self.entries.append(entry) 

192 

193 def iter_aggregates(self): 

194 by_kind_details = defaultdict(list) 

195 for entry in self.entries: 

196 by_kind_details[entry.kind, entry.details].append(entry) 

197 

198 kind_details = sorted(by_kind_details.keys()) 

199 

200 for kind, details in kind_details: 

201 entries = by_kind_details[kind, details] 

202 codes = sorted(set(entry.order.codes for entry in entries)) 

203 selection = orders_to_selection(entry.order for entry in entries) 

204 time_spans = sorted(set(row[-2:] for row in selection)) 

205 yield ErrorAggregate( 

206 site=self.site, 

207 kind=kind, 

208 details=details, 

209 entries=entries, 

210 codes=codes, 

211 time_spans=time_spans) 

212 

213 def summarize_recent(self): 

214 ioff = self.checkpoints[-1] if self.checkpoints else 0 

215 recent = self.entries[ioff:] 

216 kinds = sorted(set(entry.kind for entry in recent)) 

217 if recent: 

218 return '%i error%s (%s)' % ( 

219 len(recent), util.plural_s(recent), '; '.join(kinds)) 

220 else: 

221 return '' 

222 

223 

224class Aborted(SquirrelError): 

225 pass 

226 

227 

228class FDSNSource(Source, has_paths.HasPaths): 

229 

230 ''' 

231 Squirrel data-source to transparently get data from FDSN web services. 

232 

233 Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel` allows 

234 the latter to download station and waveform data from an FDSN web service 

235 should the data not already happen to be available locally. 

236 ''' 

237 

238 site = String.T( 

239 help='FDSN site url or alias name (see ' 

240 ':py:mod:`pyrocko.client.fdsn`).') 

241 

242 query_args = Dict.T( 

243 String.T(), String.T(), 

244 optional=True, 

245 help='Common query arguments, which are appended to all queries.') 

246 

247 expires = Duration.T( 

248 optional=True, 

249 help='Expiration time [s]. Information older than this will be ' 

250 'refreshed. This only applies to station-metadata. Waveforms do ' 

251 'not expire. If set to ``None`` neither type of data expires.') 

252 

253 cache_path = String.T( 

254 optional=True, 

255 help='Directory path where any downloaded waveforms and station ' 

256 'meta-data are to be kept. By default the Squirrel ' 

257 "environment's cache directory is used.") 

258 

259 shared_waveforms = Bool.T( 

260 default=False, 

261 help='If ``True``, waveforms are shared with other FDSN sources in ' 

262 'the same Squirrel environment. If ``False``, they are kept ' 

263 'separate.') 

264 

265 user_credentials = Tuple.T( 

266 2, String.T(), 

267 optional=True, 

268 help='User and password for FDSN servers requiring password ' 

269 'authentication') 

270 

271 auth_token = String.T( 

272 optional=True, 

273 help='Authentication token to be presented to the FDSN server.') 

274 

275 auth_token_path = String.T( 

276 optional=True, 

277 help='Path to file containing the authentication token to be ' 

278 'presented to the FDSN server.') 

279 

280 hotfix_module_path = has_paths.Path.T( 

281 optional=True, 

282 help='Path to Python module to locally patch metadata errors.') 

283 

284 def __init__(self, site, query_args=None, **kwargs): 

285 Source.__init__(self, site=site, query_args=query_args, **kwargs) 

286 

287 self._constraint = None 

288 self._hash = self.make_hash() 

289 self._source_id = 'client:fdsn:%s' % self._hash 

290 self._error_infos = [] 

291 

292 def describe(self): 

293 return self._source_id 

294 

295 def make_hash(self): 

296 s = self.site 

297 s += 'notoken' \ 

298 if (self.auth_token is None and self.auth_token_path is None) \ 

299 else 'token' 

300 

301 if self.user_credentials is not None: 

302 s += self.user_credentials[0] 

303 else: 

304 s += 'nocred' 

305 

306 if self.query_args is not None: 

307 s += ','.join( 

308 '%s:%s' % (k, self.query_args[k]) 

309 for k in sorted(self.query_args.keys())) 

310 else: 

311 s += 'noqueryargs' 

312 

313 return ehash(s) 

314 

315 def get_hash(self): 

316 return self._hash 

317 

318 def get_auth_token(self): 

319 if self.auth_token: 

320 return self.auth_token 

321 

322 elif self.auth_token_path is not None: 

323 try: 

324 with open(self.auth_token_path, 'rb') as f: 

325 return f.read().decode('ascii') 

326 

327 except OSError as e: 

328 raise FileLoadError( 

329 'Cannot load auth token file (%s): %s' 

330 % (str(e), self.auth_token_path)) 

331 

332 else: 

333 raise Exception( 

334 'FDSNSource: auth_token and auth_token_path are mutually ' 

335 'exclusive.') 

336 

337 def setup(self, squirrel, check=True): 

338 self._cache_path = op.join( 

339 self.cache_path or squirrel._cache_path, 'fdsn') 

340 

341 util.ensuredir(self._cache_path) 

342 self._load_constraint() 

343 self._archive = MSeedArchive() 

344 waveforms_path = self._get_waveforms_path() 

345 util.ensuredir(waveforms_path) 

346 self._archive.set_base_path(waveforms_path) 

347 

348 squirrel.add( 

349 self._get_waveforms_path(), 

350 check=check) 

351 

352 fn = self._get_channels_path() 

353 if os.path.exists(fn): 

354 squirrel.add(fn) 

355 

356 squirrel.add_virtual( 

357 [], virtual_paths=[self._source_id]) 

358 

359 responses_path = self._get_responses_path() 

360 if os.path.exists(responses_path): 

361 squirrel.add(responses_path, kinds=['response']) 

362 

363 self._hotfix_module = None 

364 

365 def _hotfix(self, query_type, sx): 

366 if self.hotfix_module_path is None: 

367 return 

368 

369 if self._hotfix_module is None: 

370 module_path = self.expand_path(self.hotfix_module_path) 

371 spec = importlib.util.spec_from_file_location( 

372 'hotfix_' + self._hash, module_path) 

373 self._hotfix_module = importlib.util.module_from_spec(spec) 

374 spec.loader.exec_module(self._hotfix_module) 

375 

376 hook = getattr( 

377 self._hotfix_module, 'stationxml_' + query_type + '_hook') 

378 

379 return hook(sx) 

380 

381 def _get_constraint_path(self): 

382 return op.join(self._cache_path, self._hash, 'constraint.pickle') 

383 

384 def _get_channels_path(self): 

385 return op.join(self._cache_path, self._hash, 'channels.stationxml') 

386 

387 def _get_responses_path(self, nslc=None): 

388 dirpath = op.join( 

389 self._cache_path, self._hash, 'responses') 

390 

391 if nslc is None: 

392 return dirpath 

393 else: 

394 return op.join( 

395 dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc) 

396 

397 def _get_waveforms_path(self): 

398 if self.shared_waveforms: 

399 return op.join(self._cache_path, 'waveforms') 

400 else: 

401 return op.join(self._cache_path, self._hash, 'waveforms') 

402 

403 def _log_meta(self, message, target=logger.info): 

404 log_prefix = 'FDSN "%s" metadata:' % self.site 

405 target(' '.join((log_prefix, message))) 

406 

407 def _log_responses(self, message, target=logger.info): 

408 log_prefix = 'FDSN "%s" responses:' % self.site 

409 target(' '.join((log_prefix, message))) 

410 

411 def _log_info_data(self, *args): 

412 log_prefix = 'FDSN "%s" waveforms:' % self.site 

413 logger.info(' '.join((log_prefix,) + args)) 

414 

415 def _str_expires(self, t, now): 

416 if t is None: 

417 return 'expires: never' 

418 else: 

419 expire = 'expires' if t > now else 'expired' 

420 return '%s: %s' % ( 

421 expire, 

422 util.time_to_str(t, format='%Y-%m-%d %H:%M:%S')) 

423 

424 def update_channel_inventory(self, squirrel, constraint=None): 

425 if constraint is None: 

426 constraint = Constraint() 

427 

428 expiration_time = self._get_channels_expiration_time() 

429 now = time.time() 

430 

431 log_target = logger.info 

432 if self._constraint and self._constraint.contains(constraint) \ 

433 and (expiration_time is None or now < expiration_time): 

434 

435 status = 'using cached' 

436 

437 else: 

438 if self._constraint: 

439 constraint_temp = copy.deepcopy(self._constraint) 

440 constraint_temp.expand(constraint) 

441 constraint = constraint_temp 

442 

443 try: 

444 channel_sx = self._do_channel_query(constraint) 

445 

446 channel_sx.created = None # timestamp would ruin diff 

447 

448 fn = self._get_channels_path() 

449 util.ensuredirs(fn) 

450 fn_temp = fn + '.%i.temp' % os.getpid() 

451 channel_sx.dump_xml(filename=fn_temp) 

452 

453 status = move_or_keep(fn_temp, fn) 

454 

455 if status == 'upstream unchanged': 

456 squirrel.get_database().silent_touch(fn) 

457 

458 self._constraint = constraint 

459 self._dump_constraint() 

460 

461 except OSError as e: 

462 status = 'update failed (%s)' % str(e) 

463 log_target = logger.error 

464 

465 expiration_time = self._get_channels_expiration_time() 

466 self._log_meta( 

467 '%s (%s)' % (status, self._str_expires(expiration_time, now)), 

468 target=log_target) 

469 

470 fn = self._get_channels_path() 

471 if os.path.exists(fn): 

472 squirrel.add(fn) 

473 

474 def _do_channel_query(self, constraint): 

475 extra_args = {} 

476 

477 if self.site in sites_not_supporting['startbefore']: 

478 if constraint.tmin is not None and constraint.tmin != g_tmin: 

479 extra_args['starttime'] = constraint.tmin 

480 if constraint.tmax is not None and constraint.tmax != g_tmax: 

481 extra_args['endtime'] = constraint.tmax 

482 

483 else: 

484 if constraint.tmin is not None and constraint.tmin != g_tmin: 

485 extra_args['endafter'] = constraint.tmin 

486 if constraint.tmax is not None and constraint.tmax != g_tmax: 

487 extra_args['startbefore'] = constraint.tmax 

488 

489 if self.site not in sites_not_supporting['includerestricted']: 

490 extra_args.update( 

491 includerestricted=( 

492 self.user_credentials is not None 

493 or self.auth_token is not None 

494 or self.auth_token_path is not None)) 

495 

496 if self.query_args is not None: 

497 extra_args.update(self.query_args) 

498 

499 self._log_meta('querying...') 

500 

501 try: 

502 channel_sx = fdsn.station( 

503 site=self.site, 

504 format='text', 

505 level='channel', 

506 **extra_args) 

507 

508 self._hotfix('channel', channel_sx) 

509 

510 return channel_sx 

511 

512 except fdsn.EmptyResult: 

513 return stationxml.FDSNStationXML(source='dummy-empty-result') 

514 

515 def _load_constraint(self): 

516 fn = self._get_constraint_path() 

517 if op.exists(fn): 

518 with open(fn, 'rb') as f: 

519 self._constraint = pickle.load(f) 

520 else: 

521 self._constraint = None 

522 

523 def _dump_constraint(self): 

524 with open(self._get_constraint_path(), 'wb') as f: 

525 pickle.dump(self._constraint, f, protocol=2) 

526 

527 def _get_expiration_time(self, path): 

528 if self.expires is None: 

529 return None 

530 

531 try: 

532 t = os.stat(path)[8] 

533 return t + self.expires 

534 

535 except OSError: 

536 return 0.0 

537 

538 def _get_channels_expiration_time(self): 

539 return self._get_expiration_time(self._get_channels_path()) 

540 

541 def update_waveform_promises(self, squirrel, constraint): 

542 from ..base import gaps 

543 now = time.time() 

544 cpath = os.path.abspath(self._get_channels_path()) 

545 

546 ctmin = constraint.tmin 

547 ctmax = constraint.tmax 

548 

549 nuts = squirrel.iter_nuts( 

550 'channel', 

551 path=cpath, 

552 codes=constraint.codes, 

553 tmin=ctmin, 

554 tmax=ctmax) 

555 

556 coverages = squirrel.get_coverage( 

557 'waveform', 

558 codes=constraint.codes if constraint.codes else None, 

559 tmin=ctmin, 

560 tmax=ctmax) 

561 

562 codes_to_avail = defaultdict(list) 

563 for coverage in coverages: 

564 for tmin, tmax, _ in coverage.iter_spans(): 

565 codes_to_avail[coverage.codes].append((tmin, tmax)) 

566 

567 def sgaps(nut): 

568 for tmin, tmax in gaps( 

569 codes_to_avail[nut.codes], 

570 max(ctmin, nut.tmin) if ctmin is not None else nut.tmin, 

571 min(ctmax, nut.tmax) if ctmax is not None else nut.tmax): 

572 

573 subnut = clone(nut) 

574 subnut.tmin = tmin 

575 subnut.tmax = tmax 

576 

577 # ignore 1-sample gaps produced by rounding errors 

578 if subnut.tmax - subnut.tmin < 2*subnut.deltat: 

579 continue 

580 

581 yield subnut 

582 

583 def wanted(nuts): 

584 for nut in nuts: 

585 if nut.tmin < now: 

586 if nut.tmax > now: 

587 nut.tmax = now 

588 

589 for nut in sgaps(nut): 

590 yield nut 

591 

592 path = self._source_id 

593 squirrel.add_virtual( 

594 (make_waveform_promise_nut( 

595 file_path=path, 

596 **nut.waveform_promise_kwargs) for nut in wanted(nuts)), 

597 virtual_paths=[path]) 

598 

599 def remove_waveform_promises(self, squirrel, from_database='selection'): 

600 ''' 

601 Remove waveform promises from live selection or global database. 

602 

603 :param from_database: 

604 Remove from live selection ``'selection'`` or global database 

605 ``'global'``. 

606 ''' 

607 

608 path = self._source_id 

609 if from_database == 'selection': 

610 squirrel.remove(path) 

611 elif from_database == 'global': 

612 squirrel.get_database().remove(path) 

613 else: 

614 raise ValueError( 

615 'Values allowed for from_database: ("selection", "global")') 

616 

617 def _get_user_credentials(self): 

618 d = {} 

619 if self.user_credentials is not None: 

620 d['user'], d['passwd'] = self.user_credentials 

621 

622 if self.auth_token is not None or self.auth_token_path is not None: 

623 d['token'] = self.get_auth_token() 

624 

625 return d 

626 

627 def download_waveforms( 

628 self, orders, success, batch_add, error_permanent, 

629 error_temporary): 

630 

631 elog = ErrorLog(site=self.site) 

632 orders.sort(key=orders_sort_key) 

633 neach = 20 

634 i = 0 

635 task = make_task( 

636 'FDSN "%s" waveforms: downloading' % self.site, len(orders)) 

637 

638 while i < len(orders): 

639 orders_now = orders[i:i+neach] 

640 selection_now = orders_to_selection(orders_now) 

641 nsamples_estimate = sum( 

642 order.estimate_nsamples() for order in orders_now) 

643 

644 nsuccess = 0 

645 elog.append_checkpoint() 

646 self._log_info_data( 

647 'downloading, %s' % order_summary(orders_now)) 

648 

649 all_paths = [] 

650 with tempfile.TemporaryDirectory() as tmpdir: 

651 try: 

652 data = fdsn.dataselect( 

653 site=self.site, selection=selection_now, 

654 **self._get_user_credentials()) 

655 

656 now = time.time() 

657 

658 path = op.join(tmpdir, 'tmp.mseed') 

659 with open(path, 'wb') as f: 

660 nread = 0 

661 while True: 

662 buf = data.read(1024) 

663 nread += len(buf) 

664 if not buf: 

665 break 

666 f.write(buf) 

667 

668 # abort if we get way more data than expected 

669 if nread > max( 

670 1024 * 1000, 

671 nsamples_estimate * 4 * 10): 

672 

673 raise Aborted('Too much data received.') 

674 

675 trs = io.load(path) 

676 

677 by_nslc = defaultdict(list) 

678 for tr in trs: 

679 by_nslc[tr.nslc_id].append(tr) 

680 

681 for order in orders_now: 

682 trs_order = [] 

683 err_this = None 

684 for tr in by_nslc[order.codes.nslc]: 

685 try: 

686 order.validate(tr) 

687 trs_order.append(tr.chop( 

688 order.tmin, order.tmax, inplace=False)) 

689 

690 except trace.NoData: 

691 err_this = ( 

692 'empty result', 'empty sub-interval') 

693 

694 except InvalidWaveform as e: 

695 err_this = ('invalid waveform', str(e)) 

696 

697 if len(trs_order) == 0: 

698 if err_this is None: 

699 err_this = ('empty result', '') 

700 

701 elog.append(now, order, *err_this) 

702 error_permanent(order) 

703 else: 

704 def tsame(ta, tb): 

705 return abs(tb - ta) < 2 * order.deltat 

706 

707 if len(trs_order) != 1 \ 

708 or not tsame( 

709 trs_order[0].tmin, order.tmin) \ 

710 or not tsame( 

711 trs_order[0].tmax, order.tmax): 

712 

713 if err_this: 

714 elog.append( 

715 now, order, 

716 'partial result, %s' % err_this[0], 

717 err_this[1]) 

718 else: 

719 elog.append(now, order, 'partial result') 

720 

721 paths = self._archive.add(order, trs_order) 

722 all_paths.extend(paths) 

723 

724 nsuccess += 1 

725 success(order) 

726 

727 except fdsn.EmptyResult: 

728 now = time.time() 

729 for order in orders_now: 

730 elog.append(now, order, 'empty result') 

731 error_permanent(order) 

732 

733 except Aborted as e: 

734 now = time.time() 

735 for order in orders_now: 

736 elog.append(now, order, 'aborted', str(e)) 

737 error_permanent(order) 

738 

739 except util.HTTPError as e: 

740 now = time.time() 

741 for order in orders_now: 

742 elog.append(now, order, 'http error', str(e)) 

743 error_temporary(order) 

744 

745 emessage = elog.summarize_recent() 

746 

747 self._log_info_data( 

748 '%i download%s %ssuccessful' % ( 

749 nsuccess, 

750 util.plural_s(nsuccess), 

751 '(partially) ' if emessage else '') 

752 + (', %s' % emessage if emessage else '')) 

753 

754 if all_paths: 

755 batch_add(all_paths) 

756 

757 i += neach 

758 task.update(i) 

759 

760 for agg in elog.iter_aggregates(): 

761 logger.warning(str(agg)) 

762 

763 task.done() 

764 

765 def _do_response_query(self, selection): 

766 extra_args = {} 

767 

768 if self.site not in sites_not_supporting['includerestricted']: 

769 extra_args.update( 

770 includerestricted=( 

771 self.user_credentials is not None 

772 or self.auth_token is not None 

773 or self.auth_token_path is not None)) 

774 

775 self._log_responses('querying...') 

776 

777 try: 

778 response_sx = fdsn.station( 

779 site=self.site, 

780 level='response', 

781 selection=selection, 

782 **extra_args) 

783 

784 self._hotfix('response', response_sx) 

785 return response_sx 

786 

787 except fdsn.EmptyResult: 

788 return stationxml.FDSNStationXML(source='dummy-empty-result') 

789 

790 def update_response_inventory(self, squirrel, constraint): 

791 cpath = os.path.abspath(self._get_channels_path()) 

792 nuts = squirrel.iter_nuts( 

793 'channel', path=cpath, codes=constraint.codes) 

794 

795 tmin = g_tmin_queries 

796 tmax = g_tmax 

797 

798 selection = [] 

799 now = time.time() 

800 have = set() 

801 status = defaultdict(list) 

802 for nut in nuts: 

803 nslc = nut.codes.nslc 

804 if nslc in have: 

805 continue 

806 have.add(nslc) 

807 

808 fn = self._get_responses_path(nslc) 

809 expiration_time = self._get_expiration_time(fn) 

810 if os.path.exists(fn) \ 

811 and (expiration_time is None or now < expiration_time): 

812 status['using cached'].append(nslc) 

813 else: 

814 selection.append(nslc + (tmin, tmax)) 

815 

816 dummy = stationxml.FDSNStationXML(source='dummy-empty') 

817 neach = 100 

818 i = 0 

819 fns = [] 

820 while i < len(selection): 

821 selection_now = selection[i:i+neach] 

822 i += neach 

823 

824 try: 

825 sx = self._do_response_query(selection_now) 

826 except Exception as e: 

827 status['update failed (%s)' % str(e)].extend( 

828 entry[:4] for entry in selection_now) 

829 continue 

830 

831 sx.created = None # timestamp would ruin diff 

832 

833 by_nslc = dict(stationxml.split_channels(sx)) 

834 

835 for entry in selection_now: 

836 nslc = entry[:4] 

837 response_sx = by_nslc.get(nslc, dummy) 

838 try: 

839 fn = self._get_responses_path(nslc) 

840 fn_temp = fn + '.%i.temp' % os.getpid() 

841 

842 util.ensuredirs(fn_temp) 

843 response_sx.dump_xml(filename=fn_temp) 

844 

845 status_this = move_or_keep(fn_temp, fn) 

846 

847 if status_this == 'upstream unchanged': 

848 try: 

849 squirrel.get_database().silent_touch(fn) 

850 except ExecuteGet1Error: 

851 pass 

852 

853 status[status_this].append(nslc) 

854 fns.append(fn) 

855 

856 except OSError as e: 

857 status['update failed (%s)' % str(e)].append(nslc) 

858 

859 for k in sorted(status): 

860 if k.find('failed') != -1: 

861 log_target = logger.error 

862 else: 

863 log_target = logger.info 

864 

865 self._log_responses( 

866 '%s: %s' % ( 

867 k, codes_to_str_abbreviated( 

868 CodesNSLCE(tup) for tup in status[k])), 

869 target=log_target) 

870 

871 if fns: 

872 squirrel.add(fns, kinds=['response']) 

873 

874 

875__all__ = [ 

876 'FDSNSource', 

877]