1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6from __future__ import absolute_import, print_function 

7 

8import time 

9import os 

10import copy 

11import logging 

12import tempfile 

13import importlib.util 

14from collections import defaultdict 

15try: 

16 import cPickle as pickle 

17except ImportError: 

18 import pickle 

19import os.path as op 

20from .base import Source, Constraint 

21from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \ 

22 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \ 

23 codes_to_str_abbreviated, CodesNSLCE 

24from ..database import ExecuteGet1Error 

25from pyrocko.client import fdsn 

26 

27from pyrocko import util, trace, io 

28from pyrocko.io.io_common import FileLoadError 

29from pyrocko.io import stationxml 

30from pyrocko.progress import progress 

31from pyrocko import has_paths 

32 

33from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \ 

34 Duration, Bool, clone 

35 

36guts_prefix = 'squirrel' 

37 

38fdsn.g_timeout = 60. 

39 

40logger = logging.getLogger('psq.client.fdsn') 

41 

42sites_not_supporting = { 

43 'startbefore': ['geonet'], 

44 'includerestricted': ['geonet']} 

45 

46 

47def make_task(*args): 

48 return progress.task(*args, logger=logger) 

49 

50 

51def diff(fn_a, fn_b): 

52 try: 

53 if os.stat(fn_a).st_size != os.stat(fn_b).st_size: 

54 return True 

55 

56 except OSError: 

57 return True 

58 

59 with open(fn_a, 'rb') as fa: 

60 with open(fn_b, 'rb') as fb: 

61 while True: 

62 a = fa.read(1024) 

63 b = fb.read(1024) 

64 if a != b: 

65 return True 

66 

67 if len(a) == 0 or len(b) == 0: 

68 return False 

69 

70 

71def move_or_keep(fn_temp, fn): 

72 if op.exists(fn): 

73 if diff(fn, fn_temp): 

74 os.rename(fn_temp, fn) 

75 status = 'updated' 

76 else: 

77 os.unlink(fn_temp) 

78 status = 'upstream unchanged' 

79 

80 else: 

81 os.rename(fn_temp, fn) 

82 status = 'new' 

83 

84 return status 

85 

86 

87class Archive(Object): 

88 

89 def add(self): 

90 raise NotImplementedError() 

91 

92 

93class MSeedArchive(Archive): 

94 template = String.T(default=op.join( 

95 '%(tmin_year)s', 

96 '%(tmin_month)s', 

97 '%(tmin_day)s', 

98 'trace_%(network)s_%(station)s_%(location)s_%(channel)s' 

99 + '_%(tmin_us)s_%(tmax_us)s.mseed')) 

100 

101 def __init__(self, **kwargs): 

102 Archive.__init__(self, **kwargs) 

103 self._base_path = None 

104 

105 def set_base_path(self, path): 

106 self._base_path = path 

107 

108 def add(self, trs): 

109 path = op.join(self._base_path, self.template) 

110 return io.save(trs, path, overwrite=True) 

111 

112 

113def combine_selections(selection): 

114 out = [] 

115 last = None 

116 for this in selection: 

117 if last and this[:4] == last[:4] and this[4] == last[5]: 

118 last = last[:5] + (this[5],) 

119 else: 

120 if last: 

121 out.append(last) 

122 

123 last = this 

124 

125 if last: 

126 out.append(last) 

127 

128 return out 

129 

130 

131def orders_sort_key(order): 

132 return (order.codes, order.tmin) 

133 

134 

135def orders_to_selection(orders): 

136 selection = [] 

137 for order in sorted(orders, key=orders_sort_key): 

138 selection.append( 

139 order.codes.nslc + (order.tmin, order.tmax)) 

140 

141 return combine_selections(selection) 

142 

143 

144class ErrorEntry(Object): 

145 time = Timestamp.T() 

146 order = WaveformOrder.T() 

147 kind = String.T() 

148 details = String.T(optional=True) 

149 

150 

151class ErrorAggregate(Object): 

152 site = String.T() 

153 kind = String.T() 

154 details = String.T() 

155 entries = List.T(ErrorEntry.T()) 

156 codes = List.T(CodesNSLCE.T()) 

157 time_spans = List.T(Tuple.T(2, Timestamp.T())) 

158 

159 def __str__(self): 

160 codes = [str(x) for x in self.codes] 

161 scodes = '\n' + util.ewrap(codes, indent=' ') if codes else '<none>' 

162 tss = self.time_spans 

