Files
matrix/utils.py

133 lines
4.1 KiB
Python
Raw Normal View History

import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path
from nio import AsyncClient, RoomSendResponse
from nio.exceptions import OlmUnverifiedDeviceError
from config import MAX_INPUT_LENGTH
def setup_logging(level="INFO"):
    """Configure and return the shared ``matrixbot`` logger.

    Ensures a ``logs`` directory exists relative to the current working
    directory, then attaches a rotating file handler
    (``logs/matrixbot.log``, 10 MiB per file, 5 backups) and a
    ``logging.StreamHandler``, both using the same
    timestamp / level / message format.

    The function is safe to call more than once: a handler is only attached
    when the logger does not already carry one of that kind, so repeated
    calls update the level without duplicating every log line or opening
    the log file again.

    Args:
        level: Name of a ``logging`` level, case-insensitive (for example
            ``"debug"``). Names that do not resolve to a numeric level fall
            back to ``logging.INFO``.

    Returns:
        The configured ``logging.Logger`` named ``"matrixbot"``.
    """
    Path("logs").mkdir(exist_ok=True)
    logger = logging.getLogger("matrixbot")

    resolved_level = getattr(logging, level.upper(), logging.INFO)
    if not isinstance(resolved_level, int):
        # Some upper-case module attributes (e.g. BASIC_FORMAT) are not
        # levels; passing them to setLevel() would raise.
        resolved_level = logging.INFO
    logger.setLevel(resolved_level)

    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")

    has_file_handler = any(
        isinstance(handler, RotatingFileHandler) for handler in logger.handlers
    )
    # File handlers subclass StreamHandler, so exclude them explicitly when
    # looking for the plain console handler.
    has_stream_handler = any(
        isinstance(handler, logging.StreamHandler)
        and not isinstance(handler, logging.FileHandler)
        for handler in logger.handlers
    )

    if not has_file_handler:
        file_handler = RotatingFileHandler(
            "logs/matrixbot.log",
            maxBytes=10 * 1024 * 1024,
            backupCount=5,
        )
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    if not has_stream_handler:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)
        logger.addHandler(stream_handler)

    return logger
def _trust_all(client: AsyncClient):
    """Verify every device in the client's device store that is not yet verified.

    Walks ``client.device_store`` and calls ``client.verify_device`` on each
    device that ``client.olm.is_device_verified`` reports as unverified.
    Returns immediately, doing nothing, when ``client.olm`` is falsy.
    """
    olm = client.olm
    if not olm:
        return
    for _owner, owned_devices in client.device_store.items():
        for device in owned_devices.values():
            if olm.is_device_verified(device):
                continue
            client.verify_device(device)
async def _room_send_trusted(client: AsyncClient, room_id: str, message_type: str, content: dict):
    """Send an event to a room, retrying once after trusting all devices.

    The event is sent with ``ignore_unverified_devices=True``. If that first
    attempt raises ``OlmUnverifiedDeviceError``, every device in the store is
    marked verified via ``_trust_all`` and the send is attempted one more
    time; an error from the second attempt propagates to the caller.

    Returns the result of ``client.room_send``.
    """
    send_options = {
        "message_type": message_type,
        "content": content,
        "ignore_unverified_devices": True,
    }
    try:
        outcome = await client.room_send(room_id, **send_options)
    except OlmUnverifiedDeviceError:
        _trust_all(client)
        outcome = await client.room_send(room_id, **send_options)
    return outcome
async def send_text(client: AsyncClient, room_id: str, text: str):
    """Send a plain ``m.text`` message to ``room_id``.

    Returns whatever the underlying send produced. A result that is not a
    ``RoomSendResponse`` is additionally logged as an error on the
    ``matrixbot`` logger.
    """
    log = logging.getLogger("matrixbot")
    payload = {"msgtype": "m.text", "body": text}
    result = await _room_send_trusted(
        client,
        room_id,
        message_type="m.room.message",
        content=payload,
    )
    if isinstance(result, RoomSendResponse):
        return result
    log.error("send_text failed: %s", result)
    return result
