data retention and large refactor of codebase

This commit is contained in:
2025-09-03 12:43:16 -04:00
parent 3d902620b0
commit bc73a691df
2 changed files with 461 additions and 145 deletions

View File

@ -13,6 +13,10 @@ A robust system health monitoring daemon that tracks hardware status and automat
- Configurable thresholds and monitoring parameters
- Dry-run mode for testing
- Systemd integration for automated daily checks
- LXC container storage monitoring
- Historical trend analysis for predictive failure detection
- Manufacturer-specific SMART attribute interpretation
- ECC memory error detection
## Installation
@ -53,19 +57,33 @@ python3 hwmonDaemon.py
The daemon monitors:
- Disk usage (warns at 80%, critical at 90%)
- LXC storage usage (warns at 80%, critical at 90%)
- Memory usage (warns at 80%)
- CPU usage (warns at 80%)
- CPU usage (warns at 95%)
- Network connectivity to management (10.10.10.1) and Ceph (10.10.90.1) networks
- SMART status of physical drives
- SMART status of physical drives with manufacturer-specific profiles
- Temperature monitoring (warns at 65°C)
- Automatic duplicate ticket prevention
- Enhanced logging with debug capabilities
## Data Storage
The daemon creates and maintains:
- **Log Directory**: `/var/log/hwmonDaemon/`
- **Historical SMART Data**: JSON files for trend analysis
- **Data Retention**: 30 days of historical monitoring data
## Ticket Creation
The daemon automatically creates tickets with:
- Standardized titles including hostname, hardware type, and scope
- Detailed descriptions of detected issues
- Detailed descriptions of detected issues with drive specifications
- Priority levels based on severity (P2-P4)
- Proper categorization and status tracking
- Executive summaries and technical analysis
## Dependencies
@ -73,7 +91,17 @@ The daemon automatically creates tickets with:
- Required Python packages:
- psutil
- requests
- System tools:
- smartmontools (for SMART disk monitoring)
- nvme-cli (for NVMe drive monitoring)
## Excluded Paths
The following paths are automatically excluded from monitoring:
- `/media/*`
- `/mnt/pve/mediafs/*`
- `/opt/metube_downloads`
- Pattern-based exclusions for media and download directories
## Service Configuration
@ -83,6 +111,19 @@ The daemon runs:
- As root user for hardware access
- With automatic restart on failure
## Troubleshooting
```bash
# View service logs
sudo journalctl -u hwmon.service -f
# Check service status
sudo systemctl status hwmon.timer
# Manual test run
python3 hwmonDaemon.py --dry-run
```
## Security Note
Ensure proper network security measures are in place as the service downloads and executes code from a specified URL.

View File

