2026-03-01 23:03:18 -05:00
|
|
|
"""Database operations for Gandalf network monitor."""
|
|
|
|
|
import json
|
|
|
|
|
import logging
|
feat: inspector page, link debug enhancements, security hardening
- Add /inspector page: visual model-accurate switch chassis diagrams
(USF5P, USL8A, US24PRO, USPPDUP, USMINI), clickable port blocks
with color coding (green=up, amber=PoE, cyan=uplink, grey=down),
detail panel with stats/PoE/LLDP, LLDP-based path debug side-by-side
- Link Debug: port number badges (#N), LLDP neighbor line, PoE class/max,
collapsible host/switch panels with sessionStorage persistence
- monitor.py: collect LLDP neighbor map + PoE class/max/mode per switch
port; PulseClient uses requests.Session() for HTTP keep-alive; add
shlex.quote() around interface names (defense-in-depth)
- Security: suppress buttons use data-* attrs + delegated click handler
instead of inline onclick with Jinja2 variable interpolation; remove
| safe filter from user-controlled fields in suppressions.html;
setDuration() takes explicit el param instead of implicit event global
- db.py: thread-local connection reuse with ping(reconnect=True) to
avoid a new TCP handshake per query
- .gitignore: add config.json (contains credentials), __pycache__
- README: full rewrite covering architecture, all 4 pages, alert logic,
config reference, deployment, troubleshooting, security notes
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-03 15:39:48 -05:00
|
|
|
import threading
|
2026-03-01 23:03:18 -05:00
|
|
|
from contextlib import contextmanager
|
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
|
|
import pymysql
|
|
|
|
|
import pymysql.cursors
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
_config_cache = None
|
feat: inspector page, link debug enhancements, security hardening
- Add /inspector page: visual model-accurate switch chassis diagrams
(USF5P, USL8A, US24PRO, USPPDUP, USMINI), clickable port blocks
with color coding (green=up, amber=PoE, cyan=uplink, grey=down),
detail panel with stats/PoE/LLDP, LLDP-based path debug side-by-side
- Link Debug: port number badges (#N), LLDP neighbor line, PoE class/max,
collapsible host/switch panels with sessionStorage persistence
- monitor.py: collect LLDP neighbor map + PoE class/max/mode per switch
port; PulseClient uses requests.Session() for HTTP keep-alive; add
shlex.quote() around interface names (defense-in-depth)
- Security: suppress buttons use data-* attrs + delegated click handler
instead of inline onclick with Jinja2 variable interpolation; remove
| safe filter from user-controlled fields in suppressions.html;
setDuration() takes explicit el param instead of implicit event global
- db.py: thread-local connection reuse with ping(reconnect=True) to
avoid a new TCP handshake per query
- .gitignore: add config.json (contains credentials), __pycache__
- README: full rewrite covering architecture, all 4 pages, alert logic,
config reference, deployment, troubleshooting, security notes
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-03 15:39:48 -05:00
|
|
|
_local = threading.local()
|
2026-03-01 23:03:18 -05:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def _config() -> dict:
    """Return the 'database' section of config.json, loading it at most once."""
    global _config_cache
    if _config_cache is not None:
        return _config_cache
    with open('config.json') as f:
        _config_cache = json.load(f)['database']
    return _config_cache
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@contextmanager
def get_conn():
    """Yield a per-thread cached database connection, reconnecting as needed."""
    cfg = _config()
    conn = getattr(_local, 'conn', None)
    if conn is not None:
        # Cheap liveness probe; transparently re-opens a dropped connection
        # instead of paying a full TCP handshake per query.
        conn.ping(reconnect=True)
    else:
        conn = pymysql.connect(
            host=cfg['host'],
            port=cfg.get('port', 3306),
            user=cfg['user'],
            password=cfg['password'],
            database=cfg['name'],
            autocommit=True,
            cursorclass=pymysql.cursors.DictCursor,
            connect_timeout=10,
            charset='utf8mb4',
        )
        _local.conn = conn
    yield conn
|
2026-03-01 23:03:18 -05:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Monitor state (key/value store)
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def set_state(key: str, value) -> None:
    """Store `value` under `key` in the monitor_state key/value table.

    Non-string values are serialized to JSON (with str() fallback for
    non-serializable members) before being written.
    """
    payload = value if isinstance(value, str) else json.dumps(value, default=str)
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """INSERT INTO monitor_state (key_name, value)
               VALUES (%s, %s)
               ON DUPLICATE KEY UPDATE value=VALUES(value), updated_at=NOW()""",
            (key, payload),
        )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_state(key: str, default=None):
    """Return the stored value for `key` from monitor_state, or `default`."""
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute('SELECT value FROM monitor_state WHERE key_name=%s', (key,))
        row = cur.fetchone()
    if not row:
        return default
    return row['value']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Interface baseline tracking
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def get_baseline() -> dict:
    """Return the persisted interface baseline mapping, or {} when absent/corrupt.

    The baseline is stored as a JSON string under the 'interface_baseline'
    state key (see set_baseline).
    """
    raw = get_state('interface_baseline')
    if raw:
        try:
            return json.loads(raw)
        # json.JSONDecodeError subclasses ValueError; TypeError covers a
        # non-string payload. Anything else is a real bug and should surface
        # (the previous bare `except Exception` hid it).
        except (TypeError, ValueError):
            logger.error('Failed to parse interface_baseline JSON; resetting baseline')
    return {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_baseline(baseline: dict) -> None:
    """Persist the interface baseline mapping as a JSON string."""
    encoded = json.dumps(baseline)
    set_state('interface_baseline', encoded)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Network events
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def upsert_event(
    event_type: str,
    severity: str,
    source_type: str,
    target_name: str,
    target_detail: str,
    description: str,
) -> tuple:
    """Insert or update a network event. Returns (id, is_new, consecutive_failures)."""
    # Normalize None/empty detail to '' so the equality match below is stable
    # (the open-event lookup compares target_detail with =, not IS NULL).
    detail = target_detail or ''
    with get_conn() as conn:
        with conn.cursor() as cur:
            # Look for an already-open (unresolved) event for the same
            # (type, target, detail) triple.
            cur.execute(
                """SELECT id, consecutive_failures FROM network_events
                   WHERE event_type=%s AND target_name=%s AND target_detail=%s
                   AND resolved_at IS NULL LIMIT 1""",
                (event_type, target_name, detail),
            )
            existing = cur.fetchone()

            if existing:
                # Same incident still failing: extend the streak, refresh
                # last_seen and the (possibly updated) description.
                # NOTE: severity/source_type of the open event are kept as-is.
                new_count = existing['consecutive_failures'] + 1
                cur.execute(
                    """UPDATE network_events
                       SET last_seen=NOW(), consecutive_failures=%s, description=%s
                       WHERE id=%s""",
                    (new_count, description, existing['id']),
                )
                return existing['id'], False, new_count
            else:
                # First sighting: open a new event row; streak starts at 1
                # (first_seen/last_seen presumably default to NOW() in the
                # schema — confirm against the DDL).
                cur.execute(
                    """INSERT INTO network_events
                       (event_type, severity, source_type, target_name, target_detail, description)
                       VALUES (%s, %s, %s, %s, %s, %s)""",
                    (event_type, severity, source_type, target_name, detail, description),
                )
                return cur.lastrowid, True, 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def resolve_event(event_type: str, target_name: str, target_detail: str = '') -> None:
    """Mark every open event matching (type, target, detail) as resolved now."""
    detail = target_detail or ''
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """UPDATE network_events SET resolved_at=NOW()
               WHERE event_type=%s AND target_name=%s AND target_detail=%s
               AND resolved_at IS NULL""",
            (event_type, target_name, detail),
        )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_ticket_id(event_id: int, ticket_id: str) -> None:
    """Attach an external ticket reference to an existing event row."""
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            'UPDATE network_events SET ticket_id=%s WHERE id=%s',
            (ticket_id, event_id),
        )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_active_events() -> list:
    """Return unresolved events, most severe first, newest first within severity.

    Datetime columns first_seen/last_seen are converted to ISO-8601 strings
    so rows are JSON-serializable.
    """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """SELECT * FROM network_events
               WHERE resolved_at IS NULL
               ORDER BY
               FIELD(severity,'critical','warning','info'),
               first_seen DESC"""
        )
        events = cur.fetchall()
    for event in events:
        for field in ('first_seen', 'last_seen'):
            value = event.get(field)
            if value and hasattr(value, 'isoformat'):
                event[field] = value.isoformat()
    return events
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_recent_resolved(hours: int = 24, limit: int = 50) -> list:
    """Return up to `limit` events resolved within the last `hours` hours.

    Datetime columns are converted to ISO-8601 strings for serialization.
    """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """SELECT * FROM network_events
               WHERE resolved_at IS NOT NULL
               AND resolved_at > DATE_SUB(NOW(), INTERVAL %s HOUR)
               ORDER BY resolved_at DESC LIMIT %s""",
            (hours, limit),
        )
        events = cur.fetchall()
    for event in events:
        for field in ('first_seen', 'last_seen', 'resolved_at'):
            value = event.get(field)
            if value and hasattr(value, 'isoformat'):
                event[field] = value.isoformat()
    return events
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_status_summary() -> dict:
    """Return counts of unresolved events keyed by severity (always all 3 keys)."""
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """SELECT severity, COUNT(*) as cnt FROM network_events
               WHERE resolved_at IS NULL GROUP BY severity"""
        )
        rows = cur.fetchall()
    summary = {'critical': 0, 'warning': 0, 'info': 0}
    for row in rows:
        if row['severity'] in summary:
            summary[row['severity']] = row['cnt']
    return summary
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Suppression rules
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def get_active_suppressions() -> list:
    """Return active, non-expired suppression rules, newest first.

    created_at/expires_at are converted to ISO-8601 strings.
    """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """SELECT * FROM suppression_rules
               WHERE active=TRUE AND (expires_at IS NULL OR expires_at > NOW())
               ORDER BY created_at DESC"""
        )
        rules = cur.fetchall()
    for rule in rules:
        for field in ('created_at', 'expires_at'):
            value = rule.get(field)
            if value and hasattr(value, 'isoformat'):
                rule[field] = value.isoformat()
    return rules
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_suppression_history(limit: int = 50) -> list:
    """Return the most recent `limit` suppression rules (active or not).

    created_at/expires_at are converted to ISO-8601 strings.
    """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            'SELECT * FROM suppression_rules ORDER BY created_at DESC LIMIT %s',
            (limit,),
        )
        rules = cur.fetchall()
    for rule in rules:
        for field in ('created_at', 'expires_at'):
            value = rule.get(field)
            if value and hasattr(value, 'isoformat'):
                rule[field] = value.isoformat()
    return rules
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_suppression(
    target_type: str,
    target_name: str,
    target_detail: str,
    reason: str,
    suppressed_by: str,
    expires_minutes: Optional[int] = None,
) -> int:
    """Insert an active suppression rule and return its new row id.

    When `expires_minutes` is given, expires_at is computed in local time so
    that the SQL comparisons against NOW() elsewhere in this module behave
    consistently. The previous datetime.utcnow() value skewed expiry by the
    UTC offset on non-UTC servers (and utcnow() is deprecated in 3.12+).
    NOTE(review): assumes the MySQL session timezone matches this host's
    local timezone — confirm against the server config.
    """
    expires_at = None
    if expires_minutes:
        expires_at = datetime.now() + timedelta(minutes=int(expires_minutes))
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """INSERT INTO suppression_rules
                   (target_type, target_name, target_detail, reason, suppressed_by, expires_at, active)
                   VALUES (%s, %s, %s, %s, %s, %s, TRUE)""",
                # Empty strings (not NULL) keep the detail-matching queries simple.
                (target_type, target_name or '', target_detail or '', reason, suppressed_by, expires_at),
            )
            return cur.lastrowid
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def deactivate_suppression(sup_id: int) -> None:
    """Turn off a single suppression rule by id."""
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            'UPDATE suppression_rules SET active=FALSE WHERE id=%s', (sup_id,)
        )
