"""Tests for gandalf.diagnose — all pure static methods, no external deps."""

import os
import sys

# Put the parent directory on sys.path so ``diagnose`` can be imported.
# abspath() guards against a relative __file__, where dirname(dirname(...))
# would collapse to '' (the current directory) instead of the parent.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from diagnose import DiagnosticsRunner  # noqa: E402


# ── build_ssh_command ────────────────────────────────────────────────────────

class TestBuildSshCommand:
    """Checks on the command string returned by ``build_ssh_command``."""

    HOST = '10.0.0.1'
    IFACE = 'eth0'

    def setup_method(self):
        # Most tests inspect the same command; build it once per test.
        self.cmd = DiagnosticsRunner.build_ssh_command(self.HOST, self.IFACE)

    def test_contains_stricthostkeychecking_no(self):
        assert 'StrictHostKeyChecking=no' in self.cmd

    def test_contains_host_ip(self):
        assert 'root@10.0.0.1' in self.cmd

    def test_contains_interface_name(self):
        assert 'eth0' in self.cmd

    def test_shell_quotes_special_chars_in_iface(self):
        # shlex.quote wraps the whole iface string in single quotes so
        # shell metacharacters are not interpreted as commands
        cmd = DiagnosticsRunner.build_ssh_command(self.HOST, "eth0; rm -rf /")
        # The iface must appear inside single quotes
        assert "'eth0; rm -rf /'" in cmd

    def test_contains_sysfs_stats_section(self):
        assert 'sysfs_stats' in self.cmd

    def test_contains_ethtool_section(self):
        assert 'ethtool' in self.cmd


# ── parse_output ─────────────────────────────────────────────────────────────

# Sectioned output fed to parse_output: each "=== name ===" header is followed
# by that section's raw text.
# NOTE(review): line breaks and indentation inside this fixture were
# reconstructed from a whitespace-collapsed copy of the file — confirm them
# against real command output before relying on exact layout.
SAMPLE_OUTPUT = """\
=== carrier ===
1
=== operstate ===
up
=== sysfs_stats ===
rx_bytes:12345
tx_bytes:67890
rx_errors:0
tx_errors:0
rx_dropped:0
tx_dropped:0
rx_crc_errors:0
rx_frame_errors:0
rx_fifo_errors:0
tx_carrier_errors:0
collisions:0
rx_missed_errors:0
=== carrier_changes ===
3
=== ethtool ===
Speed: 1000Mb/s
Duplex: Full
Link detected: yes
Auto-negotiation: on
=== ethtool_driver ===
driver: igb
version: 5.6.0-k
firmware-version: 1.67, 0x80000d38
bus-info: 0000:03:00.0
=== ethtool_pause ===
RX: on
TX: off
=== ethtool_ring ===
Pre-set maximums:
RX: 4096
TX: 4096
Current hardware settings:
RX: 256
TX: 256
=== ethtool_stats ===
rx_packets: 1000
tx_packets: 2000
=== ethtool_dom ===
=== ip_link ===
2: eth0: mtu 1500
=== ip_addr ===
inet 10.0.0.1/24
=== ip_route ===
default via 10.0.0.254
=== dmesg ===
eth0: renamed from ens3
=== lldpctl ===
lldpd not running
=== end ===
"""


