Files
gandalf/diagnose.py
Jared Vititoe b29b70d88b Improve Pulse execution reliability: retry logic, better logging, SSH hardening
monitor.py / diagnose.py PulseClient.run_command:
- Add automatic single retry on submit failure, explicit Pulse failure
  (status=failed/timed_out), and poll timeout — handles transient SSH
  or Pulse hiccups without dropping the whole collection cycle
- Log execution_id and full Pulse URL on every failure so failed runs
  can be found in the Pulse UI immediately
- Handle 'timed_out' and 'cancelled' Pulse statuses explicitly (previously
  only 'failed' was caught; others would spin until local deadline)
- Poll every 2s instead of 1s to reduce Pulse API chatter

SSH command options (_ssh_batch + diagnose.py):
- Add BatchMode=yes: aborts immediately instead of hanging on a
  password prompt if key auth fails
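- Add ServerAliveInterval=10 ServerAliveCountMax=2: SSH detects a dead or
  unresponsive connection within ~20s (two missed 10s keepalives) instead of
  sitting silent until the 45s Pulse timeout expires (note: keepalives do not
  catch a command that hangs on a host that is still reachable)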

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 09:19:07 -04:00

550 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Gandalf Link Diagnostics module.
Runs a comprehensive SSH-based diagnostic against a server NIC and
analyses the result against switch port data to surface root causes.
Executed in a background thread; results stored in _diag_jobs (app.py).
"""
import re
import shlex
import time
import logging
from typing import Dict, List, Optional, Tuple
logger = logging.getLogger('gandalf.diagnose')

# Counter files read from /sys/class/net/<iface>/statistics/ for every
# diagnosed interface; the remote script echoes them in this order.
_SYSFS_STATS = [
    'rx_bytes',
    'tx_bytes',
    'rx_errors',
    'tx_errors',
    'rx_dropped',
    'tx_dropped',
    'rx_crc_errors',
    'rx_frame_errors',
    'rx_fifo_errors',
    'tx_carrier_errors',
    'collisions',
    'rx_missed_errors',
]
class DiagnosticsRunner:
    """Build and run a link diagnostic against a server NIC via PulseClient.

    The runner builds one SSH command that dumps sysfs, ethtool, iproute2,
    dmesg and lldpctl data for a single interface, hands it to the Pulse
    client, splits the sentinel-delimited output back into sections, and
    scores the result against the switch-side view of the same port.
    """

    # Matches the "=== name ===" sentinels emitted by build_ssh_command().
    _SECTION_RE = re.compile(r'^=== (.+?) ===$')
    # Matches the dBm half of an ethtool -m power reading such as
    # "0.5012 mW / -3.00 dBm". ethtool prints "-inf" for a 0 mW reading.
    _DBM_RE = re.compile(r'/\s*(-?inf|[-\d.]+)\s*dBm', re.IGNORECASE)

    def __init__(self, pulse_client):
        # Used by run(): must provide run_command(cmd), a writable
        # last_execution_id attribute and (optionally) a base `url`.
        self.pulse = pulse_client

    # ------------------------------------------------------------------
    # SSH command builder
    # ------------------------------------------------------------------
    @staticmethod
    def build_ssh_command(host_ip: str, iface: str) -> str:
        """Return a single-line SSH command that collects all diagnostic data.

        The remote script prints each data source under a ``=== name ===``
        sentinel so :meth:`parse_output` can split it back apart.

        Quoting is two-layered: ``iface`` is quoted for the *remote* shell,
        and the whole remote script is quoted again for the shell that runs
        this command line (presumably on the Pulse worker). Plain single
        quotes around the script would break as soon as the quoted interface
        name itself contained a single quote, letting it escape onto the
        local command line.

        Args:
            host_ip: Address of the server to SSH into as root.
            iface: Interface name on that server, e.g. ``eth0``.
        """
        q = shlex.quote(iface)
        ip_q = shlex.quote(host_ip)
        sysfs_loop = '; '.join(
            f'echo "{s}:$(cat /sys/class/net/{q}/statistics/{s} 2>/dev/null || echo 0)"'
            for s in _SYSFS_STATS
        )
        remote_cmd = (
            f'echo "=== carrier ===";'
            f' cat /sys/class/net/{q}/carrier 2>/dev/null || echo "?";'
            f' echo "=== operstate ===";'
            f' cat /sys/class/net/{q}/operstate 2>/dev/null || echo "?";'
            f' echo "=== sysfs_stats ===";'
            f' {sysfs_loop};'
            f' echo "=== carrier_changes ===";'
            f' cat /sys/class/net/{q}/carrier_changes 2>/dev/null || echo "0";'
            f' echo "=== ethtool ===";'
            f' ethtool {q} 2>/dev/null;'
            f' echo "=== ethtool_driver ===";'
            f' ethtool -i {q} 2>/dev/null;'
            f' echo "=== ethtool_pause ===";'
            f' ethtool -a {q} 2>/dev/null;'
            f' echo "=== ethtool_ring ===";'
            f' ethtool -g {q} 2>/dev/null;'
            f' echo "=== ethtool_stats ===";'
            f' ethtool -S {q} 2>/dev/null;'
            f' echo "=== ethtool_dom ===";'
            f' ethtool -m {q} 2>/dev/null;'
            f' echo "=== ip_link ===";'
            f' ip -s link show {q} 2>/dev/null;'
            f' echo "=== ip_addr ===";'
            f' ip addr show {q} 2>/dev/null;'
            f' echo "=== ip_route ===";'
            f' ip route show dev {q} 2>/dev/null;'
            f' echo "=== dmesg ===";'
            # -F: the name is a literal, not a regex; -w: "eth1" must not
            # also match "eth10"; --: protect against names starting with "-".
            f' dmesg 2>/dev/null | grep -F -w -- {q} | tail -50;'
            f' echo "=== lldpctl ===";'
            # Restrict to this interface: without an argument lldpctl lists
            # neighbours on every port and they would be mixed together.
            f' lldpctl {q} 2>/dev/null || echo "lldpd not running";'
            f' echo "=== end ==="'
        )
        return (
            'ssh -o StrictHostKeyChecking=no -o ConnectTimeout=5 '
            '-o BatchMode=yes -o LogLevel=ERROR '
            '-o ServerAliveInterval=10 -o ServerAliveCountMax=2 '
            f'root@{ip_q} {shlex.quote(remote_cmd)}'
        )

    # ------------------------------------------------------------------
    # Main entry point
    # ------------------------------------------------------------------
    def run(self, host_ip: str, host_name: str, iface: str,
            switch_port_data: dict) -> dict:
        """Execute diagnostics and return a structured result dict.

        Args:
            host_ip: Address the SSH command connects to.
            host_name: Display name, echoed back in the result.
            iface: Server interface to diagnose.
            switch_port_data: Switch-side view of the connected port; read
                for ``speed_mbps`` and ``lldp.system_name`` by
                :meth:`analyze`. ``None`` is treated as empty.

        Returns:
            On failure: ``status='error'`` with ``error``, ``host``,
            ``iface``, ``pulse_execution_id`` and ``pulse_url``.
            On success: ``status='done'`` with ``host``, ``iface``,
            ``sections``, ``health``, ``pulse_execution_id``, ``pulse_url``
            and ``switch_port``.
        """
        cmd = self.build_ssh_command(host_ip, iface)
        logger.info('Running link diagnostic: %s/%s via Pulse', host_name, iface)
        # Reset so an ID left over from a previous call is never reported
        # for this run if run_command() fails before submitting.
        self.pulse.last_execution_id = None
        output = self.pulse.run_command(cmd)
        execution_id = getattr(self.pulse, 'last_execution_id', None)
        pulse_url = self._pulse_url(execution_id)
        if output is None:
            logger.warning(
                'Link diagnostic failed for %s/%s (execution_id=%s, url=%s)',
                host_name, iface, execution_id, pulse_url)
            return {
                'status': 'error',
                'error': 'Pulse command failed or timed out',
                'host': host_name,
                'iface': iface,
                'pulse_execution_id': execution_id,
                'pulse_url': pulse_url,
            }
        sections = self.parse_output(output)
        health = self.analyze(sections, switch_port_data)
        return {
            'status': 'done',
            'host': host_name,
            'iface': iface,
            'sections': sections,
            'health': health,
            'pulse_execution_id': execution_id,
            'pulse_url': pulse_url,
            'switch_port': switch_port_data,
        }

    def _pulse_url(self, execution_id) -> Optional[str]:
        """Return the Pulse UI URL for *execution_id*, or None if unknown."""
        if not execution_id:
            return None
        # `url` may be missing or None on the client; both mean "no base".
        base = (getattr(self.pulse, 'url', None) or '').rstrip('/')
        return f'{base}/executions/{execution_id}' if base else None

    # ------------------------------------------------------------------
    # Output parser (splits on === SECTION_NAME === sentinels)
    # ------------------------------------------------------------------
    @staticmethod
    def parse_output(raw: str) -> dict:
        """Split raw command output on sentinels and parse every section.

        Returns a dict with keys carrier, operstate, carrier_changes
        (int, or None if unparseable), sysfs_stats, ethtool, ethtool_driver,
        ethtool_pause, ethtool_ring, ethtool_stats, ethtool_dom, ip_link,
        ip_addr, ip_route, dmesg and lldpctl. Missing sections fall back to
        the defaults each individual parser produces for empty input.
        """
        sections: Dict[str, str] = {}
        current: Optional[str] = None
        buf: List[str] = []
        for line in raw.splitlines():
            m = DiagnosticsRunner._SECTION_RE.match(line.strip())
            if m:
                # Flush the section being collected before switching.
                if current is not None:
                    sections[current] = '\n'.join(buf).strip()
                name = m.group(1)
                # "end" stops collection; anything after it is discarded.
                current = None if name == 'end' else name
                buf = []
            elif current is not None:
                buf.append(line)
        # Output truncated before "=== end ===": keep what was collected.
        if current is not None:
            sections[current] = '\n'.join(buf).strip()

        parsed: dict = {}
        # Simple string sections
        parsed['carrier'] = sections.get('carrier', '?').strip()
        parsed['operstate'] = sections.get('operstate', '?').strip()
        # carrier_changes
        cc_raw = sections.get('carrier_changes', '0').strip()
        try:
            parsed['carrier_changes'] = int(cc_raw)
        except ValueError:
            parsed['carrier_changes'] = None
        # Structured sections
        parsed['sysfs_stats'] = DiagnosticsRunner.parse_sysfs_stats(sections.get('sysfs_stats', ''))
        parsed['ethtool'] = DiagnosticsRunner.parse_ethtool(sections.get('ethtool', ''))
        parsed['ethtool_driver'] = DiagnosticsRunner.parse_ethtool_driver(sections.get('ethtool_driver', ''))
        parsed['ethtool_pause'] = DiagnosticsRunner.parse_ethtool_pause(sections.get('ethtool_pause', ''))
        parsed['ethtool_ring'] = DiagnosticsRunner.parse_ethtool_ring(sections.get('ethtool_ring', ''))
        parsed['ethtool_stats'] = DiagnosticsRunner.parse_nic_stats(sections.get('ethtool_stats', ''))
        parsed['ethtool_dom'] = DiagnosticsRunner.parse_ethtool_dom(sections.get('ethtool_dom', ''))
        parsed['ip_link'] = DiagnosticsRunner.parse_ip_link(sections.get('ip_link', ''))
        parsed['ip_addr'] = sections.get('ip_addr', '').strip()
        parsed['ip_route'] = sections.get('ip_route', '').strip()
        parsed['dmesg'] = DiagnosticsRunner.parse_dmesg(sections.get('dmesg', ''))
        parsed['lldpctl'] = DiagnosticsRunner.parse_lldpctl(sections.get('lldpctl', ''))
        return parsed

    # ------------------------------------------------------------------
    # Individual parsers
    # ------------------------------------------------------------------
    @staticmethod
    def parse_sysfs_stats(text: str) -> dict:
        """Parse ``name:value`` lines for the counters in _SYSFS_STATS.

        Unknown names are ignored; non-integer values are recorded as 0.
        """
        result: dict = {}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            if key in _SYSFS_STATS:
                try:
                    result[key] = int(val)
                except ValueError:
                    result[key] = 0
        return result

    @staticmethod
    def parse_ethtool(text: str) -> dict:
        """Parse ``ethtool <iface>`` output.

        Returns any of: speed_mbps (int, or None when ethtool reports
        "Unknown"), duplex (lower-case), port_type, auto_neg (bool),
        link_detected (bool), supported_modes (list of raw mode strings,
        one entry per output line, including wrapped continuation lines).
        """
        data: dict = {}
        # True while inside the possibly multi-line "Supported link modes" list.
        in_supported = False
        for line in text.splitlines():
            if ':' not in line:
                # ethtool wraps long mode lists onto indented lines without a colon.
                if in_supported and line.strip():
                    data['supported_modes'].append(line.strip())
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            in_supported = 'Supported link modes' in key
            if key == 'Speed':
                m = re.match(r'(\d+)\s*Mb/s', val)
                if m:
                    data['speed_mbps'] = int(m.group(1))
                elif 'unknown' in val.lower():
                    data['speed_mbps'] = None
            elif key == 'Duplex':
                data['duplex'] = val.lower()
            elif key == 'Port':
                data['port_type'] = val
            elif key == 'Auto-negotiation':
                data['auto_neg'] = (val.lower() == 'on')
            elif key == 'Link detected':
                data['link_detected'] = (val.lower() == 'yes')
            elif in_supported:
                data.setdefault('supported_modes', []).append(val)
        return data

    @staticmethod
    def parse_ethtool_driver(text: str) -> dict:
        """Parse ``ethtool -i`` output into driver/version/firmware/bus info."""
        field_map = {
            'driver': 'driver',
            'version': 'version',
            'firmware-version': 'firmware_version',
            'bus-info': 'bus_info',
        }
        data: dict = {}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            target = field_map.get(key.strip())
            if target:
                data[target] = val.strip()
        return data

    @staticmethod
    def parse_ethtool_pause(text: str) -> dict:
        """Parse ``ethtool -a`` output; both flags default to False."""
        data = {'rx_pause': False, 'tx_pause': False}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip().lower()
            if key == 'RX':
                data['rx_pause'] = (val == 'on')
            elif key == 'TX':
                data['tx_pause'] = (val == 'on')
        return data

    @staticmethod
    def parse_ethtool_ring(text: str) -> dict:
        """Parse ``ethtool -g`` output into rx/tx current and max ring sizes.

        Only the plain ``RX:`` / ``TX:`` rows are used, so "RX Mini",
        "RX Jumbo", "TX Push", etc. can never be mistaken for them. The
        first integer seen for each key wins.
        """
        data: dict = {}
        in_current = False
        for line in text.splitlines():
            if 'Current hardware settings' in line:
                in_current = True
                continue
            if 'Pre-set maximums' in line:
                in_current = False
                continue
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            if key not in ('RX', 'TX'):
                continue
            try:
                v = int(val.strip())
            except ValueError:
                continue  # e.g. "n/a"
            suffix = 'current' if in_current else 'max'
            data.setdefault(f'{key.lower()}_{suffix}', v)
        return data

    @staticmethod
    def parse_nic_stats(text: str) -> dict:
        """Parse ethtool -S output into {key: int} dict (non-integers skipped)."""
        data: dict = {}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            try:
                data[key.strip()] = int(val.strip())
            except ValueError:
                pass
        return data

    @staticmethod
    def _store_power(data: dict, prefix: str, val: str) -> None:
        """Record an ethtool -m power reading as ``<prefix>_power_dbm``.

        ethtool prints a 0 mW reading as ``-inf dBm``. Instead of storing a
        non-finite float (which strict JSON cannot represent), set
        ``<prefix>_no_light = True``.
        """
        m = DiagnosticsRunner._DBM_RE.search(val)
        if not m:
            return
        token = m.group(1).lower()
        if token == '-inf':
            data[f'{prefix}_no_light'] = True
            return
        try:
            data[f'{prefix}_power_dbm'] = float(token)
        except ValueError:
            pass  # the permissive pattern can match a lone "-" or "."

    @staticmethod
    def parse_ethtool_dom(text: str) -> dict:
        """Parse ethtool -m (SFP DOM) output.

        Returns {} when the module has no DOM data. Otherwise returns any of:
        vendor, part_no, sfp_type, connector, wavelength_nm, bias_ma,
        tx_power_dbm, rx_power_dbm, tx_no_light, rx_no_light, temp_c,
        voltage_v.
        """
        if not text:
            return {}
        lower = text.lower()
        if any(s in lower for s in ('cannot get', 'not supported', 'no sfp', 'operation not supported')):
            return {}
        data: dict = {}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            key_l = key.lower()
            if key == 'Vendor name':
                data['vendor'] = val
            elif key == 'Vendor PN':
                data['part_no'] = val
            elif key == 'Identifier':
                m = re.search(r'\((.+?)\)', val)
                if m:
                    data['sfp_type'] = m.group(1)
            elif key == 'Connector':
                m = re.search(r'\((.+?)\)', val)
                if m:
                    data['connector'] = m.group(1)
            elif key == 'Laser wavelength':
                m = re.match(r'(\d+)', val)
                if m:
                    data['wavelength_nm'] = int(m.group(1))
            elif key == 'Laser bias current':
                m = re.match(r'([\d.]+)\s+mA', val)
                if m:
                    data['bias_ma'] = float(m.group(1))
            elif key == 'Laser output power':
                DiagnosticsRunner._store_power(data, 'tx', val)
            elif 'receiver' in key_l and ('power' in key_l or 'optical' in key_l):
                DiagnosticsRunner._store_power(data, 'rx', val)
            elif key == 'Module temperature':
                # Temperatures can be negative in cold environments.
                m = re.match(r'(-?[\d.]+)\s+degrees', val)
                if m:
                    data['temp_c'] = float(m.group(1))
            elif key == 'Module voltage':
                m = re.match(r'([\d.]+)\s+V', val)
                if m:
                    data['voltage_v'] = float(m.group(1))
        return data

    @staticmethod
    def parse_ip_link(text: str) -> dict:
        """Parse ip -s link show output for basic link state and counters.

        Returns any of: mtu, state (lower-case), and ip_{rx,tx}_{bytes,
        packets,errors,dropped} taken from the row below each RX:/TX: header.
        """
        data: dict = {}
        lines = text.splitlines()
        for i, line in enumerate(lines):
            # MTU and state: "2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 ..."
            m = re.search(r'mtu\s+(\d+)', line)
            if m:
                data['mtu'] = int(m.group(1))
            m = re.search(r'state\s+(\S+)', line)
            if m:
                data['state'] = m.group(1).lower()
            stripped = line.strip()
            for label, prefix in (('RX:', 'ip_rx'), ('TX:', 'ip_tx')):
                # The counter values are on the line after the header.
                if not stripped.startswith(label) or i + 1 >= len(lines):
                    continue
                vals = lines[i + 1].split()
                if len(vals) < 5:
                    continue
                try:
                    data[f'{prefix}_bytes'] = int(vals[0])
                    data[f'{prefix}_packets'] = int(vals[1])
                    data[f'{prefix}_errors'] = int(vals[2])
                    data[f'{prefix}_dropped'] = int(vals[3])
                except (ValueError, IndexError):
                    pass
        return data

    @staticmethod
    def parse_dmesg(text: str) -> List[dict]:
        """Parse dmesg lines into [{timestamp, msg, severity}].

        Severity is a keyword heuristic: 'error', 'warn' or 'info'.
        """
        events = []
        for line in text.splitlines():
            if not line.strip():
                continue
            # Extract timestamp from [ 123.456789]
            m = re.match(r'^\[\s*([\d.]+)\]\s*(.*)', line)
            if m:
                ts = m.group(1)
                msg = m.group(2)
            else:
                ts = ''
                msg = line
            lower = msg.lower()
            if any(w in lower for w in ('error', 'fail', 'reset', 'panic', 'oops', 'hung', 'timeout')):
                severity = 'error'
            elif any(w in lower for w in ('warn', 'drop', 'lost', 'miss')):
                severity = 'warn'
            else:
                severity = 'info'
            events.append({'timestamp': ts, 'msg': msg, 'severity': severity})
        return events

    @staticmethod
    def parse_lldpctl(text: str) -> dict:
        """Extract neighbor info from lldpctl output.

        Returns ``{'available': False}`` when lldpd is unavailable. Otherwise
        returns ``available=True`` plus neighbor_system / neighbor_port /
        neighbor_chassis_id. If several neighbours are listed, the first
        value of each field wins, so all fields describe the same neighbour.
        """
        if not text or 'lldpd not running' in text or 'not found' in text.lower():
            return {'available': False}
        data: dict = {'available': True}
        fields = (
            ('SysName', 'neighbor_system'),
            ('PortID', 'neighbor_port'),
            ('ChassisID', 'neighbor_chassis_id'),
        )
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, val = line.partition(':')
            key = key.strip()
            val = val.strip()
            for marker, target in fields:
                if marker in key:
                    data.setdefault(target, val)
                    break
        return data

    # ------------------------------------------------------------------
    # Health analysis
    # ------------------------------------------------------------------
    @staticmethod
    def analyze(sections: dict, switch_port_data: dict) -> dict:
        """Return {issues: [...], warnings: [...], info: [...]} health analysis.

        Args:
            sections: Output of :meth:`parse_output` (missing keys tolerated).
            switch_port_data: Switch-side port data; ``speed_mbps`` and
                ``lldp.system_name`` are read. ``None`` is treated as empty.

        Each finding is ``{'code': str, 'message': str}``.
        """
        issues: List[dict] = []
        warnings: List[dict] = []
        info: List[dict] = []

        def add(collection, code, message):
            collection.append({'code': code, 'message': message})

        switch_port_data = switch_port_data or {}
        carrier = sections.get('carrier', '?')
        eth = sections.get('ethtool', {})
        sysfs = sections.get('sysfs_stats', {})
        dom = sections.get('ethtool_dom', {})
        dmesg = sections.get('dmesg', [])
        lldp = sections.get('lldpctl', {})
        cc = sections.get('carrier_changes')

        # Physical carrier
        if carrier == '0':
            add(issues, 'NO_CARRIER',
                'No physical carrier — cable/SFP disconnected or switch port disabled')
        elif eth.get('link_detected') is False:
            if carrier == '1':
                detail = 'despite carrier sysfs reading non-zero'
            else:
                # carrier could not be read ("?" or empty).
                detail = '(carrier state unreadable — interface may be administratively down)'
            add(issues, 'LINK_NOT_DETECTED',
                f'NIC does not detect link signal {detail}')

        # Duplex
        if eth.get('duplex') == 'half':
            add(issues, 'HALF_DUPLEX',
                'Half-duplex detected — likely duplex mismatch; force full-duplex on both ends')

        # Speed mismatch (switch vs server NIC)
        sw_speed = switch_port_data.get('speed_mbps', 0) or 0
        srv_speed = eth.get('speed_mbps', 0) or 0
        if sw_speed > 0 and srv_speed > 0 and sw_speed != srv_speed:
            add(warnings, 'SPEED_MISMATCH',
                f'Speed mismatch: switch reports {sw_speed} Mbps, NIC reports {srv_speed} Mbps')

        # SFP DOM power levels (a 0 mW reading is flagged as *_no_light)
        rx_dbm = dom.get('rx_power_dbm')
        tx_dbm = dom.get('tx_power_dbm')
        if dom.get('rx_no_light'):
            add(issues, 'SFP_RX_CRITICAL',
                'RX power is 0 mW (no light) — fiber not connected or SFP failed')
        elif rx_dbm is not None:
            if rx_dbm < -25:
                add(issues, 'SFP_RX_CRITICAL',
                    f'RX power critically low ({rx_dbm:.2f} dBm) — fiber not connected or SFP failed')
            elif rx_dbm < -18:
                add(warnings, 'SFP_RX_LOW',
                    f'RX power low ({rx_dbm:.2f} dBm) — check fiber cleanliness and SFP seating')
        if dom.get('tx_no_light'):
            add(warnings, 'SFP_TX_LOW',
                'TX power is 0 mW — SFP laser off or failed')
        elif tx_dbm is not None and tx_dbm < -10:
            add(warnings, 'SFP_TX_LOW',
                f'TX power low ({tx_dbm:.2f} dBm) — SFP may be failing or requires cleaning')

        # Carrier changes (flapping)
        if cc is not None:
            if cc > 100:
                add(issues, 'CARRIER_FLAPPING',
                    f'Link has flapped {cc} times — severe physical instability')
            elif cc > 20:
                add(warnings, 'CARRIER_FLAPS',
                    f'Link has flapped {cc} times — intermittent physical issue')

        # CRC errors
        crc = sysfs.get('rx_crc_errors', 0) or 0
        if crc > 100:
            add(issues, 'CRC_ERRORS_HIGH',
                f'High CRC error count ({crc}) — dirty fiber/connector or cable damage')
        elif crc > 10:
            add(warnings, 'CRC_ERRORS_LOW',
                f'CRC errors present ({crc}) — cable or SFP quality issue')

        # Kernel events
        err_events = [e for e in dmesg if e['severity'] == 'error']
        if err_events:
            add(warnings, 'KERNEL_EVENTS',
                f'{len(err_events)} recent kernel error event(s) for this interface in dmesg')

        # LLDP validation (substring match in either direction tolerates
        # short vs fully-qualified system names)
        if lldp.get('available'):
            sw_lldp = switch_port_data.get('lldp') or {}
            sw_system = (sw_lldp.get('system_name') or '').lower()
            srv_neighbor = (lldp.get('neighbor_system') or '').lower()
            if sw_system and srv_neighbor and sw_system not in srv_neighbor and srv_neighbor not in sw_system:
                add(warnings, 'LLDP_MISMATCH',
                    f'LLDP mismatch: switch sees "{sw_lldp.get("system_name")}" but '
                    f'server lldpctl sees "{lldp.get("neighbor_system")}" — cross-cabled port?')
        else:
            add(info, 'LLDP_MISSING',
                'lldpd not running on server — install lldpd for full path validation')

        return {'issues': issues, 'warnings': warnings, 'info': info}