Complete rewrite: full-featured network monitoring dashboard
- Two-service architecture: Flask web app (gandalf.service) + background polling daemon (gandalf-monitor.service)
- Monitor polls Prometheus node_network_up for physical NIC states on all 6 hypervisors (added storage-01 at 10.10.10.11:9100)
- UniFi API monitoring for switches, APs, and gateway device status
- Ping reachability for hosts without node_exporter (pbs only now)
- Smart baseline: interfaces first seen as down are never alerted on; only UP→DOWN regressions trigger tickets
- Cluster-wide P1 ticket when 3+ hosts have genuine simultaneous interface regressions (guards against false positives on startup)
- Tinker Tickets integration with 24-hour hash-based deduplication
- Alert suppression: manual toggle or timed windows (30m/1h/4h/8h)
- Authelia SSO via forward-auth headers, admin group required
- Network topology: Internet → UDM-Pro → Agg Switch (10G DAC) → PoE Switch (10G DAC) → Hosts
- MariaDB schema, suppression management UI, host/interface cards

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
304
db.py
Normal file
304
db.py
Normal file
@@ -0,0 +1,304 @@
|
||||
"""Database operations for Gandalf network monitor."""
|
||||
import json
|
||||
import logging
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional
|
||||
|
||||
import pymysql
|
||||
import pymysql.cursors
|
||||
|
||||
# Module-level logger for this database layer.
logger = logging.getLogger(__name__)

# Lazily-populated cache of the 'database' section of config.json; see _config().
_config_cache = None
||||
def _config() -> dict:
    """Return the 'database' section of config.json, loading it once and caching."""
    global _config_cache
    if _config_cache is not None:
        return _config_cache
    with open('config.json') as f:
        _config_cache = json.load(f)['database']
    return _config_cache
|
||||
|
||||
|
||||
@contextmanager
def get_conn():
    """Context manager yielding a fresh DB connection; always closed on exit.

    Connection settings come from config.json via _config(). Autocommit is on
    and rows come back as dicts (DictCursor).
    """
    cfg = _config()
    params = dict(
        host=cfg['host'],
        port=cfg.get('port', 3306),
        user=cfg['user'],
        password=cfg['password'],
        database=cfg['name'],
        autocommit=True,
        cursorclass=pymysql.cursors.DictCursor,
        connect_timeout=10,
        charset='utf8mb4',
    )
    conn = pymysql.connect(**params)
    try:
        yield conn
    finally:
        conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Monitor state (key/value store)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def set_state(key: str, value) -> None:
    """Upsert one monitor_state key/value row; non-strings are JSON-encoded."""
    payload = value if isinstance(value, str) else json.dumps(value, default=str)
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """INSERT INTO monitor_state (key_name, value)
            VALUES (%s, %s)
            ON DUPLICATE KEY UPDATE value=VALUES(value), updated_at=NOW()""",
            (key, payload),
        )
|
||||
|
||||
|
||||
def get_state(key: str, default=None):
    """Fetch the monitor_state value for *key*, or *default* when absent."""
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute('SELECT value FROM monitor_state WHERE key_name=%s', (key,))
        row = cur.fetchone()
    if not row:
        return default
    return row['value']
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Interface baseline tracking
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def get_baseline() -> dict:
    """Return the stored interface baseline, or {} when missing or unparseable.

    The baseline is persisted as JSON in monitor_state under the key
    'interface_baseline' (see set_baseline()).
    """
    raw = get_state('interface_baseline')
    if not raw:
        return {}
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        # Corrupt or partial JSON in the DB: log it instead of silently
        # swallowing every exception, then fall back to an empty baseline
        # so the monitor can rebuild it.
        logger.warning('interface_baseline state is not valid JSON; ignoring')
        return {}
