Merge pull request #437 from XIncognito10/add_youtube_parsing
Implements #384: Youtube Feeds without API
This commit is contained in:
commit
368a20e5df
4
bin/gpo
4
bin/gpo
|
@ -290,8 +290,8 @@ class gPodderCli(object):
|
|||
self._error(_('Invalid url: %s') % url)
|
||||
return None
|
||||
|
||||
# Check if it's a YouTube feed, and if we have an API key, auto-resolve the channel
|
||||
url = youtube.resolve_v3_url(url, self._config.youtube.api_key_v3)
|
||||
# Check if it's a YouTube channel, user, or playlist URL and, if so, resolve it to its feed URL
|
||||
url = youtube.parse_youtube_url(url)
|
||||
|
||||
# Subscribe to new podcast
|
||||
if create:
|
||||
|
|
|
@ -70,7 +70,7 @@ class CoverDownloader(object):
|
|||
# If allowed to download files, do so here
|
||||
if download:
|
||||
# YouTube-specific cover art image resolver
|
||||
youtube_cover_url = youtube.get_real_cover(feed_url)
|
||||
youtube_cover_url = youtube.get_cover(feed_url)
|
||||
if youtube_cover_url is not None:
|
||||
cover_url = youtube_cover_url
|
||||
|
||||
|
|
|
@ -2305,8 +2305,8 @@ class gPodder(BuilderWidget, dbus.service.Object):
|
|||
for input_title, input_url in podcasts:
|
||||
url = util.normalize_feed_url(input_url)
|
||||
|
||||
# Check if it's a YouTube feed, and if we have an API key, auto-resolve the channel
|
||||
url = youtube.resolve_v3_url(url, self.config.youtube.api_key_v3)
|
||||
# Check if it's a YouTube channel, user, or playlist URL and, if so, resolve it to its feed URL
|
||||
url = youtube.parse_youtube_url(url)
|
||||
|
||||
if url is None:
|
||||
# Fail this one because the URL is not valid
|
||||
|
|
|
@ -20,23 +20,16 @@
|
|||
# Justin Forest <justin.forest@gmail.com> 2008-10-13
|
||||
#
|
||||
|
||||
|
||||
import gpodder
|
||||
|
||||
from urllib.parse import parse_qs
|
||||
from gpodder import util
|
||||
|
||||
import os.path
|
||||
|
||||
from html.parser import HTMLParser
|
||||
import json
|
||||
import re
|
||||
import urllib
|
||||
import xml.etree.ElementTree
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
import json
|
||||
|
||||
import re
|
||||
import urllib.request, urllib.parse, urllib.error
|
||||
|
||||
from urllib.parse import parse_qs
|
||||
|
||||
# http://en.wikipedia.org/wiki/YouTube#Quality_and_codecs
|
||||
# format id, (preferred ids, path(?), description) # video bitrate, audio bitrate
|
||||
formats = [
|
||||
|
@ -74,7 +67,8 @@ V3_API_ENDPOINT = 'https://www.googleapis.com/youtube/v3'
|
|||
CHANNEL_VIDEOS_XML = 'https://www.youtube.com/feeds/videos.xml'
|
||||
|
||||
|
||||
class YouTubeError(Exception): pass
|
||||
class YouTubeError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def get_fmt_ids(youtube_config):
|
||||
|
@ -91,7 +85,7 @@ def get_fmt_ids(youtube_config):
|
|||
|
||||
def get_real_download_url(url, preferred_fmt_ids=None):
|
||||
if not preferred_fmt_ids:
|
||||
preferred_fmt_ids, _, _ = formats_dict[22] # MP4 720p
|
||||
preferred_fmt_ids, _, _ = formats_dict[22] # MP4 720p
|
||||
|
||||
vid = get_youtube_id(url)
|
||||
if vid is not None:
|
||||
|
@ -209,26 +203,44 @@ def get_real_channel_url(url):
|
|||
return for_each_feed_pattern(return_user_feed, url, url)
|
||||
|
||||
|
||||
def get_real_cover(url):
|
||||
def return_user_cover(url, channel):
|
||||
try:
|
||||
api_url = 'https://www.youtube.com/channel/{0}'.format(channel)
|
||||
data = util.urlopen(api_url).read().decode('utf-8')
|
||||
# Look for 900x900px image first.
|
||||
m = re.search('<link rel="image_src"[^>]* href=[\'"]([^\'"]+)[\'"][^>]*>', data)
|
||||
if m is None:
|
||||
def get_cover(url):
|
||||
if 'youtube.com' in url:
|
||||
|
||||
class YouTubeHTMLCoverParser(HTMLParser):
|
||||
"""This custom html parser searches for the youtube channel thumbnail/avatar"""
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.url = ""
|
||||
|
||||
def handle_starttag(self, tag, attributes):
|
||||
attribute_dict = {attribute[0]: attribute[1] for attribute in attributes}
|
||||
|
||||
# Look for 900x900px image first.
|
||||
if tag == 'link' \
|
||||
and 'rel' in attribute_dict \
|
||||
and attribute_dict['rel'] == 'image_src':
|
||||
self.url = attribute_dict['href']
|
||||
|
||||
# Fallback to image that may only be 100x100px.
|
||||
m = re.search('<img class="channel-header-profile-image"[^>]* src=[\'"]([^\'"]+)[\'"][^>]*>', data)
|
||||
if m is not None:
|
||||
logger.debug('YouTube userpic for %s is: %s', url, m.group(1))
|
||||
return m.group(1)
|
||||
except Exception as e:
|
||||
logger.warn('Could not retrieve cover art', exc_info=True)
|
||||
return None
|
||||
elif tag == 'img' \
|
||||
and 'class' in attribute_dict \
|
||||
and attribute_dict['class'] == "channel-header-profile-image":
|
||||
self.url = attribute_dict['src']
|
||||
|
||||
return None
|
||||
try:
|
||||
raw_xml_data = util.urlopen(url).read().decode('utf-8')
|
||||
xml_data = xml.etree.ElementTree.fromstring(raw_xml_data)
|
||||
channel_id = xml_data.find("{http://www.youtube.com/xml/schemas/2015}channelId").text
|
||||
channel_url = 'https://www.youtube.com/channel/{}'.format(channel_id)
|
||||
html_data = util.urlopen(channel_url).read().decode('utf-8')
|
||||
parser = YouTubeHTMLCoverParser()
|
||||
parser.feed(html_data)
|
||||
if parser.url:
|
||||
logger.debug('Youtube cover art for {} is: {}'.format(url, parser.url))
|
||||
return parser.url
|
||||
|
||||
return for_each_feed_pattern(return_user_cover, url, None)
|
||||
except Exception:
|
||||
logger.warning('Could not retrieve cover art', exc_info=True)
|
||||
|
||||
|
||||
def get_channels_for_user(username, api_key_v3):
|
||||
|
@ -250,15 +262,48 @@ def get_channels_for_user(username, api_key_v3):
|
|||
return ['{0}?channel_id={1}'.format(CHANNEL_VIDEOS_XML, item['id']) for item in data['items']]
|
||||
|
||||
|
||||
def resolve_v3_url(url, api_key_v3):
|
||||
# Check if it's a YouTube feed, and if we have an API key, auto-resolve the channel
|
||||
if url and api_key_v3:
|
||||
_, user = for_each_feed_pattern(lambda url, channel: (url, channel), url, (None, None))
|
||||
if user is not None:
|
||||
logger.info('Getting channels for YouTube user %s', user)
|
||||
new_urls = get_channels_for_user(user, api_key_v3)
|
||||
logger.debug('YouTube channels retrieved: %r', new_urls)
|
||||
if len(new_urls) == 1:
|
||||
return new_urls[0]
|
||||
def parse_youtube_url(url):
    """
    Convert a YouTube channel, user or playlist page URL into its feed URL.

    Youtube Channel Links are parsed into youtube feed links
    >>> parse_youtube_url("https://www.youtube.com/channel/CHANNEL_ID")
    'https://www.youtube.com/feeds/videos.xml?channel_id=CHANNEL_ID'

    Youtube User Links are parsed into youtube feed links
    >>> parse_youtube_url("https://www.youtube.com/user/USERNAME")
    'https://www.youtube.com/feeds/videos.xml?user=USERNAME'

    Youtube Playlist Links are parsed into youtube feed links
    >>> parse_youtube_url("https://www.youtube.com/playlist?list=PLAYLIST_ID")
    'https://www.youtube.com/feeds/videos.xml?playlist_id=PLAYLIST_ID'

    Anything else (including None or an empty string) is returned unchanged
    >>> parse_youtube_url("https://example.com/feed.xml")
    'https://example.com/feed.xml'

    @param url: the path to the channel, user or playlist
    @return: the feed url if successful or the given url if not
    """
    # Callers may pass the result of a failed URL normalization (None) before
    # checking it; urlsplit() would return bytes components for None and the
    # str.join() below would raise, so hand such values straight back.
    if not url:
        return url

    scheme, netloc, path, query, fragment = urllib.parse.urlsplit(url)
    logger.debug("Analyzing URL: {}".format(" ".join([scheme, netloc, path, query, fragment])))

    # Query string of the resulting feed URL; stays None if nothing matches.
    feed_query = None
    if 'youtube.com' in netloc:
        if path.startswith('/user/'):
            feed_query = 'user={user_id}'.format(user_id=path.split('/')[2])
        elif path.startswith('/channel/'):
            feed_query = 'channel_id={channel_id}'.format(channel_id=path.split('/')[2])
        else:
            # Match the parameter by its exact name and cut the value off at
            # the "=": str.strip("list=") would remove a *set of characters*
            # from both ends and mangle IDs starting/ending in l, i, s, t or =,
            # and a substring test would also match e.g. "playlist=...".
            for parameter in query.split('&'):
                name, separator, value = parameter.partition('=')
                if separator and name == 'list':
                    feed_query = 'playlist_id={playlist_id}'.format(playlist_id=value)
                    break

    if feed_query is None:
        logger.debug("Not a valid Youtube URL: {}".format(url))
        return url

    logger.debug("Valid Youtube URL detected. Parsing...")
    new_url = urllib.parse.urlunsplit((scheme, netloc, '/feeds/videos.xml', feed_query, fragment))
    logger.debug("New Youtube URL: {}".format(new_url))
    return new_url
|
||||
|
|
Loading…
Reference in a new issue