Wrap lazy map as well

This commit is contained in:
Nguyễn Gia Phong 2020-06-03 23:15:34 +07:00
parent e7f637e5ca
commit 13539d00f8
2 changed files with 230 additions and 44 deletions

View File

@ -1,65 +1,215 @@
"""Convenient parallelization of higher order functions."""
"""Convenient parallelization of higher order functions.
__all__ = ['map_multiprocess', 'map_multithread']
This module provides proper fallback functions for multiprocess
and multithread map, both the non-lazy, ordered variant
and the lazy, unordered variant.
"""
__all__ = ['map_multiprocess', 'imap_multiprocess',
'map_multithread', 'imap_multithread']
from contextlib import contextmanager
from multiprocessing import Pool as ProcessPool
from multiprocessing.dummy import Pool as ThreadPool
from pip._vendor.requests.adapters import DEFAULT_POOLSIZE
from pip._vendor.six import PY2
from pip._vendor.six.moves import map
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Callable, Iterable, List, Optional, TypeVar
from typing import (
Callable, Iterable, Iterator, List, Optional, Union, TypeVar)
from multiprocessing import pool
Pool = Union[pool.Pool, pool.ThreadPool]
S = TypeVar('S')
T = TypeVar('T')
# On platforms without sem_open, multiprocessing[.dummy] Pool
# cannot be created.
try:
import multiprocessing.synchronize # noqa
except ImportError:
LACK_SEM_OPEN = True
else:
LACK_SEM_OPEN = False
def map_multiprocess(func, iterable, chunksize=None, timeout=2000000):
# type: (Callable[[S], T], Iterable[S], Optional[int], int) -> List[T]
# Incredibly large timeout to work around bpo-8296 on Python 2.
TIMEOUT = 2000000
@contextmanager
def closing(pool):
    # type: (Pool) -> Iterator[Pool]
    """Return a context manager that closes and joins pool.

    This is needed for Pool.imap* to make the result iterator iterate.
    """
    try:
        yield pool
    finally:
        # close() stops the pool from accepting new tasks and join()
        # waits for the workers, so lazy imap* results can be consumed
        # to completion after the with-block returns the iterator.
        pool.close()
        pool.join()
def _map_fallback(func, iterable, chunksize=None):
# type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
"""Return a list of func applied to each element in iterable.
This function is the sequential fallback when sem_open is unavailable.
"""
return list(map(func, iterable))
def _imap_fallback(func, iterable, chunksize=1):
# type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
"""Make an iterator applying func to each element in iterable.
This function is the sequential fallback when sem_open is unavailable.
"""
return map(func, iterable)
def _map_multiprocess_py2(func, iterable, chunksize=None):
# type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
"""Chop iterable into chunks and submit them to a process pool.
The (approximate) size of these chunks can be specified
by setting chunksize to a positive integer.
Block either until the results are ready and return them in a list
or till timeout is reached. By default timeout is an incredibly
large number to work around bpo-8296 on Python 2.
Note that this function may cause high memory usage
for long iterables.
Note that it may cause high memory usage for long iterables.
Return a list of results in order.
"""
pool = ProcessPool()
try:
pool = ProcessPool()
except ImportError:
return list(map(func, iterable))
else:
try:
return pool.map_async(func, iterable, chunksize).get(timeout)
finally:
pool.terminate()
return pool.map_async(func, iterable, chunksize).get(TIMEOUT)
finally:
pool.terminate()
def map_multithread(func, iterable, chunksize=None, timeout=2000000):
# type: (Callable[[S], T], Iterable[S], Optional[int], int) -> List[T]
def _map_multiprocess_py3(func, iterable, chunksize=None):
# type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
"""Chop iterable into chunks and submit them to a process pool.
The (approximate) size of these chunks can be specified
by setting chunksize to a positive integer.
Note that this function may cause high memory usage
for long iterables.
Return a list of results in order.
"""
with ProcessPool() as pool:
return pool.map(func, iterable, chunksize)
def _imap_multiprocess_py2(func, iterable, chunksize=1):
    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
    """Chop iterable into chunks and submit them to a process pool.

    For very long iterables using a large value for chunksize can make
    the job complete much faster than using the default value of 1.

    Return an unordered iterator of the results.
    """
    pool = ProcessPool()
    try:
        # Blocking get() with an incredibly large timeout works around
        # bpo-8296 on Python 2; the results are materialized eagerly
        # and handed back wrapped as an iterator.
        result = pool.map_async(func, iterable, chunksize)
        return iter(result.get(TIMEOUT))
    finally:
        pool.terminate()
def _imap_multiprocess_py3(func, iterable, chunksize=1):
    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
    """Chop iterable into chunks and submit them to a process pool.

    For very long iterables using a large value for chunksize can make
    the job complete much faster than using the default value of 1.

    Return an unordered iterator of the results.
    """
    with ProcessPool() as pool:
        # closing() exits first: close and join so the lazy iterator
        # can be iterated after return; the pool is then terminated.
        with closing(pool):
            return pool.imap_unordered(func, iterable, chunksize)
def _map_multithread_py2(func, iterable, chunksize=None):
# type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
"""Chop iterable into chunks and submit them to a thread pool.
The (approximate) size of these chunks can be specified
by setting chunksize to a positive integer.
Block either until the results are ready and return them in a list
or till timeout is reached. By default timeout is an incredibly
large number to work around bpo-8296 on Python 2.
Note that this function may cause high memory usage
for long iterables.
Note that it may cause high memory usage for long iterables.
Return a list of results in order.
"""
pool = ThreadPool(DEFAULT_POOLSIZE)
try:
pool = ThreadPool(DEFAULT_POOLSIZE)
except ImportError:
return list(map(func, iterable))
else:
try:
return pool.map_async(func, iterable, chunksize).get(timeout)
finally:
pool.terminate()
return pool.map_async(func, iterable, chunksize).get(TIMEOUT)
finally:
pool.terminate()
def _map_multithread_py3(func, iterable, chunksize=None):
    # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
    """Chop iterable into chunks and submit them to a thread pool.

    The (approximate) size of these chunks can be specified
    by setting chunksize to a positive integer.

    Note that this function may cause high memory usage
    for long iterables.

    Return a list of results in order.
    """
    pool = ThreadPool(DEFAULT_POOLSIZE)
    try:
        return pool.map(func, iterable, chunksize)
    finally:
        # Same cleanup Pool.__exit__ performs when used as a context manager.
        pool.terminate()
def _imap_multithread_py2(func, iterable, chunksize=1):
    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
    """Chop iterable into chunks and submit them to a thread pool.

    For very long iterables using a large value for chunksize can make
    the job complete much faster than using the default value of 1.

    Return an unordered iterator of the results.
    """
    pool = ThreadPool(DEFAULT_POOLSIZE)
    try:
        # iter() matches the declared Iterator return type, consistent
        # with _imap_multiprocess_py2; the blocking get() with a huge
        # timeout works around bpo-8296 on Python 2.
        return iter(pool.map_async(func, iterable, chunksize).get(TIMEOUT))
    finally:
        pool.terminate()
def _imap_multithread_py3(func, iterable, chunksize=1):
    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
    """Chop iterable into chunks and submit them to a thread pool.

    For very long iterables using a large value for chunksize can make
    the job complete much faster than using the default value of 1.

    Return an unordered iterator of the results.
    """
    with ThreadPool(DEFAULT_POOLSIZE) as pool:
        # closing() exits first: close and join so the lazy iterator
        # can be iterated after return; the pool is then terminated.
        with closing(pool):
            return pool.imap_unordered(func, iterable, chunksize)
# Bind the public names declared in __all__ to the right implementation:
# a sequential fallback when sem_open is unavailable (the pools above
# cannot be created then), otherwise the Python-2 or Python-3 variant.
if LACK_SEM_OPEN:
    map_multiprocess = map_multithread = _map_fallback
    imap_multiprocess = imap_multithread = _imap_fallback
elif PY2:
    map_multiprocess = _map_multiprocess_py2
    imap_multiprocess = _imap_multiprocess_py2
    map_multithread = _map_multithread_py2
    imap_multithread = _imap_multithread_py2
else:
    map_multiprocess = _map_multiprocess_py3
    imap_multiprocess = _imap_multiprocess_py3
    map_multithread = _map_multithread_py3
    imap_multithread = _imap_multithread_py3

View File

@ -1,36 +1,72 @@
"""Test multiprocessing/multithreading higher-order functions."""
from importlib import import_module
from math import factorial
from sys import modules
from mock import patch
from pip._vendor.six import PY2
from pip._vendor.six.moves import map
from pytest import mark
from pip._internal.utils.parallel import map_multiprocess, map_multithread
DUNDER_IMPORT = '__builtin__.__import__' if PY2 else 'builtins.__import__'
FUNC, ITERABLE = factorial, range(42)
MAPS = ('map_multiprocess', 'imap_multiprocess',
'map_multithread', 'imap_multithread')
_import = __import__
def import_sem_open(name, *args, **kwargs):
def reload_parallel():
    """Re-import pip._internal.utils.parallel from scratch.

    Dropping any cached module first forces the module-level
    implementation selection to run again under the current
    (possibly patched) import machinery.
    """
    modules.pop('pip._internal.utils.parallel', None)
    return import_module('pip._internal.utils.parallel')
def lack_sem_open(name, *args, **kwargs):
"""Raise ImportError on import of multiprocessing.synchronize."""
if name.endswith('.synchronize'):
if name.endswith('synchronize'):
raise ImportError
return _import(name, *args, **kwargs)
@mark.parametrize('map_async', (map_multiprocess, map_multithread))
def test_missing_sem_open(map_async, monkeypatch):
def have_sem_open(name, *args, **kwargs):
    """Make sure multiprocessing.synchronize import is successful."""
    if not name.endswith('synchronize'):
        return _import(name, *args, **kwargs)
    # Fall through: returning None is enough for the import to succeed.
@mark.parametrize('name', MAPS)
def test_lack_sem_open(name, monkeypatch):
"""Test fallback when sem_open is not available.
If so, multiprocessing[.dummy].Pool will fail to be created and
map_async should fallback to map and still return correct result.
map_async should fallback to map.
"""
with patch(DUNDER_IMPORT, side_effect=import_sem_open):
assert map_async(FUNC, ITERABLE) == list(map(FUNC, ITERABLE))
monkeypatch.setattr(DUNDER_IMPORT, lack_sem_open)
parallel = reload_parallel()
fallback = '_{}_fallback'.format(name.split('_')[0])
assert getattr(parallel, name) is getattr(parallel, fallback)
@mark.parametrize('map_async', (map_multiprocess, map_multithread))
def test_map_order(map_async):
@mark.parametrize('name', MAPS)
def test_have_sem_open(name, monkeypatch):
    """Test implementation selection when sem_open is available."""
    monkeypatch.setattr(DUNDER_IMPORT, have_sem_open)
    parallel = reload_parallel()
    suffix = 'py2' if PY2 else 'py3'
    expected = '_{}_{}'.format(name, suffix)
    assert getattr(parallel, name) is getattr(parallel, expected)
@mark.parametrize('name', MAPS)
def test_map(name):
    """Test correctness of result of asynchronous maps."""
    parallel = reload_parallel()
    map_async = getattr(parallel, name)
    expected = {FUNC(element) for element in ITERABLE}
    assert set(map_async(FUNC, ITERABLE)) == expected
@mark.parametrize('name', ('map_multiprocess', 'map_multithread'))
def test_map_order(name):
"""Test result ordering of asynchronous maps."""
assert map_async(FUNC, ITERABLE) == list(map(FUNC, ITERABLE))
map_async = getattr(reload_parallel(), name)
assert tuple(map_async(FUNC, ITERABLE)) == tuple(map(FUNC, ITERABLE))