1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import time
7import os
8import copy
9import logging
10import tempfile
11import importlib.util
12from collections import defaultdict
13try:
14 import cPickle as pickle
15except ImportError:
16 import pickle
17import os.path as op
18from .base import Source, Constraint
19from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \
20 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \
21 codes_to_str_abbreviated, CodesNSLCE
22from ..database import ExecuteGet1Error
23from pyrocko.squirrel.error import SquirrelError
24from pyrocko.client import fdsn
26from pyrocko import util, trace, io
27from pyrocko.io.io_common import FileLoadError
28from pyrocko.io import stationxml
29from pyrocko.progress import progress
30from pyrocko import has_paths
32from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \
33 Duration, Bool, clone
guts_prefix = 'squirrel'

# Global timeout [s] applied to FDSN web service requests.
fdsn.g_timeout = 60.

logger = logging.getLogger('psq.client.fdsn')

# FDSN sites known not to support certain station-query parameters.
# Queries to sites listed under a given key avoid that parameter and fall
# back to an alternative formulation (see FDSNSource._do_channel_query and
# FDSNSource._do_response_query).
sites_not_supporting = {
    'startbefore': ['geonet'],
    'includerestricted': ['geonet']}
def make_task(*args):
    '''
    Create a progress-reporting task bound to this module's logger.
    '''
    task = progress.task(*args, logger=logger)
    return task
def diff(fn_a, fn_b):
    '''
    Check whether the contents of two files differ.

    Returns ``True`` if the files have different sizes, if either file
    cannot be stat'ed, or if any chunk of their contents differs.
    Returns ``False`` only when both files are byte-identical.
    '''
    try:
        size_a = os.stat(fn_a).st_size
        size_b = os.stat(fn_b).st_size
    except OSError:
        return True

    if size_a != size_b:
        return True

    # Sizes match: compare contents chunk by chunk.
    with open(fn_a, 'rb') as fa, open(fn_b, 'rb') as fb:
        while True:
            chunk_a = fa.read(1024)
            chunk_b = fb.read(1024)
            if chunk_a != chunk_b:
                return True

            if not chunk_a or not chunk_b:
                return False
def move_or_keep(fn_temp, fn):
    '''
    Atomically install temporary file *fn_temp* at *fn*, unless unchanged.

    If *fn* does not yet exist, *fn_temp* is renamed into place. If it
    exists and differs from *fn_temp*, it is replaced. If the contents are
    identical, the temporary file is discarded and *fn* is left untouched.

    :returns:
        Status string: ``'new'``, ``'updated'`` or ``'upstream unchanged'``.
    '''
    if not op.exists(fn):
        os.rename(fn_temp, fn)
        return 'new'

    if diff(fn, fn_temp):
        os.rename(fn_temp, fn)
        return 'updated'

    os.unlink(fn_temp)
    return 'upstream unchanged'
class Archive(Object):
    '''
    Base class for storage backends keeping downloaded waveform data.
    '''

    def add(self):
        # Subclasses must implement storing of downloaded data.
        raise NotImplementedError()
class MSeedArchive(Archive):
    '''
    Archive storing downloaded waveforms as Mini-SEED files.

    Files are laid out below a base path (set with :py:meth:`set_base_path`)
    according to :py:attr:`template`, split by date and channel codes.
    '''

    # Path template, interpolated by io.save() with trace attributes and the
    # additional block_tmin_us/block_tmax_us values supplied in add().
    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        + '_%(block_tmin_us)s_%(block_tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)
        self._base_path = None

    def set_base_path(self, path):
        # Directory below which all archive files are written.
        self._base_path = path

    def add(self, order, trs):
        '''
        Save traces *trs* received for *order* below the archive base path.

        :returns:
            Paths of the files written (return value of ``io.save``).
        '''
        path = op.join(self._base_path, self.template)
        # Microsecond-precision timestamps of the order's time window are
        # embedded in the file names to disambiguate blocks.
        fmt = '%Y-%m-%d_%H-%M-%S.6FRAC'
        return io.save(trs, path, overwrite=True, additional=dict(
            block_tmin_us=util.time_to_str(order.tmin, format=fmt),
            block_tmax_us=util.time_to_str(order.tmax, format=fmt)))
