1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import time
9import os
10import copy
11import logging
12import tempfile
13from collections import defaultdict
14try:
15 import cPickle as pickle
16except ImportError:
17 import pickle
18import os.path as op
19from .base import Source, Constraint
20from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \
21 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \
22 codes_to_str_abbreviated
23from ..database import ExecuteGet1Error
24from pyrocko.client import fdsn
26from pyrocko import util, trace, io
27from pyrocko.io.io_common import FileLoadError
28from pyrocko.io import stationxml
29from pyrocko.progress import progress
31from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \
32 Duration, Bool
guts_prefix = 'squirrel'

# Override the global timeout of the FDSN client module
# (presumably in seconds -- see pyrocko.client.fdsn).
fdsn.g_timeout = 60.

logger = logging.getLogger('psq.client.fdsn')

# Query parameters known to be rejected by some FDSN sites, mapped to the
# list of affected site names. Consulted when building station queries.
sites_not_supporting = dict(
    startbefore=['geonet'],
    includerestricted=['geonet'])
def make_task(*args):
    '''
    Create a progress task via ``progress.task``, reporting to this module's
    logger.

    All positional arguments are forwarded unchanged.
    '''
    task_kwargs = dict(logger=logger)
    return progress.task(*args, **task_kwargs)
def plural_s(x):
    '''
    Get the English plural suffix for a count.

    :param x: a count (``int``) or a sized collection whose length is used
        as the count
    :returns: ``''`` if the count is exactly one, ``'s'`` otherwise
    '''
    count = x if isinstance(x, int) else len(x)
    return '' if count == 1 else 's'
def diff(fn_a, fn_b):
    '''
    Check whether two files differ in content.

    Files whose size cannot be determined (e.g. because one of them does not
    exist) are considered to differ.

    :returns: ``True`` if the files differ, ``False`` if their contents are
        identical
    '''
    try:
        sizes_differ = os.stat(fn_a).st_size != os.stat(fn_b).st_size
    except OSError:
        return True

    if sizes_differ:
        return True

    with open(fn_a, 'rb') as fa, open(fn_b, 'rb') as fb:
        while True:
            chunk_a = fa.read(1024)
            chunk_b = fb.read(1024)
            if chunk_a != chunk_b:
                return True

            # End of data reached without finding a difference.
            if not chunk_a or not chunk_b:
                return False
def move_or_keep(fn_temp, fn):
    '''
    Move a freshly written temporary file into place unless it is identical
    to the existing one.

    If ``fn`` does not exist, ``fn_temp`` is moved to ``fn``. If ``fn``
    exists and differs from ``fn_temp`` (as determined by :py:func:`diff`),
    it is replaced by ``fn_temp``. If both are identical, ``fn_temp`` is
    deleted and ``fn`` is left untouched.

    :param fn_temp: path of the temporary file (consumed in all cases)
    :param fn: target path
    :returns: status string, ``'new'``, ``'updated'`` or
        ``'upstream unchanged'``
    '''
    if not op.exists(fn):
        os.replace(fn_temp, fn)
        return 'new'

    if diff(fn, fn_temp):
        # os.replace, unlike os.rename, also overwrites an existing target on
        # Windows (and does so atomically on POSIX).
        os.replace(fn_temp, fn)
        return 'updated'

    os.unlink(fn_temp)
    return 'upstream unchanged'
class Archive(Object):
    '''
    Abstract base class of local waveform archives.

    Sub-classes must override :py:meth:`add`.
    '''

    def add(self):
        '''
        Store data in the archive (to be implemented by sub-classes).
        '''
        raise NotImplementedError()
class MSeedArchive(Archive):
    '''
    Archive storing traces as MiniSEED files below a base directory.

    File names are generated from :py:attr:`template`, relative to the
    directory given with :py:meth:`set_base_path`.
    '''

    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        '_%(tmin_us)s_%(tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)
        # Has to be set with set_base_path() before add() is used.
        self._base_path = None

    def set_base_path(self, path):
        '''
        Set the directory below which files are written.
        '''
        self._base_path = path

    def add(self, trs):
        '''
        Save traces into the archive, overwriting existing files.

        :param trs: traces to be saved
        :returns: the return value of :py:func:`pyrocko.io.save`, used by the
            FDSN source as an iterable of written file paths
        '''
        path_template = op.join(self._base_path, self.template)
        return io.save(trs, path_template, overwrite=True)
def combine_selections(selection):
    '''
    Merge directly adjacent time spans of the same channel.

    :param selection: sequence of ``(network, station, location, channel,
        tmin, tmax)`` tuples
    :returns: list of tuples of the same form, where each run of consecutive
        entries with identical codes, whose ``tmin`` equals the preceding
        ``tmax``, has been joined into a single entry
    '''
    combined = []
    current = None
    for row in selection:
        can_merge = (
            current
            and row[:4] == current[:4]
            and row[4] == current[5])

        if can_merge:
            # Extend the running span to the end of this row.
            current = current[:5] + (row[5],)
            continue

        if current:
            combined.append(current)

        current = row

    if current:
        combined.append(current)

    return combined
def orders_sort_key(order):
    '''
    Sort key ordering waveform orders by codes first, then by start time.
    '''
    codes, tmin = order.codes, order.tmin
    return codes, tmin
def orders_to_selection(orders):
    '''
    Convert waveform orders into a compact selection for FDSN dataselect.

    :param orders: iterable of waveform orders
    :returns: list of ``(network, station, location, channel, tmin, tmax)``
        tuples, ordered by codes and start time, with adjacent spans merged
        by :py:func:`combine_selections`
    '''
    rows = [
        order.codes[1:5] + (order.tmin, order.tmax)
        for order in sorted(orders, key=orders_sort_key)]

    return combine_selections(rows)
class ErrorEntry(Object):
    '''
    Record of a single failed or incomplete waveform order.
    '''
    time = Timestamp.T()  # timestamp supplied when the error was logged
    order = WaveformOrder.T()  # the waveform order concerned
    kind = String.T()  # short error category, e.g. 'empty result'
    details = String.T(optional=True)  # free-form additional information
class ErrorAggregate(Object):
    '''
    Summary of all download errors of one kind (and details) for one site.
    '''

    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes_list = List.T(Tuple.T(None, String.T()))
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        if self.codes_list:
            codes_str = '\n' + util.ewrap(
                ['.'.join(codes) for codes in self.codes_list],
                indent=' ')
        else:
            codes_str = '<none>'

        spans_str = '\n' + util.ewrap(
            ('%s - %s' % (
                util.time_to_str(span[0]), util.time_to_str(span[1]))
             for span in self.time_spans),
            indent=' ')

        details_str = ' Details: %s\n' % self.details if self.details else ''

        template = (
            'FDSN "%s": download error summary for "%s" (%i)\n%s '
            'Codes:%s\n Time spans:%s')

        return template % (
            self.site, self.kind, len(self.entries),
            details_str, codes_str, spans_str)
class ErrorLog(Object):
    '''
    Collection of download errors for one site.

    Checkpoints allow summarizing only the most recently added errors.
    '''

    site = String.T()
    entries = List.T(ErrorEntry.T())
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        '''
        Mark the current end of the entry list.

        :py:meth:`summarize_recent` only considers entries added after the
        last checkpoint.
        '''
        self.checkpoints.append(len(self.entries))

    def append(self, time, order, kind, details=''):
        '''
        Add an error entry.
        '''
        self.entries.append(
            ErrorEntry(time=time, order=order, kind=kind, details=details))

    def iter_aggregates(self):
        '''
        Group entries by ``(kind, details)`` and yield one
        :py:class:`ErrorAggregate` per group, in sorted group order.
        '''
        groups = defaultdict(list)
        for entry in self.entries:
            groups[entry.kind, entry.details].append(entry)

        for kind, details in sorted(groups):
            group = groups[kind, details]
            orders = [entry.order for entry in group]
            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=group,
                codes_list=sorted({order.codes for order in orders}),
                time_spans=sorted({
                    row[-2:] for row in orders_to_selection(orders)}))

    def summarize_recent(self):
        '''
        Get a one-line summary of the errors added since the last checkpoint.

        :returns: e.g. ``'3 errors (empty result; http error)'``, or ``''``
            if there are no recent errors
        '''
        start = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[start:]
        if not recent:
            return ''

        kinds = sorted({entry.kind for entry in recent})
        return '%i error%s (%s)' % (
            len(recent), plural_s(recent), '; '.join(kinds))
