# Source code for pyrocko.streaming.slink

# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from __future__ import absolute_import

import subprocess
import time
import os
import signal
import select
import logging
import tempfile

from pyrocko.io import mseed

# Logger shared by everything in this module.
logger = logging.getLogger('pyrocko.streaming.slink')
# Number of bytes SlowSlink.process() reads from slinktool's stdout per call
# (presumably the size of one Mini-SEED record -- confirm against slinktool).
RECORD_LENGTH = 512


def preexec():
    '''
    Move the calling process into a new process group of its own.

    Passed as ``preexec_fn`` when launching ``slinktool``, so it runs in the
    child process just before the program is executed.
    '''
    # pid 0 / pgid 0 mean "the calling process" / "use its own pid as the
    # group id", i.e. the same operation as setpgrp().
    os.setpgid(0, 0)


class SlowSlinkError(Exception):
    '''
    Raised when the external ``slinktool`` program cannot be started.
    '''
class SlowSlink(object):
    '''
    Receive waveform data by running the external ``slinktool`` program.

    The program is started as a child process and its standard output is
    read in chunks of ``RECORD_LENGTH`` bytes, which are handed to the
    Mini-SEED loader. Every trace obtained this way is passed to
    :py:meth:`got_trace`.

    :param host: host name of the server handed to ``slinktool``
    :param port: port number of the server handed to ``slinktool``
    '''

    def __init__(self, host='geofon.gfz-potsdam.de', port=18000):
        self.host = host
        self.port = port
        self.running = False
        self.stream_selectors = []

    def query_streams(self):
        '''
        Ask the server which streams it offers.

        Runs ``slinktool -Q host:port`` and parses its output line by line.

        :returns: list of ``(network, station, location, channel)`` tuples
        :raises SlowSlinkError: if ``slinktool`` cannot be started
        '''
        cmd = ['slinktool', '-Q', self.host+':'+str(self.port)]
        logger.debug('Running %s' % ' '.join(cmd))
        try:
            slink = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        except OSError as e:
            raise SlowSlinkError('Could not start "slinktool": %s' % str(e))

        output, _ = slink.communicate()
        streams = []
        for raw_line in output.splitlines():
            toks = raw_line.decode().split()
            if not toks:
                # blank line: carries no stream description
                continue

            if len(toks) == 9:
                # a line with exactly nine fields has no location code
                net, sta, loc, cha = toks[0], toks[1], '', toks[2]
            else:
                net, sta, loc, cha = toks[0], toks[1], toks[2], toks[3]

            streams.append((net, sta, loc, cha))

        return streams

    def add_stream(self, network, station, location, channel):
        '''
        Select a stream for acquisition.

        The selector is built as ``NETWORK_STATION:CHANNEL.D``; the
        ``location`` argument is accepted but not part of the selector.
        '''
        self.stream_selectors.append(
            '%s_%s:%s.D' % (network, station, channel))

    def add_raw_stream_selector(self, stream_selector):
        '''
        Add a ready-made selector string, passed on to ``slinktool`` as is.
        '''
        self.stream_selectors.append(stream_selector)

    def acquisition_start(self):
        '''
        Launch ``slinktool`` and start receiving data on its standard output.

        All selectors added so far are joined with commas and passed via
        ``-S``; without selectors the option is omitted.

        :raises SlowSlinkError: if ``slinktool`` cannot be started
        '''
        assert not self.running

        if self.stream_selectors:
            streams = ['-S', ','.join(self.stream_selectors)]
        else:
            streams = []

        cmd = ['slinktool', '-o', '-'] \
            + streams + [self.host+':'+str(self.port)]

        logger.debug('Starting %s' % ' '.join(cmd))
        # not used by the methods of this class; kept as in the original
        self.header = None
        self.vals = []
        try:
            self.slink = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                preexec_fn=preexec,
                close_fds=True)
        except OSError as e:
            raise SlowSlinkError('Could not start "slinktool": %s' % str(e))

        # Flag the acquisition as running only once the child really exists,
        # so that a failed launch does not leave the object in a state where
        # stopping touches a missing process and restarting is refused.
        self.running = True
        logger.debug('Started.')

    def acquisition_stop(self):
        '''
        Stop the acquisition (same as :py:meth:`acquisition_request_stop`).
        '''
        self.acquisition_request_stop()

    def acquisition_request_stop(self):
        '''
        Terminate ``slinktool`` and wait until it has exited.

        Sends SIGTERM first; if the process is still alive after roughly two
        seconds (200 polls, 10 ms apart), SIGKILL is sent. Does nothing when
        no acquisition is running.
        '''
        if not self.running:
            return

        self.running = False    # intentionally before the kill

        os.kill(self.slink.pid, signal.SIGTERM)
        logger.debug("Waiting for slinktool to terminate...")
        it = 0
        # Popen.poll() returns None for as long as the child is still running
        while self.slink.poll() is None:
            time.sleep(0.01)
            if it == 200:
                logger.debug(
                    "Waiting for slinktool to terminate... trying harder...")
                os.kill(self.slink.pid, signal.SIGKILL)

            it += 1

        logger.debug("Done, slinktool has terminated")

    def process(self):
        '''
        Read and handle one chunk of data from ``slinktool``, if available.

        Waits up to 0.2 s for data, reads ``RECORD_LENGTH`` bytes, stores
        them in a temporary file and feeds that file to the Mini-SEED loader.
        Each resulting trace is passed to :py:meth:`got_trace`.

        :returns: ``True`` if a chunk was read and processed, ``False`` if
            no data was available in time, the stream has ended, or an error
            occurred (errors are logged at debug level)
        '''
        try:
            ready, _, _ = select.select([self.slink.stdout], [], [], .2)
            if not ready:
                return False

            data = self.slink.stdout.read(RECORD_LENGTH)
            if not data:
                # end of stream: slinktool closed its output, nothing to load
                return False

            with tempfile.NamedTemporaryFile(prefix='slink-stream') as f:
                f.write(data)
                f.flush()
                # consume the loader while the temporary file still exists
                for tr in mseed.iload(f.name):
                    self.got_trace(tr)

            return True

        except Exception as e:
            logger.debug(e)
            return False

    def got_trace(self, tr):
        '''
        Handle a received trace.

        This implementation only logs the trace; presumably meant to be
        overridden by subclasses.
        '''
        logger.info('Got trace from slinktool: %s' % tr)