1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import time 

7import os 

8import copy 

9import logging 

10import tempfile 

11import importlib.util 

12from collections import defaultdict 

13try: 

14 import cPickle as pickle 

15except ImportError: 

16 import pickle 

17import os.path as op 

18from .base import Source, Constraint 

19from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \ 

20 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \ 

21 codes_to_str_abbreviated, CodesNSLCE 

22from ..database import ExecuteGet1Error 

23from pyrocko.client import fdsn 

24 

25from pyrocko import util, trace, io 

26from pyrocko.io.io_common import FileLoadError 

27from pyrocko.io import stationxml 

28from pyrocko.progress import progress 

29from pyrocko import has_paths 

30 

31from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \ 

32 Duration, Bool, clone 

33 

34guts_prefix = 'squirrel' 

35 

36fdsn.g_timeout = 60. 

37 

38logger = logging.getLogger('psq.client.fdsn') 

39 

40sites_not_supporting = { 

41 'startbefore': ['geonet'], 

42 'includerestricted': ['geonet']} 

43 

44 

45def make_task(*args): 

46 return progress.task(*args, logger=logger) 

47 

48 

49def diff(fn_a, fn_b): 

50 try: 

51 if os.stat(fn_a).st_size != os.stat(fn_b).st_size: 

52 return True 

53 

54 except OSError: 

55 return True 

56 

57 with open(fn_a, 'rb') as fa: 

58 with open(fn_b, 'rb') as fb: 

59 while True: 

60 a = fa.read(1024) 

61 b = fb.read(1024) 

62 if a != b: 

63 return True 

64 

65 if len(a) == 0 or len(b) == 0: 

66 return False 

67 

68 

69def move_or_keep(fn_temp, fn): 

70 if op.exists(fn): 

71 if diff(fn, fn_temp): 

72 os.rename(fn_temp, fn) 

73 status = 'updated' 

74 else: 

75 os.unlink(fn_temp) 

76 status = 'upstream unchanged' 

77 

78 else: 

79 os.rename(fn_temp, fn) 

80 status = 'new' 

81 

82 return status 

83 

84 

85class Archive(Object): 

86 

87 def add(self): 

88 raise NotImplementedError() 

89 

90 

91class MSeedArchive(Archive): 

92 template = String.T(default=op.join( 

93 '%(tmin_year)s', 

94 '%(tmin_month)s', 

95 '%(tmin_day)s', 

96 'trace_%(network)s_%(station)s_%(location)s_%(channel)s' 

97 + '_%(tmin_us)s_%(tmax_us)s.mseed')) 

98 

99 def __init__(self, **kwargs): 

100 Archive.__init__(self, **kwargs) 

101 self._base_path = None 

102 

103 def set_base_path(self, path): 

104 self._base_path = path 

105 

106 def add(self, trs): 

107 path = op.join(self._base_path, self.template) 

108 return io.save(trs, path, overwrite=True) 

109 

110 

111def combine_selections(selection): 

112 out = [] 

113 last = None 

114 for this in selection: 

115 if last and this[:4] == last[:4] and this[4] == last[5]: 

116 last = last[:5] + (this[5],) 

117 else: 

118 if last: 

119 out.append(last) 

120 

121 last = this 

122 

123 if last: 

124 out.append(last) 

125 

126 return out 

127 

128 

129def orders_sort_key(order): 

130 return (order.codes, order.tmin) 

131 

132 

133def orders_to_selection(orders): 

134 selection = [] 

135 for order in sorted(orders, key=orders_sort_key): 

136 selection.append( 

137 order.codes.nslc + (order.tmin, order.tmax)) 

138 

139 return combine_selections(selection) 

140 

141 

142class ErrorEntry(Object): 

143 time = Timestamp.T() 

144 order = WaveformOrder.T() 

145 kind = String.T() 

146 details = String.T(optional=True) 

147 

148 

149class ErrorAggregate(Object): 

150 site = String.T() 

151 kind = String.T() 

152 details = String.T() 

153 entries = List.T(ErrorEntry.T()) 

154 codes = List.T(CodesNSLCE.T()) 

155 time_spans = List.T(Tuple.T(2, Timestamp.T())) 

156 

157 def __str__(self): 

158 codes = [str(x) for x in self.codes] 

159 scodes = '\n' + util.ewrap(codes, indent=' ') if codes else '<none>' 

160 tss = self.time_spans 

