import logging from logging.handlers import RotatingFileHandler from pathlib import Path from nio import AsyncClient, RoomSendResponse from nio.exceptions import OlmUnverifiedDeviceError from config import MAX_INPUT_LENGTH def setup_logging(level="INFO"): Path("logs").mkdir(exist_ok=True) logger = logging.getLogger("matrixbot") logger.setLevel(getattr(logging, level.upper(), logging.INFO)) file_handler = RotatingFileHandler( "logs/matrixbot.log", maxBytes=10 * 1024 * 1024, backupCount=5, ) file_handler.setFormatter( logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") ) stream_handler = logging.StreamHandler() stream_handler.setFormatter( logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") ) logger.addHandler(file_handler) logger.addHandler(stream_handler) return logger def _trust_all(client: AsyncClient): """Trust all devices in the device store.""" if not client.olm: return for user_id, devices in client.device_store.items(): for device_id, olm_device in devices.items(): if not client.olm.is_device_verified(olm_device): client.verify_device(olm_device) async def _room_send_trusted(client: AsyncClient, room_id: str, message_type: str, content: dict): """Send a message, auto-trusting devices on OlmUnverifiedDeviceError.""" try: return await client.room_send( room_id, message_type=message_type, content=content, ignore_unverified_devices=True, ) except OlmUnverifiedDeviceError: _trust_all(client) return await client.room_send( room_id, message_type=message_type, content=content, ignore_unverified_devices=True, ) async def send_text(client: AsyncClient, room_id: str, text: str): logger = logging.getLogger("matrixbot") resp = await _room_send_trusted( client, room_id, message_type="m.room.message", content={"msgtype": "m.text", "body": text}, ) if not isinstance(resp, RoomSendResponse): logger.error("send_text failed: %s", resp) return resp async def send_html(client: AsyncClient, room_id: str, plain: str, html: str): logger = logging.getLogger("matrixbot") resp = await _room_send_trusted( client, room_id, message_type="m.room.message", content={ "msgtype": "m.text", "body": plain, "format": "org.matrix.custom.html", "formatted_body": html, }, ) if not isinstance(resp, RoomSendResponse): logger.error("send_html failed: %s", resp) return resp async def send_reaction(client: AsyncClient, room_id: str, event_id: str, emoji: str): return await _room_send_trusted( client, room_id, message_type="m.reaction", content={ "m.relates_to": { "rel_type": "m.annotation", "event_id": event_id, "key": emoji, } }, ) async def get_or_create_dm(client: AsyncClient, user_id: str) -> str | None: """Find an existing DM room with user_id, or create one. Returns room_id.""" logger = logging.getLogger("matrixbot") # Check existing rooms for a DM with this user for room_id, room in client.rooms.items(): if room.member_count == 2 and user_id in (m.user_id for m in room.users.values()): return room_id # Create a new DM room from nio import RoomCreateResponse resp = await client.room_create( is_direct=True, invite=[user_id], ) if isinstance(resp, RoomCreateResponse): logger.info("Created DM room %s with %s", resp.room_id, user_id) # Sync so the new room appears in client.rooms before we try to send await client.sync(timeout=5000) return resp.room_id logger.error("Failed to create DM room with %s: %s", user_id, resp) return None def sanitize_input(text: str, max_length: int = MAX_INPUT_LENGTH) -> str: text = text.strip()[:max_length] text = "".join(char for char in text if char.isprintable()) return text