def combine_selections(selection):
    '''
    Merge back-to-back time windows in a dataselect selection list.

    Consecutive entries sharing the same first four elements (channel codes)
    are fused into a single entry when the end time of one exactly equals
    the start time of the next. Entries are assumed to be pre-sorted so that
    mergeable windows are adjacent.
    '''
    merged = []
    pending = None
    for entry in selection:
        contiguous = (
            pending is not None
            and entry[:4] == pending[:4]
            and entry[4] == pending[5])

        if contiguous:
            # Extend the pending window to the end of this entry.
            pending = pending[:5] + (entry[5],)
        else:
            if pending is not None:
                merged.append(pending)
            pending = entry

    if pending is not None:
        merged.append(pending)

    return merged
def orders_sort_key(order):
    '''
    Sort key for waveform orders: group by channel codes, then start time.
    '''
    key = (order.codes, order.tmin)
    return key
def orders_to_selection(orders, pad=1.0):
    '''
    Convert waveform orders to an FDSN dataselect selection list.

    Each order contributes an entry ``(net, sta, loc, cha, tmin, tmax)``
    where the time window is padded on both sides by *pad* sample intervals
    (default: one sample, as before this parameter existed) to avoid losing
    edge samples to rounding. Adjacent windows of the same channel are
    merged via :py:func:`combine_selections`.

    :param orders:
        Iterable of waveform orders with ``codes``, ``tmin``, ``tmax`` and
        ``deltat`` attributes.
    :param pad:
        Padding in multiples of the sampling interval ``deltat``.
    '''
    selection = []
    for order in sorted(orders, key=orders_sort_key):
        selection.append(
            order.codes.nslc + (
                order.tmin - pad * order.deltat,
                order.tmax + pad * order.deltat))

    return combine_selections(selection)
class ErrorEntry(Object):
    '''
    Single error event recorded during waveform download.
    '''
    time = Timestamp.T()               # when the error occurred
    order = WaveformOrder.T()          # the waveform order that failed
    kind = String.T()                  # short category, e.g. 'empty result'
    details = String.T(optional=True)  # free-form extra information
class ErrorAggregate(Object):
    '''
    Group of download errors of a common kind from one FDSN site.
    '''
    site = String.T()                              # FDSN site name/url
    kind = String.T()                              # shared error category
    details = String.T()                           # shared detail string
    entries = List.T(ErrorEntry.T())               # individual error events
    codes = List.T(CodesNSLCE.T())                 # affected channel codes
    time_spans = List.T(Tuple.T(2, Timestamp.T())) # affected time windows

    def __str__(self):
        # Human-readable multi-line summary used in log warnings.
        codes = [str(x) for x in self.codes]
        scodes = '\n' + util.ewrap(codes, indent='    ') if codes else '<none>'
        tss = self.time_spans
        sspans = '\n' + util.ewrap(('%s - %s' % (
            util.time_to_str(ts[0]), util.time_to_str(ts[1])) for ts in tss),
            indent='   ')

        return ('FDSN "%s": download error summary for "%s" (%i)\n%s  '
                'Codes:%s\n  Time spans:%s') % (
            self.site,
            self.kind,
            len(self.entries),
            '  Details: %s\n' % self.details if self.details else '',
            scodes,
            sspans)
