- Remove all 15 "Issue #XX" reference comments from production code - Remove dead code: CooldownManager class (unused, using discord.py cooldown), is_owner() function (unreferenced), unused imports (datetime_time, get) - Remove unused env vars: ANNOUNCEMENT_CHANNEL_ID, PELICAN_URL, HYTALE_SERVER_UUID - Fix AuditLogger.flush() dropping items when queue > 10 (now processes in batches) - Fix AuditLogger.log() using .seconds instead of .total_seconds() (broke after 1+ days) - Use unicode escapes for emoji in poll reactions for consistency Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1349 lines
56 KiB
Python
1349 lines
56 KiB
Python
import discord
|
|
import os
|
|
import random
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler
|
|
from collections import Counter
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from functools import wraps, partial
|
|
from dotenv import load_dotenv
|
|
from discord import app_commands
|
|
from discord.ext import commands, tasks
|
|
from itertools import cycle
|
|
from mcrcon import MCRcon
|
|
import aiohttp
|
|
|
|
Path("logs").mkdir(exist_ok=True)
|
|
|
|
|
|
def setup_logging():
|
|
"""Configure logging with rotation"""
|
|
logger = logging.getLogger('discord_bot')
|
|
logger.setLevel(logging.INFO)
|
|
|
|
file_handler = RotatingFileHandler(
|
|
'logs/discord.log',
|
|
maxBytes=10*1024*1024,
|
|
backupCount=5
|
|
)
|
|
file_handler.setFormatter(
|
|
logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
|
)
|
|
|
|
stream_handler = logging.StreamHandler()
|
|
stream_handler.setFormatter(
|
|
logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
|
)
|
|
|
|
logger.addHandler(file_handler)
|
|
logger.addHandler(stream_handler)
|
|
return logger
|
|
|
|
|
|
logger = setup_logging()
|
|
|
|
load_dotenv()
|
|
|
|
# Configuration
|
|
GUILD_ID = int(os.getenv('GUILD_ID', '605864889927467181'))
|
|
AUDIT_CHANNEL_ID = int(os.getenv('AUDIT_CHANNEL_ID', '1340861451392520233'))
|
|
ADMIN_ROLE_ID = int(os.getenv('ADMIN_ROLE_ID', '605867042104541194'))
|
|
MINECRAFT_ROLE_ID = int(os.getenv('MINECRAFT_ROLE_ID', '821163520942145556'))
|
|
HYTALE_ROLE_ID = int(os.getenv('HYTALE_ROLE_ID', '1460750779848589548'))
|
|
COOL_KIDS_ROLE_ID = int(os.getenv('COOL_KIDS_ROLE_ID', '788968178117902347'))
|
|
OWNER_ID = int(os.getenv('OWNER_ID', '238728085342519296'))
|
|
REACTION_MESSAGE_ID = int(os.getenv('REACTION_MESSAGE_ID', '744047519696420914'))
|
|
MINECRAFT_RCON_HOST = os.getenv('MINECRAFT_RCON_HOST', '10.10.10.67')
|
|
MINECRAFT_RCON_PASSWORD = os.getenv('MINECRAFT_RCON_PASSWORD', '')
|
|
PELICAN_API_KEY = os.getenv('PELICAN_API_KEY', '')
|
|
STATUS_UPDATE_INTERVAL = int(os.getenv('STATUS_UPDATE_INTERVAL', '300'))
|
|
OLLAMA_URL = os.getenv('OLLAMA_HOST', 'http://10.10.10.157:11434')
|
|
|
|
# Constants
|
|
MIN_USERNAME_LENGTH = 3
|
|
MAX_USERNAME_LENGTH = 16
|
|
MAX_INPUT_LENGTH = 500
|
|
MAX_DICE_SIDES = 100
|
|
MAX_DICE_COUNT = 20
|
|
COOLDOWN_MINUTES = 2
|
|
RCON_TIMEOUT = 5.0
|
|
USERNAME_CACHE_TTL_MINUTES = 60
|
|
AUDIT_BATCH_SIZE = 5
|
|
AUDIT_FLUSH_INTERVAL = 30
|
|
|
|
EMOJI_ROLE_MAP = {
|
|
"Overwatch": "(Overwatch)",
|
|
"Minecraft": "(Minecraft)",
|
|
"LeagueOfLegends": "(League Of Legends)",
|
|
"ClashRoyale": "(Clash Royale)",
|
|
"CSGO": "(CSGO)",
|
|
"CivilizationVI": "(Civilization VI)",
|
|
"Python": "Computer Nerd",
|
|
"computer": "Computer Nerd",
|
|
"Valorant": "(Valorant)",
|
|
"Ark": "(Ark Survival Evolved)",
|
|
"AmongUs": "(Among Us)",
|
|
"RainbowSixSiege": "(Rainbow Six Siege)",
|
|
"Phasmophobia": "(Phasmophobia)",
|
|
"StardewValley": "(Stardew Valley)",
|
|
"Tarkov": "(Tarkov)",
|
|
"LethalCompany": "(Lethal Company)",
|
|
"BTD": "(Balloons Tower Defense)",
|
|
"HellDivers": "(Hell Divers)",
|
|
"ABI": "(Arena Breakout Infinite)",
|
|
"UnoReverse": "(Uno)",
|
|
"Hytale": "(Hytale)"
|
|
}
|
|
|
|
|
|
class ConfigValidator:
|
|
REQUIRED = ['DISCORD_TOKEN']
|
|
OPTIONAL = {
|
|
'MINECRAFT_RCON_PASSWORD': 'Minecraft commands',
|
|
'PELICAN_API_KEY': 'Pelican integration',
|
|
'OLLAMA_HOST': '/ask command'
|
|
}
|
|
|
|
@classmethod
|
|
def validate(cls):
|
|
errors = []
|
|
warnings = []
|
|
|
|
for var in cls.REQUIRED:
|
|
if not os.getenv(var):
|
|
errors.append(f"Missing required: {var}")
|
|
|
|
for var, feature in cls.OPTIONAL.items():
|
|
if not os.getenv(var):
|
|
warnings.append(f"Missing {var} - {feature} may use defaults")
|
|
|
|
return errors, warnings
|
|
|
|
|
|
class UsernameCache:
|
|
def __init__(self, ttl_minutes: int = 60):
|
|
self.cache = {}
|
|
self.ttl = timedelta(minutes=ttl_minutes)
|
|
|
|
async def is_valid(self, username: str, session: aiohttp.ClientSession) -> bool:
|
|
if username in self.cache:
|
|
is_valid, timestamp = self.cache[username]
|
|
if datetime.now() - timestamp < self.ttl:
|
|
return is_valid
|
|
|
|
url = f'https://api.mojang.com/users/profiles/minecraft/{username}'
|
|
try:
|
|
async with session.get(url) as response:
|
|
is_valid = response.status == 200
|
|
self.cache[username] = (is_valid, datetime.now())
|
|
return is_valid
|
|
except Exception as e:
|
|
logger.error(f"Error validating username: {e}")
|
|
return False
|
|
|
|
|
|
username_cache = UsernameCache(ttl_minutes=USERNAME_CACHE_TTL_MINUTES)
|
|
|
|
|
|
class MediaManager:
|
|
def __init__(self):
|
|
self._cache = {}
|
|
|
|
def get_random(self, category: str) -> str | None:
|
|
if category not in self._cache:
|
|
patterns = [f"media/{category}.gif", f"media/{category}1.gif",
|
|
f"media/{category}2.gif", f"media/{category}3.gif"]
|
|
self._cache[category] = [p for p in patterns if Path(p).exists()]
|
|
|
|
files = self._cache[category]
|
|
return random.choice(files) if files else None
|
|
|
|
|
|
media_manager = MediaManager()
|
|
|
|
|
|
class AuditLogger:
|
|
def __init__(self, channel_id: int, batch_size: int = AUDIT_BATCH_SIZE,
|
|
flush_interval: int = AUDIT_FLUSH_INTERVAL):
|
|
self.channel_id = channel_id
|
|
self.batch_size = batch_size
|
|
self.flush_interval = flush_interval
|
|
self.queue = []
|
|
self.last_flush = datetime.now()
|
|
|
|
async def log(self, message: str, color: int = 0x980000):
|
|
self.queue.append((message, color, datetime.now()))
|
|
if len(self.queue) >= self.batch_size or \
|
|
(datetime.now() - self.last_flush).total_seconds() >= self.flush_interval:
|
|
await self.flush()
|
|
|
|
async def flush(self):
|
|
if not self.queue:
|
|
return
|
|
|
|
channel = client.get_channel(self.channel_id)
|
|
if not channel:
|
|
return
|
|
|
|
# Process in batches of 10 (Discord embed field limit)
|
|
while self.queue:
|
|
batch = self.queue[:10]
|
|
self.queue = self.queue[10:]
|
|
|
|
embed = discord.Embed(color=0x980000, timestamp=datetime.now())
|
|
for msg, _, timestamp in batch:
|
|
embed.add_field(name=timestamp.strftime("%H:%M:%S"), value=msg[:1024], inline=False)
|
|
|
|
try:
|
|
await channel.send(embed=embed)
|
|
except Exception as e:
|
|
logger.error(f"Failed to send audit log: {e}", exc_info=True)
|
|
break
|
|
|
|
self.last_flush = datetime.now()
|
|
|
|
|
|
audit_logger = AuditLogger(AUDIT_CHANNEL_ID)
|
|
|
|
|
|
class MetricsCollector:
|
|
def __init__(self):
|
|
self.command_counts = Counter()
|
|
self.error_counts = Counter()
|
|
self.start_time = datetime.now()
|
|
|
|
def record_command(self, command_name: str):
|
|
self.command_counts[command_name] += 1
|
|
|
|
def record_error(self, command_name: str):
|
|
self.error_counts[command_name] += 1
|
|
|
|
def get_stats(self) -> dict:
|
|
uptime = datetime.now() - self.start_time
|
|
return {
|
|
"uptime_seconds": uptime.total_seconds(),
|
|
"commands_executed": sum(self.command_counts.values()),
|
|
"top_commands": self.command_counts.most_common(5),
|
|
"error_count": sum(self.error_counts.values())
|
|
}
|
|
|
|
|
|
metrics = MetricsCollector()
|
|
|
|
|
|
def sanitize_input(text: str, max_length: int = MAX_INPUT_LENGTH) -> str:
|
|
text = text.strip()[:max_length]
|
|
text = ''.join(char for char in text if char.isprintable())
|
|
return text
|
|
|
|
|
|
def handle_command_errors(func):
|
|
@wraps(func)
|
|
async def wrapper(interaction: discord.Interaction, *args, **kwargs):
|
|
try:
|
|
metrics.record_command(func.__name__)
|
|
return await func(interaction, *args, **kwargs)
|
|
except Exception as e:
|
|
metrics.record_error(func.__name__)
|
|
logger.error(f"Error in {func.__name__}: {e}", exc_info=True)
|
|
error_msg = "An unexpected error occurred. Please try again later."
|
|
if not interaction.response.is_done():
|
|
await interaction.response.send_message(error_msg, ephemeral=True)
|
|
else:
|
|
await interaction.followup.send(error_msg, ephemeral=True)
|
|
return wrapper
|
|
|
|
|
|
def execute_rcon(host: str, password: str, command: str):
|
|
with MCRcon(host, password, timeout=3) as mcr:
|
|
return mcr.command(command)
|
|
|
|
|
|
async def rcon_command(host: str, password: str, command: str, timeout: float = RCON_TIMEOUT):
|
|
try:
|
|
loop = asyncio.get_event_loop()
|
|
func = partial(execute_rcon, host, password, command)
|
|
return await asyncio.wait_for(loop.run_in_executor(None, func), timeout=timeout)
|
|
except asyncio.TimeoutError:
|
|
raise Exception("RCON timeout - server may be offline")
|
|
except Exception as e:
|
|
raise Exception(f"RCON error: {str(e)}")
|
|
|
|
|
|
async def send_interaction_media(
|
|
interaction: discord.Interaction,
|
|
member: discord.Member,
|
|
action: str,
|
|
media_prefix: str
|
|
):
|
|
messages = {
|
|
'kill': f"You killed {member}",
|
|
'punch': f"You punched {member}",
|
|
'hug': f"{member} has been squeezed tightly!",
|
|
'revive': f"{member} has been brought back to life"
|
|
}
|
|
message = messages[action]
|
|
|
|
media_file = media_manager.get_random(media_prefix)
|
|
if media_file:
|
|
await interaction.response.send_message(
|
|
message, file=discord.File(media_file)
|
|
)
|
|
else:
|
|
await interaction.response.send_message(message)
|
|
|
|
|
|
whitelisted_usernames = set()
|
|
WHITELIST_FILE = Path("data/whitelisted_usernames.json")
|
|
|
|
|
|
def load_whitelisted_usernames():
|
|
global whitelisted_usernames
|
|
try:
|
|
if WHITELIST_FILE.exists():
|
|
with open(WHITELIST_FILE) as f:
|
|
whitelisted_usernames = set(json.load(f))
|
|
except Exception as e:
|
|
logger.error(f"Error loading whitelisted usernames: {e}")
|
|
|
|
|
|
def save_whitelisted_username(username: str):
|
|
whitelisted_usernames.add(username)
|
|
try:
|
|
WHITELIST_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(WHITELIST_FILE, 'w') as f:
|
|
json.dump(list(whitelisted_usernames), f)
|
|
except Exception as e:
|
|
logger.error(f"Error saving whitelisted username: {e}")
|
|
|
|
|
|
async def minecraft_username_autocomplete(
|
|
interaction: discord.Interaction,
|
|
current: str
|
|
) -> list:
|
|
return [
|
|
app_commands.Choice(name=username, value=username)
|
|
for username in sorted(whitelisted_usernames)
|
|
if current.lower() in username.lower()
|
|
][:25]
|
|
|
|
|
|
class CustomBot(commands.Bot):
|
|
def __init__(self):
|
|
intents = discord.Intents.all()
|
|
intents.message_content = True
|
|
super().__init__(
|
|
command_prefix=".",
|
|
intents=intents
|
|
)
|
|
self.status_cycle = cycle([
|
|
"The Lotus Guild is boomin",
|
|
"lotusguild.org",
|
|
"Ranked Minesweeper"
|
|
])
|
|
self.remove_command("help")
|
|
self.http_session: aiohttp.ClientSession = None
|
|
|
|
async def setup_hook(self):
|
|
self.http_session = aiohttp.ClientSession()
|
|
|
|
if not MINECRAFT_RCON_PASSWORD:
|
|
logger.warning("MINECRAFT_RCON_PASSWORD not set - /minecraft command will fail")
|
|
if not PELICAN_API_KEY:
|
|
logger.warning("PELICAN_API_KEY not set - Pelican features disabled")
|
|
if not OLLAMA_URL:
|
|
logger.warning("OLLAMA_HOST not set - /ask command will fail")
|
|
|
|
load_whitelisted_usernames()
|
|
|
|
logger.info("Syncing commands...")
|
|
await self.tree.sync()
|
|
logger.info("Commands synced!")
|
|
|
|
async def close(self):
|
|
if self.http_session:
|
|
await self.http_session.close()
|
|
await audit_logger.flush()
|
|
await super().close()
|
|
|
|
@tasks.loop(seconds=STATUS_UPDATE_INTERVAL)
|
|
async def change_status(self):
|
|
try:
|
|
await self.change_presence(activity=discord.Game(next(self.status_cycle)))
|
|
except Exception as e:
|
|
logger.error(f"Error changing status: {e}", exc_info=True)
|
|
|
|
@change_status.before_loop
|
|
async def before_change_status(self):
|
|
await self.wait_until_ready()
|
|
|
|
async def on_ready(self):
|
|
logger.info(f"Bot logged in as: {self.user.name}")
|
|
logger.info(f"Registered commands: {[cmd.name for cmd in self.tree.get_commands()]}")
|
|
self.change_status.start()
|
|
self.flush_audit_logs.start()
|
|
|
|
@tasks.loop(seconds=AUDIT_FLUSH_INTERVAL)
|
|
async def flush_audit_logs(self):
|
|
await audit_logger.flush()
|
|
|
|
@flush_audit_logs.before_loop
|
|
async def before_flush_audit_logs(self):
|
|
await self.wait_until_ready()
|
|
|
|
|
|
client = CustomBot()
|
|
|
|
|
|
def get_role_from_emoji(emoji_name: str) -> str:
|
|
return EMOJI_ROLE_MAP.get(emoji_name, emoji_name)
|
|
|
|
|
|
def has_role_check(role_id: int):
|
|
async def predicate(interaction: discord.Interaction) -> bool:
|
|
return any(role.id == role_id for role in interaction.user.roles)
|
|
return app_commands.check(predicate)
|
|
|
|
|
|
# ==================== EVENTS ====================
|
|
|
|
@client.event
|
|
async def on_message(message):
|
|
try:
|
|
if message.content.startswith(('.', '/')):
|
|
logger.info(f"Command in {message.channel}: {message.author} - {message.content[:50]}")
|
|
|
|
if message.author.id == 331862422996647938 and message.content.startswith('.'):
|
|
await message.channel.send("Whatever conversation is happening right now, Jared is right")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_message: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_member_join(member):
|
|
try:
|
|
logger.info(f"New member joined: {member.name} (ID: {member.id})")
|
|
await audit_logger.log(f"Member Joined\nUser: {member.mention}\nAccount Created: {member.created_at}")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_member_join: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_member_remove(member):
|
|
try:
|
|
logger.info(f"Member left: {member.name} (ID: {member.id})")
|
|
await audit_logger.log(f"Member Left\nUser: {member.name} ({member.id})")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_member_remove: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_message_delete(message):
|
|
try:
|
|
logger.warning(f"Message deleted in {message.channel}: {message.author}")
|
|
await audit_logger.log(f"Message Deleted\nChannel: {message.channel}\nUser: {message.author}\nContent: {message.content}")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_message_delete: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_voice_state_update(member, before, after):
|
|
try:
|
|
if before.channel != after.channel:
|
|
if after.channel:
|
|
await audit_logger.log(f"Voice Join\nUser: {member.mention}\nChannel: {after.channel.name}")
|
|
else:
|
|
await audit_logger.log(f"Voice Leave\nUser: {member.mention}\nChannel: {before.channel.name}")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_voice_state_update: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_integration_create(integration):
|
|
try:
|
|
await audit_logger.log(f"New Integration Added\nName: {integration.name}\nType: {integration.type}")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_integration_create: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_scheduled_event_create(event):
|
|
try:
|
|
await audit_logger.log(f"Event Created\nName: {event.name}\nStart Time: {event.start_time}")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_scheduled_event_create: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_scheduled_event_update(before, after):
|
|
try:
|
|
await audit_logger.log(f"Event Updated\nName: {after.name}\nChanges Made")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_scheduled_event_update: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_member_update(before, after):
|
|
try:
|
|
if before.timed_out_until != after.timed_out_until:
|
|
await audit_logger.log(f"Member Timeout\nUser: {after.mention}\nUntil: {after.timed_out_until}")
|
|
|
|
if before.premium_since != after.premium_since:
|
|
await audit_logger.log(f"Server Boost\nUser: {after.mention} boosted the server!")
|
|
|
|
if before.nick != after.nick:
|
|
await audit_logger.log(f"Nickname Change\nUser: {before.mention}\nBefore: {before.nick}\nAfter: {after.nick}")
|
|
|
|
if before.roles != after.roles:
|
|
logger.info(f"Role change for {before.name}: {before.roles} -> {after.roles}")
|
|
await audit_logger.log(
|
|
f"Role Update\nUser: {before.mention}\n"
|
|
f"Before: {', '.join([r.name for r in before.roles])}\n"
|
|
f"After: {', '.join([r.name for r in after.roles])}"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error in on_member_update: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_member_ban(guild, user):
|
|
try:
|
|
await audit_logger.log(f"Member Banned\nUser: {user.name}")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_member_ban: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_guild_role_update(before, after):
|
|
try:
|
|
if before.permissions != after.permissions:
|
|
await audit_logger.log(f"Role Permissions Updated\nRole: {after.name}")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_guild_role_update: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_guild_emoji_create(emoji):
|
|
try:
|
|
await audit_logger.log(f"New Emoji Added\nName: {emoji.name}")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_guild_emoji_create: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_guild_sticker_create(sticker):
|
|
try:
|
|
await audit_logger.log(f"New Sticker Added\nName: {sticker.name}")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_guild_sticker_create: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_guild_role_create(role):
|
|
try:
|
|
await audit_logger.log(f"Role Created\nName: {role.name}\nColor: {role.color}")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_guild_role_create: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_guild_role_delete(role):
|
|
try:
|
|
await audit_logger.log(f"Role Deleted\nName: {role.name}")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_guild_role_delete: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_message_edit(before, after):
|
|
try:
|
|
if before.content == after.content:
|
|
return
|
|
|
|
logger.info(f"Message edited in {before.channel}")
|
|
|
|
await audit_logger.log(
|
|
f"Message Edited\n"
|
|
f"Channel: {before.channel.mention}\n"
|
|
f"User: {before.author.mention}\n"
|
|
f"Before: {before.content}\n"
|
|
f"After: {after.content}"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error in on_message_edit: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_guild_channel_create(channel):
|
|
try:
|
|
logger.info(f"Channel created: {channel.name}")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_guild_channel_create: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_guild_channel_delete(channel):
|
|
try:
|
|
logger.info(f"Channel deleted: {channel.name}")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_guild_channel_delete: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_raw_reaction_add(payload):
|
|
try:
|
|
if payload.message_id != REACTION_MESSAGE_ID:
|
|
return
|
|
|
|
guild = client.get_guild(payload.guild_id)
|
|
if not guild:
|
|
return
|
|
|
|
member = await guild.fetch_member(payload.user_id)
|
|
if not member:
|
|
logger.warning(f"Member not found: {payload.user_id}")
|
|
return
|
|
|
|
role_name = get_role_from_emoji(payload.emoji.name)
|
|
role = discord.utils.get(guild.roles, name=role_name)
|
|
|
|
if role:
|
|
await member.add_roles(role)
|
|
try:
|
|
member_role = discord.utils.get(guild.roles, name='Member')
|
|
if member_role:
|
|
await member.add_roles(member_role)
|
|
except Exception as e:
|
|
logger.error(f"Failed to add Member role: {e}")
|
|
logger.info(f"Role {role_name} added to {member.name}")
|
|
else:
|
|
logger.warning(f"Role not found: {role_name}")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_raw_reaction_add: {e}", exc_info=True)
|
|
|
|
|
|
@client.event
|
|
async def on_raw_reaction_remove(payload):
|
|
try:
|
|
if payload.message_id != REACTION_MESSAGE_ID:
|
|
return
|
|
|
|
guild = client.get_guild(payload.guild_id)
|
|
if not guild:
|
|
return
|
|
|
|
member = await guild.fetch_member(payload.user_id)
|
|
if not member:
|
|
logger.warning(f"Member not found: {payload.user_id}")
|
|
return
|
|
|
|
role_name = get_role_from_emoji(payload.emoji.name)
|
|
role = discord.utils.get(guild.roles, name=role_name)
|
|
|
|
if role:
|
|
await member.remove_roles(role)
|
|
logger.info(f"Role {role_name} removed from {member.name}")
|
|
else:
|
|
logger.warning(f"Role not found: {role_name}")
|
|
except Exception as e:
|
|
logger.error(f"Error in on_raw_reaction_remove: {e}", exc_info=True)
|
|
|
|
|
|
# ==================== SLASH COMMANDS ====================
|
|
|
|
@client.tree.command(name="help", description="Shows all available commands")
|
|
@handle_command_errors
|
|
async def help_command(interaction: discord.Interaction):
|
|
embed = discord.Embed(
|
|
color=discord.Color.from_rgb(152, 0, 0),
|
|
title="Command List",
|
|
description="Describes what commands do and how to use them."
|
|
)
|
|
|
|
embed.set_author(name="Help Page (Lotus Bot)", icon_url="https://lotusguild.org/Lotus.png")
|
|
embed.set_image(url="https://lotusguild.org/favicon.ico")
|
|
embed.set_thumbnail(url="https://lotusguild.org/favicon.ico")
|
|
|
|
embed.add_field(name="/help", value="Shows this help message with all available commands", inline=False)
|
|
embed.add_field(name="/ping", value="Shows the bot's response time in milliseconds", inline=False)
|
|
|
|
embed.add_field(name="/8ball", value="Ask the magic 8-ball a question and receive an answer", inline=False)
|
|
embed.add_field(name="/fortune", value="Get your fortune cookie message", inline=False)
|
|
embed.add_field(name="/flip", value="Flip a coin and get heads or tails", inline=False)
|
|
embed.add_field(name="/roll", value="Roll dice (format: NdS, example: 2d6)", inline=False)
|
|
embed.add_field(name="/random", value="Generate a random number between specified range", inline=False)
|
|
embed.add_field(name="/rps", value="Play Rock Paper Scissors against the bot", inline=False)
|
|
embed.add_field(name="/poll", value="Create a simple yes/no poll", inline=False)
|
|
embed.add_field(name="/trivia", value="Play a trivia game with random questions", inline=False)
|
|
|
|
embed.add_field(name="/agent", value="Get a random Valorant agent with their role", inline=False)
|
|
embed.add_field(name="/champion", value="Get a random League of Legends champion with their lane", inline=False)
|
|
embed.add_field(name="/minecraft", value="Whitelists a player on the Minecraft server and shows server info", inline=False)
|
|
embed.add_field(name="/hytale", value="Request whitelist on the Hytale server", inline=False)
|
|
embed.add_field(name="/problem", value="Express your opinion about Canada geese", inline=False)
|
|
|
|
embed.add_field(name="/kill", value="Kill another user with a random animation", inline=False)
|
|
embed.add_field(name="/punch", value="Punch another user with a random animation", inline=False)
|
|
embed.add_field(name="/hug", value="Hug another user with a random animation", inline=False)
|
|
embed.add_field(name="/revive", value="Revive another user with a random animation", inline=False)
|
|
|
|
if discord.utils.get(interaction.user.roles, id=ADMIN_ROLE_ID):
|
|
embed.add_field(name="Admin Commands", value="---------------", inline=False)
|
|
embed.add_field(name="/clear", value="Clears specified number of messages from the channel", inline=False)
|
|
embed.add_field(name="/lockdown", value="Lock a channel to prevent messages", inline=False)
|
|
embed.add_field(name="/unlock", value="Unlock a previously locked channel", inline=False)
|
|
embed.add_field(name="/health", value="Check bot system health and service status", inline=False)
|
|
|
|
embed.set_footer(text="Made by https://lotusguild.org")
|
|
await interaction.response.send_message(embed=embed)
|
|
|
|
|
|
@client.tree.command(name="ping", description="Check the bot's latency")
|
|
@handle_command_errors
|
|
async def ping(interaction: discord.Interaction):
|
|
await interaction.response.send_message(f"Pong! {round(client.latency * 1000)}ms")
|
|
|
|
|
|
@client.tree.command(name="problem", description="Express your opinion about Canada geese")
|
|
@handle_command_errors
|
|
async def problem(interaction: discord.Interaction):
|
|
media_file = media_manager.get_random("canadagoose")
|
|
msg = "If you got a problem with canada gooses then you got a problem with me and I suggest you let that one marinate."
|
|
if media_file:
|
|
await interaction.response.send_message(msg, file=discord.File(media_file))
|
|
else:
|
|
await interaction.response.send_message(msg)
|
|
|
|
|
|
@client.tree.command(name="clear", description="Clear messages from the channel")
|
|
@app_commands.describe(amount="Number of messages to delete")
|
|
@has_role_check(ADMIN_ROLE_ID)
|
|
@handle_command_errors
|
|
async def clear(interaction: discord.Interaction, amount: int = 5):
|
|
await interaction.response.defer(ephemeral=True)
|
|
deleted = await interaction.channel.purge(limit=amount)
|
|
await interaction.followup.send(f"Successfully cleared {len(deleted)} messages!", ephemeral=True)
|
|
|
|
|
|
@client.tree.command(name="lockdown", description="Lock a channel")
|
|
@has_role_check(ADMIN_ROLE_ID)
|
|
@handle_command_errors
|
|
async def lockdown(interaction: discord.Interaction, channel: discord.TextChannel = None):
|
|
channel = channel or interaction.channel
|
|
await channel.set_permissions(interaction.guild.default_role, send_messages=False)
|
|
await audit_logger.log(f"Channel Locked\nChannel: {channel.name}")
|
|
await interaction.response.send_message(f"Locked {channel.mention}")
|
|
|
|
|
|
@client.tree.command(name="unlock", description="Unlock a channel")
|
|
@has_role_check(ADMIN_ROLE_ID)
|
|
@handle_command_errors
|
|
async def unlock(interaction: discord.Interaction, channel: discord.TextChannel = None):
|
|
channel = channel or interaction.channel
|
|
await channel.set_permissions(interaction.guild.default_role, send_messages=True)
|
|
await audit_logger.log(f"Channel Unlocked\nChannel: {channel.name}")
|
|
await interaction.response.send_message(f"Unlocked {channel.mention}")
|
|
|
|
|
|
@client.tree.command(name="minecraft", description="Whitelist a player on the Minecraft server")
|
|
@app_commands.describe(minecraft_username="The Minecraft username to whitelist")
|
|
@app_commands.autocomplete(minecraft_username=minecraft_username_autocomplete)
|
|
@has_role_check(MINECRAFT_ROLE_ID)
|
|
@handle_command_errors
|
|
async def minecraft(interaction: discord.Interaction, minecraft_username: str):
|
|
if not await username_cache.is_valid(minecraft_username, client.http_session):
|
|
await interaction.response.send_message("Invalid MC Username", ephemeral=True)
|
|
return
|
|
|
|
if not MINECRAFT_RCON_PASSWORD:
|
|
logger.error("MINECRAFT_RCON_PASSWORD not configured")
|
|
await interaction.response.send_message(
|
|
"Server configuration error. Please contact an administrator.",
|
|
ephemeral=True
|
|
)
|
|
return
|
|
|
|
await interaction.response.defer()
|
|
|
|
try:
|
|
response = await rcon_command(MINECRAFT_RCON_HOST, MINECRAFT_RCON_PASSWORD,
|
|
f"whitelist add {minecraft_username}")
|
|
logger.info(f"RCON response: {response}")
|
|
except Exception as e:
|
|
logger.error(f"RCON error: {e}", exc_info=True)
|
|
await interaction.followup.send(
|
|
"An error occurred while whitelisting the player (jared will fix just let him know)."
|
|
)
|
|
return
|
|
|
|
save_whitelisted_username(minecraft_username)
|
|
|
|
minecraft_embed = discord.Embed(
|
|
color=discord.Color.from_rgb(152, 0, 0),
|
|
title="Minecraft"
|
|
)
|
|
minecraft_embed.set_author(
|
|
name="(Lotus Bot)",
|
|
icon_url="https://photos.lotusguild.org/api/assets/3c4eb2da-0d06-407f-bdb7-c9e4cf795f0a/thumbnail?key=4aoZxX5-FHE3m_Ywwz1uGo3iNW53kmFztxfUw91PdOgphPNxayLFicNuxPvit1OYTpY&size=preview&c=jUqDBQAWF9iId3J%2FyAeIcIAICEd4d3BzSA%3D%3D"
|
|
)
|
|
minecraft_embed.set_image(
|
|
url="https://photos.lotusguild.org/api/assets/3c4eb2da-0d06-407f-bdb7-c9e4cf795f0a/thumbnail?key=4aoZxX5-FHE3m_Ywwz1uGo3iNW53kmFztxfUw91PdOgphPNxayLFicNuxPvit1OYTpY&size=preview&c=jUqDBQAWF9iId3J%2FyAeIcIAICEd4d3BzSA%3D%3D"
|
|
)
|
|
minecraft_embed.set_thumbnail(
|
|
url="https://photos.lotusguild.org/api/assets/3c4eb2da-0d06-407f-bdb7-c9e4cf795f0a/thumbnail?key=4aoZxX5-FHE3m_Ywwz1uGo3iNW53kmFztxfUw91PdOgphPNxayLFicNuxPvit1OYTpY&size=preview&c=jUqDBQAWF9iId3J%2FyAeIcIAICEd4d3BzSA%3D%3D"
|
|
)
|
|
minecraft_embed.add_field(name="You", value="have been whitelisted on the SMP", inline=False)
|
|
minecraft_embed.add_field(name="Server Address", value="minecraft.lotusguild.org", inline=False)
|
|
minecraft_embed.add_field(name="Version", value="1.21.11", inline=False)
|
|
minecraft_embed.set_footer(text="Thanks for using Lotus Minecraft Server!")
|
|
await interaction.followup.send(embed=minecraft_embed)
|
|
|
|
|
|
@client.tree.command(name="hytale", description="Request whitelist on the Hytale server")
|
|
@app_commands.describe(hytale_username="Your Hytale username")
|
|
@has_role_check(HYTALE_ROLE_ID)
|
|
@handle_command_errors
|
|
async def hytale(interaction: discord.Interaction, hytale_username: str):
|
|
await interaction.response.defer()
|
|
|
|
if not hytale_username.replace('_', '').replace('-', '').isalnum():
|
|
await interaction.followup.send("Invalid username. Use only letters, numbers, _ and -", ephemeral=True)
|
|
return
|
|
if not (MIN_USERNAME_LENGTH <= len(hytale_username) <= MAX_USERNAME_LENGTH):
|
|
await interaction.followup.send(
|
|
f"Username must be {MIN_USERNAME_LENGTH}-{MAX_USERNAME_LENGTH} characters",
|
|
ephemeral=True
|
|
)
|
|
return
|
|
|
|
await audit_logger.log(
|
|
f"Hytale Whitelist Request\nUser: {interaction.user.mention}\nUsername: `{hytale_username}`\nAction: Run `/whitelist add {hytale_username}`",
|
|
color=0x00ff00
|
|
)
|
|
|
|
embed = discord.Embed(color=discord.Color.from_rgb(152, 0, 0), title="Hytale")
|
|
embed.set_author(name="(Lotus Bot)", icon_url="https://photos.lotusguild.org/api/assets/3c4eb2da-0d06-407f-bdb7-c9e4cf795f0a/thumbnail?key=wd3-Z4zFdrR6WBUfXLnBN8RgeT9tivgQwT6iDN3T0AaBIOfyIuYrbEszABB8OvUplFM&size=preview&c=jUqDBQAWF9iId3J%2FyAeIcIAICEd4d3BzSA%3D%3D")
|
|
embed.add_field(name="Request Submitted", value=f"Whitelist request for `{hytale_username}` sent!", inline=False)
|
|
embed.add_field(name="Server Address", value="hytale.lotusguild.org", inline=False)
|
|
embed.add_field(name="Status", value="An admin will whitelist you shortly!", inline=False)
|
|
embed.set_footer(text="Welcome to Lotus Hytale Server!")
|
|
await interaction.followup.send(embed=embed)
|
|
logger.info(f"Hytale whitelist request: {hytale_username} by {interaction.user.name}")
|
|
|
|
|
|
@client.tree.command(name="ask", description="Ask a question to Lotus LLM")
|
|
@app_commands.describe(question="Your question for the AI")
|
|
@app_commands.checks.cooldown(1, COOLDOWN_MINUTES * 60, key=lambda i: i.user.id)
|
|
@has_role_check(COOL_KIDS_ROLE_ID)
|
|
@handle_command_errors
|
|
async def ask(interaction: discord.Interaction, question: str):
|
|
question = sanitize_input(question)
|
|
if not question:
|
|
await interaction.response.send_message("Please provide a valid question.", ephemeral=True)
|
|
return
|
|
|
|
await interaction.response.defer()
|
|
|
|
model = "lotusllmben" if interaction.user.id == 460640040096104459 else "lotusllm"
|
|
logger.info(f"Sending question to Ollama: {question}")
|
|
|
|
async with client.http_session.post(
|
|
f"{OLLAMA_URL}/api/generate",
|
|
json={
|
|
"model": model,
|
|
"prompt": question,
|
|
"stream": True
|
|
}
|
|
) as response:
|
|
full_response = ""
|
|
async for line in response.content:
|
|
try:
|
|
chunk = json.loads(line)
|
|
if "response" in chunk:
|
|
full_response += chunk["response"]
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"Failed to parse JSON: {e}")
|
|
|
|
embed = discord.Embed(
|
|
title="Lotus LLM",
|
|
description=full_response if full_response else "No response received from server",
|
|
color=discord.Color.from_rgb(152, 0, 0)
|
|
)
|
|
embed.add_field(name="Question", value=question, inline=False)
|
|
embed.set_footer(text=f"Asked by {interaction.user.display_name}")
|
|
await interaction.followup.send(embed=embed)
|
|
|
|
|
|
@client.tree.command(name="8ball", description="Ask the magic 8-ball a question")
|
|
@app_commands.describe(question="What would you like to ask the 8ball?")
|
|
@handle_command_errors
|
|
async def eight_ball(interaction: discord.Interaction, question: str):
|
|
possible_responses = [
|
|
"It is certain", "Without a doubt", "You may rely on it",
|
|
"Yes definitely", "It is decidedly so", "As I see it, yes",
|
|
"Most likely", "Yes sir!", "Hell yeah my dude", "100% easily",
|
|
"Reply hazy try again", "Ask again later", "Better not tell you now",
|
|
"Cannot predict now", "Concentrate and ask again", "Idk bro",
|
|
"Don't count on it", "My reply is no", "My sources say no",
|
|
"Outlook not so good", "Very doubtful", "Hell no", "Prolly not"
|
|
]
|
|
|
|
embed = discord.Embed(
|
|
color=discord.Color.from_rgb(152, 0, 0),
|
|
title="Magic 8-Ball"
|
|
)
|
|
embed.add_field(name="Question", value=question, inline=False)
|
|
embed.add_field(name="Answer", value=random.choice(possible_responses), inline=False)
|
|
embed.set_footer(text=f"Asked by {interaction.user.display_name}")
|
|
|
|
await interaction.response.send_message(embed=embed)
|
|
|
|
|
|
@client.tree.command(name="fortune", description="Get your fortune cookie message")
|
|
@handle_command_errors
|
|
async def fortune(interaction: discord.Interaction):
|
|
possible_responses = [
|
|
"If you eat something & nobody sees you eat it, it has no calories",
|
|
"Your pet is plotting world domination",
|
|
"Error 404: Fortune not found. Try again after system reboot",
|
|
"The fortune you seek is in another cookie",
|
|
"A journey of a thousand miles begins with ordering delivery",
|
|
"You will find great fortune... in between your couch cushions",
|
|
"A true friend is someone who tells you when your stream is muted",
|
|
"Your next competitive match will be legendary",
|
|
"The cake is still a lie",
|
|
"Press Alt+F4 for instant success",
|
|
"You will not encounter any campers today",
|
|
"Your tank will have a healer",
|
|
"No one will steal your pentakill",
|
|
"Your random teammate will have a mic",
|
|
"You will find diamonds on your first dig",
|
|
"The boss will drop the rare loot",
|
|
"Your speedrun will be WR pace",
|
|
"No lag spikes in your next match",
|
|
"Your gaming chair will grant you powers",
|
|
"The RNG gods will bless you",
|
|
"You will not get third partied",
|
|
"Your squad will actually stick together",
|
|
"The enemy team will forfeit at 15",
|
|
"Your aim will be crispy today",
|
|
"You will escape the backrooms",
|
|
"The imposter will not sus you",
|
|
"Your Minecraft bed will remain unbroken",
|
|
"You will get Play of the Game",
|
|
"Your next meme will go viral",
|
|
"Someone is talking about you in their Discord server",
|
|
"Your FBI agent thinks you're hilarious",
|
|
"Your next TikTok will hit the FYP, if the government doesn't ban it first",
|
|
"Someone will actually read your Twitter thread",
|
|
"Your DMs will be blessed with quality memes today",
|
|
"Touch grass (respectfully)",
|
|
"The algorithm will be in your favor today",
|
|
"Your next Spotify shuffle will hit different",
|
|
"Someone saved your Instagram post",
|
|
"Your Reddit comment will get gold",
|
|
"POV: You're about to go viral",
|
|
"Main character energy detected",
|
|
"No cap, you're gonna have a great day fr fr",
|
|
"Your rizz levels are increasing",
|
|
"You will not get ratio'd today",
|
|
"Someone will actually use your custom emoji",
|
|
"Your next selfie will be iconic",
|
|
"Buy a dolphin - your life will have a porpoise",
|
|
"Stop procrastinating - starting tomorrow",
|
|
"Catch fire with enthusiasm - people will come for miles to watch you burn",
|
|
"Your code will compile on the first try today",
|
|
"A semicolon will save your day",
|
|
"The bug you've been hunting is just a typo",
|
|
"Your next Git commit will be perfect",
|
|
"You will find the solution on the first StackOverflow link",
|
|
"Your Docker container will build without errors",
|
|
"The cloud is just someone else's computer",
|
|
"Your backup strategy will soon prove its worth",
|
|
"A mechanical keyboard is in your future",
|
|
"You will finally understand regex... maybe",
|
|
"Your CSS will align perfectly on the first try",
|
|
"Someone will star your GitHub repo today",
|
|
"Your Linux installation will not break after updates",
|
|
"You will remember to push your changes before shutdown",
|
|
"Your code comments will actually make sense in 6 months",
|
|
"The missing curly brace is on line 247",
|
|
"Have you tried turning it off and on again?",
|
|
"Your next pull request will be merged without comments",
|
|
"Your keyboard RGB will sync perfectly today",
|
|
"You will find that memory leak",
|
|
"Your next algorithm will have O(1) complexity",
|
|
"The force quit was strong with this one",
|
|
"Ctrl+S will save you today",
|
|
"Your next Python script will need no debugging",
|
|
"Your next API call will return 200 OK"
|
|
]
|
|
|
|
embed = discord.Embed(
|
|
color=discord.Color.from_rgb(152, 0, 0),
|
|
title="Fortune Cookie"
|
|
)
|
|
embed.add_field(name="Your Fortune", value=random.choice(possible_responses), inline=False)
|
|
embed.set_footer(text=f"Cracked open by {interaction.user.display_name}")
|
|
|
|
await interaction.response.send_message(embed=embed)
|
|
|
|
|
|
@client.tree.command(name="flip", description="Flip a coin")
|
|
@handle_command_errors
|
|
async def flip(interaction: discord.Interaction):
|
|
result = random.choice(["Heads", "Tails"])
|
|
embed = discord.Embed(title="Coin Flip", color=discord.Color.from_rgb(152, 0, 0))
|
|
embed.add_field(name="Result", value=result)
|
|
await interaction.response.send_message(embed=embed)
|
|
|
|
|
|
@client.tree.command(name="roll", description="Roll dice (e.g. 2d6)")
|
|
@app_commands.describe(dice="Format: NdS (N=number of dice, S=sides)")
|
|
@handle_command_errors
|
|
async def roll(interaction: discord.Interaction, dice: str = "1d6"):
|
|
try:
|
|
num, sides = map(int, dice.lower().split('d'))
|
|
except ValueError:
|
|
await interaction.response.send_message("Please use format: NdS (example: 2d6)", ephemeral=True)
|
|
return
|
|
|
|
if num < 1 or num > MAX_DICE_COUNT:
|
|
await interaction.response.send_message(f"Number of dice must be 1-{MAX_DICE_COUNT}", ephemeral=True)
|
|
return
|
|
if sides < 2 or sides > MAX_DICE_SIDES:
|
|
await interaction.response.send_message(f"Sides must be 2-{MAX_DICE_SIDES}", ephemeral=True)
|
|
return
|
|
|
|
results = [random.randint(1, sides) for _ in range(num)]
|
|
embed = discord.Embed(title="Dice Roll", color=discord.Color.from_rgb(152, 0, 0))
|
|
embed.add_field(name="Results", value=f"Rolls: {results}\nTotal: {sum(results)}")
|
|
await interaction.response.send_message(embed=embed)
|
|
|
|
|
|
@client.tree.command(name="random", description="Generate a random number")
|
|
@app_commands.describe(min="Minimum number", max="Maximum number")
|
|
@handle_command_errors
|
|
async def random_number(interaction: discord.Interaction, min: int = 1, max: int = 100):
|
|
result = random.randint(min, max)
|
|
embed = discord.Embed(title="Random Number", color=discord.Color.from_rgb(152, 0, 0))
|
|
embed.add_field(name="Result", value=str(result))
|
|
await interaction.response.send_message(embed=embed)
|
|
|
|
|
|
@client.tree.command(name="rps", description="Play Rock Paper Scissors")
|
|
@app_commands.describe(choice="Your choice: rock, paper, or scissors")
|
|
@handle_command_errors
|
|
async def rps(interaction: discord.Interaction, choice: str):
|
|
choices = ["rock", "paper", "scissors"]
|
|
bot_choice = random.choice(choices)
|
|
choice = choice.lower()
|
|
|
|
if choice not in choices:
|
|
await interaction.response.send_message("Please choose rock, paper, or scissors!", ephemeral=True)
|
|
return
|
|
|
|
embed = discord.Embed(title="Rock Paper Scissors", color=discord.Color.from_rgb(152, 0, 0))
|
|
embed.add_field(name="Your Choice", value=choice.capitalize())
|
|
embed.add_field(name="Bot's Choice", value=bot_choice.capitalize())
|
|
|
|
if choice == bot_choice:
|
|
result = "It's a tie!"
|
|
elif (choice == "rock" and bot_choice == "scissors") or \
|
|
(choice == "paper" and bot_choice == "rock") or \
|
|
(choice == "scissors" and bot_choice == "paper"):
|
|
result = "You win!"
|
|
else:
|
|
result = "Bot wins!"
|
|
|
|
embed.add_field(name="Result", value=result, inline=False)
|
|
await interaction.response.send_message(embed=embed)
|
|
|
|
|
|
@client.tree.command(name="poll", description="Create a simple yes/no poll")
|
|
@app_commands.describe(question="The poll question")
|
|
@handle_command_errors
|
|
async def poll(interaction: discord.Interaction, question: str):
|
|
embed = discord.Embed(title="Poll", description=question, color=discord.Color.from_rgb(152, 0, 0))
|
|
embed.set_footer(text=f"Created by {interaction.user.display_name}")
|
|
await interaction.response.send_message(embed=embed)
|
|
message = await interaction.original_response()
|
|
await message.add_reaction("\U0001f44d")
|
|
await message.add_reaction("\U0001f44e")
|
|
|
|
|
|
@client.tree.command(name="kill", description="Kill another user")
|
|
@app_commands.describe(member="The user to kill")
|
|
@handle_command_errors
|
|
async def kill(interaction: discord.Interaction, member: discord.Member):
|
|
await send_interaction_media(interaction, member, 'kill', 'kill')
|
|
|
|
|
|
@client.tree.command(name="punch", description="Punch another user")
|
|
@app_commands.describe(member="The user to punch")
|
|
@handle_command_errors
|
|
async def punch(interaction: discord.Interaction, member: discord.Member):
|
|
await send_interaction_media(interaction, member, 'punch', 'punch')
|
|
|
|
|
|
@client.tree.command(name="hug", description="Hug another user")
|
|
@app_commands.describe(member="The user to hug")
|
|
@handle_command_errors
|
|
async def hug(interaction: discord.Interaction, member: discord.Member):
|
|
await send_interaction_media(interaction, member, 'hug', 'hug')
|
|
|
|
|
|
@client.tree.command(name="revive", description="Revive another user")
|
|
@app_commands.describe(member="The user to revive")
|
|
@handle_command_errors
|
|
async def revive(interaction: discord.Interaction, member: discord.Member):
|
|
await send_interaction_media(interaction, member, 'revive', 'revive')
|
|
|
|
|
|
@client.tree.command(name="agent", description="Get a random Valorant agent")
|
|
@handle_command_errors
|
|
async def agent(interaction: discord.Interaction):
|
|
agents = {
|
|
"Duelists": ["Jett", "Phoenix", "Raze", "Reyna", "Yoru", "Neon", "Iso", "Waylay"],
|
|
"Controllers": ["Brimstone", "Viper", "Omen", "Astra", "Harbor", "Clove"],
|
|
"Initiators": ["Sova", "Breach", "Skye", "KAY/O", "Fade", "Gekko", "Tejo"],
|
|
"Sentinels": ["Killjoy", "Cypher", "Sage", "Chamber", "Deadlock", "Vyse", "Veto"]
|
|
}
|
|
|
|
category = random.choice(list(agents.keys()))
|
|
selected_agent = random.choice(agents[category])
|
|
|
|
embed = discord.Embed(
|
|
title="Valorant Agent Picker",
|
|
color=discord.Color.from_rgb(152, 0, 0)
|
|
)
|
|
embed.add_field(name="Selected Agent", value=selected_agent, inline=False)
|
|
embed.add_field(name="Role", value=category, inline=False)
|
|
embed.set_footer(text=f"Selected for {interaction.user.display_name}")
|
|
|
|
await interaction.response.send_message(embed=embed)
|
|
|
|
|
|
@client.tree.command(name="champion", description="Get a random League of Legends champion")
|
|
@handle_command_errors
|
|
async def champion(interaction: discord.Interaction):
|
|
champions = {
|
|
"Top": [
|
|
"Aatrox", "Ambessa", "Aurora", "Camille", "Cho'Gath", "Darius",
|
|
"Dr. Mundo", "Fiora", "Gangplank", "Garen", "Gnar", "Gragas",
|
|
"Gwen", "Illaoi", "Irelia", "Jax", "Jayce", "K'Sante", "Kennen",
|
|
"Kled", "Malphite", "Mordekaiser", "Nasus", "Olaf", "Ornn",
|
|
"Poppy", "Quinn", "Renekton", "Riven", "Rumble", "Sett", "Shen",
|
|
"Singed", "Sion", "Teemo", "Trundle", "Tryndamere", "Urgot",
|
|
"Vladimir", "Volibear", "Wukong", "Yone", "Yorick"
|
|
],
|
|
"Jungle": [
|
|
"Amumu", "Bel'Veth", "Briar", "Diana", "Ekko", "Elise",
|
|
"Evelynn", "Fiddlesticks", "Graves", "Hecarim", "Ivern",
|
|
"Jarvan IV", "Kayn", "Kha'Zix", "Kindred", "Lee Sin", "Lillia",
|
|
"Maokai", "Master Yi", "Nidalee", "Nocturne", "Nunu", "Olaf",
|
|
"Rek'Sai", "Rengar", "Sejuani", "Shaco", "Skarner", "Taliyah",
|
|
"Udyr", "Vi", "Viego", "Warwick", "Xin Zhao", "Zac"
|
|
],
|
|
"Mid": [
|
|
"Ahri", "Akali", "Akshan", "Annie", "Aurelion Sol", "Azir",
|
|
"Cassiopeia", "Corki", "Ekko", "Fizz", "Galio", "Heimerdinger",
|
|
"Hwei", "Irelia", "Katarina", "LeBlanc", "Lissandra", "Lux",
|
|
"Malzahar", "Mel", "Naafiri", "Neeko", "Orianna", "Qiyana",
|
|
"Ryze", "Sylas", "Syndra", "Talon", "Twisted Fate", "Veigar",
|
|
"Vex", "Viktor", "Vladimir", "Xerath", "Yasuo", "Yone", "Zed",
|
|
"Zoe"
|
|
],
|
|
"Bot": [
|
|
"Aphelios", "Ashe", "Caitlyn", "Draven", "Ezreal", "Jhin",
|
|
"Jinx", "Kai'Sa", "Kalista", "Kog'Maw", "Lucian",
|
|
"Miss Fortune", "Nilah", "Samira", "Sivir", "Smolder",
|
|
"Tristana", "Twitch", "Varus", "Vayne", "Xayah", "Zeri"
|
|
],
|
|
"Support": [
|
|
"Alistar", "Bard", "Blitzcrank", "Brand", "Braum", "Janna",
|
|
"Karma", "Leona", "Lulu", "Lux", "Milio", "Morgana", "Nami",
|
|
"Nautilus", "Pyke", "Rakan", "Rell", "Renata Glasc", "Senna",
|
|
"Seraphine", "Sona", "Soraka", "Swain", "Taric", "Thresh",
|
|
"Yuumi", "Zilean", "Zyra"
|
|
]
|
|
}
|
|
|
|
lane = random.choice(list(champions.keys()))
|
|
selected_champion = random.choice(champions[lane])
|
|
|
|
embed = discord.Embed(
|
|
title="League Champion Picker",
|
|
color=discord.Color.from_rgb(152, 0, 0)
|
|
)
|
|
embed.add_field(name="Selected Champion", value=selected_champion, inline=False)
|
|
embed.add_field(name="Lane", value=lane, inline=False)
|
|
embed.set_footer(text=f"Selected for {interaction.user.display_name}")
|
|
|
|
await interaction.response.send_message(embed=embed)
|
|
|
|
|
|
@client.tree.command(name="trivia", description="Play a trivia game")
|
|
@handle_command_errors
|
|
async def trivia(interaction: discord.Interaction):
|
|
questions = [
|
|
{"q": "What year was the original Super Mario Bros. released?", "options": ["1983", "1985", "1987", "1990"], "answer": 1},
|
|
{"q": "Which game features the quote 'The cake is a lie'?", "options": ["Half-Life 2", "Portal", "BioShock", "Minecraft"], "answer": 1},
|
|
{"q": "What is the max level in League of Legends?", "options": ["16", "18", "20", "25"], "answer": 1},
|
|
{"q": "Which Valorant agent has the codename 'Deadeye'?", "options": ["Jett", "Sova", "Chamber", "Cypher"], "answer": 2},
|
|
{"q": "How many Ender Dragon eggs can exist in a vanilla Minecraft world?", "options": ["1", "2", "Unlimited", "0"], "answer": 0},
|
|
{"q": "What was the first battle royale game to hit mainstream popularity?", "options": ["Fortnite", "PUBG", "H1Z1", "Apex Legends"], "answer": 2},
|
|
{"q": "In Minecraft, what is the rarest ore?", "options": ["Diamond", "Emerald", "Ancient Debris", "Lapis Lazuli"], "answer": 1},
|
|
{"q": "What is the name of the main character in The Legend of Zelda?", "options": ["Zelda", "Link", "Ganondorf", "Epona"], "answer": 1},
|
|
{"q": "Which game has the most registered players of all time?", "options": ["Fortnite", "Minecraft", "League of Legends", "Roblox"], "answer": 1},
|
|
{"q": "What type of animal is Sonic?", "options": ["Fox", "Hedgehog", "Rabbit", "Echidna"], "answer": 1},
|
|
{"q": "In Among Us, what is the maximum number of impostors?", "options": ["1", "2", "3", "4"], "answer": 2},
|
|
{"q": "What does GG stand for in gaming?", "options": ["Get Good", "Good Game", "Go Go", "Great Going"], "answer": 1},
|
|
{"q": "Which company developed Valorant?", "options": ["Blizzard", "Valve", "Riot Games", "Epic Games"], "answer": 2},
|
|
{"q": "What is the highest rank in Valorant?", "options": ["Immortal", "Diamond", "Radiant", "Challenger"], "answer": 2},
|
|
{"q": "In League of Legends, what is Baron Nashor an anagram of?", "options": ["Baron Roshan", "Roshan", "Nashor Baron", "Nash Robot"], "answer": 1},
|
|
{"q": "What does HTTP stand for?", "options": ["HyperText Transfer Protocol", "High Tech Transfer Program", "HyperText Transmission Process", "Home Tool Transfer Protocol"], "answer": 0},
|
|
{"q": "What year was Discord founded?", "options": ["2013", "2015", "2017", "2019"], "answer": 1},
|
|
{"q": "What programming language has a logo that is a snake?", "options": ["Java", "Ruby", "Python", "Go"], "answer": 2},
|
|
{"q": "How many bits are in a byte?", "options": ["4", "8", "16", "32"], "answer": 1},
|
|
{"q": "What does 'RGB' stand for?", "options": ["Really Good Build", "Red Green Blue", "Red Gold Black", "Rapid Gaming Boost"], "answer": 1},
|
|
{"q": "What is the most subscribed YouTube channel?", "options": ["PewDiePie", "MrBeast", "T-Series", "Cocomelon"], "answer": 1},
|
|
{"q": "What does 'AFK' stand for?", "options": ["A Free Kill", "Away From Keyboard", "Always Fun Killing", "Another Fake Knockdown"], "answer": 1},
|
|
{"q": "What animal is the Linux mascot?", "options": ["Fox", "Penguin", "Cat", "Dog"], "answer": 1},
|
|
{"q": "What does 'NPC' stand for?", "options": ["Non-Player Character", "New Player Content", "Normal Playing Conditions", "Never Played Competitively"], "answer": 0},
|
|
{"q": "In what year was the first iPhone released?", "options": ["2005", "2006", "2007", "2008"], "answer": 2},
|
|
]
|
|
|
|
question = random.choice(questions)
|
|
labels = ["A", "B", "C", "D"]
|
|
|
|
class TriviaView(discord.ui.View):
|
|
def __init__(self):
|
|
super().__init__(timeout=30)
|
|
self.answered_users = set()
|
|
|
|
async def handle_answer(self, interaction: discord.Interaction, selected: int):
|
|
if interaction.user.id in self.answered_users:
|
|
await interaction.response.send_message("You already answered!", ephemeral=True)
|
|
return
|
|
self.answered_users.add(interaction.user.id)
|
|
if selected == question["answer"]:
|
|
await interaction.response.send_message(
|
|
f"Correct! The answer was **{labels[question['answer']]}. {question['options'][question['answer']]}**",
|
|
ephemeral=True
|
|
)
|
|
else:
|
|
await interaction.response.send_message(
|
|
f"Wrong! The correct answer was **{labels[question['answer']]}. {question['options'][question['answer']]}**",
|
|
ephemeral=True
|
|
)
|
|
|
|
@discord.ui.button(label="A", style=discord.ButtonStyle.primary)
|
|
async def answer_a(self, interaction: discord.Interaction, button: discord.ui.Button):
|
|
await self.handle_answer(interaction, 0)
|
|
|
|
@discord.ui.button(label="B", style=discord.ButtonStyle.primary)
|
|
async def answer_b(self, interaction: discord.Interaction, button: discord.ui.Button):
|
|
await self.handle_answer(interaction, 1)
|
|
|
|
@discord.ui.button(label="C", style=discord.ButtonStyle.primary)
|
|
async def answer_c(self, interaction: discord.Interaction, button: discord.ui.Button):
|
|
await self.handle_answer(interaction, 2)
|
|
|
|
@discord.ui.button(label="D", style=discord.ButtonStyle.primary)
|
|
async def answer_d(self, interaction: discord.Interaction, button: discord.ui.Button):
|
|
await self.handle_answer(interaction, 3)
|
|
|
|
embed = discord.Embed(title="Trivia Time!", color=discord.Color.from_rgb(152, 0, 0))
|
|
embed.description = question["q"]
|
|
for i, option in enumerate(question["options"]):
|
|
embed.add_field(name=f"{labels[i]}", value=option, inline=True)
|
|
embed.set_footer(text="You have 30 seconds to answer!")
|
|
await interaction.response.send_message(embed=embed, view=TriviaView())
|
|
|
|
|
|
@client.tree.command(name="userinfo", description="Get information about a user")
|
|
@app_commands.describe(member="The user to get information about")
|
|
@handle_command_errors
|
|
async def userinfo(interaction: discord.Interaction, member: discord.Member):
|
|
embed = discord.Embed(title="User Information", color=discord.Color.from_rgb(152, 0, 0))
|
|
embed.add_field(name="Joined Server", value=member.joined_at.strftime("%Y-%m-%d"))
|
|
embed.add_field(name="Account Created", value=member.created_at.strftime("%Y-%m-%d"))
|
|
embed.add_field(name="Roles", value=", ".join([role.name for role in member.roles[1:]]))
|
|
await interaction.response.send_message(embed=embed)
|
|
|
|
|
|
@client.tree.command(name="health", description="Check bot system status")
|
|
@has_role_check(ADMIN_ROLE_ID)
|
|
@handle_command_errors
|
|
async def health(interaction: discord.Interaction):
|
|
stats = metrics.get_stats()
|
|
uptime_hours = stats["uptime_seconds"] / 3600
|
|
|
|
embed = discord.Embed(title="Bot Status", color=discord.Color.green())
|
|
|
|
embed.add_field(name="Latency", value=f"{round(client.latency * 1000)}ms")
|
|
embed.add_field(name="Guilds", value=str(len(client.guilds)))
|
|
embed.add_field(name="Users", value=str(len(client.users)))
|
|
embed.add_field(name="Uptime", value=f"{uptime_hours:.1f}h")
|
|
embed.add_field(name="Commands Run", value=str(stats["commands_executed"]))
|
|
embed.add_field(name="Errors", value=str(stats["error_count"]))
|
|
|
|
if stats["top_commands"]:
|
|
top_cmds = "\n".join([f"`{name}`: {count}" for name, count in stats["top_commands"]])
|
|
embed.add_field(name="Top Commands", value=top_cmds, inline=False)
|
|
|
|
checks = {
|
|
"RCON": bool(MINECRAFT_RCON_PASSWORD),
|
|
"Ollama": bool(OLLAMA_URL),
|
|
"Pelican": bool(PELICAN_API_KEY)
|
|
}
|
|
|
|
status_text = "\n".join([f"{'OK' if available else 'N/A'} {service}" for service, available in checks.items()])
|
|
embed.add_field(name="Services", value=status_text, inline=False)
|
|
|
|
await interaction.response.send_message(embed=embed, ephemeral=True)
|
|
|
|
|
|
@client.tree.error
|
|
async def on_app_command_error(interaction: discord.Interaction, error: app_commands.AppCommandError):
|
|
try:
|
|
if isinstance(error, app_commands.CheckFailure):
|
|
await interaction.response.send_message(
|
|
"You don't have permission to use this command!",
|
|
ephemeral=True
|
|
)
|
|
logger.warning(f"Permission denied for {interaction.user} on command {interaction.command.name}")
|
|
elif isinstance(error, app_commands.CommandOnCooldown):
|
|
await interaction.response.send_message(
|
|
f"This command is on cooldown. Try again in {error.retry_after:.0f} seconds.",
|
|
ephemeral=True
|
|
)
|
|
else:
|
|
await interaction.response.send_message(
|
|
"An error occurred while executing this command.",
|
|
ephemeral=True
|
|
)
|
|
logger.error(f"App command error: {str(error)}", exc_info=True)
|
|
except Exception as e:
|
|
logger.error(f"Error in on_app_command_error: {e}", exc_info=True)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
errors, warnings = ConfigValidator.validate()
|
|
|
|
if errors:
|
|
for error in errors:
|
|
logger.error(error)
|
|
exit(1)
|
|
|
|
for warning in warnings:
|
|
logger.warning(warning)
|
|
|
|
token = os.getenv('DISCORD_TOKEN')
|
|
logger.info("Starting bot...")
|
|
client.run(token)
|