# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from __future__ import absolute_import, print_function
import time
import os
import copy
import logging
import tempfile
from collections import defaultdict
try:
import cPickle as pickle
except ImportError:
import pickle
import os.path as op
from .base import Source, Constraint
from ..model import make_waveform_promise_nut, ehash, InvalidWaveform, \
order_summary, WaveformOrder, g_tmin, g_tmax, g_tmin_queries, \
codes_to_str_abbreviated
from ..database import ExecuteGet1Error
from pyrocko.client import fdsn
from pyrocko import util, trace, io
from pyrocko.io.io_common import FileLoadError
from pyrocko.io import stationxml
from pyrocko.progress import progress
from pyrocko.guts import Object, String, Timestamp, List, Tuple, Int, Dict, \
Duration, Bool
# NOTE(review): presumably read by pyrocko.guts as the namespace prefix for
# the guts classes declared in this module -- confirm against pyrocko.guts.
guts_prefix = 'squirrel'

# Override a module-level setting of the FDSN client.
# NOTE(review): presumably a timeout in seconds -- confirm in
# pyrocko.client.fdsn.
fdsn.g_timeout = 60.

logger = logging.getLogger('psq.client.fdsn')

# Query features, keyed by name, mapped to the sites for which the feature
# must not be used. FDSNSource consults this when assembling the arguments
# of its channel and response queries.
sites_not_supporting = {
    'startbefore': ['geonet'],
    'includerestricted': ['geonet']}
def make_task(*args):
    '''
    Create a progress task which reports through this module's logger.

    All positional arguments are handed on unchanged to ``progress.task``.
    '''
    task = progress.task(*args, logger=logger)
    return task
def plural_s(x):
    '''
    Get the plural suffix matching a count.

    :param x: an ``int`` count, or any object supporting ``len()``.
    :returns: ``''`` if the count is exactly one, ``'s'`` otherwise.
    '''
    count = x if isinstance(x, int) else len(x)
    if count == 1:
        return ''

    return 's'
def diff(fn_a, fn_b):
    '''
    Check whether two files differ.

    :param fn_a: path of the first file
    :param fn_b: path of the second file
    :returns: ``True`` if the file sizes differ, if ``os.stat`` fails on
        either path, or if the contents differ; ``False`` if both files
        hold the same bytes.
    '''
    try:
        size_a = os.stat(fn_a).st_size
        size_b = os.stat(fn_b).st_size
    except OSError:
        return True

    if size_a != size_b:
        return True

    blocksize = 1024
    with open(fn_a, 'rb') as fa, open(fn_b, 'rb') as fb:
        while True:
            chunk_a = fa.read(blocksize)
            chunk_b = fb.read(blocksize)
            if chunk_a != chunk_b:
                return True

            # Equal chunks, one of them empty: end of data, no difference.
            if not chunk_a or not chunk_b:
                return False
def move_or_keep(fn_temp, fn):
    '''
    Install a freshly written temporary file, unless nothing has changed.

    If ``fn`` does not exist yet, or if its content differs from
    ``fn_temp``, the temporary file is moved to ``fn``. Otherwise the
    temporary file is deleted and ``fn`` is left untouched.

    :param fn_temp: path of the temporary file holding the new content
    :param fn: final destination path
    :returns: ``'new'``, ``'updated'`` or ``'upstream unchanged'``
    :raises OSError: if moving or deleting the temporary file fails.
    '''
    if not op.exists(fn):
        status = 'new'
    elif diff(fn, fn_temp):
        status = 'updated'
    else:
        os.unlink(fn_temp)
        return 'upstream unchanged'

    # os.replace() overwrites an existing destination on every platform,
    # whereas os.rename() fails on Windows when the destination exists.
    os.replace(fn_temp, fn)
    return status
class Archive(Object):
    '''
    Abstract base for archives.

    :py:class:`MSeedArchive` is the implementation found in this module.
    '''

    def add(self):
        '''
        Add data to the archive. Not implemented in the base class.

        :raises NotImplementedError: always
        '''
        raise NotImplementedError()
