9c5a88fbce
Lint / Python (flake8) (push) Successful in 41s
Lint / JS (eslint) (push) Successful in 7s
Security / Python Security (bandit) (push) Successful in 40s
Test / Python Tests (pytest) (push) Successful in 1m18s
Lint / Notify on failure (push) Has been skipped
Lint / Deploy (push) Successful in 4s
upsert_event now returns ticket_id (4th element) so callers can skip ticket creation when one already exists. This prevents calling the ticket API every poll cycle for ongoing issues while still retrying if the previous creation attempt failed (ticket_id stays NULL until success). Cluster events use (is_new or not ticket_id) so they too get retried on failure rather than relying solely on is_new. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
988 lines
41 KiB
Python
988 lines
41 KiB
Python
#!/usr/bin/env python3
|
||
"""Gandalf network monitor daemon.
|
||
|
||
Polls Prometheus (node_exporter) and the UniFi controller for network
|
||
interface and device state. Creates tickets in Tinker Tickets when issues
|
||
are detected, with deduplication and suppression support.
|
||
|
||
Run as a separate systemd service alongside the Flask web app.
|
||
"""
|
||
import json
|
||
import logging
|
||
import re
|
||
import shlex
|
||
import time
|
||
from datetime import datetime, timezone
|
||
from typing import Dict, List, Optional
|
||
|
||
import requests
|
||
from urllib3.exceptions import InsecureRequestWarning
|
||
|
||
import db
|
||
|
||
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s %(levelname)s %(name)s %(message)s',
|
||
)
|
||
logger = logging.getLogger('gandalf.monitor')
|
||
|
||
# --------------------------------------------------------------------------
|
||
# Interface filtering
|
||
# --------------------------------------------------------------------------
|
||
# Name prefixes of loopback/virtual/bridge devices that are never monitored.
_SKIP_PREFIXES = (
    'lo', 'veth', 'tap', 'fwbr', 'fwln', 'fwpr',
    'docker', 'dummy', 'br-', 'virbr', 'vmbr',
)
# A trailing ".<digits>" marks a VLAN sub-interface (e.g. "eth0.100").
_VLAN_SUFFIX = re.compile(r'\.\d+$')


def is_physical_interface(name: str) -> bool:
    """Return True for physical/bond interfaces worth monitoring.

    An interface is rejected when its name starts with one of
    ``_SKIP_PREFIXES`` or ends in a VLAN-style ``.<digits>`` suffix.
    """
    # str.startswith accepts a tuple and tests every prefix in one call.
    if name.startswith(_SKIP_PREFIXES):
        return False
    return _VLAN_SUFFIX.search(name) is None
|
||
|
||
|
||
# --------------------------------------------------------------------------
|
||
# Prometheus client
|
||
# --------------------------------------------------------------------------
|
||
class PrometheusClient:
    """Small client for the Prometheus HTTP instant-query endpoint."""

    def __init__(self, url: str):
        # Trailing slashes are dropped so endpoint paths can be appended as-is.
        self.url = url.rstrip('/')

    def query(self, promql: str) -> list:
        """Run the instant query *promql* and return its ``result`` list.

        Any failure (HTTP error, bad JSON, missing keys) is logged and yields
        an empty list, as does a response whose status is not ``'success'``.
        """
        endpoint = f'{self.url}/api/v1/query'
        try:
            resp = requests.get(endpoint, params={'query': promql}, timeout=15)
            resp.raise_for_status()
            payload = resp.json()
            if payload.get('status') == 'success':
                return payload['data']['result']
        except Exception as e:
            logger.error(f'Prometheus query failed ({promql!r}): {e}')
        return []

    def get_interface_states(self) -> Dict[str, Dict[str, bool]]:
        """Return {instance: {device: is_up}} for physical interfaces."""
        hosts: Dict[str, Dict[str, bool]] = {}
        for sample in self.query('node_network_up'):
            instance = sample['metric'].get('instance', '')
            device = sample['metric'].get('device', '')
            if is_physical_interface(device):
                # Sample values arrive as strings; '1' means the link is up.
                hosts.setdefault(instance, {})[device] = (sample['value'][1] == '1')
        return hosts
|
||
|
||
|
||
# --------------------------------------------------------------------------
|
||
# UniFi client
|
||
# --------------------------------------------------------------------------
|
||
class UnifiClient:
    """Read-only client for a UniFi Network controller, authenticated by API key."""

    def __init__(self, cfg: dict):
        """Build the HTTP session from the UniFi config section.

        Reads ``controller`` and ``api_key`` (required) plus ``site_id``
        (default ``'default'``) and ``verify_ssl`` (default ``True``).
        """
        # Strip trailing slashes so a configured "https://host/" does not
        # produce "//proxy/..." request paths.
        self.base_url = cfg['controller'].rstrip('/')
        self.site_id = cfg.get('site_id', 'default')
        self.session = requests.Session()
        self.session.verify = cfg.get('verify_ssl', True)
        if not self.session.verify:
            # Certificate checks are off by configuration; silence the
            # per-request InsecureRequestWarning.
            requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        self.headers = {
            'X-API-KEY': cfg['api_key'],
            'Accept': 'application/json',
        }

    @staticmethod
    def _to_float(value) -> Optional[float]:
        """Convert *value* to float; return None when missing or unparseable."""
        if value is None:
            return None
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    def get_devices(self) -> Optional[List[dict]]:
        """Return list of UniFi devices, or None if the controller is unreachable.

        Each entry carries ``name``, ``mac``, ``ip``, ``type``, ``model``,
        ``state`` and ``connected`` (True when ``state == 1``). A device
        without a ``state`` field is treated as state 1.
        """
        try:
            url = f'{self.base_url}/proxy/network/v2/api/site/{self.site_id}/device'
            resp = self.session.get(url, headers=self.headers, timeout=15)
            resp.raise_for_status()
            data = resp.json()
            devices = []
            for d in data.get('network_devices', []):
                state = d.get('state', 1)
                devices.append({
                    'name': d.get('name') or d.get('mac', 'unknown'),
                    'mac': d.get('mac', ''),
                    'ip': d.get('ip', ''),
                    'type': d.get('type', 'unknown'),
                    'model': d.get('model', ''),
                    'state': state,
                    'connected': state == 1,
                })
            return devices
        except Exception as e:
            logger.error(f'UniFi API error: {e}')
            return None

    def get_switch_ports(self) -> Optional[Dict[str, dict]]:
        """Return per-port stats for all UniFi switches, keyed by switch name.

        Uses the v1 stat API which includes full port_table data.
        Returns {switch_name: {'ip': str, 'model': str, 'ports': {port_name: {...}}}},
        or None when the request or response handling fails. Only devices
        whose ``type`` is ``usw`` are included, and switches with an empty
        port table are omitted.
        """
        try:
            url = f'{self.base_url}/proxy/network/api/s/{self.site_id}/stat/device'
            resp = self.session.get(url, headers=self.headers, timeout=15)
            resp.raise_for_status()
            devices = resp.json().get('data', [])
            result: Dict[str, dict] = {}
            for dev in devices:
                if dev.get('type', '').lower() != 'usw':
                    continue
                sw_name = dev.get('name') or dev.get('mac', 'unknown')
                sw_ip = dev.get('ip', '')
                sw_model = dev.get('model', '')

                # Build LLDP neighbor map (keyed by port_idx)
                lldp_map: Dict[int, dict] = {}
                for entry in dev.get('lldp_table', []):
                    pidx = entry.get('lldp_port_idx')
                    if pidx is not None:
                        lldp_map[int(pidx)] = {
                            'chassis_id': entry.get('chassis_id', ''),
                            'system_name': entry.get('system_name', ''),
                            'port_id': entry.get('port_id', ''),
                            'port_desc': entry.get('port_desc', ''),
                            'mgmt_ips': entry.get('management_ips', []),
                        }

                ports: Dict[str, dict] = {}
                for port in dev.get('port_table', []):
                    idx = port.get('port_idx', 0)
                    pname = port.get('name') or f'Port {idx}'
                    ports[pname] = {
                        'port_idx': idx,
                        'switch_ip': sw_ip,
                        'up': port.get('up', False),
                        'speed_mbps': port.get('speed', 0),
                        'full_duplex': port.get('full_duplex', False),
                        'autoneg': port.get('autoneg', False),
                        'is_uplink': port.get('is_uplink', False),
                        'media': port.get('media', ''),
                        # A malformed PoE reading becomes None instead of
                        # raising and discarding the stats of every switch.
                        'poe_power': self._to_float(port.get('poe_power')),
                        'poe_class': port.get('poe_class'),
                        'poe_max_power': self._to_float(port.get('poe_max_power')),
                        'poe_mode': port.get('poe_mode', ''),
                        'lldp': lldp_map.get(idx),
                        'tx_bytes': port.get('tx_bytes', 0),
                        'rx_bytes': port.get('rx_bytes', 0),
                        'tx_errors': port.get('tx_errors', 0),
                        'rx_errors': port.get('rx_errors', 0),
                        'tx_dropped': port.get('tx_dropped', 0),
                        'rx_dropped': port.get('rx_dropped', 0),
                    }
                if ports:
                    result[sw_name] = {'ip': sw_ip, 'model': sw_model, 'ports': ports}
            return result
        except Exception as e:
            logger.error(f'UniFi switch port stats error: {e}')
            return None
|
||
|
||
|
||
# --------------------------------------------------------------------------
|
||
# Ticket client
|
||
# --------------------------------------------------------------------------
|
||
class TicketClient:
    """Creates tickets by POSTing JSON to the configured ticket API URL."""

    def __init__(self, cfg: dict):
        # Both settings default to '' which disables ticket creation in create().
        self.url = cfg.get('url', '')
        self.api_key = cfg.get('api_key', '')

    def create(self, title: str, description: str, priority: str = '2') -> Optional[str]:
        """Create a ticket and return its id, or None when nothing was created.

        Returns the ``ticket_id`` from a successful response, or the
        ``existing_ticket_id`` when the API reports one instead. Returns None
        when the client is unconfigured, the response is unexpected, or the
        request raises (all of which are logged).
        """
        if not (self.api_key and self.url):
            logger.warning('Ticket API not configured – skipping ticket creation')
            return None

        payload = {
            'title': title,
            'description': description,
            'status': 'Open',
            'priority': priority,
            'category': 'Network',
            'type': 'Issue',
        }
        auth = {'Authorization': f'Bearer {self.api_key}'}
        try:
            resp = requests.post(self.url, json=payload, headers=auth, timeout=15)
            resp.raise_for_status()
            data = resp.json()

            if data.get('success'):
                tid = data.get('ticket_id')
                if tid:
                    logger.info(f'Created ticket #{tid}: {title}')
                    return tid
                logger.warning(f'Ticket API success but no ticket_id in response: {data}')
                return None

            existing = data.get('existing_ticket_id')
            if existing:
                logger.info(f'Duplicate suppressed by API – existing #{existing}')
                return existing

            logger.warning(f'Unexpected ticket API response: {data}')
        except Exception as e:
            logger.error(f'Ticket creation failed: {e}')
        return None
