Files
gandalf/app.py

227 lines
6.4 KiB
Python
Raw Normal View History

"""Gandalf Global Advanced Network Detection And Link Facilitator.
Flask web application serving the monitoring dashboard and suppression
management UI. Authentication via Authelia forward-auth headers.
All monitoring and alerting is handled by the separate monitor.py daemon.
"""
2025-01-04 01:42:16 -05:00
import json
import logging
from functools import wraps
from flask import Flask, jsonify, redirect, render_template, request, url_for
import db
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(name)s %(message)s',
)
logger = logging.getLogger('gandalf.web')
2025-01-04 01:42:16 -05:00
app = Flask(__name__)
_cfg = None
def _config() -> dict:
global _cfg
if _cfg is None:
with open('config.json') as f:
_cfg = json.load(f)
return _cfg
# ---------------------------------------------------------------------------
# Auth helpers
# ---------------------------------------------------------------------------
def _get_user() -> dict:
return {
'username': request.headers.get('Remote-User', ''),
'name': request.headers.get('Remote-Name', ''),
'email': request.headers.get('Remote-Email', ''),
'groups': [
g.strip()
for g in request.headers.get('Remote-Groups', '').split(',')
if g.strip()
],
}
def require_auth(f):
@wraps(f)
def wrapper(*args, **kwargs):
user = _get_user()
if not user['username']:
return (
'<h1>401 Not authenticated</h1>'
'<p>Please access Gandalf through '
'<a href="https://auth.lotusguild.org">auth.lotusguild.org</a>.</p>',
401,
)
allowed = _config().get('auth', {}).get('allowed_groups', ['admin'])
if not any(g in allowed for g in user['groups']):
return (
f'<h1>403 Access denied</h1>'
f'<p>Your account ({user["username"]}) is not in an allowed group '
f'({", ".join(allowed)}).</p>',
403,
)
return f(*args, **kwargs)
return wrapper
# ---------------------------------------------------------------------------
# Page routes
# ---------------------------------------------------------------------------
2025-02-08 00:03:01 -05:00
@app.route('/')
@require_auth
def index():
user = _get_user()
events = db.get_active_events()
summary = db.get_status_summary()
snapshot_raw = db.get_state('network_snapshot')
last_check = db.get_state('last_check', 'Never')
snapshot = json.loads(snapshot_raw) if snapshot_raw else {}
suppressions = db.get_active_suppressions()
return render_template(
'index.html',
user=user,
events=events,
summary=summary,
snapshot=snapshot,
last_check=last_check,
suppressions=suppressions,
)
@app.route('/links')
@require_auth
def links_page():
user = _get_user()
return render_template('links.html', user=user)
@app.route('/suppressions')
@require_auth
def suppressions_page():
user = _get_user()
active = db.get_active_suppressions()
history = db.get_suppression_history(limit=50)
snapshot_raw = db.get_state('network_snapshot')
snapshot = json.loads(snapshot_raw) if snapshot_raw else {}
return render_template(
'suppressions.html',
user=user,
active=active,
history=history,
snapshot=snapshot,
)
# ---------------------------------------------------------------------------
# API routes
# ---------------------------------------------------------------------------
2025-02-07 21:03:31 -05:00
@app.route('/api/status')
@require_auth
def api_status():
return jsonify({
'summary': db.get_status_summary(),
'last_check': db.get_state('last_check', 'Never'),
'events': db.get_active_events(),
})
@app.route('/api/network')
@require_auth
def api_network():
raw = db.get_state('network_snapshot')
if raw:
try:
return jsonify(json.loads(raw))
except Exception:
pass
return jsonify({'hosts': {}, 'unifi': [], 'updated': None})
@app.route('/api/links')
@require_auth
def api_links():
raw = db.get_state('link_stats')
if raw:
try:
return jsonify(json.loads(raw))
except Exception:
pass
return jsonify({'hosts': {}, 'updated': None})
@app.route('/api/events')
@require_auth
def api_events():
return jsonify({
'active': db.get_active_events(),
'resolved': db.get_recent_resolved(hours=24, limit=30),
})
@app.route('/api/suppressions', methods=['GET'])
@require_auth
def api_get_suppressions():
return jsonify(db.get_active_suppressions())
@app.route('/api/suppressions', methods=['POST'])
@require_auth
def api_create_suppression():
user = _get_user()
data = request.get_json(silent=True) or {}
target_type = data.get('target_type', 'host')
target_name = (data.get('target_name') or '').strip()
target_detail = (data.get('target_detail') or '').strip()
reason = (data.get('reason') or '').strip()
expires_minutes = data.get('expires_minutes') # None = manual/permanent
if target_type not in ('host', 'interface', 'unifi_device', 'all'):
return jsonify({'error': 'Invalid target_type'}), 400
if target_type != 'all' and not target_name:
return jsonify({'error': 'target_name required'}), 400
if not reason:
return jsonify({'error': 'reason required'}), 400
sup_id = db.create_suppression(
target_type=target_type,
target_name=target_name,
target_detail=target_detail,
reason=reason,
suppressed_by=user['username'],
expires_minutes=int(expires_minutes) if expires_minutes else None,
)
logger.info(
f'Suppression #{sup_id} created by {user["username"]}: '
f'{target_type}/{target_name}/{target_detail} {reason}'
)
return jsonify({'success': True, 'id': sup_id})
@app.route('/api/suppressions/<int:sup_id>', methods=['DELETE'])
@require_auth
def api_delete_suppression(sup_id: int):
user = _get_user()
db.deactivate_suppression(sup_id)
logger.info(f'Suppression #{sup_id} removed by {user["username"]}')
return jsonify({'success': True})
@app.route('/health')
def health():
"""Health check endpoint (no auth)."""
return jsonify({'status': 'ok', 'service': 'gandalf'})
2025-01-04 01:42:16 -05:00
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)