From a06f2c662a150121759a48c2cc5ac9c35a994e59 Mon Sep 17 00:00:00 2001 From: Jared Vititoe Date: Tue, 30 Jun 2026 22:34:34 -0400 Subject: [PATCH] feat(livekit-guard): enforce per-room call permissions (screenshare/camera) Extend voice-limit-guard to enforce a per-room publish-source policy (io.lotus.room_quality allow_screenshare/allow_camera) for ALL Matrix clients, alongside the existing participant limit. - At token issue, re-sign the LiveKit JWT's canPublishSources to drop forbidden sources (microphone always kept). Verifies our own secret signed the token first and fails open on mismatch, so a secret drift can never mint a token the SFU rejects. Limit check and source policy are independent (one's outage can't skip the other). - Live (mid-call) enforcement: a background reconcile loop calls LiveKit UpdateParticipant to revoke a forbidden source from participants who joined before the policy changed -- which unpublishes their in-progress screenshare/camera server-side within ~3s and blocks re-publish. Only removes sources (never grants), preserves other permission flags, fails open, and runs as a daemon thread that cannot crash or block token issuance. - Endpoint-specific room-id extraction (/get_token->room_id, /sfu/get->room) so a client sending both keys can't get a different room's policy applied. - Auto-deploy the guard on LXC 151 (py_compile-gated, backup + rollback). - Unit tests: JWT re-sign/verify + tamper, secret-mismatch, source narrowing, reconcile (never-grant / preserve-flags / disable-on-empty), fail-open. Numeric bitrate/fps caps are NOT server-enforceable on an SFU (LiveKit forwards, never transcodes) and remain a Lotus-client-cooperative setting; the screenshare/camera permission is the hard cross-client lever. Co-Authored-By: Claude Opus 4.8 --- README.md | 78 +++-- deploy/lxc151-hookshot.sh | 45 ++- livekit/test_voice_limit_guard.py | 364 ++++++++++++++++++++++ livekit/voice-limit-guard.py | 483 ++++++++++++++++++++++++++---- 4 files changed, 873 insertions(+), 97 deletions(-) create mode 100644 livekit/test_voice_limit_guard.py diff --git a/README.md b/README.md index 54bbf69..47fc4fa 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ matrix/ - LiveKit config: `/etc/livekit/config.yaml` - LiveKit service: `livekit-server.service` - lk-jwt-service: `lk-jwt-service.service` (now binds `:8071` via drop-in `/etc/systemd/system/lk-jwt-service.service.d/override.conf`; serves JWT tokens for MatrixRTC at `/sfu/get` and legacy `/get_token`) -- voice-limit-guard: `voice-limit-guard.service` (binds `:8070`, fronts lk-jwt-service — enforces hard per-room voice participant limits for ALL clients; script `/opt/voice-limit-guard/voice-limit-guard.py`) — see [Voice Channel Limits](#voice-channel-limits) +- voice-limit-guard: `voice-limit-guard.service` (binds `:8070`, fronts lk-jwt-service — enforces hard per-room voice participant limits **and publish permissions (screenshare/camera via JWT re-signing)** for ALL clients; script `/opt/voice-limit-guard/voice-limit-guard.py`) — see [Voice Channel Limits & Call Permissions](#voice-channel-limits--call-permissions) - Hookshot: `/opt/hookshot/`, service: `matrix-hookshot.service` - Hookshot config: `/opt/hookshot/config.yml` - Hookshot registration: `/etc/matrix-synapse/hookshot-registration.yaml` @@ -129,7 +129,7 @@ Pushes to `main` on `LotusGuild/matrix` automatically deploy to the relevant LXC | LXC | Service | IP | Port | Deploys When Changed | |-----|---------|----|----|----------------------| -| 151 | matrix/hookshot | 10.10.10.29 | **9500** | `hookshot/*.js`, `systemd/livekit-server.service` | +| 151 | matrix/hookshot | 10.10.10.29 | **9500** | `hookshot/*.js`, `systemd/livekit-server.service`, `livekit/voice-limit-guard.py`, `systemd/voice-limit-guard.service`, `matrixbot/*` | | 106 | cinny | 10.10.10.6 | 9000 | `cinny/config.json`, `cinny/upstream-check.sh`, `cinny/lotus-build.sh`, `deploy/hooks-lxc106.json`, `systemd/cinny-upstream-check.cron` | | 139 | landing/NPM | 10.10.10.27 | 9000 | `landing/index.html` | | 110 | draupnir | 10.10.10.24 | 9000 | `draupnir/production.yaml` | @@ -141,6 +141,7 @@ Pushes to `main` on `LotusGuild/matrix` automatically deploy to the relevant LXC **LXC 151 — hookshot/livekit:** - `hookshot/*.js` changed → runs `hookshot/deploy.sh` (pushes transform functions to Matrix room state via API, requires `MATRIX_TOKEN` in `/etc/matrix-deploy.env`) - `systemd/livekit-server.service` changed → copies file, `daemon-reload`, sets `/run/livekit-restart-pending` flag (actual restart deferred — see Livekit Graceful Restart below) +- `livekit/voice-limit-guard.py` / `systemd/voice-limit-guard.service` changed → `py_compile`-validates, installs to `/opt/voice-limit-guard/`, `daemon-reload` (if unit changed), and restarts `voice-limit-guard` (restart only affects joins in a ~1s window; established calls talk directly to livekit-server, so no call is dropped) **LXC 106 — cinny:** - `cinny/config.json` → copies to `/var/www/html/config.json` @@ -188,34 +189,43 @@ Killing livekit-server while a call is active drops everyone. Instead: --- -## Voice Channel Limits +## Voice Channel Limits & Call Permissions -Per-room voice participant caps are enforced **server-side for every client** (Element, FluffyChat, Lotus Chat, …), not just our own web client. +Per-room voice **participant caps** and **publish permissions** (screenshare / camera) are enforced **server-side for every client** (Element, FluffyChat, Lotus Chat, …), not just our own web client. Both are enforced by the same `voice-limit-guard` sidecar (`livekit/voice-limit-guard.py`), which fronts lk-jwt-service at token issue. **How it works** -Every Matrix client must fetch a LiveKit JWT from lk-jwt-service before it can join a call. `voice-limit-guard` (a small fail-open Python sidecar, `livekit/voice-limit-guard.py` in this repo) sits in front of that service: +Every Matrix client must fetch a LiveKit JWT from lk-jwt-service before it can join a call. `voice-limit-guard` (a small fail-open Python sidecar) sits in front of that service: - lk-jwt-service was moved off `:8070` to `:8071` (systemd drop-in). The guard now owns `:8070`, so NPM's existing `/sfu/get` + `/get_token` proxy targets are unchanged. -- On each token request the guard reads `io.lotus.voice_limit` → `max_users` for the room (Synapse admin API, cached 10 s). `0` / absent = no limit. -- It forwards the request to lk-jwt-service, and if a token is issued it decodes the JWT to get the LiveKit alias (`video.room`) + requester identity (`sub`), then asks LiveKit `ListParticipants` how many **distinct Matrix users** are in the room. - - requester already present (rejoin / extra device) → allow - - distinct users ≥ limit → **403** (the client cannot get a token, so it cannot join) - - otherwise → allow -- **Fail-open:** any error (admin API down, bad token, LiveKit unreachable) returns the upstream response unchanged, so calls keep working even if enforcement is degraded. +- On each token request the guard reads the room's Lotus policy from Synapse admin state (one `/state` fetch, cached 10 s): `io.lotus.voice_limit` → `max_users`, and `io.lotus.room_quality` → `allow_screenshare` / `allow_camera`. The room id is taken from the **endpoint's own field** (`/get_token` → `room_id`, `/sfu/get` → `room`) exactly as lk-jwt-service reads it, so a client sending both keys can't get a different room's policy applied than the token is minted for. +- **Participant limit** — it forwards to lk-jwt-service, and if a token is issued decodes the JWT to get the LiveKit alias (`video.room`) + requester (`sub`), then asks LiveKit `ListParticipants` how many **distinct Matrix users** are in the room. requester already present (rejoin) → allow · distinct users ≥ limit → **403** · otherwise → allow. +- **Publish permissions (screenshare / camera)** — LiveKit is a pure SFU and **cannot cap a publisher's bitrate/framerate** (no such field exists in the grant/config/API — that stays a Lotus-client-cooperative setting). But the JWT's `video.canPublishSources` **is** SFU-enforced for every client. Since the guard holds the LiveKit signing secret, when a room forbids a source it **decodes the issued token, drops `screen_share`/`screen_share_audio` (and/or `camera`) from `canPublishSources`, and re-signs it** (HS256, same key). Microphone is always kept. The SFU then rejects those tracks for **all** clients — nothing to opt into. +- **Live (mid-call) enforcement** — the JWT re-sign covers anyone *joining* after a policy change. For people **already in the call**, a background **reconcile loop** (every `GUARD_RECONCILE_INTERVAL`, default 3 s) calls LiveKit `UpdateParticipant` to narrow their `canPublishSources`, which **unpublishes an in-progress screenshare/camera server-side for all clients** and blocks re-publish (confirmed LiveKit 1.9.11 behavior: reducing `can_publish_sources` removes the offending live track). So flipping a room to audio-only kills existing cameras/screenshares within ~one interval. The loop learns each LiveKit room's Matrix id from tokens it issues, only ever **removes** forbidden sources (never grants), preserves every other permission flag (full-replace safety), and no-ops once compliant. Disable with `GUARD_RECONCILE=0`. +- **Fail-open:** any error (admin API down, bad/absent token, LiveKit unreachable, unparseable room id, unexpected JWT shape) returns the upstream response **unchanged**, so calls keep working even if enforcement is degraded. The limit check and the source-policy re-sign are **independent** (a LiveKit-admin outage during the limit count can't skip the source restriction, and vice-versa). Before re-signing, the guard **verifies its own secret actually signed the token** — on a `LIVEKIT_SECRET` mismatch it skips the restriction and passes the original token through (so a secret drift can never emit a token the SFU rejects). A room with no policy set takes a zero-overhead fast path (token untouched). -**Setting a limit:** room admins set it from Lotus Chat → Room Settings → General → **Voice** (writes the `io.lotus.voice_limit` state event). Any tool that can send room state works too: +> **Security note:** `LIVEKIT_KEY`/`LIVEKIT_SECRET` are currently hardcoded in `systemd/voice-limit-guard.service` (pre-existing). Since this secret now also signs re-issued join tokens, it should be moved into `/etc/matrix-deploy.env` (already an `EnvironmentFile` on LXC 151) and the exposed value rotated. Not changed automatically to avoid a deploy breaking before the env file carries it. + +Pure logic (limit decision, source narrowing, JWT re-sign/verify roundtrip, tamper detection) is unit-tested in `livekit/test_voice_limit_guard.py` (`python3 -m unittest livekit.test_voice_limit_guard`). + +**Setting policy:** room admins use Lotus Chat → Room Settings → General → **Voice** (Call Permissions switches + Quality Caps). Any tool that can send room state works too: ```bash -# max 5 participants in ; send {} to remove the limit +# max 5 participants; send {} to remove the limit curl -X PUT -H "Authorization: Bearer " -H "Content-Type: application/json" \ "https://matrix.lotusguild.org/_matrix/client/v3/rooms//state/io.lotus.voice_limit/" \ -d '{"max_users": 5}' + +# forbid screenshare + make it audio-only (hard, all clients); numeric caps are +# Lotus-client-cooperative hints in the same event +curl -X PUT -H "Authorization: Bearer " -H "Content-Type: application/json" \ + "https://matrix.lotusguild.org/_matrix/client/v3/rooms//state/io.lotus.room_quality/" \ + -d '{"allow_screenshare": false, "allow_camera": false, "audio_max_kbps": 32}' ``` **Config:** the guard reads `MATRIX_TOKEN` (server-admin) from `/etc/matrix-deploy.env`; LiveKit key/secret + ports are set in `systemd/voice-limit-guard.service`. -**Manual (re)deploy** (the file-specific auto-deploy pipeline does not cover this service): +**Deploy:** auto-deploys on push (LXC 151 handler `py_compile`-validates then restarts the guard). Manual (re)deploy / first-time setup: ```bash # On LXC 151 @@ -454,27 +464,31 @@ chmod 600 /etc/cinny-monitor.env **Why 8GB RAM:** Vite's build process needs ~6GB Node heap (`--max_old_space_size=6144`) for the rendering-chunks phase. Previously at 4GB — OOM killed during render. -### 🔱 Planned: Element Call fork — "Lotus Call" (true ownership) +### 🔱 Element Call fork — "Lotus Call" (true ownership) — LIVE -We currently embed Element Call as a **pre-built npm bundle** -(`@element-hq/element-call-embedded` 0.20.1, copied to cinny `public/element-call/`). -We can steer it (widget API + same-origin DOM hacks) but **cannot change its -compiled logic** — so several in-call issues (avatar decorations on tiles, camera -focus/fullscreen during screenshare, mic-after-reconnect, native theming, real -call-audio injection for a soundboard) are unfixable from outside. +We **self-build** Element Call from a fork (`LotusGuild/element-call`) and publish +it to our private Gitea npm registry as `@lotusguild/element-call-embedded` +(`0.20.1-lotus.1`); cinny consumes that instead of the upstream +`@element-hq/element-call-embedded` bundle. In-call behavior is now editable +source, not just widget-API + DOM steering. This is AGPL (same license). -**Plan: fork `element-hq/element-call` → a new `LotusGuild/element-call` repo, -build it from source, host our build, and replace the npm bundle.** This is AGPL -(same license, compatible). Infra implications for THIS repo: -- EC talks to our **LiveKit SFU** (`livekit/`, LXC 151) + `lk-jwt-service` — the - fork's runtime `config.json` must point at `matrix.lotusguild.org` + our - LiveKit. The current cinny EC `config.json` lives in `cinny/config.json` here. -- A new build/deploy pipeline for the EC fork will be needed (likely its own LXC - or CI artifact), analogous to the cinny build on LXC 106. +**Shipped via the fork:** in-source denoise (a LiveKit `TrackProcessor` that +survives reconnects), in-call speaking/mute events, focus-a-participant during +screenshare, avatar decorations on EC video tiles, native transparent background. +**Built but dormant (need cinny UI):** call-audio injection +(`io.lotus.inject_audio`, unblocks a real in-call soundboard) and quality controls +(`io.lotus.set_quality`). + +Infra notes for THIS repo: +- EC talks to our **LiveKit SFU** (`livekit/`, LXC 151) + `lk-jwt-service`; the + fork's runtime `config.json` points at `matrix.lotusguild.org` + our LiveKit. + The cinny EC `config.json` lives in `cinny/config.json` here. +- **Build/deploy:** the fork builds in the cinny pipeline (its `dist/` is bundled + into the cinny build that LXC 106 serves) — no separate EC LXC. A future quality + controls feature (P5-31) would add a `voice-limit-guard`-style sidecar on LXC 151. **Full handoff & step-by-step plan:** `LotusGuild/cinny` → [`HANDOFF_ELEMENT_CALL_FORK.md`](https://code.lotusguild.org/LotusGuild/cinny/src/branch/lotus/HANDOFF_ELEMENT_CALL_FORK.md). -Start a fresh session with that doc. ### Custom Features @@ -482,7 +496,7 @@ All custom code lives in `src/app/` on the `lotus` branch of `code.lotusguild.or | Feature | Files | Notes | |---------|-------|-------| -| **Element Call embed** | `src/app/plugins/call/`, `src/app/hooks/useCallEmbed.ts`, `src/app/components/CallEmbedProvider.tsx` | EC 0.20.1 (`@element-hq/element-call-embedded`), **prebuilt** dist copied to `public/element-call/` by vite. Same-origin (`allow-same-origin`), controlled via `matrix-widget-api` + DOM-poking. 🔱 **[EC-FORK]** planned — see `LotusGuild/cinny` → `HANDOFF_ELEMENT_CALL_FORK.md` | +| **Element Call embed** | `src/app/plugins/call/`, `src/app/hooks/useCallEmbed.ts`, `src/app/components/CallEmbedProvider.tsx` | 🔱 **[EC-FORK] LIVE** — self-built fork `@lotusguild/element-call-embedded@0.20.1-lotus.1` (source `LotusGuild/element-call`), bundled into the cinny build, served same-origin. Steered via `matrix-widget-api` + custom `io.lotus.*` actions (call_state, focus_participant, decorations, inject_audio, set_quality) — DOM-poking retained only as fallback. See `LotusGuild/cinny` → `HANDOFF_ELEMENT_CALL_FORK.md` | | **DM calls** | `src/app/features/room/Room.tsx`, `src/app/features/room/RoomViewHeader.tsx` | Phone button in DM room header; `useCallStart(true)` passes `intent: StartedByUser`; Room.tsx switches to CallView layout when DM has active call | | **Picture-in-picture call** | `src/app/components/CallEmbedProvider.tsx` | When navigating away from the call room, the embed shrinks to a 280×158px PiP in the bottom-right. Click navigates back. Implemented via `useEffect` imperatively overriding styles on `callEmbedRef.current` — cannot use a wrapper div because `useCallEmbedPlacementSync` writes `top/left/width/height` directly onto that element | | **Screenshare fullscreen** | `src/app/features/call/CallControls.tsx`, `src/app/features/call/Controls.tsx` | When screensharing, a fullscreen button appears in call controls. Calls `callEmbedRef.current?.requestFullscreen()` on the Cinny call container. EC naturally spotlights the screenshare — the old 600ms grid-revert code was removed (it caused fullscreen to show avatars instead of the screen) | @@ -770,7 +784,7 @@ All commands use the `!` prefix. Run `!help` in any room for the full list. | Webhook bridge | matrix-hookshot | 7.3.2 | | Reverse proxy | Nginx Proxy Manager | — | | Web client | Lotus Cinny (fork of `cinnyapp/cinny` main) | custom | -| Element Call embed | `@element-hq/element-call-embedded` | 0.20.1 | +| Element Call embed | `@lotusguild/element-call-embedded` (self-built fork of `element-hq/element-call`) | 0.20.1-lotus.1 | | GIF picker | Giphy JS/React SDK (`@giphy/react-components`) | — | | Auto-deploy | adnanh/webhook | 2.8.0 | | Bot language | Python 3 | 3.x | diff --git a/deploy/lxc151-hookshot.sh b/deploy/lxc151-hookshot.sh index 2f392a2..b03c6cf 100644 --- a/deploy/lxc151-hookshot.sh +++ b/deploy/lxc151-hookshot.sh @@ -1,6 +1,7 @@ #!/bin/bash # Auto-deploy script for LXC 151 (matrix homeserver) -# Handles: hookshot transformation functions, livekit service file (graceful), matrixbot +# Handles: hookshot transformation functions, livekit service file (graceful), +# voice-limit-guard (livekit policy sidecar), matrixbot # Triggered by: Gitea webhook on push to main set -euo pipefail @@ -46,6 +47,48 @@ else echo "Restart pending — will apply when no active calls." fi + # Voice-limit / quality guard (fronts lk-jwt-service on :8070) + if echo "$CHANGED" | grep -qE '^livekit/voice-limit-guard\.py|^systemd/voice-limit-guard\.service'; then + echo "Deploying voice-limit-guard..." + # Validate syntax BEFORE swapping so a broken edit can never wedge token + # issuance (a syntax error would crash the guard and block all joins). + if python3 -m py_compile "$REPO_DIR/livekit/voice-limit-guard.py"; then + GUARD_DST="/opt/voice-limit-guard/voice-limit-guard.py" + GUARD_BAK="/opt/voice-limit-guard/voice-limit-guard.py.bak" + # Back up the last-known-good guard so a runtime-broken (but + # syntactically valid) deploy can self-heal — a dead guard breaks + # ALL new joins, so we must never leave it down. + [ -f "$GUARD_DST" ] && cp "$GUARD_DST" "$GUARD_BAK" + install -D -m644 "$REPO_DIR/livekit/voice-limit-guard.py" "$GUARD_DST" + if echo "$CHANGED" | grep -q '^systemd/voice-limit-guard\.service'; then + install -m644 "$REPO_DIR/systemd/voice-limit-guard.service" /etc/systemd/system/voice-limit-guard.service + systemctl daemon-reload + fi + # Restarting the guard only affects joins in a ~1s window (established + # calls talk directly to livekit-server); it does not drop calls. + # `|| true` so a non-zero restart can't abort the deploy under set -e. + systemctl restart voice-limit-guard || true + sleep 1 + if systemctl is-active --quiet voice-limit-guard; then + echo "voice-limit-guard restarted successfully." + elif [ -f "$GUARD_BAK" ]; then + echo "ERROR: new voice-limit-guard failed to start — rolling back to last-known-good." + cp "$GUARD_BAK" "$GUARD_DST" + systemctl restart voice-limit-guard || true + sleep 1 + if systemctl is-active --quiet voice-limit-guard; then + echo "voice-limit-guard rolled back and running." + else + echo "CRITICAL: voice-limit-guard down after rollback — manual intervention needed." + fi + else + echo "CRITICAL: voice-limit-guard failed to start and no backup to roll back to." + fi + else + echo "ERROR: py_compile failed on voice-limit-guard.py — skipping deploy." + fi + fi + # Matrixbot source files if echo "$CHANGED" | grep -q '^matrixbot/'; then echo "Deploying matrixbot changes..." diff --git a/livekit/test_voice_limit_guard.py b/livekit/test_voice_limit_guard.py new file mode 100644 index 0000000..af60862 --- /dev/null +++ b/livekit/test_voice_limit_guard.py @@ -0,0 +1,364 @@ +#!/usr/bin/env python3 +"""Unit tests for voice-limit-guard pure logic + JWT re-sign roundtrip. + +Run: python3 -m unittest livekit/test_voice_limit_guard.py (from repo root) +The module has a hyphenated filename, so it's loaded via importlib. +""" + +import hashlib +import hmac +import importlib.util +import urllib.error +import json +import os +import unittest + +_HERE = os.path.dirname(os.path.abspath(__file__)) +_spec = importlib.util.spec_from_file_location( + "voice_limit_guard", os.path.join(_HERE, "voice-limit-guard.py") +) +guard = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(guard) + + +def make_jwt(secret: str, payload: dict) -> str: + header_b64 = guard.b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode()) + payload_b64 = guard.b64url(json.dumps(payload).encode()) + signing_input = f"{header_b64}.{payload_b64}".encode() + sig = guard.b64url(hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()) + return f"{header_b64}.{payload_b64}.{sig}" + + +def verify_jwt(secret: str, token: str) -> bool: + header_b64, payload_b64, sig = token.split(".") + expected = guard.b64url( + hmac.new(secret.encode(), f"{header_b64}.{payload_b64}".encode(), hashlib.sha256).digest() + ) + return hmac.compare_digest(expected, sig) + + +class TestShouldBlock(unittest.TestCase): + def test_no_limit_never_blocks(self): + self.assertFalse(guard.should_block(0, {"@a:x", "@b:x"}, "@c:x")) + + def test_rejoin_never_blocks(self): + self.assertFalse(guard.should_block(2, {"@a:x", "@b:x"}, "@a:x")) + + def test_blocks_at_capacity_for_new_user(self): + self.assertTrue(guard.should_block(2, {"@a:x", "@b:x"}, "@c:x")) + + def test_allows_below_capacity(self): + self.assertFalse(guard.should_block(3, {"@a:x", "@b:x"}, "@c:x")) + + +class TestMatrixUser(unittest.TestCase): + def test_strips_device(self): + self.assertEqual(guard.matrix_user("@bob:example.org:DEVICEID"), "@bob:example.org") + + def test_plain_user(self): + self.assertEqual(guard.matrix_user("@bob:example.org"), "@bob:example.org") + + def test_non_matrix_identity_unchanged(self): + self.assertEqual(guard.matrix_user("hashedfederatedid"), "hashedfederatedid") + + +class TestAllowedSources(unittest.TestCase): + def test_all_allowed_returns_none(self): + self.assertIsNone(guard.allowed_sources({})) + self.assertIsNone( + guard.allowed_sources({"allow_camera": True, "allow_screenshare": True}) + ) + + def test_no_screenshare_drops_screen_sources_keeps_mic_cam(self): + self.assertEqual( + guard.allowed_sources({"allow_screenshare": False}), + ["microphone", "camera"], + ) + + def test_audio_only_keeps_mic_only(self): + self.assertEqual( + guard.allowed_sources({"allow_screenshare": False, "allow_camera": False}), + ["microphone"], + ) + + def test_no_camera_keeps_mic_and_screenshare(self): + self.assertEqual( + guard.allowed_sources({"allow_camera": False}), + ["microphone", "screen_share", "screen_share_audio"], + ) + + +class TestResignJwt(unittest.TestCase): + SECRET = "test-livekit-secret" + + def _make(self): + return make_jwt( + self.SECRET, + { + "iss": "APIkey", + "sub": "@alice:example.org:DEV1", + "nbf": 1000, + "exp": 5000, + "video": {"roomJoin": True, "room": "hashed-alias", "canPublish": True, + "canSubscribe": True}, + }, + ) + + def test_resigned_token_verifies_with_same_secret(self): + token = self._make() + new = guard.resign_jwt(token, self.SECRET, ["microphone", "camera"]) + self.assertTrue(verify_jwt(self.SECRET, new)) + + def test_sets_sources_and_preserves_identity_and_room(self): + token = self._make() + new = guard.resign_jwt(token, self.SECRET, ["microphone"]) + claims = guard.jwt_payload(new) + self.assertEqual(claims["video"]["canPublishSources"], ["microphone"]) + # Everything else preserved. + self.assertEqual(claims["sub"], "@alice:example.org:DEV1") + self.assertEqual(claims["video"]["room"], "hashed-alias") + self.assertEqual(claims["exp"], 5000) + self.assertTrue(claims["video"]["canPublish"]) + + def test_preserves_original_header_segment(self): + token = self._make() + new = guard.resign_jwt(token, self.SECRET, ["microphone"]) + self.assertEqual(token.split(".")[0], new.split(".")[0]) + + def test_raises_without_video_grant(self): + token = make_jwt(self.SECRET, {"sub": "@x:y", "exp": 1}) + with self.assertRaises(ValueError): + guard.resign_jwt(token, self.SECRET, ["microphone"]) + + def test_tampering_detectable(self): + # A token re-signed with the WRONG secret must not verify with the real one. + token = self._make() + forged = guard.resign_jwt(token, "wrong-secret", ["microphone"]) + self.assertFalse(verify_jwt(self.SECRET, forged)) + + +class TestVerifyJwtSig(unittest.TestCase): + SECRET = "shared-livekit-secret" + + def _token(self, secret): + return make_jwt(secret, {"sub": "@a:b:D", "exp": 9, "video": {"room": "r"}}) + + def test_verifies_token_signed_with_same_secret(self): + # Guard's own secret signed the token -> safe to re-sign. + self.assertTrue(guard.verify_jwt_sig(self._token(self.SECRET), self.SECRET)) + + def test_rejects_token_signed_with_different_secret(self): + # Secret drift: lk-jwt-service used a different key. Re-signing would + # produce a token the SFU rejects, so the guard must detect this and + # skip the restriction (fail open) instead. + self.assertFalse(guard.verify_jwt_sig(self._token("lk-jwt-secret"), self.SECRET)) + + def test_malformed_token_returns_false(self): + self.assertFalse(guard.verify_jwt_sig("not-a-jwt", self.SECRET)) + self.assertFalse(guard.verify_jwt_sig("", self.SECRET)) + + +class TestRoomIdFromRequest(unittest.TestCase): + def test_legacy_sfu_get_reads_room(self): + self.assertEqual(guard.room_id_from_request("/sfu/get", {"room": "!a:x"}), "!a:x") + + def test_new_get_token_reads_room_id(self): + self.assertEqual(guard.room_id_from_request("/get_token", {"room_id": "!b:x"}), "!b:x") + + def test_both_keys_uses_endpoint_field_not_the_other(self): + # A client sending both keys must not get the wrong room's policy: each + # endpoint reads only its own field (matching lk-jwt-service). + both = {"room": "!lax:x", "room_id": "!restricted:x"} + self.assertEqual(guard.room_id_from_request("/sfu/get", both), "!lax:x") + self.assertEqual(guard.room_id_from_request("/get_token", both), "!restricted:x") + + def test_missing_field_is_empty(self): + self.assertEqual(guard.room_id_from_request("/get_token", {"room": "!a:x"}), "") + + +class TestForbiddenSources(unittest.TestCase): + def test_none_forbidden_when_all_allowed(self): + self.assertEqual(guard.forbidden_sources({}), set()) + self.assertEqual( + guard.forbidden_sources({"allow_camera": True, "allow_screenshare": True}), set() + ) + + def test_screenshare_forbidden(self): + self.assertEqual( + guard.forbidden_sources({"allow_screenshare": False}), + {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO"}, + ) + + def test_audio_only_forbids_cam_and_screen(self): + self.assertEqual( + guard.forbidden_sources({"allow_screenshare": False, "allow_camera": False}), + {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO", "CAMERA"}, + ) + + +class TestReconcilePublishSources(unittest.TestCase): + SS = {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO"} + + def test_empty_current_means_all_allowed_so_gets_narrowed(self): + # [] = all allowed; forbidding screenshare must produce the explicit + # non-screenshare list (never an empty list, which LK reads as "all"). + result = guard.reconcile_publish_sources([], self.SS) + self.assertEqual(result, ["CAMERA", "MICROPHONE"]) + + def test_compliant_when_no_forbidden_present(self): + self.assertIsNone(guard.reconcile_publish_sources(["CAMERA", "MICROPHONE"], self.SS)) + + def test_removes_only_forbidden_never_grants(self): + result = guard.reconcile_publish_sources(["MICROPHONE", "SCREEN_SHARE"], self.SS) + self.assertEqual(result, ["MICROPHONE"]) + + def test_never_widens_a_narrow_set(self): + # Participant only had mic; forbidding screenshare leaves mic — camera is + # NOT granted. + self.assertIsNone(guard.reconcile_publish_sources(["MICROPHONE"], self.SS)) + + def test_empty_result_signals_disable_publish(self): + # If the only source they had is now forbidden, the result is [] so the + # caller sets canPublish=False (not an empty allow-list). + result = guard.reconcile_publish_sources(["SCREEN_SHARE"], self.SS) + self.assertEqual(result, []) + + +class TestReconcileParticipant(unittest.TestCase): + def setUp(self): + self.calls = [] + self._orig = guard.livekit_update_participant + guard.livekit_update_participant = lambda a, i, p: self.calls.append((a, i, p)) + + def tearDown(self): + guard.livekit_update_participant = self._orig + + def test_skips_non_publisher(self): + p = {"identity": "u", "permission": {"canPublish": False}} + self.assertFalse(guard.reconcile_participant("room", p, {"SCREEN_SHARE"})) + self.assertEqual(self.calls, []) + + def test_skips_compliant_publisher(self): + p = { + "identity": "u", + "permission": {"canPublish": True, "canPublishSources": ["MICROPHONE", "CAMERA"]}, + } + self.assertFalse(guard.reconcile_participant("room", p, {"SCREEN_SHARE"})) + self.assertEqual(self.calls, []) + + def test_revokes_and_preserves_other_permission_flags(self): + p = { + "identity": "@a:b:D", + "permission": { + "canPublish": True, + "canSubscribe": True, + "canPublishData": True, + "canPublishSources": ["MICROPHONE", "SCREEN_SHARE"], + }, + } + self.assertTrue(guard.reconcile_participant("room", p, {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO"})) + self.assertEqual(len(self.calls), 1) + _alias, identity, perm = self.calls[0] + self.assertEqual(identity, "@a:b:D") + self.assertEqual(perm["canPublishSources"], ["MICROPHONE"]) + self.assertTrue(perm["canPublish"]) + # Other flags preserved (full-replace safety). + self.assertTrue(perm["canSubscribe"]) + self.assertTrue(perm["canPublishData"]) + + def test_disables_publish_when_no_source_remains(self): + p = {"identity": "u", "permission": {"canPublish": True, "canPublishSources": ["SCREEN_SHARE"]}} + guard.reconcile_participant("room", p, {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO"}) + _a, _i, perm = self.calls[0] + self.assertFalse(perm["canPublish"]) + + +class TestReconcileRoom(unittest.TestCase): + """End-to-end reconcile_room with LiveKit + Synapse mocked.""" + + def setUp(self): + self._orig_state = guard.room_state + self._orig_list = guard.livekit_list_participants + self._orig_update = guard.livekit_update_participant + self.updates = [] + guard.livekit_update_participant = lambda a, i, p: self.updates.append((i, p)) + guard._alias_to_room.clear() + + def tearDown(self): + guard.room_state = self._orig_state + guard.livekit_list_participants = self._orig_list + guard.livekit_update_participant = self._orig_update + guard._alias_to_room.clear() + + def test_unrestricted_room_touches_nothing(self): + guard.room_state = lambda rid, max_age=0: {"allow_screenshare": True, "allow_camera": True} + guard.livekit_list_participants = lambda a: (_ for _ in ()).throw( + AssertionError("should not list participants when unrestricted") + ) + guard.reconcile_room("alias", "!room:x") + self.assertEqual(self.updates, []) + + def test_screenshare_forbidden_revokes_the_sharer(self): + guard.room_state = lambda rid, max_age=0: {"allow_screenshare": False, "allow_camera": True} + guard.livekit_list_participants = lambda a: [ + {"identity": "sharer", "permission": {"canPublish": True, + "canPublishSources": ["MICROPHONE", "SCREEN_SHARE"]}}, + {"identity": "listener", "permission": {"canPublish": True, + "canPublishSources": ["MICROPHONE"]}}, + ] + guard.reconcile_room("alias", "!room:x") + self.assertEqual(len(self.updates), 1) + self.assertEqual(self.updates[0][0], "sharer") + self.assertEqual(self.updates[0][1]["canPublishSources"], ["MICROPHONE"]) + + def test_empty_room_is_NOT_forgotten(self): + # An empty read may be transient (room persists until empty_timeout); only + # a 404 prunes, so mid-call enforcement isn't dropped on a race. + guard._alias_to_room["alias"] = "!room:x" + guard.room_state = lambda rid, max_age=0: {"allow_screenshare": False, "allow_camera": True} + guard.livekit_list_participants = lambda a: [] + guard.reconcile_room("alias", "!room:x") + self.assertIn("alias", guard._alias_to_room) + + def test_room_gone_404_is_forgotten(self): + guard._alias_to_room["alias"] = "!room:x" + guard.room_state = lambda rid, max_age=0: {"allow_screenshare": False, "allow_camera": True} + + def _raise_404(_alias): + raise urllib.error.HTTPError("u", 404, "not found", {}, None) + + guard.livekit_list_participants = _raise_404 + guard.reconcile_room("alias", "!room:x") + self.assertNotIn("alias", guard._alias_to_room) + + +class TestRoomStateParsing(unittest.TestCase): + """allow_* only forbids on an explicit False; absent/other stays permissive.""" + + def _policy(self, content): + # Emulate the parsing block in room_state without hitting Synapse. + return { + "allow_screenshare": content.get("allow_screenshare", True) is not False, + "allow_camera": content.get("allow_camera", True) is not False, + } + + def test_absent_is_allowed(self): + self.assertEqual( + self._policy({}), {"allow_screenshare": True, "allow_camera": True} + ) + + def test_explicit_false_forbids(self): + self.assertEqual( + self._policy({"allow_screenshare": False}), + {"allow_screenshare": False, "allow_camera": True}, + ) + + def test_non_bool_stays_permissive(self): + self.assertEqual( + self._policy({"allow_screenshare": "no"}), + {"allow_screenshare": True, "allow_camera": True}, + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/livekit/voice-limit-guard.py b/livekit/voice-limit-guard.py index 0979c2b..15e52a4 100644 --- a/livekit/voice-limit-guard.py +++ b/livekit/voice-limit-guard.py @@ -1,26 +1,47 @@ #!/usr/bin/env python3 """ -voice-limit-guard — hard, cross-client voice channel participant limits. +voice-limit-guard — hard, cross-client LiveKit room policy at token issue. Sits in front of lk-jwt-service (the LiveKit MatrixRTC JWT issuer). Every Matrix client (Element, FluffyChat, Lotus Chat, ...) must obtain a token from -this service before it can join a LiveKit room, so refusing the token here is a -hard block that applies to ALL clients — not just our own. +this service before it can join a LiveKit room, so decisions made here are HARD +and apply to ALL clients — not just our own. + +Two per-room policies are enforced, both read from Matrix room state via the +Synapse admin API (cached briefly): + + 1. Participant limit (`io.lotus.voice_limit` -> max_users). If the room is + full, the token is REFUSED (403), so the client cannot join. + + 2. Publish-source policy (`io.lotus.room_quality` -> allow_screenshare / + allow_camera). LiveKit is a pure SFU and CANNOT cap a publisher's bitrate/ + framerate server-side (no such field exists in the grant, config, or admin + API — that stays a client-cooperative setting). But the JWT's + `video.canPublishSources` IS enforced by the SFU for every client, so we + can hard-block screenshare and/or camera per room. Because this guard holds + the LiveKit signing secret, it decodes the issued token, drops the + forbidden sources, and re-signs it before returning. + + Enforced in TWO places so a policy applies both to new AND existing calls: + - at token issue (above), for anyone joining/rejoining, and + - LIVE, by a background reconcile loop (every GUARD_RECONCILE_INTERVAL s) + that calls LiveKit `UpdateParticipant` to narrow `canPublishSources` + for participants who were already connected when the policy changed — + which unpublishes their forbidden live track server-side for all + clients and blocks re-publish. It only ever REMOVES forbidden sources + (never grants), and no-ops once a room is compliant. Flow for token requests (POST /sfu/get and legacy POST /get_token): - 1. Read the request body and extract the Matrix `room` id (clients send the - *raw* room id here; lk-jwt-service maps it to the hashed LiveKit alias). - 2. Look up `io.lotus.voice_limit` -> max_users for that room via the Synapse - admin API (cached briefly). 0 / absent => no limit. - 3. Forward the request to lk-jwt-service unchanged and capture its response. - 4. If a limit applies and the token was issued (HTTP 200), decode the JWT to - read the LiveKit alias (`video.room`) and the requester identity (`sub`), - then ask LiveKit how many distinct Matrix users are currently in the room. - - requester already present (rejoin / extra device) -> allow - - distinct users >= limit -> 403 (blocked) - - otherwise -> allow - 5. Anything that goes wrong in steps 2-4 FAILS OPEN: the upstream response is - returned unchanged, so calls keep working even if this guard is degraded. + 1. Read the request body and extract the Matrix room id (`room` on the legacy + endpoint, `room_id` on the newer one). + 2. Forward the request to lk-jwt-service unchanged and capture its response. + 3. If a token was issued (HTTP 200), look up the room policy: + - over the participant limit -> 403 (blocked). + - a publish-source restriction applies -> re-sign the JWT with a + narrowed `video.canPublishSources`. + Otherwise pass the token through untouched. + 4. Anything that goes wrong FAILS OPEN: the upstream response is returned + unchanged, so calls keep working even if this guard is degraded. All other requests (OPTIONS preflight, GET, unknown paths) are proxied transparently so CORS and health behaviour match lk-jwt-service exactly. @@ -47,9 +68,28 @@ LIVEKIT_KEY = os.environ.get("LIVEKIT_KEY", "") LIVEKIT_SECRET = os.environ.get("LIVEKIT_SECRET", "") SYNAPSE_API = os.environ.get("SYNAPSE_API", "http://127.0.0.1:8008").rstrip("/") MATRIX_TOKEN = os.environ.get("MATRIX_TOKEN", "") +# Live (mid-call) enforcement: a background loop that revokes forbidden sources +# from participants who joined BEFORE a policy tightened. New joins are already +# handled by the JWT re-sign at issue time; this closes the mid-call-flip gap. +RECONCILE_ENABLED = os.environ.get("GUARD_RECONCILE", "1") not in ("0", "false", "") +# Floor the interval so a misconfigured 0 can't busy-loop the admin/Synapse APIs. +RECONCILE_INTERVAL = max(0.5, float(os.environ.get("GUARD_RECONCILE_INTERVAL", "3.0"))) TOKEN_PATHS = ("/sfu/get", "/get_token") LIMIT_STATE_TYPE = "io.lotus.voice_limit" +QUALITY_STATE_TYPE = "io.lotus.room_quality" +# JWT grant source keys (lowercase — livekit/protocol auth.VideoGrant.canPublishSources). +SOURCE_MIC = "microphone" +SOURCE_CAM = "camera" +SOURCE_SCREEN = "screen_share" +SOURCE_SCREEN_AUDIO = "screen_share_audio" +# LiveKit admin-API / ListParticipants source enum names (UPPERCASE — protojson +# of livekit.TrackSource). Distinct from the JWT keys above. +API_SOURCE_CAM = "CAMERA" +API_SOURCE_MIC = "MICROPHONE" +API_SOURCE_SCREEN = "SCREEN_SHARE" +API_SOURCE_SCREEN_AUDIO = "SCREEN_SHARE_AUDIO" +ALL_API_SOURCES = (API_SOURCE_CAM, API_SOURCE_MIC, API_SOURCE_SCREEN, API_SOURCE_SCREEN_AUDIO) CORS_HEADERS = { "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "POST", @@ -67,16 +107,30 @@ def b64url(data: bytes) -> str: return base64.urlsafe_b64encode(data).rstrip(b"=").decode() +def b64url_decode(seg: str) -> bytes: + """Decode a base64url segment that may have had its `=` padding stripped.""" + seg += "=" * (-len(seg) % 4) + return base64.urlsafe_b64decode(seg) + + def jwt_payload(token: str): """Best-effort decode of a JWT payload without verification.""" try: - payload_b64 = token.split(".")[1] - payload_b64 += "=" * (-len(payload_b64) % 4) - return json.loads(base64.urlsafe_b64decode(payload_b64)) + return json.loads(b64url_decode(token.split(".")[1])) except Exception: return None +def room_id_from_request(path: str, data: dict) -> str: + """The room id field lk-jwt-service reads for this endpoint. Endpoint-specific + (`/get_token` -> room_id, `/sfu/get` -> room) so a client sending BOTH keys + can't make us enforce a different room's policy than the token is minted for. + """ + if path == "/get_token": + return data.get("room_id") or "" + return data.get("room") or "" + + def should_block(limit: int, present_users: set, requester: str) -> bool: """Pure decision: should this token be refused? @@ -91,6 +145,63 @@ def should_block(limit: int, present_users: set, requester: str) -> bool: return len(present_users) >= limit +def allowed_sources(policy: dict): + """Pure decision: the narrowed `canPublishSources` list, or None. + + Microphone is ALWAYS allowed (this is a voice call). Camera and screenshare + are dropped when the room policy forbids them. Returns None when everything + is allowed, so the caller can skip re-signing the token entirely. + """ + allow_cam = policy.get("allow_camera", True) + allow_screen = policy.get("allow_screenshare", True) + if allow_cam and allow_screen: + return None + sources = [SOURCE_MIC] + if allow_cam: + sources.append(SOURCE_CAM) + if allow_screen: + sources.extend((SOURCE_SCREEN, SOURCE_SCREEN_AUDIO)) + return sources + + +def verify_jwt_sig(token: str, secret: str) -> bool: + """True if `token` is HS256-signed with `secret`. + + Used to confirm OUR LiveKit secret actually signed the token lk-jwt-service + issued before we re-sign it. If the secrets have drifted, re-signing would + mint a token the SFU rejects (a fail-CLOSED break), so the caller skips the + restriction and passes the original token through instead. + """ + try: + header_b64, payload_b64, sig = token.split(".") + expected = b64url( + hmac.new(secret.encode(), f"{header_b64}.{payload_b64}".encode(), hashlib.sha256).digest() + ) + return hmac.compare_digest(expected, sig) + except Exception: + return False + + +def resign_jwt(token: str, secret: str, sources: list) -> str: + """Re-sign the issued JWT with a restricted `video.canPublishSources`. + + Preserves the original JWT header segment and every claim except + `video.canPublishSources` (so `sub`, `video.room`, `exp`, etc. are + unchanged), then HS256-signs with the shared LiveKit secret. lk-jwt-service + signs with the same key, so the SFU accepts our signature identically. + """ + header_b64, payload_b64, _sig = token.split(".") + payload = json.loads(b64url_decode(payload_b64)) + video = payload.get("video") + if not isinstance(video, dict): + raise ValueError("issued token has no video grant") + video["canPublishSources"] = sources + new_payload_b64 = b64url(json.dumps(payload, separators=(",", ":")).encode()) + signing_input = f"{header_b64}.{new_payload_b64}".encode() + sig = b64url(hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()) + return f"{header_b64}.{new_payload_b64}.{sig}" + + def matrix_user(identity: str) -> str: """Reduce a LiveKit identity (`@user:domain:DEVICE`) to `@user:domain`. @@ -127,11 +238,11 @@ def livekit_admin_token(room: str) -> str: return f"{header}.{payload}.{sig}" -def livekit_present_users(alias: str): - """Return the set of distinct Matrix users currently in the LiveKit room.""" +def _livekit_admin_post(alias: str, method: str, body: dict) -> bytes: + """POST to a LiveKit RoomService Twirp method with a fresh admin token.""" req = urllib.request.Request( - f"{LIVEKIT_API}/twirp/livekit.RoomService/ListParticipants", - data=json.dumps({"room": alias}).encode(), + f"{LIVEKIT_API}/twirp/livekit.RoomService/{method}", + data=json.dumps(body).encode(), headers={ "Authorization": "Bearer " + livekit_admin_token(alias), "Content-Type": "application/json", @@ -139,25 +250,70 @@ def livekit_present_users(alias: str): method="POST", ) with urllib.request.urlopen(req, timeout=5) as resp: - data = json.loads(resp.read()) - return {matrix_user(p.get("identity", "")) for p in data.get("participants", [])} + return resp.read() -# --- per-room limit lookup (cached) ------------------------------------------ - -_limit_cache = {} # room_id -> (max_users, expiry_epoch) -_limit_cache_lock = threading.Lock() -_LIMIT_TTL = 10.0 +def livekit_list_participants(alias: str) -> list: + """Return the raw list of ParticipantInfo dicts in the LiveKit room.""" + data = json.loads(_livekit_admin_post(alias, "ListParticipants", {"room": alias})) + parts = data.get("participants", []) + return parts if isinstance(parts, list) else [] -def room_limit(room_id: str) -> int: +def livekit_present_users(alias: str): + """Return the set of distinct Matrix users currently in the LiveKit room.""" + return {matrix_user(p.get("identity", "")) for p in livekit_list_participants(alias)} + + +def livekit_update_participant(alias: str, identity: str, permission: dict) -> None: + """Replace a participant's ParticipantPermission (livekit UpdateParticipant). + + Narrowing `canPublishSources` unpublishes the participant's live tracks of + the removed sources server-side (for ALL clients) and blocks re-publish. + A participant who has already left yields a Twirp not_found (HTTP 404), + which is benign here. + """ + try: + _livekit_admin_post( + alias, + "UpdateParticipant", + {"room": alias, "identity": identity, "permission": permission}, + ) + except urllib.error.HTTPError as exc: + if exc.code != 404: + log(f"update_participant {identity} in {alias} -> HTTP {exc.code}") + + +# --- per-room policy lookup (cached) ----------------------------------------- + +# room_id -> (state_dict, fetched_at_epoch). One Synapse /state fetch serves +# both the participant-limit and publish-policy checks. Callers pass max_age so +# the token path can tolerate 10s staleness while the reconcile loop reads fresh. +_state_cache = {} +_state_cache_lock = threading.Lock() +_STATE_TTL = 10.0 + +_DEFAULT_STATE = {"max_users": 0, "allow_screenshare": True, "allow_camera": True} + + +def room_state(room_id: str, max_age: float = _STATE_TTL) -> dict: + """Fetch the room's Lotus policy (`io.lotus.voice_limit` + + `io.lotus.room_quality`) from Synapse admin state, cached briefly. + + `max_age` is the oldest cached value the caller will accept (seconds). The + token path uses the default 10s (dedupes Element Call's per-join burst); the + reconcile loop passes a small value so a policy change is seen quickly. + + Fails OPEN: on any error returns the permissive defaults (no limit, all + sources allowed) so a Synapse hiccup never blocks or restricts a call. + """ now = time.time() - with _limit_cache_lock: - cached = _limit_cache.get(room_id) - if cached and cached[1] > now: + with _state_cache_lock: + cached = _state_cache.get(room_id) + if cached and (now - cached[1]) < max_age: return cached[0] - limit = 0 + state = dict(_DEFAULT_STATE) try: url = ( f"{SYNAPSE_API}/_synapse/admin/v1/rooms/" @@ -167,17 +323,154 @@ def room_limit(room_id: str) -> int: with urllib.request.urlopen(req, timeout=5) as resp: events = json.loads(resp.read()).get("state", []) for ev in events: - if ev.get("type") == LIMIT_STATE_TYPE and ev.get("state_key", "") == "": - limit = int(ev.get("content", {}).get("max_users", 0) or 0) - break + if ev.get("state_key", "") != "": + continue + etype = ev.get("type") + content = ev.get("content", {}) or {} + if etype == LIMIT_STATE_TYPE: + state["max_users"] = int(content.get("max_users", 0) or 0) + elif etype == QUALITY_STATE_TYPE: + # Only an explicit `false` forbids; absent/other -> allowed. + state["allow_screenshare"] = content.get("allow_screenshare", True) is not False + state["allow_camera"] = content.get("allow_camera", True) is not False except Exception as exc: - # Fail open: treat lookup failure as "no limit". - log(f"limit lookup failed for {room_id}: {exc}") - return 0 + log(f"room state lookup failed for {room_id}: {exc}") + return dict(_DEFAULT_STATE) - with _limit_cache_lock: - _limit_cache[room_id] = (limit, now + _LIMIT_TTL) - return limit + with _state_cache_lock: + # Opportunistically drop stale entries so a long-lived server that sees + # many distinct rooms doesn't grow the cache without bound. + if len(_state_cache) > 512: + for key in [k for k, v in _state_cache.items() if (now - v[1]) >= _STATE_TTL]: + del _state_cache[key] + _state_cache[room_id] = (state, now) + return state + + +# --- live (mid-call) enforcement: reconcile loop ----------------------------- + +# LiveKit hashed-alias -> Matrix room id, learned at token issue. Lets the +# reconcile loop map an active LiveKit room back to its Matrix policy even for +# participants who joined before a policy change. +_alias_to_room = {} +_alias_lock = threading.Lock() + + +def remember_room(alias: str, room_id: str) -> None: + # No consumer when the reconcile loop is off, so don't accumulate the map. + if not RECONCILE_ENABLED or not alias or not room_id: + return + with _alias_lock: + # Soft cap; the reconcile loop prunes ended rooms each sweep. + if len(_alias_to_room) > 4096 and alias not in _alias_to_room: + return + _alias_to_room[alias] = room_id + + +def forget_room(alias: str) -> None: + with _alias_lock: + _alias_to_room.pop(alias, None) + + +def forbidden_sources(policy: dict) -> set: + """Pure: the set of UPPERCASE API source names this room forbids (may be + empty). Only an explicit False forbids.""" + forbidden = set() + if policy.get("allow_screenshare", True) is False: + forbidden.update((API_SOURCE_SCREEN, API_SOURCE_SCREEN_AUDIO)) + if policy.get("allow_camera", True) is False: + forbidden.add(API_SOURCE_CAM) + return forbidden + + +def reconcile_publish_sources(current, forbidden: set): + """Pure: given a participant's current `canPublishSources` (UPPERCASE list; + [] means ALL allowed) and the forbidden set, return the new sources list to + enforce, or None if the participant is already compliant. + + Never returns a wider set than `current` (we only REMOVE forbidden sources, + never grant). An empty return means "no permitted source remains" — the + caller should set canPublish=False instead of sending an empty list (which + LiveKit reads as 'all allowed'). + """ + effective = set(current) if current else set(ALL_API_SOURCES) + if effective.isdisjoint(forbidden): + return None # nothing forbidden is present -> already compliant + return sorted(effective - forbidden) + + +def reconcile_participant(alias: str, participant: dict, forbidden: set) -> bool: + """Enforce the forbidden-source policy on one live participant. Returns True + if an UpdateParticipant call was issued.""" + perm = participant.get("permission") or {} + if not perm.get("canPublish", False): + return False # publishes nothing -> nothing to revoke + current = perm.get("canPublishSources") or [] + desired = reconcile_publish_sources(current, forbidden) + if desired is None: + return False # already compliant + identity = participant.get("identity") + if not identity: + return False + # Full REPLACE: copy the existing permission and change only the publish + # fields, so canSubscribe / canPublishData / etc. are preserved. + new_perm = dict(perm) + if desired: + new_perm["canPublish"] = True + new_perm["canPublishSources"] = desired + else: + new_perm["canPublish"] = False + livekit_update_participant(alias, identity, new_perm) + log(f"revoked forbidden sources from {identity} in {alias}: keep {desired or '[]'}") + return True + + +def reconcile_room(alias: str, room_id: str) -> None: + """Enforce the current publish policy on every participant in one room. + Drops the room from the alias map when it is no longer active.""" + # Read the policy fresh-ish so a flip is picked up within ~one interval. + policy = room_state(room_id, max_age=RECONCILE_INTERVAL) + forbidden = forbidden_sources(policy) + if not forbidden: + return # nothing restricted; leave permissions untouched + try: + participants = livekit_list_participants(alias) + except urllib.error.HTTPError as exc: + if exc.code == 404: + forget_room(alias) # room gone + else: + log(f"reconcile list_participants {alias} -> HTTP {exc.code}") + return + except Exception as exc: + log(f"reconcile list_participants error for {alias}: {exc}") + return + if not participants: + # Empty, but the room may still exist (LiveKit keeps it until + # empty_timeout). Don't forget on an empty read — only a 404 (room gone) + # prunes — so a transient empty/race can't drop mid-call enforcement. + return + for p in participants: + try: + reconcile_participant(alias, p, forbidden) + except Exception as exc: + log(f"reconcile participant error in {alias}: {exc}") + + +def reconcile_all() -> None: + with _alias_lock: + items = list(_alias_to_room.items()) + for alias, room_id in items: + reconcile_room(alias, room_id) + + +def reconcile_loop() -> None: + log(f"reconcile loop started (interval {RECONCILE_INTERVAL}s)") + while True: + time.sleep(RECONCILE_INTERVAL) + try: + reconcile_all() + except Exception as exc: # never let the loop die + log(f"reconcile sweep error: {exc}") # --- HTTP handler ------------------------------------------------------------ @@ -219,7 +512,16 @@ class Handler(BaseHTTPRequestHandler): self.send_response(status) # send_response() already emits Server and Date; relaying them too would # produce duplicates. Content-Length is recomputed below. - skip = ("transfer-encoding", "content-length", "connection", "date", "server") + # content-encoding is stripped because we may re-serialize the body + # (a plaintext JSON body must never carry a stale gzip/deflate header). + skip = ( + "transfer-encoding", + "content-length", + "content-encoding", + "connection", + "date", + "server", + ) for key, value in headers.items(): if key.lower() in skip: continue @@ -240,44 +542,90 @@ class Handler(BaseHTTPRequestHandler): def _handle(self): body = self._read_body() - # Only token-issuing POSTs are subject to the limit check. + # Only token-issuing POSTs are subject to policy. if not (self.command == "POST" and self.path in TOKEN_PATHS): self._send(*self._proxy(body)) return - # Determine the room and its limit before bothering upstream. + # Extract the room id using the SAME field the endpoint's lk-jwt-service + # handler reads, so a client sending BOTH keys can't make us enforce a + # different room's policy than the token is minted for (bypass vector): + # /get_token (new) -> room_id + # /sfu/get (legacy) -> room try: - room_id = json.loads(body).get("room", "") + room_id = room_id_from_request(self.path, json.loads(body)) except Exception: room_id = "" - limit = room_limit(room_id) if room_id else 0 - status, headers, resp_body = self._proxy(body) - # No limit, or upstream didn't issue a token -> pass through unchanged. - if limit <= 0 or status != 200: + # Only a successfully-issued token can be gated or modified. + if status != 200: + self._send(status, headers, resp_body) + return + if not room_id: + # A token was issued but we couldn't identify the room, so no policy + # is applied. Log it so silent enforcement-loss (e.g. a request + # schema change) is observable rather than invisible. + log("issued token with unparseable room id — no policy enforced") self._send(status, headers, resp_body) return - # Limit applies and a token was issued — decide whether to allow it. + # Decode the issued token once (cheap). Needed for policy enforcement AND + # to learn this room's LiveKit alias for the reconcile map, so a policy + # set AFTER this participant joins can still be enforced on them. If the + # token can't be parsed, fail open. try: payload = json.loads(resp_body) token = payload.get("jwt", "") claims = jwt_payload(token) or {} alias = (claims.get("video") or {}).get("room", "") requester = matrix_user(claims.get("sub", "")) - if not alias: - raise ValueError("no alias in issued token") - - present = livekit_present_users(alias) - if should_block(limit, present, requester): - log(f"blocked {requester or '?'} from {room_id}: {len(present)}/{limit}") - self._send_blocked() - return except Exception as exc: - # Fail open: never break a join because the guard had a problem. - log(f"limit check error for {room_id}: {exc}") + log(f"could not parse issued token for {room_id}: {exc}") + self._send(status, headers, resp_body) + return + + remember_room(alias, room_id) + + state = room_state(room_id) + limit = state["max_users"] + sources = allowed_sources(state) + + # Fast path: nothing to enforce at join time for this room. + if limit <= 0 and sources is None: + self._send(status, headers, resp_body) + return + + # (1) Hard participant limit — refuse the token entirely. Isolated so a + # LiveKit-admin outage here cannot skip the publish-source policy below. + if limit > 0 and alias: + try: + present = livekit_present_users(alias) + if should_block(limit, present, requester): + log(f"blocked {requester or '?'} from {room_id}: {len(present)}/{limit}") + self._send_blocked() + return + except Exception as exc: + # Fail open on the limit only; still apply the policy below. + log(f"limit check error for {room_id}: {exc}") + + # (2) Hard publish-source policy — re-sign with a narrowed + # canPublishSources so the SFU forbids camera/screenshare for ALL clients + # (numeric bitrate/fps caps are not SFU-enforceable). Needs no LiveKit + # call, so it is independent of (1). Verify OUR secret actually signed + # the token first: on a secret mismatch, re-signing would mint a token + # the SFU rejects (fail-CLOSED), so skip and pass the original through. + if sources is not None and token: + try: + if verify_jwt_sig(token, LIVEKIT_SECRET): + payload["jwt"] = resign_jwt(token, LIVEKIT_SECRET, sources) + resp_body = json.dumps(payload).encode() + log(f"restricted publish sources for {requester or '?'} in {room_id}: {sources}") + else: + log(f"LIVEKIT_SECRET mismatch — skipping source policy for {room_id} (fail open)") + except Exception as exc: + log(f"publish-source policy error for {room_id}: {exc}") self._send(status, headers, resp_body) @@ -299,7 +647,14 @@ class GuardServer(ThreadingHTTPServer): def main(): if not (LIVEKIT_KEY and LIVEKIT_SECRET and MATRIX_TOKEN): - log("WARNING: missing LIVEKIT_KEY/LIVEKIT_SECRET/MATRIX_TOKEN — limit checks will fail open") + log("WARNING: missing LIVEKIT_KEY/LIVEKIT_SECRET/MATRIX_TOKEN — checks will fail open") + # Live enforcement loop (revokes forbidden sources from participants who + # joined before a policy change). Daemon thread — never blocks shutdown, and + # its errors can't take down token issuance. + if RECONCILE_ENABLED and LIVEKIT_KEY and LIVEKIT_SECRET and MATRIX_TOKEN: + threading.Thread(target=reconcile_loop, name="reconcile", daemon=True).start() + elif not RECONCILE_ENABLED: + log("reconcile loop disabled (GUARD_RECONCILE=0)") server = GuardServer((BIND_HOST, BIND_PORT), Handler) log(f"listening on {BIND_HOST}:{BIND_PORT} -> upstream {UPSTREAM}") server.serve_forever()