# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
from __future__ import absolute_import
from builtins import object
import calendar
import math
import time
import copy
import logging
import sys
import numpy as num
from pyrocko import util, plot, model, trace
from pyrocko.util import TableWriter, TableReader, gmtime_x, mystrftime
logger = logging.getLogger('pyrocko.gui.marker')
if sys.version_info[0] >= 3:
polarity_symbols = {1: u'\u2191', -1: u'\u2193', None: u'', 0: u'\u2195'}
else:
polarity_symbols = {1: '+', -1: '-', None: '', 0: '0'}
def str_to_float_or_none(s):
if s == 'None':
return None
return float(s)
def str_to_str_or_none(s):
if s == 'None':
return None
return s
def str_to_int_or_none(s):
if s == 'None':
return None
return int(s)
def str_to_bool(s):
return s.lower() in ('true', 't', '1')
def myctime(timestamp):
tt, ms = gmtime_x(timestamp)
return mystrftime(None, tt, ms)
[docs]class MarkerParseError(Exception):
pass
[docs]class MarkerOneNSLCRequired(Exception):
pass
[docs]class Marker(object):
'''
General purpose marker GUI element and base class for
:py:class:`EventMarker` and :py:class:`PhaseMarker`.
:param nslc_ids: list of (network, station, location, channel) tuples
(may contain wildcards)
:param tmin: start time
:param tmax: end time
:param kind: (optional) integer to distinguish groups of markers
(color-coded)
'''
@staticmethod
def from_string(line):
def fail():
raise MarkerParseError(
'Unable to create marker from string: "%s"' % line)
def parsedate(ymd, hms, sfs):
return calendar.timegm(
time.strptime(ymd+' '+hms, '%Y-%m-%d %H:%M:%S')) + float(sfs)
try:
toks = line.split()
if len(toks) in (4, 5):
tmin = parsedate(*toks[:3])
tmax = tmin
elif len(toks) in (8, 9):
tmin = parsedate(*toks[:3])
tmax = parsedate(*toks[3:6])
else:
fail()
if len(toks) in (5, 9):
kind = int(toks[-2])
else:
kind = int(toks[-1])
nslc_ids = []
if len(toks) in (5, 9):
for snslc in toks[-1].split(','):
nslc = snslc.split('.')
if len(nslc) != 4:
fail()
nslc_ids.append(tuple(nslc))
except MarkerParseError:
fail()
return Marker(nslc_ids, tmin, tmax, kind=kind)
[docs] @staticmethod
def save_markers(markers, fn, fdigits=3):
'''Static method to write marker objects to file.
:param markers: list of :py:class:`Marker` objects
:param fn: filename as string
:param fdigits: number of decimal digits to use for sub-second time
strings (default 3)
'''
f = open(fn, 'w')
f.write('# Snuffler Markers File Version 0.2\n')
writer = TableWriter(f)
for marker in markers:
a = marker.get_attributes(fdigits=fdigits)
w = marker.get_attribute_widths(fdigits=fdigits)
row = []
for x in a:
if x is None or x == '':
row.append('None')
else:
row.append(x)
writer.writerow(row, w)
f.close()
[docs] @staticmethod
def load_markers(fn):
'''
Static method to load markers from file.
:param filename: filename as string
:returns: list of :py:class:`Marker`, :py:class:`EventMarker` or
:py:class:`PhaseMarker` objects
'''
markers = []
with open(fn, 'r') as f:
line = f.readline()
if not line.startswith('# Snuffler Markers File Version'):
f.seek(0)
for iline, line in enumerate(f):
line = str(line.decode('ascii'))
sline = line.strip()
if not sline or sline.startswith('#'):
continue
try:
m = Marker.from_string(sline)
markers.append(m)
except MarkerParseError:
logger.warning(
'Invalid marker definition in line %i of file "%s"'
% (iline+1, fn))
f.close()
elif line.startswith('# Snuffler Markers File Version 0.2'):
reader = TableReader(f)
while not reader.eof:
row = reader.readrow()
if not row:
continue
if row[0] == 'event:':
marker = EventMarker.from_attributes(row)
elif row[0] == 'phase:':
marker = PhaseMarker.from_attributes(row)
else:
marker = Marker.from_attributes(row)
markers.append(marker)
else:
logger.warning('Unsupported Markers File Version')
return markers
def __init__(self, nslc_ids, tmin, tmax, kind=0):
self.set(nslc_ids, tmin, tmax)
c = plot.color
self.color_a = [c(x) for x in (
'aluminium4', 'aluminium5', 'aluminium6')]
self.color_b = [c(x) for x in (
'scarletred1', 'scarletred2', 'scarletred3',
'chameleon1', 'chameleon2', 'chameleon3',
'skyblue1', 'skyblue2', 'skyblue3',
'orange1', 'orange2', 'orange3',
'plum1', 'plum2', 'plum3',
'chocolate1', 'chocolate2', 'chocolate3')]
self.alerted = False
self.selected = False
self.kind = kind
[docs] def set(self, nslc_ids, tmin, tmax):
'''Set ``nslc_ids``, start time and end time of :py:class:`Marker`
:param nslc_ids: list or set of (network, station, location, channel)
tuples
:param tmin: start time
:param tmax: end time
'''
self.nslc_ids = nslc_ids
self.tmin = tmin
self.tmax = tmax
[docs] def set_kind(self, kind):
'''Set kind of :py:class:`Marker`
:param kind: (optional) integer to distinguish groups of markers
(color-coded)
'''
self.kind = kind
[docs] def get_tmin(self):
'''Get *start time* of :py:class:`Marker`'''
return self.tmin
[docs] def get_tmax(self):
'''Get *end time* of :py:class:`Marker`'''
return self.tmax
[docs] def get_nslc_ids(self):
'''Get marker's network-station-location-channel pattern.
:returns: list or set of (network, station, location, channel) tuples
The network, station, location, or channel strings may contain wildcard
expressions.
'''
return self.nslc_ids
def is_alerted(self):
return self.alerted
def is_selected(self):
return self.selected
def set_alerted(self, state):
self.alerted = state
def set_selected(self, state):
self.selected = state
[docs] def match_nsl(self, nsl):
'''See documentation of :py:func:`pyrocko.util.match_nslc`'''
patterns = ['.'.join(x[:3]) for x in self.nslc_ids]
return util.match_nslc(patterns, nsl)
[docs] def match_nslc(self, nslc):
'''See documentation of :py:func:`pyrocko.util.match_nslc`'''
patterns = ['.'.join(x) for x in self.nslc_ids]
return util.match_nslc(patterns, nslc)
[docs] def one_nslc(self):
'''If one *nslc_id* defines this marker return this id.
If more than one *nslc_id* is defined in the :py:class:`Marker`s
*nslc_ids* raise :py:exc:`MarkerOneNSLCRequired`.'''
if len(self.nslc_ids) != 1:
raise MarkerOneNSLCRequired()
return list(self.nslc_ids)[0]
def hoover_message(self):
return ''
[docs] def copy(self):
''' Get a copy of this marker. '''
return copy.deepcopy(self)
def __str__(self):
traces = ','.join(['.'.join(nslc_id) for nslc_id in self.nslc_ids])
st = myctime
if self.tmin == self.tmax:
return '%s %i %s' % (st(self.tmin), self.kind, traces)
else:
return '%s %s %g %i %s' % (
st(self.tmin), st(self.tmax), self.tmax-self.tmin, self.kind,
traces)
def get_attributes(self, fdigits=3):
traces = ','.join(['.'.join(nslc_id) for nslc_id in self.nslc_ids])
def st(t):
return util.time_to_str(
t, format='%Y-%m-%d %H:%M:%S.'+'%iFRAC' % fdigits)
vals = []
vals.extend(st(self.tmin).split())
if self.tmin != self.tmax:
vals.extend(st(self.tmax).split())
vals.append(self.tmax-self.tmin)
vals.append(self.kind)
vals.append(traces)
return vals
def get_attribute_widths(self, fdigits=3):
ws = [10, 9+fdigits]
if self.tmin != self.tmax:
ws.extend([10, 9+fdigits, 12])
ws.extend([2, 15])
return ws
@staticmethod
def parse_attributes(vals):
tmin = util.str_to_time(vals[0] + ' ' + vals[1])
i = 2
tmax = tmin
if len(vals) == 7:
tmax = util.str_to_time(vals[2] + ' ' + vals[3])
i = 5
kind = int(vals[i])
traces = vals[i+1]
if traces == 'None':
nslc_ids = []
else:
nslc_ids = tuple(
[tuple(nslc_id.split('.')) for nslc_id in traces.split(',')])
return nslc_ids, tmin, tmax, kind
@staticmethod
def from_attributes(vals):
return Marker(*Marker.parse_attributes(vals))
def select_color(self, colorlist):
def cl(x):
return colorlist[(self.kind*3+x) % len(colorlist)]
if self.selected:
return cl(1)
if self.alerted:
return cl(1)
return cl(2)
def draw(
self, p, time_projection, y_projection,
draw_line=True,
draw_triangle=False,
**kwargs):
from .qt_compat import qc, qg
from . import util as gui_util
if self.selected or self.alerted or not self.nslc_ids:
color = self.select_color(self.color_b)
pen = qg.QPen(qg.QColor(*color))
pen.setWidth(2)
linepen = qg.QPen(pen)
if self.selected or self.alerted:
linepen.setStyle(qc.Qt.CustomDashLine)
pat = [5., 3.]
linepen.setDashPattern(pat)
if self.alerted and not self.selected:
linepen.setColor(qg.QColor(150, 150, 150))
s = 9.
utriangle = gui_util.make_QPolygonF(
[-0.577*s, 0., 0.577*s], [0., 1.*s, 0.])
ltriangle = gui_util.make_QPolygonF(
[-0.577*s, 0., 0.577*s], [0., -1.*s, 0.])
def drawline(t):
u = time_projection(t)
v0, v1 = y_projection.get_out_range()
line = qc.QLineF(u, v0, u, v1)
p.drawLine(line)
def drawtriangles(t):
u = time_projection(t)
v0, v1 = y_projection.get_out_range()
t = qg.QPolygonF(utriangle)
t.translate(u, v0)
p.drawConvexPolygon(t)
t = qg.QPolygonF(ltriangle)
t.translate(u, v1)
p.drawConvexPolygon(t)
if draw_line or self.selected or self.alerted:
p.setPen(linepen)
drawline(self.tmin)
drawline(self.tmax)
if draw_triangle:
pen.setStyle(qc.Qt.SolidLine)
pen.setJoinStyle(qc.Qt.MiterJoin)
pen.setWidth(2)
p.setPen(pen)
p.setBrush(qg.QColor(*color))
drawtriangles(self.tmin)
def draw_trace(
self, viewer, p, tr, time_projection, track_projection, gain,
outline_label=False):
from .qt_compat import qc, qg
from . import util as gui_util
if self.nslc_ids and not self.match_nslc(tr.nslc_id):
return
color = self.select_color(self.color_b)
pen = qg.QPen(qg.QColor(*color))
pen.setWidth(2)
p.setPen(pen)
p.setBrush(qc.Qt.NoBrush)
def drawpoint(t, y):
u = time_projection(t)
v = track_projection(y)
rect = qc.QRectF(u-2, v-2, 4, 4)
p.drawRect(rect)
def drawline(t):
u = time_projection(t)
v0, v1 = track_projection.get_out_range()
line = qc.QLineF(u, v0, u, v1)
p.drawLine(line)
try:
snippet = tr.chop(
self.tmin, self.tmax,
inplace=False,
include_last=True,
snap=(math.ceil, math.floor))
vdata = track_projection(gain*snippet.get_ydata())
udata_min = float(
time_projection(snippet.tmin))
udata_max = float(
time_projection(snippet.tmin+snippet.deltat*(vdata.size-1)))
udata = num.linspace(udata_min, udata_max, vdata.size)
qpoints = gui_util.make_QPolygonF(udata, vdata)
pen.setWidth(1)
p.setPen(pen)
p.drawPolyline(qpoints)
pen.setWidth(2)
p.setPen(pen)
drawpoint(*tr(self.tmin, clip=True, snap=math.ceil))
drawpoint(*tr(self.tmax, clip=True, snap=math.floor))
except trace.NoData:
pass
color = self.select_color(self.color_b)
pen = qg.QPen(qg.QColor(*color))
pen.setWidth(2)
p.setPen(pen)
drawline(self.tmin)
drawline(self.tmax)
label = self.get_label()
if label:
label_bg = qg.QBrush(qg.QColor(255, 255, 255))
u = time_projection(self.tmin)
v0, v1 = track_projection.get_out_range()
if outline_label:
du = -7
else:
du = -5
gui_util.draw_label(
p, u+du, v0, label, label_bg, 'TR',
outline=outline_label)
if self.tmin == self.tmax:
try:
drawpoint(self.tmin, tr.interpolate(self.tmin))
except IndexError:
pass
def get_label(self):
return None
def convert_to_phase_marker(
self,
event=None,
phasename=None,
polarity=None,
automatic=None,
incidence_angle=None,
takeoff_angle=None):
if isinstance(self, PhaseMarker):
return
self.__class__ = PhaseMarker
self._event = event
self._phasename = phasename
self._polarity = polarity
self._automatic = automatic
self._incidence_angle = incidence_angle
self._takeoff_angle = takeoff_angle
if self._event:
self._event_hash = event.get_hash()
self._event_time = event.time
else:
self._event_hash = None
self._event_time = None
def convert_to_event_marker(self, lat=0., lon=0.):
if isinstance(self, EventMarker):
return
if isinstance(self, PhaseMarker):
self.convert_to_marker()
self.__class__ = EventMarker
self._event = model.Event(lat, lon, time=self.tmin, name='Event')
self._event_hash = self._event.get_hash()
self._active = False
self.tmax = self.tmin
self.nslc_ids = []
[docs]class EventMarker(Marker):
"""
An EventMarker is a GUI element representing a seismological event
:param event: A :py:class:`pyrocko.model.Event` object containing meta
information of a seismological event
:param kind: (optional) integer to distinguish groups of markers
:param event_hash: (optional) hash code of event (see:
:py:meth:`pyrocko.model.Event.get_hash`)
"""
def __init__(self, event, kind=0, event_hash=None):
Marker.__init__(self, [], event.time, event.time, kind)
self._event = event
self._active = False
self._event_hash = event_hash
def get_event_hash(self):
if self._event_hash is not None:
return self._event_hash
else:
return self._event.get_hash()
def set_active(self, active):
self._active = active
def label(self):
t = []
mag = self._event.magnitude
if mag is not None:
t.append('M%3.1f' % mag)
reg = self._event.region
if reg is not None:
t.append(reg)
nam = self._event.name
if nam is not None:
t.append(nam)
s = ' '.join(t)
if not s:
s = '(Event)'
return s
def draw(self, p, time_projection, y_projection, with_label=False):
Marker.draw(
self, p, time_projection, y_projection,
draw_line=False,
draw_triangle=True)
if with_label:
self.draw_label(p, time_projection, y_projection)
def draw_label(self, p, time_projection, y_projection):
from .qt_compat import qg
from . import util as gui_util
u = time_projection(self.tmin)
v0, v1 = y_projection.get_out_range()
label_bg = qg.QBrush(qg.QColor(255, 255, 255))
gui_util.draw_label(
p, u, v0-10., self.label(), label_bg, 'CB',
outline=self._active)
[docs] def get_event(self):
'''Return an instance of the :py:class:`pyrocko.model.Event` associated
to this :py:class:`EventMarker`'''
return self._event
def draw_trace(self, viewer, p, tr, time_projection, track_projection,
gain):
pass
def hoover_message(self):
ev = self.get_event()
evs = []
for k in 'magnitude lat lon depth name region catalog'.split():
if ev.__dict__[k] is not None and ev.__dict__[k] != '':
if k == 'depth':
sv = '%g km' % (ev.depth * 0.001)
else:
sv = '%s' % ev.__dict__[k]
evs.append('%s = %s' % (k, sv))
return ', '.join(evs)
def get_attributes(self, fdigits=3):
attributes = ['event:']
attributes.extend(Marker.get_attributes(self, fdigits=fdigits))
del attributes[-1]
e = self._event
attributes.extend([
e.get_hash(), e.lat, e.lon, e.depth, e.magnitude, e.catalog,
e.name, e.region])
return attributes
def get_attribute_widths(self, fdigits=3):
ws = [6]
ws.extend(Marker.get_attribute_widths(self, fdigits=fdigits))
del ws[-1]
ws.extend([14, 12, 12, 12, 4, 5, 0, 0])
return ws
@staticmethod
def from_attributes(vals):
nslc_ids, tmin, tmax, kind = Marker.parse_attributes(
vals[1:] + ['None'])
lat, lon, depth, magnitude = [
str_to_float_or_none(x) for x in vals[5:9]]
catalog, name, region = [
str_to_str_or_none(x) for x in vals[9:]]
e = model.Event(
lat, lon, time=tmin, name=name, depth=depth, magnitude=magnitude,
region=region, catalog=catalog)
marker = EventMarker(
e, kind, event_hash=str_to_str_or_none(vals[4]))
return marker
[docs]class PhaseMarker(Marker):
'''
A PhaseMarker is a GUI-element representing a seismological phase arrival
:param nslc_ids: list of (network, station, location, channel) tuples (may
contain wildcards)
:param tmin: start time
:param tmax: end time
:param kind: (optional) integer to distinguish groups of markers
(color-coded)
:param event: a :py:class:`pyrocko.model.Event` object containing meta
information of a seismological event
:param event_hash: (optional) hash code of event (see:
:py:meth:`pyrocko.model.Event.get_hash`)
:param event_time: (optional) time of the associated event
:param phasename: (optional) name of the phase associated with the marker
:param polarity: (optional) polarity of arriving phase
:param automatic: (optional)
:param incident_angle: (optional) incident angle of phase
:param takeoff_angle: (optional) take off angle of phase
'''
def __init__(
self, nslc_ids, tmin, tmax,
kind=0,
event=None,
event_hash=None,
event_time=None,
phasename=None,
polarity=None,
automatic=None,
incidence_angle=None,
takeoff_angle=None):
Marker.__init__(self, nslc_ids, tmin, tmax, kind)
self._event = event
self._event_hash = event_hash
self._event_time = event_time
self._phasename = phasename
self._automatic = automatic
self._incidence_angle = incidence_angle
self._takeoff_angle = takeoff_angle
self.set_polarity(polarity)
def draw_trace(self, viewer, p, tr, time_projection, track_projection,
gain):
Marker.draw_trace(
self, viewer, p, tr, time_projection, track_projection, gain,
outline_label=(
self._event is not None and
self._event == viewer.get_active_event()))
def get_label(self):
t = []
if self._phasename is not None:
t.append(self._phasename)
if self._polarity is not None:
t.append(self.get_polarity_symbol())
if self._automatic:
t.append('@')
return ''.join(t)
[docs] def get_event(self):
'''Return an instance of the :py:class:`pyrocko.model.Event` associated
to this :py:class:`EventMarker`'''
return self._event
def get_event_hash(self):
if self._event_hash is not None:
return self._event_hash
else:
if self._event is None:
return None
else:
return self._event.get_hash()
def get_event_time(self):
if self._event is not None:
return self._event.time
else:
return self._event_time
def set_event_hash(self, event_hash):
self._event_hash = event_hash
def set_event(self, event):
self._event = event
if event is not None:
self.set_event_hash(event.get_hash())
def get_phasename(self):
return self._phasename
def set_phasename(self, phasename):
self._phasename = phasename
def set_polarity(self, polarity):
if polarity not in [1, -1, 0, None]:
raise ValueError('polarity has to be 1, -1, 0 or None')
self._polarity = polarity
def get_polarity_symbol(self):
return polarity_symbols.get(self._polarity, '')
def get_polarity(self):
return self._polarity
def convert_to_marker(self):
del self._event
del self._event_hash
del self._phasename
del self._polarity
del self._automatic
del self._incidence_angle
del self._takeoff_angle
self.__class__ = Marker
def hoover_message(self):
toks = []
for k in 'incidence_angle takeoff_angle polarity'.split():
v = getattr(self, '_' + k)
if v is not None:
toks.append('%s = %s' % (k, v))
return ', '.join(toks)
def get_attributes(self, fdigits=3):
attributes = ['phase:']
attributes.extend(Marker.get_attributes(self, fdigits=fdigits))
et = None, None
if self._event:
et = self._st(self._event.time, fdigits).split()
elif self._event_time:
et = self._st(self._event_time, fdigits).split()
attributes.extend([
self.get_event_hash(), et[0], et[1], self._phasename,
self._polarity, self._automatic])
return attributes
def _st(self, t, fdigits):
return util.time_to_str(
t, format='%Y-%m-%d %H:%M:%S.'+'%iFRAC' % fdigits)
def get_attribute_widths(self, fdigits=3):
ws = [6]
ws.extend(Marker.get_attribute_widths(self, fdigits=fdigits))
ws.extend([14, 12, 12, 8, 4, 5])
return ws
@staticmethod
def from_attributes(vals):
if len(vals) == 14:
nbasicvals = 7
else:
nbasicvals = 4
nslc_ids, tmin, tmax, kind = Marker.parse_attributes(
vals[1:1+nbasicvals])
i = 8
if len(vals) == 14:
i = 11
event_hash = str_to_str_or_none(vals[i-3])
event_sdate = str_to_str_or_none(vals[i-2])
event_stime = str_to_str_or_none(vals[i-1])
if event_sdate is not None and event_stime is not None:
event_time = util.str_to_time(event_sdate + ' ' + event_stime)
else:
event_time = None
phasename = str_to_str_or_none(vals[i])
polarity = str_to_int_or_none(vals[i+1])
automatic = str_to_bool(vals[i+2])
marker = PhaseMarker(nslc_ids, tmin, tmax, kind, event=None,
event_hash=event_hash, event_time=event_time,
phasename=phasename, polarity=polarity,
automatic=automatic)
return marker
[docs]def load_markers(filename):
'''
Load markers from file.
:param filename: filename as string
:returns: list of :py:class:`Marker` Objects
'''
return Marker.load_markers(filename)
[docs]def save_markers(markers, filename, fdigits=3):
'''
Save markers to file.
:param markers: list of :py:class:`Marker` Objects
:param filename: filename as string
:param fdigits: number of decimal digits to use for sub-second time strings
'''
return Marker.save_markers(markers, filename, fdigits=fdigits)
[docs]def associate_phases_to_events(markers):
'''
Reassociate phases to events after import from markers file.
'''
hash_to_events = {}
time_to_events = {}
for marker in markers:
if isinstance(marker, EventMarker):
ev = marker.get_event()
hash_to_events[marker.get_event_hash()] = ev
time_to_events[ev.time] = ev
for marker in markers:
if isinstance(marker, PhaseMarker):
h = marker.get_event_hash()
t = marker.get_event_time()
if marker.get_event() is None:
if h is not None and h in hash_to_events:
marker.set_event(hash_to_events[h])
marker.set_event_hash(None)
elif t is not None and t in time_to_events:
marker.set_event(time_to_events[t])
marker.set_event_hash(None)