Files
gandalf/tests/test_diagnose.py
T

480 lines
18 KiB
Python
Raw Normal View History

2026-04-14 12:22:57 -04:00
"""Tests for gandalf.diagnose — all pure static methods, no external deps."""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from diagnose import DiagnosticsRunner # noqa: E402
2026-04-14 12:22:57 -04:00
# ── build_ssh_command ────────────────────────────────────────────────────────
class TestBuildSshCommand:
def test_contains_stricthostkeychecking_no(self):
cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', 'eth0')
assert 'StrictHostKeyChecking=no' in cmd
def test_contains_host_ip(self):
cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', 'eth0')
assert 'root@10.0.0.1' in cmd
def test_contains_interface_name(self):
cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', 'eth0')
assert 'eth0' in cmd
def test_shell_quotes_special_chars_in_iface(self):
# shlex.quote wraps the whole iface string in single quotes so
# shell metacharacters are not interpreted as commands
cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', "eth0; rm -rf /")
# The iface must appear inside single quotes
assert "'eth0; rm -rf /'" in cmd
def test_contains_sysfs_stats_section(self):
cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', 'eth0')
assert 'sysfs_stats' in cmd
def test_contains_ethtool_section(self):
cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', 'eth0')
assert 'ethtool' in cmd
# ── parse_output ─────────────────────────────────────────────────────────────
SAMPLE_OUTPUT = """\
=== carrier ===
1
=== operstate ===
up
=== sysfs_stats ===
rx_bytes:12345
tx_bytes:67890
rx_errors:0
tx_errors:0
rx_dropped:0
tx_dropped:0
rx_crc_errors:0
rx_frame_errors:0
rx_fifo_errors:0
tx_carrier_errors:0
collisions:0
rx_missed_errors:0
=== carrier_changes ===
3
=== ethtool ===
Speed: 1000Mb/s
Duplex: Full
Link detected: yes
Auto-negotiation: on
=== ethtool_driver ===
driver: igb
version: 5.6.0-k
firmware-version: 1.67, 0x80000d38
bus-info: 0000:03:00.0
=== ethtool_pause ===
RX: on
TX: off
=== ethtool_ring ===
Pre-set maximums:
RX: 4096
TX: 4096
Current hardware settings:
RX: 256
TX: 256
=== ethtool_stats ===
rx_packets: 1000
tx_packets: 2000
=== ethtool_dom ===
=== ip_link ===
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500
=== ip_addr ===
inet 10.0.0.1/24
=== ip_route ===
default via 10.0.0.254
=== dmesg ===
eth0: renamed from ens3
=== lldpctl ===
lldpd not running
=== end ===
"""
class TestParseOutput:
def setup_method(self):
self.parsed = DiagnosticsRunner.parse_output(SAMPLE_OUTPUT)
def test_carrier_parsed(self):
assert self.parsed['carrier'] == '1'
def test_operstate_parsed(self):
assert self.parsed['operstate'] == 'up'
def test_carrier_changes_is_int(self):
assert self.parsed['carrier_changes'] == 3
def test_sysfs_stats_rx_bytes(self):
assert self.parsed['sysfs_stats']['rx_bytes'] == 12345
def test_sysfs_stats_tx_bytes(self):
assert self.parsed['sysfs_stats']['tx_bytes'] == 67890
def test_ethtool_speed(self):
assert self.parsed['ethtool']['speed_mbps'] == 1000
def test_ethtool_link_detected(self):
assert self.parsed['ethtool']['link_detected'] is True
def test_ethtool_duplex(self):
assert self.parsed['ethtool']['duplex'] == 'full'
def test_ethtool_auto_neg(self):
assert self.parsed['ethtool']['auto_neg'] is True
def test_ethtool_driver(self):
assert self.parsed['ethtool_driver']['driver'] == 'igb'
def test_ethtool_pause_rx_on(self):
assert self.parsed['ethtool_pause']['rx_pause'] is True
def test_ethtool_pause_tx_off(self):
assert self.parsed['ethtool_pause']['tx_pause'] is False
def test_ethtool_ring_current(self):
assert self.parsed['ethtool_ring']['rx_current'] == 256
assert self.parsed['ethtool_ring']['tx_current'] == 256
def test_ethtool_ring_max(self):
assert self.parsed['ethtool_ring']['rx_max'] == 4096
def test_ip_addr_present(self):
assert '10.0.0.1/24' in self.parsed['ip_addr']
def test_ip_route_present(self):
assert 'default via 10.0.0.254' in self.parsed['ip_route']
class TestParseOutputEdgeCases:
def test_empty_string_returns_dict(self):
result = DiagnosticsRunner.parse_output('')
assert isinstance(result, dict)
assert result['carrier'] == '?'
assert result['operstate'] == '?'
assert result['carrier_changes'] == 0
def test_no_end_sentinel_still_parses(self):
output = "=== carrier ===\n1\n=== operstate ===\nup\n"
result = DiagnosticsRunner.parse_output(output)
assert result['carrier'] == '1'
# ── parse_sysfs_stats ────────────────────────────────────────────────────────
class TestParseSysfsStats:
def test_parses_known_keys(self):
text = "rx_bytes:100\ntx_bytes:200\nrx_errors:0\n"
result = DiagnosticsRunner.parse_sysfs_stats(text)
assert result['rx_bytes'] == 100
assert result['tx_bytes'] == 200
assert result['rx_errors'] == 0
def test_ignores_unknown_keys(self):
text = "unknown_counter:999\nrx_bytes:50\n"
result = DiagnosticsRunner.parse_sysfs_stats(text)
assert 'unknown_counter' not in result
assert result['rx_bytes'] == 50
def test_non_numeric_value_defaults_to_zero(self):
text = "rx_bytes:not_a_number\n"
result = DiagnosticsRunner.parse_sysfs_stats(text)
assert result['rx_bytes'] == 0
def test_empty_text_returns_empty_dict(self):
assert DiagnosticsRunner.parse_sysfs_stats('') == {}
# ── parse_ethtool ─────────────────────────────────────────────────────────────
class TestParseEthtool:
def test_speed_parsed(self):
text = "Speed: 10000Mb/s\n"
result = DiagnosticsRunner.parse_ethtool(text)
assert result['speed_mbps'] == 10000
def test_unknown_speed(self):
text = "Speed: Unknown! (0)\n"
result = DiagnosticsRunner.parse_ethtool(text)
assert result['speed_mbps'] is None
def test_link_detected_no(self):
text = "Link detected: no\n"
result = DiagnosticsRunner.parse_ethtool(text)
assert result['link_detected'] is False
def test_auto_neg_off(self):
text = "Auto-negotiation: off\n"
result = DiagnosticsRunner.parse_ethtool(text)
assert result['auto_neg'] is False
def test_empty_returns_empty(self):
assert DiagnosticsRunner.parse_ethtool('') == {}
# ── parse_ethtool_driver ──────────────────────────────────────────────────────
class TestParseEthtoolDriver:
def test_driver_name(self):
text = "driver: igb\nversion: 5.6.0-k\nfirmware-version: 1.67\nbus-info: 0000:03:00.0\n"
result = DiagnosticsRunner.parse_ethtool_driver(text)
assert result['driver'] == 'igb'
def test_version(self):
text = "driver: igb\nversion: 5.6.0-k\n"
result = DiagnosticsRunner.parse_ethtool_driver(text)
assert result['version'] == '5.6.0-k'
def test_firmware_version(self):
text = "firmware-version: 1.67, 0x80000d38\n"
result = DiagnosticsRunner.parse_ethtool_driver(text)
assert result['firmware_version'] == '1.67, 0x80000d38'
def test_bus_info(self):
text = "bus-info: 0000:03:00.0\n"
result = DiagnosticsRunner.parse_ethtool_driver(text)
assert result['bus_info'] == '0000:03:00.0'
def test_empty_returns_empty(self):
assert DiagnosticsRunner.parse_ethtool_driver('') == {}
# ── parse_nic_stats ───────────────────────────────────────────────────────────
class TestParseNicStats:
def test_parses_integer_values(self):
text = "rx_packets: 1234\ntx_packets: 5678\n"
result = DiagnosticsRunner.parse_nic_stats(text)
assert result['rx_packets'] == 1234
assert result['tx_packets'] == 5678
def test_non_numeric_skipped(self):
text = "rx_packets: 100\nbad_stat: N/A\n"
result = DiagnosticsRunner.parse_nic_stats(text)
assert 'bad_stat' not in result
assert result['rx_packets'] == 100
def test_empty_returns_empty(self):
assert DiagnosticsRunner.parse_nic_stats('') == {}
# ── parse_ethtool_dom ─────────────────────────────────────────────────────────
class TestParseEthtoolDom:
def test_unsupported_returns_empty(self):
assert DiagnosticsRunner.parse_ethtool_dom('Cannot get module EEPROM information') == {}
def test_empty_returns_empty(self):
assert DiagnosticsRunner.parse_ethtool_dom('') == {}
def test_vendor_name(self):
text = "Vendor name : CISCO-FINISAR\n"
result = DiagnosticsRunner.parse_ethtool_dom(text)
assert result['vendor'] == 'CISCO-FINISAR'
def test_wavelength(self):
text = "Laser wavelength : 1310 nm\n"
result = DiagnosticsRunner.parse_ethtool_dom(text)
assert result['wavelength_nm'] == 1310
def test_tx_power_dbm(self):
text = "Laser output power : 0.3000 mW / -5.23 dBm\n"
result = DiagnosticsRunner.parse_ethtool_dom(text)
assert abs(result['tx_power_dbm'] - (-5.23)) < 0.01
def test_rx_power_dbm(self):
text = "Receiver signal average optical power : 0.1000 mW / -10.00 dBm\n"
result = DiagnosticsRunner.parse_ethtool_dom(text)
assert abs(result['rx_power_dbm'] - (-10.00)) < 0.01
# ── parse_ip_link ─────────────────────────────────────────────────────────────
class TestParseIpLink:
IP_LINK_SAMPLE = """\
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP mode DEFAULT group default qlen 1000
link/ether aa:bb:cc:dd:ee:ff brd ff:ff:ff:ff:ff:ff
RX: bytes packets errors dropped missed mcast
123456789 1000000 0 5 0 0
TX: bytes packets errors dropped carrier collsns
98765432 900000 0 0 0 0
"""
def test_mtu_parsed(self):
result = DiagnosticsRunner.parse_ip_link(self.IP_LINK_SAMPLE)
assert result['mtu'] == 1500
def test_state_parsed(self):
result = DiagnosticsRunner.parse_ip_link(self.IP_LINK_SAMPLE)
assert result['state'] == 'up'
def test_rx_bytes(self):
result = DiagnosticsRunner.parse_ip_link(self.IP_LINK_SAMPLE)
assert result['ip_rx_bytes'] == 123456789
def test_tx_bytes(self):
result = DiagnosticsRunner.parse_ip_link(self.IP_LINK_SAMPLE)
assert result['ip_tx_bytes'] == 98765432
def test_rx_dropped(self):
result = DiagnosticsRunner.parse_ip_link(self.IP_LINK_SAMPLE)
assert result['ip_rx_dropped'] == 5
def test_empty_returns_empty(self):
assert DiagnosticsRunner.parse_ip_link('') == {}
# ── parse_dmesg ───────────────────────────────────────────────────────────────
class TestParseDmesg:
def test_error_severity(self):
text = "[ 1.234567] eth0: Link failure detected\n"
events = DiagnosticsRunner.parse_dmesg(text)
assert len(events) == 1
assert events[0]['severity'] == 'error'
def test_warn_severity(self):
text = "[ 2.000000] eth0: dropped packet\n"
events = DiagnosticsRunner.parse_dmesg(text)
assert events[0]['severity'] == 'warn'
def test_info_severity(self):
text = "[ 3.000000] eth0: NIC Link is Up\n"
events = DiagnosticsRunner.parse_dmesg(text)
assert events[0]['severity'] == 'info'
def test_timestamp_extracted(self):
text = "[ 5.678900] eth0: reset\n"
events = DiagnosticsRunner.parse_dmesg(text)
assert events[0]['timestamp'] == '5.678900'
def test_no_timestamp(self):
text = "eth0: some timeout event\n"
events = DiagnosticsRunner.parse_dmesg(text)
assert events[0]['timestamp'] == ''
assert events[0]['severity'] == 'error'
def test_empty_returns_empty_list(self):
assert DiagnosticsRunner.parse_dmesg('') == []
# ── parse_lldpctl ─────────────────────────────────────────────────────────────
class TestParseLldpctl:
def test_unavailable_when_not_running(self):
result = DiagnosticsRunner.parse_lldpctl('lldpd not running')
assert result == {'available': False}
def test_unavailable_on_empty(self):
result = DiagnosticsRunner.parse_lldpctl('')
assert result == {'available': False}
def test_neighbor_system_parsed(self):
text = " SysName: core-sw-01\n PortID: Gi1/0/5\n ChassisID: aa:bb:cc:dd:ee:ff\n"
result = DiagnosticsRunner.parse_lldpctl(text)
assert result['available'] is True
assert result['neighbor_system'] == 'core-sw-01'
def test_neighbor_port_parsed(self):
text = " SysName: core-sw-01\n PortID: Gi1/0/5\n"
result = DiagnosticsRunner.parse_lldpctl(text)
assert result['neighbor_port'] == 'Gi1/0/5'
def test_chassis_id_parsed(self):
text = " ChassisID: aa:bb:cc:dd:ee:ff\n"
result = DiagnosticsRunner.parse_lldpctl(text)
assert result['neighbor_chassis_id'] == 'aa:bb:cc:dd:ee:ff'
# ── analyze ───────────────────────────────────────────────────────────────────
class TestAnalyze:
def _sections(self, **overrides):
base = {
'carrier': '1',
'operstate': 'up',
'ethtool': {'link_detected': True, 'duplex': 'full', 'speed_mbps': 1000},
'sysfs_stats': {'rx_crc_errors': 0},
'ethtool_dom': {},
'dmesg': [],
'lldpctl': {'available': False},
'carrier_changes': 0,
}
base.update(overrides)
return base
def test_no_carrier_is_issue(self):
result = DiagnosticsRunner.analyze(self._sections(carrier='0'), {})
codes = [i['code'] for i in result['issues']]
assert 'NO_CARRIER' in codes
def test_half_duplex_is_issue(self):
sections = self._sections(ethtool={'duplex': 'half', 'link_detected': True, 'speed_mbps': 1000})
result = DiagnosticsRunner.analyze(sections, {})
codes = [i['code'] for i in result['issues']]
assert 'HALF_DUPLEX' in codes
def test_speed_mismatch_is_warning(self):
result = DiagnosticsRunner.analyze(
self._sections(ethtool={'duplex': 'full', 'link_detected': True, 'speed_mbps': 100}),
{'speed_mbps': 1000}
)
codes = [w['code'] for w in result['warnings']]
assert 'SPEED_MISMATCH' in codes
def test_sfp_rx_critical_low(self):
sections = self._sections(ethtool_dom={'rx_power_dbm': -30.0})
result = DiagnosticsRunner.analyze(sections, {})
codes = [i['code'] for i in result['issues']]
assert 'SFP_RX_CRITICAL' in codes
def test_sfp_rx_low_is_warning(self):
sections = self._sections(ethtool_dom={'rx_power_dbm': -20.0})
result = DiagnosticsRunner.analyze(sections, {})
codes = [w['code'] for w in result['warnings']]
assert 'SFP_RX_LOW' in codes
def test_high_crc_is_issue(self):
sections = self._sections(sysfs_stats={'rx_crc_errors': 200})
result = DiagnosticsRunner.analyze(sections, {})
codes = [i['code'] for i in result['issues']]
assert 'CRC_ERRORS_HIGH' in codes
def test_low_crc_is_warning(self):
sections = self._sections(sysfs_stats={'rx_crc_errors': 50})
result = DiagnosticsRunner.analyze(sections, {})
codes = [w['code'] for w in result['warnings']]
assert 'CRC_ERRORS_LOW' in codes
def test_carrier_flapping_issue(self):
result = DiagnosticsRunner.analyze(self._sections(carrier_changes=150), {})
codes = [i['code'] for i in result['issues']]
assert 'CARRIER_FLAPPING' in codes
def test_carrier_flaps_warning(self):
result = DiagnosticsRunner.analyze(self._sections(carrier_changes=25), {})
codes = [w['code'] for w in result['warnings']]
assert 'CARRIER_FLAPS' in codes
def test_lldp_missing_is_info(self):
result = DiagnosticsRunner.analyze(self._sections(lldpctl={'available': False}), {})
codes = [i['code'] for i in result['info']]
assert 'LLDP_MISSING' in codes
def test_lldp_mismatch_is_warning(self):
sections = self._sections(lldpctl={'available': True, 'neighbor_system': 'wrong-switch'})
switch_data = {'speed_mbps': 1000, 'lldp': {'system_name': 'core-sw-01'}}
result = DiagnosticsRunner.analyze(sections, switch_data)
codes = [w['code'] for w in result['warnings']]
assert 'LLDP_MISMATCH' in codes
def test_healthy_link_no_issues(self):
result = DiagnosticsRunner.analyze(self._sections(), {})
assert result['issues'] == []
assert result['warnings'] == []