1
1
Fork 0
mirror of https://github.com/pypa/pip synced 2023-12-13 21:30:23 +01:00

Merge pull request #9633 from jdufresne/blacken-utils

Blacken src/pip/_internal/utils directory
This commit is contained in:
Pradyun Gedam 2021-02-21 15:58:25 +00:00 committed by GitHub
commit 599fc57114
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 403 additions and 443 deletions

View file

@ -31,7 +31,6 @@ repos:
^src/pip/_internal/network|
^src/pip/_internal/operations|
^src/pip/_internal/req|
^src/pip/_internal/utils|
^src/pip/_internal/vcs|
^src/pip/_internal/\w+\.py$|
^src/pip/__main__.py$|

View file

@ -21,7 +21,7 @@ def user_config_dir(appname, roaming=True):
# type: (str, bool) -> str
path = _appdirs.user_config_dir(appname, appauthor=False, roaming=roaming)
if _appdirs.system == "darwin" and not os.path.isdir(path):
path = os.path.expanduser('~/.config/')
path = os.path.expanduser("~/.config/")
if appname:
path = os.path.join(path, appname)
return path
@ -34,5 +34,5 @@ def site_config_dirs(appname):
dirval = _appdirs.site_config_dir(appname, appauthor=False, multipath=True)
if _appdirs.system not in ["win32", "darwin"]:
# always look in /etc directly as well
return dirval.split(os.pathsep) + ['/etc']
return dirval.split(os.pathsep) + ["/etc"]
return [dirval]

View file

@ -18,11 +18,13 @@ def has_tls():
# type: () -> bool
try:
import _ssl # noqa: F401 # ignore unused
return True
except ImportError:
pass
from pip._vendor.urllib3.util import IS_PYOPENSSL
return IS_PYOPENSSL
@ -39,7 +41,7 @@ def get_path_uid(path):
:raises OSError: When path is a symlink or can't be read.
"""
if hasattr(os, 'O_NOFOLLOW'):
if hasattr(os, "O_NOFOLLOW"):
fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
file_uid = os.fstat(fd).st_uid
os.close(fd)
@ -51,8 +53,7 @@ def get_path_uid(path):
else:
# raise OSError for parity with os.O_NOFOLLOW above
raise OSError(
"{} is a symlink; Will not return uid for symlinks".format(
path)
"{} is a symlink; Will not return uid for symlinks".format(path)
)
return file_uid
@ -66,5 +67,4 @@ stdlib_pkgs = {"python", "wsgiref", "argparse"}
# windows detection, covers cpython and ironpython
WINDOWS = (sys.platform.startswith("win") or
(sys.platform == 'cli' and os.name == 'nt'))
WINDOWS = sys.platform.startswith("win") or (sys.platform == "cli" and os.name == "nt")

View file

@ -18,13 +18,13 @@ if TYPE_CHECKING:
from pip._vendor.packaging.tags import PythonVersion
_osx_arch_pat = re.compile(r'(.+)_(\d+)_(\d+)_(.+)')
_osx_arch_pat = re.compile(r"(.+)_(\d+)_(\d+)_(.+)")
def version_info_to_nodot(version_info):
# type: (Tuple[int, ...]) -> str
# Only use up to the first two numbers.
return ''.join(map(str, version_info[:2]))
return "".join(map(str, version_info[:2]))
def _mac_platforms(arch):
@ -39,7 +39,7 @@ def _mac_platforms(arch):
# actual prefix provided by the user in case they provided
# something like "macosxcustom_". It may be good to remove
# this as undocumented or deprecate it in the future.
'{}_{}'.format(name, arch[len('macosx_'):])
"{}_{}".format(name, arch[len("macosx_") :])
for arch in mac_platforms(mac_version, actual_arch)
]
else:
@ -51,31 +51,31 @@ def _mac_platforms(arch):
def _custom_manylinux_platforms(arch):
# type: (str) -> List[str]
arches = [arch]
arch_prefix, arch_sep, arch_suffix = arch.partition('_')
if arch_prefix == 'manylinux2014':
arch_prefix, arch_sep, arch_suffix = arch.partition("_")
if arch_prefix == "manylinux2014":
# manylinux1/manylinux2010 wheels run on most manylinux2014 systems
# with the exception of wheels depending on ncurses. PEP 599 states
# manylinux1/manylinux2010 wheels should be considered
# manylinux2014 wheels:
# https://www.python.org/dev/peps/pep-0599/#backwards-compatibility-with-manylinux2010-wheels
if arch_suffix in {'i686', 'x86_64'}:
arches.append('manylinux2010' + arch_sep + arch_suffix)
arches.append('manylinux1' + arch_sep + arch_suffix)
elif arch_prefix == 'manylinux2010':
if arch_suffix in {"i686", "x86_64"}:
arches.append("manylinux2010" + arch_sep + arch_suffix)
arches.append("manylinux1" + arch_sep + arch_suffix)
elif arch_prefix == "manylinux2010":
# manylinux1 wheels run on most manylinux2010 systems with the
# exception of wheels depending on ncurses. PEP 571 states
# manylinux1 wheels should be considered manylinux2010 wheels:
# https://www.python.org/dev/peps/pep-0571/#backwards-compatibility-with-manylinux1-wheels
arches.append('manylinux1' + arch_sep + arch_suffix)
arches.append("manylinux1" + arch_sep + arch_suffix)
return arches
def _get_custom_platforms(arch):
# type: (str) -> List[str]
arch_prefix, arch_sep, arch_suffix = arch.partition('_')
if arch.startswith('macosx'):
arch_prefix, arch_sep, arch_suffix = arch.partition("_")
if arch.startswith("macosx"):
arches = _mac_platforms(arch)
elif arch_prefix in ['manylinux2014', 'manylinux2010']:
elif arch_prefix in ["manylinux2014", "manylinux2010"]:
arches = _custom_manylinux_platforms(arch)
else:
arches = [arch]
@ -121,7 +121,7 @@ def get_supported(
version=None, # type: Optional[str]
platforms=None, # type: Optional[List[str]]
impl=None, # type: Optional[str]
abis=None # type: Optional[List[str]]
abis=None, # type: Optional[List[str]]
):
# type: (...) -> List[Tag]
"""Return a list of supported tags for each version specified in

View file

