1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

# This module contains abstractions for the input stream. You don't have to

# look any further; there is no pretty code here.

# 

# We define two classes here. 

# 

# Mark(source, line, column) 

# It's just a record and its only use is producing nice error messages. 

# Parser does not use it for any other purposes. 

# 

# Reader(source, data) 

# Reader determines the encoding of `data` and converts it to unicode. 

# Reader provides the following methods and attributes: 

# reader.peek(index=0) - return the character at offset `index` from the current position.

# reader.prefix(length=1) - return the next `length` characters.

# reader.forward(length=1) - move the current position forward by `length` characters.

# reader.index - the number of the current character.

# reader.line, reader.column - the line and the column of the current character.

 

# Names exported by a star-import of this module: the stream reader class and
# the exception type declared below.
__all__ = ['Reader', 'ReaderError']

 

from .error import YAMLError, Mark 

 

import codecs, re 

 

class ReaderError(YAMLError):
    """Error describing an input character or byte the reader rejected.

    Attributes:
        name: label of the input the error refers to.
        position: offset of the offending character or byte in that input.
        character: the offending item; a `bytes` value selects the
            "can't decode byte" message, anything else is formatted as an
            integer code point.
        encoding: name of the codec involved.
        reason: human-readable explanation.
    """

    def __init__(self, name, position, character, encoding, reason):
        self.name = name
        self.character = character
        self.position = position
        self.encoding = encoding
        self.reason = reason

    def __str__(self):
        # Both messages end with the same "where" part.
        where = (self.name, self.position)

        if not isinstance(self.character, bytes):
            # Already-decoded text: `character` is formatted with %x, so it
            # must be an integer here.
            template = ("unacceptable character #x%04x: %s\n"
                        " in \"%s\", position %d")
            return template % ((self.character, self.reason) + where)

        # Raw byte that the codec could not decode.
        template = ("'%s' codec can't decode byte #x%02x: %s\n"
                    " in \"%s\", position %d")
        details = (self.encoding, ord(self.character), self.reason)
        return template % (details + where)

 

class Reader(object):
    """Character-level view of the input with position tracking.

    Reader:
      - determines the data encoding and converts the data to `str`,
      - checks that every character is in the allowed range,
      - appends a NUL character to the end of the data.

    Reader accepts:
      - a `bytes` object,
      - a `str` object,
      - a file-like object whose `read` method returns `bytes`,
      - a file-like object whose `read` method returns `str`.

    Byte input is decoded as UTF-16 (LE or BE) when it starts with the
    matching byte-order mark, and as UTF-8 otherwise.

    Yeah, it's ugly and slow.
    """

    def __init__(self, stream):
        """Set up the reader over `stream` and load the first character.

        Raises:
            ReaderError: if the initial data cannot be decoded or contains
                a character outside the allowed range.
        """
        self.name = None
        self.stream = None          # file-like source, None for in-memory input
        self.stream_pointer = 0     # amount of raw data read from `stream` so far
        self.eof = True             # False only while a stream still has data
        self.buffer = ''            # decoded characters not yet discarded
        self.pointer = 0            # offset of the current character in `buffer`
        self.raw_buffer = None      # raw data read but not yet decoded
        self.raw_decode = None      # incremental decoder for byte input
        self.encoding = None
        self.index = 0              # number of characters consumed so far
        self.line = 0
        self.column = 0
        if isinstance(stream, str):
            self.name = "<unicode string>"
            self.check_printable(stream)
            self.buffer = stream+'\0'
        elif isinstance(stream, bytes):
            self.name = "<byte string>"
            self.raw_buffer = stream
            self.determine_encoding()
        else:
            self.stream = stream
            self.name = getattr(stream, 'name', "<file>")
            self.eof = False
            self.raw_buffer = None
            self.determine_encoding()

    def peek(self, index=0):
        """Return the character `index` positions after the current one."""
        try:
            return self.buffer[self.pointer+index]
        except IndexError:
            # Not buffered yet: try to load more data, then look again.
            self.update(index+1)
            return self.buffer[self.pointer+index]

    def prefix(self, length=1):
        """Return up to `length` characters starting at the current one."""
        if self.pointer+length >= len(self.buffer):
            self.update(length)
        return self.buffer[self.pointer:self.pointer+length]

    def forward(self, length=1):
        """Consume `length` characters, updating index, line and column."""
        # One extra character is requested so the '\r' check below can look
        # at the character following the last consumed one.
        if self.pointer+length+1 >= len(self.buffer):
            self.update(length+1)
        while length:
            ch = self.buffer[self.pointer]
            self.pointer += 1
            self.index += 1
            # A '\r' immediately followed by '\n' is not counted as a line
            # break by itself; the '\n' that follows is.
            if ch in '\n\x85\u2028\u2029'  \
                    or (ch == '\r' and self.buffer[self.pointer] != '\n'):
                self.line += 1
                self.column = 0
            elif ch != '\uFEFF':
                # The byte-order mark does not advance the column.
                self.column += 1
            length -= 1

    def get_mark(self):
        """Return a `Mark` for the current position.

        The buffer and pointer are included only for in-memory input
        (no underlying stream); for streams they are passed as None.
        """
        if self.stream is None:
            return Mark(self.name, self.index, self.line, self.column,
                    self.buffer, self.pointer)
        else:
            return Mark(self.name, self.index, self.line, self.column,
                    None, None)

    def determine_encoding(self):
        """Pick a decoder from the start of the raw data and prime the buffer."""
        # Two bytes are needed to recognise a UTF-16 byte-order mark.
        while not self.eof and (self.raw_buffer is None or len(self.raw_buffer) < 2):
            self.update_raw()
        # Raw data that is not `bytes` is used as-is; no decoder is set.
        if isinstance(self.raw_buffer, bytes):
            if self.raw_buffer.startswith(codecs.BOM_UTF16_LE):
                self.raw_decode = codecs.utf_16_le_decode
                self.encoding = 'utf-16-le'
            elif self.raw_buffer.startswith(codecs.BOM_UTF16_BE):
                self.raw_decode = codecs.utf_16_be_decode
                self.encoding = 'utf-16-be'
            else:
                self.raw_decode = codecs.utf_8_decode
                self.encoding = 'utf-8'
        self.update(1)

    # Matches any single character outside the set of accepted characters.
    NON_PRINTABLE = re.compile('[^\x09\x0A\x0D\x20-\x7E\x85\xA0-\uD7FF\uE000-\uFFFD\U00010000-\U0010ffff]')

    def check_printable(self, data):
        """Raise ReaderError if `data` contains a disallowed character.

        `data` is the text about to be appended to the buffer; the reported
        position is computed relative to the characters already consumed
        and buffered.
        """
        match = self.NON_PRINTABLE.search(data)
        if match:
            character = match.group()
            position = self.index+(len(self.buffer)-self.pointer)+match.start()
            raise ReaderError(self.name, position, ord(character),
                    'unicode', "special characters are not allowed")

    def update(self, length):
        """Ensure at least `length` characters are buffered, if available.

        Already-consumed characters are dropped from the buffer. Once the
        input is exhausted a NUL character is appended and the raw buffer
        is cleared, which turns every later call into a no-op.

        Raises:
            ReaderError: if the raw data cannot be decoded or contains a
                disallowed character.
        """
        if self.raw_buffer is None:
            return
        self.buffer = self.buffer[self.pointer:]
        self.pointer = 0
        while len(self.buffer) < length:
            if not self.eof:
                self.update_raw()
            if self.raw_decode is not None:
                try:
                    # final=self.eof: an incomplete trailing sequence is
                    # only an error once no more data can arrive.
                    data, converted = self.raw_decode(self.raw_buffer,
                            'strict', self.eof)
                except UnicodeDecodeError as exc:
                    # Slice rather than index: indexing `bytes` yields an
                    # int, and ReaderError only produces its "can't decode
                    # byte" message for a `bytes` value.
                    character = self.raw_buffer[exc.start:exc.start+1]
                    if self.stream is not None:
                        position = self.stream_pointer-len(self.raw_buffer)+exc.start
                    else:
                        position = exc.start
                    raise ReaderError(self.name, position, character,
                            exc.encoding, exc.reason) from exc
            else:
                # Text input needs no decoding.
                data = self.raw_buffer
                converted = len(data)
            self.check_printable(data)
            self.buffer += data
            self.raw_buffer = self.raw_buffer[converted:]
            if self.eof:
                self.buffer += '\0'
                self.raw_buffer = None
                break

    def update_raw(self, size=4096):
        """Read up to `size` more units from the stream into the raw buffer.

        An empty read marks the end of the input.
        """
        data = self.stream.read(size)
        if self.raw_buffer is None:
            self.raw_buffer = data
        else:
            self.raw_buffer += data
        self.stream_pointer += len(data)
        if not data:
            self.eof = True

 

#try: 

# import psyco 

# psyco.bind(Reader) 

#except ImportError: 

# pass