1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import time
9import os
10import copy
11import logging
12import tempfile
13from collections import defaultdict
14try:
15 import cPickle as pickle
16except ImportError:
17 import pickle
18import os.path as op
19from .base import Source, Constraint
20from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \
21 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \
22 codes_to_str_abbreviated
23from ..database import ExecuteGet1Error
24from pyrocko.client import fdsn
26from pyrocko import util, trace, io
27from pyrocko.io.io_common import FileLoadError
28from pyrocko.io import stationxml
29from pyrocko.progress import progress
31from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \
32 Duration, Bool, clone
# Presumably the guts type-name prefix for the Object classes defined in this
# module (guts convention) -- confirm.
guts_prefix = 'squirrel'

# Overrides the module-level setting ``g_timeout`` of pyrocko.client.fdsn for
# the whole process (presumably the request timeout in seconds -- confirm).
fdsn.g_timeout = 60.0

logger = logging.getLogger('psq.client.fdsn')

# FDSN site aliases which are known not to support a given station query
# parameter. Consulted when building channel and response queries.
sites_not_supporting = dict(
    startbefore=['geonet'],
    includerestricted=['geonet'])
def make_task(*args):
    '''
    Create a progress task reporting through this module's logger.

    All positional arguments are passed on unchanged to
    ``progress.task``.
    '''
    task_kwargs = {'logger': logger}
    return progress.task(*args, **task_kwargs)
def plural_s(x):
    '''
    Get the English plural suffix appropriate for a count.

    :param x: the count (``int``) or a sized collection whose length is used
        as the count.
    :returns: ``''`` if the count is exactly one, ``'s'`` otherwise.
    '''
    count = x if isinstance(x, int) else len(x)
    if count == 1:
        return ''

    return 's'
def diff(fn_a, fn_b):
    '''
    Check whether two files differ in size or content.

    :param fn_a: path of the first file
    :param fn_b: path of the second file
    :returns: ``True`` if the files differ or if either of them cannot be
        stat'ed, ``False`` if their contents are identical.
    '''
    try:
        size_a = os.stat(fn_a).st_size
        size_b = os.stat(fn_b).st_size
    except OSError:
        # A missing/inaccessible file counts as "different".
        return True

    if size_a != size_b:
        return True

    # Same size: compare chunk by chunk until a mismatch or end of file.
    with open(fn_a, 'rb') as fa, open(fn_b, 'rb') as fb:
        while True:
            chunk_a = fa.read(1024)
            chunk_b = fb.read(1024)
            if chunk_a != chunk_b:
                return True

            if not chunk_a or not chunk_b:
                return False
def move_or_keep(fn_temp, fn):
    '''
    Move a freshly written temporary file into place, unless unchanged.

    If ``fn`` already exists with the same content as ``fn_temp``, the
    temporary file is deleted and ``fn`` is left untouched. Otherwise
    ``fn_temp`` is moved to ``fn``, replacing any existing file.

    :param fn_temp: path of the temporary file holding the new content
    :param fn: final destination path
    :returns: ``'new'`` if ``fn`` did not exist, ``'updated'`` if it was
        replaced, ``'upstream unchanged'`` if the content was identical.
    '''
    if not op.exists(fn):
        os.replace(fn_temp, fn)
        return 'new'

    if diff(fn, fn_temp):
        # os.replace (unlike os.rename) also overwrites an existing target on
        # Windows; on POSIX the rename is atomic.
        os.replace(fn_temp, fn)
        return 'updated'

    os.unlink(fn_temp)
    return 'upstream unchanged'
class Archive(Object):
    '''
    Base class for local storage of downloaded waveform data.

    Subclasses must override :py:meth:`add`.
    '''

    def add(self):
        # Abstract: concrete archives define signature and storage behaviour.
        raise NotImplementedError
class MSeedArchive(Archive):
    '''
    Archive storing traces as MiniSEED files below a base directory.

    File names are generated from :py:gattr:`template`, interpreted relative
    to the directory given with :py:meth:`set_base_path`.
    '''

    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        + '_%(tmin_us)s_%(tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)
        # Runtime-only state (not a guts property); must be set with
        # set_base_path() before add() is called.
        self._base_path = None

    def set_base_path(self, path):
        '''Set the directory below which :py:meth:`add` writes files.'''
        self._base_path = path

    def add(self, trs):
        '''
        Save traces into the archive, overwriting existing files.

        :param trs: traces to be stored
        :returns: the return value of ``io.save`` (the caller in this module
            extends a list of paths with it)
        '''
        path_template = op.join(self._base_path, self.template)
        return io.save(trs, path_template, overwrite=True)
def combine_selections(selection):
    '''
    Merge consecutive, contiguous time windows of the same channel.

    :param selection: sequence of ``(net, sta, loc, cha, tmin, tmax)``
        tuples. Only directly consecutive entries are considered.
    :returns: list of tuples, where each run of entries with equal codes and
        ``tmin`` exactly equal to the preceding entry's ``tmax`` is collapsed
        into one entry spanning the whole run.
    '''
    merged = []
    current = None
    for entry in selection:
        contiguous = (
            current
            and entry[:4] == current[:4]
            and entry[4] == current[5])

        if contiguous:
            # Extend the current window to the end of this entry.
            current = current[:5] + (entry[5],)
            continue

        if current:
            merged.append(current)

        current = entry

    if current:
        merged.append(current)

    return merged
def orders_sort_key(order):
    '''Sort key ordering waveform orders by codes, then by start time.'''
    codes, tmin = order.codes, order.tmin
    return codes, tmin
def orders_to_selection(orders):
    '''
    Convert waveform orders into a compact dataselect selection list.

    Orders are sorted by codes and start time, converted into
    ``codes[1:5] + (tmin, tmax)`` tuples, and contiguous windows of the same
    channel are merged with :py:func:`combine_selections`.
    '''
    rows = [
        order.codes[1:5] + (order.tmin, order.tmax)
        for order in sorted(orders, key=orders_sort_key)]

    return combine_selections(rows)
class ErrorEntry(Object):
    '''
    Record of a single failed or partially failed waveform order.
    '''

    # Time at which the error was registered (callers in this module pass
    # time.time()).
    time = Timestamp.T()
    # The waveform order concerned.
    order = WaveformOrder.T()
    # Short error category, e.g. 'empty result' or 'http error'.
    kind = String.T()
    # Optional free-form description of the error.
    details = String.T(optional=True)
class ErrorAggregate(Object):
    '''
    Summary of all error entries sharing the same kind and details.
    '''

    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes_list = List.T(Tuple.T(None, String.T()))
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        if self.codes_list:
            scodes = '\n' + util.ewrap(
                ['.'.join(codes) for codes in self.codes_list], indent=' ')
        else:
            scodes = '<none>'

        span_strings = (
            '%s - %s' % (util.time_to_str(tmin), util.time_to_str(tmax))
            for (tmin, tmax) in self.time_spans)

        sspans = '\n' + util.ewrap(span_strings, indent=' ')

        sdetails = ' Details: %s\n' % self.details if self.details else ''

        template = (
            'FDSN "%s": download error summary for "%s" (%i)\n%s '
            'Codes:%s\n Time spans:%s')

        return template % (
            self.site, self.kind, len(self.entries), sdetails, scodes, sspans)
class ErrorLog(Object):
    '''
    Collects waveform download errors for one FDSN site.

    Checkpoints record positions in :py:gattr:`entries`, so that the errors
    registered since the most recent checkpoint can be summarized separately.
    '''

    site = String.T()
    entries = List.T(ErrorEntry.T())
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        '''Mark the current end of the entry list.'''
        self.checkpoints.append(len(self.entries))

    def append(self, time, order, kind, details=''):
        '''Register an error of category ``kind`` for ``order``.'''
        self.entries.append(
            ErrorEntry(time=time, order=order, kind=kind, details=details))

    def iter_aggregates(self):
        '''
        Yield one :py:class:`ErrorAggregate` per distinct (kind, details).

        Aggregates are yielded in sorted order of (kind, details).
        '''
        groups = defaultdict(list)
        for entry in self.entries:
            groups[entry.kind, entry.details].append(entry)

        for key in sorted(groups):
            kind, details = key
            group = groups[key]
            orders = [entry.order for entry in group]
            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=group,
                codes_list=sorted({order.codes for order in orders}),
                time_spans=sorted({
                    row[-2:] for row in orders_to_selection(orders)}))

    def summarize_recent(self):
        '''
        One-line summary of the errors since the last checkpoint.

        :returns: e.g. ``'2 errors (empty result; http error)'``, or an empty
            string if nothing was registered since the last checkpoint.
        '''
        start = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[start:]
        if not recent:
            return ''

        kinds = sorted({entry.kind for entry in recent})
        return '%i error%s (%s)' % (
            len(recent), plural_s(recent), '; '.join(kinds))
class FDSNSource(Source):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel` allows
    the latter to download station and waveform data from an FDSN web service
    should the data not already happen to be available locally.
    '''

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             'environment\'s cache directory is used.')

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    def __init__(self, site, query_args=None, **kwargs):
        Source.__init__(self, site=site, query_args=query_args, **kwargs)

        self._constraint = None
        self._hash = self.make_hash()
        self._source_id = 'client:fdsn:%s' % self._hash
        self._error_infos = []

    def describe(self):
        '''Get the identifier of this source (``'client:fdsn:<hash>'``).'''
        return self._source_id

    def make_hash(self):
        '''
        Compute a hash identifying this source's configuration.

        The hash depends on the site, on whether token authentication is
        configured, on the user name (if any), and on the common query
        arguments. It is part of all cache paths of this source, so that
        differently configured sources do not share cached metadata.
        '''
        s = self.site
        s += 'notoken' \
            if (self.auth_token is None and self.auth_token_path is None) \
            else 'token'

        if self.user_credentials is not None:
            # Only the user name enters the hash, never the password.
            s += self.user_credentials[0]
        else:
            s += 'nocred'

        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        return self._hash

    def get_auth_token(self):
        '''
        Get the authentication token to be presented to the FDSN server.

        :py:gattr:`auth_token` is used if it is set and non-empty. Otherwise
        the token is read from the file given by :py:gattr:`auth_token_path`.

        :raises: :py:exc:`FileLoadError` if the token file cannot be read,
            ``Exception`` if no token is configured.
        '''
        if self.auth_token:
            return self.auth_token

        elif self.auth_token_path is not None:
            try:
                with open(self.auth_token_path, 'rb') as f:
                    return f.read().decode('ascii')

            except OSError as e:
                raise FileLoadError(
                    'Cannot load auth token file (%s): %s'
                    % (self.auth_token_path, str(e)))

        else:
            raise Exception(
                'FDSNSource: neither auth_token nor auth_token_path is set.')

    def setup(self, squirrel, check=True):
        '''
        Prepare cache directories and register cached data with ``squirrel``.

        Adds the local waveform archive, the cached channel inventory and the
        cached responses (where present) to ``squirrel``, and registers this
        source's virtual path.
        '''
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()
        self._archive = MSeedArchive()
        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive.set_base_path(waveforms_path)

        squirrel.add(
            waveforms_path,
            check=check)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if os.path.exists(responses_path):
            squirrel.add(responses_path, kinds=['response'])

    def _get_constraint_path(self):
        return op.join(self._cache_path, self._hash, 'constraint.pickle')

    def _get_channels_path(self):
        return op.join(self._cache_path, self._hash, 'channels.stationxml')

    def _get_responses_path(self, nslc=None):
        # Directory of cached responses or, given nslc, the file of one
        # channel's response.
        dirpath = op.join(
            self._cache_path, self._hash, 'responses')

        if nslc is None:
            return dirpath
        else:
            return op.join(
                dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)

    def _get_waveforms_path(self):
        if self.shared_waveforms:
            return op.join(self._cache_path, 'waveforms')
        else:
            return op.join(self._cache_path, self._hash, 'waveforms')

    def _log_meta(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" metadata:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_responses(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" responses:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_info_data(self, *args):
        log_prefix = 'FDSN "%s" waveforms:' % self.site
        logger.info(' '.join((log_prefix,) + args))

    def _str_expires(self, t, now):
        if t is None:
            return 'expires: never'
        else:
            expire = 'expires' if t > now else 'expired'
            return '%s: %s' % (
                expire,
                util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))

    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Make sure the cached channel inventory covers ``constraint``.

        The inventory is re-queried if the cached query constraint does not
        contain ``constraint`` or if the cached file has expired. The
        channel file, if present, is (re-)added to ``squirrel``.
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                # Expand a copy of the previously cached constraint by the new
                # one (presumably to cover both), so earlier coverage is kept.
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                fn_temp = fn + '.%i.temp' % os.getpid()
                channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    try:
                        squirrel.get_database().silent_touch(fn)
                    except ExecuteGet1Error:
                        # Handled like in update_response_inventory;
                        # presumably raised when the file is not registered
                        # in the database yet -- confirm.
                        pass

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

    def _do_channel_query(self, constraint):
        '''
        Query channel-level station metadata for ``constraint``.

        :returns: the FDSN station query result, or an empty dummy
            ``FDSNStationXML`` if the server reports an empty result.
        '''
        extra_args = {}

        if self.site in sites_not_supporting['startbefore']:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['starttime'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['endtime'] = constraint.tmax

        else:
            # Select channel epochs overlapping [tmin, tmax].
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['endafter'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['startbefore'] = constraint.tmax

        if self.site not in sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        if self.query_args is not None:
            extra_args.update(self.query_args)

        self._log_meta('querying...')

        try:
            channel_sx = fdsn.station(
                site=self.site,
                format='text',
                level='channel',
                **extra_args)
            return channel_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def _load_constraint(self):
        '''Load the cached query constraint, or ``None`` if unavailable.'''
        fn = self._get_constraint_path()
        if op.exists(fn):
            try:
                with open(fn, 'rb') as f:
                    self._constraint = pickle.load(f)

            except (EOFError, pickle.UnpicklingError) as e:
                # A truncated/damaged cache file only means that the channel
                # inventory has to be queried again.
                logger.warning(
                    'Ignoring damaged constraint cache file %s: %s'
                    % (fn, str(e)))
                self._constraint = None
        else:
            self._constraint = None

    def _dump_constraint(self):
        '''Store the current query constraint in the cache.'''
        fn = self._get_constraint_path()
        fn_temp = fn + '.%i.temp' % os.getpid()
        with open(fn_temp, 'wb') as f:
            pickle.dump(self._constraint, f, protocol=2)

        # Write to a temporary file first and then replace (atomic on POSIX),
        # so an interrupted write cannot leave a truncated pickle behind.
        os.replace(fn_temp, fn)

    def _get_expiration_time(self, path):
        '''
        Get the time at which the cached file at ``path`` expires.

        :returns: ``None`` if :py:gattr:`expires` is not set; the file's
            modification time plus :py:gattr:`expires` if it can be stat'ed;
            ``0.0`` (i.e. already expired) otherwise.
        '''
        if self.expires is None:
            return None

        try:
            t = os.stat(path).st_mtime
            return t + self.expires

        except OSError:
            return 0.0

    def _get_channels_expiration_time(self):
        return self._get_expiration_time(self._get_channels_path())

    def update_waveform_promises(self, squirrel, constraint):
        '''
        Register waveform promises for data not yet available locally.

        For each channel of the cached inventory matching ``constraint``, the
        time spans not covered by local waveforms are added to ``squirrel``
        as virtual waveform promises of this source. Only the part of the
        channel epoch inside the constraint's time span and before the
        current time is considered.
        '''
        from ..base import gaps
        now = time.time()
        cpath = os.path.abspath(self._get_channels_path())

        ctmin = constraint.tmin
        ctmax = constraint.tmax

        nuts = squirrel.iter_nuts(
            'channel',
            path=cpath,
            codes=constraint.codes,
            tmin=ctmin,
            tmax=ctmax)

        coverages = squirrel.get_coverage(
            'waveform', codes_list=[constraint.codes],
            tmin=ctmin,
            tmax=ctmax,
            return_raw=False)

        codes_to_avail = defaultdict(list)
        for coverage in coverages:
            for tmin, tmax, _ in coverage.iter_spans():
                codes_to_avail[coverage.codes].append((tmin, tmax))

        def sgaps(nut):
            # Intersect the channel epoch (already clipped to "now" in
            # wanted()) with the constraint's time span. Using the constraint
            # limits alone would create promises outside the channel epoch
            # or in the future.
            tmin_window = nut.tmin if ctmin is None else max(ctmin, nut.tmin)
            tmax_window = nut.tmax if ctmax is None else min(ctmax, nut.tmax)
            if tmin_window >= tmax_window:
                return

            for tmin, tmax in gaps(
                    codes_to_avail[nut.codes], tmin_window, tmax_window):

                subnut = clone(nut)
                subnut.tmin = tmin
                subnut.tmax = tmax
                yield subnut

        def wanted(nuts):
            for nut in nuts:
                # No data can be available for the future.
                if nut.tmin < now:
                    if nut.tmax > now:
                        nut.tmax = now

                    for subnut in sgaps(nut):
                        yield subnut

        path = self._source_id
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
            virtual_paths=[path])

    def _get_user_credentials(self):
        '''
        Get authentication keyword arguments for the FDSN client calls.

        :returns: dict with keys ``'user'`` and ``'passwd'`` (if
            :py:gattr:`user_credentials` is set) and ``'token'`` (if token
            authentication is configured).
        '''
        d = {}
        if self.user_credentials is not None:
            d['user'], d['passwd'] = self.user_credentials

        if self.auth_token is not None or self.auth_token_path is not None:
            d['token'] = self.get_auth_token()

        return d

    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):

        '''
        Download waveforms for ``orders`` in batches and store them locally.

        ``orders`` is sorted in place. For each order, exactly one of
        ``success``, ``error_permanent`` or ``error_temporary`` is called.
        Paths of newly stored files are passed to ``batch_add`` after each
        batch. Aggregated error summaries are logged at the end.
        '''
        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        while i < len(orders):
            orders_now = orders[i:i+neach]
            selection_now = orders_to_selection(orders_now)

            nsuccess = 0
            elog.append_checkpoint()
            self._log_info_data(
                'downloading, %s' % order_summary(orders_now))

            all_paths = []
            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    data = fdsn.dataselect(
                        site=self.site, selection=selection_now,
                        **self._get_user_credentials())

                    now = time.time()

                    path = op.join(tmpdir, 'tmp.mseed')
                    with open(path, 'wb') as f:
                        while True:
                            buf = data.read(1024)
                            if not buf:
                                break
                            f.write(buf)

                    trs = io.load(path)

                    by_nslc = defaultdict(list)
                    for tr in trs:
                        by_nslc[tr.nslc_id].append(tr)

                    for order in orders_now:
                        trs_order = []
                        err_this = None
                        for tr in by_nslc[order.codes[1:5]]:
                            try:
                                order.validate(tr)
                                trs_order.append(tr.chop(
                                    order.tmin, order.tmax, inplace=False))

                            except trace.NoData:
                                err_this = (
                                    'empty result', 'empty sub-interval')

                            except InvalidWaveform as e:
                                err_this = ('invalid waveform', str(e))

                        if len(trs_order) == 0:
                            if err_this is None:
                                err_this = ('empty result', '')

                            elog.append(now, order, *err_this)
                            error_permanent(order)
                        else:
                            if len(trs_order) != 1:
                                if err_this:
                                    elog.append(
                                        now, order,
                                        'partial result, %s' % err_this[0],
                                        err_this[1])
                                else:
                                    elog.append(now, order, 'partial result')

                            paths = self._archive.add(trs_order)
                            all_paths.extend(paths)

                            nsuccess += 1
                            success(order)

                except fdsn.EmptyResult:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'empty result')
                        error_permanent(order)

                except util.HTTPError as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'http error', str(e))
                        error_temporary(order)

            emessage = elog.summarize_recent()
            self._log_info_data(
                '%i download%s successful' % (nsuccess, plural_s(nsuccess))
                + (', %s' % emessage if emessage else ''))

            if all_paths:
                batch_add(all_paths)

            i += neach
            # The last batch may be smaller than neach; don't report more
            # progress than there are orders.
            task.update(min(i, len(orders)))

        for agg in elog.iter_aggregates():
            logger.warning(str(agg))

        task.done()

    def _do_response_query(self, selection):
        '''
        Query response-level station metadata for ``selection``.

        :returns: the FDSN station query result, or an empty dummy
            ``FDSNStationXML`` if the server reports an empty result.
        '''
        extra_args = {}

        if self.site not in sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        self._log_responses('querying...')

        try:
            response_sx = fdsn.station(
                site=self.site,
                level='response',
                selection=selection,
                **extra_args)

            return response_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def update_response_inventory(self, squirrel, constraint):
        '''
        Refresh cached per-channel responses and add them to ``squirrel``.

        For every channel of the cached channel inventory, the response file
        is re-queried (in batches of 100 channels) unless it exists and has
        not expired. Channels without a response in the query result get an
        empty dummy file. A status summary is logged.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts('channel', path=cpath)

        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes_tuple[1:5]
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if os.path.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        neach = 100
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                # Best effort: record the failure for this batch and go on.
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)
                    response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            if k.find('failed') != -1:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(status[k])),
                target=log_target)

        squirrel.add(fns, kinds=['response'])
# Public API of this module: the FDSN data-source class.
__all__ = ['FDSNSource']