#!/usr/bin/env python3
import os
import sys
import json
import socket
import subprocess
import logging
import argparse
import urllib.request
import re
import glob
import datetime
from typing import Dict, Any, List

import psutil
import requests
# Create a logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Create a console handler and set its level to DEBUG
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
# Create a formatter
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# Add the formatter to the console handler
console_handler.setFormatter(formatter)
# Add the console handler to the logger
logger.addHandler(console_handler)
class SystemHealthMonitor:
PRIORITIES = {
'CRITICAL': '1',
'HIGH': '2',
'MEDIUM': '3',
'LOW': '4'
}
ISSUE_PRIORITIES = {
'SMART_FAILURE': PRIORITIES['HIGH'],
'DISK_CRITICAL': PRIORITIES['HIGH'],
'DISK_WARNING': PRIORITIES['MEDIUM'],
'UNCORRECTABLE_ECC': PRIORITIES['HIGH'],
'CORRECTABLE_ECC': PRIORITIES['MEDIUM'],
'CPU_HIGH': PRIORITIES['MEDIUM'],
'NETWORK_FAILURE': PRIORITIES['HIGH']
}
CONFIG = {
'TICKET_API_URL': 'http://10.10.10.45/create_ticket_api.php',
'THRESHOLDS': {
'DISK_CRITICAL': 90,
'DISK_WARNING': 80,
'CPU_WARNING': 80,
'TEMPERATURE_WARNING': 65
},
'NETWORKS': {
'MANAGEMENT': '10.10.10.1',
'CEPH': '10.10.90.1',
'PING_TIMEOUT': 1, # seconds
'PING_COUNT': 1
}
}
TICKET_TEMPLATES = {
'ACTION_TYPE': '[auto]',
'ENVIRONMENT': '[production]',
'TICKET_TYPE': '[maintenance]',
'HARDWARE_TYPE': '[hardware]',
'NETWORK_TYPE': '[network]',
'SCOPE_SINGLE': '[single-node]',
'SCOPE_CLUSTER': '[cluster-wide]',
'DEFAULT_CATEGORY': 'Hardware',
'DEFAULT_ISSUE_TYPE': 'Problem'
}
PROBLEMATIC_FIRMWARE = {
'Samsung': {
'EVO860': ['RVT01B6Q', 'RVT02B6Q'], # Known issues with sudden performance drops
'EVO870': ['SVT01B6Q'],
'PM883': ['HXT7404Q'] # Known issues with TRIM
},
'Seagate': {
'ST8000NM': ['CC64'], # Known issues with NCQ
'ST12000NM': ['SN02']
},
'WDC': {
'WD121KRYZ': ['01.01A01'], # RAID rebuild issues
'WD141KRYZ': ['02.01A02']
}
}
def __init__(self,
ticket_api_url: str = 'http://10.10.10.45/create_ticket_api.php',
dry_run: bool = False):
"""
Initialize the system health monitor.
:param ticket_api_url: URL for the ticket creation API.
:param dry_run: If True, simulate API calls without sending requests.
"""
self.ticket_api_url = ticket_api_url
self.dry_run = dry_run
def run(self):
"""
Perform a one-shot health check of the system.
"""
try:
# Perform health checks and gather the report
health_report = self.perform_health_checks()
# Create tickets for any detected critical issues
self._create_tickets_for_issues(health_report)
except Exception as e:
print(f"Unexpected error during health check: {e}")
def perform_health_checks(self) -> Dict[str, Any]:
"""
Perform comprehensive system health checks and return a report.
"""
health_report = {
'drives_health': self._check_drives_health(),
'memory_health': self._check_memory_usage(),
'cpu_health': self._check_cpu_usage(),
'network_health': self._check_network_status()
}
if self.dry_run:
logger.info("=== Detailed Health Check Results ===")
logger.info(f"Drive Health Status: {health_report['drives_health']['overall_status']}")
for drive in health_report['drives_health']['drives']:
logger.info(f"Drive {drive['mountpoint']}: {drive['usage_percent']}% used, SMART: {drive['smart_status']}")
logger.info(f"Memory Status: {health_report['memory_health']['status']}")
logger.info(f"Memory Usage: {health_report['memory_health']['memory_percent']}%")
logger.info(f"ECC Memory: {'Present' if health_report['memory_health']['has_ecc'] else 'Not Present'}")
if health_report['memory_health']['has_ecc'] and health_report['memory_health']['ecc_errors']:
logger.info(f"ECC Errors: {health_report['memory_health']['ecc_errors']}")
logger.info(f"CPU Usage: {health_report['cpu_health']['cpu_usage_percent']}% ({health_report['cpu_health']['status']})")
logger.info(f"Network Management Status: {health_report['network_health']['management_network']['status']}")
logger.info(f"Network Ceph Status: {health_report['network_health']['ceph_network']['status']}")
logger.info("================================")
return health_report
def _generate_detailed_description(self, issue: str, health_report: Dict[str, Any]) -> str:
# Keep existing banner and initial description
banner = """
=================================================================
AUTOMATED TICKET - Generated by Hardware Monitoring Service (hwmonDaemon)
Host: {}
Generated: {}
=================================================================
""".format(socket.gethostname(), datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
description = banner + issue + "\n\n"
# Add issue explanation section
description += "=== Issue Details ===\n"
# Add SMART attribute explanations
SMART_DESCRIPTIONS = {
'Reallocated_Sector_Ct': """
Number of sectors that have been reallocated due to errors.
- High counts indicate degrading media
- Each reallocation uses one of the drive's limited spare sectors
- Rapid increases suggest accelerating drive wear
""",
'Current_Pending_Sector': """
Sectors waiting to be reallocated due to read/write errors.
- Indicates potentially unstable sectors
- May result in data loss if unrecoverable
- Should be monitored for increases
""",
'Offline_Uncorrectable': """
Count of uncorrectable errors detected during offline data collection.
- Direct indicator of media reliability issues
- May affect data integrity
- High values suggest drive replacement needed
""",
'Reported_Uncorrect': """
Number of errors that could not be recovered using hardware ECC.
- Critical indicator of drive health
- Directly impacts data reliability
- Any non-zero value requires attention
""",
'Spin_Retry_Count': """
Number of spin start retry attempts.
- Indicates potential motor or bearing issues
- May predict imminent mechanical failure
- Increasing values suggest degrading drive health
""",
'Power_On_Hours': """
Total number of hours the device has been powered on.
- Normal aging metric
- Used to gauge overall drive lifetime
- Compare against manufacturer's MTBF rating
""",
'Media_Wearout_Indicator': """
Percentage of drive's rated life remaining (SSDs).
- 100 indicates new drive
- 0 indicates exceeded rated writes
- Critical for SSD lifecycle management
""",
'Temperature_Celsius': """
Current drive temperature.
- High temperatures accelerate wear
- Optimal range: 20-45°C
- Sustained high temps reduce lifespan
""",
'Available_Spare': """
Percentage of spare blocks remaining (SSDs).
- Critical for SSD endurance
- Low values indicate approaching end-of-life
- Rapid decreases suggest excessive writes
""",
'Program_Fail_Count': """
Number of flash program operation failures.
- Indicates NAND cell reliability
- Important for SSD health assessment
- Increasing values suggest flash degradation
""",
'Erase_Fail_Count': """
Number of flash erase operation failures.
- Related to NAND block health
- Critical for SSD reliability
- High counts suggest failing flash blocks
"""
}
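        # Sketch (assumption): surface the attribute explanations defined above for any SMART
        # attribute named in the issue text, so the SMART_DESCRIPTIONS table is actually used
        # in the generated ticket description.
        for attr, explanation in SMART_DESCRIPTIONS.items():
            if attr in issue:
                description += f"\n--- {attr} ---{explanation}"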
if "SMART" in issue:
description += """
SMART (Self-Monitoring, Analysis, and Reporting Technology) issues indicate potential drive reliability problems.
- Reallocated sectors indicate bad blocks that have been remapped
- Pending sectors are potentially failing blocks waiting to be remapped
- Uncorrectable errors indicate data that could not be read
"""
if "Temperature" in issue:
description += """
High drive temperatures can:
- Reduce drive lifespan
- Cause performance degradation
- Lead to data corruption in extreme cases
Optimal temperature range: 20-45°C
"""
if "ECC" in issue:
description += """
ECC (Error Correction Code) Memory Issues:
- Correctable: Memory errors that were successfully fixed
- Uncorrectable: Serious memory errors that could not be corrected
Frequent ECC corrections may indicate degrading memory modules
"""
if "CPU" in issue:
description += """
High CPU usage sustained over time can indicate:
- Resource constraints
- Runaway processes
- Need for performance optimization
- Potential cooling issues
"""
if "Network" in issue:
description += """
Network connectivity issues can impact:
- Cluster communication
- Data replication
- Service availability
- Management access
"""
# Keep existing detailed metrics section
if "Disk" in issue:
for partition in health_report.get('drives_health', {}).get('drives', []):
if partition.get('mountpoint') in issue:
description += f"\n=== Disk Metrics ===\n"
description += f"Disk Device: {partition['device']}\n"
description += f"Mount Point: {partition['mountpoint']}\n"
description += f"Total Space: {partition['total_space']}\n"
description += f"Used Space: {partition['used_space']}\n"
description += f"Free Space: {partition['free_space']}\n"
description += f"Usage Percent: {partition['usage_percent']}%\n"
return description
def _create_tickets_for_issues(self, health_report: Dict[str, Any]):
issues = self._detect_issues(health_report)
if not issues:
logger.info("No issues detected.")
return
hostname = socket.gethostname()
action_type = self.TICKET_TEMPLATES['ACTION_TYPE']
environment = self.TICKET_TEMPLATES['ENVIRONMENT']
ticket_type = self.TICKET_TEMPLATES['TICKET_TYPE']
hardware_type = self.TICKET_TEMPLATES['HARDWARE_TYPE']
for issue in issues:
priority = self.PRIORITIES['MEDIUM']
category = self.TICKET_TEMPLATES['DEFAULT_CATEGORY']
issue_type = self.TICKET_TEMPLATES['DEFAULT_ISSUE_TYPE']
scope = self.TICKET_TEMPLATES['SCOPE_SINGLE']
            # Refine the default priority based on the issue text (the original assignment
            # logic is not included in this excerpt; a sketch follows).
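            # Sketch (assumption): map keywords in the issue text onto the class-level
            # ISSUE_PRIORITIES table; adjust the keywords if the issue wording changes.
            if 'SMART' in issue:
                priority = self.ISSUE_PRIORITIES['SMART_FAILURE']
            elif 'Uncorrectable ECC' in issue:
                priority = self.ISSUE_PRIORITIES['UNCORRECTABLE_ECC']
            elif 'Correctable ECC' in issue:
                priority = self.ISSUE_PRIORITIES['CORRECTABLE_ECC']
            elif 'usage is critical' in issue:
                priority = self.ISSUE_PRIORITIES['DISK_CRITICAL']
            elif 'usage is high' in issue:
                priority = self.ISSUE_PRIORITIES['DISK_WARNING']
            elif 'CPU usage' in issue:
                priority = self.ISSUE_PRIORITIES['CPU_HIGH']
            elif 'network is unreachable' in issue:
                priority = self.ISSUE_PRIORITIES['NETWORK_FAILURE']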
ticket_title = f"[{hostname}]{action_type}{hardware_type} {issue} {scope}{environment}{ticket_type}"
description = self._generate_detailed_description(issue, health_report)
ticket_payload = {
"title": ticket_title,
"description": description,
"priority": priority,
"status": "Open",
"category": category,
"type": issue_type
}
if self.dry_run:
logger.info("Dry-run mode enabled. Simulating ticket creation:")
logger.info(json.dumps(ticket_payload, indent=4))
else:
try:
                    response = requests.post(
                        self.ticket_api_url,
                        json=ticket_payload,
                        headers={'Content-Type': 'application/json'},
                        timeout=30  # avoid hanging the daemon if the ticket API is unresponsive
                    )
response_data = response.json()
if response_data.get('success'):
logger.info(f"Ticket created successfully: {ticket_title}")
logger.info(f"Ticket ID: {response_data.get('ticket_id')}")
elif response_data.get('error') == 'Duplicate ticket':
logger.info(f"Duplicate ticket detected - existing ticket ID: {response_data.get('existing_ticket_id')}")
continue
else:
logger.error(f"Failed to create ticket: {response_data.get('error')}")
except Exception as e:
logger.error(f"Error creating ticket: {e}")
def _detect_issues(self, health_report: Dict[str, Any]) -> List[str]:
"""
Detect issues in the health report including non-critical issues.
:param health_report: The comprehensive health report from the checks.
:return: List of issue descriptions detected during checks.
"""
issues = []
# Check for drive-related issues
for drive in health_report.get('drives_health', {}).get('drives', []):
if drive.get('smart_issues'):
issues.append(f"Drive {drive['device']} has SMART issues: {', '.join(drive['smart_issues'])}")
if drive.get('temperature') and drive['temperature'] > self.CONFIG['THRESHOLDS']['TEMPERATURE_WARNING']:
issues.append(f"Drive {drive['device']} temperature is high: {drive['temperature']}°C")
# Check for ECC memory errors
memory_health = health_report.get('memory_health', {})
if memory_health.get('has_ecc') and memory_health.get('ecc_errors'):
issues.extend(memory_health['ecc_errors'])
# Check for CPU-related issues
cpu_health = health_report.get('cpu_health', {})
if cpu_health and cpu_health.get('cpu_usage_percent', 0) > self.CONFIG['THRESHOLDS']['CPU_WARNING']:
issues.append("CPU usage is above threshold")
# Check for network-related issues
network_health = health_report.get('network_health', {})
for network in ['management_network', 'ceph_network']:
if network_health.get(network, {}).get('issues'):
issues.extend(network_health[network]['issues'])
logger.debug("=== Issue Detection Started ===")
logger.debug(f"Checking drives: {len(health_report['drives_health']['drives'])} found")
logger.debug(f"Memory status: {health_report['memory_health']['status']}")
logger.debug(f"CPU status: {health_report['cpu_health']['status']}")
logger.debug(f"Network status: {health_report['network_health']}")
logger.debug(f"Detected issues: {issues}")
logger.debug("=== Issue Detection Completed ===\n")
return issues
def _get_all_disks(self) -> List[str]:
"""
Get all physical disks using multiple detection methods.
"""
disks = set()
# Method 1: Use lsblk to get physical disks
try:
result = subprocess.run(
['lsblk', '-d', '-n', '-o', 'NAME'],
stdout=subprocess.PIPE,
text=True
)
disks.update(f"/dev/{disk}" for disk in result.stdout.strip().split('\n'))
logger.debug(f"Disks found via lsblk: {disks}")
except Exception as e:
logger.debug(f"lsblk detection failed: {e}")
# Method 2: Direct device scanning
for pattern in ['/dev/sd*', '/dev/nvme*n*']:
try:
matches = glob.glob(pattern)
                # Keep whole disks only: skip /dev/sdX1-style and /dev/nvme0n1p1-style partitions
                # (the previous last-character check wrongly excluded all NVMe namespaces).
                disks.update(d for d in matches
                             if not re.search(r'(?:sd[a-z]+\d+|nvme\d+n\d+p\d+)$', d))
logger.debug(f"Disks found via glob {pattern}: {matches}")
except Exception as e:
logger.debug(f"Glob detection failed for {pattern}: {e}")
return list(disks)
def _is_physical_disk(self, device_path):
"""
Check if the device is a physical disk, excluding logical volumes and special devices.
:param device_path: Path to the device
:return: Boolean indicating if it's a relevant physical disk
"""
logger.debug(f"Checking device: {device_path}")
# Exclude known non-physical or special devices
excluded_patterns = [
r'/dev/mapper/', # LVM devices
r'/dev/dm-', # Device mapper devices
r'/dev/loop', # Loop devices
r'/dev/rbd', # Ceph RBD devices
r'/boot', # Boot partitions
r'/boot/efi' # EFI partitions
]
if any(re.search(pattern, device_path) for pattern in excluded_patterns):
logger.debug(f"Device {device_path} excluded due to pattern match")
return False
# Match physical devices
physical_patterns = [
r'/dev/sd[a-z]+$', # SATA/SAS drives
r'/dev/nvme\d+n\d+$', # NVMe drives
r'/dev/mmcblk\d+$', # MMC/SD cards
r'/dev/hd[a-z]+$' # IDE drives (legacy)
]
is_physical = any(re.match(pattern, device_path) for pattern in physical_patterns)
logger.debug(f"Device {device_path} physical disk check result: {is_physical}")
return is_physical
def _check_disk_firmware(self, device: str) -> Dict[str, Any]:
"""
Check disk firmware version against known problematic versions.
"""
firmware_info = {
'version': None,
'model': None,
'manufacturer': None,
'is_problematic': False,
'known_issues': []
}
try:
result = subprocess.run(
['smartctl', '-i', device],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
for line in result.stdout.split('\n'):
if 'Firmware Version:' in line:
firmware_info['version'] = line.split(':')[1].strip()
elif 'Model Family:' in line:
firmware_info['model'] = line.split(':')[1].strip()
elif 'Device Model:' in line:
if not firmware_info['model']:
firmware_info['model'] = line.split(':')[1].strip()
# Determine manufacturer
for manufacturer in self.PROBLEMATIC_FIRMWARE.keys():
                    if firmware_info['model'] and manufacturer in firmware_info['model']:
firmware_info['manufacturer'] = manufacturer
break
# Check against known problematic versions
if firmware_info['manufacturer'] and firmware_info['model']:
for model, versions in self.PROBLEMATIC_FIRMWARE[firmware_info['manufacturer']].items():
if model in firmware_info['model'] and firmware_info['version'] in versions:
firmware_info['is_problematic'] = True
firmware_info['known_issues'].append(
f"Known problematic firmware version {firmware_info['version']} "
f"for {firmware_info['model']}"
)
logger.debug(f"=== Firmware Check for {device} ===")
logger.debug(f"Firmware version: {firmware_info['version']}")
logger.debug(f"Model: {firmware_info['model']}")
logger.debug(f"Manufacturer: {firmware_info['manufacturer']}")
logger.debug(f"Known issues: {firmware_info['known_issues']}")
logger.debug("=== End Firmware Check ===\n")
except Exception as e:
firmware_info['known_issues'].append(f"Error checking firmware: {str(e)}")
return firmware_info
def _check_smart_health(self, device: str) -> Dict[str, Any]:
"""
Enhanced SMART health check with detailed failure thresholds.
"""
smart_health = {
'status': 'HEALTHY',
'severity': 'NORMAL',
'issues': [],
'temp': None,
'attributes': {}
}
# Define critical SMART attributes and their thresholds
SMART_THRESHOLDS = {
'Reallocated_Sector_Ct': {'warning': 5, 'critical': 10},
'Current_Pending_Sector': {'warning': 1, 'critical': 5},
'Offline_Uncorrectable': {'warning': 1, 'critical': 2},
'Reported_Uncorrect': {'warning': 1, 'critical': 2},
'Spin_Retry_Count': {'warning': 1, 'critical': 5},
# 'Command_Timeout': {'warning': 5, 'critical': 10}, # Removed
'Power_Cycle_Count': {'warning': 5000, 'critical': 10000},
'Power_On_Hours': {'warning': 61320, 'critical': 70080}, # ~7-8 years
'Media_Wearout_Indicator': {'warning': 30, 'critical': 10},
'Temperature_Celsius': {'warning': 65, 'critical': 75},
'Host_Writes_32MiB': {'warning': 50000000, 'critical': 100000000},
'Wear_Leveling_Count': {'warning': 50, 'critical': 20},
'Available_Spare': {'warning': 30, 'critical': 10},
'Program_Fail_Count': {'warning': 10, 'critical': 20},
'Erase_Fail_Count': {'warning': 10, 'critical': 20},
# 'Raw_Read_Error_Rate': {'warning': 50, 'critical': 100}, # Removed
# 'Seek_Error_Rate': {'warning': 50, 'critical': 100}, # Removed
'Load_Cycle_Count': {'warning': 300000, 'critical': 600000},
'SSD_Life_Left': {'warning': 30, 'critical': 10}
}
try:
# Get firmware information
firmware_info = self._check_disk_firmware(device)
if firmware_info['is_problematic']:
smart_health['severity'] = 'WARNING'
smart_health['issues'].extend(firmware_info['known_issues'])
# Get detailed SMART data including performance metrics
result = subprocess.run(
['smartctl', '-A', '-H', '-l', 'error', '-l', 'background', device],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
output = result.stdout
# Check overall health status
if 'FAILED' in output and 'PASSED' not in output:
smart_health['status'] = 'UNHEALTHY'
smart_health['severity'] = 'CRITICAL'
smart_health['issues'].append("SMART overall health check failed")
# Parse SMART attributes with thresholds
for line in output.split('\n'):
# Inside the try block where SMART attributes are parsed:
if 'Reported_Uncorrect' in line:
parts = line.split()
raw_value = int(parts[9])
logger.debug(f"Found Reported_Uncorrect value: {raw_value}")
smart_health['attributes']['Reported_Uncorrect'] = raw_value
if raw_value >= SMART_THRESHOLDS['Reported_Uncorrect']['critical']:
smart_health['status'] = 'UNHEALTHY'
smart_health['severity'] = 'CRITICAL'
smart_health['issues'].append(f"Critical uncorrectable errors: {raw_value}")
elif raw_value >= SMART_THRESHOLDS['Reported_Uncorrect']['warning']:
if smart_health['severity'] != 'CRITICAL':
smart_health['severity'] = 'WARNING'
smart_health['issues'].append(f"Warning: uncorrectable errors detected: {raw_value}")
                for attr, thresholds in SMART_THRESHOLDS.items():
                    if attr == 'Reported_Uncorrect':
                        continue  # already handled by the explicit check above; avoids duplicate issues
                    if attr in line:
parts = line.split()
if len(parts) >= 10:
raw_value = int(parts[9])
smart_health['attributes'][attr] = raw_value
if attr == 'Temperature_Celsius':
smart_health['temp'] = raw_value
if raw_value >= thresholds['critical']:
smart_health['severity'] = 'CRITICAL'
smart_health['issues'].append(f"Critical temperature: {raw_value}°C")
                                elif raw_value >= thresholds['warning']:
                                    if smart_health['severity'] != 'CRITICAL':
                                        smart_health['severity'] = 'WARNING'
smart_health['issues'].append(f"High temperature: {raw_value}°C")
                            else:
                                # Remaining-life attributes count down, so lower is worse for them;
                                # raw-value semantics vary by vendor, so treat this as a heuristic.
                                lower_is_worse = attr in ('Media_Wearout_Indicator', 'Available_Spare',
                                                          'Wear_Leveling_Count', 'SSD_Life_Left')
                                critical_hit = (raw_value <= thresholds['critical'] if lower_is_worse
                                                else raw_value >= thresholds['critical'])
                                warning_hit = (raw_value <= thresholds['warning'] if lower_is_worse
                                               else raw_value >= thresholds['warning'])
                                if critical_hit:
                                    smart_health['severity'] = 'CRITICAL'
                                    smart_health['issues'].append(f"Critical {attr}: {raw_value}")
                                elif warning_hit:
                                    if smart_health['severity'] != 'CRITICAL':
                                        smart_health['severity'] = 'WARNING'
                                    smart_health['issues'].append(f"Warning {attr}: {raw_value}")
# Check for recent SMART errors
error_log_pattern = r"Error \d+ occurred at disk power-on lifetime: (\d+) hours"
error_matches = re.finditer(error_log_pattern, output)
recent_errors = []
for match in error_matches:
error_hour = int(match.group(1))
current_hours = smart_health['attributes'].get('Power_On_Hours', 0)
if current_hours - error_hour < 168: # Errors within last week
recent_errors.append(match.group(0))
if recent_errors:
smart_health['severity'] = 'WARNING'
smart_health['issues'].extend(recent_errors)
smart_health['performance_metrics'] = {
'read_speed': None,
'write_speed': None,
'access_time': None
}
# Quick performance test
try:
perf_result = subprocess.run(
['hdparm', '-Tt', device],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
                for line in perf_result.stdout.split('\n'):
                    # hdparm reports e.g. "... = 190.75 MB/sec"; take the figure after '='
                    if 'buffered disk reads' in line and '=' in line:
                        smart_health['performance_metrics']['read_speed'] = float(line.split('=')[1].split()[0])
                    elif 'cached reads' in line and '=' in line:
                        smart_health['performance_metrics']['cached_speed'] = float(line.split('=')[1].split()[0])
            except Exception:
                pass  # Skip performance metrics if hdparm is unavailable or fails
logger.debug(f"=== SMART Health Check for {device} ===")
logger.debug("Raw SMART attributes:")
for attr, value in smart_health['attributes'].items():
logger.debug(f"{attr}: {value}")
logger.debug(f"Temperature: {smart_health['temp']}°C")
logger.debug(f"Detected Issues: {smart_health['issues']}")
logger.debug("=== End SMART Check ===\n")
except Exception as e:
smart_health['status'] = 'ERROR'
smart_health['severity'] = 'UNKNOWN'
smart_health['issues'].append(f"Error checking SMART: {str(e)}")
return smart_health
def _check_drives_health(self) -> Dict[str, Any]:
"""
Check overall health of physical SATA and NVMe drives including disk usage and SMART status.
"""
drives_health = {'overall_status': 'NORMAL', 'drives': []}
try:
            # Get physical disks only (exclude RBD, LVM, loop, and other virtual devices)
            physical_disks = [disk for disk in self._get_all_disks()
                              if self._is_physical_disk(disk)]
logger.debug(f"Checking physical disks: {physical_disks}")
overall_status = 'NORMAL'
for disk in physical_disks:
drive_report = {
'device': disk,
'mountpoint': None,
'usage_status': 'UNMOUNTED',
'usage_percent': 0,
'total_space': '0B',
'used_space': '0B',
'free_space': '0B',
'smart_status': 'UNKNOWN'
}
# Check SMART health first
smart_health = self._check_smart_health(disk)
drive_report.update({
'smart_status': smart_health['status'],
'smart_issues': smart_health['issues'],
'temperature': smart_health['temp'],
'smart_attributes': smart_health['attributes']
})
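                # Sketch (assumption): populate the usage fields that the report template and
                # dry-run output expect, using psutil; matching partitions to their parent disk
                # by device-name prefix is a simplification.
                for partition in psutil.disk_partitions(all=False):
                    if not partition.device.startswith(disk):
                        continue
                    try:
                        usage = psutil.disk_usage(partition.mountpoint)
                    except OSError:
                        continue
                    drive_report.update({
                        'mountpoint': partition.mountpoint,
                        'usage_percent': usage.percent,
                        'total_space': self._convert_bytes(usage.total),
                        'used_space': self._convert_bytes(usage.used),
                        'free_space': self._convert_bytes(usage.free)
                    })
                    if usage.percent >= self.CONFIG['THRESHOLDS']['DISK_CRITICAL']:
                        drive_report['usage_status'] = 'CRITICAL'
                        overall_status = 'CRITICAL'
                    elif usage.percent >= self.CONFIG['THRESHOLDS']['DISK_WARNING']:
                        drive_report['usage_status'] = 'WARNING'
                        if overall_status != 'CRITICAL':
                            overall_status = 'WARNING'
                    else:
                        drive_report['usage_status'] = 'OK'
                    break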
# Update overall status based on SMART health
if smart_health['status'] == 'UNHEALTHY':
overall_status = 'CRITICAL'
elif smart_health['issues'] and overall_status != 'CRITICAL':
overall_status = 'WARNING'
drives_health['drives'].append(drive_report)
drives_health['overall_status'] = overall_status
except Exception as e:
logger.error(f"Error checking drives health: {str(e)}")
return drives_health
@staticmethod
def _convert_bytes(bytes_value: int, suffix: str = 'B') -> str:
"""
Convert bytes to a human-readable format.
:param bytes_value: Number of bytes to convert.
:param suffix: Suffix to append (default is 'B' for bytes).
:return: Formatted string with the size in human-readable form.
"""
for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
if abs(bytes_value) < 1024.0:
return f"{bytes_value:.1f}{unit}{suffix}"
bytes_value /= 1024.0
return f"{bytes_value:.1f}Y{suffix}"
def _check_memory_usage(self) -> Dict[str, Any]:
"""
        Check memory usage and, if ECC memory is present, scan for ECC errors.
"""
memory_health = {
'has_ecc': False,
'ecc_errors': [],
'status': 'OK',
'total_memory': self._convert_bytes(psutil.virtual_memory().total),
'used_memory': self._convert_bytes(psutil.virtual_memory().used),
'memory_percent': psutil.virtual_memory().percent
}
try:
# First check using dmidecode
result = subprocess.run(
['dmidecode', '--type', 'memory'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
            # dmidecode reports either 'Single-bit ECC' or 'Multi-bit ECC' for ECC-capable DIMMs
            if re.search(r'Error Correction Type:\s*(Single-bit|Multi-bit) ECC', result.stdout):
memory_health['has_ecc'] = True
# If dmidecode didn't find ECC, try the edac method as backup
if not memory_health['has_ecc']:
edac_path = '/sys/devices/system/edac/mc'
if os.path.exists(edac_path) and os.listdir(edac_path):
for mc_dir in glob.glob('/sys/devices/system/edac/mc/mc[0-9]*'):
if os.path.exists(f"{mc_dir}/csrow0"):
memory_health['has_ecc'] = True
break
# If ECC is present, check for errors
if memory_health['has_ecc']:
for mc_dir in glob.glob('/sys/devices/system/edac/mc/mc[0-9]*'):
if os.path.exists(f"{mc_dir}/csrow0"):
ue_count = self._read_ecc_count(f"{mc_dir}/csrow0/ue_count")
if ue_count > 0:
memory_health['status'] = 'CRITICAL'
memory_health['ecc_errors'].append(
f"Uncorrectable ECC errors detected in {os.path.basename(mc_dir)}: {ue_count}"
)
ce_count = self._read_ecc_count(f"{mc_dir}/csrow0/ce_count")
if ce_count > 0:
if memory_health['status'] != 'CRITICAL':
memory_health['status'] = 'WARNING'
memory_health['ecc_errors'].append(
f"Correctable ECC errors detected in {os.path.basename(mc_dir)}: {ce_count}"
)
except Exception as e:
memory_health['status'] = 'ERROR'
memory_health['ecc_errors'].append(f"Error checking ECC status: {str(e)}")
return memory_health
def _read_ecc_count(self, filepath: str) -> int:
"""
Read ECC error count from a file.
:param filepath: Path to the ECC count file
:return: Number of ECC errors
"""
try:
with open(filepath, 'r') as f:
return int(f.read().strip())
        except (OSError, ValueError):
return 0
def _check_cpu_usage(self) -> Dict[str, Any]:
"""
Check CPU usage and return health metrics.
:return: Dictionary with CPU health metrics.
"""
cpu_usage_percent = psutil.cpu_percent(interval=1)
cpu_health = {
'cpu_usage_percent': cpu_usage_percent,
'status': 'OK' if cpu_usage_percent < self.CONFIG['THRESHOLDS']['CPU_WARNING'] else 'WARNING'
}
return cpu_health
def _check_network_status(self) -> Dict[str, Any]:
"""
Check the status of network interfaces and report any issues.
:return: Dictionary containing network health metrics and any issues found.
"""
network_health = {
'management_network': {
'issues': [],
'status': 'OK',
'latency': None
},
'ceph_network': {
'issues': [],
'status': 'OK',
'latency': None
}
}
try:
# Check management network connectivity
mgmt_result = subprocess.run(
[
"ping",
"-c", str(self.CONFIG['NETWORKS']['PING_COUNT']),
"-W", str(self.CONFIG['NETWORKS']['PING_TIMEOUT']),
self.CONFIG['NETWORKS']['MANAGEMENT']
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
if mgmt_result.returncode != 0:
network_health['management_network']['status'] = 'CRITICAL'
network_health['management_network']['issues'].append(
"Management network is unreachable"
)
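            else:
                # Sketch (assumption): record the round-trip time from the ping output so the
                # 'latency' field initialised above is populated.
                match = re.search(r'time=([\d.]+)\s*ms', mgmt_result.stdout)
                if match:
                    network_health['management_network']['latency'] = float(match.group(1))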
# Check Ceph network connectivity
ceph_result = subprocess.run(
[
"ping",
"-c", str(self.CONFIG['NETWORKS']['PING_COUNT']),
"-W", str(self.CONFIG['NETWORKS']['PING_TIMEOUT']),
self.CONFIG['NETWORKS']['CEPH']
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
if ceph_result.returncode != 0:
network_health['ceph_network']['status'] = 'CRITICAL'
network_health['ceph_network']['issues'].append(
"Ceph network is unreachable"
)
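            else:
                # Sketch (assumption): record Ceph network latency the same way as above.
                match = re.search(r'time=([\d.]+)\s*ms', ceph_result.stdout)
                if match:
                    network_health['ceph_network']['latency'] = float(match.group(1))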
return network_health
        except Exception as e:
            logger.error(f"Network health check failed: {e}")
            # Keep the expected structure so callers can still index both networks
            for net in ('management_network', 'ceph_network'):
                network_health[net]['status'] = 'ERROR'
                network_health[net]['issues'].append(f"Network check error: {e}")
            return network_health
def main():
try:
# Argument parser for CLI options
parser = argparse.ArgumentParser(description="System Health Monitor")
parser.add_argument(
"--dry-run",
action="store_true",
help="Enable dry-run mode (simulate ticket creation without actual API calls)."
)
args = parser.parse_args()
        # Instantiate the SystemHealthMonitor class using the configured ticket API URL
        monitor = SystemHealthMonitor(
            ticket_api_url=SystemHealthMonitor.CONFIG['TICKET_API_URL'],
            dry_run=args.dry_run
        )
        # Run the health checks (network checks are performed inside run())
        monitor.run()
except Exception as e:
logger.error(f"An unexpected error occurred: {e}")
sys.exit(1)
if __name__ == "__main__":
# Argument parser for CLI options
parser = argparse.ArgumentParser(description="System Health Monitor")
parser.add_argument(
"--dry-run",
action="store_true",
help="Enable dry-run mode (simulate ticket creation without actual API calls)."
)
args = parser.parse_args()
# Set dry-run mode if specified
dry_run_mode = args.dry_run
main()