mirror of https://github.com/pypa/pip
541 lines
17 KiB
Python
541 lines
17 KiB
Python
import functools
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
|
|
|
import pytest
|
|
|
|
import pip._internal.network.auth
|
|
from pip._internal.network.auth import MultiDomainBasicAuth
|
|
from tests.lib.requests_mocks import MockConnection, MockRequest, MockResponse
|
|
|
|
|
|
@pytest.fixture(scope="function", autouse=True)
|
|
def reset_keyring() -> Iterable[None]:
|
|
yield None
|
|
# Reset the state of the module between tests
|
|
pip._internal.network.auth.KEYRING_DISABLED = False
|
|
pip._internal.network.auth.get_keyring_provider.cache_clear()
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
["input_url", "url", "username", "password"],
|
|
[
|
|
(
|
|
"http://user%40email.com:password@example.com/path",
|
|
"http://example.com/path",
|
|
"user@email.com",
|
|
"password",
|
|
),
|
|
(
|
|
"http://username:password@example.com/path",
|
|
"http://example.com/path",
|
|
"username",
|
|
"password",
|
|
),
|
|
(
|
|
"http://token@example.com/path",
|
|
"http://example.com/path",
|
|
"token",
|
|
"",
|
|
),
|
|
(
|
|
"http://example.com/path",
|
|
"http://example.com/path",
|
|
None,
|
|
None,
|
|
),
|
|
],
|
|
)
|
|
def test_get_credentials_parses_correctly(
|
|
input_url: str, url: str, username: Optional[str], password: Optional[str]
|
|
) -> None:
|
|
auth = MultiDomainBasicAuth()
|
|
get = auth._get_url_and_credentials
|
|
|
|
# Check URL parsing
|
|
assert get(input_url) == (url, username, password)
|
|
assert (
|
|
# There are no credentials in the URL
|
|
(username is None and password is None)
|
|
or
|
|
# Credentials were found and "cached" appropriately
|
|
auth.passwords["example.com"] == (username, password)
|
|
)
|
|
|
|
|
|
def test_get_credentials_not_to_uses_cached_credentials() -> None:
|
|
auth = MultiDomainBasicAuth()
|
|
auth.passwords["example.com"] = ("user", "pass")
|
|
|
|
got = auth._get_url_and_credentials("http://foo:bar@example.com/path")
|
|
expected = ("http://example.com/path", "foo", "bar")
|
|
assert got == expected
|
|
|
|
|
|
def test_get_credentials_not_to_uses_cached_credentials_only_username() -> None:
|
|
auth = MultiDomainBasicAuth()
|
|
auth.passwords["example.com"] = ("user", "pass")
|
|
|
|
got = auth._get_url_and_credentials("http://foo@example.com/path")
|
|
expected = ("http://example.com/path", "foo", "")
|
|
assert got == expected
|
|
|
|
|
|
def test_get_credentials_uses_cached_credentials() -> None:
|
|
auth = MultiDomainBasicAuth()
|
|
auth.passwords["example.com"] = ("user", "pass")
|
|
|
|
got = auth._get_url_and_credentials("http://example.com/path")
|
|
expected = ("http://example.com/path", "user", "pass")
|
|
assert got == expected
|
|
|
|
|
|
def test_get_credentials_uses_cached_credentials_only_username() -> None:
|
|
auth = MultiDomainBasicAuth()
|
|
auth.passwords["example.com"] = ("user", "pass")
|
|
|
|
got = auth._get_url_and_credentials("http://user@example.com/path")
|
|
expected = ("http://example.com/path", "user", "pass")
|
|
assert got == expected
|
|
|
|
|
|
def test_get_index_url_credentials() -> None:
|
|
auth = MultiDomainBasicAuth(
|
|
index_urls=[
|
|
"http://example.com/",
|
|
"http://foo:bar@example.com/path",
|
|
]
|
|
)
|
|
get = functools.partial(
|
|
auth._get_new_credentials, allow_netrc=False, allow_keyring=False
|
|
)
|
|
|
|
# Check resolution of indexes
|
|
assert get("http://example.com/path/path2") == ("foo", "bar")
|
|
assert get("http://example.com/path3/path2") == (None, None)
|
|
|
|
|
|
def test_prioritize_longest_path_prefix_match_organization() -> None:
|
|
auth = MultiDomainBasicAuth(
|
|
index_urls=[
|
|
"http://foo:bar@example.com/org-name-alpha/repo-alias/simple",
|
|
"http://bar:foo@example.com/org-name-beta/repo-alias/simple",
|
|
]
|
|
)
|
|
get = functools.partial(
|
|
auth._get_new_credentials, allow_netrc=False, allow_keyring=False
|
|
)
|
|
|
|
# Inspired by Azure DevOps URL structure, GitLab should look similar
|
|
assert get("http://example.com/org-name-alpha/repo-guid/dowbload/") == (
|
|
"foo",
|
|
"bar",
|
|
)
|
|
assert get("http://example.com/org-name-beta/repo-guid/dowbload/") == ("bar", "foo")
|
|
|
|
|
|
def test_prioritize_longest_path_prefix_match_project() -> None:
|
|
auth = MultiDomainBasicAuth(
|
|
index_urls=[
|
|
"http://foo:bar@example.com/org-alpha/project-name-alpha/repo-alias/simple",
|
|
"http://bar:foo@example.com/org-alpha/project-name-beta/repo-alias/simple",
|
|
]
|
|
)
|
|
get = functools.partial(
|
|
auth._get_new_credentials, allow_netrc=False, allow_keyring=False
|
|
)
|
|
|
|
# Inspired by Azure DevOps URL structure, GitLab should look similar
|
|
assert get(
|
|
"http://example.com/org-alpha/project-name-alpha/repo-guid/dowbload/"
|
|
) == ("foo", "bar")
|
|
assert get(
|
|
"http://example.com/org-alpha/project-name-beta/repo-guid/dowbload/"
|
|
) == ("bar", "foo")
|
|
|
|
|
|
class KeyringModuleV1:
|
|
"""Represents the supported API of keyring before get_credential
|
|
was added.
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
self.saved_passwords: List[Tuple[str, str, str]] = []
|
|
|
|
def get_password(self, system: str, username: str) -> Optional[str]:
|
|
if system == "example.com" and username:
|
|
return username + "!netloc"
|
|
if system == "http://example.com/path2/" and username:
|
|
return username + "!url"
|
|
return None
|
|
|
|
def set_password(self, system: str, username: str, password: str) -> None:
|
|
self.saved_passwords.append((system, username, password))
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"url, expect",
|
|
(
|
|
("http://example.com/path1", (None, None)),
|
|
# path1 URLs will be resolved by netloc
|
|
("http://user@example.com/path3", ("user", "user!netloc")),
|
|
("http://user2@example.com/path3", ("user2", "user2!netloc")),
|
|
# path2 URLs will be resolved by index URL
|
|
("http://example.com/path2/path3", (None, None)),
|
|
("http://foo@example.com/path2/path3", ("foo", "foo!url")),
|
|
),
|
|
)
|
|
def test_keyring_get_password(
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
url: str,
|
|
expect: Tuple[Optional[str], Optional[str]],
|
|
) -> None:
|
|
keyring = KeyringModuleV1()
|
|
monkeypatch.setitem(sys.modules, "keyring", keyring)
|
|
auth = MultiDomainBasicAuth(
|
|
index_urls=["http://example.com/path2", "http://example.com/path3"],
|
|
keyring_provider="import",
|
|
)
|
|
|
|
actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True)
|
|
assert actual == expect
|
|
|
|
|
|
def test_keyring_get_password_after_prompt(monkeypatch: pytest.MonkeyPatch) -> None:
|
|
keyring = KeyringModuleV1()
|
|
monkeypatch.setitem(sys.modules, "keyring", keyring)
|
|
auth = MultiDomainBasicAuth(keyring_provider="import")
|
|
|
|
def ask_input(prompt: str) -> str:
|
|
assert prompt == "User for example.com: "
|
|
return "user"
|
|
|
|
monkeypatch.setattr("pip._internal.network.auth.ask_input", ask_input)
|
|
actual = auth._prompt_for_password("example.com")
|
|
assert actual == ("user", "user!netloc", False)
|
|
|
|
|
|
def test_keyring_get_password_after_prompt_when_none(
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
keyring = KeyringModuleV1()
|
|
monkeypatch.setitem(sys.modules, "keyring", keyring)
|
|
auth = MultiDomainBasicAuth(keyring_provider="import")
|
|
|
|
def ask_input(prompt: str) -> str:
|
|
assert prompt == "User for unknown.com: "
|
|
return "user"
|
|
|
|
def ask_password(prompt: str) -> str:
|
|
assert prompt == "Password: "
|
|
return "fake_password"
|
|
|
|
monkeypatch.setattr("pip._internal.network.auth.ask_input", ask_input)
|
|
monkeypatch.setattr("pip._internal.network.auth.ask_password", ask_password)
|
|
actual = auth._prompt_for_password("unknown.com")
|
|
assert actual == ("user", "fake_password", True)
|
|
|
|
|
|
def test_keyring_get_password_username_in_index(
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
keyring = KeyringModuleV1()
|
|
monkeypatch.setitem(sys.modules, "keyring", keyring)
|
|
auth = MultiDomainBasicAuth(
|
|
index_urls=["http://user@example.com/path2", "http://example.com/path4"],
|
|
keyring_provider="import",
|
|
)
|
|
get = functools.partial(
|
|
auth._get_new_credentials, allow_netrc=False, allow_keyring=True
|
|
)
|
|
|
|
assert get("http://example.com/path2/path3") == ("user", "user!url")
|
|
assert get("http://example.com/path4/path1") == (None, None)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"response_status, creds, expect_save",
|
|
(
|
|
(403, ("user", "pass", True), False),
|
|
(
|
|
200,
|
|
("user", "pass", True),
|
|
True,
|
|
),
|
|
(
|
|
200,
|
|
("user", "pass", False),
|
|
False,
|
|
),
|
|
),
|
|
)
|
|
def test_keyring_set_password(
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
response_status: int,
|
|
creds: Tuple[str, str, bool],
|
|
expect_save: bool,
|
|
) -> None:
|
|
keyring = KeyringModuleV1()
|
|
monkeypatch.setitem(sys.modules, "keyring", keyring)
|
|
auth = MultiDomainBasicAuth(prompting=True, keyring_provider="import")
|
|
monkeypatch.setattr(auth, "_get_url_and_credentials", lambda u: (u, None, None))
|
|
monkeypatch.setattr(auth, "_prompt_for_password", lambda *a: creds)
|
|
if creds[2]:
|
|
# when _prompt_for_password indicates to save, we should save
|
|
def should_save_password_to_keyring(*a: Any) -> bool:
|
|
return True
|
|
|
|
else:
|
|
# when _prompt_for_password indicates not to save, we should
|
|
# never call this function
|
|
def should_save_password_to_keyring(*a: Any) -> bool:
|
|
assert False, "_should_save_password_to_keyring should not be called"
|
|
|
|
monkeypatch.setattr(
|
|
auth, "_should_save_password_to_keyring", should_save_password_to_keyring
|
|
)
|
|
|
|
req = MockRequest("https://example.com")
|
|
resp = MockResponse(b"")
|
|
resp.url = req.url
|
|
connection = MockConnection()
|
|
|
|
def _send(sent_req: MockRequest, **kwargs: Any) -> MockResponse:
|
|
assert sent_req is req
|
|
assert "Authorization" in sent_req.headers
|
|
r = MockResponse(b"")
|
|
r.status_code = response_status
|
|
return r
|
|
|
|
# https://github.com/python/mypy/issues/2427
|
|
connection._send = _send # type: ignore[assignment]
|
|
|
|
resp.request = req
|
|
resp.status_code = 401
|
|
resp.connection = connection
|
|
|
|
auth.handle_401(resp)
|
|
|
|
if expect_save:
|
|
assert keyring.saved_passwords == [("example.com", creds[0], creds[1])]
|
|
else:
|
|
assert keyring.saved_passwords == []
|
|
|
|
|
|
class KeyringModuleV2:
|
|
"""Represents the current supported API of keyring"""
|
|
|
|
class Credential:
|
|
def __init__(self, username: str, password: str) -> None:
|
|
self.username = username
|
|
self.password = password
|
|
|
|
def get_password(self, system: str, username: str) -> None:
|
|
assert False, "get_password should not ever be called"
|
|
|
|
def get_credential(self, system: str, username: str) -> Optional[Credential]:
|
|
if system == "http://example.com/path2/":
|
|
return self.Credential("username", "url")
|
|
if system == "example.com":
|
|
return self.Credential("username", "netloc")
|
|
return None
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"url, expect",
|
|
(
|
|
("http://example.com/path1", ("username", "netloc")),
|
|
("http://example.com/path2/path3", ("username", "url")),
|
|
("http://user2@example.com/path2/path3", ("username", "url")),
|
|
),
|
|
)
|
|
def test_keyring_get_credential(
|
|
monkeypatch: pytest.MonkeyPatch, url: str, expect: Tuple[str, str]
|
|
) -> None:
|
|
monkeypatch.setitem(sys.modules, "keyring", KeyringModuleV2())
|
|
auth = MultiDomainBasicAuth(
|
|
index_urls=["http://example.com/path1", "http://example.com/path2"],
|
|
keyring_provider="import",
|
|
)
|
|
|
|
assert (
|
|
auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True) == expect
|
|
)
|
|
|
|
|
|
class KeyringModuleBroken:
|
|
"""Represents the current supported API of keyring, but broken"""
|
|
|
|
def __init__(self) -> None:
|
|
self._call_count = 0
|
|
|
|
def get_credential(self, system: str, username: str) -> None:
|
|
self._call_count += 1
|
|
raise Exception("This keyring is broken!")
|
|
|
|
|
|
def test_broken_keyring_disables_keyring(monkeypatch: pytest.MonkeyPatch) -> None:
|
|
keyring_broken = KeyringModuleBroken()
|
|
monkeypatch.setitem(sys.modules, "keyring", keyring_broken)
|
|
|
|
auth = MultiDomainBasicAuth(
|
|
index_urls=["http://example.com/"], keyring_provider="import"
|
|
)
|
|
|
|
assert keyring_broken._call_count == 0
|
|
for i in range(5):
|
|
url = "http://example.com/path" + str(i)
|
|
assert auth._get_new_credentials(
|
|
url, allow_netrc=False, allow_keyring=True
|
|
) == (None, None)
|
|
assert keyring_broken._call_count == 1
|
|
|
|
|
|
class KeyringSubprocessResult(KeyringModuleV1):
|
|
"""Represents the subprocess call to keyring"""
|
|
|
|
returncode = 0 # Default to zero retcode
|
|
|
|
def __call__(
|
|
self,
|
|
cmd: List[str],
|
|
*,
|
|
env: Dict[str, str],
|
|
stdin: Optional[Any] = None,
|
|
stdout: Optional[Any] = None,
|
|
input: Optional[bytes] = None,
|
|
check: Optional[bool] = None
|
|
) -> Any:
|
|
if cmd[1] == "get":
|
|
assert stdin == -3 # subprocess.DEVNULL
|
|
assert stdout == subprocess.PIPE
|
|
assert env["PYTHONIOENCODING"] == "utf-8"
|
|
assert check is None
|
|
|
|
password = self.get_password(*cmd[2:])
|
|
if password is None:
|
|
# Expect non-zero returncode if no password present
|
|
self.returncode = 1
|
|
else:
|
|
# Passwords are returned encoded with a newline appended
|
|
self.returncode = 0
|
|
self.stdout = (password + os.linesep).encode("utf-8")
|
|
|
|
if cmd[1] == "set":
|
|
assert stdin is None
|
|
assert stdout is None
|
|
assert env["PYTHONIOENCODING"] == "utf-8"
|
|
assert input is not None
|
|
assert check
|
|
|
|
# Input from stdin is encoded
|
|
self.set_password(cmd[2], cmd[3], input.decode("utf-8").strip(os.linesep))
|
|
|
|
return self
|
|
|
|
def check_returncode(self) -> None:
|
|
if self.returncode:
|
|
raise Exception()
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"url, expect",
|
|
(
|
|
("http://example.com/path1", (None, None)),
|
|
# path1 URLs will be resolved by netloc
|
|
("http://user@example.com/path3", ("user", "user!netloc")),
|
|
("http://user2@example.com/path3", ("user2", "user2!netloc")),
|
|
# path2 URLs will be resolved by index URL
|
|
("http://example.com/path2/path3", (None, None)),
|
|
("http://foo@example.com/path2/path3", ("foo", "foo!url")),
|
|
),
|
|
)
|
|
def test_keyring_cli_get_password(
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
url: str,
|
|
expect: Tuple[Optional[str], Optional[str]],
|
|
) -> None:
|
|
monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring")
|
|
monkeypatch.setattr(
|
|
pip._internal.network.auth.subprocess, "run", KeyringSubprocessResult()
|
|
)
|
|
auth = MultiDomainBasicAuth(
|
|
index_urls=["http://example.com/path2", "http://example.com/path3"],
|
|
keyring_provider="subprocess",
|
|
)
|
|
|
|
actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True)
|
|
assert actual == expect
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"response_status, creds, expect_save",
|
|
(
|
|
(403, ("user", "pass", True), False),
|
|
(
|
|
200,
|
|
("user", "pass", True),
|
|
True,
|
|
),
|
|
(
|
|
200,
|
|
("user", "pass", False),
|
|
False,
|
|
),
|
|
),
|
|
)
|
|
def test_keyring_cli_set_password(
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
response_status: int,
|
|
creds: Tuple[str, str, bool],
|
|
expect_save: bool,
|
|
) -> None:
|
|
monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring")
|
|
keyring = KeyringSubprocessResult()
|
|
monkeypatch.setattr(pip._internal.network.auth.subprocess, "run", keyring)
|
|
auth = MultiDomainBasicAuth(prompting=True, keyring_provider="subprocess")
|
|
monkeypatch.setattr(auth, "_get_url_and_credentials", lambda u: (u, None, None))
|
|
monkeypatch.setattr(auth, "_prompt_for_password", lambda *a: creds)
|
|
if creds[2]:
|
|
# when _prompt_for_password indicates to save, we should save
|
|
def should_save_password_to_keyring(*a: Any) -> bool:
|
|
return True
|
|
|
|
else:
|
|
# when _prompt_for_password indicates not to save, we should
|
|
# never call this function
|
|
def should_save_password_to_keyring(*a: Any) -> bool:
|
|
assert False, "_should_save_password_to_keyring should not be called"
|
|
|
|
monkeypatch.setattr(
|
|
auth, "_should_save_password_to_keyring", should_save_password_to_keyring
|
|
)
|
|
|
|
req = MockRequest("https://example.com")
|
|
resp = MockResponse(b"")
|
|
resp.url = req.url
|
|
connection = MockConnection()
|
|
|
|
def _send(sent_req: MockRequest, **kwargs: Any) -> MockResponse:
|
|
assert sent_req is req
|
|
assert "Authorization" in sent_req.headers
|
|
r = MockResponse(b"")
|
|
r.status_code = response_status
|
|
return r
|
|
|
|
# https://github.com/python/mypy/issues/2427
|
|
connection._send = _send # type: ignore[assignment]
|
|
|
|
resp.request = req
|
|
resp.status_code = 401
|
|
resp.connection = connection
|
|
|
|
auth.handle_401(resp)
|
|
|
|
if expect_save:
|
|
assert keyring.saved_passwords == [("example.com", creds[0], creds[1])]
|
|
else:
|
|
assert keyring.saved_passwords == []
|