#!/usr/bin/env python3
"""Gandalf network monitor daemon.

Polls Prometheus (node_exporter) and the UniFi controller for network
interface and device state. Creates tickets in Tinker Tickets when issues
are detected, with deduplication and suppression support.

Run as a separate systemd service alongside the Flask web app.
"""
import json
import logging
import re
import shlex
import subprocess
import time
from datetime import datetime
from typing import Dict, List, Optional

import requests
from urllib3.exceptions import InsecureRequestWarning

import db

# The UniFi session below runs with verify=False (self-signed controller
# certificate), so silence the resulting InsecureRequestWarning noise.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(name)s %(message)s',
)
logger = logging.getLogger('gandalf.monitor')


# --------------------------------------------------------------------------
# Interface filtering
# --------------------------------------------------------------------------

# Name prefixes of virtual/bridge/container interfaces that are never
# monitored for link state.
_SKIP_PREFIXES = (
    'lo', 'veth', 'tap', 'fwbr', 'fwln', 'fwpr',
    'docker', 'dummy', 'br-', 'virbr', 'vmbr',
)
# VLAN sub-interfaces look like "eno1.42" — also excluded.
_VLAN_SUFFIX = re.compile(r'\.\d+$')


def is_physical_interface(name: str) -> bool:
    """Return True for physical/bond interfaces worth monitoring."""
    if name.startswith(_SKIP_PREFIXES):
        return False
    return _VLAN_SUFFIX.search(name) is None


# --------------------------------------------------------------------------
# Prometheus client
# --------------------------------------------------------------------------

class PrometheusClient:
    """Minimal read-only client for the Prometheus instant-query API."""

    def __init__(self, url: str):
        self.url = url.rstrip('/')

    def query(self, promql: str) -> list:
        """Run an instant query; return the result vector, or [] on any error."""
        try:
            resp = requests.get(
                f'{self.url}/api/v1/query',
                params={'query': promql},
                timeout=15,
            )
            resp.raise_for_status()
            payload = resp.json()
            if payload.get('status') == 'success':
                return payload['data']['result']
        except Exception as e:
            logger.error(f'Prometheus query failed ({promql!r}): {e}')
        return []

    def get_interface_states(self) -> Dict[str, Dict[str, bool]]:
        """Return {instance: {device: is_up}} for physical interfaces."""
        hosts: Dict[str, Dict[str, bool]] = {}
        for sample in self.query('node_network_up'):
            instance = sample['metric'].get('instance', '')
            device = sample['metric'].get('device', '')
            if not is_physical_interface(device):
                continue
            # Prometheus sample values are strings; '1' means link up.
            hosts.setdefault(instance, {})[device] = (sample['value'][1] == '1')
        return hosts


# --------------------------------------------------------------------------
# UniFi client
# --------------------------------------------------------------------------

