Create new module: network.auth (#7045)

This commit is contained in:
Pradyun Gedam 2019-09-20 22:43:34 +05:30 committed by GitHub
commit 0a2238ac38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 516 additions and 485 deletions

View File

@ -17,10 +17,8 @@ from pip._vendor.cachecontrol import CacheControlAdapter
from pip._vendor.cachecontrol.cache import BaseCache
from pip._vendor.cachecontrol.caches import FileCache
from pip._vendor.requests.adapters import BaseAdapter, HTTPAdapter
from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth
from pip._vendor.requests.models import CONTENT_CHUNK_SIZE, Response
from pip._vendor.requests.structures import CaseInsensitiveDict
from pip._vendor.requests.utils import get_netrc_auth
from pip._vendor.six import PY2
# NOTE: XMLRPC Client is not annotated in typeshed as on 2017-07-17, which is
# why we ignore the type on this import
@ -30,6 +28,7 @@ from pip._vendor.six.moves.urllib import parse as urllib_parse
import pip
from pip._internal.exceptions import HashMismatch, InstallationError
from pip._internal.models.index import PyPI
from pip._internal.network.auth import MultiDomainBasicAuth
# Import ssl from compat so the initial import occurs in only one place.
from pip._internal.utils.compat import HAS_TLS, ipaddress, ssl
from pip._internal.utils.encoding import auto_decode
@ -41,9 +40,6 @@ from pip._internal.utils.filesystem import (
)
from pip._internal.utils.glibc import libc_ver
from pip._internal.utils.misc import (
ask,
ask_input,
ask_password,
ask_path_exists,
backup_dir,
build_url_from_netloc,
@ -56,9 +52,7 @@ from pip._internal.utils.misc import (
parse_netloc,
path_to_display,
path_to_url,
remove_auth_from_url,
rmtree,
split_auth_netloc_from_url,
splitext,
)
from pip._internal.utils.temp_dir import TempDirectory
@ -70,17 +64,15 @@ from pip._internal.vcs import vcs
if MYPY_CHECK_RUNNING:
from typing import (
IO, Callable, Dict, Iterator, List, Optional, Text, Tuple, Union,
IO, Callable, Iterator, List, Optional, Text, Tuple, Union,
)
from optparse import Values
from mypy_extensions import TypedDict
from pip._internal.models.link import Link
from pip._internal.utils.hashes import Hashes
from pip._internal.vcs.versioncontrol import AuthInfo, VersionControl
from pip._internal.vcs.versioncontrol import VersionControl
Credentials = Tuple[str, str, str]
SecureOrigin = Tuple[str, str, Optional[Union[int, str]]]
if PY2:
@ -116,16 +108,6 @@ __all__ = ['get_file_content',
logger = logging.getLogger(__name__)
try:
import keyring # noqa
except ImportError:
keyring = None
except Exception as exc:
logger.warning("Keyring is skipped due to an exception: %s",
str(exc))
keyring = None
SECURE_ORIGINS = [
# protocol, hostname, port
# Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
@ -248,256 +230,6 @@ def user_agent():
)
def _get_keyring_auth(url, username):
"""Return the tuple auth for a given url from keyring."""
if not url or not keyring:
return None
try:
try:
get_credential = keyring.get_credential
except AttributeError:
pass
else:
logger.debug("Getting credentials from keyring for %s", url)
cred = get_credential(url, username)
if cred is not None:
return cred.username, cred.password
return None
if username:
logger.debug("Getting password from keyring for %s", url)
password = keyring.get_password(url, username)
if password:
return username, password
except Exception as exc:
logger.warning("Keyring is skipped due to an exception: %s",
str(exc))
class MultiDomainBasicAuth(AuthBase):
def __init__(self, prompting=True, index_urls=None):
# type: (bool, Optional[Values]) -> None
self.prompting = prompting
self.index_urls = index_urls
self.passwords = {} # type: Dict[str, AuthInfo]
# When the user is prompted to enter credentials and keyring is
# available, we will offer to save them. If the user accepts,
# this value is set to the credentials they entered. After the
# request authenticates, the caller should call
# ``save_credentials`` to save these.
self._credentials_to_save = None # type: Optional[Credentials]
def _get_index_url(self, url):
"""Return the original index URL matching the requested URL.
Cached or dynamically generated credentials may work against
the original index URL rather than just the netloc.
The provided url should have had its username and password
removed already. If the original index url had credentials then
they will be included in the return value.
Returns None if no matching index was found, or if --no-index
was specified by the user.
"""
if not url or not self.index_urls:
return None
for u in self.index_urls:
prefix = remove_auth_from_url(u).rstrip("/") + "/"
if url.startswith(prefix):
return u
def _get_new_credentials(self, original_url, allow_netrc=True,
allow_keyring=True):
"""Find and return credentials for the specified URL."""
# Split the credentials and netloc from the url.
url, netloc, url_user_password = split_auth_netloc_from_url(
original_url)
# Start with the credentials embedded in the url
username, password = url_user_password
if username is not None and password is not None:
logger.debug("Found credentials in url for %s", netloc)
return url_user_password
# Find a matching index url for this request
index_url = self._get_index_url(url)
if index_url:
# Split the credentials from the url.
index_info = split_auth_netloc_from_url(index_url)
if index_info:
index_url, _, index_url_user_password = index_info
logger.debug("Found index url %s", index_url)
# If an index URL was found, try its embedded credentials
if index_url and index_url_user_password[0] is not None:
username, password = index_url_user_password
if username is not None and password is not None:
logger.debug("Found credentials in index url for %s", netloc)
return index_url_user_password
# Get creds from netrc if we still don't have them
if allow_netrc:
netrc_auth = get_netrc_auth(original_url)
if netrc_auth:
logger.debug("Found credentials in netrc for %s", netloc)
return netrc_auth
# If we don't have a password and keyring is available, use it.
if allow_keyring:
# The index url is more specific than the netloc, so try it first
kr_auth = (_get_keyring_auth(index_url, username) or
_get_keyring_auth(netloc, username))
if kr_auth:
logger.debug("Found credentials in keyring for %s", netloc)
return kr_auth
return username, password
def _get_url_and_credentials(self, original_url):
"""Return the credentials to use for the provided URL.
If allowed, netrc and keyring may be used to obtain the
correct credentials.
Returns (url_without_credentials, username, password). Note
that even if the original URL contains credentials, this
function may return a different username and password.
"""
url, netloc, _ = split_auth_netloc_from_url(original_url)
# Use any stored credentials that we have for this netloc
username, password = self.passwords.get(netloc, (None, None))
if username is None and password is None:
# No stored credentials. Acquire new credentials without prompting
# the user. (e.g. from netrc, keyring, or the URL itself)
username, password = self._get_new_credentials(original_url)
if username is not None or password is not None:
# Convert the username and password if they're None, so that
# this netloc will show up as "cached" in the conditional above.
# Further, HTTPBasicAuth doesn't accept None, so it makes sense to
# cache the value that is going to be used.
username = username or ""
password = password or ""
# Store any acquired credentials.
self.passwords[netloc] = (username, password)
assert (
# Credentials were found
(username is not None and password is not None) or
# Credentials were not found
(username is None and password is None)
), "Could not load credentials from url: {}".format(original_url)
return url, username, password
def __call__(self, req):
# Get credentials for this request
url, username, password = self._get_url_and_credentials(req.url)
# Set the url of the request to the url without any credentials
req.url = url
if username is not None and password is not None:
# Send the basic auth with this request
req = HTTPBasicAuth(username, password)(req)
# Attach a hook to handle 401 responses
req.register_hook("response", self.handle_401)
return req
# Factored out to allow for easy patching in tests
def _prompt_for_password(self, netloc):
username = ask_input("User for %s: " % netloc)
if not username:
return None, None
auth = _get_keyring_auth(netloc, username)
if auth:
return auth[0], auth[1], False
password = ask_password("Password: ")
return username, password, True
# Factored out to allow for easy patching in tests
def _should_save_password_to_keyring(self):
if not keyring:
return False
return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y"
def handle_401(self, resp, **kwargs):
# We only care about 401 responses, anything else we want to just
# pass through the actual response
if resp.status_code != 401:
return resp
# We are not able to prompt the user so simply return the response
if not self.prompting:
return resp
parsed = urllib_parse.urlparse(resp.url)
# Prompt the user for a new username and password
username, password, save = self._prompt_for_password(parsed.netloc)
# Store the new username and password to use for future requests
self._credentials_to_save = None
if username is not None and password is not None:
self.passwords[parsed.netloc] = (username, password)
# Prompt to save the password to keyring
if save and self._should_save_password_to_keyring():
self._credentials_to_save = (parsed.netloc, username, password)
# Consume content and release the original connection to allow our new
# request to reuse the same one.
resp.content
resp.raw.release_conn()
# Add our new username and password to the request
req = HTTPBasicAuth(username or "", password or "")(resp.request)
req.register_hook("response", self.warn_on_401)
# On successful request, save the credentials that were used to
# keyring. (Note that if the user responded "no" above, this member
# is not set and nothing will be saved.)
if self._credentials_to_save:
req.register_hook("response", self.save_credentials)
# Send our new request
new_resp = resp.connection.send(req, **kwargs)
new_resp.history.append(resp)
return new_resp
def warn_on_401(self, resp, **kwargs):
"""Response callback to warn about incorrect credentials."""
if resp.status_code == 401:
logger.warning('401 Error, Credentials not correct for %s',
resp.request.url)
def save_credentials(self, resp, **kwargs):
"""Response callback to save credentials on success."""
assert keyring is not None, "should never reach here without keyring"
if not keyring:
return
creds = self._credentials_to_save
self._credentials_to_save = None
if creds and resp.status_code < 400:
try:
logger.info('Saving credentials to keyring')
keyring.set_password(*creds)
except Exception:
logger.exception('Failed to save credentials')
class LocalFSAdapter(BaseAdapter):
def send(self, request, stream=None, timeout=None, verify=None, cert=None,

View File

View File

@ -0,0 +1,289 @@
import logging
from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth
from pip._vendor.requests.utils import get_netrc_auth
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._internal.utils.misc import (
ask,
ask_input,
ask_password,
remove_auth_from_url,
split_auth_netloc_from_url,
)
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from optparse import Values
from typing import Dict, Optional, Tuple
from pip._internal.vcs.versioncontrol import AuthInfo
Credentials = Tuple[str, str, str]
logger = logging.getLogger(__name__)
try:
import keyring # noqa
except ImportError:
keyring = None
except Exception as exc:
logger.warning(
"Keyring is skipped due to an exception: %s", str(exc),
)
keyring = None
def get_keyring_auth(url, username):
"""Return the tuple auth for a given url from keyring."""
if not url or not keyring:
return None
try:
try:
get_credential = keyring.get_credential
except AttributeError:
pass
else:
logger.debug("Getting credentials from keyring for %s", url)
cred = get_credential(url, username)
if cred is not None:
return cred.username, cred.password
return None
if username:
logger.debug("Getting password from keyring for %s", url)
password = keyring.get_password(url, username)
if password:
return username, password
except Exception as exc:
logger.warning(
"Keyring is skipped due to an exception: %s", str(exc),
)
class MultiDomainBasicAuth(AuthBase):
def __init__(self, prompting=True, index_urls=None):
# type: (bool, Optional[Values]) -> None
self.prompting = prompting
self.index_urls = index_urls
self.passwords = {} # type: Dict[str, AuthInfo]
# When the user is prompted to enter credentials and keyring is
# available, we will offer to save them. If the user accepts,
# this value is set to the credentials they entered. After the
# request authenticates, the caller should call
# ``save_credentials`` to save these.
self._credentials_to_save = None # type: Optional[Credentials]
def _get_index_url(self, url):
"""Return the original index URL matching the requested URL.
Cached or dynamically generated credentials may work against
the original index URL rather than just the netloc.
The provided url should have had its username and password
removed already. If the original index url had credentials then
they will be included in the return value.
Returns None if no matching index was found, or if --no-index
was specified by the user.
"""
if not url or not self.index_urls:
return None
for u in self.index_urls:
prefix = remove_auth_from_url(u).rstrip("/") + "/"
if url.startswith(prefix):
return u
def _get_new_credentials(self, original_url, allow_netrc=True,
allow_keyring=True):
"""Find and return credentials for the specified URL."""
# Split the credentials and netloc from the url.
url, netloc, url_user_password = split_auth_netloc_from_url(
original_url,
)
# Start with the credentials embedded in the url
username, password = url_user_password
if username is not None and password is not None:
logger.debug("Found credentials in url for %s", netloc)
return url_user_password
# Find a matching index url for this request
index_url = self._get_index_url(url)
if index_url:
# Split the credentials from the url.
index_info = split_auth_netloc_from_url(index_url)
if index_info:
index_url, _, index_url_user_password = index_info
logger.debug("Found index url %s", index_url)
# If an index URL was found, try its embedded credentials
if index_url and index_url_user_password[0] is not None:
username, password = index_url_user_password
if username is not None and password is not None:
logger.debug("Found credentials in index url for %s", netloc)
return index_url_user_password
# Get creds from netrc if we still don't have them
if allow_netrc:
netrc_auth = get_netrc_auth(original_url)
if netrc_auth:
logger.debug("Found credentials in netrc for %s", netloc)
return netrc_auth
# If we don't have a password and keyring is available, use it.
if allow_keyring:
# The index url is more specific than the netloc, so try it first
kr_auth = (
get_keyring_auth(index_url, username) or
get_keyring_auth(netloc, username)
)
if kr_auth:
logger.debug("Found credentials in keyring for %s", netloc)
return kr_auth
return username, password
def _get_url_and_credentials(self, original_url):
"""Return the credentials to use for the provided URL.
If allowed, netrc and keyring may be used to obtain the
correct credentials.
Returns (url_without_credentials, username, password). Note
that even if the original URL contains credentials, this
function may return a different username and password.
"""
url, netloc, _ = split_auth_netloc_from_url(original_url)
# Use any stored credentials that we have for this netloc
username, password = self.passwords.get(netloc, (None, None))
if username is None and password is None:
# No stored credentials. Acquire new credentials without prompting
# the user. (e.g. from netrc, keyring, or the URL itself)
username, password = self._get_new_credentials(original_url)
if username is not None or password is not None:
# Convert the username and password if they're None, so that
# this netloc will show up as "cached" in the conditional above.
# Further, HTTPBasicAuth doesn't accept None, so it makes sense to
# cache the value that is going to be used.
username = username or ""
password = password or ""
# Store any acquired credentials.
self.passwords[netloc] = (username, password)
assert (
# Credentials were found
(username is not None and password is not None) or
# Credentials were not found
(username is None and password is None)
), "Could not load credentials from url: {}".format(original_url)
return url, username, password
def __call__(self, req):
# Get credentials for this request
url, username, password = self._get_url_and_credentials(req.url)
# Set the url of the request to the url without any credentials
req.url = url
if username is not None and password is not None:
# Send the basic auth with this request
req = HTTPBasicAuth(username, password)(req)
# Attach a hook to handle 401 responses
req.register_hook("response", self.handle_401)
return req
# Factored out to allow for easy patching in tests
def _prompt_for_password(self, netloc):
username = ask_input("User for %s: " % netloc)
if not username:
return None, None
auth = get_keyring_auth(netloc, username)
if auth:
return auth[0], auth[1], False
password = ask_password("Password: ")
return username, password, True
# Factored out to allow for easy patching in tests
def _should_save_password_to_keyring(self):
if not keyring:
return False
return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y"
def handle_401(self, resp, **kwargs):
# We only care about 401 responses, anything else we want to just
# pass through the actual response
if resp.status_code != 401:
return resp
# We are not able to prompt the user so simply return the response
if not self.prompting:
return resp
parsed = urllib_parse.urlparse(resp.url)
# Prompt the user for a new username and password
username, password, save = self._prompt_for_password(parsed.netloc)
# Store the new username and password to use for future requests
self._credentials_to_save = None
if username is not None and password is not None:
self.passwords[parsed.netloc] = (username, password)
# Prompt to save the password to keyring
if save and self._should_save_password_to_keyring():
self._credentials_to_save = (parsed.netloc, username, password)
# Consume content and release the original connection to allow our new
# request to reuse the same one.
resp.content
resp.raw.release_conn()
# Add our new username and password to the request
req = HTTPBasicAuth(username or "", password or "")(resp.request)
req.register_hook("response", self.warn_on_401)
# On successful request, save the credentials that were used to
# keyring. (Note that if the user responded "no" above, this member
# is not set and nothing will be saved.)
if self._credentials_to_save:
req.register_hook("response", self.save_credentials)
# Send our new request
new_resp = resp.connection.send(req, **kwargs)
new_resp.history.append(resp)
return new_resp
def warn_on_401(self, resp, **kwargs):
"""Response callback to warn about incorrect credentials."""
if resp.status_code == 401:
logger.warning(
'401 Error, Credentials not correct for %s', resp.request.url,
)
def save_credentials(self, resp, **kwargs):
"""Response callback to save credentials on success."""
assert keyring is not None, "should never reach here without keyring"
if not keyring:
return
creds = self._credentials_to_save
self._credentials_to_save = None
if creds and resp.status_code < 400:
try:
logger.info('Saving credentials to keyring')
keyring.set_password(*creds)
except Exception:
logger.exception('Failed to save credentials')

View File

@ -1,4 +1,3 @@
import functools
import hashlib
import logging
import os
@ -15,7 +14,6 @@ from pip._vendor.cachecontrol.caches import FileCache
import pip
from pip._internal.download import (
CI_ENVIRONMENT_VARIABLES,
MultiDomainBasicAuth,
PipSession,
SafeFileCache,
_copy_source_tree,
@ -727,215 +725,3 @@ class TestPipSession:
actual_level, actual_message = log_records[0]
assert actual_level == 'WARNING'
assert 'is not a trusted or secure host' in actual_message
@pytest.mark.parametrize(["input_url", "url", "username", "password"], [
(
"http://user%40email.com:password@example.com/path",
"http://example.com/path",
"user@email.com",
"password",
),
(
"http://username:password@example.com/path",
"http://example.com/path",
"username",
"password",
),
(
"http://token@example.com/path",
"http://example.com/path",
"token",
"",
),
(
"http://example.com/path",
"http://example.com/path",
None,
None,
),
])
def test_get_credentials_parses_correctly(input_url, url, username, password):
auth = MultiDomainBasicAuth()
get = auth._get_url_and_credentials
# Check URL parsing
assert get(input_url) == (url, username, password)
assert (
# There are no credentials in the URL
(username is None and password is None) or
# Credentials were found and "cached" appropriately
auth.passwords['example.com'] == (username, password)
)
def test_get_credentials_uses_cached_credentials():
auth = MultiDomainBasicAuth()
auth.passwords['example.com'] = ('user', 'pass')
got = auth._get_url_and_credentials("http://foo:bar@example.com/path")
expected = ('http://example.com/path', 'user', 'pass')
assert got == expected
def test_get_index_url_credentials():
auth = MultiDomainBasicAuth(index_urls=[
"http://foo:bar@example.com/path"
])
get = functools.partial(
auth._get_new_credentials,
allow_netrc=False,
allow_keyring=False
)
# Check resolution of indexes
assert get("http://example.com/path/path2") == ('foo', 'bar')
assert get("http://example.com/path3/path2") == (None, None)
class KeyringModuleV1(object):
"""Represents the supported API of keyring before get_credential
was added.
"""
def __init__(self):
self.saved_passwords = []
def get_password(self, system, username):
if system == "example.com" and username:
return username + "!netloc"
if system == "http://example.com/path2" and username:
return username + "!url"
return None
def set_password(self, system, username, password):
self.saved_passwords.append((system, username, password))
@pytest.mark.parametrize('url, expect', (
("http://example.com/path1", (None, None)),
# path1 URLs will be resolved by netloc
("http://user@example.com/path1", ("user", "user!netloc")),
("http://user2@example.com/path1", ("user2", "user2!netloc")),
# path2 URLs will be resolved by index URL
("http://example.com/path2/path3", (None, None)),
("http://foo@example.com/path2/path3", ("foo", "foo!url")),
))
def test_keyring_get_password(monkeypatch, url, expect):
monkeypatch.setattr('pip._internal.download.keyring', KeyringModuleV1())
auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"])
actual = auth._get_new_credentials(url, allow_netrc=False,
allow_keyring=True)
assert actual == expect
def test_keyring_get_password_after_prompt(monkeypatch):
monkeypatch.setattr('pip._internal.download.keyring', KeyringModuleV1())
auth = MultiDomainBasicAuth()
def ask_input(prompt):
assert prompt == "User for example.com: "
return "user"
monkeypatch.setattr('pip._internal.download.ask_input', ask_input)
actual = auth._prompt_for_password("example.com")
assert actual == ("user", "user!netloc", False)
def test_keyring_get_password_username_in_index(monkeypatch):
monkeypatch.setattr('pip._internal.download.keyring', KeyringModuleV1())
auth = MultiDomainBasicAuth(index_urls=["http://user@example.com/path2"])
get = functools.partial(
auth._get_new_credentials,
allow_netrc=False,
allow_keyring=True
)
assert get("http://example.com/path2/path3") == ("user", "user!url")
assert get("http://example.com/path4/path1") == (None, None)
@pytest.mark.parametrize("response_status, creds, expect_save", (
(403, ("user", "pass", True), False),
(200, ("user", "pass", True), True,),
(200, ("user", "pass", False), False,),
))
def test_keyring_set_password(monkeypatch, response_status, creds,
expect_save):
keyring = KeyringModuleV1()
monkeypatch.setattr('pip._internal.download.keyring', keyring)
auth = MultiDomainBasicAuth(prompting=True)
monkeypatch.setattr(auth, '_get_url_and_credentials',
lambda u: (u, None, None))
monkeypatch.setattr(auth, '_prompt_for_password', lambda *a: creds)
if creds[2]:
# when _prompt_for_password indicates to save, we should save
def should_save_password_to_keyring(*a):
return True
else:
# when _prompt_for_password indicates not to save, we should
# never call this function
def should_save_password_to_keyring(*a):
assert False, ("_should_save_password_to_keyring should not be " +
"called")
monkeypatch.setattr(auth, '_should_save_password_to_keyring',
should_save_password_to_keyring)
req = MockRequest("https://example.com")
resp = MockResponse(b"")
resp.url = req.url
connection = MockConnection()
def _send(sent_req, **kwargs):
assert sent_req is req
assert "Authorization" in sent_req.headers
r = MockResponse(b"")
r.status_code = response_status
return r
connection._send = _send
resp.request = req
resp.status_code = 401
resp.connection = connection
auth.handle_401(resp)
if expect_save:
assert keyring.saved_passwords == [("example.com", creds[0], creds[1])]
else:
assert keyring.saved_passwords == []
class KeyringModuleV2(object):
"""Represents the current supported API of keyring"""
class Credential(object):
def __init__(self, username, password):
self.username = username
self.password = password
def get_password(self, system, username):
assert False, "get_password should not ever be called"
def get_credential(self, system, username):
if system == "http://example.com/path2":
return self.Credential("username", "url")
if system == "example.com":
return self.Credential("username", "netloc")
return None
@pytest.mark.parametrize('url, expect', (
("http://example.com/path1", ("username", "netloc")),
("http://example.com/path2/path3", ("username", "url")),
("http://user2@example.com/path2/path3", ("username", "url")),
))
def test_keyring_get_credential(monkeypatch, url, expect):
monkeypatch.setattr(pip._internal.download, 'keyring', KeyringModuleV2())
auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"])
assert auth._get_new_credentials(url, allow_netrc=False,
allow_keyring=True) \
== expect

View File

@ -0,0 +1,224 @@
import functools
import pytest
import pip._internal.network.auth
from pip._internal.network.auth import MultiDomainBasicAuth
from tests.unit.test_download import MockConnection, MockRequest, MockResponse
@pytest.mark.parametrize(["input_url", "url", "username", "password"], [
(
"http://user%40email.com:password@example.com/path",
"http://example.com/path",
"user@email.com",
"password",
),
(
"http://username:password@example.com/path",
"http://example.com/path",
"username",
"password",
),
(
"http://token@example.com/path",
"http://example.com/path",
"token",
"",
),
(
"http://example.com/path",
"http://example.com/path",
None,
None,
),
])
def test_get_credentials_parses_correctly(input_url, url, username, password):
auth = MultiDomainBasicAuth()
get = auth._get_url_and_credentials
# Check URL parsing
assert get(input_url) == (url, username, password)
assert (
# There are no credentials in the URL
(username is None and password is None) or
# Credentials were found and "cached" appropriately
auth.passwords['example.com'] == (username, password)
)
def test_get_credentials_uses_cached_credentials():
auth = MultiDomainBasicAuth()
auth.passwords['example.com'] = ('user', 'pass')
got = auth._get_url_and_credentials("http://foo:bar@example.com/path")
expected = ('http://example.com/path', 'user', 'pass')
assert got == expected
def test_get_index_url_credentials():
auth = MultiDomainBasicAuth(index_urls=[
"http://foo:bar@example.com/path"
])
get = functools.partial(
auth._get_new_credentials,
allow_netrc=False,
allow_keyring=False
)
# Check resolution of indexes
assert get("http://example.com/path/path2") == ('foo', 'bar')
assert get("http://example.com/path3/path2") == (None, None)
class KeyringModuleV1(object):
"""Represents the supported API of keyring before get_credential
was added.
"""
def __init__(self):
self.saved_passwords = []
def get_password(self, system, username):
if system == "example.com" and username:
return username + "!netloc"
if system == "http://example.com/path2" and username:
return username + "!url"
return None
def set_password(self, system, username, password):
self.saved_passwords.append((system, username, password))
@pytest.mark.parametrize('url, expect', (
("http://example.com/path1", (None, None)),
# path1 URLs will be resolved by netloc
("http://user@example.com/path1", ("user", "user!netloc")),
("http://user2@example.com/path1", ("user2", "user2!netloc")),
# path2 URLs will be resolved by index URL
("http://example.com/path2/path3", (None, None)),
("http://foo@example.com/path2/path3", ("foo", "foo!url")),
))
def test_keyring_get_password(monkeypatch, url, expect):
keyring = KeyringModuleV1()
monkeypatch.setattr('pip._internal.network.auth.keyring', keyring)
auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"])
actual = auth._get_new_credentials(url, allow_netrc=False,
allow_keyring=True)
assert actual == expect
def test_keyring_get_password_after_prompt(monkeypatch):
keyring = KeyringModuleV1()
monkeypatch.setattr('pip._internal.network.auth.keyring', keyring)
auth = MultiDomainBasicAuth()
def ask_input(prompt):
assert prompt == "User for example.com: "
return "user"
monkeypatch.setattr('pip._internal.network.auth.ask_input', ask_input)
actual = auth._prompt_for_password("example.com")
assert actual == ("user", "user!netloc", False)
def test_keyring_get_password_username_in_index(monkeypatch):
keyring = KeyringModuleV1()
monkeypatch.setattr('pip._internal.network.auth.keyring', keyring)
auth = MultiDomainBasicAuth(index_urls=["http://user@example.com/path2"])
get = functools.partial(
auth._get_new_credentials,
allow_netrc=False,
allow_keyring=True
)
assert get("http://example.com/path2/path3") == ("user", "user!url")
assert get("http://example.com/path4/path1") == (None, None)
@pytest.mark.parametrize("response_status, creds, expect_save", (
(403, ("user", "pass", True), False),
(200, ("user", "pass", True), True,),
(200, ("user", "pass", False), False,),
))
def test_keyring_set_password(monkeypatch, response_status, creds,
expect_save):
keyring = KeyringModuleV1()
monkeypatch.setattr('pip._internal.network.auth.keyring', keyring)
auth = MultiDomainBasicAuth(prompting=True)
monkeypatch.setattr(auth, '_get_url_and_credentials',
lambda u: (u, None, None))
monkeypatch.setattr(auth, '_prompt_for_password', lambda *a: creds)
if creds[2]:
# when _prompt_for_password indicates to save, we should save
def should_save_password_to_keyring(*a):
return True
else:
# when _prompt_for_password indicates not to save, we should
# never call this function
def should_save_password_to_keyring(*a):
assert False, ("_should_save_password_to_keyring should not be " +
"called")
monkeypatch.setattr(auth, '_should_save_password_to_keyring',
should_save_password_to_keyring)
req = MockRequest("https://example.com")
resp = MockResponse(b"")
resp.url = req.url
connection = MockConnection()
def _send(sent_req, **kwargs):
assert sent_req is req
assert "Authorization" in sent_req.headers
r = MockResponse(b"")
r.status_code = response_status
return r
connection._send = _send
resp.request = req
resp.status_code = 401
resp.connection = connection
auth.handle_401(resp)
if expect_save:
assert keyring.saved_passwords == [("example.com", creds[0], creds[1])]
else:
assert keyring.saved_passwords == []
class KeyringModuleV2(object):
"""Represents the current supported API of keyring"""
class Credential(object):
def __init__(self, username, password):
self.username = username
self.password = password
def get_password(self, system, username):
assert False, "get_password should not ever be called"
def get_credential(self, system, username):
if system == "http://example.com/path2":
return self.Credential("username", "url")
if system == "example.com":
return self.Credential("username", "netloc")
return None
@pytest.mark.parametrize('url, expect', (
("http://example.com/path1", ("username", "netloc")),
("http://example.com/path2/path3", ("username", "url")),
("http://user2@example.com/path2/path3", ("username", "url")),
))
def test_keyring_get_credential(monkeypatch, url, expect):
monkeypatch.setattr(
pip._internal.network.auth, 'keyring', KeyringModuleV2()
)
auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"])
assert auth._get_new_credentials(
url, allow_netrc=False, allow_keyring=True
) == expect