"""Shared pytest hooks and fixtures for the pip test suite."""

import compileall
import contextlib
import fnmatch
import http.server
import os
import re
import shutil
import subprocess
import sys
import threading
from dataclasses import dataclass
from enum import Enum
from hashlib import sha256
from pathlib import Path
from textwrap import dedent
from typing import (
    TYPE_CHECKING,
    Any,
    AnyStr,
    Callable,
    ClassVar,
    ContextManager,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Set,
    Tuple,
)
from unittest.mock import patch
from zipfile import ZipFile

import pytest

# Config will be available from the public API in pytest >= 7.0.0:
# https://github.com/pytest-dev/pytest/commit/88d84a57916b592b070f4201dc84f0286d1f9fef
from _pytest.config import Config

# Parser will be available from the public API in pytest >= 7.0.0:
# https://github.com/pytest-dev/pytest/commit/538b5c24999e9ebb4fab43faabc8bcc28737bcdf
from _pytest.config.argparsing import Parser
from installer import install
from installer.destinations import SchemeDictionaryDestination
from installer.sources import WheelFile

from pip import __file__ as pip_location
from pip._internal.locations import _USE_SYSCONFIG
from pip._internal.utils.temp_dir import global_tempdir_manager
from tests.lib import (
    DATA_DIR,
    SRC_DIR,
    CertFactory,
    InMemoryPip,
    PipTestEnvironment,
    ScriptFactory,
    TestData,
)
from tests.lib.server import MockServer, make_mock_server
from tests.lib.venv import VirtualEnvironment, VirtualEnvironmentType

if TYPE_CHECKING:
    from pip._vendor.typing_extensions import Self


def pytest_addoption(parser: Parser) -> None:
    """Register the command line options understood by this test suite."""
    parser.addoption(
        "--keep-tmpdir",
        action="store_true",
        default=False,
        help="keep temporary test directories",
    )
    parser.addoption(
        "--resolver",
        action="store",
        default="resolvelib",
        choices=["resolvelib", "legacy"],
        help="use given resolver in tests",
    )
    parser.addoption(
        "--use-venv",
        action="store_true",
        default=False,
        help="use venv for virtual environment creation",
    )
    parser.addoption(
        "--run-search",
        action="store_true",
        default=False,
        help="run 'pip search' tests",
    )
    parser.addoption(
        "--proxy",
        action="store",
        default=None,
        help="use given proxy in session network tests",
    )
    parser.addoption(
        "--use-zipapp",
        action="store_true",
        default=False,
        help="use a zipapp when running pip in tests",
    )


def pytest_collection_modifyitems(config: Config, items: List[pytest.Function]) -> None:
    """Attach skip/flaky markers and classify every test as unit or integration.

    The classification is derived from the first directory component of the
    test module's path relative to this file.

    :raises RuntimeError: if a unit test requests the ``script`` fixture, or
        if a test module does not live under a known test directory.
    """
    for item in items:
        if not hasattr(item, "module"):  # e.g.: DoctestTextfile
            continue

        if item.get_closest_marker("search") and not config.getoption("--run-search"):
            item.add_marker(pytest.mark.skip("pip search test skipped"))

        # Mark network tests as flaky, but only when running on CI.
        if "CI" in os.environ and item.get_closest_marker("network") is not None:
            item.add_marker(pytest.mark.flaky(reruns=3, reruns_delay=2))

        if (
            item.get_closest_marker("incompatible_with_venv")
            and sys.prefix != sys.base_prefix
        ):
            item.add_marker(pytest.mark.skip("Incompatible with venv"))

        if item.get_closest_marker("incompatible_with_sysconfig") and _USE_SYSCONFIG:
            item.add_marker(pytest.mark.skip("Incompatible with sysconfig"))

        module_file = item.module.__file__
        module_path = os.path.relpath(
            module_file, os.path.commonprefix([__file__, module_file])
        )

        # Split on the directory separator (os.sep). os.pathsep is the
        # separator used between entries of PATH-like variables and never
        # appears inside a relative module path.
        module_root_dir = module_path.split(os.sep)[0]
        if module_root_dir.startswith(("functional", "integration", "lib")):
            item.add_marker(pytest.mark.integration)
        elif module_root_dir.startswith("unit"):
            item.add_marker(pytest.mark.unit)

            # We don't want to allow using the script resource if this is a
            # unit test, as unit tests should not need all that heavy lifting
            if "script" in item.fixturenames:
                raise RuntimeError(
                    "Cannot use the ``script`` funcarg in a unit test: "
                    f"(filename = {module_path}, item = {item})"
                )
        else:
            raise RuntimeError(f"Unknown test type (filename = {module_path})")


