# Source code for pyrocko.gui.marker

# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from __future__ import absolute_import

import calendar
import math
import time
import copy
import logging
import sys

import numpy as num

from pyrocko import util, plot, model, trace
from pyrocko.util import TableWriter, TableReader, gmtime_x, mystrftime


logger = logging.getLogger('pyrocko.gui.marker')


# Text symbols keyed by polarity value (1, -1, 0 or None). Python 2 gets
# plain ASCII substitutes, Python 3 and later get Unicode arrows.
if sys.version_info[0] < 3:
    polarity_symbols = {1: '+', -1: '-', None: '', 0: '0'}
else:
    polarity_symbols = {
        1: u'\u2191',
        -1: u'\u2193',
        None: u'',
        0: u'\u2195'}


def str_to_float_or_none(s):
    '''
    Convert a string to float, mapping the literal ``'None'`` to ``None``.

    :param s: string to convert
    :returns: ``None`` if ``s == 'None'``, otherwise ``float(s)``
    '''
    return None if s == 'None' else float(s)


def str_to_str_or_none(s):
    '''
    Return ``s`` unchanged, mapping the literal ``'None'`` to ``None``.

    :param s: string to convert
    :returns: ``None`` if ``s == 'None'``, otherwise ``s`` itself
    '''
    return None if s == 'None' else s


def str_to_int_or_none(s):
    '''
    Convert a string to int, mapping the literal ``'None'`` to ``None``.

    :param s: string to convert
    :returns: ``None`` if ``s == 'None'``, otherwise ``int(s)``
    '''
    return None if s == 'None' else int(s)


def str_to_bool(s):
    '''
    Interpret a string as a boolean flag.

    :param s: string to interpret
    :returns: ``True`` if the lower-cased string is ``'true'``, ``'t'`` or
        ``'1'``, ``False`` for anything else
    '''
    truthy = ('true', 't', '1')
    return s.lower() in truthy


def myctime(timestamp):
    '''
    Format a timestamp as a string.

    The timestamp is split with :py:func:`pyrocko.util.gmtime_x` and the two
    resulting parts are passed to :py:func:`pyrocko.util.mystrftime` with no
    explicit format (``None``).

    :param timestamp: time value accepted by ``gmtime_x``
    :returns: formatted time string
    '''
    time_tuple, subsecond = gmtime_x(timestamp)
    formatted = mystrftime(None, time_tuple, subsecond)
    return formatted


class MarkerParseError(Exception):
    '''Raised when a marker cannot be created from its string form.'''
class MarkerOneNSLCRequired(Exception):
    '''Raised when exactly one nslc id is required but not available.'''
