diff --git a/app.py b/app.py index c1c5004..739e7c1 100644 --- a/app.py +++ b/app.py @@ -59,6 +59,8 @@ def inject_config(): # In-memory diagnostic job store { job_id: { status, result, created_at } } _diag_jobs: dict = {} _diag_lock = threading.Lock() +# Per-user rate-limit: { username: [epoch_float, ...] } — cleaned inside _diag_lock +_diag_rate: dict = {} def _purge_old_jobs_loop(): @@ -437,10 +439,17 @@ def api_diagnose_start(): job_id = str(uuid.uuid4()) requesting_user = _get_user()['username'] + now = time.time() with _diag_lock: + # Rate limit: max 5 diagnostic jobs per user per minute + recent = [t for t in _diag_rate.get(requesting_user, []) if now - t < 60] + if len(recent) >= 5: + return jsonify({'error': 'Rate limit exceeded: max 5 diagnostics per minute'}), 429 + recent.append(now) + _diag_rate[requesting_user] = recent _diag_jobs[job_id] = { 'status': 'running', 'result': None, - 'created_at': time.time(), 'user': requesting_user, + 'created_at': now, 'user': requesting_user, } def _run(): @@ -467,13 +476,15 @@ def api_diagnose_start(): @require_auth def api_diagnose_poll(job_id: str): """Poll a diagnostic job. Returns {status, result}.""" + current_user = _get_user()['username'] with _diag_lock: job = _diag_jobs.get(job_id) - if not job: - return jsonify({'error': 'Job not found'}), 404 - if job.get('user') != _get_user()['username']: - return jsonify({'error': 'Forbidden'}), 403 - return jsonify({'status': job['status'], 'result': job.get('result')}) + if not job: + return jsonify({'error': 'Job not found'}), 404 + if job.get('user') != current_user: + return jsonify({'error': 'Forbidden'}), 403 + snapshot = {'status': job['status'], 'result': job.get('result')} + return jsonify(snapshot) @app.route('/api/avatar') diff --git a/monitor.py b/monitor.py index ef2a5bf..63b16a8 100644 --- a/monitor.py +++ b/monitor.py @@ -355,8 +355,8 @@ class LinkStatsCollector: if not ifaces or not self.pulse.url: return {} - # Validate interface names (kernel names only contain [a-zA-Z0-9_.-]) - safe_ifaces = [i for i in ifaces if re.match(r'^[a-zA-Z0-9_.-]+$', i)] + # Validate interface names (kernel names: [a-zA-Z0-9_.-], max 15 chars per IFNAMSIZ) + safe_ifaces = [i for i in ifaces if re.match(r'^[a-zA-Z0-9_.-]{1,15}$', i)] if not safe_ifaces: return {}