Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/squirrel/client/fdsn.py: 81%
525 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Squirrel client to access FDSN web services for seismic waveforms and metadata.
8'''
10import time
11import os
12import copy
13import logging
14import tempfile
15import importlib.util
16from collections import defaultdict
17try:
18 import cPickle as pickle
19except ImportError:
20 import pickle
21import os.path as op
22from .base import Source, Constraint
23from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \
24 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \
25 codes_to_str_abbreviated, CodesNSLCE
26from .. import storage
27from ..database import ExecuteGet1Error
28from pyrocko.squirrel.error import SquirrelError
29from pyrocko.client import fdsn
31from pyrocko import util, trace, io
32from pyrocko.io.io_common import FileLoadError
33from pyrocko.io import stationxml
34from pyrocko import progress
35from pyrocko import has_paths
37from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \
38 Duration, Bool, clone, dump_all_spickle
# Prefix under which the guts classes of this module are registered.
guts_prefix = 'squirrel'

logger = logging.getLogger('psq.client.fdsn')

# Query parameters, each mapped to the sites for which the parameter is
# avoided when building requests.
g_sites_not_supporting = dict(
    startbefore=['geonet'],
    includerestricted=['geonet', 'ncedc', 'scedc'])

# Keys which must not appear in `query_args` when `codes` are given to
# FDSNSource.
g_keys_conflicting_post_codes = {
    'network',
    'station',
    'location',
    'channel',
    'minlatitude',
    'maxlatitude',
    'minlongitude',
    'maxlongitude',
    'latitude',
    'longitude',
    'minradius',
    'maxradius',
}
def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are passed on unchanged to ``progress.task``.
    '''
    task = progress.task(*args, logger=logger)
    return task
def diff(fn_a, fn_b):
    '''
    Check whether two files differ.

    :param fn_a: Path of the first file.
    :param fn_b: Path of the second file.
    :returns:
        ``True`` if the sizes differ, if either file cannot be stat'ed, or
        if the contents differ; ``False`` if the files are byte-identical.
    '''
    try:
        same_size = os.stat(fn_a).st_size == os.stat(fn_b).st_size
    except OSError:
        return True

    if not same_size:
        return True

    # Sizes agree, so compare the contents chunk by chunk.
    with open(fn_a, 'rb') as fa, open(fn_b, 'rb') as fb:
        while True:
            chunk_a = fa.read(1024)
            chunk_b = fb.read(1024)
            if chunk_a != chunk_b:
                return True

            if not chunk_a or not chunk_b:
                return False
def move_or_keep(fn_temp, fn):
    '''
    Install a freshly written temporary file, unless nothing has changed.

    :param fn_temp: Path of the newly written temporary file.
    :param fn: Final path.
    :returns:
        ``'new'`` if *fn* did not exist before, ``'updated'`` if *fn* existed
        with different content, ``'upstream unchanged'`` if *fn* already had
        the same content (the temporary file is removed in this case).
    '''
    if not op.exists(fn):
        os.replace(fn_temp, fn)
        return 'new'

    if diff(fn, fn_temp):
        # os.replace overwrites an existing target on every platform;
        # os.rename raises on Windows when the target exists.
        os.replace(fn_temp, fn)
        return 'updated'

    os.unlink(fn_temp)
    return 'upstream unchanged'
def combine_selections(selection):
    '''
    Merge consecutive selection rows which join seamlessly in time.

    Each row is indexed as ``(net, sta, loc, cha, tmin, tmax)``. A row is
    merged into its predecessor when the first four elements are equal and
    its ``tmin`` equals the predecessor's ``tmax``.

    :param selection: Iterable of rows, in the order they should be merged.
    :returns: List of merged rows.
    '''
    combined = []
    pending = None
    for row in selection:
        extends_pending = (
            pending and row[:4] == pending[:4] and row[4] == pending[5])

        if extends_pending:
            pending = pending[:5] + (row[5],)
            continue

        if pending:
            combined.append(pending)

        pending = row

    if pending:
        combined.append(pending)

    return combined
