1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5from __future__ import absolute_import 

6 

7import subprocess 

8import time 

9import os 

10import signal 

11import select 

12import logging 

13import tempfile 

14 

15from pyrocko.io import mseed 

16 

17logger = logging.getLogger('pyrocko.streaming.slink') 

18RECORD_LENGTH = 512 

19 

20 

21def preexec(): 

22 os.setpgrp() 

23 

24 

25class SlowSlinkError(Exception): 

26 pass 

27 

28 

29class SlowSlink(object): 

30 def __init__(self, host='geofon.gfz-potsdam.de', port=18000): 

31 self.host = host 

32 self.port = port 

33 self.running = False 

34 self.stream_selectors = [] 

35 

36 def query_streams(self): 

37 cmd = ['slinktool', '-Q', self.host+':'+str(self.port)] 

38 logger.debug('Running %s' % ' '.join(cmd)) 

39 try: 

40 slink = subprocess.Popen(cmd, stdout=subprocess.PIPE) 

41 except OSError as e: 

42 raise SlowSlinkError('Could not start "slinktool": %s' % str(e)) 

43 

44 (a, b) = slink.communicate() 

45 streams = [] 

46 for line in a.splitlines(): 

47 line = line.decode() 

48 toks = line.split() 

49 if len(toks) == 9: 

50 net, sta, loc, cha = toks[0], toks[1], '', toks[2] 

51 else: 

52 net, sta, loc, cha = toks[0], toks[1], toks[2], toks[3] 

53 streams.append((net, sta, loc, cha)) 

54 return streams 

55 

56 def add_stream(self, network, station, location, channel): 

57 self.stream_selectors.append( 

58 '%s_%s:%s.D' % (network, station, channel)) 

59 

60 def add_raw_stream_selector(self, stream_selector): 

61 self.stream_selectors.append(stream_selector) 

62 

63 def acquisition_start(self): 

64 assert not self.running 

65 if self.stream_selectors: 

66 streams = ['-S', ','.join(self.stream_selectors)] 

67 else: 

68 streams = [] 

69 

70 cmd = ['slinktool', '-o', '-'] \ 

71 + streams + [self.host+':'+str(self.port)] 

72 

73 logger.debug('Starting %s' % ' '.join(cmd)) 

74 self.running = True 

75 self.header = None 

76 self.vals = [] 

77 try: 

78 self.slink = subprocess.Popen( 

79 cmd, 

80 stdout=subprocess.PIPE, 

81 preexec_fn=preexec, 

82 close_fds=True) 

83 except OSError as e: 

84 raise SlowSlinkError('Could not start "slinktool": %s' % str(e)) 

85 

86 logger.debug('Started.') 

87 

88 def acquisition_stop(self): 

89 self.acquisition_request_stop() 

90 

91 def acquisition_request_stop(self): 

92 if not self.running: 

93 return 

94 

95 self.running = False # intentionally before the kill 

96 

97 os.kill(self.slink.pid, signal.SIGTERM) 

98 logger.debug("Waiting for slinktool to terminate...") 

99 it = 0 

100 while self.slink.poll() == -1: 

101 time.sleep(0.01) 

102 if it == 200: 

103 logger.debug( 

104 "Waiting for slinktool to terminate... trying harder...") 

105 os.kill(self.slink.pid, signal.SIGKILL) 

106 

107 it += 1 

108 

109 logger.debug("Done, slinktool has terminated") 

110 

111 def process(self): 

112 try: 

113 ready, _, _ = select.select([self.slink.stdout], [], [], .2) 

114 if not ready: 

115 return False 

116 

117 line = self.slink.stdout.read(RECORD_LENGTH) 

118 

119 with tempfile.NamedTemporaryFile(prefix='slink-stream') as f: 

120 f.write(line) 

121 f.flush() 

122 

123 traces = mseed.iload(f.name) 

124 for tr in traces: 

125 self.got_trace(tr) 

126 

127 return True 

128 

129 except Exception as e: 

130 logger.debug(e) 

131 return False 

132 

133 def got_trace(self, tr): 

134 logger.info('Got trace from slinktool: %s' % tr)