# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
# python 2/3
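'''
Reader for DATA-CUBE (Omnirecs/DiGOS) raw data files.

Usage sketch (the file name is hypothetical)::

    from pyrocko.io import datacube
    for tr in datacube.iload('01.ABC'):
        print(tr)
'''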
from __future__ import absolute_import
import os
import math
import logging
import numpy as num
from pyrocko import trace, util, plot
from pyrocko.guts import Object, Int, List, String, Timestamp
from . import io_common
logger = logging.getLogger(__name__)
N_GPS_TAGS_WANTED = 200 # must match definition in datacube_ext.c
APPLY_SUBSAMPLE_SHIFT_CORRECTION = True
def color(c):
c = plot.color(c)
return tuple(x/255. for x in c)
class DataCubeError(io_common.FileLoadError):
pass
class ControlPointError(Exception):
pass
def make_control_point(ipos_block, t_block, tref, deltat):
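    '''
    Condense a block of GPS tags into a single time control point.

    Works on reduced times (``(t - tref) - ipos*deltat``), removes outliers
    in two interquartile-range passes, fits a line and evaluates it at the
    sample index at the center of the block. Returns ``(ic, tc)``, the
    sample index and absolute time of the control point.

    Raises :py:exc:`ControlPointError` if fewer than two tags survive the
    outlier removal or if the errors of the fitted slope and offset exceed
    their limits.
    '''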
# reduce time (no drift would mean straight line)
tred = (t_block - tref) - ipos_block*deltat
# first round, remove outliers
q25, q75 = num.percentile(tred, (25., 75.))
iok = num.logical_and(q25 <= tred, tred <= q75)
# detrend
slope, offset = num.polyfit(ipos_block[iok], tred[iok], 1)
tred2 = tred - (offset + slope * ipos_block)
# second round, remove outliers based on detrended tred, refit
q25, q75 = num.percentile(tred2, (25., 75.))
iok = num.logical_and(q25 <= tred2, tred2 <= q75)
x = ipos_block[iok].copy()
ipos0 = x[0]
x -= ipos0
y = tred[iok].copy()
if x.size < 2:
        raise ControlPointError(
            'Insufficient number of control points after QC.')
elif x.size < 5: # needed for older numpy versions
(slope, offset) = num.polyfit(x, y, 1)
else:
(slope, offset), cov = num.polyfit(x, y, 1, cov=True)
slope_err, offset_err = num.sqrt(num.diag(cov))
slope_err_limit = 1.0e-10
    if ipos_block.size < N_GPS_TAGS_WANTED // 2:
        slope_err_limit *= float(N_GPS_TAGS_WANTED) / ipos_block.size
offset_err_limit = 5.0e-3
    if slope_err > slope_err_limit:
        raise ControlPointError(
            'Slope error too large: %g (limit: %g)' % (
                slope_err, slope_err_limit))
    if offset_err > offset_err_limit:
        raise ControlPointError(
            'Offset error too large: %g (limit: %g)' % (
                offset_err, offset_err_limit))
ic = ipos_block[ipos_block.size//2]
tc = offset + slope * (ic - ipos0)
return ic, tc + ic * deltat + tref
def analyse_gps_tags(header, gps_tags, offset, nsamples):
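    '''
    Detect and remove bogus GPS tags and derive time control points.

    Splits the cleaned-up GPS tags into blocks and condenses each block
    into a control point using :py:func:`make_control_point`. Returns
    ``(tmin, tmax, icontrol, tcontrol, ok)`` where ``icontrol`` and
    ``tcontrol`` hold the sample indices and times of the control points
    and ``ok`` is the mask of accepted GPS tags. If no usable control
    points can be constructed, the recording time window is estimated from
    the file header and ``icontrol`` and ``tcontrol`` are set to ``None``.
    '''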
ipos, t, fix, nsvs = gps_tags
deltat = 1.0 / int(header['S_RATE'])
tquartz = offset + ipos * deltat
toff = t - tquartz
toff_median = num.median(toff)
n = t.size
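    # apparent rate of GPS time with respect to quartz time between
    # consecutive tags (should be very close to 1.0)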
dtdt = (t[1:n] - t[0:n-1]) / (tquartz[1:n] - tquartz[0:n-1])
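    # mask out tags with a bogus time offset or where the local rate
    # deviates from 1.0 (corrupted tag information)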
    ok = num.abs(toff_median - toff) < 10.
xok = num.abs(dtdt - 1.0) < 0.00001
if ok.size >= 1:
ok[0] = False
ok[1:n] &= xok
ok[0:n-1] &= xok
ok[n-1] = False
ipos = ipos[ok]
t = t[ok]
fix = fix[ok]
nsvs = nsvs[ok]
blocksize = N_GPS_TAGS_WANTED // 2
if ipos.size < blocksize:
blocksize = max(10, ipos.size // 4)
logger.warning(
'Small number of GPS tags found. '
'Reducing analysis block size to %i tags. '
'Time correction may be unreliable.' % blocksize)
try:
if ipos.size < blocksize:
raise ControlPointError(
'Cannot determine GPS time correction: '
'Too few GPS tags found: %i' % ipos.size)
j = 0
control_points = []
tref = num.median(t - ipos*deltat)
while j < ipos.size - blocksize:
ipos_block = ipos[j:j+blocksize]
t_block = t[j:j+blocksize]
try:
ic, tc = make_control_point(ipos_block, t_block, tref, deltat)
control_points.append((ic, tc))
except ControlPointError as e:
logger.debug(str(e))
j += blocksize
ipos_last = ipos[-blocksize:]
t_last = t[-blocksize:]
try:
ic, tc = make_control_point(ipos_last, t_last, tref, deltat)
control_points.append((ic, tc))
except ControlPointError as e:
logger.debug(str(e))
if len(control_points) < 2:
raise ControlPointError(
'Could not safely determine time corrections from GPS: '
'unable to construct two or more control points')
i0, t0 = control_points[0]
i1, t1 = control_points[1]
i2, t2 = control_points[-2]
i3, t3 = control_points[-1]
if len(control_points) == 2:
tmin = t0 - i0 * deltat - offset * deltat
tmax = t3 + (nsamples - i3 - 1) * deltat
else:
icontrol = num.array(
[x[0] for x in control_points], dtype=num.int64)
tcontrol = num.array(
[x[1] for x in control_points], dtype=float)
# robust against steps:
slope = num.median(
(tcontrol[1:] - tcontrol[:-1])
/ (icontrol[1:] - icontrol[:-1]))
tmin = t0 + (offset - i0) * slope
tmax = t2 + (offset + nsamples - 1 - i2) * slope
if offset < i0:
control_points[0:0] = [(offset, tmin)]
if offset + nsamples - 1 > i3:
control_points.append((offset + nsamples - 1, tmax))
icontrol = num.array([x[0] for x in control_points], dtype=num.int64)
tcontrol = num.array([x[1] for x in control_points], dtype=float)
# corrected 2021-10-26: This sub-sample time shift introduced by the
# Cube's ADC was previously not recognized.
if APPLY_SUBSAMPLE_SHIFT_CORRECTION:
tcontrol -= 0.199 * deltat + 0.0003
return tmin, tmax, icontrol, tcontrol, ok
except ControlPointError as e:
logger.error(str(e))
tmin = util.str_to_time(header['S_DATE'] + header['S_TIME'],
format='%y/%m/%d%H:%M:%S')
idat = int(header['DAT_NO'])
if idat == 0:
tmin = tmin + util.gps_utc_offset(tmin)
else:
tmin = util.day_start(tmin + idat * 24.*3600.) \
+ util.gps_utc_offset(tmin)
tmax = tmin + (nsamples - 1) * deltat
        # Signal to callers (e.g. iload) that no usable GPS time
        # corrections are available.
        icontrol = None
        tcontrol = None
return tmin, tmax, icontrol, tcontrol, ok
def plot_gnss_location_timeline(fns):
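    '''
    Plot time series of GPS position (northing, easting, elevation) and
    number of satellites for the given DATA-CUBE files.
    '''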
from matplotlib import pyplot as plt
from pyrocko.orthodrome import latlon_to_ne_numpy
not_ = num.logical_not
h = 3600.
fig = plt.figure()
axes = []
for i in range(4):
axes.append(
fig.add_subplot(4, 1, i+1, sharex=axes[-1] if axes else None))
background_colors = [
color('aluminium1'),
color('aluminium2')]
tref = None
for ifn, fn in enumerate(fns):
header, gps_tags, nsamples = get_time_infos(fn)
_, t, fix, nsvs, lats, lons, elevations, _ = gps_tags
fix = fix.astype(bool)
        if t.size < 2:
            logger.warning('Need at least 2 GPS tags for plotting: %s' % fn)
            continue
if tref is None:
tref = util.day_start(t[0])
lat, lon, elevation = coordinates_from_gps(gps_tags)
norths, easts = latlon_to_ne_numpy(lat, lon, lats, lons)
for ax, data in zip(axes, (norths, easts, elevations, nsvs)):
tspan = t[num.array([0, -1])]
ax.axvspan(*((tspan - tref) / h), color=background_colors[ifn % 2])
med = num.median(data)
ax.plot(
(tspan - tref) / h,
[med, med],
ls='--',
c='k',
lw=3,
alpha=0.5)
ax.plot(
(t[not_(fix)] - tref) / h, data[not_(fix)], 'o',
ms=1.5,
mew=0,
color=color('scarletred2'))
ax.plot(
(t[fix] - tref) / h, data[fix], 'o',
ms=1.5,
mew=0,
color=color('aluminium6'))
for ax in axes:
ax.grid(alpha=.3)
ax_lat, ax_lon, ax_elev, ax_nsv = axes
ax_lat.set_ylabel('Northing [m]')
ax_lon.set_ylabel('Easting [m]')
ax_elev.set_ylabel('Elevation [m]')
ax_nsv.set_ylabel('Number of Satellites')
ax_lat.get_xaxis().set_tick_params(labelbottom=False)
ax_lon.get_xaxis().set_tick_params(labelbottom=False)
ax_nsv.set_xlabel(
'Hours after %s' % util.time_to_str(tref, format='%Y-%m-%d'))
fig.suptitle(
u'Lat: %.5f\u00b0 Lon: %.5f\u00b0 Elevation: %g m' % (
lat, lon, elevation))
plt.show()
def coordinates_from_gps(gps_tags):
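    '''
    Get median latitude, longitude and elevation from GPS tags.
    '''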
ipos, t, fix, nsvs, lats, lons, elevations, temps = gps_tags
return tuple(num.median(x) for x in (lats, lons, elevations))
def extract_stations(fns):
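    '''
    Write station objects for the given files to stdout in YAML format.

    Station coordinates are taken from the median GPS position; the
    station code is derived from the file name extension.
    '''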
import io
import sys
from pyrocko.model import Station
from pyrocko.guts import dump_all
stations = {}
for fn in fns:
sta_name = os.path.splitext(fn)[1].lstrip('.')
if sta_name in stations:
logger.warning('Cube %s already in list!', sta_name)
continue
header, gps_tags, nsamples = get_time_infos(fn)
lat, lon, elevation = coordinates_from_gps(gps_tags)
sta = Station(
network='',
station=sta_name,
name=sta_name,
location='',
lat=lat,
lon=lon,
elevation=elevation)
stations[sta_name] = sta
f = io.BytesIO()
dump_all(stations.values(), stream=f)
sys.stdout.write(f.getvalue().decode())
def plot_timeline(fns):
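    '''
    Plot the difference between GPS time and uncorrected quartz time,
    together with the derived time correction control points.
    '''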
from matplotlib import pyplot as plt
fig = plt.figure()
axes = fig.gca()
h = 3600.
if isinstance(fns, str):
fn = fns
if os.path.isdir(fn):
fns = [
os.path.join(fn, entry) for entry in sorted(os.listdir(fn))]
ipos, t, fix, nsvs, header, offset, nsamples = \
get_timing_context(fns)
else:
ipos, t, fix, nsvs, header, offset, nsamples = \
get_extended_timing_context(fn)
else:
ipos, t, fix, nsvs, header, offset, nsamples = \
get_timing_context(fns)
deltat = 1.0 / int(header['S_RATE'])
tref = num.median(t - ipos * deltat)
tref = round(tref / deltat) * deltat
if APPLY_SUBSAMPLE_SHIFT_CORRECTION:
tcorr = 0.199 * deltat + 0.0003
else:
tcorr = 0.0
x = ipos*deltat
y = (t - tref) - ipos*deltat - tcorr
bfix = fix != 0
bnofix = fix == 0
tmin, tmax, icontrol, tcontrol, ok = analyse_gps_tags(
header, (ipos, t, fix, nsvs), offset, nsamples)
la = num.logical_and
nok = num.logical_not(ok)
axes.plot(
x[la(bfix, ok)]/h, y[la(bfix, ok)], '+',
ms=5, color=color('chameleon3'))
axes.plot(
x[la(bfix, nok)]/h, y[la(bfix, nok)], '+',
ms=5, color=color('aluminium4'))
axes.plot(
x[la(bnofix, ok)]/h, y[la(bnofix, ok)], 'x',
ms=5, color=color('chocolate3'))
axes.plot(
x[la(bnofix, nok)]/h, y[la(bnofix, nok)], 'x',
ms=5, color=color('aluminium4'))
tred = tcontrol - icontrol*deltat - tref
axes.plot(icontrol*deltat/h, tred, color=color('aluminium6'))
axes.plot(icontrol*deltat/h, tred, 'o', ms=5, color=color('aluminium6'))
ymin = (math.floor(tred.min() / deltat)-1) * deltat
ymax = (math.ceil(tred.max() / deltat)+1) * deltat
# ymin = min(ymin, num.min(y))
# ymax = max(ymax, num.max(y))
if ymax - ymin < 1000 * deltat:
ygrid = math.floor(tred.min() / deltat) * deltat
while ygrid < ymax:
axes.axhline(ygrid, color=color('aluminium4'), alpha=0.3)
ygrid += deltat
xmin = icontrol[0]*deltat/h
xmax = icontrol[-1]*deltat/h
xsize = xmax - xmin
xmin -= xsize * 0.1
xmax += xsize * 0.1
axes.set_xlim(xmin, xmax)
axes.set_ylim(ymin, ymax)
axes.set_xlabel('Uncorrected (quartz) time [h]')
axes.set_ylabel('Relative time correction [s]')
plt.show()
g_dir_contexts = {}
class DirContextEntry(Object):
path = String.T()
tstart = Timestamp.T()
ifile = Int.T()
class DirContext(Object):
path = String.T()
mtime = Timestamp.T()
    entries = List.T(DirContextEntry.T())
def get_entry(self, fn):
path = os.path.abspath(fn)
for entry in self.entries:
if entry.path == path:
return entry
        raise Exception('Entry not found: %s' % path)
def iter_entries(self, fn, step=1):
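        '''
        Iterate over entries of the same recording (equal start time),
        beginning at the file following ``fn`` and stepping by ``step`` in
        file number.
        '''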
current = self.get_entry(fn)
by_ifile = dict(
(entry.ifile, entry) for entry in self.entries
if entry.tstart == current.tstart)
icurrent = current.ifile
while True:
icurrent += step
try:
yield by_ifile[icurrent]
except KeyError:
break
def context(fn):
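    '''
    Get directory context for a DATA-CUBE file.

    Scans the directory containing ``fn`` for other DATA-CUBE files and
    collects their start times and file numbers. The result is cached per
    directory and invalidated when the newest modification time in the
    directory changes.
    '''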
from pyrocko import datacube_ext
dpath = os.path.dirname(os.path.abspath(fn))
    mtimes = [os.stat(dpath).st_mtime]
dentries = sorted([os.path.join(dpath, f) for f in os.listdir(dpath)
if os.path.isfile(os.path.join(dpath, f))])
    for dentry in dentries:
        mtimes.append(os.stat(dentry).st_mtime)
mtime = float(max(mtimes))
if dpath in g_dir_contexts:
dir_context = g_dir_contexts[dpath]
if dir_context.mtime == mtime:
return dir_context
del g_dir_contexts[dpath]
entries = []
    for fn2 in dentries:
if not os.path.isfile(fn2):
continue
with open(fn2, 'rb') as f:
first512 = f.read(512)
if not detect(first512):
continue
with open(fn2, 'rb') as f:
try:
header, data_arrays, gps_tags, nsamples, _ = \
datacube_ext.load(f.fileno(), 3, 0, -1, None)
except datacube_ext.DataCubeError as e:
e = DataCubeError(str(e))
                e.set_context('filename', fn2)
raise e
header = dict(header)
entries.append(DirContextEntry(
path=os.path.abspath(fn2),
tstart=util.str_to_time(
'20' + header['S_DATE'] + ' ' + header['S_TIME'],
format='%Y/%m/%d %H:%M:%S'),
ifile=int(header['DAT_NO'])))
    dir_context = DirContext(mtime=mtime, path=dpath, entries=entries)
    # Cache the context; the mtime check above invalidates stale entries.
    g_dir_contexts[dpath] = dir_context
    return dir_context
def get_time_infos(fn):
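    '''
    Read header, GPS tags and number of samples from a DATA-CUBE file,
    without loading the waveform data.
    '''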
from pyrocko import datacube_ext
with open(fn, 'rb') as f:
try:
header, _, gps_tags, nsamples, _ = datacube_ext.load(
f.fileno(), 1, 0, -1, None)
except datacube_ext.DataCubeError as e:
e = DataCubeError(str(e))
e.set_context('filename', fn)
raise e
return dict(header), gps_tags, nsamples
def get_timing_context(fns):
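    '''
    Aggregate the GPS tags of multiple files, shifting sample indices so
    that they refer to one continuous recording.
    '''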
joined = [[], [], [], []]
ioff = 0
for fn in fns:
header, gps_tags, nsamples = get_time_infos(fn)
ipos = gps_tags[0]
ipos += ioff
for i in range(4):
joined[i].append(gps_tags[i])
ioff += nsamples
ipos, t, fix, nsvs = [num.concatenate(x) for x in joined]
nsamples = ioff
return ipos, t, fix, nsvs, header, 0, nsamples
def get_extended_timing_context(fn):
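    '''
    Aggregate GPS tags from neighboring files of the same recording.

    If the file itself contains no GPS fix, tags from the preceding and
    following files (found via :py:func:`context`) are added until a file
    with a fix is reached in each direction.
    '''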
c = context(fn)
header, gps_tags, nsamples_base = get_time_infos(fn)
ioff = 0
aggregated = [gps_tags]
nsamples_total = nsamples_base
if num.sum(gps_tags[2]) == 0:
ioff = nsamples_base
for entry in c.iter_entries(fn, 1):
_, gps_tags, nsamples = get_time_infos(entry.path)
ipos = gps_tags[0]
ipos += ioff
aggregated.append(gps_tags)
nsamples_total += nsamples
if num.sum(gps_tags[2]) > 0:
break
ioff += nsamples
ioff = 0
for entry in c.iter_entries(fn, -1):
_, gps_tags, nsamples = get_time_infos(entry.path)
ioff -= nsamples
ipos = gps_tags[0]
ipos += ioff
aggregated[0:0] = [gps_tags]
nsamples_total += nsamples
if num.sum(gps_tags[2]) > 0:
break
ipos, t, fix, nsvs = [num.concatenate(x) for x in zip(*aggregated)][:4]
# return ipos, t, fix, nsvs, header, ioff, nsamples_total
return ipos, t, fix, nsvs, header, 0, nsamples_base
def iload(fn, load_data=True, interpolation='sinc'):
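    '''
    Load traces from a DATA-CUBE file.

    Yields one :py:class:`pyrocko.trace.Trace` object per channel. With
    ``interpolation='sinc'``, samples are interpolated onto a regular time
    grid derived from the GPS time control points (drift correction); with
    ``interpolation='off'``, the raw samples are used. Traces overlapping
    a leap second are split there, and times are converted from GPS time
    to UTC.
    '''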
from pyrocko import datacube_ext
from pyrocko import signal_ext
if interpolation not in ('sinc', 'off'):
raise NotImplementedError(
'no such interpolation method: %s' % interpolation)
with open(fn, 'rb') as f:
if load_data:
loadflag = 2
else:
loadflag = 1
try:
header, data_arrays, gps_tags, nsamples, _ = datacube_ext.load(
f.fileno(), loadflag, 0, -1, None)
except datacube_ext.DataCubeError as e:
e = DataCubeError(str(e))
e.set_context('filename', fn)
raise e
header = dict(header)
deltat = 1.0 / int(header['S_RATE'])
nchannels = int(header['CH_NUM'])
ipos, t, fix, nsvs, header_, offset_, nsamples_ = \
get_extended_timing_context(fn)
tmin, tmax, icontrol, tcontrol, _ = analyse_gps_tags(
header_, (ipos, t, fix, nsvs), offset_, nsamples_)
if icontrol is None:
logger.warning(
'No usable GPS timestamps found. Using datacube header '
'information to guess time. (file: "%s")' % fn)
tmin_ip = round(tmin / deltat) * deltat
if interpolation != 'off':
tmax_ip = round(tmax / deltat) * deltat
else:
tmax_ip = tmin_ip + (nsamples-1) * deltat
nsamples_ip = int(round((tmax_ip - tmin_ip)/deltat)) + 1
# to prevent problems with rounding errors:
tmax_ip = tmin_ip + (nsamples_ip-1) * deltat
leaps = num.array(
[x[0] + util.gps_utc_offset(x[0]) for x in util.read_leap_seconds2()],
dtype=float)
if load_data and icontrol is not None:
ncontrol_this = num.sum(
num.logical_and(0 <= icontrol, icontrol < nsamples))
if ncontrol_this <= 1:
logger.warning(
'Extrapolating GPS time information from directory context '
'(insufficient number of GPS timestamps in file: "%s").' % fn)
for i in range(nchannels):
if load_data:
arr = data_arrays[i]
assert arr.size == nsamples
if interpolation == 'sinc' and icontrol is not None:
ydata = num.empty(nsamples_ip, dtype=float)
try:
signal_ext.antidrift(
icontrol, tcontrol,
arr.astype(float), tmin_ip, deltat, ydata)
except signal_ext.Error as e:
e = DataCubeError(str(e))
e.set_context('filename', fn)
e.set_context('n_control_points', icontrol.size)
e.set_context('n_samples_raw', arr.size)
e.set_context('n_samples_ip', ydata.size)
e.set_context('tmin_ip', util.time_to_str(tmin_ip))
raise e
ydata = num.round(ydata).astype(arr.dtype)
else:
ydata = arr
tr_tmin = tmin_ip
tr_tmax = None
else:
ydata = None
tr_tmin = tmin_ip
tr_tmax = tmax_ip
tr = trace.Trace('', header['DEV_NO'], '', 'p%i' % i, deltat=deltat,
ydata=ydata, tmin=tr_tmin, tmax=tr_tmax, meta=header)
bleaps = num.logical_and(tmin_ip <= leaps, leaps < tmax_ip)
if num.any(bleaps):
assert num.sum(bleaps) == 1
tcut = leaps[bleaps][0]
for tmin_cut, tmax_cut in [
(tr.tmin, tcut), (tcut, tr.tmax+tr.deltat)]:
try:
tr_cut = tr.chop(tmin_cut, tmax_cut, inplace=False)
tr_cut.shift(
util.utc_gps_offset(0.5*(tr_cut.tmin+tr_cut.tmax)))
yield tr_cut
except trace.NoData:
pass
else:
tr.shift(util.utc_gps_offset(0.5*(tr.tmin+tr.tmax)))
yield tr
header_keys = {
str: b'GIPP_V DEV_NO E_NAME GPS_PO S_TIME S_DATE DAT_NO'.split(),
int: b'''P_AMPL CH_NUM S_RATE D_FILT C_MODE A_CHOP F_TIME GPS_TI GPS_OF
A_FILT A_PHAS GPS_ON ACQ_ON V_TCXO D_VOLT E_VOLT'''.split()}
all_header_keys = header_keys[str] + header_keys[int]
def detect(first512):
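    '''
    Check whether the first 512 bytes of a file look like the header of a
    DATA-CUBE file.
    '''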
s = first512
if len(s) < 512:
return False
if ord(s[0:1]) >> 4 != 15:
return False
n = s.find(b'\x80')
if n == -1:
n = len(s)
s = s[1:n]
s = s.replace(b'\xf0', b'')
s = s.replace(b';', b' ')
s = s.replace(b'=', b' ')
kvs = s.split(b' ')
    if not any(k in kvs for k in all_header_keys):
        return False
return True
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Datacube reader')
parser.add_argument(
'action', choices=['timeline', 'gnss', 'stations'],
help='Action')
parser.add_argument(
'files', nargs='+')
parser.add_argument(
'--loglevel', '-l',
choices=['critical', 'error', 'warning', 'info', 'debug'],
default='info',
help='Set logger level. Default: %(default)s')
args = parser.parse_args()
util.setup_logging('pyrocko.io.datacube', args.loglevel)
logging.getLogger('matplotlib.font_manager').disabled = True
if args.action == 'timeline':
plot_timeline(args.files)
elif args.action == 'gnss':
plot_gnss_location_timeline(args.files)
elif args.action == 'stations':
extract_stations(args.files)