163 sspans = '\n' + util.ewrap(('%s - %s' % ( 

164 util.time_to_str(ts[0]), util.time_to_str(ts[1])) for ts in tss), 

165 indent=' ') 

166 

167 return ('FDSN "%s": download error summary for "%s" (%i)\n%s ' 

168 'Codes:%s\n Time spans:%s') % ( 

169 self.site, 

170 self.kind, 

171 len(self.entries), 

172 ' Details: %s\n' % self.details if self.details else '', 

173 scodes, 

174 sspans) 

175 

176 

177class ErrorLog(Object): 

178 site = String.T() 

179 entries = List.T(ErrorEntry.T()) 

180 checkpoints = List.T(Int.T()) 

181 

182 def append_checkpoint(self): 

183 self.checkpoints.append(len(self.entries)) 

184 

185 def append(self, time, order, kind, details=''): 

186 entry = ErrorEntry(time=time, order=order, kind=kind, details=details) 

187 self.entries.append(entry) 

188 

189 def iter_aggregates(self): 

190 by_kind_details = defaultdict(list) 

191 for entry in self.entries: 

192 by_kind_details[entry.kind, entry.details].append(entry) 

193 

194 kind_details = sorted(by_kind_details.keys()) 

195 

196 for kind, details in kind_details: 

197 entries = by_kind_details[kind, details] 

198 codes = sorted(set(entry.order.codes for entry in entries)) 

199 selection = orders_to_selection(entry.order for entry in entries) 

200 time_spans = sorted(set(row[-2:] for row in selection)) 

201 yield ErrorAggregate( 

202 site=self.site, 

203 kind=kind, 

204 details=details, 

205 entries=entries, 

206 codes=codes, 

207 time_spans=time_spans) 

208 

209 def summarize_recent(self): 

210 ioff = self.checkpoints[-1] if self.checkpoints else 0 

211 recent = self.entries[ioff:] 

212 kinds = sorted(set(entry.kind for entry in recent)) 

213 if recent: 

214 return '%i error%s (%s)' % ( 

215 len(recent), util.plural_s(recent), '; '.join(kinds)) 

216 else: 

217 return '' 

218 

219 

220class FDSNSource(Source, has_paths.HasPaths): 

221 

222 ''' 

223 Squirrel data-source to transparently get data from FDSN web services. 

224 

225 Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel` allows 

226 the latter to download station and waveform data from an FDSN web service 

227 should the data not already happen to be available locally. 

228 ''' 

229 

230 site = String.T( 

231 help='FDSN site url or alias name (see ' 

232 ':py:mod:`pyrocko.client.fdsn`).') 

233 

234 query_args = Dict.T( 

235 String.T(), String.T(), 

236 optional=True, 

237 help='Common query arguments, which are appended to all queries.') 

238 

239 expires = Duration.T( 

240 optional=True, 

241 help='Expiration time [s]. Information older than this will be ' 

242 'refreshed. This only applies to station-metadata. Waveforms do ' 

243 'not expire. If set to ``None`` neither type of data expires.') 

244 

245 cache_path = String.T( 

246 optional=True, 

247 help='Directory path where any downloaded waveforms and station ' 

248 'meta-data are to be kept. By default the Squirrel ' 

249 'environment\'s cache directory is used.') 

250 

251 shared_waveforms = Bool.T( 

252 default=False, 

253 help='If ``True``, waveforms are shared with other FDSN sources in ' 

254 'the same Squirrel environment. If ``False``, they are kept ' 

255 'separate.') 

256 

257 user_credentials = Tuple.T( 

258 2, String.T(), 

259 optional=True, 

260 help='User and password for FDSN servers requiring password ' 

261 'authentication') 

262 

263 auth_token = String.T( 

264 optional=True, 

265 help='Authentication token to be presented to the FDSN server.') 

266 

267 auth_token_path = String.T( 

268 optional=True, 

269 help='Path to file containing the authentication token to be ' 

270 'presented to the FDSN server.') 

271 

272 hotfix_module_path = has_paths.Path.T( 

273 optional=True, 

274 help='Path to Python module to locally patch metadata errors.') 

275 

276 def __init__(self, site, query_args=None, **kwargs): 

277 Source.__init__(self, site=site, query_args=query_args, **kwargs) 

278 

279 self._constraint = None 

280 self._hash = self.make_hash() 

281 self._source_id = 'client:fdsn:%s' % self._hash 

282 self._error_infos = [] 

283 

284 def describe(self): 

285 return self._source_id 

286 

287 def make_hash(self): 

