1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6from __future__ import absolute_import, print_function
8import time
9import os
10import copy
11import logging
12import tempfile
13import importlib.util
14from collections import defaultdict
15try:
16 import cPickle as pickle
17except ImportError:
18 import pickle
19import os.path as op
20from .base import Source, Constraint
21from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \
22 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \
23 codes_to_str_abbreviated, CodesNSLCE
24from ..database import ExecuteGet1Error
25from pyrocko.client import fdsn
27from pyrocko import util, trace, io
28from pyrocko.io.io_common import FileLoadError
29from pyrocko.io import stationxml
30from pyrocko.progress import progress
31from pyrocko import has_paths
33from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \
34 Duration, Bool, clone
# Prefix under which the classes of this module are registered with guts
# (NOTE(review): exact semantics are defined by pyrocko.guts -- confirm).
guts_prefix = 'squirrel'

# Module-level side effect: overrides the timeout of the FDSN client module
# (presumably in seconds -- confirm against pyrocko.client.fdsn).
fdsn.g_timeout = 60.

logger = logging.getLogger('psq.client.fdsn')

# Query features mapped to the FDSN sites for which they must not be used;
# consulted when the station queries are assembled.
sites_not_supporting = {
    'startbefore': ['geonet'],
    'includerestricted': ['geonet']}
