# -*- coding: utf-8 -*-
"""Unit tests for the helpers in pip._internal.utils."""
import os
import shutil
import stat
import sys
import tempfile
import time
import warnings

import pytest
from mock import Mock, patch
from pip._vendor.six import BytesIO

from pip._internal.exceptions import (
    HashMismatch, HashMissing, InstallationError, UnsupportedPythonVersion,
)
from pip._internal.utils.encoding import auto_decode
from pip._internal.utils.glibc import check_glibc_version
from pip._internal.utils.hashes import Hashes, MissingHashes
from pip._internal.utils.misc import (
    call_subprocess, egg_link_path, ensure_dir, get_installed_distributions,
    get_prog, make_vcs_requirement_url, normalize_path, remove_auth_from_url,
    rmtree, split_auth_from_netloc, untar_file, unzip_file,
)
from pip._internal.utils.packaging import check_dist_requires_python
from pip._internal.utils.temp_dir import TempDirectory


class Tests_EgglinkPath(object):
    """util.egg_link_path() tests

    ``setup`` swaps several attributes of ``pip._internal.utils.misc`` (and
    ``os.path.isfile``) for fixed values or mocks; ``teardown`` puts the
    originals back.  Each test then only configures the mocks' return values.
    """

    def setup(self):
        project = 'foo'

        self.mock_dist = Mock(project_name=project)
        self.site_packages = 'SITE_PACKAGES'
        self.user_site = 'USER_SITE'
        self.user_site_egglink = os.path.join(
            self.user_site,
            '%s.egg-link' % project
        )
        self.site_packages_egglink = os.path.join(
            self.site_packages,
            '%s.egg-link' % project,
        )

        # patches
        from pip._internal.utils import misc as utils

        self.old_site_packages = utils.site_packages
        self.mock_site_packages = utils.site_packages = 'SITE_PACKAGES'

        self.old_running_under_virtualenv = utils.running_under_virtualenv
        self.mock_running_under_virtualenv = Mock()
        utils.running_under_virtualenv = self.mock_running_under_virtualenv

        self.old_virtualenv_no_global = utils.virtualenv_no_global
        self.mock_virtualenv_no_global = utils.virtualenv_no_global = Mock()

        self.old_user_site = utils.user_site
        self.mock_user_site = utils.user_site = self.user_site

        # NOTE: this replaces os.path.isfile itself, not a module-local copy.
        from os import path
        self.old_isfile = path.isfile
        self.mock_isfile = path.isfile = Mock()

    def teardown(self):
        from pip._internal.utils import misc as utils

        utils.site_packages = self.old_site_packages
        utils.running_under_virtualenv = self.old_running_under_virtualenv
        utils.virtualenv_no_global = self.old_virtualenv_no_global
        utils.user_site = self.old_user_site

        from os import path
        path.isfile = self.old_isfile

    def eggLinkInUserSite(self, egglink):
        """isfile() stand-in: only the user-site egg-link exists."""
        return egglink == self.user_site_egglink

    def eggLinkInSitePackages(self, egglink):
        """isfile() stand-in: only the site-packages egg-link exists."""
        return egglink == self.site_packages_egglink

    # ####################### #
    # # egglink in usersite # #
    # ####################### #
    def test_egglink_in_usersite_notvenv(self):
        self.mock_virtualenv_no_global.return_value = False
        self.mock_running_under_virtualenv.return_value = False
        self.mock_isfile.side_effect = self.eggLinkInUserSite
        assert egg_link_path(self.mock_dist) == self.user_site_egglink

    def test_egglink_in_usersite_venv_noglobal(self):
        self.mock_virtualenv_no_global.return_value = True
        self.mock_running_under_virtualenv.return_value = True
        self.mock_isfile.side_effect = self.eggLinkInUserSite
        assert egg_link_path(self.mock_dist) is None

    def test_egglink_in_usersite_venv_global(self):
        self.mock_virtualenv_no_global.return_value = False
        self.mock_running_under_virtualenv.return_value = True
        self.mock_isfile.side_effect = self.eggLinkInUserSite
        assert egg_link_path(self.mock_dist) == self.user_site_egglink

    # ####################### #
    # # egglink in sitepkgs # #
    # ####################### #
    def test_egglink_in_sitepkgs_notvenv(self):
        self.mock_virtualenv_no_global.return_value = False
        self.mock_running_under_virtualenv.return_value = False
        self.mock_isfile.side_effect = self.eggLinkInSitePackages
        assert egg_link_path(self.mock_dist) == self.site_packages_egglink

    def test_egglink_in_sitepkgs_venv_noglobal(self):
        self.mock_virtualenv_no_global.return_value = True
        self.mock_running_under_virtualenv.return_value = True
        self.mock_isfile.side_effect = self.eggLinkInSitePackages
        assert egg_link_path(self.mock_dist) == self.site_packages_egglink

    def test_egglink_in_sitepkgs_venv_global(self):
        self.mock_virtualenv_no_global.return_value = False
        self.mock_running_under_virtualenv.return_value = True
        self.mock_isfile.side_effect = self.eggLinkInSitePackages
        assert egg_link_path(self.mock_dist) == self.site_packages_egglink

    # ################################## #
    # # egglink in usersite & sitepkgs # #
    # ################################## #
    def test_egglink_in_both_notvenv(self):
        self.mock_virtualenv_no_global.return_value = False
        self.mock_running_under_virtualenv.return_value = False
        self.mock_isfile.return_value = True
        assert egg_link_path(self.mock_dist) == self.user_site_egglink

    def test_egglink_in_both_venv_noglobal(self):
        self.mock_virtualenv_no_global.return_value = True
        self.mock_running_under_virtualenv.return_value = True
        self.mock_isfile.return_value = True
        assert egg_link_path(self.mock_dist) == self.site_packages_egglink

    def test_egglink_in_both_venv_global(self):
        self.mock_virtualenv_no_global.return_value = False
        self.mock_running_under_virtualenv.return_value = True
        self.mock_isfile.return_value = True
        assert egg_link_path(self.mock_dist) == self.site_packages_egglink

    # ############## #
    # # no egglink # #
    # ############## #
    def test_noegglink_in_sitepkgs_notvenv(self):
        self.mock_virtualenv_no_global.return_value = False
        self.mock_running_under_virtualenv.return_value = False
        self.mock_isfile.return_value = False
        assert egg_link_path(self.mock_dist) is None

    def test_noegglink_in_sitepkgs_venv_noglobal(self):
        self.mock_virtualenv_no_global.return_value = True
        self.mock_running_under_virtualenv.return_value = True
        self.mock_isfile.return_value = False
        assert egg_link_path(self.mock_dist) is None

    def test_noegglink_in_sitepkgs_venv_global(self):
        self.mock_virtualenv_no_global.return_value = False
        self.mock_running_under_virtualenv.return_value = True
        self.mock_isfile.return_value = False
        assert egg_link_path(self.mock_dist) is None


