Move MultiDomainBasicAuth to networking.auth

This commit is contained in:
Chris Hunt 2019-09-19 21:02:22 -04:00
parent e9274f6548
commit f8d03e4baa
4 changed files with 312 additions and 306 deletions

View File

@ -17,10 +17,8 @@ from pip._vendor.cachecontrol import CacheControlAdapter
from pip._vendor.cachecontrol.cache import BaseCache
from pip._vendor.cachecontrol.caches import FileCache
from pip._vendor.requests.adapters import BaseAdapter, HTTPAdapter
from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth
from pip._vendor.requests.models import CONTENT_CHUNK_SIZE, Response
from pip._vendor.requests.structures import CaseInsensitiveDict
from pip._vendor.requests.utils import get_netrc_auth
from pip._vendor.six import PY2
# NOTE: XMLRPC Client is not annotated in typeshed as on 2017-07-17, which is
# why we ignore the type on this import
@ -30,7 +28,7 @@ from pip._vendor.six.moves.urllib import parse as urllib_parse
import pip
from pip._internal.exceptions import HashMismatch, InstallationError
from pip._internal.models.index import PyPI
from pip._internal.networking.auth import get_keyring_auth, keyring
from pip._internal.networking.auth import MultiDomainBasicAuth
# Import ssl from compat so the initial import occurs in only one place.
from pip._internal.utils.compat import HAS_TLS, ipaddress, ssl
from pip._internal.utils.encoding import auto_decode
@ -42,9 +40,6 @@ from pip._internal.utils.filesystem import (
)
from pip._internal.utils.glibc import libc_ver
from pip._internal.utils.misc import (
ask,
ask_input,
ask_password,
ask_path_exists,
backup_dir,
build_url_from_netloc,
@ -57,9 +52,7 @@ from pip._internal.utils.misc import (
parse_netloc,
path_to_display,
path_to_url,
remove_auth_from_url,
rmtree,
split_auth_netloc_from_url,
splitext,
)
from pip._internal.utils.temp_dir import TempDirectory
@ -71,17 +64,15 @@ from pip._internal.vcs import vcs
if MYPY_CHECK_RUNNING:
from typing import (
IO, Callable, Dict, Iterator, List, Optional, Text, Tuple, Union,
IO, Callable, Iterator, List, Optional, Text, Tuple, Union,
)
from optparse import Values
from mypy_extensions import TypedDict
from pip._internal.models.link import Link
from pip._internal.utils.hashes import Hashes
from pip._internal.vcs.versioncontrol import AuthInfo, VersionControl
from pip._internal.vcs.versioncontrol import VersionControl
Credentials = Tuple[str, str, str]
SecureOrigin = Tuple[str, str, Optional[Union[int, str]]]
if PY2:
@ -239,228 +230,6 @@ def user_agent():
)
class MultiDomainBasicAuth(AuthBase):
def __init__(self, prompting=True, index_urls=None):
# type: (bool, Optional[Values]) -> None
self.prompting = prompting
self.index_urls = index_urls
self.passwords = {} # type: Dict[str, AuthInfo]
# When the user is prompted to enter credentials and keyring is
# available, we will offer to save them. If the user accepts,
# this value is set to the credentials they entered. After the
# request authenticates, the caller should call
# ``save_credentials`` to save these.
self._credentials_to_save = None # type: Optional[Credentials]
def _get_index_url(self, url):
"""Return the original index URL matching the requested URL.
Cached or dynamically generated credentials may work against
the original index URL rather than just the netloc.
The provided url should have had its username and password
removed already. If the original index url had credentials then
they will be included in the return value.
Returns None if no matching index was found, or if --no-index
was specified by the user.
"""
if not url or not self.index_urls:
return None
for u in self.index_urls:
prefix = remove_auth_from_url(u).rstrip("/") + "/"
if url.startswith(prefix):
return u
def _get_new_credentials(self, original_url, allow_netrc=True,
allow_keyring=True):
"""Find and return credentials for the specified URL."""
# Split the credentials and netloc from the url.
url, netloc, url_user_password = split_auth_netloc_from_url(
original_url)
# Start with the credentials embedded in the url
username, password = url_user_password
if username is not None and password is not None:
logger.debug("Found credentials in url for %s", netloc)
return url_user_password
# Find a matching index url for this request
index_url = self._get_index_url(url)
if index_url:
# Split the credentials from the url.
index_info = split_auth_netloc_from_url(index_url)
if index_info:
index_url, _, index_url_user_password = index_info
logger.debug("Found index url %s", index_url)
# If an index URL was found, try its embedded credentials
if index_url and index_url_user_password[0] is not None:
username, password = index_url_user_password
if username is not None and password is not None:
logger.debug("Found credentials in index url for %s", netloc)
return index_url_user_password
# Get creds from netrc if we still don't have them
if allow_netrc:
netrc_auth = get_netrc_auth(original_url)
if netrc_auth:
logger.debug("Found credentials in netrc for %s", netloc)
return netrc_auth
# If we don't have a password and keyring is available, use it.
if allow_keyring:
# The index url is more specific than the netloc, so try it first
kr_auth = (get_keyring_auth(index_url, username) or
get_keyring_auth(netloc, username))
if kr_auth:
logger.debug("Found credentials in keyring for %s", netloc)
return kr_auth
return username, password
def _get_url_and_credentials(self, original_url):
"""Return the credentials to use for the provided URL.
If allowed, netrc and keyring may be used to obtain the
correct credentials.
Returns (url_without_credentials, username, password). Note
that even if the original URL contains credentials, this
function may return a different username and password.
"""
url, netloc, _ = split_auth_netloc_from_url(original_url)
# Use any stored credentials that we have for this netloc
username, password = self.passwords.get(netloc, (None, None))
if username is None and password is None:
# No stored credentials. Acquire new credentials without prompting
# the user. (e.g. from netrc, keyring, or the URL itself)
username, password = self._get_new_credentials(original_url)
if username is not None or password is not None:
# Convert the username and password if they're None, so that
# this netloc will show up as "cached" in the conditional above.
# Further, HTTPBasicAuth doesn't accept None, so it makes sense to
# cache the value that is going to be used.
username = username or ""
password = password or ""
# Store any acquired credentials.
self.passwords[netloc] = (username, password)
assert (
# Credentials were found
(username is not None and password is not None) or
# Credentials were not found
(username is None and password is None)
), "Could not load credentials from url: {}".format(original_url)
return url, username, password
def __call__(self, req):
# Get credentials for this request
url, username, password = self._get_url_and_credentials(req.url)
# Set the url of the request to the url without any credentials
req.url = url
if username is not None and password is not None:
# Send the basic auth with this request
req = HTTPBasicAuth(username, password)(req)
# Attach a hook to handle 401 responses
req.register_hook("response", self.handle_401)
return req
# Factored out to allow for easy patching in tests
def _prompt_for_password(self, netloc):
username = ask_input("User for %s: " % netloc)
if not username:
return None, None
auth = get_keyring_auth(netloc, username)
if auth:
return auth[0], auth[1], False
password = ask_password("Password: ")
return username, password, True
# Factored out to allow for easy patching in tests
def _should_save_password_to_keyring(self):
if not keyring:
return False
return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y"
def handle_401(self, resp, **kwargs):
# We only care about 401 responses, anything else we want to just
# pass through the actual response
if resp.status_code != 401:
return resp
# We are not able to prompt the user so simply return the response
if not self.prompting:
return resp
parsed = urllib_parse.urlparse(resp.url)
# Prompt the user for a new username and password
username, password, save = self._prompt_for_password(parsed.netloc)
# Store the new username and password to use for future requests
self._credentials_to_save = None
if username is not None and password is not None:
self.passwords[parsed.netloc] = (username, password)
# Prompt to save the password to keyring
if save and self._should_save_password_to_keyring():
self._credentials_to_save = (parsed.netloc, username, password)
# Consume content and release the original connection to allow our new
# request to reuse the same one.
resp.content
resp.raw.release_conn()
# Add our new username and password to the request
req = HTTPBasicAuth(username or "", password or "")(resp.request)
req.register_hook("response", self.warn_on_401)
# On successful request, save the credentials that were used to
# keyring. (Note that if the user responded "no" above, this member
# is not set and nothing will be saved.)
if self._credentials_to_save:
req.register_hook("response", self.save_credentials)
# Send our new request
new_resp = resp.connection.send(req, **kwargs)
new_resp.history.append(resp)
return new_resp
def warn_on_401(self, resp, **kwargs):
"""Response callback to warn about incorrect credentials."""
if resp.status_code == 401:
logger.warning('401 Error, Credentials not correct for %s',
resp.request.url)
def save_credentials(self, resp, **kwargs):
"""Response callback to save credentials on success."""
assert keyring is not None, "should never reach here without keyring"
if not keyring:
return
creds = self._credentials_to_save
self._credentials_to_save = None
if creds and resp.status_code < 400:
try:
logger.info('Saving credentials to keyring')
keyring.set_password(*creds)
except Exception:
logger.exception('Failed to save credentials')
class LocalFSAdapter(BaseAdapter):
def send(self, request, stream=None, timeout=None, verify=None, cert=None,

View File

@ -1,5 +1,26 @@
import logging
from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth
from pip._vendor.requests.utils import get_netrc_auth
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._internal.utils.misc import (
ask,
ask_input,
ask_password,
remove_auth_from_url,
split_auth_netloc_from_url,
)
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from optparse import Values
from typing import Dict, Optional, Tuple
from pip._internal.vcs.versioncontrol import AuthInfo
Credentials = Tuple[str, str, str]
logger = logging.getLogger(__name__)
try:
@ -38,3 +59,225 @@ def get_keyring_auth(url, username):
except Exception as exc:
logger.warning("Keyring is skipped due to an exception: %s",
str(exc))
class MultiDomainBasicAuth(AuthBase):
def __init__(self, prompting=True, index_urls=None):
# type: (bool, Optional[Values]) -> None
self.prompting = prompting
self.index_urls = index_urls
self.passwords = {} # type: Dict[str, AuthInfo]
# When the user is prompted to enter credentials and keyring is
# available, we will offer to save them. If the user accepts,
# this value is set to the credentials they entered. After the
# request authenticates, the caller should call
# ``save_credentials`` to save these.
self._credentials_to_save = None # type: Optional[Credentials]
def _get_index_url(self, url):
"""Return the original index URL matching the requested URL.
Cached or dynamically generated credentials may work against
the original index URL rather than just the netloc.
The provided url should have had its username and password
removed already. If the original index url had credentials then
they will be included in the return value.
Returns None if no matching index was found, or if --no-index
was specified by the user.
"""
if not url or not self.index_urls:
return None
for u in self.index_urls:
prefix = remove_auth_from_url(u).rstrip("/") + "/"
if url.startswith(prefix):
return u
def _get_new_credentials(self, original_url, allow_netrc=True,
allow_keyring=True):
"""Find and return credentials for the specified URL."""
# Split the credentials and netloc from the url.
url, netloc, url_user_password = split_auth_netloc_from_url(
original_url)
# Start with the credentials embedded in the url
username, password = url_user_password
if username is not None and password is not None:
logger.debug("Found credentials in url for %s", netloc)
return url_user_password
# Find a matching index url for this request
index_url = self._get_index_url(url)
if index_url:
# Split the credentials from the url.
index_info = split_auth_netloc_from_url(index_url)
if index_info:
index_url, _, index_url_user_password = index_info
logger.debug("Found index url %s", index_url)
# If an index URL was found, try its embedded credentials
if index_url and index_url_user_password[0] is not None:
username, password = index_url_user_password
if username is not None and password is not None:
logger.debug("Found credentials in index url for %s", netloc)
return index_url_user_password
# Get creds from netrc if we still don't have them
if allow_netrc:
netrc_auth = get_netrc_auth(original_url)
if netrc_auth:
logger.debug("Found credentials in netrc for %s", netloc)
return netrc_auth
# If we don't have a password and keyring is available, use it.
if allow_keyring:
# The index url is more specific than the netloc, so try it first
kr_auth = (get_keyring_auth(index_url, username) or
get_keyring_auth(netloc, username))
if kr_auth:
logger.debug("Found credentials in keyring for %s", netloc)
return kr_auth
return username, password
def _get_url_and_credentials(self, original_url):
"""Return the credentials to use for the provided URL.
If allowed, netrc and keyring may be used to obtain the
correct credentials.
Returns (url_without_credentials, username, password). Note
that even if the original URL contains credentials, this
function may return a different username and password.
"""
url, netloc, _ = split_auth_netloc_from_url(original_url)
# Use any stored credentials that we have for this netloc
username, password = self.passwords.get(netloc, (None, None))
if username is None and password is None:
# No stored credentials. Acquire new credentials without prompting
# the user. (e.g. from netrc, keyring, or the URL itself)
username, password = self._get_new_credentials(original_url)
if username is not None or password is not None:
# Convert the username and password if they're None, so that
# this netloc will show up as "cached" in the conditional above.
# Further, HTTPBasicAuth doesn't accept None, so it makes sense to
# cache the value that is going to be used.
username = username or ""
password = password or ""
# Store any acquired credentials.
self.passwords[netloc] = (username, password)
assert (
# Credentials were found
(username is not None and password is not None) or
# Credentials were not found
(username is None and password is None)
), "Could not load credentials from url: {}".format(original_url)
return url, username, password
def __call__(self, req):
# Get credentials for this request
url, username, password = self._get_url_and_credentials(req.url)
# Set the url of the request to the url without any credentials
req.url = url
if username is not None and password is not None:
# Send the basic auth with this request
req = HTTPBasicAuth(username, password)(req)
# Attach a hook to handle 401 responses
req.register_hook("response", self.handle_401)
return req
# Factored out to allow for easy patching in tests
def _prompt_for_password(self, netloc):
username = ask_input("User for %s: " % netloc)
if not username:
return None, None
auth = get_keyring_auth(netloc, username)
if auth:
return auth[0], auth[1], False
password = ask_password("Password: ")
return username, password, True
# Factored out to allow for easy patching in tests
def _should_save_password_to_keyring(self):
if not keyring:
return False
return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y"
def handle_401(self, resp, **kwargs):
# We only care about 401 responses, anything else we want to just
# pass through the actual response
if resp.status_code != 401:
return resp
# We are not able to prompt the user so simply return the response
if not self.prompting:
return resp
parsed = urllib_parse.urlparse(resp.url)
# Prompt the user for a new username and password
username, password, save = self._prompt_for_password(parsed.netloc)
# Store the new username and password to use for future requests
self._credentials_to_save = None
if username is not None and password is not None:
self.passwords[parsed.netloc] = (username, password)
# Prompt to save the password to keyring
if save and self._should_save_password_to_keyring():
self._credentials_to_save = (parsed.netloc, username, password)
# Consume content and release the original connection to allow our new
# request to reuse the same one.
resp.content
resp.raw.release_conn()
# Add our new username and password to the request
req = HTTPBasicAuth(username or "", password or "")(resp.request)
req.register_hook("response", self.warn_on_401)
# On successful request, save the credentials that were used to
# keyring. (Note that if the user responded "no" above, this member
# is not set and nothing will be saved.)
if self._credentials_to_save:
req.register_hook("response", self.save_credentials)
# Send our new request
new_resp = resp.connection.send(req, **kwargs)
new_resp.history.append(resp)
return new_resp
def warn_on_401(self, resp, **kwargs):
"""Response callback to warn about incorrect credentials."""
if resp.status_code == 401:
logger.warning('401 Error, Credentials not correct for %s',
resp.request.url)
def save_credentials(self, resp, **kwargs):
"""Response callback to save credentials on success."""
assert keyring is not None, "should never reach here without keyring"
if not keyring:
return
creds = self._credentials_to_save
self._credentials_to_save = None
if creds and resp.status_code < 400:
try:
logger.info('Saving credentials to keyring')
keyring.set_password(*creds)
except Exception:
logger.exception('Failed to save credentials')

View File

@ -1,4 +1,3 @@
import functools
import hashlib
import logging
import os
@ -15,7 +14,6 @@ from pip._vendor.cachecontrol.caches import FileCache
import pip
from pip._internal.download import (
CI_ENVIRONMENT_VARIABLES,
MultiDomainBasicAuth,
PipSession,
SafeFileCache,
_copy_source_tree,
@ -727,67 +725,3 @@ class TestPipSession:
actual_level, actual_message = log_records[0]
assert actual_level == 'WARNING'
assert 'is not a trusted or secure host' in actual_message
@pytest.mark.parametrize(["input_url", "url", "username", "password"], [
(
"http://user%40email.com:password@example.com/path",
"http://example.com/path",
"user@email.com",
"password",
),
(
"http://username:password@example.com/path",
"http://example.com/path",
"username",
"password",
),
(
"http://token@example.com/path",
"http://example.com/path",
"token",
"",
),
(
"http://example.com/path",
"http://example.com/path",
None,
None,
),
])
def test_get_credentials_parses_correctly(input_url, url, username, password):
auth = MultiDomainBasicAuth()
get = auth._get_url_and_credentials
# Check URL parsing
assert get(input_url) == (url, username, password)
assert (
# There are no credentials in the URL
(username is None and password is None) or
# Credentials were found and "cached" appropriately
auth.passwords['example.com'] == (username, password)
)
def test_get_credentials_uses_cached_credentials():
auth = MultiDomainBasicAuth()
auth.passwords['example.com'] = ('user', 'pass')
got = auth._get_url_and_credentials("http://foo:bar@example.com/path")
expected = ('http://example.com/path', 'user', 'pass')
assert got == expected
def test_get_index_url_credentials():
auth = MultiDomainBasicAuth(index_urls=[
"http://foo:bar@example.com/path"
])
get = functools.partial(
auth._get_new_credentials,
allow_netrc=False,
allow_keyring=False
)
# Check resolution of indexes
assert get("http://example.com/path/path2") == ('foo', 'bar')
assert get("http://example.com/path3/path2") == (None, None)

View File

@ -3,10 +3,74 @@ import functools
import pytest
import pip._internal.networking.auth
from pip._internal.download import MultiDomainBasicAuth
from pip._internal.networking.auth import MultiDomainBasicAuth
from tests.unit.test_download import MockConnection, MockRequest, MockResponse
@pytest.mark.parametrize(["input_url", "url", "username", "password"], [
(
"http://user%40email.com:password@example.com/path",
"http://example.com/path",
"user@email.com",
"password",
),
(
"http://username:password@example.com/path",
"http://example.com/path",
"username",
"password",
),
(
"http://token@example.com/path",
"http://example.com/path",
"token",
"",
),
(
"http://example.com/path",
"http://example.com/path",
None,
None,
),
])
def test_get_credentials_parses_correctly(input_url, url, username, password):
auth = MultiDomainBasicAuth()
get = auth._get_url_and_credentials
# Check URL parsing
assert get(input_url) == (url, username, password)
assert (
# There are no credentials in the URL
(username is None and password is None) or
# Credentials were found and "cached" appropriately
auth.passwords['example.com'] == (username, password)
)
def test_get_credentials_uses_cached_credentials():
auth = MultiDomainBasicAuth()
auth.passwords['example.com'] = ('user', 'pass')
got = auth._get_url_and_credentials("http://foo:bar@example.com/path")
expected = ('http://example.com/path', 'user', 'pass')
assert got == expected
def test_get_index_url_credentials():
auth = MultiDomainBasicAuth(index_urls=[
"http://foo:bar@example.com/path"
])
get = functools.partial(
auth._get_new_credentials,
allow_netrc=False,
allow_keyring=False
)
# Check resolution of indexes
assert get("http://example.com/path/path2") == ('foo', 'bar')
assert get("http://example.com/path3/path2") == (None, None)
class KeyringModuleV1(object):
"""Represents the supported API of keyring before get_credential
was added.
@ -38,7 +102,6 @@ class KeyringModuleV1(object):
def test_keyring_get_password(monkeypatch, url, expect):
keyring = KeyringModuleV1()
monkeypatch.setattr('pip._internal.networking.auth.keyring', keyring)
monkeypatch.setattr('pip._internal.download.keyring', keyring)
auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"])
actual = auth._get_new_credentials(url, allow_netrc=False,
@ -49,14 +112,13 @@ def test_keyring_get_password(monkeypatch, url, expect):
def test_keyring_get_password_after_prompt(monkeypatch):
keyring = KeyringModuleV1()
monkeypatch.setattr('pip._internal.networking.auth.keyring', keyring)
monkeypatch.setattr('pip._internal.download.keyring', keyring)
auth = MultiDomainBasicAuth()
def ask_input(prompt):
assert prompt == "User for example.com: "
return "user"
monkeypatch.setattr('pip._internal.download.ask_input', ask_input)
monkeypatch.setattr('pip._internal.networking.auth.ask_input', ask_input)
actual = auth._prompt_for_password("example.com")
assert actual == ("user", "user!netloc", False)
@ -64,7 +126,6 @@ def test_keyring_get_password_after_prompt(monkeypatch):
def test_keyring_get_password_username_in_index(monkeypatch):
keyring = KeyringModuleV1()
monkeypatch.setattr('pip._internal.networking.auth.keyring', keyring)
monkeypatch.setattr('pip._internal.download.keyring', keyring)
auth = MultiDomainBasicAuth(index_urls=["http://user@example.com/path2"])
get = functools.partial(
auth._get_new_credentials,
@ -85,7 +146,6 @@ def test_keyring_set_password(monkeypatch, response_status, creds,
expect_save):
keyring = KeyringModuleV1()
monkeypatch.setattr('pip._internal.networking.auth.keyring', keyring)
monkeypatch.setattr('pip._internal.download.keyring', keyring)
auth = MultiDomainBasicAuth(prompting=True)
monkeypatch.setattr(auth, '_get_url_and_credentials',
lambda u: (u, None, None))