diff --git a/app.py b/app.py index 20373a6..8acdd07 100644 --- a/app.py +++ b/app.py @@ -4,8 +4,10 @@ Flask web application serving the monitoring dashboard and suppression management UI. Authentication via Authelia forward-auth headers. All monitoring and alerting is handled by the separate monitor.py daemon. """ +import ipaddress import json import logging +import re import threading import time import uuid @@ -32,13 +34,25 @@ _diag_jobs: dict = {} _diag_lock = threading.Lock() -def _purge_old_jobs(): - """Remove jobs older than 10 minutes (called before each new job creation).""" - cutoff = time.time() - 600 - with _diag_lock: - stale = [jid for jid, j in _diag_jobs.items() if j.get('created_at', 0) < cutoff] - for jid in stale: - del _diag_jobs[jid] +def _purge_old_jobs_loop(): + """Background thread: remove jobs older than 10 minutes and mark stuck running jobs as errored.""" + while True: + time.sleep(120) + cutoff = time.time() - 600 + stuck_cutoff = time.time() - 300 # 5 min: job still 'running' → thread must have crashed + with _diag_lock: + stale = [jid for jid, j in _diag_jobs.items() if j.get('created_at', 0) < cutoff] + for jid in stale: + del _diag_jobs[jid] + for jid, j in _diag_jobs.items(): + if j['status'] == 'running' and j.get('created_at', 0) < stuck_cutoff: + j['status'] = 'done' + j['result'] = {'status': 'error', 'error': 'Diagnostic timed out (thread crash)'} + logger.error(f'Diagnostic job {jid} appeared stuck; marked as errored') + + +_purge_thread = threading.Thread(target=_purge_old_jobs_loop, daemon=True) +_purge_thread.start() def _config() -> dict: @@ -314,7 +328,16 @@ def api_diagnose_start(): if not host_ip: return jsonify({'error': 'Cannot determine host IP for SSH'}), 400 - _purge_old_jobs() + # Validate resolved values before passing to SSH command builder + try: + ipaddress.ip_address(host_ip) + except ValueError: + logger.error(f'Refusing diagnostic: invalid host_ip "{host_ip}" for {server_name}') + return jsonify({'error': 'Resolved host IP is not a valid IP address'}), 400 + if not re.fullmatch(r'[a-zA-Z0-9._-]+', matched_iface): + logger.error(f'Refusing diagnostic: invalid iface "{matched_iface}" for {server_name}') + return jsonify({'error': 'Resolved interface name contains invalid characters'}), 400 + job_id = str(uuid.uuid4()) with _diag_lock: _diag_jobs[job_id] = {'status': 'running', 'result': None, 'created_at': time.time()} diff --git a/monitor.py b/monitor.py index e550fb6..e4deed6 100644 --- a/monitor.py +++ b/monitor.py @@ -261,7 +261,7 @@ class PulseClient: execution_id = resp.json()['execution_id'] self.last_execution_id = execution_id except Exception as e: - logger.debug(f'Pulse command submit failed: {e}') + logger.error(f'Pulse command submit failed: {e}') return None deadline = time.time() + self.timeout @@ -284,7 +284,7 @@ class PulseClient: if status == 'failed': return None except Exception as e: - logger.debug(f'Pulse poll failed: {e}') + logger.error(f'Pulse poll failed: {e}') logger.warning(f'Pulse command timed out after {self.timeout}s') return None @@ -340,7 +340,7 @@ class LinkStatsCollector: ) output = self.pulse.run_command(ssh_cmd) if output is None: - logger.debug(f'Pulse ethtool collection returned None for {ip}') + logger.error(f'Pulse ethtool collection returned None for {ip}') return {} return self._parse_ssh_output(output)