diff --git a/matrixbot/commands.py b/matrixbot/commands.py index 99df8f3..5c01180 100644 --- a/matrixbot/commands.py +++ b/matrixbot/commands.py @@ -7,6 +7,7 @@ import logging from collections import Counter from datetime import datetime from pathlib import Path +from urllib.parse import quote as _url_quote import aiohttp @@ -18,7 +19,8 @@ from config import ( MAX_DICE_SIDES, MAX_DICE_COUNT, BOT_PREFIX, ADMIN_USERS, OLLAMA_URL, OLLAMA_MODEL, CREATIVE_MODEL, ASK_MODEL, COOLDOWN_SECONDS, MINECRAFT_RCON_HOST, MINECRAFT_RCON_PORT, MINECRAFT_RCON_PASSWORD, - RCON_TIMEOUT, MIN_USERNAME_LENGTH, MAX_USERNAME_LENGTH, MATRIX_USER_ID, + RCON_TIMEOUT, MIN_USERNAME_LENGTH, MAX_USERNAME_LENGTH, + MATRIX_USER_ID, MATRIX_HOMESERVER, ) logger = logging.getLogger("matrixbot") @@ -137,6 +139,7 @@ async def cmd_help(client: AsyncClient, room_id: str, sender: str, args: str): ]), ("🎲 Random", ["flip", "roll", "random", "champion", "agent"]), ("πŸ–₯️ Server", ["minecraft", "ping", "health"]), + ("πŸ”§ Management (PL50+)", ["mkroom", "roominfo", "topic", "invite", "setpl"]), ] plain_lines = ["LotusBot Commands"] @@ -3689,3 +3692,254 @@ async def cmd_da(client: AsyncClient, room_id: str, sender: str, args: str): await asyncio.sleep(3) await _tduel_next_question(client, room_id) + + +# =========================================================================== +# Management Commands (PL 50+) +# =========================================================================== + +_MGMT_SPACE_ID = "!-1ZBnAH-JiCOV8MGSKN77zDGTuI3pgSdy8Unu_DrDyc" +_MGMT_TEMPLATE_ID = "!wfokQ1-pE896scu_AOcCBA2s3L4qFo-PTBAFTd0WMI0" # #general +_MGMT_SERVER_NAME = MATRIX_HOMESERVER.replace("https://", "").replace("http://", "") + + +async def _mx(client: AsyncClient, method: str, path: str, body: dict | None = None) -> dict: + """Authenticated Matrix Client-Server API request.""" + url = f"{MATRIX_HOMESERVER}{path}" + headers = { + "Authorization": f"Bearer {client.access_token}", + "Content-Type": "application/json", + } + timeout = aiohttp.ClientTimeout(total=30) + async with aiohttp.ClientSession(timeout=timeout) as session: + async with getattr(session, method)(url, json=body, headers=headers) as resp: + return await resp.json() + + +async def _get_state(client: AsyncClient, room_id: str, event_type: str, state_key: str = "") -> dict | None: + """Fetch a single state event content; returns None on error.""" + try: + path = f"/_matrix/client/v3/rooms/{_url_quote(room_id)}/state/{_url_quote(event_type)}/{_url_quote(state_key)}" + data = await _mx(client, "get", path) + return None if "errcode" in data else data + except Exception: + return None + + +async def _put_state(client: AsyncClient, room_id: str, event_type: str, content: dict, state_key: str = "") -> dict: + path = f"/_matrix/client/v3/rooms/{_url_quote(room_id)}/state/{_url_quote(event_type)}/{_url_quote(state_key)}" + return await _mx(client, "put", path, content) + + +@command("mkroom", "Create a new room using #general as the template (PL50+)") +async def cmd_mkroom(client: AsyncClient, room_id: str, sender: str, args: str): + if not is_elevated(client, room_id, sender): + await send_text(client, room_id, "β›” This command requires power level 50 or higher.") + return + + name = sanitize_input(args.strip()) + if not name: + await send_text(client, room_id, f"Usage: {BOT_PREFIX}mkroom ") + return + + await send_text(client, room_id, f"πŸ—οΈ Creating room '{name}' from the #general template…") + + # Fetch template state from #general + power_levels = await _get_state(client, _MGMT_TEMPLATE_ID, "m.room.power_levels") + join_rules = await _get_state(client, _MGMT_TEMPLATE_ID, "m.room.join_rules") + history_vis = await _get_state(client, _MGMT_TEMPLATE_ID, "m.room.history_visibility") + guest_access = await _get_state(client, _MGMT_TEMPLATE_ID, "m.room.guest_access") + encryption = await _get_state(client, _MGMT_TEMPLATE_ID, "m.room.encryption") + avatar = await _get_state(client, _MGMT_TEMPLATE_ID, "m.room.avatar") + + initial_state = [] + if power_levels: + initial_state.append({"type": "m.room.power_levels", "content": power_levels}) + if join_rules: + initial_state.append({"type": "m.room.join_rules", "content": join_rules}) + if history_vis: + initial_state.append({"type": "m.room.history_visibility", "content": history_vis}) + if guest_access: + initial_state.append({"type": "m.room.guest_access", "content": guest_access}) + if encryption: + initial_state.append({"type": "m.room.encryption", "content": encryption}) + if avatar and avatar.get("url"): + initial_state.append({"type": "m.room.avatar", "content": avatar}) + + try: + result = await _mx(client, "post", "/_matrix/client/v3/createRoom", { + "name": name, + "room_version": "12", + "preset": "private_chat", + "initial_state": initial_state, + }) + + if "errcode" in result: + await send_text(client, room_id, f"❌ Room creation failed: {result.get('error', result['errcode'])}") + return + + new_room_id = result["room_id"] + + # Add the new room to the Space as a child + await _put_state(client, _MGMT_SPACE_ID, "m.space.child", { + "via": [_MGMT_SERVER_NAME], + "suggested": False, + }, new_room_id) + + # Invite the requesting user (bot is already in the room as creator) + await _mx(client, "post", + f"/_matrix/client/v3/rooms/{_url_quote(new_room_id)}/invite", + {"user_id": sender}) + + await send_html(client, room_id, + f"βœ… Room '{name}' created!\nRoom ID: {new_room_id}\nAdded to the Space and invited you.", + f'βœ… Room created: {name}
' + f'{new_room_id}
' + f'Added to the Lotus Guild Space β€” invite yourself or others with !invite @user in the new room.', + ) + except Exception as e: + logger.error("mkroom error: %s", e, exc_info=True) + await send_text(client, room_id, "❌ An error occurred while creating the room.") + + +@command("roominfo", "Show info about the current room (PL50+)") +async def cmd_roominfo(client: AsyncClient, room_id: str, sender: str, args: str): + if not is_elevated(client, room_id, sender): + await send_text(client, room_id, "β›” This command requires power level 50 or higher.") + return + + room = client.rooms.get(room_id) + if not room: + await send_text(client, room_id, "Room not found in local state.") + return + + pl = room.power_levels + elevated = sorted( + [(uid, lvl) for uid, lvl in pl.users.items() if lvl > 0], + key=lambda x: x[1], reverse=True, + ) + el_lines = [f" PL{lvl}: {uid.split(':')[0].lstrip('@')}" for uid, lvl in elevated] + + join_state = await _get_state(client, room_id, "m.room.join_rules") + join_rule = join_state.get("join_rule", "unknown") if join_state else "unknown" + enc = await _get_state(client, room_id, "m.room.encryption") + encrypted = "yes" if enc else "no" + + member_count = len(room.users) + + plain = ( + f"πŸ“‹ Room Info\n" + f"Name: {room.display_name}\n" + f"ID: {room_id}\n" + f"Members: {member_count}\n" + f"Join rule: {join_rule}\n" + f"Encrypted: {encrypted}\n" + f"Elevated users:\n" + ("\n".join(el_lines) if el_lines else " (none)") + ) + html = ( + f'πŸ“‹ Room Info
' + f'Name: {room.display_name}
' + f'ID: {room_id}
' + f'Members: {member_count}
' + f'Join rule: {join_rule}
' + f'Encrypted: {encrypted}
' + f'Elevated users:" if elevated else "
  • (none)
  • ") + ) + await send_html(client, room_id, plain, html) + + +@command("topic", "Set the room topic (PL50+) β€” leave blank to clear") +async def cmd_topic(client: AsyncClient, room_id: str, sender: str, args: str): + if not is_elevated(client, room_id, sender): + await send_text(client, room_id, "β›” This command requires power level 50 or higher.") + return + + topic = sanitize_input(args.strip()) + try: + result = await _put_state(client, room_id, "m.room.topic", {"topic": topic}) + if "errcode" in result: + await send_text(client, room_id, f"❌ Failed: {result.get('error', result['errcode'])}") + return + if topic: + await send_text(client, room_id, f"βœ… Topic set to: {topic}") + else: + await send_text(client, room_id, "βœ… Topic cleared.") + except Exception as e: + logger.error("topic error: %s", e, exc_info=True) + await send_text(client, room_id, "❌ Failed to set topic.") + + +@command("invite", "Invite a user to this room (PL50+)") +async def cmd_invite(client: AsyncClient, room_id: str, sender: str, args: str): + if not is_elevated(client, room_id, sender): + await send_text(client, room_id, "β›” This command requires power level 50 or higher.") + return + + target = args.strip() + if not target or not target.startswith("@"): + await send_text(client, room_id, f"Usage: {BOT_PREFIX}invite @user:server") + return + + try: + result = await _mx(client, "post", + f"/_matrix/client/v3/rooms/{_url_quote(room_id)}/invite", + {"user_id": target}) + if "errcode" in result: + await send_text(client, room_id, f"❌ Failed: {result.get('error', result['errcode'])}") + return + name = target.split(":")[0].lstrip("@") + await send_text(client, room_id, f"βœ… Invited {name} to the room.") + except Exception as e: + logger.error("invite error: %s", e, exc_info=True) + await send_text(client, room_id, "❌ Failed to send invite.") + + +@command("setpl", "Set a user's power level (PL50+) β€” !setpl @user <0-100>") +async def cmd_setpl(client: AsyncClient, room_id: str, sender: str, args: str): + if not is_elevated(client, room_id, sender): + await send_text(client, room_id, "β›” This command requires power level 50 or higher.") + return + + parts = args.strip().split() + if len(parts) < 2 or not parts[0].startswith("@"): + await send_text(client, room_id, f"Usage: {BOT_PREFIX}setpl @user ") + return + + target = parts[0] + try: + new_level = int(parts[1]) + except ValueError: + await send_text(client, room_id, "Power level must be a number.") + return + + if not 0 <= new_level <= 100: + await send_text(client, room_id, "Power level must be between 0 and 100.") + return + + room = client.rooms.get(room_id) + sender_level = room.power_levels.get_user_level(sender) if room else 0 + if new_level > sender_level: + await send_text(client, room_id, + f"β›” You can't set a power level higher than your own ({sender_level}).") + return + + # Fetch current power_levels state and patch it + current_pl = await _get_state(client, room_id, "m.room.power_levels") + if not current_pl: + await send_text(client, room_id, "❌ Couldn't fetch current power levels.") + return + + current_pl.setdefault("users", {})[target] = new_level + + try: + result = await _put_state(client, room_id, "m.room.power_levels", current_pl) + if "errcode" in result: + await send_text(client, room_id, f"❌ Failed: {result.get('error', result['errcode'])}") + return + name = target.split(":")[0].lstrip("@") + await send_text(client, room_id, f"βœ… {name}'s power level set to {new_level}.") + except Exception as e: + logger.error("setpl error: %s", e, exc_info=True) + await send_text(client, room_id, "❌ Failed to update power level.")