Add type annotations for pip._internal.pyproject, pip._internal.build_env, pip._internal.index, pip._internal.resolve (#6072)

This commit is contained in:
Maxim Kurnikov 2018-12-17 14:13:23 +03:00 committed by Pradyun Gedam
parent 05eb7d8e92
commit 8fc393a2d1
4 changed files with 187 additions and 54 deletions

View File

@ -14,14 +14,20 @@ from pip._vendor.pkg_resources import Requirement, VersionConflict, WorkingSet
from pip import __file__ as pip_location
from pip._internal.utils.misc import call_subprocess
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.ui import open_spinner
if MYPY_CHECK_RUNNING:
from typing import Tuple, Set, Iterable, Optional, List # noqa: F401
from pip._internal.index import PackageFinder # noqa: F401
logger = logging.getLogger(__name__)
class _Prefix:
def __init__(self, path):
# type: (str) -> None
self.path = path
self.setup = False
self.bin_dir = get_paths(
@ -30,8 +36,8 @@ class _Prefix:
)['scripts']
# Note: prefer distutils' sysconfig to get the
# library paths so PyPy is correctly supported.
purelib = get_python_lib(plat_specific=0, prefix=path)
platlib = get_python_lib(plat_specific=1, prefix=path)
purelib = get_python_lib(plat_specific=False, prefix=path)
platlib = get_python_lib(plat_specific=True, prefix=path)
if purelib == platlib:
self.lib_dirs = [purelib]
else:
@ -43,6 +49,7 @@ class BuildEnvironment(object):
"""
def __init__(self):
# type: () -> None
self._temp_dir = TempDirectory(kind="build-env")
self._temp_dir.create()
@ -51,8 +58,8 @@ class BuildEnvironment(object):
for name in ('normal', 'overlay')
))
self._bin_dirs = []
self._lib_dirs = []
self._bin_dirs = [] # type: List[str]
self._lib_dirs = [] # type: List[str]
for prefix in reversed(list(self._prefixes.values())):
self._bin_dirs.append(prefix.bin_dir)
self._lib_dirs.extend(prefix.lib_dirs)
@ -62,8 +69,8 @@ class BuildEnvironment(object):
# - prevent access to system site packages
system_sites = {
os.path.normcase(site) for site in (
get_python_lib(plat_specific=0),
get_python_lib(plat_specific=1),
get_python_lib(plat_specific=False),
get_python_lib(plat_specific=True),
)
}
self._site_dir = os.path.join(self._temp_dir.path, 'site')
@ -124,9 +131,11 @@ class BuildEnvironment(object):
os.environ[varname] = old_value
def cleanup(self):
# type: () -> None
self._temp_dir.cleanup()
def check_requirements(self, reqs):
# type: (Iterable[str]) -> Tuple[Set[Tuple[str, str]], Set[str]]
"""Return 2 sets:
- conflicting requirements: set of (installed, wanted) reqs tuples
- missing requirements: set of reqs
@ -144,8 +153,15 @@ class BuildEnvironment(object):
str(e.args[1])))
return conflicting, missing
def install_requirements(self, finder, requirements, prefix, message):
prefix = self._prefixes[prefix]
def install_requirements(
self,
finder, # type: PackageFinder
requirements, # type: Iterable[str]
prefix_as_string, # type: str
message # type: Optional[str]
):
# type: (...) -> None
prefix = self._prefixes[prefix_as_string]
assert not prefix.setup
prefix.setup = True
if not requirements:
@ -154,7 +170,7 @@ class BuildEnvironment(object):
sys.executable, os.path.dirname(pip_location), 'install',
'--ignore-installed', '--no-user', '--prefix', prefix.path,
'--no-warn-script-location',
]
] # type: List[str]
if logger.getEffectiveLevel() <= logging.DEBUG:
args.append('-v')
for format_control in ('no_binary', 'only_binary'):

View File

@ -38,8 +38,24 @@ from pip._internal.utils.misc import (
redact_password_from_url,
)
from pip._internal.utils.packaging import check_requires_python
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.wheel import Wheel
if MYPY_CHECK_RUNNING:
from logging import Logger # noqa: F401
from typing import ( # noqa: F401
Tuple, Optional, Any, List, Union, Callable, Set, Sequence,
Iterable, MutableMapping
)
from pip._vendor.packaging.version import _BaseVersion # noqa: F401
from pip._vendor.requests import Response # noqa: F401
from pip._internal.req import InstallRequirement # noqa: F401
from pip._internal.download import PipSession # noqa: F401
SecureOrigin = Tuple[str, str, Optional[str]]
BuildTag = Tuple[Any, ...] # either emply tuple or Tuple[int, str]
CandidateSortingKey = Tuple[int, _BaseVersion, BuildTag, Optional[int]]
__all__ = ['FormatControl', 'PackageFinder']
@ -53,13 +69,14 @@ SECURE_ORIGINS = [
("file", "*", None),
# ssh is always secure.
("ssh", "*", "*"),
]
] # type: List[SecureOrigin]
logger = logging.getLogger(__name__)
def _match_vcs_scheme(url):
# type: (str) -> Optional[str]
"""Look for VCS schemes in the URL.
Returns the matched VCS scheme, or None if there's no match.
@ -72,6 +89,7 @@ def _match_vcs_scheme(url):
def _is_url_like_archive(url):
# type: (str) -> bool
"""Return whether the URL looks like an archive.
"""
filename = Link(url).filename
@ -83,12 +101,14 @@ def _is_url_like_archive(url):
class _NotHTML(Exception):
def __init__(self, content_type, request_desc):
# type: (str, str) -> None
super(_NotHTML, self).__init__(content_type, request_desc)
self.content_type = content_type
self.request_desc = request_desc
def _ensure_html_header(response):
# type: (Response) -> None
"""Check the Content-Type header to ensure the response contains HTML.
Raises `_NotHTML` if the content type is not text/html.
@ -103,6 +123,7 @@ class _NotHTTP(Exception):
def _ensure_html_response(url, session):
# type: (str, PipSession) -> None
"""Send a HEAD request to the URL, and ensure the response contains HTML.
Raises `_NotHTTP` if the URL is not available for a HEAD request, or
@ -119,6 +140,7 @@ def _ensure_html_response(url, session):
def _get_html_response(url, session):
# type: (str, PipSession) -> Response
"""Access an HTML page with GET, and return the response.
This consists of three parts:
@ -168,13 +190,19 @@ def _get_html_response(url, session):
return resp
def _handle_get_page_fail(link, reason, url, meth=None):
def _handle_get_page_fail(
link, # type: Link
reason, # type: Union[str, Exception]
meth=None # type: Optional[Callable[..., None]]
):
# type: (...) -> None
if meth is None:
meth = logger.debug
meth("Could not fetch URL %s: %s - skipping", link, reason)
def _get_html_page(link, session=None):
# type: (Link, Optional[PipSession]) -> Optional[HTMLPage]
if session is None:
raise TypeError(
"_get_html_page() missing 1 required keyword argument: 'session'"
@ -211,19 +239,20 @@ def _get_html_page(link, session=None):
link, exc.request_desc, exc.content_type,
)
except requests.HTTPError as exc:
_handle_get_page_fail(link, exc, url)
_handle_get_page_fail(link, exc)
except RetryError as exc:
_handle_get_page_fail(link, exc, url)
_handle_get_page_fail(link, exc)
except SSLError as exc:
reason = "There was a problem confirming the ssl certificate: "
reason += str(exc)
_handle_get_page_fail(link, reason, url, meth=logger.info)
_handle_get_page_fail(link, reason, meth=logger.info)
except requests.ConnectionError as exc:
_handle_get_page_fail(link, "connection error: %s" % exc, url)
_handle_get_page_fail(link, "connection error: %s" % exc)
except requests.Timeout:
_handle_get_page_fail(link, "timed out", url)
_handle_get_page_fail(link, "timed out")
else:
return HTMLPage(resp.content, resp.url, resp.headers)
return None
class PackageFinder(object):
@ -233,11 +262,22 @@ class PackageFinder(object):
packages, by reading pages and looking for appropriate links.
"""
def __init__(self, find_links, index_urls, allow_all_prereleases=False,
trusted_hosts=None, process_dependency_links=False,
session=None, format_control=None, platform=None,
versions=None, abi=None, implementation=None,
prefer_binary=False):
def __init__(
self,
find_links, # type: List[str]
index_urls, # type: List[str]
allow_all_prereleases=False, # type: bool
trusted_hosts=None, # type: Optional[Iterable[str]]
process_dependency_links=False, # type: bool
session=None, # type: Optional[PipSession]
format_control=None, # type: Optional[FormatControl]
platform=None, # type: Optional[str]
versions=None, # type: Optional[List[str]]
abi=None, # type: Optional[str]
implementation=None, # type: Optional[str]
prefer_binary=False # type: bool
):
# type: (...) -> None
"""Create a PackageFinder.
:param format_control: A FormatControl object or None. Used to control
@ -266,7 +306,7 @@ class PackageFinder(object):
# it and if it exists, use the normalized version.
# This is deliberately conservative - it might be fine just to
# blindly normalize anything starting with a ~...
self.find_links = []
self.find_links = [] # type: List[str]
for link in find_links:
if link.startswith('~'):
new_link = normalize_path(link)
@ -275,10 +315,10 @@ class PackageFinder(object):
self.find_links.append(link)
self.index_urls = index_urls
self.dependency_links = []
self.dependency_links = [] # type: List[str]
# These are boring links that have already been logged somehow:
self.logged_links = set()
self.logged_links = set() # type: Set[Link]
self.format_control = format_control or FormatControl(set(), set())
@ -286,7 +326,7 @@ class PackageFinder(object):
self.secure_origins = [
("*", host, "*")
for host in (trusted_hosts if trusted_hosts else [])
]
] # type: List[SecureOrigin]
# Do we want to allow _all_ pre-releases?
self.allow_all_prereleases = allow_all_prereleases
@ -322,6 +362,7 @@ class PackageFinder(object):
break
def get_formatted_locations(self):
# type: () -> str
lines = []
if self.index_urls and self.index_urls != [PyPI.simple_url]:
lines.append(
@ -335,6 +376,7 @@ class PackageFinder(object):
return "\n".join(lines)
def add_dependency_links(self, links):
# type: (Iterable[str]) -> None
# FIXME: this shouldn't be global list this, it should only
# apply to requirements of the package that specifies the
# dependency_links value
@ -351,6 +393,7 @@ class PackageFinder(object):
@staticmethod
def _sort_locations(locations, expand_dir=False):
# type: (Sequence[str], bool) -> Tuple[List[str], List[str]]
"""
Sort locations into "files" (archives) and "urls", and return
a pair of lists (files,urls)
@ -407,6 +450,7 @@ class PackageFinder(object):
return files, urls
def _candidate_sort_key(self, candidate):
# type: (InstallationCandidate) -> CandidateSortingKey
"""
Function used to generate link sort key for link tuples.
The greater the return value, the more preferred it is.
@ -421,7 +465,7 @@ class PackageFinder(object):
with the same version, would have to be considered equal
"""
support_num = len(self.valid_tags)
build_tag = tuple()
build_tag = tuple() # type: BuildTag
binary_preference = 0
if candidate.location.is_wheel:
# can raise InvalidWheelFilename
@ -443,6 +487,7 @@ class PackageFinder(object):
return (binary_preference, candidate.version, build_tag, pri)
def _validate_secure_origin(self, logger, location):
# type: (Logger, Link) -> bool
# Determine if this url used a secure transport mechanism
parsed = urllib_parse.urlparse(str(location))
origin = (parsed.scheme, parsed.hostname, parsed.port)
@ -474,7 +519,9 @@ class PackageFinder(object):
network = ipaddress.ip_network(
secure_origin[1]
if isinstance(secure_origin[1], six.text_type)
else secure_origin[1].decode("utf8")
# setting secure_origin[1] to proper Union[bytes, str]
# creates problems in other places
else secure_origin[1].decode("utf8") # type: ignore
)
except ValueError:
# We don't have both a valid address or a valid network, so
@ -514,6 +561,7 @@ class PackageFinder(object):
return False
def _get_index_urls_locations(self, project_name):
# type: (str) -> List[str]
"""Returns the locations found via self.index_urls
Checks the url_name on the main (first in the list) index and
@ -536,6 +584,7 @@ class PackageFinder(object):
return [mkurl_pypi_url(url) for url in self.index_urls]
def find_all_candidates(self, project_name):
# type: (str) -> List[Optional[InstallationCandidate]]
"""Find all available InstallationCandidate for project_name
This checks index_urls, find_links and dependency_links.
@ -619,6 +668,7 @@ class PackageFinder(object):
)
def find_requirement(self, req, upgrade):
# type: (InstallRequirement, bool) -> Optional[Link]
"""Try to find a Link matching req
Expects req, an InstallRequirement and upgrade, a boolean
@ -656,7 +706,9 @@ class PackageFinder(object):
best_candidate = None
if req.satisfied_by is not None:
installed_version = parse_version(req.satisfied_by.version)
# type error fixed in mypy==0.641, remove after update
installed_version = parse_version(
req.satisfied_by.version) # type: ignore
else:
installed_version = None
@ -718,11 +770,12 @@ class PackageFinder(object):
return best_candidate.location
def _get_pages(self, locations, project_name):
# type: (Iterable[Link], str) -> Iterable[HTMLPage]
"""
Yields (page, page_url) from the given locations, skipping
locations that have errors.
"""
seen = set()
seen = set() # type: Set[Link]
for location in locations:
if location in seen:
continue
@ -737,12 +790,13 @@ class PackageFinder(object):
_py_version_re = re.compile(r'-py([123]\.?[0-9]?)$')
def _sort_links(self, links):
# type: (Iterable[Link]) -> List[Link]
"""
Returns elements of links in order, non-egg links first, egg links
second, while eliminating duplicates
"""
eggs, no_eggs = [], []
seen = set()
seen = set() # type: Set[Link]
for link in links:
if link not in seen:
seen.add(link)
@ -752,7 +806,12 @@ class PackageFinder(object):
no_eggs.append(link)
return no_eggs + eggs
def _package_versions(self, links, search):
def _package_versions(
self,
links, # type: Iterable[Link]
search # type: Search
):
# type: (...) -> List[Optional[InstallationCandidate]]
result = []
for link in self._sort_links(links):
v = self._link_package_versions(link, search)
@ -761,11 +820,13 @@ class PackageFinder(object):
return result
def _log_skipped_link(self, link, reason):
# type: (Link, str) -> None
if link not in self.logged_links:
logger.debug('Skipping link %s; %s', link, reason)
self.logged_links.add(link)
def _link_package_versions(self, link, search):
# type: (Link, Search) -> Optional[InstallationCandidate]
"""Return an InstallationCandidate or None"""
version = None
if link.egg_fragment:
@ -775,35 +836,35 @@ class PackageFinder(object):
egg_info, ext = link.splitext()
if not ext:
self._log_skipped_link(link, 'not a file')
return
return None
if ext not in SUPPORTED_EXTENSIONS:
self._log_skipped_link(
link, 'unsupported archive format: %s' % ext,
)
return
return None
if "binary" not in search.formats and ext == WHEEL_EXTENSION:
self._log_skipped_link(
link, 'No binaries permitted for %s' % search.supplied,
)
return
return None
if "macosx10" in link.path and ext == '.zip':
self._log_skipped_link(link, 'macosx10 one')
return
return None
if ext == WHEEL_EXTENSION:
try:
wheel = Wheel(link.filename)
except InvalidWheelFilename:
self._log_skipped_link(link, 'invalid wheel filename')
return
return None
if canonicalize_name(wheel.name) != search.canonical:
self._log_skipped_link(
link, 'wrong project name (not %s)' % search.supplied)
return
return None
if not wheel.supported(self.valid_tags):
self._log_skipped_link(
link, 'it is not compatible with this Python')
return
return None
version = wheel.version
@ -812,14 +873,14 @@ class PackageFinder(object):
self._log_skipped_link(
link, 'No sources permitted for %s' % search.supplied,
)
return
return None
if not version:
version = _egg_info_matches(egg_info, search.canonical)
if not version:
self._log_skipped_link(
link, 'Missing project version for %s' % search.supplied)
return
return None
match = self._py_version_re.search(version)
if match:
@ -828,7 +889,7 @@ class PackageFinder(object):
if py_version != sys.version[:3]:
self._log_skipped_link(
link, 'Python version is incorrect')
return
return None
try:
support_this_python = check_requires_python(link.requires_python)
except specifiers.InvalidSpecifier:
@ -840,13 +901,14 @@ class PackageFinder(object):
logger.debug("The package %s is incompatible with the python "
"version in use. Acceptable python versions are: %s",
link, link.requires_python)
return
return None
logger.debug('Found link %s, version: %s', link, version)
return InstallationCandidate(search.supplied, version, link)
def _find_name_version_sep(egg_info, canonical_name):
# type: (str, str) -> int
"""Find the separator's index based on the package's canonical name.
`egg_info` must be an egg info string for the given package, and
@ -872,6 +934,7 @@ def _find_name_version_sep(egg_info, canonical_name):
def _egg_info_matches(egg_info, canonical_name):
# type: (str, str) -> Optional[str]
"""Pull the version part out of a string.
:param egg_info: The string to parse. E.g. foo-2.1
@ -921,16 +984,20 @@ _CLEAN_LINK_RE = re.compile(r'[^a-z0-9$&+,/:;=?@.#%_\\|-]', re.I)
def _clean_link(url):
# type: (str) -> str
"""Makes sure a link is fully encoded. That is, if a ' ' shows up in
the link, it will be rewritten to %20 (while not over-quoting
% or other characters)."""
return _CLEAN_LINK_RE.sub(lambda match: '%%%2x' % ord(match.group(0)), url)
# type error fixed in mypy==0.641, remove after update
return _CLEAN_LINK_RE.sub(
lambda match: '%%%2x' % ord(match.group(0)), url) # type: ignore
class HTMLPage(object):
"""Represents one page, along with its URL"""
def __init__(self, content, url, headers=None):
# type: (bytes, str, MutableMapping[str, str]) -> None
self.content = content
self.url = url
self.headers = headers
@ -939,6 +1006,7 @@ class HTMLPage(object):
return redact_password_from_url(self.url)
def iter_links(self):
# type: () -> Iterable[Link]
"""Yields all links in the page"""
document = html5lib.parse(
self.content,

View File

@ -6,16 +6,27 @@ import os
from pip._vendor import pytoml, six
from pip._internal.exceptions import InstallationError
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Any, Tuple, Optional, List # noqa: F401
def _is_list_of_str(obj):
# type: (Any) -> bool
return (
isinstance(obj, list) and
all(isinstance(item, six.string_types) for item in obj)
)
def load_pyproject_toml(use_pep517, pyproject_toml, setup_py, req_name):
def load_pyproject_toml(
use_pep517, # type: Optional[bool]
pyproject_toml, # type: str
setup_py, # type: str
req_name # type: str
):
# type: (...) -> Optional[Tuple[List[str], str, List[str]]]
"""Load the pyproject.toml file.
Parameters:
@ -123,7 +134,7 @@ def load_pyproject_toml(use_pep517, pyproject_toml, setup_py, req_name):
))
backend = build_system.get("build-backend")
check = []
check = [] # type: List[str]
if backend is None:
# If the user didn't specify a backend, we assume they want to use
# the setuptools backend. But we can't be sure they have included

View File

@ -22,6 +22,18 @@ from pip._internal.req.constructors import install_req_from_req_string
from pip._internal.utils.logging import indent_log
from pip._internal.utils.misc import dist_in_usersite, ensure_dir
from pip._internal.utils.packaging import check_dist_requires_python
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Optional, DefaultDict, List, Set # noqa: F401
from pip._internal.download import PipSession # noqa: F401
from pip._internal.req.req_install import InstallRequirement # noqa: F401
from pip._internal.index import PackageFinder # noqa: F401
from pip._internal.req.req_set import RequirementSet # noqa: F401
from pip._internal.operations.prepare import ( # noqa: F401
DistAbstraction, RequirementPreparer
)
from pip._internal.cache import WheelCache # noqa: F401
logger = logging.getLogger(__name__)
@ -33,9 +45,22 @@ class Resolver(object):
_allowed_strategies = {"eager", "only-if-needed", "to-satisfy-only"}
def __init__(self, preparer, session, finder, wheel_cache, use_user_site,
ignore_dependencies, ignore_installed, ignore_requires_python,
force_reinstall, isolated, upgrade_strategy, use_pep517=None):
def __init__(
self,
preparer, # type: RequirementPreparer
session, # type: PipSession
finder, # type: PackageFinder
wheel_cache, # type: Optional[WheelCache]
use_user_site, # type: bool
ignore_dependencies, # type: bool
ignore_installed, # type: bool
ignore_requires_python, # type: bool
force_reinstall, # type: bool
isolated, # type: bool
upgrade_strategy, # type: str
use_pep517=None # type: Optional[bool]
):
# type: (...) -> None
super(Resolver, self).__init__()
assert upgrade_strategy in self._allowed_strategies
@ -47,7 +72,8 @@ class Resolver(object):
# information about both sdist and wheels transparently.
self.wheel_cache = wheel_cache
self.require_hashes = None # This is set in resolve
# This is set in resolve
self.require_hashes = None # type: Optional[bool]
self.upgrade_strategy = upgrade_strategy
self.force_reinstall = force_reinstall
@ -58,9 +84,11 @@ class Resolver(object):
self.use_user_site = use_user_site
self.use_pep517 = use_pep517
self._discovered_dependencies = defaultdict(list)
self._discovered_dependencies = \
defaultdict(list) # type: DefaultDict[str, List]
def resolve(self, requirement_set):
# type: (RequirementSet) -> None
"""Resolve what operations need to be done
As a side-effect of this method, the packages (and their dependencies)
@ -95,7 +123,7 @@ class Resolver(object):
# exceptions cannot be checked ahead of time, because
# req.populate_link() needs to be called before we can make decisions
# based on link type.
discovered_reqs = []
discovered_reqs = [] # type: List[InstallRequirement]
hash_errors = HashErrors()
for req in chain(root_reqs, discovered_reqs):
try:
@ -110,6 +138,7 @@ class Resolver(object):
raise hash_errors
def _is_upgrade_allowed(self, req):
# type: (InstallRequirement) -> bool
if self.upgrade_strategy == "to-satisfy-only":
return False
elif self.upgrade_strategy == "eager":
@ -119,6 +148,7 @@ class Resolver(object):
return req.is_direct
def _set_req_to_reinstall(self, req):
# type: (InstallRequirement) -> None
"""
Set a requirement to be installed.
"""
@ -130,6 +160,7 @@ class Resolver(object):
# XXX: Stop passing requirement_set for options
def _check_skip_installed(self, req_to_install):
# type: (InstallRequirement) -> Optional[str]
"""Check if req_to_install should be skipped.
This will check if the req is installed, and whether we should upgrade
@ -182,6 +213,7 @@ class Resolver(object):
return None
def _get_abstract_dist_for(self, req):
# type: (InstallRequirement) -> DistAbstraction
"""Takes a InstallRequirement and returns a single AbstractDist \
representing a prepared variant of the same.
"""
@ -238,7 +270,12 @@ class Resolver(object):
return abstract_dist
def _resolve_one(self, requirement_set, req_to_install):
def _resolve_one(
self,
requirement_set, # type: RequirementSet
req_to_install # type: InstallRequirement
):
# type: (...) -> List[InstallRequirement]
"""Prepare a single requirements file.
:return: A list of additional InstallRequirements to also install.
@ -266,7 +303,7 @@ class Resolver(object):
else:
raise
more_reqs = []
more_reqs = [] # type: List[InstallRequirement]
def add_req(subreq, extras_requested):
sub_install_req = install_req_from_req_string(
@ -328,6 +365,7 @@ class Resolver(object):
return more_reqs
def get_installation_order(self, req_set):
# type: (RequirementSet) -> List[InstallRequirement]
"""Create the installation order.
The installation order is topological - requirements are installed
@ -338,7 +376,7 @@ class Resolver(object):
# installs the user specified things in the order given, except when
# dependencies must come earlier to achieve topological order.
order = []
ordered_reqs = set()
ordered_reqs = set() # type: Set[InstallRequirement]
def schedule(req):
if req.satisfied_by or req in ordered_reqs: