Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/streaming/serial_hamster.py: 16%

318 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-03-07 11:54 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Live stream reader for a few USB/serial digitizers. 

8''' 

9 

10import time 

11import logging 

12import math 

13import numpy as num 

14from scipy import stats 

15import threading 

16try: 

17 import queue 

18except ImportError: 

19 import Queue as queue 

20 

21from pyrocko import trace, util 

22 

23logger = logging.getLogger('pyrocko.streaming.serial_hamster') 

24 

25 

26class QueueIsEmpty(Exception): 

27 pass 

28 

29 

30class Queue(object): 

31 def __init__(self, nmax): 

32 self.nmax = nmax 

33 self.queue = [] 

34 

35 def push_back(self, val): 

36 self.queue.append(val) 

37 while len(self.queue) > self.nmax: 

38 self.queue.pop(0) 

39 

40 def mean(self): 

41 if not self.queue: 

42 raise QueueIsEmpty() 

43 return sum(self.queue)/float(len(self.queue)) 

44 

45 def median(self): 

46 if not self.queue: 

47 raise QueueIsEmpty() 

48 n = len(self.queue) 

49 s = sorted(self.queue) 

50 if n % 2 != 0: 

51 return s[n//2] 

52 else: 

53 return (s[n//2-1]+s[n//2])/2.0 

54 

55 def add(self, w): 

56 self.queue = [v+w for v in self.queue] 

57 

58 def empty(self): 

59 self.queue[:] = [] 

60 

61 def __len__(self): 

62 return len(self.queue) 

63 

64 def capacity(self): 

65 return self.nmax 

66 

67 def __str__(self): 

68 return ' '.join('%g' % v for v in self.queue) 

69 

70 

71class SerialHamsterError(Exception): 

72 pass 

73 

74 

75class SerialHamster(object): 

76 

77 def __init__( 

78 self, port=0, baudrate=9600, timeout=5, buffersize=128, 

79 start_string=None, 

80 network='', station='TEST', location='', channels=['Z'], 

81 disallow_uneven_sampling_rates=True, 

82 deltat=None, 

83 deltat_tolerance=0.01, 

84 in_file=None, 

85 lookback=5, 

86 min_detection_size=5, 

87 tune_to_quickones=True): 

88 

89 self.port = port 

90 self.baudrate = baudrate 

91 self.timeout = timeout 

92 self.buffersize = buffersize 

93 self.ser = None 

94 self.values = [[]]*len(channels) 

95 self.times = [] 

96 self.fixed_deltat = deltat 

97 self.deltat = None 

98 self.deltat_tolerance = deltat_tolerance 

99 self.tmin = None 

100 self.previous_deltats = Queue(nmax=lookback) 

101 self.previous_tmin_offsets = Queue(nmax=lookback) 

102 self.ncontinuous = 0 

103 self.disallow_uneven_sampling_rates = disallow_uneven_sampling_rates 

104 self.network = network 

105 self.station = station 

106 self.location = location 

107 self.channels = channels 

108 self.in_file = in_file # for testing 

109 self.listeners = [] 

110 self.quit_requested = False 

111 self.tune_to_quickones = tune_to_quickones 

112 self.start_string = start_string 

113 

114 self.min_detection_size = min_detection_size 

115 

116 def add_listener(self, obj): 

117 self.listeners.append(util.smart_weakref(obj)) 

118 

119 def clear_listeners(self): 

120 self.listeners = [] 

121 

122 def acquisition_start(self): 

123 if self.ser is not None: 

124 self.stop() 

125 

126 logger.debug( 

127 'Starting serial hamster (port=%s, baudrate=%i, timeout=%f)' 

128 % (self.port, self.baudrate, self.timeout)) 

129 

130 if self.in_file is None: 

131 import serial 

132 try: 

133 self.ser = serial.Serial( 

134 port=self.port, 

135 baudrate=self.baudrate, 

136 timeout=self.timeout) 

137 

138 self.send_start() 

139 

140 except serial.serialutil.SerialException: 

141 raise SerialHamsterError( 

142 'Cannot open serial port %s' % self.port) 

143 else: 

144 self.ser = self.in_file 

145 

146 def send_start(self): 

147 ser = self.ser 

148 if self.start_string is not None: 

149 ser.write(self.start_string.encode('ascii')) 

150 

151 def acquisition_stop(self): 

152 if self.ser is not None: 

153 logger.debug('Stopping serial hamster') 

154 if self.in_file is None: 

155 self.ser.close() 

156 self.ser = None 

157 self._flush_buffer() 

158 

159 def acquisition_request_stop(self): 

160 pass 

161 

162 def process(self): 

163 if self.ser is None: 

164 return False 

165 

166 try: 

167 line = self.ser.readline() 

168 if line == '': 

169 raise SerialHamsterError( 

170 'Failed to read from serial port %s' % self.port) 

171 except Exception: 

172 raise SerialHamsterError( 

173 'Failed to read from serial port %s' % self.port) 

174 

175 t = time.time() 

176 

177 for tok in line.split(): 

178 try: 

179 val = float(tok) 

180 except Exception: 

181 logger.warning('Got something unexpected on serial line. ' + 

182 'Current line: "%s". ' % line + 

183 'Could not convert string to float: "%s"' % tok) 

184 continue 

185 

186 self.values[0].append(val) 

187 self.times.append(t) 

188 

189 if len(self.values[0]) >= self.buffersize: 

190 self._flush_buffer() 

191 

192 return True 

193 

194 def _regression(self, t): 

195 toff = t[0] 

196 t = t-toff 

197 i = num.arange(t.size, dtype=float) 

198 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t) 

199 if self.tune_to_quickones: 

200 for ii in range(2): 

201 t_fit = r_tmin+r_deltat*i 

202 quickones = num.where(t < t_fit) 

203 if quickones[0].size < 2: 

204 break 

205 i = i[quickones] 

206 t = t[quickones] 

207 r_deltat, r_tmin, r, tt, stderr = stats.linregress(i, t) 

208 

209 return r_deltat, r_tmin+toff 

210 

211 def _flush_buffer(self): 

212 

213 if len(self.times) < self.min_detection_size: 

214 return 

215 

216 t = num.array(self.times, dtype=float) 

217 r_deltat, r_tmin = self._regression(t) 

218 

219 if self.disallow_uneven_sampling_rates: 

220 r_deltat = 1./round(1./r_deltat) 

221 

222 # check if deltat is consistent with expectations 

223 if self.deltat is not None and self.fixed_deltat is None: 

224 try: 

225 p_deltat = self.previous_deltats.median() 

226 if (((self.disallow_uneven_sampling_rates 

227 and abs(1./p_deltat - 1./self.deltat) > 0.5) 

228 or (not self.disallow_uneven_sampling_rates 

229 and abs((self.deltat - p_deltat)/self.deltat) 

230 > self.deltat_tolerance)) 

231 and len(self.previous_deltats) 

232 > 0.5*self.previous_deltats.capacity()): 

233 

234 self.deltat = None 

235 self.previous_deltats.empty() 

236 except QueueIsEmpty: 

237 pass 

238 

239 self.previous_deltats.push_back(r_deltat) 

240 

241 # detect sampling rate 

242 if self.deltat is None: 

243 if self.fixed_deltat is not None: 

244 self.deltat = self.fixed_deltat 

245 else: 

246 self.deltat = r_deltat 

247 # must also set new time origin if sampling rate changes 

248 self.tmin = None 

249 logger.info( 

250 'Setting new sampling rate to %g Hz ' 

251 '(sampling interval is %g s)' % ( 

252 1./self.deltat, self.deltat)) 

253 

254 # check if onset has drifted / jumped 

255 if self.deltat is not None and self.tmin is not None: 

256 continuous_tmin = self.tmin + self.ncontinuous*self.deltat 

257 

258 tmin_offset = r_tmin - continuous_tmin 

259 try: 

260 toffset = self.previous_tmin_offsets.median() 

261 if abs(toffset) > self.deltat*0.7 \ 

262 and len(self.previous_tmin_offsets) \ 

263 > 0.5*self.previous_tmin_offsets.capacity(): 

264 

265 soffset = int(round(toffset/self.deltat)) 

266 logger.info( 

267 'Detected drift/jump/gap of %g sample%s' % ( 

268 soffset, ['s', ''][abs(soffset) == 1])) 

269 

270 if soffset == 1: 

271 for values in self.values: 

272 values.append(values[-1]) 

273 self.previous_tmin_offsets.add(-self.deltat) 

274 logger.info( 

275 'Adding one sample to compensate time drift') 

276 elif soffset == -1: 

277 for values in self.values: 

278 values.pop(-1) 

279 self.previous_tmin_offsets.add(+self.deltat) 

280 logger.info( 

281 'Removing one sample to compensate time drift') 

282 else: 

283 self.tmin = None 

284 self.previous_tmin_offsets.empty() 

285 

286 except QueueIsEmpty: 

287 pass 

288 

289 self.previous_tmin_offsets.push_back(tmin_offset) 

290 

291 # detect onset time 

292 if self.tmin is None and self.deltat is not None: 

293 self.tmin = r_tmin 

294 self.ncontinuous = 0 

295 logger.info( 

296 'Setting new time origin to %s' % util.time_to_str(self.tmin)) 

297 

298 if self.tmin is not None and self.deltat is not None: 

299 for channel, values in zip(self.channels, self.values): 

300 v = num.array(values, dtype=num.int32) 

301 

302 tr = trace.Trace( 

303 network=self.network, 

304 station=self.station, 

305 location=self.location, 

306 channel=channel, 

307 tmin=self.tmin + self.ncontinuous*self.deltat, 

308 deltat=self.deltat, 

309 ydata=v) 

310 

311 self.got_trace(tr) 

312 self.ncontinuous += v.size 

313 

314 self.values = [[]] * len(self.channels) 

315 self.times = [] 

316 

317 def got_trace(self, tr): 

318 logger.debug('Completed trace from serial hamster: %s' % tr) 

319 

320 # deliver payload to registered listeners 

321 for ref in self.listeners: 

322 obj = ref() 

323 if obj: 

324 obj.insert_trace(tr) 

325 

326 

327class CamSerialHamster(SerialHamster): 

328 

329 def __init__(self, baudrate=115200, channels=['N'], *args, **kwargs): 

330 SerialHamster.__init__( 

331 self, disallow_uneven_sampling_rates=False, deltat_tolerance=0.001, 

332 baudrate=baudrate, channels=channels, *args, **kwargs) 

333 

334 def send_start(self): 

335 try: 

336 ser = self.ser 

337 ser.write('99,e\n') 

338 a = ser.readline() 

339 if not a: 

340 raise SerialHamsterError( 

341 'Camera did not respond to command "99,e"') 

342 

343 logger.debug( 

344 'Sent command "99,e" to cam; received answer: "%s"' 

345 % a.strip()) 

346 

347 ser.write('2,e\n') 

348 a = ser.readline() 

349 if not a: 

350 raise SerialHamsterError( 

351 'Camera did not respond to command "2,e"') 

352 

353 logger.debug( 

354 'Sent command "2,e" to cam; received answer: "%s"' % a.strip()) 

355 ser.write('2,01\n') 

356 ser.write('2,f400\n') 

357 except Exception: 

358 raise SerialHamsterError( 

359 'Initialization of camera acquisition failed.') 

360 

361 def process(self): 

362 ser = self.ser 

363 

364 if ser is None: 

365 return False 

366 

367 ser.write('2,X\n') 

368 isamp = 0 

369 while True: 

370 data = ser.read(2) 

371 if len(data) != 2: 

372 raise SerialHamsterError( 

373 'Failed to read from serial line interface.') 

374 

375 uclow = ord(data[0]) 

376 uchigh = ord(data[1]) 

377 

378 if uclow == 0xff and uchigh == 0xff: 

379 break 

380 

381 v = uclow + (uchigh << 8) 

382 

383 self.times.append(time.time()) 

384 self.values[isamp % len(self.channels)].append(v) 

385 isamp += 1 

386 

387 if len(self.values[-1]) >= self.buffersize: 

388 self._flush_buffer() 

389 

390 return True 

391 

392 

393class USBHB628Hamster(SerialHamster): 

394 

395 def __init__(self, baudrate=115200, channels=[(0, 'Z')], *args, **kwargs): 

396 SerialHamster.__init__( 

397 self, 

398 baudrate=baudrate, 

399 channels=[x[1] for x in channels], 

400 tune_to_quickones=False, 

401 *args, **kwargs) 

402 

403 self.channel_map = dict([(c[0], j) for (j, c) in enumerate(channels)]) 

404 self.first_initiated = None 

405 self.ntaken = 0 

406 

407 def process(self): 

408 import serial 

409 

410 ser = self.ser 

411 

412 if ser is None: 

413 return False 

414 

415 t = time.time() 

416 

417 # determine next appropriate sampling instant 

418 if self.first_initiated is not None: 

419 ts = self.first_initiated + self.fixed_deltat * self.ntaken 

420 if t - ts > self.fixed_deltat*10: 

421 logger.warning( 

422 'lagging more than ten samples on serial line %s - ' 

423 'resetting' % self.port) 

424 

425 self.first_initiated = None 

426 

427 if not self.first_initiated: 

428 ts = math.ceil(t/self.fixed_deltat)*self.fixed_deltat 

429 self.first_initiated = ts 

430 self.ntaken = 0 

431 

432 # wait for next sampling instant 

433 while t < ts: 

434 time.sleep(max(0., ts-t)) 

435 t = time.time() 

436 

437 if t - ts > self.fixed_deltat: 

438 logger.warning( 

439 'lagging more than one sample on serial line %s' % self.port) 

440 

441 # get the sample 

442 ser.write('c09') 

443 ser.flush() 

444 try: 

445 data = [ord(x) for x in ser.read(17)] 

446 if len(data) != 17: 

447 raise SerialHamsterError('Reading from serial line failed.') 

448 

449 except serial.serialutil.SerialException: 

450 raise SerialHamsterError('Reading from serial line failed.') 

451 

452 self.ntaken += 1 

453 

454 for ichan in range(8): 

455 if ichan in self.channel_map: 

456 v = data[ichan*2] + (data[ichan*2+1] << 8) 

457 self.values[self.channel_map[ichan]].append(v) 

458 

459 self.times.append(t) 

460 

461 if len(self.times) >= self.buffersize: 

462 self._flush_buffer() 

463 

464 return True 

465 

466 

467class AcquisitionThread(threading.Thread): 

468 def __init__(self, post_process_sleep=0.0): 

469 threading.Thread.__init__(self) 

470 self.queue = queue.Queue() 

471 self.post_process_sleep = post_process_sleep 

472 self._sun_is_shining = True 

473 

474 def run(self): 

475 while True: 

476 try: 

477 self.acquisition_start() 

478 while self._sun_is_shining: 

479 t0 = time.time() 

480 self.process() 

481 t1 = time.time() 

482 if self.post_process_sleep != 0.0: 

483 time.sleep(max(0, self.post_process_sleep-(t1-t0))) 

484 

485 self.acquisition_stop() 

486 break 

487 

488 except SerialHamsterError as e: 

489 

490 logger.error(str(e)) 

491 logger.error('Acquistion terminated, restart in 5 s') 

492 self.acquisition_stop() 

493 time.sleep(5) 

494 if not self._sun_is_shining: 

495 break 

496 

497 def stop(self): 

498 self._sun_is_shining = False 

499 

500 logger.debug('Waiting for thread to terminate...') 

501 self.wait() 

502 logger.debug('Thread has terminated.') 

503 

504 def got_trace(self, tr): 

505 self.queue.put(tr) 

506 

507 def poll(self): 

508 items = [] 

509 try: 

510 while True: 

511 items.append(self.queue.get_nowait()) 

512 

513 except queue.Empty: 

514 pass 

515 

516 return items 

517 

518 

519class Acquisition( 

520 SerialHamster, AcquisitionThread): 

521 

522 def __init__(self, *args, **kwargs): 

523 SerialHamster.__init__(self, *args, **kwargs) 

524 AcquisitionThread.__init__(self, post_process_sleep=0.001) 

525 

526 def got_trace(self, tr): 

527 logger.debug('acquisition got trace rate %g Hz, duration %g s' % ( 

528 1.0/tr.deltat, tr.tmax - tr.tmin)) 

529 AcquisitionThread.got_trace(self, tr)