def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are passed on to ``progress.task``.
    '''
    task = progress.task(*args, logger=logger)
    return task
def diff(fn_a, fn_b):
    '''
    Check whether two files differ in size or content.

    :param fn_a: Path of the first file.
    :param fn_b: Path of the second file.
    :returns: ``True`` if the files differ or if the size of either file
        cannot be determined, ``False`` if their contents are identical.
    '''
    try:
        same_size = os.stat(fn_a).st_size == os.stat(fn_b).st_size
    except OSError:
        return True

    if not same_size:
        return True

    # Sizes agree, so the contents have to be compared chunk by chunk.
    with open(fn_a, 'rb') as file_a, open(fn_b, 'rb') as file_b:
        while True:
            chunk_a = file_a.read(1024)
            chunk_b = file_b.read(1024)
            if chunk_a != chunk_b:
                return True

            # Chunks are equal here; an empty one means both files ended.
            if not chunk_a or not chunk_b:
                return False
def move_or_keep(fn_temp, fn):
    '''
    Install a freshly written temporary file unless nothing has changed.

    :param fn_temp: Path of the temporary file holding the new content.
    :param fn: Path of the final file.
    :returns: ``'new'`` if ``fn`` did not exist before, ``'updated'`` if it
        existed with different content and has been replaced, or
        ``'upstream unchanged'`` if the content is identical. In the last
        case the temporary file is deleted and ``fn`` is left untouched.
    :raises OSError: If moving or deleting the temporary file fails.
    '''
    if not op.exists(fn):
        os.replace(fn_temp, fn)
        return 'new'

    if diff(fn, fn_temp):
        # os.replace overwrites an existing target on every platform,
        # whereas os.rename fails on Windows when the target exists.
        os.replace(fn_temp, fn)
        return 'updated'

    os.unlink(fn_temp)
    return 'upstream unchanged'
class Archive(Object):
    '''
    Base class for waveform archives.
    '''

    def add(self):
        '''
        Add data to the archive. Must be implemented in subclasses.
        '''
        raise NotImplementedError()
class MSeedArchive(Archive):
    '''
    Archive saving traces to files named according to a path template.

    The template is interpreted relative to a base path, which has to be set
    with :py:meth:`set_base_path` before traces can be added.
    '''

    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        + '_%(tmin_us)s_%(tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)
        self._base_path = None

    def set_base_path(self, path):
        '''
        Set the directory below which the files are stored.
        '''
        self._base_path = path

    def add(self, trs):
        '''
        Save traces into the archive, overwriting existing files.

        :param trs: Traces to be saved.
        :returns: Whatever ``io.save`` returns for the saved traces.
        '''
        path_template = op.join(self._base_path, self.template)
        saved = io.save(trs, path_template, overwrite=True)
        return saved
def combine_selections(selection):
    '''
    Merge consecutive selection rows which continue each other in time.

    A row is merged into its predecessor if the first four elements agree
    and its fifth element (start) equals the predecessor's sixth (end).

    :param selection: Iterable of 6-tuples, expected to be sorted such that
        rows to be merged are adjacent.
    :returns: List of combined rows.
    '''
    combined = []
    pending = None
    for row in selection:
        continues_pending = (
            pending
            and row[:4] == pending[:4]
            and row[4] == pending[5])

        if continues_pending:
            # Extend the pending row up to the end of the current one.
            pending = pending[:5] + (row[5],)
            continue

        if pending:
            combined.append(pending)

        pending = row

    if pending:
        combined.append(pending)

    return combined
def orders_sort_key(order):
    '''
    Sort key for waveform orders: by codes first, then by start time.
    '''
    key = (order.codes, order.tmin)
    return key
def orders_to_selection(orders):
    '''
    Convert waveform orders into a combined selection.

    The orders are sorted by codes and start time, each is turned into a row
    made of its NSLC codes followed by ``(tmin, tmax)``, and rows which
    continue each other are merged.

    :param orders: Iterable of waveform orders.
    :returns: List of selection rows.
    '''
    rows = [
        order.codes.nslc + (order.tmin, order.tmax)
        for order in sorted(orders, key=orders_sort_key)]

    return combine_selections(rows)
class ErrorEntry(Object):
    '''
    Record of a single failed or incomplete waveform download order.
    '''
    # Time at which the error was recorded.
    time = Timestamp.T()
    # The waveform order which could not be (completely) fulfilled.
    order = WaveformOrder.T()
    # Short error category, e.g. 'empty result' or 'http error'.
    kind = String.T()
    # Optional additional information about the error.
    details = String.T(optional=True)
class ErrorAggregate(Object):
    '''
    Download errors sharing the same kind and details, grouped for reporting.
    '''

    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes = List.T(CodesNSLCE.T())
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        '''
        Format a multi-line summary listing affected codes and time spans.
        '''
        codes = [str(x) for x in self.codes]
        if codes:
            scodes = '\n' + util.ewrap(codes, indent=' ')
        else:
            scodes = '<none>'

        span_strings = (
            '%s - %s' % (util.time_to_str(ts[0]), util.time_to_str(ts[1]))
            for ts in self.time_spans)

        sspans = '\n' + util.ewrap(span_strings, indent=' ')

        # The details line is left out entirely if there are no details.
        sdetails = ' Details: %s\n' % self.details if self.details else ''

        template = (
            'FDSN "%s": download error summary for "%s" (%i)\n%s '
            'Codes:%s\n Time spans:%s')

        return template % (
            self.site,
            self.kind,
            len(self.entries),
            sdetails,
            scodes,
            sspans)
class ErrorLog(Object):
    '''
    Log of download errors encountered at one FDSN site.

    Checkpoints mark positions in the list of entries, so that the errors
    which occurred since the most recent checkpoint can be summarized.
    '''

    site = String.T()
    entries = List.T(ErrorEntry.T())
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        '''
        Remember the current number of entries as a checkpoint.
        '''
        self.checkpoints.append(len(self.entries))

    def append(self, time, order, kind, details=''):
        '''
        Add an error entry to the log.
        '''
        self.entries.append(
            ErrorEntry(time=time, order=order, kind=kind, details=details))

    def iter_aggregates(self):
        '''
        Iterate over the logged errors, grouped by kind and details.

        :yields: One :py:class:`ErrorAggregate` per distinct combination of
            kind and details, in sorted order.
        '''
        grouped = defaultdict(list)
        for entry in self.entries:
            grouped[entry.kind, entry.details].append(entry)

        for key in sorted(grouped.keys()):
            kind, details = key
            group = grouped[key]
            codes = sorted(set(entry.order.codes for entry in group))
            selection = orders_to_selection(entry.order for entry in group)

            # The last two elements of each selection row are its time span.
            time_spans = sorted(set(row[-2:] for row in selection))

            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=group,
                codes=codes,
                time_spans=time_spans)

    def summarize_recent(self):
        '''
        Summarize the errors logged since the most recent checkpoint.

        :returns: Short summary string, empty if there are no recent errors.
        '''
        start = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[start:]
        kinds = sorted(set(entry.kind for entry in recent))
        if not recent:
            return ''

        return '%i error%s (%s)' % (
            len(recent), util.plural_s(recent), '; '.join(kinds))
class FDSNSource(Source, has_paths.HasPaths):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel` allows
    the latter to download station and waveform data from an FDSN web service
    should the data not already happen to be available locally.

    Downloaded channel metadata, responses and waveforms are kept in a cache
    directory. Except for shared waveforms, the cached data is kept in a
    subdirectory named after a hash computed from the site, the kind of
    credentials in use and the query arguments.
    '''

    # Site and query configuration; these enter the cache hash.
    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    # Caching behaviour.
    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             'environment\'s cache directory is used.')

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    # Authentication: user/password and/or a token given directly or by file.
    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    # Optional module with hook functions applied to downloaded StationXML.
    hotfix_module_path = has_paths.Path.T(
        optional=True,
        help='Path to Python module to locally patch metadata errors.')
