# pip/tests/lib/pypi_server.py

import os
import sys
import pip.backwardcompat
from pip.backwardcompat import urllib, string_types, b, u, emailmessage
# Keep a reference to the real ``urlopen`` at import time, before
# ``PyPIProxy`` replaces ``pip.backwardcompat.urllib2.urlopen``, so that
# ``CachedResponse._cache_url`` can still perform the actual fetch.
urlopen_original = pip.backwardcompat.urllib2.urlopen
class CachedResponse(object):
    """
    CachedResponse always caches url access and returns the cached response.

    The first time a URL is requested it is fetched with the original
    (un-patched) ``urlopen`` and stored as a file inside ``folder``; every
    later request for the same URL is answered from that file.

    It returns an object compatible with ``urllib.addinfourl``,
    it means the object is like the result of a call like::

        >>> response = urllib2.urlopen('http://example.com')

    Cache file layout (written by ``_cache_url``, parsed by
    ``_set_all_fields``)::

        <code> <msg>
        <header>: <value>
        ...
        <empty line>
        <body bytes>
    """

    def __init__(self, url, folder):
        """
        Build the response for ``url``, fetching and caching it if needed.

        :param url: a URL string, or a ``urllib2.Request``-like object
            exposing ``get_full_url()`` and a ``headers`` mapping.
        :param folder: directory that holds the cache files.
        :raises ValueError: if the cache file's status line is malformed.
        """
        self.headers = emailmessage.Message()
        # patch due to setuptools>=0.7 header processing
        # easy_install fails w/o this on windows/py2
        # https://github.com/pypa/pip/issues/946#issuecomment-20860320
        if sys.version_info < (3,):
            def getheaders(key):
                return self.headers.get_all(key)
            self.headers.getheaders = getheaders
        # Pessimistic defaults; replaced by the status line of the cache file.
        self.code = 500
        self.msg = 'Internal Server Error'
        # url can be a simple string, or a urllib2.Request object
        if isinstance(url, string_types):
            self.url = url
        else:
            self.url = url.get_full_url()
            for key, value in url.headers.items():
                self.headers[key] = value
        self._body = b('')
        self._set_all_fields(folder)

    def _set_all_fields(self, folder):
        """
        Load status, headers and body from the cache file for ``self.url``.

        The cache file is created first (via ``_cache_url``) when it does not
        exist yet. The file handle is always closed, even when parsing fails.

        :raises ValueError: if the first line is not ``<code> <msg>``.
        """
        filename = os.path.join(folder, urllib.quote(self.url, ''))
        if not os.path.exists(filename):
            self._cache_url(filename)
        blank_line = b('\n')
        separator = b(': ')
        fp = open(filename, 'rb')
        try:
            line = fp.readline().strip()
            try:
                self.code, self.msg = line.split(None, 1)
            except ValueError:
                raise ValueError('Bad field line: %r' % line)
            self.code = int(self.code)
            self.msg = u(self.msg)
            # Header section ends at the first empty line.
            for line in fp:
                if line == blank_line:
                    break
                key, value = line.split(separator, 1)
                self.headers[u(key)] = u(value.strip())
            # Keep consuming through the iterator protocol (not fp.read()):
            # on Python 2 the file iterator has its own read-ahead buffer.
            self._body += b('').join(fp)
        finally:
            fp.close()

    def getcode(self):
        """Return the numeric HTTP status code."""
        return self.code

    def geturl(self):
        """Return the URL this response was built for."""
        return self.url

    def info(self):
        """Return the message object holding the response headers."""
        return self.headers

    def read(self, bytes=None):
        """
        it can read a chunk of bytes or everything

        Like a real response object, data that has been returned is consumed:
        once the body is exhausted every further call returns an empty
        value, so ``while chunk: chunk = response.read(...)`` loops end.

        :param bytes: maximum number of bytes to return; ``None`` or a
            negative value returns everything that is left, ``0`` returns
            an empty value.
        """
        if bytes is None or bytes < 0:
            # Slice instead of a literal so the empty value keeps the body's type.
            remaining, self._body = self._body, self._body[:0]
            return remaining
        chunk = self._body[:bytes]
        self._body = self._body[bytes:]
        return chunk

    def close(self):
        """No-op: the whole body is already held in memory."""
        pass

    def _cache_url(self, filepath):
        """
        Fetch ``self.url`` with the original ``urlopen`` and store it.

        The response is read completely *before* the cache file is opened,
        so a failed download does not leave an empty cache file behind.

        :param filepath: destination path of the cache file.
        """
        response = urlopen_original(self.url)
        try:
            # when it uses file:// scheme, code is None and there is no msg attr
            # but it has been successfully opened
            status = b('%s %s' % (getattr(response, 'code', 200) or 200,
                                  getattr(response, 'msg', 'OK')))
            headers = [b('%s: %s' % (key, value))
                       for key, value in response.headers.items()]
            body = response.read()
        finally:
            response.close()
        fp = open(filepath, 'wb')
        try:
            # The empty element yields the blank line separating headers and body.
            fp.write(b('\n').join([status] + headers + [b(''), body]))
        finally:
            fp.close()
