# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
'''
Functionality to check for common data/metadata problems.
'''
from pyrocko.guts import StringChoice, Object, String, List
from pyrocko import util
from pyrocko.squirrel.model import CodesNSLCE
from pyrocko.squirrel.operators.base import CodesPatternFiltering
from pyrocko.squirrel.model import codes_patterns_for_kind, to_kind_id
# Module-level name read from outside this file; nothing below uses it.
# NOTE(review): presumably picked up by pyrocko.guts as the type-name prefix
# for the Object subclasses defined in this module -- confirm against guts.
guts_prefix = 'squirrel'
def get_matching(coverages, coverage):
    '''
    Select the coverages which share the codes of a reference coverage.

    The result is ordered by preference: candidates whose ``deltat`` equals
    the one of ``coverage`` come first, then candidates with a falsy
    ``deltat`` (e.g. ``None``), then all remaining candidates.

    :param coverages:
        Candidate objects, each providing ``codes`` and ``deltat``
        attributes.

    :param coverage:
        Reference object providing ``codes`` and ``deltat`` attributes.

    :returns:
        New list with the candidates matching in ``codes``, most preferred
        first.
    :rtype: list
    '''

    def preference(cov):
        return (coverage.deltat == cov.deltat, not cov.deltat)

    same_codes = [cov for cov in coverages if cov.codes == coverage.codes]

    # Sort ascending and flip afterwards (instead of ``reverse=True``) so
    # that candidates of equal preference end up in reversed input order.
    return sorted(same_codes, key=preference)[::-1]
class SquirrelCheckProblemType(StringChoice):
    '''
    Potential dataset/metadata problem types.

    .. list-table:: Squirrel check problem types
        :widths: 10 90
        :header-rows: 1

        * - Type
          - Description
%%(table)s

    '''

    # Problem code -> one-line description. The keys are the allowed choice
    # values; the descriptions are used for the documentation table and for
    # the textual check summary.
    types = {
        'p1': 'Waveform duplicates.',
        'p2': 'Overlaps in channel/response epochs.',
        'p3': 'No waveforms available for a channel/response listed in '
              'metadata.',
        'p4': 'Channel/response information missing for an available '
              'waveform.',
        'p5': 'Multiple channel/response entries matching an available '
              'waveform.',
        'p6': 'Sampling rate of waveform does not match rate listed in '
              'metadata.',
        'p7': 'Waveform incompletely covered by channel/response epochs.'}

    choices = list(types.keys())


# Insert one table row per problem type into the class docstring. The rows
# carry the same indentation as the header row above, so that they are parsed
# as content of the ``list-table`` directive.
# NOTE(review): the placeholder is written with a doubled percent sign, which
# a single %-formatting pass turns into ``%(table)s`` rather than the table.
# Presumably the docstring goes through another formatting pass elsewhere
# (e.g. in pyrocko.guts) -- confirm before changing it.
SquirrelCheckProblemType.__doc__ %= {
    'table': '\n'.join('''
        * - %s
          - %s''' % (k, v) for (k, v) in SquirrelCheckProblemType.types.items())
}
class SquirrelCheckProblem(Object):
    '''
    Description of a single potential problem found by Squirrel check.
    '''

    # Problem code; restricted to the choices of SquirrelCheckProblemType.
    type = SquirrelCheckProblemType.T(
        help='Coding indicating the type of problem detected.')

    symptom = String.T(help='Short description of the problem.')

    # Optional additional lines elaborating on the symptom.
    details = List.T(
        String.T(), help='Details about the problem.')
class KindChoiceWCR(StringChoice):
    '''
    Content kind choice limited to waveform, channel and response.
    '''

    choices = [
        'waveform',
        'channel',
        'response',
    ]
