Security and reliability fixes: input validation, logging, job cleanup
- C5: Validate host_ip (IPv4 check) and iface (allowlist regex) before SSH command builder
- H6: Upgrade Pulse failure logging from debug to error so operators see outages
- M6: Replace per-request O(n) purge with background daemon thread (runs every 2 min)
- M7: Background thread marks jobs stuck in 'running' > 5 min as errored

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
29
app.py
29
app.py
@@ -4,8 +4,10 @@ Flask web application serving the monitoring dashboard and suppression
|
|||||||
management UI. Authentication via Authelia forward-auth headers.
|
management UI. Authentication via Authelia forward-auth headers.
|
||||||
All monitoring and alerting is handled by the separate monitor.py daemon.
|
All monitoring and alerting is handled by the separate monitor.py daemon.
|
||||||
"""
|
"""
|
||||||
|
import ipaddress
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import re
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
@@ -32,13 +34,25 @@ _diag_jobs: dict = {}
|
|||||||
_diag_lock = threading.Lock()
|
_diag_lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
def _purge_old_jobs():
|
def _purge_old_jobs_loop():
|
||||||
"""Remove jobs older than 10 minutes (called before each new job creation)."""
|
"""Background thread: remove jobs older than 10 minutes and mark stuck running jobs as errored."""
|
||||||
|
while True:
|
||||||
|
time.sleep(120)
|
||||||
cutoff = time.time() - 600
|
cutoff = time.time() - 600
|
||||||
|
stuck_cutoff = time.time() - 300 # 5 min: job still 'running' → thread must have crashed
|
||||||
with _diag_lock:
|
with _diag_lock:
|
||||||
stale = [jid for jid, j in _diag_jobs.items() if j.get('created_at', 0) < cutoff]
|
stale = [jid for jid, j in _diag_jobs.items() if j.get('created_at', 0) < cutoff]
|
||||||
for jid in stale:
|
for jid in stale:
|
||||||
del _diag_jobs[jid]
|
del _diag_jobs[jid]
|
||||||
|
for jid, j in _diag_jobs.items():
|
||||||
|
if j['status'] == 'running' and j.get('created_at', 0) < stuck_cutoff:
|
||||||
|
j['status'] = 'done'
|
||||||
|
j['result'] = {'status': 'error', 'error': 'Diagnostic timed out (thread crash)'}
|
||||||
|
logger.error(f'Diagnostic job {jid} appeared stuck; marked as errored')
|
||||||
|
|
||||||
|
|
||||||
|
_purge_thread = threading.Thread(target=_purge_old_jobs_loop, daemon=True)
|
||||||
|
_purge_thread.start()
|
||||||
|
|
||||||
|
|
||||||
def _config() -> dict:
|
def _config() -> dict:
|
||||||
@@ -314,7 +328,16 @@ def api_diagnose_start():
|
|||||||
if not host_ip:
|
if not host_ip:
|
||||||
return jsonify({'error': 'Cannot determine host IP for SSH'}), 400
|
return jsonify({'error': 'Cannot determine host IP for SSH'}), 400
|
||||||
|
|
||||||
_purge_old_jobs()
|
# Validate resolved values before passing to SSH command builder
|
||||||
|
try:
|
||||||
|
ipaddress.ip_address(host_ip)
|
||||||
|
except ValueError:
|
||||||
|
logger.error(f'Refusing diagnostic: invalid host_ip "{host_ip}" for {server_name}')
|
||||||
|
return jsonify({'error': 'Resolved host IP is not a valid IP address'}), 400
|
||||||
|
if not re.fullmatch(r'[a-zA-Z0-9._-]+', matched_iface):
|
||||||
|
logger.error(f'Refusing diagnostic: invalid iface "{matched_iface}" for {server_name}')
|
||||||
|
return jsonify({'error': 'Resolved interface name contains invalid characters'}), 400
|
||||||
|
|
||||||
job_id = str(uuid.uuid4())
|
job_id = str(uuid.uuid4())
|
||||||
with _diag_lock:
|
with _diag_lock:
|
||||||
_diag_jobs[job_id] = {'status': 'running', 'result': None, 'created_at': time.time()}
|
_diag_jobs[job_id] = {'status': 'running', 'result': None, 'created_at': time.time()}
|
||||||
|
|||||||
@@ -261,7 +261,7 @@ class PulseClient:
|
|||||||
execution_id = resp.json()['execution_id']
|
execution_id = resp.json()['execution_id']
|
||||||
self.last_execution_id = execution_id
|
self.last_execution_id = execution_id
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug(f'Pulse command submit failed: {e}')
|
logger.error(f'Pulse command submit failed: {e}')
|
||||||
return None
|
return None
|
||||||
|
|
||||||
deadline = time.time() + self.timeout
|
deadline = time.time() + self.timeout
|
||||||
@@ -284,7 +284,7 @@ class PulseClient:
|
|||||||
if status == 'failed':
|
if status == 'failed':
|
||||||
return None
|
return None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug(f'Pulse poll failed: {e}')
|
logger.error(f'Pulse poll failed: {e}')
|
||||||
logger.warning(f'Pulse command timed out after {self.timeout}s')
|
logger.warning(f'Pulse command timed out after {self.timeout}s')
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@@ -340,7 +340,7 @@ class LinkStatsCollector:
|
|||||||
)
|
)
|
||||||
output = self.pulse.run_command(ssh_cmd)
|
output = self.pulse.run_command(ssh_cmd)
|
||||||
if output is None:
|
if output is None:
|
||||||
logger.debug(f'Pulse ethtool collection returned None for {ip}')
|
logger.error(f'Pulse ethtool collection returned None for {ip}')
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
return self._parse_ssh_output(output)
|
return self._parse_ssh_output(output)
|
||||||
|
|||||||
Reference in New Issue
Block a user