Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/io/gse1.py: 85%
67 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-03-07 11:54 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-03-07 11:54 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Reader for GSE1 files.
8'''
10import sys
11import numpy as num
13from .io_common import FileLoadError
14from pyrocko import util, trace
class GSE1LoadError(FileLoadError):
    '''
    Error raised by this module when a file cannot be read as GSE1.
    '''
class EOF(Exception):
    '''
    Signals that no further WID1 header could be read from the file.
    '''
def read_xw01(f):
    '''
    Consume the two-line ``XW01`` preamble from an open file.

    :param f: file-like object opened in text mode
    :raises GSE1LoadError: if the first line does not begin with ``XW01``
    '''
    marker = f.readline()[:4]
    if marker != 'XW01':
        raise GSE1LoadError(
            '"XW01" marker not found, maybe this is not a GSE1 file')

    # The line after the marker is not used; read past it.
    f.readline()
def read_wid1(f):
    '''
    Parse the two-line WID1 header found at the current file position.

    :param f: file-like object opened in text mode
    :returns: tuple ``(tmin, nsamples, sta, channel_id, channel_name,
        sample_rate, system_type, data_format, diff_flag, gain, units,
        calib_period, lat, lon, elevation, depth, beam_azimuth,
        beam_slowness, horizontal_orientation)``
    :raises EOF: if the first header line is empty or whitespace only
    :raises GSE1LoadError: if the first field is not ``WID1``
    '''
    first_format = (
        'a4,x1,a17,x1,i3,x1,i8,x1,a6,x1,a8,x1,a2,x1,f11,x1,a6,x1,a4,x1,i1')
    second_format = 'f9,i1,f7,x1,f9,x1,f9,x1,f9,x1,f9,x1,f7,x1,f7,x1,f6'

    first = f.readline()
    if not first.strip():
        # readline() yields '' at end of file; a blank line is treated alike.
        raise EOF()

    # Only the first 80 characters of each header line are interpreted.
    (wid1, stmin, imilli, nsamples, sta, channel_id, channel_name,
     sample_rate, system_type, data_format, diff_flag) = util.unpack_fixed(
        first_format, first[:80])

    if wid1 != 'WID1':
        raise GSE1LoadError('"WID1" marker expected but not found.')

    # Start time is given as year + day-of-year and a separate millisecond
    # field.
    seconds = util.str_to_time(stmin, format='%Y%j %H %M %S')
    tmin = seconds + 0.001*imilli

    second = f.readline()
    (gain, units, calib_period, lat, lon, elevation, depth, beam_azimuth,
     beam_slowness, horizontal_orientation) = util.unpack_fixed(
        second_format, second[:80])

    return (tmin, nsamples, sta, channel_id, channel_name, sample_rate,
            system_type, data_format, diff_flag, gain, units, calib_period,
            lat, lon, elevation, depth, beam_azimuth, beam_slowness,
            horizontal_orientation)
def read_dat1_chk1(f, data_format, diff_flag, nsamples):
    '''
    Read a DAT1 sample section and the CHK1 line that follows it.

    Only ``data_format == 'INTV'`` with ``diff_flag == 0`` is handled:
    whitespace-separated values are collected line by line until *nsamples*
    values have been read.

    :param f: file-like object opened in text mode, positioned at the
        ``DAT1`` line
    :param data_format: data format code from the WID1 header
    :param diff_flag: differencing flag from the WID1 header
    :param nsamples: number of samples to read
    :returns: tuple ``(data, checksum)`` where *data* is an integer NumPy
        array of length *nsamples* and *checksum* is the integer parsed from
        the CHK1 line
    :raises GSE1LoadError: on missing markers, unsupported format,
        non-numeric sample values, a file that ends before all samples have
        been read, or an unparsable CHK1 line
    '''
    if f.readline()[:4] != 'DAT1':
        raise GSE1LoadError('"DAT1" marker expected but not found.')

    if data_format != 'INTV' or diff_flag != 0:
        raise GSE1LoadError(
            'GSE1 data format %s with differencing=%i not supported.' %
            (data_format, diff_flag))

    samples = []
    while len(samples) < nsamples:
        line = f.readline()
        if not line:
            # readline() returns '' at end of file. Without this check the
            # loop would never terminate on a truncated file.
            raise GSE1LoadError(
                'Unexpected end of file in DAT1 section: expected %i '
                'samples but found only %i.' % (nsamples, len(samples)))

        try:
            samples.extend(map(float, line.split()))
        except ValueError:
            raise GSE1LoadError(
                'Invalid sample value in DAT1 section: %r' % line.strip())

    data = num.array(samples[:nsamples], dtype=int)

    line = f.readline()
    if not line.startswith('CHK1'):
        raise GSE1LoadError('"CHK1" marker expected but not found.')

    try:
        checksum = int(line.split()[1])
    except (IndexError, ValueError):
        raise GSE1LoadError('could not parse CHK1 section')

    # The line after CHK1 is not used; read past it.
    f.readline()

    return data, checksum
def skip_dat1_chk1(f, data_format, diff_flag, nsamples):
    '''
    Advance past a DAT1 sample section and its CHK1 line without parsing.

    :param f: file-like object opened in text mode, positioned at the
        ``DAT1`` line
    :param data_format: unused; accepted for signature parity with
        :py:func:`read_dat1_chk1`
    :param diff_flag: unused, see *data_format*
    :param nsamples: unused, see *data_format*
    :raises GSE1LoadError: if the ``DAT1`` marker is missing or the file
        ends before a line starting with ``CHK1`` is found
    '''
    if f.readline()[:4] != 'DAT1':
        raise GSE1LoadError('"DAT1" marker expected but not found.')

    # iter(callable, sentinel) stops when readline() returns '' (end of
    # file), so a truncated file ends the scan instead of spinning forever.
    for line in iter(f.readline, ''):
        if line.startswith('CHK1'):
            break
    else:
        raise GSE1LoadError('"CHK1" marker expected but not found.')

    # The line after CHK1 is not used; read past it.
    f.readline()
def iload(filename, load_data=True):
    '''
    Iterate over the traces stored in a GSE1 file.

    :param filename: path of the file to read
    :param load_data: if true, sample data is read and attached to each
        trace; otherwise the data sections are skipped and only the time
        span derived from the header is set
    :returns: generator yielding one :py:class:`trace.Trace` per WID1
        section, until :py:func:`read_wid1` signals :py:exc:`EOF`
    '''
    with open(filename, 'r') as f:
        read_xw01(f)
        try:
            while True:
                header = read_wid1(f)
                (tmin, nsamples, sta, _, cha, sample_rate, _, data_format,
                 diff_flag, _) = header[:10]

                deltat = 1.0/sample_rate
                if not load_data:
                    skip_dat1_chk1(f, data_format, diff_flag, nsamples)
                    ydata = None
                    # No samples available, so the end time is computed
                    # from the header's sample count.
                    tmax = tmin + (nsamples-1)*deltat
                else:
                    # The parsed checksum value is not used here.
                    ydata, _ = read_dat1_chk1(
                        f, data_format, diff_flag, nsamples)
                    tmax = None

                yield trace.Trace(
                    '', sta, '', cha,
                    tmin=tmin,
                    tmax=tmax,
                    deltat=deltat,
                    ydata=ydata)

        except EOF:
            # Regular termination: no more WID1 sections.
            pass
def detect(first512):
    '''
    Check whether a leading chunk of file content looks like GSE1.

    :param first512: bytes from the beginning of the file
    :returns: ``True`` if there are at least five lines and lines 1, 3 and 5
        start with ``XW01``, ``WID1`` and ``DAT1`` respectively, otherwise
        ``False``
    '''
    lines = first512.splitlines()
    if len(lines) < 5:
        return False

    expected = ((0, b'XW01'), (2, b'WID1'), (4, b'DAT1'))
    return all(lines[index].startswith(tag) for index, tag in expected)
if __name__ == '__main__':
    # Collect the traces of every file named on the command line and pass
    # them to trace.snuffle().
    all_traces = [tr for fn in sys.argv[1:] for tr in iload(fn)]

    trace.snuffle(all_traces)