#!/usr/bin/env python3
"""
voice-limit-guard — hard, cross-client voice channel participant limits.

Sits in front of lk-jwt-service (the LiveKit MatrixRTC JWT issuer). Every
Matrix client (Element, FluffyChat, Lotus Chat, ...) must obtain a token from
this service before it can join a LiveKit room, so refusing the token here is
a hard block that applies to ALL clients — not just our own.

Flow for token requests (POST /sfu/get and POST /get_token):

  1. Forward the request to lk-jwt-service unchanged and capture its
     response. Anything other than HTTP 200 (no token issued) is relayed
     as-is, so unauthenticated requests never trigger Synapse lookups.
  2. Extract the Matrix room id from the request body (`room`, falling back
     to `room_id`). Clients send the *raw* room id here; lk-jwt-service maps
     it to the hashed LiveKit alias.
  3. Look up `io.lotus.voice_limit` -> max_users for that room via the
     Synapse admin API (cached briefly). 0 / absent => no limit.
  4. If a limit applies, decode the issued JWT to read the LiveKit alias
     (`video.room`) and the requester identity (`sub`), then ask LiveKit how
     many distinct Matrix users are currently in the room.
       - requester already present (rejoin / extra device) -> allow
       - distinct users >= limit                          -> 403 (blocked)
       - otherwise                                        -> allow
  5. Anything that goes wrong in steps 2-4 FAILS OPEN: the upstream response
     is returned unchanged, so calls keep working even if this guard is
     degraded.

Token paths are matched after dropping the query string and percent-decoding
the path, so variants such as `/sfu/get?x=1` or `/sfu/%67et` cannot obtain a
token without passing the check.

Known limitation: only users currently connected to LiveKit are counted.
Users who were issued a token but have not connected yet are invisible, so
several simultaneous joins can briefly exceed the limit.

All other requests (OPTIONS preflight, GET, unknown paths) are proxied
transparently so CORS and health behaviour match lk-jwt-service exactly.
"""

import base64
import hashlib
import hmac
import http.client
import json
import os
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

# --- configuration (from environment) ---------------------------------------

BIND_HOST = os.environ.get("GUARD_BIND_HOST", "0.0.0.0")
BIND_PORT = int(os.environ.get("GUARD_BIND_PORT", "8070"))
UPSTREAM = os.environ.get("GUARD_UPSTREAM", "http://127.0.0.1:8071").rstrip("/")

LIVEKIT_API = os.environ.get("LIVEKIT_API", "http://127.0.0.1:7880").rstrip("/")
LIVEKIT_KEY = os.environ.get("LIVEKIT_KEY", "")
LIVEKIT_SECRET = os.environ.get("LIVEKIT_SECRET", "")

SYNAPSE_API = os.environ.get("SYNAPSE_API", "http://127.0.0.1:8008").rstrip("/")
MATRIX_TOKEN = os.environ.get("MATRIX_TOKEN", "")

TOKEN_PATHS = ("/sfu/get", "/get_token")
LIMIT_STATE_TYPE = "io.lotus.voice_limit"

# Token requests are small JSON documents; larger bodies are refused with 413
# rather than buffered in memory.
MAX_BODY_BYTES = 1 << 20

CORS_HEADERS = {
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Methods": "POST",
    "Access-Control-Allow-Headers": "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token",
}


# --- small helpers -----------------------------------------------------------


def log(msg):
    """Print a prefixed log line and flush immediately."""
    print(f"[voice-limit-guard] {msg}", flush=True)


