#!/usr/bin/env python3 import os, sys, json, requests, psutil, socket, subprocess, logging, argparse, urllib.request, re, glob, datetime from typing import Dict, Any, List # ============================================================================= # LOGGING SETUP # ============================================================================= logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) console_handler = logging.StreamHandler() console_handler.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') console_handler.setFormatter(formatter) logger.addHandler(console_handler) class SystemHealthMonitor: # ============================================================================= # CLASS CONSTANTS AND CONFIGURATION # ============================================================================= STANDARD_WIDTH = 80 PRIORITIES = { 'CRITICAL': '1', 'HIGH': '2', 'MEDIUM': '3', 'LOW': '4' } ISSUE_PRIORITIES = { 'SMART_FAILURE': PRIORITIES['HIGH'], 'DISK_CRITICAL': PRIORITIES['HIGH'], 'DISK_WARNING': PRIORITIES['MEDIUM'], 'UNCORRECTABLE_ECC': PRIORITIES['HIGH'], 'CORRECTABLE_ECC': PRIORITIES['MEDIUM'], 'CPU_HIGH': PRIORITIES['LOW'], 'NETWORK_FAILURE': PRIORITIES['HIGH'] } CONFIG = { 'TICKET_API_URL': 'http://10.10.10.45/create_ticket_api.php', 'THRESHOLDS': { 'DISK_CRITICAL': 90, 'DISK_WARNING': 80, 'LXC_CRITICAL': 90, 'LXC_WARNING': 80, 'CPU_WARNING': 95, 'TEMPERATURE_WARNING': 65 }, 'NETWORKS': { 'MANAGEMENT': '10.10.10.1', 'CEPH': '10.10.90.1', 'PING_TIMEOUT': 1, 'PING_COUNT': 1 }, 'EXCLUDED_MOUNTS': [ '/media', '/mnt/pve/mediafs', '/opt/metube_downloads' ], 'EXCLUDED_PATTERNS': [ r'/media.*', r'/mnt/pve/mediafs.*', r'.*/media$', r'.*mediafs.*', r'.*/downloads.*' ], 'HISTORY_DIR': '/var/log/hwmonDaemon', 'HISTORY_RETENTION_DAYS': 30 } TICKET_TEMPLATES = { 'ACTION_TYPE': { 'AUTO': '[auto]', 'MANUAL': '[manual]' }, 'ENVIRONMENT': { 'PRODUCTION': '[production]' }, 'TICKET_TYPE': { 'MAINTENANCE': '[maintenance]' }, 'HARDWARE_TYPE': { 'HARDWARE': '[hardware]' }, 'SOFTWARE_TYPE': { 'SOFTWARE': '[software]' }, 'NETWORK_TYPE': { 'NETWORK': '[network]' }, 'SCOPE': { 'SINGLE_NODE': '[single-node]', 'CLUSTER_WIDE': '[cluster-wide]' }, 'DEFAULT_CATEGORY': 'Hardware', 'DEFAULT_ISSUE_TYPE': 'Problem' } PROBLEMATIC_FIRMWARE = { 'Samsung': { 'EVO860': ['RVT01B6Q', 'RVT02B6Q'], # Known issues with sudden performance drops 'EVO870': ['SVT01B6Q'], 'PM883': ['HXT7404Q'] # Known issues with TRIM }, 'Seagate': { 'ST8000NM': ['CC64'], # Known issues with NCQ 'ST12000NM': ['SN02'] }, 'WDC': { 'WD121KRYZ': ['01.01A01'], # RAID rebuild issues 'WD141KRYZ': ['02.01A02'] } } MANUFACTURER_SMART_PROFILES = { 'Western Digital': { 'aliases': ['WDC', 'Western Digital', 'HGST', 'Ultrastar'], 'attributes': { 'Raw_Read_Error_Rate': { 'monitor': False, 'description': 'WD drives use this as operation counter, not error count' }, 'Seek_Error_Rate': { 'monitor': False, 'description': 'WD drives use this as operation counter, not error count' } } }, 'Seagate': { 'aliases': ['Seagate', 'ST'], 'attributes': { 'Raw_Read_Error_Rate': { 'monitor': False, 'description': 'Seagate drives use this as operation counter' } } }, 'Ridata': { 'aliases': ['Ridata', 'Ritek', 'RIDATA', 'RITEK', 'SSD 512GB'], 'firmware_patterns': ['HT3618B7', 'HT36'], 'wear_leveling_behavior': 'countup', 'wear_leveling_baseline': 0, 'wear_leveling_thresholds': { 'warning': 1000000000, # 1 billion - very conservative 'critical': 2000000000 # 2 billion - extremely conservative }, 'attributes': { 
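                # How these countup limits are applied (illustrative, not vendor
                # documentation): _check_smart_health compares the raw SMART value
                # directly against the thresholds defined below, so a hypothetical
                # Ridata drive reporting Wear_Leveling_Count = 350_000_000 raises no
                # alert, 1_200_000_000 raises a WARNING and 2_100_000_000 a CRITICAL.
                # The billion-scale limits are deliberately conservative because these
                # drives report a raw operation total, not a normalized percentage.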
'Wear_Leveling_Count': { 'behavior': 'countup', 'baseline': 0, 'warning_threshold': 1000000000, 'critical_threshold': 2000000000, 'description': 'Total wear leveling operations (countup from 0)', 'ignore_on_new_drive': False, 'monitor': True # Include in health checks }, # These are operation counters, NOT actual failures - ignore completely 'Erase_Fail_Count_Chip': { 'monitor': False, # Skip monitoring entirely 'description': 'Operation counter, not actual failures - IGNORED' }, 'Program_Fail_Count_Chip': { 'monitor': False, # Skip monitoring entirely 'description': 'Operation counter, not actual failures - IGNORED' }, # ADD THIS: Regular Erase_Fail_Count is also an operation counter for Ridata 'Erase_Fail_Count': { 'monitor': False, # Skip monitoring entirely for Ridata 'description': 'Operation counter for Ridata drives, not actual failures - IGNORED' }, 'Program_Fail_Count': { 'monitor': False, # Skip monitoring entirely for Ridata 'description': 'Operation counter for Ridata drives, not actual failures - IGNORED' }, # These are the REAL failure counters - monitor with standard thresholds 'Program_Fail_Cnt_Total': { 'monitor': True, 'behavior': 'countup', 'baseline': 0, 'warning_threshold': 1, # Any failures are concerning 'critical_threshold': 5, 'description': 'Actual program failures (real failures)' }, 'Erase_Fail_Count_Total': { 'monitor': True, 'behavior': 'countup', 'baseline': 0, 'warning_threshold': 1, # Any failures are concerning 'critical_threshold': 5, 'description': 'Actual erase failures (real failures)' } } }, 'OOS': { 'aliases': ['OOS12000G', 'OOS'], 'attributes': { # These drives seem to report very high error rates normally 'Raw_Read_Error_Rate': { 'monitor': False, # Skip monitoring - seems to be a counter 'description': 'OOS drives report high values normally' }, 'Seek_Error_Rate': { 'monitor': False, # Skip monitoring - seems to be a counter 'description': 'OOS drives report high values normally' }, 'Command_Timeout': { 'warning_threshold': 100000000000, # 100 billion 'critical_threshold': 200000000000, # 200 billion 'description': 'OOS drives report very high timeout counters' } } }, 'Samsung': { 'aliases': ['Samsung', 'SAMSUNG'], 'wear_leveling_behavior': 'countup', 'wear_leveling_baseline': 0, 'wear_leveling_thresholds': { 'warning': 2000, 'critical': 3000 }, 'attributes': { 'Wear_Leveling_Count': { 'behavior': 'countup', 'baseline': 0, 'warning_threshold': 2000, 'critical_threshold': 3000, 'description': 'Total wear leveling operations performed', 'monitor': True }, # Standard monitoring for all other attributes 'Program_Fail_Count': { 'monitor': True, 'warning_threshold': 10, 'critical_threshold': 20 }, 'Erase_Fail_Count': { 'monitor': True, 'warning_threshold': 10, 'critical_threshold': 20 } } }, 'Intel': { 'aliases': ['Intel', 'INTEL'], 'wear_leveling_behavior': 'percentage', 'wear_leveling_baseline': 100, 'wear_leveling_thresholds': { 'warning': 30, 'critical': 10 }, 'attributes': { 'Media_Wearout_Indicator': { 'behavior': 'countdown', 'baseline': 100, 'warning_threshold': 30, 'critical_threshold': 10, 'description': 'Percentage of rated life remaining', 'monitor': True } } }, 'Micron': { 'aliases': ['Micron', 'MICRON', 'Crucial', 'CRUCIAL'], 'wear_leveling_behavior': 'percentage', 'wear_leveling_baseline': 100, 'wear_leveling_thresholds': { 'warning': 30, 'critical': 10 }, 'attributes': { # All attributes use default monitoring unless specified } }, 'Generic': { # Fallback for unknown manufacturers 'aliases': ['Unknown', 'Generic'], 
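            # Leaving both wear-leveling thresholds as None below means the
            # countup/countdown wear evaluation in _check_smart_health is skipped for
            # unrecognized drives; their remaining attributes fall back to the generic
            # BASE_SMART_THRESHOLDS table in _get_attribute_thresholds.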
            'wear_leveling_behavior': 'unknown',
            'wear_leveling_baseline': None,
            'wear_leveling_thresholds': {
                'warning': None,   # Don't trigger on unknown
                'critical': None
            },
            'attributes': {
                # All attributes use default monitoring
            }
        }
    }

    SEVERITY_INDICATORS = {
        'CRITICAL': '🔴',
        'WARNING': '🟡',
        'HEALTHY': '🟢',
        'UNKNOWN': '⚪'
    }

    SMART_DESCRIPTIONS = {
        'Reported_Uncorrect': """
        Number of errors that could not be recovered using hardware ECC.

        Impact:
        - Indicates permanent data loss in affected sectors
        - High correlation with drive hardware failure
        - Critical reliability indicator

        Recommended Actions:
        1. Backup critical data immediately
        2. Check drive logs for related errors
        3. Plan for drive replacement
        4. Monitor for error count increases
        """,
        'Reallocated_Sector_Ct': """
        Number of sectors that have been reallocated due to errors.

        Impact:
        - High counts indicate degrading media
        - Each reallocation uses one of the drive's limited spare sectors
        - Rapid increases suggest accelerating drive wear

        Recommended Actions:
        1. Monitor rate of increase
        2. Check drive temperature
        3. Plan replacement if count grows rapidly
        """,
        'Current_Pending_Sector': """
        Sectors waiting to be reallocated due to read/write errors.

        Impact:
        - Indicates potentially unstable sectors
        - May result in data loss if unrecoverable
        - Should be monitored for increases

        Recommended Actions:
        1. Backup affected files
        2. Run extended SMART tests
        3. Monitor for conversion to reallocated sectors
        """,
        'Offline_Uncorrectable': """
        Count of uncorrectable errors detected during offline data collection.

        Impact:
        - Direct indicator of media reliability issues
        - May affect data integrity
        - High values suggest drive replacement needed

        Recommended Actions:
        1. Run extended SMART tests
        2. Check drive logs
        3. Plan replacement if count is increasing
        """,
        'Spin_Retry_Count': """
        Number of spin start retry attempts.

        Impact:
        - Indicates potential motor or bearing issues
        - May predict imminent mechanical failure
        - Increasing values suggest degrading drive health

        Recommended Actions:
        1. Monitor for rapid increases
        2. Check drive temperature
        3. Plan replacement if count grows rapidly
        """,
        'Power_On_Hours': """
        Total number of hours the device has been powered on.

        Impact:
        - Normal aging metric
        - Used to gauge overall drive lifetime
        - Compare against manufacturer's MTBF rating

        Recommended Actions:
        1. Compare to warranty period
        2. Plan replacement if approaching rated lifetime
        """,
        'Media_Wearout_Indicator': """
        Percentage of drive's rated life remaining (SSDs).

        Impact:
        - 100 indicates new drive
        - 0 indicates exceeded rated writes
        - Critical for SSD lifecycle management

        Recommended Actions:
        1. Plan replacement below 20%
        2. Monitor write workload
        3. Consider workload redistribution
        """,
        'Temperature_Celsius': """
        Current drive temperature.

        Impact:
        - High temperatures accelerate wear
        - Optimal range: 20-45°C
        - Sustained high temps reduce lifespan

        Recommended Actions:
        1. Check system cooling
        2. Verify airflow
        3. Monitor for sustained high temperatures
        """,
        'Available_Spare': """
        Percentage of spare blocks remaining (SSDs).

        Impact:
        - Critical for SSD endurance
        - Low values indicate approaching end-of-life
        - Rapid decreases suggest excessive writes

        Recommended Actions:
        1. Plan replacement if below 20%
        2. Monitor write patterns
        3. Consider workload changes
        """,
        'Program_Fail_Count': """
        Number of flash program operation failures.

        Impact:
        - Indicates NAND cell reliability
        - Important for SSD health assessment
        - Increasing values suggest flash degradation

        Recommended Actions:
        1. Monitor rate of increase
        2. Check firmware updates
        3. Plan replacement if rapidly increasing
        """,
        'Erase_Fail_Count': """
        Number of flash erase operation failures.

        Impact:
        - Related to NAND block health
        - Critical for SSD reliability
        - High counts suggest failing flash blocks

        Recommended Actions:
        1. Monitor count increases
        2. Check firmware version
        3. Plan replacement if count is high
        """,
        'Load_Cycle_Count': """
        Number of power cycles and head load/unload events.

        Impact:
        - Normal operation metric
        - High counts may indicate power management issues
        - Compare against rated cycles (typically 600k-1M)

        Recommended Actions:
        1. Review power management settings
        2. Monitor rate of increase
        3. Plan replacement near rated limit
        """,
        'Wear_Leveling_Count': """
        SSD block erase distribution metric.

        Impact:
        - Indicates wear pattern uniformity
        - Interpretation varies by manufacturer
        - Critical for SSD longevity

        Recommended Actions:
        1. Monitor trend over time
        2. Compare with manufacturer baseline
        3. Check workload distribution

        Note: Different manufacturers use different counting methods:
        - Some count up from 0 (Samsung, Ridata, etc.)
        - Others count down from a baseline or percentage (Intel, etc.)
        - Always check manufacturer specifications
        """
    }

    # =============================================================================
    # INITIALIZATION
    # =============================================================================

    def __init__(self, ticket_api_url: str = 'http://10.10.10.45/create_ticket_api.php', dry_run: bool = False):
        """
        Initialize the system health monitor.

        :param ticket_api_url: URL for the ticket creation API.
        :param dry_run: If True, simulate API calls without sending requests.
        """
        self.ticket_api_url = ticket_api_url
        self.dry_run = dry_run

        # Ensure history directory exists
        os.makedirs(self.CONFIG['HISTORY_DIR'], exist_ok=True)

    # =============================================================================
    # MAIN EXECUTION METHODS
    # =============================================================================

    def run(self):
        """Perform a one-shot health check of the system."""
        try:
            # Perform health checks and gather the report
            health_report = self.perform_health_checks()
            # Create tickets for any detected critical issues
            self._create_tickets_for_issues(health_report)
        except Exception as e:
            import traceback
            logger.error(f"Unexpected error during health check: {e}")
            logger.error(traceback.format_exc())

    def perform_health_checks(self) -> Dict[str, Any]:
        """Perform comprehensive system health checks and return a report."""
        health_report = {
            'drives_health': self._check_drives_health(),
            'memory_health': self._check_memory_usage(),
            'cpu_health': self._check_cpu_usage(),
            'network_health': self._check_network_status(),
            'lxc_health': self._check_lxc_storage(),
            'system_health': self._check_system_drive_indicators()
        }

        if self.dry_run:
            logger.info("\n=== System Health Summary ===")
            logger.info(f"Overall Drive Health: {health_report['drives_health']['overall_status']}")

            # Summarized drive information with usage
            logger.info("\nDrive Status:")
            for drive in health_report['drives_health']['drives']:
                issues = drive.get('smart_issues', [])
                temp = f", {drive.get('temperature')}°C" if drive.get('temperature') else ""
                status = "⚠️ " if issues else "✓ "

                # Disk usage information
                usage_info = ""
                if drive.get('partitions'):
                    for partition in drive['partitions']:
                        usage_info += f"\n └─ {partition['mountpoint']}: {partition['used_space']}/{partition['total_space']} ({partition['usage_percent']}% used)"
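                # Illustrative dry-run summary line for one healthy drive (the device,
                # temperature and usage figures are made-up example values):
                #   ✓ /dev/sda, 34°C - SMART: HEALTHY
                #    └─ /: 45.0G/100.0G (45% used)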
logger.info(f"{status}{drive['device']}{temp} - SMART: {drive['smart_status']}{usage_info}") if issues: logger.info(f" Issues: {', '.join(issues)}") logger.info(f"\nMemory: {health_report['memory_health']['memory_percent']}% used") if health_report['memory_health'].get('has_ecc'): logger.info("ECC Memory: Present") if health_report['memory_health'].get('ecc_errors'): logger.info(f"ECC Errors: {len(health_report['memory_health']['ecc_errors'])} found") logger.info(f"\nCPU Usage: {health_report['cpu_health']['cpu_usage_percent']}%") logger.info("\nNetwork Status:") logger.info(f"Management: {health_report['network_health']['management_network']['status']}") logger.info(f"Ceph: {health_report['network_health']['ceph_network']['status']}") if health_report['system_health']['issues']: logger.info(f"\nSystem Issues: {len(health_report['system_health']['issues'])} found") logger.info("\n=== End Summary ===") return health_report # ============================================================================= # ENHANCED SMART ANALYSIS METHODS # ============================================================================= def _analyze_smart_trends(self, device: str, current_attributes: dict) -> List[str]: """Analyze SMART attribute trends to predict failures.""" issues = [] # Create safe filename from device path device_safe = device.replace('/', '_').replace('-', '_') historical_file = os.path.join(self.CONFIG['HISTORY_DIR'], f"smart_history_{device_safe}.json") try: # Load historical data if os.path.exists(historical_file): with open(historical_file, 'r') as f: history = json.load(f) else: history = [] # Add current reading current_reading = { 'timestamp': datetime.datetime.now().isoformat(), 'attributes': current_attributes } history.append(current_reading) # Keep only recent data cutoff_date = datetime.datetime.now() - datetime.timedelta(days=self.CONFIG['HISTORY_RETENTION_DAYS']) history = [h for h in history if datetime.datetime.fromisoformat(h['timestamp']) > cutoff_date] # Analyze trends for critical attributes if len(history) >= 3: # Need at least 3 data points critical_attrs = ['Reallocated_Sector_Ct', 'Current_Pending_Sector', 'Reported_Uncorrect', 'Offline_Uncorrectable', 'Program_Fail_Count', 'Erase_Fail_Count'] for attr in critical_attrs: if attr in current_attributes: # Get last week's values recent_history = history[-7:] if len(history) >= 7 else history values = [h['attributes'].get(attr, 0) for h in recent_history] if len(values) >= 3: # Check for rapid increase recent_increase = values[-1] - values[0] if recent_increase > 0: rate = recent_increase / len(values) # Different thresholds for different attributes if attr in ['Reallocated_Sector_Ct', 'Current_Pending_Sector']: if rate > 0.5: # More than 0.5 sectors per check issues.append(f"TREND ALERT: Rapid increase in {attr}: +{recent_increase} in {len(values)} checks") elif attr in ['Reported_Uncorrect', 'Offline_Uncorrectable']: if rate > 0.2: # Any consistent increase is concerning issues.append(f"TREND ALERT: Increasing {attr}: +{recent_increase} in {len(values)} checks") else: # Program/Erase fail counts if rate > 1: # More than 1 error per check issues.append(f"TREND ALERT: Rapid increase in {attr}: +{recent_increase} in {len(values)} checks") # Save updated history with open(historical_file, 'w') as f: json.dump(history, f, indent=2) except Exception as e: logger.debug(f"Error analyzing trends for {device}: {e}") return issues def _check_thermal_health(self, device: str, temperature: int, drive_type: str = 'HDD') -> List[str]: 
"""Enhanced thermal health checking with drive-type specific thresholds.""" issues = [] if temperature is None: return issues # Drive-type specific temperature thresholds - ADJUSTED TO BE LESS SENSITIVE if drive_type == 'SSD': temp_thresholds = {'warning': 70, 'critical': 85, 'optimal_max': 65} else: # HDD temp_thresholds = {'warning': 65, 'critical': 75, 'optimal_max': 60} if temperature >= temp_thresholds['critical']: issues.append(f"CRITICAL: Drive temperature {temperature}°C exceeds safe operating limit for {drive_type}") elif temperature >= temp_thresholds['warning']: issues.append(f"WARNING: Drive temperature {temperature}°C approaching thermal limit for {drive_type}") elif temperature > temp_thresholds['optimal_max']: issues.append(f"INFO: Drive temperature {temperature}°C above optimal range for {drive_type}") return issues def _analyze_error_patterns(self, device: str, smart_output: str) -> List[str]: """Analyze SMART error logs for failure patterns.""" issues = [] # Pattern matching for different error types error_patterns = { 'media_errors': [ r'UNC_ERR', r'ABRT_ERR', r'read error', r'write error', r'medium error' ], 'interface_errors': [ r'ICRC_ERR', r'interface CRC error', r'SATA link down', r'communication failure' ], 'timeout_errors': [ r'command timeout', r'NCQ error', r'device fault', r'reset required' ] } for error_type, patterns in error_patterns.items(): error_count = 0 for pattern in patterns: matches = re.findall(pattern, smart_output, re.IGNORECASE) error_count += len(matches) if error_count > 0: if error_count >= 10: issues.append(f"CRITICAL: Multiple {error_type} detected ({error_count} occurrences)") elif error_count >= 3: issues.append(f"WARNING: {error_type} detected ({error_count} occurrences)") elif error_count >= 1: issues.append(f"INFO: {error_type} detected ({error_count} occurrences)") return issues def _check_ssd_health(self, device: str, smart_attributes: dict) -> List[str]: """SSD-specific health checks for wear and endurance.""" issues = [] # Check wear leveling and endurance indicators wear_indicators = [ 'Media_Wearout_Indicator', 'SSD_Life_Left', 'Percent_Lifetime_Remain', 'Available_Spare', 'Available_Spare_Threshold' ] for indicator in wear_indicators: if indicator in smart_attributes: value = smart_attributes[indicator] # Handle percentage-based indicators (countdown from 100) if indicator in ['Media_Wearout_Indicator', 'SSD_Life_Left', 'Percent_Lifetime_Remain', 'Available_Spare']: if value <= 5: issues.append(f"CRITICAL: {indicator} at {value}% - SSD near end of life") elif value <= 15: issues.append(f"WARNING: {indicator} at {value}% - SSD showing significant wear") elif value <= 30: issues.append(f"INFO: {indicator} at {value}% - SSD wear monitoring recommended") # Check for excessive bad blocks bad_block_indicators = [ 'Runtime_Bad_Block', 'Factory_Bad_Block_Ct', 'Grown_Failing_Block_Ct', 'End-to-End_Error' ] for indicator in bad_block_indicators: if indicator in smart_attributes: value = smart_attributes[indicator] if value > 100: issues.append(f"WARNING: High {indicator}: {value}") elif value > 10: issues.append(f"INFO: Elevated {indicator}: {value}") # Check write amplification and endurance metrics endurance_indicators = [ 'Total_LBAs_Written', 'Total_LBAs_Read', 'Host_Program_NAND_Pages_Count', 'FTL_Program_NAND_Pages_Count' ] # Calculate write amplification if both host and FTL write counts are available host_writes = smart_attributes.get('Host_Program_NAND_Pages_Count', 0) ftl_writes = 
smart_attributes.get('FTL_Program_NAND_Pages_Count', 0) if host_writes > 0 and ftl_writes > 0: write_amplification = ftl_writes / host_writes if write_amplification > 5.0: issues.append(f"WARNING: High write amplification factor: {write_amplification:.2f}") elif write_amplification > 3.0: issues.append(f"INFO: Elevated write amplification factor: {write_amplification:.2f}") return issues def _check_system_drive_indicators(self) -> Dict[str, Any]: """Check system logs and kernel messages for drive issues.""" system_health = { 'status': 'OK', 'issues': [] } try: # Check dmesg for drive-related errors (last 1000 lines to avoid overwhelming output) result = subprocess.run(['dmesg', '-T', '--level=err,warn'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=10) if result.returncode == 0: error_patterns = [ (r'ata\d+.*failed command', 'ATA command failures'), (r'sd \w+.*Medium Error', 'SCSI medium errors'), (r'Buffer I/O error', 'Buffer I/O errors'), (r'critical medium error', 'Critical medium errors'), (r'unrecovered read error', 'Unrecovered read errors'), (r'Current_Pending_Sector.*increased', 'Pending sector increases'), (r'ata\d+.*SError:', 'SATA errors'), (r'nvme\d+.*I/O error', 'NVMe I/O errors') ] for pattern, description in error_patterns: matches = re.findall(pattern, result.stdout, re.IGNORECASE) if matches: count = len(matches) if count >= 5: system_health['status'] = 'CRITICAL' system_health['issues'].append(f"CRITICAL: {description} in system logs ({count} occurrences)") elif count >= 2: if system_health['status'] != 'CRITICAL': system_health['status'] = 'WARNING' system_health['issues'].append(f"WARNING: {description} in system logs ({count} occurrences)") else: system_health['issues'].append(f"INFO: {description} in system logs ({count} occurrences)") except subprocess.TimeoutExpired: system_health['issues'].append("WARNING: System log check timed out") except Exception as e: logger.debug(f"Error checking system drive indicators: {e}") system_health['issues'].append(f"ERROR: Failed to check system logs: {str(e)}") return system_health # ============================================================================= # DRIVE HEALTH CHECKING METHODS # ============================================================================= def _get_drive_details(self, device: str) -> Dict[str, str]: """Get detailed drive information using smartctl.""" drive_details = { 'model': None, 'serial': None, 'capacity': None, 'firmware': None, 'type': None, # SSD or HDD 'smart_capable': False } try: # First check if device supports SMART capability_result = subprocess.run( ['smartctl', '-i', device], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # Check if smartctl failed completely if capability_result.returncode not in [0, 4]: # 0 = success, 4 = some SMART errors but readable logger.debug(f"smartctl failed for {device}: return code {capability_result.returncode}") return drive_details output = capability_result.stdout # Check if SMART is supported if "SMART support is: Enabled" in output or "SMART support is: Available" in output: drive_details['smart_capable'] = True elif "SMART support is: Unavailable" in output or "does not support SMART" in output: logger.debug(f"Device {device} does not support SMART") return drive_details for line in output.split('\n'): if 'Device Model' in line or 'Model Number' in line: drive_details['model'] = line.split(':')[1].strip() elif 'Serial Number' in line: drive_details['serial'] = line.split(':')[1].strip() elif 'User Capacity' in 
line:
                    # Extract capacity from brackets
                    capacity_match = re.search(r'\[(.*?)\]', line)
                    if capacity_match:
                        drive_details['capacity'] = capacity_match.group(1)
                elif 'Firmware Version' in line:
                    drive_details['firmware'] = line.split(':')[1].strip()
                elif 'Rotation Rate' in line:
                    if 'Solid State Device' in line:
                        drive_details['type'] = 'SSD'
                    else:
                        drive_details['type'] = 'HDD'
        except Exception as e:
            logger.debug(f"Error getting drive details for {device}: {e}")

        return drive_details

    def make_box(self, title: str, content: str, content_width: int = 70) -> str:
        """Create a formatted box with title and content."""
        return f"""
┏━ {title} {'━' * (content_width - len(title) - 3)}┓
{content}
┗{'━' * content_width}┛"""

    def _get_issue_type(self, issue: str) -> str:
        """Determine issue type from issue description."""
        if "SMART" in issue:
            return "SMART Health Issue"
        elif "Drive" in issue:
            return "Storage Issue"
        elif "ECC" in issue:
            return "Memory Issue"
        elif "CPU" in issue:
            return "Performance Issue"
        elif "Network" in issue:
            return "Network Issue"
        return "Hardware Issue"

    def _get_impact_level(self, issue: str) -> str:
        """Determine impact level from issue description."""
        if "CRITICAL" in issue or "UNHEALTHY" in issue:
            return "🔴 Critical - Immediate Action Required"
        elif "WARNING" in issue:
            return "🟡 Warning - Action Needed Soon"
        return "🟢 Low - Monitor Only"

    def _generate_detailed_description(self, issue: str, health_report: Dict[str, Any]) -> str:
        """Generate detailed ticket description."""
        hostname = socket.gethostname()
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        priority = "⚠ HIGH" if "CRITICAL" in issue else "❗ MEDIUM"
        content_width = self.STANDARD_WIDTH - 2

        banner = f"""
┏{'━' * content_width}┓
┃{' HARDWARE MONITORING ALERT TICKET '.center(content_width)}┃
┣{'━' * content_width}┫
┃ Host      : {hostname:<{content_width-13}}┃
┃ Generated : {timestamp:<{content_width-13}}┃
┃ Priority  : {priority:<{content_width-13}}┃
┗{'━' * content_width}┛"""

        executive_summary = f"""
┏━ EXECUTIVE SUMMARY ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Issue Type   │ {self._get_issue_type(issue)} ┃
┃ Impact Level │ {self._get_impact_level(issue)} ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
"""
        description = banner + executive_summary

        # Add relevant SMART descriptions
        for attr in self.SMART_DESCRIPTIONS:
            if attr in issue:
                description += f"\n{attr}:\n{self.SMART_DESCRIPTIONS[attr]}\n"

        if "SMART" in issue:
            description += """
SMART (Self-Monitoring, Analysis, and Reporting Technology) Attribute Details:
- Possible drive failure!
"""

        if "Drive" in issue and "/dev/" in issue:
            try:
                device = re.search(r'/dev/[a-zA-Z0-9]+', issue).group(0) if '/dev/' in issue else None
                drive_info = next((d for d in health_report['drives_health']['drives'] if d['device'] == device), None)

                if drive_info:
                    drive_details = self._get_drive_details(device)
                    smart_data = {
                        'attributes': drive_info.get('smart_attributes', {}),
                        'performance_metrics': drive_info.get('performance_metrics', {}),
                        'last_test_date': drive_info.get('last_test_date', 'N/A')
                    }
                    power_on_hours = smart_data['attributes'].get('Power_On_Hours', 'N/A')
                    last_test_date = smart_data.get('last_test_date', 'N/A')
                    age = f"{int(power_on_hours/24/365) if isinstance(power_on_hours, (int, float)) else 'N/A'} years" if power_on_hours != 'N/A' else 'N/A'

                    # Fix the formatting issue by ensuring all values are strings and not None
                    device_safe = device or 'N/A'
                    model_safe = drive_details.get('model') or 'N/A'
                    serial_safe = drive_details.get('serial') or 'N/A'
                    capacity_safe = drive_details.get('capacity') or 'N/A'
                    type_safe = drive_details.get('type') or 'N/A'
                    firmware_safe = drive_details.get('firmware') or 'N/A'

                    description += f"""
┏━ DRIVE SPECIFICATIONS ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Device Path  │ {device_safe:<60} ┃
┃ Model        │ {model_safe:<60} ┃
┃ Serial       │ {serial_safe:<60} ┃
┃ Capacity     │ {capacity_safe:<60} ┃
┃ Type         │ {type_safe:<60} ┃
┃ Firmware     │ {firmware_safe:<60} ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
"""

                if drive_info:
                    perf_metrics = {
                        'read_speed': drive_info.get('performance_metrics', {}).get('read_speed', 'N/A'),
                        'write_speed': drive_info.get('performance_metrics', {}).get('write_speed', 'N/A'),
                        'access_time': drive_info.get('performance_metrics', {}).get('access_time', 'N/A'),
                        'iops': drive_info.get('performance_metrics', {}).get('iops', 'N/A')
                    }
                    power_on_safe = f"{power_on_hours} hours" if power_on_hours != 'N/A' else 'N/A'
                    last_test_safe = last_test_date or 'N/A'
                    age_safe = age or 'N/A'

                    description += f"""
┏━ DRIVE TIMELINE ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Power-On Hours  │ {power_on_safe:<56} ┃
┃ Last SMART Test │ {last_test_safe:<56} ┃
┃ Drive Age       │ {age_safe:<56} ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
"""

                    smart_status_safe = drive_info.get('smart_status') or 'N/A'
                    temp_safe = f"{drive_info.get('temperature')}°C" if drive_info.get('temperature') else 'N/A'

                    description += f"""
┏━ SMART STATUS ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Status      │ {smart_status_safe:<60} ┃
┃ Temperature │ {temp_safe:<60} ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
"""

                    if drive_info.get('smart_attributes'):
                        description += "\n┏━ SMART ATTRIBUTES " + "━" * 48 + "┓\n"
                        for attr, value in drive_info['smart_attributes'].items():
                            attr_safe = str(attr).replace('_', ' ') if attr else 'Unknown'
                            value_safe = str(value) if value is not None else 'N/A'
                            description += f"┃ {attr_safe:<25} │ {value_safe:<37} ┃\n"
                        description += "┗" + "━" * 71 + "┛\n"

                    if drive_info.get('partitions'):
                        for partition in drive_info['partitions']:
                            usage_percent = partition.get('usage_percent', 0)
                            blocks = int(usage_percent / 5)  # 20 blocks total = 100%
                            usage_meter = '█' * blocks + '░' * (20 - blocks)
                            mountpoint_safe = partition.get('mountpoint') or 'N/A'
                            fstype_safe = partition.get('fstype') or 'N/A'
                            total_space_safe = partition.get('total_space') or 'N/A'
                            used_space_safe = partition.get('used_space') or 'N/A'
                            free_space_safe = partition.get('free_space') or 'N/A'

                            description += f"""
┏━ PARTITION [{mountpoint_safe:<60}] ━┓
┃ Filesystem   │ {fstype_safe:<60} ┃
┃ Usage Meter  │ [{usage_meter:<58}] ┃
┃ Total Space  │ {total_space_safe:<60} ┃
┃ Used Space   │ {used_space_safe:<60} ┃
┃ Free Space   │ {free_space_safe:<60} ┃
┃ Usage        │ {usage_percent}%{'':<57} ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
"""

                    firmware_info = self._check_disk_firmware(device)
                    if firmware_info['is_problematic']:
                        description += "\n┏━ FIRMWARE ALERTS " + "━" * 48 + "┓\n"
                        for issue_item in firmware_info['known_issues']:
                            issue_safe = str(issue_item) if issue_item else 'Unknown issue'
                            description += f"┃ ⚠ {issue_safe:<67} ┃\n"
                        description += "┗" + "━" * 71 + "┛\n"

            except Exception as e:
                description += f"\nError generating drive details: {str(e)}\n"

        if "Temperature" in issue:
            description += """
High drive temperatures can:
- Reduce drive lifespan
- Cause performance degradation
- Lead to data corruption in extreme cases

Optimal temperature range: 20-45°C
"""

        if "ECC" in issue:
            description += """
ECC (Error Correction Code) Memory Issues:
- Correctable: Memory errors that were successfully fixed
- Uncorrectable: Serious memory errors that could not be corrected

Frequent ECC corrections may indicate degrading memory modules
"""

        if "CPU" in issue:
            description += """
High CPU usage sustained over time can indicate:
- Resource constraints
- Runaway processes
- Need for performance optimization
- Potential cooling issues
"""

        if "Network" in issue:
            description += """
Network connectivity issues can impact:
- Cluster communication
- Data replication
- Service availability
- Management access
"""

        if "Disk" in issue:
            for partition in health_report.get('drives_health', {}).get('drives', []):
                if partition.get('mountpoint') in issue:
                    description += f"\n=== Disk Metrics ===\n"
                    description += f"Disk Device: {partition['device']}\n"
                    description += f"Mount Point: {partition['mountpoint']}\n"
                    description += f"Total Space: {partition['total_space']}\n"
                    description += f"Used Space: {partition['used_space']}\n"
                    description += f"Free Space: {partition['free_space']}\n"
                    description += f"Usage Percent: {partition['usage_percent']}%\n"

        return description

    def _determine_ticket_priority(self, issue: str, health_report: Dict[str, Any]) -> str:
        """
        Determine ticket priority based on issue type and severity.

        P1 = Critical system outages (reserved for future major outages)
        P2 = Hardware failures requiring same-day response
        P3 = Warnings requiring response within 1-3 days
        P4 = Low priority monitoring alerts
        """
        issue_lower = issue.lower()

        # P1 - Reserved for major system outages (implement later)
        # if 'cluster down' in issue_lower or 'total failure' in issue_lower:
        #     return self.PRIORITIES['CRITICAL']  # P1

        # P2 - Hardware failures requiring same-day response
        if any(keyword in issue_lower for keyword in [
            'smart failure', 'drive failure', 'disk failure', 'uncorrectable ecc',
            'hardware failure', 'critical temperature', 'firmware issue',
            'reallocated sector', 'pending sector'
        ]):
            return self.PRIORITIES['HIGH']  # P2

        # P2 - SMART errors indicating potential drive failure
        if 'smart issues' in issue_lower and any(error_type in issue_lower for error_type in [
            'error', 'failed', 'reallocated', 'pending', 'uncorrectable'
        ]):
            return self.PRIORITIES['HIGH']  # P2

        # P2 - Critical storage usage (>90%)
        if 'critical storage usage' in issue_lower:
            return self.PRIORITIES['HIGH']  # P2

        # P2 - Network failures affecting cluster communication
        if any(keyword in issue_lower for keyword in [
            'network failure', 'unreachable', 'network down'
        ]):
            return self.PRIORITIES['HIGH']  # P2

        # P3 - Warnings requiring attention within days
        if any(keyword in issue_lower for keyword in [
            'high temperature', 'high storage usage', 'correctable ecc',
            'high cpu usage', 'warning'
        ]):
            return self.PRIORITIES['MEDIUM']  # P3

        # P4 - Low priority monitoring alerts
        return self.PRIORITIES['LOW']  # P4

    # =============================================================================
    # TICKET CREATION METHODS
    # =============================================================================

    def _create_tickets_for_issues(self, health_report: Dict[str, Any]):
        """Create tickets for detected issues."""
        issues = self._detect_issues(health_report)
        if not issues:
            logger.info("No issues detected.")
            return

        hostname = socket.gethostname()
        action_type = self.TICKET_TEMPLATES['ACTION_TYPE']
        environment = self.TICKET_TEMPLATES['ENVIRONMENT']
        ticket_type = self.TICKET_TEMPLATES['TICKET_TYPE']
        hardware_type = self.TICKET_TEMPLATES['HARDWARE_TYPE']
        software_type = self.TICKET_TEMPLATES['SOFTWARE_TYPE']

        for issue in issues:
            if issue.lower().startswith('critical') or 'critical' in issue.lower():
                priority = self.PRIORITIES['CRITICAL']
            elif issue.lower().startswith('warning') or 'warning' in issue.lower():
                # all warnings become LOW priority (4)
                priority = self.PRIORITIES['LOW']
            else:
                # everything else stays at MEDIUM (3)
                priority = self.PRIORITIES['MEDIUM']

            category = self.TICKET_TEMPLATES['DEFAULT_CATEGORY']
            issue_type = self.TICKET_TEMPLATES['DEFAULT_ISSUE_TYPE']
            scope = self.TICKET_TEMPLATES['SCOPE']['SINGLE_NODE']

            drive_size = ""
            if "Drive" in issue and "/dev/" in issue:
                device = re.search(r'/dev/[a-zA-Z0-9]+', issue).group(0)
                drive_details = self._get_drive_details(device)
                if drive_details['capacity']:
                    drive_size = f"[{drive_details['capacity']}] "

            # Determine if this is a hardware or software issue
            issue_category = 'SOFTWARE' if 'LXC' in issue else 'HARDWARE'

            # Use the correct template based on issue category
            category_template = hardware_type['HARDWARE'] if issue_category == 'HARDWARE' else software_type['SOFTWARE']

            ticket_title = (
                f"[{hostname}]"
                f"{action_type['AUTO']}"
                f"{category_template}"
                f"{issue}"
                f"{scope}"
                f"{environment['PRODUCTION']}"
                f"{ticket_type['MAINTENANCE']}"
            )

            description = self._generate_detailed_description(issue, health_report)

            ticket_payload = {
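                # The field names sent below, and the response keys checked afterwards
                # ('success', 'ticket_id', 'error', 'existing_ticket_id'), mirror how
                # this script already talks to create_ticket_api.php; the API schema is
                # assumed from that usage rather than from formal documentation.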
"title": ticket_title, "description": description, "priority": priority, "status": "Open", "category": category, "type": issue_type } if self.dry_run: logger.info("Dry-run mode enabled. Simulating ticket creation:") logger.info(json.dumps(ticket_payload, indent=4)) else: try: response = requests.post( self.ticket_api_url, json=ticket_payload, headers={'Content-Type': 'application/json'} ) response_data = response.json() if response_data.get('success'): logger.info(f"Ticket created successfully: {ticket_title}") logger.info(f"Ticket ID: {response_data.get('ticket_id')}") elif response_data.get('error') == 'Duplicate ticket': logger.info(f"Duplicate ticket detected - existing ticket ID: {response_data.get('existing_ticket_id')}") continue else: logger.error(f"Failed to create ticket: {response_data.get('error')}") except Exception as e: logger.error(f"Error creating ticket: {e}") def _detect_issues(self, health_report: Dict[str, Any]) -> List[str]: """ Detect issues in the health report including non-critical issues. :param health_report: The comprehensive health report from the checks. :return: List of issue descriptions detected during checks. """ issues = [] # Check for drive-related issues for drive in health_report.get('drives_health', {}).get('drives', []): # Skip drives with ERROR or NOT_SUPPORTED status - these are likely virtual/unsupported devices if drive.get('smart_status') in ['ERROR', 'NOT_SUPPORTED']: logger.debug(f"Skipping issue detection for drive {drive['device']} with status {drive.get('smart_status')}") continue # Only report issues for drives with valid SMART status if drive.get('smart_issues') and drive.get('smart_status') in ['HEALTHY', 'UNHEALTHY', 'UNKNOWN']: # Filter out generic error messages that don't indicate real hardware issues filtered_issues = [] for issue in drive['smart_issues']: if not any(skip_phrase in issue for skip_phrase in [ "Error checking SMART:", "Unable to read device information", "SMART not supported", "timed out" ]): filtered_issues.append(issue) if filtered_issues: issues.append(f"Drive {drive['device']} has SMART issues: {', '.join(filtered_issues)}") # Check temperature regardless of SMART status if drive.get('temperature') and drive['temperature'] > self.CONFIG['THRESHOLDS']['TEMPERATURE_WARNING']: issues.append(f"Drive {drive['device']} temperature is high: {drive['temperature']}°C") # Check for ECC memory errors memory_health = health_report.get('memory_health', {}) if memory_health.get('has_ecc') and memory_health.get('ecc_errors'): issues.extend(memory_health['ecc_errors']) # Check for CPU-related issues cpu_health = health_report.get('cpu_health', {}) if cpu_health and cpu_health.get('cpu_usage_percent', 0) > self.CONFIG['THRESHOLDS']['CPU_WARNING']: issues.append("CPU usage is above threshold of 95%") # Check for network-related issues network_health = health_report.get('network_health', {}) for network in ['management_network', 'ceph_network']: if network_health.get(network, {}).get('issues'): issues.extend(network_health[network]['issues']) lxc_health = health_report.get('lxc_health', {}) if lxc_health.get('status') in ['WARNING', 'CRITICAL']: issues.extend(lxc_health.get('issues', [])) # Check for system-level drive issues system_health = health_report.get('system_health', {}) if system_health.get('issues'): issues.extend(system_health['issues']) logger.info("=== Issue Detection Started ===") logger.info(f"Checking drives: {len(health_report['drives_health']['drives'])} found") logger.info(f"Memory status: 
{health_report['memory_health']['status']}")
        logger.info(f"CPU status: {health_report['cpu_health']['status']}")
        logger.info(f"Network status: {health_report['network_health']}")
        logger.info(f"System status: {health_report['system_health']['status']}")
        logger.info(f"Detected issues: {issues}")
        logger.info("=== Issue Detection Completed ===\n")

        return issues

    # =============================================================================
    # DISK AND STORAGE UTILITY METHODS
    # =============================================================================

    def _get_all_disks(self) -> List[str]:
        """Get all physical disks using multiple detection methods."""
        disks = set()

        # Method 1: Use lsblk to get physical disks, excluding virtual devices
        try:
            result = subprocess.run(
                ['lsblk', '-d', '-n', '-o', 'NAME,TYPE'],
                stdout=subprocess.PIPE, text=True
            )
            for line in result.stdout.strip().split('\n'):
                if line:
                    parts = line.split()
                    if len(parts) >= 2:
                        name, device_type = parts[0], parts[1]
                        # Only include actual disks, exclude virtual devices
                        if device_type == 'disk' and not name.startswith('rbd'):
                            disks.add(f"/dev/{name}")
            logger.debug(f"Physical disks found via lsblk: {disks}")
        except Exception as e:
            logger.debug(f"lsblk detection failed: {e}")

        # Method 2: Direct device scanning for physical devices only
        for pattern in ['/dev/sd[a-z]', '/dev/nvme[0-9]n[0-9]']:
            try:
                # glob is already imported at module level
                matches = glob.glob(pattern)
                # Filter out partitions (devices ending in numbers for sd*, already filtered for nvme)
                if 'sd' in pattern:
                    matches = [d for d in matches if not d[-1].isdigit()]
                disks.update(matches)
                logger.debug(f"Disks found via glob {pattern}: {matches}")
            except Exception as e:
                logger.debug(f"Glob detection failed for {pattern}: {e}")

        return list(disks)

    def _is_excluded_mount(self, mountpoint: str) -> bool:
        """Check if a mountpoint should be excluded from monitoring."""
        # Check exact matches
        if mountpoint in self.CONFIG['EXCLUDED_MOUNTS']:
            return True

        # Check patterns
        for pattern in self.CONFIG['EXCLUDED_PATTERNS']:
            if re.match(pattern, mountpoint):
                return True

        return False

    def _parse_size(self, size_str: str) -> float:
        """
        Parse size string with units to bytes.

        :param size_str: String containing size with unit (e.g. '15.7G', '21.8T')
        :return: Size in bytes as float
        """
        try:
            # Skip non-size strings
            if not isinstance(size_str, str):
                logger.debug(f"Not a string: {size_str}")
                return 0.0

            if not any(unit in size_str.upper() for unit in ['B', 'K', 'M', 'G', 'T']):
                logger.debug(f"No valid size unit found in: {size_str}")
                return 0.0

            # Define multipliers for units
            multipliers = {
                'B': 1,
                'K': 1024,
                'M': 1024**2,
                'G': 1024**3,
                'T': 1024**4
            }

            # Extract numeric value and unit
            match = re.match(r'(\d+\.?\d*)', size_str)
            if not match:
                logger.debug(f"Could not extract numeric value from: {size_str}")
                return 0.0

            value = float(match.group(1))
            unit_match = re.search(r'([BKMGT])', size_str.upper())
            if not unit_match:
                logger.debug(f"Could not extract unit from: {size_str}")
                return 0.0

            unit = unit_match.group(1)

            # Convert to bytes
            bytes_value = value * multipliers.get(unit, 0)
            return bytes_value

        except (ValueError, AttributeError, TypeError) as e:
            logger.debug(f"Failed to parse size string: {size_str}")
            logger.debug(f"Parse error details: {str(e)}")
            return 0.0

    def _is_physical_disk(self, device_path):
        """
        Check if the device is a physical disk, excluding logical volumes and special devices.
:param device_path: Path to the device :return: Boolean indicating if it's a relevant physical disk """ logger.debug(f"Checking device: {device_path}") # Exclude known non-physical or special devices excluded_patterns = [ r'/dev/mapper/', # LVM devices r'/dev/dm-', # Device mapper devices r'/dev/loop', # Loop devices r'/dev/rbd', # Ceph RBD devices r'/boot', # Boot partitions r'/boot/efi', # EFI partitions r'[0-9]+$' # Partition numbers ] if any(re.search(pattern, device_path) for pattern in excluded_patterns): logger.debug(f"Device {device_path} excluded due to pattern match") return False # Match physical devices physical_patterns = [ r'/dev/sd[a-z]+$', # SATA/SAS drives r'/dev/nvme\d+n\d+$', # NVMe drives r'/dev/mmcblk\d+$', # MMC/SD cards r'/dev/hd[a-z]+$' # IDE drives (legacy) ] is_physical = any(re.match(pattern, device_path) for pattern in physical_patterns) logger.debug(f"Device {device_path} physical disk check result: {is_physical}") return is_physical def _check_disk_firmware(self, device: str) -> Dict[str, Any]: """Check disk firmware version against known problematic versions.""" firmware_info = { 'version': None, 'model': None, 'manufacturer': None, 'is_problematic': False, 'known_issues': [] } MANUFACTURER_PATTERNS = { 'Western Digital': ['WDC', 'Western Digital', 'Ultrastar'], 'Samsung': ['Samsung', 'SAMSUNG'], 'Seagate': ['Seagate', 'ST'], 'Intel': ['Intel', 'INTEL'], 'Micron': ['Micron', 'Crucial'], 'Toshiba': ['Toshiba', 'TOSHIBA'] } try: result = subprocess.run( ['smartctl', '-i', device], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) model_line = None for line in result.stdout.split('\n'): if 'Firmware Version:' in line: firmware_info['version'] = line.split(':')[1].strip() elif 'Model Family:' in line: model_line = line firmware_info['model'] = line.split(':')[1].strip() elif 'Device Model:' in line and not firmware_info['model']: model_line = line firmware_info['model'] = line.split(':')[1].strip() # Determine manufacturer if model_line: for manufacturer, patterns in MANUFACTURER_PATTERNS.items(): if any(pattern in model_line for pattern in patterns): firmware_info['manufacturer'] = manufacturer break # Check against known problematic versions if firmware_info['manufacturer'] and firmware_info['model']: # Check if manufacturer exists in our problematic firmware database if firmware_info['manufacturer'] in self.PROBLEMATIC_FIRMWARE: for model, versions in self.PROBLEMATIC_FIRMWARE[firmware_info['manufacturer']].items(): if model in firmware_info['model'] and firmware_info['version'] in versions: firmware_info['is_problematic'] = True firmware_info['known_issues'].append( f"Known problematic firmware version {firmware_info['version']} " f"for {firmware_info['model']}" ) logger.debug(f"=== Firmware Check for {device} ===") logger.debug(f"Firmware version: {firmware_info['version']}") logger.debug(f"Model: {firmware_info['model']}") logger.debug(f"Manufacturer: {firmware_info['manufacturer']}") logger.debug(f"Known issues: {firmware_info['known_issues']}") logger.debug("=== End Firmware Check ===\n") except Exception as e: firmware_info['known_issues'].append(f"Error checking firmware: {str(e)}") return firmware_info # ============================================================================= # SMART HEALTH CHECKING METHODS # ============================================================================= def _parse_smart_value(self, raw_value: str) -> int: """Parse SMART values handling different formats including NVMe temperature readings.""" try: # 
Handle temperature values with °C if isinstance(raw_value, str) and '°C' in raw_value: # Extract only the numeric portion before °C temp_value = raw_value.split('°C')[0].strip() return int(temp_value) # Handle time format (e.g., '15589h+17m+33.939s') if 'h+' in raw_value: return int(raw_value.split('h+')[0]) # Handle hex values if '0x' in raw_value: return int(raw_value, 16) # Handle basic numbers return int(raw_value) except ValueError: logger.debug(f"Could not parse SMART value: {raw_value}") return 0 def _detect_manufacturer(self, model: str, serial: str = None) -> str: """Enhanced manufacturer detection based on model and serial patterns.""" if not model: return 'Unknown' model_upper = model.upper() # Western Digital patterns (including HGST which WD acquired) if any(pattern in model_upper for pattern in ['WDC', 'WD-', 'HGST', 'WESTERN DIGITAL']): return 'Western Digital' # Seagate patterns elif any(pattern in model_upper for pattern in ['ST', 'SEAGATE']): return 'Seagate' # Samsung patterns elif 'SAMSUNG' in model_upper: return 'Samsung' # Intel patterns elif any(pattern in model_upper for pattern in ['INTEL', 'SSDSC']): return 'Intel' # Micron/Crucial patterns elif any(pattern in model_upper for pattern in ['CRUCIAL', 'MICRON', 'CT']): return 'Micron' # Toshiba patterns elif 'TOSHIBA' in model_upper: return 'Toshiba' # Ridata/Ritek patterns (for your existing special handling) elif any(pattern in model_upper for pattern in ['RIDATA', 'RITEK']): return 'Ridata' # OOS patterns (for your existing special handling) elif 'OOS' in model_upper: return 'OOS' return 'Unknown' def _get_manufacturer_profile(self, model: str, manufacturer: str = None, firmware: str = None) -> Dict[str, Any]: """Get manufacturer-specific SMART profile based on drive model/manufacturer/firmware.""" logger.debug(f"Looking for profile - Model: '{model}', Manufacturer: '{manufacturer}', Firmware: '{firmware}'") # First, try to detect manufacturer if not provided if not manufacturer: manufacturer = self._detect_manufacturer(model) logger.debug(f"Auto-detected manufacturer: {manufacturer}") # Check each manufacturer profile for mfg, profile in self.MANUFACTURER_SMART_PROFILES.items(): # Check firmware patterns first (most specific for OEM drives like RiData) if firmware and 'firmware_patterns' in profile: for pattern in profile['firmware_patterns']: if firmware.startswith(pattern) or pattern in firmware: logger.debug(f"Matched manufacturer profile: {mfg} for firmware pattern '{pattern}' in '{firmware}'") return profile # Check if detected manufacturer matches this profile if manufacturer and manufacturer in profile['aliases']: logger.debug(f"Matched manufacturer profile: {mfg} for detected manufacturer '{manufacturer}'") return profile # Check model/manufacturer aliases (fallback) for alias in profile['aliases']: if alias.lower() in model.lower() or (manufacturer and alias.lower() in manufacturer.lower()): logger.debug(f"Matched manufacturer profile: {mfg} for model alias '{alias}' in '{model}'") return profile # Return generic profile if no match logger.debug(f"No specific profile found for Model: '{model}', Manufacturer: '{manufacturer}', Firmware: '{firmware}', using Generic profile") return self.MANUFACTURER_SMART_PROFILES['Generic'] def _should_monitor_attribute(self, attr_name: str, manufacturer_profile: dict) -> bool: """Check if an attribute should be monitored based on manufacturer profile.""" if not manufacturer_profile: return True # Default: monitor everything attr_config = 
manufacturer_profile.get('attributes', {}).get(attr_name, {}) # Check if explicitly set to not monitor if attr_config.get('monitor') is False: logger.debug(f"Skipping monitoring for {attr_name} - explicitly disabled") return False return True # Default: monitor unless explicitly disabled def _get_attribute_thresholds(self, attr_name: str, manufacturer_profile: dict) -> dict: """Get attribute-specific thresholds, falling back to defaults.""" # Check for manufacturer-specific thresholds first if manufacturer_profile: attr_config = manufacturer_profile.get('attributes', {}).get(attr_name, {}) if 'warning_threshold' in attr_config and 'critical_threshold' in attr_config: return { 'warning': attr_config['warning_threshold'], 'critical': attr_config['critical_threshold'], 'behavior': attr_config.get('behavior', 'countup') } # Enhanced BASE_SMART_THRESHOLDS with manufacturer-specific handling BASE_SMART_THRESHOLDS = { 'Reallocated_Sector_Ct': {'warning': 5, 'critical': 10}, 'Current_Pending_Sector': {'warning': 1, 'critical': 5}, 'Offline_Uncorrectable': {'warning': 1, 'critical': 2}, 'Reported_Uncorrect': {'warning': 1, 'critical': 10}, 'Spin_Retry_Count': {'warning': 1, 'critical': 5}, 'Power_Cycle_Count': {'warning': 5000, 'critical': 10000}, 'Power_On_Hours': {'warning': 61320, 'critical': 70080}, 'Temperature_Celsius': {'warning': 65, 'critical': 75}, 'Available_Spare': {'warning': 30, 'critical': 10}, 'Program_Fail_Count': {'warning': 10, 'critical': 20}, 'Erase_Fail_Count': {'warning': 10, 'critical': 20}, 'Load_Cycle_Count': {'warning': 900000, 'critical': 1000000}, 'SSD_Life_Left': {'warning': 30, 'critical': 10}, 'Program_Fail_Cnt_Total': {'warning': 1, 'critical': 5}, 'Erase_Fail_Count_Total': {'warning': 1, 'critical': 5}, # ADJUSTED: More lenient thresholds for error rates on unknown drives 'Raw_Read_Error_Rate': {'warning': 10000000, 'critical': 100000000}, # Raised significantly 'Seek_Error_Rate': {'warning': 10000000, 'critical': 100000000}, # Raised significantly 'Command_Timeout': {'warning': 100, 'critical': 1000}, # Raised significantly 'High_Fly_Writes': {'warning': 1, 'critical': 5}, 'Airflow_Temperature_Cel': {'warning': 65, 'critical': 75}, 'G_Sense_Error_Rate': {'warning': 100, 'critical': 1000}, 'Power-Off_Retract_Count': {'warning': 100000, 'critical': 500000}, 'Head_Flying_Hours': {'warning': 50000, 'critical': 70000}, 'Runtime_Bad_Block': {'warning': 10, 'critical': 100}, 'Factory_Bad_Block_Ct': {'warning': 50, 'critical': 200}, 'Grown_Failing_Block_Ct': {'warning': 10, 'critical': 50}, 'End-to-End_Error': {'warning': 1, 'critical': 5} } if attr_name in BASE_SMART_THRESHOLDS: return { 'warning': BASE_SMART_THRESHOLDS[attr_name]['warning'], 'critical': BASE_SMART_THRESHOLDS[attr_name]['critical'], 'behavior': 'countup' } return None # No thresholds defined def _is_new_drive(self, power_on_hours: int) -> bool: """Determine if a drive is considered "new" based on power-on hours.""" return power_on_hours < 720 # Less than 1 week of runtime def _check_smart_health(self, device: str) -> Dict[str, Any]: """Enhanced SMART health check with better error handling and predictive analysis.""" smart_health = { 'status': 'UNKNOWN', 'severity': 'NORMAL', 'issues': [], 'temp': None, 'attributes': {}, 'manufacturer_profile': None } try: # Skip virtual devices if '/dev/rbd' in device or '/dev/dm-' in device or '/dev/mapper/' in device: smart_health['status'] = 'NOT_SUPPORTED' smart_health['issues'].append("Virtual device - SMART not applicable") return smart_health # First verify the 
device is SMART-capable drive_details = self._get_drive_details(device) if not drive_details.get('smart_capable', False): smart_health['status'] = 'NOT_SUPPORTED' smart_health['issues'].append("SMART not supported on this device") return smart_health # Special handling for NVMe devices if 'nvme' in device: return self._check_nvme_smart_health(device) # If we have no model info, the device might not be responding properly if not drive_details.get('model'): smart_health['status'] = 'ERROR' smart_health['issues'].append("Unable to read device information") return smart_health logger.debug(f"Drive details for {device}: {drive_details}") manufacturer_profile = self._get_manufacturer_profile( drive_details.get('model', ''), drive_details.get('manufacturer', ''), drive_details.get('firmware', '') ) smart_health['manufacturer_profile'] = manufacturer_profile logger.debug(f"Selected manufacturer profile for {device}: {manufacturer_profile.get('aliases', ['Unknown'])[0] if manufacturer_profile else 'None'}") # Get firmware information firmware_info = self._check_disk_firmware(device) if firmware_info['is_problematic']: smart_health['severity'] = 'WARNING' smart_health['issues'].extend(firmware_info['known_issues']) # Get detailed SMART data with timeout result = subprocess.run( ['smartctl', '-A', '-H', '-l', 'error', '-l', 'background', device], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 ) output = result.stdout # Check overall health status if 'FAILED' in output and 'PASSED' not in output: smart_health['status'] = 'UNHEALTHY' smart_health['severity'] = 'CRITICAL' smart_health['issues'].append("SMART overall health check failed") elif 'PASSED' in output: smart_health['status'] = 'HEALTHY' else: smart_health['status'] = 'UNKNOWN' # Parse SMART attributes with manufacturer-specific handling power_on_hours = 0 # First pass: collect all SMART attributes with priority for _Total versions smart_attributes_raw = {} for line in output.split('\n'): # Extract Power_On_Hours first to determine if drive is new if 'Power_On_Hours' in line: parts = line.split() if len(parts) >= 10: power_on_hours = self._parse_smart_value(parts[9]) smart_attributes_raw['Power_On_Hours'] = power_on_hours # Handle SMART attributes with preference for _Total versions for attr in ['Erase_Fail_Count', 'Program_Fail_Count']: # Check for _Total version first (more accurate) if f'{attr}_Total' in line: parts = line.split() if len(parts) >= 10: raw_value = self._parse_smart_value(parts[9]) smart_attributes_raw[f'{attr}_Total'] = raw_value # Store as _Total logger.debug(f"Found {attr}_Total: {raw_value}") break # Only use non-_Total version if _Total not found AND not Ridata elif attr in line and f'{attr}_Total' not in smart_attributes_raw: # Check if this is a Ridata drive and should skip regular counters if manufacturer_profile and manufacturer_profile.get('aliases', [{}])[0] == 'Ridata': logger.debug(f"Skipping {attr} for Ridata drive - using _Total version only") continue parts = line.split() if len(parts) >= 10: raw_value = self._parse_smart_value(parts[9]) smart_attributes_raw[attr] = raw_value logger.debug(f"Found {attr} (non-Total): {raw_value}") smart_health['attributes'] = smart_attributes_raw # Check if this is a new drive is_new_drive = self._is_new_drive(power_on_hours) logger.debug(f"Drive {device} power-on hours: {power_on_hours}, is_new_drive: {is_new_drive}") # Parse remaining SMART attributes for line in output.split('\n'): # Handle manufacturer-specific Wear_Leveling_Count if 
            # Get detailed SMART data with timeout
            result = subprocess.run(
                ['smartctl', '-A', '-H', '-l', 'error', '-l', 'background', device],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                timeout=30
            )
            output = result.stdout

            # Check overall health status
            if 'FAILED' in output and 'PASSED' not in output:
                smart_health['status'] = 'UNHEALTHY'
                smart_health['severity'] = 'CRITICAL'
                smart_health['issues'].append("SMART overall health check failed")
            elif 'PASSED' in output:
                smart_health['status'] = 'HEALTHY'
            else:
                smart_health['status'] = 'UNKNOWN'

            # Parse SMART attributes with manufacturer-specific handling
            power_on_hours = 0

            # First pass: collect all SMART attributes with priority for _Total versions
            smart_attributes_raw = {}
            for line in output.split('\n'):
                # Extract Power_On_Hours first to determine if drive is new
                if 'Power_On_Hours' in line:
                    parts = line.split()
                    if len(parts) >= 10:
                        power_on_hours = self._parse_smart_value(parts[9])
                        smart_attributes_raw['Power_On_Hours'] = power_on_hours

                # Handle SMART attributes with preference for _Total versions
                for attr in ['Erase_Fail_Count', 'Program_Fail_Count']:
                    # Check for _Total version first (more accurate)
                    if f'{attr}_Total' in line:
                        parts = line.split()
                        if len(parts) >= 10:
                            raw_value = self._parse_smart_value(parts[9])
                            smart_attributes_raw[f'{attr}_Total'] = raw_value  # Store as _Total
                            logger.debug(f"Found {attr}_Total: {raw_value}")
                        break
                    # Only use non-_Total version if _Total not found AND not Ridata
                    elif attr in line and f'{attr}_Total' not in smart_attributes_raw:
                        # Check if this is a Ridata drive and should skip regular counters
                        if manufacturer_profile and manufacturer_profile.get('aliases', [{}])[0] == 'Ridata':
                            logger.debug(f"Skipping {attr} for Ridata drive - using _Total version only")
                            continue
                        parts = line.split()
                        if len(parts) >= 10:
                            raw_value = self._parse_smart_value(parts[9])
                            smart_attributes_raw[attr] = raw_value
                            logger.debug(f"Found {attr} (non-Total): {raw_value}")

            smart_health['attributes'] = smart_attributes_raw

            # Check if this is a new drive
            is_new_drive = self._is_new_drive(power_on_hours)
            logger.debug(f"Drive {device} power-on hours: {power_on_hours}, is_new_drive: {is_new_drive}")

            # Parse remaining SMART attributes
            for line in output.split('\n'):
                # Handle manufacturer-specific Wear_Leveling_Count
                if 'Wear_Leveling_Count' in line:
                    parts = line.split()
                    if len(parts) >= 10:
                        raw_value = self._parse_smart_value(parts[9])
                        smart_health['attributes']['Wear_Leveling_Count'] = raw_value

                        # Get manufacturer-specific thresholds (guard against a missing profile)
                        wear_attr = (manufacturer_profile or {}).get('attributes', {}).get('Wear_Leveling_Count', {})

                        # Skip evaluation if this is a new drive and manufacturer profile says to ignore
                        if is_new_drive and wear_attr.get('ignore_on_new_drive', False):
                            logger.debug(f"Skipping Wear_Leveling_Count evaluation for new drive: {raw_value}")
                            continue

                        warning_threshold = wear_attr.get('warning_threshold')
                        critical_threshold = wear_attr.get('critical_threshold')

                        if warning_threshold and critical_threshold:
                            behavior = wear_attr.get('behavior', 'countup')
                            if behavior == 'countup':
                                if raw_value >= critical_threshold:
                                    smart_health['severity'] = 'CRITICAL'
                                    smart_health['issues'].append(f"Critical wear leveling count: {raw_value}")
                                elif raw_value >= warning_threshold:
                                    if smart_health['severity'] != 'CRITICAL':
                                        smart_health['severity'] = 'WARNING'
                                    smart_health['issues'].append(f"High wear leveling count: {raw_value}")
                            elif behavior == 'countdown':
                                if raw_value <= critical_threshold:
                                    smart_health['severity'] = 'CRITICAL'
                                    smart_health['issues'].append(f"Critical wear leveling remaining: {raw_value}")
                                elif raw_value <= warning_threshold:
                                    if smart_health['severity'] != 'CRITICAL':
                                        smart_health['severity'] = 'WARNING'
                                    smart_health['issues'].append(f"Low wear leveling remaining: {raw_value}")

            # Handle all SMART attributes with manufacturer-specific logic
            ALL_SMART_ATTRIBUTES = [
                'Reallocated_Sector_Ct', 'Current_Pending_Sector', 'Offline_Uncorrectable',
                'Reported_Uncorrect', 'Spin_Retry_Count', 'Power_Cycle_Count', 'Power_On_Hours',
                'Temperature_Celsius', 'Available_Spare', 'Program_Fail_Count', 'Erase_Fail_Count',
                'Load_Cycle_Count', 'SSD_Life_Left', 'Program_Fail_Cnt_Total', 'Erase_Fail_Count_Total',
                'Program_Fail_Count_Chip', 'Erase_Fail_Count_Chip', 'Raw_Read_Error_Rate',
                'Seek_Error_Rate', 'Command_Timeout', 'High_Fly_Writes', 'Airflow_Temperature_Cel',
                'G_Sense_Error_Rate', 'Power-Off_Retract_Count', 'Head_Flying_Hours',
                'Runtime_Bad_Block', 'Factory_Bad_Block_Ct', 'Grown_Failing_Block_Ct',
                'End-to-End_Error'
            ]

            for line in output.split('\n'):
                for attr in ALL_SMART_ATTRIBUTES:
                    if attr in line and attr not in ['Wear_Leveling_Count']:  # Wear_Leveling handled separately above
                        # Check if we should monitor this attribute
                        if not self._should_monitor_attribute(attr, manufacturer_profile):
                            logger.debug(f"Skipping {attr} - disabled for this manufacturer")
                            continue

                        parts = line.split()
                        if len(parts) >= 10:
                            raw_value = self._parse_smart_value(parts[9])
                            smart_health['attributes'][attr] = raw_value

                            # Get manufacturer-specific or default thresholds
                            attr_thresholds = self._get_attribute_thresholds(attr, manufacturer_profile)
                            if not attr_thresholds:
                                continue

                            # Apply thresholds based on behavior
                            if attr == 'Temperature_Celsius':
                                smart_health['temp'] = raw_value
                                if raw_value >= attr_thresholds['critical']:
                                    smart_health['severity'] = 'CRITICAL'
                                    smart_health['issues'].append(f"Critical temperature: {raw_value}°C")
                                elif raw_value >= attr_thresholds['warning']:
                                    if smart_health['severity'] != 'CRITICAL':
                                        smart_health['severity'] = 'WARNING'
                                    smart_health['issues'].append(f"High temperature: {raw_value}°C")
                            else:
                                # Handle countup/countdown behavior
                                behavior = attr_thresholds.get('behavior', 'countup')
                                if behavior == 'countup':
                                    if raw_value >= attr_thresholds['critical']:
                                        smart_health['severity'] = 'CRITICAL'
                                        smart_health['issues'].append(f"Critical {attr}: {raw_value}")
                                    elif raw_value >= attr_thresholds['warning']:
                                        if smart_health['severity'] != 'CRITICAL':
                                            smart_health['severity'] = 'WARNING'
                                        smart_health['issues'].append(f"Warning {attr}: {raw_value}")
                                elif behavior == 'countdown':
                                    if raw_value <= attr_thresholds['critical']:
                                        smart_health['severity'] = 'CRITICAL'
                                        smart_health['issues'].append(f"Critical {attr}: {raw_value}")
                                    elif raw_value <= attr_thresholds['warning']:
                                        if smart_health['severity'] != 'CRITICAL':
                                            smart_health['severity'] = 'WARNING'
                                        smart_health['issues'].append(f"Warning {attr}: {raw_value}")
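            # The error-log scan below expects ATA error entries of roughly this form
            # (illustrative, inferred from the regex rather than captured output):
            #   Error 123 occurred at disk power-on lifetime: 4521 hours (188 days + 9 hours)
            # Only the leading "Error N occurred at disk power-on lifetime: H hours" part is matched.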
smart_health['issues'].append(f"Critical {attr}: {raw_value}") elif raw_value >= attr_thresholds['warning']: if smart_health['severity'] != 'CRITICAL': smart_health['severity'] = 'WARNING' smart_health['issues'].append(f"Warning {attr}: {raw_value}") elif behavior == 'countdown': if raw_value <= attr_thresholds['critical']: smart_health['severity'] = 'CRITICAL' smart_health['issues'].append(f"Critical {attr}: {raw_value}") elif raw_value <= attr_thresholds['warning']: if smart_health['severity'] != 'CRITICAL': smart_health['severity'] = 'WARNING' smart_health['issues'].append(f"Warning {attr}: {raw_value}") # Check for recent SMART errors error_log_pattern = r"Error \d+ occurred at disk power-on lifetime: (\d+) hours" error_matches = re.finditer(error_log_pattern, output) recent_errors = [] for match in error_matches: error_hour = int(match.group(1)) current_hours = smart_health['attributes'].get('Power_On_Hours', 0) if current_hours - error_hour < 168: # Errors within last week recent_errors.append(match.group(0)) if recent_errors: smart_health['severity'] = 'WARNING' smart_health['issues'].extend(recent_errors) # Enhanced analysis methods if smart_health['attributes']: # Trend analysis for predictive failure detection trend_issues = self._analyze_smart_trends(device, smart_health['attributes']) smart_health['issues'].extend(trend_issues) # SSD-specific checks drive_type = drive_details.get('type', 'HDD') if drive_type == 'SSD': ssd_issues = self._check_ssd_health(device, smart_health['attributes']) smart_health['issues'].extend(ssd_issues) # Enhanced temperature analysis if smart_health['temp']: drive_type = drive_details.get('type', 'HDD') thermal_issues = self._check_thermal_health(device, smart_health['temp'], drive_type) smart_health['issues'].extend(thermal_issues) # Error pattern analysis error_pattern_issues = self._analyze_error_patterns(device, output) smart_health['issues'].extend(error_pattern_issues) logger.debug(f"=== SMART Health Check for {device} ===") logger.debug(f"Manufacturer profile: {manufacturer_profile.get('aliases', ['Unknown'])[0] if manufacturer_profile else 'None'}") logger.debug("Raw SMART attributes:") for attr, value in smart_health['attributes'].items(): logger.debug(f"{attr}: {value}") logger.debug(f"Temperature: {smart_health['temp']}°C") logger.debug(f"Is new drive: {is_new_drive}") logger.debug(f"Detected Issues: {smart_health['issues']}") logger.debug("=== End SMART Check ===\n") # Special handling for NVMe drives if 'nvme' in device: try: nvme_result = subprocess.run( ['nvme', 'smart-log', device], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=10 ) logger.debug(f"NVMe smart-log raw output for {device}:") logger.debug(nvme_result.stdout) # Initialize the temperature attribute if smart_health['temp'] is None: smart_health['attributes']['Temperature_Celsius'] = None for line in nvme_result.stdout.split('\n'): # Fix the NoneType error by checking if line exists and has content if line and line.strip() and 'temperature' in line.lower(): try: temp_str = line.split(':')[1].strip() if ':' in line else line.strip() logger.debug(f"Raw temperature string: {temp_str}") # Extract first temperature value more safely digits = ''.join(c for c in temp_str if c.isdigit()) if len(digits) >= 2: temp_value = int(digits[:2]) logger.debug(f"Parsed temperature value: {temp_value}") # Set both temperature fields smart_health['temp'] = temp_value smart_health['attributes']['Temperature_Celsius'] = temp_value logger.debug(f"Final temperature recorded: 
{smart_health['temp']}") break except (ValueError, IndexError, AttributeError) as e: logger.debug(f"Error parsing NVMe temperature from line '{line}': {e}") continue except subprocess.TimeoutExpired: logger.debug(f"NVMe smart-log for {device} timed out") except Exception as e: logger.debug(f"Error getting NVMe smart data for {device}: {e}") except subprocess.TimeoutExpired: smart_health['status'] = 'ERROR' smart_health['issues'].append("SMART check timed out") except Exception as e: smart_health['status'] = 'ERROR' smart_health['severity'] = 'UNKNOWN' smart_health['issues'].append(f"Error checking SMART: {str(e)}") logger.debug(f"Exception in _check_smart_health for {device}: {e}") import traceback logger.debug(traceback.format_exc()) return smart_health def _check_nvme_smart_health(self, device: str) -> Dict[str, Any]: """Dedicated NVMe SMART health check.""" smart_health = { 'status': 'UNKNOWN', 'severity': 'NORMAL', 'issues': [], 'temp': None, 'attributes': {}, 'manufacturer_profile': None } try: # Use nvme-cli for NVMe devices result = subprocess.run( ['nvme', 'smart-log', device], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 ) if result.returncode == 0: smart_health['status'] = 'HEALTHY' # Parse NVMe smart log output for line in result.stdout.split('\n'): if 'temperature' in line.lower(): # Extract temperature temp_match = re.search(r'(\d+)', line) if temp_match: smart_health['temp'] = int(temp_match.group(1)) smart_health['attributes']['Temperature_Celsius'] = smart_health['temp'] elif 'available_spare' in line.lower(): spare_match = re.search(r'(\d+)%', line) if spare_match: spare_pct = int(spare_match.group(1)) smart_health['attributes']['Available_Spare'] = spare_pct if spare_pct < 10: smart_health['severity'] = 'CRITICAL' smart_health['issues'].append(f"Critical Available_Spare: {spare_pct}%") elif spare_pct < 30: smart_health['severity'] = 'WARNING' smart_health['issues'].append(f"Low Available_Spare: {spare_pct}%") # Enhanced NVMe analysis if smart_health['attributes']: # Trend analysis for NVMe devices trend_issues = self._analyze_smart_trends(device, smart_health['attributes']) smart_health['issues'].extend(trend_issues) # SSD-specific checks for NVMe ssd_issues = self._check_ssd_health(device, smart_health['attributes']) smart_health['issues'].extend(ssd_issues) # Enhanced temperature analysis for NVMe if smart_health['temp']: thermal_issues = self._check_thermal_health(device, smart_health['temp'], 'SSD') smart_health['issues'].extend(thermal_issues) else: smart_health['status'] = 'ERROR' smart_health['issues'].append("Failed to read NVMe SMART data") except subprocess.TimeoutExpired: smart_health['status'] = 'ERROR' smart_health['issues'].append("NVMe SMART check timed out") except Exception as e: smart_health['status'] = 'ERROR' smart_health['issues'].append(f"Error checking NVMe SMART: {str(e)}") return smart_health def _check_drives_health(self) -> Dict[str, Any]: """Check health of all drives in the system.""" drives_health = {'overall_status': 'NORMAL', 'drives': []} try: # Get only valid physical disks physical_disks = self._get_all_disks() logger.debug(f"Checking physical disks: {physical_disks}") if not physical_disks: logger.warning("No valid physical disks found for monitoring") drives_health['overall_status'] = 'WARNING' return drives_health # Get ALL partition information including device mapper partitions = psutil.disk_partitions(all=True) # Create mapping of base devices to their partitions device_partitions = {} for part in 
    def _check_drives_health(self) -> Dict[str, Any]:
        """Check health of all drives in the system."""
        drives_health = {'overall_status': 'NORMAL', 'drives': []}

        try:
            # Get only valid physical disks
            physical_disks = self._get_all_disks()
            logger.debug(f"Checking physical disks: {physical_disks}")

            if not physical_disks:
                logger.warning("No valid physical disks found for monitoring")
                drives_health['overall_status'] = 'WARNING'
                return drives_health

            # Get ALL partition information including device mapper
            partitions = psutil.disk_partitions(all=True)

            # Create mapping of base devices to their partitions
            device_partitions = {}
            for part in partitions:
                # Extract base device (e.g., /dev/sda from /dev/sda1, /dev/nvme0n1 from /dev/nvme0n1p1)
                base_device = re.match(r'(/dev/(?:nvme\d+n\d+|[a-z]+))', part.device)
                if base_device:
                    base_dev = base_device.group(1)
                    if base_dev not in device_partitions:
                        device_partitions[base_dev] = []
                    device_partitions[base_dev].append(part)

            overall_status = 'NORMAL'

            for disk in physical_disks:
                drive_report = {
                    'device': disk,
                    'partitions': [],
                    'smart_status': 'UNKNOWN',
                    'usage_percent': 0
                }

                # Add partition information if available
                if disk in device_partitions:
                    total_used = 0
                    total_space = 0
                    for partition in device_partitions[disk]:
                        try:
                            usage = psutil.disk_usage(partition.mountpoint)
                            total_used += usage.used
                            total_space += usage.total
                            part_info = {
                                'device': partition.device,
                                'mountpoint': partition.mountpoint,
                                'fstype': partition.fstype,
                                'total_space': self._convert_bytes(usage.total),
                                'used_space': self._convert_bytes(usage.used),
                                'free_space': self._convert_bytes(usage.free),
                                'usage_percent': usage.percent
                            }
                            drive_report['partitions'].append(part_info)
                        except Exception as e:
                            logger.debug(f"Error getting partition usage for {partition.device}: {e}")

                    # Calculate overall drive usage percentage
                    if total_space > 0:
                        drive_report['usage_percent'] = (total_used / total_space) * 100

                # Check SMART health
                smart_health = self._check_smart_health(disk)
                drive_report.update({
                    'smart_status': smart_health['status'],
                    'smart_issues': smart_health['issues'],
                    'temperature': smart_health['temp'],
                    'smart_attributes': smart_health['attributes']
                })

                # Only report issues for drives that should be monitored
                if smart_health['status'] == 'UNHEALTHY':
                    overall_status = 'CRITICAL'
                elif smart_health['status'] == 'ERROR':
                    # Don't escalate overall status for ERROR drives (might be virtual)
                    logger.debug(f"Drive {disk} returned ERROR status, skipping from issue detection")
                elif smart_health['issues'] and smart_health['status'] not in ['ERROR', 'NOT_SUPPORTED']:
                    if overall_status != 'CRITICAL':
                        overall_status = 'WARNING'

                drives_health['drives'].append(drive_report)

            drives_health['overall_status'] = overall_status

        except Exception as e:
            logger.error(f"Error checking drives health: {str(e)}")

        return drives_health

    # =============================================================================
    # SYSTEM HEALTH CHECKING METHODS
    # =============================================================================

    @staticmethod
    def _convert_bytes(bytes_value: int, suffix: str = 'B') -> str:
        """
        Convert bytes to a human-readable format.

        :param bytes_value: Number of bytes to convert.
        :param suffix: Suffix to append (default is 'B' for bytes).
        :return: Formatted string with the size in human-readable form.
        """
        for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
            if abs(bytes_value) < 1024.0:
                return f"{bytes_value:.1f}{unit}{suffix}"
            bytes_value /= 1024.0
        return f"{bytes_value:.1f}Y{suffix}"
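    # Worked examples for the two size helpers (values follow directly from the code above
    # and below):
    #   _convert_bytes(1536)            -> '1.5KB'
    #   _convert_bytes(3 * 1024 ** 3)   -> '3.0GB'
    #   _convert_size_to_bytes('1.5K')  -> 1536.0
    #   _convert_size_to_bytes('2G')    -> 2147483648.0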
""" for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']: if abs(bytes_value) < 1024.0: return f"{bytes_value:.1f}{unit}{suffix}" bytes_value /= 1024.0 return f"{bytes_value:.1f}Y{suffix}" def _convert_size_to_bytes(self, size_str: str) -> float: """Convert size string with units to bytes.""" units = {'B': 1, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4} size = float(size_str[:-1]) unit = size_str[-1].upper() return size * units[unit] def _check_memory_usage(self) -> Dict[str, Any]: """Check for ECC memory errors if ECC memory is present.""" memory_health = { 'has_ecc': False, 'ecc_errors': [], 'status': 'OK', 'total_memory': self._convert_bytes(psutil.virtual_memory().total), 'used_memory': self._convert_bytes(psutil.virtual_memory().used), 'memory_percent': psutil.virtual_memory().percent } try: # First check using dmidecode result = subprocess.run( ['dmidecode', '--type', 'memory'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) if 'Error Correction Type: Multi-bit ECC' in result.stdout: memory_health['has_ecc'] = True # If dmidecode didn't find ECC, try the edac method as backup if not memory_health['has_ecc']: edac_path = '/sys/devices/system/edac/mc' if os.path.exists(edac_path) and os.listdir(edac_path): for mc_dir in glob.glob('/sys/devices/system/edac/mc/mc[0-9]*'): if os.path.exists(f"{mc_dir}/csrow0"): memory_health['has_ecc'] = True break # If ECC is present, check for errors if memory_health['has_ecc']: for mc_dir in glob.glob('/sys/devices/system/edac/mc/mc[0-9]*'): if os.path.exists(f"{mc_dir}/csrow0"): ue_count = self._read_ecc_count(f"{mc_dir}/csrow0/ue_count") if ue_count > 0: memory_health['status'] = 'CRITICAL' memory_health['ecc_errors'].append( f"Uncorrectable ECC errors detected in {os.path.basename(mc_dir)}: {ue_count}" ) ce_count = self._read_ecc_count(f"{mc_dir}/csrow0/ce_count") if ce_count > 0: if memory_health['status'] != 'CRITICAL': memory_health['status'] = 'WARNING' memory_health['ecc_errors'].append( f"Correctable ECC errors detected in {os.path.basename(mc_dir)}: {ce_count}" ) except Exception as e: memory_health['status'] = 'ERROR' memory_health['ecc_errors'].append(f"Error checking ECC status: {str(e)}") return memory_health def _read_ecc_count(self, filepath: str) -> int: """ Read ECC error count from a file. :param filepath: Path to the ECC count file :return: Number of ECC errors """ try: with open(filepath, 'r') as f: return int(f.read().strip()) except: return 0 def _check_cpu_usage(self) -> Dict[str, Any]: """ Check CPU usage and return health metrics. :return: Dictionary with CPU health metrics. """ cpu_usage_percent = psutil.cpu_percent(interval=1) cpu_health = { 'cpu_usage_percent': cpu_usage_percent, 'status': 'OK' if cpu_usage_percent < self.CONFIG['THRESHOLDS']['CPU_WARNING'] else 'WARNING' } return cpu_health def _check_network_status(self) -> Dict[str, Any]: """ Check the status of network interfaces and report any issues. :return: Dictionary containing network health metrics and any issues found. 
""" network_health = { 'management_network': { 'issues': [], 'status': 'OK', 'latency': None }, 'ceph_network': { 'issues': [], 'status': 'OK', 'latency': None } } try: # Check management network connectivity mgmt_result = subprocess.run( [ "ping", "-c", str(self.CONFIG['NETWORKS']['PING_COUNT']), "-W", str(self.CONFIG['NETWORKS']['PING_TIMEOUT']), self.CONFIG['NETWORKS']['MANAGEMENT'] ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) if mgmt_result.returncode != 0: network_health['management_network']['status'] = 'CRITICAL' network_health['management_network']['issues'].append( "Management network is unreachable" ) # Check Ceph network connectivity ceph_result = subprocess.run( [ "ping", "-c", str(self.CONFIG['NETWORKS']['PING_COUNT']), "-W", str(self.CONFIG['NETWORKS']['PING_TIMEOUT']), self.CONFIG['NETWORKS']['CEPH'] ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) if ceph_result.returncode != 0: network_health['ceph_network']['status'] = 'CRITICAL' network_health['ceph_network']['issues'].append( "Ceph network is unreachable" ) return network_health except Exception as e: logger.error(f"Network health check failed: {e}") return { 'status': 'ERROR', 'error': str(e) } def _check_lxc_storage(self) -> Dict[str, Any]: """ Check storage utilization for all running LXC containers """ logger.debug("Starting LXC storage check") lxc_health = { 'status': 'OK', 'containers': [], 'issues': [] } try: result = subprocess.run( ['pct', 'list'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) logger.debug(f"pct list output:\n{result.stdout}") for line in result.stdout.split('\n')[1:]: if not line.strip(): continue parts = line.split() if len(parts) < 2: logger.debug(f"Skipping invalid line: {line}") continue vmid, status = parts[0], parts[1] if status.lower() == 'running': logger.debug(f"Checking container {vmid} disk usage") disk_info = subprocess.run( ['pct', 'df', vmid], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) container_info = { 'vmid': vmid, 'filesystems': [] } for fs_line in disk_info.stdout.split('\n')[1:]: if not fs_line.strip() or 'MP' in fs_line: continue # Fix: Use fs_line instead of line, and columns consistently columns = fs_line.split() if len(columns) >= 6: try: # Skip excluded mounts by checking the first column if columns[0].startswith('appPool:') or '/mnt/pve/mediaf' in columns[1]: continue # Get the mountpoint (last column) mountpoint = columns[-1] # Skip excluded mountpoints if self._is_excluded_mount(mountpoint): logger.debug(f"Skipping excluded mount: {mountpoint}") continue # Parse size values safely - use correct column indices total_space = self._parse_size(columns[2]) # 3rd column used_space = self._parse_size(columns[3]) # 4th column available_space = self._parse_size(columns[4]) # 5th column # Parse percentage safely try: usage_percent = float(columns[5].rstrip('%')) # 6th column except (ValueError, IndexError): # Calculate percentage if parsing fails usage_percent = (used_space / total_space * 100) if total_space > 0 else 0 filesystem = { 'mountpoint': mountpoint, 'total_space': total_space, 'used_space': used_space, 'available': available_space, 'usage_percent': usage_percent } container_info['filesystems'].append(filesystem) # Check thresholds if usage_percent >= self.CONFIG['THRESHOLDS']['LXC_CRITICAL']: lxc_health['status'] = 'CRITICAL' issue = f"LXC {vmid} critical storage usage: {usage_percent:.1f}% on {mountpoint}" lxc_health['issues'].append(issue) elif usage_percent >= 
    def _check_lxc_storage(self) -> Dict[str, Any]:
        """Check storage utilization for all running LXC containers."""
        logger.debug("Starting LXC storage check")
        lxc_health = {
            'status': 'OK',
            'containers': [],
            'issues': []
        }

        try:
            result = subprocess.run(
                ['pct', 'list'],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            logger.debug(f"pct list output:\n{result.stdout}")

            for line in result.stdout.split('\n')[1:]:
                if not line.strip():
                    continue

                parts = line.split()
                if len(parts) < 2:
                    logger.debug(f"Skipping invalid line: {line}")
                    continue

                vmid, status = parts[0], parts[1]
                if status.lower() == 'running':
                    logger.debug(f"Checking container {vmid} disk usage")
                    disk_info = subprocess.run(
                        ['pct', 'df', vmid],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        text=True
                    )

                    container_info = {
                        'vmid': vmid,
                        'filesystems': []
                    }

                    for fs_line in disk_info.stdout.split('\n')[1:]:
                        if not fs_line.strip() or 'MP' in fs_line:
                            continue

                        # Parse each filesystem row from fs_line with consistent column indices
                        columns = fs_line.split()
                        if len(columns) >= 6:
                            try:
                                # Skip excluded mounts by checking the first column
                                if columns[0].startswith('appPool:') or '/mnt/pve/mediaf' in columns[1]:
                                    continue

                                # Get the mountpoint (last column)
                                mountpoint = columns[-1]

                                # Skip excluded mountpoints
                                if self._is_excluded_mount(mountpoint):
                                    logger.debug(f"Skipping excluded mount: {mountpoint}")
                                    continue

                                # Parse size values safely - use correct column indices
                                total_space = self._parse_size(columns[2])      # 3rd column
                                used_space = self._parse_size(columns[3])       # 4th column
                                available_space = self._parse_size(columns[4])  # 5th column

                                # Parse percentage safely
                                try:
                                    usage_percent = float(columns[5].rstrip('%'))  # 6th column
                                except (ValueError, IndexError):
                                    # Calculate percentage if parsing fails
                                    usage_percent = (used_space / total_space * 100) if total_space > 0 else 0

                                filesystem = {
                                    'mountpoint': mountpoint,
                                    'total_space': total_space,
                                    'used_space': used_space,
                                    'available': available_space,
                                    'usage_percent': usage_percent
                                }
                                container_info['filesystems'].append(filesystem)

                                # Check thresholds
                                if usage_percent >= self.CONFIG['THRESHOLDS']['LXC_CRITICAL']:
                                    lxc_health['status'] = 'CRITICAL'
                                    issue = f"LXC {vmid} critical storage usage: {usage_percent:.1f}% on {mountpoint}"
                                    lxc_health['issues'].append(issue)
                                elif usage_percent >= self.CONFIG['THRESHOLDS']['LXC_WARNING']:
                                    if lxc_health['status'] != 'CRITICAL':
                                        lxc_health['status'] = 'WARNING'
                                    issue = f"LXC {vmid} high storage usage: {usage_percent:.1f}% on {mountpoint}"
                                    lxc_health['issues'].append(issue)

                                logger.debug(f"Filesystem details: {filesystem}")

                            except Exception as e:
                                logger.debug(f"Error processing line: {str(e)}")
                                logger.debug(f"Full exception: {repr(e)}")
                                continue

                    # Only add container info if we have filesystem data
                    if container_info['filesystems']:
                        lxc_health['containers'].append(container_info)
                        logger.debug(f"Added container info for VMID {vmid}")

            logger.debug("=== LXC Storage Check Summary ===")
            logger.debug(f"Status: {lxc_health['status']}")
            logger.debug(f"Total containers checked: {len(lxc_health['containers'])}")
            logger.debug(f"Issues found: {len(lxc_health['issues'])}")
            logger.debug("=== End LXC Storage Check ===")

        except Exception as e:
            logger.debug(f"Critical error during LXC storage check: {str(e)}")
            lxc_health['status'] = 'ERROR'
            error_msg = f"Error checking LXC storage: {str(e)}"
            lxc_health['issues'].append(error_msg)

        return lxc_health


def main():
    parser = argparse.ArgumentParser(description="System Health Monitor")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Enable dry-run mode (simulate ticket creation without actual API calls)."
    )
    args = parser.parse_args()

    monitor = SystemHealthMonitor(
        ticket_api_url=SystemHealthMonitor.CONFIG['TICKET_API_URL'],
        dry_run=args.dry_run
    )
    monitor.run()


if __name__ == "__main__":
    main()
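# Typical invocations (the script filename is an assumption for illustration):
#   python3 hwmonDaemon.py            # run all checks and create tickets via the API
#   python3 hwmonDaemon.py --dry-run  # run all checks but only simulate ticket creation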