#!/usr/bin/env python3
"""One-shot hardware health monitor that files tickets for detected issues.

The monitor inspects drive SMART data, ECC memory counters, CPU load and the
reachability of the management and Ceph gateways, then posts one ticket per
detected issue to the ticket API (or only logs the payload in dry-run mode).
"""
import argparse
import datetime
import glob
import json
import logging
import os
import re
import socket
import subprocess
import sys
import textwrap
import urllib.request
from typing import Any, Dict, List

import psutil
import requests

# Create a logger that writes DEBUG and above to the console.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)

formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)

logger.addHandler(console_handler)


class SystemHealthMonitor:
    """Collect drive, memory, CPU and network health and raise tickets."""

    PRIORITIES = {
        'CRITICAL': '1',
        'HIGH': '2',
        'MEDIUM': '3',
        'LOW': '4',
    }

    ISSUE_PRIORITIES = {
        'SMART_FAILURE': PRIORITIES['HIGH'],
        'DISK_CRITICAL': PRIORITIES['HIGH'],
        'DISK_WARNING': PRIORITIES['MEDIUM'],
        'UNCORRECTABLE_ECC': PRIORITIES['HIGH'],
        'CORRECTABLE_ECC': PRIORITIES['MEDIUM'],
        'CPU_HIGH': PRIORITIES['MEDIUM'],
        'NETWORK_FAILURE': PRIORITIES['HIGH'],
    }

    CONFIG = {
        'TICKET_API_URL': 'http://10.10.10.45/create_ticket_api.php',
        'THRESHOLDS': {
            'DISK_CRITICAL': 90,
            'DISK_WARNING': 80,
            'CPU_WARNING': 80,
            'TEMPERATURE_WARNING': 65,
        },
        'NETWORKS': {
            'MANAGEMENT': '10.10.10.1',
            'CEPH': '10.10.90.1',
            'PING_TIMEOUT': 1,  # seconds
            'PING_COUNT': 1,
        },
    }

    TICKET_TEMPLATES = {
        'ACTION_TYPE': '[auto]',
        'ENVIRONMENT': '[production]',
        'TICKET_TYPE': '[maintenance]',
        'HARDWARE_TYPE': '[hardware]',
        'NETWORK_TYPE': '[network]',
        'SCOPE_SINGLE': '[single-node]',
        'SCOPE_CLUSTER': '[cluster-wide]',
        'DEFAULT_CATEGORY': 'Hardware',
        'DEFAULT_ISSUE_TYPE': 'Problem',
    }

    PROBLEMATIC_FIRMWARE = {
        'Samsung': {
            'EVO860': ['RVT01B6Q', 'RVT02B6Q'],  # Known issues with sudden performance drops
            'EVO870': ['SVT01B6Q'],
            'PM883': ['HXT7404Q'],  # Known issues with TRIM
        },
        'Seagate': {
            'ST8000NM': ['CC64'],  # Known issues with NCQ
            'ST12000NM': ['SN02'],
        },
        'WDC': {
            'WD121KRYZ': ['01.01A01'],  # RAID rebuild issues
            'WD141KRYZ': ['02.01A02'],
        },
    }

    # Seconds to wait for the ticket API; without a timeout a stalled server
    # would block the health check forever.
    API_TIMEOUT_SECONDS = 10

    # Ordering used so that a later, milder finding never downgrades an
    # earlier, more severe one.
    _SEVERITY_RANK = {'NORMAL': 0, 'WARNING': 1, 'CRITICAL': 2}

    # Warning/critical limits per SMART attribute. Entries whose warning limit
    # is HIGHER than the critical limit describe "remaining life" style
    # attributes, where a lower number is worse.
    # Command_Timeout, Raw_Read_Error_Rate and Seek_Error_Rate are
    # intentionally not monitored.
    SMART_THRESHOLDS = {
        'Reallocated_Sector_Ct': {'warning': 5, 'critical': 10},
        'Current_Pending_Sector': {'warning': 1, 'critical': 5},
        'Offline_Uncorrectable': {'warning': 1, 'critical': 2},
        'Reported_Uncorrect': {'warning': 1, 'critical': 10},
        'Spin_Retry_Count': {'warning': 1, 'critical': 5},
        'Power_Cycle_Count': {'warning': 5000, 'critical': 10000},
        'Power_On_Hours': {'warning': 61320, 'critical': 70080},  # ~7-8 years
        'Media_Wearout_Indicator': {'warning': 30, 'critical': 10},
        'Temperature_Celsius': {'warning': 65, 'critical': 75},
        'Host_Writes_32MiB': {'warning': 50000000, 'critical': 100000000},
        'Wear_Leveling_Count': {'warning': 2000, 'critical': 3000},
        'Available_Spare': {'warning': 30, 'critical': 10},
        'Program_Fail_Count': {'warning': 10, 'critical': 20},
        'Erase_Fail_Count': {'warning': 10, 'critical': 20},
        'Load_Cycle_Count': {'warning': 900000, 'critical': 1000000},
        'SSD_Life_Left': {'warning': 30, 'critical': 10},
    }

    # Explanations appended to a ticket when the attribute name appears in
    # the issue text.
    SMART_DESCRIPTIONS = {
        'Reallocated_Sector_Ct': textwrap.dedent("""\
            Number of sectors that have been reallocated due to errors.
            - High counts indicate degrading media
            - Each reallocation uses one of the drive's limited spare sectors
            - Rapid increases suggest accelerating drive wear
            """),
        'Current_Pending_Sector': textwrap.dedent("""\
            Sectors waiting to be reallocated due to read/write errors.
            - Indicates potentially unstable sectors
            - May result in data loss if unrecoverable
            - Should be monitored for increases
            """),
        'Offline_Uncorrectable': textwrap.dedent("""\
            Count of uncorrectable errors detected during offline data collection.
            - Direct indicator of media reliability issues
            - May affect data integrity
            - High values suggest drive replacement needed
            """),
        'Reported_Uncorrect': textwrap.dedent("""\
            Number of errors that could not be recovered using hardware ECC.
            - Critical indicator of drive health
            - Directly impacts data reliability
            - Any non-zero value requires attention
            """),
        'Spin_Retry_Count': textwrap.dedent("""\
            Number of spin start retry attempts.
            - Indicates potential motor or bearing issues
            - May predict imminent mechanical failure
            - Increasing values suggest degrading drive health
            """),
        'Power_On_Hours': textwrap.dedent("""\
            Total number of hours the device has been powered on.
            - Normal aging metric
            - Used to gauge overall drive lifetime
            - Compare against manufacturer's MTBF rating
            """),
        'Media_Wearout_Indicator': textwrap.dedent("""\
            Percentage of drive's rated life remaining (SSDs).
            - 100 indicates new drive
            - 0 indicates exceeded rated writes
            - Critical for SSD lifecycle management
            """),
        'Temperature_Celsius': textwrap.dedent("""\
            Current drive temperature.
            - High temperatures accelerate wear
            - Optimal range: 20-45°C
            - Sustained high temps reduce lifespan
            """),
        'Available_Spare': textwrap.dedent("""\
            Percentage of spare blocks remaining (SSDs).
            - Critical for SSD endurance
            - Low values indicate approaching end-of-life
            - Rapid decreases suggest excessive writes
            """),
        'Program_Fail_Count': textwrap.dedent("""\
            Number of flash program operation failures.
            - Indicates NAND cell reliability
            - Important for SSD health assessment
            - Increasing values suggest flash degradation
            """),
        'Erase_Fail_Count': textwrap.dedent("""\
            Number of flash erase operation failures.
            - Related to NAND block health
            - Critical for SSD reliability
            - High counts suggest failing flash blocks
            """),
    }

    # (keyword, case_sensitive, explanation). Acronyms are matched exactly;
    # ordinary words are matched case-insensitively because the generated
    # issue strings spell them in lowercase ("temperature is high",
    # "network is unreachable").
    ISSUE_EXPLANATIONS = (
        ('SMART', True, textwrap.dedent("""\
            SMART (Self-Monitoring, Analysis, and Reporting Technology) issues indicate potential drive reliability problems.
            - Reallocated sectors indicate bad blocks that have been remapped
            - Pending sectors are potentially failing blocks waiting to be remapped
            - Uncorrectable errors indicate data that could not be read
            """)),
        ('temperature', False, textwrap.dedent("""\
            High drive temperatures can:
            - Reduce drive lifespan
            - Cause performance degradation
            - Lead to data corruption in extreme cases
            Optimal temperature range: 20-45°C
            """)),
        ('ECC', True, textwrap.dedent("""\
            ECC (Error Correction Code) Memory Issues:
            - Correctable: Memory errors that were successfully fixed
            - Uncorrectable: Serious memory errors that could not be corrected
            Frequent ECC corrections may indicate degrading memory modules
            """)),
        ('CPU', True, textwrap.dedent("""\
            High CPU usage sustained over time can indicate:
            - Resource constraints
            - Runaway processes
            - Need for performance optimization
            - Potential cooling issues
            """)),
        ('network', False, textwrap.dedent("""\
            Network connectivity issues can impact:
            - Cluster communication
            - Data replication
            - Service availability
            - Management access
            """)),
    )

    # Device paths that are never physical disks.
    _EXCLUDED_DEVICE_PATTERNS = (
        r'/dev/mapper/',  # LVM devices
        r'/dev/dm-',      # Device mapper devices
        r'/dev/loop',     # Loop devices
        r'/dev/rbd',      # Ceph RBD devices
        r'/boot',         # Boot partitions
        r'/boot/efi',     # EFI partitions
    )

    # Whole-device (not partition) paths of physical disks.
    _PHYSICAL_DEVICE_PATTERNS = (
        r'/dev/sd[a-z]+$',      # SATA/SAS drives
        r'/dev/nvme\d+n\d+$',   # NVMe drives
        r'/dev/mmcblk\d+$',     # MMC/SD cards
        r'/dev/hd[a-z]+$',      # IDE drives (legacy)
    )

    # Extracts the throughput figure from an hdparm timing line such as
    # "Timing cached reads: 20000 MB in 2.00 seconds = 10000.00 MB/sec".
    _HDPARM_RATE_RE = re.compile(r'=\s*([\d.]+)\s*\S+/sec')

    def __init__(self,
                 ticket_api_url: str = 'http://10.10.10.45/create_ticket_api.php',
                 dry_run: bool = False):
        """
        Initialize the system health monitor.

        :param ticket_api_url: URL for the ticket creation API.
        :param dry_run: If True, simulate API calls without sending requests.
        """
        self.ticket_api_url = ticket_api_url
        self.dry_run = dry_run

    def run(self):
        """
        Perform a one-shot health check of the system.

        Any exception raised by the checks or by ticket creation is logged
        (with traceback) instead of propagating.
        """
        try:
            health_report = self.perform_health_checks()
            self._create_tickets_for_issues(health_report)
        except Exception:
            # Top-level boundary: report through the logger like every other
            # message in this module rather than a bare print().
            logger.exception("Unexpected error during health check")

    def perform_health_checks(self) -> Dict[str, Any]:
        """
        Perform comprehensive system health checks and return a report.

        :return: Dict with the keys 'drives_health', 'memory_health',
                 'cpu_health' and 'network_health'.
        """
        health_report = {
            'drives_health': self._check_drives_health(),
            'memory_health': self._check_memory_usage(),
            'cpu_health': self._check_cpu_usage(),
            'network_health': self._check_network_status(),
        }

        if self.dry_run:
            drives = health_report['drives_health']
            memory = health_report['memory_health']
            cpu = health_report['cpu_health']
            network = health_report['network_health']

            logger.info("=== Detailed Health Check Results ===")
            logger.info(f"Drive Health Status: {drives['overall_status']}")
            for drive in drives['drives']:
                # Unmounted drives have no mountpoint; fall back to the
                # device path so the line stays meaningful.
                label = drive['mountpoint'] or drive['device']
                logger.info(
                    f"Drive {label}: {drive['usage_percent']}% used, "
                    f"SMART: {drive['smart_status']}"
                )
            logger.info(f"Memory Status: {memory['status']}")
            logger.info(f"Memory Usage: {memory['memory_percent']}%")
            logger.info(f"ECC Memory: {'Present' if memory['has_ecc'] else 'Not Present'}")
            if memory['has_ecc'] and memory['ecc_errors']:
                logger.info(f"ECC Errors: {memory['ecc_errors']}")
            logger.info(f"CPU Usage: {cpu['cpu_usage_percent']}% ({cpu['status']})")
            logger.info(f"Network Management Status: {network['management_network']['status']}")
            logger.info(f"Network Ceph Status: {network['ceph_network']['status']}")
            logger.info("================================")

        return health_report

    def _generate_detailed_description(self, issue: str, health_report: Dict[str, Any]) -> str:
        """
        Build the ticket body for a single issue.

        The body consists of a banner (host name and timestamp), the issue
        text, generic explanations selected by keywords found in the issue,
        explanations of any SMART attribute named in the issue and, for disk
        issues, the usage metrics of the matching mount point.

        :param issue: Issue text as produced by _detect_issues.
        :param health_report: Report returned by perform_health_checks.
        :return: The complete ticket description.
        """
        banner = (
            "\n"
            "=================================================================\n"
            "AUTOMATED TICKET - Generated by Hardware Monitoring Service (hwmonDaemon)\n"
            f"Host: {socket.gethostname()}\n"
            f"Generated: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
            "=================================================================\n"
            "\n"
        )
        description = banner + issue + "\n\n"
        description += "=== Issue Details ===\n"

        issue_lower = issue.lower()
        for keyword, case_sensitive, explanation in self.ISSUE_EXPLANATIONS:
            haystack = issue if case_sensitive else issue_lower
            if keyword in haystack:
                description += "\n" + explanation

        # Explain every SMART attribute that is named in the issue text.
        for attr, explanation in self.SMART_DESCRIPTIONS.items():
            if attr in issue:
                description += f"\n{attr}:\n{explanation}"

        if "disk" in issue_lower:
            for partition in health_report.get('drives_health', {}).get('drives', []):
                mountpoint = partition.get('mountpoint')
                # Unmounted drives report mountpoint None; `None in str`
                # would raise TypeError.
                if mountpoint and mountpoint in issue:
                    description += "\n=== Disk Metrics ===\n"
                    description += f"Disk Device: {partition['device']}\n"
                    description += f"Mount Point: {partition['mountpoint']}\n"
                    description += f"Total Space: {partition['total_space']}\n"
                    description += f"Used Space: {partition['used_space']}\n"
                    description += f"Free Space: {partition['free_space']}\n"
                    description += f"Usage Percent: {partition['usage_percent']}%\n"

        return description

    def _create_tickets_for_issues(self, health_report: Dict[str, Any]):
        """
        Create (or, in dry-run mode, log) one ticket per detected issue.

        :param health_report: Report returned by perform_health_checks.
        """
        issues = self._detect_issues(health_report)
        if not issues:
            logger.info("No issues detected.")
            return

        hostname = socket.gethostname()
        action_type = self.TICKET_TEMPLATES['ACTION_TYPE']
        environment = self.TICKET_TEMPLATES['ENVIRONMENT']
        ticket_type = self.TICKET_TEMPLATES['TICKET_TYPE']
        hardware_type = self.TICKET_TEMPLATES['HARDWARE_TYPE']

        for issue in issues:
            # NOTE(review): every ticket is filed as MEDIUM / single-node /
            # hardware; ISSUE_PRIORITIES and NETWORK_TYPE are defined on the
            # class but not applied here yet.
            priority = self.PRIORITIES['MEDIUM']
            category = self.TICKET_TEMPLATES['DEFAULT_CATEGORY']
            issue_type = self.TICKET_TEMPLATES['DEFAULT_ISSUE_TYPE']
            scope = self.TICKET_TEMPLATES['SCOPE_SINGLE']

            ticket_title = (
                f"[{hostname}]{action_type}{hardware_type} {issue} "
                f"{scope}{environment}{ticket_type}"
            )
            description = self._generate_detailed_description(issue, health_report)

            ticket_payload = {
                "title": ticket_title,
                "description": description,
                "priority": priority,
                "status": "Open",
                "category": category,
                "type": issue_type,
            }

            if self.dry_run:
                logger.info("Dry-run mode enabled. Simulating ticket creation:")
                logger.info(json.dumps(ticket_payload, indent=4))
                continue

            try:
                response = requests.post(
                    self.ticket_api_url,
                    json=ticket_payload,
                    headers={'Content-Type': 'application/json'},
                    timeout=self.API_TIMEOUT_SECONDS,
                )
                response_data = response.json()
            except Exception as e:
                # Covers connection errors, timeouts and non-JSON replies; a
                # failure for one issue must not stop the remaining tickets.
                logger.error(f"Error creating ticket: {e}")
                continue

            if response_data.get('success'):
                logger.info(f"Ticket created successfully: {ticket_title}")
                logger.info(f"Ticket ID: {response_data.get('ticket_id')}")
            elif response_data.get('error') == 'Duplicate ticket':
                logger.info(
                    "Duplicate ticket detected - existing ticket ID: "
                    f"{response_data.get('existing_ticket_id')}"
                )
            else:
                logger.error(f"Failed to create ticket: {response_data.get('error')}")

    def _detect_issues(self, health_report: Dict[str, Any]) -> List[str]:
        """
        Detect issues in the health report including non-critical issues.

        :param health_report: The comprehensive health report from the checks.
        :return: List of issue descriptions detected during checks.
        """
        issues = []
        temperature_limit = self.CONFIG['THRESHOLDS']['TEMPERATURE_WARNING']

        # Drive-related issues
        drives = health_report.get('drives_health', {}).get('drives', [])
        for drive in drives:
            if drive.get('smart_issues'):
                issues.append(
                    f"Drive {drive['device']} has SMART issues: "
                    f"{', '.join(drive['smart_issues'])}"
                )
            temperature = drive.get('temperature')
            if temperature and temperature > temperature_limit:
                issues.append(f"Drive {drive['device']} temperature is high: {temperature}°C")

        # ECC memory errors
        memory_health = health_report.get('memory_health', {})
        if memory_health.get('has_ecc') and memory_health.get('ecc_errors'):
            issues.extend(memory_health['ecc_errors'])

        # CPU-related issues
        cpu_health = health_report.get('cpu_health', {})
        if cpu_health and cpu_health.get('cpu_usage_percent', 0) > self.CONFIG['THRESHOLDS']['CPU_WARNING']:
            issues.append("CPU usage is above threshold")

        # Network-related issues
        network_health = health_report.get('network_health', {})
        for network in ('management_network', 'ceph_network'):
            network_issues = network_health.get(network, {}).get('issues')
            if network_issues:
                issues.extend(network_issues)

        # Use the tolerant lookups from above so that a partial report cannot
        # make the diagnostics themselves fail with KeyError.
        logger.debug("=== Issue Detection Started ===")
        logger.debug(f"Checking drives: {len(drives)} found")
        logger.debug(f"Memory status: {memory_health.get('status')}")
        logger.debug(f"CPU status: {cpu_health.get('status')}")
        logger.debug(f"Network status: {network_health}")
        logger.debug(f"Detected issues: {issues}")
        logger.debug("=== Issue Detection Completed ===\n")

        return issues

    def _get_all_disks(self) -> List[str]:
        """
        Get all physical disks using multiple detection methods.

        Candidates come from `lsblk` and from globbing /dev; each candidate is
        kept only if _is_physical_disk accepts it, which drops partitions,
        loop/device-mapper/RBD devices and the empty entry produced by empty
        lsblk output.

        :return: Sorted list of whole-disk device paths.
        """
        disks = set()

        # Method 1: Use lsblk to get top-level block devices
        try:
            result = subprocess.run(
                ['lsblk', '-d', '-n', '-o', 'NAME'],
                stdout=subprocess.PIPE,
                text=True
            )
        except (OSError, subprocess.SubprocessError) as e:
            logger.debug(f"lsblk detection failed: {e}")
        else:
            names = (name.strip() for name in result.stdout.splitlines())
            candidates = [f"/dev/{name}" for name in names if name]
            disks.update(d for d in candidates if self._is_physical_disk(d))
            logger.debug(f"Disks found via lsblk: {disks}")

        # Method 2: Direct device scanning. Partitions are rejected by
        # pattern rather than by "ends with a digit", because NVMe namespace
        # names (nvme0n1) legitimately end with a digit.
        for pattern in ('/dev/sd*', '/dev/nvme*n*'):
            matches = glob.glob(pattern)
            disks.update(d for d in matches if self._is_physical_disk(d))
            logger.debug(f"Disks found via glob {pattern}: {matches}")

        return sorted(disks)

    def _is_physical_disk(self, device_path):
        """
        Check if the device is a physical disk, excluding logical volumes and special devices.

        :param device_path: Path to the device
        :return: Boolean indicating if it's a relevant physical disk
        """
        logger.debug(f"Checking device: {device_path}")

        if any(re.search(pattern, device_path) for pattern in self._EXCLUDED_DEVICE_PATTERNS):
            logger.debug(f"Device {device_path} excluded due to pattern match")
            return False

        is_physical = any(
            re.match(pattern, device_path) for pattern in self._PHYSICAL_DEVICE_PATTERNS
        )
        logger.debug(f"Device {device_path} physical disk check result: {is_physical}")
        return is_physical

    def _check_disk_firmware(self, device: str) -> Dict[str, Any]:
        """
        Check disk firmware version against known problematic versions.

        :param device: Device path passed to `smartctl -i`.
        :return: Dict with 'version', 'model', 'manufacturer',
                 'is_problematic' and 'known_issues'. Failures while running
                 smartctl are recorded in 'known_issues'.
        """
        firmware_info = {
            'version': None,
            'model': None,
            'manufacturer': None,
            'is_problematic': False,
            'known_issues': []
        }

        try:
            result = subprocess.run(
                ['smartctl', '-i', device],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )

            for line in result.stdout.split('\n'):
                # Split on the first colon only so values that themselves
                # contain a colon are not truncated.
                key, separator, value = line.partition(':')
                if not separator:
                    continue
                key = key.strip()
                value = value.strip()
                if key == 'Firmware Version':
                    firmware_info['version'] = value
                elif key == 'Model Family':
                    firmware_info['model'] = value
                elif key in ('Device Model', 'Model Number'):
                    # 'Model Number' is the label smartctl uses for NVMe
                    # devices. The family name, when present, wins.
                    if not firmware_info['model']:
                        firmware_info['model'] = value

            # smartctl may print no model line at all; an empty string keeps
            # the substring tests below from raising TypeError on None.
            model_text = firmware_info['model'] or ''

            # Determine manufacturer
            for manufacturer in self.PROBLEMATIC_FIRMWARE:
                if manufacturer in model_text:
                    firmware_info['manufacturer'] = manufacturer
                    break

            # Check against known problematic versions
            if firmware_info['manufacturer'] and firmware_info['model']:
                known_bad = self.PROBLEMATIC_FIRMWARE[firmware_info['manufacturer']]
                for model, versions in known_bad.items():
                    if model in model_text and firmware_info['version'] in versions:
                        firmware_info['is_problematic'] = True
                        firmware_info['known_issues'].append(
                            f"Known problematic firmware version {firmware_info['version']} "
                            f"for {firmware_info['model']}"
                        )

            logger.debug(f"=== Firmware Check for {device} ===")
            logger.debug(f"Firmware version: {firmware_info['version']}")
            logger.debug(f"Model: {firmware_info['model']}")
            logger.debug(f"Manufacturer: {firmware_info['manufacturer']}")
            logger.debug(f"Known issues: {firmware_info['known_issues']}")
            logger.debug("=== End Firmware Check ===\n")

        except Exception as e:
            firmware_info['known_issues'].append(f"Error checking firmware: {str(e)}")

        return firmware_info

    def _parse_smart_value(self, raw_value: str) -> int:
        """
        Parse SMART values handling different formats.

        :param raw_value: Token from smartctl output: a plain integer, a hex
                          literal containing '0x', or a duration such as
                          '15589h+17m+33.939s' (only the hours are kept).
        :return: The parsed integer, or 0 when the token cannot be parsed.
        """
        try:
            # Handle time format (e.g., '15589h+17m+33.939s')
            if 'h+' in raw_value:
                return int(raw_value.split('h+')[0])
            # Handle hex values
            if '0x' in raw_value:
                return int(raw_value, 16)
            # Handle basic numbers
            return int(raw_value)
        except ValueError:
            logger.debug(f"Could not parse SMART value: {raw_value}")
            return 0

    def _raise_severity(self, smart_health: Dict[str, Any], level: str) -> None:
        """Set smart_health['severity'] to level unless it is already more severe."""
        current = smart_health.get('severity', 'NORMAL')
        if self._SEVERITY_RANK.get(level, 0) > self._SEVERITY_RANK.get(current, 0):
            smart_health['severity'] = level

    def _apply_smart_attribute_line(self, smart_health: Dict[str, Any], line: str) -> None:
        """Evaluate one line of the smartctl attribute table against SMART_THRESHOLDS."""
        parts = line.split()
        # Attribute table rows have ten columns; the raw value is the tenth
        # and the normalized value the fourth.
        if len(parts) < 10:
            return

        for attr, thresholds in self.SMART_THRESHOLDS.items():
            if attr not in line:
                continue

            raw_value = self._parse_smart_value(parts[9])
            smart_health['attributes'][attr] = raw_value
            issues = smart_health['issues']

            if attr == 'Reported_Uncorrect':
                # Handled once, here, so the same finding is not reported
                # twice under two different wordings.
                logger.debug(f"Found Reported_Uncorrect value: {raw_value}")
                if raw_value >= thresholds['critical']:
                    smart_health['status'] = 'UNHEALTHY'
                    self._raise_severity(smart_health, 'CRITICAL')
                    issues.append(f"Critical uncorrectable errors: {raw_value}")
                elif raw_value >= thresholds['warning']:
                    self._raise_severity(smart_health, 'WARNING')
                    issues.append(f"Warning: uncorrectable errors detected: {raw_value}")
            elif attr == 'Temperature_Celsius':
                smart_health['temp'] = raw_value
                if raw_value >= thresholds['critical']:
                    self._raise_severity(smart_health, 'CRITICAL')
                    issues.append(f"Critical temperature: {raw_value}°C")
                elif raw_value >= thresholds['warning']:
                    self._raise_severity(smart_health, 'WARNING')
                    issues.append(f"High temperature: {raw_value}°C")
            elif thresholds['warning'] > thresholds['critical']:
                # "Remaining life" attribute: lower is worse, and the
                # percentage lives in the normalized VALUE column, not in the
                # raw value. Skip rows whose VALUE column is not numeric.
                if not parts[3].isdigit():
                    continue
                remaining = int(parts[3])
                if remaining <= thresholds['critical']:
                    self._raise_severity(smart_health, 'CRITICAL')
                    issues.append(f"Critical {attr}: {remaining}")
                elif remaining <= thresholds['warning']:
                    self._raise_severity(smart_health, 'WARNING')
                    issues.append(f"Warning {attr}: {remaining}")
            else:
                if raw_value >= thresholds['critical']:
                    self._raise_severity(smart_health, 'CRITICAL')
                    issues.append(f"Critical {attr}: {raw_value}")
                elif raw_value >= thresholds['warning']:
                    self._raise_severity(smart_health, 'WARNING')
                    issues.append(f"Warning {attr}: {raw_value}")

    def _measure_disk_performance(self, device: str) -> Dict[str, Any]:
        """Run `hdparm -Tt` and return the measured throughput figures (None when unavailable)."""
        metrics = {
            'read_speed': None,
            'write_speed': None,
            'access_time': None,
            'cached_speed': None,
        }
        try:
            perf_result = subprocess.run(
                ['hdparm', '-Tt', device],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
        except (OSError, subprocess.SubprocessError) as e:
            # Best effort: hdparm may be missing or not permitted.
            logger.debug(f"hdparm performance test skipped for {device}: {e}")
            return metrics

        for line in perf_result.stdout.split('\n'):
            rate = self._HDPARM_RATE_RE.search(line)
            if not rate:
                continue
            try:
                value = float(rate.group(1))
            except ValueError:
                continue
            if 'buffered disk reads' in line:
                metrics['read_speed'] = value
            elif 'cached reads' in line:
                metrics['cached_speed'] = value
        return metrics

    def _check_smart_health(self, device: str) -> Dict[str, Any]:
        """
        Enhanced SMART health check with detailed failure thresholds.

        :param device: Device path passed to smartctl and hdparm.
        :return: Dict with 'status' (HEALTHY/UNHEALTHY/ERROR), 'severity'
                 (NORMAL/WARNING/CRITICAL/UNKNOWN), 'issues', 'temp',
                 'attributes' and, on success, 'performance_metrics'.
        """
        smart_health = {
            'status': 'HEALTHY',
            'severity': 'NORMAL',
            'issues': [],
            'temp': None,
            'attributes': {}
        }

        try:
            # Get firmware information
            firmware_info = self._check_disk_firmware(device)
            if firmware_info['is_problematic']:
                self._raise_severity(smart_health, 'WARNING')
                smart_health['issues'].extend(firmware_info['known_issues'])

            # Get detailed SMART data including the error log
            result = subprocess.run(
                ['smartctl', '-A', '-H', '-l', 'error', '-l', 'background', device],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            output = result.stdout

            # Check overall health status
            if 'FAILED' in output and 'PASSED' not in output:
                smart_health['status'] = 'UNHEALTHY'
                smart_health['severity'] = 'CRITICAL'
                smart_health['issues'].append("SMART overall health check failed")

            # Parse SMART attributes with thresholds
            for line in output.split('\n'):
                self._apply_smart_attribute_line(smart_health, line)

            # Check for recent SMART errors. When Power_On_Hours was not
            # reported it defaults to 0, which makes every logged error count
            # as recent (the conservative choice).
            error_log_pattern = r"Error \d+ occurred at disk power-on lifetime: (\d+) hours"
            current_hours = smart_health['attributes'].get('Power_On_Hours', 0)
            recent_errors = [
                match.group(0)
                for match in re.finditer(error_log_pattern, output)
                if current_hours - int(match.group(1)) < 168  # Errors within last week
            ]
            if recent_errors:
                self._raise_severity(smart_health, 'WARNING')
                smart_health['issues'].extend(recent_errors)

            # Quick performance test
            smart_health['performance_metrics'] = self._measure_disk_performance(device)

            logger.debug(f"=== SMART Health Check for {device} ===")
            logger.debug("Raw SMART attributes:")
            for attr, value in smart_health['attributes'].items():
                logger.debug(f"{attr}: {value}")
            logger.debug(f"Temperature: {smart_health['temp']}°C")
            logger.debug(f"Detected Issues: {smart_health['issues']}")
            logger.debug("=== End SMART Check ===\n")

        except Exception as e:
            smart_health['status'] = 'ERROR'
            smart_health['severity'] = 'UNKNOWN'
            smart_health['issues'].append(f"Error checking SMART: {str(e)}")

        return smart_health

    def _check_drives_health(self) -> Dict[str, Any]:
        """
        Check overall health of physical SATA and NVMe drives including disk usage and SMART status.

        :return: Dict with 'overall_status' (NORMAL/WARNING/CRITICAL) and
                 'drives', a list of per-drive reports. The usage fields of a
                 drive report keep their "unmounted" defaults; only the SMART
                 fields are filled in.
        """
        drives_health = {'overall_status': 'NORMAL', 'drives': []}

        try:
            # Get physical disks only (exclude RBD devices)
            physical_disks = [
                disk for disk in self._get_all_disks()
                if disk.startswith(('/dev/sd', '/dev/nvme'))
            ]
            logger.debug(f"Checking physical disks: {physical_disks}")

            overall_status = 'NORMAL'
            for disk in physical_disks:
                drive_report = {
                    'device': disk,
                    'mountpoint': None,
                    'usage_status': 'UNMOUNTED',
                    'usage_percent': 0,
                    'total_space': '0B',
                    'used_space': '0B',
                    'free_space': '0B',
                    'smart_status': 'UNKNOWN'
                }

                # Check SMART health first
                smart_health = self._check_smart_health(disk)
                drive_report.update({
                    'smart_status': smart_health['status'],
                    'smart_issues': smart_health['issues'],
                    'temperature': smart_health['temp'],
                    'smart_attributes': smart_health['attributes']
                })

                # Update overall status based on SMART health
                if smart_health['status'] == 'UNHEALTHY':
                    overall_status = 'CRITICAL'
                elif smart_health['issues'] and overall_status != 'CRITICAL':
                    overall_status = 'WARNING'

                drives_health['drives'].append(drive_report)

            drives_health['overall_status'] = overall_status

        except Exception as e:
            logger.error(f"Error checking drives health: {str(e)}")

        return drives_health

    @staticmethod
    def _convert_bytes(bytes_value: int, suffix: str = 'B') -> str:
        """
        Convert bytes to a human-readable format.

        :param bytes_value: Number of bytes to convert.
        :param suffix: Suffix to append (default is 'B' for bytes).
        :return: Formatted string with the size in human-readable form.
        """
        for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
            if abs(bytes_value) < 1024.0:
                return f"{bytes_value:.1f}{unit}{suffix}"
            bytes_value /= 1024.0
        return f"{bytes_value:.1f}Y{suffix}"

    def _check_memory_usage(self) -> Dict[str, Any]:
        """
        Report memory usage and check for ECC memory errors if ECC memory is present.

        ECC presence is detected with dmidecode first and with the EDAC sysfs
        tree as a fallback; error counters are summed over every csrow of
        each memory controller.

        :return: Dict with 'has_ecc', 'ecc_errors', 'status',
                 'total_memory', 'used_memory' and 'memory_percent'.
        """
        # Take one snapshot so the three figures describe the same instant.
        memory_snapshot = psutil.virtual_memory()
        memory_health = {
            'has_ecc': False,
            'ecc_errors': [],
            'status': 'OK',
            'total_memory': self._convert_bytes(memory_snapshot.total),
            'used_memory': self._convert_bytes(memory_snapshot.used),
            'memory_percent': memory_snapshot.percent
        }

        try:
            # First check using dmidecode. A missing or failing dmidecode
            # must not prevent the EDAC fallback below from running.
            dmidecode_error = None
            try:
                result = subprocess.run(
                    ['dmidecode', '--type', 'memory'],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True
                )
            except (OSError, subprocess.SubprocessError) as e:
                dmidecode_error = e
            else:
                if re.search(r'Error Correction Type:\s*(?:Single-bit|Multi-bit) ECC',
                             result.stdout):
                    memory_health['has_ecc'] = True

            mc_dirs = glob.glob('/sys/devices/system/edac/mc/mc[0-9]*')

            # If dmidecode didn't find ECC, try the edac method as backup
            if not memory_health['has_ecc']:
                memory_health['has_ecc'] = any(
                    os.path.exists(f"{mc_dir}/csrow0") for mc_dir in mc_dirs
                )

            if dmidecode_error is not None and not memory_health['has_ecc']:
                # Neither method produced an answer: surface the failure.
                memory_health['status'] = 'ERROR'
                memory_health['ecc_errors'].append(
                    f"Error checking ECC status: {str(dmidecode_error)}"
                )

            # If ECC is present, check for errors
            if memory_health['has_ecc']:
                for mc_dir in mc_dirs:
                    csrow_dirs = glob.glob(f"{mc_dir}/csrow[0-9]*")
                    if not csrow_dirs:
                        continue
                    controller = os.path.basename(mc_dir)

                    ue_count = sum(
                        self._read_ecc_count(f"{csrow}/ue_count") for csrow in csrow_dirs
                    )
                    if ue_count > 0:
                        memory_health['status'] = 'CRITICAL'
                        memory_health['ecc_errors'].append(
                            f"Uncorrectable ECC errors detected in {controller}: {ue_count}"
                        )

                    ce_count = sum(
                        self._read_ecc_count(f"{csrow}/ce_count") for csrow in csrow_dirs
                    )
                    if ce_count > 0:
                        if memory_health['status'] != 'CRITICAL':
                            memory_health['status'] = 'WARNING'
                        memory_health['ecc_errors'].append(
                            f"Correctable ECC errors detected in {controller}: {ce_count}"
                        )

        except Exception as e:
            memory_health['status'] = 'ERROR'
            memory_health['ecc_errors'].append(f"Error checking ECC status: {str(e)}")

        return memory_health

    def _read_ecc_count(self, filepath: str) -> int:
        """
        Read ECC error count from a file.

        :param filepath: Path to the ECC count file
        :return: Number of ECC errors, or 0 when the file cannot be read or
                 does not contain an integer.
        """
        try:
            with open(filepath, 'r') as f:
                return int(f.read().strip())
        except (OSError, ValueError):
            return 0

    def _check_cpu_usage(self) -> Dict[str, Any]:
        """
        Check CPU usage and return health metrics.

        :return: Dictionary with CPU health metrics.
        """
        # Sampled over a one-second interval.
        cpu_usage_percent = psutil.cpu_percent(interval=1)
        cpu_health = {
            'cpu_usage_percent': cpu_usage_percent,
            'status': 'OK' if cpu_usage_percent < self.CONFIG['THRESHOLDS']['CPU_WARNING'] else 'WARNING'
        }
        return cpu_health

    def _check_network_status(self) -> Dict[str, Any]:
        """
        Check reachability of the management and Ceph networks with ping.

        The returned dict always contains 'management_network' and
        'ceph_network', each with 'issues', 'status' and 'latency'. If ping
        itself cannot be executed the affected network gets status 'ERROR'
        and an 'error' entry, and top-level 'status'/'error' keys are added
        as well; no issue is recorded in that case.

        :return: Dictionary containing network health metrics and any issues found.
        """
        network_health = {
            'management_network': {
                'issues': [],
                'status': 'OK',
                'latency': None
            },
            'ceph_network': {
                'issues': [],
                'status': 'OK',
                'latency': None
            }
        }
        networks = self.CONFIG['NETWORKS']
        targets = (
            ('management_network', networks['MANAGEMENT'], "Management network is unreachable"),
            ('ceph_network', networks['CEPH'], "Ceph network is unreachable"),
        )

        for key, address, unreachable_message in targets:
            try:
                ping_result = subprocess.run(
                    [
                        "ping",
                        "-c", str(networks['PING_COUNT']),
                        "-W", str(networks['PING_TIMEOUT']),
                        address
                    ],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True
                )
            except (OSError, subprocess.SubprocessError) as e:
                # Keep the per-network structure intact so that callers which
                # index it do not fail on a differently shaped dict.
                logger.error(f"Network health check failed: {e}")
                network_health[key]['status'] = 'ERROR'
                network_health[key]['error'] = str(e)
                network_health['status'] = 'ERROR'
                network_health['error'] = str(e)
                continue

            if ping_result.returncode != 0:
                network_health[key]['status'] = 'CRITICAL'
                network_health[key]['issues'].append(unreachable_message)

        return network_health


def main():
    """Parse the command line and run a single health check."""
    parser = argparse.ArgumentParser(description="System Health Monitor")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Enable dry-run mode (simulate ticket creation without actual API calls)."
    )
    args = parser.parse_args()

    try:
        monitor = SystemHealthMonitor(
            ticket_api_url=SystemHealthMonitor.CONFIG['TICKET_API_URL'],
            dry_run=args.dry_run
        )
        # run() already includes the network check; no second pass is needed.
        monitor.run()
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()