@ -27,18 +27,14 @@ _original_showwarning = None # type: Any
def _showwarning(message, category, filename, lineno, file=None, line=None):
if file is not None:
if _original_showwarning is not None:
_original_showwarning(
message, category, filename, lineno, file, line,
)
_original_showwarning(message, category, filename, lineno, file, line)
elif issubclass(category, PipDeprecationWarning):
# We use a specially named logger which will handle all of the
# deprecation messages for pip.
logger = logging.getLogger("pip._internal.deprecations")
logger.warning(message)
else:
_original_showwarning(
message, category, filename, lineno, file, line,
)
_original_showwarning(message, category, filename, lineno, file, line)
def install_warning_logger():
@ -82,10 +78,13 @@ def deprecated(reason, replacement, gone_in, issue=None):
(reason, DEPRECATION_MSG_PREFIX + "{}"),
(gone_in, "pip {} will remove support for this functionality."),
(replacement, "A possible replacement is {}."),
(issue, (
"You can find discussion regarding this at "
"https://github.com/pypa/pip/issues/{}."
)),
(
issue,
(
"You can find discussion regarding this at "
"https://github.com/pypa/pip/issues/{}."
),
),
]
message = " ".join(
template.format(val) for val, template in sentences if val is not None

View file

@ -47,8 +47,8 @@ def direct_url_from_link(link, source_dir=None, link_is_in_wheel_cache=False):
if link.is_vcs:
vcs_backend = vcs.get_backend_for_scheme(link.scheme)
assert vcs_backend
url, requested_revision, _ = (
vcs_backend.get_url_rev_and_auth(link.url_without_fragment)
url, requested_revision, _ = vcs_backend.get_url_rev_and_auth(
link.url_without_fragment
)
# For VCS links, we need to find out and add commit_id.
if link_is_in_wheel_cache:
@ -106,7 +106,7 @@ def dist_get_direct_url(dist):
except (
DirectUrlValidationError,
json.JSONDecodeError,
UnicodeDecodeError
UnicodeDecodeError,
) as e:
logger.warning(
"Error parsing %s for %s: %s",

View file

@ -5,16 +5,16 @@ import sys
from typing import List, Tuple
BOMS = [
(codecs.BOM_UTF8, 'utf-8'),
(codecs.BOM_UTF16, 'utf-16'),
(codecs.BOM_UTF16_BE, 'utf-16-be'),
(codecs.BOM_UTF16_LE, 'utf-16-le'),
(codecs.BOM_UTF32, 'utf-32'),
(codecs.BOM_UTF32_BE, 'utf-32-be'),
(codecs.BOM_UTF32_LE, 'utf-32-le'),
(codecs.BOM_UTF8, "utf-8"),
(codecs.BOM_UTF16, "utf-16"),
(codecs.BOM_UTF16_BE, "utf-16-be"),
(codecs.BOM_UTF16_LE, "utf-16-le"),
(codecs.BOM_UTF32, "utf-32"),
(codecs.BOM_UTF32_BE, "utf-32-be"),
(codecs.BOM_UTF32_LE, "utf-32-le"),
] # type: List[Tuple[bytes, str]]
ENCODING_RE = re.compile(br'coding[:=]\s*([-\w.]+)')
ENCODING_RE = re.compile(br"coding[:=]\s*([-\w.]+)")
def auto_decode(data):
@ -24,13 +24,13 @@ def auto_decode(data):
Fallback to locale.getpreferredencoding(False) like open() on Python3"""
for bom, encoding in BOMS:
if data.startswith(bom):
return data[len(bom):].decode(encoding)
return data[len(bom) :].decode(encoding)
# Lets check the first two lines as in PEP263
for line in data.split(b'\n')[:2]:
if line[0:1] == b'#' and ENCODING_RE.search(line):
for line in data.split(b"\n")[:2]:
if line[0:1] == b"#" and ENCODING_RE.search(line):
result = ENCODING_RE.search(line)
assert result is not None
encoding = result.groups()[0].decode('ascii')
encoding = result.groups()[0].decode("ascii")
return data.decode(encoding)
return data.decode(
locale.getpreferredencoding(False) or sys.getdefaultencoding(),

View file

@ -89,8 +89,8 @@ def adjacent_tmp_file(path, **kwargs):
delete=False,
dir=os.path.dirname(path),
prefix=os.path.basename(path),
suffix='.tmp',
**kwargs
suffix=".tmp",
**kwargs,
) as f:
result = cast(BinaryIO, f)
try:
@ -120,7 +120,7 @@ def test_writable_dir(path):
break # Should never get here, but infinite loops are bad
path = parent
if os.name == 'posix':
if os.name == "posix":
return os.access(path, os.W_OK)
return _test_writable_dir_win(path)
@ -130,10 +130,10 @@ def _test_writable_dir_win(path):
# type: (str) -> bool
# os.access doesn't work on Windows: http://bugs.python.org/issue2528
# and we can't use tempfile: http://bugs.python.org/issue22107
basename = 'accesstest_deleteme_fishfingers_custard_'
alphabet = 'abcdefghijklmnopqrstuvwxyz0123456789'
basename = "accesstest_deleteme_fishfingers_custard_"
alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"
for _ in range(10):
name = basename + ''.join(random.choice(alphabet) for _ in range(6))
name = basename + "".join(random.choice(alphabet) for _ in range(6))
file = os.path.join(path, name)
try:
fd = os.open(file, os.O_RDWR | os.O_CREAT | os.O_EXCL)
@ -152,9 +152,7 @@ def _test_writable_dir_win(path):
return True
# This should never be reached
raise OSError(
'Unexpected condition testing for writable directory'
)
raise OSError("Unexpected condition testing for writable directory")
def find_files(path, pattern):

View file

@ -5,15 +5,18 @@ from typing import Tuple
from pip._internal.utils.misc import splitext
WHEEL_EXTENSION = '.whl'
BZ2_EXTENSIONS = ('.tar.bz2', '.tbz') # type: Tuple[str, ...]
XZ_EXTENSIONS = ('.tar.xz', '.txz', '.tlz',
'.tar.lz', '.tar.lzma') # type: Tuple[str, ...]
ZIP_EXTENSIONS = ('.zip', WHEEL_EXTENSION) # type: Tuple[str, ...]
TAR_EXTENSIONS = ('.tar.gz', '.tgz', '.tar') # type: Tuple[str, ...]
ARCHIVE_EXTENSIONS = (
ZIP_EXTENSIONS + BZ2_EXTENSIONS + TAR_EXTENSIONS + XZ_EXTENSIONS
)
WHEEL_EXTENSION = ".whl"
BZ2_EXTENSIONS = (".tar.bz2", ".tbz") # type: Tuple[str, ...]
XZ_EXTENSIONS = (
".tar.xz",
".txz",
".tlz",
".tar.lz",
".tar.lzma",
) # type: Tuple[str, ...]
ZIP_EXTENSIONS = (".zip", WHEEL_EXTENSION) # type: Tuple[str, ...]
TAR_EXTENSIONS = (".tar.gz", ".tgz", ".tar") # type: Tuple[str, ...]
ARCHIVE_EXTENSIONS = ZIP_EXTENSIONS + BZ2_EXTENSIONS + TAR_EXTENSIONS + XZ_EXTENSIONS
def is_archive_file(name):

View file

@ -10,12 +10,12 @@ if TYPE_CHECKING:
# The recommended hash algo of the moment. Change this whenever the state of
# the art changes; it won't hurt backward compatibility.
FAVORITE_HASH = 'sha256'
FAVORITE_HASH = "sha256"
# Names of hashlib algorithms allowed by the --hash option and ``pip hash``
# Currently, those are the ones at least as collision-resistant as sha256.
STRONG_HASHES = ['sha256', 'sha384', 'sha512']
STRONG_HASHES = ["sha256", "sha384", "sha512"]
class Hashes:
@ -23,6 +23,7 @@ class Hashes:
known-good values
"""
def __init__(self, hashes=None):
# type: (Dict[str, List[str]]) -> None
"""
@ -63,7 +64,7 @@ class Hashes:
def is_hash_allowed(
self,
hash_name, # type: str
hash_name, # type: str
hex_digest, # type: str
):
# type: (...) -> bool
@ -83,9 +84,7 @@ class Hashes:
try:
gots[hash_name] = hashlib.new(hash_name)
except (ValueError, TypeError):
raise InstallationError(
f'Unknown hash name: {hash_name}'
)
raise InstallationError(f"Unknown hash name: {hash_name}")
for chunk in chunks:
for hash in gots.values():
@ -111,7 +110,7 @@ class Hashes:
def check_against_path(self, path):
# type: (str) -> None
with open(path, 'rb') as file:
with open(path, "rb") as file:
return self.check_against_file(file)
def __nonzero__(self):
@ -132,11 +131,13 @@ class Hashes:
def __hash__(self):
# type: () -> int
return hash(
",".join(sorted(
":".join((alg, digest))
for alg, digest_list in self._allowed.items()
for digest in digest_list
))
",".join(
sorted(
":".join((alg, digest))
for alg, digest_list in self._allowed.items()
for digest in digest_list
)
)
)
@ -147,6 +148,7 @@ class MissingHashes(Hashes):
exception showing it to the user.
"""
def __init__(self):
# type: () -> None
"""Don't offer the ``hashes`` kwarg."""

View file

@ -22,7 +22,7 @@ def inject_securetransport():
return
# Checks for OpenSSL 1.0.1
if ssl.OPENSSL_VERSION_NUMBER >= 0x1000100f:
if ssl.OPENSSL_VERSION_NUMBER >= 0x1000100F:
return
try:

View file

@ -29,13 +29,14 @@ except Exception:
_log_state = threading.local()
subprocess_logger = getLogger('pip.subprocessor')
subprocess_logger = getLogger("pip.subprocessor")
class BrokenStdoutLoggingError(Exception):
"""
Raised if BrokenPipeError occurs for the stdout stream while logging.
"""
pass
@ -46,9 +47,11 @@ if WINDOWS:
# https://bugs.python.org/issue30418
def _is_broken_pipe_error(exc_class, exc):
"""See the docstring for non-Windows below."""
return ((exc_class is BrokenPipeError) or
(exc_class is OSError and
exc.errno in (errno.EINVAL, errno.EPIPE)))
return (exc_class is BrokenPipeError) or (
exc_class is OSError and exc.errno in (errno.EINVAL, errno.EPIPE)
)
else:
# Then we are in the non-Windows case.
def _is_broken_pipe_error(exc_class, exc):
@ -59,7 +62,7 @@ else:
exc_class: an exception class.
exc: an exception instance.
"""
return (exc_class is BrokenPipeError)
return exc_class is BrokenPipeError
@contextlib.contextmanager
@ -78,7 +81,7 @@ def indent_log(num=2):
def get_indentation():
return getattr(_log_state, 'indentation', 0)
return getattr(_log_state, "indentation", 0)
class IndentingFormatter(logging.Formatter):
@ -106,15 +109,15 @@ class IndentingFormatter(logging.Formatter):
prefix to add to each line).
"""
if levelno < logging.WARNING:
return ''
return ""
if formatted.startswith(DEPRECATION_MSG_PREFIX):
# Then the message already has a prefix. We don't want it to
# look like "WARNING: DEPRECATION: ...."
return ''
return ""
if levelno < logging.ERROR:
return 'WARNING: '
return "WARNING: "
return 'ERROR: '
return "ERROR: "
def format(self, record):
"""
@ -125,20 +128,18 @@ class IndentingFormatter(logging.Formatter):
message_start = self.get_message_start(formatted, record.levelno)
formatted = message_start + formatted
prefix = ''
prefix = ""
if self.add_timestamp:
prefix = f"{self.formatTime(record)} "
prefix += " " * get_indentation()
formatted = "".join([
prefix + line
for line in formatted.splitlines(True)
])
formatted = "".join([prefix + line for line in formatted.splitlines(True)])
return formatted
def _color_wrap(*colors):
def wrapped(inp):
return "".join(list(colors) + [inp, colorama.Style.RESET_ALL])
return wrapped
@ -177,7 +178,8 @@ class ColorizedStreamHandler(logging.StreamHandler):
return False
real_stream = (
self.stream if not isinstance(self.stream, colorama.AnsiToWin32)
self.stream
if not isinstance(self.stream, colorama.AnsiToWin32)
else self.stream.wrapped
)
@ -210,22 +212,19 @@ class ColorizedStreamHandler(logging.StreamHandler):
# stdout stream in logging's Handler.emit(), then raise our special
# exception so we can handle it in main() instead of logging the
# broken pipe error and continuing.
if (exc_class and self._using_stdout() and
_is_broken_pipe_error(exc_class, exc)):
if exc_class and self._using_stdout() and _is_broken_pipe_error(exc_class, exc):
raise BrokenStdoutLoggingError()
return super().handleError(record)
class BetterRotatingFileHandler(logging.handlers.RotatingFileHandler):
def _open(self):
ensure_dir(os.path.dirname(self.baseFilename))
return logging.handlers.RotatingFileHandler._open(self)
class MaxLevelFilter(Filter):
def __init__(self, level):
self.level = level
@ -292,78 +291,76 @@ def setup_logging(verbosity, no_color, user_log_file):
["user_log"] if include_user_log else []
)
logging.config.dictConfig({
"version": 1,
"disable_existing_loggers": False,
"filters": {
"exclude_warnings": {
"()": "pip._internal.utils.logging.MaxLevelFilter",
"level": logging.WARNING,
logging.config.dictConfig(
{
"version": 1,
"disable_existing_loggers": False,
"filters": {
"exclude_warnings": {
"()": "pip._internal.utils.logging.MaxLevelFilter",
"level": logging.WARNING,
},
"restrict_to_subprocess": {
"()": "logging.Filter",
"name": subprocess_logger.name,
},
"exclude_subprocess": {
"()": "pip._internal.utils.logging.ExcludeLoggerFilter",
"name": subprocess_logger.name,
},
},
"restrict_to_subprocess": {
"()": "logging.Filter",
"name": subprocess_logger.name,
"formatters": {
"indent": {
"()": IndentingFormatter,
"format": "%(message)s",
},
"indent_with_timestamp": {
"()": IndentingFormatter,
"format": "%(message)s",
"add_timestamp": True,
},
},
"exclude_subprocess": {
"()": "pip._internal.utils.logging.ExcludeLoggerFilter",
"name": subprocess_logger.name,
"handlers": {
"console": {
"level": level,
"class": handler_classes["stream"],
"no_color": no_color,
"stream": log_streams["stdout"],
"filters": ["exclude_subprocess", "exclude_warnings"],
"formatter": "indent",
},
"console_errors": {
"level": "WARNING",
"class": handler_classes["stream"],
"no_color": no_color,
"stream": log_streams["stderr"],
"filters": ["exclude_subprocess"],
"formatter": "indent",
},
# A handler responsible for logging to the console messages
# from the "subprocessor" logger.
"console_subprocess": {
"level": level,
"class": handler_classes["stream"],
"no_color": no_color,
"stream": log_streams["stderr"],
"filters": ["restrict_to_subprocess"],
"formatter": "indent",
},
"user_log": {
"level": "DEBUG",
"class": handler_classes["file"],
"filename": additional_log_file,
"delay": True,
"formatter": "indent_with_timestamp",
},
},
},
"formatters": {
"indent": {
"()": IndentingFormatter,
"format": "%(message)s",
"root": {
"level": root_level,
"handlers": handlers,
},
"indent_with_timestamp": {
"()": IndentingFormatter,
"format": "%(message)s",
"add_timestamp": True,
},
},
"handlers": {
"console": {
"level": level,
"class": handler_classes["stream"],
"no_color": no_color,
"stream": log_streams["stdout"],
"filters": ["exclude_subprocess", "exclude_warnings"],
"formatter": "indent",
},
"console_errors": {
"level": "WARNING",
"class": handler_classes["stream"],
"no_color": no_color,
"stream": log_streams["stderr"],
"filters": ["exclude_subprocess"],
"formatter": "indent",
},
# A handler responsible for logging to the console messages
# from the "subprocessor" logger.
"console_subprocess": {
"level": level,
"class": handler_classes["stream"],
"no_color": no_color,
"stream": log_streams["stderr"],
"filters": ["restrict_to_subprocess"],
"formatter": "indent",
},
"user_log": {
"level": "DEBUG",
"class": handler_classes["file"],
"filename": additional_log_file,
"delay": True,
"formatter": "indent_with_timestamp",
},
},
"root": {
"level": root_level,
"handlers": handlers,
},
"loggers": {
"pip._vendor": {
"level": vendored_log_level
}
},
})
"loggers": {"pip._vendor": {"level": vendored_log_level}},
}
)
return level_number

View file

@ -45,13 +45,21 @@ from pip._internal.utils.virtualenv import (
virtualenv_no_global,
)
__all__ = ['rmtree', 'display_path', 'backup_dir',
'ask', 'splitext',
'format_size', 'is_installable_dir',
'normalize_path',
'renames', 'get_prog',
'captured_stdout', 'ensure_dir',
'remove_auth_from_url']
__all__ = [
"rmtree",
"display_path",
"backup_dir",
"ask",
"splitext",
"format_size",
"is_installable_dir",
"normalize_path",
"renames",
"get_prog",
"captured_stdout",
"ensure_dir",
"remove_auth_from_url",
]
logger = logging.getLogger(__name__)
@ -65,10 +73,10 @@ def get_pip_version():
pip_pkg_dir = os.path.join(os.path.dirname(__file__), "..", "..")
pip_pkg_dir = os.path.abspath(pip_pkg_dir)
return (
'pip {} from {} (python {})'.format(
__version__, pip_pkg_dir, get_major_minor_version(),
)
return "pip {} from {} (python {})".format(
__version__,
pip_pkg_dir,
get_major_minor_version(),
)
@ -89,7 +97,7 @@ def normalize_version_info(py_version_info):
elif len(py_version_info) > 3:
py_version_info = py_version_info[:3]
return cast('VersionInfo', py_version_info)
return cast("VersionInfo", py_version_info)
def ensure_dir(path):
@ -107,21 +115,20 @@ def get_prog():
# type: () -> str
try:
prog = os.path.basename(sys.argv[0])
if prog in ('__main__.py', '-c'):
if prog in ("__main__.py", "-c"):
return f"{sys.executable} -m pip"
else:
return prog
except (AttributeError, TypeError, IndexError):
pass
return 'pip'
return "pip"
# Retry every half second for up to 3 seconds
@retry(stop_max_delay=3000, wait_fixed=500)
def rmtree(dir, ignore_errors=False):
# type: (AnyStr, bool) -> None
shutil.rmtree(dir, ignore_errors=ignore_errors,
onerror=rmtree_errorhandler)
shutil.rmtree(dir, ignore_errors=ignore_errors, onerror=rmtree_errorhandler)
def rmtree_errorhandler(func, path, exc_info):
@ -150,11 +157,11 @@ def display_path(path):
if possible."""
path = os.path.normcase(os.path.abspath(path))
if path.startswith(os.getcwd() + os.path.sep):
path = '.' + path[len(os.getcwd()):]
path = "." + path[len(os.getcwd()) :]
return path
def backup_dir(dir, ext='.bak'):
def backup_dir(dir, ext=".bak"):
# type: (str, str) -> str
"""Figure out the name of a directory to back up the given dir to
(adding .bak, .bak2, etc)"""
@ -168,7 +175,7 @@ def backup_dir(dir, ext='.bak'):
def ask_path_exists(message, options):
# type: (str, Iterable[str]) -> str
for action in os.environ.get('PIP_EXISTS_ACTION', '').split():
for action in os.environ.get("PIP_EXISTS_ACTION", "").split():
if action in options:
return action
return ask(message, options)
@ -177,10 +184,9 @@ def ask_path_exists(message, options):
def _check_no_input(message):
# type: (str) -> None
"""Raise an error if no input is allowed."""
if os.environ.get('PIP_NO_INPUT'):
if os.environ.get("PIP_NO_INPUT"):
raise Exception(
'No input was expected ($PIP_NO_INPUT set); question: {}'.format(
message)
"No input was expected ($PIP_NO_INPUT set); question: {}".format(message)
)
@ -193,8 +199,8 @@ def ask(message, options):
response = response.strip().lower()
if response not in options:
print(
'Your response ({!r}) was not one of the expected responses: '
'{}'.format(response, ', '.join(options))
"Your response ({!r}) was not one of the expected responses: "
"{}".format(response, ", ".join(options))
)
else:
return response
@ -223,9 +229,9 @@ def strtobool(val):
'val' is anything else.
"""
val = val.lower()
if val in ('y', 'yes', 't', 'true', 'on', '1'):
if val in ("y", "yes", "t", "true", "on", "1"):
return 1
elif val in ('n', 'no', 'f', 'false', 'off', '0'):
elif val in ("n", "no", "f", "false", "off", "0"):
return 0
else:
raise ValueError("invalid truth value %r" % (val,))
@ -234,13 +240,13 @@ def strtobool(val):
def format_size(bytes):
# type: (float) -> str
if bytes > 1000 * 1000:
return '{:.1f} MB'.format(bytes / 1000.0 / 1000)
return "{:.1f} MB".format(bytes / 1000.0 / 1000)
elif bytes > 10 * 1000:
return '{} kB'.format(int(bytes / 1000))
return "{} kB".format(int(bytes / 1000))
elif bytes > 1000:
return '{:.1f} kB'.format(bytes / 1000.0)
return "{:.1f} kB".format(bytes / 1000.0)
else:
return '{} bytes'.format(int(bytes))
return "{} bytes".format(int(bytes))
def tabulate(rows):
@ -253,21 +259,20 @@ def tabulate(rows):
(['foobar 2000', '3735928559'], [10, 4])
"""
rows = [tuple(map(str, row)) for row in rows]
sizes = [max(map(len, col)) for col in zip_longest(*rows, fillvalue='')]
sizes = [max(map(len, col)) for col in zip_longest(*rows, fillvalue="")]
table = [" ".join(map(str.ljust, row, sizes)).rstrip() for row in rows]
return table, sizes
def is_installable_dir(path):
# type: (str) -> bool
"""Is path is a directory containing setup.py or pyproject.toml?
"""
"""Is path is a directory containing setup.py or pyproject.toml?"""
if not os.path.isdir(path):
return False
setup_py = os.path.join(path, 'setup.py')
setup_py = os.path.join(path, "setup.py")
if os.path.isfile(setup_py):
return True
pyproject_toml = os.path.join(path, 'pyproject.toml')
pyproject_toml = os.path.join(path, "pyproject.toml")
if os.path.isfile(pyproject_toml):
return True
return False
@ -300,7 +305,7 @@ def splitext(path):
# type: (str) -> Tuple[str, str]
"""Like os.path.splitext, but take off .tar too"""
base, ext = posixpath.splitext(path)
if base.lower().endswith('.tar'):
if base.lower().endswith(".tar"):
ext = base[-4:] + ext
base = base[:-4]
return base, ext
@ -374,19 +379,19 @@ def dist_is_editable(dist):
Return True if given Distribution is an editable install.
"""
for path_item in sys.path:
egg_link = os.path.join(path_item, dist.project_name + '.egg-link')
egg_link = os.path.join(path_item, dist.project_name + ".egg-link")
if os.path.isfile(egg_link):
return True
return False
def get_installed_distributions(
local_only=True, # type: bool
skip=stdlib_pkgs, # type: Container[str]
include_editables=True, # type: bool
editables_only=False, # type: bool
user_only=False, # type: bool
paths=None # type: Optional[List[str]]
local_only=True, # type: bool
skip=stdlib_pkgs, # type: Container[str]
include_editables=True, # type: bool
editables_only=False, # type: bool
user_only=False, # type: bool
paths=None, # type: Optional[List[str]]
):
# type: (...) -> List[Distribution]
"""Return a list of installed Distribution objects.
@ -421,6 +426,7 @@ def get_distribution(req_name):
"""
from pip._internal.metadata import get_default_environment
from pip._internal.metadata.pkg_resources import Distribution as _Dist
dist = get_default_environment().get_distribution(req_name)
if dist is None:
return None
@ -457,7 +463,7 @@ def egg_link_path(dist):
sites.append(site_packages)
for site in sites:
egglink = os.path.join(site, dist.project_name) + '.egg-link'
egglink = os.path.join(site, dist.project_name) + ".egg-link"
if os.path.isfile(egglink):
return egglink
return None
@ -485,7 +491,6 @@ def write_output(msg, *args):
class StreamWrapper(StringIO):
@classmethod
def from_stream(cls, orig_stream):
cls.orig_stream = orig_stream
@ -521,22 +526,22 @@ def captured_stdout():
Taken from Lib/support/__init__.py in the CPython repo.
"""
return captured_output('stdout')
return captured_output("stdout")
def captured_stderr():
"""
See captured_stdout().
"""
return captured_output('stderr')
return captured_output("stderr")
# Simulates an enum
def enum(*sequential, **named):
enums = dict(zip(sequential, range(len(sequential))), **named)
reverse = {value: key for key, value in enums.items()}
enums['reverse_mapping'] = reverse
return type('Enum', (), enums)
enums["reverse_mapping"] = reverse
return type("Enum", (), enums)
def build_netloc(host, port):
@ -546,21 +551,21 @@ def build_netloc(host, port):
"""
if port is None:
return host
if ':' in host:
if ":" in host:
# Only wrap host with square brackets when it is IPv6
host = f'[{host}]'
return f'{host}:{port}'
host = f"[{host}]"
return f"{host}:{port}"
def build_url_from_netloc(netloc, scheme='https'):
def build_url_from_netloc(netloc, scheme="https"):
# type: (str, str) -> str
"""
Build a full URL from a netloc.
"""
if netloc.count(':') >= 2 and '@' not in netloc and '[' not in netloc:
if netloc.count(":") >= 2 and "@" not in netloc and "[" not in netloc:
# It must be a bare IPv6 address, so wrap it with brackets.
netloc = f'[{netloc}]'
return f'{scheme}://{netloc}'
netloc = f"[{netloc}]"
return f"{scheme}://{netloc}"
def parse_netloc(netloc):
@ -579,24 +584,22 @@ def split_auth_from_netloc(netloc):
Returns: (netloc, (username, password)).
"""
if '@' not in netloc:
if "@" not in netloc:
return netloc, (None, None)
# Split from the right because that's how urllib.parse.urlsplit()
# behaves if more than one @ is present (which can be checked using
# the password attribute of urlsplit()'s return value).
auth, netloc = netloc.rsplit('@', 1)
if ':' in auth:
auth, netloc = netloc.rsplit("@", 1)
if ":" in auth:
# Split from the left because that's how urllib.parse.urlsplit()
# behaves if more than one : is present (which again can be checked
# using the password attribute of the return value)
user_pass = auth.split(':', 1)
user_pass = auth.split(":", 1)
else:
user_pass = auth, None
user_pass = tuple(
None if x is None else urllib.parse.unquote(x) for x in user_pass
)
user_pass = tuple(None if x is None else urllib.parse.unquote(x) for x in user_pass)
return netloc, user_pass
@ -614,14 +617,14 @@ def redact_netloc(netloc):
if user is None:
return netloc
if password is None:
user = '****'
password = ''
user = "****"
password = ""
else:
user = urllib.parse.quote(user)
password = ':****'
return '{user}{password}@{netloc}'.format(user=user,
password=password,
netloc=netloc)
password = ":****"
return "{user}{password}@{netloc}".format(
user=user, password=password, netloc=netloc
)
def _transform_url(url, transform_netloc):
@ -637,9 +640,7 @@ def _transform_url(url, transform_netloc):
purl = urllib.parse.urlsplit(url)
netloc_tuple = transform_netloc(purl.netloc)
# stripped url
url_pieces = (
purl.scheme, netloc_tuple[0], purl.path, purl.query, purl.fragment
)
url_pieces = (purl.scheme, netloc_tuple[0], purl.path, purl.query, purl.fragment)
surl = urllib.parse.urlunsplit(url_pieces)
return surl, netloc_tuple
@ -680,7 +681,7 @@ def redact_auth_from_url(url):
class HiddenText:
def __init__(
self,
secret, # type: str
secret, # type: str
redacted, # type: str
):
# type: (...) -> None
@ -689,7 +690,7 @@ class HiddenText:
def __repr__(self):
# type: (...) -> str
return '<HiddenText {!r}>'.format(str(self))
return "<HiddenText {!r}>".format(str(self))
def __str__(self):
# type: (...) -> str
@ -703,12 +704,12 @@ class HiddenText:
# The string being used for redaction doesn't also have to match,
# just the raw, original string.
return (self.secret == other.secret)
return self.secret == other.secret
def hide_value(value):
# type: (str) -> HiddenText
return HiddenText(value, redacted='****')
return HiddenText(value, redacted="****")
def hide_url(url):
@ -727,41 +728,36 @@ def protect_pip_from_modification_on_windows(modifying_pip):
pip_names = [
"pip.exe",
"pip{}.exe".format(sys.version_info[0]),
"pip{}.{}.exe".format(*sys.version_info[:2])
"pip{}.{}.exe".format(*sys.version_info[:2]),
]
# See https://github.com/pypa/pip/issues/1299 for more discussion
should_show_use_python_msg = (
modifying_pip and
WINDOWS and
os.path.basename(sys.argv[0]) in pip_names
modifying_pip and WINDOWS and os.path.basename(sys.argv[0]) in pip_names
)
if should_show_use_python_msg:
new_command = [
sys.executable, "-m", "pip"
] + sys.argv[1:]
new_command = [sys.executable, "-m", "pip"] + sys.argv[1:]
raise CommandError(
'To modify pip, please run the following command:\n{}'
.format(" ".join(new_command))
"To modify pip, please run the following command:\n{}".format(
" ".join(new_command)
)
)
def is_console_interactive():
# type: () -> bool
"""Is this console interactive?
"""
"""Is this console interactive?"""
return sys.stdin is not None and sys.stdin.isatty()
def hash_file(path, blocksize=1 << 20):
# type: (str, int) -> Tuple[Any, int]
"""Return (hash, length) for path using hashlib.sha256()
"""
"""Return (hash, length) for path using hashlib.sha256()"""
h = hashlib.sha256()
length = 0
with open(path, 'rb') as f:
with open(path, "rb") as f:
for block in read_chunks(f, size=blocksize):
length += len(block)
h.update(block)

View file

@ -7,10 +7,9 @@ import operator
class KeyBasedCompareMixin:
"""Provides comparison capabilities that is based on a key
"""
"""Provides comparison capabilities that is based on a key"""
__slots__ = ['_compare_key', '_defining_class']
__slots__ = ["_compare_key", "_defining_class"]
def __init__(self, key, defining_class):
self._compare_key = key

View file

@ -31,7 +31,7 @@ def check_requires_python(requires_python, version_info):
return True
requires_python_specifier = specifiers.SpecifierSet(requires_python)
python_version = version.parse('.'.join(map(str, version_info)))
python_version = version.parse(".".join(map(str, version_info)))
return python_version in requires_python_specifier
@ -41,16 +41,17 @@ def get_metadata(dist):
:raises NoneMetadataError: if the distribution reports `has_metadata()`
True but `get_metadata()` returns None.
"""
metadata_name = 'METADATA'
if (isinstance(dist, pkg_resources.DistInfoDistribution) and
dist.has_metadata(metadata_name)):
metadata_name = "METADATA"
if isinstance(dist, pkg_resources.DistInfoDistribution) and dist.has_metadata(
metadata_name
):
metadata = dist.get_metadata(metadata_name)
elif dist.has_metadata('PKG-INFO'):
metadata_name = 'PKG-INFO'
elif dist.has_metadata("PKG-INFO"):
metadata_name = "PKG-INFO"
metadata = dist.get_metadata(metadata_name)
else:
logger.warning("No metadata found in %s", display_path(dist.location))
metadata = ''
metadata = ""
if metadata is None:
raise NoneMetadataError(dist, metadata_name)
@ -69,7 +70,7 @@ def get_requires_python(dist):
if not present.
"""
pkg_info_dict = get_metadata(dist)
requires_python = pkg_info_dict.get('Requires-Python')
requires_python = pkg_info_dict.get("Requires-Python")
if requires_python is not None:
# Convert to a str to satisfy the type checker, since requires_python
@ -81,8 +82,8 @@ def get_requires_python(dist):
def get_installer(dist):
# type: (Distribution) -> str
if dist.has_metadata('INSTALLER'):
for line in dist.get_metadata_lines('INSTALLER'):
if dist.has_metadata("INSTALLER"):
for line in dist.get_metadata_lines("INSTALLER"):
if line.strip():
return line.strip()
return ''
return ""

View file

@ -16,7 +16,7 @@ These helpers work like Python 3's map, with two differences:
than using the default value of 1.
"""
__all__ = ['map_multiprocess', 'map_multithread']
__all__ = ["map_multiprocess", "map_multithread"]
from contextlib import contextmanager
from multiprocessing import Pool as ProcessPool
@ -27,8 +27,8 @@ from typing import Callable, Iterable, Iterator, TypeVar, Union
from pip._vendor.requests.adapters import DEFAULT_POOLSIZE
Pool = Union[pool.Pool, pool.ThreadPool]
S = TypeVar('S')
T = TypeVar('T')
S = TypeVar("S")
T = TypeVar("T")
# On platforms without sem_open, multiprocessing[.dummy] Pool
# cannot be created.

View file

@ -4,8 +4,8 @@ from pip._vendor.pkg_resources import yield_lines
class DictMetadata:
"""IMetadataProvider that reads metadata files from a dictionary.
"""
"""IMetadataProvider that reads metadata files from a dictionary."""
def __init__(self, metadata):
# type: (Dict[str, bytes]) -> None
self._metadata = metadata

View file

@ -20,7 +20,7 @@ def make_setuptools_shim_args(
setup_py_path, # type: str
global_options=None, # type: Sequence[str]
no_user_config=False, # type: bool
unbuffered_output=False # type: bool
unbuffered_output=False, # type: bool
):
# type: (...) -> List[str]
"""
@ -55,9 +55,7 @@ def make_setuptools_bdist_wheel_args(
# relies on site.py to find parts of the standard library outside the
# virtualenv.
args = make_setuptools_shim_args(
setup_py_path,
global_options=global_options,
unbuffered_output=True
setup_py_path, global_options=global_options, unbuffered_output=True
)
args += ["bdist_wheel", "-d", destination_dir]
args += build_options
@ -70,9 +68,7 @@ def make_setuptools_clean_args(
):
# type: (...) -> List[str]
args = make_setuptools_shim_args(
setup_py_path,
global_options=global_options,
unbuffered_output=True
setup_py_path, global_options=global_options, unbuffered_output=True
)
args += ["clean", "--all"]
return args
@ -117,9 +113,7 @@ def make_setuptools_egg_info_args(
no_user_config, # type: bool
):
# type: (...) -> List[str]
args = make_setuptools_shim_args(
setup_py_path, no_user_config=no_user_config
)
args = make_setuptools_shim_args(setup_py_path, no_user_config=no_user_config)
args += ["egg_info"]
@ -140,7 +134,7 @@ def make_setuptools_install_args(
home, # type: Optional[str]
use_user_site, # type: bool
no_user_config, # type: bool
pycompile # type: bool
pycompile, # type: bool
):
# type: (...) -> List[str]
assert not (use_user_site and prefix)
@ -150,7 +144,7 @@ def make_setuptools_install_args(
setup_py_path,
global_options=global_options,
no_user_config=no_user_config,
unbuffered_output=True
unbuffered_output=True,
)
args += ["install", "--record", record_filename]
args += ["--single-version-externally-managed"]

View file

@ -12,7 +12,7 @@ from pip._internal.utils.misc import HiddenText
CommandArgs = List[Union[str, HiddenText]]
LOG_DIVIDER = '----------------------------------------'
LOG_DIVIDER = "----------------------------------------"
def make_command(*args):
@ -43,9 +43,9 @@ def format_command_args(args):
# this can trigger a UnicodeDecodeError in Python 2 if the argument
# has type unicode and includes a non-ascii character. (The type
# checker doesn't ensure the annotations are correct in all cases.)
return ' '.join(
shlex.quote(str(arg)) if isinstance(arg, HiddenText)
else shlex.quote(arg) for arg in args
return " ".join(
shlex.quote(str(arg)) if isinstance(arg, HiddenText) else shlex.quote(arg)
for arg in args
)
@ -54,15 +54,13 @@ def reveal_command_args(args):
"""
Return the arguments in their raw, unredacted form.
"""
return [
arg.secret if isinstance(arg, HiddenText) else arg for arg in args
]
return [arg.secret if isinstance(arg, HiddenText) else arg for arg in args]
def make_subprocess_output_error(
cmd_args, # type: Union[List[str], CommandArgs]
cwd, # type: Optional[str]
lines, # type: List[str]
cmd_args, # type: Union[List[str], CommandArgs]
cwd, # type: Optional[str]
lines, # type: List[str]
exit_status, # type: int
):
# type: (...) -> str
@ -75,15 +73,15 @@ def make_subprocess_output_error(
command = format_command_args(cmd_args)
# We know the joined output value ends in a newline.
output = ''.join(lines)
output = "".join(lines)
msg = (
# Use a unicode string to avoid "UnicodeEncodeError: 'ascii'
# codec can't encode character ..." in Python 2 when a format
# argument (e.g. `output`) has a non-ascii character.
'Command errored out with exit status {exit_status}:\n'
' command: {command_display}\n'
' cwd: {cwd_display}\n'
'Complete output ({line_count} lines):\n{output}{divider}'
"Command errored out with exit status {exit_status}:\n"
" command: {command_display}\n"
" cwd: {cwd_display}\n"
"Complete output ({line_count} lines):\n{output}{divider}"
).format(
exit_status=exit_status,
command_display=command,
@ -99,7 +97,7 @@ def call_subprocess(
cmd, # type: Union[List[str], CommandArgs]
show_stdout=False, # type: bool
cwd=None, # type: Optional[str]
on_returncode='raise', # type: str
on_returncode="raise", # type: str
extra_ok_returncodes=None, # type: Optional[Iterable[int]]
command_desc=None, # type: Optional[str]
extra_environ=None, # type: Optional[Mapping[str, Any]]
@ -181,7 +179,9 @@ def call_subprocess(
except Exception as exc:
if log_failed_cmd:
subprocess_logger.critical(
"Error %s while executing command %s", exc, command_desc,
"Error %s while executing command %s",
exc,
command_desc,
)
raise
all_output = []
@ -195,7 +195,7 @@ def call_subprocess(
if not line:
break
line = line.rstrip()
all_output.append(line + '\n')
all_output.append(line + "\n")
# Show the line immediately.
log_subprocess(line)
@ -208,7 +208,7 @@ def call_subprocess(
finally:
if proc.stdout:
proc.stdout.close()
output = ''.join(all_output)
output = "".join(all_output)
else:
# In this mode, stdout and stderr are in different pipes.
# We must use communicate() which is the only safe way to read both.
@ -222,9 +222,7 @@ def call_subprocess(
all_output.append(err)
output = out
proc_had_error = (
proc.returncode and proc.returncode not in extra_ok_returncodes
)
proc_had_error = proc.returncode and proc.returncode not in extra_ok_returncodes
if use_spinner:
assert spinner
if proc_had_error:
@ -232,7 +230,7 @@ def call_subprocess(
else:
spinner.finish("done")
if proc_had_error:
if on_returncode == 'raise':
if on_returncode == "raise":
if not showing_subprocess and log_failed_cmd:
# Then the subprocess streams haven't been logged to the
# console yet.
@ -244,18 +242,17 @@ def call_subprocess(
)
subprocess_logger.error(msg)
raise InstallationSubprocessError(proc.returncode, command_desc)
elif on_returncode == 'warn':
elif on_returncode == "warn":
subprocess_logger.warning(
'Command "%s" had error code %s in %s',
command_desc,
proc.returncode,
cwd,
)
elif on_returncode == 'ignore':
elif on_returncode == "ignore":
pass
else:
raise ValueError('Invalid value: on_returncode={!r}'.format(
on_returncode))
raise ValueError("Invalid value: on_returncode={!r}".format(on_returncode))
return output
@ -270,7 +267,7 @@ def runner_with_spinner_message(message):
def runner(
cmd, # type: List[str]
cwd=None, # type: Optional[str]
extra_environ=None # type: Optional[Mapping[str, Any]]
extra_environ=None, # type: Optional[Mapping[str, Any]]
):
# type: (...) -> None
with open_spinner(message) as spinner:

View file

@ -10,7 +10,7 @@ from pip._internal.utils.misc import enum, rmtree
logger = logging.getLogger(__name__)
_T = TypeVar('_T', bound='TempDirectory')
_T = TypeVar("_T", bound="TempDirectory")
# Kinds of temporary directories. Only needed for ones that are
@ -38,8 +38,7 @@ def global_tempdir_manager():
class TempDirectoryTypeRegistry:
"""Manages temp directory behavior
"""
"""Manages temp directory behavior"""
def __init__(self):
# type: () -> None
@ -108,7 +107,7 @@ class TempDirectory:
def __init__(
self,
path=None, # type: Optional[str]
path=None, # type: Optional[str]
delete=_default, # type: Union[bool, None, _Default]
kind="temp", # type: str
globally_managed=False, # type: bool
@ -142,9 +141,7 @@ class TempDirectory:
@property
def path(self):
# type: () -> str
assert not self._deleted, (
f"Attempted to access deleted path: {self._path}"
)
assert not self._deleted, f"Attempted to access deleted path: {self._path}"
return self._path
def __repr__(self):
@ -169,22 +166,18 @@ class TempDirectory:
def _create(self, kind):
# type: (str) -> str
"""Create a temporary directory and store its path in self.path
"""
"""Create a temporary directory and store its path in self.path"""
# We realpath here because some systems have their default tmpdir
# symlinked to another directory. This tends to confuse build
# scripts, so we canonicalize the path by traversing potential
# symlinks here.
path = os.path.realpath(
tempfile.mkdtemp(prefix=f"pip-{kind}-")
)
path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
logger.debug("Created temporary directory: %s", path)
return path
def cleanup(self):
# type: () -> None
"""Remove the temporary directory created and reset state
"""
"""Remove the temporary directory created and reset state"""
self._deleted = True
if not os.path.exists(self._path):
return
@ -205,6 +198,7 @@ class AdjacentTempDirectory(TempDirectory):
(when used as a contextmanager)
"""
# The characters that may be used to name the temp directory
# We always prepend a ~ and then rotate through these until
# a usable name is found.
@ -214,7 +208,7 @@ class AdjacentTempDirectory(TempDirectory):
def __init__(self, original, delete=None):
# type: (str, Optional[bool]) -> None
self.original = original.rstrip('/\\')
self.original = original.rstrip("/\\")
super().__init__(delete=delete)
@classmethod
@ -229,16 +223,18 @@ class AdjacentTempDirectory(TempDirectory):
"""
for i in range(1, len(name)):
for candidate in itertools.combinations_with_replacement(
cls.LEADING_CHARS, i - 1):
new_name = '~' + ''.join(candidate) + name[i:]
cls.LEADING_CHARS, i - 1
):
new_name = "~" + "".join(candidate) + name[i:]
if new_name != name:
yield new_name
# If we make it this far, we will have to make a longer name
for i in range(len(cls.LEADING_CHARS)):
for candidate in itertools.combinations_with_replacement(
cls.LEADING_CHARS, i):
new_name = '~' + ''.join(candidate) + name
cls.LEADING_CHARS, i
):
new_name = "~" + "".join(candidate) + name
if new_name != name:
yield new_name
@ -258,9 +254,7 @@ class AdjacentTempDirectory(TempDirectory):
break
else:
# Final fallback on the default behavior.
path = os.path.realpath(
tempfile.mkdtemp(prefix=f"pip-{kind}-")
)
path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
logger.debug("Created temporary directory: %s", path)
return path

View file

@ -26,16 +26,18 @@ SUPPORTED_EXTENSIONS = ZIP_EXTENSIONS + TAR_EXTENSIONS
try:
import bz2 # noqa
SUPPORTED_EXTENSIONS += BZ2_EXTENSIONS
except ImportError:
logger.debug('bz2 module is not available')
logger.debug("bz2 module is not available")
try:
# Only for Python 3.3+
import lzma # noqa
SUPPORTED_EXTENSIONS += XZ_EXTENSIONS
except ImportError:
logger.debug('lzma module is not available')
logger.debug("lzma module is not available")
def current_umask():
@ -48,18 +50,15 @@ def current_umask():
def split_leading_dir(path):
# type: (str) -> List[str]
path = path.lstrip('/').lstrip('\\')
if (
'/' in path and (
('\\' in path and path.find('/') < path.find('\\')) or
'\\' not in path
)
path = path.lstrip("/").lstrip("\\")
if "/" in path and (
("\\" in path and path.find("/") < path.find("\\")) or "\\" not in path
):
return path.split('/', 1)
elif '\\' in path:
return path.split('\\', 1)
return path.split("/", 1)
elif "\\" in path:
return path.split("\\", 1)
else:
return [path, '']
return [path, ""]
def has_leading_dir(paths):
@ -118,7 +117,7 @@ def unzip_file(filename, location, flatten=True):
no-ops per the python docs.
"""
ensure_dir(location)
zipfp = open(filename, 'rb')
zipfp = open(filename, "rb")
try:
zip = zipfile.ZipFile(zipfp, allowZip64=True)
leading = has_leading_dir(zip.namelist()) and flatten
@ -131,11 +130,11 @@ def unzip_file(filename, location, flatten=True):
dir = os.path.dirname(fn)
if not is_within_directory(location, fn):
message = (
'The zip file ({}) has a file ({}) trying to install '
'outside target directory ({})'
"The zip file ({}) has a file ({}) trying to install "
"outside target directory ({})"
)
raise InstallationError(message.format(filename, fn, location))
if fn.endswith('/') or fn.endswith('\\'):
if fn.endswith("/") or fn.endswith("\\"):
# A directory
ensure_dir(fn)
else:
@ -144,7 +143,7 @@ def unzip_file(filename, location, flatten=True):
# chunk of memory for the file's content
fp = zip.open(name)
try:
with open(fn, 'wb') as destfp:
with open(fn, "wb") as destfp:
shutil.copyfileobj(fp, destfp)
finally:
fp.close()
@ -165,24 +164,23 @@ def untar_file(filename, location):
no-ops per the python docs.
"""
ensure_dir(location)
if filename.lower().endswith('.gz') or filename.lower().endswith('.tgz'):
mode = 'r:gz'
if filename.lower().endswith(".gz") or filename.lower().endswith(".tgz"):
mode = "r:gz"
elif filename.lower().endswith(BZ2_EXTENSIONS):
mode = 'r:bz2'
mode = "r:bz2"
elif filename.lower().endswith(XZ_EXTENSIONS):
mode = 'r:xz'
elif filename.lower().endswith('.tar'):
mode = 'r'
mode = "r:xz"
elif filename.lower().endswith(".tar"):
mode = "r"
else:
logger.warning(
'Cannot determine compression type for file %s', filename,
"Cannot determine compression type for file %s",
filename,
)
mode = 'r:*'
mode = "r:*"
tar = tarfile.open(filename, mode)
try:
leading = has_leading_dir([
member.name for member in tar.getmembers()
])
leading = has_leading_dir([member.name for member in tar.getmembers()])
for member in tar.getmembers():
fn = member.name
if leading:
@ -190,12 +188,10 @@ def untar_file(filename, location):
path = os.path.join(location, fn)
if not is_within_directory(location, path):
message = (
'The tar file ({}) has a file ({}) trying to install '
'outside target directory ({})'
)
raise InstallationError(
message.format(filename, path, location)
"The tar file ({}) has a file ({}) trying to install "
"outside target directory ({})"
)
raise InstallationError(message.format(filename, path, location))
if member.isdir():
ensure_dir(path)
elif member.issym():
@ -206,8 +202,10 @@ def untar_file(filename, location):
# Some corrupt tar files seem to produce this
# (specifically bad symlinks)
logger.warning(
'In the tar file %s the member %s is invalid: %s',
filename, member.name, exc,
"In the tar file %s the member %s is invalid: %s",
filename,
member.name,
exc,
)
continue
else:
@ -217,13 +215,15 @@ def untar_file(filename, location):
# Some corrupt tar files seem to produce this
# (specifically bad symlinks)
logger.warning(
'In the tar file %s the member %s is invalid: %s',
filename, member.name, exc,
"In the tar file %s the member %s is invalid: %s",
filename,
member.name,
exc,
)
continue
ensure_dir(os.path.dirname(path))
assert fp is not None
with open(path, 'wb') as destfp:
with open(path, "wb") as destfp:
shutil.copyfileobj(fp, destfp)
fp.close()
# Update the timestamp (useful for cython compiled files)
@ -236,38 +236,32 @@ def untar_file(filename, location):
def unpack_file(
filename, # type: str
location, # type: str
content_type=None, # type: Optional[str]
filename, # type: str
location, # type: str
content_type=None, # type: Optional[str]
):
# type: (...) -> None
filename = os.path.realpath(filename)
if (
content_type == 'application/zip' or
filename.lower().endswith(ZIP_EXTENSIONS) or
zipfile.is_zipfile(filename)
content_type == "application/zip"
or filename.lower().endswith(ZIP_EXTENSIONS)
or zipfile.is_zipfile(filename)
):
unzip_file(
filename,
location,
flatten=not filename.endswith('.whl')
)
unzip_file(filename, location, flatten=not filename.endswith(".whl"))
elif (
content_type == 'application/x-gzip' or
tarfile.is_tarfile(filename) or
filename.lower().endswith(
TAR_EXTENSIONS + BZ2_EXTENSIONS + XZ_EXTENSIONS
)
content_type == "application/x-gzip"
or tarfile.is_tarfile(filename)
or filename.lower().endswith(TAR_EXTENSIONS + BZ2_EXTENSIONS + XZ_EXTENSIONS)
):
untar_file(filename, location)
else:
# FIXME: handle?
# FIXME: magic signatures?
logger.critical(
'Cannot unpack file %s (downloaded from %s, content-type: %s); '
'cannot detect archive format',
filename, location, content_type,
)
raise InstallationError(
f'Cannot determine archive format of {location}'
"Cannot unpack file %s (downloaded from %s, content-type: %s); "
"cannot detect archive format",
filename,
location,
content_type,
)
raise InstallationError(f"Cannot determine archive format of {location}")

View file

@ -7,9 +7,9 @@ from typing import Optional
def get_url_scheme(url):
# type: (str) -> Optional[str]
if ':' not in url:
if ":" not in url:
return None
return url.split(':', 1)[0].lower()
return url.split(":", 1)[0].lower()
def path_to_url(path):
@ -19,7 +19,7 @@ def path_to_url(path):
quoted path parts.
"""
path = os.path.normpath(os.path.abspath(path))
url = urllib.parse.urljoin('file:', urllib.request.pathname2url(path))
url = urllib.parse.urljoin("file:", urllib.request.pathname2url(path))
return url
@ -28,20 +28,21 @@ def url_to_path(url):
"""
Convert a file: URL to a path.
"""
assert url.startswith('file:'), (
f"You can only turn file: urls into filenames (not {url!r})")
assert url.startswith(
"file:"
), f"You can only turn file: urls into filenames (not {url!r})"
_, netloc, path, _, _ = urllib.parse.urlsplit(url)
if not netloc or netloc == 'localhost':
if not netloc or netloc == "localhost":
# According to RFC 8089, same as empty authority.
netloc = ''
elif sys.platform == 'win32':
netloc = ""
elif sys.platform == "win32":
# If we have a UNC path, prepend UNC share notation.
netloc = '\\\\' + netloc
netloc = "\\\\" + netloc
else:
raise ValueError(
f'non-local file URIs are not supported on this platform: {url!r}'
f"non-local file URIs are not supported on this platform: {url!r}"
)
path = urllib.request.url2pathname(netloc + path)

View file

@ -27,13 +27,12 @@ def _running_under_regular_virtualenv():
This handles virtual environments created with pypa's virtualenv.
"""
# pypa/virtualenv case
return hasattr(sys, 'real_prefix')
return hasattr(sys, "real_prefix")
def running_under_virtualenv():
# type: () -> bool
"""Return True if we're running inside a virtualenv, False otherwise.
"""
"""Return True if we're running inside a virtualenv, False otherwise."""
return _running_under_venv() or _running_under_regular_virtualenv()
@ -43,11 +42,11 @@ def _get_pyvenv_cfg_lines():
Returns None, if it could not read/access the file.
"""
pyvenv_cfg_file = os.path.join(sys.prefix, 'pyvenv.cfg')
pyvenv_cfg_file = os.path.join(sys.prefix, "pyvenv.cfg")
try:
# Although PEP 405 does not specify, the built-in venv module always
# writes with UTF-8. (pypa/pip#8717)
with open(pyvenv_cfg_file, encoding='utf-8') as f:
with open(pyvenv_cfg_file, encoding="utf-8") as f:
return f.read().splitlines() # avoids trailing newlines
except OSError:
return None
@ -78,7 +77,7 @@ def _no_global_under_venv():
for line in cfg_lines:
match = _INCLUDE_SYSTEM_SITE_PACKAGES_REGEX.match(line)
if match is not None and match.group('value') == 'false':
if match is not None and match.group("value") == "false":
return True
return False
@ -92,15 +91,15 @@ def _no_global_under_regular_virtualenv():
"""
site_mod_dir = os.path.dirname(os.path.abspath(site.__file__))
no_global_site_packages_file = os.path.join(
site_mod_dir, 'no-global-site-packages.txt',
site_mod_dir,
"no-global-site-packages.txt",
)
return os.path.exists(no_global_site_packages_file)
def virtualenv_no_global():
# type: () -> bool
"""Returns a boolean, whether running in venv with no system site-packages.
"""
"""Returns a boolean, whether running in venv with no system site-packages."""
# PEP 405 compliance needs to be checked first since virtualenv >=20 would
# return True for both checks, but is only able to use the PEP 405 config.
if _running_under_venv():

View file

@ -23,6 +23,7 @@ class WheelMetadata(DictMetadata):
"""Metadata provider that maps metadata decoding exceptions to our
internal exception type.
"""
def __init__(self, metadata, wheel_name):
# type: (Dict[str, bytes], str) -> None
super().__init__(metadata)
@ -35,9 +36,7 @@ class WheelMetadata(DictMetadata):
except UnicodeDecodeError as e:
# Augment the default error with the origin of the file.
raise UnsupportedWheel(
"Error decoding metadata for {}: {}".format(
self._wheel_name, e
)
"Error decoding metadata for {}: {}".format(self._wheel_name, e)
)
@ -49,9 +48,7 @@ def pkg_resources_distribution_for_wheel(wheel_zip, name, location):
"""
info_dir, _ = parse_wheel(wheel_zip, name)
metadata_files = [
p for p in wheel_zip.namelist() if p.startswith(f"{info_dir}/")
]
metadata_files = [p for p in wheel_zip.namelist() if p.startswith(f"{info_dir}/")]
metadata_text = {} # type: Dict[str, bytes]
for path in metadata_files:
@ -60,15 +57,11 @@ def pkg_resources_distribution_for_wheel(wheel_zip, name, location):
try:
metadata_text[metadata_name] = read_wheel_metadata_file(wheel_zip, path)
except UnsupportedWheel as e:
raise UnsupportedWheel(
"{} has an invalid wheel, {}".format(name, str(e))
)
raise UnsupportedWheel("{} has an invalid wheel, {}".format(name, str(e)))
metadata = WheelMetadata(metadata_text, location)
return DistInfoDistribution(
location=location, metadata=metadata, project_name=name
)
return DistInfoDistribution(location=location, metadata=metadata, project_name=name)
def parse_wheel(wheel_zip, name):
@ -83,9 +76,7 @@ def parse_wheel(wheel_zip, name):
metadata = wheel_metadata(wheel_zip, info_dir)
version = wheel_version(metadata)
except UnsupportedWheel as e:
raise UnsupportedWheel(
"{} has an invalid wheel, {}".format(name, str(e))
)
raise UnsupportedWheel("{} has an invalid wheel, {}".format(name, str(e)))
check_compatibility(version, name)
@ -102,16 +93,14 @@ def wheel_dist_info_dir(source, name):
# Zip file path separators must be /
subdirs = {p.split("/", 1)[0] for p in source.namelist()}
info_dirs = [s for s in subdirs if s.endswith('.dist-info')]
info_dirs = [s for s in subdirs if s.endswith(".dist-info")]
if not info_dirs:
raise UnsupportedWheel(".dist-info directory not found")
if len(info_dirs) > 1:
raise UnsupportedWheel(
"multiple .dist-info directories found: {}".format(
", ".join(info_dirs)
)
"multiple .dist-info directories found: {}".format(", ".join(info_dirs))
)
info_dir = info_dirs[0]
@ -135,9 +124,7 @@ def read_wheel_metadata_file(source, path):
# BadZipFile for general corruption, KeyError for missing entry,
# and RuntimeError for password-protected files
except (BadZipFile, KeyError, RuntimeError) as e:
raise UnsupportedWheel(
f"could not read {path!r} file: {e!r}"
)
raise UnsupportedWheel(f"could not read {path!r} file: {e!r}")
def wheel_metadata(source, dist_info_dir):
@ -172,7 +159,7 @@ def wheel_version(wheel_data):
version = version_text.strip()
try:
return tuple(map(int, version.split('.')))
return tuple(map(int, version.split(".")))
except ValueError:
raise UnsupportedWheel(f"invalid Wheel-Version: {version!r}")
@ -193,10 +180,10 @@ def check_compatibility(version, name):
if version[0] > VERSION_COMPATIBLE[0]:
raise UnsupportedWheel(
"{}'s Wheel-Version ({}) is not compatible with this version "
"of pip".format(name, '.'.join(map(str, version)))
"of pip".format(name, ".".join(map(str, version)))
)
elif version > VERSION_COMPATIBLE:
logger.warning(
'Installing from a newer Wheel-Version (%s)',
'.'.join(map(str, version)),
"Installing from a newer Wheel-Version (%s)",
".".join(map(str, version)),
)