1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6from __future__ import absolute_import, print_function 

7 

8import time 

9import os 

10import copy 

11import logging 

12import tempfile 

13from collections import defaultdict 

14try: 

15 import cPickle as pickle 

16except ImportError: 

17 import pickle 

18import os.path as op 

19from .base import Source, Constraint 

20from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \ 

21 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \ 

22 codes_to_str_abbreviated 

23from ..database import ExecuteGet1Error 

24from pyrocko.client import fdsn 

25 

26from pyrocko import util, trace, io 

27from pyrocko.io.io_common import FileLoadError 

28from pyrocko.io import stationxml 

29from pyrocko.progress import progress 

30 

31from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \ 

32 Duration, Bool 

33 

guts_prefix = 'squirrel'

# Overrides a module-level setting of pyrocko.client.fdsn; presumably the
# request timeout [s] used by that client (TODO confirm in fdsn module).
fdsn.g_timeout = 60.

logger = logging.getLogger('psq.client.fdsn')

# FDSN site aliases which do not accept the given optional query parameter.
# Checked by FDSNSource before building station queries.
sites_not_supporting = dict(
    startbefore=['geonet'],
    includerestricted=['geonet'])

43 

44 

def make_task(*args):
    '''
    Create a progress task reporting through this module's logger.

    All positional arguments are forwarded unchanged to ``progress.task``.
    '''
    task_logger = logger
    return progress.task(*args, logger=task_logger)

47 

48 

def plural_s(x):
    '''
    Get the English plural suffix for a count.

    :param x: an ``int`` count or any object supporting ``len()``.
    :returns: ``''`` if the count is exactly one, ``'s'`` otherwise.
    '''
    count = x if isinstance(x, int) else len(x)
    return '' if count == 1 else 's'

54 

55 

def diff(fn_a, fn_b):
    '''
    Check whether two files differ in content.

    Files of different size, or files which cannot be stat'ed, are considered
    different. Otherwise the contents are compared chunk by chunk.

    :returns: ``True`` if the files differ, ``False`` if they are identical.
    '''
    try:
        size_a = os.stat(fn_a).st_size
        size_b = os.stat(fn_b).st_size
    except OSError:
        return True

    if size_a != size_b:
        return True

    with open(fn_a, 'rb') as fa, open(fn_b, 'rb') as fb:
        while True:
            chunk_a = fa.read(1024)
            chunk_b = fb.read(1024)
            if chunk_a != chunk_b:
                return True

            if not chunk_a or not chunk_b:
                # Both exhausted at the same point without a mismatch.
                return False

74 

75 

def move_or_keep(fn_temp, fn):
    '''
    Install a freshly written temporary file unless its content is unchanged.

    If ``fn`` does not exist, ``fn_temp`` is moved to ``fn``. If ``fn``
    exists and differs in content from ``fn_temp`` (see :py:func:`diff`),
    it is replaced by ``fn_temp``. If both are identical, ``fn_temp`` is
    deleted and ``fn`` is left untouched (keeping its modification time).

    :returns: status string, one of ``'new'``, ``'updated'`` or
        ``'upstream unchanged'``.
    '''
    if not op.exists(fn):
        status = 'new'
    elif diff(fn, fn_temp):
        status = 'updated'
    else:
        os.unlink(fn_temp)
        return 'upstream unchanged'

    # os.replace (unlike os.rename) also overwrites an existing target on
    # Windows; on POSIX both perform an atomic rename.
    os.replace(fn_temp, fn)
    return status

90 

91 

class Archive(Object):
    '''
    Base class for storages into which downloaded traces are written.

    Subclasses must override :py:meth:`add`.
    '''

    def add(self):
        raise NotImplementedError()

96 

97 

