#!/usr/bin/env python3 """Unit tests for voice-limit-guard pure logic + JWT re-sign roundtrip. Run: python3 -m unittest livekit/test_voice_limit_guard.py (from repo root) The module has a hyphenated filename, so it's loaded via importlib. """ import hashlib import hmac import importlib.util import urllib.error import json import os import unittest _HERE = os.path.dirname(os.path.abspath(__file__)) _spec = importlib.util.spec_from_file_location( "voice_limit_guard", os.path.join(_HERE, "voice-limit-guard.py") ) guard = importlib.util.module_from_spec(_spec) _spec.loader.exec_module(guard) def make_jwt(secret: str, payload: dict) -> str: header_b64 = guard.b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode()) payload_b64 = guard.b64url(json.dumps(payload).encode()) signing_input = f"{header_b64}.{payload_b64}".encode() sig = guard.b64url(hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()) return f"{header_b64}.{payload_b64}.{sig}" def verify_jwt(secret: str, token: str) -> bool: header_b64, payload_b64, sig = token.split(".") expected = guard.b64url( hmac.new(secret.encode(), f"{header_b64}.{payload_b64}".encode(), hashlib.sha256).digest() ) return hmac.compare_digest(expected, sig) class TestShouldBlock(unittest.TestCase): def test_no_limit_never_blocks(self): self.assertFalse(guard.should_block(0, {"@a:x", "@b:x"}, "@c:x")) def test_rejoin_never_blocks(self): self.assertFalse(guard.should_block(2, {"@a:x", "@b:x"}, "@a:x")) def test_blocks_at_capacity_for_new_user(self): self.assertTrue(guard.should_block(2, {"@a:x", "@b:x"}, "@c:x")) def test_allows_below_capacity(self): self.assertFalse(guard.should_block(3, {"@a:x", "@b:x"}, "@c:x")) class TestMatrixUser(unittest.TestCase): def test_strips_device(self): self.assertEqual(guard.matrix_user("@bob:example.org:DEVICEID"), "@bob:example.org") def test_plain_user(self): self.assertEqual(guard.matrix_user("@bob:example.org"), "@bob:example.org") def test_non_matrix_identity_unchanged(self): self.assertEqual(guard.matrix_user("hashedfederatedid"), "hashedfederatedid") class TestAllowedSources(unittest.TestCase): def test_all_allowed_returns_none(self): self.assertIsNone(guard.allowed_sources({})) self.assertIsNone( guard.allowed_sources({"allow_camera": True, "allow_screenshare": True}) ) def test_no_screenshare_drops_screen_sources_keeps_mic_cam(self): self.assertEqual( guard.allowed_sources({"allow_screenshare": False}), ["microphone", "camera"], ) def test_audio_only_keeps_mic_only(self): self.assertEqual( guard.allowed_sources({"allow_screenshare": False, "allow_camera": False}), ["microphone"], ) def test_no_camera_keeps_mic_and_screenshare(self): self.assertEqual( guard.allowed_sources({"allow_camera": False}), ["microphone", "screen_share", "screen_share_audio"], ) class TestResignJwt(unittest.TestCase): SECRET = "test-livekit-secret" def _make(self): return make_jwt( self.SECRET, { "iss": "APIkey", "sub": "@alice:example.org:DEV1", "nbf": 1000, "exp": 5000, "video": {"roomJoin": True, "room": "hashed-alias", "canPublish": True, "canSubscribe": True}, }, ) def test_resigned_token_verifies_with_same_secret(self): token = self._make() new = guard.resign_jwt(token, self.SECRET, ["microphone", "camera"]) self.assertTrue(verify_jwt(self.SECRET, new)) def test_sets_sources_and_preserves_identity_and_room(self): token = self._make() new = guard.resign_jwt(token, self.SECRET, ["microphone"]) claims = guard.jwt_payload(new) self.assertEqual(claims["video"]["canPublishSources"], ["microphone"]) # Everything else preserved. self.assertEqual(claims["sub"], "@alice:example.org:DEV1") self.assertEqual(claims["video"]["room"], "hashed-alias") self.assertEqual(claims["exp"], 5000) self.assertTrue(claims["video"]["canPublish"]) def test_preserves_original_header_segment(self): token = self._make() new = guard.resign_jwt(token, self.SECRET, ["microphone"]) self.assertEqual(token.split(".")[0], new.split(".")[0]) def test_raises_without_video_grant(self): token = make_jwt(self.SECRET, {"sub": "@x:y", "exp": 1}) with self.assertRaises(ValueError): guard.resign_jwt(token, self.SECRET, ["microphone"]) def test_tampering_detectable(self): # A token re-signed with the WRONG secret must not verify with the real one. token = self._make() forged = guard.resign_jwt(token, "wrong-secret", ["microphone"]) self.assertFalse(verify_jwt(self.SECRET, forged)) class TestVerifyJwtSig(unittest.TestCase): SECRET = "shared-livekit-secret" def _token(self, secret): return make_jwt(secret, {"sub": "@a:b:D", "exp": 9, "video": {"room": "r"}}) def test_verifies_token_signed_with_same_secret(self): # Guard's own secret signed the token -> safe to re-sign. self.assertTrue(guard.verify_jwt_sig(self._token(self.SECRET), self.SECRET)) def test_rejects_token_signed_with_different_secret(self): # Secret drift: lk-jwt-service used a different key. Re-signing would # produce a token the SFU rejects, so the guard must detect this and # skip the restriction (fail open) instead. self.assertFalse(guard.verify_jwt_sig(self._token("lk-jwt-secret"), self.SECRET)) def test_malformed_token_returns_false(self): self.assertFalse(guard.verify_jwt_sig("not-a-jwt", self.SECRET)) self.assertFalse(guard.verify_jwt_sig("", self.SECRET)) class TestRoomIdFromRequest(unittest.TestCase): def test_legacy_sfu_get_reads_room(self): self.assertEqual(guard.room_id_from_request("/sfu/get", {"room": "!a:x"}), "!a:x") def test_new_get_token_reads_room_id(self): self.assertEqual(guard.room_id_from_request("/get_token", {"room_id": "!b:x"}), "!b:x") def test_both_keys_uses_endpoint_field_not_the_other(self): # A client sending both keys must not get the wrong room's policy: each # endpoint reads only its own field (matching lk-jwt-service). both = {"room": "!lax:x", "room_id": "!restricted:x"} self.assertEqual(guard.room_id_from_request("/sfu/get", both), "!lax:x") self.assertEqual(guard.room_id_from_request("/get_token", both), "!restricted:x") def test_missing_field_is_empty(self): self.assertEqual(guard.room_id_from_request("/get_token", {"room": "!a:x"}), "") class TestForbiddenSources(unittest.TestCase): def test_none_forbidden_when_all_allowed(self): self.assertEqual(guard.forbidden_sources({}), set()) self.assertEqual( guard.forbidden_sources({"allow_camera": True, "allow_screenshare": True}), set() ) def test_screenshare_forbidden(self): self.assertEqual( guard.forbidden_sources({"allow_screenshare": False}), {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO"}, ) def test_audio_only_forbids_cam_and_screen(self): self.assertEqual( guard.forbidden_sources({"allow_screenshare": False, "allow_camera": False}), {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO", "CAMERA"}, ) class TestReconcilePublishSources(unittest.TestCase): SS = {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO"} def test_empty_current_means_all_allowed_so_gets_narrowed(self): # [] = all allowed; forbidding screenshare must produce the explicit # non-screenshare list (never an empty list, which LK reads as "all"). result = guard.reconcile_publish_sources([], self.SS) self.assertEqual(result, ["CAMERA", "MICROPHONE"]) def test_compliant_when_no_forbidden_present(self): self.assertIsNone(guard.reconcile_publish_sources(["CAMERA", "MICROPHONE"], self.SS)) def test_removes_only_forbidden_never_grants(self): result = guard.reconcile_publish_sources(["MICROPHONE", "SCREEN_SHARE"], self.SS) self.assertEqual(result, ["MICROPHONE"]) def test_never_widens_a_narrow_set(self): # Participant only had mic; forbidding screenshare leaves mic — camera is # NOT granted. self.assertIsNone(guard.reconcile_publish_sources(["MICROPHONE"], self.SS)) def test_empty_result_signals_disable_publish(self): # If the only source they had is now forbidden, the result is [] so the # caller sets canPublish=False (not an empty allow-list). result = guard.reconcile_publish_sources(["SCREEN_SHARE"], self.SS) self.assertEqual(result, []) class TestReconcileParticipant(unittest.TestCase): def setUp(self): self.calls = [] self._orig = guard.livekit_update_participant guard.livekit_update_participant = lambda a, i, p: self.calls.append((a, i, p)) def tearDown(self): guard.livekit_update_participant = self._orig def test_skips_non_publisher(self): p = {"identity": "u", "permission": {"canPublish": False}} self.assertFalse(guard.reconcile_participant("room", p, {"SCREEN_SHARE"})) self.assertEqual(self.calls, []) def test_skips_compliant_publisher(self): p = { "identity": "u", "permission": {"canPublish": True, "canPublishSources": ["MICROPHONE", "CAMERA"]}, } self.assertFalse(guard.reconcile_participant("room", p, {"SCREEN_SHARE"})) self.assertEqual(self.calls, []) def test_revokes_and_preserves_other_permission_flags(self): p = { "identity": "@a:b:D", "permission": { "canPublish": True, "canSubscribe": True, "canPublishData": True, "canPublishSources": ["MICROPHONE", "SCREEN_SHARE"], }, } self.assertTrue(guard.reconcile_participant("room", p, {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO"})) self.assertEqual(len(self.calls), 1) _alias, identity, perm = self.calls[0] self.assertEqual(identity, "@a:b:D") self.assertEqual(perm["canPublishSources"], ["MICROPHONE"]) self.assertTrue(perm["canPublish"]) # Other flags preserved (full-replace safety). self.assertTrue(perm["canSubscribe"]) self.assertTrue(perm["canPublishData"]) def test_disables_publish_when_no_source_remains(self): p = {"identity": "u", "permission": {"canPublish": True, "canPublishSources": ["SCREEN_SHARE"]}} guard.reconcile_participant("room", p, {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO"}) _a, _i, perm = self.calls[0] self.assertFalse(perm["canPublish"]) class TestReconcileRoom(unittest.TestCase): """End-to-end reconcile_room with LiveKit + Synapse mocked.""" def setUp(self): self._orig_state = guard.room_state self._orig_list = guard.livekit_list_participants self._orig_update = guard.livekit_update_participant self.updates = [] guard.livekit_update_participant = lambda a, i, p: self.updates.append((i, p)) guard._alias_to_room.clear() def tearDown(self): guard.room_state = self._orig_state guard.livekit_list_participants = self._orig_list guard.livekit_update_participant = self._orig_update guard._alias_to_room.clear() def test_unrestricted_room_touches_nothing(self): guard.room_state = lambda rid, max_age=0: {"allow_screenshare": True, "allow_camera": True} guard.livekit_list_participants = lambda a: (_ for _ in ()).throw( AssertionError("should not list participants when unrestricted") ) guard.reconcile_room("alias", "!room:x") self.assertEqual(self.updates, []) def test_screenshare_forbidden_revokes_the_sharer(self): guard.room_state = lambda rid, max_age=0: {"allow_screenshare": False, "allow_camera": True} guard.livekit_list_participants = lambda a: [ {"identity": "sharer", "permission": {"canPublish": True, "canPublishSources": ["MICROPHONE", "SCREEN_SHARE"]}}, {"identity": "listener", "permission": {"canPublish": True, "canPublishSources": ["MICROPHONE"]}}, ] guard.reconcile_room("alias", "!room:x") self.assertEqual(len(self.updates), 1) self.assertEqual(self.updates[0][0], "sharer") self.assertEqual(self.updates[0][1]["canPublishSources"], ["MICROPHONE"]) def test_empty_room_is_NOT_forgotten(self): # An empty read may be transient (room persists until empty_timeout); only # a 404 prunes, so mid-call enforcement isn't dropped on a race. guard._alias_to_room["alias"] = "!room:x" guard.room_state = lambda rid, max_age=0: {"allow_screenshare": False, "allow_camera": True} guard.livekit_list_participants = lambda a: [] guard.reconcile_room("alias", "!room:x") self.assertIn("alias", guard._alias_to_room) def test_room_gone_404_is_forgotten(self): guard._alias_to_room["alias"] = "!room:x" guard.room_state = lambda rid, max_age=0: {"allow_screenshare": False, "allow_camera": True} def _raise_404(_alias): raise urllib.error.HTTPError("u", 404, "not found", {}, None) guard.livekit_list_participants = _raise_404 guard.reconcile_room("alias", "!room:x") self.assertNotIn("alias", guard._alias_to_room) class TestRoomStateParsing(unittest.TestCase): """allow_* only forbids on an explicit False; absent/other stays permissive.""" def _policy(self, content): # Emulate the parsing block in room_state without hitting Synapse. return { "allow_screenshare": content.get("allow_screenshare", True) is not False, "allow_camera": content.get("allow_camera", True) is not False, } def test_absent_is_allowed(self): self.assertEqual( self._policy({}), {"allow_screenshare": True, "allow_camera": True} ) def test_explicit_false_forbids(self): self.assertEqual( self._policy({"allow_screenshare": False}), {"allow_screenshare": False, "allow_camera": True}, ) def test_non_bool_stays_permissive(self): self.assertEqual( self._policy({"allow_screenshare": "no"}), {"allow_screenshare": True, "allow_camera": True}, ) if __name__ == "__main__": unittest.main()