|
|
|
|
|
|
|
|
|
|
|
2026-03-14 21:35:32 -04:00
|
|
|
def cleanup_expired_suppressions() -> int:
    """Mark expired time-limited suppressions as inactive. Returns count deactivated."""
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """UPDATE suppression_rules
               SET active=FALSE
               WHERE active=TRUE AND expires_at IS NOT NULL AND expires_at <= NOW()"""
        )
        expired = cur.rowcount
    if expired:
        logger.info(f'Deactivated {expired} expired suppression(s)')
    return expired
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def purge_old_resolved_events(days: int = 90) -> int:
    """Delete resolved events older than `days` days. Returns count deleted."""
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """DELETE FROM network_events
               WHERE resolved_at IS NOT NULL
               AND resolved_at < DATE_SUB(NOW(), INTERVAL %s DAY)""",
            (days,),
        )
        deleted = cur.rowcount
    if deleted:
        logger.info(f'Purged {deleted} old resolved event(s) (>{days}d)')
    return deleted
|
|
|
|
|
|
|
|
|
|
|
2026-03-14 14:13:54 -04:00
|
|
|
def check_suppressed(suppressions: list, target_type: str, target_name: str, target_detail: str = '') -> bool:
    """Check suppression against a pre-loaded list (avoids per-call DB queries).

    Matching precedence per rule: a 'all'-type rule suppresses everything;
    a rule with no detail suppresses its whole target; otherwise the rule's
    detail must equal a non-empty target_detail.
    """
    for rule in suppressions:
        rule_type = rule['target_type']
        if rule_type == 'all':
            return True
        if rule_type != target_type or rule['target_name'] != target_name:
            continue
        rule_detail = rule.get('target_detail') or ''
        if not rule_detail:
            # Blanket rule for the whole target (all details covered).
            return True
        if target_detail and rule_detail == target_detail:
            return True
    return False
