diff --git a/news/6886.feature b/news/6886.feature new file mode 100644 index 000000000..b4f500b1b --- /dev/null +++ b/news/6886.feature @@ -0,0 +1 @@ +Support including a port number in ``--trusted-host`` for both HTTP and HTTPS. diff --git a/src/pip/_internal/cli/cmdoptions.py b/src/pip/_internal/cli/cmdoptions.py index 74efaecf9..1bfecef54 100644 --- a/src/pip/_internal/cli/cmdoptions.py +++ b/src/pip/_internal/cli/cmdoptions.py @@ -391,8 +391,8 @@ def trusted_host(): action="append", metavar="HOSTNAME", default=[], - help="Mark this host as trusted, even though it does not have valid " - "or any HTTPS.", + help="Mark this host or host:port pair as trusted, even though it " + "does not have valid or any HTTPS.", ) diff --git a/src/pip/_internal/download.py b/src/pip/_internal/download.py index 72d4cfc12..b681ccb06 100644 --- a/src/pip/_internal/download.py +++ b/src/pip/_internal/download.py @@ -50,7 +50,7 @@ from pip._internal.utils.misc import ( format_size, get_installed_version, hide_url, - netloc_has_port, + parse_netloc, path_to_display, path_to_url, remove_auth_from_url, @@ -77,7 +77,7 @@ if MYPY_CHECK_RUNNING: from pip._internal.vcs.versioncontrol import AuthInfo, VersionControl Credentials = Tuple[str, str, str] - SecureOrigin = Tuple[str, str, Optional[str]] + SecureOrigin = Tuple[str, str, Optional[Union[int, str]]] if PY2: CopytreeKwargs = TypedDict( @@ -586,7 +586,7 @@ class PipSession(requests.Session): # Namespace the attribute with "pip_" just in case to prevent # possible conflicts with the base class. - self.pip_trusted_hosts = [] # type: List[str] + self.pip_trusted_origins = [] # type: List[Tuple[str, Optional[int]]] # Attach our User Agent to the request self.headers["User-Agent"] = user_agent() @@ -670,11 +670,12 @@ class PipSession(requests.Session): msg += ' (from {})'.format(source) logger.info(msg) - if host not in self.pip_trusted_hosts: - self.pip_trusted_hosts.append(host) + host_port = parse_netloc(host) + if host_port not in self.pip_trusted_origins: + self.pip_trusted_origins.append(host_port) self.mount(build_url_from_netloc(host) + '/', self._insecure_adapter) - if not netloc_has_port(host): + if not host_port[1]: # Mount wildcard ports for the same host. self.mount( build_url_from_netloc(host) + ':', @@ -685,8 +686,8 @@ class PipSession(requests.Session): # type: () -> Iterator[SecureOrigin] for secure_origin in SECURE_ORIGINS: yield secure_origin - for host in self.pip_trusted_hosts: - yield ('*', host, '*') + for host, port in self.pip_trusted_origins: + yield ('*', host, '*' if port is None else port) def is_secure_origin(self, location): # type: (Link) -> bool diff --git a/src/pip/_internal/index.py b/src/pip/_internal/index.py index 71331861e..93c198433 100644 --- a/src/pip/_internal/index.py +++ b/src/pip/_internal/index.py @@ -38,6 +38,7 @@ from pip._internal.utils.misc import ( ARCHIVE_EXTENSIONS, SUPPORTED_EXTENSIONS, WHEEL_EXTENSION, + build_netloc, path_to_url, redact_password_from_url, ) @@ -947,7 +948,8 @@ class PackageFinder(object): @property def trusted_hosts(self): # type: () -> Iterable[str] - return iter(self.session.pip_trusted_hosts) + for host_port in self.session.pip_trusted_origins: + yield build_netloc(*host_port) @property def allow_all_prereleases(self): diff --git a/src/pip/_internal/utils/misc.py b/src/pip/_internal/utils/misc.py index f6da2a5ab..d19df9459 100644 --- a/src/pip/_internal/utils/misc.py +++ b/src/pip/_internal/utils/misc.py @@ -1129,6 +1129,19 @@ def path_to_url(path): return url +def build_netloc(host, port): + # type: (str, Optional[int]) -> str + """ + Build a netloc from a host-port pair + """ + if port is None: + return host + if ':' in host: + # Only wrap host with square brackets when it is IPv6 + host = '[{}]'.format(host) + return '{}:{}'.format(host, port) + + def build_url_from_netloc(netloc, scheme='https'): # type: (str, str) -> str """ @@ -1140,14 +1153,14 @@ def build_url_from_netloc(netloc, scheme='https'): return '{}://{}'.format(scheme, netloc) -def netloc_has_port(netloc): - # type: (str) -> bool +def parse_netloc(netloc): + # type: (str) -> Tuple[str, Optional[int]] """ - Return whether the netloc has a port part. + Return the host-port pair from a netloc. """ url = build_url_from_netloc(netloc) parsed = urllib_parse.urlparse(url) - return bool(parsed.port) + return parsed.hostname, parsed.port def split_auth_from_netloc(netloc): diff --git a/tests/unit/test_download.py b/tests/unit/test_download.py index 42265f327..9c0ccc2cf 100644 --- a/tests/unit/test_download.py +++ b/tests/unit/test_download.py @@ -634,24 +634,39 @@ class TestPipSession: insecure_adapter = session._insecure_adapter prefix2 = 'https://host2/' prefix3 = 'https://host3/' + prefix3_wildcard = 'https://host3:' # Confirm some initial conditions as a baseline. - assert session.pip_trusted_hosts == ['host1', 'host3'] + assert session.pip_trusted_origins == [ + ('host1', None), ('host3', None) + ] assert session.adapters[prefix3] is insecure_adapter + assert session.adapters[prefix3_wildcard] is insecure_adapter + assert prefix2 not in session.adapters # Test adding a new host. session.add_trusted_host('host2') - assert session.pip_trusted_hosts == ['host1', 'host3', 'host2'] + assert session.pip_trusted_origins == [ + ('host1', None), ('host3', None), ('host2', None) + ] # Check that prefix3 is still present. assert session.adapters[prefix3] is insecure_adapter assert session.adapters[prefix2] is insecure_adapter # Test that adding the same host doesn't create a duplicate. session.add_trusted_host('host3') - assert session.pip_trusted_hosts == ['host1', 'host3', 'host2'], ( - 'actual: {}'.format(session.pip_trusted_hosts) - ) + assert session.pip_trusted_origins == [ + ('host1', None), ('host3', None), ('host2', None) + ], 'actual: {}'.format(session.pip_trusted_origins) + + session.add_trusted_host('host4:8080') + prefix4 = 'https://host4:8080/' + assert session.pip_trusted_origins == [ + ('host1', None), ('host3', None), + ('host2', None), ('host4', 8080) + ] + assert session.adapters[prefix4] is insecure_adapter def test_add_trusted_host__logging(self, caplog): """ @@ -676,16 +691,17 @@ class TestPipSession: assert actual == expected def test_iter_secure_origins(self): - trusted_hosts = ['host1', 'host2'] + trusted_hosts = ['host1', 'host2', 'host3:8080'] session = PipSession(trusted_hosts=trusted_hosts) actual = list(session.iter_secure_origins()) - assert len(actual) == 8 + assert len(actual) == 9 # Spot-check that SECURE_ORIGINS is included. assert actual[0] == ('https', '*', '*') - assert actual[-2:] == [ + assert actual[-3:] == [ ('*', 'host1', '*'), ('*', 'host2', '*'), + ('*', 'host3', 8080) ] def test_iter_secure_origins__trusted_hosts_empty(self): @@ -713,6 +729,16 @@ class TestPipSession: ("http://example.com/something/", ["example.com"], True), # Try changing the case. ("http://eXample.com/something/", ["example.cOm"], True), + # Test hosts with port. + ("http://example.com:8080/something/", ["example.com"], True), + # Test a trusted_host with a port. + ("http://example.com:8080/something/", ["example.com:8080"], True), + ("http://example.com/something/", ["example.com:8080"], False), + ( + "http://example.com:8888/something/", + ["example.com:8080"], + False + ), ], ) def test_is_secure_origin(self, caplog, location, trusted, expected): diff --git a/tests/unit/test_req_file.py b/tests/unit/test_req_file.py index a11532707..443a76054 100644 --- a/tests/unit/test_req_file.py +++ b/tests/unit/test_req_file.py @@ -345,19 +345,23 @@ class TestProcessLine(object): def test_set_finder_trusted_host(self, caplog, session, finder): with caplog.at_level(logging.INFO): list(process_line( - "--trusted-host=host", "file.txt", 1, finder=finder, - session=session, + "--trusted-host=host1 --trusted-host=host2:8080", + "file.txt", 1, finder=finder, session=session, )) - assert list(finder.trusted_hosts) == ['host'] + assert list(finder.trusted_hosts) == ['host1', 'host2:8080'] session = finder.session - assert session.adapters['https://host/'] is session._insecure_adapter + assert session.adapters['https://host1/'] is session._insecure_adapter + assert ( + session.adapters['https://host2:8080/'] + is session._insecure_adapter + ) # Test the log message. actual = [(r.levelname, r.message) for r in caplog.records] - expected = [ - ('INFO', "adding trusted host: 'host' (from line 1 of file.txt)"), - ] - assert actual == expected + expected = ( + 'INFO', "adding trusted host: 'host1' (from line 1 of file.txt)" + ) + assert expected in actual def test_noop_always_unzip(self, finder): # noop, but confirm it can be set diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index ea252ceb2..51cf2a23a 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -38,6 +38,7 @@ from pip._internal.utils.glibc import ( from pip._internal.utils.hashes import Hashes, MissingHashes from pip._internal.utils.misc import ( HiddenText, + build_netloc, build_url_from_netloc, call_subprocess, egg_link_path, @@ -49,9 +50,9 @@ from pip._internal.utils.misc import ( hide_value, make_command, make_subprocess_output_error, - netloc_has_port, normalize_path, normalize_version_info, + parse_netloc, path_to_display, path_to_url, redact_netloc, @@ -1232,29 +1233,49 @@ def test_path_to_url_win(): assert path_to_url('file') == 'file:' + urllib_request.pathname2url(path) -@pytest.mark.parametrize('netloc, expected_url, expected_has_port', [ +@pytest.mark.parametrize('host_port, expected_netloc', [ # Test domain name. - ('example.com', 'https://example.com', False), - ('example.com:5000', 'https://example.com:5000', True), + (('example.com', None), 'example.com'), + (('example.com', 5000), 'example.com:5000'), # Test IPv4 address. - ('127.0.0.1', 'https://127.0.0.1', False), - ('127.0.0.1:5000', 'https://127.0.0.1:5000', True), + (('127.0.0.1', None), '127.0.0.1'), + (('127.0.0.1', 5000), '127.0.0.1:5000'), # Test bare IPv6 address. - ('2001:DB6::1', 'https://[2001:DB6::1]', False), + (('2001:db6::1', None), '2001:db6::1'), # Test IPv6 with port. - ('[2001:DB6::1]:5000', 'https://[2001:DB6::1]:5000', True), + (('2001:db6::1', 5000), '[2001:db6::1]:5000'), +]) +def test_build_netloc(host_port, expected_netloc): + assert build_netloc(*host_port) == expected_netloc + + +@pytest.mark.parametrize('netloc, expected_url, expected_host_port', [ + # Test domain name. + ('example.com', 'https://example.com', ('example.com', None)), + ('example.com:5000', 'https://example.com:5000', ('example.com', 5000)), + # Test IPv4 address. + ('127.0.0.1', 'https://127.0.0.1', ('127.0.0.1', None)), + ('127.0.0.1:5000', 'https://127.0.0.1:5000', ('127.0.0.1', 5000)), + # Test bare IPv6 address. + ('2001:db6::1', 'https://[2001:db6::1]', ('2001:db6::1', None)), + # Test IPv6 with port. + ( + '[2001:db6::1]:5000', + 'https://[2001:db6::1]:5000', + ('2001:db6::1', 5000) + ), # Test netloc with auth. ( 'user:password@localhost:5000', 'https://user:password@localhost:5000', - True + ('localhost', 5000) ) ]) -def test_build_url_from_netloc_and_netloc_has_port( - netloc, expected_url, expected_has_port, +def test_build_url_from_netloc_and_parse_netloc( + netloc, expected_url, expected_host_port, ): assert build_url_from_netloc(netloc) == expected_url - assert netloc_has_port(netloc) is expected_has_port + assert parse_netloc(netloc) == expected_host_port @pytest.mark.parametrize('netloc, expected', [