161 sspans = '\n' + util.ewrap(('%s - %s' % ( 

162 util.time_to_str(ts[0]), util.time_to_str(ts[1])) for ts in tss), 

163 indent=' ') 

164 

165 return ('FDSN "%s": download error summary for "%s" (%i)\n%s ' 

166 'Codes:%s\n Time spans:%s') % ( 

167 self.site, 

168 self.kind, 

169 len(self.entries), 

170 ' Details: %s\n' % self.details if self.details else '', 

171 scodes, 

172 sspans) 

173 

174 

175class ErrorLog(Object): 

176 site = String.T() 

177 entries = List.T(ErrorEntry.T()) 

178 checkpoints = List.T(Int.T()) 

179 

180 def append_checkpoint(self): 

181 self.checkpoints.append(len(self.entries)) 

182 

183 def append(self, time, order, kind, details=''): 

184 entry = ErrorEntry(time=time, order=order, kind=kind, details=details) 

185 self.entries.append(entry) 

186 

187 def iter_aggregates(self): 

188 by_kind_details = defaultdict(list) 

189 for entry in self.entries: 

190 by_kind_details[entry.kind, entry.details].append(entry) 

191 

192 kind_details = sorted(by_kind_details.keys()) 

193 

194 for kind, details in kind_details: 

195 entries = by_kind_details[kind, details] 

196 codes = sorted(set(entry.order.codes for entry in entries)) 

197 selection = orders_to_selection(entry.order for entry in entries) 

198 time_spans = sorted(set(row[-2:] for row in selection)) 

199 yield ErrorAggregate( 

200 site=self.site, 

201 kind=kind, 

202 details=details, 

203 entries=entries, 

204 codes=codes, 

205 time_spans=time_spans) 

206 

207 def summarize_recent(self): 

208 ioff = self.checkpoints[-1] if self.checkpoints else 0 

209 recent = self.entries[ioff:] 

210 kinds = sorted(set(entry.kind for entry in recent)) 

211 if recent: 

212 return '%i error%s (%s)' % ( 

213 len(recent), util.plural_s(recent), '; '.join(kinds)) 

214 else: 

215 return '' 

216 

217 

218class FDSNSource(Source, has_paths.HasPaths): 

219 

220 ''' 

221 Squirrel data-source to transparently get data from FDSN web services. 

222 

223 Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel` allows 

224 the latter to download station and waveform data from an FDSN web service 

225 should the data not already happen to be available locally. 

226 ''' 

227 

228 site = String.T( 

229 help='FDSN site url or alias name (see ' 

230 ':py:mod:`pyrocko.client.fdsn`).') 

231 

232 query_args = Dict.T( 

233 String.T(), String.T(), 

234 optional=True, 

235 help='Common query arguments, which are appended to all queries.') 

236 

237 expires = Duration.T( 

238 optional=True, 

239 help='Expiration time [s]. Information older than this will be ' 

240 'refreshed. This only applies to station-metadata. Waveforms do ' 

241 'not expire. If set to ``None`` neither type of data expires.') 

242 

243 cache_path = String.T( 

244 optional=True, 

245 help='Directory path where any downloaded waveforms and station ' 

246 'meta-data are to be kept. By default the Squirrel ' 

247 'environment\'s cache directory is used.') 

248 

249 shared_waveforms = Bool.T( 

250 default=False, 

251 help='If ``True``, waveforms are shared with other FDSN sources in ' 

252 'the same Squirrel environment. If ``False``, they are kept ' 

253 'separate.') 

254 

255 user_credentials = Tuple.T( 

256 2, String.T(), 

257 optional=True, 

258 help='User and password for FDSN servers requiring password ' 

259 'authentication') 

260 

261 auth_token = String.T( 

262 optional=True, 

263 help='Authentication token to be presented to the FDSN server.') 

264 

265 auth_token_path = String.T( 

266 optional=True, 

267 help='Path to file containing the authentication token to be ' 

268 'presented to the FDSN server.') 

269 

270 hotfix_module_path = has_paths.Path.T( 

271 optional=True, 

272 help='Path to Python module to locally patch metadata errors.') 

273 

274 def __init__(self, site, query_args=None, **kwargs): 

275 Source.__init__(self, site=site, query_args=query_args, **kwargs) 

276 

277 self._constraint = None 

278 self._hash = self.make_hash() 

279 self._source_id = 'client:fdsn:%s' % self._hash 

280 self._error_infos = [] 

281 

282 def describe(self): 

283 return self._source_id 

284 

285 def make_hash(self): 

286 s = self.site 

287 s += 'notoken' \ 

