Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/client/fdsn.py: 83%
510 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-03-07 11:54 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-03-07 11:54 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Squirrel client to access FDSN web services for seismic waveforms and metadata.
8'''
10import time
11import os
12import copy
13import logging
14import tempfile
15import importlib.util
16from collections import defaultdict
17try:
18 import cPickle as pickle
19except ImportError:
20 import pickle
21import os.path as op
22from .base import Source, Constraint
23from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \
24 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \
25 codes_to_str_abbreviated, CodesNSLCE
26from ..database import ExecuteGet1Error
27from pyrocko.squirrel.error import SquirrelError
28from pyrocko.client import fdsn
30from pyrocko import util, trace, io
31from pyrocko.io.io_common import FileLoadError
32from pyrocko.io import stationxml
33from pyrocko import progress
34from pyrocko import has_paths
36from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \
37 Duration, Bool, clone, dump_all_spickle
# Module-level setting read by pyrocko.guts (presumably the type-name prefix
# for the Object subclasses declared in this module -- confirm in guts docs).
guts_prefix = 'squirrel'

logger = logging.getLogger('psq.client.fdsn')

# Query parameters which must not be sent to the listed FDSN sites.
g_sites_not_supporting = dict(
    startbefore=['geonet'],
    includerestricted=['geonet', 'ncedc', 'scedc'])

# Query argument names which may not be combined with POST-style `codes`.
g_keys_conflicting_post_codes = {
    'network',
    'station',
    'location',
    'channel',
    'minlatitude',
    'maxlatitude',
    'minlongitude',
    'maxlongitude',
    'latitude',
    'longitude',
    'minradius',
    'maxradius',
}
def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    :param args:
        Positional arguments, forwarded unchanged to
        :py:func:`pyrocko.progress.task`.

    :returns:
        Whatever :py:func:`pyrocko.progress.task` returns.
    '''
    task = progress.task(*args, logger=logger)
    return task
def diff(fn_a, fn_b):
    '''
    Check whether two files differ in content.

    :param fn_a:
        Path of the first file.

    :param fn_b:
        Path of the second file.

    :returns:
        ``True`` if the files differ in size or content, or if one of them
        cannot be stat'ed. ``False`` if they are byte-wise identical.
    '''
    try:
        size_a = os.stat(fn_a).st_size
        size_b = os.stat(fn_b).st_size
    except OSError:
        return True

    # Cheap check first: files of different length cannot be equal.
    if size_a != size_b:
        return True

    blocksize = 1024
    with open(fn_a, 'rb') as file_a, open(fn_b, 'rb') as file_b:
        while True:
            chunk_a = file_a.read(blocksize)
            chunk_b = file_b.read(blocksize)
            if chunk_a != chunk_b:
                return True

            # End of file reached without finding a difference.
            if not chunk_a or not chunk_b:
                return False
def move_or_keep(fn_temp, fn):
    '''
    Install a freshly written temporary file, unless nothing has changed.

    If ``fn`` does not exist yet, or if its content differs from that of
    ``fn_temp``, ``fn_temp`` is moved to ``fn``. Otherwise ``fn_temp`` is
    deleted and ``fn`` is left untouched.

    :param fn_temp:
        Path of the temporary file holding the new content.

    :param fn:
        Path of the destination file.

    :returns:
        Status string, one of ``'new'``, ``'updated'`` or
        ``'upstream unchanged'``.
    '''
    if not op.exists(fn):
        os.replace(fn_temp, fn)
        return 'new'

    if diff(fn, fn_temp):
        # os.replace() overwrites an existing destination on all platforms,
        # whereas os.rename() fails on Windows when the target exists.
        os.replace(fn_temp, fn)
        return 'updated'

    os.unlink(fn_temp)
    return 'upstream unchanged'
