Files
matrix/livekit/voice-limit-guard.py
T
jared 68a6acfa24
Lint / Shell (shellcheck) (push) Successful in 7s
Lint / JS (eslint) (push) Successful in 5s
Lint / Python (ruff) (push) Successful in 4s
Lint / Python deps (pip-audit) (push) Successful in 1m1s
Lint / Secret scan (gitleaks) (push) Successful in 4s
feat: hard cross-client voice channel limits via voice-limit-guard
Add a fail-open Python sidecar (livekit/voice-limit-guard.py) that fronts
lk-jwt-service to enforce per-room voice participant caps for ALL Matrix
clients, not just Lotus Chat:
- lk-jwt-service moved to :8071 (systemd drop-in), guard owns :8070 so NPM's
  existing /sfu/get + /get_token proxy targets are unchanged
- guard reads io.lotus.voice_limit.max_users (Synapse admin API, cached),
  forwards to lk-jwt-service, and on an issued token decodes the LiveKit alias
  + requester, counts distinct Matrix users via LiveKit ListParticipants, and
  returns 403 when the room is full (rejoins/extra devices allowed)
- any error fails open (returns upstream response) so calls never break
- systemd/voice-limit-guard.service; README documents ports, setup, revert

Also update landing page: voice limit is now server-enforced for all clients.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 23:45:41 -04:00

310 lines
11 KiB
Python

#!/usr/bin/env python3
"""
voice-limit-guard — hard, cross-client voice channel participant limits.
Sits in front of lk-jwt-service (the LiveKit MatrixRTC JWT issuer). Every
Matrix client (Element, FluffyChat, Lotus Chat, ...) must obtain a token from
this service before it can join a LiveKit room, so refusing the token here is a
hard block that applies to ALL clients — not just our own.
Flow for token requests (POST /sfu/get and legacy POST /get_token):
1. Read the request body and extract the Matrix `room` id (clients send the
*raw* room id here; lk-jwt-service maps it to the hashed LiveKit alias).
2. Look up `io.lotus.voice_limit` -> max_users for that room via the Synapse
admin API (cached briefly). 0 / absent => no limit.
3. Forward the request to lk-jwt-service unchanged and capture its response.
4. If a limit applies and the token was issued (HTTP 200), decode the JWT to
read the LiveKit alias (`video.room`) and the requester identity (`sub`),
then ask LiveKit how many distinct Matrix users are currently in the room.
- requester already present (rejoin / extra device) -> allow
- distinct users >= limit -> 403 (blocked)
- otherwise -> allow
5. Anything that goes wrong in the guard's own checks (steps 2 and 4) FAILS
OPEN: the upstream response is returned unchanged, so calls keep working even
if this guard is degraded.
All other requests (OPTIONS preflight, GET, unknown paths) are proxied
transparently so CORS and health behaviour match lk-jwt-service exactly.
"""
import base64
import hashlib
import hmac
import json
import os
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
# --- configuration (from environment) ---------------------------------------
# Address and port this guard listens on.
BIND_HOST = os.getenv("GUARD_BIND_HOST", "0.0.0.0")
BIND_PORT = int(os.getenv("GUARD_BIND_PORT", "8070"))

# Base URLs of the services the guard talks to. Trailing slashes are stripped
# so request paths can be appended directly.
UPSTREAM = os.getenv("GUARD_UPSTREAM", "http://127.0.0.1:8071").rstrip("/")
LIVEKIT_API = os.getenv("LIVEKIT_API", "http://127.0.0.1:7880").rstrip("/")
SYNAPSE_API = os.getenv("SYNAPSE_API", "http://127.0.0.1:8008").rstrip("/")

# Credentials: the LiveKit key/secret sign the admin JWT, the Matrix token is
# sent as a Bearer token to the Synapse admin API.
LIVEKIT_KEY = os.getenv("LIVEKIT_KEY", "")
LIVEKIT_SECRET = os.getenv("LIVEKIT_SECRET", "")
MATRIX_TOKEN = os.getenv("MATRIX_TOKEN", "")

# Request paths that issue tokens, and the room state event holding the limit.
TOKEN_PATHS = ("/sfu/get", "/get_token")
LIMIT_STATE_TYPE = "io.lotus.voice_limit"

# CORS headers attached to the guard's own "room is full" response.
CORS_HEADERS = {
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Methods": "POST",
    "Access-Control-Allow-Headers": (
        "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token"
    ),
}
# --- small helpers -----------------------------------------------------------
def log(msg):
    """Print *msg* to stdout with the service prefix, flushing immediately."""
    line = f"[voice-limit-guard] {msg}"
    print(line, flush=True)
