Add graceful degradation when external tools are missing

Checks availability of required (smartctl, lsblk) and optional (nvme,
ceph, pct, dmidecode) tools at startup. Guards all tool-dependent code
sections to skip gracefully with informative log messages instead of
crashing. Also fixes pre-existing indentation bug in LXC exception handler.

Fixes: #19

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-10 13:13:08 -05:00
parent 92bca248ac
commit 7b36255fb4

View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os, sys, json, requests, psutil, socket, subprocess, logging, argparse, urllib.request, re, glob, datetime, fcntl, textwrap import os, sys, json, requests, psutil, socket, subprocess, logging, argparse, urllib.request, re, glob, datetime, fcntl, textwrap, shutil
from typing import Dict, Any, List from typing import Dict, Any, List
# ============================================================================= # =============================================================================
@@ -668,6 +668,41 @@ class SystemHealthMonitor:
# Drive details cache (per-run, cleared on next execution) # Drive details cache (per-run, cleared on next execution)
self._drive_details_cache = {} self._drive_details_cache = {}
# Check tool availability at startup
self._available_tools = self._check_tool_availability()
def _check_tool_availability(self) -> Dict[str, bool]:
    """Probe PATH for the external command-line tools this monitor uses.

    Every tool is looked up with ``shutil.which``. A missing *required*
    tool (smartctl, lsblk) is reported at WARNING level; a missing
    *optional* tool (nvme, ceph, pct, dmidecode) is reported at DEBUG
    level. Each message names the package used in the install hint.

    Returns:
        A dict mapping each checked tool name to ``True`` when the tool
        was found on PATH and ``False`` otherwise. Required tools come
        first, followed by optional tools.
    """
    # Each group: (label used in the log message, log method for a miss,
    # {tool name: package named in the "apt install" hint}).
    tool_groups = (
        (
            'Required',
            logger.warning,
            {
                'smartctl': 'smartmontools',
                'lsblk': 'util-linux',
            },
        ),
        (
            'Optional',
            logger.debug,
            {
                'nvme': 'nvme-cli',
                'ceph': 'ceph-common',
                'pct': 'pve-container',
                'dmidecode': 'dmidecode',
            },
        ),
    )

    availability: Dict[str, bool] = {}
    for label, log_missing, tools in tool_groups:
        for tool, package in tools.items():
            # shutil.which returns None when the executable is not on PATH.
            found = shutil.which(tool) is not None
            availability[tool] = found
            if not found:
                log_missing(f"{label} tool '{tool}' not found (install: apt install {package})")

    return availability