288 if (self.auth_token is None and self.auth_token_path is None) \ 

289 else 'token' 

290 

291 if self.user_credentials is not None: 

292 s += self.user_credentials[0] 

293 else: 

294 s += 'nocred' 

295 

296 if self.query_args is not None: 

297 s += ','.join( 

298 '%s:%s' % (k, self.query_args[k]) 

299 for k in sorted(self.query_args.keys())) 

300 else: 

301 s += 'noqueryargs' 

302 

303 return ehash(s) 

304 

305 def get_hash(self): 

306 return self._hash 

307 

308 def get_auth_token(self): 

309 if self.auth_token: 

310 return self.auth_token 

311 

312 elif self.auth_token_path is not None: 

313 try: 

314 with open(self.auth_token_path, 'rb') as f: 

315 return f.read().decode('ascii') 

316 

317 except OSError as e: 

318 raise FileLoadError( 

319 'Cannot load auth token file (%s): %s' 

320 % (str(e), self.auth_token_path)) 

321 

322 else: 

323 raise Exception( 

324 'FDSNSource: auth_token and auth_token_path are mutually ' 

325 'exclusive.') 

326 

327 def setup(self, squirrel, check=True): 

328 self._cache_path = op.join( 

329 self.cache_path or squirrel._cache_path, 'fdsn') 

330 

331 util.ensuredir(self._cache_path) 

332 self._load_constraint() 

333 self._archive = MSeedArchive() 

334 waveforms_path = self._get_waveforms_path() 

335 util.ensuredir(waveforms_path) 

336 self._archive.set_base_path(waveforms_path) 

337 

338 squirrel.add( 

339 self._get_waveforms_path(), 

340 check=check) 

341 

342 fn = self._get_channels_path() 

343 if os.path.exists(fn): 

344 squirrel.add(fn) 

345 

346 squirrel.add_virtual( 

347 [], virtual_paths=[self._source_id]) 

348 

349 responses_path = self._get_responses_path() 

350 if os.path.exists(responses_path): 

351 squirrel.add(responses_path, kinds=['response']) 

352 

353 self._hotfix_module = None 

354 

355 def _hotfix(self, query_type, sx): 

356 if self.hotfix_module_path is None: 

357 return 

358 

359 if self._hotfix_module is None: 

360 module_path = self.expand_path(self.hotfix_module_path) 

361 spec = importlib.util.spec_from_file_location( 

362 'hotfix_' + self._hash, module_path) 

363 self._hotfix_module = importlib.util.module_from_spec(spec) 

364 spec.loader.exec_module(self._hotfix_module) 

365 

366 hook = getattr( 

367 self._hotfix_module, 'stationxml_' + query_type + '_hook') 

368 

369 return hook(sx) 

370 

371 def _get_constraint_path(self): 

372 return op.join(self._cache_path, self._hash, 'constraint.pickle') 

373 

374 def _get_channels_path(self): 

375 return op.join(self._cache_path, self._hash, 'channels.stationxml') 

376 

377 def _get_responses_path(self, nslc=None): 

378 dirpath = op.join( 

379 self._cache_path, self._hash, 'responses') 

380 

381 if nslc is None: 

382 return dirpath 

383 else: 

384 return op.join( 

385 dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc) 

386 

387 def _get_waveforms_path(self): 

388 if self.shared_waveforms: 

389 return op.join(self._cache_path, 'waveforms') 

390 else: 

391 return op.join(self._cache_path, self._hash, 'waveforms') 

392 

393 def _log_meta(self, message, target=logger.info): 

394 log_prefix = 'FDSN "%s" metadata:' % self.site 

395 target(' '.join((log_prefix, message))) 

396 

397 def _log_responses(self, message, target=logger.info): 

398 log_prefix = 'FDSN "%s" responses:' % self.site 

399 target(' '.join((log_prefix, message))) 

400 

401 def _log_info_data(self, *args): 

402 log_prefix = 'FDSN "%s" waveforms:' % self.site 

403 logger.info(' '.join((log_prefix,) + args)) 

404 

405 def _str_expires(self, t, now): 

406 if t is None: 

407 return 'expires: never' 

408 else: 

409 expire = 'expires' if t > now else 'expired' 

410 return '%s: %s' % ( 

411 expire, 

412 util.time_to_str(t, format='%Y-%m-%d %H:%M:%S')) 

413 

414 def update_channel_inventory(self, squirrel, constraint=None): 

415 if constraint is None: 

416 constraint = Constraint() 

