Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/io/seisan_waveform.py: 88%

92 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-06 06:59 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Read seismic waveforms from `SEISAN <http://seisan.info/>`_ files. 

8''' 

9 

10import sys 

11import calendar 

12import numpy as num 

13 

14from pyrocko import util, trace 

15from pyrocko.util import unpack_fixed 

16from .io_common import FileLoadError 

17 

18 

19class SeisanFileError(Exception): 

20 pass 

21 

22 

23def read_file_header(f, npad=4): 

24 header_infos = [] 

25 

26 nlines = 12 

27 iline = 0 

28 while iline < nlines: 

29 f.read(npad) 

30 d = f.read(80) 

31 d = str(d.decode('ascii')) 

32 f.read(npad) 

33 

34 if iline == 0: 

35 net_name, nchannels, ear, doy, mon, day, hr, min, secs, tlen = \ 

36 unpack_fixed( 

37 'x1,a29,i3,i3,x1,i3,x1,i2,x1,i2,x1,i2,x1,i2,x1,f6,x1,f9', 

38 d) 

39 

40 year = 1900 + ear 

41 tmin = util.to_time_float( 

42 calendar.timegm((year, mon, day, hr, min, secs))) 

43 header_infos.append( 

44 (net_name, nchannels, util.time_to_str(tmin))) 

45 

46 if nchannels > 30: 

47 nlines += (nchannels - 31)//3 + 1 

48 

49 if iline >= 2: 

50 for j in range(3): 

51 s = d[j*26:(j+1)*26] 

52 if s.strip(): 

53 sta1, cha1, cha2, sta2, toffset, tlen = unpack_fixed( 

54 'x1,a4,a2,x1,a1,a1,f7,x1,f8', s) 

55 

56 sta = sta1 + sta2 

57 cha = cha1 + cha2 

58 header_infos.append( 

59 (sta, cha, toffset, tlen)) 

60 

61 iline += 1 

62 return header_infos 

63 

64 

65class EOF(Exception): 

66 pass 

67 

68 

69def read_channel_header(f, npad=4): 

70 x = f.read(npad) 

71 if len(x) == 0: 

72 raise EOF() 

73 

74 d = f.read(1040) 

75 d = str(d.decode('ascii')) 

76 f.read(npad) 

77 

78 sta, cha1, loc1, cha2, ear, loc2, doy, net1, mon, net2, day, hr, min, \ 

79 tflag, secs, rate, nsamples, lat, lon, elevation, gain_flag, \ 

80 sample_bytes, response_flag1, response_flag2 = unpack_fixed( 

81 'a5,a2,a1,a1,i3,a1,i3,a1,i2,a1,i2,x1,i2,x1,i2,a1,f6,x1,f7,i7,x1,' 

82 'f8?,x1,f9?,x1,f5?,a1,i1,a1,a1', d[:79]) 

83 

84 gain = 1 

85 if gain_flag: 

86 gain = unpack_fixed('f12', d[147:159]) 

87 

88 cha = cha1+cha2 

89 loc = loc1+loc2 

90 net = net1+net2 

91 tmin = util.to_time_float( 

92 calendar.timegm((1900+ear, mon, day, hr, min, secs))) 

93 deltat = 1./rate 

94 

95 return (net, sta, loc, cha, 

96 tmin, tflag, deltat, nsamples, sample_bytes, 

97 lat, lon, elevation, gain) 

98 

99 

100def read_channel_data( 

101 f, endianness, sample_bytes, nsamples, gain, load_data=True, npad=4): 

102 

103 if not load_data: 

104 f.seek(sample_bytes*nsamples + 2*npad, 1) 

105 return None 

106 

107 else: 

108 f.read(npad) 

109 data = num.fromfile( 

110 f, 

111 dtype=num.dtype('%si%i' % (endianness, sample_bytes)), 

112 count=nsamples).astype('i%i' % sample_bytes) 

113 

114 f.read(npad) 

115 data *= gain 

116 return data 

117 

118 

119def iload(filename, load_data=True, subformat='l4'): 

120 

121 try: 

122 npad = 4 

123 if subformat is not None: 

124 try: 

125 endianness = {'l': '<', 'b': '>'}[subformat[0]] 

126 if len(subformat) > 1: 

127 npad = int(subformat[1:]) 

128 except Exception: 

129 raise SeisanFileError( 

130 'Bad subformat specification: "%s"' % subformat) 

131 else: 

132 endianness = '<' 

133 

134 with open(filename, 'rb') as f: 

135 try: 

136 read_file_header(f, npad=npad) 

137 

138 except util.UnpackError as e: 

139 raise SeisanFileError( 

140 'Error loading header from file %s: %s' 

141 % (filename, str(e))) 

142 

143 try: 

144 itrace = 0 

145 while True: 

146 try: 

147 (net, sta, loc, cha, tmin, tflag, deltat, nsamples, 

148 sample_bytes, lat, lon, elevation, gain) \ 

149 = read_channel_header(f, npad=npad) 

150 

151 data = read_channel_data( 

152 f, endianness, sample_bytes, nsamples, 

153 gain, load_data, 

154 npad=npad) 

155 

156 tmax = None 

157 if data is None: 

158 tmax = tmin + (nsamples-1)*deltat 

159 

160 t = trace.Trace( 

161 net, sta, loc, cha, tmin=tmin, tmax=tmax, 

162 deltat=deltat, ydata=data) 

163 

164 yield t 

165 

166 except util.UnpackError as e: 

167 raise SeisanFileError( 

168 'Error loading trace %i from file %s: %s' % ( 

169 itrace, filename, str(e))) 

170 

171 itrace += 1 

172 

173 except EOF: 

174 pass 

175 

176 except (OSError, SeisanFileError) as e: 

177 raise FileLoadError(e) 

178 

179 

180if __name__ == '__main__': 

181 fn = sys.argv[1] 

182 for tr in iload(fn): 

183 print(tr)