@patch('pip._internal.utils.misc.dist_in_usersite')
@patch('pip._internal.utils.misc.dist_is_local')
@patch('pip._internal.utils.misc.dist_is_editable')
class Tests_get_installed_distributions(object):
    """test util.get_installed_distributions

    The class-level ``@patch`` decorators hand three mocks to every test
    method (innermost decorator first: editable, local, usersite).
    """

    workingset = [
        Mock(test_name="global"),
        Mock(test_name="editable"),
        Mock(test_name="normal"),
        Mock(test_name="user"),
    ]

    workingset_stdlib = [
        Mock(test_name='normal', key='argparse'),
        Mock(test_name='normal', key='wsgiref')
    ]

    workingset_freeze = [
        Mock(test_name='normal', key='pip'),
        Mock(test_name='normal', key='setuptools'),
        Mock(test_name='normal', key='distribute')
    ]

    def dist_is_editable(self, dist):
        return dist.test_name == "editable"

    def dist_is_local(self, dist):
        return dist.test_name not in ("global", "user")

    def dist_in_usersite(self, dist):
        return dist.test_name == "user"

    def _use_fake_predicates(self, mock_dist_is_editable,
                             mock_dist_is_local, mock_dist_in_usersite):
        """Make the three patched predicates answer from ``test_name``.

        The name deliberately does not start with ``test`` so that neither
        pytest nor the class-level ``patch`` decorators pick it up.
        """
        mock_dist_is_editable.side_effect = self.dist_is_editable
        mock_dist_is_local.side_effect = self.dist_is_local
        mock_dist_in_usersite.side_effect = self.dist_in_usersite

    @patch('pip._vendor.pkg_resources.working_set', workingset)
    def test_editables_only(self, mock_dist_is_editable,
                            mock_dist_is_local,
                            mock_dist_in_usersite):
        self._use_fake_predicates(
            mock_dist_is_editable, mock_dist_is_local, mock_dist_in_usersite)
        dists = get_installed_distributions(editables_only=True)
        assert len(dists) == 1, dists
        assert dists[0].test_name == "editable"

    @patch('pip._vendor.pkg_resources.working_set', workingset)
    def test_exclude_editables(self, mock_dist_is_editable,
                               mock_dist_is_local,
                               mock_dist_in_usersite):
        self._use_fake_predicates(
            mock_dist_is_editable, mock_dist_is_local, mock_dist_in_usersite)
        dists = get_installed_distributions(include_editables=False)
        assert len(dists) == 1
        assert dists[0].test_name == "normal"

    @patch('pip._vendor.pkg_resources.working_set', workingset)
    def test_include_globals(self, mock_dist_is_editable,
                             mock_dist_is_local,
                             mock_dist_in_usersite):
        self._use_fake_predicates(
            mock_dist_is_editable, mock_dist_is_local, mock_dist_in_usersite)
        dists = get_installed_distributions(local_only=False)
        assert len(dists) == 4

    @patch('pip._vendor.pkg_resources.working_set', workingset)
    def test_user_only(self, mock_dist_is_editable,
                       mock_dist_is_local,
                       mock_dist_in_usersite):
        self._use_fake_predicates(
            mock_dist_is_editable, mock_dist_is_local, mock_dist_in_usersite)
        dists = get_installed_distributions(local_only=False,
                                            user_only=True)
        assert len(dists) == 1
        assert dists[0].test_name == "user"

    @patch('pip._vendor.pkg_resources.working_set', workingset_stdlib)
    def test_gte_py27_excludes(self, mock_dist_is_editable,
                               mock_dist_is_local,
                               mock_dist_in_usersite):
        self._use_fake_predicates(
            mock_dist_is_editable, mock_dist_is_local, mock_dist_in_usersite)
        dists = get_installed_distributions()
        assert len(dists) == 0

    @patch('pip._vendor.pkg_resources.working_set', workingset_freeze)
    def test_freeze_excludes(self, mock_dist_is_editable,
                             mock_dist_is_local,
                             mock_dist_in_usersite):
        self._use_fake_predicates(
            mock_dist_is_editable, mock_dist_is_local, mock_dist_in_usersite)
        dists = get_installed_distributions(
            skip=('setuptools', 'pip', 'distribute'))
        assert len(dists) == 0


