Remove pkg_resources usages from utils modules

Relevant functionalities are moved into pip._internal.metadata.
This commit is contained in:
Tzu-ping Chung 2021-11-22 16:58:41 +08:00
parent d130d31b14
commit cd01e4fd8f
11 changed files with 224 additions and 313 deletions

View File

@ -149,12 +149,12 @@ class InvalidPyProjectBuildRequires(DiagnosticPipError):
class NoneMetadataError(PipError):
"""
Raised when accessing "METADATA" or "PKG-INFO" metadata for a
pip._vendor.pkg_resources.Distribution object and
`dist.has_metadata('METADATA')` returns True but
`dist.get_metadata('METADATA')` returns None (and similarly for
"PKG-INFO").
"""Raised when accessing a Distribution's "METADATA" or "PKG-INFO".
This signifies an inconsistency, when the Distribution claims to have
the metadata file (if not, raise ``FileNotFoundError`` instead), but is
not actually able to produce its content. This may be due to permission
errors.
"""
def __init__(

View File

@ -23,13 +23,19 @@ from pip._vendor.packaging.specifiers import InvalidSpecifier, SpecifierSet
from pip._vendor.packaging.utils import NormalizedName
from pip._vendor.packaging.version import LegacyVersion, Version
from pip._internal.exceptions import NoneMetadataError
from pip._internal.locations import site_packages, user_site
from pip._internal.models.direct_url import (
DIRECT_URL_METADATA_NAME,
DirectUrl,
DirectUrlValidationError,
)
from pip._internal.utils.compat import stdlib_pkgs # TODO: Move definition here.
from pip._internal.utils.egg_link import egg_link_path_from_sys_path
from pip._internal.utils.egg_link import (
egg_link_path_from_location,
egg_link_path_from_sys_path,
)
from pip._internal.utils.misc import is_local, normalize_path
from pip._internal.utils.urls import url_to_path
if TYPE_CHECKING:
@ -131,6 +137,26 @@ class BaseDistribution(Protocol):
return self.location
return None
@property
def installed_location(self) -> Optional[str]:
"""The distribution's "installed" location.
This should generally be a ``site-packages`` directory. This is
usually ``dist.location``, except for legacy develop-installed packages,
where ``dist.location`` is the source code location, and this is where
the ``.egg-link`` file is.
The returned location is normalized (in particular, with symlinks removed).
"""
egg_link = egg_link_path_from_location(self.raw_name)
if egg_link:
location = egg_link
elif self.location:
location = self.location
else:
return None
return normalize_path(location)
@property
def info_location(self) -> Optional[str]:
"""Location of the .[egg|dist]-info directory or file.
@ -250,7 +276,15 @@ class BaseDistribution(Protocol):
@property
def installer(self) -> str:
raise NotImplementedError()
try:
installer_text = self.read_text("INSTALLER")
except (OSError, ValueError, NoneMetadataError):
return "" # Fail silently if the installer file cannot be read.
for line in installer_text.splitlines():
cleaned_line = line.strip()
if cleaned_line:
return cleaned_line
return ""
@property
def editable(self) -> bool:
@ -258,15 +292,25 @@ class BaseDistribution(Protocol):
@property
def local(self) -> bool:
raise NotImplementedError()
"""If distribution is installed in the current virtual environment.
Always True if we're not in a virtualenv.
"""
if self.installed_location is None:
return False
return is_local(self.installed_location)
@property
def in_usersite(self) -> bool:
raise NotImplementedError()
if self.installed_location is None or user_site is None:
return False
return self.installed_location.startswith(normalize_path(user_site))
@property
def in_site_packages(self) -> bool:
raise NotImplementedError()
if self.installed_location is None or site_packages is None:
return False
return self.installed_location.startswith(normalize_path(site_packages))
def is_file(self, path: InfoPath) -> bool:
"""Check whether an entry in the info directory is a file."""
@ -286,6 +330,8 @@ class BaseDistribution(Protocol):
"""Read a file in the info directory.
:raise FileNotFoundError: If ``name`` does not exist in the directory.
:raise NoneMetadataError: If ``name`` exists in the info directory, but
cannot be read.
"""
raise NotImplementedError()
@ -294,7 +340,13 @@ class BaseDistribution(Protocol):
@property
def metadata(self) -> email.message.Message:
"""Metadata of distribution parsed from e.g. METADATA or PKG-INFO."""
"""Metadata of distribution parsed from e.g. METADATA or PKG-INFO.
This should return an empty message if the metadata file is unavailable.
:raises NoneMetadataError: If the metadata file is available, but does
not contain valid metadata.
"""
raise NotImplementedError()
@property
@ -402,7 +454,11 @@ class BaseEnvironment:
raise NotImplementedError()
def get_distribution(self, name: str) -> Optional["BaseDistribution"]:
"""Given a requirement name, return the installed distributions."""
"""Given a requirement name, return the installed distributions.
The name may not be normalized. The implementation must canonicalize
it for lookup.
"""
raise NotImplementedError()
def _iter_distributions(self) -> Iterator["BaseDistribution"]:

View File

@ -1,19 +1,19 @@
import email.message
import email.parser
import logging
import os
import pathlib
from typing import Collection, Iterable, Iterator, List, NamedTuple, Optional
from zipfile import BadZipFile
import zipfile
from typing import Collection, Iterable, Iterator, List, Mapping, NamedTuple, Optional
from pip._vendor import pkg_resources
from pip._vendor.packaging.requirements import Requirement
from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
from pip._vendor.packaging.version import parse as parse_version
from pip._internal.exceptions import InvalidWheel
from pip._internal.utils import misc # TODO: Move definition here.
from pip._internal.utils.packaging import get_installer, get_metadata
from pip._internal.utils.wheel import pkg_resources_distribution_for_wheel
from pip._internal.exceptions import InvalidWheel, NoneMetadataError, UnsupportedWheel
from pip._internal.utils.misc import display_path
from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file
from .base import (
BaseDistribution,
@ -33,6 +33,41 @@ class EntryPoint(NamedTuple):
group: str
class _WheelMetadata:
"""IMetadataProvider that reads metadata files from a dictionary.
This also maps metadata decoding exceptions to our internal exception type.
"""
def __init__(self, metadata: Mapping[str, bytes], wheel_name: str) -> None:
self._metadata = metadata
self._wheel_name = wheel_name
def has_metadata(self, name: str) -> bool:
return name in self._metadata
def get_metadata(self, name: str) -> str:
try:
return self._metadata[name].decode()
except UnicodeDecodeError as e:
# Augment the default error with the origin of the file.
raise UnsupportedWheel(
f"Error decoding metadata for {self._wheel_name}: {e} in {name} file"
)
def get_metadata_lines(self, name: str) -> Iterable[str]:
return pkg_resources.yield_lines(self.get_metadata(name))
def metadata_isdir(self, name: str) -> bool:
return False
def metadata_listdir(self, name: str) -> List[str]:
return []
def run_script(self, script_name: str, namespace: str) -> None:
pass
class Distribution(BaseDistribution):
def __init__(self, dist: pkg_resources.Distribution) -> None:
self._dist = dist
@ -63,12 +98,26 @@ class Distribution(BaseDistribution):
:raises InvalidWheel: Whenever loading of the wheel causes a
:py:exc:`zipfile.BadZipFile` exception to be thrown.
:raises UnsupportedWheel: If the wheel is a valid zip, but malformed
internally.
"""
try:
with wheel.as_zipfile() as zf:
dist = pkg_resources_distribution_for_wheel(zf, name, wheel.location)
except BadZipFile as e:
info_dir, _ = parse_wheel(zf, name)
metadata_text = {
path.split("/", 1)[-1]: read_wheel_metadata_file(zf, path)
for path in zf.namelist()
if path.startswith(f"{info_dir}/")
}
except zipfile.BadZipFile as e:
raise InvalidWheel(wheel.location, name) from e
except UnsupportedWheel as e:
raise UnsupportedWheel(f"{name} has an invalid wheel, {e}")
dist = pkg_resources.DistInfoDistribution(
location=wheel.location,
metadata=_WheelMetadata(metadata_text, wheel.location),
project_name=name,
)
return cls(dist)
@property
@ -97,25 +146,6 @@ class Distribution(BaseDistribution):
def version(self) -> DistributionVersion:
return parse_version(self._dist.version)
@property
def installer(self) -> str:
try:
return get_installer(self._dist)
except (OSError, ValueError):
return "" # Fail silently if the installer file cannot be read.
@property
def local(self) -> bool:
return misc.dist_is_local(self._dist)
@property
def in_usersite(self) -> bool:
return misc.dist_in_usersite(self._dist)
@property
def in_site_packages(self) -> bool:
return misc.dist_in_site_packages(self._dist)
def is_file(self, path: InfoPath) -> bool:
return self._dist.has_metadata(str(path))
@ -132,7 +162,10 @@ class Distribution(BaseDistribution):
name = str(path)
if not self._dist.has_metadata(name):
raise FileNotFoundError(name)
return self._dist.get_metadata(name)
content = self._dist.get_metadata(name)
if content is None:
raise NoneMetadataError(self, name)
return content
def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
for group, entries in self._dist.get_entry_map().items():
@ -142,7 +175,26 @@ class Distribution(BaseDistribution):
@property
def metadata(self) -> email.message.Message:
return get_metadata(self._dist)
"""
:raises NoneMetadataError: if the distribution reports `has_metadata()`
True but `get_metadata()` returns None.
"""
if isinstance(self._dist, pkg_resources.DistInfoDistribution):
metadata_name = "METADATA"
else:
metadata_name = "PKG-INFO"
try:
metadata = self.read_text(metadata_name)
except FileNotFoundError:
if self.location:
displaying_path = display_path(self.location)
else:
displaying_path = repr(self.location)
logger.warning("No metadata found in %s", displaying_path)
metadata = ""
feed_parser = email.parser.FeedParser()
feed_parser.feed(metadata)
return feed_parser.close()
def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
if extras: # pkg_resources raises on invalid extras, so we sanitize.
@ -178,7 +230,6 @@ class Environment(BaseEnvironment):
return None
def get_distribution(self, name: str) -> Optional[BaseDistribution]:
# Search the distribution by looking through the working set.
dist = self._search_distribution(name)
if dist:

View File

@ -51,8 +51,6 @@ from pip._internal.utils.misc import (
ask_path_exists,
backup_dir,
display_path,
dist_in_site_packages,
dist_in_usersite,
hide_url,
redact_auth_from_url,
)
@ -402,11 +400,9 @@ class InstallRequirement:
if not version_compatible:
self.satisfied_by = None
if use_user_site:
if dist_in_usersite(existing_dist):
if existing_dist.in_usersite:
self.should_reinstall = True
elif running_under_virtualenv() and dist_in_site_packages(
existing_dist
):
elif running_under_virtualenv() and existing_dist.in_site_packages:
raise InstallationError(
f"Will not install to the user site because it will "
f"lack sys.path precedence to {existing_dist.raw_name} "

View File

@ -32,14 +32,12 @@ from typing import (
cast,
)
from pip._vendor.pkg_resources import Distribution
from pip._vendor.tenacity import retry, stop_after_delay, wait_fixed
from pip import __version__
from pip._internal.exceptions import CommandError
from pip._internal.locations import get_major_minor_version, site_packages, user_site
from pip._internal.locations import get_major_minor_version
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.egg_link import egg_link_path_from_location
from pip._internal.utils.virtualenv import running_under_virtualenv
__all__ = [
@ -328,64 +326,6 @@ def is_local(path: str) -> bool:
return path.startswith(normalize_path(sys.prefix))
def dist_is_local(dist: Distribution) -> bool:
"""
Return True if given Distribution object is installed locally
(i.e. within current virtualenv).
Always True if we're not in a virtualenv.
"""
return is_local(dist_location(dist))
def dist_in_usersite(dist: Distribution) -> bool:
"""
Return True if given Distribution is installed in user site.
"""
return dist_location(dist).startswith(normalize_path(user_site))
def dist_in_site_packages(dist: Distribution) -> bool:
"""
Return True if given Distribution is installed in
sysconfig.get_python_lib().
"""
return dist_location(dist).startswith(normalize_path(site_packages))
def get_distribution(req_name: str) -> Optional[Distribution]:
"""Given a requirement name, return the installed Distribution object.
This searches from *all* distributions available in the environment, to
match the behavior of ``pkg_resources.get_distribution()``.
Left for compatibility until direct pkg_resources uses are refactored out.
"""
from pip._internal.metadata import get_default_environment
from pip._internal.metadata.pkg_resources import Distribution as _Dist
dist = get_default_environment().get_distribution(req_name)
if dist is None:
return None
return cast(_Dist, dist)._dist
def dist_location(dist: Distribution) -> str:
"""
Get the site-packages location of this distribution. Generally
this is dist.location, except in the case of develop-installed
packages, where dist.location is the source code location, and we
want to know where the egg-link file is.
The returned location is normalized (in particular, with symlinks removed).
"""
egg_link = egg_link_path_from_location(dist.project_name)
if egg_link:
return normalize_path(egg_link)
return normalize_path(dist.location)
def write_output(msg: Any, *args: Any) -> None:
logger.info(msg, *args)

View File

@ -1,16 +1,9 @@
import functools
import logging
from email.message import Message
from email.parser import FeedParser
from typing import Optional, Tuple
from pip._vendor import pkg_resources
from pip._vendor.packaging import specifiers, version
from pip._vendor.packaging.requirements import Requirement
from pip._vendor.pkg_resources import Distribution
from pip._internal.exceptions import NoneMetadataError
from pip._internal.utils.misc import display_path
logger = logging.getLogger(__name__)
@ -38,41 +31,6 @@ def check_requires_python(
return python_version in requires_python_specifier
def get_metadata(dist: Distribution) -> Message:
"""
:raises NoneMetadataError: if the distribution reports `has_metadata()`
True but `get_metadata()` returns None.
"""
metadata_name = "METADATA"
if isinstance(dist, pkg_resources.DistInfoDistribution) and dist.has_metadata(
metadata_name
):
metadata = dist.get_metadata(metadata_name)
elif dist.has_metadata("PKG-INFO"):
metadata_name = "PKG-INFO"
metadata = dist.get_metadata(metadata_name)
else:
logger.warning("No metadata found in %s", display_path(dist.location))
metadata = ""
if metadata is None:
raise NoneMetadataError(dist, metadata_name)
feed_parser = FeedParser()
# The following line errors out if with a "NoneType" TypeError if
# passed metadata=None.
feed_parser.feed(metadata)
return feed_parser.close()
def get_installer(dist: Distribution) -> str:
if dist.has_metadata("INSTALLER"):
for line in dist.get_metadata_lines("INSTALLER"):
if line.strip():
return line.strip()
return ""
@functools.lru_cache(maxsize=512)
def get_requirement(req_string: str) -> Requirement:
"""Construct a packaging.Requirement object with caching"""

View File

@ -1,33 +0,0 @@
from typing import Dict, Iterable, List
from pip._vendor.pkg_resources import yield_lines
class DictMetadata:
"""IMetadataProvider that reads metadata files from a dictionary."""
def __init__(self, metadata: Dict[str, bytes]) -> None:
self._metadata = metadata
def has_metadata(self, name: str) -> bool:
return name in self._metadata
def get_metadata(self, name: str) -> str:
try:
return self._metadata[name].decode()
except UnicodeDecodeError as e:
# Mirrors handling done in pkg_resources.NullProvider.
e.reason += f" in {name} file"
raise
def get_metadata_lines(self, name: str) -> Iterable[str]:
return yield_lines(self.get_metadata(name))
def metadata_isdir(self, name: str) -> bool:
return False
def metadata_listdir(self, name: str) -> List[str]:
return []
def run_script(self, script_name: str, namespace: str) -> None:
pass

View File

@ -4,14 +4,12 @@
import logging
from email.message import Message
from email.parser import Parser
from typing import Dict, Tuple
from typing import Tuple
from zipfile import BadZipFile, ZipFile
from pip._vendor.packaging.utils import canonicalize_name
from pip._vendor.pkg_resources import DistInfoDistribution, Distribution
from pip._internal.exceptions import UnsupportedWheel
from pip._internal.utils.pkg_resources import DictMetadata
VERSION_COMPATIBLE = (1, 0)
@ -19,50 +17,6 @@ VERSION_COMPATIBLE = (1, 0)
logger = logging.getLogger(__name__)
class WheelMetadata(DictMetadata):
"""Metadata provider that maps metadata decoding exceptions to our
internal exception type.
"""
def __init__(self, metadata: Dict[str, bytes], wheel_name: str) -> None:
super().__init__(metadata)
self._wheel_name = wheel_name
def get_metadata(self, name: str) -> str:
try:
return super().get_metadata(name)
except UnicodeDecodeError as e:
# Augment the default error with the origin of the file.
raise UnsupportedWheel(
f"Error decoding metadata for {self._wheel_name}: {e}"
)
def pkg_resources_distribution_for_wheel(
wheel_zip: ZipFile, name: str, location: str
) -> Distribution:
"""Get a pkg_resources distribution given a wheel.
:raises UnsupportedWheel: on any errors
"""
info_dir, _ = parse_wheel(wheel_zip, name)
metadata_files = [p for p in wheel_zip.namelist() if p.startswith(f"{info_dir}/")]
metadata_text: Dict[str, bytes] = {}
for path in metadata_files:
_, metadata_name = path.split("/", 1)
try:
metadata_text[metadata_name] = read_wheel_metadata_file(wheel_zip, path)
except UnsupportedWheel as e:
raise UnsupportedWheel("{} has an invalid wheel, {}".format(name, str(e)))
metadata = WheelMetadata(metadata_text, location)
return DistInfoDistribution(location=location, metadata=metadata, project_name=name)
def parse_wheel(wheel_zip: ZipFile, name: str) -> Tuple[str, Message]:
"""Extract information from the provided wheel, ensuring it meets basic
standards.

View File

@ -0,0 +1,70 @@
import itertools
from typing import List, cast
from unittest import mock
import pytest
from pip._internal.metadata.pkg_resources import Distribution, Environment
pkg_resources = pytest.importorskip("pip._vendor.pkg_resources")
def _dist_is_local(dist: mock.Mock) -> bool:
return dist.kind != "global" and dist.kind != "user"
def _dist_in_usersite(dist: mock.Mock) -> bool:
return dist.kind == "user"
@pytest.fixture(autouse=True)
def patch_distribution_lookups(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(Distribution, "local", property(_dist_is_local))
monkeypatch.setattr(Distribution, "in_usersite", property(_dist_in_usersite))
class _MockWorkingSet(List[mock.Mock]):
def require(self, name: str) -> None:
pass
workingset = _MockWorkingSet(
(
mock.Mock(test_name="global", project_name="global"),
mock.Mock(test_name="editable", project_name="editable"),
mock.Mock(test_name="normal", project_name="normal"),
mock.Mock(test_name="user", project_name="user"),
)
)
workingset_stdlib = _MockWorkingSet(
(
mock.Mock(test_name="normal", project_name="argparse"),
mock.Mock(test_name="normal", project_name="wsgiref"),
)
)
@pytest.mark.parametrize(
"ws, req_name",
itertools.chain(
itertools.product(
[workingset],
(d.project_name for d in workingset),
),
itertools.product(
[workingset_stdlib],
(d.project_name for d in workingset_stdlib),
),
),
)
def test_get_distribution(ws: _MockWorkingSet, req_name: str) -> None:
"""Ensure get_distribution() finds all kinds of distributions."""
dist = Environment(ws).get_distribution(req_name)
assert dist is not None
assert cast(Distribution, dist)._dist.project_name == req_name
def test_get_distribution_nonexist() -> None:
dist = Environment(workingset).get_distribution("non-exist")
assert dist is None

View File

@ -3,7 +3,6 @@ util tests
"""
import codecs
import itertools
import os
import shutil
import stat
@ -30,7 +29,6 @@ from pip._internal.utils.misc import (
build_netloc,
build_url_from_netloc,
format_size,
get_distribution,
get_prog,
hide_url,
hide_value,
@ -209,85 +207,6 @@ class Tests_EgglinkPath:
assert egg_link_path_from_location(self.mock_dist.project_name) is None
@patch("pip._internal.utils.misc.dist_in_usersite")
@patch("pip._internal.utils.misc.dist_is_local")
class TestsGetDistributions:
"""Test get_distribution()."""
class MockWorkingSet(List[Mock]):
def require(self, name: str) -> None:
pass
workingset = MockWorkingSet(
(
Mock(test_name="global", project_name="global"),
Mock(test_name="editable", project_name="editable"),
Mock(test_name="normal", project_name="normal"),
Mock(test_name="user", project_name="user"),
)
)
workingset_stdlib = MockWorkingSet(
(
Mock(test_name="normal", project_name="argparse"),
Mock(test_name="normal", project_name="wsgiref"),
)
)
workingset_freeze = MockWorkingSet(
(
Mock(test_name="normal", project_name="pip"),
Mock(test_name="normal", project_name="setuptools"),
Mock(test_name="normal", project_name="distribute"),
)
)
def dist_is_local(self, dist: Mock) -> bool:
return dist.test_name != "global" and dist.test_name != "user"
def dist_in_usersite(self, dist: Mock) -> bool:
return dist.test_name == "user"
@pytest.mark.parametrize(
"working_set, req_name",
itertools.chain(
itertools.product(
[workingset],
(d.project_name for d in workingset),
),
itertools.product(
[workingset_stdlib],
(d.project_name for d in workingset_stdlib),
),
),
)
def test_get_distribution(
self,
mock_dist_is_local: Mock,
mock_dist_in_usersite: Mock,
working_set: MockWorkingSet,
req_name: str,
) -> None:
"""Ensure get_distribution() finds all kinds of distributions."""
mock_dist_is_local.side_effect = self.dist_is_local
mock_dist_in_usersite.side_effect = self.dist_in_usersite
with patch("pip._vendor.pkg_resources.working_set", working_set):
dist = get_distribution(req_name)
assert dist is not None
assert dist.project_name == req_name
@patch("pip._vendor.pkg_resources.working_set", workingset)
def test_get_distribution_nonexist(
self,
mock_dist_is_local: Mock,
mock_dist_in_usersite: Mock,
) -> None:
mock_dist_is_local.side_effect = self.dist_is_local
mock_dist_in_usersite.side_effect = self.dist_in_usersite
dist = get_distribution("non-exist")
assert dist is None
def test_rmtree_errorhandler_nonexistent_directory(tmpdir: Path) -> None:
"""
Test rmtree_errorhandler ignores the given non-existing directory.