"""Gandalf – Global Advanced Network Detection And Link Facilitator. Flask web application serving the monitoring dashboard and suppression management UI. Authentication via Authelia forward-auth headers. All monitoring and alerting is handled by the separate monitor.py daemon. """ import ipaddress import json import logging import re import threading import time import uuid from functools import wraps from flask import Flask, jsonify, redirect, render_template, request, url_for import db import diagnose from monitor import PulseClient logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s %(message)s', ) logger = logging.getLogger('gandalf.web') app = Flask(__name__) _cfg = None @app.context_processor def inject_config(): """Inject safe config values into all templates.""" cfg = _config() return { 'config': { 'ticket_api': { 'web_url': cfg.get('ticket_api', {}).get('web_url', 'http://t.lotusguild.org/ticket/'), } } } # In-memory diagnostic job store { job_id: { status, result, created_at } } _diag_jobs: dict = {} _diag_lock = threading.Lock() def _purge_old_jobs_loop(): """Background thread: remove jobs older than 10 minutes and mark stuck running jobs as errored.""" while True: time.sleep(120) cutoff = time.time() - 600 stuck_cutoff = time.time() - 300 # 5 min: job still 'running' → thread must have crashed with _diag_lock: stale = [jid for jid, j in _diag_jobs.items() if j.get('created_at', 0) < cutoff] for jid in stale: del _diag_jobs[jid] for jid, j in _diag_jobs.items(): if j['status'] == 'running' and j.get('created_at', 0) < stuck_cutoff: j['status'] = 'done' j['result'] = {'status': 'error', 'error': 'Diagnostic timed out (thread crash)'} logger.error(f'Diagnostic job {jid} appeared stuck; marked as errored') _purge_thread = threading.Thread(target=_purge_old_jobs_loop, daemon=True) _purge_thread.start() def _config() -> dict: global _cfg if _cfg is None: with open('config.json') as f: _cfg = json.load(f) return _cfg # 
# ---------------------------------------------------------------------------
# Auth helpers
# ---------------------------------------------------------------------------

def _get_user() -> dict:
    """Build the current user from the forward-auth request headers.

    Returns:
        A dict with ``username``, ``name`` and ``email`` (empty strings when
        the corresponding ``Remote-*`` header is absent) and ``groups``, the
        comma-separated ``Remote-Groups`` header split into a list with blank
        entries dropped.
    """
    raw_groups = request.headers.get('Remote-Groups', '')
    return {
        'username': request.headers.get('Remote-User', ''),
        'name': request.headers.get('Remote-Name', ''),
        'email': request.headers.get('Remote-Email', ''),
        'groups': [g.strip() for g in raw_groups.split(',') if g.strip()],
    }


def require_auth(f):
    """Decorator: allow the view only for authenticated, authorised users.

    Responds with 401 when no ``Remote-User`` header is present and with 403
    when none of the user's groups is listed in the ``auth.allowed_groups``
    config entry (default ``['admin']``). Otherwise the wrapped view runs
    unchanged.
    """
    # Function-scope import keeps this block self-contained.
    from html import escape

    @wraps(f)
    def wrapper(*args, **kwargs):
        user = _get_user()
        if not user['username']:
            return (
                '<h1>401 – Not authenticated</h1>'
                '<p>Please access Gandalf through '
                'auth.lotusguild.org.</p>',
                401,
            )
        allowed = _config().get('auth', {}).get('allowed_groups', ['admin'])
        if not any(g in allowed for g in user['groups']):
            # Header- and config-derived text is escaped before being placed
            # into the HTML response body.
            safe_user = escape(user['username'])
            safe_groups = escape(', '.join(allowed))
            return (
                '<h1>403 – Access denied</h1>'
                f'<p>Your account ({safe_user}) is not in an allowed group '
                f'({safe_groups}).</p>',
                403,
            )
        return f(*args, **kwargs)
    return wrapper


def _load_state_json(key: str, default):
    """Fetch a JSON document stored under *key* in the state table.

    Returns *default* when nothing is stored or the stored text does not
    parse; a parse failure is logged rather than raised so that a corrupt
    snapshot cannot take a page down.
    """
    raw = db.get_state(key)
    if not raw:
        return default
    try:
        return json.loads(raw)
    except (TypeError, ValueError):  # json.JSONDecodeError is a ValueError
        logger.error('Failed to parse %s JSON', key)
        return default


# ---------------------------------------------------------------------------
# Page routes
# ---------------------------------------------------------------------------

