1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
5from __future__ import absolute_import, division
7import time
8import logging
9import weakref
10import math
11import numpy as num
12from scipy import stats
13import threading
14try:
15 import queue
16except ImportError:
17 import Queue as queue
19from pyrocko import trace, util
21logger = logging.getLogger('pyrocko.streaming.serial_hamster')
24class QueueIsEmpty(Exception):
25 pass
28class Queue(object):
29 def __init__(self, nmax):
30 self.nmax = nmax
31 self.queue = []
33 def push_back(self, val):
34 self.queue.append(val)
35 while len(self.queue) > self.nmax:
36 self.queue.pop(0)
38 def mean(self):
39 if not self.queue:
40 raise QueueIsEmpty()
41 return sum(self.queue)/float(len(self.queue))
43 def median(self):
44 if not self.queue:
45 raise QueueIsEmpty()
46 n = len(self.queue)
47 s = sorted(self.queue)
48 if n % 2 != 0:
49 return s[n//2]
50 else:
51 return (s[n//2-1]+s[n//2])/2.0
53 def add(self, w):
54 self.queue = [v+w for v in self.queue]
56 def empty(self):
57 self.queue[:] = []
59 def __len__(self):
60 return len(self.queue)
62 def capacity(self):
63 return self.nmax
66class SerialHamsterError(Exception):
67 pass
70class SerialHamster(object):
72 def __init__(
73 self, port=0, baudrate=9600, timeout=5, buffersize=128,
74 network='', station='TEST', location='', channels=['Z'],
75 disallow_uneven_sampling_rates=True,
76 deltat=None,
77 deltat_tolerance=0.01,
78 in_file=None,
79 lookback=5,
80 tune_to_quickones=True):
82 self.port = port
83 self.baudrate = baudrate
84 self.timeout = timeout
85 self.buffersize = buffersize
86 self.ser = None
87 self.values = [[]]*len(channels)
88 self.times = []
89 self.fixed_deltat = deltat
90 self.deltat = None
91 self.deltat_tolerance = deltat_tolerance
92 self.tmin = None
93 self.previous_deltats = Queue(nmax=lookback)
94 self.previous_tmin_offsets = Queue(nmax=lookback)
95 self.ncontinuous = 0
96 self.disallow_uneven_sampling_rates = disallow_uneven_sampling_rates
97 self.network = network
98 self.station = station
99 self.location = location
100 self.channels = channels
101 self.in_file = in_file # for testing
102 self.listeners = []
103 self.quit_requested = False
104 self.tune_to_quickones = tune_to_quickones
106 self.min_detection_size = 5
108 def add_listener(self, obj):
109 self.listeners.append(weakref.ref(obj))
111 def clear_listeners(self):
112 self.listeners = []
114 def acquisition_start(self):
115 if self.ser is not None:
116 self.stop()
118 logger.debug(
119 'Starting serial hamster (port=%s, baudrate=%i, timeout=%f)'
120 % (self.port, self.baudrate, self.timeout))
122 if self.in_file is None:
123 import serial
124 try:
125 self.ser = serial.Serial(
126 port=self.port,
127 baudrate=self.baudrate,
128 timeout=self.timeout)
130 self.send_start()
132 except serial.serialutil.SerialException:
133 raise SerialHamsterError(
134 'Cannot open serial port %s' % self.port)
135 else:
136 self.ser = self.in_file
138 def send_start(self):
139 pass
141 def acquisition_stop(self):
142 if self.ser is not None:
143 logger.debug('Stopping serial hamster')
144 if self.in_file is None:
145 self.ser.close()
146 self.ser = None
147 self._flush_buffer()
149 def acquisition_request_stop(self):
150 pass
152 def process(self):
153 if self.ser is None:
154 return False
156 try:
157 line = self.ser.readline()
158 if line == '':
159 raise SerialHamsterError(
160 'Failed to read from serial port %s' % self.port)
161 except Exception:
162 raise SerialHamsterError(
163 'Failed to read from serial port %s' % self.port)
165 t = time.time()
167 for tok in line.split():
168 try:
169 val = float(tok)
170 except Exception:
171 logger.warning('Got something unexpected on serial line. ' +
172 'Current line: "%s". ' % line +
173 'Could not convert string to float: "%s"' % tok)
174 continue
176 self.values[0].append(val)
177 self.times.append(t)
179 if len(self.values[0]) >= self.buffersize:
180 self._flush_buffer()
182 return True
184 def _regression(self, t):
185 toff = t[0]
186 t = t-toff
187 i = num.arange(t.size, dtype=float)
188 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t)
189 if self.tune_to_quickones:
190 for ii in range(2):
191 t_fit = r_tmin+r_deltat*i
192 quickones = num.where(t < t_fit)
193 if quickones[0].size < 2:
194 break
195 i = i[quickones]
196 t = t[quickones]
197 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t)
199 return r_deltat, r_tmin+toff
201 def _flush_buffer(self):
203 if len(self.times) < self.min_detection_size:
204 return
206 t = num.array(self.times, dtype=float)
207 r_deltat, r_tmin = self._regression(t)
209 if self.disallow_uneven_sampling_rates:
210 r_deltat = 1./round(1./r_deltat)
212 # check if deltat is consistent with expectations
213 if self.deltat is not None and self.fixed_deltat is None:
214 try:
215 p_deltat = self.previous_deltats.median()
216 if (((self.disallow_uneven_sampling_rates
217 and abs(1./p_deltat - 1./self.deltat) > 0.5)
218 or (not self.disallow_uneven_sampling_rates
219 and abs((self.deltat - p_deltat)/self.deltat)
220 > self.deltat_tolerance))
221 and len(self.previous_deltats)
222 > 0.5*self.previous_deltats.capacity()):
224 self.deltat = None
225 self.previous_deltats.empty()
226 except QueueIsEmpty:
227 pass
229 self.previous_deltats.push_back(r_deltat)
231 # detect sampling rate
232 if self.deltat is None:
233 if self.fixed_deltat is not None:
234 self.deltat = self.fixed_deltat
235 else:
236 self.deltat = r_deltat
237 # must also set new time origin if sampling rate changes
238 self.tmin = None
239 logger.info(
240 'Setting new sampling rate to %g Hz '
241 '(sampling interval is %g s)' % (
242 1./self.deltat, self.deltat))
244 # check if onset has drifted / jumped
245 if self.deltat is not None and self.tmin is not None:
246 continuous_tmin = self.tmin + self.ncontinuous*self.deltat
248 tmin_offset = r_tmin - continuous_tmin
249 try:
250 toffset = self.previous_tmin_offsets.median()
251 if abs(toffset) > self.deltat*0.7 \
252 and len(self.previous_tmin_offsets) \
253 > 0.5*self.previous_tmin_offsets.capacity():
255 soffset = int(round(toffset/self.deltat))
256 logger.info(
257 'Detected drift/jump/gap of %g sample%s' % (
258 soffset, ['s', ''][abs(soffset) == 1]))
260 if soffset == 1:
261 for values in self.values:
262 values.append(values[-1])
263 self.previous_tmin_offsets.add(-self.deltat)
264 logger.info(
265 'Adding one sample to compensate time drift')
266 elif soffset == -1:
267 for values in self.values:
268 values.pop(-1)
269 self.previous_tmin_offsets.add(+self.deltat)
270 logger.info(
271 'Removing one sample to compensate time drift')
272 else:
273 self.tmin = None
274 self.previous_tmin_offsets.empty()
276 except QueueIsEmpty:
277 pass
279 self.previous_tmin_offsets.push_back(tmin_offset)
281 # detect onset time
282 if self.tmin is None and self.deltat is not None:
283 self.tmin = r_tmin
284 self.ncontinuous = 0
285 logger.info(
286 'Setting new time origin to %s' % util.time_to_str(self.tmin))
288 if self.tmin is not None and self.deltat is not None:
289 for channel, values in zip(self.channels, self.values):
290 v = num.array(values, dtype=int)
292 tr = trace.Trace(
293 network=self.network,
294 station=self.station,
295 location=self.location,
296 channel=channel,
297 tmin=self.tmin + self.ncontinuous*self.deltat,
298 deltat=self.deltat,
299 ydata=v)
301 self.got_trace(tr)
302 self.ncontinuous += v.size
304 self.values = [[]] * len(self.channels)
305 self.times = []
307 def got_trace(self, tr):
308 logger.debug('Completed trace from serial hamster: %s' % tr)
310 # deliver payload to registered listeners
311 for ref in self.listeners:
312 obj = ref()
313 if obj:
314 obj.insert_trace(tr)
317class CamSerialHamster(SerialHamster):
319 def __init__(self, baudrate=115200, channels=['N'], *args, **kwargs):
320 SerialHamster.__init__(
321 self, disallow_uneven_sampling_rates=False, deltat_tolerance=0.001,
322 baudrate=baudrate, channels=channels, *args, **kwargs)
324 def send_start(self):
325 try:
326 ser = self.ser
327 ser.write('99,e\n')
328 a = ser.readline()
329 if not a:
330 raise SerialHamsterError(
331 'Camera did not respond to command "99,e"')
333 logger.debug(
334 'Sent command "99,e" to cam; received answer: "%s"'
335 % a.strip())
337 ser.write('2,e\n')
338 a = ser.readline()
339 if not a:
340 raise SerialHamsterError(
341 'Camera did not respond to command "2,e"')
343 logger.debug(
344 'Sent command "2,e" to cam; received answer: "%s"' % a.strip())
345 ser.write('2,01\n')
346 ser.write('2,f400\n')
347 except Exception:
348 raise SerialHamsterError(
349 'Initialization of camera acquisition failed.')
351 def process(self):
352 ser = self.ser
354 if ser is None:
355 return False
357 ser.write('2,X\n')
358 isamp = 0
359 while True:
360 data = ser.read(2)
361 if len(data) != 2:
362 raise SerialHamsterError(
363 'Failed to read from serial line interface.')
365 uclow = ord(data[0])
366 uchigh = ord(data[1])
368 if uclow == 0xff and uchigh == 0xff:
369 break
371 v = uclow + (uchigh << 8)
373 self.times.append(time.time())
374 self.values[isamp % len(self.channels)].append(v)
375 isamp += 1
377 if len(self.values[-1]) >= self.buffersize:
378 self._flush_buffer()
380 return True
383class USBHB628Hamster(SerialHamster):
385 def __init__(self, baudrate=115200, channels=[(0, 'Z')], *args, **kwargs):
386 SerialHamster.__init__(
387 self,
388 baudrate=baudrate,
389 channels=[x[1] for x in channels],
390 tune_to_quickones=False,
391 *args, **kwargs)
393 self.channel_map = dict([(c[0], j) for (j, c) in enumerate(channels)])
394 self.first_initiated = None
395 self.ntaken = 0
397 def process(self):
398 import serial
400 ser = self.ser
402 if ser is None:
403 return False
405 t = time.time()
407 # determine next appropriate sampling instant
408 if self.first_initiated is not None:
409 ts = self.first_initiated + self.fixed_deltat * self.ntaken
410 if t - ts > self.fixed_deltat*10:
411 logger.warning(
412 'lagging more than ten samples on serial line %s - '
413 'resetting' % self.port)
415 self.first_initiated = None
417 if not self.first_initiated:
418 ts = math.ceil(t/self.fixed_deltat)*self.fixed_deltat
419 self.first_initiated = ts
420 self.ntaken = 0
422 # wait for next sampling instant
423 while t < ts:
424 time.sleep(max(0., ts-t))
425 t = time.time()
427 if t - ts > self.fixed_deltat:
428 logger.warning(
429 'lagging more than one sample on serial line %s' % self.port)
431 # get the sample
432 ser.write('c09')
433 ser.flush()
434 try:
435 data = [ord(x) for x in ser.read(17)]
436 if len(data) != 17:
437 raise SerialHamsterError('Reading from serial line failed.')
439 except serial.serialutil.SerialException:
440 raise SerialHamsterError('Reading from serial line failed.')
442 self.ntaken += 1
444 for ichan in range(8):
445 if ichan in self.channel_map:
446 v = data[ichan*2] + (data[ichan*2+1] << 8)
447 self.values[self.channel_map[ichan]].append(v)
449 self.times.append(t)
451 if len(self.times) >= self.buffersize:
452 self._flush_buffer()
454 return True
457class AcquisitionThread(threading.Thread):
458 def __init__(self, post_process_sleep=0.0):
459 threading.Thread.__init__(self)
460 self.queue = queue.Queue()
461 self.post_process_sleep = post_process_sleep
462 self._sun_is_shining = True
464 def run(self):
465 while True:
466 try:
467 self.acquisition_start()
468 while self._sun_is_shining:
469 t0 = time.time()
470 self.process()
471 t1 = time.time()
472 if self.post_process_sleep != 0.0:
473 time.sleep(max(0, self.post_process_sleep-(t1-t0)))
475 self.acquisition_stop()
476 break
478 except SerialHamsterError as e:
480 logger.error(str(e))
481 logger.error('Acquistion terminated, restart in 5 s')
482 self.acquisition_stop()
483 time.sleep(5)
484 if not self._sun_is_shining:
485 break
487 def stop(self):
488 self._sun_is_shining = False
490 logger.debug("Waiting for thread to terminate...")
491 self.wait()
492 logger.debug("Thread has terminated.")
494 def got_trace(self, tr):
495 self.queue.put(tr)
497 def poll(self):
498 items = []
499 try:
500 while True:
501 items.append(self.queue.get_nowait())
503 except queue.Empty:
504 pass
506 return items
509class Acquisition(
510 SerialHamster, AcquisitionThread):
512 def __init__(self, *args, **kwargs):
513 SerialHamster.__init__(self, *args, **kwargs)
514 AcquisitionThread.__init__(self, post_process_sleep=0.001)
516 def got_trace(self, tr):
517 logger.debug('acquisition got trace rate %g Hz, duration %g s' % (
518 1.0/tr.deltat, tr.tmax - tr.tmin))
519 AcquisitionThread.got_trace(self, tr)