class MSeedArchive(Archive):
    '''
    Archive storing traces as MiniSEED files in a directory hierarchy.

    File names are generated from ``template``, relative to a base path which
    must be set with :py:meth:`set_base_path` before :py:meth:`add` is used.
    '''

    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        '_%(tmin_us)s_%(tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)
        # Unset until set_base_path() is called.
        self._base_path = None

    def set_base_path(self, path):
        '''Set the directory under which ``template`` is expanded.'''
        self._base_path = path

    def add(self, trs):
        '''
        Save traces ``trs`` into the archive, overwriting existing files.

        :returns: result of ``io.save``; presumably the list of written file
            paths (callers extend a list of paths with it) — TODO confirm.
        '''
        path_template = op.join(self._base_path, self.template)
        return io.save(trs, path_template, overwrite=True)

116 

117 

def combine_selections(selection):
    '''
    Merge consecutive, seamlessly adjoining rows of a selection.

    Rows are ``(net, sta, loc, cha, tmin, tmax)`` tuples. A row is merged
    into its predecessor when the four codes are equal and its ``tmin``
    equals the predecessor's ``tmax``. Only neighbouring rows are merged, so
    the input should be sorted by codes and time.

    :returns: new list of (possibly merged) rows.
    '''
    merged = []
    pending = None
    for row in selection:
        extends_pending = (
            pending
            and row[:4] == pending[:4]
            and row[4] == pending[5])

        if extends_pending:
            pending = pending[:5] + (row[5],)
            continue

        if pending:
            merged.append(pending)

        pending = row

    if pending:
        merged.append(pending)

    return merged

134 

135 

def orders_sort_key(order):
    '''Sort key: order waveform orders by codes, then by start time.'''
    key = order.codes, order.tmin
    return key

138 

139 

def orders_to_selection(orders):
    '''
    Convert waveform orders into a compact dataselect selection.

    Each order contributes a row ``order.codes[1:5] + (tmin, tmax)``;
    presumably elements 1-4 of the codes are network, station, location and
    channel (they are matched against ``nslc_id`` elsewhere) — TODO confirm.
    Rows are sorted with :py:func:`orders_sort_key` and adjoining rows are
    merged with :py:func:`combine_selections`.
    '''
    rows = [
        order.codes[1:5] + (order.tmin, order.tmax)
        for order in sorted(orders, key=orders_sort_key)]

    return combine_selections(rows)

147 

148 

class ErrorEntry(Object):
    '''
    Record of a single failed or partially failed waveform download.
    '''

    # Time at which the error was registered (callers pass time.time()).
    time = Timestamp.T()
    # The waveform order concerned.
    order = WaveformOrder.T()
    # Short error category, e.g. 'empty result', 'http error'.
    kind = String.T()
    # Optional free-text details, e.g. the exception message.
    details = String.T(optional=True)

154 

155 

class ErrorAggregate(Object):
    '''
    Summary of all download errors of one kind at an FDSN site.

    Groups :py:class:`ErrorEntry` objects sharing ``kind`` and ``details``,
    together with the affected codes and the affected time spans.
    '''

    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes_list = List.T(Tuple.T(None, String.T()))
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        codes = ['.'.join(x) for x in self.codes_list]
        if codes:
            scodes = '\n' + util.ewrap(codes, indent=' ')
        else:
            scodes = '<none>'

        span_lines = (
            '%s - %s' % (util.time_to_str(span[0]), util.time_to_str(span[1]))
            for span in self.time_spans)
        sspans = '\n' + util.ewrap(span_lines, indent=' ')

        if self.details:
            sdetails = ' Details: %s\n' % self.details
        else:
            sdetails = ''

        fmt = ('FDSN "%s": download error summary for "%s" (%i)\n%s '
               'Codes:%s\n Time spans:%s')

        return fmt % (
            self.site, self.kind, len(self.entries), sdetails, scodes, sspans)

180 

181 

