diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..b46566b --- /dev/null +++ b/.env.example @@ -0,0 +1,7 @@ +MATRIX_HOMESERVER=https://matrix.lotusguild.org +MATRIX_USER_ID=@lotusbot:matrix.lotusguild.org +MATRIX_ACCESS_TOKEN= +MATRIX_DEVICE_ID= +BOT_PREFIX=! +ADMIN_USERS=@jared:matrix.lotusguild.org +LOG_LEVEL=INFO diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..226f48b --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.env +nio_store/ +logs/ +__pycache__/ +*.pyc +credentials.json diff --git a/bot.py b/bot.py new file mode 100644 index 0000000..bc08357 --- /dev/null +++ b/bot.py @@ -0,0 +1,136 @@ +import asyncio +import json +import logging +import signal +import sys +from pathlib import Path + +from nio import ( + AsyncClient, + AsyncClientConfig, + LoginResponse, + RoomMessageText, +) + +from config import ( + MATRIX_HOMESERVER, + MATRIX_USER_ID, + MATRIX_ACCESS_TOKEN, + MATRIX_DEVICE_ID, + LOG_LEVEL, + ConfigValidator, +) +from callbacks import Callbacks +from utils import setup_logging + +logger = setup_logging(LOG_LEVEL) + +CREDENTIALS_FILE = Path("credentials.json") +STORE_PATH = Path("nio_store") + + +def save_credentials(resp, homeserver): + data = { + "homeserver": homeserver, + "user_id": resp.user_id, + "device_id": resp.device_id, + "access_token": resp.access_token, + } + CREDENTIALS_FILE.write_text(json.dumps(data, indent=2)) + logger.info("Credentials saved to %s", CREDENTIALS_FILE) + + +def trust_devices(client: AsyncClient): + """Auto-trust all devices for all users we share rooms with.""" + if not client.olm: + logger.warning("Olm not loaded, skipping device trust") + return + for user_id, devices in client.device_store.items(): + for device_id, olm_device in devices.items(): + if not client.olm.is_device_verified(olm_device): + client.verify_device(olm_device) + logger.info("Trusted all known devices") + + +async def main(): + errors = ConfigValidator.validate() + if errors: + for e in errors: + logger.error(e) + sys.exit(1) + + STORE_PATH.mkdir(exist_ok=True) + + client_config = AsyncClientConfig( + store_sync_tokens=True, + encryption_enabled=True, + store_name="matrixbot", + ) + + client = AsyncClient( + MATRIX_HOMESERVER, + MATRIX_USER_ID, + device_id=MATRIX_DEVICE_ID, + config=client_config, + store_path=str(STORE_PATH), + ) + + # Restore access token (no password login needed) + client.access_token = MATRIX_ACCESS_TOKEN + client.user_id = MATRIX_USER_ID + client.device_id = MATRIX_DEVICE_ID + + # Load the olm/e2ee store if it exists + client.load_store() + + callbacks = Callbacks(client) + client.add_event_callback(callbacks.message, RoomMessageText) + + # Graceful shutdown + loop = asyncio.get_running_loop() + shutdown_event = asyncio.Event() + + def _signal_handler(): + logger.info("Shutdown signal received") + shutdown_event.set() + + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, _signal_handler) + + logger.info("Starting initial sync...") + + # Do a first sync to catch up, then mark startup complete so we only + # process new messages going forward. + sync_resp = await client.sync(timeout=30000, full_state=True) + if hasattr(sync_resp, "next_batch"): + callbacks.startup_sync_token = sync_resp.next_batch + logger.info("Initial sync complete, token: %s", sync_resp.next_batch[:20]) + else: + logger.error("Initial sync failed: %s", sync_resp) + await client.close() + sys.exit(1) + + # Trust devices after initial sync loads the device store + trust_devices(client) + + logger.info("Bot ready as %s — listening for commands", MATRIX_USER_ID) + + # Run sync_forever in a task so we can cancel on shutdown + async def _sync_loop(): + await client.sync_forever(timeout=30000, full_state=False) + + sync_task = asyncio.create_task(_sync_loop()) + + await shutdown_event.wait() + sync_task.cancel() + try: + await sync_task + except asyncio.CancelledError: + pass + + await client.close() + logger.info("Bot shut down cleanly") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/callbacks.py b/callbacks.py new file mode 100644 index 0000000..3d4577f --- /dev/null +++ b/callbacks.py @@ -0,0 +1,60 @@ +import logging +from functools import wraps + +from nio import AsyncClient, RoomMessageText + +from config import BOT_PREFIX, MATRIX_USER_ID +from commands import COMMANDS + +logger = logging.getLogger("matrixbot") + + +def handle_command_errors(func): + @wraps(func) + async def wrapper(client, room_id, sender, args): + try: + return await func(client, room_id, sender, args) + except Exception as e: + logger.error(f"Error in command {func.__name__}: {e}", exc_info=True) + try: + from utils import send_text + await send_text(client, room_id, "An unexpected error occurred. Please try again later.") + except Exception as e2: + logger.error(f"Failed to send error message: {e2}", exc_info=True) + return wrapper + + +class Callbacks: + def __init__(self, client: AsyncClient): + self.client = client + # Track the sync token so we ignore old messages on startup + self.startup_sync_token = None + + async def message(self, room, event): + # Ignore messages from before the bot started + if self.startup_sync_token is None: + return + + # Ignore our own messages + if event.sender == MATRIX_USER_ID: + return + + body = event.body.strip() if event.body else "" + if not body.startswith(BOT_PREFIX): + return + + # Parse command and args + without_prefix = body[len(BOT_PREFIX):] + parts = without_prefix.split(None, 1) + cmd_name = parts[0].lower() if parts else "" + args = parts[1] if len(parts) > 1 else "" + + logger.info(f"Command '{cmd_name}' from {event.sender} in {room.room_id}") + + handler_entry = COMMANDS.get(cmd_name) + if handler_entry is None: + return + + handler, _ = handler_entry + wrapped = handle_command_errors(handler) + await wrapped(self.client, room.room_id, event.sender, args) diff --git a/commands.py b/commands.py new file mode 100644 index 0000000..e02df7e --- /dev/null +++ b/commands.py @@ -0,0 +1,318 @@ +import random +import time +import logging + +from nio import AsyncClient + +from utils import send_text, send_html, send_reaction, sanitize_input +from config import MAX_DICE_SIDES, MAX_DICE_COUNT, BOT_PREFIX + +logger = logging.getLogger("matrixbot") + +# Registry: name -> (handler, description) +COMMANDS = {} + + +def command(name, description=""): + def decorator(func): + COMMANDS[name] = (func, description) + return func + return decorator + + +# ==================== COMMANDS ==================== + + +@command("help", "Show all available commands") +async def cmd_help(client: AsyncClient, room_id: str, sender: str, args: str): + lines_plain = ["Commands:"] + lines_html = ["