|
||||
|
||||
|
||||
def set_baseline(baseline: dict) -> None:
    """Persist the interface baseline dict to monitor_state as JSON."""
    set_state('interface_baseline', json.dumps(baseline))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Network events
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def upsert_event(
    event_type: str,
    severity: str,
    source_type: str,
    target_name: str,
    target_detail: str,
    description: str,
) -> tuple:
    """Insert or update a network event. Returns (id, is_new, consecutive_failures)."""
    detail = target_detail or ''
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """SELECT id, consecutive_failures FROM network_events
            WHERE event_type=%s AND target_name=%s AND target_detail=%s
            AND resolved_at IS NULL LIMIT 1""",
            (event_type, target_name, detail),
        )
        row = cur.fetchone()

        if row is None:
            # First sighting: open a new unresolved event.
            cur.execute(
                """INSERT INTO network_events
                (event_type, severity, source_type, target_name, target_detail, description)
                VALUES (%s, %s, %s, %s, %s, %s)""",
                (event_type, severity, source_type, target_name, detail, description),
            )
            return cur.lastrowid, True, 1

        # Existing open event: bump the failure streak and refresh its text.
        streak = row['consecutive_failures'] + 1
        cur.execute(
            """UPDATE network_events
            SET last_seen=NOW(), consecutive_failures=%s, description=%s
            WHERE id=%s""",
            (streak, description, row['id']),
        )
        return row['id'], False, streak
|
||||
|
||||
|
||||
def resolve_event(event_type: str, target_name: str, target_detail: str = '') -> None:
    """Mark every matching open event as resolved (sets resolved_at=NOW())."""
    params = (event_type, target_name, target_detail or '')
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """UPDATE network_events SET resolved_at=NOW()
            WHERE event_type=%s AND target_name=%s AND target_detail=%s
            AND resolved_at IS NULL""",
            params,
        )
|
||||
|
||||
|
||||
def set_ticket_id(event_id: int, ticket_id: str) -> None:
    """Record the ticket id on an existing network_events row."""
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            'UPDATE network_events SET ticket_id=%s WHERE id=%s',
            (ticket_id, event_id),
        )
|
||||
|
||||
|
||||
def get_active_events() -> list:
    """Return all unresolved events, most severe first, datetimes ISO-formatted."""
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """SELECT * FROM network_events
            WHERE resolved_at IS NULL
            ORDER BY
            FIELD(severity,'critical','warning','info'),
            first_seen DESC"""
        )
        events = cur.fetchall()
    # Convert datetime columns to ISO strings for JSON serialization.
    for event in events:
        for field in ('first_seen', 'last_seen'):
            value = event.get(field)
            if value and hasattr(value, 'isoformat'):
                event[field] = value.isoformat()
    return events
|
||||
|
||||
|
||||
def get_recent_resolved(hours: int = 24, limit: int = 50) -> list:
    """Return up to *limit* events resolved within the last *hours* hours."""
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """SELECT * FROM network_events
            WHERE resolved_at IS NOT NULL
            AND resolved_at > DATE_SUB(NOW(), INTERVAL %s HOUR)
            ORDER BY resolved_at DESC LIMIT %s""",
            (hours, limit),
        )
        events = cur.fetchall()
    # Convert datetime columns to ISO strings for JSON serialization.
    for event in events:
        for field in ('first_seen', 'last_seen', 'resolved_at'):
            value = event.get(field)
            if value and hasattr(value, 'isoformat'):
                event[field] = value.isoformat()
    return events
|
||||
|
||||
|
||||
def get_status_summary() -> dict:
    """Count unresolved events per severity; always returns all three keys."""
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """SELECT severity, COUNT(*) as cnt FROM network_events
            WHERE resolved_at IS NULL GROUP BY severity"""
        )
        counts = {row['severity']: row['cnt'] for row in cur.fetchall()}
    return {level: counts.get(level, 0) for level in ('critical', 'warning', 'info')}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Suppression rules
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def get_active_suppressions() -> list:
    """Return suppression rules that are active and not yet expired."""
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """SELECT * FROM suppression_rules
            WHERE active=TRUE AND (expires_at IS NULL OR expires_at > NOW())
            ORDER BY created_at DESC"""
        )
        rules = cur.fetchall()
    # Convert datetime columns to ISO strings for JSON serialization.
    for rule in rules:
        for field in ('created_at', 'expires_at'):
            value = rule.get(field)
            if value and hasattr(value, 'isoformat'):
                rule[field] = value.isoformat()
    return rules
