1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import time
7import logging
8import math
9import numpy as num
10from scipy import stats
11import threading
12try:
13 import queue
14except ImportError:
15 import Queue as queue
17from pyrocko import trace, util
19logger = logging.getLogger('pyrocko.streaming.serial_hamster')
22class QueueIsEmpty(Exception):
23 pass
26class Queue(object):
27 def __init__(self, nmax):
28 self.nmax = nmax
29 self.queue = []
31 def push_back(self, val):
32 self.queue.append(val)
33 while len(self.queue) > self.nmax:
34 self.queue.pop(0)
36 def mean(self):
37 if not self.queue:
38 raise QueueIsEmpty()
39 return sum(self.queue)/float(len(self.queue))
41 def median(self):
42 if not self.queue:
43 raise QueueIsEmpty()
44 n = len(self.queue)
45 s = sorted(self.queue)
46 if n % 2 != 0:
47 return s[n//2]
48 else:
49 return (s[n//2-1]+s[n//2])/2.0
51 def add(self, w):
52 self.queue = [v+w for v in self.queue]
54 def empty(self):
55 self.queue[:] = []
57 def __len__(self):
58 return len(self.queue)
60 def capacity(self):
61 return self.nmax
63 def __str__(self):
64 return ' '.join('%g' % v for v in self.queue)
67class SerialHamsterError(Exception):
68 pass
71class SerialHamster(object):
73 def __init__(
74 self, port=0, baudrate=9600, timeout=5, buffersize=128,
75 start_string=None,
76 network='', station='TEST', location='', channels=['Z'],
77 disallow_uneven_sampling_rates=True,
78 deltat=None,
79 deltat_tolerance=0.01,
80 in_file=None,
81 lookback=5,
82 min_detection_size=5,
83 tune_to_quickones=True):
85 self.port = port
86 self.baudrate = baudrate
87 self.timeout = timeout
88 self.buffersize = buffersize
89 self.ser = None
90 self.values = [[]]*len(channels)
91 self.times = []
92 self.fixed_deltat = deltat
93 self.deltat = None
94 self.deltat_tolerance = deltat_tolerance
95 self.tmin = None
96 self.previous_deltats = Queue(nmax=lookback)
97 self.previous_tmin_offsets = Queue(nmax=lookback)
98 self.ncontinuous = 0
99 self.disallow_uneven_sampling_rates = disallow_uneven_sampling_rates
100 self.network = network
101 self.station = station
102 self.location = location
103 self.channels = channels
104 self.in_file = in_file # for testing
105 self.listeners = []
106 self.quit_requested = False
107 self.tune_to_quickones = tune_to_quickones
108 self.start_string = start_string
110 self.min_detection_size = min_detection_size
112 def add_listener(self, obj):
113 self.listeners.append(util.smart_weakref(obj))
115 def clear_listeners(self):
116 self.listeners = []
118 def acquisition_start(self):
119 if self.ser is not None:
120 self.stop()
122 logger.debug(
123 'Starting serial hamster (port=%s, baudrate=%i, timeout=%f)'
124 % (self.port, self.baudrate, self.timeout))
126 if self.in_file is None:
127 import serial
128 try:
129 self.ser = serial.Serial(
130 port=self.port,
131 baudrate=self.baudrate,
132 timeout=self.timeout)
134 self.send_start()
136 except serial.serialutil.SerialException:
137 raise SerialHamsterError(
138 'Cannot open serial port %s' % self.port)
139 else:
140 self.ser = self.in_file
142 def send_start(self):
143 ser = self.ser
144 if self.start_string is not None:
145 ser.write(self.start_string.encode('ascii'))
147 def acquisition_stop(self):
148 if self.ser is not None:
149 logger.debug('Stopping serial hamster')
150 if self.in_file is None:
151 self.ser.close()
152 self.ser = None
153 self._flush_buffer()
155 def acquisition_request_stop(self):
156 pass
158 def process(self):
159 if self.ser is None:
160 return False
162 try:
163 line = self.ser.readline()
164 if line == '':
165 raise SerialHamsterError(
166 'Failed to read from serial port %s' % self.port)
167 except Exception:
168 raise SerialHamsterError(
169 'Failed to read from serial port %s' % self.port)
171 t = time.time()
173 for tok in line.split():
174 try:
175 val = float(tok)
176 except Exception:
177 logger.warning('Got something unexpected on serial line. ' +
178 'Current line: "%s". ' % line +
179 'Could not convert string to float: "%s"' % tok)
180 continue
182 self.values[0].append(val)
183 self.times.append(t)
185 if len(self.values[0]) >= self.buffersize:
186 self._flush_buffer()
188 return True
190 def _regression(self, t):
191 toff = t[0]
192 t = t-toff
193 i = num.arange(t.size, dtype=float)
194 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t)
195 if self.tune_to_quickones:
196 for ii in range(2):
197 t_fit = r_tmin+r_deltat*i
198 quickones = num.where(t < t_fit)
199 if quickones[0].size < 2:
200 break
201 i = i[quickones]
202 t = t[quickones]
203 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t)
205 return r_deltat, r_tmin+toff
207 def _flush_buffer(self):
209 if len(self.times) < self.min_detection_size:
210 return
212 t = num.array(self.times, dtype=float)
213 r_deltat, r_tmin = self._regression(t)
215 if self.disallow_uneven_sampling_rates:
216 r_deltat = 1./round(1./r_deltat)
218 # check if deltat is consistent with expectations
219 if self.deltat is not None and self.fixed_deltat is None:
220 try:
221 p_deltat = self.previous_deltats.median()
222 if (((self.disallow_uneven_sampling_rates
223 and abs(1./p_deltat - 1./self.deltat) > 0.5)
224 or (not self.disallow_uneven_sampling_rates
225 and abs((self.deltat - p_deltat)/self.deltat)
226 > self.deltat_tolerance))
227 and len(self.previous_deltats)
228 > 0.5*self.previous_deltats.capacity()):
230 self.deltat = None
231 self.previous_deltats.empty()
232 except QueueIsEmpty:
233 pass
235 self.previous_deltats.push_back(r_deltat)
237 # detect sampling rate
238 if self.deltat is None:
239 if self.fixed_deltat is not None:
240 self.deltat = self.fixed_deltat
241 else:
242 self.deltat = r_deltat
243 # must also set new time origin if sampling rate changes
244 self.tmin = None
245 logger.info(
246 'Setting new sampling rate to %g Hz '
247 '(sampling interval is %g s)' % (
248 1./self.deltat, self.deltat))
250 # check if onset has drifted / jumped
251 if self.deltat is not None and self.tmin is not None:
252 continuous_tmin = self.tmin + self.ncontinuous*self.deltat
254 tmin_offset = r_tmin - continuous_tmin
255 try:
256 toffset = self.previous_tmin_offsets.median()
257 if abs(toffset) > self.deltat*0.7 \
258 and len(self.previous_tmin_offsets) \
259 > 0.5*self.previous_tmin_offsets.capacity():
261 soffset = int(round(toffset/self.deltat))
262 logger.info(
263 'Detected drift/jump/gap of %g sample%s' % (
264 soffset, ['s', ''][abs(soffset) == 1]))
266 if soffset == 1:
267 for values in self.values:
268 values.append(values[-1])
269 self.previous_tmin_offsets.add(-self.deltat)
270 logger.info(
271 'Adding one sample to compensate time drift')
272 elif soffset == -1:
273 for values in self.values:
274 values.pop(-1)
275 self.previous_tmin_offsets.add(+self.deltat)
276 logger.info(
277 'Removing one sample to compensate time drift')
278 else:
279 self.tmin = None
280 self.previous_tmin_offsets.empty()
282 except QueueIsEmpty:
283 pass
285 self.previous_tmin_offsets.push_back(tmin_offset)
287 # detect onset time
288 if self.tmin is None and self.deltat is not None:
289 self.tmin = r_tmin
290 self.ncontinuous = 0
291 logger.info(
292 'Setting new time origin to %s' % util.time_to_str(self.tmin))
294 if self.tmin is not None and self.deltat is not None:
295 for channel, values in zip(self.channels, self.values):
296 v = num.array(values, dtype=num.int32)
298 tr = trace.Trace(
299 network=self.network,
300 station=self.station,
301 location=self.location,
302 channel=channel,
303 tmin=self.tmin + self.ncontinuous*self.deltat,
304 deltat=self.deltat,
305 ydata=v)
307 self.got_trace(tr)
308 self.ncontinuous += v.size
310 self.values = [[]] * len(self.channels)
311 self.times = []
313 def got_trace(self, tr):
314 logger.debug('Completed trace from serial hamster: %s' % tr)
316 # deliver payload to registered listeners
317 for ref in self.listeners:
318 obj = ref()
319 if obj:
320 obj.insert_trace(tr)
323class CamSerialHamster(SerialHamster):
325 def __init__(self, baudrate=115200, channels=['N'], *args, **kwargs):
326 SerialHamster.__init__(
327 self, disallow_uneven_sampling_rates=False, deltat_tolerance=0.001,
328 baudrate=baudrate, channels=channels, *args, **kwargs)
330 def send_start(self):
331 try:
332 ser = self.ser
333 ser.write('99,e\n')
334 a = ser.readline()
335 if not a:
336 raise SerialHamsterError(
337 'Camera did not respond to command "99,e"')
339 logger.debug(
340 'Sent command "99,e" to cam; received answer: "%s"'
341 % a.strip())
343 ser.write('2,e\n')
344 a = ser.readline()
345 if not a:
346 raise SerialHamsterError(
347 'Camera did not respond to command "2,e"')
349 logger.debug(
350 'Sent command "2,e" to cam; received answer: "%s"' % a.strip())
351 ser.write('2,01\n')
352 ser.write('2,f400\n')
353 except Exception:
354 raise SerialHamsterError(
355 'Initialization of camera acquisition failed.')
357 def process(self):
358 ser = self.ser
360 if ser is None:
361 return False
363 ser.write('2,X\n')
364 isamp = 0
365 while True:
366 data = ser.read(2)
367 if len(data) != 2:
368 raise SerialHamsterError(
369 'Failed to read from serial line interface.')
371 uclow = ord(data[0])
372 uchigh = ord(data[1])
374 if uclow == 0xff and uchigh == 0xff:
375 break
377 v = uclow + (uchigh << 8)
379 self.times.append(time.time())
380 self.values[isamp % len(self.channels)].append(v)
381 isamp += 1
383 if len(self.values[-1]) >= self.buffersize:
384 self._flush_buffer()
386 return True
389class USBHB628Hamster(SerialHamster):
391 def __init__(self, baudrate=115200, channels=[(0, 'Z')], *args, **kwargs):
392 SerialHamster.__init__(
393 self,
394 baudrate=baudrate,
395 channels=[x[1] for x in channels],
396 tune_to_quickones=False,
397 *args, **kwargs)
399 self.channel_map = dict([(c[0], j) for (j, c) in enumerate(channels)])
400 self.first_initiated = None
401 self.ntaken = 0
403 def process(self):
404 import serial
406 ser = self.ser
408 if ser is None:
409 return False
411 t = time.time()
413 # determine next appropriate sampling instant
414 if self.first_initiated is not None:
415 ts = self.first_initiated + self.fixed_deltat * self.ntaken
416 if t - ts > self.fixed_deltat*10:
417 logger.warning(
418 'lagging more than ten samples on serial line %s - '
419 'resetting' % self.port)
421 self.first_initiated = None
423 if not self.first_initiated:
424 ts = math.ceil(t/self.fixed_deltat)*self.fixed_deltat
425 self.first_initiated = ts
426 self.ntaken = 0
428 # wait for next sampling instant
429 while t < ts:
430 time.sleep(max(0., ts-t))
431 t = time.time()
433 if t - ts > self.fixed_deltat:
434 logger.warning(
435 'lagging more than one sample on serial line %s' % self.port)
437 # get the sample
438 ser.write('c09')
439 ser.flush()
440 try:
441 data = [ord(x) for x in ser.read(17)]
442 if len(data) != 17:
443 raise SerialHamsterError('Reading from serial line failed.')
445 except serial.serialutil.SerialException:
446 raise SerialHamsterError('Reading from serial line failed.')
448 self.ntaken += 1
450 for ichan in range(8):
451 if ichan in self.channel_map:
452 v = data[ichan*2] + (data[ichan*2+1] << 8)
453 self.values[self.channel_map[ichan]].append(v)
455 self.times.append(t)
457 if len(self.times) >= self.buffersize:
458 self._flush_buffer()
460 return True
463class AcquisitionThread(threading.Thread):
464 def __init__(self, post_process_sleep=0.0):
465 threading.Thread.__init__(self)
466 self.queue = queue.Queue()
467 self.post_process_sleep = post_process_sleep
468 self._sun_is_shining = True
470 def run(self):
471 while True:
472 try:
473 self.acquisition_start()
474 while self._sun_is_shining:
475 t0 = time.time()
476 self.process()
477 t1 = time.time()
478 if self.post_process_sleep != 0.0:
479 time.sleep(max(0, self.post_process_sleep-(t1-t0)))
481 self.acquisition_stop()
482 break
484 except SerialHamsterError as e:
486 logger.error(str(e))
487 logger.error('Acquistion terminated, restart in 5 s')
488 self.acquisition_stop()
489 time.sleep(5)
490 if not self._sun_is_shining:
491 break
493 def stop(self):
494 self._sun_is_shining = False
496 logger.debug('Waiting for thread to terminate...')
497 self.wait()
498 logger.debug('Thread has terminated.')
500 def got_trace(self, tr):
501 self.queue.put(tr)
503 def poll(self):
504 items = []
505 try:
506 while True:
507 items.append(self.queue.get_nowait())
509 except queue.Empty:
510 pass
512 return items
515class Acquisition(
516 SerialHamster, AcquisitionThread):
518 def __init__(self, *args, **kwargs):
519 SerialHamster.__init__(self, *args, **kwargs)
520 AcquisitionThread.__init__(self, post_process_sleep=0.001)
522 def got_trace(self, tr):
523 logger.debug('acquisition got trace rate %g Hz, duration %g s' % (
524 1.0/tr.deltat, tr.tmax - tr.tmin))
525 AcquisitionThread.got_trace(self, tr)