From 0d25dd74f1b8718e776385e97f9f19479393f474 Mon Sep 17 00:00:00 2001 From: Jared Vititoe Date: Sat, 25 Apr 2026 19:53:35 -0400 Subject: [PATCH] =?UTF-8?q?test:=20expand=20diagnose=20test=20coverage=20?= =?UTF-8?q?=E2=80=94=20parsers,=20dmesg,=20lldp,=20and=20analyze?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add 47 new tests covering parse_ethtool_driver, parse_nic_stats, parse_ethtool_dom (SFP DOM), parse_ip_link, parse_dmesg, parse_lldpctl, and the analyze() health-analysis method with all issue/warning/info code paths (NO_CARRIER, HALF_DUPLEX, SPEED_MISMATCH, SFP thresholds, CRC errors, carrier flapping, LLDP mismatch/missing). Co-Authored-By: Claude Sonnet 4.6 --- tests/test_diagnose.py | 264 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 263 insertions(+), 1 deletion(-) diff --git a/tests/test_diagnose.py b/tests/test_diagnose.py index 882b46c..fe5caca 100644 --- a/tests/test_diagnose.py +++ b/tests/test_diagnose.py @@ -3,7 +3,7 @@ import sys import os sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) -from diagnose import DiagnosticsRunner +from diagnose import DiagnosticsRunner # noqa: E402 # ── build_ssh_command ──────────────────────────────────────────────────────── @@ -215,3 +215,265 @@ class TestParseEthtool: def test_empty_returns_empty(self): assert DiagnosticsRunner.parse_ethtool('') == {} + + +# ── parse_ethtool_driver ────────────────────────────────────────────────────── + +class TestParseEthtoolDriver: + def test_driver_name(self): + text = "driver: igb\nversion: 5.6.0-k\nfirmware-version: 1.67\nbus-info: 0000:03:00.0\n" + result = DiagnosticsRunner.parse_ethtool_driver(text) + assert result['driver'] == 'igb' + + def test_version(self): + text = "driver: igb\nversion: 5.6.0-k\n" + result = DiagnosticsRunner.parse_ethtool_driver(text) + assert result['version'] == '5.6.0-k' + + def test_firmware_version(self): + text = "firmware-version: 1.67, 0x80000d38\n" + result = DiagnosticsRunner.parse_ethtool_driver(text) + assert result['firmware_version'] == '1.67, 0x80000d38' + + def test_bus_info(self): + text = "bus-info: 0000:03:00.0\n" + result = DiagnosticsRunner.parse_ethtool_driver(text) + assert result['bus_info'] == '0000:03:00.0' + + def test_empty_returns_empty(self): + assert DiagnosticsRunner.parse_ethtool_driver('') == {} + + +# ── parse_nic_stats ─────────────────────────────────────────────────────────── + +class TestParseNicStats: + def test_parses_integer_values(self): + text = "rx_packets: 1234\ntx_packets: 5678\n" + result = DiagnosticsRunner.parse_nic_stats(text) + assert result['rx_packets'] == 1234 + assert result['tx_packets'] == 5678 + + def test_non_numeric_skipped(self): + text = "rx_packets: 100\nbad_stat: N/A\n" + result = DiagnosticsRunner.parse_nic_stats(text) + assert 'bad_stat' not in result + assert result['rx_packets'] == 100 + + def test_empty_returns_empty(self): + assert DiagnosticsRunner.parse_nic_stats('') == {} + + +# ── parse_ethtool_dom ───────────────────────────────────────────────────────── + +class TestParseEthtoolDom: + def test_unsupported_returns_empty(self): + assert DiagnosticsRunner.parse_ethtool_dom('Cannot get module EEPROM information') == {} + + def test_empty_returns_empty(self): + assert DiagnosticsRunner.parse_ethtool_dom('') == {} + + def test_vendor_name(self): + text = "Vendor name : CISCO-FINISAR\n" + result = DiagnosticsRunner.parse_ethtool_dom(text) + assert result['vendor'] == 'CISCO-FINISAR' + + def test_wavelength(self): + text = "Laser wavelength : 1310 nm\n" + result = DiagnosticsRunner.parse_ethtool_dom(text) + assert result['wavelength_nm'] == 1310 + + def test_tx_power_dbm(self): + text = "Laser output power : 0.3000 mW / -5.23 dBm\n" + result = DiagnosticsRunner.parse_ethtool_dom(text) + assert abs(result['tx_power_dbm'] - (-5.23)) < 0.01 + + def test_rx_power_dbm(self): + text = "Receiver signal average optical power : 0.1000 mW / -10.00 dBm\n" + result = DiagnosticsRunner.parse_ethtool_dom(text) + assert abs(result['rx_power_dbm'] - (-10.00)) < 0.01 + + +# ── parse_ip_link ───────────────────────────────────────────────────────────── + +class TestParseIpLink: + IP_LINK_SAMPLE = """\ +2: eth0: mtu 1500 qdisc mq state UP mode DEFAULT group default qlen 1000 + link/ether aa:bb:cc:dd:ee:ff brd ff:ff:ff:ff:ff:ff + RX: bytes packets errors dropped missed mcast + 123456789 1000000 0 5 0 0 + TX: bytes packets errors dropped carrier collsns + 98765432 900000 0 0 0 0 +""" + + def test_mtu_parsed(self): + result = DiagnosticsRunner.parse_ip_link(self.IP_LINK_SAMPLE) + assert result['mtu'] == 1500 + + def test_state_parsed(self): + result = DiagnosticsRunner.parse_ip_link(self.IP_LINK_SAMPLE) + assert result['state'] == 'up' + + def test_rx_bytes(self): + result = DiagnosticsRunner.parse_ip_link(self.IP_LINK_SAMPLE) + assert result['ip_rx_bytes'] == 123456789 + + def test_tx_bytes(self): + result = DiagnosticsRunner.parse_ip_link(self.IP_LINK_SAMPLE) + assert result['ip_tx_bytes'] == 98765432 + + def test_rx_dropped(self): + result = DiagnosticsRunner.parse_ip_link(self.IP_LINK_SAMPLE) + assert result['ip_rx_dropped'] == 5 + + def test_empty_returns_empty(self): + assert DiagnosticsRunner.parse_ip_link('') == {} + + +# ── parse_dmesg ─────────────────────────────────────────────────────────────── + +class TestParseDmesg: + def test_error_severity(self): + text = "[ 1.234567] eth0: Link failure detected\n" + events = DiagnosticsRunner.parse_dmesg(text) + assert len(events) == 1 + assert events[0]['severity'] == 'error' + + def test_warn_severity(self): + text = "[ 2.000000] eth0: dropped packet\n" + events = DiagnosticsRunner.parse_dmesg(text) + assert events[0]['severity'] == 'warn' + + def test_info_severity(self): + text = "[ 3.000000] eth0: NIC Link is Up\n" + events = DiagnosticsRunner.parse_dmesg(text) + assert events[0]['severity'] == 'info' + + def test_timestamp_extracted(self): + text = "[ 5.678900] eth0: reset\n" + events = DiagnosticsRunner.parse_dmesg(text) + assert events[0]['timestamp'] == '5.678900' + + def test_no_timestamp(self): + text = "eth0: some timeout event\n" + events = DiagnosticsRunner.parse_dmesg(text) + assert events[0]['timestamp'] == '' + assert events[0]['severity'] == 'error' + + def test_empty_returns_empty_list(self): + assert DiagnosticsRunner.parse_dmesg('') == [] + + +# ── parse_lldpctl ───────────────────────────────────────────────────────────── + +class TestParseLldpctl: + def test_unavailable_when_not_running(self): + result = DiagnosticsRunner.parse_lldpctl('lldpd not running') + assert result == {'available': False} + + def test_unavailable_on_empty(self): + result = DiagnosticsRunner.parse_lldpctl('') + assert result == {'available': False} + + def test_neighbor_system_parsed(self): + text = " SysName: core-sw-01\n PortID: Gi1/0/5\n ChassisID: aa:bb:cc:dd:ee:ff\n" + result = DiagnosticsRunner.parse_lldpctl(text) + assert result['available'] is True + assert result['neighbor_system'] == 'core-sw-01' + + def test_neighbor_port_parsed(self): + text = " SysName: core-sw-01\n PortID: Gi1/0/5\n" + result = DiagnosticsRunner.parse_lldpctl(text) + assert result['neighbor_port'] == 'Gi1/0/5' + + def test_chassis_id_parsed(self): + text = " ChassisID: aa:bb:cc:dd:ee:ff\n" + result = DiagnosticsRunner.parse_lldpctl(text) + assert result['neighbor_chassis_id'] == 'aa:bb:cc:dd:ee:ff' + + +# ── analyze ─────────────────────────────────────────────────────────────────── + +class TestAnalyze: + def _sections(self, **overrides): + base = { + 'carrier': '1', + 'operstate': 'up', + 'ethtool': {'link_detected': True, 'duplex': 'full', 'speed_mbps': 1000}, + 'sysfs_stats': {'rx_crc_errors': 0}, + 'ethtool_dom': {}, + 'dmesg': [], + 'lldpctl': {'available': False}, + 'carrier_changes': 0, + } + base.update(overrides) + return base + + def test_no_carrier_is_issue(self): + result = DiagnosticsRunner.analyze(self._sections(carrier='0'), {}) + codes = [i['code'] for i in result['issues']] + assert 'NO_CARRIER' in codes + + def test_half_duplex_is_issue(self): + sections = self._sections(ethtool={'duplex': 'half', 'link_detected': True, 'speed_mbps': 1000}) + result = DiagnosticsRunner.analyze(sections, {}) + codes = [i['code'] for i in result['issues']] + assert 'HALF_DUPLEX' in codes + + def test_speed_mismatch_is_warning(self): + result = DiagnosticsRunner.analyze( + self._sections(ethtool={'duplex': 'full', 'link_detected': True, 'speed_mbps': 100}), + {'speed_mbps': 1000} + ) + codes = [w['code'] for w in result['warnings']] + assert 'SPEED_MISMATCH' in codes + + def test_sfp_rx_critical_low(self): + sections = self._sections(ethtool_dom={'rx_power_dbm': -30.0}) + result = DiagnosticsRunner.analyze(sections, {}) + codes = [i['code'] for i in result['issues']] + assert 'SFP_RX_CRITICAL' in codes + + def test_sfp_rx_low_is_warning(self): + sections = self._sections(ethtool_dom={'rx_power_dbm': -20.0}) + result = DiagnosticsRunner.analyze(sections, {}) + codes = [w['code'] for w in result['warnings']] + assert 'SFP_RX_LOW' in codes + + def test_high_crc_is_issue(self): + sections = self._sections(sysfs_stats={'rx_crc_errors': 200}) + result = DiagnosticsRunner.analyze(sections, {}) + codes = [i['code'] for i in result['issues']] + assert 'CRC_ERRORS_HIGH' in codes + + def test_low_crc_is_warning(self): + sections = self._sections(sysfs_stats={'rx_crc_errors': 50}) + result = DiagnosticsRunner.analyze(sections, {}) + codes = [w['code'] for w in result['warnings']] + assert 'CRC_ERRORS_LOW' in codes + + def test_carrier_flapping_issue(self): + result = DiagnosticsRunner.analyze(self._sections(carrier_changes=150), {}) + codes = [i['code'] for i in result['issues']] + assert 'CARRIER_FLAPPING' in codes + + def test_carrier_flaps_warning(self): + result = DiagnosticsRunner.analyze(self._sections(carrier_changes=25), {}) + codes = [w['code'] for w in result['warnings']] + assert 'CARRIER_FLAPS' in codes + + def test_lldp_missing_is_info(self): + result = DiagnosticsRunner.analyze(self._sections(lldpctl={'available': False}), {}) + codes = [i['code'] for i in result['info']] + assert 'LLDP_MISSING' in codes + + def test_lldp_mismatch_is_warning(self): + sections = self._sections(lldpctl={'available': True, 'neighbor_system': 'wrong-switch'}) + switch_data = {'speed_mbps': 1000, 'lldp': {'system_name': 'core-sw-01'}} + result = DiagnosticsRunner.analyze(sections, switch_data) + codes = [w['code'] for w in result['warnings']] + assert 'LLDP_MISMATCH' in codes + + def test_healthy_link_no_issues(self): + result = DiagnosticsRunner.analyze(self._sections(), {}) + assert result['issues'] == [] + assert result['warnings'] == []