mirror of https://github.com/pypa/pip
Replace Iterator[T] with Generator[T,None, None] (#11007)
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
This commit is contained in:
parent
e679df4051
commit
3820b0e52c
|
@ -0,0 +1 @@
|
|||
Replace ``Iterator[T]`` with ``Generator[T, None, None]``.
|
|
@ -11,7 +11,7 @@ import zipfile
|
|||
from collections import OrderedDict
|
||||
from sysconfig import get_paths
|
||||
from types import TracebackType
|
||||
from typing import TYPE_CHECKING, Iterable, Iterator, List, Optional, Set, Tuple, Type
|
||||
from typing import TYPE_CHECKING, Generator, Iterable, List, Optional, Set, Tuple, Type
|
||||
|
||||
from pip._vendor.certifi import where
|
||||
from pip._vendor.packaging.requirements import Requirement
|
||||
|
@ -42,7 +42,7 @@ class _Prefix:
|
|||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _create_standalone_pip() -> Iterator[str]:
|
||||
def _create_standalone_pip() -> Generator[str, None, None]:
|
||||
"""Create a "standalone pip" zip file.
|
||||
|
||||
The zip file's content is identical to the currently-running pip.
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
from contextlib import ExitStack, contextmanager
|
||||
from typing import ContextManager, Iterator, TypeVar
|
||||
from typing import ContextManager, Generator, TypeVar
|
||||
|
||||
_T = TypeVar("_T", covariant=True)
|
||||
|
||||
|
@ -11,7 +11,7 @@ class CommandContextMixIn:
|
|||
self._main_context = ExitStack()
|
||||
|
||||
@contextmanager
|
||||
def main_context(self) -> Iterator[None]:
|
||||
def main_context(self) -> Generator[None, None, None]:
|
||||
assert not self._in_main_context
|
||||
|
||||
self._in_main_context = True
|
||||
|
|
|
@ -6,7 +6,7 @@ import shutil
|
|||
import sys
|
||||
import textwrap
|
||||
from contextlib import suppress
|
||||
from typing import Any, Dict, Iterator, List, Tuple
|
||||
from typing import Any, Dict, Generator, List, Tuple
|
||||
|
||||
from pip._internal.cli.status_codes import UNKNOWN_ERROR
|
||||
from pip._internal.configuration import Configuration, ConfigurationError
|
||||
|
@ -175,7 +175,9 @@ class ConfigOptionParser(CustomOptionParser):
|
|||
print(f"An error occurred during configuration: {exc}")
|
||||
sys.exit(3)
|
||||
|
||||
def _get_ordered_configuration_items(self) -> Iterator[Tuple[str, Any]]:
|
||||
def _get_ordered_configuration_items(
|
||||
self,
|
||||
) -> Generator[Tuple[str, Any], None, None]:
|
||||
# Configuration gives keys in an unordered manner. Order them.
|
||||
override_order = ["global", self.name, ":env:"]
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import functools
|
||||
from typing import Callable, Iterator, Optional, Tuple
|
||||
from typing import Callable, Generator, Iterable, Iterator, Optional, Tuple
|
||||
|
||||
from pip._vendor.rich.progress import (
|
||||
BarColumn,
|
||||
|
@ -16,15 +16,15 @@ from pip._vendor.rich.progress import (
|
|||
|
||||
from pip._internal.utils.logging import get_indentation
|
||||
|
||||
DownloadProgressRenderer = Callable[[Iterator[bytes]], Iterator[bytes]]
|
||||
DownloadProgressRenderer = Callable[[Iterable[bytes]], Iterator[bytes]]
|
||||
|
||||
|
||||
def _rich_progress_bar(
|
||||
iterable: Iterator[bytes],
|
||||
iterable: Iterable[bytes],
|
||||
*,
|
||||
bar_type: str,
|
||||
size: int,
|
||||
) -> Iterator[bytes]:
|
||||
) -> Generator[bytes, None, None]:
|
||||
assert bar_type == "on", "This should only be used in the default mode."
|
||||
|
||||
if not size:
|
||||
|
|
|
@ -3,7 +3,7 @@ import itertools
|
|||
import logging
|
||||
import sys
|
||||
import time
|
||||
from typing import IO, Iterator
|
||||
from typing import IO, Generator
|
||||
|
||||
from pip._internal.utils.compat import WINDOWS
|
||||
from pip._internal.utils.logging import get_indentation
|
||||
|
@ -113,7 +113,7 @@ class RateLimiter:
|
|||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def open_spinner(message: str) -> Iterator[SpinnerInterface]:
|
||||
def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]:
|
||||
# Interactive spinner goes directly to sys.stdout rather than being routed
|
||||
# through the logging system, but it acts like it has level INFO,
|
||||
# i.e. it's only displayed if we're at level INFO or better.
|
||||
|
@ -141,7 +141,7 @@ SHOW_CURSOR = "\x1b[?25h"
|
|||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def hidden_cursor(file: IO[str]) -> Iterator[None]:
|
||||
def hidden_cursor(file: IO[str]) -> Generator[None, None, None]:
|
||||
# The Windows terminal does not support the hide/show cursor ANSI codes,
|
||||
# even via colorama. So don't even try.
|
||||
if WINDOWS:
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import json
|
||||
import logging
|
||||
from optparse import Values
|
||||
from typing import TYPE_CHECKING, Iterator, List, Optional, Sequence, Tuple, cast
|
||||
from typing import TYPE_CHECKING, Generator, List, Optional, Sequence, Tuple, cast
|
||||
|
||||
from pip._vendor.packaging.utils import canonicalize_name
|
||||
|
||||
|
@ -222,7 +222,7 @@ class ListCommand(IndexGroupCommand):
|
|||
|
||||
def iter_packages_latest_infos(
|
||||
self, packages: "_ProcessedDists", options: Values
|
||||
) -> Iterator["_DistWithLatestInfo"]:
|
||||
) -> Generator["_DistWithLatestInfo", None, None]:
|
||||
with self._build_session(options) as session:
|
||||
finder = self._build_package_finder(options, session)
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import logging
|
||||
from optparse import Values
|
||||
from typing import Iterator, List, NamedTuple, Optional
|
||||
from typing import Generator, Iterable, Iterator, List, NamedTuple, Optional
|
||||
|
||||
from pip._vendor.packaging.utils import canonicalize_name
|
||||
|
||||
|
@ -67,7 +67,7 @@ class _PackageInfo(NamedTuple):
|
|||
files: Optional[List[str]]
|
||||
|
||||
|
||||
def search_packages_info(query: List[str]) -> Iterator[_PackageInfo]:
|
||||
def search_packages_info(query: List[str]) -> Generator[_PackageInfo, None, None]:
|
||||
"""
|
||||
Gather details from installed distributions. Print distribution name,
|
||||
version, location, and installed files. Installed files requires a
|
||||
|
@ -135,7 +135,7 @@ def search_packages_info(query: List[str]) -> Iterator[_PackageInfo]:
|
|||
|
||||
|
||||
def print_results(
|
||||
distributions: Iterator[_PackageInfo],
|
||||
distributions: Iterable[_PackageInfo],
|
||||
list_files: bool,
|
||||
verbose: bool,
|
||||
) -> bool:
|
||||
|
|
|
@ -4,7 +4,7 @@ import os
|
|||
import pathlib
|
||||
import sys
|
||||
import sysconfig
|
||||
from typing import Any, Dict, Iterator, List, Optional, Tuple
|
||||
from typing import Any, Dict, Generator, List, Optional, Tuple
|
||||
|
||||
from pip._internal.models.scheme import SCHEME_KEYS, Scheme
|
||||
from pip._internal.utils.compat import WINDOWS
|
||||
|
@ -169,7 +169,7 @@ def _looks_like_msys2_mingw_scheme() -> bool:
|
|||
)
|
||||
|
||||
|
||||
def _fix_abiflags(parts: Tuple[str]) -> Iterator[str]:
|
||||
def _fix_abiflags(parts: Tuple[str]) -> Generator[str, None, None]:
|
||||
ldversion = sysconfig.get_config_var("LDVERSION")
|
||||
abiflags = getattr(sys, "abiflags", None)
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ from typing import (
|
|||
TYPE_CHECKING,
|
||||
Collection,
|
||||
Container,
|
||||
Generator,
|
||||
Iterable,
|
||||
Iterator,
|
||||
List,
|
||||
|
@ -470,7 +471,7 @@ class BaseEnvironment:
|
|||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def iter_distributions(self) -> Iterator["BaseDistribution"]:
|
||||
def iter_distributions(self) -> Generator["BaseDistribution", None, None]:
|
||||
"""Iterate through installed distributions."""
|
||||
for dist in self._iter_distributions():
|
||||
# Make sure the distribution actually comes from a valid Python
|
||||
|
|
|
@ -4,7 +4,7 @@ import logging
|
|||
import os
|
||||
import pathlib
|
||||
import zipfile
|
||||
from typing import Collection, Iterable, Iterator, List, Mapping, NamedTuple, Optional
|
||||
from typing import Collection, Generator, Iterable, List, Mapping, NamedTuple, Optional
|
||||
|
||||
from pip._vendor import pkg_resources
|
||||
from pip._vendor.packaging.requirements import Requirement
|
||||
|
@ -149,7 +149,7 @@ class Distribution(BaseDistribution):
|
|||
def is_file(self, path: InfoPath) -> bool:
|
||||
return self._dist.has_metadata(str(path))
|
||||
|
||||
def iterdir(self, path: InfoPath) -> Iterator[pathlib.PurePosixPath]:
|
||||
def iterdir(self, path: InfoPath) -> Generator[pathlib.PurePosixPath, None, None]:
|
||||
name = str(path)
|
||||
if not self._dist.has_metadata(name):
|
||||
raise FileNotFoundError(name)
|
||||
|
@ -251,6 +251,6 @@ class Environment(BaseEnvironment):
|
|||
return None
|
||||
return self._search_distribution(name)
|
||||
|
||||
def _iter_distributions(self) -> Iterator[BaseDistribution]:
|
||||
def _iter_distributions(self) -> Generator[BaseDistribution, None, None]:
|
||||
for dist in self._ws:
|
||||
yield Distribution(dist)
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
import os
|
||||
from contextlib import contextmanager
|
||||
from typing import Iterator, Optional
|
||||
from typing import Generator, Optional
|
||||
|
||||
from pip._vendor.cachecontrol.cache import BaseCache
|
||||
from pip._vendor.cachecontrol.caches import FileCache
|
||||
|
@ -18,7 +18,7 @@ def is_from_cache(response: Response) -> bool:
|
|||
|
||||
|
||||
@contextmanager
|
||||
def suppressed_cache_errors() -> Iterator[None]:
|
||||
def suppressed_cache_errors() -> Generator[None, None, None]:
|
||||
"""If we can't access the cache then we can just skip caching and process
|
||||
requests as if caching wasn't enabled.
|
||||
"""
|
||||
|
|
|
@ -5,7 +5,7 @@ __all__ = ["HTTPRangeRequestUnsupported", "dist_from_wheel_url"]
|
|||
from bisect import bisect_left, bisect_right
|
||||
from contextlib import contextmanager
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import Any, Dict, Iterator, List, Optional, Tuple
|
||||
from typing import Any, Dict, Generator, List, Optional, Tuple
|
||||
from zipfile import BadZipfile, ZipFile
|
||||
|
||||
from pip._vendor.packaging.utils import canonicalize_name
|
||||
|
@ -139,7 +139,7 @@ class LazyZipOverHTTP:
|
|||
return self._file.__exit__(*exc)
|
||||
|
||||
@contextmanager
|
||||
def _stay(self) -> Iterator[None]:
|
||||
def _stay(self) -> Generator[None, None, None]:
|
||||
"""Return a context manager keeping the position.
|
||||
|
||||
At the end of the block, seek back to original position.
|
||||
|
@ -177,8 +177,8 @@ class LazyZipOverHTTP:
|
|||
|
||||
def _merge(
|
||||
self, start: int, end: int, left: int, right: int
|
||||
) -> Iterator[Tuple[int, int]]:
|
||||
"""Return an iterator of intervals to be fetched.
|
||||
) -> Generator[Tuple[int, int], None, None]:
|
||||
"""Return a generator of intervals to be fetched.
|
||||
|
||||
Args:
|
||||
start (int): Start of needed interval
|
||||
|
|
|
@ -15,7 +15,7 @@ import subprocess
|
|||
import sys
|
||||
import urllib.parse
|
||||
import warnings
|
||||
from typing import Any, Dict, Iterator, List, Mapping, Optional, Sequence, Tuple, Union
|
||||
from typing import Any, Dict, Generator, List, Mapping, Optional, Sequence, Tuple, Union
|
||||
|
||||
from pip._vendor import requests, urllib3
|
||||
from pip._vendor.cachecontrol import CacheControlAdapter
|
||||
|
@ -374,7 +374,7 @@ class PipSession(requests.Session):
|
|||
# Mount wildcard ports for the same host.
|
||||
self.mount(build_url_from_netloc(host) + ":", self._trusted_host_adapter)
|
||||
|
||||
def iter_secure_origins(self) -> Iterator[SecureOrigin]:
|
||||
def iter_secure_origins(self) -> Generator[SecureOrigin, None, None]:
|
||||
yield from SECURE_ORIGINS
|
||||
for host, port in self.pip_trusted_origins:
|
||||
yield ("*", host, "*" if port is None else port)
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Dict, Iterator
|
||||
from typing import Dict, Generator
|
||||
|
||||
from pip._vendor.requests.models import CONTENT_CHUNK_SIZE, Response
|
||||
|
||||
|
@ -56,7 +56,7 @@ def raise_for_status(resp: Response) -> None:
|
|||
|
||||
def response_chunks(
|
||||
response: Response, chunk_size: int = CONTENT_CHUNK_SIZE
|
||||
) -> Iterator[bytes]:
|
||||
) -> Generator[bytes, None, None]:
|
||||
"""Given a requests Response, provide the data chunks."""
|
||||
try:
|
||||
# Special case for urllib3.
|
||||
|
|
|
@ -3,7 +3,7 @@ import hashlib
|
|||
import logging
|
||||
import os
|
||||
from types import TracebackType
|
||||
from typing import Dict, Iterator, Optional, Set, Type, Union
|
||||
from typing import Dict, Generator, Optional, Set, Type, Union
|
||||
|
||||
from pip._internal.models.link import Link
|
||||
from pip._internal.req.req_install import InstallRequirement
|
||||
|
@ -13,7 +13,7 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def update_env_context_manager(**changes: str) -> Iterator[None]:
|
||||
def update_env_context_manager(**changes: str) -> Generator[None, None, None]:
|
||||
target = os.environ
|
||||
|
||||
# Save values from the target and change them.
|
||||
|
@ -39,7 +39,7 @@ def update_env_context_manager(**changes: str) -> Iterator[None]:
|
|||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def get_build_tracker() -> Iterator["BuildTracker"]:
|
||||
def get_build_tracker() -> Generator["BuildTracker", None, None]:
|
||||
root = os.environ.get("PIP_BUILD_TRACKER")
|
||||
with contextlib.ExitStack() as ctx:
|
||||
if root is None:
|
||||
|
@ -118,7 +118,7 @@ class BuildTracker:
|
|||
logger.debug("Removed build tracker: %r", self._root)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def track(self, req: InstallRequirement) -> Iterator[None]:
|
||||
def track(self, req: InstallRequirement) -> Generator[None, None, None]:
|
||||
self.add(req)
|
||||
yield
|
||||
self.remove(req)
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import collections
|
||||
import logging
|
||||
import os
|
||||
from typing import Container, Dict, Iterable, Iterator, List, NamedTuple, Optional, Set
|
||||
from typing import Container, Dict, Generator, Iterable, List, NamedTuple, Optional, Set
|
||||
|
||||
from pip._vendor.packaging.utils import canonicalize_name
|
||||
from pip._vendor.packaging.version import Version
|
||||
|
@ -31,7 +31,7 @@ def freeze(
|
|||
isolated: bool = False,
|
||||
exclude_editable: bool = False,
|
||||
skip: Container[str] = (),
|
||||
) -> Iterator[str]:
|
||||
) -> Generator[str, None, None]:
|
||||
installations: Dict[str, FrozenRequirement] = {}
|
||||
|
||||
dists = get_environment(paths).iter_installed_distributions(
|
||||
|
|
|
@ -22,6 +22,7 @@ from typing import (
|
|||
BinaryIO,
|
||||
Callable,
|
||||
Dict,
|
||||
Generator,
|
||||
Iterable,
|
||||
Iterator,
|
||||
List,
|
||||
|
@ -589,7 +590,7 @@ def _install_wheel(
|
|||
file.save()
|
||||
record_installed(file.src_record_path, file.dest_path, file.changed)
|
||||
|
||||
def pyc_source_file_paths() -> Iterator[str]:
|
||||
def pyc_source_file_paths() -> Generator[str, None, None]:
|
||||
# We de-duplicate installation paths, since there can be overlap (e.g.
|
||||
# file in .data maps to same location as file in wheel root).
|
||||
# Sorting installation paths makes it easier to reproduce and debug
|
||||
|
@ -656,7 +657,7 @@ def _install_wheel(
|
|||
generated_file_mode = 0o666 & ~current_umask()
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _generate_file(path: str, **kwargs: Any) -> Iterator[BinaryIO]:
|
||||
def _generate_file(path: str, **kwargs: Any) -> Generator[BinaryIO, None, None]:
|
||||
with adjacent_tmp_file(path, **kwargs) as f:
|
||||
yield f
|
||||
os.chmod(f.name, generated_file_mode)
|
||||
|
@ -706,7 +707,7 @@ def _install_wheel(
|
|||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def req_error_context(req_description: str) -> Iterator[None]:
|
||||
def req_error_context(req_description: str) -> Generator[None, None, None]:
|
||||
try:
|
||||
yield
|
||||
except InstallationError as e:
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import collections
|
||||
import logging
|
||||
from typing import Iterator, List, Optional, Sequence, Tuple
|
||||
from typing import Generator, List, Optional, Sequence, Tuple
|
||||
|
||||
from pip._internal.utils.logging import indent_log
|
||||
|
||||
|
@ -28,7 +28,7 @@ class InstallationResult:
|
|||
|
||||
def _validate_requirements(
|
||||
requirements: List[InstallRequirement],
|
||||
) -> Iterator[Tuple[str, InstallRequirement]]:
|
||||
) -> Generator[Tuple[str, InstallRequirement], None, None]:
|
||||
for req in requirements:
|
||||
assert req.name, f"invalid to-be-installed requirement: {req}"
|
||||
yield req.name, req
|
||||
|
|
|
@ -13,8 +13,8 @@ from typing import (
|
|||
Any,
|
||||
Callable,
|
||||
Dict,
|
||||
Generator,
|
||||
Iterable,
|
||||
Iterator,
|
||||
List,
|
||||
Optional,
|
||||
Tuple,
|
||||
|
@ -129,7 +129,7 @@ def parse_requirements(
|
|||
finder: Optional["PackageFinder"] = None,
|
||||
options: Optional[optparse.Values] = None,
|
||||
constraint: bool = False,
|
||||
) -> Iterator[ParsedRequirement]:
|
||||
) -> Generator[ParsedRequirement, None, None]:
|
||||
"""Parse a requirements file and yield ParsedRequirement instances.
|
||||
|
||||
:param filename: Path or url of requirements file.
|
||||
|
@ -321,13 +321,15 @@ class RequirementsFileParser:
|
|||
self._session = session
|
||||
self._line_parser = line_parser
|
||||
|
||||
def parse(self, filename: str, constraint: bool) -> Iterator[ParsedLine]:
|
||||
def parse(
|
||||
self, filename: str, constraint: bool
|
||||
) -> Generator[ParsedLine, None, None]:
|
||||
"""Parse a given file, yielding parsed lines."""
|
||||
yield from self._parse_and_recurse(filename, constraint)
|
||||
|
||||
def _parse_and_recurse(
|
||||
self, filename: str, constraint: bool
|
||||
) -> Iterator[ParsedLine]:
|
||||
) -> Generator[ParsedLine, None, None]:
|
||||
for line in self._parse_file(filename, constraint):
|
||||
if not line.is_requirement and (
|
||||
line.opts.requirements or line.opts.constraints
|
||||
|
@ -356,7 +358,9 @@ class RequirementsFileParser:
|
|||
else:
|
||||
yield line
|
||||
|
||||
def _parse_file(self, filename: str, constraint: bool) -> Iterator[ParsedLine]:
|
||||
def _parse_file(
|
||||
self, filename: str, constraint: bool
|
||||
) -> Generator[ParsedLine, None, None]:
|
||||
_, content = get_file_content(filename, self._session)
|
||||
|
||||
lines_enum = preprocess(content)
|
||||
|
|
|
@ -3,7 +3,7 @@ import os
|
|||
import sys
|
||||
import sysconfig
|
||||
from importlib.util import cache_from_source
|
||||
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Tuple
|
||||
from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Set, Tuple
|
||||
|
||||
from pip._internal.exceptions import UninstallationError
|
||||
from pip._internal.locations import get_bin_prefix, get_bin_user
|
||||
|
@ -17,7 +17,9 @@ from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory
|
|||
logger = getLogger(__name__)
|
||||
|
||||
|
||||
def _script_names(bin_dir: str, script_name: str, is_gui: bool) -> Iterator[str]:
|
||||
def _script_names(
|
||||
bin_dir: str, script_name: str, is_gui: bool
|
||||
) -> Generator[str, None, None]:
|
||||
"""Create the fully qualified name of the files created by
|
||||
{console,gui}_scripts for the given ``dist``.
|
||||
Returns the list of file names
|
||||
|
@ -34,9 +36,11 @@ def _script_names(bin_dir: str, script_name: str, is_gui: bool) -> Iterator[str]
|
|||
yield f"{exe_name}-script.py"
|
||||
|
||||
|
||||
def _unique(fn: Callable[..., Iterator[Any]]) -> Callable[..., Iterator[Any]]:
|
||||
def _unique(
|
||||
fn: Callable[..., Generator[Any, None, None]]
|
||||
) -> Callable[..., Generator[Any, None, None]]:
|
||||
@functools.wraps(fn)
|
||||
def unique(*args: Any, **kw: Any) -> Iterator[Any]:
|
||||
def unique(*args: Any, **kw: Any) -> Generator[Any, None, None]:
|
||||
seen: Set[Any] = set()
|
||||
for item in fn(*args, **kw):
|
||||
if item not in seen:
|
||||
|
@ -47,7 +51,7 @@ def _unique(fn: Callable[..., Iterator[Any]]) -> Callable[..., Iterator[Any]]:
|
|||
|
||||
|
||||
@_unique
|
||||
def uninstallation_paths(dist: BaseDistribution) -> Iterator[str]:
|
||||
def uninstallation_paths(dist: BaseDistribution) -> Generator[str, None, None]:
|
||||
"""
|
||||
Yield all the uninstallation paths for dist based on RECORD-without-.py[co]
|
||||
|
||||
|
@ -565,7 +569,7 @@ class UninstallPathSet:
|
|||
def iter_scripts_to_remove(
|
||||
dist: BaseDistribution,
|
||||
bin_dir: str,
|
||||
) -> Iterator[str]:
|
||||
) -> Generator[str, None, None]:
|
||||
for entry_point in dist.iter_entry_points():
|
||||
if entry_point.group == "console_scripts":
|
||||
yield from _script_names(bin_dir, entry_point.name, False)
|
||||
|
|
|
@ -5,7 +5,7 @@ import random
|
|||
import sys
|
||||
from contextlib import contextmanager
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import Any, BinaryIO, Iterator, List, Union, cast
|
||||
from typing import Any, BinaryIO, Generator, List, Union, cast
|
||||
|
||||
from pip._vendor.tenacity import retry, stop_after_delay, wait_fixed
|
||||
|
||||
|
@ -41,7 +41,7 @@ def check_path_owner(path: str) -> bool:
|
|||
|
||||
|
||||
@contextmanager
|
||||
def adjacent_tmp_file(path: str, **kwargs: Any) -> Iterator[BinaryIO]:
|
||||
def adjacent_tmp_file(path: str, **kwargs: Any) -> Generator[BinaryIO, None, None]:
|
||||
"""Return a file-like object pointing to a tmp file next to path.
|
||||
|
||||
The file is created securely and is ensured to be written to disk
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import hashlib
|
||||
from typing import TYPE_CHECKING, BinaryIO, Dict, Iterator, List
|
||||
from typing import TYPE_CHECKING, BinaryIO, Dict, Iterable, List
|
||||
|
||||
from pip._internal.exceptions import HashMismatch, HashMissing, InstallationError
|
||||
from pip._internal.utils.misc import read_chunks
|
||||
|
@ -71,7 +71,7 @@ class Hashes:
|
|||
"""Return whether the given hex digest is allowed."""
|
||||
return hex_digest in self._allowed.get(hash_name, [])
|
||||
|
||||
def check_against_chunks(self, chunks: Iterator[bytes]) -> None:
|
||||
def check_against_chunks(self, chunks: Iterable[bytes]) -> None:
|
||||
"""Check good hashes against ones built from iterable of chunks of
|
||||
data.
|
||||
|
||||
|
|
|
@ -8,7 +8,7 @@ import threading
|
|||
from dataclasses import dataclass
|
||||
from io import TextIOWrapper
|
||||
from logging import Filter
|
||||
from typing import Any, ClassVar, Iterator, List, Optional, TextIO, Type
|
||||
from typing import Any, ClassVar, Generator, List, Optional, TextIO, Type
|
||||
|
||||
from pip._vendor.rich.console import (
|
||||
Console,
|
||||
|
@ -51,7 +51,7 @@ def _is_broken_pipe_error(exc_class: Type[BaseException], exc: BaseException) ->
|
|||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def indent_log(num: int = 2) -> Iterator[None]:
|
||||
def indent_log(num: int = 2) -> Generator[None, None, None]:
|
||||
"""
|
||||
A context manager which will cause the log output to be indented for any
|
||||
log messages emitted inside it.
|
||||
|
|
|
@ -21,6 +21,7 @@ from typing import (
|
|||
BinaryIO,
|
||||
Callable,
|
||||
ContextManager,
|
||||
Generator,
|
||||
Iterable,
|
||||
Iterator,
|
||||
List,
|
||||
|
@ -264,7 +265,9 @@ def is_installable_dir(path: str) -> bool:
|
|||
return False
|
||||
|
||||
|
||||
def read_chunks(file: BinaryIO, size: int = io.DEFAULT_BUFFER_SIZE) -> Iterator[bytes]:
|
||||
def read_chunks(
|
||||
file: BinaryIO, size: int = io.DEFAULT_BUFFER_SIZE
|
||||
) -> Generator[bytes, None, None]:
|
||||
"""Yield pieces of data from a file-like object until EOF."""
|
||||
while True:
|
||||
chunk = file.read(size)
|
||||
|
@ -346,7 +349,7 @@ class StreamWrapper(StringIO):
|
|||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def captured_output(stream_name: str) -> Iterator[StreamWrapper]:
|
||||
def captured_output(stream_name: str) -> Generator[StreamWrapper, None, None]:
|
||||
"""Return a context manager used by captured_stdout/stdin/stderr
|
||||
that temporarily replaces the sys stream *stream_name* with a StringIO.
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ import logging
|
|||
import os.path
|
||||
import tempfile
|
||||
from contextlib import ExitStack, contextmanager
|
||||
from typing import Any, Dict, Iterator, Optional, TypeVar, Union
|
||||
from typing import Any, Dict, Generator, Optional, TypeVar, Union
|
||||
|
||||
from pip._internal.utils.misc import enum, rmtree
|
||||
|
||||
|
@ -26,7 +26,7 @@ _tempdir_manager: Optional[ExitStack] = None
|
|||
|
||||
|
||||
@contextmanager
|
||||
def global_tempdir_manager() -> Iterator[None]:
|
||||
def global_tempdir_manager() -> Generator[None, None, None]:
|
||||
global _tempdir_manager
|
||||
with ExitStack() as stack:
|
||||
old_tempdir_manager, _tempdir_manager = _tempdir_manager, stack
|
||||
|
@ -59,7 +59,7 @@ _tempdir_registry: Optional[TempDirectoryTypeRegistry] = None
|
|||
|
||||
|
||||
@contextmanager
|
||||
def tempdir_registry() -> Iterator[TempDirectoryTypeRegistry]:
|
||||
def tempdir_registry() -> Generator[TempDirectoryTypeRegistry, None, None]:
|
||||
"""Provides a scoped global tempdir registry that can be used to dictate
|
||||
whether directories should be deleted.
|
||||
"""
|
||||
|
@ -200,7 +200,7 @@ class AdjacentTempDirectory(TempDirectory):
|
|||
super().__init__(delete=delete)
|
||||
|
||||
@classmethod
|
||||
def _generate_names(cls, name: str) -> Iterator[str]:
|
||||
def _generate_names(cls, name: str) -> Generator[str, None, None]:
|
||||
"""Generates a series of temporary names.
|
||||
|
||||
The algorithm replaces the leading characters in the name
|
||||
|
|
Loading…
Reference in New Issue