1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import time
7import os
8import copy
9import logging
10import tempfile
11import importlib.util
12from collections import defaultdict
13try:
14 import cPickle as pickle
15except ImportError:
16 import pickle
17import os.path as op
18from .base import Source, Constraint
19from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \
20 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \
21 codes_to_str_abbreviated, CodesNSLCE
22from ..database import ExecuteGet1Error
23from pyrocko.client import fdsn
25from pyrocko import util, trace, io
26from pyrocko.io.io_common import FileLoadError
27from pyrocko.io import stationxml
28from pyrocko.progress import progress
29from pyrocko import has_paths
31from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \
32 Duration, Bool, clone
# NOTE(review): presumably picked up by pyrocko.guts as the type name prefix
# for the Object subclasses declared in this module -- confirm.
guts_prefix = 'squirrel'

# Override the FDSN client's module-level timeout setting on import.
# NOTE(review): presumably given in seconds -- confirm in pyrocko.client.fdsn.
fdsn.g_timeout = 60.

logger = logging.getLogger('psq.client.fdsn')

# Query features mapped to the sites for which the queries built in this
# module avoid the corresponding arguments.
sites_not_supporting = {
    'startbefore': ['geonet'],
    'includerestricted': ['geonet']}
def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are handed over unchanged to ``progress.task``.
    '''
    task = progress.task(*args, logger=logger)
    return task
def diff(fn_a, fn_b):
    '''
    Check whether the contents of two files differ.

    :param fn_a: Path of the first file.
    :param fn_b: Path of the second file.

    :returns:
        ``True`` if the file sizes or contents differ, or if the size of
        either file cannot be determined, ``False`` if the files are
        byte-wise identical.
    '''
    try:
        size_a = os.stat(fn_a).st_size
        size_b = os.stat(fn_b).st_size
    except OSError:
        return True

    if size_a != size_b:
        return True

    chunk_size = 1024
    with open(fn_a, 'rb') as file_a, open(fn_b, 'rb') as file_b:
        chunk_a = file_a.read(chunk_size)
        chunk_b = file_b.read(chunk_size)
        while chunk_a == chunk_b:
            if not chunk_a:
                # Both files are exhausted without any mismatch.
                return False

            chunk_a = file_a.read(chunk_size)
            chunk_b = file_b.read(chunk_size)

    return True
def move_or_keep(fn_temp, fn):
    '''
    Install a freshly written temporary file, unless nothing has changed.

    :param fn_temp: Path of the temporary file holding the new content.
    :param fn: Path of the final destination.

    :returns:
        ``'new'`` if ``fn`` did not exist and ``fn_temp`` was moved there,
        ``'updated'`` if ``fn`` existed with different content and has been
        replaced, ``'upstream unchanged'`` if ``fn`` already had the same
        content, in which case ``fn_temp`` is deleted and ``fn`` is left
        untouched.
    '''
    if not op.exists(fn):
        os.replace(fn_temp, fn)
        return 'new'

    if diff(fn, fn_temp):
        # os.replace overwrites an existing destination on all platforms,
        # whereas os.rename fails on Windows when the destination exists.
        os.replace(fn_temp, fn)
        return 'updated'

    os.unlink(fn_temp)
    return 'upstream unchanged'
class Archive(Object):
    '''
    Base class for archives; subclasses have to implement :py:meth:`add`.
    '''

    def add(self):
        '''
        Add data to the archive (not implemented in the base class).
        '''
        raise NotImplementedError
class MSeedArchive(Archive):
    '''
    Archive which saves traces to files below a base directory.

    The file paths are built by joining the base path, set with
    :py:meth:`set_base_path`, with the ``template`` attribute.
    '''

    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        + '_%(block_tmin_us)s_%(block_tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)
        # No base path until set_base_path() has been called.
        self._base_path = None

    def set_base_path(self, path):
        '''
        Set the directory below which files are saved.
        '''
        self._base_path = path

    def add(self, order, trs):
        '''
        Save the traces obtained for a waveform order.

        :param order: Order providing ``tmin`` and ``tmax`` of the block.
        :param trs: Traces to be saved.

        :returns: The return value of ``io.save``.
        '''
        path_template = op.join(self._base_path, self.template)
        time_format = '%Y-%m-%d_%H-%M-%S.6FRAC'

        # The time span of the order is made available to the path template.
        block_times = dict(
            block_tmin_us=util.time_to_str(order.tmin, format=time_format),
            block_tmax_us=util.time_to_str(order.tmax, format=time_format))

        return io.save(
            trs, path_template, overwrite=True, additional=block_times)
def combine_selections(selection):
    '''
    Merge consecutive selection rows which join seamlessly in time.

    :param selection:
        Iterable of tuples ``(net, sta, loc, cha, tmin, tmax)``. A row is
        only compared with its direct predecessor, so the rows should be
        sorted by codes and time.

    :returns:
        List of rows, where each run of rows with equal codes and with
        ``tmin`` equal to the ``tmax`` of the preceding row is replaced by a
        single row spanning the whole run.
    '''
    out = []
    last = None
    for this in selection:
        if last and this[:4] == last[:4] and this[4] == last[5]:
            # Extend the pending row up to the end of the current one.
            last = last[:5] + (this[5],)
        else:
            if last:
                out.append(last)

            last = this

    if last:
        out.append(last)

    return out


def orders_sort_key(order):
    '''
    Sort key to order waveform orders by codes, then by start time.
    '''
    return (order.codes, order.tmin)


def orders_to_selection(orders, pad=1.0):
    '''
    Convert waveform orders into selection rows for a waveform query.

    Orders with equal codes which are contiguous in time are merged into one
    row. Merging is done before padding: if the rows were padded first, the
    end of one row could never equal the start of the next one and
    contiguous orders would never be merged.

    :param orders:
        Iterable of orders providing ``codes`` (with attribute ``nslc``),
        ``tmin``, ``tmax`` and ``deltat``.
    :param pad:
        Padding added at both ends of each merged row, in units of the
        sampling interval. If orders of one channel have different
        ``deltat``, the largest one is used.

    :returns:
        List of tuples ``(net, sta, loc, cha, tmin, tmax)``.
    '''
    selection = []
    nslc_to_deltat = {}
    for order in sorted(orders, key=orders_sort_key):
        nslc = order.codes.nslc
        selection.append(nslc + (order.tmin, order.tmax))
        nslc_to_deltat[nslc] = max(
            order.deltat, nslc_to_deltat.get(nslc, order.deltat))

    padded = []
    for row in combine_selections(selection):
        nslc = row[:4]
        tpad = pad * nslc_to_deltat[nslc]
        padded.append(nslc + (row[4] - tpad, row[5] + tpad))

    return padded
class ErrorEntry(Object):
    '''
    Record of a single problem encountered with a waveform order.
    '''

    # Time attached to the problem by the code registering it.
    time = Timestamp.T()
    # The waveform order concerned.
    order = WaveformOrder.T()
    # Short category label; entries are grouped by kind and details when
    # aggregated for reporting.
    kind = String.T()
    # Optional additional information, e.g. the message of an exception.
    details = String.T(optional=True)
class ErrorAggregate(Object):
    '''
    Group of error entries sharing the same kind and details.

    The string representation is a multi-line summary listing the affected
    codes and time spans.
    '''

    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes = List.T(CodesNSLCE.T())
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        code_strings = [str(codes) for codes in self.codes]
        if code_strings:
            scodes = '\n' + util.ewrap(code_strings, indent=' ')
        else:
            scodes = '<none>'

        span_strings = (
            '%s - %s' % (
                util.time_to_str(span[0]), util.time_to_str(span[1]))
            for span in self.time_spans)

        sspans = '\n' + util.ewrap(span_strings, indent=' ')

        # The details line is only included if there are any details.
        if self.details:
            sdetails = ' Details: %s\n' % self.details
        else:
            sdetails = ''

        template = (
            'FDSN "%s": download error summary for "%s" (%i)\n%s '
            'Codes:%s\n Time spans:%s')

        return template % (
            self.site, self.kind, len(self.entries), sdetails, scodes,
            sspans)
class ErrorLog(Object):
    '''
    Collection of error entries gathered while working with one site.

    Checkpoints mark positions in the list of entries, so that the entries
    added since the latest checkpoint can be summarized separately.
    '''

    site = String.T()
    entries = List.T(ErrorEntry.T())
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        '''
        Remember the current number of entries as a checkpoint.
        '''
        n_entries = len(self.entries)
        self.checkpoints.append(n_entries)

    def append(self, time, order, kind, details=''):
        '''
        Add an error entry built from the given values.
        '''
        self.entries.append(
            ErrorEntry(time=time, order=order, kind=kind, details=details))

    def iter_aggregates(self):
        '''
        Iterate over the entries, grouped by kind and details.

        :yields:
            One :py:class:`ErrorAggregate` per distinct ``(kind, details)``
            combination, in sorted order, carrying the entries of the group,
            their sorted unique codes and their sorted unique time spans.
        '''
        grouped = defaultdict(list)
        for entry in self.entries:
            grouped[entry.kind, entry.details].append(entry)

        for key in sorted(grouped):
            kind, details = key
            group = grouped[key]

            unique_codes = {entry.order.codes for entry in group}
            selection = orders_to_selection(entry.order for entry in group)
            # The last two elements of a selection row are its time span.
            unique_spans = {row[-2:] for row in selection}

            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=group,
                codes=sorted(unique_codes),
                time_spans=sorted(unique_spans))

    def summarize_recent(self):
        '''
        Summarize the entries added since the latest checkpoint.

        :returns:
            String with the number and the kinds of the recent errors, or an
            empty string if there are none.
        '''
        if self.checkpoints:
            ioff = self.checkpoints[-1]
        else:
            ioff = 0

        recent = self.entries[ioff:]
        if not recent:
            return ''

        kinds = sorted({entry.kind for entry in recent})
        return '%i error%s (%s)' % (
            len(recent), util.plural_s(recent), '; '.join(kinds))
class FDSNSource(Source, has_paths.HasPaths):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel` allows
    the latter to download station and waveform data from an FDSN web service
    should the data not already happen to be available locally.

    Downloaded metadata and waveforms are kept in files below the cache
    directory. The sub-directory used is named after a hash built from the
    site, the presence of an authentication token, the user name and the
    query arguments.
    '''

    # The site and the query arguments both enter the source hash.
    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    # Caching behaviour.
    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             'environment\'s cache directory is used.')

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    # Authentication: user/password and/or a token, given either directly or
    # through a file.
    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    # The module is loaded lazily; it is expected to provide functions named
    # ``stationxml_<query_type>_hook``, which are called with the StationXML
    # object obtained from a query.
    hotfix_module_path = has_paths.Path.T(
        optional=True,
        help='Path to Python module to locally patch metadata errors.')
