# Unit tests for pip's download module.
# (Originally tests/unit/test_download.py from https://github.com/pypa/pip;
# web-scrape chrome and blame metadata removed.)
import functools
import hashlib
import os
import sys
from io import BytesIO
from shutil import copy, rmtree
from tempfile import mkdtemp

import pytest
from mock import Mock, patch

import pip
from pip._internal.download import (
    CI_ENVIRONMENT_VARIABLES, MultiDomainBasicAuth, PipSession, SafeFileCache,
    _download_http_url, parse_content_disposition, sanitize_content_filename,
    unpack_file_url, unpack_http_url, url_to_path,
)
from pip._internal.exceptions import HashMismatch
from pip._internal.models.link import Link
from pip._internal.utils.hashes import Hashes
from pip._internal.utils.misc import path_to_url
from tests.lib import Path, create_file

@pytest.fixture(scope="function")
def cache_tmpdir(tmpdir):
    """Yield a fresh, empty ``cache`` directory under the test's tmpdir."""
    path = tmpdir.joinpath("cache")
    path.mkdir(parents=True)
    yield path
def test_unpack_http_url_with_urllib_response_without_content_type(data):
    """
    It should download and unpack files even if no Content-Type header exists
    """
    _real_session = PipSession()

    def _fake_session_get(*args, **kwargs):
        # Delegate to a real session, then strip the Content-Type header so
        # unpack_http_url has to cope without it.
        resp = _real_session.get(*args, **kwargs)
        del resp.headers["Content-Type"]
        return resp

    session = Mock()
    session.get = _fake_session_get

    uri = path_to_url(data.packages.joinpath("simple-1.0.tar.gz"))
    link = Link(uri)
    temp_dir = mkdtemp()
    try:
        unpack_http_url(
            link,
            temp_dir,
            download_dir=None,
            session=session,
        )
        # The sdist's contents should have been unpacked despite the
        # missing Content-Type header.
        assert set(os.listdir(temp_dir)) == {
            'PKG-INFO', 'setup.cfg', 'setup.py', 'simple', 'simple.egg-info'
        }
    finally:
        rmtree(temp_dir)
def get_user_agent():
    """Return the User-Agent header that a default PipSession sends."""
    session = PipSession()
    return session.headers["User-Agent"]
def test_user_agent():
    """The User-Agent should identify the running pip version."""
    assert get_user_agent().startswith("pip/%s" % pip.__version__)
@pytest.mark.parametrize('name, expected_like_ci', [
    ('BUILD_BUILDID', True),
    ('BUILD_ID', True),
    ('CI', True),
    ('PIP_IS_CI', True),
    # Test a prefix substring of one of the variable names we use.
    ('BUILD', False),
])
def test_user_agent__ci(monkeypatch, name, expected_like_ci):
    """Only exact CI environment-variable names should flip ci to true."""
    # Unset every CI-indicator variable first so that running this test under
    # real CI cannot force the detection to True.  Relying on
    # CI_ENVIRONMENT_VARIABLES (part of the code under test) is fine here:
    # it can only prevent false failures, never cause a false pass.
    for variable in CI_ENVIRONMENT_VARIABLES:
        monkeypatch.delenv(variable, raising=False)

    # Baseline: with nothing set, the user agent must report ci as null.
    baseline = get_user_agent()
    assert '"ci":null' in baseline
    assert '"ci":true' not in baseline

    monkeypatch.setenv(name, 'true')
    agent = get_user_agent()
    assert ('"ci":true' in agent) == expected_like_ci
    assert ('"ci":null' in agent) == (not expected_like_ci)
def test_user_agent_user_data(monkeypatch):
    """PIP_USER_AGENT_USER_DATA should be embedded into the User-Agent."""
    monkeypatch.setenv("PIP_USER_AGENT_USER_DATA", "some_string")
    user_agent = PipSession().headers["User-Agent"]
    assert "some_string" in user_agent
class FakeStream(object):
    """In-memory stand-in for the ``raw`` body stream of an HTTP response."""

    def __init__(self, contents):
        self._buffer = BytesIO(contents)

    def read(self, size, decode_content=None):
        return self._buffer.read(size)

    def stream(self, size, decode_content=None):
        yield self._buffer.read(size)

    def release_conn(self):
        # Nothing to release for an in-memory buffer.
        pass
class MockResponse(object):
    """Minimal requests.Response look-alike backed by a FakeStream."""

    def __init__(self, contents):
        # Body, available both streamed and pre-read.
        self.raw = FakeStream(contents)
        self.content = contents
        # Defaults a successful, bare response would carry; individual tests
        # overwrite these attributes as needed.
        self.status_code = 200
        self.url = None
        self.headers = {}
        self.history = []
        self.request = None
        self.connection = None

    def raise_for_status(self):
        # Pretend every response is a success.
        pass
class MockConnection(object):
    """Connection double; tests supply ``_send`` on a per-case basis."""

    def _send(self, req, **kwargs):
        raise NotImplementedError("_send must be overridden for tests")

    def send(self, req, **kwargs):
        # Dispatch through the overridable _send, then fire the request's
        # registered response hooks, mirroring requests' behavior.
        response = self._send(req, **kwargs)
        for callback in req.hooks.get("response", []):
            callback(response)
        return response
class MockRequest(object):
    """Request double that records registered response hooks."""

    def __init__(self, url):
        self.url = url
        self.headers = {}
        self.hooks = {}

    def register_hook(self, event_name, callback):
        # Append to the event's callback list, creating it on first use.
        self.hooks.setdefault(event_name, []).append(callback)
@patch('pip._internal.download.unpack_file')
def test_unpack_http_url_bad_downloaded_checksum(mock_unpack_file):
    """
    If already-downloaded file has bad checksum, re-download.
    """
    base_url = 'http://www.example.com/somepackage.tgz'
    contents = b'downloaded'
    download_hash = hashlib.new('sha1', contents)
    # The fragment pins the hash of the *network* contents, which will not
    # match the stale file placed into download_dir below.
    link = Link(base_url + '#sha1=' + download_hash.hexdigest())

    session = Mock()
    session.get = Mock()
    response = session.get.return_value = MockResponse(contents)
    response.headers = {'content-type': 'application/x-tar'}
    response.url = base_url

    download_dir = mkdtemp()
    try:
        # Simulate a previous download whose contents don't match the hash.
        downloaded_file = os.path.join(download_dir, 'somepackage.tgz')
        create_file(downloaded_file, 'some contents')

        unpack_http_url(
            link,
            'location',
            download_dir=download_dir,
            session=session,
            hashes=Hashes({'sha1': [download_hash.hexdigest()]})
        )

        # despite existence of downloaded file with bad hash, downloaded again
        session.get.assert_called_once_with(
            'http://www.example.com/somepackage.tgz',
            headers={"Accept-Encoding": "identity"},
            stream=True,
        )
        # cached file is replaced with newly downloaded file
        with open(downloaded_file) as fh:
            assert fh.read() == 'downloaded'
    finally:
        rmtree(download_dir)
@pytest.mark.parametrize("filename, expected", [
    ('dir/file', 'file'),
    ('../file', 'file'),
    ('../../file', 'file'),
    ('../', ''),
    ('../..', '..'),
    ('/', ''),
])
def test_sanitize_content_filename(filename, expected):
    """
    Test inputs where the result is the same for Windows and non-Windows.
    """
    assert sanitize_content_filename(filename) == expected
