mirror of
https://github.com/pypa/pip
synced 2023-12-13 21:30:23 +01:00
403 lines
14 KiB
Python
403 lines
14 KiB
Python
# -*- coding: utf-8 -*-
|
|
import locale
|
|
import sys
|
|
from logging import DEBUG, ERROR, INFO, WARNING
|
|
from textwrap import dedent
|
|
|
|
import pytest
|
|
|
|
from pip._internal.cli.spinners import SpinnerInterface
|
|
from pip._internal.exceptions import InstallationError
|
|
from pip._internal.utils.misc import hide_value
|
|
from pip._internal.utils.subprocess import (
|
|
call_subprocess,
|
|
format_command_args,
|
|
make_command,
|
|
make_subprocess_output_error,
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize('args, expected', [
|
|
(['pip', 'list'], 'pip list'),
|
|
(['foo', 'space space', 'new\nline', 'double"quote', "single'quote"],
|
|
"""foo 'space space' 'new\nline' 'double"quote' 'single'"'"'quote'"""),
|
|
# Test HiddenText arguments.
|
|
(make_command(hide_value('secret1'), 'foo', hide_value('secret2')),
|
|
"'****' foo '****'"),
|
|
])
|
|
def test_format_command_args(args, expected):
|
|
actual = format_command_args(args)
|
|
assert actual == expected
|
|
|
|
|
|
def test_make_subprocess_output_error():
|
|
cmd_args = ['test', 'has space']
|
|
cwd = '/path/to/cwd'
|
|
lines = ['line1\n', 'line2\n', 'line3\n']
|
|
actual = make_subprocess_output_error(
|
|
cmd_args=cmd_args,
|
|
cwd=cwd,
|
|
lines=lines,
|
|
exit_status=3,
|
|
)
|
|
expected = dedent("""\
|
|
Command errored out with exit status 3:
|
|
command: test 'has space'
|
|
cwd: /path/to/cwd
|
|
Complete output (3 lines):
|
|
line1
|
|
line2
|
|
line3
|
|
----------------------------------------""")
|
|
assert actual == expected, 'actual: {}'.format(actual)
|
|
|
|
|
|
def test_make_subprocess_output_error__non_ascii_command_arg(monkeypatch):
|
|
"""
|
|
Test a command argument with a non-ascii character.
|
|
"""
|
|
cmd_args = ['foo', 'déf']
|
|
if sys.version_info[0] == 2:
|
|
# Check in Python 2 that the str (bytes object) with the non-ascii
|
|
# character has the encoding we expect. (This comes from the source
|
|
# code encoding at the top of the file.)
|
|
assert cmd_args[1].decode('utf-8') == u'déf'
|
|
|
|
# We need to monkeypatch so the encoding will be correct on Windows.
|
|
monkeypatch.setattr(locale, 'getpreferredencoding', lambda: 'utf-8')
|
|
actual = make_subprocess_output_error(
|
|
cmd_args=cmd_args,
|
|
cwd='/path/to/cwd',
|
|
lines=[],
|
|
exit_status=1,
|
|
)
|
|
expected = dedent(u"""\
|
|
Command errored out with exit status 1:
|
|
command: foo 'déf'
|
|
cwd: /path/to/cwd
|
|
Complete output (0 lines):
|
|
----------------------------------------""")
|
|
assert actual == expected, u'actual: {}'.format(actual)
|
|
|
|
|
|
@pytest.mark.skipif("sys.version_info < (3,)")
|
|
def test_make_subprocess_output_error__non_ascii_cwd_python_3(monkeypatch):
|
|
"""
|
|
Test a str (text) cwd with a non-ascii character in Python 3.
|
|
"""
|
|
cmd_args = ['test']
|
|
cwd = '/path/to/cwd/déf'
|
|
actual = make_subprocess_output_error(
|
|
cmd_args=cmd_args,
|
|
cwd=cwd,
|
|
lines=[],
|
|
exit_status=1,
|
|
)
|
|
expected = dedent("""\
|
|
Command errored out with exit status 1:
|
|
command: test
|
|
cwd: /path/to/cwd/déf
|
|
Complete output (0 lines):
|
|
----------------------------------------""")
|
|
assert actual == expected, 'actual: {}'.format(actual)
|
|
|
|
|
|
@pytest.mark.parametrize('encoding', [
|
|
'utf-8',
|
|
# Test a Windows encoding.
|
|
'cp1252',
|
|
])
|
|
@pytest.mark.skipif("sys.version_info >= (3,)")
|
|
def test_make_subprocess_output_error__non_ascii_cwd_python_2(
|
|
monkeypatch, encoding,
|
|
):
|
|
"""
|
|
Test a str (bytes object) cwd with a non-ascii character in Python 2.
|
|
"""
|
|
cmd_args = ['test']
|
|
cwd = u'/path/to/cwd/déf'.encode(encoding)
|
|
monkeypatch.setattr(sys, 'getfilesystemencoding', lambda: encoding)
|
|
actual = make_subprocess_output_error(
|
|
cmd_args=cmd_args,
|
|
cwd=cwd,
|
|
lines=[],
|
|
exit_status=1,
|
|
)
|
|
expected = dedent(u"""\
|
|
Command errored out with exit status 1:
|
|
command: test
|
|
cwd: /path/to/cwd/déf
|
|
Complete output (0 lines):
|
|
----------------------------------------""")
|
|
assert actual == expected, u'actual: {}'.format(actual)
|
|
|
|
|
|
# This test is mainly important for checking unicode in Python 2.
|
|
def test_make_subprocess_output_error__non_ascii_line():
|
|
"""
|
|
Test a line with a non-ascii character.
|
|
"""
|
|
lines = [u'curly-quote: \u2018\n']
|
|
actual = make_subprocess_output_error(
|
|
cmd_args=['test'],
|
|
cwd='/path/to/cwd',
|
|
lines=lines,
|
|
exit_status=1,
|
|
)
|
|
expected = dedent(u"""\
|
|
Command errored out with exit status 1:
|
|
command: test
|
|
cwd: /path/to/cwd
|
|
Complete output (1 lines):
|
|
curly-quote: \u2018
|
|
----------------------------------------""")
|
|
assert actual == expected, u'actual: {}'.format(actual)
|
|
|
|
|
|
class FakeSpinner(SpinnerInterface):
|
|
|
|
def __init__(self):
|
|
self.spin_count = 0
|
|
self.final_status = None
|
|
|
|
def spin(self):
|
|
self.spin_count += 1
|
|
|
|
def finish(self, final_status):
|
|
self.final_status = final_status
|
|
|
|
|
|
class TestCallSubprocess(object):
|
|
|
|
"""
|
|
Test call_subprocess().
|
|
"""
|
|
|
|
def check_result(
|
|
self, capfd, caplog, log_level, spinner, result, expected,
|
|
expected_spinner,
|
|
):
|
|
"""
|
|
Check the result of calling call_subprocess().
|
|
|
|
:param log_level: the logging level that caplog was set to.
|
|
:param spinner: the FakeSpinner object passed to call_subprocess()
|
|
to be checked.
|
|
:param result: the call_subprocess() return value to be checked.
|
|
:param expected: a pair (expected_proc, expected_records), where
|
|
1) `expected_proc` is the expected return value of
|
|
call_subprocess() as a list of lines, or None if the return
|
|
value is expected to be None;
|
|
2) `expected_records` is the expected value of
|
|
caplog.record_tuples.
|
|
:param expected_spinner: a 2-tuple of the spinner's expected
|
|
(spin_count, final_status).
|
|
"""
|
|
expected_proc, expected_records = expected
|
|
|
|
if expected_proc is None:
|
|
assert result is None
|
|
else:
|
|
assert result.splitlines() == expected_proc
|
|
|
|
# Confirm that stdout and stderr haven't been written to.
|
|
captured = capfd.readouterr()
|
|
assert (captured.out, captured.err) == ('', '')
|
|
|
|
records = caplog.record_tuples
|
|
if len(records) != len(expected_records):
|
|
raise RuntimeError('{} != {}'.format(records, expected_records))
|
|
|
|
for record, expected_record in zip(records, expected_records):
|
|
# Check the logger_name and log level parts exactly.
|
|
assert record[:2] == expected_record[:2]
|
|
# For the message portion, check only a substring. Also, we
|
|
# can't use startswith() since the order of stdout and stderr
|
|
# isn't guaranteed in cases where stderr is also present.
|
|
# For example, we observed the stderr lines coming before stdout
|
|
# in CI for PyPy 2.7 even though stdout happens first
|
|
# chronologically.
|
|
assert expected_record[2] in record[2]
|
|
|
|
assert (spinner.spin_count, spinner.final_status) == expected_spinner
|
|
|
|
def prepare_call(self, caplog, log_level, command=None):
|
|
if command is None:
|
|
command = 'print("Hello"); print("world")'
|
|
|
|
caplog.set_level(log_level)
|
|
spinner = FakeSpinner()
|
|
args = [sys.executable, '-c', command]
|
|
|
|
return (args, spinner)
|
|
|
|
def test_debug_logging(self, capfd, caplog):
|
|
"""
|
|
Test DEBUG logging (and without passing show_stdout=True).
|
|
"""
|
|
log_level = DEBUG
|
|
args, spinner = self.prepare_call(caplog, log_level)
|
|
result = call_subprocess(args, spinner=spinner)
|
|
|
|
expected = (['Hello', 'world'], [
|
|
('pip.subprocessor', DEBUG, 'Running command '),
|
|
('pip.subprocessor', DEBUG, 'Hello'),
|
|
('pip.subprocessor', DEBUG, 'world'),
|
|
])
|
|
# The spinner shouldn't spin in this case since the subprocess
|
|
# output is already being logged to the console.
|
|
self.check_result(
|
|
capfd, caplog, log_level, spinner, result, expected,
|
|
expected_spinner=(0, None),
|
|
)
|
|
|
|
def test_info_logging(self, capfd, caplog):
|
|
"""
|
|
Test INFO logging (and without passing show_stdout=True).
|
|
"""
|
|
log_level = INFO
|
|
args, spinner = self.prepare_call(caplog, log_level)
|
|
result = call_subprocess(args, spinner=spinner)
|
|
|
|
expected = (['Hello', 'world'], [])
|
|
# The spinner should spin twice in this case since the subprocess
|
|
# output isn't being written to the console.
|
|
self.check_result(
|
|
capfd, caplog, log_level, spinner, result, expected,
|
|
expected_spinner=(2, 'done'),
|
|
)
|
|
|
|
def test_info_logging__subprocess_error(self, capfd, caplog):
|
|
"""
|
|
Test INFO logging of a subprocess with an error (and without passing
|
|
show_stdout=True).
|
|
"""
|
|
log_level = INFO
|
|
command = 'print("Hello"); print("world"); exit("fail")'
|
|
args, spinner = self.prepare_call(caplog, log_level, command=command)
|
|
|
|
with pytest.raises(InstallationError) as exc:
|
|
call_subprocess(args, spinner=spinner)
|
|
result = None
|
|
exc_message = str(exc.value)
|
|
assert exc_message.startswith(
|
|
'Command errored out with exit status 1: '
|
|
)
|
|
assert exc_message.endswith('Check the logs for full command output.')
|
|
|
|
expected = (None, [
|
|
('pip.subprocessor', ERROR, 'Complete output (3 lines):\n'),
|
|
])
|
|
# The spinner should spin three times in this case since the
|
|
# subprocess output isn't being written to the console.
|
|
self.check_result(
|
|
capfd, caplog, log_level, spinner, result, expected,
|
|
expected_spinner=(3, 'error'),
|
|
)
|
|
|
|
# Do some further checking on the captured log records to confirm
|
|
# that the subprocess output was logged.
|
|
last_record = caplog.record_tuples[-1]
|
|
last_message = last_record[2]
|
|
lines = last_message.splitlines()
|
|
|
|
# We have to sort before comparing the lines because we can't
|
|
# guarantee the order in which stdout and stderr will appear.
|
|
# For example, we observed the stderr lines coming before stdout
|
|
# in CI for PyPy 2.7 even though stdout happens first chronologically.
|
|
actual = sorted(lines)
|
|
# Test the "command" line separately because we can't test an
|
|
# exact match.
|
|
command_line = actual.pop(1)
|
|
assert actual == [
|
|
' cwd: None',
|
|
'----------------------------------------',
|
|
'Command errored out with exit status 1:',
|
|
'Complete output (3 lines):',
|
|
'Hello',
|
|
'fail',
|
|
'world',
|
|
], 'lines: {}'.format(actual) # Show the full output on failure.
|
|
|
|
assert command_line.startswith(' command: ')
|
|
assert command_line.endswith('print("world"); exit("fail")\'')
|
|
|
|
def test_info_logging_with_show_stdout_true(self, capfd, caplog):
|
|
"""
|
|
Test INFO logging with show_stdout=True.
|
|
"""
|
|
log_level = INFO
|
|
args, spinner = self.prepare_call(caplog, log_level)
|
|
result = call_subprocess(args, spinner=spinner, show_stdout=True)
|
|
|
|
expected = (['Hello', 'world'], [
|
|
('pip.subprocessor', INFO, 'Running command '),
|
|
('pip.subprocessor', INFO, 'Hello'),
|
|
('pip.subprocessor', INFO, 'world'),
|
|
])
|
|
# The spinner shouldn't spin in this case since the subprocess
|
|
# output is already being written to the console.
|
|
self.check_result(
|
|
capfd, caplog, log_level, spinner, result, expected,
|
|
expected_spinner=(0, None),
|
|
)
|
|
|
|
@pytest.mark.parametrize((
|
|
'exit_status', 'show_stdout', 'extra_ok_returncodes', 'log_level',
|
|
'expected'),
|
|
[
|
|
# The spinner should show here because show_stdout=False means
|
|
# the subprocess should get logged at DEBUG level, but the passed
|
|
# log level is only INFO.
|
|
(0, False, None, INFO, (None, 'done', 2)),
|
|
# Test some cases where the spinner should not be shown.
|
|
(0, False, None, DEBUG, (None, None, 0)),
|
|
# Test show_stdout=True.
|
|
(0, True, None, DEBUG, (None, None, 0)),
|
|
(0, True, None, INFO, (None, None, 0)),
|
|
# The spinner should show here because show_stdout=True means
|
|
# the subprocess should get logged at INFO level, but the passed
|
|
# log level is only WARNING.
|
|
(0, True, None, WARNING, (None, 'done', 2)),
|
|
# Test a non-zero exit status.
|
|
(3, False, None, INFO, (InstallationError, 'error', 2)),
|
|
# Test a non-zero exit status also in extra_ok_returncodes.
|
|
(3, False, (3, ), INFO, (None, 'done', 2)),
|
|
])
|
|
def test_spinner_finish(
|
|
self, exit_status, show_stdout, extra_ok_returncodes, log_level,
|
|
caplog, expected,
|
|
):
|
|
"""
|
|
Test that the spinner finishes correctly.
|
|
"""
|
|
expected_exc_type = expected[0]
|
|
expected_final_status = expected[1]
|
|
expected_spin_count = expected[2]
|
|
|
|
command = (
|
|
'print("Hello"); print("world"); exit({})'.format(exit_status)
|
|
)
|
|
args, spinner = self.prepare_call(caplog, log_level, command=command)
|
|
try:
|
|
call_subprocess(
|
|
args,
|
|
show_stdout=show_stdout,
|
|
extra_ok_returncodes=extra_ok_returncodes,
|
|
spinner=spinner,
|
|
)
|
|
except Exception as exc:
|
|
exc_type = type(exc)
|
|
else:
|
|
exc_type = None
|
|
|
|
assert exc_type == expected_exc_type
|
|
assert spinner.final_status == expected_final_status
|
|
assert spinner.spin_count == expected_spin_count
|
|
|
|
def test_closes_stdin(self):
|
|
with pytest.raises(InstallationError):
|
|
call_subprocess(
|
|
[sys.executable, '-c', 'input()'],
|
|
show_stdout=True,
|
|
)
|