scripts/weechat/urltitel.py

370 lines
12 KiB
Python
Executable File

import html
import json
import re
from collections import defaultdict
from socket import timeout
from typing import DefaultDict, List, NamedTuple, Optional
from urllib.error import URLError
from urllib.parse import ParseResult, quote, urlparse, urlunparse
from urllib.request import Request, urlopen
import weechat
SCRIPT_NAME = "urltitel"
SCRIPT_AUTHOR = "soratobuneko"
SCRIPT_VERSION = "10"
SCRIPT_LICENCE = "WTFPL"
SCRIPT_DESCRIPTION = (
"Display or send titles of URLs from incoming and outcoming messages. "
+ "Also features an optional URL buffer"
)
UA = f"Mozilla/5.0 (Python) weechat {SCRIPT_NAME}"
BUFFER_NAME = SCRIPT_NAME
OPTIONS = {
"debug": ("off",
"Show debug messages"),
"http_rewrite": ("on",
"Rewrite HTTP URL to HTTPS"),
"maxdownload": ("262144",
"Maximum size (Bytes) to fetch from URL."),
"maxlength": ("200",
"Maximum length of title."),
"replyto": ("",
'"|" separated list of server,#channel for which instead of displaying localy a message we send it to the channel.'),
"retry": ("off",
"Retry fetching URL if it fails the first time."),
"sendfromme": ("off",
"Always send titles for URLs sent by ourself."),
"serverchans": ("*,*",
'"|" separated list of server,#channel to parse. for instance: "server0,#channel1|server0,#channel2"'),
"timeout": ("3",
"Maximum time to wait to fetch URL."),
"urlbuffer": ("off",
"Create a buffer to collect the URLs with their titles."),
}
script_options: DefaultDict[str, str] = defaultdict()
url_buffer = None
def create_buffer() -> None:
"""
Create URL buffer.
"""
global url_buffer
url_buffer = weechat.buffer_new(BUFFER_NAME, "", "", "on_buffer_close", "")
weechat.buffer_set(
url_buffer, "title", f"URL buffer ({SCRIPT_NAME} v{SCRIPT_VERSION})"
)
def debug(message: str) -> None:
"""
Print debug message to weechat console.
:param message str: message to print.
"""
if script_options["debug"] == "on":
weechat.prnt("", f"{SCRIPT_NAME}: {message}")
def error(message: str) -> None:
"""
Print error message to weechat console.
:param message str: message to print.
"""
weechat.prnt("", f"{weechat.prefix('error')}{SCRIPT_NAME}: {message}")
class Document(NamedTuple):
"""
A fetched document.
"""
url: ParseResult
src: str
def fetch_html(url: str) -> Optional[Document]:
"""
Fetch HTML Document at given URL.
:param url str: URL to fetch from.
:return: a Document if the URL point to an HTML document.
:rtype: Optional[Document]
"""
# IRI to URL (unicode to ascii)
debug(f"raw URL: {url}")
url_parsed = urlparse(url)
if (url_parsed.netloc.find("%") == -1 and url_parsed.path.find("%") == -1):
debug(f"parsed URL: {url_parsed}")
url_parsed = ParseResult(
scheme=("https" if script_options["http_rewrite"]
and url_parsed.scheme == "http"
else url_parsed.scheme),
netloc=quote(url_parsed.netloc),
path=quote(url_parsed.path),
params=url_parsed.params,
query=url_parsed.query,
fragment=url_parsed.fragment
)
debug(f"parsed encoded URL: {url_parsed}")
url = urlunparse(url_parsed)
request = Request(url, data=None, headers={"User-Agent": UA})
tries = 2 if script_options["retry"] == "on" else 1
for i in range(0, tries):
try:
with urlopen(request, timeout=int(script_options["timeout"])) as res:
is_html = bool(re.match(".*/html.*",
res.info()["Content-Type"]))
if is_html:
debug(f"Got an HTML document. Reading at most {script_options['maxdownload']} bytes.")
html_doc_head = res.read(int(script_options["maxdownload"])).decode(errors="ignore")
return Document(url=url_parsed, src=html_doc_head)
debug("Not an HTML document.")
return None
except URLError as err:
error(f"Cannot fetch {url}. {err.reason}")
except timeout:
error(f"Socket timed out while fetching {url}")
return None
_re_url = re.compile(r"https?://[\w0-9@:%._\+~#=()?&/\-]+")
def find_urls(message: str) -> List[str]:
"""
Get URLs in a given string.
:param message str: string to parse.
:return: a list of found URLs.
:rtype: List[str]
"""
if re.match(r"^url\|\d+\): ", message):
return []
return re.findall(_re_url, message)
_re_whitespace = re.compile(r"\s")
def get_title(html_doc: Document) -> Optional[str]:
"""
Get the title of an HTML Document.
:param html_doc Document: document to parse.
:return: Document title.
:rtype: str
"""
title = None
title_match = re.search(r"(?i)<title ?[^<>]*>([^<>]*)</title>",
html_doc.src)
if title_match is None:
debug("No <title> found.")
return None
title = html.unescape(title_match.group(1))
# many whitespaces to one space
stripped_title = ""
for i, char in enumerate(title):
if not re.match(_re_whitespace, char):
stripped_title += char
elif i > 0 and not re.match(_re_whitespace, title[i - 1]):
stripped_title += " "
stripped_title = stripped_title.strip()
if stripped_title.find("The Pirate Bay - The galaxy's most resilient bittorrent site") == 0:
torrent = tpb_get_torrent_by_url(html_doc.url)
if torrent is not None:
stripped_title = f"TPB torrent: {torrent.name}"
return stripped_title
class Torrent(NamedTuple):
"""
Torrent informations.
"""
id: int
name: str
def tpb_get_torrent(tid: int) -> Torrent:
"""
Fetch torrent informations from the Pirate Bay.
:param tid int: torrent id.
:return: torrent informations.
:rtype: Torrent
"""
request = Request(f"https://apibay.org/t.php?id={tid}",
data=None,
headers={"User-Agent": UA})
with urlopen(url=request,
timeout=int(script_options["timeout"])) as response:
json_ = json.load(response)
return Torrent(id=tid, name=json_["name"])
_re_query_id = re.compile(r"^(?:[^&]*[&])*id=([0-9]+)$(?:[&][^&]*)*")
def tpb_get_torrent_by_url(url: ParseResult) -> Optional[Torrent]:
"""
Fetch torrent information given is description URL.
:param url ParseResult: torrent URL on the Pirate Bay.
:return: torrent informations.
:rtype: Optional[Torrent]
"""
if url.path.endswith("description.php"):
id_match = re.match(_re_query_id, url.query)
return (tpb_get_torrent(tid=int(id_match.group(1)))
if id_match is not None
else None)
return None
def on_config_change(data, option, value):
"""
Update runtime options on config change.
"""
key = option.split(".")[-1]
script_options[key] = value
return weechat.WEECHAT_RC_OK
def on_buffer_close(data, buffer):
"""
Buffer close callback.
"""
global url_buffer
url_buffer = None
return weechat.WEECHAT_RC_OK
def on_privmsg(data, signal, signal_data):
"""
Callback for incoming and outcoming IRC messages.
"""
global url_buffer
server = signal.split(",")[0]
msg = weechat.info_get_hashtable("irc_message_parse",
{"message": signal_data})
srvchan = f"{server},{msg['channel']}"
# Parse only messages from configured server/channels
if not srvchan_in_list(srvchan, script_options["serverchans"].split("|")):
debug(f"Ignoring message from {server}/{msg['channel']}")
return weechat.WEECHAT_RC_OK
urls = find_urls(msg["text"])
titles = []
for url in urls:
debug(f"Fetching title for {url}")
html_doc = fetch_html(url)
if html_doc is not None:
title = get_title(html_doc)
if title is not None and len(title) > 0:
if len(title) > int(script_options["maxlength"]):
title = title[0: int(script_options["maxlength"])] + ""
debug(f"Found title: {title}")
titles.append(title)
else:
titles.append(None)
if len(urls) > 0:
if script_options["urlbuffer"] == "on":
nick = msg["nick"]
if len(nick) == 0:
nick = f"{weechat.color('*white')}{weechat.info_get('irc_nick', server)}{weechat.color('default')}"
if not url_buffer:
create_buffer()
weechat.prnt(
url_buffer,
f"<{nick}{weechat.color('red')}@{weechat.color('default')}{server}/{msg['channel']}>\t{msg['text']}",
)
force_send = (script_options["sendfromme"] == "on"
and len(msg["nick"]) == 0)
show_urls_title(srvchan, titles, force_send)
return weechat.WEECHAT_RC_OK
def show_urls_title(srvchan: str, titles: List[str], force_send: bool) -> None:
"""
Show or send titles to buffer.
:param srvchan str: server and channel name.
:param titles List[str]: title list.
:param force_send bool: when set always send titles.
"""
ACTION_SEND = "Sending"
buffer = weechat.info_get("irc_buffer", srvchan)
action = (
(ACTION_SEND, "to")
if force_send or srvchan_in_list(srvchan,
script_options["replyto"].split("|"))
else ("Displaying", "on")
)
if buffer is not None:
for i, title in enumerate(titles):
if title is not None:
debug(f"{action[0]} title(s) {action[1]} {srvchan}")
if action[0] == ACTION_SEND:
weechat.command(buffer, f"url|{i + 1}): {title}")
else: # We have already checked script_options["serverchans"] in on_privmsg
weechat.prnt(buffer, f"{i + 1}:\t{title}")
if script_options["urlbuffer"] == "on":
if url_buffer is None:
create_buffer()
weechat.prnt(url_buffer, f"{i + 1}:\t{title}")
def srvchan_in_list(srvchan: str, srvchan_list: List[str]) -> bool:
"""
Check if a server and channel is in a given list.
:param srvchan str: server and channel to look for.
:param srvchan_list List[str]: server and channel combination list.
:return: True if srvchan is in the list.
:rtype: bool
"""
srv_chan = srvchan.lower().split(",")
for _srvchan in srvchan_list:
_srv_chan = _srvchan.lower().split(",")
if (_srv_chan[0] == "*" or srv_chan[0] == _srv_chan[0]) and (
_srv_chan[1] == "*" or srv_chan[1] == _srv_chan[1]
):
return True
return False
weechat.register(
SCRIPT_NAME,
SCRIPT_AUTHOR,
SCRIPT_VERSION,
SCRIPT_LICENCE,
SCRIPT_DESCRIPTION,
"",
"",
)
for option, default_value in list(OPTIONS.items()):
if not weechat.config_is_set_plugin(option):
weechat.config_set_plugin(option, default_value[0])
script_options[option] = default_value[0]
else:
script_options[option] = weechat.config_get_plugin(option)
weechat.config_set_desc_plugin(
option, f"{default_value[1]} (default: {default_value[0]})"
)
if script_options["urlbuffer"] == "on":
create_buffer()
weechat.hook_config("plugins.var.python." + SCRIPT_NAME + ".*",
"on_config_change", "")
weechat.hook_signal("*,irc_in2_privmsg", "on_privmsg", "")
weechat.hook_signal("*,irc_out1_privmsg", "on_privmsg", "")