1
1
Fork 0
mirror of https://github.com/pypa/pip synced 2023-12-13 21:30:23 +01:00

Merge pull request #10709 from uranusjr/metadata-importlib-backend

This commit is contained in:
Pradyun Gedam 2022-04-15 12:49:32 +01:00 committed by GitHub
commit c6e274e7fd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 635 additions and 81 deletions

View file

@ -218,3 +218,38 @@ jobs:
--verbose --numprocesses auto --showlocals
env:
TEMP: "R:\\Temp"
tests-importlib-metadata:
name: tests for importlib.metadata backend
runs-on: ubuntu-latest
env:
_PIP_METADATA_BACKEND_IMPORTLIB: egg-compat
needs: [pre-commit, packaging, determine-changes]
if: >-
needs.determine-changes.outputs.tests == 'true' ||
github.event_name != 'pull_request'
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: '3.10'
- name: Install Ubuntu dependencies
run: sudo apt-get install bzr
- run: pip install nox 'virtualenv<20'
# Main check
- name: Run unit tests
run: >-
nox -s test-3.10 --
-m unit
--verbose --numprocesses auto --showlocals
- name: Run integration tests
run: >-
nox -s test-3.10 --
-m integration
--verbose --numprocesses auto --showlocals
--durations=5

3
news/10709.process.rst Normal file
View file

@ -0,0 +1,3 @@
Start migration of distribution metadata implementation from ``pkg_resources``
to ``importlib.metadata``. The new implementation is currently not exposed in
any user-facing way, but included in the code base for easier development.

View file

