"""Gandalf – Global Advanced Network Detection And Link Facilitator. Flask web application serving the monitoring dashboard and suppression management UI. Authentication via Authelia forward-auth headers. All monitoring and alerting is handled by the separate monitor.py daemon. """ import hashlib import html import ipaddress import json import logging import os import re import tempfile import threading import time import uuid from datetime import datetime, timezone from functools import wraps from flask import Flask, jsonify, make_response, render_template, request, send_file import db import diagnose from monitor import PulseClient logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s %(message)s', ) logger = logging.getLogger('gandalf.web') app = Flask(__name__) _AVATAR_COLORS = ['lt-avatar--orange', 'lt-avatar--green', 'lt-avatar--purple', ''] @app.template_filter('avatar_color') def avatar_color_filter(name: str) -> str: return _AVATAR_COLORS[int(hashlib.md5(name.encode()).hexdigest(), 16) % len(_AVATAR_COLORS)] # nosec B324 _cfg = None _cfg_lock = threading.Lock() @app.context_processor def inject_config(): """Inject safe config values into all templates.""" cfg = _config() return { 'config': { 'ticket_api': { 'web_url': cfg.get('ticket_api', {}).get('web_url', 'http://t.lotusguild.org/ticket/'), } } } # In-memory diagnostic job store { job_id: { status, result, created_at } } _diag_jobs: dict = {} _diag_lock = threading.Lock() # Per-user rate-limit: { username: [epoch_float, ...] } — cleaned inside _diag_lock _diag_rate: dict = {} def _purge_old_jobs_loop(): """Background thread: remove stale diag jobs and run daily event purge.""" while True: time.sleep(120) cutoff = time.time() - 600 stuck_cutoff = time.time() - 300 # 5 min: job still 'running' → thread must have crashed with _diag_lock: stale = [jid for jid, j in _diag_jobs.items() if j.get('created_at', 0) < cutoff] for jid in stale: del _diag_jobs[jid] for jid, j in list(_diag_jobs.items()): if j['status'] == 'running' and j.get('created_at', 0) < stuck_cutoff: j['status'] = 'done' j['result'] = {'status': 'error', 'error': 'Diagnostic abandoned — no activity for 5 minutes.'} logger.error(f'Diagnostic job {jid} stuck (no activity for 5 min); marked done/error') _purge_thread = threading.Thread(target=_purge_old_jobs_loop, daemon=True) _purge_thread.start() def _config() -> dict: global _cfg if _cfg is None: with _cfg_lock: if _cfg is None: with open('config.json') as f: _cfg = json.load(f) return _cfg @app.after_request def add_security_headers(response): response.headers.setdefault('X-Content-Type-Options', 'nosniff') response.headers.setdefault('X-Frame-Options', 'DENY') response.headers.setdefault('Referrer-Policy', 'strict-origin-when-cross-origin') return response def _daemon_ok(last_check: str) -> bool: """Return True if monitor last checked within 20 minutes.""" if not last_check or last_check == 'Never': return False try: ts = datetime.strptime(last_check, '%Y-%m-%d %H:%M:%S UTC').replace(tzinfo=timezone.utc) return (datetime.now(timezone.utc) - ts).total_seconds() < 1200 except Exception: return False # --------------------------------------------------------------------------- # Auth helpers # --------------------------------------------------------------------------- def _get_user() -> dict: return { 'username': request.headers.get('Remote-User', ''), 'name': request.headers.get('Remote-Name', ''), 'email': request.headers.get('Remote-Email', ''), 'groups': [ g.strip() for g in request.headers.get('Remote-Groups', '').split(',') if g.strip() ], } def require_auth(f): @wraps(f) def wrapper(*args, **kwargs): user = _get_user() if not user['username']: return ( '
Please access Gandalf through ' 'auth.lotusguild.org.
', 401, ) allowed = _config().get('auth', {}).get('allowed_groups', ['admin']) if not any(g in allowed for g in user['groups']): safe_user = html.escape(user['username']) safe_groups = html.escape(', '.join(allowed)) return ( f'Your account ({safe_user}) is not in an allowed group ' f'({safe_groups}).
', 403, ) return f(*args, **kwargs) return wrapper def require_admin(f): """Decorator: require require_auth AND membership in the 'admin' group.""" @wraps(f) def wrapper(*args, **kwargs): user = _get_user() if 'admin' not in user.get('groups', []): return jsonify({'error': 'Admin access required'}), 403 return f(*args, **kwargs) return wrapper # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _PAGE_LIMIT = 200 # max events returned per request def _annotate_suppressions(events: list, suppressions: list) -> None: """Annotate each event dict in-place with an is_suppressed bool.""" for ev in events: sup_type = ( 'unifi_device' if ev.get('event_type') == 'unifi_device_offline' else 'interface' if ev.get('event_type') == 'interface_down' else 'host' ) ev['is_suppressed'] = db.check_suppressed( suppressions, sup_type, ev.get('target_name', ''), ev.get('target_detail', '') or '', ) # --------------------------------------------------------------------------- # Page routes # --------------------------------------------------------------------------- @app.route('/') @require_auth def index(): user = _get_user() events = db.get_active_events(limit=_PAGE_LIMIT) total_active = db.count_active_events() summary = db.get_status_summary() snapshot_raw = db.get_state('network_snapshot') last_check = db.get_state('last_check', 'Never') try: snapshot = json.loads(snapshot_raw) if snapshot_raw else {} except Exception as e: logger.error(f'Failed to parse network_snapshot JSON: {e}') snapshot = {} suppressions = db.get_active_suppressions() _annotate_suppressions(events, suppressions) recent_resolved = db.get_recent_resolved(hours=24, limit=10) return render_template( 'index.html', user=user, events=events, total_active=total_active, summary=summary, snapshot=snapshot, last_check=last_check, suppressions=suppressions, recent_resolved=recent_resolved, daemon_ok=_daemon_ok(last_check), ) @app.route('/links') @require_auth def links_page(): user = _get_user() return render_template('links.html', user=user) @app.route('/inspector') @require_auth def inspector(): user = _get_user() return render_template('inspector.html', user=user) @app.route('/suppressions') @require_auth @require_admin def suppressions_page(): user = _get_user() active = db.get_active_suppressions() history = db.get_suppression_history(limit=50) snapshot_raw = db.get_state('network_snapshot') try: snapshot = json.loads(snapshot_raw) if snapshot_raw else {} except Exception as e: logger.error(f'Failed to parse network_snapshot JSON: {e}') snapshot = {} return render_template( 'suppressions.html', user=user, active=active, history=history, snapshot=snapshot, ) # --------------------------------------------------------------------------- # API routes # --------------------------------------------------------------------------- @app.route('/api/status') @require_auth def api_status(): active = db.get_active_events(limit=_PAGE_LIMIT) suppressions = db.get_active_suppressions() _annotate_suppressions(active, suppressions) last_check = db.get_state('last_check', 'Never') return jsonify({ 'summary': db.get_status_summary(), 'last_check': last_check, 'events': active, 'total_active': db.count_active_events(), 'daemon_ok': _daemon_ok(last_check), }) @app.route('/api/network') @require_auth def api_network(): raw = db.get_state('network_snapshot') if raw: try: return jsonify(json.loads(raw)) except Exception: logger.error('Failed to parse network_snapshot JSON') return jsonify({'hosts': {}, 'unifi': [], 'updated': None}) @app.route('/api/links') @require_auth def api_links(): raw = db.get_state('link_stats') if raw: if len(raw) > 10_000_000: logger.error(f'link_stats exceeds 10 MB ({len(raw)} bytes); possible corruption') return jsonify({'error': 'Invalid cached data'}), 503 try: return jsonify(json.loads(raw)) except Exception as e: logger.error(f'Failed to parse link_stats JSON: {e}') return jsonify({'hosts': {}, 'unifi_switches': {}, 'updated': None}) @app.route('/api/events') @require_auth def api_events(): try: limit = min(int(request.args.get('limit', _PAGE_LIMIT)), 1000) offset = max(int(request.args.get('offset', 0)), 0) except ValueError: return jsonify({'error': 'limit and offset must be integers'}), 400 status_filter = request.args.get('status', 'active') if status_filter not in ('active', 'resolved', 'all'): return jsonify({'error': 'status must be active, resolved, or all'}), 400 result: dict = {} if status_filter in ('active', 'all'): result['active'] = db.get_active_events(limit=limit, offset=offset) result['total_active'] = db.count_active_events() if status_filter in ('resolved', 'all'): result['resolved'] = db.get_recent_resolved(hours=24, limit=30) return jsonify(result) @app.route('/api/suppressions', methods=['GET']) @require_auth def api_get_suppressions(): return jsonify(db.get_active_suppressions()) @app.route('/api/suppressions', methods=['POST']) @require_auth @require_admin def api_create_suppression(): user = _get_user() data = request.get_json(silent=True) or {} target_type = data.get('target_type', 'host') target_name = (data.get('target_name') or '').strip() target_detail = (data.get('target_detail') or '').strip() reason = (data.get('reason') or '').strip() expires_minutes = data.get('expires_minutes') # None = manual/permanent if target_type not in ('host', 'interface', 'unifi_device', 'all'): return jsonify({'error': 'Invalid target_type'}), 400 if target_type != 'all' and not target_name: return jsonify({'error': 'target_name required'}), 400 if not reason: return jsonify({'error': 'reason required'}), 400 if len(reason) > 500: return jsonify({'error': 'reason must be 500 characters or fewer'}), 400 if len(target_name) > 255: return jsonify({'error': 'target_name must be 255 characters or fewer'}), 400 if len(target_detail) > 255: return jsonify({'error': 'target_detail must be 255 characters or fewer'}), 400 if expires_minutes is not None: try: expires_minutes = int(expires_minutes) if expires_minutes <= 0 or expires_minutes > 43200: return jsonify({'error': 'expires_minutes must be between 1 and 43200 (30 days)'}), 400 except (ValueError, TypeError): return jsonify({'error': 'expires_minutes must be a valid integer'}), 400 sup_id = db.create_suppression( target_type=target_type, target_name=target_name, target_detail=target_detail, reason=reason, suppressed_by=user['username'], expires_minutes=expires_minutes, ) logger.info( f'Suppression #{sup_id} created by {user["username"]}: ' f'{target_type}/{target_name}/{target_detail} – {reason}' ) return jsonify({'success': True, 'id': sup_id}) @app.route('/api/suppressions/