279 def __init__(self, site, query_args=None, **kwargs):
280 Source.__init__(self, site=site, query_args=query_args, **kwargs)
282 self._constraint = None
283 self._hash = self.make_hash()
284 self._source_id = 'client:fdsn:%s' % self._hash
285 self._error_infos = []
287 def describe(self):
288 return self._source_id
290 def make_hash(self):
291 s = self.site
292 s += 'notoken' \
293 if (self.auth_token is None and self.auth_token_path is None) \
294 else 'token'
296 if self.user_credentials is not None:
297 s += self.user_credentials[0]
298 else:
299 s += 'nocred'
301 if self.query_args is not None:
302 s += ','.join(
303 '%s:%s' % (k, self.query_args[k])
304 for k in sorted(self.query_args.keys()))
305 else:
306 s += 'noqueryargs'
308 return ehash(s)
310 def get_hash(self):
311 return self._hash
313 def get_auth_token(self):
314 if self.auth_token:
315 return self.auth_token
317 elif self.auth_token_path is not None:
318 try:
319 with open(self.auth_token_path, 'rb') as f:
320 return f.read().decode('ascii')
322 except OSError as e:
323 raise FileLoadError(
324 'Cannot load auth token file (%s): %s'
325 % (str(e), self.auth_token_path))
327 else:
328 raise Exception(
329 'FDSNSource: auth_token and auth_token_path are mutually '
330 'exclusive.')
332 def setup(self, squirrel, check=True):
333 self._cache_path = op.join(
334 self.cache_path or squirrel._cache_path, 'fdsn')
336 util.ensuredir(self._cache_path)
337 self._load_constraint()
338 self._archive = MSeedArchive()
339 waveforms_path = self._get_waveforms_path()
340 util.ensuredir(waveforms_path)
341 self._archive.set_base_path(waveforms_path)
343 squirrel.add(
344 self._get_waveforms_path(),
345 check=check)
347 fn = self._get_channels_path()
348 if os.path.exists(fn):
349 squirrel.add(fn)
351 squirrel.add_virtual(
352 [], virtual_paths=[self._source_id])
354 responses_path = self._get_responses_path()
355 if os.path.exists(responses_path):
356 squirrel.add(responses_path, kinds=['response'])
358 self._hotfix_module = None
360 def _hotfix(self, query_type, sx):
361 if self.hotfix_module_path is None:
362 return
364 if self._hotfix_module is None:
365 module_path = self.expand_path(self.hotfix_module_path)
366 spec = importlib.util.spec_from_file_location(
367 'hotfix_' + self._hash, module_path)
368 self._hotfix_module = importlib.util.module_from_spec(spec)
369 spec.loader.exec_module(self._hotfix_module)
371 hook = getattr(
372 self._hotfix_module, 'stationxml_' + query_type + '_hook')
374 return hook(sx)
376 def _get_constraint_path(self):
377 return op.join(self._cache_path, self._hash, 'constraint.pickle')
379 def _get_channels_path(self):
380 return op.join(self._cache_path, self._hash, 'channels.stationxml')
382 def _get_responses_path(self, nslc=None):
383 dirpath = op.join(
384 self._cache_path, self._hash, 'responses')
386 if nslc is None:
387 return dirpath
388 else:
389 return op.join(
390 dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)
392 def _get_waveforms_path(self):
393 if self.shared_waveforms:
394 return op.join(self._cache_path, 'waveforms')
395 else:
396 return op.join(self._cache_path, self._hash, 'waveforms')
398 def _log_meta(self, message, target=logger.info):
399 log_prefix = 'FDSN "%s" metadata:' % self.site
400 target(' '.join((log_prefix, message)))
402 def _log_responses(self, message, target=logger.info):
403 log_prefix = 'FDSN "%s" responses:' % self.site
404 target(' '.join((log_prefix, message)))
406 def _log_info_data(self, *args):
407 log_prefix = 'FDSN "%s" waveforms:' % self.site
408 logger.info(' '.join((log_prefix,) + args))
410 def _str_expires(self, t, now):
411 if t is None:
412 return 'expires: never'
413 else:
414 expire = 'expires' if t > now else 'expired'
415 return '%s: %s' % (
416 expire,
417 util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))
    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Make sure the cached channel metadata covers the given constraint.

        The cached channel file is used as is if the stored constraint
        contains the requested one and the file has not expired. Otherwise
        the channel metadata is queried again for the stored constraint
        expanded by the requested one, and the cache file is replaced if its
        content has changed.

        :param squirrel:
            Squirrel instance to which the channel file is added.
        :param constraint:
            Requested constraint. A default-constructed ``Constraint`` is
            used if ``None``.
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                # Work on a copy, so that the stored constraint is only
                # replaced once the update has succeeded.
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                # Write to a process-specific temporary file first, then let
                # move_or_keep() decide whether the cache file is replaced.
                fn = self._get_channels_path()
                util.ensuredirs(fn)
                fn_temp = fn + '.%i.temp' % os.getpid()
                channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    squirrel.get_database().silent_touch(fn)

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                # The failure is only logged; a previously cached file, if
                # any, is still added below.
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        # Re-evaluate, the expiration time may have changed by the update.
        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)
