"""Entry point for the Matrix bot.

Authenticates against the homeserver (saved credentials, then the token from
configuration, then password login), registers the event callbacks, trusts
known devices, posts the welcome message and then syncs until a shutdown
signal arrives.
"""

import asyncio
import json
import logging
import signal
import sys
from pathlib import Path

from nio import (
    AsyncClient,
    AsyncClientConfig,
    InviteMemberEvent,
    LoginResponse,
    RoomMemberEvent,
    RoomMessageText,
    UnknownEvent,
)

from config import (
    MATRIX_HOMESERVER,
    MATRIX_USER_ID,
    MATRIX_ACCESS_TOKEN,
    MATRIX_DEVICE_ID,
    MATRIX_PASSWORD,
    LOG_LEVEL,
    ConfigValidator,
)
from callbacks import Callbacks
from utils import setup_logging
from welcome import post_welcome_message

logger = setup_logging(LOG_LEVEL)

CREDENTIALS_FILE = Path("credentials.json")
STORE_PATH = Path("nio_store")

# Keys that must be present in CREDENTIALS_FILE for it to be usable.
_REQUIRED_CREDENTIAL_KEYS = ("access_token", "user_id", "device_id")


def save_credentials(resp, homeserver):
    """Persist the login response to CREDENTIALS_FILE as JSON.

    Args:
        resp: Login response providing ``user_id``, ``device_id`` and
            ``access_token`` attributes.
        homeserver: Homeserver URL stored alongside the credentials.
    """
    data = {
        "homeserver": homeserver,
        "user_id": resp.user_id,
        "device_id": resp.device_id,
        "access_token": resp.access_token,
    }
    # The file holds a live access token: create it owner-only, and tighten
    # the mode of a file left over from an earlier run as well.
    CREDENTIALS_FILE.touch(mode=0o600, exist_ok=True)
    CREDENTIALS_FILE.chmod(0o600)
    CREDENTIALS_FILE.write_text(json.dumps(data, indent=2))
    logger.info("Credentials saved to %s", CREDENTIALS_FILE)


def _load_saved_credentials():
    """Return the credentials stored in CREDENTIALS_FILE, or None.

    None is returned when the file is missing, unreadable, not valid JSON, or
    lacks one of the required keys, so the caller can fall back to another
    authentication method instead of crashing on a damaged file.
    """
    if not CREDENTIALS_FILE.exists():
        return None
    try:
        creds = json.loads(CREDENTIALS_FILE.read_text())
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Ignoring unreadable %s: %s", CREDENTIALS_FILE, exc)
        return None
    if not isinstance(creds, dict):
        logger.warning("Ignoring %s: expected a JSON object", CREDENTIALS_FILE)
        return None
    missing = [key for key in _REQUIRED_CREDENTIAL_KEYS if key not in creds]
    if missing:
        logger.warning(
            "Ignoring %s: missing keys %s", CREDENTIALS_FILE, ", ".join(missing)
        )
        return None
    return creds


async def trust_devices(client: AsyncClient):
    """Query keys and trust all devices for all users we share rooms with."""
    if not client.olm:
        logger.warning("Olm not loaded, skipping device trust")
        return

    # Collect all users across all joined rooms
    users = set()
    for room in client.rooms.values():
        users.update(room.users)

    # Fetch device keys so the store is complete. keys_query() raises when
    # there is nothing to query, so only call it when a query is pending.
    if users and client.should_query_keys:
        await client.keys_query()

    # Trust every device
    for _user_id, devices in client.device_store.items():
        for olm_device in devices.values():
            if not client.olm.is_device_verified(olm_device):
                client.verify_device(olm_device)

    logger.info("Trusted all known devices (%d users)", len(users))


