1
1
Fork 0
mirror of https://github.com/pypa/pip synced 2023-12-13 21:30:23 +01:00

Add checks against requirements-file-dwelling hashes for most kinds of packages. Close #1175.

* Add --require-hashes option. This is handy in deployment scripts to force application authors to hash their requirements. It is also a convenient way to get pip to show computed hashes for a virgin, unhashed requirements file. Eventually, additions to `pip freeze` should fill a superset of this use case.
  * In --require-hashes mode, at least one hash is required to match for each requirement.
  * Option-based requirements (--sha256=...) turn on --require-hashes mode implicitly.
  * Internet-derived URL-based hashes are "necessary but not sufficient": they do not satisfy --require-hashes mode when they match, but they are still used to guard against transmission errors.
  * Other URL-based requirements (#md5=...) are treated just like flag-based ones, except they don't turn on --require-hashes.
* Complain informatively, with the most devastating errors first so you don't chase your tail all day only to run up against a brick wall at the end. This also means we don't complain that a hash is missing, only for the user to find, after fixing it, that we have no idea how to even compute a hash for that type of requirement.
  * Complain about unpinned requirements when hash-checking mode is on, lest they cause the user surprise later.
  * Complain about missing hashes.
  * Complain about requirement types we don't know how to hash (like VCS ones and local dirs).
* Have InstallRequirement keep its original Link around (original_link) so we can differentiate between URL hashes from requirements files and ones downloaded from the (untrustworthy) internet.
* Remove test_download_hashes, which is obsolete. Similar coverage is provided in test_utils.TestHashes and the various hash cases in test_req.py.
This commit is contained in:
Erik Rose 2015-09-09 13:01:53 -04:00
parent 3303be0c4e
commit 1e41f01823
14 changed files with 777 additions and 378 deletions

View file

@ -159,6 +159,15 @@ class InstallCommand(RequirementCommand):
cmd_opts.add_option(cmdoptions.no_clean())
cmd_opts.add_option(
'--require-hashes',
dest='require_hashes',
action='store_true',
help='Perform a provably repeatable installation by requiring a '
'hash to check each package against. Implied by the presence '
'of a hash flag, like --sha256, on any individual '
'requirement')
index_opts = cmdoptions.make_option_group(
cmdoptions.index_group,
self.parser,
@ -266,6 +275,7 @@ class InstallCommand(RequirementCommand):
pycompile=options.compile,
isolated=options.isolated_mode,
wheel_cache=wheel_cache,
require_hashes=options.require_hashes,
)
self.populate_requirement_set(

View file

@ -29,7 +29,7 @@ from pip.exceptions import InstallationError, HashMismatch
from pip.models import PyPI
from pip.utils import (splitext, rmtree, format_size, display_path,
backup_dir, ask_path_exists, unpack_file,
call_subprocess, ARCHIVE_EXTENSIONS)
call_subprocess, ARCHIVE_EXTENSIONS, consume)
from pip.utils.filesystem import check_path_owner
from pip.utils.logging import indent_log
from pip.utils.ui import DownloadProgressBar, DownloadProgressSpinner
@ -485,57 +485,22 @@ def is_file_url(link):
return link.url.lower().startswith('file:')
def _check_hash(download_hash, link):
if download_hash.digest_size != hashlib.new(link.hash_name).digest_size:
logger.critical(
"Hash digest size of the package %d (%s) doesn't match the "
"expected hash name %s!",
download_hash.digest_size, link, link.hash_name,
)
raise HashMismatch('Hash name mismatch for package %s' % link)
if download_hash.hexdigest() != link.hash:
logger.critical(
"Hash of the package %s (%s) doesn't match the expected hash %s!",
link, download_hash.hexdigest(), link.hash,
)
raise HashMismatch(
'Bad %s hash for package %s' % (link.hash_name, link)
)
def is_dir_url(link):
"""Return whether a file:// Link points to a directory.
``link`` must not have any other scheme but file://. Call is_file_url()
first.
def _get_hash_from_file(target_file, link):
try:
download_hash = hashlib.new(link.hash_name)
except (ValueError, TypeError):
logger.warning(
"Unsupported hash name %s for package %s", link.hash_name, link,
)
return None
with open(target_file, 'rb') as fp:
while True:
chunk = fp.read(4096)
if not chunk:
break
download_hash.update(chunk)
return download_hash
"""
link_path = url_to_path(link.url_without_fragment)
return os.path.isdir(link_path)
def _progress_indicator(iterable, *args, **kwargs):
return iterable
def _download_url(resp, link, content_file):
download_hash = None
if link.hash and link.hash_name:
try:
download_hash = hashlib.new(link.hash_name)
except ValueError:
logger.warning(
"Unsupported hash name %s for package %s",
link.hash_name, link,
)
def _download_url(resp, link, content_file, hashes):
try:
total_length = int(resp.headers['content-length'])
except (ValueError, KeyError, TypeError):
@ -593,6 +558,11 @@ def _download_url(resp, link, content_file):
break
yield chunk
def written_chunks(chunks):
for chunk in chunks:
content_file.write(chunk)
yield chunk
progress_indicator = _progress_indicator
if link.netloc == PyPI.netloc:
@ -614,13 +584,12 @@ def _download_url(resp, link, content_file):
logger.debug('Downloading from URL %s', link)
for chunk in progress_indicator(resp_read(4096), 4096):
if download_hash is not None:
download_hash.update(chunk)
content_file.write(chunk)
if link.hash and link.hash_name:
_check_hash(download_hash, link)
return download_hash
downloaded_chunks = written_chunks(progress_indicator(resp_read(4096),
4096))
if hashes:
hashes.check_against_chunks(downloaded_chunks)
else:
consume(downloaded_chunks)
def _copy_file(filename, location, content_type, link):
@ -648,7 +617,11 @@ def _copy_file(filename, location, content_type, link):
logger.info('Saved %s', display_path(download_location))
def unpack_http_url(link, location, download_dir=None, session=None):
def unpack_http_url(link,
location,
download_dir=None,
session=None,
hashes=None):
if session is None:
raise TypeError(
"unpack_http_url() missing 1 required keyword argument: 'session'"
@ -659,14 +632,19 @@ def unpack_http_url(link, location, download_dir=None, session=None):
# If a download dir is specified, is the file already downloaded there?
already_downloaded_path = None
if download_dir:
already_downloaded_path = _check_download_dir(link, download_dir)
already_downloaded_path = _check_download_dir(link,
download_dir,
hashes)
if already_downloaded_path:
from_path = already_downloaded_path
content_type = mimetypes.guess_type(from_path)[0]
else:
# let's download to a tmp dir
from_path, content_type = _download_http_url(link, session, temp_dir)
from_path, content_type = _download_http_url(link,
session,
temp_dir,
hashes)
# unpack the archive to the build dir location. even when only downloading
# archives, they have to be unpacked to parse dependencies
@ -681,15 +659,16 @@ def unpack_http_url(link, location, download_dir=None, session=None):
rmtree(temp_dir)
def unpack_file_url(link, location, download_dir=None):
def unpack_file_url(link, location, download_dir=None, hashes=None):
"""Unpack link into location.
If download_dir is provided and link points to a file, make a copy
of the link file inside download_dir."""
If download_dir is provided and link points to a file, make a copy
of the link file inside download_dir.
"""
link_path = url_to_path(link.url_without_fragment)
# If it's a url to a local directory
if os.path.isdir(link_path):
if is_dir_url(link):
if os.path.isdir(location):
rmtree(location)
shutil.copytree(link_path, location, symlinks=True)
@ -697,15 +676,17 @@ def unpack_file_url(link, location, download_dir=None):
logger.info('Link is a directory, ignoring download_dir')
return
# if link has a hash, let's confirm it matches
if link.hash:
link_path_hash = _get_hash_from_file(link_path, link)
_check_hash(link_path_hash, link)
# If --require-hashes is off, `hashes` is either empty, the link hash, or
# MissingHashes, and it's required to match. If --require-hashes is on, we
# are satisfied by any hash in `hashes` matching: a URL-based or an
# option-based one; no internet-sourced hash will be in `hashes`.
if hashes:
hashes.check_against_path(link_path)
# If a download dir is specified, is the file already there and valid?
already_downloaded_path = None
if download_dir:
already_downloaded_path = _check_download_dir(link, download_dir)
already_downloaded_path = _check_download_dir(link, download_dir, hashes)
if already_downloaded_path:
from_path = already_downloaded_path
@ -752,7 +733,7 @@ class PipXmlrpcTransport(xmlrpc_client.Transport):
def unpack_url(link, location, download_dir=None,
only_download=False, session=None):
only_download=False, session=None, hashes=None):
"""Unpack link.
If link is a VCS link:
if only_download, export into download_dir and ignore location
@ -761,6 +742,11 @@ def unpack_url(link, location, download_dir=None,
- unpack into location
- if download_dir, copy the file into download_dir
- if only_download, mark location for deletion
:param hashes: A Hashes object, one of whose embedded hashes must match,
or I'll raise HashMismatch. If the Hashes is empty, no matches are
required, and unhashable types of requirements (like VCS ones, which
would ordinarily raise HashUnsupported) are allowed.
"""
# non-editable vcs urls
if is_vcs_url(link):
@ -768,7 +754,7 @@ def unpack_url(link, location, download_dir=None,
# file urls
elif is_file_url(link):
unpack_file_url(link, location, download_dir)
unpack_file_url(link, location, download_dir, hashes=hashes)
# http urls
else:
@ -780,12 +766,13 @@ def unpack_url(link, location, download_dir=None,
location,
download_dir,
session,
hashes=hashes
)
if only_download:
write_delete_marker_file(location)
def _download_http_url(link, session, temp_dir):
def _download_http_url(link, session, temp_dir, hashes):
"""Download link url into temp_dir using provided session"""
target_url = link.url.split('#', 1)[0]
try:
@ -840,11 +827,11 @@ def _download_http_url(link, session, temp_dir):
filename += ext
file_path = os.path.join(temp_dir, filename)
with open(file_path, 'wb') as content_file:
_download_url(resp, link, content_file)
_download_url(resp, link, content_file, hashes)
return file_path, content_type
def _check_download_dir(link, download_dir):
def _check_download_dir(link, download_dir, hashes):
""" Check download_dir for previously downloaded file with correct hash
If a correct file is found return its path else None
"""
@ -852,10 +839,9 @@ def _check_download_dir(link, download_dir):
if os.path.exists(download_path):
# If already downloaded, does its hash match?
logger.info('File was already downloaded %s', download_path)
if link.hash:
download_hash = _get_hash_from_file(download_path, link)
if hashes:
try:
_check_hash(download_hash, link)
hashes.check_against_path(download_path)
except HashMismatch:
logger.warning(
'Previously-downloaded file %s has bad hash. '

View file

@ -1,6 +1,10 @@
"""Exceptions used throughout package"""
from __future__ import absolute_import
from itertools import chain, groupby, repeat
from pip._vendor.six import iteritems
class PipError(Exception):
"""Base pip exception"""
@ -39,13 +43,208 @@ class PreviousBuildDirError(PipError):
"""Raised when there's a previous conflicting build directory"""
class HashMismatch(InstallationError):
"""Distribution file hash values don't match."""
class InvalidWheelFilename(InstallationError):
"""Invalid wheel filename."""
class UnsupportedWheel(InstallationError):
"""Unsupported wheel."""
# The recommended hash algo of the moment. Feel free to change this any time.
FAVORITE_HASH = 'sha256'
class HashErrors(InstallationError):
"""Multiple HashError instances rolled into one for reporting"""
def __init__(self):
self.errors = []
def append(self, error):
self.errors.append(error)
def __str__(self):
lines = []
self.errors.sort(key=lambda e: e.order)
for cls, errors_of_cls in groupby(self.errors, lambda e: e.__class__):
lines.append(cls.head())
lines.extend(e.body() for e in errors_of_cls)
if lines:
return '\n'.join(lines)
def __nonzero__(self):
return bool(self.errors)
def __bool__(self):
return self.__nonzero__()
class HashError(InstallationError):
"""A failure to verify a package against known-good hashes
:cvar order: An int sorting hash exception classes by difficulty of
recovery (lower being harder), so the user doesn't bother fretting
about unpinned packages when he has deeper issues, like VCS
dependencies, to deal with. Also keeps error reports in a
deterministic order.
:ivar req: The InstallRequirement that triggered this error. This is
pasted on after the exception is instantiated, because it's not
typically available earlier.
"""
req = None
@classmethod
def head(cls):
"""Return a section heading for display above potentially many
exceptions of this kind."""
def body(self):
"""Return a summary of me for display under the heading.
This default implementation simply prints a description of the
triggering requirement.
:param req: The InstallRequirement that provoked this error, with
populate_link() having already been called
"""
return ' %s' % self._requirement_name()
def __str__(self):
return '%s\n%s' % (self.head(), self.body())
def _requirement_name(self): # TODO: Make sure this is the best it can be and is DRY with subclasses.
"""Return a description of the requirement that triggered me.
This default implementation returns long description of the req, with
line numbers
"""
return str(self.req) if self.req else 'unknown package'
class VcsHashUnsupported(HashError):
"""A hash was provided for a version-control-system-based requirement, but
we don't have a method for hashing those."""
order = 0
@classmethod
def head(cls):
return ("Can't verify hashes for these requirements because we don't "
"have a way to hash version control repositories:")
class DirectoryUrlHashUnsupported(HashError):
"""A hash was provided for a version-control-system-based requirement, but
we don't have a method for hashing those."""
order = 1
@classmethod
def head(cls):
return ("Can't verify hashes for these file:// requirements because "
"they point to directories:")
class HashMissing(HashError):
"""A hash was needed for a requirement but is absent."""
order = 2
def __init__(self, gotten_hash):
"""
:param gotten_hash: The hash of the (possibly malicious) archive we
just downloaded
"""
self.gotten_hash = gotten_hash
@classmethod
def head(cls):
return ('These requirements were missing hashes, which leaves them '
'open to tampering. (Hashes are required in --require-hashes '
'mode, which is implicitly on when a hash is specified for '
'any package.) Here are the hashes the downloaded archives '
'actually had. You can add lines like these to your '
'requirements files to pin them down.')
def body(self):
return ' %s --%s=%s' % (self.req.req if self.req and
# In case someone feeds something
# downright stupid to
# InstallRequirement's constructor:
getattr(self.req, 'req', None)
else 'unknown package',
FAVORITE_HASH,
self.gotten_hash)
class HashUnpinned(HashError):
"""A requirement had a hash specified but was not pinned to a specific
version."""
order = 3
@classmethod
def head(cls):
return ('When a hash is specified, a requirement must also have its '
'version pinned with ==. These do not:')
class HashMismatch(HashError):
"""Distribution file hash values don't match.
:ivar package_name: The name of the package that triggered the hash
mismatch. Feel free to write to this after the exception is raise to
improve its error message.
"""
order = 4
def __init__(self, goods, gots):
"""
:param goods: A dict of algorithm names pointing to lists of allowed
hex digests
:param gots: A dict of algorithm names pointing to hashes we
actually got from the files under suspicion
"""
self.goods = goods
self.gots = gots
@classmethod
def head(cls):
return ('THESE PACKAGES DID NOT MATCH THE HASHES FROM THE '
'REQUIREMENTS FILE. If you have updated the package versions, '
'update the hashes. Otherwise, examine the package contents '
'carefully; someone may have tampered with them.')
def body(self):
return ' %s:\n%s' % (self._requirement_name(),
self._hash_comparison())
def _hash_comparison(self):
"""Return a comparison of actual and expected hash values.
Example::
Expected sha256 abcdeabcdeabcdeabcdeabcdeabcdeabcdeabcdeabcde
or 123451234512345123451234512345123451234512345
Got bcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdef
"""
def hash_then_or(hash_name):
# For now, all the decent hashes have 6-char names, so we can get
# away with hard-coding space literals.
return chain([hash_name], repeat(' or'))
lines = []
for hash_name, expecteds in iteritems(self.goods):
prefix = hash_then_or(hash_name)
lines.extend((' Expected %s %s' % (next(prefix), e))
for e in expecteds)
lines.append(' Got %s\n' %
self.gots[hash_name].hexdigest())
prefix = ' or'
return '\n'.join(lines)

View file

@ -32,6 +32,7 @@ from pip.utils import (
call_subprocess, read_text_file, FakeFile, _make_build_dir, ensure_dir,
get_installed_version
)
from pip.utils.hashes import Hashes
from pip.utils.logging import indent_log
from pip.req.req_uninstall import UninstallPathSet
from pip.vcs import vcs
@ -76,7 +77,7 @@ class InstallRequirement(object):
self.editable_options = editable_options
self._wheel_cache = wheel_cache
self.link = link
self.link = self.original_link = link
self.as_egg = as_egg
self.markers = markers
self._egg_info_path = None
@ -265,6 +266,15 @@ class InstallRequirement(object):
def specifier(self):
return self.req.specifier
@property
def is_pinned(self):
"""Return whether I am pinned to an exact version.
For example, some-package==1.2 is pinned; some-package>1.2 is not.
"""
specifiers = self.specifier
return len(specifiers) == 1 and next(iter(specifiers)).operator == '=='
def from_path(self):
if self.req is None:
return None
@ -1005,6 +1015,36 @@ exec(compile(
project_name=dist_name,
metadata=metadata)
@property
def has_hash_options(self):
"""Return whether any known-good hashes are specified as options.
These activate --require-hashes mode; hashes specified as part of a
URL do not.
"""
return bool(self.options.get('hashes', {}))
def hashes(self, trust_internet=True):
"""Return a hash-comparer that considers my option- and URL-based
hashes to be known-good.
Hashes in URLs are almost peers with ones from flags. They satisfy
--require-hashes (whether it was implicitly or explicitly activated)
but do not activate it. md5 and sha224 are not allowed in flags, which
should nudge people toward good algos. We always OR all hashes
together, even ones from URLs.
:param trust_internet: Whether to trust URL-based (#md5=...) hashes
downloaded from the internet, as by populate_link()
"""
good_hashes = self.options.get('hashes', {}).copy()
link = self.link if trust_internet else self.original_link
if link and link.hash:
good_hashes.setdefault(link.hash_name, []).append(link.hash)
return Hashes(good_hashes)
def _strip_postfix(req):
"""

View file

@ -2,19 +2,23 @@ from __future__ import absolute_import
from collections import defaultdict
import functools
import itertools
from itertools import chain
import logging
import os
from pip._vendor import pkg_resources
from pip._vendor import requests
from pip.download import url_to_path, unpack_url
from pip.download import (is_file_url, is_dir_url, is_vcs_url, url_to_path,
unpack_url)
from pip.exceptions import (InstallationError, BestVersionAlreadyInstalled,
DistributionNotFound, PreviousBuildDirError)
DistributionNotFound, PreviousBuildDirError,
HashError, HashErrors, HashUnpinned,
DirectoryUrlHashUnsupported, VcsHashUnsupported)
from pip.req.req_install import InstallRequirement
from pip.utils import (
display_path, dist_in_usersite, ensure_dir, normalize_path)
from pip.utils.hashes import MissingHashes
from pip.utils.logging import indent_log
from pip.vcs import vcs
@ -140,7 +144,7 @@ class RequirementSet(object):
ignore_dependencies=False, force_reinstall=False,
use_user_site=False, session=None, pycompile=True,
isolated=False, wheel_download_dir=None,
wheel_cache=None):
wheel_cache=None, require_hashes=False):
"""Create a RequirementSet.
:param wheel_download_dir: Where still-packed .whl files should be
@ -186,6 +190,7 @@ class RequirementSet(object):
wheel_download_dir = normalize_path(wheel_download_dir)
self.wheel_download_dir = wheel_download_dir
self._wheel_cache = wheel_cache
self._require_hashes = require_hashes
# Maps from install_req -> dependencies_of_install_req
self._dependencies = defaultdict(list)
@ -315,23 +320,6 @@ class RequirementSet(object):
req.uninstall(auto_confirm=auto_confirm)
req.commit_uninstall()
def _walk_req_to_install(self, handler):
"""Call handler for all pending reqs.
:param handler: Handle a single requirement. Should take a requirement
to install. Can optionally return an iterable of additional
InstallRequirements to cover.
"""
# The list() here is to avoid potential mutate-while-iterating bugs.
discovered_reqs = []
reqs = itertools.chain(
list(self.unnamed_requirements), list(self.requirements.values()),
discovered_reqs)
for req_to_install in reqs:
more_reqs = handler(req_to_install)
if more_reqs:
discovered_reqs.extend(more_reqs)
def prepare_files(self, finder):
"""
Prepare process. Create temp directories, download and/or unpack files.
@ -340,8 +328,37 @@ class RequirementSet(object):
if self.wheel_download_dir:
ensure_dir(self.wheel_download_dir)
self._walk_req_to_install(
functools.partial(self._prepare_file, finder))
# If any top-level requirement has a hash specified, enter
# hash-checking mode, which requires hashes from all.
root_reqs = self.unnamed_requirements + self.requirements.values()
require_hashes = (self._require_hashes or
any(req.has_hash_options for req in root_reqs))
if require_hashes and self.as_egg:
raise InstallationError(
'--egg is not allowed with --require-hashes mode, since it '
'delegates dependency resolution to setuptools and could thus '
'result in installation of unhashed packages.')
# Actually prepare the files, and collect any exceptions. The
# *HashUnsupported exceptions cannot be checked ahead of time, because
# req.populate_links() needs to be called before we can examine the
# link type.
discovered_reqs = []
hash_errors = HashErrors()
for req in chain(root_reqs, discovered_reqs):
try:
discovered_reqs.extend(self._prepare_file(
finder,
req,
require_hashes=require_hashes,
ignore_dependencies=self.ignore_dependencies))
except HashError as exc:
exc.req = req
hash_errors.append(exc)
if hash_errors:
raise hash_errors
def _check_skip_installed(self, req_to_install, finder):
"""Check if req_to_install should be skipped.
@ -395,7 +412,11 @@ class RequirementSet(object):
else:
return None
def _prepare_file(self, finder, req_to_install):
def _prepare_file(self,
finder,
req_to_install,
require_hashes=False,
ignore_dependencies=False):
"""Prepare a single requirements file.
:return: A list of additional InstallRequirements to also install.
@ -442,6 +463,11 @@ class RequirementSet(object):
# # vcs update or unpack archive # #
# ################################ #
if req_to_install.editable:
if require_hashes:
raise InstallationError(
'The editable requirement %s cannot be installed when '
'requiring hashes, because there is no single file to '
'hash.' % req_to_install)
req_to_install.ensure_has_source_dir(self.src_dir)
req_to_install.update_editable(not self.is_download)
abstract_dist = make_abstract_dist(req_to_install)
@ -449,6 +475,12 @@ class RequirementSet(object):
if self.is_download:
req_to_install.archive(self.download_dir)
elif req_to_install.satisfied_by:
if require_hashes:
logger.info(
'Since it is already installed, we are trusting this '
'package without checking its hash. To ensure a '
'completely repeatable environment, install into an '
'empty virtualenv.')
abstract_dist = Installed(req_to_install)
else:
# @@ if filesystem packages are not marked
@ -480,6 +512,41 @@ class RequirementSet(object):
# If no new versions are found, DistributionNotFound is raised,
# otherwise a result is guaranteed.
assert req_to_install.link
link = req_to_install.link
# Now that we have the real link, we can tell what kind of
# requirements we have and raise some more informative errors
# than otherwise. (For example, we can raise VcsHashUnsupported
# for a VCS URL rather than HashMissing.)
if require_hashes:
# We could check these first 2 conditions inside
# unpack_url and save repetition of conditions, but then
# we would report less-useful error messages for
# unhashable requirements, complaining that there's no
# hash provided.
if is_vcs_url(link):
raise VcsHashUnsupported()
elif is_file_url(link) and is_dir_url(link):
raise DirectoryUrlHashUnsupported()
if (not req_to_install.original_link and
not req_to_install.is_pinned):
# Unpinned packages are asking for trouble when a new
# version is uploaded. This isn't a security check, but
# it saves users a surprising hash mismatch in the
# future.
#
# file:/// URLs aren't pinnable, so don't complain
# about them not being pinned.
raise HashUnpinned()
hashes = req_to_install.hashes(
trust_internet=not require_hashes)
if require_hashes and not hashes:
# Known-good hashes are missing for this requirement, so
# shim it with a facade object that will provoke hash
# computation and then raise a HashMissing exception
# showing the user what the hash should be.
hashes = MissingHashes()
try:
download_dir = self.download_dir
# We always delete unpacked sdists after pip ran.
@ -501,7 +568,7 @@ class RequirementSet(object):
unpack_url(
req_to_install.link, req_to_install.source_dir,
download_dir, autodelete_unpacked,
session=self.session)
session=self.session, hashes=hashes)
except requests.HTTPError as exc:
logger.critical(
'Could not install requirement %s because '
@ -564,7 +631,11 @@ class RequirementSet(object):
# 'unnamed' requirements will get added here
self.add_requirement(req_to_install, None)
if not self.ignore_dependencies:
if not ignore_dependencies and not require_hashes:
# --require-hashes implies --no-deps because, otherwise,
# unhashed dependencies could creep in. In the future, we
# should report unhashed dependencies rather than just not
# installing them.
if (req_to_install.extras):
logger.debug(
"Installing extra requirements: %r",

View file

@ -1,5 +1,6 @@
from __future__ import absolute_import
from collections import deque
import contextlib
import errno
import locale
@ -832,3 +833,8 @@ def get_installed_version(dist_name):
def canonicalize_name(name):
"""Convert an arbitrary string to a canonical name used for comparison"""
return pkg_resources.safe_name(name).lower()
def consume(iterator):
"""Consume an iterable at C speed."""
deque(iterator, maxlen=0)

88
pip/utils/hashes.py Normal file
View file

@ -0,0 +1,88 @@
from __future__ import absolute_import
import hashlib
from pip.exceptions import (HashMismatch, HashMissing, InstallationError,
FAVORITE_HASH)
from pip._vendor.six import iteritems, iterkeys, itervalues
class Hashes(object):
"""A wrapper that builds multiple hashes at once and checks them against
known-good values
"""
def __init__(self, hashes=None):
"""
:param hashes: A dict of algorithm names pointing to lists of allowed
hex digests
"""
self._goods = {} if hashes is None else hashes
def check_against_chunks(self, chunks):
"""Check good hashes against ones built from iterable of chunks of
data.
Raise HashMismatch if none match.
"""
gots = {}
for hash_name in iterkeys(self._goods):
try:
gots[hash_name] = hashlib.new(hash_name)
except (ValueError, TypeError):
raise InstallationError('Unknown hash name: %s' % hash_name)
for chunk in chunks:
for hash in itervalues(gots):
hash.update(chunk)
for hash_name, got in iteritems(gots):
if got.hexdigest() in self._goods[hash_name]:
return
self._raise(gots)
def _raise(self, gots):
raise HashMismatch(self._goods, gots)
def check_against_file(self, file):
"""Check good hashes against a file-like object
Raise HashMismatch if none match.
"""
def chunks():
while True:
chunk = file.read(4096)
if not chunk:
break
yield chunk
return self.check_against_chunks(chunks())
def check_against_path(self, path):
with open(path, 'rb') as file:
return self.check_against_file(file)
def __nonzero__(self):
"""Return whether I know any known-good hashes."""
return bool(self._goods)
def __bool__(self):
return self.__nonzero__()
class MissingHashes(Hashes):
"""A workalike for Hashes used when we're missing a hash for a requirement
It computes the "gotten" hash of the requirement and raises a HashMissing
exception showing it to the user.
"""
def __init__(self):
"""Don't offer the ``hashes`` kwarg."""
# Pass our favorite hash in to generate a "gotten hash". With the
# empty list, it will never match, so an error will always raise.
super(MissingHashes, self).__init__(hashes={FAVORITE_HASH: []})
def _raise(self, gots):
raise HashMissing(gots[FAVORITE_HASH].hexdigest())

View file

@ -1,4 +1,3 @@
import os
import textwrap
import glob
@ -9,7 +8,8 @@ import pytest
from pip.utils import appdirs, rmtree
from tests.lib import (pyversion, pyversion_tuple,
_create_test_package, _create_svn_repo, path_to_url)
_create_test_package, _create_svn_repo, path_to_url,
requirements_file)
from tests.lib.local_repos import local_checkout
from tests.lib.path import Path
@ -217,6 +217,44 @@ def test_install_from_local_directory(script, data):
assert egg_info_folder in result.files_created, str(result)
def test_hashed_install_success(script, data, tmpdir):
"""
Test that installing various sorts of requirements with correct hashes
works.
Test file URLs and index packages (which become HTTP URLs behind the
scenes).
"""
file_url = path_to_url(
(data.packages / 'simple-1.0.tar.gz').abspath)
with requirements_file('simple2==1.0 --sha256=9336af72ca661e6336eb87bc7de3e8844d853e3848c2b9bbd2e8bf01db88c2c7\n'
'{simple} --sha256=393043e672415891885c9a2a0929b1af95fb866d6ca016b42d2e6ce53619b653'.format(simple=file_url),
tmpdir) as reqs_file:
result = script.pip_install_local('-r',
reqs_file.abspath,
expect_error=False)
def test_hashed_install_failure(script, data, tmpdir):
"""Test that wrong hashes stop installation.
This makes sure prepare_files() is called in the course of installation
and so has the opportunity to halt if hashes are wrong. Checks on various
kinds of hashes are in test_req.py.
"""
file_url = path_to_url(
(data.packages / 'simple-1.0.tar.gz').abspath)
with requirements_file('simple2==1.0 --sha256=9336af72ca661e6336eb87bc7de3e8844d853e3848c2b9bbd2e8bf01db88c2c\n',
tmpdir) as reqs_file:
result = script.pip_install_local('-r',
reqs_file.abspath,
expect_error=True)
assert len(result.files_created) == 0
def test_install_from_local_directory_with_symlinks_to_directories(
script, data):
"""

View file

@ -1,5 +1,6 @@
from __future__ import absolute_import
from contextlib import contextmanager
import os
import sys
import re
@ -569,7 +570,22 @@ def assert_raises_regexp(exception, reg, run, *args, **kwargs):
try:
run(*args, **kwargs)
assert False, "%s should have been thrown" % exception
except Exception:
except exception:
e = sys.exc_info()[1]
p = re.compile(reg)
assert p.search(str(e)), str(e)
@contextmanager
def requirements_file(contents, tmpdir):
"""Return a Path to a requirements file of given contents.
As long as the context manager is open, the requirements file will exist.
:param tmpdir: A Path to the folder in which to create the file
"""
path = tmpdir / 'reqs.txt'
path.write(contents)
yield path
path.remove()

View file

@ -16,6 +16,7 @@ from pip.download import (
unpack_file_url,
)
from pip.index import Link
from pip.utils.hashes import Hashes
def test_unpack_http_url_with_urllib_response_without_content_type(data):
@ -105,6 +106,7 @@ def test_unpack_http_url_bad_downloaded_checksum(mock_unpack_file):
'location',
download_dir=download_dir,
session=session,
hashes=Hashes({'sha1': [download_hash.hexdigest()]})
)
# despite existence of downloaded file with bad hash, downloaded again
@ -209,7 +211,9 @@ class Test_unpack_file_url(object):
self.prep(tmpdir, data)
self.dist_url.url = "%s#md5=bogus" % self.dist_url.url
with pytest.raises(HashMismatch):
unpack_file_url(self.dist_url, self.build_dir)
unpack_file_url(self.dist_url,
self.build_dir,
hashes=Hashes({'md5': ['bogus']}))
def test_unpack_file_url_download_bad_hash(self, tmpdir, data,
monkeypatch):
@ -235,7 +239,8 @@ class Test_unpack_file_url(object):
dist_path_md5
)
unpack_file_url(self.dist_url, self.build_dir,
download_dir=self.download_dir)
download_dir=self.download_dir,
hashes=Hashes({'md5': [dist_path_md5]}))
# confirm hash is for simple1-1.0
# the previous bad download has been removed

View file

@ -1,263 +0,0 @@
import pytest
from pip.download import _get_hash_from_file, _check_hash
from pip.exceptions import InstallationError
from pip.index import Link
def test_get_hash_from_file_md5(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link(
"http://testserver/gmpy-1.15.tar.gz"
"#md5=d41d8cd98f00b204e9800998ecf8427e"
)
download_hash = _get_hash_from_file(file_path, file_link)
assert download_hash.digest_size == 16
assert download_hash.hexdigest() == "d41d8cd98f00b204e9800998ecf8427e"
def test_get_hash_from_file_sha1(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link(
"http://testserver/gmpy-1.15.tar.gz"
"#sha1=da39a3ee5e6b4b0d3255bfef95601890afd80709"
)
download_hash = _get_hash_from_file(file_path, file_link)
assert download_hash.digest_size == 20
assert download_hash.hexdigest() == (
"da39a3ee5e6b4b0d3255bfef95601890afd80709"
)
def test_get_hash_from_file_sha224(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link(
"http://testserver/gmpy-1.15.tar.gz"
"#sha224=d14a028c2a3a2bc9476102bb288234c415a2b01f828ea62ac5b3e42f"
)
download_hash = _get_hash_from_file(file_path, file_link)
assert download_hash.digest_size == 28
assert download_hash.hexdigest() == (
"d14a028c2a3a2bc9476102bb288234c415a2b01f828ea62ac5b3e42f"
)
def test_get_hash_from_file_sha384(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link(
"http://testserver/gmpy-1.15.tar.gz"
"#sha384=38b060a751ac96384cd9327eb1b1e36a21fdb71114be07434c0cc7bf63f6e"
"1da274edebfe76f65fbd51ad2f14898b95b"
)
download_hash = _get_hash_from_file(file_path, file_link)
assert download_hash.digest_size == 48
assert download_hash.hexdigest() == (
"38b060a751ac96384cd9327eb1b1e36a21fdb71114be07434c0cc7bf63f6e1da274e"
"debfe76f65fbd51ad2f14898b95b"
)
def test_get_hash_from_file_sha256(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link(
"http://testserver/gmpy-1.15.tar.gz"
"#sha256=e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852"
"b855"
)
download_hash = _get_hash_from_file(file_path, file_link)
assert download_hash.digest_size == 32
assert download_hash.hexdigest() == (
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
)
def test_get_hash_from_file_sha512(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link(
"http://testserver/gmpy-1.15.tar.gz"
"#sha512=cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36"
"ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e"
)
download_hash = _get_hash_from_file(file_path, file_link)
assert download_hash.digest_size == 64
assert download_hash.hexdigest() == (
"cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0"
"d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e"
)
def test_get_hash_from_file_unknown(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link(
"http://testserver/gmpy-1.15.tar.gz"
"#unknown_hash=d41d8cd98f00b204e9800998ecf8427e"
)
download_hash = _get_hash_from_file(file_path, file_link)
assert download_hash is None
def test_check_hash_md5_valid(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link(
"http://testserver/gmpy-1.15.tar.gz"
"#md5=d41d8cd98f00b204e9800998ecf8427e"
)
download_hash = _get_hash_from_file(file_path, file_link)
_check_hash(download_hash, file_link)
def test_check_hash_md5_invalid(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link("http://testserver/gmpy-1.15.tar.gz#md5=deadbeef")
download_hash = _get_hash_from_file(file_path, file_link)
with pytest.raises(InstallationError):
_check_hash(download_hash, file_link)
def test_check_hash_sha1_valid(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link(
"http://testserver/gmpy-1.15.tar.gz"
"#sha1=da39a3ee5e6b4b0d3255bfef95601890afd80709"
)
download_hash = _get_hash_from_file(file_path, file_link)
_check_hash(download_hash, file_link)
def test_check_hash_sha1_invalid(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link("http://testserver/gmpy-1.15.tar.gz#sha1=deadbeef")
download_hash = _get_hash_from_file(file_path, file_link)
with pytest.raises(InstallationError):
_check_hash(download_hash, file_link)
def test_check_hash_sha224_valid(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link(
"http://testserver/gmpy-1.15.tar.gz"
"#sha224=d14a028c2a3a2bc9476102bb288234c415a2b01f828ea62ac5b3e42f'"
)
download_hash = _get_hash_from_file(file_path, file_link)
_check_hash(download_hash, file_link)
def test_check_hash_sha224_invalid(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link("http://testserver/gmpy-1.15.tar.gz#sha224=deadbeef")
download_hash = _get_hash_from_file(file_path, file_link)
with pytest.raises(InstallationError):
_check_hash(download_hash, file_link)
def test_check_hash_sha384_valid(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link(
"http://testserver/gmpy-1.15.tar.gz"
"#sha384=38b060a751ac96384cd9327eb1b1e36a21fdb71114be07434c0cc7bf63f6"
"e1da274edebfe76f65fbd51ad2f14898b95b"
)
download_hash = _get_hash_from_file(file_path, file_link)
_check_hash(download_hash, file_link)
def test_check_hash_sha384_invalid(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link("http://testserver/gmpy-1.15.tar.gz#sha384=deadbeef")
download_hash = _get_hash_from_file(file_path, file_link)
with pytest.raises(InstallationError):
_check_hash(download_hash, file_link)
def test_check_hash_sha256_valid(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link(
"http://testserver/gmpy-1.15.tar.gz"
"#sha256=e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b785"
"2b855"
)
download_hash = _get_hash_from_file(file_path, file_link)
_check_hash(download_hash, file_link)
def test_check_hash_sha256_invalid(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link("http://testserver/gmpy-1.15.tar.gz#sha256=deadbeef")
download_hash = _get_hash_from_file(file_path, file_link)
with pytest.raises(InstallationError):
_check_hash(download_hash, file_link)
def test_check_hash_sha512_valid(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link(
"http://testserver/gmpy-1.15.tar.gz"
"#sha512=cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36c"
"e9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e"
)
download_hash = _get_hash_from_file(file_path, file_link)
_check_hash(download_hash, file_link)
def test_check_hash_sha512_invalid(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link("http://testserver/gmpy-1.15.tar.gz#sha512=deadbeef")
download_hash = _get_hash_from_file(file_path, file_link)
with pytest.raises(InstallationError):
_check_hash(download_hash, file_link)
def test_check_hasher_mismsatch(data):
file_path = data.packages.join("gmpy-1.15.tar.gz")
file_link = Link(
"http://testserver/gmpy-1.15.tar.gz"
"#md5=d41d8cd98f00b204e9800998ecf8427e"
)
other_link = Link(
"http://testserver/gmpy-1.15.tar.gz"
"#sha256=e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b785"
"2b855"
)
download_hash = _get_hash_from_file(file_path, file_link)
with pytest.raises(InstallationError):
_check_hash(download_hash, other_link)

View file

@ -8,9 +8,12 @@ import pytest
from mock import Mock, patch, mock_open
from pip.exceptions import (PreviousBuildDirError, InvalidWheelFilename,
UnsupportedWheel)
from pip.download import PipSession
from pip.download import path_to_url, PipSession
from pip.exceptions import (HashMissing, HashUnpinned, VcsHashUnsupported,
HashErrors, InstallationError)
from pip.index import PackageFinder
from pip.req import (InstallRequirement, RequirementSet, Requirements)
from pip.req.req_file import process_line
from pip.req.req_install import parse_editable
from pip.utils import read_text_file
from pip._vendor import pkg_resources
@ -26,12 +29,13 @@ class TestRequirementSet(object):
def teardown(self):
shutil.rmtree(self.tempdir, ignore_errors=True)
def basic_reqset(self):
def basic_reqset(self, **kwargs):
return RequirementSet(
build_dir=os.path.join(self.tempdir, 'build'),
src_dir=os.path.join(self.tempdir, 'src'),
download_dir=None,
session=PipSession(),
**kwargs
)
def test_no_reuse_existing_build_dir(self, data):
@ -69,6 +73,158 @@ class TestRequirementSet(object):
else:
assert not reqset.has_requirement('simple')
@pytest.mark.network
def test_missing_hash_checking(self, data):
"""Make sure prepare_files() raises an error when a requirement has no
hash in implicit hash-checking mode.
"""
reqset = self.basic_reqset()
# No flags here. This tests that detection of later flags nonetheless
# requires earlier packages to have hashes:
reqset.add_requirement(
list(process_line('blessings==1.0', 'file', 1))[0])
# This flag activates --require-hashes mode:
reqset.add_requirement(
list(process_line('tracefront==0.1 --sha256=somehash', 'file', 2))[0])
# This hash should be accepted because it came from the reqs file, not
# from the internet:
reqset.add_requirement(
list(process_line('https://pypi.python.org/packages/source/m/more-'
'itertools/more-itertools-1.0.tar.gz#md5=b21850c'
'3cfa7efbb70fd662ab5413bdd', 'file', 3))[0])
finder = PackageFinder([],
['https://pypi.python.org/simple'],
session=PipSession())
assert_raises_regexp(
HashErrors,
r'These requirements were missing hashes.*\n'
r' blessings==1.0 --sha256=[0-9a-f]+\n'
r'THESE PACKAGES DID NOT MATCH THE HASHES.*\n'
r' tracefront==0.1 .*:\n'
r' Expected sha256 somehash\n'
r' Got [0-9a-f]+$',
reqset.prepare_files,
finder)
def test_missing_hash_with_require_hashes(self, data):
"""Setting --require-hashes explicitly should raise errors if hashes
are missing.
"""
reqset = self.basic_reqset(require_hashes=True)
reqset.add_requirement(
list(process_line('simple==1.0', 'file', 1))[0])
finder = PackageFinder([data.find_links], [], session=PipSession())
assert_raises_regexp(
HashErrors,
r'These requirements were missing hashes.*\n'
r' simple==1.0 --sha256=393043e672415891885c9a2a0929b1af95fb866'
r'd6ca016b42d2e6ce53619b653$',
reqset.prepare_files,
finder)
def test_unsupported_hashes(self, data): # NEXT: Add any other test cases needed, probably delete the ones in test_install or just have one or two functional tests to make sure prepare_files() gets called when we expect (so we can actually stop on hash errors), clean up, and call it a day. Make sure we test that hashes are checked all 3 places in pip.download. Test http success.
"""VCS and dir links should raise errors when --require-hashes is
on.
In addition, complaints about the type of requirement (VCS or dir)
should trump the presence or absence of a hash.
"""
reqset = self.basic_reqset(require_hashes=True)
reqset.add_requirement(
list(process_line(
'git+git://github.com/pypa/pip-test-package --sha256=12345',
'file',
1))[0])
dir_path = data.packages.join('FSPkg')
reqset.add_requirement(
list(process_line(
'file://%s' % (dir_path,),
'file',
2))[0])
finder = PackageFinder([data.find_links], [], session=PipSession())
assert_raises_regexp(
HashErrors,
r"Can't verify hashes for these requirements because we don't "
r"have a way to hash version control repositories:\n"
r" git\+git://github\.com/pypa/pip-test-package \(from -r file "
r"\(line 1\)\)\n"
r"Can't verify hashes for these file:// requirements because they "
r"point to directories:\n"
r" file:///.*/data/packages/FSPkg \(from -r file \(line 2\)\)",
reqset.prepare_files,
finder)
def test_unpinned_hash_checking(self, data):
"""Make sure prepare_files() raises an error when a requirement is not
version-pinned in hash-checking mode.
"""
reqset = self.basic_reqset()
# Test that there must be exactly 1 specifier:
reqset.add_requirement(
list(process_line('simple --sha256=a90427ae31f5d1d0d7ec06ee97d9fcf'
'2d0fc9a786985250c1c83fd68df5911dd',
'file',
1))[0])
# Test that the operator must be ==:
reqset.add_requirement(
list(process_line('simple2>1.0 --sha256=3ad45e1e9aa48b4462af0123f6'
'a7e44a9115db1ef945d4d92c123dfe21815a06',
'file',
2))[0])
finder = PackageFinder([data.find_links], [], session=PipSession())
assert_raises_regexp(
HashErrors,
# Make sure all failing requirements are listed:
r'version pinned with ==. These do not:\n'
r' simple .* \(from -r file \(line 1\)\)\n'
r' simple2>1.0 .* \(from -r file \(line 2\)\)',
reqset.prepare_files,
finder)
def test_hash_mismatch(self, data):
"""A hash mismatch should raise an error."""
file_url = path_to_url(
(data.packages / 'simple-1.0.tar.gz').abspath)
reqset = self.basic_reqset(require_hashes=True)
reqset.add_requirement(
list(process_line('%s --sha256=badbad' % file_url, 'file', 1))[0])
finder = PackageFinder([data.find_links], [], session=PipSession())
assert_raises_regexp(
HashErrors,
r'THESE PACKAGES DID NOT MATCH THE HASHES.*\n'
r' file:///.*/data/packages/simple-1\.0\.tar\.gz .*:\n'
r' Expected sha256 badbad\n'
r' Got 393043e672415891885c9a2a0929b1af95fb866d'
r'6ca016b42d2e6ce53619b653$',
reqset.prepare_files,
finder)
def test_no_deps_on_require_hashes(self, data):
"""Make sure --require-hashes mode implies --no-deps."""
reqset = self.basic_reqset()
finder = PackageFinder([data.find_links], [], session=PipSession())
req = list(process_line(
'TopoRequires2==0.0.1 '
'--sha256=eaf9a01242c9f2f42cf2bd82a6a848cd'
'e3591d14f7896bdbefcf48543720c970',
'file', 1))[0]
deps = reqset._prepare_file(finder, req, require_hashes=True)
assert deps == [], ('_prepare_files() resolved dependencies even '
'though --require-hashes was on.')
def test_no_egg_on_require_hashes(self, data):
"""Make sure --egg is illegal with --require-hashes.
--egg would cause dependencies to always be installed, since it cedes
control directly to setuptools.
"""
reqset = self.basic_reqset(require_hashes=True, as_egg=True)
finder = PackageFinder([data.find_links], [], session=PipSession())
with pytest.raises(InstallationError):
reqset.prepare_files(finder)
@pytest.mark.parametrize(('file_contents', 'expected'), [
(b'\xf6\x80', b'\xc3\xb6\xe2\x82\xac'), # cp1252

View file

@ -13,6 +13,7 @@ from pip.index import PackageFinder
from pip.req.req_install import InstallRequirement
from pip.req.req_file import (parse_requirements, process_line, join_lines,
ignore_comments, break_args_options)
from tests.lib import requirements_file
@pytest.fixture
@ -480,12 +481,11 @@ class TestParseRequirements(object):
--install-option "{install_option}"
'''.format(global_option=global_option, install_option=install_option)
req_path = tmpdir.join('requirements.txt')
with open(req_path, 'w') as fh:
fh.write(content)
req = next(parse_requirements(
req_path, finder=finder, options=options, session=session))
with requirements_file(content, tmpdir) as reqs_file:
req = next(parse_requirements(reqs_file.abspath,
finder=finder,
options=options,
session=session))
req.source_dir = os.curdir
with patch.object(subprocess, 'Popen') as popen:

View file

@ -12,9 +12,12 @@ import tempfile
import pytest
from mock import Mock, patch
from pip.exceptions import HashMismatch, HashMissing, InstallationError
from pip.utils import (egg_link_path, Inf, get_installed_distributions,
untar_file, unzip_file, rmtree, normalize_path)
from pip.utils.hashes import Hashes, MissingHashes
from pip.operations.freeze import freeze_excludes
from pip._vendor.six import StringIO
class Tests_EgglinkPath:
@ -406,3 +409,47 @@ class Test_normalize_path(object):
) == os.path.join(tmpdir, 'file_link')
finally:
os.chdir(orig_working_dir)
class TestHashes(object):
"""Tests for pip.utils.hashes"""
def test_success(self, tmpdir):
"""Make sure no error is raised when at least one hash matches.
Test check_against_path because it calls everything else.
"""
file = tmpdir / 'to_hash'
file.write('hello')
hashes = Hashes({
'sha256': ['2cf24dba5fb0a30e26e83b2ac5b9e29e'
'1b161e5c1fa7425e73043362938b9824'],
'sha224': ['wrongwrong'],
'md5': ['5d41402abc4b2a76b9719d911017c592']})
hashes.check_against_path(file)
def test_failure(self):
"""Hashes should raise HashMismatch when no hashes match."""
hashes = Hashes({'sha256': ['wrongwrong']})
with pytest.raises(HashMismatch):
hashes.check_against_file(StringIO('hello'))
def test_missing_hashes(self):
"""MissingHashes should raise HashMissing when any check is done."""
with pytest.raises(HashMissing):
MissingHashes().check_against_file(StringIO('hello'))
def test_unknown_hash(self):
"""Hashes should raise InstallationError when it encounters an unknown
hash."""
hashes = Hashes({'badbad': ['dummy']})
with pytest.raises(InstallationError):
hashes.check_against_file(StringIO('hello'))
def test_non_zero(self):
"""Test that truthiness tests tell whether any known-good hashes
exist."""
assert Hashes({'sha256': 'dummy'})
assert not Hashes()
assert not Hashes({})