1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import time
9import os
10import copy
11import logging
12import tempfile
13from collections import defaultdict
14try:
15 import cPickle as pickle
16except ImportError:
17 import pickle
18import os.path as op
19from .base import Source, Constraint
20from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \
21 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \
22 codes_to_str_abbreviated, CodesNSLCE
23from ..database import ExecuteGet1Error
24from pyrocko.client import fdsn
26from pyrocko import util, trace, io
27from pyrocko.io.io_common import FileLoadError
28from pyrocko.io import stationxml
29from pyrocko.progress import progress
31from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \
32 Duration, Bool, clone
# Namespace prefix used by guts when (de)serializing the objects below.
guts_prefix = 'squirrel'

logger = logging.getLogger('psq.client.fdsn')

# Default timeout [s] for requests made through pyrocko.client.fdsn. Note
# that this assigns the client module's global, i.e. it is process-wide.
fdsn.g_timeout = 60.

# FDSN sites known not to accept a given query parameter, keyed by the
# parameter name. Consulted when building station queries.
sites_not_supporting = dict(
    startbefore=['geonet'],
    includerestricted=['geonet'],
)
def make_task(*args):
    '''
    Create a progress task reporting through this module's logger.

    All positional arguments are passed on to ``progress.task``.
    '''
    task_kwargs = dict(logger=logger)
    return progress.task(*args, **task_kwargs)
def diff(fn_a, fn_b):
    '''
    Check whether two files have different content.

    :returns: ``True`` if the files differ in size or content, or if either
        of them cannot be stat'ed; ``False`` if they are byte-identical.
    '''
    try:
        sizes_differ = os.stat(fn_a).st_size != os.stat(fn_b).st_size
    except OSError:
        return True

    if sizes_differ:
        return True

    chunk_size = 1024
    with open(fn_a, 'rb') as file_a, open(fn_b, 'rb') as file_b:
        while True:
            chunk_a = file_a.read(chunk_size)
            chunk_b = file_b.read(chunk_size)
            if chunk_a != chunk_b:
                return True

            # Both files exhausted at the same position (sizes are equal).
            if not chunk_a or not chunk_b:
                return False
def move_or_keep(fn_temp, fn):
    '''
    Install a freshly written temporary file unless it duplicates ``fn``.

    If ``fn`` does not exist, ``fn_temp`` is moved to ``fn``. If ``fn``
    exists and its content differs from ``fn_temp``, it is replaced by
    ``fn_temp``. If the contents are identical, ``fn_temp`` is deleted and
    ``fn`` is left untouched.

    :returns: ``'new'``, ``'updated'`` or ``'upstream unchanged'``.
    '''
    if not op.exists(fn):
        status = 'new'
    elif diff(fn, fn_temp):
        status = 'updated'
    else:
        os.unlink(fn_temp)
        return 'upstream unchanged'

    # os.replace overwrites an existing destination on all platforms,
    # whereas os.rename raises FileExistsError on Windows in that case.
    os.replace(fn_temp, fn)
    return status
class Archive(Object):
    '''
    Base class for trace archives; subclasses must implement :py:meth:`add`.
    '''

    def add(self):
        raise NotImplementedError
class MSeedArchive(Archive):
    '''
    Archive storing traces via :py:func:`pyrocko.io.save`.

    The file name of each trace is generated from :py:attr:`template`, which
    is interpreted relative to the directory given to
    :py:meth:`set_base_path` (which must be called before :py:meth:`add`).
    '''

    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        + '_%(tmin_us)s_%(tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)
        self._base_path = None

    def set_base_path(self, path):
        '''Set the root directory below which files are written.'''
        self._base_path = path

    def add(self, trs):
        '''
        Save traces ``trs`` into the archive, overwriting existing files.

        :returns: the return value of :py:func:`pyrocko.io.save` (the caller
            in this module treats it as a list of written file paths).
        '''
        path_template = op.join(self._base_path, self.template)
        return io.save(trs, path_template, overwrite=True)
