Resolve conflicts

This commit is contained in:
harupy 2021-08-02 15:28:55 +09:00
commit 5db63be6f6
115 changed files with 1374 additions and 1077 deletions

View File

@ -30,7 +30,6 @@ jobs:
tests:
# Anything that's touching testable stuff
- ".github/workflows/ci.yml"
- "tools/requirements/tests.txt"
- "src/**"
- "tests/**"
if: github.event_name == 'pull_request'

View File

@ -25,12 +25,9 @@ repos:
^src/pip/_internal/commands|
^src/pip/_internal/index|
^src/pip/_internal/models|
^src/pip/_internal/network|
^src/pip/_internal/operations|
^src/pip/_internal/req|
^src/pip/_internal/vcs|
^src/pip/_internal/\w+\.py$|
^tools/|
# Tests
^tests/data|
^tests/unit|

View File

@ -7,4 +7,4 @@ sphinx:
python:
version: 3.8
install:
- requirements: tools/requirements/docs.txt
- requirements: docs/requirements.txt

View File

@ -98,13 +98,16 @@ Brandt Bucher
Brett Randall
Brian Cristante
Brian Rosner
briantracy
BrownTruck
Bruno Oliveira
Bruno Renié
Bruno S
Bstrdsmkr
Buck Golemon
burrows
Bussonnier Matthias
bwoodsend
c22
Caleb Martinez
Calvin Smith
@ -177,13 +180,16 @@ David Tucker
David Wales
Davidovich
Deepak Sharma
Deepyaman Datta
Denise Yu
derwolfe
Desetude
Devesh Kumar Singh
Diego Caraballo
Diego Ramirez
DiegoCaraballo
Dimitri Merejkowsky
Dirk Stolle
Dmitry Gladkov
Domen Kožar
Dominic Davis-Foster
@ -245,17 +251,18 @@ Greg Ward
Guilherme Espada
gutsytechster
Guy Rozendorn
Guy Tuval
gzpan123
Hanjun Kim
Hari Charan
Harsh Vardhan
harupy
Harutaka Kawamura
Henry Schreiner
Herbert Pfennig
Hsiaoming Yang
Hugo
Hugo Lopes Tavares
Hugo van Kemenade
hugovk
Hynek Schlawack
Ian Bicking
Ian Cordasco
@ -265,12 +272,14 @@ Ian Wienand
Igor Kuzmitshov
Igor Sobreira
Ilan Schnell
Illia Volochii
Ilya Baryshev
Inada Naoki
Ionel Cristian Mărieș
Ionel Maries Cristian
Ivan Pozdeev
Jacob Kim
Jacob Walls
jakirkham
Jakub Stasiak
Jakub Vysoky
@ -295,6 +304,7 @@ Jiashuo Li
Jim Fisher
Jim Garrison
Jivan Amara
Joe Michelini
John Paton
John T. Wodder II
John-Scott Atlakson
@ -388,6 +398,7 @@ mayeut
mbaluna
mdebi
memoselyk
meowmeowcat
Michael
Michael Aquilina
Michael E. Karpeles
@ -425,6 +436,7 @@ Noah Gorny
Nowell Strite
NtaleGrey
nvdv
OBITORASU
Ofekmeister
ofrinevo
Oliver Jeeves
@ -530,6 +542,7 @@ Simon Cross
Simon Pichugin
sinoroc
sinscary
snook92
socketubs
Sorin Sbarnea
Srinivas Nyayapati
@ -552,6 +565,7 @@ Surbhi Sharma
Sviatoslav Sydorenko
Swat009
Takayuki SHIMIZUKAWA
Taneli Hukkinen
tbeswick
Thijs Triemstra
Thomas Fenzl

View File

@ -10,6 +10,7 @@ recursive-include src/pip/_vendor *LICENSE*
recursive-include src/pip/_vendor *COPYING*
include docs/docutils.conf
include docs/requirements.txt
exclude .coveragerc
exclude .mailmap

124
NEWS.rst
View File

@ -1,3 +1,127 @@
21.2.2 (2021-07-31)
===================
Bug Fixes
---------
- New resolver: When a package is specified with extras in constraints, and with
extras in non-constraint requirements, the resolver now correctly identifies the
constraint's existence and avoids backtracking. (`#10233 <https://github.com/pypa/pip/issues/10233>`_)
21.2.1 (2021-07-25)
===================
Process
-------
- The source distribution re-installation feature removal has been delayed to 21.3.
21.2 (2021-07-24)
=================
Process
-------
- ``pip freeze``, ``pip list``, and ``pip show`` no longer normalize underscore
(``_``) in distribution names to dash (``-``). This is a side effect of the
migration to ``importlib.metadata``, since the underscore-dash normalization
behavior is non-standard and specific to setuptools. This should not affect
other parts of pip (for example, when feeding the ``pip freeze`` result back
into ``pip install``) since pip internally performs standard PEP 503
normalization independently to setuptools.
Deprecations and Removals
-------------------------
- Git version parsing is now done with regular expression to prepare for the
pending upstream removal of non-PEP-440 version parsing logic. (`#10117 <https://github.com/pypa/pip/issues/10117>`_)
- Re-enable the "Value for ... does not match" location warnings to field a new
round of feedback for the ``distutils``-``sysconfig`` transition. (`#10151 <https://github.com/pypa/pip/issues/10151>`_)
- Remove deprecated ``--find-links`` option in ``pip freeze`` (`#9069 <https://github.com/pypa/pip/issues/9069>`_)
Features
--------
- New resolver: Loosen URL comparison logic when checking for direct URL reference
equivalency. The logic includes the following notable characteristics:
* The authentication part of the URL is explicitly ignored.
* Most of the fragment part, including ``egg=``, is explicitly ignored. Only
``subdirectory=`` and hash values (e.g. ``sha256=``) are kept.
* The query part of the URL is parsed to allow ordering differences. (`#10002 <https://github.com/pypa/pip/issues/10002>`_)
- Support TOML v1.0.0 syntax in ``pyproject.toml``. (`#10034 <https://github.com/pypa/pip/issues/10034>`_)
- Added a warning message for errors caused due to Long Paths being disabled on Windows. (`#10045 <https://github.com/pypa/pip/issues/10045>`_)
- Change the encoding of log file from default text encoding to UTF-8. (`#10071 <https://github.com/pypa/pip/issues/10071>`_)
- Log the resolved commit SHA when installing a package from a Git repository. (`#10149 <https://github.com/pypa/pip/issues/10149>`_)
- Add a warning when passing an invalid requirement to ``pip uninstall``. (`#4958 <https://github.com/pypa/pip/issues/4958>`_)
- Add new subcommand ``pip index`` used to interact with indexes, and implement
``pip index version`` to list available versions of a package. (`#7975 <https://github.com/pypa/pip/issues/7975>`_)
- When pip is asked to uninstall a project without the dist-info/RECORD file
it will no longer traceback with FileNotFoundError,
but it will provide a better error message instead, such as::
ERROR: Cannot uninstall foobar 0.1, RECORD file not found. You might be able to recover from this via: 'pip install --force-reinstall --no-deps foobar==0.1'.
When dist-info/INSTALLER is present and contains some useful information, the info is included in the error message instead::
ERROR: Cannot uninstall foobar 0.1, RECORD file not found. Hint: The package was installed by rpm.
(`#8954 <https://github.com/pypa/pip/issues/8954>`_)
- Add an additional level of verbosity. ``--verbose`` (and the shorthand ``-v``) now
contains significantly less output, and users that need complete full debug-level output
should pass it twice (``--verbose --verbose`` or ``-vv``). (`#9450 <https://github.com/pypa/pip/issues/9450>`_)
- New resolver: The order of dependencies resolution has been tweaked to traverse
the dependency graph in a more breadth-first approach. (`#9455 <https://github.com/pypa/pip/issues/9455>`_)
- Make "yes" the default choice in ``pip uninstall``'s prompt. (`#9686 <https://github.com/pypa/pip/issues/9686>`_)
- Add a special error message when users forget the ``-r`` flag when installing. (`#9915 <https://github.com/pypa/pip/issues/9915>`_)
- New resolver: A distribution's ``Requires-Python`` metadata is now checked
before its Python dependencies. This makes the resolver fail quicker when
there's an interpreter version conflict. (`#9925 <https://github.com/pypa/pip/issues/9925>`_)
- Suppress "not on PATH" warning when ``--prefix`` is given. (`#9931 <https://github.com/pypa/pip/issues/9931>`_)
- Include ``rustc`` version in pip's ``User-Agent``, when the system has ``rustc``. (`#9987 <https://github.com/pypa/pip/issues/9987>`_)
Bug Fixes
---------
- Update vendored six to 1.16.0 and urllib3 to 1.26.5 (`#10043 <https://github.com/pypa/pip/issues/10043>`_)
- Correctly allow PEP 517 projects to be detected without warnings in ``pip freeze``. (`#10080 <https://github.com/pypa/pip/issues/10080>`_)
- Strip leading slash from a ``file://`` URL built from an path with the Windows
drive notation. This fixes bugs where the ``file://`` URL cannot be correctly
used as requirement, constraint, or index URLs on Windows. (`#10115 <https://github.com/pypa/pip/issues/10115>`_)
- New resolver: URL comparison logic now treats ``file://localhost/`` and
``file:///`` as equivalent to conform to RFC 8089. (`#10162 <https://github.com/pypa/pip/issues/10162>`_)
- Prefer credentials from the URL over the previously-obtained credentials from URLs of the same domain, so it is possible to use different credentials on the same index server for different ``--extra-index-url`` options. (`#3931 <https://github.com/pypa/pip/issues/3931>`_)
- Fix extraction of files with utf-8 encoded paths from tars. (`#7667 <https://github.com/pypa/pip/issues/7667>`_)
- Skip distutils configuration parsing on encoding errors. (`#8931 <https://github.com/pypa/pip/issues/8931>`_)
- New resolver: Detect an unnamed requirement is user-specified (by building its
metadata for the project name) so it can be correctly ordered in the resolver. (`#9204 <https://github.com/pypa/pip/issues/9204>`_)
- Fix :ref:`pip freeze` to output packages :ref:`installed from git <vcs support>`
in the correct ``git+protocol://git.example.com/MyProject#egg=MyProject`` format
rather than the old and no longer supported ``git+git@`` format. (`#9822 <https://github.com/pypa/pip/issues/9822>`_)
- Fix warnings about install scheme selection for Python framework builds
distributed by Apple's Command Line Tools. (`#9844 <https://github.com/pypa/pip/issues/9844>`_)
- Relax interpreter detection to quelch a location mismatch warning where PyPy
is deliberately breaking backwards compatibility. (`#9845 <https://github.com/pypa/pip/issues/9845>`_)
Vendored Libraries
------------------
- Upgrade certifi to 2021.05.30.
- Upgrade idna to 3.2.
- Upgrade packaging to 21.0
- Upgrade requests to 2.26.0.
- Upgrade resolvelib to 0.7.1.
- Upgrade urllib3 to 1.26.6.
.. note
You should *NOT* be adding new change log entries to this file, this

View File

@ -44,7 +44,7 @@ rooms, and mailing lists is expected to follow the `PSF Code of Conduct`_.
.. _package installer: https://packaging.python.org/guides/tool-recommendations/
.. _Python Package Index: https://pypi.org
.. _Installation: https://pip.pypa.io/en/stable/installing.html
.. _Installation: https://pip.pypa.io/en/stable/installation/
.. _Usage: https://pip.pypa.io/en/stable/
.. _Release notes: https://pip.pypa.io/en/stable/news.html
.. _Release process: https://pip.pypa.io/en/latest/development/release-process/

View File

