1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

# -*- coding: utf-8 -*- 

 

""" 

requests.auth 

~~~~~~~~~~~~~ 

 

This module contains the authentication handlers for Requests. 

""" 

 

import os 

import re 

import time 

import hashlib 

import threading 

import warnings 

 

from base64 import b64encode 

 

from .compat import urlparse, str, basestring 

from .cookies import extract_cookies_to_jar 

from ._internal_utils import to_native_string 

from .utils import parse_dict_header 

 

CONTENT_TYPE_FORM_URLENCODED = 'application/x-www-form-urlencoded' 

CONTENT_TYPE_MULTI_PART = 'multipart/form-data' 

 

 

def _basic_auth_str(username, password):
    """Build the value of an HTTP Basic ``Authorization`` header.

    :param username: user name; text values are encoded as latin1, values
        that are not ``basestring`` instances are first passed through
        ``str()`` (with a ``DeprecationWarning``).
    :param password: password; handled the same way as ``username``.
    :return: ``'Basic '`` followed by the base64 of ``username:password``,
        passed through ``to_native_string``.
    """
    # Backwards-compatibility shim: non-string credentials (ints and the
    # like) are still tolerated because existing callers rely on it. The
    # original note on this code says it is slated for removal in 3.0.0.
    if not isinstance(username, basestring):
        username_notice = (
            "Non-string usernames will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems."
        ).format(username)
        warnings.warn(username_notice, category=DeprecationWarning)
        username = str(username)

    if not isinstance(password, basestring):
        # Only the type of the password is reported, never its value
        # (presumably so the secret does not end up in warning output).
        password_notice = (
            "Non-string passwords will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems."
        ).format(type(password))
        warnings.warn(password_notice, category=DeprecationWarning)
        password = str(password)
    # -- end of the backwards-compatibility shim --

    # Text credentials are turned into bytes before joining.
    if isinstance(username, str):
        username = username.encode('latin1')
    if isinstance(password, str):
        password = password.encode('latin1')

    credentials = b':'.join((username, password))
    token = b64encode(credentials).strip()
    return 'Basic ' + to_native_string(token)

 

 

class AuthBase(object):
    """Common ancestor of the authentication handlers in this module.

    The default ``__call__`` only raises; subclasses provide the real one.
    """

    def __call__(self, r):
        # Reaching this means a subclass did not override ``__call__``.
        message = 'Auth hooks must be callable.'
        raise NotImplementedError(message)

 

 

class HTTPBasicAuth(AuthBase):
    """Auth handler that sets a Basic ``Authorization`` header on a request."""

    def __init__(self, username, password):
        # Credentials are stored as given; encoding happens when the
        # header is built.
        self.username, self.password = username, password

    def __eq__(self, other):
        # Compared by attribute, so ``other`` can be any object; a missing
        # attribute is treated as ``None``.
        same_username = self.username == getattr(other, 'username', None)
        same_password = self.password == getattr(other, 'password', None)
        return True if (same_username and same_password) else False

    def __ne__(self, other):
        is_equal = self == other
        return not is_equal

    def __call__(self, r):
        """Set ``r.headers['Authorization']`` and return ``r``."""
        header_value = _basic_auth_str(self.username, self.password)
        r.headers['Authorization'] = header_value
        return r

 

 

class HTTPProxyAuth(HTTPBasicAuth):
    """Auth handler that sets a Basic ``Proxy-Authorization`` header."""

    def __call__(self, r):
        """Set ``r.headers['Proxy-Authorization']`` and return ``r``."""
        # Same credential string as the parent class, different header name.
        header_value = _basic_auth_str(self.username, self.password)
        r.headers['Proxy-Authorization'] = header_value
        return r

 

 