def combine_selections(selection):
    '''
    Merge consecutive selection rows which seamlessly continue each other.

    Rows are ``(network, station, location, channel, tmin, tmax)`` tuples.
    A row is joined to its predecessor when the first four entries are equal
    and its ``tmin`` equals the predecessor's ``tmax``. Only directly
    adjacent rows are compared, so the input should be sorted.
    '''
    combined = []
    current = None
    for row in selection:
        continues = bool(current) \
            and row[:4] == current[:4] and row[4] == current[5]

        if continues:
            # Extend the running row to the end of this one.
            current = current[:5] + (row[5],)
            continue

        if current:
            combined.append(current)

        current = row

    if current:
        combined.append(current)

    return combined
def orders_sort_key(order):
    '''Sort key ordering waveform orders by codes first, then start time.'''
    codes, tmin = order.codes, order.tmin
    return codes, tmin
def orders_to_selection(orders):
    '''
    Convert waveform orders into a compact selection list.

    Orders are sorted with :py:func:`orders_sort_key`, turned into
    ``nslc + (tmin, tmax)`` rows, and seamlessly adjacent rows are merged
    with :py:func:`combine_selections`.
    '''
    rows = [
        order.codes.nslc + (order.tmin, order.tmax)
        for order in sorted(orders, key=orders_sort_key)]

    return combine_selections(rows)
class ErrorEntry(Object):
    '''
    Record of a single waveform order which failed or was only partially
    fulfilled.
    '''
    # System time (``time.time()``) when the error was registered.
    time = Timestamp.T()
    # The waveform order concerned.
    order = WaveformOrder.T()
    # Short error category, e.g. 'empty result' or 'http error'.
    kind = String.T()
    # Optional free-form details, e.g. an exception message.
    details = String.T(optional=True)
class ErrorAggregate(Object):
    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes_list = List.T(CodesNSLCE.T())
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        '''
        Human readable summary: site, error kind, number of entries, optional
        details, affected codes and affected time spans.
        '''
        codes_strs = [str(codes) for codes in self.codes_list]
        if codes_strs:
            scodes = '\n' + util.ewrap(codes_strs, indent=' ')
        else:
            scodes = '<none>'

        span_strs = (
            '%s - %s' % (util.time_to_str(span[0]), util.time_to_str(span[1]))
            for span in self.time_spans)

        sspans = '\n' + util.ewrap(span_strs, indent=' ')

        sdetails = ' Details: %s\n' % self.details if self.details else ''

        template = ('FDSN "%s": download error summary for "%s" (%i)\n%s '
                    'Codes:%s\n Time spans:%s')

        return template % (
            self.site, self.kind, len(self.entries), sdetails, scodes, sspans)
class ErrorLog(Object):
    site = String.T()
    entries = List.T(ErrorEntry.T())
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        '''
        Remember the current number of entries.

        :py:meth:`summarize_recent` only reports entries added after the
        latest checkpoint.
        '''
        self.checkpoints.append(len(self.entries))

    def append(self, time, order, kind, details=''):
        '''Log an :py:class:`ErrorEntry` for ``order``.'''
        self.entries.append(
            ErrorEntry(time=time, order=order, kind=kind, details=details))

    def iter_aggregates(self):
        '''
        Yield one :py:class:`ErrorAggregate` per distinct (kind, details)
        pair, in sorted order of that pair.
        '''
        grouped = defaultdict(list)
        for entry in self.entries:
            grouped[entry.kind, entry.details].append(entry)

        for key in sorted(grouped):
            kind, details = key
            group = grouped[key]
            orders = [entry.order for entry in group]
            selection = orders_to_selection(orders)

            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=group,
                codes_list=sorted({order.codes for order in orders}),
                time_spans=sorted({row[-2:] for row in selection}))

    def summarize_recent(self):
        '''
        One-line summary of the entries added since the latest checkpoint,
        or an empty string if there are none.
        '''
        start = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[start:]
        if not recent:
            return ''

        kinds = sorted({entry.kind for entry in recent})
        return '%i error%s (%s)' % (
            len(recent), util.plural_s(recent), '; '.join(kinds))
