Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/client/fdsn.py: 83%
511 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-11-03 12:47 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-11-03 12:47 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Squirrel client to access FDSN web services for seismic waveforms and metadata.
8'''
10import time
11import os
12import copy
13import logging
14import tempfile
15import importlib.util
16from collections import defaultdict
17try:
18 import cPickle as pickle
19except ImportError:
20 import pickle
21import os.path as op
22from .base import Source, Constraint
23from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \
24 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \
25 codes_to_str_abbreviated, CodesNSLCE
26from ..database import ExecuteGet1Error
27from pyrocko.squirrel.error import SquirrelError
28from pyrocko.client import fdsn
30from pyrocko import util, trace, io
31from pyrocko.io.io_common import FileLoadError
32from pyrocko.io import stationxml
33from pyrocko.progress import progress
34from pyrocko import has_paths
36from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \
37 Duration, Bool, clone, dump_all_spickle
# NOTE(review): presumably picked up by pyrocko.guts as the tag prefix for the
# Object subclasses defined in this module -- confirm.
guts_prefix = 'squirrel'

# Override the timeout of the underlying FDSN client module
# (presumably in seconds -- confirm against pyrocko.client.fdsn).
fdsn.g_timeout = 60.

logger = logging.getLogger('psq.client.fdsn')

# Query features which are avoided for particular sites: for sites listed
# under 'startbefore', channel queries use 'starttime'/'endtime' instead of
# 'endafter'/'startbefore'; for sites listed under 'includerestricted', the
# 'includerestricted' argument is not sent at all.
g_sites_not_supporting = {
    'startbefore': ['geonet'],
    'includerestricted': ['geonet', 'ncedc', 'scedc']}

# Query argument names which FDSNSource refuses in `query_args` when `codes`
# are given as well.
g_keys_conflicting_post_codes = {
    'network', 'station', 'location', 'channel', 'minlatitude', 'maxlatitude',
    'minlongitude', 'maxlongitude', 'latitude', 'longitude', 'minradius',
    'maxradius'}
def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are forwarded unchanged to ``progress.task``.
    '''
    task = progress.task(*args, logger=logger)
    return task
def diff(fn_a, fn_b):
    '''
    Check whether the contents of two files differ.

    :param fn_a: Path of the first file.
    :param fn_b: Path of the second file.

    :returns:
        ``True`` if the files have different sizes, different contents or if
        one of them cannot be stat'ed, ``False`` if they are byte-identical.
    '''
    try:
        size_a = os.stat(fn_a).st_size
        size_b = os.stat(fn_b).st_size
    except OSError:
        # A file which cannot be inspected is treated as different.
        return True

    if size_a != size_b:
        return True

    with open(fn_a, 'rb') as fa, open(fn_b, 'rb') as fb:
        while True:
            chunk_a = fa.read(1024)
            chunk_b = fb.read(1024)
            if chunk_a != chunk_b:
                return True

            # Equal chunks and end-of-file reached: nothing differed.
            if not chunk_a or not chunk_b:
                return False
def move_or_keep(fn_temp, fn):
    '''
    Install a freshly written temporary file unless nothing has changed.

    :param fn_temp: Path of the newly written temporary file.
    :param fn: Path of the final destination.

    :returns:
        ``'new'`` if ``fn`` did not exist before, ``'updated'`` if it existed
        with different contents and has been replaced, or
        ``'upstream unchanged'`` if it existed with identical contents. In
        the latter case ``fn`` is left untouched and ``fn_temp`` is deleted.
    '''
    existed = op.exists(fn)
    if existed and not diff(fn, fn_temp):
        os.unlink(fn_temp)
        return 'upstream unchanged'

    # os.replace() overwrites an existing destination on every platform,
    # whereas os.rename() raises on Windows if the destination exists.
    os.replace(fn_temp, fn)
    return 'updated' if existed else 'new'
class Archive(Object):
    '''
    Base class for archives; subclasses are expected to override
    :py:meth:`add`.
    '''

    def add(self):
        # Not implemented in the base class.
        raise NotImplementedError()