class ErrorLog(Object):
    '''
    Collector for download errors of one FDSN source.

    Entries accumulate over the whole download session; checkpoints mark
    batch boundaries so that recent errors can be summarized separately.
    '''
    site = String.T()                 # FDSN site this log belongs to
    entries = List.T(ErrorEntry.T())  # all recorded errors, in order
    checkpoints = List.T(Int.T())     # entry counts at batch boundaries

    def append_checkpoint(self):
        # Remember the current number of entries; summarize_recent() reports
        # only entries added after the most recent checkpoint.
        self.checkpoints.append(len(self.entries))

    def append(self, time, order, kind, details=''):
        '''
        Record one error event for *order*.
        '''
        entry = ErrorEntry(time=time, order=order, kind=kind, details=details)
        self.entries.append(entry)

    def iter_aggregates(self):
        '''
        Yield :py:class:`ErrorAggregate` objects, one per (kind, details)
        combination, grouping affected codes and time spans.
        '''
        by_kind_details = defaultdict(list)
        for entry in self.entries:
            by_kind_details[entry.kind, entry.details].append(entry)

        kind_details = sorted(by_kind_details.keys())

        for kind, details in kind_details:
            entries = by_kind_details[kind, details]
            codes = sorted(set(entry.order.codes for entry in entries))
            selection = orders_to_selection(entry.order for entry in entries)
            # Last two elements of each selection row are (tmin, tmax).
            time_spans = sorted(set(row[-2:] for row in selection))
            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=entries,
                codes=codes,
                time_spans=time_spans)

    def summarize_recent(self):
        '''
        Return a one-line summary of errors since the last checkpoint, or
        an empty string if there were none.
        '''
        ioff = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[ioff:]
        kinds = sorted(set(entry.kind for entry in recent))
        if recent:
            return '%i error%s (%s)' % (
                len(recent), util.plural_s(recent), '; '.join(kinds))
        else:
            return ''
class Aborted(SquirrelError):
    '''
    Raised internally to cancel a waveform download midway (e.g. when far
    more data is received than was estimated).
    '''
    pass
