# -*- coding: utf-8 -*-
"""
requests.auth
~~~~~~~~~~~~~

This module contains the authentication handlers for Requests.
"""
"""Returns a Basic Auth string."""
# "I want us to put a big-ol' comment on top of it that # says that this behaviour is dumb but we need to preserve # it because people are relying on it." # - Lukasa # # These are here solely to maintain backwards compatibility # for things like ints. This will be removed in 3.0.0. if not isinstance(username, basestring): warnings.warn( "Non-string usernames will no longer be supported in Requests " "3.0.0. Please convert the object you've passed in ({!r}) to " "a string or bytes object in the near future to avoid " "problems.".format(username), category=DeprecationWarning, ) username = str(username)
if not isinstance(password, basestring): warnings.warn( "Non-string passwords will no longer be supported in Requests " "3.0.0. Please convert the object you've passed in ({!r}) to " "a string or bytes object in the near future to avoid " "problems.".format(type(password)), category=DeprecationWarning, ) password = str(password) # -- End Removal --
if isinstance(username, str): username = username.encode('latin1')
if isinstance(password, str): password = password.encode('latin1')
authstr = 'Basic ' + to_native_string( b64encode(b':'.join((username, password))).strip() )
return authstr
"""Base class that all auth implementations derive from"""
raise NotImplementedError('Auth hooks must be callable.')
"""Attaches HTTP Basic Authentication to the given Request object."""
self.username = username self.password = password
return all([ self.username == getattr(other, 'username', None), self.password == getattr(other, 'password', None) ])
return not self == other
r.headers['Authorization'] = _basic_auth_str(self.username, self.password) return r
"""Attaches HTTP Proxy Authentication to a given Request object."""
r.headers['Proxy-Authorization'] = _basic_auth_str(self.username, self.password) return r
"""Attaches HTTP Digest Authentication to the given Request object."""
self.username = username self.password = password # Keep state in per-thread local storage self._thread_local = threading.local()
# Ensure state is initialized just once per-thread if not hasattr(self._thread_local, 'init'): self._thread_local.init = True self._thread_local.last_nonce = '' self._thread_local.nonce_count = 0 self._thread_local.chal = {} self._thread_local.pos = None self._thread_local.num_401_calls = None
""" :rtype: str """
realm = self._thread_local.chal['realm'] nonce = self._thread_local.chal['nonce'] qop = self._thread_local.chal.get('qop') algorithm = self._thread_local.chal.get('algorithm') opaque = self._thread_local.chal.get('opaque') hash_utf8 = None
if algorithm is None: _algorithm = 'MD5' else: _algorithm = algorithm.upper() # lambdas assume digest modules are imported at the top level if _algorithm == 'MD5' or _algorithm == 'MD5-SESS': def md5_utf8(x): if isinstance(x, str): x = x.encode('utf-8') return hashlib.md5(x).hexdigest() hash_utf8 = md5_utf8 elif _algorithm == 'SHA': def sha_utf8(x): if isinstance(x, str): x = x.encode('utf-8') return hashlib.sha1(x).hexdigest() hash_utf8 = sha_utf8 elif _algorithm == 'SHA-256': def sha256_utf8(x): if isinstance(x, str): x = x.encode('utf-8') return hashlib.sha256(x).hexdigest() hash_utf8 = sha256_utf8 elif _algorithm == 'SHA-512': def sha512_utf8(x): if isinstance(x, str): x = x.encode('utf-8') return hashlib.sha512(x).hexdigest() hash_utf8 = sha512_utf8
KD = lambda s, d: hash_utf8("%s:%s" % (s, d))
if hash_utf8 is None: return None
# XXX not implemented yet entdig = None p_parsed = urlparse(url) #: path is request-uri defined in RFC 2616 which should not be empty path = p_parsed.path or "/" if p_parsed.query: path += '?' + p_parsed.query
A1 = '%s:%s:%s' % (self.username, realm, self.password) A2 = '%s:%s' % (method, path)
HA1 = hash_utf8(A1) HA2 = hash_utf8(A2)
if nonce == self._thread_local.last_nonce: self._thread_local.nonce_count += 1 else: self._thread_local.nonce_count = 1 ncvalue = '%08x' % self._thread_local.nonce_count s = str(self._thread_local.nonce_count).encode('utf-8') s += nonce.encode('utf-8') s += time.ctime().encode('utf-8') s += os.urandom(8)
cnonce = (hashlib.sha1(s).hexdigest()[:16]) if _algorithm == 'MD5-SESS': HA1 = hash_utf8('%s:%s:%s' % (HA1, nonce, cnonce))
if not qop: respdig = KD(HA1, "%s:%s" % (nonce, HA2)) elif qop == 'auth' or 'auth' in qop.split(','): noncebit = "%s:%s:%s:%s:%s" % ( nonce, ncvalue, cnonce, 'auth', HA2 ) respdig = KD(HA1, noncebit) else: # XXX handle auth-int. return None
self._thread_local.last_nonce = nonce
# XXX should the partial digests be encoded too? base = 'username="%s", realm="%s", nonce="%s", uri="%s", ' \ 'response="%s"' % (self.username, realm, nonce, path, respdig) if opaque: base += ', opaque="%s"' % opaque if algorithm: base += ', algorithm="%s"' % algorithm if entdig: base += ', digest="%s"' % entdig if qop: base += ', qop="auth", nc=%s, cnonce="%s"' % (ncvalue, cnonce)
return 'Digest %s' % (base)
"""Reset num_401_calls counter on redirects.""" if r.is_redirect: self._thread_local.num_401_calls = 1
""" Takes the given response and tries digest-auth, if needed.
:rtype: requests.Response """
# If response is not 4xx, do not auth # See https://github.com/psf/requests/issues/3772 if not 400 <= r.status_code < 500: self._thread_local.num_401_calls = 1 return r
if self._thread_local.pos is not None: # Rewind the file position indicator of the body to where # it was to resend the request. r.request.body.seek(self._thread_local.pos) s_auth = r.headers.get('www-authenticate', '')
if 'digest' in s_auth.lower() and self._thread_local.num_401_calls < 2:
self._thread_local.num_401_calls += 1 pat = re.compile(r'digest ', flags=re.IGNORECASE) self._thread_local.chal = parse_dict_header(pat.sub('', s_auth, count=1))
# Consume content and release the original connection # to allow our new request to reuse the same one. r.content r.close() prep = r.request.copy() extract_cookies_to_jar(prep._cookies, r.request, r.raw) prep.prepare_cookies(prep._cookies)
prep.headers['Authorization'] = self.build_digest_header( prep.method, prep.url) _r = r.connection.send(prep, **kwargs) _r.history.append(r) _r.request = prep
return _r
self._thread_local.num_401_calls = 1 return r
# Initialize per-thread state, if needed self.init_per_thread_state() # If we have a saved nonce, skip the 401 if self._thread_local.last_nonce: r.headers['Authorization'] = self.build_digest_header(r.method, r.url) try: self._thread_local.pos = r.body.tell() except AttributeError: # In the case of HTTPDigestAuth being reused and the body of # the previous request was a file-like object, pos has the # file position of the previous body. Ensure it's set to # None. self._thread_local.pos = None r.register_hook('response', self.handle_401) r.register_hook('response', self.handle_redirect) self._thread_local.num_401_calls = 1
return r
return all([ self.username == getattr(other, 'username', None), self.password == getattr(other, 'password', None) ])
return not self == other |