# -*- coding: utf-8 -*-
import locale
import sys
from logging import DEBUG, ERROR, INFO, WARNING
from textwrap import dedent

import pytest

from pip._internal.cli.spinners import SpinnerInterface
from pip._internal.exceptions import InstallationError
from pip._internal.utils.misc import hide_value
from pip._internal.utils.subprocess import (
    call_subprocess,
    format_command_args,
    make_command,
    make_subprocess_output_error,
)


@pytest.mark.parametrize('args, expected', [
    (['pip', 'list'], 'pip list'),
    (['foo', 'space space', 'new\nline', 'double"quote', "single'quote"],
     """foo 'space space' 'new\nline' 'double"quote' 'single'"'"'quote'"""),
    # Test HiddenText arguments.
    (make_command(hide_value('secret1'), 'foo', hide_value('secret2')),
     "'****' foo '****'"),
])
def test_format_command_args(args, expected):
    """
    Test that format_command_args() renders an argument list as the
    expected display string (quoting where needed, hiding hidden values).
    """
    actual = format_command_args(args)
    assert actual == expected


def test_make_subprocess_output_error():
    """
    Test the message built for a plain-ascii command, cwd and output.
    """
    cmd_args = ['test', 'has space']
    cwd = '/path/to/cwd'
    lines = ['line1\n', 'line2\n', 'line3\n']
    actual = make_subprocess_output_error(
        cmd_args=cmd_args,
        cwd=cwd,
        lines=lines,
        exit_status=3,
    )
    # NOTE: the leading spaces before "command:" and "cwd:" are significant;
    # dedent() only strips the margin common to all lines.
    expected = dedent("""\
    Command errored out with exit status 3:
     command: test 'has space'
         cwd: /path/to/cwd
    Complete output (3 lines):
    line1
    line2
    line3
    ----------------------------------------""")
    assert actual == expected, 'actual: {}'.format(actual)


def test_make_subprocess_output_error__non_ascii_command_arg(monkeypatch):
    """
    Test a command argument with a non-ascii character.
    """
    cmd_args = ['foo', 'déf']
    if sys.version_info[0] == 2:
        # Check in Python 2 that the str (bytes object) with the non-ascii
        # character has the encoding we expect. (This comes from the source
        # code encoding at the top of the file.)
        assert cmd_args[1].decode('utf-8') == u'déf'

    # We need to monkeypatch so the encoding will be correct on Windows.
    monkeypatch.setattr(locale, 'getpreferredencoding', lambda: 'utf-8')
    actual = make_subprocess_output_error(
        cmd_args=cmd_args,
        cwd='/path/to/cwd',
        lines=[],
        exit_status=1,
    )
    expected = dedent(u"""\
    Command errored out with exit status 1:
     command: foo 'déf'
         cwd: /path/to/cwd
    Complete output (0 lines):
    ----------------------------------------""")
    assert actual == expected, u'actual: {}'.format(actual)


@pytest.mark.skipif("sys.version_info < (3,)")
def test_make_subprocess_output_error__non_ascii_cwd_python_3(monkeypatch):
    """
    Test a str (text) cwd with a non-ascii character in Python 3.
    """
    cmd_args = ['test']
    cwd = '/path/to/cwd/déf'
    actual = make_subprocess_output_error(
        cmd_args=cmd_args,
        cwd=cwd,
        lines=[],
        exit_status=1,
    )
    expected = dedent("""\
    Command errored out with exit status 1:
     command: test
         cwd: /path/to/cwd/déf
    Complete output (0 lines):
    ----------------------------------------""")
    assert actual == expected, 'actual: {}'.format(actual)


@pytest.mark.parametrize('encoding', [
    'utf-8',
    # Test a Windows encoding.
    'cp1252',
])
@pytest.mark.skipif("sys.version_info >= (3,)")
def test_make_subprocess_output_error__non_ascii_cwd_python_2(
    monkeypatch, encoding,
):
    """
    Test a str (bytes object) cwd with a non-ascii character in Python 2.
    """
    cmd_args = ['test']
    cwd = u'/path/to/cwd/déf'.encode(encoding)
    monkeypatch.setattr(sys, 'getfilesystemencoding', lambda: encoding)
    actual = make_subprocess_output_error(
        cmd_args=cmd_args,
        cwd=cwd,
        lines=[],
        exit_status=1,
    )
    expected = dedent(u"""\
    Command errored out with exit status 1:
     command: test
         cwd: /path/to/cwd/déf
    Complete output (0 lines):
    ----------------------------------------""")
    assert actual == expected, u'actual: {}'.format(actual)