|
|
|
|
|
|
|
|
|
|
|
2026-03-01 23:03:18 -05:00
|
|
|
def is_suppressed(target_type: str, target_name: str, target_detail: str = '') -> bool:
    """Return True if an active, unexpired suppression rule covers the target.

    Checks three levels in order: global ('all'), host/target-level (rule
    with no detail), then interface/device-specific (exact detail match).
    Each check is its own query; see check_suppressed for the in-memory
    equivalent against a pre-loaded rule list.
    """
    with get_conn() as conn:
        with conn.cursor() as cur:
            # Global suppression (all)
            cur.execute(
                """SELECT id FROM suppression_rules
                   WHERE active=TRUE AND (expires_at IS NULL OR expires_at > NOW())
                   AND target_type='all' LIMIT 1"""
            )
            if cur.fetchone():
                return True

            # Without a target name only the global rule can apply.
            if not target_name:
                return False

            # Host-level suppression (covers all interfaces on that host)
            cur.execute(
                """SELECT id FROM suppression_rules
                   WHERE active=TRUE AND (expires_at IS NULL OR expires_at > NOW())
                   AND target_type=%s AND target_name=%s
                   AND (target_detail IS NULL OR target_detail='') LIMIT 1""",
                (target_type, target_name),
            )
            if cur.fetchone():
                return True

            # Interface/device-specific suppression
            if target_detail:
                cur.execute(
                    """SELECT id FROM suppression_rules
                       WHERE active=TRUE AND (expires_at IS NULL OR expires_at > NOW())
                       AND target_type=%s AND target_name=%s AND target_detail=%s LIMIT 1""",
                    (target_type, target_name, target_detail),
                )
                if cur.fetchone():
                    return True

            return False
|