class MSeedArchive(Archive):
    '''
    Archive which saves traces into files below a base directory.

    The file names are generated from :py:attr:`template`. The base
    directory has to be set with :py:meth:`set_base_path` before traces can
    be added.
    '''

    template = String.T(default=op.join(
        '%(tmin_year)s',
        '%(tmin_month)s',
        '%(tmin_day)s',
        'trace_%(network)s_%(station)s_%(location)s_%(channel)s'
        '_%(tmin_us)s_%(tmax_us)s.mseed'))

    def __init__(self, **kwargs):
        Archive.__init__(self, **kwargs)
        self._base_path = None

    def set_base_path(self, path):
        '''
        Set the directory under which the files are created.
        '''
        self._base_path = path

    def add(self, trs):
        '''
        Save traces into the archive, overwriting existing files.

        :param trs: traces, handed on to ``io.save``
        :returns: whatever ``io.save`` returns (the caller in this module
            treats it as a list of file paths).
        '''
        path_template = op.join(self._base_path, self.template)
        return io.save(trs, path_template, overwrite=True)
def combine_selections(selection):
    '''
    Merge consecutive selection rows which are adjacent in time.

    Two successive rows are merged when their first four elements agree and
    the fifth element of the later row equals the sixth element of the
    earlier one. The merged row keeps the first five elements of the earlier
    row and takes the sixth element from the later row.

    :param selection: iterable of tuples with at least six elements; rows
        to be merged must directly follow each other.
    :returns: list of rows
    '''
    merged = []
    current = None
    for row in selection:
        contiguous = current \
            and row[:4] == current[:4] \
            and row[4] == current[5]

        if contiguous:
            current = current[:5] + (row[5],)
            continue

        if current:
            merged.append(current)

        current = row

    if current:
        merged.append(current)

    return merged
def orders_sort_key(order):
    '''
    Sort key for orders: by codes first, then by start time.

    :param order: object with attributes ``codes`` and ``tmin``
    :returns: tuple ``(codes, tmin)``
    '''
    codes, tmin = order.codes, order.tmin
    return codes, tmin
def orders_to_selection(orders):
    '''
    Convert orders into a time-merged selection.

    The orders are sorted by codes and start time. Each order contributes a
    row made of elements 1 to 4 of its codes followed by its ``tmin`` and
    ``tmax``. Adjacent rows are merged by :py:func:`combine_selections`.

    :param orders: iterable of objects with attributes ``codes``, ``tmin``
        and ``tmax``
    :returns: list of rows
    '''
    rows = [
        order.codes[1:5] + (order.tmin, order.tmax)
        for order in sorted(orders, key=orders_sort_key)]

    return combine_selections(rows)
class ErrorEntry(Object):
    '''
    Single record in an :py:class:`ErrorLog`.
    '''
    # Time at which the error was recorded.
    time = Timestamp.T()
    # The waveform order to which the error applies.
    order = WaveformOrder.T()
    # Short error category, e.g. 'empty result' or 'http error'.
    kind = String.T()
    # Optional further information, e.g. the message of an exception.
    details = String.T(optional=True)
class ErrorAggregate(Object):
    '''
    Summary of all error entries of one site sharing kind and details.

    Instances are produced by :py:meth:`ErrorLog.iter_aggregates`.
    '''

    site = String.T()
    kind = String.T()
    details = String.T()
    entries = List.T(ErrorEntry.T())
    codes_list = List.T(Tuple.T(None, String.T()))
    time_spans = List.T(Tuple.T(2, Timestamp.T()))

    def __str__(self):
        '''
        Render the aggregate as a multi-line, human readable summary.
        '''
        codes = ['.'.join(x) for x in self.codes_list]
        if codes:
            scodes = '\n' + util.ewrap(codes, indent=' ')
        else:
            scodes = '<none>'

        spans = (
            '%s - %s' % (
                util.time_to_str(span[0]), util.time_to_str(span[1]))
            for span in self.time_spans)

        sspans = '\n' + util.ewrap(spans, indent=' ')

        if self.details:
            sdetails = ' Details: %s\n' % self.details
        else:
            sdetails = ''

        template = (
            'FDSN "%s": download error summary for "%s" (%i)\n%s '
            'Codes:%s\n Time spans:%s')

        return template % (
            self.site,
            self.kind,
            len(self.entries),
            sdetails,
            scodes,
            sspans)
