# https://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
import math
import copy
import logging
import sys
import numpy as num
from pyrocko import util, plot, model, trace
from pyrocko.util import TableWriter, TableReader, gmtime_x, mystrftime
logger = logging.getLogger('pyrocko.gui.snuffler.marker')
if sys.version_info[0] >= 3:
polarity_symbols = {1: u'\u2191', -1: u'\u2193', None: u'', 0: u'\u2195'}
else:
polarity_symbols = {1: '+', -1: '-', None: '', 0: '0'}
def str_to_float_or_none(s):
if s == 'None':
return None
return float(s)
def str_to_str_or_none(s):
if s == 'None':
return None
return s
def str_to_int_or_none(s):
if s == 'None':
return None
return int(s)
def str_to_bool(s):
return s.lower() in ('true', 't', '1')
def myctime(timestamp):
tt, ms = gmtime_x(timestamp)
return mystrftime(None, tt, ms)
g_color_b = [plot.color(x) for x in (
'scarletred1', 'scarletred2', 'scarletred3',
'chameleon1', 'chameleon2', 'chameleon3',
'skyblue1', 'skyblue2', 'skyblue3',
'orange1', 'orange2', 'orange3',
'plum1', 'plum2', 'plum3',
'chocolate1', 'chocolate2', 'chocolate3',
'butter1', 'butter2', 'butter3',
'aluminium3', 'aluminium4', 'aluminium5')]
class MarkerParseError(Exception):
pass
[docs]class MarkerOneNSLCRequired(Exception):
'''
Raised when a marker with exactly one NSLC entry is required but there are
zero or more than one.
'''
pass
[docs]class Marker(object):
'''
General purpose marker GUI element and base class for
:py:class:`EventMarker` and :py:class:`PhaseMarker`.
:param nslc_ids: list of (network, station, location, channel) tuples
(may contain wildcards)
:param tmin: start time
:param tmax: end time
:param kind: (optional) integer to distinguish groups of markers
(color-coded)
'''
[docs] @staticmethod
def save_markers(markers, fn, fdigits=3):
'''
Static method to write marker objects to file.
:param markers: list of :py:class:`Marker` objects
:param fn: filename as string
:param fdigits: number of decimal digits to use for sub-second time
strings (default 3)
'''
f = open(fn, 'w')
f.write('# Snuffler Markers File Version 0.2\n')
writer = TableWriter(f)
for marker in markers:
a = marker.get_attributes(fdigits=fdigits)
w = marker.get_attribute_widths(fdigits=fdigits)
row = []
for x in a:
if x is None or x == '':
row.append('None')
else:
row.append(x)
writer.writerow(row, w)
f.close()
[docs] @staticmethod
def load_markers(fn):
'''
Static method to load markers from file.
:param filename: filename as string
:returns: list of :py:class:`Marker`, :py:class:`EventMarker` or
:py:class:`PhaseMarker` objects
'''
markers = []
with open(fn, 'r') as f:
line = f.readline()
if not line.startswith('# Snuffler Markers File Version'):
raise MarkerParseError('Not a marker file')
elif line.startswith('# Snuffler Markers File Version 0.2'):
reader = TableReader(f)
while not reader.eof:
row = reader.readrow()
if not row:
continue
if row[0] == 'event:':
marker = EventMarker.from_attributes(row)
elif row[0] == 'phase:':
marker = PhaseMarker.from_attributes(row)
else:
marker = Marker.from_attributes(row)
markers.append(marker)
else:
logger.warning('Unsupported Markers File Version')
return markers
def __init__(self, nslc_ids, tmin, tmax, kind=0):
self.set(nslc_ids, tmin, tmax)
self.alerted = False
self.selected = False
self.kind = kind
self.active = False
[docs] def set(self, nslc_ids, tmin, tmax):
'''
Set ``nslc_ids``, start time and end time of :py:class:`Marker`.
:param nslc_ids: list or set of (network, station, location, channel)
tuples
:param tmin: start time
:param tmax: end time
'''
self.nslc_ids = nslc_ids
self.tmin = util.to_time_float(tmin)
self.tmax = util.to_time_float(tmax)
[docs] def set_kind(self, kind):
'''
Set kind of :py:class:`Marker`.
:param kind: (optional) integer to distinguish groups of markers
(color-coded)
'''
self.kind = kind
[docs] def get_tmin(self):
'''
Get *start time* of :py:class:`Marker`.
'''
return self.tmin
[docs] def get_tmax(self):
'''
Get *end time* of :py:class:`Marker`.
'''
return self.tmax
[docs] def get_nslc_ids(self):
'''
Get marker's network-station-location-channel pattern.
:returns: list or set of (network, station, location, channel) tuples
The network, station, location, or channel strings may contain wildcard
expressions.
'''
return self.nslc_ids
def is_alerted(self):
return self.alerted
def is_selected(self):
return self.selected
def set_alerted(self, state):
self.alerted = state
[docs] def match_nsl(self, nsl):
'''
See documentation of :py:func:`pyrocko.util.match_nslc`.
'''
patterns = ['.'.join(x[:3]) for x in self.nslc_ids]
return util.match_nslc(patterns, nsl)
[docs] def match_nslc(self, nslc):
'''
See documentation of :py:func:`pyrocko.util.match_nslc`.
'''
patterns = ['.'.join(x) for x in self.nslc_ids]
return util.match_nslc(patterns, nslc)
[docs] def one_nslc(self):
'''
Get single NSLC pattern or raise an exception if there is not exactly
one.
If *nslc_ids* contains a single entry, return it. If more than one is
available, raise :py:exc:`MarkerOneNSLCRequired`.
'''
if len(self.nslc_ids) != 1:
raise MarkerOneNSLCRequired()
return list(self.nslc_ids)[0]
def hoover_message(self):
return ''
[docs] def copy(self):
'''
Get a copy of this marker.
'''
return copy.deepcopy(self)
def __str__(self):
traces = ','.join(['.'.join(nslc_id) for nslc_id in self.nslc_ids])
st = myctime
if self.tmin == self.tmax:
return '%s %i %s' % (st(self.tmin), self.kind, traces)
else:
return '%s %s %g %i %s' % (
st(self.tmin), st(self.tmax), self.tmax-self.tmin, self.kind,
traces)
def get_attributes(self, fdigits=3):
traces = ','.join(['.'.join(nslc_id) for nslc_id in self.nslc_ids])
def st(t):
return util.time_to_str(
t, format='%Y-%m-%d %H:%M:%S.'+'%iFRAC' % fdigits)
vals = []
vals.extend(st(self.tmin).split())
if self.tmin != self.tmax:
vals.extend(st(self.tmax).split())
vals.append(self.tmax-self.tmin)
vals.append(self.kind)
vals.append(traces)
return vals
def get_attribute_widths(self, fdigits=3):
ws = [10, 9+fdigits]
if self.tmin != self.tmax:
ws.extend([10, 9+fdigits, 12])
ws.extend([2, 15])
return ws
@staticmethod
def parse_attributes(vals):
tmin = util.str_to_time(vals[0] + ' ' + vals[1])
i = 2
tmax = tmin
if len(vals) == 7:
tmax = util.str_to_time(vals[2] + ' ' + vals[3])
i = 5
kind = int(vals[i])
traces = vals[i+1]
if traces == 'None':
nslc_ids = []
else:
nslc_ids = tuple(
[tuple(nslc_id.split('.')) for nslc_id in traces.split(',')])
return nslc_ids, tmin, tmax, kind
@staticmethod
def from_attributes(vals):
return Marker(*Marker.parse_attributes(vals))
def select_color(self, colorlist):
def cl(x):
return colorlist[(self.kind*3+x) % len(colorlist)]
if self.selected:
return cl(1)
if self.alerted:
return cl(1)
return cl(2)
def draw(
self, p, time_projection, y_projection,
draw_line=True,
draw_triangle=False,
**kwargs):
from ..qt_compat import qc, qg
from .. import util as gui_util
color = self.select_color(g_color_b)
pen = qg.QPen(qg.QColor(*color))
pen.setWidth(2)
p.setPen(pen)
umin = time_projection(self.tmin)
umax = time_projection(self.tmax)
v0, v1 = y_projection.get_out_range()
line = qc.QLineF(umin-1, v0, umax+1, v0)
p.drawLine(line)
if self.selected or self.alerted or not self.nslc_ids:
linepen = qg.QPen(pen)
if self.selected or self.alerted:
linepen.setStyle(qc.Qt.CustomDashLine)
pat = [5., 3.]
linepen.setDashPattern(pat)
if self.alerted and not self.selected:
linepen.setColor(qg.QColor(150, 150, 150))
s = 9.
utriangle = gui_util.make_QPolygonF(
[-0.577*s, 0., 0.577*s], [0., 1.*s, 0.])
ltriangle = gui_util.make_QPolygonF(
[-0.577*s, 0., 0.577*s], [0., -1.*s, 0.])
def drawline(t):
u = time_projection(t)
line = qc.QLineF(u, v0, u, v1)
p.drawLine(line)
def drawtriangles(t):
u = time_projection(t)
t = qg.QPolygonF(utriangle)
t.translate(u, v0)
p.drawConvexPolygon(t)
t = qg.QPolygonF(ltriangle)
t.translate(u, v1)
p.drawConvexPolygon(t)
if draw_line or self.selected or self.alerted:
p.setPen(linepen)
drawline(self.tmin)
drawline(self.tmax)
if draw_triangle:
pen.setStyle(qc.Qt.SolidLine)
pen.setJoinStyle(qc.Qt.MiterJoin)
pen.setWidth(2)
p.setPen(pen)
p.setBrush(qg.QColor(*color))
drawtriangles(self.tmin)
def draw_trace(
self, viewer, p, tr, time_projection, track_projection, gain,
outline_label=False):
from ..qt_compat import qc, qg
from .. import util as gui_util
if self.nslc_ids and not self.match_nslc(tr.nslc_id):
return
color = self.select_color(g_color_b)
pen = qg.QPen(qg.QColor(*color))
pen.setWidth(2)
p.setPen(pen)
p.setBrush(qc.Qt.NoBrush)
def drawpoint(t, y):
u = time_projection(t)
v = track_projection(y)
rect = qc.QRectF(u-2, v-2, 4, 4)
p.drawRect(rect)
def drawline(t):
u = time_projection(t)
v0, v1 = track_projection.get_out_range()
line = qc.QLineF(u, v0, u, v1)
p.drawLine(line)
try:
snippet = tr.chop(
self.tmin, self.tmax,
inplace=False,
include_last=True,
snap=(math.ceil, math.floor))
vdata = track_projection(gain*snippet.get_ydata())
udata_min = float(
time_projection(snippet.tmin))
udata_max = float(
time_projection(snippet.tmin+snippet.deltat*(vdata.size-1)))
udata = num.linspace(udata_min, udata_max, vdata.size)
qpoints = gui_util.make_QPolygonF(udata, vdata)
pen.setWidth(1)
p.setPen(pen)
p.drawPolyline(qpoints)
pen.setWidth(2)
p.setPen(pen)
drawpoint(*tr(self.tmin, clip=True, snap=math.ceil))
drawpoint(*tr(self.tmax, clip=True, snap=math.floor))
except trace.NoData:
pass
color = self.select_color(g_color_b)
pen = qg.QPen(qg.QColor(*color))
pen.setWidth(2)
p.setPen(pen)
drawline(self.tmin)
drawline(self.tmax)
label = self.get_label()
if label:
label_bg = qg.QBrush(qg.QColor(255, 255, 255))
u = time_projection(self.tmin)
v0, v1 = track_projection.get_out_range()
if outline_label:
du = -7
else:
du = -5
gui_util.draw_label(
p, u+du, v0, label, label_bg, 'TR',
outline=outline_label)
if self.tmin == self.tmax:
try:
drawpoint(self.tmin, tr.interpolate(self.tmin))
except IndexError:
pass
def get_label(self):
return None
def convert_to_phase_marker(
self,
event=None,
phasename=None,
polarity=None,
automatic=None,
incidence_angle=None,
takeoff_angle=None):
if isinstance(self, PhaseMarker):
return
self.__class__ = PhaseMarker
self._event = event
self._phasename = phasename
self._polarity = polarity
self._automatic = automatic
self._incidence_angle = incidence_angle
self._takeoff_angle = takeoff_angle
if self._event:
self._event_hash = event.get_hash()
self._event_time = event.time
else:
self._event_hash = None
self._event_time = None
self.active = False
def convert_to_event_marker(self, lat=0., lon=0.):
if isinstance(self, EventMarker):
return
if isinstance(self, PhaseMarker):
self.convert_to_marker()
self.__class__ = EventMarker
self._event = model.Event(lat, lon, time=self.tmin, name='Event')
self._event_hash = self._event.get_hash()
self.active = False
self.tmax = self.tmin
self.nslc_ids = []
[docs]class EventMarker(Marker):
'''
GUI element representing a seismological event.
:param event: A :py:class:`~pyrocko.model.event.Event` object containing
meta information of a seismological event
:param kind: (optional) integer to distinguish groups of markers
:param event_hash: (optional) hash code of event (see:
:py:meth:`~pyrocko.model.event.Event.get_hash`)
'''
def __init__(self, event, kind=0, event_hash=None):
Marker.__init__(self, [], event.time, event.time, kind)
self._event = event
self.active = False
self._event_hash = event_hash
def get_event_hash(self):
if self._event_hash is not None:
return self._event_hash
else:
return self._event.get_hash()
def label(self):
t = []
mag = self._event.magnitude
if mag is not None:
t.append('M%3.1f' % mag)
reg = self._event.region
if reg is not None:
t.append(reg)
nam = self._event.name
if nam is not None:
t.append(nam)
s = ' '.join(t)
if not s:
s = '(Event)'
return s
def draw(self, p, time_projection, y_projection, with_label=False):
Marker.draw(
self, p, time_projection, y_projection,
draw_line=False,
draw_triangle=True)
if with_label:
self.draw_label(p, time_projection, y_projection)
def draw_label(self, p, time_projection, y_projection):
from ..qt_compat import qg
from .. import util as gui_util
u = time_projection(self.tmin)
v0, v1 = y_projection.get_out_range()
label_bg = qg.QBrush(qg.QColor(255, 255, 255))
gui_util.draw_label(
p, u, v0-10., self.label(), label_bg, 'CB',
outline=self.active)
[docs] def get_event(self):
'''
Return an instance of the :py:class:`~pyrocko.model.event.Event`
associated to this :py:class:`EventMarker`
'''
return self._event
def draw_trace(self, viewer, p, tr, time_projection, track_projection,
gain):
pass
def hoover_message(self):
ev = self.get_event()
evs = []
for k in 'magnitude lat lon depth name region catalog'.split():
if ev.__dict__[k] is not None and ev.__dict__[k] != '':
if k == 'depth':
sv = '%g km' % (ev.depth * 0.001)
else:
sv = '%s' % ev.__dict__[k]
evs.append('%s = %s' % (k, sv))
return ', '.join(evs)
def get_attributes(self, fdigits=3):
attributes = ['event:']
attributes.extend(Marker.get_attributes(self, fdigits=fdigits))
del attributes[-1]
e = self._event
attributes.extend([
e.get_hash(), e.lat, e.lon, e.depth, e.magnitude, e.catalog,
e.name, e.region])
return attributes
def get_attribute_widths(self, fdigits=3):
ws = [6]
ws.extend(Marker.get_attribute_widths(self, fdigits=fdigits))
del ws[-1]
ws.extend([14, 12, 12, 12, 4, 5, 0, 0])
return ws
@staticmethod
def from_attributes(vals):
nslc_ids, tmin, tmax, kind = Marker.parse_attributes(
vals[1:] + ['None'])
lat, lon, depth, magnitude = [
str_to_float_or_none(x) for x in vals[5:9]]
catalog, name, region = [
str_to_str_or_none(x) for x in vals[9:]]
e = model.Event(
lat, lon, time=tmin, name=name, depth=depth, magnitude=magnitude,
region=region, catalog=catalog)
marker = EventMarker(
e, kind, event_hash=str_to_str_or_none(vals[4]))
return marker
[docs]class PhaseMarker(Marker):
'''
A PhaseMarker is a GUI-element representing a seismological phase arrival
:param nslc_ids: list of (network, station, location, channel) tuples (may
contain wildcards)
:param tmin: start time
:param tmax: end time
:param kind: (optional) integer to distinguish groups of markers
(color-coded)
:param event: a :py:class:`~pyrocko.model.event.Event` object containing
meta information of a seismological event
:param event_hash: (optional) hash code of event (see:
:py:meth:`~pyrocko.model.event.Event.get_hash`)
:param event_time: (optional) time of the associated event
:param phasename: (optional) name of the phase associated with the marker
:param polarity: (optional) polarity of arriving phase
:param automatic: (optional)
:param incident_angle: (optional) incident angle of phase
:param takeoff_angle: (optional) take off angle of phase
'''
def __init__(
self, nslc_ids, tmin, tmax,
kind=0,
event=None,
event_hash=None,
event_time=None,
phasename=None,
polarity=None,
automatic=None,
incidence_angle=None,
takeoff_angle=None):
Marker.__init__(self, nslc_ids, tmin, tmax, kind)
self._event = event
self._event_hash = event_hash
self._event_time = event_time
self._phasename = phasename
self._automatic = automatic
self._incidence_angle = incidence_angle
self._takeoff_angle = takeoff_angle
self.set_polarity(polarity)
def draw_trace(self, viewer, p, tr, time_projection, track_projection,
gain):
Marker.draw_trace(
self, viewer, p, tr, time_projection, track_projection, gain,
outline_label=(
self._event is not None and
self._event == viewer.get_active_event()))
def get_label(self):
t = []
if self._phasename is not None:
t.append(self._phasename)
if self._polarity is not None:
t.append(self.get_polarity_symbol())
if self._automatic:
t.append('@')
return ''.join(t)
[docs] def get_event(self):
'''
Return an instance of the :py:class:`~pyrocko.model.event.Event`
associated to this :py:class:`EventMarker`
'''
return self._event
def get_event_hash(self):
if self._event_hash is not None:
return self._event_hash
else:
if self._event is None:
return None
else:
return self._event.get_hash()
def get_event_time(self):
if self._event is not None:
return self._event.time
else:
return self._event_time
def set_event_hash(self, event_hash):
self._event_hash = event_hash
def set_event(self, event):
self._event = event
if event is not None:
self.set_event_hash(event.get_hash())
def get_phasename(self):
return self._phasename
def set_phasename(self, phasename):
self._phasename = phasename
def set_polarity(self, polarity):
if polarity not in [1, -1, 0, None]:
raise ValueError('polarity has to be 1, -1, 0 or None')
self._polarity = polarity
def get_polarity_symbol(self):
return polarity_symbols.get(self._polarity, '')
def get_polarity(self):
return self._polarity
def convert_to_marker(self):
del self._event
del self._event_hash
del self._phasename
del self._polarity
del self._automatic
del self._incidence_angle
del self._takeoff_angle
self.active = False
self.__class__ = Marker
def hoover_message(self):
toks = []
for k in 'incidence_angle takeoff_angle polarity'.split():
v = getattr(self, '_' + k)
if v is not None:
toks.append('%s = %s' % (k, v))
return ', '.join(toks)
def get_attributes(self, fdigits=3):
attributes = ['phase:']
attributes.extend(Marker.get_attributes(self, fdigits=fdigits))
et = None, None
if self._event:
et = self._st(self._event.time, fdigits).split()
elif self._event_time:
et = self._st(self._event_time, fdigits).split()
attributes.extend([
self.get_event_hash(), et[0], et[1], self._phasename,
self._polarity, self._automatic])
return attributes
def _st(self, t, fdigits):
return util.time_to_str(
t, format='%Y-%m-%d %H:%M:%S.'+'%iFRAC' % fdigits)
def get_attribute_widths(self, fdigits=3):
ws = [6]
ws.extend(Marker.get_attribute_widths(self, fdigits=fdigits))
ws.extend([14, 12, 12, 8, 4, 5])
return ws
@staticmethod
def from_attributes(vals):
if len(vals) == 14:
nbasicvals = 7
else:
nbasicvals = 4
nslc_ids, tmin, tmax, kind = Marker.parse_attributes(
vals[1:1+nbasicvals])
i = 8
if len(vals) == 14:
i = 11
event_hash = str_to_str_or_none(vals[i-3])
event_sdate = str_to_str_or_none(vals[i-2])
event_stime = str_to_str_or_none(vals[i-1])
if event_sdate is not None and event_stime is not None:
event_time = util.str_to_time(event_sdate + ' ' + event_stime)
else:
event_time = None
phasename = str_to_str_or_none(vals[i])
polarity = str_to_int_or_none(vals[i+1])
automatic = str_to_bool(vals[i+2])
marker = PhaseMarker(nslc_ids, tmin, tmax, kind, event=None,
event_hash=event_hash, event_time=event_time,
phasename=phasename, polarity=polarity,
automatic=automatic)
return marker
[docs]def load_markers(filename):
'''
Load markers from file.
:param filename: filename as string
:returns: list of :py:class:`Marker` Objects
'''
return Marker.load_markers(filename)
[docs]def save_markers(markers, filename, fdigits=3):
'''
Save markers to file.
:param markers: list of :py:class:`Marker` Objects
:param filename: filename as string
:param fdigits: number of decimal digits to use for sub-second time strings
'''
return Marker.save_markers(markers, filename, fdigits=fdigits)
[docs]def associate_phases_to_events(markers):
'''
Reassociate phases to events after import from markers file.
'''
hash_to_events = {}
time_to_events = {}
for marker in markers:
if isinstance(marker, EventMarker):
ev = marker.get_event()
hash_to_events[marker.get_event_hash()] = ev
time_to_events[ev.time] = ev
for marker in markers:
if isinstance(marker, PhaseMarker):
h = marker.get_event_hash()
t = marker.get_event_time()
if marker.get_event() is None:
if h is not None and h in hash_to_events:
marker.set_event(hash_to_events[h])
marker.set_event_hash(None)
elif t is not None and t in time_to_events:
marker.set_event(time_to_events[t])
marker.set_event_hash(None)