Merge pull request #8277 from deveshks/types-auth-encoding-unpacking

This commit is contained in:
Pradyun Gedam 2020-05-23 17:13:03 +05:30 committed by GitHub
commit 94cb8f582c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 22 additions and 15 deletions

View File

@ -4,9 +4,6 @@ Contains interface (MultiDomainBasicAuth) and associated glue code for
providing credentials in the context of network requests.
"""
# The following comment should be removed at some point in the future.
# mypy: disallow-untyped-defs=False
import logging
from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth
@ -23,11 +20,12 @@ from pip._internal.utils.misc import (
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from optparse import Values
from typing import Dict, Optional, Tuple
from typing import Dict, Optional, Tuple, List, Any
from pip._internal.vcs.versioncontrol import AuthInfo
from pip._vendor.requests.models import Response, Request
Credentials = Tuple[str, str, str]
logger = logging.getLogger(__name__)
@ -44,6 +42,7 @@ except Exception as exc:
def get_keyring_auth(url, username):
# type: (str, str) -> Optional[AuthInfo]
"""Return the tuple auth for a given url from keyring."""
if not url or not keyring:
return None
@ -70,12 +69,13 @@ def get_keyring_auth(url, username):
logger.warning(
"Keyring is skipped due to an exception: %s", str(exc),
)
return None
class MultiDomainBasicAuth(AuthBase):
def __init__(self, prompting=True, index_urls=None):
# type: (bool, Optional[Values]) -> None
# type: (bool, Optional[List[str]]) -> None
self.prompting = prompting
self.index_urls = index_urls
self.passwords = {} # type: Dict[str, AuthInfo]
@ -87,6 +87,7 @@ class MultiDomainBasicAuth(AuthBase):
self._credentials_to_save = None # type: Optional[Credentials]
def _get_index_url(self, url):
# type: (str) -> Optional[str]
"""Return the original index URL matching the requested URL.
Cached or dynamically generated credentials may work against
@ -106,9 +107,11 @@ class MultiDomainBasicAuth(AuthBase):
prefix = remove_auth_from_url(u).rstrip("/") + "/"
if url.startswith(prefix):
return u
return None
def _get_new_credentials(self, original_url, allow_netrc=True,
allow_keyring=True):
# type: (str, bool, bool) -> AuthInfo
"""Find and return credentials for the specified URL."""
# Split the credentials and netloc from the url.
url, netloc, url_user_password = split_auth_netloc_from_url(
@ -158,6 +161,7 @@ class MultiDomainBasicAuth(AuthBase):
return username, password
def _get_url_and_credentials(self, original_url):
# type: (str) -> Tuple[str, Optional[str], Optional[str]]
"""Return the credentials to use for the provided URL.
If allowed, netrc and keyring may be used to obtain the
@ -198,6 +202,7 @@ class MultiDomainBasicAuth(AuthBase):
return url, username, password
def __call__(self, req):
# type: (Request) -> Request
# Get credentials for this request
url, username, password = self._get_url_and_credentials(req.url)
@ -215,9 +220,10 @@ class MultiDomainBasicAuth(AuthBase):
# Factored out to allow for easy patching in tests
def _prompt_for_password(self, netloc):
# type: (str) -> Tuple[Optional[str], Optional[str], bool]
username = ask_input("User for {}: ".format(netloc))
if not username:
return None, None
return None, None, False
auth = get_keyring_auth(netloc, username)
if auth and auth[0] is not None and auth[1] is not None:
return auth[0], auth[1], False
@ -226,11 +232,13 @@ class MultiDomainBasicAuth(AuthBase):
# Factored out to allow for easy patching in tests
def _should_save_password_to_keyring(self):
# type: () -> bool
if not keyring:
return False
return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y"
def handle_401(self, resp, **kwargs):
# type: (Response, **Any) -> Response
# We only care about 401 responses, anything else we want to just
# pass through the actual response
if resp.status_code != 401:
@ -276,6 +284,7 @@ class MultiDomainBasicAuth(AuthBase):
return new_resp
def warn_on_401(self, resp, **kwargs):
# type: (Response, **Any) -> None
"""Response callback to warn about incorrect credentials."""
if resp.status_code == 401:
logger.warning(
@ -283,6 +292,7 @@ class MultiDomainBasicAuth(AuthBase):
)
def save_credentials(self, resp, **kwargs):
# type: (Response, **Any) -> None
"""Response callback to save credentials on success."""
assert keyring is not None, "should never reach here without keyring"
if not keyring:

View File

@ -1,6 +1,3 @@
# The following comment should be removed at some point in the future.
# mypy: strict-optional=False
import codecs
import locale
import re
@ -35,7 +32,9 @@ def auto_decode(data):
# Lets check the first two lines as in PEP263
for line in data.split(b'\n')[:2]:
if line[0:1] == b'#' and ENCODING_RE.search(line):
encoding = ENCODING_RE.search(line).groups()[0].decode('ascii')
result = ENCODING_RE.search(line)
assert result is not None
encoding = result.groups()[0].decode('ascii')
return data.decode(encoding)
return data.decode(
locale.getpreferredencoding(False) or sys.getdefaultencoding(),

View File

@ -1,10 +1,6 @@
"""Utilities related archives.
"""
# The following comment should be removed at some point in the future.
# mypy: strict-optional=False
# mypy: disallow-untyped-defs=False
from __future__ import absolute_import
import logging
@ -48,6 +44,7 @@ except ImportError:
def current_umask():
# type: () -> int
"""Get the current umask which involves having to set it temporarily."""
mask = os.umask(0)
os.umask(mask)
@ -219,6 +216,7 @@ def untar_file(filename, location):
)
continue
ensure_dir(os.path.dirname(path))
assert fp is not None
with open(path, 'wb') as destfp:
shutil.copyfileobj(fp, destfp)
fp.close()