1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

""" 

This plugin captures logging statements issued during test execution. When an 

error or failure occurs, the captured log messages are attached to the running 

test in the test.capturedLogging attribute, and displayed with the error failure 

output. It is enabled by default but can be turned off with the option 

``--nologcapture``. 

 

You can filter captured logging statements with the ``--logging-filter`` option.  

If set, it specifies which logger(s) will be captured; loggers that do not match 

will be passed. Example: specifying ``--logging-filter=sqlalchemy,myapp`` 

will ensure that only statements logged via sqlalchemy.engine, myapp 

or myapp.foo.bar logger will be logged. 

 

You can remove other installed logging handlers with the 

``--logging-clear-handlers`` option. 

""" 

 

import logging 

from logging import Handler 

import threading 

 

from nose.plugins.base import Plugin 

from nose.util import anyp, ln, safe_str 

 

try: 

from io import StringIO 

except ImportError: 

from io import StringIO 

 

log = logging.getLogger(__name__) 

 

class FilterSet(object): 

def __init__(self, filter_components): 

self.inclusive, self.exclusive = self._partition(filter_components) 

 

# @staticmethod 

def _partition(components): 

inclusive, exclusive = [], [] 

for component in components: 

if component.startswith('-'): 

exclusive.append(component[1:]) 

else: 

inclusive.append(component) 

return inclusive, exclusive 

_partition = staticmethod(_partition) 

 

def allow(self, record): 

"""returns whether this record should be printed""" 

if not self: 

# nothing to filter 

return True 

return self._allow(record) and not self._deny(record) 

 

# @staticmethod 

def _any_match(matchers, record): 

"""return the bool of whether `record` starts with 

any item in `matchers`""" 

def record_matches_key(key): 

return record == key or record.startswith(key + '.') 

return anyp(bool, list(map(record_matches_key, matchers))) 

_any_match = staticmethod(_any_match) 

 

def _allow(self, record): 

if not self.inclusive: 

return True 

return self._any_match(self.inclusive, record) 

 

def _deny(self, record): 

if not self.exclusive: 

return False 

return self._any_match(self.exclusive, record) 

 

 

class MyMemoryHandler(Handler): 

def __init__(self, logformat, logdatefmt, filters): 

Handler.__init__(self) 

fmt = logging.Formatter(logformat, logdatefmt) 

self.setFormatter(fmt) 

self.filterset = FilterSet(filters) 

self.buffer = [] 

def emit(self, record): 

self.buffer.append(self.format(record)) 

def flush(self): 

pass # do nothing 

def truncate(self): 

self.buffer = [] 

def filter(self, record): 

if self.filterset.allow(record.name): 

return Handler.filter(self, record) 

def __getstate__(self): 

state = self.__dict__.copy() 

del state['lock'] 

return state 

def __setstate__(self, state): 

self.__dict__.update(state) 

self.lock = threading.RLock() 

 

 

class LogCapture(Plugin): 

""" 

Log capture plugin. Enabled by default. Disable with --nologcapture. 

This plugin captures logging statements issued during test execution, 

appending any output captured to the error or failure output, 

should the test fail or raise an error. 

""" 

enabled = True 

env_opt = 'NOSE_NOLOGCAPTURE' 

name = 'logcapture' 

score = 500 

logformat = '%(name)s: %(levelname)s: %(message)s' 

logdatefmt = None 

clear = False 

filters = ['-nose'] 

 

def options(self, parser, env): 

"""Register commandline options. 

""" 

parser.add_option( 

"--nologcapture", action="store_false", 

default=not env.get(self.env_opt), dest="logcapture", 

help="Disable logging capture plugin. " 

"Logging configuration will be left intact." 

" [NOSE_NOLOGCAPTURE]") 

parser.add_option( 

"--logging-format", action="store", dest="logcapture_format", 

default=env.get('NOSE_LOGFORMAT') or self.logformat, 

metavar="FORMAT", 

help="Specify custom format to print statements. " 

"Uses the same format as used by standard logging handlers." 

" [NOSE_LOGFORMAT]") 

