Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/squirrel/client/fdsn.py: 84%
492 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-04 09:52 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-04 09:52 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Squirrel client to access FDSN web services for seismic waveforms and metadata.
8'''
10import time
11import os
12import copy
13import logging
14import tempfile
15import importlib.util
16from collections import defaultdict
17try:
18 import cPickle as pickle
19except ImportError:
20 import pickle
21import os.path as op
22from .base import Source, Constraint
23from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \
24 order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \
25 codes_to_str_abbreviated, CodesNSLCE
26from ..database import ExecuteGet1Error
27from pyrocko.squirrel.error import SquirrelError
28from pyrocko.client import fdsn
30from pyrocko import util, trace, io
31from pyrocko.io.io_common import FileLoadError
32from pyrocko.io import stationxml
33from pyrocko.progress import progress
34from pyrocko import has_paths
36from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \
37 Duration, Bool, clone
39guts_prefix = 'squirrel'
41fdsn.g_timeout = 60.
43logger = logging.getLogger('psq.client.fdsn')
45sites_not_supporting = {
46 'startbefore': ['geonet'],
47 'includerestricted': ['geonet']}
50def make_task(*args):
51 return progress.task(*args, logger=logger)
54def diff(fn_a, fn_b):
55 try:
56 if os.stat(fn_a).st_size != os.stat(fn_b).st_size:
57 return True
59 except OSError:
60 return True
62 with open(fn_a, 'rb') as fa:
63 with open(fn_b, 'rb') as fb:
64 while True:
65 a = fa.read(1024)
66 b = fb.read(1024)
67 if a != b:
68 return True
70 if len(a) == 0 or len(b) == 0:
71 return False
74def move_or_keep(fn_temp, fn):
75 if op.exists(fn):
76 if diff(fn, fn_temp):
77 os.rename(fn_temp, fn)
78 status = 'updated'
79 else:
80 os.unlink(fn_temp)
81 status = 'upstream unchanged'
83 else:
84 os.rename(fn_temp, fn)
85 status = 'new'
87 return status
90class Archive(Object):
92 def add(self):
93 raise NotImplementedError()
96class MSeedArchive(Archive):
97 template = String.T(default=op.join(
98 '%(tmin_year)s',
99 '%(tmin_month)s',
100 '%(tmin_day)s',
101 'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
102 + '_%(block_tmin_us)s_%(block_tmax_us)s.mseed'))
104 def __init__(self, **kwargs):
105 Archive.__init__(self, **kwargs)
106 self._base_path = None
108 def set_base_path(self, path):
109 self._base_path = path
111 def add(self, order, trs):
112 path = op.join(self._base_path, self.template)
113 fmt = '%Y-%m-%d_%H-%M-%S.6FRAC'
114 return io.save(trs, path, overwrite=True, additional=dict(
115 block_tmin_us=util.time_to_str(order.tmin, format=fmt),
116 block_tmax_us=util.time_to_str(order.tmax, format=fmt)))
119def combine_selections(selection):
120 out = []
121 last = None
122 for this in selection:
123 if last and this[:4] == last[:4] and this[4] == last[5]:
124 last = last[:5] + (this[5],)
125 else:
126 if last:
127 out.append(last)
129 last = this
131 if last:
132 out.append(last)
134 return out
137def orders_sort_key(order):
138 return (order.codes, order.tmin)
141def orders_to_selection(orders, pad=1.0):
142 selection = []
143 nslc_to_deltat = {}
144 for order in sorted(orders, key=orders_sort_key):
145 selection.append(
146 order.codes.nslc + (order.tmin, order.tmax))
147 nslc_to_deltat[order.codes.nslc] = order.deltat
149 selection = combine_selections(selection)
150 selection_padded = []
151 for (net, sta, loc, cha, tmin, tmax) in selection:
152 deltat = nslc_to_deltat[net, sta, loc, cha]
153 selection_padded.append((
154 net, sta, loc, cha, tmin-pad*deltat, tmax+pad*deltat))
156 return selection_padded
159class ErrorEntry(Object):
160 time = Timestamp.T()
161 order = WaveformOrder.T()
162 kind = String.T()
163 details = String.T(optional=True)
166class ErrorAggregate(Object):
167 site = String.T()
168 kind = String.T()
169 details = String.T()
170 entries = List.T(ErrorEntry.T())
171 codes = List.T(CodesNSLCE.T())
172 time_spans = List.T(Tuple.T(2, Timestamp.T()))
174 def __str__(self):
175 codes = [str(x) for x in self.codes]
176 scodes = '\n' + util.ewrap(codes, indent=' ') if codes else '<none>'
177 tss = self.time_spans
178 sspans = '\n' + util.ewrap(('%s - %s' % (
179 util.time_to_str(ts[0]), util.time_to_str(ts[1])) for ts in tss),
180 indent=' ')
182 return ('FDSN "%s": download error summary for "%s" (%i)\n%s '
183 'Codes:%s\n Time spans:%s') % (
184 self.site,
185 self.kind,
186 len(self.entries),
187 ' Details: %s\n' % self.details if self.details else '',
188 scodes,
189 sspans)
192class ErrorLog(Object):
193 site = String.T()
194 entries = List.T(ErrorEntry.T())
195 checkpoints = List.T(Int.T())
197 def append_checkpoint(self):
198 self.checkpoints.append(len(self.entries))
200 def append(self, time, order, kind, details=''):
201 entry = ErrorEntry(time=time, order=order, kind=kind, details=details)
202 self.entries.append(entry)
204 def iter_aggregates(self):
205 by_kind_details = defaultdict(list)
206 for entry in self.entries:
207 by_kind_details[entry.kind, entry.details].append(entry)
209 kind_details = sorted(by_kind_details.keys())
211 for kind, details in kind_details:
212 entries = by_kind_details[kind, details]
213 codes = sorted(set(entry.order.codes for entry in entries))
214 selection = orders_to_selection(entry.order for entry in entries)
215 time_spans = sorted(set(row[-2:] for row in selection))
216 yield ErrorAggregate(
217 site=self.site,
218 kind=kind,
219 details=details,
220 entries=entries,
221 codes=codes,
222 time_spans=time_spans)
224 def summarize_recent(self):
225 ioff = self.checkpoints[-1] if self.checkpoints else 0
226 recent = self.entries[ioff:]
227 kinds = sorted(set(entry.kind for entry in recent))
228 if recent:
229 return '%i error%s (%s)' % (
230 len(recent), util.plural_s(recent), '; '.join(kinds))
231 else:
232 return ''
235class Aborted(SquirrelError):
236 pass
239class FDSNSource(Source, has_paths.HasPaths):
241 '''
242 Squirrel data-source to transparently get data from FDSN web services.
244 Attaching an :py:class:`FDSNSource` object to a
245 :py:class:`~pyrocko.squirrel.base.Squirrel` allows the latter to download
246 station and waveform data from an FDSN web service should the data not
247 already happen to be available locally.
248 '''
250 site = String.T(
251 help='FDSN site url or alias name (see '
252 ':py:mod:`pyrocko.client.fdsn`).')
254 query_args = Dict.T(
255 String.T(), String.T(),
256 optional=True,
257 help='Common query arguments, which are appended to all queries.')
259 expires = Duration.T(
260 optional=True,
261 help='Expiration time [s]. Information older than this will be '
262 'refreshed. This only applies to station-metadata. Waveforms do '
263 'not expire. If set to ``None`` neither type of data expires.')
265 cache_path = String.T(
266 optional=True,
267 help='Directory path where any downloaded waveforms and station '
268 'meta-data are to be kept. By default the Squirrel '
269 "environment's cache directory is used.")
271 shared_waveforms = Bool.T(
272 default=False,
273 help='If ``True``, waveforms are shared with other FDSN sources in '
274 'the same Squirrel environment. If ``False``, they are kept '
275 'separate.')
277 user_credentials = Tuple.T(
278 2, String.T(),
279 optional=True,
280 help='User and password for FDSN servers requiring password '
281 'authentication')
283 auth_token = String.T(
284 optional=True,
285 help='Authentication token to be presented to the FDSN server.')
287 auth_token_path = String.T(
288 optional=True,
289 help='Path to file containing the authentication token to be '
290 'presented to the FDSN server.')
292 hotfix_module_path = has_paths.Path.T(
293 optional=True,
294 help='Path to Python module to locally patch metadata errors.')
296 def __init__(self, site, query_args=None, **kwargs):
297 Source.__init__(self, site=site, query_args=query_args, **kwargs)
299 self._constraint = None
300 self._hash = self.make_hash()
301 self._source_id = 'client:fdsn:%s' % self._hash
302 self._error_infos = []
304 def describe(self):
305 return self._source_id
307 def make_hash(self):
308 s = self.site
309 s += 'notoken' \
310 if (self.auth_token is None and self.auth_token_path is None) \
311 else 'token'
313 if self.user_credentials is not None:
314 s += self.user_credentials[0]
315 else:
316 s += 'nocred'
318 if self.query_args is not None:
319 s += ','.join(
320 '%s:%s' % (k, self.query_args[k])
321 for k in sorted(self.query_args.keys()))
322 else:
323 s += 'noqueryargs'
325 return ehash(s)
327 def get_hash(self):
328 return self._hash
330 def get_auth_token(self):
331 if self.auth_token:
332 return self.auth_token
334 elif self.auth_token_path is not None:
335 try:
336 with open(self.auth_token_path, 'rb') as f:
337 return f.read().decode('ascii')
339 except OSError as e:
340 raise FileLoadError(
341 'Cannot load auth token file (%s): %s'
342 % (str(e), self.auth_token_path))
344 else:
345 raise Exception(
346 'FDSNSource: auth_token and auth_token_path are mutually '
347 'exclusive.')
349 def setup(self, squirrel, check=True):
350 self._cache_path = op.join(
351 self.cache_path or squirrel._cache_path, 'fdsn')
353 util.ensuredir(self._cache_path)
354 self._load_constraint()
355 self._archive = MSeedArchive()
356 waveforms_path = self._get_waveforms_path()
357 util.ensuredir(waveforms_path)
358 self._archive.set_base_path(waveforms_path)
360 squirrel.add(
361 self._get_waveforms_path(),
362 check=check)
364 fn = self._get_channels_path()
365 if os.path.exists(fn):
366 squirrel.add(fn)
368 squirrel.add_virtual(
369 [], virtual_paths=[self._source_id])
371 responses_path = self._get_responses_path()
372 if os.path.exists(responses_path):
373 squirrel.add(responses_path, kinds=['response'])
375 self._hotfix_module = None
377 def _hotfix(self, query_type, sx):
378 if self.hotfix_module_path is None:
379 return
381 if self._hotfix_module is None:
382 module_path = self.expand_path(self.hotfix_module_path)
383 spec = importlib.util.spec_from_file_location(
384 'hotfix_' + self._hash, module_path)
385 self._hotfix_module = importlib.util.module_from_spec(spec)
386 spec.loader.exec_module(self._hotfix_module)
388 hook = getattr(
389 self._hotfix_module, 'stationxml_' + query_type + '_hook')
391 return hook(sx)
393 def _get_constraint_path(self):
394 return op.join(self._cache_path, self._hash, 'constraint.pickle')
396 def _get_channels_path(self):
397 return op.join(self._cache_path, self._hash, 'channels.stationxml')
399 def _get_responses_path(self, nslc=None):
400 dirpath = op.join(
401 self._cache_path, self._hash, 'responses')
403 if nslc is None:
404 return dirpath
405 else:
406 return op.join(
407 dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)
409 def _get_waveforms_path(self):
410 if self.shared_waveforms:
411 return op.join(self._cache_path, 'waveforms')
412 else:
413 return op.join(self._cache_path, self._hash, 'waveforms')
415 def _log_meta(self, message, target=logger.info):
416 log_prefix = 'FDSN "%s" metadata:' % self.site
417 target(' '.join((log_prefix, message)))
419 def _log_responses(self, message, target=logger.info):
420 log_prefix = 'FDSN "%s" responses:' % self.site
421 target(' '.join((log_prefix, message)))
423 def _log_info_data(self, *args):
424 log_prefix = 'FDSN "%s" waveforms:' % self.site
425 logger.info(' '.join((log_prefix,) + args))
427 def _str_expires(self, t, now):
428 if t is None:
429 return 'expires: never'
430 else:
431 expire = 'expires' if t > now else 'expired'
432 return '%s: %s' % (
433 expire,
434 util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))
436 def update_channel_inventory(self, squirrel, constraint=None):
437 if constraint is None:
438 constraint = Constraint()
440 expiration_time = self._get_channels_expiration_time()
441 now = time.time()
443 log_target = logger.info
444 if self._constraint and self._constraint.contains(constraint) \
445 and (expiration_time is None or now < expiration_time):
447 status = 'using cached'
449 else:
450 if self._constraint:
451 constraint_temp = copy.deepcopy(self._constraint)
452 constraint_temp.expand(constraint)
453 constraint = constraint_temp
455 try:
456 channel_sx = self._do_channel_query(constraint)
458 channel_sx.created = None # timestamp would ruin diff
460 fn = self._get_channels_path()
461 util.ensuredirs(fn)
462 fn_temp = fn + '.%i.temp' % os.getpid()
463 channel_sx.dump_xml(filename=fn_temp)
465 status = move_or_keep(fn_temp, fn)
467 if status == 'upstream unchanged':
468 squirrel.get_database().silent_touch(fn)
470 self._constraint = constraint
471 self._dump_constraint()
473 except OSError as e:
474 status = 'update failed (%s)' % str(e)
475 log_target = logger.error
477 expiration_time = self._get_channels_expiration_time()
478 self._log_meta(
479 '%s (%s)' % (status, self._str_expires(expiration_time, now)),
480 target=log_target)
482 fn = self._get_channels_path()
483 if os.path.exists(fn):
484 squirrel.add(fn)
486 def _do_channel_query(self, constraint):
487 extra_args = {}
489 if self.site in sites_not_supporting['startbefore']:
490 if constraint.tmin is not None and constraint.tmin != g_tmin:
491 extra_args['starttime'] = constraint.tmin
492 if constraint.tmax is not None and constraint.tmax != g_tmax:
493 extra_args['endtime'] = constraint.tmax
495 else:
496 if constraint.tmin is not None and constraint.tmin != g_tmin:
497 extra_args['endafter'] = constraint.tmin
498 if constraint.tmax is not None and constraint.tmax != g_tmax:
499 extra_args['startbefore'] = constraint.tmax
501 if self.site not in sites_not_supporting['includerestricted']:
502 extra_args.update(
503 includerestricted=(
504 self.user_credentials is not None
505 or self.auth_token is not None
506 or self.auth_token_path is not None))
508 if self.query_args is not None:
509 extra_args.update(self.query_args)
511 self._log_meta('querying...')
513 try:
514 channel_sx = fdsn.station(
515 site=self.site,
516 format='text',
517 level='channel',
518 **extra_args)
520 self._hotfix('channel', channel_sx)
522 return channel_sx
524 except fdsn.EmptyResult:
525 return stationxml.FDSNStationXML(source='dummy-empty-result')
527 def _load_constraint(self):
528 fn = self._get_constraint_path()
529 if op.exists(fn):
530 with open(fn, 'rb') as f:
531 self._constraint = pickle.load(f)
532 else:
533 self._constraint = None
535 def _dump_constraint(self):
536 with open(self._get_constraint_path(), 'wb') as f:
537 pickle.dump(self._constraint, f, protocol=2)
539 def _get_expiration_time(self, path):
540 if self.expires is None:
541 return None
543 try:
544 t = os.stat(path)[8]
545 return t + self.expires
547 except OSError:
548 return 0.0
550 def _get_channels_expiration_time(self):
551 return self._get_expiration_time(self._get_channels_path())
553 def update_waveform_promises(self, squirrel, constraint):
554 from ..base import gaps
555 cpath = os.path.abspath(self._get_channels_path())
557 ctmin = constraint.tmin
558 ctmax = constraint.tmax
560 nuts = squirrel.iter_nuts(
561 'channel',
562 path=cpath,
563 codes=constraint.codes,
564 tmin=ctmin,
565 tmax=ctmax)
567 coverages = squirrel.get_coverage(
568 'waveform',
569 codes=constraint.codes if constraint.codes else None,
570 tmin=ctmin,
571 tmax=ctmax)
573 codes_to_avail = defaultdict(list)
574 for coverage in coverages:
575 for tmin, tmax, _ in coverage.iter_spans():
576 codes_to_avail[coverage.codes].append((tmin, tmax))
578 def sgaps(nut):
579 for tmin, tmax in gaps(
580 codes_to_avail[nut.codes],
581 max(ctmin, nut.tmin) if ctmin is not None else nut.tmin,
582 min(ctmax, nut.tmax) if ctmax is not None else nut.tmax):
584 subnut = clone(nut)
585 subnut.tmin = tmin
586 subnut.tmax = tmax
588 # ignore 1-sample gaps produced by rounding errors
589 if subnut.tmax - subnut.tmin < 2*subnut.deltat:
590 continue
592 yield subnut
594 def wanted(nuts):
595 for nut in nuts:
596 for nut in sgaps(nut):
597 yield nut
599 path = self._source_id
600 squirrel.add_virtual(
601 (make_waveform_promise_nut(
602 file_path=path,
603 **nut.waveform_promise_kwargs) for nut in wanted(nuts)),
604 virtual_paths=[path])
606 def remove_waveform_promises(self, squirrel, from_database='selection'):
607 '''
608 Remove waveform promises from live selection or global database.
610 :param from_database:
611 Remove from live selection ``'selection'`` or global database
612 ``'global'``.
613 '''
615 path = self._source_id
616 if from_database == 'selection':
617 squirrel.remove(path)
618 elif from_database == 'global':
619 squirrel.get_database().remove(path)
620 else:
621 raise ValueError(
622 'Values allowed for from_database: ("selection", "global")')
624 def _get_user_credentials(self):
625 d = {}
626 if self.user_credentials is not None:
627 d['user'], d['passwd'] = self.user_credentials
629 if self.auth_token is not None or self.auth_token_path is not None:
630 d['token'] = self.get_auth_token()
632 return d
634 def download_waveforms(
635 self, orders, success, batch_add, error_permanent,
636 error_temporary):
638 elog = ErrorLog(site=self.site)
639 orders.sort(key=orders_sort_key)
640 neach = 20
641 i = 0
642 task = make_task(
643 'FDSN "%s" waveforms: downloading' % self.site, len(orders))
645 while i < len(orders):
646 orders_now = orders[i:i+neach]
647 selection_now = orders_to_selection(orders_now)
648 nsamples_estimate = sum(
649 order.estimate_nsamples() for order in orders_now)
651 nsuccess = 0
652 elog.append_checkpoint()
653 self._log_info_data(
654 'downloading, %s' % order_summary(orders_now))
656 all_paths = []
657 with tempfile.TemporaryDirectory() as tmpdir:
658 try:
659 data = fdsn.dataselect(
660 site=self.site, selection=selection_now,
661 **self._get_user_credentials())
663 now = time.time()
665 path = op.join(tmpdir, 'tmp.mseed')
666 with open(path, 'wb') as f:
667 nread = 0
668 while True:
669 buf = data.read(1024)
670 nread += len(buf)
671 if not buf:
672 break
673 f.write(buf)
675 # abort if we get way more data than expected
676 if nread > max(
677 1024 * 1000,
678 nsamples_estimate * 4 * 10):
680 raise Aborted('Too much data received.')
682 trs = io.load(path)
684 by_nslc = defaultdict(list)
685 for tr in trs:
686 by_nslc[tr.nslc_id].append(tr)
688 for order in orders_now:
689 trs_order = []
690 err_this = None
691 for tr in by_nslc[order.codes.nslc]:
692 try:
693 order.validate(tr)
694 trs_order.append(tr.chop(
695 order.tmin, order.tmax, inplace=False))
697 except trace.NoData:
698 err_this = (
699 'empty result', 'empty sub-interval')
701 except InvalidWaveform as e:
702 err_this = ('invalid waveform', str(e))
704 if len(trs_order) == 0:
705 if err_this is None:
706 err_this = ('empty result', '')
708 elog.append(now, order, *err_this)
709 if order.is_near_real_time():
710 error_temporary(order)
711 else:
712 error_permanent(order)
713 else:
714 def tsame(ta, tb):
715 return abs(tb - ta) < 2 * order.deltat
717 if len(trs_order) != 1 \
718 or not tsame(
719 trs_order[0].tmin, order.tmin) \
720 or not tsame(
721 trs_order[0].tmax, order.tmax):
723 if err_this:
724 elog.append(
725 now, order,
726 'partial result, %s' % err_this[0],
727 err_this[1])
728 else:
729 elog.append(now, order, 'partial result')
731 paths = self._archive.add(order, trs_order)
732 all_paths.extend(paths)
734 nsuccess += 1
735 success(order, trs_order)
737 except fdsn.EmptyResult:
738 now = time.time()
739 for order in orders_now:
740 elog.append(now, order, 'empty result')
741 if order.is_near_real_time():
742 error_temporary(order)
743 else:
744 error_permanent(order)
746 except Aborted as e:
747 now = time.time()
748 for order in orders_now:
749 elog.append(now, order, 'aborted', str(e))
750 error_permanent(order)
752 except util.HTTPError as e:
753 now = time.time()
754 for order in orders_now:
755 elog.append(now, order, 'http error', str(e))
756 error_temporary(order)
758 emessage = elog.summarize_recent()
760 self._log_info_data(
761 '%i download%s %ssuccessful' % (
762 nsuccess,
763 util.plural_s(nsuccess),
764 '(partially) ' if emessage else '')
765 + (', %s' % emessage if emessage else ''))
767 if all_paths:
768 batch_add(all_paths)
770 i += neach
771 task.update(i)
773 for agg in elog.iter_aggregates():
774 logger.warning(str(agg))
776 task.done()
778 def _do_response_query(self, selection):
779 extra_args = {}
781 if self.site not in sites_not_supporting['includerestricted']:
782 extra_args.update(
783 includerestricted=(
784 self.user_credentials is not None
785 or self.auth_token is not None
786 or self.auth_token_path is not None))
788 self._log_responses('querying...')
790 try:
791 response_sx = fdsn.station(
792 site=self.site,
793 level='response',
794 selection=selection,
795 **extra_args)
797 self._hotfix('response', response_sx)
798 return response_sx
800 except fdsn.EmptyResult:
801 return stationxml.FDSNStationXML(source='dummy-empty-result')
803 def update_response_inventory(self, squirrel, constraint):
804 cpath = os.path.abspath(self._get_channels_path())
805 nuts = squirrel.iter_nuts(
806 'channel', path=cpath, codes=constraint.codes)
808 tmin = g_tmin_queries
809 tmax = g_tmax
811 selection = []
812 now = time.time()
813 have = set()
814 status = defaultdict(list)
815 for nut in nuts:
816 nslc = nut.codes.nslc
817 if nslc in have:
818 continue
819 have.add(nslc)
821 fn = self._get_responses_path(nslc)
822 expiration_time = self._get_expiration_time(fn)
823 if os.path.exists(fn) \
824 and (expiration_time is None or now < expiration_time):
825 status['using cached'].append(nslc)
826 else:
827 selection.append(nslc + (tmin, tmax))
829 dummy = stationxml.FDSNStationXML(source='dummy-empty')
830 neach = 100
831 i = 0
832 fns = []
833 while i < len(selection):
834 selection_now = selection[i:i+neach]
835 i += neach
837 try:
838 sx = self._do_response_query(selection_now)
839 except Exception as e:
840 status['update failed (%s)' % str(e)].extend(
841 entry[:4] for entry in selection_now)
842 continue
844 sx.created = None # timestamp would ruin diff
846 by_nslc = dict(stationxml.split_channels(sx))
848 for entry in selection_now:
849 nslc = entry[:4]
850 response_sx = by_nslc.get(nslc, dummy)
851 try:
852 fn = self._get_responses_path(nslc)
853 fn_temp = fn + '.%i.temp' % os.getpid()
855 util.ensuredirs(fn_temp)
856 response_sx.dump_xml(filename=fn_temp)
858 status_this = move_or_keep(fn_temp, fn)
860 if status_this == 'upstream unchanged':
861 try:
862 squirrel.get_database().silent_touch(fn)
863 except ExecuteGet1Error:
864 pass
866 status[status_this].append(nslc)
867 fns.append(fn)
869 except OSError as e:
870 status['update failed (%s)' % str(e)].append(nslc)
872 for k in sorted(status):
873 if k.find('failed') != -1:
874 log_target = logger.error
875 else:
876 log_target = logger.info
878 self._log_responses(
879 '%s: %s' % (
880 k, codes_to_str_abbreviated(
881 CodesNSLCE(tup) for tup in status[k])),
882 target=log_target)
884 if fns:
885 squirrel.add(fns, kinds=['response'])
888__all__ = [
889 'FDSNSource',
890]