async def send_html(client: AsyncClient, room_id: str, plain: str, html: str):
    """Send an ``m.text`` message carrying both a plain and an HTML body.

    ``plain`` becomes the event ``body`` and ``html`` the ``formatted_body``
    (format ``org.matrix.custom.html``).

    Returns whatever the underlying send produced. A result that is not a
    ``RoomSendResponse`` is additionally logged as an error on the
    ``matrixbot`` logger.
    """
    log = logging.getLogger("matrixbot")
    payload = {
        "msgtype": "m.text",
        "body": plain,
        "format": "org.matrix.custom.html",
        "formatted_body": html,
    }
    result = await _room_send_trusted(
        client,
        room_id,
        message_type="m.room.message",
        content=payload,
    )
    if isinstance(result, RoomSendResponse):
        return result
    log.error("send_html failed: %s", result)
    return result
async def send_reaction(client: AsyncClient, room_id: str, event_id: str, emoji: str):
    """Send an ``m.reaction`` annotation for an existing event.

    Args:
        client: Client used to send the event.
        room_id: Room containing the event being reacted to.
        event_id: Id of the event the reaction relates to.
        emoji: Reaction key placed in the annotation.

    Returns:
        Whatever the underlying send produced. As with ``send_text`` and
        ``send_html``, a result that is not a ``RoomSendResponse`` is logged
        as an error on the ``matrixbot`` logger so failed reactions are not
        silent.
    """
    logger = logging.getLogger("matrixbot")
    resp = await _room_send_trusted(
        client, room_id,
        message_type="m.reaction",
        content={
            "m.relates_to": {
                "rel_type": "m.annotation",
                "event_id": event_id,
                "key": emoji,
            }
        },
    )
    if not isinstance(resp, RoomSendResponse):
        logger.error("send_reaction failed: %s", resp)
    return resp
async def get_or_create_dm(client: AsyncClient, user_id: str) -> str | None:
    """Return the id of a room usable as a DM with ``user_id``.

    The rooms already known to the client are searched first: the first one
    with exactly two members that includes ``user_id`` is reused. Otherwise
    a new direct room is created with ``user_id`` invited, followed by one
    sync so the room shows up in ``client.rooms``.

    Returns the room id, or ``None`` (after logging an error) when room
    creation does not yield a ``RoomCreateResponse``.
    """
    log = logging.getLogger("matrixbot")

    # Reuse an existing two-member room that already contains the user.
    for candidate_id, candidate in client.rooms.items():
        if candidate.member_count != 2:
            continue
        if any(user_id == member.user_id for member in candidate.users.values()):
            return candidate_id

    # Nothing suitable found: create a fresh direct room.
    from nio import RoomCreateResponse

    created = await client.room_create(is_direct=True, invite=[user_id])
    if not isinstance(created, RoomCreateResponse):
        log.error("Failed to create DM room with %s: %s", user_id, created)
        return None

    log.info("Created DM room %s with %s", created.room_id, user_id)
    # Sync so the new room appears in client.rooms before anyone sends to it.
    await client.sync(timeout=5000)
    return created.room_id
def sanitize_input(text: str, max_length: int = MAX_INPUT_LENGTH) -> str:
    """Return ``text`` reduced to printable characters, trimmed and length-capped.

    Characters for which ``str.isprintable()`` is false (control characters,
    including newlines and tabs) are removed first. The result is then
    stripped of surrounding whitespace and finally cut to ``max_length``
    characters.

    Filtering before stripping and truncating means removed characters do
    not count against the length budget, and whitespace that was only
    shielded by a control character (e.g. ``"\\x00  hi"``) is still trimmed.

    Args:
        text: Raw input string.
        max_length: Maximum number of characters kept in the result.

    Returns:
        The sanitized string, at most ``max_length`` characters long for a
        non-negative ``max_length``.
    """
    printable = "".join(char for char in text if char.isprintable())
    return printable.strip()[:max_length]