class Archive(Object):
    '''
    Abstract base class for archives storing downloaded waveforms.
    '''

    def add(self, order=None, trs=None):
        '''
        Store traces belonging to a waveform order (abstract).

        The signature mirrors the one of the concrete implementation in this
        module, so that calling the base method with the regular arguments
        fails with :py:exc:`NotImplementedError` rather than with a
        :py:exc:`TypeError`. Both arguments are optional only to remain
        compatible with the former argument-less signature.

        :param order:
            The waveform order the traces belong to.

        :param trs:
            The traces to be stored.

        :raises:
            :py:exc:`NotImplementedError`, always.
        '''
        raise NotImplementedError()
class MSeedArchive(Archive):
    '''
    Archive saving waveforms as files below a base directory.

    File paths are built from :py:attr:`template`, relative to the directory
    given to :py:meth:`set_base_path`.
    '''

    template = String.T(
        default=op.join(
            '%(tmin_year)s',
            '%(tmin_month)s',
            '%(tmin_day)s',
            'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
            '_%(block_tmin_us)s_%(block_tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Must be set with set_base_path() before add() can be used.
        self._base_path = None

    def set_base_path(self, path):
        '''
        Set the directory below which files are stored.
        '''
        self._base_path = path

    def add(self, order, trs):
        '''
        Save the traces of a waveform order into the archive.

        :param order:
            Waveform order; its ``tmin`` and ``tmax`` are made available to
            the path template as ``block_tmin_us`` and ``block_tmax_us``.

        :param trs:
            Traces to be saved.

        :returns:
            Return value of :py:func:`pyrocko.io.save`. It is treated as a
            collection of file paths by the caller in this module.
        '''
        path_template = op.join(self._base_path, self.template)
        time_format = '%Y-%m-%d_%H-%M-%S.6FRAC'
        block_times = {
            'block_tmin_us': util.time_to_str(order.tmin, format=time_format),
            'block_tmax_us': util.time_to_str(order.tmax, format=time_format),
        }

        return io.save(
            trs, path_template, overwrite=True, additional=block_times)
def combine_selections(selection):
    '''
    Merge directly adjoining time spans of consecutive selection rows.

    :param selection:
        Iterable of rows ``(net, sta, loc, cha, tmin, tmax)``. Only
        neighbouring rows are considered for merging, so the rows should be
        ordered by codes and time.

    :returns:
        List of rows, where consecutive rows with identical first four
        elements and with the ``tmin`` of the later row equal to the ``tmax``
        of the earlier one have been joined into a single row.
    '''
    merged = []
    current = None
    for row in selection:
        adjoining = (
            current
            and row[:4] == current[:4]
            and row[4] == current[5])

        if adjoining:
            # Extend the pending row to the end of the new one.
            current = current[:5] + (row[5],)
            continue

        if current:
            merged.append(current)

        current = row

    if current:
        merged.append(current)

    return merged
def orders_sort_key(order):
    '''
    Get sort key for waveform orders: by codes first, then by start time.

    :param order:
        Object providing the attributes ``codes`` and ``tmin``.

    :returns:
        Tuple ``(order.codes, order.tmin)``.
    '''
    codes, tmin = order.codes, order.tmin
    return (codes, tmin)
def orders_to_selection(orders, pad=1.0):
    '''
    Convert waveform orders into a padded selection for a data query.

    The orders are sorted by codes and start time, adjoining time spans of
    the same channel are merged, and every resulting span is widened on both
    sides.

    :param orders:
        Iterable of orders providing ``codes.nslc``, ``tmin``, ``tmax`` and
        ``deltat``.

    :param pad:
        Padding applied at both ends of each span, in units of the sampling
        interval ``deltat`` of the respective channel.

    :returns:
        List of tuples ``(net, sta, loc, cha, tmin, tmax)``.
    '''
    rows = []
    deltat_by_nslc = {}
    for order in sorted(orders, key=orders_sort_key):
        nslc = order.codes.nslc
        rows.append(nslc + (order.tmin, order.tmax))

        # If a channel occurs repeatedly, the last order's deltat is used.
        deltat_by_nslc[nslc] = order.deltat

    padded = []
    for (net, sta, loc, cha, tmin, tmax) in combine_selections(rows):
        margin = pad * deltat_by_nslc[net, sta, loc, cha]
        padded.append((net, sta, loc, cha, tmin - margin, tmax + margin))

    return padded
def codes_to_selection(codes_list, tmin, tmax):
    '''
    Build a selection covering one common time span for a list of codes.

    :param codes_list:
        Sortable codes objects providing an ``nslc`` tuple, or ``None``.

    :param tmin:
        Start of the time span.

    :param tmax:
        End of the time span.

    :returns:
        ``None`` if ``codes_list`` is ``None``, otherwise a list of tuples
        ``(net, sta, loc, cha, tmin, tmax)``, in sorted order of the codes.
    '''
    if codes_list is None:
        return None

    span = (tmin, tmax)
    return [codes.nslc + span for codes in sorted(codes_list)]
class ErrorEntry(Object):
    '''
    Record of a single problem encountered for a waveform order.
    '''

    # Time at which the problem was registered.
    time = Timestamp.T()

    # The waveform order which could not be (completely) fulfilled.
    order = WaveformOrder.T()

    # Short category label, e.g. 'empty result' or 'http error'.
    kind = String.T()

    # Optional free-text elaboration on the problem.
    details = String.T(optional=True)
class ErrorAggregate(Object):
    '''
    Group of error entries of one site sharing the same kind and details.
    '''

    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes = List.T(CodesNSLCE.T())
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        '''
        Format a multi-line summary listing affected codes and time spans.
        '''
        code_strings = [str(codes) for codes in self.codes]
        if code_strings:
            scodes = '\n' + util.ewrap(code_strings, indent=' ')
        else:
            scodes = '<none>'

        span_strings = (
            '%s - %s' % (util.time_to_str(span[0]), util.time_to_str(span[1]))
            for span in self.time_spans)

        sspans = '\n' + util.ewrap(span_strings, indent=' ')

        # The details line is left out entirely when there are no details.
        if self.details:
            sdetails = ' Details: %s\n' % self.details
        else:
            sdetails = ''

        template = (
            'FDSN "%s": download error summary for "%s" (%i)\n%s '
            'Codes:%s\n Time spans:%s')

        return template % (
            self.site, self.kind, len(self.entries), sdetails, scodes, sspans)
class ErrorLog(Object):
    '''
    Collection of the errors encountered while downloading from one site.

    Checkpoints mark positions in the list of entries, so that the errors
    added since the latest checkpoint can be summarized separately.
    '''

    site = String.T()
    entries = List.T(ErrorEntry.T())
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        '''
        Mark the current end of the entry list as a checkpoint.
        '''
        self.checkpoints.append(len(self.entries))

    def append(self, time, order, kind, details=''):
        '''
        Add an error entry.

        :param time:
            Time at which the problem was registered.

        :param order:
            The affected waveform order.

        :param kind:
            Short category label of the problem.

        :param details:
            Optional further description.
        '''
        self.entries.append(
            ErrorEntry(time=time, order=order, kind=kind, details=details))

    def iter_aggregates(self):
        '''
        Iterate over the entries, grouped by ``(kind, details)``.

        :yields:
            One :py:class:`ErrorAggregate` per distinct combination of kind
            and details, in sorted order of these.
        '''
        grouped = defaultdict(list)
        for entry in self.entries:
            grouped[entry.kind, entry.details].append(entry)

        for key in sorted(grouped):
            kind, details = key
            members = grouped[key]
            selection = orders_to_selection(
                member.order for member in members)

            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=members,
                codes=sorted(set(member.order.codes for member in members)),
                # The last two elements of each selection row are the times.
                time_spans=sorted(set(row[-2:] for row in selection)))

    def summarize_recent(self):
        '''
        Summarize the errors added since the latest checkpoint.

        :returns:
            String giving the number and kinds of the recent errors, or an
            empty string if there are none.
        '''
        start = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[start:]
        if not recent:
            return ''

        kinds = sorted(set(entry.kind for entry in recent))
        return '%i error%s (%s)' % (
            len(recent), util.plural_s(recent), '; '.join(kinds))
class Aborted(SquirrelError):
    '''
    Raised to abandon a running download, e.g. when too much data arrives.
    '''
class FDSNSource(Source, has_paths.HasPaths):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a
    :py:class:`~pyrocko.squirrel.base.Squirrel` allows the latter to download
    station and waveform data from an FDSN web service should the data not
    already happen to be available locally.
    '''

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    codes = List.T(
        CodesNSLCE.T(),
        optional=True,
        help='List of codes patterns to query via POST parameters.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             "environment's cache directory is used.")

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    hotfix_module_path = has_paths.Path.T(
        optional=True,
        help='Path to Python module to locally patch metadata errors.')

    def __init__(self, site, query_args=None, codes=None, **kwargs):
        '''
        Initialize the source.

        :param site:
            FDSN site url or alias name.

        :param query_args:
            Common query arguments (dict) or ``None``.

        :param codes:
            Codes patterns to be queried via POST or ``None``. Each element
            is converted with :py:class:`CodesNSLCE`.

        :raises:
            :py:exc:`SquirrelError` if ``query_args`` contains keys which
            conflict with the use of ``codes``.
        '''
        if codes:
            codes = [CodesNSLCE(codes_) for codes_ in codes]

        if codes is not None and query_args is not None:
            conflicting = g_keys_conflicting_post_codes \
                & set(query_args.keys())

            if conflicting:
                # Sorted, so that the message does not depend on set order.
                raise SquirrelError(
                    'Cannot use %s in `query_args` when `codes` are also '
                    'given.' % ' or '.join(
                        "'%s'" % k for k in sorted(conflicting)))

        Source.__init__(
            self, site=site, query_args=query_args, codes=codes, **kwargs)

        self._constraint = None
        self._hash = self.make_hash()
        self._source_id = 'client:fdsn:%s' % self._hash
        self._error_infos = []

        # Lazily loaded by _hotfix(). Also initialized here, so that
        # _hotfix() does not depend on setup() having been called first.
        self._hotfix_module = None

    def describe(self):
        '''
        Get the identifier string of this source.
        '''
        return self._source_id

    def make_hash(self):
        '''
        Compute the hash identifying this source's configuration.

        The hash depends on the site, on whether an authentication token is
        configured, on the user name, on the query arguments and on the codes
        patterns. It is used in the source identifier and in cache paths.
        '''
        have_token = not (
            self.auth_token is None and self.auth_token_path is None)

        parts = [
            self.site,
            'token' if have_token else 'notoken']

        if self.user_credentials is not None:
            parts.append(self.user_credentials[0])
        else:
            parts.append('nocred')

        if self.query_args is not None:
            parts.append(','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys())))
        else:
            parts.append('noqueryargs')

        if self.codes is not None:
            parts.append('post_codes:' + ','.join(
                codes.safe_str for codes in self.codes))

        return ehash(''.join(parts))

    def get_hash(self):
        '''
        Get the configuration hash computed at initialization.
        '''
        return self._hash

    def get_auth_token(self):
        '''
        Get the authentication token.

        A non-empty :py:attr:`auth_token` takes precedence. Otherwise the
        token is read from the file at :py:attr:`auth_token_path`.

        :raises:
            :py:exc:`~pyrocko.io.io_common.FileLoadError` if the token file
            cannot be read, :py:exc:`Exception` if neither a token nor a
            token file is configured.
        '''
        if self.auth_token:
            return self.auth_token

        if self.auth_token_path is not None:
            try:
                with open(self.auth_token_path, 'rb') as f:
                    return f.read().decode('ascii')

            except OSError as e:
                raise FileLoadError(
                    'Cannot load auth token file (%s): %s'
                    % (str(e), self.auth_token_path)) from e

        raise Exception(
            'FDSNSource: neither auth_token nor auth_token_path is set.')

    def setup(self, squirrel, check=True):
        '''
        Prepare cache directories and register cached data with a Squirrel.

        :param squirrel:
            The Squirrel instance to which this source gets attached.

        :param check:
            Passed on to ``squirrel.add()`` when adding the waveform archive.
        '''
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()

        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive = MSeedArchive()
        self._archive.set_base_path(waveforms_path)

        squirrel.add(waveforms_path, check=check)

        channels_path = self._get_channels_path()
        if os.path.exists(channels_path):
            squirrel.add(channels_path)

        # Register the (initially empty) virtual path holding the promises.
        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if os.path.exists(responses_path):
            squirrel.add(
                responses_path, kinds=['response'], exclude=r'\.temp$')

        self._hotfix_module = None

    def _hotfix(self, query_type, sx):
        '''
        Apply the user-supplied hotfix hook to downloaded metadata, if any.

        The module at :py:attr:`hotfix_module_path` is loaded on first use.
        It must provide a function named ``stationxml_<query_type>_hook``.

        :param query_type:
            Name of the query type, ``'channel'`` or ``'response'`` as used
            in this class.

        :param sx:
            The StationXML object to be handed to the hook.

        :returns:
            Return value of the hook, or ``None`` if no hotfix module is
            configured.
        '''
        if self.hotfix_module_path is None:
            return

        if self._hotfix_module is None:
            module_path = self.expand_path(self.hotfix_module_path)
            spec = importlib.util.spec_from_file_location(
                'hotfix_' + self._hash, module_path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)

            # Only cache the module once it has been executed successfully,
            # a failed load must not leave a half-initialized module behind.
            self._hotfix_module = module

        hook = getattr(
            self._hotfix_module, 'stationxml_' + query_type + '_hook')

        return hook(sx)

    def _get_constraint_path(self):
        '''
        Get path of the file caching the constraint of the channel query.
        '''
        return op.join(self._cache_path, self._hash, 'constraint.pickle')

    def _get_channels_path(self):
        '''
        Get path of the file caching the channel metadata.
        '''
        return op.join(self._cache_path, self._hash, 'channels.stationxml')

    def _get_responses_path(self, nslc=None):
        '''
        Get path of the responses cache.

        :param nslc:
            Tuple ``(net, sta, loc, cha)`` or ``None``.

        :returns:
            Path of the response file of the given channel, or, if ``nslc``
            is ``None``, path of the directory holding the response files.
        '''
        dirpath = op.join(
            self._cache_path, self._hash, 'responses')

        if nslc is None:
            return dirpath

        return op.join(
            dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)

    def _get_waveforms_path(self):
        '''
        Get path of the waveform archive (shared or specific to the source).
        '''
        if self.shared_waveforms:
            return op.join(self._cache_path, 'waveforms')

        return op.join(self._cache_path, self._hash, 'waveforms')

    def _log_meta(self, message, target=logger.info):
        '''
        Log a message concerning channel metadata through ``target``.
        '''
        log_prefix = 'FDSN "%s" metadata:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_responses(self, message, target=logger.info):
        '''
        Log a message concerning responses through ``target``.
        '''
        log_prefix = 'FDSN "%s" responses:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_info_data(self, *args):
        '''
        Log an info message concerning waveform data.
        '''
        log_prefix = 'FDSN "%s" waveforms:' % self.site
        logger.info(' '.join((log_prefix,) + args))

    def _str_expires(self, t, now):
        '''
        Format expiration time ``t`` relative to the current time ``now``.
        '''
        if t is None:
            return 'expires: never'

        expire = 'expires' if t > now else 'expired'
        return '%s: %s' % (
            expire,
            util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))

    def _want_restricted(self):
        '''
        Check whether any kind of authentication has been configured.

        Used as value for the ``includerestricted`` query argument.
        '''
        return (
            self.user_credentials is not None
            or self.auth_token is not None
            or self.auth_token_path is not None)

    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Bring the cached channel metadata up to date.

        The cache is used if it covers ``constraint`` and has not expired.
        Otherwise a query is made for the union of the previously cached and
        the new constraint, and the cache file is replaced if its content
        has changed.

        :param squirrel:
            The Squirrel instance to be updated.

        :param constraint:
            Constraint of the current request, defaults to an unrestricted
            :py:class:`Constraint`.
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                fn_temp = fn + '.%i.temp' % os.getpid()

                dump_all_spickle([channel_sx], filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    # File content is the same, but the cache was refreshed.
                    squirrel.get_database().silent_touch(fn)

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

    def _do_channel_query(self, constraint):
        '''
        Query channel-level station metadata from the FDSN site.

        :param constraint:
            Constraint providing the time span of the query.

        :returns:
            StationXML object. A dummy object is returned, if the site
            reports an empty result.

        :raises:
            :py:exc:`SquirrelError` if the download fails.
        '''
        extra_args = {}

        tmin = constraint.tmin \
            if constraint.tmin is not None and constraint.tmin != g_tmin \
            else g_tmin_queries

        tmax = constraint.tmax \
            if constraint.tmax is not None and constraint.tmax != g_tmax \
            else g_tmax

        if self.site in g_sites_not_supporting['startbefore']:
            ktmin = 'starttime'
            ktmax = 'endtime'
        else:
            ktmin = 'endafter'
            ktmax = 'startbefore'

        # With codes given, the time span is transmitted via the selection.
        if self.codes is None:
            extra_args[ktmin] = tmin
            extra_args[ktmax] = tmax

        if self.site not in g_sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=self._want_restricted())

        if self.query_args is not None:
            extra_args.update(self.query_args)

        self._log_meta('querying...')

        try:
            channel_sx = fdsn.station(
                site=self.site,
                format='text',
                level='channel',
                selection=codes_to_selection(self.codes, tmin, tmax),
                **extra_args)

            self._hotfix('channel', channel_sx)

            return channel_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

        except fdsn.DownloadError as e:
            raise SquirrelError(str(e)) from e

    def _load_constraint(self):
        '''
        Load the constraint of the cached channel metadata, if available.
        '''
        fn = self._get_constraint_path()
        if op.exists(fn):
            # NOTE(review): unpickling is only safe as long as the cache
            # directory is not writable by untrusted parties.
            with open(fn, 'rb') as f:
                self._constraint = pickle.load(f)
        else:
            self._constraint = None

    def _dump_constraint(self):
        '''
        Save the constraint of the cached channel metadata.
        '''
        with open(self._get_constraint_path(), 'wb') as f:
            pickle.dump(self._constraint, f, protocol=2)

    def _get_expiration_time(self, path):
        '''
        Get the time at which the cache file at ``path`` expires.

        :returns:
            ``None`` if :py:attr:`expires` is not set (no expiration), the
            modification time of the file plus :py:attr:`expires` if the file
            can be stat'ed, and ``0.0`` (always expired) otherwise.
        '''
        if self.expires is None:
            return None

        try:
            # Element 8 of the stat result is the modification time.
            t = os.stat(path)[8]
            return t + self.expires

        except OSError:
            return 0.0

    def _get_channels_expiration_time(self):
        '''
        Get the expiration time of the cached channel metadata.
        '''
        return self._get_expiration_time(self._get_channels_path())

    def update_waveform_promises(self, squirrel, constraint):
        '''
        Add waveform promises for data not yet available locally.

        For each channel known from this source's channel metadata and
        matching ``constraint``, promises are created for the time spans
        which are not covered by waveforms already known to ``squirrel``.

        :param squirrel:
            The Squirrel instance to be updated.

        :param constraint:
            Constraint providing ``codes``, ``tmin`` and ``tmax``.
        '''
        from ..base import gaps
        cpath = os.path.abspath(self._get_channels_path())

        ctmin = constraint.tmin
        ctmax = constraint.tmax

        nuts = squirrel.iter_nuts(
            'channel',
            path=cpath,
            codes=constraint.codes,
            tmin=ctmin,
            tmax=ctmax)

        coverages = squirrel.get_coverage(
            'waveform',
            codes=constraint.codes if constraint.codes else None,
            tmin=ctmin,
            tmax=ctmax)

        codes_to_avail = defaultdict(list)
        for coverage in coverages:
            for tmin, tmax, _ in coverage.iter_spans():
                codes_to_avail[coverage.codes].append((tmin, tmax))

        def sgaps(nut):
            # Yield copies of the channel nut, one for each uncovered span.
            for tmin, tmax in gaps(
                    codes_to_avail[nut.codes],
                    max(ctmin, nut.tmin) if ctmin is not None else nut.tmin,
                    min(ctmax, nut.tmax) if ctmax is not None else nut.tmax):

                subnut = clone(nut)
                subnut.tmin = tmin
                subnut.tmax = tmax

                # ignore 1-sample gaps produced by rounding errors
                if subnut.tmax - subnut.tmin < 2*subnut.deltat:
                    continue

                yield subnut

        def wanted(nuts):
            for nut in nuts:
                for subnut in sgaps(nut):
                    yield subnut

        path = self._source_id
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
            virtual_paths=[path])

    def remove_waveform_promises(self, squirrel, from_database='selection'):
        '''
        Remove waveform promises from live selection or global database.

        :param from_database:
            Remove from live selection ``'selection'`` or global database
            ``'global'``.
        '''

        path = self._source_id
        if from_database == 'selection':
            squirrel.remove(path)
        elif from_database == 'global':
            squirrel.get_database().remove(path)
        else:
            raise ValueError(
                'Values allowed for from_database: ("selection", "global")')

    def _get_user_credentials(self):
        '''
        Get the authentication related keyword arguments for FDSN requests.

        :returns:
            Dict, containing ``'user'`` and ``'passwd'`` if user credentials
            are configured and ``'token'`` if a token or token file is
            configured.
        '''
        d = {}
        if self.user_credentials is not None:
            d['user'], d['passwd'] = self.user_credentials

        if self.auth_token is not None or self.auth_token_path is not None:
            d['token'] = self.get_auth_token()

        return d

    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):
        '''
        Download waveforms for the given orders.

        The orders are processed in batches. The data of each batch is
        downloaded into a temporary file, validated, cut to the ordered time
        spans and stored in the waveform archive. Problems are collected and
        logged as aggregated warnings at the end.

        :param orders:
            List of waveform orders. The list is sorted in place.

        :param success:
            Callback ``success(order, traces)``, called for each order for
            which data has been obtained.

        :param batch_add:
            Callback ``batch_add(paths)``, called once per batch with the
            paths of the archive files written, if any.

        :param error_permanent:
            Callback ``error_permanent(order)``, called for failed orders
            which are not expected to succeed on retry.

        :param error_temporary:
            Callback ``error_temporary(order)``, called for failed orders
            which may succeed when retried later.
        '''
        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        while i < len(orders):
            orders_now = orders[i:i+neach]
            selection_now = orders_to_selection(orders_now)
            nsamples_estimate = sum(
                order.estimate_nsamples() for order in orders_now)

            # Upper limit for the amount of data accepted for this batch.
            nbytes_max = max(
                1024 * 1000,
                nsamples_estimate * 4 * 10)

            nsuccess = 0
            elog.append_checkpoint()
            self._log_info_data(
                'downloading, %s' % order_summary(orders_now))

            all_paths = []
            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    data = fdsn.dataselect(
                        site=self.site, selection=selection_now,
                        **self._get_user_credentials())

                    now = time.time()

                    path = op.join(tmpdir, 'tmp.mseed')
                    with open(path, 'wb') as f:
                        nread = 0
                        while True:
                            buf = data.read(1024)
                            nread += len(buf)
                            if not buf:
                                break
                            f.write(buf)

                            # abort if we get way more data than expected
                            if nread > nbytes_max:
                                raise Aborted('Too much data received.')

                    trs = io.load(path)

                    by_nslc = defaultdict(list)
                    for tr in trs:
                        by_nslc[tr.nslc_id].append(tr)

                    for order in orders_now:
                        trs_order = []
                        err_this = None
                        for tr in by_nslc[order.codes.nslc]:
                            try:
                                order.validate(tr)
                                trs_order.append(tr.chop(
                                    order.tmin, order.tmax, inplace=False))

                            except trace.NoData:
                                err_this = (
                                    'empty result', 'empty sub-interval')

                            except InvalidWaveform as e:
                                err_this = ('invalid waveform', str(e))

                        if len(trs_order) == 0:
                            if err_this is None:
                                err_this = ('empty result', '')

                            elog.append(now, order, *err_this)
                            if order.is_near_real_time():
                                error_temporary(order)
                            else:
                                error_permanent(order)
                        else:
                            def tsame(ta, tb):
                                return abs(tb - ta) < 2 * order.deltat

                            # Anything but a single trace spanning the whole
                            # order is logged as a partial result, but is
                            # stored and reported as success nevertheless.
                            if len(trs_order) != 1 \
                                    or not tsame(
                                        trs_order[0].tmin, order.tmin) \
                                    or not tsame(
                                        trs_order[0].tmax, order.tmax):

                                if err_this:
                                    elog.append(
                                        now, order,
                                        'partial result, %s' % err_this[0],
                                        err_this[1])
                                else:
                                    elog.append(now, order, 'partial result')

                            paths = self._archive.add(order, trs_order)
                            all_paths.extend(paths)

                            nsuccess += 1
                            success(order, trs_order)

                except fdsn.EmptyResult:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'empty result')
                        if order.is_near_real_time():
                            error_temporary(order)
                        else:
                            error_permanent(order)

                except Aborted as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'aborted', str(e))
                        error_permanent(order)

                except (util.HTTPError, fdsn.DownloadError) as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'http error', str(e))
                        error_temporary(order)

            emessage = elog.summarize_recent()

            self._log_info_data(
                '%i download%s %ssuccessful' % (
                    nsuccess,
                    util.plural_s(nsuccess),
                    '(partially) ' if emessage else '')
                + (', %s' % emessage if emessage else ''))

            if all_paths:
                batch_add(all_paths)

            i += neach
            task.update(i)

        for agg in elog.iter_aggregates():
            logger.warning(str(agg))

        task.done()

    def _do_response_query(self, selection):
        '''
        Query response-level station metadata from the FDSN site.

        :param selection:
            List of tuples ``(net, sta, loc, cha, tmin, tmax)``.

        :returns:
            StationXML object. A dummy object is returned, if the site
            reports an empty result.

        :raises:
            :py:exc:`SquirrelError` if the download fails.
        '''
        extra_args = {}

        if self.site not in g_sites_not_supporting['includerestricted']:
            extra_args.update(
                includerestricted=self._want_restricted())

        self._log_responses('querying...')

        try:
            response_sx = fdsn.station(
                site=self.site,
                level='response',
                selection=selection,
                **extra_args)

            self._hotfix('response', response_sx)
            return response_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

        except fdsn.DownloadError as e:
            raise SquirrelError(str(e)) from e

    def update_response_inventory(self, squirrel, constraint):
        '''
        Bring the cached instrument responses up to date.

        Responses are queried for those channels matching ``constraint``
        whose response cache file is missing or expired. One cache file is
        written per channel.

        :param squirrel:
            The Squirrel instance to be updated.

        :param constraint:
            Constraint providing the ``codes`` to be considered.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts(
            'channel', path=cpath, codes=constraint.codes)

        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes.nslc
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if os.path.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        # Stored for channels for which the site did not return a response.
        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        neach = 100
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                # Best effort: a failing batch is reported through the
                # status log and must not prevent the remaining batches.
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)

                    dump_all_spickle([response_sx], filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            if k.find('failed') != -1:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(
                        CodesNSLCE(tup) for tup in status[k])),
                target=log_target)

        if fns:
            squirrel.add(fns, kinds=['response'])
# Names exported by ``from ... import *``.
__all__ = ['FDSNSource']