@pytest.fixture(scope="session", autouse=True)
def resolver_variant(request: pytest.FixtureRequest) -> Iterator[str]:
    """Set environment variable to make pip default to the correct resolver."""
    resolver = request.config.getoption("--resolver")

    # Start from whatever the caller's environment already requests.
    features = set(os.environ.get("PIP_USE_FEATURE", "").split())
    deprecated_features = set(os.environ.get("PIP_USE_DEPRECATED", "").split())

    if resolver == "legacy":
        deprecated_features.add("legacy-resolver")
    else:
        deprecated_features.discard("legacy-resolver")

    # Sort so that the exported values do not depend on set iteration order.
    env = {
        "PIP_USE_FEATURE": " ".join(sorted(features)),
        "PIP_USE_DEPRECATED": " ".join(sorted(deprecated_features)),
    }
    with patch.dict(os.environ, env):
        yield resolver


@pytest.fixture(scope="session")
def tmp_path_factory(
    request: pytest.FixtureRequest, tmp_path_factory: pytest.TempPathFactory
) -> Iterator[pytest.TempPathFactory]:
    """Modified `tmpdir_factory` session fixture
    that will automatically cleanup after itself.
    """
    yield tmp_path_factory
    if not request.config.getoption("--keep-tmpdir"):
        shutil.rmtree(
            tmp_path_factory.getbasetemp(),
            ignore_errors=True,
        )


@pytest.fixture(scope="session")
def tmpdir_factory(tmp_path_factory: pytest.TempPathFactory) -> pytest.TempPathFactory:
    """Override Pytest's ``tmpdir_factory`` with our pathlib implementation.

    This prevents mis-use of this fixture.
    """
    return tmp_path_factory


@pytest.fixture
def tmp_path(request: pytest.FixtureRequest, tmp_path: Path) -> Iterator[Path]:
    """
    Return a temporary directory path object which is unique to each test
    function invocation, created as a sub directory of the base temporary
    directory. The returned object is a ``Path`` object.

    This uses the built-in tmp_path fixture from pytest itself, but deletes the
    temporary directories at the end of each test case.
    """
    assert tmp_path.is_dir()
    yield tmp_path
    # Clear out the temporary directory after the test has finished using it.
    # This should prevent us from needing a multiple gigabyte temporary
    # directory while running the tests.
    if not request.config.getoption("--keep-tmpdir"):
        shutil.rmtree(tmp_path, ignore_errors=True)


@pytest.fixture()
def tmpdir(tmp_path: Path) -> Path:
    """Override Pytest's ``tmpdir`` with our pathlib implementation.

    This prevents mis-use of this fixture.
    """
    return tmp_path


