2013-05-31 00:43:58 +02:00
|
|
|
import hashlib
|
2013-05-28 23:58:08 +02:00
|
|
|
import os
|
|
|
|
from shutil import rmtree
|
|
|
|
from tempfile import mkdtemp
|
|
|
|
|
2013-08-16 14:04:27 +02:00
|
|
|
from mock import Mock, patch
|
2013-05-31 00:43:58 +02:00
|
|
|
import pip
|
|
|
|
from pip.backwardcompat import urllib, BytesIO, b
|
2013-08-16 14:04:27 +02:00
|
|
|
from pip.download import PipSession, path_to_url2, unpack_http_url
|
2013-05-28 23:58:08 +02:00
|
|
|
from pip.index import Link
|
|
|
|
|
|
|
|
|
2013-08-23 13:12:37 +02:00
|
|
|
def test_unpack_http_url_with_urllib_response_without_content_type(data):
    """
    It should download and unpack files even if no Content-Type header exists
    """
    real_session = PipSession()

    def _get_without_content_type(*args, **kwargs):
        # Proxy the call to a real session, then strip the Content-Type
        # header from the response before handing it back.
        response = real_session.get(*args, **kwargs)
        del response.headers["Content-Type"]
        return response

    mock_session = Mock()
    mock_session.get = _get_without_content_type

    package_url = path_to_url2(data.packages.join("simple-1.0.tar.gz"))
    target_dir = mkdtemp()
    try:
        unpack_http_url(
            Link(package_url),
            target_dir,
            download_cache=None,
            download_dir=None,
            session=mock_session,
        )
        expected = set(['PKG-INFO', 'setup.cfg', 'setup.py', 'simple',
                        'simple.egg-info'])
        assert set(os.listdir(target_dir)) == expected
    finally:
        rmtree(target_dir)
|
2013-05-28 23:58:08 +02:00
|
|
|
|
|
|
|
|
|
|
|
def test_user_agent():
    """
    The default PipSession User-Agent should identify this pip version.
    """
    # Bug fix: the original evaluated .startswith(...) but never asserted
    # the result, so this test could not fail even when the header was wrong.
    assert PipSession().headers["User-Agent"].startswith(
        "pip/%s" % pip.__version__)
|
2013-05-30 23:03:04 +02:00
|
|
|
|
|
|
|
|
2013-05-31 00:43:58 +02:00
|
|
|
def _write_file(fn, contents):
|
|
|
|
with open(fn, 'w') as fh:
|
|
|
|
fh.write(contents)
|
|
|
|
|
|
|
|
|
|
|
|
class MockResponse(object):
    """Minimal stand-in for a requests Response, backed by a byte string."""

    def __init__(self, contents):
        # Serve the canned payload from an in-memory byte stream.
        self._io = BytesIO(contents)

    def iter_content(self, size):
        # Single-shot generator: yields at most `size` bytes exactly once.
        chunk = self._io.read(size)
        yield chunk

    def raise_for_status(self):
        # Mocked responses never represent HTTP errors; always succeed.
        pass
|
2013-05-31 00:43:58 +02:00
|
|
|
|
|
|
|
|
|
|
|
@patch('pip.download.unpack_file')
def test_unpack_http_url_bad_cache_checksum(mock_unpack_file):
    """
    If cached download has bad checksum, re-download.
    """
    package_url = 'http://www.example.com/somepackage.tgz'
    payload = b('downloaded')
    sha1 = hashlib.new('sha1', payload).hexdigest()
    link = Link('%s#sha1=%s' % (package_url, sha1))

    # Mocked session whose GET always returns the canned tarball response.
    session = Mock()
    session.get = Mock(return_value=MockResponse(payload))
    session.get.return_value.headers = {'content-type': 'application/x-tar'}
    session.get.return_value.url = package_url

    cache_dir = mkdtemp()
    try:
        cached_path = os.path.join(cache_dir, urllib.quote(package_url, ''))
        # Seed the cache with stale content that cannot match the sha1
        # fragment on the link, plus its content-type sidecar file.
        _write_file(cached_path, 'some contents')
        _write_file(cached_path + '.content-type', 'application/x-tar')

        unpack_http_url(link, 'location',
                        download_cache=cache_dir,
                        session=session)

        # The bad-hash cache entry must be ignored and the URL re-fetched.
        session.get.assert_called_once_with(package_url, stream=True)

        # The fresh download replaces the stale cache entry.
        with open(cached_path) as fh:
            assert fh.read() == 'downloaded'
    finally:
        rmtree(cache_dir)
|
|
|
|
|
|
|
|
|
|
|
|
@patch('pip.download.unpack_file')
def test_unpack_http_url_bad_downloaded_checksum(mock_unpack_file):
    """
    If already-downloaded file has bad checksum, re-download.
    """
    package_url = 'http://www.example.com/somepackage.tgz'
    payload = b('downloaded')
    sha1 = hashlib.new('sha1', payload).hexdigest()
    link = Link('%s#sha1=%s' % (package_url, sha1))

    # Mocked session whose GET always returns the canned tarball response.
    session = Mock()
    session.get = Mock(return_value=MockResponse(payload))
    session.get.return_value.headers = {'content-type': 'application/x-tar'}
    session.get.return_value.url = package_url

    download_dir = mkdtemp()
    try:
        stale_path = os.path.join(download_dir, 'somepackage.tgz')
        # Pre-existing download whose contents cannot match the link hash.
        _write_file(stale_path, 'some contents')

        unpack_http_url(link, 'location',
                        download_cache=None,
                        download_dir=download_dir,
                        session=session)

        # The bad-hash file must be ignored and the URL fetched again.
        session.get.assert_called_once_with(package_url, stream=True)

        # The pre-downloaded file is replaced with the freshly fetched one.
        with open(stale_path) as fh:
            assert fh.read() == 'downloaded'
    finally:
        rmtree(download_dir)
|