Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/io/sac.py: 92%

227 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-06 15:01 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5''' 

6`SAC 

7<http://ds.iris.edu/ds/nodes/dmc/software/downloads/sac/>`_ waveform file 

8input, output and data model. 

9''' 

10 

11import struct 

12import logging 

13import math 

14import numpy as num 

15 

16from calendar import timegm 

17from time import gmtime 

18 

19from pyrocko import trace, util 

20from pyrocko.util import reuse 

21from .io_common import FileLoadError 

22 

23logger = logging.getLogger('pyrocko.io.sac') 

24 

25 

26def fixdoublefloat(x): 

27 f = 10**math.floor(math.log10(x)) / 1000000. 

28 return round(x/f)*f 

29 

30 

31class SacError(Exception): 

32 pass 

33 

34 

35def nonetoempty(s): 

36 if s is None: 

37 return '' 

38 else: 

39 return s.strip() 

40 

41 

42class SacFile(object): 

43 nbytes_header = 632 

44 header_num_format = {'little': '<70f40i', 'big': '>70f40i'} 

45 

46 header_keys = ''' 

47delta depmin depmax scale odelta b e o a internal0 t0 t1 t2 t3 t4 t5 t6 t7 t8 

48t9 f resp0 resp1 resp2 resp3 resp4 resp5 resp6 resp7 resp8 resp9 stla stlo stel 

49stdp evla evlo evel evdp mag user0 user1 user2 user3 user4 user5 user6 user7 

50user8 user9 dist az baz gcarc internal1 internal2 depmen cmpaz cmpinc xminimum 

51xmaximum yminimum ymaximum unused0 unused1 unused2 unused3 unused4 unused5 

52unused6 nzyear nzjday nzhour nzmin nzsec nzmsec nvhdr norid nevid npts 

53internal3 nwfid nxsize nysize unused7 iftype idep iztype unused8 iinst istreg 

54ievreg ievtyp iqual isynth imagtyp imagsrc unused9 unused10 unused11 unused12 

55unused13 unused14 unused15 unused16 leven lpspol lovrok lcalda unused17 kstnm 

56kevnm khole ko ka kt0 kt1 kt2 kt3 kt4 kt5 kt6 kt7 kt8 kt9 kf kuser0 kuser1 

57kuser2 kcmpnm knetwk kdatrd kinst 

58'''.split() 

59 

60 header_enum_symbols = ''' 

61itime irlim iamph ixy iunkn idisp ivel iacc ib iday io ia it0 it1 it2 it3 it4 

62it5 it6 it7 it8 it9 iradnv itannv iradev itanev inorth ieast ihorza idown iup 

63illlbb iwwsn1 iwwsn2 ihglp isro inucl ipren ipostn iquake ipreq ipostq ichem 

64iother igood iglch idrop ilowsn irldta ivolts ixyz imb ims iml imw imd imx 

65ineic ipde iisc ireb iusgs ibrk icaltech illnl ievloc ijsop iuser iunknown iqb 

66iqb1 iqb2 iqbx iqmt ieq ieq1 ieq2 ime iex inu inc io_ il ir it iu 

67'''.split() 

68 

69 enum_header_vars = 'iftype idep iztype imagtype imagsrc ievtyp iqual ' \ 

70 'isynth'.split() 

71 

72 header_num2name = dict( 

73 [(a+1, b) for (a, b) in enumerate(header_enum_symbols)]) 

74 header_name2num = dict( 

75 [(b, a+1) for (a, b) in enumerate(header_enum_symbols)]) 

76 header_types = 'f'*70 + 'i'*35 + 'l'*5 + 'k'*23 

77 undefined_value = {'f': -12345.0, 'i': -12345, 'l': None, 'k': '-12345'} 

78 ldefaults = { 

79 'leven': 1, 'lpspol': 0, 'lovrok': 1, 'lcalda': 1, 'unused17': 0} 

80 

81 t_lookup = dict(zip(header_keys, header_types)) 

82 

83 u_lookup = dict() 

84 for k in header_keys: 

85 u_lookup[k] = undefined_value[t_lookup[k]] 

86 

87 def ndatablocks(self): 