|
||
|
||
|
||
# --------------------------------------------------------------------------
|
||
# Pulse HTTP client (delegates SSH commands to Pulse worker)
|
||
# --------------------------------------------------------------------------
|
||
class PulseClient:
    """Submit a command to a Pulse worker via the internal M2M API and poll for result."""

    def __init__(self, cfg: dict):
        """Read the ``pulse`` section of *cfg* and prepare an HTTP session.

        Missing settings default to empty strings (timeout: 45 seconds);
        ``run_command`` returns None while url, api_key or worker_id is empty.
        """
        p = cfg.get('pulse', {})
        self.url = p.get('url', '').rstrip('/')
        self.api_key = p.get('api_key', '')
        self.worker_id = p.get('worker_id', '')
        self.timeout = p.get('timeout', 45)
        # Execution id of the most recent successful submit (None otherwise).
        self.last_execution_id: Optional[str] = None
        self.session = requests.Session()
        self.session.headers.update({
            'X-Gandalf-API-Key': self.api_key,
            'Content-Type': 'application/json',
        })

    def _retry_once(self, command: str, message: str) -> Optional[str]:
        """Log *message*, wait 5 seconds, and re-run *command* without further retries."""
        logger.info(message)
        time.sleep(5)
        return self.run_command(command, _retry=False)

    def run_command(self, command: str, _retry: bool = True) -> Optional[str]:
        """Submit *command* to Pulse, poll until done, return stdout or None.

        Retries once automatically on transient submit failures or timeouts,
        and on executions that end ``failed`` or ``timed_out`` (not
        ``cancelled``). Returns ``''`` when a completed execution has no
        ``command_result`` log entry.
        """
        self.last_execution_id = None
        if not self.url or not self.api_key or not self.worker_id:
            return None

        # Submit
        try:
            resp = self.session.post(
                f'{self.url}/api/internal/command',
                json={'worker_id': self.worker_id, 'command': command},
                timeout=10,
            )
            resp.raise_for_status()
            execution_id = resp.json().get('execution_id')
            if not execution_id:
                logger.error('Pulse submit response missing execution_id')
                return None
            self.last_execution_id = execution_id
        except Exception as e:
            logger.error(f'Pulse command submit failed: {e}')
            if _retry:
                return self._retry_once(command, 'Retrying Pulse command submit in 5s...')
            return None

        # Poll. The deadline uses the monotonic clock so that wall-clock
        # adjustments (NTP steps, manual changes) cannot shorten or extend it.
        deadline = time.monotonic() + self.timeout
        while time.monotonic() < deadline:
            time.sleep(2)
            try:
                r = self.session.get(
                    f'{self.url}/api/internal/executions/{execution_id}',
                    timeout=10,
                )
                r.raise_for_status()
                data = r.json()
                status = data.get('status')
                if status == 'completed':
                    for entry in data.get('logs', []):
                        if entry.get('action') == 'command_result':
                            return entry.get('stdout', '')
                    return ''
                if status in ('failed', 'timed_out', 'cancelled'):
                    logger.error(
                        f'Pulse execution {execution_id} ended with status={status!r}; '
                        f'view at {self.url}/executions/{execution_id}'
                    )
                    if _retry and status != 'cancelled':
                        return self._retry_once(command, 'Retrying failed Pulse command in 5s...')
                    return None
            except Exception as e:
                # A failed poll is logged and polling continues until the deadline.
                logger.error(f'Pulse poll failed for {execution_id}: {e}')

        logger.warning(
            f'Pulse command timed out after {self.timeout}s '
            f'(execution_id={execution_id}); '
            f'view at {self.url}/executions/{execution_id}'
        )
        if _retry:
            return self._retry_once(command, 'Retrying timed-out Pulse command in 5s...')
        return None

    def ping(self, ip: str, count: int = 3, timeout: int = 2) -> bool:
        """Ping *ip* via the Pulse worker. Returns True if host responds."""
        ip_q = shlex.quote(ip)
        output = self.run_command(
            f'ping -c {count} -W {timeout} {ip_q} >/dev/null 2>&1 && echo REACHABLE || echo UNREACHABLE'
        )
        return output is not None and output.strip() == 'REACHABLE'
