Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/guts_agnostic.py: 0%
116 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 15:01 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-10-06 15:01 +0000
1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
6'''
7Data-model free processing of :py:mod:`pyrocko.guts` generated YAML files.
8'''
10import copy
12try:
13 from yaml import CSafeLoader as SafeLoader, CSafeDumper as SafeDumper
14except ImportError:
15 from yaml import SafeLoader, SafeDumper
17from pyrocko import guts
class AgnosticSafeLoader(SafeLoader):
    '''
    Safe YAML loader subclass used by this module.

    The multi-constructor for ``!``-tagged nodes is registered on this
    subclass further down in the module.
    '''
class AgnosticSafeDumper(SafeDumper):
    '''
    Safe YAML dumper subclass used by this module.

    The representers for :py:class:`xstr` and :py:class:`Object` are
    registered on this subclass further down in the module.
    '''
class xstr(str):
    '''
    Marker subclass of :py:class:`str`.

    It adds no behaviour of its own; the type is only used to select a
    dedicated representer when dumping.
    '''
def to_xstr(x):
    '''
    Return a copy of ``x`` in which strings are wrapped as :py:class:`xstr`.

    Lists and dict values are converted recursively (a new list / plain dict
    is returned); dict keys are left untouched. Any other value is returned
    unchanged.
    '''
    if isinstance(x, str):
        return xstr(x)

    if isinstance(x, list):
        return [to_xstr(element) for element in x]

    if isinstance(x, dict):
        return {key: to_xstr(value) for (key, value) in x.items()}

    return x
def quoted_presenter(dumper, data):
    '''
    Represent ``data`` as a YAML string scalar in single-quoted style.

    :param dumper: object providing ``represent_scalar(tag, value, style)``
    :param data: value to dump; it is converted with :py:class:`str` first
    :returns: whatever ``dumper.represent_scalar`` returns
    '''
    text = str(data)
    return dumper.represent_scalar('tag:yaml.org,2002:str', text, style="'")
# Dump xstr values through quoted_presenter, i.e. as single-quoted strings.
AgnosticSafeDumper.add_representer(xstr, quoted_presenter)
class Object(object):
    '''
    Schema-less container for a tagged mapping.

    The attributes are stored as an ordered list of mutable ``[name, value]``
    pairs in ``_data``; the tag name is stored in ``_tagname``.

    :param tagname: tag name of the object
    :param inamevals: iterable of ``(name, value)`` pairs
    '''

    def __init__(self, tagname, inamevals):
        self._tagname = tagname
        # Each pair is copied into its own list so it can be edited in place.
        self._data = []
        for pair in inamevals:
            self._data.append(list(pair))

    def inamevals_to_save(self):
        '''
        Iterate over ``(name, value)`` pairs with values passed through
        :py:func:`to_xstr`.
        '''
        for name, value in self._data:
            yield (name, to_xstr(value))

    def inamevals(self):
        '''
        Iterate over the stored ``(name, value)`` pairs in order.
        '''
        for name, value in self._data:
            yield (name, value)

    def __iter__(self):
        # Iterating the object yields the attribute names only.
        for name, _ in self._data:
            yield name

    def rename_attribute(self, old, new):
        '''
        Rename every attribute called ``old`` to ``new``, keeping position
        and value.
        '''
        for pair in self._data:
            if pair[0] == old:
                pair[0] = new

    def drop_attribute(self, k):
        '''
        Remove every attribute named ``k``. No error if there is none.
        '''
        kept = []
        for pair in self._data:
            if pair[0] != k:
                kept.append(pair)

        self._data = kept

    def replace(self, other):
        '''
        Take over tag name and a deep copy of the data of ``other``.
        '''
        self._tagname = other._tagname
        self._data = copy.deepcopy(other._data)

    def __setitem__(self, k, v):
        # Overwrite the first attribute named k, or append a new one.
        for pair in self._data:
            if pair[0] == k:
                pair[1] = v
                break
        else:
            self._data.append([k, v])

    def __getitem__(self, item):
        # The first attribute with a matching name wins.
        for pair in self._data:
            if pair[0] == item:
                return pair[1]

        raise KeyError(item)

    def get(self, *args):
        '''
        Look up an attribute: ``get(name)`` or ``get(name, default)``.

        With only a name, a missing attribute raises :py:exc:`KeyError`.
        With a second argument, that argument is returned instead.
        '''
        if len(args) == 1:
            return self[args[0]]

        try:
            return self[args[0]]
        except KeyError:
            return args[1]
def multi_representer(dumper, data):
    '''
    Represent ``data`` as a block-style mapping tagged ``!<tagname>``.

    :param dumper: object providing ``represent_mapping``
    :param data: object with ``_tagname`` and ``inamevals_to_save()``
    :returns: the node produced by ``dumper.represent_mapping``
    '''
    tag = '!' + data._tagname
    pairs = data.inamevals_to_save()
    return dumper.represent_mapping(tag, pairs, flow_style=False)
def multi_constructor(loader, tag_suffix, node):
    '''
    Build an :py:class:`Object` from a tagged mapping node.

    :param loader: object providing ``construct_pairs``
    :param tag_suffix: used, converted to :py:class:`str`, as the tag name
    :param node: node handed to ``loader.construct_pairs(node, deep=True)``
    :returns: new :py:class:`Object` holding the constructed pairs
    '''
    pairs = loader.construct_pairs(node, deep=True)
    return Object(str(tag_suffix), pairs)
# Dump Object instances through multi_representer, and construct an Object
# for any node whose tag starts with '!'.
AgnosticSafeDumper.add_multi_representer(Object, multi_representer)
AgnosticSafeLoader.add_multi_constructor('!', multi_constructor)
@guts.expand_stream_args('w')
def dump(*args, **kwargs):
    '''
    Dump using :py:class:`AgnosticSafeDumper`.

    All arguments are forwarded to ``guts._dump``; see
    ``guts.expand_stream_args`` for how stream arguments are handled.
    '''
    return guts._dump(*args, Dumper=AgnosticSafeDumper, **kwargs)
@guts.expand_stream_args('r')
def load(*args, **kwargs):
    '''
    Load using :py:class:`AgnosticSafeLoader`.

    All arguments are forwarded to ``guts._load``; see
    ``guts.expand_stream_args`` for how stream arguments are handled.
    '''
    return guts._load(*args, Loader=AgnosticSafeLoader, **kwargs)
def load_string(s, *args, **kwargs):
    '''
    Load from the string ``s``.

    Shorthand for ``load(string=s, ...)``; remaining arguments are passed
    through to :py:func:`load`.
    '''
    return load(*args, string=s, **kwargs)
@guts.expand_stream_args('w')
def dump_all(*args, **kwargs):
    '''
    Dump using :py:class:`AgnosticSafeDumper` via ``guts._dump_all``.

    All arguments are forwarded to ``guts._dump_all``; see
    ``guts.expand_stream_args`` for how stream arguments are handled.
    '''
    return guts._dump_all(*args, Dumper=AgnosticSafeDumper, **kwargs)
@guts.expand_stream_args('r')
def load_all(*args, **kwargs):
    '''
    Load using :py:class:`AgnosticSafeLoader` via ``guts._load_all``.

    All arguments are forwarded to ``guts._load_all``; see
    ``guts.expand_stream_args`` for how stream arguments are handled.
    '''
    return guts._load_all(*args, Loader=AgnosticSafeLoader, **kwargs)
@guts.expand_stream_args('r')
def iload_all(*args, **kwargs):
    '''
    Load using :py:class:`AgnosticSafeLoader` via ``guts._iload_all``.

    All arguments are forwarded to ``guts._iload_all``; see
    ``guts.expand_stream_args`` for how stream arguments are handled.
    '''
    return guts._iload_all(*args, Loader=AgnosticSafeLoader, **kwargs)
def walk(x, path=()):
    '''
    Iterate over ``(path, node)`` pairs of ``x`` and everything below it.

    ``x`` itself is yielded first. If it is an :py:class:`Object`, its
    attributes are then visited in order, recursively. A path is a tuple with
    one entry per level: the attribute name for a plain value, or
    ``(name, index)`` / ``(name, key)`` for an element of a list, tuple or
    dict valued attribute. Only :py:class:`Object` instances are descended
    into.
    '''
    yield path, x

    if not isinstance(x, Object):
        return

    for name, val in x.inamevals():
        # Normalise the three attribute shapes to (path_entry, child) pairs.
        if isinstance(val, (list, tuple)):
            children = (((name, i), child) for i, child in enumerate(val))
        elif isinstance(val, dict):
            children = (((name, key), child) for key, child in val.items())
        else:
            children = [(name, val)]

        for entry, child in children:
            yield from walk(child, path=path + (entry,))
def apply_tree(x, func, path=()):
    '''
    Call ``func(path, node)`` for ``x`` and everything below it.

    Children are processed before their parent, so ``func`` is called on
    ``x`` itself last. Paths are built as in :py:func:`walk`: the attribute
    name for a plain value, ``(name, index)`` / ``(name, key)`` for elements
    of list, tuple or dict valued attributes. Only :py:class:`Object`
    instances are descended into.
    '''
    if isinstance(x, Object):
        for name, val in x.inamevals():
            # Normalise the three attribute shapes to (path_entry, child).
            if isinstance(val, (list, tuple)):
                children = (((name, i), child) for i, child in enumerate(val))
            elif isinstance(val, dict):
                children = (
                    ((name, key), child) for key, child in val.items())
            else:
                children = [(name, val)]

            for entry, child in children:
                apply_tree(child, func, path=path + (entry,))

    func(path, x)