417 

418 expiration_time = self._get_channels_expiration_time() 

419 now = time.time() 

420 

421 log_target = logger.info 

422 if self._constraint and self._constraint.contains(constraint) \ 

423 and (expiration_time is None or now < expiration_time): 

424 

425 status = 'using cached' 

426 

427 else: 

428 if self._constraint: 

429 constraint_temp = copy.deepcopy(self._constraint) 

430 constraint_temp.expand(constraint) 

431 constraint = constraint_temp 

432 

433 try: 

434 channel_sx = self._do_channel_query(constraint) 

435 

436 channel_sx.created = None # timestamp would ruin diff 

437 

438 fn = self._get_channels_path() 

439 util.ensuredirs(fn) 

440 fn_temp = fn + '.%i.temp' % os.getpid() 

441 channel_sx.dump_xml(filename=fn_temp) 

442 

443 status = move_or_keep(fn_temp, fn) 

444 

445 if status == 'upstream unchanged': 

446 squirrel.get_database().silent_touch(fn) 

447 

448 self._constraint = constraint 

449 self._dump_constraint() 

450 

451 except OSError as e: 

452 status = 'update failed (%s)' % str(e) 

453 log_target = logger.error 

454 

455 expiration_time = self._get_channels_expiration_time() 

456 self._log_meta( 

457 '%s (%s)' % (status, self._str_expires(expiration_time, now)), 

458 target=log_target) 

459 

460 fn = self._get_channels_path() 

461 if os.path.exists(fn): 

462 squirrel.add(fn) 

463 

464 def _do_channel_query(self, constraint): 

465 extra_args = {} 

466 

467 if self.site in sites_not_supporting['startbefore']: 

468 if constraint.tmin is not None and constraint.tmin != g_tmin: 

469 extra_args['starttime'] = constraint.tmin 

470 if constraint.tmax is not None and constraint.tmax != g_tmax: 

471 extra_args['endtime'] = constraint.tmax 

472 

473 else: 

474 if constraint.tmin is not None and constraint.tmin != g_tmin: 

475 extra_args['endafter'] = constraint.tmin 

476 if constraint.tmax is not None and constraint.tmax != g_tmax: 

477 extra_args['startbefore'] = constraint.tmax 

478 

479 if self.site not in sites_not_supporting['includerestricted']: 

480 extra_args.update( 

481 includerestricted=( 

482 self.user_credentials is not None 

483 or self.auth_token is not None 

484 or self.auth_token_path is not None)) 

485 

486 if self.query_args is not None: 

487 extra_args.update(self.query_args) 

488 

489 self._log_meta('querying...') 

490 

491 try: 

492 channel_sx = fdsn.station( 

493 site=self.site, 

494 format='text', 

495 level='channel', 

496 **extra_args) 

497 

498 self._hotfix('channel', channel_sx) 

499 

500 return channel_sx 

501 

502 except fdsn.EmptyResult: 

503 return stationxml.FDSNStationXML(source='dummy-empty-result') 

504 

505 def _load_constraint(self): 

506 fn = self._get_constraint_path() 

507 if op.exists(fn): 

508 with open(fn, 'rb') as f: 

509 self._constraint = pickle.load(f) 

510 else: 

511 self._constraint = None 

512 

513 def _dump_constraint(self): 

514 with open(self._get_constraint_path(), 'wb') as f: 

515 pickle.dump(self._constraint, f, protocol=2) 

516 

517 def _get_expiration_time(self, path): 

518 if self.expires is None: 

519 return None 

520 

521 try: 

522 t = os.stat(path)[8] 

523 return t + self.expires 

524 

525 except OSError: 

526 return 0.0 

527 

528 def _get_channels_expiration_time(self): 

529 return self._get_expiration_time(self._get_channels_path()) 

530 

531 def update_waveform_promises(self, squirrel, constraint): 

532 from ..base import gaps 

533 now = time.time() 

534 cpath = os.path.abspath(self._get_channels_path()) 

535 

536 ctmin = constraint.tmin 

537 ctmax = constraint.tmax 

538 

539 nuts = squirrel.iter_nuts( 

540 'channel', 

541 path=cpath, 

542 codes=constraint.codes, 

543 tmin=ctmin, 

544 tmax=ctmax) 

545 

546 coverages = squirrel.get_coverage( 

547 'waveform', 

548 codes=constraint.codes if constraint.codes else None, 

549 tmin=ctmin, 

550 tmax=ctmax) 

551 