# This test is mainly important for checking unicode in Python 2.
def test_make_subprocess_output_error__non_ascii_line():
    """
    Test a line with a non-ascii character.
    """
    lines = [u'curly-quote: \u2018\n']
    actual = make_subprocess_output_error(
        cmd_args=['test'],
        cwd='/path/to/cwd',
        lines=lines,
        exit_status=1,
    )
    expected = dedent(u"""\
    Command errored out with exit status 1:
     command: test
         cwd: /path/to/cwd
    Complete output (1 lines):
    curly-quote: \u2018
    ----------------------------------------""")
    assert actual == expected, u'actual: {}'.format(actual)


class FakeSpinner(SpinnerInterface):
    """
    A spinner stand-in that records how many times spin() was called and
    the status passed to finish(), so tests can assert on both.
    """

    def __init__(self):
        self.spin_count = 0
        self.final_status = None

    def spin(self):
        self.spin_count += 1

    def finish(self, final_status):
        self.final_status = final_status


class TestCallSubprocess(object):

    """
    Test call_subprocess().
    """

    def check_result(
        self, capfd, caplog, log_level, spinner, result, expected,
        expected_spinner,
    ):
        """
        Check the result of calling call_subprocess().

        :param log_level: the logging level that caplog was set to.
            (Not consulted by the checks below.)
        :param spinner: the FakeSpinner object passed to call_subprocess()
            to be checked.
        :param result: the call_subprocess() return value to be checked.
        :param expected: a pair (expected_proc, expected_records), where
            1) `expected_proc` is the expected return value of
              call_subprocess() as a list of lines, or None if the return
              value is expected to be None;
            2) `expected_records` is the expected value of
              caplog.record_tuples.
        :param expected_spinner: a 2-tuple of the spinner's expected
            (spin_count, final_status).
        """
        expected_proc, expected_records = expected

        if expected_proc is None:
            assert result is None
        else:
            assert result.splitlines() == expected_proc

        # Confirm that stdout and stderr haven't been written to.
        captured = capfd.readouterr()
        assert (captured.out, captured.err) == ('', '')

        records = caplog.record_tuples
        if len(records) != len(expected_records):
            raise RuntimeError('{} != {}'.format(records, expected_records))

        for record, expected_record in zip(records, expected_records):
            # Check the logger_name and log level parts exactly.
            assert record[:2] == expected_record[:2]
            # For the message portion, check only a substring.  Also, we
            # can't use startswith() since the order of stdout and stderr
            # isn't guaranteed in cases where stderr is also present.
            # For example, we observed the stderr lines coming before stdout
            # in CI for PyPy 2.7 even though stdout happens first
            # chronologically.
            assert expected_record[2] in record[2]

        assert (spinner.spin_count, spinner.final_status) == expected_spinner

    def prepare_call(self, caplog, log_level, command=None):
        """
        Set caplog to `log_level` and build the arguments for a
        call_subprocess() call.

        :param command: the Python source to run via `sys.executable -c`.
            Defaults to a snippet that prints "Hello" and then "world".
        :return: a pair (args, spinner), where `args` is the command
            argument list and `spinner` is a fresh FakeSpinner.
        """
        if command is None:
            command = 'print("Hello"); print("world")'

        caplog.set_level(log_level)
        spinner = FakeSpinner()
        args = [sys.executable, '-c', command]

        return (args, spinner)

    def test_debug_logging(self, capfd, caplog):
        """
        Test DEBUG logging (and without passing show_stdout=True).
        """
        log_level = DEBUG
        args, spinner = self.prepare_call(caplog, log_level)
        result = call_subprocess(args, spinner=spinner)

        expected = (['Hello', 'world'], [
            ('pip.subprocessor', DEBUG, 'Running command '),
            ('pip.subprocessor', DEBUG, 'Hello'),
            ('pip.subprocessor', DEBUG, 'world'),
        ])
        # The spinner shouldn't spin in this case since the subprocess
        # output is already being logged to the console.
        self.check_result(
            capfd, caplog, log_level, spinner, result, expected,
            expected_spinner=(0, None),
        )

    def test_info_logging(self, capfd, caplog):
        """
        Test INFO logging (and without passing show_stdout=True).
        """
        log_level = INFO
        args, spinner = self.prepare_call(caplog, log_level)
        result = call_subprocess(args, spinner=spinner)

        expected = (['Hello', 'world'], [])
        # The spinner should spin twice in this case since the subprocess
        # output isn't being written to the console.
        self.check_result(
            capfd, caplog, log_level, spinner, result, expected,
            expected_spinner=(2, 'done'),
        )

    def test_info_logging__subprocess_error(self, capfd, caplog):
        """
        Test INFO logging of a subprocess with an error (and without passing
        show_stdout=True).
        """
        log_level = INFO
        command = 'print("Hello"); print("world"); exit("fail")'
        args, spinner = self.prepare_call(caplog, log_level, command=command)

        with pytest.raises(InstallationError) as exc:
            call_subprocess(args, spinner=spinner)
        result = None
        exc_message = str(exc.value)
        assert exc_message.startswith(
            'Command errored out with exit status 1: '
        )
        assert exc_message.endswith('Check the logs for full command output.')

        expected = (None, [
            ('pip.subprocessor', ERROR, 'Complete output (3 lines):\n'),
        ])
        # The spinner should spin three times in this case since the
        # subprocess output isn't being written to the console.
        self.check_result(
            capfd, caplog, log_level, spinner, result, expected,
            expected_spinner=(3, 'error'),
        )

        # Do some further checking on the captured log records to confirm
        # that the subprocess output was logged.
        last_record = caplog.record_tuples[-1]
        last_message = last_record[2]
        lines = last_message.splitlines()

        # We have to sort before comparing the lines because we can't
        # guarantee the order in which stdout and stderr will appear.
        # For example, we observed the stderr lines coming before stdout
        # in CI for PyPy 2.7 even though stdout happens first chronologically.
        actual = sorted(lines)
        # Test the "command" line separately because we can't test an
        # exact match.  The "cwd" line has more leading spaces, so it sorts
        # first and the "command" line lands at index 1.
        command_line = actual.pop(1)
        assert actual == [
            '     cwd: None',
            '----------------------------------------',
            'Command errored out with exit status 1:',
            'Complete output (3 lines):',
            'Hello',
            'fail',
            'world',
        ], 'lines: {}'.format(actual)  # Show the full output on failure.

        assert command_line.startswith(' command: ')
        assert command_line.endswith('print("world"); exit("fail")\'')

    def test_info_logging_with_show_stdout_true(self, capfd, caplog):
        """
        Test INFO logging with show_stdout=True.
        """
        log_level = INFO
        args, spinner = self.prepare_call(caplog, log_level)
        result = call_subprocess(args, spinner=spinner, show_stdout=True)

        expected = (['Hello', 'world'], [
            ('pip.subprocessor', INFO, 'Running command '),
            ('pip.subprocessor', INFO, 'Hello'),
            ('pip.subprocessor', INFO, 'world'),
        ])
        # The spinner shouldn't spin in this case since the subprocess
        # output is already being written to the console.
        self.check_result(
            capfd, caplog, log_level, spinner, result, expected,
            expected_spinner=(0, None),
        )

    @pytest.mark.parametrize((
        'exit_status', 'show_stdout', 'extra_ok_returncodes', 'log_level',
        'expected'),
        [
            # The spinner should show here because show_stdout=False means
            # the subprocess should get logged at DEBUG level, but the passed
            # log level is only INFO.
            (0, False, None, INFO, (None, 'done', 2)),
            # Test some cases where the spinner should not be shown.
            (0, False, None, DEBUG, (None, None, 0)),
            # Test show_stdout=True.
            (0, True, None, DEBUG, (None, None, 0)),
            (0, True, None, INFO, (None, None, 0)),
            # The spinner should show here because show_stdout=True means
            # the subprocess should get logged at INFO level, but the passed
            # log level is only WARNING.
            (0, True, None, WARNING, (None, 'done', 2)),
            # Test a non-zero exit status.
            (3, False, None, INFO, (InstallationError, 'error', 2)),
            # Test a non-zero exit status also in extra_ok_returncodes.
            (3, False, (3, ), INFO, (None, 'done', 2)),
    ])
    def test_spinner_finish(
        self, exit_status, show_stdout, extra_ok_returncodes, log_level,
        caplog, expected,
    ):
        """
        Test that the spinner finishes correctly.

        `expected` is a 3-tuple of (exception type or None, the spinner's
        final status, the spinner's spin count).
        """
        (
            expected_exc_type,
            expected_final_status,
            expected_spin_count,
        ) = expected

        command = (
            'print("Hello"); print("world"); exit({})'.format(exit_status)
        )
        args, spinner = self.prepare_call(caplog, log_level, command=command)
        # Record the exception type (rather than using pytest.raises) so
        # the same code path serves both the raising and non-raising cases.
        try:
            call_subprocess(
                args,
                show_stdout=show_stdout,
                extra_ok_returncodes=extra_ok_returncodes,
                spinner=spinner,
            )
        except Exception as exc:
            exc_type = type(exc)
        else:
            exc_type = None

        assert exc_type == expected_exc_type
        assert spinner.final_status == expected_final_status
        assert spinner.spin_count == expected_spin_count

    def test_closes_stdin(self):
        """
        Test that a subprocess trying to read stdin fails instead of
        blocking.

        NOTE(review): presumably call_subprocess() closes the child's stdin
        so that input() fails and the command exits non-zero -- confirm
        against the implementation.
        """
        with pytest.raises(InstallationError):
            call_subprocess(
                [sys.executable, '-c', 'input()'],
                show_stdout=True,
            )