class FDSNSource(Source, has_paths.HasPaths):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel` allows
    the latter to download station and waveform data from an FDSN web service
    should the data not already happen to be available locally.
    '''

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             "environment's cache directory is used.")

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    hotfix_module_path = has_paths.Path.T(
        optional=True,
        help='Path to Python module to locally patch metadata errors.')

    def __init__(self, site, query_args=None, **kwargs):
        Source.__init__(self, site=site, query_args=query_args, **kwargs)

        self._constraint = None
        # Hash over site/credentials/query-args identifies this source's
        # private cache area and its virtual path in the database.
        self._hash = self.make_hash()
        self._source_id = 'client:fdsn:%s' % self._hash
        self._error_infos = []

    def describe(self):
        # Stable identifier string of this source.
        return self._source_id

    def make_hash(self):
        '''
        Compute hash over the source configuration (site, presence of
        credentials/token, and query arguments).
        '''
        s = self.site
        s += 'notoken' \
            if (self.auth_token is None and self.auth_token_path is None) \
            else 'token'

        if self.user_credentials is not None:
            s += self.user_credentials[0]
        else:
            s += 'nocred'

        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        return self._hash

    def get_auth_token(self):
        '''
        Return the authentication token, either directly configured or read
        from :py:attr:`auth_token_path`.

        :raises FileLoadError: if the token file cannot be read.
        '''
        if self.auth_token:
            return self.auth_token

        elif self.auth_token_path is not None:
            try:
                with open(self.auth_token_path, 'rb') as f:
                    return f.read().decode('ascii')

            except OSError as e:
                raise FileLoadError(
                    'Cannot load auth token file (%s): %s'
                    % (str(e), self.auth_token_path))

        else:
            raise Exception(
                'FDSNSource: auth_token and auth_token_path are mutually '
                'exclusive.')

    def setup(self, squirrel, check=True):
        '''
        Hook called when the source is attached to a Squirrel instance.

        Prepares the cache directory layout, loads the cached query
        constraint, and registers already-cached channels, waveforms and
        responses with *squirrel*.
        '''
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()
        self._archive = MSeedArchive()
        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive.set_base_path(waveforms_path)

        squirrel.add(
            self._get_waveforms_path(),
            check=check)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

        # Register this source's virtual path for waveform promises.
        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if os.path.exists(responses_path):
            squirrel.add(responses_path, kinds=['response'])

        # Hotfix module is loaded lazily on first use (see _hotfix).
        self._hotfix_module = None

    def _hotfix(self, query_type, sx):
        '''
        Apply user-supplied metadata patch hook to StationXML object *sx*.

        Loads the module at :py:attr:`hotfix_module_path` on first use and
        calls its ``stationxml_<query_type>_hook`` function in place.
        '''
        if self.hotfix_module_path is None:
            return

        if self._hotfix_module is None:
            module_path = self.expand_path(self.hotfix_module_path)
            spec = importlib.util.spec_from_file_location(
                'hotfix_' + self._hash, module_path)
            self._hotfix_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(self._hotfix_module)

        hook = getattr(
            self._hotfix_module, 'stationxml_' + query_type + '_hook')

        return hook(sx)

    def _get_constraint_path(self):
        # Pickled record of the constraint covered by the cached channels.
        return op.join(self._cache_path, self._hash, 'constraint.pickle')

    def _get_channels_path(self):
        # Cached channel-level StationXML of this source.
        return op.join(self._cache_path, self._hash, 'channels.stationxml')

    def _get_responses_path(self, nslc=None):
        '''
        Path of the responses cache directory, or, if *nslc* is given, of
        the per-channel response StationXML file.
        '''
        dirpath = op.join(
            self._cache_path, self._hash, 'responses')

        if nslc is None:
            return dirpath
        else:
            return op.join(
                dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)

    def _get_waveforms_path(self):
        # Shared waveforms live in a site-independent directory so multiple
        # FDSN sources can reuse them; otherwise they are kept per-source.
        if self.shared_waveforms:
            return op.join(self._cache_path, 'waveforms')
        else:
            return op.join(self._cache_path, self._hash, 'waveforms')

    def _log_meta(self, message, target=logger.info):
        # Log helper for station-metadata related messages.
        log_prefix = 'FDSN "%s" metadata:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_responses(self, message, target=logger.info):
        # Log helper for response related messages.
        log_prefix = 'FDSN "%s" responses:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_info_data(self, *args):
        # Log helper for waveform download messages.
        log_prefix = 'FDSN "%s" waveforms:' % self.site
        logger.info(' '.join((log_prefix,) + args))

    def _str_expires(self, t, now):
        # Human-readable expiration info for log messages.
        if t is None:
            return 'expires: never'
        else:
            expire = 'expires' if t > now else 'expired'
            return '%s: %s' % (
                expire,
                util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))

    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Refresh the cached channel metadata if needed.

        The cached StationXML is reused when it still covers *constraint*
        and has not expired; otherwise a (possibly expanded) query is sent
        to the FDSN site and the cache file is updated in place.
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                # Widen the query so the cache keeps covering everything
                # requested so far.
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                fn_temp = fn + '.%i.temp' % os.getpid()
                channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    # Bump the file's mtime in the database so the
                    # expiration clock restarts without re-indexing.
                    squirrel.get_database().silent_touch(fn)

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

    def _do_channel_query(self, constraint):
        '''
        Perform the channel-level FDSN station query for *constraint*.

        Returns an (optionally hotfixed) StationXML object; an empty dummy
        document is returned when the site reports no matching data.
        '''
        extra_args = {}

        # Sites without 'startbefore' support get a plain time-window query;
        # others get the epoch-overlap formulation (endafter/startbefore).
        if self.site in sites_not_supporting['startbefore']:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['starttime'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['endtime'] = constraint.tmax

        else:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['endafter'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['startbefore'] = constraint.tmax

        if self.site not in sites_not_supporting['includerestricted']:
            # Request restricted data only if we can authenticate.
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        if self.query_args is not None:
            extra_args.update(self.query_args)

        self._log_meta('querying...')

        try:
            channel_sx = fdsn.station(
                site=self.site,
                format='text',
                level='channel',
                **extra_args)

            self._hotfix('channel', channel_sx)

            return channel_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def _load_constraint(self):
        # Restore the pickled constraint describing what the channel cache
        # already covers; None if nothing has been cached yet.
        fn = self._get_constraint_path()
        if op.exists(fn):
            with open(fn, 'rb') as f:
                self._constraint = pickle.load(f)
        else:
            self._constraint = None

    def _dump_constraint(self):
        with open(self._get_constraint_path(), 'wb') as f:
            pickle.dump(self._constraint, f, protocol=2)

    def _get_expiration_time(self, path):
        '''
        Expiration timestamp for cache file at *path*, ``None`` if caching
        never expires, ``0.0`` if the file is missing/unreadable.
        '''
        if self.expires is None:
            return None

        try:
            t = os.stat(path)[8]  # index 8 is st_mtime
            return t + self.expires

        except OSError:
            return 0.0

    def _get_channels_expiration_time(self):
        return self._get_expiration_time(self._get_channels_path())

    def update_waveform_promises(self, squirrel, constraint):
        '''
        Insert waveform promises for data this source could download.

        For every channel matching *constraint*, the gaps between locally
        available waveform coverage and the constraint's time span are
        turned into promise nuts on this source's virtual path.
        '''
        from ..base import gaps
        now = time.time()
        cpath = os.path.abspath(self._get_channels_path())

        ctmin = constraint.tmin
        ctmax = constraint.tmax

        nuts = squirrel.iter_nuts(
            'channel',
            path=cpath,
            codes=constraint.codes,
            tmin=ctmin,
            tmax=ctmax)

        coverages = squirrel.get_coverage(
            'waveform',
            codes=constraint.codes if constraint.codes else None,
            tmin=ctmin,
            tmax=ctmax)

        # Map channel codes to list of locally covered (tmin, tmax) spans.
        codes_to_avail = defaultdict(list)
        for coverage in coverages:
            for tmin, tmax, _ in coverage.iter_spans():
                codes_to_avail[coverage.codes].append((tmin, tmax))

        def sgaps(nut):
            # Yield sub-nuts for the uncovered parts of nut's time span.
            for tmin, tmax in gaps(
                    codes_to_avail[nut.codes],
                    max(ctmin, nut.tmin) if ctmin is not None else nut.tmin,
                    min(ctmax, nut.tmax) if ctmax is not None else nut.tmax):

                subnut = clone(nut)
                subnut.tmin = tmin
                subnut.tmax = tmax

                # ignore 1-sample gaps produced by rounding errors
                if subnut.tmax - subnut.tmin < 2*subnut.deltat:
                    continue

                yield subnut

        def wanted(nuts):
            # Only promise data in the past; clip open-ended spans at 'now'.
            for nut in nuts:
                if nut.tmin < now:
                    if nut.tmax > now:
                        nut.tmax = now

                    for nut in sgaps(nut):
                        yield nut

        path = self._source_id
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
            virtual_paths=[path])

    def remove_waveform_promises(self, squirrel, from_database='selection'):
        '''
        Remove waveform promises from live selection or global database.

        :param from_database:
            Remove from live selection ``'selection'`` or global database
            ``'global'``.
        '''

        path = self._source_id
        if from_database == 'selection':
            squirrel.remove(path)
        elif from_database == 'global':
            squirrel.get_database().remove(path)
        else:
            raise ValueError(
                'Values allowed for from_database: ("selection", "global")')

    def _get_user_credentials(self):
        # Keyword arguments for fdsn.dataselect() authentication.
        d = {}
        if self.user_credentials is not None:
            d['user'], d['passwd'] = self.user_credentials

        if self.auth_token is not None or self.auth_token_path is not None:
            d['token'] = self.get_auth_token()

        return d

    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):
        '''
        Download waveforms for *orders* in batches and archive them.

        :param success: callback invoked per successfully fulfilled order.
        :param batch_add: callback invoked with the paths written per batch.
        :param error_permanent: callback per order failed for good.
        :param error_temporary: callback per order that may be retried.
        '''

        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20  # number of orders requested per dataselect call
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        while i < len(orders):
            orders_now = orders[i:i+neach]
            selection_now = orders_to_selection(orders_now)
            nsamples_estimate = sum(
                order.estimate_nsamples() for order in orders_now)

            nsuccess = 0
            elog.append_checkpoint()
            self._log_info_data(
                'downloading, %s' % order_summary(orders_now))

            all_paths = []
            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    data = fdsn.dataselect(
                        site=self.site, selection=selection_now,
                        **self._get_user_credentials())

                    now = time.time()

                    # Stream the response to a temporary file, guarding
                    # against unexpectedly large downloads.
                    path = op.join(tmpdir, 'tmp.mseed')
                    with open(path, 'wb') as f:
                        nread = 0
                        while True:
                            buf = data.read(1024)
                            nread += len(buf)
                            if not buf:
                                break
                            f.write(buf)

                            # abort if we get way more data than expected
                            if nread > max(
                                    1024 * 1000,
                                    nsamples_estimate * 4 * 10):

                                raise Aborted('Too much data received.')

                    trs = io.load(path)

                    by_nslc = defaultdict(list)
                    for tr in trs:
                        by_nslc[tr.nslc_id].append(tr)

                    for order in orders_now:
                        trs_order = []
                        err_this = None
                        for tr in by_nslc[order.codes.nslc]:
                            try:
                                order.validate(tr)
                                trs_order.append(tr.chop(
                                    order.tmin, order.tmax, inplace=False))

                            except trace.NoData:
                                err_this = (
                                    'empty result', 'empty sub-interval')

                            except InvalidWaveform as e:
                                err_this = ('invalid waveform', str(e))

                        if len(trs_order) == 0:
                            if err_this is None:
                                err_this = ('empty result', '')

                            elog.append(now, order, *err_this)
                            error_permanent(order)
                        else:
                            def tsame(ta, tb):
                                # Equal within two samples tolerance.
                                return abs(tb - ta) < 2 * order.deltat

                            # Log partial results but archive them anyway.
                            if len(trs_order) != 1 \
                                    or not tsame(
                                        trs_order[0].tmin, order.tmin) \
                                    or not tsame(
                                        trs_order[0].tmax, order.tmax):

                                if err_this:
                                    elog.append(
                                        now, order,
                                        'partial result, %s' % err_this[0],
                                        err_this[1])
                                else:
                                    elog.append(now, order, 'partial result')

                            paths = self._archive.add(order, trs_order)
                            all_paths.extend(paths)

                            nsuccess += 1
                            success(order)

                except fdsn.EmptyResult:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'empty result')
                        error_permanent(order)

                except Aborted as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'aborted', str(e))
                        error_permanent(order)

                except util.HTTPError as e:
                    # HTTP errors may be transient; mark orders retryable.
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'http error', str(e))
                        error_temporary(order)

            emessage = elog.summarize_recent()

            self._log_info_data(
                '%i download%s %ssuccessful' % (
                    nsuccess,
                    util.plural_s(nsuccess),
                    '(partially) ' if emessage else '')
                + (', %s' % emessage if emessage else ''))

            if all_paths:
                batch_add(all_paths)

            i += neach
            task.update(i)

        for agg in elog.iter_aggregates():
            logger.warning(str(agg))

        task.done()

    def _do_response_query(self, selection):
        '''
        Perform the response-level FDSN station query for *selection*.

        Returns an (optionally hotfixed) StationXML object; an empty dummy
        document is returned when the site reports no matching data.
        '''
        extra_args = {}

        if self.site not in sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        self._log_responses('querying...')

        try:
            response_sx = fdsn.station(
                site=self.site,
                level='response',
                selection=selection,
                **extra_args)

            self._hotfix('response', response_sx)
            return response_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def update_response_inventory(self, squirrel, constraint):
        '''
        Refresh per-channel instrument response files as needed.

        Channels with a fresh cached response file are skipped; the rest
        are queried in batches and the split responses stored one file per
        channel.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts(
            'channel', path=cpath, codes=constraint.codes)

        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes.nslc
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if os.path.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        # Placeholder written for channels the query returned no response
        # for, so they are not re-queried until expiration.
        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        neach = 100
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)
                    response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            # File not yet known to the database; harmless.
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            if k.find('failed') != -1:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(
                        CodesNSLCE(tup) for tup in status[k])),
                target=log_target)

        if fns:
            squirrel.add(fns, kinds=['response'])
# Public API of this module.
__all__ = [
    'FDSNSource',
]