1
1
Fork 0
mirror of https://github.com/pypa/pip synced 2023-12-13 21:30:23 +01:00

Merge pull request #7104 from pradyunsg/refactor/subprocess-utility

Add pip._internal.utils.subprocess
This commit is contained in:
Pradyun Gedam 2019-09-30 15:37:33 +05:30 committed by GitHub
commit eeba3cbe4e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 674 additions and 644 deletions

View file

@ -16,7 +16,7 @@ from sysconfig import get_paths
from pip._vendor.pkg_resources import Requirement, VersionConflict, WorkingSet
from pip import __file__ as pip_location
from pip._internal.utils.misc import call_subprocess
from pip._internal.utils.subprocess import call_subprocess
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.ui import open_spinner

View file

@ -4,8 +4,9 @@
import logging
import os
from pip._internal.utils.misc import call_subprocess, ensure_dir
from pip._internal.utils.misc import ensure_dir
from pip._internal.utils.setuptools_build import make_setuptools_shim_args
from pip._internal.utils.subprocess import call_subprocess
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:

View file

@ -38,7 +38,6 @@ from pip._internal.utils.misc import (
_make_build_dir,
ask_path_exists,
backup_dir,
call_subprocess,
display_path,
dist_in_site_packages,
dist_in_usersite,
@ -50,6 +49,7 @@ from pip._internal.utils.misc import (
)
from pip._internal.utils.packaging import get_metadata
from pip._internal.utils.setuptools_build import make_setuptools_shim_args
from pip._internal.utils.subprocess import call_subprocess
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.ui import open_spinner

View file

@ -15,7 +15,8 @@ from pip._vendor.six import PY2
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX
from pip._internal.utils.misc import ensure_dir, subprocess_logger
from pip._internal.utils.misc import ensure_dir
from pip._internal.utils.subprocess import subprocess_logger
try:
import threading

View file

@ -13,7 +13,6 @@ import os
import posixpath
import shutil
import stat
import subprocess
import sys
from collections import deque
@ -22,12 +21,12 @@ from pip._vendor import pkg_resources
# why we ignore the type on this import.
from pip._vendor.retrying import retry # type: ignore
from pip._vendor.six import PY2, text_type
from pip._vendor.six.moves import input, shlex_quote
from pip._vendor.six.moves import input
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._vendor.six.moves.urllib.parse import unquote as urllib_unquote
from pip import __version__
from pip._internal.exceptions import CommandError, InstallationError
from pip._internal.exceptions import CommandError
from pip._internal.locations import (
get_major_minor_version,
site_packages,
@ -35,7 +34,6 @@ from pip._internal.locations import (
)
from pip._internal.utils.compat import (
WINDOWS,
console_to_str,
expanduser,
stdlib_pkgs,
str_to_display,
@ -54,14 +52,12 @@ else:
if MYPY_CHECK_RUNNING:
from typing import (
Any, AnyStr, Container, Iterable, List, Mapping, Optional, Text,
Any, AnyStr, Container, Iterable, List, Optional, Text,
Tuple, Union, cast,
)
from pip._vendor.pkg_resources import Distribution
from pip._internal.utils.ui import SpinnerInterface
VersionInfo = Tuple[int, int, int]
CommandArgs = List[Union[str, 'HiddenText']]
else:
# typing's cast() is needed at runtime, but we don't want to import typing.
# Thus, we use a dummy no-op version, which we tell mypy to ignore.
@ -74,15 +70,11 @@ __all__ = ['rmtree', 'display_path', 'backup_dir',
'format_size', 'is_installable_dir',
'normalize_path',
'renames', 'get_prog',
'call_subprocess',
'captured_stdout', 'ensure_dir',
'get_installed_version', 'remove_auth_from_url']
logger = logging.getLogger(__name__)
subprocess_logger = logging.getLogger('pip.subprocessor')
LOG_DIVIDER = '----------------------------------------'
def get_pip_version():
@ -533,228 +525,6 @@ def dist_location(dist):
return normalize_path(dist.location)
def make_command(*args):
# type: (Union[str, HiddenText, CommandArgs]) -> CommandArgs
"""
Create a CommandArgs object.
"""
command_args = [] # type: CommandArgs
for arg in args:
# Check for list instead of CommandArgs since CommandArgs is
# only known during type-checking.
if isinstance(arg, list):
command_args.extend(arg)
else:
# Otherwise, arg is str or HiddenText.
command_args.append(arg)
return command_args
def format_command_args(args):
# type: (Union[List[str], CommandArgs]) -> str
"""
Format command arguments for display.
"""
# For HiddenText arguments, display the redacted form by calling str().
# Also, we don't apply str() to arguments that aren't HiddenText since
# this can trigger a UnicodeDecodeError in Python 2 if the argument
# has type unicode and includes a non-ascii character. (The type
# checker doesn't ensure the annotations are correct in all cases.)
return ' '.join(
shlex_quote(str(arg)) if isinstance(arg, HiddenText)
else shlex_quote(arg) for arg in args
)
def reveal_command_args(args):
# type: (Union[List[str], CommandArgs]) -> List[str]
"""
Return the arguments in their raw, unredacted form.
"""
return [
arg.secret if isinstance(arg, HiddenText) else arg for arg in args
]
def make_subprocess_output_error(
cmd_args, # type: Union[List[str], CommandArgs]
cwd, # type: Optional[str]
lines, # type: List[Text]
exit_status, # type: int
):
# type: (...) -> Text
"""
Create and return the error message to use to log a subprocess error
with command output.
:param lines: A list of lines, each ending with a newline.
"""
command = format_command_args(cmd_args)
# Convert `command` and `cwd` to text (unicode in Python 2) so we can use
# them as arguments in the unicode format string below. This avoids
# "UnicodeDecodeError: 'ascii' codec can't decode byte ..." in Python 2
# if either contains a non-ascii character.
command_display = str_to_display(command, desc='command bytes')
cwd_display = path_to_display(cwd)
# We know the joined output value ends in a newline.
output = ''.join(lines)
msg = (
# Use a unicode string to avoid "UnicodeEncodeError: 'ascii'
# codec can't encode character ..." in Python 2 when a format
# argument (e.g. `output`) has a non-ascii character.
u'Command errored out with exit status {exit_status}:\n'
' command: {command_display}\n'
' cwd: {cwd_display}\n'
'Complete output ({line_count} lines):\n{output}{divider}'
).format(
exit_status=exit_status,
command_display=command_display,
cwd_display=cwd_display,
line_count=len(lines),
output=output,
divider=LOG_DIVIDER,
)
return msg
def call_subprocess(
cmd, # type: Union[List[str], CommandArgs]
show_stdout=False, # type: bool
cwd=None, # type: Optional[str]
on_returncode='raise', # type: str
extra_ok_returncodes=None, # type: Optional[Iterable[int]]
command_desc=None, # type: Optional[str]
extra_environ=None, # type: Optional[Mapping[str, Any]]
unset_environ=None, # type: Optional[Iterable[str]]
spinner=None # type: Optional[SpinnerInterface]
):
# type: (...) -> Text
"""
Args:
show_stdout: if true, use INFO to log the subprocess's stderr and
stdout streams. Otherwise, use DEBUG. Defaults to False.
extra_ok_returncodes: an iterable of integer return codes that are
acceptable, in addition to 0. Defaults to None, which means [].
unset_environ: an iterable of environment variable names to unset
prior to calling subprocess.Popen().
"""
if extra_ok_returncodes is None:
extra_ok_returncodes = []
if unset_environ is None:
unset_environ = []
# Most places in pip use show_stdout=False. What this means is--
#
# - We connect the child's output (combined stderr and stdout) to a
# single pipe, which we read.
# - We log this output to stderr at DEBUG level as it is received.
# - If DEBUG logging isn't enabled (e.g. if --verbose logging wasn't
# requested), then we show a spinner so the user can still see the
# subprocess is in progress.
# - If the subprocess exits with an error, we log the output to stderr
# at ERROR level if it hasn't already been displayed to the console
# (e.g. if --verbose logging wasn't enabled). This way we don't log
# the output to the console twice.
#
# If show_stdout=True, then the above is still done, but with DEBUG
# replaced by INFO.
if show_stdout:
# Then log the subprocess output at INFO level.
log_subprocess = subprocess_logger.info
used_level = logging.INFO
else:
# Then log the subprocess output using DEBUG. This also ensures
# it will be logged to the log file (aka user_log), if enabled.
log_subprocess = subprocess_logger.debug
used_level = logging.DEBUG
# Whether the subprocess will be visible in the console.
showing_subprocess = subprocess_logger.getEffectiveLevel() <= used_level
# Only use the spinner if we're not showing the subprocess output
# and we have a spinner.
use_spinner = not showing_subprocess and spinner is not None
if command_desc is None:
command_desc = format_command_args(cmd)
log_subprocess("Running command %s", command_desc)
env = os.environ.copy()
if extra_environ:
env.update(extra_environ)
for name in unset_environ:
env.pop(name, None)
try:
proc = subprocess.Popen(
# Convert HiddenText objects to the underlying str.
reveal_command_args(cmd),
stderr=subprocess.STDOUT, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, cwd=cwd, env=env,
)
proc.stdin.close()
except Exception as exc:
subprocess_logger.critical(
"Error %s while executing command %s", exc, command_desc,
)
raise
all_output = []
while True:
# The "line" value is a unicode string in Python 2.
line = console_to_str(proc.stdout.readline())
if not line:
break
line = line.rstrip()
all_output.append(line + '\n')
# Show the line immediately.
log_subprocess(line)
# Update the spinner.
if use_spinner:
spinner.spin()
try:
proc.wait()
finally:
if proc.stdout:
proc.stdout.close()
proc_had_error = (
proc.returncode and proc.returncode not in extra_ok_returncodes
)
if use_spinner:
if proc_had_error:
spinner.finish("error")
else:
spinner.finish("done")
if proc_had_error:
if on_returncode == 'raise':
if not showing_subprocess:
# Then the subprocess streams haven't been logged to the
# console yet.
msg = make_subprocess_output_error(
cmd_args=cmd,
cwd=cwd,
lines=all_output,
exit_status=proc.returncode,
)
subprocess_logger.error(msg)
exc_msg = (
'Command errored out with exit status {}: {} '
'Check the logs for full command output.'
).format(proc.returncode, command_desc)
raise InstallationError(exc_msg)
elif on_returncode == 'warn':
subprocess_logger.warning(
'Command "%s" had error code %s in %s',
command_desc, proc.returncode, cwd,
)
elif on_returncode == 'ignore':
pass
else:
raise ValueError('Invalid value: on_returncode=%s' %
repr(on_returncode))
return ''.join(all_output)
def write_output(msg, *args):
# type: (str, str) -> None
logger.info(msg, *args)

View file

@ -0,0 +1,247 @@
# The following comment should be removed at some point in the future.
# mypy: strict-optional=False
from __future__ import absolute_import
import logging
import os
import subprocess
from pip._vendor.six.moves import shlex_quote
from pip._internal.exceptions import InstallationError
from pip._internal.utils.compat import console_to_str, str_to_display
from pip._internal.utils.misc import HiddenText, path_to_display
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Any, Iterable, List, Mapping, Optional, Text, Union
from pip._internal.utils.ui import SpinnerInterface
CommandArgs = List[Union[str, HiddenText]]
subprocess_logger = logging.getLogger('pip.subprocessor')
LOG_DIVIDER = '----------------------------------------'
def make_command(*args):
# type: (Union[str, HiddenText, CommandArgs]) -> CommandArgs
"""
Create a CommandArgs object.
"""
command_args = [] # type: CommandArgs
for arg in args:
# Check for list instead of CommandArgs since CommandArgs is
# only known during type-checking.
if isinstance(arg, list):
command_args.extend(arg)
else:
# Otherwise, arg is str or HiddenText.
command_args.append(arg)
return command_args
def format_command_args(args):
# type: (Union[List[str], CommandArgs]) -> str
"""
Format command arguments for display.
"""
# For HiddenText arguments, display the redacted form by calling str().
# Also, we don't apply str() to arguments that aren't HiddenText since
# this can trigger a UnicodeDecodeError in Python 2 if the argument
# has type unicode and includes a non-ascii character. (The type
# checker doesn't ensure the annotations are correct in all cases.)
return ' '.join(
shlex_quote(str(arg)) if isinstance(arg, HiddenText)
else shlex_quote(arg) for arg in args
)
def reveal_command_args(args):
# type: (Union[List[str], CommandArgs]) -> List[str]
"""
Return the arguments in their raw, unredacted form.
"""
return [
arg.secret if isinstance(arg, HiddenText) else arg for arg in args
]
def make_subprocess_output_error(
cmd_args, # type: Union[List[str], CommandArgs]
cwd, # type: Optional[str]
lines, # type: List[Text]
exit_status, # type: int
):
# type: (...) -> Text
"""
Create and return the error message to use to log a subprocess error
with command output.
:param lines: A list of lines, each ending with a newline.
"""
command = format_command_args(cmd_args)
# Convert `command` and `cwd` to text (unicode in Python 2) so we can use
# them as arguments in the unicode format string below. This avoids
# "UnicodeDecodeError: 'ascii' codec can't decode byte ..." in Python 2
# if either contains a non-ascii character.
command_display = str_to_display(command, desc='command bytes')
cwd_display = path_to_display(cwd)
# We know the joined output value ends in a newline.
output = ''.join(lines)
msg = (
# Use a unicode string to avoid "UnicodeEncodeError: 'ascii'
# codec can't encode character ..." in Python 2 when a format
# argument (e.g. `output`) has a non-ascii character.
u'Command errored out with exit status {exit_status}:\n'
' command: {command_display}\n'
' cwd: {cwd_display}\n'
'Complete output ({line_count} lines):\n{output}{divider}'
).format(
exit_status=exit_status,
command_display=command_display,
cwd_display=cwd_display,
line_count=len(lines),
output=output,
divider=LOG_DIVIDER,
)
return msg
def call_subprocess(
cmd, # type: Union[List[str], CommandArgs]
show_stdout=False, # type: bool
cwd=None, # type: Optional[str]
on_returncode='raise', # type: str
extra_ok_returncodes=None, # type: Optional[Iterable[int]]
command_desc=None, # type: Optional[str]
extra_environ=None, # type: Optional[Mapping[str, Any]]
unset_environ=None, # type: Optional[Iterable[str]]
spinner=None # type: Optional[SpinnerInterface]
):
# type: (...) -> Text
"""
Args:
show_stdout: if true, use INFO to log the subprocess's stderr and
stdout streams. Otherwise, use DEBUG. Defaults to False.
extra_ok_returncodes: an iterable of integer return codes that are
acceptable, in addition to 0. Defaults to None, which means [].
unset_environ: an iterable of environment variable names to unset
prior to calling subprocess.Popen().
"""
if extra_ok_returncodes is None:
extra_ok_returncodes = []
if unset_environ is None:
unset_environ = []
# Most places in pip use show_stdout=False. What this means is--
#
# - We connect the child's output (combined stderr and stdout) to a
# single pipe, which we read.
# - We log this output to stderr at DEBUG level as it is received.
# - If DEBUG logging isn't enabled (e.g. if --verbose logging wasn't
# requested), then we show a spinner so the user can still see the
# subprocess is in progress.
# - If the subprocess exits with an error, we log the output to stderr
# at ERROR level if it hasn't already been displayed to the console
# (e.g. if --verbose logging wasn't enabled). This way we don't log
# the output to the console twice.
#
# If show_stdout=True, then the above is still done, but with DEBUG
# replaced by INFO.
if show_stdout:
# Then log the subprocess output at INFO level.
log_subprocess = subprocess_logger.info
used_level = logging.INFO
else:
# Then log the subprocess output using DEBUG. This also ensures
# it will be logged to the log file (aka user_log), if enabled.
log_subprocess = subprocess_logger.debug
used_level = logging.DEBUG
# Whether the subprocess will be visible in the console.
showing_subprocess = subprocess_logger.getEffectiveLevel() <= used_level
# Only use the spinner if we're not showing the subprocess output
# and we have a spinner.
use_spinner = not showing_subprocess and spinner is not None
if command_desc is None:
command_desc = format_command_args(cmd)
log_subprocess("Running command %s", command_desc)
env = os.environ.copy()
if extra_environ:
env.update(extra_environ)
for name in unset_environ:
env.pop(name, None)
try:
proc = subprocess.Popen(
# Convert HiddenText objects to the underlying str.
reveal_command_args(cmd),
stderr=subprocess.STDOUT, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, cwd=cwd, env=env,
)
proc.stdin.close()
except Exception as exc:
subprocess_logger.critical(
"Error %s while executing command %s", exc, command_desc,
)
raise
all_output = []
while True:
# The "line" value is a unicode string in Python 2.
line = console_to_str(proc.stdout.readline())
if not line:
break
line = line.rstrip()
all_output.append(line + '\n')
# Show the line immediately.
log_subprocess(line)
# Update the spinner.
if use_spinner:
spinner.spin()
try:
proc.wait()
finally:
if proc.stdout:
proc.stdout.close()
proc_had_error = (
proc.returncode and proc.returncode not in extra_ok_returncodes
)
if use_spinner:
if proc_had_error:
spinner.finish("error")
else:
spinner.finish("done")
if proc_had_error:
if on_returncode == 'raise':
if not showing_subprocess:
# Then the subprocess streams haven't been logged to the
# console yet.
msg = make_subprocess_output_error(
cmd_args=cmd,
cwd=cwd,
lines=all_output,
exit_status=proc.returncode,
)
subprocess_logger.error(msg)
exc_msg = (
'Command errored out with exit status {}: {} '
'Check the logs for full command output.'
).format(proc.returncode, command_desc)
raise InstallationError(exc_msg)
elif on_returncode == 'warn':
subprocess_logger.warning(
'Command "%s" had error code %s in %s',
command_desc, proc.returncode, cwd,
)
elif on_returncode == 'ignore':
pass
else:
raise ValueError('Invalid value: on_returncode=%s' %
repr(on_returncode))
return ''.join(all_output)

View file

@ -8,7 +8,8 @@ import os
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._internal.utils.misc import display_path, make_command, rmtree
from pip._internal.utils.misc import display_path, rmtree
from pip._internal.utils.subprocess import make_command
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.urls import path_to_url
from pip._internal.vcs.versioncontrol import VersionControl, vcs

View file

@ -13,7 +13,8 @@ from pip._vendor.six.moves.urllib import request as urllib_request
from pip._internal.exceptions import BadCommand
from pip._internal.utils.compat import samefile
from pip._internal.utils.misc import display_path, make_command
from pip._internal.utils.misc import display_path
from pip._internal.utils.subprocess import make_command
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.vcs.versioncontrol import (

View file

@ -8,7 +8,8 @@ import os
from pip._vendor.six.moves import configparser
from pip._internal.utils.misc import display_path, make_command
from pip._internal.utils.misc import display_path
from pip._internal.utils.subprocess import make_command
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.urls import path_to_url

View file

@ -11,10 +11,10 @@ import sys
from pip._internal.utils.logging import indent_log
from pip._internal.utils.misc import (
display_path,
make_command,
rmtree,
split_auth_from_netloc,
)
from pip._internal.utils.subprocess import make_command
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.vcs.versioncontrol import VersionControl, vcs
@ -26,7 +26,8 @@ _svn_info_xml_url_re = re.compile(r'<url>(.*)</url>')
if MYPY_CHECK_RUNNING:
from typing import Optional, Tuple
from pip._internal.utils.misc import CommandArgs, HiddenText
from pip._internal.utils.subprocess import CommandArgs
from pip._internal.utils.misc import HiddenText
from pip._internal.vcs.versioncontrol import AuthInfo, RevOptions

View file

@ -18,13 +18,12 @@ from pip._internal.exceptions import BadCommand
from pip._internal.utils.misc import (
ask_path_exists,
backup_dir,
call_subprocess,
display_path,
hide_url,
hide_value,
make_command,
rmtree,
)
from pip._internal.utils.subprocess import call_subprocess, make_command
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.urls import get_url_scheme
@ -33,7 +32,8 @@ if MYPY_CHECK_RUNNING:
Any, Dict, Iterable, List, Mapping, Optional, Text, Tuple, Type, Union
)
from pip._internal.utils.ui import SpinnerInterface
from pip._internal.utils.misc import CommandArgs, HiddenText
from pip._internal.utils.misc import HiddenText
from pip._internal.utils.subprocess import CommandArgs
AuthInfo = Tuple[Optional[str], Optional[str]]

View file

@ -38,15 +38,13 @@ from pip._internal.locations import distutils_scheme, get_major_minor_version
from pip._internal.models.link import Link
from pip._internal.utils.logging import indent_log
from pip._internal.utils.marker_files import has_delete_marker_file
from pip._internal.utils.misc import (
from pip._internal.utils.misc import captured_stdout, ensure_dir, read_chunks
from pip._internal.utils.setuptools_build import make_setuptools_shim_args
from pip._internal.utils.subprocess import (
LOG_DIVIDER,
call_subprocess,
captured_stdout,
ensure_dir,
format_command_args,
read_chunks,
)
from pip._internal.utils.setuptools_build import make_setuptools_shim_args
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.ui import open_spinner

View file

@ -5,7 +5,6 @@ util tests
"""
import codecs
import locale
import os
import shutil
import stat
@ -13,8 +12,6 @@ import sys
import time
import warnings
from io import BytesIO
from logging import DEBUG, ERROR, INFO, WARNING
from textwrap import dedent
import pytest
from mock import Mock, patch
@ -37,15 +34,11 @@ from pip._internal.utils.misc import (
HiddenText,
build_netloc,
build_url_from_netloc,
call_subprocess,
egg_link_path,
format_command_args,
get_installed_distributions,
get_prog,
hide_url,
hide_value,
make_command,
make_subprocess_output_error,
normalize_path,
normalize_version_info,
parse_netloc,
@ -59,7 +52,6 @@ from pip._internal.utils.misc import (
split_auth_netloc_from_url,
)
from pip._internal.utils.setuptools_build import make_setuptools_shim_args
from pip._internal.utils.ui import SpinnerInterface
class Tests_EgglinkPath:
@ -634,392 +626,6 @@ class TestGetProg(object):
assert get_prog() == expected
@pytest.mark.parametrize('args, expected', [
(['pip', 'list'], 'pip list'),
(['foo', 'space space', 'new\nline', 'double"quote', "single'quote"],
"""foo 'space space' 'new\nline' 'double"quote' 'single'"'"'quote'"""),
# Test HiddenText arguments.
(make_command(hide_value('secret1'), 'foo', hide_value('secret2')),
"'****' foo '****'"),
])
def test_format_command_args(args, expected):
actual = format_command_args(args)
assert actual == expected
def test_make_subprocess_output_error():
cmd_args = ['test', 'has space']
cwd = '/path/to/cwd'
lines = ['line1\n', 'line2\n', 'line3\n']
actual = make_subprocess_output_error(
cmd_args=cmd_args,
cwd=cwd,
lines=lines,
exit_status=3,
)
expected = dedent("""\
Command errored out with exit status 3:
command: test 'has space'
cwd: /path/to/cwd
Complete output (3 lines):
line1
line2
line3
----------------------------------------""")
assert actual == expected, 'actual: {}'.format(actual)
def test_make_subprocess_output_error__non_ascii_command_arg(monkeypatch):
"""
Test a command argument with a non-ascii character.
"""
cmd_args = ['foo', 'déf']
if sys.version_info[0] == 2:
# Check in Python 2 that the str (bytes object) with the non-ascii
# character has the encoding we expect. (This comes from the source
# code encoding at the top of the file.)
assert cmd_args[1].decode('utf-8') == u'déf'
# We need to monkeypatch so the encoding will be correct on Windows.
monkeypatch.setattr(locale, 'getpreferredencoding', lambda: 'utf-8')
actual = make_subprocess_output_error(
cmd_args=cmd_args,
cwd='/path/to/cwd',
lines=[],
exit_status=1,
)
expected = dedent(u"""\
Command errored out with exit status 1:
command: foo 'déf'
cwd: /path/to/cwd
Complete output (0 lines):
----------------------------------------""")
assert actual == expected, u'actual: {}'.format(actual)
@pytest.mark.skipif("sys.version_info < (3,)")
def test_make_subprocess_output_error__non_ascii_cwd_python_3(monkeypatch):
"""
Test a str (text) cwd with a non-ascii character in Python 3.
"""
cmd_args = ['test']
cwd = '/path/to/cwd/déf'
actual = make_subprocess_output_error(
cmd_args=cmd_args,
cwd=cwd,
lines=[],
exit_status=1,
)
expected = dedent("""\
Command errored out with exit status 1:
command: test
cwd: /path/to/cwd/déf
Complete output (0 lines):
----------------------------------------""")
assert actual == expected, 'actual: {}'.format(actual)
@pytest.mark.parametrize('encoding', [
'utf-8',
# Test a Windows encoding.
'cp1252',
])
@pytest.mark.skipif("sys.version_info >= (3,)")
def test_make_subprocess_output_error__non_ascii_cwd_python_2(
monkeypatch, encoding,
):
"""
Test a str (bytes object) cwd with a non-ascii character in Python 2.
"""
cmd_args = ['test']
cwd = u'/path/to/cwd/déf'.encode(encoding)
monkeypatch.setattr(sys, 'getfilesystemencoding', lambda: encoding)
actual = make_subprocess_output_error(
cmd_args=cmd_args,
cwd=cwd,
lines=[],
exit_status=1,
)
expected = dedent(u"""\
Command errored out with exit status 1:
command: test
cwd: /path/to/cwd/déf
Complete output (0 lines):
----------------------------------------""")
assert actual == expected, u'actual: {}'.format(actual)
# This test is mainly important for checking unicode in Python 2.
def test_make_subprocess_output_error__non_ascii_line():
"""
Test a line with a non-ascii character.
"""
lines = [u'curly-quote: \u2018\n']
actual = make_subprocess_output_error(
cmd_args=['test'],
cwd='/path/to/cwd',
lines=lines,
exit_status=1,
)
expected = dedent(u"""\
Command errored out with exit status 1:
command: test
cwd: /path/to/cwd
Complete output (1 lines):
curly-quote: \u2018
----------------------------------------""")
assert actual == expected, u'actual: {}'.format(actual)
class FakeSpinner(SpinnerInterface):
def __init__(self):
self.spin_count = 0
self.final_status = None
def spin(self):
self.spin_count += 1
def finish(self, final_status):
self.final_status = final_status
class TestCallSubprocess(object):
"""
Test call_subprocess().
"""
def check_result(
self, capfd, caplog, log_level, spinner, result, expected,
expected_spinner,
):
"""
Check the result of calling call_subprocess().
:param log_level: the logging level that caplog was set to.
:param spinner: the FakeSpinner object passed to call_subprocess()
to be checked.
:param result: the call_subprocess() return value to be checked.
:param expected: a pair (expected_proc, expected_records), where
1) `expected_proc` is the expected return value of
call_subprocess() as a list of lines, or None if the return
value is expected to be None;
2) `expected_records` is the expected value of
caplog.record_tuples.
:param expected_spinner: a 2-tuple of the spinner's expected
(spin_count, final_status).
"""
expected_proc, expected_records = expected
if expected_proc is None:
assert result is None
else:
assert result.splitlines() == expected_proc
# Confirm that stdout and stderr haven't been written to.
captured = capfd.readouterr()
assert (captured.out, captured.err) == ('', '')
records = caplog.record_tuples
if len(records) != len(expected_records):
raise RuntimeError('{} != {}'.format(records, expected_records))
for record, expected_record in zip(records, expected_records):
# Check the logger_name and log level parts exactly.
assert record[:2] == expected_record[:2]
# For the message portion, check only a substring. Also, we
# can't use startswith() since the order of stdout and stderr
# isn't guaranteed in cases where stderr is also present.
# For example, we observed the stderr lines coming before stdout
# in CI for PyPy 2.7 even though stdout happens first
# chronologically.
assert expected_record[2] in record[2]
assert (spinner.spin_count, spinner.final_status) == expected_spinner
def prepare_call(self, caplog, log_level, command=None):
if command is None:
command = 'print("Hello"); print("world")'
caplog.set_level(log_level)
spinner = FakeSpinner()
args = [sys.executable, '-c', command]
return (args, spinner)
def test_debug_logging(self, capfd, caplog):
"""
Test DEBUG logging (and without passing show_stdout=True).
"""
log_level = DEBUG
args, spinner = self.prepare_call(caplog, log_level)
result = call_subprocess(args, spinner=spinner)
expected = (['Hello', 'world'], [
('pip.subprocessor', DEBUG, 'Running command '),
('pip.subprocessor', DEBUG, 'Hello'),
('pip.subprocessor', DEBUG, 'world'),
])
# The spinner shouldn't spin in this case since the subprocess
# output is already being logged to the console.
self.check_result(
capfd, caplog, log_level, spinner, result, expected,
expected_spinner=(0, None),
)
def test_info_logging(self, capfd, caplog):
"""
Test INFO logging (and without passing show_stdout=True).
"""
log_level = INFO
args, spinner = self.prepare_call(caplog, log_level)
result = call_subprocess(args, spinner=spinner)
expected = (['Hello', 'world'], [])
# The spinner should spin twice in this case since the subprocess
# output isn't being written to the console.
self.check_result(
capfd, caplog, log_level, spinner, result, expected,
expected_spinner=(2, 'done'),
)
def test_info_logging__subprocess_error(self, capfd, caplog):
"""
Test INFO logging of a subprocess with an error (and without passing
show_stdout=True).
"""
log_level = INFO
command = 'print("Hello"); print("world"); exit("fail")'
args, spinner = self.prepare_call(caplog, log_level, command=command)
with pytest.raises(InstallationError) as exc:
call_subprocess(args, spinner=spinner)
result = None
exc_message = str(exc.value)
assert exc_message.startswith(
'Command errored out with exit status 1: '
)
assert exc_message.endswith('Check the logs for full command output.')
expected = (None, [
('pip.subprocessor', ERROR, 'Complete output (3 lines):\n'),
])
# The spinner should spin three times in this case since the
# subprocess output isn't being written to the console.
self.check_result(
capfd, caplog, log_level, spinner, result, expected,
expected_spinner=(3, 'error'),
)
# Do some further checking on the captured log records to confirm
# that the subprocess output was logged.
last_record = caplog.record_tuples[-1]
last_message = last_record[2]
lines = last_message.splitlines()
# We have to sort before comparing the lines because we can't
# guarantee the order in which stdout and stderr will appear.
# For example, we observed the stderr lines coming before stdout
# in CI for PyPy 2.7 even though stdout happens first chronologically.
actual = sorted(lines)
# Test the "command" line separately because we can't test an
# exact match.
command_line = actual.pop(1)
assert actual == [
' cwd: None',
'----------------------------------------',
'Command errored out with exit status 1:',
'Complete output (3 lines):',
'Hello',
'fail',
'world',
], 'lines: {}'.format(actual) # Show the full output on failure.
assert command_line.startswith(' command: ')
assert command_line.endswith('print("world"); exit("fail")\'')
def test_info_logging_with_show_stdout_true(self, capfd, caplog):
"""
Test INFO logging with show_stdout=True.
"""
log_level = INFO
args, spinner = self.prepare_call(caplog, log_level)
result = call_subprocess(args, spinner=spinner, show_stdout=True)
expected = (['Hello', 'world'], [
('pip.subprocessor', INFO, 'Running command '),
('pip.subprocessor', INFO, 'Hello'),
('pip.subprocessor', INFO, 'world'),
])
# The spinner shouldn't spin in this case since the subprocess
# output is already being written to the console.
self.check_result(
capfd, caplog, log_level, spinner, result, expected,
expected_spinner=(0, None),
)
@pytest.mark.parametrize((
'exit_status', 'show_stdout', 'extra_ok_returncodes', 'log_level',
'expected'),
[
# The spinner should show here because show_stdout=False means
# the subprocess should get logged at DEBUG level, but the passed
# log level is only INFO.
(0, False, None, INFO, (None, 'done', 2)),
# Test some cases where the spinner should not be shown.
(0, False, None, DEBUG, (None, None, 0)),
# Test show_stdout=True.
(0, True, None, DEBUG, (None, None, 0)),
(0, True, None, INFO, (None, None, 0)),
# The spinner should show here because show_stdout=True means
# the subprocess should get logged at INFO level, but the passed
# log level is only WARNING.
(0, True, None, WARNING, (None, 'done', 2)),
# Test a non-zero exit status.
(3, False, None, INFO, (InstallationError, 'error', 2)),
# Test a non-zero exit status also in extra_ok_returncodes.
(3, False, (3, ), INFO, (None, 'done', 2)),
])
def test_spinner_finish(
self, exit_status, show_stdout, extra_ok_returncodes, log_level,
caplog, expected,
):
"""
Test that the spinner finishes correctly.
"""
expected_exc_type = expected[0]
expected_final_status = expected[1]
expected_spin_count = expected[2]
command = (
'print("Hello"); print("world"); exit({})'.format(exit_status)
)
args, spinner = self.prepare_call(caplog, log_level, command=command)
try:
call_subprocess(
args,
show_stdout=show_stdout,
extra_ok_returncodes=extra_ok_returncodes,
spinner=spinner,
)
except Exception as exc:
exc_type = type(exc)
else:
exc_type = None
assert exc_type == expected_exc_type
assert spinner.final_status == expected_final_status
assert spinner.spin_count == expected_spin_count
def test_closes_stdin(self):
with pytest.raises(InstallationError):
call_subprocess(
[sys.executable, '-c', 'input()'],
show_stdout=True,
)
@pytest.mark.parametrize('host_port, expected_netloc', [
# Test domain name.
(('example.com', None), 'example.com'),

View file

@ -0,0 +1,403 @@
# -*- coding: utf-8 -*-
import locale
import sys
from logging import DEBUG, ERROR, INFO, WARNING
from textwrap import dedent
import pytest
from pip._internal.exceptions import InstallationError
from pip._internal.utils.misc import hide_value
from pip._internal.utils.subprocess import (
call_subprocess,
format_command_args,
make_command,
make_subprocess_output_error,
)
from pip._internal.utils.ui import SpinnerInterface
@pytest.mark.parametrize('args, expected', [
(['pip', 'list'], 'pip list'),
(['foo', 'space space', 'new\nline', 'double"quote', "single'quote"],
"""foo 'space space' 'new\nline' 'double"quote' 'single'"'"'quote'"""),
# Test HiddenText arguments.
(make_command(hide_value('secret1'), 'foo', hide_value('secret2')),
"'****' foo '****'"),
])
def test_format_command_args(args, expected):
actual = format_command_args(args)
assert actual == expected
def test_make_subprocess_output_error():
cmd_args = ['test', 'has space']
cwd = '/path/to/cwd'
lines = ['line1\n', 'line2\n', 'line3\n']
actual = make_subprocess_output_error(
cmd_args=cmd_args,
cwd=cwd,
lines=lines,
exit_status=3,
)
expected = dedent("""\
Command errored out with exit status 3:
command: test 'has space'
cwd: /path/to/cwd
Complete output (3 lines):
line1
line2
line3
----------------------------------------""")
assert actual == expected, 'actual: {}'.format(actual)
def test_make_subprocess_output_error__non_ascii_command_arg(monkeypatch):
"""
Test a command argument with a non-ascii character.
"""
cmd_args = ['foo', 'déf']
if sys.version_info[0] == 2:
# Check in Python 2 that the str (bytes object) with the non-ascii
# character has the encoding we expect. (This comes from the source
# code encoding at the top of the file.)
assert cmd_args[1].decode('utf-8') == u'déf'
# We need to monkeypatch so the encoding will be correct on Windows.
monkeypatch.setattr(locale, 'getpreferredencoding', lambda: 'utf-8')
actual = make_subprocess_output_error(
cmd_args=cmd_args,
cwd='/path/to/cwd',
lines=[],
exit_status=1,
)
expected = dedent(u"""\
Command errored out with exit status 1:
command: foo 'déf'
cwd: /path/to/cwd
Complete output (0 lines):
----------------------------------------""")
assert actual == expected, u'actual: {}'.format(actual)
@pytest.mark.skipif("sys.version_info < (3,)")
def test_make_subprocess_output_error__non_ascii_cwd_python_3(monkeypatch):
"""
Test a str (text) cwd with a non-ascii character in Python 3.
"""
cmd_args = ['test']
cwd = '/path/to/cwd/déf'
actual = make_subprocess_output_error(
cmd_args=cmd_args,
cwd=cwd,
lines=[],
exit_status=1,
)
expected = dedent("""\
Command errored out with exit status 1:
command: test
cwd: /path/to/cwd/déf
Complete output (0 lines):
----------------------------------------""")
assert actual == expected, 'actual: {}'.format(actual)
@pytest.mark.parametrize('encoding', [
'utf-8',
# Test a Windows encoding.
'cp1252',
])
@pytest.mark.skipif("sys.version_info >= (3,)")
def test_make_subprocess_output_error__non_ascii_cwd_python_2(
monkeypatch, encoding,
):
"""
Test a str (bytes object) cwd with a non-ascii character in Python 2.
"""
cmd_args = ['test']
cwd = u'/path/to/cwd/déf'.encode(encoding)
monkeypatch.setattr(sys, 'getfilesystemencoding', lambda: encoding)
actual = make_subprocess_output_error(
cmd_args=cmd_args,
cwd=cwd,
lines=[],
exit_status=1,
)
expected = dedent(u"""\
Command errored out with exit status 1:
command: test
cwd: /path/to/cwd/déf
Complete output (0 lines):
----------------------------------------""")
assert actual == expected, u'actual: {}'.format(actual)
# This test is mainly important for checking unicode in Python 2.
def test_make_subprocess_output_error__non_ascii_line():
"""
Test a line with a non-ascii character.
"""
lines = [u'curly-quote: \u2018\n']
actual = make_subprocess_output_error(
cmd_args=['test'],
cwd='/path/to/cwd',
lines=lines,
exit_status=1,
)
expected = dedent(u"""\
Command errored out with exit status 1:
command: test
cwd: /path/to/cwd
Complete output (1 lines):
curly-quote: \u2018
----------------------------------------""")
assert actual == expected, u'actual: {}'.format(actual)
class FakeSpinner(SpinnerInterface):
def __init__(self):
self.spin_count = 0
self.final_status = None
def spin(self):
self.spin_count += 1
def finish(self, final_status):
self.final_status = final_status
class TestCallSubprocess(object):
"""
Test call_subprocess().
"""
def check_result(
self, capfd, caplog, log_level, spinner, result, expected,
expected_spinner,
):
"""
Check the result of calling call_subprocess().
:param log_level: the logging level that caplog was set to.
:param spinner: the FakeSpinner object passed to call_subprocess()
to be checked.
:param result: the call_subprocess() return value to be checked.
:param expected: a pair (expected_proc, expected_records), where
1) `expected_proc` is the expected return value of
call_subprocess() as a list of lines, or None if the return
value is expected to be None;
2) `expected_records` is the expected value of
caplog.record_tuples.
:param expected_spinner: a 2-tuple of the spinner's expected
(spin_count, final_status).
"""
expected_proc, expected_records = expected
if expected_proc is None:
assert result is None
else:
assert result.splitlines() == expected_proc
# Confirm that stdout and stderr haven't been written to.
captured = capfd.readouterr()
assert (captured.out, captured.err) == ('', '')
records = caplog.record_tuples
if len(records) != len(expected_records):
raise RuntimeError('{} != {}'.format(records, expected_records))
for record, expected_record in zip(records, expected_records):
# Check the logger_name and log level parts exactly.
assert record[:2] == expected_record[:2]
# For the message portion, check only a substring. Also, we
# can't use startswith() since the order of stdout and stderr
# isn't guaranteed in cases where stderr is also present.
# For example, we observed the stderr lines coming before stdout
# in CI for PyPy 2.7 even though stdout happens first
# chronologically.
assert expected_record[2] in record[2]
assert (spinner.spin_count, spinner.final_status) == expected_spinner
def prepare_call(self, caplog, log_level, command=None):
if command is None:
command = 'print("Hello"); print("world")'
caplog.set_level(log_level)
spinner = FakeSpinner()
args = [sys.executable, '-c', command]
return (args, spinner)
def test_debug_logging(self, capfd, caplog):
"""
Test DEBUG logging (and without passing show_stdout=True).
"""
log_level = DEBUG
args, spinner = self.prepare_call(caplog, log_level)
result = call_subprocess(args, spinner=spinner)
expected = (['Hello', 'world'], [
('pip.subprocessor', DEBUG, 'Running command '),
('pip.subprocessor', DEBUG, 'Hello'),
('pip.subprocessor', DEBUG, 'world'),
])
# The spinner shouldn't spin in this case since the subprocess
# output is already being logged to the console.
self.check_result(
capfd, caplog, log_level, spinner, result, expected,
expected_spinner=(0, None),
)
def test_info_logging(self, capfd, caplog):
"""
Test INFO logging (and without passing show_stdout=True).
"""
log_level = INFO
args, spinner = self.prepare_call(caplog, log_level)
result = call_subprocess(args, spinner=spinner)
expected = (['Hello', 'world'], [])
# The spinner should spin twice in this case since the subprocess
# output isn't being written to the console.
self.check_result(
capfd, caplog, log_level, spinner, result, expected,
expected_spinner=(2, 'done'),
)
def test_info_logging__subprocess_error(self, capfd, caplog):
"""
Test INFO logging of a subprocess with an error (and without passing
show_stdout=True).
"""
log_level = INFO
command = 'print("Hello"); print("world"); exit("fail")'
args, spinner = self.prepare_call(caplog, log_level, command=command)
with pytest.raises(InstallationError) as exc:
call_subprocess(args, spinner=spinner)
result = None
exc_message = str(exc.value)
assert exc_message.startswith(
'Command errored out with exit status 1: '
)
assert exc_message.endswith('Check the logs for full command output.')
expected = (None, [
('pip.subprocessor', ERROR, 'Complete output (3 lines):\n'),
])
# The spinner should spin three times in this case since the
# subprocess output isn't being written to the console.
self.check_result(
capfd, caplog, log_level, spinner, result, expected,
expected_spinner=(3, 'error'),
)
# Do some further checking on the captured log records to confirm
# that the subprocess output was logged.
last_record = caplog.record_tuples[-1]
last_message = last_record[2]
lines = last_message.splitlines()
# We have to sort before comparing the lines because we can't
# guarantee the order in which stdout and stderr will appear.
# For example, we observed the stderr lines coming before stdout
# in CI for PyPy 2.7 even though stdout happens first chronologically.
actual = sorted(lines)
# Test the "command" line separately because we can't test an
# exact match.
command_line = actual.pop(1)
assert actual == [
' cwd: None',
'----------------------------------------',
'Command errored out with exit status 1:',
'Complete output (3 lines):',
'Hello',
'fail',
'world',
], 'lines: {}'.format(actual) # Show the full output on failure.
assert command_line.startswith(' command: ')
assert command_line.endswith('print("world"); exit("fail")\'')
def test_info_logging_with_show_stdout_true(self, capfd, caplog):
"""
Test INFO logging with show_stdout=True.
"""
log_level = INFO
args, spinner = self.prepare_call(caplog, log_level)
result = call_subprocess(args, spinner=spinner, show_stdout=True)
expected = (['Hello', 'world'], [
('pip.subprocessor', INFO, 'Running command '),
('pip.subprocessor', INFO, 'Hello'),
('pip.subprocessor', INFO, 'world'),
])
# The spinner shouldn't spin in this case since the subprocess
# output is already being written to the console.
self.check_result(
capfd, caplog, log_level, spinner, result, expected,
expected_spinner=(0, None),
)
@pytest.mark.parametrize((
'exit_status', 'show_stdout', 'extra_ok_returncodes', 'log_level',
'expected'),
[
# The spinner should show here because show_stdout=False means
# the subprocess should get logged at DEBUG level, but the passed
# log level is only INFO.
(0, False, None, INFO, (None, 'done', 2)),
# Test some cases where the spinner should not be shown.
(0, False, None, DEBUG, (None, None, 0)),
# Test show_stdout=True.
(0, True, None, DEBUG, (None, None, 0)),
(0, True, None, INFO, (None, None, 0)),
# The spinner should show here because show_stdout=True means
# the subprocess should get logged at INFO level, but the passed
# log level is only WARNING.
(0, True, None, WARNING, (None, 'done', 2)),
# Test a non-zero exit status.
(3, False, None, INFO, (InstallationError, 'error', 2)),
# Test a non-zero exit status also in extra_ok_returncodes.
(3, False, (3, ), INFO, (None, 'done', 2)),
])
def test_spinner_finish(
self, exit_status, show_stdout, extra_ok_returncodes, log_level,
caplog, expected,
):
"""
Test that the spinner finishes correctly.
"""
expected_exc_type = expected[0]
expected_final_status = expected[1]
expected_spin_count = expected[2]
command = (
'print("Hello"); print("world"); exit({})'.format(exit_status)
)
args, spinner = self.prepare_call(caplog, log_level, command=command)
try:
call_subprocess(
args,
show_stdout=show_stdout,
extra_ok_returncodes=extra_ok_returncodes,
spinner=spinner,
)
except Exception as exc:
exc_type = type(exc)
else:
exc_type = None
assert exc_type == expected_exc_type
assert spinner.final_status == expected_final_status
assert spinner.spin_count == expected_spin_count
def test_closes_stdin(self):
with pytest.raises(InstallationError):
call_subprocess(
[sys.executable, '-c', 'input()'],
show_stdout=True,
)