Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/io/mseed.py: 83%

126 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-04 09:52 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Read/write MiniSEED files (wraps `libmseed 

8<https://github.com/EarthScope/libmseed>`_). 

9''' 

10 

11 

12from struct import unpack 

13import os 

14import re 

15import math 

16import logging 

17 

18from pyrocko import trace 

19from pyrocko.util import reuse, ensuredirs 

20from .io_common import FileLoadError, FileSaveError 

21 

22logger = logging.getLogger('pyrocko.io.mseed') 

23 

24MSEED_HEADER_BYTES = 64 

25VALID_RECORD_LENGTHS = tuple(2**exp for exp in range(8, 20)) 

26 

27 

28class CodeTooLong(FileSaveError): 

29 pass 

30 

31 

32def iload(filename, load_data=True, offset=0, segment_size=0, nsegments=0): 

33 from pyrocko import mseed_ext 

34 

35 have_zero_rate_traces = False 

36 try: 

37 isegment = 0 

38 while isegment < nsegments or nsegments == 0: 

39 tr_tuples = mseed_ext.get_traces( 

40 filename, load_data, offset, segment_size) 

41 

42 if not tr_tuples: 

43 break 

44 

45 for tr_tuple in tr_tuples: 

46 network, station, location, channel = tr_tuple[1:5] 

47 tmin = float(tr_tuple[5])/float(mseed_ext.HPTMODULUS) 

48 tmax = float(tr_tuple[6])/float(mseed_ext.HPTMODULUS) 

49 try: 

50 deltat = reuse(1.0/float(tr_tuple[7])) 

51 except ZeroDivisionError: 

52 have_zero_rate_traces = True 

53 continue 

54 

55 ydata = tr_tuple[8] 

56 

57 tr = trace.Trace( 

58 network.strip(), 

59 station.strip(), 

60 location.strip(), 

61 channel.strip(), 

62 tmin, 

63 tmax, 

64 deltat, 

65 ydata) 

66 

67 tr.meta = { 

68 'offset_start': offset, 

69 'offset_end': tr_tuple[9], 

70 'last': tr_tuple[10], 

71 'segment_size': segment_size 

72 } 

73 

74 yield tr 

75 

76 if tr_tuple[10]: 

77 break 

78 

79 offset = tr_tuple[9] 

80 isegment += 1 

81 

82 except (OSError, mseed_ext.MSeedError) as e: 

83 raise FileLoadError(str(e)+' (file: %s)' % filename) 

84 

85 if have_zero_rate_traces: 

86 logger.warning( 

87 'Ignoring traces with sampling rate of zero in file %s ' 

88 '(maybe LOG traces)' % filename) 

89 

90 

91def as_tuple(tr, dataquality='D'): 

92 from pyrocko import mseed_ext 

93 itmin = int(round(tr.tmin*mseed_ext.HPTMODULUS)) 

94 itmax = int(round(tr.tmax*mseed_ext.HPTMODULUS)) 

95 srate = 1.0/tr.deltat 

96 return (tr.network, tr.station, tr.location, tr.channel, 

97 itmin, itmax, srate, dataquality, tr.get_ydata()) 

98 

99 

100def save(traces, filename_template, additional={}, overwrite=True, 

101 dataquality='D', record_length=4096, append=False, steim=1): 

102 from pyrocko import mseed_ext 

103 

104 assert record_length in VALID_RECORD_LENGTHS 

105 assert dataquality in ('D', 'E', 'C', 'O', 'T', 'L'), 'invalid dataquality' 

106 overwrite = True if append else overwrite 

107 

108 fn_tr = {} 

109 for tr in traces: 

110 for code, maxlen, val in zip( 

111 ['network', 'station', 'location', 'channel'], 

112 [2, 5, 2, 3], 

113 tr.nslc_id): 

114 

115 if len(val) > maxlen: 

116 raise CodeTooLong( 

117 '%s code too long to be stored in MSeed file: %s' % 

118 (code, val)) 

119 

120 fn = tr.fill_template(filename_template, **additional) 

121 if not overwrite and os.path.exists(fn): 

122 raise FileSaveError('File exists: %s' % fn) 

123 

124 if fn not in fn_tr: 

125 fn_tr[fn] = [] 

126 

127 fn_tr[fn].append(tr) 

128 

129 for fn, traces_thisfile in fn_tr.items(): 

130 trtups = [] 

131 traces_thisfile.sort(key=lambda a: a.full_id) 

132 for tr in traces_thisfile: 

133 trtups.append(as_tuple(tr, dataquality)) 

134 

135 ensuredirs(fn) 

136 try: 

137 mseed_ext.store_traces(trtups, fn, record_length, append, steim) 

138 except mseed_ext.MSeedError as e: 

139 raise FileSaveError( 

140 str(e) + " (while storing traces to file '%s')" % fn) 

141 

142 return list(fn_tr.keys()) 

143 

144 

145tcs = {} 

146 

147 

148def get_bytes(traces, dataquality='D', record_length=4096, steim=1): 

149 from pyrocko import mseed_ext 

150 

151 assert record_length in VALID_RECORD_LENGTHS 

152 assert dataquality in ('D', 'E', 'C', 'O', 'T', 'L'), 'invalid dataquality' 

153 

154 nbytes_approx = 0 

155 rl = record_length 

156 trtups = [] 

157 for tr in traces: 

158 for code, maxlen, val in zip( 

159 ['network', 'station', 'location', 'channel'], 

160 [2, 5, 2, 3], 

161 tr.nslc_id): 

162 

163 if len(val) > maxlen: 

164 raise CodeTooLong( 

165 '%s code too long to be stored in MSeed file: %s' % 

166 (code, val)) 

167 

168 nbytes_approx += math.ceil( 

169 tr.ydata.nbytes / (rl-MSEED_HEADER_BYTES)) * rl 

170 trtups.append(as_tuple(tr, dataquality)) 

171 

172 return mseed_ext.mseed_bytes(trtups, nbytes_approx, record_length, steim) 

173 

174 

175def detect(first512): 

176 

177 if len(first512) < 256: 

178 return False 

179 

180 rec = first512 

181 

182 try: 

183 sequence_number = int(rec[:6]) 

184 except Exception: 

185 return False 

186 if sequence_number < 0: 

187 return False 

188 

189 type_code = rec[6:7] 

190 if type_code in b'DRQM': 

191 bads = [] 

192 for sex in '<>': 

193 bad = False 

194 fmt = sex + '6s1s1s5s2s3s2s10sH2h4Bl2H' 

195 vals = unpack(fmt, rec[:48]) 

196 fmt_btime = sex + 'HHBBBBH' 

197 tvals = unpack(fmt_btime, vals[7]) 

198 if tvals[1] < 1 or tvals[1] > 367 or tvals[2] > 23 or \ 

199 tvals[3] > 59 or tvals[4] > 60 or tvals[6] > 9999: 

200 bad = True 

201 

202 bads.append(bad) 

203 

204 if all(bads): 

205 return False 

206 

207 else: 

208 if type_code not in b'VAST': 

209 return False 

210 

211 continuation_code = rec[7:8] 

212 if continuation_code != b' ': 

213 return False 

214 

215 blockette_type = rec[8:8+3].decode() 

216 if not re.match(r'^\d\d\d$', blockette_type): 

217 return False 

218 

219 try: 

220 blockette_length = int(rec[11:11+4]) 

221 except Exception: 

222 return False 

223 

224 if blockette_length < 7: 

225 return False 

226 

227 return True