Files
gandalf/monitor.py

965 lines
39 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""Gandalf network monitor daemon.
Polls Prometheus (node_exporter) and the UniFi controller for network
interface and device state. Creates tickets in Tinker Tickets when issues
are detected, with deduplication and suppression support.
Run as a separate systemd service alongside the Flask web app.
"""
import json
import logging
import re
import shlex
import subprocess
import time
from datetime import datetime, timezone
from typing import Dict, List, Optional

import requests
from urllib3.exceptions import InsecureRequestWarning

import db
# Silence urllib3's InsecureRequestWarning for the whole process.
# NOTE(review): presumably needed because UnifiClient builds its session with
# verify=False — confirm no other client relies on the warning being visible.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# Configure root logging: INFO level, "<time> <level> <logger> <message>" format.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(name)s %(message)s',
)
# Module-level logger used by the clients and the monitor loop in this file.
logger = logging.getLogger('gandalf.monitor')
# --------------------------------------------------------------------------
# Interface filtering
# --------------------------------------------------------------------------
# Name prefixes of interfaces that are never monitored (loopback, virtual
# ethernet pairs, taps, firewall/bridge helpers, container bridges, ...).
_SKIP_PREFIXES = (
    'lo', 'veth', 'tap', 'fwbr', 'fwln', 'fwpr',
    'docker', 'dummy', 'br-', 'virbr', 'vmbr',
)
# Matches a trailing ".<digits>" sub-interface suffix such as "eth0.100".
_VLAN_SUFFIX = re.compile(r'\.\d+$')


def is_physical_interface(name: str) -> bool:
    """Tell whether *name* should be monitored.

    An interface is rejected when its name starts with one of
    ``_SKIP_PREFIXES`` or ends in a ``.<digits>`` suffix; every other name
    is accepted.
    """
    # str.startswith accepts a tuple, so one call covers every prefix.
    if name.startswith(_SKIP_PREFIXES):
        return False
    return _VLAN_SUFFIX.search(name) is None
# --------------------------------------------------------------------------
# Prometheus client
# --------------------------------------------------------------------------
class PrometheusClient:
    """Small read-only wrapper around the Prometheus instant-query endpoint."""

    def __init__(self, url: str):
        # Trailing slashes are removed so the API path can be appended as-is.
        self.url = url.rstrip('/')

    def query(self, promql: str) -> list:
        """Run *promql* against ``/api/v1/query`` and return the result list.

        Returns ``[]`` when the request raises (the error is logged) or when
        the response status is anything other than ``"success"``.
        """
        endpoint = f'{self.url}/api/v1/query'
        try:
            response = requests.get(
                endpoint,
                params={'query': promql},
                timeout=15,
            )
            response.raise_for_status()
            payload = response.json()
            if payload.get('status') == 'success':
                return payload['data']['result']
        except Exception as e:
            logger.error(f'Prometheus query failed ({promql!r}): {e}')
        return []

    def get_interface_states(self) -> Dict[str, Dict[str, bool]]:
        """Return {instance: {device: is_up}} for physical interfaces."""
        states: Dict[str, Dict[str, bool]] = {}
        for sample in self.query('node_network_up'):
            labels = sample['metric']
            instance = labels.get('instance', '')
            device = labels.get('device', '')
            if is_physical_interface(device):
                # The sample value is a [timestamp, "<value>"] pair; "1" means up.
                states.setdefault(instance, {})[device] = sample['value'][1] == '1'
        return states
# --------------------------------------------------------------------------
# UniFi client
# --------------------------------------------------------------------------
class UnifiClient:
    """HTTP client for the UniFi controller, authenticated with an API key.

    The session is created with ``verify = False``, i.e. TLS certificates are
    not validated.
    """

    def __init__(self, cfg: dict):
        """Build the client from a config mapping.

        Reads ``controller`` and ``api_key`` (both required; a missing key
        raises ``KeyError``) and ``site_id`` (defaults to ``'default'``).
        """
        # Normalise like the other clients in this file so that a configured
        # "https://host/" does not produce "https://host//proxy/..." URLs.
        self.base_url = cfg['controller'].rstrip('/')
        self.site_id = cfg.get('site_id', 'default')
        self.session = requests.Session()
        self.session.verify = False
        self.headers = {
            'X-API-KEY': cfg['api_key'],
            'Accept': 'application/json',
        }

    @staticmethod
    def _to_float(value) -> Optional[float]:
        """Convert *value* to float, returning None for None or unparsable input."""
        if value is None:
            return None
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    def get_devices(self) -> Optional[List[dict]]:
        """Return list of UniFi devices, or None if the controller is unreachable.

        Each entry has ``name``, ``mac``, ``ip``, ``type``, ``model``,
        ``state`` and ``connected`` (True when ``state == 1``).
        """
        try:
            url = f'{self.base_url}/proxy/network/v2/api/site/{self.site_id}/device'
            resp = self.session.get(url, headers=self.headers, timeout=15)
            resp.raise_for_status()
            data = resp.json()
            devices = []
            for d in data.get('network_devices', []):
                # A device without a "state" field is treated as connected.
                state = d.get('state', 1)
                devices.append({
                    'name': d.get('name') or d.get('mac', 'unknown'),
                    'mac': d.get('mac', ''),
                    'ip': d.get('ip', ''),
                    'type': d.get('type', 'unknown'),
                    'model': d.get('model', ''),
                    'state': state,
                    'connected': state == 1,
                })
            return devices
        except Exception as e:
            logger.error(f'UniFi API error: {e}')
            return None

    def get_switch_ports(self) -> Optional[Dict[str, dict]]:
        """Return per-port stats for all UniFi switches, keyed by switch name.

        Uses the v1 stat API which includes full port_table data.
        Returns {switch_name: {'ip': str, 'model': str, 'ports': {port_name: {...}}}},
        or None when the request or response handling raises.
        Only devices whose ``type`` is ``usw`` (case-insensitive) with at
        least one port are included.
        """
        try:
            url = f'{self.base_url}/proxy/network/api/s/{self.site_id}/stat/device'
            resp = self.session.get(url, headers=self.headers, timeout=15)
            resp.raise_for_status()
            devices = resp.json().get('data', [])
            result: Dict[str, dict] = {}
            for dev in devices:
                if dev.get('type', '').lower() != 'usw':
                    continue
                sw_name = dev.get('name') or dev.get('mac', 'unknown')
                sw_ip = dev.get('ip', '')
                sw_model = dev.get('model', '')
                ports: Dict[str, dict] = {}

                # Build LLDP neighbor map (keyed by port_idx)
                lldp_map: Dict[int, dict] = {}
                for entry in dev.get('lldp_table', []):
                    pidx = entry.get('lldp_port_idx')
                    if pidx is None:
                        continue
                    try:
                        lldp_key = int(pidx)
                    except (TypeError, ValueError):
                        # Skip one malformed neighbor entry instead of
                        # discarding the stats of every switch.
                        continue
                    lldp_map[lldp_key] = {
                        'chassis_id': entry.get('chassis_id', ''),
                        'system_name': entry.get('system_name', ''),
                        'port_id': entry.get('port_id', ''),
                        'port_desc': entry.get('port_desc', ''),
                        'mgmt_ips': entry.get('management_ips', []),
                    }

                for port in dev.get('port_table', []):
                    idx = port.get('port_idx', 0)
                    pname = port.get('name') or f'Port {idx}'
                    ports[pname] = {
                        'port_idx': idx,
                        'switch_ip': sw_ip,
                        'up': port.get('up', False),
                        'speed_mbps': port.get('speed', 0),
                        'full_duplex': port.get('full_duplex', False),
                        'autoneg': port.get('autoneg', False),
                        'is_uplink': port.get('is_uplink', False),
                        'media': port.get('media', ''),
                        # PoE values are converted leniently: an unparsable
                        # value becomes None rather than aborting collection.
                        'poe_power': self._to_float(port.get('poe_power')),
                        'poe_class': port.get('poe_class'),
                        'poe_max_power': self._to_float(port.get('poe_max_power')),
                        'poe_mode': port.get('poe_mode', ''),
                        'lldp': lldp_map.get(idx),
                        'tx_bytes': port.get('tx_bytes', 0),
                        'rx_bytes': port.get('rx_bytes', 0),
                        'tx_errors': port.get('tx_errors', 0),
                        'rx_errors': port.get('rx_errors', 0),
                        'tx_dropped': port.get('tx_dropped', 0),
                        'rx_dropped': port.get('rx_dropped', 0),
                    }
                if ports:
                    result[sw_name] = {'ip': sw_ip, 'model': sw_model, 'ports': ports}
            return result
        except Exception as e:
            logger.error(f'UniFi switch port stats error: {e}')
            return None
# --------------------------------------------------------------------------
# Ticket client
# --------------------------------------------------------------------------
class TicketClient:
    """Creates tickets by POSTing JSON to the configured ticket API URL."""

    def __init__(self, cfg: dict):
        # Both settings default to '' which create() treats as "not configured".
        self.url = cfg.get('url', '')
        self.api_key = cfg.get('api_key', '')

    def create(self, title: str, description: str, priority: str = '2') -> Optional[str]:
        """Create a ticket and return its id.

        Returns the new ``ticket_id`` when the API reports success, the
        ``existing_ticket_id`` when the API answers with one instead, and
        ``None`` when the client is unconfigured, the response has neither,
        or the request raises (the error is logged).
        """
        if not (self.api_key and self.url):
            logger.warning('Ticket API not configured skipping ticket creation')
            return None

        payload = {
            'title': title,
            'description': description,
            'status': 'Open',
            'priority': priority,
            'category': 'Network',
            'type': 'Issue',
        }
        auth_headers = {'Authorization': f'Bearer {self.api_key}'}
        try:
            response = requests.post(
                self.url,
                json=payload,
                headers=auth_headers,
                timeout=15,
            )
            response.raise_for_status()
            body = response.json()
            if body.get('success'):
                ticket_id = body['ticket_id']
                logger.info(f'Created ticket #{ticket_id}: {title}')
                return ticket_id
            existing_id = body.get('existing_ticket_id')
            if existing_id:
                logger.info(f'Duplicate suppressed by API existing #{existing_id}')
                return existing_id
            logger.warning(f'Unexpected ticket API response: {body}')
        except Exception as e:
            logger.error(f'Ticket creation failed: {e}')
        return None
# --------------------------------------------------------------------------
# Pulse HTTP client (delegates SSH commands to Pulse worker)
# --------------------------------------------------------------------------
class PulseClient:
    """Submit a command to a Pulse worker via the internal M2M API and poll for result."""

    def __init__(self, cfg: dict):
        """Read the ``pulse`` section of *cfg*.

        Uses ``url``, ``api_key`` and ``worker_id`` (each defaulting to ``''``)
        and ``timeout`` (poll budget in seconds, default 45).
        """
        p = cfg.get('pulse', {})
        self.url = p.get('url', '').rstrip('/')
        self.api_key = p.get('api_key', '')
        self.worker_id = p.get('worker_id', '')
        self.timeout = p.get('timeout', 45)
        # Execution id of the most recent submit; reset on every run_command().
        self.last_execution_id: Optional[str] = None
        self.session = requests.Session()
        self.session.headers.update({
            'X-Gandalf-API-Key': self.api_key,
            'Content-Type': 'application/json',
        })

    def run_command(self, command: str, _retry: bool = True) -> Optional[str]:
        """Submit *command* to Pulse, poll until done, return stdout or None.

        Retries once automatically on transient submit failures or timeouts
        (and on ``failed``/``timed_out`` executions, but not ``cancelled``).
        Returns ``None`` immediately when url, api_key or worker_id is unset.
        A completed execution without a ``command_result`` log entry yields ``''``.
        """
        self.last_execution_id = None
        if not self.url or not self.api_key or not self.worker_id:
            return None

        # Submit
        try:
            resp = self.session.post(
                f'{self.url}/api/internal/command',
                json={'worker_id': self.worker_id, 'command': command},
                timeout=10,
            )
            resp.raise_for_status()
            execution_id = resp.json()['execution_id']
            self.last_execution_id = execution_id
        except Exception as e:
            logger.error(f'Pulse command submit failed: {e}')
            if _retry:
                logger.info('Retrying Pulse command submit in 5s...')
                time.sleep(5)
                return self.run_command(command, _retry=False)
            return None

        # Poll. The deadline uses the monotonic clock so that wall-clock
        # adjustments (NTP step, manual change) cannot shorten or extend it.
        deadline = time.monotonic() + self.timeout
        while time.monotonic() < deadline:
            time.sleep(2)
            try:
                r = self.session.get(
                    f'{self.url}/api/internal/executions/{execution_id}',
                    timeout=10,
                )
                r.raise_for_status()
                data = r.json()
                status = data.get('status')
                if status == 'completed':
                    for entry in data.get('logs', []):
                        if entry.get('action') == 'command_result':
                            return entry.get('stdout', '')
                    return ''
                if status in ('failed', 'timed_out', 'cancelled'):
                    logger.error(
                        f'Pulse execution {execution_id} ended with status={status!r}; '
                        f'view at {self.url}/executions/{execution_id}'
                    )
                    if _retry and status != 'cancelled':
                        logger.info('Retrying failed Pulse command in 5s...')
                        time.sleep(5)
                        return self.run_command(command, _retry=False)
                    return None
            except Exception as e:
                # A failed poll is logged and polling continues until the deadline.
                logger.error(f'Pulse poll failed for {execution_id}: {e}')

        logger.warning(
            f'Pulse command timed out after {self.timeout}s '
            f'(execution_id={execution_id}); '
            f'view at {self.url}/executions/{execution_id}'
        )
        if _retry:
            logger.info('Retrying timed-out Pulse command in 5s...')
            time.sleep(5)
            return self.run_command(command, _retry=False)
        return None
# --------------------------------------------------------------------------
# Link stats collector (ethtool + Prometheus traffic metrics)
# --------------------------------------------------------------------------
class LinkStatsCollector:
    """Collects detailed per-interface statistics via SSH (ethtool) and Prometheus,
    plus per-port stats from UniFi switches."""

    # (output rate field, cumulative counter field) pairs for UniFi ports.
    _RATE_FIELDS = (
        ('tx_bytes_rate', 'tx_bytes'),
        ('rx_bytes_rate', 'rx_bytes'),
        ('tx_errs_rate', 'tx_errors'),
        ('rx_errs_rate', 'rx_errors'),
        ('tx_drops_rate', 'tx_dropped'),
        ('rx_drops_rate', 'rx_dropped'),
    )

    def __init__(self, cfg: dict, prom: 'PrometheusClient',
                 unifi: Optional['UnifiClient'] = None):
        self.prom = prom
        self.pulse = PulseClient(cfg)
        self.unifi = unifi
        # State for UniFi rate calculation (previous snapshot + timestamp)
        self._prev_unifi: Dict[str, dict] = {}
        self._prev_unifi_time: float = 0.0

    # ------------------------------------------------------------------
    # SSH collection (via Pulse worker)
    # ------------------------------------------------------------------
    def _ssh_batch(self, ip: str, ifaces: List[str]) -> Dict[str, dict]:
        """
        Delegate one SSH session to the Pulse worker to collect ethtool + SFP DOM
        data for all *ifaces*. Returns {iface: {speed_mbps, duplex, ..., sfp: {...}}}.

        Returns ``{}`` when there is nothing to collect, Pulse is not
        configured, no interface name passes validation, or Pulse returns None.
        """
        if not ifaces or not self.pulse.url:
            return {}

        # Only names made of [a-zA-Z0-9_.@-] are passed to the remote shell.
        safe_ifaces = [i for i in ifaces if re.fullmatch(r'[a-zA-Z0-9_.@-]+', i)]
        if not safe_ifaces:
            return {}

        # Build a single shell command: for each iface output ethtool + -m with sentinels
        parts = []
        for iface in safe_ifaces:
            q = shlex.quote(iface)
            parts.append(
                f'echo "___IFACE:{iface}___";'
                f' ethtool {q} 2>/dev/null;'
                f' echo "___DOM:{iface}___";'
                f' ethtool -m {q} 2>/dev/null;'
                f' echo "___END___"'
            )
        # Fragments must be separated by ';'. Joined with a bare space, the
        # trailing `echo "___END___"` of one interface swallowed the next
        # interface's `echo "___IFACE:..."` as extra arguments, so the parser
        # never saw the sentinels of any interface after the first.
        shell_cmd = '; '.join(parts)
        # Quote target and remote command as single shell words.
        # NOTE(review): assumes the Pulse worker runs the string through a
        # POSIX shell (as the previous double-quoted form did) — confirm.
        target = shlex.quote(f'root@{ip}')
        remote = shlex.quote(shell_cmd)
        ssh_cmd = (
            'ssh -o StrictHostKeyChecking=no -o ConnectTimeout=5 '
            '-o BatchMode=yes -o LogLevel=ERROR '
            '-o ServerAliveInterval=10 -o ServerAliveCountMax=2 '
            f'{target} {remote}'
        )
        output = self.pulse.run_command(ssh_cmd)
        if output is None:
            logger.error(f'Pulse ethtool collection returned None for {ip}')
            return {}
        return self._parse_ssh_output(output)

    @staticmethod
    def _parse_ssh_output(output: str) -> Dict[str, dict]:
        """Split sentinel-delimited output into per-interface parsed sections.

        ``___IFACE:<name>___`` starts an ethtool section, ``___DOM:<name>___``
        starts an ``ethtool -m`` section and ``___END___`` closes the
        interface. Empty sections are ignored; SFP data is stored under the
        ``'sfp'`` key only when something was parsed.
        """
        result: Dict[str, dict] = {}
        current_iface: Optional[str] = None
        current_section: Optional[str] = None
        buf: List[str] = []

        def flush(iface, section, lines):
            # Parse the buffered lines of the section that just ended.
            if not iface or not lines:
                return
            text = '\n'.join(lines)
            if section == 'ethtool':
                result.setdefault(iface, {}).update(
                    LinkStatsCollector._parse_ethtool(text)
                )
            elif section == 'dom':
                sfp = LinkStatsCollector._parse_ethtool_m(text)
                if sfp:
                    result.setdefault(iface, {})['sfp'] = sfp

        for line in output.splitlines():
            if line.startswith('___IFACE:') and line.endswith('___'):
                flush(current_iface, current_section, buf)
                current_iface = line[9:-3]
                current_section = 'ethtool'
                buf = []
            elif line.startswith('___DOM:') and line.endswith('___'):
                flush(current_iface, current_section, buf)
                current_iface = line[7:-3]
                current_section = 'dom'
                buf = []
            elif line == '___END___':
                flush(current_iface, current_section, buf)
                current_iface = None
                current_section = None
                buf = []
            else:
                buf.append(line)
        # Handle output that ends without a closing sentinel.
        flush(current_iface, current_section, buf)
        return result

    @staticmethod
    def _parse_ethtool(output: str) -> dict:
        """Extract speed, duplex, port type, autoneg and link state from ethtool text.

        Only keys that are present and parsable appear in the result.
        """
        data: dict = {}
        for line in output.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            if key == 'Speed':
                # Values that are not "<digits>Mb/s" are skipped.
                m = re.match(r'(\d+)Mb/s', val)
                if m:
                    data['speed_mbps'] = int(m.group(1))
            elif key == 'Duplex':
                data['duplex'] = val.lower()
            elif key == 'Port':
                data['port_type'] = val
            elif key == 'Auto-negotiation':
                data['auto_neg'] = (val.lower() == 'on')
            elif key == 'Link detected':
                data['link_detected'] = (val.lower() == 'yes')
        return data

    @staticmethod
    def _parse_ethtool_m(output: str) -> dict:
        """Parse ethtool -m (SFP DOM / digital optical monitoring) output.

        Returns ``{}`` for empty output or when the text says diagnostics are
        unavailable ("cannot get", "not supported", "no sfp").
        """
        if not output:
            return {}
        # Skip if module diagnostics unsupported
        lower = output.lower()
        if 'cannot get' in lower or 'not supported' in lower or 'no sfp' in lower:
            return {}
        data: dict = {}
        for line in output.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            if key == 'Vendor name':
                data['vendor'] = val
            elif key == 'Vendor PN':
                data['part_no'] = val
            elif key == 'Identifier':
                # The human-readable type is the parenthesised part.
                m = re.search(r'\((.+?)\)', val)
                if m:
                    data['sfp_type'] = m.group(1)
            elif key == 'Connector':
                m = re.search(r'\((.+?)\)', val)
                if m:
                    data['connector'] = m.group(1)
            elif key == 'Laser wavelength':
                m = re.match(r'(\d+)', val)
                if m:
                    data['wavelength_nm'] = int(m.group(1))
            elif key == 'Laser bias current':
                # e.g. "4.340 mA"
                m = re.match(r'([\d.]+)\s+mA', val)
                if m:
                    data['bias_ma'] = float(m.group(1))
            elif key == 'Laser output power':
                # e.g. "0.1234 mW / -9.09 dBm"
                m = re.search(r'/\s*([-\d.]+)\s*dBm', val)
                if m:
                    try:
                        data['tx_power_dbm'] = float(m.group(1))
                    except ValueError:
                        pass
            elif 'receiver' in key.lower() and ('power' in key.lower() or 'optical' in key.lower()):
                m = re.search(r'/\s*([-\d.]+)\s*dBm', val)
                if m:
                    try:
                        data['rx_power_dbm'] = float(m.group(1))
                    except ValueError:
                        pass
            elif key == 'Module temperature':
                # e.g. "36.00 degrees C / 96.80 degrees F"; a leading minus
                # sign is accepted so sub-zero readings are not dropped.
                m = re.match(r'(-?[\d.]+)\s+degrees', val)
                if m:
                    data['temp_c'] = float(m.group(1))
            elif key == 'Module voltage':
                # e.g. "3.3180 V"
                m = re.match(r'([\d.]+)\s+V', val)
                if m:
                    data['voltage_v'] = float(m.group(1))
        return data

    # ------------------------------------------------------------------
    # Prometheus traffic / error metrics
    # ------------------------------------------------------------------
    def _collect_prom_metrics(self) -> Dict[str, Dict[str, dict]]:
        """Return {instance: {device: {tx_bytes_rate, rx_bytes_rate, ...}}}.

        Non-numeric or NaN samples are stored as ``None``.
        """
        metrics: Dict[str, Dict[str, dict]] = {}
        queries = [
            ('tx_bytes_rate', 'rate(node_network_transmit_bytes_total[5m])'),
            ('rx_bytes_rate', 'rate(node_network_receive_bytes_total[5m])'),
            ('tx_errs_rate', 'rate(node_network_transmit_errs_total[5m])'),
            ('rx_errs_rate', 'rate(node_network_receive_errs_total[5m])'),
            ('tx_drops_rate', 'rate(node_network_transmit_drop_total[5m])'),
            ('rx_drops_rate', 'rate(node_network_receive_drop_total[5m])'),
            ('carrier_changes', 'node_network_carrier_changes_total'),
        ]
        for field, promql in queries:
            for r in self.prom.query(promql):
                instance = r['metric'].get('instance', '')
                device = r['metric'].get('device', '')
                if not is_physical_interface(device):
                    continue
                raw = r['value'][1]
                try:
                    val: Optional[float] = float(raw)
                    if val != val:  # NaN
                        val = None
                except (ValueError, TypeError):
                    val = None
                metrics.setdefault(instance, {}).setdefault(device, {})[field] = val
        return metrics

    # ------------------------------------------------------------------
    # Main collection entry point
    # ------------------------------------------------------------------
    def collect(self, instance_map: Dict[str, str]) -> dict:
        """
        Collect full link stats for all Prometheus-monitored hosts.

        *instance_map*: ``{'10.10.10.2:9100': 'large1', ...}``
        Returns a dict suitable for ``db.set_state('link_stats', ...)`` with
        the keys ``hosts``, ``unifi_switches`` and ``updated``.
        """
        prom_metrics = self._collect_prom_metrics()
        result_hosts: Dict[str, Dict[str, dict]] = {}
        for instance, iface_metrics in prom_metrics.items():
            host_ip = instance.split(':')[0]
            host = instance_map.get(instance, host_ip)
            ifaces = list(iface_metrics)

            # SSH ethtool collection via Pulse worker — only for explicitly configured
            # hosts (instance_map keys). Hosts like postgresql/matrix may report
            # node_exporter metrics to Prometheus but don't need link diagnostics.
            ethtool_data: Dict[str, dict] = {}
            if self.pulse.url and ifaces and instance in instance_map:
                try:
                    ethtool_data = self._ssh_batch(host_ip, ifaces)
                except Exception as e:
                    logger.warning(f'ethtool collection failed for {host} ({host_ip}): {e}')

            # Merge Prometheus metrics + ethtool data per interface
            merged: Dict[str, dict] = {}
            for iface in ifaces:
                d: dict = {'host_ip': host_ip}
                d.update(iface_metrics.get(iface, {}))
                d.update(ethtool_data.get(iface, {}))
                merged[iface] = d
            result_hosts[host] = merged

        # Collect UniFi switch port stats
        unifi_switches: dict = {}
        if self.unifi:
            try:
                raw = self.unifi.get_switch_ports()
                if raw is not None:
                    now = time.time()
                    unifi_switches = self._compute_unifi_rates(raw, now)
                    # Remember this snapshot as the baseline for the next cycle.
                    self._prev_unifi = raw
                    self._prev_unifi_time = now
            except Exception as e:
                logger.warning(f'UniFi switch port collection failed: {e}')

        return {
            'hosts': result_hosts,
            'unifi_switches': unifi_switches,
            # datetime.utcnow() is deprecated; the formatted text is unchanged.
            'updated': datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC'),
        }

    def _compute_unifi_rates(self, raw: Dict[str, dict], now: float) -> Dict[str, dict]:
        """Compute per-port byte/error rates from delta against previous snapshot.

        A rate is ``None`` when there is no previous snapshot time, or when
        the counter is missing on either side (e.g. a port or switch seen for
        the first time). Negative deltas are clamped to ``0.0``.
        """
        dt = now - self._prev_unifi_time if self._prev_unifi_time > 0 else 0

        def rate(new_val, old_val) -> Optional[float]:
            # Without a previous value the delta would be the counter's whole
            # lifetime total, which would show up as a huge bogus spike.
            if dt <= 0 or new_val is None or old_val is None:
                return None
            return max(0.0, (new_val - old_val) / dt)

        result: Dict[str, dict] = {}
        for sw_name, sw_data in raw.items():
            prev_ports = self._prev_unifi.get(sw_name, {}).get('ports', {})
            merged_ports: Dict[str, dict] = {}
            for pname, d in sw_data['ports'].items():
                entry = dict(d)
                prev = prev_ports.get(pname, {})
                for rate_key, counter_key in self._RATE_FIELDS:
                    entry[rate_key] = rate(d[counter_key], prev.get(counter_key))
                merged_ports[pname] = entry
            result[sw_name] = {'ip': sw_data['ip'], 'model': sw_data['model'],
                               'ports': merged_ports}
        return result
# --------------------------------------------------------------------------
# Helpers
# --------------------------------------------------------------------------
def ping(ip: str, count: int = 3, timeout: int = 2) -> bool:
    """Report whether the system ``ping`` command exits with status 0 for *ip*.

    Runs ``ping -c <count> -W <timeout> <ip>`` with output discarded and an
    overall 30 second limit on the subprocess. Any exception (including the
    subprocess timeout) is treated as unreachable and returns ``False``.
    """
    argv = ['ping', '-c', str(count), '-W', str(timeout), ip]
    try:
        completed = subprocess.run(
            argv,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=30,
        )
    except Exception:
        return False
    return completed.returncode == 0
def _now_utc() -> str:
return datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')
# --------------------------------------------------------------------------
# Monitor
# --------------------------------------------------------------------------
# Default event target / ticket tag for cluster-wide alerts; can be overridden
# with monitor.cluster_name in config.json.
CLUSTER_NAME = 'proxmox-cluster'


class NetworkMonitor:
    """Polling daemon: collects network state, records events and files tickets.

    Each cycle stores a dashboard snapshot and link statistics via ``db``,
    then evaluates interface, UniFi device and ping-host state.
    """

    def __init__(self):
        """Load ``config.json`` from the working directory and build the clients.

        Requires ``prometheus.url`` and a ``unifi`` section; ``ticket_api``,
        ``monitor`` and ``hosts`` are optional.
        """
        with open('config.json', encoding='utf-8') as f:
            self.cfg = json.load(f)
        prom_url = self.cfg['prometheus']['url']
        self.prom = PrometheusClient(prom_url)
        self.unifi = UnifiClient(self.cfg['unifi'])
        self.tickets = TicketClient(self.cfg.get('ticket_api', {}))
        self.link_stats = LinkStatsCollector(self.cfg, self.prom, self.unifi)
        mon = self.cfg.get('monitor', {})
        self.poll_interval = mon.get('poll_interval', 120)
        self.fail_thresh = mon.get('failure_threshold', 2)
        self.cluster_thresh = mon.get('cluster_threshold', 3)
        self.cluster_name = mon.get('cluster_name', CLUSTER_NAME)
        # Build Prometheus instance → hostname lookup
        self._instance_map: Dict[str, str] = {
            h['prometheus_instance']: h['name']
            for h in self.cfg.get('hosts', [])
            if 'prometheus_instance' in h
        }

    def _hostname(self, instance: str) -> str:
        """Map a Prometheus instance label to its configured name.

        Falls back to the part of *instance* before the first ``:``.
        """
        return self._instance_map.get(instance, instance.split(':')[0])

    def _file_ticket(self, event_id: int, title: str, desc: str, priority: str) -> None:
        """Create a ticket and record its id on the event when one is returned."""
        tid = self.tickets.create(title, desc, priority=priority)
        if tid:
            db.set_ticket_id(event_id, tid)

    # ------------------------------------------------------------------
    # Interface monitoring (Prometheus)
    # ------------------------------------------------------------------
    def _process_interfaces(self, states: Dict[str, Dict[str, bool]], suppressions: list) -> None:
        """Compare interface states with the stored baseline and raise events.

        Baseline values are ``'up'`` or ``'initial_down'``. Only an interface
        whose baseline is ``'up'`` and which is now down counts as a
        regression; a ticket is filed once the consecutive-failure count
        reaches ``fail_thresh`` and no suppression matches. When at least
        ``cluster_thresh`` hosts regress in the same cycle, a cluster-wide
        event is recorded as well; otherwise that event is resolved.
        """
        baseline = db.get_baseline()
        new_baseline = {k: dict(v) for k, v in baseline.items()}
        # Only count hosts with genuine regressions (UP→DOWN) toward cluster threshold
        hosts_with_regression: List[str] = []

        for instance, ifaces in states.items():
            host = self._hostname(instance)
            new_baseline.setdefault(host, {})
            host_has_regression = False
            for iface, is_up in ifaces.items():
                prev = baseline.get(host, {}).get(iface)  # 'up', 'initial_down', or None
                if is_up:
                    new_baseline[host][iface] = 'up'
                    db.resolve_event('interface_down', host, iface)
                elif prev is None:
                    # First observation is down: could be an unused port, don't alert
                    new_baseline[host][iface] = 'initial_down'
                elif prev == 'initial_down':
                    # Persistently down since first observation: no alert
                    pass
                else:  # prev == 'up'
                    # Regression: was UP, now DOWN. The baseline entry stays
                    # 'up' so the regression keeps being detected next cycle.
                    host_has_regression = True
                    sup = (
                        db.check_suppressed(suppressions, 'interface', host, iface) or
                        db.check_suppressed(suppressions, 'host', host)
                    )
                    event_id, is_new, consec = db.upsert_event(
                        'interface_down', 'critical', 'prometheus',
                        host, iface,
                        f'Interface {iface} on {host} went link-down ({_now_utc()})',
                    )
                    if not sup and consec >= self.fail_thresh:
                        self._ticket_interface(event_id, is_new, host, iface, consec)
            if host_has_regression:
                hosts_with_regression.append(host)

        db.set_baseline(new_baseline)

        # Cluster-wide check: only genuine regressions count
        if len(hosts_with_regression) >= self.cluster_thresh:
            sup = db.check_suppressed(suppressions, 'all', '')
            event_id, is_new, consec = db.upsert_event(
                'cluster_network_issue', 'critical', 'prometheus',
                self.cluster_name, '',
                f'{len(hosts_with_regression)} hosts reporting simultaneous interface failures: '
                f'{", ".join(hosts_with_regression)}',
            )
            # The cluster ticket has no failure threshold, so it is filed
            # only on the cycle where upsert_event reports the event as new.
            if not sup and is_new:
                title = (
                    f'[{self.cluster_name}][auto][production][issue][network][cluster-wide] '
                    f'Multiple hosts reporting interface failures'
                )
                desc = (
                    f'Cluster Network Alert\n{"=" * 40}\n\n'
                    f'Affected hosts: {", ".join(hosts_with_regression)}\n'
                    f'Detected: {_now_utc()}\n\n'
                    f'{len(hosts_with_regression)} Proxmox hosts simultaneously reported '
                    f'interface regressions (link-down on interfaces previously known UP).\n'
                    f'This likely indicates a switch or upstream network failure.\n\n'
                    f'Please check the core and management switches immediately.'
                )
                self._file_ticket(event_id, title, desc, '1')
        else:
            db.resolve_event('cluster_network_issue', self.cluster_name, '')

    def _ticket_interface(
        self, event_id: int, is_new: bool, host: str, iface: str, consec: int
    ) -> None:
        """File a priority-2 ticket for a link-down interface.

        *is_new* is kept for interface compatibility but no longer gates
        storing the ticket id: tickets are only filed once ``consec`` reaches
        ``fail_thresh``, and with a threshold above 1 the event is no longer
        new by then, so the id was never recorded.
        NOTE(review): assumes ``is_new`` is true only on the cycle that first
        creates the event and that db.set_ticket_id is safe to repeat — confirm.
        """
        title = (
            f'[{host}][auto][production][issue][network][single-node] '
            f'Interface {iface} link-down'
        )
        desc = (
            f'Network Interface Alert\n{"=" * 40}\n\n'
            f'Host: {host}\n'
            f'Interface: {iface}\n'
            f'Detected: {_now_utc()}\n'
            f'Consecutive check failures: {consec}\n\n'
            f'Interface {iface} on {host} is reporting link-down state via '
            f'Prometheus node_exporter.\n\n'
            f'Note: {host} may still be reachable via its other network interface.\n'
            f'Please inspect the cable/SFP/switch port for {host}/{iface}.'
        )
        self._file_ticket(event_id, title, desc, '2')

    # ------------------------------------------------------------------
    # UniFi device monitoring
    # ------------------------------------------------------------------
    def _process_unifi(self, devices: Optional[List[dict]], suppressions: list) -> None:
        """Record offline UniFi devices and resolve events for connected ones.

        ``None`` (controller unreachable) is logged and otherwise ignored so
        that no event is raised or resolved on missing data.
        """
        if devices is None:
            logger.warning('UniFi API unreachable this cycle')
            return
        for d in devices:
            name = d['name']
            if not d['connected']:
                sup = db.check_suppressed(suppressions, 'unifi_device', name)
                event_id, is_new, consec = db.upsert_event(
                    'unifi_device_offline', 'critical', 'unifi',
                    name, d.get('type', ''),
                    f'UniFi {name} ({d.get("ip","")}) offline ({_now_utc()})',
                )
                if not sup and consec >= self.fail_thresh:
                    self._ticket_unifi(event_id, is_new, d)
            else:
                db.resolve_event('unifi_device_offline', name, d.get('type', ''))

    def _ticket_unifi(self, event_id: int, is_new: bool, device: dict) -> None:
        """File a priority-2 ticket for an offline UniFi device.

        *is_new* is accepted for compatibility; the ticket id is stored
        whenever the ticket API returns one (see ``_ticket_interface``).
        """
        name = device['name']
        title = (
            f'[{name}][auto][production][issue][network][single-node] '
            f'UniFi device offline'
        )
        desc = (
            f'UniFi Device Alert\n{"=" * 40}\n\n'
            f'Device: {name}\n'
            f'Type: {device.get("type","unknown")}\n'
            f'Model: {device.get("model","")}\n'
            f'Last Known IP: {device.get("ip","unknown")}\n'
            f'Detected: {_now_utc()}\n\n'
            f'The UniFi device {name} is offline per the UniFi controller.\n'
            f'Please check power and cable connectivity.'
        )
        self._file_ticket(event_id, title, desc, '2')

    # ------------------------------------------------------------------
    # Ping-only hosts (no node_exporter)
    # ------------------------------------------------------------------
    def _process_ping_hosts(self, suppressions: list) -> None:
        """Ping every ``monitor.ping_hosts`` entry and raise/resolve events."""
        for h in self.cfg.get('monitor', {}).get('ping_hosts', []):
            name, ip = h['name'], h['ip']
            reachable = ping(ip)
            if not reachable:
                sup = db.check_suppressed(suppressions, 'host', name)
                event_id, is_new, consec = db.upsert_event(
                    'host_unreachable', 'critical', 'ping',
                    name, ip,
                    f'Host {name} ({ip}) unreachable via ping ({_now_utc()})',
                )
                if not sup and consec >= self.fail_thresh:
                    self._ticket_unreachable(event_id, is_new, name, ip, consec)
            else:
                db.resolve_event('host_unreachable', name, ip)

    def _ticket_unreachable(
        self, event_id: int, is_new: bool, name: str, ip: str, consec: int
    ) -> None:
        """File a priority-2 ticket for a host that does not answer ping.

        *is_new* is accepted for compatibility; the ticket id is stored
        whenever the ticket API returns one (see ``_ticket_interface``).
        """
        title = (
            f'[{name}][auto][production][issue][network][single-node] '
            f'Host unreachable'
        )
        desc = (
            f'Host Reachability Alert\n{"=" * 40}\n\n'
            f'Host: {name}\n'
            f'IP: {ip}\n'
            f'Detected: {_now_utc()}\n'
            f'Consecutive check failures: {consec}\n\n'
            f'Host {name} ({ip}) is not responding to ping from the Gandalf monitor.\n'
            f'This host does not have a Prometheus node_exporter, so interface-level '
            f'detail is unavailable.\n\n'
            f'Please check the host power, management interface, and network connectivity.'
        )
        self._file_ticket(event_id, title, desc, '2')

    # ------------------------------------------------------------------
    # Snapshot collection (for dashboard)
    # ------------------------------------------------------------------
    def _collect_snapshot(self) -> dict:
        """Build the dashboard snapshot: hosts, UniFi devices and a timestamp.

        A Prometheus host is ``'up'`` when all (or none at all) of its
        interfaces are reported and up, ``'down'`` when none is up, and
        ``'degraded'`` otherwise. Ping hosts are probed with a single packet.
        """
        iface_states = self.prom.get_interface_states()
        # An unreachable controller (None) is shown as an empty device list.
        unifi_devices = self.unifi.get_devices() or []
        hosts = {}
        for instance, ifaces in iface_states.items():
            host = self._hostname(instance)
            up_count = sum(1 for v in ifaces.values() if v)
            total = len(ifaces)
            if total == 0 or up_count == total:
                status = 'up'
            elif up_count == 0:
                status = 'down'
            else:
                status = 'degraded'
            hosts[host] = {
                'ip': instance.split(':')[0],
                'interfaces': {k: ('up' if v else 'down') for k, v in ifaces.items()},
                'status': status,
                'source': 'prometheus',
            }
        for h in self.cfg.get('monitor', {}).get('ping_hosts', []):
            name, ip = h['name'], h['ip']
            reachable = ping(ip, count=1, timeout=2)
            hosts[name] = {
                'ip': ip,
                'interfaces': {},
                'status': 'up' if reachable else 'down',
                'source': 'ping',
            }
        return {
            'hosts': hosts,
            'unifi': unifi_devices,
            # datetime.utcnow() is deprecated; dropping tzinfo keeps the
            # stored ISO string free of a "+00:00" suffix, as before.
            'updated': datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
        }

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------
    def run(self) -> None:
        """Run check cycles forever, sleeping ``poll_interval`` seconds between them.

        Any exception in a cycle is logged with traceback and the loop continues.
        """
        logger.info(
            f'Gandalf monitor started poll_interval={self.poll_interval}s '
            f'fail_thresh={self.fail_thresh}'
        )
        while True:
            try:
                logger.info('Starting network check cycle')
                # 1. Collect and store snapshot for dashboard
                snapshot = self._collect_snapshot()
                db.set_state('network_snapshot', snapshot)
                db.set_state('last_check', _now_utc())

                # 2. Collect link stats (ethtool + traffic metrics)
                try:
                    link_data = self.link_stats.collect(self._instance_map)
                    db.set_state('link_stats', link_data)
                except Exception as e:
                    logger.error(f'Link stats collection failed: {e}', exc_info=True)

                # 3. Process alerts (separate Prometheus call for fresh data)
                # Load suppressions once per cycle to avoid N*M DB queries
                suppressions = db.get_active_suppressions()
                iface_states = self.prom.get_interface_states()
                self._process_interfaces(iface_states, suppressions)
                unifi_devices = self.unifi.get_devices()
                self._process_unifi(unifi_devices, suppressions)
                self._process_ping_hosts(suppressions)

                # Housekeeping: deactivate expired suppressions and purge old resolved events
                db.cleanup_expired_suppressions()
                db.purge_old_resolved_events(days=90)
                logger.info('Network check cycle complete')
            except Exception as e:
                logger.error(f'Monitor loop error: {e}', exc_info=True)
            time.sleep(self.poll_interval)
if __name__ == '__main__':
    # Script entry point: build the monitor (reads config.json) and start its
    # polling loop.
    NetworkMonitor().run()