@pytest.mark.parametrize("filename, win_expected, non_win_expected", [
    ('dir\\file', 'file', 'dir\\file'),
    ('..\\file', 'file', '..\\file'),
    ('..\\..\\file', 'file', '..\\..\\file'),
    ('..\\', '', '..\\'),
    ('..\\..', '..', '..\\..'),
    ('\\', '', '\\'),
])
def test_sanitize_content_filename__platform_dependent(
    filename,
    win_expected,
    non_win_expected
):
    """
    Test inputs where the result is different for Windows and non-Windows.
    """
    # Backslashes are path separators only on Windows.
    expected = win_expected if sys.platform == 'win32' else non_win_expected
    assert sanitize_content_filename(filename) == expected
@pytest.mark.parametrize("content_disposition, default_filename, expected", [
    ('attachment;filename="../file"', 'df', 'file'),
])
def test_parse_content_disposition(
    content_disposition,
    default_filename,
    expected
):
    """The filename from Content-Disposition is extracted and sanitized."""
    assert parse_content_disposition(
        content_disposition, default_filename) == expected
def test_download_http_url__no_directory_traversal(tmpdir):
    """
    Test that directory traversal doesn't happen on download when the
    Content-Disposition header contains a filename with a ".." path part.
    """
    mock_url = 'http://www.example.com/whatever.tgz'
    contents = b'downloaded'
    link = Link(mock_url)

    session = Mock()
    resp = MockResponse(contents)
    resp.url = mock_url
    resp.headers = {
        # Set the content-type to a random value to prevent
        # mimetypes.guess_extension from guessing the extension.
        'content-type': 'random',
        # The ".." in the suggested filename is the attack being tested.
        'content-disposition': 'attachment;filename="../out_dir_file"'
    }
    session.get.return_value = resp

    download_dir = tmpdir.joinpath('download')
    os.mkdir(download_dir)
    file_path, content_type = _download_http_url(
        link,
        session,
        download_dir,
        hashes=None,
        progress_bar='on',
    )
    # The file should be downloaded to download_dir.
    actual = os.listdir(download_dir)
    assert actual == ['out_dir_file']
@pytest.mark.parametrize("url,win_expected,non_win_expected", [
    ('file:tmp', 'tmp', 'tmp'),
    ('file:c:/path/to/file', r'C:\path\to\file', 'c:/path/to/file'),
    ('file:/path/to/file', r'\path\to\file', '/path/to/file'),
    ('file://localhost/tmp/file', r'\tmp\file', '/tmp/file'),
    ('file://localhost/c:/tmp/file', r'C:\tmp\file', '/c:/tmp/file'),
    ('file://somehost/tmp/file', r'\\somehost\tmp\file', None),
    ('file:///tmp/file', r'\tmp\file', '/tmp/file'),
    ('file:///c:/tmp/file', r'C:\tmp\file', '/c:/tmp/file'),
])
def test_url_to_path(url, win_expected, non_win_expected):
    """file: URLs map to platform paths; ``None`` marks an invalid URL."""
    expected_path = win_expected if sys.platform == 'win32' \
        else non_win_expected
    if expected_path is None:
        with pytest.raises(ValueError):
            url_to_path(url)
    else:
        assert url_to_path(url) == expected_path
@pytest.mark.skipif("sys.platform != 'win32'")
def test_url_to_path_path_to_url_symmetry_win():
    """On Windows, url_to_path() inverts path_to_url() for drive and UNC
    paths."""
    for original in (r'C:\tmp\file', r'\\unc\share\path'):
        assert url_to_path(path_to_url(original)) == original
class Test_unpack_file_url(object):
    """Tests for unpack_file_url() against local sdists and directories."""

    def prep(self, tmpdir, data):
        # Shared setup: a build dir, a download dir, and file: links to the
        # simple-1.0 and simple-2.0 sdists from the test data packages.
        self.build_dir = tmpdir.joinpath('build')
        self.download_dir = tmpdir.joinpath('download')
        os.mkdir(self.build_dir)
        os.mkdir(self.download_dir)
        self.dist_file = "simple-1.0.tar.gz"
        self.dist_file2 = "simple-2.0.tar.gz"
        self.dist_path = data.packages.joinpath(self.dist_file)
        self.dist_path2 = data.packages.joinpath(self.dist_file2)
        self.dist_url = Link(path_to_url(self.dist_path))
        self.dist_url2 = Link(path_to_url(self.dist_path2))

    def test_unpack_file_url_no_download(self, tmpdir, data):
        # Without download_dir, unpack into build_dir only.
        self.prep(tmpdir, data)
        unpack_file_url(self.dist_url, self.build_dir)
        assert os.path.isdir(os.path.join(self.build_dir, 'simple'))
        assert not os.path.isfile(
            os.path.join(self.download_dir, self.dist_file))

    def test_unpack_file_url_and_download(self, tmpdir, data):
        # With download_dir, unpack AND keep a copy of the archive.
        self.prep(tmpdir, data)
        unpack_file_url(self.dist_url, self.build_dir,
                        download_dir=self.download_dir)
        assert os.path.isdir(os.path.join(self.build_dir, 'simple'))
        assert os.path.isfile(os.path.join(self.download_dir, self.dist_file))

    def test_unpack_file_url_download_already_exists(self, tmpdir,
                                                     data, monkeypatch):
        self.prep(tmpdir, data)
        # add in previous download (copy simple-2.0 as simple-1.0)
        # so we can tell it didn't get overwritten
        dest_file = os.path.join(self.download_dir, self.dist_file)
        copy(self.dist_path2, dest_file)
        with open(self.dist_path2, 'rb') as f:
            dist_path2_md5 = hashlib.md5(f.read()).hexdigest()

        unpack_file_url(self.dist_url, self.build_dir,
                        download_dir=self.download_dir)
        # our hash should be the same, i.e. not overwritten by simple-1.0 hash
        with open(dest_file, 'rb') as f:
            assert dist_path2_md5 == hashlib.md5(f.read()).hexdigest()

    def test_unpack_file_url_bad_hash(self, tmpdir, data,
                                      monkeypatch):
        """
        Test when the file url hash fragment is wrong
        """
        self.prep(tmpdir, data)
        url = '{}#md5=bogus'.format(self.dist_url.url)
        dist_url = Link(url)
        with pytest.raises(HashMismatch):
            unpack_file_url(dist_url,
                            self.build_dir,
                            hashes=Hashes({'md5': ['bogus']}))

    def test_unpack_file_url_download_bad_hash(self, tmpdir, data,
                                               monkeypatch):
        """
        Test when existing download has different hash from the file url
        fragment
        """
        self.prep(tmpdir, data)

        # add in previous download (copy simple-2.0 as simple-1.0 so it's wrong
        # hash)
        dest_file = os.path.join(self.download_dir, self.dist_file)
        copy(self.dist_path2, dest_file)

        with open(self.dist_path, 'rb') as f:
            dist_path_md5 = hashlib.md5(f.read()).hexdigest()
        with open(dest_file, 'rb') as f:
            dist_path2_md5 = hashlib.md5(f.read()).hexdigest()

        # Sanity check: the planted file really does have the wrong hash.
        assert dist_path_md5 != dist_path2_md5

        url = '{}#md5={}'.format(self.dist_url.url, dist_path_md5)
        dist_url = Link(url)
        unpack_file_url(dist_url, self.build_dir,
                        download_dir=self.download_dir,
                        hashes=Hashes({'md5': [dist_path_md5]}))

        # confirm hash is for simple1-1.0
        # the previous bad download has been removed
        with open(dest_file, 'rb') as f:
            assert hashlib.md5(f.read()).hexdigest() == dist_path_md5

    def test_unpack_file_url_thats_a_dir(self, tmpdir, data):
        # A file: URL pointing at a directory is copied into build_dir.
        self.prep(tmpdir, data)
        dist_path = data.packages.joinpath("FSPkg")
        dist_url = Link(path_to_url(dist_path))
        unpack_file_url(dist_url, self.build_dir,
                        download_dir=self.download_dir)
        assert os.path.isdir(os.path.join(self.build_dir, 'fspkg'))