class UnifiClient:
    """Client for the UniFi controller's network API (API-key auth)."""

    def __init__(self, cfg: dict):
        self.base_url = cfg['controller']
        self.site_id = cfg.get('site_id', 'default')
        self.session = requests.Session()
        # Controller certificate is not expected to be CA-signed.
        self.session.verify = False
        self.headers = {
            'X-API-KEY': cfg['api_key'],
            'Accept': 'application/json',
        }

    def get_devices(self) -> Optional[List[dict]]:
        """Return list of UniFi devices, or None if the controller is
        unreachable."""
        try:
            url = f'{self.base_url}/proxy/network/v2/api/site/{self.site_id}/device'
            resp = self.session.get(url, headers=self.headers, timeout=15)
            resp.raise_for_status()
            payload = resp.json()

            devices = []
            for raw in payload.get('network_devices', []):
                # NOTE(review): a missing 'state' defaults to 1 (connected) —
                # confirm that is the intended fail-open behaviour.
                state = raw.get('state', 1)
                devices.append({
                    'name': raw.get('name') or raw.get('mac', 'unknown'),
                    'mac': raw.get('mac', ''),
                    'ip': raw.get('ip', ''),
                    'type': raw.get('type', 'unknown'),
                    'model': raw.get('model', ''),
                    'state': state,
                    'connected': state == 1,
                })
            return devices
        except Exception as e:
            logger.error(f'UniFi API error: {e}')
            return None

    def get_switch_ports(self) -> Optional[Dict[str, dict]]:
        """Return per-port stats for all UniFi switches, keyed by switch name.

        Uses the v1 stat API which includes full port_table data.
        Returns {switch_name: {'ip': str, 'model': str,
        'ports': {port_name: {...}}}}.
        """
        try:
            url = f'{self.base_url}/proxy/network/api/s/{self.site_id}/stat/device'
            resp = self.session.get(url, headers=self.headers, timeout=15)
            resp.raise_for_status()
            devices = resp.json().get('data', [])

            result: Dict[str, dict] = {}
            for dev in devices:
                # Only 'usw' (switch) devices carry a port_table we care about.
                if dev.get('type', '').lower() != 'usw':
                    continue
                sw_name = dev.get('name') or dev.get('mac', 'unknown')
                sw_ip = dev.get('ip', '')
                sw_model = dev.get('model', '')

                # Build LLDP neighbor map (keyed by port_idx)
                lldp_map: Dict[int, dict] = {}
                for entry in dev.get('lldp_table', []):
                    pidx = entry.get('lldp_port_idx')
                    if pidx is None:
                        continue
                    lldp_map[int(pidx)] = {
                        'chassis_id': entry.get('chassis_id', ''),
                        'system_name': entry.get('system_name', ''),
                        'port_id': entry.get('port_id', ''),
                        'port_desc': entry.get('port_desc', ''),
                        'mgmt_ips': entry.get('management_ips', []),
                    }

                ports: Dict[str, dict] = {}
                for port in dev.get('port_table', []):
                    idx = port.get('port_idx', 0)
                    pname = port.get('name') or f'Port {idx}'
                    raw_poe = port.get('poe_power')
                    raw_poe_max = port.get('poe_max_power')
                    ports[pname] = {
                        'port_idx': idx,
                        'switch_ip': sw_ip,
                        'up': port.get('up', False),
                        'speed_mbps': port.get('speed', 0),
                        'full_duplex': port.get('full_duplex', False),
                        'autoneg': port.get('autoneg', False),
                        'is_uplink': port.get('is_uplink', False),
                        'media': port.get('media', ''),
                        # Normalise PoE readings to float, preserving None
                        # when the controller omits them.
                        'poe_power': float(raw_poe) if raw_poe is not None else None,
                        'poe_class': port.get('poe_class'),
                        'poe_max_power': float(raw_poe_max) if raw_poe_max is not None else None,
                        'poe_mode': port.get('poe_mode', ''),
                        'lldp': lldp_map.get(idx),
                        'tx_bytes': port.get('tx_bytes', 0),
                        'rx_bytes': port.get('rx_bytes', 0),
                        'tx_errors': port.get('tx_errors', 0),
                        'rx_errors': port.get('rx_errors', 0),
                        'tx_dropped': port.get('tx_dropped', 0),
                        'rx_dropped': port.get('rx_dropped', 0),
                    }

                if ports:
                    result[sw_name] = {'ip': sw_ip, 'model': sw_model, 'ports': ports}
            return result
        except Exception as e:
            logger.error(f'UniFi switch port stats error: {e}')
            return None
# --------------------------------------------------------------------------
# Ticket client
# --------------------------------------------------------------------------

class TicketClient:
    """Creates tickets in Tinker Tickets over its HTTP API."""

    def __init__(self, cfg: dict):
        self.url = cfg.get('url', '')
        self.api_key = cfg.get('api_key', '')

    def create(self, title: str, description: str, priority: str = '2') -> Optional[str]:
        """Create a ticket and return its id.

        Returns the id of an already-open duplicate when the API reports
        one, or None on failure / missing configuration.
        """
        if not self.api_key or not self.url:
            logger.warning('Ticket API not configured – skipping ticket creation')
            return None
        try:
            resp = requests.post(
                self.url,
                json={
                    'title': title,
                    'description': description,
                    'status': 'Open',
                    'priority': priority,
                    'category': 'Network',
                    'type': 'Issue',
                },
                headers={'Authorization': f'Bearer {self.api_key}'},
                timeout=15,
            )
            resp.raise_for_status()
            data = resp.json()
            if data.get('success'):
                tid = data['ticket_id']
                logger.info(f'Created ticket #{tid}: {title}')
                return tid
            # The ticket API does its own dedup and may hand back the id of
            # an existing open ticket instead of creating a new one.
            if data.get('existing_ticket_id'):
                logger.info(f'Duplicate suppressed by API – existing #{data["existing_ticket_id"]}')
                return data['existing_ticket_id']
            logger.warning(f'Unexpected ticket API response: {data}')
        except Exception as e:
            logger.error(f'Ticket creation failed: {e}')
        return None


# --------------------------------------------------------------------------
# Pulse HTTP client (delegates SSH commands to Pulse worker)
# --------------------------------------------------------------------------

