Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/streaming/edl.py: 38%

307 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-03-07 11:54 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Live stream reader for Earth Data EDL data loggers. 

8''' 

9 

10import struct 

11import collections 

12import time 

13import logging 

14import calendar 

15import numpy as num 

16 

17from pyrocko import trace, util 

18 

19logger = logging.getLogger('pyrocko.streaming.edl') 

20 

21 

22def hexdump(chars, sep=' ', width=16): 

23 while chars: 

24 line = chars[:width] 

25 chars = chars[width:] 

26 line = line.ljust(width, '\000') 

27 print('%s%s%s' % ( 

28 sep.join('%02x' % ord(c) for c in line), 

29 sep, quotechars(line))) 

30 

31 

32def quotechars(chars): 

33 return ''.join(['.', c][c.isalnum()] for c in chars) 

34 

35 

36MINBLOCKSIZE = 192 

37 

38 

39class NotAquiring(Exception): 

40 pass 

41 

42 

43class ReadError(Exception): 

44 pass 

45 

46 

47class ReadTimeout(ReadError): 

48 pass 

49 

50 

51class ReadUnexpected(ReadError): 

52 pass 

53 

54 

55class GPSError(Exception): 

56 pass 

57 

58 

59class NoGPS(GPSError): 

60 pass 

61 

62 

63class NoGPSTime(GPSError): 

64 pass 

65 

66 

67class GPSTimeNotUTC(GPSError): 

68 pass 

69 

70 

71block_def = {} 

72 

73block_def['MOD\0'] = ''' 

74 identifier 4s 

75 size_of_mod I 

76 device_id 12s 

77 version 6s 

78 space1 1s 

79 serial_no 4s 

80 space2 1s 

81 test_pattern 8s 

82 block_count I 

83 ncomps H 

84 sample_rate H 

85 bytes_per_sample H 

86 filter_type 2s 

87 decimation3 H 

88 decimation4 H 

89 plldata h 

90 gain_g 1s 

91 gain B 

92 gain_component1 I 

93 gain_component2 I 

94 gain_component3 I 

95 offset_component1 I 

96 offset_component2 I 

97 offset_component3 I 

98 supply_voltage H 

99 supply_current H 

100 temp_sensor_voltage H 

101 supply_voltage_remote H 

102 user_input1 H 

103 user_input2 H 

104 user_input3 H 

105 not_used H 

106 coupling 2s 

107 reserved1 I 

108 reserved2 I 

109 reserved3 I 

110 gps_status_flags H 

111 gps_message_block_count I 

112 gps_message 72s 

113'''.split() 

114 

115block_def['MDE\0'] = ''' 

116 identifier 4s 

117 size_of_mde I 

118 serial_no 8s 

119 decimation5 H 

120 decimation6 H 

121 decimation7 H 

122 gain_component4 I 

123 gain_component5 I 

124 gain_component6 I 

125 offset_component4 I 

126 offset_component5 I 

127 offset_component6 I 

128 temperature1 H 

129 temperature2 H 

130 temperature3 H 

131 temperature4 H 

132 temperature5 H 

133 temperature6 H 

134 gps_message 129s 

135 pad 1s 

136'''.split() 

137 

138block_def['SUM\0'] = ''' 

139 identifier 4s 

140 size_of_sum I 

141 reserved H 

142 checksum_lo B 

143 checksum_hi B 

144'''.split() 

145 

146block_def['DAT\0'] = ''' 

147 identifier 4s 

148 size_of_dat I 

149'''.split() 

150 

151 

152def make_block_classes(): 

153 

154 Blocks = {} 

155 for k in block_def.keys(): 

156 fmt = '<'+''.join(block_def[k][1::2]) 

157 fmt_len = struct.calcsize(fmt) 

158 Block = collections.namedtuple('Block'+k[:3], block_def[k][::2]) 

159 Blocks[k] = (fmt, fmt_len, Block) 

160 

161 return Blocks 

162 

163 

164Blocks = make_block_classes() 

165 

166 

167gps_fmt = {} 

168# gps altitude message 

169gps_fmt['AL'] = ''' 

170 type 2 str 

171 time_of_day 5 float 

172 altitude 6 float 

173 vertical_velocity 4 float 

174 source_flag 1 int 

175 age_flag 1 int 

176''' 

177 

178# gps position velocity message 

179gps_fmt['PV'] = ''' 

180 type 2 str 

181 time_of_day 5 float 

182 latitude 8 lat_float 

183 longitude 9 lon_float 

184 speed_mph 3 float 

185 heading 3 float 

186 source_flag 1 int 

187 age_flag 1 int 

188''' 

189 

190# gps status message 

191gps_fmt['ST'] = ''' 

192 type 2 str 

193 tracking_status_code 2 hex_int 