class TestUnpackArchives(object):
    """
    test_tar.tgz/test_tar.zip have content as follows engineered to confirm 3
    things:
     1) confirm that reg files, dirs, and symlinks get unpacked
     2) permissions are not preserved (and go by the 022 umask)
     3) reg files with *any* execute perms, get chmod +x

       file.txt         600 regular file
       symlink.txt      777 symlink to file.txt
       script_owner.sh  700 script where owner can execute
       script_group.sh  610 script where group can execute
       script_world.sh  601 script where world can execute
       dir              744 directory
       dir/dirfile      622 regular file
    """

    def setup(self):
        self.tempdir = tempfile.mkdtemp()
        # Pin the umask so the expected modes below are deterministic.
        self.old_mask = os.umask(0o022)
        self.symlink_expected_mode = None

    def teardown(self):
        os.umask(self.old_mask)
        shutil.rmtree(self.tempdir, ignore_errors=True)

    def mode(self, path):
        """Return only the permission bits of ``path``."""
        return stat.S_IMODE(os.stat(path).st_mode)

    def confirm_files(self):
        """Check type and permission bits of every unpacked entry."""
        # expectations based on 022 umask set above and the unpack logic that
        # sets execute permissions, not preservation
        for fname, expected_mode, test in [
                ('file.txt', 0o644, os.path.isfile),
                ('symlink.txt', 0o644, os.path.isfile),
                ('script_owner.sh', 0o755, os.path.isfile),
                ('script_group.sh', 0o755, os.path.isfile),
                ('script_world.sh', 0o755, os.path.isfile),
                ('dir', 0o755, os.path.isdir),
                (os.path.join('dir', 'dirfile'), 0o644, os.path.isfile)]:
            path = os.path.join(self.tempdir, fname)
            if path.endswith('symlink.txt') and sys.platform == 'win32':
                # no symlinks created on windows
                continue
            assert test(path), path
            if sys.platform == 'win32':
                # the permissions tests below don't apply in windows
                # due to os.chmod being a noop
                continue
            mode = self.mode(path)
            assert mode == expected_mode, (
                "mode: %s, expected mode: %s" % (mode, expected_mode)
            )

    def test_unpack_tgz(self, data):
        """
        Test unpacking a *.tgz, and setting execute permissions
        """
        test_file = data.packages.join("test_tar.tgz")
        untar_file(test_file, self.tempdir)
        self.confirm_files()
        # Check the timestamp of an extracted file
        file_txt_path = os.path.join(self.tempdir, 'file.txt')
        mtime = time.gmtime(os.stat(file_txt_path).st_mtime)
        assert mtime[0:6] == (2013, 8, 16, 5, 13, 37), mtime

    def test_unpack_zip(self, data):
        """
        Test unpacking a *.zip, and setting execute permissions
        """
        test_file = data.packages.join("test_zip.zip")
        unzip_file(test_file, self.tempdir)
        self.confirm_files()