288 s = self.site 

289 s += 'notoken' \ 

290 if (self.auth_token is None and self.auth_token_path is None) \ 

291 else 'token' 

292 

293 if self.user_credentials is not None: 

294 s += self.user_credentials[0] 

295 else: 

296 s += 'nocred' 

297 

298 if self.query_args is not None: 

299 s += ','.join( 

300 '%s:%s' % (k, self.query_args[k]) 

301 for k in sorted(self.query_args.keys())) 

302 else: 

303 s += 'noqueryargs' 

304 

305 return ehash(s) 

306 

307 def get_hash(self): 

308 return self._hash 

309 

310 def get_auth_token(self): 

311 if self.auth_token: 

312 return self.auth_token 

313 

314 elif self.auth_token_path is not None: 

315 try: 

316 with open(self.auth_token_path, 'rb') as f: 

317 return f.read().decode('ascii') 

318 

319 except OSError as e: 

320 raise FileLoadError( 

321 'Cannot load auth token file (%s): %s' 

322 % (str(e), self.auth_token_path)) 

323 

324 else: 

325 raise Exception( 

326 'FDSNSource: auth_token and auth_token_path are mutually ' 

327 'exclusive.') 

328 

329 def setup(self, squirrel, check=True): 

330 self._cache_path = op.join( 

331 self.cache_path or squirrel._cache_path, 'fdsn') 

332 

333 util.ensuredir(self._cache_path) 

334 self._load_constraint() 

335 self._archive = MSeedArchive() 

336 waveforms_path = self._get_waveforms_path() 

337 util.ensuredir(waveforms_path) 

338 self._archive.set_base_path(waveforms_path) 

339 

340 squirrel.add( 

341 self._get_waveforms_path(), 

342 check=check) 

343 

344 fn = self._get_channels_path() 

345 if os.path.exists(fn): 

346 squirrel.add(fn) 

347 

348 squirrel.add_virtual( 

349 [], virtual_paths=[self._source_id]) 

350 

351 responses_path = self._get_responses_path() 

352 if os.path.exists(responses_path): 

353 squirrel.add(responses_path, kinds=['response']) 

354 

355 self._hotfix_module = None 

356 

357 def _hotfix(self, query_type, sx): 

358 if self.hotfix_module_path is None: 

359 return 

360 

361 if self._hotfix_module is None: 

362 module_path = self.expand_path(self.hotfix_module_path) 

363 spec = importlib.util.spec_from_file_location( 

364 'hotfix_' + self._hash, module_path) 

365 self._hotfix_module = importlib.util.module_from_spec(spec) 

366 spec.loader.exec_module(self._hotfix_module) 

367 

368 hook = getattr( 

369 self._hotfix_module, 'stationxml_' + query_type + '_hook') 

370 

371 return hook(sx) 

372 

373 def _get_constraint_path(self): 

374 return op.join(self._cache_path, self._hash, 'constraint.pickle') 

375 

376 def _get_channels_path(self): 

377 return op.join(self._cache_path, self._hash, 'channels.stationxml') 

378 

379 def _get_responses_path(self, nslc=None): 

380 dirpath = op.join( 

381 self._cache_path, self._hash, 'responses') 

382 

383 if nslc is None: 

384 return dirpath 

385 else: 

386 return op.join( 

387 dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc) 

388 

389 def _get_waveforms_path(self): 

390 if self.shared_waveforms: 

391 return op.join(self._cache_path, 'waveforms') 

392 else: 

393 return op.join(self._cache_path, self._hash, 'waveforms') 

394 

395 def _log_meta(self, message, target=logger.info): 

396 log_prefix = 'FDSN "%s" metadata:' % self.site 

397 target(' '.join((log_prefix, message))) 

398 

399 def _log_responses(self, message, target=logger.info): 

400 log_prefix = 'FDSN "%s" responses:' % self.site 

401 target(' '.join((log_prefix, message))) 

402 

403 def _log_info_data(self, *args): 

404 log_prefix = 'FDSN "%s" waveforms:' % self.site 

405 logger.info(' '.join((log_prefix,) + args)) 

406 

407 def _str_expires(self, t, now): 

408 if t is None: 

409 return 'expires: never' 

410 else: 

411 expire = 'expires' if t > now else 'expired' 

412 return '%s: %s' % ( 

413 expire, 

414 util.time_to_str(t, format='%Y-%m-%d %H:%M:%S')) 

415 

416 def update_channel_inventory(self, squirrel, constraint=None): 

417 if constraint is None: 

