1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import time 

7import logging 

8import math 

9import numpy as num 

10from scipy import stats 

11import threading 

12try: 

13 import queue 

14except ImportError: 

15 import Queue as queue 

16 

17from pyrocko import trace, util 

18 

19logger = logging.getLogger('pyrocko.streaming.serial_hamster') 

20 

21 

22class QueueIsEmpty(Exception): 

23 pass 

24 

25 

26class Queue(object): 

27 def __init__(self, nmax): 

28 self.nmax = nmax 

29 self.queue = [] 

30 

31 def push_back(self, val): 

32 self.queue.append(val) 

33 while len(self.queue) > self.nmax: 

34 self.queue.pop(0) 

35 

36 def mean(self): 

37 if not self.queue: 

38 raise QueueIsEmpty() 

39 return sum(self.queue)/float(len(self.queue)) 

40 

41 def median(self): 

42 if not self.queue: 

43 raise QueueIsEmpty() 

44 n = len(self.queue) 

45 s = sorted(self.queue) 

46 if n % 2 != 0: 

47 return s[n//2] 

48 else: 

49 return (s[n//2-1]+s[n//2])/2.0 

50 

51 def add(self, w): 

52 self.queue = [v+w for v in self.queue] 

53 

54 def empty(self): 

55 self.queue[:] = [] 

56 

57 def __len__(self): 

58 return len(self.queue) 

59 

60 def capacity(self): 

61 return self.nmax 

62 

63 def __str__(self): 

64 return ' '.join('%g' % v for v in self.queue) 

65 

66 

67class SerialHamsterError(Exception): 

68 pass 

69 

70 

71class SerialHamster(object): 

72 

73 def __init__( 

74 self, port=0, baudrate=9600, timeout=5, buffersize=128, 

75 start_string=None, 

76 network='', station='TEST', location='', channels=['Z'], 

77 disallow_uneven_sampling_rates=True, 

78 deltat=None, 

79 deltat_tolerance=0.01, 

80 in_file=None, 

81 lookback=5, 

82 min_detection_size=5, 

83 tune_to_quickones=True): 

84 

85 self.port = port 

86 self.baudrate = baudrate 

87 self.timeout = timeout 

88 self.buffersize = buffersize 

89 self.ser = None 

90 self.values = [[]]*len(channels) 

91 self.times = [] 

92 self.fixed_deltat = deltat 

93 self.deltat = None 

94 self.deltat_tolerance = deltat_tolerance 

95 self.tmin = None 

96 self.previous_deltats = Queue(nmax=lookback) 

97 self.previous_tmin_offsets = Queue(nmax=lookback) 

98 self.ncontinuous = 0 

99 self.disallow_uneven_sampling_rates = disallow_uneven_sampling_rates 

100 self.network = network 

101 self.station = station 

102 self.location = location 

103 self.channels = channels 

104 self.in_file = in_file # for testing 

105 self.listeners = [] 

106 self.quit_requested = False 

107 self.tune_to_quickones = tune_to_quickones 

108 self.start_string = start_string 

109 

110 self.min_detection_size = min_detection_size 

111 

112 def add_listener(self, obj): 

113 self.listeners.append(util.smart_weakref(obj)) 

114 

115 def clear_listeners(self): 

116 self.listeners = [] 

117 

118 def acquisition_start(self): 

119 if self.ser is not None: 

120 self.stop() 

121 

122 logger.debug( 

123 'Starting serial hamster (port=%s, baudrate=%i, timeout=%f)' 

124 % (self.port, self.baudrate, self.timeout)) 

125 

126 if self.in_file is None: 

127 import serial 

128 try: 

129 self.ser = serial.Serial( 

130 port=self.port, 

131 baudrate=self.baudrate, 

132 timeout=self.timeout) 

133 

134 self.send_start() 

135 

136 except serial.serialutil.SerialException: 

137 raise SerialHamsterError( 

138 'Cannot open serial port %s' % self.port) 

139 else: 

140 self.ser = self.in_file 

141 

142 def send_start(self): 

143 ser = self.ser 

144 if self.start_string is not None: 

145 ser.write(self.start_string.encode('ascii')) 

146 

147 def acquisition_stop(self): 

148 if self.ser is not None: 

149 logger.debug('Stopping serial hamster') 

150 if self.in_file is None: 

151 self.ser.close() 

152 self.ser = None 

153 self._flush_buffer() 

154 

155 def acquisition_request_stop(self): 

156 pass 

157 

158 def process(self): 

159 if self.ser is None: 

160 return False 

161 

162 try: 

163 line = self.ser.readline() 

164 if line == '': 

165 raise SerialHamsterError( 

166 'Failed to read from serial port %s' % self.port) 

167 except Exception: 

168 raise SerialHamsterError( 

169 'Failed to read from serial port %s' % self.port) 

170 

171 t = time.time() 

172 

173 for tok in line.split(): 

174 try: 

175 val = float(tok) 

176 except Exception: 

177 logger.warning('Got something unexpected on serial line. ' + 

178 'Current line: "%s". ' % line + 

179 'Could not convert string to float: "%s"' % tok) 

180 continue 

181 

182 self.values[0].append(val) 

183 self.times.append(t) 

184 

185 if len(self.values[0]) >= self.buffersize: 

186 self._flush_buffer() 

187 

188 return True 

189 

190 def _regression(self, t): 

191 toff = t[0] 

192 t = t-toff 

193 i = num.arange(t.size, dtype=float) 

194 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t) 

195 if self.tune_to_quickones: 

196 for ii in range(2): 

197 t_fit = r_tmin+r_deltat*i 

198 quickones = num.where(t < t_fit) 

199 if quickones[0].size < 2: 

200 break 

201 i = i[quickones] 

202 t = t[quickones] 

203 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t) 

204 

205 return r_deltat, r_tmin+toff 

206 

207 def _flush_buffer(self): 

208 

209 if len(self.times) < self.min_detection_size: 

210 return 

211 

212 t = num.array(self.times, dtype=float) 

213 r_deltat, r_tmin = self._regression(t) 

214 

215 if self.disallow_uneven_sampling_rates: 

216 r_deltat = 1./round(1./r_deltat) 

217 

218 # check if deltat is consistent with expectations 

219 if self.deltat is not None and self.fixed_deltat is None: 

220 try: 

221 p_deltat = self.previous_deltats.median() 

222 if (((self.disallow_uneven_sampling_rates 

223 and abs(1./p_deltat - 1./self.deltat) > 0.5) 

224 or (not self.disallow_uneven_sampling_rates 

225 and abs((self.deltat - p_deltat)/self.deltat) 

226 > self.deltat_tolerance)) 

227 and len(self.previous_deltats) 

228 > 0.5*self.previous_deltats.capacity()): 

229 

230 self.deltat = None 

231 self.previous_deltats.empty() 

232 except QueueIsEmpty: 

233 pass 

234 

235 self.previous_deltats.push_back(r_deltat) 

236 

237 # detect sampling rate 

238 if self.deltat is None: 

239 if self.fixed_deltat is not None: 

240 self.deltat = self.fixed_deltat 

241 else: 

242 self.deltat = r_deltat 

243 # must also set new time origin if sampling rate changes 

244 self.tmin = None 

245 logger.info( 

246 'Setting new sampling rate to %g Hz ' 

247 '(sampling interval is %g s)' % ( 

248 1./self.deltat, self.deltat)) 

249 

250 # check if onset has drifted / jumped 

251 if self.deltat is not None and self.tmin is not None: 

252 continuous_tmin = self.tmin + self.ncontinuous*self.deltat 

253 

254 tmin_offset = r_tmin - continuous_tmin 

255 try: 

256 toffset = self.previous_tmin_offsets.median() 

257 if abs(toffset) > self.deltat*0.7 \ 

258 and len(self.previous_tmin_offsets) \ 

259 > 0.5*self.previous_tmin_offsets.capacity(): 

260 

261 soffset = int(round(toffset/self.deltat)) 

262 logger.info( 

263 'Detected drift/jump/gap of %g sample%s' % ( 

264 soffset, ['s', ''][abs(soffset) == 1])) 

265 

266 if soffset == 1: 

267 for values in self.values: 

268 values.append(values[-1]) 

269 self.previous_tmin_offsets.add(-self.deltat) 

270 logger.info( 

271 'Adding one sample to compensate time drift') 

272 elif soffset == -1: 

273 for values in self.values: 

274 values.pop(-1) 

275 self.previous_tmin_offsets.add(+self.deltat) 

276 logger.info( 

277 'Removing one sample to compensate time drift') 

278 else: 

279 self.tmin = None 

280 self.previous_tmin_offsets.empty() 

281 

282 except QueueIsEmpty: 

283 pass 

284 

285 self.previous_tmin_offsets.push_back(tmin_offset) 

286 

287 # detect onset time 

288 if self.tmin is None and self.deltat is not None: 

289 self.tmin = r_tmin 

290 self.ncontinuous = 0 

291 logger.info( 

292 'Setting new time origin to %s' % util.time_to_str(self.tmin)) 

293 

294 if self.tmin is not None and self.deltat is not None: 

295 for channel, values in zip(self.channels, self.values): 

296 v = num.array(values, dtype=num.int32) 

297 

298 tr = trace.Trace( 

299 network=self.network, 

300 station=self.station, 

301 location=self.location, 

302 channel=channel, 

303 tmin=self.tmin + self.ncontinuous*self.deltat, 

304 deltat=self.deltat, 

305 ydata=v) 

306 

307 self.got_trace(tr) 

308 self.ncontinuous += v.size 

309 

310 self.values = [[]] * len(self.channels) 

311 self.times = [] 

312 

313 def got_trace(self, tr): 

314 logger.debug('Completed trace from serial hamster: %s' % tr) 

315 

316 # deliver payload to registered listeners 

317 for ref in self.listeners: 

318 obj = ref() 

319 if obj: 

320 obj.insert_trace(tr) 

321 

322 

323class CamSerialHamster(SerialHamster): 

324 

325 def __init__(self, baudrate=115200, channels=['N'], *args, **kwargs): 

326 SerialHamster.__init__( 

327 self, disallow_uneven_sampling_rates=False, deltat_tolerance=0.001, 

328 baudrate=baudrate, channels=channels, *args, **kwargs) 

329 

330 def send_start(self): 

331 try: 

332 ser = self.ser 

333 ser.write('99,e\n') 

334 a = ser.readline() 

335 if not a: 

336 raise SerialHamsterError( 

337 'Camera did not respond to command "99,e"') 

338 

339 logger.debug( 

340 'Sent command "99,e" to cam; received answer: "%s"' 

341 % a.strip()) 

342 

343 ser.write('2,e\n') 

344 a = ser.readline() 

345 if not a: 

346 raise SerialHamsterError( 

347 'Camera did not respond to command "2,e"') 

348 

349 logger.debug( 

350 'Sent command "2,e" to cam; received answer: "%s"' % a.strip()) 

351 ser.write('2,01\n') 

352 ser.write('2,f400\n') 

353 except Exception: 

354 raise SerialHamsterError( 

355 'Initialization of camera acquisition failed.') 

356 

357 def process(self): 

358 ser = self.ser 

359 

360 if ser is None: 

361 return False 

362 

363 ser.write('2,X\n') 

364 isamp = 0 

365 while True: 

366 data = ser.read(2) 

367 if len(data) != 2: 

368 raise SerialHamsterError( 

369 'Failed to read from serial line interface.') 

370 

371 uclow = ord(data[0]) 

372 uchigh = ord(data[1]) 

373 

374 if uclow == 0xff and uchigh == 0xff: 

375 break 

376 

377 v = uclow + (uchigh << 8) 

378 

379 self.times.append(time.time()) 

380 self.values[isamp % len(self.channels)].append(v) 

381 isamp += 1 

382 

383 if len(self.values[-1]) >= self.buffersize: 

384 self._flush_buffer() 

385 

386 return True 

387 

388 

389class USBHB628Hamster(SerialHamster): 

390 

391 def __init__(self, baudrate=115200, channels=[(0, 'Z')], *args, **kwargs): 

392 SerialHamster.__init__( 

393 self, 

394 baudrate=baudrate, 

395 channels=[x[1] for x in channels], 

396 tune_to_quickones=False, 

397 *args, **kwargs) 

398 

399 self.channel_map = dict([(c[0], j) for (j, c) in enumerate(channels)]) 

400 self.first_initiated = None 

401 self.ntaken = 0 

402 

403 def process(self): 

404 import serial 

405 

406 ser = self.ser 

407 

408 if ser is None: 

409 return False 

410 

411 t = time.time() 

412 

413 # determine next appropriate sampling instant 

414 if self.first_initiated is not None: 

415 ts = self.first_initiated + self.fixed_deltat * self.ntaken 

416 if t - ts > self.fixed_deltat*10: 

417 logger.warning( 

418 'lagging more than ten samples on serial line %s - ' 

419 'resetting' % self.port) 

420 

421 self.first_initiated = None 

422 

423 if not self.first_initiated: 

424 ts = math.ceil(t/self.fixed_deltat)*self.fixed_deltat 

425 self.first_initiated = ts 

426 self.ntaken = 0 

427 

428 # wait for next sampling instant 

429 while t < ts: 

430 time.sleep(max(0., ts-t)) 

431 t = time.time() 

432 

433 if t - ts > self.fixed_deltat: 

434 logger.warning( 

435 'lagging more than one sample on serial line %s' % self.port) 

436 

437 # get the sample 

438 ser.write('c09') 

439 ser.flush() 

440 try: 

441 data = [ord(x) for x in ser.read(17)] 

442 if len(data) != 17: 

443 raise SerialHamsterError('Reading from serial line failed.') 

444 

445 except serial.serialutil.SerialException: 

446 raise SerialHamsterError('Reading from serial line failed.') 

447 

448 self.ntaken += 1 

449 

450 for ichan in range(8): 

451 if ichan in self.channel_map: 

452 v = data[ichan*2] + (data[ichan*2+1] << 8) 

453 self.values[self.channel_map[ichan]].append(v) 

454 

455 self.times.append(t) 

456 

457 if len(self.times) >= self.buffersize: 

458 self._flush_buffer() 

459 

460 return True 

461 

462 

463class AcquisitionThread(threading.Thread): 

464 def __init__(self, post_process_sleep=0.0): 

465 threading.Thread.__init__(self) 

466 self.queue = queue.Queue() 

467 self.post_process_sleep = post_process_sleep 

468 self._sun_is_shining = True 

469 

470 def run(self): 

471 while True: 

472 try: 

473 self.acquisition_start() 

474 while self._sun_is_shining: 

475 t0 = time.time() 

476 self.process() 

477 t1 = time.time() 

478 if self.post_process_sleep != 0.0: 

479 time.sleep(max(0, self.post_process_sleep-(t1-t0))) 

480 

481 self.acquisition_stop() 

482 break 

483 

484 except SerialHamsterError as e: 

485 

486 logger.error(str(e)) 

487 logger.error('Acquistion terminated, restart in 5 s') 

488 self.acquisition_stop() 

489 time.sleep(5) 

490 if not self._sun_is_shining: 

491 break 

492 

493 def stop(self): 

494 self._sun_is_shining = False 

495 

496 logger.debug('Waiting for thread to terminate...') 

497 self.wait() 

498 logger.debug('Thread has terminated.') 

499 

500 def got_trace(self, tr): 

501 self.queue.put(tr) 

502 

503 def poll(self): 

504 items = [] 

505 try: 

506 while True: 

507 items.append(self.queue.get_nowait()) 

508 

509 except queue.Empty: 

510 pass 

511 

512 return items 

513 

514 

515class Acquisition( 

516 SerialHamster, AcquisitionThread): 

517 

518 def __init__(self, *args, **kwargs): 

519 SerialHamster.__init__(self, *args, **kwargs) 

520 AcquisitionThread.__init__(self, post_process_sleep=0.001) 

521 

522 def got_trace(self, tr): 

523 logger.debug('acquisition got trace rate %g Hz, duration %g s' % ( 

524 1.0/tr.deltat, tr.tmax - tr.tmin)) 

525 AcquisitionThread.got_trace(self, tr)