Source code for pyrocko.io.seisan_waveform

# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from __future__ import division, absolute_import, print_function
from builtins import range

import sys
import calendar
import numpy as num

from pyrocko import util, trace
from pyrocko.util import unpack_fixed
from .io_common import FileLoadError


[docs]class SeisanFileError(Exception): pass
[docs]def read_file_header(f, npad=4): header_infos = [] nlines = 12 iline = 0 while iline < nlines: f.read(npad) d = f.read(80) d = str(d.decode('ascii')) f.read(npad) if iline == 0: net_name, nchannels, ear, doy, mon, day, hr, min, secs, tlen = \ unpack_fixed( 'x1,a29,i3,i3,x1,i3,x1,i2,x1,i2,x1,i2,x1,i2,x1,f6,x1,f9', d) year = 1900 + ear tmin = calendar.timegm((year, mon, day, hr, min, secs)) header_infos.append( (net_name, nchannels, util.time_to_str(tmin))) if nchannels > 30: nlines += (nchannels - 31)//3 + 1 if iline >= 2: for j in range(3): s = d[j*26:(j+1)*26] if s.strip(): sta1, cha1, cha2, sta2, toffset, tlen = unpack_fixed( 'x1,a4,a2,x1,a1,a1,f7,x1,f8', s) sta = sta1 + sta2 cha = cha1 + cha2 header_infos.append( (sta, cha, toffset, tlen)) iline += 1 return header_infos
[docs]class EOF(Exception): pass
[docs]def read_channel_header(f, npad=4): x = f.read(npad) if len(x) == 0: raise EOF() d = f.read(1040) d = str(d.decode('ascii')) f.read(npad) sta, cha1, loc1, cha2, ear, loc2, doy, net1, mon, net2, day, hr, min, \ tflag, secs, rate, nsamples, lat, lon, elevation, gain_flag, \ sample_bytes, response_flag1, response_flag2 = unpack_fixed( 'a5,a2,a1,a1,i3,a1,i3,a1,i2,a1,i2,x1,i2,x1,i2,a1,f6,x1,f7,i7,x1,' 'f8?,x1,f9?,x1,f5?,a1,i1,a1,a1', d[:79]) gain = 1 if gain_flag: gain = unpack_fixed('f12', d[147:159]) cha = cha1+cha2 loc = loc1+loc2 net = net1+net2 tmin = calendar.timegm((1900+ear, mon, day, hr, min, secs)) deltat = 1./rate return (net, sta, loc, cha, tmin, tflag, deltat, nsamples, sample_bytes, lat, lon, elevation, gain)
[docs]def read_channel_data( f, endianness, sample_bytes, nsamples, gain, load_data=True, npad=4): if not load_data: f.seek(sample_bytes*nsamples + 2*npad, 1) return None else: f.read(npad) data = num.fromfile( f, dtype=num.dtype('%si%i' % (endianness, sample_bytes)), count=nsamples).astype('i%i' % sample_bytes) f.read(npad) data *= gain return data
[docs]def iload(filename, load_data=True, subformat='l4'): try: npad = 4 if subformat is not None: try: endianness = {'l': '<', 'b': '>'}[subformat[0]] if len(subformat) > 1: npad = int(subformat[1:]) except Exception: raise SeisanFileError( 'Bad subformat specification: "%s"' % subformat) else: endianness = '<' with open(filename, 'rb') as f: try: read_file_header(f, npad=npad) except util.UnpackError as e: raise SeisanFileError( 'Error loading header from file %s: %s' % (filename, str(e))) try: itrace = 0 while True: try: (net, sta, loc, cha, tmin, tflag, deltat, nsamples, sample_bytes, lat, lon, elevation, gain) \ = read_channel_header(f, npad=npad) data = read_channel_data( f, endianness, sample_bytes, nsamples, gain, load_data, npad=npad) tmax = None if data is None: tmax = tmin + (nsamples-1)*deltat t = trace.Trace( net, sta, loc, cha, tmin=tmin, tmax=tmax, deltat=deltat, ydata=data) yield t except util.UnpackError as e: raise SeisanFileError( 'Error loading trace %i from file %s: %s' % ( itrace, filename, str(e))) itrace += 1 except EOF: pass except (OSError, SeisanFileError) as e: raise FileLoadError(e)
if __name__ == '__main__': fn = sys.argv[1] for tr in iload(fn): print(tr)