1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import time
7import os
8import copy
9import logging
10import tempfile
11import importlib.util
12from collections import defaultdict
13try:
14 import cPickle as pickle
15except ImportError:
16 import pickle
17import os.path as op
18from .base import Source, Constraint
19from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \
20 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \
21 codes_to_str_abbreviated, CodesNSLCE
22from ..database import ExecuteGet1Error
23from pyrocko.client import fdsn
25from pyrocko import util, trace, io
26from pyrocko.io.io_common import FileLoadError
27from pyrocko.io import stationxml
28from pyrocko.progress import progress
29from pyrocko import has_paths
31from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \
32 Duration, Bool, clone
# Namespace prefix used by guts for the Object classes defined in this module.
guts_prefix = 'squirrel'

# NOTE: this assigns a module-level global in pyrocko.client.fdsn as a side
# effect of importing this module; presumably it is the default request
# timeout [s] used by that client — confirm against pyrocko.client.fdsn.
fdsn.g_timeout = 60.

logger = logging.getLogger('psq.client.fdsn')

# FDSN sites (by name as given in ``FDSNSource.site``) which do not support
# certain query parameters:
#   'startbefore': channel queries to these sites use starttime/endtime
#                  instead of endafter/startbefore.
#   'includerestricted': the includerestricted parameter is not sent.
sites_not_supporting = {
    'startbefore': ['geonet'],
    'includerestricted': ['geonet']}
