Files
matrix/livekit/test_voice_limit_guard.py
T
jared a06f2c662a feat(livekit-guard): enforce per-room call permissions (screenshare/camera)
Extend voice-limit-guard to enforce a per-room publish-source policy
(io.lotus.room_quality allow_screenshare/allow_camera) for ALL Matrix clients,
alongside the existing participant limit.

- At token issue, re-sign the LiveKit JWT's canPublishSources to drop forbidden
  sources (microphone always kept). Verifies our own secret signed the token
  first and fails open on mismatch, so a secret drift can never mint a token the
  SFU rejects. Limit check and source policy are independent (one's outage can't
  skip the other).
- Live (mid-call) enforcement: a background reconcile loop calls LiveKit
  UpdateParticipant to revoke a forbidden source from participants who joined
  before the policy changed -- which unpublishes their in-progress
  screenshare/camera server-side within ~3s and blocks re-publish. Only removes
  sources (never grants), preserves other permission flags, fails open, and runs
  as a daemon thread that cannot crash or block token issuance.
- Endpoint-specific room-id extraction (/get_token->room_id, /sfu/get->room) so
  a client sending both keys can't get a different room's policy applied.
- Auto-deploy the guard on LXC 151 (py_compile-gated, backup + rollback).
- Unit tests: JWT re-sign/verify + tamper, secret-mismatch, source narrowing,
  reconcile (never-grant / preserve-flags / disable-on-empty), fail-open.

Numeric bitrate/fps caps are NOT server-enforceable on an SFU (LiveKit forwards,
never transcodes) and remain a Lotus-client-cooperative setting; the
screenshare/camera permission is the hard cross-client lever.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-30 22:34:34 -04:00

365 lines
15 KiB
Python

#!/usr/bin/env python3
"""Unit tests for voice-limit-guard pure logic + JWT re-sign roundtrip.
Run: python3 -m unittest livekit/test_voice_limit_guard.py (from repo root)
The module has a hyphenated filename, so it's loaded via importlib.
"""
import hashlib
import hmac
import importlib.util
import urllib.error
import json
import os
import unittest
# The guard script sits next to this test file. Its filename contains a hyphen,
# so a plain `import` statement cannot name it; load it from its path instead.
_HERE = os.path.dirname(os.path.abspath(__file__))
_GUARD_PATH = os.path.join(_HERE, "voice-limit-guard.py")
_spec = importlib.util.spec_from_file_location("voice_limit_guard", _GUARD_PATH)
# Create the module object, then execute the script's top level into it. Every
# test below reaches the code under test through `guard`.
guard = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(guard)
def make_jwt(secret: str, payload: dict) -> str:
    """Return a compact ``header.payload.signature`` token for *payload*.

    The header is fixed to ``{"alg": "HS256", "typ": "JWT"}``. Header and
    payload are JSON-serialised and encoded with ``guard.b64url``; the
    signature is the HMAC-SHA256 of ``header.payload`` keyed with *secret*,
    encoded the same way.
    """
    header = {"alg": "HS256", "typ": "JWT"}
    encoded = [guard.b64url(json.dumps(part).encode()) for part in (header, payload)]
    unsigned = "{}.{}".format(*encoded)
    mac = hmac.new(secret.encode(), unsigned.encode(), hashlib.sha256)
    return "{}.{}".format(unsigned, guard.b64url(mac.digest()))
def verify_jwt(secret: str, token: str) -> bool:
    """Return True when *token*'s signature matches an HS256 MAC keyed with *secret*.

    Independent re-implementation of the check, used to validate tokens the
    guard produces. Raises ``ValueError`` (from the tuple unpacking) when
    *token* does not consist of exactly three dot-separated segments.
    """
    head, body, presented = token.split(".")
    mac = hmac.new(secret.encode(), f"{head}.{body}".encode(), hashlib.sha256)
    # Constant-time comparison of the recomputed signature with the presented one.
    return hmac.compare_digest(guard.b64url(mac.digest()), presented)
class TestShouldBlock(unittest.TestCase):
    """Participant-limit decisions made by ``guard.should_block``.

    Each case passes a numeric limit, the set of users already in the call and
    the user asking to join (argument roles inferred from the test names --
    confirm against the guard's signature).
    """

    @staticmethod
    def _occupants():
        # Fresh set per call so no test can observe another test's mutations.
        return {"@a:x", "@b:x"}

    def test_no_limit_never_blocks(self):
        blocked = guard.should_block(0, self._occupants(), "@c:x")
        self.assertFalse(blocked)

    def test_rejoin_never_blocks(self):
        blocked = guard.should_block(2, self._occupants(), "@a:x")
        self.assertFalse(blocked)

    def test_blocks_at_capacity_for_new_user(self):
        blocked = guard.should_block(2, self._occupants(), "@c:x")
        self.assertTrue(blocked)

    def test_allows_below_capacity(self):
        blocked = guard.should_block(3, self._occupants(), "@c:x")
        self.assertFalse(blocked)
class TestMatrixUser(unittest.TestCase):
    """``guard.matrix_user`` maps a participant identity to a bare user id."""

    def test_strips_device(self):
        # A trailing ":DEVICEID" suffix is removed.
        user = guard.matrix_user("@bob:example.org:DEVICEID")
        self.assertEqual(user, "@bob:example.org")

    def test_plain_user(self):
        # An identity without a device suffix comes back untouched.
        user = guard.matrix_user("@bob:example.org")
        self.assertEqual(user, "@bob:example.org")

    def test_non_matrix_identity_unchanged(self):
        # Identities that are not Matrix ids are passed through as-is.
        user = guard.matrix_user("hashedfederatedid")
        self.assertEqual(user, "hashedfederatedid")
class TestAllowedSources(unittest.TestCase):
    """``guard.allowed_sources`` turns a room policy into a publish allow-list.

    ``None`` is expected when the policy forbids nothing; otherwise a list of
    lower-case source names that always contains ``"microphone"``.
    """

    def test_all_allowed_returns_none(self):
        unrestricted = ({}, {"allow_camera": True, "allow_screenshare": True})
        for policy in unrestricted:
            self.assertIsNone(guard.allowed_sources(policy))

    def test_no_screenshare_drops_screen_sources_keeps_mic_cam(self):
        sources = guard.allowed_sources({"allow_screenshare": False})
        self.assertEqual(sources, ["microphone", "camera"])

    def test_audio_only_keeps_mic_only(self):
        policy = {"allow_screenshare": False, "allow_camera": False}
        sources = guard.allowed_sources(policy)
        self.assertEqual(sources, ["microphone"])

    def test_no_camera_keeps_mic_and_screenshare(self):
        sources = guard.allowed_sources({"allow_camera": False})
        self.assertEqual(sources, ["microphone", "screen_share", "screen_share_audio"])
class TestResignJwt(unittest.TestCase):
    """Round-trip checks for ``guard.resign_jwt(token, secret, sources)``."""

    SECRET = "test-livekit-secret"

    def _make(self):
        """Return a token signed with ``SECRET`` that carries a ``video`` grant."""
        video_grant = {
            "roomJoin": True,
            "room": "hashed-alias",
            "canPublish": True,
            "canSubscribe": True,
        }
        claims = {
            "iss": "APIkey",
            "sub": "@alice:example.org:DEV1",
            "nbf": 1000,
            "exp": 5000,
            "video": video_grant,
        }
        return make_jwt(self.SECRET, claims)

    def test_resigned_token_verifies_with_same_secret(self):
        resigned = guard.resign_jwt(self._make(), self.SECRET, ["microphone", "camera"])
        self.assertTrue(verify_jwt(self.SECRET, resigned))

    def test_sets_sources_and_preserves_identity_and_room(self):
        resigned = guard.resign_jwt(self._make(), self.SECRET, ["microphone"])
        claims = guard.jwt_payload(resigned)
        video = claims["video"]
        self.assertEqual(video["canPublishSources"], ["microphone"])
        # Every other claim must survive the rewrite unchanged.
        self.assertEqual(claims["sub"], "@alice:example.org:DEV1")
        self.assertEqual(video["room"], "hashed-alias")
        self.assertEqual(claims["exp"], 5000)
        self.assertTrue(video["canPublish"])

    def test_preserves_original_header_segment(self):
        original = self._make()
        resigned = guard.resign_jwt(original, self.SECRET, ["microphone"])
        original_header = original.split(".")[0]
        resigned_header = resigned.split(".")[0]
        self.assertEqual(original_header, resigned_header)

    def test_raises_without_video_grant(self):
        grantless = make_jwt(self.SECRET, {"sub": "@x:y", "exp": 1})
        self.assertRaises(
            ValueError, guard.resign_jwt, grantless, self.SECRET, ["microphone"]
        )

    def test_tampering_detectable(self):
        # Re-signing with the WRONG secret must yield a token that does not
        # verify against the real one.
        forged = guard.resign_jwt(self._make(), "wrong-secret", ["microphone"])
        self.assertFalse(verify_jwt(self.SECRET, forged))
