"""Gandalf – Link Diagnostics module.

Runs a comprehensive SSH-based diagnostic against a server NIC and analyses
the result against switch port data to surface root causes.

Executed in a background thread; results stored in _diag_jobs (app.py).
"""

import re
import shlex
import time
import logging
from typing import Dict, List, Optional, Tuple

logger = logging.getLogger('gandalf.diagnose')

# sysfs counters collected per interface
_SYSFS_STATS = [
    'rx_bytes', 'tx_bytes', 'rx_errors', 'tx_errors',
    'rx_dropped', 'tx_dropped', 'rx_crc_errors', 'rx_frame_errors',
    'rx_fifo_errors', 'tx_carrier_errors', 'collisions', 'rx_missed_errors',
]

# Sentinel line the remote script prints before each section: "=== name ==="
_SECTION_RE = re.compile(r'^=== (.+?) ===$')


class DiagnosticsRunner:
    """Build and run a link diagnostic against a server NIC via PulseClient."""

    def __init__(self, pulse_client):
        self.pulse = pulse_client

    # ------------------------------------------------------------------
    # SSH command builder
    # ------------------------------------------------------------------

    @staticmethod
    def build_ssh_command(host_ip: str, iface: str) -> str:
        """Return a single-line SSH command that collects all diagnostic data.

        The remote script prints every data source under an ``=== name ===``
        sentinel line so that :meth:`parse_output` can split the combined
        stdout back into sections.

        Args:
            host_ip: Address of the server to connect to (as ``root``).
            iface: Name of the NIC to inspect on that server.

        Returns:
            A shell command line. ``iface`` is quoted for the remote shell and
            the whole remote script is quoted for the local shell, so unusual
            interface names cannot escape either level of quoting.
        """
        q = shlex.quote(iface)
        ip_q = shlex.quote(host_ip)
        sysfs_loop = '; '.join(
            f'echo "{s}:$(cat /sys/class/net/{q}/statistics/{s} 2>/dev/null || echo 0)"'
            for s in _SYSFS_STATS
        )
        remote_cmd = (
            f'echo "=== carrier ===";'
            f' cat /sys/class/net/{q}/carrier 2>/dev/null || echo "?";'
            f' echo "=== operstate ===";'
            f' cat /sys/class/net/{q}/operstate 2>/dev/null || echo "?";'
            f' echo "=== sysfs_stats ===";'
            f' {sysfs_loop};'
            f' echo "=== carrier_changes ===";'
            f' cat /sys/class/net/{q}/carrier_changes 2>/dev/null || echo "0";'
            f' echo "=== ethtool ===";'
            f' ethtool {q} 2>/dev/null;'
            f' echo "=== ethtool_driver ===";'
            f' ethtool -i {q} 2>/dev/null;'
            f' echo "=== ethtool_pause ===";'
            f' ethtool -a {q} 2>/dev/null;'
            f' echo "=== ethtool_ring ===";'
            f' ethtool -g {q} 2>/dev/null;'
            f' echo "=== ethtool_stats ===";'
            f' ethtool -S {q} 2>/dev/null;'
            f' echo "=== ethtool_dom ===";'
            f' ethtool -m {q} 2>/dev/null;'
            f' echo "=== ip_link ===";'
            f' ip -s link show {q} 2>/dev/null;'
            f' echo "=== ip_addr ===";'
            f' ip addr show {q} 2>/dev/null;'
            f' echo "=== ip_route ===";'
            f' ip route show dev {q} 2>/dev/null;'
            f' echo "=== dmesg ===";'
            # Fixed-string match: "." in VLAN names (eth0.100) must not act as a regex wildcard
            f' dmesg 2>/dev/null | grep -F -e {q} | tail -50;'
            f' echo "=== lldpctl ===";'
            # Restrict to this interface; bare lldpctl lists the neighbours of every port
            f' lldpctl {q} 2>/dev/null || echo "lldpd not running";'
            f' echo "=== end ==="'
        )
        # NOTE(review): StrictHostKeyChecking=no auto-accepts unknown host keys; kept as-is.
        # shlex.quote(remote_cmd) equals the old hand-written '...' wrapping whenever
        # the script has no single quotes, and escapes correctly when it does.
        return (
            'ssh -o StrictHostKeyChecking=no -o ConnectTimeout=5 '
            '-o BatchMode=yes -o LogLevel=ERROR '
            '-o ServerAliveInterval=10 -o ServerAliveCountMax=2 '
            f'root@{ip_q} {shlex.quote(remote_cmd)}'
        )

    # ------------------------------------------------------------------
    # Main entry point
    # ------------------------------------------------------------------

    def run(self, host_ip: str, host_name: str, iface: str,
            switch_port_data: dict) -> dict:
        """Execute diagnostics and return structured result dict.

        Returns ``{'status': 'error', ...}`` when Pulse yields no output.
        Otherwise returns ``{'status': 'done', ...}`` carrying the parsed
        ``sections``, the ``health`` analysis, the Pulse execution id/URL
        (when known) and the ``switch_port`` data passed in.
        """
        cmd = self.build_ssh_command(host_ip, iface)
        logger.info('Running link diagnostic: %s/%s via Pulse', host_name, iface)

        # Reset so a stale id from an earlier call is never attributed to this run
        self.pulse.last_execution_id = None
        output = self.pulse.run_command(cmd)
        execution_id = getattr(self.pulse, 'last_execution_id', None)

        if output is None:
            return {
                'status': 'error',
                'error': 'Pulse command failed or timed out',
                'host': host_name,
                'iface': iface,
                'pulse_execution_id': execution_id,
            }

        sections = self.parse_output(output)
        health = self.analyze(sections, switch_port_data)

        pulse_url = None
        if execution_id:
            # The client may lack a url attribute or hold None; both mean "no link"
            base = (getattr(self.pulse, 'url', None) or '').rstrip('/')
            pulse_url = f'{base}/executions/{execution_id}' if base else None

        return {
            'status': 'done',
            'host': host_name,
            'iface': iface,
            'sections': sections,
            'health': health,
            'pulse_execution_id': execution_id,
            'pulse_url': pulse_url,
            'switch_port': switch_port_data,
        }

    # ------------------------------------------------------------------
    # Output parser (splits on === SECTION_NAME === sentinels)
    # ------------------------------------------------------------------

    @staticmethod
    def parse_output(raw: str) -> dict:
        """Split raw remote stdout on sentinel lines and parse every section.

        Text after ``=== end ===`` (or before the first sentinel) is ignored.
        A missing section falls back to an empty/default value.
        """
        sections: Dict[str, str] = {}
        current: Optional[str] = None
        buf: List[str] = []
        for line in raw.splitlines():
            m = _SECTION_RE.match(line.strip())
            if m:
                if current:
                    sections[current] = '\n'.join(buf).strip()
                name = m.group(1)
                # "end" closes the last section; subsequent lines are discarded
                current = None if name == 'end' else name
                buf = []
            elif current:
                buf.append(line)
        # Flush a trailing section when the "end" sentinel never arrived
        if current:
            sections[current] = '\n'.join(buf).strip()

        parsed: dict = {}

        # Simple string sections
        parsed['carrier'] = sections.get('carrier', '?').strip()
        parsed['operstate'] = sections.get('operstate', '?').strip()

        # carrier_changes: None signals "present but not an integer"
        cc_raw = sections.get('carrier_changes', '0').strip()
        try:
            parsed['carrier_changes'] = int(cc_raw)
        except ValueError:
            parsed['carrier_changes'] = None

        # Structured sections
        parsed['sysfs_stats'] = DiagnosticsRunner.parse_sysfs_stats(sections.get('sysfs_stats', ''))
        parsed['ethtool'] = DiagnosticsRunner.parse_ethtool(sections.get('ethtool', ''))
        parsed['ethtool_driver'] = DiagnosticsRunner.parse_ethtool_driver(sections.get('ethtool_driver', ''))
        parsed['ethtool_pause'] = DiagnosticsRunner.parse_ethtool_pause(sections.get('ethtool_pause', ''))
        parsed['ethtool_ring'] = DiagnosticsRunner.parse_ethtool_ring(sections.get('ethtool_ring', ''))
        parsed['ethtool_stats'] = DiagnosticsRunner.parse_nic_stats(sections.get('ethtool_stats', ''))
        parsed['ethtool_dom'] = DiagnosticsRunner.parse_ethtool_dom(sections.get('ethtool_dom', ''))
        parsed['ip_link'] = DiagnosticsRunner.parse_ip_link(sections.get('ip_link', ''))
        parsed['ip_addr'] = sections.get('ip_addr', '').strip()
        parsed['ip_route'] = sections.get('ip_route', '').strip()
        parsed['dmesg'] = DiagnosticsRunner.parse_dmesg(sections.get('dmesg', ''))
        parsed['lldpctl'] = DiagnosticsRunner.parse_lldpctl(sections.get('lldpctl', ''))

        return parsed

    # ------------------------------------------------------------------
    # Individual parsers
    # ------------------------------------------------------------------

    @staticmethod
    def parse_sysfs_stats(text: str) -> dict:
        """Parse ``name:value`` lines for the counters listed in _SYSFS_STATS.

        Unknown names are ignored; non-integer values are recorded as 0.
        """
        result: dict = {}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            if key in _SYSFS_STATS:
                try:
                    result[key] = int(val)
                except ValueError:
                    result[key] = 0
        return result

    @staticmethod
    def parse_ethtool(text: str) -> dict:
        """Parse plain ``ethtool <iface>`` output.

        Extracts speed (Mb/s, or None when reported as unknown), duplex, port
        type, auto-negotiation and link-detected flags, plus the lines of the
        ``Supported link modes`` field (including wrapped continuation lines).
        """
        data: dict = {}
        in_supported = False
        for line in text.splitlines():
            if ':' not in line:
                # ethtool wraps long link-mode lists onto indented, colon-less
                # continuation lines; keep them with "Supported link modes".
                stripped = line.strip()
                if in_supported and stripped:
                    data.setdefault('supported_modes', []).append(stripped)
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            in_supported = 'Supported link modes' in key
            if key == 'Speed':
                m = re.match(r'(\d+)\s*Mb/s', val)
                if m:
                    data['speed_mbps'] = int(m.group(1))
                elif 'unknown' in val.lower():
                    data['speed_mbps'] = None
            elif key == 'Duplex':
                data['duplex'] = val.lower()
            elif key == 'Port':
                data['port_type'] = val
            elif key == 'Auto-negotiation':
                data['auto_neg'] = (val.lower() == 'on')
            elif key == 'Link detected':
                data['link_detected'] = (val.lower() == 'yes')
            elif in_supported:
                data.setdefault('supported_modes', []).append(val)
        return data

    @staticmethod
    def parse_ethtool_driver(text: str) -> dict:
        """Parse ``ethtool -i`` output (driver, version, firmware, bus info)."""
        data: dict = {}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            if key == 'driver':
                data['driver'] = val
            elif key == 'version':
                data['version'] = val
            elif key == 'firmware-version':
                data['firmware_version'] = val
            elif key == 'bus-info':
                data['bus_info'] = val
        return data

    @staticmethod
    def parse_ethtool_pause(text: str) -> dict:
        """Parse ``ethtool -a`` output into RX/TX pause flags (default False)."""
        data = {'rx_pause': False, 'tx_pause': False}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip().lower()
            if key == 'RX':
                data['rx_pause'] = (val == 'on')
            elif key == 'TX':
                data['tx_pause'] = (val == 'on')
        return data

    @staticmethod
    def parse_ethtool_ring(text: str) -> dict:
        """Parse ``ethtool -g`` output into current and maximum RX/TX ring sizes.

        Values before "Current hardware settings" are treated as maximums; the
        first integer RX*/TX* entry in each group wins.
        """
        data: dict = {}
        in_current = False
        for line in text.splitlines():
            if 'Current hardware settings' in line:
                in_current = True
                continue
            if 'Pre-set maximums' in line:
                in_current = False
                continue
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            try:
                v = int(val)
            except ValueError:
                continue
            rx_key, tx_key = ('rx_current', 'tx_current') if in_current else ('rx_max', 'tx_max')
            if 'RX' in key and rx_key not in data:
                data[rx_key] = v
            elif 'TX' in key and tx_key not in data:
                data[tx_key] = v
        return data

    @staticmethod
    def parse_nic_stats(text: str) -> dict:
        """Parse ethtool -S output into {key: int} dict."""
        data: dict = {}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            try:
                data[key.strip()] = int(val.strip())
            except ValueError:
                pass
        return data

    @staticmethod
    def _parse_optical_power(val: str) -> Tuple[Optional[float], Optional[float]]:
        """Parse an ethtool power reading such as ``0.5000 mW / -3.01 dBm``.

        Returns ``(mW, dBm)``; either is None when absent or unparseable.
        For a 0 mW reading ethtool prints ``-inf dBm``; that yields dBm=None
        while mW=0.0 is still returned so callers can detect "no light".
        """
        mw: Optional[float] = None
        dbm: Optional[float] = None
        m = re.search(r'([\d.]+)\s*mW', val)
        if m:
            try:
                mw = float(m.group(1))
            except ValueError:
                pass
        m = re.search(r'/\s*([-\d.]+)\s*dBm', val)
        if m:
            try:
                dbm = float(m.group(1))
            except ValueError:
                pass
        return mw, dbm

    @staticmethod
    def parse_ethtool_dom(text: str) -> dict:
        """Parse ethtool -m (SFP DOM) output.

        Returns {} when the module reports no DOM data. Optical power is
        stored both as ``*_power_dbm`` (only when finite) and ``*_power_mw``.
        """
        if not text:
            return {}
        lower = text.lower()
        if any(s in lower for s in ('cannot get', 'not supported', 'no sfp', 'operation not supported')):
            return {}

        data: dict = {}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            key_l = key.lower()
            if key == 'Vendor name':
                data['vendor'] = val
            elif key == 'Vendor PN':
                data['part_no'] = val
            elif key == 'Identifier':
                m = re.search(r'\((.+?)\)', val)
                if m:
                    data['sfp_type'] = m.group(1)
            elif key == 'Connector':
                m = re.search(r'\((.+?)\)', val)
                if m:
                    data['connector'] = m.group(1)
            elif key == 'Laser wavelength':
                m = re.match(r'(\d+)', val)
                if m:
                    data['wavelength_nm'] = int(m.group(1))
            elif key == 'Laser bias current':
                m = re.match(r'([\d.]+)\s+mA', val)
                if m:
                    data['bias_ma'] = float(m.group(1))
            elif key == 'Laser output power':
                mw, dbm = DiagnosticsRunner._parse_optical_power(val)
                if mw is not None:
                    data['tx_power_mw'] = mw
                if dbm is not None:
                    data['tx_power_dbm'] = dbm
            elif 'receiver' in key_l and ('power' in key_l or 'optical' in key_l):
                mw, dbm = DiagnosticsRunner._parse_optical_power(val)
                if mw is not None:
                    data['rx_power_mw'] = mw
                if dbm is not None:
                    data['rx_power_dbm'] = dbm
            elif key == 'Module temperature':
                # Optional sign: modules in cold environments report below 0 °C
                m = re.match(r'(-?[\d.]+)\s+degrees', val)
                if m:
                    data['temp_c'] = float(m.group(1))
            elif key == 'Module voltage':
                m = re.match(r'([\d.]+)\s+V', val)
                if m:
                    data['voltage_v'] = float(m.group(1))
        return data

    @staticmethod
    def parse_ip_link(text: str) -> dict:
        """Parse ip -s link show output for basic link state and counters."""
        data: dict = {}
        lines = text.splitlines()
        for i, line in enumerate(lines):
            # MTU and state: "2: eth0: <...> mtu 1500 ... state UP ..."
            m = re.search(r'mtu\s+(\d+)', line)
            if m:
                data['mtu'] = int(m.group(1))
            m = re.search(r'state\s+(\S+)', line)
            if m:
                data['state'] = m.group(1).lower()

            # Counter values sit on the line after the "RX:" / "TX:" header
            label = line.strip()
            for prefix, tag in (('RX:', 'rx'), ('TX:', 'tx')):
                if label.startswith(prefix) and i + 1 < len(lines):
                    vals = lines[i + 1].split()
                    if len(vals) >= 5:
                        try:
                            data[f'ip_{tag}_bytes'] = int(vals[0])
                            data[f'ip_{tag}_packets'] = int(vals[1])
                            data[f'ip_{tag}_errors'] = int(vals[2])
                            data[f'ip_{tag}_dropped'] = int(vals[3])
                        except (ValueError, IndexError):
                            pass
        return data

    @staticmethod
    def parse_dmesg(text: str) -> List[dict]:
        """Parse dmesg lines into [{timestamp, msg, severity}].

        Severity is keyword-based: 'error' for error/fail/reset/panic/oops/
        hung/timeout, 'warn' for warn/drop/lost/miss, otherwise 'info'.
        """
        events = []
        for line in text.splitlines():
            if not line.strip():
                continue
            # Extract timestamp from "[ 123.456789] message"
            m = re.match(r'^\[\s*([\d.]+)\]\s*(.*)', line)
            if m:
                ts, msg = m.group(1), m.group(2)
            else:
                ts, msg = '', line

            lower = msg.lower()
            if any(w in lower for w in ('error', 'fail', 'reset', 'panic', 'oops', 'hung', 'timeout')):
                severity = 'error'
            elif any(w in lower for w in ('warn', 'drop', 'lost', 'miss')):
                severity = 'warn'
            else:
                severity = 'info'
            events.append({'timestamp': ts, 'msg': msg, 'severity': severity})
        return events

    @staticmethod
    def parse_lldpctl(text: str) -> dict:
        """Extract neighbor info from lldpctl output.

        Returns ``{'available': False}`` when lldpd is unavailable; otherwise
        the first SysName/PortID/ChassisID seen for the queried interface.
        """
        if not text or 'lldpd not running' in text or 'not found' in text.lower():
            return {'available': False}

        data: dict = {'available': True}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            # First match wins for every field so all values describe one neighbour
            if 'SysName' in key and 'neighbor_system' not in data:
                data['neighbor_system'] = val
            elif 'PortID' in key and 'neighbor_port' not in data:
                data['neighbor_port'] = val
            elif 'ChassisID' in key and 'neighbor_chassis_id' not in data:
                data['neighbor_chassis_id'] = val
        return data

    # ------------------------------------------------------------------
    # Health analysis
    # ------------------------------------------------------------------

    @staticmethod
    def analyze(sections: dict, switch_port_data: dict) -> dict:
        """Return {issues: [...], warnings: [...], info: [...]} health analysis.

        Each entry is ``{'code': str, 'message': str}``. ``switch_port_data``
        may be None or empty, in which case switch-side comparisons are skipped.
        """
        issues: List[dict] = []
        warnings: List[dict] = []
        info: List[dict] = []
        switch_port_data = switch_port_data or {}

        def add(collection, code, message):
            collection.append({'code': code, 'message': message})

        carrier = sections.get('carrier', '?')
        eth = sections.get('ethtool', {})
        sysfs = sections.get('sysfs_stats', {})
        dom = sections.get('ethtool_dom', {})
        dmesg = sections.get('dmesg', [])
        lldp = sections.get('lldpctl', {})
        cc = sections.get('carrier_changes')

        # Physical carrier
        if carrier == '0':
            add(issues, 'NO_CARRIER',
                'No physical carrier — cable/SFP disconnected or switch port disabled')
        elif eth.get('link_detected') is False:
            add(issues, 'LINK_NOT_DETECTED',
                'NIC does not detect link signal despite carrier sysfs reading non-zero')

        # Duplex
        if eth.get('duplex') == 'half':
            add(issues, 'HALF_DUPLEX',
                'Half-duplex detected — likely duplex mismatch; force full-duplex on both ends')

        # Speed mismatch (switch vs server NIC)
        sw_speed = switch_port_data.get('speed_mbps', 0) or 0
        srv_speed = eth.get('speed_mbps', 0) or 0
        if sw_speed > 0 and srv_speed > 0 and sw_speed != srv_speed:
            add(warnings, 'SPEED_MISMATCH',
                f'Speed mismatch: switch reports {sw_speed} Mbps, NIC reports {srv_speed} Mbps')

        # SFP DOM power levels. A 0 mW reading has no finite dBm value, so it
        # is detected via the mW field instead.
        rx_dbm = dom.get('rx_power_dbm')
        tx_dbm = dom.get('tx_power_dbm')
        if rx_dbm is not None:
            if rx_dbm < -25:
                add(issues, 'SFP_RX_CRITICAL',
                    f'RX power critically low ({rx_dbm:.2f} dBm) — fiber not connected or SFP failed')
            elif rx_dbm < -18:
                add(warnings, 'SFP_RX_LOW',
                    f'RX power low ({rx_dbm:.2f} dBm) — check fiber cleanliness and SFP seating')
        elif dom.get('rx_power_mw') == 0:
            add(issues, 'SFP_RX_CRITICAL',
                'RX power is zero (0 mW, -inf dBm) — no light received; fiber not connected or SFP failed')
        if tx_dbm is not None:
            if tx_dbm < -10:
                add(warnings, 'SFP_TX_LOW',
                    f'TX power low ({tx_dbm:.2f} dBm) — SFP may be failing or requires cleaning')
        elif dom.get('tx_power_mw') == 0:
            add(warnings, 'SFP_TX_LOW',
                'TX power is zero (0 mW, -inf dBm) — laser off; SFP may be failing or disabled')

        # Carrier changes (flapping)
        if cc is not None:
            if cc > 100:
                add(issues, 'CARRIER_FLAPPING',
                    f'Link has flapped {cc} times — severe physical instability')
            elif cc > 20:
                add(warnings, 'CARRIER_FLAPS',
                    f'Link has flapped {cc} times — intermittent physical issue')

        # CRC errors
        crc = sysfs.get('rx_crc_errors', 0) or 0
        if crc > 100:
            add(issues, 'CRC_ERRORS_HIGH',
                f'High CRC error count ({crc}) — dirty fiber/connector or cable damage')
        elif crc > 10:
            add(warnings, 'CRC_ERRORS_LOW',
                f'CRC errors present ({crc}) — cable or SFP quality issue')

        # Kernel events
        err_events = [e for e in dmesg if e['severity'] == 'error']
        if err_events:
            add(warnings, 'KERNEL_EVENTS',
                f'{len(err_events)} recent kernel error event(s) for this interface in dmesg')

        # LLDP validation: substring match in either direction tolerates
        # short vs fully-qualified system names.
        if lldp.get('available'):
            sw_lldp = switch_port_data.get('lldp') or {}
            sw_system = (sw_lldp.get('system_name') or '').lower()
            srv_neighbor = (lldp.get('neighbor_system') or '').lower()
            if sw_system and srv_neighbor and sw_system not in srv_neighbor and srv_neighbor not in sw_system:
                add(warnings, 'LLDP_MISMATCH',
                    f'LLDP mismatch: switch sees "{sw_lldp.get("system_name")}" but '
                    f'server lldpctl sees "{lldp.get("neighbor_system")}" — cross-cabled port?')
        else:
            add(info, 'LLDP_MISSING',
                'lldpd not running on server — install lldpd for full path validation')

        return {'issues': issues, 'warnings': warnings, 'info': info}