# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from __future__ import absolute_import
import subprocess
import time
import os
import signal
import select
import logging
import tempfile
from pyrocko.io import mseed
logger = logging.getLogger('pyrocko.streaming.slink')
RECORD_LENGTH = 512
[docs]def preexec():
os.setpgrp()
[docs]class SlowSlinkError(Exception):
pass
[docs]class SlowSlink(object):
def __init__(self, host='geofon.gfz-potsdam.de', port=18000):
self.host = host
self.port = port
self.running = False
self.stream_selectors = []
[docs] def query_streams(self):
cmd = ['slinktool', '-Q', self.host+':'+str(self.port)]
logger.debug('Running %s' % ' '.join(cmd))
try:
slink = subprocess.Popen(cmd, stdout=subprocess.PIPE)
except OSError as e:
raise SlowSlinkError('Could not start "slinktool": %s' % str(e))
(a, b) = slink.communicate()
if b is not None:
streams = []
for line in a.splitlines():
line = line.decode()
toks = line.split()
if len(toks) == 9:
net, sta, loc, cha = toks[0], toks[1], '', toks[2]
else:
net, sta, loc, cha = toks[0], toks[1], toks[2], toks[3]
streams.append((net, sta, loc, cha))
else:
cmd = ['slinktool', '-L', self.host+':'+str(self.port)]
logger.debug('Running %s' % ' '.join(cmd))
channels = ['BHZ', 'HHZ', 'BH1', 'BHN', 'HH1', 'HHN', 'BH2', 'BHE',
'HH2', 'HHE']
try:
slink = subprocess.Popen(cmd, stdout=subprocess.PIPE)
except OSError as e:
raise SlowSlinkError('Could not start "slinktool": %s' % str(e))
(a, b) = slink.communicate()
streams = []
for line in a.splitlines():
line = line.decode()
toks = line.split()
for cha in channels:
net, sta, loc = toks[0], toks[1], ''
streams.append((net, sta, loc, cha))
return streams
[docs] def add_stream(self, network, station, location, channel):
self.stream_selectors.append(
'%s_%s:%s.D' % (network, station, channel))
[docs] def add_raw_stream_selector(self, stream_selector):
self.stream_selectors.append(stream_selector)
[docs] def acquisition_start(self):
assert not self.running
if self.stream_selectors:
streams = ['-S', ','.join(self.stream_selectors)]
else:
streams = []
cmd = ['slinktool', '-o', '-'] \
+ streams + [self.host+':'+str(self.port)]
logger.debug('Starting %s' % ' '.join(cmd))
self.running = True
self.header = None
self.vals = []
try:
self.slink = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
preexec_fn=preexec,
close_fds=True)
except OSError as e:
raise SlowSlinkError('Could not start "slinktool": %s' % str(e))
logger.debug('Started.')
[docs] def acquisition_stop(self):
self.acquisition_request_stop()
[docs] def acquisition_request_stop(self):
if not self.running:
return
self.running = False # intentionally before the kill
os.kill(self.slink.pid, signal.SIGTERM)
logger.debug("Waiting for slinktool to terminate...")
it = 0
while self.slink.poll() == -1:
time.sleep(0.01)
if it == 200:
logger.debug(
"Waiting for slinktool to terminate... trying harder...")
os.kill(self.slink.pid, signal.SIGKILL)
it += 1
logger.debug("Done, slinktool has terminated")
[docs] def process(self):
try:
ready, _, _ = select.select([self.slink.stdout], [], [], .2)
if not ready:
return False
line = self.slink.stdout.read(RECORD_LENGTH)
with tempfile.NamedTemporaryFile(prefix='slink-stream') as f:
f.write(line)
f.flush()
traces = mseed.iload(f.name)
for tr in traces:
self.got_trace(tr)
return True
except Exception as e:
logger.debug(e)
return False
[docs] def got_trace(self, tr):
logger.info('Got trace from slinktool: %s' % tr)