class TestVerifyJwtSig(unittest.TestCase):
    """``guard.verify_jwt_sig(token, secret)`` accepts only tokens signed with *secret*."""

    SECRET = "shared-livekit-secret"

    def _token(self, secret):
        """Return a minimal token signed with *secret*."""
        claims = {"sub": "@a:b:D", "exp": 9, "video": {"room": "r"}}
        return make_jwt(secret, claims)

    def test_verifies_token_signed_with_same_secret(self):
        # The guard's own secret signed the token, so re-signing it is safe.
        own = self._token(self.SECRET)
        self.assertTrue(guard.verify_jwt_sig(own, self.SECRET))

    def test_rejects_token_signed_with_different_secret(self):
        # Secret drift: lk-jwt-service signed with another key. A re-signed
        # token would be rejected by the SFU, so the guard has to notice the
        # mismatch and skip the restriction (fail open) instead.
        foreign = self._token("lk-jwt-secret")
        self.assertFalse(guard.verify_jwt_sig(foreign, self.SECRET))

    def test_malformed_token_returns_false(self):
        for junk in ("not-a-jwt", ""):
            self.assertFalse(guard.verify_jwt_sig(junk, self.SECRET))
class TestRoomIdFromRequest(unittest.TestCase):
    """``guard.room_id_from_request(path, body)`` reads the field owned by each endpoint."""

    def test_legacy_sfu_get_reads_room(self):
        room_id = guard.room_id_from_request("/sfu/get", {"room": "!a:x"})
        self.assertEqual(room_id, "!a:x")

    def test_new_get_token_reads_room_id(self):
        room_id = guard.room_id_from_request("/get_token", {"room_id": "!b:x"})
        self.assertEqual(room_id, "!b:x")

    def test_both_keys_uses_endpoint_field_not_the_other(self):
        # When a client sends both keys, each endpoint must read only its own
        # field (matching lk-jwt-service) so the wrong room's policy is never
        # applied.
        both = {"room": "!lax:x", "room_id": "!restricted:x"}
        from_legacy = guard.room_id_from_request("/sfu/get", both)
        self.assertEqual(from_legacy, "!lax:x")
        from_new = guard.room_id_from_request("/get_token", both)
        self.assertEqual(from_new, "!restricted:x")

    def test_missing_field_is_empty(self):
        # "/get_token" ignores "room"; with no "room_id" the result is "".
        room_id = guard.room_id_from_request("/get_token", {"room": "!a:x"})
        self.assertEqual(room_id, "")
