Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/io/gse1.py: 85%

67 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-06 15:01 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Reader for GSE1 files. 

8''' 

9 

10import sys 

11import numpy as num 

12 

13from .io_common import FileLoadError 

14from pyrocko import util, trace 

15 

16 

17class GSE1LoadError(FileLoadError): 

18 pass 

19 

20 

21class EOF(Exception): 

22 pass 

23 

24 

25def read_xw01(f): 

26 line = f.readline() 

27 if not line[:4] == 'XW01': 

28 raise GSE1LoadError( 

29 '"XW01" marker not found, maybe this is not a GSE1 file') 

30 

31 f.readline() 

32 

33 

34def read_wid1(f): 

35 line = f.readline() 

36 if not line.strip(): 

37 raise EOF() 

38 

39 (wid1, stmin, imilli, nsamples, sta, channel_id, channel_name, sample_rate, 

40 system_type, data_format, diff_flag) = util.unpack_fixed( 

41 'a4,x1,a17,x1,i3,x1,i8,x1,a6,x1,a8,x1,a2,x1,f11,x1,a6,x1,a4,x1,i1', 

42 line[:80]) 

43 

44 if wid1 != 'WID1': 

45 raise GSE1LoadError('"WID1" marker expected but not found.') 

46 

47 tmin = util.str_to_time(stmin, format='%Y%j %H %M %S') + 0.001*imilli 

48 

49 line = f.readline() 

50 (gain, units, calib_period, lat, lon, elevation, depth, beam_azimuth, 

51 beam_slowness, horizontal_orientation) = util.unpack_fixed( 

52 'f9,i1,f7,x1,f9,x1,f9,x1,f9,x1,f9,x1,f7,x1,f7,x1,f6', line[:80]) 

53 

54 return (tmin, nsamples, sta, channel_id, channel_name, sample_rate, 

55 system_type, data_format, diff_flag, gain, units, calib_period, 

56 lat, lon, elevation, depth, beam_azimuth, beam_slowness, 

57 horizontal_orientation) 

58 

59 

60def read_dat1_chk1(f, data_format, diff_flag, nsamples): 

61 dat1 = f.readline()[:4] 

62 if dat1 != 'DAT1': 

63 raise GSE1LoadError('"DAT1" marker expected but not found.') 

64 

65 if data_format == 'INTV' and diff_flag == 0: 

66 samples = [] 

67 while len(samples) < nsamples: 

68 samples.extend(map(float, f.readline().split())) 

69 

70 data = num.array(samples[:nsamples], dtype=int) 

71 

72 else: 

73 raise GSE1LoadError( 

74 'GSE1 data format %s with differencing=%i not supported.' % 

75 (data_format, diff_flag)) 

76 

77 line = f.readline() 

78 if not line.startswith('CHK1'): 

79 raise GSE1LoadError('"CHK1" marker expected but not found.') 

80 

81 t = line.split() 

82 try: 

83 checksum = int(t[1]) 

84 except Exception: 

85 raise GSE1LoadError('could not parse CHK1 section') 

86 

87 f.readline() 

88 

89 return data, checksum 

90 

91 

92def skip_dat1_chk1(f, data_format, diff_flag, nsamples): 

93 dat1 = f.readline()[:4] 

94 if dat1 != 'DAT1': 

95 raise GSE1LoadError('"DAT1" marker expected but not found.') 

96 

97 while True: 

98 if f.readline().startswith('CHK1'): 

99 break 

100 

101 f.readline() 

102 

103 

104def iload(filename, load_data=True): 

105 with open(filename, 'r') as f: 

106 read_xw01(f) 

107 try: 

108 while True: 

109 h = read_wid1(f) 

110 (tmin, nsamples, sta, chid, cha, sample_rate, _, data_format, 

111 diff_flag, gain) = h[:10] 

112 

113 deltat = 1.0/sample_rate 

114 if load_data: 

115 ydata, checksum = read_dat1_chk1( 

116 f, data_format, diff_flag, nsamples) 

117 tmax = None 

118 else: 

119 skip_dat1_chk1(f, data_format, diff_flag, nsamples) 

120 ydata = None 

121 tmax = tmin + (nsamples-1)*deltat 

122 

123 yield trace.Trace( 

124 '', sta, '', cha, 

125 tmin=tmin, 

126 tmax=tmax, 

127 deltat=deltat, 

128 ydata=ydata) 

129 

130 except EOF: 

131 pass 

132 

133 

134def detect(first512): 

135 lines = first512.splitlines() 

136 if len(lines) >= 5 and \ 

137 lines[0].startswith(b'XW01') and lines[2].startswith(b'WID1') and \ 

138 lines[4].startswith(b'DAT1'): 

139 return True 

140 

141 return False 

142 

143 

144if __name__ == '__main__': 

145 all_traces = [] 

146 for fn in sys.argv[1:]: 

147 all_traces.extend(iload(fn)) 

148 

149 trace.snuffle(all_traces)