1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import time
7import logging
8import weakref
9import math
10import numpy as num
11from scipy import stats
12import threading
13try:
14 import queue
15except ImportError:
16 import Queue as queue
18from pyrocko import trace, util
20logger = logging.getLogger('pyrocko.streaming.serial_hamster')
23class QueueIsEmpty(Exception):
24 pass
27class Queue(object):
28 def __init__(self, nmax):
29 self.nmax = nmax
30 self.queue = []
32 def push_back(self, val):
33 self.queue.append(val)
34 while len(self.queue) > self.nmax:
35 self.queue.pop(0)
37 def mean(self):
38 if not self.queue:
39 raise QueueIsEmpty()
40 return sum(self.queue)/float(len(self.queue))
42 def median(self):
43 if not self.queue:
44 raise QueueIsEmpty()
45 n = len(self.queue)
46 s = sorted(self.queue)
47 if n % 2 != 0:
48 return s[n//2]
49 else:
50 return (s[n//2-1]+s[n//2])/2.0
52 def add(self, w):
53 self.queue = [v+w for v in self.queue]
55 def empty(self):
56 self.queue[:] = []
58 def __len__(self):
59 return len(self.queue)
61 def capacity(self):
62 return self.nmax
65class SerialHamsterError(Exception):
66 pass
69class SerialHamster(object):
71 def __init__(
72 self, port=0, baudrate=9600, timeout=5, buffersize=128,
73 network='', station='TEST', location='', channels=['Z'],
74 disallow_uneven_sampling_rates=True,
75 deltat=None,
76 deltat_tolerance=0.01,
77 in_file=None,
78 lookback=5,
79 tune_to_quickones=True):
81 self.port = port
82 self.baudrate = baudrate
83 self.timeout = timeout
84 self.buffersize = buffersize
85 self.ser = None
86 self.values = [[]]*len(channels)
87 self.times = []
88 self.fixed_deltat = deltat
89 self.deltat = None
90 self.deltat_tolerance = deltat_tolerance
91 self.tmin = None
92 self.previous_deltats = Queue(nmax=lookback)
93 self.previous_tmin_offsets = Queue(nmax=lookback)
94 self.ncontinuous = 0
95 self.disallow_uneven_sampling_rates = disallow_uneven_sampling_rates
96 self.network = network
97 self.station = station
98 self.location = location
99 self.channels = channels
100 self.in_file = in_file # for testing
101 self.listeners = []
102 self.quit_requested = False
103 self.tune_to_quickones = tune_to_quickones
105 self.min_detection_size = 5
107 def add_listener(self, obj):
108 self.listeners.append(weakref.ref(obj))
110 def clear_listeners(self):
111 self.listeners = []
113 def acquisition_start(self):
114 if self.ser is not None:
115 self.stop()
117 logger.debug(
118 'Starting serial hamster (port=%s, baudrate=%i, timeout=%f)'
119 % (self.port, self.baudrate, self.timeout))
121 if self.in_file is None:
122 import serial
123 try:
124 self.ser = serial.Serial(
125 port=self.port,
126 baudrate=self.baudrate,
127 timeout=self.timeout)
129 self.send_start()
131 except serial.serialutil.SerialException:
132 raise SerialHamsterError(
133 'Cannot open serial port %s' % self.port)
134 else:
135 self.ser = self.in_file
137 def send_start(self):
138 pass
140 def acquisition_stop(self):
141 if self.ser is not None:
142 logger.debug('Stopping serial hamster')
143 if self.in_file is None:
144 self.ser.close()
145 self.ser = None
146 self._flush_buffer()
148 def acquisition_request_stop(self):
149 pass
151 def process(self):
152 if self.ser is None:
153 return False
155 try:
156 line = self.ser.readline()
157 if line == '':
158 raise SerialHamsterError(
159 'Failed to read from serial port %s' % self.port)
160 except Exception:
161 raise SerialHamsterError(
162 'Failed to read from serial port %s' % self.port)
164 t = time.time()
166 for tok in line.split():
167 try:
168 val = float(tok)
169 except Exception:
170 logger.warning('Got something unexpected on serial line. ' +
171 'Current line: "%s". ' % line +
172 'Could not convert string to float: "%s"' % tok)
173 continue
175 self.values[0].append(val)
176 self.times.append(t)
178 if len(self.values[0]) >= self.buffersize:
179 self._flush_buffer()
181 return True
183 def _regression(self, t):
184 toff = t[0]
185 t = t-toff
186 i = num.arange(t.size, dtype=float)
187 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t)
188 if self.tune_to_quickones:
189 for ii in range(2):
190 t_fit = r_tmin+r_deltat*i
191 quickones = num.where(t < t_fit)
192 if quickones[0].size < 2:
193 break
194 i = i[quickones]
195 t = t[quickones]
196 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t)
198 return r_deltat, r_tmin+toff
200 def _flush_buffer(self):
202 if len(self.times) < self.min_detection_size:
203 return
205 t = num.array(self.times, dtype=float)
206 r_deltat, r_tmin = self._regression(t)
208 if self.disallow_uneven_sampling_rates:
209 r_deltat = 1./round(1./r_deltat)
211 # check if deltat is consistent with expectations
212 if self.deltat is not None and self.fixed_deltat is None:
213 try:
214 p_deltat = self.previous_deltats.median()
215 if (((self.disallow_uneven_sampling_rates
216 and abs(1./p_deltat - 1./self.deltat) > 0.5)
217 or (not self.disallow_uneven_sampling_rates
218 and abs((self.deltat - p_deltat)/self.deltat)
219 > self.deltat_tolerance))
220 and len(self.previous_deltats)
221 > 0.5*self.previous_deltats.capacity()):
223 self.deltat = None
224 self.previous_deltats.empty()
225 except QueueIsEmpty:
226 pass
228 self.previous_deltats.push_back(r_deltat)
230 # detect sampling rate
231 if self.deltat is None:
232 if self.fixed_deltat is not None:
233 self.deltat = self.fixed_deltat
234 else:
235 self.deltat = r_deltat
236 # must also set new time origin if sampling rate changes
237 self.tmin = None
238 logger.info(
239 'Setting new sampling rate to %g Hz '
240 '(sampling interval is %g s)' % (
241 1./self.deltat, self.deltat))
243 # check if onset has drifted / jumped
244 if self.deltat is not None and self.tmin is not None:
245 continuous_tmin = self.tmin + self.ncontinuous*self.deltat
247 tmin_offset = r_tmin - continuous_tmin
248 try:
249 toffset = self.previous_tmin_offsets.median()
250 if abs(toffset) > self.deltat*0.7 \
251 and len(self.previous_tmin_offsets) \
252 > 0.5*self.previous_tmin_offsets.capacity():
254 soffset = int(round(toffset/self.deltat))
255 logger.info(
256 'Detected drift/jump/gap of %g sample%s' % (
257 soffset, ['s', ''][abs(soffset) == 1]))
259 if soffset == 1:
260 for values in self.values:
261 values.append(values[-1])
262 self.previous_tmin_offsets.add(-self.deltat)
263 logger.info(
264 'Adding one sample to compensate time drift')
265 elif soffset == -1:
266 for values in self.values:
267 values.pop(-1)
268 self.previous_tmin_offsets.add(+self.deltat)
269 logger.info(
270 'Removing one sample to compensate time drift')
271 else:
272 self.tmin = None
273 self.previous_tmin_offsets.empty()
275 except QueueIsEmpty:
276 pass
278 self.previous_tmin_offsets.push_back(tmin_offset)
280 # detect onset time
281 if self.tmin is None and self.deltat is not None:
282 self.tmin = r_tmin
283 self.ncontinuous = 0
284 logger.info(
285 'Setting new time origin to %s' % util.time_to_str(self.tmin))
287 if self.tmin is not None and self.deltat is not None:
288 for channel, values in zip(self.channels, self.values):
289 v = num.array(values, dtype=int)
291 tr = trace.Trace(
292 network=self.network,
293 station=self.station,
294 location=self.location,
295 channel=channel,
296 tmin=self.tmin + self.ncontinuous*self.deltat,
297 deltat=self.deltat,
298 ydata=v)
300 self.got_trace(tr)
301 self.ncontinuous += v.size
303 self.values = [[]] * len(self.channels)
304 self.times = []
306 def got_trace(self, tr):
307 logger.debug('Completed trace from serial hamster: %s' % tr)
309 # deliver payload to registered listeners
310 for ref in self.listeners:
311 obj = ref()
312 if obj:
313 obj.insert_trace(tr)
316class CamSerialHamster(SerialHamster):
318 def __init__(self, baudrate=115200, channels=['N'], *args, **kwargs):
319 SerialHamster.__init__(
320 self, disallow_uneven_sampling_rates=False, deltat_tolerance=0.001,
321 baudrate=baudrate, channels=channels, *args, **kwargs)
323 def send_start(self):
324 try:
325 ser = self.ser
326 ser.write('99,e\n')
327 a = ser.readline()
328 if not a:
329 raise SerialHamsterError(
330 'Camera did not respond to command "99,e"')
332 logger.debug(
333 'Sent command "99,e" to cam; received answer: "%s"'
334 % a.strip())
336 ser.write('2,e\n')
337 a = ser.readline()
338 if not a:
339 raise SerialHamsterError(
340 'Camera did not respond to command "2,e"')
342 logger.debug(
343 'Sent command "2,e" to cam; received answer: "%s"' % a.strip())
344 ser.write('2,01\n')
345 ser.write('2,f400\n')
346 except Exception:
347 raise SerialHamsterError(
348 'Initialization of camera acquisition failed.')
350 def process(self):
351 ser = self.ser
353 if ser is None:
354 return False
356 ser.write('2,X\n')
357 isamp = 0
358 while True:
359 data = ser.read(2)
360 if len(data) != 2:
361 raise SerialHamsterError(
362 'Failed to read from serial line interface.')
364 uclow = ord(data[0])
365 uchigh = ord(data[1])
367 if uclow == 0xff and uchigh == 0xff:
368 break
370 v = uclow + (uchigh << 8)
372 self.times.append(time.time())
373 self.values[isamp % len(self.channels)].append(v)
374 isamp += 1
376 if len(self.values[-1]) >= self.buffersize:
377 self._flush_buffer()
379 return True
382class USBHB628Hamster(SerialHamster):
384 def __init__(self, baudrate=115200, channels=[(0, 'Z')], *args, **kwargs):
385 SerialHamster.__init__(
386 self,
387 baudrate=baudrate,
388 channels=[x[1] for x in channels],
389 tune_to_quickones=False,
390 *args, **kwargs)
392 self.channel_map = dict([(c[0], j) for (j, c) in enumerate(channels)])
393 self.first_initiated = None
394 self.ntaken = 0
396 def process(self):
397 import serial
399 ser = self.ser
401 if ser is None:
402 return False
404 t = time.time()
406 # determine next appropriate sampling instant
407 if self.first_initiated is not None:
408 ts = self.first_initiated + self.fixed_deltat * self.ntaken
409 if t - ts > self.fixed_deltat*10:
410 logger.warning(
411 'lagging more than ten samples on serial line %s - '
412 'resetting' % self.port)
414 self.first_initiated = None
416 if not self.first_initiated:
417 ts = math.ceil(t/self.fixed_deltat)*self.fixed_deltat
418 self.first_initiated = ts
419 self.ntaken = 0
421 # wait for next sampling instant
422 while t < ts:
423 time.sleep(max(0., ts-t))
424 t = time.time()
426 if t - ts > self.fixed_deltat:
427 logger.warning(
428 'lagging more than one sample on serial line %s' % self.port)
430 # get the sample
431 ser.write('c09')
432 ser.flush()
433 try:
434 data = [ord(x) for x in ser.read(17)]
435 if len(data) != 17:
436 raise SerialHamsterError('Reading from serial line failed.')
438 except serial.serialutil.SerialException:
439 raise SerialHamsterError('Reading from serial line failed.')
441 self.ntaken += 1
443 for ichan in range(8):
444 if ichan in self.channel_map:
445 v = data[ichan*2] + (data[ichan*2+1] << 8)
446 self.values[self.channel_map[ichan]].append(v)
448 self.times.append(t)
450 if len(self.times) >= self.buffersize:
451 self._flush_buffer()
453 return True
456class AcquisitionThread(threading.Thread):
457 def __init__(self, post_process_sleep=0.0):
458 threading.Thread.__init__(self)
459 self.queue = queue.Queue()
460 self.post_process_sleep = post_process_sleep
461 self._sun_is_shining = True
463 def run(self):
464 while True:
465 try:
466 self.acquisition_start()
467 while self._sun_is_shining:
468 t0 = time.time()
469 self.process()
470 t1 = time.time()
471 if self.post_process_sleep != 0.0:
472 time.sleep(max(0, self.post_process_sleep-(t1-t0)))
474 self.acquisition_stop()
475 break
477 except SerialHamsterError as e:
479 logger.error(str(e))
480 logger.error('Acquistion terminated, restart in 5 s')
481 self.acquisition_stop()
482 time.sleep(5)
483 if not self._sun_is_shining:
484 break
486 def stop(self):
487 self._sun_is_shining = False
489 logger.debug('Waiting for thread to terminate...')
490 self.wait()
491 logger.debug('Thread has terminated.')
493 def got_trace(self, tr):
494 self.queue.put(tr)
496 def poll(self):
497 items = []
498 try:
499 while True:
500 items.append(self.queue.get_nowait())
502 except queue.Empty:
503 pass
505 return items
508class Acquisition(
509 SerialHamster, AcquisitionThread):
511 def __init__(self, *args, **kwargs):
512 SerialHamster.__init__(self, *args, **kwargs)
513 AcquisitionThread.__init__(self, post_process_sleep=0.001)
515 def got_trace(self, tr):
516 logger.debug('acquisition got trace rate %g Hz, duration %g s' % (
517 1.0/tr.deltat, tr.tmax - tr.tmin))
518 AcquisitionThread.got_trace(self, tr)