418 constraint = Constraint() 

419 

420 expiration_time = self._get_channels_expiration_time() 

421 now = time.time() 

422 

423 log_target = logger.info 

424 if self._constraint and self._constraint.contains(constraint) \ 

425 and (expiration_time is None or now < expiration_time): 

426 

427 status = 'using cached' 

428 

429 else: 

430 if self._constraint: 

431 constraint_temp = copy.deepcopy(self._constraint) 

432 constraint_temp.expand(constraint) 

433 constraint = constraint_temp 

434 

435 try: 

436 channel_sx = self._do_channel_query(constraint) 

437 

438 channel_sx.created = None # timestamp would ruin diff 

439 

440 fn = self._get_channels_path() 

441 util.ensuredirs(fn) 

442 fn_temp = fn + '.%i.temp' % os.getpid() 

443 channel_sx.dump_xml(filename=fn_temp) 

444 

445 status = move_or_keep(fn_temp, fn) 

446 

447 if status == 'upstream unchanged': 

448 squirrel.get_database().silent_touch(fn) 

449 

450 self._constraint = constraint 

451 self._dump_constraint() 

452 

453 except OSError as e: 

454 status = 'update failed (%s)' % str(e) 

455 log_target = logger.error 

456 

457 expiration_time = self._get_channels_expiration_time() 

458 self._log_meta( 

459 '%s (%s)' % (status, self._str_expires(expiration_time, now)), 

460 target=log_target) 

461 

462 fn = self._get_channels_path() 

463 if os.path.exists(fn): 

464 squirrel.add(fn) 

465 

466 def _do_channel_query(self, constraint): 

467 extra_args = {} 

468 

469 if self.site in sites_not_supporting['startbefore']: 

470 if constraint.tmin is not None and constraint.tmin != g_tmin: 

471 extra_args['starttime'] = constraint.tmin 

472 if constraint.tmax is not None and constraint.tmax != g_tmax: 

473 extra_args['endtime'] = constraint.tmax 

474 

475 else: 

476 if constraint.tmin is not None and constraint.tmin != g_tmin: 

477 extra_args['endafter'] = constraint.tmin 

478 if constraint.tmax is not None and constraint.tmax != g_tmax: 

479 extra_args['startbefore'] = constraint.tmax 

480 

481 if self.site not in sites_not_supporting['includerestricted']: 

482 extra_args.update( 

483 includerestricted=( 

484 self.user_credentials is not None 

485 or self.auth_token is not None 

486 or self.auth_token_path is not None)) 

487 

488 if self.query_args is not None: 

489 extra_args.update(self.query_args) 

490 

491 self._log_meta('querying...') 

492 

493 try: 

494 channel_sx = fdsn.station( 

495 site=self.site, 

496 format='text', 

497 level='channel', 

498 **extra_args) 

499 

500 self._hotfix('channel', channel_sx) 

501 

502 return channel_sx 

503 

504 except fdsn.EmptyResult: 

505 return stationxml.FDSNStationXML(source='dummy-empty-result') 

506 

507 def _load_constraint(self): 

508 fn = self._get_constraint_path() 

509 if op.exists(fn): 

510 with open(fn, 'rb') as f: 

511 self._constraint = pickle.load(f) 

512 else: 

513 self._constraint = None 

514 

515 def _dump_constraint(self): 

516 with open(self._get_constraint_path(), 'wb') as f: 

517 pickle.dump(self._constraint, f, protocol=2) 

518 

519 def _get_expiration_time(self, path): 

520 if self.expires is None: 

521 return None 

522 

523 try: 

524 t = os.stat(path)[8] 

525 return t + self.expires 

526 

527 except OSError: 

528 return 0.0 

529 

530 def _get_channels_expiration_time(self): 

531 return self._get_expiration_time(self._get_channels_path()) 

532 

533 def update_waveform_promises(self, squirrel, constraint): 

534 from ..base import gaps 

535 now = time.time() 

536 cpath = os.path.abspath(self._get_channels_path()) 

537 

538 ctmin = constraint.tmin 

539 ctmax = constraint.tmax 

540 

541 nuts = squirrel.iter_nuts( 

542 'channel', 

543 path=cpath, 

544 codes=constraint.codes, 

545 tmin=ctmin, 

546 tmax=ctmax) 

547 

548 coverages = squirrel.get_coverage( 

549 'waveform', 

550 codes=constraint.codes if constraint.codes else None, 

551 tmin=ctmin, 

552 tmax=ctmax) 