class SquirrelCheckEntry(Object):
    '''
    Outcome of the Squirrel check for one channel/response/waveform.
    '''

    codes = CodesNSLCE.T(help='Codes denominating a seismic channel.')

    available = List.T(
        KindChoiceWCR.T(), help='Available content kinds.')

    problems = List.T(
        SquirrelCheckProblem.T(), help='Potential problems detected.')

    def get_text(self):
        '''
        Textual representation of this entry.

        The first line names the channel code (followed by a dot and the
        extra code, if the latter is not empty) and the available content
        kinds. It is followed by one line per problem, each directly
        followed by one line per detail of that problem.

        :rtype: str
        '''
        extra = self.codes.extra
        suffix = '.%s' % extra if extra != '' else ''
        label = self.codes.channel + suffix

        text = [' %s: %s' % (label, ', '.join(self.available))]
        for problem in self.problems:
            text.append(' - %s [%s]' % (problem.symptom, problem.type))
            text.extend(' - %s' % detail for detail in problem.details)

        return '\n'.join(text)
class SquirrelCheck(Object):
    '''
    Collection of the per-channel results of a Squirrel check.
    '''

    entries = List.T(SquirrelCheckEntry.T(), help='')

    def get_nproblems(self):
        '''
        Count the problems over all entries.

        :rtype: int
        '''
        total = 0
        for entry in self.entries:
            total += len(entry.problems)

        return total

    def get_summary(self):
        '''
        Summarize the check result as text.

        The first line states the total number of problems. It is followed
        by one line per problem type that occurred, sorted by type code,
        giving its count, code and description.

        :rtype: str
        '''
        nproblems = self.get_nproblems()
        summary = ['%i potential problem%s discovered.' % (
            nproblems, util.plural_s(nproblems))]

        # Number of occurrences per problem type code.
        counts = {}
        for entry in self.entries:
            for problem in entry.problems:
                counts[problem.type] = counts.get(problem.type, 0) + 1

        descriptions = SquirrelCheckProblemType.types
        summary.extend(
            ' %5i [%s]: %s' % (counts[t], t, descriptions[t])
            for t in sorted(counts))

        return '\n'.join(summary)

    def get_text(self, verbosity=0):
        '''
        Render the check result as text.

        Entries are grouped by the ``codes_nsl`` attribute of their codes.
        Groups with problems are always listed together with their entries.
        Groups without problems are mentioned from verbosity level 1 and
        listed with their entries from level 2. The summary is appended if
        there is any problem or if the verbosity level is at least 1.

        :param verbosity:
            Set verbosity level.
        :type verbosity:
            int

        :rtype: str
        '''
        grouped = {}
        for entry in self.entries:
            grouped.setdefault(entry.codes.codes_nsl, []).append(entry)

        out = []
        for nsl in sorted(grouped):
            group = grouped[nsl]
            nproblems = sum(len(entry.problems) for entry in group)
            has_problems = nproblems != 0

            if has_problems:
                out.append('')
                out.append('%s: %i potential problem%s' % (
                    str(nsl),
                    nproblems,
                    util.plural_s(nproblems)))
            elif verbosity >= 1:
                out.append('')
                out.append('%s: ok' % str(nsl))

            if has_problems or verbosity >= 2:
                out.extend(entry.get_text() for entry in group)

        if self.get_nproblems() > 0 or verbosity >= 1:
            out.append('')
            out.append(self.get_summary())

        return '\n'.join(out)
