Files
gandalf/db.py
Jared Vititoe 0c0150f698 Complete rewrite: full-featured network monitoring dashboard
- Two-service architecture: Flask web app (gandalf.service) + background
  polling daemon (gandalf-monitor.service)
- Monitor polls Prometheus node_network_up for physical NIC states on all
  6 hypervisors (added storage-01 at 10.10.10.11:9100)
- UniFi API monitoring for switches, APs, and gateway device status
- Ping reachability for hosts without node_exporter (pbs only now)
- Smart baseline: interfaces first seen as down are never alerted on;
  only UP→DOWN regressions trigger tickets
- Cluster-wide P1 ticket when 3+ hosts have genuine simultaneous
  interface regressions (guards against false positives on startup)
- Tinker Tickets integration with 24-hour hash-based deduplication
- Alert suppression: manual toggle or timed windows (30m/1h/4h/8h)
- Authelia SSO via forward-auth headers, admin group required
- Network topology: Internet → UDM-Pro → Agg Switch (10G DAC) →
  PoE Switch (10G DAC) → Hosts
- MariaDB schema, suppression management UI, host/interface cards

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 23:03:18 -05:00

305 lines
10 KiB
Python

"""Database operations for Gandalf network monitor."""
import json
import logging
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import Optional
import pymysql
import pymysql.cursors
logger = logging.getLogger(__name__)
_config_cache = None


def _config() -> dict:
    """Return the ``database`` section of ``config.json``, loading it once.

    The file is opened relative to the current working directory on the
    first call. The parsed section is kept in the module-level
    ``_config_cache``; later calls return that cached dict without reading
    the file again.

    Returns:
        The value stored under the top-level ``"database"`` key.

    Raises:
        OSError: If ``config.json`` cannot be opened.
        ValueError: If the file is not valid UTF-8 encoded JSON.
        KeyError: If the document has no ``"database"`` key.
    """
    global _config_cache
    if _config_cache is None:
        # JSON text is UTF-8 by specification. Pin the encoding instead of
        # inheriting the process locale, so non-ASCII credentials decode the
        # same way regardless of how the service was started.
        with open('config.json', encoding='utf-8') as f:
            _config_cache = json.load(f)['database']
    return _config_cache
@contextmanager
def get_conn():
    """Yield a new PyMySQL connection and close it when the block exits.

    Connection settings are taken from ``_config()``; the port falls back
    to 3306 when not configured. The connection uses ``autocommit=True``
    and ``DictCursor``, so rows are returned as dicts. The connection is
    closed even if the body of the ``with`` block raises.
    """
    settings = _config()
    connect_kwargs = {
        'host': settings['host'],
        'port': settings.get('port', 3306),
        'user': settings['user'],
        'password': settings['password'],
        'database': settings['name'],
        'autocommit': True,
        'cursorclass': pymysql.cursors.DictCursor,
        'connect_timeout': 10,
        'charset': 'utf8mb4',
    }
    connection = pymysql.connect(**connect_kwargs)
    try:
        yield connection
    finally:
        connection.close()
# ---------------------------------------------------------------------------
# Monitor state (key/value store)
# ---------------------------------------------------------------------------
def set_state(key: str, value) -> None:
    """Store *value* under *key* in the ``monitor_state`` table.

    A ``str`` is written as given; any other value is first serialised
    with ``json.dumps(value, default=str)``. If a row for the key already
    exists, its value is replaced and ``updated_at`` is set to ``NOW()``.
    """
    payload = value if isinstance(value, str) else json.dumps(value, default=str)
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """INSERT INTO monitor_state (key_name, value)
               VALUES (%s, %s)
               ON DUPLICATE KEY UPDATE value=VALUES(value), updated_at=NOW()""",
            (key, payload),
        )
def get_state(key: str, default=None):
    """Look up *key* in the ``monitor_state`` table.

    Returns the ``value`` column of the matching row exactly as stored,
    or *default* when no row exists for the key.
    """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute('SELECT value FROM monitor_state WHERE key_name=%s', (key,))
        record = cur.fetchone()
    if not record:
        return default
    return record['value']
# ---------------------------------------------------------------------------
# Interface baseline tracking
# ---------------------------------------------------------------------------
def get_baseline() -> dict:
    """Return the interface baseline stored under ``interface_baseline``.

    The value is read through ``get_state`` and decoded from JSON. The
    function is best-effort and always returns a dict:

    * no stored value (or an empty one) -> ``{}``
    * stored value that cannot be decoded -> ``{}``, with a warning logged
    * stored value that decodes to something other than a JSON object
      (for example ``null`` or a list) -> ``{}``, with a warning logged

    Returns:
        The decoded baseline mapping, or an empty dict as described above.
    """
    raw = get_state('interface_baseline')
    if not raw:
        return {}
    try:
        baseline = json.loads(raw)
    except (TypeError, ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError; TypeError covers a stored
        # value that is not str/bytes. Keep the best-effort behaviour but
        # leave a trace, since the baseline is being discarded.
        logger.warning(
            'Stored interface baseline could not be decoded; using empty baseline',
            exc_info=True,
        )
        return {}
    if not isinstance(baseline, dict):
        # Honour the declared return type so callers can always use
        # mapping operations on the result.
        logger.warning(
            'Stored interface baseline is %s, not a JSON object; using empty baseline',
            type(baseline).__name__,
        )
        return {}
    return baseline
def set_baseline(baseline: dict) -> None:
    """Serialise *baseline* to JSON and store it under ``interface_baseline``."""
    encoded = json.dumps(baseline)
    set_state('interface_baseline', encoded)
# ---------------------------------------------------------------------------
# Network events
# ---------------------------------------------------------------------------
def upsert_event(
    event_type: str,
    severity: str,
    source_type: str,
    target_name: str,
    target_detail: str,
    description: str,
) -> tuple:
    """Record one failure observation in ``network_events``.

    If an unresolved row already exists for the same event type, target
    name and target detail, its ``consecutive_failures`` is incremented and
    its ``last_seen`` and ``description`` are refreshed; *severity* and
    *source_type* are not touched in that case. Otherwise a new row is
    inserted. A falsy *target_detail* is stored and matched as ``''``.

    Returns:
        ``(event_id, is_new, consecutive_failures)``. A newly inserted
        event is reported with a count of 1.
    """
    detail = target_detail or ''
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """SELECT id, consecutive_failures FROM network_events
               WHERE event_type=%s AND target_name=%s AND target_detail=%s
               AND resolved_at IS NULL LIMIT 1""",
            (event_type, target_name, detail),
        )
        open_event = cur.fetchone()

        # No unresolved event for this target yet: create one.
        if not open_event:
            cur.execute(
                """INSERT INTO network_events
                   (event_type, severity, source_type, target_name, target_detail, description)
                   VALUES (%s, %s, %s, %s, %s, %s)""",
                (event_type, severity, source_type, target_name, detail, description),
            )
            return cur.lastrowid, True, 1

        # Otherwise bump the failure counter on the existing event.
        event_id = open_event['id']
        failures = open_event['consecutive_failures'] + 1
        cur.execute(
            """UPDATE network_events
               SET last_seen=NOW(), consecutive_failures=%s, description=%s
               WHERE id=%s""",
            (failures, description, event_id),
        )
        return event_id, False, failures
def resolve_event(event_type: str, target_name: str, target_detail: str = '') -> None:
    """Mark matching unresolved events as resolved.

    Sets ``resolved_at`` to ``NOW()`` on every ``network_events`` row that
    is still unresolved and matches the event type, target name and target
    detail. A falsy *target_detail* is matched as ``''``.
    """
    params = (event_type, target_name, target_detail or '')
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """UPDATE network_events SET resolved_at=NOW()
               WHERE event_type=%s AND target_name=%s AND target_detail=%s
               AND resolved_at IS NULL""",
            params,
        )