def _enforce_storage_limit(self, history_dir: str, max_bytes: int = None): def _enforce_storage_limit(self, history_dir: str, max_bytes: int = None):
""" """
Delete oldest history files if directory exceeds size limit. Delete oldest history files if directory exceeds size limit.
@@ -2588,8 +2623,8 @@ class SystemHealthMonitor:
logger.debug(f"Detected Issues: {smart_health['issues']}") logger.debug(f"Detected Issues: {smart_health['issues']}")
logger.debug("=== End SMART Check ===\n") logger.debug("=== End SMART Check ===\n")
# Special handling for NVMe drives # Special handling for NVMe drives (requires nvme-cli)
if 'nvme' in device: if 'nvme' in device and self._available_tools.get('nvme'):
try: try:
nvme_result = subprocess.run( nvme_result = subprocess.run(
['nvme', 'smart-log', device], ['nvme', 'smart-log', device],
@@ -2649,13 +2684,17 @@ class SystemHealthMonitor:
"""Dedicated NVMe SMART health check.""" """Dedicated NVMe SMART health check."""
smart_health = { smart_health = {
'status': 'UNKNOWN', 'status': 'UNKNOWN',
'severity': 'NORMAL', 'severity': 'NORMAL',
'issues': [], 'issues': [],
'temp': None, 'temp': None,
'attributes': {}, 'attributes': {},
'manufacturer_profile': None 'manufacturer_profile': None
} }
if not self._available_tools.get('nvme'):
logger.debug(f"nvme-cli not available, skipping NVMe health check for {device}")
return smart_health
try: try:
# Use nvme-cli for NVMe devices # Use nvme-cli for NVMe devices
result = subprocess.run( result = subprocess.run(
@@ -2721,7 +2760,12 @@ class SystemHealthMonitor:
def _check_drives_health(self) -> Dict[str, Any]: def _check_drives_health(self) -> Dict[str, Any]:
"""Check health of all drives in the system.""" """Check health of all drives in the system."""
drives_health = {'overall_status': 'NORMAL', 'drives': []} drives_health = {'overall_status': 'NORMAL', 'drives': []}
if not self._available_tools.get('smartctl') or not self._available_tools.get('lsblk'):
logger.warning("Drive health checks skipped: smartctl or lsblk not available")
drives_health['overall_status'] = 'UNKNOWN'
return drives_health
try: try:
# Get only valid physical disks # Get only valid physical disks
physical_disks = self._get_all_disks() physical_disks = self._get_all_disks()
@@ -2846,18 +2890,19 @@ class SystemHealthMonitor:
} }
try: try:
# First check using dmidecode # First check using dmidecode (if available)
result = subprocess.run( if self._available_tools.get('dmidecode'):
['dmidecode', '--type', 'memory'], result = subprocess.run(
stdout=subprocess.PIPE, ['dmidecode', '--type', 'memory'],
stderr=subprocess.PIPE, stdout=subprocess.PIPE,
text=True, stderr=subprocess.PIPE,
timeout=30 text=True,
) timeout=30
if 'Error Correction Type: Multi-bit ECC' in result.stdout: )
memory_health['has_ecc'] = True if 'Error Correction Type: Multi-bit ECC' in result.stdout:
memory_health['has_ecc'] = True
# If dmidecode didn't find ECC, try the edac method as backup
# If dmidecode unavailable or didn't find ECC, try the edac method as backup
if not memory_health['has_ecc']: if not memory_health['has_ecc']:
edac_path = '/sys/devices/system/edac/mc' edac_path = '/sys/devices/system/edac/mc'
if os.path.exists(edac_path) and os.listdir(edac_path): if os.path.exists(edac_path) and os.listdir(edac_path):
@@ -2994,8 +3039,6 @@ class SystemHealthMonitor:
Returns health status, cluster info, and any issues detected. Returns health status, cluster info, and any issues detected.
Cluster-wide issues use [cluster-wide] tag for cross-node deduplication. Cluster-wide issues use [cluster-wide] tag for cross-node deduplication.
""" """
import shutil
ceph_health = { ceph_health = {
'status': 'OK', 'status': 'OK',
'is_ceph_node': False, 'is_ceph_node': False,
@@ -3013,7 +3056,7 @@ class SystemHealthMonitor:
return ceph_health return ceph_health
# Check if ceph CLI is available # Check if ceph CLI is available
if not shutil.which('ceph'): if not self._available_tools.get('ceph'):
logger.debug("Ceph CLI not found - not a Ceph node") logger.debug("Ceph CLI not found - not a Ceph node")
return ceph_health return ceph_health
@@ -3419,7 +3462,11 @@ class SystemHealthMonitor:
'containers': [], 'containers': [],
'issues': [] 'issues': []
} }
if not self._available_tools.get('pct'):
logger.debug("pct not available - not a PVE node or pve-container not installed")
return lxc_health
try: try:
result = subprocess.run( result = subprocess.run(
['pct', 'list'], ['pct', 'list'],
@@ -3516,10 +3563,10 @@ class SystemHealthMonitor:
lxc_health['issues'].append(issue) lxc_health['issues'].append(issue)
logger.debug(f"Filesystem details: {filesystem}") logger.debug(f"Filesystem details: {filesystem}")
except Exception as e: except Exception as e:
logger.debug(f"Error processing line: {str(e)}") logger.debug(f"Error processing line: {str(e)}")
logger.debug(f"Full exception: {repr(e)}") logger.debug(f"Full exception: {repr(e)}")
continue continue
# Only add container info if we have filesystem data # Only add container info if we have filesystem data
if container_info['filesystems']: if container_info['filesystems']: