fix: diagnostic rate limiting, lock-held ownership check, iface name length cap
Lint / Python (flake8) (push) Failing after 47s
Lint / JS (eslint) (push) Successful in 8s
Security / Python Security (bandit) (push) Successful in 43s
Test / Python Tests (pytest) (push) Failing after 1m22s
Lint / Notify on failure (push) Successful in 3s
Lint / Deploy (push) Has been skipped

- app.py: add per-user diagnostic rate limit (5/min) enforced atomically under _diag_lock
- app.py: move diagnostic job ownership check inside _diag_lock to close TOCTOU window; snapshot result before releasing lock
- monitor.py: cap interface name regex to 15 chars (Linux IFNAMSIZ limit)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-11 08:42:50 -04:00
parent c71d0da97d
commit 25baec67ac
2 changed files with 19 additions and 8 deletions
+17 -6
View File
@@ -59,6 +59,8 @@ def inject_config():
# In-memory diagnostic job store { job_id: { status, result, created_at } }
_diag_jobs: dict = {}
_diag_lock = threading.Lock()
# Per-user rate-limit: { username: [epoch_float, ...] } — cleaned inside _diag_lock
_diag_rate: dict = {}
def _purge_old_jobs_loop():
@@ -437,10 +439,17 @@ def api_diagnose_start():
job_id = str(uuid.uuid4())
requesting_user = _get_user()['username']
now = time.time()
with _diag_lock:
# Rate limit: max 5 diagnostic jobs per user per minute
recent = [t for t in _diag_rate.get(requesting_user, []) if now - t < 60]
if len(recent) >= 5:
return jsonify({'error': 'Rate limit exceeded: max 5 diagnostics per minute'}), 429
recent.append(now)
_diag_rate[requesting_user] = recent
_diag_jobs[job_id] = {
'status': 'running', 'result': None,
'created_at': time.time(), 'user': requesting_user,
'created_at': now, 'user': requesting_user,
}
def _run():
@@ -467,13 +476,15 @@ def api_diagnose_start():
@require_auth
def api_diagnose_poll(job_id: str):
"""Poll a diagnostic job. Returns {status, result}."""
current_user = _get_user()['username']
with _diag_lock:
job = _diag_jobs.get(job_id)
if not job:
return jsonify({'error': 'Job not found'}), 404
if job.get('user') != _get_user()['username']:
return jsonify({'error': 'Forbidden'}), 403
return jsonify({'status': job['status'], 'result': job.get('result')})
if not job:
return jsonify({'error': 'Job not found'}), 404
if job.get('user') != current_user:
return jsonify({'error': 'Forbidden'}), 403
snapshot = {'status': job['status'], 'result': job.get('result')}
return jsonify(snapshot)
@app.route('/api/avatar')
+2 -2
View File
@@ -355,8 +355,8 @@ class LinkStatsCollector:
if not ifaces or not self.pulse.url:
return {}
# Validate interface names (kernel names only contain [a-zA-Z0-9_.-])
safe_ifaces = [i for i in ifaces if re.match(r'^[a-zA-Z0-9_.-]+$', i)]
# Validate interface names (kernel names: [a-zA-Z0-9_.-], max 15 chars per IFNAMSIZ)
safe_ifaces = [i for i in ifaces if re.match(r'^[a-zA-Z0-9_.-]{1,15}$', i)]
if not safe_ifaces:
return {}