552 codes_to_avail = defaultdict(list) 

553 for coverage in coverages: 

554 for tmin, tmax, _ in coverage.iter_spans(): 

555 codes_to_avail[coverage.codes].append((tmin, tmax)) 

556 

557 def sgaps(nut): 

558 for tmin, tmax in gaps( 

559 codes_to_avail[nut.codes], 

560 max(ctmin, nut.tmin) if ctmin is not None else nut.tmin, 

561 min(ctmax, nut.tmax) if ctmax is not None else nut.tmax): 

562 

563 subnut = clone(nut) 

564 subnut.tmin = tmin 

565 subnut.tmax = tmax 

566 yield subnut 

567 

568 def wanted(nuts): 

569 for nut in nuts: 

570 if nut.tmin < now: 

571 if nut.tmax > now: 

572 nut.tmax = now 

573 

574 for nut in sgaps(nut): 

575 yield nut 

576 

577 path = self._source_id 

578 squirrel.add_virtual( 

579 (make_waveform_promise_nut( 

580 file_path=path, 

581 **nut.waveform_promise_kwargs) for nut in wanted(nuts)), 

582 virtual_paths=[path]) 

583 

584 def remove_waveform_promises(self, squirrel, from_database='selection'): 

585 ''' 

586 Remove waveform promises from live selection or global database. 

587 

588 :param from_database: 

589 Remove from live selection ``'selection'`` or global database 

590 ``'global'``. 

591 ''' 

592 

593 path = self._source_id 

594 if from_database == 'selection': 

595 squirrel.remove(path) 

596 elif from_database == 'global': 

597 squirrel.get_database().remove(path) 

598 else: 

599 raise ValueError( 

600 'Values allowed for from_database: ("selection", "global")') 

601 

602 def _get_user_credentials(self): 

603 d = {} 

604 if self.user_credentials is not None: 

605 d['user'], d['passwd'] = self.user_credentials 

606 

607 if self.auth_token is not None or self.auth_token_path is not None: 

608 d['token'] = self.get_auth_token() 

609 

610 return d 

611 

612 def download_waveforms( 

613 self, orders, success, batch_add, error_permanent, 

614 error_temporary): 

615 

616 elog = ErrorLog(site=self.site) 

617 orders.sort(key=orders_sort_key) 

618 neach = 20 

619 i = 0 

620 task = make_task( 

621 'FDSN "%s" waveforms: downloading' % self.site, len(orders)) 

622 

623 while i < len(orders): 

624 orders_now = orders[i:i+neach] 

625 selection_now = orders_to_selection(orders_now) 

626 

627 nsuccess = 0 

628 elog.append_checkpoint() 

629 self._log_info_data( 

630 'downloading, %s' % order_summary(orders_now)) 

631 

632 all_paths = [] 

633 with tempfile.TemporaryDirectory() as tmpdir: 

634 try: 

635 data = fdsn.dataselect( 

636 site=self.site, selection=selection_now, 

637 **self._get_user_credentials()) 

638 

639 now = time.time() 

640 

641 path = op.join(tmpdir, 'tmp.mseed') 

642 with open(path, 'wb') as f: 

643 while True: 

644 buf = data.read(1024) 

645 if not buf: 

646 break 

647 f.write(buf) 

648 

649 trs = io.load(path) 

650 

651 by_nslc = defaultdict(list) 

652 for tr in trs: 

653 by_nslc[tr.nslc_id].append(tr) 

654 

655 for order in orders_now: 

656 trs_order = [] 

657 err_this = None 

658 for tr in by_nslc[order.codes.nslc]: 

659 try: 

660 order.validate(tr) 

661 trs_order.append(tr.chop( 

662 order.tmin, order.tmax, inplace=False)) 

663 

664 except trace.NoData: 

665 err_this = ( 

666 'empty result', 'empty sub-interval') 

667 

668 except InvalidWaveform as e: 

669 err_this = ('invalid waveform', str(e)) 

670 

671 if len(trs_order) == 0: 

672 if err_this is None: 

673 err_this = ('empty result', '') 

674 

675 elog.append(now, order, *err_this) 

676 error_permanent(order) 

677 else: 

678 if len(trs_order) != 1: 

679 if err_this: 

680 elog.append( 

681 now, order, 

682 'partial result, %s' % err_this[0], 

683 err_this[1]) 

684 else: 

685 elog.append(now, order, 'partial result') 

686 

687 paths = self._archive.add(trs_order) 

688 all_paths.extend(paths) 

689 