class TestParseOutput:
    """``parse_output`` applied to the full ``SAMPLE_OUTPUT`` fixture."""

    def setup_method(self):
        self.parsed = DiagnosticsRunner.parse_output(SAMPLE_OUTPUT)

    def test_carrier_parsed(self):
        assert self.parsed['carrier'] == '1'

    def test_operstate_parsed(self):
        assert self.parsed['operstate'] == 'up'

    def test_carrier_changes_is_int(self):
        assert self.parsed['carrier_changes'] == 3

    def test_sysfs_stats_rx_bytes(self):
        assert self.parsed['sysfs_stats']['rx_bytes'] == 12345

    def test_sysfs_stats_tx_bytes(self):
        assert self.parsed['sysfs_stats']['tx_bytes'] == 67890

    def test_ethtool_speed(self):
        assert self.parsed['ethtool']['speed_mbps'] == 1000

    def test_ethtool_link_detected(self):
        assert self.parsed['ethtool']['link_detected'] is True

    def test_ethtool_duplex(self):
        assert self.parsed['ethtool']['duplex'] == 'full'

    def test_ethtool_auto_neg(self):
        assert self.parsed['ethtool']['auto_neg'] is True

    def test_ethtool_driver(self):
        assert self.parsed['ethtool_driver']['driver'] == 'igb'

    def test_ethtool_pause_rx_on(self):
        assert self.parsed['ethtool_pause']['rx_pause'] is True

    def test_ethtool_pause_tx_off(self):
        assert self.parsed['ethtool_pause']['tx_pause'] is False

    def test_ethtool_ring_current(self):
        ring = self.parsed['ethtool_ring']
        assert ring['rx_current'] == 256
        assert ring['tx_current'] == 256

    def test_ethtool_ring_max(self):
        assert self.parsed['ethtool_ring']['rx_max'] == 4096

    def test_ip_addr_present(self):
        assert '10.0.0.1/24' in self.parsed['ip_addr']

    def test_ip_route_present(self):
        assert 'default via 10.0.0.254' in self.parsed['ip_route']


class TestParseOutputEdgeCases:
    """``parse_output`` on empty and truncated input."""

    def test_empty_string_returns_dict(self):
        result = DiagnosticsRunner.parse_output('')
        assert isinstance(result, dict)
        # Missing sections are expected to fall back to placeholder values.
        assert result['carrier'] == '?'
        assert result['operstate'] == '?'
        assert result['carrier_changes'] == 0

    def test_no_end_sentinel_still_parses(self):
        output = "=== carrier ===\n1\n=== operstate ===\nup\n"
        result = DiagnosticsRunner.parse_output(output)
        assert result['carrier'] == '1'


# ── parse_sysfs_stats ────────────────────────────────────────────────────────

class TestParseSysfsStats:
    """``parse_sysfs_stats`` on ``key:value`` counter lines."""

    def test_parses_known_keys(self):
        text = "rx_bytes:100\ntx_bytes:200\nrx_errors:0\n"
        result = DiagnosticsRunner.parse_sysfs_stats(text)
        assert result['rx_bytes'] == 100
        assert result['tx_bytes'] == 200
        assert result['rx_errors'] == 0

    def test_ignores_unknown_keys(self):
        text = "unknown_counter:999\nrx_bytes:50\n"
        result = DiagnosticsRunner.parse_sysfs_stats(text)
        assert 'unknown_counter' not in result
        assert result['rx_bytes'] == 50

    def test_non_numeric_value_defaults_to_zero(self):
        text = "rx_bytes:not_a_number\n"
        result = DiagnosticsRunner.parse_sysfs_stats(text)
        assert result['rx_bytes'] == 0

    def test_empty_text_returns_empty_dict(self):
        assert DiagnosticsRunner.parse_sysfs_stats('') == {}


# ── parse_ethtool ─────────────────────────────────────────────────────────────

class TestParseEthtool:
    """``parse_ethtool`` on individual ``ethtool`` summary lines."""

    def test_speed_parsed(self):
        text = "Speed: 10000Mb/s\n"
        result = DiagnosticsRunner.parse_ethtool(text)
        assert result['speed_mbps'] == 10000

    def test_unknown_speed(self):
        text = "Speed: Unknown! (0)\n"
        result = DiagnosticsRunner.parse_ethtool(text)
        assert result['speed_mbps'] is None

    def test_link_detected_no(self):
        text = "Link detected: no\n"
        result = DiagnosticsRunner.parse_ethtool(text)
        assert result['link_detected'] is False

    def test_auto_neg_off(self):
        text = "Auto-negotiation: off\n"
        result = DiagnosticsRunner.parse_ethtool(text)
        assert result['auto_neg'] is False

    def test_empty_returns_empty(self):
        assert DiagnosticsRunner.parse_ethtool('') == {}