def orders_sort_key(order):
    '''
    Sort key for waveform orders: by codes first, then by ``tmin``.
    '''
    codes = order.codes
    tmin = order.tmin
    return (codes, tmin)
def orders_to_selection(orders, pad=1.0):
    '''
    Convert waveform orders into a padded selection.

    Orders are sorted, seamlessly adjoining time spans of the same channel
    are merged, and each resulting span is widened on both sides by *pad*
    times the channel's sampling interval.

    :param orders: Iterable of orders.
    :param pad: Padding in units of the sampling interval.
    :returns: List of ``(net, sta, loc, cha, tmin, tmax)`` tuples.
    '''
    rows = []
    deltat_by_nslc = {}
    for order in sorted(orders, key=orders_sort_key):
        nslc = order.codes.nslc
        rows.append(nslc + (order.tmin, order.tmax))
        # If one channel occurs repeatedly, the last order's deltat wins.
        deltat_by_nslc[nslc] = order.deltat

    padded = []
    for row in combine_selections(rows):
        net, sta, loc, cha, tmin, tmax = row
        deltat = deltat_by_nslc[net, sta, loc, cha]
        padded.append(
            (net, sta, loc, cha, tmin-pad*deltat, tmax+pad*deltat))

    return padded
def codes_to_selection(codes_list, tmin, tmax):
    '''
    Build a selection covering one common time span for all given codes.

    :param codes_list: Codes objects providing ``.nslc``, or ``None``.
    :param tmin: Start time put into every row.
    :param tmax: End time put into every row.
    :returns:
        ``None`` if *codes_list* is ``None``, otherwise a list of
        ``nslc + (tmin, tmax)`` tuples in sorted order of the codes.
    '''
    if codes_list is None:
        return None

    time_span = (tmin, tmax)
    return [codes.nslc + time_span for codes in sorted(codes_list)]
class ErrorEntry(Object):
    '''
    Record of a single problem encountered while handling a waveform order.
    '''

    # Time stamp passed by the caller when the entry was logged.
    time = Timestamp.T()
    # The order which the problem relates to.
    order = WaveformOrder.T()
    # Short category label; entries are grouped by (kind, details).
    kind = String.T()
    # Optional free-text description.
    details = String.T(optional=True)
class ErrorAggregate(Object):
    '''
    Summary of all logged errors sharing the same kind and details.
    '''

    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes = List.T(CodesNSLCE.T())
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        '''
        Render a multi-line, human readable summary of the aggregate.
        '''
        code_strings = [str(codes) for codes in self.codes]
        if code_strings:
            scodes = '\n' + util.ewrap(code_strings, indent=' ')
        else:
            scodes = '<none>'

        span_strings = (
            '%s - %s' % (
                util.time_to_str(span[0]), util.time_to_str(span[1]))
            for span in self.time_spans)

        sspans = '\n' + util.ewrap(span_strings, indent=' ')

        # The details line is left out entirely when there are no details.
        if self.details:
            sdetails = ' Details: %s\n' % self.details
        else:
            sdetails = ''

        template = (
            'FDSN "%s": download error summary for "%s" (%i)\n%s '
            'Codes:%s\n Time spans:%s')

        return template % (
            self.site,
            self.kind,
            len(self.entries),
            sdetails,
            scodes,
            sspans)