def b64url(data: bytes) -> str:
    """Encode *data* as URL-safe base64 text with the trailing padding removed."""
    encoded = base64.urlsafe_b64encode(data)
    unpadded = encoded.rstrip(b"=")
    return unpadded.decode()
def jwt_payload(token: str):
    """Return the decoded payload (second segment) of a JWT, or None.

    The signature is NOT verified. Any failure (wrong type, missing segment,
    bad base64, bad JSON) yields None instead of raising.
    """
    try:
        segment = token.split(".")[1]
        # JWT segments drop base64 padding; restore it to a multiple of four.
        padding = "=" * (-len(segment) % 4)
        raw = base64.urlsafe_b64decode(segment + padding)
        return json.loads(raw)
    except Exception:
        return None
def should_block(limit: int, present_users: set, requester: str) -> bool:
    """Decide whether a token request must be refused. Pure function.

    Never blocks when no limit is configured (limit <= 0) or when the requester
    is already counted among present_users (rejoin / extra device). Otherwise
    blocks exactly when the number of distinct present users has reached limit.
    """
    unlimited = limit <= 0
    # Short-circuit keeps the membership test from running for unlimited rooms.
    if unlimited or (requester and requester in present_users):
        return False
    occupancy = len(present_users)
    return occupancy >= limit
def matrix_user(identity: str) -> str:
    """Strip the device suffix from a LiveKit identity.

    `@user:domain:DEVICE` becomes `@user:domain`. Identities that do not start
    with `@`, or that contain fewer than two colons, are returned unchanged, so
    non-Matrix identities each count as their own user.
    """
    if not identity.startswith("@"):
        return identity
    # At most three pieces: localpart, the text up to the second colon, the rest.
    pieces = identity.split(":", 2)
    if len(pieces) < 3:
        return identity
    return f"{pieces[0]}:{pieces[1]}"
# --- LiveKit admin JWT + participant counting --------------------------------
def livekit_admin_token(room: str) -> str:
    """Build an HS256-signed JWT carrying room-admin grants for *room*.

    The token is issued by LIVEKIT_KEY, signed with LIVEKIT_SECRET, becomes
    valid 10 seconds in the past and expires 120 seconds from now.
    """
    issued_at = int(time.time())
    claims = {
        "iss": LIVEKIT_KEY,
        "exp": issued_at + 120,
        "nbf": issued_at - 10,
        "video": {"roomAdmin": True, "room": room, "roomList": True},
    }
    # Header and claims are each serialised to JSON and base64url-encoded.
    segments = [
        b64url(json.dumps(part).encode())
        for part in ({"alg": "HS256", "typ": "JWT"}, claims)
    ]
    unsigned = ".".join(segments)
    mac = hmac.new(LIVEKIT_SECRET.encode(), unsigned.encode(), hashlib.sha256)
    return f"{unsigned}.{b64url(mac.digest())}"
def livekit_present_users(alias: str):
    """Return the set of distinct users currently in LiveKit room *alias*.

    Calls the LiveKit RoomService ListParticipants endpoint with an admin token
    and reduces every participant identity with matrix_user(). Network, HTTP and
    JSON errors propagate to the caller.
    """
    endpoint = f"{LIVEKIT_API}/twirp/livekit.RoomService/ListParticipants"
    request_body = json.dumps({"room": alias}).encode()
    request_headers = {
        "Authorization": "Bearer " + livekit_admin_token(alias),
        "Content-Type": "application/json",
    }
    req = urllib.request.Request(
        endpoint, data=request_body, headers=request_headers, method="POST"
    )
    with urllib.request.urlopen(req, timeout=5) as resp:
        listing = json.loads(resp.read())

    users = set()
    for participant in listing.get("participants", []):
        users.add(matrix_user(participant.get("identity", "")))
    return users
# --- per-room limit lookup (cached) ------------------------------------------
_limit_cache = {}  # room_id -> (max_users, expiry_epoch)
_limit_cache_lock = threading.Lock()
_LIMIT_TTL = 10.0  # seconds a successful lookup stays cached