@pytest.mark.parametrize('exclude_dir', [
    '.nox',
    '.tox'
])
def test_unpack_file_url_excludes_expected_dirs(tmpdir, exclude_dir):
    """Top-level .nox/.tox dirs are skipped when copying a local directory,
    but same-named dirs nested below the top level are kept."""
    src_dir = tmpdir / 'src'
    dst_dir = tmpdir / 'dst'
    src_included_file = Path.joinpath(src_dir, 'file.txt')
    src_excluded_dir = Path.joinpath(src_dir, exclude_dir)
    src_excluded_file = Path.joinpath(src_dir, exclude_dir, 'file.txt')
    # Same name one level down: should NOT be excluded.
    src_included_dir = Path.joinpath(src_dir, 'subdir', exclude_dir)

    # set up source directory
    src_excluded_dir.mkdir(parents=True)
    src_included_dir.mkdir(parents=True)
    Path.touch(src_included_file)
    Path.touch(src_excluded_file)

    dst_included_file = Path.joinpath(dst_dir, 'file.txt')
    dst_excluded_dir = Path.joinpath(dst_dir, exclude_dir)
    dst_excluded_file = Path.joinpath(dst_dir, exclude_dir, 'file.txt')
    dst_included_dir = Path.joinpath(dst_dir, 'subdir', exclude_dir)

    src_link = Link(path_to_url(src_dir))
    unpack_file_url(
        src_link,
        dst_dir,
        download_dir=None
    )
    assert not os.path.isdir(dst_excluded_dir)
    assert not os.path.isfile(dst_excluded_file)
    assert os.path.isfile(dst_included_file)
    assert os.path.isdir(dst_included_dir)
class TestSafeFileCache:
    """
    The no_perms tests are useless on Windows since SafeFileCache uses
    pip._internal.utils.filesystem.check_path_owner which is based on
    os.geteuid which is absent on Windows.
    """

    def test_cache_roundtrip(self, cache_tmpdir):
        # set/get/delete round-trip through the file-backed cache.
        cache = SafeFileCache(cache_tmpdir)
        assert cache.get("test key") is None
        cache.set("test key", b"a test string")
        assert cache.get("test key") == b"a test string"
        cache.delete("test key")
        assert cache.get("test key") is None

    @pytest.mark.skipif("sys.platform == 'win32'")
    def test_safe_get_no_perms(self, cache_tmpdir, monkeypatch):
        # get() on an unreadable cache dir must not raise.
        os.chmod(cache_tmpdir, 000)
        monkeypatch.setattr(os.path, "exists", lambda x: True)
        cache = SafeFileCache(cache_tmpdir)
        cache.get("foo")

    @pytest.mark.skipif("sys.platform == 'win32'")
    def test_safe_set_no_perms(self, cache_tmpdir):
        # set() on an unwritable cache dir must not raise.
        os.chmod(cache_tmpdir, 000)
        cache = SafeFileCache(cache_tmpdir)
        cache.set("foo", b"bar")

    @pytest.mark.skipif("sys.platform == 'win32'")
    def test_safe_delete_no_perms(self, cache_tmpdir):
        # delete() on an unwritable cache dir must not raise.
        os.chmod(cache_tmpdir, 000)
        cache = SafeFileCache(cache_tmpdir)
        cache.delete("foo")