class PulseClient:
    """Submit a command to a Pulse worker via the internal M2M API and
    poll for result."""

    def __init__(self, cfg: dict):
        p = cfg.get('pulse', {})
        self.url = p.get('url', '').rstrip('/')
        self.api_key = p.get('api_key', '')
        self.worker_id = p.get('worker_id', '')
        self.timeout = p.get('timeout', 45)
        # Kept so callers can link a failed collection to a Pulse execution.
        self.last_execution_id: Optional[str] = None
        self.session = requests.Session()
        self.session.headers.update({
            'X-Gandalf-API-Key': self.api_key,
            'Content-Type': 'application/json',
        })

    def run_command(self, command: str, _retry: bool = True) -> Optional[str]:
        """Submit *command* to Pulse, poll until done, return stdout or None.

        Retries once automatically on transient submit failures or timeouts.
        """
        self.last_execution_id = None
        if not self.url or not self.api_key or not self.worker_id:
            return None

        # Submit
        try:
            resp = self.session.post(
                f'{self.url}/api/internal/command',
                json={'worker_id': self.worker_id, 'command': command},
                timeout=10,
            )
            resp.raise_for_status()
            execution_id = resp.json()['execution_id']
            self.last_execution_id = execution_id
        except Exception as e:
            logger.error(f'Pulse command submit failed: {e}')
            if _retry:
                logger.info('Retrying Pulse command submit in 5s...')
                time.sleep(5)
                return self.run_command(command, _retry=False)
            return None

        # Poll until the execution reaches a terminal state or we hit the
        # deadline; transient poll errors are logged and retried next tick.
        deadline = time.time() + self.timeout
        while time.time() < deadline:
            time.sleep(2)
            try:
                r = self.session.get(
                    f'{self.url}/api/internal/executions/{execution_id}',
                    timeout=10,
                )
                r.raise_for_status()
                data = r.json()
                status = data.get('status')
                if status == 'completed':
                    for entry in data.get('logs', []):
                        if entry.get('action') == 'command_result':
                            return entry.get('stdout', '')
                    return ''
                if status in ('failed', 'timed_out', 'cancelled'):
                    logger.error(
                        f'Pulse execution {execution_id} ended with status={status!r}; '
                        f'view at {self.url}/executions/{execution_id}'
                    )
                    # A cancellation was deliberate — don't retry it.
                    if _retry and status != 'cancelled':
                        logger.info('Retrying failed Pulse command in 5s...')
                        time.sleep(5)
                        return self.run_command(command, _retry=False)
                    return None
            except Exception as e:
                logger.error(f'Pulse poll failed for {execution_id}: {e}')

        logger.warning(
            f'Pulse command timed out after {self.timeout}s '
            f'(execution_id={execution_id}); '
            f'view at {self.url}/executions/{execution_id}'
        )
        if _retry:
            logger.info('Retrying timed-out Pulse command in 5s...')
            time.sleep(5)
            return self.run_command(command, _retry=False)
        return None
# --------------------------------------------------------------------------
# Link stats collector (ethtool + Prometheus traffic metrics)
# --------------------------------------------------------------------------

