diff --git a/nyx_bot/bot_commands.py b/nyx_bot/bot_commands.py index 9ce77a0..5802bd0 100644 --- a/nyx_bot/bot_commands.py +++ b/nyx_bot/bot_commands.py @@ -1,12 +1,14 @@ import logging import time from calendar import THURSDAY +from collections import defaultdict from datetime import date, datetime from zlib import crc32 from dateutil.relativedelta import relativedelta from nio import ( AsyncClient, + JoinedMembersError, MatrixRoom, RoomGetEventError, RoomMessageImage, @@ -27,6 +29,7 @@ from nyx_bot.config import Config from nyx_bot.errors import NyxBotRuntimeError, NyxBotValueError from nyx_bot.storage import MatrixMessage, MembershipUpdates, UserTag from nyx_bot.utils import ( + get_user_id_parts, make_datetime, make_divergence, parse_matrixdotto_link, @@ -129,6 +132,8 @@ class Command: await self._wordcloud() elif self.command == "ping": await self._ping() + elif self.command == "servers": + await self._servers() else: await self._unknown_command() @@ -313,6 +318,28 @@ class Command: literal_text=True, ) + async def _servers(self): + results = defaultdict(int) + members_response = await self.client.joined_members(self.room.room_id) + if isinstance(members_response, JoinedMembersError): + error = members_response.message + raise NyxBotRuntimeError(f"Gathering member list failed: {error}") + members = members_response.members + for m in members: + user_id = m.user_id + _, domain = get_user_id_parts(user_id) + results[domain] += 1 + msg = "\n".join(f"{k}: {v}" for k, v in results.items()) + await send_text_to_room( + self.client, + self.room.room_id, + msg, + notice=True, + markdown_convert=False, + reply_to_event_id=self.event.event_id, + literal_text=True, + ) + async def _user_id(self): if not self.reply_to: raise NyxBotValueError("Please reply to a message for sending user ID.") diff --git a/nyx_bot/utils.py b/nyx_bot/utils.py index c3ce3be..57e29e0 100644 --- a/nyx_bot/utils.py +++ b/nyx_bot/utils.py @@ -235,3 +235,20 @@ def should_enable_jerryxiao(room_features, room_id: str) -> bool: def should_enable_randomdraw(room_features, room_id: str) -> bool: return room_features[room_id]["randomdraw"] + + +# A structure for a Matrix UID. It also supports legacy UID formats. +# First part: [\!-9\;-\~]+ +# Matches legacy UIDs too. +# Second part: +# // IPv4 Address: [0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} +# // IPv6 Address: \[[0-9A-Fa-f:.]{2,45}\] +# // DNS name: [-.0-9A-Za-z]{1,255} +# // Port: [0-9]{1,5} +# // Hostname: [0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}|\[[0-9A-Fa-f:.]{2,45}\]|[-.0-9A-Za-z]{1,255}(?::[0-9]{1,5})? +MATRIX_UID_RE = r"@([\!-9\;-\~]+):([0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}|\[[0-9A-Fa-f:.]{2,45}\]|[-.0-9A-Za-z]{1,255}(?::[0-9]{1,5})?)" + + +def get_user_id_parts(user_id: str) -> Tuple[str, str]: + uid, domain = re.match(MATRIX_UID_RE, user_id).groups() + return (uid, domain)