|
||
|
||
|
||
# --------------------------------------------------------------------------
|
||
# Link stats collector (ethtool + Prometheus traffic metrics)
|
||
# --------------------------------------------------------------------------
|
||
class LinkStatsCollector:
    """Collects detailed per-interface statistics via SSH (ethtool) and Prometheus,
    plus per-port stats from UniFi switches."""

    # Kernel interface names: [a-zA-Z0-9_.-], max 15 chars per IFNAMSIZ.
    # Used with fullmatch() so a trailing newline cannot slip through.
    _IFACE_NAME_RE = re.compile(r'[a-zA-Z0-9_.-]{1,15}')

    # (output rate field, cumulative counter field) pairs for UniFi ports.
    _UNIFI_RATE_FIELDS = (
        ('tx_bytes_rate', 'tx_bytes'),
        ('rx_bytes_rate', 'rx_bytes'),
        ('tx_errs_rate', 'tx_errors'),
        ('rx_errs_rate', 'rx_errors'),
        ('tx_drops_rate', 'tx_dropped'),
        ('rx_drops_rate', 'rx_dropped'),
    )

    def __init__(self, cfg: dict, prom: 'PrometheusClient',
                 unifi: Optional['UnifiClient'] = None):
        self.cfg = cfg
        self.prom = prom
        self.pulse = PulseClient(cfg)
        self.unifi = unifi
        # State for UniFi rate calculation (previous snapshot + timestamp)
        self._prev_unifi: Dict[str, dict] = {}
        self._prev_unifi_time: float = 0.0

    # ------------------------------------------------------------------
    # SSH collection (via Pulse worker)
    # ------------------------------------------------------------------
    def _ssh_batch(self, ip: str, ifaces: List[str]) -> Dict[str, dict]:
        """
        Delegate one SSH session to the Pulse worker to collect ethtool + SFP DOM
        data for all *ifaces*. Returns {iface: {speed_mbps, duplex, ..., sfp: {...}}}.

        Returns {} when there is nothing to collect, Pulse has no URL, no
        interface name passes validation, or the Pulse command yields None.
        """
        if not ifaces or not self.pulse.url:
            return {}

        # Validate interface names before they are embedded in a shell command.
        safe_ifaces = [i for i in ifaces if self._IFACE_NAME_RE.fullmatch(i)]
        if not safe_ifaces:
            return {}

        # Build a single shell command: for each iface output ethtool + -m with sentinels
        parts = []
        for iface in safe_ifaces:
            q = shlex.quote(iface)
            parts.append(
                f'echo "___IFACE:{iface}___";'
                f' ethtool {q} 2>/dev/null;'
                f' echo "___DOM:{iface}___";'
                f' ethtool -m {q} 2>/dev/null;'
                f' echo "___END___"'
            )
        # Groups must be separated by ';'. With a bare space the next group's
        # leading echo becomes extra arguments to the preceding
        # `echo "___END___"`, losing the following interface's sentinel.
        shell_cmd = '; '.join(parts)

        ssh_cmd = (
            f'ssh -o StrictHostKeyChecking=accept-new -o ConnectTimeout=5 '
            f'-o BatchMode=yes -o LogLevel=ERROR '
            f'-o ServerAliveInterval=10 -o ServerAliveCountMax=2 '
            f'{shlex.quote(f"root@{ip}")} {shlex.quote(shell_cmd)}'
        )
        output = self.pulse.run_command(ssh_cmd)
        if output is None:
            logger.error(f'Pulse ethtool collection returned None for {ip}')
            return {}

        return self._parse_ssh_output(output)

    @staticmethod
    def _parse_ssh_output(output: str) -> Dict[str, dict]:
        """Split sentinel-delimited output into per-interface parsed data.

        ``___IFACE:<name>___`` starts an ethtool section, ``___DOM:<name>___``
        starts an ``ethtool -m`` section and ``___END___`` closes the
        interface. SFP data is stored under the ``'sfp'`` key and only when
        the DOM parser returns something.
        """
        result: Dict[str, dict] = {}
        current_iface: Optional[str] = None
        current_section: Optional[str] = None
        buf: List[str] = []

        def flush(iface, section, lines):
            # Parse the buffered lines of the section that just ended.
            if not iface or not lines:
                return
            text = '\n'.join(lines)
            if section == 'ethtool':
                result.setdefault(iface, {}).update(
                    LinkStatsCollector._parse_ethtool(text)
                )
            elif section == 'dom':
                sfp = LinkStatsCollector._parse_ethtool_m(text)
                if sfp:
                    result.setdefault(iface, {})['sfp'] = sfp

        for line in output.splitlines():
            if line.startswith('___IFACE:') and line.endswith('___'):
                flush(current_iface, current_section, buf)
                current_iface = line[9:-3]
                current_section = 'ethtool'
                buf = []
            elif line.startswith('___DOM:') and line.endswith('___'):
                flush(current_iface, current_section, buf)
                current_iface = line[7:-3]
                current_section = 'dom'
                buf = []
            elif line == '___END___':
                flush(current_iface, current_section, buf)
                current_iface = None
                current_section = None
                buf = []
            else:
                buf.append(line)

        # Handle output that was cut off before the final ___END___.
        flush(current_iface, current_section, buf)
        return result

    @staticmethod
    def _parse_ethtool(output: str) -> dict:
        """Parse plain ``ethtool <iface>`` output.

        Extracts ``speed_mbps``, ``duplex``, ``port_type``, ``auto_neg`` and
        ``link_detected`` from "Key: value" lines; other lines are ignored.
        """
        data: dict = {}
        for line in output.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            if key == 'Speed':
                m = re.match(r'(\d+)Mb/s', val)
                if m:
                    data['speed_mbps'] = int(m.group(1))
            elif key == 'Duplex':
                data['duplex'] = val.lower()
            elif key == 'Port':
                data['port_type'] = val
            elif key == 'Auto-negotiation':
                data['auto_neg'] = (val.lower() == 'on')
            elif key == 'Link detected':
                data['link_detected'] = (val.lower() == 'yes')
        return data

    @staticmethod
    def _parse_ethtool_m(output: str) -> dict:
        """Parse ethtool -m (SFP DOM / digital optical monitoring) output.

        Returns {} for empty output or output that reports diagnostics as
        unavailable. Numeric fields that fail to convert are omitted.
        """
        if not output:
            return {}
        # Skip if module diagnostics unsupported
        lower = output.lower()
        if 'cannot get' in lower or 'not supported' in lower or 'no sfp' in lower:
            return {}

        data: dict = {}

        def store_float(field: str, match) -> None:
            # The numeric patterns also accept strings such as "1.2.3" or
            # "-"; skip those rather than letting ValueError abort the parse.
            if match is None:
                return
            try:
                data[field] = float(match.group(1))
            except ValueError:
                pass

        for line in output.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            key_lower = key.lower()

            if key == 'Vendor name':
                data['vendor'] = val
            elif key == 'Vendor PN':
                data['part_no'] = val
            elif key == 'Identifier':
                m = re.search(r'\((.+?)\)', val)
                if m:
                    data['sfp_type'] = m.group(1)
            elif key == 'Connector':
                m = re.search(r'\((.+?)\)', val)
                if m:
                    data['connector'] = m.group(1)
            elif key == 'Laser wavelength':
                m = re.match(r'(\d+)', val)
                if m:
                    data['wavelength_nm'] = int(m.group(1))
            elif key == 'Laser bias current':
                # e.g. "4.340 mA"
                store_float('bias_ma', re.match(r'([\d.]+)\s+mA', val))
            elif key == 'Laser output power':
                # e.g. "0.1234 mW / -9.09 dBm"
                store_float('tx_power_dbm', re.search(r'/\s*([-\d.]+)\s*dBm', val))
            elif 'receiver' in key_lower and ('power' in key_lower or 'optical' in key_lower):
                store_float('rx_power_dbm', re.search(r'/\s*([-\d.]+)\s*dBm', val))
            elif key == 'Module temperature':
                # e.g. "36.00 degrees C / 96.80 degrees F"; the reading may
                # be negative, so allow a leading minus sign.
                store_float('temp_c', re.match(r'(-?[\d.]+)\s+degrees', val))
            elif key == 'Module voltage':
                # e.g. "3.3180 V"
                store_float('voltage_v', re.match(r'([\d.]+)\s+V', val))

        return data

    # ------------------------------------------------------------------
    # Prometheus traffic / error metrics
    # ------------------------------------------------------------------
    def _collect_prom_metrics(self) -> Dict[str, Dict[str, dict]]:
        """Return {instance: {device: {tx_bytes_rate, rx_bytes_rate, ...}}}.

        Values that are NaN or cannot be converted to float are stored as None.
        """
        metrics: Dict[str, Dict[str, dict]] = {}

        queries = [
            ('tx_bytes_rate', 'rate(node_network_transmit_bytes_total[5m])'),
            ('rx_bytes_rate', 'rate(node_network_receive_bytes_total[5m])'),
            ('tx_errs_rate', 'rate(node_network_transmit_errs_total[5m])'),
            ('rx_errs_rate', 'rate(node_network_receive_errs_total[5m])'),
            ('tx_drops_rate', 'rate(node_network_transmit_drop_total[5m])'),
            ('rx_drops_rate', 'rate(node_network_receive_drop_total[5m])'),
            ('carrier_changes', 'node_network_carrier_changes_total'),
        ]

        for field, promql in queries:
            for r in self.prom.query(promql):
                instance = r['metric'].get('instance', '')
                device = r['metric'].get('device', '')
                if not is_physical_interface(device):
                    continue
                raw = r['value'][1]
                try:
                    val: Optional[float] = float(raw)
                    if val != val:  # NaN
                        val = None
                except (ValueError, TypeError):
                    val = None
                metrics.setdefault(instance, {}).setdefault(device, {})[field] = val

        return metrics

    # ------------------------------------------------------------------
    # Main collection entry point
    # ------------------------------------------------------------------
    def collect(self, instance_map: Dict[str, str]) -> dict:
        """
        Collect full link stats for all Prometheus-monitored hosts.

        *instance_map*: ``{'10.10.10.2:9100': 'large1', ...}``

        Returns a dict suitable for ``db.set_state('link_stats', ...)`` with
        keys ``hosts``, ``unifi_switches`` and ``updated``.
        """
        prom_metrics = self._collect_prom_metrics()
        result_hosts: Dict[str, Dict[str, dict]] = {}
        exclude_ips = set(self.cfg.get('monitor', {}).get('links_exclude_ips', []))

        for instance, iface_metrics in prom_metrics.items():
            host_ip = instance.split(':')[0]
            if host_ip in exclude_ips:
                continue
            host = instance_map.get(instance, host_ip)
            ifaces = list(iface_metrics.keys())

            # SSH ethtool collection via Pulse worker — only for explicitly configured
            # hosts (instance_map keys). Hosts like postgresql/matrix may report
            # node_exporter metrics to Prometheus but don't need link diagnostics.
            ethtool_data: Dict[str, dict] = {}
            if self.pulse.url and ifaces and instance in instance_map:
                try:
                    ethtool_data = self._ssh_batch(host_ip, ifaces)
                except Exception as e:
                    logger.warning(f'ethtool collection failed for {host} ({host_ip}): {e}')

            # Merge Prometheus metrics + ethtool data per interface
            merged: Dict[str, dict] = {}
            for iface in ifaces:
                d: dict = {'host_ip': host_ip}
                d.update(iface_metrics.get(iface, {}))
                # ethtool fields (including the nested 'sfp' dict) win over
                # Prometheus fields of the same name.
                d.update(ethtool_data.get(iface, {}))
                merged[iface] = d

            result_hosts[host] = merged

        # Collect UniFi switch port stats
        unifi_switches: dict = {}
        if self.unifi:
            try:
                raw = self.unifi.get_switch_ports()
                if raw is not None:
                    now = time.time()
                    unifi_switches = self._compute_unifi_rates(raw, now)
                    # Keep this snapshot as the baseline for the next cycle.
                    self._prev_unifi = raw
                    self._prev_unifi_time = now
            except Exception as e:
                logger.warning(f'UniFi switch port collection failed: {e}')

        return {
            'hosts': result_hosts,
            'unifi_switches': unifi_switches,
            'updated': datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC'),
        }

    def _compute_unifi_rates(self, raw: Dict[str, dict], now: float) -> Dict[str, dict]:
        """Compute per-port byte/error rates from delta against previous snapshot.

        A rate is None when there is no previous snapshot time, or when the
        port (or that counter) was absent from the previous snapshot; a
        missing counter is not treated as 0, which would report the whole
        lifetime counter as one interval's traffic. Negative deltas (counter
        resets) are clamped to 0.0.
        """
        dt = now - self._prev_unifi_time if self._prev_unifi_time > 0 else 0

        def rate(new_val, old_val) -> Optional[float]:
            if dt <= 0 or new_val is None or old_val is None:
                return None
            return max(0.0, (new_val - old_val) / dt)

        result: Dict[str, dict] = {}
        for sw_name, sw_data in raw.items():
            prev_ports = self._prev_unifi.get(sw_name, {}).get('ports', {})
            merged_ports: Dict[str, dict] = {}
            for pname, d in sw_data['ports'].items():
                entry = dict(d)
                prev = prev_ports.get(pname, {})
                for rate_field, counter in self._UNIFI_RATE_FIELDS:
                    entry[rate_field] = rate(d[counter], prev.get(counter))
                merged_ports[pname] = entry
            result[sw_name] = {'ip': sw_data['ip'], 'model': sw_data['model'],
                               'ports': merged_ports}
        return result
