# https://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from __future__ import absolute_import, print_function, division
import os
import tarfile
import errno
import numpy as num
import time
from pyrocko.guts import Object, Timestamp
from pyrocko import gf, guts, util, pile, gmtpy
from .scenario import draw_scenario_gmt
from .error import ScenarioError
op = os.path
guts_prefix = 'pf.scenario'
def mtime(p):
return os.stat(p).st_mtime
[docs]class ScenarioCollectionItem(Object):
scenario_id = gf.StringID.T()
time_created = Timestamp.T()
def __init__(self, **kwargs):
Object.__init__(self, **kwargs)
self._path = None
self._pile = None
self._engine = None
self._scenes = None
def set_base_path(self, path):
self._path = path
def get_base_path(self):
if self._path is None:
raise EnvironmentError('Base path not set!')
return self._path
def init_modelling(self, engine):
self._engine = engine
def get_path(self, *entry):
return op.join(*((self._path,) + entry))
def get_generator(self):
generator = guts.load(filename=self.get_path('generator.yaml'))
generator.init_modelling(self._engine)
return generator
def get_time_range(self):
return self.get_generator().get_time_range()
def have_waveforms(self, tmin, tmax):
p = self.get_waveform_pile()
trs_have = p.all(
tmin=tmin, tmax=tmax, load_data=False, degap=False)
return any(tr.data_len() > 0 for tr in trs_have)
def get_waveform_pile(self):
self.ensure_data()
if self._pile is None:
path_waveforms = self.get_path('waveforms')
util.ensuredir(path_waveforms)
fns = util.select_files(
[path_waveforms], show_progress=False)
self._pile = pile.Pile()
if fns:
self._pile.load_files(
fns, fileformat='mseed', show_progress=False)
return self._pile
def get_insar_scenes(self):
from kite import Scene
if self._scenes is None:
self._scenes = []
path_insar = self.get_path('insar')
util.ensuredir(path_insar)
fns = util.select_files([path_insar], regex='\\.(npz)$',
show_progress=False)
for f in fns:
self._scenes.append(Scene.load(f))
return self._scenes
def get_gnss_campaigns(self):
return self.get_generator().get_gnss_campaigns()
def make_map(self, path_pdf):
draw_scenario_gmt(self.get_generator(), path_pdf)
def get_map(self, format='pdf'):
path_pdf = self.get_path('map.pdf')
if not op.exists(path_pdf):
self.make_map(path_pdf)
path = self.get_path('map.%s' % format)
outdated = op.exists(path) and mtime(path) < mtime(path_pdf)
if not op.exists(path) or outdated:
gmtpy.convert_graph(path_pdf, path)
return path
def ensure_data(self, tmin=None, tmax=None):
return self.get_generator().ensure_data(
self.get_path(), tmin, tmax)
def get_archive(self):
self.ensure_data()
path_tar = self.get_path('archive.tar')
if not op.exists(path_tar):
path_base = self.get_path()
path_waveforms = self.get_path('waveforms')
self.ensure_data()
fns = util.select_files(
[path_waveforms], show_progress=False)
f = tarfile.TarFile(path_tar, 'w')
for fn in fns:
fna = fn[len(path_base)+1:]
f.add(fn, fna)
f.close()
return path_tar
class ScenarioCollection(object):
def __init__(self, path, engine):
self._scenario_suffix = 'scenario'
self._path = path
util.ensuredir(self._path)
self._engine = engine
self._load_scenarios()
def _load_scenarios(self):
scenarios = []
base_path = self.get_path()
for path_entry in os.listdir(base_path):
scenario_id, suffix = op.splitext(path_entry)
if suffix == '.' + self._scenario_suffix:
path = op.join(base_path, path_entry, 'scenario.yaml')
scenario = guts.load(filename=path)
assert scenario.scenario_id == scenario_id
scenario.set_base_path(op.join(base_path, path_entry))
scenario.init_modelling(self._engine)
scenarios.append(scenario)
self._scenarios = scenarios
self._scenarios.sort(key=lambda s: s.time_created)
def get_path(self, scenario_id=None, *entry):
if scenario_id is not None:
return op.join(self._path, '%s.%s' % (
scenario_id, self._scenario_suffix), *entry)
else:
return self._path
def add_scenario(self, scenario_id, scenario_generator):
if scenario_generator.seed is None:
scenario_generator = guts.clone(scenario_generator)
scenario_generator.seed = num.random.randint(1, 2**32-1)
path = self.get_path(scenario_id)
try:
os.mkdir(path)
except OSError as e:
if e.errno == errno.EEXIST:
raise ScenarioError(
'Scenario id is already in use: %s' % scenario_id)
else:
raise
scenario = ScenarioCollectionItem(
scenario_id=scenario_id,
time_created=time.time())
scenario_path = self.get_path(scenario_id, 'scenario.yaml')
guts.dump(scenario, filename=scenario_path)
generator_path = self.get_path(scenario_id, 'generator.yaml')
guts.dump(scenario_generator, filename=generator_path)
scenario.set_base_path(self.get_path(scenario_id))
scenario.init_modelling(self._engine)
self._scenarios.append(scenario)
def list_scenarios(self, ilo=None, ihi=None):
return self._scenarios[ilo:ihi]
def get_scenario(self, scenario_id):
for scenario in self._scenarios:
if scenario.scenario_id == scenario_id:
return scenario
raise KeyError(scenario_id)