Files
gandalf/app.py
Jared Vititoe 0278dad502 feat: inspector page, link debug enhancements, security hardening
- Add /inspector page: visual model-accurate switch chassis diagrams
  (USF5P, USL8A, US24PRO, USPPDUP, USMINI), clickable port blocks
  with color coding (green=up, amber=PoE, cyan=uplink, grey=down),
  detail panel with stats/PoE/LLDP, LLDP-based path debug side-by-side

- Link Debug: port number badges (#N), LLDP neighbor line, PoE class/max,
  collapsible host/switch panels with sessionStorage persistence

- monitor.py: collect LLDP neighbor map + PoE class/max/mode per switch
  port; PulseClient uses requests.Session() for HTTP keep-alive; add
  shlex.quote() around interface names (defense-in-depth)

- Security: suppress buttons use data-* attrs + delegated click handler
  instead of inline onclick with Jinja2 variable interpolation; remove
  | safe filter from user-controlled fields in suppressions.html;
  setDuration() takes explicit el param instead of implicit event global

- db.py: thread-local connection reuse with ping(reconnect=True) to
  avoid a new TCP handshake per query

- .gitignore: add config.json (contains credentials), __pycache__

- README: full rewrite covering architecture, all 4 pages, alert logic,
  config reference, deployment, troubleshooting, security notes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-03 15:39:48 -05:00

234 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Gandalf Global Advanced Network Detection And Link Facilitator.
Flask web application serving the monitoring dashboard and suppression
management UI. Authentication via Authelia forward-auth headers.
All monitoring and alerting is handled by the separate monitor.py daemon.
"""
import json
import logging
from functools import wraps
from flask import Flask, jsonify, redirect, render_template, request, url_for
import db
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(name)s %(message)s',
)
logger = logging.getLogger('gandalf.web')
app = Flask(__name__)
_cfg = None
def _config() -> dict:
global _cfg
if _cfg is None:
with open('config.json') as f:
_cfg = json.load(f)
return _cfg
# ---------------------------------------------------------------------------
# Auth helpers
# ---------------------------------------------------------------------------
def _get_user() -> dict:
return {
'username': request.headers.get('Remote-User', ''),
'name': request.headers.get('Remote-Name', ''),
'email': request.headers.get('Remote-Email', ''),
'groups': [
g.strip()
for g in request.headers.get('Remote-Groups', '').split(',')
if g.strip()
],
}
def require_auth(f):
@wraps(f)
def wrapper(*args, **kwargs):
user = _get_user()
if not user['username']:
return (
'<h1>401 Not authenticated</h1>'
'<p>Please access Gandalf through '
'<a href="https://auth.lotusguild.org">auth.lotusguild.org</a>.</p>',
401,
)
allowed = _config().get('auth', {}).get('allowed_groups', ['admin'])
if not any(g in allowed for g in user['groups']):
return (
f'<h1>403 Access denied</h1>'
f'<p>Your account ({user["username"]}) is not in an allowed group '
f'({", ".join(allowed)}).</p>',
403,
)
return f(*args, **kwargs)
return wrapper
# ---------------------------------------------------------------------------
# Page routes
# ---------------------------------------------------------------------------
@app.route('/')
@require_auth
def index():
user = _get_user()
events = db.get_active_events()
summary = db.get_status_summary()
snapshot_raw = db.get_state('network_snapshot')
last_check = db.get_state('last_check', 'Never')
snapshot = json.loads(snapshot_raw) if snapshot_raw else {}
suppressions = db.get_active_suppressions()
return render_template(
'index.html',
user=user,
events=events,
summary=summary,
snapshot=snapshot,
last_check=last_check,
suppressions=suppressions,
)
@app.route('/links')
@require_auth
def links_page():
user = _get_user()
return render_template('links.html', user=user)
@app.route('/inspector')
@require_auth
def inspector():
user = _get_user()
return render_template('inspector.html', user=user)
@app.route('/suppressions')
@require_auth
def suppressions_page():
user = _get_user()
active = db.get_active_suppressions()
history = db.get_suppression_history(limit=50)
snapshot_raw = db.get_state('network_snapshot')
snapshot = json.loads(snapshot_raw) if snapshot_raw else {}
return render_template(
'suppressions.html',
user=user,
active=active,
history=history,
snapshot=snapshot,
)
# ---------------------------------------------------------------------------
# API routes
# ---------------------------------------------------------------------------
@app.route('/api/status')
@require_auth
def api_status():
return jsonify({
'summary': db.get_status_summary(),
'last_check': db.get_state('last_check', 'Never'),
'events': db.get_active_events(),
})
@app.route('/api/network')
@require_auth
def api_network():
raw = db.get_state('network_snapshot')
if raw:
try:
return jsonify(json.loads(raw))
except Exception:
pass
return jsonify({'hosts': {}, 'unifi': [], 'updated': None})
@app.route('/api/links')
@require_auth
def api_links():
raw = db.get_state('link_stats')
if raw:
try:
return jsonify(json.loads(raw))
except Exception:
pass
return jsonify({'hosts': {}, 'updated': None})
@app.route('/api/events')
@require_auth
def api_events():
return jsonify({
'active': db.get_active_events(),
'resolved': db.get_recent_resolved(hours=24, limit=30),
})
@app.route('/api/suppressions', methods=['GET'])
@require_auth
def api_get_suppressions():
return jsonify(db.get_active_suppressions())
@app.route('/api/suppressions', methods=['POST'])
@require_auth
def api_create_suppression():
user = _get_user()
data = request.get_json(silent=True) or {}
target_type = data.get('target_type', 'host')
target_name = (data.get('target_name') or '').strip()
target_detail = (data.get('target_detail') or '').strip()
reason = (data.get('reason') or '').strip()
expires_minutes = data.get('expires_minutes') # None = manual/permanent
if target_type not in ('host', 'interface', 'unifi_device', 'all'):
return jsonify({'error': 'Invalid target_type'}), 400
if target_type != 'all' and not target_name:
return jsonify({'error': 'target_name required'}), 400
if not reason:
return jsonify({'error': 'reason required'}), 400
sup_id = db.create_suppression(
target_type=target_type,
target_name=target_name,
target_detail=target_detail,
reason=reason,
suppressed_by=user['username'],
expires_minutes=int(expires_minutes) if expires_minutes else None,
)
logger.info(
f'Suppression #{sup_id} created by {user["username"]}: '
f'{target_type}/{target_name}/{target_detail} {reason}'
)
return jsonify({'success': True, 'id': sup_id})
@app.route('/api/suppressions/<int:sup_id>', methods=['DELETE'])
@require_auth
def api_delete_suppression(sup_id: int):
user = _get_user()
db.deactivate_suppression(sup_id)
logger.info(f'Suppression #{sup_id} removed by {user["username"]}')
return jsonify({'success': True})
@app.route('/health')
def health():
"""Health check endpoint (no auth)."""
return jsonify({'status': 'ok', 'service': 'gandalf'})
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)