1import time 

2import logging 

3import numpy as num 

4from pyrocko import util, trace 

5 

6logger = logging.getLogger('pyrocko.streaming.datacube') 

7 

8cube_rates = num.array([50., 100., 200., 400., 800.], dtype=float) 

9 

10 

11def quotechars(chars): 

12 return ''.join(['.', chr(c)][chr(c).isalnum()] for c in chars) 

13 

14 

15def hexdump(chars, sep=' ', width=16): 

16 while chars: 

17 line = chars[:width] 

18 chars = chars[width:] 

19 line = line.ljust(width, b'\000') 

20 

21 print("%s%s%s" % ( 

22 sep.join("%02x" % c for c in line), 

23 sep, quotechars(line))) 

24 

25 

26def read_wait(s, nread): 

27 buf = b'' 

28 while True: 

29 buf += s.read(nread - len(buf)) 

30 if len(buf) == nread: 

31 return buf 

32 

33 logger.info('Incomplete read from serial port. Waiting...') 

34 time.sleep(1.0) 

35 

36 

37def read_blocktype(s): 

38 buf = read_wait(s, 1) 

39 return buf[0] >> 4 

40 

41 

42def determine_nchannels(s): 

43 while True: 

44 datablock_starts = [] 

45 for i in range(1024): 

46 datablock_starts.append(read_blocktype(s) in (8, 9)) 

47 

48 if len(datablock_starts) > 50: 

49 dd = num.diff(num.where(datablock_starts)[0]) - 1 

50 dd = dd[dd % 4 == 0] 

51 

52 if dd.size > 50: 

53 nchannels = dd // 4 

54 nchannels_hist, _ = num.histogram(nchannels, num.arange(5)-1) 

55 nchannels = num.argmax(nchannels_hist) 

56 if nchannels_hist[nchannels] > 50: 

57 return nchannels 

58 

59 logger.info('Could not determine number of channels. Retrying...') 

60 

61 

62def sync(s, nchannels, nok_want=10): 

63 nok = 0 

64 while nok < nok_want: 

65 blocktype = read_blocktype(s) 

66 if blocktype in (8, 9): 

67 read_wait(s, 4*nchannels) 

68 nok += 1 

69 elif blocktype == 10: 

70 read_wait(s, 79) 

71 nok += 1 

72 else: 

73 nok = 0 

74 

75 

76def read_sample(s, nchannels): 

77 

78 b = read_wait(s, nchannels*4) 

79 

80 sample = [] 

81 

82 for i in range(nchannels): 

83 v = b[i*4 + 0] << 17 

84 v += b[i*4 + 1] << 10 

85 v += b[i*4 + 2] << 3 

86 v += b[i*4 + 3] 

87 v -= (v & 0x800000) << 1 

88 sample.append(v) 

89 

90 return sample 

91 

92 

93class SerialCubeError(Exception): 

94 pass 

95 

96 

97class SerialCube(object): 

98 

99 def __init__( 

100 self, 

101 device='/dev/ttyUSB0', 

102 network='', 

103 station='CUBE', 

104 location='', 

105 timeout=5.0): 

106 

107 # fixed 

108 self._network = network 

109 self._station = station 

110 self._location = location 

111 self._device = device 

112 self._timeout = timeout 

113 self._baudrate = 115200 

114 self._nsamples_buffer = 100 

115 

116 # state 

117 self._init_state() 

118 

119 def _init_state(self): 

120 self._serial = None 

121 self._nchannels = None 

122 self._tstart = None 

123 self._rate = None 

124 self._buffer = None 

125 self._isamples_buffer = None 

126 self._nsamples_read = 0 

127 

128 def acquisition_start(self): 

129 import serial 

130 assert self._serial is None 

131 try: 

132 self._serial = serial.Serial( 

133 port=self._device, 

134 baudrate=self._baudrate, 

135 timeout=self._timeout) 

136 

137 except serial.serialutil.SerialException as e: 

138 raise SerialCubeError( 

139 'Opening serial interface failed: %s' % str(e)) 

140 

141 nchannels = determine_nchannels(self._serial) 

142 logger.info('Number of channels: %i' % nchannels) 

143 

144 sync(self._serial, nchannels) 

145 self._nchannels = nchannels 

146 

147 def acquisition_stop(self): 

148 assert self._serial is not None 

149 self._serial.close() 

150 self._init_state() 

151 

152 def process(self): 

153 assert self._serial is not None 

154 

155 blocktype = read_blocktype(self._serial) 

156 if blocktype in (8, 9): 

157 if self._buffer is None: 

158 self._buffer = num.zeros( 

159 (self._nsamples_buffer, self._nchannels), 

160 dtype=num.float32) 

161 if self._tstart is None: 

162 self._tstart = time.time() 

163 

164 self._isamples_buffer = 0 

165 

166 self._buffer[self._isamples_buffer, :] = read_sample( 

167 self._serial, self._nchannels) 

168 

169 self._isamples_buffer += 1 

170 

171 elif blocktype == 10: 

172 read_wait(self._serial, 79) 

173 

174 if self._isamples_buffer == self._nsamples_buffer: 

175 if self._rate is None: 

176 tdur = time.time() - self._tstart 

177 assert tdur > 0. 

178 rate_approx = self._nsamples_buffer / tdur 

179 self._rate = cube_rates[ 

180 num.argmin(num.abs(rate_approx - cube_rates))] 

181 

182 logger.info('Sample rate [Hz]: %g' % self._rate) 

183 

184 deltat = 1.0 / self._rate 

185 t = self._tstart + self._nsamples_read * deltat 

186 for ichannel in range(self._nchannels): 

187 tr = trace.Trace( 

188 self._network, 

189 self._station, 

190 self._location, 

191 'p%i' % ichannel, 

192 ydata=self._buffer[:, ichannel], 

193 tmin=t, 

194 deltat=1.0/self._rate) 

195 

196 self.got_trace(tr) 

197 

198 self._nsamples_read += self._nsamples_buffer 

199 self._buffer = None 

200 self._isamples_buffer = None 

201 

202 def got_trace(self, tr): 

203 logger.info( 

204 'Got trace from DATA-CUBE: %s, mean: %g, std: %g' % ( 

205 tr.summary, num.mean(tr.ydata), num.std(tr.ydata))) 

206 

207 

208def main(): 

209 import sys 

210 util.setup_logging('main', 'info') 

211 if len(sys.argv) != 2: 

212 sys.exit('usage: python -m pyrocko.streaming.datacube DEVICE') 

213 

214 device = sys.argv[1] 

215 cs = SerialCube(device) 

216 try: 

217 cs.acquisition_start() 

218 except SerialCubeError as e: 

219 sys.exit(str(e)) 

220 

221 try: 

222 while True: 

223 cs.process() 

224 

225 except KeyboardInterrupt: 

226 pass 

227 

228 finally: 

229 cs.acquisition_stop() 

230 

231 

232if __name__ == '__main__': 

233 main()