Source code for pyrocko.io.sac

# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
'''SAC IO library for Python'''
from __future__ import absolute_import

import struct
import logging
import math
import numpy as num

from calendar import timegm
from time import gmtime

from pyrocko import trace
from pyrocko.util import reuse
from .io_common import FileLoadError

logger = logging.getLogger('pyrocko.io.sac')


def fixdoublefloat(x):
    f = 10**math.floor(math.log10(x)) / 1000000.
    return round(x/f)*f


[docs]class SacError(Exception): pass
def nonetoempty(s): if s is None: return '' else: return s.strip() class SacFile(object): nbytes_header = 632 header_num_format = {'little': '<70f40i', 'big': '>70f40i'} header_keys = ''' delta depmin depmax scale odelta b e o a internal0 t0 t1 t2 t3 t4 t5 t6 t7 t8 t9 f resp0 resp1 resp2 resp3 resp4 resp5 resp6 resp7 resp8 resp9 stla stlo stel stdp evla evlo evel evdp mag user0 user1 user2 user3 user4 user5 user6 user7 user8 user9 dist az baz gcarc internal1 internal2 depmen cmpaz cmpinc xminimum xmaximum yminimum ymaximum unused0 unused1 unused2 unused3 unused4 unused5 unused6 nzyear nzjday nzhour nzmin nzsec nzmsec nvhdr norid nevid npts internal3 nwfid nxsize nysize unused7 iftype idep iztype unused8 iinst istreg ievreg ievtyp iqual isynth imagtyp imagsrc unused9 unused10 unused11 unused12 unused13 unused14 unused15 unused16 leven lpspol lovrok lcalda unused17 kstnm kevnm khole ko ka kt0 kt1 kt2 kt3 kt4 kt5 kt6 kt7 kt8 kt9 kf kuser0 kuser1 kuser2 kcmpnm knetwk kdatrd kinst '''.split() header_enum_symbols = ''' itime irlim iamph ixy iunkn idisp ivel iacc ib iday io ia it0 it1 it2 it3 it4 it5 it6 it7 it8 it9 iradnv itannv iradev itanev inorth ieast ihorza idown iup illlbb iwwsn1 iwwsn2 ihglp isro inucl ipren ipostn iquake ipreq ipostq ichem iother igood iglch idrop ilowsn irldta ivolts ixyz imb ims iml imw imd imx ineic ipde iisc ireb iusgs ibrk icaltech illnl ievloc ijsop iuser iunknown iqb iqb1 iqb2 iqbx iqmt ieq ieq1 ieq2 ime iex inu inc io_ il ir it iu '''.split() enum_header_vars = 'iftype idep iztype imagtype imagsrc ievtyp iqual ' \ 'isynth'.split() header_num2name = dict( [(a+1, b) for (a, b) in enumerate(header_enum_symbols)]) header_name2num = dict( [(b, a+1) for (a, b) in enumerate(header_enum_symbols)]) header_types = 'f'*70 + 'i'*35 + 'l'*5 + 'k'*23 undefined_value = {'f': -12345.0, 'i': -12345, 'l': None, 'k': '-12345'} ldefaults = { 'leven': 1, 'lpspol': 0, 'lovrok': 1, 'lcalda': 1, 'unused17': 0} t_lookup = dict(zip(header_keys, header_types)) u_lookup = dict() for k in header_keys: u_lookup[k] = undefined_value[t_lookup[k]] def ndatablocks(self): '''Get number of data blocks for this file's type.''' nblocks = { 'itime': 1, 'irlim': 2, 'iamph': 2, 'ixy': 2, 'ixyz': 3 }[SacFile.header_num2name[self.iftype]] if nblocks == 1 and not self.leven: nblocks = 2 # not sure about this... return nblocks def val_or_none(self, k, v): '''Replace SAC undef flags with None.''' if SacFile.u_lookup[k] == v: return None else: return v def get_ref_time(self): '''Get reference time as standard Unix timestamp.''' if None in (self.nzyear, self.nzjday, self.nzhour, self.nzmin, self.nzsec, self.nzmsec): raise SacError('Not all header values for reference time are set.') return timegm((self.nzyear, 1, self.nzjday, self.nzhour, self.nzmin, self.nzsec)) + self.nzmsec/1000. def set_ref_time(self, timestamp): ''' Set all header values defining reference time based on standard Unix timestamp. ''' secs = math.floor(timestamp) msec = int(round((timestamp-secs)*1000.)) if msec == 1000: secs += 1 msec = 0 t = gmtime(secs) self.nzyear, self.nzjday, self.nzhour, self.nzmin, self.nzsec = \ t[0], t[7], t[3], t[4], t[5] self.nzmsec = msec def val_for_file(self, k, v): ''' Convert attribute value to the form required when writing it to the SAC file. ''' t = SacFile.t_lookup[k] if v is None: if t == 'l': return SacFile.ldefaults[k] v = SacFile.u_lookup[k] if t == 'f': return float(v) elif t == 'i': return int(v) elif t == 'l': if v: return 1 return 0 elif t == 'k': ln = 8 if k == 'kevnm': ln = 16 # only this header val has different length return v.ljust(ln)[:ln] def __init__(self, *args, **kwargs): if 'from_trace' in kwargs: self.clear() trace = kwargs['from_trace'] if trace.meta: for (k, v) in trace.meta.items(): if k in SacFile.header_keys: setattr(self, k, v) self.knetwk = trace.network self.kstnm = trace.station self.khole = trace.location self.kcmpnm = trace.channel self.set_ref_time(trace.tmin) self.delta = trace.deltat self.data = [trace.ydata.copy()] self.npts = trace.ydata.size self.b = 0.0 self.e = self.b + (self.npts-1)*self.delta elif args: self.read(*args, **kwargs) else: self.clear() def clear(self): '''Empty file record.''' for k in SacFile.header_keys: self.__dict__[k] = None # set the required attributes self.nvhdr = 6 self.iftype = SacFile.header_name2num['itime'] self.leven = True self.delta = 1.0 self.npts = 0 self.b = 0.0 self.e = 0.0 self.data = [num.arange(0, dtype=num.float32)] def check(self): '''Check the required header variables to have reasonable values.''' if self.iftype not in [SacFile.header_name2num[x] for x in ('itime', 'irlim', 'iamph', 'ixy', 'ixyz')]: raise SacError('Unknown SAC file type: %i.' % self.iftype) if self.nvhdr < 1 or 20 < self.nvhdr: raise SacError('Unreasonable SAC header version number found.') if self.npts < 0: raise SacError( 'Negative number of samples specified in NPTS header.') if self.leven not in (0, 1, -12345): raise SacError('Header value LEVEN must be either 0 or 1.') if self.leven and self.delta <= 0.0: raise SacError( 'Header value DELTA should be positive for evenly spaced ' 'samples') if self.e is not None and self.b > self.e: raise SacError( 'Beginning value of independent variable greater than its ' 'ending value.') if self.nvhdr != 6: logging.warn( 'This module has only been tested with SAC header version 6.' 'This file has header version %i. ' 'It might still work though...' % self.nvhdr) def read(self, filename, load_data=True, byte_sex='try'): ''' Read SAC file. filename -- Name of SAC file. load_data -- If True, the data is read, otherwise only read headers. byte_sex -- Endianness: 'try', 'little' or 'big' ''' nbh = SacFile.nbytes_header # read in all data with open(filename, 'rb') as f: if load_data: filedata = f.read() else: filedata = f.read(nbh) if len(filedata) < nbh: raise SacError('File too short to be a SAC file: %s' % filename) # possibly try out different endiannesses if byte_sex == 'try': sexes = ('little', 'big') else: sexes = (byte_sex,) for isex, sex in enumerate(sexes): format = SacFile.header_num_format[sex] nbn = struct.calcsize(format) hv = list(struct.unpack(format, filedata[:nbn])) strings = str(filedata[nbn:nbh].decode('ascii')) hv.append(strings[:8].rstrip()) hv.append(strings[8:24].rstrip()) for i in range(len(strings[24:])//8): hv.append(strings[24+i*8:24+(i+1)*8].rstrip()) self.header_vals = hv for k, v in zip(SacFile.header_keys, self.header_vals): vn = self.val_or_none(k, v) if isinstance(vn, str): ipos = vn.find('\x00') if ipos != -1: vn = vn[:ipos] self.__dict__[k] = vn.rstrip() else: self.__dict__[k] = vn if self.leven == -12345: self.leven = True self.data = [] try: self.check() break except SacError as e: if isex == len(sexes)-1: raise e self.delta = fixdoublefloat(self.delta) if byte_sex == 'try': logger.debug( 'This seems to be a %s endian SAC file: %s' % (sex, filename)) # possibly get data if load_data: nblocks = self.ndatablocks() nbb = self.npts*4 # word length is always 4 bytes in sac files for iblock in range(nblocks): if len(filedata) < nbh+(iblock+1)*nbb: raise SacError('File is incomplete.') if sex == 'big': dtype = num.dtype('>f4') else: dtype = num.dtype('<f4') self.data.append(num.array(num.fromstring( filedata[nbh+iblock*nbb:nbh+(iblock+1)*nbb], dtype=dtype), dtype=num.float)) if len(filedata) > nbh+nblocks*nbb: logger.warning( 'Unused data (%i bytes) at end of SAC file: %s (npts=%i)' % (len(filedata) - nbh+nblocks*nbb, filename, self.npts)) def write(self, filename, byte_sex='little'): '''Write SAC file.''' self.check() # create header data format = SacFile.header_num_format[byte_sex] numerical_values = [] string_values = [] for k in SacFile.header_keys: v = self.__dict__[k] vv = self.val_for_file(k, v) if SacFile.t_lookup[k] == 'k': string_values.append(vv) else: numerical_values.append(vv) header_data = struct.pack(format, *numerical_values) header_data += bytes(''.join(string_values).encode('ascii')) # check that enough data is available nblocks = self.ndatablocks() if len(self.data) != nblocks: raise SacError( 'Need %i data blocks for file type %s.' % (nblocks, SacFile.header_num2name[self.iftype])) for fdata in self.data: if len(fdata) != self.npts: raise SacError( 'Data length (%i) does not match NPTS header value (%i)' % (len(fdata), self.npts)) # dump data to file with open(filename, 'wb') as f: f.write(header_data) for fdata in self.data: f.write(fdata.astype(num.float32).tostring()) def __str__(self): str = '' for k in SacFile.header_keys: v = self.__dict__[k] if v is not None: if k in SacFile.enum_header_vars: if v in SacFile.header_num2name: v = SacFile.header_num2name[v] str += '%s: %s\n' % (k, v) return str def to_trace(self): assert self.iftype == SacFile.header_name2num['itime'] assert self.leven tmin = self.get_ref_time() + self.b tmax = tmin + self.delta*(self.npts-1) data = None if self.data: data = self.data[0] meta = {} exclude = ('b', 'e', 'knetwk', 'kstnm', 'khole', 'kcmpnm', 'delta', 'nzyear', 'nzjday', 'nzhour', 'nzmin', 'nzsec', 'nzmsec') for k in SacFile.header_keys: if k in exclude: continue v = self.__dict__[k] if v is not None: meta[reuse(k)] = v return trace.Trace( nonetoempty(self.knetwk)[:2], nonetoempty(self.kstnm)[:5], nonetoempty(self.khole)[:2], nonetoempty(self.kcmpnm)[:3], tmin, tmax, self.delta, data, meta=meta) def iload(filename, load_data=True): try: sacf = SacFile(filename, load_data=load_data) tr = sacf.to_trace() yield tr except (OSError, SacError) as e: raise FileLoadError(e) def detect(first512): if len(first512) < 512: # SAC header is 632 bytes long return False for sex in 'little', 'big': format = SacFile.header_num_format[sex] nbn = struct.calcsize(format) hv = list(struct.unpack(format, first512[:nbn])) iftype, nvhdr, npts, leven, delta, e, b = [ hv[i] for i in (85, 76, 79, 105, 0, 6, 5)] if iftype not in [SacFile.header_name2num[x] for x in ( 'itime', 'irlim', 'iamph', 'ixy', 'ixyz')]: continue if nvhdr < 1 or 20 < nvhdr: continue if npts < 0: continue if leven not in (0, 1, -12345): continue if leven and delta <= 0.0: continue if e != -12345.0 and b > e: continue return True return False