Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/streaming/edl.py: 38%
307 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-25 15:33 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-25 15:33 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Live stream reader for Earth Data EDL data loggers.
8'''
10import struct
11import collections
12import time
13import logging
14import calendar
15import numpy as num
17from pyrocko import trace, util
19logger = logging.getLogger('pyrocko.streaming.edl')
22def hexdump(chars, sep=' ', width=16):
23 while chars:
24 line = chars[:width]
25 chars = chars[width:]
26 line = line.ljust(width, '\000')
27 print('%s%s%s' % (
28 sep.join('%02x' % ord(c) for c in line),
29 sep, quotechars(line)))
32def quotechars(chars):
33 return ''.join(['.', c][c.isalnum()] for c in chars)
36MINBLOCKSIZE = 192
39class NotAquiring(Exception):
40 pass
43class ReadError(Exception):
44 pass
47class ReadTimeout(ReadError):
48 pass
51class ReadUnexpected(ReadError):
52 pass
55class GPSError(Exception):
56 pass
59class NoGPS(GPSError):
60 pass
63class NoGPSTime(GPSError):
64 pass
67class GPSTimeNotUTC(GPSError):
68 pass
71block_def = {}
73block_def['MOD\0'] = '''
74 identifier 4s
75 size_of_mod I
76 device_id 12s
77 version 6s
78 space1 1s
79 serial_no 4s
80 space2 1s
81 test_pattern 8s
82 block_count I
83 ncomps H
84 sample_rate H
85 bytes_per_sample H
86 filter_type 2s
87 decimation3 H
88 decimation4 H
89 plldata h
90 gain_g 1s
91 gain B
92 gain_component1 I
93 gain_component2 I
94 gain_component3 I
95 offset_component1 I
96 offset_component2 I
97 offset_component3 I
98 supply_voltage H
99 supply_current H
100 temp_sensor_voltage H
101 supply_voltage_remote H
102 user_input1 H
103 user_input2 H
104 user_input3 H
105 not_used H
106 coupling 2s
107 reserved1 I
108 reserved2 I
109 reserved3 I
110 gps_status_flags H
111 gps_message_block_count I
112 gps_message 72s
113'''.split()
115block_def['MDE\0'] = '''
116 identifier 4s
117 size_of_mde I
118 serial_no 8s
119 decimation5 H
120 decimation6 H
121 decimation7 H
122 gain_component4 I
123 gain_component5 I
124 gain_component6 I
125 offset_component4 I
126 offset_component5 I
127 offset_component6 I
128 temperature1 H
129 temperature2 H
130 temperature3 H
131 temperature4 H
132 temperature5 H
133 temperature6 H
134 gps_message 129s
135 pad 1s
136'''.split()
138block_def['SUM\0'] = '''
139 identifier 4s
140 size_of_sum I
141 reserved H
142 checksum_lo B
143 checksum_hi B
144'''.split()
146block_def['DAT\0'] = '''
147 identifier 4s
148 size_of_dat I
149'''.split()
152def make_block_classes():
154 Blocks = {}
155 for k in block_def.keys():
156 fmt = '<'+''.join(block_def[k][1::2])
157 fmt_len = struct.calcsize(fmt)
158 Block = collections.namedtuple('Block'+k[:3], block_def[k][::2])
159 Blocks[k] = (fmt, fmt_len, Block)
161 return Blocks
164Blocks = make_block_classes()
167gps_fmt = {}
168# gps altitude message
169gps_fmt['AL'] = '''
170 type 2 str
171 time_of_day 5 float
172 altitude 6 float
173 vertical_velocity 4 float
174 source_flag 1 int
175 age_flag 1 int
176'''
178# gps position velocity message
179gps_fmt['PV'] = '''
180 type 2 str
181 time_of_day 5 float
182 latitude 8 lat_float
183 longitude 9 lon_float
184 speed_mph 3 float
185 heading 3 float
186 source_flag 1 int
187 age_flag 1 int
188'''
190# gps status message
191gps_fmt['ST'] = '''
192 type 2 str
193 tracking_status_code 2 hex_int
194 nibble1 1 hex_int
195 nibble2 1 hex_int
196 machine_id 2 hex_int
197 nibble3 1 hex_int
198 nibble4 1 hex_int
199 reserved 2 str
200'''
202# gps time date message
203gps_fmt['TM'] = '''
204 type 2 str
205 hours 2 int
206 minutes 2 int
207 seconds 5 seconds_float
208 day 2 int
209 month 2 int
210 year 4 int
211 gps_utc_time_offset 2 int
212 current_fix_source 1 int
213 number_usable_svs 2 int
214 gps_utc_offset_flag 1 int
215 reserved 5 str
216'''
219def latlon_float(s):
220 return int(s) / 100000.
223def seconds_float(s):
224 return int(s) / 1000.
227def hex_int(s):
228 return int(s, 16)
231convert_functions = {
232 'int': int,
233 'float': float,
234 'lat_float': latlon_float,
235 'lon_float': latlon_float,
236 'seconds_float': seconds_float,
237 'str': str,
238 'hex_int': hex_int}
241class GPSFormat_(object):
242 def __init__(self, name, fmt):
244 nn = 0
245 names = []
246 items = []
247 for line in fmt.strip().splitlines():
248 toks = line.split()
249 n = int(toks[1])
250 items.append((nn, nn+n, convert_functions[toks[2]]))
251 names.append(toks[0])
252 nn += n
254 self.items = items
255 self.Message = collections.namedtuple('GPSMessage'+k, names)
257 def unpack(self, s):
258 return self.Message(
259 *(converter(s[begin:end])
260 for (begin, end, converter) in self.items))
263GPSFormat = {}
264for k in gps_fmt.keys():
265 GPSFormat[k] = GPSFormat_(k, gps_fmt[k])
268def portnames():
269 try:
270 # needs serial >= v2.6
271 from serial.tools.list_ports import comports
272 names = sorted(x[0] for x in comports())
274 except Exception:
275 # may only work on some linuxes
276 from glob import glob
277 names = sorted(glob('/dev/ttyS*') + glob('/dev/ttyUSB*'))
279 return names
282def unpack_block(data):
283 block_type = data[:4]
285 if block_type not in Blocks:
286 return None
288 fmt, fmt_len, Block = Blocks[block_type]
290 if len(data) < fmt_len:
291 raise ReadError('block size too short')
293 return Block(*struct.unpack(fmt, data[:fmt_len])), data[fmt_len:]
296def unpack_values(ncomps, bytes_per_sample, data):
297 if bytes_per_sample == 4:
298 return num.frombuffer(data, dtype=num.dtype('<i4'))
300 # 3-byte mode is broken:
301 # elif bytes_per_sample == 3:
302 # b1 = num.frombuffer(data, dtype=num.dtype('<i1'))
303 # b4 = num.zeros(len(data)/4, dtype=num.dtype('<i4'))
304 # b4.view(dtype='<i2')[::2] = b1.view(dtype='<i2')
305 # b4.view(dtype='<i1')[2::4] = b1[i::3]
306 # return b4.astype(num.int32)
308 else:
309 raise ReadError('unimplemented bytes_per_sample setting')
312class TimeEstimator(object):
313 def __init__(self, nlookback):
314 self._nlookback = nlookback
315 self._queue = []
316 self._t0 = None
317 self._n = 0
318 self._deltat = None
320 def insert(self, deltat, nadd, t):
322 if self._deltat is None or self._deltat != deltat:
323 self.reset()
324 self._deltat = deltat
326 if self._t0 is None and t is not None:
327 self._t0 = int(round(t/self._deltat))*self._deltat
329 if t is None:
330 self._n += nadd
331 if self._t0:
332 return self._t0 + (self._n-nadd)*self._deltat
333 else:
334 return None
336 self._queue.append((self._n, t))
337 self._n += nadd
339 while len(self._queue) > self._nlookback:
340 self._queue.pop(0)
342 ns, ts = num.array(self._queue, dtype=float).T
344 tpredicts = self._t0 + ns * self._deltat
346 terrors = ts - tpredicts
347 mterror = num.median(terrors)
348 print(mterror / deltat, '+-', num.std(terrors) / deltat)
350 if num.abs(mterror) > 0.75*deltat and \
351 len(self._queue) == self._nlookback:
353 t0 = self._t0 + mterror
354 self._queue[:] = []
355 self._t0 = int(round(t0/self._deltat))*self._deltat
357 return self._t0 + (self._n-nadd)*self._deltat
359 def reset(self):
360 self._queue[:] = []
361 self._n = 0
362 self._t0 = None
364 def __len__(self):
365 return len(self._queue)
368class GPSRecord(object):
369 def __init__(self, al, pv, st, tm):
370 self._al = al
371 self._pv = pv
372 self._st = st
373 self._tm = tm
375 @property
376 def time(self):
377 if not self._st.tracking_status_code == 0:
378 raise NoGPSTime()
380 if not self._tm.gps_utc_offset_flag:
381 raise GPSTimeNotUTC()
383 tm = self._tm
384 return util.to_time_float(calendar.timegm((
385 tm.year, tm.month, tm.day, tm.hours, tm.minutes, tm.seconds)))
387 @property
388 def latitude(self):
389 return self._pv.latitude
391 @property
392 def longitude(self):
393 return self._pv.longitude
395 @property
396 def altitude(self):
397 return self._al.altitude
399 def __str__(self):
400 try:
401 stime = util.time_to_str(self.time)
402 except GPSError:
403 stime = '?'
404 return '''%s %s %s %s''' % (
405 stime, self.latitude, self.longitude, self.altitude)
408def stime_none_aware(t):
409 if t is None:
410 return '?'
411 else:
412 return util.time_to_str(t)
415class Record(object):
416 def __init__(self, mod, mde, dat, sum, values):
417 self._mod = mod
418 self._mde = mde
419 self._dat = dat
420 self._sum = sum
421 self._values = values
422 self._approx_system_time = None
423 self._approx_gps_time = None
424 self._gps = None
426 def set_approx_times(
427 self, approx_system_time, approx_gps_time, measured_system_time):
429 self._approx_system_time = approx_system_time
430 self._approx_gps_time = approx_gps_time
431 self._measured_system_time = measured_system_time
433 @property
434 def time(self):
435 if self._mod.reserved1 != 0:
436 return float(self._mod.reserved1)
438 return self._approx_system_time
440 @property
441 def traces(self):
442 traces = []
443 for i in range(self._mod.ncomps):
444 tr = trace.Trace(
445 '', 'ed', '', 'p%i' % i,
446 deltat=float(self._mod.ncomps)/self._mod.sample_rate,
447 tmin=self.time,
448 ydata=self._values[i::3])
450 traces.append(tr)
452 traces.extend(self.traces_delays())
454 return traces
456 def traces_delays(self):
457 traces = []
458 for name, val in (
459 ('gp', self.gps_time_or_none),
460 ('sm', self._measured_system_time),
461 ('sp', self._approx_system_time)):
463 if val is not None:
464 tr = trace.Trace(
465 '', 'ed', name, 'del',
466 deltat=1.0,
467 tmin=self.time,
468 ydata=num.array([val - self.time]))
470 traces.append(tr)
472 return traces
474 def _gps_messages(self):
475 for line in self._mde.gps_message.splitlines():
476 if len(line) > 4 and line[0] == '>' and line.rstrip()[-1] == '<':
477 yield GPSFormat[line[2:4]].unpack(line[2:])
479 @property
480 def gps(self):
481 if self._mod.block_count != self._mod.gps_message_block_count:
482 raise NoGPS()
484 if self._gps is not None:
485 return self._gps
487 kwargs = {}
488 for mess in self._gps_messages():
489 kwargs[mess.type.lower()] = mess
491 if sorted(kwargs.keys()) == ['al', 'pv', 'st', 'tm']:
492 self._gps = GPSRecord(**kwargs)
493 return self._gps
494 else:
495 raise NoGPS()
497 @property
498 def gps_time_or_none(self):
499 try:
500 return self.gps.time
501 except GPSError:
502 return None
504 def __str__(self):
505 return '\n'.join([
506 '%s' % str(x) for x in (self._mod, self._mde)]) + '\n'
508 def str_times(self):
509 return '''--- Record ---
510Time GPS: %s (estimated) %s (measured)
511Time system: %s (estimated) %s (measured)
512''' % tuple([stime_none_aware(t) for t in (
513 self._approx_gps_time,
514 self.gps_time_or_none,
515 self._approx_system_time,
516 self._measured_system_time)])
519class Reader(object):
521 def __init__(self, port=0, timeout=2., baudrate=115200, lookback=30):
522 if isinstance(port, int):
523 self._port = portnames()[port]
524 else:
525 self._port = str(port)
527 self._timeout = float(timeout)
528 self._baudrate = int(baudrate)
529 self._serial = None
530 self._buffer = ''
531 self._irecord = 0
533 self._time_estimator_system = TimeEstimator(lookback)
534 self._time_estimator_gps = TimeEstimator(lookback)
536 def running(self):
537 return self._serial is not None
539 def assert_running(self):
540 if not self.running():
541 raise NotAquiring()
543 def start(self):
544 self.stop()
546 import serial
547 self._serial = serial.Serial(
548 port=self._port,
549 baudrate=self._baudrate,
550 timeout=self._timeout)
552 self._sync_on_mod()
554 def _sync_on_mod(self):
555 self._fill_buffer(MINBLOCKSIZE)
557 while self._buffer[:4] != 'MOD\0':
558 imark = self._buffer.find('MOD\0')
559 if imark != -1:
560 self._buffer = self._buffer[imark:]
561 else:
562 self._buffer = self._buffer[-4:]
564 self._fill_buffer(MINBLOCKSIZE)
566 def _fill_buffer(self, minlen):
567 if len(self._buffer) >= minlen:
568 return
570 nread = minlen-len(self._buffer)
571 try:
572 data = self._serial.read(nread)
573 hexdump(data)
575 except Exception:
576 raise ReadError()
578 if len(data) != nread:
579 self.stop()
580 raise ReadTimeout()
581 self._buffer += data
583 def _read_block(self, need_block_type=None):
584 self.assert_running()
585 self._fill_buffer(8)
586 block_type, block_len = struct.unpack('<4sI', self._buffer[:8])
587 if need_block_type is not None and block_type != need_block_type:
588 raise ReadUnexpected()
590 block_len += 8
591 self._fill_buffer(block_len)
592 block_data = self._buffer
593 self._buffer = ''
594 return unpack_block(block_data)
596 def read_record(self):
597 self._irecord += 1
598 mod, _ = self._read_block('MOD\0')
599 measured_system_time = time.time() - 4.0
600 mde, _ = self._read_block('MDE\0')
601 dat, values_data = self._read_block('DAT\0')
602 sum, _ = self._read_block('SUM\0')
603 values = unpack_values(mod.ncomps, mod.bytes_per_sample, values_data)
604 deltat = 1./mod.sample_rate * mod.ncomps
605 r = Record(mod, mde, dat, sum, values)
606 approx_system_time = self._time_estimator_system.insert(
607 deltat, values.size//mod.ncomps, measured_system_time)
609 try:
610 gpstime = r.gps.time
611 except GPSError:
612 gpstime = None
614 approx_gps_time = self._time_estimator_gps.insert(
615 deltat, values.size//mod.ncomps, gpstime)
617 r.set_approx_times(
618 approx_system_time, approx_gps_time, measured_system_time)
620 return r
622 def stop(self):
623 if not self.running():
624 return
626 self._serial.close()
627 self._serial = None
628 self._buffer = ''
629 self._time_estimator_system.reset()
630 self._time_estimator_gps.reset()
633class EDLHamster(object):
634 def __init__(self, *args, **kwargs):
635 self.reader = Reader(*args, **kwargs)
637 def acquisition_start(self):
638 self.reader.start()
640 def acquisition_stop(self):
641 self.reader.stop()
643 def process(self):
644 rec = self.reader.read_record()
645 self.got_record(rec)
647 def got_record(self, rec):
648 for tr in rec.traces:
649 self.got_trace(tr)
651 def got_trace(self, tr):
652 logger.info('Got trace from EDL: %s' % tr)