Coverage for /usr/local/lib/python3.11/dist-packages/pyrocko/guts_agnostic.py: 0%

116 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-01-02 13:30 +0000

1# http://pyrocko.org - GPLv3 

2# 

3# The Pyrocko Developers, 21st Century 

4# ---|P------/S----------~Lg---------- 

5 

6''' 

7Data-model free processing of :py:mod:`pyrocko.guts` generated YAML files. 

8''' 

9 

10import copy 

11 

12try: 

13 from yaml import CSafeLoader as SafeLoader, CSafeDumper as SafeDumper 

14except ImportError: 

15 from yaml import SafeLoader, SafeDumper 

16 

17from pyrocko import guts 

18 

19 

20class AgnosticSafeLoader(SafeLoader): 

21 pass 

22 

23 

24class AgnosticSafeDumper(SafeDumper): 

25 pass 

26 

27 

28class xstr(str): 

29 pass 

30 

31 

32def to_xstr(x): 

33 if isinstance(x, str): 

34 return xstr(x) 

35 elif isinstance(x, list): 

36 return [to_xstr(e) for e in x] 

37 elif isinstance(x, dict): 

38 return dict((k, to_xstr(v)) for (k, v) in x.items()) 

39 else: 

40 return x 

41 

42 

43def quoted_presenter(dumper, data): 

44 return dumper.represent_scalar( 

45 'tag:yaml.org,2002:str', str(data), style="'") 

46 

47 

48AgnosticSafeDumper.add_representer(xstr, quoted_presenter) 

49 

50 

51class Object(object): 

52 

53 def __init__(self, tagname, inamevals): 

54 self._tagname = tagname 

55 self._data = [] 

56 for kv in inamevals: 

57 self._data.append(list(kv)) 

58 

59 def inamevals_to_save(self): 

60 for k, v in self._data: 

61 yield (k, to_xstr(v)) 

62 

63 def inamevals(self): 

64 for k, v in self._data: 

65 yield (k, v) 

66 

67 def __iter__(self): 

68 for k, _ in self._data: 

69 yield k 

70 

71 def rename_attribute(self, old, new): 

72 for kv in self._data: 

73 if kv[0] == old: 

74 kv[0] = new 

75 

76 def drop_attribute(self, k): 

77 self._data = [kv for kv in self._data if kv[0] != k] 

78 

79 def replace(self, other): 

80 self._tagname = other._tagname 

81 self._data = copy.deepcopy(other._data) 

82 

83 def __setitem__(self, k, v): 

84 for kv in self._data: 

85 if kv[0] == k: 

86 kv[1] = v 

87 return 

88 

89 self._data.append([k, v]) 

90 

91 def __getitem__(self, item): 

92 for kv in self._data: 

93 if kv[0] == item: 

94 return kv[1] 

95 

96 raise KeyError(item) 

97 

98 def get(self, *args): 

99 if len(args) == 1: 

100 return self.__getitem__(args[0]) 

101 else: 

102 try: 

103 return self.__getitem__(args[0]) 

104 except KeyError: 

105 return args[1] 

106 

107 

108def multi_representer(dumper, data): 

109 node = dumper.represent_mapping( 

110 '!'+data._tagname, data.inamevals_to_save(), flow_style=False) 

111 

112 return node 

113 

114 

115def multi_constructor(loader, tag_suffix, node): 

116 tagname = str(tag_suffix) 

117 

118 o = Object(tagname, loader.construct_pairs(node, deep=True)) 

119 return o 

120 

121 

122AgnosticSafeDumper.add_multi_representer(Object, multi_representer) 

123AgnosticSafeLoader.add_multi_constructor('!', multi_constructor) 

124 

125 

126@guts.expand_stream_args('w') 

127def dump(*args, **kwargs): 

128 return guts._dump(Dumper=AgnosticSafeDumper, *args, **kwargs) 

129 

130 

131@guts.expand_stream_args('r') 

132def load(*args, **kwargs): 

133 return guts._load(Loader=AgnosticSafeLoader, *args, **kwargs) 

134 

135 

136def load_string(s, *args, **kwargs): 

137 return load(string=s, *args, **kwargs) 

138 

139 

140@guts.expand_stream_args('w') 

141def dump_all(*args, **kwargs): 

142 return guts._dump_all(Dumper=AgnosticSafeDumper, *args, **kwargs) 

143 

144 

145@guts.expand_stream_args('r') 

146def load_all(*args, **kwargs): 

147 return guts._load_all(Loader=AgnosticSafeLoader, *args, **kwargs) 

148 

149 

150@guts.expand_stream_args('r') 

151def iload_all(*args, **kwargs): 

152 return guts._iload_all(Loader=AgnosticSafeLoader, *args, **kwargs) 

153 

154 

155def walk(x, path=()): 

156 yield path, x 

157 

158 if isinstance(x, Object): 

159 for (name, val) in x.inamevals(): 

160 if isinstance(val, (list, tuple)): 

161 for iele, ele in enumerate(val): 

162 for y in walk(ele, path=path + ((name, iele),)): 

163 yield y 

164 elif isinstance(val, dict): 

165 for ele_k, ele_v in val.items(): 

166 for y in walk(ele_v, path=path + ((name, ele_k),)): 

167 yield y 

168 else: 

169 for y in walk(val, path=path+(name,)): 

170 yield y 

171 

172 

173def apply_tree(x, func, path=()): 

174 if isinstance(x, Object): 

175 for (name, val) in x.inamevals(): 

176 if isinstance(val, (list, tuple)): 

177 for iele, ele in enumerate(val): 

178 apply_tree(ele, func, path=path + ((name, iele),)) 

179 elif isinstance(val, dict): 

180 for ele_k, ele_v in val.items(): 

181 apply_tree(ele_v, func, path=path + ((name, ele_k),)) 

182 else: 

183 apply_tree(val, func, path=path+(name,)) 

184 

185 func(path, x)