1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

# http://pyrocko.org - GPLv3 

# 

# The Pyrocko Developers, 21st Century 

# ---|P------/S----------~Lg---------- 

from __future__ import absolute_import, division 

from builtins import map 

 

import sys 

import numpy as num 

 

from .io_common import FileLoadError 

from pyrocko import util, trace 

 

 

class GSE1LoadError(FileLoadError):
    """Error raised by this module when a file cannot be read as GSE1.

    The readers below raise it when an expected section marker (``XW01``,
    ``WID1``, ``DAT1``, ``CHK1``) is missing, when the data format is not
    supported, or when the ``CHK1`` line cannot be parsed.
    """

 

 

class EOF(Exception):
    """Internal end-of-input signal.

    ``read_wid1`` raises it when the line where a header should start is
    empty or blank; ``iload`` catches it to stop iterating.
    """

 

 

def read_xw01(f):
    """Consume the two-line ``XW01`` preamble at the current position of *f*.

    The first line read must begin with the marker ``XW01``. The line after
    it is read and discarded.

    :param f: file-like object providing ``readline()`` (text mode)
    :raises GSE1LoadError: if the first line does not start with ``XW01``
    """
    marker = f.readline()[:4]
    if marker != 'XW01':
        raise GSE1LoadError(
            '"XW01" marker not found, maybe this is not a GSE1 file')

    # The line following the marker carries nothing this reader uses.
    f.readline()

 

 

def read_wid1(f):
    """Read and decode the two header lines of a ``WID1`` section from *f*.

    Both lines are cut to their first 80 characters and split into
    fixed-width fields with ``util.unpack_fixed``. The start time is built
    from the date/time field of the first line (parsed with the format
    ``'%Y%j %H %M %S'``) plus the millisecond field scaled by 0.001.

    :param f: file-like object providing ``readline()`` (text mode)
    :returns: tuple ``(tmin, nsamples, sta, channel_id, channel_name,
        sample_rate, system_type, data_format, diff_flag, gain, units,
        calib_period, lat, lon, elevation, depth, beam_azimuth,
        beam_slowness, horizontal_orientation)``
    :raises EOF: if the first line read is empty or only whitespace
    :raises GSE1LoadError: if the first field of the first line is not
        ``'WID1'``
    """
    first_line = f.readline()
    if not first_line.strip():
        # Nothing (or only whitespace) where a header should start.
        raise EOF()

    first_line_format = (
        'a4,x1,a17,x1,i3,x1,i8,x1,a6,x1,a8,x1,a2,x1,f11,x1,a6,x1,a4,x1,i1')

    (marker, time_string, millis, nsamples, sta, channel_id, channel_name,
     sample_rate, system_type, data_format, diff_flag) = util.unpack_fixed(
        first_line_format, first_line[:80])

    if marker != 'WID1':
        raise GSE1LoadError('"WID1" marker expected but not found.')

    whole_seconds = util.str_to_time(time_string, format='%Y%j %H %M %S')
    tmin = whole_seconds + 0.001*millis

    second_line = f.readline()
    second_line_format = 'f9,i1,f7,x1,f9,x1,f9,x1,f9,x1,f9,x1,f7,x1,f7,x1,f6'

    (gain, units, calib_period, lat, lon, elevation, depth, beam_azimuth,
     beam_slowness, horizontal_orientation) = util.unpack_fixed(
        second_line_format, second_line[:80])

    return (
        tmin, nsamples, sta, channel_id, channel_name, sample_rate,
        system_type, data_format, diff_flag, gain, units, calib_period,
        lat, lon, elevation, depth, beam_azimuth, beam_slowness,
        horizontal_orientation)

 

 

def read_dat1_chk1(f, data_format, diff_flag, nsamples):
    """Read a ``DAT1`` sample section and the ``CHK1`` line following it.

    Only data format ``'INTV'`` without differencing (``diff_flag == 0``) is
    handled. Samples are whitespace-separated numbers spread over as many
    lines as are needed to collect *nsamples* values; surplus values on the
    last line read are dropped. After the ``CHK1`` line, one more line is
    read and discarded.

    :param f: file-like object providing ``readline()`` (text mode),
        positioned at the ``DAT1`` line
    :param data_format: data format code from the ``WID1`` header
    :param diff_flag: differencing flag from the ``WID1`` header
    :param nsamples: number of samples to read
    :returns: tuple ``(data, checksum)``: *data* is an integer NumPy array
        with *nsamples* elements, *checksum* is the integer found on the
        ``CHK1`` line (it is returned as read, not verified here)
    :raises GSE1LoadError: on a missing ``DAT1`` or ``CHK1`` marker, an
        unsupported format, a file ending before all samples were read, or
        an unparsable ``CHK1`` line
    """
    if f.readline()[:4] != 'DAT1':
        raise GSE1LoadError('"DAT1" marker expected but not found.')

    if data_format != 'INTV' or diff_flag != 0:
        raise GSE1LoadError(
            'GSE1 data format %s with differencing=%i not supported.' %
            (data_format, diff_flag))

    samples = []
    while len(samples) < nsamples:
        line = f.readline()
        if not line:
            # readline() returns '' only at end of file. Without this check
            # a truncated file would keep the loop spinning forever.
            raise GSE1LoadError(
                'Unexpected end of file in "DAT1" section: expected %i '
                'samples, got %i.' % (nsamples, len(samples)))

        samples.extend(map(float, line.split()))

    # Builtin int instead of the num.int alias, which newer NumPy releases
    # no longer provide (it was only ever an alias for the builtin).
    data = num.array(samples[:nsamples], dtype=int)

    line = f.readline()
    if not line.startswith('CHK1'):
        raise GSE1LoadError('"CHK1" marker expected but not found.')

    try:
        checksum = int(line.split()[1])
    except (IndexError, ValueError):
        # IndexError: no value after the marker; ValueError: not an integer.
        raise GSE1LoadError('could not parse CHK1 section')

    # Skip the line that follows the CHK1 line.
    f.readline()

    return data, checksum

 

 