690 nsuccess += 1 

691 success(order) 

692 

693 except fdsn.EmptyResult: 

694 now = time.time() 

695 for order in orders_now: 

696 elog.append(now, order, 'empty result') 

697 error_permanent(order) 

698 

699 except util.HTTPError as e: 

700 now = time.time() 

701 for order in orders_now: 

702 elog.append(now, order, 'http error', str(e)) 

703 error_temporary(order) 

704 

705 emessage = elog.summarize_recent() 

706 self._log_info_data( 

707 '%i download%s successful' % ( 

708 nsuccess, util.plural_s(nsuccess)) 

709 + (', %s' % emessage if emessage else '')) 

710 

711 if all_paths: 

712 batch_add(all_paths) 

713 

714 i += neach 

715 task.update(i) 

716 

717 for agg in elog.iter_aggregates(): 

718 logger.warning(str(agg)) 

719 

720 task.done() 

721 

722 def _do_response_query(self, selection): 

723 extra_args = {} 

724 

725 if self.site not in sites_not_supporting['includerestricted']: 

726 extra_args.update( 

727 includerestricted=( 

728 self.user_credentials is not None 

729 or self.auth_token is not None 

730 or self.auth_token_path is not None)) 

731 

732 self._log_responses('querying...') 

733 

734 try: 

735 response_sx = fdsn.station( 

736 site=self.site, 

737 level='response', 

738 selection=selection, 

739 **extra_args) 

740 

741 self._hotfix('response', response_sx) 

742 return response_sx 

743 

744 except fdsn.EmptyResult: 

745 return stationxml.FDSNStationXML(source='dummy-empty-result') 

746 

747 def update_response_inventory(self, squirrel, constraint): 

748 cpath = os.path.abspath(self._get_channels_path()) 

749 nuts = squirrel.iter_nuts( 

750 'channel', path=cpath, codes=constraint.codes) 

751 

752 tmin = g_tmin_queries 

753 tmax = g_tmax 

754 

755 selection = [] 

756 now = time.time() 

757 have = set() 

758 status = defaultdict(list) 

759 for nut in nuts: 

760 nslc = nut.codes.nslc 

761 if nslc in have: 

762 continue 

763 have.add(nslc) 

764 

765 fn = self._get_responses_path(nslc) 

766 expiration_time = self._get_expiration_time(fn) 

767 if os.path.exists(fn) \ 

768 and (expiration_time is None or now < expiration_time): 

769 status['using cached'].append(nslc) 

770 else: 

771 selection.append(nslc + (tmin, tmax)) 

772 

773 dummy = stationxml.FDSNStationXML(source='dummy-empty') 

774 neach = 100 

775 i = 0 

776 fns = [] 

777 while i < len(selection): 

778 selection_now = selection[i:i+neach] 

779 i += neach 

780 

781 try: 

782 sx = self._do_response_query(selection_now) 

783 except Exception as e: 

784 status['update failed (%s)' % str(e)].extend( 

785 entry[:4] for entry in selection_now) 

786 continue 

787 

788 sx.created = None # timestamp would ruin diff 

789 

790 by_nslc = dict(stationxml.split_channels(sx)) 

791 

792 for entry in selection_now: 

793 nslc = entry[:4] 

794 response_sx = by_nslc.get(nslc, dummy) 

795 try: 

796 fn = self._get_responses_path(nslc) 

797 fn_temp = fn + '.%i.temp' % os.getpid() 

798 

799 util.ensuredirs(fn_temp) 

800 response_sx.dump_xml(filename=fn_temp) 

801 

802 status_this = move_or_keep(fn_temp, fn) 

803 

804 if status_this == 'upstream unchanged': 

805 try: 

806 squirrel.get_database().silent_touch(fn) 

807 except ExecuteGet1Error: 

808 pass 

809 

810 status[status_this].append(nslc) 

811 fns.append(fn) 

812 

813 except OSError as e: 

814 status['update failed (%s)' % str(e)].append(nslc) 

815 

816 for k in sorted(status): 

817 if k.find('failed') != -1: 

818 log_target = logger.error 

819 else: 

820 log_target = logger.info 

821 

822 self._log_responses( 

823 '%s: %s' % ( 

824 k, codes_to_str_abbreviated( 

825 CodesNSLCE(tup) for tup in status[k])), 

826 target=log_target) 

827 

828 if fns: 

829 squirrel.add(fns, kinds=['response']) 

830 

831 

832__all__ = [ 

833 'FDSNSource', 

834]