Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/streaming/slink.py: 27%
89 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-03-07 11:54 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-03-07 11:54 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Live stream reader for SeedLink streams (via `slinktool
8<https://www.seiscomp.de/doc/apps/slinktool.html>`_).
9'''
11import subprocess
12import time
13import os
14import signal
15import select
16import logging
17import tempfile
19from pyrocko.io import mseed
# Logger shared by everything in this module.
logger = logging.getLogger('pyrocko.streaming.slink')

# Number of bytes pulled from slinktool's standard output per read in
# SlowSlink.process(). Presumably the fixed miniSEED record size delivered by
# SeedLink -- TODO confirm.
RECORD_LENGTH = 512
def preexec():
    '''
    Hook run in the child process before ``slinktool`` is executed.

    Calls :py:func:`os.setpgrp` so that the child is placed in a process
    group of its own.
    '''
    os.setpgrp()
class SlowSlinkError(Exception):
    '''
    Raised when the ``slinktool`` program cannot be started.
    '''
class SlowSlink(object):
    '''
    Read live waveform data from a SeedLink server through ``slinktool``.

    :py:meth:`acquisition_start` launches a ``slinktool`` subprocess.
    :py:meth:`process` consumes its standard output in chunks of
    ``RECORD_LENGTH`` bytes and passes every trace decoded from a chunk to
    :py:meth:`got_trace`.

    :param host: host name of the SeedLink server
    :param port: port of the SeedLink server
    '''

    def __init__(self, host='geofon.gfz-potsdam.de', port=18000):
        self.host = host
        self.port = port
        self.running = False
        self.stream_selectors = []

    @staticmethod
    def _parse_streams(output):
        '''
        Turn the raw output of ``slinktool -Q`` into stream tuples.

        :param output: bytes captured from the standard output of slinktool
        :returns: list of ``(net, sta, loc, cha)`` tuples

        Lines with exactly 9 whitespace separated tokens are taken to have
        no location code. Lines with fewer than 4 tokens (e.g. blank lines)
        carry no stream information and are skipped.
        '''
        streams = []
        for raw_line in output.splitlines():
            toks = raw_line.decode().split()
            if len(toks) == 9:
                net, sta, loc, cha = toks[0], toks[1], '', toks[2]
            elif len(toks) >= 4:
                net, sta, loc, cha = toks[0], toks[1], toks[2], toks[3]
            else:
                continue

            streams.append((net, sta, loc, cha))

        return streams

    def query_streams(self):
        '''
        Ask the server which streams it offers.

        Runs ``slinktool -Q host:port`` and waits for it to finish.

        :returns: list of ``(net, sta, loc, cha)`` tuples
        :raises SlowSlinkError: if slinktool could not be started
        '''
        cmd = ['slinktool', '-Q', self.host+':'+str(self.port)]
        logger.debug('Running %s' % ' '.join(cmd))
        try:
            slink = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        except OSError as e:
            raise SlowSlinkError(
                'Could not start "slinktool": %s' % str(e)) from e

        stdout_data, _ = slink.communicate()
        return self._parse_streams(stdout_data)

    def add_stream(self, network, station, location, channel):
        '''
        Select a stream for acquisition.

        The selector is built as ``NET_STA:CHA.D``. The ``location``
        argument is accepted but not used in the selector.
        '''
        self.stream_selectors.append(
            '%s_%s:%s.D' % (network, station, channel))

    def add_raw_stream_selector(self, stream_selector):
        '''
        Add a stream selector string that is passed to slinktool unchanged.
        '''
        self.stream_selectors.append(stream_selector)

    def acquisition_start(self):
        '''
        Start the slinktool subprocess which delivers the data.

        All selectors added so far are joined with commas and handed to
        slinktool via its ``-S`` option.

        :raises SlowSlinkError: if slinktool could not be started
        '''
        assert not self.running
        if self.stream_selectors:
            streams = ['-S', ','.join(self.stream_selectors)]
        else:
            streams = []

        cmd = ['slinktool', '-o', '-'] \
            + streams + [self.host+':'+str(self.port)]

        logger.debug('Starting %s' % ' '.join(cmd))
        self.header = None
        self.vals = []
        try:
            self.slink = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                preexec_fn=preexec,
                close_fds=True)
        except OSError as e:
            raise SlowSlinkError(
                'Could not start "slinktool": %s' % str(e)) from e

        # Only flag as running once the child really exists, so that a failed
        # start does not leave the object in a state where stopping it would
        # try to signal a process that was never created.
        self.running = True
        logger.debug('Started.')

    def acquisition_stop(self):
        '''
        Stop the acquisition (same as :py:meth:`acquisition_request_stop`).
        '''
        self.acquisition_request_stop()

    def acquisition_request_stop(self):
        '''
        Terminate the slinktool subprocess and wait until it is gone.

        Does nothing when no acquisition is running. SIGTERM is sent first;
        if the child is still alive after about 200 polls spaced 10 ms apart,
        SIGKILL is sent as well.
        '''
        if not self.running:
            return

        self.running = False  # intentionally before the kill

        # Popen.terminate() sends SIGTERM and is a no-op if the child has
        # already exited.
        self.slink.terminate()
        logger.debug('Waiting for slinktool to terminate...')
        it = 0
        # Popen.poll() returns None for as long as the child is alive.
        while self.slink.poll() is None:
            time.sleep(0.01)
            if it == 200:
                logger.debug(
                    'Waiting for slinktool to terminate... trying harder...')
                self.slink.kill()

            it += 1

        logger.debug('Done, slinktool has terminated')

    def process(self):
        '''
        Read and handle at most one chunk of data from slinktool.

        Waits up to 0.2 s for data, reads ``RECORD_LENGTH`` bytes, writes
        them to a temporary file, loads that file with
        :py:func:`pyrocko.io.mseed.iload` and passes each resulting trace to
        :py:meth:`got_trace`.

        :returns: ``True`` if a chunk was read and handled, ``False`` if no
            data was available, the stream has ended or an error occurred
            (errors are logged at debug level).
        '''
        # NOTE(review): select() looks at the pipe's file descriptor while
        # read() goes through Python's buffered reader; data already sitting
        # in that buffer may not be noticed by select() -- confirm whether
        # this can delay records.
        try:
            ready, _, _ = select.select([self.slink.stdout], [], [], .2)
            if not ready:
                return False

            data = self.slink.stdout.read(RECORD_LENGTH)
            if not data:
                # End of stream (slinktool has exited): nothing was handled.
                return False

            with tempfile.NamedTemporaryFile(prefix='slink-stream') as f:
                f.write(data)
                f.flush()

                # The file must still exist while iload() is consumed.
                for tr in mseed.iload(f.name):
                    self.got_trace(tr)

            return True

        except Exception as e:
            logger.debug(e)
            return False

    def got_trace(self, tr):
        '''
        Handle a trace received from slinktool.

        This implementation only logs the trace.
        '''
        logger.info('Got trace from slinktool: %s' % tr)