Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/io/sac.py: 92%
227 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-04 09:52 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-04 09:52 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
5'''
6`SAC
7<http://ds.iris.edu/ds/nodes/dmc/software/downloads/sac/>`_ waveform file
8input, output and data model.
9'''
11import struct
12import logging
13import math
14import numpy as num
16from calendar import timegm
17from time import gmtime
19from pyrocko import trace, util
20from pyrocko.util import reuse
21from .io_common import FileLoadError
23logger = logging.getLogger('pyrocko.io.sac')
26def fixdoublefloat(x):
27 f = 10**math.floor(math.log10(x)) / 1000000.
28 return round(x/f)*f
31class SacError(Exception):
32 pass
35def nonetoempty(s):
36 if s is None:
37 return ''
38 else:
39 return s.strip()
42class SacFile(object):
43 nbytes_header = 632
44 header_num_format = {'little': '<70f40i', 'big': '>70f40i'}
46 header_keys = '''
47delta depmin depmax scale odelta b e o a internal0 t0 t1 t2 t3 t4 t5 t6 t7 t8
48t9 f resp0 resp1 resp2 resp3 resp4 resp5 resp6 resp7 resp8 resp9 stla stlo stel
49stdp evla evlo evel evdp mag user0 user1 user2 user3 user4 user5 user6 user7
50user8 user9 dist az baz gcarc internal1 internal2 depmen cmpaz cmpinc xminimum
51xmaximum yminimum ymaximum unused0 unused1 unused2 unused3 unused4 unused5
52unused6 nzyear nzjday nzhour nzmin nzsec nzmsec nvhdr norid nevid npts
53internal3 nwfid nxsize nysize unused7 iftype idep iztype unused8 iinst istreg
54ievreg ievtyp iqual isynth imagtyp imagsrc unused9 unused10 unused11 unused12
55unused13 unused14 unused15 unused16 leven lpspol lovrok lcalda unused17 kstnm
56kevnm khole ko ka kt0 kt1 kt2 kt3 kt4 kt5 kt6 kt7 kt8 kt9 kf kuser0 kuser1
57kuser2 kcmpnm knetwk kdatrd kinst
58'''.split()
60 header_enum_symbols = '''
61itime irlim iamph ixy iunkn idisp ivel iacc ib iday io ia it0 it1 it2 it3 it4
62it5 it6 it7 it8 it9 iradnv itannv iradev itanev inorth ieast ihorza idown iup
63illlbb iwwsn1 iwwsn2 ihglp isro inucl ipren ipostn iquake ipreq ipostq ichem
64iother igood iglch idrop ilowsn irldta ivolts ixyz imb ims iml imw imd imx
65ineic ipde iisc ireb iusgs ibrk icaltech illnl ievloc ijsop iuser iunknown iqb
66iqb1 iqb2 iqbx iqmt ieq ieq1 ieq2 ime iex inu inc io_ il ir it iu
67'''.split()
69 enum_header_vars = 'iftype idep iztype imagtype imagsrc ievtyp iqual ' \
70 'isynth'.split()
72 header_num2name = dict(
73 [(a+1, b) for (a, b) in enumerate(header_enum_symbols)])
74 header_name2num = dict(
75 [(b, a+1) for (a, b) in enumerate(header_enum_symbols)])
76 header_types = 'f'*70 + 'i'*35 + 'l'*5 + 'k'*23
77 undefined_value = {'f': -12345.0, 'i': -12345, 'l': None, 'k': '-12345'}
78 ldefaults = {
79 'leven': 1, 'lpspol': 0, 'lovrok': 1, 'lcalda': 1, 'unused17': 0}
81 t_lookup = dict(zip(header_keys, header_types))
83 u_lookup = dict()
84 for k in header_keys:
85 u_lookup[k] = undefined_value[t_lookup[k]]
87 def ndatablocks(self):
88 '''
89 Get number of data blocks for this file's type.
90 '''
91 nblocks = {
92 'itime': 1, 'irlim': 2, 'iamph': 2, 'ixy': 2, 'ixyz': 3
93 }[SacFile.header_num2name[self.iftype]]
95 if nblocks == 1 and not self.leven:
96 nblocks = 2 # not sure about this...
98 return nblocks
100 def val_or_none(self, k, v):
101 '''
102 Replace SAC undef flags with None.
103 '''
104 if SacFile.u_lookup[k] == v:
105 return None
106 else:
107 return v
109 def get_ref_time(self):
110 '''
111 Get reference time as standard Unix timestamp.
112 '''
114 if None in (self.nzyear, self.nzjday, self.nzhour, self.nzmin,
115 self.nzsec, self.nzmsec):
116 raise SacError('Not all header values for reference time are set.')
118 return util.to_time_float(timegm(
119 (self.nzyear, 1, self.nzjday,
120 self.nzhour, self.nzmin, self.nzsec))) + self.nzmsec/1000.
122 def set_ref_time(self, timestamp):
123 '''
124 Set all header values defining reference time based on standard Unix
125 timestamp.
126 '''
128 secs = math.floor(timestamp)
129 msec = int(round((timestamp-secs)*1000.))
130 if msec == 1000:
131 secs += 1
132 msec = 0
134 t = gmtime(secs)
135 self.nzyear, self.nzjday, self.nzhour, self.nzmin, self.nzsec = \
136 t[0], t[7], t[3], t[4], t[5]
137 self.nzmsec = msec
139 def val_for_file(self, k, v):
140 '''
141 Convert attribute value to the form required when writing it to the
142 SAC file.
143 '''
145 t = SacFile.t_lookup[k]
146 if v is None:
147 if t == 'l':
148 return SacFile.ldefaults[k]
149 v = SacFile.u_lookup[k]
150 if t == 'f':
151 return float(v)
152 elif t == 'i':
153 return int(v)
154 elif t == 'l':
155 if v:
156 return 1
157 return 0
158 elif t == 'k':
159 ln = 8
160 if k == 'kevnm':
161 ln = 16 # only this header val has different length
162 return v.ljust(ln)[:ln]
164 def __init__(self, *args, **kwargs):
165 if 'from_trace' in kwargs:
166 self.clear()
167 trace = kwargs['from_trace']
168 if trace.meta:
169 for (k, v) in trace.meta.items():
170 if k in SacFile.header_keys:
171 setattr(self, k, v)
173 self.knetwk = trace.network
174 self.kstnm = trace.station
175 self.khole = trace.location
176 self.kcmpnm = trace.channel
177 self.set_ref_time(trace.tmin)
178 self.delta = trace.deltat
179 self.data = [trace.ydata.copy()]
180 self.npts = trace.ydata.size
181 self.b = 0.0
182 self.e = self.b + (self.npts-1)*self.delta
184 elif args:
185 self.read(*args, **kwargs)
186 else:
187 self.clear()
189 def clear(self):
190 '''
191 Empty file record.
192 '''
194 for k in SacFile.header_keys:
195 self.__dict__[k] = None
197 # set the required attributes
198 self.nvhdr = 6
199 self.iftype = SacFile.header_name2num['itime']
200 self.leven = True
201 self.delta = 1.0
202 self.npts = 0
203 self.b = 0.0
204 self.e = 0.0
205 self.data = [num.arange(0, dtype=num.float32)]
207 def check(self):
208 '''
209 Check the required header variables to have reasonable values.
210 '''
211 if self.iftype not in [SacFile.header_name2num[x] for x in
212 ('itime', 'irlim', 'iamph', 'ixy', 'ixyz')]:
213 raise SacError('Unknown SAC file type: %i.' % self.iftype)
214 if self.nvhdr < 1 or 20 < self.nvhdr:
215 raise SacError('Unreasonable SAC header version number found.')
216 if self.npts < 0:
217 raise SacError(
218 'Negative number of samples specified in NPTS header.')
219 if self.leven not in (0, 1, -12345):
220 raise SacError('Header value LEVEN must be either 0 or 1.')
221 if self.leven and self.delta <= 0.0:
222 raise SacError(
223 'Header value DELTA should be positive for evenly spaced '
224 'samples')
225 if self.e is not None and self.b > self.e:
226 raise SacError(
227 'Beginning value of independent variable greater than its '
228 'ending value.')
229 if self.nvhdr != 6:
230 logging.warn(
231 'This module has only been tested with SAC header version 6.'
232 'This file has header version %i. '
233 'It might still work though...' % self.nvhdr)
235 def read(self, filename, load_data=True, byte_sex='try'):
236 '''
237 Read SAC file.
239 filename -- Name of SAC file.
240 load_data -- If True, the data is read, otherwise only read headers.
241 byte_sex -- Endianness: 'try', 'little' or 'big'
242 '''
244 nbh = SacFile.nbytes_header
246 # read in all data
247 with open(filename, 'rb') as f:
248 if load_data:
249 filedata = f.read()
250 else:
251 filedata = f.read(nbh)
253 if len(filedata) < nbh:
254 raise SacError('File too short to be a SAC file: %s' % filename)
256 # possibly try out different endiannesses
257 if byte_sex == 'try':
258 sexes = ('little', 'big')
259 else:
260 sexes = (byte_sex,)
262 for isex, sex in enumerate(sexes):
263 format = SacFile.header_num_format[sex]
264 nbn = struct.calcsize(format)
265 hv = list(struct.unpack(format, filedata[:nbn]))
267 strings = str(filedata[nbn:nbh].decode('ascii'))
268 hv.append(strings[:8].rstrip(' \x00'))
269 hv.append(strings[8:24].rstrip(' \x00'))
270 for i in range(len(strings[24:])//8):
271 hv.append(strings[24+i*8:24+(i+1)*8].rstrip(' \x00'))
273 self.header_vals = hv
274 for k, v in zip(SacFile.header_keys, self.header_vals):
275 vn = self.val_or_none(k, v)
276 self.__dict__[k] = vn
278 if self.leven == -12345:
279 self.leven = True
281 self.data = []
282 try:
283 self.check()
284 break
285 except SacError as e:
286 if isex == len(sexes)-1:
287 raise e
289 self.delta = fixdoublefloat(self.delta)
291 if byte_sex == 'try':
292 logger.debug(
293 'This seems to be a %s endian SAC file: %s' % (sex, filename))
295 # possibly get data
296 if load_data:
297 nblocks = self.ndatablocks()
298 nbb = self.npts*4 # word length is always 4 bytes in sac files
299 for iblock in range(nblocks):
300 if len(filedata) < nbh+(iblock+1)*nbb:
301 raise SacError('File is incomplete.')
303 if sex == 'big':
304 dtype = num.dtype('>f4')
305 else:
306 dtype = num.dtype('<f4')
308 self.data.append(num.array(num.frombuffer(
309 filedata[nbh+iblock*nbb:nbh+(iblock+1)*nbb],
310 dtype=dtype),
311 dtype=float))
313 if len(filedata) > nbh+nblocks*nbb:
314 logger.warning(
315 'Unused data (%i bytes) at end of SAC file: %s (npts=%i)'
316 % (len(filedata) - nbh+nblocks*nbb, filename, self.npts))
318 def write(self, filename, byte_sex='little'):
319 '''
320 Write SAC file.
321 '''
323 self.check()
325 # create header data
326 format = SacFile.header_num_format[byte_sex]
327 numerical_values = []
328 string_values = []
329 for k in SacFile.header_keys:
330 v = self.__dict__[k]
331 vv = self.val_for_file(k, v)
332 if SacFile.t_lookup[k] == 'k':
333 string_values.append(vv)
334 else:
335 numerical_values.append(vv)
337 header_data = struct.pack(format, *numerical_values)
338 header_data += bytes(''.join(string_values).encode('ascii'))
340 # check that enough data is available
341 nblocks = self.ndatablocks()
342 if len(self.data) != nblocks:
343 raise SacError(
344 'Need %i data blocks for file type %s.'
345 % (nblocks, SacFile.header_num2name[self.iftype]))
347 for fdata in self.data:
348 if len(fdata) != self.npts:
349 raise SacError(
350 'Data length (%i) does not match NPTS header value (%i)'
351 % (len(fdata), self.npts))
353 # dump data to file
354 with open(filename, 'wb') as f:
355 f.write(header_data)
356 for fdata in self.data:
357 f.write(fdata.astype(num.float32).tobytes())
359 def __str__(self):
360 str = ''
361 for k in SacFile.header_keys:
362 v = self.__dict__[k]
363 if v is not None:
364 if k in SacFile.enum_header_vars:
365 if v in SacFile.header_num2name:
366 v = SacFile.header_num2name[v]
367 str += '%s: %s\n' % (k, v)
369 return str
371 def to_trace(self):
373 assert self.iftype == SacFile.header_name2num['itime']
374 assert self.leven
376 tmin = self.get_ref_time() + self.b
377 tmax = tmin + self.delta*(self.npts-1)
379 data = None
380 if self.data:
381 data = self.data[0]
383 meta = {}
384 exclude = ('b', 'e', 'knetwk', 'kstnm', 'khole', 'kcmpnm', 'delta',
385 'nzyear', 'nzjday', 'nzhour', 'nzmin', 'nzsec', 'nzmsec')
387 for k in SacFile.header_keys:
388 if k in exclude:
389 continue
390 v = self.__dict__[k]
391 if v is not None:
392 meta[reuse(k)] = v
394 return trace.Trace(
395 nonetoempty(self.knetwk)[:2],
396 nonetoempty(self.kstnm)[:5],
397 nonetoempty(self.khole)[:2],
398 nonetoempty(self.kcmpnm)[:3],
399 tmin,
400 tmax,
401 self.delta,
402 data,
403 meta=meta)
406def iload(filename, load_data=True):
408 try:
409 sacf = SacFile(filename, load_data=load_data)
410 tr = sacf.to_trace()
411 yield tr
413 except (OSError, SacError) as e:
414 raise FileLoadError(e)
417def detect(first512):
419 if len(first512) < 512: # SAC header is 632 bytes long
420 return False
422 for sex in 'little', 'big':
423 format = SacFile.header_num_format[sex]
424 nbn = struct.calcsize(format)
426 hv = list(struct.unpack(format, first512[:nbn]))
427 iftype, nvhdr, npts, leven, delta, e, b = [
428 hv[i] for i in (85, 76, 79, 105, 0, 6, 5)]
430 if iftype not in [SacFile.header_name2num[x] for x in (
431 'itime', 'irlim', 'iamph', 'ixy', 'ixyz')]:
432 continue
433 if nvhdr < 1 or 20 < nvhdr:
434 continue
435 if npts < 0:
436 continue
437 if leven not in (0, 1, -12345):
438 continue
439 if leven and delta <= 0.0:
440 continue
441 if e != -12345.0 and b > e:
442 continue
444 return True
446 return False