553 

554 codes_to_avail = defaultdict(list) 

555 for coverage in coverages: 

556 for tmin, tmax, _ in coverage.iter_spans(): 

557 codes_to_avail[coverage.codes].append((tmin, tmax)) 

558 

559 def sgaps(nut): 

560 for tmin, tmax in gaps( 

561 codes_to_avail[nut.codes], 

562 max(ctmin, nut.tmin) if ctmin is not None else nut.tmin, 

563 min(ctmax, nut.tmax) if ctmax is not None else nut.tmax): 

564 

565 subnut = clone(nut) 

566 subnut.tmin = tmin 

567 subnut.tmax = tmax 

568 yield subnut 

569 

570 def wanted(nuts): 

571 for nut in nuts: 

572 if nut.tmin < now: 

573 if nut.tmax > now: 

574 nut.tmax = now 

575 

576 for nut in sgaps(nut): 

577 yield nut 

578 

579 path = self._source_id 

580 squirrel.add_virtual( 

581 (make_waveform_promise_nut( 

582 file_path=path, 

583 **nut.waveform_promise_kwargs) for nut in wanted(nuts)), 

584 virtual_paths=[path]) 

585 

586 def remove_waveform_promises(self, squirrel, from_database='selection'): 

587 ''' 

588 Remove waveform promises from live selection or global database. 

589 

590 :param from_database: 

591 Remove from live selection ``'selection'`` or global database 

592 ``'global'``. 

593 ''' 

594 

595 path = self._source_id 

596 if from_database == 'selection': 

597 squirrel.remove(path) 

598 elif from_database == 'global': 

599 squirrel.get_database().remove(path) 

600 else: 

601 raise ValueError( 

602 'Values allowed for from_database: ("selection", "global")') 

603 

604 def _get_user_credentials(self): 

605 d = {} 

606 if self.user_credentials is not None: 

607 d['user'], d['passwd'] = self.user_credentials 

608 

609 if self.auth_token is not None or self.auth_token_path is not None: 

610 d['token'] = self.get_auth_token() 

611 

612 return d 

613 

614 def download_waveforms( 

615 self, orders, success, batch_add, error_permanent, 

616 error_temporary): 

617 

618 elog = ErrorLog(site=self.site) 

619 orders.sort(key=orders_sort_key) 

620 neach = 20 

621 i = 0 

622 task = make_task( 

623 'FDSN "%s" waveforms: downloading' % self.site, len(orders)) 

624 

625 while i < len(orders): 

626 orders_now = orders[i:i+neach] 

627 selection_now = orders_to_selection(orders_now) 

628 

629 nsuccess = 0 

630 elog.append_checkpoint() 

631 self._log_info_data( 

632 'downloading, %s' % order_summary(orders_now)) 

633 

634 all_paths = [] 

635 with tempfile.TemporaryDirectory() as tmpdir: 

636 try: 

637 data = fdsn.dataselect( 

638 site=self.site, selection=selection_now, 

639 **self._get_user_credentials()) 

640 

641 now = time.time() 

642 

643 path = op.join(tmpdir, 'tmp.mseed') 

644 with open(path, 'wb') as f: 

645 while True: 

646 buf = data.read(1024) 

647 if not buf: 

648 break 

649 f.write(buf) 

650 

651 trs = io.load(path) 

652 

653 by_nslc = defaultdict(list) 

654 for tr in trs: 

655 by_nslc[tr.nslc_id].append(tr) 

656 

657 for order in orders_now: 

658 trs_order = [] 

659 err_this = None 

660 for tr in by_nslc[order.codes.nslc]: 

661 try: 

662 order.validate(tr) 

663 trs_order.append(tr.chop( 

664 order.tmin, order.tmax, inplace=False)) 

665 

666 except trace.NoData: 

667 err_this = ( 

668 'empty result', 'empty sub-interval') 

669 

670 except InvalidWaveform as e: 

671 err_this = ('invalid waveform', str(e)) 

672 

673 if len(trs_order) == 0: 

674 if err_this is None: 

675 err_this = ('empty result', '') 

676 

677 elog.append(now, order, *err_this) 

678 error_permanent(order) 

679 else: 

680 if len(trs_order) != 1: 

681 if err_this: 

682 elog.append( 

683 now, order, 

684 'partial result, %s' % err_this[0], 

685 err_this[1]) 

686 else: 

687 elog.append(now, order, 'partial result') 

688 

689 paths = self._archive.add(trs_order) 

690 all_paths.extend(paths) 