@app.route('/')
@require_auth
def index():
    """Render the main dashboard with events, summary and suppressions."""
    user = _get_user()
    events = db.get_active_events()
    summary = db.get_status_summary()
    snapshot = _load_state_json('network_snapshot', {})
    last_check = db.get_state('last_check', 'Never')
    suppressions = db.get_active_suppressions()
    return render_template(
        'index.html',
        user=user,
        events=events,
        summary=summary,
        snapshot=snapshot,
        last_check=last_check,
        suppressions=suppressions,
    )


@app.route('/links')
@require_auth
def links_page():
    """Render the link statistics page."""
    return render_template('links.html', user=_get_user())


@app.route('/inspector')
@require_auth
def inspector():
    """Render the inspector page."""
    return render_template('inspector.html', user=_get_user())


@app.route('/suppressions')
@require_auth
def suppressions_page():
    """Render the suppression management page (active + last 50 historic)."""
    user = _get_user()
    active = db.get_active_suppressions()
    history = db.get_suppression_history(limit=50)
    snapshot = _load_state_json('network_snapshot', {})
    return render_template(
        'suppressions.html',
        user=user,
        active=active,
        history=history,
        snapshot=snapshot,
    )


# ---------------------------------------------------------------------------
# API routes
# ---------------------------------------------------------------------------

@app.route('/api/status')
@require_auth
def api_status():
    """Return status summary, last check time and active events as JSON."""
    return jsonify({
        'summary': db.get_status_summary(),
        'last_check': db.get_state('last_check', 'Never'),
        'events': db.get_active_events(),
    })


@app.route('/api/network')
@require_auth
def api_network():
    """Return the cached network snapshot, or an empty payload if unavailable."""
    empty = {'hosts': {}, 'unifi': [], 'updated': None}
    return jsonify(_load_state_json('network_snapshot', empty))
@app.route('/api/links')
@require_auth
def api_links():
    """Return the cached link statistics, or an empty payload if unavailable."""
    raw = db.get_state('link_stats')
    if raw:
        try:
            return jsonify(json.loads(raw))
        except (TypeError, ValueError):  # json.JSONDecodeError is a ValueError
            logger.error('Failed to parse link_stats JSON')
    return jsonify({'hosts': {}, 'updated': None})


@app.route('/api/events')
@require_auth
def api_events():
    """Return active events plus up to 30 events resolved in the last 24 h."""
    return jsonify({
        'active': db.get_active_events(),
        'resolved': db.get_recent_resolved(hours=24, limit=30),
    })


@app.route('/api/suppressions', methods=['GET'])
@require_auth
def api_get_suppressions():
    """Return the currently active suppressions as JSON."""
    return jsonify(db.get_active_suppressions())


_SUPPRESSION_TARGET_TYPES = ('host', 'interface', 'unifi_device', 'all')


