a06f2c662a
Extend voice-limit-guard to enforce a per-room publish-source policy (io.lotus.room_quality allow_screenshare/allow_camera) for ALL Matrix clients, alongside the existing participant limit. - At token issue, re-sign the LiveKit JWT's canPublishSources to drop forbidden sources (microphone always kept). Verifies our own secret signed the token first and fails open on mismatch, so a secret drift can never mint a token the SFU rejects. Limit check and source policy are independent (one's outage can't skip the other). - Live (mid-call) enforcement: a background reconcile loop calls LiveKit UpdateParticipant to revoke a forbidden source from participants who joined before the policy changed -- which unpublishes their in-progress screenshare/camera server-side within ~3s and blocks re-publish. Only removes sources (never grants), preserves other permission flags, fails open, and runs as a daemon thread that cannot crash or block token issuance. - Endpoint-specific room-id extraction (/get_token->room_id, /sfu/get->room) so a client sending both keys can't get a different room's policy applied. - Auto-deploy the guard on LXC 151 (py_compile-gated, backup + rollback). - Unit tests: JWT re-sign/verify + tamper, secret-mismatch, source narrowing, reconcile (never-grant / preserve-flags / disable-on-empty), fail-open. Numeric bitrate/fps caps are NOT server-enforceable on an SFU (LiveKit forwards, never transcodes) and remain a Lotus-client-cooperative setting; the screenshare/camera permission is the hard cross-client lever. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
365 lines
15 KiB
Python
365 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""Unit tests for voice-limit-guard pure logic + JWT re-sign roundtrip.
|
|
|
|
Run: python3 -m unittest livekit/test_voice_limit_guard.py (from repo root)
|
|
The module has a hyphenated filename, so it's loaded via importlib.
|
|
"""
|
|
|
|
import hashlib
|
|
import hmac
|
|
import importlib.util
|
|
import urllib.error
|
|
import json
|
|
import os
|
|
import unittest
|
|
|
|
# The module under test lives in a file whose name contains hyphens, which a
# plain ``import`` statement cannot spell, so it is loaded from its path (the
# directory of this test file) and bound to the name ``guard``.
_HERE = os.path.dirname(os.path.abspath(__file__))
_GUARD_PATH = os.path.join(_HERE, "voice-limit-guard.py")
_spec = importlib.util.spec_from_file_location("voice_limit_guard", _GUARD_PATH)
guard = importlib.util.module_from_spec(_spec)
# Executing the spec runs the guard's module-level code and populates ``guard``.
_spec.loader.exec_module(guard)
|
|
|
|
|
|
def make_jwt(secret: str, payload: dict) -> str:
    """Build a compact HS256 JWT carrying ``payload``, signed with ``secret``.

    Test helper. The header and payload are JSON-encoded and run through the
    guard's own ``b64url`` encoder; the ``header.payload`` text is then signed
    with HMAC-SHA256 and the encoded signature appended as the third segment.
    """
    jose_header = {"alg": "HS256", "typ": "JWT"}
    encoded_header = guard.b64url(json.dumps(jose_header).encode())
    encoded_payload = guard.b64url(json.dumps(payload).encode())
    unsigned = f"{encoded_header}.{encoded_payload}"
    mac = hmac.new(secret.encode(), unsigned.encode(), hashlib.sha256)
    signature = guard.b64url(mac.digest())
    return f"{unsigned}.{signature}"
|
|
|
|
|
|
def verify_jwt(secret: str, token: str) -> bool:
    """Return True when ``token``'s signature is the HS256 MAC made with ``secret``.

    Test-side check that recomputes the signature with ``hmac`` directly rather
    than going through the guard's own verifier. A token that does not split
    into exactly three dot-separated segments raises ``ValueError``.
    """
    head, body, presented = token.split(".")
    mac = hmac.new(secret.encode(), f"{head}.{body}".encode(), hashlib.sha256)
    recomputed = guard.b64url(mac.digest())
    # Constant-time comparison, as in the original helper.
    return hmac.compare_digest(recomputed, presented)
|
|
|
|
|
|
class TestShouldBlock(unittest.TestCase):
    """Capacity decisions from ``guard.should_block`` for a room holding @a and @b."""

    @staticmethod
    def _present():
        # Built fresh on every call so the tests never share a mutable set.
        return {"@a:x", "@b:x"}

    def test_no_limit_never_blocks(self):
        # First argument 0: the test name treats this as "no limit configured".
        verdict = guard.should_block(0, self._present(), "@c:x")
        self.assertFalse(verdict)

    def test_rejoin_never_blocks(self):
        # @a is already in the set, so a full room must not lock them out.
        verdict = guard.should_block(2, self._present(), "@a:x")
        self.assertFalse(verdict)

    def test_blocks_at_capacity_for_new_user(self):
        verdict = guard.should_block(2, self._present(), "@c:x")
        self.assertTrue(verdict)

    def test_allows_below_capacity(self):
        verdict = guard.should_block(3, self._present(), "@c:x")
        self.assertFalse(verdict)
|
|
|
|
|
|
class TestMatrixUser(unittest.TestCase):
    """``guard.matrix_user`` maps an identity string to the bare Matrix user ID."""

    def test_strips_device(self):
        # A trailing ":DEVICEID" component is dropped.
        identity = "@bob:example.org:DEVICEID"
        self.assertEqual(guard.matrix_user(identity), "@bob:example.org")

    def test_plain_user(self):
        identity = "@bob:example.org"
        self.assertEqual(guard.matrix_user(identity), "@bob:example.org")

    def test_non_matrix_identity_unchanged(self):
        # Anything that is not shaped like a Matrix ID passes through untouched.
        identity = "hashedfederatedid"
        self.assertEqual(guard.matrix_user(identity), "hashedfederatedid")
|
|
|
|
|
|
class TestAllowedSources(unittest.TestCase):
    """Policy dict -> lower-case publish-source allow-list (``None`` = unrestricted)."""

    def test_all_allowed_returns_none(self):
        # Both an empty policy and an explicit allow-everything policy mean
        # "no restriction", which is signalled by None rather than a list.
        unrestricted = ({}, {"allow_camera": True, "allow_screenshare": True})
        for policy in unrestricted:
            self.assertIsNone(guard.allowed_sources(policy))

    def test_no_screenshare_drops_screen_sources_keeps_mic_cam(self):
        sources = guard.allowed_sources({"allow_screenshare": False})
        self.assertEqual(sources, ["microphone", "camera"])

    def test_audio_only_keeps_mic_only(self):
        policy = {"allow_screenshare": False, "allow_camera": False}
        sources = guard.allowed_sources(policy)
        self.assertEqual(sources, ["microphone"])

    def test_no_camera_keeps_mic_and_screenshare(self):
        sources = guard.allowed_sources({"allow_camera": False})
        self.assertEqual(sources, ["microphone", "screen_share", "screen_share_audio"])
|
|
|
|
|
|
class TestResignJwt(unittest.TestCase):
    """Round-trip checks for ``guard.resign_jwt`` using locally minted tokens."""

    SECRET = "test-livekit-secret"

    def _make(self):
        """Mint a token carrying a ``video`` grant, signed with SECRET."""
        video_grant = {
            "roomJoin": True,
            "room": "hashed-alias",
            "canPublish": True,
            "canSubscribe": True,
        }
        claims = {
            "iss": "APIkey",
            "sub": "@alice:example.org:DEV1",
            "nbf": 1000,
            "exp": 5000,
            "video": video_grant,
        }
        return make_jwt(self.SECRET, claims)

    def test_resigned_token_verifies_with_same_secret(self):
        original = self._make()
        resigned = guard.resign_jwt(original, self.SECRET, ["microphone", "camera"])
        self.assertTrue(verify_jwt(self.SECRET, resigned))

    def test_sets_sources_and_preserves_identity_and_room(self):
        original = self._make()
        resigned = guard.resign_jwt(original, self.SECRET, ["microphone"])
        decoded = guard.jwt_payload(resigned)
        video = decoded["video"]
        self.assertEqual(video["canPublishSources"], ["microphone"])
        # Every other claim must come through the re-sign untouched.
        self.assertEqual(decoded["sub"], "@alice:example.org:DEV1")
        self.assertEqual(video["room"], "hashed-alias")
        self.assertEqual(decoded["exp"], 5000)
        self.assertTrue(video["canPublish"])

    def test_preserves_original_header_segment(self):
        original = self._make()
        resigned = guard.resign_jwt(original, self.SECRET, ["microphone"])
        header_before = original.split(".")[0]
        header_after = resigned.split(".")[0]
        self.assertEqual(header_before, header_after)

    def test_raises_without_video_grant(self):
        # No "video" claim at all: there is nothing to narrow.
        grantless = make_jwt(self.SECRET, {"sub": "@x:y", "exp": 1})
        with self.assertRaises(ValueError):
            guard.resign_jwt(grantless, self.SECRET, ["microphone"])

    def test_tampering_detectable(self):
        # A token re-signed with the WRONG secret must not verify with the real one.
        original = self._make()
        forged = guard.resign_jwt(original, "wrong-secret", ["microphone"])
        self.assertFalse(verify_jwt(self.SECRET, forged))
|
|
|
|
|
|
class TestVerifyJwtSig(unittest.TestCase):
    """``guard.verify_jwt_sig`` accepts only tokens signed with the given secret."""

    SECRET = "shared-livekit-secret"

    def _token(self, secret):
        """Mint a minimal token signed with ``secret``."""
        claims = {"sub": "@a:b:D", "exp": 9, "video": {"room": "r"}}
        return make_jwt(secret, claims)

    def test_verifies_token_signed_with_same_secret(self):
        # Guard's own secret signed the token -> safe to re-sign.
        ours = self._token(self.SECRET)
        self.assertTrue(guard.verify_jwt_sig(ours, self.SECRET))

    def test_rejects_token_signed_with_different_secret(self):
        # Secret drift: lk-jwt-service used a different key. Re-signing would
        # produce a token the SFU rejects, so the guard must detect this and
        # skip the restriction (fail open) instead.
        theirs = self._token("lk-jwt-secret")
        self.assertFalse(guard.verify_jwt_sig(theirs, self.SECRET))

    def test_malformed_token_returns_false(self):
        # Garbage input is reported as False rather than raising.
        for junk in ("not-a-jwt", ""):
            self.assertFalse(guard.verify_jwt_sig(junk, self.SECRET))
|
|
|
|
|
|
class TestRoomIdFromRequest(unittest.TestCase):
    """Each endpoint reads the room ID from its own request field only."""

    def test_legacy_sfu_get_reads_room(self):
        found = guard.room_id_from_request("/sfu/get", {"room": "!a:x"})
        self.assertEqual(found, "!a:x")

    def test_new_get_token_reads_room_id(self):
        found = guard.room_id_from_request("/get_token", {"room_id": "!b:x"})
        self.assertEqual(found, "!b:x")

    def test_both_keys_uses_endpoint_field_not_the_other(self):
        # A client sending both keys must not get the wrong room's policy: each
        # endpoint reads only its own field (matching lk-jwt-service).
        body = {"room": "!lax:x", "room_id": "!restricted:x"}
        via_legacy = guard.room_id_from_request("/sfu/get", body)
        self.assertEqual(via_legacy, "!lax:x")
        via_new = guard.room_id_from_request("/get_token", body)
        self.assertEqual(via_new, "!restricted:x")

    def test_missing_field_is_empty(self):
        # /get_token never falls back to the legacy "room" key.
        found = guard.room_id_from_request("/get_token", {"room": "!a:x"})
        self.assertEqual(found, "")
|
|
|
|
|
|
class TestForbiddenSources(unittest.TestCase):
    """Policy dict -> set of upper-case source names that must be revoked."""

    def test_none_forbidden_when_all_allowed(self):
        # An empty policy and an explicit allow-everything policy both forbid nothing.
        permissive = ({}, {"allow_camera": True, "allow_screenshare": True})
        for policy in permissive:
            self.assertEqual(guard.forbidden_sources(policy), set())

    def test_screenshare_forbidden(self):
        banned = guard.forbidden_sources({"allow_screenshare": False})
        self.assertEqual(banned, {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO"})

    def test_audio_only_forbids_cam_and_screen(self):
        policy = {"allow_screenshare": False, "allow_camera": False}
        banned = guard.forbidden_sources(policy)
        self.assertEqual(banned, {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO", "CAMERA"})
|
|
|
|
|
|
class TestReconcilePublishSources(unittest.TestCase):
    """``guard.reconcile_publish_sources`` with both screenshare sources forbidden.

    As asserted below, a ``None`` result means "already compliant" and a list
    is the replacement allow-list.
    """

    SS = {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO"}

    def _reconcile(self, current):
        # Every case here forbids the same pair of screenshare sources.
        return guard.reconcile_publish_sources(current, self.SS)

    def test_empty_current_means_all_allowed_so_gets_narrowed(self):
        # [] = all allowed; forbidding screenshare must produce the explicit
        # non-screenshare list (never an empty list, which LK reads as "all").
        narrowed = self._reconcile([])
        self.assertEqual(narrowed, ["CAMERA", "MICROPHONE"])

    def test_compliant_when_no_forbidden_present(self):
        outcome = self._reconcile(["CAMERA", "MICROPHONE"])
        self.assertIsNone(outcome)

    def test_removes_only_forbidden_never_grants(self):
        narrowed = self._reconcile(["MICROPHONE", "SCREEN_SHARE"])
        self.assertEqual(narrowed, ["MICROPHONE"])

    def test_never_widens_a_narrow_set(self):
        # Participant only had mic; forbidding screenshare leaves mic — camera is
        # NOT granted.
        outcome = self._reconcile(["MICROPHONE"])
        self.assertIsNone(outcome)

    def test_empty_result_signals_disable_publish(self):
        # If the only source they had is now forbidden, the result is [] so the
        # caller sets canPublish=False (not an empty allow-list).
        narrowed = self._reconcile(["SCREEN_SHARE"])
        self.assertEqual(narrowed, [])
|
|
|
|
|
|
class TestReconcileParticipant(unittest.TestCase):
    """``guard.reconcile_participant`` with the LiveKit update call stubbed out.

    The stub records each ``(alias, identity, permission)`` triple in
    ``self.calls`` instead of talking to LiveKit.
    """

    def setUp(self):
        self.calls = []
        # Register the restore before patching, via addCleanup rather than
        # tearDown, so the real function is put back even if setUp fails later.
        self.addCleanup(
            setattr, guard, "livekit_update_participant", guard.livekit_update_participant
        )
        guard.livekit_update_participant = lambda a, i, p: self.calls.append((a, i, p))

    def test_skips_non_publisher(self):
        # A participant who cannot publish at all has nothing to revoke.
        p = {"identity": "u", "permission": {"canPublish": False}}
        self.assertFalse(guard.reconcile_participant("room", p, {"SCREEN_SHARE"}))
        self.assertEqual(self.calls, [])

    def test_skips_compliant_publisher(self):
        p = {
            "identity": "u",
            "permission": {"canPublish": True, "canPublishSources": ["MICROPHONE", "CAMERA"]},
        }
        self.assertFalse(guard.reconcile_participant("room", p, {"SCREEN_SHARE"}))
        self.assertEqual(self.calls, [])

    def test_revokes_and_preserves_other_permission_flags(self):
        p = {
            "identity": "@a:b:D",
            "permission": {
                "canPublish": True,
                "canSubscribe": True,
                "canPublishData": True,
                "canPublishSources": ["MICROPHONE", "SCREEN_SHARE"],
            },
        }
        self.assertTrue(
            guard.reconcile_participant("room", p, {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO"})
        )
        self.assertEqual(len(self.calls), 1)
        _alias, identity, perm = self.calls[0]
        self.assertEqual(identity, "@a:b:D")
        self.assertEqual(perm["canPublishSources"], ["MICROPHONE"])
        self.assertTrue(perm["canPublish"])
        # Other flags preserved (full-replace safety).
        self.assertTrue(perm["canSubscribe"])
        self.assertTrue(perm["canPublishData"])

    def test_disables_publish_when_no_source_remains(self):
        p = {"identity": "u", "permission": {"canPublish": True, "canPublishSources": ["SCREEN_SHARE"]}}
        guard.reconcile_participant("room", p, {"SCREEN_SHARE", "SCREEN_SHARE_AUDIO"})
        # Assert the update happened before indexing, so a missing call is
        # reported as a test failure instead of an IndexError.
        self.assertEqual(len(self.calls), 1)
        _a, _i, perm = self.calls[0]
        self.assertFalse(perm["canPublish"])
|
|
|
|
|
|
class TestReconcileRoom(unittest.TestCase):
    """End-to-end reconcile_room with LiveKit + Synapse mocked.

    ``guard.room_state`` and ``guard.livekit_list_participants`` are replaced per
    test; ``guard.livekit_update_participant`` is replaced in ``setUp`` by a stub
    that records ``(identity, permission)`` pairs in ``self.updates``.
    """

    def setUp(self):
        # Register every restore up front with addCleanup (not tearDown) so the
        # real functions come back even if setUp itself fails part-way through.
        for name in ("room_state", "livekit_list_participants", "livekit_update_participant"):
            self.addCleanup(setattr, guard, name, getattr(guard, name))
        self.addCleanup(guard._alias_to_room.clear)
        self.updates = []
        guard.livekit_update_participant = lambda a, i, p: self.updates.append((i, p))
        guard._alias_to_room.clear()

    def test_unrestricted_room_touches_nothing(self):
        guard.room_state = lambda rid, max_age=0: {"allow_screenshare": True, "allow_camera": True}
        listed = []

        def _record_listing(alias):
            # Record instead of raising: an exception thrown from here could be
            # swallowed by fail-open handling in the code under test, which
            # would let this test pass without proving anything.
            listed.append(alias)
            return []

        guard.livekit_list_participants = _record_listing
        guard.reconcile_room("alias", "!room:x")
        self.assertEqual(listed, [], "should not list participants when unrestricted")
        self.assertEqual(self.updates, [])

    def test_screenshare_forbidden_revokes_the_sharer(self):
        guard.room_state = lambda rid, max_age=0: {"allow_screenshare": False, "allow_camera": True}
        guard.livekit_list_participants = lambda a: [
            {"identity": "sharer", "permission": {"canPublish": True,
                                                  "canPublishSources": ["MICROPHONE", "SCREEN_SHARE"]}},
            {"identity": "listener", "permission": {"canPublish": True,
                                                    "canPublishSources": ["MICROPHONE"]}},
        ]
        guard.reconcile_room("alias", "!room:x")
        # Only the participant holding a forbidden source is updated.
        self.assertEqual(len(self.updates), 1)
        self.assertEqual(self.updates[0][0], "sharer")
        self.assertEqual(self.updates[0][1]["canPublishSources"], ["MICROPHONE"])

    def test_empty_room_is_NOT_forgotten(self):
        # An empty read may be transient (room persists until empty_timeout); only
        # a 404 prunes, so mid-call enforcement isn't dropped on a race.
        guard._alias_to_room["alias"] = "!room:x"
        guard.room_state = lambda rid, max_age=0: {"allow_screenshare": False, "allow_camera": True}
        guard.livekit_list_participants = lambda a: []
        guard.reconcile_room("alias", "!room:x")
        self.assertIn("alias", guard._alias_to_room)

    def test_room_gone_404_is_forgotten(self):
        guard._alias_to_room["alias"] = "!room:x"
        guard.room_state = lambda rid, max_age=0: {"allow_screenshare": False, "allow_camera": True}

        def _raise_404(_alias):
            raise urllib.error.HTTPError("u", 404, "not found", {}, None)

        guard.livekit_list_participants = _raise_404
        guard.reconcile_room("alias", "!room:x")
        self.assertNotIn("alias", guard._alias_to_room)
|
|
|
|
|
|
class TestRoomStateParsing(unittest.TestCase):
    """Only a literal ``False`` forbids a source; a missing or non-bool value allows it."""

    def _policy(self, content):
        # Emulate the parsing block in room_state without hitting Synapse.
        policy = {}
        for flag in ("allow_screenshare", "allow_camera"):
            # Identity check against False: values such as "no" or 0 stay permissive.
            policy[flag] = content.get(flag, True) is not False
        return policy

    def test_absent_is_allowed(self):
        parsed = self._policy({})
        self.assertEqual(parsed, {"allow_screenshare": True, "allow_camera": True})

    def test_explicit_false_forbids(self):
        parsed = self._policy({"allow_screenshare": False})
        self.assertEqual(parsed, {"allow_screenshare": False, "allow_camera": True})

    def test_non_bool_stays_permissive(self):
        parsed = self._policy({"allow_screenshare": "no"})
        self.assertEqual(parsed, {"allow_screenshare": True, "allow_camera": True})
|
|
|
|
|
|
# Allow running this file directly as a script, not only via ``python3 -m unittest``.
if __name__ == "__main__":
    unittest.main()
|