class Marker(object):
    '''
    General purpose marker GUI element and base class for
    :py:class:`EventMarker` and :py:class:`PhaseMarker`.

    :param nslc_ids: list of (network, station, location, channel) tuples (may
        contain wildcards)
    :param tmin: start time
    :param tmax: end time
    :param kind: (optional) integer to distinguish groups of markers
        (color-coded)
    '''

    @staticmethod
    def from_string(line):
        '''
        Create a marker from one line of the legacy text format.

        Accepted layouts (whitespace separated tokens):

        * ``date time fraction kind [nslc_ids]`` (4 or 5 tokens)
        * ``date time fraction date time fraction duration kind [nslc_ids]``
          (8 or 9 tokens)

        ``nslc_ids`` is a comma separated list of dot separated
        ``net.sta.loc.cha`` codes.

        :param line: marker definition as string
        :returns: :py:class:`Marker` object
        :raises MarkerParseError: if the line has an unexpected number of
            tokens, a malformed nslc id, or a date, time or kind token which
            cannot be converted
        '''

        def fail():
            raise MarkerParseError(
                'Unable to create marker from string: "%s"' % line)

        def parsedate(ymd, hms, sfs):
            return calendar.timegm(
                time.strptime(ymd+' '+hms, '%Y-%m-%d %H:%M:%S')) + float(sfs)

        toks = line.split()
        ntoks = len(toks)
        if ntoks not in (4, 5, 8, 9):
            fail()

        # An odd token count means a trailing nslc id list is present.
        has_nslc_ids = ntoks in (5, 9)

        try:
            tmin = parsedate(*toks[:3])
            if ntoks in (8, 9):
                tmax = parsedate(*toks[3:6])
            else:
                tmax = tmin

            kind = int(toks[-2] if has_nslc_ids else toks[-1])

        except ValueError:
            # strptime(), float() and int() signal bad tokens with ValueError;
            # report them like any other unparsable line so that callers only
            # have to deal with MarkerParseError.
            fail()

        nslc_ids = []
        if has_nslc_ids:
            for snslc in toks[-1].split(','):
                nslc = snslc.split('.')
                if len(nslc) != 4:
                    fail()

                nslc_ids.append(tuple(nslc))

        return Marker(nslc_ids, tmin, tmax, kind=kind)

    @staticmethod
    def save_markers(markers, fn, fdigits=3):
        '''
        Static method to write marker objects to file.

        :param markers: list of :py:class:`Marker` objects
        :param fn: filename as string
        :param fdigits: number of decimal digits to use for sub-second time
            strings (default 3)
        '''

        # The context manager closes the file even if a marker fails to
        # serialize half way through.
        with open(fn, 'w') as f:
            f.write('# Snuffler Markers File Version 0.2\n')
            writer = TableWriter(f)
            for marker in markers:
                a = marker.get_attributes(fdigits=fdigits)
                w = marker.get_attribute_widths(fdigits=fdigits)
                # Missing and empty values are stored as the literal 'None'.
                row = ['None' if (x is None or x == '') else x for x in a]
                writer.writerow(row, w)

    @staticmethod
    def load_markers(fn):
        '''
        Static method to load markers from file.

        Files without a ``# Snuffler Markers File Version`` header are read
        line by line with :py:meth:`from_string`; invalid lines are logged and
        skipped. Files with a version 0.2 header are read as a table, where
        the first column selects the marker class.

        :param fn: filename as string
        :returns: list of :py:class:`Marker`, :py:class:`EventMarker` or
            :py:class:`PhaseMarker` objects
        '''
        markers = []
        with open(fn, 'r') as f:
            line = f.readline()
            if not line.startswith('# Snuffler Markers File Version'):
                f.seek(0)
                for iline, line in enumerate(f):
                    # Text mode yields unicode str on Python 3, which has no
                    # decode(); only byte strings (Python 2) need decoding.
                    if isinstance(line, bytes):
                        line = str(line.decode('ascii'))

                    sline = line.strip()
                    if not sline or sline.startswith('#'):
                        continue

                    try:
                        markers.append(Marker.from_string(sline))

                    except MarkerParseError:
                        logger.warning(
                            'Invalid marker definition in line %i of file '
                            '"%s"', iline+1, fn)

            elif line.startswith('# Snuffler Markers File Version 0.2'):
                reader = TableReader(f)
                while not reader.eof:
                    row = reader.readrow()
                    if not row:
                        continue

                    if row[0] == 'event:':
                        marker = EventMarker.from_attributes(row)
                    elif row[0] == 'phase:':
                        marker = PhaseMarker.from_attributes(row)
                    else:
                        marker = Marker.from_attributes(row)

                    markers.append(marker)

            else:
                logger.warning('Unsupported Markers File Version')

        return markers

    def __init__(self, nslc_ids, tmin, tmax, kind=0):
        self.set(nslc_ids, tmin, tmax)

        c = plot.color
        self.color_a = [c(x) for x in (
            'aluminium4', 'aluminium5', 'aluminium6')]

        # Three shades per kind; see select_color() for the indexing.
        self.color_b = [c(x) for x in (
            'scarletred1', 'scarletred2', 'scarletred3',
            'chameleon1', 'chameleon2', 'chameleon3',
            'skyblue1', 'skyblue2', 'skyblue3',
            'orange1', 'orange2', 'orange3',
            'plum1', 'plum2', 'plum3',
            'chocolate1', 'chocolate2', 'chocolate3')]

        self.alerted = False
        self.selected = False
        self.kind = kind

    def set(self, nslc_ids, tmin, tmax):
        '''
        Set ``nslc_ids``, start time and end time of :py:class:`Marker`

        :param nslc_ids: list or set of (network, station, location, channel)
            tuples
        :param tmin: start time
        :param tmax: end time
        '''
        self.nslc_ids = nslc_ids
        self.tmin = tmin
        self.tmax = tmax

    def set_kind(self, kind):
        '''
        Set kind of :py:class:`Marker`

        :param kind: (optional) integer to distinguish groups of markers
            (color-coded)
        '''
        self.kind = kind

    def get_tmin(self):
        '''Get *start time* of :py:class:`Marker`'''
        return self.tmin

    def get_tmax(self):
        '''Get *end time* of :py:class:`Marker`'''
        return self.tmax

    def get_nslc_ids(self):
        '''
        Get marker's network-station-location-channel pattern.

        :returns: list or set of (network, station, location, channel) tuples

        The network, station, location, or channel strings may contain
        wildcard expressions.
        '''
        return self.nslc_ids

    def is_alerted(self):
        '''Get the marker's *alerted* flag.'''
        return self.alerted

    def is_selected(self):
        '''Get the marker's *selected* flag.'''
        return self.selected

    def set_alerted(self, state):
        '''Set the marker's *alerted* flag.'''
        self.alerted = state

    def set_selected(self, state):
        '''Set the marker's *selected* flag.'''
        self.selected = state

    def match_nsl(self, nsl):
        '''See documentation of :py:func:`pyrocko.util.match_nslc`'''
        patterns = ['.'.join(x[:3]) for x in self.nslc_ids]
        return util.match_nslc(patterns, nsl)

    def match_nslc(self, nslc):
        '''See documentation of :py:func:`pyrocko.util.match_nslc`'''
        patterns = ['.'.join(x) for x in self.nslc_ids]
        return util.match_nslc(patterns, nslc)

    def one_nslc(self):
        '''
        If one *nslc_id* defines this marker return this id.

        If the number of ids in the :py:class:`Marker`'s *nslc_ids* is not
        exactly one, raise :py:exc:`MarkerOneNSLCRequired`.
        '''
        if len(self.nslc_ids) != 1:
            raise MarkerOneNSLCRequired()

        return list(self.nslc_ids)[0]

    def hoover_message(self):
        '''Get a descriptive message string (empty in the base class).'''
        return ''

    def copy(self):
        '''Get a (deep) copy of this marker.'''
        return copy.deepcopy(self)

    def __str__(self):
        traces = ','.join(['.'.join(nslc_id) for nslc_id in self.nslc_ids])
        st = myctime
        if self.tmin == self.tmax:
            return '%s %i %s' % (st(self.tmin), self.kind, traces)
        else:
            return '%s %s %g %i %s' % (
                st(self.tmin), st(self.tmax), self.tmax-self.tmin, self.kind,
                traces)

    def get_attributes(self, fdigits=3):
        '''
        Get the marker's values as a flat list, as written to marker files.

        The list holds date and time of ``tmin``; for markers with
        ``tmin != tmax`` additionally date and time of ``tmax`` and the
        duration; then kind and the comma separated nslc ids.

        :param fdigits: number of decimal digits for sub-second time strings
        :returns: list of attribute values
        '''
        traces = ','.join(['.'.join(nslc_id) for nslc_id in self.nslc_ids])

        def st(t):
            return util.time_to_str(
                t, format='%Y-%m-%d %H:%M:%S.'+'%iFRAC' % fdigits)

        vals = []
        vals.extend(st(self.tmin).split())
        if self.tmin != self.tmax:
            vals.extend(st(self.tmax).split())
            vals.append(self.tmax-self.tmin)

        vals.append(self.kind)
        vals.append(traces)
        return vals

    def get_attribute_widths(self, fdigits=3):
        '''
        Get column widths matching the values of :py:meth:`get_attributes`.

        :param fdigits: number of decimal digits for sub-second time strings
        :returns: list of integer column widths
        '''
        ws = [10, 9+fdigits]
        if self.tmin != self.tmax:
            ws.extend([10, 9+fdigits, 12])

        ws.extend([2, 15])
        return ws

    @staticmethod
    def parse_attributes(vals):
        '''
        Parse a row of string values as produced by :py:meth:`get_attributes`.

        A row of exactly 7 values is treated as a time span marker (with
        ``tmax`` and duration columns), any other length as an instant marker.

        :param vals: list of strings
        :returns: tuple ``(nslc_ids, tmin, tmax, kind)``
        '''
        tmin = util.str_to_time(vals[0] + ' ' + vals[1])
        i = 2
        tmax = tmin
        if len(vals) == 7:
            tmax = util.str_to_time(vals[2] + ' ' + vals[3])
            i = 5  # skip tmax date, tmax time and duration columns

        kind = int(vals[i])
        traces = vals[i+1]
        if traces == 'None':
            nslc_ids = []
        else:
            nslc_ids = tuple(
                [tuple(nslc_id.split('.')) for nslc_id in traces.split(',')])

        return nslc_ids, tmin, tmax, kind

    @staticmethod
    def from_attributes(vals):
        '''Create a :py:class:`Marker` from a row of string values.'''
        return Marker(*Marker.parse_attributes(vals))

    def select_color(self, colorlist):
        '''
        Pick this marker's color from ``colorlist``.

        The list is indexed with ``(kind*3 + shade) % len(colorlist)``.
        Selected or alerted markers get shade 1, all others shade 2.
        '''

        def cl(x):
            return colorlist[(self.kind*3+x) % len(colorlist)]

        if self.selected:
            return cl(1)

        if self.alerted:
            return cl(1)

        return cl(2)

    def draw(
            self, p, time_projection, y_projection,
            draw_line=True,
            draw_triangle=False,
            **kwargs):
        '''
        Draw the marker with painter ``p``.

        Nothing is drawn unless the marker is selected, alerted or has no
        nslc ids. Vertical lines are drawn at ``tmin`` and ``tmax`` if
        ``draw_line`` is set or the marker is selected or alerted; triangles
        are drawn at ``tmin`` if ``draw_triangle`` is set.
        '''

        from .qt_compat import qc, qg
        from . import util as gui_util

        if self.selected or self.alerted or not self.nslc_ids:

            color = self.select_color(self.color_b)
            pen = qg.QPen(qg.QColor(*color))
            pen.setWidth(2)
            linepen = qg.QPen(pen)
            if self.selected or self.alerted:
                linepen.setStyle(qc.Qt.CustomDashLine)
                pat = [5., 3.]
                linepen.setDashPattern(pat)
                if self.alerted and not self.selected:
                    linepen.setColor(qg.QColor(150, 150, 150))

            s = 9.
            utriangle = gui_util.make_QPolygonF(
                [-0.577*s, 0., 0.577*s], [0., 1.*s, 0.])
            ltriangle = gui_util.make_QPolygonF(
                [-0.577*s, 0., 0.577*s], [0., -1.*s, 0.])

            def drawline(t):
                u = time_projection(t)
                v0, v1 = y_projection.get_out_range()
                line = qc.QLineF(u, v0, u, v1)
                p.drawLine(line)

            def drawtriangles(t):
                u = time_projection(t)
                v0, v1 = y_projection.get_out_range()
                t = qg.QPolygonF(utriangle)
                t.translate(u, v0)
                p.drawConvexPolygon(t)
                t = qg.QPolygonF(ltriangle)
                t.translate(u, v1)
                p.drawConvexPolygon(t)

            if draw_line or self.selected or self.alerted:
                p.setPen(linepen)
                drawline(self.tmin)
                drawline(self.tmax)

            if draw_triangle:
                pen.setStyle(qc.Qt.SolidLine)
                pen.setJoinStyle(qc.Qt.MiterJoin)
                pen.setWidth(2)
                p.setPen(pen)
                p.setBrush(qg.QColor(*color))
                drawtriangles(self.tmin)

    def draw_trace(
            self, viewer, p, tr, time_projection, track_projection, gain,
            outline_label=False):
        '''
        Draw the marker on top of trace ``tr`` with painter ``p``.

        Returns without drawing if the marker has nslc ids and none of them
        matches ``tr.nslc_id``. Otherwise the part of the trace between
        ``tmin`` and ``tmax`` is highlighted (if the trace has data there),
        lines are drawn at ``tmin`` and ``tmax`` and the label returned by
        :py:meth:`get_label`, if any, is drawn at ``tmin``.
        '''

        from .qt_compat import qc, qg
        from . import util as gui_util

        if self.nslc_ids and not self.match_nslc(tr.nslc_id):
            return

        color = self.select_color(self.color_b)
        pen = qg.QPen(qg.QColor(*color))
        pen.setWidth(2)
        p.setPen(pen)
        p.setBrush(qc.Qt.NoBrush)

        def drawpoint(t, y):
            u = time_projection(t)
            v = track_projection(y)
            rect = qc.QRectF(u-2, v-2, 4, 4)
            p.drawRect(rect)

        def drawline(t):
            u = time_projection(t)
            v0, v1 = track_projection.get_out_range()
            line = qc.QLineF(u, v0, u, v1)
            p.drawLine(line)

        try:
            snippet = tr.chop(
                self.tmin, self.tmax,
                inplace=False,
                include_last=True,
                snap=(math.ceil, math.floor))

            vdata = track_projection(gain*snippet.get_ydata())
            udata_min = float(
                time_projection(snippet.tmin))
            udata_max = float(
                time_projection(snippet.tmin+snippet.deltat*(vdata.size-1)))
            udata = num.linspace(udata_min, udata_max, vdata.size)
            qpoints = gui_util.make_QPolygonF(udata, vdata)
            pen.setWidth(1)
            p.setPen(pen)
            p.drawPolyline(qpoints)
            pen.setWidth(2)
            p.setPen(pen)
            drawpoint(*tr(self.tmin, clip=True, snap=math.ceil))
            drawpoint(*tr(self.tmax, clip=True, snap=math.floor))

        except trace.NoData:
            # No samples in the marker's time span: draw only the lines.
            pass

        color = self.select_color(self.color_b)
        pen = qg.QPen(qg.QColor(*color))
        pen.setWidth(2)
        p.setPen(pen)

        drawline(self.tmin)
        drawline(self.tmax)

        label = self.get_label()
        if label:
            label_bg = qg.QBrush(qg.QColor(255, 255, 255))

            u = time_projection(self.tmin)
            v0, v1 = track_projection.get_out_range()
            if outline_label:
                du = -7
            else:
                du = -5

            gui_util.draw_label(
                p, u+du, v0, label, label_bg, 'TR',
                outline=outline_label)

        if self.tmin == self.tmax:
            try:
                drawpoint(self.tmin, tr.interpolate(self.tmin))

            except IndexError:
                pass

    def get_label(self):
        '''Get the label drawn next to the marker (none in the base class).'''
        return None

    def convert_to_phase_marker(
            self,
            event=None,
            phasename=None,
            polarity=None,
            automatic=None,
            incidence_angle=None,
            takeoff_angle=None):
        '''
        Turn this marker in-place into a :py:class:`PhaseMarker`.

        Does nothing if the marker already is a :py:class:`PhaseMarker`.
        '''

        if isinstance(self, PhaseMarker):
            return

        self.__class__ = PhaseMarker
        self._event = event
        self._phasename = phasename
        self._polarity = polarity
        self._automatic = automatic
        self._incidence_angle = incidence_angle
        self._takeoff_angle = takeoff_angle
        if self._event:
            self._event_hash = event.get_hash()
            self._event_time = event.time
        else:
            self._event_hash = None
            self._event_time = None

    def convert_to_event_marker(self, lat=0., lon=0.):
        '''
        Turn this marker in-place into an :py:class:`EventMarker`.

        A new event named ``'Event'`` is created at ``(lat, lon)`` with origin
        time ``tmin``; ``tmax`` is set to ``tmin`` and the nslc ids are
        cleared. Does nothing if the marker already is an
        :py:class:`EventMarker`.
        '''

        if isinstance(self, EventMarker):
            return

        if isinstance(self, PhaseMarker):
            self.convert_to_marker()

        self.__class__ = EventMarker
        self._event = model.Event(lat, lon, time=self.tmin, name='Event')
        self._event_hash = self._event.get_hash()
        self._active = False
        self.tmax = self.tmin
        self.nslc_ids = []
