From 3b2b8f39fc3eb67cbff62bac7acf7907a61a55a6 Mon Sep 17 00:00:00 2001 From: soratobuneko Date: Fri, 22 Jan 2021 16:22:30 +0100 Subject: [PATCH] urltitel: tpb torrent name support. some typing refactor. --- urltitel.py | 80 ++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 16 deletions(-) diff --git a/urltitel.py b/urltitel.py index b900ee4..ebab3d3 100755 --- a/urltitel.py +++ b/urltitel.py @@ -1,16 +1,17 @@ # TODO option ignore chan import html +import json import re import weechat from socket import timeout -from typing import List, Optional +from typing import List, NamedTuple, Optional, Tuple from urllib.error import URLError -from urllib.parse import quote, urlsplit, urlunsplit +from urllib.parse import ParseResult, quote, urlparse, urlunparse from urllib.request import Request, urlopen SCRIPT_NAME = "urltitel" SCRIPT_AUTHOR = "soratobuneko" -SCRIPT_VERSION = "8dev" +SCRIPT_VERSION = "8" SCRIPT_LICENCE = "WTFPL" SCRIPT_DESCRIPTION = ( "Display or send titles of URLs from incoming and outcoming messages. " @@ -35,6 +36,7 @@ script_options = { "sendfromme": ("off", "Alway send titles for URLs sent by ourself."), "urlbuffer": ("off", "Create a buffer to collect the URLs with their titles."), "debug": ("off", "Show debug messages"), + "http_rewrite": ("on", "Rewrite HTTP URL to HTTPS") } url_buffer = None @@ -56,13 +58,24 @@ def error(message: str) -> None: weechat.prnt("", f"{weechat.prefix('error')}{SCRIPT_NAME}: {message}") -def fetch_html(url: str) -> Optional[str]: +class Document(NamedTuple): + url: ParseResult + src: str + +def fetch_html(url: str) -> Optional[Document]: # IRI to URL (unicode to ascii) - url_split = urlsplit(url) - url_list = list(url_split) - url_list[1] = quote(url_list[1]) # URL encode domain - url_list[2] = quote(url_list[2]) # URL encode path - url = urlunsplit(url_list) + url_parsed = urlparse(url) + url_urlencoded = ParseResult( + scheme=("https" if script_options["http_rewrite"] + and url_parsed.scheme == "http" + else url_parsed.scheme), + netloc=quote(url_parsed.netloc), + path=quote(url_parsed.path), + params=url_parsed.params, + query=url_parsed.query, + fragment=url_parsed.fragment + ) + url = urlunparse(url_urlencoded) request = Request(url, data=None, headers={"User-Agent": UA}) tries = 2 if script_options["retry"] == "on" else 1 @@ -73,7 +86,7 @@ def fetch_html(url: str) -> Optional[str]: if is_html: debug(f"Got an HTML document. Reading at most {script_options['maxdownload']} bytes.") html_doc_head = res.read(int(script_options["maxdownload"])).decode(errors="ignore") - return html_doc_head + return Document(url=url_urlencoded, src=html_doc_head) else: debug("Not an HTML document.") return None @@ -98,9 +111,10 @@ def find_urls(message: str) -> List[str]: _re_whitespace = re.compile(r"\s") -def get_title(html_doc: str) -> Optional[str]: +def get_title(html_doc: Document) -> Optional[str]: title = None - title_match = re.search(r"(?i)]*>([^<>]*)", html_doc) + title_match = re.search(r"(?i)]*>([^<>]*)", + html_doc.src) if title_match is None: debug("No found.") return None @@ -116,8 +130,38 @@ def get_title(html_doc: str) -> Optional[str]: stripped_title += " " stripped_title = stripped_title.strip() + if stripped_title.find("The Pirate Bay - The galaxy's most resilient bittorrent site") == 0: + torrent = tpb_get_torrent_by_url(html_doc.url) + if torrent is not None: + stripped_title = f"TPB torrent: {torrent.name}" + return stripped_title +class Torrent(NamedTuple): + id: int + name: str + +def tpb_get_torrent(id: int) -> Torrent: + request = Request(f"https://apibay.org/t.php?id={id}", + data=None, + headers={"User-Agent": UA}) + with urlopen(url=request, + timeout=int(script_options["timeout"])) as response: + json_ = json.load(response) + return Torrent(id=id, name=json_["name"]) + + +_re_query_id = re.compile(r"^(?:[^&]*[&])*id=([0-9]+)$(?:[&][^&]*)*") + + + +def tpb_get_torrent_by_url(url: ParseResult) -> Optional[Torrent]: + if url.path.endswith("description.php"): + id_match = re.match(_re_query_id, url.query) + return (tpb_get_torrent(id=int(id_match.group(1))) + if id_match is not None + else None) + return None def on_config_change(data, option, value): key = option.split(".")[-1] @@ -134,7 +178,8 @@ def on_buffer_close(data, buffer): def on_privmsg(data, signal, signal_data): global url_buffer server = signal.split(",")[0] - msg = weechat.info_get_hashtable("irc_message_parse", {"message": signal_data}) + msg = weechat.info_get_hashtable("irc_message_parse", + {"message": signal_data}) srvchan = f"{server},{msg['channel']}" # Parse only messages from configured server/channels @@ -168,7 +213,8 @@ def on_privmsg(data, signal, signal_data): url_buffer, f"<{nick}{weechat.color('red')}@{weechat.color('default')}{server}/{msg['channel']}>\t{msg['text']}", ) - force_send = script_options["sendfromme"] == "on" and len(msg["nick"]) == 0 + force_send = (script_options["sendfromme"] == "on" + and len(msg["nick"]) == 0) show_urls_title(srvchan, titles, force_send) return weechat.WEECHAT_RC_OK @@ -179,7 +225,8 @@ def show_urls_title(srvchan: str, titles: List[str], force_send: bool) -> None: buffer = weechat.info_get("irc_buffer", srvchan) action = ( (ACTION_SEND, "to") - if force_send or srvchan_in_list(srvchan, script_options["replyto"].split("|")) + if force_send or srvchan_in_list(srvchan, + script_options["replyto"].split("|")) else ("Displaying", "on") ) if buffer is not None: @@ -231,6 +278,7 @@ if script_options["urlbuffer"] == "on": create_buffer() -weechat.hook_config("plugins.var.python." + SCRIPT_NAME + ".*", "on_config_change", "") +weechat.hook_config("plugins.var.python." + SCRIPT_NAME + ".*", + "on_config_change", "") weechat.hook_signal("*,irc_in2_privmsg", "on_privmsg", "") weechat.hook_signal("*,irc_out1_privmsg", "on_privmsg", "")