Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/gui/talkie.py: 85%
349 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 15:01 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 15:01 +0000
1# https://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6import difflib
7from collections import defaultdict
9from pyrocko import util
10from pyrocko.guts import Object, List, clone, path_to_str
11from weakref import ref
14class listdict(dict):
15 def __missing__(self, k):
16 self[k] = []
17 return self[k]
20def root_and_path(obj, name=None):
21 root = obj
22 path = []
23 if name is not None:
24 path.append(name)
26 while True:
27 try:
28 root, name_at_parent = root._talkie_parent
29 path.append(name_at_parent)
30 except AttributeError:
31 break
33 return root, '.'.join(path[::-1])
36def lclone(xs):
37 return [clone(x) for x in xs]
40g_uid = 0
43def new_uid():
44 global g_uid
45 g_uid += 1
46 return '#%i' % g_uid
49def has_computed(cls):
50 cls._computed = defaultdict(list)
51 cls._computed_rev = defaultdict(list)
53 for prop_name in dir(cls):
54 try:
55 prop = getattr(cls, prop_name)
56 depends_on = prop.fget._computed_depends_on
57 for name in depends_on:
58 cls._computed_rev[name].append(prop_name)
59 cls._computed[prop_name].append(name)
61 except AttributeError:
62 pass
64 return cls
67def computed(depends_on):
68 def wrapper(func):
69 func._computed_depends_on = depends_on
70 return property(func)
72 return wrapper
75class Talkie(Object):
76 _computed = ()
77 _computed_rev = None
79 def __setattr__(self, name, value):
80 try:
81 t = self.T.get_property(name)
82 except ValueError:
83 Object.__setattr__(self, name, value)
84 return
86 if isinstance(t, List.T):
87 value = TalkieList(value)
89 oldvalue = getattr(self, name, None)
90 if oldvalue:
91 if isinstance(oldvalue, (Talkie, TalkieList)):
92 oldvalue.unset_parent()
94 if isinstance(value, (Talkie, TalkieList)):
95 value.set_parent(self, name)
97 Object.__setattr__(self, name, value)
98 self.fire([name], value)
99 if self._computed_rev is not None:
100 for dname in self._computed_rev[name]:
101 self.fire([dname], None)
103 def fire(self, path, value):
104 self.fire_event(path, value)
105 if hasattr(self, '_talkie_parent'):
106 root, name_at_parent = self._talkie_parent
107 path.append(name_at_parent)
108 root.fire(path, value)
110 def set_parent(self, parent, name):
111 Object.__setattr__(self, '_talkie_parent', (parent, name))
113 def unset_parent(self):
114 Object.__delattr__(self, '_talkie_parent')
116 def fire_event(self, path, value):
117 pass
119 def diff(self, other, path=()):
120 assert type(self) is type(other), '%s %s' % (type(self), type(other))
122 for (s_prop, s_val), (o_prop, o_val) in zip(
123 self.T.ipropvals(self), other.T.ipropvals(other)):
125 if not s_prop.multivalued:
126 if isinstance(s_val, Talkie) \
127 and type(s_val) is type(o_val):
129 for x in s_val.diff(o_val, path + (s_prop.name,)):
130 yield x
131 else:
132 if not equal(s_val, o_val):
133 yield 'set', path + (s_prop.name,), clone(o_val)
134 else:
135 if issubclass(s_prop.content_t._cls, Talkie):
136 sm = difflib.SequenceMatcher(
137 None,
138 type_eq_proxy_seq(s_val),
139 type_eq_proxy_seq(o_val))
140 mode = 1
142 else:
143 sm = difflib.SequenceMatcher(
144 None,
145 eq_proxy_seq(s_val),
146 eq_proxy_seq(o_val))
147 mode = 2
149 for tag, i1, i2, j1, j2 in list(sm.get_opcodes()):
150 if tag == 'equal' and mode == 1:
151 for koff, (s_element, o_element) in enumerate(zip(
152 s_val[i1:i2], o_val[j1:j2])):
154 for x in s_element.diff(
155 o_element,
156 path + ((s_prop.name, i1+koff),)):
157 yield x
159 if tag == 'replace':
160 yield (
161 'replace',
162 path + ((s_prop.name, i1, i2),),
163 lclone(o_val[j1:j2]))
165 elif tag == 'delete':
166 yield (
167 'delete',
168 path + ((s_prop.name, i1, i2),),
169 None)
171 elif tag == 'insert':
172 yield (
173 'insert',
174 path + ((s_prop.name, i1, i2),),
175 lclone(o_val[j1:j2]))
177 def str_diff(self, other):
178 lines = []
179 for tag, path, value in self.diff(other):
180 lines.append('%s %s' % (tag, path_to_str(path)))
182 return '\n'.join(lines)
184 def diff_update(self, other, path=()):
185 assert type(self) is type(other), '%s %s' % (type(self), type(other))
187 for (s_prop, s_val), (o_prop, o_val) in zip(
188 self.T.ipropvals(self), other.T.ipropvals(other)):
190 if not s_prop.multivalued:
191 if isinstance(s_val, Talkie) \
192 and type(s_val) is type(o_val):
194 s_val.diff_update(o_val, path + (s_prop.name,))
195 else:
196 if not equal(s_val, o_val):
197 setattr(self, s_prop.name, clone(o_val))
198 else:
199 if issubclass(s_prop.content_t._cls, Talkie):
200 sm = difflib.SequenceMatcher(
201 None,
202 type_eq_proxy_seq(s_val),
203 type_eq_proxy_seq(o_val))
204 mode = 1
206 else:
207 sm = difflib.SequenceMatcher(
208 None,
209 eq_proxy_seq(s_val),
210 eq_proxy_seq(o_val))
211 mode = 2
213 ioff = 0
214 for tag, i1, i2, j1, j2 in list(sm.get_opcodes()):
215 if tag == 'equal' and mode == 1:
216 for koff, (s_element, o_element) in enumerate(zip(
217 s_val[i1+ioff:i2+ioff], o_val[j1:j2])):
219 s_element.diff_update(
220 o_element,
221 path + ((s_prop.name, i1+ioff+koff),))
223 elif tag == 'replace':
224 for _ in range(i1, i2):
225 s_val.pop(i1+ioff)
227 for j in range(j1, j2):
228 s_val.insert(i1+ioff, clone(o_val[j]))
230 ioff += (j2 - j1) - (i2 - i1)
232 elif tag == 'delete':
233 for _ in range(i1, i2):
234 s_val.pop(i1 + ioff)
236 ioff -= (i2 - i1)
238 elif tag == 'insert':
239 for j in range(j1, j2):
240 s_val.insert(i1+ioff, clone(o_val[j]))
242 ioff += (j2 - j1)
245def equal(a, b):
246 return str(a) == str(b) # to be replaced by recursive guts.equal
249def ghash(obj):
250 return hash(str(obj))
253class GutsEqProxy(object):
254 def __init__(self, obj):
255 self._obj = obj
257 def __eq__(self, other):
258 return equal(self._obj, other._obj)
260 def __hash__(self):
261 return ghash(self._obj)
264class TypeEqProxy(object):
265 def __init__(self, obj):
266 self._obj = obj
268 def __eq__(self, other):
269 return type(self._obj) is type(other._obj) # noqa
271 def __hash__(self):
272 return hash(type(self._obj))
275def eq_proxy_seq(seq):
276 return list(GutsEqProxy(x) for x in seq)
279def type_eq_proxy_seq(seq):
280 return list(TypeEqProxy(x) for x in seq)
283class TalkieConnection(object):
284 def __init__(self, talkie_root, path, listener):
285 self._talkie_root = talkie_root
286 self._listener = listener
287 self._path = path
288 self._ref_listener = ref(listener)
290 def release(self):
291 self._talkie_root.disconnect(self)
294class TalkieRoot(Talkie):
296 def __init__(self, **kwargs):
297 self._listeners = listdict()
298 Talkie.__init__(self, **kwargs)
300 def talkie_connect(self, path, listener):
301 connection = TalkieConnection(self, path, listener)
302 self._listeners[path].append(connection._ref_listener)
303 return connection
305 def talkie_disconnect(self, connection):
306 try:
307 self._listeners[connection._path].remove(
308 connection._ref_listener)
309 except ValueError:
310 pass
312 def fire_event(self, path, value):
313 path = '.'.join(path[::-1])
314 # print('fire_event:', path, value)
315 parts = path.split('.')
316 for i in range(len(parts)+1):
317 subpath = '.'.join(parts[:i])
318 target_refs = self._listeners[subpath]
319 delete = []
320 for target_ref in target_refs:
321 target = target_ref()
322 if target:
323 target(path, value)
324 else:
325 delete.append(target_ref)
327 for target_ref in delete:
328 target_refs.remove(target_ref)
330 def get(self, path):
331 x = self
332 for s in path.split('.'):
333 x = getattr(x, s)
335 return x
337 def set(self, path, value):
338 x = self
339 p = path.split('.')
340 for s in p[:-1]:
341 x = getattr(x, s)
343 setattr(x, p[-1], value)
346class TalkieList(list):
348 def fire(self, path, value):
349 if self._talkie_parent:
350 root, name_at_parent = self._talkie_parent
351 path.append(name_at_parent)
352 root.fire(path, value)
354 def set_parent(self, parent, name):
355 list.__setattr__(self, '_talkie_parent', (parent, name))
357 def unset_parent(self):
358 list.__delattr__(self, '_talkie_parent')
360 def append(self, element):
361 retval = list.append(self, element)
362 name = new_uid()
363 if isinstance(element, (Talkie, TalkieList)):
364 element.set_parent(self, name)
366 self.fire([], self)
367 return retval
369 def insert(self, index, element):
370 retval = list.insert(self, index, element)
371 name = new_uid()
372 if isinstance(element, (Talkie, TalkieList)):
373 element.set_parent(self, name)
375 self.fire([], self)
376 return retval
378 def remove(self, element):
379 list.remove(self, element)
380 if isinstance(element, (Talkie, TalkieList)):
381 element.unset_parent()
383 self.fire([], self)
385 def pop(self, index=-1):
386 element = list.pop(self, index)
387 if isinstance(element, (Talkie, TalkieList)):
388 element.unset_parent()
390 self.fire([], self)
391 return element
393 def extend(self, elements):
394 for element in elements:
395 self.append(element)
397 self.fire([], self)
399 def __setitem__(self, key, value):
400 try:
401 element = self[key]
402 if isinstance(element, (Talkie, TalkieList)):
403 element.unset_parent()
405 except IndexError:
406 pass
408 list.__setitem__(self, key, value)
409 self.fire([], self)
411 def __setslice__(self, *args, **kwargs):
412 raise Exception('not implemented')
414 def __iadd__(self, *args, **kwargs):
415 raise Exception('not implemented')
417 def __imul__(self, *args, **kwargs):
418 raise Exception('not implemented')
421for method_name in ['reverse', 'sort']:
423 def x():
424 list_meth = getattr(list, method_name)
426 def meth(self, *args, **kwargs):
427 retval = list_meth(self, *args, **kwargs)
428 self.fire([], self)
429 return retval
431 return meth
433 try:
434 setattr(TalkieList, method_name, x())
435 except AttributeError:
436 pass
439def drop_args_wrapper(f):
440 def f_drop_args(*args):
441 f()
443 return f_drop_args
446class TalkieConnectionOwner(object):
447 def __init__(self):
448 self._connections = []
450 def talkie_connect(self, state, path, listener, drop_args=False):
451 if drop_args:
452 listener = drop_args_wrapper(listener)
454 if not isinstance(path, str):
455 return [
456 self.talkie_connect(state, path_, listener, False)
457 for path_ in path]
459 connection = state.talkie_connect(path, listener)
460 self._connections.append(connection)
461 return connection
463 def talkie_disconnect_all(self):
464 while self._connections:
465 try:
466 self._connections.pop().release()
467 except Exception:
468 pass
471def none_or(f):
472 def g(x):
473 if x is None:
474 return ''
475 else:
476 return f(x)
478 return g
481def noop(x):
482 return x
485class TalkieStringer:
486 def __init__(self, root):
487 self._root = root
488 self._formatters = {
489 'date': none_or(lambda v: util.time_to_str(v, format='%Y-%m-%d')),
490 'datetime': none_or(lambda v: util.time_to_str(v))}
492 self._paths = []
494 def __getitem__(self, key):
496 stringer = self
498 class Proxy:
499 def __init__(self, val, formatter, path, talkie_root):
500 self._formatter = formatter
501 self._val = val
502 self._path = path
503 self._talkie_root = talkie_root
505 def register_path(self):
506 stringer._paths.append(
507 (self._talkie_root, '.'.join(self._path)))
509 def _computed(self):
510 return getattr(self._val, '_computed', ())
512 def __getattr__(self, key):
513 if key in self._val.T.propnames \
514 or key in self._computed():
516 self._path.append(key)
517 val = getattr(self._val, key)
518 pval = self._proxy(val)
519 if not isinstance(pval, (Proxy, ListProxy)):
520 self.register_path()
522 return pval
523 else:
524 return '{' + key + '}'
526 def __format__(self, *args, **kwargs):
527 self.register_path()
528 return self._val.__format__(*args, **kwargs)
530 def _proxy(self, val):
531 if isinstance(val, TalkieRoot):
532 return Proxy(
533 val, self._formatter, [], val)
535 elif isinstance(val, Object):
536 return Proxy(
537 val, self._formatter, self._path, self._talkie_root)
539 elif isinstance(val, list):
540 return ListProxy(
541 val, self._formatter, self._path, self._talkie_root)
543 else:
544 return self._formatter(val)
546 class ListProxy(Proxy):
547 def __getitem__(self, i):
548 self._path[-1] += '[%i]' % i
549 return self._proxy(self._val[i])
551 key = key.split('|', 1)
552 if len(key) == 2:
553 key, formatter = key[0], self._formatters[key[1]]
554 else:
555 key, formatter = key[0], noop
557 return getattr(Proxy(self._root, formatter, [], self._root), key)
559 def get_paths(self):
560 return self._paths