Add test helpers for HTTP(S) server and certs

This commit is contained in:
Chris Hunt 2019-11-03 13:09:18 -05:00
parent 39d16b6ca8
commit 204a004377
4 changed files with 255 additions and 0 deletions

View File

@ -13,6 +13,7 @@ from setuptools.wheel import Wheel
from pip._internal.main import main as pip_entry_point
from tests.lib import DATA_DIR, SRC_DIR, TestData
from tests.lib.certs import make_tls_cert, serialize_cert, serialize_key
from tests.lib.path import Path
from tests.lib.scripttest import PipTestEnvironment
from tests.lib.venv import VirtualEnvironment
@ -385,3 +386,21 @@ def in_memory_pip():
def deprecated_python():
"""Used to indicate whether pip deprecated this python version"""
return sys.version_info[:2] in [(2, 7)]
@pytest.fixture(scope="session")
def cert_factory(tmpdir_factory):
def factory():
# type: () -> str
"""Returns path to cert/key file.
"""
output_path = Path(str(tmpdir_factory.mktemp("certs"))) / "cert.pem"
# Must be Text on PY2.
cert, key = make_tls_cert(u"localhost")
with open(str(output_path), "wb") as f:
f.write(serialize_cert(cert))
f.write(serialize_key(key))
return str(output_path)
return factory

53
tests/lib/certs.py Normal file
View File

@ -0,0 +1,53 @@
from datetime import datetime, timedelta
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Text, Tuple
def make_tls_cert(hostname):
# type: (Text) -> Tuple[x509.Certificate, rsa.RSAPrivateKey]
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, hostname),
])
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.utcnow())
.not_valid_after(datetime.utcnow() + timedelta(days=10))
.add_extension(
x509.SubjectAlternativeName([x509.DNSName(hostname)]),
critical=False,
)
.sign(key, hashes.SHA256(), default_backend())
)
return cert, key
def serialize_key(key):
# type: (rsa.RSAPrivateKey) -> bytes
return key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
def serialize_cert(cert):
# type: (x509.Certificate) -> bytes
return cert.public_bytes(serialization.Encoding.PEM)

181
tests/lib/server.py Normal file
View File

@ -0,0 +1,181 @@
import os
import signal
import ssl
import threading
from contextlib import contextmanager
from textwrap import dedent
from mock import Mock
from pip._vendor.contextlib2 import nullcontext
from werkzeug.serving import WSGIRequestHandler
from werkzeug.serving import make_server as _make_server
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from types import TracebackType
from typing import (
Any, Callable, Dict, Iterable, List, Optional, Text, Tuple, Type, Union
)
from werkzeug.serving import BaseWSGIServer
Environ = Dict[str, str]
Status = str
Headers = Iterable[Tuple[str, str]]
ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
Write = Callable[[bytes], None]
StartResponse = Callable[[Status, Headers, Optional[ExcInfo]], Write]
Body = List[bytes]
Responder = Callable[[Environ, StartResponse], Body]
class MockServer(BaseWSGIServer):
mock = Mock() # type: Mock
# Applies on Python 2 and Windows.
if not hasattr(signal, "pthread_sigmask"):
# We're not relying on this behavior anywhere currently, it's just best
# practice.
blocked_signals = nullcontext
else:
@contextmanager
def blocked_signals():
"""Block all signals for e.g. starting a worker thread.
"""
old_mask = signal.pthread_sigmask(
signal.SIG_SETMASK, range(1, signal.NSIG)
)
try:
yield
finally:
signal.pthread_sigmask(signal.SIG_SETMASK, old_mask)
class RequestHandler(WSGIRequestHandler):
def make_environ(self):
environ = super(RequestHandler, self).make_environ()
# From pallets/werkzeug#1469, will probably be in release after
# 0.16.0.
try:
# binary_form=False gives nicer information, but wouldn't be
# compatible with what Nginx or Apache could return.
peer_cert = self.connection.getpeercert(binary_form=True)
if peer_cert is not None:
# Nginx and Apache use PEM format.
environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(
peer_cert,
)
except ValueError:
# SSL handshake hasn't finished.
self.server.log("error", "Cannot fetch SSL peer certificate info")
except AttributeError:
# Not using TLS, the socket will not have getpeercert().
pass
return environ
def mock_wsgi_adapter(mock):
# type: (Callable[[Environ, StartResponse], Responder]) -> Responder
"""Uses a mock to record function arguments and provide
the actual function that should respond.
"""
def adapter(environ, start_response):
# type: (Environ, StartResponse) -> Body
responder = mock(environ, start_response)
return responder(environ, start_response)
return adapter
def make_mock_server(**kwargs):
# type: (Any) -> MockServer
kwargs.setdefault("request_handler", RequestHandler)
mock = Mock()
app = mock_wsgi_adapter(mock)
server = _make_server("localhost", 0, app=app, **kwargs)
server.mock = mock
return server
@contextmanager
def server_running(server):
# type: (BaseWSGIServer) -> None
thread = threading.Thread(target=server.serve_forever)
thread.daemon = True
with blocked_signals():
thread.start()
try:
yield
finally:
server.shutdown()
thread.join()
# Helper functions for making responses in a declarative way.
def text_html_response(text):
# type: (Text) -> Responder
def responder(environ, start_response):
# type: (Environ, StartResponse) -> Body
start_response("200 OK", [
("Content-Type", "text/html; charset=UTF-8"),
])
return [text.encode('utf-8')]
return responder
def html5_page(text):
# type: (Union[Text, str]) -> Text
return dedent(u"""
<!DOCTYPE html>
<html>
<body>
{}
</body>
</html>
""").strip().format(text)
def index_page(spec):
# type: (Dict[str, str]) -> Responder
def link(name, value):
return '<a href="{}">{}</a>'.format(
value, name
)
links = ''.join(link(*kv) for kv in spec.items())
return text_html_response(html5_page(links))
def package_page(spec):
# type: (Dict[str, str]) -> Responder
def link(name, value):
return '<a href="{}">{}</a>'.format(
value, name
)
links = ''.join(link(*kv) for kv in spec.items())
return text_html_response(html5_page(links))
def file_response(path):
# type: (str) -> Responder
def responder(environ, start_response):
# type: (Environ, StartResponse) -> Body
size = os.stat(path).st_size
start_response(
"200 OK", [
("Content-Type", "application/octet-stream"),
("Content-Length", str(size)),
],
)
with open(path, 'rb') as f:
return [f.read()]
return responder

View File

@ -1,3 +1,4 @@
cryptography==2.8
freezegun
mock
pretend
@ -12,4 +13,5 @@ pyyaml
setuptools>=39.2.0 # Needed for `setuptools.wheel.Wheel` support.
scripttest
https://github.com/pypa/virtualenv/archive/master.zip#egg=virtualenv
werkzeug==0.16.0
wheel