Add Phase 2: integrations, admin, and remaining commands

New commands: agent, trivia (with 30s timer reveal), ask (Ollama LLM
with cooldown), minecraft (RCON whitelist), health (admin-only metrics).
Adds metrics tracking, per-user cooldowns, and admin permission checks.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-11 20:52:57 -05:00
parent 50de731a82
commit 5723ac3581
5 changed files with 325 additions and 2 deletions

View File

@@ -1,11 +1,23 @@
import asyncio
import json
import random
import time
import logging
from collections import Counter
from datetime import datetime
from functools import partial
import aiohttp
from nio import AsyncClient
from utils import send_text, send_html, send_reaction, sanitize_input
from config import MAX_DICE_SIDES, MAX_DICE_COUNT, BOT_PREFIX
from config import (
MAX_DICE_SIDES, MAX_DICE_COUNT, BOT_PREFIX, ADMIN_USERS,
OLLAMA_URL, OLLAMA_MODEL, MAX_INPUT_LENGTH, COOLDOWN_SECONDS,
MINECRAFT_RCON_HOST, MINECRAFT_RCON_PORT, MINECRAFT_RCON_PASSWORD,
RCON_TIMEOUT, MIN_USERNAME_LENGTH, MAX_USERNAME_LENGTH,
)
logger = logging.getLogger("matrixbot")
@@ -20,6 +32,53 @@ def command(name, description=""):
return decorator
# ==================== METRICS ====================
class MetricsCollector:
def __init__(self):
self.command_counts = Counter()
self.error_counts = Counter()
self.start_time = datetime.now()
def record_command(self, command_name: str):
self.command_counts[command_name] += 1
def record_error(self, command_name: str):
self.error_counts[command_name] += 1
def get_stats(self) -> dict:
uptime = datetime.now() - self.start_time
return {
"uptime_seconds": uptime.total_seconds(),
"commands_executed": sum(self.command_counts.values()),
"top_commands": self.command_counts.most_common(5),
"error_count": sum(self.error_counts.values()),
}
metrics = MetricsCollector()
# ==================== COOLDOWNS ====================
# sender -> {command: last_used_time}
_cooldowns: dict[str, dict[str, float]] = {}
def check_cooldown(sender: str, cmd_name: str, seconds: int = COOLDOWN_SECONDS) -> int:
"""Return 0 if allowed, otherwise seconds remaining."""
now = time.monotonic()
user_cds = _cooldowns.setdefault(sender, {})
last = user_cds.get(cmd_name, 0)
remaining = seconds - (now - last)
if remaining > 0:
return int(remaining) + 1
user_cds[cmd_name] = now
return 0
# ==================== COMMANDS ====================
@@ -316,3 +375,245 @@ async def cmd_champion(client: AsyncClient, room_id: str, sender: str, args: str
f"Lane: {lane}"
)
await send_html(client, room_id, plain, html)
@command("agent", "Random Valorant agent (optional: !agent duelist)")
async def cmd_agent(client: AsyncClient, room_id: str, sender: str, args: str):
agents = {
"Duelists": ["Jett", "Phoenix", "Raze", "Reyna", "Yoru", "Neon", "Iso", "Waylay"],
"Controllers": ["Brimstone", "Viper", "Omen", "Astra", "Harbor", "Clove"],
"Initiators": ["Sova", "Breach", "Skye", "KAY/O", "Fade", "Gekko", "Tejo"],
"Sentinels": ["Killjoy", "Cypher", "Sage", "Chamber", "Deadlock", "Vyse", "Veto"],
}
role_arg = args.strip().capitalize() if args.strip() else ""
# Allow partial match: "duelist" -> "Duelists"
role = None
if role_arg:
for key in agents:
if key.lower().startswith(role_arg.lower()):
role = key
break
if role is None:
role = random.choice(list(agents.keys()))
selected = random.choice(agents[role])
plain = f"Valorant Agent Picker: {selected} ({role})"
html = (
f"<strong>Valorant Agent Picker</strong><br>"
f"Agent: <strong>{selected}</strong><br>"
f"Role: {role}"
)
await send_html(client, room_id, plain, html)
@command("trivia", "Play a trivia game")
async def cmd_trivia(client: AsyncClient, room_id: str, sender: str, args: str):
questions = [
{"q": "What year was the original Super Mario Bros. released?", "options": ["1983", "1985", "1987", "1990"], "answer": 1},
{"q": "Which game features the quote 'The cake is a lie'?", "options": ["Half-Life 2", "Portal", "BioShock", "Minecraft"], "answer": 1},
{"q": "What is the max level in League of Legends?", "options": ["16", "18", "20", "25"], "answer": 1},
{"q": "Which Valorant agent has the codename 'Deadeye'?", "options": ["Jett", "Sova", "Chamber", "Cypher"], "answer": 2},
{"q": "How many Ender Dragon eggs can exist in a vanilla Minecraft world?", "options": ["1", "2", "Unlimited", "0"], "answer": 0},
{"q": "What was the first battle royale game to hit mainstream popularity?", "options": ["Fortnite", "PUBG", "H1Z1", "Apex Legends"], "answer": 2},
{"q": "In Minecraft, what is the rarest ore?", "options": ["Diamond", "Emerald", "Ancient Debris", "Lapis Lazuli"], "answer": 1},
{"q": "What is the name of the main character in The Legend of Zelda?", "options": ["Zelda", "Link", "Ganondorf", "Epona"], "answer": 1},
{"q": "Which game has the most registered players of all time?", "options": ["Fortnite", "Minecraft", "League of Legends", "Roblox"], "answer": 1},
{"q": "What type of animal is Sonic?", "options": ["Fox", "Hedgehog", "Rabbit", "Echidna"], "answer": 1},
{"q": "In Among Us, what is the maximum number of impostors?", "options": ["1", "2", "3", "4"], "answer": 2},
{"q": "What does GG stand for in gaming?", "options": ["Get Good", "Good Game", "Go Go", "Great Going"], "answer": 1},
{"q": "Which company developed Valorant?", "options": ["Blizzard", "Valve", "Riot Games", "Epic Games"], "answer": 2},
{"q": "What is the highest rank in Valorant?", "options": ["Immortal", "Diamond", "Radiant", "Challenger"], "answer": 2},
{"q": "In League of Legends, what is Baron Nashor an anagram of?", "options": ["Baron Roshan", "Roshan", "Nashor Baron", "Nash Robot"], "answer": 1},
{"q": "What does HTTP stand for?", "options": ["HyperText Transfer Protocol", "High Tech Transfer Program", "HyperText Transmission Process", "Home Tool Transfer Protocol"], "answer": 0},
{"q": "What year was Discord founded?", "options": ["2013", "2015", "2017", "2019"], "answer": 1},
{"q": "What programming language has a logo that is a snake?", "options": ["Java", "Ruby", "Python", "Go"], "answer": 2},
{"q": "How many bits are in a byte?", "options": ["4", "8", "16", "32"], "answer": 1},
{"q": "What does 'RGB' stand for?", "options": ["Really Good Build", "Red Green Blue", "Red Gold Black", "Rapid Gaming Boost"], "answer": 1},
{"q": "What is the most subscribed YouTube channel?", "options": ["PewDiePie", "MrBeast", "T-Series", "Cocomelon"], "answer": 1},
{"q": "What does 'AFK' stand for?", "options": ["A Free Kill", "Away From Keyboard", "Always Fun Killing", "Another Fake Knockdown"], "answer": 1},
{"q": "What animal is the Linux mascot?", "options": ["Fox", "Penguin", "Cat", "Dog"], "answer": 1},
{"q": "What does 'NPC' stand for?", "options": ["Non-Player Character", "New Player Content", "Normal Playing Conditions", "Never Played Competitively"], "answer": 0},
{"q": "In what year was the first iPhone released?", "options": ["2005", "2006", "2007", "2008"], "answer": 2},
]
labels = ["\U0001f1e6", "\U0001f1e7", "\U0001f1e8", "\U0001f1e9"] # A B C D regional indicators
label_letters = ["A", "B", "C", "D"]
question = random.choice(questions)
options_plain = "\n".join(f" {label_letters[i]}. {opt}" for i, opt in enumerate(question["options"]))
options_html = "".join(f"<li><strong>{label_letters[i]}</strong>. {opt}</li>" for i, opt in enumerate(question["options"]))
plain = f"Trivia Time!\n{question['q']}\n{options_plain}\n\nReact with A/B/C/D — answer revealed in 30s!"
html = (
f"<strong>Trivia Time!</strong><br>"
f"<em>{question['q']}</em><br>"
f"<ul>{options_html}</ul>"
f"React with A/B/C/D — answer revealed in 30s!"
)
resp = await send_html(client, room_id, plain, html)
if hasattr(resp, "event_id"):
for emoji in labels:
await send_reaction(client, room_id, resp.event_id, emoji)
# Reveal answer after 30 seconds
async def reveal():
await asyncio.sleep(30)
correct = question["answer"]
answer_text = f"{label_letters[correct]}. {question['options'][correct]}"
await send_html(
client, room_id,
f"Trivia Answer: {answer_text}",
f"<strong>Trivia Answer:</strong> {answer_text}",
)
asyncio.create_task(reveal())
# ==================== INTEGRATIONS ====================
@command("ask", "Ask Lotus LLM a question (2min cooldown)")
async def cmd_ask(client: AsyncClient, room_id: str, sender: str, args: str):
if not args:
await send_text(client, room_id, f"Usage: {BOT_PREFIX}ask <question>")
return
remaining = check_cooldown(sender, "ask")
if remaining:
await send_text(client, room_id, f"Command on cooldown. Try again in {remaining}s.")
return
question = sanitize_input(args)
if not question:
await send_text(client, room_id, "Please provide a valid question.")
return
await send_text(client, room_id, "Thinking...")
try:
timeout = aiohttp.ClientTimeout(total=60)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(
f"{OLLAMA_URL}/api/generate",
json={"model": OLLAMA_MODEL, "prompt": question, "stream": True},
) as response:
full_response = ""
async for line in response.content:
try:
chunk = json.loads(line)
if "response" in chunk:
full_response += chunk["response"]
except json.JSONDecodeError:
pass
if not full_response:
full_response = "No response received from server."
plain = f"Lotus LLM\nQ: {question}\nA: {full_response}"
html = (
f"<strong>Lotus LLM</strong><br>"
f"<em>Q:</em> {question}<br>"
f"<em>A:</em> {full_response}"
)
await send_html(client, room_id, plain, html)
except asyncio.TimeoutError:
await send_text(client, room_id, "LLM request timed out. Try again later.")
except Exception as e:
logger.error(f"Ollama error: {e}", exc_info=True)
await send_text(client, room_id, "Failed to reach Lotus LLM. It may be offline.")
@command("minecraft", "Whitelist a player on the Minecraft server")
async def cmd_minecraft(client: AsyncClient, room_id: str, sender: str, args: str):
username = args.strip()
if not username:
await send_text(client, room_id, f"Usage: {BOT_PREFIX}minecraft <username>")
return
if not username.replace("_", "").isalnum():
await send_text(client, room_id, "Invalid username. Use only letters, numbers, and underscores.")
return
if not (MIN_USERNAME_LENGTH <= len(username) <= MAX_USERNAME_LENGTH):
await send_text(client, room_id, f"Username must be {MIN_USERNAME_LENGTH}-{MAX_USERNAME_LENGTH} characters.")
return
if not MINECRAFT_RCON_PASSWORD:
await send_text(client, room_id, "Minecraft server is not configured.")
return
await send_text(client, room_id, f"Whitelisting {username}...")
try:
from mcrcon import MCRcon
def _rcon():
with MCRcon(MINECRAFT_RCON_HOST, MINECRAFT_RCON_PASSWORD, port=MINECRAFT_RCON_PORT, timeout=3) as mcr:
return mcr.command(f"whitelist add {username}")
loop = asyncio.get_running_loop()
response = await asyncio.wait_for(loop.run_in_executor(None, _rcon), timeout=RCON_TIMEOUT)
logger.info(f"RCON response: {response}")
plain = f"Minecraft\nYou have been whitelisted on the SMP!\nServer: minecraft.lotusguild.org\nUsername: {username}"
html = (
f"<strong>Minecraft</strong><br>"
f"You have been whitelisted on the SMP!<br>"
f"Server: <strong>minecraft.lotusguild.org</strong><br>"
f"Username: <strong>{username}</strong>"
)
await send_html(client, room_id, plain, html)
except ImportError:
await send_text(client, room_id, "mcrcon is not installed. Ask an admin to install it.")
except asyncio.TimeoutError:
await send_text(client, room_id, "Minecraft server timed out. It may be offline.")
except Exception as e:
logger.error(f"RCON error: {e}", exc_info=True)
await send_text(client, room_id, "Failed to whitelist. The server may be offline (let jared know).")
# ==================== ADMIN COMMANDS ====================
@command("health", "Bot status and health (admin only)")
async def cmd_health(client: AsyncClient, room_id: str, sender: str, args: str):
if sender not in ADMIN_USERS:
await send_text(client, room_id, "You don't have permission to use this command.")
return
stats = metrics.get_stats()
uptime_hours = stats["uptime_seconds"] / 3600
top_cmds = ""
if stats["top_commands"]:
top_cmds = ", ".join(f"{name}({count})" for name, count in stats["top_commands"])
services = []
if OLLAMA_URL:
services.append("Ollama: configured")
else:
services.append("Ollama: N/A")
if MINECRAFT_RCON_PASSWORD:
services.append("RCON: configured")
else:
services.append("RCON: N/A")
plain = (
f"Bot Status\n"
f"Uptime: {uptime_hours:.1f}h\n"
f"Commands run: {stats['commands_executed']}\n"
f"Errors: {stats['error_count']}\n"
f"Top commands: {top_cmds or 'none'}\n"
f"Services: {', '.join(services)}"
)
html = (
f"<strong>Bot Status</strong><br>"
f"<strong>Uptime:</strong> {uptime_hours:.1f}h<br>"
f"<strong>Commands run:</strong> {stats['commands_executed']}<br>"
f"<strong>Errors:</strong> {stats['error_count']}<br>"
f"<strong>Top commands:</strong> {top_cmds or 'none'}<br>"
f"<strong>Services:</strong> {', '.join(services)}"
)
await send_html(client, room_id, plain, html)