def room_limit(room_id: str) -> int:
    """Return the configured max_users for *room_id*; 0 means "no limit".

    The value is read from the room's `io.lotus.voice_limit` state event (empty
    state key) via the Synapse admin API, and successful lookups are cached for
    _LIMIT_TTL seconds. Failed lookups are logged, not cached, and reported as 0
    so the caller fails open.

    Args:
        room_id: Matrix room id taken from the client's request body.

    Returns:
        The limit as an int. 0 when the id is unusable, the event is absent, or
        the lookup fails.
    """
    # The id comes straight from client-supplied JSON. Anything that is not a
    # non-empty string (including unhashable dicts/lists, which would raise at
    # the cache lookup below) is treated as "no limit" instead of crashing.
    if not isinstance(room_id, str) or not room_id:
        return 0

    now = time.time()
    with _limit_cache_lock:
        cached = _limit_cache.get(room_id)
    if cached and cached[1] > now:
        return cached[0]

    limit = 0
    try:
        url = (
            f"{SYNAPSE_API}/_synapse/admin/v1/rooms/"
            f"{urllib.parse.quote(room_id, safe='')}/state"
        )
        req = urllib.request.Request(
            url, headers={"Authorization": "Bearer " + MATRIX_TOKEN}
        )
        with urllib.request.urlopen(req, timeout=5) as resp:
            events = json.loads(resp.read()).get("state", [])
        for ev in events:
            if ev.get("type") == LIMIT_STATE_TYPE and ev.get("state_key", "") == "":
                # `or 0` maps null / empty values to "no limit".
                limit = int(ev.get("content", {}).get("max_users", 0) or 0)
                break
    except Exception as exc:
        # Fail open: treat lookup failure as "no limit".
        log(f"limit lookup failed for {room_id}: {exc}")
        return 0

    with _limit_cache_lock:
        _limit_cache[room_id] = (limit, now + _LIMIT_TTL)
    return limit
