# gandalf/diagnose.py

"""Gandalf Link Diagnostics module.
Runs a comprehensive SSH-based diagnostic against a server NIC and
analyses the result against switch port data to surface root causes.
Executed in a background thread; results stored in _diag_jobs (app.py).
"""
import re
import shlex
import time
import logging
from typing import Dict, List, Optional, Tuple
logger = logging.getLogger('gandalf.diagnose')
# Counter files read from /sys/class/net/<iface>/statistics/ for each
# diagnosed interface.  The order here is the order in which the remote
# command prints them.
_SYSFS_STATS = [
    # traffic volume
    'rx_bytes',
    'tx_bytes',
    # aggregate error / drop counters
    'rx_errors',
    'tx_errors',
    'rx_dropped',
    'tx_dropped',
    # detailed error breakdown
    'rx_crc_errors',
    'rx_frame_errors',
    'rx_fifo_errors',
    'tx_carrier_errors',
    'collisions',
    'rx_missed_errors',
]
class DiagnosticsRunner:
    """Build and run a link diagnostic against a server NIC via PulseClient.

    The runner assembles a single SSH command line, hands it to the Pulse
    client for execution, splits the combined output on ``=== name ===``
    sentinel lines, parses every section, and finally derives a health
    report by comparing the parsed data with the switch-side port data
    supplied by the caller.
    """

    def __init__(self, pulse_client):
        """Remember the client used to execute commands.

        ``pulse_client`` must offer ``run_command(cmd)``.  ``run`` also
        resets and reads back its ``last_execution_id`` attribute and, when
        present, uses its ``url`` attribute to build a link to the execution.
        """
        self.pulse = pulse_client

    # ------------------------------------------------------------------
    # Shared helper
    # ------------------------------------------------------------------
    @staticmethod
    def _kv_pairs(text: str) -> List[Tuple[str, str]]:
        """Return ``(key, value)`` for every line of *text* containing ':'.

        Each line is split at its first colon; both halves are stripped.
        Lines without a colon are ignored.
        """
        pairs: List[Tuple[str, str]] = []
        for line in text.splitlines():
            if ':' in line:
                key, _, val = line.partition(':')
                pairs.append((key.strip(), val.strip()))
        return pairs

    # ------------------------------------------------------------------
    # SSH command builder
    # ------------------------------------------------------------------
    @staticmethod
    def build_ssh_command(host_ip: str, iface: str) -> str:
        """Return a single-line SSH command that collects all diagnostic data.

        Args:
            host_ip: Address of the server to log in to (as ``root``).
            iface: Name of the network interface to inspect.

        Returns:
            A shell command string.  The remote script prints one
            ``=== section ===`` sentinel before each data section and a final
            ``=== end ===`` sentinel, which is the format ``parse_output``
            expects.

        NOTE(review): ``StrictHostKeyChecking=no`` disables SSH host-key
        verification; kept as-is because changing it would alter how the
        command behaves against hosts whose keys are not yet known.
        """
        q = shlex.quote(iface)
        ip_q = shlex.quote(host_ip)
        sysfs_loop = '; '.join(
            f'echo "{s}:$(cat /sys/class/net/{q}/statistics/{s} 2>/dev/null || echo 0)"'
            for s in _SYSFS_STATS
        )
        remote_cmd = (
            f'echo "=== carrier ===";'
            f' cat /sys/class/net/{q}/carrier 2>/dev/null || echo "?";'
            f' echo "=== operstate ===";'
            f' cat /sys/class/net/{q}/operstate 2>/dev/null || echo "?";'
            f' echo "=== sysfs_stats ===";'
            f' {sysfs_loop};'
            f' echo "=== carrier_changes ===";'
            f' cat /sys/class/net/{q}/carrier_changes 2>/dev/null || echo "0";'
            f' echo "=== ethtool ===";'
            f' ethtool {q} 2>/dev/null;'
            f' echo "=== ethtool_driver ===";'
            f' ethtool -i {q} 2>/dev/null;'
            f' echo "=== ethtool_pause ===";'
            f' ethtool -a {q} 2>/dev/null;'
            f' echo "=== ethtool_ring ===";'
            f' ethtool -g {q} 2>/dev/null;'
            f' echo "=== ethtool_stats ===";'
            f' ethtool -S {q} 2>/dev/null;'
            f' echo "=== ethtool_dom ===";'
            f' ethtool -m {q} 2>/dev/null;'
            f' echo "=== ip_link ===";'
            f' ip -s link show {q} 2>/dev/null;'
            f' echo "=== ip_addr ===";'
            f' ip addr show {q} 2>/dev/null;'
            f' echo "=== ip_route ===";'
            f' ip route show dev {q} 2>/dev/null;'
            f' echo "=== dmesg ===";'
            # -F: match the interface name literally, not as a regex;
            # --: a name starting with '-' must not be read as an option.
            f' dmesg 2>/dev/null | grep -F -- {q} | tail -50;'
            f' echo "=== lldpctl ===";'
            f' lldpctl 2>/dev/null || echo "lldpd not running";'
            f' echo "=== end ==="'
        )
        # The remote script already contains shell-quoted pieces, so it must
        # be quoted as a whole for the local shell.  Wrapping it in literal
        # single quotes would be closed early by any quote that
        # shlex.quote() emitted for the interface name.
        return (
            'ssh -o StrictHostKeyChecking=no -o ConnectTimeout=5 '
            '-o BatchMode=yes -o LogLevel=ERROR '
            '-o ServerAliveInterval=10 -o ServerAliveCountMax=2 '
            f'root@{ip_q} {shlex.quote(remote_cmd)}'
        )

    # ------------------------------------------------------------------
    # Main entry point
    # ------------------------------------------------------------------
    def run(self, host_ip: str, host_name: str, iface: str,
            switch_port_data: dict) -> dict:
        """Execute diagnostics and return structured result dict.

        Args:
            host_ip: Address used for the SSH login.
            host_name: Display name of the host; echoed in the result.
            iface: Interface to diagnose.
            switch_port_data: Switch-side data for the connected port; it is
                passed to ``analyze`` and echoed back as ``switch_port``.

        Returns:
            On success a dict with ``status == 'done'`` plus ``sections``,
            ``health``, ``pulse_execution_id``, ``pulse_url`` and
            ``switch_port``.  When ``run_command`` returns ``None`` a dict
            with ``status == 'error'`` and an ``error`` message instead.
        """
        cmd = self.build_ssh_command(host_ip, iface)
        logger.info('Running link diagnostic: %s/%s via Pulse', host_name, iface)

        # Clear the id first so a stale value from an earlier call is never
        # attributed to this execution.
        self.pulse.last_execution_id = None
        output = self.pulse.run_command(cmd)
        execution_id = getattr(self.pulse, 'last_execution_id', None)

        if output is None:
            return {
                'status': 'error',
                'error': 'Pulse command failed or timed out',
                'host': host_name,
                'iface': iface,
                'pulse_execution_id': execution_id,
            }

        sections = self.parse_output(output)
        health = self.analyze(sections, switch_port_data)

        pulse_url = None
        if execution_id:
            # `url` may be missing or None on the client; treat both as "no link".
            base = (getattr(self.pulse, 'url', '') or '').rstrip('/')
            if base:
                pulse_url = f'{base}/executions/{execution_id}'

        return {
            'status': 'done',
            'host': host_name,
            'iface': iface,
            'sections': sections,
            'health': health,
            'pulse_execution_id': execution_id,
            'pulse_url': pulse_url,
            'switch_port': switch_port_data,
        }

    # ------------------------------------------------------------------
    # Output parser (splits on === SECTION_NAME === sentinels)
    # ------------------------------------------------------------------
    @staticmethod
    def parse_output(raw: str) -> dict:
        """Split *raw* command output into sections and parse each one.

        Text before the first sentinel and after ``=== end ===`` is ignored.
        Missing sections fall back to the same defaults the remote script
        would print (``'?'``, ``'0'`` or empty text).

        Returns:
            A dict with the keys ``carrier``, ``operstate``,
            ``carrier_changes`` (int, or ``None`` if not numeric),
            ``sysfs_stats``, ``ethtool``, ``ethtool_driver``,
            ``ethtool_pause``, ``ethtool_ring``, ``ethtool_stats``,
            ``ethtool_dom``, ``ip_link``, ``ip_addr``, ``ip_route``,
            ``dmesg`` and ``lldpctl``.
        """
        sections: Dict[str, str] = {}
        current: Optional[str] = None
        buf: List[str] = []
        for line in raw.splitlines():
            m = re.match(r'^=== (.+?) ===$', line.strip())
            if m:
                if current is not None:
                    sections[current] = '\n'.join(buf).strip()
                name = m.group(1)
                # 'end' closes the last section without opening a new one.
                current = None if name == 'end' else name
                buf = []
            elif current is not None:
                buf.append(line)
        if current is not None:
            # Output was cut off before the closing sentinel; keep what we got.
            sections[current] = '\n'.join(buf).strip()

        parsed: dict = {}

        # Simple string sections
        parsed['carrier'] = sections.get('carrier', '?').strip()
        parsed['operstate'] = sections.get('operstate', '?').strip()

        # carrier_changes
        cc_raw = sections.get('carrier_changes', '0').strip()
        try:
            parsed['carrier_changes'] = int(cc_raw)
        except ValueError:
            parsed['carrier_changes'] = None

        # Structured sections
        parsed['sysfs_stats'] = DiagnosticsRunner.parse_sysfs_stats(sections.get('sysfs_stats', ''))
        parsed['ethtool'] = DiagnosticsRunner.parse_ethtool(sections.get('ethtool', ''))
        parsed['ethtool_driver'] = DiagnosticsRunner.parse_ethtool_driver(sections.get('ethtool_driver', ''))
        parsed['ethtool_pause'] = DiagnosticsRunner.parse_ethtool_pause(sections.get('ethtool_pause', ''))
        parsed['ethtool_ring'] = DiagnosticsRunner.parse_ethtool_ring(sections.get('ethtool_ring', ''))
        parsed['ethtool_stats'] = DiagnosticsRunner.parse_nic_stats(sections.get('ethtool_stats', ''))
        parsed['ethtool_dom'] = DiagnosticsRunner.parse_ethtool_dom(sections.get('ethtool_dom', ''))
        parsed['ip_link'] = DiagnosticsRunner.parse_ip_link(sections.get('ip_link', ''))
        parsed['ip_addr'] = sections.get('ip_addr', '').strip()
        parsed['ip_route'] = sections.get('ip_route', '').strip()
        parsed['dmesg'] = DiagnosticsRunner.parse_dmesg(sections.get('dmesg', ''))
        parsed['lldpctl'] = DiagnosticsRunner.parse_lldpctl(sections.get('lldpctl', ''))
        return parsed

    # ------------------------------------------------------------------
    # Individual parsers
    # ------------------------------------------------------------------
    @staticmethod
    def parse_sysfs_stats(text: str) -> dict:
        """Parse ``name:value`` lines into ``{name: int}``.

        Only names listed in ``_SYSFS_STATS`` are kept; a value that is not
        an integer is recorded as 0.
        """
        result: dict = {}
        for key, val in DiagnosticsRunner._kv_pairs(text):
            if key not in _SYSFS_STATS:
                continue
            try:
                result[key] = int(val)
            except ValueError:
                result[key] = 0
        return result

    @staticmethod
    def parse_ethtool(text: str) -> dict:
        """Parse ethtool <iface> output.

        Returns a dict that may contain ``speed_mbps`` (int, or ``None`` when
        reported as unknown), ``duplex`` (lower-cased), ``port_type``,
        ``auto_neg`` (bool), ``link_detected`` (bool) and ``supported_modes``
        (list of strings, one per output line of the mode list).
        """
        data: dict = {}
        in_supported = False  # True while inside the "Supported link modes" list
        for line in text.splitlines():
            if ':' not in line:
                # The mode list wraps onto colon-less continuation lines.
                extra = line.strip()
                if in_supported and extra:
                    data.setdefault('supported_modes', []).append(extra)
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            in_supported = 'Supported link modes' in key
            if key == 'Speed':
                m = re.match(r'(\d+)\s*Mb/s', val)
                if m:
                    data['speed_mbps'] = int(m.group(1))
                elif 'Unknown' in val or 'unknown' in val:
                    data['speed_mbps'] = None
            elif key == 'Duplex':
                data['duplex'] = val.lower()
            elif key == 'Port':
                data['port_type'] = val
            elif key == 'Auto-negotiation':
                data['auto_neg'] = (val.lower() == 'on')
            elif key == 'Link detected':
                data['link_detected'] = (val.lower() == 'yes')
            elif in_supported:
                data.setdefault('supported_modes', []).append(val)
        return data

    @staticmethod
    def parse_ethtool_driver(text: str) -> dict:
        """Parse ``ethtool -i`` output into driver/version/firmware/bus fields."""
        wanted = {
            'driver': 'driver',
            'version': 'version',
            'firmware-version': 'firmware_version',
            'bus-info': 'bus_info',
        }
        data: dict = {}
        for key, val in DiagnosticsRunner._kv_pairs(text):
            if key in wanted:
                data[wanted[key]] = val
        return data

    @staticmethod
    def parse_ethtool_pause(text: str) -> dict:
        """Parse ``ethtool -a`` output.

        Returns ``{'rx_pause': bool, 'tx_pause': bool}``; both default to
        ``False`` when the corresponding ``RX``/``TX`` line is absent.
        """
        data = {'rx_pause': False, 'tx_pause': False}
        for key, val in DiagnosticsRunner._kv_pairs(text):
            if key == 'RX':
                data['rx_pause'] = (val.lower() == 'on')
            elif key == 'TX':
                data['tx_pause'] = (val.lower() == 'on')
        return data

    @staticmethod
    def parse_ethtool_ring(text: str) -> dict:
        """Parse ``ethtool -g`` output into ring-buffer sizes.

        Returns a dict that may contain ``rx_max``/``tx_max`` (first numeric
        RX/TX line outside the "Current hardware settings" part) and
        ``rx_current``/``tx_current`` (first numeric RX/TX line inside it).
        """
        data: dict = {}
        in_current = False
        for line in text.splitlines():
            # Section headers are tested on the raw line, before the
            # key/value split, because they end in a colon themselves.
            if 'Current hardware settings' in line:
                in_current = True
                continue
            if 'Pre-set maximums' in line:
                in_current = False
                continue
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            try:
                v = int(val.strip())
            except ValueError:
                continue  # e.g. "n/a" or a heading line
            suffix = 'current' if in_current else 'max'
            # First match wins so later "RX Mini"/"RX Jumbo" style rows do
            # not overwrite the plain RX/TX value.
            if 'RX' in key and f'rx_{suffix}' not in data:
                data[f'rx_{suffix}'] = v
            elif 'TX' in key and f'tx_{suffix}' not in data:
                data[f'tx_{suffix}'] = v
        return data

    @staticmethod
    def parse_nic_stats(text: str) -> dict:
        """Parse ethtool -S output into {key: int} dict.

        Lines whose value is not an integer are skipped.
        """
        data: dict = {}
        for key, val in DiagnosticsRunner._kv_pairs(text):
            try:
                data[key] = int(val)
            except ValueError:
                pass
        return data

    @staticmethod
    def parse_ethtool_dom(text: str) -> dict:
        """Parse ethtool -m (SFP DOM) output.

        Returns ``{}`` for empty text or text containing one of the
        "unsupported" phrases.  Otherwise the dict may contain ``vendor``,
        ``part_no``, ``sfp_type``, ``connector``, ``wavelength_nm``,
        ``bias_ma``, ``tx_power_dbm``, ``rx_power_dbm``, ``temp_c`` and
        ``voltage_v``.  A field is omitted when its line is missing or its
        number cannot be parsed.
        """
        if not text:
            return {}
        lower = text.lower()
        if any(s in lower for s in ('cannot get', 'not supported', 'no sfp', 'operation not supported')):
            return {}

        data: dict = {}

        def store_float(name, match):
            """Store capture group 1 of *match* as a float, if it is one."""
            if match:
                try:
                    data[name] = float(match.group(1))
                except ValueError:
                    pass  # e.g. a lone '-' or '1.2.3'; leave the field unset

        for key, val in DiagnosticsRunner._kv_pairs(text):
            key_lower = key.lower()
            if key == 'Vendor name':
                data['vendor'] = val
            elif key == 'Vendor PN':
                data['part_no'] = val
            elif key == 'Identifier':
                m = re.search(r'\((.+?)\)', val)
                if m:
                    data['sfp_type'] = m.group(1)
            elif key == 'Connector':
                m = re.search(r'\((.+?)\)', val)
                if m:
                    data['connector'] = m.group(1)
            elif key == 'Laser wavelength':
                m = re.match(r'(\d+)', val)
                if m:
                    data['wavelength_nm'] = int(m.group(1))
            elif key == 'Laser bias current':
                store_float('bias_ma', re.match(r'([\d.]+)\s+mA', val))
            elif key == 'Laser output power':
                # Value has the form "<mW> mW / <dBm> dBm"; keep the dBm part.
                store_float('tx_power_dbm', re.search(r'/\s*([-\d.]+)\s*dBm', val))
            elif 'receiver' in key_lower and ('power' in key_lower or 'optical' in key_lower):
                store_float('rx_power_dbm', re.search(r'/\s*([-\d.]+)\s*dBm', val))
            elif key == 'Module temperature':
                # '-' is accepted so sub-zero temperatures are not dropped.
                store_float('temp_c', re.match(r'([-\d.]+)\s+degrees', val))
            elif key == 'Module voltage':
                store_float('voltage_v', re.match(r'([\d.]+)\s+V', val))
        return data

    @staticmethod
    def parse_ip_link(text: str) -> dict:
        """Parse ip -s link show output for basic link state and counters.

        Returns a dict that may contain ``mtu``, ``state`` (lower-cased) and
        ``ip_rx_*`` / ``ip_tx_*`` for ``bytes``, ``packets``, ``errors`` and
        ``dropped``.  A counter group is stored only when all four of its
        values are integers.
        """
        data: dict = {}
        lines = text.splitlines()
        counter_names = ('bytes', 'packets', 'errors', 'dropped')
        for i, line in enumerate(lines):
            # MTU and state: "2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 ..."
            m = re.search(r'mtu\s+(\d+)', line)
            if m:
                data['mtu'] = int(m.group(1))
            m = re.search(r'state\s+(\S+)', line)
            if m:
                data['state'] = m.group(1).lower()

            # The numbers sit on the line after the "RX:" / "TX:" header.
            stripped = line.strip()
            for label, prefix in (('RX:', 'ip_rx_'), ('TX:', 'ip_tx_')):
                if not stripped.startswith(label) or i + 1 >= len(lines):
                    continue
                vals = lines[i + 1].split()
                if len(vals) < 5:
                    continue  # too short to be a counter row
                try:
                    counters = [int(v) for v in vals[:4]]
                except ValueError:
                    continue
                for name, value in zip(counter_names, counters):
                    data[prefix + name] = value
        return data

    @staticmethod
    def parse_dmesg(text: str) -> List[dict]:
        """Parse dmesg lines into [{timestamp, msg, severity}].

        ``severity`` is ``'error'``, ``'warn'`` or ``'info'``, chosen by
        keyword match on the lower-cased message.  ``timestamp`` is the text
        inside a leading ``[ ... ]`` or ``''`` when there is none.
        """
        error_words = ('error', 'fail', 'reset', 'panic', 'oops', 'hung', 'timeout')
        warn_words = ('warn', 'drop', 'lost', 'miss')
        events: List[dict] = []
        for line in text.splitlines():
            if not line.strip():
                continue
            # Extract timestamp from [ 123.456789]
            m = re.match(r'^\[\s*([\d.]+)\]\s*(.*)', line)
            if m:
                ts, msg = m.group(1), m.group(2)
            else:
                ts, msg = '', line
            lower = msg.lower()
            if any(w in lower for w in error_words):
                severity = 'error'
            elif any(w in lower for w in warn_words):
                severity = 'warn'
            else:
                severity = 'info'
            events.append({'timestamp': ts, 'msg': msg, 'severity': severity})
        return events

    @staticmethod
    def parse_lldpctl(text: str) -> dict:
        """Extract neighbor info from lldpctl output.

        Returns ``{'available': False}`` when the text is empty or indicates
        that lldpd/lldpctl is unavailable.  Otherwise ``available`` is
        ``True`` and ``neighbor_system``, ``neighbor_port`` and
        ``neighbor_chassis_id`` are taken from the FIRST ``SysName``,
        ``PortID`` and ``ChassisID`` lines, so the three values describe the
        same neighbour when several are listed.

        NOTE(review): the remote command runs ``lldpctl`` without an
        interface argument, so the first neighbour listed is not necessarily
        the one on the diagnosed interface — confirm whether the command
        should be restricted to that interface.
        """
        if not text or 'lldpd not running' in text or 'not found' in text.lower():
            return {'available': False}
        data: dict = {'available': True}
        for key, val in DiagnosticsRunner._kv_pairs(text):
            if 'SysName' in key:
                data.setdefault('neighbor_system', val)
            elif 'PortID' in key:
                data.setdefault('neighbor_port', val)
            elif 'ChassisID' in key:
                data.setdefault('neighbor_chassis_id', val)
        return data

    # ------------------------------------------------------------------
    # Health analysis
    # ------------------------------------------------------------------
    @staticmethod
    def analyze(sections: dict, switch_port_data: dict) -> dict:
        """Return {issues: [...], warnings: [...], info: [...]} health analysis.

        Args:
            sections: Parsed sections as produced by ``parse_output``.
            switch_port_data: Switch-side port data; ``speed_mbps`` and
                ``lldp['system_name']`` are read.  ``None`` is treated as an
                empty dict.

        Each finding is a ``{'code': ..., 'message': ...}`` dict.
        """
        switch_port_data = switch_port_data or {}
        issues: List[dict] = []
        warnings: List[dict] = []
        info: List[dict] = []

        def add(collection, code, message):
            collection.append({'code': code, 'message': message})

        carrier = sections.get('carrier', '?')
        eth = sections.get('ethtool', {})
        sysfs = sections.get('sysfs_stats', {})
        dom = sections.get('ethtool_dom', {})
        dmesg = sections.get('dmesg', [])
        lldp = sections.get('lldpctl', {})
        cc = sections.get('carrier_changes')

        # Physical carrier
        if carrier == '0':
            add(issues, 'NO_CARRIER',
                'No physical carrier — cable/SFP disconnected or switch port disabled')
        elif eth.get('link_detected') is False:
            add(issues, 'LINK_NOT_DETECTED',
                'NIC does not detect link signal despite carrier sysfs reading non-zero')

        # Duplex
        if eth.get('duplex') == 'half':
            add(issues, 'HALF_DUPLEX',
                'Half-duplex detected — likely duplex mismatch; force full-duplex on both ends')

        # Speed mismatch (switch vs server NIC); unknown speeds (None/0) are skipped
        sw_speed = switch_port_data.get('speed_mbps', 0) or 0
        srv_speed = eth.get('speed_mbps', 0) or 0
        if sw_speed > 0 and srv_speed > 0 and sw_speed != srv_speed:
            add(warnings, 'SPEED_MISMATCH',
                f'Speed mismatch: switch reports {sw_speed} Mbps, NIC reports {srv_speed} Mbps')

        # SFP DOM power levels
        rx_dbm = dom.get('rx_power_dbm')
        tx_dbm = dom.get('tx_power_dbm')
        if rx_dbm is not None:
            if rx_dbm < -25:
                add(issues, 'SFP_RX_CRITICAL',
                    f'RX power critically low ({rx_dbm:.2f} dBm) — fiber not connected or SFP failed')
            elif rx_dbm < -18:
                add(warnings, 'SFP_RX_LOW',
                    f'RX power low ({rx_dbm:.2f} dBm) — check fiber cleanliness and SFP seating')
        if tx_dbm is not None and tx_dbm < -10:
            add(warnings, 'SFP_TX_LOW',
                f'TX power low ({tx_dbm:.2f} dBm) — SFP may be failing or requires cleaning')

        # Carrier changes (flapping)
        if cc is not None:
            if cc > 100:
                add(issues, 'CARRIER_FLAPPING',
                    f'Link has flapped {cc} times — severe physical instability')
            elif cc > 20:
                add(warnings, 'CARRIER_FLAPS',
                    f'Link has flapped {cc} times — intermittent physical issue')

        # CRC errors
        crc = sysfs.get('rx_crc_errors', 0) or 0
        if crc > 100:
            add(issues, 'CRC_ERRORS_HIGH',
                f'High CRC error count ({crc}) — dirty fiber/connector or cable damage')
        elif crc > 10:
            add(warnings, 'CRC_ERRORS_LOW',
                f'CRC errors present ({crc}) — cable or SFP quality issue')

        # Kernel events
        err_events = [e for e in dmesg if e['severity'] == 'error']
        if err_events:
            add(warnings, 'KERNEL_EVENTS',
                f'{len(err_events)} recent kernel error event(s) for this interface in dmesg')

        # LLDP validation
        if lldp.get('available'):
            sw_lldp = switch_port_data.get('lldp') or {}
            sw_system = (sw_lldp.get('system_name') or '').lower()
            srv_neighbor = (lldp.get('neighbor_system') or '').lower()
            # Substring match in either direction tolerates short name vs FQDN.
            names_differ = sw_system not in srv_neighbor and srv_neighbor not in sw_system
            if sw_system and srv_neighbor and names_differ:
                add(warnings, 'LLDP_MISMATCH',
                    f'LLDP mismatch: switch sees "{sw_lldp.get("system_name")}" but '
                    f'server lldpctl sees "{lldp.get("neighbor_system")}" — cross-cabled port?')
        else:
            add(info, 'LLDP_MISSING',
                'lldpd not running on server — install lldpd for full path validation')

        return {'issues': issues, 'warnings': warnings, 'info': info}