194 nibble1 1 hex_int 

195 nibble2 1 hex_int 

196 machine_id 2 hex_int 

197 nibble3 1 hex_int 

198 nibble4 1 hex_int 

199 reserved 2 str 

200''' 

201 

202# gps time date message 

203gps_fmt['TM'] = ''' 

204 type 2 str 

205 hours 2 int 

206 minutes 2 int 

207 seconds 5 seconds_float 

208 day 2 int 

209 month 2 int 

210 year 4 int 

211 gps_utc_time_offset 2 int 

212 current_fix_source 1 int 

213 number_usable_svs 2 int 

214 gps_utc_offset_flag 1 int 

215 reserved 5 str 

216''' 

217 

218 

219def latlon_float(s): 

220 return int(s) / 100000. 

221 

222 

223def seconds_float(s): 

224 return int(s) / 1000. 

225 

226 

227def hex_int(s): 

228 return int(s, 16) 

229 

230 

231convert_functions = { 

232 'int': int, 

233 'float': float, 

234 'lat_float': latlon_float, 

235 'lon_float': latlon_float, 

236 'seconds_float': seconds_float, 

237 'str': str, 

238 'hex_int': hex_int} 

239 

240 

241class GPSFormat_(object): 

242 def __init__(self, name, fmt): 

243 

244 nn = 0 

245 names = [] 

246 items = [] 

247 for line in fmt.strip().splitlines(): 

248 toks = line.split() 

249 n = int(toks[1]) 

250 items.append((nn, nn+n, convert_functions[toks[2]])) 

251 names.append(toks[0]) 

252 nn += n 

253 

254 self.items = items 

255 self.Message = collections.namedtuple('GPSMessage'+k, names) 

256 

257 def unpack(self, s): 

258 return self.Message( 

259 *(converter(s[begin:end]) 

260 for (begin, end, converter) in self.items)) 

261 

262 

263GPSFormat = {} 

264for k in gps_fmt.keys(): 

265 GPSFormat[k] = GPSFormat_(k, gps_fmt[k]) 

266 

267 

268def portnames(): 

269 try: 

270 # needs serial >= v2.6 

271 from serial.tools.list_ports import comports 

272 names = sorted(x[0] for x in comports()) 

273 

274 except Exception: 

275 # may only work on some linuxes 

276 from glob import glob 

277 names = sorted(glob('/dev/ttyS*') + glob('/dev/ttyUSB*')) 

278 

279 return names 

280 

281 

282def unpack_block(data): 

283 block_type = data[:4] 

284 

285 if block_type not in Blocks: 

286 return None 

287 

288 fmt, fmt_len, Block = Blocks[block_type] 

289 

290 if len(data) < fmt_len: 

291 raise ReadError('block size too short') 

292 

293 return Block(*struct.unpack(fmt, data[:fmt_len])), data[fmt_len:] 

294 

295 

296def unpack_values(ncomps, bytes_per_sample, data): 

297 if bytes_per_sample == 4: 

298 return num.frombuffer(data, dtype=num.dtype('<i4')) 

299 

300 # 3-byte mode is broken: 

301 # elif bytes_per_sample == 3: 

302 # b1 = num.frombuffer(data, dtype=num.dtype('<i1')) 

303 # b4 = num.zeros(len(data)/4, dtype=num.dtype('<i4')) 

304 # b4.view(dtype='<i2')[::2] = b1.view(dtype='<i2') 

305 # b4.view(dtype='<i1')[2::4] = b1[i::3] 

306 # return b4.astype(num.int32) 

307 

308 else: 

309 raise ReadError('unimplemented bytes_per_sample setting') 

310 

311 

312class TimeEstimator(object): 

313 def __init__(self, nlookback): 

314 self._nlookback = nlookback 

315 self._queue = [] 

316 self._t0 = None 

317 self._n = 0 

318 self._deltat = None 

319 

320 def insert(self, deltat, nadd, t): 

321 

322 if self._deltat is None or self._deltat != deltat: 

323 self.reset() 

324 self._deltat = deltat 

325 

326 if self._t0 is None and t is not None: 

327 self._t0 = int(round(t/self._deltat))*self._deltat 

328 

329 if t is None: 

330 self._n += nadd 

331 if self._t0: 

332 return self._t0 + (self._n-nadd)*self._deltat 

333 else: 

334 return None 

335 

336 self._queue.append((self._n, t)) 

337 self._n += nadd 

338 

339 while len(self._queue) > self._nlookback: 

340 self._queue.pop(0) 

341 

342 ns, ts = num.array(self._queue, dtype=float).T 

343 

344 tpredicts = self._t0 + ns * self._deltat 

345 

346 terrors = ts - tpredicts 

347 mterror = num.median(terrors) 

348 print(mterror / deltat, '+-', num.std(terrors) / deltat) 

349 

350 if num.abs(mterror) > 0.75*deltat and \ 

351 len(self._queue) == self._nlookback: 

352 

353 t0 = self._t0 + mterror 

354 self._queue[:] = [] 

355 self._t0 = int(round(t0/self._deltat))*self._deltat 

356 

357 return self._t0 + (self._n-nadd)*self._deltat 

358 

359 def reset(self): 

360 self._queue[:] = [] 

361 self._n = 0 

362 self._t0 = None 

363 

364 def __len__(self): 

365 return len(self._queue) 

366 

367 

368class GPSRecord(object): 

369 def __init__(self, al, pv, st, tm): 

370 self._al = al 

371 self._pv = pv 

372 self._st = st 

373 self._tm = tm 

374 

375 @property 

376 def time(self): 

377 if not self._st.tracking_status_code == 0: 

378 raise NoGPSTime() 

379 

380 if not self._tm.gps_utc_offset_flag: 

381 raise GPSTimeNotUTC() 

382 

383 tm = self._tm 

384 return util.to_time_float(calendar.timegm(( 

385 tm.year, tm.month, tm.day, tm.hours, tm.minutes, tm.seconds))) 

386 

387 @property 

388 def latitude(self): 

389 return self._pv.latitude 

390 

391 @property 

392 def longitude(self): 

393 return self._pv.longitude 

394 

395 @property 

396 def altitude(self): 

397 return self._al.altitude 

398 

399 def __str__(self): 

400 try: 

401 stime = util.time_to_str(self.time) 

402 except GPSError: 

403 stime = '?' 

404 return '''%s %s %s %s''' % ( 

405 stime, self.latitude, self.longitude, self.altitude) 

406 

407 

408def stime_none_aware(t): 

409 if t is None: 

410 return '?' 

411 else: 

412 return util.time_to_str(t) 

413 

414 

415class Record(object): 

416 def __init__(self, mod, mde, dat, sum, values): 

417 self._mod = mod 

418 self._mde = mde 

419 self._dat = dat 

420 self._sum = sum 

421 self._values = values 

422 self._approx_system_time = None 

423 self._approx_gps_time = None 

424 self._gps = None 

425 

426 def set_approx_times( 

427 self, approx_system_time, approx_gps_time, measured_system_time): 

428 

429 self._approx_system_time = approx_system_time 

430 self._approx_gps_time = approx_gps_time 

431 self._measured_system_time = measured_system_time 

432 

433 @property 

434 def time(self): 

435 if self._mod.reserved1 != 0: 

436 return float(self._mod.reserved1) 

437 

438 return self._approx_system_time 

439 

440 @property 

441 def traces(self): 

442 traces = [] 

443 for i in range(self._mod.ncomps): 

444 tr = trace.Trace( 

445 '', 'ed', '', 'p%i' % i, 

446 deltat=float(self._mod.ncomps)/self._mod.sample_rate, 

447 tmin=self.time, 

448 ydata=self._values[i::3]) 

449 

450 traces.append(tr) 

451 

452 traces.extend(self.traces_delays()) 

453 

454 return traces 

455 

456 def traces_delays(self): 

457 traces = [] 

458 for name, val in ( 

459 ('gp', self.gps_time_or_none), 

460 ('sm', self._measured_system_time), 

461 ('sp', self._approx_system_time)): 

462 

463 if val is not None: 

464 tr = trace.Trace( 

465 '', 'ed', name, 'del', 

466 deltat=1.0, 

467 tmin=self.time, 

468 ydata=num.array([val - self.time])) 

469 

470 traces.append(tr) 

471 

472 return traces 

473 

474 def _gps_messages(self): 

475 for line in self._mde.gps_message.splitlines(): 

476 if len(line) > 4 and line[0] == '>' and line.rstrip()[-1] == '<': 

477 yield GPSFormat[line[2:4]].unpack(line[2:]) 

478 

479 @property 

480 def gps(self): 

481 if self._mod.block_count != self._mod.gps_message_block_count: 

482 raise NoGPS() 

483 

484 if self._gps is not None: 

485 return self._gps 

486 

487 kwargs = {} 

488 for mess in self._gps_messages(): 

489 kwargs[mess.type.lower()] = mess 

490 

491 if sorted(kwargs.keys()) == ['al', 'pv', 'st', 'tm']: 

492 self._gps = GPSRecord(**kwargs) 

493 return self._gps 

494 else: 

495 raise NoGPS() 

496 

497 @property 

498 def gps_time_or_none(self): 

499 try: 

500 return self.gps.time 

501 except GPSError: 

502 return None 

503 

504 def __str__(self): 

505 return '\n'.join([ 

506 '%s' % str(x) for x in (self._mod, self._mde)]) + '\n' 

507 

508 def str_times(self): 

509 return '''--- Record --- 

510Time GPS: %s (estimated) %s (measured) 

511Time system: %s (estimated) %s (measured) 

512''' % tuple([stime_none_aware(t) for t in ( 

513 self._approx_gps_time, 

514 self.gps_time_or_none, 

515 self._approx_system_time, 

516 self._measured_system_time)]) 

517 

518 

519class Reader(object): 

520 

521 def __init__(self, port=0, timeout=2., baudrate=115200, lookback=30): 

522 if isinstance(port, int): 

523 self._port = portnames()[port] 

524 else: 

525 self._port = str(port) 

526 

527 self._timeout = float(timeout) 

528 self._baudrate = int(baudrate) 

529 self._serial = None 

530 self._buffer = '' 

531 self._irecord = 0 

532 

533 self._time_estimator_system = TimeEstimator(lookback) 

534 self._time_estimator_gps = TimeEstimator(lookback) 

535 

536 def running(self): 

537 return self._serial is not None 

538 

539 def assert_running(self): 

540 if not self.running(): 

541 raise NotAquiring() 

542 

543 def start(self): 

544 self.stop() 

545 

546 import serial 

547 self._serial = serial.Serial( 

548 port=self._port, 

549 baudrate=self._baudrate, 

550 timeout=self._timeout) 

551 

552 self._sync_on_mod() 

553 

554 def _sync_on_mod(self): 

555 self._fill_buffer(MINBLOCKSIZE) 

556 

557 while self._buffer[:4] != 'MOD\0': 

558 imark = self._buffer.find('MOD\0') 

559 if imark != -1: 

560 self._buffer = self._buffer[imark:] 

561 else: 

562 self._buffer = self._buffer[-4:] 

563 

564 self._fill_buffer(MINBLOCKSIZE) 

565 

566 def _fill_buffer(self, minlen): 

567 if len(self._buffer) >= minlen: 

568 return 

569 

570 nread = minlen-len(self._buffer) 

571 try: 

572 data = self._serial.read(nread) 

573 hexdump(data) 

574 

575 except Exception: 

576 raise ReadError() 

577 

578 if len(data) != nread: 

579 self.stop() 

580 raise ReadTimeout() 

581 self._buffer += data 

582 

583 def _read_block(self, need_block_type=None): 

584 self.assert_running() 

585 self._fill_buffer(8) 

586 block_type, block_len = struct.unpack('<4sI', self._buffer[:8]) 

587 if need_block_type is not None and block_type != need_block_type: 

588 raise ReadUnexpected() 

589 

590 block_len += 8 

591 self._fill_buffer(block_len) 

592 block_data = self._buffer 

593 self._buffer = '' 

594 return unpack_block(block_data) 

595 

596 def read_record(self): 

597 self._irecord += 1 

598 mod, _ = self._read_block('MOD\0') 

599 measured_system_time = time.time() - 4.0 

600 mde, _ = self._read_block('MDE\0') 

601 dat, values_data = self._read_block('DAT\0') 

602 sum, _ = self._read_block('SUM\0') 

603 values = unpack_values(mod.ncomps, mod.bytes_per_sample, values_data) 

604 deltat = 1./mod.sample_rate * mod.ncomps 

605 r = Record(mod, mde, dat, sum, values) 

606 approx_system_time = self._time_estimator_system.insert( 

607 deltat, values.size//mod.ncomps, measured_system_time) 

608 

609 try: 

610 gpstime = r.gps.time 

611 except GPSError: 

612 gpstime = None 

613 

614 approx_gps_time = self._time_estimator_gps.insert( 

615 deltat, values.size//mod.ncomps, gpstime) 

616 

617 r.set_approx_times( 

618 approx_system_time, approx_gps_time, measured_system_time) 

619 

620 return r 

621 

622 def stop(self): 

623 if not self.running(): 

624 return 

625 

626 self._serial.close() 

627 self._serial = None 

628 self._buffer = '' 

629 self._time_estimator_system.reset() 

630 self._time_estimator_gps.reset() 

631 

632 

633class EDLHamster(object): 

634 def __init__(self, *args, **kwargs): 

635 self.reader = Reader(*args, **kwargs) 

636 

637 def acquisition_start(self): 

638 self.reader.start() 

639 

640 def acquisition_stop(self): 

641 self.reader.stop() 

642 

643 def process(self): 

644 rec = self.reader.read_record() 

645 self.got_record(rec) 

646 

647 def got_record(self, rec): 

648 for tr in rec.traces: 

649 self.got_trace(tr) 

650 

651 def got_trace(self, tr): 

652 logger.info('Got trace from EDL: %s' % tr)