1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6from __future__ import absolute_import, print_function 

7 

8import time 

9import os 

10import copy 

11import logging 

12import tempfile 

13from collections import defaultdict 

14try: 

15 import cPickle as pickle 

16except ImportError: 

17 import pickle 

18import os.path as op 

19from .base import Source, Constraint 

20from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \ 

21 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \ 

22 codes_to_str_abbreviated 

23from ..database import ExecuteGet1Error 

24from pyrocko.client import fdsn 

25 

26from pyrocko import util, trace, io 

27from pyrocko.io.io_common import FileLoadError 

28from pyrocko.io import stationxml 

29from pyrocko.progress import progress 

30 

31from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \ 

32 Duration, Bool, clone 

33 

# Module-level guts prefix (presumably the type-name prefix guts uses for the
# Object subclasses declared in this module -- confirm against pyrocko.guts).
guts_prefix = 'squirrel'

# Override the global timeout of the FDSN client module at import time
# (presumably seconds -- confirm against pyrocko.client.fdsn).
fdsn.g_timeout = 60.

logger = logging.getLogger('psq.client.fdsn')

# Query features mapped to the site aliases which do not support them. The
# channel and response queries in this module consult this table to decide
# which query arguments to send.
sites_not_supporting = {
    'startbefore': ['geonet'],
    'includerestricted': ['geonet']}

43 

44 

def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    :param args: positional arguments, forwarded unchanged to
        ``progress.task``.
    :returns: the object returned by ``progress.task``.
    '''
    task = progress.task(*args, logger=logger)
    return task

47 

48 

def plural_s(x):
    '''
    Get the plural suffix matching a count.

    :param x: either an ``int`` count or any object supporting ``len()``,
        in which case its length is used as the count.
    :returns: ``''`` if the count equals one, ``'s'`` otherwise.
    '''
    count = x if isinstance(x, int) else len(x)
    if count == 1:
        return ''

    return 's'

54 

55 

def diff(fn_a, fn_b):
    '''
    Check whether two files differ.

    The file sizes are compared first. Only if they match, the contents are
    compared chunk by chunk.

    :param fn_a: path of the first file.
    :param fn_b: path of the second file.
    :returns: ``True`` if the files differ or if either of them cannot be
        stat'ed, ``False`` if their contents are identical.
    '''
    try:
        size_a = os.stat(fn_a).st_size
        size_b = os.stat(fn_b).st_size
    except OSError:
        # A file which cannot be inspected counts as different.
        return True

    if size_a != size_b:
        return True

    with open(fn_a, 'rb') as fa, open(fn_b, 'rb') as fb:
        while True:
            chunk_a = fa.read(1024)
            chunk_b = fb.read(1024)
            if chunk_a != chunk_b:
                return True

            # Chunks are equal here, so an empty one means both files are
            # exhausted without any mismatch.
            if not chunk_a or not chunk_b:
                return False

74 

75 

def move_or_keep(fn_temp, fn):
    '''
    Install a freshly written temporary file, unless nothing has changed.

    :param fn_temp: path of the temporary file holding the new content.
    :param fn: path of the final destination.
    :returns: ``'new'`` if ``fn`` did not exist and ``fn_temp`` was moved
        there, ``'updated'`` if ``fn`` existed with different content and
        has been overwritten, ``'upstream unchanged'`` if the content is
        identical; in that last case ``fn_temp`` is deleted and ``fn`` is
        left untouched.
    '''
    if op.exists(fn):
        if diff(fn, fn_temp):
            # os.replace overwrites an existing target on every platform;
            # os.rename raises on Windows when the target already exists.
            os.replace(fn_temp, fn)
            status = 'updated'
        else:
            os.unlink(fn_temp)
            status = 'upstream unchanged'

    else:
        os.replace(fn_temp, fn)
        status = 'new'

    return status

90 

91 

class Archive(Object):
    '''
    Base class for archives; subclasses have to implement :py:meth:`add`.
    '''

    def add(self):
        '''
        Add data to the archive. Not implemented in the base class.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

