Files
gandalf/diagnose.py
Jared Vititoe b1dd5f9cad feat: deep link diagnostics via Pulse SSH
Adds comprehensive per-port link troubleshooting triggered from the
Inspector panel when a port has an LLDP-identified server counterpart.

- diagnose.py: DiagnosticsRunner with 15-section SSH command (carrier,
  operstate, sysfs counters, ethtool, ethtool -i/-a/-g/-S/-m, ip link,
  ip addr, ip route, dmesg, lldpctl); parsers for all sections; health
  analyzer with 14 check codes (NO_CARRIER, HALF_DUPLEX, SPEED_MISMATCH,
  SFP_RX_CRITICAL, CARRIER_FLAPPING, CRC_ERRORS_HIGH, LLDP_MISMATCH, etc.)
- monitor.py: PulseClient now tracks last_execution_id so callers can
  link back to the raw Pulse execution URL
- app.py: POST /api/diagnose + GET /api/diagnose/<job_id> with daemon
  thread background execution and 10-minute in-memory job store
- inspector.html: "Run Link Diagnostics" button (shown only when LLDP
  host is resolvable); full results panel: health banner, physical layer,
  SFP/DOM with power bars, NIC error counters, collapsible ethtool -S,
  flow control/ring buffers, driver info, LLDP 2-col validation,
  collapsible dmesg, switch port summary, "View in Pulse" link
- style.css: all .diag-* CSS classes with terminal aesthetic

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-03 16:03:54 -05:00

