# pip/tests/unit/test_network_auth.py
#
# 541 lines
# 17 KiB
# Python

import functools
import os
import subprocess
import sys
from typing import Any, Dict, Iterable, List, Optional, Tuple
import pytest
import pip._internal.network.auth
from pip._internal.network.auth import MultiDomainBasicAuth
from tests.lib.requests_mocks import MockConnection, MockRequest, MockResponse
@pytest.fixture(scope="function", autouse=True)
def reset_keyring() -> Iterable[None]:
    """Reset the auth module's keyring state after every test.

    The fixture is ``autouse`` with function scope, so it wraps each test in
    this module without being requested explicitly.  Nothing is prepared
    before the test runs; everything after the ``yield`` is teardown.
    """
    yield None
    # Reset the state of the module between tests
    pip._internal.network.auth.KEYRING_DISABLED = False
    pip._internal.network.auth.get_keyring_provider.cache_clear()
@pytest.mark.parametrize(
    ["input_url", "url", "username", "password"],
    [
        # Percent-encoded "@" in the username is decoded.
        (
            "http://user%40email.com:password@example.com/path",
            "http://example.com/path",
            "user@email.com",
            "password",
        ),
        # Plain username and password.
        (
            "http://username:password@example.com/path",
            "http://example.com/path",
            "username",
            "password",
        ),
        # Username only: the password comes back as an empty string.
        (
            "http://token@example.com/path",
            "http://example.com/path",
            "token",
            "",
        ),
        # No credentials at all: both come back as None.
        (
            "http://example.com/path",
            "http://example.com/path",
            None,
            None,
        ),
    ],
)
def test_get_credentials_parses_correctly(
    input_url: str, url: str, username: Optional[str], password: Optional[str]
) -> None:
    """Check that ``_get_url_and_credentials`` splits credentials off a URL.

    The call must return the URL without its userinfo part together with the
    parsed username and password.  Whenever credentials were present, they
    must also have been stored in ``auth.passwords`` under the netloc.
    """
    auth = MultiDomainBasicAuth()
    get = auth._get_url_and_credentials

    # Check URL parsing
    assert get(input_url) == (url, username, password)
    assert (
        # There are no credentials in the URL
        (username is None and password is None)
        or
        # Credentials were found and "cached" appropriately
        auth.passwords["example.com"] == (username, password)
    )
def test_get_credentials_not_to_uses_cached_credentials() -> None:
    """Credentials embedded in the URL win over cached ones for the host."""
    auth = MultiDomainBasicAuth()
    # Pre-seed the cache with credentials that differ from those in the URL.
    auth.passwords["example.com"] = ("user", "pass")

    got = auth._get_url_and_credentials("http://foo:bar@example.com/path")
    expected = ("http://example.com/path", "foo", "bar")
    assert got == expected
def test_get_credentials_not_to_uses_cached_credentials_only_username() -> None:
    """A URL username that differs from the cached one ignores the cache.

    The URL carries only the username ``foo`` while the cache holds
    ``("user", "pass")``; the cached password must not be paired with the
    different username, so the password comes back as an empty string.
    """
    auth = MultiDomainBasicAuth()
    auth.passwords["example.com"] = ("user", "pass")

    got = auth._get_url_and_credentials("http://foo@example.com/path")
    expected = ("http://example.com/path", "foo", "")
    assert got == expected
def test_get_credentials_uses_cached_credentials() -> None:
    """A URL without credentials picks up the ones cached for its host."""
    auth = MultiDomainBasicAuth()
    auth.passwords["example.com"] = ("user", "pass")

    got = auth._get_url_and_credentials("http://example.com/path")
    expected = ("http://example.com/path", "user", "pass")
    assert got == expected
def test_get_credentials_uses_cached_credentials_only_username() -> None:
    """A username-only URL matching the cached username gets its password.

    The URL carries the username ``user`` with no password and the cache
    holds ``("user", "pass")``, so the cached password is returned.
    """
    auth = MultiDomainBasicAuth()
    auth.passwords["example.com"] = ("user", "pass")

    got = auth._get_url_and_credentials("http://user@example.com/path")
    expected = ("http://example.com/path", "user", "pass")
    assert got == expected
def test_get_index_url_credentials() -> None:
    """Credentials embedded in a configured index URL are found for requests.

    netrc and keyring lookups are switched off, so any credentials returned
    can only come from ``index_urls``.
    """
    auth = MultiDomainBasicAuth(
        index_urls=[
            "http://example.com/",
            "http://foo:bar@example.com/path",
        ]
    )
    get = functools.partial(
        auth._get_new_credentials, allow_netrc=False, allow_keyring=False
    )

    # Check resolution of indexes
    assert get("http://example.com/path/path2") == ("foo", "bar")
    assert get("http://example.com/path3/path2") == (None, None)
def test_prioritize_longest_path_prefix_match_organization() -> None:
    """Pick the index whose path shares the longest prefix with the request.

    Both index URLs live on the same host and differ only in the
    organization segment of the path; each request must get the credentials
    of the index with the matching organization.
    """
    auth = MultiDomainBasicAuth(
        index_urls=[
            "http://foo:bar@example.com/org-name-alpha/repo-alias/simple",
            "http://bar:foo@example.com/org-name-beta/repo-alias/simple",
        ]
    )
    get = functools.partial(
        auth._get_new_credentials, allow_netrc=False, allow_keyring=False
    )

    # Inspired by Azure DevOps URL structure, GitLab should look similar
    assert get("http://example.com/org-name-alpha/repo-guid/dowbload/") == (
        "foo",
        "bar",
    )
    assert get("http://example.com/org-name-beta/repo-guid/dowbload/") == ("bar", "foo")
def test_prioritize_longest_path_prefix_match_project() -> None:
    """Pick the index whose path shares the longest prefix with the request.

    Both index URLs share host and organization and differ only in the
    project segment; each request must get the credentials of the index with
    the matching project.
    """
    auth = MultiDomainBasicAuth(
        index_urls=[
            "http://foo:bar@example.com/org-alpha/project-name-alpha/repo-alias/simple",
            "http://bar:foo@example.com/org-alpha/project-name-beta/repo-alias/simple",
        ]
    )
    get = functools.partial(
        auth._get_new_credentials, allow_netrc=False, allow_keyring=False
    )

    # Inspired by Azure DevOps URL structure, GitLab should look similar
    assert get(
        "http://example.com/org-alpha/project-name-alpha/repo-guid/dowbload/"
    ) == ("foo", "bar")
    assert get(
        "http://example.com/org-alpha/project-name-beta/repo-guid/dowbload/"
    ) == ("bar", "foo")
class KeyringModuleV1:
    """Represents the supported API of keyring before get_credential
    was added.

    Test double: the tests install an instance of it as the ``keyring``
    module.  Lookups are answered from two hard-coded entries and every
    stored password is recorded for later inspection.
    """

    def __init__(self) -> None:
        # Every (system, username, password) triple passed to set_password(),
        # in call order.
        self.saved_passwords: List[Tuple[str, str, str]] = []

    def get_password(self, system: str, username: str) -> Optional[str]:
        """Return a fake password derived from ``username``.

        The suffix records which entry matched: ``"!netloc"`` for the bare
        host ``example.com`` and ``"!url"`` for the index URL
        ``http://example.com/path2/``.  Returns ``None`` for any other
        system or when ``username`` is empty.
        """
        if system == "example.com" and username:
            return username + "!netloc"
        if system == "http://example.com/path2/" and username:
            return username + "!url"
        return None

    def set_password(self, system: str, username: str, password: str) -> None:
        """Record the triple in ``saved_passwords`` instead of storing it."""
        self.saved_passwords.append((system, username, password))
@pytest.mark.parametrize(
    "url, expect",
    (
        # no username available: nothing is returned
        ("http://example.com/path1", (None, None)),
        # path3 has no URL-specific keyring entry; the netloc entry answers
        ("http://user@example.com/path3", ("user", "user!netloc")),
        ("http://user2@example.com/path3", ("user2", "user2!netloc")),
        # path2 URLs are answered by the index URL entry, given a username
        ("http://example.com/path2/path3", (None, None)),
        ("http://foo@example.com/path2/path3", ("foo", "foo!url")),
    ),
)
def test_keyring_get_password(
    monkeypatch: pytest.MonkeyPatch,
    url: str,
    expect: Tuple[Optional[str], Optional[str]],
) -> None:
    """Look up credentials through a ``get_password``-only keyring module.

    ``KeyringModuleV1`` is installed as the ``keyring`` module and the
    ``"import"`` provider is selected.  netrc is disabled, so the keyring is
    the only possible source of a password.
    """
    keyring = KeyringModuleV1()
    monkeypatch.setitem(sys.modules, "keyring", keyring)
    auth = MultiDomainBasicAuth(
        index_urls=["http://example.com/path2", "http://example.com/path3"],
        keyring_provider="import",
    )

    actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True)
    assert actual == expect
