import logging import os.path from textwrap import dedent import mock import pytest from mock import Mock, patch from pip._vendor import html5lib, requests from pip._vendor.six.moves.urllib import request as urllib_request from pip._internal.collector import ( HTMLPage, _clean_link, _determine_base_url, _get_html_page, _get_html_response, _NotHTML, _NotHTTP, group_locations, ) from import PipSession from pip._internal.models.index import PyPI from import Link from tests.lib import make_test_link_collector @pytest.mark.parametrize( "url", [ "", "file:///opt/data/pip-18.0.tar.gz", ], ) def test_get_html_response_archive_to_naive_scheme(url): """ `_get_html_response()` should error on an archive-like URL if the scheme does not allow "poking" without getting data. """ with pytest.raises(_NotHTTP): _get_html_response(url, session=mock.Mock(PipSession)) @pytest.mark.parametrize( "url, content_type", [ ("", "application/zip"), ("", "application/gzip"), ], ) def test_get_html_response_archive_to_http_scheme(url, content_type): """ `_get_html_response()` should send a HEAD request on an archive-like URL if the scheme supports it, and raise `_NotHTML` if the response isn't HTML. """ session = mock.Mock(PipSession) session.head.return_value = mock.Mock(**{ "request.method": "HEAD", "headers": {"Content-Type": content_type}, }) with pytest.raises(_NotHTML) as ctx: _get_html_response(url, session=session) session.assert_has_calls([, allow_redirects=True), ]) assert ctx.value.args == (content_type, "HEAD") @pytest.mark.parametrize( "url", [ "", "", ], ) def test_get_html_response_archive_to_http_scheme_is_html(url): """ `_get_html_response()` should work with archive-like URLs if the HEAD request is responded with text/html. """ session = mock.Mock(PipSession) session.head.return_value = mock.Mock(**{ "request.method": "HEAD", "headers": {"Content-Type": "text/html"}, }) session.get.return_value = mock.Mock(headers={"Content-Type": "text/html"}) resp = _get_html_response(url, session=session) assert resp is not None assert session.mock_calls == [, allow_redirects=True),,, headers={ "Accept": "text/html", "Cache-Control": "max-age=0", }),, ] @pytest.mark.parametrize( "url", [ "", "", "", ], ) def test_get_html_response_no_head(url): """ `_get_html_response()` shouldn't send a HEAD request if the URL does not look like an archive, only the GET request that retrieves data. """ session = mock.Mock(PipSession) # Mock the headers dict to ensure it is accessed. session.get.return_value = mock.Mock(headers=mock.Mock(**{ "get.return_value": "text/html", })) resp = _get_html_response(url, session=session) assert resp is not None assert session.head.call_count == 0 assert session.get.mock_calls == [, headers={ "Accept": "text/html", "Cache-Control": "max-age=0", }),,"Content-Type", ""), ] def test_get_html_response_dont_log_clear_text_password(caplog): """ `_get_html_response()` should redact the password from the index URL in its DEBUG log message. """ session = mock.Mock(PipSession) # Mock the headers dict to ensure it is accessed. session.get.return_value = mock.Mock(headers=mock.Mock(**{ "get.return_value": "text/html", })) caplog.set_level(logging.DEBUG) resp = _get_html_response( "", session=session ) assert resp is not None assert len(caplog.records) == 1 record = caplog.records[0] assert record.levelname == 'DEBUG' assert record.message.splitlines() == [ "Getting page https://user:****", ] @pytest.mark.parametrize( ("html", "url", "expected"), [ (b"", "", ""), ( b"" b"" b"", "", "", ), ( b"" b"" b"", "", "", ), ], ) def test_determine_base_url(html, url, expected): document = html5lib.parse( html, transport_encoding=None, namespaceHTMLElements=False, ) assert _determine_base_url(document, url) == expected @pytest.mark.parametrize( ("url", "clean_url"), [ # URL with hostname and port. Port separator should not be quoted. ("https://localhost.localdomain:8181/path/with space/", "https://localhost.localdomain:8181/path/with%20space/"), # URL that is already properly quoted. The quoting `%` # characters should not be quoted again. ("https://localhost.localdomain:8181/path/with%20quoted%20space/", "https://localhost.localdomain:8181/path/with%20quoted%20space/"), # URL with IPv4 address and port. (" space/", ""), # URL with IPv6 address and port. The `[]` brackets around the # IPv6 address should not be quoted. ("https://[fd00:0:0:236::100]:8181/path/with space/", "https://[fd00:0:0:236::100]:8181/path/with%20space/"), # URL with query. The leading `?` should not be quoted. ("https://localhost.localdomain:8181/path/with/query?request=test", "https://localhost.localdomain:8181/path/with/query?request=test"), # URL with colon in the path portion. ("https://localhost.localdomain:8181/path:/with:/colon", "https://localhost.localdomain:8181/path%3A/with%3A/colon"), # URL with something that looks like a drive letter, but is # not. The `:` should be quoted. ("https://localhost.localdomain/T:/path/", "https://localhost.localdomain/T%3A/path/"), # VCS URL containing revision string. ("git+ssh:// to/repo.git@1.0#egg=my-package-1.0", "git+ssh://"), # URL with Windows drive letter. The `:` after the drive # letter should not be quoted. The trailing `/` should be # removed. pytest.param( "file:///T:/path/with spaces/", "file:///T:/path/with%20spaces", marks=pytest.mark.skipif("sys.platform != 'win32'"), ), # URL with Windows drive letter, running on non-windows # platform. The `:` after the drive should be quoted. pytest.param( "file:///T:/path/with spaces/", "file:///T%3A/path/with%20spaces/", marks=pytest.mark.skipif("sys.platform == 'win32'"), ), ] ) def test_clean_link(url, clean_url): assert(_clean_link(url) == clean_url) class TestHTMLPage: @pytest.mark.parametrize( ('anchor_html, expected'), [ # Test not present. ('', None), # Test present with no value. ('', ''), # Test the empty string. ('', ''), # Test a non-empty string. ('', 'error'), # Test a value with an escaped character. ('', 'version < 1'), # Test a yanked reason with a non-ascii character. (u'', u'curlyquote \u2018'), ] ) def test_iter_links__yanked_reason(self, anchor_html, expected): html = ( # Mark this as a unicode string for Python 2 since anchor_html # can contain non-ascii. u'' '{}' ).format(anchor_html) html_bytes = html.encode('utf-8') page = HTMLPage(html_bytes, url='') links = list(page.iter_links()) link, = links actual = link.yanked_reason assert actual == expected def test_request_http_error(caplog): caplog.set_level(logging.DEBUG) link = Link('http://localhost') session = Mock(PipSession) session.get.return_value = resp = Mock() resp.raise_for_status.side_effect = requests.HTTPError('Http error') assert _get_html_page(link, session=session) is None assert ( 'Could not fetch URL http://localhost: Http error - skipping' in caplog.text ) def test_request_retries(caplog): caplog.set_level(logging.DEBUG) link = Link('http://localhost') session = Mock(PipSession) session.get.side_effect = requests.exceptions.RetryError('Retry error') assert _get_html_page(link, session=session) is None assert ( 'Could not fetch URL http://localhost: Retry error - skipping' in caplog.text ) @pytest.mark.parametrize( "url, vcs_scheme", [ ("svn+", "svn"), ("git+", "git"), ], ) def test_get_html_page_invalid_scheme(caplog, url, vcs_scheme): """`_get_html_page()` should error if an invalid scheme is given. Only file:, http:, https:, and ftp: are allowed. """ with caplog.at_level(logging.DEBUG): page = _get_html_page(Link(url), session=mock.Mock(PipSession)) assert page is None assert caplog.record_tuples == [ ( "pip._internal.collector", logging.DEBUG, "Cannot look at {} URL {}".format(vcs_scheme, url), ), ] def make_fake_html_page(url): html = dedent(u"""\ abc-1.0.tar.gz """) content = html.encode('utf-8') headers = {} return HTMLPage(content, url=url, headers=headers) def test_get_html_page_directory_append_index(tmpdir): """`_get_html_page()` should append "index.html" to a directory URL. """ dirpath = tmpdir.mkdir("something") dir_url = "file:///{}".format( urllib_request.pathname2url(dirpath).lstrip("/"), ) session = mock.Mock(PipSession) with mock.patch("pip._internal.collector._get_html_response") as mock_func: _get_html_page(Link(dir_url), session=session) assert mock_func.mock_calls == [ "{}/index.html".format(dir_url.rstrip("/")), session=session, ), ] def test_group_locations__file_expand_dir(data): """ Test that a file:// dir gets listdir run with expand_dir """ files, urls = group_locations([data.find_links], expand_dir=True) assert files and not urls, ( "files and not urls should have been found at find-links url: %s" % data.find_links ) def test_group_locations__file_not_find_link(data): """ Test that a file:// url dir that's not a find-link, doesn't get a listdir run """ files, urls = group_locations([data.index_url("empty_with_pkg")]) assert urls and not files, "urls, but not files should have been found" def test_group_locations__non_existing_path(): """ Test that a non-existing path is ignored. """ files, urls = group_locations([os.path.join('this', 'doesnt', 'exist')]) assert not urls and not files, "nothing should have been found" def check_links_include(links, names): """ Assert that the given list of Link objects includes, for each of the given names, a link whose URL has a base name matching that name. """ for name in names: assert any(link.url.endswith(name) for link in links), ( 'name {!r} not among links: {}'.format(name, links) ) class TestLinkCollector(object): @patch('pip._internal.collector._get_html_response') def test_collect_links(self, mock_get_html_response, data): expected_url = '' fake_page = make_fake_html_page(expected_url) mock_get_html_response.return_value = fake_page link_collector = make_test_link_collector( find_links=[data.find_links], index_urls=[PyPI.simple_url], ) actual = link_collector.collect_links('twine') mock_get_html_response.assert_called_once_with( expected_url, session=link_collector.session, ) # Spot-check the CollectedLinks return value. assert len(actual.files) > 20 check_links_include(actual.files, names=['simple-1.0.tar.gz']) assert len(actual.find_links) == 1 check_links_include(actual.find_links, names=['packages']) actual_pages = actual.pages assert list(actual_pages) == [expected_url] actual_page_links = actual_pages[expected_url] assert len(actual_page_links) == 1 assert actual_page_links[0].url == ( '' )