mirror of
https://github.com/pypa/pip
synced 2023-12-13 21:30:23 +01:00
206 lines
6.7 KiB
Python
206 lines
6.7 KiB
Python
import logging
|
|
import os
|
|
import time
|
|
from optparse import Values
|
|
from pathlib import Path
|
|
from typing import Callable, Iterator, List, NoReturn, Optional
|
|
from unittest.mock import Mock, patch
|
|
|
|
import pytest
|
|
|
|
from pip._internal.cli.base_command import Command
|
|
from pip._internal.cli.status_codes import SUCCESS
|
|
from pip._internal.utils import temp_dir
|
|
from pip._internal.utils.logging import BrokenStdoutLoggingError
|
|
from pip._internal.utils.temp_dir import TempDirectory
|
|
|
|
|
|
@pytest.fixture
|
|
def fixed_time() -> Iterator[None]:
|
|
with patch("time.time", lambda: 1547704837.040001 + time.timezone):
|
|
yield
|
|
|
|
|
|
class FakeCommand(Command):
|
|
|
|
_name = "fake"
|
|
|
|
def __init__(
|
|
self, run_func: Optional[Callable[[], int]] = None, error: bool = False
|
|
) -> None:
|
|
if error:
|
|
|
|
def run_func() -> int:
|
|
raise SystemExit(1)
|
|
|
|
self.run_func = run_func
|
|
super().__init__(self._name, self._name)
|
|
|
|
def main(self, args: List[str]) -> int:
|
|
args.append("--disable-pip-version-check")
|
|
return super().main(args)
|
|
|
|
def run(self, options: Values, args: List[str]) -> int:
|
|
logging.getLogger("pip.tests").info("fake")
|
|
# Return SUCCESS from run if run_func is not provided
|
|
if self.run_func:
|
|
return self.run_func()
|
|
else:
|
|
return SUCCESS
|
|
|
|
|
|
class FakeCommandWithUnicode(FakeCommand):
|
|
_name = "fake_unicode"
|
|
|
|
def run(self, options: Values, args: List[str]) -> int:
|
|
logging.getLogger("pip.tests").info(b"bytes here \xE9")
|
|
logging.getLogger("pip.tests").info(b"unicode here \xC3\xA9".decode("utf-8"))
|
|
return SUCCESS
|
|
|
|
|
|
class TestCommand:
|
|
def call_main(self, capsys: pytest.CaptureFixture[str], args: List[str]) -> str:
|
|
"""
|
|
Call command.main(), and return the command's stderr.
|
|
"""
|
|
|
|
def raise_broken_stdout() -> NoReturn:
|
|
raise BrokenStdoutLoggingError()
|
|
|
|
cmd = FakeCommand(run_func=raise_broken_stdout)
|
|
status = cmd.main(args)
|
|
assert status == 1
|
|
stderr = capsys.readouterr().err
|
|
|
|
return stderr
|
|
|
|
def test_raise_broken_stdout(self, capsys: pytest.CaptureFixture[str]) -> None:
|
|
"""
|
|
Test raising BrokenStdoutLoggingError.
|
|
"""
|
|
stderr = self.call_main(capsys, [])
|
|
|
|
assert stderr.rstrip() == "ERROR: Pipe to stdout was broken"
|
|
|
|
def test_raise_broken_stdout__debug_logging(
|
|
self, capsys: pytest.CaptureFixture[str]
|
|
) -> None:
|
|
"""
|
|
Test raising BrokenStdoutLoggingError with debug logging enabled.
|
|
"""
|
|
stderr = self.call_main(capsys, ["-vv"])
|
|
|
|
assert "ERROR: Pipe to stdout was broken" in stderr
|
|
assert "Traceback (most recent call last):" in stderr
|
|
|
|
|
|
@patch("pip._internal.cli.req_command.Command.handle_pip_version_check")
|
|
def test_handle_pip_version_check_called(mock_handle_version_check: Mock) -> None:
|
|
"""
|
|
Check that Command.handle_pip_version_check() is called.
|
|
"""
|
|
cmd = FakeCommand()
|
|
cmd.main([])
|
|
mock_handle_version_check.assert_called_once()
|
|
|
|
|
|
def test_log_command_success(fixed_time: None, tmpdir: Path) -> None:
|
|
"""Test the --log option logs when command succeeds."""
|
|
cmd = FakeCommand()
|
|
log_path = os.path.join(tmpdir, "log")
|
|
cmd.main(["fake", "--log", log_path])
|
|
with open(log_path) as f:
|
|
assert f.read().rstrip() == "2019-01-17T06:00:37,040 fake"
|
|
|
|
|
|
def test_log_command_error(fixed_time: None, tmpdir: Path) -> None:
|
|
"""Test the --log option logs when command fails."""
|
|
cmd = FakeCommand(error=True)
|
|
log_path = os.path.join(tmpdir, "log")
|
|
cmd.main(["fake", "--log", log_path])
|
|
with open(log_path) as f:
|
|
assert f.read().startswith("2019-01-17T06:00:37,040 fake")
|
|
|
|
|
|
def test_log_file_command_error(fixed_time: None, tmpdir: Path) -> None:
|
|
"""Test the --log-file option logs (when there's an error)."""
|
|
cmd = FakeCommand(error=True)
|
|
log_file_path = os.path.join(tmpdir, "log_file")
|
|
cmd.main(["fake", "--log-file", log_file_path])
|
|
with open(log_file_path) as f:
|
|
assert f.read().startswith("2019-01-17T06:00:37,040 fake")
|
|
|
|
|
|
def test_log_unicode_messages(fixed_time: None, tmpdir: Path) -> None:
|
|
"""Tests that logging bytestrings and unicode objects
|
|
don't break logging.
|
|
"""
|
|
cmd = FakeCommandWithUnicode()
|
|
log_path = os.path.join(tmpdir, "log")
|
|
cmd.main(["fake_unicode", "--log", log_path])
|
|
|
|
|
|
@pytest.mark.no_auto_tempdir_manager
|
|
def test_base_command_provides_tempdir_helpers() -> None:
|
|
assert temp_dir._tempdir_manager is None
|
|
assert temp_dir._tempdir_registry is None
|
|
|
|
def assert_helpers_set(options: Values, args: List[str]) -> int:
|
|
assert temp_dir._tempdir_manager is not None
|
|
assert temp_dir._tempdir_registry is not None
|
|
return SUCCESS
|
|
|
|
c = Command("fake", "fake")
|
|
# https://github.com/python/mypy/issues/2427
|
|
c.run = Mock(side_effect=assert_helpers_set) # type: ignore[assignment]
|
|
assert c.main(["fake"]) == SUCCESS
|
|
c.run.assert_called_once()
|
|
|
|
|
|
not_deleted = "not_deleted"
|
|
|
|
|
|
@pytest.mark.parametrize("kind,exists", [(not_deleted, True), ("deleted", False)])
|
|
@pytest.mark.no_auto_tempdir_manager
|
|
def test_base_command_global_tempdir_cleanup(kind: str, exists: bool) -> None:
|
|
assert temp_dir._tempdir_manager is None
|
|
assert temp_dir._tempdir_registry is None
|
|
|
|
class Holder:
|
|
value: str
|
|
|
|
def create_temp_dirs(options: Values, args: List[str]) -> int:
|
|
assert c.tempdir_registry is not None
|
|
c.tempdir_registry.set_delete(not_deleted, False)
|
|
Holder.value = TempDirectory(kind=kind, globally_managed=True).path
|
|
return SUCCESS
|
|
|
|
c = Command("fake", "fake")
|
|
# https://github.com/python/mypy/issues/2427
|
|
c.run = Mock(side_effect=create_temp_dirs) # type: ignore[assignment]
|
|
assert c.main(["fake"]) == SUCCESS
|
|
c.run.assert_called_once()
|
|
assert os.path.exists(Holder.value) == exists
|
|
|
|
|
|
@pytest.mark.parametrize("kind,exists", [(not_deleted, True), ("deleted", False)])
|
|
@pytest.mark.no_auto_tempdir_manager
|
|
def test_base_command_local_tempdir_cleanup(kind: str, exists: bool) -> None:
|
|
assert temp_dir._tempdir_manager is None
|
|
assert temp_dir._tempdir_registry is None
|
|
|
|
def create_temp_dirs(options: Values, args: List[str]) -> int:
|
|
assert c.tempdir_registry is not None
|
|
c.tempdir_registry.set_delete(not_deleted, False)
|
|
|
|
with TempDirectory(kind=kind) as d:
|
|
path = d.path
|
|
assert os.path.exists(path)
|
|
assert os.path.exists(path) == exists
|
|
return SUCCESS
|
|
|
|
c = Command("fake", "fake")
|
|
# https://github.com/python/mypy/issues/2427
|
|
c.run = Mock(side_effect=create_temp_dirs) # type: ignore[assignment]
|
|
assert c.main(["fake"]) == SUCCESS
|
|
c.run.assert_called_once()
|