class LinkStatsCollector:
    """Collects detailed per-interface statistics via SSH (ethtool) and
    Prometheus, plus per-port stats from UniFi switches."""

    def __init__(self, cfg: dict, prom: 'PrometheusClient',
                 unifi: Optional['UnifiClient'] = None):
        self.prom = prom
        self.pulse = PulseClient(cfg)
        self.unifi = unifi
        # Previous UniFi snapshot + its timestamp, used to compute rates.
        self._prev_unifi: Dict[str, dict] = {}
        self._prev_unifi_time: float = 0.0

    # ------------------------------------------------------------------
    # SSH collection (via Pulse worker)
    # ------------------------------------------------------------------
    def _ssh_batch(self, ip: str, ifaces: List[str]) -> Dict[str, dict]:
        """
        Delegate one SSH session to the Pulse worker to collect ethtool +
        SFP DOM data for all *ifaces*.
        Returns {iface: {speed_mbps, duplex, ..., sfp: {...}}}.
        """
        if not ifaces or not self.pulse.url:
            return {}
        # Validate interface names (kernel names only contain [a-zA-Z0-9_.-])
        safe_ifaces = [i for i in ifaces if re.match(r'^[a-zA-Z0-9_.@-]+$', i)]
        if not safe_ifaces:
            return {}

        # One shell snippet per interface: ethtool + ethtool -m, separated
        # by sentinel markers so _parse_ssh_output can re-split the stream.
        segments = []
        for iface in safe_ifaces:
            q = shlex.quote(iface)
            segments.append(
                f'echo "___IFACE:{iface}___";'
                f' ethtool {q} 2>/dev/null;'
                f' echo "___DOM:{iface}___";'
                f' ethtool -m {q} 2>/dev/null;'
                f' echo "___END___"'
            )
        shell_cmd = ' '.join(segments)
        # NOTE(review): shell_cmd embeds double quotes inside the outer
        # quoted remote command; the local shell flattens them so the echo
        # sentinels arrive unquoted remotely — appears harmless, but verify
        # against how the Pulse worker invokes the command.
        ssh_cmd = (
            f'ssh -o StrictHostKeyChecking=no -o ConnectTimeout=5 '
            f'-o BatchMode=yes -o LogLevel=ERROR '
            f'-o ServerAliveInterval=10 -o ServerAliveCountMax=2 '
            f'root@{ip} "{shell_cmd}"'
        )

        output = self.pulse.run_command(ssh_cmd)
        if output is None:
            logger.error(f'Pulse ethtool collection returned None for {ip}')
            return {}
        return self._parse_ssh_output(output)

    @staticmethod
    def _parse_ssh_output(output: str) -> Dict[str, dict]:
        """Split sentinel-delimited SSH output into per-interface ethtool
        and DOM sections and parse each into the result dict."""
        parsed: Dict[str, dict] = {}
        iface: Optional[str] = None
        section: Optional[str] = None
        pending: List[str] = []

        def emit(name, kind, lines):
            # Fold one buffered section into *parsed*.
            if not name or not lines:
                return
            text = '\n'.join(lines)
            if kind == 'ethtool':
                parsed.setdefault(name, {}).update(
                    LinkStatsCollector._parse_ethtool(text)
                )
            elif kind == 'dom':
                sfp = LinkStatsCollector._parse_ethtool_m(text)
                if sfp:
                    parsed.setdefault(name, {})['sfp'] = sfp

        for line in output.splitlines():
            if line.startswith('___IFACE:') and line.endswith('___'):
                emit(iface, section, pending)
                iface, section, pending = line[9:-3], 'ethtool', []
            elif line.startswith('___DOM:') and line.endswith('___'):
                emit(iface, section, pending)
                iface, section, pending = line[7:-3], 'dom', []
            elif line == '___END___':
                emit(iface, section, pending)
                iface, section, pending = None, None, []
            else:
                pending.append(line)
        emit(iface, section, pending)
        return parsed

    @staticmethod
    def _parse_ethtool(output: str) -> dict:
        """Parse plain ``ethtool <iface>`` output into a flat dict."""
        data: dict = {}
        for line in output.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key, val = key.strip(), val.strip()
            if key == 'Speed':
                m = re.match(r'(\d+)Mb/s', val)
                if m:
                    data['speed_mbps'] = int(m.group(1))
            elif key == 'Duplex':
                data['duplex'] = val.lower()
            elif key == 'Port':
                data['port_type'] = val
            elif key == 'Auto-negotiation':
                data['auto_neg'] = (val.lower() == 'on')
            elif key == 'Link detected':
                data['link_detected'] = (val.lower() == 'yes')
        return data

    @staticmethod
    def _parse_ethtool_m(output: str) -> dict:
        """Parse ethtool -m (SFP DOM / digital optical monitoring) output."""
        if not output:
            return {}
        # Skip if module diagnostics unsupported
        lower = output.lower()
        if 'cannot get' in lower or 'not supported' in lower or 'no sfp' in lower:
            return {}
        data: dict = {}
        for line in output.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key, val = key.strip(), val.strip()
            if key == 'Vendor name':
                data['vendor'] = val
            elif key == 'Vendor PN':
                data['part_no'] = val
            elif key == 'Identifier':
                # e.g. "0x03 (SFP)" — keep the parenthesised label
                m = re.search(r'\((.+?)\)', val)
                if m:
                    data['sfp_type'] = m.group(1)
            elif key == 'Connector':
                m = re.search(r'\((.+?)\)', val)
                if m:
                    data['connector'] = m.group(1)
            elif key == 'Laser wavelength':
                m = re.match(r'(\d+)', val)
                if m:
                    data['wavelength_nm'] = int(m.group(1))
            elif key == 'Laser bias current':
                # e.g. "4.340 mA"
                m = re.match(r'([\d.]+)\s+mA', val)
                if m:
                    data['bias_ma'] = float(m.group(1))
            elif key == 'Laser output power':
                # e.g. "0.1234 mW / -9.09 dBm" — take the dBm reading
                m = re.search(r'/\s*([-\d.]+)\s*dBm', val)
                if m:
                    try:
                        data['tx_power_dbm'] = float(m.group(1))
                    except ValueError:
                        pass
            elif 'receiver' in key.lower() and ('power' in key.lower() or 'optical' in key.lower()):
                # Receiver power key wording varies between modules
                m = re.search(r'/\s*([-\d.]+)\s*dBm', val)
                if m:
                    try:
                        data['rx_power_dbm'] = float(m.group(1))
                    except ValueError:
                        pass
            elif key == 'Module temperature':
                # e.g. "36.00 degrees C / 96.80 degrees F"
                m = re.match(r'([\d.]+)\s+degrees', val)
                if m:
                    data['temp_c'] = float(m.group(1))
            elif key == 'Module voltage':
                # e.g. "3.3180 V"
                m = re.match(r'([\d.]+)\s+V', val)
                if m:
                    data['voltage_v'] = float(m.group(1))
        return data

    # ------------------------------------------------------------------
    # Prometheus traffic / error metrics
    # ------------------------------------------------------------------
    def _collect_prom_metrics(self) -> Dict[str, Dict[str, dict]]:
        """Return {instance: {device: {tx_bytes_rate, rx_bytes_rate, ...}}}."""
        metrics: Dict[str, Dict[str, dict]] = {}
        queries = [
            ('tx_bytes_rate', 'rate(node_network_transmit_bytes_total[5m])'),
            ('rx_bytes_rate', 'rate(node_network_receive_bytes_total[5m])'),
            ('tx_errs_rate', 'rate(node_network_transmit_errs_total[5m])'),
            ('rx_errs_rate', 'rate(node_network_receive_errs_total[5m])'),
            ('tx_drops_rate', 'rate(node_network_transmit_drop_total[5m])'),
            ('rx_drops_rate', 'rate(node_network_receive_drop_total[5m])'),
            ('carrier_changes', 'node_network_carrier_changes_total'),
        ]
        for field, promql in queries:
            for sample in self.prom.query(promql):
                instance = sample['metric'].get('instance', '')
                device = sample['metric'].get('device', '')
                if not is_physical_interface(device):
                    continue
                raw = sample['value'][1]
                try:
                    val: Optional[float] = float(raw)
                    if val != val:  # NaN never equals itself
                        val = None
                except (ValueError, TypeError):
                    val = None
                metrics.setdefault(instance, {}).setdefault(device, {})[field] = val
        return metrics

    # ------------------------------------------------------------------
    # Main collection entry point
    # ------------------------------------------------------------------
    def collect(self, instance_map: Dict[str, str]) -> dict:
        """
        Collect full link stats for all Prometheus-monitored hosts.

        *instance_map*: ``{'10.10.10.2:9100': 'large1', ...}``

        Returns a dict suitable for ``db.set_state('link_stats', ...)``.
        """
        prom_metrics = self._collect_prom_metrics()
        result_hosts: Dict[str, Dict[str, dict]] = {}

        for instance, iface_metrics in prom_metrics.items():
            host = instance_map.get(instance, instance.split(':')[0])
            host_ip = instance.split(':')[0]
            ifaces = list(iface_metrics.keys())

            # SSH ethtool collection via Pulse worker — only for explicitly
            # configured hosts (instance_map keys). Hosts like postgresql/
            # matrix may report node_exporter metrics to Prometheus but
            # don't need link diagnostics.
            ethtool_data: Dict[str, dict] = {}
            if self.pulse.url and ifaces and instance in instance_map:
                try:
                    ethtool_data = self._ssh_batch(host_ip, ifaces)
                except Exception as e:
                    logger.warning(f'ethtool collection failed for {host} ({host_ip}): {e}')

            # Merge Prometheus metrics + ethtool data per interface
            merged: Dict[str, dict] = {}
            for iface in ifaces:
                entry: dict = {'host_ip': host_ip}
                entry.update(iface_metrics.get(iface, {}))
                eth = ethtool_data.get(iface, {})
                entry.update({k: v for k, v in eth.items() if k != 'sfp'})
                if 'sfp' in eth:
                    entry['sfp'] = eth['sfp']
                merged[iface] = entry
            result_hosts[host] = merged

        # Collect UniFi switch port stats
        unifi_switches: dict = {}
        if self.unifi:
            try:
                raw = self.unifi.get_switch_ports()
                if raw is not None:
                    now = time.time()
                    unifi_switches = self._compute_unifi_rates(raw, now)
                    # This snapshot becomes the baseline for the next cycle.
                    self._prev_unifi = raw
                    self._prev_unifi_time = now
            except Exception as e:
                logger.warning(f'UniFi switch port collection failed: {e}')

        return {
            'hosts': result_hosts,
            'unifi_switches': unifi_switches,
            'updated': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC'),
        }

    def _compute_unifi_rates(self, raw: Dict[str, dict], now: float) -> Dict[str, dict]:
        """Compute per-port byte/error rates from delta against previous snapshot."""
        dt = now - self._prev_unifi_time if self._prev_unifi_time > 0 else 0

        def rate(new_val: int, old_val: int) -> Optional[float]:
            # No previous snapshot yet → no rate; negative deltas
            # (counter reset) are clamped to 0.
            if dt <= 0:
                return None
            return max(0.0, (new_val - old_val) / dt)

        result: Dict[str, dict] = {}
        for sw_name, sw_data in raw.items():
            prev_ports = self._prev_unifi.get(sw_name, {}).get('ports', {})
            merged_ports: Dict[str, dict] = {}
            for pname, d in sw_data['ports'].items():
                entry = dict(d)
                prev = prev_ports.get(pname, {})
                entry['tx_bytes_rate'] = rate(d['tx_bytes'], prev.get('tx_bytes', 0))
                entry['rx_bytes_rate'] = rate(d['rx_bytes'], prev.get('rx_bytes', 0))
                entry['tx_errs_rate'] = rate(d['tx_errors'], prev.get('tx_errors', 0))
                entry['rx_errs_rate'] = rate(d['rx_errors'], prev.get('rx_errors', 0))
                entry['tx_drops_rate'] = rate(d['tx_dropped'], prev.get('tx_dropped', 0))
                entry['rx_drops_rate'] = rate(d['rx_dropped'], prev.get('rx_dropped', 0))
                merged_ports[pname] = entry
            result[sw_name] = {'ip': sw_data['ip'], 'model': sw_data['model'], 'ports': merged_ports}
        return result