class ErrorLog(Object):
    '''
    Collects the errors encountered while talking to one site.

    Checkpoints mark positions in the list of entries, so that the errors
    added since the latest checkpoint can be summarized separately.
    '''

    site = String.T()
    entries = List.T(ErrorEntry.T())
    checkpoints = List.T(Int.T())

    def append_checkpoint(self):
        '''
        Remember the current number of entries as a checkpoint.
        '''
        self.checkpoints.append(len(self.entries))

    def append(self, time, order, kind, details=''):
        '''
        Add an error entry.

        :param time: time at which the error occurred
        :param order: the affected order
        :param kind: short error category
        :param details: optional additional information
        '''
        self.entries.append(
            ErrorEntry(time=time, order=order, kind=kind, details=details))

    def iter_aggregates(self):
        '''
        Iterate over the entries, grouped by ``(kind, details)``.

        :returns: generator yielding one :py:class:`ErrorAggregate` per
            group, in sorted order of ``(kind, details)``.
        '''
        grouped = defaultdict(list)
        for entry in self.entries:
            grouped[entry.kind, entry.details].append(entry)

        for key in sorted(grouped):
            kind, details = key
            entries = grouped[key]
            codes_list = sorted(set(entry.order.codes for entry in entries))
            selection = orders_to_selection(entry.order for entry in entries)

            # The last two elements of each selection row are its time span.
            time_spans = sorted(set(row[-2:] for row in selection))

            yield ErrorAggregate(
                site=self.site,
                kind=kind,
                details=details,
                entries=entries,
                codes_list=codes_list,
                time_spans=time_spans)

    def summarize_recent(self):
        '''
        Summarize the entries added since the latest checkpoint.

        Without any checkpoint, all entries are considered.

        :returns: one-line summary, or ``''`` if there are no such entries.
        '''
        start = self.checkpoints[-1] if self.checkpoints else 0
        recent = self.entries[start:]
        if not recent:
            return ''

        kinds = sorted(set(entry.kind for entry in recent))
        return '%i error%s (%s)' % (
            len(recent), plural_s(recent), '; '.join(kinds))