def set_ticket_id(event_id: int, ticket_id: str) -> None:
    """Store *ticket_id* on the ``network_events`` row whose id is *event_id*."""
    statement = 'UPDATE network_events SET ticket_id=%s WHERE id=%s'
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(statement, (ticket_id, event_id))
def get_active_events() -> list:
    """Return all unresolved network events.

    Rows are ordered by severity (critical, warning, info) and then by
    ``first_seen``, newest first. ``first_seen`` and ``last_seen`` values
    that expose ``isoformat()`` are replaced in place by their ISO-8601
    string form.
    """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """SELECT * FROM network_events
               WHERE resolved_at IS NULL
               ORDER BY
               FIELD(severity,'critical','warning','info'),
               first_seen DESC"""
        )
        events = cur.fetchall()

    for event in events:
        for column in ('first_seen', 'last_seen'):
            stamp = event.get(column)
            if stamp and hasattr(stamp, 'isoformat'):
                event[column] = stamp.isoformat()
    return events
def get_recent_resolved(hours: int = 24, limit: int = 50) -> list:
    """Return events resolved within the last *hours* hours.

    At most *limit* rows are returned, most recently resolved first.
    ``first_seen``, ``last_seen`` and ``resolved_at`` values that expose
    ``isoformat()`` are replaced in place by their ISO-8601 string form.
    """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """SELECT * FROM network_events
               WHERE resolved_at IS NOT NULL
               AND resolved_at > DATE_SUB(NOW(), INTERVAL %s HOUR)
               ORDER BY resolved_at DESC LIMIT %s""",
            (hours, limit),
        )
        events = cur.fetchall()

    for event in events:
        for column in ('first_seen', 'last_seen', 'resolved_at'):
            stamp = event.get(column)
            if stamp and hasattr(stamp, 'isoformat'):
                event[column] = stamp.isoformat()
    return events
def get_status_summary() -> dict:
    """Count unresolved events per severity.

    Returns a dict with exactly the keys ``critical``, ``warning`` and
    ``info``; a severity with no unresolved events is reported as 0.
    """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """SELECT severity, COUNT(*) as cnt FROM network_events
               WHERE resolved_at IS NULL GROUP BY severity"""
        )
        per_severity = {}
        for row in cur.fetchall():
            per_severity[row['severity']] = row['cnt']

    summary = {}
    for level in ('critical', 'warning', 'info'):
        summary[level] = per_severity.get(level, 0)
    return summary
# ---------------------------------------------------------------------------
# Suppression rules
# ---------------------------------------------------------------------------
def get_active_suppressions() -> list:
    """Return suppression rules that are active and not yet expired.

    A rule qualifies when ``active`` is true and ``expires_at`` is either
    NULL or later than ``NOW()``. Rows are ordered newest first by
    ``created_at``. ``created_at`` and ``expires_at`` values that expose
    ``isoformat()`` are replaced in place by their ISO-8601 string form.
    """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """SELECT * FROM suppression_rules
               WHERE active=TRUE AND (expires_at IS NULL OR expires_at > NOW())
               ORDER BY created_at DESC"""
        )
        rules = cur.fetchall()

    for rule in rules:
        for column in ('created_at', 'expires_at'):
            stamp = rule.get(column)
            if stamp and hasattr(stamp, 'isoformat'):
                rule[column] = stamp.isoformat()
    return rules
def get_suppression_history(limit: int = 50) -> list:
    """Return up to *limit* suppression rules, newest first.

    No filter is applied on ``active`` or ``expires_at``, so inactive and
    expired rules are included. ``created_at`` and ``expires_at`` values
    that expose ``isoformat()`` are replaced in place by their ISO-8601
    string form.
    """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            'SELECT * FROM suppression_rules ORDER BY created_at DESC LIMIT %s',
            (limit,),
        )
        rules = cur.fetchall()

    for rule in rules:
        for column in ('created_at', 'expires_at'):
            stamp = rule.get(column)
            if stamp and hasattr(stamp, 'isoformat'):
                rule[column] = stamp.isoformat()
    return rules