async def _authenticate(client: AsyncClient):
    """Authenticate ``client`` and return the response of the initial sync.

    Tries saved credentials first, then the .env token, then password login.
    An existing token is validated with a full-state sync. Closes the client
    and exits the process with status 1 when no method succeeds.
    """
    logged_in = False
    has_creds = False
    sync_resp = None

    creds = _load_saved_credentials()
    if creds is not None:
        client.access_token = creds["access_token"]
        client.user_id = creds["user_id"]
        client.device_id = creds["device_id"]
        has_creds = True
        logger.info("Loaded credentials from %s", CREDENTIALS_FILE)
    elif MATRIX_ACCESS_TOKEN and MATRIX_DEVICE_ID:
        client.access_token = MATRIX_ACCESS_TOKEN
        client.user_id = MATRIX_USER_ID
        client.device_id = MATRIX_DEVICE_ID
        has_creds = True
        logger.info("Using access token from .env")

    # Load the olm/e2ee store only if we have a device_id
    if has_creds:
        client.load_store()

    # Test the token with a sync; if it fails, fall back to password login
    if has_creds and client.access_token:
        logger.info("Testing existing access token...")
        sync_resp = await client.sync(timeout=30000, full_state=True)
        if hasattr(sync_resp, "next_batch"):
            logged_in = True
            logger.info("Existing token is valid")
        else:
            logger.warning("Existing token is invalid, will try password login")
            client.access_token = ""

    if logged_in:
        return sync_resp

    if not MATRIX_PASSWORD:
        logger.error(
            "No valid token and no MATRIX_PASSWORD set — cannot authenticate"
        )
        await client.close()
        sys.exit(1)

    logger.info("Logging in with password...")
    login_resp = await client.login(MATRIX_PASSWORD, device_name="LotusBot")
    if not isinstance(login_resp, LoginResponse):
        logger.error("Password login failed: %s", login_resp)
        await client.close()
        sys.exit(1)

    logger.info("Password login successful, device_id=%s", login_resp.device_id)
    save_credentials(login_resp, MATRIX_HOMESERVER)
    client.load_store()
    return await client.sync(timeout=30000, full_state=True)


async def main():
    """Validate configuration, log in, and run the bot until shutdown.

    Exits with status 1 on invalid configuration, failed authentication, a
    failed initial sync, or an unexpected stop of the sync loop.
    """
    errors = ConfigValidator.validate()
    if errors:
        for e in errors:
            logger.error(e)
        sys.exit(1)

    STORE_PATH.mkdir(exist_ok=True)

    client_config = AsyncClientConfig(
        store_sync_tokens=True,
        encryption_enabled=True,
        store_name="matrixbot",
    )
    client = AsyncClient(
        MATRIX_HOMESERVER,
        MATRIX_USER_ID,
        device_id=MATRIX_DEVICE_ID or None,
        config=client_config,
        store_path=str(STORE_PATH),
    )

    # Try saved credentials first, then .env token, then password login
    sync_resp = await _authenticate(client)

    callbacks = Callbacks(client)
    client.add_event_callback(callbacks.message, RoomMessageText)
    client.add_event_callback(callbacks.reaction, UnknownEvent)
    client.add_event_callback(callbacks.member, RoomMemberEvent)

    # Auto-accept room invites
    async def _auto_accept_invite(room, event):
        if event.membership == "invite" and event.state_key == MATRIX_USER_ID:
            logger.info("Auto-accepting invite to %s", room.room_id)
            await client.join(room.room_id)

    client.add_event_callback(_auto_accept_invite, InviteMemberEvent)

    # Graceful shutdown
    loop = asyncio.get_running_loop()
    shutdown_event = asyncio.Event()

    def _signal_handler():
        logger.info("Shutdown signal received")
        shutdown_event.set()

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, _signal_handler)

    # Mark startup complete from the initial sync
    if hasattr(sync_resp, "next_batch"):
        callbacks.startup_sync_token = sync_resp.next_batch
        logger.info("Initial sync complete, token: %s", sync_resp.next_batch[:20])
    else:
        logger.error("Initial sync failed: %s", sync_resp)
        await client.close()
        sys.exit(1)

    # Trust devices after initial sync loads the device store
    await trust_devices(client)

    # Post welcome message (idempotent — only posts if not already stored)
    await post_welcome_message(client)

    logger.info("Bot ready as %s — listening for commands", MATRIX_USER_ID)

    # Run sync_forever in a task so we can cancel on shutdown. Also watch the
    # sync task itself: if it stops on its own, waiting only for the shutdown
    # event would leave the process running without ever syncing again.
    sync_task = asyncio.create_task(
        client.sync_forever(timeout=30000, full_state=False)
    )
    shutdown_task = asyncio.create_task(shutdown_event.wait())
    await asyncio.wait(
        {sync_task, shutdown_task}, return_when=asyncio.FIRST_COMPLETED
    )
    shutdown_task.cancel()

    sync_failed = False
    if sync_task.done() and not sync_task.cancelled():
        sync_failed = True
        exc = sync_task.exception()
        if exc is not None:
            logger.error("Sync loop stopped unexpectedly: %r", exc)
        else:
            logger.error("Sync loop stopped unexpectedly")
    else:
        sync_task.cancel()
        try:
            await sync_task
        except asyncio.CancelledError:
            pass

    await client.close()
    if sync_failed:
        sys.exit(1)
    logger.info("Bot shut down cleanly")


if __name__ == "__main__":
    asyncio.run(main())