Source code for pyrocko.gato.array

from collections import defaultdict

import numpy as num

from pyrocko import util
from pyrocko.guts import Object, String, List, Timestamp, Dict, Tuple, Float
from pyrocko.model.location import Location
from pyrocko.squirrel import CodesNSLCE, FDSNSource, Dataset, CodesNSL, Sensor

from pyrocko.has_paths import HasPaths, Path
from .grid.location import UnstructuredLocationGrid, distances_3d

guts_prefix = 'gato'
km = 1000.


def time_or_none_to_str(t):
    return util.time_to_str(t, '%Y-%m-%d') if t else '...'


[docs]class SensorArrayInfo(Object): name = String.T(optional=True) codes = List.T(CodesNSLCE.T()) tmin = Timestamp.T(optional=True) tmax = Timestamp.T(optional=True) center = Location.T(optional=True) distances_stats = Tuple.T(5, Float.T(), optional=True) codes_nsl__ = List.T(CodesNSL.T()) codes_nsl_by_channels = Dict.T(Tuple.T(String.T()), List.T(CodesNSL.T())) sensors = List.T(Sensor.T()) @property def summary(self): return ' | '.join(( self.name.ljust(15), '%2i' % len(self.codes_nsl), '%2i' % len(self.codes), time_or_none_to_str(self.tmin).ljust(10), time_or_none_to_str(self.tmax).ljust(10), ', '.join('%5.1f' % (v/km) for v in self.distances_stats) if self.distances_stats is not None else ' ' * 33, ', '.join('%s: %i' % (','.join(k), len(v)) for (k, v) in self.codes_nsl_by_channels.items()))) @property def codes_nsl(self): return sorted(set(CodesNSL(c) for c in self.codes)) @codes_nsl.setter def codes_nsl(self, _): pass
[docs]class SensorArray(Object): name = String.T(optional=True) codes = List.T(CodesNSLCE.T()) comment = String.T(optional=True) def get_info(self, sq, channels=None, time=None, tmin=None, tmax=None): if isinstance(channels, str): channels = [channels] codes_query = set() if channels is not None: for c in self.codes: for cha in channels: codes_query.add(c.replace(channel=cha)) codes_query = sorted(codes_query) else: codes_query = self.codes tmins = [] tmaxs = [] codes = set() nsl_by_chas = defaultdict(set) sensors = sq.get_sensors( codes=codes_query, time=time, tmin=tmin, tmax=tmax) for sensor in sensors: for channel in sensor.channels: codes.add(channel.codes) sensor_chas = tuple( sorted(set( channel.codes.channel for channel in sensor.channels))) nsl_by_chas[sensor_chas].add(CodesNSL(sensor.codes)) tmins.append(sensor.tmin) tmaxs.append(sensor.tmax) tmins = [tmin for tmin in tmins if tmin is not None] tmin = min(tmins) if tmins else None tmax = None if None in tmaxs or not tmaxs else max(tmaxs) if sensors: grid = UnstructuredLocationGrid.from_locations( sensors, ignore_position_duplicates=True) center = grid.get_center() distances = distances_3d(grid, grid).flatten() distances = distances[distances != 0.0] distances_stats = tuple(num.percentile( distances, [0., 10., 50., 90., 100.])) \ if distances.size != 0 else None else: center = None distances_stats = None codes_nsl_by_channels = dict( (k, sorted(v)) for (k, v) in nsl_by_chas.items()) return SensorArrayInfo( name=self.name, codes=sorted(codes), codes_nsl_by_channels=codes_nsl_by_channels, tmin=tmin, tmax=tmax, distances_stats=distances_stats, center=center, sensors=sensors)
[docs]class SensorArrayFromFDSN(SensorArray): sources = List.T(FDSNSource.T())
def to_codes(codes): return [CodesNSLCE(c) for c in codes] def _make_fdsn_source(site, codes): return FDSNSource(site=site, codes=codes) g_sensor_arrays = [ SensorArrayFromFDSN( name=name, codes=to_codes(codes), sources=[_make_fdsn_source('iris', codes)]) for (name, codes) in [ ('yka', ['CN.YKA*.*.*']), ('esk', ['IM.EKB?.*.*', 'IM.EKR*.*.*']), ('esk1', ['IM.EKA?.*.*']), ('ilar', ['IM.IL*.*.*']), ('imar', ['IM.IM0?.*.*']), ('nia', ['IM.I56H?.*.*']), ('pfia', ['IM.I57H?.*.*', 'IM.I57L?.*.*']), ('hia', ['IM.I59H?.*.*']), ('ia-bermuda', ['IM.I51H?.*.*']), ('ia-fairmanks', ['IM.I53H?.*.*']), ('dgha-land', ['IM.I52H?.*.*']), ('bma', ['IM.BM0?.*.*']), ('bca', ['IM.BC0?.*.*']), ('nvar', ['IM.NV*.*.*']), ('pdar', ['IM.PD0*.*.*', 'IM.PD1*.*.*']), ('txar', ['IM.TX*.*.*']), ('pilbara', ['AU.PSA*.*.*']), ('alice-springs', ['AU.AS*.*.*']), ('dghas', ['IM.H08S?.*.*']), ('dghan', ['IM.H08N?.*.*']), ('tdc', ['IM.H09N?.*.*', 'IM.I49H?.*.*']), ('ia-narrogin', ['IM.I04H?.*.*']), ('cocos-island', ['IM.I06H?.*.*']), ('warramunga', ['IM.I07H?.*.*']) ] ] + [ SensorArrayFromFDSN( name=name, codes=to_codes(codes), sources=[_make_fdsn_source('geofon', codes)]) for (name, codes) in [ ('rohrbach', ['6A.V*.*.*']), ('anta-offshore', ['GR.I27L?.*.*']), ('anta-onshore', ['AW.VNA*.*.*']), ] ] + [ SensorArrayFromFDSN( name=name, codes=to_codes(codes), sources=[_make_fdsn_source('bgr', codes)]) for (name, codes) in [ ('geres', [ 'GR.GEA?.*.*', 'GR.GEB?.*.*', 'GR.GEC?.*.*', 'GR.GED?.*.*']), ('grf', ['GR.GR??.*.*']), ] ] g_sensor_arrays_dict = dict( (array.name, array) for array in g_sensor_arrays)
[docs]class SensorArrayFromFile(SensorArray, HasPaths): name = String.T() stations_path = Path.T()
def get_named_arrays_dataset(names=None): if isinstance(names, str): names = [names] sources = [] for array in g_sensor_arrays: if names is None or array.name in names: sources.extend(array.sources) if names is None: comment = 'Aggregated dataset containing data sources for all ' \ 'built-in arrays of Gato.' else: if len(names) == 1: comment = 'Data source for Gato array: %s' % names[0] else: comment = 'Data sources for Gato array: %s' % ', ' .join(names) return Dataset(sources=sources, comment=comment) def get_named_arrays(): return g_sensor_arrays_dict def get_named_array(name): return g_sensor_arrays_dict[name] __all__ = [ 'SensorArrayInfo', 'SensorArray', 'SensorArrayFromFDSN', 'get_named_arrays_dataset', 'get_named_arrays', 'get_named_array', ]