691 

692 nsuccess += 1 

693 success(order) 

694 

695 except fdsn.EmptyResult: 

696 now = time.time() 

697 for order in orders_now: 

698 elog.append(now, order, 'empty result') 

699 error_permanent(order) 

700 

701 except util.HTTPError as e: 

702 now = time.time() 

703 for order in orders_now: 

704 elog.append(now, order, 'http error', str(e)) 

705 error_temporary(order) 

706 

707 emessage = elog.summarize_recent() 

708 self._log_info_data( 

709 '%i download%s successful' % ( 

710 nsuccess, util.plural_s(nsuccess)) 

711 + (', %s' % emessage if emessage else '')) 

712 

713 if all_paths: 

714 batch_add(all_paths) 

715 

716 i += neach 

717 task.update(i) 

718 

719 for agg in elog.iter_aggregates(): 

720 logger.warning(str(agg)) 

721 

722 task.done() 

723 

724 def _do_response_query(self, selection): 

725 extra_args = {} 

726 

727 if self.site not in sites_not_supporting['includerestricted']: 

728 extra_args.update( 

729 includerestricted=( 

730 self.user_credentials is not None 

731 or self.auth_token is not None 

732 or self.auth_token_path is not None)) 

733 

734 self._log_responses('querying...') 

735 

736 try: 

737 response_sx = fdsn.station( 

738 site=self.site, 

739 level='response', 

740 selection=selection, 

741 **extra_args) 

742 

743 self._hotfix('response', response_sx) 

744 return response_sx 

745 

746 except fdsn.EmptyResult: 

747 return stationxml.FDSNStationXML(source='dummy-empty-result') 

748 

749 def update_response_inventory(self, squirrel, constraint): 

750 cpath = os.path.abspath(self._get_channels_path()) 

751 nuts = squirrel.iter_nuts( 

752 'channel', path=cpath, codes=constraint.codes) 

753 

754 tmin = g_tmin_queries 

755 tmax = g_tmax 

756 

757 selection = [] 

758 now = time.time() 

759 have = set() 

760 status = defaultdict(list) 

761 for nut in nuts: 

762 nslc = nut.codes.nslc 

763 if nslc in have: 

764 continue 

765 have.add(nslc) 

766 

767 fn = self._get_responses_path(nslc) 

768 expiration_time = self._get_expiration_time(fn) 

769 if os.path.exists(fn) \ 

770 and (expiration_time is None or now < expiration_time): 

771 status['using cached'].append(nslc) 

772 else: 

773 selection.append(nslc + (tmin, tmax)) 

774 

775 dummy = stationxml.FDSNStationXML(source='dummy-empty') 

776 neach = 100 

777 i = 0 

778 fns = [] 

779 while i < len(selection): 

780 selection_now = selection[i:i+neach] 

781 i += neach 

782 

783 try: 

784 sx = self._do_response_query(selection_now) 

785 except Exception as e: 

786 status['update failed (%s)' % str(e)].extend( 

787 entry[:4] for entry in selection_now) 

788 continue 

789 

790 sx.created = None # timestamp would ruin diff 

791 

792 by_nslc = dict(stationxml.split_channels(sx)) 

793 

794 for entry in selection_now: 

795 nslc = entry[:4] 

796 response_sx = by_nslc.get(nslc, dummy) 

797 try: 

798 fn = self._get_responses_path(nslc) 

799 fn_temp = fn + '.%i.temp' % os.getpid() 

800 

801 util.ensuredirs(fn_temp) 

802 response_sx.dump_xml(filename=fn_temp) 

803 

804 status_this = move_or_keep(fn_temp, fn) 

805 

806 if status_this == 'upstream unchanged': 

807 try: 

808 squirrel.get_database().silent_touch(fn) 

809 except ExecuteGet1Error: 

810 pass 

811 

812 status[status_this].append(nslc) 

813 fns.append(fn) 

814 

815 except OSError as e: 

816 status['update failed (%s)' % str(e)].append(nslc) 

817 

818 for k in sorted(status): 

819 if k.find('failed') != -1: 

820 log_target = logger.error 

821 else: 

822 log_target = logger.info 

823 

824 self._log_responses( 

825 '%s: %s' % ( 

826 k, codes_to_str_abbreviated( 

827 CodesNSLCE(tup) for tup in status[k])), 

828 target=log_target) 

829 

830 squirrel.add(fns, kinds=['response']) 

831 

832 

833__all__ = [ 

834 'FDSNSource', 

835]