def create_suppression(
    target_type: str,
    target_name: str,
    target_detail: str,
    reason: str,
    suppressed_by: str,
    expires_minutes: Optional[int] = None,
) -> int:
    """Insert an active suppression rule and return its row id.

    Args:
        target_type: Kind of target the rule applies to.
        target_name: Name of the target; a falsy value is stored as ``''``.
        target_detail: Sub-target detail; a falsy value is stored as ``''``.
        reason: Reason text stored with the rule.
        suppressed_by: Identifier of whoever created the rule.
        expires_minutes: Lifetime of the rule in minutes, counted from the
            database's ``NOW()``. A falsy value (``None`` or ``0``) creates
            a rule with ``expires_at`` NULL, i.e. no expiry.

    Returns:
        The id of the inserted ``suppression_rules`` row.

    Raises:
        ValueError: If *expires_minutes* is truthy but not convertible to
            ``int``.
    """
    minutes = int(expires_minutes) if expires_minutes else None
    with get_conn() as conn, conn.cursor() as cur:
        # Derive the expiry from the database clock: every query that reads
        # these rules compares expires_at against NOW(). Computing it from
        # Python's UTC clock skews timed windows by the UTC offset whenever
        # the database session time zone is not UTC.
        # DATE_ADD() returns NULL when the interval value is NULL, which
        # stores "no expiry" for rules without a time limit.
        cur.execute(
            """INSERT INTO suppression_rules
               (target_type, target_name, target_detail, reason, suppressed_by, expires_at, active)
               VALUES (%s, %s, %s, %s, %s, DATE_ADD(NOW(), INTERVAL %s MINUTE), TRUE)""",
            (target_type, target_name or '', target_detail or '', reason, suppressed_by, minutes),
        )
        return cur.lastrowid
def deactivate_suppression(sup_id: int) -> None:
    """Set ``active`` to FALSE on the suppression rule whose id is *sup_id*."""
    statement = 'UPDATE suppression_rules SET active=FALSE WHERE id=%s'
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(statement, (sup_id,))
def is_suppressed(target_type: str, target_name: str, target_detail: str = '') -> bool:
    """Report whether an active suppression rule covers the given target.

    Only rules with ``active`` true and ``expires_at`` NULL or in the
    future are considered. They are checked from broadest to narrowest:

    1. any rule with ``target_type='all'``;
    2. a rule for this type and name whose detail is NULL or empty;
    3. if *target_detail* is truthy, a rule matching type, name and detail.

    When *target_name* is falsy, only step 1 can produce ``True``.
    """
    with get_conn() as conn, conn.cursor() as cur:

        def rule_exists(sql, params=None):
            # Run one lookup and report whether it returned a row.
            cur.execute(sql, params)
            return bool(cur.fetchone())

        # Global suppression (all)
        if rule_exists(
            """SELECT id FROM suppression_rules
               WHERE active=TRUE AND (expires_at IS NULL OR expires_at > NOW())
               AND target_type='all' LIMIT 1"""
        ):
            return True

        if not target_name:
            return False

        # Host-level suppression (covers all interfaces on that host)
        if rule_exists(
            """SELECT id FROM suppression_rules
               WHERE active=TRUE AND (expires_at IS NULL OR expires_at > NOW())
               AND target_type=%s AND target_name=%s
               AND (target_detail IS NULL OR target_detail='') LIMIT 1""",
            (target_type, target_name),
        ):
            return True

        # Interface/device-specific suppression
        if not target_detail:
            return False
        return rule_exists(
            """SELECT id FROM suppression_rules
               WHERE active=TRUE AND (expires_at IS NULL OR expires_at > NOW())
               AND target_type=%s AND target_name=%s AND target_detail=%s LIMIT 1""",
            (target_type, target_name, target_detail),
        )