# ── parse_ethtool_driver ──────────────────────────────────────────────────────

class TestParseEthtoolDriver:
    """``parse_ethtool_driver`` on driver-info lines."""

    def test_driver_name(self):
        text = "driver: igb\nversion: 5.6.0-k\nfirmware-version: 1.67\nbus-info: 0000:03:00.0\n"
        result = DiagnosticsRunner.parse_ethtool_driver(text)
        assert result['driver'] == 'igb'

    def test_version(self):
        text = "driver: igb\nversion: 5.6.0-k\n"
        result = DiagnosticsRunner.parse_ethtool_driver(text)
        assert result['version'] == '5.6.0-k'

    def test_firmware_version(self):
        text = "firmware-version: 1.67, 0x80000d38\n"
        result = DiagnosticsRunner.parse_ethtool_driver(text)
        assert result['firmware_version'] == '1.67, 0x80000d38'

    def test_bus_info(self):
        text = "bus-info: 0000:03:00.0\n"
        result = DiagnosticsRunner.parse_ethtool_driver(text)
        assert result['bus_info'] == '0000:03:00.0'

    def test_empty_returns_empty(self):
        assert DiagnosticsRunner.parse_ethtool_driver('') == {}


# ── parse_nic_stats ───────────────────────────────────────────────────────────

class TestParseNicStats:
    """``parse_nic_stats`` on ``name: value`` statistic lines."""

    def test_parses_integer_values(self):
        text = "rx_packets: 1234\ntx_packets: 5678\n"
        result = DiagnosticsRunner.parse_nic_stats(text)
        assert result['rx_packets'] == 1234
        assert result['tx_packets'] == 5678

    def test_non_numeric_skipped(self):
        text = "rx_packets: 100\nbad_stat: N/A\n"
        result = DiagnosticsRunner.parse_nic_stats(text)
        assert 'bad_stat' not in result
        assert result['rx_packets'] == 100

    def test_empty_returns_empty(self):
        assert DiagnosticsRunner.parse_nic_stats('') == {}


# ── parse_ethtool_dom ─────────────────────────────────────────────────────────

class TestParseEthtoolDom:
    """``parse_ethtool_dom`` on transceiver (module EEPROM) lines."""

    def test_unsupported_returns_empty(self):
        assert DiagnosticsRunner.parse_ethtool_dom('Cannot get module EEPROM information') == {}

    def test_empty_returns_empty(self):
        assert DiagnosticsRunner.parse_ethtool_dom('') == {}

    def test_vendor_name(self):
        text = "Vendor name : CISCO-FINISAR\n"
        result = DiagnosticsRunner.parse_ethtool_dom(text)
        assert result['vendor'] == 'CISCO-FINISAR'

    def test_wavelength(self):
        text = "Laser wavelength : 1310 nm\n"
        result = DiagnosticsRunner.parse_ethtool_dom(text)
        assert result['wavelength_nm'] == 1310

    def test_tx_power_dbm(self):
        text = "Laser output power : 0.3000 mW / -5.23 dBm\n"
        result = DiagnosticsRunner.parse_ethtool_dom(text)
        # Compare with a tolerance: the value is a parsed float.
        assert abs(result['tx_power_dbm'] - (-5.23)) < 0.01

    def test_rx_power_dbm(self):
        text = "Receiver signal average optical power : 0.1000 mW / -10.00 dBm\n"
        result = DiagnosticsRunner.parse_ethtool_dom(text)
        assert abs(result['rx_power_dbm'] - (-10.00)) < 0.01


# ── parse_ip_link ─────────────────────────────────────────────────────────────