@pytest.fixture(autouse=True)
def isolate(tmpdir: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Isolate our tests so that things like global configuration files and the
    like do not affect our test results.

    We use an autouse function scoped fixture because we want to ensure that
    every test has its own isolated home directory.
    """

    # TODO: Figure out how to isolate from *system* level configuration files
    #       as well as user level configuration files.

    # Create a directory to use as our home location.
    home_dir = os.path.join(str(tmpdir), "home")
    os.makedirs(home_dir)

    # Create a directory to use as a fake root
    fake_root = os.path.join(str(tmpdir), "fake-root")
    os.makedirs(fake_root)

    if sys.platform == "win32":
        # Note: this will only take effect in subprocesses...
        home_drive, home_path = os.path.splitdrive(home_dir)
        monkeypatch.setenv("USERPROFILE", home_dir)
        monkeypatch.setenv("HOMEDRIVE", home_drive)
        monkeypatch.setenv("HOMEPATH", home_path)
        for env_var, sub_path in (
            ("APPDATA", "AppData/Roaming"),
            ("LOCALAPPDATA", "AppData/Local"),
        ):
            path = os.path.join(home_dir, *sub_path.split("/"))
            monkeypatch.setenv(env_var, path)
            os.makedirs(path)
    else:
        # Set our home directory to our temporary directory, this should force
        # all of our relative configuration files to be read from here instead
        # of the user's actual $HOME directory.
        monkeypatch.setenv("HOME", home_dir)
        # Isolate ourselves from XDG directories
        monkeypatch.setenv(
            "XDG_DATA_HOME",
            os.path.join(
                home_dir,
                ".local",
                "share",
            ),
        )
        monkeypatch.setenv(
            "XDG_CONFIG_HOME",
            os.path.join(
                home_dir,
                ".config",
            ),
        )
        monkeypatch.setenv("XDG_CACHE_HOME", os.path.join(home_dir, ".cache"))
        monkeypatch.setenv(
            "XDG_RUNTIME_DIR",
            os.path.join(
                home_dir,
                ".runtime",
            ),
        )
        monkeypatch.setenv(
            "XDG_DATA_DIRS",
            os.pathsep.join(
                [
                    os.path.join(fake_root, "usr", "local", "share"),
                    os.path.join(fake_root, "usr", "share"),
                ]
            ),
        )
        monkeypatch.setenv(
            "XDG_CONFIG_DIRS",
            os.path.join(
                fake_root,
                "etc",
                "xdg",
            ),
        )

    # Configure git, because without an author name/email git will complain
    # and cause test failures.
    monkeypatch.setenv("GIT_CONFIG_NOSYSTEM", "1")
    monkeypatch.setenv("GIT_AUTHOR_NAME", "pip")
    monkeypatch.setenv("GIT_AUTHOR_EMAIL", "distutils-sig@python.org")

    # We want to disable the version check from running in the tests
    monkeypatch.setenv("PIP_DISABLE_PIP_VERSION_CHECK", "true")

    # Make sure tests don't share a requirements tracker.
    monkeypatch.delenv("PIP_BUILD_TRACKER", raising=False)

    # FIXME: Windows...
    os.makedirs(os.path.join(home_dir, ".config", "git"))
    with open(os.path.join(home_dir, ".config", "git", "config"), "wb") as fp:
        fp.write(b"[user]\n\tname = pip\n\temail = distutils-sig@python.org\n")


@pytest.fixture(autouse=True)
def scoped_global_tempdir_manager(request: pytest.FixtureRequest) -> Iterator[None]:
    """Make unit tests with globally-managed tempdirs easier

    Each test function gets its own individual scope for globally-managed
    temporary directories in the application.
    """
    if "no_auto_tempdir_manager" in request.keywords:
        ctx: Callable[[], ContextManager[None]] = contextlib.nullcontext
    else:
        ctx = global_tempdir_manager

    with ctx():
        yield


@pytest.fixture(scope="session")
def pip_src(tmpdir_factory: pytest.TempPathFactory) -> Path:
    """Copy the source tree at ``SRC_DIR`` into a session temporary directory.

    Only the code needed by the tests is copied; see the ignore callback.
    """

    def not_code_files_and_folders(path: str, names: List[str]) -> Iterable[str]:
        """``shutil.copytree`` ignore callback selecting what not to copy."""
        # In the root directory...
        if os.path.samefile(path, SRC_DIR):
            # ignore all folders except "src"
            folders = {
                name for name in names if os.path.isdir(os.path.join(path, name))
            }
            to_ignore = folders - {"src"}
            # and ignore ".git" if present (which may be a file if in a linked
            # worktree).
            if ".git" in names:
                to_ignore.add(".git")
            return to_ignore

        # Ignore all compiled files and egg-info.
        ignored: Set[str] = set()
        for pattern in ("__pycache__", "*.pyc", "pip.egg-info"):
            ignored.update(fnmatch.filter(names, pattern))
        return ignored

    pip_src = tmpdir_factory.mktemp("pip_src").joinpath("pip_src")
    # Copy over our source tree so that each use is self contained
    shutil.copytree(
        SRC_DIR,
        pip_src.resolve(),
        ignore=not_code_files_and_folders,
    )
    return pip_src


def _common_wheel_editable_install(
    tmpdir_factory: pytest.TempPathFactory, common_wheels: Path, package: str
) -> Path:
    """Unpack the single ``{package}-*.whl`` found in ``common_wheels``.

    :returns: the directory holding the installed library files, suitable for
        referencing from a ``.pth`` file.
    """
    wheel_candidates = list(common_wheels.glob(f"{package}-*.whl"))
    assert len(wheel_candidates) == 1, wheel_candidates
    install_dir = tmpdir_factory.mktemp(package) / "install"
    lib_install_dir = install_dir / "lib"
    bin_install_dir = install_dir / "bin"
    with WheelFile.open(wheel_candidates[0]) as source:
        install(
            source,
            SchemeDictionaryDestination(
                {
                    "purelib": os.fspath(lib_install_dir),
                    "platlib": os.fspath(lib_install_dir),
                    "scripts": os.fspath(bin_install_dir),
                },
                interpreter=sys.executable,
                script_kind="posix",
            ),
            additional_metadata={},
        )
    # The scripts are not necessary for our use cases, and they would be installed with
    # the wrong interpreter, so remove them.
    # TODO consider a refactoring by adding a install_from_wheel(path) method
    # to the virtualenv fixture.
    if bin_install_dir.exists():
        shutil.rmtree(bin_install_dir)
    return lib_install_dir


@pytest.fixture(scope="session")
def setuptools_install(
    tmpdir_factory: pytest.TempPathFactory, common_wheels: Path
) -> Path:
    """Library directory of setuptools unpacked from the common wheels."""
    return _common_wheel_editable_install(tmpdir_factory, common_wheels, "setuptools")


@pytest.fixture(scope="session")
def wheel_install(tmpdir_factory: pytest.TempPathFactory, common_wheels: Path) -> Path:
    """Library directory of wheel unpacked from the common wheels."""
    return _common_wheel_editable_install(tmpdir_factory, common_wheels, "wheel")


@pytest.fixture(scope="session")
def coverage_install(
    tmpdir_factory: pytest.TempPathFactory, common_wheels: Path
) -> Path:
    """Library directory of coverage unpacked from the common wheels."""
    return _common_wheel_editable_install(tmpdir_factory, common_wheels, "coverage")


def install_pth_link(
    venv: VirtualEnvironment, project_name: str, lib_dir: Path
) -> None:
    """Write a ``.pth`` file into ``venv.site`` pointing at ``lib_dir``."""
    venv.site.joinpath(f"_pip_testsuite_{project_name}.pth").write_text(
        str(lib_dir.resolve()), encoding="utf-8"
    )


@pytest.fixture(scope="session")
def virtualenv_template(
    request: pytest.FixtureRequest,
    tmpdir_factory: pytest.TempPathFactory,
    pip_src: Path,
    setuptools_install: Path,
    wheel_install: Path,
    coverage_install: Path,
) -> Iterator[VirtualEnvironment]:
    """Build the session-wide environment that per-test environments derive from.

    The environment gets setuptools, wheel and coverage linked in through
    ``.pth`` files and a development install of the copied pip sources.
    """
    venv_type: VirtualEnvironmentType
    if request.config.getoption("--use-venv"):
        venv_type = "venv"
    else:
        venv_type = "virtualenv"

    # Create the virtual environment
    tmpdir = tmpdir_factory.mktemp("virtualenv")
    venv = VirtualEnvironment(tmpdir.joinpath("venv_orig"), venv_type=venv_type)

    # Install setuptools, wheel and pip.
    install_pth_link(venv, "setuptools", setuptools_install)
    install_pth_link(venv, "wheel", wheel_install)
    pip_editable = tmpdir_factory.mktemp("pip") / "pip"
    shutil.copytree(pip_src, pip_editable, symlinks=True)
    # noxfile.py is Python 3 only
    assert compileall.compile_dir(
        str(pip_editable),
        quiet=1,
        rx=re.compile(r"noxfile\.py$"),
    )
    subprocess.check_call(
        [os.fspath(venv.bin / "python"), "setup.py", "-q", "develop"],
        cwd=pip_editable,
    )

    # Install coverage and pth file for executing it in any spawned processes
    # in this virtual environment.
    install_pth_link(venv, "coverage", coverage_install)
    # zz prefix ensures the file is after easy-install.pth.
    with open(venv.site / "zz-coverage-helper.pth", "a", encoding="utf-8") as f:
        f.write("import coverage; coverage.process_startup()")

    # Drop (non-relocatable) launchers.
    for exe in os.listdir(venv.bin):
        if not (
            exe.startswith("python")
            or exe.startswith("libpy")  # Don't remove libpypy-c.so...
        ):
            (venv.bin / exe).unlink()

    # Rename original virtualenv directory to make sure
    # it's not reused by mistake from one of the copies.
    venv_template = tmpdir / "venv_template"
    venv.move(venv_template)
    yield venv


@pytest.fixture(scope="session")
def virtualenv_factory(
    virtualenv_template: VirtualEnvironment,
) -> Callable[[Path], VirtualEnvironment]:
    """Return a callable creating an environment at a path from the template."""

    def factory(tmpdir: Path) -> VirtualEnvironment:
        return VirtualEnvironment(tmpdir, virtualenv_template)

    return factory


@pytest.fixture
def virtualenv(
    virtualenv_factory: Callable[[Path], VirtualEnvironment], tmpdir: Path
) -> Iterator[VirtualEnvironment]:
    """
    Return a virtual environment which is unique to each test function
    invocation created inside of a sub directory of the test function's
    temporary directory. The returned object is a
    ``tests.lib.venv.VirtualEnvironment`` object.
    """
    yield virtualenv_factory(tmpdir.joinpath("workspace", "venv"))


@pytest.fixture(scope="session")
def script_factory(
    virtualenv_factory: Callable[[Path], VirtualEnvironment],
    deprecated_python: bool,
    zipapp: Optional[str],
) -> ScriptFactory:
    """Return a callable building a ``PipTestEnvironment`` rooted at a directory.

    When no virtual environment is passed to the callable, a fresh one is
    created under ``<tmpdir>/venv``.
    """

    def factory(
        tmpdir: Path,
        virtualenv: Optional[VirtualEnvironment] = None,
        environ: Optional[Dict[AnyStr, AnyStr]] = None,
    ) -> PipTestEnvironment:
        # Only forward ``environ`` when the caller supplied a non-empty one.
        kwargs: Dict[str, Any] = {}
        if environ:
            kwargs["environ"] = environ
        if virtualenv is None:
            virtualenv = virtualenv_factory(tmpdir.joinpath("venv"))
        return PipTestEnvironment(
            # The base location for our test environment
            tmpdir,
            # Tell the Test Environment where our virtualenv is located
            virtualenv=virtualenv,
            # Do not ignore hidden files, they need to be checked as well
            ignore_hidden=False,
            # We are starting with an already empty directory
            start_clear=False,
            # We want to ensure no temporary files are left behind, so the
            # PipTestEnvironment needs to capture and assert against temp
            capture_temp=True,
            assert_no_temp=True,
            # Deprecated python versions produce an extra deprecation warning
            pip_expect_warning=deprecated_python,
            # Tell the Test Environment if we want to run pip via a zipapp
            zipapp=zipapp,
            **kwargs,
        )

    return factory


# Entry point written as ``__main__.py`` into the zipapp built below.
ZIPAPP_MAIN = """\
#!/usr/bin/env python

import os
import runpy
import sys

lib = os.path.join(os.path.dirname(__file__), "lib")
sys.path.insert(0, lib)

runpy.run_module("pip", run_name="__main__")
"""


def make_zipapp_from_pip(zipapp_name: Path) -> None:
    """Write a zipapp at ``zipapp_name`` bundling the imported pip package.

    The archive stores pip under ``lib/`` (skipping bytecode caches) and uses
    ``ZIPAPP_MAIN`` as its ``__main__.py``.
    """
    pip_dir = Path(pip_location).parent
    with zipapp_name.open("wb") as zipapp_file:
        # The shebang line precedes the zip data in the same file.
        zipapp_file.write(b"#!/usr/bin/env python\n")
        with ZipFile(zipapp_file, "w") as zipapp:
            for pip_file in pip_dir.rglob("*"):
                if pip_file.suffix == ".pyc":
                    continue
                if pip_file.name == "__pycache__":
                    continue
                rel_name = pip_file.relative_to(pip_dir.parent)
                zipapp.write(pip_file, arcname=f"lib/{rel_name}")
            zipapp.writestr("__main__.py", ZIPAPP_MAIN)


@pytest.fixture(scope="session")
def zipapp(
    request: pytest.FixtureRequest, tmpdir_factory: pytest.TempPathFactory
) -> Optional[str]:
    """
    If the user requested for pip to be run from a zipapp, build that zipapp
    and return its location. If the user didn't request a zipapp, return None.

    This fixture is session scoped, so the zipapp will only be created once.
    """
    if not request.config.getoption("--use-zipapp"):
        return None

    temp_location = tmpdir_factory.mktemp("zipapp")
    pyz_file = temp_location / "pip.pyz"
    make_zipapp_from_pip(pyz_file)
    return str(pyz_file)


@pytest.fixture
def script(
    request: pytest.FixtureRequest,
    tmpdir: Path,
    virtualenv: VirtualEnvironment,
    script_factory: ScriptFactory,
) -> PipTestEnvironment:
    """
    Return a PipTestEnvironment which is unique to each test function and
    will execute all commands inside of the unique virtual environment for this
    test function. The returned object is a
    ``tests.lib.PipTestEnvironment``.
    """
    return script_factory(tmpdir.joinpath("workspace"), virtualenv)


@pytest.fixture(scope="session")
def common_wheels() -> Path:
    """Provide a directory with latest setuptools and wheel wheels"""
    return DATA_DIR.joinpath("common_wheels")


@pytest.fixture(scope="session")
def shared_data(tmpdir_factory: pytest.TempPathFactory) -> TestData:
    """Session-wide ``TestData`` copy in its own temporary directory."""
    return TestData.copy(tmpdir_factory.mktemp("data"))


@pytest.fixture
def data(tmpdir: Path) -> TestData:
    """Per-test ``TestData`` copy under the test's temporary directory."""
    return TestData.copy(tmpdir.joinpath("data"))


@pytest.fixture
def in_memory_pip() -> InMemoryPip:
    """Return a new ``InMemoryPip`` instance."""
    return InMemoryPip()


@pytest.fixture(scope="session")
def deprecated_python() -> bool:
    """Used to indicate whether pip deprecated this Python version"""
    # The list is currently empty, so this is always False.
    return sys.version_info[:2] in []


@pytest.fixture(scope="session")
def cert_factory(tmpdir_factory: pytest.TempPathFactory) -> CertFactory:
    """Return a callable that writes a new "localhost" cert/key PEM file."""
    # Delay the import requiring cryptography in order to make it possible
    # to deselect relevant tests on systems where cryptography cannot
    # be installed.
    from tests.lib.certs import make_tls_cert, serialize_cert, serialize_key

    def factory() -> str:
        """Returns path to cert/key file."""
        output_path = tmpdir_factory.mktemp("certs") / "cert.pem"
        cert, key = make_tls_cert("localhost")
        with open(output_path, "wb") as f:
            f.write(serialize_cert(cert))
            f.write(serialize_key(key))

        return str(output_path)

    return factory


@pytest.fixture
def mock_server() -> Iterator[MockServer]:
    """Yield a ``MockServer`` that is active for the duration of the test."""
    server = make_mock_server()
    test_server = MockServer(server)
    with test_server.context:
        yield test_server


@pytest.fixture
def proxy(request: pytest.FixtureRequest) -> str:
    """Return the value given to ``--proxy`` (``None`` when it was not passed)."""
    return request.config.getoption("proxy")


@pytest.fixture
def enable_user_site(virtualenv: VirtualEnvironment) -> None:
    """Turn on user site-packages for the test's virtual environment."""
    virtualenv.user_site_packages = True


class MetadataKind(Enum):
    """All the types of values we might be provided for the data-dist-info-metadata
    attribute from PEP 658."""

    # Valid: will read metadata from the dist instead.
    No = "none"
    # Valid: will read the .metadata file, but won't check its hash.
    Unhashed = "unhashed"
    # Valid: will read the .metadata file and check its hash matches.
    Sha256 = "sha256"
    # Invalid: will error out after checking the hash.
    WrongHash = "wrong-hash"
    # Invalid: will error out after failing to fetch the .metadata file.
    NoFile = "no-file"


@dataclass(frozen=True)
class FakePackage:
    """Mock package structure used to generate a PyPI repository.

    FakePackage name and version should correspond to sdists (.tar.gz files) in our test
    data."""

    name: str
    version: str
    filename: str
    metadata: MetadataKind
    # This will override any dependencies specified in the actual dist's METADATA.
    requires_dist: Tuple[str, ...] = ()
    # This will override the Name specified in the actual dist's METADATA.
    metadata_name: Optional[str] = None

    def metadata_filename(self) -> str:
        """This is specified by PEP 658."""
        return f"{self.filename}.metadata"

    def generate_additional_tag(self) -> str:
        """This gets injected into the <a> tag in the generated PyPI index page for this
        package."""
        if self.metadata == MetadataKind.No:
            return ""
        if self.metadata in [MetadataKind.Unhashed, MetadataKind.NoFile]:
            return 'data-dist-info-metadata="true"'
        if self.metadata == MetadataKind.WrongHash:
            return 'data-dist-info-metadata="sha256=WRONG-HASH"'
        assert self.metadata == MetadataKind.Sha256
        checksum = sha256(self.generate_metadata()).hexdigest()
        return f'data-dist-info-metadata="sha256={checksum}"'

    def requires_str(self) -> str:
        """Return the ``Requires-Dist`` line, or "" without overridden dependencies."""
        if not self.requires_dist:
            return ""
        joined = " and ".join(self.requires_dist)
        return f"Requires-Dist: {joined}"

    def generate_metadata(self) -> bytes:
        """This is written to `self.metadata_filename()` and will override the actual
        dist's METADATA, unless `self.metadata == MetadataKind.NoFile`."""
        return dedent(
            f"""\
            Metadata-Version: 2.1
            Name: {self.metadata_name or self.name}
            Version: {self.version}
            {self.requires_str()}
            """
        ).encode("utf-8")


@pytest.fixture(scope="session")
def fake_packages() -> Dict[str, List[FakePackage]]:
    """The package database we generate for testing PEP 658 support."""
    return {
        "simple": [
            FakePackage("simple", "1.0", "simple-1.0.tar.gz", MetadataKind.Sha256),
            FakePackage("simple", "2.0", "simple-2.0.tar.gz", MetadataKind.No),
            # This will raise a hashing error.
            FakePackage("simple", "3.0", "simple-3.0.tar.gz", MetadataKind.WrongHash),
        ],
        "simple2": [
            # Override the dependencies here in order to force pip to download
            # simple-1.0.tar.gz as well.
            FakePackage(
                "simple2",
                "1.0",
                "simple2-1.0.tar.gz",
                MetadataKind.Unhashed,
                ("simple==1.0",),
            ),
            # This will raise an error when pip attempts to fetch the metadata file.
            FakePackage("simple2", "2.0", "simple2-2.0.tar.gz", MetadataKind.NoFile),
            # This has a METADATA file with a mismatched name.
            FakePackage(
                "simple2",
                "3.0",
                "simple2-3.0.tar.gz",
                MetadataKind.Sha256,
                metadata_name="not-simple2",
            ),
        ],
        "colander": [
            # Ensure we can read the dependencies from a metadata file within a wheel
            # *without* PEP 658 metadata.
            FakePackage(
                "colander",
                "0.9.9",
                "colander-0.9.9-py2.py3-none-any.whl",
                MetadataKind.No,
            ),
        ],
        "compilewheel": [
            # Ensure we can override the dependencies of a wheel file by injecting PEP
            # 658 metadata.
            FakePackage(
                "compilewheel",
                "1.0",
                "compilewheel-1.0-py2.py3-none-any.whl",
                MetadataKind.Unhashed,
                ("simple==1.0",),
            ),
        ],
        "has-script": [
            # Ensure we check PEP 658 metadata hashing errors for wheel files.
            FakePackage(
                "has-script",
                "1.0",
                "has.script-1.0-py2.py3-none-any.whl",
                MetadataKind.WrongHash,
            ),
        ],
        "translationstring": [
            FakePackage(
                "translationstring",
                "1.1",
                "translationstring-1.1.tar.gz",
                MetadataKind.No,
            ),
        ],
        "priority": [
            # Ensure we check for a missing metadata file for wheels.
            FakePackage(
                "priority",
                "1.0",
                "priority-1.0-py2.py3-none-any.whl",
                MetadataKind.NoFile,
            ),
        ],
        "requires-simple-extra": [
            # Metadata name is not canonicalized.
            FakePackage(
                "requires-simple-extra",
                "0.1",
                "requires_simple_extra-0.1-py2.py3-none-any.whl",
                MetadataKind.Sha256,
                metadata_name="Requires_Simple.Extra",
            ),
        ],
    }


@pytest.fixture(scope="session")
def html_index_for_packages(
    shared_data: TestData,
    fake_packages: Dict[str, List[FakePackage]],
    tmpdir_factory: pytest.TempPathFactory,
) -> Path:
    """Generate a PyPI HTML package index within a local directory pointing to
    synthetic test data.

    :returns: the directory containing the top-level ``index.html`` and one
        subdirectory (with its own ``index.html``) per package name.
    """
    html_dir = tmpdir_factory.mktemp("fake_index_html_content")

    # (1) Generate the content for a PyPI index.html.
    # Each project must be an <a> element, otherwise there is nothing for
    # pip to follow from the top-level page.
    pkg_links = "\n".join(
        f'    <a href="{pkg}/index.html">{pkg}</a>' for pkg in fake_packages
    )
    # Output won't be nicely indented because dedent() acts after f-string
    # arg insertion.
    index_html = dedent(
        f"""\
        <!DOCTYPE html>
        <html>
          <head>
            <meta name="pypi:repository-version" content="1.0">
            <title>Simple index</title>
          </head>
          <body>
        {pkg_links}
          </body>
        </html>"""
    )
    # (2) Generate the index.html in a new subdirectory of the temp directory.
    (html_dir / "index.html").write_text(index_html, encoding="utf-8")

    # (3) Generate subdirectories for individual packages, each with their own
    # index.html.
    for pkg, links in fake_packages.items():
        pkg_subdir = html_dir / pkg
        pkg_subdir.mkdir()

        download_links: List[str] = []
        for package_link in links:
            # (3.1) Generate the <a> tag which pip can crawl pointing to this
            # specific package version. The PEP 658 attribute (if any) for
            # this package is injected into the tag here.
            download_links.append(
                f'    <a href="{package_link.filename}" {package_link.generate_additional_tag()}>{package_link.filename}</a><br/>'  # noqa: E501
            )
            # (3.2) Copy over the corresponding file in `shared_data.packages`.
            shutil.copy(
                shared_data.packages / package_link.filename,
                pkg_subdir / package_link.filename,
            )
            # (3.3) Write a metadata file, if applicable.
            if package_link.metadata != MetadataKind.NoFile:
                with open(pkg_subdir / package_link.metadata_filename(), "wb") as f:
                    f.write(package_link.generate_metadata())

        # (3.4) After collating all the download links and copying over the files,
        # write an index.html with the generated download links for each
        # copied file for this specific package name.
        download_links_str = "\n".join(download_links)
        pkg_index_content = dedent(
            f"""\
            <!DOCTYPE html>
            <html>
              <head>
                <meta name="pypi:repository-version" content="1.0">
                <title>Links for {pkg}</title>
              </head>
              <body>
                <h1>Links for {pkg}</h1>
            {download_links_str}
              </body>
            </html>"""
        )
        with open(pkg_subdir / "index.html", "w", encoding="utf-8") as f:
            f.write(pkg_index_content)

    return html_dir


class OneTimeDownloadHandler(http.server.SimpleHTTPRequestHandler):
    """Serve files from the current directory, but error if a file is downloaded more
    than once."""

    # Request paths already served once. Defined on the class, so it is shared
    # by every handler instance of the same class.
    _seen_paths: ClassVar[Set[str]] = set()

    def do_GET(self) -> None:
        """Serve the request, or answer 404 if this path was already served."""
        if self.path in self._seen_paths:
            self.send_error(
                http.HTTPStatus.NOT_FOUND,
                f"File {self.path} not available more than once!",
            )
            return
        super().do_GET()
        # Directory listings/index pages and .metadata files may be fetched
        # repeatedly; only other paths are limited to a single download.
        if not (self.path.endswith("/") or self.path.endswith(".metadata")):
            self._seen_paths.add(self.path)


@pytest.fixture(scope="function")
def html_index_with_onetime_server(
    html_index_for_packages: Path,
) -> Iterator[http.server.ThreadingHTTPServer]:
    """Serve files from a generated pypi index, erroring if a file is downloaded more
    than once.

    Provide `-i http://localhost:8000` to pip invocations to point them at this server.
    """

    class InDirectoryServer(http.server.ThreadingHTTPServer):
        def finish_request(self: "Self", request: Any, client_address: Any) -> None:
            # Pass ``directory`` so files are served from the generated index
            # rather than from the current working directory.
            self.RequestHandlerClass(
                request,
                client_address,
                self,
                directory=str(html_index_for_packages),  # type: ignore[call-arg]
            )

    class Handler(OneTimeDownloadHandler):
        # A fresh set per fixture invocation, so downloads recorded by one
        # test do not carry over into the next.
        _seen_paths: ClassVar[Set[str]] = set()

    with InDirectoryServer(("", 8000), Handler) as httpd:
        server_thread = threading.Thread(target=httpd.serve_forever)
        server_thread.start()

        try:
            yield httpd
        finally:
            httpd.shutdown()
            server_thread.join()