276 def __init__(self, site, query_args=None, **kwargs):
277 Source.__init__(self, site=site, query_args=query_args, **kwargs)
279 self._constraint = None
280 self._hash = self.make_hash()
281 self._source_id = 'client:fdsn:%s' % self._hash
282 self._error_infos = []
284 def describe(self):
285 return self._source_id
287 def make_hash(self):
288 s = self.site
289 s += 'notoken' \
290 if (self.auth_token is None and self.auth_token_path is None) \
291 else 'token'
293 if self.user_credentials is not None:
294 s += self.user_credentials[0]
295 else:
296 s += 'nocred'
298 if self.query_args is not None:
299 s += ','.join(
300 '%s:%s' % (k, self.query_args[k])
301 for k in sorted(self.query_args.keys()))
302 else:
303 s += 'noqueryargs'
305 return ehash(s)
307 def get_hash(self):
308 return self._hash
310 def get_auth_token(self):
311 if self.auth_token:
312 return self.auth_token
314 elif self.auth_token_path is not None:
315 try:
316 with open(self.auth_token_path, 'rb') as f:
317 return f.read().decode('ascii')
319 except OSError as e:
320 raise FileLoadError(
321 'Cannot load auth token file (%s): %s'
322 % (str(e), self.auth_token_path))
324 else:
325 raise Exception(
326 'FDSNSource: auth_token and auth_token_path are mutually '
327 'exclusive.')
329 def setup(self, squirrel, check=True):
330 self._cache_path = op.join(
331 self.cache_path or squirrel._cache_path, 'fdsn')
333 util.ensuredir(self._cache_path)
334 self._load_constraint()
335 self._archive = MSeedArchive()
336 waveforms_path = self._get_waveforms_path()
337 util.ensuredir(waveforms_path)
338 self._archive.set_base_path(waveforms_path)
340 squirrel.add(
341 self._get_waveforms_path(),
342 check=check)
344 fn = self._get_channels_path()
345 if os.path.exists(fn):
346 squirrel.add(fn)
348 squirrel.add_virtual(
349 [], virtual_paths=[self._source_id])
351 responses_path = self._get_responses_path()
352 if os.path.exists(responses_path):
353 squirrel.add(responses_path, kinds=['response'])
355 self._hotfix_module = None
357 def _hotfix(self, query_type, sx):
358 if self.hotfix_module_path is None:
359 return
361 if self._hotfix_module is None:
362 module_path = self.expand_path(self.hotfix_module_path)
363 spec = importlib.util.spec_from_file_location(
364 'hotfix_' + self._hash, module_path)
365 self._hotfix_module = importlib.util.module_from_spec(spec)
366 spec.loader.exec_module(self._hotfix_module)
368 hook = getattr(
369 self._hotfix_module, 'stationxml_' + query_type + '_hook')
371 return hook(sx)
373 def _get_constraint_path(self):
374 return op.join(self._cache_path, self._hash, 'constraint.pickle')
376 def _get_channels_path(self):
377 return op.join(self._cache_path, self._hash, 'channels.stationxml')
379 def _get_responses_path(self, nslc=None):
380 dirpath = op.join(
381 self._cache_path, self._hash, 'responses')
383 if nslc is None:
384 return dirpath
385 else:
386 return op.join(
387 dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)
389 def _get_waveforms_path(self):
390 if self.shared_waveforms:
391 return op.join(self._cache_path, 'waveforms')
392 else:
393 return op.join(self._cache_path, self._hash, 'waveforms')
395 def _log_meta(self, message, target=logger.info):
396 log_prefix = 'FDSN "%s" metadata:' % self.site
397 target(' '.join((log_prefix, message)))
399 def _log_responses(self, message, target=logger.info):
400 log_prefix = 'FDSN "%s" responses:' % self.site
401 target(' '.join((log_prefix, message)))
403 def _log_info_data(self, *args):
404 log_prefix = 'FDSN "%s" waveforms:' % self.site
405 logger.info(' '.join((log_prefix,) + args))
407 def _str_expires(self, t, now):
408 if t is None:
409 return 'expires: never'
410 else:
411 expire = 'expires' if t > now else 'expired'
412 return '%s: %s' % (
413 expire,
414 util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))
416 def update_channel_inventory(self, squirrel, constraint=None):
417 if constraint is None:
418 constraint = Constraint()
420 expiration_time = self._get_channels_expiration_time()
421 now = time.time()
423 log_target = logger.info
424 if self._constraint and self._constraint.contains(constraint) \
425 and (expiration_time is None or now < expiration_time):
427 status = 'using cached'
429 else:
430 if self._constraint:
431 constraint_temp = copy.deepcopy(self._constraint)
432 constraint_temp.expand(constraint)
433 constraint = constraint_temp
435 try:
436 channel_sx = self._do_channel_query(constraint)
438 channel_sx.created = None # timestamp would ruin diff
440 fn = self._get_channels_path()
441 util.ensuredirs(fn)
442 fn_temp = fn + '.%i.temp' % os.getpid()
443 channel_sx.dump_xml(filename=fn_temp)
445 status = move_or_keep(fn_temp, fn)
447 if status == 'upstream unchanged':
448 squirrel.get_database().silent_touch(fn)
450 self._constraint = constraint
451 self._dump_constraint()
453 except OSError as e:
454 status = 'update failed (%s)' % str(e)
455 log_target = logger.error
457 expiration_time = self._get_channels_expiration_time()
458 self._log_meta(
459 '%s (%s)' % (status, self._str_expires(expiration_time, now)),
460 target=log_target)
462 fn = self._get_channels_path()
463 if os.path.exists(fn):
464 squirrel.add(fn)
466 def _do_channel_query(self, constraint):
467 extra_args = {}
469 if self.site in sites_not_supporting['startbefore']:
470 if constraint.tmin is not None and constraint.tmin != g_tmin:
471 extra_args['starttime'] = constraint.tmin
472 if constraint.tmax is not None and constraint.tmax != g_tmax:
473 extra_args['endtime'] = constraint.tmax
475 else:
476 if constraint.tmin is not None and constraint.tmin != g_tmin:
477 extra_args['endafter'] = constraint.tmin
478 if constraint.tmax is not None and constraint.tmax != g_tmax:
479 extra_args['startbefore'] = constraint.tmax
481 if self.site not in sites_not_supporting['includerestricted']:
482 extra_args.update(
483 includerestricted=(
484 self.user_credentials is not None
485 or self.auth_token is not None
486 or self.auth_token_path is not None))
488 if self.query_args is not None:
489 extra_args.update(self.query_args)
491 self._log_meta('querying...')
493 try:
494 channel_sx = fdsn.station(
495 site=self.site,
496 format='text',
497 level='channel',
498 **extra_args)
500 self._hotfix('channel', channel_sx)
502 return channel_sx
504 except fdsn.EmptyResult:
505 return stationxml.FDSNStationXML(source='dummy-empty-result')
507 def _load_constraint(self):
508 fn = self._get_constraint_path()
509 if op.exists(fn):
510 with open(fn, 'rb') as f:
511 self._constraint = pickle.load(f)
512 else:
513 self._constraint = None
515 def _dump_constraint(self):
516 with open(self._get_constraint_path(), 'wb') as f:
517 pickle.dump(self._constraint, f, protocol=2)
519 def _get_expiration_time(self, path):
520 if self.expires is None:
521 return None
523 try:
524 t = os.stat(path)[8]
525 return t + self.expires
527 except OSError:
528 return 0.0
530 def _get_channels_expiration_time(self):
531 return self._get_expiration_time(self._get_channels_path())
533 def update_waveform_promises(self, squirrel, constraint):
534 from ..base import gaps
535 now = time.time()
536 cpath = os.path.abspath(self._get_channels_path())
538 ctmin = constraint.tmin
539 ctmax = constraint.tmax
541 nuts = squirrel.iter_nuts(
542 'channel',
543 path=cpath,
544 codes=constraint.codes,
545 tmin=ctmin,
546 tmax=ctmax)
548 coverages = squirrel.get_coverage(
549 'waveform',
550 codes=constraint.codes if constraint.codes else None,
551 tmin=ctmin,
552 tmax=ctmax)
554 codes_to_avail = defaultdict(list)
555 for coverage in coverages:
556 for tmin, tmax, _ in coverage.iter_spans():
557 codes_to_avail[coverage.codes].append((tmin, tmax))
559 def sgaps(nut):
560 for tmin, tmax in gaps(
561 codes_to_avail[nut.codes],
562 max(ctmin, nut.tmin) if ctmin is not None else nut.tmin,
563 min(ctmax, nut.tmax) if ctmax is not None else nut.tmax):
565 subnut = clone(nut)
566 subnut.tmin = tmin
567 subnut.tmax = tmax
568 yield subnut
570 def wanted(nuts):
571 for nut in nuts:
572 if nut.tmin < now:
573 if nut.tmax > now:
574 nut.tmax = now
576 for nut in sgaps(nut):
577 yield nut
579 path = self._source_id
580 squirrel.add_virtual(
581 (make_waveform_promise_nut(
582 file_path=path,
583 **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
584 virtual_paths=[path])
586 def remove_waveform_promises(self, squirrel, from_database='selection'):
587 '''
588 Remove waveform promises from live selection or global database.
590 :param from_database:
591 Remove from live selection ``'selection'`` or global database
592 ``'global'``.
593 '''
595 path = self._source_id
596 if from_database == 'selection':
597 squirrel.remove(path)
598 elif from_database == 'global':
599 squirrel.get_database().remove(path)
600 else:
601 raise ValueError(
602 'Values allowed for from_database: ("selection", "global")')
604 def _get_user_credentials(self):
605 d = {}
606 if self.user_credentials is not None:
607 d['user'], d['passwd'] = self.user_credentials
609 if self.auth_token is not None or self.auth_token_path is not None:
610 d['token'] = self.get_auth_token()
612 return d
614 def download_waveforms(
615 self, orders, success, batch_add, error_permanent,
616 error_temporary):
618 elog = ErrorLog(site=self.site)
619 orders.sort(key=orders_sort_key)
620 neach = 20
621 i = 0
622 task = make_task(
623 'FDSN "%s" waveforms: downloading' % self.site, len(orders))
625 while i < len(orders):
626 orders_now = orders[i:i+neach]
627 selection_now = orders_to_selection(orders_now)
629 nsuccess = 0
630 elog.append_checkpoint()
631 self._log_info_data(
632 'downloading, %s' % order_summary(orders_now))
634 all_paths = []
635 with tempfile.TemporaryDirectory() as tmpdir:
636 try:
637 data = fdsn.dataselect(
638 site=self.site, selection=selection_now,
639 **self._get_user_credentials())
641 now = time.time()
643 path = op.join(tmpdir, 'tmp.mseed')
644 with open(path, 'wb') as f:
645 while True:
646 buf = data.read(1024)
647 if not buf:
648 break
649 f.write(buf)
651 trs = io.load(path)
653 by_nslc = defaultdict(list)
654 for tr in trs:
655 by_nslc[tr.nslc_id].append(tr)
657 for order in orders_now:
658 trs_order = []
659 err_this = None
660 for tr in by_nslc[order.codes.nslc]:
661 try:
662 order.validate(tr)
663 trs_order.append(tr.chop(
664 order.tmin, order.tmax, inplace=False))
666 except trace.NoData:
667 err_this = (
668 'empty result', 'empty sub-interval')
670 except InvalidWaveform as e:
671 err_this = ('invalid waveform', str(e))
673 if len(trs_order) == 0:
674 if err_this is None:
675 err_this = ('empty result', '')
677 elog.append(now, order, *err_this)
678 error_permanent(order)
679 else:
680 if len(trs_order) != 1:
681 if err_this:
682 elog.append(
683 now, order,
684 'partial result, %s' % err_this[0],
685 err_this[1])
686 else:
687 elog.append(now, order, 'partial result')
689 paths = self._archive.add(trs_order)
690 all_paths.extend(paths)
692 nsuccess += 1
693 success(order)
695 except fdsn.EmptyResult:
696 now = time.time()
697 for order in orders_now:
698 elog.append(now, order, 'empty result')
699 error_permanent(order)
701 except util.HTTPError as e:
702 now = time.time()
703 for order in orders_now:
704 elog.append(now, order, 'http error', str(e))
705 error_temporary(order)
707 emessage = elog.summarize_recent()
708 self._log_info_data(
709 '%i download%s successful' % (
710 nsuccess, util.plural_s(nsuccess))
711 + (', %s' % emessage if emessage else ''))
713 if all_paths:
714 batch_add(all_paths)
716 i += neach
717 task.update(i)
719 for agg in elog.iter_aggregates():
720 logger.warning(str(agg))
722 task.done()
724 def _do_response_query(self, selection):
725 extra_args = {}
727 if self.site not in sites_not_supporting['includerestricted']:
728 extra_args.update(
729 includerestricted=(
730 self.user_credentials is not None
731 or self.auth_token is not None
732 or self.auth_token_path is not None))
734 self._log_responses('querying...')
736 try:
737 response_sx = fdsn.station(
738 site=self.site,
739 level='response',
740 selection=selection,
741 **extra_args)
743 self._hotfix('response', response_sx)
744 return response_sx
746 except fdsn.EmptyResult:
747 return stationxml.FDSNStationXML(source='dummy-empty-result')
749 def update_response_inventory(self, squirrel, constraint):
750 cpath = os.path.abspath(self._get_channels_path())
751 nuts = squirrel.iter_nuts(
752 'channel', path=cpath, codes=constraint.codes)
754 tmin = g_tmin_queries
755 tmax = g_tmax
757 selection = []
758 now = time.time()
759 have = set()
760 status = defaultdict(list)
761 for nut in nuts:
762 nslc = nut.codes.nslc
763 if nslc in have:
764 continue
765 have.add(nslc)
767 fn = self._get_responses_path(nslc)
768 expiration_time = self._get_expiration_time(fn)
769 if os.path.exists(fn) \
770 and (expiration_time is None or now < expiration_time):
771 status['using cached'].append(nslc)
772 else:
773 selection.append(nslc + (tmin, tmax))
775 dummy = stationxml.FDSNStationXML(source='dummy-empty')
776 neach = 100
777 i = 0
778 fns = []
779 while i < len(selection):
780 selection_now = selection[i:i+neach]
781 i += neach
783 try:
784 sx = self._do_response_query(selection_now)
785 except Exception as e:
786 status['update failed (%s)' % str(e)].extend(
787 entry[:4] for entry in selection_now)
788 continue
790 sx.created = None # timestamp would ruin diff
792 by_nslc = dict(stationxml.split_channels(sx))
794 for entry in selection_now:
795 nslc = entry[:4]
796 response_sx = by_nslc.get(nslc, dummy)
797 try:
798 fn = self._get_responses_path(nslc)
799 fn_temp = fn + '.%i.temp' % os.getpid()
801 util.ensuredirs(fn_temp)
802 response_sx.dump_xml(filename=fn_temp)
804 status_this = move_or_keep(fn_temp, fn)
806 if status_this == 'upstream unchanged':
807 try:
808 squirrel.get_database().silent_touch(fn)
809 except ExecuteGet1Error:
810 pass
812 status[status_this].append(nslc)
813 fns.append(fn)
815 except OSError as e:
816 status['update failed (%s)' % str(e)].append(nslc)
818 for k in sorted(status):
819 if k.find('failed') != -1:
820 log_target = logger.error
821 else:
822 log_target = logger.info
824 self._log_responses(
825 '%s: %s' % (
826 k, codes_to_str_abbreviated(
827 CodesNSLCE(tup) for tup in status[k])),
828 target=log_target)
830 if fns:
831 squirrel.add(fns, kinds=['response'])
# Public names of this module.
__all__ = [
    'FDSNSource',
]