class FDSNSource(Source):

    '''
    Squirrel data-source to transparently get data from FDSN web services.

    Attaching an :py:class:`FDSNSource` object to a :py:class:`Squirrel` allows
    the latter to download station and waveform data from an FDSN web service
    should the data not already happen to be available locally.
    '''

    site = String.T(
        help='FDSN site url or alias name (see '
             ':py:mod:`pyrocko.client.fdsn`).')

    query_args = Dict.T(
        String.T(), String.T(),
        optional=True,
        help='Common query arguments, which are appended to all queries.')

    expires = Duration.T(
        optional=True,
        help='Expiration time [s]. Information older than this will be '
             'refreshed. This only applies to station-metadata. Waveforms do '
             'not expire. If set to ``None`` neither type of data expires.')

    cache_path = String.T(
        optional=True,
        help='Directory path where any downloaded waveforms and station '
             'meta-data are to be kept. By default the Squirrel '
             'environment\'s cache directory is used.')

    shared_waveforms = Bool.T(
        default=True,
        help='If ``True``, waveforms are shared with other FDSN sources in '
             'the same Squirrel environment. If ``False``, they are kept '
             'separate.')

    user_credentials = Tuple.T(
        2, String.T(),
        optional=True,
        help='User and password for FDSN servers requiring password '
             'authentication')

    auth_token = String.T(
        optional=True,
        help='Authentication token to be presented to the FDSN server.')

    auth_token_path = String.T(
        optional=True,
        help='Path to file containing the authentication token to be '
             'presented to the FDSN server.')

    def __init__(self, site, query_args=None, **kwargs):
        Source.__init__(self, site=site, query_args=query_args, **kwargs)

        self._constraint = None
        self._hash = self.make_hash()
        self._source_id = 'client:fdsn:%s' % self._hash
        self._error_infos = []

    def describe(self):
        '''
        Get the identifier string of this source.
        '''
        return self._source_id

    def make_hash(self):
        '''
        Compute the hash which identifies this source's configuration.

        The hash depends on the site, on whether a token is configured, on
        the user name and on the query arguments. It is used to name the
        source's private cache directory, so its input must stay stable.
        '''
        s = self.site
        s += 'notoken' \
            if (self.auth_token is None and self.auth_token_path is None) \
            else 'token'

        if self.user_credentials is not None:
            # Only the user name enters the hash, not the password.
            s += self.user_credentials[0]
        else:
            s += 'nocred'

        if self.query_args is not None:
            s += ','.join(
                '%s:%s' % (k, self.query_args[k])
                for k in sorted(self.query_args.keys()))
        else:
            s += 'noqueryargs'

        return ehash(s)

    def get_hash(self):
        '''
        Get the configuration hash computed at construction time.
        '''
        return self._hash

    def get_auth_token(self):
        '''
        Get the authentication token to be presented to the FDSN server.

        A non-empty :py:attr:`auth_token` takes precedence. Otherwise the
        token is read from the file named by :py:attr:`auth_token_path`.

        :returns: the token as ``str``
        :raises FileLoadError: if the token file cannot be read.
        :raises Exception: if no token is available from either attribute.
        '''
        if self.auth_token:
            return self.auth_token

        if self.auth_token_path is None:
            raise Exception(
                'FDSNSource: no auth token available (auth_token is empty '
                'or unset and auth_token_path is not set).')

        try:
            with open(self.auth_token_path, 'rb') as f:
                return f.read().decode('ascii')

        except OSError as e:
            # Path goes into the parentheses, the reason after the colon.
            raise FileLoadError(
                'Cannot load auth token file (%s): %s'
                % (self.auth_token_path, str(e)))

    def setup(self, squirrel, check=True):
        '''
        Prepare the cache directories and register cached data.

        Creates the cache and waveform directories, loads the stored
        constraint and adds the waveform directory, the channel file, a
        virtual path for this source and the response directory to
        ``squirrel``. The channel file and the response directory are only
        added if they exist.

        :param squirrel: the Squirrel instance this source is attached to
        :param check: handed on to ``squirrel.add`` for the waveforms
        '''
        self._cache_path = op.join(
            self.cache_path or squirrel._cache_path, 'fdsn')

        util.ensuredir(self._cache_path)
        self._load_constraint()
        self._archive = MSeedArchive()
        waveforms_path = self._get_waveforms_path()
        util.ensuredir(waveforms_path)
        self._archive.set_base_path(waveforms_path)

        squirrel.add(
            waveforms_path,
            check=check)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

        squirrel.add_virtual(
            [], virtual_paths=[self._source_id])

        responses_path = self._get_responses_path()
        if os.path.exists(responses_path):
            squirrel.add(responses_path, kinds=['response'])

    def _get_constraint_path(self):
        '''
        Get the path of the pickled constraint of this source.
        '''
        return op.join(self._cache_path, self._hash, 'constraint.pickle')

    def _get_channels_path(self):
        '''
        Get the path of the cached channel inventory of this source.
        '''
        return op.join(self._cache_path, self._hash, 'channels.stationxml')

    def _get_responses_path(self, nslc=None):
        '''
        Get the path of the cached response information.

        :param nslc: tuple of four codes, or ``None``
        :returns: the responses directory if ``nslc`` is ``None``, else the
            path of the response file for the given codes.
        '''
        dirpath = op.join(
            self._cache_path, self._hash, 'responses')

        if nslc is None:
            return dirpath
        else:
            return op.join(
                dirpath, 'response_%s_%s_%s_%s.stationxml' % nslc)

    def _get_waveforms_path(self):
        '''
        Get the waveform directory, shared or private to this source.
        '''
        if self.shared_waveforms:
            return op.join(self._cache_path, 'waveforms')
        else:
            return op.join(self._cache_path, self._hash, 'waveforms')

    def _log_meta(self, message, target=logger.info):
        '''
        Log a message about station metadata, prefixed with the site.
        '''
        log_prefix = 'FDSN "%s" metadata:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_responses(self, message, target=logger.info):
        '''
        Log a message about responses, prefixed with the site.
        '''
        log_prefix = 'FDSN "%s" responses:' % self.site
        target(' '.join((log_prefix, message)))

    def _log_info_data(self, *args):
        '''
        Log info messages about waveforms, prefixed with the site.
        '''
        log_prefix = 'FDSN "%s" waveforms:' % self.site
        logger.info(' '.join((log_prefix,) + args))

    def _str_expires(self, t, now):
        '''
        Format an expiration time for log messages.

        :param t: expiration time, or ``None`` for "never"
        :param now: reference time deciding between "expires" and "expired"
        '''
        if t is None:
            return 'expires: never'
        else:
            expire = 'expires' if t > now else 'expired'
            return '%s: %s' % (
                expire,
                util.time_to_str(t, format='%Y-%m-%d %H:%M:%S'))

    def update_channel_inventory(self, squirrel, constraint=None):
        '''
        Make sure the cached channel inventory covers ``constraint``.

        The cached inventory is used if the stored constraint contains the
        requested one and the cache has not expired. Otherwise the stored
        constraint is expanded by the requested one, the site is queried
        and the cache file is replaced if the result differs. In the end
        the channel file is added to ``squirrel`` if it exists.

        :param squirrel: the Squirrel instance to update
        :param constraint: requested constraint; an empty
            :py:class:`Constraint` is used if ``None``.
        '''
        if constraint is None:
            constraint = Constraint()

        expiration_time = self._get_channels_expiration_time()
        now = time.time()

        log_target = logger.info
        if self._constraint and self._constraint.contains(constraint) \
                and (expiration_time is None or now < expiration_time):

            status = 'using cached'

        else:
            if self._constraint:
                # Query for the union of the old and the new constraint.
                constraint_temp = copy.deepcopy(self._constraint)
                constraint_temp.expand(constraint)
                constraint = constraint_temp

            try:
                channel_sx = self._do_channel_query(constraint)

                channel_sx.created = None  # timestamp would ruin diff

                fn = self._get_channels_path()
                util.ensuredirs(fn)
                fn_temp = fn + '.%i.temp' % os.getpid()
                channel_sx.dump_xml(filename=fn_temp)

                status = move_or_keep(fn_temp, fn)

                if status == 'upstream unchanged':
                    squirrel.get_database().silent_touch(fn)

                self._constraint = constraint
                self._dump_constraint()

            except OSError as e:
                status = 'update failed (%s)' % str(e)
                log_target = logger.error

        expiration_time = self._get_channels_expiration_time()
        self._log_meta(
            '%s (%s)' % (status, self._str_expires(expiration_time, now)),
            target=log_target)

        fn = self._get_channels_path()
        if os.path.exists(fn):
            squirrel.add(fn)

    def _do_channel_query(self, constraint):
        '''
        Query the site for channel-level station metadata.

        :param constraint: object with attributes ``tmin`` and ``tmax``
            limiting the time range of the query
        :returns: result of ``fdsn.station``, or an empty dummy
            ``FDSNStationXML`` if the site reports an empty result.
        '''
        extra_args = {}

        if self.site in sites_not_supporting['startbefore']:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['starttime'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['endtime'] = constraint.tmax

        else:
            if constraint.tmin is not None and constraint.tmin != g_tmin:
                extra_args['endafter'] = constraint.tmin
            if constraint.tmax is not None and constraint.tmax != g_tmax:
                extra_args['startbefore'] = constraint.tmax

        if self.site not in sites_not_supporting['includerestricted']:
            # Restricted data is requested when any credentials are set.
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        if self.query_args is not None:
            extra_args.update(self.query_args)

        self._log_meta('querying...')

        try:
            channel_sx = fdsn.station(
                site=self.site,
                format='text',
                level='channel',
                **extra_args)
            return channel_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def _load_constraint(self):
        '''
        Load the stored constraint, or set it to ``None`` if there is none.
        '''
        fn = self._get_constraint_path()
        if op.exists(fn):
            # NOTE(review): unpickling is only safe as long as the cache
            # directory cannot be written to by untrusted parties.
            with open(fn, 'rb') as f:
                self._constraint = pickle.load(f)
        else:
            self._constraint = None

    def _dump_constraint(self):
        '''
        Store the current constraint in the cache directory.
        '''
        with open(self._get_constraint_path(), 'wb') as f:
            pickle.dump(self._constraint, f, protocol=2)

    def _get_expiration_time(self, path):
        '''
        Get the time at which the file at ``path`` expires.

        :returns: ``None`` if :py:attr:`expires` is not set, ``0.0`` if the
            file cannot be ``stat``-ed, else the file's modification time
            plus :py:attr:`expires`.
        '''
        if self.expires is None:
            return None

        try:
            # Index 8 of the stat result is st_mtime in whole seconds.
            t = os.stat(path)[8]
            return t + self.expires

        except OSError:
            return 0.0

    def _get_channels_expiration_time(self):
        '''
        Get the expiration time of the cached channel inventory.
        '''
        return self._get_expiration_time(self._get_channels_path())

    def _get_user_credentials(self):
        '''
        Assemble the credential arguments for ``fdsn.dataselect``.

        :returns: dict which may contain ``'user'``, ``'passwd'`` and
            ``'token'``, depending on what is configured.
        '''
        d = {}
        if self.user_credentials is not None:
            d['user'], d['passwd'] = self.user_credentials

        if self.auth_token is not None or self.auth_token_path is not None:
            d['token'] = self.get_auth_token()

        return d

    def download_waveforms(
            self, orders, success, batch_add, error_permanent,
            error_temporary):
        '''
        Download the waveforms for the given orders, in batches.

        The orders are sorted in place. For each batch the data is fetched
        into a temporary file, loaded, matched to the orders, cut to the
        ordered time spans and saved into the archive.

        :param orders: list of orders, sorted in place
        :param success: called with each order which yielded data
        :param batch_add: called once per batch with the saved file paths,
            if there are any
        :param error_permanent: called with each order which yielded no
            usable data
        :param error_temporary: called with each order of a batch which
            failed with an HTTP error
        '''
        elog = ErrorLog(site=self.site)
        orders.sort(key=orders_sort_key)
        neach = 20
        i = 0
        task = make_task(
            'FDSN "%s" waveforms: downloading' % self.site, len(orders))

        while i < len(orders):
            orders_now = orders[i:i+neach]
            selection_now = orders_to_selection(orders_now)

            nsuccess = 0
            elog.append_checkpoint()
            self._log_info_data(
                'downloading, %s' % order_summary(orders_now))

            all_paths = []
            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    data = fdsn.dataselect(
                        site=self.site, selection=selection_now,
                        **self._get_user_credentials())

                    now = time.time()

                    # Spool the response to disk so that it can be loaded
                    # with io.load().
                    path = op.join(tmpdir, 'tmp.mseed')
                    with open(path, 'wb') as f:
                        while True:
                            buf = data.read(1024)
                            if not buf:
                                break
                            f.write(buf)

                    trs = io.load(path)

                    by_nslc = defaultdict(list)
                    for tr in trs:
                        by_nslc[tr.nslc_id].append(tr)

                    for order in orders_now:
                        trs_order = []
                        err_this = None
                        for tr in by_nslc[order.codes[1:5]]:
                            try:
                                order.validate(tr)
                                trs_order.append(tr.chop(
                                    order.tmin, order.tmax, inplace=False))

                            except trace.NoData:
                                err_this = (
                                    'empty result', 'empty sub-interval')

                            except InvalidWaveform as e:
                                err_this = ('invalid waveform', str(e))

                        if len(trs_order) == 0:
                            if err_this is None:
                                err_this = ('empty result', '')

                            elog.append(now, order, *err_this)
                            error_permanent(order)
                        else:
                            # More than one trace for an order is logged as
                            # a partial result but still counts as success.
                            if len(trs_order) != 1:
                                if err_this:
                                    elog.append(
                                        now, order,
                                        'partial result, %s' % err_this[0],
                                        err_this[1])
                                else:
                                    elog.append(now, order, 'partial result')

                            paths = self._archive.add(trs_order)
                            all_paths.extend(paths)

                            nsuccess += 1
                            success(order)

                except fdsn.EmptyResult:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'empty result')
                        error_permanent(order)

                except util.HTTPError as e:
                    now = time.time()
                    for order in orders_now:
                        elog.append(now, order, 'http error', str(e))
                        error_temporary(order)

            emessage = elog.summarize_recent()
            self._log_info_data(
                '%i download%s successful' % (nsuccess, plural_s(nsuccess))
                + (', %s' % emessage if emessage else ''))

            if all_paths:
                batch_add(all_paths)

            i += neach
            task.update(i)

        for agg in elog.iter_aggregates():
            logger.warning(str(agg))

        task.done()

    def _do_response_query(self, selection):
        '''
        Query the site for response-level station metadata.

        :param selection: selection rows handed on to ``fdsn.station``
        :returns: result of ``fdsn.station``, or an empty dummy
            ``FDSNStationXML`` if the site reports an empty result.
        '''
        extra_args = {}

        if self.site not in sites_not_supporting['includerestricted']:
            # Restricted data is requested when any credentials are set.
            extra_args.update(
                includerestricted=(
                    self.user_credentials is not None
                    or self.auth_token is not None
                    or self.auth_token_path is not None))

        self._log_responses('querying...')

        try:
            response_sx = fdsn.station(
                site=self.site,
                level='response',
                selection=selection,
                **extra_args)

            return response_sx

        except fdsn.EmptyResult:
            return stationxml.FDSNStationXML(source='dummy-empty-result')

    def update_response_inventory(self, squirrel, constraint):
        '''
        Update the cached response files of all known channels.

        For every channel found in the cached channel inventory, the
        response file is used as is if it exists and has not expired.
        The remaining channels are queried in batches and their response
        files are written, one file per channel. Finally the written files
        are added to ``squirrel``.

        :param squirrel: the Squirrel instance to update
        :param constraint: not used by this implementation
        '''
        cpath = os.path.abspath(self._get_channels_path())
        nuts = squirrel.iter_nuts('channel', path=cpath)

        tmin = g_tmin_queries
        tmax = g_tmax

        selection = []
        now = time.time()
        have = set()
        status = defaultdict(list)
        for nut in nuts:
            nslc = nut.codes_tuple[1:5]
            if nslc in have:
                continue
            have.add(nslc)

            fn = self._get_responses_path(nslc)
            expiration_time = self._get_expiration_time(fn)
            if os.path.exists(fn) \
                    and (expiration_time is None or now < expiration_time):
                status['using cached'].append(nslc)
            else:
                selection.append(nslc + (tmin, tmax))

        # Written for channels which are missing in the query result.
        dummy = stationxml.FDSNStationXML(source='dummy-empty')

        neach = 100
        i = 0
        fns = []
        while i < len(selection):
            selection_now = selection[i:i+neach]
            i += neach

            try:
                sx = self._do_response_query(selection_now)
            except Exception as e:
                # A failing batch must not prevent the other batches.
                status['update failed (%s)' % str(e)].extend(
                    entry[:4] for entry in selection_now)
                continue

            sx.created = None  # timestamp would ruin diff

            by_nslc = dict(stationxml.split_channels(sx))

            for entry in selection_now:
                nslc = entry[:4]
                response_sx = by_nslc.get(nslc, dummy)
                try:
                    fn = self._get_responses_path(nslc)
                    fn_temp = fn + '.%i.temp' % os.getpid()

                    util.ensuredirs(fn_temp)
                    response_sx.dump_xml(filename=fn_temp)

                    status_this = move_or_keep(fn_temp, fn)

                    if status_this == 'upstream unchanged':
                        try:
                            squirrel.get_database().silent_touch(fn)
                        except ExecuteGet1Error:
                            pass

                    status[status_this].append(nslc)
                    fns.append(fn)

                except OSError as e:
                    status['update failed (%s)' % str(e)].append(nslc)

        for k in sorted(status):
            if k.find('failed') != -1:
                log_target = logger.error
            else:
                log_target = logger.info

            self._log_responses(
                '%s: %s' % (
                    k, codes_to_str_abbreviated(status[k])),
                target=log_target)

        squirrel.add(fns, kinds=['response'])
# Names exported by a star-import of this module.
__all__ = [
    'FDSNSource',
]