1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import time 

7import logging 

8import weakref 

9import math 

10import numpy as num 

11from scipy import stats 

12import threading 

13try: 

14 import queue 

15except ImportError: 

16 import Queue as queue 

17 

18from pyrocko import trace, util 

19 

20logger = logging.getLogger('pyrocko.streaming.serial_hamster') 

21 

22 

23class QueueIsEmpty(Exception): 

24 pass 

25 

26 

27class Queue(object): 

28 def __init__(self, nmax): 

29 self.nmax = nmax 

30 self.queue = [] 

31 

32 def push_back(self, val): 

33 self.queue.append(val) 

34 while len(self.queue) > self.nmax: 

35 self.queue.pop(0) 

36 

37 def mean(self): 

38 if not self.queue: 

39 raise QueueIsEmpty() 

40 return sum(self.queue)/float(len(self.queue)) 

41 

42 def median(self): 

43 if not self.queue: 

44 raise QueueIsEmpty() 

45 n = len(self.queue) 

46 s = sorted(self.queue) 

47 if n % 2 != 0: 

48 return s[n//2] 

49 else: 

50 return (s[n//2-1]+s[n//2])/2.0 

51 

52 def add(self, w): 

53 self.queue = [v+w for v in self.queue] 

54 

55 def empty(self): 

56 self.queue[:] = [] 

57 

58 def __len__(self): 

59 return len(self.queue) 

60 

61 def capacity(self): 

62 return self.nmax 

63 

64 def __str__(self): 

65 return ' '.join('%g' % v for v in self.queue) 

66 

67 

68class SerialHamsterError(Exception): 

69 pass 

70 

71 

72class SerialHamster(object): 

73 

74 def __init__( 

75 self, port=0, baudrate=9600, timeout=5, buffersize=128, 

76 start_string=None, 

77 network='', station='TEST', location='', channels=['Z'], 

78 disallow_uneven_sampling_rates=True, 

79 deltat=None, 

80 deltat_tolerance=0.01, 

81 in_file=None, 

82 lookback=5, 

83 min_detection_size=5, 

84 tune_to_quickones=True): 

85 

86 self.port = port 

87 self.baudrate = baudrate 

88 self.timeout = timeout 

89 self.buffersize = buffersize 

90 self.ser = None 

91 self.values = [[]]*len(channels) 

92 self.times = [] 

93 self.fixed_deltat = deltat 

94 self.deltat = None 

95 self.deltat_tolerance = deltat_tolerance 

96 self.tmin = None 

97 self.previous_deltats = Queue(nmax=lookback) 

98 self.previous_tmin_offsets = Queue(nmax=lookback) 

99 self.ncontinuous = 0 

100 self.disallow_uneven_sampling_rates = disallow_uneven_sampling_rates 

101 self.network = network 

102 self.station = station 

103 self.location = location 

104 self.channels = channels 

105 self.in_file = in_file # for testing 

106 self.listeners = [] 

107 self.quit_requested = False 

108 self.tune_to_quickones = tune_to_quickones 

109 self.start_string = start_string 

110 

111 self.min_detection_size = min_detection_size 

112 

113 def add_listener(self, obj): 

114 self.listeners.append(weakref.ref(obj)) 

115 

116 def clear_listeners(self): 

117 self.listeners = [] 

118 

119 def acquisition_start(self): 

120 if self.ser is not None: 

121 self.stop() 

122 

123 logger.debug( 

124 'Starting serial hamster (port=%s, baudrate=%i, timeout=%f)' 

125 % (self.port, self.baudrate, self.timeout)) 

126 

127 if self.in_file is None: 

128 import serial 

129 try: 

130 self.ser = serial.Serial( 

131 port=self.port, 

132 baudrate=self.baudrate, 

133 timeout=self.timeout) 

134 

135 self.send_start() 

136 

137 except serial.serialutil.SerialException: 

138 raise SerialHamsterError( 

139 'Cannot open serial port %s' % self.port) 

140 else: 

141 self.ser = self.in_file 

142 

143 def send_start(self): 

144 ser = self.ser 

145 if self.start_string is not None: 

146 ser.write(self.start_string.encode('ascii')) 

147 

148 def acquisition_stop(self): 

149 if self.ser is not None: 

150 logger.debug('Stopping serial hamster') 

151 if self.in_file is None: 

152 self.ser.close() 

153 self.ser = None 

154 self._flush_buffer() 

155 

156 def acquisition_request_stop(self): 

157 pass 

158 

159 def process(self): 

160 if self.ser is None: 

161 return False 

162 

163 try: 

164 line = self.ser.readline() 

165 if line == '': 

166 raise SerialHamsterError( 

167 'Failed to read from serial port %s' % self.port) 

168 except Exception: 

169 raise SerialHamsterError( 

170 'Failed to read from serial port %s' % self.port) 

171 

172 t = time.time() 

173 

174 for tok in line.split(): 

175 try: 

176 val = float(tok) 

177 except Exception: 

178 logger.warning('Got something unexpected on serial line. ' + 

179 'Current line: "%s". ' % line + 

180 'Could not convert string to float: "%s"' % tok) 

181 continue 

182 

183 self.values[0].append(val) 

184 self.times.append(t) 

185 

186 if len(self.values[0]) >= self.buffersize: 

187 self._flush_buffer() 

188 

189 return True 

190 

191 def _regression(self, t): 

192 toff = t[0] 

193 t = t-toff 

194 i = num.arange(t.size, dtype=float) 

195 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t) 

196 if self.tune_to_quickones: 

197 for ii in range(2): 

198 t_fit = r_tmin+r_deltat*i 

199 quickones = num.where(t < t_fit) 

200 if quickones[0].size < 2: 

201 break 

202 i = i[quickones] 

203 t = t[quickones] 

204 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t) 

205 

206 return r_deltat, r_tmin+toff 

207 

208 def _flush_buffer(self): 

209 

210 if len(self.times) < self.min_detection_size: 

211 return 

212 

213 t = num.array(self.times, dtype=float) 

214 r_deltat, r_tmin = self._regression(t) 

215 

216 if self.disallow_uneven_sampling_rates: 

217 r_deltat = 1./round(1./r_deltat) 

218 

219 # check if deltat is consistent with expectations 

220 if self.deltat is not None and self.fixed_deltat is None: 

221 try: 

222 p_deltat = self.previous_deltats.median() 

223 if (((self.disallow_uneven_sampling_rates 

224 and abs(1./p_deltat - 1./self.deltat) > 0.5) 

225 or (not self.disallow_uneven_sampling_rates 

226 and abs((self.deltat - p_deltat)/self.deltat) 

227 > self.deltat_tolerance)) 

228 and len(self.previous_deltats) 

229 > 0.5*self.previous_deltats.capacity()): 

230 

231 self.deltat = None 

232 self.previous_deltats.empty() 

233 except QueueIsEmpty: 

234 pass 

235 

236 self.previous_deltats.push_back(r_deltat) 

237 

238 # detect sampling rate 

239 if self.deltat is None: 

240 if self.fixed_deltat is not None: 

241 self.deltat = self.fixed_deltat 

242 else: 

243 self.deltat = r_deltat 

244 # must also set new time origin if sampling rate changes 

245 self.tmin = None 

246 logger.info( 

247 'Setting new sampling rate to %g Hz ' 

248 '(sampling interval is %g s)' % ( 

249 1./self.deltat, self.deltat)) 

250 

251 # check if onset has drifted / jumped 

252 if self.deltat is not None and self.tmin is not None: 

253 continuous_tmin = self.tmin + self.ncontinuous*self.deltat 

254 

255 tmin_offset = r_tmin - continuous_tmin 

256 try: 

257 toffset = self.previous_tmin_offsets.median() 

258 if abs(toffset) > self.deltat*0.7 \ 

259 and len(self.previous_tmin_offsets) \ 

260 > 0.5*self.previous_tmin_offsets.capacity(): 

261 

262 soffset = int(round(toffset/self.deltat)) 

263 logger.info( 

264 'Detected drift/jump/gap of %g sample%s' % ( 

265 soffset, ['s', ''][abs(soffset) == 1])) 

266 

267 if soffset == 1: 

268 for values in self.values: 

269 values.append(values[-1]) 

270 self.previous_tmin_offsets.add(-self.deltat) 

271 logger.info( 

272 'Adding one sample to compensate time drift') 

273 elif soffset == -1: 

274 for values in self.values: 

275 values.pop(-1) 

276 self.previous_tmin_offsets.add(+self.deltat) 

277 logger.info( 

278 'Removing one sample to compensate time drift') 

279 else: 

280 self.tmin = None 

281 self.previous_tmin_offsets.empty() 

282 

283 except QueueIsEmpty: 

284 pass 

285 

286 self.previous_tmin_offsets.push_back(tmin_offset) 

287 

288 # detect onset time 

289 if self.tmin is None and self.deltat is not None: 

290 self.tmin = r_tmin 

291 self.ncontinuous = 0 

292 logger.info( 

293 'Setting new time origin to %s' % util.time_to_str(self.tmin)) 

294 

295 if self.tmin is not None and self.deltat is not None: 

296 for channel, values in zip(self.channels, self.values): 

297 v = num.array(values, dtype=num.int32) 

298 

299 tr = trace.Trace( 

300 network=self.network, 

301 station=self.station, 

302 location=self.location, 

303 channel=channel, 

304 tmin=self.tmin + self.ncontinuous*self.deltat, 

305 deltat=self.deltat, 

306 ydata=v) 

307 

308 self.got_trace(tr) 

309 self.ncontinuous += v.size 

310 

311 self.values = [[]] * len(self.channels) 

312 self.times = [] 

313 

314 def got_trace(self, tr): 

315 logger.debug('Completed trace from serial hamster: %s' % tr) 

316 

317 # deliver payload to registered listeners 

318 for ref in self.listeners: 

319 obj = ref() 

320 if obj: 

321 obj.insert_trace(tr) 

322 

323 

324class CamSerialHamster(SerialHamster): 

325 

326 def __init__(self, baudrate=115200, channels=['N'], *args, **kwargs): 

327 SerialHamster.__init__( 

328 self, disallow_uneven_sampling_rates=False, deltat_tolerance=0.001, 

329 baudrate=baudrate, channels=channels, *args, **kwargs) 

330 

331 def send_start(self): 

332 try: 

333 ser = self.ser 

334 ser.write('99,e\n') 

335 a = ser.readline() 

336 if not a: 

337 raise SerialHamsterError( 

338 'Camera did not respond to command "99,e"') 

339 

340 logger.debug( 

341 'Sent command "99,e" to cam; received answer: "%s"' 

342 % a.strip()) 

343 

344 ser.write('2,e\n') 

345 a = ser.readline() 

346 if not a: 

347 raise SerialHamsterError( 

348 'Camera did not respond to command "2,e"') 

349 

350 logger.debug( 

351 'Sent command "2,e" to cam; received answer: "%s"' % a.strip()) 

352 ser.write('2,01\n') 

353 ser.write('2,f400\n') 

354 except Exception: 

355 raise SerialHamsterError( 

356 'Initialization of camera acquisition failed.') 

357 

358 def process(self): 

359 ser = self.ser 

360 

361 if ser is None: 

362 return False 

363 

364 ser.write('2,X\n') 

365 isamp = 0 

366 while True: 

367 data = ser.read(2) 

368 if len(data) != 2: 

369 raise SerialHamsterError( 

370 'Failed to read from serial line interface.') 

371 

372 uclow = ord(data[0]) 

373 uchigh = ord(data[1]) 

374 

375 if uclow == 0xff and uchigh == 0xff: 

376 break 

377 

378 v = uclow + (uchigh << 8) 

379 

380 self.times.append(time.time()) 

381 self.values[isamp % len(self.channels)].append(v) 

382 isamp += 1 

383 

384 if len(self.values[-1]) >= self.buffersize: 

385 self._flush_buffer() 

386 

387 return True 

388 

389 

390class USBHB628Hamster(SerialHamster): 

391 

392 def __init__(self, baudrate=115200, channels=[(0, 'Z')], *args, **kwargs): 

393 SerialHamster.__init__( 

394 self, 

395 baudrate=baudrate, 

396 channels=[x[1] for x in channels], 

397 tune_to_quickones=False, 

398 *args, **kwargs) 

399 

400 self.channel_map = dict([(c[0], j) for (j, c) in enumerate(channels)]) 

401 self.first_initiated = None 

402 self.ntaken = 0 

403 

404 def process(self): 

405 import serial 

406 

407 ser = self.ser 

408 

409 if ser is None: 

410 return False 

411 

412 t = time.time() 

413 

414 # determine next appropriate sampling instant 

415 if self.first_initiated is not None: 

416 ts = self.first_initiated + self.fixed_deltat * self.ntaken 

417 if t - ts > self.fixed_deltat*10: 

418 logger.warning( 

419 'lagging more than ten samples on serial line %s - ' 

420 'resetting' % self.port) 

421 

422 self.first_initiated = None 

423 

424 if not self.first_initiated: 

425 ts = math.ceil(t/self.fixed_deltat)*self.fixed_deltat 

426 self.first_initiated = ts 

427 self.ntaken = 0 

428 

429 # wait for next sampling instant 

430 while t < ts: 

431 time.sleep(max(0., ts-t)) 

432 t = time.time() 

433 

434 if t - ts > self.fixed_deltat: 

435 logger.warning( 

436 'lagging more than one sample on serial line %s' % self.port) 

437 

438 # get the sample 

439 ser.write('c09') 

440 ser.flush() 

441 try: 

442 data = [ord(x) for x in ser.read(17)] 

443 if len(data) != 17: 

444 raise SerialHamsterError('Reading from serial line failed.') 

445 

446 except serial.serialutil.SerialException: 

447 raise SerialHamsterError('Reading from serial line failed.') 

448 

449 self.ntaken += 1 

450 

451 for ichan in range(8): 

452 if ichan in self.channel_map: 

453 v = data[ichan*2] + (data[ichan*2+1] << 8) 

454 self.values[self.channel_map[ichan]].append(v) 

455 

456 self.times.append(t) 

457 

458 if len(self.times) >= self.buffersize: 

459 self._flush_buffer() 

460 

461 return True 

462 

463 

464class AcquisitionThread(threading.Thread): 

465 def __init__(self, post_process_sleep=0.0): 

466 threading.Thread.__init__(self) 

467 self.queue = queue.Queue() 

468 self.post_process_sleep = post_process_sleep 

469 self._sun_is_shining = True 

470 

471 def run(self): 

472 while True: 

473 try: 

474 self.acquisition_start() 

475 while self._sun_is_shining: 

476 t0 = time.time() 

477 self.process() 

478 t1 = time.time() 

479 if self.post_process_sleep != 0.0: 

480 time.sleep(max(0, self.post_process_sleep-(t1-t0))) 

481 

482 self.acquisition_stop() 

483 break 

484 

485 except SerialHamsterError as e: 

486 

487 logger.error(str(e)) 

488 logger.error('Acquistion terminated, restart in 5 s') 

489 self.acquisition_stop() 

490 time.sleep(5) 

491 if not self._sun_is_shining: 

492 break 

493 

494 def stop(self): 

495 self._sun_is_shining = False 

496 

497 logger.debug("Waiting for thread to terminate...") 

498 self.wait() 

499 logger.debug("Thread has terminated.") 

500 

501 def got_trace(self, tr): 

502 self.queue.put(tr) 

503 

504 def poll(self): 

505 items = [] 

506 try: 

507 while True: 

508 items.append(self.queue.get_nowait()) 

509 

510 except queue.Empty: 

511 pass 

512 

513 return items 

514 

515 

516class Acquisition( 

517 SerialHamster, AcquisitionThread): 

518 

519 def __init__(self, *args, **kwargs): 

520 SerialHamster.__init__(self, *args, **kwargs) 

521 AcquisitionThread.__init__(self, post_process_sleep=0.001) 

522 

523 def got_trace(self, tr): 

524 logger.debug('acquisition got trace rate %g Hz, duration %g s' % ( 

525 1.0/tr.deltat, tr.tmax - tr.tmin)) 

526 AcquisitionThread.got_trace(self, tr)