# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
# python 2/3

from __future__ import absolute_import

import os
import math
import logging

import numpy as num

from pyrocko import trace, util, plot
from pyrocko.guts import Object, Int, String, Timestamp, List

from . import io_common

logger = logging.getLogger('pyrocko.io.datacube')

N_GPS_TAGS_WANTED = 200  # must match definition in datacube_ext.c


def color(c):
    c = plot.color(c)
    return tuple(x/255. for x in c)


class DataCubeError(io_common.FileLoadError):
    pass


class ControlPointError(Exception):
    pass


def make_control_point(ipos_block, t_block, tref, deltat):
    '''
    Fit a clock-drift model to a block of GPS tags and return a single
    control point as ``(sample_index, corrected_absolute_time)``.
    '''

    # reduce time (no drift would mean straight line)
    tred = (t_block - tref) - ipos_block*deltat

    # first round, remove outliers
    q25, q75 = num.percentile(tred, (25., 75.))
    iok = num.logical_and(q25 <= tred, tred <= q75)

    # detrend
    slope, offset = num.polyfit(ipos_block[iok], tred[iok], 1)
    tred2 = tred - (offset + slope * ipos_block)

    # second round, remove outliers based on detrended tred, refit
    q25, q75 = num.percentile(tred2, (25., 75.))
    iok = num.logical_and(q25 <= tred2, tred2 <= q75)

    x = ipos_block[iok].copy()
    ipos0 = x[0]
    x -= ipos0
    y = tred[iok].copy()
    (slope, offset), cov = num.polyfit(x, y, 1, cov=True)

    slope_err, offset_err = num.sqrt(num.diag(cov))

    slope_err_limit = 1.0e-10
    offset_err_limit = 5.0e-3

    if slope_err > slope_err_limit:
        raise ControlPointError('slope error too large')

    if offset_err > offset_err_limit:
        raise ControlPointError('offset error too large')

    ic = ipos_block[ipos_block.size//2]
    tc = offset + slope * (ic - ipos0)

    return ic, tc + ic * deltat + tref
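

# Illustrative sketch (all numbers made up): for GPS tags taken every 1000
# samples at 100 Hz on a recorder with a small constant clock drift, a
# control point could be computed like this:
#
#   deltat = 0.01                        # nominal sampling interval [s]
#   tref = 1000000000.0                  # reference system time [s]
#   ipos_block = num.arange(50) * 1000   # sample indices of the GPS tags
#   t_block = tref + ipos_block * deltat * (1.0 + 1e-7)
#   ic, tc = make_control_point(ipos_block, t_block, tref, deltat)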


def plot_timeline(fns):
    '''
    Plot the relative time correction of one or more DataCube recordings
    against the uncorrected (quartz) time.
    '''
    from matplotlib import pyplot as plt

    fig = plt.figure()
    axes = fig.gca()

    h = 3600.

    if isinstance(fns, str):
        fn = fns
        if os.path.isdir(fn):
            fns = [
                os.path.join(fn, entry) for entry in sorted(os.listdir(fn))]

            ipos, t, fix, nsvs, header, offset, nsamples = \
                get_timing_context(fns)

        else:
            ipos, t, fix, nsvs, header, offset, nsamples = \
                get_extended_timing_context(fn)

    else:
        ipos, t, fix, nsvs, header, offset, nsamples = \
            get_timing_context(fns)

    deltat = 1.0 / int(header['S_RATE'])

    tref = num.median(t - ipos * deltat)
    tref = round(tref / deltat) * deltat

    x = ipos*deltat
    y = (t - tref) - ipos*deltat

    bfix = fix != 0
    bnofix = fix == 0

    tmin, tmax, icontrol, tcontrol, ok = analyse_gps_tags(
        header, (ipos, t, fix, nsvs), offset, nsamples)

    la = num.logical_and
    nok = num.logical_not(ok)

    axes.plot(
        x[la(bfix, ok)]/h, y[la(bfix, ok)], '+',
        ms=5, color=color('chameleon3'))
    axes.plot(
        x[la(bfix, nok)]/h, y[la(bfix, nok)], '+',
        ms=5, color=color('aluminium4'))

    axes.plot(
        x[la(bnofix, ok)]/h, y[la(bnofix, ok)], 'x',
        ms=5, color=color('chocolate3'))
    axes.plot(
        x[la(bnofix, nok)]/h, y[la(bnofix, nok)], 'x',
        ms=5, color=color('aluminium4'))

    tred = tcontrol - icontrol*deltat - tref
    axes.plot(icontrol*deltat/h, tred, color=color('aluminium6'))
    axes.plot(icontrol*deltat/h, tred, 'o', ms=5, color=color('aluminium6'))

    ymin = (math.floor(tred.min() / deltat)-1) * deltat
    ymax = (math.ceil(tred.max() / deltat)+1) * deltat

    if ymax - ymin < 1000 * deltat:
        ygrid = math.floor(tred.min() / deltat) * deltat
        while ygrid < ymax:
            axes.axhline(ygrid, color=color('aluminium4'), alpha=0.3)
            ygrid += deltat

    xmin = icontrol[0]*deltat/h
    xmax = icontrol[-1]*deltat/h
    xsize = xmax - xmin
    xmin -= xsize * 0.1
    xmax += xsize * 0.1
    axes.set_xlim(xmin, xmax)
    axes.set_ylim(ymin, ymax)

    axes.set_xlabel('Uncorrected (quartz) time [h]')
    axes.set_ylabel('Relative time correction [s]')

    plt.show()
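

# Usage sketch (paths hypothetical): plot the timing of a whole recording
# directory, or of a single file together with its directory context:
#
#   plot_timeline('rawdata/')
#   plot_timeline('rawdata/CUBE.001')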
g_dir_contexts = {}


class DirContextEntry(Object):
    path = String.T()
    tstart = Timestamp.T()
    ifile = Int.T()


class DirContext(Object):
    path = String.T()
    mtime = Timestamp.T()
    entries = List.T(DirContextEntry.T())

    def get_entry(self, fn):
        path = os.path.abspath(fn)
        for entry in self.entries:
            if entry.path == path:
                return entry

        raise Exception('entry not found')

    def iter_entries(self, fn, step=1):
        current = self.get_entry(fn)
        by_ifile = dict(
            (entry.ifile, entry) for entry in self.entries
            if entry.tstart == current.tstart)

        icurrent = current.ifile
        while True:
            icurrent += step
            try:
                yield by_ifile[icurrent]

            except KeyError:
                break
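

# Usage sketch (file names hypothetical): iterate over the successive files
# of one recording session, starting after a given file:
#
#   ctx = context('rawdata/CUBE.002')
#   for entry in ctx.iter_entries('rawdata/CUBE.002', step=1):
#       print(entry.path, entry.ifile)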


def context(fn):
    '''
    Get (cached) directory context for a DataCube file.
    '''
    from pyrocko import datacube_ext

    dpath = os.path.dirname(os.path.abspath(fn))
    mtimes = [os.stat(dpath).st_mtime]

    dentries = sorted(
        os.path.join(dpath, f) for f in os.listdir(dpath)
        if os.path.isfile(os.path.join(dpath, f)))

    for fn2 in dentries:
        mtimes.append(os.stat(fn2).st_mtime)

    mtime = float(max(mtimes))

    if dpath in g_dir_contexts:
        dir_context = g_dir_contexts[dpath]
        if dir_context.mtime == mtime:
            return dir_context

        del g_dir_contexts[dpath]

    entries = []
    for fn2 in dentries:
        if not os.path.isfile(fn2):
            continue

        with open(fn2, 'rb') as f:
            first512 = f.read(512)
            if not detect(first512):
                continue

        with open(fn2, 'rb') as f:
            try:
                header, data_arrays, gps_tags, nsamples, _ = \
                    datacube_ext.load(f.fileno(), 3, 0, -1, None)

            except datacube_ext.DataCubeError as e:
                e = DataCubeError(str(e))
                e.set_context('filename', fn)
                raise e

        header = dict(header)
        entries.append(DirContextEntry(
            path=os.path.abspath(fn2),
            tstart=util.str_to_time(
                '20' + header['S_DATE'] + ' ' + header['S_TIME'],
                format='%Y/%m/%d %H:%M:%S'),
            ifile=int(header['DAT_NO'])))

    dir_context = DirContext(mtime=mtime, path=dpath, entries=entries)

    # store in the cache which the lookup above expects to be populated
    g_dir_contexts[dpath] = dir_context

    return dir_context


def get_time_infos(fn):
    from pyrocko import datacube_ext

    with open(fn, 'rb') as f:
        try:
            header, _, gps_tags, nsamples, _ = datacube_ext.load(
                f.fileno(), 1, 0, -1, None)

        except datacube_ext.DataCubeError as e:
            e = DataCubeError(str(e))
            e.set_context('filename', fn)
            raise e

    return dict(header), gps_tags, nsamples
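

# Usage sketch (file name hypothetical): inspect the raw timing metadata of
# a single file; gps_tags is a tuple of four arrays (ipos, t, fix, nsvs):
#
#   header, gps_tags, nsamples = get_time_infos('rawdata/CUBE.001')
#   ipos, t, fix, nsvs = gps_tags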


def get_timing_context(fns):
    joined = [[], [], [], []]
    ioff = 0
    for fn in fns:
        header, gps_tags, nsamples = get_time_infos(fn)

        ipos = gps_tags[0]
        ipos += ioff

        for i in range(4):
            joined[i].append(gps_tags[i])

        ioff += nsamples

    ipos, t, fix, nsvs = [num.concatenate(x) for x in joined]
    nsamples = ioff
    return ipos, t, fix, nsvs, header, 0, nsamples


def get_extended_timing_context(fn):
    '''
    Get timing information for a file and, if needed, its neighbours.

    If the file itself contains no GPS fixes, the directory context is
    searched forwards and backwards until files with GPS fixes are found on
    either side.
    '''
    c = context(fn)

    header, gps_tags, nsamples_base = get_time_infos(fn)

    ioff = 0
    aggregated = [gps_tags]

    nsamples_total = nsamples_base

    if num.sum(gps_tags[2]) == 0:

        ioff = nsamples_base
        for entry in c.iter_entries(fn, 1):

            _, gps_tags, nsamples = get_time_infos(entry.path)

            ipos = gps_tags[0]
            ipos += ioff

            aggregated.append(gps_tags)
            nsamples_total += nsamples

            if num.sum(gps_tags[2]) > 0:
                break

            ioff += nsamples

        ioff = 0
        for entry in c.iter_entries(fn, -1):

            _, gps_tags, nsamples = get_time_infos(entry.path)

            ioff -= nsamples

            ipos = gps_tags[0]
            ipos += ioff

            aggregated[0:0] = [gps_tags]

            nsamples_total += nsamples

            if num.sum(gps_tags[2]) > 0:
                break

    ipos, t, fix, nsvs = [num.concatenate(x) for x in zip(*aggregated)]

    # return ipos, t, fix, nsvs, header, ioff, nsamples_total
    return ipos, t, fix, nsvs, header, 0, nsamples_base
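

# Usage sketch (file name hypothetical): gather timing information for one
# file, automatically pulling in neighbouring files of the same session when
# the file itself has no GPS fix:
#
#   ipos, t, fix, nsvs, header, offset, nsamples = \
#       get_extended_timing_context('rawdata/CUBE.005')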


def iload(fn, load_data=True, interpolation='sinc'):
    '''
    Load a DataCube file and yield the contained traces, one per channel,
    split at leap seconds where necessary.
    '''
    from pyrocko import datacube_ext
    from pyrocko import signal_ext

    if interpolation not in ('sinc', 'off'):
        raise NotImplementedError(
            'no such interpolation method: %s' % interpolation)

    with open(fn, 'rb') as f:
        if load_data:
            loadflag = 2
        else:
            loadflag = 1

        try:
            header, data_arrays, gps_tags, nsamples, _ = datacube_ext.load(
                f.fileno(), loadflag, 0, -1, None)

        except datacube_ext.DataCubeError as e:
            e = DataCubeError(str(e))
            e.set_context('filename', fn)
            raise e

    header = dict(header)
    deltat = 1.0 / int(header['S_RATE'])
    nchannels = int(header['CH_NUM'])

    ipos, t, fix, nsvs, header_, offset_, nsamples_ = \
        get_extended_timing_context(fn)

    tmin, tmax, icontrol, tcontrol, _ = analyse_gps_tags(
        header_, (ipos, t, fix, nsvs), offset_, nsamples_)

    if icontrol is None:
        logger.warning(
            'No usable GPS timestamps found. Using datacube header '
            'information to guess time. (file: "%s")' % fn)

    tmin_ip = round(tmin / deltat) * deltat
    if interpolation != 'off':
        tmax_ip = round(tmax / deltat) * deltat
    else:
        tmax_ip = tmin_ip + (nsamples-1) * deltat

    nsamples_ip = int(round((tmax_ip - tmin_ip)/deltat)) + 1
    # to prevent problems with rounding errors:
    tmax_ip = tmin_ip + (nsamples_ip-1) * deltat

    leaps = num.array(
        [x[0] + util.gps_utc_offset(x[0]) for x in util.read_leap_seconds2()],
        dtype=num.float64)

    if load_data and icontrol is not None:
        ncontrol_this = num.sum(
            num.logical_and(0 <= icontrol, icontrol < nsamples))

        if ncontrol_this <= 1:
            logger.warning(
                'Extrapolating GPS time information from directory context '
                '(insufficient number of GPS timestamps in file: "%s").' % fn)

    for i in range(nchannels):
        if load_data:
            arr = data_arrays[i]
            assert arr.size == nsamples

            if interpolation == 'sinc' and icontrol is not None:

                ydata = num.empty(nsamples_ip, dtype=num.float64)
                try:
                    signal_ext.antidrift(
                        icontrol, tcontrol,
                        arr.astype(num.float64), tmin_ip, deltat, ydata)

                except signal_ext.Error as e:
                    e = DataCubeError(str(e))
                    e.set_context('filename', fn)
                    e.set_context('n_control_points', icontrol.size)
                    e.set_context('n_samples_raw', arr.size)
                    e.set_context('n_samples_ip', ydata.size)
                    e.set_context('tmin_ip', util.time_to_str(tmin_ip))
                    raise e

                ydata = num.round(ydata).astype(arr.dtype)
            else:
                ydata = arr

            tr_tmin = tmin_ip
            tr_tmax = None
        else:
            ydata = None
            tr_tmin = tmin_ip
            tr_tmax = tmax_ip

        tr = trace.Trace('', header['DEV_NO'], '', 'p%i' % i, deltat=deltat,
                         ydata=ydata, tmin=tr_tmin, tmax=tr_tmax, meta=header)

        bleaps = num.logical_and(tmin_ip <= leaps, leaps < tmax_ip)

        if num.any(bleaps):
            assert num.sum(bleaps) == 1
            tcut = leaps[bleaps][0]

            for tmin_cut, tmax_cut in [
                    (tr.tmin, tcut), (tcut, tr.tmax+tr.deltat)]:

                try:
                    tr_cut = tr.chop(tmin_cut, tmax_cut, inplace=False)
                    tr_cut.shift(
                        util.utc_gps_offset(0.5*(tr_cut.tmin+tr_cut.tmax)))
                    yield tr_cut

                except trace.NoData:
                    pass

        else:
            tr.shift(util.utc_gps_offset(0.5*(tr.tmin+tr.tmax)))
            yield tr
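

# Usage sketch (file name hypothetical): load a DataCube file as pyrocko
# traces, with sinc interpolation applied to correct for clock drift:
#
#   for tr in iload('rawdata/CUBE.001', load_data=True, interpolation='sinc'):
#       print(tr)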


header_keys = {
    str: b'GIPP_V DEV_NO E_NAME GPS_PO S_TIME S_DATE DAT_NO'.split(),
    int: b'''P_AMPL CH_NUM S_RATE D_FILT C_MODE A_CHOP F_TIME GPS_TI GPS_OF
             A_FILT A_PHAS GPS_ON ACQ_ON V_TCXO D_VOLT E_VOLT'''.split()}

all_header_keys = header_keys[str] + header_keys[int]


def detect(first512):
    '''
    Check if the first 512 bytes of a file look like a DataCube header block.
    '''
    s = first512
    if len(s) < 512:
        return False

    if ord(s[0:1]) >> 4 != 15:
        return False

    n = s.find(b'\x80')
    if n == -1:
        n = len(s)

    s = s[1:n]
    s = s.replace(b'\xf0', b'')
    s = s.replace(b';', b' ')
    s = s.replace(b'=', b' ')
    kvs = s.split(b' ')

    return any(k in kvs for k in all_header_keys)
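

# Usage sketch (file name hypothetical): check whether a file is likely a
# DataCube recording by looking at its first 512 bytes:
#
#   with open('rawdata/CUBE.001', 'rb') as f:
#       is_datacube = detect(f.read(512))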


if __name__ == '__main__':
    import sys

    fns = sys.argv[1:]
    if len(fns) > 1:
        plot_timeline(fns)
    else:
        plot_timeline(fns[0])