Improve HTTP authentication handling (bug 525)

Add some additional code and checks to make the use
of password-protected podcasts more stable and easier
to use. Thanks to Dan Ramos for the bug report.
This commit is contained in:
Thomas Perl 2009-08-24 13:04:11 +02:00
parent dbb026b1cc
commit 996b2bcda2
4 changed files with 101 additions and 14 deletions

View File

@ -148,6 +148,7 @@ class ContentRange(object):
class DownloadCancelledException(Exception):
    # NOTE(review): the raise/catch sites are outside this view; going by
    # the name, this presumably signals a cancelled download -- confirm.
    pass
class AuthenticationError(Exception):
    # Raised by prompt_user_passwd() once the number of authentication
    # attempts exceeds three.
    pass
class gPodderDownloadHTTPError(Exception):
def __init__(self, url, error_code, error_message):
@ -160,6 +161,7 @@ class DownloadURLOpener(urllib.FancyURLopener):
def __init__(self, channel):
    """
    Set up the opener for the given channel.

    channel -- object whose "username" and "password" attributes
               are read later by prompt_user_passwd()
    """
    # The base class gets None as its only argument
    urllib.FancyURLopener.__init__(self, None)

    # Number of credential requests seen so far (prompt_user_passwd
    # increments this and gives up after the third one)
    self._auth_retry_counter = 0
    self.channel = channel
def http_error_default(self, url, fp, errcode, errmsg, headers):
@ -284,11 +286,16 @@ class DownloadURLOpener(urllib.FancyURLopener):
# end code based on urllib.py
def prompt_user_passwd(self, host, realm):
    """
    Supply the channel's credentials when they are requested.

    host  -- host name asking for authentication
    realm -- authentication realm announced by the host

    Returns a (username, password) tuple taken from self.channel,
    or (None, None) if the channel has neither set.

    Raises AuthenticationError once this opener has been asked for
    credentials more than three times.
    """
    # Keep track of authentication attempts, fail after the third one
    self._auth_retry_counter += 1
    if self._auth_retry_counter > 3:
        raise AuthenticationError(_('Wrong username/password'))

    if self.channel.username or self.channel.password:
        log('Authenticating as "%s" to "%s" for realm "%s".', self.channel.username, host, realm, sender=self)
        return (self.channel.username, self.channel.password)

    # No credentials configured for this channel
    return (None, None)
class DownloadQueueWorker(threading.Thread):

View File

@ -63,15 +63,7 @@ class gPodderFetcher(feedcore.Fetcher):
# If we have a username or password, rebuild the url with them included
# Note: using an HTTPBasicAuthHandler would be a pain because we need to
# know the realm. It can be done, but I think this method works, too
if channel.username or channel.password:
username = urllib.quote(channel.username)
password = urllib.quote(channel.password)
auth_string = ':'.join((username, password))
url_parts = list(urlparse.urlsplit(channel.url))
url_parts[1] = '@'.join((auth_string, url_parts[1]))
url = urlparse.urlunsplit(url_parts)
else:
url = channel.url
url = channel.authenticate_url(channel.url)
self.fetch(url, etag, modified)
def _resolve_url(self, url):
@ -291,6 +283,9 @@ class PodcastChannel(PodcastModelObject):
def stat(self, state=None, is_played=None, is_locked=None):
    """
    Ask the database for this channel's statistics.

    The three optional filters are handed to db.get_channel_stat()
    unchanged, together with the channel URL; whatever that call
    returns is returned to the caller.
    """
    filters = dict(state=state, is_played=is_played, is_locked=is_locked)
    return self.db.get_channel_stat(self.url, **filters)
def authenticate_url(self, url):
    """
    Return the given URL with this channel's username and password
    embedded (see util.url_add_authentication for the details).
    """
    credentials = (self.username, self.password)
    return util.url_add_authentication(url, *credentials)
def __init__(self, db, download_dir):
self.db = db
self.download_dir = download_dir
@ -759,7 +754,7 @@ class PodcastEpisode(PodcastModelObject):
# If we arrive here, current_try has a collision, so
# try to resolve the URL for a better basename
log('Filename collision: %s - trying to resolve...', current_try)
url = util.get_real_url(url)
url = util.get_real_url(self.channel.authenticate_url(url))
(episode_filename, extension_UNUSED) = util.filename_from_url(url)
current_try = util.sanitize_filename(episode_filename, self.MAX_FILENAME_LENGTH)+extension
if not self.db.episode_filename_exists(current_try) and current_try:
@ -832,7 +827,7 @@ class PodcastEpisode(PodcastModelObject):
if 'redirect' in fn_template:
# This looks like a redirection URL - force URL resolving!
log('Looks like a redirection to me: %s', self.url, sender=self)
url = util.get_real_url(self.url)
url = util.get_real_url(self.channel.authenticate_url(self.url))
log('Redirection resolved to: %s', url, sender=self)
(episode_filename, extension_UNUSED) = util.filename_from_url(url)
fn_template = util.sanitize_filename(episode_filename, self.MAX_FILENAME_LENGTH)

View File

