gpodder/tools/test-auth-server.py

206 lines
7.3 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Simple HTTP web server for testing HTTP Authentication (see bug 1539)
# from our crappy-but-does-the-job department
# Thomas Perl <thp.io/about>; 2012-01-20
import base64
import datetime
import hashlib
import http.server
import re
import sys
import threading
import time
USERNAME = 'user@example.com' # Username used for HTTP Authentication
PASSWORD = 'secret' # Password used for HTTP Authentication
HOST, PORT, RPORT = 'localhost', 8000, 8001 # Hostname and port for the HTTP server
# When the script contents change, the feed's episodes each get a new GUID
GUID = hashlib.sha1(open(__file__, mode='rb').read()).hexdigest()
URL = 'http://%(HOST)s:%(PORT)s' % locals()
FEEDNAME = sys.argv[0] # The title of the RSS feed
REDIRECT = 'redirect.rss' # The path for a redirection
REDIRECT_TO_BAD_HOST = 'redirect_bad' # The path for a redirection
FEEDFILE = 'feed.rss' # The "filename" of the feed on the server
EPISODES = 'episode' # Base name for the episode files
TIMEOUT = 'timeout' # The path to never return
EPISODES_EXT = '.mp3' # Extension for the episode files
EPISODES_MIME = 'audio/mpeg' # Mime type for the episode files
EP_COUNT = 7 # Number of episodes in the feed
SIZE = 500000 # Size (in bytes) of the episode downloads)
def mkpubdates(items):
"""Generate fake pubDates (one each day, recently)"""
current = datetime.datetime.now() - datetime.timedelta(days=items + 3)
for i in range(items):
yield current.ctime()
current += datetime.timedelta(days=1)
def mkrss(items=EP_COUNT):
"""Generate a dumm RSS feed with a given number of items"""
ITEMS = '\n'.join("""
<item>
<title>Episode %(INDEX)s</title>
<guid>tag:test.gpodder.org,2012:%(GUID)s,%(URL)s,%(INDEX)s</guid>
<pubDate>%(PUBDATE)s</pubDate>
<enclosure
url="%(URL)s/%(EPISODES)s%(INDEX)s%(EPISODES_EXT)s"
type="%(EPISODES_MIME)s"
length="%(SIZE)s"/>
</item>
""" % dict(list(locals().items()) + list(globals().items()))
for INDEX, PUBDATE in enumerate(mkpubdates(items)))
ITEMS += """
<item>
<title>Missing Episode</title>
<guid>tag:test.gpodder.org,2012:missing</guid>
<pubDate>Sun, 25 Nov 2018 17:28:03 +0000</pubDate>
<enclosure
url="%(URL)s/not_there%(EPISODES_EXT)s"
type="%(EPISODES_MIME)s"
length="%(SIZE)s"/>
</item>""" % dict(list(locals().items()) + list(globals().items()))
ITEMS += """
<item>
<title>Server Timeout Episode</title>
<guid>tag:test.gpodder.org,2012:timeout</guid>
<pubDate>Sun, 25 Nov 2018 17:28:03 +0000</pubDate>
<enclosure
url="%(URL)s/%(TIMEOUT)s"
type="%(EPISODES_MIME)s"
length="%(SIZE)s"/>
</item>""" % dict(list(locals().items()) + list(globals().items()))
ITEMS += """
<item>
<title>Bad Host Episode</title>
<guid>tag:test.gpodder.org,2012:timeout</guid>
<pubDate>Sun, 25 Nov 2018 17:28:03 +0000</pubDate>
<enclosure
url="%(URL)s/%(REDIRECT_TO_BAD_HOST)s"
type="%(EPISODES_MIME)s"
length="%(SIZE)s"/>
</item>""" % dict(list(locals().items()) + list(globals().items()))
ITEMS += """
<item>
<title>Space in url Episode</title>
<guid>tag:test.gpodder.org,2012:timeout</guid>
<pubDate>Sun, 25 Nov 2018 17:28:03 +0000</pubDate>
<enclosure
url="%(URL)s/%(EPISODES)s with space%(EPISODES_EXT)s"
type="%(EPISODES_MIME)s"
length="%(SIZE)s"/>
</item>""" % dict(list(locals().items()) + list(globals().items()))
return """
<rss>
<channel><title>%(FEEDNAME)s</title><link>%(URL)s</link>
%(ITEMS)s
</channel>
</rss>
""" % dict(list(locals().items()) + list(globals().items()))
def mkdata(size=SIZE):
"""Generate dummy data of a given size (in bytes)"""
return bytes([32 + (i % (127 - 32)) for i in range(size)])
class AuthRequestHandler(http.server.BaseHTTPRequestHandler):
FEEDFILE_PATH = '/%s' % FEEDFILE
EPISODES_PATH = '/%s' % EPISODES
REDIRECT_PATH = '/%s' % REDIRECT
REDIRECT_TO_BAD_HOST_PATH = '/%s' % REDIRECT_TO_BAD_HOST
TIMEOUT_PATH = '/%s' % TIMEOUT
def do_GET(self):
authorized = False
is_feed = False
is_episode = False
auth_header = self.headers.get('authorization', '')
m = re.match(r'^Basic (.*)$', auth_header)
if m is not None:
auth_data = base64.b64decode(m.group(1)).decode().split(':', 1)
if len(auth_data) == 2:
username, password = auth_data
print('Got username:', username)
print('Got password:', password)
if (username, password) == (USERNAME, PASSWORD):
print('Valid credentials provided.')
authorized = True
if self.path == self.FEEDFILE_PATH:
print('Feed request.')
is_feed = True
elif self.path.startswith(self.EPISODES_PATH):
print('Episode request.')
is_episode = True
elif self.path == self.REDIRECT_PATH:
print('Redirect request.')
self.send_response(302)
self.send_header('Location', '%s/%s' % (URL, FEEDFILE))
self.end_headers()
return
elif self.path.startswith(self.REDIRECT_TO_BAD_HOST_PATH):
print('Redirect request => bad host.')
self.send_response(302)
self.send_header('Location', '//notthere.gpodder.io/%s' % (FEEDFILE))
self.end_headers()
return
elif self.path == self.TIMEOUT_PATH:
# will need to restart the server or wait 80s before next request
time.sleep(80)
return
if not authorized:
print('Not authorized - sending WWW-Authenticate header.')
self.send_response(401)
self.send_header('WWW-Authenticate',
'Basic realm="%s"' % sys.argv[0])
self.end_headers()
return
if not is_feed and not is_episode:
print('Not there episode - sending 404.')
self.send_response(404)
self.end_headers()
return
self.send_response(200)
self.send_header('Content-type',
'application/xml' if is_feed else 'audio/mpeg')
self.end_headers()
self.wfile.write(mkrss().encode('utf-8') if is_feed else mkdata())
def run(httpd):
while True:
httpd.handle_request()
if __name__ == '__main__':
httpd = http.server.HTTPServer((HOST, PORT), AuthRequestHandler)
print("""
Feed URL: %(URL)s/%(FEEDFILE)s
Redirect URL: http://%(HOST)s:%(RPORT)d/%(REDIRECT)s
Timeout URL: %(URL)s/%(TIMEOUT)s
Username: %(USERNAME)s
Password: %(PASSWORD)s
""" % locals())
httpdr = http.server.HTTPServer((HOST, RPORT), AuthRequestHandler)
t1 = threading.Thread(name='http', target=run, args=(httpd,), daemon=True)
t1.start()
t2 = threading.Thread(name='http redirect', target=run, args=(httpdr,), daemon=True)
t2.start()
try:
t1.join()
t2.join()
except KeyboardInterrupt:
pass