def b64url(data: bytes) -> str:
    """Base64url-encode `data` without `=` padding (JWT segment encoding)."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def jwt_payload(token: str):
    """Best-effort decode of a JWT payload without verifying its signature.

    Returns the decoded JSON payload, or None if `token` is malformed. The
    guard only reads tokens it has just received from lk-jwt-service, so the
    signature is not checked here.
    """
    try:
        payload_b64 = token.split(".")[1]
        payload_b64 += "=" * (-len(payload_b64) % 4)
        return json.loads(base64.urlsafe_b64decode(payload_b64))
    except Exception:
        return None


def json_error(status: int, errcode: str, message: str):
    """Build a (status, headers, body) triple for a Matrix-style JSON error.

    CORS headers are included so browser clients can read the error.
    """
    body = json.dumps({"errcode": errcode, "error": message}).encode()
    headers = dict(CORS_HEADERS)
    headers["Content-Type"] = "application/json"
    return status, headers, body


def is_token_path(raw_path: str) -> bool:
    """Return True if a request target addresses a token-issuing endpoint.

    The query string is dropped and the path percent-decoded before matching.
    HTTP routers commonly match on the decoded path and ignore the query, so
    comparing the raw target would let `/sfu/get?x=1` or `/sfu/%67et` reach
    upstream without passing the limit check.
    """
    path = raw_path.split("?", 1)[0]
    return urllib.parse.unquote(path) in TOKEN_PATHS


def request_room_id(body: bytes) -> str:
    """Extract the Matrix room id from a token request body.

    Reads `room`, falling back to `room_id`. NOTE(review): `room_id` is
    assumed to be the field used by newer MatrixRTC token request shapes —
    confirm against the deployed lk-jwt-service. Returns "" when the body is
    not a JSON object or the id is missing / not a non-empty string, so a
    malformed request can never crash the handler (e.g. an unhashable id
    reaching the limit cache).
    """
    try:
        data = json.loads(body)
    except (ValueError, RecursionError):
        return ""
    if not isinstance(data, dict):
        return ""
    for key in ("room", "room_id"):
        value = data.get(key)
        if isinstance(value, str) and value:
            return value
    return ""


def should_block(limit: int, present_users: set, requester: str) -> bool:
    """Pure decision: should this token be refused?

    - limit <= 0                  -> never block (no limit configured)
    - requester already present   -> never block (rejoin / extra device)
    - distinct users >= limit     -> block
    """
    if limit <= 0:
        return False
    if requester and requester in present_users:
        return False
    return len(present_users) >= limit


def matrix_user(identity: str) -> str:
    """Reduce a LiveKit identity (`@user:domain:DEVICE`) to `@user:domain`.

    Everything from the second `:` onward is dropped. A server name with a
    port (`@u:host:8448:DEV`) therefore becomes `@u:host`, which still maps
    all of that user's devices to one value. Non-Matrix identities (e.g.
    hashed federated identities) are returned unchanged so they each count
    as a distinct user.
    """
    if not identity.startswith("@"):
        return identity
    first = identity.find(":")
    if first == -1:
        return identity
    second = identity.find(":", first + 1)
    return identity if second == -1 else identity[:second]


# --- LiveKit admin JWT + participant counting --------------------------------


def livekit_admin_token(room: str) -> str:
    """Mint a short-lived (120 s) HS256 admin JWT for the given LiveKit room."""
    now = int(time.time())
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = b64url(
        json.dumps(
            {
                "iss": LIVEKIT_KEY,
                "exp": now + 120,
                "nbf": now - 10,  # tolerate small clock skew
                "video": {"roomAdmin": True, "room": room, "roomList": True},
            }
        ).encode()
    )
    signing_input = f"{header}.{payload}".encode()
    sig = b64url(hmac.new(LIVEKIT_SECRET.encode(), signing_input, hashlib.sha256).digest())
    return f"{header}.{payload}.{sig}"


def livekit_present_users(alias: str):
    """Return the set of distinct Matrix users currently in the LiveKit room.

    A 404 from LiveKit is treated as an empty room. This is presumably the
    "room not created yet" case on a first join — TODO confirm against the
    LiveKit version in use. The allow/deny outcome is identical to failing
    open; this only avoids a spurious error log. Other HTTP or network errors
    propagate so the caller can fail open. Participants without an identity
    are not counted.
    """
    req = urllib.request.Request(
        f"{LIVEKIT_API}/twirp/livekit.RoomService/ListParticipants",
        data=json.dumps({"room": alias}).encode(),
        headers={
            "Authorization": "Bearer " + livekit_admin_token(alias),
            "Content-Type": "application/json",
        },
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=5) as resp:
            data = json.loads(resp.read())
    except urllib.error.HTTPError as exc:
        if exc.code == 404:
            exc.close()
            return set()
        raise
    identities = (p.get("identity") for p in data.get("participants", []))
    return {matrix_user(ident) for ident in identities if ident}


# --- per-room limit lookup (cached) ------------------------------------------

_limit_cache = {}  # room_id -> (max_users, expiry_epoch)
_limit_cache_lock = threading.Lock()
_LIMIT_TTL = 10.0


def room_limit(room_id: str) -> int:
    """Return the configured max_users for `room_id` (0 means unlimited).

    Reads the `io.lotus.voice_limit` state event (empty state key) through
    the Synapse admin API. Successful lookups are cached for _LIMIT_TTL
    seconds. Failures are logged and treated as "no limit" (fail open)
    without being cached, so the next request retries.
    """
    now = time.time()
    with _limit_cache_lock:
        cached = _limit_cache.get(room_id)
        if cached and cached[1] > now:
            return cached[0]

    limit = 0
    try:
        url = (
            f"{SYNAPSE_API}/_synapse/admin/v1/rooms/"
            f"{urllib.parse.quote(room_id, safe='')}/state"
        )
        req = urllib.request.Request(url, headers={"Authorization": "Bearer " + MATRIX_TOKEN})
        with urllib.request.urlopen(req, timeout=5) as resp:
            events = json.loads(resp.read()).get("state", [])
        for ev in events:
            if ev.get("type") == LIMIT_STATE_TYPE and ev.get("state_key", "") == "":
                limit = int(ev.get("content", {}).get("max_users", 0) or 0)
                break
    except Exception as exc:
        # Fail open: treat lookup failure as "no limit".
        log(f"limit lookup failed for {room_id}: {exc}")
        return 0

    with _limit_cache_lock:
        _limit_cache[room_id] = (limit, now + _LIMIT_TTL)
    return limit


# --- HTTP handler ------------------------------------------------------------


class Handler(BaseHTTPRequestHandler):
    """Proxy every request to lk-jwt-service; gate granted token requests."""

    protocol_version = "HTTP/1.1"
    # Client socket timeout in seconds. The base-class default (None) lets an
    # idle keep-alive or stalled client pin a handler thread indefinitely.
    timeout = 60

    # send_response() already emits Server and Date, and _send() recomputes
    # Content-Length; relaying these (or hop-by-hop headers) from upstream
    # would produce duplicates or wrong framing.
    _SKIPPED_HEADERS = ("transfer-encoding", "content-length", "connection", "date", "server")

    def log_message(self, *args):  # silence default request logging
        pass

    def _read_body(self):
        """Read the Content-Length-framed request body.

        Returns the body bytes, or None after an error response has already
        been sent and the connection marked for closing:
          - 411 for Transfer-Encoding framing (not decoded here; leaving it
            unread would corrupt the next request on this connection),
          - 400 for a missing-number / negative Content-Length,
          - 413 for bodies larger than MAX_BODY_BYTES.
        """
        if self.headers.get("Transfer-Encoding"):
            self._send(*json_error(411, "M_UNKNOWN", "Content-Length required."), close=True)
            return None
        try:
            length = int(self.headers.get("Content-Length", 0) or 0)
        except ValueError:
            length = -1
        if length < 0:
            self._send(*json_error(400, "M_UNKNOWN", "Invalid Content-Length."), close=True)
            return None
        if length > MAX_BODY_BYTES:
            self._send(*json_error(413, "M_TOO_LARGE", "Request body too large."), close=True)
            return None
        return self.rfile.read(length) if length else b""

    def _proxy(self, body: bytes):
        """Forward the current request to lk-jwt-service.

        Returns (status, headers_dict, body_bytes). Upstream HTTP error
        responses are relayed as-is. If upstream cannot be reached at all
        (connection refused, timeout, reset, protocol error), a 502 JSON
        error with CORS headers is returned instead of dropping the client
        connection.
        """
        headers = {
            name: self.headers[name]
            for name in ("Content-Type", "Accept", "Origin")
            if self.headers.get(name)
        }
        req = urllib.request.Request(
            UPSTREAM + self.path,
            data=body if self.command in ("POST", "PUT", "PATCH") else None,
            headers=headers,
            method=self.command,
        )
        try:
            with urllib.request.urlopen(req, timeout=15) as resp:
                return resp.status, dict(resp.headers), resp.read()
        except urllib.error.HTTPError as exc:
            # Must precede the OSError clause: HTTPError is an OSError subclass.
            return exc.code, dict(exc.headers), exc.read()
        except (OSError, http.client.HTTPException) as exc:
            # URLError, socket timeouts and connection resets are OSErrors;
            # IncompleteRead and similar are HTTPExceptions.
            log(f"upstream {self.command} {self.path} failed: {exc}")
            return json_error(502, "M_UNKNOWN", "Token service unavailable.")

    def _send(self, status: int, headers: dict, body: bytes, close: bool = False):
        """Write a complete response; `close=True` adds `Connection: close`."""
        self.send_response(status)
        for key, value in headers.items():
            if key.lower() in self._SKIPPED_HEADERS:
                continue
            self.send_header(key, value)
        self.send_header("Content-Length", str(len(body)))
        if close:
            # send_header() also sets self.close_connection for this value.
            self.send_header("Connection", "close")
        self.end_headers()
        if body and self.command != "HEAD":
            self.wfile.write(body)

    def _send_blocked(self):
        """Refuse the token: 403 M_FORBIDDEN with CORS headers."""
        self._send(*json_error(403, "M_FORBIDDEN", "This voice channel is full."))

    def _token_over_limit(self, room_id: str, limit: int, resp_body: bytes) -> bool:
        """Decide whether the token in `resp_body` must be withheld.

        Fails open: any error while decoding the issued token or querying
        LiveKit is logged and treated as "not over the limit".
        """
        try:
            payload = json.loads(resp_body)
            claims = jwt_payload(payload.get("jwt", "")) or {}
            alias = (claims.get("video") or {}).get("room", "")
            requester = matrix_user(claims.get("sub", ""))
            if not alias:
                raise ValueError("no alias in issued token")
            present = livekit_present_users(alias)
        except Exception as exc:
            log(f"limit check error for {room_id}: {exc}")
            return False
        if should_block(limit, present, requester):
            log(f"blocked {requester or '?'} from {room_id}: {len(present)}/{limit}")
            return True
        return False

    def _handle(self):
        """Shared entry point for every HTTP verb."""
        body = self._read_body()
        if body is None:
            return  # an error response has already been sent

        # Ask upstream first: only a granted token (200) can be over the
        # limit, and this keeps unauthenticated requests from triggering
        # Synapse admin-API lookups.
        status, headers, resp_body = self._proxy(body)

        if not (self.command == "POST" and is_token_path(self.path) and status == 200):
            self._send(status, headers, resp_body)
            return

        room_id = request_room_id(body)
        limit = room_limit(room_id) if room_id else 0
        if limit > 0 and self._token_over_limit(room_id, limit, resp_body):
            self._send_blocked()
            return
        self._send(status, headers, resp_body)

    # Map every HTTP verb we proxy onto the shared handler.
    do_GET = _handle
    do_POST = _handle
    do_OPTIONS = _handle
    do_HEAD = _handle
    do_PUT = _handle
    do_PATCH = _handle
    do_DELETE = _handle


class GuardServer(ThreadingHTTPServer):
    """Thread-per-connection HTTP server hosting Handler."""

    daemon_threads = True
    # Element Call fires a burst of token requests per join; keep the accept
    # queue generous so none are dropped.
    request_queue_size = 128
    allow_reuse_address = True


def main():
    """Warn about missing credentials, then serve forever."""
    if not (LIVEKIT_KEY and LIVEKIT_SECRET and MATRIX_TOKEN):
        log("WARNING: missing LIVEKIT_KEY/LIVEKIT_SECRET/MATRIX_TOKEN — limit checks will fail open")
    server = GuardServer((BIND_HOST, BIND_PORT), Handler)
    log(f"listening on {BIND_HOST}:{BIND_PORT} -> upstream {UPSTREAM}")
    server.serve_forever()


if __name__ == "__main__":
    main()