mirror of https://github.com/pypa/pip
Move trusted_hosts logic to PipSession.
This commit is contained in:
parent
5e97de4773
commit
a6cdb490b0
|
@ -276,7 +276,6 @@ class RequirementCommand(IndexGroupCommand):
|
|||
return PackageFinder.create(
|
||||
search_scope=search_scope,
|
||||
selection_prefs=selection_prefs,
|
||||
trusted_hosts=options.trusted_hosts,
|
||||
session=session,
|
||||
target_python=target_python,
|
||||
)
|
||||
|
|
|
@ -126,7 +126,6 @@ class ListCommand(IndexGroupCommand):
|
|||
return PackageFinder.create(
|
||||
search_scope=search_scope,
|
||||
selection_prefs=selection_prefs,
|
||||
trusted_hosts=options.trusted_hosts,
|
||||
session=session,
|
||||
)
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ import shutil
|
|||
import sys
|
||||
from contextlib import contextmanager
|
||||
|
||||
from pip._vendor import requests, urllib3
|
||||
from pip._vendor import requests, six, urllib3
|
||||
from pip._vendor.cachecontrol import CacheControlAdapter
|
||||
from pip._vendor.cachecontrol.caches import FileCache
|
||||
from pip._vendor.lockfile import LockError
|
||||
|
@ -32,7 +32,7 @@ import pip
|
|||
from pip._internal.exceptions import HashMismatch, InstallationError
|
||||
from pip._internal.models.index import PyPI
|
||||
# Import ssl from compat so the initial import occurs in only one place.
|
||||
from pip._internal.utils.compat import HAS_TLS, ssl
|
||||
from pip._internal.utils.compat import HAS_TLS, ipaddress, ssl
|
||||
from pip._internal.utils.encoding import auto_decode
|
||||
from pip._internal.utils.filesystem import check_path_owner, copy2_fixed
|
||||
from pip._internal.utils.glibc import libc_ver
|
||||
|
@ -64,8 +64,9 @@ from pip._internal.utils.ui import DownloadProgressProvider
|
|||
from pip._internal.vcs import vcs
|
||||
|
||||
if MYPY_CHECK_RUNNING:
|
||||
from logging import Logger
|
||||
from typing import (
|
||||
Callable, Dict, List, IO, Optional, Text, Tuple, Union
|
||||
IO, Callable, Dict, Iterator, List, Optional, Text, Tuple, Union,
|
||||
)
|
||||
from optparse import Values
|
||||
|
||||
|
@ -76,6 +77,7 @@ if MYPY_CHECK_RUNNING:
|
|||
from pip._internal.vcs.versioncontrol import AuthInfo, VersionControl
|
||||
|
||||
Credentials = Tuple[str, str, str]
|
||||
SecureOrigin = Tuple[str, str, Optional[str]]
|
||||
|
||||
if PY2:
|
||||
CopytreeKwargs = TypedDict(
|
||||
|
@ -119,6 +121,20 @@ except Exception as exc:
|
|||
str(exc))
|
||||
keyring = None
|
||||
|
||||
|
||||
SECURE_ORIGINS = [
|
||||
# protocol, hostname, port
|
||||
# Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
|
||||
("https", "*", "*"),
|
||||
("*", "localhost", "*"),
|
||||
("*", "127.0.0.0/8", "*"),
|
||||
("*", "::1/128", "*"),
|
||||
("file", "*", None),
|
||||
# ssh is always secure.
|
||||
("ssh", "*", "*"),
|
||||
] # type: List[SecureOrigin]
|
||||
|
||||
|
||||
# These are environment variables present when running under various
|
||||
# CI systems. For each variable, some CI systems that use the variable
|
||||
# are indicated. The collection was chosen so that for each of a number
|
||||
|
@ -557,13 +573,21 @@ class PipSession(requests.Session):
|
|||
timeout = None # type: Optional[int]
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""
|
||||
:param insecure_hosts: Domains not to emit warnings for when not using
|
||||
HTTPS.
|
||||
"""
|
||||
retries = kwargs.pop("retries", 0)
|
||||
cache = kwargs.pop("cache", None)
|
||||
insecure_hosts = kwargs.pop("insecure_hosts", [])
|
||||
insecure_hosts = kwargs.pop("insecure_hosts", []) # type: List[str]
|
||||
index_urls = kwargs.pop("index_urls", None)
|
||||
|
||||
super(PipSession, self).__init__(*args, **kwargs)
|
||||
|
||||
# Namespace the attribute with "pip_" just in case to prevent
|
||||
# possible conflicts with the base class.
|
||||
self.pip_trusted_hosts = [] # type: List[str]
|
||||
|
||||
# Attach our User Agent to the request
|
||||
self.headers["User-Agent"] = user_agent()
|
||||
|
||||
|
@ -629,13 +653,26 @@ class PipSession(requests.Session):
|
|||
# Enable file:// urls
|
||||
self.mount("file://", LocalFSAdapter())
|
||||
|
||||
# We want to use a non-validating adapter for any requests which are
|
||||
# deemed insecure.
|
||||
for host in insecure_hosts:
|
||||
self.add_insecure_host(host)
|
||||
self.add_trusted_host(host, suppress_logging=True)
|
||||
|
||||
def add_trusted_host(self, host, source=None, suppress_logging=False):
|
||||
# type: (str, Optional[str], bool) -> None
|
||||
"""
|
||||
:param host: It is okay to provide a host that has previously been
|
||||
added.
|
||||
:param source: An optional source string, for logging where the host
|
||||
string came from.
|
||||
"""
|
||||
if not suppress_logging:
|
||||
msg = 'adding trusted host: {!r}'.format(host)
|
||||
if source is not None:
|
||||
msg += ' (from {})'.format(source)
|
||||
logger.info(msg)
|
||||
|
||||
if host not in self.pip_trusted_hosts:
|
||||
self.pip_trusted_hosts.append(host)
|
||||
|
||||
def add_insecure_host(self, host):
|
||||
# type: (str) -> None
|
||||
self.mount(build_url_from_netloc(host) + '/', self._insecure_adapter)
|
||||
if not netloc_has_port(host):
|
||||
# Mount wildcard ports for the same host.
|
||||
|
@ -644,6 +681,87 @@ class PipSession(requests.Session):
|
|||
self._insecure_adapter
|
||||
)
|
||||
|
||||
def iter_secure_origins(self):
|
||||
# type: () -> Iterator[SecureOrigin]
|
||||
for secure_origin in SECURE_ORIGINS:
|
||||
yield secure_origin
|
||||
for host in self.pip_trusted_hosts:
|
||||
yield ('*', host, '*')
|
||||
|
||||
def is_secure_origin(self, logger, location):
|
||||
# type: (Logger, Link) -> bool
|
||||
# Determine if this url used a secure transport mechanism
|
||||
parsed = urllib_parse.urlparse(str(location))
|
||||
origin = (parsed.scheme, parsed.hostname, parsed.port)
|
||||
|
||||
# The protocol to use to see if the protocol matches.
|
||||
# Don't count the repository type as part of the protocol: in
|
||||
# cases such as "git+ssh", only use "ssh". (I.e., Only verify against
|
||||
# the last scheme.)
|
||||
protocol = origin[0].rsplit('+', 1)[-1]
|
||||
|
||||
# Determine if our origin is a secure origin by looking through our
|
||||
# hardcoded list of secure origins, as well as any additional ones
|
||||
# configured on this PackageFinder instance.
|
||||
for secure_origin in self.iter_secure_origins():
|
||||
if protocol != secure_origin[0] and secure_origin[0] != "*":
|
||||
continue
|
||||
|
||||
try:
|
||||
# We need to do this decode dance to ensure that we have a
|
||||
# unicode object, even on Python 2.x.
|
||||
addr = ipaddress.ip_address(
|
||||
origin[1]
|
||||
if (
|
||||
isinstance(origin[1], six.text_type) or
|
||||
origin[1] is None
|
||||
)
|
||||
else origin[1].decode("utf8")
|
||||
)
|
||||
network = ipaddress.ip_network(
|
||||
secure_origin[1]
|
||||
if isinstance(secure_origin[1], six.text_type)
|
||||
# setting secure_origin[1] to proper Union[bytes, str]
|
||||
# creates problems in other places
|
||||
else secure_origin[1].decode("utf8") # type: ignore
|
||||
)
|
||||
except ValueError:
|
||||
# We don't have both a valid address or a valid network, so
|
||||
# we'll check this origin against hostnames.
|
||||
if (origin[1] and
|
||||
origin[1].lower() != secure_origin[1].lower() and
|
||||
secure_origin[1] != "*"):
|
||||
continue
|
||||
else:
|
||||
# We have a valid address and network, so see if the address
|
||||
# is contained within the network.
|
||||
if addr not in network:
|
||||
continue
|
||||
|
||||
# Check to see if the port patches
|
||||
if (origin[2] != secure_origin[2] and
|
||||
secure_origin[2] != "*" and
|
||||
secure_origin[2] is not None):
|
||||
continue
|
||||
|
||||
# If we've gotten here, then this origin matches the current
|
||||
# secure origin and we should return True
|
||||
return True
|
||||
|
||||
# If we've gotten to this point, then the origin isn't secure and we
|
||||
# will not accept it as a valid location to search. We will however
|
||||
# log a warning that we are ignoring it.
|
||||
logger.warning(
|
||||
"The repository located at %s is not a trusted or secure host and "
|
||||
"is being ignored. If this repository is available via HTTPS we "
|
||||
"recommend you use HTTPS instead, otherwise you may silence "
|
||||
"this warning and allow it anyway with '--trusted-host %s'.",
|
||||
parsed.hostname,
|
||||
parsed.hostname,
|
||||
)
|
||||
|
||||
return False
|
||||
|
||||
def request(self, method, url, *args, **kwargs):
|
||||
# Allow setting a default timeout on a session
|
||||
kwargs.setdefault("timeout", self.timeout)
|
||||
|
|
|
@ -12,7 +12,7 @@ import mimetypes
|
|||
import os
|
||||
import re
|
||||
|
||||
from pip._vendor import html5lib, requests, six
|
||||
from pip._vendor import html5lib, requests
|
||||
from pip._vendor.distlib.compat import unescape
|
||||
from pip._vendor.packaging import specifiers
|
||||
from pip._vendor.packaging.utils import canonicalize_name
|
||||
|
@ -33,7 +33,6 @@ from pip._internal.models.format_control import FormatControl
|
|||
from pip._internal.models.link import Link
|
||||
from pip._internal.models.selection_prefs import SelectionPreferences
|
||||
from pip._internal.models.target_python import TargetPython
|
||||
from pip._internal.utils.compat import ipaddress
|
||||
from pip._internal.utils.logging import indent_log
|
||||
from pip._internal.utils.misc import (
|
||||
ARCHIVE_EXTENSIONS,
|
||||
|
@ -47,10 +46,9 @@ from pip._internal.utils.typing import MYPY_CHECK_RUNNING
|
|||
from pip._internal.wheel import Wheel
|
||||
|
||||
if MYPY_CHECK_RUNNING:
|
||||
from logging import Logger
|
||||
from typing import (
|
||||
Any, Callable, FrozenSet, Iterable, Iterator, List, MutableMapping,
|
||||
Optional, Sequence, Set, Text, Tuple, Union,
|
||||
Any, Callable, FrozenSet, Iterable, List, MutableMapping, Optional,
|
||||
Sequence, Set, Text, Tuple, Union,
|
||||
)
|
||||
import xml.etree.ElementTree
|
||||
from pip._vendor.packaging.version import _BaseVersion
|
||||
|
@ -66,25 +64,11 @@ if MYPY_CHECK_RUNNING:
|
|||
Tuple[int, int, int, _BaseVersion, BuildTag, Optional[int]]
|
||||
)
|
||||
HTMLElement = xml.etree.ElementTree.Element
|
||||
SecureOrigin = Tuple[str, str, Optional[str]]
|
||||
|
||||
|
||||
__all__ = ['FormatControl', 'FoundCandidates', 'PackageFinder']
|
||||
|
||||
|
||||
SECURE_ORIGINS = [
|
||||
# protocol, hostname, port
|
||||
# Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
|
||||
("https", "*", "*"),
|
||||
("*", "localhost", "*"),
|
||||
("*", "127.0.0.0/8", "*"),
|
||||
("*", "::1/128", "*"),
|
||||
("file", "*", None),
|
||||
# ssh is always secure.
|
||||
("ssh", "*", "*"),
|
||||
] # type: List[SecureOrigin]
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -813,7 +797,6 @@ class PackageFinder(object):
|
|||
target_python, # type: TargetPython
|
||||
allow_yanked, # type: bool
|
||||
format_control=None, # type: Optional[FormatControl]
|
||||
trusted_hosts=None, # type: Optional[List[str]]
|
||||
candidate_prefs=None, # type: CandidatePreferences
|
||||
ignore_requires_python=None, # type: Optional[bool]
|
||||
):
|
||||
|
@ -829,8 +812,6 @@ class PackageFinder(object):
|
|||
:param candidate_prefs: Options to use when creating a
|
||||
CandidateEvaluator object.
|
||||
"""
|
||||
if trusted_hosts is None:
|
||||
trusted_hosts = []
|
||||
if candidate_prefs is None:
|
||||
candidate_prefs = CandidatePreferences()
|
||||
|
||||
|
@ -844,7 +825,6 @@ class PackageFinder(object):
|
|||
self.search_scope = search_scope
|
||||
self.session = session
|
||||
self.format_control = format_control
|
||||
self.trusted_hosts = trusted_hosts
|
||||
|
||||
# These are boring links that have already been logged somehow.
|
||||
self._logged_links = set() # type: Set[Link]
|
||||
|
@ -858,7 +838,6 @@ class PackageFinder(object):
|
|||
cls,
|
||||
search_scope, # type: SearchScope
|
||||
selection_prefs, # type: SelectionPreferences
|
||||
trusted_hosts=None, # type: Optional[List[str]]
|
||||
session=None, # type: Optional[PipSession]
|
||||
target_python=None, # type: Optional[TargetPython]
|
||||
):
|
||||
|
@ -867,8 +846,6 @@ class PackageFinder(object):
|
|||
|
||||
:param selection_prefs: The candidate selection preferences, as a
|
||||
SelectionPreferences object.
|
||||
:param trusted_hosts: Domains not to emit warnings for when not using
|
||||
HTTPS.
|
||||
:param session: The Session to use to make requests.
|
||||
:param target_python: The target Python interpreter to use when
|
||||
checking compatibility. If None (the default), a TargetPython
|
||||
|
@ -894,7 +871,6 @@ class PackageFinder(object):
|
|||
target_python=target_python,
|
||||
allow_yanked=selection_prefs.allow_yanked,
|
||||
format_control=selection_prefs.format_control,
|
||||
trusted_hosts=trusted_hosts,
|
||||
ignore_requires_python=selection_prefs.ignore_requires_python,
|
||||
)
|
||||
|
||||
|
@ -908,6 +884,11 @@ class PackageFinder(object):
|
|||
# type: () -> List[str]
|
||||
return self.search_scope.index_urls
|
||||
|
||||
@property
|
||||
def trusted_hosts(self):
|
||||
# type: () -> Iterable[str]
|
||||
return iter(self.session.pip_trusted_hosts)
|
||||
|
||||
@property
|
||||
def allow_all_prereleases(self):
|
||||
# type: () -> bool
|
||||
|
@ -917,31 +898,6 @@ class PackageFinder(object):
|
|||
# type: () -> None
|
||||
self._candidate_prefs.allow_all_prereleases = True
|
||||
|
||||
def add_trusted_host(self, host, source=None):
|
||||
# type: (str, Optional[str]) -> None
|
||||
"""
|
||||
:param source: An optional source string, for logging where the host
|
||||
string came from.
|
||||
"""
|
||||
# It is okay to add a previously added host because PipSession stores
|
||||
# the resulting prefixes in a dict.
|
||||
msg = 'adding trusted host: {!r}'.format(host)
|
||||
if source is not None:
|
||||
msg += ' (from {})'.format(source)
|
||||
logger.info(msg)
|
||||
self.session.add_insecure_host(host)
|
||||
if host in self.trusted_hosts:
|
||||
return
|
||||
|
||||
self.trusted_hosts.append(host)
|
||||
|
||||
def iter_secure_origins(self):
|
||||
# type: () -> Iterator[SecureOrigin]
|
||||
for secure_origin in SECURE_ORIGINS:
|
||||
yield secure_origin
|
||||
for host in self.trusted_hosts:
|
||||
yield ('*', host, '*')
|
||||
|
||||
@staticmethod
|
||||
def _sort_locations(locations, expand_dir=False):
|
||||
# type: (Sequence[str], bool) -> Tuple[List[str], List[str]]
|
||||
|
@ -1000,80 +956,6 @@ class PackageFinder(object):
|
|||
|
||||
return files, urls
|
||||
|
||||
def _validate_secure_origin(self, logger, location):
|
||||
# type: (Logger, Link) -> bool
|
||||
# Determine if this url used a secure transport mechanism
|
||||
parsed = urllib_parse.urlparse(str(location))
|
||||
origin = (parsed.scheme, parsed.hostname, parsed.port)
|
||||
|
||||
# The protocol to use to see if the protocol matches.
|
||||
# Don't count the repository type as part of the protocol: in
|
||||
# cases such as "git+ssh", only use "ssh". (I.e., Only verify against
|
||||
# the last scheme.)
|
||||
protocol = origin[0].rsplit('+', 1)[-1]
|
||||
|
||||
# Determine if our origin is a secure origin by looking through our
|
||||
# hardcoded list of secure origins, as well as any additional ones
|
||||
# configured on this PackageFinder instance.
|
||||
for secure_origin in self.iter_secure_origins():
|
||||
if protocol != secure_origin[0] and secure_origin[0] != "*":
|
||||
continue
|
||||
|
||||
try:
|
||||
# We need to do this decode dance to ensure that we have a
|
||||
# unicode object, even on Python 2.x.
|
||||
addr = ipaddress.ip_address(
|
||||
origin[1]
|
||||
if (
|
||||
isinstance(origin[1], six.text_type) or
|
||||
origin[1] is None
|
||||
)
|
||||
else origin[1].decode("utf8")
|
||||
)
|
||||
network = ipaddress.ip_network(
|
||||
secure_origin[1]
|
||||
if isinstance(secure_origin[1], six.text_type)
|
||||
# setting secure_origin[1] to proper Union[bytes, str]
|
||||
# creates problems in other places
|
||||
else secure_origin[1].decode("utf8") # type: ignore
|
||||
)
|
||||
except ValueError:
|
||||
# We don't have both a valid address or a valid network, so
|
||||
# we'll check this origin against hostnames.
|
||||
if (origin[1] and
|
||||
origin[1].lower() != secure_origin[1].lower() and
|
||||
secure_origin[1] != "*"):
|
||||
continue
|
||||
else:
|
||||
# We have a valid address and network, so see if the address
|
||||
# is contained within the network.
|
||||
if addr not in network:
|
||||
continue
|
||||
|
||||
# Check to see if the port patches
|
||||
if (origin[2] != secure_origin[2] and
|
||||
secure_origin[2] != "*" and
|
||||
secure_origin[2] is not None):
|
||||
continue
|
||||
|
||||
# If we've gotten here, then this origin matches the current
|
||||
# secure origin and we should return True
|
||||
return True
|
||||
|
||||
# If we've gotten to this point, then the origin isn't secure and we
|
||||
# will not accept it as a valid location to search. We will however
|
||||
# log a warning that we are ignoring it.
|
||||
logger.warning(
|
||||
"The repository located at %s is not a trusted or secure host and "
|
||||
"is being ignored. If this repository is available via HTTPS we "
|
||||
"recommend you use HTTPS instead, otherwise you may silence "
|
||||
"this warning and allow it anyway with '--trusted-host %s'.",
|
||||
parsed.hostname,
|
||||
parsed.hostname,
|
||||
)
|
||||
|
||||
return False
|
||||
|
||||
def make_link_evaluator(self, project_name):
|
||||
# type: (str) -> LinkEvaluator
|
||||
canonical_name = canonicalize_name(project_name)
|
||||
|
@ -1117,7 +999,7 @@ class PackageFinder(object):
|
|||
(Link(url) for url in index_url_loc),
|
||||
(Link(url) for url in fl_url_loc),
|
||||
)
|
||||
if self._validate_secure_origin(logger, link)
|
||||
if self.session.is_secure_origin(logger, link)
|
||||
]
|
||||
|
||||
logger.debug('%d location(s) to search for versions of %s:',
|
||||
|
|
|
@ -272,7 +272,7 @@ def process_line(
|
|||
finder.set_allow_all_prereleases()
|
||||
for host in opts.trusted_hosts or []:
|
||||
source = 'line {} of {}'.format(line_number, filename)
|
||||
finder.add_trusted_host(host, source=source)
|
||||
session.add_trusted_host(host, source=source)
|
||||
|
||||
|
||||
def break_args_options(line):
|
||||
|
|
|
@ -136,7 +136,6 @@ def pip_version_check(session, options):
|
|||
finder = PackageFinder.create(
|
||||
search_scope=search_scope,
|
||||
selection_prefs=selection_prefs,
|
||||
trusted_hosts=options.trusted_hosts,
|
||||
session=session,
|
||||
)
|
||||
candidate = finder.find_candidates("pip").get_best()
|
||||
|
|
|
@ -23,7 +23,7 @@ from pip._internal.utils.typing import MYPY_CHECK_RUNNING
|
|||
from tests.lib.path import Path, curdir
|
||||
|
||||
if MYPY_CHECK_RUNNING:
|
||||
from typing import Iterable, List, Optional
|
||||
from typing import List, Optional
|
||||
from pip._internal.models.target_python import TargetPython
|
||||
|
||||
|
||||
|
@ -84,7 +84,6 @@ def make_test_finder(
|
|||
find_links=None, # type: Optional[List[str]]
|
||||
index_urls=None, # type: Optional[List[str]]
|
||||
allow_all_prereleases=False, # type: bool
|
||||
trusted_hosts=None, # type: Optional[Iterable[str]]
|
||||
session=None, # type: Optional[PipSession]
|
||||
target_python=None, # type: Optional[TargetPython]
|
||||
):
|
||||
|
@ -111,7 +110,6 @@ def make_test_finder(
|
|||
return PackageFinder.create(
|
||||
search_scope=search_scope,
|
||||
selection_prefs=selection_prefs,
|
||||
trusted_hosts=trusted_hosts,
|
||||
session=session,
|
||||
target_python=target_python,
|
||||
)
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import functools
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
|
@ -626,6 +627,106 @@ class TestPipSession:
|
|||
# Check that the cache isn't enabled.
|
||||
assert not hasattr(session.adapters["https://example.com/"], "cache")
|
||||
|
||||
def test_add_trusted_host(self):
|
||||
# Leave a gap to test how the ordering is affected.
|
||||
trusted_hosts = ['host1', 'host3']
|
||||
session = PipSession(insecure_hosts=trusted_hosts)
|
||||
insecure_adapter = session._insecure_adapter
|
||||
prefix2 = 'https://host2/'
|
||||
prefix3 = 'https://host3/'
|
||||
|
||||
# Confirm some initial conditions as a baseline.
|
||||
assert session.pip_trusted_hosts == ['host1', 'host3']
|
||||
assert session.adapters[prefix3] is insecure_adapter
|
||||
assert prefix2 not in session.adapters
|
||||
|
||||
# Test adding a new host.
|
||||
session.add_trusted_host('host2')
|
||||
assert session.pip_trusted_hosts == ['host1', 'host3', 'host2']
|
||||
# Check that prefix3 is still present.
|
||||
assert session.adapters[prefix3] is insecure_adapter
|
||||
assert session.adapters[prefix2] is insecure_adapter
|
||||
|
||||
# Test that adding the same host doesn't create a duplicate.
|
||||
session.add_trusted_host('host3')
|
||||
assert session.pip_trusted_hosts == ['host1', 'host3', 'host2'], (
|
||||
'actual: {}'.format(session.pip_trusted_hosts)
|
||||
)
|
||||
|
||||
def test_add_trusted_host__logging(self, caplog):
|
||||
"""
|
||||
Test logging when add_trusted_host() is called.
|
||||
"""
|
||||
trusted_hosts = ['host0', 'host1']
|
||||
session = PipSession(insecure_hosts=trusted_hosts)
|
||||
with caplog.at_level(logging.INFO):
|
||||
# Test adding an existing host.
|
||||
session.add_trusted_host('host1', source='somewhere')
|
||||
session.add_trusted_host('host2')
|
||||
# Test calling add_trusted_host() on the same host twice.
|
||||
session.add_trusted_host('host2')
|
||||
|
||||
actual = [(r.levelname, r.message) for r in caplog.records]
|
||||
# Observe that "host0" isn't included in the logs.
|
||||
expected = [
|
||||
('INFO', "adding trusted host: 'host1' (from somewhere)"),
|
||||
('INFO', "adding trusted host: 'host2'"),
|
||||
('INFO', "adding trusted host: 'host2'"),
|
||||
]
|
||||
assert actual == expected
|
||||
|
||||
def test_iter_secure_origins(self):
|
||||
trusted_hosts = ['host1', 'host2']
|
||||
session = PipSession(insecure_hosts=trusted_hosts)
|
||||
|
||||
actual = list(session.iter_secure_origins())
|
||||
assert len(actual) == 8
|
||||
# Spot-check that SECURE_ORIGINS is included.
|
||||
assert actual[0] == ('https', '*', '*')
|
||||
assert actual[-2:] == [
|
||||
('*', 'host1', '*'),
|
||||
('*', 'host2', '*'),
|
||||
]
|
||||
|
||||
def test_iter_secure_origins__insecure_hosts_empty(self):
|
||||
"""
|
||||
Test iter_secure_origins() after passing insecure_hosts=[].
|
||||
"""
|
||||
session = PipSession(insecure_hosts=[])
|
||||
|
||||
actual = list(session.iter_secure_origins())
|
||||
assert len(actual) == 6
|
||||
# Spot-check that SECURE_ORIGINS is included.
|
||||
assert actual[0] == ('https', '*', '*')
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("location", "trusted", "expected"),
|
||||
[
|
||||
("http://pypi.org/something", [], True),
|
||||
("https://pypi.org/something", [], False),
|
||||
("git+http://pypi.org/something", [], True),
|
||||
("git+https://pypi.org/something", [], False),
|
||||
("git+ssh://git@pypi.org/something", [], False),
|
||||
("http://localhost", [], False),
|
||||
("http://127.0.0.1", [], False),
|
||||
("http://example.com/something/", [], True),
|
||||
("http://example.com/something/", ["example.com"], False),
|
||||
("http://eXample.com/something/", ["example.cOm"], False),
|
||||
],
|
||||
)
|
||||
def test_secure_origin(self, location, trusted, expected):
|
||||
class MockLogger(object):
|
||||
def __init__(self):
|
||||
self.called = False
|
||||
|
||||
def warning(self, *args, **kwargs):
|
||||
self.called = True
|
||||
|
||||
session = PipSession(insecure_hosts=trusted)
|
||||
logger = MockLogger()
|
||||
session.is_secure_origin(logger, location)
|
||||
assert logger.called == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(["input_url", "url", "username", "password"], [
|
||||
(
|
||||
|
|
|
@ -642,96 +642,6 @@ class TestPackageFinder:
|
|||
# Check that the attributes weren't reset.
|
||||
assert actual_format_control.only_binary == {':all:'}
|
||||
|
||||
def test_add_trusted_host(self):
|
||||
# Leave a gap to test how the ordering is affected.
|
||||
trusted_hosts = ['host1', 'host3']
|
||||
session = PipSession(insecure_hosts=trusted_hosts)
|
||||
finder = make_test_finder(
|
||||
session=session,
|
||||
trusted_hosts=trusted_hosts,
|
||||
)
|
||||
insecure_adapter = session._insecure_adapter
|
||||
prefix2 = 'https://host2/'
|
||||
prefix3 = 'https://host3/'
|
||||
|
||||
# Confirm some initial conditions as a baseline.
|
||||
assert finder.trusted_hosts == ['host1', 'host3']
|
||||
assert session.adapters[prefix3] is insecure_adapter
|
||||
assert prefix2 not in session.adapters
|
||||
|
||||
# Test adding a new host.
|
||||
finder.add_trusted_host('host2')
|
||||
assert finder.trusted_hosts == ['host1', 'host3', 'host2']
|
||||
# Check that prefix3 is still present.
|
||||
assert session.adapters[prefix3] is insecure_adapter
|
||||
assert session.adapters[prefix2] is insecure_adapter
|
||||
|
||||
# Test that adding the same host doesn't create a duplicate.
|
||||
finder.add_trusted_host('host3')
|
||||
assert finder.trusted_hosts == ['host1', 'host3', 'host2'], (
|
||||
'actual: {}'.format(finder.trusted_hosts)
|
||||
)
|
||||
|
||||
def test_add_trusted_host__logging(self, caplog):
|
||||
"""
|
||||
Test logging when add_trusted_host() is called.
|
||||
"""
|
||||
trusted_hosts = ['host1']
|
||||
session = PipSession(insecure_hosts=trusted_hosts)
|
||||
finder = make_test_finder(
|
||||
session=session,
|
||||
trusted_hosts=trusted_hosts,
|
||||
)
|
||||
with caplog.at_level(logging.INFO):
|
||||
# Test adding an existing host.
|
||||
finder.add_trusted_host('host1', source='somewhere')
|
||||
finder.add_trusted_host('host2')
|
||||
# Test calling add_trusted_host() on the same host twice.
|
||||
finder.add_trusted_host('host2')
|
||||
|
||||
actual = [(r.levelname, r.message) for r in caplog.records]
|
||||
expected = [
|
||||
('INFO', "adding trusted host: 'host1' (from somewhere)"),
|
||||
('INFO', "adding trusted host: 'host2'"),
|
||||
('INFO', "adding trusted host: 'host2'"),
|
||||
]
|
||||
assert actual == expected
|
||||
|
||||
def test_iter_secure_origins(self):
|
||||
trusted_hosts = ['host1', 'host2']
|
||||
finder = make_test_finder(trusted_hosts=trusted_hosts)
|
||||
|
||||
actual = list(finder.iter_secure_origins())
|
||||
assert len(actual) == 8
|
||||
# Spot-check that SECURE_ORIGINS is included.
|
||||
assert actual[0] == ('https', '*', '*')
|
||||
assert actual[-2:] == [
|
||||
('*', 'host1', '*'),
|
||||
('*', 'host2', '*'),
|
||||
]
|
||||
|
||||
def test_iter_secure_origins__none_trusted_hosts(self):
|
||||
"""
|
||||
Test iter_secure_origins() after passing trusted_hosts=None.
|
||||
"""
|
||||
# Use PackageFinder.create() rather than make_test_finder()
|
||||
# to make sure we're really passing trusted_hosts=None.
|
||||
search_scope = SearchScope([], [])
|
||||
selection_prefs = SelectionPreferences(
|
||||
allow_yanked=True,
|
||||
)
|
||||
finder = PackageFinder.create(
|
||||
search_scope=search_scope,
|
||||
selection_prefs=selection_prefs,
|
||||
trusted_hosts=None,
|
||||
session=object(),
|
||||
)
|
||||
|
||||
actual = list(finder.iter_secure_origins())
|
||||
assert len(actual) == 6
|
||||
# Spot-check that SECURE_ORIGINS is included.
|
||||
assert actual[0] == ('https', '*', '*')
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'allow_yanked, ignore_requires_python, only_binary, expected_formats',
|
||||
[
|
||||
|
@ -875,36 +785,6 @@ def test_determine_base_url(html, url, expected):
|
|||
assert _determine_base_url(document, url) == expected
|
||||
|
||||
|
||||
class MockLogger(object):
|
||||
def __init__(self):
|
||||
self.called = False
|
||||
|
||||
def warning(self, *args, **kwargs):
|
||||
self.called = True
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("location", "trusted", "expected"),
|
||||
[
|
||||
("http://pypi.org/something", [], True),
|
||||
("https://pypi.org/something", [], False),
|
||||
("git+http://pypi.org/something", [], True),
|
||||
("git+https://pypi.org/something", [], False),
|
||||
("git+ssh://git@pypi.org/something", [], False),
|
||||
("http://localhost", [], False),
|
||||
("http://127.0.0.1", [], False),
|
||||
("http://example.com/something/", [], True),
|
||||
("http://example.com/something/", ["example.com"], False),
|
||||
("http://eXample.com/something/", ["example.cOm"], False),
|
||||
],
|
||||
)
|
||||
def test_secure_origin(location, trusted, expected):
|
||||
finder = make_test_finder(trusted_hosts=trusted)
|
||||
logger = MockLogger()
|
||||
finder._validate_secure_origin(logger, location)
|
||||
assert logger.called == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("fragment", "canonical_name", "expected"),
|
||||
[
|
||||
|
|
|
@ -342,12 +342,13 @@ class TestProcessLine(object):
|
|||
list(process_line("--extra-index-url=url", "file", 1, finder=finder))
|
||||
assert finder.index_urls == ['url']
|
||||
|
||||
def test_set_finder_trusted_host(self, caplog, finder):
|
||||
def test_set_finder_trusted_host(self, caplog, session, finder):
|
||||
with caplog.at_level(logging.INFO):
|
||||
list(process_line(
|
||||
"--trusted-host=host", "file.txt", 1, finder=finder,
|
||||
session=session,
|
||||
))
|
||||
assert finder.trusted_hosts == ['host']
|
||||
assert list(finder.trusted_hosts) == ['host']
|
||||
session = finder.session
|
||||
assert session.adapters['https://host/'] is session._insecure_adapter
|
||||
|
||||
|
|
|
@ -59,7 +59,7 @@ def _options():
|
|||
''' Some default options that we pass to outdated.pip_version_check '''
|
||||
return pretend.stub(
|
||||
find_links=[], index_url='default_url', extra_index_urls=[],
|
||||
no_index=False, pre=False, trusted_hosts=False, cache_dir='',
|
||||
no_index=False, pre=False, cache_dir='',
|
||||
)
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue