# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

import time
import os
import copy
import logging
import tempfile
import importlib.util
from collections import defaultdict
try:
    import cPickle as pickle
except ImportError:
    import pickle
import os.path as op
from .base import Source, Constraint
from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \
    order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \
    codes_to_str_abbreviated, CodesNSLCE
from ..database import ExecuteGet1Error
from pyrocko.client import fdsn

from pyrocko import util, trace, io
from pyrocko.io.io_common import FileLoadError
from pyrocko.io import stationxml
from pyrocko.progress import progress
from pyrocko import has_paths

from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \
    Duration, Bool, clone

guts_prefix = 'squirrel'

fdsn.g_timeout = 60.

logger = logging.getLogger('psq.client.fdsn')

sites_not_supporting = {
    'startbefore': ['geonet'],
    'includerestricted': ['geonet']}


def make_task(*args):
    return progress.task(*args, logger=logger)


def diff(fn_a, fn_b):
    # Return True if the two files differ in size or content, or if either
    # of them cannot be accessed.
    try:
        if os.stat(fn_a).st_size != os.stat(fn_b).st_size:
            return True

    except OSError:
        return True

    with open(fn_a, 'rb') as fa:
        with open(fn_b, 'rb') as fb:
            while True:
                a = fa.read(1024)
                b = fb.read(1024)
                if a != b:
                    return True

                if len(a) == 0 or len(b) == 0:
                    return False


def move_or_keep(fn_temp, fn):
    # Install the downloaded file fn_temp in place of fn if its content has
    # changed, otherwise discard fn_temp. Returns a short status string.
    if op.exists(fn):
        if diff(fn, fn_temp):
            os.rename(fn_temp, fn)
            status = 'updated'
        else:
            os.unlink(fn_temp)
            status = 'upstream unchanged'

    else:
        os.rename(fn_temp, fn)
        status = 'new'

    return status


class Archive(Object):

    def add(self):
        raise NotImplementedError()


class MSeedArchive(Archive):
    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        + '_%(block_tmin_us)s_%(block_tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)
        self._base_path = None

    def set_base_path(self, path):
        self._base_path = path

    def add(self, order, trs):
        path = op.join(self._base_path, self.template)
        fmt = '%Y-%m-%d_%H-%M-%S.6FRAC'
        return io.save(trs, path, overwrite=True, additional=dict(
            block_tmin_us=util.time_to_str(order.tmin, format=fmt),
            block_tmax_us=util.time_to_str(order.tmax, format=fmt)))


def combine_selections(selection):
    # Merge adjacent selection entries with identical codes and contiguous
    # time spans into single entries.
    out = []
    last = None
    for this in selection:
        if last and this[:4] == last[:4] and this[4] == last[5]:
            last = last[:5] + (this[5],)
        else:
            if last:
                out.append(last)

            last = this

    if last:
        out.append(last)

    return out


def orders_sort_key(order):
    return (order.codes, order.tmin)


def orders_to_selection(orders):
    selection = []
    for order in sorted(orders, key=orders_sort_key):
        selection.append(
            order.codes.nslc + (
                order.tmin-1.0*order.deltat,
                order.tmax+1.0*order.deltat))

    return combine_selections(selection)


class ErrorEntry(Object):
    time = Timestamp.T()
    order = WaveformOrder.T()
    kind = String.T()
    details = String.T(optional=True)


class ErrorAggregate(Object):
    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes = List.T(CodesNSLCE.T())
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        codes = [str(x) for x in self.codes]
        scodes = '\n' + util.ewrap(codes, indent='  ') if codes else '<none>'
        tss = self.time_spans
        sspans = '\n' + util.ewrap(('%s - %s' % (
            util.time_to_str(ts[0]), util.time_to_str(ts[1])) for ts in tss),
            indent='  ')

        return ('FDSN "%s": download error summary for "%s" (%i)\n%s  '
                'Codes:%s\n  Time spans:%s') % (
            self.site,
            self.kind,
            len(self.entries),
            '  Details: %s\n' % self.details if self.details else '',
            scodes,
            sspans)


class ErrorLog(Object):
    site = String.T()
    entries = List.T(ErrorEntry.T())
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        self.checkpoints.append(len(self.entries))

    def append(self, time, order, kind, details=''):
        entry = ErrorEntry(time=time, order=order, kind=kind, details=details)
        self.entries.append(entry)

    def iter_aggregates(self):
        by_kind_details = defaultdict(list)
        for entry in self.entries:
            by_kind_details[entry.kind, entry.details].append(entry)

        kind_details = sorted(by_kind_details.keys())

        for kind, details in kind_details:
            entries = by_kind_details[kind, details]
            codes = sorted(set(entry.order.codes for entry in entries))
            selection = orders_to_selection(entry.order for entry in entries)
            time_spans = sorted(set(row[-2:] for row in selection))
            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=entries,
                codes=codes,
                time_spans=time_spans)

    def summarize_recent(self):
        ioff = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[ioff:]
        kinds = sorted(set(entry.kind for entry in recent))
        if recent:
            return '%i error%s (%s)' % (
                len(recent), util.plural_s(recent), '; '.join(kinds))
        else:
            return ''


class FDSNSource(Source, has_paths.HasPaths):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel`
    allows the latter to download station and waveform data from an FDSN web
    service should the data not already happen to be available locally.
    '''
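
    # Typical usage sketch (illustrative, not part of the original code;
    # assumes a Squirrel instance ``sq`` and the public 'geofon' site alias,
    # adjust to your setup):
    #
    #     sq.add_fdsn('geofon', query_args={'channel': 'BH?'})
    #
    # or, constructing the source explicitly:
    #
    #     sq.add_source(
    #         FDSNSource(site='geofon', query_args={'channel': 'BH?'}))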

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             'environment\'s cache directory is used.')

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    hotfix_module_path = has_paths.Path.T(
        optional=True,
        help='Path to Python module to locally patch metadata errors.')

    def __init__(self, site, query_args=None, **kwargs):
        Source.__init__(self, site=site, query_args=query_args, **kwargs)

        self._constraint = None
        self._hash = self.make_hash()
        self._source_id = 'client:fdsn:%s' % self._hash
        self._error_infos = []

    def describe(self):
        return self._source_id

    def make_hash(self):
        s = self.site
        s += 'notoken' \
            if (self.auth_token is None and self.auth_token_path is None) \
            else 'token'

        if self.user_credentials is not None:
            s += self.user_credentials[0]
        else:
            s += 'nocred'

        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        return self._hash

    def get_auth_token(self):
        if self.auth_token:
            return self.auth_token

        elif self.auth_token_path is not None:
            try:
                with open(self.auth_token_path, 'rb') as f:
                    return f.read().decode('ascii')

            except OSError as e:
                raise FileLoadError(
                    'Cannot load auth token file (%s): %s'
                    % (str(e), self.auth_token_path))

        else:
            raise Exception(
                'FDSNSource: auth_token and auth_token_path are mutually '
                'exclusive.')

    def setup(self, squirrel, check=True):
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()
        self._archive = MSeedArchive()
        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive.set_base_path(waveforms_path)

        squirrel.add(
            self._get_waveforms_path(),
            check=check)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if os.path.exists(responses_path):
            squirrel.add(responses_path, kinds=['response'])

        self._hotfix_module = None

    def _hotfix(self, query_type, sx):
        if self.hotfix_module_path is None:
            return

        if self._hotfix_module is None:
            module_path = self.expand_path(self.hotfix_module_path)
            spec = importlib.util.spec_from_file_location(
                'hotfix_' + self._hash, module_path)
            self._hotfix_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(self._hotfix_module)

        hook = getattr(
            self._hotfix_module, 'stationxml_' + query_type + '_hook')

        return hook(sx)
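
    # Sketch of the hotfix-module interface implied above (illustrative, not
    # part of the original code): the module referenced by
    # ``hotfix_module_path`` should define functions named
    # ``stationxml_channel_hook(sx)`` and/or ``stationxml_response_hook(sx)``,
    # each receiving the freshly downloaded FDSNStationXML object and
    # patching it in place, e.g.:
    #
    #     def stationxml_channel_hook(sx):
    #         for network in sx.network_list:
    #             for station in network.station_list:
    #                 if station.code == 'ABC':  # hypothetical station
    #                     station.latitude = 12.345  # correct a wrong value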

    def _get_constraint_path(self):
        return op.join(self._cache_path, self._hash, 'constraint.pickle')

    def _get_channels_path(self):
        return op.join(self._cache_path, self._hash, 'channels.stationxml')

    def _get_responses_path(self, nslc=None):
        dirpath = op.join(
            self._cache_path, self._hash, 'responses')

        if nslc is None:
            return dirpath
        else:
            return op.join(
                dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)

    def _get_waveforms_path(self):
        if self.shared_waveforms:
            return op.join(self._cache_path, 'waveforms')
        else:
            return op.join(self._cache_path, self._hash, 'waveforms')

    def _log_meta(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" metadata:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_responses(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" responses:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_info_data(self, *args):
        log_prefix = 'FDSN "%s" waveforms:' % self.site
        logger.info(' '.join((log_prefix,) + args))

    def _str_expires(self, t, now):
        if t is None:
            return 'expires: never'
        else:
            expire = 'expires' if t > now else 'expired'
            return '%s: %s' % (
                expire,
                util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))

    def update_channel_inventory(self, squirrel, constraint=None):
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                fn_temp = fn + '.%i.temp' % os.getpid()
                channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    squirrel.get_database().silent_touch(fn)

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

    def _do_channel_query(self, constraint):
        extra_args = {}

        if self.site in sites_not_supporting['startbefore']:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['starttime'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['endtime'] = constraint.tmax

        else:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['endafter'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['startbefore'] = constraint.tmax

        if self.site not in sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        if self.query_args is not None:
            extra_args.update(self.query_args)

        self._log_meta('querying...')

        try:
            channel_sx = fdsn.station(
                site=self.site,
                format='text',
                level='channel',
                **extra_args)

            self._hotfix('channel', channel_sx)

            return channel_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def _load_constraint(self):
        fn = self._get_constraint_path()
        if op.exists(fn):
            with open(fn, 'rb') as f:
                self._constraint = pickle.load(f)
        else:
            self._constraint = None

    def _dump_constraint(self):
        with open(self._get_constraint_path(), 'wb') as f:
            pickle.dump(self._constraint, f, protocol=2)

    def _get_expiration_time(self, path):
        if self.expires is None:
            return None

        try:
            t = os.stat(path)[8]  # st_mtime
            return t + self.expires

        except OSError:
            return 0.0

    def _get_channels_expiration_time(self):
        return self._get_expiration_time(self._get_channels_path())

    def update_waveform_promises(self, squirrel, constraint):
        from ..base import gaps
        now = time.time()
        cpath = os.path.abspath(self._get_channels_path())

        ctmin = constraint.tmin
        ctmax = constraint.tmax

        nuts = squirrel.iter_nuts(
            'channel',
            path=cpath,
            codes=constraint.codes,
            tmin=ctmin,
            tmax=ctmax)

        coverages = squirrel.get_coverage(
            'waveform',
            codes=constraint.codes if constraint.codes else None,
            tmin=ctmin,
            tmax=ctmax)

        codes_to_avail = defaultdict(list)
        for coverage in coverages:
            for tmin, tmax, _ in coverage.iter_spans():
                codes_to_avail[coverage.codes].append((tmin, tmax))

        def sgaps(nut):
            for tmin, tmax in gaps(
                    codes_to_avail[nut.codes],
                    max(ctmin, nut.tmin) if ctmin is not None else nut.tmin,
                    min(ctmax, nut.tmax) if ctmax is not None else nut.tmax):

                subnut = clone(nut)
                subnut.tmin = tmin
                subnut.tmax = tmax

                # ignore 1-sample gaps produced by rounding errors
                if subnut.tmax - subnut.tmin < 2*subnut.deltat:
                    continue

                yield subnut

        def wanted(nuts):
            for nut in nuts:
                if nut.tmin < now:
                    if nut.tmax > now:
                        nut.tmax = now

                    for nut in sgaps(nut):
                        yield nut

        path = self._source_id
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
            virtual_paths=[path])

    def remove_waveform_promises(self, squirrel, from_database='selection'):
        '''
        Remove waveform promises from live selection or global database.

        :param from_database:
            Remove from live selection ``'selection'`` or global database
            ``'global'``.
        '''
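
        # Usage sketch (illustrative; ``sq`` stands for a Squirrel instance
        # to which this source has been added):
        #
        #     source.remove_waveform_promises(sq, from_database='selection')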

        path = self._source_id
        if from_database == 'selection':
            squirrel.remove(path)
        elif from_database == 'global':
            squirrel.get_database().remove(path)
        else:
            raise ValueError(
                'Values allowed for from_database: ("selection", "global")')

    def _get_user_credentials(self):
        d = {}
        if self.user_credentials is not None:
            d['user'], d['passwd'] = self.user_credentials

        if self.auth_token is not None or self.auth_token_path is not None:
            d['token'] = self.get_auth_token()

        return d

    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):

        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20  # number of orders requested per dataselect batch
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        while i < len(orders):
            orders_now = orders[i:i+neach]
            selection_now = orders_to_selection(orders_now)

            nsuccess = 0
            elog.append_checkpoint()
            self._log_info_data(
                'downloading, %s' % order_summary(orders_now))

            all_paths = []
            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    data = fdsn.dataselect(
                        site=self.site, selection=selection_now,
                        **self._get_user_credentials())

                    now = time.time()

                    path = op.join(tmpdir, 'tmp.mseed')
                    with open(path, 'wb') as f:
                        while True:
                            buf = data.read(1024)
                            if not buf:
                                break
                            f.write(buf)

                    trs = io.load(path)

                    by_nslc = defaultdict(list)
                    for tr in trs:
                        by_nslc[tr.nslc_id].append(tr)

                    for order in orders_now:
                        trs_order = []
                        err_this = None
                        for tr in by_nslc[order.codes.nslc]:
                            try:
                                order.validate(tr)
                                trs_order.append(tr.chop(
                                    order.tmin, order.tmax, inplace=False))

                            except trace.NoData:
                                err_this = (
                                    'empty result', 'empty sub-interval')

                            except InvalidWaveform as e:
                                err_this = ('invalid waveform', str(e))

                        if len(trs_order) == 0:
                            if err_this is None:
                                err_this = ('empty result', '')

                            elog.append(now, order, *err_this)
                            error_permanent(order)
                        else:
                            def tsame(ta, tb):
                                return abs(tb - ta) < 2 * order.deltat

                            if len(trs_order) != 1 \
                                    or not tsame(
                                        trs_order[0].tmin, order.tmin) \
                                    or not tsame(
                                        trs_order[0].tmax, order.tmax):

                                if err_this:
                                    elog.append(
                                        now, order,
                                        'partial result, %s' % err_this[0],
                                        err_this[1])
                                else:
                                    elog.append(now, order, 'partial result')

                            paths = self._archive.add(order, trs_order)
                            all_paths.extend(paths)

                            nsuccess += 1
                            success(order)

                except fdsn.EmptyResult:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'empty result')
                        error_permanent(order)

                except util.HTTPError as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'http error', str(e))
                        error_temporary(order)

            emessage = elog.summarize_recent()

            self._log_info_data(
                '%i download%s %ssuccessful' % (
                    nsuccess,
                    util.plural_s(nsuccess),
                    '(partially) ' if emessage else '')
                + (', %s' % emessage if emessage else ''))

            if all_paths:
                batch_add(all_paths)

            i += neach
            task.update(i)

        for agg in elog.iter_aggregates():
            logger.warning(str(agg))

        task.done()

    def _do_response_query(self, selection):
        extra_args = {}

        if self.site not in sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        self._log_responses('querying...')

        try:
            response_sx = fdsn.station(
                site=self.site,
                level='response',
                selection=selection,
                **extra_args)

            self._hotfix('response', response_sx)
            return response_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def update_response_inventory(self, squirrel, constraint):
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts(
            'channel', path=cpath, codes=constraint.codes)

        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes.nslc
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if os.path.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        neach = 100
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)
                    response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            if k.find('failed') != -1:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(
                        CodesNSLCE(tup) for tup in status[k])),
                target=log_target)

        if fns:
            squirrel.add(fns, kinds=['response'])


__all__ = [
    'FDSNSource',
]