class ErrorLog(Object):
    '''
    Collection of waveform download errors of one FDSN site.

    Checkpoints record positions in the entry list, so that the errors added
    since the most recent checkpoint can be summarized separately.
    '''

    site = String.T()
    entries = List.T(ErrorEntry.T())
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        '''Mark the current end of the entry list.'''
        self.checkpoints.append(len(self.entries))

    def append(self, time, order, kind, details=''):
        '''Register an error of category ``kind`` for ``order``.'''
        self.entries.append(
            ErrorEntry(time=time, order=order, kind=kind, details=details))

    def iter_aggregates(self):
        '''
        Iterate over :py:class:`ErrorAggregate` objects, one per distinct
        ``(kind, details)`` combination, in sorted order.
        '''
        groups = defaultdict(list)
        for entry in self.entries:
            groups[entry.kind, entry.details].append(entry)

        for kind, details in sorted(groups):
            group = groups[kind, details]
            orders = [entry.order for entry in group]
            codes_list = sorted({order.codes for order in orders})
            selection = orders_to_selection(orders)
            # Last two columns of a selection row are (tmin, tmax).
            time_spans = sorted({row[-2:] for row in selection})
            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=group,
                codes_list=codes_list,
                time_spans=time_spans)

    def summarize_recent(self):
        '''
        One-line summary of the errors since the latest checkpoint.

        :returns: e.g. ``'3 errors (empty result; http error)'``, or ``''``
            if there were none.
        '''
        start = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[start:]
        if not recent:
            return ''

        kinds = sorted({entry.kind for entry in recent})
        return '%i error%s (%s)' % (
            len(recent), plural_s(recent), '; '.join(kinds))

223 

224 

