Files
matrix/livekit/voice-limit-guard.py
T
jared a06f2c662a feat(livekit-guard): enforce per-room call permissions (screenshare/camera)
Extend voice-limit-guard to enforce a per-room publish-source policy
(io.lotus.room_quality allow_screenshare/allow_camera) for ALL Matrix clients,
alongside the existing participant limit.

- At token issue, re-sign the LiveKit JWT's canPublishSources to drop forbidden
  sources (microphone always kept). Verifies our own secret signed the token
  first and fails open on mismatch, so a secret drift can never mint a token the
  SFU rejects. Limit check and source policy are independent (one's outage can't
  skip the other).
- Live (mid-call) enforcement: a background reconcile loop calls LiveKit
  UpdateParticipant to revoke a forbidden source from participants who joined
  before the policy changed -- which unpublishes their in-progress
  screenshare/camera server-side within ~3s and blocks re-publish. Only removes
  sources (never grants), preserves other permission flags, fails open, and runs
  as a daemon thread that cannot crash or block token issuance.
- Endpoint-specific room-id extraction (/get_token->room_id, /sfu/get->room) so
  a client sending both keys can't get a different room's policy applied.
- Auto-deploy the guard on LXC 151 (py_compile-gated, backup + rollback).
- Unit tests: JWT re-sign/verify + tamper, secret-mismatch, source narrowing,
  reconcile (never-grant / preserve-flags / disable-on-empty), fail-open.

Numeric bitrate/fps caps are NOT server-enforceable on an SFU (LiveKit forwards,
never transcodes) and remain a Lotus-client-cooperative setting; the
screenshare/camera permission is the hard cross-client lever.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-30 22:34:34 -04:00

665 lines
27 KiB
Python

#!/usr/bin/env python3
"""
voice-limit-guard — hard, cross-client LiveKit room policy at token issue.
Sits in front of lk-jwt-service (the LiveKit MatrixRTC JWT issuer). Every
Matrix client (Element, FluffyChat, Lotus Chat, ...) must obtain a token from
this service before it can join a LiveKit room, so decisions made here are HARD
and apply to ALL clients — not just our own.
Two per-room policies are enforced, both read from Matrix room state via the
Synapse admin API (cached briefly):
1. Participant limit (`io.lotus.voice_limit` -> max_users). If the room is
full, the token is REFUSED (403), so the client cannot join.
2. Publish-source policy (`io.lotus.room_quality` -> allow_screenshare /
allow_camera). LiveKit is a pure SFU and CANNOT cap a publisher's bitrate/
framerate server-side (no such field exists in the grant, config, or admin
API — that stays a client-cooperative setting). But the JWT's
`video.canPublishSources` IS enforced by the SFU for every client, so we
can hard-block screenshare and/or camera per room. Because this guard holds
the LiveKit signing secret, it decodes the issued token, drops the
forbidden sources, and re-signs it before returning.
Enforced in TWO places so a policy applies both to new AND existing calls:
- at token issue (above), for anyone joining/rejoining, and
- LIVE, by a background reconcile loop (every GUARD_RECONCILE_INTERVAL s)
that calls LiveKit `UpdateParticipant` to narrow `canPublishSources`
for participants who were already connected when the policy changed —
which unpublishes their forbidden live track server-side for all
clients and blocks re-publish. It only ever REMOVES forbidden sources
(never grants), and no-ops once a room is compliant.
Flow for token requests (POST /sfu/get and legacy POST /get_token):
1. Read the request body and extract the Matrix room id (`room` on the legacy
endpoint, `room_id` on the newer one).
2. Forward the request to lk-jwt-service unchanged and capture its response.
3. If a token was issued (HTTP 200), look up the room policy:
- over the participant limit -> 403 (blocked).
- a publish-source restriction applies -> re-sign the JWT with a
narrowed `video.canPublishSources`.
Otherwise pass the token through untouched.
4. Anything that goes wrong FAILS OPEN: the upstream response is returned
unchanged, so calls keep working even if this guard is degraded.
All other requests (OPTIONS preflight, GET, unknown paths) are proxied
transparently so CORS and health behaviour match lk-jwt-service exactly.
"""
import base64
import hashlib
import hmac
import json
import os
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
# --- configuration (from environment) ---------------------------------------
BIND_HOST = os.environ.get("GUARD_BIND_HOST", "0.0.0.0")
BIND_PORT = int(os.environ.get("GUARD_BIND_PORT", "8070"))
UPSTREAM = os.environ.get("GUARD_UPSTREAM", "http://127.0.0.1:8071").rstrip("/")
LIVEKIT_API = os.environ.get("LIVEKIT_API", "http://127.0.0.1:7880").rstrip("/")
LIVEKIT_KEY = os.environ.get("LIVEKIT_KEY", "")
LIVEKIT_SECRET = os.environ.get("LIVEKIT_SECRET", "")
SYNAPSE_API = os.environ.get("SYNAPSE_API", "http://127.0.0.1:8008").rstrip("/")
MATRIX_TOKEN = os.environ.get("MATRIX_TOKEN", "")
# Live (mid-call) enforcement: a background loop that revokes forbidden sources
# from participants who joined BEFORE a policy tightened. New joins are already
# handled by the JWT re-sign at issue time; this closes the mid-call-flip gap.
RECONCILE_ENABLED = os.environ.get("GUARD_RECONCILE", "1") not in ("0", "false", "")
# Floor the interval so a misconfigured 0 can't busy-loop the admin/Synapse APIs.
RECONCILE_INTERVAL = max(0.5, float(os.environ.get("GUARD_RECONCILE_INTERVAL", "3.0")))
TOKEN_PATHS = ("/sfu/get", "/get_token")
LIMIT_STATE_TYPE = "io.lotus.voice_limit"
QUALITY_STATE_TYPE = "io.lotus.room_quality"
# JWT grant source keys (lowercase — livekit/protocol auth.VideoGrant.canPublishSources).
SOURCE_MIC = "microphone"
SOURCE_CAM = "camera"
SOURCE_SCREEN = "screen_share"
SOURCE_SCREEN_AUDIO = "screen_share_audio"
# LiveKit admin-API / ListParticipants source enum names (UPPERCASE — protojson
# of livekit.TrackSource). Distinct from the JWT keys above.
API_SOURCE_CAM = "CAMERA"
API_SOURCE_MIC = "MICROPHONE"
API_SOURCE_SCREEN = "SCREEN_SHARE"
API_SOURCE_SCREEN_AUDIO = "SCREEN_SHARE_AUDIO"
ALL_API_SOURCES = (API_SOURCE_CAM, API_SOURCE_MIC, API_SOURCE_SCREEN, API_SOURCE_SCREEN_AUDIO)
CORS_HEADERS = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "POST",
"Access-Control-Allow-Headers": "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token",
}
# --- small helpers -----------------------------------------------------------
def log(msg):
print(f"[voice-limit-guard] {msg}", flush=True)
def b64url(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def b64url_decode(seg: str) -> bytes:
"""Decode a base64url segment that may have had its `=` padding stripped."""
seg += "=" * (-len(seg) % 4)
return base64.urlsafe_b64decode(seg)
def jwt_payload(token: str):
"""Best-effort decode of a JWT payload without verification."""
try:
return json.loads(b64url_decode(token.split(".")[1]))
except Exception:
return None
def room_id_from_request(path: str, data: dict) -> str:
"""The room id field lk-jwt-service reads for this endpoint. Endpoint-specific
(`/get_token` -> room_id, `/sfu/get` -> room) so a client sending BOTH keys
can't make us enforce a different room's policy than the token is minted for.
"""
if path == "/get_token":
return data.get("room_id") or ""
return data.get("room") or ""
def should_block(limit: int, present_users: set, requester: str) -> bool:
"""Pure decision: should this token be refused?
- limit <= 0 -> never block (no limit configured)
- requester already present -> never block (rejoin / extra device)
- distinct users >= limit -> block
"""
if limit <= 0:
return False
if requester and requester in present_users:
return False
return len(present_users) >= limit
def allowed_sources(policy: dict):
"""Pure decision: the narrowed `canPublishSources` list, or None.
Microphone is ALWAYS allowed (this is a voice call). Camera and screenshare
are dropped when the room policy forbids them. Returns None when everything
is allowed, so the caller can skip re-signing the token entirely.
"""
allow_cam = policy.get("allow_camera", True)
allow_screen = policy.get("allow_screenshare", True)
if allow_cam and allow_screen:
return None
sources = [SOURCE_MIC]
if allow_cam:
sources.append(SOURCE_CAM)
if allow_screen:
sources.extend((SOURCE_SCREEN, SOURCE_SCREEN_AUDIO))
return sources
def verify_jwt_sig(token: str, secret: str) -> bool:
"""True if `token` is HS256-signed with `secret`.
Used to confirm OUR LiveKit secret actually signed the token lk-jwt-service
issued before we re-sign it. If the secrets have drifted, re-signing would
mint a token the SFU rejects (a fail-CLOSED break), so the caller skips the
restriction and passes the original token through instead.
"""
try:
header_b64, payload_b64, sig = token.split(".")
expected = b64url(
hmac.new(secret.encode(), f"{header_b64}.{payload_b64}".encode(), hashlib.sha256).digest()
)
return hmac.compare_digest(expected, sig)
except Exception:
return False
def resign_jwt(token: str, secret: str, sources: list) -> str:
"""Re-sign the issued JWT with a restricted `video.canPublishSources`.
Preserves the original JWT header segment and every claim except
`video.canPublishSources` (so `sub`, `video.room`, `exp`, etc. are
unchanged), then HS256-signs with the shared LiveKit secret. lk-jwt-service
signs with the same key, so the SFU accepts our signature identically.
"""
header_b64, payload_b64, _sig = token.split(".")
payload = json.loads(b64url_decode(payload_b64))
video = payload.get("video")
if not isinstance(video, dict):
raise ValueError("issued token has no video grant")
video["canPublishSources"] = sources
new_payload_b64 = b64url(json.dumps(payload, separators=(",", ":")).encode())
signing_input = f"{header_b64}.{new_payload_b64}".encode()
sig = b64url(hmac.new(secret.encode(), signing_input, hashlib.sha256).digest())
return f"{header_b64}.{new_payload_b64}.{sig}"
def matrix_user(identity: str) -> str:
"""Reduce a LiveKit identity (`@user:domain:DEVICE`) to `@user:domain`.
Non-Matrix identities (e.g. hashed federated identities) are returned
unchanged so they each count as a distinct user.
"""
if not identity.startswith("@"):
return identity
first = identity.find(":")
if first == -1:
return identity
second = identity.find(":", first + 1)
return identity if second == -1 else identity[:second]
# --- LiveKit admin JWT + participant counting --------------------------------
def livekit_admin_token(room: str) -> str:
now = int(time.time())
header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
payload = b64url(
json.dumps(
{
"iss": LIVEKIT_KEY,
"exp": now + 120,
"nbf": now - 10,
"video": {"roomAdmin": True, "room": room, "roomList": True},
}
).encode()
)
signing_input = f"{header}.{payload}".encode()
sig = b64url(hmac.new(LIVEKIT_SECRET.encode(), signing_input, hashlib.sha256).digest())
return f"{header}.{payload}.{sig}"
def _livekit_admin_post(alias: str, method: str, body: dict) -> bytes:
"""POST to a LiveKit RoomService Twirp method with a fresh admin token."""
req = urllib.request.Request(
f"{LIVEKIT_API}/twirp/livekit.RoomService/{method}",
data=json.dumps(body).encode(),
headers={
"Authorization": "Bearer " + livekit_admin_token(alias),
"Content-Type": "application/json",
},
method="POST",
)
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.read()
def livekit_list_participants(alias: str) -> list:
"""Return the raw list of ParticipantInfo dicts in the LiveKit room."""
data = json.loads(_livekit_admin_post(alias, "ListParticipants", {"room": alias}))
parts = data.get("participants", [])
return parts if isinstance(parts, list) else []
def livekit_present_users(alias: str):
"""Return the set of distinct Matrix users currently in the LiveKit room."""
return {matrix_user(p.get("identity", "")) for p in livekit_list_participants(alias)}
def livekit_update_participant(alias: str, identity: str, permission: dict) -> None:
"""Replace a participant's ParticipantPermission (livekit UpdateParticipant).
Narrowing `canPublishSources` unpublishes the participant's live tracks of
the removed sources server-side (for ALL clients) and blocks re-publish.
A participant who has already left yields a Twirp not_found (HTTP 404),
which is benign here.
"""
try:
_livekit_admin_post(
alias,
"UpdateParticipant",
{"room": alias, "identity": identity, "permission": permission},
)
except urllib.error.HTTPError as exc:
if exc.code != 404:
log(f"update_participant {identity} in {alias} -> HTTP {exc.code}")
# --- per-room policy lookup (cached) -----------------------------------------
# room_id -> (state_dict, fetched_at_epoch). One Synapse /state fetch serves
# both the participant-limit and publish-policy checks. Callers pass max_age so
# the token path can tolerate 10s staleness while the reconcile loop reads fresh.
_state_cache = {}
_state_cache_lock = threading.Lock()
_STATE_TTL = 10.0
_DEFAULT_STATE = {"max_users": 0, "allow_screenshare": True, "allow_camera": True}
def room_state(room_id: str, max_age: float = _STATE_TTL) -> dict:
"""Fetch the room's Lotus policy (`io.lotus.voice_limit` +
`io.lotus.room_quality`) from Synapse admin state, cached briefly.
`max_age` is the oldest cached value the caller will accept (seconds). The
token path uses the default 10s (dedupes Element Call's per-join burst); the
reconcile loop passes a small value so a policy change is seen quickly.
Fails OPEN: on any error returns the permissive defaults (no limit, all
sources allowed) so a Synapse hiccup never blocks or restricts a call.
"""
now = time.time()
with _state_cache_lock:
cached = _state_cache.get(room_id)
if cached and (now - cached[1]) < max_age:
return cached[0]
state = dict(_DEFAULT_STATE)
try:
url = (
f"{SYNAPSE_API}/_synapse/admin/v1/rooms/"
f"{urllib.parse.quote(room_id, safe='')}/state"
)
req = urllib.request.Request(url, headers={"Authorization": "Bearer " + MATRIX_TOKEN})
with urllib.request.urlopen(req, timeout=5) as resp:
events = json.loads(resp.read()).get("state", [])
for ev in events:
if ev.get("state_key", "") != "":
continue
etype = ev.get("type")
content = ev.get("content", {}) or {}
if etype == LIMIT_STATE_TYPE:
state["max_users"] = int(content.get("max_users", 0) or 0)
elif etype == QUALITY_STATE_TYPE:
# Only an explicit `false` forbids; absent/other -> allowed.
state["allow_screenshare"] = content.get("allow_screenshare", True) is not False
state["allow_camera"] = content.get("allow_camera", True) is not False
except Exception as exc:
log(f"room state lookup failed for {room_id}: {exc}")
return dict(_DEFAULT_STATE)
with _state_cache_lock:
# Opportunistically drop stale entries so a long-lived server that sees
# many distinct rooms doesn't grow the cache without bound.
if len(_state_cache) > 512:
for key in [k for k, v in _state_cache.items() if (now - v[1]) >= _STATE_TTL]:
del _state_cache[key]
_state_cache[room_id] = (state, now)
return state
# --- live (mid-call) enforcement: reconcile loop -----------------------------
# LiveKit hashed-alias -> Matrix room id, learned at token issue. Lets the
# reconcile loop map an active LiveKit room back to its Matrix policy even for
# participants who joined before a policy change.
_alias_to_room = {}
_alias_lock = threading.Lock()
def remember_room(alias: str, room_id: str) -> None:
# No consumer when the reconcile loop is off, so don't accumulate the map.
if not RECONCILE_ENABLED or not alias or not room_id:
return
with _alias_lock:
# Soft cap; the reconcile loop prunes ended rooms each sweep.
if len(_alias_to_room) > 4096 and alias not in _alias_to_room:
return
_alias_to_room[alias] = room_id
def forget_room(alias: str) -> None:
with _alias_lock:
_alias_to_room.pop(alias, None)
def forbidden_sources(policy: dict) -> set:
"""Pure: the set of UPPERCASE API source names this room forbids (may be
empty). Only an explicit False forbids."""
forbidden = set()
if policy.get("allow_screenshare", True) is False:
forbidden.update((API_SOURCE_SCREEN, API_SOURCE_SCREEN_AUDIO))
if policy.get("allow_camera", True) is False:
forbidden.add(API_SOURCE_CAM)
return forbidden
def reconcile_publish_sources(current, forbidden: set):
"""Pure: given a participant's current `canPublishSources` (UPPERCASE list;
[] means ALL allowed) and the forbidden set, return the new sources list to
enforce, or None if the participant is already compliant.
Never returns a wider set than `current` (we only REMOVE forbidden sources,
never grant). An empty return means "no permitted source remains" — the
caller should set canPublish=False instead of sending an empty list (which
LiveKit reads as 'all allowed').
"""
effective = set(current) if current else set(ALL_API_SOURCES)
if effective.isdisjoint(forbidden):
return None # nothing forbidden is present -> already compliant
return sorted(effective - forbidden)
def reconcile_participant(alias: str, participant: dict, forbidden: set) -> bool:
"""Enforce the forbidden-source policy on one live participant. Returns True
if an UpdateParticipant call was issued."""
perm = participant.get("permission") or {}
if not perm.get("canPublish", False):
return False # publishes nothing -> nothing to revoke
current = perm.get("canPublishSources") or []
desired = reconcile_publish_sources(current, forbidden)
if desired is None:
return False # already compliant
identity = participant.get("identity")
if not identity:
return False
# Full REPLACE: copy the existing permission and change only the publish
# fields, so canSubscribe / canPublishData / etc. are preserved.
new_perm = dict(perm)
if desired:
new_perm["canPublish"] = True
new_perm["canPublishSources"] = desired
else:
new_perm["canPublish"] = False
livekit_update_participant(alias, identity, new_perm)
log(f"revoked forbidden sources from {identity} in {alias}: keep {desired or '[]'}")
return True
def reconcile_room(alias: str, room_id: str) -> None:
"""Enforce the current publish policy on every participant in one room.
Drops the room from the alias map when it is no longer active."""
# Read the policy fresh-ish so a flip is picked up within ~one interval.
policy = room_state(room_id, max_age=RECONCILE_INTERVAL)
forbidden = forbidden_sources(policy)
if not forbidden:
return # nothing restricted; leave permissions untouched
try:
participants = livekit_list_participants(alias)
except urllib.error.HTTPError as exc:
if exc.code == 404:
forget_room(alias) # room gone
else:
log(f"reconcile list_participants {alias} -> HTTP {exc.code}")
return
except Exception as exc:
log(f"reconcile list_participants error for {alias}: {exc}")
return
if not participants:
# Empty, but the room may still exist (LiveKit keeps it until
# empty_timeout). Don't forget on an empty read — only a 404 (room gone)
# prunes — so a transient empty/race can't drop mid-call enforcement.
return
for p in participants:
try:
reconcile_participant(alias, p, forbidden)
except Exception as exc:
log(f"reconcile participant error in {alias}: {exc}")
def reconcile_all() -> None:
with _alias_lock:
items = list(_alias_to_room.items())
for alias, room_id in items:
reconcile_room(alias, room_id)
def reconcile_loop() -> None:
log(f"reconcile loop started (interval {RECONCILE_INTERVAL}s)")
while True:
time.sleep(RECONCILE_INTERVAL)
try:
reconcile_all()
except Exception as exc: # never let the loop die
log(f"reconcile sweep error: {exc}")
# --- HTTP handler ------------------------------------------------------------
class Handler(BaseHTTPRequestHandler):
protocol_version = "HTTP/1.1"
def log_message(self, *args): # silence default request logging
pass
def _read_body(self) -> bytes:
length = int(self.headers.get("Content-Length", 0) or 0)
return self.rfile.read(length) if length else b""
def _proxy(self, body: bytes):
"""Forward the current request to lk-jwt-service and return its response
as (status, headers_dict, body_bytes)."""
headers = {}
if self.headers.get("Content-Type"):
headers["Content-Type"] = self.headers["Content-Type"]
if self.headers.get("Accept"):
headers["Accept"] = self.headers["Accept"]
if self.headers.get("Origin"):
headers["Origin"] = self.headers["Origin"]
req = urllib.request.Request(
UPSTREAM + self.path,
data=body if self.command in ("POST", "PUT", "PATCH") else None,
headers=headers,
method=self.command,
)
try:
with urllib.request.urlopen(req, timeout=15) as resp:
return resp.status, dict(resp.headers), resp.read()
except urllib.error.HTTPError as exc:
return exc.code, dict(exc.headers), exc.read()
def _send(self, status: int, headers: dict, body: bytes):
self.send_response(status)
# send_response() already emits Server and Date; relaying them too would
# produce duplicates. Content-Length is recomputed below.
# content-encoding is stripped because we may re-serialize the body
# (a plaintext JSON body must never carry a stale gzip/deflate header).
skip = (
"transfer-encoding",
"content-length",
"content-encoding",
"connection",
"date",
"server",
)
for key, value in headers.items():
if key.lower() in skip:
continue
self.send_header(key, value)
self.send_header("Content-Length", str(len(body)))
self.end_headers()
if body and self.command != "HEAD":
self.wfile.write(body)
def _send_blocked(self):
body = json.dumps(
{"errcode": "M_FORBIDDEN", "error": "This voice channel is full."}
).encode()
headers = dict(CORS_HEADERS)
headers["Content-Type"] = "application/json"
self._send(403, headers, body)
def _handle(self):
body = self._read_body()
# Only token-issuing POSTs are subject to policy.
if not (self.command == "POST" and self.path in TOKEN_PATHS):
self._send(*self._proxy(body))
return
# Extract the room id using the SAME field the endpoint's lk-jwt-service
# handler reads, so a client sending BOTH keys can't make us enforce a
# different room's policy than the token is minted for (bypass vector):
# /get_token (new) -> room_id
# /sfu/get (legacy) -> room
try:
room_id = room_id_from_request(self.path, json.loads(body))
except Exception:
room_id = ""
status, headers, resp_body = self._proxy(body)
# Only a successfully-issued token can be gated or modified.
if status != 200:
self._send(status, headers, resp_body)
return
if not room_id:
# A token was issued but we couldn't identify the room, so no policy
# is applied. Log it so silent enforcement-loss (e.g. a request
# schema change) is observable rather than invisible.
log("issued token with unparseable room id — no policy enforced")
self._send(status, headers, resp_body)
return
# Decode the issued token once (cheap). Needed for policy enforcement AND
# to learn this room's LiveKit alias for the reconcile map, so a policy
# set AFTER this participant joins can still be enforced on them. If the
# token can't be parsed, fail open.
try:
payload = json.loads(resp_body)
token = payload.get("jwt", "")
claims = jwt_payload(token) or {}
alias = (claims.get("video") or {}).get("room", "")
requester = matrix_user(claims.get("sub", ""))
except Exception as exc:
log(f"could not parse issued token for {room_id}: {exc}")
self._send(status, headers, resp_body)
return
remember_room(alias, room_id)
state = room_state(room_id)
limit = state["max_users"]
sources = allowed_sources(state)
# Fast path: nothing to enforce at join time for this room.
if limit <= 0 and sources is None:
self._send(status, headers, resp_body)
return
# (1) Hard participant limit — refuse the token entirely. Isolated so a
# LiveKit-admin outage here cannot skip the publish-source policy below.
if limit > 0 and alias:
try:
present = livekit_present_users(alias)
if should_block(limit, present, requester):
log(f"blocked {requester or '?'} from {room_id}: {len(present)}/{limit}")
self._send_blocked()
return
except Exception as exc:
# Fail open on the limit only; still apply the policy below.
log(f"limit check error for {room_id}: {exc}")
# (2) Hard publish-source policy — re-sign with a narrowed
# canPublishSources so the SFU forbids camera/screenshare for ALL clients
# (numeric bitrate/fps caps are not SFU-enforceable). Needs no LiveKit
# call, so it is independent of (1). Verify OUR secret actually signed
# the token first: on a secret mismatch, re-signing would mint a token
# the SFU rejects (fail-CLOSED), so skip and pass the original through.
if sources is not None and token:
try:
if verify_jwt_sig(token, LIVEKIT_SECRET):
payload["jwt"] = resign_jwt(token, LIVEKIT_SECRET, sources)
resp_body = json.dumps(payload).encode()
log(f"restricted publish sources for {requester or '?'} in {room_id}: {sources}")
else:
log(f"LIVEKIT_SECRET mismatch — skipping source policy for {room_id} (fail open)")
except Exception as exc:
log(f"publish-source policy error for {room_id}: {exc}")
self._send(status, headers, resp_body)
# Map the HTTP verbs we care about onto the shared handler.
do_GET = _handle
do_POST = _handle
do_OPTIONS = _handle
do_HEAD = _handle
do_PUT = _handle
class GuardServer(ThreadingHTTPServer):
daemon_threads = True
# Element Call fires a burst of token requests per join; keep the accept
# queue generous so none are dropped.
request_queue_size = 128
allow_reuse_address = True
def main():
if not (LIVEKIT_KEY and LIVEKIT_SECRET and MATRIX_TOKEN):
log("WARNING: missing LIVEKIT_KEY/LIVEKIT_SECRET/MATRIX_TOKEN — checks will fail open")
# Live enforcement loop (revokes forbidden sources from participants who
# joined before a policy change). Daemon thread — never blocks shutdown, and
# its errors can't take down token issuance.
if RECONCILE_ENABLED and LIVEKIT_KEY and LIVEKIT_SECRET and MATRIX_TOKEN:
threading.Thread(target=reconcile_loop, name="reconcile", daemon=True).start()
elif not RECONCILE_ENABLED:
log("reconcile loop disabled (GUARD_RECONCILE=0)")
server = GuardServer((BIND_HOST, BIND_PORT), Handler)
log(f"listening on {BIND_HOST}:{BIND_PORT} -> upstream {UPSTREAM}")
server.serve_forever()
if __name__ == "__main__":
main()