Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/streaming/serial_hamster.py: 16%
318 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 06:59 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 06:59 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Live stream reader for a few USB/serial digitizers.
8'''
10import time
11import logging
12import math
13import numpy as num
14from scipy import stats
15import threading
16try:
17 import queue
18except ImportError:
19 import Queue as queue
21from pyrocko import trace, util
23logger = logging.getLogger('pyrocko.streaming.serial_hamster')
26class QueueIsEmpty(Exception):
27 pass
30class Queue(object):
31 def __init__(self, nmax):
32 self.nmax = nmax
33 self.queue = []
35 def push_back(self, val):
36 self.queue.append(val)
37 while len(self.queue) > self.nmax:
38 self.queue.pop(0)
40 def mean(self):
41 if not self.queue:
42 raise QueueIsEmpty()
43 return sum(self.queue)/float(len(self.queue))
45 def median(self):
46 if not self.queue:
47 raise QueueIsEmpty()
48 n = len(self.queue)
49 s = sorted(self.queue)
50 if n % 2 != 0:
51 return s[n//2]
52 else:
53 return (s[n//2-1]+s[n//2])/2.0
55 def add(self, w):
56 self.queue = [v+w for v in self.queue]
58 def empty(self):
59 self.queue[:] = []
61 def __len__(self):
62 return len(self.queue)
64 def capacity(self):
65 return self.nmax
67 def __str__(self):
68 return ' '.join('%g' % v for v in self.queue)
71class SerialHamsterError(Exception):
72 pass
75class SerialHamster(object):
77 def __init__(
78 self, port=0, baudrate=9600, timeout=5, buffersize=128,
79 start_string=None,
80 network='', station='TEST', location='', channels=['Z'],
81 disallow_uneven_sampling_rates=True,
82 deltat=None,
83 deltat_tolerance=0.01,
84 in_file=None,
85 lookback=5,
86 min_detection_size=5,
87 tune_to_quickones=True):
89 self.port = port
90 self.baudrate = baudrate
91 self.timeout = timeout
92 self.buffersize = buffersize
93 self.ser = None
94 self.values = [[]]*len(channels)
95 self.times = []
96 self.fixed_deltat = deltat
97 self.deltat = None
98 self.deltat_tolerance = deltat_tolerance
99 self.tmin = None
100 self.previous_deltats = Queue(nmax=lookback)
101 self.previous_tmin_offsets = Queue(nmax=lookback)
102 self.ncontinuous = 0
103 self.disallow_uneven_sampling_rates = disallow_uneven_sampling_rates
104 self.network = network
105 self.station = station
106 self.location = location
107 self.channels = channels
108 self.in_file = in_file # for testing
109 self.listeners = []
110 self.quit_requested = False
111 self.tune_to_quickones = tune_to_quickones
112 self.start_string = start_string
114 self.min_detection_size = min_detection_size
116 def add_listener(self, obj):
117 self.listeners.append(util.smart_weakref(obj))
119 def clear_listeners(self):
120 self.listeners = []
122 def acquisition_start(self):
123 if self.ser is not None:
124 self.stop()
126 logger.debug(
127 'Starting serial hamster (port=%s, baudrate=%i, timeout=%f)'
128 % (self.port, self.baudrate, self.timeout))
130 if self.in_file is None:
131 import serial
132 try:
133 self.ser = serial.Serial(
134 port=self.port,
135 baudrate=self.baudrate,
136 timeout=self.timeout)
138 self.send_start()
140 except serial.serialutil.SerialException:
141 raise SerialHamsterError(
142 'Cannot open serial port %s' % self.port)
143 else:
144 self.ser = self.in_file
146 def send_start(self):
147 ser = self.ser
148 if self.start_string is not None:
149 ser.write(self.start_string.encode('ascii'))
151 def acquisition_stop(self):
152 if self.ser is not None:
153 logger.debug('Stopping serial hamster')
154 if self.in_file is None:
155 self.ser.close()
156 self.ser = None
157 self._flush_buffer()
159 def acquisition_request_stop(self):
160 pass
162 def process(self):
163 if self.ser is None:
164 return False
166 try:
167 line = self.ser.readline()
168 if line == '':
169 raise SerialHamsterError(
170 'Failed to read from serial port %s' % self.port)
171 except Exception:
172 raise SerialHamsterError(
173 'Failed to read from serial port %s' % self.port)
175 t = time.time()
177 for tok in line.split():
178 try:
179 val = float(tok)
180 except Exception:
181 logger.warning('Got something unexpected on serial line. ' +
182 'Current line: "%s". ' % line +
183 'Could not convert string to float: "%s"' % tok)
184 continue
186 self.values[0].append(val)
187 self.times.append(t)
189 if len(self.values[0]) >= self.buffersize:
190 self._flush_buffer()
192 return True
194 def _regression(self, t):
195 toff = t[0]
196 t = t-toff
197 i = num.arange(t.size, dtype=float)
198 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t)
199 if self.tune_to_quickones:
200 for ii in range(2):
201 t_fit = r_tmin+r_deltat*i
202 quickones = num.where(t < t_fit)
203 if quickones[0].size < 2:
204 break
205 i = i[quickones]
206 t = t[quickones]
207 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t)
209 return r_deltat, r_tmin+toff
211 def _flush_buffer(self):
213 if len(self.times) < self.min_detection_size:
214 return
216 t = num.array(self.times, dtype=float)
217 r_deltat, r_tmin = self._regression(t)
219 if self.disallow_uneven_sampling_rates:
220 r_deltat = 1./round(1./r_deltat)
222 # check if deltat is consistent with expectations
223 if self.deltat is not None and self.fixed_deltat is None:
224 try:
225 p_deltat = self.previous_deltats.median()
226 if (((self.disallow_uneven_sampling_rates
227 and abs(1./p_deltat - 1./self.deltat) > 0.5)
228 or (not self.disallow_uneven_sampling_rates
229 and abs((self.deltat - p_deltat)/self.deltat)
230 > self.deltat_tolerance))
231 and len(self.previous_deltats)
232 > 0.5*self.previous_deltats.capacity()):
234 self.deltat = None
235 self.previous_deltats.empty()
236 except QueueIsEmpty:
237 pass
239 self.previous_deltats.push_back(r_deltat)
241 # detect sampling rate
242 if self.deltat is None:
243 if self.fixed_deltat is not None:
244 self.deltat = self.fixed_deltat
245 else:
246 self.deltat = r_deltat
247 # must also set new time origin if sampling rate changes
248 self.tmin = None
249 logger.info(
250 'Setting new sampling rate to %g Hz '
251 '(sampling interval is %g s)' % (
252 1./self.deltat, self.deltat))
254 # check if onset has drifted / jumped
255 if self.deltat is not None and self.tmin is not None:
256 continuous_tmin = self.tmin + self.ncontinuous*self.deltat
258 tmin_offset = r_tmin - continuous_tmin
259 try:
260 toffset = self.previous_tmin_offsets.median()
261 if abs(toffset) > self.deltat*0.7 \
262 and len(self.previous_tmin_offsets) \
263 > 0.5*self.previous_tmin_offsets.capacity():
265 soffset = int(round(toffset/self.deltat))
266 logger.info(
267 'Detected drift/jump/gap of %g sample%s' % (
268 soffset, ['s', ''][abs(soffset) == 1]))
270 if soffset == 1:
271 for values in self.values:
272 values.append(values[-1])
273 self.previous_tmin_offsets.add(-self.deltat)
274 logger.info(
275 'Adding one sample to compensate time drift')
276 elif soffset == -1:
277 for values in self.values:
278 values.pop(-1)
279 self.previous_tmin_offsets.add(+self.deltat)
280 logger.info(
281 'Removing one sample to compensate time drift')
282 else:
283 self.tmin = None
284 self.previous_tmin_offsets.empty()
286 except QueueIsEmpty:
287 pass
289 self.previous_tmin_offsets.push_back(tmin_offset)
291 # detect onset time
292 if self.tmin is None and self.deltat is not None:
293 self.tmin = r_tmin
294 self.ncontinuous = 0
295 logger.info(
296 'Setting new time origin to %s' % util.time_to_str(self.tmin))
298 if self.tmin is not None and self.deltat is not None:
299 for channel, values in zip(self.channels, self.values):
300 v = num.array(values, dtype=num.int32)
302 tr = trace.Trace(
303 network=self.network,
304 station=self.station,
305 location=self.location,
306 channel=channel,
307 tmin=self.tmin + self.ncontinuous*self.deltat,
308 deltat=self.deltat,
309 ydata=v)
311 self.got_trace(tr)
312 self.ncontinuous += v.size
314 self.values = [[]] * len(self.channels)
315 self.times = []
317 def got_trace(self, tr):
318 logger.debug('Completed trace from serial hamster: %s' % tr)
320 # deliver payload to registered listeners
321 for ref in self.listeners:
322 obj = ref()
323 if obj:
324 obj.insert_trace(tr)
327class CamSerialHamster(SerialHamster):
329 def __init__(self, baudrate=115200, channels=['N'], *args, **kwargs):
330 SerialHamster.__init__(
331 self, disallow_uneven_sampling_rates=False, deltat_tolerance=0.001,
332 baudrate=baudrate, channels=channels, *args, **kwargs)
334 def send_start(self):
335 try:
336 ser = self.ser
337 ser.write('99,e\n')
338 a = ser.readline()
339 if not a:
340 raise SerialHamsterError(
341 'Camera did not respond to command "99,e"')
343 logger.debug(
344 'Sent command "99,e" to cam; received answer: "%s"'
345 % a.strip())
347 ser.write('2,e\n')
348 a = ser.readline()
349 if not a:
350 raise SerialHamsterError(
351 'Camera did not respond to command "2,e"')
353 logger.debug(
354 'Sent command "2,e" to cam; received answer: "%s"' % a.strip())
355 ser.write('2,01\n')
356 ser.write('2,f400\n')
357 except Exception:
358 raise SerialHamsterError(
359 'Initialization of camera acquisition failed.')
361 def process(self):
362 ser = self.ser
364 if ser is None:
365 return False
367 ser.write('2,X\n')
368 isamp = 0
369 while True:
370 data = ser.read(2)
371 if len(data) != 2:
372 raise SerialHamsterError(
373 'Failed to read from serial line interface.')
375 uclow = ord(data[0])
376 uchigh = ord(data[1])
378 if uclow == 0xff and uchigh == 0xff:
379 break
381 v = uclow + (uchigh << 8)
383 self.times.append(time.time())
384 self.values[isamp % len(self.channels)].append(v)
385 isamp += 1
387 if len(self.values[-1]) >= self.buffersize:
388 self._flush_buffer()
390 return True
393class USBHB628Hamster(SerialHamster):
395 def __init__(self, baudrate=115200, channels=[(0, 'Z')], *args, **kwargs):
396 SerialHamster.__init__(
397 self,
398 baudrate=baudrate,
399 channels=[x[1] for x in channels],
400 tune_to_quickones=False,
401 *args, **kwargs)
403 self.channel_map = dict([(c[0], j) for (j, c) in enumerate(channels)])
404 self.first_initiated = None
405 self.ntaken = 0
407 def process(self):
408 import serial
410 ser = self.ser
412 if ser is None:
413 return False
415 t = time.time()
417 # determine next appropriate sampling instant
418 if self.first_initiated is not None:
419 ts = self.first_initiated + self.fixed_deltat * self.ntaken
420 if t - ts > self.fixed_deltat*10:
421 logger.warning(
422 'lagging more than ten samples on serial line %s - '
423 'resetting' % self.port)
425 self.first_initiated = None
427 if not self.first_initiated:
428 ts = math.ceil(t/self.fixed_deltat)*self.fixed_deltat
429 self.first_initiated = ts
430 self.ntaken = 0
432 # wait for next sampling instant
433 while t < ts:
434 time.sleep(max(0., ts-t))
435 t = time.time()
437 if t - ts > self.fixed_deltat:
438 logger.warning(
439 'lagging more than one sample on serial line %s' % self.port)
441 # get the sample
442 ser.write('c09')
443 ser.flush()
444 try:
445 data = [ord(x) for x in ser.read(17)]
446 if len(data) != 17:
447 raise SerialHamsterError('Reading from serial line failed.')
449 except serial.serialutil.SerialException:
450 raise SerialHamsterError('Reading from serial line failed.')
452 self.ntaken += 1
454 for ichan in range(8):
455 if ichan in self.channel_map:
456 v = data[ichan*2] + (data[ichan*2+1] << 8)
457 self.values[self.channel_map[ichan]].append(v)
459 self.times.append(t)
461 if len(self.times) >= self.buffersize:
462 self._flush_buffer()
464 return True
467class AcquisitionThread(threading.Thread):
468 def __init__(self, post_process_sleep=0.0):
469 threading.Thread.__init__(self)
470 self.queue = queue.Queue()
471 self.post_process_sleep = post_process_sleep
472 self._sun_is_shining = True
474 def run(self):
475 while True:
476 try:
477 self.acquisition_start()
478 while self._sun_is_shining:
479 t0 = time.time()
480 self.process()
481 t1 = time.time()
482 if self.post_process_sleep != 0.0:
483 time.sleep(max(0, self.post_process_sleep-(t1-t0)))
485 self.acquisition_stop()
486 break
488 except SerialHamsterError as e:
490 logger.error(str(e))
491 logger.error('Acquistion terminated, restart in 5 s')
492 self.acquisition_stop()
493 time.sleep(5)
494 if not self._sun_is_shining:
495 break
497 def stop(self):
498 self._sun_is_shining = False
500 logger.debug('Waiting for thread to terminate...')
501 self.wait()
502 logger.debug('Thread has terminated.')
504 def got_trace(self, tr):
505 self.queue.put(tr)
507 def poll(self):
508 items = []
509 try:
510 while True:
511 items.append(self.queue.get_nowait())
513 except queue.Empty:
514 pass
516 return items
519class Acquisition(
520 SerialHamster, AcquisitionThread):
522 def __init__(self, *args, **kwargs):
523 SerialHamster.__init__(self, *args, **kwargs)
524 AcquisitionThread.__init__(self, post_process_sleep=0.001)
526 def got_trace(self, tr):
527 logger.debug('acquisition got trace rate %g Hz, duration %g s' % (
528 1.0/tr.deltat, tr.tmax - tr.tmin))
529 AcquisitionThread.got_trace(self, tr)