Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/scenario/collection.py: 79%
140 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 15:01 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 15:01 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Handle collections of scenarios.
8'''
11import os
12import tarfile
13import errno
14import numpy as num
15import time
18from pyrocko.guts import Object, Timestamp
19from pyrocko import gf, guts, util, pile
20from pyrocko.plot import gmtpy
22from .scenario import draw_scenario_gmt
23from .error import ScenarioError
# Short alias for the ``os.path`` module, used for all path handling below.
op = os.path

# NOTE(review): presumably picked up by pyrocko.guts as the type-name prefix
# for the Object subclasses declared in this module - confirm against guts.
guts_prefix = 'pf.scenario'
def mtime(p):
    '''
    Return the time of last modification of the file system entry ``p``.

    :param p: path to an existing file or directory
    :returns: modification time as reported by the operating system
    :raises OSError: if ``p`` cannot be stat'ed (e.g. it does not exist)
    '''
    return os.path.getmtime(p)
class ScenarioCollectionItem(Object):
    '''
    Single scenario of a scenario collection, backed by a directory.

    The declared guts attributes are the scenario id and its creation time.
    The base path and the modelling engine are runtime state which has to be
    provided through :py:meth:`set_base_path` and :py:meth:`init_modelling`.
    The waveform pile and the InSAR scenes are created on first access and
    cached on the instance.
    '''

    scenario_id = gf.StringID.T()
    time_created = Timestamp.T()

    def __init__(self, **kwargs):
        Object.__init__(self, **kwargs)
        self._path = None
        self._pile = None
        self._engine = None
        self._scenes = None

    def set_base_path(self, path):
        '''
        Set the directory in which the files of this scenario are kept.
        '''
        self._path = path

    def get_base_path(self):
        '''
        Get the directory in which the files of this scenario are kept.

        :raises EnvironmentError: if no base path has been set
        '''
        if self._path is None:
            raise EnvironmentError('Base path not set!')
        return self._path

    def init_modelling(self, engine):
        '''
        Set the engine which is handed to the scenario generator.
        '''
        self._engine = engine

    def get_path(self, *entry):
        '''
        Get a path below the base path of this scenario.

        :param entry: path components to append to the base path; without
            any components, the base path itself is returned
        :raises EnvironmentError: if no base path has been set
        '''
        # Go through get_base_path() so that an unset base path is reported
        # with a clear error instead of a TypeError from os.path.join.
        return op.join(self.get_base_path(), *entry)

    def get_generator(self):
        '''
        Load the scenario generator from ``generator.yaml``.

        The generator is loaded anew on every call and is initialized with
        the engine set through :py:meth:`init_modelling`.
        '''
        generator = guts.load(filename=self.get_path('generator.yaml'))
        generator.init_modelling(self._engine)
        return generator

    def get_time_range(self):
        '''
        Get the time range as reported by the scenario generator.
        '''
        return self.get_generator().get_time_range()

    def have_waveforms(self, tmin, tmax):
        '''
        Check if the waveform pile has any samples in a time window.

        :param tmin: begin of the time window
        :param tmax: end of the time window
        :returns: ``True`` if at least one trace in the window is non-empty
        '''
        p = self.get_waveform_pile()
        trs_have = p.all(
            tmin=tmin, tmax=tmax, load_data=False, degap=False)

        return any(tr.data_len() > 0 for tr in trs_have)

    def get_waveform_pile(self):
        '''
        Get the pile holding the waveform files of this scenario.

        Calls :py:meth:`ensure_data` on every invocation. The pile itself is
        built once from the files found in the ``waveforms`` sub-directory
        and is cached afterwards.
        '''
        self.ensure_data()

        if self._pile is None:
            path_waveforms = self.get_path('waveforms')
            util.ensuredir(path_waveforms)
            fns = util.select_files(
                [path_waveforms], show_progress=False)

            # Build into a local and publish only when loading succeeded, so
            # that a failed load is not cached as an incomplete pile.
            waveform_pile = pile.Pile()
            if fns:
                waveform_pile.load_files(
                    fns, fileformat='mseed', show_progress=False)

            self._pile = waveform_pile

        return self._pile

    def get_insar_scenes(self):
        '''
        Get the InSAR scenes stored in the ``insar`` sub-directory.

        All ``.npz`` files found there are loaded with ``kite.Scene.load``
        on first access; the resulting list is cached afterwards.
        '''
        # Imported here so that kite is only needed when this method is used.
        from kite import Scene

        if self._scenes is None:
            path_insar = self.get_path('insar')
            util.ensuredir(path_insar)

            fns = util.select_files([path_insar], include='\\.(npz)$',
                                    show_progress=False)

            # Assign the cache only after every scene has been loaded, so
            # that a failure half-way does not leave a partial list behind.
            self._scenes = [Scene.load(fn) for fn in fns]

        return self._scenes

    def get_gnss_campaigns(self):
        '''
        Get the GNSS campaigns as reported by the scenario generator.
        '''
        return self.get_generator().get_gnss_campaigns()

    def make_map(self, path_pdf):
        '''
        Draw the scenario map with GMT and store it at ``path_pdf``.
        '''
        draw_scenario_gmt(self.get_generator(), path_pdf)

    def get_map(self, format='pdf'):
        '''
        Get the path of the scenario map in the requested file format.

        The PDF map is drawn if it does not exist yet. Other formats are
        converted from the PDF when they are missing or older than the PDF.

        :param format: file name extension of the wanted format
        :returns: path of the map file
        '''
        path_pdf = self.get_path('map.pdf')

        if not op.exists(path_pdf):
            self.make_map(path_pdf)

        path = self.get_path('map.%s' % format)

        outdated = op.exists(path) and mtime(path) < mtime(path_pdf)
        if not op.exists(path) or outdated:
            gmtpy.convert_graph(path_pdf, path)

        return path

    def ensure_data(self, tmin=None, tmax=None):
        '''
        Delegate to the generator's ``ensure_data`` for this scenario's path.

        :param tmin: passed on to the generator unchanged
        :param tmax: passed on to the generator unchanged
        :returns: whatever the generator's ``ensure_data`` returns
        '''
        return self.get_generator().ensure_data(
            self.get_path(), tmin, tmax)

    def get_archive(self):
        '''
        Get the path of a tar archive with the waveform files.

        The archive ``archive.tar`` is created on first request and reused
        afterwards. Member names are relative to the scenario base path.

        :returns: path of the archive
        '''
        self.ensure_data()

        path_tar = self.get_path('archive.tar')
        if not op.exists(path_tar):
            path_base = self.get_path()
            path_waveforms = self.get_path('waveforms')

            fns = util.select_files(
                [path_waveforms], show_progress=False)

            # Write to a temporary name and rename when complete. Otherwise
            # an error while archiving would leave a truncated archive.tar
            # which every later call would hand out as if it were valid.
            path_tmp = path_tar + '.incomplete'
            with tarfile.TarFile(path_tmp, 'w') as archive:
                for fn in fns:
                    # relpath instead of slicing by len(path_base): also
                    # correct if the base path has a trailing separator.
                    archive.add(fn, op.relpath(fn, path_base))

            os.rename(path_tmp, path_tar)

        return path_tar
class ScenarioCollection(object):
    '''
    Collection of scenarios kept in sub-directories of a common directory.

    Each scenario occupies a directory named ``<scenario_id>.scenario``
    containing ``scenario.yaml`` (the :py:class:`ScenarioCollectionItem`)
    and ``generator.yaml`` (the scenario generator).

    :param path: directory of the collection, created if missing
    :param engine: engine handed to every scenario via ``init_modelling``
    '''

    def __init__(self, path, engine):
        self._scenario_suffix = 'scenario'
        self._path = path
        util.ensuredir(self._path)
        self._engine = engine
        self._load_scenarios()

    @staticmethod
    def _random_seed():
        '''
        Draw a random seed from the range ``[1, 2**32 - 1)`` as Python int.
        '''
        # The upper bound does not fit into a 32-bit integer, which is
        # NumPy's default integer type on some platforms; there the call
        # fails with "high is out of bounds" unless a wider dtype is given.
        return int(num.random.randint(1, 2**32-1, dtype=num.int64))

    def _load_scenarios(self):
        '''
        Scan the collection directory and load all scenarios found.

        The scenarios are stored sorted by their creation time.

        :raises ScenarioError: if the id stored in a ``scenario.yaml`` does
            not match the name of the directory containing it
        '''
        scenarios = []
        base_path = self.get_path()
        wanted_suffix = '.' + self._scenario_suffix
        for path_entry in os.listdir(base_path):
            scenario_id, suffix = op.splitext(path_entry)
            if suffix != wanted_suffix:
                continue

            scenario_path = op.join(base_path, path_entry)
            scenario = guts.load(
                filename=op.join(scenario_path, 'scenario.yaml'))

            # Explicit check instead of assert: this validates data read
            # from disk and must not vanish when running with -O.
            if scenario.scenario_id != scenario_id:
                raise ScenarioError(
                    'Scenario id "%s" does not match directory name: %s' % (
                        scenario.scenario_id, scenario_path))

            scenario.set_base_path(scenario_path)
            scenario.init_modelling(self._engine)
            scenarios.append(scenario)

        scenarios.sort(key=lambda s: s.time_created)
        self._scenarios = scenarios

    def get_path(self, scenario_id=None, *entry):
        '''
        Get the path of the collection or of an entry within a scenario.

        :param scenario_id: id of the scenario; if ``None``, the path of the
            collection itself is returned and ``entry`` is ignored
        :param entry: path components below the scenario directory
        '''
        if scenario_id is not None:
            return op.join(self._path, '%s.%s' % (
                scenario_id, self._scenario_suffix), *entry)
        else:
            return self._path

    def add_scenario(self, scenario_id, scenario_generator):
        '''
        Create a new scenario in the collection.

        Creates the scenario directory and writes ``scenario.yaml`` and
        ``generator.yaml`` into it. If the generator has no seed, a clone of
        it with a random seed is stored; the given object is not modified.

        :param scenario_id: id of the new scenario
        :param scenario_generator: generator describing the scenario
        :raises ScenarioError: if the scenario directory already exists
        '''

        if scenario_generator.seed is None:
            scenario_generator = guts.clone(scenario_generator)
            scenario_generator.seed = self._random_seed()

        path = self.get_path(scenario_id)
        try:
            os.mkdir(path)
        except OSError as e:
            if e.errno == errno.EEXIST:
                raise ScenarioError(
                    'Scenario id is already in use: %s' % scenario_id)
            else:
                raise

        scenario = ScenarioCollectionItem(
            scenario_id=scenario_id,
            time_created=util.to_time_float(time.time()))

        scenario_path = self.get_path(scenario_id, 'scenario.yaml')
        guts.dump(scenario, filename=scenario_path)

        generator_path = self.get_path(scenario_id, 'generator.yaml')
        guts.dump(scenario_generator, filename=generator_path)

        scenario.set_base_path(self.get_path(scenario_id))
        scenario.init_modelling(self._engine)

        self._scenarios.append(scenario)

    def list_scenarios(self, ilo=None, ihi=None):
        '''
        Get a slice ``[ilo:ihi]`` of the list of scenarios.

        :param ilo: start index of the slice, or ``None``
        :param ihi: end index (exclusive) of the slice, or ``None``
        '''
        return self._scenarios[ilo:ihi]

    def get_scenario(self, scenario_id):
        '''
        Get the scenario with the given id.

        :raises KeyError: if the collection has no scenario with that id
        '''
        for scenario in self._scenarios:
            if scenario.scenario_id == scenario_id:
                return scenario

        raise KeyError(scenario_id)