Commands

") + await send_html(client, room_id, "\n".join(lines_plain), "\n".join(lines_html)) + + +@command("ping", "Check bot latency") +async def cmd_ping(client: AsyncClient, room_id: str, sender: str, args: str): + start = time.monotonic() + resp = await send_text(client, room_id, "Pong!") + elapsed = (time.monotonic() - start) * 1000 + # Edit isn't straightforward in Matrix, so just send a follow-up if slow + if elapsed > 500: + await send_text(client, room_id, f"(round-trip: {elapsed:.0f}ms)") + + +@command("8ball", "Ask the magic 8-ball a question") +async def cmd_8ball(client: AsyncClient, room_id: str, sender: str, args: str): + if not args: + await send_text(client, room_id, f"Usage: {BOT_PREFIX}8ball ") + return + + responses = [ + "It is certain", "Without a doubt", "You may rely on it", + "Yes definitely", "It is decidedly so", "As I see it, yes", + "Most likely", "Yes sir!", "Hell yeah my dude", "100% easily", + "Reply hazy try again", "Ask again later", "Better not tell you now", + "Cannot predict now", "Concentrate and ask again", "Idk bro", + "Don't count on it", "My reply is no", "My sources say no", + "Outlook not so good", "Very doubtful", "Hell no", "Prolly not", + ] + + answer = random.choice(responses) + plain = f"Question: {args}\nAnswer: {answer}" + html = ( + f"Magic 8-Ball
" + f"Q: {args}
" + f"A: {answer}" + ) + await send_html(client, room_id, plain, html) + + +@command("fortune", "Get a fortune cookie message") +async def cmd_fortune(client: AsyncClient, room_id: str, sender: str, args: str): + fortunes = [ + "If you eat something & nobody sees you eat it, it has no calories", + "Your pet is plotting world domination", + "Error 404: Fortune not found. Try again after system reboot", + "The fortune you seek is in another cookie", + "A journey of a thousand miles begins with ordering delivery", + "You will find great fortune... in between your couch cushions", + "A true friend is someone who tells you when your stream is muted", + "Your next competitive match will be legendary", + "The cake is still a lie", + "Press Alt+F4 for instant success", + "You will not encounter any campers today", + "Your tank will have a healer", + "No one will steal your pentakill", + "Your random teammate will have a mic", + "You will find diamonds on your first dig", + "The boss will drop the rare loot", + "Your speedrun will be WR pace", + "No lag spikes in your next match", + "Your gaming chair will grant you powers", + "The RNG gods will bless you", + "You will not get third partied", + "Your squad will actually stick together", + "The enemy team will forfeit at 15", + "Your aim will be crispy today", + "You will escape the backrooms", + "The imposter will not sus you", + "Your Minecraft bed will remain unbroken", + "You will get Play of the Game", + "Your next meme will go viral", + "Someone is talking about you in their Discord server", + "Your FBI agent thinks you're hilarious", + "Your next TikTok will hit the FYP, if the government doesn't ban it first", + "Someone will actually read your Twitter thread", + "Your DMs will be blessed with quality memes today", + "Touch grass (respectfully)", + "The algorithm will be in your favor today", + "Your next Spotify shuffle will hit different", + "Someone saved your Instagram post", + "Your Reddit comment will get gold", + "POV: You're about to go viral", + "Main character energy detected", + "No cap, you're gonna have a great day fr fr", + "Your rizz levels are increasing", + "You will not get ratio'd today", + "Someone will actually use your custom emoji", + "Your next selfie will be iconic", + "Buy a dolphin - your life will have a porpoise", + "Stop procrastinating - starting tomorrow", + "Catch fire with enthusiasm - people will come for miles to watch you burn", + "Your code will compile on the first try today", + "A semicolon will save your day", + "The bug you've been hunting is just a typo", + "Your next Git commit will be perfect", + "You will find the solution on the first StackOverflow link", + "Your Docker container will build without errors", + "The cloud is just someone else's computer", + "Your backup strategy will soon prove its worth", + "A mechanical keyboard is in your future", + "You will finally understand regex... maybe", + "Your CSS will align perfectly on the first try", + "Someone will star your GitHub repo today", + "Your Linux installation will not break after updates", + "You will remember to push your changes before shutdown", + "Your code comments will actually make sense in 6 months", + "The missing curly brace is on line 247", + "Have you tried turning it off and on again?", + "Your next pull request will be merged without comments", + "Your keyboard RGB will sync perfectly today", + "You will find that memory leak", + "Your next algorithm will have O(1) complexity", + "The force quit was strong with this one", + "Ctrl+S will save you today", + "Your next Python script will need no debugging", + "Your next API call will return 200 OK", + ] + + fortune = random.choice(fortunes) + plain = f"Fortune Cookie: {fortune}" + html = f"Fortune Cookie
{fortune}" + await send_html(client, room_id, plain, html) + + +@command("flip", "Flip a coin") +async def cmd_flip(client: AsyncClient, room_id: str, sender: str, args: str): + result = random.choice(["Heads", "Tails"]) + plain = f"Coin Flip: {result}" + html = f"Coin Flip: {result}" + await send_html(client, room_id, plain, html) + + +@command("roll", "Roll dice (e.g. !roll 2d6)") +async def cmd_roll(client: AsyncClient, room_id: str, sender: str, args: str): + dice_str = args.strip() if args.strip() else "1d6" + + try: + num, sides = map(int, dice_str.lower().split("d")) + except ValueError: + await send_text(client, room_id, f"Usage: {BOT_PREFIX}roll NdS (example: 2d6)") + return + + if num < 1 or num > MAX_DICE_COUNT: + await send_text(client, room_id, f"Number of dice must be 1-{MAX_DICE_COUNT}") + return + if sides < 2 or sides > MAX_DICE_SIDES: + await send_text(client, room_id, f"Sides must be 2-{MAX_DICE_SIDES}") + return + + results = [random.randint(1, sides) for _ in range(num)] + total = sum(results) + plain = f"Dice Roll ({dice_str}): {results} = {total}" + html = ( + f"Dice Roll ({dice_str})
" + f"Rolls: {results}
" + f"Total: {total}" + ) + await send_html(client, room_id, plain, html) + + +@command("random", "Random number (e.g. !random 1 100)") +async def cmd_random(client: AsyncClient, room_id: str, sender: str, args: str): + parts = args.split() + try: + lo = int(parts[0]) if len(parts) >= 1 else 1 + hi = int(parts[1]) if len(parts) >= 2 else 100 + except ValueError: + await send_text(client, room_id, f"Usage: {BOT_PREFIX}random ") + return + + if lo > hi: + lo, hi = hi, lo + + result = random.randint(lo, hi) + plain = f"Random ({lo}-{hi}): {result}" + html = f"Random Number ({lo}\u2013{hi}): {result}" + await send_html(client, room_id, plain, html) + + +@command("rps", "Rock Paper Scissors") +async def cmd_rps(client: AsyncClient, room_id: str, sender: str, args: str): + choices = ["rock", "paper", "scissors"] + choice = args.strip().lower() + + if choice not in choices: + await send_text(client, room_id, f"Usage: {BOT_PREFIX}rps ") + return + + bot_choice = random.choice(choices) + + if choice == bot_choice: + result = "It's a tie!" + elif ( + (choice == "rock" and bot_choice == "scissors") + or (choice == "paper" and bot_choice == "rock") + or (choice == "scissors" and bot_choice == "paper") + ): + result = "You win!" + else: + result = "Bot wins!" + + plain = f"RPS: You={choice}, Bot={bot_choice} -> {result}" + html = ( + f"Rock Paper Scissors
" + f"You: {choice.capitalize()} | Bot: {bot_choice.capitalize()}
" + f"{result}" + ) + await send_html(client, room_id, plain, html) + + +@command("poll", "Create a yes/no poll") +async def cmd_poll(client: AsyncClient, room_id: str, sender: str, args: str): + if not args: + await send_text(client, room_id, f"Usage: {BOT_PREFIX}poll ") + return + + plain = f"Poll: {args}" + html = f"Poll
{args}" + resp = await send_html(client, room_id, plain, html) + + if hasattr(resp, "event_id"): + await send_reaction(client, room_id, resp.event_id, "\U0001f44d") + await send_reaction(client, room_id, resp.event_id, "\U0001f44e") + + +@command("champion", "Random LoL champion (optional: !champion top)") +async def cmd_champion(client: AsyncClient, room_id: str, sender: str, args: str): + champions = { + "Top": [ + "Aatrox", "Ambessa", "Aurora", "Camille", "Cho'Gath", "Darius", + "Dr. Mundo", "Fiora", "Gangplank", "Garen", "Gnar", "Gragas", + "Gwen", "Illaoi", "Irelia", "Jax", "Jayce", "K'Sante", "Kennen", + "Kled", "Malphite", "Mordekaiser", "Nasus", "Olaf", "Ornn", + "Poppy", "Quinn", "Renekton", "Riven", "Rumble", "Sett", "Shen", + "Singed", "Sion", "Teemo", "Trundle", "Tryndamere", "Urgot", + "Vladimir", "Volibear", "Wukong", "Yone", "Yorick", + ], + "Jungle": [ + "Amumu", "Bel'Veth", "Briar", "Diana", "Ekko", "Elise", + "Evelynn", "Fiddlesticks", "Graves", "Hecarim", "Ivern", + "Jarvan IV", "Kayn", "Kha'Zix", "Kindred", "Lee Sin", "Lillia", + "Maokai", "Master Yi", "Nidalee", "Nocturne", "Nunu", "Olaf", + "Rek'Sai", "Rengar", "Sejuani", "Shaco", "Skarner", "Taliyah", + "Udyr", "Vi", "Viego", "Warwick", "Xin Zhao", "Zac", + ], + "Mid": [ + "Ahri", "Akali", "Akshan", "Annie", "Aurelion Sol", "Azir", + "Cassiopeia", "Corki", "Ekko", "Fizz", "Galio", "Heimerdinger", + "Hwei", "Irelia", "Katarina", "LeBlanc", "Lissandra", "Lux", + "Malzahar", "Mel", "Naafiri", "Neeko", "Orianna", "Qiyana", + "Ryze", "Sylas", "Syndra", "Talon", "Twisted Fate", "Veigar", + "Vex", "Viktor", "Vladimir", "Xerath", "Yasuo", "Yone", "Zed", + "Zoe", + ], + "Bot": [ + "Aphelios", "Ashe", "Caitlyn", "Draven", "Ezreal", "Jhin", + "Jinx", "Kai'Sa", "Kalista", "Kog'Maw", "Lucian", + "Miss Fortune", "Nilah", "Samira", "Sivir", "Smolder", + "Tristana", "Twitch", "Varus", "Vayne", "Xayah", "Zeri", + ], + "Support": [ + "Alistar", "Bard", "Blitzcrank", "Brand", "Braum", "Janna", + "Karma", "Leona", "Lulu", "Lux", "Milio", "Morgana", "Nami", + "Nautilus", "Pyke", "Rakan", "Rell", "Renata Glasc", "Senna", + "Seraphine", "Sona", "Soraka", "Swain", "Taric", "Thresh", + "Yuumi", "Zilean", "Zyra", + ], + } + + lane_arg = args.strip().capitalize() if args.strip() else "" + if lane_arg and lane_arg in champions: + lane = lane_arg + else: + lane = random.choice(list(champions.keys())) + + champ = random.choice(champions[lane]) + plain = f"Champion Picker: {champ} ({lane})" + html = ( + f"League Champion Picker
" + f"Champion: {champ}
" + f"Lane: {lane}" + ) + await send_html(client, room_id, plain, html) diff --git a/config.py b/config.py new file mode 100644 index 0000000..d97e7f0 --- /dev/null +++ b/config.py @@ -0,0 +1,34 @@ +import os +import logging +from dotenv import load_dotenv + +load_dotenv() + + +# Required +MATRIX_HOMESERVER = os.getenv("MATRIX_HOMESERVER", "https://matrix.lotusguild.org") +MATRIX_USER_ID = os.getenv("MATRIX_USER_ID", "@lotusbot:matrix.lotusguild.org") +MATRIX_ACCESS_TOKEN = os.getenv("MATRIX_ACCESS_TOKEN", "") +MATRIX_DEVICE_ID = os.getenv("MATRIX_DEVICE_ID", "") + +# Bot settings +BOT_PREFIX = os.getenv("BOT_PREFIX", "!") +ADMIN_USERS = [u.strip() for u in os.getenv("ADMIN_USERS", "").split(",") if u.strip()] +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") + +# Constants +MAX_INPUT_LENGTH = 500 +MAX_DICE_SIDES = 100 +MAX_DICE_COUNT = 20 + + +class ConfigValidator: + REQUIRED = ["MATRIX_HOMESERVER", "MATRIX_USER_ID", "MATRIX_ACCESS_TOKEN", "MATRIX_DEVICE_ID"] + + @classmethod + def validate(cls): + errors = [] + for var in cls.REQUIRED: + if not os.getenv(var): + errors.append(f"Missing required: {var}") + return errors diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f1afe86 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +matrix-nio[e2e] +python-dotenv +aiohttp +markdown diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..e2fb9ef --- /dev/null +++ b/utils.py @@ -0,0 +1,81 @@ +import logging +from logging.handlers import RotatingFileHandler +from pathlib import Path + +from nio import AsyncClient, RoomSendResponse + +from config import MAX_INPUT_LENGTH + + +def setup_logging(level="INFO"): + Path("logs").mkdir(exist_ok=True) + + logger = logging.getLogger("matrixbot") + logger.setLevel(getattr(logging, level.upper(), logging.INFO)) + + file_handler = RotatingFileHandler( + "logs/matrixbot.log", + maxBytes=10 * 1024 * 1024, + backupCount=5, + ) + file_handler.setFormatter( + logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + ) + + stream_handler = logging.StreamHandler() + stream_handler.setFormatter( + logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + ) + + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + return logger + + +async def send_text(client: AsyncClient, room_id: str, text: str): + logger = logging.getLogger("matrixbot") + resp = await client.room_send( + room_id, + message_type="m.room.message", + content={"msgtype": "m.text", "body": text}, + ) + if not isinstance(resp, RoomSendResponse): + logger.error("send_text failed: %s", resp) + return resp + + +async def send_html(client: AsyncClient, room_id: str, plain: str, html: str): + logger = logging.getLogger("matrixbot") + resp = await client.room_send( + room_id, + message_type="m.room.message", + content={ + "msgtype": "m.text", + "body": plain, + "format": "org.matrix.custom.html", + "formatted_body": html, + }, + ) + if not isinstance(resp, RoomSendResponse): + logger.error("send_html failed: %s", resp) + return resp + + +async def send_reaction(client: AsyncClient, room_id: str, event_id: str, emoji: str): + return await client.room_send( + room_id, + message_type="m.reaction", + content={ + "m.relates_to": { + "rel_type": "m.annotation", + "event_id": event_id, + "key": emoji, + } + }, + ) + + +def sanitize_input(text: str, max_length: int = MAX_INPUT_LENGTH) -> str: + text = text.strip()[:max_length] + text = "".join(char for char in text if char.isprintable()) + return text