1
1
Fork 0
mirror of https://github.com/pypa/pip synced 2023-12-13 21:30:23 +01:00

Support including port part in trusted-host (#6909)

This commit is contained in:
Frost Ming 2019-08-26 07:26:01 +08:00 committed by Chris Jerdonek
parent d2b7082932
commit 8ac22141c2
8 changed files with 111 additions and 43 deletions

1
news/6886.feature Normal file
View file

@ -0,0 +1 @@
Support including a port number in ``--trusted-host`` for both HTTP and HTTPS.

View file

@ -391,8 +391,8 @@ def trusted_host():
action="append", action="append",
metavar="HOSTNAME", metavar="HOSTNAME",
default=[], default=[],
help="Mark this host as trusted, even though it does not have valid " help="Mark this host or host:port pair as trusted, even though it "
"or any HTTPS.", "does not have valid or any HTTPS.",
) )

View file

@ -50,7 +50,7 @@ from pip._internal.utils.misc import (
format_size, format_size,
get_installed_version, get_installed_version,
hide_url, hide_url,
netloc_has_port, parse_netloc,
path_to_display, path_to_display,
path_to_url, path_to_url,
remove_auth_from_url, remove_auth_from_url,
@ -77,7 +77,7 @@ if MYPY_CHECK_RUNNING:
from pip._internal.vcs.versioncontrol import AuthInfo, VersionControl from pip._internal.vcs.versioncontrol import AuthInfo, VersionControl
Credentials = Tuple[str, str, str] Credentials = Tuple[str, str, str]
SecureOrigin = Tuple[str, str, Optional[str]] SecureOrigin = Tuple[str, str, Optional[Union[int, str]]]
if PY2: if PY2:
CopytreeKwargs = TypedDict( CopytreeKwargs = TypedDict(
@ -586,7 +586,7 @@ class PipSession(requests.Session):
# Namespace the attribute with "pip_" just in case to prevent # Namespace the attribute with "pip_" just in case to prevent
# possible conflicts with the base class. # possible conflicts with the base class.
self.pip_trusted_hosts = [] # type: List[str] self.pip_trusted_origins = [] # type: List[Tuple[str, Optional[int]]]
# Attach our User Agent to the request # Attach our User Agent to the request
self.headers["User-Agent"] = user_agent() self.headers["User-Agent"] = user_agent()
@ -670,11 +670,12 @@ class PipSession(requests.Session):
msg += ' (from {})'.format(source) msg += ' (from {})'.format(source)
logger.info(msg) logger.info(msg)
if host not in self.pip_trusted_hosts: host_port = parse_netloc(host)
self.pip_trusted_hosts.append(host) if host_port not in self.pip_trusted_origins:
self.pip_trusted_origins.append(host_port)
self.mount(build_url_from_netloc(host) + '/', self._insecure_adapter) self.mount(build_url_from_netloc(host) + '/', self._insecure_adapter)
if not netloc_has_port(host): if not host_port[1]:
# Mount wildcard ports for the same host. # Mount wildcard ports for the same host.
self.mount( self.mount(
build_url_from_netloc(host) + ':', build_url_from_netloc(host) + ':',
@ -685,8 +686,8 @@ class PipSession(requests.Session):
# type: () -> Iterator[SecureOrigin] # type: () -> Iterator[SecureOrigin]
for secure_origin in SECURE_ORIGINS: for secure_origin in SECURE_ORIGINS:
yield secure_origin yield secure_origin
for host in self.pip_trusted_hosts: for host, port in self.pip_trusted_origins:
yield ('*', host, '*') yield ('*', host, '*' if port is None else port)
def is_secure_origin(self, location): def is_secure_origin(self, location):
# type: (Link) -> bool # type: (Link) -> bool

View file

@ -38,6 +38,7 @@ from pip._internal.utils.misc import (
ARCHIVE_EXTENSIONS, ARCHIVE_EXTENSIONS,
SUPPORTED_EXTENSIONS, SUPPORTED_EXTENSIONS,
WHEEL_EXTENSION, WHEEL_EXTENSION,
build_netloc,
path_to_url, path_to_url,
redact_password_from_url, redact_password_from_url,
) )
@ -947,7 +948,8 @@ class PackageFinder(object):
@property @property
def trusted_hosts(self): def trusted_hosts(self):
# type: () -> Iterable[str] # type: () -> Iterable[str]
return iter(self.session.pip_trusted_hosts) for host_port in self.session.pip_trusted_origins:
yield build_netloc(*host_port)
@property @property
def allow_all_prereleases(self): def allow_all_prereleases(self):

View file

@ -1129,6 +1129,19 @@ def path_to_url(path):
return url return url
def build_netloc(host, port):
# type: (str, Optional[int]) -> str
"""
Build a netloc from a host-port pair
"""
if port is None:
return host
if ':' in host:
# Only wrap host with square brackets when it is IPv6
host = '[{}]'.format(host)
return '{}:{}'.format(host, port)
def build_url_from_netloc(netloc, scheme='https'): def build_url_from_netloc(netloc, scheme='https'):
# type: (str, str) -> str # type: (str, str) -> str
""" """
@ -1140,14 +1153,14 @@ def build_url_from_netloc(netloc, scheme='https'):
return '{}://{}'.format(scheme, netloc) return '{}://{}'.format(scheme, netloc)
def netloc_has_port(netloc): def parse_netloc(netloc):
# type: (str) -> bool # type: (str) -> Tuple[str, Optional[int]]
""" """
Return whether the netloc has a port part. Return the host-port pair from a netloc.
""" """
url = build_url_from_netloc(netloc) url = build_url_from_netloc(netloc)
parsed = urllib_parse.urlparse(url) parsed = urllib_parse.urlparse(url)
return bool(parsed.port) return parsed.hostname, parsed.port
def split_auth_from_netloc(netloc): def split_auth_from_netloc(netloc):

View file

@ -634,24 +634,39 @@ class TestPipSession:
insecure_adapter = session._insecure_adapter insecure_adapter = session._insecure_adapter
prefix2 = 'https://host2/' prefix2 = 'https://host2/'
prefix3 = 'https://host3/' prefix3 = 'https://host3/'
prefix3_wildcard = 'https://host3:'
# Confirm some initial conditions as a baseline. # Confirm some initial conditions as a baseline.
assert session.pip_trusted_hosts == ['host1', 'host3'] assert session.pip_trusted_origins == [
('host1', None), ('host3', None)
]
assert session.adapters[prefix3] is insecure_adapter assert session.adapters[prefix3] is insecure_adapter
assert session.adapters[prefix3_wildcard] is insecure_adapter
assert prefix2 not in session.adapters assert prefix2 not in session.adapters
# Test adding a new host. # Test adding a new host.
session.add_trusted_host('host2') session.add_trusted_host('host2')
assert session.pip_trusted_hosts == ['host1', 'host3', 'host2'] assert session.pip_trusted_origins == [
('host1', None), ('host3', None), ('host2', None)
]
# Check that prefix3 is still present. # Check that prefix3 is still present.
assert session.adapters[prefix3] is insecure_adapter assert session.adapters[prefix3] is insecure_adapter
assert session.adapters[prefix2] is insecure_adapter assert session.adapters[prefix2] is insecure_adapter
# Test that adding the same host doesn't create a duplicate. # Test that adding the same host doesn't create a duplicate.
session.add_trusted_host('host3') session.add_trusted_host('host3')
assert session.pip_trusted_hosts == ['host1', 'host3', 'host2'], ( assert session.pip_trusted_origins == [
'actual: {}'.format(session.pip_trusted_hosts) ('host1', None), ('host3', None), ('host2', None)
) ], 'actual: {}'.format(session.pip_trusted_origins)
session.add_trusted_host('host4:8080')
prefix4 = 'https://host4:8080/'
assert session.pip_trusted_origins == [
('host1', None), ('host3', None),
('host2', None), ('host4', 8080)
]
assert session.adapters[prefix4] is insecure_adapter
def test_add_trusted_host__logging(self, caplog): def test_add_trusted_host__logging(self, caplog):
""" """
@ -676,16 +691,17 @@ class TestPipSession:
assert actual == expected assert actual == expected
def test_iter_secure_origins(self): def test_iter_secure_origins(self):
trusted_hosts = ['host1', 'host2'] trusted_hosts = ['host1', 'host2', 'host3:8080']
session = PipSession(trusted_hosts=trusted_hosts) session = PipSession(trusted_hosts=trusted_hosts)
actual = list(session.iter_secure_origins()) actual = list(session.iter_secure_origins())
assert len(actual) == 8 assert len(actual) == 9
# Spot-check that SECURE_ORIGINS is included. # Spot-check that SECURE_ORIGINS is included.
assert actual[0] == ('https', '*', '*') assert actual[0] == ('https', '*', '*')
assert actual[-2:] == [ assert actual[-3:] == [
('*', 'host1', '*'), ('*', 'host1', '*'),
('*', 'host2', '*'), ('*', 'host2', '*'),
('*', 'host3', 8080)
] ]
def test_iter_secure_origins__trusted_hosts_empty(self): def test_iter_secure_origins__trusted_hosts_empty(self):
@ -713,6 +729,16 @@ class TestPipSession:
("http://example.com/something/", ["example.com"], True), ("http://example.com/something/", ["example.com"], True),
# Try changing the case. # Try changing the case.
("http://eXample.com/something/", ["example.cOm"], True), ("http://eXample.com/something/", ["example.cOm"], True),
# Test hosts with port.
("http://example.com:8080/something/", ["example.com"], True),
# Test a trusted_host with a port.
("http://example.com:8080/something/", ["example.com:8080"], True),
("http://example.com/something/", ["example.com:8080"], False),
(
"http://example.com:8888/something/",
["example.com:8080"],
False
),
], ],
) )
def test_is_secure_origin(self, caplog, location, trusted, expected): def test_is_secure_origin(self, caplog, location, trusted, expected):

View file

@ -345,19 +345,23 @@ class TestProcessLine(object):
def test_set_finder_trusted_host(self, caplog, session, finder): def test_set_finder_trusted_host(self, caplog, session, finder):
with caplog.at_level(logging.INFO): with caplog.at_level(logging.INFO):
list(process_line( list(process_line(
"--trusted-host=host", "file.txt", 1, finder=finder, "--trusted-host=host1 --trusted-host=host2:8080",
session=session, "file.txt", 1, finder=finder, session=session,
)) ))
assert list(finder.trusted_hosts) == ['host'] assert list(finder.trusted_hosts) == ['host1', 'host2:8080']
session = finder.session session = finder.session
assert session.adapters['https://host/'] is session._insecure_adapter assert session.adapters['https://host1/'] is session._insecure_adapter
assert (
session.adapters['https://host2:8080/']
is session._insecure_adapter
)
# Test the log message. # Test the log message.
actual = [(r.levelname, r.message) for r in caplog.records] actual = [(r.levelname, r.message) for r in caplog.records]
expected = [ expected = (
('INFO', "adding trusted host: 'host' (from line 1 of file.txt)"), 'INFO', "adding trusted host: 'host1' (from line 1 of file.txt)"
] )
assert actual == expected assert expected in actual
def test_noop_always_unzip(self, finder): def test_noop_always_unzip(self, finder):
# noop, but confirm it can be set # noop, but confirm it can be set