class TestParseIpLink:
    """``parse_ip_link`` on a link summary with RX/TX counter rows."""

    # NOTE(review): line breaks and the 4-space continuation indent were
    # reconstructed from a whitespace-collapsed copy of this file — confirm
    # against the real command output the parser is expected to handle.
    IP_LINK_SAMPLE = (
        "2: eth0: mtu 1500 qdisc mq state UP mode DEFAULT group default qlen 1000\n"
        "    link/ether aa:bb:cc:dd:ee:ff brd ff:ff:ff:ff:ff:ff\n"
        "    RX: bytes packets errors dropped missed mcast\n"
        "    123456789 1000000 0 5 0 0\n"
        "    TX: bytes packets errors dropped carrier collsns\n"
        "    98765432 900000 0 0 0 0\n"
    )

    def setup_method(self):
        self.result = DiagnosticsRunner.parse_ip_link(self.IP_LINK_SAMPLE)

    def test_mtu_parsed(self):
        assert self.result['mtu'] == 1500

    def test_state_parsed(self):
        assert self.result['state'] == 'up'

    def test_rx_bytes(self):
        assert self.result['ip_rx_bytes'] == 123456789

    def test_tx_bytes(self):
        assert self.result['ip_tx_bytes'] == 98765432

    def test_rx_dropped(self):
        assert self.result['ip_rx_dropped'] == 5

    def test_empty_returns_empty(self):
        assert DiagnosticsRunner.parse_ip_link('') == {}


# ── parse_dmesg ───────────────────────────────────────────────────────────────

class TestParseDmesg:
    """``parse_dmesg`` severity classification and timestamp extraction."""

    def test_error_severity(self):
        text = "[ 1.234567] eth0: Link failure detected\n"
        events = DiagnosticsRunner.parse_dmesg(text)
        assert len(events) == 1
        assert events[0]['severity'] == 'error'

    def test_warn_severity(self):
        text = "[ 2.000000] eth0: dropped packet\n"
        events = DiagnosticsRunner.parse_dmesg(text)
        assert events[0]['severity'] == 'warn'

    def test_info_severity(self):
        text = "[ 3.000000] eth0: NIC Link is Up\n"
        events = DiagnosticsRunner.parse_dmesg(text)
        assert events[0]['severity'] == 'info'

    def test_timestamp_extracted(self):
        text = "[ 5.678900] eth0: reset\n"
        events = DiagnosticsRunner.parse_dmesg(text)
        assert events[0]['timestamp'] == '5.678900'

    def test_no_timestamp(self):
        text = "eth0: some timeout event\n"
        events = DiagnosticsRunner.parse_dmesg(text)
        assert events[0]['timestamp'] == ''
        assert events[0]['severity'] == 'error'

    def test_empty_returns_empty_list(self):
        assert DiagnosticsRunner.parse_dmesg('') == []


# ── parse_lldpctl ─────────────────────────────────────────────────────────────

class TestParseLldpctl:
    """``parse_lldpctl`` availability flag and neighbor fields."""

    def test_unavailable_when_not_running(self):
        result = DiagnosticsRunner.parse_lldpctl('lldpd not running')
        assert result == {'available': False}

    def test_unavailable_on_empty(self):
        result = DiagnosticsRunner.parse_lldpctl('')
        assert result == {'available': False}

    def test_neighbor_system_parsed(self):
        text = " SysName: core-sw-01\n PortID: Gi1/0/5\n ChassisID: aa:bb:cc:dd:ee:ff\n"
        result = DiagnosticsRunner.parse_lldpctl(text)
        assert result['available'] is True
        assert result['neighbor_system'] == 'core-sw-01'

    def test_neighbor_port_parsed(self):
        text = " SysName: core-sw-01\n PortID: Gi1/0/5\n"
        result = DiagnosticsRunner.parse_lldpctl(text)
        assert result['neighbor_port'] == 'Gi1/0/5'

    def test_chassis_id_parsed(self):
        text = " ChassisID: aa:bb:cc:dd:ee:ff\n"
        result = DiagnosticsRunner.parse_lldpctl(text)
        assert result['neighbor_chassis_id'] == 'aa:bb:cc:dd:ee:ff'


# ── analyze ───────────────────────────────────────────────────────────────────

