1import time
2import logging
3import numpy as num
4from pyrocko import util, trace
6logger = logging.getLogger('pyrocko.streaming.datacube')
8cube_rates = num.array([50., 100., 200., 400., 800.], dtype=float)
11def quotechars(chars):
12 return ''.join(['.', chr(c)][chr(c).isalnum()] for c in chars)
15def hexdump(chars, sep=' ', width=16):
16 while chars:
17 line = chars[:width]
18 chars = chars[width:]
19 line = line.ljust(width, b'\000')
21 print("%s%s%s" % (
22 sep.join("%02x" % c for c in line),
23 sep, quotechars(line)))
26def read_wait(s, nread):
27 buf = b''
28 while True:
29 buf += s.read(nread - len(buf))
30 if len(buf) == nread:
31 return buf
33 logger.info('Incomplete read from serial port. Waiting...')
34 time.sleep(1.0)
37def read_blocktype(s):
38 buf = read_wait(s, 1)
39 return buf[0] >> 4
42def determine_nchannels(s):
43 while True:
44 datablock_starts = []
45 for i in range(1024):
46 datablock_starts.append(read_blocktype(s) in (8, 9))
48 if len(datablock_starts) > 50:
49 dd = num.diff(num.where(datablock_starts)[0]) - 1
50 dd = dd[dd % 4 == 0]
52 if dd.size > 50:
53 nchannels = dd // 4
54 nchannels_hist, _ = num.histogram(nchannels, num.arange(5)-1)
55 nchannels = num.argmax(nchannels_hist)
56 if nchannels_hist[nchannels] > 50:
57 return nchannels
59 logger.info('Could not determine number of channels. Retrying...')
62def sync(s, nchannels, nok_want=10):
63 nok = 0
64 while nok < nok_want:
65 blocktype = read_blocktype(s)
66 if blocktype in (8, 9):
67 read_wait(s, 4*nchannels)
68 nok += 1
69 elif blocktype == 10:
70 read_wait(s, 79)
71 nok += 1
72 else:
73 nok = 0
76def read_sample(s, nchannels):
78 b = read_wait(s, nchannels*4)
80 sample = []
82 for i in range(nchannels):
83 v = b[i*4 + 0] << 17
84 v += b[i*4 + 1] << 10
85 v += b[i*4 + 2] << 3
86 v += b[i*4 + 3]
87 v -= (v & 0x800000) << 1
88 sample.append(v)
90 return sample
93class SerialCubeError(Exception):
94 pass
97class SerialCube(object):
99 def __init__(
100 self,
101 device='/dev/ttyUSB0',
102 network='',
103 station='CUBE',
104 location='',
105 timeout=5.0):
107 # fixed
108 self._network = network
109 self._station = station
110 self._location = location
111 self._device = device
112 self._timeout = timeout
113 self._baudrate = 115200
114 self._nsamples_buffer = 100
116 # state
117 self._init_state()
119 def _init_state(self):
120 self._serial = None
121 self._nchannels = None
122 self._tstart = None
123 self._rate = None
124 self._buffer = None
125 self._isamples_buffer = None
126 self._nsamples_read = 0
128 def acquisition_start(self):
129 import serial
130 assert self._serial is None
131 try:
132 self._serial = serial.Serial(
133 port=self._device,
134 baudrate=self._baudrate,
135 timeout=self._timeout)
137 except serial.serialutil.SerialException as e:
138 raise SerialCubeError(
139 'Opening serial interface failed: %s' % str(e))
141 nchannels = determine_nchannels(self._serial)
142 logger.info('Number of channels: %i' % nchannels)
144 sync(self._serial, nchannels)
145 self._nchannels = nchannels
147 def acquisition_stop(self):
148 assert self._serial is not None
149 self._serial.close()
150 self._init_state()
152 def process(self):
153 assert self._serial is not None
155 blocktype = read_blocktype(self._serial)
156 if blocktype in (8, 9):
157 if self._buffer is None:
158 self._buffer = num.zeros(
159 (self._nsamples_buffer, self._nchannels),
160 dtype=num.float32)
161 if self._tstart is None:
162 self._tstart = time.time()
164 self._isamples_buffer = 0
166 self._buffer[self._isamples_buffer, :] = read_sample(
167 self._serial, self._nchannels)
169 self._isamples_buffer += 1
171 elif blocktype == 10:
172 read_wait(self._serial, 79)
174 if self._isamples_buffer == self._nsamples_buffer:
175 if self._rate is None:
176 tdur = time.time() - self._tstart
177 assert tdur > 0.
178 rate_approx = self._nsamples_buffer / tdur
179 self._rate = cube_rates[
180 num.argmin(num.abs(rate_approx - cube_rates))]
182 logger.info('Sample rate [Hz]: %g' % self._rate)
184 deltat = 1.0 / self._rate
185 t = self._tstart + self._nsamples_read * deltat
186 for ichannel in range(self._nchannels):
187 tr = trace.Trace(
188 self._network,
189 self._station,
190 self._location,
191 'p%i' % ichannel,
192 ydata=self._buffer[:, ichannel],
193 tmin=t,
194 deltat=1.0/self._rate)
196 self.got_trace(tr)
198 self._nsamples_read += self._nsamples_buffer
199 self._buffer = None
200 self._isamples_buffer = None
202 def got_trace(self, tr):
203 logger.info(
204 'Got trace from DATA-CUBE: %s, mean: %g, std: %g' % (
205 tr.summary, num.mean(tr.ydata), num.std(tr.ydata)))
208def main():
209 import sys
210 util.setup_logging('main', 'info')
211 if len(sys.argv) != 2:
212 sys.exit('usage: python -m pyrocko.streaming.datacube DEVICE')
214 device = sys.argv[1]
215 cs = SerialCube(device)
216 try:
217 cs.acquisition_start()
218 except SerialCubeError as e:
219 sys.exit(str(e))
221 try:
222 while True:
223 cs.process()
225 except KeyboardInterrupt:
226 pass
228 finally:
229 cs.acquisition_stop()
232if __name__ == '__main__':
233 main()