class Failer(object):
    """Callable stand-in that fails for a fixed amount of wall-clock time."""

    def __init__(self, duration=1):
        self.succeed_after = time.time() + duration

    def call(self, *args, **kw):
        """Raise OSError until ``duration`` seconds have passed, then return.

        All arguments are accepted and ignored so this can replace
        ``shutil.rmtree``.
        """
        if time.time() < self.succeed_after:
            raise OSError("Failed")


def test_rmtree_retries(tmpdir, monkeypatch):
    """
    Test pip._internal.utils.rmtree will retry failures
    """
    monkeypatch.setattr(shutil, 'rmtree', Failer(duration=1).call)
    rmtree('foo')


def test_rmtree_retries_for_3sec(tmpdir, monkeypatch):
    """
    Test pip._internal.utils.rmtree will retry failures for no more than 3 sec
    """
    monkeypatch.setattr(shutil, 'rmtree', Failer(duration=5).call)
    with pytest.raises(OSError):
        rmtree('foo')


class Test_normalize_path(object):

    # Technically, symlinks are possible on Windows, but you need a special
    # permission bit to create them, and Python 2 doesn't support it anyway, so
    # it's easiest just to skip this test on Windows altogether.
    @pytest.mark.skipif("sys.platform == 'win32'")
    def test_resolve_symlinks(self, tmpdir):
        orig_working_dir = os.getcwd()
        os.chdir(tmpdir)
        try:
            d = os.path.join('foo', 'bar')
            f = os.path.join(d, 'file1')
            os.makedirs(d)
            with open(f, 'w'):  # Create the file
                pass

            os.symlink(d, 'dir_link')
            os.symlink(f, 'file_link')

            assert normalize_path(
                'dir_link/file1', resolve_symlinks=True
            ) == os.path.join(tmpdir, f)
            assert normalize_path(
                'dir_link/file1', resolve_symlinks=False
            ) == os.path.join(tmpdir, 'dir_link', 'file1')

            assert normalize_path(
                'file_link', resolve_symlinks=True
            ) == os.path.join(tmpdir, f)
            assert normalize_path(
                'file_link', resolve_symlinks=False
            ) == os.path.join(tmpdir, 'file_link')
        finally:
            # Always restore the cwd so later tests are unaffected.
            os.chdir(orig_working_dir)


class TestHashes(object):
    """Tests for pip._internal.utils.hashes"""

    def test_success(self, tmpdir):
        """Make sure no error is raised when at least one hash matches.

        Test check_against_path because it calls everything else.

        """
        hashed_file = tmpdir / 'to_hash'
        hashed_file.write('hello')
        hashes = Hashes({
            'sha256': ['2cf24dba5fb0a30e26e83b2ac5b9e29e'
                       '1b161e5c1fa7425e73043362938b9824'],
            'sha224': ['wrongwrong'],
            'md5': ['5d41402abc4b2a76b9719d911017c592']})
        hashes.check_against_path(hashed_file)

    def test_failure(self):
        """Hashes should raise HashMismatch when no hashes match."""
        hashes = Hashes({'sha256': ['wrongwrong']})
        with pytest.raises(HashMismatch):
            hashes.check_against_file(BytesIO(b'hello'))

    def test_missing_hashes(self):
        """MissingHashes should raise HashMissing when any check is done."""
        with pytest.raises(HashMissing):
            MissingHashes().check_against_file(BytesIO(b'hello'))

    def test_unknown_hash(self):
        """Hashes should raise InstallationError when it encounters an unknown
        hash."""
        hashes = Hashes({'badbad': ['dummy']})
        with pytest.raises(InstallationError):
            hashes.check_against_file(BytesIO(b'hello'))

    def test_non_zero(self):
        """Test that truthiness tests tell whether any known-good hashes
        exist."""
        assert Hashes({'sha256': ['dummy']})
        assert not Hashes()
        assert not Hashes({})