@ -148,7 +148,7 @@ def find_youtube_channels(string):
url = 'http://www.youtube.com/results?search_query='+ urllib.quote(string, '') +'&search_type=search_users&aq=f'
r = re.compile('>\s+<')
data = r.sub('><', urllib.urlopen(url).read())
data = r.sub('><', urllib2.urlopen(url).read())
r1 = re.compile('<a href="/user/([^"]+)"[^>]*>([^<]+)</a>')
m1 = r1.findall(data)

View File

@ -679,12 +679,97 @@ def format_desktop_command(command, filenames):
return commands
def url_strip_authentication(url):
    """
    Strips authentication data from an URL. Returns the URL with
    the authentication data removed from it.

    >>> url_strip_authentication('https://host.com/')
    'https://host.com/'
    >>> url_strip_authentication('telnet://foo:bar@host.com/')
    'telnet://host.com/'
    >>> url_strip_authentication('ftp://billy@example.org')
    'ftp://example.org'
    >>> url_strip_authentication('ftp://billy:@example.org')
    'ftp://example.org'
    >>> url_strip_authentication('http://aa:bc@localhost/x')
    'http://localhost/x'
    >>> url_strip_authentication('http://i%2Fo:P%40ss%3A@blubb.lan/u.html')
    'http://blubb.lan/u.html'
    >>> url_strip_authentication('http://c:d@x.org/')
    'http://x.org/'
    >>> url_strip_authentication('http://P%40%3A:i%2F@cx.lan')
    'http://cx.lan'
    >>> url_strip_authentication('http://x@x.com:s3cret@example.com/')
    'http://example.com/'
    """
    url_parts = list(urlparse.urlsplit(url))
    # url_parts[1] is the HOST part of the URL

    # Remove existing authentication data. A host name cannot contain
    # an "@", so the host is whatever follows the LAST "@" - this also
    # copes with user names or passwords containing an unquoted "@".
    if '@' in url_parts[1]:
        url_parts[1] = url_parts[1].rsplit('@', 1)[1]

    return urlparse.urlunsplit(url_parts)
def url_add_authentication(url, username, password):
    """
    Adds authentication data (username, password) to a given
    URL in order to construct an authenticated URL.

    Any authentication data already present in the URL is replaced.
    If the username is None or empty, the URL is returned unchanged.
    A password of None yields "user@host", an empty password yields
    "user:@host".

    >>> url_add_authentication('https://host.com/', '', None)
    'https://host.com/'
    >>> url_add_authentication('http://example.org/', None, None)
    'http://example.org/'
    >>> url_add_authentication('telnet://host.com/', 'foo', 'bar')
    'telnet://foo:bar@host.com/'
    >>> url_add_authentication('ftp://example.org', 'billy', None)
    'ftp://billy@example.org'
    >>> url_add_authentication('ftp://example.org', 'billy', '')
    'ftp://billy:@example.org'
    >>> url_add_authentication('http://localhost/x', 'aa', 'bc')
    'http://aa:bc@localhost/x'
    >>> url_add_authentication('http://blubb.lan/u.html', 'i/o', 'P@ss:')
    'http://i%2Fo:P%40ss%3A@blubb.lan/u.html'
    >>> url_add_authentication('http://a:b@x.org/', 'c', 'd')
    'http://c:d@x.org/'
    >>> url_add_authentication('http://i%2F:P%40%3A@cx.lan', 'P@:', 'i/')
    'http://P%40%3A:i%2F@cx.lan'
    >>> url_add_authentication('http://example.org/', 'a b', 'c+d')
    'http://a%20b:c%2Bd@example.org/'
    """
    if username is None or username == '':
        return url

    # Percent-encode every reserved character (safe=''). quote_plus()
    # must not be used here: it turns a space into "+", but "+" is a
    # literal plus sign in the user info part of an URL, so the space
    # would be lost when the URL is decoded again.
    username = urllib.quote(username, safe='')

    if password is not None:
        password = urllib.quote(password, safe='')
        auth_string = ':'.join((username, password))
    else:
        auth_string = username

    # Drop any credentials that are already part of the URL
    url = url_strip_authentication(url)

    url_parts = list(urlparse.urlsplit(url))
    # url_parts[1] is the HOST part of the URL
    url_parts[1] = '@'.join((auth_string, url_parts[1]))

    return urlparse.urlunsplit(url_parts)
def get_real_url(url):
    """
    Gets the real URL of a file and resolves all redirects.

    If the URL carries authentication data, the credentials are
    taken out of the URL and supplied through HTTP Basic
    authentication instead. Returns the URL reported by the
    response after the request has been opened.
    """
    username, password = username_password_from_url(url)
    if username or password:
        url = url_strip_authentication(url)
        # The password is deliberately kept out of the log
        log('url=%s, username=%s', url, username)
        # A realm of None registers the credentials as the default
        # for this URL, so the realm need not be known in advance
        password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
        password_mgr.add_password(None, url, username, password)
        handler = urllib2.HTTPBasicAuthHandler(password_mgr)
        opener = urllib2.build_opener(handler)
        response = opener.open(url)
    else:
        response = urllib2.urlopen(url)

    # Only the final URL is needed - do not leave the connection open
    try:
        return response.geturl()
    finally:
        response.close()
def find_command( command):