class PyPIProxy(object):
    """
    Installs a caching layer in front of ``urllib2.urlopen``.

    After ``PyPIProxy.setup()`` every call to
    ``pip.backwardcompat.urllib2.urlopen`` returns a ``CachedResponse``
    backed by files stored in ``CACHE_PATH``.
    """

    # ``tests_cache`` directory located one level above this file's directory.
    CACHE_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'tests_cache')

    @classmethod
    def setup(cls):
        """Create the cache folder and install the caching ``urlopen``."""
        instance = cls()
        instance._create_cache_folder()
        instance._monkey_patch_urllib2_to_cache_everything()

    def _monkey_patch_urllib2_to_cache_everything(self):
        """Replace ``urllib2.urlopen`` with a function serving cached responses."""
        def urlopen(url):
            return CachedResponse(url, self.CACHE_PATH)
        pip.backwardcompat.urllib2.urlopen = urlopen

    def _create_cache_folder(self):
        """
        Make sure ``CACHE_PATH`` exists as a directory.

        Creation is attempted directly instead of checking first, so two
        processes starting at the same time cannot race each other; missing
        parent directories are created as well.

        :raises OSError: if the directory cannot be created and does not
            already exist.
        """
        try:
            os.makedirs(self.CACHE_PATH)
        except OSError:
            # Already present (possibly created concurrently) is fine;
            # anything else is a real error.
            if not os.path.isdir(self.CACHE_PATH):
                raise
def assert_equal(a, b):
    """
    Fail with a readable message unless ``a == b``.

    Raises explicitly instead of using the ``assert`` statement so the check
    is still performed when Python runs with ``-O``.

    :param a: the value that was obtained.
    :param b: the value that was expected.
    :raises AssertionError: if the two values do not compare equal.
    """
    if not (a == b):
        raise AssertionError("\nexpected:\n%r\ngot:\n%r" % (b, a))
def test_cache_proxy():
    """
    Check that a ``CachedResponse`` mirrors a live response for the same URL.

    The cache file is written next to this module and removed afterwards.
    Requires network access to ``http://example.com``.
    """
    url = 'http://example.com'
    here = os.path.dirname(os.path.abspath(__file__))
    filepath = os.path.join(here, urllib.quote(url, ''))
    # Start from a clean state so the cache file is really (re)generated.
    if os.path.exists(filepath):
        os.remove(filepath)
    # Use the saved original: ``pip.backwardcompat.urllib2.urlopen`` may
    # already be the caching replacement, which would make this test compare
    # a cached response with another cached response.
    response = urlopen_original(url)
    try:
        r = CachedResponse(url, here)
        assert_equal(r.code, response.code)
        assert_equal(r.msg, response.msg)
        assert_equal(r.read(), response.read())
        assert_equal(r.url, response.url)
        assert_equal(r.geturl(), response.geturl())
        assert_equal(set(r.headers.keys()), set(response.headers.keys()))
        assert_equal(set(r.info().keys()), set(response.info().keys()))
        assert_equal(r.headers['content-length'], response.headers['content-length'])
    finally:
        response.close()
        # Guarded so a missing file cannot mask the real failure.
        if os.path.exists(filepath):
            os.remove(filepath)