class ErrorLog(Object):
    '''
    Collects error entries of one site and summarizes them.

    Checkpoints mark positions in the entry list, so that the entries added
    since the latest checkpoint can be summarized separately.
    '''

    site = String.T()
    entries = List.T(ErrorEntry.T())
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        '''
        Remember the current number of entries as a checkpoint.
        '''
        self.checkpoints.append(len(self.entries))

    def append(self, time, order, kind, details=''):
        '''
        Add a new entry to the log.

        :param time: Time stamp of the entry.
        :param order: Affected order.
        :param kind: Short category label.
        :param details: Optional free-text description.
        '''
        self.entries.append(
            ErrorEntry(time=time, order=order, kind=kind, details=details))

    def iter_aggregates(self):
        '''
        Yield one :py:class:`ErrorAggregate` per distinct (kind, details).

        Aggregates come in sorted order of (kind, details). Each lists the
        sorted unique codes and time spans of the affected orders.
        '''
        grouped = defaultdict(list)
        for entry in self.entries:
            grouped[entry.kind, entry.details].append(entry)

        for key in sorted(grouped):
            kind, details = key
            members = grouped[key]
            codes = sorted({entry.order.codes for entry in members})
            selection = orders_to_selection(
                entry.order for entry in members)
            time_spans = sorted({row[-2:] for row in selection})

            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=members,
                codes=codes,
                time_spans=time_spans)

    def summarize_recent(self):
        '''
        Summarize the entries added since the latest checkpoint.

        :returns:
            String with the number of errors and their kinds, or an empty
            string if there are no recent entries.
        '''
        start = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[start:]
        if not recent:
            return ''

        kinds = sorted({entry.kind for entry in recent})
        return '%i error%s (%s)' % (
            len(recent), util.plural_s(recent), '; '.join(kinds))
class Aborted(SquirrelError):
    '''
    Raised to abort a download, e.g. when too much data is received.
    '''
