#!/usr/bin/env python3 import os import sys import json import datetime import requests import psutil import socket import subprocess from typing import Dict, Any, List class SystemHealthMonitor: def __init__(self, ticket_api_url: str = 'http://10.10.10.45/create_ticket_api.php', state_file: str = '/tmp/last_health_check.json'): """ Initialize the system health monitor :param ticket_api_url: URL for ticket creation API :param state_file: File to track last health check """ self.ticket_api_url = ticket_api_url self.state_file = state_file def run(self): """ Perform a one-shot health check """ try: # Perform health checks health_report = self.perform_health_checks() # Create tickets for critical issues self._create_tickets_for_issues(health_report) except Exception as e: print(f"Unexpected error during health check: {e}") def perform_health_checks(self) -> Dict[str, Any]: """ Perform comprehensive system health checks :return: Dictionary with health check results """ health_report = { 'drives_health': self._check_drives_health(), 'memory_health': self._check_memory_usage(), 'cpu_health': self._check_cpu_usage(), 'network_health': self._check_network_status() #'temperature_health': self._check_system_temperatures() } return health_report def _create_tickets_for_issues(self, health_report: Dict[str, Any]): """ Create tickets for critical issues with dynamic parameters. """ critical_issues = self._detect_critical_issues(health_report) if not critical_issues: print("No critical issues detected.") return # Initialize default ticket fields priority = "P4" # Default to low priority categories = set() # To accumulate unique categories issue_types = set() # To accumulate unique issue types hostname = socket.gethostname() action_type = "[auto]" scope = "[cluster-wide]" environment = "[production]" ticket_type = "[maintenance]" # Analyze critical issues to determine ticket parameters for issue in critical_issues: if "disk" in issue.lower(): priority = "P3" # Medium priority for disk issues categories.add("Hardware") issue_types.add("Incident") elif "memory" in issue.lower(): priority = "P4" # Low priority for memory issues categories.add("Hardware") issue_types.add("Incident") elif "cpu" in issue.lower(): priority = "P4" # Low priority for CPU issues categories.add("Hardware") issue_types.add("Incident") elif "internet connectivity" in issue.lower(): priority = "P2" # High priority for network issues categories.add("Network") issue_types.add("Problem") elif "health issues" in issue.lower(): priority = "P1" # Critical priority for health issues categories.add("Hardware") issue_types.add("Problem") # Create a list from the set to get unique values category = list(categories)[0] if categories else "Other" issue_type = list(issue_types)[0] if issue_types else "Task" ticket_title = f"[{hostname}]{action_type}[{issue_type}] System Health Issues Detected {scope}{environment}{ticket_type}" ticket_description = "Multiple system health issues detected:\n\n" + "\n".join(critical_issues) ticket_payload = { "title": ticket_title, "description": ticket_description, "priority": priority, "status": "Open", "category": category, "type": issue_type } try: response = requests.post( self.ticket_api_url, json=ticket_payload, headers={'Content-Type': 'application/json'} ) if response.status_code in [200, 201]: print(f"Ticket created successfully: {ticket_title}") else: print(f"Failed to create ticket. Status code: {response.status_code}") print(f"Response: {response.text}") except Exception as e: print(f"Error creating ticket: {e}") def _detect_critical_issues(self, health_report: Dict[str, Any]) -> List[str]: """ Detect critical issues in the health report :param health_report: Comprehensive health report :return: List of critical issue descriptions """ critical_issues = [] for partition in health_report.get('disk_health', {}).get('partitions', []): if partition.get('status') == 'CRITICAL_HIGH_USAGE': critical_issues.append( f"Disk {partition['mountpoint']} is {partition['usage_percent']}% full" ) return critical_issues def _check_drives_health(self) -> Dict[str, Any]: """ Check overall health of drives including disk usage and SMART status. :return: Combined health report of all drives """ drives_health = {'overall_status': 'NORMAL', 'drives': []} try: partitions = psutil.disk_partitions() overall_status = 'NORMAL' for partition in partitions: drive_report = { 'device': partition.device, 'mountpoint': partition.mountpoint } try: # Disk usage usage = psutil.disk_usage(partition.mountpoint) usage_status = 'NORMAL' if usage.percent > 90: usage_status = 'CRITICAL_HIGH_USAGE' elif usage.percent > 80: usage_status = 'WARNING_HIGH_USAGE' drive_report.update({ 'total_space': self._convert_bytes(usage.total), 'used_space': self._convert_bytes(usage.used), 'free_space': self._convert_bytes(usage.free), 'usage_percent': usage.percent, 'usage_status': usage_status }) # Update overall status if usage_status == 'CRITICAL_HIGH_USAGE': overall_status = 'CRITICAL_HIGH_USAGE' elif usage_status == 'WARNING_HIGH_USAGE' and overall_status != 'CRITICAL_HIGH_USAGE': overall_status = 'WARNING_HIGH_USAGE' # SMART status try: result = subprocess.run( ['smartctl', '-H', partition.device], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) output = result.stdout + result.stderr smart_status = 'HEALTHY' if 'PASSED' in output else 'UNHEALTHY' drive_report['smart_status'] = smart_status if smart_status == 'UNHEALTHY' and overall_status != 'CRITICAL_HIGH_USAGE': overall_status = 'UNHEALTHY' except Exception as e: drive_report['smart_status'] = f"ERROR: {str(e)}" except Exception as e: drive_report['error'] = f"Could not check drive: {str(e)}" drives_health['drives'].append(drive_report) drives_health['overall_status'] = overall_status return drives_health except Exception as e: print(f"Drive health check failed: {e}") return {'error': str(e)} def _convert_bytes(self, bytes_value: int, suffix: str = 'B') -> str: """ Convert bytes to human-readable format :param bytes_value: Number of bytes :param suffix: Suffix to append (default 'B' for bytes) :return: Formatted string with size """ for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']: if abs(bytes_value) < 1024.0: return f"{bytes_value:.1f}{unit}{suffix}" bytes_value /= 1024.0 return f"{bytes_value:.1f}Y{suffix}" def _check_memory_usage(self) -> Dict[str, Any]: """ Check memory usage and return health metrics :return: Memory health metrics """ try: memory = psutil.virtual_memory() return { 'total_memory': self._convert_bytes(memory.total), 'used_memory': self._convert_bytes(memory.used), 'free_memory': self._convert_bytes(memory.available), 'memory_percent': memory.percent } except Exception as e: print(f"Memory health check failed: {e}") return {'error': str(e)} def _check_cpu_usage(self) -> Dict[str, Any]: """ Check CPU usage and return health metrics :return: CPU health metrics """ try: cpu_usage = psutil.cpu_percent(interval=1) return { 'cpu_usage_percent': cpu_usage } except Exception as e: print(f"CPU health check failed: {e}") return {'error': str(e)} def _check_network_status(self) -> Dict[str, Any]: """ Check network connectivity between nodes and include detailed identifiers. :return: Network health report """ network_health = { 'management_network': {'status': 'UNKNOWN', 'issues': []}, 'ceph_network': {'status': 'UNKNOWN', 'issues': []} } # IP-to-hostname mapping management_mapping = { '10.10.10.2': 'large1', '10.10.10.10': 'medium1', '10.10.10.4': 'medium2', '10.10.10.8': 'micro1', '10.10.10.9': 'micro2' } ceph_mapping = { '10.10.90.10': 'large1', '10.10.90.4': 'medium1', '10.10.90.3': 'medium2', '10.10.90.2': 'micro1', '10.10.90.6': 'micro2' } def _ping_device(ip: str) -> bool: try: result = subprocess.run(['ping', '-c', '1', ip], stdout=subprocess.PIPE, stderr=subprocess.PIPE) return result.returncode == 0 except Exception as e: print(f"Error pinging {ip}: {e}") return False # Check management network management_ips = list(management_mapping.keys()) for source_ip in management_ips: for target_ip in management_ips: if source_ip != target_ip and not _ping_device(target_ip): source_host = management_mapping[source_ip] target_host = management_mapping[target_ip] issue = f"{source_host} ({source_ip}) cannot reach {target_host} ({target_ip}) in Management Network" network_health['management_network']['issues'].append(issue) # Check Ceph network ceph_ips = list(ceph_mapping.keys()) for source_ip in ceph_ips: for target_ip in ceph_ips: if source_ip != target_ip and not _ping_device(target_ip): source_host = ceph_mapping[source_ip] target_host = ceph_mapping[target_ip] issue = f"{source_host} ({source_ip}) cannot reach {target_host} ({target_ip}) in Ceph Network" network_health['ceph_network']['issues'].append(issue) # Update statuses network_health['management_network']['status'] = 'HEALTHY' if not network_health['management_network']['issues'] else 'ISSUES_DETECTED' network_health['ceph_network']['status'] = 'HEALTHY' if not network_health['ceph_network']['issues'] else 'ISSUES_DETECTED' return network_health def main(): # Initialize the monitor monitor = SystemHealthMonitor( ticket_api_url='http://10.10.10.45/create_ticket_api.php' ) # Run the monitor monitor.run() if __name__ == '__main__': # Require root/sudo for full system access if os.geteuid() != 0: print("This script must be run with sudo/root privileges") sys.exit(1) main()