# Source code for pyrocko.scenario.collection

# https://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

from __future__ import absolute_import, print_function, division

import os
import tarfile
import errno
import numpy as num
import time


from pyrocko.guts import Object, Timestamp
from pyrocko import gf, guts, util, pile, gmtpy

from .scenario import draw_scenario_gmt
from .error import ScenarioError

op = os.path
guts_prefix = 'pf.scenario'


def mtime(p):
    """Return the modification time (``st_mtime``) of the file at path *p*."""
    stat_result = os.stat(p)
    return stat_result.st_mtime


class ScenarioCollectionItem(Object):
    """
    A single scenario living in its own directory.

    ``scenario_id`` and ``time_created`` are the declared guts attributes.
    The base path, the modelling engine, the waveform pile and the InSAR
    scenes are runtime state held in private attributes; base path and
    engine have to be supplied through :py:meth:`set_base_path` and
    :py:meth:`init_modelling` before the data accessors can be used.
    """

    scenario_id = gf.StringID.T()
    time_created = Timestamp.T()

    def __init__(self, **kwargs):
        Object.__init__(self, **kwargs)
        self._path = None
        self._pile = None
        self._engine = None
        self._scenes = None

    def set_base_path(self, path):
        """Set the directory in which this scenario's files are stored."""
        self._path = path

    def get_base_path(self):
        """
        Return the scenario directory.

        :raises EnvironmentError: if no base path has been set.
        """
        if self._path is None:
            raise EnvironmentError('Base path not set!')

        return self._path

    def init_modelling(self, engine):
        """Remember the engine that is handed on to the generator."""
        self._engine = engine

    def get_path(self, *entry):
        """
        Return the base path joined with the path components in ``entry``.

        :raises EnvironmentError: if no base path has been set.
        """
        # Go through get_base_path() so that an unset base path is reported
        # explicitly rather than as a TypeError from os.path.join.
        return op.join(self.get_base_path(), *entry)

    def get_generator(self):
        """
        Load ``generator.yaml`` from the scenario directory.

        The generator is loaded anew on every call and initialised with the
        engine given to :py:meth:`init_modelling`.
        """
        generator = guts.load(filename=self.get_path('generator.yaml'))
        generator.init_modelling(self._engine)
        return generator

    def get_time_range(self):
        """Return the time range reported by the scenario generator."""
        return self.get_generator().get_time_range()

    def have_waveforms(self, tmin, tmax):
        """
        Check whether waveform data is available between ``tmin`` and
        ``tmax``.

        :returns: ``True`` if any trace selected from the waveform pile for
            the given time span has a non-zero data length.
        """
        p = self.get_waveform_pile()
        trs_have = p.all(
            tmin=tmin, tmax=tmax, load_data=False, degap=False)

        return any(tr.data_len() > 0 for tr in trs_have)

    def get_waveform_pile(self):
        """
        Return the pile of the scenario's waveform files.

        :py:meth:`ensure_data` is called on every invocation; the pile itself
        is built once from the files found below ``waveforms`` (loaded as
        ``mseed``) and cached afterwards.
        """
        self.ensure_data()

        if self._pile is None:
            path_waveforms = self.get_path('waveforms')
            util.ensuredir(path_waveforms)
            fns = util.select_files(
                [path_waveforms], show_progress=False)

            self._pile = pile.Pile()
            if fns:
                self._pile.load_files(
                    fns, fileformat='mseed', show_progress=False)

        return self._pile

    def get_insar_scenes(self):
        """
        Return the InSAR scenes stored below ``insar``.

        The ``.npz`` files are loaded with ``kite.Scene.load`` on first use
        and the resulting list is cached.
        """
        # Imported here so that kite is only needed when scenes are requested.
        from kite import Scene

        if self._scenes is None:
            self._scenes = []
            path_insar = self.get_path('insar')
            util.ensuredir(path_insar)

            fns = util.select_files(
                [path_insar], regex='\\.(npz)$', show_progress=False)
            for f in fns:
                self._scenes.append(Scene.load(f))

        return self._scenes

    def get_gnss_campaigns(self):
        """Return the GNSS campaigns reported by the scenario generator."""
        return self.get_generator().get_gnss_campaigns()

    def make_map(self, path_pdf):
        """Draw the scenario map with GMT and write it to ``path_pdf``."""
        draw_scenario_gmt(self.get_generator(), path_pdf)

    def get_map(self, format='pdf'):
        """
        Return the path of the scenario map in the requested ``format``.

        ``map.pdf`` is drawn if it does not exist yet. ``map.<format>`` is
        (re-)created from it when missing or older than the PDF.
        """
        path_pdf = self.get_path('map.pdf')
        if not op.exists(path_pdf):
            self.make_map(path_pdf)

        path = self.get_path('map.%s' % format)
        outdated = op.exists(path) and mtime(path) < mtime(path_pdf)
        if not op.exists(path) or outdated:
            gmtpy.convert_graph(path_pdf, path)

        return path

    def ensure_data(self, tmin=None, tmax=None):
        """
        Ask the generator to ensure its data for ``tmin`` to ``tmax`` in the
        scenario directory and return the generator's result.
        """
        return self.get_generator().ensure_data(
            self.get_path(), tmin, tmax)

    def get_archive(self):
        """
        Return the path of ``archive.tar``, creating it if it is missing.

        The archive contains the files found below ``waveforms``, stored
        under their paths relative to the scenario directory. An archive
        that could not be written completely is removed again, so that a
        truncated file is never returned by a later call.
        """
        self.ensure_data()

        path_tar = self.get_path('archive.tar')
        if not op.exists(path_tar):
            path_base = self.get_path()
            path_waveforms = self.get_path('waveforms')

            fns = util.select_files(
                [path_waveforms], show_progress=False)

            complete = False
            try:
                # The context manager closes the archive even if add() fails.
                with tarfile.TarFile(path_tar, 'w') as archive:
                    for fn in fns:
                        # relpath stays correct when the base path carries a
                        # trailing separator, unlike slicing by its length.
                        archive.add(fn, op.relpath(fn, path_base))

                complete = True
            finally:
                if not complete and op.exists(path_tar):
                    os.remove(path_tar)

        return path_tar