# --- HTTP handler ------------------------------------------------------------
class Handler(BaseHTTPRequestHandler):
    """HTTP front for lk-jwt-service that enforces per-room voice limits.

    Token-issuing POSTs (paths listed in TOKEN_PATHS) are forwarded upstream and
    the issued token is withheld with a 403 when the room is already full.
    Every other request is proxied unchanged. Problems inside the guard's own
    checks fail open: the upstream response is relayed as-is.
    """

    # Every response carries an explicit Content-Length (see _send).
    protocol_version = "HTTP/1.1"

    def log_message(self, *args):  # silence default request logging
        pass

    def _read_body(self) -> bytes:
        """Read the request body announced by the Content-Length header.

        A malformed or negative Content-Length is treated as "no body". Because
        the end of such a request cannot be located, the connection is marked
        for closing so leftover bytes are never parsed as a follow-up request.
        A negative length must not reach rfile.read(): read(-1) blocks until
        the peer closes the socket.
        """
        raw_length = self.headers.get("Content-Length", 0) or 0
        try:
            length = int(raw_length)
        except (TypeError, ValueError):
            length = -1
        if length < 0:
            self.close_connection = True
            return b""
        return self.rfile.read(length) if length else b""

    @staticmethod
    def _normalized_path(raw_path: str) -> str:
        """Return the request path without query string, percent-decoded.

        Used for the token-path comparison so that `/sfu/get?x=1` or a
        percent-encoded spelling of a token path cannot skip the limit check.
        """
        return urllib.parse.unquote(raw_path.partition("?")[0])

    @staticmethod
    def _requested_room(body: bytes) -> str:
        """Extract the Matrix room id from a token request body ("" if none).

        Bodies that are not a JSON object, or whose room value is not a string,
        yield "" (no limit lookup) rather than an error.
        """
        try:
            request = json.loads(body)
        except Exception:
            # Fail open on anything unparsable; upstream will reject it itself.
            return ""
        if not isinstance(request, dict):
            return ""
        # NOTE(review): `room` is the key this guard has always read. `room_id`
        # is accepted as well because newer lk-jwt-service request bodies appear
        # to use it — confirm against the deployed lk-jwt-service version.
        room_id = request.get("room") or request.get("room_id") or ""
        return room_id if isinstance(room_id, str) else ""

    def _proxy(self, body: bytes):
        """Forward the current request to lk-jwt-service and return its response
        as (status, headers_dict, body_bytes).

        Only Content-Type, Accept and Origin are forwarded. HTTP error statuses
        from upstream are returned like any other response. If upstream cannot
        be reached at all, a 502 JSON response is synthesised so the client
        still receives a well-formed HTTP reply.
        """
        import http.client  # local import: only needed for the except clause

        headers = {
            name: self.headers[name]
            for name in ("Content-Type", "Accept", "Origin")
            if self.headers.get(name)
        }
        req = urllib.request.Request(
            UPSTREAM + self.path,
            data=body if self.command in ("POST", "PUT", "PATCH") else None,
            headers=headers,
            method=self.command,
        )
        try:
            with urllib.request.urlopen(req, timeout=15) as resp:
                return resp.status, dict(resp.headers), resp.read()
        except urllib.error.HTTPError as exc:
            # Must precede the OSError clause: HTTPError is an OSError subclass.
            try:
                return exc.code, dict(exc.headers), exc.read()
            finally:
                exc.close()
        except (OSError, http.client.HTTPException) as exc:
            # Connection refused, timeout, malformed upstream reply, ...
            log(f"upstream request failed: {exc}")
            payload = json.dumps(
                {"errcode": "M_UNKNOWN", "error": "Token service unavailable."}
            ).encode()
            error_headers = dict(CORS_HEADERS)
            error_headers["Content-Type"] = "application/json"
            return 502, error_headers, payload

    def _send(self, status: int, headers: dict, body: bytes):
        """Write a complete response: status line, headers, then body.

        The body is omitted for HEAD requests; Content-Length is always sent.
        """
        self.send_response(status)
        # send_response() already emits Server and Date; relaying them too would
        # produce duplicates. Content-Length is recomputed below.
        skip = ("transfer-encoding", "content-length", "connection", "date", "server")
        for key, value in headers.items():
            if key.lower() in skip:
                continue
            self.send_header(key, value)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        if body and self.command != "HEAD":
            self.wfile.write(body)

    def _send_blocked(self):
        """Send the 403 "voice channel is full" JSON response with CORS headers."""
        body = json.dumps(
            {"errcode": "M_FORBIDDEN", "error": "This voice channel is full."}
        ).encode()
        headers = dict(CORS_HEADERS)
        headers["Content-Type"] = "application/json"
        self._send(403, headers, body)

    def _over_limit(self, room_id: str, limit: int, resp_body: bytes) -> bool:
        """Return True when the token in *resp_body* must be withheld.

        Decodes the issued JWT, asks LiveKit who is in the room and applies
        should_block(). Any error is logged and reported as False (fail open).
        This method only decides; it never writes to the client, so an I/O
        error while responding cannot be mistaken for a check failure.
        """
        try:
            payload = json.loads(resp_body)
            claims = jwt_payload(payload.get("jwt", "")) or {}
            alias = (claims.get("video") or {}).get("room", "")
            requester = matrix_user(claims.get("sub", ""))
            if not alias:
                raise ValueError("no alias in issued token")
            present = livekit_present_users(alias)
            if should_block(limit, present, requester):
                log(f"blocked {requester or '?'} from {room_id}: {len(present)}/{limit}")
                return True
        except Exception as exc:
            # Fail open: never break a join because the guard had a problem.
            log(f"limit check error for {room_id}: {exc}")
        return False

    def _handle(self):
        """Shared entry point for every HTTP verb the guard accepts."""
        body = self._read_body()

        # Only token-issuing POSTs are subject to the limit check.
        is_token_request = (
            self.command == "POST"
            and self._normalized_path(self.path) in TOKEN_PATHS
        )
        if not is_token_request:
            self._send(*self._proxy(body))
            return

        # Determine the room and its limit before bothering upstream.
        room_id = self._requested_room(body)
        limit = room_limit(room_id) if room_id else 0

        status, headers, resp_body = self._proxy(body)

        # Withhold the token only when a limit applies, upstream actually issued
        # one, and the room is full; everything else passes through unchanged.
        if limit > 0 and status == 200 and self._over_limit(room_id, limit, resp_body):
            self._send_blocked()
            return
        self._send(status, headers, resp_body)

    # Map the HTTP verbs we care about onto the shared handler.
    do_GET = _handle
    do_POST = _handle
    do_OPTIONS = _handle
    do_HEAD = _handle
    do_PUT = _handle
    # _proxy already forwards PATCH bodies; expose the verbs so they are proxied
    # instead of being answered with a local 501.
    do_PATCH = _handle
    do_DELETE = _handle
class GuardServer(ThreadingHTTPServer):
    """Threaded HTTP server configured for the guard."""

    # Allow the listening address to be reused when the server is recreated.
    allow_reuse_address = True
    # Element Call fires a burst of token requests per join; keep the accept
    # queue generous so none are dropped.
    request_queue_size = 128
    # Handler threads are daemons, so they never hold up process exit.
    daemon_threads = True
def main():
    """Warn about missing credentials, then serve requests until the process exits."""
    credentials = (LIVEKIT_KEY, LIVEKIT_SECRET, MATRIX_TOKEN)
    if not all(credentials):
        log("WARNING: missing LIVEKIT_KEY/LIVEKIT_SECRET/MATRIX_TOKEN — limit checks will fail open")

    address = (BIND_HOST, BIND_PORT)
    server = GuardServer(address, Handler)
    log(f"listening on {BIND_HOST}:{BIND_PORT} -> upstream {UPSTREAM}")
    server.serve_forever()


if __name__ == "__main__":
    main()