1
1
Fork 0
mirror of https://github.com/pypa/pip synced 2023-12-13 21:30:23 +01:00

Merge pull request #7538 from chrahunt/refactor/get-metadata-from-zip

Read wheel metadata from wheel directly
This commit is contained in:
Xavier Fernandez 2020-01-02 10:35:45 +01:00 committed by GitHub
commit a71086eb9c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 102 additions and 35 deletions

View file

@ -18,12 +18,13 @@ import sys
import warnings
from base64 import urlsafe_b64encode
from email.parser import Parser
from zipfile import ZipFile
from pip._vendor import pkg_resources
from pip._vendor.distlib.scripts import ScriptMaker
from pip._vendor.distlib.util import get_export_entry
from pip._vendor.packaging.utils import canonicalize_name
from pip._vendor.six import StringIO, ensure_str
from pip._vendor.six import PY2, StringIO, ensure_str
from pip._internal.exceptions import InstallationError, UnsupportedWheel
from pip._internal.locations import get_major_minor_version
@ -43,6 +44,11 @@ if MYPY_CHECK_RUNNING:
InstalledCSVRow = Tuple[str, ...]
if PY2:
from zipfile import BadZipfile as BadZipFile
else:
from zipfile import BadZipFile
VERSION_COMPATIBLE = (1, 0)
@ -286,6 +292,7 @@ class PipScriptMaker(ScriptMaker):
def install_unpacked_wheel(
name, # type: str
wheeldir, # type: str
wheel_zip, # type: ZipFile
scheme, # type: Scheme
req_description, # type: str
pycompile=True, # type: bool
@ -296,6 +303,7 @@ def install_unpacked_wheel(
:param name: Name of the project to install
:param wheeldir: Base directory of the unpacked wheel
:param wheel_zip: open ZipFile for wheel being installed
:param scheme: Distutils scheme dictating the install directories
:param req_description: String used in place of the requirement, for
logging
@ -313,16 +321,7 @@ def install_unpacked_wheel(
source = wheeldir.rstrip(os.path.sep) + os.path.sep
try:
info_dir = wheel_dist_info_dir(source, name)
metadata = wheel_metadata(source, info_dir)
version = wheel_version(metadata)
except UnsupportedWheel as e:
raise UnsupportedWheel(
"{} has an invalid wheel, {}".format(name, str(e))
)
check_compatibility(version, name)
info_dir, metadata = parse_wheel(wheel_zip, name)
if wheel_root_is_purelib(metadata):
lib_dir = scheme.purelib
@ -612,11 +611,12 @@ def install_wheel(
# type: (...) -> None
with TempDirectory(
path=_temp_dir_for_testing, kind="unpacked-wheel"
) as unpacked_dir:
) as unpacked_dir, ZipFile(wheel_path, allowZip64=True) as z:
unpack_file(wheel_path, unpacked_dir.path)
install_unpacked_wheel(
name=name,
wheeldir=unpacked_dir.path,
wheel_zip=z,
scheme=scheme,
req_description=req_description,
pycompile=pycompile,
@ -624,14 +624,37 @@ def install_wheel(
)
def parse_wheel(wheel_zip, name):
# type: (ZipFile, str) -> Tuple[str, Message]
"""Extract information from the provided wheel, ensuring it meets basic
standards.
Returns the name of the .dist-info directory and the parsed WHEEL metadata.
"""
try:
info_dir = wheel_dist_info_dir(wheel_zip, name)
metadata = wheel_metadata(wheel_zip, info_dir)
version = wheel_version(metadata)
except UnsupportedWheel as e:
raise UnsupportedWheel(
"{} has an invalid wheel, {}".format(name, str(e))
)
check_compatibility(version, name)
return info_dir, metadata
def wheel_dist_info_dir(source, name):
# type: (str, str) -> str
# type: (ZipFile, str) -> str
"""Returns the name of the contained .dist-info directory.
Raises AssertionError or UnsupportedWheel if not found, >1 found, or
it doesn't match the provided name.
"""
subdirs = os.listdir(source)
# Zip file path separators must be /
subdirs = list(set(p.split("/")[0] for p in source.namelist()))
info_dirs = [s for s in subdirs if s.endswith('.dist-info')]
if not info_dirs:
@ -655,19 +678,26 @@ def wheel_dist_info_dir(source, name):
)
)
return info_dir
# Zip file paths can be unicode or str depending on the zip entry flags,
# so normalize it.
return ensure_str(info_dir)
def wheel_metadata(source, dist_info_dir):
# type: (str, str) -> Message
# type: (ZipFile, str) -> Message
"""Return the WHEEL metadata of an extracted wheel, if possible.
Otherwise, raise UnsupportedWheel.
"""
try:
with open(os.path.join(source, dist_info_dir, "WHEEL"), "rb") as f:
wheel_text = ensure_str(f.read())
except (IOError, OSError) as e:
# Zip file path separators must be /
wheel_contents = source.read("{}/WHEEL".format(dist_info_dir))
# BadZipFile for general corruption, KeyError for missing entry,
# and RuntimeError for password-protected files
except (BadZipFile, KeyError, RuntimeError) as e:
raise UnsupportedWheel("could not read WHEEL file: {!r}".format(e))
try:
wheel_text = ensure_str(wheel_contents)
except UnicodeDecodeError as e:
raise UnsupportedWheel("error decoding WHEEL: {!r}".format(e))

View file

@ -4,9 +4,12 @@ import logging
import os
import textwrap
from email import message_from_string
from io import BytesIO
from zipfile import ZipFile
import pytest
from mock import patch
from pip._vendor.contextlib2 import ExitStack
from pip._vendor.packaging.requirements import Requirement
from pip._internal.exceptions import UnsupportedWheel
@ -22,9 +25,13 @@ from pip._internal.operations.install.wheel import (
)
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.misc import hash_file
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.unpacking import unpack_file
from tests.lib import DATA_DIR, assert_paths_equal, skip_if_python2
if MYPY_CHECK_RUNNING:
from tests.lib.path import Path
def call_get_legacy_build_wheel_path(caplog, names):
wheel_path = get_legacy_build_wheel_path(
@ -190,30 +197,60 @@ def test_get_csv_rows_for_installed__long_lines(tmpdir, caplog):
assert messages == expected
def test_wheel_dist_info_dir_found(tmpdir):
@pytest.fixture
def zip_dir():
def make_zip(path):
# type: (Path) -> ZipFile
buf = BytesIO()
with ZipFile(buf, "w", allowZip64=True) as z:
for dirpath, dirnames, filenames in os.walk(path):
for filename in filenames:
file_path = os.path.join(path, dirpath, filename)
# Zip files must always have / as path separator
archive_path = os.path.relpath(file_path, path).replace(
os.pathsep, "/"
)
z.write(file_path, archive_path)
return stack.enter_context(ZipFile(buf, "r", allowZip64=True))
stack = ExitStack()
with stack:
yield make_zip
def test_wheel_dist_info_dir_found(tmpdir, zip_dir):
expected = "simple-0.1.dist-info"
tmpdir.joinpath(expected).mkdir()
assert wheel.wheel_dist_info_dir(str(tmpdir), "simple") == expected
dist_info_dir = tmpdir / expected
dist_info_dir.mkdir()
dist_info_dir.joinpath("WHEEL").touch()
assert wheel.wheel_dist_info_dir(zip_dir(tmpdir), "simple") == expected
def test_wheel_dist_info_dir_multiple(tmpdir):
tmpdir.joinpath("simple-0.1.dist-info").mkdir()
tmpdir.joinpath("unrelated-0.1.dist-info").mkdir()
def test_wheel_dist_info_dir_multiple(tmpdir, zip_dir):
dist_info_dir_1 = tmpdir / "simple-0.1.dist-info"
dist_info_dir_1.mkdir()
dist_info_dir_1.joinpath("WHEEL").touch()
dist_info_dir_2 = tmpdir / "unrelated-0.1.dist-info"
dist_info_dir_2.mkdir()
dist_info_dir_2.joinpath("WHEEL").touch()
with pytest.raises(UnsupportedWheel) as e:
wheel.wheel_dist_info_dir(str(tmpdir), "simple")
wheel.wheel_dist_info_dir(zip_dir(tmpdir), "simple")
assert "multiple .dist-info directories found" in str(e.value)
def test_wheel_dist_info_dir_none(tmpdir):
def test_wheel_dist_info_dir_none(tmpdir, zip_dir):
with pytest.raises(UnsupportedWheel) as e:
wheel.wheel_dist_info_dir(str(tmpdir), "simple")
wheel.wheel_dist_info_dir(zip_dir(tmpdir), "simple")
assert "directory not found" in str(e.value)
def test_wheel_dist_info_dir_wrong_name(tmpdir):
tmpdir.joinpath("unrelated-0.1.dist-info").mkdir()
def test_wheel_dist_info_dir_wrong_name(tmpdir, zip_dir):
dist_info_dir = tmpdir / "unrelated-0.1.dist-info"
dist_info_dir.mkdir()
dist_info_dir.joinpath("WHEEL").touch()
with pytest.raises(UnsupportedWheel) as e:
wheel.wheel_dist_info_dir(str(tmpdir), "simple")
wheel.wheel_dist_info_dir(zip_dir(tmpdir), "simple")
assert "does not start with 'simple'" in str(e.value)
@ -223,25 +260,25 @@ def test_wheel_version_ok(tmpdir, data):
) == (1, 9)
def test_wheel_metadata_fails_missing_wheel(tmpdir):
def test_wheel_metadata_fails_missing_wheel(tmpdir, zip_dir):
dist_info_dir = tmpdir / "simple-0.1.0.dist-info"
dist_info_dir.mkdir()
dist_info_dir.joinpath("METADATA").touch()
with pytest.raises(UnsupportedWheel) as e:
wheel.wheel_metadata(str(tmpdir), dist_info_dir.name)
wheel.wheel_metadata(zip_dir(tmpdir), dist_info_dir.name)
assert "could not read WHEEL file" in str(e.value)
@skip_if_python2
def test_wheel_metadata_fails_on_bad_encoding(tmpdir):
def test_wheel_metadata_fails_on_bad_encoding(tmpdir, zip_dir):
dist_info_dir = tmpdir / "simple-0.1.0.dist-info"
dist_info_dir.mkdir()
dist_info_dir.joinpath("METADATA").touch()
dist_info_dir.joinpath("WHEEL").write_bytes(b"\xff")
with pytest.raises(UnsupportedWheel) as e:
wheel.wheel_metadata(str(tmpdir), dist_info_dir.name)
wheel.wheel_metadata(zip_dir(tmpdir), dist_info_dir.name)
assert "error decoding WHEEL" in str(e.value)