88 ''' 

89 Get number of data blocks for this file's type. 

90 ''' 

91 nblocks = { 

92 'itime': 1, 'irlim': 2, 'iamph': 2, 'ixy': 2, 'ixyz': 3 

93 }[SacFile.header_num2name[self.iftype]] 

94 

95 if nblocks == 1 and not self.leven: 

96 nblocks = 2 # not sure about this... 

97 

98 return nblocks 

99 

100 def val_or_none(self, k, v): 

101 ''' 

102 Replace SAC undef flags with None. 

103 ''' 

104 if SacFile.u_lookup[k] == v: 

105 return None 

106 else: 

107 return v 

108 

109 def get_ref_time(self): 

110 ''' 

111 Get reference time as standard Unix timestamp. 

112 ''' 

113 

114 if None in (self.nzyear, self.nzjday, self.nzhour, self.nzmin, 

115 self.nzsec, self.nzmsec): 

116 raise SacError('Not all header values for reference time are set.') 

117 

118 return util.to_time_float(timegm( 

119 (self.nzyear, 1, self.nzjday, 

120 self.nzhour, self.nzmin, self.nzsec))) + self.nzmsec/1000. 

121 

122 def set_ref_time(self, timestamp): 

123 ''' 

124 Set all header values defining reference time based on standard Unix 

125 timestamp. 

126 ''' 

127 

128 secs = math.floor(timestamp) 

129 msec = int(round((timestamp-secs)*1000.)) 

130 if msec == 1000: 

131 secs += 1 

132 msec = 0 

133 

134 t = gmtime(secs) 

135 self.nzyear, self.nzjday, self.nzhour, self.nzmin, self.nzsec = \ 

136 t[0], t[7], t[3], t[4], t[5] 

137 self.nzmsec = msec 

138 

139 def val_for_file(self, k, v): 

140 ''' 

141 Convert attribute value to the form required when writing it to the 

142 SAC file. 

143 ''' 

144 

145 t = SacFile.t_lookup[k] 

146 if v is None: 

147 if t == 'l': 

148 return SacFile.ldefaults[k] 

149 v = SacFile.u_lookup[k] 

150 if t == 'f': 

151 return float(v) 

152 elif t == 'i': 

153 return int(v) 

154 elif t == 'l': 

155 if v: 

156 return 1 

157 return 0 

158 elif t == 'k': 

159 ln = 8 

160 if k == 'kevnm': 

161 ln = 16 # only this header val has different length 

162 return v.ljust(ln)[:ln] 

163 

164 def __init__(self, *args, **kwargs): 

165 if 'from_trace' in kwargs: 

166 self.clear() 

167 trace = kwargs['from_trace'] 

168 if trace.meta: 

169 for (k, v) in trace.meta.items(): 

170 if k in SacFile.header_keys: 

171 setattr(self, k, v) 

172 

173 self.knetwk = trace.network 

174 self.kstnm = trace.station 

175 self.khole = trace.location 

176 self.kcmpnm = trace.channel 

177 self.set_ref_time(trace.tmin) 

178 self.delta = trace.deltat 

179 self.data = [trace.ydata.copy()] 

180 self.npts = trace.ydata.size 

181 self.b = 0.0 

182 self.e = self.b + (self.npts-1)*self.delta 

183 

184 elif args: 

185 self.read(*args, **kwargs) 

186 else: 

187 self.clear() 

188 

189 def clear(self): 

190 ''' 

191 Empty file record. 

192 ''' 

193 

194 for k in SacFile.header_keys: 

195 self.__dict__[k] = None 

196 

197 # set the required attributes 

198 self.nvhdr = 6 

199 self.iftype = SacFile.header_name2num['itime'] 

200 self.leven = True 

201 self.delta = 1.0 

202 self.npts = 0 

203 self.b = 0.0 

204 self.e = 0.0 

205 self.data = [num.arange(0, dtype=num.float32)] 

206 

207 def check(self): 

208 ''' 

209 Check the required header variables to have reasonable values. 

210 ''' 

211 if self.iftype not in [SacFile.header_name2num[x] for x in 

212 ('itime', 'irlim', 'iamph', 'ixy', 'ixyz')]: 

