1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5from __future__ import absolute_import, division 

6 

7import time 

8import logging 

9import weakref 

10import math 

11import numpy as num 

12from scipy import stats 

13import threading 

14try: 

15 import queue 

16except ImportError: 

17 import Queue as queue 

18 

19from pyrocko import trace, util 

20 

21logger = logging.getLogger('pyrocko.streaming.serial_hamster') 

22 

23 

24class QueueIsEmpty(Exception): 

25 pass 

26 

27 

28class Queue(object): 

29 def __init__(self, nmax): 

30 self.nmax = nmax 

31 self.queue = [] 

32 

33 def push_back(self, val): 

34 self.queue.append(val) 

35 while len(self.queue) > self.nmax: 

36 self.queue.pop(0) 

37 

38 def mean(self): 

39 if not self.queue: 

40 raise QueueIsEmpty() 

41 return sum(self.queue)/float(len(self.queue)) 

42 

43 def median(self): 

44 if not self.queue: 

45 raise QueueIsEmpty() 

46 n = len(self.queue) 

47 s = sorted(self.queue) 

48 if n % 2 != 0: 

49 return s[n//2] 

50 else: 

51 return (s[n//2-1]+s[n//2])/2.0 

52 

53 def add(self, w): 

54 self.queue = [v+w for v in self.queue] 

55 

56 def empty(self): 

57 self.queue[:] = [] 

58 

59 def __len__(self): 

60 return len(self.queue) 

61 

62 def capacity(self): 

63 return self.nmax 

64 

65 

66class SerialHamsterError(Exception): 

67 pass 

68 

69 

70class SerialHamster(object): 

71 

72 def __init__( 

73 self, port=0, baudrate=9600, timeout=5, buffersize=128, 

74 network='', station='TEST', location='', channels=['Z'], 

75 disallow_uneven_sampling_rates=True, 

76 deltat=None, 

77 deltat_tolerance=0.01, 

78 in_file=None, 

79 lookback=5, 

80 tune_to_quickones=True): 

81 

82 self.port = port 

83 self.baudrate = baudrate 

84 self.timeout = timeout 

85 self.buffersize = buffersize 

86 self.ser = None 

87 self.values = [[]]*len(channels) 

88 self.times = [] 

89 self.fixed_deltat = deltat 

90 self.deltat = None 

91 self.deltat_tolerance = deltat_tolerance 

92 self.tmin = None 

93 self.previous_deltats = Queue(nmax=lookback) 

94 self.previous_tmin_offsets = Queue(nmax=lookback) 

95 self.ncontinuous = 0 

96 self.disallow_uneven_sampling_rates = disallow_uneven_sampling_rates 

97 self.network = network 

98 self.station = station 

99 self.location = location 

100 self.channels = channels 

101 self.in_file = in_file # for testing 

102 self.listeners = [] 

103 self.quit_requested = False 

104 self.tune_to_quickones = tune_to_quickones 

105 

106 self.min_detection_size = 5 

107 

108 def add_listener(self, obj): 

109 self.listeners.append(weakref.ref(obj)) 

110 

111 def clear_listeners(self): 

112 self.listeners = [] 

113 

114 def acquisition_start(self): 

115 if self.ser is not None: 

116 self.stop() 

117 

118 logger.debug( 

119 'Starting serial hamster (port=%s, baudrate=%i, timeout=%f)' 

120 % (self.port, self.baudrate, self.timeout)) 

121 

122 if self.in_file is None: 

123 import serial 

124 try: 

125 self.ser = serial.Serial( 

126 port=self.port, 

127 baudrate=self.baudrate, 

128 timeout=self.timeout) 

129 

130 self.send_start() 

131 

132 except serial.serialutil.SerialException: 

133 raise SerialHamsterError( 

134 'Cannot open serial port %s' % self.port) 

135 else: 

136 self.ser = self.in_file 

137 

138 def send_start(self): 

139 pass 

140 

141 def acquisition_stop(self): 

142 if self.ser is not None: 

143 logger.debug('Stopping serial hamster') 

144 if self.in_file is None: 

145 self.ser.close() 

146 self.ser = None 

147 self._flush_buffer() 

148 

149 def acquisition_request_stop(self): 

150 pass 

151 

152 def process(self): 

153 if self.ser is None: 

154 return False 

155 

156 try: 

157 line = self.ser.readline() 

158 if line == '': 

159 raise SerialHamsterError( 

160 'Failed to read from serial port %s' % self.port) 

161 except Exception: 

162 raise SerialHamsterError( 

163 'Failed to read from serial port %s' % self.port) 

164 

165 t = time.time() 

166 

167 for tok in line.split(): 

168 try: 

169 val = float(tok) 

170 except Exception: 

171 logger.warning('Got something unexpected on serial line. ' + 

172 'Current line: "%s". ' % line + 

173 'Could not convert string to float: "%s"' % tok) 

174 continue 

175 

176 self.values[0].append(val) 

177 self.times.append(t) 

178 

179 if len(self.values[0]) >= self.buffersize: 

180 self._flush_buffer() 

181 

182 return True 

183 

184 def _regression(self, t): 

185 toff = t[0] 

186 t = t-toff 

187 i = num.arange(t.size, dtype=float) 

188 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t) 

189 if self.tune_to_quickones: 

190 for ii in range(2): 

191 t_fit = r_tmin+r_deltat*i 

192 quickones = num.where(t < t_fit) 

193 if quickones[0].size < 2: 

194 break 

195 i = i[quickones] 

196 t = t[quickones] 

197 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t) 

198 

199 return r_deltat, r_tmin+toff 

200 

201 def _flush_buffer(self): 

202 

203 if len(self.times) < self.min_detection_size: 

204 return 

205 

206 t = num.array(self.times, dtype=float) 

207 r_deltat, r_tmin = self._regression(t) 

208 

209 if self.disallow_uneven_sampling_rates: 

210 r_deltat = 1./round(1./r_deltat) 

211 

212 # check if deltat is consistent with expectations 

213 if self.deltat is not None and self.fixed_deltat is None: 

214 try: 

215 p_deltat = self.previous_deltats.median() 

216 if (((self.disallow_uneven_sampling_rates 

217 and abs(1./p_deltat - 1./self.deltat) > 0.5) 

218 or (not self.disallow_uneven_sampling_rates 

219 and abs((self.deltat - p_deltat)/self.deltat) 

220 > self.deltat_tolerance)) 

221 and len(self.previous_deltats) 

222 > 0.5*self.previous_deltats.capacity()): 

223 

224 self.deltat = None 

225 self.previous_deltats.empty() 

226 except QueueIsEmpty: 

227 pass 

228 

229 self.previous_deltats.push_back(r_deltat) 

230 

231 # detect sampling rate 

232 if self.deltat is None: 

233 if self.fixed_deltat is not None: 

234 self.deltat = self.fixed_deltat 

235 else: 

236 self.deltat = r_deltat 

237 # must also set new time origin if sampling rate changes 

238 self.tmin = None 

239 logger.info( 

240 'Setting new sampling rate to %g Hz ' 

241 '(sampling interval is %g s)' % ( 

242 1./self.deltat, self.deltat)) 

243 

244 # check if onset has drifted / jumped 

245 if self.deltat is not None and self.tmin is not None: 

246 continuous_tmin = self.tmin + self.ncontinuous*self.deltat 

247 

248 tmin_offset = r_tmin - continuous_tmin 

249 try: 

250 toffset = self.previous_tmin_offsets.median() 

251 if abs(toffset) > self.deltat*0.7 \ 

252 and len(self.previous_tmin_offsets) \ 

253 > 0.5*self.previous_tmin_offsets.capacity(): 

254 

255 soffset = int(round(toffset/self.deltat)) 

256 logger.info( 

257 'Detected drift/jump/gap of %g sample%s' % ( 

258 soffset, ['s', ''][abs(soffset) == 1])) 

259 

260 if soffset == 1: 

261 for values in self.values: 

262 values.append(values[-1]) 

263 self.previous_tmin_offsets.add(-self.deltat) 

264 logger.info( 

265 'Adding one sample to compensate time drift') 

266 elif soffset == -1: 

267 for values in self.values: 

268 values.pop(-1) 

269 self.previous_tmin_offsets.add(+self.deltat) 

270 logger.info( 

271 'Removing one sample to compensate time drift') 

272 else: 

273 self.tmin = None 

274 self.previous_tmin_offsets.empty() 

275 

276 except QueueIsEmpty: 

277 pass 

278 

279 self.previous_tmin_offsets.push_back(tmin_offset) 

280 

281 # detect onset time 

282 if self.tmin is None and self.deltat is not None: 

283 self.tmin = r_tmin 

284 self.ncontinuous = 0 

285 logger.info( 

286 'Setting new time origin to %s' % util.time_to_str(self.tmin)) 

287 

288 if self.tmin is not None and self.deltat is not None: 

289 for channel, values in zip(self.channels, self.values): 

290 v = num.array(values, dtype=int) 

291 

292 tr = trace.Trace( 

293 network=self.network, 

294 station=self.station, 

295 location=self.location, 

296 channel=channel, 

297 tmin=self.tmin + self.ncontinuous*self.deltat, 

298 deltat=self.deltat, 

299 ydata=v) 

300 

301 self.got_trace(tr) 

302 self.ncontinuous += v.size 

303 

304 self.values = [[]] * len(self.channels) 

305 self.times = [] 

306 

307 def got_trace(self, tr): 

308 logger.debug('Completed trace from serial hamster: %s' % tr) 

309 

310 # deliver payload to registered listeners 

311 for ref in self.listeners: 

312 obj = ref() 

313 if obj: 

314 obj.insert_trace(tr) 

315 

316 

317class CamSerialHamster(SerialHamster): 

318 

319 def __init__(self, baudrate=115200, channels=['N'], *args, **kwargs): 

320 SerialHamster.__init__( 

321 self, disallow_uneven_sampling_rates=False, deltat_tolerance=0.001, 

322 baudrate=baudrate, channels=channels, *args, **kwargs) 

323 

324 def send_start(self): 

325 try: 

326 ser = self.ser 

327 ser.write('99,e\n') 

328 a = ser.readline() 

329 if not a: 

330 raise SerialHamsterError( 

331 'Camera did not respond to command "99,e"') 

332 

333 logger.debug( 

334 'Sent command "99,e" to cam; received answer: "%s"' 

335 % a.strip()) 

336 

337 ser.write('2,e\n') 

338 a = ser.readline() 

339 if not a: 

340 raise SerialHamsterError( 

341 'Camera did not respond to command "2,e"') 

342 

343 logger.debug( 

344 'Sent command "2,e" to cam; received answer: "%s"' % a.strip()) 

345 ser.write('2,01\n') 

346 ser.write('2,f400\n') 

347 except Exception: 

348 raise SerialHamsterError( 

349 'Initialization of camera acquisition failed.') 

350 

351 def process(self): 

352 ser = self.ser 

353 

354 if ser is None: 

355 return False 

356 

357 ser.write('2,X\n') 

358 isamp = 0 

359 while True: 

360 data = ser.read(2) 

361 if len(data) != 2: 

362 raise SerialHamsterError( 

363 'Failed to read from serial line interface.') 

364 

365 uclow = ord(data[0]) 

366 uchigh = ord(data[1]) 

367 

368 if uclow == 0xff and uchigh == 0xff: 

369 break 

370 

371 v = uclow + (uchigh << 8) 

372 

373 self.times.append(time.time()) 

374 self.values[isamp % len(self.channels)].append(v) 

375 isamp += 1 

376 

377 if len(self.values[-1]) >= self.buffersize: 

378 self._flush_buffer() 

379 

380 return True 

381 

382 

383class USBHB628Hamster(SerialHamster): 

384 

385 def __init__(self, baudrate=115200, channels=[(0, 'Z')], *args, **kwargs): 

386 SerialHamster.__init__( 

387 self, 

388 baudrate=baudrate, 

389 channels=[x[1] for x in channels], 

390 tune_to_quickones=False, 

391 *args, **kwargs) 

392 

393 self.channel_map = dict([(c[0], j) for (j, c) in enumerate(channels)]) 

394 self.first_initiated = None 

395 self.ntaken = 0 

396 

397 def process(self): 

398 import serial 

399 

400 ser = self.ser 

401 

402 if ser is None: 

403 return False 

404 

405 t = time.time() 

406 

407 # determine next appropriate sampling instant 

408 if self.first_initiated is not None: 

409 ts = self.first_initiated + self.fixed_deltat * self.ntaken 

410 if t - ts > self.fixed_deltat*10: 

411 logger.warning( 

412 'lagging more than ten samples on serial line %s - ' 

413 'resetting' % self.port) 

414 

415 self.first_initiated = None 

416 

417 if not self.first_initiated: 

418 ts = math.ceil(t/self.fixed_deltat)*self.fixed_deltat 

419 self.first_initiated = ts 

420 self.ntaken = 0 

421 

422 # wait for next sampling instant 

423 while t < ts: 

424 time.sleep(max(0., ts-t)) 

425 t = time.time() 

426 

427 if t - ts > self.fixed_deltat: 

428 logger.warning( 

429 'lagging more than one sample on serial line %s' % self.port) 

430 

431 # get the sample 

432 ser.write('c09') 

433 ser.flush() 

434 try: 

435 data = [ord(x) for x in ser.read(17)] 

436 if len(data) != 17: 

437 raise SerialHamsterError('Reading from serial line failed.') 

438 

439 except serial.serialutil.SerialException: 

440 raise SerialHamsterError('Reading from serial line failed.') 

441 

442 self.ntaken += 1 

443 

444 for ichan in range(8): 

445 if ichan in self.channel_map: 

446 v = data[ichan*2] + (data[ichan*2+1] << 8) 

447 self.values[self.channel_map[ichan]].append(v) 

448 

449 self.times.append(t) 

450 

451 if len(self.times) >= self.buffersize: 

452 self._flush_buffer() 

453 

454 return True 

455 

456 

457class AcquisitionThread(threading.Thread): 

458 def __init__(self, post_process_sleep=0.0): 

459 threading.Thread.__init__(self) 

460 self.queue = queue.Queue() 

461 self.post_process_sleep = post_process_sleep 

462 self._sun_is_shining = True 

463 

464 def run(self): 

465 while True: 

466 try: 

467 self.acquisition_start() 

468 while self._sun_is_shining: 

469 t0 = time.time() 

470 self.process() 

471 t1 = time.time() 

472 if self.post_process_sleep != 0.0: 

473 time.sleep(max(0, self.post_process_sleep-(t1-t0))) 

474 

475 self.acquisition_stop() 

476 break 

477 

478 except SerialHamsterError as e: 

479 

480 logger.error(str(e)) 

481 logger.error('Acquistion terminated, restart in 5 s') 

482 self.acquisition_stop() 

483 time.sleep(5) 

484 if not self._sun_is_shining: 

485 break 

486 

487 def stop(self): 

488 self._sun_is_shining = False 

489 

490 logger.debug("Waiting for thread to terminate...") 

491 self.wait() 

492 logger.debug("Thread has terminated.") 

493 

494 def got_trace(self, tr): 

495 self.queue.put(tr) 

496 

497 def poll(self): 

498 items = [] 

499 try: 

500 while True: 

501 items.append(self.queue.get_nowait()) 

502 

503 except queue.Empty: 

504 pass 

505 

506 return items 

507 

508 

509class Acquisition( 

510 SerialHamster, AcquisitionThread): 

511 

512 def __init__(self, *args, **kwargs): 

513 SerialHamster.__init__(self, *args, **kwargs) 

514 AcquisitionThread.__init__(self, post_process_sleep=0.001) 

515 

516 def got_trace(self, tr): 

517 logger.debug('acquisition got trace rate %g Hz, duration %g s' % ( 

518 1.0/tr.deltat, tr.tmax - tr.tmin)) 

519 AcquisitionThread.got_trace(self, tr)