Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/squirrel/storage.py: 89%
102 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Waveform data storage utilities.
8'''
10import math
11import os
12from collections import defaultdict
14from pyrocko import guts, util, trace, io
15from pyrocko.plot import nice_time_tick_inc_approx_secs
17from . import error
# Prefix for the guts-serializable classes defined in this module
# (presumably used in their type tags, see pyrocko.guts).
guts_prefix = 'squirrel'

# Sample count used (via nice_time_tick_inc_approx_secs) to derive the
# informational block duration shown by StorageSchemeLayout.describe().
nsamples_block = 100_000

# Samples per segment; layouts are selected so that each file holds at least
# StorageScheme.min_segments_per_file segments of this size.
nsamples_segment = 2 ** 20
def _translate_path_template(s):
    '''
    Convert a short-hand path template into a ``%``-style format string.

    The template consists of ``/``-separated path entries, each made of
    ``.``-separated tokens. Time tokens (``year``, ``month``, ``day``,
    ``jday``, ``hour``, ``minute``, ``second``) become ``%(wmin_<token>)s``
    placeholders; code tokens (``net``, ``sta``, ``loc``, ``cha``, ``ext``)
    become ``%(<network|station|location|channel|extra>_dsafe)s``
    placeholders. The entries are combined with :py:func:`os.path.join`.

    :param s: Short-hand template, e.g. ``'net/sta/net.sta.year'``.
    :returns: Format string for use as a file name template.
    :raises KeyError: If ``s`` contains an unknown token.
    '''
    time_tokens = (
        'year', 'month', 'day', 'jday', 'hour', 'minute', 'second')
    code_tokens = (
        ('net', 'network'),
        ('sta', 'station'),
        ('loc', 'location'),
        ('cha', 'channel'),
        ('ext', 'extra'))

    placeholders = {}
    for token in time_tokens:
        placeholders[token] = '%%(wmin_%s)s' % token

    for token, long_name in code_tokens:
        placeholders[token] = '%%(%s_dsafe)s' % long_name

    path_entries = []
    for entry in s.split('/'):
        path_entries.append(
            '.'.join([placeholders[token] for token in entry.split('.')]))

    return os.path.join(*path_entries)
def time_to_template_vars(prefix, t):
    '''
    Get file name template variables describing a point in time.

    :param prefix: Prefix for the variable names, e.g. ``'wmin'``.
    :param t: Time stamp, formatted with :py:func:`pyrocko.util.time_to_str`.
    :returns: Dict with the keys ``<prefix>_year``, ``<prefix>_month``,
        ``<prefix>_day``, ``<prefix>_jday``, ``<prefix>_hour``,
        ``<prefix>_minute`` and ``<prefix>_second`` (string fields of
        ``'%Y.%m.%d.%j.%H.%M.%S'``), plus ``<prefix>`` itself holding the
        time formatted as ``'%Y-%m-%d_%H-%M-%S'``.
    '''
    field_names = (
        'year', 'month', 'day', 'jday', 'hour', 'minute', 'second')
    field_values = util.time_to_str(t, '%Y.%m.%d.%j.%H.%M.%S').split('.')

    template_vars = {}
    for field_name, field_value in zip(field_names, field_values):
        template_vars[prefix + '_' + field_name] = field_value

    # Plain ``<prefix>`` key kept for backwards compatibility.
    template_vars[prefix] = util.time_to_str(t, '%Y-%m-%d_%H-%M-%S')
    return template_vars
def iter_windows(tmin, tmax, tinc, tinc_nonuniform):
    '''
    Iterate over the time windows covering a time span.

    :param tmin: Start of the time span.
    :param tmax: End of the time span.
    :param tinc: Window length, or ``None`` for a single window
        ``(tmin, tmax)``.
    :param tinc_nonuniform: ``None`` for uniform windows of length ``tinc``,
        aligned to multiples of ``tinc``; ``'month'`` or ``'year'`` to
        delegate to :py:func:`pyrocko.util.iter_months` /
        :py:func:`pyrocko.util.iter_years`. Only consulted if ``tinc`` is
        not ``None``.
    :yields: ``(wmin, wmax)`` tuples.
    :raises error.SquirrelError: On an unsupported ``tinc_nonuniform``.
    '''
    if tinc is None:
        yield tmin, tmax
        return

    if tinc_nonuniform is None:
        # Align the first window start to a multiple of the increment.
        wmin = math.floor(tmin / tinc) * tinc
        while wmin < tmax:
            wmax = wmin + tinc
            yield wmin, wmax
            wmin = wmax

        return

    if tinc_nonuniform == 'month':
        yield from util.iter_months(tmin, tmax)
    elif tinc_nonuniform == 'year':
        yield from util.iter_years(tmin, tmax)
    else:
        raise error.SquirrelError(
            'Available non-uniform time intervals: month, year. '
            'Invalid choice: %s' % tinc_nonuniform)
class StorageSchemeLayout(guts.Object):
    '''
    Specific directory layout within a storage scheme.
    '''
    name = guts.String.T(
        help='Name of the layout, informational.')
    time_increment = guts.Float.T(
        optional=True,
        help='Time window length stored in each file [s]. Exact or '
             'approximate, depending on '
             ':py:gattr:`time_increment_nonuniform`.')
    time_increment_nonuniform = guts.String.T(
        optional=True,
        help='Identifier for non-uniform time windows. E.g. ``\'month\'`` or '
             '``\'year\'``.')
    path_template = guts.String.T(
        help='Template for file paths.')

    def get_additional(self, wmin, wmax):
        '''
        Get the time-related file name template variables of a time window.

        :param wmin: Start time of the window.
        :param wmax: End time of the window.
        :returns: Dict combining the variables produced by
            :py:func:`time_to_template_vars` with the prefixes ``'wmin'`` and
            ``'wmax'``.
        '''
        d = {}
        d.update(time_to_template_vars('wmin', wmin))
        d.update(time_to_template_vars('wmax', wmax))
        return d

    @classmethod
    def describe_header(cls):
        '''
        Get the column header matching the rows produced by
        :py:meth:`describe`.
        '''
        return '%8s %11s %6s %10s %12s %10s %6s %4s %8s %6s' % (
            'rate',
            'deltat',
            'tblock',
            '',
            'tsegment',
            '',
            'layout',
            'fseg',
            'fsize',
            'levels')

    def describe(self, deltat):
        '''
        Get a one-line summary of this layout for a given sampling interval.

        :param deltat: Sampling interval [s].
        :returns: Row with the columns announced by
            :py:meth:`describe_header`: sampling rate, sampling interval,
            block duration (numeric and human readable), segment duration
            (numeric and human readable), layout name, segments per file,
            approximate file size and number of ``/``-separated path
            components of :py:gattr:`path_template`.
        '''
        rate = 1.0 / deltat
        tblock = nice_time_tick_inc_approx_secs(deltat * nsamples_block)
        tsegment = deltat * nsamples_segment

        if self.time_increment is not None:
            segments_per_file = self.time_increment / tsegment
        else:
            segments_per_file = 0.0

        # Rough file size estimate, counting 4 bytes per sample.
        bytesize = (self.time_increment or 0.0) / deltat * 4

        return '%8.1f %11.5f %6.0f %10s %12.0f %10s %6s %4.0f %8s %6i' % (
            rate,
            deltat,
            tblock,
            guts.str_duration(tblock),
            tsegment,
            guts.str_duration(tsegment),
            self.name,
            segments_per_file,
            util.human_bytesize(bytesize),
            len(self.path_template.split('/')))
class StorageScheme(guts.Object):
    '''
    Storage scheme for waveform archive data.
    '''
    name = guts.String.T(
        help='Storage scheme name.')
    layouts = guts.List.T(
        StorageSchemeLayout.T(),
        help='Directory layouts supported by the scheme.')
    min_segments_per_file = guts.Float.T(
        default=1.0,
        help='Target minimum number of segments to be stored in each file.')
    format = guts.String.T(
        default='mseed',
        help='File format of waveform data files.')
    description = guts.String.T(
        default='',
        help='Description of the storage scheme.')

    def post_init(self):
        # No base path by default: layout path templates are used as is.
        self._base_path = None

    def set_base_path(self, base_path):
        '''
        Set the directory to which the layout path templates are appended.

        :param base_path: Directory path, or ``None`` to use the layout path
            templates as is.
        '''
        self._base_path = base_path

    def select_layout(self, deltat):
        '''
        Choose the directory layout for data with a given sampling interval.

        Returns the first layout whose time window is unlimited
        (``time_increment`` is ``None``) or longer than
        :py:gattr:`min_segments_per_file` segments of ``nsamples_segment``
        samples. If no layout qualifies, the last one is returned.

        :param deltat: Sampling interval [s].
        :raises error.SquirrelError: If the scheme has no layouts.
        '''
        if not self.layouts:
            raise error.SquirrelError(
                'Storage scheme %s has no layouts.' % self.name)

        tsegment = deltat * nsamples_segment
        twant = tsegment * self.min_segments_per_file
        for layout in self.layouts:
            if layout.time_increment is None or layout.time_increment > twant:
                return layout

        # Fall back to the last (presumably coarsest) layout.
        return self.layouts[-1]

    def save(self, traces, **save_kwargs):
        '''
        Save traces into the archive according to this storage scheme.

        Traces are grouped by sampling interval. For each group, a layout is
        chosen with :py:meth:`select_layout`, the traces are sorted by
        ``full_id`` and deoverlapped with :py:func:`pyrocko.trace.deoverlap`,
        and the data is cut into the layout's time windows. Each non-empty
        window is written with :py:func:`pyrocko.io.save`.

        :param traces: Traces to be saved.
        :param save_kwargs: Passed on to :py:func:`pyrocko.io.save`.
            ``append`` and ``check_append`` are always set to ``True`` (they
            may not be disabled). An optional ``additional`` dict is merged
            into the per-window template variables.
        :returns: Sorted list of the file names returned by
            :py:func:`pyrocko.io.save`.
        '''
        # Appending is mandatory for this method.
        assert save_kwargs.get('append', True)
        assert save_kwargs.get('check_append', True)

        save_kwargs['append'] = True
        save_kwargs['check_append'] = True
        additional_external = save_kwargs.pop('additional', {})

        by_deltat = defaultdict(list)
        for tr in traces:
            by_deltat[tr.deltat].append(tr)

        file_names = set()
        for deltat, traces_group in by_deltat.items():
            layout = self.select_layout(deltat)
            traces_group.sort(key=lambda tr: tr.full_id)
            traces_group = trace.deoverlap(traces_group)
            tmin = min(tr.tmin for tr in traces_group)

            # Windows are half-open and Trace.chop() (include_last=False)
            # excludes the sample at the window end. Extend the span by one
            # sample so that the last sample always falls into an iterated
            # window. Otherwise it would be dropped whenever the end of the
            # data lies on (or within half a sample of) a window boundary.
            tmax = max(tr.tmax for tr in traces_group) + deltat

            for wmin, wmax in iter_windows(
                    tmin,
                    tmax,
                    layout.time_increment,
                    layout.time_increment_nonuniform):

                traces_window = []
                for tr in traces_group:
                    try:
                        traces_window.append(
                            tr.chop(wmin, wmax, inplace=False))
                    except trace.NoData:
                        # Trace does not overlap this window.
                        pass

                if not traces_window:
                    continue

                additional = layout.get_additional(wmin, wmax)
                additional.update(additional_external)

                file_names.update(io.save(
                    traces_window,
                    layout.path_template
                    if self._base_path is None
                    else os.path.join(self._base_path, layout.path_template),
                    additional=additional,
                    **save_kwargs))

        return sorted(file_names)
# Predefined storage schemes, in registration order.
_g_schemes_list = []
_g_schemes_list.append(StorageScheme(
    name='default',
    description='Dynamic storage scheme with balanced file sizes of '
    '10 - 400 MB and a balanced directory hierarchy of 4-6 levels',
    layouts=[
        StorageSchemeLayout(
            name=name,
            time_increment=time_increment,
            time_increment_nonuniform=time_increment_nonuniform,
            path_template=_translate_path_template(path_template))
        # Ordered from shortest to longest time window, because
        # StorageScheme.select_layout() picks the first sufficient one. All
        # file names carry the extra code (``ext``) so that traces differing
        # only in it are kept in separate files.
        for (name, time_increment, time_increment_nonuniform, path_template)
        in [
            ('second', 1.0, None, 'net/sta/loc.cha/year/month/day/hour/net.sta.loc.cha.ext.year.month.day.hour.minute.second'),  # noqa
            ('minute', 60.0, None, 'net/sta/loc.cha/year/month/day/net.sta.loc.cha.ext.year.month.day.hour.minute'),  # noqa
            ('hour', 3600.0, None, 'net/sta/loc.cha/year/month/net.sta.loc.cha.ext.year.month.day.hour'),  # noqa
            ('day', 86400.0, None, 'net/sta/loc.cha/year/net.sta.loc.cha.ext.year.month.day'),  # noqa
            ('month', 2628000.0, 'month', 'net/sta/loc.cha/year/net.sta.loc.cha.ext.year.month'),  # noqa
            ('year', 31536000.0, 'year', 'net/sta/loc.cha/net.sta.loc.cha.ext.year')]],  # noqa
    min_segments_per_file=1.5))
# SeisComP Data Structure (SDS) scheme: a single, fixed layout of day files.
_g_schemes_list.append(StorageScheme(
    name='sds',
    description=(
        'Directory scheme conforming to SeisComP Data Structure (SDS) '
        'archive format (https://www.seiscomp.de/seiscomp3/doc'
        '/applications/slarchive/SDS.html). The scheme has a fixed '
        'layout with day files.'),
    layouts=[
        StorageSchemeLayout(
            name='sds',
            time_increment=86400.,
            path_template=os.path.join(
                '%(wmin_year)s',
                '%(network_safe)s',
                '%(station_safe)s',
                '%(channel_safe)s.D',
                '%(network_safe)s.%(station)s.%(location)s.%(channel)s.D'
                '.%(wmin_year)s.%(wmin_jday)s'),
        ),
    ],
))
# Predefined storage schemes, indexed by scheme name.
g_schemes = {scheme.name: scheme for scheme in _g_schemes_list}
def get_storage_scheme(name):
    '''
    Get a copy of a predefined storage scheme.

    :param name: Name of the scheme, a key of ``g_schemes``.
    :returns: Copy of the registered :py:class:`StorageScheme`, made with
        :py:func:`pyrocko.guts.clone`.
    :raises KeyError: If no scheme with the given name is registered.
    '''
    registered = g_schemes[name]
    return guts.clone(registered)
class StorageSchemeChoice(guts.StringChoice):
    '''
    Name of a supported storage scheme.
    '''
    # Valid values: the names of all predefined schemes, in registration
    # order.
    choices = list(g_schemes)
# Public API of this module.
__all__ = [
    'get_storage_scheme',
    'StorageScheme',
    'StorageSchemeLayout',
    'StorageSchemeChoice',
]