Add pytest test suite and security scanning
- Add 33 tests for DiagnosticsRunner static methods (build_ssh_command, parse_output, parse_sysfs_stats, parse_ethtool and variants) - Add test.yml CI workflow running pytest on every push/PR - Add security.yml CI workflow running bandit on every push/PR (weekly) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,24 @@
|
||||
name: Test
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: ["**"]
|
||||
pull_request:
|
||||
branches: ["**"]
|
||||
|
||||
jobs:
|
||||
pytest:
|
||||
name: Python Tests (pytest)
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- name: Install Python and dependencies
|
||||
run: |
|
||||
apt-get update -qq
|
||||
apt-get install -y -qq python3 python3-pip
|
||||
pip3 install pytest
|
||||
pip3 install -r requirements.txt --quiet
|
||||
|
||||
- name: Run pytest
|
||||
run: python3 -m pytest tests/ -v
|
||||
@@ -0,0 +1,217 @@
|
||||
"""Tests for gandalf.diagnose — all pure static methods, no external deps."""
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
||||
|
||||
from diagnose import DiagnosticsRunner
|
||||
|
||||
|
||||
# ── build_ssh_command ────────────────────────────────────────────────────────
|
||||
|
||||
class TestBuildSshCommand:
|
||||
def test_contains_stricthostkeychecking_no(self):
|
||||
cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', 'eth0')
|
||||
assert 'StrictHostKeyChecking=no' in cmd
|
||||
|
||||
def test_contains_host_ip(self):
|
||||
cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', 'eth0')
|
||||
assert 'root@10.0.0.1' in cmd
|
||||
|
||||
def test_contains_interface_name(self):
|
||||
cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', 'eth0')
|
||||
assert 'eth0' in cmd
|
||||
|
||||
def test_shell_quotes_special_chars_in_iface(self):
|
||||
# shlex.quote wraps the whole iface string in single quotes so
|
||||
# shell metacharacters are not interpreted as commands
|
||||
cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', "eth0; rm -rf /")
|
||||
# The iface must appear inside single quotes
|
||||
assert "'eth0; rm -rf /'" in cmd
|
||||
|
||||
def test_contains_sysfs_stats_section(self):
|
||||
cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', 'eth0')
|
||||
assert 'sysfs_stats' in cmd
|
||||
|
||||
def test_contains_ethtool_section(self):
|
||||
cmd = DiagnosticsRunner.build_ssh_command('10.0.0.1', 'eth0')
|
||||
assert 'ethtool' in cmd
|
||||
|
||||
|
||||
# ── parse_output ─────────────────────────────────────────────────────────────
|
||||
|
||||
SAMPLE_OUTPUT = """\
|
||||
=== carrier ===
|
||||
1
|
||||
=== operstate ===
|
||||
up
|
||||
=== sysfs_stats ===
|
||||
rx_bytes:12345
|
||||
tx_bytes:67890
|
||||
rx_errors:0
|
||||
tx_errors:0
|
||||
rx_dropped:0
|
||||
tx_dropped:0
|
||||
rx_crc_errors:0
|
||||
rx_frame_errors:0
|
||||
rx_fifo_errors:0
|
||||
tx_carrier_errors:0
|
||||
collisions:0
|
||||
rx_missed_errors:0
|
||||
=== carrier_changes ===
|
||||
3
|
||||
=== ethtool ===
|
||||
Speed: 1000Mb/s
|
||||
Duplex: Full
|
||||
Link detected: yes
|
||||
Auto-negotiation: on
|
||||
=== ethtool_driver ===
|
||||
driver: igb
|
||||
version: 5.6.0-k
|
||||
firmware-version: 1.67, 0x80000d38
|
||||
bus-info: 0000:03:00.0
|
||||
=== ethtool_pause ===
|
||||
RX: on
|
||||
TX: off
|
||||
=== ethtool_ring ===
|
||||
Pre-set maximums:
|
||||
RX: 4096
|
||||
TX: 4096
|
||||
Current hardware settings:
|
||||
RX: 256
|
||||
TX: 256
|
||||
=== ethtool_stats ===
|
||||
rx_packets: 1000
|
||||
tx_packets: 2000
|
||||
=== ethtool_dom ===
|
||||
=== ip_link ===
|
||||
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500
|
||||
=== ip_addr ===
|
||||
inet 10.0.0.1/24
|
||||
=== ip_route ===
|
||||
default via 10.0.0.254
|
||||
=== dmesg ===
|
||||
eth0: renamed from ens3
|
||||
=== lldpctl ===
|
||||
lldpd not running
|
||||
=== end ===
|
||||
"""
|
||||
|
||||
|
||||
class TestParseOutput:
|
||||
def setup_method(self):
|
||||
self.parsed = DiagnosticsRunner.parse_output(SAMPLE_OUTPUT)
|
||||
|
||||
def test_carrier_parsed(self):
|
||||
assert self.parsed['carrier'] == '1'
|
||||
|
||||
def test_operstate_parsed(self):
|
||||
assert self.parsed['operstate'] == 'up'
|
||||
|
||||
def test_carrier_changes_is_int(self):
|
||||
assert self.parsed['carrier_changes'] == 3
|
||||
|
||||
def test_sysfs_stats_rx_bytes(self):
|
||||
assert self.parsed['sysfs_stats']['rx_bytes'] == 12345
|
||||
|
||||
def test_sysfs_stats_tx_bytes(self):
|
||||
assert self.parsed['sysfs_stats']['tx_bytes'] == 67890
|
||||
|
||||
def test_ethtool_speed(self):
|
||||
assert self.parsed['ethtool']['speed_mbps'] == 1000
|
||||
|
||||
def test_ethtool_link_detected(self):
|
||||
assert self.parsed['ethtool']['link_detected'] is True
|
||||
|
||||
def test_ethtool_duplex(self):
|
||||
assert self.parsed['ethtool']['duplex'] == 'full'
|
||||
|
||||
def test_ethtool_auto_neg(self):
|
||||
assert self.parsed['ethtool']['auto_neg'] is True
|
||||
|
||||
def test_ethtool_driver(self):
|
||||
assert self.parsed['ethtool_driver']['driver'] == 'igb'
|
||||
|
||||
def test_ethtool_pause_rx_on(self):
|
||||
assert self.parsed['ethtool_pause']['rx_pause'] is True
|
||||
|
||||
def test_ethtool_pause_tx_off(self):
|
||||
assert self.parsed['ethtool_pause']['tx_pause'] is False
|
||||
|
||||
def test_ethtool_ring_current(self):
|
||||
assert self.parsed['ethtool_ring']['rx_current'] == 256
|
||||
assert self.parsed['ethtool_ring']['tx_current'] == 256
|
||||
|
||||
def test_ethtool_ring_max(self):
|
||||
assert self.parsed['ethtool_ring']['rx_max'] == 4096
|
||||
|
||||
def test_ip_addr_present(self):
|
||||
assert '10.0.0.1/24' in self.parsed['ip_addr']
|
||||
|
||||
def test_ip_route_present(self):
|
||||
assert 'default via 10.0.0.254' in self.parsed['ip_route']
|
||||
|
||||
|
||||
class TestParseOutputEdgeCases:
|
||||
def test_empty_string_returns_dict(self):
|
||||
result = DiagnosticsRunner.parse_output('')
|
||||
assert isinstance(result, dict)
|
||||
assert result['carrier'] == '?'
|
||||
assert result['operstate'] == '?'
|
||||
assert result['carrier_changes'] == 0
|
||||
|
||||
def test_no_end_sentinel_still_parses(self):
|
||||
output = "=== carrier ===\n1\n=== operstate ===\nup\n"
|
||||
result = DiagnosticsRunner.parse_output(output)
|
||||
assert result['carrier'] == '1'
|
||||
|
||||
|
||||
# ── parse_sysfs_stats ────────────────────────────────────────────────────────
|
||||
|
||||
class TestParseSysfsStats:
|
||||
def test_parses_known_keys(self):
|
||||
text = "rx_bytes:100\ntx_bytes:200\nrx_errors:0\n"
|
||||
result = DiagnosticsRunner.parse_sysfs_stats(text)
|
||||
assert result['rx_bytes'] == 100
|
||||
assert result['tx_bytes'] == 200
|
||||
assert result['rx_errors'] == 0
|
||||
|
||||
def test_ignores_unknown_keys(self):
|
||||
text = "unknown_counter:999\nrx_bytes:50\n"
|
||||
result = DiagnosticsRunner.parse_sysfs_stats(text)
|
||||
assert 'unknown_counter' not in result
|
||||
assert result['rx_bytes'] == 50
|
||||
|
||||
def test_non_numeric_value_defaults_to_zero(self):
|
||||
text = "rx_bytes:not_a_number\n"
|
||||
result = DiagnosticsRunner.parse_sysfs_stats(text)
|
||||
assert result['rx_bytes'] == 0
|
||||
|
||||
def test_empty_text_returns_empty_dict(self):
|
||||
assert DiagnosticsRunner.parse_sysfs_stats('') == {}
|
||||
|
||||
|
||||
# ── parse_ethtool ─────────────────────────────────────────────────────────────
|
||||
|
||||
class TestParseEthtool:
|
||||
def test_speed_parsed(self):
|
||||
text = "Speed: 10000Mb/s\n"
|
||||
result = DiagnosticsRunner.parse_ethtool(text)
|
||||
assert result['speed_mbps'] == 10000
|
||||
|
||||
def test_unknown_speed(self):
|
||||
text = "Speed: Unknown! (0)\n"
|
||||
result = DiagnosticsRunner.parse_ethtool(text)
|
||||
assert result['speed_mbps'] is None
|
||||
|
||||
def test_link_detected_no(self):
|
||||
text = "Link detected: no\n"
|
||||
result = DiagnosticsRunner.parse_ethtool(text)
|
||||
assert result['link_detected'] is False
|
||||
|
||||
def test_auto_neg_off(self):
|
||||
text = "Auto-negotiation: off\n"
|
||||
result = DiagnosticsRunner.parse_ethtool(text)
|
||||
assert result['auto_neg'] is False
|
||||
|
||||
def test_empty_returns_empty(self):
|
||||
assert DiagnosticsRunner.parse_ethtool('') == {}
|
||||
Reference in New Issue
Block a user