1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6from __future__ import absolute_import, print_function 

7 

8import time 

9import os 

10import copy 

11import logging 

12import tempfile 

13from collections import defaultdict 

14try: 

15 import cPickle as pickle 

16except ImportError: 

17 import pickle 

18import os.path as op 

19from .base import Source, Constraint 

20from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \ 

21 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \ 

22 codes_to_str_abbreviated 

23from ..database import ExecuteGet1Error 

24from pyrocko.client import fdsn 

25 

26from pyrocko import util, trace, io 

27from pyrocko.io.io_common import FileLoadError 

28from pyrocko.io import stationxml 

29from pyrocko.progress import progress 

30 

31from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \ 

32 Duration, Bool 

33 

# Prefix used by guts for the object types defined in this module.
guts_prefix = 'squirrel'

# Module-wide timeout of the FDSN client (presumably in seconds).
fdsn.g_timeout = 60.

logger = logging.getLogger('psq.client.fdsn')

# Query parameters which some FDSN sites reject: parameter name -> list of
# site names not supporting it.
sites_not_supporting = dict(
    startbefore=['geonet'],
    includerestricted=['geonet'],
)

43 

44 

def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are passed on unchanged to ``progress.task``.
    '''
    task_args = tuple(args)
    return progress.task(*task_args, logger=logger)

47 

48 

def plural_s(x):
    '''
    Get the English plural suffix matching a count.

    :param x: an ``int`` count, or a sized collection whose length is used
        as the count.
    :returns: ``''`` if the count is exactly one, ``'s'`` otherwise.
    '''
    count = x if isinstance(x, int) else len(x)
    if count == 1:
        return ''

    return 's'

54 

55 

def diff(fn_a, fn_b):
    '''
    Check whether two files differ in content.

    If either file cannot be stat'ed, or the sizes differ, the files are
    reported as different without reading them. Otherwise both files are
    compared in chunks of 1024 bytes.

    :returns: ``True`` if the files differ (or cannot be stat'ed), ``False``
        if their contents are identical.
    '''
    try:
        size_a = os.stat(fn_a).st_size
        size_b = os.stat(fn_b).st_size
    except OSError:
        return True

    if size_a != size_b:
        return True

    with open(fn_a, 'rb') as fa, open(fn_b, 'rb') as fb:
        while True:
            chunk_a = fa.read(1024)
            chunk_b = fb.read(1024)
            if chunk_a != chunk_b:
                return True

            # Chunks are equal here, so an empty one means both files are
            # exhausted.
            if not chunk_a:
                return False

74 

75 

def move_or_keep(fn_temp, fn):
    '''
    Move a freshly written temporary file into place unless it is unchanged.

    If ``fn`` exists and has the same content as ``fn_temp``, the temporary
    file is deleted and ``fn`` is left untouched. Otherwise ``fn_temp`` is
    moved to ``fn``, replacing any existing file there.

    :param fn_temp: path of the newly written temporary file.
    :param fn: destination path.
    :returns: status string: ``'new'``, ``'updated'`` or
        ``'upstream unchanged'``.
    '''
    if not op.exists(fn):
        # os.replace (unlike os.rename) also succeeds on Windows should the
        # destination have appeared in the meantime.
        os.replace(fn_temp, fn)
        return 'new'

    if diff(fn, fn_temp):
        # os.rename raises on Windows when the destination exists;
        # os.replace overwrites it on all platforms.
        os.replace(fn_temp, fn)
        return 'updated'

    os.unlink(fn_temp)
    return 'upstream unchanged'

90 

91 

class Archive(Object):
    '''
    Base class for local archives of downloaded data.

    Subclasses (see :py:class:`MSeedArchive`) must implement :py:meth:`add`.
    '''

    def add(self):
        '''
        Store data in the archive; not implemented in the base class.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError()

96 

97 

class MSeedArchive(Archive):
    '''
    Archive saving traces to ``.mseed`` files below a base directory.

    File names are generated from :py:attr:`template`, taken relative to the
    directory set with :py:meth:`set_base_path`.
    '''

    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        + '_%(tmin_us)s_%(tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)
        # Must be set via set_base_path() before add() is called.
        self._base_path = None

    def set_base_path(self, path):
        '''Set the directory below which files are written.'''
        self._base_path = path

    def add(self, trs):
        '''
        Save traces to the archive, overwriting existing files.

        :param trs: traces to be saved.
        :returns: the result of ``io.save``; callers in this module use it as
            the list of written file paths.
        '''
        target = op.join(self._base_path, self.template)
        return io.save(trs, target, overwrite=True)

116 

117 

def combine_selections(selection):
    '''
    Merge neighbouring, abutting time windows of the same channel.

    Each entry is a tuple ``(net, sta, loc, cha, tmin, tmax)``. An entry is
    merged into its predecessor when the first four elements agree and it
    starts exactly where the predecessor ends. Only adjacent entries are
    compared, so the input should be sorted.

    :returns: list of (possibly merged) selection tuples.
    '''
    combined = []
    pending = None
    for entry in selection:
        extends_pending = (
            bool(pending)
            and entry[:4] == pending[:4]
            and entry[4] == pending[5])

        if extends_pending:
            pending = pending[:5] + (entry[5],)
            continue

        if pending:
            combined.append(pending)

        pending = entry

    if pending:
        combined.append(pending)

    return combined

134 

135 

def orders_sort_key(order):
    '''
    Sort key for waveform orders: by codes first, then by start time.
    '''
    key = (order.codes, order.tmin)
    return key

138 

139 

def orders_to_selection(orders):
    '''
    Convert waveform orders into a compact dataselect selection list.

    Orders are sorted with :py:func:`orders_sort_key` and converted into
    ``codes[1:5] + (tmin, tmax)`` tuples. Abutting windows of the same
    channel are then merged with :py:func:`combine_selections`.
    '''
    ordered = sorted(orders, key=orders_sort_key)
    rows = [
        order.codes[1:5] + (order.tmin, order.tmax)
        for order in ordered]

    return combine_selections(rows)

147 

148 

class ErrorEntry(Object):
    '''
    Record of a single failed or incomplete waveform download.
    '''

    # Time at which the error was registered.
    time = Timestamp.T()
    # The affected waveform order.
    order = WaveformOrder.T()
    # Short error category, e.g. 'empty result' or 'http error'.
    kind = String.T()
    # Optional free-form error details.
    details = String.T(optional=True)

154 

155 

class ErrorAggregate(Object):
    '''
    Summary of all download errors sharing the same kind and details.
    '''

    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    # Distinct codes tuples of the affected orders.
    codes_list = List.T(Tuple.T(None, String.T()))
    # Distinct (tmin, tmax) spans of the affected orders.
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        codes_strings = ['.'.join(codes) for codes in self.codes_list]
        if codes_strings:
            codes_text = '\n' + util.ewrap(codes_strings, indent=' ')
        else:
            codes_text = '<none>'

        span_strings = (
            '%s - %s' % (util.time_to_str(span[0]), util.time_to_str(span[1]))
            for span in self.time_spans)

        spans_text = '\n' + util.ewrap(span_strings, indent=' ')

        if self.details:
            details_text = ' Details: %s\n' % self.details
        else:
            details_text = ''

        template = (
            'FDSN "%s": download error summary for "%s" (%i)\n%s '
            'Codes:%s\n Time spans:%s')

        return template % (
            self.site,
            self.kind,
            len(self.entries),
            details_text,
            codes_text,
            spans_text)

180 

181 

class ErrorLog(Object):
    '''
    Collection of download errors for one FDSN site.

    Checkpoints record positions in :py:attr:`entries`, so that errors added
    after the most recent checkpoint can be summarized separately.
    '''

    site = String.T()
    entries = List.T(ErrorEntry.T())
    # Lengths of ``entries`` at the times checkpoints were set.
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        '''Set a checkpoint at the current end of the entry list.'''
        self.checkpoints.append(len(self.entries))

    def append(self, time, order, kind, details=''):
        '''Register an error of the given ``kind`` for ``order``.'''
        self.entries.append(
            ErrorEntry(time=time, order=order, kind=kind, details=details))

    def iter_aggregates(self):
        '''
        Group entries by ``(kind, details)``.

        Yields one :py:class:`ErrorAggregate` per group, in sorted group
        order.
        '''
        groups = defaultdict(list)
        for entry in self.entries:
            groups[entry.kind, entry.details].append(entry)

        for kind, details in sorted(groups):
            group = groups[kind, details]
            orders = [entry.order for entry in group]
            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=group,
                codes_list=sorted({order.codes for order in orders}),
                time_spans=sorted({
                    row[-2:] for row in orders_to_selection(orders)}))

    def summarize_recent(self):
        '''
        Get a one-line summary of the errors since the last checkpoint.

        :returns: summary string, or ``''`` if there are no such errors.
        '''
        start = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[start:]
        if not recent:
            return ''

        kinds = sorted({entry.kind for entry in recent})
        return '%i error%s (%s)' % (
            len(recent), plural_s(recent), '; '.join(kinds))