class MSeedArchive(Archive):
    '''
    Archive saving traces via :py:func:`pyrocko.io.save` below a base path.

    The file name :py:gattr:`template` is joined onto the base path set with
    :py:meth:`set_base_path`.
    '''

    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        '_%(block_tmin_us)s_%(block_tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)
        # Must be set with set_base_path() before add() is used.
        self._base_path = None

    def set_base_path(self, path):
        '''
        Set the directory below which files are stored.
        '''
        self._base_path = path

    def add(self, order, trs):
        '''
        Save traces belonging to an order.

        :param order: Object providing ``tmin`` and ``tmax`` of the block.
        :param trs: Traces to be saved.

        :returns: Whatever ``io.save`` returns for the written files.
        '''
        path_template = op.join(self._base_path, self.template)
        time_format = '%Y-%m-%d_%H-%M-%S.6FRAC'
        block_times = {
            'block_tmin_us': util.time_to_str(order.tmin, format=time_format),
            'block_tmax_us': util.time_to_str(order.tmax, format=time_format)}

        return io.save(
            trs, path_template, overwrite=True, additional=block_times)
def combine_selections(selection):
    '''
    Merge consecutive selection rows which join seamlessly in time.

    :param selection:
        Iterable of tuples ``(net, sta, loc, cha, tmin, tmax)``.

    :returns:
        List of rows, where a row has been merged into its predecessor if
        the first four elements agree and its ``tmin`` equals the
        predecessor's ``tmax``.
    '''
    merged = []
    current = None
    for row in selection:
        contiguous = (
            current
            and row[:4] == current[:4]
            and row[4] == current[5])

        if contiguous:
            # Extend the pending row up to the end of this one.
            current = current[:5] + (row[5],)
            continue

        if current:
            merged.append(current)

        current = row

    if current:
        merged.append(current)

    return merged
def orders_sort_key(order):
    '''
    Sort key for orders: by codes first, then by start time.
    '''
    key = (order.codes, order.tmin)
    return key
def orders_to_selection(orders, pad=1.0):
    '''
    Convert orders into a padded list of selection rows.

    :param orders:
        Iterable of objects providing ``codes.nslc``, ``tmin``, ``tmax`` and
        ``deltat``.
    :param pad:
        Padding added on both sides of each merged time span, in units of
        the sampling interval ``deltat`` of the channel.

    :returns:
        List of tuples ``(net, sta, loc, cha, tmin, tmax)``, sorted with
        :py:func:`orders_sort_key` and merged with
        :py:func:`combine_selections` before padding.
    '''
    rows = []
    deltat_by_nslc = {}
    for order in sorted(orders, key=orders_sort_key):
        nslc = order.codes.nslc
        rows.append(nslc + (order.tmin, order.tmax))
        # The last order seen for a channel determines its deltat.
        deltat_by_nslc[nslc] = order.deltat

    padded = []
    for row in combine_selections(rows):
        net, sta, loc, cha, tmin, tmax = row
        margin = pad * deltat_by_nslc[net, sta, loc, cha]
        padded.append((net, sta, loc, cha, tmin - margin, tmax + margin))

    return padded
def codes_to_selection(codes_list, tmin, tmax):
    '''
    Build selection rows for a list of codes and one common time span.

    :param codes_list:
        List of objects providing an ``nslc`` tuple, or ``None``.
    :param tmin: Start time put into every row.
    :param tmax: End time put into every row.

    :returns:
        ``None`` if ``codes_list`` is ``None``, otherwise a list of tuples
        ``nslc + (tmin, tmax)`` in sorted order of the codes.
    '''
    if codes_list is None:
        return None

    return [codes.nslc + (tmin, tmax) for codes in sorted(codes_list)]
class ErrorEntry(Object):
    '''
    Record of a single problem with one waveform order.
    '''
    # Time at which the problem was registered.
    time = Timestamp.T()
    # The order which was affected.
    order = WaveformOrder.T()
    # Short category of the problem, e.g. 'empty result' or 'http error'.
    kind = String.T()
    # Optional further explanation.
    details = String.T(optional=True)