96 

97 

class MSeedArchive(Archive):
    '''
    Archive saving traces below a base directory using a filename template.
    '''

    # Filename template, relative to the base path. It is passed to
    # ``io.save`` together with the traces.
    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        '_%(tmin_us)s_%(tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)
        # Must be set with set_base_path() before add() can be used.
        self._base_path = None

    def set_base_path(self, path):
        '''
        Set the directory below which the traces are stored.

        :param path: base directory path.
        '''
        self._base_path = path

    def add(self, trs):
        '''
        Save traces into the archive, overwriting existing files.

        :param trs: traces, handed over to ``io.save``.
        :returns: the return value of ``io.save``.
        '''
        path_template = op.join(self._base_path, self.template)
        saved = io.save(trs, path_template, overwrite=True)
        return saved

116 

117 

def combine_selections(selection):
    '''
    Merge directly consecutive selection rows which are contiguous in time.

    Each row is a 6-tuple. A row is merged into the preceding one if their
    first four elements are equal and its fifth element (start) equals the
    sixth element (end) of the preceding row. Only neighbouring rows are
    considered, so the input should be sorted accordingly.

    :param selection: iterable of 6-tuples.
    :returns: list of 6-tuples with contiguous rows merged.
    '''
    merged = []
    pending = None
    for row in selection:
        contiguous = (
            pending
            and row[:4] == pending[:4]
            and row[4] == pending[5])

        if contiguous:
            # Extend the pending row up to the end of the current one.
            pending = pending[:5] + (row[5],)
            continue

        if pending:
            merged.append(pending)

        pending = row

    if pending:
        merged.append(pending)

    return merged

134 

135 

def orders_sort_key(order):
    '''
    Sort key for orders: by codes first, then by start time.

    :param order: object with ``codes`` and ``tmin`` attributes.
    :returns: tuple ``(codes, tmin)``.
    '''
    codes, tmin = order.codes, order.tmin
    return (codes, tmin)

138 

139 

def orders_to_selection(orders):
    '''
    Convert orders into a list of selection rows.

    The orders are sorted by codes and start time. Each one is turned into a
    row made of elements 1 to 4 of its codes followed by its ``tmin`` and
    ``tmax``, and neighbouring rows which are contiguous in time are merged.

    :param orders: iterable of objects with ``codes``, ``tmin`` and ``tmax``
        attributes.
    :returns: list of selection rows.
    '''
    ordered = sorted(orders, key=orders_sort_key)
    rows = [
        order.codes[1:5] + (order.tmin, order.tmax)
        for order in ordered]

    return combine_selections(rows)

147 

148 

class ErrorEntry(Object):
    '''
    Record of a single download problem, as created by ``ErrorLog.append``.
    '''

    # Timestamp handed in by the code logging the error.
    time = Timestamp.T()
    # The waveform order for which the problem occurred.
    order = WaveformOrder.T()
    # Short error category, e.g. 'empty result' or 'http error'.
    kind = String.T()
    # Optional additional information about the error.
    details = String.T(optional=True)

154 

155 

class ErrorAggregate(Object):
    '''
    Group of error entries sharing the same kind and details.

    Its string representation is a multi-line summary listing the affected
    codes and time spans.
    '''

    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes_list = List.T(Tuple.T(None, String.T()))
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        dotted = ['.'.join(codes) for codes in self.codes_list]
        if dotted:
            scodes = '\n' + util.ewrap(dotted, indent=' ')
        else:
            scodes = '<none>'

        span_lines = (
            '%s - %s' % (util.time_to_str(ts[0]), util.time_to_str(ts[1]))
            for ts in self.time_spans)

        sspans = '\n' + util.ewrap(span_lines, indent=' ')

        # The details line is only included when details are non-empty.
        if self.details:
            sdetails = ' Details: %s\n' % self.details
        else:
            sdetails = ''

        template = (
            'FDSN "%s": download error summary for "%s" (%i)\n%s '
            'Codes:%s\n Time spans:%s')

        return template % (
            self.site,
            self.kind,
            len(self.entries),
            sdetails,
            scodes,
            sspans)