class ScenarioCollection(object):
    """
    Directory-backed collection of scenarios.

    Every scenario is kept in a sub-directory ``<scenario_id>.scenario`` of
    the collection path, holding ``scenario.yaml`` and ``generator.yaml``.

    :param path: directory of the collection, created if necessary.
    :param engine: engine passed on to every scenario via
        ``init_modelling``.
    """

    def __init__(self, path, engine):
        self._scenario_suffix = 'scenario'
        self._path = path
        util.ensuredir(self._path)
        self._engine = engine
        self._load_scenarios()

    def _load_scenarios(self):
        """
        Load all scenarios found in the collection directory.

        The scenarios are stored sorted by their ``time_created``.

        :raises ScenarioError: if the id stored in a ``scenario.yaml`` does
            not match the name of the directory containing it.
        """
        scenarios = []
        base_path = self.get_path()
        wanted_suffix = '.' + self._scenario_suffix
        for path_entry in os.listdir(base_path):
            scenario_id, suffix = op.splitext(path_entry)
            if suffix != wanted_suffix:
                continue

            path = op.join(base_path, path_entry, 'scenario.yaml')
            scenario = guts.load(filename=path)

            # Explicit check instead of assert: this validates data read
            # from disk and must not vanish when running with -O.
            if scenario.scenario_id != scenario_id:
                raise ScenarioError(
                    'Scenario id "%s" found in "%s" does not match the '
                    'name of its directory.' % (
                        scenario.scenario_id, path))

            scenario.set_base_path(op.join(base_path, path_entry))
            scenario.init_modelling(self._engine)
            scenarios.append(scenario)

        scenarios.sort(key=lambda s: s.time_created)
        self._scenarios = scenarios

    def get_path(self, scenario_id=None, *entry):
        """
        Return a path inside the collection.

        Without ``scenario_id`` the collection directory itself is returned
        (``entry`` is ignored in that case). Otherwise the result is the
        directory of the given scenario joined with the components in
        ``entry``.
        """
        if scenario_id is None:
            return self._path

        dirname = '%s.%s' % (scenario_id, self._scenario_suffix)
        return op.join(self._path, dirname, *entry)

    def add_scenario(self, scenario_id, scenario_generator):
        """
        Create a new scenario directory and register the scenario.

        If the generator has no seed, a clone of it with a randomly drawn
        seed is stored; the object passed in is left untouched.

        :raises ScenarioError: if the scenario directory already exists.
        """
        if scenario_generator.seed is None:
            scenario_generator = guts.clone(scenario_generator)
            # Request 64-bit explicitly: the upper bound does not fit into
            # NumPy's default integer where that is 32 bits wide. Convert
            # back to a plain int, as the default-dtype call returned.
            scenario_generator.seed = int(num.random.randint(
                1, 2**32-1, dtype=num.int64))

        path = self.get_path(scenario_id)
        try:
            os.mkdir(path)

        except OSError as e:
            if e.errno == errno.EEXIST:
                raise ScenarioError(
                    'Scenario id is already in use: %s' % scenario_id)
            else:
                raise

        scenario = ScenarioCollectionItem(
            scenario_id=scenario_id,
            time_created=time.time())

        scenario_path = self.get_path(scenario_id, 'scenario.yaml')
        guts.dump(scenario, filename=scenario_path)

        generator_path = self.get_path(scenario_id, 'generator.yaml')
        guts.dump(scenario_generator, filename=generator_path)

        scenario.set_base_path(self.get_path(scenario_id))
        scenario.init_modelling(self._engine)

        self._scenarios.append(scenario)

    def list_scenarios(self, ilo=None, ihi=None):
        """Return the slice ``[ilo:ihi]`` of the list of scenarios."""
        return self._scenarios[ilo:ihi]

    def get_scenario(self, scenario_id):
        """
        Return the scenario with the given id.

        :raises KeyError: if there is no scenario with that id.
        """
        for scenario in self._scenarios:
            if scenario.scenario_id == scenario_id:
                return scenario

        raise KeyError(scenario_id)