class TestPipSession:
    """Tests for PipSession's HTTP cache wiring on its adapters."""

    def test_cache_defaults_off(self):
        # Without a cache path, no adapter carries a cache.
        session = PipSession()

        assert not hasattr(session.adapters["http://"], "cache")
        assert not hasattr(session.adapters["https://"], "cache")

    def test_cache_is_enabled(self, tmpdir):
        session = PipSession(cache=tmpdir.joinpath("test-cache"))

        assert hasattr(session.adapters["https://"], "cache")
        assert (session.adapters["https://"].cache.directory ==
                tmpdir.joinpath("test-cache"))

    def test_http_cache_is_not_enabled(self, tmpdir):
        # Only https responses are cached; plain http is not.
        session = PipSession(cache=tmpdir.joinpath("test-cache"))

        assert not hasattr(session.adapters["http://"], "cache")

    def test_insecure_host_cache_is_not_enabled(self, tmpdir):
        # Hosts marked insecure get an adapter without caching.
        session = PipSession(
            cache=tmpdir.joinpath("test-cache"),
            insecure_hosts=["example.com"],
        )

        assert not hasattr(session.adapters["https://example.com/"], "cache")
def test_get_credentials():
    """URL-embedded credentials are split out, cached, and overridable."""
    auth = MultiDomainBasicAuth()
    get = auth._get_url_and_credentials

    # Credentials embedded in the URL are stripped and remembered per netloc.
    assert get("http://foo:bar@example.com/path") \
        == ('http://example.com/path', 'foo', 'bar')
    assert auth.passwords['example.com'] == ('foo', 'bar')

    # A cached password for the netloc takes precedence over URL-embedded
    # credentials (the asserted result below shows user/pass winning).
    auth.passwords['example.com'] = ('user', 'pass')
    assert get("http://foo:bar@example.com/path") \
        == ('http://example.com/path', 'user', 'pass')
def test_get_index_url_credentials():
    """Index-URL credentials apply only to URLs under that index's prefix."""
    auth = MultiDomainBasicAuth(
        index_urls=["http://foo:bar@example.com/path"])
    get = functools.partial(
        auth._get_new_credentials,
        allow_netrc=False,
        allow_keyring=False
    )

    # Check resolution of indexes
    assert get("http://example.com/path/path2") == ('foo', 'bar')
    assert get("http://example.com/path3/path2") == (None, None)
class KeyringModuleV1(object):
    """Represents the supported API of keyring before get_credential
    was added.
    """

    def __init__(self):
        # (system, username, password) tuples recorded by set_password.
        self.saved_passwords = []

    def get_password(self, system, username):
        # Answer only for the two systems the tests probe, tagging the
        # password with how the lookup was resolved.
        suffix_by_system = {
            "example.com": "!netloc",
            "http://example.com/path2": "!url",
        }
        if username and system in suffix_by_system:
            return username + suffix_by_system[system]
        return None

    def set_password(self, system, username, password):
        self.saved_passwords.append((system, username, password))
@pytest.mark.parametrize('url, expect', (
    ("http://example.com/path1", (None, None)),
    # path1 URLs will be resolved by netloc
    ("http://user@example.com/path1", ("user", "user!netloc")),
    ("http://user2@example.com/path1", ("user2", "user2!netloc")),
    # path2 URLs will be resolved by index URL
    ("http://example.com/path2/path3", (None, None)),
    ("http://foo@example.com/path2/path3", ("foo", "foo!url")),
))
def test_keyring_get_password(monkeypatch, url, expect):
    """Old-style keyring (get_password only) is consulted for credentials."""
    monkeypatch.setattr('pip._internal.download.keyring', KeyringModuleV1())
    auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"])

    assert auth._get_new_credentials(
        url, allow_netrc=False, allow_keyring=True) == expect
def test_keyring_get_password_after_prompt(monkeypatch):
    """A username typed at the prompt is completed from keyring, and the
    result is flagged as not-to-save."""
    monkeypatch.setattr('pip._internal.download.keyring', KeyringModuleV1())
    auth = MultiDomainBasicAuth()

    def fake_ask_input(prompt):
        assert prompt == "User for example.com: "
        return "user"

    monkeypatch.setattr('pip._internal.download.ask_input', fake_ask_input)
    assert auth._prompt_for_password("example.com") \
        == ("user", "user!netloc", False)