View file

@ -38,6 +38,7 @@ from pip._internal.utils.glibc import (
from pip._internal.utils.hashes import Hashes, MissingHashes from pip._internal.utils.hashes import Hashes, MissingHashes
from pip._internal.utils.misc import ( from pip._internal.utils.misc import (
HiddenText, HiddenText,
build_netloc,
build_url_from_netloc, build_url_from_netloc,
call_subprocess, call_subprocess,
egg_link_path, egg_link_path,
@ -49,9 +50,9 @@ from pip._internal.utils.misc import (
hide_value, hide_value,
make_command, make_command,
make_subprocess_output_error, make_subprocess_output_error,
netloc_has_port,
normalize_path, normalize_path,
normalize_version_info, normalize_version_info,
parse_netloc,
path_to_display, path_to_display,
path_to_url, path_to_url,
redact_netloc, redact_netloc,
@ -1232,29 +1233,49 @@ def test_path_to_url_win():
assert path_to_url('file') == 'file:' + urllib_request.pathname2url(path) assert path_to_url('file') == 'file:' + urllib_request.pathname2url(path)
@pytest.mark.parametrize('netloc, expected_url, expected_has_port', [ @pytest.mark.parametrize('host_port, expected_netloc', [
# Test domain name. # Test domain name.
('example.com', 'https://example.com', False), (('example.com', None), 'example.com'),
('example.com:5000', 'https://example.com:5000', True), (('example.com', 5000), 'example.com:5000'),
# Test IPv4 address. # Test IPv4 address.
('127.0.0.1', 'https://127.0.0.1', False), (('127.0.0.1', None), '127.0.0.1'),
('127.0.0.1:5000', 'https://127.0.0.1:5000', True), (('127.0.0.1', 5000), '127.0.0.1:5000'),
# Test bare IPv6 address. # Test bare IPv6 address.
('2001:DB6::1', 'https://[2001:DB6::1]', False), (('2001:db6::1', None), '2001:db6::1'),
# Test IPv6 with port. # Test IPv6 with port.
('[2001:DB6::1]:5000', 'https://[2001:DB6::1]:5000', True), (('2001:db6::1', 5000), '[2001:db6::1]:5000'),
])
def test_build_netloc(host_port, expected_netloc):
assert build_netloc(*host_port) == expected_netloc
@pytest.mark.parametrize('netloc, expected_url, expected_host_port', [
# Test domain name.
('example.com', 'https://example.com', ('example.com', None)),
('example.com:5000', 'https://example.com:5000', ('example.com', 5000)),
# Test IPv4 address.
('127.0.0.1', 'https://127.0.0.1', ('127.0.0.1', None)),
('127.0.0.1:5000', 'https://127.0.0.1:5000', ('127.0.0.1', 5000)),
# Test bare IPv6 address.
('2001:db6::1', 'https://[2001:db6::1]', ('2001:db6::1', None)),
# Test IPv6 with port.
(
'[2001:db6::1]:5000',
'https://[2001:db6::1]:5000',
('2001:db6::1', 5000)
),
# Test netloc with auth. # Test netloc with auth.
( (
'user:password@localhost:5000', 'user:password@localhost:5000',
'https://user:password@localhost:5000', 'https://user:password@localhost:5000',
True ('localhost', 5000)
) )
]) ])
def test_build_url_from_netloc_and_netloc_has_port( def test_build_url_from_netloc_and_parse_netloc(
netloc, expected_url, expected_has_port, netloc, expected_url, expected_host_port,
): ):
assert build_url_from_netloc(netloc) == expected_url assert build_url_from_netloc(netloc) == expected_url
assert netloc_has_port(netloc) is expected_has_port assert parse_netloc(netloc) == expected_host_port
@pytest.mark.parametrize('netloc, expected', [ @pytest.mark.parametrize('netloc, expected', [