1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

 

__all__ = ['Composer', 'ComposerError'] 

 

from .error import MarkedYAMLError 

from .events import * 

from .nodes import * 

 

class ComposerError(MarkedYAMLError): 

pass 

 

class Composer: 

 

def __init__(self): 

self.anchors = {} 

 

def check_node(self): 

# Drop the STREAM-START event. 

if self.check_event(StreamStartEvent): 

self.get_event() 

 

# If there are more documents available? 

return not self.check_event(StreamEndEvent) 

 

def get_node(self): 

# Get the root node of the next document. 

if not self.check_event(StreamEndEvent): 

return self.compose_document() 

 

def get_single_node(self): 

# Drop the STREAM-START event. 

self.get_event() 

 

# Compose a document if the stream is not empty. 

document = None 

if not self.check_event(StreamEndEvent): 

document = self.compose_document() 

 

# Ensure that the stream contains no more documents. 

if not self.check_event(StreamEndEvent): 

event = self.get_event() 

raise ComposerError("expected a single document in the stream", 

document.start_mark, "but found another document", 

event.start_mark) 

 

# Drop the STREAM-END event. 

self.get_event() 

 

return document 

 

def compose_document(self): 

# Drop the DOCUMENT-START event. 

self.get_event() 

 

# Compose the root node. 

node = self.compose_node(None, None) 

 

# Drop the DOCUMENT-END event. 

self.get_event() 

 

self.anchors = {} 

return node 

 

def compose_node(self, parent, index): 

if self.check_event(AliasEvent): 

event = self.get_event() 

anchor = event.anchor 

if anchor not in self.anchors: 

raise ComposerError(None, None, "found undefined alias %r" 

% anchor, event.start_mark) 

return self.anchors[anchor] 

event = self.peek_event() 

anchor = event.anchor 

if anchor is not None: 

if anchor in self.anchors: 

raise ComposerError("found duplicate anchor %r; first occurence" 

% anchor, self.anchors[anchor].start_mark, 

"second occurence", event.start_mark) 

self.descend_resolver(parent, index) 

if self.check_event(ScalarEvent): 

node = self.compose_scalar_node(anchor) 

elif self.check_event(SequenceStartEvent): 

node = self.compose_sequence_node(anchor) 

elif self.check_event(MappingStartEvent): 

node = self.compose_mapping_node(anchor) 

self.ascend_resolver() 

return node 

 

def compose_scalar_node(self, anchor): 

event = self.get_event() 

tag = event.tag 

if tag is None or tag == '!': 

tag = self.resolve(ScalarNode, event.value, event.implicit) 

node = ScalarNode(tag, event.value, 

event.start_mark, event.end_mark, style=event.style) 

if anchor is not None: 

self.anchors[anchor] = node 

return node 

 

def compose_sequence_node(self, anchor): 

start_event = self.get_event() 

tag = start_event.tag 

if tag is None or tag == '!': 

tag = self.resolve(SequenceNode, None, start_event.implicit) 

node = SequenceNode(tag, [], 

start_event.start_mark, None, 

flow_style=start_event.flow_style) 

if anchor is not None: 

self.anchors[anchor] = node 

index = 0 

while not self.check_event(SequenceEndEvent): 

node.value.append(self.compose_node(node, index)) 

index += 1 

end_event = self.get_event() 

node.end_mark = end_event.end_mark 

return node 

 

def compose_mapping_node(self, anchor): 

start_event = self.get_event() 

tag = start_event.tag 

if tag is None or tag == '!': 

tag = self.resolve(MappingNode, None, start_event.implicit) 

node = MappingNode(tag, [], 

start_event.start_mark, None, 

flow_style=start_event.flow_style) 

if anchor is not None: 

self.anchors[anchor] = node 

while not self.check_event(MappingEndEvent): 

#key_event = self.peek_event() 

item_key = self.compose_node(node, None) 

#if item_key in node.value: 

# raise ComposerError("while composing a mapping", start_event.start_mark, 

# "found duplicate key", key_event.start_mark) 

item_value = self.compose_node(node, item_key) 

#node.value[item_key] = item_value 

node.value.append((item_key, item_value)) 

end_event = self.get_event() 

node.end_mark = end_event.end_mark 

return node