class FDSNSource(Source):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel` allows
    the latter to download station and waveform data from an FDSN web service
    should the data not already happen to be available locally.
    '''

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    # NOTE(review): despite the help text, query_args are currently only
    # applied to the channel query (_do_channel_query), not to response or
    # waveform queries.
    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             'environment\'s cache directory is used.')

    shared_waveforms = Bool.T(
        default=True,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    def __init__(self, site, query_args=None, **kwargs):
        Source.__init__(self, site=site, query_args=query_args, **kwargs)

        self._constraint = None
        self._hash = self.make_hash()
        self._source_id = 'client:fdsn:%s' % self._hash
        self._error_infos = []

    def describe(self):
        '''
        Get the identifier of this source (``'client:fdsn:<hash>'``).
        '''
        return self._source_id

    def make_hash(self):
        '''
        Compute the hash identifying this source's cache directory.

        The hash depends on the site, on whether a token is configured, on the
        user name (if any) and on the query arguments. It is part of all
        per-source cache paths, so the recipe must stay stable to keep
        existing caches usable.
        '''
        s = self.site
        s += 'notoken' \
            if (self.auth_token is None and self.auth_token_path is None) \
            else 'token'

        if self.user_credentials is not None:
            s += self.user_credentials[0]
        else:
            s += 'nocred'

        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        return self._hash

    def get_auth_token(self):
        '''
        Get the authentication token.

        Returns :py:attr:`auth_token` if it is set and non-empty, otherwise
        the content of the file at :py:attr:`auth_token_path`.

        :raises FileLoadError: if the token file cannot be read
        :raises Exception: if no token is configured
        '''
        if self.auth_token:
            return self.auth_token

        elif self.auth_token_path is not None:
            try:
                with open(self.auth_token_path, 'rb') as f:
                    return f.read().decode('ascii')

            except OSError as e:
                raise FileLoadError(
                    'Cannot load auth token file (%s): %s'
                    % (str(e), self.auth_token_path))

        else:
            # Reached when no (non-empty) token is configured at all.
            raise Exception(
                'FDSNSource: neither auth_token nor auth_token_path is set.')

    def setup(self, squirrel, check=True):
        '''
        Prepare the cache directories and register already cached waveforms,
        channels and responses with ``squirrel``.
        '''
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()
        self._archive = MSeedArchive()
        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive.set_base_path(waveforms_path)

        squirrel.add(
            self._get_waveforms_path(),
            check=check)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if os.path.exists(responses_path):
            squirrel.add(responses_path, kinds=['response'])

    def _get_constraint_path(self):
        return op.join(self._cache_path, self._hash, 'constraint.pickle')

    def _get_channels_path(self):
        return op.join(self._cache_path, self._hash, 'channels.stationxml')

    def _get_responses_path(self, nslc=None):
        '''
        Get the response cache directory or, if ``nslc`` is given, the path
        of the response file for that ``(network, station, location,
        channel)`` tuple.
        '''
        dirpath = op.join(
            self._cache_path, self._hash, 'responses')

        if nslc is None:
            return dirpath
        else:
            return op.join(
                dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)

    def _get_waveforms_path(self):
        # Shared waveforms live outside of the per-configuration hash
        # directory, so that other FDSN sources can use them too.
        if self.shared_waveforms:
            return op.join(self._cache_path, 'waveforms')
        else:
            return op.join(self._cache_path, self._hash, 'waveforms')

    def _log_meta(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" metadata:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_responses(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" responses:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_info_data(self, *args):
        log_prefix = 'FDSN "%s" waveforms:' % self.site
        logger.info(' '.join((log_prefix,) + args))

    def _str_expires(self, t, now):
        '''
        Format expiration time ``t`` for log messages, relative to ``now``.
        '''
        if t is None:
            return 'expires: never'
        else:
            expire = 'expires' if t > now else 'expired'
            return '%s: %s' % (
                expire,
                util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))

    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Update the cached channel inventory if needed and add it to
        ``squirrel``.

        The cache is reused if the stored constraint contains ``constraint``
        and the cache has not expired. Otherwise the site is queried with the
        stored constraint expanded by ``constraint`` (or with ``constraint``
        alone if nothing is stored), and the channel file is replaced only if
        its content changed. Errors of type ``OSError`` are logged instead of
        raised.
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                fn_temp = fn + '.%i.temp' % os.getpid()
                channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    squirrel.get_database().silent_touch(fn)

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

    def _get_includerestricted_args(self):
        '''
        Get the ``includerestricted`` station query argument.

        Restricted channels are requested whenever any form of authentication
        is configured. Returns an empty dict for sites known not to support
        the parameter.
        '''
        if self.site in sites_not_supporting['includerestricted']:
            return {}

        return dict(includerestricted=(
            self.user_credentials is not None
            or self.auth_token is not None
            or self.auth_token_path is not None))

    def _do_channel_query(self, constraint):
        '''
        Query channel-level station metadata (text format) for the time span
        of ``constraint``.

        :returns: the query result, or an empty dummy
            :py:class:`~pyrocko.io.stationxml.FDSNStationXML` if the site
            reports an empty result
        '''
        extra_args = {}

        if self.site in sites_not_supporting['startbefore']:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['starttime'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['endtime'] = constraint.tmax

        else:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['endafter'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['startbefore'] = constraint.tmax

        extra_args.update(self._get_includerestricted_args())

        if self.query_args is not None:
            extra_args.update(self.query_args)

        self._log_meta('querying...')

        try:
            channel_sx = fdsn.station(
                site=self.site,
                format='text',
                level='channel',
                **extra_args)
            return channel_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def _load_constraint(self):
        '''
        Load the constraint of the cached channel query, if any.

        A cache file which cannot be read or unpickled (e.g. truncated by an
        interrupted write) is ignored. The constraint is then ``None``, which
        forces a fresh channel query in :py:meth:`update_channel_inventory`.
        '''
        self._constraint = None
        fn = self._get_constraint_path()
        if not op.exists(fn):
            return

        try:
            # Local cache file written by _dump_constraint; pickle is only
            # acceptable here because the file is not external input.
            with open(fn, 'rb') as f:
                self._constraint = pickle.load(f)

        except (OSError, EOFError, pickle.UnpicklingError) as e:
            logger.warning(
                'Ignoring unreadable FDSN constraint cache file "%s": %s'
                % (fn, str(e)))

    def _dump_constraint(self):
        '''
        Save the current constraint next to the cached channel file.

        The pickle is written to a temporary file first and then moved into
        place, so that an interrupted write cannot leave a truncated file.
        '''
        fn = self._get_constraint_path()
        fn_temp = fn + '.%i.temp' % os.getpid()
        with open(fn_temp, 'wb') as f:
            pickle.dump(self._constraint, f, protocol=2)

        os.replace(fn_temp, fn)

    def _get_expiration_time(self, path):
        '''
        Get the time at which the cached file at ``path`` expires.

        :returns: ``None`` if :py:attr:`expires` is not set; ``0.0`` if the
            file cannot be stat'ed (i.e. it counts as expired); otherwise the
            file's modification time plus :py:attr:`expires`
        '''
        if self.expires is None:
            return None

        try:
            return os.stat(path).st_mtime + self.expires

        except OSError:
            return 0.0

    def _get_channels_expiration_time(self):
        return self._get_expiration_time(self._get_channels_path())

    def update_waveform_promises(self, squirrel):
        '''
        Register a waveform promise for every channel of the cached channel
        inventory, as virtual nuts under this source's identifier.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts('channel', path=cpath)

        path = self._source_id
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                **nut.waveform_promise_kwargs) for nut in nuts),
            virtual_paths=[path])

    def _get_user_credentials(self):
        '''
        Get authentication keyword arguments for ``fdsn.dataselect``.

        :returns: dict with ``user`` and ``passwd`` if
            :py:attr:`user_credentials` is set, and ``token`` if a token is
            configured; empty otherwise
        '''
        d = {}
        if self.user_credentials is not None:
            d['user'], d['passwd'] = self.user_credentials

        if self.auth_token is not None or self.auth_token_path is not None:
            d['token'] = self.get_auth_token()

        return d

    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):

        '''
        Download waveforms for the given orders, in batches of 20.

        ``orders`` is sorted in place. Each order is reported through one of
        the callbacks ``success`` (data saved to the local archive),
        ``error_permanent`` or ``error_temporary``, unless an unexpected
        exception aborts the download. Paths of newly saved files are passed
        to ``batch_add`` once per batch. Errors are summarized in the log.
        '''

        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        try:
            while i < len(orders):
                orders_now = orders[i:i+neach]
                selection_now = orders_to_selection(orders_now)

                nsuccess = 0
                elog.append_checkpoint()
                self._log_info_data(
                    'downloading, %s' % order_summary(orders_now))

                all_paths = []
                with tempfile.TemporaryDirectory() as tmpdir:
                    try:
                        data = fdsn.dataselect(
                            site=self.site, selection=selection_now,
                            **self._get_user_credentials())

                        now = time.time()

                        path = op.join(tmpdir, 'tmp.mseed')
                        with open(path, 'wb') as f:
                            while True:
                                buf = data.read(1024)
                                if not buf:
                                    break
                                f.write(buf)

                        trs = io.load(path)

                    except fdsn.EmptyResult:
                        now = time.time()
                        for order in orders_now:
                            elog.append(now, order, 'empty result')
                            error_permanent(order)

                    except util.HTTPError as e:
                        now = time.time()
                        for order in orders_now:
                            elog.append(now, order, 'http error', str(e))
                            error_temporary(order)

                    except FileLoadError as e:
                        # Unparseable response (e.g. a truncated transfer):
                        # report the batch for retry instead of aborting the
                        # whole download.
                        now = time.time()
                        for order in orders_now:
                            elog.append(now, order, 'invalid data', str(e))
                            error_temporary(order)

                    else:
                        nsuccess, all_paths = self._store_batch(
                            orders_now, trs, now, elog, success,
                            error_permanent)

                emessage = elog.summarize_recent()
                self._log_info_data(
                    '%i download%s successful'
                    % (nsuccess, plural_s(nsuccess))
                    + (', %s' % emessage if emessage else ''))

                if all_paths:
                    batch_add(all_paths)

                i += neach
                task.update(min(i, len(orders)))

            for agg in elog.iter_aggregates():
                logger.warning(str(agg))

        finally:
            task.done()

    def _store_batch(
            self, orders_now, trs, now, elog, success, error_permanent):

        '''
        Cut downloaded traces to their orders, save them to the archive and
        report the outcome of each order.

        :returns: ``(nsuccess, paths)``: number of successful orders and paths
            of the files written to the archive
        '''
        by_nslc = defaultdict(list)
        for tr in trs:
            by_nslc[tr.nslc_id].append(tr)

        nsuccess = 0
        all_paths = []
        for order in orders_now:
            trs_order = []
            err_this = None
            for tr in by_nslc[order.codes[1:5]]:
                try:
                    order.validate(tr)
                    trs_order.append(tr.chop(
                        order.tmin, order.tmax, inplace=False))

                except trace.NoData:
                    err_this = (
                        'empty result', 'empty sub-interval')

                except InvalidWaveform as e:
                    err_this = ('invalid waveform', str(e))

            if len(trs_order) == 0:
                if err_this is None:
                    err_this = ('empty result', '')

                elog.append(now, order, *err_this)
                error_permanent(order)
            else:
                # More than one piece (e.g. gaps) counts as partial result.
                if len(trs_order) != 1:
                    if err_this:
                        elog.append(
                            now, order,
                            'partial result, %s' % err_this[0],
                            err_this[1])
                    else:
                        elog.append(now, order, 'partial result')

                paths = self._archive.add(trs_order)
                all_paths.extend(paths)

                nsuccess += 1
                success(order)

        return nsuccess, all_paths

    def _do_response_query(self, selection):
        '''
        Query response-level station metadata for ``selection``.

        :returns: the query result, or an empty dummy
            :py:class:`~pyrocko.io.stationxml.FDSNStationXML` if the site
            reports an empty result
        '''
        extra_args = self._get_includerestricted_args()

        self._log_responses('querying...')

        try:
            response_sx = fdsn.station(
                site=self.site,
                level='response',
                selection=selection,
                **extra_args)

            return response_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def update_response_inventory(self, squirrel, constraint):
        '''
        Download and cache instrument responses for the cached channels.

        Each distinct ``(network, station, location, channel)`` of the cached
        channel inventory has one response file. A file is reused if it
        exists and has not expired. All other channels are queried in batches
        of 100 and written to the response cache. Channels missing from the
        reply get an empty dummy document.

        A summary of the outcome is logged, and the written files are added
        to ``squirrel``. ``constraint`` is currently not used.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts('channel', path=cpath)

        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes_tuple[1:5]
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if os.path.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        neach = 100
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)
                    response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            # Presumably the file is not yet known to the
                            # database; it is added below anyway.
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            if k.find('failed') != -1:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(status[k])),
                target=log_target)

        squirrel.add(fns, kinds=['response'])
# Public API of this module.
__all__ = ['FDSNSource']