class TestAnalyze:
    """``analyze`` findings, grouped as ``issues`` / ``warnings`` / ``info``."""

    def _sections(self, **overrides):
        """Return a healthy-link sections dict with ``overrides`` applied on top.

        Overrides replace whole top-level entries (e.g. passing ``ethtool=...``
        swaps the entire ethtool dict rather than merging into it).
        """
        base = {
            'carrier': '1',
            'operstate': 'up',
            'ethtool': {'link_detected': True, 'duplex': 'full', 'speed_mbps': 1000},
            'sysfs_stats': {'rx_crc_errors': 0},
            'ethtool_dom': {},
            'dmesg': [],
            'lldpctl': {'available': False},
            'carrier_changes': 0,
        }
        base.update(overrides)
        return base

    @staticmethod
    def _codes(result, bucket):
        """Return the ``code`` of every finding listed under ``result[bucket]``."""
        return [finding['code'] for finding in result[bucket]]

    def test_no_carrier_is_issue(self):
        result = DiagnosticsRunner.analyze(self._sections(carrier='0'), {})
        assert 'NO_CARRIER' in self._codes(result, 'issues')

    def test_half_duplex_is_issue(self):
        sections = self._sections(
            ethtool={'duplex': 'half', 'link_detected': True, 'speed_mbps': 1000}
        )
        result = DiagnosticsRunner.analyze(sections, {})
        assert 'HALF_DUPLEX' in self._codes(result, 'issues')

    def test_speed_mismatch_is_warning(self):
        # Host reports 100 Mb/s while the switch data says 1000 Mb/s.
        result = DiagnosticsRunner.analyze(
            self._sections(
                ethtool={'duplex': 'full', 'link_detected': True, 'speed_mbps': 100}
            ),
            {'speed_mbps': 1000},
        )
        assert 'SPEED_MISMATCH' in self._codes(result, 'warnings')

    def test_sfp_rx_critical_low(self):
        sections = self._sections(ethtool_dom={'rx_power_dbm': -30.0})
        result = DiagnosticsRunner.analyze(sections, {})
        assert 'SFP_RX_CRITICAL' in self._codes(result, 'issues')

    def test_sfp_rx_low_is_warning(self):
        sections = self._sections(ethtool_dom={'rx_power_dbm': -20.0})
        result = DiagnosticsRunner.analyze(sections, {})
        assert 'SFP_RX_LOW' in self._codes(result, 'warnings')

    def test_high_crc_is_issue(self):
        sections = self._sections(sysfs_stats={'rx_crc_errors': 200})
        result = DiagnosticsRunner.analyze(sections, {})
        assert 'CRC_ERRORS_HIGH' in self._codes(result, 'issues')

    def test_low_crc_is_warning(self):
        sections = self._sections(sysfs_stats={'rx_crc_errors': 50})
        result = DiagnosticsRunner.analyze(sections, {})
        assert 'CRC_ERRORS_LOW' in self._codes(result, 'warnings')

    def test_carrier_flapping_issue(self):
        result = DiagnosticsRunner.analyze(self._sections(carrier_changes=150), {})
        assert 'CARRIER_FLAPPING' in self._codes(result, 'issues')

    def test_carrier_flaps_warning(self):
        result = DiagnosticsRunner.analyze(self._sections(carrier_changes=25), {})
        assert 'CARRIER_FLAPS' in self._codes(result, 'warnings')

    def test_lldp_missing_is_info(self):
        result = DiagnosticsRunner.analyze(
            self._sections(lldpctl={'available': False}), {}
        )
        assert 'LLDP_MISSING' in self._codes(result, 'info')

    def test_lldp_mismatch_is_warning(self):
        sections = self._sections(
            lldpctl={'available': True, 'neighbor_system': 'wrong-switch'}
        )
        switch_data = {'speed_mbps': 1000, 'lldp': {'system_name': 'core-sw-01'}}
        result = DiagnosticsRunner.analyze(sections, switch_data)
        assert 'LLDP_MISMATCH' in self._codes(result, 'warnings')

    def test_healthy_link_no_issues(self):
        result = DiagnosticsRunner.analyze(self._sections(), {})
        assert result['issues'] == []
        assert result['warnings'] == []