547 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Gandalf Link Diagnostics module.
Runs a comprehensive SSH-based diagnostic against a server NIC and
analyses the result against switch port data to surface root causes.
Executed in a background thread; results stored in _diag_jobs (app.py).
"""
import re
import shlex
import time
import logging
from typing import Dict, List, Optional, Tuple
logger = logging.getLogger('gandalf.diagnose')

# Names of the per-interface counters that build_ssh_command reads from
# /sys/class/net/<iface>/statistics/ and that parse_sysfs_stats accepts.
_SYSFS_STATS = [
    'rx_bytes',
    'tx_bytes',
    'rx_errors',
    'tx_errors',
    'rx_dropped',
    'tx_dropped',
    'rx_crc_errors',
    'rx_frame_errors',
    'rx_fifo_errors',
    'tx_carrier_errors',
    'collisions',
    'rx_missed_errors',
]
class DiagnosticsRunner:
    """Build and run a link diagnostic against a server NIC via PulseClient.

    The runner assembles one SSH command whose output is a series of
    ``=== name ===``-delimited sections, hands it to the Pulse client,
    parses every section and derives a health verdict from the parsed
    data together with the caller-supplied switch port data.
    """

    # Base of the URL reported as ``pulse_url`` in a successful result.
    PULSE_BASE_URL = 'http://pulse.lotusguild.org'

    # ethtool prints "-inf dBm" when an optical power reading is 0 mW.
    # A real -inf cannot be carried through JSON, so a finite floor is
    # recorded instead; it is below every threshold used by analyze().
    DOM_POWER_FLOOR_DBM = -40.0

    def __init__(self, pulse_client):
        """Remember the client used to execute commands.

        The only members of ``pulse_client`` this class touches are
        ``run_command(cmd)`` and the ``last_execution_id`` attribute.
        """
        self.pulse = pulse_client

    # ------------------------------------------------------------------
    # SSH command builder
    # ------------------------------------------------------------------
    @staticmethod
    def build_ssh_command(host_ip: str, iface: str) -> str:
        """Return a single-line SSH command that collects all diagnostic data.

        Args:
            host_ip: Address of the server to log in to (as ``root``).
            iface: Name of the network interface to inspect.

        Returns:
            A shell command line. The remote script prints one
            ``=== name ===`` sentinel before each data section and a final
            ``=== end ===`` sentinel.
        """
        q = shlex.quote(iface)
        ip_q = shlex.quote(host_ip)
        net_dir = f'/sys/class/net/{q}'

        def section(name: str, command: str) -> str:
            # One sentinel line followed by the command producing the data.
            return f'echo "=== {name} ==="; {command}'

        sysfs_loop = '; '.join(
            f'echo "{s}:$(cat {net_dir}/statistics/{s} 2>/dev/null || echo 0)"'
            for s in _SYSFS_STATS
        )
        steps = [
            section('carrier', f'cat {net_dir}/carrier 2>/dev/null || echo "?"'),
            section('operstate', f'cat {net_dir}/operstate 2>/dev/null || echo "?"'),
            section('sysfs_stats', sysfs_loop),
            section('carrier_changes',
                    f'cat {net_dir}/carrier_changes 2>/dev/null || echo "0"'),
            section('ethtool', f'ethtool {q} 2>/dev/null'),
            section('ethtool_driver', f'ethtool -i {q} 2>/dev/null'),
            section('ethtool_pause', f'ethtool -a {q} 2>/dev/null'),
            section('ethtool_ring', f'ethtool -g {q} 2>/dev/null'),
            section('ethtool_stats', f'ethtool -S {q} 2>/dev/null'),
            section('ethtool_dom', f'ethtool -m {q} 2>/dev/null'),
            section('ip_link', f'ip -s link show {q} 2>/dev/null'),
            section('ip_addr', f'ip addr show {q} 2>/dev/null'),
            section('ip_route', f'ip route show dev {q} 2>/dev/null'),
            # -F: the interface name is a literal, not a regex ("eth0.100");
            # --: a name starting with "-" must not be read as an option.
            section('dmesg', f'dmesg 2>/dev/null | grep -F -- {q} | tail -50'),
            # Restrict lldpctl to this interface so that neighbours seen on
            # other NICs of the same host are not mistaken for this link's.
            section('lldpctl', f'lldpctl {q} 2>/dev/null || echo "lldpd not running"'),
            'echo "=== end ==="',
        ]
        remote_cmd = '; '.join(steps)
        # The remote script itself contains quotes whenever iface needed
        # quoting, so it must be quoted as a whole rather than wrapped in
        # literal single quotes (which an embedded quote would terminate).
        # NOTE(review): StrictHostKeyChecking=no disables host key
        # verification; kept as in the original, confirm this is intended.
        return (
            'ssh -o StrictHostKeyChecking=no -o ConnectTimeout=5 '
            f'-o LogLevel=ERROR root@{ip_q} {shlex.quote(remote_cmd)}'
        )

    # ------------------------------------------------------------------
    # Main entry point
    # ------------------------------------------------------------------
    def run(self, host_ip: str, host_name: str, iface: str,
            switch_port_data: dict) -> dict:
        """Execute diagnostics and return structured result dict.

        Args:
            host_ip: Address used for the SSH login.
            host_name: Name echoed back in the result and the log line.
            iface: Interface to diagnose.
            switch_port_data: Switch-side data for the port; passed to
                analyze() and echoed back as ``switch_port``.

        Returns:
            A dict with ``status`` ``'done'`` (plus ``sections``,
            ``health``, ``pulse_url``, ``switch_port``) or ``'error'``
            (plus ``error``) when the Pulse client returned ``None``.
        """
        cmd = self.build_ssh_command(host_ip, iface)
        logger.info('Running link diagnostic: %s/%s via Pulse', host_name, iface)

        # Clear any id left over from a previous call so that a stale
        # value is never attributed to this execution.
        self.pulse.last_execution_id = None
        output = self.pulse.run_command(cmd)
        execution_id = getattr(self.pulse, 'last_execution_id', None)

        if output is None:
            return {
                'status': 'error',
                'error': 'Pulse command failed or timed out',
                'host': host_name,
                'iface': iface,
                'pulse_execution_id': execution_id,
            }

        sections = self.parse_output(output)
        health = self.analyze(sections, switch_port_data)
        pulse_url = None
        if execution_id:
            pulse_url = f'{self.PULSE_BASE_URL}/executions/{execution_id}'
        return {
            'status': 'done',
            'host': host_name,
            'iface': iface,
            'sections': sections,
            'health': health,
            'pulse_execution_id': execution_id,
            'pulse_url': pulse_url,
            'switch_port': switch_port_data,
        }

    # ------------------------------------------------------------------
    # Output parser (splits on === SECTION_NAME === sentinels)
    # ------------------------------------------------------------------
    @staticmethod
    def _split_sections(raw: str) -> Dict[str, str]:
        """Split raw command output into {section name: stripped text}."""
        sections: Dict[str, str] = {}
        current: Optional[str] = None
        buf: List[str] = []
        for line in raw.splitlines():
            m = re.match(r'^=== (.+?) ===$', line.strip())
            if m:
                if current:
                    sections[current] = '\n'.join(buf).strip()
                name = m.group(1)
                # Anything after the "end" sentinel is ignored until a new
                # sentinel opens another section.
                current = None if name == 'end' else name
                buf = []
            elif current:
                buf.append(line)
        if current:
            # Output was cut off before the "end" sentinel; keep what we have.
            sections[current] = '\n'.join(buf).strip()
        return sections

    @staticmethod
    def parse_output(raw: str) -> dict:
        """Parse the full command output into a dict of parsed sections.

        Every key is always present in the result; a section missing from
        ``raw`` yields that parser's result for empty input.
        """
        sections = DiagnosticsRunner._split_sections(raw)
        parsed: dict = {}

        # Simple string sections
        parsed['carrier'] = sections.get('carrier', '?').strip()
        parsed['operstate'] = sections.get('operstate', '?').strip()

        # carrier_changes: None marks a value that was not an integer.
        cc_raw = sections.get('carrier_changes', '0').strip()
        try:
            parsed['carrier_changes'] = int(cc_raw)
        except ValueError:
            parsed['carrier_changes'] = None

        # Structured sections
        parsers = (
            ('sysfs_stats', DiagnosticsRunner.parse_sysfs_stats),
            ('ethtool', DiagnosticsRunner.parse_ethtool),
            ('ethtool_driver', DiagnosticsRunner.parse_ethtool_driver),
            ('ethtool_pause', DiagnosticsRunner.parse_ethtool_pause),
            ('ethtool_ring', DiagnosticsRunner.parse_ethtool_ring),
            ('ethtool_stats', DiagnosticsRunner.parse_nic_stats),
            ('ethtool_dom', DiagnosticsRunner.parse_ethtool_dom),
            ('ip_link', DiagnosticsRunner.parse_ip_link),
        )
        for name, parser in parsers:
            parsed[name] = parser(sections.get(name, ''))
        parsed['ip_addr'] = sections.get('ip_addr', '').strip()
        parsed['ip_route'] = sections.get('ip_route', '').strip()
        parsed['dmesg'] = DiagnosticsRunner.parse_dmesg(sections.get('dmesg', ''))
        parsed['lldpctl'] = DiagnosticsRunner.parse_lldpctl(sections.get('lldpctl', ''))
        return parsed

    # ------------------------------------------------------------------
    # Individual parsers
    # ------------------------------------------------------------------
    @staticmethod
    def parse_sysfs_stats(text: str) -> dict:
        """Parse ``name:value`` lines into {counter: int}.

        Only names listed in ``_SYSFS_STATS`` are kept; a non-integer
        value is recorded as 0.
        """
        result: dict = {}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            if key not in _SYSFS_STATS:
                continue
            try:
                result[key] = int(val.strip())
            except ValueError:
                result[key] = 0
        return result

    @staticmethod
    def parse_ethtool(text: str) -> dict:
        """Parse ethtool <iface> output.

        Returns a dict that may contain ``speed_mbps`` (int, or None when
        reported as unknown), ``duplex`` (lower-cased), ``port_type``,
        ``auto_neg``, ``link_detected`` and ``supported_modes`` (one list
        entry per output line of the "Supported link modes" field).
        """
        data: dict = {}
        in_supported_modes = False
        for line in text.splitlines():
            if ':' not in line:
                # ethtool wraps long mode lists onto colon-less
                # continuation lines; keep those belonging to the
                # supported-modes field.
                extra = line.strip()
                if in_supported_modes and extra:
                    data['supported_modes'].append(extra)
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            in_supported_modes = 'Supported link modes' in key
            if key == 'Speed':
                m = re.match(r'(\d+)\s*Mb/s', val)
                if m:
                    data['speed_mbps'] = int(m.group(1))
                elif 'Unknown' in val or 'unknown' in val:
                    data['speed_mbps'] = None
            elif key == 'Duplex':
                data['duplex'] = val.lower()
            elif key == 'Port':
                data['port_type'] = val
            elif key == 'Auto-negotiation':
                data['auto_neg'] = (val.lower() == 'on')
            elif key == 'Link detected':
                data['link_detected'] = (val.lower() == 'yes')
            elif in_supported_modes:
                data.setdefault('supported_modes', []).append(val)
        return data

    @staticmethod
    def parse_ethtool_driver(text: str) -> dict:
        """Parse ethtool -i output into driver/version/firmware/bus info."""
        wanted = {
            'driver': 'driver',
            'version': 'version',
            'firmware-version': 'firmware_version',
            'bus-info': 'bus_info',
        }
        data: dict = {}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            field = wanted.get(key.strip())
            if field is not None:
                data[field] = val.strip()
        return data

    @staticmethod
    def parse_ethtool_pause(text: str) -> dict:
        """Parse ethtool -a output into {'rx_pause': bool, 'tx_pause': bool}.

        Both flags default to False when the field is absent.
        """
        data = {'rx_pause': False, 'tx_pause': False}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip().lower()
            if key == 'RX':
                data['rx_pause'] = (val == 'on')
            elif key == 'TX':
                data['tx_pause'] = (val == 'on')
        return data

    @staticmethod
    def parse_ethtool_ring(text: str) -> dict:
        """Parse ethtool -g output into rx/tx ring sizes.

        Returns a dict that may contain ``rx_max``, ``tx_max``,
        ``rx_current`` and ``tx_current``. Only the plain ``RX`` and
        ``TX`` fields are used, so "RX Mini"/"RX Jumbo" values are never
        mistaken for the main ring size; non-integer values are skipped.
        """
        data: dict = {}
        in_current = False
        for line in text.splitlines():
            if 'Current hardware settings' in line:
                in_current = True
                continue
            if 'Pre-set maximums' in line:
                in_current = False
                continue
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            direction = {'RX': 'rx', 'TX': 'tx'}.get(key.strip())
            if direction is None:
                continue
            try:
                v = int(val.strip())
            except ValueError:
                continue
            suffix = 'current' if in_current else 'max'
            # First value wins if a field is repeated.
            data.setdefault(f'{direction}_{suffix}', v)
        return data

    @staticmethod
    def parse_nic_stats(text: str) -> dict:
        """Parse ethtool -S output into {key: int} dict.

        Lines whose value is not an integer are ignored.
        """
        data: dict = {}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            try:
                data[key.strip()] = int(val.strip())
            except ValueError:
                pass
        return data

    @staticmethod
    def _parse_dbm(val: str) -> Optional[float]:
        """Extract the dBm figure from a "<x> mW / <y> dBm" value.

        "-inf" is mapped to DOM_POWER_FLOOR_DBM; returns None when no dBm
        figure is present.
        """
        m = re.search(r'/\s*(-inf|-?\d+(?:\.\d+)?)\s*dBm', val)
        if not m:
            return None
        token = m.group(1)
        if token == '-inf':
            return DiagnosticsRunner.DOM_POWER_FLOOR_DBM
        return float(token)

    @staticmethod
    def parse_ethtool_dom(text: str) -> dict:
        """Parse ethtool -m (SFP DOM) output.

        Returns an empty dict when the text is empty or contains one of
        the known "no module information" phrases. Otherwise the dict may
        contain ``vendor``, ``part_no``, ``sfp_type``, ``connector``,
        ``wavelength_nm``, ``bias_ma``, ``tx_power_dbm``,
        ``rx_power_dbm``, ``temp_c`` and ``voltage_v``.
        """
        if not text:
            return {}
        lower = text.lower()
        if any(s in lower for s in ('cannot get', 'not supported', 'no sfp', 'operation not supported')):
            return {}

        number = r'(-?\d+(?:\.\d+)?)'
        data: dict = {}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            key_lower = key.lower()
            if key == 'Vendor name':
                data['vendor'] = val
            elif key == 'Vendor PN':
                data['part_no'] = val
            elif key == 'Identifier':
                # Value looks like "0x03 (SFP)"; keep the parenthesised name.
                m = re.search(r'\((.+?)\)', val)
                if m:
                    data['sfp_type'] = m.group(1)
            elif key == 'Connector':
                m = re.search(r'\((.+?)\)', val)
                if m:
                    data['connector'] = m.group(1)
            elif key == 'Laser wavelength':
                m = re.match(r'(\d+)', val)
                if m:
                    data['wavelength_nm'] = int(m.group(1))
            elif key == 'Laser bias current':
                m = re.match(number + r'\s+mA', val)
                if m:
                    data['bias_ma'] = float(m.group(1))
            elif key == 'Laser output power':
                power = DiagnosticsRunner._parse_dbm(val)
                if power is not None:
                    data['tx_power_dbm'] = power
            elif 'receiver' in key_lower and ('power' in key_lower or 'optical' in key_lower):
                power = DiagnosticsRunner._parse_dbm(val)
                if power is not None:
                    data['rx_power_dbm'] = power
            elif key == 'Module temperature':
                # Sign is allowed: modules can report sub-zero temperatures.
                m = re.match(number + r'\s+degrees', val)
                if m:
                    data['temp_c'] = float(m.group(1))
            elif key == 'Module voltage':
                m = re.match(number + r'\s+V', val)
                if m:
                    data['voltage_v'] = float(m.group(1))
        return data

    @staticmethod
    def parse_ip_link(text: str) -> dict:
        """Parse ip -s link show output for basic link state and counters.

        Returns a dict that may contain ``mtu``, ``state`` (lower-cased)
        and ``ip_rx_*`` / ``ip_tx_*`` values for bytes, packets, errors
        and dropped.
        """
        fields = ('bytes', 'packets', 'errors', 'dropped')
        data: dict = {}
        lines = text.splitlines()
        for i, line in enumerate(lines):
            # MTU and state: "2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 ..."
            m = re.search(r'mtu\s+(\d+)', line)
            if m:
                data['mtu'] = int(m.group(1))
            m = re.search(r'state\s+(\S+)', line)
            if m:
                data['state'] = m.group(1).lower()

            # The numbers are on the line following the "RX:"/"TX:" header.
            stripped = line.strip()
            for label, prefix in (('RX:', 'ip_rx_'), ('TX:', 'ip_tx_')):
                if not stripped.startswith(label) or i + 1 >= len(lines):
                    continue
                vals = lines[i + 1].split()
                if len(vals) < 5:
                    continue
                try:
                    for field, raw_val in zip(fields, vals):
                        data[prefix + field] = int(raw_val)
                except ValueError:
                    # Not a counter row; keep whatever parsed before it.
                    pass
        return data

    @staticmethod
    def parse_dmesg(text: str) -> List[dict]:
        """Parse dmesg lines into [{timestamp, msg, severity}].

        ``timestamp`` is the text inside a leading ``[ ... ]`` or ``''``
        when absent. ``severity`` is ``'error'``, ``'warn'`` or ``'info'``
        based on keywords found in the message.
        """
        error_words = ('error', 'fail', 'reset', 'panic', 'oops', 'hung', 'timeout')
        warn_words = ('warn', 'drop', 'lost', 'miss')
        events = []
        for line in text.splitlines():
            if not line.strip():
                continue
            # Extract timestamp from [ 123.456789]
            m = re.match(r'^\[\s*([\d.]+)\]\s*(.*)', line)
            if m:
                ts, msg = m.group(1), m.group(2)
            else:
                ts, msg = '', line
            lower = msg.lower()
            if any(w in lower for w in error_words):
                severity = 'error'
            elif any(w in lower for w in warn_words):
                severity = 'warn'
            else:
                severity = 'info'
            events.append({'timestamp': ts, 'msg': msg, 'severity': severity})
        return events

    @staticmethod
    def parse_lldpctl(text: str) -> dict:
        """Extract neighbor info from lldpctl output.

        Returns ``{'available': False}`` when the text is empty or shows
        that lldpd/lldpctl is unavailable. Otherwise ``available`` is True
        and ``neighbor_system``, ``neighbor_port`` and
        ``neighbor_chassis_id`` are taken from the first occurrence of
        each field, so all three describe the same (first) neighbour.
        """
        if not text or 'lldpd not running' in text or 'not found' in text.lower():
            return {'available': False}
        wanted = (
            ('SysName', 'neighbor_system'),
            ('PortID', 'neighbor_port'),
            ('ChassisID', 'neighbor_chassis_id'),
        )
        data: dict = {'available': True}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            for marker, field in wanted:
                if marker in key:
                    data.setdefault(field, val.strip())
                    break
        return data

    # ------------------------------------------------------------------
    # Health analysis
    # ------------------------------------------------------------------
    @staticmethod
    def analyze(sections: dict, switch_port_data: dict) -> dict:
        """Return {issues: [...], warnings: [...], info: [...]} health analysis.

        Args:
            sections: Parsed sections as produced by parse_output().
            switch_port_data: Switch-side port data; ``speed_mbps`` and
                ``lldp.system_name`` are read. ``None`` is treated as
                "no switch data".

        Returns:
            Three lists of ``{'code': ..., 'message': ...}`` entries.
        """
        issues: List[dict] = []
        warnings: List[dict] = []
        info: List[dict] = []

        def add(collection, code, message):
            collection.append({'code': code, 'message': message})

        switch_port_data = switch_port_data or {}
        carrier = sections.get('carrier', '?')
        eth = sections.get('ethtool', {})
        sysfs = sections.get('sysfs_stats', {})
        dom = sections.get('ethtool_dom', {})
        dmesg = sections.get('dmesg', [])
        lldp = sections.get('lldpctl', {})
        cc = sections.get('carrier_changes')

        # Physical carrier
        if carrier == '0':
            add(issues, 'NO_CARRIER',
                'No physical carrier — cable/SFP disconnected or switch port disabled')
        elif eth.get('link_detected') is False:
            add(issues, 'LINK_NOT_DETECTED',
                'NIC does not detect link signal despite carrier sysfs reading non-zero')

        # Duplex
        if eth.get('duplex') == 'half':
            add(issues, 'HALF_DUPLEX',
                'Half-duplex detected — likely duplex mismatch; force full-duplex on both ends')

        # Speed mismatch (switch vs server NIC); unknown speeds (0/None)
        # on either side are not compared.
        sw_speed = switch_port_data.get('speed_mbps', 0) or 0
        srv_speed = eth.get('speed_mbps', 0) or 0
        if sw_speed > 0 and srv_speed > 0 and sw_speed != srv_speed:
            add(warnings, 'SPEED_MISMATCH',
                f'Speed mismatch: switch reports {sw_speed} Mbps, NIC reports {srv_speed} Mbps')

        # SFP DOM power levels
        rx_dbm = dom.get('rx_power_dbm')
        tx_dbm = dom.get('tx_power_dbm')
        if rx_dbm is not None:
            if rx_dbm < -25:
                add(issues, 'SFP_RX_CRITICAL',
                    f'RX power critically low ({rx_dbm:.2f} dBm) — fiber not connected or SFP failed')
            elif rx_dbm < -18:
                add(warnings, 'SFP_RX_LOW',
                    f'RX power low ({rx_dbm:.2f} dBm) — check fiber cleanliness and SFP seating')
        if tx_dbm is not None and tx_dbm < -10:
            add(warnings, 'SFP_TX_LOW',
                f'TX power low ({tx_dbm:.2f} dBm) — SFP may be failing or requires cleaning')

        # Carrier changes (flapping)
        if cc is not None:
            if cc > 100:
                add(issues, 'CARRIER_FLAPPING',
                    f'Link has flapped {cc} times — severe physical instability')
            elif cc > 20:
                add(warnings, 'CARRIER_FLAPS',
                    f'Link has flapped {cc} times — intermittent physical issue')

        # CRC errors
        crc = sysfs.get('rx_crc_errors', 0) or 0
        if crc > 100:
            add(issues, 'CRC_ERRORS_HIGH',
                f'High CRC error count ({crc}) — dirty fiber/connector or cable damage')
        elif crc > 10:
            add(warnings, 'CRC_ERRORS_LOW',
                f'CRC errors present ({crc}) — cable or SFP quality issue')

        # Kernel events
        err_events = [e for e in dmesg if e['severity'] == 'error']
        if err_events:
            add(warnings, 'KERNEL_EVENTS',
                f'{len(err_events)} recent kernel error event(s) for this interface in dmesg')

        # LLDP validation: names match when either contains the other
        # (case-insensitive), which tolerates short name vs FQDN.
        if lldp.get('available'):
            sw_lldp = switch_port_data.get('lldp') or {}
            sw_system = (sw_lldp.get('system_name') or '').lower()
            srv_neighbor = (lldp.get('neighbor_system') or '').lower()
            if sw_system and srv_neighbor and sw_system not in srv_neighbor and srv_neighbor not in sw_system:
                add(warnings, 'LLDP_MISMATCH',
                    f'LLDP mismatch: switch sees "{sw_lldp.get("system_name")}" but '
                    f'server lldpctl sees "{lldp.get("neighbor_system")}" — cross-cabled port?')
        else:
            add(info, 'LLDP_MISSING',
                'lldpd not running on server — install lldpd for full path validation')

        return {'issues': issues, 'warnings': warnings, 'info': info}