mirror of https://github.com/pypa/pip
Merge branch 'main' into main
This commit is contained in:
commit
bec1d0644f
|
@ -0,0 +1,2 @@
|
|||
Enable the use of ``keyring`` found on ``PATH``. This allows ``keyring``
|
||||
installed using ``pipx`` to be used by ``pip``.
|
|
@ -4,8 +4,12 @@ Contains interface (MultiDomainBasicAuth) and associated glue code for
|
|||
providing credentials in the context of network requests.
|
||||
"""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import urllib.parse
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Dict, List, NamedTuple, Optional, Tuple
|
||||
|
||||
from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth
|
||||
from pip._vendor.requests.models import Request, Response
|
||||
|
@ -23,51 +27,165 @@ from pip._internal.vcs.versioncontrol import AuthInfo
|
|||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
Credentials = Tuple[str, str, str]
|
||||
|
||||
try:
|
||||
import keyring
|
||||
except ImportError:
|
||||
keyring = None # type: ignore[assignment]
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Keyring is skipped due to an exception: %s",
|
||||
str(exc),
|
||||
)
|
||||
keyring = None # type: ignore[assignment]
|
||||
KEYRING_DISABLED = False
|
||||
|
||||
|
||||
def get_keyring_auth(url: Optional[str], username: Optional[str]) -> Optional[AuthInfo]:
|
||||
"""Return the tuple auth for a given url from keyring."""
|
||||
global keyring
|
||||
if not url or not keyring:
|
||||
class Credentials(NamedTuple):
|
||||
url: str
|
||||
username: str
|
||||
password: str
|
||||
|
||||
|
||||
class KeyRingBaseProvider(ABC):
|
||||
"""Keyring base provider interface"""
|
||||
|
||||
@abstractmethod
|
||||
def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def save_auth_info(self, url: str, username: str, password: str) -> None:
|
||||
...
|
||||
|
||||
|
||||
class KeyRingNullProvider(KeyRingBaseProvider):
|
||||
"""Keyring null provider"""
|
||||
|
||||
def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
|
||||
return None
|
||||
|
||||
try:
|
||||
try:
|
||||
get_credential = keyring.get_credential
|
||||
except AttributeError:
|
||||
pass
|
||||
else:
|
||||
def save_auth_info(self, url: str, username: str, password: str) -> None:
|
||||
return None
|
||||
|
||||
|
||||
class KeyRingPythonProvider(KeyRingBaseProvider):
|
||||
"""Keyring interface which uses locally imported `keyring`"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
import keyring
|
||||
|
||||
self.keyring = keyring
|
||||
|
||||
def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
|
||||
# Support keyring's get_credential interface which supports getting
|
||||
# credentials without a username. This is only available for
|
||||
# keyring>=15.2.0.
|
||||
if hasattr(self.keyring, "get_credential"):
|
||||
logger.debug("Getting credentials from keyring for %s", url)
|
||||
cred = get_credential(url, username)
|
||||
cred = self.keyring.get_credential(url, username)
|
||||
if cred is not None:
|
||||
return cred.username, cred.password
|
||||
return None
|
||||
|
||||
if username:
|
||||
if username is not None:
|
||||
logger.debug("Getting password from keyring for %s", url)
|
||||
password = keyring.get_password(url, username)
|
||||
password = self.keyring.get_password(url, username)
|
||||
if password:
|
||||
return username, password
|
||||
return None
|
||||
|
||||
def save_auth_info(self, url: str, username: str, password: str) -> None:
|
||||
self.keyring.set_password(url, username, password)
|
||||
|
||||
|
||||
class KeyRingCliProvider(KeyRingBaseProvider):
|
||||
"""Provider which uses `keyring` cli
|
||||
|
||||
Instead of calling the keyring package installed alongside pip
|
||||
we call keyring on the command line which will enable pip to
|
||||
use which ever installation of keyring is available first in
|
||||
PATH.
|
||||
"""
|
||||
|
||||
def __init__(self, cmd: str) -> None:
|
||||
self.keyring = cmd
|
||||
|
||||
def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
|
||||
# This is the default implementation of keyring.get_credential
|
||||
# https://github.com/jaraco/keyring/blob/97689324abcf01bd1793d49063e7ca01e03d7d07/keyring/backend.py#L134-L139
|
||||
if username is not None:
|
||||
password = self._get_password(url, username)
|
||||
if password is not None:
|
||||
return username, password
|
||||
return None
|
||||
|
||||
def save_auth_info(self, url: str, username: str, password: str) -> None:
|
||||
return self._set_password(url, username, password)
|
||||
|
||||
def _get_password(self, service_name: str, username: str) -> Optional[str]:
|
||||
"""Mirror the implemenation of keyring.get_password using cli"""
|
||||
if self.keyring is None:
|
||||
return None
|
||||
|
||||
cmd = [self.keyring, "get", service_name, username]
|
||||
env = os.environ.copy()
|
||||
env["PYTHONIOENCODING"] = "utf-8"
|
||||
res = subprocess.run(
|
||||
cmd,
|
||||
stdin=subprocess.DEVNULL,
|
||||
capture_output=True,
|
||||
env=env,
|
||||
)
|
||||
if res.returncode:
|
||||
return None
|
||||
return res.stdout.decode("utf-8").strip("\n")
|
||||
|
||||
def _set_password(self, service_name: str, username: str, password: str) -> None:
|
||||
"""Mirror the implemenation of keyring.set_password using cli"""
|
||||
if self.keyring is None:
|
||||
return None
|
||||
|
||||
cmd = [self.keyring, "set", service_name, username]
|
||||
input_ = password.encode("utf-8") + b"\n"
|
||||
env = os.environ.copy()
|
||||
env["PYTHONIOENCODING"] = "utf-8"
|
||||
res = subprocess.run(cmd, input=input_, env=env)
|
||||
res.check_returncode()
|
||||
return None
|
||||
|
||||
|
||||
def get_keyring_provider() -> KeyRingBaseProvider:
|
||||
# keyring has previously failed and been disabled
|
||||
if not KEYRING_DISABLED:
|
||||
# Default to trying to use Python provider
|
||||
try:
|
||||
return KeyRingPythonProvider()
|
||||
except ImportError:
|
||||
pass
|
||||
except Exception as exc:
|
||||
# In the event of an unexpected exception
|
||||
# we should warn the user
|
||||
logger.warning(
|
||||
"Installed copy of keyring fails with exception %s, "
|
||||
"trying to find a keyring executable as a fallback",
|
||||
str(exc),
|
||||
)
|
||||
|
||||
# Fallback to Cli Provider if `keyring` isn't installed
|
||||
cli = shutil.which("keyring")
|
||||
if cli:
|
||||
return KeyRingCliProvider(cli)
|
||||
|
||||
return KeyRingNullProvider()
|
||||
|
||||
|
||||
def get_keyring_auth(url: Optional[str], username: Optional[str]) -> Optional[AuthInfo]:
|
||||
"""Return the tuple auth for a given url from keyring."""
|
||||
# Do nothing if no url was provided
|
||||
if not url:
|
||||
return None
|
||||
|
||||
keyring = get_keyring_provider()
|
||||
try:
|
||||
return keyring.get_auth_info(url, username)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Keyring is skipped due to an exception: %s",
|
||||
str(exc),
|
||||
)
|
||||
keyring = None # type: ignore[assignment]
|
||||
return None
|
||||
global KEYRING_DISABLED
|
||||
KEYRING_DISABLED = True
|
||||
return None
|
||||
|
||||
|
||||
class MultiDomainBasicAuth(AuthBase):
|
||||
|
@ -241,7 +359,7 @@ class MultiDomainBasicAuth(AuthBase):
|
|||
|
||||
# Factored out to allow for easy patching in tests
|
||||
def _should_save_password_to_keyring(self) -> bool:
|
||||
if not keyring:
|
||||
if get_keyring_provider() is None:
|
||||
return False
|
||||
return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y"
|
||||
|
||||
|
@ -276,7 +394,11 @@ class MultiDomainBasicAuth(AuthBase):
|
|||
|
||||
# Prompt to save the password to keyring
|
||||
if save and self._should_save_password_to_keyring():
|
||||
self._credentials_to_save = (parsed.netloc, username, password)
|
||||
self._credentials_to_save = Credentials(
|
||||
url=parsed.netloc,
|
||||
username=username,
|
||||
password=password,
|
||||
)
|
||||
|
||||
# Consume content and release the original connection to allow our new
|
||||
# request to reuse the same one.
|
||||
|
@ -309,15 +431,16 @@ class MultiDomainBasicAuth(AuthBase):
|
|||
|
||||
def save_credentials(self, resp: Response, **kwargs: Any) -> None:
|
||||
"""Response callback to save credentials on success."""
|
||||
assert keyring is not None, "should never reach here without keyring"
|
||||
if not keyring:
|
||||
return
|
||||
keyring = get_keyring_provider()
|
||||
assert not isinstance(
|
||||
keyring, KeyRingNullProvider
|
||||
), "should never reach here without keyring"
|
||||
|
||||
creds = self._credentials_to_save
|
||||
self._credentials_to_save = None
|
||||
if creds and resp.status_code < 400:
|
||||
try:
|
||||
logger.info("Saving credentials to keyring")
|
||||
keyring.set_password(*creds)
|
||||
keyring.save_auth_info(creds.url, creds.username, creds.password)
|
||||
except Exception:
|
||||
logger.exception("Failed to save credentials")
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import functools
|
||||
from typing import Any, List, Optional, Tuple
|
||||
import sys
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
import pytest
|
||||
|
||||
|
@ -8,6 +9,13 @@ from pip._internal.network.auth import MultiDomainBasicAuth
|
|||
from tests.lib.requests_mocks import MockConnection, MockRequest, MockResponse
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def reset_keyring() -> Iterable[None]:
|
||||
yield None
|
||||
# Reset the state of the module between tests
|
||||
pip._internal.network.auth.KEYRING_DISABLED = False
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
["input_url", "url", "username", "password"],
|
||||
[
|
||||
|
@ -138,7 +146,7 @@ def test_keyring_get_password(
|
|||
expect: Tuple[Optional[str], Optional[str]],
|
||||
) -> None:
|
||||
keyring = KeyringModuleV1()
|
||||
monkeypatch.setattr("pip._internal.network.auth.keyring", keyring)
|
||||
monkeypatch.setitem(sys.modules, "keyring", keyring) # type: ignore[misc]
|
||||
auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"])
|
||||
|
||||
actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True)
|
||||
|
@ -147,7 +155,7 @@ def test_keyring_get_password(
|
|||
|
||||
def test_keyring_get_password_after_prompt(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
keyring = KeyringModuleV1()
|
||||
monkeypatch.setattr("pip._internal.network.auth.keyring", keyring)
|
||||
monkeypatch.setitem(sys.modules, "keyring", keyring) # type: ignore[misc]
|
||||
auth = MultiDomainBasicAuth()
|
||||
|
||||
def ask_input(prompt: str) -> str:
|
||||
|
@ -163,7 +171,7 @@ def test_keyring_get_password_after_prompt_when_none(
|
|||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
keyring = KeyringModuleV1()
|
||||
monkeypatch.setattr("pip._internal.network.auth.keyring", keyring)
|
||||
monkeypatch.setitem(sys.modules, "keyring", keyring) # type: ignore[misc]
|
||||
auth = MultiDomainBasicAuth()
|
||||
|
||||
def ask_input(prompt: str) -> str:
|
||||
|
@ -184,7 +192,7 @@ def test_keyring_get_password_username_in_index(
|
|||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
keyring = KeyringModuleV1()
|
||||
monkeypatch.setattr("pip._internal.network.auth.keyring", keyring)
|
||||
monkeypatch.setitem(sys.modules, "keyring", keyring) # type: ignore[misc]
|
||||
auth = MultiDomainBasicAuth(index_urls=["http://user@example.com/path2"])
|
||||
get = functools.partial(
|
||||
auth._get_new_credentials, allow_netrc=False, allow_keyring=True
|
||||
|
@ -217,7 +225,7 @@ def test_keyring_set_password(
|
|||
expect_save: bool,
|
||||
) -> None:
|
||||
keyring = KeyringModuleV1()
|
||||
monkeypatch.setattr("pip._internal.network.auth.keyring", keyring)
|
||||
monkeypatch.setitem(sys.modules, "keyring", keyring) # type: ignore[misc]
|
||||
auth = MultiDomainBasicAuth(prompting=True)
|
||||
monkeypatch.setattr(auth, "_get_url_and_credentials", lambda u: (u, None, None))
|
||||
monkeypatch.setattr(auth, "_prompt_for_password", lambda *a: creds)
|
||||
|
@ -293,7 +301,7 @@ class KeyringModuleV2:
|
|||
def test_keyring_get_credential(
|
||||
monkeypatch: pytest.MonkeyPatch, url: str, expect: str
|
||||
) -> None:
|
||||
monkeypatch.setattr(pip._internal.network.auth, "keyring", KeyringModuleV2())
|
||||
monkeypatch.setitem(sys.modules, "keyring", KeyringModuleV2()) # type: ignore[misc]
|
||||
auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"])
|
||||
|
||||
assert (
|
||||
|
@ -314,7 +322,7 @@ class KeyringModuleBroken:
|
|||
|
||||
def test_broken_keyring_disables_keyring(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
keyring_broken = KeyringModuleBroken()
|
||||
monkeypatch.setattr(pip._internal.network.auth, "keyring", keyring_broken)
|
||||
monkeypatch.setitem(sys.modules, "keyring", keyring_broken) # type: ignore[misc]
|
||||
|
||||
auth = MultiDomainBasicAuth(index_urls=["http://example.com/"])
|
||||
|
||||
|
@ -325,3 +333,143 @@ def test_broken_keyring_disables_keyring(monkeypatch: pytest.MonkeyPatch) -> Non
|
|||
url, allow_netrc=False, allow_keyring=True
|
||||
) == (None, None)
|
||||
assert keyring_broken._call_count == 1
|
||||
|
||||
|
||||
class KeyringSubprocessResult(KeyringModuleV1):
|
||||
"""Represents the subprocess call to keyring"""
|
||||
|
||||
returncode = 0 # Default to zero retcode
|
||||
|
||||
def __call__(
|
||||
self,
|
||||
cmd: List[str],
|
||||
*,
|
||||
env: Dict[str, str],
|
||||
stdin: Optional[Any] = None,
|
||||
capture_output: Optional[bool] = None,
|
||||
input: Optional[bytes] = None,
|
||||
) -> Any:
|
||||
if cmd[1] == "get":
|
||||
assert stdin == -3 # subprocess.DEVNULL
|
||||
assert capture_output is True
|
||||
assert env["PYTHONIOENCODING"] == "utf-8"
|
||||
|
||||
password = self.get_password(*cmd[2:])
|
||||
if password is None:
|
||||
# Expect non-zero returncode if no password present
|
||||
self.returncode = 1
|
||||
else:
|
||||
# Passwords are returned encoded with a newline appended
|
||||
self.stdout = password.encode("utf-8") + b"\n"
|
||||
|
||||
if cmd[1] == "set":
|
||||
assert stdin is None
|
||||
assert capture_output is None
|
||||
assert env["PYTHONIOENCODING"] == "utf-8"
|
||||
assert input is not None
|
||||
|
||||
# Input from stdin is encoded
|
||||
self.set_password(cmd[2], cmd[3], input.decode("utf-8").strip("\n"))
|
||||
|
||||
return self
|
||||
|
||||
def check_returncode(self) -> None:
|
||||
if self.returncode:
|
||||
raise Exception()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"url, expect",
|
||||
(
|
||||
("http://example.com/path1", (None, None)),
|
||||
# path1 URLs will be resolved by netloc
|
||||
("http://user@example.com/path1", ("user", "user!netloc")),
|
||||
("http://user2@example.com/path1", ("user2", "user2!netloc")),
|
||||
# path2 URLs will be resolved by index URL
|
||||
("http://example.com/path2/path3", (None, None)),
|
||||
("http://foo@example.com/path2/path3", ("foo", "foo!url")),
|
||||
),
|
||||
)
|
||||
def test_keyring_cli_get_password(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
url: str,
|
||||
expect: Tuple[Optional[str], Optional[str]],
|
||||
) -> None:
|
||||
monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring")
|
||||
monkeypatch.setattr(
|
||||
pip._internal.network.auth.subprocess, "run", KeyringSubprocessResult()
|
||||
)
|
||||
auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"])
|
||||
|
||||
actual = auth._get_new_credentials(url, allow_netrc=False, allow_keyring=True)
|
||||
assert actual == expect
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"response_status, creds, expect_save",
|
||||
(
|
||||
(403, ("user", "pass", True), False),
|
||||
(
|
||||
200,
|
||||
("user", "pass", True),
|
||||
True,
|
||||
),
|
||||
(
|
||||
200,
|
||||
("user", "pass", False),
|
||||
False,
|
||||
),
|
||||
),
|
||||
)
|
||||
def test_keyring_cli_set_password(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
response_status: int,
|
||||
creds: Tuple[str, str, bool],
|
||||
expect_save: bool,
|
||||
) -> None:
|
||||
monkeypatch.setattr(pip._internal.network.auth.shutil, "which", lambda x: "keyring")
|
||||
keyring = KeyringSubprocessResult()
|
||||
monkeypatch.setattr(pip._internal.network.auth.subprocess, "run", keyring)
|
||||
auth = MultiDomainBasicAuth(prompting=True)
|
||||
monkeypatch.setattr(auth, "_get_url_and_credentials", lambda u: (u, None, None))
|
||||
monkeypatch.setattr(auth, "_prompt_for_password", lambda *a: creds)
|
||||
if creds[2]:
|
||||
# when _prompt_for_password indicates to save, we should save
|
||||
def should_save_password_to_keyring(*a: Any) -> bool:
|
||||
return True
|
||||
|
||||
else:
|
||||
# when _prompt_for_password indicates not to save, we should
|
||||
# never call this function
|
||||
def should_save_password_to_keyring(*a: Any) -> bool:
|
||||
assert False, "_should_save_password_to_keyring should not be called"
|
||||
|
||||
monkeypatch.setattr(
|
||||
auth, "_should_save_password_to_keyring", should_save_password_to_keyring
|
||||
)
|
||||
|
||||
req = MockRequest("https://example.com")
|
||||
resp = MockResponse(b"")
|
||||
resp.url = req.url
|
||||
connection = MockConnection()
|
||||
|
||||
def _send(sent_req: MockRequest, **kwargs: Any) -> MockResponse:
|
||||
assert sent_req is req
|
||||
assert "Authorization" in sent_req.headers
|
||||
r = MockResponse(b"")
|
||||
r.status_code = response_status
|
||||
return r
|
||||
|
||||
# https://github.com/python/mypy/issues/2427
|
||||
connection._send = _send # type: ignore[assignment]
|
||||
|
||||
resp.request = req
|
||||
resp.status_code = 401
|
||||
resp.connection = connection
|
||||
|
||||
auth.handle_401(resp)
|
||||
|
||||
if expect_save:
|
||||
assert keyring.saved_passwords == [("example.com", creds[0], creds[1])]
|
||||
else:
|
||||
assert keyring.saved_passwords == []
|
||||
|
|
Loading…
Reference in New Issue