Coverage for /usr/local/lib/python3.13/dist-packages/pyrocko/io/mseed.py: 82%
151 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-04 10:41 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Read/write MiniSEED files (wraps `libmseed
8<https://github.com/EarthScope/libmseed>`_).
9'''
12from collections import defaultdict
13from struct import unpack
14import os
15import re
16import math
17import logging
19from pyrocko import trace
20from pyrocko.util import reuse, ensuredirs
21from .io_common import FileLoadError, FileSaveError
# Module-level logger.
logger = logging.getLogger('pyrocko.io.mseed')

# Number of bytes subtracted from the record length when get_bytes()
# estimates how many records are needed to hold a trace's samples.
MSEED_HEADER_BYTES = 64
# Record lengths accepted by save() and get_bytes(): the powers of two from
# 2**8 (256) up to 2**19 (524288).
VALID_RECORD_LENGTHS = tuple(2**exp for exp in range(8, 20))

# Running total of file bytes consumed by iload(); accumulated across calls.
g_bytes_read = 0
class CodeTooLong(FileSaveError):
    '''
    Raised when a network, station, location or channel code of a trace is
    too long to be stored in a MiniSEED file.
    '''
def iload(filename, load_data=True, offset=0, segment_size=0, nsegments=0,
          tmin=0.0, tmax=0.0):
    '''
    Iterate over the traces stored in a MiniSEED file.

    The file is read in one or more segments through
    ``mseed_ext.get_traces``. Every trace tuple returned by the extension is
    turned into a :py:class:`pyrocko.trace.Trace` and yielded. The number of
    bytes consumed is added to the module-level counter ``g_bytes_read``.

    :param filename: path of the file to read
    :param load_data: forwarded to ``mseed_ext.get_traces``
    :param offset: file offset at which reading starts; forwarded to
        ``mseed_ext.get_traces`` and advanced after every segment
    :param segment_size: forwarded to ``mseed_ext.get_traces`` and recorded
        in each trace's ``meta``
    :param nsegments: maximum number of segments to read, ``0`` for no limit
    :param tmin: start of the requested time window [s], ``0.0`` for none
    :param tmax: end of the requested time window [s], ``0.0`` for none

    :yields: :py:class:`pyrocko.trace.Trace` objects whose ``meta`` dict has
        the keys ``'offset_start'``, ``'offset_end'``, ``'last'`` and
        ``'segment_size'``

    :raises: :py:exc:`FileLoadError` if the extension reports an
        ``OSError`` or ``MSeedError``
    '''
    from pyrocko import mseed_ext

    global g_bytes_read

    # Conversion factor between seconds and the extension's integer times.
    hpt_modulus = float(mseed_ext.HPTMODULUS)

    try:
        segments_done = 0
        while nsegments == 0 or segments_done < nsegments:
            raw_traces = mseed_ext.get_traces(
                filename, load_data, offset, segment_size,
                tmin=int(tmin*hpt_modulus),
                tmax=int(tmax*hpt_modulus))

            if not raw_traces:
                break

            # All tuples of one segment are expected to report the same end
            # offset (element 9).
            offset_end = raw_traces[0][9]
            assert all(raw[9] == offset_end for raw in raw_traces)

            g_bytes_read += offset_end - offset

            for raw in raw_traces:
                net, sta, loc, cha = raw[1:5]
                t_first = float(raw[5])/hpt_modulus
                t_last = float(raw[6])/hpt_modulus

                # Element 7 is the sampling rate; a rate of zero is mapped
                # to a sampling interval of zero.
                try:
                    deltat = reuse(1.0/float(raw[7]))
                except ZeroDivisionError:
                    deltat = 0.0

                samples = raw[8]

                tr = trace.Trace(
                    net.strip(), sta.strip(), loc.strip(), cha.strip(),
                    t_first, t_last, deltat, samples)

                tr.meta = {
                    'offset_start': offset,
                    'offset_end': offset_end,
                    'last': raw[10],
                    'segment_size': segment_size
                }

                if tmin or tmax:
                    # A missing window edge falls back to the trace's own
                    # edge. A trace without data in the window is yielded
                    # unchanged.
                    try:
                        tr.chop(tmin or tr.tmin, tmax or tr.tmax,
                                include_last=False)
                    except trace.NoData:
                        pass

                yield tr

            # ``raw`` is the last tuple of the segment here: stop when it is
            # flagged as last (element 10), otherwise continue reading at
            # the offset it reports.
            if raw[10]:
                break

            offset = raw[9]
            segments_done += 1

    except (OSError, mseed_ext.MSeedError) as e:
        raise FileLoadError(str(e)+' (file: %s)' % filename)
def as_tuple(tr, dataquality='D'):
    '''
    Pack a trace into the tuple layout passed on to ``mseed_ext``.

    :param tr: trace object providing ``network``, ``station``,
        ``location``, ``channel``, ``tmin``, ``tmax``, ``deltat`` and
        ``get_ydata()``
    :param dataquality: data quality code placed into the tuple

    :returns: tuple ``(network, station, location, channel, itmin, itmax,
        sampling_rate, dataquality, ydata)`` where ``itmin`` and ``itmax``
        are the trace times multiplied by ``mseed_ext.HPTMODULUS`` and
        rounded to integers
    '''
    from pyrocko import mseed_ext

    ticks_per_second = mseed_ext.HPTMODULUS
    itmin, itmax = [
        int(round(t*ticks_per_second)) for t in (tr.tmin, tr.tmax)]
    sampling_rate = 1.0/tr.deltat

    codes = (tr.network, tr.station, tr.location, tr.channel)
    return codes + (
        itmin, itmax, sampling_rate, dataquality, tr.get_ydata())
def save(
        traces,
        filename_template,
        additional=None,
        overwrite=True,
        dataquality='D',
        record_length=4096,
        append=False,
        check_append=False,
        check_append_merge=False,
        check_append_hook=None,
        check_overlaps=True,
        steim=1):
    '''
    Write traces to one or more MiniSEED files.

    Each trace is assigned to a file name obtained by filling
    ``filename_template`` from the trace; all traces mapping to the same
    name are written together with ``mseed_ext.store_traces``.

    :param traces: iterable of trace objects to store
    :param filename_template: template passed to ``tr.fill_template``
    :param additional: dict of extra keyword arguments for
        ``tr.fill_template``, ``None`` for no extras
    :param overwrite: if true, an existing file that is not appended to is
        removed, otherwise :py:exc:`FileSaveError` is raised for it
    :param dataquality: one of ``'D'``, ``'E'``, ``'C'``, ``'O'``, ``'T'``,
        ``'L'``
    :param record_length: record length in bytes, must be one of
        ``VALID_RECORD_LENGTHS``
    :param append: if true, add to existing files instead of replacing them
    :param check_append: if true (and ``append`` is set), check the new
        traces for overlaps with those already in the file
    :param check_append_merge: if true, overlapping new and stored traces
        are merged with ``trace.degapper`` and the file is rewritten instead
        of raising an error
    :param check_append_hook: optional callable taking the file name; when
        it returns a false value the existing file is not appended to
    :param check_overlaps: if true, check the traces going into one file for
        overlaps among themselves
    :param steim: forwarded to ``mseed_ext.store_traces``

    :returns: list of the file names written

    :raises: :py:exc:`CodeTooLong` if a network, station, location or
        channel code exceeds 2, 5, 2 or 3 characters, respectively
    :raises: :py:exc:`FileSaveError` if a file exists and may neither be
        overwritten nor appended to, if traces overlap, or if the extension
        fails to store the traces
    '''
    from pyrocko import mseed_ext

    assert record_length in VALID_RECORD_LENGTHS
    assert dataquality in ('D', 'E', 'C', 'O', 'T', 'L'), 'invalid dataquality'

    # Avoid a shared mutable default argument.
    if additional is None:
        additional = {}

    # nifty logic for overwrite, append, check_append_hook(fn):
    # file exists:
    #     overwrite, append, check_append_hook(fn)
    #     0, 0       => fail
    #     0, 1, None => append
    #     0, 1, 0    => fail
    #     0, 1, 1    => append
    #     1, 0       => truncate
    #     1, 1, None => append
    #     1, 1, 0    => truncate, append
    #     1, 1, 1    => append

    if not append:
        check_append = False
        check_append_hook = None

    fn_tr = defaultdict(list)
    for tr in traces:
        for code, maxlen, val in zip(
                ['network', 'station', 'location', 'channel'],
                [2, 5, 2, 3],
                tr.nslc_id):

            if len(val) > maxlen:
                raise CodeTooLong(
                    '%s code too long to be stored in MSeed file: %s' %
                    (code, val))

        fn = tr.fill_template(filename_template, **additional)
        if os.path.exists(fn):
            # The existing file cannot be appended to when appending is
            # off or the hook vetoes it. The hook is consulted only once.
            cannot_append = not append or (
                check_append_hook and not check_append_hook(fn))

            if cannot_append:
                if not overwrite:
                    raise FileSaveError('File exists: %s' % fn)

                os.unlink(fn)

        fn_tr[fn].append(tr)

    for fn, traces_thisfile in fn_tr.items():
        if check_overlaps:
            try:
                trace.check_overlaps(
                    traces_thisfile,
                    message='Traces to be stored have overlaps.\n  File: %s'
                    % fn)

            except trace.OverlappingTraces as e:
                raise FileSaveError(str(e)) from e

        append_ = append

        if check_append:
            if os.path.exists(fn):
                traces_infile = list(iload(fn, load_data=False))
                try:
                    trace.check_overlaps(
                        traces_thisfile,
                        traces_infile,
                        message='Trace to be stored would overlap with '
                                'trace already stored in file.\n  File: %s'
                                % fn)
                except trace.OverlappingTraces as e:
                    if check_append_merge:
                        # Reload the stored traces with data, merge them
                        # with copies of the new ones and rewrite the whole
                        # file instead of appending.
                        traces_thisfile_merge = list(
                            iload(fn, load_data=True))
                        traces_thisfile_merge.extend(
                            tr.copy() for tr in traces_thisfile)
                        traces_thisfile = trace.degapper(
                            sorted(
                                traces_thisfile_merge,
                                key=lambda tr: tr.full_id))

                        append_ = False

                    else:
                        raise FileSaveError(str(e)) from e

        traces_thisfile.sort(key=lambda a: a.full_id)
        trtups = [as_tuple(tr, dataquality) for tr in traces_thisfile]

        ensuredirs(fn)
        try:
            mseed_ext.store_traces(trtups, fn, record_length, append_, steim)
        except mseed_ext.MSeedError as e:
            raise FileSaveError(
                str(e) + " (while storing traces to file '%s')" % fn) from e

    return list(fn_tr)
# NOTE(review): not referenced anywhere in the visible part of this module;
# purpose unknown - confirm against other users before changing or removing.
tcs = {}
def get_bytes(traces, dataquality='D', record_length=4096, steim=1):
    '''
    Encode traces as MiniSEED via ``mseed_ext.mseed_bytes``.

    :param traces: iterable of trace objects to encode
    :param dataquality: one of ``'D'``, ``'E'``, ``'C'``, ``'O'``, ``'T'``,
        ``'L'``
    :param record_length: record length in bytes, must be one of
        ``VALID_RECORD_LENGTHS``
    :param steim: forwarded to ``mseed_ext.mseed_bytes``

    :returns: whatever ``mseed_ext.mseed_bytes`` returns for the traces

    :raises: :py:exc:`CodeTooLong` if a network, station, location or
        channel code exceeds 2, 5, 2 or 3 characters, respectively
    '''
    from pyrocko import mseed_ext

    assert record_length in VALID_RECORD_LENGTHS
    assert dataquality in ('D', 'E', 'C', 'O', 'T', 'L'), 'invalid dataquality'

    code_limits = (
        ('network', 2),
        ('station', 5),
        ('location', 2),
        ('channel', 3))

    # Bytes of each record assumed to be available for samples.
    payload_per_record = record_length - MSEED_HEADER_BYTES

    size_estimate = 0
    packed = []
    for tr in traces:
        for (code_name, limit), value in zip(code_limits, tr.nslc_id):
            if len(value) > limit:
                raise CodeTooLong(
                    '%s code too long to be stored in MSeed file: %s' %
                    (code_name, value))

        # Rough output size: whole records needed for the raw sample bytes.
        nrecords = math.ceil(tr.ydata.nbytes / payload_per_record)
        size_estimate += nrecords * record_length
        packed.append(as_tuple(tr, dataquality))

    return mseed_ext.mseed_bytes(packed, size_estimate, record_length, steim)
def detect(first512):
    '''
    Guess from the leading bytes of a file whether it is (Mini)SEED.

    The input must start with a six-character non-negative sequence number
    followed by a one-byte record type code. For the type codes ``D``,
    ``R``, ``Q`` and ``M`` the start time in the fixed header must be
    plausible in at least one byte order. For the type codes ``V``, ``A``,
    ``S`` and ``T`` the record must not be a continuation and must begin
    with a three-digit blockette type and a blockette length of at least 7.

    :param first512: bytes from the beginning of the file; at least 256
        bytes are required

    :returns: ``True`` if the bytes look like (Mini)SEED, ``False``
        otherwise. Arbitrary binary input is rejected without raising.
    '''
    if len(first512) < 256:
        return False

    rec = first512

    try:
        sequence_number = int(rec[:6])
    except (ValueError, TypeError):
        return False

    if sequence_number < 0:
        return False

    type_code = rec[6:7]
    if type_code in b'DRQM':
        # Data record: unpack the first 48 header bytes in both byte orders
        # and check the ranges of the start time fields.
        bads = []
        for byteorder in '<>':
            vals = unpack(byteorder + '6s1s1s5s2s3s2s10sH2h4Bl2H', rec[:48])
            tvals = unpack(byteorder + 'HHBBBBH', vals[7])
            bads.append(
                tvals[1] < 1 or tvals[1] > 367
                or tvals[2] > 23
                or tvals[3] > 59
                or tvals[4] > 60
                or tvals[6] > 9999)

        if all(bads):
            return False

    else:
        if type_code not in b'VAST':
            return False

        continuation_code = rec[7:8]
        if continuation_code != b' ':
            return False

        # Match on the raw bytes: decoding first would raise
        # UnicodeDecodeError for non-UTF-8 input instead of rejecting it.
        blockette_type = rec[8:8+3]
        if not re.match(rb'^\d\d\d$', blockette_type):
            return False

        try:
            blockette_length = int(rec[11:11+4])
        except (ValueError, TypeError):
            return False

        if blockette_length < 7:
            return False

    return True