class TestEncoding(object):
    """Tests for pip._internal.utils.encoding"""

    def test_auto_decode_utf16_le(self):
        data = (
            b'\xff\xfeD\x00j\x00a\x00n\x00g\x00o\x00=\x00'
            b'=\x001\x00.\x004\x00.\x002\x00'
        )
        assert auto_decode(data) == "Django==1.4.2"

    def test_auto_decode_no_bom(self):
        assert auto_decode(b'foobar') == u'foobar'

    def test_auto_decode_pep263_headers(self):
        latin1_req = u'# coding=latin1\n# Pas trop de café'
        assert auto_decode(latin1_req.encode('latin1')) == latin1_req

    def test_auto_decode_no_preferred_encoding(self):
        om, em = Mock(), Mock()
        om.return_value = 'ascii'
        em.return_value = None
        data = u'data'
        with patch('sys.getdefaultencoding', om):
            with patch('locale.getpreferredencoding', em):
                ret = auto_decode(data.encode(sys.getdefaultencoding()))
        assert ret == data


class TestTempDirectory(object):

    # No need to test symlinked directories on Windows
    @pytest.mark.skipif("sys.platform == 'win32'")
    def test_symlinked_path(self):
        with TempDirectory() as tmp_dir:
            assert os.path.exists(tmp_dir.path)

            alt_tmp_dir = tempfile.mkdtemp(prefix="pip-test-")
            try:
                assert (
                    os.path.dirname(tmp_dir.path) ==
                    os.path.dirname(os.path.realpath(alt_tmp_dir))
                )
                # are we on a system where /tmp is a symlink
                if (os.path.realpath(alt_tmp_dir) !=
                        os.path.abspath(alt_tmp_dir)):
                    assert (
                        os.path.dirname(tmp_dir.path) !=
                        os.path.dirname(alt_tmp_dir)
                    )
                else:
                    assert (
                        os.path.dirname(tmp_dir.path) ==
                        os.path.dirname(alt_tmp_dir)
                    )
            finally:
                # mkdtemp() leaves removal to the caller; don't leak the
                # comparison directory, even when an assertion fails.
                os.rmdir(alt_tmp_dir)

            os.rmdir(tmp_dir.path)
            assert not os.path.exists(tmp_dir.path)

    def test_deletes_readonly_files(self):
        def create_file(*args):
            fpath = os.path.join(*args)
            ensure_dir(os.path.dirname(fpath))
            with open(fpath, "w") as f:
                f.write("Holla!")

        def readonly_file(*args):
            fpath = os.path.join(*args)
            os.chmod(fpath, stat.S_IREAD)

        with TempDirectory() as tmp_dir:
            create_file(tmp_dir.path, "normal-file")
            create_file(tmp_dir.path, "readonly-file")
            readonly_file(tmp_dir.path, "readonly-file")

            create_file(tmp_dir.path, "subfolder", "normal-file")
            create_file(tmp_dir.path, "subfolder", "readonly-file")
            readonly_file(tmp_dir.path, "subfolder", "readonly-file")

        assert tmp_dir.path is None

    def test_create_and_cleanup_work(self):
        tmp_dir = TempDirectory()
        assert tmp_dir.path is None

        tmp_dir.create()
        created_path = tmp_dir.path
        assert tmp_dir.path is not None
        assert os.path.exists(created_path)

        tmp_dir.cleanup()
        assert tmp_dir.path is None
        assert not os.path.exists(created_path)


class TestGlibc(object):
    def test_manylinux1_check_glibc_version(self):
        """
        Test that the check_glibc_version function is robust against weird
        glibc version strings.
        """
        for two_twenty in ["2.20",
                           # used by "linaro glibc", see gh-3588
                           "2.20-2014.11",
                           # weird possibilities that I just made up
                           "2.20+dev",
                           "2.20-custom",
                           "2.20.1",
                           ]:
            assert check_glibc_version(two_twenty, 2, 15)
            assert check_glibc_version(two_twenty, 2, 20)
            assert not check_glibc_version(two_twenty, 2, 21)
            assert not check_glibc_version(two_twenty, 3, 15)
            assert not check_glibc_version(two_twenty, 1, 15)

        # For strings that we just can't parse at all, we should warn and
        # return false
        for bad_string in ["asdf", "", "foo.bar"]:
            with warnings.catch_warnings(record=True) as ws:
                warnings.filterwarnings("always")
                assert not check_glibc_version(bad_string, 2, 5)
                assert any(
                    "Expected glibc version with" in str(w.message)
                    for w in ws
                ), "expected a glibc version warning for %r" % (bad_string,)


