Add servers command

This commit is contained in:
夜坂雅 2023-02-21 22:21:02 +08:00
parent bbc2fd560f
commit 3b6b703fab
2 changed files with 44 additions and 0 deletions

View File

@ -1,12 +1,14 @@
import logging
import time
from calendar import THURSDAY
from collections import defaultdict
from datetime import date, datetime
from zlib import crc32
from dateutil.relativedelta import relativedelta
from nio import (
AsyncClient,
JoinedMembersError,
MatrixRoom,
RoomGetEventError,
RoomMessageImage,
@ -27,6 +29,7 @@ from nyx_bot.config import Config
from nyx_bot.errors import NyxBotRuntimeError, NyxBotValueError
from nyx_bot.storage import MatrixMessage, MembershipUpdates, UserTag
from nyx_bot.utils import (
get_user_id_parts,
make_datetime,
make_divergence,
parse_matrixdotto_link,
@ -129,6 +132,8 @@ class Command:
await self._wordcloud()
elif self.command == "ping":
await self._ping()
elif self.command == "servers":
await self._servers()
else:
await self._unknown_command()
@ -313,6 +318,28 @@ class Command:
literal_text=True,
)
async def _servers(self):
results = defaultdict(int)
members_response = await self.client.joined_members(self.room.room_id)
if isinstance(members_response, JoinedMembersError):
error = members_response.message
raise NyxBotRuntimeError(f"Gathering member list failed: {error}")
members = members_response.members
for m in members:
user_id = m.user_id
_, domain = get_user_id_parts(user_id)
results[domain] += 1
msg = "\n".join(f"{k}: {v}" for k, v in results.items())
await send_text_to_room(
self.client,
self.room.room_id,
msg,
notice=True,
markdown_convert=False,
reply_to_event_id=self.event.event_id,
literal_text=True,
)
async def _user_id(self):
if not self.reply_to:
raise NyxBotValueError("Please reply to a message for sending user ID.")

View File

@ -235,3 +235,20 @@ def should_enable_jerryxiao(room_features, room_id: str) -> bool:
def should_enable_randomdraw(room_features, room_id: str) -> bool:
return room_features[room_id]["randomdraw"]
# A structure for a Matrix UID. It also supports legacy UID formats.
# First part: [\!-9\;-\~]+
# Matches legacy UIDs too.
# Second part:
# // IPv4 Address: [0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}
# // IPv6 Address: \[[0-9A-Fa-f:.]{2,45}\]
# // DNS name: [-.0-9A-Za-z]{1,255}
# // Port: [0-9]{1,5}
# // Hostname: [0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}|\[[0-9A-Fa-f:.]{2,45}\]|[-.0-9A-Za-z]{1,255}(?::[0-9]{1,5})?
MATRIX_UID_RE = r"@([\!-9\;-\~]+):([0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}|\[[0-9A-Fa-f:.]{2,45}\]|[-.0-9A-Za-z]{1,255}(?::[0-9]{1,5})?)"
def get_user_id_parts(user_id: str) -> Tuple[str, str]:
uid, domain = re.match(MATRIX_UID_RE, user_id).groups()
return (uid, domain)