469 def _do_channel_query(self, constraint):
470 extra_args = {}
472 if self.site in sites_not_supporting['startbefore']:
473 if constraint.tmin is not None and constraint.tmin != g_tmin:
474 extra_args['starttime'] = constraint.tmin
475 if constraint.tmax is not None and constraint.tmax != g_tmax:
476 extra_args['endtime'] = constraint.tmax
478 else:
479 if constraint.tmin is not None and constraint.tmin != g_tmin:
480 extra_args['endafter'] = constraint.tmin
481 if constraint.tmax is not None and constraint.tmax != g_tmax:
482 extra_args['startbefore'] = constraint.tmax
484 if self.site not in sites_not_supporting['includerestricted']:
485 extra_args.update(
486 includerestricted=(
487 self.user_credentials is not None
488 or self.auth_token is not None
489 or self.auth_token_path is not None))
491 if self.query_args is not None:
492 extra_args.update(self.query_args)
494 self._log_meta('querying...')
496 try:
497 channel_sx = fdsn.station(
498 site=self.site,
499 format='text',
500 level='channel',
501 **extra_args)
503 self._hotfix('channel', channel_sx)
505 return channel_sx
507 except fdsn.EmptyResult:
508 return stationxml.FDSNStationXML(source='dummy-empty-result')
510 def _load_constraint(self):
511 fn = self._get_constraint_path()
512 if op.exists(fn):
513 with open(fn, 'rb') as f:
514 self._constraint = pickle.load(f)
515 else:
516 self._constraint = None
518 def _dump_constraint(self):
519 with open(self._get_constraint_path(), 'wb') as f:
520 pickle.dump(self._constraint, f, protocol=2)
522 def _get_expiration_time(self, path):
523 if self.expires is None:
524 return None
526 try:
527 t = os.stat(path)[8]
528 return t + self.expires
530 except OSError:
531 return 0.0
533 def _get_channels_expiration_time(self):
534 return self._get_expiration_time(self._get_channels_path())
    def update_waveform_promises(self, squirrel, constraint):
        '''
        Add waveform promises for data which is not yet available.

        For every channel from this source's channel file which matches the
        constraint, the time spans not covered by waveforms known to the
        Squirrel are determined and added as waveform promises under this
        source's virtual path.

        :param squirrel: Squirrel instance to query and to add promises to.
        :param constraint: Object providing ``codes``, ``tmin`` and ``tmax``.
        '''
        from ..base import gaps
        now = time.time()
        cpath = os.path.abspath(self._get_channels_path())

        ctmin = constraint.tmin
        ctmax = constraint.tmax

        nuts = squirrel.iter_nuts(
            'channel',
            path=cpath,
            codes=constraint.codes,
            tmin=ctmin,
            tmax=ctmax)

        coverages = squirrel.get_coverage(
            'waveform',
            codes=constraint.codes if constraint.codes else None,
            tmin=ctmin,
            tmax=ctmax)

        # Time spans of available waveforms, by codes.
        codes_to_avail = defaultdict(list)
        for coverage in coverages:
            for tmin, tmax, _ in coverage.iter_spans():
                codes_to_avail[coverage.codes].append((tmin, tmax))

        def sgaps(nut):
            # Yield a clone of the channel nut for every gap in the
            # available waveforms, within the time span of the nut clipped
            # to the time span of the constraint.
            for tmin, tmax in gaps(
                    codes_to_avail[nut.codes],
                    max(ctmin, nut.tmin) if ctmin is not None else nut.tmin,
                    min(ctmax, nut.tmax) if ctmax is not None else nut.tmax):

                subnut = clone(nut)
                subnut.tmin = tmin
                subnut.tmax = tmax

                # ignore 1-sample gaps produced by rounding errors
                # (the test drops all gaps shorter than two sampling
                # intervals)
                if subnut.tmax - subnut.tmin < 2*subnut.deltat:
                    continue

                yield subnut

        def wanted(nuts):
            # Channels starting in the future are skipped, time spans
            # reaching into the future are cut off at the current time. Note
            # that this modifies the nut obtained from the iterator.
            for nut in nuts:
                if nut.tmin < now:
                    if nut.tmax > now:
                        nut.tmax = now

                    for nut in sgaps(nut):
                        yield nut

        path = self._source_id
        # The promises are created lazily while add_virtual() consumes the
        # generator.
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
            virtual_paths=[path])
