1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

# https://pyrocko.org - GPLv3 

# 

# The Pyrocko Developers, 21st Century 

# ---|P------/S----------~Lg---------- 

 

from __future__ import absolute_import, print_function, division 

 

import subprocess 

import time 

import os 

import signal 

import select 

import logging 

import tempfile 

 

from pyrocko.io import mseed 

 

logger = logging.getLogger('pyrocko.streaming.slink') 

RECORD_LENGTH = 512 

 

 

def preexec(): 

os.setpgrp() 

 

 

class SlowSlinkError(Exception): 

pass 

 

 

class SlowSlink(object): 

def __init__(self, host='geofon.gfz-potsdam.de', port=18000): 

self.host = host 

self.port = port 

self.running = False 

self.stream_selectors = [] 

 

def query_streams(self): 

cmd = ['slinktool', '-Q', self.host+':'+str(self.port)] 

logger.debug('Running %s' % ' '.join(cmd)) 

try: 

slink = subprocess.Popen(cmd, stdout=subprocess.PIPE) 

except OSError as e: 

raise SlowSlinkError('Could not start "slinktool": %s' % str(e)) 

 

(a, b) = slink.communicate() 

streams = [] 

for line in a.splitlines(): 

line = line.decode() 

toks = line.split() 

if len(toks) == 9: 

net, sta, loc, cha = toks[0], toks[1], '', toks[2] 

else: 

net, sta, loc, cha = toks[0], toks[1], toks[2], toks[3] 

streams.append((net, sta, loc, cha)) 

return streams 

 

def add_stream(self, network, station, location, channel): 

self.stream_selectors.append( 

'%s_%s:%s.D' % (network, station, channel)) 

 

def add_raw_stream_selector(self, stream_selector): 

self.stream_selectors.append(stream_selector) 

 

def acquisition_start(self): 

assert not self.running 

if self.stream_selectors: 

streams = ['-S', ','.join(self.stream_selectors)] 

else: 

streams = [] 

 

cmd = ['slinktool', '-o', '-'] \ 

+ streams + [self.host+':'+str(self.port)] 

 

logger.debug('Starting %s' % ' '.join(cmd)) 

self.running = True 

self.header = None 

self.vals = [] 

try: 

self.slink = subprocess.Popen( 

cmd, 

stdout=subprocess.PIPE, 

preexec_fn=preexec, 

close_fds=True) 

except OSError as e: 

raise SlowSlinkError('Could not start "slinktool": %s' % str(e)) 

 

logger.debug('Started.') 

 

def acquisition_stop(self): 

self.acquisition_request_stop() 

 

def acquisition_request_stop(self): 

if not self.running: 

return 

 

self.running = False # intentionally before the kill 

 

os.kill(self.slink.pid, signal.SIGTERM) 

logger.debug("Waiting for slinktool to terminate...") 

it = 0 

while self.slink.poll() == -1: 

time.sleep(0.01) 

if it == 200: 

logger.debug( 

"Waiting for slinktool to terminate... trying harder...") 

os.kill(self.slink.pid, signal.SIGKILL) 

 

it += 1 

 

logger.debug("Done, slinktool has terminated") 

 

def process(self): 

try: 

ready, _, _ = select.select([self.slink.stdout], [], [], .2) 

if not ready: 

return False 

 

line = self.slink.stdout.read(RECORD_LENGTH) 

 

with tempfile.NamedTemporaryFile(prefix='slink-stream') as f: 

f.write(line) 

f.flush() 

 

traces = mseed.iload(f.name) 

for tr in traces: 

self.got_trace(tr) 

 

return True 

 

except Exception as e: 

logger.debug(e) 

return False 

 

def got_trace(self, tr): 

logger.info('Got trace from slinktool: %s' % tr)