def do_check(squirrel, codes=None, tmin=None, tmax=None, time=None, ignore=()):
    '''
    Check for common data/metadata problems.

    The codes available in the Squirrel instance for the content kinds
    waveform, channel and response are collected (optionally restricted by
    ``codes``). For each of them, the coverage of the three kinds is queried
    and cross-checked. See :py:class:`SquirrelCheckProblemType` for the
    problem types reported.

    :param squirrel:
        The Squirrel instance to be checked.
    :type squirrel:
        :py:class:`~pyrocko.squirrel.base.Squirrel`

    :param codes:
        Pattern of channel codes to query.
    :type codes:
        :class:`list` of :py:class:`~pyrocko.squirrel.model.CodesNSLCE`
        objects

    :param tmin:
        Start time of query interval.
    :type tmin:
        :py:func:`pyrocko.util.get_time_float`

    :param tmax:
        End time of query interval.
    :type tmax:
        :py:func:`pyrocko.util.get_time_float`

    :param time:
        Time instant to query. Equivalent to setting ``tmin`` and ``tmax``
        to the same value.
    :type time:
        :py:func:`pyrocko.util.get_time_float`

    :param ignore:
        Problem types to be ignored. ``None`` is treated like an empty
        selection.
    :type ignore:
        :class:`list` of :class:`str` (:py:class:`SquirrelCheckProblemType`)

    :returns:
        :py:class:`SquirrelCheck` object containing the results of the check.
    '''
    if ignore is None:
        ignore = ()

    kinds = ('waveform', 'channel', 'response')

    # Which problem to report when a coverage of the given kind has more
    # than one item at a time. The order determines the order of the
    # reported problems.
    overlap_checks = (
        ('waveform', 'p1', 'duplicates'),
        ('channel', 'p2', 'overlapping epochs'),
        ('response', 'p2', 'overlapping epochs'))

    # ``time`` is only used for the bounds not given explicitly.
    tmin_query = tmin if tmin is not None else time
    tmax_query = tmax if tmax is not None else time

    # Collect the codes to check from all three content kinds.
    codes_set = set()
    for kind in kinds:
        if codes is not None:
            codes_pat = codes_patterns_for_kind(to_kind_id(kind), codes)
        else:
            codes_pat = None

        codes_filter = CodesPatternFiltering(codes=codes_pat)
        codes_set.update(
            codes_filter.filter(squirrel.get_codes(kind=kind)))

    entries = []
    for codes_ in sorted(codes_set):
        problems = []
        coverage = {}
        for kind in kinds:
            coverage[kind] = squirrel.get_coverage(
                kind,
                codes=[codes_],
                tmin=tmin_query,
                tmax=tmax_query)

        available = [
            kind for kind in kinds
            if coverage[kind] and any(
                cov.total is not None for cov in coverage[kind])]

        # p1, p2: one problem per coverage object whose changes contain a
        # count above one.
        for kind, problem_type, what in overlap_checks:
            for cov in coverage[kind]:
                if any(count > 1 for (_, count) in cov.changes):
                    problems.append(SquirrelCheckProblem(
                        type=problem_type,
                        symptom='%s: %s' % (kind, what)))

        if 'waveform' not in available:
            problems.append(SquirrelCheckProblem(
                type='p3',
                symptom='no waveforms'))

        # p4 - p7: compare each waveform coverage against the metadata
        # coverages of the same codes.
        for cw in coverage['waveform']:
            for kind in ['channel', 'response']:
                ccs = get_matching(coverage[kind], cw)
                if not ccs:
                    problems.append(SquirrelCheckProblem(
                        type='p4',
                        symptom='no %s information' % kind))

                elif len(ccs) > 1:
                    problems.append(SquirrelCheckProblem(
                        type='p5',
                        symptom='multiple %s matches (waveform: %g Hz, %s: %s)'
                        % (kind, 1.0 / cw.deltat, kind, ', '.join(
                            '%g Hz' % (1.0 / cc.deltat)
                            if cc.deltat else '? Hz' for cc in ccs))))

                if ccs:
                    # The remaining checks use the first match only.
                    cc = ccs[0]
                    if cc.deltat and cc.deltat != cw.deltat:
                        problems.append(SquirrelCheckProblem(
                            type='p6',
                            symptom='sampling rate mismatch '
                            '(waveform: %g Hz, %s: %g Hz)' % (
                                1.0 / cw.deltat, kind, 1.0 / cc.deltat)))

                    uncovered_spans = list(cw.iter_uncovered_by_combined(cc))
                    if uncovered_spans:
                        problems.append(SquirrelCheckProblem(
                            type='p7',
                            symptom='incompletely covered by %s:' % kind,
                            details=[
                                '%s - %s' % (
                                    util.time_to_str(span[0]),
                                    util.time_to_str(span[1]))
                                for span in uncovered_spans]))

        entries.append(SquirrelCheckEntry(
            codes=codes_,
            available=available,
            problems=[p for p in problems if p.type not in ignore]))

    return SquirrelCheck(entries=entries)
# Names exported by a star-import of this module.
__all__ = [
    'SquirrelCheckProblemType',
    'SquirrelCheckProblem',
    'SquirrelCheckEntry',
    'SquirrelCheck',
    'do_check',
]