594 def remove_waveform_promises(self, squirrel, from_database='selection'):
595 '''
596 Remove waveform promises from live selection or global database.
598 :param from_database:
599 Remove from live selection ``'selection'`` or global database
600 ``'global'``.
601 '''
603 path = self._source_id
604 if from_database == 'selection':
605 squirrel.remove(path)
606 elif from_database == 'global':
607 squirrel.get_database().remove(path)
608 else:
609 raise ValueError(
610 'Values allowed for from_database: ("selection", "global")')
612 def _get_user_credentials(self):
613 d = {}
614 if self.user_credentials is not None:
615 d['user'], d['passwd'] = self.user_credentials
617 if self.auth_token is not None or self.auth_token_path is not None:
618 d['token'] = self.get_auth_token()
620 return d
    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):
        '''
        Download the waveforms for a list of orders and archive them.

        The orders are processed in batches of 20. Each batch is downloaded
        with a single dataselect query, the result is split up by order and
        saved to the archive. Problems are collected in an error log, which
        is reported as warnings at the end.

        :param orders:
            List of waveform orders. The list is sorted in place.
        :param success:
            Called with the order for every order for which data was saved.
        :param batch_add:
            Called once per batch with the paths of the saved files, if any.
        :param error_permanent:
            Called with the order if no usable data was obtained for it.
        :param error_temporary:
            Called with the order if the batch failed with an HTTP error.
        '''

        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        while i < len(orders):
            orders_now = orders[i:i+neach]
            selection_now = orders_to_selection(orders_now)

            nsuccess = 0
            # Errors logged from here on belong to the current batch.
            elog.append_checkpoint()
            self._log_info_data(
                'downloading, %s' % order_summary(orders_now))

            all_paths = []
            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    data = fdsn.dataselect(
                        site=self.site, selection=selection_now,
                        **self._get_user_credentials())

                    now = time.time()

                    # Dump the response to a temporary file, from which the
                    # traces are loaded.
                    # NOTE(review): ``data`` is not explicitly closed here.
                    path = op.join(tmpdir, 'tmp.mseed')
                    with open(path, 'wb') as f:
                        while True:
                            buf = data.read(1024)
                            if not buf:
                                break
                            f.write(buf)

                    trs = io.load(path)

                    by_nslc = defaultdict(list)
                    for tr in trs:
                        by_nslc[tr.nslc_id].append(tr)

                    for order in orders_now:
                        trs_order = []
                        # Only the last problem seen for an order is kept.
                        err_this = None
                        for tr in by_nslc[order.codes.nslc]:
                            try:
                                order.validate(tr)
                                trs_order.append(tr.chop(
                                    order.tmin, order.tmax, inplace=False))

                            except trace.NoData:
                                err_this = (
                                    'empty result', 'empty sub-interval')

                            except InvalidWaveform as e:
                                err_this = ('invalid waveform', str(e))

                        if len(trs_order) == 0:
                            if err_this is None:
                                err_this = ('empty result', '')

                            elog.append(now, order, *err_this)
                            error_permanent(order)
                        else:
                            def tsame(ta, tb):
                                return abs(tb - ta) < 2 * order.deltat

                            # Anything other than exactly one trace matching
                            # the time span of the order is logged as a
                            # partial result, but is saved and counted as
                            # success nevertheless.
                            if len(trs_order) != 1 \
                                    or not tsame(
                                        trs_order[0].tmin, order.tmin) \
                                    or not tsame(
                                        trs_order[0].tmax, order.tmax):

                                if err_this:
                                    elog.append(
                                        now, order,
                                        'partial result, %s' % err_this[0],
                                        err_this[1])
                                else:
                                    elog.append(now, order, 'partial result')

                            paths = self._archive.add(order, trs_order)
                            all_paths.extend(paths)

                            nsuccess += 1
                            success(order)

                except fdsn.EmptyResult:
                    # The whole batch came back empty.
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'empty result')
                        error_permanent(order)

                except util.HTTPError as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'http error', str(e))
                        error_temporary(order)

            # Summary of the errors logged since the checkpoint above.
            emessage = elog.summarize_recent()

            self._log_info_data(
                '%i download%s %ssuccessful' % (
                    nsuccess,
                    util.plural_s(nsuccess),
                    '(partially) ' if emessage else '')
                + (', %s' % emessage if emessage else ''))

            if all_paths:
                batch_add(all_paths)

            # Note: after the last batch, i may exceed len(orders).
            i += neach
            task.update(i)

        for agg in elog.iter_aggregates():
            logger.warning(str(agg))

        task.done()