|
||||
|
||||
|
||||
def get_suppression_history(limit: int = 50) -> list:
    """Return the most recent suppression rules, active or not."""
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            'SELECT * FROM suppression_rules ORDER BY created_at DESC LIMIT %s',
            (limit,),
        )
        rules = cur.fetchall()
    # Convert datetime columns to ISO strings for JSON serialization.
    for rule in rules:
        for field in ('created_at', 'expires_at'):
            value = rule.get(field)
            if value and hasattr(value, 'isoformat'):
                rule[field] = value.isoformat()
    return rules
|
||||
|
||||
|
||||
def create_suppression(
    target_type: str,
    target_name: str,
    target_detail: str,
    reason: str,
    suppressed_by: str,
    expires_minutes: Optional[int] = None,
) -> int:
    """Create an active suppression rule and return its new row id.

    When expires_minutes is falsy (None/0) the rule never expires
    (expires_at stays NULL).

    The expiry timestamp is computed on the database server with
    DATE_ADD(NOW(), ...) so it uses the same clock/timezone as the
    `expires_at > NOW()` checks in the other queries. The previous
    datetime.utcnow() value was naive UTC and drifted by the timezone
    offset whenever the DB server was not running in UTC.
    """
    if expires_minutes:
        sql = """INSERT INTO suppression_rules
            (target_type, target_name, target_detail, reason, suppressed_by, expires_at, active)
            VALUES (%s, %s, %s, %s, %s, DATE_ADD(NOW(), INTERVAL %s MINUTE), TRUE)"""
        params = (target_type, target_name or '', target_detail or '', reason,
                  suppressed_by, int(expires_minutes))
    else:
        sql = """INSERT INTO suppression_rules
            (target_type, target_name, target_detail, reason, suppressed_by, expires_at, active)
            VALUES (%s, %s, %s, %s, %s, NULL, TRUE)"""
        params = (target_type, target_name or '', target_detail or '', reason, suppressed_by)
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(sql, params)
        return cur.lastrowid
|
||||
|
||||
|
||||
def deactivate_suppression(sup_id: int) -> None:
    """Switch a suppression rule off without deleting its history row."""
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute('UPDATE suppression_rules SET active=FALSE WHERE id=%s', (sup_id,))
|
||||
|
||||
|
||||
def is_suppressed(target_type: str, target_name: str, target_detail: str = '') -> bool:
    """Return True when any active, unexpired suppression rule covers the target.

    Rules are checked broadest-first: a global ('all') rule, then a host-level
    rule (blank target_detail, covering all interfaces on the host), then a
    rule matching the exact interface/device detail.
    """
    # (sql, params) pairs in the order they must be evaluated; params=None
    # means "no placeholders" for pymysql.
    checks = [
        (
            """SELECT id FROM suppression_rules
            WHERE active=TRUE AND (expires_at IS NULL OR expires_at > NOW())
            AND target_type='all' LIMIT 1""",
            None,
        ),
    ]
    if target_name:
        checks.append((
            """SELECT id FROM suppression_rules
            WHERE active=TRUE AND (expires_at IS NULL OR expires_at > NOW())
            AND target_type=%s AND target_name=%s
            AND (target_detail IS NULL OR target_detail='') LIMIT 1""",
            (target_type, target_name),
        ))
        if target_detail:
            checks.append((
                """SELECT id FROM suppression_rules
                WHERE active=TRUE AND (expires_at IS NULL OR expires_at > NOW())
                AND target_type=%s AND target_name=%s AND target_detail=%s LIMIT 1""",
                (target_type, target_name, target_detail),
            ))
    with get_conn() as conn, conn.cursor() as cur:
        for sql, params in checks:
            cur.execute(sql, params)
            if cur.fetchone():
                return True
    return False
|
||||
Reference in New Issue
Block a user