Source code for pyrocko.io.datacube

# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
# python 2/3
from __future__ import absolute_import

import os
import math
import logging

import numpy as num

from pyrocko import trace, util, plot
from pyrocko.guts import Object, Int, String, Timestamp

from . import io_common

logger = logging.getLogger(__name__)

N_GPS_TAGS_WANTED = 200  # must match definition in datacube_ext.c

APPLY_SUBSAMPLE_SHIFT_CORRECTION = True


def color(c):
    c = plot.color(c)
    return tuple(x/255. for x in c)


[docs]class DataCubeError(io_common.FileLoadError): pass
[docs]class ControlPointError(Exception): pass
def make_control_point(ipos_block, t_block, tref, deltat): # reduce time (no drift would mean straight line) tred = (t_block - tref) - ipos_block*deltat # first round, remove outliers q25, q75 = num.percentile(tred, (25., 75.)) iok = num.logical_and(q25 <= tred, tred <= q75) # detrend slope, offset = num.polyfit(ipos_block[iok], tred[iok], 1) tred2 = tred - (offset + slope * ipos_block) # second round, remove outliers based on detrended tred, refit q25, q75 = num.percentile(tred2, (25., 75.)) iok = num.logical_and(q25 <= tred2, tred2 <= q75) x = ipos_block[iok].copy() ipos0 = x[0] x -= ipos0 y = tred[iok].copy() if x.size < 2: raise ControlPointError('Insufficient number control points after QC.') elif x.size < 5: # needed for older numpy versions (slope, offset) = num.polyfit(x, y, 1) else: (slope, offset), cov = num.polyfit(x, y, 1, cov=True) slope_err, offset_err = num.sqrt(num.diag(cov)) slope_err_limit = 1.0e-10 if ipos_block.size < N_GPS_TAGS_WANTED // 2: slope_err_limit *= (200. / ipos_block.size) offset_err_limit = 5.0e-3 if slope_err > slope_err_limit: raise ControlPointError( 'Slope error too large: %g (limit: %g' % ( slope_err, slope_err_limit)) if offset_err > offset_err_limit: raise ControlPointError( 'Offset error too large: %g (limit: %g' % ( offset_err, offset_err_limit)) ic = ipos_block[ipos_block.size//2] tc = offset + slope * (ic - ipos0) return ic, tc + ic * deltat + tref def analyse_gps_tags(header, gps_tags, offset, nsamples): ipos, t, fix, nsvs = gps_tags deltat = 1.0 / int(header['S_RATE']) tquartz = offset + ipos * deltat toff = t - tquartz toff_median = num.median(toff) n = t.size dtdt = (t[1:n] - t[0:n-1]) / (tquartz[1:n] - tquartz[0:n-1]) ok = abs(toff_median - toff) < 10. xok = num.abs(dtdt - 1.0) < 0.00001 if ok.size >= 1: ok[0] = False ok[1:n] &= xok ok[0:n-1] &= xok ok[n-1] = False ipos = ipos[ok] t = t[ok] fix = fix[ok] nsvs = nsvs[ok] blocksize = N_GPS_TAGS_WANTED // 2 if ipos.size < blocksize: blocksize = max(10, ipos.size // 4) logger.warning( 'Small number of GPS tags found. ' 'Reducing analysis block size to %i tags. ' 'Time correction may be unreliable.' % blocksize) try: if ipos.size < blocksize: raise ControlPointError( 'Cannot determine GPS time correction: ' 'Too few GPS tags found: %i' % ipos.size) j = 0 control_points = [] tref = num.median(t - ipos*deltat) while j < ipos.size - blocksize: ipos_block = ipos[j:j+blocksize] t_block = t[j:j+blocksize] try: ic, tc = make_control_point(ipos_block, t_block, tref, deltat) control_points.append((ic, tc)) except ControlPointError as e: logger.debug(str(e)) j += blocksize ipos_last = ipos[-blocksize:] t_last = t[-blocksize:] try: ic, tc = make_control_point(ipos_last, t_last, tref, deltat) control_points.append((ic, tc)) except ControlPointError as e: logger.debug(str(e)) if len(control_points) < 2: raise ControlPointError( 'Could not safely determine time corrections from GPS: ' 'unable to construct two or more control points') i0, t0 = control_points[0] i1, t1 = control_points[1] i2, t2 = control_points[-2] i3, t3 = control_points[-1] if len(control_points) == 2: tmin = t0 - i0 * deltat - offset * deltat tmax = t3 + (nsamples - i3 - 1) * deltat else: icontrol = num.array( [x[0] for x in control_points], dtype=num.int64) tcontrol = num.array( [x[1] for x in control_points], dtype=float) # robust against steps: slope = num.median( (tcontrol[1:] - tcontrol[:-1]) / (icontrol[1:] - icontrol[:-1])) tmin = t0 + (offset - i0) * slope tmax = t2 + (offset + nsamples - 1 - i2) * slope if offset < i0: control_points[0:0] = [(offset, tmin)] if offset + nsamples - 1 > i3: control_points.append((offset + nsamples - 1, tmax)) icontrol = num.array([x[0] for x in control_points], dtype=num.int64) tcontrol = num.array([x[1] for x in control_points], dtype=float) # corrected 2021-10-26: This sub-sample time shift introduced by the # Cube's ADC was previously not recognized. if APPLY_SUBSAMPLE_SHIFT_CORRECTION: tcontrol -= 0.199 * deltat + 0.0003 return tmin, tmax, icontrol, tcontrol, ok except ControlPointError as e: logger.error(str(e)) tmin = util.str_to_time(header['S_DATE'] + header['S_TIME'], format='%y/%m/%d%H:%M:%S') idat = int(header['DAT_NO']) if idat == 0: tmin = tmin + util.gps_utc_offset(tmin) else: tmin = util.day_start(tmin + idat * 24.*3600.) \ + util.gps_utc_offset(tmin) tmax = tmin + (nsamples - 1) * deltat icontrol = num.array([offset, offset+nsamples - 1], dtype=num.int64) tcontrol = num.array([tmin, tmax]) return tmin, tmax, icontrol, tcontrol, ok def plot_gnss_location_timeline(fns): from matplotlib import pyplot as plt from pyrocko.orthodrome import latlon_to_ne_numpy not_ = num.logical_not h = 3600. fig = plt.figure() axes = [] for i in range(4): axes.append( fig.add_subplot(4, 1, i+1, sharex=axes[-1] if axes else None)) background_colors = [ color('aluminium1'), color('aluminium2')] tref = None for ifn, fn in enumerate(fns): header, gps_tags, nsamples = get_time_infos(fn) _, t, fix, nsvs, lats, lons, elevations, _ = gps_tags fix = fix.astype(bool) if t.size < 2: logger.warning('Need at least 2 gps tags for plotting: %s' % fn) if tref is None: tref = util.day_start(t[0]) lat, lon, elevation = coordinates_from_gps(gps_tags) norths, easts = latlon_to_ne_numpy(lat, lon, lats, lons) for ax, data in zip(axes, (norths, easts, elevations, nsvs)): tspan = t[num.array([0, -1])] ax.axvspan(*((tspan - tref) / h), color=background_colors[ifn % 2]) med = num.median(data) ax.plot( (tspan - tref) / h, [med, med], ls='--', c='k', lw=3, alpha=0.5) ax.plot( (t[not_(fix)] - tref) / h, data[not_(fix)], 'o', ms=1.5, mew=0, color=color('scarletred2')) ax.plot( (t[fix] - tref) / h, data[fix], 'o', ms=1.5, mew=0, color=color('aluminium6')) for ax in axes: ax.grid(alpha=.3) ax_lat, ax_lon, ax_elev, ax_nsv = axes ax_lat.set_ylabel('Northing [m]') ax_lon.set_ylabel('Easting [m]') ax_elev.set_ylabel('Elevation [m]') ax_nsv.set_ylabel('Number of Satellites') ax_lat.get_xaxis().set_tick_params(labelbottom=False) ax_lon.get_xaxis().set_tick_params(labelbottom=False) ax_nsv.set_xlabel( 'Hours after %s' % util.time_to_str(tref, format='%Y-%m-%d')) fig.suptitle( u'Lat: %.5f\u00b0 Lon: %.5f\u00b0 Elevation: %g m' % ( lat, lon, elevation)) plt.show() def coordinates_from_gps(gps_tags): ipos, t, fix, nsvs, lats, lons, elevations, temps = gps_tags return tuple(num.median(x) for x in (lats, lons, elevations)) def extract_stations(fns): import io import sys from pyrocko.model import Station from pyrocko.guts import dump_all stations = {} for fn in fns: sta_name = os.path.splitext(fn)[1].lstrip('.') if sta_name in stations: logger.warning('Cube %s already in list!', sta_name) continue header, gps_tags, nsamples = get_time_infos(fn) lat, lon, elevation = coordinates_from_gps(gps_tags) sta = Station( network='', station=sta_name, name=sta_name, location='', lat=lat, lon=lon, elevation=elevation) stations[sta_name] = sta f = io.BytesIO() dump_all(stations.values(), stream=f) sys.stdout.write(f.getvalue().decode()) def plot_timeline(fns): from matplotlib import pyplot as plt fig = plt.figure() axes = fig.gca() h = 3600. if isinstance(fns, str): fn = fns if os.path.isdir(fn): fns = [ os.path.join(fn, entry) for entry in sorted(os.listdir(fn))] ipos, t, fix, nsvs, header, offset, nsamples = \ get_timing_context(fns) else: ipos, t, fix, nsvs, header, offset, nsamples = \ get_extended_timing_context(fn) else: ipos, t, fix, nsvs, header, offset, nsamples = \ get_timing_context(fns) deltat = 1.0 / int(header['S_RATE']) tref = num.median(t - ipos * deltat) tref = round(tref / deltat) * deltat if APPLY_SUBSAMPLE_SHIFT_CORRECTION: tcorr = 0.199 * deltat + 0.0003 else: tcorr = 0.0 x = ipos*deltat y = (t - tref) - ipos*deltat - tcorr bfix = fix != 0 bnofix = fix == 0 tmin, tmax, icontrol, tcontrol, ok = analyse_gps_tags( header, (ipos, t, fix, nsvs), offset, nsamples) la = num.logical_and nok = num.logical_not(ok) axes.plot( x[la(bfix, ok)]/h, y[la(bfix, ok)], '+', ms=5, color=color('chameleon3')) axes.plot( x[la(bfix, nok)]/h, y[la(bfix, nok)], '+', ms=5, color=color('aluminium4')) axes.plot( x[la(bnofix, ok)]/h, y[la(bnofix, ok)], 'x', ms=5, color=color('chocolate3')) axes.plot( x[la(bnofix, nok)]/h, y[la(bnofix, nok)], 'x', ms=5, color=color('aluminium4')) tred = tcontrol - icontrol*deltat - tref axes.plot(icontrol*deltat/h, tred, color=color('aluminium6')) axes.plot(icontrol*deltat/h, tred, 'o', ms=5, color=color('aluminium6')) ymin = (math.floor(tred.min() / deltat)-1) * deltat ymax = (math.ceil(tred.max() / deltat)+1) * deltat # ymin = min(ymin, num.min(y)) # ymax = max(ymax, num.max(y)) if ymax - ymin < 1000 * deltat: ygrid = math.floor(tred.min() / deltat) * deltat while ygrid < ymax: axes.axhline(ygrid, color=color('aluminium4'), alpha=0.3) ygrid += deltat xmin = icontrol[0]*deltat/h xmax = icontrol[-1]*deltat/h xsize = xmax - xmin xmin -= xsize * 0.1 xmax += xsize * 0.1 axes.set_xlim(xmin, xmax) axes.set_ylim(ymin, ymax) axes.set_xlabel('Uncorrected (quartz) time [h]') axes.set_ylabel('Relative time correction [s]') plt.show() g_dir_contexts = {}
[docs]class DirContextEntry(Object): path = String.T() tstart = Timestamp.T() ifile = Int.T()
[docs]class DirContext(Object): path = String.T() mtime = Timestamp.T() entries = DirContextEntry.T() def get_entry(self, fn): path = os.path.abspath(fn) for entry in self.entries: if entry.path == path: return entry raise Exception('entry not found') def iter_entries(self, fn, step=1): current = self.get_entry(fn) by_ifile = dict( (entry.ifile, entry) for entry in self.entries if entry.tstart == current.tstart) icurrent = current.ifile while True: icurrent += step try: yield by_ifile[icurrent] except KeyError: break
def context(fn): from pyrocko import datacube_ext dpath = os.path.dirname(os.path.abspath(fn)) mtimes = [os.stat(dpath)[8]] dentries = sorted([os.path.join(dpath, f) for f in os.listdir(dpath) if os.path.isfile(os.path.join(dpath, f))]) for dentry in dentries: fn2 = os.path.join(dpath, dentry) mtimes.append(os.stat(fn2)[8]) mtime = float(max(mtimes)) if dpath in g_dir_contexts: dir_context = g_dir_contexts[dpath] if dir_context.mtime == mtime: return dir_context del g_dir_contexts[dpath] entries = [] for dentry in dentries: fn2 = os.path.join(dpath, dentry) if not os.path.isfile(fn2): continue with open(fn2, 'rb') as f: first512 = f.read(512) if not detect(first512): continue with open(fn2, 'rb') as f: try: header, data_arrays, gps_tags, nsamples, _ = \ datacube_ext.load(f.fileno(), 3, 0, -1, None) except datacube_ext.DataCubeError as e: e = DataCubeError(str(e)) e.set_context('filename', fn) raise e header = dict(header) entries.append(DirContextEntry( path=os.path.abspath(fn2), tstart=util.str_to_time( '20' + header['S_DATE'] + ' ' + header['S_TIME'], format='%Y/%m/%d %H:%M:%S'), ifile=int(header['DAT_NO']))) dir_context = DirContext(mtime=mtime, path=dpath, entries=entries) return dir_context def get_time_infos(fn): from pyrocko import datacube_ext with open(fn, 'rb') as f: try: header, _, gps_tags, nsamples, _ = datacube_ext.load( f.fileno(), 1, 0, -1, None) except datacube_ext.DataCubeError as e: e = DataCubeError(str(e)) e.set_context('filename', fn) raise e return dict(header), gps_tags, nsamples def get_timing_context(fns): joined = [[], [], [], []] ioff = 0 for fn in fns: header, gps_tags, nsamples = get_time_infos(fn) ipos = gps_tags[0] ipos += ioff for i in range(4): joined[i].append(gps_tags[i]) ioff += nsamples ipos, t, fix, nsvs = [num.concatenate(x) for x in joined] nsamples = ioff return ipos, t, fix, nsvs, header, 0, nsamples def get_extended_timing_context(fn): c = context(fn) header, gps_tags, nsamples_base = get_time_infos(fn) ioff = 0 aggregated = [gps_tags] nsamples_total = nsamples_base if num.sum(gps_tags[2]) == 0: ioff = nsamples_base for entry in c.iter_entries(fn, 1): _, gps_tags, nsamples = get_time_infos(entry.path) ipos = gps_tags[0] ipos += ioff aggregated.append(gps_tags) nsamples_total += nsamples if num.sum(gps_tags[2]) > 0: break ioff += nsamples ioff = 0 for entry in c.iter_entries(fn, -1): _, gps_tags, nsamples = get_time_infos(entry.path) ioff -= nsamples ipos = gps_tags[0] ipos += ioff aggregated[0:0] = [gps_tags] nsamples_total += nsamples if num.sum(gps_tags[2]) > 0: break ipos, t, fix, nsvs = [num.concatenate(x) for x in zip(*aggregated)][:4] # return ipos, t, fix, nsvs, header, ioff, nsamples_total return ipos, t, fix, nsvs, header, 0, nsamples_base def iload(fn, load_data=True, interpolation='sinc'): from pyrocko import datacube_ext from pyrocko import signal_ext if interpolation not in ('sinc', 'off'): raise NotImplementedError( 'no such interpolation method: %s' % interpolation) with open(fn, 'rb') as f: if load_data: loadflag = 2 else: loadflag = 1 try: header, data_arrays, gps_tags, nsamples, _ = datacube_ext.load( f.fileno(), loadflag, 0, -1, None) except datacube_ext.DataCubeError as e: e = DataCubeError(str(e)) e.set_context('filename', fn) raise e header = dict(header) deltat = 1.0 / int(header['S_RATE']) nchannels = int(header['CH_NUM']) ipos, t, fix, nsvs, header_, offset_, nsamples_ = \ get_extended_timing_context(fn) tmin, tmax, icontrol, tcontrol, _ = analyse_gps_tags( header_, (ipos, t, fix, nsvs), offset_, nsamples_) if icontrol is None: logger.warning( 'No usable GPS timestamps found. Using datacube header ' 'information to guess time. (file: "%s")' % fn) tmin_ip = round(tmin / deltat) * deltat if interpolation != 'off': tmax_ip = round(tmax / deltat) * deltat else: tmax_ip = tmin_ip + (nsamples-1) * deltat nsamples_ip = int(round((tmax_ip - tmin_ip)/deltat)) + 1 # to prevent problems with rounding errors: tmax_ip = tmin_ip + (nsamples_ip-1) * deltat leaps = num.array( [x[0] + util.gps_utc_offset(x[0]) for x in util.read_leap_seconds2()], dtype=float) if load_data and icontrol is not None: ncontrol_this = num.sum( num.logical_and(0 <= icontrol, icontrol < nsamples)) if ncontrol_this <= 1: logger.warning( 'Extrapolating GPS time information from directory context ' '(insufficient number of GPS timestamps in file: "%s").' % fn) for i in range(nchannels): if load_data: arr = data_arrays[i] assert arr.size == nsamples if interpolation == 'sinc' and icontrol is not None: ydata = num.empty(nsamples_ip, dtype=float) try: signal_ext.antidrift( icontrol, tcontrol, arr.astype(float), tmin_ip, deltat, ydata) except signal_ext.Error as e: e = DataCubeError(str(e)) e.set_context('filename', fn) e.set_context('n_control_points', icontrol.size) e.set_context('n_samples_raw', arr.size) e.set_context('n_samples_ip', ydata.size) e.set_context('tmin_ip', util.time_to_str(tmin_ip)) raise e ydata = num.round(ydata).astype(arr.dtype) else: ydata = arr tr_tmin = tmin_ip tr_tmax = None else: ydata = None tr_tmin = tmin_ip tr_tmax = tmax_ip tr = trace.Trace('', header['DEV_NO'], '', 'p%i' % i, deltat=deltat, ydata=ydata, tmin=tr_tmin, tmax=tr_tmax, meta=header) bleaps = num.logical_and(tmin_ip <= leaps, leaps < tmax_ip) if num.any(bleaps): assert num.sum(bleaps) == 1 tcut = leaps[bleaps][0] for tmin_cut, tmax_cut in [ (tr.tmin, tcut), (tcut, tr.tmax+tr.deltat)]: try: tr_cut = tr.chop(tmin_cut, tmax_cut, inplace=False) tr_cut.shift( util.utc_gps_offset(0.5*(tr_cut.tmin+tr_cut.tmax))) yield tr_cut except trace.NoData: pass else: tr.shift(util.utc_gps_offset(0.5*(tr.tmin+tr.tmax))) yield tr header_keys = { str: b'GIPP_V DEV_NO E_NAME GPS_PO S_TIME S_DATE DAT_NO'.split(), int: b'''P_AMPL CH_NUM S_RATE D_FILT C_MODE A_CHOP F_TIME GPS_TI GPS_OF A_FILT A_PHAS GPS_ON ACQ_ON V_TCXO D_VOLT E_VOLT'''.split()} all_header_keys = header_keys[str] + header_keys[int] def detect(first512): s = first512 if len(s) < 512: return False if ord(s[0:1]) >> 4 != 15: return False n = s.find(b'\x80') if n == -1: n = len(s) s = s[1:n] s = s.replace(b'\xf0', b'') s = s.replace(b';', b' ') s = s.replace(b'=', b' ') kvs = s.split(b' ') if len([k for k in all_header_keys if k in kvs]) == 0: return False return True if __name__ == '__main__': import argparse parser = argparse.ArgumentParser(description='Datacube reader') parser.add_argument( 'action', choices=['timeline', 'gnss', 'stations'], help='Action') parser.add_argument( 'files', nargs='+') parser.add_argument( '--loglevel', '-l', choices=['critical', 'error', 'warning', 'info', 'debug'], default='info', help='Set logger level. Default: %(default)s') args = parser.parse_args() util.setup_logging('pyrocko.io.datacube', args.loglevel) logging.getLogger('matplotlib.font_manager').disabled = True if args.action == 'timeline': plot_timeline(args.files) elif args.action == 'gnss': plot_gnss_location_timeline(args.files) elif args.action == 'stations': extract_stations(args.files)