1
1
Fork 0
mirror of https://github.com/pypa/pip synced 2023-12-13 21:30:23 +01:00

Move PipSession to network.session (#7089)

This commit is contained in:
Pradyun Gedam 2019-09-28 11:55:15 +05:30 committed by GitHub
commit ad33ecef74
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 656 additions and 618 deletions

View file

@ -10,11 +10,11 @@ from functools import partial
from pip._internal.cli.base_command import Command from pip._internal.cli.base_command import Command
from pip._internal.cli.command_context import CommandContextMixIn from pip._internal.cli.command_context import CommandContextMixIn
from pip._internal.download import PipSession
from pip._internal.exceptions import CommandError from pip._internal.exceptions import CommandError
from pip._internal.index import PackageFinder from pip._internal.index import PackageFinder
from pip._internal.legacy_resolve import Resolver from pip._internal.legacy_resolve import Resolver
from pip._internal.models.selection_prefs import SelectionPreferences from pip._internal.models.selection_prefs import SelectionPreferences
from pip._internal.network.session import PipSession
from pip._internal.operations.prepare import RequirementPreparer from pip._internal.operations.prepare import RequirementPreparer
from pip._internal.req.constructors import ( from pip._internal.req.constructors import (
install_req_from_editable, install_req_from_editable,

View file

@ -32,7 +32,7 @@ if MYPY_CHECK_RUNNING:
from pip._vendor.requests import Response from pip._vendor.requests import Response
from pip._internal.models.search_scope import SearchScope from pip._internal.models.search_scope import SearchScope
from pip._internal.download import PipSession from pip._internal.network.session import PipSession
HTMLElement = xml.etree.ElementTree.Element HTMLElement = xml.etree.ElementTree.Element
ResponseHeaders = MutableMapping[str, str] ResponseHeaders = MutableMapping[str, str]

View file

@ -1,47 +1,33 @@
from __future__ import absolute_import from __future__ import absolute_import
import cgi import cgi
import email.utils
import json
import logging import logging
import mimetypes import mimetypes
import os import os
import platform
import re import re
import shutil import shutil
import sys import sys
from pip._vendor import requests, six, urllib3 from pip._vendor import requests
from pip._vendor.cachecontrol import CacheControlAdapter
from pip._vendor.requests.adapters import BaseAdapter, HTTPAdapter
from pip._vendor.requests.models import CONTENT_CHUNK_SIZE, Response from pip._vendor.requests.models import CONTENT_CHUNK_SIZE, Response
from pip._vendor.requests.structures import CaseInsensitiveDict
from pip._vendor.six import PY2 from pip._vendor.six import PY2
# NOTE: XMLRPC Client is not annotated in typeshed as on 2017-07-17, which is # NOTE: XMLRPC Client is not annotated in typeshed as on 2017-07-17, which is
# why we ignore the type on this import # why we ignore the type on this import
from pip._vendor.six.moves import xmlrpc_client # type: ignore from pip._vendor.six.moves import xmlrpc_client # type: ignore
from pip._vendor.six.moves.urllib import parse as urllib_parse from pip._vendor.six.moves.urllib import parse as urllib_parse
import pip
from pip._internal.exceptions import HashMismatch, InstallationError from pip._internal.exceptions import HashMismatch, InstallationError
from pip._internal.models.index import PyPI from pip._internal.models.index import PyPI
from pip._internal.network.auth import MultiDomainBasicAuth from pip._internal.network.session import PipSession
from pip._internal.network.cache import SafeFileCache
# Import ssl from compat so the initial import occurs in only one place.
from pip._internal.utils.compat import HAS_TLS, ipaddress, ssl
from pip._internal.utils.encoding import auto_decode from pip._internal.utils.encoding import auto_decode
from pip._internal.utils.filesystem import check_path_owner, copy2_fixed from pip._internal.utils.filesystem import copy2_fixed
from pip._internal.utils.glibc import libc_ver
from pip._internal.utils.misc import ( from pip._internal.utils.misc import (
ask_path_exists, ask_path_exists,
backup_dir, backup_dir,
build_url_from_netloc,
consume, consume,
display_path, display_path,
format_size, format_size,
get_installed_version,
hide_url, hide_url,
parse_netloc,
path_to_display, path_to_display,
rmtree, rmtree,
splitext, splitext,
@ -50,12 +36,12 @@ from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.utils.typing import MYPY_CHECK_RUNNING from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.ui import DownloadProgressProvider from pip._internal.utils.ui import DownloadProgressProvider
from pip._internal.utils.unpacking import unpack_file from pip._internal.utils.unpacking import unpack_file
from pip._internal.utils.urls import get_url_scheme, url_to_path from pip._internal.utils.urls import get_url_scheme
from pip._internal.vcs import vcs from pip._internal.vcs import vcs
if MYPY_CHECK_RUNNING: if MYPY_CHECK_RUNNING:
from typing import ( from typing import (
IO, Callable, Iterator, List, Optional, Text, Tuple, Union, IO, Callable, List, Optional, Text, Tuple,
) )
from mypy_extensions import TypedDict from mypy_extensions import TypedDict
@ -64,8 +50,6 @@ if MYPY_CHECK_RUNNING:
from pip._internal.utils.hashes import Hashes from pip._internal.utils.hashes import Hashes
from pip._internal.vcs.versioncontrol import VersionControl from pip._internal.vcs.versioncontrol import VersionControl
SecureOrigin = Tuple[str, str, Optional[Union[int, str]]]
if PY2: if PY2:
CopytreeKwargs = TypedDict( CopytreeKwargs = TypedDict(
'CopytreeKwargs', 'CopytreeKwargs',
@ -98,374 +82,6 @@ __all__ = ['get_file_content',
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
SECURE_ORIGINS = [
# protocol, hostname, port
# Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
("https", "*", "*"),
("*", "localhost", "*"),
("*", "127.0.0.0/8", "*"),
("*", "::1/128", "*"),
("file", "*", None),
# ssh is always secure.
("ssh", "*", "*"),
] # type: List[SecureOrigin]
# These are environment variables present when running under various
# CI systems. For each variable, some CI systems that use the variable
# are indicated. The collection was chosen so that for each of a number
# of popular systems, at least one of the environment variables is used.
# This list is used to provide some indication of and lower bound for
# CI traffic to PyPI. Thus, it is okay if the list is not comprehensive.
# For more background, see: https://github.com/pypa/pip/issues/5499
CI_ENVIRONMENT_VARIABLES = (
# Azure Pipelines
'BUILD_BUILDID',
# Jenkins
'BUILD_ID',
# AppVeyor, CircleCI, Codeship, Gitlab CI, Shippable, Travis CI
'CI',
# Explicit environment variable.
'PIP_IS_CI',
)
def looks_like_ci():
# type: () -> bool
"""
Return whether it looks like pip is running under CI.
"""
# We don't use the method of checking for a tty (e.g. using isatty())
# because some CI systems mimic a tty (e.g. Travis CI). Thus that
# method doesn't provide definitive information in either direction.
return any(name in os.environ for name in CI_ENVIRONMENT_VARIABLES)
def user_agent():
"""
Return a string representing the user agent.
"""
data = {
"installer": {"name": "pip", "version": pip.__version__},
"python": platform.python_version(),
"implementation": {
"name": platform.python_implementation(),
},
}
if data["implementation"]["name"] == 'CPython':
data["implementation"]["version"] = platform.python_version()
elif data["implementation"]["name"] == 'PyPy':
if sys.pypy_version_info.releaselevel == 'final':
pypy_version_info = sys.pypy_version_info[:3]
else:
pypy_version_info = sys.pypy_version_info
data["implementation"]["version"] = ".".join(
[str(x) for x in pypy_version_info]
)
elif data["implementation"]["name"] == 'Jython':
# Complete Guess
data["implementation"]["version"] = platform.python_version()
elif data["implementation"]["name"] == 'IronPython':
# Complete Guess
data["implementation"]["version"] = platform.python_version()
if sys.platform.startswith("linux"):
from pip._vendor import distro
distro_infos = dict(filter(
lambda x: x[1],
zip(["name", "version", "id"], distro.linux_distribution()),
))
libc = dict(filter(
lambda x: x[1],
zip(["lib", "version"], libc_ver()),
))
if libc:
distro_infos["libc"] = libc
if distro_infos:
data["distro"] = distro_infos
if sys.platform.startswith("darwin") and platform.mac_ver()[0]:
data["distro"] = {"name": "macOS", "version": platform.mac_ver()[0]}
if platform.system():
data.setdefault("system", {})["name"] = platform.system()
if platform.release():
data.setdefault("system", {})["release"] = platform.release()
if platform.machine():
data["cpu"] = platform.machine()
if HAS_TLS:
data["openssl_version"] = ssl.OPENSSL_VERSION
setuptools_version = get_installed_version("setuptools")
if setuptools_version is not None:
data["setuptools_version"] = setuptools_version
# Use None rather than False so as not to give the impression that
# pip knows it is not being run under CI. Rather, it is a null or
# inconclusive result. Also, we include some value rather than no
# value to make it easier to know that the check has been run.
data["ci"] = True if looks_like_ci() else None
user_data = os.environ.get("PIP_USER_AGENT_USER_DATA")
if user_data is not None:
data["user_data"] = user_data
return "{data[installer][name]}/{data[installer][version]} {json}".format(
data=data,
json=json.dumps(data, separators=(",", ":"), sort_keys=True),
)
class LocalFSAdapter(BaseAdapter):
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
proxies=None):
pathname = url_to_path(request.url)
resp = Response()
resp.status_code = 200
resp.url = request.url
try:
stats = os.stat(pathname)
except OSError as exc:
resp.status_code = 404
resp.raw = exc
else:
modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
resp.headers = CaseInsensitiveDict({
"Content-Type": content_type,
"Content-Length": stats.st_size,
"Last-Modified": modified,
})
resp.raw = open(pathname, "rb")
resp.close = resp.raw.close
return resp
def close(self):
pass
class InsecureHTTPAdapter(HTTPAdapter):
def cert_verify(self, conn, url, verify, cert):
conn.cert_reqs = 'CERT_NONE'
conn.ca_certs = None
class PipSession(requests.Session):
timeout = None # type: Optional[int]
def __init__(self, *args, **kwargs):
"""
:param trusted_hosts: Domains not to emit warnings for when not using
HTTPS.
"""
retries = kwargs.pop("retries", 0)
cache = kwargs.pop("cache", None)
trusted_hosts = kwargs.pop("trusted_hosts", []) # type: List[str]
index_urls = kwargs.pop("index_urls", None)
super(PipSession, self).__init__(*args, **kwargs)
# Namespace the attribute with "pip_" just in case to prevent
# possible conflicts with the base class.
self.pip_trusted_origins = [] # type: List[Tuple[str, Optional[int]]]
# Attach our User Agent to the request
self.headers["User-Agent"] = user_agent()
# Attach our Authentication handler to the session
self.auth = MultiDomainBasicAuth(index_urls=index_urls)
# Create our urllib3.Retry instance which will allow us to customize
# how we handle retries.
retries = urllib3.Retry(
# Set the total number of retries that a particular request can
# have.
total=retries,
# A 503 error from PyPI typically means that the Fastly -> Origin
# connection got interrupted in some way. A 503 error in general
# is typically considered a transient error so we'll go ahead and
# retry it.
# A 500 may indicate transient error in Amazon S3
# A 520 or 527 - may indicate transient error in CloudFlare
status_forcelist=[500, 503, 520, 527],
# Add a small amount of back off between failed requests in
# order to prevent hammering the service.
backoff_factor=0.25,
)
# Check to ensure that the directory containing our cache directory
# is owned by the user current executing pip. If it does not exist
# we will check the parent directory until we find one that does exist.
if cache and not check_path_owner(cache):
logger.warning(
"The directory '%s' or its parent directory is not owned by "
"the current user and the cache has been disabled. Please "
"check the permissions and owner of that directory. If "
"executing pip with sudo, you may want sudo's -H flag.",
cache,
)
cache = None
# We want to _only_ cache responses on securely fetched origins. We do
# this because we can't validate the response of an insecurely fetched
# origin, and we don't want someone to be able to poison the cache and
# require manual eviction from the cache to fix it.
if cache:
secure_adapter = CacheControlAdapter(
cache=SafeFileCache(cache),
max_retries=retries,
)
else:
secure_adapter = HTTPAdapter(max_retries=retries)
# Our Insecure HTTPAdapter disables HTTPS validation. It does not
# support caching (see above) so we'll use it for all http:// URLs as
# well as any https:// host that we've marked as ignoring TLS errors
# for.
insecure_adapter = InsecureHTTPAdapter(max_retries=retries)
# Save this for later use in add_insecure_host().
self._insecure_adapter = insecure_adapter
self.mount("https://", secure_adapter)
self.mount("http://", insecure_adapter)
# Enable file:// urls
self.mount("file://", LocalFSAdapter())
for host in trusted_hosts:
self.add_trusted_host(host, suppress_logging=True)
def add_trusted_host(self, host, source=None, suppress_logging=False):
# type: (str, Optional[str], bool) -> None
"""
:param host: It is okay to provide a host that has previously been
added.
:param source: An optional source string, for logging where the host
string came from.
"""
if not suppress_logging:
msg = 'adding trusted host: {!r}'.format(host)
if source is not None:
msg += ' (from {})'.format(source)
logger.info(msg)
host_port = parse_netloc(host)
if host_port not in self.pip_trusted_origins:
self.pip_trusted_origins.append(host_port)
self.mount(build_url_from_netloc(host) + '/', self._insecure_adapter)
if not host_port[1]:
# Mount wildcard ports for the same host.
self.mount(
build_url_from_netloc(host) + ':',
self._insecure_adapter
)
def iter_secure_origins(self):
# type: () -> Iterator[SecureOrigin]
for secure_origin in SECURE_ORIGINS:
yield secure_origin
for host, port in self.pip_trusted_origins:
yield ('*', host, '*' if port is None else port)
def is_secure_origin(self, location):
# type: (Link) -> bool
# Determine if this url used a secure transport mechanism
parsed = urllib_parse.urlparse(str(location))
origin_protocol, origin_host, origin_port = (
parsed.scheme, parsed.hostname, parsed.port,
)
# The protocol to use to see if the protocol matches.
# Don't count the repository type as part of the protocol: in
# cases such as "git+ssh", only use "ssh". (I.e., Only verify against
# the last scheme.)
origin_protocol = origin_protocol.rsplit('+', 1)[-1]
# Determine if our origin is a secure origin by looking through our
# hardcoded list of secure origins, as well as any additional ones
# configured on this PackageFinder instance.
for secure_origin in self.iter_secure_origins():
secure_protocol, secure_host, secure_port = secure_origin
if origin_protocol != secure_protocol and secure_protocol != "*":
continue
try:
# We need to do this decode dance to ensure that we have a
# unicode object, even on Python 2.x.
addr = ipaddress.ip_address(
origin_host
if (
isinstance(origin_host, six.text_type) or
origin_host is None
)
else origin_host.decode("utf8")
)
network = ipaddress.ip_network(
secure_host
if isinstance(secure_host, six.text_type)
# setting secure_host to proper Union[bytes, str]
# creates problems in other places
else secure_host.decode("utf8") # type: ignore
)
except ValueError:
# We don't have both a valid address or a valid network, so
# we'll check this origin against hostnames.
if (origin_host and
origin_host.lower() != secure_host.lower() and
secure_host != "*"):
continue
else:
# We have a valid address and network, so see if the address
# is contained within the network.
if addr not in network:
continue
# Check to see if the port matches.
if (origin_port != secure_port and
secure_port != "*" and
secure_port is not None):
continue
# If we've gotten here, then this origin matches the current
# secure origin and we should return True
return True
# If we've gotten to this point, then the origin isn't secure and we
# will not accept it as a valid location to search. We will however
# log a warning that we are ignoring it.
logger.warning(
"The repository located at %s is not a trusted or secure host and "
"is being ignored. If this repository is available via HTTPS we "
"recommend you use HTTPS instead, otherwise you may silence "
"this warning and allow it anyway with '--trusted-host %s'.",
origin_host,
origin_host,
)
return False
def request(self, method, url, *args, **kwargs):
# Allow setting a default timeout on a session
kwargs.setdefault("timeout", self.timeout)
# Dispatch the actual request
return super(PipSession, self).request(method, url, *args, **kwargs)
def get_file_content(url, comes_from=None, session=None): def get_file_content(url, comes_from=None, session=None):
# type: (str, Optional[str], Optional[PipSession]) -> Tuple[str, Text] # type: (str, Optional[str], Optional[PipSession]) -> Tuple[str, Text]
"""Gets the content of a file; it may be a filename, file: URL, or """Gets the content of a file; it may be a filename, file: URL, or

View file

@ -44,7 +44,7 @@ if MYPY_CHECK_RUNNING:
from pip._vendor import pkg_resources from pip._vendor import pkg_resources
from pip._internal.distributions import AbstractDistribution from pip._internal.distributions import AbstractDistribution
from pip._internal.download import PipSession from pip._internal.network.session import PipSession
from pip._internal.index import PackageFinder from pip._internal.index import PackageFinder
from pip._internal.operations.prepare import RequirementPreparer from pip._internal.operations.prepare import RequirementPreparer
from pip._internal.req.req_install import InstallRequirement from pip._internal.req.req_install import InstallRequirement

View file

@ -0,0 +1,416 @@
"""PipSession and supporting code, containing all pip-specific
network request configuration and behavior.
"""
import email.utils
import json
import logging
import mimetypes
import os
import platform
import sys
from pip._vendor import requests, six, urllib3
from pip._vendor.cachecontrol import CacheControlAdapter
from pip._vendor.requests.adapters import BaseAdapter, HTTPAdapter
from pip._vendor.requests.models import Response
from pip._vendor.requests.structures import CaseInsensitiveDict
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip import __version__
from pip._internal.network.auth import MultiDomainBasicAuth
from pip._internal.network.cache import SafeFileCache
# Import ssl from compat so the initial import occurs in only one place.
from pip._internal.utils.compat import HAS_TLS, ipaddress, ssl
from pip._internal.utils.filesystem import check_path_owner
from pip._internal.utils.glibc import libc_ver
from pip._internal.utils.misc import (
build_url_from_netloc,
get_installed_version,
parse_netloc,
)
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.urls import url_to_path
if MYPY_CHECK_RUNNING:
from typing import (
Iterator, List, Optional, Tuple, Union,
)
from pip._internal.models.link import Link
SecureOrigin = Tuple[str, str, Optional[Union[int, str]]]
logger = logging.getLogger(__name__)
SECURE_ORIGINS = [
# protocol, hostname, port
# Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
("https", "*", "*"),
("*", "localhost", "*"),
("*", "127.0.0.0/8", "*"),
("*", "::1/128", "*"),
("file", "*", None),
# ssh is always secure.
("ssh", "*", "*"),
] # type: List[SecureOrigin]
# These are environment variables present when running under various
# CI systems. For each variable, some CI systems that use the variable
# are indicated. The collection was chosen so that for each of a number
# of popular systems, at least one of the environment variables is used.
# This list is used to provide some indication of and lower bound for
# CI traffic to PyPI. Thus, it is okay if the list is not comprehensive.
# For more background, see: https://github.com/pypa/pip/issues/5499
CI_ENVIRONMENT_VARIABLES = (
# Azure Pipelines
'BUILD_BUILDID',
# Jenkins
'BUILD_ID',
# AppVeyor, CircleCI, Codeship, Gitlab CI, Shippable, Travis CI
'CI',
# Explicit environment variable.
'PIP_IS_CI',
)
def looks_like_ci():
# type: () -> bool
"""
Return whether it looks like pip is running under CI.
"""
# We don't use the method of checking for a tty (e.g. using isatty())
# because some CI systems mimic a tty (e.g. Travis CI). Thus that
# method doesn't provide definitive information in either direction.
return any(name in os.environ for name in CI_ENVIRONMENT_VARIABLES)
def user_agent():
"""
Return a string representing the user agent.
"""
data = {
"installer": {"name": "pip", "version": __version__},
"python": platform.python_version(),
"implementation": {
"name": platform.python_implementation(),
},
}
if data["implementation"]["name"] == 'CPython':
data["implementation"]["version"] = platform.python_version()
elif data["implementation"]["name"] == 'PyPy':
if sys.pypy_version_info.releaselevel == 'final':
pypy_version_info = sys.pypy_version_info[:3]
else:
pypy_version_info = sys.pypy_version_info
data["implementation"]["version"] = ".".join(
[str(x) for x in pypy_version_info]
)
elif data["implementation"]["name"] == 'Jython':
# Complete Guess
data["implementation"]["version"] = platform.python_version()
elif data["implementation"]["name"] == 'IronPython':
# Complete Guess
data["implementation"]["version"] = platform.python_version()
if sys.platform.startswith("linux"):
from pip._vendor import distro
distro_infos = dict(filter(
lambda x: x[1],
zip(["name", "version", "id"], distro.linux_distribution()),
))
libc = dict(filter(
lambda x: x[1],
zip(["lib", "version"], libc_ver()),
))
if libc:
distro_infos["libc"] = libc
if distro_infos:
data["distro"] = distro_infos
if sys.platform.startswith("darwin") and platform.mac_ver()[0]:
data["distro"] = {"name": "macOS", "version": platform.mac_ver()[0]}
if platform.system():
data.setdefault("system", {})["name"] = platform.system()
if platform.release():
data.setdefault("system", {})["release"] = platform.release()
if platform.machine():
data["cpu"] = platform.machine()
if HAS_TLS:
data["openssl_version"] = ssl.OPENSSL_VERSION
setuptools_version = get_installed_version("setuptools")
if setuptools_version is not None:
data["setuptools_version"] = setuptools_version
# Use None rather than False so as not to give the impression that
# pip knows it is not being run under CI. Rather, it is a null or
# inconclusive result. Also, we include some value rather than no
# value to make it easier to know that the check has been run.
data["ci"] = True if looks_like_ci() else None
user_data = os.environ.get("PIP_USER_AGENT_USER_DATA")
if user_data is not None:
data["user_data"] = user_data
return "{data[installer][name]}/{data[installer][version]} {json}".format(
data=data,
json=json.dumps(data, separators=(",", ":"), sort_keys=True),
)
class LocalFSAdapter(BaseAdapter):
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
proxies=None):
pathname = url_to_path(request.url)
resp = Response()
resp.status_code = 200
resp.url = request.url
try:
stats = os.stat(pathname)
except OSError as exc:
resp.status_code = 404
resp.raw = exc
else:
modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
resp.headers = CaseInsensitiveDict({
"Content-Type": content_type,
"Content-Length": stats.st_size,
"Last-Modified": modified,
})
resp.raw = open(pathname, "rb")
resp.close = resp.raw.close
return resp
def close(self):
pass
class InsecureHTTPAdapter(HTTPAdapter):
def cert_verify(self, conn, url, verify, cert):
conn.cert_reqs = 'CERT_NONE'
conn.ca_certs = None
class PipSession(requests.Session):
timeout = None # type: Optional[int]
def __init__(self, *args, **kwargs):
"""
:param trusted_hosts: Domains not to emit warnings for when not using
HTTPS.
"""
retries = kwargs.pop("retries", 0)
cache = kwargs.pop("cache", None)
trusted_hosts = kwargs.pop("trusted_hosts", []) # type: List[str]
index_urls = kwargs.pop("index_urls", None)
super(PipSession, self).__init__(*args, **kwargs)
# Namespace the attribute with "pip_" just in case to prevent
# possible conflicts with the base class.
self.pip_trusted_origins = [] # type: List[Tuple[str, Optional[int]]]
# Attach our User Agent to the request
self.headers["User-Agent"] = user_agent()
# Attach our Authentication handler to the session
self.auth = MultiDomainBasicAuth(index_urls=index_urls)
# Create our urllib3.Retry instance which will allow us to customize
# how we handle retries.
retries = urllib3.Retry(
# Set the total number of retries that a particular request can
# have.
total=retries,
# A 503 error from PyPI typically means that the Fastly -> Origin
# connection got interrupted in some way. A 503 error in general
# is typically considered a transient error so we'll go ahead and
# retry it.
# A 500 may indicate transient error in Amazon S3
# A 520 or 527 - may indicate transient error in CloudFlare
status_forcelist=[500, 503, 520, 527],
# Add a small amount of back off between failed requests in
# order to prevent hammering the service.
backoff_factor=0.25,
)
# Check to ensure that the directory containing our cache directory
# is owned by the user current executing pip. If it does not exist
# we will check the parent directory until we find one that does exist.
if cache and not check_path_owner(cache):
logger.warning(
"The directory '%s' or its parent directory is not owned by "
"the current user and the cache has been disabled. Please "
"check the permissions and owner of that directory. If "
"executing pip with sudo, you may want sudo's -H flag.",
cache,
)
cache = None
# We want to _only_ cache responses on securely fetched origins. We do
# this because we can't validate the response of an insecurely fetched
# origin, and we don't want someone to be able to poison the cache and
# require manual eviction from the cache to fix it.
if cache:
secure_adapter = CacheControlAdapter(
cache=SafeFileCache(cache),
max_retries=retries,
)
else:
secure_adapter = HTTPAdapter(max_retries=retries)
# Our Insecure HTTPAdapter disables HTTPS validation. It does not
# support caching (see above) so we'll use it for all http:// URLs as
# well as any https:// host that we've marked as ignoring TLS errors
# for.
insecure_adapter = InsecureHTTPAdapter(max_retries=retries)
# Save this for later use in add_insecure_host().
self._insecure_adapter = insecure_adapter
self.mount("https://", secure_adapter)
self.mount("http://", insecure_adapter)
# Enable file:// urls
self.mount("file://", LocalFSAdapter())
for host in trusted_hosts:
self.add_trusted_host(host, suppress_logging=True)
def add_trusted_host(self, host, source=None, suppress_logging=False):
# type: (str, Optional[str], bool) -> None
"""
:param host: It is okay to provide a host that has previously been
added.
:param source: An optional source string, for logging where the host
string came from.
"""
if not suppress_logging:
msg = 'adding trusted host: {!r}'.format(host)
if source is not None:
msg += ' (from {})'.format(source)
logger.info(msg)
host_port = parse_netloc(host)
if host_port not in self.pip_trusted_origins:
self.pip_trusted_origins.append(host_port)
self.mount(build_url_from_netloc(host) + '/', self._insecure_adapter)
if not host_port[1]:
# Mount wildcard ports for the same host.
self.mount(
build_url_from_netloc(host) + ':',
self._insecure_adapter
)
def iter_secure_origins(self):
# type: () -> Iterator[SecureOrigin]
for secure_origin in SECURE_ORIGINS:
yield secure_origin
for host, port in self.pip_trusted_origins:
yield ('*', host, '*' if port is None else port)
def is_secure_origin(self, location):
# type: (Link) -> bool
# Determine if this url used a secure transport mechanism
parsed = urllib_parse.urlparse(str(location))
origin_protocol, origin_host, origin_port = (
parsed.scheme, parsed.hostname, parsed.port,
)
# The protocol to use to see if the protocol matches.
# Don't count the repository type as part of the protocol: in
# cases such as "git+ssh", only use "ssh". (I.e., Only verify against
# the last scheme.)
origin_protocol = origin_protocol.rsplit('+', 1)[-1]
# Determine if our origin is a secure origin by looking through our
# hardcoded list of secure origins, as well as any additional ones
# configured on this PackageFinder instance.
for secure_origin in self.iter_secure_origins():
secure_protocol, secure_host, secure_port = secure_origin
if origin_protocol != secure_protocol and secure_protocol != "*":
continue
try:
# We need to do this decode dance to ensure that we have a
# unicode object, even on Python 2.x.
addr = ipaddress.ip_address(
origin_host
if (
isinstance(origin_host, six.text_type) or
origin_host is None
)
else origin_host.decode("utf8")
)
network = ipaddress.ip_network(
secure_host
if isinstance(secure_host, six.text_type)
# setting secure_host to proper Union[bytes, str]
# creates problems in other places
else secure_host.decode("utf8") # type: ignore
)
except ValueError:
# We don't have both a valid address or a valid network, so
# we'll check this origin against hostnames.
if (
origin_host and
origin_host.lower() != secure_host.lower() and
secure_host != "*"
):
continue
else:
# We have a valid address and network, so see if the address
# is contained within the network.
if addr not in network:
continue
# Check to see if the port matches.
if (
origin_port != secure_port and
secure_port != "*" and
secure_port is not None
):
continue
# If we've gotten here, then this origin matches the current
# secure origin and we should return True
return True
# If we've gotten to this point, then the origin isn't secure and we
# will not accept it as a valid location to search. We will however
# log a warning that we are ignoring it.
logger.warning(
"The repository located at %s is not a trusted or secure host and "
"is being ignored. If this repository is available via HTTPS we "
"recommend you use HTTPS instead, otherwise you may silence "
"this warning and allow it anyway with '--trusted-host %s'.",
origin_host,
origin_host,
)
return False
def request(self, method, url, *args, **kwargs):
# Allow setting a default timeout on a session
kwargs.setdefault("timeout", self.timeout)
# Dispatch the actual request
return super(PipSession, self).request(method, url, *args, **kwargs)

View file

@ -32,8 +32,8 @@ if MYPY_CHECK_RUNNING:
from typing import Optional from typing import Optional
from pip._internal.distributions import AbstractDistribution from pip._internal.distributions import AbstractDistribution
from pip._internal.download import PipSession
from pip._internal.index import PackageFinder from pip._internal.index import PackageFinder
from pip._internal.network.session import PipSession
from pip._internal.req.req_install import InstallRequirement from pip._internal.req.req_install import InstallRequirement
from pip._internal.req.req_tracker import RequirementTracker from pip._internal.req.req_tracker import RequirementTracker

View file

@ -33,7 +33,7 @@ if MYPY_CHECK_RUNNING:
from pip._internal.req import InstallRequirement from pip._internal.req import InstallRequirement
from pip._internal.cache import WheelCache from pip._internal.cache import WheelCache
from pip._internal.index import PackageFinder from pip._internal.index import PackageFinder
from pip._internal.download import PipSession from pip._internal.network.session import PipSession
ReqFileLines = Iterator[Tuple[int, Text]] ReqFileLines = Iterator[Tuple[int, Text]]

View file

@ -33,7 +33,8 @@ if MYPY_CHECK_RUNNING:
import optparse import optparse
from optparse import Values from optparse import Values
from typing import Any, Dict, Text, Union from typing import Any, Dict, Text, Union
from pip._internal.download import PipSession
from pip._internal.network.session import PipSession
SELFCHECK_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ" SELFCHECK_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ"

View file

@ -14,11 +14,11 @@ import pytest
from scripttest import FoundDir, TestFileEnvironment from scripttest import FoundDir, TestFileEnvironment
from pip._internal.collector import LinkCollector from pip._internal.collector import LinkCollector
from pip._internal.download import PipSession
from pip._internal.index import PackageFinder from pip._internal.index import PackageFinder
from pip._internal.locations import get_major_minor_version from pip._internal.locations import get_major_minor_version
from pip._internal.models.search_scope import SearchScope from pip._internal.models.search_scope import SearchScope
from pip._internal.models.selection_prefs import SelectionPreferences from pip._internal.models.selection_prefs import SelectionPreferences
from pip._internal.network.session import PipSession
from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX
from pip._internal.utils.typing import MYPY_CHECK_RUNNING from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from tests.lib.path import Path, curdir from tests.lib.path import Path, curdir

View file

@ -22,9 +22,9 @@ from pip._internal.collector import (
group_locations, group_locations,
parse_links, parse_links,
) )
from pip._internal.download import PipSession
from pip._internal.models.index import PyPI from pip._internal.models.index import PyPI
from pip._internal.models.link import Link from pip._internal.models.link import Link
from pip._internal.network.session import PipSession
from tests.lib import make_test_link_collector from tests.lib import make_test_link_collector

View file

@ -1,5 +1,4 @@
import hashlib import hashlib
import logging
import os import os
import shutil import shutil
import sys import sys
@ -10,10 +9,7 @@ from tempfile import mkdtemp
import pytest import pytest
from mock import Mock, patch from mock import Mock, patch
import pip
from pip._internal.download import ( from pip._internal.download import (
CI_ENVIRONMENT_VARIABLES,
PipSession,
_copy_source_tree, _copy_source_tree,
_download_http_url, _download_http_url,
parse_content_disposition, parse_content_disposition,
@ -23,6 +19,7 @@ from pip._internal.download import (
) )
from pip._internal.exceptions import HashMismatch from pip._internal.exceptions import HashMismatch
from pip._internal.models.link import Link from pip._internal.models.link import Link
from pip._internal.network.session import PipSession
from pip._internal.utils.hashes import Hashes from pip._internal.utils.hashes import Hashes
from pip._internal.utils.urls import path_to_url from pip._internal.utils.urls import path_to_url
from tests.lib import create_file from tests.lib import create_file
@ -65,49 +62,6 @@ def test_unpack_http_url_with_urllib_response_without_content_type(data):
rmtree(temp_dir) rmtree(temp_dir)
def get_user_agent():
return PipSession().headers["User-Agent"]
def test_user_agent():
user_agent = get_user_agent()
assert user_agent.startswith("pip/%s" % pip.__version__)
@pytest.mark.parametrize('name, expected_like_ci', [
('BUILD_BUILDID', True),
('BUILD_ID', True),
('CI', True),
('PIP_IS_CI', True),
# Test a prefix substring of one of the variable names we use.
('BUILD', False),
])
def test_user_agent__ci(monkeypatch, name, expected_like_ci):
# Delete the variable names we use to check for CI to prevent the
# detection from always returning True in case the tests are being run
# under actual CI. It is okay to depend on CI_ENVIRONMENT_VARIABLES
# here (part of the code under test) because this setup step can only
# prevent false test failures. It can't cause a false test passage.
for ci_name in CI_ENVIRONMENT_VARIABLES:
monkeypatch.delenv(ci_name, raising=False)
# Confirm the baseline before setting the environment variable.
user_agent = get_user_agent()
assert '"ci":null' in user_agent
assert '"ci":true' not in user_agent
monkeypatch.setenv(name, 'true')
user_agent = get_user_agent()
assert ('"ci":true' in user_agent) == expected_like_ci
assert ('"ci":null' in user_agent) == (not expected_like_ci)
def test_user_agent_user_data(monkeypatch):
monkeypatch.setenv("PIP_USER_AGENT_USER_DATA", "some_string")
assert "some_string" in PipSession().headers["User-Agent"]
class FakeStream(object): class FakeStream(object):
def __init__(self, contents): def __init__(self, contents):
@ -498,173 +452,3 @@ def test_unpack_file_url_excludes_expected_dirs(tmpdir, exclude_dir):
assert not os.path.isfile(dst_excluded_file) assert not os.path.isfile(dst_excluded_file)
assert os.path.isfile(dst_included_file) assert os.path.isfile(dst_included_file)
assert os.path.isdir(dst_included_dir) assert os.path.isdir(dst_included_dir)
class TestPipSession:
def test_cache_defaults_off(self):
session = PipSession()
assert not hasattr(session.adapters["http://"], "cache")
assert not hasattr(session.adapters["https://"], "cache")
def test_cache_is_enabled(self, tmpdir):
session = PipSession(cache=tmpdir.joinpath("test-cache"))
assert hasattr(session.adapters["https://"], "cache")
assert (session.adapters["https://"].cache.directory ==
tmpdir.joinpath("test-cache"))
def test_http_cache_is_not_enabled(self, tmpdir):
session = PipSession(cache=tmpdir.joinpath("test-cache"))
assert not hasattr(session.adapters["http://"], "cache")
def test_insecure_host_adapter(self, tmpdir):
session = PipSession(
cache=tmpdir.joinpath("test-cache"),
trusted_hosts=["example.com"],
)
assert "https://example.com/" in session.adapters
# Check that the "port wildcard" is present.
assert "https://example.com:" in session.adapters
# Check that the cache isn't enabled.
assert not hasattr(session.adapters["https://example.com/"], "cache")
def test_add_trusted_host(self):
# Leave a gap to test how the ordering is affected.
trusted_hosts = ['host1', 'host3']
session = PipSession(trusted_hosts=trusted_hosts)
insecure_adapter = session._insecure_adapter
prefix2 = 'https://host2/'
prefix3 = 'https://host3/'
prefix3_wildcard = 'https://host3:'
# Confirm some initial conditions as a baseline.
assert session.pip_trusted_origins == [
('host1', None), ('host3', None)
]
assert session.adapters[prefix3] is insecure_adapter
assert session.adapters[prefix3_wildcard] is insecure_adapter
assert prefix2 not in session.adapters
# Test adding a new host.
session.add_trusted_host('host2')
assert session.pip_trusted_origins == [
('host1', None), ('host3', None), ('host2', None)
]
# Check that prefix3 is still present.
assert session.adapters[prefix3] is insecure_adapter
assert session.adapters[prefix2] is insecure_adapter
# Test that adding the same host doesn't create a duplicate.
session.add_trusted_host('host3')
assert session.pip_trusted_origins == [
('host1', None), ('host3', None), ('host2', None)
], 'actual: {}'.format(session.pip_trusted_origins)
session.add_trusted_host('host4:8080')
prefix4 = 'https://host4:8080/'
assert session.pip_trusted_origins == [
('host1', None), ('host3', None),
('host2', None), ('host4', 8080)
]
assert session.adapters[prefix4] is insecure_adapter
def test_add_trusted_host__logging(self, caplog):
"""
Test logging when add_trusted_host() is called.
"""
trusted_hosts = ['host0', 'host1']
session = PipSession(trusted_hosts=trusted_hosts)
with caplog.at_level(logging.INFO):
# Test adding an existing host.
session.add_trusted_host('host1', source='somewhere')
session.add_trusted_host('host2')
# Test calling add_trusted_host() on the same host twice.
session.add_trusted_host('host2')
actual = [(r.levelname, r.message) for r in caplog.records]
# Observe that "host0" isn't included in the logs.
expected = [
('INFO', "adding trusted host: 'host1' (from somewhere)"),
('INFO', "adding trusted host: 'host2'"),
('INFO', "adding trusted host: 'host2'"),
]
assert actual == expected
def test_iter_secure_origins(self):
trusted_hosts = ['host1', 'host2', 'host3:8080']
session = PipSession(trusted_hosts=trusted_hosts)
actual = list(session.iter_secure_origins())
assert len(actual) == 9
# Spot-check that SECURE_ORIGINS is included.
assert actual[0] == ('https', '*', '*')
assert actual[-3:] == [
('*', 'host1', '*'),
('*', 'host2', '*'),
('*', 'host3', 8080)
]
def test_iter_secure_origins__trusted_hosts_empty(self):
"""
Test iter_secure_origins() after passing trusted_hosts=[].
"""
session = PipSession(trusted_hosts=[])
actual = list(session.iter_secure_origins())
assert len(actual) == 6
# Spot-check that SECURE_ORIGINS is included.
assert actual[0] == ('https', '*', '*')
@pytest.mark.parametrize(
'location, trusted, expected',
[
("http://pypi.org/something", [], False),
("https://pypi.org/something", [], True),
("git+http://pypi.org/something", [], False),
("git+https://pypi.org/something", [], True),
("git+ssh://git@pypi.org/something", [], True),
("http://localhost", [], True),
("http://127.0.0.1", [], True),
("http://example.com/something/", [], False),
("http://example.com/something/", ["example.com"], True),
# Try changing the case.
("http://eXample.com/something/", ["example.cOm"], True),
# Test hosts with port.
("http://example.com:8080/something/", ["example.com"], True),
# Test a trusted_host with a port.
("http://example.com:8080/something/", ["example.com:8080"], True),
("http://example.com/something/", ["example.com:8080"], False),
(
"http://example.com:8888/something/",
["example.com:8080"],
False
),
],
)
def test_is_secure_origin(self, caplog, location, trusted, expected):
class MockLogger(object):
def __init__(self):
self.called = False
def warning(self, *args, **kwargs):
self.called = True
session = PipSession(trusted_hosts=trusted)
actual = session.is_secure_origin(location)
assert actual == expected
log_records = [(r.levelname, r.message) for r in caplog.records]
if expected:
assert not log_records
return
assert len(log_records) == 1
actual_level, actual_message = log_records[0]
assert actual_level == 'WARNING'
assert 'is not a trusted or secure host' in actual_message

View file

@ -4,7 +4,6 @@ import pytest
from pip._vendor.packaging.specifiers import SpecifierSet from pip._vendor.packaging.specifiers import SpecifierSet
from pip._internal.collector import LinkCollector from pip._internal.collector import LinkCollector
from pip._internal.download import PipSession
from pip._internal.index import ( from pip._internal.index import (
CandidateEvaluator, CandidateEvaluator,
CandidatePreferences, CandidatePreferences,
@ -21,6 +20,7 @@ from pip._internal.models.link import Link
from pip._internal.models.search_scope import SearchScope from pip._internal.models.search_scope import SearchScope
from pip._internal.models.selection_prefs import SelectionPreferences from pip._internal.models.selection_prefs import SelectionPreferences
from pip._internal.models.target_python import TargetPython from pip._internal.models.target_python import TargetPython
from pip._internal.network.session import PipSession
from pip._internal.pep425tags import get_supported from pip._internal.pep425tags import get_supported
from pip._internal.utils.hashes import Hashes from pip._internal.utils.hashes import Hashes
from tests.lib import CURRENT_PY_VERSION_INFO from tests.lib import CURRENT_PY_VERSION_INFO

View file

@ -0,0 +1,221 @@
import logging
import pytest
from pip import __version__
from pip._internal.network.session import CI_ENVIRONMENT_VARIABLES, PipSession
def get_user_agent():
return PipSession().headers["User-Agent"]
def test_user_agent():
user_agent = get_user_agent()
assert user_agent.startswith("pip/{}".format(__version__))
@pytest.mark.parametrize('name, expected_like_ci', [
('BUILD_BUILDID', True),
('BUILD_ID', True),
('CI', True),
('PIP_IS_CI', True),
# Test a prefix substring of one of the variable names we use.
('BUILD', False),
])
def test_user_agent__ci(monkeypatch, name, expected_like_ci):
# Delete the variable names we use to check for CI to prevent the
# detection from always returning True in case the tests are being run
# under actual CI. It is okay to depend on CI_ENVIRONMENT_VARIABLES
# here (part of the code under test) because this setup step can only
# prevent false test failures. It can't cause a false test passage.
for ci_name in CI_ENVIRONMENT_VARIABLES:
monkeypatch.delenv(ci_name, raising=False)
# Confirm the baseline before setting the environment variable.
user_agent = get_user_agent()
assert '"ci":null' in user_agent
assert '"ci":true' not in user_agent
monkeypatch.setenv(name, 'true')
user_agent = get_user_agent()
assert ('"ci":true' in user_agent) == expected_like_ci
assert ('"ci":null' in user_agent) == (not expected_like_ci)
def test_user_agent_user_data(monkeypatch):
monkeypatch.setenv("PIP_USER_AGENT_USER_DATA", "some_string")
assert "some_string" in PipSession().headers["User-Agent"]
class TestPipSession:
def test_cache_defaults_off(self):
session = PipSession()
assert not hasattr(session.adapters["http://"], "cache")
assert not hasattr(session.adapters["https://"], "cache")
def test_cache_is_enabled(self, tmpdir):
cache_directory = tmpdir.joinpath("test-cache")
session = PipSession(cache=cache_directory)
assert hasattr(session.adapters["https://"], "cache")
assert (
session.adapters["https://"].cache.directory == cache_directory
)
def test_http_cache_is_not_enabled(self, tmpdir):
session = PipSession(cache=tmpdir.joinpath("test-cache"))
assert not hasattr(session.adapters["http://"], "cache")
def test_insecure_host_adapter(self, tmpdir):
session = PipSession(
cache=tmpdir.joinpath("test-cache"),
trusted_hosts=["example.com"],
)
assert "https://example.com/" in session.adapters
# Check that the "port wildcard" is present.
assert "https://example.com:" in session.adapters
# Check that the cache isn't enabled.
assert not hasattr(session.adapters["https://example.com/"], "cache")
def test_add_trusted_host(self):
# Leave a gap to test how the ordering is affected.
trusted_hosts = ['host1', 'host3']
session = PipSession(trusted_hosts=trusted_hosts)
insecure_adapter = session._insecure_adapter
prefix2 = 'https://host2/'
prefix3 = 'https://host3/'
prefix3_wildcard = 'https://host3:'
# Confirm some initial conditions as a baseline.
assert session.pip_trusted_origins == [
('host1', None), ('host3', None)
]
assert session.adapters[prefix3] is insecure_adapter
assert session.adapters[prefix3_wildcard] is insecure_adapter
assert prefix2 not in session.adapters
# Test adding a new host.
session.add_trusted_host('host2')
assert session.pip_trusted_origins == [
('host1', None), ('host3', None), ('host2', None)
]
# Check that prefix3 is still present.
assert session.adapters[prefix3] is insecure_adapter
assert session.adapters[prefix2] is insecure_adapter
# Test that adding the same host doesn't create a duplicate.
session.add_trusted_host('host3')
assert session.pip_trusted_origins == [
('host1', None), ('host3', None), ('host2', None)
], 'actual: {}'.format(session.pip_trusted_origins)
session.add_trusted_host('host4:8080')
prefix4 = 'https://host4:8080/'
assert session.pip_trusted_origins == [
('host1', None), ('host3', None),
('host2', None), ('host4', 8080)
]
assert session.adapters[prefix4] is insecure_adapter
def test_add_trusted_host__logging(self, caplog):
"""
Test logging when add_trusted_host() is called.
"""
trusted_hosts = ['host0', 'host1']
session = PipSession(trusted_hosts=trusted_hosts)
with caplog.at_level(logging.INFO):
# Test adding an existing host.
session.add_trusted_host('host1', source='somewhere')
session.add_trusted_host('host2')
# Test calling add_trusted_host() on the same host twice.
session.add_trusted_host('host2')
actual = [(r.levelname, r.message) for r in caplog.records]
# Observe that "host0" isn't included in the logs.
expected = [
('INFO', "adding trusted host: 'host1' (from somewhere)"),
('INFO', "adding trusted host: 'host2'"),
('INFO', "adding trusted host: 'host2'"),
]
assert actual == expected
def test_iter_secure_origins(self):
trusted_hosts = ['host1', 'host2', 'host3:8080']
session = PipSession(trusted_hosts=trusted_hosts)
actual = list(session.iter_secure_origins())
assert len(actual) == 9
# Spot-check that SECURE_ORIGINS is included.
assert actual[0] == ('https', '*', '*')
assert actual[-3:] == [
('*', 'host1', '*'),
('*', 'host2', '*'),
('*', 'host3', 8080)
]
def test_iter_secure_origins__trusted_hosts_empty(self):
"""
Test iter_secure_origins() after passing trusted_hosts=[].
"""
session = PipSession(trusted_hosts=[])
actual = list(session.iter_secure_origins())
assert len(actual) == 6
# Spot-check that SECURE_ORIGINS is included.
assert actual[0] == ('https', '*', '*')
@pytest.mark.parametrize(
'location, trusted, expected',
[
("http://pypi.org/something", [], False),
("https://pypi.org/something", [], True),
("git+http://pypi.org/something", [], False),
("git+https://pypi.org/something", [], True),
("git+ssh://git@pypi.org/something", [], True),
("http://localhost", [], True),
("http://127.0.0.1", [], True),
("http://example.com/something/", [], False),
("http://example.com/something/", ["example.com"], True),
# Try changing the case.
("http://eXample.com/something/", ["example.cOm"], True),
# Test hosts with port.
("http://example.com:8080/something/", ["example.com"], True),
# Test a trusted_host with a port.
("http://example.com:8080/something/", ["example.com:8080"], True),
("http://example.com/something/", ["example.com:8080"], False),
(
"http://example.com:8888/something/",
["example.com:8080"],
False
),
],
)
def test_is_secure_origin(self, caplog, location, trusted, expected):
class MockLogger(object):
def __init__(self):
self.called = False
def warning(self, *args, **kwargs):
self.called = True
session = PipSession(trusted_hosts=trusted)
actual = session.is_secure_origin(location)
assert actual == expected
log_records = [(r.levelname, r.message) for r in caplog.records]
if expected:
assert not log_records
return
assert len(log_records) == 1
actual_level, actual_message = log_records[0]
assert actual_level == 'WARNING'
assert 'is not a trusted or secure host' in actual_message

View file

@ -11,7 +11,6 @@ from pip._vendor.packaging.markers import Marker
from pip._vendor.packaging.requirements import Requirement from pip._vendor.packaging.requirements import Requirement
from pip._internal.commands import create_command from pip._internal.commands import create_command
from pip._internal.download import PipSession
from pip._internal.exceptions import ( from pip._internal.exceptions import (
HashErrors, HashErrors,
InstallationError, InstallationError,
@ -19,6 +18,7 @@ from pip._internal.exceptions import (
PreviousBuildDirError, PreviousBuildDirError,
) )
from pip._internal.legacy_resolve import Resolver from pip._internal.legacy_resolve import Resolver
from pip._internal.network.session import PipSession
from pip._internal.operations.prepare import RequirementPreparer from pip._internal.operations.prepare import RequirementPreparer
from pip._internal.req import InstallRequirement, RequirementSet from pip._internal.req import InstallRequirement, RequirementSet
from pip._internal.req.constructors import ( from pip._internal.req.constructors import (

View file

@ -8,12 +8,12 @@ from mock import Mock, patch
from pretend import stub from pretend import stub
import pip._internal.index import pip._internal.index
from pip._internal.download import PipSession
from pip._internal.exceptions import ( from pip._internal.exceptions import (
InstallationError, InstallationError,
RequirementsFileParseError, RequirementsFileParseError,
) )
from pip._internal.models.format_control import FormatControl from pip._internal.models.format_control import FormatControl
from pip._internal.network.session import PipSession
from pip._internal.req.constructors import ( from pip._internal.req.constructors import (
install_req_from_editable, install_req_from_editable,
install_req_from_line, install_req_from_line,

View file

@ -9,8 +9,8 @@ import pytest
from mock import patch from mock import patch
from pip._vendor import pkg_resources from pip._vendor import pkg_resources
from pip._internal.download import PipSession
from pip._internal.index import InstallationCandidate from pip._internal.index import InstallationCandidate
from pip._internal.network.session import PipSession
from pip._internal.utils import outdated from pip._internal.utils import outdated
from pip._internal.utils.outdated import ( from pip._internal.utils.outdated import (
SelfCheckState, SelfCheckState,