class EventMarker(Marker):
    '''
    An EventMarker is a GUI element representing a seismological event.

    :param event: a :py:class:`pyrocko.model.Event` object containing meta
        information of a seismological event
    :param kind: (optional) integer to distinguish groups of markers
    :param event_hash: (optional) hash code of event (see:
        :py:meth:`pyrocko.model.Event.get_hash`)
    '''

    def __init__(self, event, kind=0, event_hash=None):
        # Event markers are instants without nslc ids.
        Marker.__init__(self, [], event.time, event.time, kind)
        self._event = event
        self._active = False
        self._event_hash = event_hash

    def get_event_hash(self):
        '''Get the stored event hash or, if none is stored, compute it.'''
        if self._event_hash is None:
            return self._event.get_hash()

        return self._event_hash

    def set_active(self, active):
        '''Set the marker's *active* flag (label is outlined if set).'''
        self._active = active

    def label(self):
        '''
        Get label text built from magnitude, region and name of the event.

        Attributes which are ``None`` are left out; ``'(Event)'`` is returned
        if the resulting text is empty.
        '''
        parts = []

        magnitude = self._event.magnitude
        if magnitude is not None:
            parts.append('M%3.1f' % magnitude)

        region = self._event.region
        if region is not None:
            parts.append(region)

        name = self._event.name
        if name is not None:
            parts.append(name)

        return ' '.join(parts) or '(Event)'

    def draw(self, p, time_projection, y_projection, with_label=False):
        '''Draw the event as triangles, optionally with its label.'''
        Marker.draw(
            self, p, time_projection, y_projection,
            draw_line=False,
            draw_triangle=True)

        if not with_label:
            return

        self.draw_label(p, time_projection, y_projection)

    def draw_label(self, p, time_projection, y_projection):
        '''Draw the event's label at ``tmin`` on a white background.'''
        from .qt_compat import qg
        from . import util as gui_util

        u = time_projection(self.tmin)
        v0, v1 = y_projection.get_out_range()
        white = qg.QBrush(qg.QColor(255, 255, 255))
        gui_util.draw_label(
            p, u, v0-10., self.label(), white, 'CB',
            outline=self._active)

    def get_event(self):
        '''
        Return an instance of the :py:class:`pyrocko.model.Event` associated
        to this :py:class:`EventMarker`
        '''
        return self._event

    def draw_trace(self, viewer, p, tr, time_projection, track_projection,
                   gain):
        '''Event markers draw nothing on individual traces.'''
        pass

    def hoover_message(self):
        '''
        Get a comma separated ``key = value`` summary of the event.

        Attributes which are ``None`` or empty strings are left out; depth is
        scaled by 0.001 and given with the unit ``km``.
        '''
        event = self.get_event()
        keys = ('magnitude', 'lat', 'lon', 'depth', 'name', 'region',
                'catalog')

        items = []
        for key in keys:
            value = event.__dict__[key]
            if value is not None and value != '':
                if key == 'depth':
                    text = '%g km' % (event.depth * 0.001)
                else:
                    text = '%s' % value

                items.append('%s = %s' % (key, text))

        return ', '.join(items)

    def get_attributes(self, fdigits=3):
        '''
        Get the marker's values as a flat list, as written to marker files.

        The list starts with the tag ``'event:'``, followed by the base
        marker's values without the trailing nslc ids, followed by hash,
        location, depth, magnitude, catalog, name and region of the event.
        '''
        base = Marker.get_attributes(self, fdigits=fdigits)[:-1]
        ev = self._event
        event_values = [
            ev.get_hash(), ev.lat, ev.lon, ev.depth, ev.magnitude,
            ev.catalog, ev.name, ev.region]

        return ['event:'] + base + event_values

    def get_attribute_widths(self, fdigits=3):
        '''Get column widths matching :py:meth:`get_attributes`.'''
        base = Marker.get_attribute_widths(self, fdigits=fdigits)[:-1]
        return [6] + base + [14, 12, 12, 12, 4, 5, 0, 0]

    @staticmethod
    def from_attributes(vals):
        '''Create an :py:class:`EventMarker` from a row of string values.'''
        # Only tmin and kind of the parsed basic values are used here.
        nslc_ids, tmin, tmax, kind = Marker.parse_attributes(
            vals[1:] + ['None'])

        lat, lon, depth, magnitude = [
            str_to_float_or_none(v) for v in vals[5:9]]

        catalog, name, region = [
            str_to_str_or_none(v) for v in vals[9:]]

        event = model.Event(
            lat, lon,
            time=tmin,
            name=name,
            depth=depth,
            magnitude=magnitude,
            region=region,
            catalog=catalog)

        return EventMarker(
            event, kind, event_hash=str_to_str_or_none(vals[4]))
