1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6import subprocess 

7import time 

8import os 

9import signal 

10import select 

11import logging 

12import tempfile 

13 

14from pyrocko.io import mseed 

15 

16logger = logging.getLogger('pyrocko.streaming.slink') 

17RECORD_LENGTH = 512 

18 

19 

def preexec():
    """
    Put the calling process into a process group of its own.

    Passed as ``preexec_fn`` to :py:class:`subprocess.Popen`, so it is
    executed in the child process just before the external program is
    started.
    """
    os.setpgrp()

22 

23 

class SlowSlinkError(Exception):
    """Raised when the external ``slinktool`` program cannot be started."""

26 

27 

class SlowSlink(object):
    """
    Receive waveform data by running the external ``slinktool`` program.

    ``slinktool`` is started as a child process and its standard output is
    read in chunks of ``RECORD_LENGTH`` bytes. Every chunk is written to a
    temporary file and decoded with ``mseed.iload``; each resulting trace is
    passed to :py:meth:`got_trace`, whose default implementation only logs
    the trace.

    :param host: host name of the server handed to ``slinktool``
    :param port: port of the server handed to ``slinktool``
    """

    def __init__(self, host='geofon.gfz-potsdam.de', port=18000):
        self.host = host
        self.port = port
        self.running = False
        self.stream_selectors = []

    def query_streams(self):
        """
        Ask the server which streams are available (``slinktool -Q``).

        Output lines which do not have enough columns to hold a stream
        description (e.g. blank lines) are skipped.

        :returns: list of ``(network, station, location, channel)`` tuples
        :raises SlowSlinkError: if ``slinktool`` could not be started
        """
        cmd = ['slinktool', '-Q', self.host+':'+str(self.port)]
        logger.debug('Running %s', ' '.join(cmd))
        try:
            slink = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        except OSError as e:
            raise SlowSlinkError('Could not start "slinktool": %s' % str(e))

        output, _ = slink.communicate()
        streams = []
        for line in output.splitlines():
            toks = line.decode().split()
            if len(toks) == 9:
                # Nine columns: the line has no location column, so the
                # channel is found in the third position.
                net, sta, loc, cha = toks[0], toks[1], '', toks[2]
            elif len(toks) >= 4:
                net, sta, loc, cha = toks[:4]
            else:
                # Too short to describe a stream; indexing into it would
                # raise IndexError.
                continue

            streams.append((net, sta, loc, cha))

        return streams

    def add_stream(self, network, station, location, channel):
        """
        Select a stream for the next acquisition.

        Appends the selector ``'<network>_<station>:<channel>.D'``. The
        ``location`` argument is accepted but currently not used when
        building the selector.
        """
        self.stream_selectors.append(
            '%s_%s:%s.D' % (network, station, channel))

    def add_raw_stream_selector(self, stream_selector):
        """
        Append an already formatted selector string, passed on unmodified.
        """
        self.stream_selectors.append(stream_selector)

    def acquisition_start(self):
        """
        Start ``slinktool`` with its output directed to a pipe.

        All selectors added so far are joined with commas and passed via
        ``-S``; without selectors, no ``-S`` option is given.

        :raises SlowSlinkError: if ``slinktool`` could not be started; the
            object is then left in the not-running state
        """
        assert not self.running
        if self.stream_selectors:
            streams = ['-S', ','.join(self.stream_selectors)]
        else:
            streams = []

        cmd = ['slinktool', '-o', '-'] \
            + streams + [self.host+':'+str(self.port)]

        logger.debug('Starting %s', ' '.join(cmd))
        self.header = None
        self.vals = []
        try:
            self.slink = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                preexec_fn=preexec,
                close_fds=True)
        except OSError as e:
            raise SlowSlinkError('Could not start "slinktool": %s' % str(e))

        # Flag as running only after the child really exists. Otherwise a
        # failed start would leave the object "running" without a process
        # to stop.
        self.running = True
        logger.debug('Started.')

    def acquisition_stop(self):
        """
        Stop the acquisition; see :py:meth:`acquisition_request_stop`.
        """
        self.acquisition_request_stop()

    def acquisition_request_stop(self):
        """
        Terminate ``slinktool`` and wait until it has exited.

        Does nothing if no acquisition is running. The child is sent
        ``SIGTERM`` first; if it is still alive after about 200 polls spaced
        10 ms apart, ``SIGKILL`` is sent as well.
        """
        if not self.running:
            return

        self.running = False    # intentionally before the kill

        os.kill(self.slink.pid, signal.SIGTERM)
        logger.debug("Waiting for slinktool to terminate...")
        it = 0
        # Popen.poll() returns None for as long as the child is alive and
        # its return code afterwards.
        while self.slink.poll() is None:
            time.sleep(0.01)
            if it == 200:
                logger.debug(
                    "Waiting for slinktool to terminate... trying harder...")
                os.kill(self.slink.pid, signal.SIGKILL)

            it += 1

        logger.debug("Done, slinktool has terminated")

    def process(self):
        """
        Read and dispatch at most one record from ``slinktool``.

        Waits up to 0.2 s for data on the pipe. A chunk of ``RECORD_LENGTH``
        bytes is read, stored in a temporary file and decoded with
        ``mseed.iload``; every decoded trace is passed to
        :py:meth:`got_trace`.

        :returns: ``True`` if a record was read and dispatched, ``False`` if
            no data arrived in time, the pipe reached end-of-stream, or an
            error occurred (errors are logged at debug level)
        """
        try:
            ready, _, _ = select.select([self.slink.stdout], [], [], .2)
            if not ready:
                return False

            record = self.slink.stdout.read(RECORD_LENGTH)
            if not record:
                # End-of-stream: the pipe stays readable forever, so report
                # "nothing processed" instead of claiming success.
                return False

            with tempfile.NamedTemporaryFile(prefix='slink-stream') as f:
                f.write(record)
                f.flush()

                # Traces are consumed inside the ``with`` block, while the
                # temporary file still exists.
                for tr in mseed.iload(f.name):
                    self.got_trace(tr)

            return True

        except Exception as e:
            logger.debug(e)
            return False

    def got_trace(self, tr):
        """
        Handle a trace decoded by :py:meth:`process`.

        The default implementation only logs the trace at info level.
        """
        logger.info('Got trace from slinktool: %s', tr)