|
||
|
||
|
||
# --------------------------------------------------------------------------
|
||
# Helpers
|
||
# --------------------------------------------------------------------------
|
||
def _now_utc() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DD HH:MM:SS UTC``."""
    current = datetime.now(timezone.utc)
    return f'{current:%Y-%m-%d %H:%M:%S} UTC'
|
||
|
||
|
||
# --------------------------------------------------------------------------
|
||
# Monitor
|
||
# --------------------------------------------------------------------------
|
||
CLUSTER_NAME = 'proxmox-cluster'
|
||
|
||
|
||
class NetworkMonitor:
|
||
def __init__(self):
    """Load ``config.json`` from the working directory and build all clients.

    Monitor settings fall back to: poll_interval 120, failure_threshold 2,
    cluster_threshold 3, cluster_name ``CLUSTER_NAME``.
    """
    with open('config.json') as fh:
        self.cfg = json.load(fh)
    cfg = self.cfg

    self.prom = PrometheusClient(cfg['prometheus']['url'])
    self.unifi = UnifiClient(cfg['unifi'])
    self.tickets = TicketClient(cfg.get('ticket_api', {}))
    self.link_stats = LinkStatsCollector(cfg, self.prom, self.unifi)
    # Convenience alias: share the collector's Pulse client.
    self.pulse = self.link_stats.pulse

    monitor_cfg = cfg.get('monitor', {})
    self.poll_interval = monitor_cfg.get('poll_interval', 120)
    self.fail_thresh = monitor_cfg.get('failure_threshold', 2)
    self.cluster_thresh = monitor_cfg.get('cluster_threshold', 3)
    self.cluster_name = monitor_cfg.get('cluster_name', CLUSTER_NAME)

    # Prometheus instance -> hostname lookup, built from the hosts that
    # declare a 'prometheus_instance'.
    instance_map: Dict[str, str] = {}
    for host_cfg in cfg.get('hosts', []):
        if 'prometheus_instance' in host_cfg:
            instance_map[host_cfg['prometheus_instance']] = host_cfg['name']
    self._instance_map = instance_map
|
||
|
||
def _hostname(self, instance: str) -> str:
|
||
return self._instance_map.get(instance, instance.split(':')[0])
|
||
|
||
# ------------------------------------------------------------------
|
||
# Interface monitoring (Prometheus)
|
||
# ------------------------------------------------------------------
|
||
def _process_interfaces(self, states: Dict[str, Dict[str, bool]], suppressions: list) -> None:
    """Compare interface states with the stored baseline and raise events.

    Per interface of each configured instance:
      * up: baseline becomes 'up' and any 'interface_down' event is resolved;
      * down with no baseline entry: recorded as 'initial_down', no alert;
      * down with baseline 'initial_down': nothing happens;
      * down otherwise: treated as an UP -> DOWN regression; the event is
        upserted and a ticket is created once the consecutive count reaches
        the failure threshold, unless suppressed or already ticketed.

    When the number of hosts with a regression reaches the cluster threshold
    a cluster-wide event is upserted (and ticketed if needed); otherwise the
    cluster event is resolved.
    """
    baseline = db.get_baseline()
    updated = {h: dict(ifs) for h, ifs in baseline.items()}
    # Only hosts with genuine regressions (UP -> DOWN) count toward the
    # cluster threshold.
    regressed_hosts: List[str] = []

    for instance, ifaces in states.items():
        if instance not in self._instance_map:
            continue  # skip unconfigured Prometheus instances
        host = self._hostname(instance)
        host_entry = updated.setdefault(host, {})
        regressed = False

        for iface, is_up in ifaces.items():
            if is_up:
                host_entry[iface] = 'up'
                db.resolve_event('interface_down', host, iface)
                continue

            prev = baseline.get(host, {}).get(iface)  # 'up', 'initial_down', or None
            if prev is None:
                # First observation is down – could be unused port, don't alert
                host_entry[iface] = 'initial_down'
                continue
            if prev == 'initial_down':
                # Persistently down since first observation – no alert
                continue

            # Regression: was UP, now DOWN
            regressed = True
            suppressed = (
                db.check_suppressed(suppressions, 'interface', host, iface) or
                db.check_suppressed(suppressions, 'host', host)
            )
            event_id, _is_new, consec, ticket_id = db.upsert_event(
                'interface_down', 'critical', 'prometheus',
                host, iface,
                f'Interface {iface} on {host} went link-down ({_now_utc()})',
            )
            needs_ticket = consec >= self.fail_thresh and not ticket_id
            if needs_ticket and not suppressed:
                self._ticket_interface(event_id, host, iface, consec)

        if regressed:
            regressed_hosts.append(host)

    db.set_baseline(updated)

    # Cluster-wide check – only genuine regressions count
    if len(regressed_hosts) < self.cluster_thresh:
        db.resolve_event('cluster_network_issue', self.cluster_name, '')
        return

    host_list = ", ".join(regressed_hosts)
    suppressed = db.check_suppressed(suppressions, 'all', '')
    event_id, is_new, _consec, ticket_id = db.upsert_event(
        'cluster_network_issue', 'critical', 'prometheus',
        self.cluster_name, '',
        f'{len(regressed_hosts)} hosts reporting simultaneous interface failures: '
        f'{host_list}',
    )
    if suppressed or not (is_new or not ticket_id):
        return

    title = (
        f'[{self.cluster_name}][auto][production][issue][network][cluster-wide] '
        f'Multiple hosts reporting interface failures'
    )
    desc = '\n'.join([
        'Cluster Network Alert',
        '=' * 40,
        '',
        f'Affected hosts: {host_list}',
        f'Detected: {_now_utc()}',
        '',
        f'{len(regressed_hosts)} Proxmox hosts simultaneously reported '
        f'interface regressions (link-down on interfaces previously known UP).',
        'This likely indicates a switch or upstream network failure.',
        '',
        'Please check the core and management switches immediately.',
    ])
    tid = self.tickets.create(title, desc, priority='1')
    if tid:
        db.set_ticket_id(event_id, tid)
|
||
|
||
def _ticket_interface(
    self, event_id: int, host: str, iface: str, consec: int
) -> None:
    """Create a priority-2 ticket for a link-down interface.

    When the ticket client returns an id it is stored on the event via
    ``db.set_ticket_id``.
    """
    title = (
        f'[{host}][auto][production][issue][network][single-node] '
        f'Interface {iface} link-down'
    )
    desc = '\n'.join([
        'Network Interface Alert',
        '=' * 40,
        '',
        f'Host: {host}',
        f'Interface: {iface}',
        f'Detected: {_now_utc()}',
        f'Consecutive check failures: {consec}',
        '',
        f'Interface {iface} on {host} is reporting link-down state via '
        f'Prometheus node_exporter.',
        '',
        f'Note: {host} may still be reachable via its other network interface.',
        f'Please inspect the cable/SFP/switch port for {host}/{iface}.',
    ])
    tid = self.tickets.create(title, desc, priority='2')
    if tid:
        db.set_ticket_id(event_id, tid)
|
||
|
||
# ------------------------------------------------------------------
|
||
# UniFi device monitoring
|
||
# ------------------------------------------------------------------
|
||
def _process_unifi(self, devices: Optional[List[dict]], suppressions: list) -> None:
    """Raise or resolve 'unifi_device_offline' events for each device.

    *devices* is None when the controller could not be queried; in that
    case the cycle is skipped with a warning and no events are touched.
    """
    if devices is None:
        logger.warning('UniFi API unreachable this cycle')
        return

    for dev in devices:
        name = dev['name']
        if dev['connected']:
            db.resolve_event('unifi_device_offline', name, dev.get('type', ''))
            continue

        suppressed = db.check_suppressed(suppressions, 'unifi_device', name)
        event_id, _is_new, consec, ticket_id = db.upsert_event(
            'unifi_device_offline', 'critical', 'unifi',
            name, dev.get('type', ''),
            f'UniFi {name} ({dev.get("ip","")}) offline ({_now_utc()})',
        )
        # Ticket only once the failure threshold is reached and no ticket
        # is recorded for the event yet.
        needs_ticket = consec >= self.fail_thresh and not ticket_id
        if needs_ticket and not suppressed:
            self._ticket_unifi(event_id, dev)
|
||
|
||
def _ticket_unifi(self, event_id: int, device: dict) -> None:
    """Create a priority-2 ticket for an offline UniFi device.

    When the ticket client returns an id it is stored on the event via
    ``db.set_ticket_id``.
    """
    name = device['name']
    title = (
        f'[{name}][auto][production][issue][network][single-node] '
        f'UniFi device offline'
    )
    desc = '\n'.join([
        'UniFi Device Alert',
        '=' * 40,
        '',
        f'Device: {name}',
        f'Type: {device.get("type","unknown")}',
        f'Model: {device.get("model","")}',
        f'Last Known IP: {device.get("ip","unknown")}',
        f'Detected: {_now_utc()}',
        '',
        f'The UniFi device {name} is offline per the UniFi controller.',
        'Please check power and cable connectivity.',
    ])
    tid = self.tickets.create(title, desc, priority='2')
    if tid:
        db.set_ticket_id(event_id, tid)
|
||
|
||
# ------------------------------------------------------------------
|
||
# Ping-only hosts (no node_exporter)
|
||
# ------------------------------------------------------------------
|
||
def _process_ping_hosts(self, suppressions: list, ping_states: Dict[str, bool]) -> None:
    """Raise or resolve 'host_unreachable' events for configured ping hosts.

    A host with no entry in *ping_states* is treated as unreachable.
    """
    ping_hosts = self.cfg.get('monitor', {}).get('ping_hosts', [])
    for entry in ping_hosts:
        name, ip = entry['name'], entry['ip']

        if ping_states.get(name, False):
            db.resolve_event('host_unreachable', name, ip)
            continue

        suppressed = db.check_suppressed(suppressions, 'host', name)
        event_id, _is_new, consec, ticket_id = db.upsert_event(
            'host_unreachable', 'critical', 'ping',
            name, ip,
            f'Host {name} ({ip}) unreachable via ping ({_now_utc()})',
        )
        # Ticket only once the failure threshold is reached and no ticket
        # is recorded for the event yet.
        needs_ticket = consec >= self.fail_thresh and not ticket_id
        if needs_ticket and not suppressed:
            self._ticket_unreachable(event_id, name, ip, consec)
|
||
|
||
def _ticket_unreachable(
    self, event_id: int, name: str, ip: str, consec: int
) -> None:
    """Open a ticket for a ping-only host that stopped responding.

    Args:
        event_id: Identifier of the event the created ticket is stored on.
        name: Host name, used in the ticket title and description.
        ip: Host IP address shown in the description.
        consec: Consecutive failed checks, reported in the description.
    """
    title = (
        f'[{name}][auto][production][issue][network][single-node] '
        'Host unreachable'
    )

    header = [
        'Host Reachability Alert',
        '=' * 40,
        '',
        f'Host: {name}',
        f'IP: {ip}',
        f'Detected: {_now_utc()}',
        f'Consecutive check failures: {consec}',
        '',
    ]
    explanation = [
        f'Host {name} ({ip}) is not responding to ping from the Gandalf monitor.',
        'This host does not have a Prometheus node_exporter, so interface-level '
        'detail is unavailable.',
        '',
        'Please check the host power, management interface, and network connectivity.',
    ]
    desc = '\n'.join(header + explanation)

    new_ticket = self.tickets.create(title, desc, priority='2')
    if not new_ticket:
        # Nothing usable came back; leave the event without a ticket id.
        return
    db.set_ticket_id(event_id, new_ticket)
|
||
|
||
# ------------------------------------------------------------------
|
||
# Snapshot collection (for dashboard)
|
||
# ------------------------------------------------------------------
|
||
def _collect_snapshot(
    self, iface_states: Dict[str, Dict[str, bool]],
    unifi_devices: Optional[List[dict]] = None,
    ping_states: Optional[Dict[str, bool]] = None,
) -> dict:
    """Build the dashboard snapshot from already-fetched monitoring data.

    Args:
        iface_states: Per Prometheus instance, a map of interface name to
            whether the interface is up.
        unifi_devices: Pre-fetched UniFi devices; ``None`` is shown as an
            empty list.
        ping_states: Per ping-only host name, whether it answered. ``None``
            or a missing entry counts as unreachable.

    Returns:
        A dict with ``hosts`` (per-host status), ``unifi`` (the device list)
        and ``updated`` (current UTC time, ISO 8601 with a ``Z`` suffix).
    """
    hosts = {}

    for instance, ifaces in iface_states.items():
        # Skip Prometheus instances not in config (e.g. LXC app servers).
        if instance not in self._instance_map:
            continue

        hostname = self._hostname(instance)
        link_labels = {
            iface: ('up' if is_up else 'down')
            for iface, is_up in ifaces.items()
        }
        n_total = len(link_labels)
        n_up = sum(1 for label in link_labels.values() if label == 'up')

        # A host with no interfaces at all has n_up == n_total == 0 and is
        # therefore reported as 'up'.
        if n_up == n_total:
            overall = 'up'
        elif n_up == 0:
            overall = 'down'
        else:
            overall = 'degraded'

        hosts[hostname] = {
            'ip': instance.split(':')[0],
            'interfaces': link_labels,
            'status': overall,
            'source': 'prometheus',
        }

    reachability = ping_states or {}
    for entry in self.cfg.get('monitor', {}).get('ping_hosts', []):
        ping_name = entry['name']
        ping_ip = entry['ip']
        answered = reachability.get(ping_name, False)
        hosts[ping_name] = {
            'ip': ping_ip,
            'interfaces': {},
            'status': 'up' if answered else 'down',
            'source': 'ping',
        }

    stamp = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
    return {
        'hosts': hosts,
        'unifi': [] if unifi_devices is None else unifi_devices,
        'updated': stamp,
    }
|
||
|
||
# ------------------------------------------------------------------
|
||
# Main loop
|
||
# ------------------------------------------------------------------
|
||
def run(self) -> None:
    """Run the polling loop forever.

    Each cycle fetches interface states, UniFi devices and ping results
    once, stores the dashboard snapshot and link stats, processes alerts
    from the same data, then performs housekeeping. A failed cycle is
    logged and retried after 30 seconds; a successful one is followed by
    a sleep of ``self.poll_interval`` seconds.
    """
    logger.info(
        f'Gandalf monitor started – poll_interval={self.poll_interval}s '
        f'fail_thresh={self.fail_thresh}'
    )

    while True:
        try:
            logger.info('Starting network check cycle')

            # 1-2. Fetch each data source once; the snapshot and the alert
            # processing below both work from these results.
            iface_states = self.prom.get_interface_states()
            unifi_devices = self.unifi.get_devices()

            # 3a. Ping every configured ping-only host once.
            ping_states: Dict[str, bool] = {}
            for entry in self.cfg.get('monitor', {}).get('ping_hosts', []):
                host_name = entry['name']
                ping_states[host_name] = self.pulse.ping(entry['ip'])

            # 3b. Store the snapshot for the dashboard.
            db.set_state(
                'network_snapshot',
                self._collect_snapshot(iface_states, unifi_devices, ping_states),
            )
            db.set_state('last_check', _now_utc())

            # 4. Link stats (ethtool + traffic metrics). A failure here is
            # logged but must not abort the rest of the cycle.
            try:
                db.set_state('link_stats', self.link_stats.collect(self._instance_map))
            except Exception as e:
                logger.exception(f'Link stats collection failed: {e}')

            # 5. Process alerts using the already-fetched data.
            suppressions = db.get_active_suppressions()
            self._process_interfaces(iface_states, suppressions)
            self._process_unifi(unifi_devices, suppressions)
            self._process_ping_hosts(suppressions, ping_states)

            # Housekeeping: deactivate expired suppressions and purge old
            # resolved events.
            db.cleanup_expired_suppressions()
            db.purge_old_resolved_events(days=90)

            logger.info('Network check cycle complete')
        except Exception as e:
            # Top-level boundary: keep the daemon alive and retry sooner
            # than the normal poll interval.
            logger.exception(f'Monitor loop error: {e}')
            time.sleep(30)
        else:
            time.sleep(self.poll_interval)
|
||
|
||
|
||
if __name__ == '__main__':
    # Script entry point: construct the monitor and enter its polling loop.
    NetworkMonitor().run()
|