# pip/tests/unit/test_utils_parallel.py
#
# 74 lines
# 2.3 KiB
# Python

"""Test multiprocessing/multithreading higher-order functions."""
from contextlib import contextmanager
from importlib import import_module
from math import factorial
from sys import modules
from typing import Any, Iterator
import pytest
# Dotted path of the import hook that the tests replace via monkeypatch.
DUNDER_IMPORT = "builtins.__import__"

# Function and input sequence used to compare the parallel maps with map().
FUNC = factorial
ITERABLE = range(42)

# Names of the map functions looked up on pip._internal.utils.parallel.
MAPS = ("map_multiprocess", "map_multithread")

# Reference to the real __import__, bound when this module is loaded, so the
# replacement hooks below can delegate to it while builtins.__import__ is
# patched.
_import = __import__
def unload_parallel() -> None:
    """Drop pip._internal.utils.parallel from the module cache.

    Does nothing if the module is not currently cached, so the next
    import of it is guaranteed to execute the module body again.
    """
    # pop() with a default is a no-op when the key is missing, which
    # replaces the explicit try/del/except KeyError dance.
    modules.pop("pip._internal.utils.parallel", None)
@contextmanager
def tmp_import_parallel() -> Iterator[Any]:
    """Yield a freshly imported pip._internal.utils.parallel module.

    The module is removed from the module cache before the import, so its
    body runs again under whatever import hook is active, and removed once
    more on exit so the instance imported here is not left in the cache.
    """
    unload_parallel()
    try:
        yield import_module("pip._internal.utils.parallel")
    finally:
        # Runs even if the import or the body of the with-block raises.
        unload_parallel()
def lack_sem_open(name: str, *args: Any, **kwargs: Any) -> Any:
    """Raise ImportError on import of multiprocessing.synchronize.

    Stand-in for ``builtins.__import__``: any module whose name ends with
    "synchronize" fails to import, every other import is forwarded
    unchanged to the real ``__import__`` saved in ``_import``.
    """
    if name.endswith("synchronize"):
        raise ImportError
    return _import(name, *args, **kwargs)
def have_sem_open(name: str, *args: Any, **kwargs: Any) -> Any:
    """Make sure multiprocessing.synchronize import is successful.

    Stand-in for ``builtins.__import__``: any module whose name ends with
    "synchronize" is reported as imported without being loaded, every
    other import is forwarded unchanged to the real ``__import__`` saved
    in ``_import``.
    """
    # We don't care about the return value
    # since we don't use the pool with this import.
    if name.endswith("synchronize"):
        return None
    return _import(name, *args, **kwargs)
@pytest.mark.parametrize("name", MAPS)
def test_lack_sem_open(name: str, monkeypatch: pytest.MonkeyPatch) -> None:
    """Test fallback when sem_open is not available.

    If so, multiprocessing[.dummy].Pool will fail to be created and
    map_async should fallback to map.
    """
    # Make the import of multiprocessing.synchronize fail while the
    # parallel module is (re)imported below.
    monkeypatch.setattr(DUNDER_IMPORT, lack_sem_open)
    with tmp_import_parallel() as parallel:
        assert getattr(parallel, name) is parallel._map_fallback
@pytest.mark.parametrize("name", MAPS)
def test_have_sem_open(name: str, monkeypatch: pytest.MonkeyPatch) -> None:
    """Test that no fallback happens when sem_open is available.

    Each public map function must then be bound to its private
    underscore-prefixed counterpart rather than to the fallback.
    """
    # Make the import of multiprocessing.synchronize succeed while the
    # parallel module is (re)imported below.
    monkeypatch.setattr(DUNDER_IMPORT, have_sem_open)
    with tmp_import_parallel() as parallel:
        assert getattr(parallel, name) is getattr(parallel, f"_{name}")
@pytest.mark.parametrize("name", MAPS)
def test_map(name: str) -> None:
    """Test correctness of result of asynchronous maps.

    Results are compared as sets against the built-in map(), so only the
    produced values are checked, not the order in which they arrive.
    """
    parallel = import_module("pip._internal.utils.parallel")
    map_async = getattr(parallel, name)
    expected = set(map(FUNC, ITERABLE))
    assert set(map_async(FUNC, ITERABLE)) == expected