Updated function scoping

This commit is contained in:
2025-03-06 11:26:55 -05:00
parent e3e0c73630
commit b2cae0b6aa

View File

@@ -76,142 +76,12 @@ class SystemHealthMonitor:
'WD141KRYZ': ['02.01A02']
}
}
def __init__(self,
ticket_api_url: str = 'http://10.10.10.45/create_ticket_api.php',
dry_run: bool = False):
"""
Initialize the system health monitor.
:param ticket_api_url: URL for the ticket creation API.
:param dry_run: If True, simulate API calls without sending requests.
"""
self.ticket_api_url = ticket_api_url
self.dry_run = dry_run
def run(self):
    """
    Execute a single health-check pass.

    The report produced by perform_health_checks() is handed to
    _create_tickets_for_issues(). An exception raised by either step is
    caught and printed rather than propagated to the caller.
    """
    try:
        # Step 1: collect the report
        report = self.perform_health_checks()
        # Step 2: let the ticketing step act on it
        self._create_tickets_for_issues(report)
    except Exception as exc:
        print(f"Unexpected error during health check: {exc}")
def perform_health_checks(self) -> Dict[str, Any]:
"""
Run the drive, memory, CPU and network checks and return their results.

:return: Dict with the keys 'drives_health', 'memory_health',
'cpu_health' and 'network_health', each holding whatever the matching
self._check_* helper returned.
"""
# Dict literal entries are evaluated top to bottom, so the helpers run in
# the order drives, memory, CPU, network.
health_report = {
'drives_health': self._check_drives_health(),
'memory_health': self._check_memory_usage(),
'cpu_health': self._check_cpu_usage(),
'network_health': self._check_network_status()
# NOTE(review): in this view the health_report literal is not closed before
# the SEVERITY_INDICATORS assignment starts; the lines look interleaved from
# two different places in the file -- confirm against the actual source.
# Maps a severity name to an emoji marker; 'UNKNOWN' maps to an empty string.
SEVERITY_INDICATORS = {
'CRITICAL': '🔴',
'WARNING': '🟡',
'HEALTHY': '🟢',
'UNKNOWN': ''
}
# NOTE(review): indentation is not preserved in this view, so how many of the
# logger calls below sit under this `if` cannot be confirmed from here;
# presumably the whole summary through "=== End Summary ===" is dry-run only.
if self.dry_run:
logger.info("\n=== System Health Summary ===")
logger.info(f"Overall Drive Health: {health_report['drives_health']['overall_status']}")
# Summarized drive information with usage
logger.info("\nDrive Status:")
for drive in health_report['drives_health']['drives']:
# A drive entry without 'smart_issues' is treated as having no issues.
issues = drive.get('smart_issues', [])
# Temperature suffix is omitted when the value is missing or falsy (0 included).
temp = f", {drive.get('temperature')}°C" if drive.get('temperature') else ""
# Warning prefix only when at least one issue was reported.
status = "⚠️ " if issues else ""
# Disk usage information
usage_info = ""
if drive.get('partitions'):
# One extra output line is appended per partition.
for partition in drive['partitions']:
usage_info += f"\n └─ {partition['mountpoint']}: {partition['used_space']}/{partition['total_space']} ({partition['usage_percent']}% used)"
logger.info(f"{status}{drive['device']}{temp} - SMART: {drive['smart_status']}{usage_info}")
if issues:
logger.info(f" Issues: {', '.join(issues)}")
logger.info(f"\nMemory: {health_report['memory_health']['memory_percent']}% used")
# NOTE(review): whether the 'ecc_errors' check is nested under the 'has_ecc'
# check is not recoverable from this view -- confirm.
if health_report['memory_health'].get('has_ecc'):
logger.info("ECC Memory: Present")
if health_report['memory_health'].get('ecc_errors'):
# Only the number of ECC error entries is logged, not their content.
logger.info(f"ECC Errors: {len(health_report['memory_health']['ecc_errors'])} found")
logger.info(f"\nCPU Usage: {health_report['cpu_health']['cpu_usage_percent']}%")
logger.info("\nNetwork Status:")
logger.info(f"Management: {health_report['network_health']['management_network']['status']}")
logger.info(f"Ceph: {health_report['network_health']['ceph_network']['status']}")
logger.info("\n=== End Summary ===")
return health_report
def _get_drive_details(self, device: str) -> Dict[str, str]:
    """
    Get detailed drive information using ``smartctl -i``.

    :param device: Device path handed to smartctl.
    :return: Dict with the keys 'model', 'serial', 'capacity', 'firmware'
        and 'type'. A key stays None when smartctl did not report a usable
        value for it or the call failed; 'type' is 'SSD' or 'HDD' once a
        'Rotation Rate' line has been seen.

    The lookup is best-effort: any exception is logged at debug level and
    whatever was collected up to that point is returned.
    """
    drive_details = {
        'model': None,
        'serial': None,
        'capacity': None,
        'firmware': None,
        'type': None,  # SSD or HDD
    }

    def value_after_colon(text: str):
        # Split on the FIRST colon only, so a value that itself contains ':'
        # is kept whole. None signals a label line with no value part.
        _, sep, value = text.partition(':')
        return value.strip() if sep else None

    try:
        result = subprocess.run(
            ['smartctl', '-i', device],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        for line in result.stdout.splitlines():
            # Same label precedence as before: the first matching label wins.
            if 'Device Model' in line:
                field, value = 'model', value_after_colon(line)
            elif 'Serial Number' in line:
                field, value = 'serial', value_after_colon(line)
            elif 'User Capacity' in line:
                # The capacity kept is the text between '[' and ']'.
                _, bracket, tail = line.partition('[')
                field, value = 'capacity', (tail.split(']')[0] if bracket else None)
            elif 'Firmware Version' in line:
                field, value = 'firmware', value_after_colon(line)
            elif 'Rotation Rate' in line:
                field, value = 'type', ('SSD' if 'Solid State Device' in line else 'HDD')
            else:
                continue
            # A malformed line is skipped instead of aborting the scan, so one
            # odd line no longer discards every field that follows it.
            if value is not None:
                drive_details[field] = value
    except Exception as e:
        logger.debug(f"Error getting drive details: {e}")
    return drive_details
def _generate_detailed_description(self, issue: str, health_report: Dict[str, Any]) -> str:
hostname = socket.gethostname()
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
priority = "⚠ HIGH" if "CRITICAL" in issue else "● MEDIUM"
# Calculate maximum width based on content
content_width = max(
len(hostname),
len(timestamp),
len(priority),
len("HARDWARE MONITORING ALERT TICKET")
) + 10 # Add padding
banner = f"""
{'' * content_width}
{' HARDWARE MONITORING ALERT TICKET '.center(content_width)}
{'' * content_width}
┃ Host : {hostname:<{content_width-13}}
┃ Generated : {timestamp:<{content_width-13}}
┃ Priority : {priority:<{content_width-13}}
{'' * content_width}
"""
description = banner + "\n" + "┏━ ISSUE SUMMARY " + "" * 50 + "\n" + issue + "\n\n"
# Add SMART attribute explanations
SMART_DESCRIPTIONS = {
'Reported_Uncorrect': """
Number of errors that could not be recovered using hardware ECC.
@@ -382,13 +252,136 @@ class SystemHealthMonitor:
3. Check workload distribution
"""
}
SEVERITY_INDICATORS = {
'CRITICAL': '🔴',
'WARNING': '🟡',
'HEALTHY': '🟢',
'UNKNOWN': ''
def __init__(self,
ticket_api_url: str = 'http://10.10.10.45/create_ticket_api.php',
dry_run: bool = False):
"""
Initialize the system health monitor.
:param ticket_api_url: URL for the ticket creation API.
:param dry_run: If True, simulate API calls without sending requests.
"""
self.ticket_api_url = ticket_api_url
self.dry_run = dry_run
def run(self):
    """
    Execute a single health-check pass.

    The report produced by perform_health_checks() is handed to
    _create_tickets_for_issues(). An exception raised by either step is
    caught and printed rather than propagated to the caller.
    """
    try:
        # Step 1: collect the report
        report = self.perform_health_checks()
        # Step 2: let the ticketing step act on it
        self._create_tickets_for_issues(report)
    except Exception as exc:
        print(f"Unexpected error during health check: {exc}")
def perform_health_checks(self) -> Dict[str, Any]:
"""
Run the drive, memory, CPU and network checks and return their results.

:return: Dict with the keys 'drives_health', 'memory_health',
'cpu_health' and 'network_health', each holding whatever the matching
self._check_* helper returned.
"""
# Dict literal entries are evaluated top to bottom, so the helpers run in
# the order drives, memory, CPU, network.
health_report = {
'drives_health': self._check_drives_health(),
'memory_health': self._check_memory_usage(),
'cpu_health': self._check_cpu_usage(),
'network_health': self._check_network_status()
}
# NOTE(review): indentation is not preserved in this view, so how many of the
# logger calls below sit under this `if` cannot be confirmed from here;
# presumably the whole summary through "=== End Summary ===" is dry-run only.
if self.dry_run:
logger.info("\n=== System Health Summary ===")
logger.info(f"Overall Drive Health: {health_report['drives_health']['overall_status']}")
# Summarized drive information with usage
logger.info("\nDrive Status:")
for drive in health_report['drives_health']['drives']:
# A drive entry without 'smart_issues' is treated as having no issues.
issues = drive.get('smart_issues', [])
# Temperature suffix is omitted when the value is missing or falsy (0 included).
temp = f", {drive.get('temperature')}°C" if drive.get('temperature') else ""
# Warning prefix only when at least one issue was reported.
status = "⚠️ " if issues else ""
# Disk usage information
usage_info = ""
if drive.get('partitions'):
# One extra output line is appended per partition.
for partition in drive['partitions']:
usage_info += f"\n └─ {partition['mountpoint']}: {partition['used_space']}/{partition['total_space']} ({partition['usage_percent']}% used)"
logger.info(f"{status}{drive['device']}{temp} - SMART: {drive['smart_status']}{usage_info}")
if issues:
logger.info(f" Issues: {', '.join(issues)}")
logger.info(f"\nMemory: {health_report['memory_health']['memory_percent']}% used")
# NOTE(review): whether the 'ecc_errors' check is nested under the 'has_ecc'
# check is not recoverable from this view -- confirm.
if health_report['memory_health'].get('has_ecc'):
logger.info("ECC Memory: Present")
if health_report['memory_health'].get('ecc_errors'):
# Only the number of ECC error entries is logged, not their content.
logger.info(f"ECC Errors: {len(health_report['memory_health']['ecc_errors'])} found")
logger.info(f"\nCPU Usage: {health_report['cpu_health']['cpu_usage_percent']}%")
logger.info("\nNetwork Status:")
logger.info(f"Management: {health_report['network_health']['management_network']['status']}")
logger.info(f"Ceph: {health_report['network_health']['ceph_network']['status']}")
logger.info("\n=== End Summary ===")
return health_report
def _get_drive_details(self, device: str) -> Dict[str, str]:
    """
    Get detailed drive information using ``smartctl -i``.

    :param device: Device path handed to smartctl.
    :return: Dict with the keys 'model', 'serial', 'capacity', 'firmware'
        and 'type'. A key stays None when smartctl did not report a usable
        value for it or the call failed; 'type' is 'SSD' or 'HDD' once a
        'Rotation Rate' line has been seen.

    The lookup is best-effort: any exception is logged at debug level and
    whatever was collected up to that point is returned.
    """
    drive_details = {
        'model': None,
        'serial': None,
        'capacity': None,
        'firmware': None,
        'type': None,  # SSD or HDD
    }

    def value_after_colon(text: str):
        # Split on the FIRST colon only, so a value that itself contains ':'
        # is kept whole. None signals a label line with no value part.
        _, sep, value = text.partition(':')
        return value.strip() if sep else None

    try:
        result = subprocess.run(
            ['smartctl', '-i', device],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        for line in result.stdout.splitlines():
            # Same label precedence as before: the first matching label wins.
            if 'Device Model' in line:
                field, value = 'model', value_after_colon(line)
            elif 'Serial Number' in line:
                field, value = 'serial', value_after_colon(line)
            elif 'User Capacity' in line:
                # The capacity kept is the text between '[' and ']'.
                _, bracket, tail = line.partition('[')
                field, value = 'capacity', (tail.split(']')[0] if bracket else None)
            elif 'Firmware Version' in line:
                field, value = 'firmware', value_after_colon(line)
            elif 'Rotation Rate' in line:
                field, value = 'type', ('SSD' if 'Solid State Device' in line else 'HDD')
            else:
                continue
            # A malformed line is skipped instead of aborting the scan, so one
            # odd line no longer discards every field that follows it.
            if value is not None:
                drive_details[field] = value
    except Exception as e:
        logger.debug(f"Error getting drive details: {e}")
    return drive_details
# Fixed width constant. Nothing inside this span reads it.
# NOTE(review): content_width (used below) is presumably meant to be derived
# from this value -- confirm where content_width is actually assigned.
STANDARD_WIDTH = 80
# Returns a multi-line string: a header line carrying `title`, then `content`
# as given, then a closing line.
# NOTE(review): content_width is neither a parameter nor assigned in this
# span, so the call depends on a name defined elsewhere -- verify it resolves.
# NOTE(review): the repeated strings below are empty ('') in this view, so the
# `'' * n` expressions produce no characters; a rule character may have been
# lost when the text was extracted -- confirm against the source file.
def make_box(title: str, content: str) -> str:
return f"""
┏━ {title} {'' * (content_width - len(title) - 3)}
{content}
{'' * content_width}"""
# Format each section using the consistent width
# NOTE(review): every value here is the `...` placeholder (Ellipsis), not a
# string; the loop below calls content.split('\n') on these values, which an
# Ellipsis object does not support -- looks like unfinished template code.
sections = {
'DRIVE SPECIFICATIONS': ...,
'SMART STATUS': ...,
'PARTITION INFO': ...
}
# Each content line should pad to content_width
# (the format spec below left-justifies each line to content_width-2 characters)
# NOTE(review): `description` is appended to but not assigned in this span --
# confirm which scope is expected to provide it.
for section, content in sections.items():
formatted_content = '\n'.join(f"{line:<{content_width-2}}" for line in content.split('\n'))
description += make_box(section, formatted_content)
def _get_issue_type(self, issue: str) -> str:
if "SMART" in issue:
return "SMART Health Issue"
@@ -409,6 +402,21 @@ class SystemHealthMonitor:
return "🟡 Warning - Action Needed Soon"
return "🟢 Low - Monitor Only"
def _generate_detailed_description(self, issue: str, health_report: Dict[str, Any]) -> str:
hostname = socket.gethostname()
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
priority = "⚠ HIGH" if "CRITICAL" in issue else "● MEDIUM"
content_width = STANDARD_WIDTH - 2
banner = f"""
{'' * content_width}
{' HARDWARE MONITORING ALERT TICKET '.center(content_width)}
{'' * content_width}
┃ Host : {hostname:<{content_width-13}}
┃ Generated : {timestamp:<{content_width-13}}
┃ Priority : {priority:<{content_width-13}}
{'' * content_width}"""
executive_summary = f"""
┏━ EXECUTIVE SUMMARY ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Issue Type │ {self._get_issue_type(issue)}