Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/squirrel/storage.py: 89%

102 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-12-04 10:41 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Waveform data storage utilities. 

8''' 

9 

10import math 

11import os 

12from collections import defaultdict 

13 

14from pyrocko import guts, util, trace, io 

15from pyrocko.plot import nice_time_tick_inc_approx_secs 

16 

17from . import error 

18 

# Prefix string for this module's guts types.
# NOTE(review): presumably picked up by pyrocko.guts when naming the classes
# defined below - confirm against the guts documentation.
guts_prefix = 'squirrel'

# Number of samples per "block"; multiplied with the sampling interval in
# StorageSchemeLayout.describe() to obtain the block duration.
nsamples_block = 100_000

# Number of samples per "segment"; multiplied with the sampling interval to
# obtain the segment duration used when describing and selecting layouts.
nsamples_segment = 1024 * 1024

23 

24 

def _translate_path_template(s):
    '''
    Expand a shorthand path template into a ``%``-style format template.

    The shorthand consists of directory levels separated by ``'/'``. Each
    level is a ``'.'``-separated sequence of placeholder names: ``year``,
    ``month``, ``day``, ``jday``, ``hour``, ``minute``, ``second``, ``net``,
    ``sta``, ``loc``, ``cha`` or ``ext``.

    :param s:
        Shorthand template, e.g. ``'net/sta/net.sta.year'``.

    :returns:
        The translated levels combined with :py:func:`os.path.join`, where
        every placeholder name is replaced by its ``%(...)s`` format field.

    :raises KeyError:
        If a placeholder name is not one of the names listed above.
    '''
    field_by_placeholder = dict(
        year='%(wmin_year)s',
        month='%(wmin_month)s',
        day='%(wmin_day)s',
        jday='%(wmin_jday)s',
        hour='%(wmin_hour)s',
        minute='%(wmin_minute)s',
        second='%(wmin_second)s',
        net='%(network_dsafe)s',
        sta='%(station_dsafe)s',
        loc='%(location_dsafe)s',
        cha='%(channel_dsafe)s',
        ext='%(extra_dsafe)s')

    levels = []
    for level in s.split('/'):
        fields = [
            field_by_placeholder[placeholder]
            for placeholder in level.split('.')]

        levels.append('.'.join(fields))

    return os.path.join(*levels)

43 

44 

def time_to_template_vars(prefix, t):
    '''
    Build path template variables for a point in time.

    :param prefix:
        String prepended (followed by ``'_'``) to each variable name, e.g.
        ``'wmin'``.
    :param t:
        Time, as accepted by :py:func:`pyrocko.util.time_to_str`.

    :returns:
        Dict with the keys ``<prefix>_year``, ``<prefix>_month``,
        ``<prefix>_day``, ``<prefix>_jday``, ``<prefix>_hour``,
        ``<prefix>_minute`` and ``<prefix>_second`` holding the formatted
        date/time components as strings, plus the key ``<prefix>`` holding
        the time formatted as ``'%Y-%m-%d_%H-%M-%S'``.
    '''
    components = (
        'year', 'month', 'day', 'jday', 'hour', 'minute', 'second')

    # Format all components in a single call and split them apart again.
    formatted = util.time_to_str(t, '%Y.%m.%d.%j.%H.%M.%S').split('.')

    template_vars = {}
    for component, value in zip(components, formatted):
        template_vars[prefix + '_' + component] = value

    # needed for backwards compatibility
    template_vars[prefix] = util.time_to_str(t, '%Y-%m-%d_%H-%M-%S')
    return template_vars

54 

55 

def iter_windows(tmin, tmax, tinc, tinc_nonuniform):
    '''
    Iterate over the time windows covering a time span.

    :param tmin:
        Start of the time span.
    :param tmax:
        End of the time span.
    :param tinc:
        Window length. If ``None``, the whole span is yielded as a single
        window and ``tinc_nonuniform`` is not looked at.
    :param tinc_nonuniform:
        ``None`` for uniform windows of length ``tinc`` starting at integer
        multiples of ``tinc``, or ``'month'`` / ``'year'`` to delegate to
        ``util.iter_months`` / ``util.iter_years``.

    :yields:
        ``(wmin, wmax)`` tuples.

    :raises SquirrelError:
        If ``tinc_nonuniform`` is neither ``None``, ``'month'`` nor
        ``'year'`` (and ``tinc`` is not ``None``).
    '''
    if tinc is None:
        yield tmin, tmax
        return

    if tinc_nonuniform is None:
        # Snap the first window start down to a multiple of tinc, then step
        # forward until the end of the span is covered.
        wmin = math.floor(tmin / tinc) * tinc
        while wmin < tmax:
            yield wmin, wmin + tinc
            wmin += tinc

        return

    if tinc_nonuniform == 'month':
        yield from util.iter_months(tmin, tmax)
        return

    if tinc_nonuniform == 'year':
        yield from util.iter_years(tmin, tmax)
        return

    raise error.SquirrelError(
        'Available non-uniform time intervals: month, year. '
        'Invalid choice: %s' % tinc_nonuniform)

73 

74 

class StorageSchemeLayout(guts.Object):
    '''
    Specific directory layout within a storage scheme.
    '''
    name = guts.String.T(
        help='Name of the layout, informational.')
    time_increment = guts.Float.T(
        optional=True,
        help='Time window length stored in each file[s]. Exact or '
             'approximate, depending on '
             ':py:gattr:`time_increment_nonuniform`.')
    time_increment_nonuniform = guts.String.T(
        optional=True,
        help='Identifier for non-uniform time windows. E.g. ``\'month\'`` or '
             '``\'year\'``.')
    path_template = guts.String.T(
        help='Template for file paths.')

    def get_additional(self, wmin, wmax):
        '''
        Get the template variables describing a time window.

        :param wmin:
            Start time of the window.
        :param wmax:
            End time of the window.

        :returns:
            Dict combining the results of :py:func:`time_to_template_vars`
            for ``wmin`` (prefix ``'wmin'``) and ``wmax`` (prefix
            ``'wmax'``).
        '''
        d = {}
        d.update(time_to_template_vars('wmin', wmin))
        d.update(time_to_template_vars('wmax', wmax))
        return d

    @classmethod
    def describe_header(cls):
        '''
        Get the column header line matching the output of :py:meth:`describe`.

        :returns:
            Header string; the column widths are the same as in the line
            produced by :py:meth:`describe`.
        '''
        return '%8s %11s %6s %10s %12s %10s %6s %4s %8s %6s' % (
            'rate',
            'deltat',
            'tblock',
            '',
            'tsegment',
            '',
            'layout',
            'fseg',
            'fsize',
            'levels')

    def describe(self, deltat):
        '''
        Get a one-line summary of the layout for a given sampling interval.

        :param deltat:
            Sampling interval [s]; must be non-zero.

        :returns:
            Formatted line with sampling rate, sampling interval, block and
            segment durations, layout name, number of segments per file,
            file size and number of ``'/'``-separated levels in
            :py:gattr:`path_template`. Segments per file and file size are
            reported as zero if :py:gattr:`time_increment` is not set.
        '''
        rate = 1.0 / deltat
        tblock = nice_time_tick_inc_approx_secs(deltat * nsamples_block)
        tsegment = deltat * nsamples_segment

        # Must be a plain float: a stray trailing comma here would turn the
        # value into a 1-tuple and make the '%4.0f' format below raise
        # TypeError.
        if self.time_increment is not None:
            segments_per_file = self.time_increment / tsegment
        else:
            segments_per_file = 0.0

        # NOTE(review): the factor 4 presumably stands for 4 bytes per
        # sample - confirm.
        bytesize = (self.time_increment or 0.0) / deltat * 4

        return '%8.1f %11.5f %6.0f %10s %12.0f %10s %6s %4.0f %8s %6i' % (
            rate,
            deltat,
            tblock,
            guts.str_duration(tblock),
            tsegment,
            guts.str_duration(tsegment),
            self.name,
            segments_per_file,
            util.human_bytesize(bytesize),
            len(self.path_template.split('/')))

131 

132 

class StorageScheme(guts.Object):
    '''
    Storage scheme for waveform archive data.
    '''
    name = guts.String.T(
        help='Storage scheme name.')
    layouts = guts.List.T(
        StorageSchemeLayout.T(),
        help='Directory layouts supported by the scheme.')
    min_segments_per_file = guts.Float.T(
        default=1.0,
        help='Target minimum number of segments to be stored in each file.')
    format = guts.String.T(
        default='mseed',
        help='File format of waveform data files.')
    description = guts.String.T(
        default='',
        help='Description of the storage scheme.')

    def post_init(self):
        # Optional directory prepended to the layouts' path templates in
        # save(); None means the templates are used as they are.
        self._base_path = None

    def set_base_path(self, base_path):
        '''
        Set the directory under which :py:meth:`save` stores files.

        :param base_path:
            Directory path joined in front of the selected layout's path
            template, or ``None`` to use the template unchanged.
        '''
        self._base_path = base_path

    def select_layout(self, deltat):
        '''
        Select the layout to be used for a given sampling interval.

        The first layout in :py:gattr:`layouts` is returned which either has
        no ``time_increment`` set or whose ``time_increment`` exceeds the
        duration of :py:gattr:`min_segments_per_file` segments at the given
        sampling interval. If none qualifies, the last layout is returned.

        :param deltat:
            Sampling interval [s].

        :returns:
            The selected :py:class:`StorageSchemeLayout`.

        :raises SquirrelError:
            If the scheme has no layouts.
        '''
        if not self.layouts:
            # Without this check, the fallback below would fail with an
            # unhelpful UnboundLocalError.
            raise error.SquirrelError(
                'Storage scheme has no layouts: %s' % self.name)

        tsegment = deltat * nsamples_segment
        twant = tsegment * self.min_segments_per_file
        for layout in self.layouts:
            if layout.time_increment is None or layout.time_increment > twant:
                return layout

        # No layout is long enough: fall back to the last one listed.
        return self.layouts[-1]

    def save(self, traces, **save_kwargs):
        '''
        Save traces into files organized according to the scheme.

        Traces are grouped by sampling interval. For each group, a layout is
        selected with :py:meth:`select_layout`, the traces are sorted by
        ``full_id`` and passed through ``trace.deoverlap``, then cut into the
        layout's time windows and handed to ``io.save`` window by window.

        :param traces:
            Iterable of traces to be saved.
        :param save_kwargs:
            Keyword arguments forwarded to ``io.save``. ``append`` and
            ``check_append`` must be true if given; both are always passed
            on as ``True``. ``additional``, if given, is a dict of template
            variables which take precedence over the variables provided by
            the layout.

        :returns:
            Sorted list of the unique file names reported by ``io.save``.
        '''

        # These checks are skipped when Python runs with -O; the values are
        # forced to True below in any case.
        assert save_kwargs.get('append', True), \
            'StorageScheme.save() requires append=True'
        assert save_kwargs.get('check_append', True), \
            'StorageScheme.save() requires check_append=True'

        save_kwargs['append'] = True
        save_kwargs['check_append'] = True
        additional_external = save_kwargs.pop('additional', {})

        by_deltat = defaultdict(list)
        for tr in traces:
            by_deltat[tr.deltat].append(tr)

        file_names = set()
        for deltat, traces_group in by_deltat.items():
            layout = self.select_layout(deltat)

            # The output path template is the same for all windows of this
            # layout, so it is built once per group.
            if self._base_path is None:
                path_template = layout.path_template
            else:
                path_template = os.path.join(
                    self._base_path, layout.path_template)

            traces_group.sort(key=lambda tr: tr.full_id)
            traces_group = trace.deoverlap(traces_group)
            tmin = min(tr.tmin for tr in traces_group)
            tmax = max(tr.tmax for tr in traces_group)
            for wmin, wmax in iter_windows(
                    tmin,
                    tmax,
                    layout.time_increment,
                    layout.time_increment_nonuniform):

                traces_window = []
                for tr in traces_group:
                    try:
                        traces_window.append(
                            tr.chop(wmin, wmax, inplace=False))
                    except trace.NoData:
                        # This trace has no samples in the current window.
                        pass

                additional = layout.get_additional(wmin, wmax)
                additional.update(additional_external)

                file_names.update(io.save(
                    traces_window,
                    path_template,
                    additional=additional,
                    **save_kwargs))

        return sorted(file_names)

213 

214 

# Built-in storage schemes, in order of registration. The first entry is the
# 'default' scheme. Its layouts are listed in order of increasing time window
# length; each row of the table below gives the layout name, the (nominal)
# window length in seconds, the non-uniform window identifier (if any) and the
# shorthand path template expanded by _translate_path_template().
#
# NOTE(review): unlike the other layouts, the file name of the 'second'
# layout has no 'ext' field - confirm whether this is intended.
_g_schemes_list = [StorageScheme(
    name='default',
    description='Dynamic storage scheme with balanced file sizes of 10 - 400 '
                'MB and a balanced directory hierarchy of 4-6 levels',
    layouts=[
        StorageSchemeLayout(
            name=layout_name,
            time_increment=tinc,
            time_increment_nonuniform=tinc_nonuniform,
            path_template=_translate_path_template(shorthand))
        for (layout_name, tinc, tinc_nonuniform, shorthand) in (
            ('second', 1.0, None, 'net/sta/loc.cha/year/month/day/hour/net.sta.loc.cha.year.month.day.hour.minute.second'),  # noqa
            ('minute', 60.0, None, 'net/sta/loc.cha/year/month/day/net.sta.loc.cha.ext.year.month.day.hour.minute'),  # noqa
            ('hour', 3600.0, None, 'net/sta/loc.cha/year/month/net.sta.loc.cha.ext.year.month.day.hour'),  # noqa
            ('day', 86400.0, None, 'net/sta/loc.cha/year/net.sta.loc.cha.ext.year.month.day'),  # noqa
            # Nominal month: 1/12 of a 365-day year.
            ('month', 2628000.0, 'month', 'net/sta/loc.cha/year/net.sta.loc.cha.ext.year.month'),  # noqa
            # Nominal year: 365 days.
            ('year', 31536000.0, 'year', 'net/sta/loc.cha/net.sta.loc.cha.ext.year'))],  # noqa
    min_segments_per_file=1.5)]

235 

236 

# The 'sds' scheme has a single layout with one file per day.
#
# NOTE(review): the file name uses the plain station, location and channel
# fields while the directories use the '_safe' variants - confirm whether this
# mix is intended.
_g_schemes_list.append(StorageScheme(
    name='sds',
    description='Directory scheme conforming to SeisComP Data Structure (SDS) '
                'archive format '
                '(https://www.seiscomp.de/seiscomp3/doc/applications'
                '/slarchive/SDS.html). '
                'The scheme has a fixed layout with day files.',
    layouts=[
        StorageSchemeLayout(
            name='sds',
            time_increment=86400.0,
            path_template=os.path.join(*[
                '%(wmin_year)s',
                '%(network_safe)s',
                '%(station_safe)s',
                '%(channel_safe)s.D',
                '%(network_safe)s.%(station)s.%(location)s.%(channel)s.D.%(wmin_year)s.%(wmin_jday)s',  # noqa
            ]))]))

254 

# Registered storage schemes, keyed by scheme name.
g_schemes = {scheme.name: scheme for scheme in _g_schemes_list}

256 

257 

def get_storage_scheme(name):
    '''
    Get a copy of a registered storage scheme.

    :param name:
        Name of the scheme, a key of ``g_schemes``.

    :returns:
        Clone of the registered scheme, created with ``guts.clone``.

    :raises KeyError:
        If no scheme is registered under ``name``.
    '''
    registered = g_schemes[name]
    return guts.clone(registered)

260 

261 

class StorageSchemeChoice(guts.StringChoice):
    '''
    Name of a supported storage scheme.
    '''
    # Names of the schemes registered in g_schemes at class definition time.
    choices = list(g_schemes)

267 

268 

# Public names of this module.
__all__ = [
    'get_storage_scheme',
    'StorageScheme',
    'StorageSchemeLayout',
    'StorageSchemeChoice',
]