def test_keyring_get_password_after_prompt(monkeypatch: pytest.MonkeyPatch) -> None:
    """After prompting for the username, the keyring supplies the password.

    Only ``ask_input`` is patched.  The keyring knows ``example.com``, so the
    result carries the keyring password and a final ``False`` (elsewhere in
    this module that element is treated as the "save to keyring" flag).
    """
    keyring = KeyringModuleV1()
    monkeypatch.setitem(sys.modules, "keyring", keyring)
    auth = MultiDomainBasicAuth(keyring_provider="import")

    def ask_input(prompt: str) -> str:
        # Stand-in for the interactive username prompt.
        assert prompt == "User for example.com: "
        return "user"

    monkeypatch.setattr("pip._internal.network.auth.ask_input", ask_input)
    actual = auth._prompt_for_password("example.com")
    assert actual == ("user", "user!netloc", False)
def test_keyring_get_password_after_prompt_when_none(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """When the keyring has no entry, the password is prompted for as well.

    ``KeyringModuleV1`` returns ``None`` for ``unknown.com``, so both the
    username and the password come from the patched prompts and the final
    element of the result is ``True``.
    """
    keyring = KeyringModuleV1()
    monkeypatch.setitem(sys.modules, "keyring", keyring)
    auth = MultiDomainBasicAuth(keyring_provider="import")

    def ask_input(prompt: str) -> str:
        # Stand-in for the interactive username prompt.
        assert prompt == "User for unknown.com: "
        return "user"

    def ask_password(prompt: str) -> str:
        # Stand-in for the interactive password prompt.
        assert prompt == "Password: "
        return "fake_password"

    monkeypatch.setattr("pip._internal.network.auth.ask_input", ask_input)
    monkeypatch.setattr("pip._internal.network.auth.ask_password", ask_password)
    actual = auth._prompt_for_password("unknown.com")
    assert actual == ("user", "fake_password", True)
def test_keyring_get_password_username_in_index(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A username given only in the index URL is used for the keyring lookup.

    The request URLs carry no username.  The ``path2`` index supplies
    ``user``, which the keyring resolves via its URL entry; the ``path4``
    index has no username, so nothing is returned.
    """
    keyring = KeyringModuleV1()
    monkeypatch.setitem(sys.modules, "keyring", keyring)
    auth = MultiDomainBasicAuth(
        index_urls=["http://user@example.com/path2", "http://example.com/path4"],
        keyring_provider="import",
    )
    get = functools.partial(
        auth._get_new_credentials, allow_netrc=False, allow_keyring=True
    )

    assert get("http://example.com/path2/path3") == ("user", "user!url")
    assert get("http://example.com/path4/path1") == (None, None)
@pytest.mark.parametrize(
    "response_status, creds, expect_save",
    (
        # Retry rejected: nothing is saved even though saving was requested.
        (403, ("user", "pass", True), False),
        # Retry accepted and saving requested: credentials are saved.
        (
            200,
            ("user", "pass", True),
            True,
        ),
        # Retry accepted but saving not requested: nothing is saved.
        (
            200,
            ("user", "pass", False),
            False,
        ),
    ),
)
def test_keyring_set_password(
    monkeypatch: pytest.MonkeyPatch,
    response_status: int,
    creds: Tuple[str, str, bool],
    expect_save: bool,
) -> None:
    """Check when ``handle_401`` stores prompted credentials in the keyring.

    ``creds`` is what the patched ``_prompt_for_password`` returns:
    ``(username, password, save)``.  ``response_status`` is the status of the
    response to the re-sent, authenticated request.  ``expect_save`` says
    whether ``KeyringModuleV1.set_password`` must have been called.
    """
    keyring = KeyringModuleV1()
    monkeypatch.setitem(sys.modules, "keyring", keyring)
    auth = MultiDomainBasicAuth(prompting=True, keyring_provider="import")
    # No credentials in the URL or cache, so handle_401 has to prompt.
    monkeypatch.setattr(auth, "_get_url_and_credentials", lambda u: (u, None, None))
    monkeypatch.setattr(auth, "_prompt_for_password", lambda *a: creds)
    if creds[2]:
        # when _prompt_for_password indicates to save, we should save
        def should_save_password_to_keyring(*a: Any) -> bool:
            return True

    else:
        # when _prompt_for_password indicates not to save, we should
        # never call this function
        def should_save_password_to_keyring(*a: Any) -> bool:
            # Raised explicitly (not ``assert False``) so the guard is not
            # stripped when Python runs with -O.
            raise AssertionError(
                "_should_save_password_to_keyring should not be called"
            )

    monkeypatch.setattr(
        auth, "_should_save_password_to_keyring", should_save_password_to_keyring
    )
    req = MockRequest("https://example.com")
    resp = MockResponse(b"")
    resp.url = req.url
    connection = MockConnection()

    def _send(sent_req: MockRequest, **kwargs: Any) -> MockResponse:
        # The same request object must be re-sent, now carrying credentials.
        assert sent_req is req
        assert "Authorization" in sent_req.headers
        r = MockResponse(b"")
        r.status_code = response_status
        return r

    # https://github.com/python/mypy/issues/2427
    connection._send = _send  # type: ignore[assignment]

    # The initial response is the 401 that triggers the prompt-and-retry.
    resp.request = req
    resp.status_code = 401
    resp.connection = connection

    auth.handle_401(resp)

    if expect_save:
        assert keyring.saved_passwords == [("example.com", creds[0], creds[1])]
    else:
        assert keyring.saved_passwords == []
class KeyringModuleV2:
    """Represents the current supported API of keyring

    Test double: the tests install an instance of it as the ``keyring``
    module.  It offers ``get_credential`` and fails loudly if the older
    ``get_password`` entry point is used instead.
    """

    class Credential:
        """Minimal credential object exposing ``username`` and ``password``."""

        def __init__(self, username: str, password: str) -> None:
            self.username = username
            self.password = password

    def get_password(self, system: str, username: str) -> None:
        """Always fail: callers are expected to use ``get_credential``.

        Raises:
            AssertionError: unconditionally.  It is raised explicitly rather
                than via ``assert False`` so that it is not stripped when
                Python runs with ``-O``.
        """
        raise AssertionError("get_password should not ever be called")

    def get_credential(self, system: str, username: str) -> Optional[Credential]:
        """Return a fixed credential for the two known systems.

        The returned password names the entry that matched: ``"url"`` for
        ``http://example.com/path2/`` and ``"netloc"`` for ``example.com``.
        ``username`` is ignored; the credential always carries the username
        ``"username"``.  Returns ``None`` for any other system.
        """
        if system == "http://example.com/path2/":
            return self.Credential("username", "url")
        if system == "example.com":
            return self.Credential("username", "netloc")
        return None
@pytest.mark.parametrize(
    "url, expect",
    (
        # No URL-specific entry for path1: the netloc credential is used.
        ("http://example.com/path1", ("username", "netloc")),
        # path2 is answered by the index URL credential.
        ("http://example.com/path2/path3", ("username", "url")),
        # The credential's own username is returned, not the URL's "user2".
        ("http://user2@example.com/path2/path3", ("username", "url")),
    ),
)
def test_keyring_get_credential(
    monkeypatch: pytest.MonkeyPatch, url: str, expect: Tuple[str, str]
) -> None:
    """Look up credentials through a keyring offering ``get_credential``.

    ``KeyringModuleV2`` is installed as the ``keyring`` module; its
    ``get_password`` raises, so a pass also shows that only
    ``get_credential`` was used.
    """
    monkeypatch.setitem(sys.modules, "keyring", KeyringModuleV2())
    auth = MultiDomainBasicAuth(
        index_urls=["http://example.com/path1", "http://example.com/path2"],
        keyring_provider="import",
    )

    assert (
        auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True) == expect
    )
class KeyringModuleBroken:
    """Represents the current supported API of keyring, but broken

    Test double whose ``get_credential`` always raises, while counting how
    often it was invoked.
    """

    def __init__(self) -> None:
        # Number of get_credential() calls made so far.
        self._call_count = 0

    def get_credential(self, system: str, username: str) -> None:
        """Count the call, then fail.

        Raises:
            Exception: always, with the message "This keyring is broken!".
        """
        self._call_count += 1
        raise Exception("This keyring is broken!")
def test_broken_keyring_disables_keyring(monkeypatch: pytest.MonkeyPatch) -> None:
    """A keyring that raises is consulted once and not again afterwards.

    Five lookups are made against ``KeyringModuleBroken``.  Each must come
    back empty, and the broken backend's call counter must stay at one.
    """
    keyring_broken = KeyringModuleBroken()
    monkeypatch.setitem(sys.modules, "keyring", keyring_broken)

    auth = MultiDomainBasicAuth(
        index_urls=["http://example.com/"], keyring_provider="import"
    )

    # Constructing the auth object alone must not touch the keyring.
    assert keyring_broken._call_count == 0
    for i in range(5):
        url = f"http://example.com/path{i}"
        assert auth._get_new_credentials(
            url, allow_netrc=False, allow_keyring=True
        ) == (None, None)
        # Checked on every iteration: the count must never exceed one.
        assert keyring_broken._call_count == 1
class KeyringSubprocessResult(KeyringModuleV1):
    """Represents the subprocess call to keyring

    An instance is monkeypatched over ``subprocess.run`` in the auth module.
    Calling it emulates one run of the ``keyring`` command line tool and
    returns the instance itself as the "completed process", so the caller can
    read ``returncode`` and ``stdout`` from it.  Lookups and stores are
    delegated to the inherited ``KeyringModuleV1`` methods.
    """

    returncode = 0  # Default to zero retcode

    def __call__(
        self,
        cmd: List[str],
        *,
        env: Dict[str, str],
        stdin: Optional[Any] = None,
        stdout: Optional[Any] = None,
        input: Optional[bytes] = None,
        check: Optional[bool] = None,
    ) -> Any:
        """Emulate ``subprocess.run`` for ``keyring get`` / ``keyring set``.

        Args:
            cmd: Command line; ``cmd[1]`` is the sub-command.  For ``get``,
                ``cmd[2:]`` are passed on as ``(system, username)``.  For
                ``set``, ``cmd[2]`` and ``cmd[3]`` are the system and the
                username.
            env: Environment for the child; must set ``PYTHONIOENCODING``
                to ``utf-8``.
            stdin: Expected to be ``subprocess.DEVNULL`` for ``get`` and
                unset for ``set``.
            stdout: Expected to be ``subprocess.PIPE`` for ``get`` and unset
                for ``set``.
            input: UTF-8 encoded password for ``set``.
            check: Expected to be unset for ``get`` and truthy for ``set``.

        Returns:
            This instance, with ``returncode`` (and ``stdout`` when a
            password was found) updated for ``get``.
        """
        if cmd[1] == "get":
            assert stdin == subprocess.DEVNULL
            assert stdout == subprocess.PIPE
            assert env["PYTHONIOENCODING"] == "utf-8"
            assert check is None

            password = self.get_password(*cmd[2:])
            if password is None:
                # Expect non-zero returncode if no password present
                self.returncode = 1
            else:
                # Passwords are returned encoded with a newline appended
                self.returncode = 0
                self.stdout = (password + os.linesep).encode("utf-8")

        elif cmd[1] == "set":
            assert stdin is None
            assert stdout is None
            assert env["PYTHONIOENCODING"] == "utf-8"
            assert input is not None
            assert check

            # Input from stdin is encoded
            self.set_password(cmd[2], cmd[3], input.decode("utf-8").strip(os.linesep))

        return self

    def check_returncode(self) -> None:
        """Raise a bare ``Exception`` if ``returncode`` is non-zero."""
        if self.returncode:
            raise Exception()
@pytest.mark.parametrize(
    "url, expect",
    (
        # no username available: nothing is returned
        ("http://example.com/path1", (None, None)),
        # path3 has no URL-specific keyring entry; the netloc entry answers
        ("http://user@example.com/path3", ("user", "user!netloc")),
        ("http://user2@example.com/path3", ("user2", "user2!netloc")),
        # path2 URLs are answered by the index URL entry, given a username
        ("http://example.com/path2/path3", (None, None)),
        ("http://foo@example.com/path2/path3", ("foo", "foo!url")),
    ),
)
def test_keyring_cli_get_password(
    monkeypatch: pytest.MonkeyPatch,
    url: str,
    expect: Tuple[Optional[str], Optional[str]],
) -> None:
    """Look up credentials through the ``keyring`` command line tool.

    ``shutil.which`` is patched so the executable appears to exist, and
    ``subprocess.run`` is replaced by ``KeyringSubprocessResult``; the
    ``"subprocess"`` provider is then expected to give the same answers as
    the imported-module variant of this test.
    """
    monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring")
    monkeypatch.setattr(
        pip._internal.network.auth.subprocess, "run", KeyringSubprocessResult()
    )
    auth = MultiDomainBasicAuth(
        index_urls=["http://example.com/path2", "http://example.com/path3"],
        keyring_provider="subprocess",
    )

    actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True)
    assert actual == expect
@pytest.mark.parametrize(
    "response_status, creds, expect_save",
    (
        # Retry rejected: nothing is saved even though saving was requested.
        (403, ("user", "pass", True), False),
        # Retry accepted and saving requested: credentials are saved.
        (
            200,
            ("user", "pass", True),
            True,
        ),
        # Retry accepted but saving not requested: nothing is saved.
        (
            200,
            ("user", "pass", False),
            False,
        ),
    ),
)
def test_keyring_cli_set_password(
    monkeypatch: pytest.MonkeyPatch,
    response_status: int,
    creds: Tuple[str, str, bool],
    expect_save: bool,
) -> None:
    """Check when ``handle_401`` stores credentials via the keyring CLI.

    Same scenario as the imported-module variant, but using the
    ``"subprocess"`` provider: ``subprocess.run`` is replaced by a
    ``KeyringSubprocessResult`` that records every ``set`` invocation in its
    ``saved_passwords``.

    ``creds`` is what the patched ``_prompt_for_password`` returns:
    ``(username, password, save)``.  ``response_status`` is the status of the
    response to the re-sent, authenticated request.
    """
    monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring")
    keyring = KeyringSubprocessResult()
    monkeypatch.setattr(pip._internal.network.auth.subprocess, "run", keyring)
    auth = MultiDomainBasicAuth(prompting=True, keyring_provider="subprocess")
    # No credentials in the URL or cache, so handle_401 has to prompt.
    monkeypatch.setattr(auth, "_get_url_and_credentials", lambda u: (u, None, None))
    monkeypatch.setattr(auth, "_prompt_for_password", lambda *a: creds)
    if creds[2]:
        # when _prompt_for_password indicates to save, we should save
        def should_save_password_to_keyring(*a: Any) -> bool:
            return True

    else:
        # when _prompt_for_password indicates not to save, we should
        # never call this function
        def should_save_password_to_keyring(*a: Any) -> bool:
            # Raised explicitly (not ``assert False``) so the guard is not
            # stripped when Python runs with -O.
            raise AssertionError(
                "_should_save_password_to_keyring should not be called"
            )

    monkeypatch.setattr(
        auth, "_should_save_password_to_keyring", should_save_password_to_keyring
    )
    req = MockRequest("https://example.com")
    resp = MockResponse(b"")
    resp.url = req.url
    connection = MockConnection()

    def _send(sent_req: MockRequest, **kwargs: Any) -> MockResponse:
        # The same request object must be re-sent, now carrying credentials.
        assert sent_req is req
        assert "Authorization" in sent_req.headers
        r = MockResponse(b"")
        r.status_code = response_status
        return r

    # https://github.com/python/mypy/issues/2427
    connection._send = _send  # type: ignore[assignment]

    # The initial response is the 401 that triggers the prompt-and-retry.
    resp.request = req
    resp.status_code = 401
    resp.connection = connection

    auth.handle_401(resp)

    if expect_save:
        assert keyring.saved_passwords == [("example.com", creds[0], creds[1])]
    else:
        assert keyring.saved_passwords == []