class TestCheckRequiresPython(object):

    @pytest.mark.parametrize(
        ("metadata", "should_raise"),
        [
            ("Name: test\n", False),
            ("Name: test\nRequires-Python:", False),
            ("Name: test\nRequires-Python: invalid_spec", False),
            ("Name: test\nRequires-Python: <=1", True),
        ],
    )
    def test_check_requires(self, metadata, should_raise):
        fake_dist = Mock(
            has_metadata=lambda _: True,
            get_metadata=lambda _: metadata)
        if should_raise:
            with pytest.raises(UnsupportedPythonVersion):
                check_dist_requires_python(fake_dist)
        else:
            check_dist_requires_python(fake_dist)


class TestGetProg(object):

    @pytest.mark.parametrize(
        ("argv", "executable", "expected"),
        [
            ('/usr/bin/pip', '', 'pip'),
            ('-c', '/usr/bin/python', '/usr/bin/python -m pip'),
            ('__main__.py', '/usr/bin/python', '/usr/bin/python -m pip'),
            ('/usr/bin/pip3', '', 'pip3'),
        ]
    )
    def test_get_prog(self, monkeypatch, argv, executable, expected):
        monkeypatch.setattr('pip._internal.utils.misc.sys.argv', [argv])
        monkeypatch.setattr(
            'pip._internal.utils.misc.sys.executable',
            executable
        )
        assert get_prog() == expected


def test_call_subprocess_works_okay_when_just_given_nothing():
    # No try/except: if the call raises, the test fails with the real
    # exception and traceback instead of a bare "assert False".
    call_subprocess([sys.executable, '-c', 'print("Hello")'])


def test_call_subprocess_closes_stdin():
    with pytest.raises(InstallationError):
        call_subprocess([sys.executable, '-c', 'input()'])


@pytest.mark.parametrize('args, expected', [
    # Test without subdir.
    (('git+https://example.com/pkg', 'dev', 'myproj'),
     'git+https://example.com/pkg@dev#egg=myproj'),
    # Test with subdir.
    (('git+https://example.com/pkg', 'dev', 'myproj', 'sub/dir'),
     'git+https://example.com/pkg@dev#egg=myproj&subdirectory=sub/dir'),
    # Test with None subdir.
    (('git+https://example.com/pkg', 'dev', 'myproj', None),
     'git+https://example.com/pkg@dev#egg=myproj'),
])
def test_make_vcs_requirement_url(args, expected):
    actual = make_vcs_requirement_url(*args)
    assert actual == expected


@pytest.mark.parametrize('netloc, expected', [
    # Test a basic case.
    ('example.com', ('example.com', (None, None))),
    # Test with username and no password.
    ('user@example.com', ('example.com', ('user', None))),
    # Test with username and password.
    ('user:pass@example.com', ('example.com', ('user', 'pass'))),
    # Test with username and empty password.
    ('user:@example.com', ('example.com', ('user', ''))),
    # Test the password containing an @ symbol.
    ('user:pass@word@example.com', ('example.com', ('user', 'pass@word'))),
    # Test the password containing a : symbol.
    ('user:pass:word@example.com', ('example.com', ('user', 'pass:word'))),
])
def test_split_auth_from_netloc(netloc, expected):
    actual = split_auth_from_netloc(netloc)
    assert actual == expected


@pytest.mark.parametrize('auth_url, expected_url', [
    ('https://user:pass@domain.tld/project/tags/v0.2',
     'https://domain.tld/project/tags/v0.2'),
    ('https://domain.tld/project/tags/v0.2',
     'https://domain.tld/project/tags/v0.2',),
    ('https://user:pass@domain.tld/svn/project/trunk@8181',
     'https://domain.tld/svn/project/trunk@8181'),
    ('https://domain.tld/project/trunk@8181',
     'https://domain.tld/project/trunk@8181',),
    ('git+https://pypi.org/something',
     'git+https://pypi.org/something'),
    ('git+https://user:pass@pypi.org/something',
     'git+https://pypi.org/something'),
    ('git+ssh://git@pypi.org/something',
     'git+ssh://pypi.org/something'),
])
def test_remove_auth_from_url(auth_url, expected_url):
    url = remove_auth_from_url(auth_url)
    assert url == expected_url