Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/streaming/slink.py: 27%

89 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-10-06 06:59 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Live stream reader for SeedLink streams (via `slinktool 

8<https://www.seiscomp.de/doc/apps/slinktool.html>`_). 

9''' 

10 

11import subprocess 

12import time 

13import os 

14import signal 

15import select 

16import logging 

17import tempfile 

18 

19from pyrocko.io import mseed 

20 

# Module-wide logger; all messages from this module go through it.
logger = logging.getLogger('pyrocko.streaming.slink')

# Number of bytes requested from slinktool's stdout on each call to
# SlowSlink.process().
# NOTE(review): presumably the Mini-SEED record size used by the server --
# confirm before changing.
RECORD_LENGTH = 512

23 

24 

def preexec():
    '''
    Make the calling process the leader of a new process group.

    Handed to :py:class:`subprocess.Popen` as ``preexec_fn`` when slinktool
    is launched for acquisition, so it runs in the child just before the
    program is executed.
    '''
    # NOTE(review): presumably this keeps terminal-generated signals (e.g.
    # Ctrl-C) aimed at the parent's group away from slinktool -- confirm.
    os.setpgrp()

27 

28 

class SlowSlinkError(Exception):
    '''
    Raised when the ``slinktool`` program cannot be started.
    '''

31 

32 

class SlowSlink(object):
    '''
    Read live SeedLink data by running ``slinktool`` as a subprocess.

    Typical use: select streams with :py:meth:`add_stream` or
    :py:meth:`add_raw_stream_selector`, call :py:meth:`acquisition_start`,
    then call :py:meth:`process` repeatedly. Each trace read is handed to
    :py:meth:`got_trace`, which subclasses are expected to override.
    Finish with :py:meth:`acquisition_stop`.

    :param host: host name of the SeedLink server
    :param port: port of the SeedLink server
    '''

    def __init__(self, host='geofon.gfz-potsdam.de', port=18000):
        self.host = host
        self.port = port
        self.running = False
        self.stream_selectors = []

    def query_streams(self):
        '''
        Ask the server which streams it offers (``slinktool -Q``).

        :returns: list of ``(network, station, location, channel)`` tuples;
            ``location`` is ``''`` for 9-column output lines
        :raises SlowSlinkError: if ``slinktool`` cannot be started
        '''
        cmd = ['slinktool', '-Q', self.host+':'+str(self.port)]
        logger.debug('Running %s', ' '.join(cmd))
        try:
            slink = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        except OSError as e:
            raise SlowSlinkError(
                'Could not start "slinktool": %s' % str(e)) from e

        out, _ = slink.communicate()
        streams = []
        for raw_line in out.splitlines():
            toks = raw_line.decode().split()
            if len(toks) == 9:
                # Nine columns: the location column is absent.
                net, sta, loc, cha = toks[0], toks[1], '', toks[2]
            elif len(toks) < 4:
                # Blank or truncated line: nothing to extract. Skipping
                # avoids an IndexError on the column lookups below.
                continue
            else:
                net, sta, loc, cha = toks[0], toks[1], toks[2], toks[3]

            streams.append((net, sta, loc, cha))

        return streams

    def add_stream(self, network, station, location, channel):
        '''
        Select a stream by its codes.

        The selector added is ``'<network>_<station>:<channel>.D'``. The
        ``location`` argument is accepted but not part of the selector.
        '''
        self.stream_selectors.append(
            '%s_%s:%s.D' % (network, station, channel))

    def add_raw_stream_selector(self, stream_selector):
        '''
        Add a ready-made selector string, passed on to ``slinktool -S``.
        '''
        self.stream_selectors.append(stream_selector)

    def acquisition_start(self):
        '''
        Launch ``slinktool`` and start streaming records to its stdout.

        Must not be called while an acquisition is already running.

        :raises SlowSlinkError: if ``slinktool`` cannot be started
        '''
        assert not self.running
        if self.stream_selectors:
            streams = ['-S', ','.join(self.stream_selectors)]
        else:
            streams = []

        cmd = ['slinktool', '-o', '-'] \
            + streams + [self.host+':'+str(self.port)]

        logger.debug('Starting %s', ' '.join(cmd))
        self.header = None
        self.vals = []
        try:
            self.slink = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                preexec_fn=preexec,
                close_fds=True)
        except OSError as e:
            raise SlowSlinkError(
                'Could not start "slinktool": %s' % str(e)) from e

        # Flag as running only once the subprocess exists; otherwise a
        # failed launch would leave the object unable to start or stop.
        self.running = True
        logger.debug('Started.')

    def acquisition_stop(self):
        '''
        Stop the acquisition; see :py:meth:`acquisition_request_stop`.
        '''
        self.acquisition_request_stop()

    def acquisition_request_stop(self):
        '''
        Terminate ``slinktool`` and wait until it has exited.

        Does nothing if no acquisition is running. Sends SIGTERM first and
        escalates to SIGKILL if the process is still alive after 200 polls
        at 10 ms intervals.
        '''
        if not self.running:
            return

        self.running = False  # intentionally before the kill

        # terminate()/kill() are no-ops if the child has already been
        # reaped, unlike a raw os.kill() on a possibly stale pid.
        self.slink.terminate()
        logger.debug('Waiting for slinktool to terminate...')
        it = 0
        # poll() returns None while the child is still running.
        while self.slink.poll() is None:
            time.sleep(0.01)
            if it == 200:
                logger.debug(
                    'Waiting for slinktool to terminate... trying harder...')
                self.slink.kill()

            it += 1

        logger.debug('Done, slinktool has terminated')

    def process(self):
        '''
        Read and handle one chunk of data from ``slinktool``, if available.

        Waits up to 0.2 s for data, reads ``RECORD_LENGTH`` bytes, loads
        them as Mini-SEED through a temporary file and passes every
        resulting trace to :py:meth:`got_trace`.

        :returns: ``True`` if a chunk was read and handled, ``False`` if no
            data was available, the stream hit end-of-file, or an error
            occurred (errors are logged at debug level, not raised)
        '''
        try:
            ready, _, _ = select.select([self.slink.stdout], [], [], .2)
            if not ready:
                return False

            record = self.slink.stdout.read(RECORD_LENGTH)
            if not record:
                # End-of-file on the pipe: there is no record to load.
                return False

            # The loader is given a file name, so spool the chunk to a
            # temporary file that lives only while the traces are read.
            with tempfile.NamedTemporaryFile(prefix='slink-stream') as f:
                f.write(record)
                f.flush()

                for tr in mseed.iload(f.name):
                    self.got_trace(tr)

            return True

        except Exception as e:
            # Deliberately best-effort: a bad chunk must not abort the
            # caller's polling loop.
            logger.debug(e)
            return False

    def got_trace(self, tr):
        '''
        Handle a newly received trace; this implementation only logs it.
        '''
        logger.info('Got trace from slinktool: %s' % tr)