743 def _do_response_query(self, selection):
744 extra_args = {}
746 if self.site not in sites_not_supporting['includerestricted']:
747 extra_args.update(
748 includerestricted=(
749 self.user_credentials is not None
750 or self.auth_token is not None
751 or self.auth_token_path is not None))
753 self._log_responses('querying...')
755 try:
756 response_sx = fdsn.station(
757 site=self.site,
758 level='response',
759 selection=selection,
760 **extra_args)
762 self._hotfix('response', response_sx)
763 return response_sx
765 except fdsn.EmptyResult:
766 return stationxml.FDSNStationXML(source='dummy-empty-result')
    def update_response_inventory(self, squirrel, constraint):
        '''
        Update the cached instrument responses for the selected channels.

        For every channel from this source's channel file which matches the
        codes of the constraint, a response file is kept in the cache
        directory. Missing or expired response files are queried in batches
        of 100 channels. The outcome is logged by status and the updated
        files are added to the Squirrel.

        :param squirrel: Squirrel instance to query and to add files to.
        :param constraint: Object providing ``codes``.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts(
            'channel', path=cpath, codes=constraint.codes)

        # Responses are queried for the complete time range.
        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        # Maps status messages to lists of NSLC codes.
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes.nslc
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if os.path.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        # Stored for channels for which the query returned nothing.
        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        neach = 100
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                # A failed query is recorded for all channels of the batch,
                # the remaining batches are still tried.
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    # Write to a process-specific temporary file first, then
                    # let move_or_keep() decide whether the cached file is
                    # replaced.
                    fn = self._get_responses_path(nslc)
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)
                    response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            # Deliberately ignored.
                            # NOTE(review): presumably raised when the file
                            # is not known to the database -- confirm.
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            # Failures are logged as errors, everything else as info.
            if k.find('failed') != -1:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(
                        CodesNSLCE(tup) for tup in status[k])),
                target=log_target)

        if fns:
            squirrel.add(fns, kinds=['response'])
# Names exported by ``from <this module> import *``.
__all__ = [
    'FDSNSource',
]