1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import subprocess
7import time
8import os
9import signal
10import select
11import logging
12import tempfile
14from pyrocko.io import mseed
# Module-level logger used for all diagnostics emitted by this module.
logger = logging.getLogger('pyrocko.streaming.slink')

# Number of bytes SlowSlink.process() reads from slinktool's standard output
# per call. NOTE(review): presumably the fixed miniSEED record size delivered
# by the SeedLink server -- confirm before changing.
RECORD_LENGTH = 512
def preexec():
    '''
    Make the calling process the leader of a new process group.

    Passed as ``preexec_fn`` to :py:class:`subprocess.Popen`, so it runs in
    the child process just before the external program is executed.
    '''
    # setpgid(0, 0) is the explicit spelling of setpgrp(): the calling process
    # is moved into a process group whose id equals its own pid.
    os.setpgid(0, 0)
class SlowSlinkError(Exception):
    '''
    Raised when the external ``slinktool`` program cannot be started.
    '''
class SlowSlink(object):
    '''
    Receive waveform data from a SeedLink server via the ``slinktool`` program.

    ``slinktool`` is run as a subprocess with its output piped back to this
    object. :py:meth:`process` reads one fixed-size record at a time from
    that pipe, decodes it with :py:mod:`pyrocko.io.mseed` and hands every
    resulting trace to :py:meth:`got_trace`.

    :param host: host name of the SeedLink server
    :param port: TCP port of the SeedLink server
    '''

    def __init__(self, host='geofon.gfz-potsdam.de', port=18000):
        self.host = host
        self.port = port
        self.running = False
        self.stream_selectors = []

    def query_streams(self):
        '''
        Ask the server which streams it offers.

        Runs ``slinktool -Q host:port`` and parses its standard output line
        by line.

        :returns: list of ``(network, station, location, channel)`` tuples
        :raises SlowSlinkError: if ``slinktool`` could not be started
        '''
        cmd = ['slinktool', '-Q', self.host+':'+str(self.port)]
        logger.debug('Running %s' % ' '.join(cmd))
        try:
            slink = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        except OSError as e:
            raise SlowSlinkError('Could not start "slinktool": %s' % str(e))

        output, _ = slink.communicate()
        streams = []
        for raw_line in output.splitlines():
            toks = raw_line.decode().split()
            if len(toks) == 9:
                # Nine tokens: the line carries no location field, so the
                # channel is the third token.
                net, sta, loc, cha = toks[0], toks[1], '', toks[2]
            elif len(toks) >= 4:
                net, sta, loc, cha = toks[:4]
            else:
                # Blank or otherwise unparsable line: skip it instead of
                # failing with an IndexError.
                logger.debug(
                    'Ignoring unexpected slinktool output: %r' % raw_line)
                continue

            streams.append((net, sta, loc, cha))

        return streams

    def add_stream(self, network, station, location, channel):
        '''
        Select a stream for acquisition.

        Appends the selector ``'<network>_<station>:<channel>.D'`` to
        ``stream_selectors``. The ``location`` argument is accepted but is
        not part of the generated selector.
        '''
        self.stream_selectors.append(
            '%s_%s:%s.D' % (network, station, channel))

    def add_raw_stream_selector(self, stream_selector):
        '''
        Append a ready-made selector string to ``stream_selectors``.
        '''
        self.stream_selectors.append(stream_selector)

    def acquisition_start(self):
        '''
        Start ``slinktool`` and begin receiving data.

        The command is ``slinktool -o - [-S sel1,sel2,...] host:port``; the
        ``-S`` option is only given when stream selectors have been added.
        The child's standard output is piped and later consumed by
        :py:meth:`process`.

        :raises SlowSlinkError: if ``slinktool`` could not be started. In
            that case the object is left in the not-running state, so that
            the call may be retried.
        '''
        assert not self.running
        if self.stream_selectors:
            streams = ['-S', ','.join(self.stream_selectors)]
        else:
            streams = []

        cmd = ['slinktool', '-o', '-'] \
            + streams + [self.host+':'+str(self.port)]

        logger.debug('Starting %s' % ' '.join(cmd))
        self.running = True
        self.header = None
        self.vals = []
        try:
            self.slink = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                preexec_fn=preexec,  # child gets its own process group
                close_fds=True)
        except OSError as e:
            # Nothing was started: do not pretend to be running, otherwise a
            # later stop would act on a missing or stale child process.
            self.running = False
            raise SlowSlinkError('Could not start "slinktool": %s' % str(e))

        logger.debug('Started.')

    def acquisition_stop(self):
        '''
        Stop the acquisition; see :py:meth:`acquisition_request_stop`.
        '''
        self.acquisition_request_stop()

    def acquisition_request_stop(self):
        '''
        Terminate the ``slinktool`` subprocess and wait until it is gone.

        Does nothing when no acquisition is running. The child is first sent
        ``SIGTERM``; if it is still alive after 200 polls spaced 0.01 s apart,
        ``SIGKILL`` is sent as well.
        '''
        if not self.running:
            return

        self.running = False  # intentionally before the kill

        os.kill(self.slink.pid, signal.SIGTERM)
        logger.debug('Waiting for slinktool to terminate...')
        it = 0
        # Popen.poll() returns None for as long as the child is alive.
        while self.slink.poll() is None:
            time.sleep(0.01)
            if it == 200:
                logger.debug(
                    'Waiting for slinktool to terminate... trying harder...')
                os.kill(self.slink.pid, signal.SIGKILL)

            it += 1

        logger.debug('Done, slinktool has terminated')

    def process(self):
        '''
        Read and handle at most one record from ``slinktool``.

        Waits up to 0.2 s for data on the child's standard output, reads
        ``RECORD_LENGTH`` bytes, writes them to a temporary file and loads
        that file with ``mseed.iload``. Every trace obtained is passed to
        :py:meth:`got_trace`.

        :returns: ``True`` if a record was read and handled; ``False`` if no
            data became available in time, the pipe reached end-of-file, or
            an error occurred (errors are logged at debug level, not raised)
        '''
        try:
            ready, _, _ = select.select([self.slink.stdout], [], [], .2)
            if not ready:
                return False

            record = self.slink.stdout.read(RECORD_LENGTH)
            if not record:
                # End-of-file: slinktool has closed its output, there is
                # nothing to decode.
                return False

            # mseed.iload() is given a file name, so the record is spooled to
            # a named temporary file which is removed when the block exits.
            with tempfile.NamedTemporaryFile(prefix='slink-stream') as f:
                f.write(record)
                f.flush()

                for tr in mseed.iload(f.name):
                    self.got_trace(tr)

            return True

        except Exception as e:
            # Deliberately best-effort: a bad record must not stop the
            # caller's polling loop.
            logger.debug(e)
            return False

    def got_trace(self, tr):
        '''
        Called by :py:meth:`process` for every trace received.

        This implementation only logs the trace; it is the natural place for
        subclasses to hook in their own handling.
        '''
        logger.info('Got trace from slinktool: %s' % tr)