Compare commits
6 Commits
130a7334a3
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 8c9edf60c3 | |||
| 52e9be1f8d | |||
| 442ad9b6ed | |||
| 68a6acfa24 | |||
| 295a072dc9 | |||
| b392798e3f |
@@ -67,7 +67,8 @@ matrix/
|
||||
- coturn config: `/etc/turnserver.conf`
|
||||
- LiveKit config: `/etc/livekit/config.yaml`
|
||||
- LiveKit service: `livekit-server.service`
|
||||
- lk-jwt-service: `lk-jwt-service.service` (binds `:8070`, serves JWT tokens for MatrixRTC at `/sfu/get` and legacy `/get_token`)
|
||||
- lk-jwt-service: `lk-jwt-service.service` (now binds `:8071` via drop-in `/etc/systemd/system/lk-jwt-service.service.d/override.conf`; serves JWT tokens for MatrixRTC at `/sfu/get` and legacy `/get_token`)
|
||||
- voice-limit-guard: `voice-limit-guard.service` (binds `:8070`, fronts lk-jwt-service — enforces hard per-room voice participant limits for ALL clients; script `/opt/voice-limit-guard/voice-limit-guard.py`) — see [Voice Channel Limits](#voice-channel-limits)
|
||||
- Hookshot: `/opt/hookshot/`, service: `matrix-hookshot.service`
|
||||
- Hookshot config: `/opt/hookshot/config.yml`
|
||||
- Hookshot registration: `/etc/matrix-synapse/hookshot-registration.yaml`
|
||||
@@ -187,6 +188,49 @@ Killing livekit-server while a call is active drops everyone. Instead:
|
||||
|
||||
---
|
||||
|
||||
## Voice Channel Limits
|
||||
|
||||
Per-room voice participant caps are enforced **server-side for every client** (Element, FluffyChat, Lotus Chat, …), not just our own web client.
|
||||
|
||||
**How it works**
|
||||
|
||||
Every Matrix client must fetch a LiveKit JWT from lk-jwt-service before it can join a call. `voice-limit-guard` (a small fail-open Python sidecar, `livekit/voice-limit-guard.py` in this repo) sits in front of that service:
|
||||
|
||||
- lk-jwt-service was moved off `:8070` to `:8071` (systemd drop-in). The guard now owns `:8070`, so NPM's existing `/sfu/get` + `/get_token` proxy targets are unchanged.
|
||||
- On each token request the guard reads `io.lotus.voice_limit` → `max_users` for the room (Synapse admin API, cached 10 s). `0` / absent = no limit.
|
||||
- It forwards the request to lk-jwt-service, and if a token is issued it decodes the JWT to get the LiveKit alias (`video.room`) + requester identity (`sub`), then asks LiveKit `ListParticipants` how many **distinct Matrix users** are in the room.
|
||||
- requester already present (rejoin / extra device) → allow
|
||||
- distinct users ≥ limit → **403** (the client cannot get a token, so it cannot join)
|
||||
- otherwise → allow
|
||||
- **Fail-open:** any error (admin API down, bad token, LiveKit unreachable) returns the upstream response unchanged, so calls keep working even if enforcement is degraded.
|
||||
|
||||
**Setting a limit:** room admins set it from Lotus Chat → Room Settings → General → **Voice** (writes the `io.lotus.voice_limit` state event). Any tool that can send room state works too:
|
||||
|
||||
```bash
|
||||
# max 5 participants in <roomId>; send {} to remove the limit
|
||||
curl -X PUT -H "Authorization: Bearer <admin_token>" -H "Content-Type: application/json" \
|
||||
"https://matrix.lotusguild.org/_matrix/client/v3/rooms/<roomId>/state/io.lotus.voice_limit/" \
|
||||
-d '{"max_users": 5}'
|
||||
```
|
||||
|
||||
**Config:** the guard reads `MATRIX_TOKEN` (server-admin) from `/etc/matrix-deploy.env`; LiveKit key/secret + ports are set in `systemd/voice-limit-guard.service`.
|
||||
|
||||
**Manual (re)deploy** (the file-specific auto-deploy pipeline does not cover this service):
|
||||
|
||||
```bash
|
||||
# On LXC 151
|
||||
install -D -m644 /opt/matrix-config/livekit/voice-limit-guard.py /opt/voice-limit-guard/voice-limit-guard.py
|
||||
install -m644 /opt/matrix-config/systemd/voice-limit-guard.service /etc/systemd/system/voice-limit-guard.service
|
||||
# one-time: rebind lk-jwt-service to :8071
|
||||
mkdir -p /etc/systemd/system/lk-jwt-service.service.d
|
||||
printf '[Service]\nEnvironment=LIVEKIT_JWT_BIND=:8071\n' > /etc/systemd/system/lk-jwt-service.service.d/override.conf
|
||||
systemctl daemon-reload && systemctl restart lk-jwt-service && systemctl enable --now voice-limit-guard
|
||||
```
|
||||
|
||||
**To fully revert** (back to lk-jwt-service directly on `:8070`): `systemctl disable --now voice-limit-guard`, remove the drop-in, `daemon-reload`, `systemctl restart lk-jwt-service`.
|
||||
|
||||
---
|
||||
|
||||
## Access Token Rotation
|
||||
|
||||
The `MATRIX_TOKEN` in `/etc/matrix-deploy.env` on LXC 151 is a Jared user token used to push hookshot transforms to Matrix room state (requires power level ≥ 50 in Spam and Stuff).
|
||||
@@ -230,7 +274,8 @@ The token in `draupnir/production.yaml` in this repo is **intentionally redacted
|
||||
| 6789 | LiveKit metrics | 0.0.0.0 |
|
||||
| 7880 | LiveKit HTTP | 0.0.0.0 |
|
||||
| 7881 | LiveKit RTC TCP | 0.0.0.0 |
|
||||
| 8070 | lk-jwt-service | 0.0.0.0 |
|
||||
| 8070 | voice-limit-guard (fronts lk-jwt-service) | 0.0.0.0 |
|
||||
| 8071 | lk-jwt-service (behind guard) | 0.0.0.0 |
|
||||
| 8080 | synapse-admin (nginx) | 0.0.0.0 |
|
||||
| 3478 | coturn STUN/TURN | 0.0.0.0 |
|
||||
| 5349 | coturn TURNS/TLS | 0.0.0.0 |
|
||||
@@ -415,7 +460,7 @@ All custom code lives in `src/app/` on the `lotus` branch of `code.lotusguild.or
|
||||
|
||||
| Feature | Files | Notes |
|
||||
|---------|-------|-------|
|
||||
| **Element Call embed** | `src/app/plugins/call/`, `src/app/hooks/useCallEmbed.ts`, `src/app/components/CallEmbedProvider.tsx` | EC 0.19.3 (`@element-hq/element-call-embedded`), dist copied to `public/element-call/` by vite |
|
||||
| **Element Call embed** | `src/app/plugins/call/`, `src/app/hooks/useCallEmbed.ts`, `src/app/components/CallEmbedProvider.tsx` | EC 0.20.1 (`@element-hq/element-call-embedded`), dist copied to `public/element-call/` by vite |
|
||||
| **DM calls** | `src/app/features/room/Room.tsx`, `src/app/features/room/RoomViewHeader.tsx` | Phone button in DM room header; `useCallStart(true)` passes `intent: StartedByUser`; Room.tsx switches to CallView layout when DM has active call |
|
||||
| **Picture-in-picture call** | `src/app/components/CallEmbedProvider.tsx` | When navigating away from the call room, the embed shrinks to a 280×158px PiP in the bottom-right. Click navigates back. Implemented via `useEffect` imperatively overriding styles on `callEmbedRef.current` — cannot use a wrapper div because `useCallEmbedPlacementSync` writes `top/left/width/height` directly onto that element |
|
||||
| **Screenshare fullscreen** | `src/app/features/call/CallControls.tsx`, `src/app/features/call/Controls.tsx` | When screensharing, a fullscreen button appears in call controls. Calls `callEmbedRef.current?.requestFullscreen()` on the Cinny call container. EC naturally spotlights the screenshare — the old 600ms grid-revert code was removed (it caused fullscreen to show avatars instead of the screen) |
|
||||
@@ -699,7 +744,7 @@ All commands use the `!` prefix. Run `!help` in any room for the full list.
|
||||
| Webhook bridge | matrix-hookshot | 7.3.2 |
|
||||
| Reverse proxy | Nginx Proxy Manager | — |
|
||||
| Web client | Lotus Cinny (fork of `cinnyapp/cinny` main) | custom |
|
||||
| Element Call embed | `@element-hq/element-call-embedded` | 0.19.3 |
|
||||
| Element Call embed | `@element-hq/element-call-embedded` | 0.20.1 |
|
||||
| GIF picker | Giphy JS/React SDK (`@giphy/react-components`) | — |
|
||||
| Auto-deploy | adnanh/webhook | 2.8.0 |
|
||||
| Bot language | Python 3 | 3.x |
|
||||
|
||||
+41
-1
File diff suppressed because one or more lines are too long
@@ -0,0 +1,309 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
voice-limit-guard — hard, cross-client voice channel participant limits.
|
||||
|
||||
Sits in front of lk-jwt-service (the LiveKit MatrixRTC JWT issuer). Every
|
||||
Matrix client (Element, FluffyChat, Lotus Chat, ...) must obtain a token from
|
||||
this service before it can join a LiveKit room, so refusing the token here is a
|
||||
hard block that applies to ALL clients — not just our own.
|
||||
|
||||
Flow for token requests (POST /sfu/get and legacy POST /get_token):
|
||||
1. Read the request body and extract the Matrix `room` id (clients send the
|
||||
*raw* room id here; lk-jwt-service maps it to the hashed LiveKit alias).
|
||||
2. Look up `io.lotus.voice_limit` -> max_users for that room via the Synapse
|
||||
admin API (cached briefly). 0 / absent => no limit.
|
||||
3. Forward the request to lk-jwt-service unchanged and capture its response.
|
||||
4. If a limit applies and the token was issued (HTTP 200), decode the JWT to
|
||||
read the LiveKit alias (`video.room`) and the requester identity (`sub`),
|
||||
then ask LiveKit how many distinct Matrix users are currently in the room.
|
||||
- requester already present (rejoin / extra device) -> allow
|
||||
- distinct users >= limit -> 403 (blocked)
|
||||
- otherwise -> allow
|
||||
5. Anything that goes wrong in steps 2-4 FAILS OPEN: the upstream response is
|
||||
returned unchanged, so calls keep working even if this guard is degraded.
|
||||
|
||||
All other requests (OPTIONS preflight, GET, unknown paths) are proxied
|
||||
transparently so CORS and health behaviour match lk-jwt-service exactly.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||
|
||||
# --- configuration (from environment) ---------------------------------------
|
||||
BIND_HOST = os.environ.get("GUARD_BIND_HOST", "0.0.0.0")
|
||||
BIND_PORT = int(os.environ.get("GUARD_BIND_PORT", "8070"))
|
||||
UPSTREAM = os.environ.get("GUARD_UPSTREAM", "http://127.0.0.1:8071").rstrip("/")
|
||||
LIVEKIT_API = os.environ.get("LIVEKIT_API", "http://127.0.0.1:7880").rstrip("/")
|
||||
LIVEKIT_KEY = os.environ.get("LIVEKIT_KEY", "")
|
||||
LIVEKIT_SECRET = os.environ.get("LIVEKIT_SECRET", "")
|
||||
SYNAPSE_API = os.environ.get("SYNAPSE_API", "http://127.0.0.1:8008").rstrip("/")
|
||||
MATRIX_TOKEN = os.environ.get("MATRIX_TOKEN", "")
|
||||
|
||||
TOKEN_PATHS = ("/sfu/get", "/get_token")
|
||||
LIMIT_STATE_TYPE = "io.lotus.voice_limit"
|
||||
CORS_HEADERS = {
|
||||
"Access-Control-Allow-Origin": "*",
|
||||
"Access-Control-Allow-Methods": "POST",
|
||||
"Access-Control-Allow-Headers": "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token",
|
||||
}
|
||||
|
||||
# --- small helpers -----------------------------------------------------------
|
||||
|
||||
|
||||
def log(msg):
|
||||
print(f"[voice-limit-guard] {msg}", flush=True)
|
||||
|
||||
|
||||
def b64url(data: bytes) -> str:
|
||||
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
|
||||
|
||||
|
||||
def jwt_payload(token: str):
|
||||
"""Best-effort decode of a JWT payload without verification."""
|
||||
try:
|
||||
payload_b64 = token.split(".")[1]
|
||||
payload_b64 += "=" * (-len(payload_b64) % 4)
|
||||
return json.loads(base64.urlsafe_b64decode(payload_b64))
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def should_block(limit: int, present_users: set, requester: str) -> bool:
|
||||
"""Pure decision: should this token be refused?
|
||||
|
||||
- limit <= 0 -> never block (no limit configured)
|
||||
- requester already present -> never block (rejoin / extra device)
|
||||
- distinct users >= limit -> block
|
||||
"""
|
||||
if limit <= 0:
|
||||
return False
|
||||
if requester and requester in present_users:
|
||||
return False
|
||||
return len(present_users) >= limit
|
||||
|
||||
|
||||
def matrix_user(identity: str) -> str:
|
||||
"""Reduce a LiveKit identity (`@user:domain:DEVICE`) to `@user:domain`.
|
||||
|
||||
Non-Matrix identities (e.g. hashed federated identities) are returned
|
||||
unchanged so they each count as a distinct user.
|
||||
"""
|
||||
if not identity.startswith("@"):
|
||||
return identity
|
||||
first = identity.find(":")
|
||||
if first == -1:
|
||||
return identity
|
||||
second = identity.find(":", first + 1)
|
||||
return identity if second == -1 else identity[:second]
|
||||
|
||||
|
||||
# --- LiveKit admin JWT + participant counting --------------------------------
|
||||
|
||||
|
||||
def livekit_admin_token(room: str) -> str:
|
||||
now = int(time.time())
|
||||
header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
|
||||
payload = b64url(
|
||||
json.dumps(
|
||||
{
|
||||
"iss": LIVEKIT_KEY,
|
||||
"exp": now + 120,
|
||||
"nbf": now - 10,
|
||||
"video": {"roomAdmin": True, "room": room, "roomList": True},
|
||||
}
|
||||
).encode()
|
||||
)
|
||||
signing_input = f"{header}.{payload}".encode()
|
||||
sig = b64url(hmac.new(LIVEKIT_SECRET.encode(), signing_input, hashlib.sha256).digest())
|
||||
return f"{header}.{payload}.{sig}"
|
||||
|
||||
|
||||
def livekit_present_users(alias: str):
|
||||
"""Return the set of distinct Matrix users currently in the LiveKit room."""
|
||||
req = urllib.request.Request(
|
||||
f"{LIVEKIT_API}/twirp/livekit.RoomService/ListParticipants",
|
||||
data=json.dumps({"room": alias}).encode(),
|
||||
headers={
|
||||
"Authorization": "Bearer " + livekit_admin_token(alias),
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
method="POST",
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
data = json.loads(resp.read())
|
||||
return {matrix_user(p.get("identity", "")) for p in data.get("participants", [])}
|
||||
|
||||
|
||||
# --- per-room limit lookup (cached) ------------------------------------------
|
||||
|
||||
_limit_cache = {} # room_id -> (max_users, expiry_epoch)
|
||||
_limit_cache_lock = threading.Lock()
|
||||
_LIMIT_TTL = 10.0
|
||||
|
||||
|
||||
def room_limit(room_id: str) -> int:
|
||||
now = time.time()
|
||||
with _limit_cache_lock:
|
||||
cached = _limit_cache.get(room_id)
|
||||
if cached and cached[1] > now:
|
||||
return cached[0]
|
||||
|
||||
limit = 0
|
||||
try:
|
||||
url = (
|
||||
f"{SYNAPSE_API}/_synapse/admin/v1/rooms/"
|
||||
f"{urllib.parse.quote(room_id, safe='')}/state"
|
||||
)
|
||||
req = urllib.request.Request(url, headers={"Authorization": "Bearer " + MATRIX_TOKEN})
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
events = json.loads(resp.read()).get("state", [])
|
||||
for ev in events:
|
||||
if ev.get("type") == LIMIT_STATE_TYPE and ev.get("state_key", "") == "":
|
||||
limit = int(ev.get("content", {}).get("max_users", 0) or 0)
|
||||
break
|
||||
except Exception as exc:
|
||||
# Fail open: treat lookup failure as "no limit".
|
||||
log(f"limit lookup failed for {room_id}: {exc}")
|
||||
return 0
|
||||
|
||||
with _limit_cache_lock:
|
||||
_limit_cache[room_id] = (limit, now + _LIMIT_TTL)
|
||||
return limit
|
||||
|
||||
|
||||
# --- HTTP handler ------------------------------------------------------------
|
||||
|
||||
|
||||
class Handler(BaseHTTPRequestHandler):
|
||||
protocol_version = "HTTP/1.1"
|
||||
|
||||
def log_message(self, *args): # silence default request logging
|
||||
pass
|
||||
|
||||
def _read_body(self) -> bytes:
|
||||
length = int(self.headers.get("Content-Length", 0) or 0)
|
||||
return self.rfile.read(length) if length else b""
|
||||
|
||||
def _proxy(self, body: bytes):
|
||||
"""Forward the current request to lk-jwt-service and return its response
|
||||
as (status, headers_dict, body_bytes)."""
|
||||
headers = {}
|
||||
if self.headers.get("Content-Type"):
|
||||
headers["Content-Type"] = self.headers["Content-Type"]
|
||||
if self.headers.get("Accept"):
|
||||
headers["Accept"] = self.headers["Accept"]
|
||||
if self.headers.get("Origin"):
|
||||
headers["Origin"] = self.headers["Origin"]
|
||||
req = urllib.request.Request(
|
||||
UPSTREAM + self.path,
|
||||
data=body if self.command in ("POST", "PUT", "PATCH") else None,
|
||||
headers=headers,
|
||||
method=self.command,
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||||
return resp.status, dict(resp.headers), resp.read()
|
||||
except urllib.error.HTTPError as exc:
|
||||
return exc.code, dict(exc.headers), exc.read()
|
||||
|
||||
def _send(self, status: int, headers: dict, body: bytes):
|
||||
self.send_response(status)
|
||||
# send_response() already emits Server and Date; relaying them too would
|
||||
# produce duplicates. Content-Length is recomputed below.
|
||||
skip = ("transfer-encoding", "content-length", "connection", "date", "server")
|
||||
for key, value in headers.items():
|
||||
if key.lower() in skip:
|
||||
continue
|
||||
self.send_header(key, value)
|
||||
self.send_header("Content-Length", str(len(body)))
|
||||
self.end_headers()
|
||||
if body and self.command != "HEAD":
|
||||
self.wfile.write(body)
|
||||
|
||||
def _send_blocked(self):
|
||||
body = json.dumps(
|
||||
{"errcode": "M_FORBIDDEN", "error": "This voice channel is full."}
|
||||
).encode()
|
||||
headers = dict(CORS_HEADERS)
|
||||
headers["Content-Type"] = "application/json"
|
||||
self._send(403, headers, body)
|
||||
|
||||
def _handle(self):
|
||||
body = self._read_body()
|
||||
|
||||
# Only token-issuing POSTs are subject to the limit check.
|
||||
if not (self.command == "POST" and self.path in TOKEN_PATHS):
|
||||
self._send(*self._proxy(body))
|
||||
return
|
||||
|
||||
# Determine the room and its limit before bothering upstream.
|
||||
try:
|
||||
room_id = json.loads(body).get("room", "")
|
||||
except Exception:
|
||||
room_id = ""
|
||||
|
||||
limit = room_limit(room_id) if room_id else 0
|
||||
|
||||
status, headers, resp_body = self._proxy(body)
|
||||
|
||||
# No limit, or upstream didn't issue a token -> pass through unchanged.
|
||||
if limit <= 0 or status != 200:
|
||||
self._send(status, headers, resp_body)
|
||||
return
|
||||
|
||||
# Limit applies and a token was issued — decide whether to allow it.
|
||||
try:
|
||||
payload = json.loads(resp_body)
|
||||
token = payload.get("jwt", "")
|
||||
claims = jwt_payload(token) or {}
|
||||
alias = (claims.get("video") or {}).get("room", "")
|
||||
requester = matrix_user(claims.get("sub", ""))
|
||||
if not alias:
|
||||
raise ValueError("no alias in issued token")
|
||||
|
||||
present = livekit_present_users(alias)
|
||||
if should_block(limit, present, requester):
|
||||
log(f"blocked {requester or '?'} from {room_id}: {len(present)}/{limit}")
|
||||
self._send_blocked()
|
||||
return
|
||||
except Exception as exc:
|
||||
# Fail open: never break a join because the guard had a problem.
|
||||
log(f"limit check error for {room_id}: {exc}")
|
||||
|
||||
self._send(status, headers, resp_body)
|
||||
|
||||
# Map the HTTP verbs we care about onto the shared handler.
|
||||
do_GET = _handle
|
||||
do_POST = _handle
|
||||
do_OPTIONS = _handle
|
||||
do_HEAD = _handle
|
||||
do_PUT = _handle
|
||||
|
||||
|
||||
class GuardServer(ThreadingHTTPServer):
|
||||
daemon_threads = True
|
||||
# Element Call fires a burst of token requests per join; keep the accept
|
||||
# queue generous so none are dropped.
|
||||
request_queue_size = 128
|
||||
allow_reuse_address = True
|
||||
|
||||
|
||||
def main():
|
||||
if not (LIVEKIT_KEY and LIVEKIT_SECRET and MATRIX_TOKEN):
|
||||
log("WARNING: missing LIVEKIT_KEY/LIVEKIT_SECRET/MATRIX_TOKEN — limit checks will fail open")
|
||||
server = GuardServer((BIND_HOST, BIND_PORT), Handler)
|
||||
log(f"listening on {BIND_HOST}:{BIND_PORT} -> upstream {UPSTREAM}")
|
||||
server.serve_forever()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,22 @@
|
||||
[Unit]
|
||||
Description=Voice Limit Guard (hard per-room voice channel participant limits, fronts lk-jwt-service)
|
||||
After=network.target livekit-server.service lk-jwt-service.service
|
||||
Wants=lk-jwt-service.service
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
ExecStart=/usr/bin/env python3 /opt/voice-limit-guard/voice-limit-guard.py
|
||||
Restart=on-failure
|
||||
RestartSec=5
|
||||
# MATRIX_TOKEN (server-admin) is read from the existing deploy env file.
|
||||
EnvironmentFile=/etc/matrix-deploy.env
|
||||
Environment=GUARD_BIND_HOST=0.0.0.0
|
||||
Environment=GUARD_BIND_PORT=8070
|
||||
Environment=GUARD_UPSTREAM=http://127.0.0.1:8071
|
||||
Environment=LIVEKIT_API=http://127.0.0.1:7880
|
||||
Environment=SYNAPSE_API=http://127.0.0.1:8008
|
||||
Environment=LIVEKIT_KEY=lotuskey
|
||||
Environment=LIVEKIT_SECRET=GoI5PPLbNXZlQHlfdAzLFy0B/QoqA9uXiyb/p6dQEtc=
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
Reference in New Issue
Block a user