class ErrorAggregate(Object):
    '''
    Group of error entries sharing the same kind and details.

    Its string representation is a multi-line summary listing the affected
    codes and time spans.
    '''
    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes = List.T(CodesNSLCE.T())
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        codes = [str(x) for x in self.codes]
        # The conditional applies to the whole concatenation: either a
        # newline followed by the wrapped codes, or the '<none>' placeholder.
        scodes = '\n' + util.ewrap(codes, indent=' ') if codes else '<none>'
        tss = self.time_spans
        sspans = '\n' + util.ewrap(('%s - %s' % (
            util.time_to_str(ts[0]), util.time_to_str(ts[1])) for ts in tss),
            indent=' ')

        # The details line is only included when details are non-empty.
        return ('FDSN "%s": download error summary for "%s" (%i)\n%s '
                'Codes:%s\n Time spans:%s') % (
            self.site,
            self.kind,
            len(self.entries),
            ' Details: %s\n' % self.details if self.details else '',
            scodes,
            sspans)
class ErrorLog(Object):
    '''
    Collection of download errors encountered for one site.

    Checkpoints mark positions in the list of entries, so that the entries
    added after the most recent checkpoint can be summarized separately.
    '''
    site = String.T()
    entries = List.T(ErrorEntry.T())
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        '''
        Remember the current number of entries as a checkpoint.
        '''
        n_entries = len(self.entries)
        self.checkpoints.append(n_entries)

    def append(self, time, order, kind, details=''):
        '''
        Add a new error entry.
        '''
        self.entries.append(
            ErrorEntry(time=time, order=order, kind=kind, details=details))

    def iter_aggregates(self):
        '''
        Yield one :py:class:`ErrorAggregate` per distinct (kind, details).

        Aggregates are yielded in sorted order of (kind, details).
        '''
        grouped = defaultdict(list)
        for entry in self.entries:
            grouped[entry.kind, entry.details].append(entry)

        for key in sorted(grouped):
            kind, details = key
            members = grouped[key]
            codes = sorted({member.order.codes for member in members})
            selection = orders_to_selection(
                member.order for member in members)
            # The last two elements of each selection row are (tmin, tmax).
            time_spans = sorted({row[-2:] for row in selection})
            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=members,
                codes=codes,
                time_spans=time_spans)

    def summarize_recent(self):
        '''
        Summarize the entries added since the last checkpoint.

        :returns:
            Short message with the number of errors and their kinds, or an
            empty string if there are no recent entries.
        '''
        start = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[start:]
        if not recent:
            return ''

        kinds = sorted({entry.kind for entry in recent})
        return '%i error%s (%s)' % (
            len(recent), util.plural_s(recent), '; '.join(kinds))
class Aborted(SquirrelError):
    '''
    Raised when a running download is aborted on purpose.
    '''
    pass
