1
1
Fork 0
mirror of https://github.com/pypa/pip synced 2023-12-13 21:30:23 +01:00

intercept everything monkey patching wsgi_intercept

This commit is contained in:
Hugo Lopes Tavares 2010-06-30 18:33:43 -03:00
parent d3f0e2133c
commit 3df79d6476

View file

@ -1,23 +1,29 @@
import os
import re
import urllib2
import wsgi_intercept
import urllib
import webob
from UserDict import DictMixin
from wsgiproxy.exactproxy import proxy_exact_request
from webob.dec import wsgify
from wsgi_intercept.urllib2_intercept import install_opener
class HasEverythingProxiedWSGIIntercept(DictMixin):
def __contains__(self, key):
return True
def __getitem__(self, item):
return (PyPIProxy, '')
class PyPIProxy(object):
CACHE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "pypi_cache")
DOMAIN_NAMES_FILEPATH = os.path.join(CACHE_PATH, 'domains.txt')
CACHE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "tests_cache")
@classmethod
def setup(cls):
instance = cls()
instance._monkey_patch_urllib2()
instance._create_cache_folder()
instance._add_wsgi_intercepts()
return instance
@ -54,43 +60,11 @@ class PyPIProxy(object):
return (not os.path.exists(self._get_cache_filename(request)) and
request.method == 'GET')
def _monkey_patch_urllib2(self):
urlopen = urllib2.urlopen
def open_and_add_unknown_domains(arg):
if isinstance(arg, basestring):
self._store_domain(arg)
else:
self._store_domain(arg.get_full_url())
return urlopen(arg)
urllib2.urlopen = open_and_add_unknown_domains
def _add_wsgi_intercepts(self):
"""allow wsgi_intercept to work with urllib2 fakes"""
wsgi_intercept._wsgi_intercept = HasEverythingProxiedWSGIIntercept()
install_opener()
domain_fp = open(PyPIProxy.DOMAIN_NAMES_FILEPATH)
for line in domain_fp:
wsgi_intercept.add_wsgi_intercept(line.strip(), 80, PyPIProxy)
domain_fp.close()
def _create_cache_folder(self):
if not os.path.exists(self.CACHE_PATH):
os.mkdir(self.CACHE_PATH)
domain_fp = open(PyPIProxy.DOMAIN_NAMES_FILEPATH, 'w')
domain_fp.write('pypi.python.org\n')
domain_fp.close()
def _store_domain(self, url):
r = re.match(r'https?://([^/]+)', url)
if not r:
return
domain_line = r.group(1) + '\n'
if domain_line not in self._get_domains_to_be_intercepted():
domain_fp = open(PyPIProxy.DOMAIN_NAMES_FILEPATH, 'a')
domain_fp.write(domain_line)
domain_fp.close()
def _get_domains_to_be_intercepted(self):
domain_fp = open(PyPIProxy.DOMAIN_NAMES_FILEPATH)
domains = domain_fp.readlines()
domain_fp.close()
return domains