class FDSNSource(Source):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel` allows
    the latter to download station and waveform data from an FDSN web service
    should the data not already happen to be available locally.
    '''

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             'environment\'s cache directory is used.')

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    def __init__(self, site, query_args=None, **kwargs):
        Source.__init__(self, site=site, query_args=query_args, **kwargs)

        self._constraint = None
        self._hash = self.make_hash()
        self._source_id = 'client:fdsn:%s' % self._hash
        self._error_infos = []

    def describe(self):
        return self._source_id

    def make_hash(self):
        '''
        Hash identifying this source's configuration.

        Built from the site, whether a token is used, the user name (not the
        password) and the query arguments. It names the per-source cache
        subdirectory, so its recipe must stay stable.
        '''
        s = self.site
        s += 'notoken' \
            if (self.auth_token is None and self.auth_token_path is None) \
            else 'token'

        if self.user_credentials is not None:
            s += self.user_credentials[0]
        else:
            s += 'nocred'

        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        return self._hash

    def get_auth_token(self):
        '''
        Get the authentication token.

        Returns ``auth_token`` if it is non-empty, otherwise the content of
        the file at ``auth_token_path``.

        :raises: :py:exc:`FileLoadError` if the token file cannot be read;
            ``Exception`` if neither ``auth_token`` nor ``auth_token_path``
            is set.
        '''
        if self.auth_token:
            return self.auth_token

        elif self.auth_token_path is not None:
            try:
                with open(self.auth_token_path, 'rb') as f:
                    return f.read().decode('ascii')

            except OSError as e:
                raise FileLoadError(
                    'Cannot load auth token file (%s): %s'
                    % (self.auth_token_path, str(e)))

        else:
            raise Exception(
                'FDSNSource: neither auth_token nor auth_token_path is set.')

    def setup(self, squirrel, check=True):
        '''
        Prepare the cache directories and register cached data.

        Adds the waveform directory, the channel file and the response
        directory (the latter two only if they exist) to ``squirrel`` and
        registers this source's virtual path.
        '''
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()
        self._archive = MSeedArchive()
        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive.set_base_path(waveforms_path)

        squirrel.add(waveforms_path, check=check)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if os.path.exists(responses_path):
            squirrel.add(responses_path, kinds=['response'])

    def _get_constraint_path(self):
        return op.join(self._cache_path, self._hash, 'constraint.pickle')

    def _get_channels_path(self):
        return op.join(self._cache_path, self._hash, 'channels.stationxml')

    def _get_responses_path(self, nslc=None):
        '''
        Response cache directory, or the file for channel ``nslc`` if given.
        '''
        dirpath = op.join(
            self._cache_path, self._hash, 'responses')

        if nslc is None:
            return dirpath
        else:
            return op.join(
                dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)

    def _get_waveforms_path(self):
        if self.shared_waveforms:
            return op.join(self._cache_path, 'waveforms')
        else:
            return op.join(self._cache_path, self._hash, 'waveforms')

    def _log_meta(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" metadata:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_responses(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" responses:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_info_data(self, *args):
        log_prefix = 'FDSN "%s" waveforms:' % self.site
        logger.info(' '.join((log_prefix,) + args))

    def _str_expires(self, t, now):
        if t is None:
            return 'expires: never'
        else:
            expire = 'expires' if t > now else 'expired'
            return '%s: %s' % (
                expire,
                util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))

    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Refresh the cached channel-level StationXML if needed.

        The cached file is reused when the stored constraint contains
        ``constraint`` and it has not expired. Otherwise the stored
        constraint is expanded by ``constraint`` and the channel list is
        re-queried. The channel file is then (re-)added to ``squirrel``.
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                fn_temp = fn + '.%i.temp' % os.getpid()
                channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    # Same guard as in update_response_inventory: the file
                    # may not (yet) be known to the database; it is added
                    # below in any case.
                    try:
                        squirrel.get_database().silent_touch(fn)
                    except ExecuteGet1Error:
                        pass

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

    def _do_channel_query(self, constraint):
        '''
        Query channel-level station metadata for the time span of
        ``constraint``.

        :returns: :py:class:`stationxml.FDSNStationXML`; an empty dummy
            object if the server reports an empty result.
        '''
        extra_args = {}

        if self.site in sites_not_supporting['startbefore']:
            # Fall back to starttime/endtime for sites rejecting
            # startbefore/endafter.
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['starttime'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['endtime'] = constraint.tmax

        else:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['endafter'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['startbefore'] = constraint.tmax

        if self.site not in sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        if self.query_args is not None:
            extra_args.update(self.query_args)

        self._log_meta('querying...')

        try:
            channel_sx = fdsn.station(
                site=self.site,
                format='text',
                level='channel',
                **extra_args)
            return channel_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def _load_constraint(self):
        fn = self._get_constraint_path()
        if op.exists(fn):
            # NOTE(review): unpickling executes code from the file; this is
            # only safe as long as the cache directory is trusted (it is
            # written by _dump_constraint).
            with open(fn, 'rb') as f:
                self._constraint = pickle.load(f)
        else:
            self._constraint = None

    def _dump_constraint(self):
        with open(self._get_constraint_path(), 'wb') as f:
            pickle.dump(self._constraint, f, protocol=2)

    def _get_expiration_time(self, path):
        '''
        Expiration time of the file at ``path``.

        :returns: ``None`` if ``expires`` is not set (never expires),
            ``0.0`` if the file cannot be stat'ed (treated as expired),
            otherwise the file's modification time plus ``expires``.
        '''
        if self.expires is None:
            return None

        try:
            t = os.stat(path).st_mtime
        except OSError:
            return 0.0

        return t + self.expires

    def _get_channels_expiration_time(self):
        return self._get_expiration_time(self._get_channels_path())

    def update_waveform_promises(self, squirrel, constraint):
        '''
        Register waveform promises for all cached channels matching
        ``constraint.codes`` under this source's virtual path.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts(
            'channel', path=cpath, codes=constraint.codes)

        path = self._source_id
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                **nut.waveform_promise_kwargs) for nut in nuts),
            virtual_paths=[path])

    def _get_user_credentials(self):
        '''
        Authentication keyword arguments for ``fdsn.dataselect``
        (``user``/``passwd`` and/or ``token``), possibly empty.
        '''
        d = {}
        if self.user_credentials is not None:
            d['user'], d['passwd'] = self.user_credentials

        if self.auth_token is not None or self.auth_token_path is not None:
            d['token'] = self.get_auth_token()

        return d

    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):
        '''
        Download waveforms for ``orders`` in batches of 20 orders.

        For each order exactly one of the callbacks ``success``,
        ``error_permanent`` or ``error_temporary`` is called, unless an
        unhandled exception aborts the download. ``orders`` is sorted in
        place. Paths of newly archived files are passed to ``batch_add``
        after each batch. An error summary is logged at the end.
        '''
        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        while i < len(orders):
            orders_now = orders[i:i+neach]
            selection_now = orders_to_selection(orders_now)

            nsuccess = 0
            elog.append_checkpoint()
            self._log_info_data(
                'downloading, %s' % order_summary(orders_now))

            all_paths = []
            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    data = fdsn.dataselect(
                        site=self.site, selection=selection_now,
                        **self._get_user_credentials())

                    now = time.time()

                    path = op.join(tmpdir, 'tmp.mseed')
                    try:
                        with open(path, 'wb') as f:
                            while True:
                                buf = data.read(1024)
                                if not buf:
                                    break
                                f.write(buf)
                    finally:
                        # The response object was previously never closed;
                        # release it as soon as its content is consumed.
                        close = getattr(data, 'close', None)
                        if close is not None:
                            close()

                    trs = io.load(path)

                    by_nslc = defaultdict(list)
                    for tr in trs:
                        by_nslc[tr.nslc_id].append(tr)

                    for order in orders_now:
                        trs_order = []
                        err_this = None
                        for tr in by_nslc[order.codes[1:5]]:
                            try:
                                order.validate(tr)
                                trs_order.append(tr.chop(
                                    order.tmin, order.tmax, inplace=False))

                            except trace.NoData:
                                err_this = (
                                    'empty result', 'empty sub-interval')

                            except InvalidWaveform as e:
                                err_this = ('invalid waveform', str(e))

                        if len(trs_order) == 0:
                            if err_this is None:
                                err_this = ('empty result', '')

                            elog.append(now, order, *err_this)
                            error_permanent(order)
                        else:
                            # More than one piece: data has gaps.
                            if len(trs_order) != 1:
                                if err_this:
                                    elog.append(
                                        now, order,
                                        'partial result, %s' % err_this[0],
                                        err_this[1])
                                else:
                                    elog.append(now, order, 'partial result')

                            paths = self._archive.add(trs_order)
                            all_paths.extend(paths)

                            nsuccess += 1
                            success(order)

                except fdsn.EmptyResult:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'empty result')
                        error_permanent(order)

                except util.HTTPError as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'http error', str(e))
                        error_temporary(order)

            emessage = elog.summarize_recent()
            self._log_info_data(
                '%i download%s successful' % (nsuccess, plural_s(nsuccess))
                + (', %s' % emessage if emessage else ''))

            if all_paths:
                batch_add(all_paths)

            i += neach
            task.update(i)

        for agg in elog.iter_aggregates():
            logger.warning(str(agg))

        task.done()

    def _do_response_query(self, selection):
        '''
        Query response-level station metadata for ``selection``.

        :returns: :py:class:`stationxml.FDSNStationXML`; an empty dummy
            object if the server reports an empty result.
        '''
        extra_args = {}

        if self.site not in sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        self._log_responses('querying...')

        try:
            response_sx = fdsn.station(
                site=self.site,
                level='response',
                selection=selection,
                **extra_args)

            return response_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def update_response_inventory(self, squirrel, constraint):
        '''
        Refresh the per-channel response cache.

        For every distinct channel in the cached channel file, the response
        file is reused unless missing or expired. Missing or expired
        responses are queried in chunks of 100 channels and stored as one
        StationXML file per channel. Channels without a returned response
        get an empty dummy document. Refreshed files are added to
        ``squirrel``. Query failures are logged, not raised.
        ``constraint`` is currently not used.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts('channel', path=cpath)

        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes_tuple[1:5]
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if os.path.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        neach = 100
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                # Deliberately best-effort: record the failure for the log
                # summary below and continue with the next chunk.
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)
                    response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            if 'failed' in k:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(status[k])),
                target=log_target)

        squirrel.add(fns, kinds=['response'])

748 

749 

# Public API of this module.
__all__ = ['FDSNSource']