1
1
Fork 0
mirror of https://github.com/pypa/pip synced 2023-12-13 21:30:23 +01:00

Merge branch 'master' into architecture

This commit is contained in:
Pradyun Gedam 2019-08-22 21:07:58 +05:30 committed by GitHub
commit 9b82fcc9d8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
53 changed files with 1491 additions and 572 deletions

View file

@ -8,8 +8,9 @@ Architecture of pip's internals
interested in helping out, please let us know in the `tracking issue`_.
.. note::
Direct use of pip's internals is *not supported*.
For more details, see :ref:`Using pip from your program`.
Direct use of pip's internals is *not supported*, and these internals
can change at any time. For more details, see :ref:`Using pip from
your program`.
.. toctree::
@ -17,8 +18,7 @@ Architecture of pip's internals
overview
anatomy
package-finding
.. _`tracking issue`: https://github.com/pypa/pip/issues/6831

View file

@ -0,0 +1,202 @@
Finding and choosing files (``index.py`` and ``PackageFinder``)
---------------------------------------------------------------
The ``index.py`` module is a top-level module in pip responsible for deciding
what file to download and from where, given a requirement for a project. The
module's functionality is largely exposed through and coordinated by the
module's ``PackageFinder`` class.
.. _index-py-overview:
Overview
********
Here is a rough description of the process that pip uses to choose what
file to download for a package, given a requirement:
1. Access the various network and file system locations configured for pip
that contain package files. These locations can include, for example,
pip's :ref:`--index-url <--index-url>` (with default
https://pypi.org/simple/ ) and any configured
:ref:`--extra-index-url <--extra-index-url>` locations.
Each of these locations is a `PEP 503`_ "simple repository" page, which
is an HTML page of anchor links.
2. Collect together all of the links (e.g. by parsing the anchor links
from the HTML pages) and create ``Link`` objects from each of these.
3. Determine which of the links are minimally relevant, using the
:ref:`LinkEvaluator <link-evaluator-class>` class. Create an
``InstallationCandidate`` object (aka candidate for install) for each
of these relevant links.
4. Further filter the collection of ``InstallationCandidate`` objects (using
the :ref:`CandidateEvaluator <candidate-evaluator-class>` class) to a
collection of "applicable" candidates.
5. If there are applicable candidates, choose the best candidate by sorting
them (again using the :ref:`CandidateEvaluator
<candidate-evaluator-class>` class).
The remainder of this section is organized by documenting some of the
classes inside ``index.py``, in the following order:
* the main :ref:`PackageFinder <package-finder-class>` class,
* the :ref:`LinkEvaluator <link-evaluator-class>` class,
* the :ref:`CandidateEvaluator <candidate-evaluator-class>` class,
* the :ref:`CandidatePreferences <candidate-preferences-class>` class, and
* the :ref:`BestCandidateResult <best-candidate-result-class>` class.
.. _package-finder-class:
The ``PackageFinder`` class
***************************
The ``PackageFinder`` class is the primary way through which code in pip
interacts with ``index.py``. It is an umbrella class that encapsulates and
groups together various package-finding functionality.
The ``PackageFinder`` class is responsible for searching the network and file
system for what versions of a package pip can install, and also for deciding
which version is most preferred, given the user's preferences, target Python
environment, etc.
The pip commands that use the ``PackageFinder`` class are:
* :ref:`pip download`
* :ref:`pip install`
* :ref:`pip list`
* :ref:`pip wheel`
The pip commands requiring use of the ``PackageFinder`` class generally
instantiate ``PackageFinder`` only once for the whole pip invocation. In
fact, pip creates this ``PackageFinder`` instance when command options
are first parsed.
With the excepton of :ref:`pip list`, each of the above commands is
implemented as a ``Command`` class inheriting from ``RequirementCommand``
(for example :ref:`pip download` is implemented by ``DownloadCommand``), and
the ``PackageFinder`` instance is created by calling the
``RequirementCommand`` class's ``_build_package_finder()`` method. ``pip
list``, on the other hand, constructs its ``PackageFinder`` instance by
calling the ``ListCommand`` class's ``_build_package_finder()``. (This
difference may simply be historical and may not actually be necessary.)
Each of these commands also uses the ``PackageFinder`` class for pip's
"self-check," (i.e. to check whether a pip upgrade is available). In this
case, the ``PackageFinder`` instance is created by the ``outdated.py``
module's ``pip_version_check()`` function.
The ``PackageFinder`` class is responsible for doing all of the things listed
in the :ref:`Overview <index-py-overview>` section like fetching and parsing
`PEP 503`_ simple repository HTML pages, evaluating which links in the simple
repository pages are relevant for each requirement, and further filtering and
sorting by preference the candidates for install coming from the relevant
links.
One of ``PackageFinder``'s main top-level methods is
``find_best_candidate()``. This method does the following two things:
1. Calls its ``find_all_candidates()`` method, which reads and parses all the
index URL's provided by the user, constructs a :ref:`LinkEvaluator
<link-evaluator-class>` object to filter out some of those links, and then
returns a list of ``InstallationCandidates`` (aka candidates for install).
This corresponds to steps 1-3 of the :ref:`Overview <index-py-overview>`
above.
2. Constructs a ``CandidateEvaluator`` object and uses that to determine
the best candidate. It does this by calling the ``CandidateEvaluator``
class's ``compute_best_candidate()`` method on the return value of
``find_all_candidates()``. This corresponds to steps 4-5 of the Overview.
.. _link-evaluator-class:
The ``LinkEvaluator`` class
***************************
The ``LinkEvaluator`` class contains the business logic for determining
whether a link (e.g. in a simple repository page) satisfies minimal
conditions to be a candidate for install (resulting in an
``InstallationCandidate`` object). When making this determination, the
``LinkEvaluator`` instance uses information like the target Python
interpreter as well as user preferences like whether binary files are
allowed or preferred, etc.
Specifically, the ``LinkEvaluator`` class has an ``evaluate_link()`` method
that returns whether a link is a candidate for install.
Instances of this class are created by the ``PackageFinder`` class's
``make_link_evaluator()`` on a per-requirement basis.
.. _candidate-evaluator-class:
The ``CandidateEvaluator`` class
********************************
The ``CandidateEvaluator`` class contains the business logic for evaluating
which ``InstallationCandidate`` objects should be preferred. This can be
viewed as a determination that is finer-grained than that performed by the
``LinkEvaluator`` class.
In particular, the ``CandidateEvaluator`` class uses the whole set of
``InstallationCandidate`` objects when making its determinations, as opposed
to evaluating each candidate in isolation, as ``LinkEvaluator`` does. For
example, whether a pre-release is eligible for selection or whether a file
whose hash doesn't match is eligible depends on properties of the collection
as a whole.
The ``CandidateEvaluator`` class uses information like the list of `PEP 425`_
tags compatible with the target Python interpreter, hashes provided by the
user, and other user preferences, etc.
Specifically, the class has a ``get_applicable_candidates()`` method.
This accepts the ``InstallationCandidate`` objects resulting from the links
accepted by the ``LinkEvaluator`` class's ``evaluate_link()`` method, and
it further filters them to a list of "applicable" candidates.
The ``CandidateEvaluator`` class also has a ``sort_best_candidate()`` method
that orders the applicable candidates by preference, and then returns the
best (i.e. most preferred).
Finally, the class has a ``compute_best_candidate()`` method that calls
``get_applicable_candidates()`` followed by ``sort_best_candidate()``, and
then returning a :ref:`BestCandidateResult <best-candidate-result-class>`
object encapsulating both the intermediate and final results of the decision.
Instances of ``CandidateEvaluator`` are created by the ``PackageFinder``
class's ``make_candidate_evaluator()`` method on a per-requirement basis.
.. _candidate-preferences-class:
The ``CandidatePreferences`` class
**********************************
The ``CandidatePreferences`` class is a simple container class that groups
together some of the user preferences that ``PackageFinder`` uses to
construct ``CandidateEvaluator`` objects (via the ``PackageFinder`` class's
``make_candidate_evaluator()`` method).
A ``PackageFinder`` instance has a ``_candidate_prefs`` attribute whose value
is a ``CandidatePreferences`` instance. Since ``PackageFinder`` has a number
of responsibilities and options that control its behavior, grouping the
preferences specific to ``CandidateEvaluator`` helps maintainers know which
attributes are needed only for ``CandidateEvaluator``.
.. _best-candidate-result-class:
The ``BestCandidateResult`` class
*********************************
The ``BestCandidateResult`` class is a convenience "container" class that
encapsulates the result of finding the best candidate for a requirement.
(By "container" we mean an object that simply contains data and has no
business logic or state-changing methods of its own.)
The class is the return type of both the ``CandidateEvaluator`` class's
``compute_best_candidate()`` method and the ``PackageFinder`` class's
``find_best_candidate()`` method.
.. _`PEP 425`: https://www.python.org/dev/peps/pep-0425/
.. _`PEP 503`: https://www.python.org/dev/peps/pep-0503/

1
news/5306.bugfix Normal file
View file

@ -0,0 +1 @@
Ignore errors copying socket files for local source installs (in Python 3).

2
news/6705.bugfix Normal file
View file

@ -0,0 +1,2 @@
Fix ``--trusted-host`` processing under HTTPS to trust any port number used
with the host.

1
news/6858.feature Normal file
View file

@ -0,0 +1 @@
Make ``pip show`` warn about packages not found.

1
news/6869.trivial Normal file
View file

@ -0,0 +1 @@
Clarify WheelBuilder.build() a bit

1
news/6883.trivial Normal file
View file

@ -0,0 +1 @@
replace is_vcs_url function by is_vcs Link property

1
news/6885.bugfix Normal file
View file

@ -0,0 +1 @@
Fix 'm' flag erroneously being appended to ABI tag in Python 3.8 on platforms that do not provide SOABI

2
news/6890.bugfix Normal file
View file

@ -0,0 +1,2 @@
Hide security-sensitive strings like passwords in log messages related to
version control system (aka VCS) command invocations.

View file

View file

@ -62,7 +62,7 @@ class SessionCommandMixin(object):
if options.cache_dir else None
),
retries=retries if retries is not None else options.retries,
insecure_hosts=options.trusted_hosts,
trusted_hosts=options.trusted_hosts,
index_urls=self._get_index_urls(options),
)
@ -276,7 +276,6 @@ class RequirementCommand(IndexGroupCommand):
return PackageFinder.create(
search_scope=search_scope,
selection_prefs=selection_prefs,
trusted_hosts=options.trusted_hosts,
session=session,
target_python=target_python,
)

View file

@ -78,7 +78,7 @@ def build_wheels(
# Always build PEP 517 requirements
build_failures = builder.build(
pep517_requirements,
autobuilding=True,
should_unpack=True,
)
if should_build_legacy:
@ -87,7 +87,7 @@ def build_wheels(
# install for those.
builder.build(
legacy_requirements,
autobuilding=True,
should_unpack=True,
)
return build_failures

View file

@ -126,7 +126,6 @@ class ListCommand(IndexGroupCommand):
return PackageFinder.create(
search_scope=search_scope,
selection_prefs=selection_prefs,
trusted_hosts=options.trusted_hosts,
session=session,
)
@ -192,7 +191,7 @@ class ListCommand(IndexGroupCommand):
evaluator = finder.make_candidate_evaluator(
project_name=dist.project_name,
)
best_candidate = evaluator.get_best_candidate(all_candidates)
best_candidate = evaluator.sort_best_candidate(all_candidates)
if best_candidate is None:
continue

View file

@ -60,6 +60,11 @@ def search_packages_info(query):
installed[canonicalize_name(p.project_name)] = p
query_names = [canonicalize_name(name) for name in query]
missing = sorted(
[name for name, pkg in zip(query, query_names) if pkg not in installed]
)
if missing:
logger.warning('Package(s) not found: %s', ', '.join(missing))
for dist in [installed[pkg] for pkg in query_names if pkg in installed]:
package = {

View file

@ -12,7 +12,7 @@ import shutil
import sys
from contextlib import contextmanager
from pip._vendor import requests, urllib3
from pip._vendor import requests, six, urllib3
from pip._vendor.cachecontrol import CacheControlAdapter
from pip._vendor.cachecontrol.caches import FileCache
from pip._vendor.lockfile import LockError
@ -21,6 +21,7 @@ from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth
from pip._vendor.requests.models import CONTENT_CHUNK_SIZE, Response
from pip._vendor.requests.structures import CaseInsensitiveDict
from pip._vendor.requests.utils import get_netrc_auth
from pip._vendor.six import PY2
# NOTE: XMLRPC Client is not annotated in typeshed as on 2017-07-17, which is
# why we ignore the type on this import
from pip._vendor.six.moves import xmlrpc_client # type: ignore
@ -31,9 +32,9 @@ import pip
from pip._internal.exceptions import HashMismatch, InstallationError
from pip._internal.models.index import PyPI
# Import ssl from compat so the initial import occurs in only one place.
from pip._internal.utils.compat import HAS_TLS, ssl
from pip._internal.utils.compat import HAS_TLS, ipaddress, ssl
from pip._internal.utils.encoding import auto_decode
from pip._internal.utils.filesystem import check_path_owner
from pip._internal.utils.filesystem import check_path_owner, copy2_fixed
from pip._internal.utils.glibc import libc_ver
from pip._internal.utils.marker_files import write_delete_marker_file
from pip._internal.utils.misc import (
@ -43,10 +44,14 @@ from pip._internal.utils.misc import (
ask_password,
ask_path_exists,
backup_dir,
build_url_from_netloc,
consume,
display_path,
format_size,
get_installed_version,
hide_url,
netloc_has_port,
path_to_display,
path_to_url,
remove_auth_from_url,
rmtree,
@ -61,20 +66,45 @@ from pip._internal.vcs import vcs
if MYPY_CHECK_RUNNING:
from typing import (
Optional, Tuple, Dict, IO, Text, Union
IO, Callable, Dict, Iterator, List, Optional, Text, Tuple, Union,
)
from optparse import Values
from mypy_extensions import TypedDict
from pip._internal.models.link import Link
from pip._internal.utils.hashes import Hashes
from pip._internal.vcs.versioncontrol import AuthInfo, VersionControl
Credentials = Tuple[str, str, str]
SecureOrigin = Tuple[str, str, Optional[str]]
if PY2:
CopytreeKwargs = TypedDict(
'CopytreeKwargs',
{
'ignore': Callable[[str, List[str]], List[str]],
'symlinks': bool,
},
total=False,
)
else:
CopytreeKwargs = TypedDict(
'CopytreeKwargs',
{
'copy_function': Callable[[str, str], None],
'ignore': Callable[[str, List[str]], List[str]],
'ignore_dangling_symlinks': bool,
'symlinks': bool,
},
total=False,
)
__all__ = ['get_file_content',
'is_url', 'url_to_path', 'path_to_url',
'is_archive_file', 'unpack_vcs_link',
'unpack_file_url', 'is_vcs_url', 'is_file_url',
'unpack_file_url', 'is_file_url',
'unpack_http_url', 'unpack_url',
'parse_content_disposition', 'sanitize_content_filename']
@ -91,6 +121,20 @@ except Exception as exc:
str(exc))
keyring = None
SECURE_ORIGINS = [
# protocol, hostname, port
# Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
("https", "*", "*"),
("*", "localhost", "*"),
("*", "127.0.0.0/8", "*"),
("*", "::1/128", "*"),
("file", "*", None),
# ssh is always secure.
("ssh", "*", "*"),
] # type: List[SecureOrigin]
# These are environment variables present when running under various
# CI systems. For each variable, some CI systems that use the variable
# are indicated. The collection was chosen so that for each of a number
@ -529,13 +573,21 @@ class PipSession(requests.Session):
timeout = None # type: Optional[int]
def __init__(self, *args, **kwargs):
"""
:param trusted_hosts: Domains not to emit warnings for when not using
HTTPS.
"""
retries = kwargs.pop("retries", 0)
cache = kwargs.pop("cache", None)
insecure_hosts = kwargs.pop("insecure_hosts", [])
trusted_hosts = kwargs.pop("trusted_hosts", []) # type: List[str]
index_urls = kwargs.pop("index_urls", None)
super(PipSession, self).__init__(*args, **kwargs)
# Namespace the attribute with "pip_" just in case to prevent
# possible conflicts with the base class.
self.pip_trusted_hosts = [] # type: List[str]
# Attach our User Agent to the request
self.headers["User-Agent"] = user_agent()
@ -601,14 +653,117 @@ class PipSession(requests.Session):
# Enable file:// urls
self.mount("file://", LocalFSAdapter())
# We want to use a non-validating adapter for any requests which are
# deemed insecure.
for host in insecure_hosts:
self.add_insecure_host(host)
for host in trusted_hosts:
self.add_trusted_host(host, suppress_logging=True)
def add_insecure_host(self, host):
# type: (str) -> None
self.mount('https://{}/'.format(host), self._insecure_adapter)
def add_trusted_host(self, host, source=None, suppress_logging=False):
# type: (str, Optional[str], bool) -> None
"""
:param host: It is okay to provide a host that has previously been
added.
:param source: An optional source string, for logging where the host
string came from.
"""
if not suppress_logging:
msg = 'adding trusted host: {!r}'.format(host)
if source is not None:
msg += ' (from {})'.format(source)
logger.info(msg)
if host not in self.pip_trusted_hosts:
self.pip_trusted_hosts.append(host)
self.mount(build_url_from_netloc(host) + '/', self._insecure_adapter)
if not netloc_has_port(host):
# Mount wildcard ports for the same host.
self.mount(
build_url_from_netloc(host) + ':',
self._insecure_adapter
)
def iter_secure_origins(self):
# type: () -> Iterator[SecureOrigin]
for secure_origin in SECURE_ORIGINS:
yield secure_origin
for host in self.pip_trusted_hosts:
yield ('*', host, '*')
def is_secure_origin(self, location):
# type: (Link) -> bool
# Determine if this url used a secure transport mechanism
parsed = urllib_parse.urlparse(str(location))
origin_protocol, origin_host, origin_port = (
parsed.scheme, parsed.hostname, parsed.port,
)
# The protocol to use to see if the protocol matches.
# Don't count the repository type as part of the protocol: in
# cases such as "git+ssh", only use "ssh". (I.e., Only verify against
# the last scheme.)
origin_protocol = origin_protocol.rsplit('+', 1)[-1]
# Determine if our origin is a secure origin by looking through our
# hardcoded list of secure origins, as well as any additional ones
# configured on this PackageFinder instance.
for secure_origin in self.iter_secure_origins():
secure_protocol, secure_host, secure_port = secure_origin
if origin_protocol != secure_protocol and secure_protocol != "*":
continue
try:
# We need to do this decode dance to ensure that we have a
# unicode object, even on Python 2.x.
addr = ipaddress.ip_address(
origin_host
if (
isinstance(origin_host, six.text_type) or
origin_host is None
)
else origin_host.decode("utf8")
)
network = ipaddress.ip_network(
secure_host
if isinstance(secure_host, six.text_type)
# setting secure_host to proper Union[bytes, str]
# creates problems in other places
else secure_host.decode("utf8") # type: ignore
)
except ValueError:
# We don't have both a valid address or a valid network, so
# we'll check this origin against hostnames.
if (origin_host and
origin_host.lower() != secure_host.lower() and
secure_host != "*"):
continue
else:
# We have a valid address and network, so see if the address
# is contained within the network.
if addr not in network:
continue
# Check to see if the port matches.
if (origin_port != secure_port and
secure_port != "*" and
secure_port is not None):
continue
# If we've gotten here, then this origin matches the current
# secure origin and we should return True
return True
# If we've gotten to this point, then the origin isn't secure and we
# will not accept it as a valid location to search. We will however
# log a warning that we are ignoring it.
logger.warning(
"The repository located at %s is not a trusted or secure host and "
"is being ignored. If this repository is available via HTTPS we "
"recommend you use HTTPS instead, otherwise you may silence "
"this warning and allow it anyway with '--trusted-host %s'.",
origin_host,
origin_host,
)
return False
def request(self, method, url, *args, **kwargs):
# Allow setting a default timeout on a session
@ -721,8 +876,10 @@ def is_archive_file(name):
def unpack_vcs_link(link, location):
# type: (Link, str) -> None
vcs_backend = _get_used_vcs_backend(link)
vcs_backend.unpack(location, url=link.url)
assert vcs_backend is not None
vcs_backend.unpack(location, url=hide_url(link.url))
def _get_used_vcs_backend(link):
@ -736,11 +893,6 @@ def _get_used_vcs_backend(link):
return None
def is_vcs_url(link):
# type: (Link) -> bool
return bool(_get_used_vcs_backend(link))
def is_file_url(link):
# type: (Link) -> bool
return link.url.lower().startswith('file:')
@ -936,6 +1088,46 @@ def unpack_http_url(
os.unlink(from_path)
def _copy2_ignoring_special_files(src, dest):
# type: (str, str) -> None
"""Copying special files is not supported, but as a convenience to users
we skip errors copying them. This supports tools that may create e.g.
socket files in the project source directory.
"""
try:
copy2_fixed(src, dest)
except shutil.SpecialFileError as e:
# SpecialFileError may be raised due to either the source or
# destination. If the destination was the cause then we would actually
# care, but since the destination directory is deleted prior to
# copy we ignore all of them assuming it is caused by the source.
logger.warning(
"Ignoring special file error '%s' encountered copying %s to %s.",
str(e),
path_to_display(src),
path_to_display(dest),
)
def _copy_source_tree(source, target):
# type: (str, str) -> None
def ignore(d, names):
# Pulling in those directories can potentially be very slow,
# exclude the following directories if they appear in the top
# level dir (and only it).
# See discussion at https://github.com/pypa/pip/pull/6770
return ['.tox', '.nox'] if d == source else []
kwargs = dict(ignore=ignore, symlinks=True) # type: CopytreeKwargs
if not PY2:
# Python 2 does not support copy_function, so we only ignore
# errors on special file copy in Python 3.
kwargs['copy_function'] = _copy2_ignoring_special_files
shutil.copytree(source, target, **kwargs)
def unpack_file_url(
link, # type: Link
location, # type: str
@ -951,21 +1143,9 @@ def unpack_file_url(
link_path = url_to_path(link.url_without_fragment)
# If it's a url to a local directory
if is_dir_url(link):
def ignore(d, names):
# Pulling in those directories can potentially be very slow,
# exclude the following directories if they appear in the top
# level dir (and only it).
# See discussion at https://github.com/pypa/pip/pull/6770
return ['.tox', '.nox'] if d == link_path else []
if os.path.isdir(location):
rmtree(location)
shutil.copytree(link_path,
location,
symlinks=True,
ignore=ignore)
_copy_source_tree(link_path, location)
if download_dir:
logger.info('Link is a directory, ignoring download_dir')
return
@ -1055,7 +1235,7 @@ def unpack_url(
would ordinarily raise HashUnsupported) are allowed.
"""
# non-editable vcs urls
if is_vcs_url(link):
if link.is_vcs:
unpack_vcs_link(link, location)
# file urls

View file

@ -12,7 +12,7 @@ import mimetypes
import os
import re
from pip._vendor import html5lib, requests, six
from pip._vendor import html5lib, requests
from pip._vendor.distlib.compat import unescape
from pip._vendor.packaging import specifiers
from pip._vendor.packaging.utils import canonicalize_name
@ -33,7 +33,6 @@ from pip._internal.models.format_control import FormatControl
from pip._internal.models.link import Link
from pip._internal.models.selection_prefs import SelectionPreferences
from pip._internal.models.target_python import TargetPython
from pip._internal.utils.compat import ipaddress
from pip._internal.utils.logging import indent_log
from pip._internal.utils.misc import (
ARCHIVE_EXTENSIONS,
@ -47,10 +46,9 @@ from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.wheel import Wheel
if MYPY_CHECK_RUNNING:
from logging import Logger
from typing import (
Any, Callable, FrozenSet, Iterable, Iterator, List, MutableMapping,
Optional, Sequence, Set, Text, Tuple, Union,
Any, Callable, FrozenSet, Iterable, List, MutableMapping, Optional,
Sequence, Set, Text, Tuple, Union,
)
import xml.etree.ElementTree
from pip._vendor.packaging.version import _BaseVersion
@ -66,23 +64,9 @@ if MYPY_CHECK_RUNNING:
Tuple[int, int, int, _BaseVersion, BuildTag, Optional[int]]
)
HTMLElement = xml.etree.ElementTree.Element
SecureOrigin = Tuple[str, str, Optional[str]]
__all__ = ['FormatControl', 'FoundCandidates', 'PackageFinder']
SECURE_ORIGINS = [
# protocol, hostname, port
# Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
("https", "*", "*"),
("*", "localhost", "*"),
("*", "127.0.0.0/8", "*"),
("*", "::1/128", "*"),
("file", "*", None),
# ssh is always secure.
("ssh", "*", "*"),
] # type: List[SecureOrigin]
__all__ = ['FormatControl', 'BestCandidateResult', 'PackageFinder']
logger = logging.getLogger(__name__)
@ -545,6 +529,51 @@ class CandidatePreferences(object):
self.prefer_binary = prefer_binary
class BestCandidateResult(object):
"""A collection of candidates, returned by `PackageFinder.find_best_candidate`.
This class is only intended to be instantiated by CandidateEvaluator's
`compute_best_candidate()` method.
"""
def __init__(
self,
candidates, # type: List[InstallationCandidate]
applicable_candidates, # type: List[InstallationCandidate]
best_candidate, # type: Optional[InstallationCandidate]
):
# type: (...) -> None
"""
:param candidates: A sequence of all available candidates found.
:param applicable_candidates: The applicable candidates.
:param best_candidate: The most preferred candidate found, or None
if no applicable candidates were found.
"""
assert set(applicable_candidates) <= set(candidates)
if best_candidate is None:
assert not applicable_candidates
else:
assert best_candidate in applicable_candidates
self._applicable_candidates = applicable_candidates
self._candidates = candidates
self.best_candidate = best_candidate
def iter_all(self):
# type: () -> Iterable[InstallationCandidate]
"""Iterate through all candidates.
"""
return iter(self._candidates)
def iter_applicable(self):
# type: () -> Iterable[InstallationCandidate]
"""Iterate through the applicable candidates.
"""
return iter(self._applicable_candidates)
class CandidateEvaluator(object):
"""
@ -568,6 +597,9 @@ class CandidateEvaluator(object):
:param target_python: The target Python interpreter to use when
checking compatibility. If None (the default), a TargetPython
object will be constructed from the running Python.
:param specifier: An optional object implementing `filter`
(e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
versions.
:param hashes: An optional collection of allowed hashes.
"""
if target_python is None:
@ -643,26 +675,6 @@ class CandidateEvaluator(object):
project_name=self._project_name,
)
def make_found_candidates(
self,
candidates, # type: List[InstallationCandidate]
):
# type: (...) -> FoundCandidates
"""
Create and return a `FoundCandidates` instance.
:param specifier: An optional object implementing `filter`
(e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
versions.
"""
applicable_candidates = self.get_applicable_candidates(candidates)
return FoundCandidates(
candidates,
applicable_candidates=applicable_candidates,
evaluator=self,
)
def _sort_key(self, candidate):
# type: (InstallationCandidate) -> CandidateSortingKey
"""
@ -723,7 +735,7 @@ class CandidateEvaluator(object):
build_tag, pri,
)
def get_best_candidate(
def sort_best_candidate(
self,
candidates, # type: List[InstallationCandidate]
):
@ -753,50 +765,23 @@ class CandidateEvaluator(object):
return best_candidate
class FoundCandidates(object):
"""A collection of candidates, returned by `PackageFinder.find_candidates`.
This class is only intended to be instantiated by CandidateEvaluator's
`make_found_candidates()` method.
"""
def __init__(
def compute_best_candidate(
self,
candidates, # type: List[InstallationCandidate]
applicable_candidates, # type: List[InstallationCandidate]
evaluator, # type: CandidateEvaluator
candidates, # type: List[InstallationCandidate]
):
# type: (...) -> None
# type: (...) -> BestCandidateResult
"""
:param candidates: A sequence of all available candidates found.
:param applicable_candidates: The applicable candidates.
:param evaluator: A CandidateEvaluator object to sort applicable
candidates by order of preference.
Compute and return a `BestCandidateResult` instance.
"""
self._applicable_candidates = applicable_candidates
self._candidates = candidates
self._evaluator = evaluator
applicable_candidates = self.get_applicable_candidates(candidates)
def iter_all(self):
# type: () -> Iterable[InstallationCandidate]
"""Iterate through all candidates.
"""
return iter(self._candidates)
best_candidate = self.sort_best_candidate(applicable_candidates)
def iter_applicable(self):
# type: () -> Iterable[InstallationCandidate]
"""Iterate through the applicable candidates.
"""
return iter(self._applicable_candidates)
def get_best(self):
# type: () -> Optional[InstallationCandidate]
"""Return the best candidate available, or None if no applicable
candidates are found.
"""
candidates = list(self.iter_applicable())
return self._evaluator.get_best_candidate(candidates)
return BestCandidateResult(
candidates,
applicable_candidates=applicable_candidates,
best_candidate=best_candidate,
)
class PackageFinder(object):
@ -813,7 +798,6 @@ class PackageFinder(object):
target_python, # type: TargetPython
allow_yanked, # type: bool
format_control=None, # type: Optional[FormatControl]
trusted_hosts=None, # type: Optional[List[str]]
candidate_prefs=None, # type: CandidatePreferences
ignore_requires_python=None, # type: Optional[bool]
):
@ -829,8 +813,6 @@ class PackageFinder(object):
:param candidate_prefs: Options to use when creating a
CandidateEvaluator object.
"""
if trusted_hosts is None:
trusted_hosts = []
if candidate_prefs is None:
candidate_prefs = CandidatePreferences()
@ -844,7 +826,6 @@ class PackageFinder(object):
self.search_scope = search_scope
self.session = session
self.format_control = format_control
self.trusted_hosts = trusted_hosts
# These are boring links that have already been logged somehow.
self._logged_links = set() # type: Set[Link]
@ -858,7 +839,6 @@ class PackageFinder(object):
cls,
search_scope, # type: SearchScope
selection_prefs, # type: SelectionPreferences
trusted_hosts=None, # type: Optional[List[str]]
session=None, # type: Optional[PipSession]
target_python=None, # type: Optional[TargetPython]
):
@ -867,8 +847,6 @@ class PackageFinder(object):
:param selection_prefs: The candidate selection preferences, as a
SelectionPreferences object.
:param trusted_hosts: Domains not to emit warnings for when not using
HTTPS.
:param session: The Session to use to make requests.
:param target_python: The target Python interpreter to use when
checking compatibility. If None (the default), a TargetPython
@ -894,7 +872,6 @@ class PackageFinder(object):
target_python=target_python,
allow_yanked=selection_prefs.allow_yanked,
format_control=selection_prefs.format_control,
trusted_hosts=trusted_hosts,
ignore_requires_python=selection_prefs.ignore_requires_python,
)
@ -908,6 +885,11 @@ class PackageFinder(object):
# type: () -> List[str]
return self.search_scope.index_urls
@property
def trusted_hosts(self):
# type: () -> Iterable[str]
return iter(self.session.pip_trusted_hosts)
@property
def allow_all_prereleases(self):
# type: () -> bool
@ -917,31 +899,6 @@ class PackageFinder(object):
# type: () -> None
self._candidate_prefs.allow_all_prereleases = True
def add_trusted_host(self, host, source=None):
# type: (str, Optional[str]) -> None
"""
:param source: An optional source string, for logging where the host
string came from.
"""
# It is okay to add a previously added host because PipSession stores
# the resulting prefixes in a dict.
msg = 'adding trusted host: {!r}'.format(host)
if source is not None:
msg += ' (from {})'.format(source)
logger.info(msg)
self.session.add_insecure_host(host)
if host in self.trusted_hosts:
return
self.trusted_hosts.append(host)
def iter_secure_origins(self):
# type: () -> Iterator[SecureOrigin]
for secure_origin in SECURE_ORIGINS:
yield secure_origin
for host in self.trusted_hosts:
yield ('*', host, '*')
@staticmethod
def _sort_locations(locations, expand_dir=False):
# type: (Sequence[str], bool) -> Tuple[List[str], List[str]]
@ -1000,80 +957,6 @@ class PackageFinder(object):
return files, urls
def _validate_secure_origin(self, logger, location):
# type: (Logger, Link) -> bool
# Determine if this url used a secure transport mechanism
parsed = urllib_parse.urlparse(str(location))
origin = (parsed.scheme, parsed.hostname, parsed.port)
# The protocol to use to see if the protocol matches.
# Don't count the repository type as part of the protocol: in
# cases such as "git+ssh", only use "ssh". (I.e., Only verify against
# the last scheme.)
protocol = origin[0].rsplit('+', 1)[-1]
# Determine if our origin is a secure origin by looking through our
# hardcoded list of secure origins, as well as any additional ones
# configured on this PackageFinder instance.
for secure_origin in self.iter_secure_origins():
if protocol != secure_origin[0] and secure_origin[0] != "*":
continue
try:
# We need to do this decode dance to ensure that we have a
# unicode object, even on Python 2.x.
addr = ipaddress.ip_address(
origin[1]
if (
isinstance(origin[1], six.text_type) or
origin[1] is None
)
else origin[1].decode("utf8")
)
network = ipaddress.ip_network(
secure_origin[1]
if isinstance(secure_origin[1], six.text_type)
# setting secure_origin[1] to proper Union[bytes, str]
# creates problems in other places
else secure_origin[1].decode("utf8") # type: ignore
)
except ValueError:
# We don't have both a valid address or a valid network, so
# we'll check this origin against hostnames.
if (origin[1] and
origin[1].lower() != secure_origin[1].lower() and
secure_origin[1] != "*"):
continue
else:
# We have a valid address and network, so see if the address
# is contained within the network.
if addr not in network:
continue
# Check to see if the port patches
if (origin[2] != secure_origin[2] and
secure_origin[2] != "*" and
secure_origin[2] is not None):
continue
# If we've gotten here, then this origin matches the current
# secure origin and we should return True
return True
# If we've gotten to this point, then the origin isn't secure and we
# will not accept it as a valid location to search. We will however
# log a warning that we are ignoring it.
logger.warning(
"The repository located at %s is not a trusted or secure host and "
"is being ignored. If this repository is available via HTTPS we "
"recommend you use HTTPS instead, otherwise you may silence "
"this warning and allow it anyway with '--trusted-host %s'.",
parsed.hostname,
parsed.hostname,
)
return False
def make_link_evaluator(self, project_name):
# type: (str) -> LinkEvaluator
canonical_name = canonicalize_name(project_name)
@ -1117,7 +1000,7 @@ class PackageFinder(object):
(Link(url) for url in index_url_loc),
(Link(url) for url in fl_url_loc),
)
if self._validate_secure_origin(logger, link)
if self.session.is_secure_origin(link)
]
logger.debug('%d location(s) to search for versions of %s:',
@ -1174,20 +1057,20 @@ class PackageFinder(object):
hashes=hashes,
)
def find_candidates(
def find_best_candidate(
self,
project_name, # type: str
specifier=None, # type: Optional[specifiers.BaseSpecifier]
hashes=None, # type: Optional[Hashes]
):
# type: (...) -> FoundCandidates
# type: (...) -> BestCandidateResult
"""Find matches for the given project and specifier.
:param specifier: An optional object implementing `filter`
(e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
versions.
:return: A `FoundCandidates` instance.
:return: A `BestCandidateResult` instance.
"""
candidates = self.find_all_candidates(project_name)
candidate_evaluator = self.make_candidate_evaluator(
@ -1195,7 +1078,7 @@ class PackageFinder(object):
specifier=specifier,
hashes=hashes,
)
return candidate_evaluator.make_found_candidates(candidates)
return candidate_evaluator.compute_best_candidate(candidates)
def find_requirement(self, req, upgrade):
# type: (InstallRequirement, bool) -> Optional[Link]
@ -1206,10 +1089,10 @@ class PackageFinder(object):
Raises DistributionNotFound or BestVersionAlreadyInstalled otherwise
"""
hashes = req.hashes(trust_internet=False)
candidates = self.find_candidates(
best_candidate_result = self.find_best_candidate(
req.name, specifier=req.specifier, hashes=hashes,
)
best_candidate = candidates.get_best()
best_candidate = best_candidate_result.best_candidate
installed_version = None # type: Optional[_BaseVersion]
if req.satisfied_by is not None:
@ -1230,7 +1113,7 @@ class PackageFinder(object):
'Could not find a version that satisfies the requirement %s '
'(from versions: %s)',
req,
_format_versions(candidates.iter_all()),
_format_versions(best_candidate_result.iter_all()),
)
raise DistributionNotFound(
@ -1265,14 +1148,14 @@ class PackageFinder(object):
'Installed version (%s) is most up-to-date (past versions: '
'%s)',
installed_version,
_format_versions(candidates.iter_applicable()),
_format_versions(best_candidate_result.iter_applicable()),
)
raise BestVersionAlreadyInstalled
logger.debug(
'Using version %s (newest of versions: %s)',
best_candidate.version,
_format_versions(candidates.iter_applicable()),
_format_versions(best_candidate_result.iter_applicable()),
)
return best_candidate.link

View file

@ -27,6 +27,15 @@ if MYPY_CHECK_RUNNING:
USER_CACHE_DIR = appdirs.user_cache_dir("pip")
def get_major_minor_version():
# type: () -> str
"""
Return the major-minor version of the current Python as a string, e.g.
"3.7" or "3.10".
"""
return '{}.{}'.format(*sys.version_info)
def get_src_prefix():
if running_under_virtualenv():
src_prefix = os.path.join(sys.prefix, 'src')
@ -131,7 +140,7 @@ def distutils_scheme(dist_name, user=False, home=None, root=None,
sys.prefix,
'include',
'site',
'python' + sys.version[:3],
'python{}'.format(get_major_minor_version()),
dist_name,
)

View file

@ -179,6 +179,13 @@ class Link(KeyBasedCompareMixin):
# type: () -> bool
return self.ext == WHEEL_EXTENSION
@property
def is_vcs(self):
# type: () -> bool
from pip._internal.vcs import vcs
return self.scheme in vcs.all_schemes
@property
def is_artifact(self):
# type: () -> bool
@ -186,12 +193,7 @@ class Link(KeyBasedCompareMixin):
Determines if this points to an actual artifact (e.g. a tarball) or if
it points to an "abstract" thing like a path or a VCS location.
"""
from pip._internal.vcs import vcs
if self.scheme in vcs.all_schemes:
return False
return True
return not self.is_vcs
@property
def is_yanked(self):

View file

@ -16,7 +16,6 @@ from pip._internal.distributions.installed import InstalledDistribution
from pip._internal.download import (
is_dir_url,
is_file_url,
is_vcs_url,
unpack_url,
url_to_path,
)
@ -163,7 +162,7 @@ class RequirementPreparer(object):
# we would report less-useful error messages for
# unhashable requirements, complaining that there's no
# hash provided.
if is_vcs_url(link):
if link.is_vcs:
raise VcsHashUnsupported()
elif is_file_url(link) and is_dir_url(link):
raise DirectoryUrlHashUnsupported()

View file

@ -111,20 +111,17 @@ def get_abi_tag():
d = ''
m = ''
u = ''
if get_flag('Py_DEBUG',
lambda: hasattr(sys, 'gettotalrefcount'),
warn=(impl == 'cp')):
is_cpython = (impl == 'cp')
if get_flag(
'Py_DEBUG', lambda: hasattr(sys, 'gettotalrefcount'),
warn=is_cpython):
d = 'd'
if get_flag('WITH_PYMALLOC',
lambda: impl == 'cp',
warn=(impl == 'cp')):
if sys.version_info < (3, 8) and get_flag(
'WITH_PYMALLOC', lambda: is_cpython, warn=is_cpython):
m = 'm'
if get_flag('Py_UNICODE_SIZE',
lambda: sys.maxunicode == 0x10ffff,
expected=4,
warn=(impl == 'cp' and
sys.version_info < (3, 3))) \
and sys.version_info < (3, 3):
if sys.version_info < (3, 3) and get_flag(
'Py_UNICODE_SIZE', lambda: sys.maxunicode == 0x10ffff,
expected=4, warn=is_cpython):
u = 'u'
abi = '%s%s%s%s%s' % (impl, get_impl_ver(), d, m, u)
elif soabi and soabi.startswith('cpython-'):

View file

@ -272,7 +272,7 @@ def process_line(
finder.set_allow_all_prereleases()
for host in opts.trusted_hosts or []:
source = 'line {} of {}'.format(line_number, filename)
finder.add_trusted_host(host, source=source)
session.add_trusted_host(host, source=source)
def break_args_options(line):

View file

@ -38,6 +38,7 @@ from pip._internal.utils.misc import (
dist_in_usersite,
ensure_dir,
get_installed_version,
hide_url,
redact_password_from_url,
rmtree,
)
@ -813,11 +814,11 @@ class InstallRequirement(object):
vc_type, url = self.link.url.split('+', 1)
vcs_backend = vcs.get_backend(vc_type)
if vcs_backend:
url = self.link.url
hidden_url = hide_url(self.link.url)
if obtain:
vcs_backend.obtain(self.source_dir, url=url)
vcs_backend.obtain(self.source_dir, url=hidden_url)
else:
vcs_backend.export(self.source_dir, url=url)
vcs_backend.export(self.source_dir, url=hidden_url)
else:
assert 0, (
'Unexpected version control type (in %s): %s'

View file

@ -52,6 +52,21 @@ class RequirementSet(object):
return ('<%s object; %d requirement(s): %s>'
% (self.__class__.__name__, len(reqs), reqs_str))
def add_unnamed_requirement(self, install_req):
# type: (InstallRequirement) -> None
assert not install_req.name
self.unnamed_requirements.append(install_req)
def add_named_requirement(self, install_req):
# type: (InstallRequirement) -> None
assert install_req.name
name = install_req.name
self.requirements[name] = install_req
# FIXME: what about other normalizations? E.g., _ vs. -?
if name.lower() != name:
self.requirement_aliases[name.lower()] = name
def add_requirement(
self,
install_req, # type: InstallRequirement
@ -105,8 +120,7 @@ class RequirementSet(object):
# Unnamed requirements are scanned again and the requirement won't be
# added as a dependency until after scanning.
if not name:
# url or path requirement w/o an egg fragment
self.unnamed_requirements.append(install_req)
self.add_unnamed_requirement(install_req)
return [install_req], None
try:
@ -130,11 +144,8 @@ class RequirementSet(object):
# When no existing requirement exists, add the requirement as a
# dependency and it will be scanned again after.
if not existing_req:
self.requirements[name] = install_req
# FIXME: what about other normalizations? E.g., _ vs. -?
if name.lower() != name:
self.requirement_aliases[name.lower()] = name
# We'd want to rescan this requirements later
self.add_named_requirement(install_req)
# We'd want to rescan this requirement later
return [install_req], install_req
# Assume there's no need to scan, and that we've already

View file

@ -1,5 +1,7 @@
import os
import os.path
import shutil
import stat
from pip._internal.utils.compat import get_path_uid
@ -28,3 +30,32 @@ def check_path_owner(path):
else:
previous, path = path, os.path.dirname(path)
return False # assume we don't own the path
def copy2_fixed(src, dest):
# type: (str, str) -> None
"""Wrap shutil.copy2() but map errors copying socket files to
SpecialFileError as expected.
See also https://bugs.python.org/issue37700.
"""
try:
shutil.copy2(src, dest)
except (OSError, IOError):
for f in [src, dest]:
try:
is_socket_file = is_socket(f)
except OSError:
# An error has already occurred. Another error here is not
# a problem and we can ignore it.
pass
else:
if is_socket_file:
raise shutil.SpecialFileError("`%s` is a socket" % f)
raise
def is_socket(path):
# type: (str) -> bool
return stat.S_ISSOCK(os.lstat(path).st_mode)

View file

@ -31,7 +31,11 @@ from pip._vendor.six.moves.urllib.parse import unquote as urllib_unquote
from pip import __version__
from pip._internal.exceptions import CommandError, InstallationError
from pip._internal.locations import site_packages, user_site
from pip._internal.locations import (
get_major_minor_version,
site_packages,
user_site,
)
from pip._internal.utils.compat import (
WINDOWS,
console_to_str,
@ -61,6 +65,7 @@ if MYPY_CHECK_RUNNING:
from pip._internal.utils.ui import SpinnerInterface
VersionInfo = Tuple[int, int, int]
CommandArgs = List[Union[str, 'HiddenText']]
else:
# typing's cast() is needed at runtime, but we don't want to import typing.
# Thus, we use a dummy no-op version, which we tell mypy to ignore.
@ -116,7 +121,7 @@ def get_pip_version():
return (
'pip {} from {} (python {})'.format(
__version__, pip_pkg_dir, sys.version[:3],
__version__, pip_pkg_dir, get_major_minor_version(),
)
)
@ -749,8 +754,8 @@ def unpack_file(
is_svn_page(file_contents(filename))):
# We don't really care about this
from pip._internal.vcs.subversion import Subversion
url = 'svn+' + link.url
Subversion().unpack(location, url=url)
hidden_url = hide_url('svn+' + link.url)
Subversion().unpack(location, url=hidden_url)
else:
# FIXME: handle?
# FIXME: magic signatures?
@ -764,16 +769,52 @@ def unpack_file(
)
def make_command(*args):
# type: (Union[str, HiddenText, CommandArgs]) -> CommandArgs
"""
Create a CommandArgs object.
"""
command_args = [] # type: CommandArgs
for arg in args:
# Check for list instead of CommandArgs since CommandArgs is
# only known during type-checking.
if isinstance(arg, list):
command_args.extend(arg)
else:
# Otherwise, arg is str or HiddenText.
command_args.append(arg)
return command_args
def format_command_args(args):
# type: (List[str]) -> str
# type: (Union[List[str], CommandArgs]) -> str
"""
Format command arguments for display.
"""
return ' '.join(shlex_quote(arg) for arg in args)
# For HiddenText arguments, display the redacted form by calling str().
# Also, we don't apply str() to arguments that aren't HiddenText since
# this can trigger a UnicodeDecodeError in Python 2 if the argument
# has type unicode and includes a non-ascii character. (The type
# checker doesn't ensure the annotations are correct in all cases.)
return ' '.join(
shlex_quote(str(arg)) if isinstance(arg, HiddenText)
else shlex_quote(arg) for arg in args
)
def reveal_command_args(args):
# type: (Union[List[str], CommandArgs]) -> List[str]
"""
Return the arguments in their raw, unredacted form.
"""
return [
arg.secret if isinstance(arg, HiddenText) else arg for arg in args
]
def make_subprocess_output_error(
cmd_args, # type: List[str]
cmd_args, # type: Union[List[str], CommandArgs]
cwd, # type: Optional[str]
lines, # type: List[Text]
exit_status, # type: int
@ -815,7 +856,7 @@ def make_subprocess_output_error(
def call_subprocess(
cmd, # type: List[str]
cmd, # type: Union[List[str], CommandArgs]
show_stdout=False, # type: bool
cwd=None, # type: Optional[str]
on_returncode='raise', # type: str
@ -882,7 +923,9 @@ def call_subprocess(
env.pop(name, None)
try:
proc = subprocess.Popen(
cmd, stderr=subprocess.STDOUT, stdin=subprocess.PIPE,
# Convert HiddenText objects to the underlying str.
reveal_command_args(cmd),
stderr=subprocess.STDOUT, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, cwd=cwd, env=env,
)
proc.stdin.close()
@ -1081,6 +1124,27 @@ def path_to_url(path):
return url
def build_url_from_netloc(netloc, scheme='https'):
# type: (str, str) -> str
"""
Build a full URL from a netloc.
"""
if netloc.count(':') >= 2 and '@' not in netloc and '[' not in netloc:
# It must be a bare IPv6 address, so wrap it with brackets.
netloc = '[{}]'.format(netloc)
return '{}://{}'.format(scheme, netloc)
def netloc_has_port(netloc):
# type: (str) -> bool
"""
Return whether the netloc has a port part.
"""
url = build_url_from_netloc(netloc)
parsed = urllib_parse.urlparse(url)
return bool(parsed.port)
def split_auth_from_netloc(netloc):
"""
Parse out and remove the auth information from a netloc.
@ -1178,6 +1242,52 @@ def redact_password_from_url(url):
return _transform_url(url, _redact_netloc)[0]
class HiddenText(object):
def __init__(
self,
secret, # type: str
redacted, # type: str
):
# type: (...) -> None
self.secret = secret
self.redacted = redacted
def __repr__(self):
# type: (...) -> str
return '<HiddenText {!r}>'.format(str(self))
def __str__(self):
# type: (...) -> str
return self.redacted
# This is useful for testing.
def __eq__(self, other):
# type: (Any) -> bool
if type(self) != type(other):
return False
# The string being used for redaction doesn't also have to match,
# just the raw, original string.
return (self.secret == other.secret)
# We need to provide an explicit __ne__ implementation for Python 2.
# TODO: remove this when we drop PY2 support.
def __ne__(self, other):
# type: (Any) -> bool
return not self == other
def hide_value(value):
# type: (str) -> HiddenText
return HiddenText(value, redacted='****')
def hide_url(url):
# type: (str) -> HiddenText
redacted = redact_password_from_url(url)
return HiddenText(url, redacted=redacted)
def protect_pip_from_modification_on_windows(modifying_pip):
"""Protection of pip.exe from modification on Windows

View file

@ -136,13 +136,12 @@ def pip_version_check(session, options):
finder = PackageFinder.create(
search_scope=search_scope,
selection_prefs=selection_prefs,
trusted_hosts=options.trusted_hosts,
session=session,
)
candidate = finder.find_candidates("pip").get_best()
if candidate is None:
best_candidate = finder.find_best_candidate("pip").best_candidate
if best_candidate is None:
return
pypi_version = str(candidate.version)
pypi_version = str(best_candidate.version)
# save that we've performed a check
state.save(pypi_version, current_time)

View file

@ -5,9 +5,21 @@ import os
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._internal.utils.misc import display_path, path_to_url, rmtree
from pip._internal.utils.misc import (
display_path,
make_command,
path_to_url,
rmtree,
)
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.vcs.versioncontrol import VersionControl, vcs
if MYPY_CHECK_RUNNING:
from typing import Optional, Tuple
from pip._internal.utils.misc import HiddenText
from pip._internal.vcs.versioncontrol import AuthInfo, RevOptions
logger = logging.getLogger(__name__)
@ -32,6 +44,7 @@ class Bazaar(VersionControl):
return ['-r', rev]
def export(self, location, url):
# type: (str, HiddenText) -> None
"""
Export the Bazaar repository at the url to the destination location
"""
@ -41,11 +54,12 @@ class Bazaar(VersionControl):
url, rev_options = self.get_url_rev_options(url)
self.run_command(
['export', location, url] + rev_options.to_args(),
make_command('export', location, url, rev_options.to_args()),
show_stdout=False,
)
def fetch_new(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
rev_display = rev_options.to_display()
logger.info(
'Checking out %s%s to %s',
@ -53,18 +67,23 @@ class Bazaar(VersionControl):
rev_display,
display_path(dest),
)
cmd_args = ['branch', '-q'] + rev_options.to_args() + [url, dest]
cmd_args = (
make_command('branch', '-q', rev_options.to_args(), url, dest)
)
self.run_command(cmd_args)
def switch(self, dest, url, rev_options):
self.run_command(['switch', url], cwd=dest)
# type: (str, HiddenText, RevOptions) -> None
self.run_command(make_command('switch', url), cwd=dest)
def update(self, dest, url, rev_options):
cmd_args = ['pull', '-q'] + rev_options.to_args()
# type: (str, HiddenText, RevOptions) -> None
cmd_args = make_command('pull', '-q', rev_options.to_args())
self.run_command(cmd_args, cwd=dest)
@classmethod
def get_url_rev_and_auth(cls, url):
# type: (str) -> Tuple[str, Optional[str], AuthInfo]
# hotfix the URL scheme after removing bzr+ from bzr+ssh:// readd it
url, rev, user_pass = super(Bazaar, cls).get_url_rev_and_auth(url)
if url.startswith('ssh://'):

View file

@ -10,14 +10,21 @@ from pip._vendor.six.moves.urllib import request as urllib_request
from pip._internal.exceptions import BadCommand
from pip._internal.utils.compat import samefile
from pip._internal.utils.misc import display_path, redact_password_from_url
from pip._internal.utils.misc import display_path, make_command
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.vcs.versioncontrol import (
RemoteNotFoundError,
VersionControl,
vcs,
)
if MYPY_CHECK_RUNNING:
from typing import Optional, Tuple
from pip._internal.utils.misc import HiddenText
from pip._internal.vcs.versioncontrol import AuthInfo, RevOptions
urlsplit = urllib_parse.urlsplit
urlunsplit = urllib_parse.urlunsplit
@ -83,6 +90,7 @@ class Git(VersionControl):
return None
def export(self, location, url):
# type: (str, HiddenText) -> None
"""Export the Git repository at the url to the destination location"""
if not location.endswith('/'):
location = location + '/'
@ -131,6 +139,7 @@ class Git(VersionControl):
@classmethod
def resolve_revision(cls, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> RevOptions
"""
Resolve a revision to a new RevOptions object with the SHA1 of the
branch, tag, or ref if found.
@ -139,6 +148,10 @@ class Git(VersionControl):
rev_options: a RevOptions object.
"""
rev = rev_options.arg_rev
# The arg_rev property's implementation for Git ensures that the
# rev return value is always non-None.
assert rev is not None
sha, is_branch = cls.get_revision_sha(dest, rev)
if sha is not None:
@ -160,7 +173,7 @@ class Git(VersionControl):
# If it looks like a ref, we have to fetch it explicitly.
cls.run_command(
['fetch', '-q', url] + rev_options.to_args(),
make_command('fetch', '-q', url, rev_options.to_args()),
cwd=dest,
)
# Change the revision to the SHA of the ref we fetched
@ -185,12 +198,10 @@ class Git(VersionControl):
return cls.get_revision(dest) == name
def fetch_new(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
rev_display = rev_options.to_display()
logger.info(
'Cloning %s%s to %s', redact_password_from_url(url),
rev_display, display_path(dest),
)
self.run_command(['clone', '-q', url, dest])
logger.info('Cloning %s%s to %s', url, rev_display, display_path(dest))
self.run_command(make_command('clone', '-q', url, dest))
if rev_options.rev:
# Then a specific revision was requested.
@ -200,7 +211,9 @@ class Git(VersionControl):
# Only do a checkout if the current commit id doesn't match
# the requested revision.
if not self.is_commit_id_equal(dest, rev_options.rev):
cmd_args = ['checkout', '-q'] + rev_options.to_args()
cmd_args = make_command(
'checkout', '-q', rev_options.to_args(),
)
self.run_command(cmd_args, cwd=dest)
elif self.get_current_branch(dest) != branch_name:
# Then a specific branch was requested, and that branch
@ -215,13 +228,18 @@ class Git(VersionControl):
self.update_submodules(dest)
def switch(self, dest, url, rev_options):
self.run_command(['config', 'remote.origin.url', url], cwd=dest)
cmd_args = ['checkout', '-q'] + rev_options.to_args()
# type: (str, HiddenText, RevOptions) -> None
self.run_command(
make_command('config', 'remote.origin.url', url),
cwd=dest,
)
cmd_args = make_command('checkout', '-q', rev_options.to_args())
self.run_command(cmd_args, cwd=dest)
self.update_submodules(dest)
def update(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
# First fetch changes from the default remote
if self.get_git_version() >= parse_version('1.9.0'):
# fetch tags in addition to everything else
@ -230,7 +248,7 @@ class Git(VersionControl):
self.run_command(['fetch', '-q'], cwd=dest)
# Then reset to wanted revision (maybe even origin/master)
rev_options = self.resolve_revision(dest, url, rev_options)
cmd_args = ['reset', '--hard', '-q'] + rev_options.to_args()
cmd_args = make_command('reset', '--hard', '-q', rev_options.to_args())
self.run_command(cmd_args, cwd=dest)
#: update submodules
self.update_submodules(dest)
@ -300,6 +318,7 @@ class Git(VersionControl):
@classmethod
def get_url_rev_and_auth(cls, url):
# type: (str) -> Tuple[str, Optional[str], AuthInfo]
"""
Prefixes stub URLs like 'user@hostname:user/repo.git' with 'ssh://'.
That's required because although they use SSH they sometimes don't

View file

@ -5,10 +5,16 @@ import os
from pip._vendor.six.moves import configparser
from pip._internal.utils.misc import display_path, path_to_url
from pip._internal.utils.misc import display_path, make_command, path_to_url
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.vcs.versioncontrol import VersionControl, vcs
if MYPY_CHECK_RUNNING:
from pip._internal.utils.misc import HiddenText
from pip._internal.vcs.versioncontrol import RevOptions
logger = logging.getLogger(__name__)
@ -23,6 +29,7 @@ class Mercurial(VersionControl):
return [rev]
def export(self, location, url):
# type: (str, HiddenText) -> None
"""Export the Hg repository at the url to the destination location"""
with TempDirectory(kind="export") as temp_dir:
self.unpack(temp_dir.path, url=url)
@ -32,6 +39,7 @@ class Mercurial(VersionControl):
)
def fetch_new(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
rev_display = rev_options.to_display()
logger.info(
'Cloning hg %s%s to %s',
@ -39,16 +47,19 @@ class Mercurial(VersionControl):
rev_display,
display_path(dest),
)
self.run_command(['clone', '--noupdate', '-q', url, dest])
cmd_args = ['update', '-q'] + rev_options.to_args()
self.run_command(cmd_args, cwd=dest)
self.run_command(make_command('clone', '--noupdate', '-q', url, dest))
self.run_command(
make_command('update', '-q', rev_options.to_args()),
cwd=dest,
)
def switch(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
repo_config = os.path.join(dest, self.dirname, 'hgrc')
config = configparser.RawConfigParser()
try:
config.read(repo_config)
config.set('paths', 'default', url)
config.set('paths', 'default', url.secret)
with open(repo_config, 'w') as config_file:
config.write(config_file)
except (OSError, configparser.NoSectionError) as exc:
@ -56,12 +67,13 @@ class Mercurial(VersionControl):
'Could not switch Mercurial repository to %s: %s', url, exc,
)
else:
cmd_args = ['update', '-q'] + rev_options.to_args()
cmd_args = make_command('update', '-q', rev_options.to_args())
self.run_command(cmd_args, cwd=dest)
def update(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
self.run_command(['pull', '-q'], cwd=dest)
cmd_args = ['update', '-q'] + rev_options.to_args()
cmd_args = make_command('update', '-q', rev_options.to_args())
self.run_command(cmd_args, cwd=dest)
@classmethod

View file

@ -8,6 +8,7 @@ import sys
from pip._internal.utils.logging import indent_log
from pip._internal.utils.misc import (
display_path,
make_command,
rmtree,
split_auth_from_netloc,
)
@ -21,8 +22,10 @@ _svn_info_xml_url_re = re.compile(r'<url>(.*)</url>')
if MYPY_CHECK_RUNNING:
from typing import List, Optional, Tuple
from pip._internal.vcs.versioncontrol import RevOptions
from typing import Optional, Tuple
from pip._internal.utils.misc import CommandArgs, HiddenText
from pip._internal.vcs.versioncontrol import AuthInfo, RevOptions
logger = logging.getLogger(__name__)
@ -84,6 +87,7 @@ class Subversion(VersionControl):
@classmethod
def get_url_rev_and_auth(cls, url):
# type: (str) -> Tuple[str, Optional[str], AuthInfo]
# hotfix the URL scheme after removing svn+ from svn+ssh:// readd it
url, rev, user_pass = super(Subversion, cls).get_url_rev_and_auth(url)
if url.startswith('ssh://'):
@ -92,7 +96,8 @@ class Subversion(VersionControl):
@staticmethod
def make_rev_args(username, password):
extra_args = []
# type: (Optional[str], Optional[HiddenText]) -> CommandArgs
extra_args = [] # type: CommandArgs
if username:
extra_args += ['--username', username]
if password:
@ -240,7 +245,7 @@ class Subversion(VersionControl):
return vcs_version
def get_remote_call_options(self):
# type: () -> List[str]
# type: () -> CommandArgs
"""Return options to be used on calls to Subversion that contact the server.
These options are applicable for the following ``svn`` subcommands used
@ -273,6 +278,7 @@ class Subversion(VersionControl):
return []
def export(self, location, url):
# type: (str, HiddenText) -> None
"""Export the svn repository at the url to the destination location"""
url, rev_options = self.get_url_rev_options(url)
@ -282,12 +288,14 @@ class Subversion(VersionControl):
# Subversion doesn't like to check out over an existing
# directory --force fixes this, but was only added in svn 1.5
rmtree(location)
cmd_args = (['export'] + self.get_remote_call_options() +
rev_options.to_args() + [url, location])
cmd_args = make_command(
'export', self.get_remote_call_options(),
rev_options.to_args(), url, location,
)
self.run_command(cmd_args, show_stdout=False)
def fetch_new(self, dest, url, rev_options):
# type: (str, str, RevOptions) -> None
# type: (str, HiddenText, RevOptions) -> None
rev_display = rev_options.to_display()
logger.info(
'Checking out %s%s to %s',
@ -295,21 +303,26 @@ class Subversion(VersionControl):
rev_display,
display_path(dest),
)
cmd_args = (['checkout', '-q'] +
self.get_remote_call_options() +
rev_options.to_args() + [url, dest])
cmd_args = make_command(
'checkout', '-q', self.get_remote_call_options(),
rev_options.to_args(), url, dest,
)
self.run_command(cmd_args)
def switch(self, dest, url, rev_options):
# type: (str, str, RevOptions) -> None
cmd_args = (['switch'] + self.get_remote_call_options() +
rev_options.to_args() + [url, dest])
# type: (str, HiddenText, RevOptions) -> None
cmd_args = make_command(
'switch', self.get_remote_call_options(), rev_options.to_args(),
url, dest,
)
self.run_command(cmd_args)
def update(self, dest, url, rev_options):
# type: (str, str, RevOptions) -> None
cmd_args = (['update'] + self.get_remote_call_options() +
rev_options.to_args() + [dest])
# type: (str, HiddenText, RevOptions) -> None
cmd_args = make_command(
'update', self.get_remote_call_options(), rev_options.to_args(),
dest,
)
self.run_command(cmd_args)

View file

@ -16,18 +16,23 @@ from pip._internal.utils.misc import (
backup_dir,
call_subprocess,
display_path,
hide_url,
hide_value,
make_command,
rmtree,
)
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import (
Any, Dict, Iterable, List, Mapping, Optional, Text, Tuple, Type
Any, Dict, Iterable, List, Mapping, Optional, Text, Tuple, Type, Union
)
from pip._internal.utils.ui import SpinnerInterface
from pip._internal.utils.misc import CommandArgs, HiddenText
AuthInfo = Tuple[Optional[str], Optional[str]]
__all__ = ['vcs']
@ -67,7 +72,7 @@ class RevOptions(object):
self,
vc_class, # type: Type[VersionControl]
rev=None, # type: Optional[str]
extra_args=None, # type: Optional[List[str]]
extra_args=None, # type: Optional[CommandArgs]
):
# type: (...) -> None
"""
@ -82,6 +87,7 @@ class RevOptions(object):
self.extra_args = extra_args
self.rev = rev
self.vc_class = vc_class
self.branch_name = None # type: Optional[str]
def __repr__(self):
return '<RevOptions {}: rev={!r}>'.format(self.vc_class.name, self.rev)
@ -95,11 +101,11 @@ class RevOptions(object):
return self.rev
def to_args(self):
# type: () -> List[str]
# type: () -> CommandArgs
"""
Return the VCS-specific command arguments.
"""
args = [] # type: List[str]
args = [] # type: CommandArgs
rev = self.arg_rev
if rev is not None:
args += self.vc_class.get_base_rev_args(rev)
@ -270,7 +276,7 @@ class VersionControl(object):
@classmethod
def make_rev_options(cls, rev=None, extra_args=None):
# type: (Optional[str], Optional[List[str]]) -> RevOptions
# type: (Optional[str], Optional[CommandArgs]) -> RevOptions
"""
Return a RevOptions object.
@ -291,6 +297,7 @@ class VersionControl(object):
return repo.startswith(os.path.sep) or bool(drive)
def export(self, location, url):
# type: (str, HiddenText) -> None
"""
Export the repository at the url to the destination location
i.e. only download the files, without vcs informations
@ -345,23 +352,27 @@ class VersionControl(object):
@staticmethod
def make_rev_args(username, password):
# type: (Optional[str], Optional[HiddenText]) -> CommandArgs
"""
Return the RevOptions "extra arguments" to use in obtain().
"""
return []
def get_url_rev_options(self, url):
# type: (str) -> Tuple[str, RevOptions]
# type: (HiddenText) -> Tuple[HiddenText, RevOptions]
"""
Return the URL and RevOptions object to use in obtain() and in
some cases export(), as a tuple (url, rev_options).
"""
url, rev, user_pass = self.get_url_rev_and_auth(url)
username, password = user_pass
secret_url, rev, user_pass = self.get_url_rev_and_auth(url.secret)
username, secret_password = user_pass
password = None # type: Optional[HiddenText]
if secret_password is not None:
password = hide_value(secret_password)
extra_args = self.make_rev_args(username, password)
rev_options = self.make_rev_options(rev, extra_args=extra_args)
return url, rev_options
return hide_url(secret_url), rev_options
@staticmethod
def normalize_url(url):
@ -381,6 +392,7 @@ class VersionControl(object):
return (cls.normalize_url(url1) == cls.normalize_url(url2))
def fetch_new(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
"""
Fetch a revision from a repository, in the case that this is the
first fetch from the repository.
@ -392,6 +404,7 @@ class VersionControl(object):
raise NotImplementedError
def switch(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
"""
Switch the repo at ``dest`` to point to ``URL``.
@ -401,6 +414,7 @@ class VersionControl(object):
raise NotImplementedError
def update(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
"""
Update an already-existing repo to the given ``rev_options``.
@ -421,7 +435,7 @@ class VersionControl(object):
raise NotImplementedError
def obtain(self, dest, url):
# type: (str, str) -> None
# type: (str, HiddenText) -> None
"""
Install or update in editable mode the package represented by this
VersionControl object.
@ -438,7 +452,7 @@ class VersionControl(object):
rev_display = rev_options.to_display()
if self.is_repository_directory(dest):
existing_url = self.get_remote_url(dest)
if self.compare_urls(existing_url, url):
if self.compare_urls(existing_url, url.secret):
logger.debug(
'%s in %s exists, and has correct URL (%s)',
self.repo_name.title(),
@ -514,7 +528,7 @@ class VersionControl(object):
self.switch(dest, url, rev_options)
def unpack(self, location, url):
# type: (str, str) -> None
# type: (str, HiddenText) -> None
"""
Clean up current location and download the url repository
(and vcs infos) into location
@ -545,7 +559,7 @@ class VersionControl(object):
@classmethod
def run_command(
cls,
cmd, # type: List[str]
cmd, # type: Union[List[str], CommandArgs]
show_stdout=True, # type: bool
cwd=None, # type: Optional[str]
on_returncode='raise', # type: str
@ -560,7 +574,7 @@ class VersionControl(object):
This is simply a wrapper around call_subprocess that adds the VCS
command name, and checks that the VCS is available
"""
cmd = [cls.name] + cmd
cmd = make_command(cls.name, *cmd)
try:
return call_subprocess(cmd, show_stdout, cwd,
on_returncode=on_returncode,

View file

@ -33,7 +33,7 @@ from pip._internal.exceptions import (
InvalidWheelFilename,
UnsupportedWheel,
)
from pip._internal.locations import distutils_scheme
from pip._internal.locations import distutils_scheme, get_major_minor_version
from pip._internal.models.link import Link
from pip._internal.utils.logging import indent_log
from pip._internal.utils.marker_files import PIP_DELETE_MARKER_FILENAME
@ -560,10 +560,10 @@ if __name__ == '__main__':
generated.extend(maker.make(spec))
if os.environ.get("ENSUREPIP_OPTIONS", "") != "altinstall":
spec = 'pip%s = %s' % (sys.version[:1], pip_script)
spec = 'pip%s = %s' % (sys.version_info[0], pip_script)
generated.extend(maker.make(spec))
spec = 'pip%s = %s' % (sys.version[:3], pip_script)
spec = 'pip%s = %s' % (get_major_minor_version(), pip_script)
generated.extend(maker.make(spec))
# Delete any other versioned pip entry points
pip_ep = [k for k in console if re.match(r'pip(\d(\.\d)?)?$', k)]
@ -575,7 +575,9 @@ if __name__ == '__main__':
spec = 'easy_install = ' + easy_install_script
generated.extend(maker.make(spec))
spec = 'easy_install-%s = %s' % (sys.version[:3], easy_install_script)
spec = 'easy_install-%s = %s' % (
get_major_minor_version(), easy_install_script,
)
generated.extend(maker.make(spec))
# Delete any other versioned easy_install entry points
easy_install_ep = [
@ -774,7 +776,7 @@ def _contains_egg_info(
def should_use_ephemeral_cache(
req, # type: InstallRequirement
format_control, # type: FormatControl
autobuilding, # type: bool
should_unpack, # type: bool
cache_available # type: bool
):
# type: (...) -> Optional[bool]
@ -783,20 +785,25 @@ def should_use_ephemeral_cache(
ephemeral cache.
:param cache_available: whether a cache directory is available for the
autobuilding=True case.
should_unpack=True case.
:return: True or False to build the requirement with ephem_cache=True
or False, respectively; or None not to build the requirement.
"""
if req.constraint:
# never build requirements that are merely constraints
return None
if req.is_wheel:
if not autobuilding:
if not should_unpack:
logger.info(
'Skipping %s, due to already being wheel.', req.name,
)
return None
if not autobuilding:
if not should_unpack:
# i.e. pip wheel, not pip install;
# return False, knowing that the caller will never cache
# in this case anyway, so this return merely means "build it".
# TODO improve this behavior
return False
if req.editable or not req.source_dir:
@ -810,7 +817,7 @@ def should_use_ephemeral_cache(
)
return None
if req.link and not req.link.is_artifact:
if req.link and req.link.is_vcs:
# VCS checkout. Build wheel just for this run.
return True
@ -1031,23 +1038,34 @@ class WheelBuilder(object):
def build(
self,
requirements, # type: Iterable[InstallRequirement]
autobuilding=False # type: bool
should_unpack=False # type: bool
):
# type: (...) -> List[InstallRequirement]
"""Build wheels.
:param unpack: If True, replace the sdist we built from with the
newly built wheel, in preparation for installation.
:param should_unpack: If True, after building the wheel, unpack it
and replace the sdist with the unpacked version in preparation
for installation.
:return: True if all the wheels built correctly.
"""
# pip install uses should_unpack=True.
# pip install never provides a _wheel_dir.
# pip wheel uses should_unpack=False.
# pip wheel always provides a _wheel_dir (via the preparer).
assert (
(should_unpack and not self._wheel_dir) or
(not should_unpack and self._wheel_dir)
)
buildset = []
format_control = self.finder.format_control
# Whether a cache directory is available for autobuilding=True.
cache_available = bool(self._wheel_dir or self.wheel_cache.cache_dir)
cache_available = bool(self.wheel_cache.cache_dir)
for req in requirements:
ephem_cache = should_use_ephemeral_cache(
req, format_control=format_control, autobuilding=autobuilding,
req,
format_control=format_control,
should_unpack=should_unpack,
cache_available=cache_available,
)
if ephem_cache is None:
@ -1061,7 +1079,7 @@ class WheelBuilder(object):
# Is any wheel build not using the ephemeral cache?
if any(not ephem_cache for _, ephem_cache in buildset):
have_directory_for_build = self._wheel_dir or (
autobuilding and self.wheel_cache.cache_dir
should_unpack and self.wheel_cache.cache_dir
)
assert have_directory_for_build
@ -1078,7 +1096,7 @@ class WheelBuilder(object):
build_success, build_failure = [], []
for req, ephem in buildset:
python_tag = None
if autobuilding:
if should_unpack:
python_tag = pep425tags.implementation_tag
if ephem:
output_dir = _cache.get_ephem_path_for_link(req.link)
@ -1099,7 +1117,7 @@ class WheelBuilder(object):
)
if wheel_file:
build_success.append(req)
if autobuilding:
if should_unpack:
# XXX: This is mildly duplicative with prepare_files,
# but not close enough to pull out to a single common
# method.

View file

@ -1,6 +1,7 @@
import distutils
import glob
import os
import shutil
import sys
import textwrap
from os.path import curdir, join, pardir
@ -23,6 +24,7 @@ from tests.lib import (
pyversion_tuple,
requirements_file,
)
from tests.lib.filesystem import make_socket_file
from tests.lib.local_repos import local_checkout
from tests.lib.path import Path
@ -488,6 +490,29 @@ def test_install_from_local_directory_with_symlinks_to_directories(
assert egg_info_folder in result.files_created, str(result)
@pytest.mark.skipif("sys.platform == 'win32' or sys.version_info < (3,)")
def test_install_from_local_directory_with_socket_file(script, data, tmpdir):
"""
Test installing from a local directory containing a socket file.
"""
egg_info_file = (
script.site_packages / "FSPkg-0.1.dev0-py%s.egg-info" % pyversion
)
package_folder = script.site_packages / "fspkg"
to_copy = data.packages.joinpath("FSPkg")
to_install = tmpdir.joinpath("src")
shutil.copytree(to_copy, to_install)
# Socket file, should be ignored.
socket_file_path = os.path.join(to_install, "example")
make_socket_file(socket_file_path)
result = script.pip("install", "--verbose", to_install, expect_error=False)
assert package_folder in result.files_created, str(result.stdout)
assert egg_info_file in result.files_created, str(result)
assert str(socket_file_path) in result.stderr
def test_install_from_local_directory_with_no_setup_py(script, data):
"""
Test installing from a local directory with no 'setup.py'.

View file

@ -79,6 +79,34 @@ def test_find_package_not_found():
assert len(list(result)) == 0
def test_report_single_not_found(script):
"""
Test passing one name and that isn't found.
"""
# We choose a non-canonicalized name to test that the non-canonical
# form is logged.
# Also, the following should report an error as there are no results
# to print. Consequently, there is no need to pass
# allow_stderr_warning=True since this is implied by expect_error=True.
result = script.pip('show', 'Abcd-3', expect_error=True)
assert 'WARNING: Package(s) not found: Abcd-3' in result.stderr
assert not result.stdout.splitlines()
def test_report_mixed_not_found(script):
"""
Test passing a mixture of found and not-found names.
"""
# We test passing non-canonicalized names.
result = script.pip(
'show', 'Abcd3', 'A-B-C', 'pip', allow_stderr_warning=True
)
assert 'WARNING: Package(s) not found: A-B-C, Abcd3' in result.stderr
lines = result.stdout.splitlines()
assert len(lines) == 10
assert 'Name: pip' in lines
def test_search_any_case():
"""
Search for a package in any case.
@ -86,7 +114,7 @@ def test_search_any_case():
"""
result = list(search_packages_info(['PIP']))
assert len(result) == 1
assert 'pip' == result[0]['name']
assert result[0]['name'] == 'pip'
def test_more_than_one_package():

View file

@ -6,6 +6,7 @@ import os
import pytest
from pip._internal.utils.misc import hide_url
from pip._internal.vcs.bazaar import Bazaar
from tests.lib import (
_test_path_to_file_url,
@ -35,7 +36,7 @@ def test_export(script, tmpdir):
_vcs_add(script, str(source_dir), vcs='bazaar')
export_dir = str(tmpdir / 'export')
url = 'bzr+' + _test_path_to_file_url(source_dir)
url = hide_url('bzr+' + _test_path_to_file_url(source_dir))
Bazaar().export(export_dir, url=url)
assert os.listdir(export_dir) == ['test_file']
@ -59,7 +60,7 @@ def test_export_rev(script, tmpdir):
)
export_dir = tmpdir / 'export'
url = 'bzr+' + _test_path_to_file_url(source_dir) + '@1'
url = hide_url('bzr+' + _test_path_to_file_url(source_dir) + '@1')
Bazaar().export(str(export_dir), url=url)
with open(export_dir / 'test_file', 'r') as f:

View file

@ -15,6 +15,7 @@ from scripttest import FoundDir, TestFileEnvironment
from pip._internal.download import PipSession
from pip._internal.index import PackageFinder
from pip._internal.locations import get_major_minor_version
from pip._internal.models.search_scope import SearchScope
from pip._internal.models.selection_prefs import SelectionPreferences
from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX
@ -22,14 +23,14 @@ from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from tests.lib.path import Path, curdir
if MYPY_CHECK_RUNNING:
from typing import Iterable, List, Optional
from typing import List, Optional
from pip._internal.models.target_python import TargetPython
DATA_DIR = Path(__file__).parent.parent.joinpath("data").abspath
SRC_DIR = Path(__file__).abspath.parent.parent.parent
pyversion = sys.version[:3]
pyversion = get_major_minor_version()
pyversion_tuple = sys.version_info
CURRENT_PY_VERSION_INFO = sys.version_info[:3]
@ -83,7 +84,6 @@ def make_test_finder(
find_links=None, # type: Optional[List[str]]
index_urls=None, # type: Optional[List[str]]
allow_all_prereleases=False, # type: bool
trusted_hosts=None, # type: Optional[Iterable[str]]
session=None, # type: Optional[PipSession]
target_python=None, # type: Optional[TargetPython]
):
@ -110,7 +110,6 @@ def make_test_finder(
return PackageFinder.create(
search_scope=search_scope,
selection_prefs=selection_prefs,
trusted_hosts=trusted_hosts,
session=session,
target_python=target_python,
)

48
tests/lib/filesystem.py Normal file
View file

@ -0,0 +1,48 @@
"""Helpers for filesystem-dependent tests.
"""
import os
import socket
import subprocess
import sys
from functools import partial
from itertools import chain
from .path import Path
def make_socket_file(path):
# Socket paths are limited to 108 characters (sometimes less) so we
# chdir before creating it and use a relative path name.
cwd = os.getcwd()
os.chdir(os.path.dirname(path))
try:
sock = socket.socket(socket.AF_UNIX)
sock.bind(os.path.basename(path))
finally:
os.chdir(cwd)
def make_unreadable_file(path):
Path(path).touch()
os.chmod(path, 0o000)
if sys.platform == "win32":
# Once we drop PY2 we can use `os.getlogin()` instead.
username = os.environ["USERNAME"]
# Remove "Read Data/List Directory" permission for current user, but
# leave everything else.
args = ["icacls", path, "/deny", username + ":(RD)"]
subprocess.check_call(args)
def get_filelist(base):
def join(dirpath, dirnames, filenames):
relative_dirpath = os.path.relpath(dirpath, base)
join_dirpath = partial(os.path.join, relative_dirpath)
return chain(
(join_dirpath(p) for p in dirnames),
(join_dirpath(p) for p in filenames),
)
return set(chain.from_iterable(
join(*dirinfo) for dirinfo in os.walk(base)
))

View file

@ -5,6 +5,7 @@ import subprocess
from pip._vendor.six.moves.urllib import request as urllib_request
from pip._internal.utils.misc import hide_url
from pip._internal.vcs import bazaar, git, mercurial, subversion
from tests.lib import path_to_url
@ -59,7 +60,8 @@ def _get_vcs_and_checkout_url(remote_repository, directory):
destination_path = os.path.join(directory, repository_name)
if not os.path.exists(destination_path):
vcs_class().obtain(destination_path, url=remote_repository)
url = hide_url(remote_repository)
vcs_class().obtain(destination_path, url=url)
return '%s+%s' % (
vcs,
path_to_url('/'.join([directory, repository_name, branch])),

View file

@ -39,8 +39,8 @@ class TestWheelCache:
# Legacy requirements were built.
assert mock_calls == [
call(['a', 'b'], autobuilding=True),
call(['c', 'd'], autobuilding=True),
call(['a', 'b'], should_unpack=True),
call(['c', 'd'], should_unpack=True),
]
# Legacy build failures are not included in the return value.
@ -57,7 +57,7 @@ class TestWheelCache:
# Legacy requirements were not built.
assert mock_calls == [
call(['a', 'b'], autobuilding=True),
call(['a', 'b'], should_unpack=True),
]
assert build_failures == ['a']

View file

@ -1,6 +1,8 @@
import functools
import hashlib
import logging
import os
import shutil
import sys
from io import BytesIO
from shutil import copy, rmtree
@ -15,6 +17,7 @@ from pip._internal.download import (
MultiDomainBasicAuth,
PipSession,
SafeFileCache,
_copy_source_tree,
_download_http_url,
_get_url_scheme,
parse_content_disposition,
@ -28,6 +31,12 @@ from pip._internal.models.link import Link
from pip._internal.utils.hashes import Hashes
from pip._internal.utils.misc import path_to_url
from tests.lib import create_file
from tests.lib.filesystem import (
get_filelist,
make_socket_file,
make_unreadable_file,
)
from tests.lib.path import Path
@pytest.fixture(scope="function")
@ -334,6 +343,85 @@ def test_url_to_path_path_to_url_symmetry_win():
assert url_to_path(path_to_url(unc_path)) == unc_path
@pytest.fixture
def clean_project(tmpdir_factory, data):
tmpdir = Path(str(tmpdir_factory.mktemp("clean_project")))
new_project_dir = tmpdir.joinpath("FSPkg")
path = data.packages.joinpath("FSPkg")
shutil.copytree(path, new_project_dir)
return new_project_dir
def test_copy_source_tree(clean_project, tmpdir):
target = tmpdir.joinpath("target")
expected_files = get_filelist(clean_project)
assert len(expected_files) == 3
_copy_source_tree(clean_project, target)
copied_files = get_filelist(target)
assert expected_files == copied_files
@pytest.mark.skipif("sys.platform == 'win32' or sys.version_info < (3,)")
def test_copy_source_tree_with_socket(clean_project, tmpdir, caplog):
target = tmpdir.joinpath("target")
expected_files = get_filelist(clean_project)
socket_path = str(clean_project.joinpath("aaa"))
make_socket_file(socket_path)
_copy_source_tree(clean_project, target)
copied_files = get_filelist(target)
assert expected_files == copied_files
# Warning should have been logged.
assert len(caplog.records) == 1
record = caplog.records[0]
assert record.levelname == 'WARNING'
assert socket_path in record.message
@pytest.mark.skipif("sys.platform == 'win32' or sys.version_info < (3,)")
def test_copy_source_tree_with_socket_fails_with_no_socket_error(
clean_project, tmpdir
):
target = tmpdir.joinpath("target")
expected_files = get_filelist(clean_project)
make_socket_file(clean_project.joinpath("aaa"))
unreadable_file = clean_project.joinpath("bbb")
make_unreadable_file(unreadable_file)
with pytest.raises(shutil.Error) as e:
_copy_source_tree(clean_project, target)
errored_files = [err[0] for err in e.value.args[0]]
assert len(errored_files) == 1
assert unreadable_file in errored_files
copied_files = get_filelist(target)
# All files without errors should have been copied.
assert expected_files == copied_files
def test_copy_source_tree_with_unreadable_dir_fails(clean_project, tmpdir):
target = tmpdir.joinpath("target")
expected_files = get_filelist(clean_project)
unreadable_file = clean_project.joinpath("bbb")
make_unreadable_file(unreadable_file)
with pytest.raises(shutil.Error) as e:
_copy_source_tree(clean_project, target)
errored_files = [err[0] for err in e.value.args[0]]
assert len(errored_files) == 1
assert unreadable_file in errored_files
copied_files = get_filelist(target)
# All files without errors should have been copied.
assert expected_files == copied_files
class Test_unpack_file_url(object):
def prep(self, tmpdir, data):
@ -527,14 +615,128 @@ class TestPipSession:
assert not hasattr(session.adapters["http://"], "cache")
def test_insecure_host_cache_is_not_enabled(self, tmpdir):
def test_insecure_host_adapter(self, tmpdir):
session = PipSession(
cache=tmpdir.joinpath("test-cache"),
insecure_hosts=["example.com"],
trusted_hosts=["example.com"],
)
assert "https://example.com/" in session.adapters
# Check that the "port wildcard" is present.
assert "https://example.com:" in session.adapters
# Check that the cache isn't enabled.
assert not hasattr(session.adapters["https://example.com/"], "cache")
def test_add_trusted_host(self):
# Leave a gap to test how the ordering is affected.
trusted_hosts = ['host1', 'host3']
session = PipSession(trusted_hosts=trusted_hosts)
insecure_adapter = session._insecure_adapter
prefix2 = 'https://host2/'
prefix3 = 'https://host3/'
# Confirm some initial conditions as a baseline.
assert session.pip_trusted_hosts == ['host1', 'host3']
assert session.adapters[prefix3] is insecure_adapter
assert prefix2 not in session.adapters
# Test adding a new host.
session.add_trusted_host('host2')
assert session.pip_trusted_hosts == ['host1', 'host3', 'host2']
# Check that prefix3 is still present.
assert session.adapters[prefix3] is insecure_adapter
assert session.adapters[prefix2] is insecure_adapter
# Test that adding the same host doesn't create a duplicate.
session.add_trusted_host('host3')
assert session.pip_trusted_hosts == ['host1', 'host3', 'host2'], (
'actual: {}'.format(session.pip_trusted_hosts)
)
def test_add_trusted_host__logging(self, caplog):
"""
Test logging when add_trusted_host() is called.
"""
trusted_hosts = ['host0', 'host1']
session = PipSession(trusted_hosts=trusted_hosts)
with caplog.at_level(logging.INFO):
# Test adding an existing host.
session.add_trusted_host('host1', source='somewhere')
session.add_trusted_host('host2')
# Test calling add_trusted_host() on the same host twice.
session.add_trusted_host('host2')
actual = [(r.levelname, r.message) for r in caplog.records]
# Observe that "host0" isn't included in the logs.
expected = [
('INFO', "adding trusted host: 'host1' (from somewhere)"),
('INFO', "adding trusted host: 'host2'"),
('INFO', "adding trusted host: 'host2'"),
]
assert actual == expected
def test_iter_secure_origins(self):
trusted_hosts = ['host1', 'host2']
session = PipSession(trusted_hosts=trusted_hosts)
actual = list(session.iter_secure_origins())
assert len(actual) == 8
# Spot-check that SECURE_ORIGINS is included.
assert actual[0] == ('https', '*', '*')
assert actual[-2:] == [
('*', 'host1', '*'),
('*', 'host2', '*'),
]
def test_iter_secure_origins__trusted_hosts_empty(self):
"""
Test iter_secure_origins() after passing trusted_hosts=[].
"""
session = PipSession(trusted_hosts=[])
actual = list(session.iter_secure_origins())
assert len(actual) == 6
# Spot-check that SECURE_ORIGINS is included.
assert actual[0] == ('https', '*', '*')
@pytest.mark.parametrize(
'location, trusted, expected',
[
("http://pypi.org/something", [], False),
("https://pypi.org/something", [], True),
("git+http://pypi.org/something", [], False),
("git+https://pypi.org/something", [], True),
("git+ssh://git@pypi.org/something", [], True),
("http://localhost", [], True),
("http://127.0.0.1", [], True),
("http://example.com/something/", [], False),
("http://example.com/something/", ["example.com"], True),
# Try changing the case.
("http://eXample.com/something/", ["example.cOm"], True),
],
)
def test_is_secure_origin(self, caplog, location, trusted, expected):
class MockLogger(object):
def __init__(self):
self.called = False
def warning(self, *args, **kwargs):
self.called = True
session = PipSession(trusted_hosts=trusted)
actual = session.is_secure_origin(location)
assert actual == expected
log_records = [(r.levelname, r.message) for r in caplog.records]
if expected:
assert not log_records
return
assert len(log_records) == 1
actual_level, actual_message = log_records[0]
assert actual_level == 'WARNING'
assert 'is not a trusted or secure host' in actual_message
@pytest.mark.parametrize(["input_url", "url", "username", "password"], [
(

View file

@ -387,7 +387,7 @@ class TestCandidateEvaluator:
actual_versions = [str(c.version) for c in actual]
assert actual_versions == expected_versions
def test_make_found_candidates(self):
def test_compute_best_candidate(self):
specifier = SpecifierSet('<= 1.11')
versions = ['1.10', '1.11', '1.12']
candidates = [
@ -397,16 +397,36 @@ class TestCandidateEvaluator:
'my-project',
specifier=specifier,
)
found_candidates = evaluator.make_found_candidates(candidates)
result = evaluator.compute_best_candidate(candidates)
assert found_candidates._candidates == candidates
assert found_candidates._evaluator is evaluator
assert result._candidates == candidates
expected_applicable = candidates[:2]
assert [str(c.version) for c in expected_applicable] == [
'1.10',
'1.11',
]
assert found_candidates._applicable_candidates == expected_applicable
assert result._applicable_candidates == expected_applicable
assert result.best_candidate is expected_applicable[1]
def test_compute_best_candidate__none_best(self):
"""
Test returning a None best candidate.
"""
specifier = SpecifierSet('<= 1.10')
versions = ['1.11', '1.12']
candidates = [
make_mock_candidate(version) for version in versions
]
evaluator = CandidateEvaluator.create(
'my-project',
specifier=specifier,
)
result = evaluator.compute_best_candidate(candidates)
assert result._candidates == candidates
assert result._applicable_candidates == []
assert result.best_candidate is None
@pytest.mark.parametrize('hex_digest, expected', [
# Test a link with no hash.
@ -448,15 +468,15 @@ class TestCandidateEvaluator:
actual = sort_value[1]
assert actual == expected
def test_get_best_candidate__no_candidates(self):
def test_sort_best_candidate__no_candidates(self):
"""
Test passing an empty list.
"""
evaluator = CandidateEvaluator.create('my-project')
actual = evaluator.get_best_candidate([])
actual = evaluator.sort_best_candidate([])
assert actual is None
def test_get_best_candidate__all_yanked(self, caplog):
def test_sort_best_candidate__all_yanked(self, caplog):
"""
Test all candidates yanked.
"""
@ -468,7 +488,7 @@ class TestCandidateEvaluator:
]
expected_best = candidates[1]
evaluator = CandidateEvaluator.create('my-project')
actual = evaluator.get_best_candidate(candidates)
actual = evaluator.sort_best_candidate(candidates)
assert actual is expected_best
assert str(actual.version) == '3.0'
@ -489,7 +509,7 @@ class TestCandidateEvaluator:
# Test a unicode string with a non-ascii character.
(u'curly quote: \u2018', u'curly quote: \u2018'),
])
def test_get_best_candidate__yanked_reason(
def test_sort_best_candidate__yanked_reason(
self, caplog, yanked_reason, expected_reason,
):
"""
@ -499,7 +519,7 @@ class TestCandidateEvaluator:
make_mock_candidate('1.0', yanked_reason=yanked_reason),
]
evaluator = CandidateEvaluator.create('my-project')
actual = evaluator.get_best_candidate(candidates)
actual = evaluator.sort_best_candidate(candidates)
assert str(actual.version) == '1.0'
assert len(caplog.records) == 1
@ -513,7 +533,9 @@ class TestCandidateEvaluator:
) + expected_reason
assert record.message == expected_message
def test_get_best_candidate__best_yanked_but_not_all(self, caplog):
def test_sort_best_candidate__best_yanked_but_not_all(
self, caplog,
):
"""
Test the best candidates being yanked, but not all.
"""
@ -526,7 +548,7 @@ class TestCandidateEvaluator:
]
expected_best = candidates[1]
evaluator = CandidateEvaluator.create('my-project')
actual = evaluator.get_best_candidate(candidates)
actual = evaluator.sort_best_candidate(candidates)
assert actual is expected_best
assert str(actual.version) == '2.0'
@ -642,96 +664,6 @@ class TestPackageFinder:
# Check that the attributes weren't reset.
assert actual_format_control.only_binary == {':all:'}
def test_add_trusted_host(self):
# Leave a gap to test how the ordering is affected.
trusted_hosts = ['host1', 'host3']
session = PipSession(insecure_hosts=trusted_hosts)
finder = make_test_finder(
session=session,
trusted_hosts=trusted_hosts,
)
insecure_adapter = session._insecure_adapter
prefix2 = 'https://host2/'
prefix3 = 'https://host3/'
# Confirm some initial conditions as a baseline.
assert finder.trusted_hosts == ['host1', 'host3']
assert session.adapters[prefix3] is insecure_adapter
assert prefix2 not in session.adapters
# Test adding a new host.
finder.add_trusted_host('host2')
assert finder.trusted_hosts == ['host1', 'host3', 'host2']
# Check that prefix3 is still present.
assert session.adapters[prefix3] is insecure_adapter
assert session.adapters[prefix2] is insecure_adapter
# Test that adding the same host doesn't create a duplicate.
finder.add_trusted_host('host3')
assert finder.trusted_hosts == ['host1', 'host3', 'host2'], (
'actual: {}'.format(finder.trusted_hosts)
)
def test_add_trusted_host__logging(self, caplog):
"""
Test logging when add_trusted_host() is called.
"""
trusted_hosts = ['host1']
session = PipSession(insecure_hosts=trusted_hosts)
finder = make_test_finder(
session=session,
trusted_hosts=trusted_hosts,
)
with caplog.at_level(logging.INFO):
# Test adding an existing host.
finder.add_trusted_host('host1', source='somewhere')
finder.add_trusted_host('host2')
# Test calling add_trusted_host() on the same host twice.
finder.add_trusted_host('host2')
actual = [(r.levelname, r.message) for r in caplog.records]
expected = [
('INFO', "adding trusted host: 'host1' (from somewhere)"),
('INFO', "adding trusted host: 'host2'"),
('INFO', "adding trusted host: 'host2'"),
]
assert actual == expected
def test_iter_secure_origins(self):
trusted_hosts = ['host1', 'host2']
finder = make_test_finder(trusted_hosts=trusted_hosts)
actual = list(finder.iter_secure_origins())
assert len(actual) == 8
# Spot-check that SECURE_ORIGINS is included.
assert actual[0] == ('https', '*', '*')
assert actual[-2:] == [
('*', 'host1', '*'),
('*', 'host2', '*'),
]
def test_iter_secure_origins__none_trusted_hosts(self):
"""
Test iter_secure_origins() after passing trusted_hosts=None.
"""
# Use PackageFinder.create() rather than make_test_finder()
# to make sure we're really passing trusted_hosts=None.
search_scope = SearchScope([], [])
selection_prefs = SelectionPreferences(
allow_yanked=True,
)
finder = PackageFinder.create(
search_scope=search_scope,
selection_prefs=selection_prefs,
trusted_hosts=None,
session=object(),
)
actual = list(finder.iter_secure_origins())
assert len(actual) == 6
# Spot-check that SECURE_ORIGINS is included.
assert actual[0] == ('https', '*', '*')
@pytest.mark.parametrize(
'allow_yanked, ignore_requires_python, only_binary, expected_formats',
[
@ -875,36 +807,6 @@ def test_determine_base_url(html, url, expected):
assert _determine_base_url(document, url) == expected
class MockLogger(object):
def __init__(self):
self.called = False
def warning(self, *args, **kwargs):
self.called = True
@pytest.mark.parametrize(
("location", "trusted", "expected"),
[
("http://pypi.org/something", [], True),
("https://pypi.org/something", [], False),
("git+http://pypi.org/something", [], True),
("git+https://pypi.org/something", [], False),
("git+ssh://git@pypi.org/something", [], False),
("http://localhost", [], False),
("http://127.0.0.1", [], False),
("http://example.com/something/", [], True),
("http://example.com/something/", ["example.com"], False),
("http://eXample.com/something/", ["example.cOm"], False),
],
)
def test_secure_origin(location, trusted, expected):
finder = make_test_finder(trusted_hosts=trusted)
logger = MockLogger()
finder._validate_secure_origin(logger, location)
assert logger.called == expected
@pytest.mark.parametrize(
("fragment", "canonical_name", "expected"),
[

View file

@ -127,3 +127,13 @@ class TestLink:
url = 'https://example.com/wheel.whl#sha512={}'.format(128 * 'a')
link = Link(url)
assert link.is_hash_allowed(hashes) == expected
@pytest.mark.parametrize('url, expected', [
('git+https://github.com/org/repo', True),
('bzr+http://bzr.myproject.org/MyProject/trunk/#egg=MyProject', True),
('https://example.com/some.whl', False),
('file://home/foo/some.whl', False),
])
def test_is_vcs(self, url, expected):
link = Link(url)
assert link.is_vcs is expected

View file

@ -47,6 +47,10 @@ class TestPEP425Tags(object):
base = pip._internal.pep425tags.get_abbr_impl() + \
pip._internal.pep425tags.get_impl_ver()
if sys.version_info >= (3, 8):
# Python 3.8 removes the m flag, so don't look for it.
flags = flags.replace('m', '')
if sys.version_info < (3, 3):
config_vars.update({'Py_UNICODE_SIZE': 2})
mock_gcf = self.mock_get_config_var(**config_vars)

View file

@ -601,16 +601,16 @@ def test_parse_editable_local_extras(
def test_exclusive_environment_markers():
"""Make sure RequirementSet accepts several excluding env markers"""
eq26 = install_req_from_line(
"Django>=1.6.10,<1.7 ; python_version == '2.6'")
eq26.is_direct = True
ne26 = install_req_from_line(
"Django>=1.6.10,<1.8 ; python_version != '2.6'")
ne26.is_direct = True
eq36 = install_req_from_line(
"Django>=1.6.10,<1.7 ; python_version == '3.6'")
eq36.is_direct = True
ne36 = install_req_from_line(
"Django>=1.6.10,<1.8 ; python_version != '3.6'")
ne36.is_direct = True
req_set = RequirementSet()
req_set.add_requirement(eq26)
req_set.add_requirement(ne26)
req_set.add_requirement(eq36)
req_set.add_requirement(ne36)
assert req_set.has_requirement('Django')

View file

@ -342,12 +342,13 @@ class TestProcessLine(object):
list(process_line("--extra-index-url=url", "file", 1, finder=finder))
assert finder.index_urls == ['url']
def test_set_finder_trusted_host(self, caplog, finder):
def test_set_finder_trusted_host(self, caplog, session, finder):
with caplog.at_level(logging.INFO):
list(process_line(
"--trusted-host=host", "file.txt", 1, finder=finder,
session=session,
))
assert finder.trusted_hosts == ['host']
assert list(finder.trusted_hosts) == ['host']
session = finder.session
assert session.adapters['https://host/'] is session._insecure_adapter

View file

@ -1,10 +1,8 @@
import sys
import pytest
from mock import patch
from pip._internal.models.target_python import TargetPython
from tests.lib import CURRENT_PY_VERSION_INFO
from tests.lib import CURRENT_PY_VERSION_INFO, pyversion
class TestTargetPython:
@ -36,16 +34,12 @@ class TestTargetPython:
"""
Test passing py_version_info=None.
"""
# Get the index of the second dot.
index = sys.version.find('.', 2)
current_major_minor = sys.version[:index] # e.g. "3.6"
target_python = TargetPython(py_version_info=None)
assert target_python._given_py_version_info is None
assert target_python.py_version_info == CURRENT_PY_VERSION_INFO
assert target_python.py_version == current_major_minor
assert target_python.py_version == pyversion
@pytest.mark.parametrize('kwargs, expected', [
({}, ''),

View file

@ -12,12 +12,9 @@ from pip._internal.index import InstallationCandidate
from pip._internal.utils import outdated
class MockFoundCandidates(object):
class MockBestCandidateResult(object):
def __init__(self, best):
self._best = best
def get_best(self):
return self._best
self.best_candidate = best
class MockPackageFinder(object):
@ -37,8 +34,8 @@ class MockPackageFinder(object):
def create(cls, *args, **kwargs):
return cls()
def find_candidates(self, project_name):
return MockFoundCandidates(self.INSTALLATION_CANDIDATES[0])
def find_best_candidate(self, project_name):
return MockBestCandidateResult(self.INSTALLATION_CANDIDATES[0])
class MockDistribution(object):
@ -59,7 +56,7 @@ def _options():
''' Some default options that we pass to outdated.pip_version_check '''
return pretend.stub(
find_links=[], index_url='default_url', extra_index_urls=[],
no_index=False, pre=False, trusted_hosts=False, cache_dir='',
no_index=False, pre=False, cache_dir='',
)

View file

@ -37,13 +37,19 @@ from pip._internal.utils.glibc import (
)
from pip._internal.utils.hashes import Hashes, MissingHashes
from pip._internal.utils.misc import (
HiddenText,
build_url_from_netloc,
call_subprocess,
egg_link_path,
ensure_dir,
format_command_args,
get_installed_distributions,
get_prog,
hide_url,
hide_value,
make_command,
make_subprocess_output_error,
netloc_has_port,
normalize_path,
normalize_version_info,
path_to_display,
@ -828,6 +834,9 @@ class TestGetProg(object):
(['pip', 'list'], 'pip list'),
(['foo', 'space space', 'new\nline', 'double"quote', "single'quote"],
"""foo 'space space' 'new\nline' 'double"quote' 'single'"'"'quote'"""),
# Test HiddenText arguments.
(make_command(hide_value('secret1'), 'foo', hide_value('secret2')),
"'****' foo '****'"),
])
def test_format_command_args(args, expected):
actual = format_command_args(args)
@ -1223,6 +1232,31 @@ def test_path_to_url_win():
assert path_to_url('file') == 'file:' + urllib_request.pathname2url(path)
@pytest.mark.parametrize('netloc, expected_url, expected_has_port', [
# Test domain name.
('example.com', 'https://example.com', False),
('example.com:5000', 'https://example.com:5000', True),
# Test IPv4 address.
('127.0.0.1', 'https://127.0.0.1', False),
('127.0.0.1:5000', 'https://127.0.0.1:5000', True),
# Test bare IPv6 address.
('2001:DB6::1', 'https://[2001:DB6::1]', False),
# Test IPv6 with port.
('[2001:DB6::1]:5000', 'https://[2001:DB6::1]:5000', True),
# Test netloc with auth.
(
'user:password@localhost:5000',
'https://user:password@localhost:5000',
True
)
])
def test_build_url_from_netloc_and_netloc_has_port(
netloc, expected_url, expected_has_port,
):
assert build_url_from_netloc(netloc) == expected_url
assert netloc_has_port(netloc) is expected_has_port
@pytest.mark.parametrize('netloc, expected', [
# Test a basic case.
('example.com', ('example.com', (None, None))),
@ -1329,6 +1363,73 @@ def test_redact_password_from_url(auth_url, expected_url):
assert url == expected_url
class TestHiddenText:
def test_basic(self):
"""
Test str(), repr(), and attribute access.
"""
hidden = HiddenText('my-secret', redacted='######')
assert repr(hidden) == "<HiddenText '######'>"
assert str(hidden) == '######'
assert hidden.redacted == '######'
assert hidden.secret == 'my-secret'
def test_equality_with_str(self):
"""
Test equality (and inequality) with str objects.
"""
hidden = HiddenText('secret', redacted='****')
# Test that the object doesn't compare equal to either its original
# or redacted forms.
assert hidden != hidden.secret
assert hidden.secret != hidden
assert hidden != hidden.redacted
assert hidden.redacted != hidden
def test_equality_same_secret(self):
"""
Test equality with an object having the same secret.
"""
# Choose different redactions for the two objects.
hidden1 = HiddenText('secret', redacted='****')
hidden2 = HiddenText('secret', redacted='####')
assert hidden1 == hidden2
# Also test __ne__. This assertion fails in Python 2 without
# defining HiddenText.__ne__.
assert not hidden1 != hidden2
def test_equality_different_secret(self):
"""
Test equality with an object having a different secret.
"""
hidden1 = HiddenText('secret-1', redacted='****')
hidden2 = HiddenText('secret-2', redacted='****')
assert hidden1 != hidden2
# Also test __eq__.
assert not hidden1 == hidden2
def test_hide_value():
hidden = hide_value('my-secret')
assert repr(hidden) == "<HiddenText '****'>"
assert str(hidden) == '****'
assert hidden.redacted == '****'
assert hidden.secret == 'my-secret'
def test_hide_url():
hidden_url = hide_url('https://user:password@example.com')
assert repr(hidden_url) == "<HiddenText 'https://user:****@example.com'>"
assert str(hidden_url) == 'https://user:****@example.com'
assert hidden_url.redacted == 'https://user:****@example.com'
assert hidden_url.secret == 'https://user:password@example.com'
@pytest.fixture()
def patch_deprecation_check_version():
# We do this, so that the deprecation tests are easier to write.

View file

@ -0,0 +1,61 @@
import os
import shutil
import pytest
from pip._internal.utils.filesystem import copy2_fixed, is_socket
from tests.lib.filesystem import make_socket_file, make_unreadable_file
from tests.lib.path import Path
def make_file(path):
Path(path).touch()
def make_valid_symlink(path):
target = path + "1"
make_file(target)
os.symlink(target, path)
def make_broken_symlink(path):
os.symlink("foo", path)
def make_dir(path):
os.mkdir(path)
skip_on_windows = pytest.mark.skipif("sys.platform == 'win32'")
@skip_on_windows
@pytest.mark.parametrize("create,result", [
(make_socket_file, True),
(make_file, False),
(make_valid_symlink, False),
(make_broken_symlink, False),
(make_dir, False),
])
def test_is_socket(create, result, tmpdir):
target = tmpdir.joinpath("target")
create(target)
assert os.path.lexists(target)
assert is_socket(target) == result
@pytest.mark.parametrize("create,error_type", [
pytest.param(
make_socket_file, shutil.SpecialFileError, marks=skip_on_windows
),
(make_unreadable_file, OSError),
])
def test_copy2_fixed_raises_appropriate_errors(create, error_type, tmpdir):
src = tmpdir.joinpath("src")
create(src)
dest = tmpdir.joinpath("dest")
with pytest.raises(error_type):
copy2_fixed(src, dest)
assert not dest.exists()

View file

@ -6,6 +6,7 @@ from mock import patch
from pip._vendor.packaging.version import parse as parse_version
from pip._internal.exceptions import BadCommand
from pip._internal.utils.misc import hide_url, hide_value
from pip._internal.vcs import make_vcs_requirement_url
from pip._internal.vcs.bazaar import Bazaar
from pip._internal.vcs.git import Git, looks_like_hash
@ -342,7 +343,7 @@ def test_subversion__get_url_rev_and_auth(url, expected):
@pytest.mark.parametrize('username, password, expected', [
(None, None, []),
('user', None, []),
('user', 'pass', []),
('user', hide_value('pass'), []),
])
def test_git__make_rev_args(username, password, expected):
"""
@ -355,7 +356,8 @@ def test_git__make_rev_args(username, password, expected):
@pytest.mark.parametrize('username, password, expected', [
(None, None, []),
('user', None, ['--username', 'user']),
('user', 'pass', ['--username', 'user', '--password', 'pass']),
('user', hide_value('pass'),
['--username', 'user', '--password', hide_value('pass')]),
])
def test_subversion__make_rev_args(username, password, expected):
"""
@ -369,12 +371,15 @@ def test_subversion__get_url_rev_options():
"""
Test Subversion.get_url_rev_options().
"""
url = 'svn+https://user:pass@svn.example.com/MyProject@v1.0#egg=MyProject'
url, rev_options = Subversion().get_url_rev_options(url)
assert url == 'https://svn.example.com/MyProject'
secret_url = (
'svn+https://user:pass@svn.example.com/MyProject@v1.0#egg=MyProject'
)
hidden_url = hide_url(secret_url)
url, rev_options = Subversion().get_url_rev_options(hidden_url)
assert url == hide_url('https://svn.example.com/MyProject')
assert rev_options.rev == 'v1.0'
assert rev_options.extra_args == (
['--username', 'user', '--password', 'pass']
['--username', 'user', '--password', hide_value('pass')]
)
@ -519,43 +524,48 @@ class TestSubversionArgs(TestCase):
assert self.call_subprocess_mock.call_args[0][0] == args
def test_obtain(self):
self.svn.obtain(self.dest, self.url)
self.assert_call_args(
['svn', 'checkout', '-q', '--non-interactive', '--username',
'username', '--password', 'password',
'http://svn.example.com/', '/tmp/test'])
self.svn.obtain(self.dest, hide_url(self.url))
self.assert_call_args([
'svn', 'checkout', '-q', '--non-interactive', '--username',
'username', '--password', hide_value('password'),
hide_url('http://svn.example.com/'), '/tmp/test',
])
def test_export(self):
self.svn.export(self.dest, self.url)
self.assert_call_args(
['svn', 'export', '--non-interactive', '--username', 'username',
'--password', 'password', 'http://svn.example.com/',
'/tmp/test'])
self.svn.export(self.dest, hide_url(self.url))
self.assert_call_args([
'svn', 'export', '--non-interactive', '--username', 'username',
'--password', hide_value('password'),
hide_url('http://svn.example.com/'), '/tmp/test',
])
def test_fetch_new(self):
self.svn.fetch_new(self.dest, self.url, self.rev_options)
self.assert_call_args(
['svn', 'checkout', '-q', '--non-interactive',
'svn+http://username:password@svn.example.com/',
'/tmp/test'])
self.svn.fetch_new(self.dest, hide_url(self.url), self.rev_options)
self.assert_call_args([
'svn', 'checkout', '-q', '--non-interactive',
hide_url('svn+http://username:password@svn.example.com/'),
'/tmp/test',
])
def test_fetch_new_revision(self):
rev_options = RevOptions(Subversion, '123')
self.svn.fetch_new(self.dest, self.url, rev_options)
self.assert_call_args(
['svn', 'checkout', '-q', '--non-interactive',
'-r', '123',
'svn+http://username:password@svn.example.com/',
'/tmp/test'])
self.svn.fetch_new(self.dest, hide_url(self.url), rev_options)
self.assert_call_args([
'svn', 'checkout', '-q', '--non-interactive', '-r', '123',
hide_url('svn+http://username:password@svn.example.com/'),
'/tmp/test',
])
def test_switch(self):
self.svn.switch(self.dest, self.url, self.rev_options)
self.assert_call_args(
['svn', 'switch', '--non-interactive',
'svn+http://username:password@svn.example.com/',
'/tmp/test'])
self.svn.switch(self.dest, hide_url(self.url), self.rev_options)
self.assert_call_args([
'svn', 'switch', '--non-interactive',
hide_url('svn+http://username:password@svn.example.com/'),
'/tmp/test',
])
def test_update(self):
self.svn.update(self.dest, self.url, self.rev_options)
self.assert_call_args(
['svn', 'update', '--non-interactive', '/tmp/test'])
self.svn.update(self.dest, hide_url(self.url), self.rev_options)
self.assert_call_args([
'svn', 'update', '--non-interactive', '/tmp/test',
])

View file

@ -6,6 +6,7 @@ import os
from pip._vendor.six.moves import configparser
from pip._internal.utils.misc import hide_url
from pip._internal.vcs.mercurial import Mercurial
from tests.lib import need_mercurial
@ -24,7 +25,7 @@ def test_mercurial_switch_updates_config_file_when_found(tmpdir):
hgrc_path = os.path.join(hg_dir, 'hgrc')
with open(hgrc_path, 'w') as f:
config.write(f)
hg.switch(tmpdir, 'new_url', options)
hg.switch(tmpdir, hide_url('new_url'), options)
config.read(hgrc_path)

View file

@ -78,10 +78,10 @@ def test_format_tag(file_tag, expected):
@pytest.mark.parametrize(
"base_name, autobuilding, cache_available, expected",
"base_name, should_unpack, cache_available, expected",
[
('pendulum-2.0.4', False, False, False),
# The following cases test autobuilding=True.
# The following cases test should_unpack=True.
# Test _contains_egg_info() returning True.
('pendulum-2.0.4', True, True, False),
('pendulum-2.0.4', True, False, True),
@ -91,7 +91,7 @@ def test_format_tag(file_tag, expected):
],
)
def test_should_use_ephemeral_cache__issue_6197(
base_name, autobuilding, cache_available, expected,
base_name, should_unpack, cache_available, expected,
):
"""
Regression test for: https://github.com/pypa/pip/issues/6197
@ -102,7 +102,7 @@ def test_should_use_ephemeral_cache__issue_6197(
format_control = FormatControl()
ephem_cache = wheel.should_use_ephemeral_cache(
req, format_control=format_control, autobuilding=autobuilding,
req, format_control=format_control, should_unpack=should_unpack,
cache_available=cache_available,
)
assert ephem_cache is expected
@ -126,7 +126,6 @@ def test_should_use_ephemeral_cache__disallow_binaries_and_vcs_checkout(
causes should_use_ephemeral_cache() to return None for VCS checkouts.
"""
req = Requirement('pendulum')
# Passing a VCS url causes link.is_artifact to return False.
link = Link(url='git+https://git.example.com/pendulum.git')
req = InstallRequirement(
req=req,
@ -137,7 +136,7 @@ def test_should_use_ephemeral_cache__disallow_binaries_and_vcs_checkout(
source_dir='/tmp/pip-install-9py5m2z1/pendulum',
)
assert not req.is_wheel
assert not req.link.is_artifact
assert req.link.is_vcs
format_control = FormatControl()
if disallow_binaries:
@ -145,7 +144,7 @@ def test_should_use_ephemeral_cache__disallow_binaries_and_vcs_checkout(
# The cache_available value doesn't matter for this test.
ephem_cache = wheel.should_use_ephemeral_cache(
req, format_control=format_control, autobuilding=True,
req, format_control=format_control, should_unpack=True,
cache_available=True,
)
assert ephem_cache is expected
@ -697,7 +696,9 @@ class TestWheelBuilder(object):
as mock_build_one:
wheel_req = Mock(is_wheel=True, editable=False, constraint=False)
wb = wheel.WheelBuilder(
finder=Mock(), preparer=Mock(), wheel_cache=None,
finder=Mock(),
preparer=Mock(),
wheel_cache=Mock(cache_dir=None),
)
with caplog.at_level(logging.INFO):
wb.build([wheel_req])

View file

@ -10,7 +10,8 @@ envlist =
pip = python {toxinidir}/tools/tox_pip.py
[testenv]
passenv = CI GIT_SSL_CAINFO
# Remove USERNAME once we drop PY2.
passenv = CI GIT_SSL_CAINFO USERNAME
setenv =
# This is required in order to get UTF-8 output inside of the subprocesses
# that our tests use.