# --------------------------------------------------------------------------
# Helpers
# --------------------------------------------------------------------------

def ping(ip: str, count: int = 3, timeout: int = 2) -> bool:
    """Return True when the system ``ping`` command exits successfully."""
    try:
        proc = subprocess.run(
            ['ping', '-c', str(count), '-W', str(timeout), ip],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=30,
        )
    except Exception:
        return False
    return proc.returncode == 0


def _now_utc() -> str:
    """Current UTC time formatted as 'YYYY-MM-DD HH:MM:SS UTC'."""
    return datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')


# --------------------------------------------------------------------------
# Monitor
# --------------------------------------------------------------------------

CLUSTER_NAME = 'proxmox-cluster'


class NetworkMonitor:
    """Main polling daemon: collects snapshots, processes alerts, and
    files tickets with dedup/suppression support."""

    def __init__(self):
        with open('config.json') as f:
            self.cfg = json.load(f)

        prom_url = self.cfg['prometheus']['url']
        self.prom = PrometheusClient(prom_url)
        self.unifi = UnifiClient(self.cfg['unifi'])
        self.tickets = TicketClient(self.cfg.get('ticket_api', {}))
        self.link_stats = LinkStatsCollector(self.cfg, self.prom, self.unifi)

        mon = self.cfg.get('monitor', {})
        self.poll_interval = mon.get('poll_interval', 120)
        self.fail_thresh = mon.get('failure_threshold', 2)
        self.cluster_thresh = mon.get('cluster_threshold', 3)
        self.cluster_name = mon.get('cluster_name', CLUSTER_NAME)

        # Build Prometheus instance → hostname lookup
        self._instance_map: Dict[str, str] = {
            h['prometheus_instance']: h['name']
            for h in self.cfg.get('hosts', [])
            if 'prometheus_instance' in h
        }

    def _hostname(self, instance: str) -> str:
        """Map a Prometheus instance label to a configured hostname."""
        return self._instance_map.get(instance, instance.split(':')[0])

    # ------------------------------------------------------------------
    # Interface monitoring (Prometheus)
    # ------------------------------------------------------------------
    def _process_interfaces(self, states: Dict[str, Dict[str, bool]],
                            suppressions: list) -> None:
        """Diff link states against the stored baseline; raise/resolve
        events. Baseline values are 'up' or 'initial_down' (down at first
        observation — never alerted on)."""
        baseline = db.get_baseline()
        new_baseline = {k: dict(v) for k, v in baseline.items()}
        # Only count hosts with genuine regressions (UP→DOWN) toward cluster threshold
        hosts_with_regression: List[str] = []

        for instance, ifaces in states.items():
            if instance not in self._instance_map:
                continue  # skip unconfigured Prometheus instances
            host = self._hostname(instance)
            new_baseline.setdefault(host, {})
            host_has_regression = False

            for iface, is_up in ifaces.items():
                prev = baseline.get(host, {}).get(iface)  # 'up', 'initial_down', or None
                if is_up:
                    new_baseline[host][iface] = 'up'
                    db.resolve_event('interface_down', host, iface)
                elif prev is None:
                    # First observation is down – could be unused port, don't alert
                    new_baseline[host][iface] = 'initial_down'
                elif prev == 'initial_down':
                    # Persistently down since first observation – no alert
                    pass
                else:  # prev == 'up'
                    # Regression: was UP, now DOWN
                    host_has_regression = True
                    sup = (
                        db.check_suppressed(suppressions, 'interface', host, iface)
                        or db.check_suppressed(suppressions, 'host', host)
                    )
                    event_id, is_new, consec = db.upsert_event(
                        'interface_down', 'critical', 'prometheus', host, iface,
                        f'Interface {iface} on {host} went link-down ({_now_utc()})',
                    )
                    if not sup and consec >= self.fail_thresh:
                        self._ticket_interface(event_id, is_new, host, iface, consec)

            if host_has_regression:
                hosts_with_regression.append(host)

        db.set_baseline(new_baseline)

        # Cluster-wide check – only genuine regressions count
        if len(hosts_with_regression) >= self.cluster_thresh:
            sup = db.check_suppressed(suppressions, 'all', '')
            event_id, is_new, consec = db.upsert_event(
                'cluster_network_issue', 'critical', 'prometheus',
                self.cluster_name, '',
                f'{len(hosts_with_regression)} hosts reporting simultaneous interface failures: '
                f'{", ".join(hosts_with_regression)}',
            )
            if not sup and is_new:
                title = (
                    f'[{self.cluster_name}][auto][production][issue][network][cluster-wide] '
                    f'Multiple hosts reporting interface failures'
                )
                desc = (
                    f'Cluster Network Alert\n{"=" * 40}\n\n'
                    f'Affected hosts: {", ".join(hosts_with_regression)}\n'
                    f'Detected: {_now_utc()}\n\n'
                    f'{len(hosts_with_regression)} Proxmox hosts simultaneously reported '
                    f'interface regressions (link-down on interfaces previously known UP).\n'
                    f'This likely indicates a switch or upstream network failure.\n\n'
                    f'Please check the core and management switches immediately.'
                )
                tid = self.tickets.create(title, desc, priority='1')
                if tid:
                    db.set_ticket_id(event_id, tid)
        else:
            db.resolve_event('cluster_network_issue', self.cluster_name, '')

    def _ticket_interface(
        self, event_id: int, is_new: bool, host: str, iface: str, consec: int
    ) -> None:
        """File a single-interface link-down ticket and record its id."""
        title = (
            f'[{host}][auto][production][issue][network][single-node] '
            f'Interface {iface} link-down'
        )
        desc = (
            f'Network Interface Alert\n{"=" * 40}\n\n'
            f'Host: {host}\n'
            f'Interface: {iface}\n'
            f'Detected: {_now_utc()}\n'
            f'Consecutive check failures: {consec}\n\n'
            f'Interface {iface} on {host} is reporting link-down state via '
            f'Prometheus node_exporter.\n\n'
            f'Note: {host} may still be reachable via its other network interface.\n'
            f'Please inspect the cable/SFP/switch port for {host}/{iface}.'
        )
        tid = self.tickets.create(title, desc, priority='2')
        if tid and is_new:
            db.set_ticket_id(event_id, tid)

    # ------------------------------------------------------------------
    # UniFi device monitoring
    # ------------------------------------------------------------------
    def _process_unifi(self, devices: Optional[List[dict]],
                       suppressions: list) -> None:
        """Raise/resolve offline events for UniFi devices."""
        if devices is None:
            logger.warning('UniFi API unreachable this cycle')
            return
        for d in devices:
            name = d['name']
            if not d['connected']:
                sup = db.check_suppressed(suppressions, 'unifi_device', name)
                event_id, is_new, consec = db.upsert_event(
                    'unifi_device_offline', 'critical', 'unifi',
                    name, d.get('type', ''),
                    f'UniFi {name} ({d.get("ip","")}) offline ({_now_utc()})',
                )
                if not sup and consec >= self.fail_thresh:
                    self._ticket_unifi(event_id, is_new, d)
            else:
                db.resolve_event('unifi_device_offline', name, d.get('type', ''))

    def _ticket_unifi(self, event_id: int, is_new: bool, device: dict) -> None:
        """File an offline-device ticket for a UniFi device."""
        name = device['name']
        title = (
            f'[{name}][auto][production][issue][network][single-node] '
            f'UniFi device offline'
        )
        desc = (
            f'UniFi Device Alert\n{"=" * 40}\n\n'
            f'Device: {name}\n'
            f'Type: {device.get("type","unknown")}\n'
            f'Model: {device.get("model","")}\n'
            f'Last Known IP: {device.get("ip","unknown")}\n'
            f'Detected: {_now_utc()}\n\n'
            f'The UniFi device {name} is offline per the UniFi controller.\n'
            f'Please check power and cable connectivity.'
        )
        tid = self.tickets.create(title, desc, priority='2')
        if tid and is_new:
            db.set_ticket_id(event_id, tid)

    # ------------------------------------------------------------------
    # Ping-only hosts (no node_exporter)
    # ------------------------------------------------------------------
    def _process_ping_hosts(self, suppressions: list) -> None:
        """Raise/resolve reachability events for configured ping-only hosts."""
        for h in self.cfg.get('monitor', {}).get('ping_hosts', []):
            name, ip = h['name'], h['ip']
            if not ping(ip):
                sup = db.check_suppressed(suppressions, 'host', name)
                event_id, is_new, consec = db.upsert_event(
                    'host_unreachable', 'critical', 'ping', name, ip,
                    f'Host {name} ({ip}) unreachable via ping ({_now_utc()})',
                )
                if not sup and consec >= self.fail_thresh:
                    self._ticket_unreachable(event_id, is_new, name, ip, consec)
            else:
                db.resolve_event('host_unreachable', name, ip)

    def _ticket_unreachable(
        self, event_id: int, is_new: bool, name: str, ip: str, consec: int
    ) -> None:
        """File a host-unreachable ticket for a ping-only host."""
        title = (
            f'[{name}][auto][production][issue][network][single-node] '
            f'Host unreachable'
        )
        desc = (
            f'Host Reachability Alert\n{"=" * 40}\n\n'
            f'Host: {name}\n'
            f'IP: {ip}\n'
            f'Detected: {_now_utc()}\n'
            f'Consecutive check failures: {consec}\n\n'
            f'Host {name} ({ip}) is not responding to ping from the Gandalf monitor.\n'
            f'This host does not have a Prometheus node_exporter, so interface-level '
            f'detail is unavailable.\n\n'
            f'Please check the host power, management interface, and network connectivity.'
        )
        tid = self.tickets.create(title, desc, priority='2')
        if tid and is_new:
            db.set_ticket_id(event_id, tid)

    # ------------------------------------------------------------------
    # Snapshot collection (for dashboard)
    # ------------------------------------------------------------------
    def _collect_snapshot(self) -> dict:
        """Build the dashboard snapshot: per-host interface status plus the
        raw UniFi device list."""
        iface_states = self.prom.get_interface_states()
        unifi_devices = self.unifi.get_devices() or []

        hosts = {}
        for instance, ifaces in iface_states.items():
            if instance not in self._instance_map:
                continue  # skip Prometheus instances not in config (e.g. LXC app servers)
            host = self._hostname(instance)
            phys = dict(ifaces)
            up_count = sum(1 for v in phys.values() if v)
            total = len(phys)
            # All up (or nothing to report) → up; none up → down; else degraded.
            if total == 0 or up_count == total:
                status = 'up'
            elif up_count == 0:
                status = 'down'
            else:
                status = 'degraded'
            hosts[host] = {
                'ip': instance.split(':')[0],
                'interfaces': {k: ('up' if v else 'down') for k, v in phys.items()},
                'status': status,
                'source': 'prometheus',
            }

        for h in self.cfg.get('monitor', {}).get('ping_hosts', []):
            name, ip = h['name'], h['ip']
            reachable = ping(ip, count=1, timeout=2)
            hosts[name] = {
                'ip': ip,
                'interfaces': {},
                'status': 'up' if reachable else 'down',
                'source': 'ping',
            }

        return {
            'hosts': hosts,
            'unifi': unifi_devices,
            'updated': datetime.utcnow().isoformat(),
        }

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------
    def run(self) -> None:
        """Poll forever: snapshot, link stats, alert processing, housekeeping.

        Each cycle is wrapped in a broad try/except so a single bad cycle
        never kills the daemon.
        """
        logger.info(
            f'Gandalf monitor started – poll_interval={self.poll_interval}s '
            f'fail_thresh={self.fail_thresh}'
        )
        while True:
            try:
                logger.info('Starting network check cycle')

                # 1. Collect and store snapshot for dashboard
                snapshot = self._collect_snapshot()
                db.set_state('network_snapshot', snapshot)
                db.set_state('last_check', _now_utc())

                # 2. Collect link stats (ethtool + traffic metrics)
                try:
                    link_data = self.link_stats.collect(self._instance_map)
                    db.set_state('link_stats', link_data)
                except Exception as e:
                    logger.error(f'Link stats collection failed: {e}', exc_info=True)

                # 3. Process alerts (separate Prometheus call for fresh data)
                # Load suppressions once per cycle to avoid N*M DB queries
                suppressions = db.get_active_suppressions()
                iface_states = self.prom.get_interface_states()
                self._process_interfaces(iface_states, suppressions)
                unifi_devices = self.unifi.get_devices()
                self._process_unifi(unifi_devices, suppressions)
                self._process_ping_hosts(suppressions)

                # Housekeeping: deactivate expired suppressions and purge old resolved events
                db.cleanup_expired_suppressions()
                db.purge_old_resolved_events(days=90)

                logger.info('Network check cycle complete')
            except Exception as e:
                logger.error(f'Monitor loop error: {e}', exc_info=True)

            time.sleep(self.poll_interval)


if __name__ == '__main__':
    monitor = NetworkMonitor()
    monitor.run()