Merge pull request #7539 from chrahunt/refactor/get-dist-from-zip

Use wheelfile-based pkg_resources.Distribution for metadata
This commit is contained in:
Christopher Hunt 2020-01-06 01:24:13 +08:00 committed by GitHub
commit b7ed044525
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 240 additions and 43 deletions

View File

@ -1,7 +1,8 @@
from pip._vendor import pkg_resources
from zipfile import ZipFile
from pip._internal.distributions.base import AbstractDistribution
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.wheel import pkg_resources_distribution_for_wheel
if MYPY_CHECK_RUNNING:
from pip._vendor.pkg_resources import Distribution
@ -16,8 +17,19 @@ class WheelDistribution(AbstractDistribution):
def get_pkg_resources_distribution(self):
    # type: () -> Distribution
    """Loads the metadata from the wheel file into memory and returns a
    Distribution that uses it, not relying on the wheel file or
    requirement.

    The wheel archive is only open for the duration of this call; the
    returned Distribution is built by pkg_resources_distribution_for_wheel
    from what was read while it was open.
    """
    # NOTE: the previous implementation returned
    # list(pkg_resources.find_distributions(self.req.source_dir))[0] here,
    # ahead of the docstring, which made everything below unreachable and
    # depended on the wheel having been unpacked into source_dir.

    # Set as part of preparation during download.
    assert self.req.local_file_path
    # Wheels are never unnamed.
    assert self.req.name

    wheel_path = self.req.local_file_path
    with ZipFile(wheel_path, allowZip64=True) as z:
        return pkg_resources_distribution_for_wheel(
            z, self.req.name, wheel_path
        )
def prepare_distribution_metadata(self, finder, build_isolation):
# type: (PackageFinder, bool) -> None

View File

@ -0,0 +1,44 @@
from pip._vendor.pkg_resources import yield_lines
from pip._vendor.six import ensure_str
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Dict, Iterable, List
class DictMetadata(object):
    """IMetadataProvider backed by an in-memory mapping of metadata file
    name to raw file contents.
    """

    def __init__(self, metadata):
        # type: (Dict[str, bytes]) -> None
        # The mapping is stored as given (not copied).
        self._metadata = metadata

    def has_metadata(self, name):
        # type: (str) -> bool
        """Return whether a metadata file called `name` is present."""
        return name in self._metadata

    def get_metadata(self, name):
        # type: (str) -> str
        """Return the contents of metadata file `name` as native str.

        A missing name propagates KeyError from the underlying mapping; a
        decoding failure is re-raised with the file name added to its reason.
        """
        raw = self._metadata[name]
        try:
            text = ensure_str(raw)
        except UnicodeDecodeError as exc:
            # Mirrors handling done in pkg_resources.NullProvider.
            exc.reason += " in {} file".format(name)
            raise
        return text

    def get_metadata_lines(self, name):
        # type: (str) -> Iterable[str]
        """Return the result of yield_lines over the decoded file `name`."""
        text = self.get_metadata(name)
        return yield_lines(text)

    def metadata_isdir(self, name):
        # type: (str) -> bool
        """Always False: this provider never reports a directory."""
        return False

    def metadata_listdir(self, name):
        # type: (str) -> List[str]
        """Always an empty list: this provider never lists directory
        contents.
        """
        return []

    def run_script(self, script_name, namespace):
        # type: (str, str) -> None
        """Do nothing; scripts are not run by this provider."""
        pass

View File