def skip_dat1_chk1(f, data_format, diff_flag, nsamples):
    """Advance *f* past a ``DAT1`` section without decoding its samples.

    Lines are read and dropped until one starting with ``CHK1`` is found;
    one further line after it is read and discarded as well.

    *data_format*, *diff_flag* and *nsamples* are not used; the signature
    parallels that of ``read_dat1_chk1``.

    :param f: file-like object providing ``readline()`` (text mode),
        positioned at the ``DAT1`` line
    :raises GSE1LoadError: if the first line does not start with ``DAT1``
        or the file ends before a ``CHK1`` line is found
    """
    if f.readline()[:4] != 'DAT1':
        raise GSE1LoadError('"DAT1" marker expected but not found.')

    while True:
        line = f.readline()
        if line.startswith('CHK1'):
            break

        if not line:
            # readline() returns '' only at end of file. Without this check
            # a truncated file would keep the loop spinning forever.
            raise GSE1LoadError('"CHK1" marker expected but not found.')

    # Skip the line that follows the CHK1 line.
    f.readline()

 

 

def iload(filename, load_data=True):
    """Iterate over the traces stored in the GSE1 file *filename*.

    The file is opened in text mode. After the ``XW01`` preamble, ``WID1``
    sections are read one after another until ``read_wid1`` signals the end
    of input by raising ``EOF``.

    :param filename: path of the file to read
    :param load_data: if true, samples are read and passed to the trace as
        ``ydata`` with ``tmax=None``; otherwise the data section is skipped,
        ``ydata`` is ``None`` and ``tmax`` is set to
        ``tmin + (nsamples-1)*deltat``
    :returns: generator yielding one ``trace.Trace`` per ``WID1`` section,
        with ``deltat`` set to ``1.0/sample_rate``
    """
    with open(filename, 'r') as f:
        read_xw01(f)
        try:
            while True:
                header = read_wid1(f)
                # Only some of the first ten header values are needed here;
                # the underscore-prefixed ones are unpacked but unused.
                (tmin, nsamples, sta, _chid, cha, sample_rate, _system_type,
                 data_format, diff_flag, _gain) = header[:10]

                deltat = 1.0/sample_rate
                if not load_data:
                    skip_dat1_chk1(f, data_format, diff_flag, nsamples)
                    ydata = None
                    tmax = tmin + (nsamples-1)*deltat
                else:
                    # The checksum is read but not evaluated.
                    ydata, _checksum = read_dat1_chk1(
                        f, data_format, diff_flag, nsamples)
                    tmax = None

                yield trace.Trace(
                    '', sta, '', cha,
                    tmin=tmin, tmax=tmax, deltat=deltat, ydata=ydata)

        except EOF:
            # Regular end of input: no further WID1 section.
            pass

 

 

def detect(first512):
    """Tell whether *first512* looks like the beginning of a GSE1 file.

    The bytes are split into lines. The result is ``True`` only if there
    are at least five lines and the first, third and fifth start with
    ``XW01``, ``WID1`` and ``DAT1`` respectively.

    :param first512: leading bytes of the file (a ``bytes`` object)
    :returns: ``True`` or ``False``
    """
    rows = first512.splitlines()
    if len(rows) < 5:
        return False

    # (line index, marker expected at the start of that line)
    expected = ((0, b'XW01'), (2, b'WID1'), (4, b'DAT1'))
    return all(rows[index].startswith(marker) for index, marker in expected)

 

 

if __name__ == '__main__':
    # Script mode: load the traces of every file named on the command line
    # and pass them all to trace.snuffle().
    all_traces = [tr for fn in sys.argv[1:] for tr in iload(fn)]
    trace.snuffle(all_traces)