Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/scenario/collection.py: 79%

140 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-01-02 12:31 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Handle collections of scenarios. 

8''' 

9 

10 

11import os 

12import tarfile 

13import errno 

14import numpy as num 

15import time 

16 

17 

18from pyrocko.guts import Object, Timestamp 

19from pyrocko import gf, guts, util, pile 

20from pyrocko.plot import gmtpy 

21 

22from .scenario import draw_scenario_gmt 

23from .error import ScenarioError 

24 

25op = os.path 

26guts_prefix = 'pf.scenario' 

27 

28 

29def mtime(p): 

30 return os.stat(p).st_mtime 

31 

32 

33class ScenarioCollectionItem(Object): 

34 scenario_id = gf.StringID.T() 

35 time_created = Timestamp.T() 

36 

37 def __init__(self, **kwargs): 

38 Object.__init__(self, **kwargs) 

39 self._path = None 

40 self._pile = None 

41 self._engine = None 

42 self._scenes = None 

43 

44 def set_base_path(self, path): 

45 self._path = path 

46 

47 def get_base_path(self): 

48 if self._path is None: 

49 raise EnvironmentError('Base path not set!') 

50 return self._path 

51 

52 def init_modelling(self, engine): 

53 self._engine = engine 

54 

55 def get_path(self, *entry): 

56 return op.join(*((self._path,) + entry)) 

57 

58 def get_generator(self): 

59 generator = guts.load(filename=self.get_path('generator.yaml')) 

60 generator.init_modelling(self._engine) 

61 return generator 

62 

63 def get_time_range(self): 

64 return self.get_generator().get_time_range() 

65 

66 def have_waveforms(self, tmin, tmax): 

67 p = self.get_waveform_pile() 

68 trs_have = p.all( 

69 tmin=tmin, tmax=tmax, load_data=False, degap=False) 

70 

71 return any(tr.data_len() > 0 for tr in trs_have) 

72 

73 def get_waveform_pile(self): 

74 self.ensure_data() 

75 

76 if self._pile is None: 

77 path_waveforms = self.get_path('waveforms') 

78 util.ensuredir(path_waveforms) 

79 fns = util.select_files( 

80 [path_waveforms], show_progress=False) 

81 

82 self._pile = pile.Pile() 

83 if fns: 

84 self._pile.load_files( 

85 fns, fileformat='mseed', show_progress=False) 

86 

87 return self._pile 

88 

89 def get_insar_scenes(self): 

90 from kite import Scene 

91 if self._scenes is None: 

92 self._scenes = [] 

93 path_insar = self.get_path('insar') 

94 util.ensuredir(path_insar) 

95 

96 fns = util.select_files([path_insar], include='\\.(npz)$', 

97 show_progress=False) 

98 for f in fns: 

99 self._scenes.append(Scene.load(f)) 

100 

101 return self._scenes 

102 

103 def get_gnss_campaigns(self): 

104 return self.get_generator().get_gnss_campaigns() 

105 

106 def make_map(self, path_pdf): 

107 draw_scenario_gmt(self.get_generator(), path_pdf) 

108 

109 def get_map(self, format='pdf'): 

110 path_pdf = self.get_path('map.pdf') 

111 

112 if not op.exists(path_pdf): 

113 self.make_map(path_pdf) 

114 

115 path = self.get_path('map.%s' % format) 

116 

117 outdated = op.exists(path) and mtime(path) < mtime(path_pdf) 

118 if not op.exists(path) or outdated: 

119 gmtpy.convert_graph(path_pdf, path) 

120 

121 return path 

122 

123 def ensure_data(self, tmin=None, tmax=None): 

124 return self.get_generator().ensure_data( 

125 self.get_path(), tmin, tmax) 

126 

127 def get_archive(self): 

128 self.ensure_data() 

129 

130 path_tar = self.get_path('archive.tar') 

131 if not op.exists(path_tar): 

132 path_base = self.get_path() 

133 path_waveforms = self.get_path('waveforms') 

134 self.ensure_data() 

135 

136 fns = util.select_files( 

137 [path_waveforms], show_progress=False) 

138 

139 f = tarfile.TarFile(path_tar, 'w') 

140 for fn in fns: 

141 fna = fn[len(path_base)+1:] 

142 f.add(fn, fna) 

143 

144 f.close() 

145 

146 return path_tar 

147 

148 

149class ScenarioCollection(object): 

150 

151 def __init__(self, path, engine): 

152 self._scenario_suffix = 'scenario' 

153 self._path = path 

154 util.ensuredir(self._path) 

155 self._engine = engine 

156 self._load_scenarios() 

157 

158 def _load_scenarios(self): 

159 scenarios = [] 

160 base_path = self.get_path() 

161 for path_entry in os.listdir(base_path): 

162 scenario_id, suffix = op.splitext(path_entry) 

163 if suffix == '.' + self._scenario_suffix: 

164 path = op.join(base_path, path_entry, 'scenario.yaml') 

165 scenario = guts.load(filename=path) 

166 assert scenario.scenario_id == scenario_id 

167 scenario.set_base_path(op.join(base_path, path_entry)) 

168 scenario.init_modelling(self._engine) 

169 scenarios.append(scenario) 

170 

171 self._scenarios = scenarios 

172 self._scenarios.sort(key=lambda s: s.time_created) 

173 

174 def get_path(self, scenario_id=None, *entry): 

175 if scenario_id is not None: 

176 return op.join(self._path, '%s.%s' % ( 

177 scenario_id, self._scenario_suffix), *entry) 

178 else: 

179 return self._path 

180 

181 def add_scenario(self, scenario_id, scenario_generator): 

182 

183 if scenario_generator.seed is None: 

184 scenario_generator = guts.clone(scenario_generator) 

185 scenario_generator.seed = num.random.randint(1, 2**32-1) 

186 

187 path = self.get_path(scenario_id) 

188 try: 

189 os.mkdir(path) 

190 except OSError as e: 

191 if e.errno == errno.EEXIST: 

192 raise ScenarioError( 

193 'Scenario id is already in use: %s' % scenario_id) 

194 else: 

195 raise 

196 

197 scenario = ScenarioCollectionItem( 

198 scenario_id=scenario_id, 

199 time_created=util.to_time_float(time.time())) 

200 

201 scenario_path = self.get_path(scenario_id, 'scenario.yaml') 

202 guts.dump(scenario, filename=scenario_path) 

203 

204 generator_path = self.get_path(scenario_id, 'generator.yaml') 

205 guts.dump(scenario_generator, filename=generator_path) 

206 

207 scenario.set_base_path(self.get_path(scenario_id)) 

208 scenario.init_modelling(self._engine) 

209 

210 self._scenarios.append(scenario) 

211 

212 def list_scenarios(self, ilo=None, ihi=None): 

213 return self._scenarios[ilo:ihi] 

214 

215 def get_scenario(self, scenario_id): 

216 for scenario in self._scenarios: 

217 if scenario.scenario_id == scenario_id: 

218 return scenario 

219 

220 raise KeyError(scenario_id)