@ -53,7 +53,7 @@ Specifiers`
py -m pip install SomePackage # latest version
py -m pip install SomePackage==1.0.4 # specific version
py -m pip install 'SomePackage>=1.0.4' # minimum version
py -m pip install 'SomePackage>=1.0.4' # minimum version
For more information and examples, see the :ref:`pip install` reference.
@ -271,6 +271,26 @@ To install directly from a wheel archive:
py -m pip install SomePackage-1.0-py2.py3-none-any.whl
To include optional dependencies provided in the ``provides_extras``
metadata in the wheel, you must add quotes around the install target
name:
.. tab:: Unix/macOS
.. code-block:: shell
python -m pip install './somepackage-1.0-py2.py3-none-any.whl[my-extras]'
.. tab:: Windows
.. code-block:: shell
py -m pip install './somepackage-1.0-py2.py3-none-any.whl[my-extras]'
.. note::
In the future, the ``path[extras]`` syntax may become deprecated. It is
recommended to use PEP 508 syntax wherever possible.
For the cases where wheels are not available, pip offers :ref:`pip wheel` as a
convenience, to build wheels for all your requirements and dependencies.

View File

@ -1,4 +1,4 @@
sphinx == 3.2.1
sphinx ~= 4.1.0
towncrier
furo
myst_parser

View File

@ -1 +0,0 @@
Fix typos in several files.

View File

@ -1,7 +0,0 @@
New resolver: Loosen URL comparison logic when checking for direct URL reference
equivalency. The logic includes the following notable characteristics:
* The authentication part of the URL is explicitly ignored.
* Most of the fragment part, including ``egg=``, is explicitly ignored. Only
``subdirectory=`` and hash values (e.g. ``sha256=``) are kept.
* The query part of the URL is parsed to allow ordering differences.

View File

@ -1 +0,0 @@
Annotate ``typing.List`` into ``tools.tox_pip.pip()``

View File

@ -1 +0,0 @@
Use annotations from the ``typing`` module on some functions.

View File

@ -1 +0,0 @@
Support TOML v1.0.0 syntax in ``pyproject.toml``.

View File

@ -1 +0,0 @@
Update vendored six to 1.16.0 and urllib3 to 1.26.5

View File

@ -1 +0,0 @@
Added a warning message for errors caused due to Long Paths being disabled on Windows.

View File

@ -1 +0,0 @@
Convert type annotations into proper annotations in ``noxfile.py``.

View File

View File

@ -1 +0,0 @@
Fixed all the annotations from ``pip/_internal/cli``.

View File

@ -1 +0,0 @@
Change the encoding of log file from default text encoding to UTF-8.

View File

@ -1 +0,0 @@
Fixed all the annotations from ``pip/_internal/distributions``.

View File

@ -1 +0,0 @@
Correctly allow PEP 517 projects to be detected without warnings in ``pip freeze``.

View File

@ -1 +0,0 @@
Convert type hint commentaries into annotations on ``setup.py``.

View File

@ -1 +0,0 @@
Converted type commentaries into annotations in ``pip/_internal/index``.

View File

@ -1,3 +0,0 @@
Strip leading slash from a ``file://`` URL built from an path with the Windows
drive notation. This fixes bugs where the ``file://`` URL cannot be correctly
used as requirement, constraint, or index URLs on Windows.

View File

@ -1,2 +0,0 @@
Git version parsing is now done with regular expression to prepare for the
pending upstream removal of non-PEP-440 version parsing logic.

View File

@ -1 +0,0 @@
Converted type commentaries into annotations in ``pip/_internal/metadata``.

View File

@ -1 +0,0 @@
Converted type commentaries into annotations in ``pip/_internal/resolution``.

View File

@ -1 +0,0 @@
Use ``--color=yes`` to color pytest outputs.

View File

@ -1 +0,0 @@
Converted type commentaries into annotations in ``pip/_internal/locations``.

1
news/10128.removal.rst Normal file
View File

@ -0,0 +1 @@
Improve deprecation warning regarding the copying of source trees when installing from a local directory.

View File

@ -1 +0,0 @@
Convert type commentaries to annotations on ``pip/_internal/models``.

View File

@ -1 +0,0 @@
Log the resolved commit SHA when installing a package from a Git repository.

View File

@ -1,2 +0,0 @@
Re-enable the "Value for ... does not match" location warnings to field a new
round of feedback for the ``distutils``-``sysconfig`` transition.

View File

@ -1,7 +0,0 @@
``pip freeze``, ``pip list``, and ``pip show`` no longer normalize underscore
(``_``) in distribution names to dash (``-``). This is a side effect of the
migration to ``importlib.metadata``, since the underscore-dash normalization
behavior is non-standard and specific to setuptools. This should not affect
other parts of pip (for example, when feeding the ``pip freeze`` result back
into ``pip install``) since pip internally performs standard PEP 503
normalization independently to setuptools.

View File

@ -1 +0,0 @@
Complete the type annotations from ``pip/_internal/utils``.

View File

@ -1,2 +0,0 @@
New resolver: URL comparison logic now treats ``file://localhost/`` and
``file:///`` as equivalent to conform to RFC 8089.

4
news/10165.trivial.rst Normal file
View File

@ -0,0 +1,4 @@
Add a ``feature_flag`` optional kwarg to the ``deprecated()`` function
``pip._internal.utils.deprecation:deprecated``. Also formulate a corresponding canned
message which suggests using the ``--use-feature={feature_flag}`` to test upcoming
behavior.

3
news/10233.bugfix.rst Normal file
View File

@ -0,0 +1,3 @@
New resolver: When a package is specified with extras in constraints, and with
extras in non-constraint requirements, the resolver now correctly identifies the
constraint's existence and avoids backtracking.

2
news/10252.bugfix.rst Normal file
View File

@ -0,0 +1,2 @@
Modify the ``sysconfig.get_preferred_scheme`` function check to be
compatible with CPython 3.10s alphareleases.

View File

@ -1 +0,0 @@
Prefer credentials from the URL over the previously-obtained credentials from URLs of the same domain, so it is possible to use different credentials on the same index server for different ``--extra-index-url`` options.

View File

@ -1 +0,0 @@
Fix extraction of files with utf-8 encoded paths from tars.

View File

@ -1,2 +0,0 @@
Add new subcommand ``pip index`` used to interact with indexes, and implement
``pip index version`` to list available versions of a package.

View File

@ -1 +0,0 @@
Skip distutils configuration parsing on encoding errors.

View File

@ -1,9 +0,0 @@
When pip is asked to uninstall a project without the dist-info/RECORD file
it will no longer traceback with FileNotFoundError,
but it will provide a better error message instead, such as::
ERROR: Cannot uninstall foobar 0.1, RECORD file not found. You might be able to recover from this via: 'pip install --force-reinstall --no-deps foobar==0.1'.
When dist-info/INSTALLER is present and contains some useful information, the info is included in the error message instead::
ERROR: Cannot uninstall foobar 0.1, RECORD file not found. Hint: The package was installed by rpm.

View File

@ -1 +0,0 @@
mailmap: Clean up Git entries

View File

@ -1 +0,0 @@
Remove deprecated ``--find-links`` option in ``pip freeze``

View File

@ -1,2 +0,0 @@
New resolver: Detect an unnamed requirement is user-specified (by building its
metadata for the project name) so it can be correctly ordered in the resolver.

View File

@ -1,3 +0,0 @@
Add an additional level of verbosity. ``--verbose`` (and the shorthand ``-v``) now
contains significantly less output, and users that need complete full debug-level output
should pass it twice (``--verbose --verbose`` or ``-vv``).

View File

@ -1,2 +0,0 @@
New resolver: The order of dependencies resolution has been tweaked to traverse
the dependency graph in a more breadth-first approach.

View File

@ -1 +0,0 @@
Make "yes" the default choice in ``pip uninstall``'s prompt.

View File

@ -1,3 +0,0 @@
Fix :ref:`pip freeze` to output packages :ref:`installed from git <vcs support>`
in the correct ``git+protocol://git.example.com/MyProject#egg=MyProject`` format
rather than the old and no longer supported ``git+git@`` format.

View File

@ -1,2 +0,0 @@
Fix warnings about install scheme selection for Python framework builds
distributed by Apple's Command Line Tools.

View File

@ -1,2 +0,0 @@
Relax interpreter detection to quelch a location mismatch warning where PyPy
is deliberately breaking backwards compatibility.

View File

@ -1 +0,0 @@
Add a special error message when users forget the ``-r`` flag when installing.

View File

@ -1,3 +0,0 @@
New resolver: A distribution's ``Requires-Python`` metadata is now checked
before its Python dependencies. This makes the resolver fail quicker when
there's an interpreter version conflict.

View File

@ -1 +0,0 @@
Suppress "not on PATH" warning when ``--prefix`` is given.

View File

@ -1 +0,0 @@
Include ``rustc`` version in pip's ``User-Agent``, when the system has ``rustc``.

View File

@ -1 +0,0 @@
Upgrade certifi to 2021.05.30.

View File

@ -1 +0,0 @@
Upgrade idna to 3.2.

View File

@ -1 +0,0 @@
Upgrade packaging to 21.0

View File

@ -1 +0,0 @@
Upgrade requests to 2.26.0.

View File

@ -1 +0,0 @@
Upgrade resolvelib to 0.7.1.

View File

@ -1 +0,0 @@
Upgrade urllib3 to 1.26.6.

View File

@ -24,9 +24,9 @@ LOCATIONS = {
"protected-pip": "tools/tox_pip.py",
}
REQUIREMENTS = {
"docs": "tools/requirements/docs.txt",
"tests": "tools/requirements/tests.txt",
"common-wheels": "tools/requirements/tests-common_wheels.txt",
"docs": "docs/requirements.txt",
"tests": "tests/requirements.txt",
"common-wheels": "tests/requirements-common_wheels.txt",
}
AUTHORS_FILE = "AUTHORS.txt"

View File

@ -1,6 +1,6 @@
from typing import List, Optional
__version__ = "21.2.dev0"
__version__ = "21.3.dev0"
def main(args: Optional[List[str]] = None) -> int:

View File

@ -1,8 +1,8 @@
import csv
import logging
import os
import pathlib
from optparse import Values
from typing import Iterator, List, NamedTuple, Optional
from typing import Iterator, List, NamedTuple, Optional, Tuple
from pip._vendor.packaging.utils import canonicalize_name
@ -66,6 +66,33 @@ class _PackageInfo(NamedTuple):
files: Optional[List[str]]
def _covert_legacy_entry(entry: Tuple[str, ...], info: Tuple[str, ...]) -> str:
"""Convert a legacy installed-files.txt path into modern RECORD path.
The legacy format stores paths relative to the info directory, while the
modern format stores paths relative to the package root, e.g. the
site-packages directory.
:param entry: Path parts of the installed-files.txt entry.
:param info: Path parts of the egg-info directory relative to package root.
:returns: The converted entry.
For best compatibility with symlinks, this does not use ``abspath()`` or
``Path.resolve()``, but tries to work with path parts:
1. While ``entry`` starts with ``..``, remove the equal amounts of parts
from ``info``; if ``info`` is empty, start appending ``..`` instead.
2. Join the two directly.
"""
while entry and entry[0] == "..":
if not info or info[-1] == "..":
info += ("..",)
else:
info = info[:-1]
entry = entry[1:]
return str(pathlib.Path(*info, *entry))
def search_packages_info(query: List[str]) -> Iterator[_PackageInfo]:
"""
Gather details from installed distributions. Print distribution name,
@ -100,14 +127,29 @@ def search_packages_info(query: List[str]) -> Iterator[_PackageInfo]:
text = dist.read_text('RECORD')
except FileNotFoundError:
return None
return (row[0] for row in csv.reader(text.splitlines()))
# This extra Path-str cast normalizes entries.
return (str(pathlib.Path(row[0])) for row in csv.reader(text.splitlines()))
def _files_from_installed_files(dist: BaseDistribution) -> Optional[Iterator[str]]:
def _files_from_legacy(dist: BaseDistribution) -> Optional[Iterator[str]]:
try:
text = dist.read_text('installed-files.txt')
except FileNotFoundError:
return None
return (p for p in text.splitlines(keepends=False) if p)
paths = (p for p in text.splitlines(keepends=False) if p)
root = dist.location
info = dist.info_directory
if root is None or info is None:
return paths
try:
info_rel = pathlib.Path(info).relative_to(root)
except ValueError: # info is not relative to root.
return paths
if not info_rel.parts: # info *is* root.
return paths
return (
_covert_legacy_entry(pathlib.Path(p).parts, info_rel.parts)
for p in paths
)
for query_name in query_names:
try:
@ -121,11 +163,11 @@ def search_packages_info(query: List[str]) -> Iterator[_PackageInfo]:
except FileNotFoundError:
entry_points = []
files_iter = _files_from_record(dist) or _files_from_installed_files(dist)
files_iter = _files_from_record(dist) or _files_from_legacy(dist)
if files_iter is None:
files: Optional[List[str]] = None
else:
files = sorted(os.path.relpath(p, dist.location) for p in files_iter)
files = sorted(files_iter)
metadata = dist.metadata

View File

@ -1,3 +1,4 @@
import logging
from optparse import Values
from typing import List
@ -14,6 +15,8 @@ from pip._internal.req.constructors import (
)
from pip._internal.utils.misc import protect_pip_from_modification_on_windows
logger = logging.getLogger(__name__)
class UninstallCommand(Command, SessionCommandMixin):
"""
@ -58,6 +61,13 @@ class UninstallCommand(Command, SessionCommandMixin):
)
if req.name:
reqs_to_uninstall[canonicalize_name(req.name)] = req
else:
logger.warning(
"Invalid requirement: %r ignored -"
" the uninstall command expects named"
" requirements.",
name,
)
for filename in options.requirements:
for parsed_req in parse_requirements(
filename,

View File

@ -502,7 +502,7 @@ class CandidateEvaluator:
"""
valid_tags = self._supported_tags
support_num = len(valid_tags)
build_tag = () # type: BuildTag
build_tag: BuildTag = ()
binary_preference = 0
link = candidate.link
if link.is_wheel:
@ -603,7 +603,7 @@ class PackageFinder:
self.format_control = format_control
# These are boring links that have already been logged somehow.
self._logged_links = set() # type: Set[Link]
self._logged_links: Set[Link] = set()
# Don't include an allow_yanked default value to make sure each call
# site considers whether yanked releases are allowed. This also causes
@ -699,7 +699,7 @@ class PackageFinder:
second, while eliminating duplicates
"""
eggs, no_eggs = [], []
seen = set() # type: Set[Link]
seen: Set[Link] = set()
for link in links:
if link not in seen:
seen.add(link)
@ -871,7 +871,7 @@ class PackageFinder:
)
best_candidate = best_candidate_result.best_candidate
installed_version = None # type: Optional[_BaseVersion]
installed_version: Optional[_BaseVersion] = None
if req.satisfied_by is not None:
installed_version = parse_version(req.satisfied_by.version)

View File

@ -4,9 +4,11 @@ import os
import pathlib
import sys
import sysconfig
from typing import List, Optional
from typing import Dict, Iterator, List, Optional, Tuple
from pip._internal.models.scheme import SCHEME_KEYS, Scheme
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.deprecation import deprecated
from . import _distutils, _sysconfig
from .base import (
@ -41,6 +43,53 @@ else:
_MISMATCH_LEVEL = logging.WARNING
def _looks_like_red_hat_patched_platlib_purelib(scheme: Dict[str, str]) -> bool:
platlib = scheme["platlib"]
if "/lib64/" not in platlib:
return False
unpatched = platlib.replace("/lib64/", "/lib/")
return unpatched.replace("$platbase/", "$base/") == scheme["purelib"]
@functools.lru_cache(maxsize=None)
def _looks_like_red_hat_patched() -> bool:
"""Red Hat patches platlib in unix_prefix and unix_home, but not purelib.
This is the only way I can see to tell a Red Hat-patched Python.
"""
from distutils.command.install import INSTALL_SCHEMES # type: ignore
return all(
k in INSTALL_SCHEMES
and _looks_like_red_hat_patched_platlib_purelib(INSTALL_SCHEMES[k])
for k in ("unix_prefix", "unix_home")
)
@functools.lru_cache(maxsize=None)
def _looks_like_debian_patched() -> bool:
"""Debian adds two additional schemes."""
from distutils.command.install import INSTALL_SCHEMES # type: ignore
return "deb_system" in INSTALL_SCHEMES and "unix_local" in INSTALL_SCHEMES
def _fix_abiflags(parts: Tuple[str]) -> Iterator[str]:
ldversion = sysconfig.get_config_var("LDVERSION")
abiflags: str = getattr(sys, "abiflags", None)
# LDVERSION does not end with sys.abiflags. Just return the path unchanged.
if not ldversion or not abiflags or not ldversion.endswith(abiflags):
yield from parts
return
# Strip sys.abiflags from LDVERSION-based path components.
for part in parts:
if part.endswith(ldversion):
part = part[: (0 - len(abiflags))]
yield part
def _default_base(*, user: bool) -> str:
if user:
base = sysconfig.get_config_var("userbase")
@ -51,9 +100,7 @@ def _default_base(*, user: bool) -> str:
@functools.lru_cache(maxsize=None)
def _warn_if_mismatch(old: pathlib.Path, new: pathlib.Path, *, key: str) -> bool:
if old == new:
return False
def _warn_mismatched(old: pathlib.Path, new: pathlib.Path, *, key: str) -> None:
issue_url = "https://github.com/pypa/pip/issues/10151"
message = (
"Value for %s does not match. Please report this to <%s>"
@ -61,6 +108,12 @@ def _warn_if_mismatch(old: pathlib.Path, new: pathlib.Path, *, key: str) -> bool
"\nsysconfig: %s"
)
logger.log(_MISMATCH_LEVEL, message, key, issue_url, old, new)
def _warn_if_mismatch(old: pathlib.Path, new: pathlib.Path, *, key: str) -> bool:
if old == new:
return False
_warn_mismatched(old, new, key=key)
return True
@ -72,10 +125,15 @@ def _log_context(
root: Optional[str] = None,
prefix: Optional[str] = None,
) -> None:
message = (
"Additional context:" "\nuser = %r" "\nhome = %r" "\nroot = %r" "\nprefix = %r"
)
logger.log(_MISMATCH_LEVEL, message, user, home, root, prefix)
parts = [
"Additional context:",
"user = %r",
"home = %r",
"root = %r",
"prefix = %r",
]
logger.log(_MISMATCH_LEVEL, "\n".join(parts), user, home, root, prefix)
def get_scheme(
@ -104,12 +162,15 @@ def get_scheme(
)
base = prefix or home or _default_base(user=user)
warned = []
warning_contexts = []
for k in SCHEME_KEYS:
# Extra join because distutils can return relative paths.
old_v = pathlib.Path(base, getattr(old, k))
new_v = pathlib.Path(getattr(new, k))
if old_v == new_v:
continue
# distutils incorrectly put PyPy packages under ``site-packages/python``
# in the ``posix_home`` scheme, but PyPy devs said they expect the
# directory name to be ``pypy`` instead. So we treat this as a bug fix
@ -132,16 +193,74 @@ def get_scheme(
user
and is_osx_framework()
and k == "headers"
and old_v.parent == new_v
and old_v.name.startswith("python")
and old_v.parent.parent == new_v.parent
and old_v.parent.name.startswith("python")
)
if skip_osx_framework_user_special_case:
continue
warned.append(_warn_if_mismatch(old_v, new_v, key=f"scheme.{k}"))
# On Red Hat and derived Linux distributions, distutils is patched to
# use "lib64" instead of "lib" for platlib.
if k == "platlib" and _looks_like_red_hat_patched():
continue
if any(warned):
_log_context(user=user, home=home, root=root, prefix=prefix)
# Both Debian and Red Hat patch Python to place the system site under
# /usr/local instead of /usr. Debian also places lib in dist-packages
# instead of site-packages, but the /usr/local check should cover it.
skip_linux_system_special_case = (
not (user or home or prefix)
and old_v.parts[1:3] == ("usr", "local")
and len(new_v.parts) > 1
and new_v.parts[1] == "usr"
and (len(new_v.parts) < 3 or new_v.parts[2] != "local")
and (_looks_like_red_hat_patched() or _looks_like_debian_patched())
)
if skip_linux_system_special_case:
continue
# On Python 3.7 and earlier, sysconfig does not include sys.abiflags in
# the "pythonX.Y" part of the path, but distutils does.
skip_sysconfig_abiflag_bug = (
sys.version_info < (3, 8)
and not WINDOWS
and k in ("headers", "platlib", "purelib")
and tuple(_fix_abiflags(old_v.parts)) == new_v.parts
)
if skip_sysconfig_abiflag_bug:
continue
warning_contexts.append((old_v, new_v, f"scheme.{k}"))
if not warning_contexts:
return old
# Check if this path mismatch is caused by distutils config files. Those
# files will no longer work once we switch to sysconfig, so this raises a
# deprecation message for them.
default_old = _distutils.distutils_scheme(
dist_name,
user,
home,
root,
isolated,
prefix,
ignore_config_files=True,
)
if any(default_old[k] != getattr(old, k) for k in SCHEME_KEYS):
deprecated(
"Configuring installation scheme with distutils config files "
"is deprecated and will no longer work in the near future. If you "
"are using a Homebrew or Linuxbrew Python, please see discussion "
"at https://github.com/Homebrew/homebrew-core/issues/76621",
replacement=None,
gone_in=None,
)
return old
# Post warnings about this mismatch so user can report them back.
for old_v, new_v, key in warning_contexts:
_warn_mismatched(old_v, new_v, key=key)
_log_context(user=user, home=home, root=root, prefix=prefix)
return old

View File

@ -21,13 +21,15 @@ from .base import get_major_minor_version
logger = logging.getLogger(__name__)
def _distutils_scheme(
def distutils_scheme(
dist_name: str,
user: bool = False,
home: str = None,
root: str = None,
isolated: bool = False,
prefix: str = None,
*,
ignore_config_files: bool = False,
) -> Dict[str, str]:
"""
Return a distutils install scheme
@ -39,15 +41,16 @@ def _distutils_scheme(
dist_args["script_args"] = ["--no-user-cfg"]
d = Distribution(dist_args)
try:
d.parse_config_files()
except UnicodeDecodeError:
# Typeshed does not include find_config_files() for some reason.
paths = d.find_config_files() # type: ignore
logger.warning(
"Ignore distutils configs in %s due to encoding errors.",
", ".join(os.path.basename(p) for p in paths),
)
if not ignore_config_files:
try:
d.parse_config_files()
except UnicodeDecodeError:
# Typeshed does not include find_config_files() for some reason.
paths = d.find_config_files() # type: ignore
logger.warning(
"Ignore distutils configs in %s due to encoding errors.",
", ".join(os.path.basename(p) for p in paths),
)
obj: Optional[DistutilsCommand] = None
obj = d.get_command_obj("install", create=True)
assert obj is not None
@ -121,7 +124,7 @@ def get_scheme(
:param prefix: indicates to use the "prefix" scheme and provides the
base directory for the same
"""
scheme = _distutils_scheme(dist_name, user, home, root, isolated, prefix)
scheme = distutils_scheme(dist_name, user, home, root, isolated, prefix)
return Scheme(
platlib=scheme["platlib"],
purelib=scheme["purelib"],

View File

@ -24,7 +24,7 @@ logger = logging.getLogger(__name__)
_AVAILABLE_SCHEMES = set(sysconfig.get_scheme_names())
_HAS_PREFERRED_SCHEME_API = sys.version_info >= (3, 10)
_PREFERRED_SCHEME_API = getattr(sysconfig, "get_preferred_scheme", None)
def _infer_prefix() -> str:
@ -41,8 +41,8 @@ def _infer_prefix() -> str:
If none of the above works, fall back to ``posix_prefix``.
"""
if _HAS_PREFERRED_SCHEME_API:
return sysconfig.get_preferred_scheme("prefix") # type: ignore
if _PREFERRED_SCHEME_API:
return _PREFERRED_SCHEME_API("prefix")
os_framework_global = is_osx_framework() and not running_under_virtualenv()
if os_framework_global and "osx_framework_library" in _AVAILABLE_SCHEMES:
return "osx_framework_library"
@ -61,8 +61,8 @@ def _infer_prefix() -> str:
def _infer_user() -> str:
"""Try to find a user scheme for the current platform."""
if _HAS_PREFERRED_SCHEME_API:
return sysconfig.get_preferred_scheme("user") # type: ignore
if _PREFERRED_SCHEME_API:
return _PREFERRED_SCHEME_API("user")
if is_osx_framework() and not running_under_virtualenv():
suffixed = "osx_framework_user"
else:
@ -76,8 +76,8 @@ def _infer_user() -> str:
def _infer_home() -> str:
"""Try to find a home for the current platform."""
if _HAS_PREFERRED_SCHEME_API:
return sysconfig.get_preferred_scheme("home") # type: ignore
if _PREFERRED_SCHEME_API:
return _PREFERRED_SCHEME_API("home")
suffixed = f"{os.name}_home"
if suffixed in _AVAILABLE_SCHEMES:
return suffixed

View File

@ -1,3 +1,4 @@
import functools
import os
import site
import sys
@ -46,5 +47,6 @@ except AttributeError:
user_site = site.USER_SITE
@functools.lru_cache(maxsize=None)
def is_osx_framework() -> bool:
return bool(sysconfig.get_config_var("PYTHONFRAMEWORK"))

View File

@ -57,6 +57,26 @@ class BaseDistribution(Protocol):
A string value is not necessarily a filesystem path, since distributions
can be loaded from other sources, e.g. arbitrary zip archives. ``None``
means the distribution is created in-memory.
Do not canonicalize this value with e.g. ``pathlib.Path.resolve()``. If
this is a symbolic link, we want to preserve the relative path between
it and files in the distribution.
"""
raise NotImplementedError()
@property
def info_directory(self) -> Optional[str]:
"""Location of the .[egg|dist]-info directory.
Similarly to ``location``, a string value is not necessarily a
filesystem path. ``None`` means the distribution is created in-memory.
For a modern .dist-info installation on disk, this should be something
like ``{location}/{raw_name}-{version}.dist-info``.
Do not canonicalize this value with e.g. ``pathlib.Path.resolve()``. If
this is a symbolic link, we want to preserve the relative path between
it and other files in the distribution.
"""
raise NotImplementedError()

View File

@ -48,6 +48,10 @@ class Distribution(BaseDistribution):
def location(self) -> Optional[str]:
return self._dist.location
@property
def info_directory(self) -> Optional[str]:
return self._dist.egg_info
@property
def canonical_name(self) -> "NormalizedName":
return canonicalize_name(self._dist.project_name)

View File

@ -36,7 +36,7 @@ class SearchScope:
# it and if it exists, use the normalized version.
# This is deliberately conservative - it might be fine just to
# blindly normalize anything starting with a ~...
built_find_links = [] # type: List[str]
built_find_links: List[str] = []
for link in find_links:
if link.startswith('~'):
new_link = normalize_path(link)

View File

@ -23,8 +23,7 @@ class SelectionPreferences:
format_control: Optional[FormatControl] = None,
prefer_binary: bool = False,
ignore_requires_python: Optional[bool] = None,
):
# type: (...) -> None
) -> None:
"""Create a SelectionPreferences object.
:param allow_yanked: Whether files marked as yanked (in the sense

View File

@ -62,7 +62,7 @@ class TargetPython:
self.py_version_info = py_version_info
# This is used to cache the return value of get_tags().
self._valid_tags = None # type: Optional[List[Tag]]
self._valid_tags: Optional[List[Tag]] = None
def format_given(self) -> str:
"""

View File

@ -31,7 +31,8 @@ except ImportError:
keyring = None
except Exception as exc:
logger.warning(
"Keyring is skipped due to an exception: %s", str(exc),
"Keyring is skipped due to an exception: %s",
str(exc),
)
keyring = None
@ -62,14 +63,14 @@ def get_keyring_auth(url: Optional[str], username: Optional[str]) -> Optional[Au
except Exception as exc:
logger.warning(
"Keyring is skipped due to an exception: %s", str(exc),
"Keyring is skipped due to an exception: %s",
str(exc),
)
keyring = None
return None
class MultiDomainBasicAuth(AuthBase):
def __init__(
self, prompting: bool = True, index_urls: Optional[List[str]] = None
) -> None:
@ -105,8 +106,12 @@ class MultiDomainBasicAuth(AuthBase):
return u
return None
def _get_new_credentials(self, original_url: str, allow_netrc: bool = True,
allow_keyring: bool = False) -> AuthInfo:
def _get_new_credentials(
self,
original_url: str,
allow_netrc: bool = True,
allow_keyring: bool = False,
) -> AuthInfo:
"""Find and return credentials for the specified URL."""
# Split the credentials and netloc from the url.
url, netloc, url_user_password = split_auth_netloc_from_url(
@ -145,10 +150,12 @@ class MultiDomainBasicAuth(AuthBase):
# If we don't have a password and keyring is available, use it.
if allow_keyring:
# The index url is more specific than the netloc, so try it first
# fmt: off
kr_auth = (
get_keyring_auth(index_url, username) or
get_keyring_auth(netloc, username)
)
# fmt: on
if kr_auth:
logger.debug("Found credentials in keyring for %s", netloc)
return kr_auth
@ -189,9 +196,9 @@ class MultiDomainBasicAuth(AuthBase):
assert (
# Credentials were found
(username is not None and password is not None) or
(username is not None and password is not None)
# Credentials were not found
(username is None and password is None)
or (username is None and password is None)
), f"Could not load credentials from url: {original_url}"
return url, username, password
@ -244,9 +251,11 @@ class MultiDomainBasicAuth(AuthBase):
parsed = urllib.parse.urlparse(resp.url)
# Query the keyring for credentials:
username, password = self._get_new_credentials(resp.url,
allow_netrc=False,
allow_keyring=True)
username, password = self._get_new_credentials(
resp.url,
allow_netrc=False,
allow_keyring=True,
)
# Prompt the user for a new username and password
save = False
@ -287,7 +296,8 @@ class MultiDomainBasicAuth(AuthBase):
"""Response callback to warn about incorrect credentials."""
if resp.status_code == 401:
logger.warning(
'401 Error, Credentials not correct for %s', resp.request.url,
"401 Error, Credentials not correct for %s",
resp.request.url,
)
def save_credentials(self, resp: Response, **kwargs: Any) -> None:
@ -300,7 +310,7 @@ class MultiDomainBasicAuth(AuthBase):
self._credentials_to_save = None
if creds and resp.status_code < 400:
try:
logger.info('Saving credentials to keyring')
logger.info("Saving credentials to keyring")
keyring.set_password(*creds)
except Exception:
logger.exception('Failed to save credentials')
logger.exception("Failed to save credentials")

View File

@ -50,7 +50,7 @@ class SafeFileCache(BaseCache):
def get(self, key: str) -> Optional[bytes]:
path = self._get_cache_path(key)
with suppressed_cache_errors():
with open(path, 'rb') as f:
with open(path, "rb") as f:
return f.read()
def set(self, key: str, value: bytes) -> None:

View File

@ -22,7 +22,7 @@ logger = logging.getLogger(__name__)
def _get_http_response_size(resp: Response) -> Optional[int]:
try:
return int(resp.headers['content-length'])
return int(resp.headers["content-length"])
except (ValueError, KeyError, TypeError):
return None
@ -30,7 +30,7 @@ def _get_http_response_size(resp: Response) -> Optional[int]:
def _prepare_download(
resp: Response,
link: Link,
progress_bar: str
progress_bar: str,
) -> Iterable[bytes]:
total_length = _get_http_response_size(resp)
@ -42,7 +42,7 @@ def _prepare_download(
logged_url = redact_auth_from_url(url)
if total_length:
logged_url = '{} ({})'.format(logged_url, format_size(total_length))
logged_url = "{} ({})".format(logged_url, format_size(total_length))
if is_from_cache(resp):
logger.info("Using cached %s", logged_url)
@ -65,9 +65,7 @@ def _prepare_download(
if not show_progress:
return chunks
return DownloadProgressProvider(
progress_bar, max=total_length
)(chunks)
return DownloadProgressProvider(progress_bar, max=total_length)(chunks)
def sanitize_content_filename(filename: str) -> str:
@ -83,7 +81,7 @@ def parse_content_disposition(content_disposition: str, default_filename: str) -
return the default filename if the result is empty.
"""
_type, params = cgi.parse_header(content_disposition)
filename = params.get('filename')
filename = params.get("filename")
if filename:
# We need to sanitize the filename to prevent directory traversal
# in case the filename contains ".." path parts.
@ -97,14 +95,12 @@ def _get_http_response_filename(resp: Response, link: Link) -> str:
"""
filename = link.filename # fallback
# Have a look at the Content-Disposition header for a better guess
content_disposition = resp.headers.get('content-disposition')
content_disposition = resp.headers.get("content-disposition")
if content_disposition:
filename = parse_content_disposition(content_disposition, filename)
ext: Optional[str] = splitext(filename)[1]
if not ext:
ext = mimetypes.guess_extension(
resp.headers.get('content-type', '')
)
ext = mimetypes.guess_extension(resp.headers.get("content-type", ""))
if ext:
filename += ext
if not ext and link.url != resp.url:
@ -115,7 +111,7 @@ def _get_http_response_filename(resp: Response, link: Link) -> str:
def _http_get_download(session: PipSession, link: Link) -> Response:
target_url = link.url.split('#', 1)[0]
target_url = link.url.split("#", 1)[0]
resp = session.get(target_url, headers=HEADERS, stream=True)
raise_for_status(resp)
return resp
@ -145,15 +141,14 @@ class Downloader:
filepath = os.path.join(location, filename)
chunks = _prepare_download(resp, link, self._progress_bar)
with open(filepath, 'wb') as content_file:
with open(filepath, "wb") as content_file:
for chunk in chunks:
content_file.write(chunk)
content_type = resp.headers.get('Content-Type', '')
content_type = resp.headers.get("Content-Type", "")
return filepath, content_type
class BatchDownloader:
def __init__(
self,
session: PipSession,
@ -173,7 +168,8 @@ class BatchDownloader:
assert e.response is not None
logger.critical(
"HTTP error %s while getting %s",
e.response.status_code, link,
e.response.status_code,
link,
)
raise
@ -181,8 +177,8 @@ class BatchDownloader:
filepath = os.path.join(location, filename)
chunks = _prepare_download(resp, link, self._progress_bar)
with open(filepath, 'wb') as content_file:
with open(filepath, "wb") as content_file:
for chunk in chunks:
content_file.write(chunk)
content_type = resp.headers.get('Content-Type', '')
content_type = resp.headers.get("Content-Type", "")
yield link, (filepath, content_type)

View File

@ -1,6 +1,6 @@
"""Lazy ZIP over HTTP"""
__all__ = ['HTTPRangeRequestUnsupported', 'dist_from_wheel_url']
__all__ = ["HTTPRangeRequestUnsupported", "dist_from_wheel_url"]
from bisect import bisect_left, bisect_right
from contextlib import contextmanager
@ -53,19 +53,19 @@ class LazyZipOverHTTP:
raise_for_status(head)
assert head.status_code == 200
self._session, self._url, self._chunk_size = session, url, chunk_size
self._length = int(head.headers['Content-Length'])
self._length = int(head.headers["Content-Length"])
self._file = NamedTemporaryFile()
self.truncate(self._length)
self._left: List[int] = []
self._right: List[int] = []
if 'bytes' not in head.headers.get('Accept-Ranges', 'none'):
raise HTTPRangeRequestUnsupported('range request is not supported')
if "bytes" not in head.headers.get("Accept-Ranges", "none"):
raise HTTPRangeRequestUnsupported("range request is not supported")
self._check_zip()
@property
def mode(self) -> str:
"""Opening mode, which is always rb."""
return 'rb'
return "rb"
@property
def name(self) -> str:
@ -94,9 +94,9 @@ class LazyZipOverHTTP:
"""
download_size = max(size, self._chunk_size)
start, length = self.tell(), self._length
stop = length if size < 0 else min(start+download_size, length)
start = max(0, stop-download_size)
self._download(start, stop-1)
stop = length if size < 0 else min(start + download_size, length)
start = max(0, stop - download_size)
self._download(start, stop - 1)
return self._file.read(size)
def readable(self) -> bool:
@ -170,9 +170,9 @@ class LazyZipOverHTTP:
) -> Response:
"""Return HTTP response to a range request from start to end."""
headers = base_headers.copy()
headers['Range'] = f'bytes={start}-{end}'
headers["Range"] = f"bytes={start}-{end}"
# TODO: Get range requests to be correctly cached
headers['Cache-Control'] = 'no-cache'
headers["Cache-Control"] = "no-cache"
return self._session.get(self._url, headers=headers, stream=True)
def _merge(
@ -187,11 +187,11 @@ class LazyZipOverHTTP:
right (int): Index after last overlapping downloaded data
"""
lslice, rslice = self._left[left:right], self._right[left:right]
i = start = min([start]+lslice[:1])
end = max([end]+rslice[-1:])
i = start = min([start] + lslice[:1])
end = max([end] + rslice[-1:])
for j, k in zip(lslice, rslice):
if j > i:
yield i, j-1
yield i, j - 1
i = k + 1
if i <= end:
yield i, end

View File

@ -77,13 +77,13 @@ SECURE_ORIGINS: List[SecureOrigin] = [
# For more background, see: https://github.com/pypa/pip/issues/5499
CI_ENVIRONMENT_VARIABLES = (
# Azure Pipelines
'BUILD_BUILDID',
"BUILD_BUILDID",
# Jenkins
'BUILD_ID',
"BUILD_ID",
# AppVeyor, CircleCI, Codeship, Gitlab CI, Shippable, Travis CI
'CI',
"CI",
# Explicit environment variable.
'PIP_IS_CI',
"PIP_IS_CI",
)
@ -109,19 +109,19 @@ def user_agent() -> str:
},
}
if data["implementation"]["name"] == 'CPython':
if data["implementation"]["name"] == "CPython":
data["implementation"]["version"] = platform.python_version()
elif data["implementation"]["name"] == 'PyPy':
elif data["implementation"]["name"] == "PyPy":
pypy_version_info = sys.pypy_version_info # type: ignore
if pypy_version_info.releaselevel == 'final':
if pypy_version_info.releaselevel == "final":
pypy_version_info = pypy_version_info[:3]
data["implementation"]["version"] = ".".join(
[str(x) for x in pypy_version_info]
)
elif data["implementation"]["name"] == 'Jython':
elif data["implementation"]["name"] == "Jython":
# Complete Guess
data["implementation"]["version"] = platform.python_version()
elif data["implementation"]["name"] == 'IronPython':
elif data["implementation"]["name"] == "IronPython":
# Complete Guess
data["implementation"]["version"] = platform.python_version()
@ -130,14 +130,18 @@ def user_agent() -> str:
# https://github.com/nir0s/distro/pull/269
linux_distribution = distro.linux_distribution() # type: ignore
distro_infos = dict(filter(
lambda x: x[1],
zip(["name", "version", "id"], linux_distribution),
))
libc = dict(filter(
lambda x: x[1],
zip(["lib", "version"], libc_ver()),
))
distro_infos = dict(
filter(
lambda x: x[1],
zip(["name", "version", "id"], linux_distribution),
)
)
libc = dict(
filter(
lambda x: x[1],
zip(["lib", "version"], libc_ver()),
)
)
if libc:
distro_infos["libc"] = libc
if distro_infos:
@ -157,6 +161,7 @@ def user_agent() -> str:
if has_tls():
import _ssl as ssl
data["openssl_version"] = ssl.OPENSSL_VERSION
setuptools_dist = get_default_environment().get_distribution("setuptools")
@ -167,7 +172,7 @@ def user_agent() -> str:
# If for any reason `rustc --version` fails, silently ignore it
try:
rustc_output = subprocess.check_output(
["rustc", "--version"], stderr=subprocess.STDOUT, timeout=.5
["rustc", "--version"], stderr=subprocess.STDOUT, timeout=0.5
)
except Exception:
pass
@ -195,7 +200,6 @@ def user_agent() -> str:
class LocalFSAdapter(BaseAdapter):
def send(
self,
request: PreparedRequest,
@ -219,11 +223,13 @@ class LocalFSAdapter(BaseAdapter):
else:
modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
resp.headers = CaseInsensitiveDict({
"Content-Type": content_type,
"Content-Length": stats.st_size,
"Last-Modified": modified,
})
resp.headers = CaseInsensitiveDict(
{
"Content-Type": content_type,
"Content-Length": stats.st_size,
"Last-Modified": modified,
}
)
resp.raw = open(pathname, "rb")
resp.close = resp.raw.close
@ -235,7 +241,6 @@ class LocalFSAdapter(BaseAdapter):
class InsecureHTTPAdapter(HTTPAdapter):
def cert_verify(
self,
conn: ConnectionPool,
@ -247,7 +252,6 @@ class InsecureHTTPAdapter(HTTPAdapter):
class InsecureCacheControlAdapter(CacheControlAdapter):
def cert_verify(
self,
conn: ConnectionPool,
@ -293,7 +297,6 @@ class PipSession(requests.Session):
# Set the total number of retries that a particular request can
# have.
total=retries,
# A 503 error from PyPI typically means that the Fastly -> Origin
# connection got interrupted in some way. A 503 error in general
# is typically considered a transient error so we'll go ahead and
@ -301,7 +304,6 @@ class PipSession(requests.Session):
# A 500 may indicate transient error in Amazon S3
# A 520 or 527 - may indicate transient error in CloudFlare
status_forcelist=[500, 503, 520, 527],
# Add a small amount of back off between failed requests in
# order to prevent hammering the service.
backoff_factor=0.25,
@ -358,43 +360,39 @@ class PipSession(requests.Session):
string came from.
"""
if not suppress_logging:
msg = f'adding trusted host: {host!r}'
msg = f"adding trusted host: {host!r}"
if source is not None:
msg += f' (from {source})'
msg += f" (from {source})"
logger.info(msg)
host_port = parse_netloc(host)
if host_port not in self.pip_trusted_origins:
self.pip_trusted_origins.append(host_port)
self.mount(
build_url_from_netloc(host) + '/',
self._trusted_host_adapter
)
self.mount(build_url_from_netloc(host) + "/", self._trusted_host_adapter)
if not host_port[1]:
# Mount wildcard ports for the same host.
self.mount(
build_url_from_netloc(host) + ':',
self._trusted_host_adapter
)
self.mount(build_url_from_netloc(host) + ":", self._trusted_host_adapter)
def iter_secure_origins(self) -> Iterator[SecureOrigin]:
yield from SECURE_ORIGINS
for host, port in self.pip_trusted_origins:
yield ('*', host, '*' if port is None else port)
yield ("*", host, "*" if port is None else port)
def is_secure_origin(self, location: Link) -> bool:
# Determine if this url used a secure transport mechanism
parsed = urllib.parse.urlparse(str(location))
origin_protocol, origin_host, origin_port = (
parsed.scheme, parsed.hostname, parsed.port,
parsed.scheme,
parsed.hostname,
parsed.port,
)
# The protocol to use to see if the protocol matches.
# Don't count the repository type as part of the protocol: in
# cases such as "git+ssh", only use "ssh". (I.e., Only verify against
# the last scheme.)
origin_protocol = origin_protocol.rsplit('+', 1)[-1]
origin_protocol = origin_protocol.rsplit("+", 1)[-1]
# Determine if our origin is a secure origin by looking through our
# hardcoded list of secure origins, as well as any additional ones
@ -411,9 +409,9 @@ class PipSession(requests.Session):
# We don't have both a valid address or a valid network, so
# we'll check this origin against hostnames.
if (
origin_host and
origin_host.lower() != secure_host.lower() and
secure_host != "*"
origin_host
and origin_host.lower() != secure_host.lower()
and secure_host != "*"
):
continue
else:
@ -424,9 +422,9 @@ class PipSession(requests.Session):
# Check to see if the port matches.
if (
origin_port != secure_port and
secure_port != "*" and
secure_port is not None
origin_port != secure_port
and secure_port != "*"
and secure_port is not None
):
continue

View File

@ -23,30 +23,32 @@ from pip._internal.exceptions import NetworkConnectionError
# you're not asking for a compressed file and will then decompress it
# before sending because if that's the case I don't think it'll ever be
# possible to make this work.
HEADERS: Dict[str, str] = {'Accept-Encoding': 'identity'}
HEADERS: Dict[str, str] = {"Accept-Encoding": "identity"}
def raise_for_status(resp: Response) -> None:
http_error_msg = ''
http_error_msg = ""
if isinstance(resp.reason, bytes):
# We attempt to decode utf-8 first because some servers
# choose to localize their reason strings. If the string
# isn't utf-8, we fall back to iso-8859-1 for all other
# encodings.
try:
reason = resp.reason.decode('utf-8')
reason = resp.reason.decode("utf-8")
except UnicodeDecodeError:
reason = resp.reason.decode('iso-8859-1')
reason = resp.reason.decode("iso-8859-1")
else:
reason = resp.reason
if 400 <= resp.status_code < 500:
http_error_msg = (
f'{resp.status_code} Client Error: {reason} for url: {resp.url}')
f"{resp.status_code} Client Error: {reason} for url: {resp.url}"
)
elif 500 <= resp.status_code < 600:
http_error_msg = (
f'{resp.status_code} Server Error: {reason} for url: {resp.url}')
f"{resp.status_code} Server Error: {reason} for url: {resp.url}"
)
if http_error_msg:
raise NetworkConnectionError(http_error_msg, response=resp)
@ -55,8 +57,7 @@ def raise_for_status(resp: Response) -> None:
def response_chunks(
response: Response, chunk_size: int = CONTENT_CHUNK_SIZE
) -> Iterator[bytes]:
"""Given a requests Response, provide the data chunks.
"""
"""Given a requests Response, provide the data chunks."""
try:
# Special case for urllib3.
for chunk in response.raw.stream(

View File

@ -40,9 +40,13 @@ class PipXmlrpcTransport(xmlrpc.client.Transport):
parts = (self._scheme, host, handler, None, None, None)
url = urllib.parse.urlunparse(parts)
try:
headers = {'Content-Type': 'text/xml'}
response = self._session.post(url, data=request_body,
headers=headers, stream=True)
headers = {"Content-Type": "text/xml"}
response = self._session.post(
url,
data=request_body,
headers=headers,
stream=True,
)
raise_for_status(response)
self.verbose = verbose
return self.parse_response(response.raw)
@ -50,6 +54,7 @@ class PipXmlrpcTransport(xmlrpc.client.Transport):
assert exc.response
logger.critical(
"HTTP error %s while getting %s",
exc.response.status_code, url,
exc.response.status_code,
url,
)
raise

View File

@ -24,6 +24,44 @@ class LegacyInstallFailure(Exception):
self.parent = sys.exc_info()
def write_installed_files_from_setuptools_record(
record_lines: List[str],
root: Optional[str],
req_description: str,
) -> None:
def prepend_root(path):
# type: (str) -> str
if root is None or not os.path.isabs(path):
return path
else:
return change_root(root, path)
for line in record_lines:
directory = os.path.dirname(line)
if directory.endswith(".egg-info"):
egg_info_dir = prepend_root(directory)
break
else:
message = (
"{} did not indicate that it installed an "
".egg-info directory. Only setup.py projects "
"generating .egg-info directories are supported."
).format(req_description)
raise InstallationError(message)
new_lines = []
for line in record_lines:
filename = line.strip()
if os.path.isdir(filename):
filename += os.path.sep
new_lines.append(os.path.relpath(prepend_root(filename), egg_info_dir))
new_lines.sort()
ensure_dir(egg_info_dir)
inst_files_path = os.path.join(egg_info_dir, "installed-files.txt")
with open(inst_files_path, "w") as f:
f.write("\n".join(new_lines) + "\n")
def install(
install_options: List[str],
global_options: Sequence[str],
@ -45,7 +83,7 @@ def install(
with TempDirectory(kind="record") as temp_dir:
try:
record_filename = os.path.join(temp_dir.path, 'install-record.txt')
record_filename = os.path.join(temp_dir.path, "install-record.txt")
install_args = make_setuptools_install_args(
setup_py_path,
global_options=global_options,
@ -70,7 +108,7 @@ def install(
)
if not os.path.exists(record_filename):
logger.debug('Record file %s not found', record_filename)
logger.debug("Record file %s not found", record_filename)
# Signal to the caller that we didn't install the new package
return False
@ -86,37 +124,5 @@ def install(
with open(record_filename) as f:
record_lines = f.read().splitlines()
def prepend_root(path: str) -> str:
if root is None or not os.path.isabs(path):
return path
else:
return change_root(root, path)
for line in record_lines:
directory = os.path.dirname(line)
if directory.endswith('.egg-info'):
egg_info_dir = prepend_root(directory)
break
else:
message = (
"{} did not indicate that it installed an "
".egg-info directory. Only setup.py projects "
"generating .egg-info directories are supported."
).format(req_description)
raise InstallationError(message)
new_lines = []
for line in record_lines:
filename = line.strip()
if os.path.isdir(filename):
filename += os.path.sep
new_lines.append(
os.path.relpath(prepend_root(filename), egg_info_dir)
)
new_lines.sort()
ensure_dir(egg_info_dir)
inst_files_path = os.path.join(egg_info_dir, 'installed-files.txt')
with open(inst_files_path, 'w') as f:
f.write('\n'.join(new_lines) + '\n')
write_installed_files_from_setuptools_record(record_lines, root, req_description)
return True

View File

@ -207,14 +207,16 @@ def unpack_url(
# be removed.
if link.is_existing_dir():
deprecated(
"A future pip version will change local packages to be built "
"in-place without first copying to a temporary directory. "
"We recommend you use --use-feature=in-tree-build to test "
"your packages with this new behavior before it becomes the "
"default.\n",
reason=(
"pip copied the source tree into a temporary directory "
"before building it. This is changing so that packages "
"are built in-place "
'within the original source tree ("in-tree build").'
),
replacement=None,
gone_in="21.3",
issue=7555
feature_flag="in-tree-build",
issue=7555,
)
if os.path.isdir(location):
rmtree(location)

View File

@ -9,44 +9,42 @@ from .req_install import InstallRequirement
from .req_set import RequirementSet
__all__ = [
"RequirementSet", "InstallRequirement",
"parse_requirements", "install_given_reqs",
"RequirementSet",
"InstallRequirement",
"parse_requirements",
"install_given_reqs",
]
logger = logging.getLogger(__name__)
class InstallationResult:
def __init__(self, name):
# type: (str) -> None
def __init__(self, name: str) -> None:
self.name = name
def __repr__(self):
# type: () -> str
def __repr__(self) -> str:
return f"InstallationResult(name={self.name!r})"
def _validate_requirements(
requirements, # type: List[InstallRequirement]
):
# type: (...) -> Iterator[Tuple[str, InstallRequirement]]
requirements: List[InstallRequirement],
) -> Iterator[Tuple[str, InstallRequirement]]:
for req in requirements:
assert req.name, f"invalid to-be-installed requirement: {req}"
yield req.name, req
def install_given_reqs(
requirements, # type: List[InstallRequirement]
install_options, # type: List[str]
global_options, # type: Sequence[str]
root, # type: Optional[str]
home, # type: Optional[str]
prefix, # type: Optional[str]
warn_script_location, # type: bool
use_user_site, # type: bool
pycompile, # type: bool
):
# type: (...) -> List[InstallationResult]
requirements: List[InstallRequirement],
install_options: List[str],
global_options: Sequence[str],
root: Optional[str],
home: Optional[str],
prefix: Optional[str],
warn_script_location: bool,
use_user_site: bool,
pycompile: bool,
) -> List[InstallationResult]:
"""
Install everything in the given list.
@ -56,8 +54,8 @@ def install_given_reqs(
if to_install:
logger.info(
'Installing collected packages: %s',
', '.join(to_install.keys()),
"Installing collected packages: %s",
", ".join(to_install.keys()),
)
installed = []
@ -65,11 +63,9 @@ def install_given_reqs(
with indent_log():
for req_name, requirement in to_install.items():
if requirement.should_reinstall:
logger.info('Attempting uninstall: %s', req_name)
logger.info("Attempting uninstall: %s", req_name)
with indent_log():
uninstalled_pathset = requirement.uninstall(
auto_confirm=True
)
uninstalled_pathset = requirement.uninstall(auto_confirm=True)
else:
uninstalled_pathset = None

View File

@ -31,17 +31,17 @@ from pip._internal.utils.urls import path_to_url
from pip._internal.vcs import is_url, vcs
__all__ = [
"install_req_from_editable", "install_req_from_line",
"parse_editable"
"install_req_from_editable",
"install_req_from_line",
"parse_editable",
]
logger = logging.getLogger(__name__)
operators = Specifier._operators.keys()
def _strip_extras(path):
# type: (str) -> Tuple[str, Optional[str]]
m = re.match(r'^(.+)(\[[^\]]+\])$', path)
def _strip_extras(path: str) -> Tuple[str, Optional[str]]:
m = re.match(r"^(.+)(\[[^\]]+\])$", path)
extras = None
if m:
path_no_extras = m.group(1)
@ -52,15 +52,13 @@ def _strip_extras(path):
return path_no_extras, extras
def convert_extras(extras):
# type: (Optional[str]) -> Set[str]
def convert_extras(extras: Optional[str]) -> Set[str]:
if not extras:
return set()
return Requirement("placeholder" + extras.lower()).extras
def parse_editable(editable_req):
# type: (str) -> Tuple[Optional[str], str, Set[str]]
def parse_editable(editable_req: str) -> Tuple[Optional[str], str, Set[str]]:
"""Parses an editable requirement into:
- a requirement name
- an URL
@ -77,26 +75,25 @@ def parse_editable(editable_req):
url_no_extras, extras = _strip_extras(url)
if os.path.isdir(url_no_extras):
setup_py = os.path.join(url_no_extras, 'setup.py')
setup_cfg = os.path.join(url_no_extras, 'setup.cfg')
setup_py = os.path.join(url_no_extras, "setup.py")
setup_cfg = os.path.join(url_no_extras, "setup.cfg")
if not os.path.exists(setup_py) and not os.path.exists(setup_cfg):
msg = (
'File "setup.py" or "setup.cfg" not found. Directory cannot be '
'installed in editable mode: {}'
.format(os.path.abspath(url_no_extras))
"installed in editable mode: {}".format(os.path.abspath(url_no_extras))
)
pyproject_path = make_pyproject_path(url_no_extras)
if os.path.isfile(pyproject_path):
msg += (
'\n(A "pyproject.toml" file was found, but editable '
'mode currently requires a setuptools-based build.)'
"mode currently requires a setuptools-based build.)"
)
raise InstallationError(msg)
# Treating it as code that has already been checked out
url_no_extras = path_to_url(url_no_extras)
if url_no_extras.lower().startswith('file:'):
if url_no_extras.lower().startswith("file:"):
package_name = Link(url_no_extras).egg_fragment
if extras:
return (
@ -108,8 +105,8 @@ def parse_editable(editable_req):
return package_name, url_no_extras, set()
for version_control in vcs:
if url.lower().startswith(f'{version_control}:'):
url = f'{version_control}+{url}'
if url.lower().startswith(f"{version_control}:"):
url = f"{version_control}+{url}"
break
link = Link(url)
@ -117,9 +114,9 @@ def parse_editable(editable_req):
if not link.is_vcs:
backends = ", ".join(vcs.all_schemes)
raise InstallationError(
f'{editable_req} is not a valid editable requirement. '
f'It should either be a path to a local project or a VCS URL '
f'(beginning with {backends}).'
f"{editable_req} is not a valid editable requirement. "
f"It should either be a path to a local project or a VCS URL "
f"(beginning with {backends})."
)
package_name = link.egg_fragment
@ -131,8 +128,7 @@ def parse_editable(editable_req):
return package_name, url, set()
def deduce_helpful_msg(req):
# type: (str) -> str
def deduce_helpful_msg(req: str) -> str:
"""Returns helpful msg in case requirements file does not exist,
or cannot be parsed.
@ -154,9 +150,7 @@ def deduce_helpful_msg(req):
" the packages specified within it."
).format(req)
except RequirementParseError:
logger.debug(
"Cannot parse '%s' as requirements file", req, exc_info=True
)
logger.debug("Cannot parse '%s' as requirements file", req, exc_info=True)
else:
msg += f" File '{req}' does not exist."
return msg
@ -164,11 +158,11 @@ def deduce_helpful_msg(req):
class RequirementParts:
def __init__(
self,
requirement, # type: Optional[Requirement]
link, # type: Optional[Link]
markers, # type: Optional[Marker]
extras, # type: Set[str]
self,
requirement: Optional[Requirement],
link: Optional[Link],
markers: Optional[Marker],
extras: Set[str],
):
self.requirement = requirement
self.link = link
@ -176,13 +170,12 @@ class RequirementParts:
self.extras = extras
def parse_req_from_editable(editable_req):
# type: (str) -> RequirementParts
def parse_req_from_editable(editable_req: str) -> RequirementParts:
name, url, extras_override = parse_editable(editable_req)
if name is not None:
try:
req = Requirement(name) # type: Optional[Requirement]
req: Optional[Requirement] = Requirement(name)
except InvalidRequirement:
raise InstallationError(f"Invalid requirement: '{name}'")
else:
@ -197,15 +190,14 @@ def parse_req_from_editable(editable_req):
def install_req_from_editable(
editable_req, # type: str
comes_from=None, # type: Optional[Union[InstallRequirement, str]]
use_pep517=None, # type: Optional[bool]
isolated=False, # type: bool
options=None, # type: Optional[Dict[str, Any]]
constraint=False, # type: bool
user_supplied=False, # type: bool
):
# type: (...) -> InstallRequirement
editable_req: str,
comes_from: Optional[Union[InstallRequirement, str]] = None,
use_pep517: Optional[bool] = None,
isolated: bool = False,
options: Optional[Dict[str, Any]] = None,
constraint: bool = False,
user_supplied: bool = False,
) -> InstallRequirement:
parts = parse_req_from_editable(editable_req)
@ -225,8 +217,7 @@ def install_req_from_editable(
)
def _looks_like_path(name):
# type: (str) -> bool
def _looks_like_path(name: str) -> bool:
"""Checks whether the string "looks like" a path on the filesystem.
This does not check whether the target actually exists, only judge from the
@ -245,8 +236,7 @@ def _looks_like_path(name):
return False
def _get_url_from_path(path, name):
# type: (str, str) -> Optional[str]
def _get_url_from_path(path: str, name: str) -> Optional[str]:
"""
First, it checks whether a provided path is an installable directory. If it
is, returns the path.
@ -266,25 +256,23 @@ def _get_url_from_path(path, name):
return None
if os.path.isfile(path):
return path_to_url(path)
urlreq_parts = name.split('@', 1)
urlreq_parts = name.split("@", 1)
if len(urlreq_parts) >= 2 and not _looks_like_path(urlreq_parts[0]):
# If the path contains '@' and the part before it does not look
# like a path, try to treat it as a PEP 440 URL req instead.
return None
logger.warning(
'Requirement %r looks like a filename, but the '
'file does not exist',
name
"Requirement %r looks like a filename, but the file does not exist",
name,
)
return path_to_url(path)
def parse_req_from_line(name, line_source):
# type: (str, Optional[str]) -> RequirementParts
def parse_req_from_line(name: str, line_source: Optional[str]) -> RequirementParts:
if is_url(name):
marker_sep = '; '
marker_sep = "; "
else:
marker_sep = ';'
marker_sep = ";"
if marker_sep in name:
name, markers_as_string = name.split(marker_sep, 1)
markers_as_string = markers_as_string.strip()
@ -311,9 +299,8 @@ def parse_req_from_line(name, line_source):
# it's a local file, dir, or url
if link:
# Handle relative file URLs
if link.scheme == 'file' and re.search(r'\.\./', link.url):
link = Link(
path_to_url(os.path.normpath(os.path.abspath(link.path))))
if link.scheme == "file" and re.search(r"\.\./", link.url):
link = Link(path_to_url(os.path.normpath(os.path.abspath(link.path))))
# wheel file
if link.is_wheel:
wheel = Wheel(link.filename) # can raise InvalidWheelFilename
@ -329,11 +316,10 @@ def parse_req_from_line(name, line_source):
extras = convert_extras(extras_as_string)
def with_source(text):
# type: (str) -> str
def with_source(text: str) -> str:
if not line_source:
return text
return f'{text} (from {line_source})'
return f"{text} (from {line_source})"
def _parse_req_string(req_as_string: str) -> Requirement:
try:
@ -342,16 +328,15 @@ def parse_req_from_line(name, line_source):
if os.path.sep in req_as_string:
add_msg = "It looks like a path."
add_msg += deduce_helpful_msg(req_as_string)
elif ('=' in req_as_string and
not any(op in req_as_string for op in operators)):
elif "=" in req_as_string and not any(
op in req_as_string for op in operators
):
add_msg = "= is not a valid operator. Did you mean == ?"
else:
add_msg = ''
msg = with_source(
f'Invalid requirement: {req_as_string!r}'
)
add_msg = ""
msg = with_source(f"Invalid requirement: {req_as_string!r}")
if add_msg:
msg += f'\nHint: {add_msg}'
msg += f"\nHint: {add_msg}"
raise InstallationError(msg)
else:
# Deprecate extras after specifiers: "name>=1.0[extras]"
@ -360,13 +345,13 @@ def parse_req_from_line(name, line_source):
# RequirementParts
for spec in req.specifier:
spec_str = str(spec)
if spec_str.endswith(']'):
if spec_str.endswith("]"):
msg = f"Extras after version '{spec_str}'."
raise InstallationError(msg)
return req
if req_as_string is not None:
req = _parse_req_string(req_as_string) # type: Optional[Requirement]
req: Optional[Requirement] = _parse_req_string(req_as_string)
else:
req = None
@ -374,16 +359,15 @@ def parse_req_from_line(name, line_source):
def install_req_from_line(
name, # type: str
comes_from=None, # type: Optional[Union[str, InstallRequirement]]
use_pep517=None, # type: Optional[bool]
isolated=False, # type: bool
options=None, # type: Optional[Dict[str, Any]]
constraint=False, # type: bool
line_source=None, # type: Optional[str]
user_supplied=False, # type: bool
):
# type: (...) -> InstallRequirement
name: str,
comes_from: Optional[Union[str, InstallRequirement]] = None,
use_pep517: Optional[bool] = None,
isolated: bool = False,
options: Optional[Dict[str, Any]] = None,
constraint: bool = False,
line_source: Optional[str] = None,
user_supplied: bool = False,
) -> InstallRequirement:
"""Creates an InstallRequirement from a name, which might be a
requirement, directory containing 'setup.py', filename, or URL.
@ -393,8 +377,12 @@ def install_req_from_line(
parts = parse_req_from_line(name, line_source)
return InstallRequirement(
parts.requirement, comes_from, link=parts.link, markers=parts.markers,
use_pep517=use_pep517, isolated=isolated,
parts.requirement,
comes_from,
link=parts.link,
markers=parts.markers,
use_pep517=use_pep517,
isolated=isolated,
install_options=options.get("install_options", []) if options else [],
global_options=options.get("global_options", []) if options else [],
hash_options=options.get("hashes", {}) if options else {},
@ -405,13 +393,12 @@ def install_req_from_line(
def install_req_from_req_string(
req_string, # type: str
comes_from=None, # type: Optional[InstallRequirement]
isolated=False, # type: bool
use_pep517=None, # type: Optional[bool]
user_supplied=False, # type: bool
):
# type: (...) -> InstallRequirement
req_string: str,
comes_from: Optional[InstallRequirement] = None,
isolated: bool = False,
use_pep517: Optional[bool] = None,
user_supplied: bool = False,
) -> InstallRequirement:
try:
req = Requirement(req_string)
except InvalidRequirement:
@ -421,8 +408,12 @@ def install_req_from_req_string(
PyPI.file_storage_domain,
TestPyPI.file_storage_domain,
]
if (req.url and comes_from and comes_from.link and
comes_from.link.netloc in domains_not_allowed):
if (
req.url
and comes_from
and comes_from.link
and comes_from.link.netloc in domains_not_allowed
):
# Explicitly disallow pypi packages that depend on external urls
raise InstallationError(
"Packages installed from PyPI cannot depend on packages "
@ -440,12 +431,11 @@ def install_req_from_req_string(
def install_req_from_parsed_requirement(
parsed_req, # type: ParsedRequirement
isolated=False, # type: bool
use_pep517=None, # type: Optional[bool]
user_supplied=False, # type: bool
):
# type: (...) -> InstallRequirement
parsed_req: ParsedRequirement,
isolated: bool = False,
use_pep517: Optional[bool] = None,
user_supplied: bool = False,
) -> InstallRequirement:
if parsed_req.is_editable:
req = install_req_from_editable(
parsed_req.requirement,
@ -470,8 +460,9 @@ def install_req_from_parsed_requirement(
return req
def install_req_from_link_and_ireq(link, ireq):
# type: (Link, InstallRequirement) -> InstallRequirement
def install_req_from_link_and_ireq(
link: Link, ireq: InstallRequirement
) -> InstallRequirement:
return InstallRequirement(
req=ireq.req,
comes_from=ireq.comes_from,

View File

@ -25,22 +25,22 @@ if TYPE_CHECKING:
from pip._internal.index.package_finder import PackageFinder
__all__ = ['parse_requirements']
__all__ = ["parse_requirements"]
ReqFileLines = Iterator[Tuple[int, str]]
LineParser = Callable[[str], Tuple[str, Values]]
SCHEME_RE = re.compile(r'^(http|https|file):', re.I)
COMMENT_RE = re.compile(r'(^|\s+)#.*$')
SCHEME_RE = re.compile(r"^(http|https|file):", re.I)
COMMENT_RE = re.compile(r"(^|\s+)#.*$")
# Matches environment variable-style values in '${MY_VARIABLE_1}' with the
# variable name consisting of only uppercase letters, digits or the '_'
# (underscore). This follows the POSIX standard defined in IEEE Std 1003.1,
# 2013 Edition.
ENV_VAR_RE = re.compile(r'(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})')
ENV_VAR_RE = re.compile(r"(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})")
SUPPORTED_OPTIONS = [
SUPPORTED_OPTIONS: List[Callable[..., optparse.Option]] = [
cmdoptions.index_url,
cmdoptions.extra_index_url,
cmdoptions.no_index,
@ -55,14 +55,14 @@ SUPPORTED_OPTIONS = [
cmdoptions.pre,
cmdoptions.trusted_host,
cmdoptions.use_new_feature,
] # type: List[Callable[..., optparse.Option]]
]
# options to be passed to requirements
SUPPORTED_OPTIONS_REQ = [
SUPPORTED_OPTIONS_REQ: List[Callable[..., optparse.Option]] = [
cmdoptions.install_options,
cmdoptions.global_options,
cmdoptions.hash,
] # type: List[Callable[..., optparse.Option]]
]
# the 'dest' string values
SUPPORTED_OPTIONS_REQ_DEST = [str(o().dest) for o in SUPPORTED_OPTIONS_REQ]
@ -71,14 +71,13 @@ SUPPORTED_OPTIONS_REQ_DEST = [str(o().dest) for o in SUPPORTED_OPTIONS_REQ]
class ParsedRequirement:
def __init__(
self,
requirement, # type:str
is_editable, # type: bool
comes_from, # type: str
constraint, # type: bool
options=None, # type: Optional[Dict[str, Any]]
line_source=None, # type: Optional[str]
):
# type: (...) -> None
requirement: str,
is_editable: bool,
comes_from: str,
constraint: bool,
options: Optional[Dict[str, Any]] = None,
line_source: Optional[str] = None,
) -> None:
self.requirement = requirement
self.is_editable = is_editable
self.comes_from = comes_from
@ -90,13 +89,12 @@ class ParsedRequirement:
class ParsedLine:
def __init__(
self,
filename, # type: str
lineno, # type: int
args, # type: str
opts, # type: Values
constraint, # type: bool
):
# type: (...) -> None
filename: str,
lineno: int,
args: str,
opts: Values,
constraint: bool,
) -> None:
self.filename = filename
self.lineno = lineno
self.opts = opts
@ -116,13 +114,12 @@ class ParsedLine:
def parse_requirements(
filename, # type: str
session, # type: PipSession
finder=None, # type: Optional[PackageFinder]
options=None, # type: Optional[optparse.Values]
constraint=False, # type: bool
):
# type: (...) -> Iterator[ParsedRequirement]
filename: str,
session: PipSession,
finder: Optional["PackageFinder"] = None,
options: Optional[optparse.Values] = None,
constraint: bool = False,
) -> Iterator[ParsedRequirement]:
"""Parse a requirements file and yield ParsedRequirement instances.
:param filename: Path or url of requirements file.
@ -137,22 +134,18 @@ def parse_requirements(
for parsed_line in parser.parse(filename, constraint):
parsed_req = handle_line(
parsed_line,
options=options,
finder=finder,
session=session
parsed_line, options=options, finder=finder, session=session
)
if parsed_req is not None:
yield parsed_req
def preprocess(content):
# type: (str) -> ReqFileLines
def preprocess(content: str) -> ReqFileLines:
"""Split, filter, and join lines, and return a line iterator
:param content: the content of the requirements file
"""
lines_enum = enumerate(content.splitlines(), start=1) # type: ReqFileLines
lines_enum: ReqFileLines = enumerate(content.splitlines(), start=1)
lines_enum = join_lines(lines_enum)
lines_enum = ignore_comments(lines_enum)
lines_enum = expand_env_variables(lines_enum)
@ -160,14 +153,15 @@ def preprocess(content):
def handle_requirement_line(
line, # type: ParsedLine
options=None, # type: Optional[optparse.Values]
):
# type: (...) -> ParsedRequirement
line: ParsedLine,
options: Optional[optparse.Values] = None,
) -> ParsedRequirement:
# preserve for the nested code path
line_comes_from = '{} {} (line {})'.format(
'-c' if line.constraint else '-r', line.filename, line.lineno,
line_comes_from = "{} {} (line {})".format(
"-c" if line.constraint else "-r",
line.filename,
line.lineno,
)
assert line.is_requirement
@ -192,7 +186,7 @@ def handle_requirement_line(
if dest in line.opts.__dict__ and line.opts.__dict__[dest]:
req_options[dest] = line.opts.__dict__[dest]
line_source = f'line {line.lineno} of {line.filename}'
line_source = f"line {line.lineno} of {line.filename}"
return ParsedRequirement(
requirement=line.requirement,
is_editable=line.is_editable,
@ -204,14 +198,13 @@ def handle_requirement_line(
def handle_option_line(
opts, # type: Values
filename, # type: str
lineno, # type: int
finder=None, # type: Optional[PackageFinder]
options=None, # type: Optional[optparse.Values]
session=None, # type: Optional[PipSession]
):
# type: (...) -> None
opts: Values,
filename: str,
lineno: int,
finder: Optional["PackageFinder"] = None,
options: Optional[optparse.Values] = None,
session: Optional[PipSession] = None,
) -> None:
if options:
# percolate options upward
@ -219,8 +212,7 @@ def handle_option_line(
options.require_hashes = opts.require_hashes
if opts.features_enabled:
options.features_enabled.extend(
f for f in opts.features_enabled
if f not in options.features_enabled
f for f in opts.features_enabled if f not in options.features_enabled
)
# set finder options
@ -262,17 +254,16 @@ def handle_option_line(
if session:
for host in opts.trusted_hosts or []:
source = f'line {lineno} of {filename}'
source = f"line {lineno} of {filename}"
session.add_trusted_host(host, source=source)
def handle_line(
line, # type: ParsedLine
options=None, # type: Optional[optparse.Values]
finder=None, # type: Optional[PackageFinder]
session=None, # type: Optional[PipSession]
):
# type: (...) -> Optional[ParsedRequirement]
line: ParsedLine,
options: Optional[optparse.Values] = None,
finder: Optional["PackageFinder"] = None,
session: Optional[PipSession] = None,
) -> Optional[ParsedRequirement]:
"""Handle a single parsed requirements line; This can result in
creating/yielding requirements, or updating the finder.
@ -314,25 +305,22 @@ def handle_line(
class RequirementsFileParser:
def __init__(
self,
session, # type: PipSession
line_parser, # type: LineParser
):
# type: (...) -> None
session: PipSession,
line_parser: LineParser,
) -> None:
self._session = session
self._line_parser = line_parser
def parse(self, filename, constraint):
# type: (str, bool) -> Iterator[ParsedLine]
"""Parse a given file, yielding parsed lines.
"""
def parse(self, filename: str, constraint: bool) -> Iterator[ParsedLine]:
"""Parse a given file, yielding parsed lines."""
yield from self._parse_and_recurse(filename, constraint)
def _parse_and_recurse(self, filename, constraint):
# type: (str, bool) -> Iterator[ParsedLine]
def _parse_and_recurse(
self, filename: str, constraint: bool
) -> Iterator[ParsedLine]:
for line in self._parse_file(filename, constraint):
if (
not line.is_requirement and
(line.opts.requirements or line.opts.constraints)
if not line.is_requirement and (
line.opts.requirements or line.opts.constraints
):
# parse a nested requirements file
if line.opts.requirements:
@ -350,15 +338,15 @@ class RequirementsFileParser:
elif not SCHEME_RE.search(req_path):
# do a join so relative paths work
req_path = os.path.join(
os.path.dirname(filename), req_path,
os.path.dirname(filename),
req_path,
)
yield from self._parse_and_recurse(req_path, nested_constraint)
else:
yield line
def _parse_file(self, filename, constraint):
# type: (str, bool) -> Iterator[ParsedLine]
def _parse_file(self, filename: str, constraint: bool) -> Iterator[ParsedLine]:
_, content = get_file_content(filename, self._session)
lines_enum = preprocess(content)
@ -368,7 +356,7 @@ class RequirementsFileParser:
args_str, opts = self._line_parser(line)
except OptionParsingError as e:
# add offending line
msg = f'Invalid requirement: {line}\n{e.msg}'
msg = f"Invalid requirement: {line}\n{e.msg}"
raise RequirementsFileParseError(msg)
yield ParsedLine(
@ -380,10 +368,8 @@ class RequirementsFileParser:
)
def get_line_parser(finder):
# type: (Optional[PackageFinder]) -> LineParser
def parse_line(line):
# type: (str) -> Tuple[str, Values]
def get_line_parser(finder: Optional["PackageFinder"]) -> LineParser:
def parse_line(line: str) -> Tuple[str, Values]:
# Build new parser for each line since it accumulates appendable
# options.
parser = build_parser()
@ -401,32 +387,29 @@ def get_line_parser(finder):
return parse_line
def break_args_options(line):
# type: (str) -> Tuple[str, str]
def break_args_options(line: str) -> Tuple[str, str]:
"""Break up the line into an args and options string. We only want to shlex
(and then optparse) the options, not the args. args can contain markers
which are corrupted by shlex.
"""
tokens = line.split(' ')
tokens = line.split(" ")
args = []
options = tokens[:]
for token in tokens:
if token.startswith('-') or token.startswith('--'):
if token.startswith("-") or token.startswith("--"):
break
else:
args.append(token)
options.pop(0)
return ' '.join(args), ' '.join(options)
return " ".join(args), " ".join(options)
class OptionParsingError(Exception):
def __init__(self, msg):
# type: (str) -> None
def __init__(self, msg: str) -> None:
self.msg = msg
def build_parser():
# type: () -> optparse.OptionParser
def build_parser() -> optparse.OptionParser:
"""
Return a parser for parsing requirement lines
"""
@ -439,9 +422,9 @@ def build_parser():
# By default optparse sys.exits on parsing errors. We want to wrap
# that in our own exception.
def parser_exit(self, msg):
# type: (Any, str) -> NoReturn
def parser_exit(self: Any, msg: str) -> "NoReturn":
raise OptionParsingError(msg)
# NOTE: mypy disallows assigning to a method
# https://github.com/python/mypy/issues/2427
parser.exit = parser_exit # type: ignore
@ -449,52 +432,49 @@ def build_parser():
return parser
def join_lines(lines_enum):
# type: (ReqFileLines) -> ReqFileLines
def join_lines(lines_enum: ReqFileLines) -> ReqFileLines:
"""Joins a line ending in '\' with the previous line (except when following
comments). The joined line takes on the index of the first line.
"""
primary_line_number = None
new_line = [] # type: List[str]
new_line: List[str] = []
for line_number, line in lines_enum:
if not line.endswith('\\') or COMMENT_RE.match(line):
if not line.endswith("\\") or COMMENT_RE.match(line):
if COMMENT_RE.match(line):
# this ensures comments are always matched later
line = ' ' + line
line = " " + line
if new_line:
new_line.append(line)
assert primary_line_number is not None
yield primary_line_number, ''.join(new_line)
yield primary_line_number, "".join(new_line)
new_line = []
else:
yield line_number, line
else:
if not new_line:
primary_line_number = line_number
new_line.append(line.strip('\\'))
new_line.append(line.strip("\\"))
# last line contains \
if new_line:
assert primary_line_number is not None
yield primary_line_number, ''.join(new_line)
yield primary_line_number, "".join(new_line)
# TODO: handle space after '\'.
def ignore_comments(lines_enum):
# type: (ReqFileLines) -> ReqFileLines
def ignore_comments(lines_enum: ReqFileLines) -> ReqFileLines:
"""
Strips comments and filter empty lines.
"""
for line_number, line in lines_enum:
line = COMMENT_RE.sub('', line)
line = COMMENT_RE.sub("", line)
line = line.strip()
if line:
yield line_number, line
def expand_env_variables(lines_enum):
# type: (ReqFileLines) -> ReqFileLines
def expand_env_variables(lines_enum: ReqFileLines) -> ReqFileLines:
"""Replace all environment variables that can be retrieved via `os.getenv`.
The only allowed format for environment variables defined in the
@ -521,8 +501,7 @@ def expand_env_variables(lines_enum):
yield line_number, line
def get_file_content(url, session):
# type: (str, PipSession) -> Tuple[str, str]
def get_file_content(url: str, session: PipSession) -> Tuple[str, str]:
"""Gets the content of a file; it may be a filename, file: URL, or
http: URL. Returns (location, content). Content is unicode.
Respects # -*- coding: declarations on the retrieved files.
@ -533,15 +512,15 @@ def get_file_content(url, session):
scheme = get_url_scheme(url)
# Pip has special support for file:// URLs (LocalFSAdapter).
if scheme in ['http', 'https', 'file']:
if scheme in ["http", "https", "file"]:
resp = session.get(url)
raise_for_status(resp)
return resp.url, resp.text
# Assume this is a bare path.
try:
with open(url, 'rb') as f:
with open(url, "rb") as f:
content = auto_decode(f.read())
except OSError as exc:
raise InstallationError(f'Could not open requirements file: {exc}')
raise InstallationError(f"Could not open requirements file: {exc}")
return url, content

View File

@ -57,8 +57,7 @@ from pip._internal.vcs import vcs
logger = logging.getLogger(__name__)
def _get_dist(metadata_directory):
# type: (str) -> Distribution
def _get_dist(metadata_directory: str) -> Distribution:
"""Return a pkg_resources.Distribution for the provided
metadata directory.
"""
@ -93,40 +92,37 @@ class InstallRequirement:
def __init__(
self,
req, # type: Optional[Requirement]
comes_from, # type: Optional[Union[str, InstallRequirement]]
editable=False, # type: bool
link=None, # type: Optional[Link]
markers=None, # type: Optional[Marker]
use_pep517=None, # type: Optional[bool]
isolated=False, # type: bool
install_options=None, # type: Optional[List[str]]
global_options=None, # type: Optional[List[str]]
hash_options=None, # type: Optional[Dict[str, List[str]]]
constraint=False, # type: bool
extras=(), # type: Iterable[str]
user_supplied=False, # type: bool
):
# type: (...) -> None
req: Optional[Requirement],
comes_from: Optional[Union[str, "InstallRequirement"]],
editable: bool = False,
link: Optional[Link] = None,
markers: Optional[Marker] = None,
use_pep517: Optional[bool] = None,
isolated: bool = False,
install_options: Optional[List[str]] = None,
global_options: Optional[List[str]] = None,
hash_options: Optional[Dict[str, List[str]]] = None,
constraint: bool = False,
extras: Iterable[str] = (),
user_supplied: bool = False,
) -> None:
assert req is None or isinstance(req, Requirement), req
self.req = req
self.comes_from = comes_from
self.constraint = constraint
self.editable = editable
self.legacy_install_reason = None # type: Optional[int]
self.legacy_install_reason: Optional[int] = None
# source_dir is the local directory where the linked requirement is
# located, or unpacked. In case unpacking is needed, creating and
# populating source_dir is done by the RequirementPreparer. Note this
# is not necessarily the directory where pyproject.toml or setup.py is
# located - that one is obtained via unpacked_source_directory.
self.source_dir = None # type: Optional[str]
self.source_dir: Optional[str] = None
if self.editable:
assert link
if link.is_file:
self.source_dir = os.path.normpath(
os.path.abspath(link.file_path)
)
self.source_dir = os.path.normpath(os.path.abspath(link.file_path))
if link is None and req and req.url:
# PEP 508 URL requirement
@ -135,16 +131,14 @@ class InstallRequirement:
self.original_link_is_in_wheel_cache = False
# Path to any downloaded or already-existing package.
self.local_file_path = None # type: Optional[str]
self.local_file_path: Optional[str] = None
if self.link and self.link.is_file:
self.local_file_path = self.link.file_path
if extras:
self.extras = extras
elif req:
self.extras = {
pkg_resources.safe_extra(extra) for extra in req.extras
}
self.extras = {pkg_resources.safe_extra(extra) for extra in req.extras}
else:
self.extras = set()
if markers is None and req:
@ -153,14 +147,14 @@ class InstallRequirement:
# This holds the pkg_resources.Distribution object if this requirement
# is already available:
self.satisfied_by = None # type: Optional[Distribution]
self.satisfied_by: Optional[Distribution] = None
# Whether the installation process should try to uninstall an existing
# distribution before installing this requirement.
self.should_reinstall = False
# Temporary build location
self._temp_build_dir = None # type: Optional[TempDirectory]
self._temp_build_dir: Optional[TempDirectory] = None
# Set to True after successful installation
self.install_succeeded = None # type: Optional[bool]
self.install_succeeded: Optional[bool] = None
# Supplied options
self.install_options = install_options if install_options else []
self.global_options = global_options if global_options else []
@ -173,22 +167,22 @@ class InstallRequirement:
self.user_supplied = user_supplied
self.isolated = isolated
self.build_env = NoOpBuildEnvironment() # type: BuildEnvironment
self.build_env: BuildEnvironment = NoOpBuildEnvironment()
# For PEP 517, the directory where we request the project metadata
# gets stored. We need this to pass to build_wheel, so the backend
# can ensure that the wheel matches the metadata (see the PEP for
# details).
self.metadata_directory = None # type: Optional[str]
self.metadata_directory: Optional[str] = None
# The static build requirements (from pyproject.toml)
self.pyproject_requires = None # type: Optional[List[str]]
self.pyproject_requires: Optional[List[str]] = None
# Build requirements that we will check are available
self.requirements_to_check = [] # type: List[str]
self.requirements_to_check: List[str] = []
# The PEP 517 backend we should use to build the project
self.pep517_backend = None # type: Optional[Pep517HookCaller]
self.pep517_backend: Optional[Pep517HookCaller] = None
# Are we using PEP 517 for this requirement?
# After pyproject.toml has been loaded, the only valid values are True
@ -200,87 +194,76 @@ class InstallRequirement:
# This requirement needs more preparation before it can be built
self.needs_more_preparation = False
def __str__(self):
# type: () -> str
def __str__(self) -> str:
if self.req:
s = str(self.req)
if self.link:
s += ' from {}'.format(redact_auth_from_url(self.link.url))
s += " from {}".format(redact_auth_from_url(self.link.url))
elif self.link:
s = redact_auth_from_url(self.link.url)
else:
s = '<InstallRequirement>'
s = "<InstallRequirement>"
if self.satisfied_by is not None:
s += ' in {}'.format(display_path(self.satisfied_by.location))
s += " in {}".format(display_path(self.satisfied_by.location))
if self.comes_from:
if isinstance(self.comes_from, str):
comes_from = self.comes_from # type: Optional[str]
comes_from: Optional[str] = self.comes_from
else:
comes_from = self.comes_from.from_path()
if comes_from:
s += f' (from {comes_from})'
s += f" (from {comes_from})"
return s
def __repr__(self):
# type: () -> str
return '<{} object: {} editable={!r}>'.format(
self.__class__.__name__, str(self), self.editable)
def __repr__(self) -> str:
return "<{} object: {} editable={!r}>".format(
self.__class__.__name__, str(self), self.editable
)
def format_debug(self):
# type: () -> str
"""An un-tested helper for getting state, for debugging.
"""
def format_debug(self) -> str:
"""An un-tested helper for getting state, for debugging."""
attributes = vars(self)
names = sorted(attributes)
state = (
"{}={!r}".format(attr, attributes[attr]) for attr in sorted(names)
)
return '<{name} object: {{{state}}}>'.format(
state = ("{}={!r}".format(attr, attributes[attr]) for attr in sorted(names))
return "<{name} object: {{{state}}}>".format(
name=self.__class__.__name__,
state=", ".join(state),
)
# Things that are valid for all kinds of requirements?
@property
def name(self):
# type: () -> Optional[str]
def name(self) -> Optional[str]:
if self.req is None:
return None
return pkg_resources.safe_name(self.req.name)
@property
def specifier(self):
# type: () -> SpecifierSet
def specifier(self) -> SpecifierSet:
return self.req.specifier
@property
def is_pinned(self):
# type: () -> bool
def is_pinned(self) -> bool:
"""Return whether I am pinned to an exact version.
For example, some-package==1.2 is pinned; some-package>1.2 is not.
"""
specifiers = self.specifier
return (len(specifiers) == 1 and
next(iter(specifiers)).operator in {'==', '==='})
return len(specifiers) == 1 and next(iter(specifiers)).operator in {"==", "==="}
def match_markers(self, extras_requested=None):
# type: (Optional[Iterable[str]]) -> bool
def match_markers(self, extras_requested: Optional[Iterable[str]] = None) -> bool:
if not extras_requested:
# Provide an extra to safely evaluate the markers
# without matching any extra
extras_requested = ('',)
extras_requested = ("",)
if self.markers is not None:
return any(
self.markers.evaluate({'extra': extra})
for extra in extras_requested)
self.markers.evaluate({"extra": extra}) for extra in extras_requested
)
else:
return True
@property
def has_hash_options(self):
# type: () -> bool
def has_hash_options(self) -> bool:
"""Return whether any known-good hashes are specified as options.
These activate --require-hashes mode; hashes specified as part of a
@ -289,8 +272,7 @@ class InstallRequirement:
"""
return bool(self.hash_options)
def hashes(self, trust_internet=True):
# type: (bool) -> Hashes
def hashes(self, trust_internet: bool = True) -> Hashes:
"""Return a hash-comparer that considers my option- and URL-based
hashes to be known-good.
@ -311,10 +293,8 @@ class InstallRequirement:
good_hashes.setdefault(link.hash_name, []).append(link.hash)
return Hashes(good_hashes)
def from_path(self):
# type: () -> Optional[str]
"""Format a nice indicator to show where this "comes from"
"""
def from_path(self) -> Optional[str]:
"""Format a nice indicator to show where this "comes from" """
if self.req is None:
return None
s = str(self.req)
@ -324,11 +304,12 @@ class InstallRequirement:
else:
comes_from = self.comes_from.from_path()
if comes_from:
s += '->' + comes_from
s += "->" + comes_from
return s
def ensure_build_location(self, build_dir, autodelete, parallel_builds):
# type: (str, bool, bool) -> str
def ensure_build_location(
self, build_dir: str, autodelete: bool, parallel_builds: bool
) -> str:
assert build_dir is not None
if self._temp_build_dir is not None:
assert self._temp_build_dir.path
@ -349,14 +330,14 @@ class InstallRequirement:
# When parallel builds are enabled, add a UUID to the build directory
# name so multiple builds do not interfere with each other.
dir_name = canonicalize_name(self.name) # type: str
dir_name: str = canonicalize_name(self.name)
if parallel_builds:
dir_name = f"{dir_name}_{uuid.uuid4().hex}"
# FIXME: Is there a better place to create the build_dir? (hg and bzr
# need this)
if not os.path.exists(build_dir):
logger.debug('Creating directory %s', build_dir)
logger.debug("Creating directory %s", build_dir)
os.makedirs(build_dir)
actual_build_dir = os.path.join(build_dir, dir_name)
# `None` indicates that we respect the globally-configured deletion
@ -369,10 +350,8 @@ class InstallRequirement:
globally_managed=True,
).path
def _set_requirement(self):
# type: () -> None
"""Set requirement after generating metadata.
"""
def _set_requirement(self) -> None:
"""Set requirement after generating metadata."""
assert self.req is None
assert self.metadata is not None
assert self.source_dir is not None
@ -384,15 +363,16 @@ class InstallRequirement:
op = "==="
self.req = Requirement(
"".join([
self.metadata["Name"],
op,
self.metadata["Version"],
])
"".join(
[
self.metadata["Name"],
op,
self.metadata["Version"],
]
)
)
def warn_on_mismatching_name(self):
# type: () -> None
def warn_on_mismatching_name(self) -> None:
metadata_name = canonicalize_name(self.metadata["Name"])
if canonicalize_name(self.req.name) == metadata_name:
# Everything is fine.
@ -400,15 +380,16 @@ class InstallRequirement:
# If we're here, there's a mismatch. Log a warning about it.
logger.warning(
'Generating metadata for package %s '
'produced metadata for project name %s. Fix your '
'#egg=%s fragments.',
self.name, metadata_name, self.name
"Generating metadata for package %s "
"produced metadata for project name %s. Fix your "
"#egg=%s fragments.",
self.name,
metadata_name,
self.name,
)
self.req = Requirement(metadata_name)
def check_if_exists(self, use_user_site):
# type: (bool) -> None
def check_if_exists(self, use_user_site: bool) -> None:
"""Find an installed distribution that satisfies or conflicts
with this requirement, and set self.satisfied_by or
self.should_reinstall appropriately.
@ -425,20 +406,22 @@ class InstallRequirement:
# parses the version instead.
existing_version = existing_dist.version
version_compatible = (
existing_version is not None and
self.req.specifier.contains(existing_version, prereleases=True)
existing_version is not None
and self.req.specifier.contains(existing_version, prereleases=True)
)
if not version_compatible:
self.satisfied_by = None
if use_user_site:
if dist_in_usersite(existing_dist):
self.should_reinstall = True
elif (running_under_virtualenv() and
dist_in_site_packages(existing_dist)):
elif running_under_virtualenv() and dist_in_site_packages(
existing_dist
):
raise InstallationError(
"Will not install to the user site because it will "
"lack sys.path precedence to {} in {}".format(
existing_dist.project_name, existing_dist.location)
existing_dist.project_name, existing_dist.location
)
)
else:
self.should_reinstall = True
@ -453,36 +436,31 @@ class InstallRequirement:
# Things valid for wheels
@property
def is_wheel(self):
# type: () -> bool
def is_wheel(self) -> bool:
if not self.link:
return False
return self.link.is_wheel
# Things valid for sdists
@property
def unpacked_source_directory(self):
# type: () -> str
def unpacked_source_directory(self) -> str:
return os.path.join(
self.source_dir,
self.link and self.link.subdirectory_fragment or '')
self.source_dir, self.link and self.link.subdirectory_fragment or ""
)
@property
def setup_py_path(self):
# type: () -> str
def setup_py_path(self) -> str:
assert self.source_dir, f"No source dir for {self}"
setup_py = os.path.join(self.unpacked_source_directory, 'setup.py')
setup_py = os.path.join(self.unpacked_source_directory, "setup.py")
return setup_py
@property
def pyproject_toml_path(self):
# type: () -> str
def pyproject_toml_path(self) -> str:
assert self.source_dir, f"No source dir for {self}"
return make_pyproject_path(self.unpacked_source_directory)
def load_pyproject_toml(self):
# type: () -> None
def load_pyproject_toml(self) -> None:
"""Load the pyproject.toml file.
After calling this routine, all of the attributes related to PEP 517
@ -491,10 +469,7 @@ class InstallRequirement:
follow the PEP 517 or legacy (setup.py) code path.
"""
pyproject_toml_data = load_pyproject_toml(
self.use_pep517,
self.pyproject_toml_path,
self.setup_py_path,
str(self)
self.use_pep517, self.pyproject_toml_path, self.setup_py_path, str(self)
)
if pyproject_toml_data is None:
@ -506,13 +481,13 @@ class InstallRequirement:
self.requirements_to_check = check
self.pyproject_requires = requires
self.pep517_backend = Pep517HookCaller(
self.unpacked_source_directory, backend, backend_path=backend_path,
self.unpacked_source_directory,
backend,
backend_path=backend_path,
)
def _generate_metadata(self):
# type: () -> str
"""Invokes metadata generator functions, with the required arguments.
"""
def _generate_metadata(self) -> str:
"""Invokes metadata generator functions, with the required arguments."""
if not self.use_pep517:
assert self.unpacked_source_directory
@ -526,7 +501,7 @@ class InstallRequirement:
setup_py_path=self.setup_py_path,
source_dir=self.unpacked_source_directory,
isolated=self.isolated,
details=self.name or f"from {self.link}"
details=self.name or f"from {self.link}",
)
assert self.pep517_backend is not None
@ -536,8 +511,7 @@ class InstallRequirement:
backend=self.pep517_backend,
)
def prepare_metadata(self):
# type: () -> None
def prepare_metadata(self) -> None:
"""Ensure that project metadata is available.
Under PEP 517, call the backend hook to prepare the metadata.
@ -557,30 +531,27 @@ class InstallRequirement:
self.assert_source_matches_version()
@property
def metadata(self):
# type: () -> Any
if not hasattr(self, '_metadata'):
def metadata(self) -> Any:
if not hasattr(self, "_metadata"):
self._metadata = get_metadata(self.get_dist())
return self._metadata
def get_dist(self):
# type: () -> Distribution
def get_dist(self) -> Distribution:
return _get_dist(self.metadata_directory)
def assert_source_matches_version(self):
# type: () -> None
def assert_source_matches_version(self) -> None:
assert self.source_dir
version = self.metadata['version']
version = self.metadata["version"]
if self.req.specifier and version not in self.req.specifier:
logger.warning(
'Requested %s, but installing version %s',
"Requested %s, but installing version %s",
self,
version,
)
else:
logger.debug(
'Source in %s has version %s, which satisfies requirement %s',
"Source in %s has version %s, which satisfies requirement %s",
display_path(self.source_dir),
version,
self,
@ -589,11 +560,10 @@ class InstallRequirement:
# For both source distributions and editables
def ensure_has_source_dir(
self,
parent_dir,
autodelete=False,
parallel_builds=False,
):
# type: (str, bool, bool) -> None
parent_dir: str,
autodelete: bool = False,
parallel_builds: bool = False,
) -> None:
"""Ensure that a source_dir is set.
This will create a temporary build dir if the name of the requirement
@ -611,18 +581,16 @@ class InstallRequirement:
)
# For editable installations
def update_editable(self):
# type: () -> None
def update_editable(self) -> None:
if not self.link:
logger.debug(
"Cannot update repository at %s; repository location is "
"unknown",
"Cannot update repository at %s; repository location is unknown",
self.source_dir,
)
return
assert self.editable
assert self.source_dir
if self.link.scheme == 'file':
if self.link.scheme == "file":
# Static paths don't get updated
return
vcs_backend = vcs.get_backend_for_scheme(self.link.scheme)
@ -633,8 +601,9 @@ class InstallRequirement:
vcs_backend.obtain(self.source_dir, url=hidden_url)
# Top-level Actions
def uninstall(self, auto_confirm=False, verbose=False):
# type: (bool, bool) -> Optional[UninstallPathSet]
def uninstall(
self, auto_confirm: bool = False, verbose: bool = False
) -> Optional[UninstallPathSet]:
"""
Uninstall the distribution currently satisfying this requirement.
@ -652,30 +621,26 @@ class InstallRequirement:
if not dist:
logger.warning("Skipping %s as it is not installed.", self.name)
return None
logger.info('Found existing installation: %s', dist)
logger.info("Found existing installation: %s", dist)
uninstalled_pathset = UninstallPathSet.from_dist(dist)
uninstalled_pathset.remove(auto_confirm, verbose)
return uninstalled_pathset
def _get_archive_name(self, path, parentdir, rootdir):
# type: (str, str, str) -> str
def _clean_zip_name(name, prefix):
# type: (str, str) -> str
assert name.startswith(prefix + os.path.sep), (
f"name {name!r} doesn't start with prefix {prefix!r}"
)
name = name[len(prefix) + 1:]
name = name.replace(os.path.sep, '/')
def _get_archive_name(self, path: str, parentdir: str, rootdir: str) -> str:
def _clean_zip_name(name: str, prefix: str) -> str:
assert name.startswith(
prefix + os.path.sep
), f"name {name!r} doesn't start with prefix {prefix!r}"
name = name[len(prefix) + 1 :]
name = name.replace(os.path.sep, "/")
return name
path = os.path.join(parentdir, path)
name = _clean_zip_name(path, rootdir)
return self.name + '/' + name
return self.name + "/" + name
def archive(self, build_dir):
# type: (Optional[str]) -> None
def archive(self, build_dir: Optional[str]) -> None:
"""Saves archive to provided build_dir.
Used for saving downloaded VCS requirements as part of `pip download`.
@ -685,70 +650,74 @@ class InstallRequirement:
return
create_archive = True
archive_name = '{}-{}.zip'.format(self.name, self.metadata["version"])
archive_name = "{}-{}.zip".format(self.name, self.metadata["version"])
archive_path = os.path.join(build_dir, archive_name)
if os.path.exists(archive_path):
response = ask_path_exists(
'The file {} exists. (i)gnore, (w)ipe, '
'(b)ackup, (a)bort '.format(
display_path(archive_path)),
('i', 'w', 'b', 'a'))
if response == 'i':
"The file {} exists. (i)gnore, (w)ipe, "
"(b)ackup, (a)bort ".format(display_path(archive_path)),
("i", "w", "b", "a"),
)
if response == "i":
create_archive = False
elif response == 'w':
logger.warning('Deleting %s', display_path(archive_path))
elif response == "w":
logger.warning("Deleting %s", display_path(archive_path))
os.remove(archive_path)
elif response == 'b':
elif response == "b":
dest_file = backup_dir(archive_path)
logger.warning(
'Backing up %s to %s',
"Backing up %s to %s",
display_path(archive_path),
display_path(dest_file),
)
shutil.move(archive_path, dest_file)
elif response == 'a':
elif response == "a":
sys.exit(-1)
if not create_archive:
return
zip_output = zipfile.ZipFile(
archive_path, 'w', zipfile.ZIP_DEFLATED, allowZip64=True,
archive_path,
"w",
zipfile.ZIP_DEFLATED,
allowZip64=True,
)
with zip_output:
dir = os.path.normcase(
os.path.abspath(self.unpacked_source_directory)
)
dir = os.path.normcase(os.path.abspath(self.unpacked_source_directory))
for dirpath, dirnames, filenames in os.walk(dir):
for dirname in dirnames:
dir_arcname = self._get_archive_name(
dirname, parentdir=dirpath, rootdir=dir,
dirname,
parentdir=dirpath,
rootdir=dir,
)
zipdir = zipfile.ZipInfo(dir_arcname + '/')
zipdir = zipfile.ZipInfo(dir_arcname + "/")
zipdir.external_attr = 0x1ED << 16 # 0o755
zip_output.writestr(zipdir, '')
zip_output.writestr(zipdir, "")
for filename in filenames:
file_arcname = self._get_archive_name(
filename, parentdir=dirpath, rootdir=dir,
filename,
parentdir=dirpath,
rootdir=dir,
)
filename = os.path.join(dirpath, filename)
zip_output.write(filename, file_arcname)
logger.info('Saved %s', display_path(archive_path))
logger.info("Saved %s", display_path(archive_path))
def install(
self,
install_options, # type: List[str]
global_options=None, # type: Optional[Sequence[str]]
root=None, # type: Optional[str]
home=None, # type: Optional[str]
prefix=None, # type: Optional[str]
warn_script_location=True, # type: bool
use_user_site=False, # type: bool
pycompile=True # type: bool
):
# type: (...) -> None
install_options: List[str],
global_options: Optional[Sequence[str]] = None,
root: Optional[str] = None,
home: Optional[str] = None,
prefix: Optional[str] = None,
warn_script_location: bool = True,
use_user_site: bool = False,
pycompile: bool = True,
) -> None:
scheme = get_scheme(
self.name,
user=use_user_site,
@ -837,8 +806,9 @@ class InstallRequirement:
deprecated(
reason=(
"{} was installed using the legacy 'setup.py install' "
"method, because a wheel could not be built for it.".
format(self.name)
"method, because a wheel could not be built for it.".format(
self.name
)
),
replacement="to fix the wheel build issue reported above",
gone_in=None,
@ -846,8 +816,7 @@ class InstallRequirement:
)
def check_invalid_constraint_type(req):
# type: (InstallRequirement) -> str
def check_invalid_constraint_type(req: InstallRequirement) -> str:
# Check for unsupported forms
problem = ""
@ -867,12 +836,10 @@ def check_invalid_constraint_type(req):
"undocumented. The new implementation of the resolver no "
"longer supports these forms."
),
replacement=(
"replacing the constraint with a requirement."
),
replacement="replacing the constraint with a requirement.",
# No plan yet for when the new resolver becomes default
gone_in=None,
issue=8210
issue=8210,
)
return problem

View File

@ -13,46 +13,39 @@ logger = logging.getLogger(__name__)
class RequirementSet:
def __init__(self, check_supported_wheels: bool = True) -> None:
"""Create a RequirementSet."""
def __init__(self, check_supported_wheels=True):
# type: (bool) -> None
"""Create a RequirementSet.
"""
self.requirements = OrderedDict() # type: Dict[str, InstallRequirement]
self.requirements: Dict[str, InstallRequirement] = OrderedDict()
self.check_supported_wheels = check_supported_wheels
self.unnamed_requirements = [] # type: List[InstallRequirement]
self.unnamed_requirements: List[InstallRequirement] = []
def __str__(self):
# type: () -> str
def __str__(self) -> str:
requirements = sorted(
(req for req in self.requirements.values() if not req.comes_from),
key=lambda req: canonicalize_name(req.name or ""),
)
return ' '.join(str(req.req) for req in requirements)
return " ".join(str(req.req) for req in requirements)
def __repr__(self):
# type: () -> str
def __repr__(self) -> str:
requirements = sorted(
self.requirements.values(),
key=lambda req: canonicalize_name(req.name or ""),
)
format_string = '<{classname} object; {count} requirement(s): {reqs}>'
format_string = "<{classname} object; {count} requirement(s): {reqs}>"
return format_string.format(
classname=self.__class__.__name__,
count=len(requirements),
reqs=', '.join(str(req.req) for req in requirements),
reqs=", ".join(str(req.req) for req in requirements),
)
def add_unnamed_requirement(self, install_req):
# type: (InstallRequirement) -> None
def add_unnamed_requirement(self, install_req: InstallRequirement) -> None:
assert not install_req.name
self.unnamed_requirements.append(install_req)
def add_named_requirement(self, install_req):
# type: (InstallRequirement) -> None
def add_named_requirement(self, install_req: InstallRequirement) -> None:
assert install_req.name
project_name = canonicalize_name(install_req.name)
@ -60,11 +53,10 @@ class RequirementSet:
def add_requirement(
self,
install_req, # type: InstallRequirement
parent_req_name=None, # type: Optional[str]
extras_requested=None # type: Optional[Iterable[str]]
):
# type: (...) -> Tuple[List[InstallRequirement], Optional[InstallRequirement]]
install_req: InstallRequirement,
parent_req_name: Optional[str] = None,
extras_requested: Optional[Iterable[str]] = None,
) -> Tuple[List[InstallRequirement], Optional[InstallRequirement]]:
"""Add install_req as a requirement to install.
:param parent_req_name: The name of the requirement that needed this
@ -83,7 +75,8 @@ class RequirementSet:
if not install_req.match_markers(extras_requested):
logger.info(
"Ignoring %s: markers '%s' don't match your environment",
install_req.name, install_req.markers,
install_req.name,
install_req.markers,
)
return [], None
@ -94,16 +87,17 @@ class RequirementSet:
if install_req.link and install_req.link.is_wheel:
wheel = Wheel(install_req.link.filename)
tags = compatibility_tags.get_supported()
if (self.check_supported_wheels and not wheel.supported(tags)):
if self.check_supported_wheels and not wheel.supported(tags):
raise InstallationError(
"{} is not a supported wheel on this platform.".format(
wheel.filename)
wheel.filename
)
)
# This next bit is really a sanity check.
assert not install_req.user_supplied or parent_req_name is None, (
"a user supplied req shouldn't have a parent"
)
assert (
not install_req.user_supplied or parent_req_name is None
), "a user supplied req shouldn't have a parent"
# Unnamed requirements are scanned again and the requirement won't be
# added as a dependency until after scanning.
@ -112,24 +106,26 @@ class RequirementSet:
return [install_req], None
try:
existing_req = self.get_requirement(
install_req.name) # type: Optional[InstallRequirement]
existing_req: Optional[InstallRequirement] = self.get_requirement(
install_req.name
)
except KeyError:
existing_req = None
has_conflicting_requirement = (
parent_req_name is None and
existing_req and
not existing_req.constraint and
existing_req.extras == install_req.extras and
existing_req.req and
install_req.req and
existing_req.req.specifier != install_req.req.specifier
parent_req_name is None
and existing_req
and not existing_req.constraint
and existing_req.extras == install_req.extras
and existing_req.req
and install_req.req
and existing_req.req.specifier != install_req.req.specifier
)
if has_conflicting_requirement:
raise InstallationError(
"Double requirement given: {} (already in {}, name={!r})"
.format(install_req, existing_req, install_req.name)
"Double requirement given: {} (already in {}, name={!r})".format(
install_req, existing_req, install_req.name
)
)
# When no existing requirement exists, add the requirement as a
@ -144,12 +140,8 @@ class RequirementSet:
if install_req.constraint or not existing_req.constraint:
return [], existing_req
does_not_satisfy_constraint = (
install_req.link and
not (
existing_req.link and
install_req.link.path == existing_req.link.path
)
does_not_satisfy_constraint = install_req.link and not (
existing_req.link and install_req.link.path == existing_req.link.path
)
if does_not_satisfy_constraint:
raise InstallationError(
@ -164,28 +156,27 @@ class RequirementSet:
# mark the existing object as such.
if install_req.user_supplied:
existing_req.user_supplied = True
existing_req.extras = tuple(sorted(
set(existing_req.extras) | set(install_req.extras)
))
existing_req.extras = tuple(
sorted(set(existing_req.extras) | set(install_req.extras))
)
logger.debug(
"Setting %s extras to: %s",
existing_req, existing_req.extras,
existing_req,
existing_req.extras,
)
# Return the existing requirement for addition to the parent and
# scanning again.
return [existing_req], existing_req
def has_requirement(self, name):
# type: (str) -> bool
def has_requirement(self, name: str) -> bool:
project_name = canonicalize_name(name)
return (
project_name in self.requirements and
not self.requirements[project_name].constraint
project_name in self.requirements
and not self.requirements[project_name].constraint
)
def get_requirement(self, name):
# type: (str) -> InstallRequirement
def get_requirement(self, name: str) -> InstallRequirement:
project_name = canonicalize_name(name)
if project_name in self.requirements:
@ -194,6 +185,5 @@ class RequirementSet:
raise KeyError(f"No project with the name {name!r}")
@property
def all_requirements(self):
# type: () -> List[InstallRequirement]
def all_requirements(self) -> List[InstallRequirement]:
return self.unnamed_requirements + list(self.requirements.values())

View File

@ -13,13 +13,12 @@ logger = logging.getLogger(__name__)
@contextlib.contextmanager
def update_env_context_manager(**changes):
# type: (str) -> Iterator[None]
def update_env_context_manager(**changes: str) -> Iterator[None]:
target = os.environ
# Save values from the target and change them.
non_existent_marker = object()
saved_values = {} # type: Dict[str, Union[object, str]]
saved_values: Dict[str, Union[object, str]] = {}
for name, new_value in changes.items():
try:
saved_values[name] = target[name]
@ -40,14 +39,11 @@ def update_env_context_manager(**changes):
@contextlib.contextmanager
def get_requirement_tracker():
# type: () -> Iterator[RequirementTracker]
root = os.environ.get('PIP_REQ_TRACKER')
def get_requirement_tracker() -> Iterator["RequirementTracker"]:
root = os.environ.get("PIP_REQ_TRACKER")
with contextlib.ExitStack() as ctx:
if root is None:
root = ctx.enter_context(
TempDirectory(kind='req-tracker')
).path
root = ctx.enter_context(TempDirectory(kind="req-tracker")).path
ctx.enter_context(update_env_context_manager(PIP_REQ_TRACKER=root))
logger.debug("Initialized build tracking at %s", root)
@ -56,36 +52,29 @@ def get_requirement_tracker():
class RequirementTracker:
def __init__(self, root):
# type: (str) -> None
def __init__(self, root: str) -> None:
self._root = root
self._entries = set() # type: Set[InstallRequirement]
self._entries: Set[InstallRequirement] = set()
logger.debug("Created build tracker: %s", self._root)
def __enter__(self):
# type: () -> RequirementTracker
def __enter__(self) -> "RequirementTracker":
logger.debug("Entered build tracker: %s", self._root)
return self
def __exit__(
self,
exc_type, # type: Optional[Type[BaseException]]
exc_val, # type: Optional[BaseException]
exc_tb # type: Optional[TracebackType]
):
# type: (...) -> None
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self.cleanup()
def _entry_path(self, link):
# type: (Link) -> str
def _entry_path(self, link: Link) -> str:
hashed = hashlib.sha224(link.url_without_fragment.encode()).hexdigest()
return os.path.join(self._root, hashed)
def add(self, req):
# type: (InstallRequirement) -> None
"""Add an InstallRequirement to build tracking.
"""
def add(self, req: InstallRequirement) -> None:
"""Add an InstallRequirement to build tracking."""
assert req.link
# Get the file to write information about this requirement.
@ -99,42 +88,37 @@ class RequirementTracker:
except FileNotFoundError:
pass
else:
message = '{} is already being built: {}'.format(
req.link, contents)
message = "{} is already being built: {}".format(req.link, contents)
raise LookupError(message)
# If we're here, req should really not be building already.
assert req not in self._entries
# Start tracking this requirement.
with open(entry_path, 'w', encoding="utf-8") as fp:
with open(entry_path, "w", encoding="utf-8") as fp:
fp.write(str(req))
self._entries.add(req)
logger.debug('Added %s to build tracker %r', req, self._root)
logger.debug("Added %s to build tracker %r", req, self._root)
def remove(self, req):
# type: (InstallRequirement) -> None
"""Remove an InstallRequirement from build tracking.
"""
def remove(self, req: InstallRequirement) -> None:
"""Remove an InstallRequirement from build tracking."""
assert req.link
# Delete the created file and the corresponding entries.
os.unlink(self._entry_path(req.link))
self._entries.remove(req)
logger.debug('Removed %s from build tracker %r', req, self._root)
logger.debug("Removed %s from build tracker %r", req, self._root)
def cleanup(self):
# type: () -> None
def cleanup(self) -> None:
for req in set(self._entries):
self.remove(req)
logger.debug("Removed build tracker: %r", self._root)
@contextlib.contextmanager
def track(self, req):
# type: (InstallRequirement) -> Iterator[None]
def track(self, req: InstallRequirement) -> Iterator[None]:
self.add(req)
yield
self.remove(req)

View File

@ -28,8 +28,7 @@ from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory
logger = getLogger(__name__)
def _script_names(dist, script_name, is_gui):
# type: (Distribution, str, bool) -> List[str]
def _script_names(dist: Distribution, script_name: str, is_gui: bool) -> List[str]:
"""Create the fully qualified name of the files created by
{console,gui}_scripts for the given ``dist``.
Returns the list of file names
@ -41,31 +40,29 @@ def _script_names(dist, script_name, is_gui):
exe_name = os.path.join(bin_dir, script_name)
paths_to_remove = [exe_name]
if WINDOWS:
paths_to_remove.append(exe_name + '.exe')
paths_to_remove.append(exe_name + '.exe.manifest')
paths_to_remove.append(exe_name + ".exe")
paths_to_remove.append(exe_name + ".exe.manifest")
if is_gui:
paths_to_remove.append(exe_name + '-script.pyw')
paths_to_remove.append(exe_name + "-script.pyw")
else:
paths_to_remove.append(exe_name + '-script.py')
paths_to_remove.append(exe_name + "-script.py")
return paths_to_remove
def _unique(fn):
# type: (Callable[..., Iterator[Any]]) -> Callable[..., Iterator[Any]]
def _unique(fn: Callable[..., Iterator[Any]]) -> Callable[..., Iterator[Any]]:
@functools.wraps(fn)
def unique(*args, **kw):
# type: (Any, Any) -> Iterator[Any]
seen = set() # type: Set[Any]
def unique(*args: Any, **kw: Any) -> Iterator[Any]:
seen: Set[Any] = set()
for item in fn(*args, **kw):
if item not in seen:
seen.add(item)
yield item
return unique
@_unique
def uninstallation_paths(dist):
# type: (Distribution) -> Iterator[str]
def uninstallation_paths(dist: Distribution) -> Iterator[str]:
"""
Yield all the uninstallation paths for dist based on RECORD-without-.py[co]
@ -80,45 +77,46 @@ def uninstallation_paths(dist):
https://packaging.python.org/specifications/recording-installed-packages/
"""
try:
r = csv.reader(dist.get_metadata_lines('RECORD'))
r = csv.reader(dist.get_metadata_lines("RECORD"))
except FileNotFoundError as missing_record_exception:
msg = 'Cannot uninstall {dist}, RECORD file not found.'.format(dist=dist)
msg = "Cannot uninstall {dist}, RECORD file not found.".format(dist=dist)
try:
installer = next(dist.get_metadata_lines('INSTALLER'))
if not installer or installer == 'pip':
installer = next(dist.get_metadata_lines("INSTALLER"))
if not installer or installer == "pip":
raise ValueError()
except (OSError, StopIteration, ValueError):
dep = '{}=={}'.format(dist.project_name, dist.version)
msg += (" You might be able to recover from this via: "
"'pip install --force-reinstall --no-deps {}'.".format(dep))
dep = "{}=={}".format(dist.project_name, dist.version)
msg += (
" You might be able to recover from this via: "
"'pip install --force-reinstall --no-deps {}'.".format(dep)
)
else:
msg += ' Hint: The package was installed by {}.'.format(installer)
msg += " Hint: The package was installed by {}.".format(installer)
raise UninstallationError(msg) from missing_record_exception
for row in r:
path = os.path.join(dist.location, row[0])
yield path
if path.endswith('.py'):
if path.endswith(".py"):
dn, fn = os.path.split(path)
base = fn[:-3]
path = os.path.join(dn, base + '.pyc')
path = os.path.join(dn, base + ".pyc")
yield path
path = os.path.join(dn, base + '.pyo')
path = os.path.join(dn, base + ".pyo")
yield path
def compact(paths):
# type: (Iterable[str]) -> Set[str]
def compact(paths: Iterable[str]) -> Set[str]:
"""Compact a path set to contain the minimal number of paths
necessary to contain all paths in the set. If /a/path/ and
/a/path/to/a/file.txt are both in the set, leave only the
shorter path."""
sep = os.path.sep
short_paths = set() # type: Set[str]
short_paths: Set[str] = set()
for path in sorted(paths, key=len):
should_skip = any(
path.startswith(shortpath.rstrip("*")) and
path[len(shortpath.rstrip("*").rstrip(sep))] == sep
path.startswith(shortpath.rstrip("*"))
and path[len(shortpath.rstrip("*").rstrip(sep))] == sep
for shortpath in short_paths
)
if not should_skip:
@ -126,8 +124,7 @@ def compact(paths):
return short_paths
def compress_for_rename(paths):
# type: (Iterable[str]) -> Set[str]
def compress_for_rename(paths: Iterable[str]) -> Set[str]:
"""Returns a set containing the paths that need to be renamed.
This set may include directories when the original sequence of paths
@ -136,25 +133,21 @@ def compress_for_rename(paths):
case_map = {os.path.normcase(p): p for p in paths}
remaining = set(case_map)
unchecked = sorted({os.path.split(p)[0] for p in case_map.values()}, key=len)
wildcards = set() # type: Set[str]
wildcards: Set[str] = set()
def norm_join(*a):
# type: (str) -> str
def norm_join(*a: str) -> str:
return os.path.normcase(os.path.join(*a))
for root in unchecked:
if any(os.path.normcase(root).startswith(w)
for w in wildcards):
if any(os.path.normcase(root).startswith(w) for w in wildcards):
# This directory has already been handled.
continue
all_files = set() # type: Set[str]
all_subdirs = set() # type: Set[str]
all_files: Set[str] = set()
all_subdirs: Set[str] = set()
for dirname, subdirs, files in os.walk(root):
all_subdirs.update(norm_join(root, dirname, d)
for d in subdirs)
all_files.update(norm_join(root, dirname, f)
for f in files)
all_subdirs.update(norm_join(root, dirname, d) for d in subdirs)
all_files.update(norm_join(root, dirname, f) for f in files)
# If all the files we found are in our remaining set of files to
# remove, then remove them from the latter set and add a wildcard
# for the directory.
@ -165,8 +158,7 @@ def compress_for_rename(paths):
return set(map(case_map.__getitem__, remaining)) | wildcards
def compress_for_output_listing(paths):
# type: (Iterable[str]) -> Tuple[Set[str], Set[str]]
def compress_for_output_listing(paths: Iterable[str]) -> Tuple[Set[str], Set[str]]:
"""Returns a tuple of 2 sets of which paths to display to user
The first set contains paths that would be deleted. Files of a package
@ -204,14 +196,14 @@ def compress_for_output_listing(paths):
continue
file_ = os.path.join(dirpath, fname)
if (os.path.isfile(file_) and
os.path.normcase(file_) not in _normcased_files):
if (
os.path.isfile(file_)
and os.path.normcase(file_) not in _normcased_files
):
# We are skipping this file. Add it to the set.
will_skip.add(file_)
will_remove = files | {
os.path.join(folder, "*") for folder in folders
}
will_remove = files | {os.path.join(folder, "*") for folder in folders}
return will_remove, will_skip
@ -219,32 +211,30 @@ def compress_for_output_listing(paths):
class StashedUninstallPathSet:
"""A set of file rename operations to stash files while
tentatively uninstalling them."""
def __init__(self):
# type: () -> None
def __init__(self) -> None:
# Mapping from source file root to [Adjacent]TempDirectory
# for files under that directory.
self._save_dirs = {} # type: Dict[str, TempDirectory]
self._save_dirs: Dict[str, TempDirectory] = {}
# (old path, new path) tuples for each move that may need
# to be undone.
self._moves = [] # type: List[Tuple[str, str]]
self._moves: List[Tuple[str, str]] = []
def _get_directory_stash(self, path):
# type: (str) -> str
def _get_directory_stash(self, path: str) -> str:
"""Stashes a directory.
Directories are stashed adjacent to their original location if
possible, or else moved/copied into the user's temp dir."""
try:
save_dir = AdjacentTempDirectory(path) # type: TempDirectory
save_dir: TempDirectory = AdjacentTempDirectory(path)
except OSError:
save_dir = TempDirectory(kind="uninstall")
self._save_dirs[os.path.normcase(path)] = save_dir
return save_dir.path
def _get_file_stash(self, path):
# type: (str) -> str
def _get_file_stash(self, path: str) -> str:
"""Stashes a file.
If no root has been provided, one will be created for the directory
@ -263,7 +253,7 @@ class StashedUninstallPathSet:
else:
# Did not find any suitable root
head = os.path.dirname(path)
save_dir = TempDirectory(kind='uninstall')
save_dir = TempDirectory(kind="uninstall")
self._save_dirs[head] = save_dir
relpath = os.path.relpath(path, head)
@ -271,8 +261,7 @@ class StashedUninstallPathSet:
return os.path.join(save_dir.path, relpath)
return save_dir.path
def stash(self, path):
# type: (str) -> str
def stash(self, path: str) -> str:
"""Stashes the directory or file and returns its new location.
Handle symlinks as files to avoid modifying the symlink targets.
"""
@ -283,7 +272,7 @@ class StashedUninstallPathSet:
new_path = self._get_file_stash(path)
self._moves.append((path, new_path))
if (path_is_dir and os.path.isdir(new_path)):
if path_is_dir and os.path.isdir(new_path):
# If we're moving a directory, we need to
# remove the destination first or else it will be
# moved to inside the existing directory.
@ -293,23 +282,21 @@ class StashedUninstallPathSet:
renames(path, new_path)
return new_path
def commit(self):
# type: () -> None
def commit(self) -> None:
"""Commits the uninstall by removing stashed files."""
for _, save_dir in self._save_dirs.items():
save_dir.cleanup()
self._moves = []
self._save_dirs = {}
def rollback(self):
# type: () -> None
def rollback(self) -> None:
"""Undoes the uninstall by moving stashed files back."""
for p in self._moves:
logger.info("Moving to %s\n from %s", *p)
for new_path, path in self._moves:
try:
logger.debug('Replacing %s from %s', new_path, path)
logger.debug("Replacing %s from %s", new_path, path)
if os.path.isfile(new_path) or os.path.islink(new_path):
os.unlink(new_path)
elif os.path.isdir(new_path):
@ -322,24 +309,22 @@ class StashedUninstallPathSet:
self.commit()
@property
def can_rollback(self):
# type: () -> bool
def can_rollback(self) -> bool:
return bool(self._moves)
class UninstallPathSet:
"""A set of file paths to be removed in the uninstallation of a
requirement."""
def __init__(self, dist):
# type: (Distribution) -> None
self.paths = set() # type: Set[str]
self._refuse = set() # type: Set[str]
self.pth = {} # type: Dict[str, UninstallPthEntries]
def __init__(self, dist: Distribution) -> None:
self.paths: Set[str] = set()
self._refuse: Set[str] = set()
self.pth: Dict[str, UninstallPthEntries] = {}
self.dist = dist
self._moved_paths = StashedUninstallPathSet()
def _permitted(self, path):
# type: (str) -> bool
def _permitted(self, path: str) -> bool:
"""
Return True if the given path is one we are permitted to
remove/modify, False otherwise.
@ -347,8 +332,7 @@ class UninstallPathSet:
"""
return is_local(path)
def add(self, path):
# type: (str) -> None
def add(self, path: str) -> None:
head, tail = os.path.split(path)
# we normalize the head to resolve parent directory symlinks, but not
@ -364,11 +348,10 @@ class UninstallPathSet:
# __pycache__ files can show up after 'installed-files.txt' is created,
# due to imports
if os.path.splitext(path)[1] == '.py':
if os.path.splitext(path)[1] == ".py":
self.add(cache_from_source(path))
def add_pth(self, pth_file, entry):
# type: (str, str) -> None
def add_pth(self, pth_file: str, entry: str) -> None:
pth_file = normalize_path(pth_file)
if self._permitted(pth_file):
if pth_file not in self.pth:
@ -377,8 +360,7 @@ class UninstallPathSet:
else:
self._refuse.add(pth_file)
def remove(self, auto_confirm=False, verbose=False):
# type: (bool, bool) -> None
def remove(self, auto_confirm: bool = False, verbose: bool = False) -> None:
"""Remove paths in ``self.paths`` with confirmation (unless
``auto_confirm`` is True)."""
@ -389,10 +371,8 @@ class UninstallPathSet:
)
return
dist_name_version = (
self.dist.project_name + "-" + self.dist.version
)
logger.info('Uninstalling %s:', dist_name_version)
dist_name_version = self.dist.project_name + "-" + self.dist.version
logger.info("Uninstalling %s:", dist_name_version)
with indent_log():
if auto_confirm or self._allowed_to_proceed(verbose):
@ -402,20 +382,17 @@ class UninstallPathSet:
for path in sorted(compact(for_rename)):
moved.stash(path)
logger.verbose('Removing file or directory %s', path)
logger.verbose("Removing file or directory %s", path)
for pth in self.pth.values():
pth.remove()
logger.info('Successfully uninstalled %s', dist_name_version)
logger.info("Successfully uninstalled %s", dist_name_version)
def _allowed_to_proceed(self, verbose):
# type: (bool) -> bool
"""Display which files would be deleted and prompt for confirmation
"""
def _allowed_to_proceed(self, verbose: bool) -> bool:
"""Display which files would be deleted and prompt for confirmation"""
def _display(msg, paths):
# type: (str, Iterable[str]) -> None
def _display(msg: str, paths: Iterable[str]) -> None:
if not paths:
return
@ -432,16 +409,15 @@ class UninstallPathSet:
will_remove = set(self.paths)
will_skip = set()
_display('Would remove:', will_remove)
_display('Would not remove (might be manually added):', will_skip)
_display('Would not remove (outside of prefix):', self._refuse)
_display("Would remove:", will_remove)
_display("Would not remove (might be manually added):", will_skip)
_display("Would not remove (outside of prefix):", self._refuse)
if verbose:
_display('Will actually move:', compress_for_rename(self.paths))
_display("Will actually move:", compress_for_rename(self.paths))
return ask('Proceed (Y/n)? ', ('y', 'n', '')) != 'n'
return ask("Proceed (Y/n)? ", ("y", "n", "")) != "n"
def rollback(self):
# type: () -> None
def rollback(self) -> None:
"""Rollback the changes previously made by remove()."""
if not self._moved_paths.can_rollback:
logger.error(
@ -449,19 +425,17 @@ class UninstallPathSet:
self.dist.project_name,
)
return
logger.info('Rolling back uninstall of %s', self.dist.project_name)
logger.info("Rolling back uninstall of %s", self.dist.project_name)
self._moved_paths.rollback()
for pth in self.pth.values():
pth.rollback()
def commit(self):
# type: () -> None
def commit(self) -> None:
"""Remove temporary save dir: rollback will no longer be possible."""
self._moved_paths.commit()
@classmethod
def from_dist(cls, dist):
# type: (Distribution) -> UninstallPathSet
def from_dist(cls, dist: Distribution) -> "UninstallPathSet":
dist_path = normalize_path(dist.location)
if not dist_is_local(dist):
logger.info(
@ -472,9 +446,11 @@ class UninstallPathSet:
)
return cls(dist)
if dist_path in {p for p in {sysconfig.get_path("stdlib"),
sysconfig.get_path("platstdlib")}
if p}:
if dist_path in {
p
for p in {sysconfig.get_path("stdlib"), sysconfig.get_path("platstdlib")}
if p
}:
logger.info(
"Not uninstalling %s at %s, as it is in the standard library.",
dist.key,
@ -484,43 +460,47 @@ class UninstallPathSet:
paths_to_remove = cls(dist)
develop_egg_link = egg_link_path(dist)
develop_egg_link_egg_info = '{}.egg-info'.format(
pkg_resources.to_filename(dist.project_name))
develop_egg_link_egg_info = "{}.egg-info".format(
pkg_resources.to_filename(dist.project_name)
)
egg_info_exists = dist.egg_info and os.path.exists(dist.egg_info)
# Special case for distutils installed package
distutils_egg_info = getattr(dist._provider, 'path', None)
distutils_egg_info = getattr(dist._provider, "path", None)
# Uninstall cases order do matter as in the case of 2 installs of the
# same package, pip needs to uninstall the currently detected version
if (egg_info_exists and dist.egg_info.endswith('.egg-info') and
not dist.egg_info.endswith(develop_egg_link_egg_info)):
if (
egg_info_exists
and dist.egg_info.endswith(".egg-info")
and not dist.egg_info.endswith(develop_egg_link_egg_info)
):
# if dist.egg_info.endswith(develop_egg_link_egg_info), we
# are in fact in the develop_egg_link case
paths_to_remove.add(dist.egg_info)
if dist.has_metadata('installed-files.txt'):
if dist.has_metadata("installed-files.txt"):
for installed_file in dist.get_metadata(
'installed-files.txt').splitlines():
path = os.path.normpath(
os.path.join(dist.egg_info, installed_file)
)
"installed-files.txt"
).splitlines():
path = os.path.normpath(os.path.join(dist.egg_info, installed_file))
paths_to_remove.add(path)
# FIXME: need a test for this elif block
# occurs with --single-version-externally-managed/--record outside
# of pip
elif dist.has_metadata('top_level.txt'):
if dist.has_metadata('namespace_packages.txt'):
namespaces = dist.get_metadata('namespace_packages.txt')
elif dist.has_metadata("top_level.txt"):
if dist.has_metadata("namespace_packages.txt"):
namespaces = dist.get_metadata("namespace_packages.txt")
else:
namespaces = []
for top_level_pkg in [
p for p
in dist.get_metadata('top_level.txt').splitlines()
if p and p not in namespaces]:
p
for p in dist.get_metadata("top_level.txt").splitlines()
if p and p not in namespaces
]:
path = os.path.join(dist.location, top_level_pkg)
paths_to_remove.add(path)
paths_to_remove.add(path + '.py')
paths_to_remove.add(path + '.pyc')
paths_to_remove.add(path + '.pyo')
paths_to_remove.add(path + ".py")
paths_to_remove.add(path + ".pyc")
paths_to_remove.add(path + ".pyo")
elif distutils_egg_info:
raise UninstallationError(
@ -531,17 +511,18 @@ class UninstallPathSet:
)
)
elif dist.location.endswith('.egg'):
elif dist.location.endswith(".egg"):
# package installed by easy_install
# We cannot match on dist.egg_name because it can slightly vary
# i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg
paths_to_remove.add(dist.location)
easy_install_egg = os.path.split(dist.location)[1]
easy_install_pth = os.path.join(os.path.dirname(dist.location),
'easy-install.pth')
paths_to_remove.add_pth(easy_install_pth, './' + easy_install_egg)
easy_install_pth = os.path.join(
os.path.dirname(dist.location), "easy-install.pth"
)
paths_to_remove.add_pth(easy_install_pth, "./" + easy_install_egg)
elif egg_info_exists and dist.egg_info.endswith('.dist-info'):
elif egg_info_exists and dist.egg_info.endswith(".dist-info"):
for path in uninstallation_paths(dist):
paths_to_remove.add(path)
@ -549,40 +530,42 @@ class UninstallPathSet:
# develop egg
with open(develop_egg_link) as fh:
link_pointer = os.path.normcase(fh.readline().strip())
assert (link_pointer == dist.location), (
'Egg-link {} does not match installed location of {} '
'(at {})'.format(
link_pointer, dist.project_name, dist.location)
assert (
link_pointer == dist.location
), "Egg-link {} does not match installed location of {} (at {})".format(
link_pointer, dist.project_name, dist.location
)
paths_to_remove.add(develop_egg_link)
easy_install_pth = os.path.join(os.path.dirname(develop_egg_link),
'easy-install.pth')
easy_install_pth = os.path.join(
os.path.dirname(develop_egg_link), "easy-install.pth"
)
paths_to_remove.add_pth(easy_install_pth, dist.location)
else:
logger.debug(
'Not sure how to uninstall: %s - Check: %s',
dist, dist.location,
"Not sure how to uninstall: %s - Check: %s",
dist,
dist.location,
)
# find distutils scripts= scripts
if dist.has_metadata('scripts') and dist.metadata_isdir('scripts'):
for script in dist.metadata_listdir('scripts'):
if dist.has_metadata("scripts") and dist.metadata_isdir("scripts"):
for script in dist.metadata_listdir("scripts"):
if dist_in_usersite(dist):
bin_dir = get_bin_user()
else:
bin_dir = get_bin_prefix()
paths_to_remove.add(os.path.join(bin_dir, script))
if WINDOWS:
paths_to_remove.add(os.path.join(bin_dir, script) + '.bat')
paths_to_remove.add(os.path.join(bin_dir, script) + ".bat")
# find console_scripts
_scripts_to_remove = []
console_scripts = dist.get_entry_map(group='console_scripts')
console_scripts = dist.get_entry_map(group="console_scripts")
for name in console_scripts.keys():
_scripts_to_remove.extend(_script_names(dist, name, False))
# find gui_scripts
gui_scripts = dist.get_entry_map(group='gui_scripts')
gui_scripts = dist.get_entry_map(group="gui_scripts")
for name in gui_scripts.keys():
_scripts_to_remove.extend(_script_names(dist, name, True))
@ -593,14 +576,12 @@ class UninstallPathSet:
class UninstallPthEntries:
def __init__(self, pth_file):
# type: (str) -> None
def __init__(self, pth_file: str) -> None:
self.file = pth_file
self.entries = set() # type: Set[str]
self._saved_lines = None # type: Optional[List[bytes]]
self.entries: Set[str] = set()
self._saved_lines: Optional[List[bytes]] = None
def add(self, entry):
# type: (str) -> None
def add(self, entry: str) -> None:
entry = os.path.normcase(entry)
# On Windows, os.path.normcase converts the entry to use
# backslashes. This is correct for entries that describe absolute
@ -612,47 +593,41 @@ class UninstallPthEntries:
# have more than "\\sever\share". Valid examples: "\\server\share\" or
# "\\server\share\folder".
if WINDOWS and not os.path.splitdrive(entry)[0]:
entry = entry.replace('\\', '/')
entry = entry.replace("\\", "/")
self.entries.add(entry)
def remove(self):
# type: () -> None
logger.verbose('Removing pth entries from %s:', self.file)
def remove(self) -> None:
logger.verbose("Removing pth entries from %s:", self.file)
# If the file doesn't exist, log a warning and return
if not os.path.isfile(self.file):
logger.warning(
"Cannot remove entries from nonexistent file %s", self.file
)
logger.warning("Cannot remove entries from nonexistent file %s", self.file)
return
with open(self.file, 'rb') as fh:
with open(self.file, "rb") as fh:
# windows uses '\r\n' with py3k, but uses '\n' with py2.x
lines = fh.readlines()
self._saved_lines = lines
if any(b'\r\n' in line for line in lines):
endline = '\r\n'
if any(b"\r\n" in line for line in lines):
endline = "\r\n"
else:
endline = '\n'
endline = "\n"
# handle missing trailing newline
if lines and not lines[-1].endswith(endline.encode("utf-8")):
lines[-1] = lines[-1] + endline.encode("utf-8")
for entry in self.entries:
try:
logger.verbose('Removing entry: %s', entry)
logger.verbose("Removing entry: %s", entry)
lines.remove((entry + endline).encode("utf-8"))
except ValueError:
pass
with open(self.file, 'wb') as fh:
with open(self.file, "wb") as fh:
fh.writelines(lines)
def rollback(self):
# type: () -> bool
def rollback(self) -> bool:
if self._saved_lines is None:
logger.error(
'Cannot roll back changes to %s, none were made', self.file
)
logger.error("Cannot roll back changes to %s, none were made", self.file)
return False
logger.debug('Rolling %s back to previous state', self.file)
with open(self.file, 'wb') as fh:
logger.debug("Rolling %s back to previous state", self.file)
with open(self.file, "wb") as fh:
fh.writelines(self._saved_lines)
return True

View File

@ -83,7 +83,7 @@ def _check_dist_requires_python(
version = ".".join(map(str, version_info))
if ignore_requires_python:
logger.debug(
"Ignoring failed Requires-Python check for package %r: " "%s not in %r",
"Ignoring failed Requires-Python check for package %r: %s not in %r",
dist.project_name,
version,
requires_python,
@ -344,7 +344,7 @@ class Resolver(BaseResolver):
self._set_req_to_reinstall(req)
else:
logger.info(
"Requirement already satisfied (use --upgrade to upgrade):" " %s",
"Requirement already satisfied (use --upgrade to upgrade): %s",
req,
)
return dist

View File

@ -143,6 +143,21 @@ class PipProvider(_ProviderBase):
identifier,
)
def _get_constraint(self, identifier: str) -> Constraint:
if identifier in self._constraints:
return self._constraints[identifier]
# HACK: Theoratically we should check whether this identifier is a valid
# "NAME[EXTRAS]" format, and parse out the name part with packaging or
# some regular expression. But since pip's resolver only spits out
# three kinds of identifiers: normalized PEP 503 names, normalized names
# plus extras, and Requires-Python, we can cheat a bit here.
name, open_bracket, _ = identifier.partition("[")
if open_bracket and name in self._constraints:
return self._constraints[name]
return Constraint.empty()
def find_matches(
self,
identifier: str,
@ -169,7 +184,7 @@ class PipProvider(_ProviderBase):
return self._factory.find_candidates(
identifier=identifier,
requirements=requirements,
constraint=self._constraints.get(identifier, Constraint.empty()),
constraint=self._get_constraint(identifier),
prefers_installed=(not _eligible_for_upgrade(identifier)),
incompatibilities=incompatibilities,
)

View File

@ -151,7 +151,7 @@ class Resolver(BaseResolver):
deprecated(
reason=reason,
replacement=replacement,
gone_in="21.2",
gone_in="21.3",
issue=8711,
)

View File

@ -11,6 +11,15 @@ from pip._vendor.packaging.version import parse
from pip import __version__ as current_version
DEPRECATION_MSG_PREFIX = "DEPRECATION: "
DEPRECATION_MESSAGE = DEPRECATION_MSG_PREFIX + "{reason}"
GONE_IN_MESSAGE_FUTURE = "pip {gone_in} will enforce this behavior change."
GONE_IN_MESSAGE_PAST = "This behavior change has been enforced since pip {gone_in}."
REPLACEMENT_MESSAGE = "A possible replacement is {replacement}."
FEATURE_FLAG_MESSAGE = (
"You can temporarily use the flag --use-feature={feature_flag} "
"to test the upcoming behavior."
)
ISSUE_MESSAGE = "Discussion can be found at https://github.com/pypa/pip/issues/{issue}."
class PipDeprecationWarning(Warning):
@ -56,20 +65,24 @@ def deprecated(
reason: str,
replacement: Optional[str],
gone_in: Optional[str],
feature_flag: Optional[str] = None,
issue: Optional[int] = None,
) -> None:
"""Helper to deprecate existing functionality.
reason:
Textual reason shown to the user about why this functionality has
been deprecated.
been deprecated. Should be a complete sentence.
replacement:
Textual suggestion shown to the user about what alternative
functionality they can use.
gone_in:
The version of pip does this functionality should get removed in.
Raises errors if pip's current version is greater than or equal to
Raises an error if pip's current version is greater than or equal to
this.
feature_flag:
Command-line flag of the form --use-feature={feature_flag} for testing
upcoming functionality.
issue:
Issue number on the tracker that would serve as a useful place for
users to find related discussion and provide feedback.
@ -77,28 +90,38 @@ def deprecated(
Always pass replacement, gone_in and issue as keyword arguments for clarity
at the call site.
"""
# Determine whether or not the feature is already gone in this version.
is_gone = gone_in is not None and parse(current_version) >= parse(gone_in)
# Allow variable substitutions within the "reason" variable.
formatted_reason = reason.format(gone_in=gone_in)
# Construct a nice message.
# This is eagerly formatted as we want it to get logged as if someone
# typed this entire message out.
sentences = [
(reason, DEPRECATION_MSG_PREFIX + "{}"),
(gone_in, "pip {} will remove support for this functionality."),
(replacement, "A possible replacement is {}."),
(
issue,
(
"You can find discussion regarding this at "
"https://github.com/pypa/pip/issues/{}."
),
),
]
message = " ".join(
template.format(val) for val, template in sentences if val is not None
formatted_deprecation_message = DEPRECATION_MESSAGE.format(reason=formatted_reason)
gone_in_message = GONE_IN_MESSAGE_PAST if is_gone else GONE_IN_MESSAGE_FUTURE
formatted_gone_in_message = (
gone_in_message.format(gone_in=gone_in) if gone_in else None
)
formatted_replacement_message = (
REPLACEMENT_MESSAGE.format(replacement=replacement) if replacement else None
)
formatted_feature_flag_message = (
None
if is_gone or not feature_flag
else FEATURE_FLAG_MESSAGE.format(feature_flag=feature_flag)
)
formatted_issue_message = ISSUE_MESSAGE.format(issue=issue) if issue else None
sentences = [
formatted_deprecation_message,
formatted_gone_in_message,
formatted_replacement_message,
formatted_feature_flag_message,
formatted_issue_message,
]
message = " ".join(sentence for sentence in sentences if sentence)
# Raise as an error if it has to be removed.
if gone_in is not None and parse(current_version) >= parse(gone_in):
# Raise as an error if the functionality is gone.
if is_gone:
raise PipDeprecationWarning(message)
warnings.warn(message, category=PipDeprecationWarning, stacklevel=2)
else:
warnings.warn(message, category=PipDeprecationWarning, stacklevel=2)

View File

@ -190,7 +190,7 @@ class RetryError(Exception):
self.last_attempt = last_attempt
super().__init__(last_attempt)
def reraise(self) -> t.NoReturn:
def reraise(self) -> "t.NoReturn":
if self.last_attempt.failed:
raise self.last_attempt.result()
raise self

View File

@ -1835,8 +1835,8 @@ def test_invalid_index_url_argument(script, shared_data):
shared_data.find_links3, "Dinner",
expect_error=True)
assert 'WARNING: The index url "--user" seems invalid, ' \
'please provide a scheme.' in result.stderr, str(result)
assert ('WARNING: The index url "--user" seems invalid, '
'please provide a scheme.') in result.stderr, str(result)
def test_valid_index_url_argument(script, shared_data):

Some files were not shown because too many files have changed in this diff Show More