180 

181 

class ErrorLog(Object):
    '''
    Collects download errors for one site.

    Checkpoints mark positions in the list of entries, so that the errors
    logged since the latest checkpoint can be summarized separately.
    '''

    site = String.T()
    entries = List.T(ErrorEntry.T())
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        '''
        Remember the current number of entries as a checkpoint.
        '''
        position = len(self.entries)
        self.checkpoints.append(position)

    def append(self, time, order, kind, details=''):
        '''
        Log an error.

        :param time: timestamp to store with the entry.
        :param order: the order for which the error occurred.
        :param kind: short error category.
        :param details: additional information (default: empty string).
        '''
        self.entries.append(
            ErrorEntry(time=time, order=order, kind=kind, details=details))

    def iter_aggregates(self):
        '''
        Iterate over the logged errors, grouped by kind and details.

        Yields one :py:class:`ErrorAggregate` per distinct
        ``(kind, details)`` combination, in sorted order of that pair.
        '''
        grouped = defaultdict(list)
        for entry in self.entries:
            grouped[entry.kind, entry.details].append(entry)

        for key in sorted(grouped):
            kind, details = key
            group = grouped[key]
            codes_list = sorted(set(entry.order.codes for entry in group))
            selection = orders_to_selection(entry.order for entry in group)
            # The last two elements of each selection row are its time span.
            time_spans = sorted(set(row[-2:] for row in selection))
            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=group,
                codes_list=codes_list,
                time_spans=time_spans)

    def summarize_recent(self):
        '''
        Summarize the errors logged since the latest checkpoint.

        If no checkpoint has been set, all entries are considered.

        :returns: one-line summary giving the number of errors and their
            kinds, or an empty string if there are no recent errors.
        '''
        if self.checkpoints:
            ioff = self.checkpoints[-1]
        else:
            ioff = 0

        recent = self.entries[ioff:]
        if not recent:
            return ''

        kinds = sorted(set(entry.kind for entry in recent))
        return '%i error%s (%s)' % (
            len(recent), plural_s(recent), '; '.join(kinds))

223 

224 

