1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
5from __future__ import absolute_import
7import subprocess
8import time
9import os
10import signal
11import select
12import logging
13import tempfile
15from pyrocko.io import mseed
17logger = logging.getLogger('pyrocko.streaming.slink')
18RECORD_LENGTH = 512
def preexec():
    """Move the calling process into a process group of its own.

    Handed to ``subprocess.Popen`` as ``preexec_fn`` when the acquisition
    child is launched, so it runs in the child just before ``slinktool`` is
    executed.  Presumably this keeps signals aimed at the parent's process
    group (e.g. Ctrl-C in a terminal) from reaching the child directly.
    """
    os.setpgrp()
class SlowSlinkError(Exception):
    """Raised when the external ``slinktool`` program cannot be started."""
class SlowSlink(object):
    """Acquire waveform data by driving the external ``slinktool`` program.

    The object launches ``slinktool`` as a child process, reads fixed-size
    records of ``RECORD_LENGTH`` bytes from its standard output, decodes
    them with ``mseed.iload`` and hands every resulting trace to
    :py:meth:`got_trace`.  Subclasses are expected to override
    :py:meth:`got_trace` to do something useful with the data.

    :param host: name of the server passed to ``slinktool``
    :param port: port of the server passed to ``slinktool``
    """

    def __init__(self, host='geofon.gfz-potsdam.de', port=18000):
        self.host = host
        self.port = port
        self.running = False
        self.stream_selectors = []

    def query_streams(self):
        """Ask the server which streams it offers.

        Runs ``slinktool -Q host:port`` and parses its standard output.

        :returns: list of ``(network, station, location, channel)`` tuples;
            ``location`` is ``''`` for lines that carry no location token
        :raises SlowSlinkError: if ``slinktool`` cannot be started
        """
        cmd = ['slinktool', '-Q', self.host+':'+str(self.port)]
        logger.debug('Running %s', ' '.join(cmd))
        try:
            slink = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        except OSError as e:
            raise SlowSlinkError('Could not start "slinktool": %s' % str(e))

        out, _ = slink.communicate()
        streams = []
        for raw_line in out.splitlines():
            toks = raw_line.decode().split()
            if len(toks) < 4:
                # Blank or truncated line: nothing to extract from it.
                continue

            if len(toks) == 9:
                # Nine tokens: the location column is absent.
                net, sta, loc, cha = toks[0], toks[1], '', toks[2]
            else:
                net, sta, loc, cha = toks[0], toks[1], toks[2], toks[3]

            streams.append((net, sta, loc, cha))

        return streams

    def add_stream(self, network, station, location, channel):
        """Select a stream for acquisition.

        Appends a selector of the form ``NET_STA:CHA.D``.  The ``location``
        argument is accepted but does not enter the selector.
        """
        self.stream_selectors.append(
            '%s_%s:%s.D' % (network, station, channel))

    def add_raw_stream_selector(self, stream_selector):
        """Append an already formatted stream selector string unchanged."""
        self.stream_selectors.append(stream_selector)

    def acquisition_start(self):
        """Launch ``slinktool`` writing the selected streams to its stdout.

        Without any stream selectors the ``-S`` option is omitted.

        :raises SlowSlinkError: if ``slinktool`` cannot be started; the
            object is then left in the not-running state
        """
        assert not self.running
        if self.stream_selectors:
            streams = ['-S', ','.join(self.stream_selectors)]
        else:
            streams = []

        cmd = ['slinktool', '-o', '-'] \
            + streams + [self.host+':'+str(self.port)]

        logger.debug('Starting %s', ' '.join(cmd))
        self.header = None
        self.vals = []
        try:
            self.slink = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                preexec_fn=preexec,
                close_fds=True)
        except OSError as e:
            raise SlowSlinkError('Could not start "slinktool": %s' % str(e))

        # Only flag as running once the child really exists; otherwise a
        # failed start would leave the object claiming to run with no (or a
        # stale) process handle behind it.
        self.running = True
        logger.debug('Started.')

    def acquisition_stop(self):
        """Stop the acquisition; see :py:meth:`acquisition_request_stop`."""
        self.acquisition_request_stop()

    def acquisition_request_stop(self):
        """Terminate ``slinktool`` and wait until it has exited.

        Does nothing when no acquisition is running.  The child first gets
        SIGTERM; if it is still alive after 200 polls spaced 10 ms apart it
        is sent SIGKILL.
        """
        if not self.running:
            return

        self.running = False    # intentionally before the kill

        self.slink.terminate()
        logger.debug("Waiting for slinktool to terminate...")
        it = 0
        # Popen.poll() returns None for as long as the child is alive.
        while self.slink.poll() is None:
            time.sleep(0.01)
            if it == 200:
                logger.debug(
                    "Waiting for slinktool to terminate... trying harder...")
                self.slink.kill()

            it += 1

        logger.debug("Done, slinktool has terminated")

    def process(self):
        """Read and handle at most one record from ``slinktool``.

        Waits up to 0.2 s for output, reads ``RECORD_LENGTH`` bytes, loads
        them through a temporary file with ``mseed.iload`` and passes each
        trace to :py:meth:`got_trace`.

        :returns: ``True`` if a record was read and processed, ``False`` if
            no data was available, the stream has ended, or an error
            occurred (errors are logged at debug level, not raised)
        """
        try:
            ready, _, _ = select.select([self.slink.stdout], [], [], .2)
            if not ready:
                return False

            # NOTE(review): stdout may be a buffered reader while select()
            # only looks at the underlying descriptor - confirm that
            # already buffered records cannot be held back by this.
            record = self.slink.stdout.read(RECORD_LENGTH)
            if not record:
                # End of stream (child has closed its stdout): there is
                # nothing to process, so do not report progress.
                return False

            with tempfile.NamedTemporaryFile(prefix='slink-stream') as f:
                f.write(record)
                f.flush()

                # Iterate while the temporary file still exists.
                for tr in mseed.iload(f.name):
                    self.got_trace(tr)

            return True

        except Exception as e:
            # Deliberately best-effort: a bad record must not stop the
            # acquisition loop of the caller.
            logger.debug(e)
            return False

    def got_trace(self, tr):
        """Handle a received trace; the default only logs it."""
        logger.info('Got trace from slinktool: %s' % tr)