2026-04-14 12:22:57 -04:00
|
|
|
"""Tests for gandalf.diagnose — all pure static methods, no external deps."""
|
|
|
|
|
import sys
|
|
|
|
|
import os
|
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
|
|
|
|
|
2026-04-25 19:53:35 -04:00
|
|
|
from diagnose import DiagnosticsRunner # noqa: E402
|
2026-04-14 12:22:57 -04:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── build_ssh_command ────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
class TestBuildSshCommand:
    """Checks on the value returned by ``DiagnosticsRunner.build_ssh_command``.

    Every test builds the command for host ``10.0.0.1`` and asserts that a
    specific fragment is contained in the result.
    """

    def test_contains_stricthostkeychecking_no(self):
        """The command carries the ``StrictHostKeyChecking=no`` option."""
        cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', 'eth0')
        assert 'StrictHostKeyChecking=no' in cmd

    def test_contains_host_ip(self):
        """The command targets ``root@<host ip>``."""
        cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', 'eth0')
        assert 'root@10.0.0.1' in cmd

    def test_contains_interface_name(self):
        """The interface name appears in the command."""
        cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', 'eth0')
        assert 'eth0' in cmd

    def test_shell_quotes_special_chars_in_iface(self):
        """An interface name with shell metacharacters is single-quoted."""
        # shlex.quote wraps the whole iface string in single quotes so
        # shell metacharacters are not interpreted as commands
        cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', "eth0; rm -rf /")
        # The iface must appear inside single quotes
        assert "'eth0; rm -rf /'" in cmd

    def test_contains_sysfs_stats_section(self):
        """The command mentions the ``sysfs_stats`` section."""
        cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', 'eth0')
        assert 'sysfs_stats' in cmd

    def test_contains_ethtool_section(self):
        """The command mentions ``ethtool``."""
        cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', 'eth0')
        assert 'ethtool' in cmd
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── parse_output ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
# Canned multi-section output used as the fixture for the parse_output tests.
# Each section starts with a "=== name ===" header line; the ethtool_dom
# section is intentionally empty and "=== end ===" closes the sample.
# NOTE(review): any leading whitespace the real command output carries on
# these lines is not reproduced here — confirm against a live capture if the
# parsers are whitespace-sensitive.
SAMPLE_OUTPUT = """\
=== carrier ===
1
=== operstate ===
up
=== sysfs_stats ===
rx_bytes:12345
tx_bytes:67890
rx_errors:0
tx_errors:0
rx_dropped:0
tx_dropped:0
rx_crc_errors:0
rx_frame_errors:0
rx_fifo_errors:0
tx_carrier_errors:0
collisions:0
rx_missed_errors:0
=== carrier_changes ===
3
=== ethtool ===
Speed: 1000Mb/s
Duplex: Full
Link detected: yes
Auto-negotiation: on
=== ethtool_driver ===
driver: igb
version: 5.6.0-k
firmware-version: 1.67, 0x80000d38
bus-info: 0000:03:00.0
=== ethtool_pause ===
RX: on
TX: off
=== ethtool_ring ===
Pre-set maximums:
RX: 4096
TX: 4096
Current hardware settings:
RX: 256
TX: 256
=== ethtool_stats ===
rx_packets: 1000
tx_packets: 2000
=== ethtool_dom ===
=== ip_link ===
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500
=== ip_addr ===
inet 10.0.0.1/24
=== ip_route ===
default via 10.0.0.254
=== dmesg ===
eth0: renamed from ens3
=== lldpctl ===
lldpd not running
=== end ===
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestParseOutput:
    """Checks ``DiagnosticsRunner.parse_output`` against ``SAMPLE_OUTPUT``.

    The sample is parsed afresh before every test and stored on
    ``self.parsed``; each test then inspects one key of that result.
    """

    def setup_method(self):
        """Parse the shared sample before each test method."""
        self.parsed = DiagnosticsRunner.parse_output(SAMPLE_OUTPUT)

    def test_carrier_parsed(self):
        """``carrier`` is kept as the string ``'1'``."""
        assert self.parsed['carrier'] == '1'

    def test_operstate_parsed(self):
        """``operstate`` is the string ``'up'``."""
        assert self.parsed['operstate'] == 'up'

    def test_carrier_changes_is_int(self):
        """``carrier_changes`` compares equal to the integer 3."""
        assert self.parsed['carrier_changes'] == 3

    def test_sysfs_stats_rx_bytes(self):
        """``sysfs_stats['rx_bytes']`` is the numeric value from the sample."""
        assert self.parsed['sysfs_stats']['rx_bytes'] == 12345

    def test_sysfs_stats_tx_bytes(self):
        """``sysfs_stats['tx_bytes']`` is the numeric value from the sample."""
        assert self.parsed['sysfs_stats']['tx_bytes'] == 67890

    def test_ethtool_speed(self):
        """``Speed: 1000Mb/s`` becomes ``speed_mbps == 1000``."""
        assert self.parsed['ethtool']['speed_mbps'] == 1000

    def test_ethtool_link_detected(self):
        """``Link detected: yes`` becomes ``True``."""
        assert self.parsed['ethtool']['link_detected'] is True

    def test_ethtool_duplex(self):
        """``Duplex: Full`` is reported lower-cased as ``'full'``."""
        assert self.parsed['ethtool']['duplex'] == 'full'

    def test_ethtool_auto_neg(self):
        """``Auto-negotiation: on`` becomes ``True``."""
        assert self.parsed['ethtool']['auto_neg'] is True

    def test_ethtool_driver(self):
        """The driver name is exposed under ``ethtool_driver['driver']``."""
        assert self.parsed['ethtool_driver']['driver'] == 'igb'

    def test_ethtool_pause_rx_on(self):
        """``RX: on`` in the pause section becomes ``rx_pause is True``."""
        assert self.parsed['ethtool_pause']['rx_pause'] is True

    def test_ethtool_pause_tx_off(self):
        """``TX: off`` in the pause section becomes ``tx_pause is False``."""
        assert self.parsed['ethtool_pause']['tx_pause'] is False

    def test_ethtool_ring_current(self):
        """Current ring sizes are read from "Current hardware settings"."""
        assert self.parsed['ethtool_ring']['rx_current'] == 256
        assert self.parsed['ethtool_ring']['tx_current'] == 256

    def test_ethtool_ring_max(self):
        """The RX maximum is read from "Pre-set maximums"."""
        assert self.parsed['ethtool_ring']['rx_max'] == 4096

    def test_ip_addr_present(self):
        """The address from the sample is contained in ``ip_addr``."""
        assert '10.0.0.1/24' in self.parsed['ip_addr']

    def test_ip_route_present(self):
        """The default route from the sample is contained in ``ip_route``."""
        assert 'default via 10.0.0.254' in self.parsed['ip_route']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestParseOutputEdgeCases:
    """``DiagnosticsRunner.parse_output`` on degenerate input."""

    def test_empty_string_returns_dict(self):
        """Empty input still yields a dict with placeholder values."""
        result = DiagnosticsRunner.parse_output('')
        assert isinstance(result, dict)
        assert result['carrier'] == '?'
        assert result['operstate'] == '?'
        assert result['carrier_changes'] == 0

    def test_no_end_sentinel_still_parses(self):
        """Sections are parsed even without a trailing ``=== end ===`` line."""
        output = "=== carrier ===\n1\n=== operstate ===\nup\n"
        result = DiagnosticsRunner.parse_output(output)
        assert result['carrier'] == '1'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── parse_sysfs_stats ────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
class TestParseSysfsStats:
    """``DiagnosticsRunner.parse_sysfs_stats`` on ``name:value`` lines."""

    def test_parses_known_keys(self):
        """Known counters are returned with integer values."""
        text = "rx_bytes:100\ntx_bytes:200\nrx_errors:0\n"
        result = DiagnosticsRunner.parse_sysfs_stats(text)
        assert result['rx_bytes'] == 100
        assert result['tx_bytes'] == 200
        assert result['rx_errors'] == 0

    def test_ignores_unknown_keys(self):
        """A counter name the parser does not know is left out of the result."""
        text = "unknown_counter:999\nrx_bytes:50\n"
        result = DiagnosticsRunner.parse_sysfs_stats(text)
        assert 'unknown_counter' not in result
        assert result['rx_bytes'] == 50

    def test_non_numeric_value_defaults_to_zero(self):
        """A known counter with a non-numeric value is reported as 0."""
        text = "rx_bytes:not_a_number\n"
        result = DiagnosticsRunner.parse_sysfs_stats(text)
        assert result['rx_bytes'] == 0

    def test_empty_text_returns_empty_dict(self):
        """Empty input yields an empty dict."""
        assert DiagnosticsRunner.parse_sysfs_stats('') == {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── parse_ethtool ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
class TestParseEthtool:
    """``DiagnosticsRunner.parse_ethtool`` on individual ethtool lines."""

    def test_speed_parsed(self):
        """``Speed: 10000Mb/s`` becomes ``speed_mbps == 10000``."""
        text = "Speed: 10000Mb/s\n"
        result = DiagnosticsRunner.parse_ethtool(text)
        assert result['speed_mbps'] == 10000

    def test_unknown_speed(self):
        """An "Unknown!" speed line yields ``speed_mbps is None``."""
        text = "Speed: Unknown! (0)\n"
        result = DiagnosticsRunner.parse_ethtool(text)
        assert result['speed_mbps'] is None

    def test_link_detected_no(self):
        """``Link detected: no`` becomes ``False``."""
        text = "Link detected: no\n"
        result = DiagnosticsRunner.parse_ethtool(text)
        assert result['link_detected'] is False

    def test_auto_neg_off(self):
        """``Auto-negotiation: off`` becomes ``False``."""
        text = "Auto-negotiation: off\n"
        result = DiagnosticsRunner.parse_ethtool(text)
        assert result['auto_neg'] is False

    def test_empty_returns_empty(self):
        """Empty input yields an empty dict."""
        assert DiagnosticsRunner.parse_ethtool('') == {}
|
2026-04-25 19:53:35 -04:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── parse_ethtool_driver ──────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
class TestParseEthtoolDriver:
    """``DiagnosticsRunner.parse_ethtool_driver`` on driver-info lines.

    The tests show that hyphenated field names in the input
    (``firmware-version``, ``bus-info``) are exposed with underscores.
    """

    def test_driver_name(self):
        """The ``driver`` field is extracted from a full four-line sample."""
        text = "driver: igb\nversion: 5.6.0-k\nfirmware-version: 1.67\nbus-info: 0000:03:00.0\n"
        result = DiagnosticsRunner.parse_ethtool_driver(text)
        assert result['driver'] == 'igb'

    def test_version(self):
        """The ``version`` field is returned verbatim."""
        text = "driver: igb\nversion: 5.6.0-k\n"
        result = DiagnosticsRunner.parse_ethtool_driver(text)
        assert result['version'] == '5.6.0-k'

    def test_firmware_version(self):
        """A firmware string containing a comma is kept whole."""
        text = "firmware-version: 1.67, 0x80000d38\n"
        result = DiagnosticsRunner.parse_ethtool_driver(text)
        assert result['firmware_version'] == '1.67, 0x80000d38'

    def test_bus_info(self):
        """A bus address containing colons is kept whole."""
        text = "bus-info: 0000:03:00.0\n"
        result = DiagnosticsRunner.parse_ethtool_driver(text)
        assert result['bus_info'] == '0000:03:00.0'

    def test_empty_returns_empty(self):
        """Empty input yields an empty dict."""
        assert DiagnosticsRunner.parse_ethtool_driver('') == {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── parse_nic_stats ───────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
class TestParseNicStats:
    """``DiagnosticsRunner.parse_nic_stats`` on ``name: value`` lines."""

    def test_parses_integer_values(self):
        """Numeric statistics are returned as integers keyed by name."""
        text = "rx_packets: 1234\ntx_packets: 5678\n"
        result = DiagnosticsRunner.parse_nic_stats(text)
        assert result['rx_packets'] == 1234
        assert result['tx_packets'] == 5678

    def test_non_numeric_skipped(self):
        """A statistic whose value is not numeric is omitted from the result."""
        text = "rx_packets: 100\nbad_stat: N/A\n"
        result = DiagnosticsRunner.parse_nic_stats(text)
        assert 'bad_stat' not in result
        assert result['rx_packets'] == 100

    def test_empty_returns_empty(self):
        """Empty input yields an empty dict."""
        assert DiagnosticsRunner.parse_nic_stats('') == {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── parse_ethtool_dom ─────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
class TestParseEthtoolDom:
    """``DiagnosticsRunner.parse_ethtool_dom`` on module/optics lines.

    NOTE(review): the sample lines below use a single space before the
    colon; real tool output may pad the field name with more spaces —
    confirm the literals against a live capture.
    """

    def test_unsupported_returns_empty(self):
        """The "Cannot get module EEPROM information" message yields ``{}``."""
        assert DiagnosticsRunner.parse_ethtool_dom('Cannot get module EEPROM information') == {}

    def test_empty_returns_empty(self):
        """Empty input yields an empty dict."""
        assert DiagnosticsRunner.parse_ethtool_dom('') == {}

    def test_vendor_name(self):
        """``Vendor name`` is exposed under the ``vendor`` key."""
        text = "Vendor name : CISCO-FINISAR\n"
        result = DiagnosticsRunner.parse_ethtool_dom(text)
        assert result['vendor'] == 'CISCO-FINISAR'

    def test_wavelength(self):
        """``Laser wavelength`` is exposed as the number ``wavelength_nm``."""
        text = "Laser wavelength : 1310 nm\n"
        result = DiagnosticsRunner.parse_ethtool_dom(text)
        assert result['wavelength_nm'] == 1310

    def test_tx_power_dbm(self):
        """The dBm figure of ``Laser output power`` becomes ``tx_power_dbm``."""
        text = "Laser output power : 0.3000 mW / -5.23 dBm\n"
        result = DiagnosticsRunner.parse_ethtool_dom(text)
        # Tolerance comparison because the value is a float.
        assert abs(result['tx_power_dbm'] - (-5.23)) < 0.01

    def test_rx_power_dbm(self):
        """The dBm figure of the receiver power line becomes ``rx_power_dbm``."""
        text = "Receiver signal average optical power : 0.1000 mW / -10.00 dBm\n"
        result = DiagnosticsRunner.parse_ethtool_dom(text)
        # Tolerance comparison because the value is a float.
        assert abs(result['rx_power_dbm'] - (-10.00)) < 0.01
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── parse_ip_link ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
class TestParseIpLink:
    """``DiagnosticsRunner.parse_ip_link`` on a sample with RX/TX counters."""

    # Sample input: interface header line, link/ether line, then an RX
    # header + values row and a TX header + values row.
    # NOTE(review): the sample lines carry no leading indentation here; real
    # output normally indents the continuation lines — confirm the literal
    # against a live capture.
    IP_LINK_SAMPLE = """\
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP mode DEFAULT group default qlen 1000
link/ether aa:bb:cc:dd:ee:ff brd ff:ff:ff:ff:ff:ff
RX: bytes packets errors dropped missed mcast
123456789 1000000 0 5 0 0
TX: bytes packets errors dropped carrier collsns
98765432 900000 0 0 0 0
"""

    def test_mtu_parsed(self):
        """``mtu 1500`` on the header line becomes ``mtu == 1500``."""
        result = DiagnosticsRunner.parse_ip_link(self.IP_LINK_SAMPLE)
        assert result['mtu'] == 1500

    def test_state_parsed(self):
        """``state UP`` is reported lower-cased as ``'up'``."""
        result = DiagnosticsRunner.parse_ip_link(self.IP_LINK_SAMPLE)
        assert result['state'] == 'up'

    def test_rx_bytes(self):
        """The first value of the RX row is exposed as ``ip_rx_bytes``."""
        result = DiagnosticsRunner.parse_ip_link(self.IP_LINK_SAMPLE)
        assert result['ip_rx_bytes'] == 123456789

    def test_tx_bytes(self):
        """The first value of the TX row is exposed as ``ip_tx_bytes``."""
        result = DiagnosticsRunner.parse_ip_link(self.IP_LINK_SAMPLE)
        assert result['ip_tx_bytes'] == 98765432

    def test_rx_dropped(self):
        """The ``dropped`` column of the RX row is exposed as ``ip_rx_dropped``."""
        result = DiagnosticsRunner.parse_ip_link(self.IP_LINK_SAMPLE)
        assert result['ip_rx_dropped'] == 5

    def test_empty_returns_empty(self):
        """Empty input yields an empty dict."""
        assert DiagnosticsRunner.parse_ip_link('') == {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── parse_dmesg ───────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
class TestParseDmesg:
    """``DiagnosticsRunner.parse_dmesg`` on single kernel-log lines.

    The result is a list of event dicts; these tests read the ``severity``
    and ``timestamp`` keys of the first event.
    """

    def test_error_severity(self):
        """A "Link failure" line produces exactly one ``error`` event."""
        text = "[ 1.234567] eth0: Link failure detected\n"
        events = DiagnosticsRunner.parse_dmesg(text)
        assert len(events) == 1
        assert events[0]['severity'] == 'error'

    def test_warn_severity(self):
        """A "dropped packet" line is classified as ``warn``."""
        text = "[ 2.000000] eth0: dropped packet\n"
        events = DiagnosticsRunner.parse_dmesg(text)
        assert events[0]['severity'] == 'warn'

    def test_info_severity(self):
        """A "NIC Link is Up" line is classified as ``info``."""
        text = "[ 3.000000] eth0: NIC Link is Up\n"
        events = DiagnosticsRunner.parse_dmesg(text)
        assert events[0]['severity'] == 'info'

    def test_timestamp_extracted(self):
        """The bracketed timestamp is returned without brackets or padding."""
        text = "[ 5.678900] eth0: reset\n"
        events = DiagnosticsRunner.parse_dmesg(text)
        assert events[0]['timestamp'] == '5.678900'

    def test_no_timestamp(self):
        """A line without a timestamp gets ``''`` and is still classified."""
        text = "eth0: some timeout event\n"
        events = DiagnosticsRunner.parse_dmesg(text)
        assert events[0]['timestamp'] == ''
        assert events[0]['severity'] == 'error'

    def test_empty_returns_empty_list(self):
        """Empty input yields an empty list."""
        assert DiagnosticsRunner.parse_dmesg('') == []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── parse_lldpctl ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
class TestParseLldpctl:
    """``DiagnosticsRunner.parse_lldpctl`` on neighbour-information text."""

    def test_unavailable_when_not_running(self):
        """The "lldpd not running" message yields only ``available: False``."""
        result = DiagnosticsRunner.parse_lldpctl('lldpd not running')
        assert result == {'available': False}

    def test_unavailable_on_empty(self):
        """Empty input yields only ``available: False``."""
        result = DiagnosticsRunner.parse_lldpctl('')
        assert result == {'available': False}

    def test_neighbor_system_parsed(self):
        """``SysName`` is exposed as ``neighbor_system`` and marks LLDP available."""
        text = " SysName: core-sw-01\n PortID: Gi1/0/5\n ChassisID: aa:bb:cc:dd:ee:ff\n"
        result = DiagnosticsRunner.parse_lldpctl(text)
        assert result['available'] is True
        assert result['neighbor_system'] == 'core-sw-01'

    def test_neighbor_port_parsed(self):
        """``PortID`` is exposed as ``neighbor_port``."""
        text = " SysName: core-sw-01\n PortID: Gi1/0/5\n"
        result = DiagnosticsRunner.parse_lldpctl(text)
        assert result['neighbor_port'] == 'Gi1/0/5'

    def test_chassis_id_parsed(self):
        """``ChassisID`` is exposed whole, colons included."""
        text = " ChassisID: aa:bb:cc:dd:ee:ff\n"
        result = DiagnosticsRunner.parse_lldpctl(text)
        assert result['neighbor_chassis_id'] == 'aa:bb:cc:dd:ee:ff'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── analyze ───────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
class TestAnalyze:
    """``DiagnosticsRunner.analyze`` findings for individual fault conditions.

    ``analyze`` is called with a parsed-sections dict and a second dict of
    switch-side data (empty in most tests). The result is read through its
    ``issues``, ``warnings`` and ``info`` lists, whose entries carry a
    ``code`` key.
    """

    def _sections(self, **overrides):
        """Return a baseline sections dict with *overrides* applied on top.

        The baseline is the input that ``test_healthy_link_no_issues``
        expects to produce no issues and no warnings. An override replaces
        the whole top-level value for its key (no deep merge).
        """
        base = {
            'carrier': '1',
            'operstate': 'up',
            'ethtool': {'link_detected': True, 'duplex': 'full', 'speed_mbps': 1000},
            'sysfs_stats': {'rx_crc_errors': 0},
            'ethtool_dom': {},
            'dmesg': [],
            'lldpctl': {'available': False},
            'carrier_changes': 0,
        }
        base.update(overrides)
        return base

    def test_no_carrier_is_issue(self):
        """``carrier == '0'`` raises a ``NO_CARRIER`` issue."""
        result = DiagnosticsRunner.analyze(self._sections(carrier='0'), {})
        codes = [i['code'] for i in result['issues']]
        assert 'NO_CARRIER' in codes

    def test_half_duplex_is_issue(self):
        """Half duplex raises a ``HALF_DUPLEX`` issue."""
        sections = self._sections(ethtool={'duplex': 'half', 'link_detected': True, 'speed_mbps': 1000})
        result = DiagnosticsRunner.analyze(sections, {})
        codes = [i['code'] for i in result['issues']]
        assert 'HALF_DUPLEX' in codes

    def test_speed_mismatch_is_warning(self):
        """Host speed 100 vs. switch-side speed 1000 warns ``SPEED_MISMATCH``."""
        result = DiagnosticsRunner.analyze(
            self._sections(ethtool={'duplex': 'full', 'link_detected': True, 'speed_mbps': 100}),
            {'speed_mbps': 1000}
        )
        codes = [w['code'] for w in result['warnings']]
        assert 'SPEED_MISMATCH' in codes

    def test_sfp_rx_critical_low(self):
        """An RX power of -30.0 dBm raises an ``SFP_RX_CRITICAL`` issue."""
        sections = self._sections(ethtool_dom={'rx_power_dbm': -30.0})
        result = DiagnosticsRunner.analyze(sections, {})
        codes = [i['code'] for i in result['issues']]
        assert 'SFP_RX_CRITICAL' in codes

    def test_sfp_rx_low_is_warning(self):
        """An RX power of -20.0 dBm warns ``SFP_RX_LOW``."""
        sections = self._sections(ethtool_dom={'rx_power_dbm': -20.0})
        result = DiagnosticsRunner.analyze(sections, {})
        codes = [w['code'] for w in result['warnings']]
        assert 'SFP_RX_LOW' in codes

    def test_high_crc_is_issue(self):
        """200 RX CRC errors raise a ``CRC_ERRORS_HIGH`` issue."""
        sections = self._sections(sysfs_stats={'rx_crc_errors': 200})
        result = DiagnosticsRunner.analyze(sections, {})
        codes = [i['code'] for i in result['issues']]
        assert 'CRC_ERRORS_HIGH' in codes

    def test_low_crc_is_warning(self):
        """50 RX CRC errors warn ``CRC_ERRORS_LOW``."""
        sections = self._sections(sysfs_stats={'rx_crc_errors': 50})
        result = DiagnosticsRunner.analyze(sections, {})
        codes = [w['code'] for w in result['warnings']]
        assert 'CRC_ERRORS_LOW' in codes

    def test_carrier_flapping_issue(self):
        """150 carrier changes raise a ``CARRIER_FLAPPING`` issue."""
        result = DiagnosticsRunner.analyze(self._sections(carrier_changes=150), {})
        codes = [i['code'] for i in result['issues']]
        assert 'CARRIER_FLAPPING' in codes

    def test_carrier_flaps_warning(self):
        """25 carrier changes warn ``CARRIER_FLAPS``."""
        result = DiagnosticsRunner.analyze(self._sections(carrier_changes=25), {})
        codes = [w['code'] for w in result['warnings']]
        assert 'CARRIER_FLAPS' in codes

    def test_lldp_missing_is_info(self):
        """Unavailable LLDP data is reported as ``LLDP_MISSING`` under ``info``."""
        result = DiagnosticsRunner.analyze(self._sections(lldpctl={'available': False}), {})
        codes = [i['code'] for i in result['info']]
        assert 'LLDP_MISSING' in codes

    def test_lldp_mismatch_is_warning(self):
        """A neighbour name differing from the switch's warns ``LLDP_MISMATCH``."""
        sections = self._sections(lldpctl={'available': True, 'neighbor_system': 'wrong-switch'})
        switch_data = {'speed_mbps': 1000, 'lldp': {'system_name': 'core-sw-01'}}
        result = DiagnosticsRunner.analyze(sections, switch_data)
        codes = [w['code'] for w in result['warnings']]
        assert 'LLDP_MISMATCH' in codes

    def test_healthy_link_no_issues(self):
        """The unmodified baseline produces no issues and no warnings."""
        result = DiagnosticsRunner.analyze(self._sections(), {})
        assert result['issues'] == []
        assert result['warnings'] == []
|