1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import time
7import logging
8import weakref
9import math
10import numpy as num
11from scipy import stats
12import threading
13try:
14 import queue
15except ImportError:
16 import Queue as queue
18from pyrocko import trace, util
20logger = logging.getLogger('pyrocko.streaming.serial_hamster')
23class QueueIsEmpty(Exception):
24 pass
27class Queue(object):
28 def __init__(self, nmax):
29 self.nmax = nmax
30 self.queue = []
32 def push_back(self, val):
33 self.queue.append(val)
34 while len(self.queue) > self.nmax:
35 self.queue.pop(0)
37 def mean(self):
38 if not self.queue:
39 raise QueueIsEmpty()
40 return sum(self.queue)/float(len(self.queue))
42 def median(self):
43 if not self.queue:
44 raise QueueIsEmpty()
45 n = len(self.queue)
46 s = sorted(self.queue)
47 if n % 2 != 0:
48 return s[n//2]
49 else:
50 return (s[n//2-1]+s[n//2])/2.0
52 def add(self, w):
53 self.queue = [v+w for v in self.queue]
55 def empty(self):
56 self.queue[:] = []
58 def __len__(self):
59 return len(self.queue)
61 def capacity(self):
62 return self.nmax
64 def __str__(self):
65 return ' '.join('%g' % v for v in self.queue)
68class SerialHamsterError(Exception):
69 pass
72class SerialHamster(object):
74 def __init__(
75 self, port=0, baudrate=9600, timeout=5, buffersize=128,
76 start_string=None,
77 network='', station='TEST', location='', channels=['Z'],
78 disallow_uneven_sampling_rates=True,
79 deltat=None,
80 deltat_tolerance=0.01,
81 in_file=None,
82 lookback=5,
83 min_detection_size=5,
84 tune_to_quickones=True):
86 self.port = port
87 self.baudrate = baudrate
88 self.timeout = timeout
89 self.buffersize = buffersize
90 self.ser = None
91 self.values = [[]]*len(channels)
92 self.times = []
93 self.fixed_deltat = deltat
94 self.deltat = None
95 self.deltat_tolerance = deltat_tolerance
96 self.tmin = None
97 self.previous_deltats = Queue(nmax=lookback)
98 self.previous_tmin_offsets = Queue(nmax=lookback)
99 self.ncontinuous = 0
100 self.disallow_uneven_sampling_rates = disallow_uneven_sampling_rates
101 self.network = network
102 self.station = station
103 self.location = location
104 self.channels = channels
105 self.in_file = in_file # for testing
106 self.listeners = []
107 self.quit_requested = False
108 self.tune_to_quickones = tune_to_quickones
109 self.start_string = start_string
111 self.min_detection_size = min_detection_size
113 def add_listener(self, obj):
114 self.listeners.append(weakref.ref(obj))
116 def clear_listeners(self):
117 self.listeners = []
119 def acquisition_start(self):
120 if self.ser is not None:
121 self.stop()
123 logger.debug(
124 'Starting serial hamster (port=%s, baudrate=%i, timeout=%f)'
125 % (self.port, self.baudrate, self.timeout))
127 if self.in_file is None:
128 import serial
129 try:
130 self.ser = serial.Serial(
131 port=self.port,
132 baudrate=self.baudrate,
133 timeout=self.timeout)
135 self.send_start()
137 except serial.serialutil.SerialException:
138 raise SerialHamsterError(
139 'Cannot open serial port %s' % self.port)
140 else:
141 self.ser = self.in_file
143 def send_start(self):
144 ser = self.ser
145 if self.start_string is not None:
146 ser.write(self.start_string.encode('ascii'))
148 def acquisition_stop(self):
149 if self.ser is not None:
150 logger.debug('Stopping serial hamster')
151 if self.in_file is None:
152 self.ser.close()
153 self.ser = None
154 self._flush_buffer()
156 def acquisition_request_stop(self):
157 pass
159 def process(self):
160 if self.ser is None:
161 return False
163 try:
164 line = self.ser.readline()
165 if line == '':
166 raise SerialHamsterError(
167 'Failed to read from serial port %s' % self.port)
168 except Exception:
169 raise SerialHamsterError(
170 'Failed to read from serial port %s' % self.port)
172 t = time.time()
174 for tok in line.split():
175 try:
176 val = float(tok)
177 except Exception:
178 logger.warning('Got something unexpected on serial line. ' +
179 'Current line: "%s". ' % line +
180 'Could not convert string to float: "%s"' % tok)
181 continue
183 self.values[0].append(val)
184 self.times.append(t)
186 if len(self.values[0]) >= self.buffersize:
187 self._flush_buffer()
189 return True
191 def _regression(self, t):
192 toff = t[0]
193 t = t-toff
194 i = num.arange(t.size, dtype=float)
195 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t)
196 if self.tune_to_quickones:
197 for ii in range(2):
198 t_fit = r_tmin+r_deltat*i
199 quickones = num.where(t < t_fit)
200 if quickones[0].size < 2:
201 break
202 i = i[quickones]
203 t = t[quickones]
204 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t)
206 return r_deltat, r_tmin+toff
208 def _flush_buffer(self):
210 if len(self.times) < self.min_detection_size:
211 return
213 t = num.array(self.times, dtype=float)
214 r_deltat, r_tmin = self._regression(t)
216 if self.disallow_uneven_sampling_rates:
217 r_deltat = 1./round(1./r_deltat)
219 # check if deltat is consistent with expectations
220 if self.deltat is not None and self.fixed_deltat is None:
221 try:
222 p_deltat = self.previous_deltats.median()
223 if (((self.disallow_uneven_sampling_rates
224 and abs(1./p_deltat - 1./self.deltat) > 0.5)
225 or (not self.disallow_uneven_sampling_rates
226 and abs((self.deltat - p_deltat)/self.deltat)
227 > self.deltat_tolerance))
228 and len(self.previous_deltats)
229 > 0.5*self.previous_deltats.capacity()):
231 self.deltat = None
232 self.previous_deltats.empty()
233 except QueueIsEmpty:
234 pass
236 self.previous_deltats.push_back(r_deltat)
238 # detect sampling rate
239 if self.deltat is None:
240 if self.fixed_deltat is not None:
241 self.deltat = self.fixed_deltat
242 else:
243 self.deltat = r_deltat
244 # must also set new time origin if sampling rate changes
245 self.tmin = None
246 logger.info(
247 'Setting new sampling rate to %g Hz '
248 '(sampling interval is %g s)' % (
249 1./self.deltat, self.deltat))
251 # check if onset has drifted / jumped
252 if self.deltat is not None and self.tmin is not None:
253 continuous_tmin = self.tmin + self.ncontinuous*self.deltat
255 tmin_offset = r_tmin - continuous_tmin
256 try:
257 toffset = self.previous_tmin_offsets.median()
258 if abs(toffset) > self.deltat*0.7 \
259 and len(self.previous_tmin_offsets) \
260 > 0.5*self.previous_tmin_offsets.capacity():
262 soffset = int(round(toffset/self.deltat))
263 logger.info(
264 'Detected drift/jump/gap of %g sample%s' % (
265 soffset, ['s', ''][abs(soffset) == 1]))
267 if soffset == 1:
268 for values in self.values:
269 values.append(values[-1])
270 self.previous_tmin_offsets.add(-self.deltat)
271 logger.info(
272 'Adding one sample to compensate time drift')
273 elif soffset == -1:
274 for values in self.values:
275 values.pop(-1)
276 self.previous_tmin_offsets.add(+self.deltat)
277 logger.info(
278 'Removing one sample to compensate time drift')
279 else:
280 self.tmin = None
281 self.previous_tmin_offsets.empty()
283 except QueueIsEmpty:
284 pass
286 self.previous_tmin_offsets.push_back(tmin_offset)
288 # detect onset time
289 if self.tmin is None and self.deltat is not None:
290 self.tmin = r_tmin
291 self.ncontinuous = 0
292 logger.info(
293 'Setting new time origin to %s' % util.time_to_str(self.tmin))
295 if self.tmin is not None and self.deltat is not None:
296 for channel, values in zip(self.channels, self.values):
297 v = num.array(values, dtype=num.int32)
299 tr = trace.Trace(
300 network=self.network,
301 station=self.station,
302 location=self.location,
303 channel=channel,
304 tmin=self.tmin + self.ncontinuous*self.deltat,
305 deltat=self.deltat,
306 ydata=v)
308 self.got_trace(tr)
309 self.ncontinuous += v.size
311 self.values = [[]] * len(self.channels)
312 self.times = []
314 def got_trace(self, tr):
315 logger.debug('Completed trace from serial hamster: %s' % tr)
317 # deliver payload to registered listeners
318 for ref in self.listeners:
319 obj = ref()
320 if obj:
321 obj.insert_trace(tr)
324class CamSerialHamster(SerialHamster):
326 def __init__(self, baudrate=115200, channels=['N'], *args, **kwargs):
327 SerialHamster.__init__(
328 self, disallow_uneven_sampling_rates=False, deltat_tolerance=0.001,
329 baudrate=baudrate, channels=channels, *args, **kwargs)
331 def send_start(self):
332 try:
333 ser = self.ser
334 ser.write('99,e\n')
335 a = ser.readline()
336 if not a:
337 raise SerialHamsterError(
338 'Camera did not respond to command "99,e"')
340 logger.debug(
341 'Sent command "99,e" to cam; received answer: "%s"'
342 % a.strip())
344 ser.write('2,e\n')
345 a = ser.readline()
346 if not a:
347 raise SerialHamsterError(
348 'Camera did not respond to command "2,e"')
350 logger.debug(
351 'Sent command "2,e" to cam; received answer: "%s"' % a.strip())
352 ser.write('2,01\n')
353 ser.write('2,f400\n')
354 except Exception:
355 raise SerialHamsterError(
356 'Initialization of camera acquisition failed.')
358 def process(self):
359 ser = self.ser
361 if ser is None:
362 return False
364 ser.write('2,X\n')
365 isamp = 0
366 while True:
367 data = ser.read(2)
368 if len(data) != 2:
369 raise SerialHamsterError(
370 'Failed to read from serial line interface.')
372 uclow = ord(data[0])
373 uchigh = ord(data[1])
375 if uclow == 0xff and uchigh == 0xff:
376 break
378 v = uclow + (uchigh << 8)
380 self.times.append(time.time())
381 self.values[isamp % len(self.channels)].append(v)
382 isamp += 1
384 if len(self.values[-1]) >= self.buffersize:
385 self._flush_buffer()
387 return True
390class USBHB628Hamster(SerialHamster):
392 def __init__(self, baudrate=115200, channels=[(0, 'Z')], *args, **kwargs):
393 SerialHamster.__init__(
394 self,
395 baudrate=baudrate,
396 channels=[x[1] for x in channels],
397 tune_to_quickones=False,
398 *args, **kwargs)
400 self.channel_map = dict([(c[0], j) for (j, c) in enumerate(channels)])
401 self.first_initiated = None
402 self.ntaken = 0
404 def process(self):
405 import serial
407 ser = self.ser
409 if ser is None:
410 return False
412 t = time.time()
414 # determine next appropriate sampling instant
415 if self.first_initiated is not None:
416 ts = self.first_initiated + self.fixed_deltat * self.ntaken
417 if t - ts > self.fixed_deltat*10:
418 logger.warning(
419 'lagging more than ten samples on serial line %s - '
420 'resetting' % self.port)
422 self.first_initiated = None
424 if not self.first_initiated:
425 ts = math.ceil(t/self.fixed_deltat)*self.fixed_deltat
426 self.first_initiated = ts
427 self.ntaken = 0
429 # wait for next sampling instant
430 while t < ts:
431 time.sleep(max(0., ts-t))
432 t = time.time()
434 if t - ts > self.fixed_deltat:
435 logger.warning(
436 'lagging more than one sample on serial line %s' % self.port)
438 # get the sample
439 ser.write('c09')
440 ser.flush()
441 try:
442 data = [ord(x) for x in ser.read(17)]
443 if len(data) != 17:
444 raise SerialHamsterError('Reading from serial line failed.')
446 except serial.serialutil.SerialException:
447 raise SerialHamsterError('Reading from serial line failed.')
449 self.ntaken += 1
451 for ichan in range(8):
452 if ichan in self.channel_map:
453 v = data[ichan*2] + (data[ichan*2+1] << 8)
454 self.values[self.channel_map[ichan]].append(v)
456 self.times.append(t)
458 if len(self.times) >= self.buffersize:
459 self._flush_buffer()
461 return True
464class AcquisitionThread(threading.Thread):
465 def __init__(self, post_process_sleep=0.0):
466 threading.Thread.__init__(self)
467 self.queue = queue.Queue()
468 self.post_process_sleep = post_process_sleep
469 self._sun_is_shining = True
471 def run(self):
472 while True:
473 try:
474 self.acquisition_start()
475 while self._sun_is_shining:
476 t0 = time.time()
477 self.process()
478 t1 = time.time()
479 if self.post_process_sleep != 0.0:
480 time.sleep(max(0, self.post_process_sleep-(t1-t0)))
482 self.acquisition_stop()
483 break
485 except SerialHamsterError as e:
487 logger.error(str(e))
488 logger.error('Acquistion terminated, restart in 5 s')
489 self.acquisition_stop()
490 time.sleep(5)
491 if not self._sun_is_shining:
492 break
494 def stop(self):
495 self._sun_is_shining = False
497 logger.debug("Waiting for thread to terminate...")
498 self.wait()
499 logger.debug("Thread has terminated.")
501 def got_trace(self, tr):
502 self.queue.put(tr)
504 def poll(self):
505 items = []
506 try:
507 while True:
508 items.append(self.queue.get_nowait())
510 except queue.Empty:
511 pass
513 return items
516class Acquisition(
517 SerialHamster, AcquisitionThread):
519 def __init__(self, *args, **kwargs):
520 SerialHamster.__init__(self, *args, **kwargs)
521 AcquisitionThread.__init__(self, post_process_sleep=0.001)
523 def got_trace(self, tr):
524 logger.debug('acquisition got trace rate %g Hz, duration %g s' % (
525 1.0/tr.deltat, tr.tmax - tr.tmin))
526 AcquisitionThread.got_trace(self, tr)