Type annotations for pip._internal.network.auth

This commit is contained in:
Devesh Kumar Singh 2020-05-20 23:12:23 +05:30
parent 803a6f2017
commit c469e11d47
1 changed files with 17 additions and 9 deletions

View File

@ -4,9 +4,6 @@ Contains interface (MultiDomainBasicAuth) and associated glue code for
providing credentials in the context of network requests.
"""
# The following comment should be removed at some point in the future.
# mypy: disallow-untyped-defs=False
import logging
from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth
@ -23,11 +20,10 @@ from pip._internal.utils.misc import (
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from optparse import Values
from typing import Dict, Optional, Tuple
from typing import Dict, Optional, Tuple, List, Any
from pip._internal.vcs.versioncontrol import AuthInfo
from pip._vendor.requests.models import Response, Request
Credentials = Tuple[str, str, str]
logger = logging.getLogger(__name__)
@ -44,6 +40,7 @@ except Exception as exc:
def get_keyring_auth(url, username):
# type: (str, str) -> Optional[AuthInfo]
"""Return the tuple auth for a given url from keyring."""
if not url or not keyring:
return None
@ -70,12 +67,13 @@ def get_keyring_auth(url, username):
logger.warning(
"Keyring is skipped due to an exception: %s", str(exc),
)
return None
class MultiDomainBasicAuth(AuthBase):
def __init__(self, prompting=True, index_urls=None):
# type: (bool, Optional[Values]) -> None
# type: (bool, Optional[List[str]]) -> None
self.prompting = prompting
self.index_urls = index_urls
self.passwords = {} # type: Dict[str, AuthInfo]
@ -87,6 +85,7 @@ class MultiDomainBasicAuth(AuthBase):
self._credentials_to_save = None # type: Optional[Credentials]
def _get_index_url(self, url):
# type: (str) -> Optional[str]
"""Return the original index URL matching the requested URL.
Cached or dynamically generated credentials may work against
@ -106,9 +105,11 @@ class MultiDomainBasicAuth(AuthBase):
prefix = remove_auth_from_url(u).rstrip("/") + "/"
if url.startswith(prefix):
return u
return None
def _get_new_credentials(self, original_url, allow_netrc=True,
allow_keyring=True):
# type: (str, bool, bool) -> AuthInfo
"""Find and return credentials for the specified URL."""
# Split the credentials and netloc from the url.
url, netloc, url_user_password = split_auth_netloc_from_url(
@ -158,6 +159,7 @@ class MultiDomainBasicAuth(AuthBase):
return username, password
def _get_url_and_credentials(self, original_url):
# type: (str) -> Tuple[str, Optional[str], Optional[str]]
"""Return the credentials to use for the provided URL.
If allowed, netrc and keyring may be used to obtain the
@ -178,7 +180,7 @@ class MultiDomainBasicAuth(AuthBase):
username, password = self._get_new_credentials(original_url)
if username is not None or password is not None:
# Convert the username and password if they're None, so that
# Convert the username and password if they 're None, so that
# this netloc will show up as "cached" in the conditional above.
# Further, HTTPBasicAuth doesn't accept None, so it makes sense to
# cache the value that is going to be used.
@ -198,6 +200,7 @@ class MultiDomainBasicAuth(AuthBase):
return url, username, password
def __call__(self, req):
# type: (Request) -> Request
# Get credentials for this request
url, username, password = self._get_url_and_credentials(req.url)
@ -215,9 +218,10 @@ class MultiDomainBasicAuth(AuthBase):
# Factored out to allow for easy patching in tests
def _prompt_for_password(self, netloc):
# type: (str) -> Tuple[Optional[str], Optional[str], bool]
username = ask_input("User for {}: ".format(netloc))
if not username:
return None, None
return None, None, False
auth = get_keyring_auth(netloc, username)
if auth and auth[0] is not None and auth[1] is not None:
return auth[0], auth[1], False
@ -226,11 +230,13 @@ class MultiDomainBasicAuth(AuthBase):
# Factored out to allow for easy patching in tests
def _should_save_password_to_keyring(self):
# type: () -> bool
if not keyring:
return False
return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y"
def handle_401(self, resp, **kwargs):
# type: (Response, **Any) -> None
# We only care about 401 responses, anything else we want to just
# pass through the actual response
if resp.status_code != 401:
@ -276,6 +282,7 @@ class MultiDomainBasicAuth(AuthBase):
return new_resp
def warn_on_401(self, resp, **kwargs):
# type: (Response, **Any) -> None
"""Response callback to warn about incorrect credentials."""
if resp.status_code == 401:
logger.warning(
@ -283,6 +290,7 @@ class MultiDomainBasicAuth(AuthBase):
)
def save_credentials(self, resp, **kwargs):
# type: (Response, **Any) -> None
"""Response callback to save credentials on success."""
assert keyring is not None, "should never reach here without keyring"
if not keyring: