1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import time 

7import logging 

8import weakref 

9import math 

10import numpy as num 

11from scipy import stats 

12import threading 

13try: 

14 import queue 

15except ImportError: 

16 import Queue as queue 

17 

18from pyrocko import trace, util 

19 

20logger = logging.getLogger('pyrocko.streaming.serial_hamster') 

21 

22 

23class QueueIsEmpty(Exception): 

24 pass 

25 

26 

27class Queue(object): 

28 def __init__(self, nmax): 

29 self.nmax = nmax 

30 self.queue = [] 

31 

32 def push_back(self, val): 

33 self.queue.append(val) 

34 while len(self.queue) > self.nmax: 

35 self.queue.pop(0) 

36 

37 def mean(self): 

38 if not self.queue: 

39 raise QueueIsEmpty() 

40 return sum(self.queue)/float(len(self.queue)) 

41 

42 def median(self): 

43 if not self.queue: 

44 raise QueueIsEmpty() 

45 n = len(self.queue) 

46 s = sorted(self.queue) 

47 if n % 2 != 0: 

48 return s[n//2] 

49 else: 

50 return (s[n//2-1]+s[n//2])/2.0 

51 

52 def add(self, w): 

53 self.queue = [v+w for v in self.queue] 

54 

55 def empty(self): 

56 self.queue[:] = [] 

57 

58 def __len__(self): 

59 return len(self.queue) 

60 

61 def capacity(self): 

62 return self.nmax 

63 

64 

65class SerialHamsterError(Exception): 

66 pass 

67 

68 

69class SerialHamster(object): 

70 

71 def __init__( 

72 self, port=0, baudrate=9600, timeout=5, buffersize=128, 

73 network='', station='TEST', location='', channels=['Z'], 

74 disallow_uneven_sampling_rates=True, 

75 deltat=None, 

76 deltat_tolerance=0.01, 

77 in_file=None, 

78 lookback=5, 

79 tune_to_quickones=True): 

80 

81 self.port = port 

82 self.baudrate = baudrate 

83 self.timeout = timeout 

84 self.buffersize = buffersize 

85 self.ser = None 

86 self.values = [[]]*len(channels) 

87 self.times = [] 

88 self.fixed_deltat = deltat 

89 self.deltat = None 

90 self.deltat_tolerance = deltat_tolerance 

91 self.tmin = None 

92 self.previous_deltats = Queue(nmax=lookback) 

93 self.previous_tmin_offsets = Queue(nmax=lookback) 

94 self.ncontinuous = 0 

95 self.disallow_uneven_sampling_rates = disallow_uneven_sampling_rates 

96 self.network = network 

97 self.station = station 

98 self.location = location 

99 self.channels = channels 

100 self.in_file = in_file # for testing 

101 self.listeners = [] 

102 self.quit_requested = False 

103 self.tune_to_quickones = tune_to_quickones 

104 

105 self.min_detection_size = 5 

106 

107 def add_listener(self, obj): 

108 self.listeners.append(weakref.ref(obj)) 

109 

110 def clear_listeners(self): 

111 self.listeners = [] 

112 

113 def acquisition_start(self): 

114 if self.ser is not None: 

115 self.stop() 

116 

117 logger.debug( 

118 'Starting serial hamster (port=%s, baudrate=%i, timeout=%f)' 

119 % (self.port, self.baudrate, self.timeout)) 

120 

121 if self.in_file is None: 

122 import serial 

123 try: 

124 self.ser = serial.Serial( 

125 port=self.port, 

126 baudrate=self.baudrate, 

127 timeout=self.timeout) 

128 

129 self.send_start() 

130 

131 except serial.serialutil.SerialException: 

132 raise SerialHamsterError( 

133 'Cannot open serial port %s' % self.port) 

134 else: 

135 self.ser = self.in_file 

136 

137 def send_start(self): 

138 pass 

139 

140 def acquisition_stop(self): 

141 if self.ser is not None: 

142 logger.debug('Stopping serial hamster') 

143 if self.in_file is None: 

144 self.ser.close() 

145 self.ser = None 

146 self._flush_buffer() 

147 

148 def acquisition_request_stop(self): 

149 pass 

150 

151 def process(self): 

152 if self.ser is None: 

153 return False 

154 

155 try: 

156 line = self.ser.readline() 

157 if line == '': 

158 raise SerialHamsterError( 

159 'Failed to read from serial port %s' % self.port) 

160 except Exception: 

161 raise SerialHamsterError( 

162 'Failed to read from serial port %s' % self.port) 

163 

164 t = time.time() 

165 

166 for tok in line.split(): 

167 try: 

168 val = float(tok) 

169 except Exception: 

170 logger.warning('Got something unexpected on serial line. ' + 

171 'Current line: "%s". ' % line + 

172 'Could not convert string to float: "%s"' % tok) 

173 continue 

174 

175 self.values[0].append(val) 

176 self.times.append(t) 

177 

178 if len(self.values[0]) >= self.buffersize: 

179 self._flush_buffer() 

180 

181 return True 

182 

183 def _regression(self, t): 

184 toff = t[0] 

185 t = t-toff 

186 i = num.arange(t.size, dtype=float) 

187 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t) 

188 if self.tune_to_quickones: 

189 for ii in range(2): 

190 t_fit = r_tmin+r_deltat*i 

191 quickones = num.where(t < t_fit) 

192 if quickones[0].size < 2: 

193 break 

194 i = i[quickones] 

195 t = t[quickones] 

196 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t) 

197 

198 return r_deltat, r_tmin+toff 

199 

200 def _flush_buffer(self): 

201 

202 if len(self.times) < self.min_detection_size: 

203 return 

204 

205 t = num.array(self.times, dtype=float) 

206 r_deltat, r_tmin = self._regression(t) 

207 

208 if self.disallow_uneven_sampling_rates: 

209 r_deltat = 1./round(1./r_deltat) 

210 

211 # check if deltat is consistent with expectations 

212 if self.deltat is not None and self.fixed_deltat is None: 

213 try: 

214 p_deltat = self.previous_deltats.median() 

215 if (((self.disallow_uneven_sampling_rates 

216 and abs(1./p_deltat - 1./self.deltat) > 0.5) 

217 or (not self.disallow_uneven_sampling_rates 

218 and abs((self.deltat - p_deltat)/self.deltat) 

219 > self.deltat_tolerance)) 

220 and len(self.previous_deltats) 

221 > 0.5*self.previous_deltats.capacity()): 

222 

223 self.deltat = None 

224 self.previous_deltats.empty() 

225 except QueueIsEmpty: 

226 pass 

227 

228 self.previous_deltats.push_back(r_deltat) 

229 

230 # detect sampling rate 

231 if self.deltat is None: 

232 if self.fixed_deltat is not None: 

233 self.deltat = self.fixed_deltat 

234 else: 

235 self.deltat = r_deltat 

236 # must also set new time origin if sampling rate changes 

237 self.tmin = None 

238 logger.info( 

239 'Setting new sampling rate to %g Hz ' 

240 '(sampling interval is %g s)' % ( 

241 1./self.deltat, self.deltat)) 

242 

243 # check if onset has drifted / jumped 

244 if self.deltat is not None and self.tmin is not None: 

245 continuous_tmin = self.tmin + self.ncontinuous*self.deltat 

246 

247 tmin_offset = r_tmin - continuous_tmin 

248 try: 

249 toffset = self.previous_tmin_offsets.median() 

250 if abs(toffset) > self.deltat*0.7 \ 

251 and len(self.previous_tmin_offsets) \ 

252 > 0.5*self.previous_tmin_offsets.capacity(): 

253 

254 soffset = int(round(toffset/self.deltat)) 

255 logger.info( 

256 'Detected drift/jump/gap of %g sample%s' % ( 

257 soffset, ['s', ''][abs(soffset) == 1])) 

258 

259 if soffset == 1: 

260 for values in self.values: 

261 values.append(values[-1]) 

262 self.previous_tmin_offsets.add(-self.deltat) 

263 logger.info( 

264 'Adding one sample to compensate time drift') 

265 elif soffset == -1: 

266 for values in self.values: 

267 values.pop(-1) 

268 self.previous_tmin_offsets.add(+self.deltat) 

269 logger.info( 

270 'Removing one sample to compensate time drift') 

271 else: 

272 self.tmin = None 

273 self.previous_tmin_offsets.empty() 

274 

275 except QueueIsEmpty: 

276 pass 

277 

278 self.previous_tmin_offsets.push_back(tmin_offset) 

279 

280 # detect onset time 

281 if self.tmin is None and self.deltat is not None: 

282 self.tmin = r_tmin 

283 self.ncontinuous = 0 

284 logger.info( 

285 'Setting new time origin to %s' % util.time_to_str(self.tmin)) 

286 

287 if self.tmin is not None and self.deltat is not None: 

288 for channel, values in zip(self.channels, self.values): 

289 v = num.array(values, dtype=int) 

290 

291 tr = trace.Trace( 

292 network=self.network, 

293 station=self.station, 

294 location=self.location, 

295 channel=channel, 

296 tmin=self.tmin + self.ncontinuous*self.deltat, 

297 deltat=self.deltat, 

298 ydata=v) 

299 

300 self.got_trace(tr) 

301 self.ncontinuous += v.size 

302 

303 self.values = [[]] * len(self.channels) 

304 self.times = [] 

305 

306 def got_trace(self, tr): 

307 logger.debug('Completed trace from serial hamster: %s' % tr) 

308 

309 # deliver payload to registered listeners 

310 for ref in self.listeners: 

311 obj = ref() 

312 if obj: 

313 obj.insert_trace(tr) 

314 

315 

316class CamSerialHamster(SerialHamster): 

317 

318 def __init__(self, baudrate=115200, channels=['N'], *args, **kwargs): 

319 SerialHamster.__init__( 

320 self, disallow_uneven_sampling_rates=False, deltat_tolerance=0.001, 

321 baudrate=baudrate, channels=channels, *args, **kwargs) 

322 

323 def send_start(self): 

324 try: 

325 ser = self.ser 

326 ser.write('99,e\n') 

327 a = ser.readline() 

328 if not a: 

329 raise SerialHamsterError( 

330 'Camera did not respond to command "99,e"') 

331 

332 logger.debug( 

333 'Sent command "99,e" to cam; received answer: "%s"' 

334 % a.strip()) 

335 

336 ser.write('2,e\n') 

337 a = ser.readline() 

338 if not a: 

339 raise SerialHamsterError( 

340 'Camera did not respond to command "2,e"') 

341 

342 logger.debug( 

343 'Sent command "2,e" to cam; received answer: "%s"' % a.strip()) 

344 ser.write('2,01\n') 

345 ser.write('2,f400\n') 

346 except Exception: 

347 raise SerialHamsterError( 

348 'Initialization of camera acquisition failed.') 

349 

350 def process(self): 

351 ser = self.ser 

352 

353 if ser is None: 

354 return False 

355 

356 ser.write('2,X\n') 

357 isamp = 0 

358 while True: 

359 data = ser.read(2) 

360 if len(data) != 2: 

361 raise SerialHamsterError( 

362 'Failed to read from serial line interface.') 

363 

364 uclow = ord(data[0]) 

365 uchigh = ord(data[1]) 

366 

367 if uclow == 0xff and uchigh == 0xff: 

368 break 

369 

370 v = uclow + (uchigh << 8) 

371 

372 self.times.append(time.time()) 

373 self.values[isamp % len(self.channels)].append(v) 

374 isamp += 1 

375 

376 if len(self.values[-1]) >= self.buffersize: 

377 self._flush_buffer() 

378 

379 return True 

380 

381 

382class USBHB628Hamster(SerialHamster): 

383 

384 def __init__(self, baudrate=115200, channels=[(0, 'Z')], *args, **kwargs): 

385 SerialHamster.__init__( 

386 self, 

387 baudrate=baudrate, 

388 channels=[x[1] for x in channels], 

389 tune_to_quickones=False, 

390 *args, **kwargs) 

391 

392 self.channel_map = dict([(c[0], j) for (j, c) in enumerate(channels)]) 

393 self.first_initiated = None 

394 self.ntaken = 0 

395 

396 def process(self): 

397 import serial 

398 

399 ser = self.ser 

400 

401 if ser is None: 

402 return False 

403 

404 t = time.time() 

405 

406 # determine next appropriate sampling instant 

407 if self.first_initiated is not None: 

408 ts = self.first_initiated + self.fixed_deltat * self.ntaken 

409 if t - ts > self.fixed_deltat*10: 

410 logger.warning( 

411 'lagging more than ten samples on serial line %s - ' 

412 'resetting' % self.port) 

413 

414 self.first_initiated = None 

415 

416 if not self.first_initiated: 

417 ts = math.ceil(t/self.fixed_deltat)*self.fixed_deltat 

418 self.first_initiated = ts 

419 self.ntaken = 0 

420 

421 # wait for next sampling instant 

422 while t < ts: 

423 time.sleep(max(0., ts-t)) 

424 t = time.time() 

425 

426 if t - ts > self.fixed_deltat: 

427 logger.warning( 

428 'lagging more than one sample on serial line %s' % self.port) 

429 

430 # get the sample 

431 ser.write('c09') 

432 ser.flush() 

433 try: 

434 data = [ord(x) for x in ser.read(17)] 

435 if len(data) != 17: 

436 raise SerialHamsterError('Reading from serial line failed.') 

437 

438 except serial.serialutil.SerialException: 

439 raise SerialHamsterError('Reading from serial line failed.') 

440 

441 self.ntaken += 1 

442 

443 for ichan in range(8): 

444 if ichan in self.channel_map: 

445 v = data[ichan*2] + (data[ichan*2+1] << 8) 

446 self.values[self.channel_map[ichan]].append(v) 

447 

448 self.times.append(t) 

449 

450 if len(self.times) >= self.buffersize: 

451 self._flush_buffer() 

452 

453 return True 

454 

455 

456class AcquisitionThread(threading.Thread): 

457 def __init__(self, post_process_sleep=0.0): 

458 threading.Thread.__init__(self) 

459 self.queue = queue.Queue() 

460 self.post_process_sleep = post_process_sleep 

461 self._sun_is_shining = True 

462 

463 def run(self): 

464 while True: 

465 try: 

466 self.acquisition_start() 

467 while self._sun_is_shining: 

468 t0 = time.time() 

469 self.process() 

470 t1 = time.time() 

471 if self.post_process_sleep != 0.0: 

472 time.sleep(max(0, self.post_process_sleep-(t1-t0))) 

473 

474 self.acquisition_stop() 

475 break 

476 

477 except SerialHamsterError as e: 

478 

479 logger.error(str(e)) 

480 logger.error('Acquistion terminated, restart in 5 s') 

481 self.acquisition_stop() 

482 time.sleep(5) 

483 if not self._sun_is_shining: 

484 break 

485 

486 def stop(self): 

487 self._sun_is_shining = False 

488 

489 logger.debug('Waiting for thread to terminate...') 

490 self.wait() 

491 logger.debug('Thread has terminated.') 

492 

493 def got_trace(self, tr): 

494 self.queue.put(tr) 

495 

496 def poll(self): 

497 items = [] 

498 try: 

499 while True: 

500 items.append(self.queue.get_nowait()) 

501 

502 except queue.Empty: 

503 pass 

504 

505 return items 

506 

507 

508class Acquisition( 

509 SerialHamster, AcquisitionThread): 

510 

511 def __init__(self, *args, **kwargs): 

512 SerialHamster.__init__(self, *args, **kwargs) 

513 AcquisitionThread.__init__(self, post_process_sleep=0.001) 

514 

515 def got_trace(self, tr): 

516 logger.debug('acquisition got trace rate %g Hz, duration %g s' % ( 

517 1.0/tr.deltat, tr.tmax - tr.tmin)) 

518 AcquisitionThread.got_trace(self, tr)