from collections import defaultdict
import numpy as num
from pyrocko import util
from pyrocko.guts import Object, String, List, Timestamp, Dict, Tuple, Float, \
StringChoice
from pyrocko.model.location import Location
from pyrocko.squirrel import CodesNSLCE, FDSNSource, Dataset, CodesNSL, Sensor
from pyrocko.has_paths import HasPaths, Path
from .grid.location import UnstructuredLocationGrid, distances_3d
guts_prefix = 'gato'
km = 1000.
def time_or_none_to_str(t):
return util.time_to_str(t, '%Y-%m-%d') if t else '...'
class SensorArrayInfo(Object):
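    '''
    Aggregated information about a sensor array.

    Collects the matched channel codes, the covered time span, the array
    center location, inter-sensor distance statistics and the sensor objects
    themselves.
    '''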
codes = List.T(CodesNSLCE.T())
tmin = Timestamp.T(optional=True)
tmax = Timestamp.T(optional=True)
center = Location.T(optional=True)
distances_stats = Tuple.T(5, Float.T(), optional=True)
codes_nsl__ = List.T(CodesNSL.T())
codes_nsl_by_channels = Dict.T(Tuple.T(String.T()), List.T(CodesNSL.T()))
sensors = List.T(Sensor.T())
@property
def n_codes_nsl(self):
return len(self.codes_nsl)
@property
def n_codes(self):
return len(self.codes)
@property
def str_codes_nsl_by_channels(self):
return ', '.join(
'%s: %i' % (','.join(k), len(v))
for (k, v) in self.codes_nsl_by_channels.items())
@property
def str_distances_stats(self):
return ', '.join('%5.1f' % (v/km) for v in self.distances_stats) \
if self.distances_stats else ''
@property
def str_tmin(self):
return time_or_none_to_str(self.tmin)
@property
def str_tmax(self):
return time_or_none_to_str(self.tmax)
@property
def summary(self):
return ' | '.join((
'%3i' % self.n_codes_nsl,
'%3i' % self.n_codes,
            self.str_tmin.ljust(10),
            self.str_tmax.ljust(10),
'%-33s' % self.str_distances_stats,
self.str_codes_nsl_by_channels))
@property
def codes_nsl(self):
return sorted(set(CodesNSL(c) for c in self.codes))
@codes_nsl.setter
def codes_nsl(self, _):
pass
class SensorArrayType(StringChoice):
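    '''
    Classification of a sensor array by the quantity it records.
    '''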
choices = [
'seismic',
'infrasound',
'hydrophone',
]
class SensorArray(Object):
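    '''
    Definition of a sensor array: a name, the channel codes belonging to the
    array and, optionally, its type and a comment.
    '''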
name = String.T(optional=True)
codes = List.T(CodesNSLCE.T())
type = SensorArrayType.T(optional=True)
comment = String.T(optional=True)
@property
def summary(self):
        return ' | '.join(('%-17s' % self.name, (self.type or '')[:1]))
@property
def summary2(self):
return ' | '.join(
            ('%-17s' % self.name, (self.type or '')[:1], self.comment or ''))
def get_info(self, sq, channels=None, time=None, tmin=None, tmax=None):
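        '''
        Gather aggregated information about this array.

        Sensor metadata is queried from the Squirrel instance ``sq``,
        optionally restricted to the given ``channels`` and to the time
        constraints ``time``, ``tmin`` and ``tmax``. Returns a
        ``SensorArrayInfo`` object.
        '''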
if isinstance(channels, str):
channels = [channels]
codes_query = set()
if channels is not None:
for c in self.codes:
for cha in channels:
codes_query.add(c.replace(channel=cha))
codes_query = sorted(codes_query)
else:
codes_query = self.codes
tmins = []
tmaxs = []
codes = set()
nsl_by_chas = defaultdict(set)
sensors = sq.get_sensors(
codes=codes_query, time=time, tmin=tmin, tmax=tmax)
for sensor in sensors:
for channel in sensor.channels:
codes.add(channel.codes)
sensor_chas = tuple(
sorted(set(
channel.codes.channel
for channel in sensor.channels)))
nsl_by_chas[sensor_chas].add(CodesNSL(sensor.codes))
tmins.append(sensor.tmin)
tmaxs.append(sensor.tmax)
tmins = [tmin for tmin in tmins if tmin is not None]
tmin = min(tmins) if tmins else None
tmax = None if None in tmaxs or not tmaxs else max(tmaxs)
if sensors:
grid = UnstructuredLocationGrid.from_locations(
sensors, ignore_position_duplicates=True)
center = grid.get_center()
distances = distances_3d(grid, grid).flatten()
distances = distances[distances != 0.0]
distances_stats = tuple(num.percentile(
distances, [0., 10., 50., 90., 100.])) \
if distances.size != 0 else None
else:
center = None
distances_stats = None
codes_nsl_by_channels = dict(
(k, sorted(v)) for (k, v) in nsl_by_chas.items())
return SensorArrayInfo(
codes=sorted(codes),
codes_nsl_by_channels=codes_nsl_by_channels,
tmin=tmin,
tmax=tmax,
distances_stats=distances_stats,
center=center,
sensors=sensors)
class SensorArrayFromFDSN(SensorArray):
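    '''
    Sensor array with station metadata available from FDSN web services.
    '''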
sources = List.T(FDSNSource.T())
def to_codes(codes):
return [CodesNSLCE(c) for c in codes]
def _make_fdsn_source(site, codes):
return FDSNSource(site=site, codes=codes)
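
# Catalogue of built-in named arrays, grouped by the FDSN site ('iris',
# 'geofon', 'bgr', 'norsar') providing their station metadata.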
g_sensor_arrays = [
SensorArrayFromFDSN(
name=name,
type=typ,
codes=to_codes(codes),
comment=comment,
sources=[_make_fdsn_source('iris', codes)])
for (typ, name, codes, comment) in [
('hydrophone', 'dghan', ['IM.H08N?.*.?DH'], ''),
('hydrophone', 'dghas', ['IM.H08S?.*.?DH'], ''),
('infrasound', 'bermuda', ['IM.I51H?.*.?DF'], ''),
('infrasound', 'cocos', ['IM.I06H?.*.?DF'], ''),
('infrasound', 'dgha-land', ['IM.I52H?.*.?DF'], ''),
('infrasound', 'fairbanks', ['IM.I53H?.*.?DF'], ''),
('infrasound', 'hia', ['IM.I59H?.*.?DF'], ''),
('infrasound', 'narrogin', ['IM.I04H?.*.?DF'], ''),
('infrasound', 'nia', ['IM.I56H?.*.?DF'], ''),
('infrasound', 'pfia', ['IM.I57H?.*.?DF', 'IM.I57L?.*.?DF'], ''),
('infrasound', 'tdc', ['IM.H09N?.*.?DF', 'IM.I49H?.*.?DF'], ''),
('infrasound', 'warramunga', ['IM.I07H?.*.?DF'], ''),
('seismic', 'alice', ['AU.AS*.*.?H?'],
'Alice Springs, Australia'),
('seismic', 'bca', ['IM.BC0?.*.?H?'],
'Beaver Creek, Alaska, USA'),
('seismic', 'bma', ['IM.BM0?.*.?H?'],
'Burnt Mountain, Alaska, USA'),
('seismic', 'esk', ['IM.EKB?.*.?H?', 'IM.EKR*.*.?H?'],
'Eskdalemuir, Scotland'),
('seismic', 'ilar', ['IM.IL*.*.?H?'],
'Eielson, Alaska, USA'),
('seismic', 'imar', ['IM.IM0?.*.?H?'],
'Indian Mountain, Alaska, USA'),
('seismic', 'nvar', ['IM.NV*.*.?H?'],
'Mina, Nevada, USA'),
('seismic', 'pdar', ['IM.PD0*.*.?H?', 'IM.PD1*.*.?H?'],
'Wyoming, USA'),
('seismic', 'psar', ['AU.PSA*.*.?H?'],
'Pilbara, northwestern Australia'),
('seismic', 'txar', ['IM.TX*.*.?H?'],
'Lajitas, Texas, USA'),
('seismic', 'yka', ['CN.YKA*.*.?H?'],
         'Yellowknife, Northwest Territories, Canada'),
('seismic', 'knet', ['KN.*.*.?H?'],
'Kyrgyzstan'),
]
] + [
SensorArrayFromFDSN(
name=name,
type='seismic',
codes=to_codes(codes),
sources=[_make_fdsn_source('geofon', codes)],
comment=comment)
for (name, codes, comment) in [
('rohrbach', ['6A.V*.*.?H?'],
'Rohrbach/Vogtland, German-Czech border region'),
('neumayer', ['AW.VNA??.*.?H?', 'AW.VNA2.*.?H?'],
'Station Neumayer Watz, Antarctica'),
]
] + [
SensorArrayFromFDSN(
name=name,
type='seismic',
codes=to_codes(codes),
sources=[_make_fdsn_source('bgr', codes)],
comment=comment)
for (name, codes, comment) in [
('geres', [
'GR.GEA?.*.?H?',
'GR.GEB?.*.?H?',
'GR.GEC?.*.?H?',
'GR.GED?.*.?H?'],
'GERESS, Germany'),
('grf', ['GR.GR??.*.?H?'], 'Gräfenberg, Germany')]
] + [
SensorArrayFromFDSN(
name=name,
type='seismic',
codes=to_codes(codes),
sources=[_make_fdsn_source('norsar', codes)],
comment=comment)
for (name, codes, comment) in [
('norsar', [
'NO.NA*.*.*',
'NO.NB*.*.*',
'NO.NC*.*.*', ],
'Central Norway'),
('arces', [
'NO.ARA*.*.*',
'NO.ARB*.*.*',
'NO.ARC*.*.*',
'NO.ARD*.*.*',
'NO.ARE*.*.*'],
'Northern Norway'),
('spits', [
'NO.SPA*.*.*',
'NO.SPB*.*.*'],
'Spitsbergen, Norway'),
('bear', ['NO.BEA?.*.*'],
'Bear Island, Norway'),
('hspa', ['NO.HSPA?.*.*'],
'Hornsund, Spitsbergen, Norway'),
]
]
g_sensor_arrays_dict = dict(
(array.name, array) for array in g_sensor_arrays)
class SensorArrayFromFile(SensorArray, HasPaths):
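    '''
    Sensor array with station metadata read from a stations file on disk.
    '''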
name = String.T()
stations_path = Path.T()
def get_named_arrays_dataset(names=None):
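    '''
    Create a Squirrel dataset with the data sources of built-in arrays.

    If ``names`` is ``None``, the sources of all built-in arrays are
    included, otherwise only those of the named arrays.
    '''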
if isinstance(names, str):
names = [names]
sources = []
for array in g_sensor_arrays:
if names is None or array.name in names:
sources.extend(array.sources)
if names is None:
comment = 'Aggregated dataset containing data sources for all ' \
'built-in arrays of Gato.'
else:
if len(names) == 1:
comment = 'Data source for Gato array: %s' % names[0]
else:
            comment = 'Data sources for Gato arrays: %s' % ', '.join(names)
return Dataset(sources=sources, comment=comment)
def get_named_arrays():
return g_sensor_arrays_dict
def get_named_array(name):
return g_sensor_arrays_dict[name]
__all__ = [
'SensorArrayInfo',
'SensorArray',
'SensorArrayFromFDSN',
'get_named_arrays_dataset',
'get_named_arrays',
'get_named_array',
]
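
# Example usage (sketch only, not part of the module): how the helpers above
# might be combined with a Squirrel instance. This assumes network access so
# that the FDSN metadata referenced by the built-in array definitions can be
# fetched; exact Squirrel calls may differ between Pyrocko versions.
#
#     from pyrocko.squirrel import Squirrel
#
#     sq = Squirrel()
#     sq.add_dataset(get_named_arrays_dataset(['yka']))
#     sq.update()
#
#     yka = get_named_array('yka')
#     info = yka.get_info(sq)
#     print(info.summary)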