Files
gandalf/monitor.py

480 lines
18 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""Gandalf network monitor daemon.
Polls Prometheus (node_exporter) and the UniFi controller for network
interface and device state. Creates tickets in Tinker Tickets when issues
are detected, with deduplication and suppression support.
Run as a separate systemd service alongside the Flask web app.
"""
import json
import logging
import re
import subprocess
import time
from datetime import datetime
from typing import Dict, List, Optional
import requests
from urllib3.exceptions import InsecureRequestWarning
import db
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(name)s %(message)s',
)
logger = logging.getLogger('gandalf.monitor')
# --------------------------------------------------------------------------
# Interface filtering
# --------------------------------------------------------------------------
_SKIP_PREFIXES = (
'lo', 'veth', 'tap', 'fwbr', 'fwln', 'fwpr',
'docker', 'dummy', 'br-', 'virbr', 'vmbr',
)
_VLAN_SUFFIX = re.compile(r'\.\d+$')
def is_physical_interface(name: str) -> bool:
"""Return True for physical/bond interfaces worth monitoring."""
if any(name.startswith(p) for p in _SKIP_PREFIXES):
return False
if _VLAN_SUFFIX.search(name):
return False
return True
# --------------------------------------------------------------------------
# Prometheus client
# --------------------------------------------------------------------------
class PrometheusClient:
def __init__(self, url: str):
self.url = url.rstrip('/')
def query(self, promql: str) -> list:
try:
resp = requests.get(
f'{self.url}/api/v1/query',
params={'query': promql},
timeout=15,
)
resp.raise_for_status()
data = resp.json()
if data.get('status') == 'success':
return data['data']['result']
except Exception as e:
logger.error(f'Prometheus query failed ({promql!r}): {e}')
return []
def get_interface_states(self) -> Dict[str, Dict[str, bool]]:
"""Return {instance: {device: is_up}} for physical interfaces."""
results = self.query('node_network_up')
hosts: Dict[str, Dict[str, bool]] = {}
for r in results:
instance = r['metric'].get('instance', '')
device = r['metric'].get('device', '')
if not is_physical_interface(device):
continue
hosts.setdefault(instance, {})[device] = (r['value'][1] == '1')
return hosts
# --------------------------------------------------------------------------
# UniFi client
# --------------------------------------------------------------------------
class UnifiClient:
def __init__(self, cfg: dict):
self.base_url = cfg['controller']
self.site_id = cfg.get('site_id', 'default')
self.session = requests.Session()
self.session.verify = False
self.headers = {
'X-API-KEY': cfg['api_key'],
'Accept': 'application/json',
}
def get_devices(self) -> Optional[List[dict]]:
"""Return list of UniFi devices, or None if the controller is unreachable."""
try:
url = f'{self.base_url}/proxy/network/v2/api/site/{self.site_id}/device'
resp = self.session.get(url, headers=self.headers, timeout=15)
resp.raise_for_status()
data = resp.json()
devices = []
for d in data.get('network_devices', []):
state = d.get('state', 1)
devices.append({
'name': d.get('name') or d.get('mac', 'unknown'),
'mac': d.get('mac', ''),
'ip': d.get('ip', ''),
'type': d.get('type', 'unknown'),
'model': d.get('model', ''),
'state': state,
'connected': state == 1,
})
return devices
except Exception as e:
logger.error(f'UniFi API error: {e}')
return None
# --------------------------------------------------------------------------
# Ticket client
# --------------------------------------------------------------------------
class TicketClient:
def __init__(self, cfg: dict):
self.url = cfg.get('url', '')
self.api_key = cfg.get('api_key', '')
def create(self, title: str, description: str, priority: str = '2') -> Optional[str]:
if not self.api_key or not self.url:
logger.warning('Ticket API not configured skipping ticket creation')
return None
try:
resp = requests.post(
self.url,
json={
'title': title,
'description': description,
'status': 'Open',
'priority': priority,
'category': 'Network',
'type': 'Issue',
},
headers={'Authorization': f'Bearer {self.api_key}'},
timeout=15,
)
resp.raise_for_status()
data = resp.json()
if data.get('success'):
tid = data['ticket_id']
logger.info(f'Created ticket #{tid}: {title}')
return tid
if data.get('existing_ticket_id'):
logger.info(f'Duplicate suppressed by API existing #{data["existing_ticket_id"]}')
return data['existing_ticket_id']
logger.warning(f'Unexpected ticket API response: {data}')
except Exception as e:
logger.error(f'Ticket creation failed: {e}')
return None
# --------------------------------------------------------------------------
# Helpers
# --------------------------------------------------------------------------
def ping(ip: str, count: int = 3, timeout: int = 2) -> bool:
try:
r = subprocess.run(
['ping', '-c', str(count), '-W', str(timeout), ip],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=30,
)
return r.returncode == 0
except Exception:
return False
def _now_utc() -> str:
return datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')
# --------------------------------------------------------------------------
# Monitor
# --------------------------------------------------------------------------
CLUSTER_NAME = 'proxmox-cluster'
class NetworkMonitor:
def __init__(self):
with open('config.json') as f:
self.cfg = json.load(f)
prom_url = self.cfg['prometheus']['url']
self.prom = PrometheusClient(prom_url)
self.unifi = UnifiClient(self.cfg['unifi'])
self.tickets = TicketClient(self.cfg.get('ticket_api', {}))
mon = self.cfg.get('monitor', {})
self.poll_interval = mon.get('poll_interval', 120)
self.fail_thresh = mon.get('failure_threshold', 2)
self.cluster_thresh = mon.get('cluster_threshold', 3)
# Build Prometheus instance → hostname lookup
self._instance_map: Dict[str, str] = {
h['prometheus_instance']: h['name']
for h in self.cfg.get('hosts', [])
if 'prometheus_instance' in h
}
def _hostname(self, instance: str) -> str:
return self._instance_map.get(instance, instance.split(':')[0])
# ------------------------------------------------------------------
# Interface monitoring (Prometheus)
# ------------------------------------------------------------------
def _process_interfaces(self, states: Dict[str, Dict[str, bool]]) -> None:
baseline = db.get_baseline()
new_baseline = {k: dict(v) for k, v in baseline.items()}
# Only count hosts with genuine regressions (UP→DOWN) toward cluster threshold
hosts_with_regression: List[str] = []
for instance, ifaces in states.items():
host = self._hostname(instance)
new_baseline.setdefault(host, {})
host_has_regression = False
for iface, is_up in ifaces.items():
prev = baseline.get(host, {}).get(iface) # 'up', 'initial_down', or None
if is_up:
new_baseline[host][iface] = 'up'
db.resolve_event('interface_down', host, iface)
else:
if prev is None:
# First observation is down could be unused port, don't alert
new_baseline[host][iface] = 'initial_down'
elif prev == 'initial_down':
# Persistently down since first observation no alert
pass
else: # prev == 'up'
# Regression: was UP, now DOWN
host_has_regression = True
sup = (
db.is_suppressed('interface', host, iface) or
db.is_suppressed('host', host)
)
event_id, is_new, consec = db.upsert_event(
'interface_down', 'critical', 'prometheus',
host, iface,
f'Interface {iface} on {host} went link-down ({_now_utc()})',
)
if not sup and consec >= self.fail_thresh:
self._ticket_interface(event_id, is_new, host, iface, consec)
if host_has_regression:
hosts_with_regression.append(host)
db.set_baseline(new_baseline)
# Cluster-wide check only genuine regressions count
if len(hosts_with_regression) >= self.cluster_thresh:
sup = db.is_suppressed('all', '')
event_id, is_new, consec = db.upsert_event(
'cluster_network_issue', 'critical', 'prometheus',
CLUSTER_NAME, '',
f'{len(hosts_with_regression)} hosts reporting simultaneous interface failures: '
f'{", ".join(hosts_with_regression)}',
)
if not sup and is_new:
title = (
f'[{CLUSTER_NAME}][auto][production][issue][network][cluster-wide] '
f'Multiple hosts reporting interface failures'
)
desc = (
f'Cluster Network Alert\n{"=" * 40}\n\n'
f'Affected hosts: {", ".join(hosts_with_regression)}\n'
f'Detected: {_now_utc()}\n\n'
f'{len(hosts_with_regression)} Proxmox hosts simultaneously reported '
f'interface regressions (link-down on interfaces previously known UP).\n'
f'This likely indicates a switch or upstream network failure.\n\n'
f'Please check the core and management switches immediately.'
)
tid = self.tickets.create(title, desc, priority='1')
if tid:
db.set_ticket_id(event_id, tid)
else:
db.resolve_event('cluster_network_issue', CLUSTER_NAME, '')
def _ticket_interface(
self, event_id: int, is_new: bool, host: str, iface: str, consec: int
) -> None:
title = (
f'[{host}][auto][production][issue][network][single-node] '
f'Interface {iface} link-down'
)
desc = (
f'Network Interface Alert\n{"=" * 40}\n\n'
f'Host: {host}\n'
f'Interface: {iface}\n'
f'Detected: {_now_utc()}\n'
f'Consecutive check failures: {consec}\n\n'
f'Interface {iface} on {host} is reporting link-down state via '
f'Prometheus node_exporter.\n\n'
f'Note: {host} may still be reachable via its other network interface.\n'
f'Please inspect the cable/SFP/switch port for {host}/{iface}.'
)
tid = self.tickets.create(title, desc, priority='2')
if tid and is_new:
db.set_ticket_id(event_id, tid)
# ------------------------------------------------------------------
# UniFi device monitoring
# ------------------------------------------------------------------
def _process_unifi(self, devices: Optional[List[dict]]) -> None:
if devices is None:
logger.warning('UniFi API unreachable this cycle')
return
for d in devices:
name = d['name']
if not d['connected']:
sup = db.is_suppressed('unifi_device', name)
event_id, is_new, consec = db.upsert_event(
'unifi_device_offline', 'critical', 'unifi',
name, d.get('type', ''),
f'UniFi {name} ({d.get("ip","")}) offline ({_now_utc()})',
)
if not sup and consec >= self.fail_thresh:
self._ticket_unifi(event_id, is_new, d)
else:
db.resolve_event('unifi_device_offline', name, d.get('type', ''))
def _ticket_unifi(self, event_id: int, is_new: bool, device: dict) -> None:
name = device['name']
title = (
f'[{name}][auto][production][issue][network][single-node] '
f'UniFi device offline'
)
desc = (
f'UniFi Device Alert\n{"=" * 40}\n\n'
f'Device: {name}\n'
f'Type: {device.get("type","unknown")}\n'
f'Model: {device.get("model","")}\n'
f'Last Known IP: {device.get("ip","unknown")}\n'
f'Detected: {_now_utc()}\n\n'
f'The UniFi device {name} is offline per the UniFi controller.\n'
f'Please check power and cable connectivity.'
)
tid = self.tickets.create(title, desc, priority='2')
if tid and is_new:
db.set_ticket_id(event_id, tid)
# ------------------------------------------------------------------
# Ping-only hosts (no node_exporter)
# ------------------------------------------------------------------
def _process_ping_hosts(self) -> None:
for h in self.cfg.get('monitor', {}).get('ping_hosts', []):
name, ip = h['name'], h['ip']
reachable = ping(ip)
if not reachable:
sup = db.is_suppressed('host', name)
event_id, is_new, consec = db.upsert_event(
'host_unreachable', 'critical', 'ping',
name, ip,
f'Host {name} ({ip}) unreachable via ping ({_now_utc()})',
)
if not sup and consec >= self.fail_thresh:
self._ticket_unreachable(event_id, is_new, name, ip, consec)
else:
db.resolve_event('host_unreachable', name, ip)
def _ticket_unreachable(
self, event_id: int, is_new: bool, name: str, ip: str, consec: int
) -> None:
title = (
f'[{name}][auto][production][issue][network][single-node] '
f'Host unreachable'
)
desc = (
f'Host Reachability Alert\n{"=" * 40}\n\n'
f'Host: {name}\n'
f'IP: {ip}\n'
f'Detected: {_now_utc()}\n'
f'Consecutive check failures: {consec}\n\n'
f'Host {name} ({ip}) is not responding to ping from the Gandalf monitor.\n'
f'This host does not have a Prometheus node_exporter, so interface-level '
f'detail is unavailable.\n\n'
f'Please check the host power, management interface, and network connectivity.'
)
tid = self.tickets.create(title, desc, priority='2')
if tid and is_new:
db.set_ticket_id(event_id, tid)
# ------------------------------------------------------------------
# Snapshot collection (for dashboard)
# ------------------------------------------------------------------
def _collect_snapshot(self) -> dict:
iface_states = self.prom.get_interface_states()
unifi_devices = self.unifi.get_devices() or []
hosts = {}
for instance, ifaces in iface_states.items():
host = self._hostname(instance)
phys = {k: v for k, v in ifaces.items()}
up_count = sum(1 for v in phys.values() if v)
total = len(phys)
if total == 0 or up_count == total:
status = 'up'
elif up_count == 0:
status = 'down'
else:
status = 'degraded'
hosts[host] = {
'ip': instance.split(':')[0],
'interfaces': {k: ('up' if v else 'down') for k, v in phys.items()},
'status': status,
'source': 'prometheus',
}
for h in self.cfg.get('monitor', {}).get('ping_hosts', []):
name, ip = h['name'], h['ip']
reachable = ping(ip, count=1, timeout=2)
hosts[name] = {
'ip': ip,
'interfaces': {},
'status': 'up' if reachable else 'down',
'source': 'ping',
}
return {
'hosts': hosts,
'unifi': unifi_devices,
'updated': datetime.utcnow().isoformat(),
}
# ------------------------------------------------------------------
# Main loop
# ------------------------------------------------------------------
def run(self) -> None:
logger.info(
f'Gandalf monitor started poll_interval={self.poll_interval}s '
f'fail_thresh={self.fail_thresh}'
)
while True:
try:
logger.info('Starting network check cycle')
# 1. Collect and store snapshot for dashboard
snapshot = self._collect_snapshot()
db.set_state('network_snapshot', snapshot)
db.set_state('last_check', _now_utc())
# 2. Process alerts (separate Prometheus call for fresh data)
iface_states = self.prom.get_interface_states()
self._process_interfaces(iface_states)
unifi_devices = self.unifi.get_devices()
self._process_unifi(unifi_devices)
self._process_ping_hosts()
logger.info('Network check cycle complete')
except Exception as e:
logger.error(f'Monitor loop error: {e}', exc_info=True)
time.sleep(self.poll_interval)
if __name__ == '__main__':
monitor = NetworkMonitor()
monitor.run()