class FDSNSource(Source):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel` allows
    the latter to download station and waveform data from an FDSN web service
    should the data not already happen to be available locally.
    '''

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             'environment\'s cache directory is used.')

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    def __init__(self, site, query_args=None, **kwargs):
        Source.__init__(self, site=site, query_args=query_args, **kwargs)

        self._constraint = None
        self._hash = self.make_hash()
        self._source_id = 'client:fdsn:%s' % self._hash
        self._error_infos = []

    def describe(self):
        '''
        Get the identifier string of this source.
        '''
        return self._source_id

    def make_hash(self):
        '''
        Compute a hash identifying the configuration of this source.

        The hash is built from the site, whether a token is configured, the
        user name (not the password) and the sorted query arguments. It is
        used to name the cache sub-directory of this source.
        '''
        s = self.site
        s += 'notoken' \
            if (self.auth_token is None and self.auth_token_path is None) \
            else 'token'

        if self.user_credentials is not None:
            s += self.user_credentials[0]
        else:
            s += 'nocred'

        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        '''
        Get the hash computed at construction time (see :py:meth:`make_hash`).
        '''
        return self._hash

    def get_auth_token(self):
        '''
        Get the authentication token.

        A non-empty ``auth_token`` takes precedence. Otherwise the token is
        read from ``auth_token_path`` and decoded as ASCII.

        :raises FileLoadError: if the token file cannot be read.
        :raises Exception: if neither a token nor a token path is available.
        '''
        if self.auth_token:
            return self.auth_token

        elif self.auth_token_path is not None:
            try:
                with open(self.auth_token_path, 'rb') as f:
                    return f.read().decode('ascii')

            except OSError as e:
                raise FileLoadError(
                    'Cannot load auth token file (%s): %s'
                    % (str(e), self.auth_token_path))

        else:
            # This branch is only reached when no usable token source has
            # been configured, so say exactly that.
            raise Exception(
                'FDSNSource: neither auth_token nor auth_token_path is set.')

    def setup(self, squirrel, check=True):
        '''
        Prepare the cache directories and register cached data.

        Creates the cache and waveform directories, loads the stored
        constraint, sets up the waveform archive and adds the cached
        waveforms, channel metadata and responses (where present) as well as
        this source's virtual path to the given squirrel.

        :param squirrel: the squirrel instance this source is attached to.
        :param check: forwarded to ``squirrel.add`` for the waveform path.
        '''
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()
        self._archive = MSeedArchive()
        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive.set_base_path(waveforms_path)

        squirrel.add(
            waveforms_path,
            check=check)

        fn = self._get_channels_path()
        if op.exists(fn):
            squirrel.add(fn)

        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if op.exists(responses_path):
            squirrel.add(responses_path, kinds=['response'])

    def _get_constraint_path(self):
        # File holding the pickled constraint of the last channel query.
        return op.join(self._cache_path, self._hash, 'constraint.pickle')

    def _get_channels_path(self):
        # File holding the cached channel-level metadata.
        return op.join(self._cache_path, self._hash, 'channels.stationxml')

    def _get_responses_path(self, nslc=None):
        # Without argument: the directory holding the cached responses.
        # With a 4-tuple: the response file of that particular channel.
        dirpath = op.join(
            self._cache_path, self._hash, 'responses')

        if nslc is None:
            return dirpath
        else:
            return op.join(
                dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)

    def _get_waveforms_path(self):
        # Shared waveforms live directly below the FDSN cache directory,
        # private ones below the directory named after this source's hash.
        if self.shared_waveforms:
            return op.join(self._cache_path, 'waveforms')
        else:
            return op.join(self._cache_path, self._hash, 'waveforms')

    def _log_meta(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" metadata:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_responses(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" responses:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_info_data(self, *args):
        log_prefix = 'FDSN "%s" waveforms:' % self.site
        logger.info(' '.join((log_prefix,) + args))

    def _str_expires(self, t, now):
        # Human readable expiration status for log messages. ``t`` is the
        # expiration time or ``None`` if the information never expires.
        if t is None:
            return 'expires: never'
        else:
            expire = 'expires' if t > now else 'expired'
            return '%s: %s' % (
                expire,
                util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))

    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Make sure the cached channel metadata covers the given constraint.

        The cached metadata is used if the stored constraint contains the
        requested one and the cache has not expired. Otherwise the site is
        queried with the stored constraint expanded by the requested one and
        the cache file is replaced if the result differs from it. ``OSError``
        during the update is logged, not raised.

        :param squirrel: the squirrel instance to add the channel file to.
        :param constraint: requested constraint; an unrestricted
            ``Constraint()`` is used if ``None``.
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                # Work on a copy, so that the stored constraint is only
                # replaced after a successful update.
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                fn_temp = fn + '.%i.temp' % os.getpid()
                channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    squirrel.get_database().silent_touch(fn)

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        # Re-evaluate, the cache file may just have been rewritten.
        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if op.exists(fn):
            squirrel.add(fn)

    def _do_channel_query(self, constraint):
        '''
        Query channel-level metadata from the site.

        :param constraint: constraint whose ``tmin``/``tmax`` restrict the
            query, unless they are ``None`` or equal to the global limits.
        :returns: the result of ``fdsn.station``, or an empty
            ``FDSNStationXML`` object if the site reports an empty result.
        '''
        extra_args = {}

        if self.site in sites_not_supporting['startbefore']:
            # Fall back to starttime/endtime for sites which do not
            # understand the endafter/startbefore arguments.
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['starttime'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['endtime'] = constraint.tmax

        else:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['endafter'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['startbefore'] = constraint.tmax

        if self.site not in sites_not_supporting['includerestricted']:
            # Only ask for restricted data if some kind of authentication
            # has been configured.
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        if self.query_args is not None:
            extra_args.update(self.query_args)

        self._log_meta('querying...')

        try:
            channel_sx = fdsn.station(
                site=self.site,
                format='text',
                level='channel',
                **extra_args)
            return channel_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def _load_constraint(self):
        # Restore the constraint stored by _dump_constraint(), if any.
        fn = self._get_constraint_path()
        if op.exists(fn):
            # NOTE: unpickling can execute arbitrary code. The file is
            # written by _dump_constraint() only, but the cache directory
            # must be trusted.
            with open(fn, 'rb') as f:
                self._constraint = pickle.load(f)
        else:
            self._constraint = None

    def _dump_constraint(self):
        # Persist the current constraint next to the cached metadata.
        with open(self._get_constraint_path(), 'wb') as f:
            pickle.dump(self._constraint, f, protocol=2)

    def _get_expiration_time(self, path):
        '''
        Get the time at which the cached file at ``path`` expires.

        :returns: ``None`` if ``expires`` is not set (never expires), ``0.0``
            if the file cannot be stat'ed, otherwise the file's modification
            time plus ``expires``.
        '''
        if self.expires is None:
            return None

        try:
            t = os.stat(path)[8]  # index 8 is st_mtime
            return t + self.expires

        except OSError:
            return 0.0

    def _get_channels_expiration_time(self):
        return self._get_expiration_time(self._get_channels_path())

    def update_waveform_promises(self, squirrel, constraint):
        '''
        Register waveform promises for data not yet available locally.

        For each channel known from this source's channel file and matching
        ``constraint.codes``, promises are created for those parts of the
        channel's time span which are not covered by waveforms already known
        to the squirrel. Channels starting in the future are skipped and
        time spans are truncated at the current time.

        :param squirrel: the squirrel instance to query and to add the
            promises to.
        :param constraint: constraint whose ``codes`` select the channels.
        '''
        from ..base import gaps
        now = time.time()
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts(
            'channel', path=cpath, codes=constraint.codes)

        coverages = squirrel.get_coverage(
            'waveform', codes_list=[constraint.codes], return_raw=False)

        codes_to_avail = defaultdict(list)
        for coverage in coverages:
            for tmin, tmax, _ in coverage.iter_spans():
                codes_to_avail[coverage.codes].append((tmin, tmax))

        def sgaps(nut):
            # Yield one clone of the nut per gap in the available data.
            for tmin, tmax in gaps(
                    codes_to_avail[nut.codes], nut.tmin, nut.tmax):

                subnut = clone(nut)
                subnut.tmin = tmin
                subnut.tmax = tmax
                yield subnut

        def wanted(nuts):
            for nut in nuts:
                if nut.tmin < now:
                    if nut.tmax > now:
                        nut.tmax = now

                    for subnut in sgaps(nut):
                        yield subnut

        path = self._source_id
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
            virtual_paths=[path])

    def _get_user_credentials(self):
        # Keyword arguments carrying the configured authentication
        # information, as passed on to fdsn.dataselect().
        d = {}
        if self.user_credentials is not None:
            d['user'], d['passwd'] = self.user_credentials

        if self.auth_token is not None or self.auth_token_path is not None:
            d['token'] = self.get_auth_token()

        return d

    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):
        '''
        Download the waveforms for the given orders.

        The orders are processed in batches of 20. The traces returned for a
        batch are validated against and cut to the individual orders and
        saved to the waveform archive. A summary of all errors is logged as
        warnings at the end.

        :param orders: list of orders; it is sorted in place.
        :param success: called with each order for which data was saved.
        :param batch_add: called once per batch with the list of saved file
            paths, if there are any.
        :param error_permanent: called with each order yielding no usable
            data or belonging to a batch with an empty result.
        :param error_temporary: called with each order of a batch which
            failed with an HTTP error.
        '''
        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        while i < len(orders):
            orders_now = orders[i:i+neach]
            selection_now = orders_to_selection(orders_now)

            nsuccess = 0
            elog.append_checkpoint()
            self._log_info_data(
                'downloading, %s' % order_summary(orders_now))

            all_paths = []
            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    data = fdsn.dataselect(
                        site=self.site, selection=selection_now,
                        **self._get_user_credentials())

                    now = time.time()

                    # Spool the response into a temporary file, so that it
                    # can be read with io.load().
                    path = op.join(tmpdir, 'tmp.mseed')
                    with open(path, 'wb') as f:
                        while True:
                            buf = data.read(1024)
                            if not buf:
                                break
                            f.write(buf)

                    trs = io.load(path)

                    by_nslc = defaultdict(list)
                    for tr in trs:
                        by_nslc[tr.nslc_id].append(tr)

                    for order in orders_now:
                        trs_order = []
                        err_this = None
                        for tr in by_nslc[order.codes[1:5]]:
                            try:
                                order.validate(tr)
                                trs_order.append(tr.chop(
                                    order.tmin, order.tmax, inplace=False))

                            except trace.NoData:
                                err_this = (
                                    'empty result', 'empty sub-interval')

                            except InvalidWaveform as e:
                                err_this = ('invalid waveform', str(e))

                        if len(trs_order) == 0:
                            if err_this is None:
                                err_this = ('empty result', '')

                            elog.append(now, order, *err_this)
                            error_permanent(order)
                        else:
                            # Anything but exactly one trace is logged as a
                            # partial result, but still counts as success.
                            if len(trs_order) != 1:
                                if err_this:
                                    elog.append(
                                        now, order,
                                        'partial result, %s' % err_this[0],
                                        err_this[1])
                                else:
                                    elog.append(now, order, 'partial result')

                            paths = self._archive.add(trs_order)
                            all_paths.extend(paths)

                            nsuccess += 1
                            success(order)

                except fdsn.EmptyResult:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'empty result')
                        error_permanent(order)

                except util.HTTPError as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'http error', str(e))
                        error_temporary(order)

            emessage = elog.summarize_recent()
            self._log_info_data(
                '%i download%s successful' % (nsuccess, plural_s(nsuccess))
                + (', %s' % emessage if emessage else ''))

            if all_paths:
                batch_add(all_paths)

            i += neach
            # The last batch may be incomplete: do not report more items
            # than there are orders.
            task.update(min(i, len(orders)))

        for agg in elog.iter_aggregates():
            logger.warning(str(agg))

        task.done()

    def _do_response_query(self, selection):
        '''
        Query response-level metadata from the site.

        :param selection: selection rows passed on to ``fdsn.station``.
        :returns: the result of ``fdsn.station``, or an empty
            ``FDSNStationXML`` object if the site reports an empty result.
        '''
        extra_args = {}

        if self.site not in sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        self._log_responses('querying...')

        try:
            response_sx = fdsn.station(
                site=self.site,
                level='response',
                selection=selection,
                **extra_args)

            return response_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def update_response_inventory(self, squirrel, constraint):
        '''
        Download missing or expired instrument responses.

        For every channel known from this source's channel file, the cached
        response file is used if it exists and has not expired. The
        remaining channels are queried in batches of 100 and one file per
        channel is written. Channels missing in the query result get an
        empty placeholder file. Failures are logged per channel and do not
        abort the update.

        :param squirrel: the squirrel instance to query and to add the
            response files to.
        :param constraint: currently not evaluated by this method.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts('channel', path=cpath)

        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes_tuple[1:5]
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if op.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        neach = 100
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                # A failing batch must not prevent the other batches from
                # being processed.
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)
                    response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            if 'failed' in k:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(status[k])),
                target=log_target)

        squirrel.add(fns, kinds=['response'])

776 

777 

# Names exported by ``from <this module> import *``.
__all__ = [
    'FDSNSource',
]