213 raise SacError('Unknown SAC file type: %i.' % self.iftype) 

214 if self.nvhdr < 1 or 20 < self.nvhdr: 

215 raise SacError('Unreasonable SAC header version number found.') 

216 if self.npts < 0: 

217 raise SacError( 

218 'Negative number of samples specified in NPTS header.') 

219 if self.leven not in (0, 1, -12345): 

220 raise SacError('Header value LEVEN must be either 0 or 1.') 

221 if self.leven and self.delta <= 0.0: 

222 raise SacError( 

223 'Header value DELTA should be positive for evenly spaced ' 

224 'samples') 

225 if self.e is not None and self.b > self.e: 

226 raise SacError( 

227 'Beginning value of independent variable greater than its ' 

228 'ending value.') 

229 if self.nvhdr != 6: 

230 logging.warn( 

231 'This module has only been tested with SAC header version 6.' 

232 'This file has header version %i. ' 

233 'It might still work though...' % self.nvhdr) 

234 

235 def read(self, filename, load_data=True, byte_sex='try'): 

236 ''' 

237 Read SAC file. 

238 

239 filename -- Name of SAC file. 

240 load_data -- If True, the data is read, otherwise only read headers. 

241 byte_sex -- Endianness: 'try', 'little' or 'big' 

242 ''' 

243 

244 nbh = SacFile.nbytes_header 

245 

246 # read in all data 

247 with open(filename, 'rb') as f: 

248 if load_data: 

249 filedata = f.read() 

250 else: 

251 filedata = f.read(nbh) 

252 

253 if len(filedata) < nbh: 

254 raise SacError('File too short to be a SAC file: %s' % filename) 

255 

256 # possibly try out different endiannesses 

257 if byte_sex == 'try': 

258 sexes = ('little', 'big') 

259 else: 

260 sexes = (byte_sex,) 

261 

262 for isex, sex in enumerate(sexes): 

263 format = SacFile.header_num_format[sex] 

264 nbn = struct.calcsize(format) 

265 hv = list(struct.unpack(format, filedata[:nbn])) 

266 

267 strings = str(filedata[nbn:nbh].decode('ascii')) 

268 hv.append(strings[:8].rstrip(' \x00')) 

269 hv.append(strings[8:24].rstrip(' \x00')) 

