Source code for pyrocko.gui.sparrow.elements.source

# https://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------

import string

import numpy as num

from pyrocko.guts import Bool, Float, Object, String

from pyrocko import cake, geometry, gf
from pyrocko.gui.qt_compat import qc, qw
from pyrocko.gui.talkie import TalkieRoot

from pyrocko.gui.vtk_util import \
    ArrowPipe, ColorbarPipe, PolygonPipe, ScatterPipe, OutlinesPipe

from .. import state as vstate
from .. import common
from . import base

guts_prefix = 'sparrow'


d2r = num.pi / 180.


map_anchor = {
    'center': (0.0, 0.0),
    'center_left': (-1.0, 0.0),
    'center_right': (1.0, 0.0),
    'top': (0.0, -1.0),
    'top_left': (-1.0, -1.0),
    'top_right': (1.0, -1.0),
    'bottom': (0.0, 1.0),
    'bottom_left': (-1.0, 1.0),
    'bottom_right': (1.0, 1.0)}


[docs]class ProxySource(TalkieRoot): pass
for source_cls in [gf.RectangularSource]: cls_name = 'Proxy' + source_cls.__name__
[docs] class proxy_source_cls(ProxySource): class_name = cls_name def __init__(self, **kwargs): ProxySource.__init__(self) for key, value in self._ranges.items(): setattr(self, key, value['ini']) if kwargs is not None: for it in kwargs.items(): setattr(self, it[0], it[1])
proxy_source_cls.__name__ = cls_name vars()[cls_name] = proxy_source_cls for prop in source_cls.T.properties: proxy_source_cls.T.add_property(prop.name, prop) ProxyRectangularSource = vars()['ProxyRectangularSource'] # silence flake8 ProxyRectangularSource._name = 'RectangularSource' ProxyRectangularSource._ranges = { 'lat': {'min': -90., 'max': 90., 'step': 1, 'ini': 0.}, 'lon': {'min': -180., 'max': 180., 'step': 1, 'ini': 0.}, 'depth': {'min': 0., 'max': 600000., 'step': 1000, 'ini': 10000.}, 'width': {'min': 0.1, 'max': 500000., 'step': 1000, 'ini': 10000.}, 'length': {'min': 0.1, 'max': 1000000., 'step': 1000, 'ini': 50000.}, 'strike': {'min': -180., 'max': 180., 'step': 1, 'ini': 0.}, 'dip': {'min': 0., 'max': 90., 'step': 1, 'ini': 45.}, 'rake': {'min': -180., 'max': 180., 'step': 1, 'ini': 0.}, 'nucleation_x': {'min': -100., 'max': 100., 'step': 1, 'ini': 0., 'fac': .01}, 'nucleation_y': {'min': -100., 'max': 100., 'step': 1, 'ini': 0., 'fac': .01}, 'slip': {'min': 0., 'max': 1000., 'step': 1, 'ini': 1., 'fac': .01}}
[docs]class ProxyConfig(Object): deltas = num.array([1000., 1000.]) deltat = Float.T(default=0.5) rho = Float.T(default=2800) vs = Float.T(default=3600) def get_shear_moduli(self, *args, **kwargs): points = kwargs.get('points') return num.ones(len(points)) * num.power(self.vs, 2) * self.rho
[docs]class ProxyStore(Object): def __init__(self, **kwargs): config = ProxyConfig() if kwargs: config.deltas = kwargs.get('deltas', config.deltas) config.deltat = kwargs.get('deltat', config.deltat) config.rho = kwargs.get('rho', config.rho) config.vs = kwargs.get('vs', config.vs) self.config = config self.mode = String.T(default='r') self._f_data = None self._f_index = None
parameter_label = { 'time (s)': 'times', 'slip (m)': 'slip', 'moment (Nm)': 'moment' } parameter_geometry = { 'time (s)': 't_arrival', 'slip (m)': 'slip', 'moment (Nm)': 'moment' } unit_label = { 'lat': '(deg)', 'lon': '(deg)', 'depth': '(m)', 'strike': '(deg)', 'dip': '(deg)', 'rake': '(deg)', 'length': '(m)', 'width': '(m)', 'slip': '(m)' }
[docs]class SourceState(base.ElementState): visible = Bool.T(default=True) source_selection = ProxySource.T(default=ProxyRectangularSource()) # noqa deltat = Float.T(default=0.5) display_parameter = String.T(default='time (s)') cpt = base.CPTState.T(default=base.CPTState.D()) @classmethod def get_name(self): return 'Rectangular Source' def create(self): element = SourceElement() return element
class SourceElement(base.Element): def __init__(self): base.Element.__init__(self) self._parent = None self._pipe = [] self._controls = None self._points = num.array([]) self.cpt_handler = base.CPTHandler() def _state_bind_source(self, *args, **kwargs): vstate.state_bind(self, self._state.source_selection, *args, **kwargs) def _state_bind_store(self, *args, **kwargs): vstate.state_bind(self, self._state, *args, **kwargs) def bind_state(self, state): base.Element.bind_state(self, state) self.talkie_connect( state, ['visible', 'source_selection', 'deltat', 'display_parameter'], self.update) self.cpt_handler.bind_state(state.cpt, self.update) def unbind_state(self): self.cpt_handler.unbind_state() base.Element.unbind_state(self) def get_name(self): return 'Rectangular Source' def set_parent(self, parent): self._parent = parent self._parent.add_panel( self.get_title_label(), self._get_controls(), visible=True, title_controls=[ self.get_title_control_remove(), self.get_title_control_visible()]) self.update() def unset_parent(self): self.unbind_state() if self._parent: if self._pipe: for pipe in self._pipe: if isinstance(pipe.actor, list): for act in pipe.actor: self._parent.remove_actor(act) else: self._parent.remove_actor(pipe.actor) self._pipe = [] if self._controls: self._parent.remove_panel(self._controls) self._controls = None self._parent.update_view() self._parent = None def open_file_load_dialog(self): caption = 'Select one file to open' fns, _ = qw.QFileDialog.getOpenFileNames( self._parent, caption, options=common.qfiledialog_options) if fns: try: self.load_source_file(str(fns[0])) except FileNotFoundError as e: raise e else: return def load_source_file(self, path): loaded_source = gf.load(filename=path) source = ProxyRectangularSource( **{prop: getattr(loaded_source, prop) for prop in loaded_source.T.propnames if getattr(loaded_source, prop)}) self._parent.remove_panel(self._controls) self._controls = None self._state.source_selection = source self._parent.add_panel( self.get_title_label(), self._get_controls(), visible=True, title_controls=[ self.get_title_control_remove(), self.get_title_control_visible()]) self.update() def open_file_save_dialog(self, fn=None): caption = 'Choose a file name to write source' if not fn: fn, _ = qw.QFileDialog.getSaveFileName( self._parent, caption, options=common.qfiledialog_options) if fn: self.save_file(str(fn)) def save_file(self, path): source = self._state.source_selection source2dump = gf.RectangularSource( **{prop: getattr(source, prop) for prop in source.T.propnames}) if path.split('.')[-1].lower() in ['xml']: source2dump.dump_xml(filename=path) else: source2dump.dump(filename=path) def update_loc(self, *args): pstate = self._parent.state state = self._state source = state.source_selection source.lat = pstate.lat source.lon = pstate.lon self._state.source_selection.source = source self.update() def update_source(self, store): state = self._state source = state.source_selection source_list = gf.source_classes for i, a in enumerate(source_list): if a.__name__ is source._name: fault = a( **{prop: source.__dict__[prop] for prop in source.T.propnames}) source_geom = fault.geometry(store) self._update_outlines(source_geom) self._update_scatter(source, fault) self._update_raster(source_geom, state.display_parameter) self._update_rake_arrow(fault) def _update_outlines(self, source_geom): if source_geom.outlines: self._pipe.append(OutlinesPipe( source_geom, color=(1., 1., 1.), cs='latlondepth')) self._parent.add_actor( self._pipe[-1].actor) self._pipe.append(OutlinesPipe( source_geom, color=(0.6, 0.6, 0.6), cs='latlon')) self._parent.add_actor( self._pipe[-1].actor) def _update_scatter(self, source, fault): for point, color in zip( ((source.nucleation_x, source.nucleation_y), map_anchor[source.anchor]), (num.array([[1., 0., 0.]]), num.array([[0., 0., 1.]]))): points = geometry.latlondepth2xyz( fault.xy_to_coord( x=[point[0]], y=[point[1]], cs='latlondepth'), planetradius=cake.earthradius) vertices = geometry.arr_vertices(points) p = ScatterPipe(vertices) p.set_symbol('sphere') p.set_colors(color) self._pipe.append(p) self._parent.add_actor(p.actor) def _update_raster(self, source_geom, param): vertices = geometry.arr_vertices( source_geom.get_vertices(col='xyz')) faces = source_geom.get_faces() if param not in parameter_label: raise NameError('No parameter label given for %s', param) if not source_geom.has_property(parameter_geometry[param]): raise AttributeError( 'No property within source geometry called %s', parameter_geometry[param]) self.cpt_handler._values = source_geom.get_property( parameter_geometry[param]) cbar_title = parameter_label[param] self.cpt_handler.update_cpt() poly_pipe = PolygonPipe( vertices, faces, values=self.cpt_handler._values, lut=self.cpt_handler._lookuptable) if not source_geom.has_property(parameter_geometry[param]): raise AttributeError( 'No property within source geometry called %s', parameter_geometry[param]) tmin = self._parent.state.tmin_effective tmax = self._parent.state.tmax_effective times = source_geom.get_property(parameter_geometry[param]) times += self._state.source_selection.time if tmin is not None: m1 = times < tmin else: m1 = num.zeros(times.size, dtype=bool) if tmax is not None: m3 = tmax < times else: m3 = num.zeros(times.size, dtype=bool) m2 = num.logical_not(num.logical_or(m1, m3)) if not any(m2): poly_pipe.set_alpha(0.) # print(m2.astype(num.float_)) # poly_pipe.set_alpha(m2.copy().astype(num.float_)) # print(self.cpt_handler._lookuptable.GetRange()) self._pipe.append(poly_pipe) self._parent.add_actor(self._pipe[-1].actor) if cbar_title is not None: cbar_pipe = ColorbarPipe( parent_pipe=poly_pipe, cbar_title=cbar_title, lut=self.cpt_handler._lookuptable) self._pipe.append(cbar_pipe) self._parent.add_actor(self._pipe[-1].actor) def _update_rake_arrow(self, fault): source = self._state.source_selection rake = source.rake * d2r nucl_x = source.nucleation_x nucl_y = source.nucleation_y wd_ln = source.width / source.length endpoint = [None] * 2 endpoint[0] = nucl_x + num.cos(rake) * wd_ln endpoint[1] = nucl_y + num.sin(-rake) points = geometry.latlondepth2xyz( fault.xy_to_coord( x=[nucl_x, endpoint[0]], y=[nucl_y, endpoint[1]], cs='latlondepth'), planetradius=cake.earthradius) vertices = geometry.arr_vertices(points) self._pipe.append(ArrowPipe(vertices[0], vertices[1])) self._parent.add_actor(self._pipe[-1].actor) def update(self, *args): state = self._state store = ProxyStore( deltat=state.deltat) store.config.deltas = num.array( [(store.config.deltat * store.config.vs) + 1] * 2) if self._pipe: for pipe in self._pipe: self._parent.remove_actor(pipe.actor) self._pipe = [] if state.visible: self.update_source(store) self._parent.update_view() def _get_controls(self): if not self._controls: from ..state import \ state_bind_slider, state_bind_combobox from pyrocko import gf source = self._state.source_selection frame = qw.QFrame() layout = qw.QGridLayout() frame.setLayout(layout) def state_to_lineedit(state, attribute, widget): sel = getattr(state, attribute) widget.setText('%g' % sel) def lineedit_to_state(widget, state, attribute): s = float(widget.text()) try: setattr(state, attribute, s) except Exception: raise ValueError( 'Value of %s needs to be a float or integer' % string.capwords(attribute)) il = 0 # Origin time controls layout.addWidget(qw.QLabel('Origin time'), il, 0) le_time = qw.QLineEdit() layout.addWidget(le_time, il, 1, 1, 2) self._state_bind_source( ['time'], common.lineedit_to_time, le_time, [le_time.editingFinished, le_time.returnPressed], common.time_to_lineedit, attribute='time') for var in ['tmin', 'tmax', 'tduration', 'tposition']: self.talkie_connect( self._parent.state, var, self.update) # Source property controls for il, label in enumerate(source.T.propnames, start=il+1): if label in source._ranges.keys(): unit = unit_label[label] if label in unit_label else '' layout.addWidget(qw.QLabel( f'{string.capwords(label)} {unit}'), il, 0) slider = qw.QSlider(qc.Qt.Horizontal) slider.setSizePolicy( qw.QSizePolicy( qw.QSizePolicy.Expanding, qw.QSizePolicy.Fixed)) slider.setMinimum( int(round(source._ranges[label]['min']))) slider.setMaximum( int(round(source._ranges[label]['max']))) slider.setSingleStep( int(round(source._ranges[label]['step']))) slider.setPageStep( int(round(source._ranges[label]['step']))) layout.addWidget(slider, il, 1) try: state_bind_slider( self, self._state.source_selection, label, slider, factor=source._ranges[label]['fac']) except Exception: state_bind_slider( self, self._state.source_selection, label, slider) le = qw.QLineEdit() layout.addWidget(le, il, 2) self._state_bind_source( [label], lineedit_to_state, le, [le.editingFinished, le.returnPressed], state_to_lineedit, attribute=label) for label, name in zip( ['Sampling int. (s)'], ['deltat']): il += 1 layout.addWidget(qw.QLabel(label), il, 0) slider = qw.QSlider(qc.Qt.Horizontal) slider.setSizePolicy( qw.QSizePolicy( qw.QSizePolicy.Expanding, qw.QSizePolicy.Fixed)) slider.setMinimum(1) slider.setMaximum(1000) slider.setSingleStep(1) slider.setPageStep(1) layout.addWidget(slider, il, 1) state_bind_slider( self, self._state, name, slider, factor=0.01) le = qw.QLineEdit() layout.addWidget(le, il, 2) self._state_bind_store( [name], lineedit_to_state, le, [le.editingFinished, le.returnPressed], state_to_lineedit, attribute=name) il += 1 layout.addWidget(qw.QLabel('Anchor'), il, 0) cb = qw.QComboBox() for i, s in enumerate(gf.RectangularSource.anchor.choices): cb.insertItem(i, s) layout.addWidget(cb, il, 1, 1, 2) state_bind_combobox( self, self._state.source_selection, 'anchor', cb) il += 1 layout.addWidget(qw.QLabel('Display Param.'), il, 0) cb = qw.QComboBox() for i, s in enumerate(parameter_label.keys()): cb.insertItem(i, s) layout.addWidget(cb, il, 1) state_bind_combobox( self, self._state, 'display_parameter', cb) self.cpt_handler.cpt_controls( self._parent, self._state.cpt, layout) il = layout.rowCount() + 1 pb = qw.QPushButton('Move Here') layout.addWidget(pb, il, 0) pb.clicked.connect(self.update_loc) pb = qw.QPushButton('Load') layout.addWidget(pb, il, 1) pb.clicked.connect(self.open_file_load_dialog) pb = qw.QPushButton('Save') layout.addWidget(pb, il, 2) pb.clicked.connect(self.open_file_save_dialog) il += 1 layout.addWidget(qw.QFrame(), il, 0, 1, 3) self._controls = frame self.cpt_handler._update_cpt_combobox() self.cpt_handler._update_cptscale_lineedit() return self._controls __all__ = [ 'SourceElement', 'SourceState', ]