parser.add_option( 

"--logging-datefmt", action="store", dest="logcapture_datefmt", 

default=env.get('NOSE_LOGDATEFMT') or self.logdatefmt, 

metavar="FORMAT", 

help="Specify custom date/time format to print statements. " 

"Uses the same format as used by standard logging handlers." 

" [NOSE_LOGDATEFMT]") 

parser.add_option( 

"--logging-filter", action="store", dest="logcapture_filters", 

default=env.get('NOSE_LOGFILTER'), 

metavar="FILTER", 

help="Specify which statements to filter in/out. " 

"By default, everything is captured. If the output is too" 

" verbose,\nuse this option to filter out needless output.\n" 

"Example: filter=foo will capture statements issued ONLY to\n" 

" foo or foo.what.ever.sub but not foobar or other logger.\n" 

"Specify multiple loggers with comma: filter=foo,bar,baz.\n" 

"If any logger name is prefixed with a minus, eg filter=-foo,\n" 

"it will be excluded rather than included. Default: " 

"exclude logging messages from nose itself (-nose)." 

" [NOSE_LOGFILTER]\n") 

parser.add_option( 

"--logging-clear-handlers", action="store_true", 

default=False, dest="logcapture_clear", 

help="Clear all other logging handlers") 

parser.add_option( 

"--logging-level", action="store", 

default='NOTSET', dest="logcapture_level", 

help="Set the log level to capture") 

 

def configure(self, options, conf): 

"""Configure plugin. 

""" 

self.conf = conf 

# Disable if explicitly disabled, or if logging is 

# configured via logging config file 

if not options.logcapture or conf.loggingConfig: 

self.enabled = False 

self.logformat = options.logcapture_format 

self.logdatefmt = options.logcapture_datefmt 

self.clear = options.logcapture_clear 

self.loglevel = options.logcapture_level 

if options.logcapture_filters: 

self.filters = options.logcapture_filters.split(',') 

 

def setupLoghandler(self): 

# setup our handler with root logger 

root_logger = logging.getLogger() 

if self.clear: 

if hasattr(root_logger, "handlers"): 

for handler in root_logger.handlers: 

root_logger.removeHandler(handler) 

for logger in list(logging.Logger.manager.loggerDict.values()): 

if hasattr(logger, "handlers"): 

for handler in logger.handlers: 

logger.removeHandler(handler) 

# make sure there isn't one already 

# you can't simply use "if self.handler not in root_logger.handlers" 

# since at least in unit tests this doesn't work -- 

# LogCapture() is instantiated for each test case while root_logger 

# is module global 

# so we always add new MyMemoryHandler instance 

for handler in root_logger.handlers[:]: 

if isinstance(handler, MyMemoryHandler): 

root_logger.handlers.remove(handler) 

root_logger.addHandler(self.handler) 

# to make sure everything gets captured 

loglevel = getattr(self, "loglevel", "NOTSET") 

root_logger.setLevel(getattr(logging, loglevel)) 

 

def begin(self): 

"""Set up logging handler before test run begins. 

""" 

self.start() 

 

def start(self): 

self.handler = MyMemoryHandler(self.logformat, self.logdatefmt, 

self.filters) 

self.setupLoghandler() 

 

def end(self): 

pass 

 

def beforeTest(self, test): 

"""Clear buffers and handlers before test. 

""" 

self.setupLoghandler() 

 

def afterTest(self, test): 

"""Clear buffers after test. 

""" 

self.handler.truncate() 

 

def formatFailure(self, test, err): 

"""Add captured log messages to failure output. 

""" 

return self.formatError(test, err) 

 

def formatError(self, test, err): 

"""Add captured log messages to error output. 

""" 

# logic flow copied from Capture.formatError 

test.capturedLogging = records = self.formatLogRecords() 

if not records: 

return err 

ec, ev, tb = err 

return (ec, self.addCaptureToErr(ev, records), tb) 

 

def formatLogRecords(self): 

return list(map(safe_str, self.handler.buffer)) 

 

def addCaptureToErr(self, ev, records): 

return '\n'.join([safe_str(ev), ln('>> begin captured logging <<')] + \ 

records + \ 

[ln('>> end captured logging <<')])