Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/streaming/datacube.py: 17%
144 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-03-07 11:54 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-03-07 11:54 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Live stream reader for `DiGOS DATA-CUBE³
8<https://digos.eu/the-seismic-data-recorder/>`_ digitizers
9'''
11import time
12import logging
13import numpy as num
14from pyrocko import util, trace
16logger = logging.getLogger('pyrocko.streaming.datacube')
18cube_rates = num.array([50., 100., 200., 400., 800.], dtype=float)
21def quotechars(chars):
22 return ''.join(['.', chr(c)][chr(c).isalnum()] for c in chars)
25def hexdump(chars, sep=' ', width=16):
26 while chars:
27 line = chars[:width]
28 chars = chars[width:]
29 line = line.ljust(width, b'\000')
31 print('%s%s%s' % (
32 sep.join('%02x' % c for c in line),
33 sep, quotechars(line)))
36def read_wait(s, nread):
37 buf = b''
38 while True:
39 buf += s.read(nread - len(buf))
40 if len(buf) == nread:
41 return buf
43 logger.info('Incomplete read from serial port. Waiting...')
44 time.sleep(1.0)
47def read_blocktype(s):
48 buf = read_wait(s, 1)
49 return buf[0] >> 4
52def determine_nchannels(s):
53 while True:
54 datablock_starts = []
55 for i in range(1024):
56 datablock_starts.append(read_blocktype(s) in (8, 9))
58 if len(datablock_starts) > 50:
59 dd = num.diff(num.where(datablock_starts)[0]) - 1
60 dd = dd[dd % 4 == 0]
62 if dd.size > 50:
63 nchannels = dd // 4
64 nchannels_hist, _ = num.histogram(nchannels, num.arange(5)-1)
65 nchannels = num.argmax(nchannels_hist)
66 if nchannels_hist[nchannels] > 50:
67 return nchannels
69 logger.info('Could not determine number of channels. Retrying...')
72def sync(s, nchannels, nok_want=10):
73 nok = 0
74 while nok < nok_want:
75 blocktype = read_blocktype(s)
76 if blocktype in (8, 9):
77 read_wait(s, 4*nchannels)
78 nok += 1
79 elif blocktype == 10:
80 read_wait(s, 79)
81 nok += 1
82 else:
83 nok = 0
86def read_sample(s, nchannels):
88 b = read_wait(s, nchannels*4)
90 sample = []
92 for i in range(nchannels):
93 v = b[i*4 + 0] << 17
94 v += b[i*4 + 1] << 10
95 v += b[i*4 + 2] << 3
96 v += b[i*4 + 3]
97 v -= (v & 0x800000) << 1
98 sample.append(v)
100 return sample
103class SerialCubeError(Exception):
104 pass
107class SerialCube(object):
109 def __init__(
110 self,
111 device='/dev/ttyUSB0',
112 network='',
113 station='CUBE',
114 location='',
115 timeout=5.0):
117 # fixed
118 self._network = network
119 self._station = station
120 self._location = location
121 self._device = device
122 self._timeout = timeout
123 self._baudrate = 115200
124 self._nsamples_buffer = 100
126 # state
127 self._init_state()
129 def _init_state(self):
130 self._serial = None
131 self._nchannels = None
132 self._tstart = None
133 self._rate = None
134 self._buffer = None
135 self._isamples_buffer = None
136 self._nsamples_read = 0
138 def acquisition_start(self):
139 import serial
140 assert self._serial is None
141 try:
142 self._serial = serial.Serial(
143 port=self._device,
144 baudrate=self._baudrate,
145 timeout=self._timeout)
147 except serial.serialutil.SerialException as e:
148 raise SerialCubeError(
149 'Opening serial interface failed: %s' % str(e))
151 nchannels = determine_nchannels(self._serial)
152 logger.info('Number of channels: %i' % nchannels)
154 sync(self._serial, nchannels)
155 self._nchannels = nchannels
157 def acquisition_stop(self):
158 assert self._serial is not None
159 self._serial.close()
160 self._init_state()
162 def process(self):
163 assert self._serial is not None
165 blocktype = read_blocktype(self._serial)
166 if blocktype in (8, 9):
167 if self._buffer is None:
168 self._buffer = num.zeros(
169 (self._nsamples_buffer, self._nchannels),
170 dtype=num.float32)
171 if self._tstart is None:
172 self._tstart = time.time()
174 self._isamples_buffer = 0
176 self._buffer[self._isamples_buffer, :] = read_sample(
177 self._serial, self._nchannels)
179 self._isamples_buffer += 1
181 elif blocktype == 10:
182 read_wait(self._serial, 79)
184 if self._isamples_buffer == self._nsamples_buffer:
185 if self._rate is None:
186 tdur = time.time() - self._tstart
187 assert tdur > 0.
188 rate_approx = self._nsamples_buffer / tdur
189 self._rate = cube_rates[
190 num.argmin(num.abs(rate_approx - cube_rates))]
192 logger.info('Sample rate [Hz]: %g' % self._rate)
194 deltat = 1.0 / self._rate
195 t = self._tstart + self._nsamples_read * deltat
196 for ichannel in range(self._nchannels):
197 tr = trace.Trace(
198 self._network,
199 self._station,
200 self._location,
201 'p%i' % ichannel,
202 ydata=self._buffer[:, ichannel],
203 tmin=t,
204 deltat=1.0/self._rate)
206 self.got_trace(tr)
208 self._nsamples_read += self._nsamples_buffer
209 self._buffer = None
210 self._isamples_buffer = None
212 def got_trace(self, tr):
213 logger.info(
214 'Got trace from DATA-CUBE: %s, mean: %g, std: %g' % (
215 tr.summary, num.mean(tr.ydata), num.std(tr.ydata)))
218def main():
219 import sys
220 util.setup_logging('main', 'info')
221 if len(sys.argv) != 2:
222 sys.exit('usage: python -m pyrocko.streaming.datacube DEVICE')
224 device = sys.argv[1]
225 cs = SerialCube(device)
226 try:
227 cs.acquisition_start()
228 except SerialCubeError as e:
229 sys.exit(str(e))
231 try:
232 while True:
233 cs.process()
235 except KeyboardInterrupt:
236 pass
238 finally:
239 cs.acquisition_stop()
242if __name__ == '__main__':
243 main()