class PhaseMarker(Marker):
    '''
    A PhaseMarker is a GUI-element representing a seismological phase arrival

    :param nslc_ids: list of (network, station, location, channel) tuples (may
        contain wildcards)
    :param tmin: start time
    :param tmax: end time
    :param kind: (optional) integer to distinguish groups of markers
        (color-coded)
    :param event: a :py:class:`pyrocko.model.Event` object containing meta
        information of a seismological event
    :param event_hash: (optional) hash code of event (see:
        :py:meth:`pyrocko.model.Event.get_hash`)
    :param event_time: (optional) time of the associated event
    :param phasename: (optional) name of the phase associated with the marker
    :param polarity: (optional) polarity of arriving phase; 1, -1, 0 or None
    :param automatic: (optional) flag, shown as ``'@'`` in the label if set
    :param incidence_angle: (optional) incidence angle of phase
    :param takeoff_angle: (optional) take off angle of phase
    '''

    def __init__(
            self, nslc_ids, tmin, tmax,
            kind=0,
            event=None,
            event_hash=None,
            event_time=None,
            phasename=None,
            polarity=None,
            automatic=None,
            incidence_angle=None,
            takeoff_angle=None):

        Marker.__init__(self, nslc_ids, tmin, tmax, kind)
        self._event = event
        self._event_hash = event_hash
        self._event_time = event_time
        self._phasename = phasename
        self._automatic = automatic
        self._incidence_angle = incidence_angle
        self._takeoff_angle = takeoff_angle

        # Goes through the setter so that invalid values are rejected.
        self.set_polarity(polarity)

    def draw_trace(self, viewer, p, tr, time_projection, track_projection,
                   gain):
        '''
        Draw the marker on a trace; the label is outlined if the marker's
        event equals the viewer's active event.
        '''
        Marker.draw_trace(
            self, viewer, p, tr, time_projection, track_projection, gain,
            outline_label=(
                self._event is not None and
                self._event == viewer.get_active_event()))

    def get_label(self):
        '''
        Get label text: phase name, polarity symbol and ``'@'`` if the marker
        is flagged as automatic (each part only if available).
        '''
        t = []
        if self._phasename is not None:
            t.append(self._phasename)

        if self._polarity is not None:
            t.append(self.get_polarity_symbol())

        if self._automatic:
            t.append('@')

        return ''.join(t)

    def get_event(self):
        '''
        Return an instance of the :py:class:`pyrocko.model.Event` associated
        to this :py:class:`PhaseMarker`
        '''
        return self._event

    def get_event_hash(self):
        '''
        Get the stored event hash or, if none is stored, the hash of the
        associated event (``None`` if there is no event either).
        '''
        if self._event_hash is not None:
            return self._event_hash

        if self._event is None:
            return None

        return self._event.get_hash()

    def get_event_time(self):
        '''
        Get the time of the associated event or, if there is no event, the
        stored event time.
        '''
        if self._event is not None:
            return self._event.time

        return self._event_time

    def set_event_hash(self, event_hash):
        '''Set the stored event hash.'''
        self._event_hash = event_hash

    def set_event(self, event):
        '''
        Set the associated event; unless it is ``None``, the stored event
        hash is updated to the hash of the event.
        '''
        self._event = event
        if event is not None:
            self.set_event_hash(event.get_hash())

    def get_phasename(self):
        '''Get the name of the phase.'''
        return self._phasename

    def set_phasename(self, phasename):
        '''Set the name of the phase.'''
        self._phasename = phasename

    def set_polarity(self, polarity):
        '''
        Set the polarity.

        :param polarity: 1, -1, 0 or ``None``
        :raises ValueError: for any other value
        '''
        if polarity not in [1, -1, 0, None]:
            raise ValueError('polarity has to be 1, -1, 0 or None')

        self._polarity = polarity

    def get_polarity_symbol(self):
        '''Get the text symbol for the polarity (``''`` if unknown).'''
        return polarity_symbols.get(self._polarity, '')

    def get_polarity(self):
        '''Get the polarity.'''
        return self._polarity

    def convert_to_marker(self):
        '''Turn this marker in-place into a plain :py:class:`Marker`.'''
        del self._event
        del self._event_hash
        del self._phasename
        del self._polarity
        del self._automatic
        del self._incidence_angle
        del self._takeoff_angle

        # Also drop the event time so no phase-specific state stays behind.
        # Removed tolerantly, because it was not removed here before.
        self.__dict__.pop('_event_time', None)

        self.__class__ = Marker

    def hoover_message(self):
        '''
        Get a comma separated ``key = value`` summary of incidence angle,
        takeoff angle and polarity (each only if not ``None``).
        '''
        toks = []
        for k in 'incidence_angle takeoff_angle polarity'.split():
            v = getattr(self, '_' + k)
            if v is not None:
                toks.append('%s = %s' % (k, v))

        return ', '.join(toks)

    def get_attributes(self, fdigits=3):
        '''
        Get the marker's values as a flat list, as written to marker files.

        The list starts with the tag ``'phase:'``, followed by the base
        marker's values, followed by event hash, event date, event time,
        phase name, polarity and the automatic flag.
        '''
        attributes = ['phase:']
        attributes.extend(Marker.get_attributes(self, fdigits=fdigits))

        # Compare against None: a time of 0.0 is a valid event time and must
        # not be written as missing.
        et = None, None
        if self._event is not None:
            et = self._st(self._event.time, fdigits).split()
        elif self._event_time is not None:
            et = self._st(self._event_time, fdigits).split()

        attributes.extend([
            self.get_event_hash(), et[0], et[1], self._phasename,
            self._polarity, self._automatic])

        return attributes

    def _st(self, t, fdigits):
        '''Format time ``t`` as date and time with ``fdigits`` decimals.'''
        return util.time_to_str(
            t, format='%Y-%m-%d %H:%M:%S.'+'%iFRAC' % fdigits)

    def get_attribute_widths(self, fdigits=3):
        '''Get column widths matching :py:meth:`get_attributes`.'''
        ws = [6]
        ws.extend(Marker.get_attribute_widths(self, fdigits=fdigits))
        ws.extend([14, 12, 12, 8, 4, 5])
        return ws

    @staticmethod
    def from_attributes(vals):
        '''
        Create a :py:class:`PhaseMarker` from a row of string values.

        A row of exactly 14 values is treated as a time span marker, any
        other length as an instant marker. The event itself is not restored,
        only its hash and time.
        '''
        if len(vals) == 14:
            nbasicvals = 7
        else:
            nbasicvals = 4

        nslc_ids, tmin, tmax, kind = Marker.parse_attributes(
            vals[1:1+nbasicvals])

        # Index of the phase name column.
        i = 8
        if len(vals) == 14:
            i = 11

        event_hash = str_to_str_or_none(vals[i-3])
        event_sdate = str_to_str_or_none(vals[i-2])
        event_stime = str_to_str_or_none(vals[i-1])

        if event_sdate is not None and event_stime is not None:
            event_time = util.str_to_time(event_sdate + ' ' + event_stime)
        else:
            event_time = None

        phasename = str_to_str_or_none(vals[i])
        polarity = str_to_int_or_none(vals[i+1])
        automatic = str_to_bool(vals[i+2])
        marker = PhaseMarker(nslc_ids, tmin, tmax, kind, event=None,
                             event_hash=event_hash, event_time=event_time,
                             phasename=phasename, polarity=polarity,
                             automatic=automatic)
        return marker
def load_markers(filename):
    '''
    Read markers from a file.

    Module level shortcut for :py:meth:`Marker.load_markers`.

    :param filename: filename as string
    :returns: list of :py:class:`Marker` objects
    '''
    markers = Marker.load_markers(filename)
    return markers
def save_markers(markers, filename, fdigits=3):
    '''
    Write markers to a file.

    Module level shortcut for :py:meth:`Marker.save_markers`.

    :param markers: list of :py:class:`Marker` objects
    :param filename: filename as string
    :param fdigits: number of decimal digits to use for sub-second time
        strings
    '''
    result = Marker.save_markers(markers, filename, fdigits=fdigits)
    return result
def associate_phases_to_events(markers):
    '''
    Reassociate phases to events after import from markers file.

    Every :py:class:`PhaseMarker` without an event is linked to the event of
    an :py:class:`EventMarker` in ``markers``: preferably to the one with the
    same event hash, otherwise to the one with the same event time. The stored
    event hash of a linked phase marker is reset to ``None`` afterwards.

    :param markers: list of marker objects, modified in-place
    '''
    events_by_hash = {}
    events_by_time = {}

    # First pass: index all events, so that phase markers can be matched
    # regardless of their position in the list.
    for marker in markers:
        if not isinstance(marker, EventMarker):
            continue

        event = marker.get_event()
        events_by_hash[marker.get_event_hash()] = event
        events_by_time[event.time] = event

    # Second pass: link the phase markers which have no event yet.
    for marker in markers:
        if not isinstance(marker, PhaseMarker):
            continue

        ehash = marker.get_event_hash()
        etime = marker.get_event_time()
        if marker.get_event() is not None:
            continue

        if ehash is not None and ehash in events_by_hash:
            event = events_by_hash[ehash]
        elif etime is not None and etime in events_by_time:
            event = events_by_time[etime]
        else:
            continue

        marker.set_event(event)
        marker.set_event_hash(None)