@ -76,7 +76,7 @@ def search_packages_info(query: List[str]) -> Generator[_PackageInfo, None, None
"""
env = get_default_environment()
installed = {dist.canonical_name: dist for dist in env.iter_distributions()}
installed = {dist.canonical_name: dist for dist in env.iter_all_distributions()}
query_names = [canonicalize_name(name) for name in query]
missing = sorted(
[name for name, pkg in zip(query, query_names) if pkg not in installed]

View file

@ -1,7 +1,14 @@
from typing import List, Optional
import functools
import os
from typing import TYPE_CHECKING, List, Optional, Type, cast
from .base import BaseDistribution, BaseEnvironment, FilesystemWheel, MemoryWheel, Wheel
if TYPE_CHECKING:
from typing import Protocol
else:
Protocol = object
__all__ = [
"BaseDistribution",
"BaseEnvironment",
@ -11,9 +18,26 @@ __all__ = [
"get_default_environment",
"get_environment",
"get_wheel_distribution",
"select_backend",
]
class Backend(Protocol):
Distribution: Type[BaseDistribution]
Environment: Type[BaseEnvironment]
@functools.lru_cache(maxsize=None)
def select_backend() -> Backend:
if os.environ.get("_PIP_METADATA_BACKEND_IMPORTLIB"):
from . import importlib
return cast(Backend, importlib)
from . import pkg_resources
return cast(Backend, pkg_resources)
def get_default_environment() -> BaseEnvironment:
"""Get the default representation for the current environment.
@ -21,9 +45,7 @@ def get_default_environment() -> BaseEnvironment:
Environment instance should be built from ``sys.path`` and may use caching
to share instance state accorss calls.
"""
from .pkg_resources import Environment
return Environment.default()
return select_backend().Environment.default()
def get_environment(paths: Optional[List[str]]) -> BaseEnvironment:
@ -33,9 +55,7 @@ def get_environment(paths: Optional[List[str]]) -> BaseEnvironment:
given import paths. The backend must build a fresh instance representing
the state of installed distributions when this function is called.
"""
from .pkg_resources import Environment
return Environment.from_paths(paths)
return select_backend().Environment.from_paths(paths)
def get_directory_distribution(directory: str) -> BaseDistribution:
@ -44,9 +64,7 @@ def get_directory_distribution(directory: str) -> BaseDistribution:
This returns a Distribution instance from the chosen backend based on
the given on-disk ``.dist-info`` directory.
"""
from .pkg_resources import Distribution
return Distribution.from_directory(directory)
return select_backend().Distribution.from_directory(directory)
def get_wheel_distribution(wheel: Wheel, canonical_name: str) -> BaseDistribution:
@ -57,6 +75,4 @@ def get_wheel_distribution(wheel: Wheel, canonical_name: str) -> BaseDistributio
:param canonical_name: Normalized project name of the given wheel.
"""
from .pkg_resources import Distribution
return Distribution.from_wheel(wheel, canonical_name)
return select_backend().Distribution.from_wheel(wheel, canonical_name)

View file

@ -10,7 +10,6 @@ from typing import (
TYPE_CHECKING,
Collection,
Container,
Generator,
Iterable,
Iterator,
List,
@ -32,10 +31,7 @@ from pip._internal.models.direct_url import (
DirectUrlValidationError,
)
from pip._internal.utils.compat import stdlib_pkgs # TODO: Move definition here.
from pip._internal.utils.egg_link import (
egg_link_path_from_location,
egg_link_path_from_sys_path,
)
from pip._internal.utils.egg_link import egg_link_path_from_sys_path
from pip._internal.utils.misc import is_local, normalize_path
from pip._internal.utils.urls import url_to_path
@ -46,7 +42,7 @@ else:
DistributionVersion = Union[LegacyVersion, Version]
InfoPath = Union[str, pathlib.PurePosixPath]
InfoPath = Union[str, pathlib.PurePath]
logger = logging.getLogger(__name__)
@ -96,6 +92,28 @@ def _convert_installed_files_path(
class BaseDistribution(Protocol):
@classmethod
def from_directory(cls, directory: str) -> "BaseDistribution":
"""Load the distribution from a metadata directory.
:param directory: Path to a metadata directory, e.g. ``.dist-info``.
"""
raise NotImplementedError()
@classmethod
def from_wheel(cls, wheel: "Wheel", name: str) -> "BaseDistribution":
"""Load the distribution from a given wheel.
:param wheel: A concrete wheel definition.
:param name: File name of the wheel.
:raises InvalidWheel: Whenever loading of the wheel causes a
:py:exc:`zipfile.BadZipFile` exception to be thrown.
:raises UnsupportedWheel: If the wheel is a valid zip, but malformed
internally.
"""
raise NotImplementedError()
def __repr__(self) -> str:
return f"{self.raw_name} {self.version} ({self.location})"
@ -149,14 +167,7 @@ class BaseDistribution(Protocol):
The returned location is normalized (in particular, with symlinks removed).
"""
egg_link = egg_link_path_from_location(self.raw_name)
if egg_link:
location = egg_link
elif self.location:
location = self.location
else:
return None
return normalize_path(location)
raise NotImplementedError()
@property
def info_location(self) -> Optional[str]:
@ -317,21 +328,19 @@ class BaseDistribution(Protocol):
"""Check whether an entry in the info directory is a file."""
raise NotImplementedError()
def iterdir(self, path: InfoPath) -> Iterator[pathlib.PurePosixPath]:
"""Iterate through a directory in the info directory.
def iter_distutils_script_names(self) -> Iterator[str]:
"""Find distutils 'scripts' entries metadata.
Each item yielded would be a path relative to the info directory.
:raise FileNotFoundError: If ``name`` does not exist in the directory.
:raise NotADirectoryError: If ``name`` does not point to a directory.
If 'scripts' is supplied in ``setup.py``, distutils records those in the
installed distribution's ``scripts`` directory, a file for each script.
"""
raise NotImplementedError()
def read_text(self, path: InfoPath) -> str:
"""Read a file in the info directory.
:raise FileNotFoundError: If ``name`` does not exist in the directory.
:raise NoneMetadataError: If ``name`` exists in the info directory, but
:raise FileNotFoundError: If ``path`` does not exist in the directory.
:raise NoneMetadataError: If ``path`` exists in the info directory, but
cannot be read.
"""
raise NotImplementedError()
@ -471,8 +480,8 @@ class BaseEnvironment:
"""
raise NotImplementedError()
def iter_distributions(self) -> Generator["BaseDistribution", None, None]:
"""Iterate through installed distributions."""
def iter_all_distributions(self) -> Iterator[BaseDistribution]:
"""Iterate through all installed distributions without any filtering."""
for dist in self._iter_distributions():
# Make sure the distribution actually comes from a valid Python
# packaging distribution. Pip's AdjacentTempDirectory leaves folders
@ -502,6 +511,11 @@ class BaseEnvironment:
) -> Iterator[BaseDistribution]:
"""Return a list of installed distributions.
This is based on ``iter_all_distributions()`` with additional filtering
options. Note that ``iter_installed_distributions()`` without arguments
is *not* equal to ``iter_all_distributions()``, since some of the
configurations exclude packages by default.
:param local_only: If True (default), only return installations
local to the current virtualenv, if in a virtualenv.
:param skip: An iterable of canonicalized project names to ignore;
@ -511,7 +525,7 @@ class BaseEnvironment:
:param user_only: If True, only report installations in the user
site directory.
"""
it = self.iter_distributions()
it = self.iter_all_distributions()
if local_only:
it = (d for d in it if d.local)
if not include_editables:

View file

@ -0,0 +1,4 @@
from ._dists import Distribution
from ._envs import Environment
__all__ = ["Distribution", "Environment"]

View file

@ -0,0 +1,41 @@
import importlib.metadata
from typing import Any, Optional, Protocol, cast
class BasePath(Protocol):
"""A protocol that various path objects conform.
This exists because importlib.metadata uses both ``pathlib.Path`` and
``zipfile.Path``, and we need a common base for type hints (Union does not
work well since ``zipfile.Path`` is too new for our linter setup).
This does not mean to be exhaustive, but only contains things that present
in both classes *that we need*.
"""
name: str
@property
def parent(self) -> "BasePath":
raise NotImplementedError()
def get_info_location(d: importlib.metadata.Distribution) -> Optional[BasePath]:
"""Find the path to the distribution's metadata directory.
HACK: This relies on importlib.metadata's private ``_path`` attribute. Not
all distributions exist on disk, so importlib.metadata is correct to not
expose the attribute as public. But pip's code base is old and not as clean,
so we do this to avoid having to rewrite too many things. Hopefully we can
eliminate this some day.
"""
return getattr(d, "_path", None)
def get_dist_name(dist: importlib.metadata.Distribution) -> str:
"""Get the distribution's project name.
The ``name`` attribute is only available in Python 3.10 or later. We are
targeting exactly that, but Mypy does not know this.
"""
return cast(Any, dist).name

View file

@ -0,0 +1,274 @@
import email.message
import importlib.metadata
import os
import pathlib
import zipfile
from typing import (
Collection,
Dict,
Iterable,
Iterator,
Mapping,
NamedTuple,
Optional,
Sequence,
)
from pip._vendor.packaging.requirements import Requirement
from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
from pip._vendor.packaging.version import parse as parse_version
from pip._internal.exceptions import InvalidWheel, UnsupportedWheel
from pip._internal.metadata.base import (
BaseDistribution,
BaseEntryPoint,
DistributionVersion,
InfoPath,
Wheel,
)
from pip._internal.utils.misc import normalize_path
from pip._internal.utils.packaging import safe_extra
from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file
from ._compat import BasePath, get_dist_name
class WheelDistribution(importlib.metadata.Distribution):
"""An ``importlib.metadata.Distribution`` read from a wheel.
Although ``importlib.metadata.PathDistribution`` accepts ``zipfile.Path``,
its implementation is too "lazy" for pip's needs (we can't keep the ZipFile
handle open for the entire lifetime of the distribution object).
This implementation eagerly reads the entire metadata directory into the
memory instead, and operates from that.
"""
def __init__(
self,
files: Mapping[pathlib.PurePosixPath, bytes],
info_location: pathlib.PurePosixPath,
) -> None:
self._files = files
self.info_location = info_location
@classmethod
def from_zipfile(
cls,
zf: zipfile.ZipFile,
name: str,
location: str,
) -> "WheelDistribution":
info_dir, _ = parse_wheel(zf, name)
paths = (
(name, pathlib.PurePosixPath(name.split("/", 1)[-1]))
for name in zf.namelist()
if name.startswith(f"{info_dir}/")
)
files = {
relpath: read_wheel_metadata_file(zf, fullpath)
for fullpath, relpath in paths
}
info_location = pathlib.PurePosixPath(location, info_dir)
return cls(files, info_location)
def iterdir(self, path: InfoPath) -> Iterator[pathlib.PurePosixPath]:
# Only allow iterating through the metadata directory.
if pathlib.PurePosixPath(str(path)) in self._files:
return iter(self._files)
raise FileNotFoundError(path)
def read_text(self, filename: str) -> Optional[str]:
try:
data = self._files[pathlib.PurePosixPath(filename)]
except KeyError:
return None
try:
text = data.decode("utf-8")
except UnicodeDecodeError as e:
wheel = self.info_location.parent
error = f"Error decoding metadata for {wheel}: {e} in {filename} file"
raise UnsupportedWheel(error)
return text
class RequiresEntry(NamedTuple):
requirement: str
extra: str
marker: str
class Distribution(BaseDistribution):
def __init__(
self,
dist: importlib.metadata.Distribution,
info_location: Optional[BasePath],
installed_location: Optional[BasePath],
) -> None:
self._dist = dist
self._info_location = info_location
self._installed_location = installed_location
@classmethod
def from_directory(cls, directory: str) -> BaseDistribution:
info_location = pathlib.Path(directory)
dist = importlib.metadata.Distribution.at(info_location)
return cls(dist, info_location, info_location.parent)
@classmethod
def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution:
try:
with wheel.as_zipfile() as zf:
dist = WheelDistribution.from_zipfile(zf, name, wheel.location)
except zipfile.BadZipFile as e:
raise InvalidWheel(wheel.location, name) from e
except UnsupportedWheel as e:
raise UnsupportedWheel(f"{name} has an invalid wheel, {e}")
return cls(dist, dist.info_location, pathlib.PurePosixPath(wheel.location))
@property
def location(self) -> Optional[str]:
if self._info_location is None:
return None
return str(self._info_location.parent)
@property
def info_location(self) -> Optional[str]:
if self._info_location is None:
return None
return str(self._info_location)
@property
def installed_location(self) -> Optional[str]:
if self._installed_location is None:
return None
return normalize_path(str(self._installed_location))
def _get_dist_name_from_location(self) -> Optional[str]:
"""Try to get the name from the metadata directory name.
This is much faster than reading metadata.
"""
if self._info_location is None:
return None
stem, suffix = os.path.splitext(self._info_location.name)
if suffix not in (".dist-info", ".egg-info"):
return None
return stem.split("-", 1)[0]
@property
def canonical_name(self) -> NormalizedName:
name = self._get_dist_name_from_location() or get_dist_name(self._dist)
return canonicalize_name(name)
@property
def version(self) -> DistributionVersion:
return parse_version(self._dist.version)
def is_file(self, path: InfoPath) -> bool:
return self._dist.read_text(str(path)) is not None
def iter_distutils_script_names(self) -> Iterator[str]:
# A distutils installation is always "flat" (not in e.g. egg form), so
# if this distribution's info location is NOT a pathlib.Path (but e.g.
# zipfile.Path), it can never contain any distutils scripts.
if not isinstance(self._info_location, pathlib.Path):
return
for child in self._info_location.joinpath("scripts").iterdir():
yield child.name
def read_text(self, path: InfoPath) -> str:
content = self._dist.read_text(str(path))
if content is None:
raise FileNotFoundError(path)
return content
def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
# importlib.metadata's EntryPoint structure sasitfies BaseEntryPoint.
return self._dist.entry_points
@property
def metadata(self) -> email.message.Message:
return self._dist.metadata
def _iter_requires_txt_entries(self) -> Iterator[RequiresEntry]:
"""Parse a ``requires.txt`` in an egg-info directory.
This is an INI-ish format where an egg-info stores dependencies. A
section name describes extra other environment markers, while each entry
is an arbitrary string (not a key-value pair) representing a dependency
as a requirement string (no markers).
There is a construct in ``importlib.metadata`` called ``Sectioned`` that
does mostly the same, but the format is currently considered private.
"""
content = self._dist.read_text("requires.txt")
if content is None:
return
extra = marker = "" # Section-less entries don't have markers.
for line in content.splitlines():
line = line.strip()
if not line or line.startswith("#"): # Comment; ignored.
continue
if line.startswith("[") and line.endswith("]"): # A section header.
extra, _, marker = line.strip("[]").partition(":")
continue
yield RequiresEntry(requirement=line, extra=extra, marker=marker)
def _iter_egg_info_extras(self) -> Iterable[str]:
"""Get extras from the egg-info directory."""
known_extras = {""}
for entry in self._iter_requires_txt_entries():
if entry.extra in known_extras:
continue
known_extras.add(entry.extra)
yield entry.extra
def iter_provided_extras(self) -> Iterable[str]:
iterator = (
self._dist.metadata.get_all("Provides-Extra")
or self._iter_egg_info_extras()
)
return (safe_extra(extra) for extra in iterator)
def _iter_egg_info_dependencies(self) -> Iterable[str]:
"""Get distribution dependencies from the egg-info directory.
To ease parsing, this converts a legacy dependency entry into a PEP 508
requirement string. Like ``_iter_requires_txt_entries()``, there is code
in ``importlib.metadata`` that does mostly the same, but not do exactly
what we need.
Namely, ``importlib.metadata`` does not normalize the extra name before
putting it into the requirement string, which causes marker comparison
to fail because the dist-info format do normalize. This is consistent in
all currently available PEP 517 backends, although not standardized.
"""
for entry in self._iter_requires_txt_entries():
if entry.extra and entry.marker:
marker = f'({entry.marker}) and extra == "{safe_extra(entry.extra)}"'
elif entry.extra:
marker = f'extra == "{safe_extra(entry.extra)}"'
elif entry.marker:
marker = entry.marker
else:
marker = ""
if marker:
yield f"{entry.requirement} ; {marker}"
else:
yield entry.requirement
def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
req_string_iterator = (
self._dist.metadata.get_all("Requires-Dist")
or self._iter_egg_info_dependencies()
)
contexts: Sequence[Dict[str, str]] = [{"extra": safe_extra(e)} for e in extras]
for req_string in req_string_iterator:
req = Requirement(req_string)
if not req.marker:
yield req
elif not extras and req.marker.evaluate({"extra": ""}):
yield req
elif any(req.marker.evaluate(context) for context in contexts):
yield req

View file

@ -0,0 +1,163 @@
import functools
import importlib.metadata
import os
import pathlib
import sys
import zipfile
import zipimport
from typing import Iterator, List, Optional, Sequence, Set, Tuple
from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
from pip._internal.metadata.base import BaseDistribution, BaseEnvironment
from pip._internal.utils.deprecation import deprecated
from ._compat import BasePath, get_dist_name, get_info_location
from ._dists import Distribution
class _DistributionFinder:
"""Finder to locate distributions.
The main purpose of this class is to memoize found distributions' names, so
only one distribution is returned for each package name. At lot of pip code
assumes this (because it is setuptools's behavior), and not doing the same
can potentially cause a distribution in lower precedence path to override a
higher precedence one if the caller is not careful.
Eventually we probably want to make it possible to see lower precedence
installations as well. It's useful feature, after all.
"""
FoundResult = Tuple[importlib.metadata.Distribution, Optional[BasePath]]
def __init__(self) -> None:
self._found_names: Set[NormalizedName] = set()
def _find_impl(self, location: str) -> Iterator[FoundResult]:
"""Find distributions in a location."""
# To know exactly where we find a distribution, we have to feed in the
# paths one by one, instead of dumping the list to importlib.metadata.
for dist in importlib.metadata.distributions(path=[location]):
normalized_name = canonicalize_name(get_dist_name(dist))
if normalized_name in self._found_names:
continue
self._found_names.add(normalized_name)
info_location = get_info_location(dist)
yield dist, info_location
def find(self, location: str) -> Iterator[BaseDistribution]:
"""Find distributions in a location.
The path can be either a directory, or a ZIP archive.
"""
for dist, info_location in self._find_impl(location):
if info_location is None:
installed_location: Optional[BasePath] = None
else:
installed_location = info_location.parent
yield Distribution(dist, info_location, installed_location)
def find_linked(self, location: str) -> Iterator[BaseDistribution]:
"""Read location in egg-link files and return distributions in there.
The path should be a directory; otherwise this returns nothing. This
follows how setuptools does this for compatibility. The first non-empty
line in the egg-link is read as a path (resolved against the egg-link's
containing directory if relative). Distributions found at that linked
location are returned.
"""
path = pathlib.Path(location)
if not path.is_dir():
return
for child in path.iterdir():
if child.suffix != ".egg-link":
continue
with child.open() as f:
lines = (line.strip() for line in f)
target_rel = next((line for line in lines if line), "")
if not target_rel:
continue
target_location = str(path.joinpath(target_rel))
for dist, info_location in self._find_impl(target_location):
yield Distribution(dist, info_location, path)
def _find_eggs_in_dir(self, location: str) -> Iterator[BaseDistribution]:
from pip._vendor.pkg_resources import find_distributions
from pip._internal.metadata import pkg_resources as legacy
with os.scandir(location) as it:
for entry in it:
if not entry.name.endswith(".egg"):
continue
for dist in find_distributions(entry.path):
yield legacy.Distribution(dist)
def _find_eggs_in_zip(self, location: str) -> Iterator[BaseDistribution]:
from pip._vendor.pkg_resources import find_eggs_in_zip
from pip._internal.metadata import pkg_resources as legacy
try:
importer = zipimport.zipimporter(location)
except zipimport.ZipImportError:
return
for dist in find_eggs_in_zip(importer, location):
yield legacy.Distribution(dist)
def find_eggs(self, location: str) -> Iterator[BaseDistribution]:
"""Find eggs in a location.
This actually uses the old *pkg_resources* backend. We likely want to
deprecate this so we can eventually remove the *pkg_resources*
dependency entirely. Before that, this should first emit a deprecation
warning for some versions when using the fallback since importing
*pkg_resources* is slow for those who don't need it.
"""
if os.path.isdir(location):
yield from self._find_eggs_in_dir(location)
if zipfile.is_zipfile(location):
yield from self._find_eggs_in_zip(location)
@functools.lru_cache(maxsize=None) # Warn a distribution exactly once.
def _emit_egg_deprecation(location: Optional[str]) -> None:
deprecated(
reason=f"Loading egg at {location} is deprecated.",
replacement="to use pip for package installation.",
gone_in=None,
)
class Environment(BaseEnvironment):
def __init__(self, paths: Sequence[str]) -> None:
self._paths = paths
@classmethod
def default(cls) -> BaseEnvironment:
return cls(sys.path)
@classmethod
def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment:
if paths is None:
return cls(sys.path)
return cls(paths)
def _iter_distributions(self) -> Iterator[BaseDistribution]:
finder = _DistributionFinder()
for location in self._paths:
yield from finder.find(location)
for dist in finder.find_eggs(location):
# _emit_egg_deprecation(dist.location) # TODO: Enable this.
yield dist
# This must go last because that's how pkg_resources tie-breaks.
yield from finder.find_linked(location)
def get_distribution(self, name: str) -> Optional[BaseDistribution]:
matches = (
distribution
for distribution in self.iter_all_distributions()
if distribution.canonical_name == canonicalize_name(name)
)
return next(matches, None)

View file

@ -2,9 +2,8 @@ import email.message
import email.parser
import logging
import os
import pathlib
import zipfile
from typing import Collection, Generator, Iterable, List, Mapping, NamedTuple, Optional
from typing import Collection, Iterable, Iterator, List, Mapping, NamedTuple, Optional
from pip._vendor import pkg_resources
from pip._vendor.packaging.requirements import Requirement
@ -12,7 +11,8 @@ from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
from pip._vendor.packaging.version import parse as parse_version
from pip._internal.exceptions import InvalidWheel, NoneMetadataError, UnsupportedWheel
from pip._internal.utils.misc import display_path
from pip._internal.utils.egg_link import egg_link_path_from_location
from pip._internal.utils.misc import display_path, normalize_path
from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file
from .base import (
@ -73,7 +73,7 @@ class Distribution(BaseDistribution):
self._dist = dist
@classmethod
def from_directory(cls, directory: str) -> "Distribution":
def from_directory(cls, directory: str) -> BaseDistribution:
dist_dir = directory.rstrip(os.sep)
# Build a PathMetadata object, from path to metadata. :wink:
@ -93,14 +93,7 @@ class Distribution(BaseDistribution):
return cls(dist)
@classmethod
def from_wheel(cls, wheel: Wheel, name: str) -> "Distribution":
"""Load the distribution from a given wheel.
:raises InvalidWheel: Whenever loading of the wheel causes a
:py:exc:`zipfile.BadZipFile` exception to be thrown.
:raises UnsupportedWheel: If the wheel is a valid zip, but malformed
internally.
"""
def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution:
try:
with wheel.as_zipfile() as zf:
info_dir, _ = parse_wheel(zf, name)
@ -124,6 +117,17 @@ class Distribution(BaseDistribution):
def location(self) -> Optional[str]:
return self._dist.location
@property
def installed_location(self) -> Optional[str]:
egg_link = egg_link_path_from_location(self.raw_name)
if egg_link:
location = egg_link
elif self.location:
location = self.location
else:
return None
return normalize_path(location)
@property
def info_location(self) -> Optional[str]:
return self._dist.egg_info
@ -149,14 +153,8 @@ class Distribution(BaseDistribution):
def is_file(self, path: InfoPath) -> bool:
return self._dist.has_metadata(str(path))
def iterdir(self, path: InfoPath) -> Generator[pathlib.PurePosixPath, None, None]:
name = str(path)
if not self._dist.has_metadata(name):
raise FileNotFoundError(name)
if not self._dist.isdir(name):
raise NotADirectoryError(name)
for child in self._dist.metadata_listdir(name):
yield pathlib.PurePosixPath(path, child)
def iter_distutils_script_names(self) -> Iterator[str]:
yield from self._dist.metadata_listdir("scripts")
def read_text(self, path: InfoPath) -> str:
name = str(path)
@ -217,6 +215,10 @@ class Environment(BaseEnvironment):
def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment:
return cls(pkg_resources.WorkingSet(paths))
def _iter_distributions(self) -> Iterator[BaseDistribution]:
for dist in self._ws:
yield Distribution(dist)
def _search_distribution(self, name: str) -> Optional[BaseDistribution]:
"""Find a distribution matching the ``name`` in the environment.
@ -224,7 +226,7 @@ class Environment(BaseEnvironment):
match the behavior of ``pkg_resources.get_distribution()``.
"""
canonical_name = canonicalize_name(name)
for dist in self.iter_distributions():
for dist in self.iter_all_distributions():
if dist.canonical_name == canonical_name:
return dist
return None
@ -250,7 +252,3 @@ class Environment(BaseEnvironment):
except pkg_resources.DistributionNotFound:
return None
return self._search_distribution(name)
def _iter_distributions(self) -> Generator[BaseDistribution, None, None]:
for dist in self._ws:
yield Distribution(dist)

View file

@ -558,10 +558,10 @@ class UninstallPathSet:
# find distutils scripts= scripts
try:
for script in dist.iterdir("scripts"):
paths_to_remove.add(os.path.join(bin_dir, script.name))
for script in dist.iter_distutils_script_names():
paths_to_remove.add(os.path.join(bin_dir, script))
if WINDOWS:
paths_to_remove.add(os.path.join(bin_dir, f"{script.name}.bat"))
paths_to_remove.add(os.path.join(bin_dir, f"{script}.bat"))
except (FileNotFoundError, NotADirectoryError):
pass

View file

@ -1233,7 +1233,7 @@ def test_new_resolver_presents_messages_when_backtracking_a_lot(
# cannot handle it correctly. Nobody is complaining about it right now,
# we're probably dropping it for importlib.metadata soon(tm), so let's
# ignore it for the time being.
pytest.param("0.1.0+local-1", marks=pytest.mark.xfail),
pytest.param("0.1.0+local-1", marks=pytest.mark.xfail(strict=False)),
],
ids=["meta_dot", "meta_underscore", "meta_dash"],
)

View file

@ -4,7 +4,7 @@ import sys
import textwrap
from os.path import join, normpath
from tempfile import mkdtemp
from typing import Any
from typing import Any, Iterator
from unittest.mock import Mock
import pytest
@ -615,11 +615,13 @@ def test_uninstall_setuptools_develop_install(
script.assert_installed(FSPkg="0.1.dev0")
# Uninstall both develop and install
uninstall = script.pip("uninstall", "FSPkg", "-y")
assert any(filename.endswith(".egg") for filename in uninstall.files_deleted.keys())
assert any(filename.endswith(".egg") for filename in uninstall.files_deleted), str(
uninstall
)
uninstall2 = script.pip("uninstall", "FSPkg", "-y")
assert (
join(script.site_packages, "FSPkg.egg-link") in uninstall2.files_deleted
), list(uninstall2.files_deleted.keys())
), str(uninstall2)
script.assert_not_installed("FSPkg")
@ -649,6 +651,17 @@ def test_uninstall_editable_and_pip_install(
script.assert_not_installed("FSPkg")
@pytest.fixture()
def move_easy_install_pth(script: PipTestEnvironment) -> Iterator[None]:
"""Move easy-install.pth out of the way for testing easy_install."""
easy_install_pth = join(script.site_packages_path, "easy-install.pth")
pip_test_pth = join(script.site_packages_path, "pip-test.pth")
os.rename(easy_install_pth, pip_test_pth)
yield
os.rename(pip_test_pth, easy_install_pth)
@pytest.mark.usefixtures("move_easy_install_pth")
def test_uninstall_editable_and_pip_install_easy_install_remove(
script: PipTestEnvironment, data: TestData
) -> None:
@ -659,16 +672,12 @@ def test_uninstall_editable_and_pip_install_easy_install_remove(
# This becomes the default behavior in setuptools 25.
script.environ["SETUPTOOLS_SYS_PATH_TECHNIQUE"] = "raw"
# Rename easy-install.pth to pip-test.pth
easy_install_pth = join(script.site_packages_path, "easy-install.pth")
pip_test_pth = join(script.site_packages_path, "pip-test.pth")
os.rename(easy_install_pth, pip_test_pth)
# Install FSPkg
pkg_path = data.packages.joinpath("FSPkg")
script.pip("install", "-e", ".", expect_stderr=True, cwd=pkg_path)
# Rename easy-install.pth to pip-test-fspkg.pth
easy_install_pth = join(script.site_packages_path, "easy-install.pth")
pip_test_fspkg_pth = join(script.site_packages_path, "pip-test-fspkg.pth")
os.rename(easy_install_pth, pip_test_fspkg_pth)
@ -689,9 +698,6 @@ def test_uninstall_editable_and_pip_install_easy_install_remove(
# Confirm that FSPkg is uninstalled
script.assert_not_installed("FSPkg")
# Rename pip-test.pth back to easy-install.pth
os.rename(pip_test_pth, easy_install_pth)
def test_uninstall_ignores_missing_packages(
script: PipTestEnvironment, data: TestData

View file

@ -21,7 +21,7 @@ from pip._internal.exceptions import (
PreviousBuildDirError,
)
from pip._internal.index.package_finder import PackageFinder
from pip._internal.metadata.pkg_resources import Distribution
from pip._internal.metadata import select_backend
from pip._internal.network.session import PipSession
from pip._internal.operations.build.build_tracker import get_build_tracker
from pip._internal.operations.prepare import RequirementPreparer
@ -451,7 +451,7 @@ class TestInstallRequirement:
req = install_req_from_line("foo")
req.metadata_directory = path
dist = req.get_dist()
assert isinstance(dist, Distribution)
assert isinstance(dist, select_backend().Distribution)
assert dist.raw_name == dist.canonical_name == "foo"
assert dist.location == "/path/to".replace("/", os.path.sep)