class FDSNSource(Source, has_paths.HasPaths):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a
    :py:class:`~pyrocko.squirrel.base.Squirrel` allows the latter to download
    station and waveform data from an FDSN web service should the data not
    already happen to be available locally.
    '''

    # --- Configuration attributes -----------------------------------------
    # Most of these are folded into the hash computed by make_hash(), which
    # names the per-source cache directory.

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    codes = List.T(
        CodesNSLCE.T(),
        optional=True,
        help='List of codes patterns to query via POST parameters.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    anxious = Duration.T(
        default=600.,
        help='Anxiety period [s]. Missing waveforms will not be treated as '
             'permanently missing for orders spanning into times later than '
             '(current time - anxious). Default: 600.')

    # --- Storage locations ------------------------------------------------

    cache_path = has_paths.Path.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             "environment's cache directory is used.")

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    storage_path = has_paths.Path.T(
        optional=True,
        help='If set, manage waveforms in the given directory rather than '
             'in the Squirrel cache.')

    storage_scheme = storage.StorageSchemeChoice.T(
        default='default',
        help="Set layout of waveform storage. Available: %s. "
             "Default: ``'default'``." % ', '.join(
                 "``'%s'``" % name
                 for name in storage.StorageSchemeChoice.choices))

    # --- Authentication ---------------------------------------------------

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = has_paths.Path.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    # --- Metadata patching ------------------------------------------------

    hotfix_module_path = has_paths.Path.T(
        optional=True,
        help='Path to Python module to locally patch metadata errors.')
307 def __init__(self, site, query_args=None, codes=None, **kwargs):
308 if codes:
309 codes = [CodesNSLCE(codes_) for codes_ in codes]
311 if codes is not None and query_args is not None:
312 conflicting = g_keys_conflicting_post_codes \
313 & set(query_args.keys())
315 if conflicting:
316 raise SquirrelError(
317 'Cannot use %s in `query_args` when `codes` are also '
318 'given.' % ' or '.join("'%s'" % k for k in conflicting))
320 Source.__init__(
321 self, site=site, query_args=query_args, codes=codes, **kwargs)
323 self._constraint = None
324 self._hash = self.make_hash()
325 self._source_id = 'client:fdsn:%s' % self._hash
326 self._error_infos = []
327 self._cache_path = None
329 def get_cache_path(self):
330 return op.join(self._cache_path, self._hash)
332 def describe(self):
333 return self._source_id
335 def make_hash(self):
336 s = self.site
337 s += 'notoken' \
338 if (self.auth_token is None and self.auth_token_path is None) \
339 else 'token'
341 if self.user_credentials is not None:
342 s += self.user_credentials[0]
343 else:
344 s += 'nocred'
346 if self.query_args is not None:
347 s += ','.join(
348 '%s:%s' % (k, self.query_args[k])
349 for k in sorted(self.query_args.keys()))
350 else:
351 s += 'noqueryargs'
353 if self.codes is not None:
354 s += 'post_codes:' + ','.join(
355 codes.safe_str for codes in self.codes)
357 if self.hotfix_module_path is not None:
358 s += 'hotfix:' + self.hotfix_module_path
360 return ehash(s)
362 def get_hash(self):
363 return self._hash
365 def get_auth_token(self):
366 if self.auth_token:
367 return self.auth_token
369 elif self.auth_token_path is not None:
370 try:
371 with open(self.auth_token_path, 'rb') as f:
372 return f.read().decode('ascii')
374 except OSError as e:
375 raise FileLoadError(
376 'Cannot load auth token file (%s): %s'
377 % (str(e), self.auth_token_path))
379 else:
380 raise Exception(
381 'FDSNSource: auth_token and auth_token_path are mutually '
382 'exclusive.')
    def setup(self, squirrel, check=True, upgrade=False):
        '''
        Prepare the source for use with the given Squirrel instance.

        Creates the cache and waveform directories, loads the cached
        constraint, configures the waveform archive and adds the already
        available waveforms, channel metadata and responses to *squirrel*.

        :param squirrel: Squirrel instance to attach to.
        :param check: Passed as ``check`` when adding waveform paths.
        :param upgrade:
            If ``True``, convert waveform archives with old layout to the
            current layout and remove the old directory. If ``False``, such
            archives are used as they are and a warning is logged.
        '''
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()
        self._archive = storage.get_storage_scheme(self.storage_scheme)

        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive.set_base_path(waveforms_path)

        # Handle waveform directories left over from the old storage layout.
        for waveforms_path_compat in self._get_waveforms_paths_compat():
            if os.path.exists(waveforms_path_compat):

                if upgrade:
                    # Imported here as these are needed for upgrades only.
                    import shutil
                    from pyrocko.squirrel.tool.commands.jackseis \
                        import Converter

                    logger.info(
                        'Upgrading waveform archive.\n old: %s\n new: %s',
                        waveforms_path_compat,
                        waveforms_path)

                    converter = Converter(
                        in_path=waveforms_path_compat,
                        out_storage_path=waveforms_path,
                        tinc=3600.)
                    converter.set_basepath('.')

                    converter.convert(
                        squirrel_factory=lambda: squirrel,
                        append=True)

                    # The old directory is removed after the conversion.
                    shutil.rmtree(waveforms_path_compat)

                else:
                    logger.warning(
                        'Waveform archive with old layout: %s\n💡 Use '
                        '`squirrel ... --upgrade-storage` or '
                        '`.add_dataset(..., upgrade=True)` '
                        'to upgrade.' % waveforms_path_compat)
                    squirrel.add(waveforms_path_compat, check=check)

        squirrel.add(
            self._get_waveforms_path(),
            check=check)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

        # Register the virtual path under which waveform promises are added
        # (see update_waveform_promises).
        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if os.path.exists(responses_path):
            # Exclude temporary files of response updates.
            squirrel.add(
                responses_path, kinds=['response'], exclude=r'\.temp$')

        # The hotfix module is loaded lazily in _hotfix().
        self._hotfix_module = None
447 def _hotfix(self, query_type, sx):
448 if self.hotfix_module_path is None:
449 return
451 if self._hotfix_module is None:
452 module_path = self.expand_path(self.hotfix_module_path)
453 spec = importlib.util.spec_from_file_location(
454 'hotfix_' + self._hash, module_path)
455 self._hotfix_module = importlib.util.module_from_spec(spec)
456 spec.loader.exec_module(self._hotfix_module)
458 hook = getattr(
459 self._hotfix_module, 'stationxml_' + query_type + '_hook')
461 return hook(sx)
463 def _get_constraint_path(self):
464 return op.join(self._cache_path, self._hash, 'constraint.pickle')
466 def _get_channels_path(self):
467 return op.join(self._cache_path, self._hash, 'channels.stationxml')
469 def _get_responses_path(self, nslc=None):
470 dirpath = op.join(
471 self._cache_path, self._hash, 'responses')
473 if nslc is None:
474 return dirpath
475 else:
476 return op.join(
477 dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)
479 def _get_waveforms_path(self):
480 if self.storage_path:
481 return self.expand_path(self.storage_path)
483 if self.shared_waveforms:
484 return op.join(self._cache_path, 'waveforms-v2')
485 else:
486 return op.join(self._cache_path, self._hash, 'waveforms-v2')
488 def _get_waveforms_paths_compat(self):
489 if self.shared_waveforms:
490 return [op.join(self._cache_path, 'waveforms')]
491 else:
492 return [op.join(self._cache_path, self._hash, 'waveforms')]
494 def _log_meta(self, message, target=logger.info):
495 log_prefix = 'FDSN "%s" metadata:' % self.site
496 target(' '.join((log_prefix, message)))
498 def _log_responses(self, message, target=logger.info):
499 log_prefix = 'FDSN "%s" responses:' % self.site
500 target(' '.join((log_prefix, message)))
502 def _log_info_data(self, *args):
503 log_prefix = 'FDSN "%s" waveforms:' % self.site
504 logger.info(' '.join((log_prefix,) + args))
506 def _str_expires(self, t, now):
507 if t is None:
508 return 'expires: never'
509 else:
510 expire = 'expires' if t > now else 'expired'
511 return '%s: %s' % (
512 expire,
513 util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))
515 def update_channel_inventory(self, squirrel, constraint=None):
516 if constraint is None:
517 constraint = Constraint()
519 expiration_time = self._get_channels_expiration_time()
520 now = time.time()
522 log_target = logger.info
523 if self._constraint and self._constraint.contains(constraint) \
524 and (expiration_time is None or now < expiration_time):
526 status = 'using cached'
528 else:
529 if self._constraint:
530 constraint_temp = copy.deepcopy(self._constraint)
531 constraint_temp.expand(constraint)
532 constraint = constraint_temp
534 try:
535 channel_sx = self._do_channel_query(constraint)
537 channel_sx.created = None # timestamp would ruin diff
539 fn = self._get_channels_path()
540 util.ensuredirs(fn)
541 fn_temp = fn + '.%i.temp' % os.getpid()
543 dump_all_spickle([channel_sx], filename=fn_temp)
544 # channel_sx.dump_xml(filename=fn_temp)
546 status = move_or_keep(fn_temp, fn)
548 if status == 'upstream unchanged':
549 squirrel.get_database().silent_touch(fn)
551 self._constraint = constraint
552 self._dump_constraint()
554 except OSError as e:
555 status = 'update failed (%s)' % str(e)
556 log_target = logger.error
558 expiration_time = self._get_channels_expiration_time()
559 self._log_meta(
560 '%s (%s)' % (status, self._str_expires(expiration_time, now)),
561 target=log_target)
563 fn = self._get_channels_path()
564 if os.path.exists(fn):
565 squirrel.add(fn)
567 def _do_channel_query(self, constraint):
568 extra_args = {}
570 tmin = constraint.tmin \
571 if constraint.tmin is not None and constraint.tmin != g_tmin \
572 else g_tmin_queries
574 tmax = constraint.tmax \
575 if constraint.tmax is not None and constraint.tmax != g_tmax \
576 else g_tmax
578 if self.site in g_sites_not_supporting['startbefore']:
579 ktmin = 'starttime'
580 ktmax = 'endtime'
581 else:
582 ktmin = 'endafter'
583 ktmax = 'startbefore'
585 if self.codes is None:
586 extra_args[ktmin] = tmin
587 extra_args[ktmax] = tmax
589 if self.site not in g_sites_not_supporting['includerestricted']:
590 extra_args.update(
591 includerestricted=(
592 self.user_credentials is not None
593 or self.auth_token is not None
594 or self.auth_token_path is not None))
596 if self.query_args is not None:
597 extra_args.update(self.query_args)
599 self._log_meta('querying...')
601 try:
602 channel_sx = fdsn.station(
603 site=self.site,
604 format='text',
605 level='channel',
606 selection=codes_to_selection(self.codes, tmin, tmax),
607 **extra_args)
609 self._hotfix('channel', channel_sx)
611 return channel_sx
613 except fdsn.EmptyResult:
614 return stationxml.FDSNStationXML(source='dummy-empty-result')
616 except fdsn.DownloadError as e:
617 raise SquirrelError(str(e))
619 def _load_constraint(self):
620 fn = self._get_constraint_path()
621 if op.exists(fn):
622 with open(fn, 'rb') as f:
623 self._constraint = pickle.load(f)
624 else:
625 self._constraint = None
627 def _dump_constraint(self):
628 with open(self._get_constraint_path(), 'wb') as f:
629 pickle.dump(self._constraint, f, protocol=2)
631 def _get_expiration_time(self, path):
632 if self.expires is None:
633 return None
635 try:
636 t = os.stat(path)[8]
637 return t + self.expires
639 except OSError:
640 return 0.0
642 def _get_channels_expiration_time(self):
643 return self._get_expiration_time(self._get_channels_path())
645 def update_waveform_promises(self, squirrel, constraint):
646 from ..base import gaps
647 cpath = os.path.abspath(self._get_channels_path())
649 ctmin = constraint.tmin
650 ctmax = constraint.tmax
652 nuts = squirrel.iter_nuts(
653 'channel',
654 path=cpath,
655 codes=constraint.codes,
656 tmin=ctmin,
657 tmax=ctmax)
659 coverages = squirrel.get_coverage(
660 'waveform',
661 codes=constraint.codes if constraint.codes else None,
662 tmin=ctmin,
663 tmax=ctmax)
665 codes_to_avail = defaultdict(list)
666 for coverage in coverages:
667 for tmin, tmax, _ in coverage.iter_spans():
668 codes_to_avail[coverage.codes].append((tmin, tmax))
670 def sgaps(nut):
671 for tmin, tmax in gaps(
672 codes_to_avail[nut.codes],
673 max(ctmin, nut.tmin) if ctmin is not None else nut.tmin,
674 min(ctmax, nut.tmax) if ctmax is not None else nut.tmax):
676 subnut = clone(nut)
677 subnut.tmin = tmin
678 subnut.tmax = tmax
680 if subnut.deltat is None:
681 continue
683 # ignore 1-sample gaps produced by rounding errors
684 if subnut.tmax - subnut.tmin < 2*subnut.deltat:
685 continue
687 yield subnut
689 def wanted(nuts):
690 for nut in nuts:
691 if nut.deltat is None:
692 logger.warning(
693 'Ignoring channel with unknown sampling rate: %s'
694 % str(nut.codes))
695 continue
697 for nut in sgaps(nut):
698 yield nut
700 path = self._source_id
701 squirrel.add_virtual(
702 (make_waveform_promise_nut(
703 file_path=path,
704 file_format='virtual',
705 file_mtime=0.0,
706 file_size=0,
707 **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
708 virtual_paths=[path])
710 def remove_waveform_promises(self, squirrel, from_database='selection'):
711 '''
712 Remove waveform promises from live selection or global database.
714 :param from_database:
715 Remove from live selection ``'selection'`` or global database
716 ``'global'``.
717 '''
719 path = self._source_id
720 if from_database == 'selection':
721 squirrel.remove(path)
722 elif from_database == 'global':
723 squirrel.get_database().remove(path)
724 else:
725 raise ValueError(
726 'Values allowed for from_database: ("selection", "global")')
728 def _get_user_credentials(self):
729 d = {}
730 if self.user_credentials is not None:
731 d['user'], d['passwd'] = self.user_credentials
733 if self.auth_token is not None or self.auth_token_path is not None:
734 d['token'] = self.get_auth_token()
736 return d
738 def save_waveforms(self, trs):
739 return self._archive.save(trs, check_append_merge=True)
    def download_waveforms(
            self, orders, success, error_permanent, error_temporary, aborted):
        '''
        Download waveforms for the given orders, in batches.

        The orders are sorted in place and processed in batches of 20. The
        outcome for each order is reported through the callbacks. A summary
        is logged per batch and aggregated errors are logged as warnings at
        the end.

        :param orders: List of waveform orders; sorted in place.
        :param success:
            Called as ``success(order, trs)`` with the traces obtained for
            the order.
        :param error_permanent:
            Called as ``error_permanent(order)`` for orders that failed,
            except for the cases reported through *error_temporary*.
        :param error_temporary:
            Called as ``error_temporary(order)`` on HTTP/download errors and
            for empty results of orders which are near real time.
        :param aborted:
            Callable polled before each batch; when it returns a true value
            no further batches are processed.
        '''

        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20  # number of orders per request
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        while i < len(orders) and not aborted():
            orders_now = orders[i:i+neach]
            selection_now = orders_to_selection(orders_now)
            nsamples_estimate = sum(
                order.estimate_nsamples() for order in orders_now)

            nsuccess = 0
            # Mark the start of this batch for summarize_recent() below.
            elog.append_checkpoint()
            self._log_info_data(
                'downloading, %s' % order_summary(orders_now))

            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    data = fdsn.dataselect(
                        site=self.site, selection=selection_now,
                        **self._get_user_credentials())

                    now = time.time()

                    # Spool the response into a temporary file, then load
                    # the traces from there.
                    path = op.join(tmpdir, 'tmp.mseed')
                    with open(path, 'wb') as f:
                        nread = 0
                        while True:
                            buf = data.read(1024)
                            nread += len(buf)
                            if not buf:
                                break
                            f.write(buf)

                            # abort if we get way more data than expected
                            if nread > max(
                                    1024 * 1000,
                                    nsamples_estimate * 4 * 10):

                                raise Aborted('Too much data received.')

                    trs = io.load(path)

                    by_nslc = defaultdict(list)
                    for tr in trs:
                        by_nslc[tr.nslc_id].append(tr)

                    for order in orders_now:
                        trs_order = []
                        # Only the latest per-trace problem is kept.
                        err_this = None
                        for tr in by_nslc[order.codes.nslc]:
                            try:
                                order.validate(tr)
                                trs_order.append(tr.chop(
                                    order.tmin, order.tmax, inplace=False))

                            except trace.NoData:
                                err_this = (
                                    'empty result', 'empty sub-interval')

                            except InvalidWaveform as e:
                                err_this = ('invalid waveform', str(e))

                        if len(trs_order) == 0:
                            if err_this is None:
                                err_this = ('empty result', '')

                            elog.append(now, order, *err_this)
                            if order.is_near_real_time():
                                error_temporary(order)
                            else:
                                error_permanent(order)
                        else:
                            def tsame(ta, tb):
                                # equal within two sampling intervals
                                return abs(tb - ta) < 2 * order.deltat

                            # Anything other than exactly one trace spanning
                            # the whole order is logged as a partial result,
                            # but still counted and reported as a success.
                            if len(trs_order) != 1 \
                                    or not tsame(
                                        trs_order[0].tmin, order.tmin) \
                                    or not tsame(
                                        trs_order[0].tmax, order.tmax):

                                if err_this:
                                    elog.append(
                                        now, order,
                                        'partial result, %s' % err_this[0],
                                        err_this[1])
                                else:
                                    elog.append(now, order, 'partial result')

                            nsuccess += 1
                            success(order, trs_order)

                except fdsn.EmptyResult:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'empty result')
                        if order.is_near_real_time():
                            error_temporary(order)
                        else:
                            error_permanent(order)

                except Aborted as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'aborted', str(e))
                        error_permanent(order)

                except (util.HTTPError, fdsn.DownloadError) as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'http error', str(e))
                        error_temporary(order)

                # Summary of the errors logged for this batch only.
                emessage = elog.summarize_recent()

                self._log_info_data(
                    '%i download%s %ssuccessful' % (
                        nsuccess,
                        util.plural_s(nsuccess),
                        '(partially) ' if emessage else '')
                    + (', %s' % emessage if emessage else ''))

                i += neach
                task.update(i)

        for agg in elog.iter_aggregates():
            logger.warning(str(agg))

        task.done()
877 def _do_response_query(self, selection):
878 extra_args = {}
880 if self.site not in g_sites_not_supporting['includerestricted']:
881 extra_args.update(
882 includerestricted=(
883 self.user_credentials is not None
884 or self.auth_token is not None
885 or self.auth_token_path is not None))
887 self._log_responses('querying...')
889 try:
890 response_sx = fdsn.station(
891 site=self.site,
892 level='response',
893 selection=selection,
894 **extra_args)
896 self._hotfix('response', response_sx)
897 return response_sx
899 except fdsn.EmptyResult:
900 return stationxml.FDSNStationXML(source='dummy-empty-result')
902 except fdsn.DownloadError as e:
903 raise SquirrelError(str(e))
905 def update_response_inventory(self, squirrel, constraint):
906 cpath = os.path.abspath(self._get_channels_path())
907 nuts = squirrel.iter_nuts(
908 'channel', path=cpath, codes=constraint.codes)
910 tmin = g_tmin_queries
911 tmax = g_tmax
913 selection = []
914 now = time.time()
915 have = set()
916 status = defaultdict(list)
917 for nut in nuts:
918 nslc = nut.codes.nslc
919 if nslc in have:
920 continue
921 have.add(nslc)
923 fn = self._get_responses_path(nslc)
924 expiration_time = self._get_expiration_time(fn)
925 if os.path.exists(fn) \
926 and (expiration_time is None or now < expiration_time):
927 status['using cached'].append(nslc)
928 else:
929 selection.append(nslc + (tmin, tmax))
931 dummy = stationxml.FDSNStationXML(source='dummy-empty')
932 neach = 100
933 i = 0
934 fns = []
935 while i < len(selection):
936 selection_now = selection[i:i+neach]
937 i += neach
939 try:
940 sx = self._do_response_query(selection_now)
941 except Exception as e:
942 status['update failed (%s)' % str(e)].extend(
943 entry[:4] for entry in selection_now)
944 continue
946 sx.created = None # timestamp would ruin diff
948 by_nslc = dict(stationxml.split_channels(sx))
950 for entry in selection_now:
951 nslc = entry[:4]
952 response_sx = by_nslc.get(nslc, dummy)
953 try:
954 fn = self._get_responses_path(nslc)
955 fn_temp = fn + '.%i.temp' % os.getpid()
957 util.ensuredirs(fn_temp)
959 dump_all_spickle([response_sx], filename=fn_temp)
960 # response_sx.dump_xml(filename=fn_temp)
962 status_this = move_or_keep(fn_temp, fn)
964 if status_this == 'upstream unchanged':
965 try:
966 squirrel.get_database().silent_touch(fn)
967 except ExecuteGet1Error:
968 pass
970 status[status_this].append(nslc)
971 fns.append(fn)
973 except OSError as e:
974 status['update failed (%s)' % str(e)].append(nslc)
976 for k in sorted(status):
977 if k.find('failed') != -1:
978 log_target = logger.error
979 else:
980 log_target = logger.info
982 self._log_responses(
983 '%s: %s' % (
984 k, codes_to_str_abbreviated(
985 CodesNSLCE(tup) for tup in status[k])),
986 target=log_target)
988 if fns:
989 squirrel.add(fns, kinds=['response'])
# Public names of this module.
__all__ = ['FDSNSource']