def _json_body() -> dict:
    """Return the request's JSON body if it is an object, else an empty dict."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _clean_str(value) -> str:
    """Return *value* stripped if it is a string; anything else becomes ''."""
    return value.strip() if isinstance(value, str) else ''


@app.route('/api/suppressions', methods=['POST'])
@require_auth
def api_create_suppression():
    """Create a suppression from the JSON request body.

    Body fields: ``target_type`` (default ``'host'``), ``target_name``,
    ``target_detail``, ``reason`` and optional ``expires_minutes``. A missing
    or falsy ``expires_minutes`` means the suppression has no expiry.

    Returns ``{'success': True, 'id': ...}`` on success, or a 400 response
    with an ``error`` message when validation fails.
    """
    user = _get_user()
    data = _json_body()
    target_type = data.get('target_type', 'host')
    target_name = _clean_str(data.get('target_name'))
    target_detail = _clean_str(data.get('target_detail'))
    reason = _clean_str(data.get('reason'))
    expires_raw = data.get('expires_minutes')  # None = manual/permanent

    if target_type not in _SUPPRESSION_TARGET_TYPES:
        return jsonify({'error': 'Invalid target_type'}), 400
    if target_type != 'all' and not target_name:
        return jsonify({'error': 'target_name required'}), 400
    if not reason:
        return jsonify({'error': 'reason required'}), 400

    expires_minutes = None
    if expires_raw:
        # Reject junk here instead of letting int() raise into a 500.
        try:
            expires_minutes = int(expires_raw)
        except (TypeError, ValueError, OverflowError):
            return jsonify({'error': 'expires_minutes must be an integer'}), 400
        if expires_minutes <= 0:
            return jsonify({'error': 'expires_minutes must be a positive integer'}), 400

    sup_id = db.create_suppression(
        target_type=target_type,
        target_name=target_name,
        target_detail=target_detail,
        reason=reason,
        suppressed_by=user['username'],
        expires_minutes=expires_minutes,
    )
    logger.info(
        'Suppression #%s created by %s: %s/%s/%s – %s',
        sup_id, user['username'], target_type, target_name, target_detail, reason,
    )
    return jsonify({'success': True, 'id': sup_id})


# The view takes ``sup_id``, so the rule must declare it as a URL variable.
@app.route('/api/suppressions/<int:sup_id>', methods=['DELETE'])
@require_auth
def api_delete_suppression(sup_id: int):
    """Deactivate the suppression with the given id."""
    user = _get_user()
    db.deactivate_suppression(sup_id)
    logger.info('Suppression #%s removed by %s', sup_id, user['username'])
    return jsonify({'success': True})


def _match_server_iface(server_ifaces: dict, lldp_port_id: str):
    """Pick the host interface that corresponds to an LLDP port id.

    Tries an exact key match, then a substring match in either direction,
    and finally falls back to the first interface listed. Returns ``None``
    only when *server_ifaces* is empty.
    """
    if lldp_port_id:
        if lldp_port_id in server_ifaces:
            return lldp_port_id
        for name in server_ifaces:
            if lldp_port_id in name or name in lldp_port_id:
                return name
    return next(iter(server_ifaces), None)


def _is_valid_ip(value) -> bool:
    """Return True if *value* is a string holding a valid IPv4/IPv6 address."""
    # ipaddress.ip_address() also accepts ints, so require a str explicitly.
    if not isinstance(value, str):
        return False
    try:
        ipaddress.ip_address(value)
    except ValueError:
        return False
    return True


@app.route('/api/diagnose', methods=['POST'])
@require_auth
def api_diagnose_start():
    """Start a link diagnostic job. Returns {job_id}.

    The JSON body must carry ``switch_name`` and an integer ``port_idx``.
    The switch port is looked up in the cached ``link_stats`` state; its LLDP
    neighbour determines which host and interface are diagnosed. The job runs
    in a daemon thread and is polled via ``/api/diagnose/<job_id>``.

    Error responses: 400 for bad input or unresolvable target, 404 when the
    switch, port or host is unknown, 503 when no link stats are cached, and
    500 when the cached link stats cannot be parsed.
    """
    data = _json_body()
    switch_name = _clean_str(data.get('switch_name'))
    try:
        port_idx = int(data.get('port_idx'))
    except (TypeError, ValueError, OverflowError):
        return jsonify({'error': 'port_idx must be an integer'}), 400

    if not switch_name:
        return jsonify({'error': 'switch_name and port_idx required'}), 400

    # Look up switch + port in cached link_stats
    raw = db.get_state('link_stats')
    if not raw:
        return jsonify({'error': 'No link_stats data available'}), 503
    try:
        link_data = json.loads(raw)
    except (TypeError, ValueError):
        logger.error('Failed to parse link_stats JSON in /api/diagnose')
        return jsonify({'error': 'Internal data error'}), 500

    switches = link_data.get('unifi_switches', {})
    sw = switches.get(switch_name)
    if not sw:
        return jsonify({'error': f'Switch "{switch_name}" not found'}), 404

    # Find port by port_idx
    port_data = None
    for pname, pd in sw.get('ports', {}).items():
        if pd.get('port_idx') == port_idx:
            port_data = dict(pd)
            port_data['name'] = pname
            break
    if not port_data:
        return jsonify({'error': f'Port {port_idx} not found on switch "{switch_name}"'}), 404

    # LLDP neighbor required to know which host+iface to SSH into
    lldp = port_data.get('lldp')
    if not lldp or not lldp.get('system_name'):
        return jsonify({'error': 'No LLDP neighbor data for this port'}), 400

    server_name = lldp['system_name']
    lldp_port_id = lldp.get('port_id', '')

    # Find matching host + interface in link_stats hosts
    hosts = link_data.get('hosts', {})
    server_ifaces = hosts.get(server_name)
    if not server_ifaces:
        return jsonify({'error': f'Host "{server_name}" not in link stats'}), 404

    # Match interface by LLDP port_id (exact, then fuzzy, then first listed)
    matched_iface = _match_server_iface(server_ifaces, lldp_port_id)
    if not matched_iface:
        return jsonify({'error': 'Cannot determine server interface'}), 400

    # Resolve host IP from link_stats host data
    host_ip = (server_ifaces.get(matched_iface) or {}).get('host_ip')
    if not host_ip:
        # Fallback: use LLDP mgmt IPs
        mgmt_ips = lldp.get('mgmt_ips') or []
        host_ip = mgmt_ips[0] if mgmt_ips else None
    if not host_ip:
        return jsonify({'error': 'Cannot determine host IP for SSH'}), 400

    # Validate resolved values before passing to SSH command builder
    if not _is_valid_ip(host_ip):
        logger.error('Refusing diagnostic: invalid host_ip "%s" for %s', host_ip, server_name)
        return jsonify({'error': 'Resolved host IP is not a valid IP address'}), 400
    if not re.fullmatch(r'[a-zA-Z0-9._-]+', matched_iface):
        logger.error('Refusing diagnostic: invalid iface "%s" for %s', matched_iface, server_name)
        return jsonify({'error': 'Resolved interface name contains invalid characters'}), 400

    job_id = str(uuid.uuid4())
    with _diag_lock:
        _diag_jobs[job_id] = {'status': 'running', 'result': None, 'created_at': time.time()}

    def _run():
        """Worker: run the diagnostic and store its outcome on the job."""
        try:
            cfg = _config()
            pulse = PulseClient(cfg)
            runner = diagnose.DiagnosticsRunner(pulse)
            result = runner.run(host_ip, server_name, matched_iface, port_data)
        except Exception as e:  # thread boundary: report any failure on the job
            logger.error('Diagnostic job %s failed: %s', job_id, e, exc_info=True)
            result = {'status': 'error', 'error': str(e)}
        with _diag_lock:
            # The job may already have been purged; then there is nothing to update.
            if job_id in _diag_jobs:
                _diag_jobs[job_id]['status'] = 'done'
                _diag_jobs[job_id]['result'] = result

    t = threading.Thread(target=_run, daemon=True)
    t.start()

    return jsonify({'job_id': job_id})


# The view takes ``job_id``, so the rule must declare it as a URL variable.
@app.route('/api/diagnose/<job_id>', methods=['GET'])
@require_auth
def api_diagnose_poll(job_id: str):
    """Poll a diagnostic job. Returns {status, result}."""
    with _diag_lock:
        job = _diag_jobs.get(job_id)
        # Copy the fields while holding the lock so a concurrent update by
        # the worker or purge thread cannot produce a mixed status/result.
        snapshot = None if job is None else {
            'status': job['status'],
            'result': job.get('result'),
        }
    if snapshot is None:
        return jsonify({'error': 'Job not found'}), 404
    return jsonify(snapshot)


@app.route('/health')
def health():
    """Health check endpoint (no auth). Checks DB and monitor freshness.

    Returns HTTP 200 with ``status: 'ok'`` when the state table is readable
    and ``last_check`` is at most 20 minutes old (or not yet recorded), and
    HTTP 503 with ``status: 'degraded'`` otherwise. Because the endpoint is
    unauthenticated, failure details are logged rather than returned.
    """
    from datetime import datetime, timezone

    checks = {}
    overall = 'ok'

    # DB connectivity
    last_check = None
    try:
        last_check = db.get_state('last_check', '')
        checks['db'] = 'ok'
    except Exception:  # any DB failure degrades health; details go to the log
        logger.exception('Health check: database query failed')
        checks['db'] = 'error'
        overall = 'degraded'

    # Monitor freshness: fail if last_check is older than 20 minutes
    if last_check is None:
        checks['monitor'] = 'error'
        overall = 'degraded'
    elif not last_check:
        checks['monitor'] = 'no data yet'
    else:
        try:
            ts = datetime.strptime(
                last_check, '%Y-%m-%d %H:%M:%S UTC'
            ).replace(tzinfo=timezone.utc)
        except (TypeError, ValueError):
            logger.exception('Health check: unparseable last_check %r', last_check)
            checks['monitor'] = 'error'
            overall = 'degraded'
        else:
            age_s = (datetime.now(timezone.utc) - ts).total_seconds()
            if age_s > 1200:
                checks['monitor'] = f'stale ({int(age_s)}s since last check)'
                overall = 'degraded'
            else:
                checks['monitor'] = f'ok ({int(age_s)}s ago)'

    status_code = 200 if overall == 'ok' else 503
    return jsonify({'status': overall, 'service': 'gandalf', 'checks': checks}), status_code


if __name__ == '__main__':
    import os

    # The Werkzeug debugger permits arbitrary code execution, so it must not
    # be on by default while listening on all interfaces. Opt in explicitly
    # with FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)