Better Descriptions

This commit is contained in:
2024-12-05 15:30:16 -05:00
parent a0d14ab659
commit 2dc2b2ae08

View File

@ -1,13 +1,24 @@
#!/usr/bin/env python3
import os
import sys
import json
import requests
import psutil
import socket
import subprocess
import os, sys, json, requests, psutil, socket, subprocess, logging, asyncio
from typing import Dict, Any, List
# Console output: one StreamHandler (default stream is stderr) that lets
# every record from DEBUG upward through, formatted as
# "<timestamp> - <level> - <message>".
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(formatter)

# Module-level logger; the logger itself is also opened up to DEBUG so the
# handler's level is the only filter in effect.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)
class SystemHealthMonitor:
def __init__(self,
ticket_api_url: str = 'http://10.10.10.45/create_ticket_api.php',
@ -49,6 +60,62 @@ class SystemHealthMonitor:
}
return health_report
def _generate_detailed_description(self, issue: str, health_report: Dict[str, Any]) -> str:
"""
Generate a detailed description for the issue based on the health report.
:param issue: The issue description.
:param health_report: The comprehensive health report from the checks.
:return: A detailed description for the issue.
"""
description = issue + "\n\n"
if "Disk" in issue:
for partition in health_report.get('drives_health', {}).get('drives', []):
if partition.get('mountpoint') in issue:
description += f"Disk Device: {partition['device']}\n"
description += f"Mount Point: {partition['mountpoint']}\n"
description += f"Total Space: {partition['total_space']}\n"
description += f"Used Space: {partition['used_space']}\n"
description += f"Free Space: {partition['free_space']}\n"
description += f"Usage Percent: {partition['usage_percent']}%\n"
if partition.get('smart_status') == 'UNHEALTHY':
try:
# Get additional disk information using smartctl
result = subprocess.run(
['smartctl', '-a', partition['device']],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
output = result.stdout + result.stderr
description += "\nSMART Information:\n"
description += output
except Exception as e:
description += f"Error getting SMART information: {str(e)}\n"
break
elif "Memory" in issue:
memory_health = health_report.get('memory_health', {})
description += f"Total Memory: {memory_health['total_memory']}\n"
description += f"Used Memory: {memory_health['used_memory']}\n"
description += f"Memory Usage Percent: {memory_health['memory_percent']}%\n"
elif "CPU" in issue:
cpu_health = health_report.get('cpu_health', {})
description += f"CPU Usage Percent: {cpu_health['cpu_usage_percent']}%\n"
elif "Network" in issue:
network_health = health_report.get('network_health', {})
for network in ['management_network', 'ceph_network']:
if network_health[network]['issues']:
description += f"{network.replace('_', ' ').title()} Issues:\n"
description += "\n".join(network_health[network]['issues'])
description += "\n"
return description
def _create_tickets_for_issues(self, health_report: Dict[str, Any]):
"""
Create tickets for detected issues with dynamic parameters based on severity.
@ -92,9 +159,12 @@ class SystemHealthMonitor:
# Create the ticket title with relevant details
ticket_title = f"[{hostname}]{action_type}[{issue_type}] {issue} {scope}{environment}{ticket_type}"
# Create a detailed description for the ticket
description = self._generate_detailed_description(issue, health_report)
ticket_payload = {
"title": ticket_title,
"description": issue,
"description": description,
"priority": priority,
"status": "Open",
"category": category,
@ -168,14 +238,13 @@ class SystemHealthMonitor:
def _check_drives_health(self) -> Dict[str, Any]:
"""
Check overall health of drives including disk usage and SMART status.
:return: Combined health report of all drives and their status.
"""
drives_health = {'overall_status': 'NORMAL', 'drives': []}
try:
partitions = psutil.disk_partitions()
overall_status = 'NORMAL'
for partition in partitions:
drive_report = {
'device': partition.device,
@ -184,26 +253,23 @@ class SystemHealthMonitor:
try:
# Check disk usage
usage = psutil.disk_usage(partition.mountpoint)
usage_status = 'NORMAL'
disk_usage_status = 'NORMAL'
if usage.percent > 90:
usage_status = 'CRITICAL_HIGH_USAGE'
disk_usage_status = 'CRITICAL_HIGH_USAGE'
elif usage.percent > 80:
usage_status = 'WARNING_HIGH_USAGE'
disk_usage_status = 'WARNING_HIGH_USAGE'
drive_report.update({
'total_space': self._convert_bytes(usage.total),
'used_space': self._convert_bytes(usage.used),
'free_space': self._convert_bytes(usage.free),
'usage_percent': usage.percent,
'usage_status': usage_status
'disk_usage_status': disk_usage_status
})
# Update overall status based on usage
if usage_status == 'CRITICAL_HIGH_USAGE':
if disk_usage_status == 'CRITICAL_HIGH_USAGE':
overall_status = 'CRITICAL_HIGH_USAGE'
elif usage_status == 'WARNING_HIGH_USAGE' and overall_status != 'CRITICAL_HIGH_USAGE':
elif disk_usage_status == 'WARNING_HIGH_USAGE' and overall_status != 'CRITICAL_HIGH_USAGE':
overall_status = 'WARNING_HIGH_USAGE'
# Check SMART status of the drive
try:
result = subprocess.run(
@ -213,33 +279,26 @@ class SystemHealthMonitor:
text=True
)
output = result.stdout + result.stderr
smart_status = 'HEALTHY' if 'PASSED' in output else 'UNHEALTHY'
drive_report['smart_status'] = smart_status
drive_smart_status = 'HEALTHY' if 'PASSED' in output else 'UNHEALTHY'
drive_report['drive_smart_status'] = drive_smart_status
# Update overall status if SMART status is unhealthy
if smart_status == 'UNHEALTHY' and overall_status != 'CRITICAL_HIGH_USAGE':
if drive_smart_status == 'UNHEALTHY' and overall_status != 'CRITICAL_HIGH_USAGE':
overall_status = 'UNHEALTHY'
except Exception as e:
print(f"Error checking SMART status for {partition.device}: {str(e)}")
drive_report['smart_status'] = 'ERROR'
drive_report['drive_smart_status'] = 'ERROR'
except Exception as e:
drive_report['error'] = f"Could not check drive: {str(e)}"
logger.error(f"Could not check drive: {str(e)}")
drive_report['error'] = str(e)
drives_health['drives'].append(drive_report)
drives_health['overall_status'] = overall_status
return drives_health
except Exception as e:
print(f"Drive health check failed: {e}")
return {'error': str(e)}
def _convert_bytes(self, bytes_value: int, suffix: str = 'B') -> str:
def _convert_bytes(bytes_value: int, suffix: str = 'B') -> str:
"""
Convert bytes to a human-readable format.
:param bytes_value: Number of bytes to convert.
:param suffix: Suffix to append (default is 'B' for bytes).
:return: Formatted string with the size in human-readable form.
@ -261,7 +320,7 @@ class SystemHealthMonitor:
'total_memory': self._convert_bytes(memory_info.total),
'used_memory': self._convert_bytes(memory_info.used),
'memory_percent': memory_info.percent,
'status': 'OK' if memory_info.percent < 80 else 'WARNING'
'status': 'OK' if memory_info.percent < 90 else 'WARNING'
}
return memory_health
@ -274,31 +333,33 @@ class SystemHealthMonitor:
cpu_usage_percent = psutil.cpu_percent(interval=1)
cpu_health = {
'cpu_usage_percent': cpu_usage_percent,
'status': 'OK' if cpu_usage_percent < 80 else 'WARNING'
'status': 'OK' if cpu_usage_percent < 90 else 'WARNING'
}
return cpu_health
def _check_network_status(self) -> Dict[str, Any]:
async def _check_network_status(self) -> Dict[str, Any]:
"""
Check the status of network interfaces and report any issues.
:return: Dictionary containing network health metrics and any issues found.
"""
network_health = {
'management_network': {'issues': []},
'ceph_network': {'issues': []}
}
try:
# Check management network connectivity
management_check = os.system("ping -c 1 10.10.10.1")
if management_check != 0:
proc = await asyncio.create_subprocess_shell("ping -c 1 10.10.10.1", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
await proc.wait()
if proc.returncode != 0:
network_health['management_network']['issues'].append(
"Management network is unreachable."
)
# Check Ceph network connectivity
ceph_check = os.system("ping -c 1 10.10.90.1")
if ceph_check != 0:
proc = await asyncio.create_subprocess_shell("ping -c 1 10.10.90.1", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
await proc.wait()
if proc.returncode != 0:
network_health['ceph_network']['issues'].append(
"Ceph network is unreachable."
)
@ -309,6 +370,29 @@ class SystemHealthMonitor:
print(f"Network health check failed: {e}")
return {'error': str(e)}
if __name__ == '__main__':
monitor = SystemHealthMonitor()
monitor.run()
network_health = asyncio.run(_check_network_status())
def main():
    """
    Script entry point.

    Builds a SystemHealthMonitor with the default ticket API URL and state
    file path, then runs it. Exits with status 0 when interrupted from the
    keyboard and with status 1 on any other exception.
    """
    # Defaults are hard-coded here; nothing is read from argv or a config file.
    monitor_settings = {
        'ticket_api_url': "http://10.10.10.45/create_ticket_api.php",
        'state_file': "/tmp/last_health_check.json",
    }
    try:
        # Construct the monitor and run the health checks in one go.
        SystemHealthMonitor(**monitor_settings).run()
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop the script, hence exit status 0.
        print("Interrupted by user. Exiting...")
        sys.exit(0)
    except Exception as e:
        # Top-level boundary: report the failure and signal it to the caller.
        print(f"An unexpected error occurred: {e}")
        sys.exit(1)
if __name__ == "__main__":
main()