Files
gandalf/app.py
T
jared 9c4dd5df51
Lint / Python (flake8) (push) Successful in 40s
Lint / JS (eslint) (push) Successful in 7s
Security / Python Security (bandit) (push) Successful in 44s
Test / Python Tests (pytest) (push) Successful in 49s
Lint / Notify on failure (push) Has been skipped
Lint / Deploy (push) Successful in 3s
fix: admin-only suppression enforcement, links.html broken date parsing
Security: add require_admin decorator; apply to POST/DELETE /api/suppressions
and /suppressions page. Previously any user in allowed_groups could create or
delete suppressions even though the nav restricts the UI to admins.

Bug: links.html "Updated:" timestamp and stale-warning both produced
Invalid Date because the raw "YYYY-MM-DD HH:MM:SS UTC" string was appended
with 'Z' instead of being normalised through _toIso(). Fix both call sites to
use _toIso(), and remove the now-redundant local _toIso redefinition.

Style: use `with open(sentinel, 'w'): pass` consistently (was open().close()
at avatar JPEG validation path).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-11 13:03:37 -04:00

683 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Gandalf Global Advanced Network Detection And Link Facilitator.
Flask web application serving the monitoring dashboard and suppression
management UI. Authentication via Authelia forward-auth headers.
All monitoring and alerting is handled by the separate monitor.py daemon.
"""
import hashlib
import html
import ipaddress
import json
import logging
import os
import re
import tempfile
import threading
import time
import uuid
from datetime import datetime, timezone
from functools import wraps
from flask import Flask, jsonify, make_response, render_template, request, send_file
import db
import diagnose
from monitor import PulseClient
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(name)s %(message)s',
)
logger = logging.getLogger('gandalf.web')
app = Flask(__name__)
_AVATAR_COLORS = ['lt-avatar--orange', 'lt-avatar--green', 'lt-avatar--purple', '']
@app.template_filter('avatar_color')
def avatar_color_filter(name: str) -> str:
return _AVATAR_COLORS[int(hashlib.md5(name.encode()).hexdigest(), 16) % len(_AVATAR_COLORS)] # nosec B324
_cfg = None
_cfg_lock = threading.Lock()
@app.context_processor
def inject_config():
"""Inject safe config values into all templates."""
cfg = _config()
return {
'config': {
'ticket_api': {
'web_url': cfg.get('ticket_api', {}).get('web_url', 'http://t.lotusguild.org/ticket/'),
}
}
}
# In-memory diagnostic job store { job_id: { status, result, created_at } }
_diag_jobs: dict = {}
_diag_lock = threading.Lock()
# Per-user rate-limit: { username: [epoch_float, ...] } — cleaned inside _diag_lock
_diag_rate: dict = {}
def _purge_old_jobs_loop():
"""Background thread: remove stale diag jobs and run daily event purge."""
while True:
time.sleep(120)
cutoff = time.time() - 600
stuck_cutoff = time.time() - 300 # 5 min: job still 'running' → thread must have crashed
with _diag_lock:
stale = [jid for jid, j in _diag_jobs.items() if j.get('created_at', 0) < cutoff]
for jid in stale:
del _diag_jobs[jid]
for jid, j in list(_diag_jobs.items()):
if j['status'] == 'running' and j.get('created_at', 0) < stuck_cutoff:
j['status'] = 'done'
j['result'] = {'status': 'error', 'error': 'Diagnostic abandoned — no activity for 5 minutes.'}
logger.error(f'Diagnostic job {jid} stuck (no activity for 5 min); marked done/error')
_purge_thread = threading.Thread(target=_purge_old_jobs_loop, daemon=True)
_purge_thread.start()
def _config() -> dict:
global _cfg
if _cfg is None:
with _cfg_lock:
if _cfg is None:
with open('config.json') as f:
_cfg = json.load(f)
return _cfg
@app.after_request
def add_security_headers(response):
response.headers.setdefault('X-Content-Type-Options', 'nosniff')
response.headers.setdefault('X-Frame-Options', 'DENY')
response.headers.setdefault('Referrer-Policy', 'strict-origin-when-cross-origin')
return response
def _daemon_ok(last_check: str) -> bool:
"""Return True if monitor last checked within 20 minutes."""
if not last_check or last_check == 'Never':
return False
try:
ts = datetime.strptime(last_check, '%Y-%m-%d %H:%M:%S UTC').replace(tzinfo=timezone.utc)
return (datetime.now(timezone.utc) - ts).total_seconds() < 1200
except Exception:
return False
# ---------------------------------------------------------------------------
# Auth helpers
# ---------------------------------------------------------------------------
def _get_user() -> dict:
return {
'username': request.headers.get('Remote-User', ''),
'name': request.headers.get('Remote-Name', ''),
'email': request.headers.get('Remote-Email', ''),
'groups': [
g.strip()
for g in request.headers.get('Remote-Groups', '').split(',')
if g.strip()
],
}
def require_auth(f):
@wraps(f)
def wrapper(*args, **kwargs):
user = _get_user()
if not user['username']:
return (
'<h1>401 Not authenticated</h1>'
'<p>Please access Gandalf through '
'<a href="https://auth.lotusguild.org">auth.lotusguild.org</a>.</p>',
401,
)
allowed = _config().get('auth', {}).get('allowed_groups', ['admin'])
if not any(g in allowed for g in user['groups']):
safe_user = html.escape(user['username'])
safe_groups = html.escape(', '.join(allowed))
return (
f'<h1>403 Access denied</h1>'
f'<p>Your account ({safe_user}) is not in an allowed group '
f'({safe_groups}).</p>',
403,
)
return f(*args, **kwargs)
return wrapper
def require_admin(f):
"""Decorator: require require_auth AND membership in the 'admin' group."""
@wraps(f)
def wrapper(*args, **kwargs):
user = _get_user()
if 'admin' not in user.get('groups', []):
return jsonify({'error': 'Admin access required'}), 403
return f(*args, **kwargs)
return wrapper
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_PAGE_LIMIT = 200 # max events returned per request
def _annotate_suppressions(events: list, suppressions: list) -> None:
"""Annotate each event dict in-place with an is_suppressed bool."""
for ev in events:
sup_type = (
'unifi_device' if ev.get('event_type') == 'unifi_device_offline'
else 'interface' if ev.get('event_type') == 'interface_down'
else 'host'
)
ev['is_suppressed'] = db.check_suppressed(
suppressions, sup_type,
ev.get('target_name', ''), ev.get('target_detail', '') or '',
)
# ---------------------------------------------------------------------------
# Page routes
# ---------------------------------------------------------------------------
@app.route('/')
@require_auth
def index():
user = _get_user()
events = db.get_active_events(limit=_PAGE_LIMIT)
total_active = db.count_active_events()
summary = db.get_status_summary()
snapshot_raw = db.get_state('network_snapshot')
last_check = db.get_state('last_check', 'Never')
try:
snapshot = json.loads(snapshot_raw) if snapshot_raw else {}
except Exception as e:
logger.error(f'Failed to parse network_snapshot JSON: {e}')
snapshot = {}
suppressions = db.get_active_suppressions()
_annotate_suppressions(events, suppressions)
recent_resolved = db.get_recent_resolved(hours=24, limit=10)
return render_template(
'index.html',
user=user,
events=events,
total_active=total_active,
summary=summary,
snapshot=snapshot,
last_check=last_check,
suppressions=suppressions,
recent_resolved=recent_resolved,
daemon_ok=_daemon_ok(last_check),
)
@app.route('/links')
@require_auth
def links_page():
user = _get_user()
return render_template('links.html', user=user)
@app.route('/inspector')
@require_auth
def inspector():
user = _get_user()
return render_template('inspector.html', user=user)
@app.route('/suppressions')
@require_auth
@require_admin
def suppressions_page():
user = _get_user()
active = db.get_active_suppressions()
history = db.get_suppression_history(limit=50)
snapshot_raw = db.get_state('network_snapshot')
try:
snapshot = json.loads(snapshot_raw) if snapshot_raw else {}
except Exception as e:
logger.error(f'Failed to parse network_snapshot JSON: {e}')
snapshot = {}
return render_template(
'suppressions.html',
user=user,
active=active,
history=history,
snapshot=snapshot,
)
# ---------------------------------------------------------------------------
# API routes
# ---------------------------------------------------------------------------
@app.route('/api/status')
@require_auth
def api_status():
active = db.get_active_events(limit=_PAGE_LIMIT)
suppressions = db.get_active_suppressions()
_annotate_suppressions(active, suppressions)
last_check = db.get_state('last_check', 'Never')
return jsonify({
'summary': db.get_status_summary(),
'last_check': last_check,
'events': active,
'total_active': db.count_active_events(),
'daemon_ok': _daemon_ok(last_check),
})
@app.route('/api/network')
@require_auth
def api_network():
raw = db.get_state('network_snapshot')
if raw:
try:
return jsonify(json.loads(raw))
except Exception:
logger.error('Failed to parse network_snapshot JSON')
return jsonify({'hosts': {}, 'unifi': [], 'updated': None})
@app.route('/api/links')
@require_auth
def api_links():
raw = db.get_state('link_stats')
if raw:
if len(raw) > 10_000_000:
logger.error(f'link_stats exceeds 10 MB ({len(raw)} bytes); possible corruption')
return jsonify({'error': 'Invalid cached data'}), 503
try:
return jsonify(json.loads(raw))
except Exception as e:
logger.error(f'Failed to parse link_stats JSON: {e}')
return jsonify({'hosts': {}, 'unifi_switches': {}, 'updated': None})
@app.route('/api/events')
@require_auth
def api_events():
try:
limit = min(int(request.args.get('limit', _PAGE_LIMIT)), 1000)
offset = max(int(request.args.get('offset', 0)), 0)
except ValueError:
return jsonify({'error': 'limit and offset must be integers'}), 400
status_filter = request.args.get('status', 'active')
if status_filter not in ('active', 'resolved', 'all'):
return jsonify({'error': 'status must be active, resolved, or all'}), 400
result: dict = {}
if status_filter in ('active', 'all'):
result['active'] = db.get_active_events(limit=limit, offset=offset)
result['total_active'] = db.count_active_events()
if status_filter in ('resolved', 'all'):
result['resolved'] = db.get_recent_resolved(hours=24, limit=30)
return jsonify(result)
@app.route('/api/suppressions', methods=['GET'])
@require_auth
def api_get_suppressions():
return jsonify(db.get_active_suppressions())
@app.route('/api/suppressions', methods=['POST'])
@require_auth
@require_admin
def api_create_suppression():
user = _get_user()
data = request.get_json(silent=True) or {}
target_type = data.get('target_type', 'host')
target_name = (data.get('target_name') or '').strip()
target_detail = (data.get('target_detail') or '').strip()
reason = (data.get('reason') or '').strip()
expires_minutes = data.get('expires_minutes') # None = manual/permanent
if target_type not in ('host', 'interface', 'unifi_device', 'all'):
return jsonify({'error': 'Invalid target_type'}), 400
if target_type != 'all' and not target_name:
return jsonify({'error': 'target_name required'}), 400
if not reason:
return jsonify({'error': 'reason required'}), 400
if len(reason) > 500:
return jsonify({'error': 'reason must be 500 characters or fewer'}), 400
if len(target_name) > 255:
return jsonify({'error': 'target_name must be 255 characters or fewer'}), 400
if len(target_detail) > 255:
return jsonify({'error': 'target_detail must be 255 characters or fewer'}), 400
if expires_minutes is not None:
try:
expires_minutes = int(expires_minutes)
if expires_minutes <= 0 or expires_minutes > 43200:
return jsonify({'error': 'expires_minutes must be between 1 and 43200 (30 days)'}), 400
except (ValueError, TypeError):
return jsonify({'error': 'expires_minutes must be a valid integer'}), 400
sup_id = db.create_suppression(
target_type=target_type,
target_name=target_name,
target_detail=target_detail,
reason=reason,
suppressed_by=user['username'],
expires_minutes=expires_minutes,
)
logger.info(
f'Suppression #{sup_id} created by {user["username"]}: '
f'{target_type}/{target_name}/{target_detail} {reason}'
)
return jsonify({'success': True, 'id': sup_id})
@app.route('/api/suppressions/<int:sup_id>', methods=['DELETE'])
@require_auth
@require_admin
def api_delete_suppression(sup_id: int):
user = _get_user()
db.deactivate_suppression(sup_id)
logger.info(f'Suppression #{sup_id} removed by {user["username"]}')
return jsonify({'success': True})
@app.route('/api/diagnose', methods=['POST'])
@require_auth
def api_diagnose_start():
"""Start a link diagnostic job. Returns {job_id}."""
data = request.get_json(silent=True) or {}
switch_name = (data.get('switch_name') or '').strip()
try:
port_idx = int(data.get('port_idx'))
except (TypeError, ValueError):
return jsonify({'error': 'port_idx must be an integer'}), 400
if not switch_name:
return jsonify({'error': 'switch_name and port_idx required'}), 400
# Look up switch + port in cached link_stats
raw = db.get_state('link_stats')
if not raw:
return jsonify({'error': 'No link_stats data available'}), 503
try:
link_data = json.loads(raw)
except Exception as e:
logger.error(f'Failed to parse link_stats JSON in /api/diagnose: {e}')
return jsonify({'error': 'Internal data error'}), 500
switches = link_data.get('unifi_switches', {})
sw = switches.get(switch_name)
if not sw:
return jsonify({'error': f'Switch "{switch_name}" not found'}), 404
# Find port by port_idx
port_data = None
for pname, pd in sw.get('ports', {}).items():
if pd.get('port_idx') == port_idx:
port_data = dict(pd)
port_data['name'] = pname
break
if not port_data:
return jsonify({'error': f'Port {port_idx} not found on switch "{switch_name}"'}), 404
# LLDP neighbor required to know which host+iface to SSH into
lldp = port_data.get('lldp')
if not lldp or not lldp.get('system_name'):
return jsonify({'error': 'No LLDP neighbor data for this port'}), 400
server_name = lldp['system_name']
if not re.fullmatch(r'[a-zA-Z0-9._-]+', server_name):
logger.error(f'Refusing diagnostic: invalid server_name from LLDP: {server_name!r}')
return jsonify({'error': 'LLDP neighbor name contains invalid characters'}), 400
lldp_port_id = lldp.get('port_id', '')
# Find matching host + interface in link_stats hosts
hosts = link_data.get('hosts', {})
server_ifaces = hosts.get(server_name)
if not server_ifaces:
return jsonify({'error': f'Host "{server_name}" not in link stats'}), 404
# Match interface by LLDP port_id (exact then fuzzy)
matched_iface = None
if lldp_port_id and lldp_port_id in server_ifaces:
matched_iface = lldp_port_id
if not matched_iface and lldp_port_id:
matched_iface = next(
(k for k in server_ifaces if lldp_port_id in k or k in lldp_port_id),
None
)
if not matched_iface:
matched_iface = next(iter(server_ifaces), None)
if not matched_iface:
return jsonify({'error': 'Cannot determine server interface'}), 400
# Resolve host IP from link_stats host data
host_ip = (server_ifaces.get(matched_iface) or {}).get('host_ip')
if not host_ip:
# Fallback: use first valid IP from LLDP mgmt IPs
for candidate in (lldp.get('mgmt_ips') or []):
try:
ipaddress.ip_address(candidate)
host_ip = candidate
break
except ValueError:
continue
if not host_ip:
return jsonify({'error': 'Cannot determine host IP for SSH'}), 400
# Validate resolved values before passing to SSH command builder
try:
ipaddress.ip_address(host_ip)
except ValueError:
logger.error(f'Refusing diagnostic: invalid host_ip "{host_ip}" for {server_name}')
return jsonify({'error': 'Resolved host IP is not a valid IP address'}), 400
if not re.fullmatch(r'[a-zA-Z0-9._-]+', matched_iface):
logger.error(f'Refusing diagnostic: invalid iface "{matched_iface}" for {server_name}')
return jsonify({'error': 'Resolved interface name contains invalid characters'}), 400
job_id = str(uuid.uuid4())
requesting_user = _get_user()['username']
now = time.time()
with _diag_lock:
# Rate limit: max 5 diagnostic jobs per user per minute; prune stale user entries
stale_users = [u for u, ts in _diag_rate.items() if not ts or max(ts) < now - 3600]
for u in stale_users:
del _diag_rate[u]
recent = [t for t in _diag_rate.get(requesting_user, []) if now - t < 60]
if len(recent) >= 5:
return jsonify({'error': 'Rate limit exceeded: max 5 diagnostics per minute'}), 429
recent.append(now)
_diag_rate[requesting_user] = recent
_diag_jobs[job_id] = {
'status': 'running', 'result': None,
'created_at': now, 'user': requesting_user,
}
def _run():
try:
cfg = _config()
pulse = PulseClient(cfg)
runner = diagnose.DiagnosticsRunner(pulse)
result = runner.run(host_ip, server_name, matched_iface, port_data)
except Exception as e:
logger.error(f'Diagnostic job {job_id} failed: {e}', exc_info=True)
result = {'status': 'error', 'error': 'Diagnostic failed; check server logs.'}
with _diag_lock:
if job_id in _diag_jobs:
_diag_jobs[job_id]['status'] = 'done'
_diag_jobs[job_id]['result'] = result
t = threading.Thread(target=_run, daemon=True)
t.start()
return jsonify({'job_id': job_id})
@app.route('/api/diagnose/<job_id>', methods=['GET'])
@require_auth
def api_diagnose_poll(job_id: str):
"""Poll a diagnostic job. Returns {status, result}."""
current_user = _get_user()['username']
with _diag_lock:
job = _diag_jobs.get(job_id)
if not job:
return jsonify({'error': 'Job not found'}), 404
if job.get('user') != current_user:
return jsonify({'error': 'Forbidden'}), 403
snapshot = {'status': job['status'], 'result': job.get('result')}
return jsonify(snapshot)
@app.route('/api/avatar')
@require_auth
def api_avatar():
"""Serve the current user's LDAP avatar photo (JPEG), cached to disk."""
username = request.headers.get('Remote-User', '').strip()
if not username:
return '', 404
ldap_cfg = _config().get('ldap', {})
if not ldap_cfg.get('host') or not ldap_cfg.get('bind_dn'):
return '', 404
# Build a safe cache filename from the username (alphanumeric + - _ .)
safe_name = re.sub(r'[^a-zA-Z0-9._-]', '_', username)
cache_dir = os.path.abspath(
ldap_cfg.get('cache_dir', os.path.join(tempfile.gettempdir(), 'gandalf_avatars'))
)
os.makedirs(cache_dir, exist_ok=True)
cache_file = os.path.abspath(os.path.join(cache_dir, f'user_{safe_name}.jpg'))
sentinel = os.path.abspath(os.path.join(cache_dir, f'user_{safe_name}.none'))
# Guard against path escape (shouldn't happen with sanitised safe_name, but be explicit)
if not cache_file.startswith(cache_dir + os.sep) or not sentinel.startswith(cache_dir + os.sep):
logger.error(f'Avatar path escape detected for user {username!r}')
return '', 404
try:
cache_ttl = int(ldap_cfg.get('cache_ttl', 3600))
except (ValueError, TypeError):
logger.warning('Invalid cache_ttl in ldap config; using default 3600')
cache_ttl = 3600
now = time.time()
# Serve cached image if fresh
if os.path.exists(cache_file) and now - os.path.getmtime(cache_file) < cache_ttl:
return send_file(cache_file, mimetype='image/jpeg',
max_age=cache_ttl, conditional=True)
# Skip LDAP if we already know this user has no avatar
try:
if os.path.exists(sentinel) and now - os.path.getmtime(sentinel) < cache_ttl:
return '', 404
except OSError:
pass
# Query lldap
bind_pw = ldap_cfg.get('bind_pw', '')
if not bind_pw:
logger.error('LDAP bind_pw not configured — avatar lookup disabled')
return '', 404
avatar_data = None
conn = None
try:
import ldap3
server = ldap3.Server(ldap_cfg['host'], port=int(ldap_cfg.get('port', 3890)))
conn = ldap3.Connection(server,
user=ldap_cfg['bind_dn'],
password=bind_pw,
auto_bind=True, receive_timeout=5)
safe_uid = ldap3.utils.conv.escape_filter_chars(username)
conn.search(ldap_cfg.get('user_base', 'ou=people,dc=example,dc=com'),
f'(uid={safe_uid})', attributes=['avatar'])
if conn.entries and conn.entries[0]['avatar'].value:
avatar_data = conn.entries[0]['avatar'].value
except ImportError:
logger.error('ldap3 not installed — run: pip install ldap3')
return '', 404
except Exception as e:
logger.error(f'LDAP avatar lookup failed for {username}: {e}')
return '', 404
finally:
if conn is not None:
try:
conn.unbind()
except Exception:
pass
if not avatar_data or len(avatar_data) < 100:
with open(sentinel, 'w'):
pass
return '', 404
# Validate JPEG magic bytes (FF D8 FF)
if isinstance(avatar_data, str):
avatar_data = avatar_data.encode('latin-1')
if avatar_data[:3] != b'\xFF\xD8\xFF':
logger.warning(f'Non-JPEG avatar data for {username}')
with open(sentinel, 'w'):
pass
return '', 404
with open(cache_file, 'wb') as f:
f.write(avatar_data)
if os.path.exists(sentinel):
os.unlink(sentinel)
resp = make_response(avatar_data)
resp.headers['Content-Type'] = 'image/jpeg'
resp.headers['Cache-Control'] = f'private, max-age={cache_ttl}'
return resp
@app.route('/health')
def health():
"""Health check endpoint (no auth). Checks DB and monitor freshness."""
checks = {}
overall = 'ok'
# DB connectivity
try:
db.get_state('last_check')
checks['db'] = 'ok'
except Exception as e:
logger.error(f'Health check db error: {e}')
checks['db'] = 'error'
overall = 'degraded'
# Monitor freshness: fail if last_check is older than 20 minutes
try:
last_check = db.get_state('last_check', '')
if last_check:
ts = datetime.strptime(last_check, '%Y-%m-%d %H:%M:%S UTC').replace(tzinfo=timezone.utc)
age_s = (datetime.now(timezone.utc) - ts).total_seconds()
if age_s > 1200:
checks['monitor'] = 'stale'
overall = 'degraded'
else:
checks['monitor'] = 'ok'
else:
checks['monitor'] = 'no data yet'
except Exception as e:
logger.error(f'Health check monitor error: {e}')
checks['monitor'] = 'error'
overall = 'degraded'
status_code = 200 if overall == 'ok' else 503
return jsonify({'status': overall, 'service': 'gandalf', 'checks': checks}), status_code
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000) # nosec B201 B104 — dev runner only; production uses gunicorn