def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are forwarded to :py:meth:`progress.task`.
    '''
    task = progress.task(*args, logger=logger)
    return task
def diff(fn_a, fn_b):
    '''
    Check whether two files differ in content.

    :param fn_a: path of the first file
    :param fn_b: path of the second file
    :returns: ``True`` if the files differ or either of them cannot be
        stat'ed, ``False`` if their contents are identical.
    '''
    try:
        size_a = os.stat(fn_a).st_size
        size_b = os.stat(fn_b).st_size
    except OSError:
        return True

    # Cheap check first: files of different size cannot be equal.
    if size_a != size_b:
        return True

    with open(fn_a, 'rb') as fa, open(fn_b, 'rb') as fb:
        for chunk_a in iter(lambda: fa.read(1024), b''):
            if chunk_a != fb.read(1024):
                return True

        # fn_a is exhausted; the files differ only if fn_b still has data.
        return bool(fb.read(1024))
def move_or_keep(fn_temp, fn):
    '''
    Install a freshly written file unless it is identical to the existing one.

    If ``fn`` does not exist, ``fn_temp`` is moved to ``fn``. If it exists
    and differs in content, it is replaced by ``fn_temp``. Otherwise
    ``fn_temp`` is deleted and ``fn`` is left untouched, preserving its
    modification time.

    :param fn_temp: path of the newly written temporary file
    :param fn: final destination path
    :returns: status string, one of ``'new'``, ``'updated'`` or
        ``'upstream unchanged'``
    '''
    # os.replace (unlike os.rename) overwrites an existing destination on
    # all platforms, including Windows.
    if not op.exists(fn):
        os.replace(fn_temp, fn)
        return 'new'

    if diff(fn, fn_temp):
        os.replace(fn_temp, fn)
        return 'updated'

    os.unlink(fn_temp)
    return 'upstream unchanged'
class Archive(Object):
    '''
    Base class for local waveform archives.

    Subclasses must override :py:meth:`add`.
    '''

    def add(self):
        raise NotImplementedError
class MSeedArchive(Archive):
    '''
    Archive storing traces as MiniSEED files below a base directory.

    File names are generated from :py:attr:`template`, relative to the base
    path set with :py:meth:`set_base_path`.
    '''

    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        '_%(tmin_us)s_%(tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)
        self._base_path = None

    def set_base_path(self, path):
        '''Set the directory below which files are written.'''
        self._base_path = path

    def add(self, trs):
        '''
        Save traces into the archive, overwriting existing files.

        :returns: whatever :py:func:`pyrocko.io.save` returns for the
            given traces
        '''
        target = op.join(self._base_path, self.template)
        return io.save(trs, target, overwrite=True)
def combine_selections(selection):
    '''
    Merge contiguous selection rows of the same channel.

    Rows are ``(net, sta, loc, cha, tmin, tmax)`` tuples. Two consecutive
    rows are merged when their first four entries match and the tmin of the
    second equals the tmax of the first. Only directly consecutive rows are
    considered, so the input should be sorted by codes and time.

    :returns: list of merged rows
    '''
    combined = []
    pending = None
    for row in selection:
        extends = pending and row[:4] == pending[:4] \
            and row[4] == pending[5]

        if extends:
            pending = pending[:5] + (row[5],)
            continue

        if pending:
            combined.append(pending)

        pending = row

    if pending:
        combined.append(pending)

    return combined
def orders_sort_key(order):
    '''Sort key grouping orders by channel codes, then by start time.'''
    codes, tmin = order.codes, order.tmin
    return codes, tmin
def orders_to_selection(orders):
    '''
    Convert waveform orders into a compact FDSN selection list.

    Orders are sorted by codes and time. Each becomes a
    ``(net, sta, loc, cha, tmin, tmax)`` row, and contiguous rows of the
    same channel are merged with :py:func:`combine_selections`.
    '''
    rows = [
        order.codes.nslc + (order.tmin, order.tmax)
        for order in sorted(orders, key=orders_sort_key)]

    return combine_selections(rows)
class ErrorEntry(Object):
    '''
    Record of a problem encountered while fulfilling one waveform order.
    '''

    # Time at which the problem was recorded (system time, see ErrorLog use).
    time = Timestamp.T()
    # The waveform order concerned.
    order = WaveformOrder.T()
    # Short category, e.g. 'empty result', 'http error', 'invalid waveform'.
    kind = String.T()
    # Optional free-text details (e.g. exception message).
    details = String.T(optional=True)
class ErrorAggregate(Object):
    '''
    Summary of all error entries sharing the same kind and details.
    '''

    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes = List.T(CodesNSLCE.T())
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        if self.codes:
            scodes = '\n' + util.ewrap(
                [str(c) for c in self.codes], indent=' ')
        else:
            scodes = '<none>'

        span_strings = (
            '%s - %s' % (util.time_to_str(tmin), util.time_to_str(tmax))
            for (tmin, tmax) in self.time_spans)

        sspans = '\n' + util.ewrap(span_strings, indent=' ')

        if self.details:
            sdetails = ' Details: %s\n' % self.details
        else:
            sdetails = ''

        template = (
            'FDSN "%s": download error summary for "%s" (%i)\n%s '
            'Codes:%s\n Time spans:%s')

        return template % (
            self.site, self.kind, len(self.entries),
            sdetails, scodes, sspans)
class ErrorLog(Object):
    '''
    Collection of download error entries for one FDSN site.

    Checkpoints mark entry counts so that errors added since the last
    checkpoint can be summarized separately.
    '''

    site = String.T()
    entries = List.T(ErrorEntry.T())
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        '''Mark the current end of the entry list.'''
        n = len(self.entries)
        self.checkpoints.append(n)

    def append(self, time, order, kind, details=''):
        '''Record a new error entry.'''
        self.entries.append(
            ErrorEntry(time=time, order=order, kind=kind, details=details))

    def iter_aggregates(self):
        '''
        Yield one :py:class:`ErrorAggregate` per distinct (kind, details)
        pair, in sorted order of that pair.
        '''
        groups = defaultdict(list)
        for entry in self.entries:
            groups[entry.kind, entry.details].append(entry)

        for key in sorted(groups):
            kind, details = key
            group = groups[key]
            codes = sorted({e.order.codes for e in group})
            selection = orders_to_selection(e.order for e in group)
            time_spans = sorted({row[-2:] for row in selection})
            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=group,
                codes=codes,
                time_spans=time_spans)

    def summarize_recent(self):
        '''
        One-line summary of entries added since the last checkpoint, or an
        empty string if there are none.
        '''
        start = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[start:]
        if not recent:
            return ''

        kinds = sorted({e.kind for e in recent})
        return '%i error%s (%s)' % (
            len(recent), util.plural_s(recent), '; '.join(kinds))
class FDSNSource(Source, has_paths.HasPaths):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel` allows
    the latter to download station and waveform data from an FDSN web service
    should the data not already happen to be available locally.
    '''

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    # NOTE(review): in this class query_args is only applied to channel
    # (station-level) queries, not to dataselect or response queries.
    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             'environment\'s cache directory is used.')

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    hotfix_module_path = has_paths.Path.T(
        optional=True,
        help='Path to Python module to locally patch metadata errors.')

    def __init__(self, site, query_args=None, **kwargs):
        Source.__init__(self, site=site, query_args=query_args, **kwargs)

        self._constraint = None
        self._hash = self.make_hash()
        self._source_id = 'client:fdsn:%s' % self._hash
        self._error_infos = []

    def describe(self):
        '''Return the identifier of this source (``client:fdsn:<hash>``).'''
        return self._source_id

    def make_hash(self):
        '''
        Compute a hash identifying this source's configuration.

        The hash covers the site, whether a token is configured, the user
        name (not the password) and the query arguments. It is used to
        separate the cache directories of differently configured sources.
        '''
        s = self.site
        s += 'notoken' \
            if (self.auth_token is None and self.auth_token_path is None) \
            else 'token'

        if self.user_credentials is not None:
            s += self.user_credentials[0]
        else:
            s += 'nocred'

        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        '''Return the configuration hash computed at construction time.'''
        return self._hash

    def get_auth_token(self):
        '''
        Get the authentication token to be presented to the FDSN server.

        :returns: :py:attr:`auth_token` if it is non-empty, otherwise the
            contents of the file at :py:attr:`auth_token_path`, decoded as
            ASCII.
        :raises: :py:exc:`FileLoadError` if the token file cannot be read;
            :py:exc:`Exception` if no token is configured.
        '''
        if self.auth_token:
            return self.auth_token

        elif self.auth_token_path is not None:
            try:
                with open(self.auth_token_path, 'rb') as f:
                    return f.read().decode('ascii')

            except OSError as e:
                raise FileLoadError(
                    'Cannot load auth token file (%s): %s'
                    % (self.auth_token_path, str(e))) from e

        else:
            # Reached when auth_token is unset or empty and no token file
            # is configured.
            raise Exception(
                'FDSNSource: no authentication token available '
                '(auth_token is unset or empty and auth_token_path is not '
                'set).')

    def setup(self, squirrel, check=True):
        '''
        Prepare cache directories and register cached data with Squirrel.

        Loads the cached query constraint, adds the waveform directory,
        the channel file and the response directory (where present), and
        registers the virtual path used for waveform promises.
        '''
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()
        self._archive = MSeedArchive()
        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive.set_base_path(waveforms_path)

        squirrel.add(
            waveforms_path,
            check=check)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if os.path.exists(responses_path):
            squirrel.add(responses_path, kinds=['response'])

        self._hotfix_module = None

    def _hotfix(self, query_type, sx):
        '''
        Apply the user-supplied hotfix hook to freshly downloaded metadata.

        Does nothing if :py:attr:`hotfix_module_path` is not set. Otherwise
        the module is loaded on first use and its function
        ``stationxml_<query_type>_hook`` is called with ``sx``.

        :returns: the hook's return value (callers here ignore it, so the
            hook is expected to modify ``sx`` in place)
        '''
        if self.hotfix_module_path is None:
            return

        if self._hotfix_module is None:
            # NOTE: this executes arbitrary code from the configured path.
            module_path = self.expand_path(self.hotfix_module_path)
            spec = importlib.util.spec_from_file_location(
                'hotfix_' + self._hash, module_path)
            self._hotfix_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(self._hotfix_module)

        hook = getattr(
            self._hotfix_module, 'stationxml_' + query_type + '_hook')

        return hook(sx)

    def _get_constraint_path(self):
        return op.join(self._cache_path, self._hash, 'constraint.pickle')

    def _get_channels_path(self):
        return op.join(self._cache_path, self._hash, 'channels.stationxml')

    def _get_responses_path(self, nslc=None):
        '''
        Path of the response cache directory, or of the response file for
        the channel ``nslc`` if given.
        '''
        dirpath = op.join(
            self._cache_path, self._hash, 'responses')

        if nslc is None:
            return dirpath
        else:
            return op.join(
                dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)

    def _get_waveforms_path(self):
        if self.shared_waveforms:
            return op.join(self._cache_path, 'waveforms')
        else:
            return op.join(self._cache_path, self._hash, 'waveforms')

    def _log_meta(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" metadata:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_responses(self, message, target=logger.info):
        log_prefix = 'FDSN "%s" responses:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_info_data(self, *args):
        log_prefix = 'FDSN "%s" waveforms:' % self.site
        logger.info(' '.join((log_prefix,) + args))

    def _str_expires(self, t, now):
        '''Format expiration time ``t`` relative to ``now`` for logging.'''
        if t is None:
            return 'expires: never'
        else:
            expire = 'expires' if t > now else 'expired'
            return '%s: %s' % (
                expire,
                util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))

    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Refresh the cached channel metadata if needed and add it to Squirrel.

        The cached channel file is reused when the cached query constraint
        contains ``constraint`` and the file has not expired. Otherwise a new
        channel query is made for the union of the cached and the requested
        constraint, and the result replaces the cached file if it differs.
        Failures to write the cache are logged as errors, not raised.
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                # Query the union so previously covered channels are kept.
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                fn_temp = fn + '.%i.temp' % os.getpid()
                channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    # Bump modification time so expiration restarts.
                    squirrel.get_database().silent_touch(fn)

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

    def _do_channel_query(self, constraint):
        '''
        Query channel-level station metadata for the given time constraint.

        :returns: the (hotfixed) station metadata, or an empty dummy
            :py:class:`FDSNStationXML` if the server reports no data
        '''
        extra_args = {}

        if self.site in sites_not_supporting['startbefore']:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['starttime'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['endtime'] = constraint.tmax

        else:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['endafter'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['startbefore'] = constraint.tmax

        if self.site not in sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        if self.query_args is not None:
            extra_args.update(self.query_args)

        self._log_meta('querying...')

        try:
            channel_sx = fdsn.station(
                site=self.site,
                format='text',
                level='channel',
                **extra_args)

            self._hotfix('channel', channel_sx)

            return channel_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def _load_constraint(self):
        '''Load the cached query constraint, if any.'''
        fn = self._get_constraint_path()
        if op.exists(fn):
            # NOTE: unpickling is only safe because this file is written by
            # this class into the local cache; never point it at untrusted
            # data.
            with open(fn, 'rb') as f:
                self._constraint = pickle.load(f)
        else:
            self._constraint = None

    def _dump_constraint(self):
        with open(self._get_constraint_path(), 'wb') as f:
            pickle.dump(self._constraint, f, protocol=2)

    def _get_expiration_time(self, path):
        '''
        Expiration time of a cached file.

        :returns: ``None`` if :py:attr:`expires` is unset, the file's
            modification time plus :py:attr:`expires` otherwise, or ``0.0``
            (i.e. already expired) if the file cannot be stat'ed.
        '''
        if self.expires is None:
            return None

        try:
            t = os.stat(path).st_mtime
            return t + self.expires

        except OSError:
            return 0.0

    def _get_channels_expiration_time(self):
        return self._get_expiration_time(self._get_channels_path())

    def update_waveform_promises(self, squirrel, constraint):
        '''
        Register waveform promises for data not yet available locally.

        For every channel in this source's channel file matching
        ``constraint``, the time spans not covered by locally available
        waveforms (restricted to the constraint's time range and ending no
        later than now) are added to Squirrel as virtual waveform promises.
        '''
        from ..base import gaps
        now = time.time()
        cpath = os.path.abspath(self._get_channels_path())

        ctmin = constraint.tmin
        ctmax = constraint.tmax

        nuts = squirrel.iter_nuts(
            'channel',
            path=cpath,
            codes=constraint.codes,
            tmin=ctmin,
            tmax=ctmax)

        coverages = squirrel.get_coverage(
            'waveform',
            codes=constraint.codes if constraint.codes else None,
            tmin=ctmin,
            tmax=ctmax)

        codes_to_avail = defaultdict(list)
        for coverage in coverages:
            for tmin, tmax, _ in coverage.iter_spans():
                codes_to_avail[coverage.codes].append((tmin, tmax))

        def sgaps(nut):
            # Split the channel epoch into the spans lacking local data.
            for tmin, tmax in gaps(
                    codes_to_avail[nut.codes],
                    max(ctmin, nut.tmin) if ctmin is not None else nut.tmin,
                    min(ctmax, nut.tmax) if ctmax is not None else nut.tmax):

                subnut = clone(nut)
                subnut.tmin = tmin
                subnut.tmax = tmax
                yield subnut

        def wanted(nuts):
            # Only channel epochs which started in the past, clipped at now.
            for nut in nuts:
                if nut.tmin < now:
                    if nut.tmax > now:
                        nut.tmax = now

                    for subnut in sgaps(nut):
                        yield subnut

        path = self._source_id
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
            virtual_paths=[path])

    def remove_waveform_promises(self, squirrel, from_database='selection'):
        '''
        Remove waveform promises from live selection or global database.

        :param from_database:
            Remove from live selection ``'selection'`` or global database
            ``'global'``.
        '''

        path = self._source_id
        if from_database == 'selection':
            squirrel.remove(path)
        elif from_database == 'global':
            squirrel.get_database().remove(path)
        else:
            raise ValueError(
                'Values allowed for from_database: ("selection", "global")')

    def _get_user_credentials(self):
        '''
        Keyword arguments carrying the configured credentials for
        :py:func:`fdsn.dataselect` (``user``/``passwd`` and/or ``token``).
        '''
        d = {}
        if self.user_credentials is not None:
            d['user'], d['passwd'] = self.user_credentials

        if self.auth_token is not None or self.auth_token_path is not None:
            d['token'] = self.get_auth_token()

        return d

    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):

        '''
        Download waveforms for the given orders and store them in the archive.

        Orders are sorted in place and requested in batches of 20. Each
        order is reported through exactly one of the callbacks.

        :param orders: list of waveform orders (sorted in place)
        :param success: called with each order for which data was archived
        :param batch_add: called with the list of archived file paths after
            each batch which produced files
        :param error_permanent: called with each order which yielded no
            usable data
        :param error_temporary: called with each order whose request failed
            with an HTTP error
        '''

        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        while i < len(orders):
            orders_now = orders[i:i+neach]
            selection_now = orders_to_selection(orders_now)

            nsuccess = 0
            elog.append_checkpoint()
            self._log_info_data(
                'downloading, %s' % order_summary(orders_now))

            all_paths = []
            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    data = fdsn.dataselect(
                        site=self.site, selection=selection_now,
                        **self._get_user_credentials())

                    # Release the HTTP response even if writing fails.
                    try:
                        now = time.time()

                        path = op.join(tmpdir, 'tmp.mseed')
                        with open(path, 'wb') as f:
                            while True:
                                buf = data.read(1024)
                                if not buf:
                                    break
                                f.write(buf)
                    finally:
                        data.close()

                    trs = io.load(path)

                    by_nslc = defaultdict(list)
                    for tr in trs:
                        by_nslc[tr.nslc_id].append(tr)

                    for order in orders_now:
                        trs_order = []
                        err_this = None
                        # Adjacent orders may have been merged into one
                        # request, so each order cuts its own time window.
                        for tr in by_nslc[order.codes.nslc]:
                            try:
                                order.validate(tr)
                                trs_order.append(tr.chop(
                                    order.tmin, order.tmax, inplace=False))

                            except trace.NoData:
                                err_this = (
                                    'empty result', 'empty sub-interval')

                            except InvalidWaveform as e:
                                err_this = ('invalid waveform', str(e))

                        if len(trs_order) == 0:
                            if err_this is None:
                                err_this = ('empty result', '')

                            elog.append(now, order, *err_this)
                            error_permanent(order)
                        else:
                            if len(trs_order) != 1:
                                if err_this:
                                    elog.append(
                                        now, order,
                                        'partial result, %s' % err_this[0],
                                        err_this[1])
                                else:
                                    elog.append(now, order, 'partial result')

                            paths = self._archive.add(trs_order)
                            all_paths.extend(paths)

                            nsuccess += 1
                            success(order)

                except fdsn.EmptyResult:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'empty result')
                        error_permanent(order)

                except util.HTTPError as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'http error', str(e))
                        error_temporary(order)

            emessage = elog.summarize_recent()
            self._log_info_data(
                '%i download%s successful' % (
                    nsuccess, util.plural_s(nsuccess))
                + (', %s' % emessage if emessage else ''))

            if all_paths:
                batch_add(all_paths)

            i += neach
            task.update(i)

        for agg in elog.iter_aggregates():
            logger.warning(str(agg))

        task.done()

    def _do_response_query(self, selection):
        '''
        Query response-level station metadata for a selection list.

        :returns: the (hotfixed) station metadata, or an empty dummy
            :py:class:`FDSNStationXML` if the server reports no data
        '''
        extra_args = {}

        if self.site not in sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        self._log_responses('querying...')

        try:
            response_sx = fdsn.station(
                site=self.site,
                level='response',
                selection=selection,
                **extra_args)

            self._hotfix('response', response_sx)
            return response_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def update_response_inventory(self, squirrel, constraint):
        '''
        Refresh cached per-channel response files and add them to Squirrel.

        For each distinct channel in the channel file matching
        ``constraint``, a cached response file is reused unless it is
        missing or expired. The remaining channels are queried in batches of
        100. Channels missing from a query result get an empty dummy file.
        Failures are collected and logged per status, not raised.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts(
            'channel', path=cpath, codes=constraint.codes)

        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes.nslc
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if os.path.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        neach = 100
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                # Best effort: record the failure for this batch and go on.
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)
                    response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            # File may not be known to the database yet.
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            if k.find('failed') != -1:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(
                        CodesNSLCE(tup) for tup in status[k])),
                target=log_target)

        if fns:
            squirrel.add(fns, kinds=['response'])
# Public API of this module: only the data-source class is exported.
__all__ = [
    'FDSNSource',
]