class FDSNSource(Source, has_paths.HasPaths):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a
    :py:class:`~pyrocko.squirrel.base.Squirrel` allows the latter to download
    station and waveform data from an FDSN web service should the data not
    already happen to be available locally.
    '''

    # Configuration attributes. Each one is described by its ``help`` text.

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    codes = List.T(
        CodesNSLCE.T(),
        optional=True,
        help='List of codes patterns to query via POST parameters.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             "environment's cache directory is used.")

    shared_waveforms = Bool.T(
        default=False,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    hotfix_module_path = has_paths.Path.T(
        optional=True,
        help='Path to Python module to locally patch metadata errors.')
319 def __init__(self, site, query_args=None, codes=None, **kwargs):
320 if codes:
321 codes = [CodesNSLCE(codes_) for codes_ in codes]
323 if codes is not None and query_args is not None:
324 conflicting = g_keys_conflicting_post_codes \
325 & set(query_args.keys())
327 if conflicting:
328 raise SquirrelError(
329 'Cannot use %s in `query_args` when `codes` are also '
330 'given.' % ' or '.join("'%s'" % k for k in conflicting))
332 Source.__init__(
333 self, site=site, query_args=query_args, codes=codes, **kwargs)
335 self._constraint = None
336 self._hash = self.make_hash()
337 self._source_id = 'client:fdsn:%s' % self._hash
338 self._error_infos = []
340 def describe(self):
341 return self._source_id
343 def make_hash(self):
344 s = self.site
345 s += 'notoken' \
346 if (self.auth_token is None and self.auth_token_path is None) \
347 else 'token'
349 if self.user_credentials is not None:
350 s += self.user_credentials[0]
351 else:
352 s += 'nocred'
354 if self.query_args is not None:
355 s += ','.join(
356 '%s:%s' % (k, self.query_args[k])
357 for k in sorted(self.query_args.keys()))
358 else:
359 s += 'noqueryargs'
361 if self.codes is not None:
362 s += 'post_codes:' + ','.join(
363 codes.safe_str for codes in self.codes)
365 return ehash(s)
367 def get_hash(self):
368 return self._hash
370 def get_auth_token(self):
371 if self.auth_token:
372 return self.auth_token
374 elif self.auth_token_path is not None:
375 try:
376 with open(self.auth_token_path, 'rb') as f:
377 return f.read().decode('ascii')
379 except OSError as e:
380 raise FileLoadError(
381 'Cannot load auth token file (%s): %s'
382 % (str(e), self.auth_token_path))
384 else:
385 raise Exception(
386 'FDSNSource: auth_token and auth_token_path are mutually '
387 'exclusive.')
389 def setup(self, squirrel, check=True):
390 self._cache_path = op.join(
391 self.cache_path or squirrel._cache_path, 'fdsn')
393 util.ensuredir(self._cache_path)
394 self._load_constraint()
395 self._archive = MSeedArchive()
396 waveforms_path = self._get_waveforms_path()
397 util.ensuredir(waveforms_path)
398 self._archive.set_base_path(waveforms_path)
400 squirrel.add(
401 self._get_waveforms_path(),
402 check=check)
404 fn = self._get_channels_path()
405 if os.path.exists(fn):
406 squirrel.add(fn)
408 squirrel.add_virtual(
409 [], virtual_paths=[self._source_id])
411 responses_path = self._get_responses_path()
412 if os.path.exists(responses_path):
413 squirrel.add(
414 responses_path, kinds=['response'], exclude=r'\.temp$')
416 self._hotfix_module = None
418 def _hotfix(self, query_type, sx):
419 if self.hotfix_module_path is None:
420 return
422 if self._hotfix_module is None:
423 module_path = self.expand_path(self.hotfix_module_path)
424 spec = importlib.util.spec_from_file_location(
425 'hotfix_' + self._hash, module_path)
426 self._hotfix_module = importlib.util.module_from_spec(spec)
427 spec.loader.exec_module(self._hotfix_module)
429 hook = getattr(
430 self._hotfix_module, 'stationxml_' + query_type + '_hook')
432 return hook(sx)
434 def _get_constraint_path(self):
435 return op.join(self._cache_path, self._hash, 'constraint.pickle')
437 def _get_channels_path(self):
438 return op.join(self._cache_path, self._hash, 'channels.stationxml')
440 def _get_responses_path(self, nslc=None):
441 dirpath = op.join(
442 self._cache_path, self._hash, 'responses')
444 if nslc is None:
445 return dirpath
446 else:
447 return op.join(
448 dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)
450 def _get_waveforms_path(self):
451 if self.shared_waveforms:
452 return op.join(self._cache_path, 'waveforms')
453 else:
454 return op.join(self._cache_path, self._hash, 'waveforms')
456 def _log_meta(self, message, target=logger.info):
457 log_prefix = 'FDSN "%s" metadata:' % self.site
458 target(' '.join((log_prefix, message)))
460 def _log_responses(self, message, target=logger.info):
461 log_prefix = 'FDSN "%s" responses:' % self.site
462 target(' '.join((log_prefix, message)))
464 def _log_info_data(self, *args):
465 log_prefix = 'FDSN "%s" waveforms:' % self.site
466 logger.info(' '.join((log_prefix,) + args))
468 def _str_expires(self, t, now):
469 if t is None:
470 return 'expires: never'
471 else:
472 expire = 'expires' if t > now else 'expired'
473 return '%s: %s' % (
474 expire,
475 util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))
    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Refresh the cached channel metadata if needed and add it to Squirrel.

        The cached file is reused if the stored constraint contains the
        requested one and the file has not expired. Otherwise a new query is
        made for the stored constraint expanded by the requested one.

        :param squirrel: Squirrel instance to which the file is added.
        :param constraint:
            Constraint of interest, defaults to an empty ``Constraint()``.
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                # Work on a copy, so that the stored constraint is only
                # replaced after a successful update.
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                # Process id in the name of the temporary file.
                fn_temp = fn + '.%i.temp' % os.getpid()

                dump_all_spickle([channel_sx], filename=fn_temp)
                # channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    # NOTE(review): update_response_inventory() guards the
                    # same call against ExecuteGet1Error -- confirm whether
                    # that is needed here, too.
                    squirrel.get_database().silent_touch(fn)

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                # Only OSError is reported as a failed update here; other
                # exceptions propagate to the caller.
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        # Re-evaluated because the file may just have been rewritten.
        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)
529 def _do_channel_query(self, constraint):
530 extra_args = {}
532 tmin = constraint.tmin \
533 if constraint.tmin is not None and constraint.tmin != g_tmin \
534 else g_tmin_queries
536 tmax = constraint.tmax \
537 if constraint.tmax is not None and constraint.tmax != g_tmax \
538 else g_tmax
540 if self.site in g_sites_not_supporting['startbefore']:
541 ktmin = 'starttime'
542 ktmax = 'endtime'
543 else:
544 ktmin = 'endafter'
545 ktmax = 'startbefore'
547 if self.codes is None:
548 extra_args[ktmin] = tmin
549 extra_args[ktmax] = tmax
551 if self.site not in g_sites_not_supporting['includerestricted']:
552 extra_args.update(
553 includerestricted=(
554 self.user_credentials is not None
555 or self.auth_token is not None
556 or self.auth_token_path is not None))
558 if self.query_args is not None:
559 extra_args.update(self.query_args)
561 self._log_meta('querying...')
563 try:
564 channel_sx = fdsn.station(
565 site=self.site,
566 format='text',
567 level='channel',
568 selection=codes_to_selection(self.codes, tmin, tmax),
569 **extra_args)
571 self._hotfix('channel', channel_sx)
573 return channel_sx
575 except fdsn.EmptyResult:
576 return stationxml.FDSNStationXML(source='dummy-empty-result')
578 except fdsn.DownloadError as e:
579 raise SquirrelError(str(e))
581 def _load_constraint(self):
582 fn = self._get_constraint_path()
583 if op.exists(fn):
584 with open(fn, 'rb') as f:
585 self._constraint = pickle.load(f)
586 else:
587 self._constraint = None
589 def _dump_constraint(self):
590 with open(self._get_constraint_path(), 'wb') as f:
591 pickle.dump(self._constraint, f, protocol=2)
593 def _get_expiration_time(self, path):
594 if self.expires is None:
595 return None
597 try:
598 t = os.stat(path)[8]
599 return t + self.expires
601 except OSError:
602 return 0.0
604 def _get_channels_expiration_time(self):
605 return self._get_expiration_time(self._get_channels_path())
    def update_waveform_promises(self, squirrel, constraint):
        '''
        Add waveform promises for channel time spans lacking waveforms.

        Channels are taken from this source's cached channel file. For each
        of them, the spans reported by ``gaps`` with respect to the waveform
        coverage known to ``squirrel`` are added as waveform promise nuts
        under the virtual path of this source.

        :param squirrel: Squirrel instance to query and to add promises to.
        :param constraint:
            Object providing ``tmin``, ``tmax`` and ``codes`` to restrict
            the update.
        '''
        from ..base import gaps
        cpath = os.path.abspath(self._get_channels_path())

        ctmin = constraint.tmin
        ctmax = constraint.tmax

        nuts = squirrel.iter_nuts(
            'channel',
            path=cpath,
            codes=constraint.codes,
            tmin=ctmin,
            tmax=ctmax)

        coverages = squirrel.get_coverage(
            'waveform',
            codes=constraint.codes if constraint.codes else None,
            tmin=ctmin,
            tmax=ctmax)

        # Time spans already covered by waveforms, grouped by codes.
        codes_to_avail = defaultdict(list)
        for coverage in coverages:
            for tmin, tmax, _ in coverage.iter_spans():
                codes_to_avail[coverage.codes].append((tmin, tmax))

        def sgaps(nut):
            # The channel's time span is clipped to the constraint's time
            # span where the latter is set.
            for tmin, tmax in gaps(
                    codes_to_avail[nut.codes],
                    max(ctmin, nut.tmin) if ctmin is not None else nut.tmin,
                    min(ctmax, nut.tmax) if ctmax is not None else nut.tmax):

                subnut = clone(nut)
                subnut.tmin = tmin
                subnut.tmax = tmax

                # ignore 1-sample gaps produced by rounding errors
                if subnut.tmax - subnut.tmin < 2*subnut.deltat:
                    continue

                yield subnut

        def wanted(nuts):
            # Flatten the gaps of all channel nuts into one stream.
            for nut in nuts:
                for nut in sgaps(nut):
                    yield nut

        path = self._source_id
        squirrel.add_virtual(
            (make_waveform_promise_nut(
                file_path=path,
                **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
            virtual_paths=[path])
660 def remove_waveform_promises(self, squirrel, from_database='selection'):
661 '''
662 Remove waveform promises from live selection or global database.
664 :param from_database:
665 Remove from live selection ``'selection'`` or global database
666 ``'global'``.
667 '''
669 path = self._source_id
670 if from_database == 'selection':
671 squirrel.remove(path)
672 elif from_database == 'global':
673 squirrel.get_database().remove(path)
674 else:
675 raise ValueError(
676 'Values allowed for from_database: ("selection", "global")')
678 def _get_user_credentials(self):
679 d = {}
680 if self.user_credentials is not None:
681 d['user'], d['passwd'] = self.user_credentials
683 if self.auth_token is not None or self.auth_token_path is not None:
684 d['token'] = self.get_auth_token()
686 return d
    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):
        '''
        Download waveforms for a list of orders in batches.

        :param orders:
            List of orders. It is sorted in place with ``orders_sort_key``.
        :param success:
            Called as ``success(order, trs_order)`` for every order for
            which at least one trace has been obtained.
        :param batch_add:
            Called once per batch with the list of paths of the files saved
            to the archive, if there are any.
        :param error_permanent:
            Called as ``error_permanent(order)`` for orders which failed.
        :param error_temporary:
            Called as ``error_temporary(order)`` for orders which failed on
            HTTP/download errors, and for near-real-time orders which
            yielded no data.
        '''

        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        # Number of orders combined into one dataselect request.
        neach = 20
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        while i < len(orders):
            orders_now = orders[i:i+neach]
            selection_now = orders_to_selection(orders_now)
            nsamples_estimate = sum(
                order.estimate_nsamples() for order in orders_now)

            nsuccess = 0
            # Errors logged from here on belong to the current batch.
            elog.append_checkpoint()
            self._log_info_data(
                'downloading, %s' % order_summary(orders_now))

            all_paths = []
            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    data = fdsn.dataselect(
                        site=self.site, selection=selection_now,
                        **self._get_user_credentials())

                    now = time.time()

                    path = op.join(tmpdir, 'tmp.mseed')
                    with open(path, 'wb') as f:
                        nread = 0
                        while True:
                            buf = data.read(1024)
                            nread += len(buf)
                            if not buf:
                                break
                            f.write(buf)

                            # abort if we get way more data than expected
                            # (presumably 4 bytes per sample times a safety
                            # factor of 10, but never less than 1024000
                            # bytes -- confirm)
                            if nread > max(
                                    1024 * 1000,
                                    nsamples_estimate * 4 * 10):

                                raise Aborted('Too much data received.')

                    trs = io.load(path)

                    by_nslc = defaultdict(list)
                    for tr in trs:
                        by_nslc[tr.nslc_id].append(tr)

                    for order in orders_now:
                        trs_order = []
                        # Only the last problem per order is remembered.
                        err_this = None
                        for tr in by_nslc[order.codes.nslc]:
                            try:
                                order.validate(tr)
                                trs_order.append(tr.chop(
                                    order.tmin, order.tmax, inplace=False))

                            except trace.NoData:
                                err_this = (
                                    'empty result', 'empty sub-interval')

                            except InvalidWaveform as e:
                                err_this = ('invalid waveform', str(e))

                        if len(trs_order) == 0:
                            if err_this is None:
                                err_this = ('empty result', '')

                            elog.append(now, order, *err_this)
                            if order.is_near_real_time():
                                error_temporary(order)
                            else:
                                error_permanent(order)
                        else:
                            def tsame(ta, tb):
                                return abs(tb - ta) < 2 * order.deltat

                            # Anything but exactly one trace spanning the
                            # whole order is logged as a partial result,
                            # but still counted as a success below.
                            if len(trs_order) != 1 \
                                    or not tsame(
                                        trs_order[0].tmin, order.tmin) \
                                    or not tsame(
                                        trs_order[0].tmax, order.tmax):

                                if err_this:
                                    elog.append(
                                        now, order,
                                        'partial result, %s' % err_this[0],
                                        err_this[1])
                                else:
                                    elog.append(now, order, 'partial result')

                            paths = self._archive.add(order, trs_order)
                            all_paths.extend(paths)

                            nsuccess += 1
                            success(order, trs_order)

                except fdsn.EmptyResult:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'empty result')
                        if order.is_near_real_time():
                            error_temporary(order)
                        else:
                            error_permanent(order)

                except Aborted as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'aborted', str(e))
                        error_permanent(order)

                except (util.HTTPError, fdsn.DownloadError) as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'http error', str(e))
                        error_temporary(order)

            emessage = elog.summarize_recent()

            self._log_info_data(
                '%i download%s %ssuccessful' % (
                    nsuccess,
                    util.plural_s(nsuccess),
                    '(partially) ' if emessage else '')
                + (', %s' % emessage if emessage else ''))

            if all_paths:
                batch_add(all_paths)

            i += neach
            # NOTE(review): after the last batch, i may exceed len(orders)
            # -- confirm that the progress task tolerates this.
            task.update(i)

        for agg in elog.iter_aggregates():
            logger.warning(str(agg))

        task.done()
832 def _do_response_query(self, selection):
833 extra_args = {}
835 if self.site not in g_sites_not_supporting['includerestricted']:
836 extra_args.update(
837 includerestricted=(
838 self.user_credentials is not None
839 or self.auth_token is not None
840 or self.auth_token_path is not None))
842 self._log_responses('querying...')
844 try:
845 response_sx = fdsn.station(
846 site=self.site,
847 level='response',
848 selection=selection,
849 **extra_args)
851 self._hotfix('response', response_sx)
852 return response_sx
854 except fdsn.EmptyResult:
855 return stationxml.FDSNStationXML(source='dummy-empty-result')
857 except fdsn.DownloadError as e:
858 raise SquirrelError(str(e))
    def update_response_inventory(self, squirrel, constraint):
        '''
        Refresh cached response files if needed and add them to Squirrel.

        One response file is kept per channel (net, sta, loc, cha) found in
        this source's cached channel file. Missing or expired files are
        queried in batches and rewritten.

        :param squirrel: Squirrel instance to query and to add files to.
        :param constraint:
            Object providing ``codes`` to restrict the channels considered.
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts(
            'channel', path=cpath, codes=constraint.codes)

        # Responses are always queried for the full time range.
        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        # Maps status message to the list of channels it applies to.
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes.nslc
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if os.path.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        # Stored for channels for which the service returned nothing.
        dummy = stationxml.FDSNStationXML(source='dummy-empty')
        # Number of channels per query.
        neach = 100
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                # Any failure of a query marks the whole batch as failed
                # and processing continues with the next batch.
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    # Process id in the name of the temporary file.
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)

                    dump_all_spickle([response_sx], filename=fn_temp)
                    # response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            # Deliberately ignored (presumably the file is
                            # not known to the database yet -- confirm).
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            # Failures are logged as errors, everything else as info.
            if k.find('failed') != -1:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(
                        CodesNSLCE(tup) for tup in status[k])),
                target=log_target)

        if fns:
            squirrel.add(fns, kinds=['response'])
# Names exported by ``from ... import *``.
__all__ = [
    'FDSNSource',
]