@ -2,31 +2,33 @@
import os, sys, json, requests, psutil, socket, subprocess, logging, argparse, urllib.request, re, glob, datetime
from typing import Dict, Any, List
# Create a logger
# =============================================================================
# LOGGING SETUP
# =============================================================================
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Create a console handler and set its level to DEBUG
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
# Create a formatter
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# Add the formatter to the console handler
console_handler.setFormatter(formatter)
# Add the console handler to the logger
logger.addHandler(console_handler)
class SystemHealthMonitor:
# =============================================================================
# CLASS CONSTANTS AND CONFIGURATION
# =============================================================================
STANDARD_WIDTH = 80
PRIORITIES = {
'CRITICAL': '1',
'HIGH': '2',
'MEDIUM': '3',
'LOW': '4'
}
ISSUE_PRIORITIES = {
'SMART_FAILURE': PRIORITIES['HIGH'],
'DISK_CRITICAL': PRIORITIES['HIGH'],
@ -36,6 +38,7 @@ class SystemHealthMonitor:
'CPU_HIGH': PRIORITIES['LOW'],
'NETWORK_FAILURE': PRIORITIES['HIGH']
}
CONFIG = {
'TICKET_API_URL': 'http://10.10.10.45/create_ticket_api.php',
'THRESHOLDS': {
@ -63,8 +66,11 @@ class SystemHealthMonitor:
r'.*/media$',
r'.*mediafs.*',
r'.*/downloads.*'
]
],
'HISTORY_DIR': '/var/log/hwmonDaemon',
'HISTORY_RETENTION_DAYS': 30
}
TICKET_TEMPLATES = {
'ACTION_TYPE': {
'AUTO': '[auto]',
@ -92,6 +98,7 @@ class SystemHealthMonitor:
'DEFAULT_CATEGORY': 'Hardware',
'DEFAULT_ISSUE_TYPE': 'Problem'
}
PROBLEMATIC_FIRMWARE = {
'Samsung': {
'EVO860': ['RVT01B6Q', 'RVT02B6Q'], # Known issues with sudden performance drops
@ -107,6 +114,7 @@ class SystemHealthMonitor:
'WD141KRYZ': ['02.01A02']
}
}
MANUFACTURER_SMART_PROFILES = {
'Ridata': {
'aliases': ['Ridata', 'Ritek', 'RIDATA', 'RITEK', 'SSD 512GB'],
@ -229,12 +237,14 @@ class SystemHealthMonitor:
}
}
}
SEVERITY_INDICATORS = {
'CRITICAL': '🔴',
'WARNING': '🟡',
'HEALTHY': '🟢',
'UNKNOWN': ''
}
SMART_DESCRIPTIONS = {
'Reported_Uncorrect': """
Number of errors that could not be recovered using hardware ECC.
@ -411,9 +421,10 @@ class SystemHealthMonitor:
"""
}
def __init__(self,
ticket_api_url: str = 'http://10.10.10.45/create_ticket_api.php',
dry_run: bool = False):
# =============================================================================
# INITIALIZATION
# =============================================================================
def __init__(self, ticket_api_url: str = 'http://10.10.10.45/create_ticket_api.php', dry_run: bool = False):
"""
Initialize the system health monitor.
@ -423,10 +434,14 @@ class SystemHealthMonitor:
self.ticket_api_url = ticket_api_url
self.dry_run = dry_run
# Ensure history directory exists
os.makedirs(self.CONFIG['HISTORY_DIR'], exist_ok=True)
# =============================================================================
# MAIN EXECUTION METHODS
# =============================================================================
def run(self):
"""
Perform a one-shot health check of the system.
"""
"""Perform a one-shot health check of the system."""
try:
# Perform health checks and gather the report
health_report = self.perform_health_checks()
@ -439,15 +454,14 @@ class SystemHealthMonitor:
logger.error(traceback.format_exc())
def perform_health_checks(self) -> Dict[str, Any]:
"""
Perform comprehensive system health checks and return a report.
"""
"""Perform comprehensive system health checks and return a report."""
health_report = {
'drives_health': self._check_drives_health(),
'memory_health': self._check_memory_usage(),
'cpu_health': self._check_cpu_usage(),
'network_health': self._check_network_status(),
'lxc_health': self._check_lxc_storage()
'lxc_health': self._check_lxc_storage(),
'system_health': self._check_system_drive_indicators()
}
if self.dry_run:
@ -482,14 +496,260 @@ class SystemHealthMonitor:
logger.info("\nNetwork Status:")
logger.info(f"Management: {health_report['network_health']['management_network']['status']}")
logger.info(f"Ceph: {health_report['network_health']['ceph_network']['status']}")
if health_report['system_health']['issues']:
logger.info(f"\nSystem Issues: {len(health_report['system_health']['issues'])} found")
logger.info("\n=== End Summary ===")
return health_report
# =============================================================================
# ENHANCED SMART ANALYSIS METHODS
# =============================================================================
def _analyze_smart_trends(self, device: str, current_attributes: dict) -> List[str]:
"""Analyze SMART attribute trends to predict failures."""
issues = []
# Create safe filename from device path
device_safe = device.replace('/', '_').replace('-', '_')
historical_file = os.path.join(self.CONFIG['HISTORY_DIR'], f"smart_history_{device_safe}.json")
try:
# Load historical data
if os.path.exists(historical_file):
with open(historical_file, 'r') as f:
history = json.load(f)
else:
history = []
# Add current reading
current_reading = {
'timestamp': datetime.datetime.now().isoformat(),
'attributes': current_attributes
}
history.append(current_reading)
# Keep only recent data
cutoff_date = datetime.datetime.now() - datetime.timedelta(days=self.CONFIG['HISTORY_RETENTION_DAYS'])
history = [h for h in history if datetime.datetime.fromisoformat(h['timestamp']) > cutoff_date]
# Analyze trends for critical attributes
if len(history) >= 3: # Need at least 3 data points
critical_attrs = ['Reallocated_Sector_Ct', 'Current_Pending_Sector', 'Reported_Uncorrect',
'Offline_Uncorrectable', 'Program_Fail_Count', 'Erase_Fail_Count']
for attr in critical_attrs:
if attr in current_attributes:
# Get last week's values
recent_history = history[-7:] if len(history) >= 7 else history
values = [h['attributes'].get(attr, 0) for h in recent_history]
if len(values) >= 3:
# Check for rapid increase
recent_increase = values[-1] - values[0]
if recent_increase > 0:
rate = recent_increase / len(values)
# Different thresholds for different attributes
if attr in ['Reallocated_Sector_Ct', 'Current_Pending_Sector']:
if rate > 0.5: # More than 0.5 sectors per check
issues.append(f"TREND ALERT: Rapid increase in {attr}: +{recent_increase} in {len(values)} checks")
elif attr in ['Reported_Uncorrect', 'Offline_Uncorrectable']:
if rate > 0.2: # Any consistent increase is concerning
issues.append(f"TREND ALERT: Increasing {attr}: +{recent_increase} in {len(values)} checks")
else: # Program/Erase fail counts
if rate > 1: # More than 1 error per check
issues.append(f"TREND ALERT: Rapid increase in {attr}: +{recent_increase} in {len(values)} checks")
# Save updated history
with open(historical_file, 'w') as f:
json.dump(history, f, indent=2)
except Exception as e:
logger.debug(f"Error analyzing trends for {device}: {e}")
return issues
def _check_thermal_health(self, device: str, temperature: int, drive_type: str = 'HDD') -> List[str]:
"""Enhanced thermal health checking with drive-type specific thresholds."""
issues = []
if temperature is None:
return issues
# Drive-type specific temperature thresholds
if drive_type == 'SSD':
temp_thresholds = {'warning': 70, 'critical': 85, 'optimal_max': 60}
else: # HDD
temp_thresholds = {'warning': 55, 'critical': 65, 'optimal_max': 45}
if temperature >= temp_thresholds['critical']:
issues.append(f"CRITICAL: Drive temperature {temperature}°C exceeds safe operating limit for {drive_type}")
elif temperature >= temp_thresholds['warning']:
issues.append(f"WARNING: Drive temperature {temperature}°C approaching thermal limit for {drive_type}")
elif temperature > temp_thresholds['optimal_max']:
issues.append(f"INFO: Drive temperature {temperature}°C above optimal range for {drive_type}")
return issues
def _analyze_error_patterns(self, device: str, smart_output: str) -> List[str]:
"""Analyze SMART error logs for failure patterns."""
issues = []
# Pattern matching for different error types
error_patterns = {
'media_errors': [
r'UNC_ERR',
r'ABRT_ERR',
r'read error',
r'write error',
r'medium error'
],
'interface_errors': [
r'ICRC_ERR',
r'interface CRC error',
r'SATA link down',
r'communication failure'
],
'timeout_errors': [
r'command timeout',
r'NCQ error',
r'device fault',
r'reset required'
]
}
for error_type, patterns in error_patterns.items():
error_count = 0
for pattern in patterns:
matches = re.findall(pattern, smart_output, re.IGNORECASE)
error_count += len(matches)
if error_count > 0:
if error_count >= 10:
issues.append(f"CRITICAL: Multiple {error_type} detected ({error_count} occurrences)")
elif error_count >= 3:
issues.append(f"WARNING: {error_type} detected ({error_count} occurrences)")
elif error_count >= 1:
issues.append(f"INFO: {error_type} detected ({error_count} occurrences)")
return issues
def _check_ssd_health(self, device: str, smart_attributes: dict) -> List[str]:
"""SSD-specific health checks for wear and endurance."""
issues = []
# Check wear leveling and endurance indicators
wear_indicators = [
'Media_Wearout_Indicator',
'SSD_Life_Left',
'Percent_Lifetime_Remain',
'Available_Spare',
'Available_Spare_Threshold'
]
for indicator in wear_indicators:
if indicator in smart_attributes:
value = smart_attributes[indicator]
# Handle percentage-based indicators (countdown from 100)
if indicator in ['Media_Wearout_Indicator', 'SSD_Life_Left', 'Percent_Lifetime_Remain', 'Available_Spare']:
if value <= 5:
issues.append(f"CRITICAL: {indicator} at {value}% - SSD near end of life")
elif value <= 15:
issues.append(f"WARNING: {indicator} at {value}% - SSD showing significant wear")
elif value <= 30:
issues.append(f"INFO: {indicator} at {value}% - SSD wear monitoring recommended")
# Check for excessive bad blocks
bad_block_indicators = [
'Runtime_Bad_Block',
'Factory_Bad_Block_Ct',
'Grown_Failing_Block_Ct',
'End-to-End_Error'
]
for indicator in bad_block_indicators:
if indicator in smart_attributes:
value = smart_attributes[indicator]
if value > 100:
issues.append(f"WARNING: High {indicator}: {value}")
elif value > 10:
issues.append(f"INFO: Elevated {indicator}: {value}")
# Check write amplification and endurance metrics
endurance_indicators = [
'Total_LBAs_Written',
'Total_LBAs_Read',
'Host_Program_NAND_Pages_Count',
'FTL_Program_NAND_Pages_Count'
]
# Calculate write amplification if both host and FTL write counts are available
host_writes = smart_attributes.get('Host_Program_NAND_Pages_Count', 0)
ftl_writes = smart_attributes.get('FTL_Program_NAND_Pages_Count', 0)
if host_writes > 0 and ftl_writes > 0:
write_amplification = ftl_writes / host_writes
if write_amplification > 5.0:
issues.append(f"WARNING: High write amplification factor: {write_amplification:.2f}")
elif write_amplification > 3.0:
issues.append(f"INFO: Elevated write amplification factor: {write_amplification:.2f}")
return issues
def _check_system_drive_indicators(self) -> Dict[str, Any]:
"""Check system logs and kernel messages for drive issues."""
system_health = {
'status': 'OK',
'issues': []
}
try:
# Check dmesg for drive-related errors (last 1000 lines to avoid overwhelming output)
result = subprocess.run(['dmesg', '-T', '--level=err,warn'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=10)
if result.returncode == 0:
error_patterns = [
(r'ata\d+.*failed command', 'ATA command failures'),
(r'sd \w+.*Medium Error', 'SCSI medium errors'),
(r'Buffer I/O error', 'Buffer I/O errors'),
(r'critical medium error', 'Critical medium errors'),
(r'unrecovered read error', 'Unrecovered read errors'),
(r'Current_Pending_Sector.*increased', 'Pending sector increases'),
(r'ata\d+.*SError:', 'SATA errors'),
(r'nvme\d+.*I/O error', 'NVMe I/O errors')
]
for pattern, description in error_patterns:
matches = re.findall(pattern, result.stdout, re.IGNORECASE)
if matches:
count = len(matches)
if count >= 5:
system_health['status'] = 'CRITICAL'
system_health['issues'].append(f"CRITICAL: {description} in system logs ({count} occurrences)")
elif count >= 2:
if system_health['status'] != 'CRITICAL':
system_health['status'] = 'WARNING'
system_health['issues'].append(f"WARNING: {description} in system logs ({count} occurrences)")
else:
system_health['issues'].append(f"INFO: {description} in system logs ({count} occurrences)")
except subprocess.TimeoutExpired:
system_health['issues'].append("WARNING: System log check timed out")
except Exception as e:
logger.debug(f"Error checking system drive indicators: {e}")
system_health['issues'].append(f"ERROR: Failed to check system logs: {str(e)}")
return system_health
# =============================================================================
# DRIVE HEALTH CHECKING METHODS
# =============================================================================
def _get_drive_details(self, device: str) -> Dict[str, str]:
"""
Get detailed drive information using smartctl
"""
"""Get detailed drive information using smartctl."""
drive_details = {
'model': None,
'serial': None,
@ -552,19 +812,8 @@ class SystemHealthMonitor:
{content}
{'' * content_width}"""
# Format each section using the consistent width
sections = {
'DRIVE SPECIFICATIONS': ...,
'SMART STATUS': ...,
'PARTITION INFO': ...
}
# Each content line should pad to content_width
for section, content in sections.items():
formatted_content = '\n'.join(f"{line:<{content_width-2}}" for line in content.split('\n'))
description += make_box(section, formatted_content)
def _get_issue_type(self, issue: str) -> str:
"""Determine issue type from issue description."""
if "SMART" in issue:
return "SMART Health Issue"
elif "Drive" in issue:
@ -578,6 +827,7 @@ class SystemHealthMonitor:
return "Hardware Issue"
def _get_impact_level(self, issue: str) -> str:
"""Determine impact level from issue description."""
if "CRITICAL" in issue or "UNHEALTHY" in issue:
return "🔴 Critical - Immediate Action Required"
elif "WARNING" in issue:
@ -585,6 +835,7 @@ class SystemHealthMonitor:
return "🟢 Low - Monitor Only"
def _generate_detailed_description(self, issue: str, health_report: Dict[str, Any]) -> str:
"""Generate detailed ticket description."""
hostname = socket.gethostname()
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
priority = "⚠ HIGH" if "CRITICAL" in issue else "● MEDIUM"
@ -825,7 +1076,11 @@ class SystemHealthMonitor:
# P4 - Low priority monitoring alerts
return self.PRIORITIES['LOW'] # P4
# =============================================================================
# TICKET CREATION METHODS
# =============================================================================
def _create_tickets_for_issues(self, health_report: Dict[str, Any]):
"""Create tickets for detected issues."""
issues = self._detect_issues(health_report)
if not issues:
logger.info("No issues detected.")
@ -966,20 +1221,27 @@ class SystemHealthMonitor:
if lxc_health.get('status') in ['WARNING', 'CRITICAL']:
issues.extend(lxc_health.get('issues', []))
# Check for system-level drive issues
system_health = health_report.get('system_health', {})
if system_health.get('issues'):
issues.extend(system_health['issues'])
logger.info("=== Issue Detection Started ===")
logger.info(f"Checking drives: {len(health_report['drives_health']['drives'])} found")
logger.info(f"Memory status: {health_report['memory_health']['status']}")
logger.info(f"CPU status: {health_report['cpu_health']['status']}")
logger.info(f"Network status: {health_report['network_health']}")
logger.info(f"System status: {health_report['system_health']['status']}")
logger.info(f"Detected issues: {issues}")
logger.info("=== Issue Detection Completed ===\n")
return issues
# =============================================================================
# DISK AND STORAGE UTILITY METHODS
# =============================================================================
def _get_all_disks(self) -> List[str]:
"""
Get all physical disks using multiple detection methods.
"""
"""Get all physical disks using multiple detection methods."""
disks = set()
# Method 1: Use lsblk to get physical disks, excluding virtual devices
@ -1076,7 +1338,7 @@ class SystemHealthMonitor:
except (ValueError, AttributeError, TypeError) as e:
logger.debug(f"Failed to parse size string: {size_str}")
logger.debug(f"Parse error details: {str(e)}")
logger.debug(f"P**** error details: {str(e)}")
return 0.0
def _is_physical_disk(self, device_path):
@ -1117,9 +1379,7 @@ class SystemHealthMonitor:
return is_physical
def _check_disk_firmware(self, device: str) -> Dict[str, Any]:
"""
Check disk firmware version against known problematic versions.
"""
"""Check disk firmware version against known problematic versions."""
firmware_info = {
'version': None,
'model': None,
@ -1187,10 +1447,11 @@ class SystemHealthMonitor:
return firmware_info
# =============================================================================
# SMART HEALTH CHECKING METHODS
# =============================================================================
def _parse_smart_value(self, raw_value: str) -> int:
"""
Parse SMART values handling different formats including NVMe temperature readings
"""
"""Parse SMART values handling different formats including NVMe temperature readings."""
try:
# Handle temperature values with °C
if isinstance(raw_value, str) and '°C' in raw_value:
@ -1210,9 +1471,7 @@ class SystemHealthMonitor:
return 0
def _get_manufacturer_profile(self, model: str, manufacturer: str = None, firmware: str = None) -> Dict[str, Any]:
"""
Get manufacturer-specific SMART profile based on drive model/manufacturer/firmware.
"""
"""Get manufacturer-specific SMART profile based on drive model/manufacturer/firmware."""
logger.debug(f"Looking for profile - Model: '{model}', Manufacturer: '{manufacturer}', Firmware: '{firmware}'")
# Check each manufacturer profile
@ -1235,9 +1494,7 @@ class SystemHealthMonitor:
return self.MANUFACTURER_SMART_PROFILES['Generic']
def _should_monitor_attribute(self, attr_name: str, manufacturer_profile: dict) -> bool:
"""
Check if an attribute should be monitored based on manufacturer profile
"""
"""Check if an attribute should be monitored based on manufacturer profile."""
if not manufacturer_profile:
return True # Default: monitor everything
@ -1251,9 +1508,7 @@ class SystemHealthMonitor:
return True # Default: monitor unless explicitly disabled
def _get_attribute_thresholds(self, attr_name: str, manufacturer_profile: dict) -> dict:
"""
Get attribute-specific thresholds, falling back to defaults
"""
"""Get attribute-specific thresholds, falling back to defaults."""
# Check for manufacturer-specific thresholds first
if manufacturer_profile:
attr_config = manufacturer_profile.get('attributes', {}).get(attr_name, {})
@ -1264,7 +1519,7 @@ class SystemHealthMonitor:
'behavior': attr_config.get('behavior', 'countup')
}
# Fall back to BASE_SMART_THRESHOLDS (your existing thresholds)
# Enhanced BASE_SMART_THRESHOLDS with additional attributes
BASE_SMART_THRESHOLDS = {
'Reallocated_Sector_Ct': {'warning': 5, 'critical': 10},
'Current_Pending_Sector': {'warning': 1, 'critical': 5},
@ -1280,7 +1535,20 @@ class SystemHealthMonitor:
'Load_Cycle_Count': {'warning': 900000, 'critical': 1000000},
'SSD_Life_Left': {'warning': 30, 'critical': 10},
'Program_Fail_Cnt_Total': {'warning': 1, 'critical': 5},
'Erase_Fail_Count_Total': {'warning': 1, 'critical': 5}
'Erase_Fail_Count_Total': {'warning': 1, 'critical': 5},
# Enhanced SMART attributes for better failure detection
'Raw_Read_Error_Rate': {'warning': 100000, 'critical': 1000000},
'Seek_Error_Rate': {'warning': 100000, 'critical': 1000000},
'Command_Timeout': {'warning': 1, 'critical': 5},
'High_Fly_Writes': {'warning': 1, 'critical': 5},
'Airflow_Temperature_Cel': {'warning': 65, 'critical': 75},
'G_Sense_Error_Rate': {'warning': 100, 'critical': 1000},
'Power-Off_Retract_Count': {'warning': 100000, 'critical': 500000},
'Head_Flying_Hours': {'warning': 50000, 'critical': 70000},
'Runtime_Bad_Block': {'warning': 10, 'critical': 100},
'Factory_Bad_Block_Ct': {'warning': 50, 'critical': 200},
'Grown_Failing_Block_Ct': {'warning': 10, 'critical': 50},
'End-to-End_Error': {'warning': 1, 'critical': 5}
}
if attr_name in BASE_SMART_THRESHOLDS:
@ -1293,15 +1561,11 @@ class SystemHealthMonitor:
return None # No thresholds defined
def _is_new_drive(self, power_on_hours: int) -> bool:
"""
Determine if a drive is considered "new" based on power-on hours.
"""
"""Determine if a drive is considered "new" based on power-on hours."""
return power_on_hours < 720 # Less than 1 week of runtime
def _check_smart_health(self, device: str) -> Dict[str, Any]:
"""
Enhanced SMART health check with better error handling.
"""
"""Enhanced SMART health check with better error handling and predictive analysis."""
smart_health = {
'status': 'UNKNOWN',
'severity': 'NORMAL',
@ -1358,7 +1622,7 @@ class SystemHealthMonitor:
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=30 # 30 second timeout
timeout=30
)
output = result.stdout
@ -1411,23 +1675,6 @@ class SystemHealthMonitor:
is_new_drive = self._is_new_drive(power_on_hours)
logger.debug(f"Drive {device} power-on hours: {power_on_hours}, is_new_drive: {is_new_drive}")
# Define base SMART thresholds (for non-manufacturer specific attributes)
BASE_SMART_THRESHOLDS = {
'Reallocated_Sector_Ct': {'warning': 5, 'critical': 10},
'Current_Pending_Sector': {'warning': 1, 'critical': 5},
'Offline_Uncorrectable': {'warning': 1, 'critical': 2},
'Reported_Uncorrect': {'warning': 1, 'critical': 10},
'Spin_Retry_Count': {'warning': 1, 'critical': 5},
'Power_Cycle_Count': {'warning': 5000, 'critical': 10000},
'Power_On_Hours': {'warning': 61320, 'critical': 70080}, # ~7-8 years
'Temperature_Celsius': {'warning': 65, 'critical': 75},
'Available_Spare': {'warning': 30, 'critical': 10},
'Program_Fail_Count': {'warning': 10, 'critical': 20},
'Erase_Fail_Count': {'warning': 10, 'critical': 20},
'Load_Cycle_Count': {'warning': 900000, 'critical': 1000000},
'SSD_Life_Left': {'warning': 30, 'critical': 10}
}
# Parse remaining SMART attributes
for line in output.split('\n'):
# Handle manufacturer-specific Wear_Leveling_Count
@ -1469,15 +1716,19 @@ class SystemHealthMonitor:
smart_health['issues'].append(f"Low wear leveling remaining: {raw_value}")
# Handle all SMART attributes with manufacturer-specific logic
# Define all possible attributes we might encounter
ALL_SMART_ATTRIBUTES = [
'Reallocated_Sector_Ct', 'Current_Pending_Sector', 'Offline_Uncorrectable',
'Reported_Uncorrect', 'Spin_Retry_Count', 'Power_Cycle_Count', 'Power_On_Hours',
'Temperature_Celsius', 'Available_Spare', 'Program_Fail_Count', 'Erase_Fail_Count',
'Load_Cycle_Count', 'SSD_Life_Left', 'Program_Fail_Cnt_Total', 'Erase_Fail_Count_Total',
'Program_Fail_Count_Chip', 'Erase_Fail_Count_Chip'
'Program_Fail_Count_Chip', 'Erase_Fail_Count_Chip',
'Raw_Read_Error_Rate', 'Seek_Error_Rate', 'Command_Timeout', 'High_Fly_Writes',
'Airflow_Temperature_Cel', 'G_Sense_Error_Rate', 'Power-Off_Retract_Count',
'Head_Flying_Hours', 'Runtime_Bad_Block', 'Factory_Bad_Block_Ct',
'Grown_Failing_Block_Ct', 'End-to-End_Error'
]
for line in output.split('\n'):
for attr in ALL_SMART_ATTRIBUTES:
if attr in line and attr not in ['Wear_Leveling_Count']: # Wear_Leveling handled separately above
# Check if we should monitor this attribute
@ -1525,20 +1776,6 @@ class SystemHealthMonitor:
smart_health['severity'] = 'WARNING'
smart_health['issues'].append(f"Warning {attr}: {raw_value}")
# Now check the collected Erase_Fail_Count and Program_Fail_Count
for attr in ['Erase_Fail_Count', 'Program_Fail_Count']:
if attr in smart_health['attributes']:
raw_value = smart_health['attributes'][attr]
thresholds = BASE_SMART_THRESHOLDS[attr]
if raw_value >= thresholds['critical']:
smart_health['severity'] = 'CRITICAL'
smart_health['issues'].append(f"Critical {attr}: {raw_value}")
elif raw_value >= thresholds['warning']:
if smart_health['severity'] != 'CRITICAL':
smart_health['severity'] = 'WARNING'
smart_health['issues'].append(f"Warning {attr}: {raw_value}")
# Check for recent SMART errors
error_log_pattern = r"Error \d+ occurred at disk power-on lifetime: (\d+) hours"
error_matches = re.finditer(error_log_pattern, output)
@ -1554,6 +1791,28 @@ class SystemHealthMonitor:
smart_health['severity'] = 'WARNING'
smart_health['issues'].extend(recent_errors)
# Enhanced analysis methods
if smart_health['attributes']:
# Trend analysis for predictive failure detection
trend_issues = self._analyze_smart_trends(device, smart_health['attributes'])
smart_health['issues'].extend(trend_issues)
# SSD-specific checks
drive_type = drive_details.get('type', 'HDD')
if drive_type == 'SSD':
ssd_issues = self._check_ssd_health(device, smart_health['attributes'])
smart_health['issues'].extend(ssd_issues)
# Enhanced temperature analysis
if smart_health['temp']:
drive_type = drive_details.get('type', 'HDD')
thermal_issues = self._check_thermal_health(device, smart_health['temp'], drive_type)
smart_health['issues'].extend(thermal_issues)
# Error pattern analysis
error_pattern_issues = self._analyze_error_patterns(device, output)
smart_health['issues'].extend(error_pattern_issues)
logger.debug(f"=== SMART Health Check for {device} ===")
logger.debug(f"Manufacturer profile: {manufacturer_profile.get('aliases', ['Unknown'])[0] if manufacturer_profile else 'None'}")
logger.debug("Raw SMART attributes:")
@ -1622,9 +1881,7 @@ class SystemHealthMonitor:
return smart_health
def _check_nvme_smart_health(self, device: str) -> Dict[str, Any]:
"""
Dedicated NVMe SMART health check.
"""
"""Dedicated NVMe SMART health check."""
smart_health = {
'status': 'UNKNOWN',
'severity': 'NORMAL',
@ -1667,6 +1924,22 @@ class SystemHealthMonitor:
elif spare_pct < 30:
smart_health['severity'] = 'WARNING'
smart_health['issues'].append(f"Low Available_Spare: {spare_pct}%")
# Enhanced NVMe analysis
if smart_health['attributes']:
# Trend analysis for NVMe devices
trend_issues = self._analyze_smart_trends(device, smart_health['attributes'])
smart_health['issues'].extend(trend_issues)
# SSD-specific checks for NVMe
ssd_issues = self._check_ssd_health(device, smart_health['attributes'])
smart_health['issues'].extend(ssd_issues)
# Enhanced temperature analysis for NVMe
if smart_health['temp']:
thermal_issues = self._check_thermal_health(device, smart_health['temp'], 'SSD')
smart_health['issues'].extend(thermal_issues)
else:
smart_health['status'] = 'ERROR'
smart_health['issues'].append("Failed to read NVMe SMART data")
@ -1681,6 +1954,7 @@ class SystemHealthMonitor:
return smart_health
def _check_drives_health(self) -> Dict[str, Any]:
"""Check health of all drives in the system."""
drives_health = {'overall_status': 'NORMAL', 'drives': []}
try:
@ -1770,6 +2044,9 @@ class SystemHealthMonitor:
return drives_health
# =============================================================================
# SYSTEM HEALTH CHECKING METHODS
# =============================================================================
@staticmethod
def _convert_bytes(bytes_value: int, suffix: str = 'B') -> str:
"""
@ -1786,16 +2063,14 @@ class SystemHealthMonitor:
return f"{bytes_value:.1f}Y{suffix}"
def _convert_size_to_bytes(self, size_str: str) -> float:
"""Convert size string with units to bytes"""
"""Convert size string with units to bytes."""
units = {'B': 1, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4}
size = float(size_str[:-1])
unit = size_str[-1].upper()
return size * units[unit]
def _check_memory_usage(self) -> Dict[str, Any]:
"""
Check for ECC memory errors if ECC memory is present.
"""
"""Check for ECC memory errors if ECC memory is present."""
memory_health = {
'has_ecc': False,
'ecc_errors': [],