223 

224 

class FDSNSource(Source):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel` allows
    the latter to download station and waveform data from an FDSN web service
    should the data not already happen to be available locally.
    '''

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             'environment\'s cache directory is used.')

    shared_waveforms = Bool.T(
        default=True,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    def __init__(self, site, query_args=None, **kwargs):
        Source.__init__(self, site=site, query_args=query_args, **kwargs)

        self._constraint = None
        self._hash = self.make_hash()
        self._source_id = 'client:fdsn:%s' % self._hash
        self._error_infos = []

    def describe(self):
        return self._source_id

    def make_hash(self):
        '''
        Compute a hash identifying this source's configuration.

        The hash is built from the site, whether a token is used, the user
        name (if any) and the query arguments. It names the per-source cache
        directory, so changing this function changes the cache layout.
        '''
        s = self.site
        s += 'notoken' \
            if (self.auth_token is None and self.auth_token_path is None) \
            else 'token'

        if self.user_credentials is not None:
            s += self.user_credentials[0]
        else:
            s += 'nocred'

        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        return self._hash

    def _has_credentials(self):
        '''
        Check whether any kind of authentication is configured.

        Used as the value of the ``includerestricted`` query parameter.
        '''
        return (
            self.user_credentials is not None
            or self.auth_token is not None
            or self.auth_token_path is not None)

    def get_auth_token(self):
        '''
        Get the authentication token to present to the FDSN server.

        ``auth_token`` takes precedence if set. Otherwise the token is read
        from the file at ``auth_token_path``.

        :raises FileLoadError: if the token file cannot be read.
        :raises Exception: if neither ``auth_token`` nor ``auth_token_path``
            is set.
        '''
        if self.auth_token:
            return self.auth_token

        elif self.auth_token_path is not None:
            try:
                with open(self.auth_token_path, 'rb') as f:
                    return f.read().decode('ascii')

            except OSError as e:
                raise FileLoadError(
                    'Cannot load auth token file (%s): %s'
                    % (self.auth_token_path, str(e)))

        else:
            raise Exception(
                'FDSNSource: neither auth_token nor auth_token_path is set.')

    def setup(self, squirrel, check=True):
        '''
        Prepare cache directories and register cached data with ``squirrel``.

        :param squirrel: the :py:class:`Squirrel` this source is attached to.
        :param check: forwarded to ``squirrel.add`` for the waveform
            directory.
        '''
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()
        self._archive = MSeedArchive()
        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive.set_base_path(waveforms_path)

        squirrel.add(waveforms_path, check=check)

        fn = self._get_channels_path()
        if op.exists(fn):
            squirrel.add(fn)

        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if op.exists(responses_path):
            squirrel.add(responses_path, kinds=['response'])

    def _get_constraint_path(self):
        return op.join(self._cache_path, self._hash, 'constraint.pickle')

    def _get_channels_path(self):
        return op.join(self._cache_path, self._hash, 'channels.stationxml')

    def _get_responses_path(self, nslc=None):
        '''
        Get the response cache directory, or the file for channel ``nslc``.
        '''
        dirpath = op.join(
            self._cache_path, self._hash, 'responses')

        if nslc is None:
            return dirpath
        else:
            return op.join(
                dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)

    def _get_waveforms_path(self):
        if self.shared_waveforms:
            return op.join(self._cache_path, 'waveforms')
        else:
            return op.join(self._cache_path, self._hash, 'waveforms')

    def _log_meta(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" metadata:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_responses(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" responses:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_info_data(self, *args):
        log_prefix = 'FDSN "%s" waveforms:' % self.site
        logger.info(' '.join((log_prefix,) + args))

    def _str_expires(self, t, now):
        if t is None:
            return 'expires: never'
        else:
            expire = 'expires' if t > now else 'expired'
            return '%s: %s' % (
                expire,
                util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))

    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Refresh the cached channel-level station metadata if needed.

        The cache is reused if the stored constraint contains ``constraint``
        and the cache has not expired. Otherwise the station service is
        queried with the stored constraint expanded by ``constraint``. The
        channel file, if present, is then added to ``squirrel``. OS-level
        failures are logged as errors instead of being raised.
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                # Query for the stored constraint expanded by the requested
                # one, rather than for the requested one alone.
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                fn_temp = fn + '.%i.temp' % os.getpid()
                channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    # NOTE(review): presumably bumps the file's mtime (which
                    # the expiration time is based on) without re-indexing;
                    # silent_touch is defined elsewhere.
                    squirrel.get_database().silent_touch(fn)

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if op.exists(fn):
            squirrel.add(fn)

    def _do_channel_query(self, constraint):
        '''
        Query channel-level station metadata (text format) for the
        time range of ``constraint``.

        An empty result yields an empty dummy ``FDSNStationXML`` object.
        '''
        extra_args = {}

        if self.site in sites_not_supporting['startbefore']:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['starttime'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['endtime'] = constraint.tmax

        else:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['endafter'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['startbefore'] = constraint.tmax

        if self.site not in sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=self._has_credentials())

        if self.query_args is not None:
            extra_args.update(self.query_args)

        self._log_meta('querying...')

        try:
            channel_sx = fdsn.station(
                site=self.site,
                format='text',
                level='channel',
                **extra_args)
            return channel_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def _load_constraint(self):
        '''
        Load the cached query constraint, if any.

        An unreadable cache file is ignored (with a warning) so that the
        channel inventory is simply re-queried instead of failing setup.
        '''
        fn = self._get_constraint_path()
        self._constraint = None
        if op.exists(fn):
            try:
                with open(fn, 'rb') as f:
                    self._constraint = pickle.load(f)

            except (OSError, EOFError, pickle.UnpicklingError) as e:
                logger.warning(
                    'FDSN "%s": ignoring unreadable constraint cache file '
                    '"%s": %s' % (self.site, fn, str(e)))

    def _dump_constraint(self):
        '''
        Store the current query constraint in the cache.

        The pickle is written to a temporary file first and then moved into
        place, so an interrupted write cannot leave a truncated file at the
        final path.
        '''
        fn = self._get_constraint_path()
        fn_temp = fn + '.%i.temp' % os.getpid()
        with open(fn_temp, 'wb') as f:
            pickle.dump(self._constraint, f, protocol=2)

        os.replace(fn_temp, fn)

    def _get_expiration_time(self, path):
        '''
        Get the expiration time of the cached file at ``path``.

        :returns: ``None`` if ``expires`` is unset (never expires), ``0.0``
            if the file cannot be stat'ed (i.e. already expired), otherwise
            the file's modification time plus ``expires``.
        '''
        if self.expires is None:
            return None

        try:
            return os.stat(path).st_mtime + self.expires

        except OSError:
            return 0.0

    def _get_channels_expiration_time(self):
        return self._get_expiration_time(self._get_channels_path())

    def update_waveform_promises(self, squirrel):
        '''
        Register waveform promises for all channels in the cached inventory.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts('channel', path=cpath)

        path = self._source_id
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                **nut.waveform_promise_kwargs) for nut in nuts),
            virtual_paths=[path])

    def _get_user_credentials(self):
        '''
        Get authentication keyword arguments for ``fdsn.dataselect``.
        '''
        d = {}
        if self.user_credentials is not None:
            d['user'], d['passwd'] = self.user_credentials

        if self.auth_token is not None or self.auth_token_path is not None:
            d['token'] = self.get_auth_token()

        return d

    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):

        '''
        Download waveforms for ``orders`` in batches of 20.

        ``orders`` is sorted in place. For each order, exactly one of the
        callbacks ``success(order)``, ``error_permanent(order)`` or
        ``error_temporary(order)`` is called. ``batch_add(paths)`` receives
        the files written for each batch. The progress task is always
        finalized, even if an unexpected exception escapes.
        '''
        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        try:
            while i < len(orders):
                self._download_waveforms_batch(
                    orders[i:i+neach], elog, success, batch_add,
                    error_permanent, error_temporary)

                i += neach
                task.update(min(i, len(orders)))

            for agg in elog.iter_aggregates():
                logger.warning(str(agg))

        finally:
            task.done()

    def _download_waveforms_batch(
            self, orders_now, elog, success, batch_add, error_permanent,
            error_temporary):

        '''
        Download a single batch of orders with one dataselect request.

        Errors are registered in ``elog``. Written files are passed to
        ``batch_add``.
        '''
        selection_now = orders_to_selection(orders_now)

        nsuccess = 0
        elog.append_checkpoint()
        self._log_info_data(
            'downloading, %s' % order_summary(orders_now))

        all_paths = []
        with tempfile.TemporaryDirectory() as tmpdir:
            try:
                data = fdsn.dataselect(
                    site=self.site, selection=selection_now,
                    **self._get_user_credentials())

                now = time.time()

                path = op.join(tmpdir, 'tmp.mseed')
                with open(path, 'wb') as f:
                    while True:
                        buf = data.read(1024)
                        if not buf:
                            break
                        f.write(buf)

                trs = io.load(path)

                by_nslc = defaultdict(list)
                for tr in trs:
                    by_nslc[tr.nslc_id].append(tr)

                for order in orders_now:
                    trs_order = []
                    err_this = None
                    for tr in by_nslc[order.codes[1:5]]:
                        try:
                            order.validate(tr)
                            trs_order.append(tr.chop(
                                order.tmin, order.tmax, inplace=False))

                        except trace.NoData:
                            err_this = (
                                'empty result', 'empty sub-interval')

                        except InvalidWaveform as e:
                            err_this = ('invalid waveform', str(e))

                    if len(trs_order) == 0:
                        if err_this is None:
                            err_this = ('empty result', '')

                        elog.append(now, order, *err_this)
                        error_permanent(order)
                    else:
                        if len(trs_order) != 1:
                            if err_this:
                                elog.append(
                                    now, order,
                                    'partial result, %s' % err_this[0],
                                    err_this[1])
                            else:
                                elog.append(now, order, 'partial result')

                        paths = self._archive.add(trs_order)
                        all_paths.extend(paths)

                        nsuccess += 1
                        success(order)

            except fdsn.EmptyResult:
                now = time.time()
                for order in orders_now:
                    elog.append(now, order, 'empty result')
                    error_permanent(order)

            except util.HTTPError as e:
                now = time.time()
                for order in orders_now:
                    elog.append(now, order, 'http error', str(e))
                    error_temporary(order)

        emessage = elog.summarize_recent()
        self._log_info_data(
            '%i download%s successful' % (nsuccess, plural_s(nsuccess))
            + (', %s' % emessage if emessage else ''))

        if all_paths:
            batch_add(all_paths)

    def _do_response_query(self, selection):
        '''
        Query response-level station metadata for ``selection``.

        An empty result yields an empty dummy ``FDSNStationXML`` object.
        '''
        extra_args = {}

        if self.site not in sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=self._has_credentials())

        self._log_responses('querying...')

        try:
            response_sx = fdsn.station(
                site=self.site,
                level='response',
                selection=selection,
                **extra_args)

            return response_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def update_response_inventory(self, squirrel, constraint):
        '''
        Refresh the cached per-channel response metadata where needed.

        Responses of channels in the cached channel inventory are reused
        unless missing or expired. All others are queried in batches of 100
        channels. Failures are logged per channel and do not abort the update.
        Newly written or confirmed-unchanged files are added to ``squirrel``.

        :param constraint: currently unused.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts('channel', path=cpath)

        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes_tuple[1:5]
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if op.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        neach = 100
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                # Best effort: report the whole batch as failed and go on.
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                # Channels absent from the result get an empty placeholder
                # document written to their response file.
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)
                    response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            if k.find('failed') != -1:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(status[k])),
                target=log_target)

        squirrel.add(fns, kinds=['response'])

747 

748 

# Public API of this module.
__all__ = ['FDSNSource']