#!/usr/bin/env python3
"""One-shot system health monitor.

Collects disk usage, SMART, memory, CPU and network reachability data, turns
threshold violations into issue strings, and creates one ticket per issue by
POSTing JSON to a ticket API (or only logging the payload in dry-run mode).
"""
import argparse
import json
import logging
import os
import re
import socket
import subprocess
import sys
import urllib.request
from typing import Any, Dict, List

import psutil
import requests

# Create a logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Create a console handler and set its level to DEBUG
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)

# Create a formatter
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

# Add the formatter to the console handler
console_handler.setFormatter(formatter)

# Add the console handler to the logger
logger.addHandler(console_handler)


class SystemHealthMonitor:
    """Run system health checks and open tickets for the issues they find."""

    # Seconds to wait for the ticket API before giving up on a request.
    REQUEST_TIMEOUT_SECONDS = 10

    # (report key, host to ping, issue text reported when the ping fails)
    NETWORK_PROBES = (
        ('management_network', '10.10.10.1', "Management network is unreachable."),
        ('ceph_network', '10.10.90.1', "Ceph network is unreachable."),
    )

    # SMART attribute name -> raw value above which the drive is flagged.
    SMART_CRITICAL_THRESHOLDS = {
        'Reallocated_Sector_Ct': 10,
        'Current_Pending_Sector': 1,
        'Offline_Uncorrectable': 1,
        'Reported_Uncorrect': 1,
        'Command_Timeout': 5,
        'Temperature_Celsius': 60,
    }

    # Issue kind -> (priority, category, ticket type) sent to the ticket API.
    TICKET_PARAMETERS = {
        'disk': ("3", "Hardware", "Incident"),
        'memory': ("4", "Hardware", "Incident"),
        'cpu': ("4", "Hardware", "Incident"),
        'network': ("2", "Network", "Problem"),
        'other': ("4", "Other", "Task"),
    }

    def __init__(self,
                 ticket_api_url: str = 'http://10.10.10.45/create_ticket_api.php',
                 state_file: str = '/tmp/last_health_check.json',
                 dry_run: bool = False):
        """
        Initialize the system health monitor.

        :param ticket_api_url: URL for the ticket creation API.
        :param state_file: File path to track the last health check results.
            It is only stored; no method in this file reads or writes it.
        :param dry_run: If True, simulate API calls without sending requests.
        """
        self.ticket_api_url = ticket_api_url
        self.state_file = state_file
        self.dry_run = dry_run

    def run(self):
        """
        Perform a one-shot health check of the system.

        Any exception raised by the checks or by ticket creation is logged
        and swallowed so the caller never sees it.
        """
        try:
            # Perform health checks and gather the report
            health_report = self.perform_health_checks()

            # Create tickets for any detected issues
            self._create_tickets_for_issues(health_report)
        except Exception as e:
            logger.error("Unexpected error during health check: %s", e)

    def perform_health_checks(self) -> Dict[str, Any]:
        """
        Perform comprehensive system health checks and return a report.

        :return: Dictionary with the keys 'drives_health', 'memory_health',
            'cpu_health' and 'network_health'.
        """
        return {
            'drives_health': self._check_drives_health(),
            'memory_health': self._check_memory_usage(),
            'cpu_health': self._check_cpu_usage(),
            'network_health': self._check_network_status(),
        }

    @staticmethod
    def _classify_issue(issue: str) -> str:
        """
        Map an issue string to one of 'disk', 'memory', 'cpu', 'network', 'other'.

        The network test is case-insensitive because the network issue texts
        ("Management network is unreachable.") use a lowercase "network".

        :param issue: The issue description.
        :return: The issue kind, usable as a key of TICKET_PARAMETERS.
        """
        if "Disk" in issue:
            return 'disk'
        if "Memory" in issue:
            return 'memory'
        if "CPU" in issue:
            return 'cpu'
        if "network" in issue.lower():
            return 'network'
        return 'other'

    def _generate_detailed_description(self, issue: str, health_report: Dict[str, Any]) -> str:
        """
        Generate a detailed description for the issue based on the health report.

        :param issue: The issue description.
        :param health_report: The comprehensive health report from the checks.
        :return: The issue text followed by the matching report details.
        """
        description = issue + "\n\n"
        kind = self._classify_issue(issue)

        if kind == 'disk':
            description += self._describe_disk_issue(issue, health_report)
        elif kind == 'memory':
            memory_health = health_report.get('memory_health', {})
            description += f"Total Memory: {memory_health['total_memory']}\n"
            description += f"Used Memory: {memory_health['used_memory']}\n"
            description += f"Memory Usage Percent: {memory_health['memory_percent']}%\n"
        elif kind == 'cpu':
            cpu_health = health_report.get('cpu_health', {})
            description += f"CPU Usage Percent: {cpu_health['cpu_usage_percent']}%\n"
        elif kind == 'network':
            network_health = health_report.get('network_health', {})
            for network in ['management_network', 'ceph_network']:
                # .get(): a failed network check reports {'error': ...}
                # without the per-network keys.
                network_issues = network_health.get(network, {}).get('issues')
                if network_issues:
                    description += f"{network.replace('_', ' ').title()} Issues:\n"
                    description += "\n".join(network_issues)
                    description += "\n"

        return description

    @staticmethod
    def _describe_disk_issue(issue: str, health_report: Dict[str, Any]) -> str:
        """
        Describe the partition a disk issue refers to.

        :param issue: A disk issue of the form "Disk <mountpoint> ...".
        :param health_report: The comprehensive health report from the checks.
        :return: Usage details (plus `smartctl -a` output for an unhealthy
            drive), or an empty string when no partition matches.
        """
        for partition in health_report.get('drives_health', {}).get('drives', []):
            # Match the exact mountpoint: a substring test would let "/"
            # match every disk issue.
            if not issue.startswith(f"Disk {partition.get('mountpoint')} "):
                continue

            details = f"Disk Device: {partition['device']}\n"
            details += f"Mount Point: {partition['mountpoint']}\n"
            details += f"Total Space: {partition['total_space']}\n"
            details += f"Used Space: {partition['used_space']}\n"
            details += f"Free Space: {partition['free_space']}\n"
            details += f"Usage Percent: {partition['usage_percent']}%\n"

            if partition.get('smart_status') == 'UNHEALTHY':
                try:
                    # Get additional disk information using smartctl
                    result = subprocess.run(
                        ['smartctl', '-a', partition['device']],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        text=True
                    )
                    details += "\nSMART Information:\n"
                    details += result.stdout + result.stderr
                except Exception as e:
                    details += f"Error getting SMART information: {str(e)}\n"
            return details

        return ""

    def _create_tickets_for_issues(self, health_report: Dict[str, Any]):
        """
        Create tickets for detected issues with dynamic parameters based on severity.

        :param health_report: The comprehensive health report from the checks.
        """
        issues = self._detect_issues(health_report)
        if not issues:
            logger.info("No issues detected.")
            return

        hostname = socket.gethostname()  # Get the current hostname
        action_type = "[auto]"           # Default action type for automatic checks
        scope = "[cluster-wide]"         # Scope of the issues
        environment = "[production]"     # Environment where the issues were found
        ticket_type = "[maintenance]"    # Type of the ticket being created

        for issue in issues:
            # Determine priority, category, and type based on the issue detected
            priority, category, issue_type = self.TICKET_PARAMETERS[self._classify_issue(issue)]

            # Create the ticket title with relevant details
            ticket_title = (
                f"[{hostname}]{action_type}[{issue_type}] {issue} "
                f"{scope}{environment}{ticket_type}"
            )
            ticket_payload = {
                "title": ticket_title,
                "description": self._generate_detailed_description(issue, health_report),
                "priority": priority,
                "status": "Open",
                "category": category,
                "type": issue_type
            }

            if self.dry_run:
                # Dry-run mode: log the payload instead of sending it
                logger.info("Dry-run mode enabled. Simulating ticket creation:")
                logger.info(json.dumps(ticket_payload, indent=4))
                print("Dry-run: Ticket payload:")
                print(json.dumps(ticket_payload, indent=4))
            else:
                self._submit_ticket(ticket_payload)

    def _submit_ticket(self, ticket_payload: Dict[str, Any]):
        """
        POST one ticket to the ticket API and report the outcome.

        A request failure is logged and swallowed so the remaining issues
        still get their tickets.

        :param ticket_payload: JSON-serializable ticket fields; 'title' is
            used in the success message.
        """
        try:
            response = requests.post(
                self.ticket_api_url,
                json=ticket_payload,
                headers={'Content-Type': 'application/json'},
                # Without a timeout an unresponsive API blocks the run forever.
                timeout=self.REQUEST_TIMEOUT_SECONDS
            )
        except requests.RequestException as e:
            logger.error("Error creating ticket: %s", e)
            return

        print(f"Response status code: {response.status_code}")
        print(f"Response body: {response.text}")

        if response.status_code in [200, 201]:
            print(f"Ticket created successfully: {ticket_payload['title']}")
        else:
            print(f"Failed to create ticket. Status code: {response.status_code}")
            print(f"Response: {response.text}")

    def _detect_issues(self, health_report: Dict[str, Any]) -> List[str]:
        """
        Detect issues in the health report including non-critical issues.

        :param health_report: The comprehensive health report from the checks.
        :return: List of issue descriptions detected during checks.
        """
        issues = []

        # Check for drive-related issues
        for partition in health_report.get('drives_health', {}).get('drives', []):
            if partition.get('usage_status') == 'CRITICAL_HIGH_USAGE':
                issues.append(
                    f"Disk {partition['mountpoint']} is {partition['usage_percent']}% full"
                )
            elif partition.get('usage_status') == 'WARNING_HIGH_USAGE':
                issues.append(
                    f"Disk {partition['mountpoint']} is {partition['usage_percent']}% full (Warning)"
                )
            if partition.get('smart_status') == 'UNHEALTHY':
                issues.append(f"Disk {partition['mountpoint']} has an unhealthy SMART status")

        # Check for memory-related issues
        memory_health = health_report.get('memory_health', {})
        if memory_health and memory_health.get('memory_percent', 0) > 80:
            issues.append("Memory usage is above 80%")

        # Check for CPU-related issues
        cpu_health = health_report.get('cpu_health', {})
        if cpu_health and cpu_health.get('cpu_usage_percent', 0) > 80:
            issues.append("CPU usage is above 80%")

        # Check for network-related issues
        network_health = health_report.get('network_health', {})
        for network in ['management_network', 'ceph_network']:
            issues.extend(network_health.get(network, {}).get('issues') or [])

        return issues

    def _is_physical_disk(self, device_path):
        """
        Check if the device is a physical SATA, NVMe, or MMC disk.

        The pattern is matched as a prefix, so partition nodes of such disks
        (for example a path with a trailing partition number) also match.

        :param device_path: Path to the device
        :return: Boolean indicating if it's a physical disk
        """
        return bool(re.match(r'/dev/(sd[a-z]|nvme\d+n\d+|mmcblk\d+)', device_path))

    @staticmethod
    def _smart_output_reports_failure(output: str) -> bool:
        """
        Tell whether smartctl output contains a FAILED verdict.

        The attribute table printed by `smartctl -A` has a WHEN_FAILED column
        header; that line is skipped so it is not mistaken for a failure.

        :param output: Standard output of a smartctl run.
        :return: True if any other line contains 'FAILED'.
        """
        return any(
            'FAILED' in line and 'WHEN_FAILED' not in line
            for line in output.splitlines()
        )

    def _check_smart_health(self, device: str) -> Dict[str, Any]:
        """
        Check comprehensive SMART health metrics for a drive.

        :param device: Path to device
        :return: Dictionary with 'status' ('HEALTHY', 'UNHEALTHY' or 'ERROR'),
            'issues', 'temp' and 'attributes'.
        """
        smart_health = {
            'status': 'HEALTHY',
            'issues': [],
            'temp': None,
            'attributes': {}
        }

        try:
            # Get detailed SMART attributes
            result = subprocess.run(
                ['smartctl', '-A', '-H', device],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            output = result.stdout

            # Check critical attributes
            for line in output.split('\n'):
                for attr, threshold in self.SMART_CRITICAL_THRESHOLDS.items():
                    if attr not in line:
                        continue
                    try:
                        value = int(line.split()[9])  # Raw value is typically in column 10
                    except (IndexError, ValueError):
                        continue

                    smart_health['attributes'][attr] = value
                    if attr == 'Temperature_Celsius':
                        smart_health['temp'] = value
                        if value > threshold:
                            smart_health['issues'].append(
                                f"Drive temperature critical: {value}°C"
                            )
                    elif value > threshold:
                        smart_health['issues'].append(f"{attr} above threshold: {value}")

            # Check overall SMART status
            if self._smart_output_reports_failure(output) or smart_health['issues']:
                smart_health['status'] = 'UNHEALTHY'

        except Exception as e:
            smart_health['status'] = 'ERROR'
            smart_health['issues'].append(f"Error checking SMART: {str(e)}")

        return smart_health

    def _check_drives_health(self) -> Dict[str, Any]:
        """
        Check overall health of physical SATA and NVMe drives including disk usage and SMART status.

        A partition whose usage cannot be read is logged and skipped, so it
        does not discard the results of the other partitions.

        :return: Combined health report of all drives and their status.
        """
        drives_health: Dict[str, Any] = {'overall_status': 'NORMAL', 'drives': []}

        try:
            partitions = [
                p for p in psutil.disk_partitions() if self._is_physical_disk(p.device)
            ]
        except Exception as e:
            logger.error("Error checking drives health: %s", e)
            return drives_health

        for partition in partitions:
            # Check disk usage
            try:
                usage = psutil.disk_usage(partition.mountpoint)
            except OSError as e:
                logger.error("Error checking disk usage for %s: %s", partition.mountpoint, e)
                continue

            disk_usage_status = 'NORMAL'
            if usage.percent > 90:
                disk_usage_status = 'CRITICAL_HIGH_USAGE'
            elif usage.percent > 80:
                disk_usage_status = 'WARNING_HIGH_USAGE'

            # Check SMART health
            smart_health = self._check_smart_health(partition.device)

            drives_health['drives'].append({
                'device': partition.device,
                'mountpoint': partition.mountpoint,
                'total_space': self._convert_bytes(usage.total),
                'used_space': self._convert_bytes(usage.used),
                'free_space': self._convert_bytes(usage.free),
                'usage_percent': usage.percent,
                'usage_status': disk_usage_status,
                'smart_status': smart_health['status'],
                'smart_issues': smart_health['issues'],
                'temperature': smart_health['temp'],
                'smart_attributes': smart_health['attributes']
            })

            # Update overall status as we go so an error on a later
            # partition cannot reset an earlier CRITICAL/WARNING.
            if smart_health['status'] == 'UNHEALTHY' or disk_usage_status == 'CRITICAL_HIGH_USAGE':
                drives_health['overall_status'] = 'CRITICAL'
            elif (disk_usage_status == 'WARNING_HIGH_USAGE'
                  and drives_health['overall_status'] != 'CRITICAL'):
                drives_health['overall_status'] = 'WARNING'

        return drives_health

    @staticmethod
    def _convert_bytes(bytes_value: int, suffix: str = 'B') -> str:
        """
        Convert bytes to a human-readable format.

        :param bytes_value: Number of bytes to convert.
        :param suffix: Suffix to append (default is 'B' for bytes).
        :return: Formatted string with the size in human-readable form.
        """
        for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
            if abs(bytes_value) < 1024.0:
                return f"{bytes_value:.1f}{unit}{suffix}"
            bytes_value /= 1024.0
        return f"{bytes_value:.1f}Y{suffix}"

    def _check_memory_usage(self) -> Dict[str, Any]:
        """
        Check memory usage and return health metrics.

        :return: Dictionary with memory health metrics; 'status' is 'OK'
            below 90% usage and 'WARNING' otherwise.
        """
        memory_info = psutil.virtual_memory()
        return {
            'total_memory': self._convert_bytes(memory_info.total),
            'used_memory': self._convert_bytes(memory_info.used),
            'memory_percent': memory_info.percent,
            'status': 'OK' if memory_info.percent < 90 else 'WARNING'
        }

    def _check_cpu_usage(self) -> Dict[str, Any]:
        """
        Check CPU usage and return health metrics.

        :return: Dictionary with CPU health metrics; 'status' is 'OK' below
            90% usage and 'WARNING' otherwise.
        """
        cpu_usage_percent = psutil.cpu_percent(interval=1)
        return {
            'cpu_usage_percent': cpu_usage_percent,
            'status': 'OK' if cpu_usage_percent < 90 else 'WARNING'
        }

    def _check_network_status(self) -> Dict[str, Any]:
        """
        Check the status of network interfaces and report any issues.

        Each host in NETWORK_PROBES is pinged once; a non-zero ping exit code
        adds the probe's issue text to that network's 'issues' list.

        :return: Dictionary containing network health metrics and any issues
            found, or {'error': <message>} if running ping itself failed.
        """
        network_health: Dict[str, Any] = {
            name: {'issues': []} for name, _, _ in self.NETWORK_PROBES
        }

        try:
            for name, host, message in self.NETWORK_PROBES:
                proc = subprocess.run(
                    ["ping", "-c", "1", host],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL
                )
                if proc.returncode != 0:
                    network_health[name]['issues'].append(message)
            return network_health
        except Exception as e:
            logger.error("Network health check failed: %s", e)
            return {'error': str(e)}


def main():
    """
    Parse the command line, run one health check, and log the network status.

    Exits with status 1 if an unexpected exception escapes.
    """
    try:
        # Argument parser for CLI options
        parser = argparse.ArgumentParser(description="System Health Monitor")
        parser.add_argument(
            "--dry-run",
            action="store_true",
            help="Enable dry-run mode (simulate ticket creation without actual API calls)."
        )
        args = parser.parse_args()

        ticket_api_url = "http://10.10.10.45/create_ticket_api.php"
        state_file = "/tmp/last_health_check.json"

        # Instantiate the SystemHealthMonitor class
        monitor = SystemHealthMonitor(
            ticket_api_url=ticket_api_url,
            state_file=state_file,
            dry_run=args.dry_run  # Pass the dry-run flag
        )

        # Run the health checks
        monitor.run()

        # Check network health synchronously
        network_health = monitor._check_network_status()
        logger.info(f"Network health: {network_health}")

    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
        sys.exit(1)


if __name__ == "__main__":
    # main() does its own argument parsing.
    main()