def test_keyring_get_password_username_in_index(monkeypatch):
    """A username embedded in the index URL is used for keyring lookups,
    but only for URLs under that index."""
    monkeypatch.setattr('pip._internal.download.keyring', KeyringModuleV1())
    auth = MultiDomainBasicAuth(
        index_urls=["http://user@example.com/path2"])
    get = functools.partial(
        auth._get_new_credentials,
        allow_netrc=False,
        allow_keyring=True
    )

    assert get("http://example.com/path2/path3") == ("user", "user!url")
    assert get("http://example.com/path4/path1") == (None, None)
@pytest.mark.parametrize("response_status, creds, expect_save", (
    (403, ("user", "pass", True), False),
    (200, ("user", "pass", True), True,),
    (200, ("user", "pass", False), False,),
))
def test_keyring_set_password(monkeypatch, response_status, creds,
                              expect_save):
    """Prompted credentials are saved to keyring only when the user asked to
    save AND the authenticated retry succeeded."""
    keyring = KeyringModuleV1()
    monkeypatch.setattr('pip._internal.download.keyring', keyring)
    auth = MultiDomainBasicAuth(prompting=True)
    # No credentials known up front, so handle_401 must prompt.
    monkeypatch.setattr(auth, '_get_url_and_credentials',
                        lambda u: (u, None, None))
    monkeypatch.setattr(auth, '_prompt_for_password', lambda *a: creds)
    if creds[2]:
        # when _prompt_for_password indicates to save, we should save
        def should_save_password_to_keyring(*a):
            return True
    else:
        # when _prompt_for_password indicates not to save, we should
        # never call this function
        def should_save_password_to_keyring(*a):
            assert False, ("_should_save_password_to_keyring should not be " +
                           "called")
    monkeypatch.setattr(auth, '_should_save_password_to_keyring',
                        should_save_password_to_keyring)

    # Fabricate a 401 response whose retry (via MockConnection) yields the
    # parametrized status code.
    req = MockRequest("https://example.com")
    resp = MockResponse(b"")
    resp.url = req.url
    connection = MockConnection()

    def _send(sent_req, **kwargs):
        # The retried request must be the original one, now authorized.
        assert sent_req is req
        assert "Authorization" in sent_req.headers
        r = MockResponse(b"")
        r.status_code = response_status
        return r

    connection._send = _send

    resp.request = req
    resp.status_code = 401
    resp.connection = connection

    auth.handle_401(resp)

    if expect_save:
        assert keyring.saved_passwords == [("example.com", creds[0], creds[1])]
    else:
        assert keyring.saved_passwords == []
class KeyringModuleV2(object):
    """Represents the current supported API of keyring"""

    class Credential(object):
        # Simple username/password pair, mirroring keyring's credential type.
        def __init__(self, username, password):
            self.username = username
            self.password = password

    def get_password(self, system, username):
        # When get_credential exists, pip must never fall back to this.
        assert False, "get_password should not ever be called"

    def get_credential(self, system, username):
        password_by_system = {
            "http://example.com/path2": "url",
            "example.com": "netloc",
        }
        if system in password_by_system:
            return self.Credential("username", password_by_system[system])
        return None
@pytest.mark.parametrize('url, expect', (
    ("http://example.com/path1", ("username", "netloc")),
    ("http://example.com/path2/path3", ("username", "url")),
    ("http://user2@example.com/path2/path3", ("username", "url")),
))
def test_keyring_get_credential(monkeypatch, url, expect):
    """New-style keyring (get_credential) supplies both username and
    password."""
    monkeypatch.setattr(pip._internal.download, 'keyring', KeyringModuleV2())
    auth = MultiDomainBasicAuth(index_urls=["http://example.com/path2"])

    actual = auth._get_new_credentials(url, allow_netrc=False,
                                       allow_keyring=True)
    assert actual == expect