class FDSNSource(Source):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel` allows
    the latter to download station and waveform data from an FDSN web service
    should the data not already happen to be available locally.
    '''

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             'environment\'s cache directory is used.')

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    def __init__(self, site, query_args=None, **kwargs):
        Source.__init__(self, site=site, query_args=query_args, **kwargs)

        self._constraint = None
        self._hash = self.make_hash()
        self._source_id = 'client:fdsn:%s' % self._hash
        self._error_infos = []

    def describe(self):
        '''Return the source identifier ``'client:fdsn:<hash>'``.'''
        return self._source_id

    def make_hash(self):
        '''
        Hash of the configuration which determines what data is accessible.

        Covers the site, whether a token is configured, the user name (not
        the password) and the query arguments. Used to separate the cache
        directories of differently configured sources.
        '''
        s = self.site
        s += 'notoken' \
            if (self.auth_token is None and self.auth_token_path is None) \
            else 'token'

        if self.user_credentials is not None:
            s += self.user_credentials[0]
        else:
            s += 'nocred'

        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        return self._hash

    def get_auth_token(self):
        '''
        Get the authentication token to be presented to the FDSN server.

        :returns: :py:attr:`auth_token` if it is non-empty, otherwise the
            content of the file at :py:attr:`auth_token_path`.
        :raises FileLoadError: if the token file cannot be read or decoded.
        :raises Exception: if no token is configured.
        '''
        if self.auth_token:
            return self.auth_token

        elif self.auth_token_path is not None:
            try:
                with open(self.auth_token_path, 'rb') as f:
                    return f.read().decode('ascii')

            except (OSError, UnicodeDecodeError) as e:
                raise FileLoadError(
                    'Cannot load auth token file (%s): %s'
                    % (self.auth_token_path, str(e)))

        else:
            raise Exception(
                'FDSNSource: no auth token available: auth_token is not set '
                '(or empty) and auth_token_path is not set.')

    def setup(self, squirrel, check=True):
        '''
        Prepare cache directories and register cached data with ``squirrel``.

        Adds the waveform directory, the cached channel file and response
        directory (if present) and a virtual path for waveform promises.
        '''
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()
        self._archive = MSeedArchive()
        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive.set_base_path(waveforms_path)

        squirrel.add(
            self._get_waveforms_path(),
            check=check)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if os.path.exists(responses_path):
            squirrel.add(responses_path, kinds=['response'])

    def _get_constraint_path(self):
        return op.join(self._cache_path, self._hash, 'constraint.pickle')

    def _get_channels_path(self):
        return op.join(self._cache_path, self._hash, 'channels.stationxml')

    def _get_responses_path(self, nslc=None):
        # Without ``nslc``: the directory holding all per-channel response
        # files; with ``nslc``: the file path for that channel.
        dirpath = op.join(
            self._cache_path, self._hash, 'responses')

        if nslc is None:
            return dirpath
        else:
            return op.join(
                dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)

    def _get_waveforms_path(self):
        if self.shared_waveforms:
            return op.join(self._cache_path, 'waveforms')
        else:
            return op.join(self._cache_path, self._hash, 'waveforms')

    def _log_meta(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" metadata:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_responses(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" responses:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_info_data(self, *args):
        log_prefix = 'FDSN "%s" waveforms:' % self.site
        logger.info(' '.join((log_prefix,) + args))

    def _str_expires(self, t, now):
        if t is None:
            return 'expires: never'
        else:
            expire = 'expires' if t > now else 'expired'
            return '%s: %s' % (
                expire,
                util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))

    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Make sure channel metadata covering ``constraint`` is available.

        The cached channel file is reused if the previously queried
        constraint contains ``constraint`` and the cache has not expired.
        Otherwise the previous constraint, expanded by ``constraint``, is
        queried from the FDSN site and the cache file is replaced if its
        content changed. ``OSError`` during the update is logged, not raised.
        Finally the channel file, if present, is added to ``squirrel``.
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                fn_temp = fn + '.%i.temp' % os.getpid()
                channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    # Refresh the file's timestamp so that the expiration
                    # clock restarts although the content was not rewritten.
                    squirrel.get_database().silent_touch(fn)

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

    def _has_credentials(self):
        # True if any form of authentication (password or token) is
        # configured.
        return (
            self.user_credentials is not None
            or self.auth_token is not None
            or self.auth_token_path is not None)

    def _get_includerestricted_args(self):
        # Station query argument asking for restricted channels when
        # credentials are available; omitted for sites which do not support
        # the parameter.
        if self.site in sites_not_supporting['includerestricted']:
            return {}

        return dict(includerestricted=self._has_credentials())

    def _do_channel_query(self, constraint):
        extra_args = {}

        if self.site in sites_not_supporting['startbefore']:
            # Fallback for sites rejecting startbefore/endafter.
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['starttime'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['endtime'] = constraint.tmax

        else:
            # Channels ending after tmin and starting before tmax, i.e.
            # channels whose epochs overlap the constraint's time span.
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['endafter'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['startbefore'] = constraint.tmax

        extra_args.update(self._get_includerestricted_args())

        if self.query_args is not None:
            extra_args.update(self.query_args)

        self._log_meta('querying...')

        try:
            channel_sx = fdsn.station(
                site=self.site,
                format='text',
                level='channel',
                **extra_args)
            return channel_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def _load_constraint(self):
        # NOTE: unpickles a file from our own cache directory; the cache
        # must not be writable by untrusted parties.
        fn = self._get_constraint_path()
        if op.exists(fn):
            with open(fn, 'rb') as f:
                self._constraint = pickle.load(f)
        else:
            self._constraint = None

    def _dump_constraint(self):
        with open(self._get_constraint_path(), 'wb') as f:
            pickle.dump(self._constraint, f, protocol=2)

    def _get_expiration_time(self, path):
        # None: never expires; 0.0: file missing, i.e. already expired.
        if self.expires is None:
            return None

        try:
            t = os.stat(path)[8]  # index 8: st_mtime (integer seconds)
            return t + self.expires

        except OSError:
            return 0.0

    def _get_channels_expiration_time(self):
        return self._get_expiration_time(self._get_channels_path())

    def update_waveform_promises(self, squirrel, constraint):
        '''
        Register waveform promises for data missing locally.

        For each cached channel matching ``constraint``, the gaps in the
        locally available waveform coverage (restricted to the past) are
        added to ``squirrel`` as promise nuts under this source's virtual
        path.
        '''
        from ..base import gaps
        now = time.time()
        cpath = os.path.abspath(self._get_channels_path())

        ctmin = constraint.tmin
        ctmax = constraint.tmax

        nuts = squirrel.iter_nuts(
            'channel',
            path=cpath,
            codes=constraint.codes,
            tmin=ctmin,
            tmax=ctmax)

        coverages = squirrel.get_coverage(
            'waveform',
            codes_list=[constraint.codes] if constraint.codes else None,
            tmin=ctmin,
            tmax=ctmax)

        codes_to_avail = defaultdict(list)
        for coverage in coverages:
            for tmin, tmax, _ in coverage.iter_spans():
                codes_to_avail[coverage.codes].append((tmin, tmax))

        def sgaps(nut):
            # Copies of ``nut`` clipped to each span not yet available.
            for tmin, tmax in gaps(
                    codes_to_avail[nut.codes],
                    ctmin if ctmin is not None else nut.tmin,
                    ctmax if ctmax is not None else nut.tmax):

                subnut = clone(nut)
                subnut.tmin = tmin
                subnut.tmax = tmax
                yield subnut

        def wanted(nuts):
            # Only channels which started in the past; end is capped at now.
            for nut in nuts:
                if nut.tmin < now:
                    if nut.tmax > now:
                        nut.tmax = now

                    for subnut in sgaps(nut):
                        yield subnut

        path = self._source_id
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
            virtual_paths=[path])

    def _get_user_credentials(self):
        # Keyword arguments for fdsn.dataselect: 'user'/'passwd' and/or
        # 'token', depending on what is configured.
        d = {}
        if self.user_credentials is not None:
            d['user'], d['passwd'] = self.user_credentials

        if self.auth_token is not None or self.auth_token_path is not None:
            d['token'] = self.get_auth_token()

        return d

    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):

        '''
        Download waveforms for ``orders`` in batches of 20.

        ``orders`` is sorted in place. For every order exactly one of
        ``success``, ``error_permanent`` or ``error_temporary`` is called
        with the order, unless an unhandled exception aborts the download.
        After each batch, ``batch_add`` is called with the list of written
        file paths (if any). An error summary is logged at the end.
        '''
        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        while i < len(orders):
            orders_now = orders[i:i+neach]
            selection_now = orders_to_selection(orders_now)

            nsuccess = 0
            elog.append_checkpoint()
            self._log_info_data(
                'downloading, %s' % order_summary(orders_now))

            all_paths = []
            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    data = fdsn.dataselect(
                        site=self.site, selection=selection_now,
                        **self._get_user_credentials())

                    now = time.time()

                    path = op.join(tmpdir, 'tmp.mseed')
                    with open(path, 'wb') as f:
                        while True:
                            buf = data.read(1024)
                            if not buf:
                                break
                            f.write(buf)

                    trs = io.load(path)

                    by_nslc = defaultdict(list)
                    for tr in trs:
                        by_nslc[tr.nslc_id].append(tr)

                    for order in orders_now:
                        trs_order = []
                        err_this = None
                        for tr in by_nslc[order.codes.nslc]:
                            try:
                                order.validate(tr)
                                trs_order.append(tr.chop(
                                    order.tmin, order.tmax, inplace=False))

                            except trace.NoData:
                                err_this = (
                                    'empty result', 'empty sub-interval')

                            except InvalidWaveform as e:
                                err_this = ('invalid waveform', str(e))

                        if len(trs_order) == 0:
                            if err_this is None:
                                err_this = ('empty result', '')

                            elog.append(now, order, *err_this)
                            error_permanent(order)
                        else:
                            # More than one piece means the order could only
                            # be fulfilled partially (e.g. gaps).
                            if len(trs_order) != 1:
                                if err_this:
                                    elog.append(
                                        now, order,
                                        'partial result, %s' % err_this[0],
                                        err_this[1])
                                else:
                                    elog.append(now, order, 'partial result')

                            paths = self._archive.add(trs_order)
                            all_paths.extend(paths)

                            nsuccess += 1
                            success(order)

                except fdsn.EmptyResult:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'empty result')
                        error_permanent(order)

                except util.HTTPError as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'http error', str(e))
                        error_temporary(order)

            emessage = elog.summarize_recent()
            self._log_info_data(
                '%i download%s successful' % (
                    nsuccess, util.plural_s(nsuccess))
                + (', %s' % emessage if emessage else ''))

            if all_paths:
                batch_add(all_paths)

            i += neach
            task.update(i)

        for agg in elog.iter_aggregates():
            logger.warning(str(agg))

        task.done()

    def _do_response_query(self, selection):
        extra_args = {}

        extra_args.update(self._get_includerestricted_args())

        self._log_responses('querying...')

        try:
            response_sx = fdsn.station(
                site=self.site,
                level='response',
                selection=selection,
                **extra_args)

            return response_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def update_response_inventory(self, squirrel, constraint):
        '''
        Make sure response metadata is cached for the matching channels.

        For every distinct channel in the cached channel inventory matching
        ``constraint.codes``, the per-channel response file is reused if it
        exists and has not expired; otherwise responses are queried in
        batches of 100 channels. Failures are logged, not raised. All
        written or confirmed files are added to ``squirrel``.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts(
            'channel', path=cpath, codes=constraint.codes)

        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes.nslc
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if os.path.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        # Channels missing from the server's answer get an empty placeholder
        # file, so they count as cached until that file expires.
        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        neach = 100
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                # Best effort: record the failure for this batch and go on.
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)
                    response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            if k.find('failed') != -1:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(status[k])),
                target=log_target)

        squirrel.add(fns, kinds=['response'])
# Public API of this module.
__all__ = ['FDSNSource']