Source code for pyrocko.gato.arrays

from collections import defaultdict

import numpy as num

from pyrocko import util
from pyrocko.guts import Object, String, List, Timestamp, Dict, Tuple, Float
from pyrocko.model.location import Location
from pyrocko.squirrel import CodesNSLCE, FDSNSource, Dataset, CodesNSL

from pyrocko.has_paths import HasPaths, Path
from .grid.location import UnstructuredLocationGrid, distances_3d

guts_prefix = 'gato'
km = 1000.


def time_or_none_to_str(t):
    return util.time_to_str(t, '%Y-%m-%d') if t else '...'


[docs]class SensorArrayInfo(Object): name = String.T(optional=True) codes = List.T(CodesNSLCE.T()) tmin = Timestamp.T(optional=True) tmax = Timestamp.T(optional=True) center = Location.T(optional=True) distances_stats = Tuple.T(5, Float.T(), optional=True) codes_nsl__ = List.T(CodesNSL.T()) codes_nsl_by_channels = Dict.T(Tuple.T(String.T()), List.T(CodesNSL.T())) @property def summary(self): return ' | '.join(( self.name.ljust(15), '%2i' % len(self.codes_nsl), '%2i' % len(self.codes), time_or_none_to_str(self.tmin).ljust(10), time_or_none_to_str(self.tmax).ljust(10), ', '.join('%5.1f' % (v/km) for v in self.distances_stats) if self.distances_stats is not None else ' ' * 33, ', '.join('%s: %i' % (','.join(k), len(v)) for (k, v) in self.codes_nsl_by_channels.items()))) @property def codes_nsl(self): return sorted(set(CodesNSL(c) for c in self.codes)) @codes_nsl.setter def codes_nsl(self, _): pass
[docs]class SensorArray(Object): name = String.T(optional=True) codes = List.T(CodesNSLCE.T()) def get_info(self, sq, channels=None, time=None, tmin=None, tmax=None): if isinstance(channels, str): channels = [channels] codes_query = set() if channels is not None: for c in self.codes: for cha in channels: codes_query.add(c.replace(channel=cha)) codes_query = sorted(codes_query) else: codes_query = self.codes tmins = [] tmaxs = [] codes = set() coordinates = set() nsl_by_chas = defaultdict(set) for sensor in sq.get_sensors( codes=codes_query, time=time, tmin=tmin, tmax=tmax): for channel in sensor.channels: codes.add(channel.codes) sensor_chas = tuple( sorted(set( channel.codes.channel for channel in sensor.channels))) nsl_by_chas[sensor_chas].add(CodesNSL(sensor.codes)) tmins.append(sensor.tmin) tmaxs.append(sensor.tmax) coordinates.add(( sensor.lat, sensor.lon, -sensor.elevation+sensor.depth, 0., 0.)) tmins = [tmin for tmin in tmins if tmin is not None] tmin = min(tmins) if tmins else None tmax = None if None in tmaxs or not tmaxs else max(tmaxs) coordinates = num.array(list(coordinates)) if coordinates.size != 0: grid = UnstructuredLocationGrid(coordinates=coordinates) center = grid.get_center() distances = distances_3d(grid, grid).flatten() distances = distances[distances != 0.0] distances_stats = tuple(num.percentile( distances, [0., 10., 50., 90., 100.])) \ if distances.size != 0 else None else: center = None distances_stats = None codes_nsl_by_channels = dict( (k, sorted(v)) for (k, v) in nsl_by_chas.items()) return SensorArrayInfo( name=self.name, codes=sorted(codes), codes_nsl_by_channels=codes_nsl_by_channels, tmin=tmin, tmax=tmax, distances_stats=distances_stats, center=center)
[docs]class SensorArrayFromFDSN(SensorArray): sources = List.T(FDSNSource.T())
def to_codes(codes): return [CodesNSLCE(c) for c in codes] def _make_fdsn_source(site, codes): return FDSNSource(site=site, codes=codes) g_sensor_arrays = [ SensorArrayFromFDSN( name=name, codes=to_codes(codes), sources=[_make_fdsn_source('iris', codes)]) for (name, codes) in [ ('YKA', ['CN.YKA*.*.*']), ('ESK', ['IM.EKB?.*.*', 'IM.EKR*.*.*']), ('ESK1', ['IM.EKA?.*.*']), ('ILAR', ['IM.IL*.*.*']), ('IMAR', ['IM.IM0?.*.*']), ('NIA', ['IM.I56H?.*.*']), ('PFIA', ['IM.I57H?.*.*', 'IM.I57L?.*.*']), ('HIA', ['IM.I59H?.*.*']), ('BermudaIA', ['IM.I51H?.*.*']), ('FairbanksIA', ['IM.I53H?.*.*']), ('DGHAland', ['IM.I52H?.*.*']), ('BMA', ['IM.BM0?.*.*']), ('BCA', ['IM.BC0?.*.*']), ('NVAR', ['IM.NV*.*.*']), ('PDAR', ['IM.PD0*.*.*', 'IM.PD1*.*.*']), ('TXAR', ['IM.TX*.*.*']), ('Pilbara', ['AU.PSA*.*.*']), ('AliceSprings', ['AU.AS*.*.*']), ('DGHAS', ['IM.H08S?.*.*']), ('DGHAN', ['IM.H08N?.*.*']), ('TDC', ['IM.H09N?.*.*', 'IM.I49H?.*.*']), ('NarroginIA', ['IM.I04H?.*.*']), ('CocosIslands', ['IM.I06H?.*.*']), ('Warramunga', ['IM.I07H?.*.*']) ] ] + [ SensorArrayFromFDSN( name=name, codes=to_codes(codes), sources=[_make_fdsn_source('geofon', codes)]) for (name, codes) in [ ('ROHRBACH', ['6A.V*.*.*']), ('AntaOffshore', ['GR.I27L?.*.*']), ('AntaOnshore', ['AW.VNA*.*.*']), ] ] + [ SensorArrayFromFDSN( name=name, codes=to_codes(codes), sources=[_make_fdsn_source('bgr', codes)]) for (name, codes) in [ ('GERES', [ 'GR.GEA?.*.*', 'GR.GEB?.*.*', 'GR.GEC?.*.*', 'GR.GED?.*.*']), ('GRF', ['GR.GR??.*.*']), ] ]
[docs]class SensorArrayFromFile(SensorArray, HasPaths): name = String.T() stations_path = Path.T()
def named_arrays_dataset(names=None): sources = [] for array in g_sensor_arrays: if names is None or array.name in names: sources.extend(array.sources) return Dataset(sources=sources) def named_arrays(): return g_sensor_arrays def named_array(name): for array in g_sensor_arrays: if array.name == name: return array raise KeyError(name) __all__ = [ 'SensorArrayInfo', 'SensorArray', 'SensorArrayFromFDSN', 'named_arrays_dataset', 'named_arrays', 'named_array', ]