class TestForbiddenSources(unittest.TestCase):
    """``guard.forbidden_sources`` maps a room policy to a set of upper-case source names."""

    def test_none_forbidden_when_all_allowed(self):
        unrestricted = ({}, {"allow_camera": True, "allow_screenshare": True})
        for policy in unrestricted:
            self.assertEqual(guard.forbidden_sources(policy), set())

    def test_screenshare_forbidden(self):
        forbidden = guard.forbidden_sources({"allow_screenshare": False})
        self.assertEqual(forbidden, {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO"})

    def test_audio_only_forbids_cam_and_screen(self):
        policy = {"allow_screenshare": False, "allow_camera": False}
        forbidden = guard.forbidden_sources(policy)
        self.assertEqual(forbidden, {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO", "CAMERA"})
class TestReconcilePublishSources(unittest.TestCase):
    """``guard.reconcile_publish_sources(current, forbidden)`` narrowing rules.

    The cases below expect ``None`` when *current* is already compliant and
    otherwise the replacement source list.
    """

    # The screenshare sources used as the forbidden set in every case.
    SS = {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO"}

    def test_empty_current_means_all_allowed_so_gets_narrowed(self):
        # An empty list means "everything allowed". Forbidding screenshare must
        # therefore yield the explicit non-screenshare list, never another
        # empty list (which LK reads as "all").
        narrowed = guard.reconcile_publish_sources([], self.SS)
        self.assertEqual(narrowed, ["CAMERA", "MICROPHONE"])

    def test_compliant_when_no_forbidden_present(self):
        outcome = guard.reconcile_publish_sources(["CAMERA", "MICROPHONE"], self.SS)
        self.assertIsNone(outcome)

    def test_removes_only_forbidden_never_grants(self):
        narrowed = guard.reconcile_publish_sources(["MICROPHONE", "SCREEN_SHARE"], self.SS)
        self.assertEqual(narrowed, ["MICROPHONE"])

    def test_never_widens_a_narrow_set(self):
        # A mic-only participant stays mic-only when screenshare is forbidden;
        # camera is NOT granted.
        outcome = guard.reconcile_publish_sources(["MICROPHONE"], self.SS)
        self.assertIsNone(outcome)

    def test_empty_result_signals_disable_publish(self):
        # When the only source held is now forbidden the result is [], which
        # tells the caller to set canPublish=False (not an empty allow-list).
        narrowed = guard.reconcile_publish_sources(["SCREEN_SHARE"], self.SS)
        self.assertEqual(narrowed, [])
class TestReconcileParticipant(unittest.TestCase):
    """``guard.reconcile_participant`` with the LiveKit update call stubbed out.

    ``guard.livekit_update_participant`` is replaced by a recorder that appends
    each ``(alias, identity, permission)`` call to ``self.calls``, so the tests
    can check whether an update was sent and what it contained.
    """

    def setUp(self):
        # Register the restore before patching: cleanups run even when a later
        # setUp step or the test itself fails, so the stub can never leak into
        # other test classes.
        self.addCleanup(
            setattr, guard, "livekit_update_participant", guard.livekit_update_participant
        )
        self.calls = []
        guard.livekit_update_participant = lambda a, i, p: self.calls.append((a, i, p))

    def test_skips_non_publisher(self):
        p = {"identity": "u", "permission": {"canPublish": False}}
        self.assertFalse(guard.reconcile_participant("room", p, {"SCREEN_SHARE"}))
        self.assertEqual(self.calls, [])

    def test_skips_compliant_publisher(self):
        p = {
            "identity": "u",
            "permission": {"canPublish": True, "canPublishSources": ["MICROPHONE", "CAMERA"]},
        }
        self.assertFalse(guard.reconcile_participant("room", p, {"SCREEN_SHARE"}))
        self.assertEqual(self.calls, [])

    def test_revokes_and_preserves_other_permission_flags(self):
        p = {
            "identity": "@a:b:D",
            "permission": {
                "canPublish": True,
                "canSubscribe": True,
                "canPublishData": True,
                "canPublishSources": ["MICROPHONE", "SCREEN_SHARE"],
            },
        }
        self.assertTrue(
            guard.reconcile_participant("room", p, {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO"})
        )
        self.assertEqual(len(self.calls), 1)
        _alias, identity, perm = self.calls[0]
        self.assertEqual(identity, "@a:b:D")
        self.assertEqual(perm["canPublishSources"], ["MICROPHONE"])
        self.assertTrue(perm["canPublish"])
        # Other flags preserved (full-replace safety).
        self.assertTrue(perm["canSubscribe"])
        self.assertTrue(perm["canPublishData"])

    def test_disables_publish_when_no_source_remains(self):
        p = {"identity": "u", "permission": {"canPublish": True, "canPublishSources": ["SCREEN_SHARE"]}}
        guard.reconcile_participant("room", p, {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO"})
        # Fail with a readable assertion, not an IndexError, when the guard
        # sent no update at all.
        self.assertTrue(self.calls, "expected an UpdateParticipant call")
        _a, _i, perm = self.calls[0]
        self.assertFalse(perm["canPublish"])
class TestReconcileRoom(unittest.TestCase):
    """End-to-end reconcile_room with LiveKit + Synapse mocked.

    ``guard.room_state``, ``guard.livekit_list_participants`` and
    ``guard.livekit_update_participant`` are swapped for fakes. Every update
    the guard sends is recorded in ``self.updates`` as ``(identity, permission)``.
    """

    def setUp(self):
        # Register restores before patching anything: cleanups run even if a
        # later setUp step fails, which tearDown would not.
        for name in ("room_state", "livekit_list_participants", "livekit_update_participant"):
            self.addCleanup(setattr, guard, name, getattr(guard, name))
        # Look the mapping up at cleanup time so the live object is the one cleared.
        self.addCleanup(lambda: guard._alias_to_room.clear())
        self.updates = []
        guard.livekit_update_participant = lambda a, i, p: self.updates.append((i, p))
        guard._alias_to_room.clear()

    def test_unrestricted_room_touches_nothing(self):
        guard.room_state = lambda rid, max_age=0: {"allow_screenshare": True, "allow_camera": True}
        # Record calls instead of raising inside the fake: an exception raised
        # in the callback could be swallowed by a fail-open handler in the code
        # under test, letting this test pass without checking anything.
        listed = []

        def _record_list(a):
            listed.append(a)
            return []

        guard.livekit_list_participants = _record_list
        guard.reconcile_room("alias", "!room:x")
        self.assertEqual(listed, [], "should not list participants when unrestricted")
        self.assertEqual(self.updates, [])

    def test_screenshare_forbidden_revokes_the_sharer(self):
        guard.room_state = lambda rid, max_age=0: {"allow_screenshare": False, "allow_camera": True}
        guard.livekit_list_participants = lambda a: [
            {"identity": "sharer", "permission": {"canPublish": True,
                                                  "canPublishSources": ["MICROPHONE", "SCREEN_SHARE"]}},
            {"identity": "listener", "permission": {"canPublish": True,
                                                    "canPublishSources": ["MICROPHONE"]}},
        ]
        guard.reconcile_room("alias", "!room:x")
        # Only the participant holding a forbidden source is updated.
        self.assertEqual(len(self.updates), 1)
        self.assertEqual(self.updates[0][0], "sharer")
        self.assertEqual(self.updates[0][1]["canPublishSources"], ["MICROPHONE"])

    def test_empty_room_is_NOT_forgotten(self):
        # An empty read may be transient (room persists until empty_timeout); only
        # a 404 prunes, so mid-call enforcement isn't dropped on a race.
        guard._alias_to_room["alias"] = "!room:x"
        guard.room_state = lambda rid, max_age=0: {"allow_screenshare": False, "allow_camera": True}
        guard.livekit_list_participants = lambda a: []
        guard.reconcile_room("alias", "!room:x")
        self.assertIn("alias", guard._alias_to_room)

    def test_room_gone_404_is_forgotten(self):
        guard._alias_to_room["alias"] = "!room:x"
        guard.room_state = lambda rid, max_age=0: {"allow_screenshare": False, "allow_camera": True}

        def _raise_404(_alias):
            raise urllib.error.HTTPError("u", 404, "not found", {}, None)

        guard.livekit_list_participants = _raise_404
        guard.reconcile_room("alias", "!room:x")
        self.assertNotIn("alias", guard._alias_to_room)
class TestRoomStateParsing(unittest.TestCase):
    """allow_* only forbids on an explicit False; absent/other stays permissive."""

    def _policy(self, content):
        """Return the allow_* policy derived from an event *content* dict.

        Stands in for the parsing block in room_state so no Synapse call is
        needed. A flag is False only when the stored value is the ``False``
        singleton itself; a missing key or any other value yields True.
        """
        permissions = {}
        for key in ("allow_screenshare", "allow_camera"):
            value = content.get(key, True)
            # Identity check on purpose: 0, "" or None must not forbid.
            permissions[key] = value is not False
        return permissions

    def test_absent_is_allowed(self):
        parsed = self._policy({})
        self.assertEqual(parsed, {"allow_screenshare": True, "allow_camera": True})

    def test_explicit_false_forbids(self):
        parsed = self._policy({"allow_screenshare": False})
        self.assertEqual(parsed, {"allow_screenshare": False, "allow_camera": True})

    def test_non_bool_stays_permissive(self):
        parsed = self._policy({"allow_screenshare": "no"})
        self.assertEqual(parsed, {"allow_screenshare": True, "allow_camera": True})
# Run the suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()