270 for i in range(len(strings[24:])//8): 

271 hv.append(strings[24+i*8:24+(i+1)*8].rstrip(' \x00')) 

272 

273 self.header_vals = hv 

274 for k, v in zip(SacFile.header_keys, self.header_vals): 

275 vn = self.val_or_none(k, v) 

276 self.__dict__[k] = vn 

277 

278 if self.leven == -12345: 

279 self.leven = True 

280 

281 self.data = [] 

282 try: 

283 self.check() 

284 break 

285 except SacError as e: 

286 if isex == len(sexes)-1: 

287 raise e 

288 

289 self.delta = fixdoublefloat(self.delta) 

290 

291 if byte_sex == 'try': 

292 logger.debug( 

293 'This seems to be a %s endian SAC file: %s' % (sex, filename)) 

294 

295 # possibly get data 

296 if load_data: 

297 nblocks = self.ndatablocks() 

298 nbb = self.npts*4 # word length is always 4 bytes in sac files 

299 for iblock in range(nblocks): 

300 if len(filedata) < nbh+(iblock+1)*nbb: 

301 raise SacError('File is incomplete.') 

302 

303 if sex == 'big': 

304 dtype = num.dtype('>f4') 

305 else: 

306 dtype = num.dtype('<f4') 

307 

308 self.data.append(num.array(num.frombuffer( 

309 filedata[nbh+iblock*nbb:nbh+(iblock+1)*nbb], 

310 dtype=dtype), 

311 dtype=float)) 

312 

313 if len(filedata) > nbh+nblocks*nbb: 

314 logger.warning( 

315 'Unused data (%i bytes) at end of SAC file: %s (npts=%i)' 

316 % (len(filedata) - nbh+nblocks*nbb, filename, self.npts)) 

317 

318 def write(self, filename, byte_sex='little'): 

319 ''' 

320 Write SAC file. 

321 ''' 

322 

323 self.check() 

324 

325 # create header data 

326 format = SacFile.header_num_format[byte_sex] 

327 numerical_values = [] 

328 string_values = [] 

329 for k in SacFile.header_keys: 

330 v = self.__dict__[k] 

331 vv = self.val_for_file(k, v) 

332 if SacFile.t_lookup[k] == 'k': 

333 string_values.append(vv) 

334 else: 

335 numerical_values.append(vv) 

336 

337 header_data = struct.pack(format, *numerical_values) 

338 header_data += bytes(''.join(string_values).encode('ascii')) 

339 

340 # check that enough data is available 

341 nblocks = self.ndatablocks() 

342 if len(self.data) != nblocks: 

343 raise SacError( 

344 'Need %i data blocks for file type %s.' 

345 % (nblocks, SacFile.header_num2name[self.iftype])) 

346 

347 for fdata in self.data: 

348 if len(fdata) != self.npts: 

349 raise SacError( 

350 'Data length (%i) does not match NPTS header value (%i)' 

351 % (len(fdata), self.npts)) 

352 

353 # dump data to file 

354 with open(filename, 'wb') as f: 

355 f.write(header_data) 

356 for fdata in self.data: 

357 f.write(fdata.astype(num.float32).tobytes()) 

358 

359 def __str__(self): 

360 str = '' 

361 for k in SacFile.header_keys: 

362 v = self.__dict__[k] 

363 if v is not None: 

364 if k in SacFile.enum_header_vars: 

365 if v in SacFile.header_num2name: 

366 v = SacFile.header_num2name[v] 

367 str += '%s: %s\n' % (k, v) 

368 

369 return str 

370 

371 def to_trace(self): 

372 

373 assert self.iftype == SacFile.header_name2num['itime'] 

374 assert self.leven 

375 

376 tmin = self.get_ref_time() + self.b 

377 tmax = tmin + self.delta*(self.npts-1) 

378 

379 data = None 

380 if self.data: 

381 data = self.data[0] 

382 

383 meta = {} 

384 exclude = ('b', 'e', 'knetwk', 'kstnm', 'khole', 'kcmpnm', 'delta', 

385 'nzyear', 'nzjday', 'nzhour', 'nzmin', 'nzsec', 'nzmsec') 

386 

387 for k in SacFile.header_keys: 

388 if k in exclude: 

389 continue 

390 v = self.__dict__[k] 

391 if v is not None: 

392 meta[reuse(k)] = v 

393 

394 return trace.Trace( 

395 nonetoempty(self.knetwk)[:2], 

396 nonetoempty(self.kstnm)[:5], 

397 nonetoempty(self.khole)[:2], 

398 nonetoempty(self.kcmpnm)[:3], 

399 tmin, 

400 tmax, 

401 self.delta, 

402 data, 

403 meta=meta) 

404 

405 

406def iload(filename, load_data=True): 

407 

408 try: 

409 sacf = SacFile(filename, load_data=load_data) 

410 tr = sacf.to_trace() 

411 yield tr 

412 

413 except (OSError, SacError) as e: 

414 raise FileLoadError(e) 

415 

416 

417def detect(first512): 

418 

419 if len(first512) < 512: # SAC header is 632 bytes long 

420 return False 

421 

422 for sex in 'little', 'big': 

423 format = SacFile.header_num_format[sex] 

424 nbn = struct.calcsize(format) 

425 

426 hv = list(struct.unpack(format, first512[:nbn])) 

427 iftype, nvhdr, npts, leven, delta, e, b = [ 

428 hv[i] for i in (85, 76, 79, 105, 0, 6, 5)] 

429 

430 if iftype not in [SacFile.header_name2num[x] for x in ( 

431 'itime', 'irlim', 'iamph', 'ixy', 'ixyz')]: 

432 continue 

433 if nvhdr < 1 or 20 < nvhdr: 

434 continue 

435 if npts < 0: 

436 continue 

437 if leven not in (0, 1, -12345): 

438 continue 

439 if leven and delta <= 0.0: 

440 continue 

441 if e != -12345.0 and b > e: 

442 continue 

443 

444 return True 

445 

446 return False