# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
'''
A slim table-like data structure for when pandas are too fat.
'''
import math
import numpy as num
from pyrocko.guts import Object, String, Unicode, List, Int, SObject, Any
from pyrocko.guts_array import Array
from pyrocko import geometry, cake
from pyrocko import orthodrome as od
guts_prefix = 'pf'
def nextpow2(i):
return 2**int(math.ceil(math.log(i)/math.log(2.)))
def ncols(arr):
return 1 if arr.ndim == 1 else arr.shape[1]
def nrows(arr):
return arr.shape[0]
def resize_shape(shape, n):
return (n, ) if len(shape) == 1 else (n, shape[1])
[docs]class DType(SObject):
'''
Guts placeholder for :py:class:`numpy.dtype`.
'''
dummy_for = num.dtype
dummy_for_description = 'numpy.dtype'
def anything_to_header(args):
if isinstance(args, Header):
return args
elif isinstance(args, str):
return Header(name=args)
elif isinstance(args, tuple):
return Header(*args)
else:
raise ValueError('argument of type Header, str or tuple expected')
def anything_to_sub_header(args):
if isinstance(args, SubHeader):
return args
elif isinstance(args, str):
return SubHeader(name=args)
elif isinstance(args, tuple):
return SubHeader(*args)
else:
raise ValueError('argument of type SubHeader, str or tuple expected')
[docs]class Description(Object):
name = String.T(optional=True)
headers = List.T(Header.T())
nrows = Int.T()
ncols = Int.T()
def __init__(self, table=None, **kwargs):
if table:
Object.__init__(
self,
name=table._name,
headers=table._headers,
nrows=table.get_nrows(),
ncols=table.get_ncols())
else:
Object.__init__(self, **kwargs)
class NoSuchRecipe(Exception):
pass
class NoSuchCol(Exception):
pass
[docs]class Recipe(Object):
def __init__(self):
self._table = None
self._table = Table()
self._required_headers = []
self._headers = []
self._col_update_map = {}
self._name_to_header = {}
def has_col(self, name):
return name in self._name_to_header
def get_col_names(self):
names = []
for h in self._headers:
names.append(h.name)
for sh in h.sub_headers:
names.append(sh.name)
return names
def get_table(self):
return self._table
def get_header(self, name):
try:
return self._name_to_header[name]
except KeyError:
for h in self._required_headers:
if h.name == name:
return h
raise KeyError(name)
def _add_required_cols(self, table):
for h in self._headers:
if not table.has_col(h.name):
table.add_col(h)
def _update_col(self, table, name):
if not self._table.has_col(name):
self._col_update_map[name](table)
def _add_rows_handler(self, table, nrows_added):
pass
def _register_required_col(self, header):
self._required_headers.append(header)
def _register_computed_col(self, header, updater):
self._headers.append(header)
self._name_to_header[header.name] = header
self._col_update_map[header.name] = updater
for sh in header.sub_headers:
self._col_update_map[sh.name] = updater
self._name_to_header[sh.name] = sh
[docs]class Table(Object):
description__ = Description.T()
arrays__ = List.T(Array.T(serialize_as='base64+meta'))
recipes__ = List.T(Recipe.T())
def __init__(
self,
name=None,
nrows_capacity=None,
nrows_capacity_min=0,
description=None,
arrays=None,
recipes=[]):
Object.__init__(self, init_props=False)
self._name = name
self._buffers = []
self._arrays = []
self._headers = []
self._cols = {}
self._recipes = []
self.nrows_capacity_min = nrows_capacity_min
self._nrows_capacity = 0
if nrows_capacity is not None:
self.set_nrows_capacity(max(nrows_capacity, nrows_capacity_min))
if description and arrays:
self.T.get_property('arrays').validate(
arrays, regularize=True, depth=0)
self._name = description.name
self.add_cols(description.headers, arrays)
for recipe in recipes:
self.add_recipe(recipe)
@property
def description(self):
return self.get_description()
@property
def arrays(self):
return self._arrays
@property
def recipes(self):
return self._recipes
def add_recipe(self, recipe):
self._recipes.append(recipe)
# recipe._add_required_cols(self)
def get_nrows(self):
if not self._arrays:
return 0
else:
return nrows(self._arrays[0])
def get_nrows_capacity(self):
return self._nrows_capacity
def set_nrows_capacity(self, nrows_capacity_new):
if self.get_nrows_capacity() != nrows_capacity_new:
if self.get_nrows() > nrows_capacity_new:
raise ValueError('new capacity too small to hold current data')
new_buffers = []
for buf in self._buffers:
shape = resize_shape(buf.shape, nrows_capacity_new)
new_buffers.append(num.zeros(shape, dtype=buf.dtype))
ncopy = min(self.get_nrows(), nrows_capacity_new)
new_arrays = []
for arr, buf in zip(self._arrays, new_buffers):
buf[:ncopy, ...] = arr[:ncopy, ...]
new_arrays.append(buf[:ncopy, ...])
self._buffers = new_buffers
self._arrays = new_arrays
self._nrows_capacity = nrows_capacity_new
def get_ncols(self):
return len(self._arrays)
def add_col(self, header, array=None):
header = anything_to_header(header)
nrows_current = self.get_nrows()
if array is None:
array = header.default_array(nrows_current)
array = num.asarray(array)
assert header.get_ncols() == ncols(array)
assert array.ndim in (1, 2)
if self._arrays:
assert nrows(array) == nrows_current
if nrows_current == 0:
nrows_current = nrows(array)
self.set_nrows_capacity(
max(nrows_current, self.nrows_capacity_min))
iarr = len(self._arrays)
shape = resize_shape(array.shape, self.get_nrows_capacity())
if shape != array.shape:
buf = num.zeros(shape, dtype=array.dtype)
buf[:nrows_current, ...] = array[:, ...]
else:
buf = array
self._buffers.append(buf)
self._arrays.append(buf[:nrows_current, ...])
self._headers.append(header)
self._cols[header.name] = iarr, None
for icol, sub_header in enumerate(header.sub_headers):
self._cols[sub_header.name] = iarr, icol
def add_cols(self, headers, arrays=None):
if arrays is None:
arrays = [None] * len(headers)
for header, array in zip(headers, arrays):
self.add_col(header, array)
def add_rows(self, arrays):
assert self.get_ncols() == len(arrays)
arrays = [num.asarray(arr) for arr in arrays]
nrows_add = nrows(arrays[0])
nrows_current = self.get_nrows()
nrows_new = nrows_current + nrows_add
if self.get_nrows_capacity() < nrows_new:
self.set_nrows_capacity(max(
self.nrows_capacity_min, nextpow2(nrows_new)))
new_arrays = []
for buf, arr in zip(self._buffers, arrays):
assert ncols(arr) == ncols(buf)
assert nrows(arr) == nrows_add
buf[nrows_current:nrows_new, ...] = arr[:, ...]
new_arrays.append(buf[:nrows_new, ...])
self._arrays = new_arrays
for recipe in self._recipes:
recipe._add_rows_handler(self, nrows_add)
def get_col(self, name, mask=slice(None)):
if name in self._cols:
if isinstance(mask, str):
mask = self.get_col(mask)
iarr, icol = self._cols[name]
if icol is None:
return self._arrays[iarr][mask]
else:
return self._arrays[iarr][mask, icol]
else:
recipe = self.get_recipe_for_col(name)
recipe._update_col(self, name)
return recipe.get_table().get_col(name, mask)
def get_header(self, name):
if name in self._cols:
iarr, icol = self._cols[name]
if icol is None:
return self._headers[iarr]
else:
return self._headers[iarr].sub_headers[icol]
else:
recipe = self.get_recipe_for_col(name)
return recipe.get_header(name)
def has_col(self, name):
return name in self._cols or \
any(rec.has_col(name) for rec in self._recipes)
def get_col_names(self, sub_headers=True):
names = []
for h in self._headers:
names.append(h.name)
if sub_headers:
for sh in h.sub_headers:
names.append(sh.name)
for recipe in self._recipes:
names.extend(recipe.get_col_names())
return names
def get_recipe_for_col(self, name):
for recipe in self._recipes:
if recipe.has_col(name):
return recipe
raise NoSuchCol(name)
def get_description(self):
return Description(self)
def get_as_text(self):
scols = []
formats = {
num.dtype('float64'): '%e'}
for name in self.get_col_names(sub_headers=False):
array = self.get_col(name)
header = self.get_header(name)
fmt = formats.get(array.dtype, '%s')
if array.ndim == 1:
scol = [header.get_caption(), '']
for val in array:
scol.append(fmt % val)
scols.append(scol)
else:
for icol in range(ncols(array)):
sub_header = header.sub_headers[icol]
scol = [header.get_caption(), sub_header.get_caption()]
for val in array[:, icol]:
scol.append(fmt % val)
scols.append(scol)
for scol in scols:
width = max(len(s) for s in scol)
for i in range(len(scol)):
scol[i] = scol[i].rjust(width)
return '\n'.join(' '.join(s for s in srow) for srow in zip(*scols))
def add_computed_col(self, header, func):
header = anything_to_header(header)
self.add_recipe(SimpleRecipe(header, func))
def get_row(self, irow):
if irow == -1:
return [arr[-1:] for arr in self.arrays]
else:
return [arr[irow:irow+1] for arr in self.arrays]
[docs]class SimpleRecipe(Recipe):
def __init__(self, header, func):
Recipe.__init__(self)
self._col_name = header.name
def call_func(tab):
self._table.add_col(header, func(tab))
self._register_computed_col(header, call_func)
def _add_rows_handler(self, table, nrows_added):
Recipe._add_rows_handler(self, table, nrows_added)
if self._table.has_col(self._col_name):
self._table.remove_col(self._col_name)
[docs]class LocationRecipe(Recipe):
def __init__(self):
Recipe.__init__(self)
self.c5_header = Header(name='c5', sub_headers=[
SubHeader(name='ref_lat', unit='degrees'),
SubHeader(name='ref_lon', unit='degrees'),
SubHeader(name='north_shift', unit='m'),
SubHeader(name='east_shift', unit='m'),
SubHeader(name='depth', unit='m')])
self._register_required_col(self.c5_header)
self._latlon_header = Header(name='latlon', sub_headers=[
SubHeader(name='lat', unit='degrees'),
SubHeader(name='lon', unit='degrees')])
self._register_computed_col(self._latlon_header, self._update_latlon)
self._xyz_header = Header(name='xyz', sub_headers=[
SubHeader(name='x', unit='m'),
SubHeader(name='y', unit='m'),
SubHeader(name='z', unit='m')])
self._register_computed_col(self._xyz_header, self._update_xyz)
def set_depth_offset(self, depth_offset):
self.depth_offset = depth_offset
def _add_rows_handler(self, table, nrows_added):
Recipe._add_rows_handler(self, table, nrows_added)
for colname in ['latlon', 'xyz']:
if self._table.has_col(colname):
self._table.remove_col(colname)
def _update_latlon(self, table):
lats, lons = od.ne_to_latlon(
table.get_col('ref_lat'),
table.get_col('ref_lon'),
table.get_col('north_shift'),
table.get_col('east_shift'))
latlons = num.zeros((lats.size, 2))
latlons[:, 0] = lats
latlons[:, 1] = lons
self._table.add_col(self._latlon_header, latlons)
def _update_xyz(self, table):
self._update_latlon(table)
xyzs = geometry.latlondepth2xyz(
num.concatenate((
table.get_col('lat').reshape(-1, 1),
table.get_col('lon').reshape(-1, 1),
table.get_col('depth').reshape(-1, 1)),
axis=1),
planetradius=cake.earthradius)
self._table.add_col(self._xyz_header, xyzs)
[docs]class EventRecipe(LocationRecipe):
def __init__(self):
LocationRecipe.__init__(self)
self._register_required_col(Header(name='time', unit='s'))
self._register_required_col(Header(name='magnitude'))
def iter_events(self, table):
from pyrocko import model
for vec in zip(table.get_col(x) for x in [
'time', 'lat', 'lon',
'north_shift', 'east_shift', 'depth',
'magnitude']):
yield model.Event(
time=vec[0],
lat=vec[1],
lon=vec[2],
north_shift=vec[3],
east_shift=vec[4],
depth=vec[5],
magnitude=vec[6])
def get_events(self, table):
return list(self.iter_events(table))
[docs]class MomentTensorRecipe(Recipe):
def __init__(self):
Recipe.__init__(self)
self.m6_header = Header(name='m6', sub_headers=[
SubHeader(name='Mnn', unit='Nm'),
SubHeader(name='Mee', unit='Nm'),
SubHeader(name='Mdd', unit='Nm'),
SubHeader(name='Mne', unit='Nm'),
SubHeader(name='Mnd', unit='Nm'),
SubHeader(name='Med', unit='Nm')])
self._register_required_col(self.m6_header)