class HTTPDigestAuth(AuthBase):
    """Attaches HTTP Digest Authentication to the given Request object.

    Challenge data, the nonce counter, the saved body position and the
    401 retry counter are all kept on a ``threading.local`` instance, so
    each thread using the same handler object sees its own copy.
    """

    # Upper-cased challenge ``algorithm`` -> name of the hashlib constructor.
    # The constructor is resolved by name when a digest is computed rather
    # than when the class is created.
    _HASH_NAMES = {
        'MD5': 'md5',
        'MD5-SESS': 'md5',
        'SHA': 'sha1',
        'SHA-256': 'sha256',
        'SHA-512': 'sha512',
    }

    # Strips the leading scheme token from a ``WWW-Authenticate`` value.
    _DIGEST_PREFIX = re.compile(r'digest ', flags=re.IGNORECASE)

    def __init__(self, username, password):
        self.username = username
        self.password = password
        # Keep state in per-thread local storage
        self._thread_local = threading.local()

    def init_per_thread_state(self):
        """Create the per-thread fields the first time a thread uses us."""
        # Ensure state is initialized just once per-thread
        if not hasattr(self._thread_local, 'init'):
            self._thread_local.init = True
            self._thread_local.last_nonce = ''
            self._thread_local.nonce_count = 0
            self._thread_local.chal = {}
            self._thread_local.pos = None
            self._thread_local.num_401_calls = None

    def build_digest_header(self, method, url):
        """Build the ``Authorization`` header value for the stored challenge.

        Reads the challenge from ``self._thread_local.chal`` (``realm`` and
        ``nonce`` are required keys; ``qop``, ``algorithm`` and ``opaque``
        are optional) and updates the per-thread nonce counter.

        :param method: HTTP method used in the A2 part of the digest.
        :param url: request URL; its path and query form the ``uri`` field.
        :return: ``'Digest ...'`` header value, or ``None`` when the
            challenge names an unsupported algorithm or offers a ``qop``
            list that does not contain ``auth``.
        :rtype: str
        """
        challenge = self._thread_local.chal
        realm = challenge['realm']
        nonce = challenge['nonce']
        qop = challenge.get('qop')
        algorithm = challenge.get('algorithm')
        opaque = challenge.get('opaque')

        # A challenge without an algorithm is treated as MD5.
        if algorithm is None:
            _algorithm = 'MD5'
        else:
            _algorithm = algorithm.upper()

        hash_name = self._HASH_NAMES.get(_algorithm)
        if hash_name is None:
            # Unsupported algorithm: no header can be produced.
            return None

        def hash_utf8(x):
            # Text is hashed as UTF-8; bytes are hashed as-is.
            if isinstance(x, str):
                x = x.encode('utf-8')
            return getattr(hashlib, hash_name)(x).hexdigest()

        def kd(secret, data):
            # Hash of "secret:data".
            return hash_utf8("%s:%s" % (secret, data))

        # XXX not implemented yet
        entdig = None
        p_parsed = urlparse(url)
        #: path is request-uri defined in RFC 2616 which should not be empty
        path = p_parsed.path or "/"
        if p_parsed.query:
            path += '?' + p_parsed.query

        A1 = '%s:%s:%s' % (self.username, realm, self.password)
        A2 = '%s:%s' % (method, path)

        HA1 = hash_utf8(A1)
        HA2 = hash_utf8(A2)

        # The counter restarts at 1 whenever the nonce differs from the
        # last one a header was successfully built for.
        if nonce == self._thread_local.last_nonce:
            self._thread_local.nonce_count += 1
        else:
            self._thread_local.nonce_count = 1
        ncvalue = '%08x' % self._thread_local.nonce_count

        # Client nonce: counter, server nonce, current time and 8 random
        # bytes, hashed and truncated to 16 hex characters.
        s = str(self._thread_local.nonce_count).encode('utf-8')
        s += nonce.encode('utf-8')
        s += time.ctime().encode('utf-8')
        s += os.urandom(8)
        cnonce = hashlib.sha1(s).hexdigest()[:16]

        if _algorithm == 'MD5-SESS':
            HA1 = hash_utf8('%s:%s:%s' % (HA1, nonce, cnonce))

        if not qop:
            respdig = kd(HA1, "%s:%s" % (nonce, HA2))
        elif 'auth' in [token.strip() for token in qop.split(',')]:
            # Tokens are stripped so a list written with spaces after the
            # commas (e.g. "auth-int, auth") is still recognised.
            noncebit = "%s:%s:%s:%s:%s" % (
                nonce, ncvalue, cnonce, 'auth', HA2
            )
            respdig = kd(HA1, noncebit)
        else:
            # XXX handle auth-int.
            return None

        self._thread_local.last_nonce = nonce

        # XXX should the partial digests be encoded too?
        base = 'username="%s", realm="%s", nonce="%s", uri="%s", ' \
               'response="%s"' % (self.username, realm, nonce, path, respdig)
        if opaque:
            base += ', opaque="%s"' % opaque
        if algorithm:
            base += ', algorithm="%s"' % algorithm
        if entdig:
            base += ', digest="%s"' % entdig
        if qop:
            base += ', qop="auth", nc=%s, cnonce="%s"' % (ncvalue, cnonce)

        return 'Digest %s' % (base)

    def handle_redirect(self, r, **kwargs):
        """Reset num_401_calls counter on redirects."""
        if r.is_redirect:
            self._thread_local.num_401_calls = 1

    def handle_401(self, r, **kwargs):
        """
        Takes the given response and tries digest-auth, if needed.

        When the response is a 4xx carrying a digest challenge and no retry
        has been made yet, the request is copied, given an
        ``Authorization`` header and re-sent on ``r.connection``; the new
        response is returned with ``r`` appended to its history. In every
        other case ``r`` is returned and the retry counter is reset to 1.

        :rtype: requests.Response
        """
        # If response is not 4xx, do not auth
        # See https://github.com/psf/requests/issues/3772
        if not 400 <= r.status_code < 500:
            self._thread_local.num_401_calls = 1
            return r

        if self._thread_local.pos is not None:
            # Rewind the file position indicator of the body to where
            # it was to resend the request.
            r.request.body.seek(self._thread_local.pos)
        s_auth = r.headers.get('www-authenticate', '')

        if 'digest' in s_auth.lower() and self._thread_local.num_401_calls < 2:

            self._thread_local.num_401_calls += 1
            self._thread_local.chal = parse_dict_header(
                self._DIGEST_PREFIX.sub('', s_auth, count=1))

            # Consume content and release the original connection
            # to allow our new request to reuse the same one.
            r.content
            r.close()
            prep = r.request.copy()
            extract_cookies_to_jar(prep._cookies, r.request, r.raw)
            prep.prepare_cookies(prep._cookies)

            prep.headers['Authorization'] = self.build_digest_header(
                prep.method, prep.url)
            _r = r.connection.send(prep, **kwargs)
            _r.history.append(r)
            _r.request = prep

            return _r

        self._thread_local.num_401_calls = 1
        return r

    def __call__(self, r):
        """Prepare request ``r`` for digest auth and return it.

        Registers ``handle_401`` and ``handle_redirect`` as ``response``
        hooks and records the current body position when the body
        supports ``tell()``.
        """
        # Initialize per-thread state, if needed
        self.init_per_thread_state()
        # If we have a saved nonce, skip the 401
        if self._thread_local.last_nonce:
            r.headers['Authorization'] = self.build_digest_header(r.method, r.url)
        try:
            self._thread_local.pos = r.body.tell()
        except AttributeError:
            # In the case of HTTPDigestAuth being reused and the body of
            # the previous request was a file-like object, pos has the
            # file position of the previous body. Ensure it's set to
            # None.
            self._thread_local.pos = None
        r.register_hook('response', self.handle_401)
        r.register_hook('response', self.handle_redirect)
        self._thread_local.num_401_calls = 1

        return r

    def __eq__(self, other):
        # Compared by attribute; a missing attribute on ``other`` is
        # treated as ``None``.
        return all([
            self.username == getattr(other, 'username', None),
            self.password == getattr(other, 'password', None)
        ])

    def __ne__(self, other):
        return not self == other