68a6acfa24
Add a fail-open Python sidecar (livekit/voice-limit-guard.py) that fronts lk-jwt-service to enforce per-room voice participant caps for ALL Matrix clients, not just Lotus Chat: - lk-jwt-service moved to :8071 (systemd drop-in), guard owns :8070 so NPM's existing /sfu/get + /get_token proxy targets are unchanged - guard reads io.lotus.voice_limit.max_users (Synapse admin API, cached), forwards to lk-jwt-service, and on an issued token decodes the LiveKit alias + requester, counts distinct Matrix users via LiveKit ListParticipants, and returns 403 when the room is full (rejoins/extra devices allowed) - any error fails open (returns upstream response) so calls never break - systemd/voice-limit-guard.service; README documents ports, setup, revert Also update landing page: voice limit is now server-enforced for all clients. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
310 lines
11 KiB
Python
310 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
voice-limit-guard — hard, cross-client voice channel participant limits.
|
|
|
|
Sits in front of lk-jwt-service (the LiveKit MatrixRTC JWT issuer). Every
|
|
Matrix client (Element, FluffyChat, Lotus Chat, ...) must obtain a token from
|
|
this service before it can join a LiveKit room, so refusing the token here is a
|
|
hard block that applies to ALL clients — not just our own.
|
|
|
|
Flow for token requests (POST /sfu/get and legacy POST /get_token):
|
|
1. Read the request body and extract the Matrix `room` id (clients send the
|
|
*raw* room id here; lk-jwt-service maps it to the hashed LiveKit alias).
|
|
2. Look up `io.lotus.voice_limit` -> max_users for that room via the Synapse
|
|
admin API (cached briefly). 0 / absent => no limit.
|
|
3. Forward the request to lk-jwt-service unchanged and capture its response.
|
|
4. If a limit applies and the token was issued (HTTP 200), decode the JWT to
|
|
read the LiveKit alias (`video.room`) and the requester identity (`sub`),
|
|
then ask LiveKit how many distinct Matrix users are currently in the room.
|
|
- requester already present (rejoin / extra device) -> allow
|
|
- distinct users >= limit -> 403 (blocked)
|
|
- otherwise -> allow
|
|
5. Anything that goes wrong in steps 2-4 FAILS OPEN: the upstream response is
|
|
returned unchanged, so calls keep working even if this guard is degraded.
|
|
|
|
All other requests (OPTIONS preflight, GET, unknown paths) are proxied
|
|
transparently so CORS and health behaviour match lk-jwt-service exactly.
|
|
"""
|
|
|
|
import base64
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import os
|
|
import threading
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
|
|
# --- configuration (from environment) ---------------------------------------
|
|
# Address and port this guard listens on.
BIND_HOST = os.environ.get("GUARD_BIND_HOST", "0.0.0.0")
BIND_PORT = int(os.environ.get("GUARD_BIND_PORT", "8070"))
# Base URL of lk-jwt-service, to which every request is forwarded. The trailing
# slash is stripped (here and below) so request paths can be appended directly.
UPSTREAM = os.environ.get("GUARD_UPSTREAM", "http://127.0.0.1:8071").rstrip("/")
# LiveKit server API base URL, plus the API key/secret used to sign the
# short-lived admin JWT for ListParticipants.
LIVEKIT_API = os.environ.get("LIVEKIT_API", "http://127.0.0.1:7880").rstrip("/")
LIVEKIT_KEY = os.environ.get("LIVEKIT_KEY", "")
LIVEKIT_SECRET = os.environ.get("LIVEKIT_SECRET", "")
# Synapse base URL and the access token sent as a Bearer credential to its
# admin API when reading room state.
SYNAPSE_API = os.environ.get("SYNAPSE_API", "http://127.0.0.1:8008").rstrip("/")
MATRIX_TOKEN = os.environ.get("MATRIX_TOKEN", "")

# POST requests to these paths are token requests and are subject to the limit.
TOKEN_PATHS = ("/sfu/get", "/get_token")
# Room state event type whose content carries `max_users`.
LIMIT_STATE_TYPE = "io.lotus.voice_limit"
# CORS headers attached to the 403 response generated by this guard itself
# (proxied responses relay whatever headers upstream sent).
CORS_HEADERS = {
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Methods": "POST",
    "Access-Control-Allow-Headers": "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token",
}
|
|
|
|
# --- small helpers -----------------------------------------------------------
|
|
|
|
|
|
def log(msg):
    """Write *msg* to stdout with the service prefix, flushing immediately."""
    line = "[voice-limit-guard] " + format(msg)
    print(line, flush=True)
|
|
|
|
|
|
def b64url(data: bytes) -> str:
    """Return *data* as URL-safe base64 text with the `=` padding removed."""
    encoded = base64.urlsafe_b64encode(data).decode()
    return encoded.rstrip("=")
|
|
|
|
|
|
def jwt_payload(token: str):
    """Decode the payload (second dot-separated segment) of a JWT.

    The signature is NOT verified. Returns the parsed JSON value, or None if
    the token cannot be split, base64-decoded or parsed for any reason.
    """
    try:
        segment = token.split(".")[1]
        # Restore the base64 padding that JWTs omit.
        padding = "=" * (-len(segment) % 4)
        raw = base64.urlsafe_b64decode(segment + padding)
        return json.loads(raw)
    except Exception:
        return None
|
|
|
|
|
|
def should_block(limit: int, present_users: set, requester: str) -> bool:
    """Decide whether a token request must be refused (no side effects).

    Returns False when no limit is configured (limit <= 0) or when the
    requester is already among `present_users` (rejoin / extra device).
    Otherwise returns True exactly when the number of distinct present users
    has reached the limit.
    """
    if limit <= 0 or (requester and requester in present_users):
        return False
    occupancy = len(present_users)
    return occupancy >= limit
|
|
|
|
|
|
def matrix_user(identity: str) -> str:
    """Strip the device suffix from a LiveKit identity.

    `@user:domain:DEVICE` becomes `@user:domain`: everything from the second
    colon onwards is dropped. Identities that do not start with `@`, or that
    contain fewer than two colons, are returned unchanged, so each one counts
    as its own distinct user.
    """
    if not identity.startswith("@"):
        return identity
    pieces = identity.split(":", 2)
    if len(pieces) < 3:
        # Zero or one colon: nothing to strip.
        return identity
    localpart, domain = pieces[0], pieces[1]
    return localpart + ":" + domain
|
|
|
|
|
|
# --- LiveKit admin JWT + participant counting --------------------------------
|
|
|
|
|
|
def livekit_admin_token(room: str) -> str:
    """Build an HS256-signed JWT granting admin/list access to *room*.

    The token is issued by LIVEKIT_KEY, signed with LIVEKIT_SECRET, becomes
    valid 10 seconds in the past and expires 120 seconds from now.
    """
    issued_at = int(time.time())
    claims = {
        "iss": LIVEKIT_KEY,
        "exp": issued_at + 120,
        "nbf": issued_at - 10,
        "video": {"roomAdmin": True, "room": room, "roomList": True},
    }
    # header.payload, each segment JSON-encoded then base64url-encoded.
    segments = [
        b64url(json.dumps(part).encode())
        for part in ({"alg": "HS256", "typ": "JWT"}, claims)
    ]
    unsigned = ".".join(segments)
    mac = hmac.new(LIVEKIT_SECRET.encode(), unsigned.encode(), hashlib.sha256)
    return unsigned + "." + b64url(mac.digest())
|
|
|
|
|
|
def livekit_present_users(alias: str):
    """Ask LiveKit who is in room *alias* and return the distinct users.

    Calls the RoomService/ListParticipants Twirp endpoint with a freshly
    signed admin token and reduces every participant identity with
    `matrix_user`, so several devices of one Matrix user collapse into a
    single entry. Network and parsing errors propagate to the caller.
    """
    endpoint = f"{LIVEKIT_API}/twirp/livekit.RoomService/ListParticipants"
    request = urllib.request.Request(
        endpoint,
        data=json.dumps({"room": alias}).encode(),
        headers={
            "Authorization": "Bearer " + livekit_admin_token(alias),
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=5) as resp:
        listing = json.loads(resp.read())

    users = set()
    for participant in listing.get("participants", []):
        users.add(matrix_user(participant.get("identity", "")))
    return users
|
|
|
|
|
|
# --- per-room limit lookup (cached) ------------------------------------------
|
|
|
|
# Short-lived cache of per-room limits so that a burst of token requests for
# the same room does not trigger one Synapse lookup each.
_limit_cache = {}  # room_id -> (max_users, expiry_epoch)
# Guards every read and write of _limit_cache (requests are served on threads).
_limit_cache_lock = threading.Lock()
# Seconds a looked-up limit stays valid before it is fetched again.
_LIMIT_TTL = 10.0
|
|
|
|
|
|
def room_limit(room_id: str) -> int:
    """Return the configured `max_users` for *room_id*, or 0 for "no limit".

    The value is read from the room's `io.lotus.voice_limit` state event
    (empty state key) via the Synapse admin room-state API and cached for
    `_LIMIT_TTL` seconds. Any failure while fetching or parsing is logged and
    treated as "no limit" (fail open); failures are not cached.

    A non-string or empty *room_id* also yields 0 without any lookup. The id
    originates from an untrusted request body, and an unhashable value (for
    example a JSON list) would otherwise raise from the cache lookup, which
    sits outside the fail-open `try` below.
    """
    if not isinstance(room_id, str) or not room_id:
        return 0

    now = time.time()
    with _limit_cache_lock:
        cached = _limit_cache.get(room_id)
        if cached and cached[1] > now:
            return cached[0]

    # The lock is deliberately not held during the HTTP request so that one
    # slow lookup cannot stall requests for other rooms.
    limit = 0
    try:
        url = (
            f"{SYNAPSE_API}/_synapse/admin/v1/rooms/"
            f"{urllib.parse.quote(room_id, safe='')}/state"
        )
        req = urllib.request.Request(url, headers={"Authorization": "Bearer " + MATRIX_TOKEN})
        with urllib.request.urlopen(req, timeout=5) as resp:
            events = json.loads(resp.read()).get("state", [])
        for ev in events:
            if ev.get("type") == LIMIT_STATE_TYPE and ev.get("state_key", "") == "":
                # A missing / null / zero max_users all mean "no limit".
                limit = int(ev.get("content", {}).get("max_users", 0) or 0)
                break
    except Exception as exc:
        # Fail open: treat lookup failure as "no limit".
        log(f"limit lookup failed for {room_id}: {exc}")
        return 0

    with _limit_cache_lock:
        _limit_cache[room_id] = (limit, now + _LIMIT_TTL)
    return limit
|
|
|
|
|
|
# --- HTTP handler ------------------------------------------------------------
|
|
|
|
|
|
class Handler(BaseHTTPRequestHandler):
    """Reverse-proxy request handler that enforces voice limits on token POSTs.

    Every request is forwarded to lk-jwt-service. For POSTs to one of
    TOKEN_PATHS that upstream answers with 200, the room's limit is looked up
    and the issued token is replaced by a 403 when the room is full. Any
    problem inside the limit check fails open: the upstream response is
    returned unchanged.
    """

    protocol_version = "HTTP/1.1"

    def log_message(self, *args):  # silence default request logging
        pass

    def _request_path(self) -> str:
        """Return the request path without query string, percent-decoded.

        `self.path` is the raw request target. Comparing it directly against
        TOKEN_PATHS would let `/sfu/get?x=1` or a percent-encoded spelling of
        the path skip the limit check while still being forwarded upstream.
        """
        return urllib.parse.unquote(self.path.partition("?")[0])

    def _read_body(self) -> bytes:
        """Read the request body as announced by Content-Length.

        A missing or zero length yields b"". A negative or non-numeric length
        also yields b"" and marks the connection for closing: the body cannot
        be framed, and a negative value passed to `rfile.read()` would read
        until EOF.
        """
        raw_length = self.headers.get("Content-Length", 0) or 0
        try:
            length = int(raw_length)
        except (TypeError, ValueError):
            length = -1
        if length < 0:
            # Unread bytes may remain on the socket; do not reuse it.
            self.close_connection = True
            return b""
        return self.rfile.read(length) if length else b""

    @staticmethod
    def _room_from_body(body: bytes) -> str:
        """Extract the Matrix room id from a token request body.

        Returns "" when the body is not a JSON object or the room value is
        not a string, so callers never pass unhashable or non-text values on
        to the limit lookup.
        """
        try:
            data = json.loads(body)
        except (ValueError, RecursionError):
            return ""
        if not isinstance(data, dict):
            return ""
        # NOTE(review): `room` is the key this guard has always read. The
        # `room_id` fallback assumes newer lk-jwt-service request bodies use
        # that key instead - confirm against the deployed version.
        room = data.get("room") or data.get("room_id")
        return room if isinstance(room, str) else ""

    def _proxy(self, body: bytes):
        """Forward the current request to lk-jwt-service and return its response
        as (status, headers_dict, body_bytes).

        Only Content-Type, Accept and Origin are relayed upstream. HTTP error
        statuses from upstream are returned like any other response. If
        upstream cannot be reached at all, a 502 JSON response is synthesised
        instead of letting the exception abort the handler.
        """
        headers = {}
        for name in ("Content-Type", "Accept", "Origin"):
            value = self.headers.get(name)
            if value:
                headers[name] = value
        req = urllib.request.Request(
            UPSTREAM + self.path,
            data=body if self.command in ("POST", "PUT", "PATCH") else None,
            headers=headers,
            method=self.command,
        )
        try:
            with urllib.request.urlopen(req, timeout=15) as resp:
                return resp.status, dict(resp.headers), resp.read()
        except urllib.error.HTTPError as exc:
            return exc.code, dict(exc.headers), exc.read()
        except OSError as exc:
            # URLError (connection refused, DNS failure) and socket timeouts
            # are OSError subclasses; HTTPError was handled above.
            log(f"upstream request failed: {exc}")
            error_body = json.dumps(
                {"errcode": "M_UNKNOWN", "error": "Token service unavailable."}
            ).encode()
            error_headers = dict(CORS_HEADERS)
            error_headers["Content-Type"] = "application/json"
            return 502, error_headers, error_body

    def _send(self, status: int, headers: dict, body: bytes):
        """Write a complete response: status line, headers, then body."""
        self.send_response(status)
        # send_response() already emits Server and Date; relaying them too would
        # produce duplicates. Content-Length is recomputed below.
        skip = ("transfer-encoding", "content-length", "connection", "date", "server")
        for key, value in headers.items():
            if key.lower() in skip:
                continue
            self.send_header(key, value)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        if body and self.command != "HEAD":
            self.wfile.write(body)

    def _send_blocked(self):
        """Send the 403 "voice channel is full" JSON response with CORS headers."""
        body = json.dumps(
            {"errcode": "M_FORBIDDEN", "error": "This voice channel is full."}
        ).encode()
        headers = dict(CORS_HEADERS)
        headers["Content-Type"] = "application/json"
        self._send(403, headers, body)

    def _handle(self):
        """Shared entry point for all HTTP verbs."""
        body = self._read_body()

        # Only token-issuing POSTs are subject to the limit check.
        if not (self.command == "POST" and self._request_path() in TOKEN_PATHS):
            self._send(*self._proxy(body))
            return

        status, headers, resp_body = self._proxy(body)

        # Upstream didn't issue a token -> pass through unchanged. The limit
        # lookup happens only after this point, so requests that upstream
        # rejects never trigger a Synapse admin API call.
        if status != 200:
            self._send(status, headers, resp_body)
            return

        room_id = self._room_from_body(body)
        limit = room_limit(room_id) if room_id else 0
        if limit <= 0:
            self._send(status, headers, resp_body)
            return

        # Limit applies and a token was issued - decide whether to allow it.
        # Only the decision is made inside the try; the response is written
        # afterwards so a write error can never cause a second response.
        blocked = False
        try:
            payload = json.loads(resp_body)
            token = payload.get("jwt", "")
            claims = jwt_payload(token) or {}
            alias = (claims.get("video") or {}).get("room", "")
            requester = matrix_user(claims.get("sub", ""))
            if not alias:
                raise ValueError("no alias in issued token")

            present = livekit_present_users(alias)
            if should_block(limit, present, requester):
                log(f"blocked {requester or '?'} from {room_id}: {len(present)}/{limit}")
                blocked = True
        except Exception as exc:
            # Fail open: never break a join because the guard had a problem.
            log(f"limit check error for {room_id}: {exc}")

        if blocked:
            self._send_blocked()
        else:
            self._send(status, headers, resp_body)

    # Map the HTTP verbs we care about onto the shared handler.
    do_GET = _handle
    do_POST = _handle
    do_OPTIONS = _handle
    do_HEAD = _handle
    do_PUT = _handle
    do_PATCH = _handle
    do_DELETE = _handle
|
|
|
|
|
|
class GuardServer(ThreadingHTTPServer):
    """Threaded HTTP server with settings tuned for bursty token requests."""

    allow_reuse_address = True
    daemon_threads = True
    # Element Call sends several token requests in quick succession for a
    # single join; a large listen backlog keeps them from being dropped.
    request_queue_size = 128
|
|
|
|
|
|
def main():
    """Warn about missing credentials, then serve requests until terminated."""
    have_credentials = all((LIVEKIT_KEY, LIVEKIT_SECRET, MATRIX_TOKEN))
    if not have_credentials:
        log("WARNING: missing LIVEKIT_KEY/LIVEKIT_SECRET/MATRIX_TOKEN — limit checks will fail open")

    address = (BIND_HOST, BIND_PORT)
    server = GuardServer(address, Handler)
    log(f"listening on {BIND_HOST}:{BIND_PORT} -> upstream {UPSTREAM}")
    server.serve_forever()


if __name__ == "__main__":
    main()
|