@ -8,14 +8,18 @@ from email.parser import Parser
from zipfile import ZipFile
from pip._vendor.packaging.utils import canonicalize_name
from pip._vendor.pkg_resources import DistInfoDistribution
from pip._vendor.six import PY2, ensure_str
from pip._internal.exceptions import UnsupportedWheel
from pip._internal.utils.pkg_resources import DictMetadata
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from email.message import Message
from typing import Tuple
from typing import Dict, Tuple
from pip._vendor.pkg_resources import Distribution
if PY2:
from zipfile import BadZipfile as BadZipFile
@ -29,6 +33,65 @@ VERSION_COMPATIBLE = (1, 0)
logger = logging.getLogger(__name__)
class WheelMetadata(DictMetadata):
    """DictMetadata variant that reports metadata decoding failures as
    UnsupportedWheel, naming the wheel the metadata came from.
    """

    def __init__(self, metadata, wheel_name):
        # type: (Dict[str, bytes], str) -> None
        super(WheelMetadata, self).__init__(metadata)
        # Only used to build the error message in get_metadata.
        self._wheel_name = wheel_name

    def get_metadata(self, name):
        # type: (str) -> str
        """Return the decoded metadata file `name`.

        :raises UnsupportedWheel: if the file contents cannot be decoded
        """
        base = super(WheelMetadata, self)
        try:
            return base.get_metadata(name)
        except UnicodeDecodeError as exc:
            # Augment the default error with the origin of the file.
            message = "Error decoding metadata for {}: {}".format(
                self._wheel_name, exc
            )
            raise UnsupportedWheel(message)
def pkg_resources_distribution_for_wheel(wheel_zip, name, location):
    # type: (ZipFile, str, str) -> Distribution
    """Build a pkg_resources distribution from an open wheel archive.

    Every archive member under the wheel's .dist-info directory is read into
    memory and exposed through a WheelMetadata provider.

    :raises UnsupportedWheel: on any errors
    """
    info_dir, _ = parse_wheel(wheel_zip, name)
    prefix = "{}/".format(info_dir)

    dist_info_members = [
        member for member in wheel_zip.namelist() if member.startswith(prefix)
    ]

    contents = {}  # type: Dict[str, bytes]
    for member in dist_info_members:
        # If a flag is set, namelist entries may be unicode in Python 2.
        # We coerce them to native str type to match the types used in the rest
        # of the code. This cannot fail because unicode can always be encoded
        # with UTF-8.
        member_path = ensure_str(member)
        # Key each file by its path relative to the .dist-info directory.
        _, relative_name = member_path.split("/", 1)

        try:
            data = read_wheel_metadata_file(wheel_zip, member_path)
        except UnsupportedWheel as exc:
            raise UnsupportedWheel(
                "{} has an invalid wheel, {}".format(name, str(exc))
            )
        contents[relative_name] = data

    provider = WheelMetadata(contents, location)

    return DistInfoDistribution(
        location=location, metadata=provider, project_name=name
    )
def parse_wheel(wheel_zip, name):
# type: (ZipFile, str) -> Tuple[str, Message]
"""Extract information from the provided wheel, ensuring it meets basic
@ -88,23 +151,31 @@ def wheel_dist_info_dir(source, name):
return ensure_str(info_dir)
def read_wheel_metadata_file(source, path):
    # type: (ZipFile, str) -> bytes
    """Return the raw bytes of archive member `path` read from `source`.

    :raises UnsupportedWheel: if the member cannot be read
    """
    try:
        contents = source.read(path)
    except (BadZipFile, KeyError, RuntimeError) as exc:
        # BadZipFile for general corruption, KeyError for missing entry,
        # and RuntimeError for password-protected files
        message = "could not read {!r} file: {!r}".format(path, exc)
        raise UnsupportedWheel(message)
    else:
        return contents
def wheel_metadata(source, dist_info_dir):
# type: (ZipFile, str) -> Message
"""Return the WHEEL metadata of an extracted wheel, if possible.
Otherwise, raise UnsupportedWheel.
"""
try:
# Zip file path separators must be /
wheel_contents = source.read("{}/WHEEL".format(dist_info_dir))
# BadZipFile for general corruption, KeyError for missing entry,
# and RuntimeError for password-protected files
except (BadZipFile, KeyError, RuntimeError) as e:
raise UnsupportedWheel("could not read WHEEL file: {!r}".format(e))
path = "{}/WHEEL".format(dist_info_dir)
# Zip file path separators must be /
wheel_contents = read_wheel_metadata_file(source, path)
try:
wheel_text = ensure_str(wheel_contents)
except UnicodeDecodeError as e:
raise UnsupportedWheel("error decoding WHEEL: {!r}".format(e))
raise UnsupportedWheel("error decoding {!r}: {!r}".format(path, e))
# FeedParser (used by Parser) does not raise any exceptions. The returned
# message may have .defects populated, but for backwards-compatibility we

View File

@ -13,13 +13,11 @@ from pip._internal.models.link import Link
from pip._internal.operations.build.wheel import build_wheel_pep517
from pip._internal.operations.build.wheel_legacy import build_wheel_legacy
from pip._internal.utils.logging import indent_log
from pip._internal.utils.marker_files import has_delete_marker_file
from pip._internal.utils.misc import ensure_dir, hash_file
from pip._internal.utils.setuptools_build import make_setuptools_clean_args
from pip._internal.utils.subprocess import call_subprocess
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.unpacking import unpack_file
from pip._internal.utils.urls import path_to_url
from pip._internal.vcs import vcs
@ -313,27 +311,6 @@ class WheelBuilder(object):
req.link = Link(path_to_url(wheel_file))
req.local_file_path = req.link.file_path
assert req.link.is_wheel
if should_unpack:
# XXX: This is mildly duplicative with prepare_files,
# but not close enough to pull out to a single common
# method.
# The code below assumes temporary source dirs -
# prevent it doing bad things.
if (
req.source_dir and
not has_delete_marker_file(req.source_dir)
):
raise AssertionError(
"bad source dir - missing marker")
# Delete the source we built the wheel from
req.remove_temporary_source()
# set the build directory again - name is known from
# the work prepare_files did.
req.source_dir = req.ensure_build_location(
self.preparer.build_dir
)
# extract the wheel into the dir
unpack_file(req.link.file_path, req.source_dir)
build_successes.append(req)
else:
build_failures.append(req)

View File

@ -5,7 +5,7 @@ import shutil
import pytest
from tests.lib import create_basic_wheel_for_package
from tests.lib import create_basic_wheel_for_package, skip_if_python2
from tests.lib.path import Path
@ -534,3 +534,38 @@ def test_wheel_installs_ok_with_nested_dist_info(script):
script.pip(
"install", "--no-cache-dir", "--no-index", package
)
def test_wheel_installs_ok_with_badly_encoded_irrelevant_dist_info_file(
    script
):
    """Installing succeeds when an extra .dist-info file (AUTHORS.txt here)
    holds bytes that are not valid UTF-8.
    """
    undecodable_extra = {
        "simple-0.1.0.dist-info/AUTHORS.txt": b"\xff"
    }
    package = create_basic_wheel_for_package(
        script,
        "simple",
        "0.1.0",
        extra_files=undecodable_extra,
    )

    install_args = ["install", "--no-cache-dir", "--no-index", package]
    script.pip(*install_args)
# Metadata is not decoded on Python 2.
@skip_if_python2
def test_wheel_install_fails_with_badly_encoded_metadata(script):
    """Installing fails, naming the wheel and the METADATA file, when
    METADATA holds bytes that are not valid UTF-8.
    """
    undecodable_metadata = {
        "simple-0.1.0.dist-info/METADATA": b"\xff"
    }
    package = create_basic_wheel_for_package(
        script,
        "simple",
        "0.1.0",
        extra_files=undecodable_metadata,
    )

    result = script.pip(
        "install", "--no-cache-dir", "--no-index", package, expect_error=True
    )

    expected_fragments = (
        "Error decoding metadata for",
        "simple-0.1.0-py2.py3-none-any.whl",
        "METADATA",
    )
    for fragment in expected_fragments:
        assert fragment in result.stderr

View File

@ -15,7 +15,7 @@ from textwrap import dedent
from zipfile import ZipFile
import pytest
from pip._vendor.six import PY2
from pip._vendor.six import PY2, ensure_binary
from scripttest import FoundDir, TestFileEnvironment
from pip._internal.index.collector import LinkCollector
@ -1018,9 +1018,6 @@ def create_basic_wheel_for_package(
"{dist_info}/RECORD": ""
}
if extra_files:
files.update(extra_files)
# Some useful shorthands
archive_name = "{name}-{version}-py2.py3-none-any.whl".format(
name=name, version=version
@ -1046,10 +1043,14 @@ def create_basic_wheel_for_package(
name=name, version=version, requires_dist=requires_dist
).strip()
# Add new files after formatting
if extra_files:
files.update(extra_files)
for fname in files:
path = script.temp_path / fname
path.parent.mkdir(exist_ok=True, parents=True)
path.write_text(files[fname])
path.write_bytes(ensure_binary(files[fname]))
retval = script.scratch_path / archive_name
generated = shutil.make_archive(retval, 'zip', script.temp_path)

View File

@ -0,0 +1,57 @@
from email.message import Message
import pytest
from pip._vendor.pkg_resources import DistInfoDistribution, Requirement
from pip._vendor.six import ensure_binary
from pip._internal.utils.packaging import get_metadata, get_requires_python
from pip._internal.utils.pkg_resources import DictMetadata
from tests.lib import skip_if_python2
def test_dict_metadata_works():
    """Feed a generated METADATA file through DictMetadata and check what a
    DistInfoDistribution built on top of it reports.
    """
    name = "simple"
    version = "0.1.0"
    require_a = "a==1.0"
    require_b = "b==1.1; extra == 'also_b'"
    require_c = "c==1.2; extra == 'also_c'"
    extras = ["also_b", "also_c"]
    requires_python = ">=3"

    # Assigning to a header name that is already present on a Message adds
    # another header rather than replacing it, hence the loops below.
    message = Message()
    message["Name"] = name
    message["Version"] = version
    for requirement in (require_a, require_b, require_c):
        message["Requires-Dist"] = requirement
    for extra in extras:
        message["Provides-Extra"] = extra
    message["Requires-Python"] = requires_python
    serialized = message.as_string()

    provider = DictMetadata({"METADATA": ensure_binary(serialized)})
    dist = DistInfoDistribution(
        location="<in-memory>", metadata=provider, project_name=name
    )

    parsed_a = Requirement.parse(require_a)
    parsed_b = Requirement.parse(require_b)

    assert name == dist.project_name
    assert version == dist.version
    assert set(extras) == set(dist.extras)
    assert [parsed_a] == dist.requires([])
    assert [parsed_a, parsed_b] == dist.requires(["also_b"])
    assert serialized == get_metadata(dist).as_string()
    assert requires_python == get_requires_python(dist)
# Metadata is not decoded on Python 2, so no chance for error.
@skip_if_python2
def test_dict_metadata_throws_on_bad_unicode():
    """get_metadata raises UnicodeDecodeError mentioning the file name when
    the stored bytes cannot be decoded.
    """
    provider = DictMetadata({"METADATA": b"\xff"})

    with pytest.raises(UnicodeDecodeError) as excinfo:
        provider.get_metadata("METADATA")

    error_text = str(excinfo.value)
    assert "METADATA" in error_text

View File

@ -85,7 +85,7 @@ def test_wheel_metadata_fails_missing_wheel(tmpdir, zip_dir):
with pytest.raises(UnsupportedWheel) as e:
wheel.wheel_metadata(zip_dir(tmpdir), dist_info_dir.name)
assert "could not read WHEEL file" in str(e.value)
assert "could not read" in str(e.value)
@skip_if_python2
@ -97,7 +97,7 @@ def test_wheel_metadata_fails_on_bad_encoding(tmpdir, zip_dir):
with pytest.raises(UnsupportedWheel) as e:
wheel.wheel_metadata(zip_dir(tmpdir), dist_info_dir.name)
assert "error decoding WHEEL" in str(e.value)
assert "error decoding" in str(e.value)
def test_wheel_version_fails_on_no_wheel_version():