hwmonDaemon/hwmonDaemon.py
#!/usr/bin/env python3
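"""
hwmonDaemon: one-shot hardware health monitor.

Checks local physical drives (usage and SMART status), ECC memory error
counters, CPU load, and management/Ceph network reachability, and opens a
ticket through an HTTP API for each issue it detects.
"""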
import os, sys, json, requests, psutil, socket, subprocess, logging, argparse, urllib.request, re, glob, datetime
from typing import Dict, Any, List
# Create a logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Create a console handler and set its level to DEBUG
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
# Create a formatter
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# Add the formatter to the console handler
console_handler.setFormatter(formatter)
# Add the console handler to the logger
logger.addHandler(console_handler)
class SystemHealthMonitor:
    PRIORITIES = {
        'CRITICAL': '1',
        'HIGH': '2',
        'MEDIUM': '3',
        'LOW': '4'
    }
    ISSUE_PRIORITIES = {
        'SMART_FAILURE': PRIORITIES['HIGH'],
        'DISK_CRITICAL': PRIORITIES['HIGH'],
        'DISK_WARNING': PRIORITIES['MEDIUM'],
        'UNCORRECTABLE_ECC': PRIORITIES['HIGH'],
        'CORRECTABLE_ECC': PRIORITIES['MEDIUM'],
        'CPU_HIGH': PRIORITIES['MEDIUM'],
        'NETWORK_FAILURE': PRIORITIES['HIGH']
    }
    CONFIG = {
        'TICKET_API_URL': 'http://10.10.10.45/create_ticket_api.php',
        'THRESHOLDS': {
            'DISK_CRITICAL': 90,
            'DISK_WARNING': 80,
            'CPU_WARNING': 80,
            'TEMPERATURE_WARNING': 65
        },
        'NETWORKS': {
            'MANAGEMENT': '10.10.10.1',
            'CEPH': '10.10.90.1',
            'PING_TIMEOUT': 1,  # seconds
            'PING_COUNT': 1
        }
    }
    TICKET_TEMPLATES = {
        'ACTION_TYPE': '[auto]',
        'ENVIRONMENT': '[production]',
        'TICKET_TYPE': '[maintenance]',
        'HARDWARE_TYPE': '[hardware]',
        'NETWORK_TYPE': '[network]',
        'SCOPE_SINGLE': '[single-node]',
        'SCOPE_CLUSTER': '[cluster-wide]',
        'DEFAULT_CATEGORY': 'Hardware',
        'DEFAULT_ISSUE_TYPE': 'Problem'
    }
    def __init__(self,
                 ticket_api_url: str = 'http://10.10.10.45/create_ticket_api.php',
                 dry_run: bool = False):
        """
        Initialize the system health monitor.
        :param ticket_api_url: URL for the ticket creation API.
        :param dry_run: If True, simulate API calls without sending requests.
        """
        self.ticket_api_url = ticket_api_url
        self.dry_run = dry_run
    def run(self):
        """
        Perform a one-shot health check of the system.
        """
        try:
            # Perform health checks and gather the report
            health_report = self.perform_health_checks()
            # Create tickets for any detected critical issues
            self._create_tickets_for_issues(health_report)
        except Exception as e:
            logger.error(f"Unexpected error during health check: {e}")
    def perform_health_checks(self) -> Dict[str, Any]:
        """
        Perform comprehensive system health checks and return a report.
        """
        health_report = {
            'drives_health': self._check_drives_health(),
            'memory_health': self._check_memory_usage(),
            'cpu_health': self._check_cpu_usage(),
            'network_health': self._check_network_status()
        }
        if self.dry_run:
            logger.info("=== Detailed Health Check Results ===")
            logger.info(f"Drive Health Status: {health_report['drives_health']['overall_status']}")
            for drive in health_report['drives_health']['drives']:
                logger.info(f"Drive {drive['mountpoint']}: {drive['usage_percent']}% used, SMART: {drive['smart_status']}")
            logger.info(f"Memory Status: {health_report['memory_health']['status']}")
            logger.info(f"Memory Usage: {health_report['memory_health']['memory_percent']}%")
            logger.info(f"ECC Memory: {'Present' if health_report['memory_health']['has_ecc'] else 'Not Present'}")
            if health_report['memory_health']['has_ecc'] and health_report['memory_health']['ecc_errors']:
                logger.info(f"ECC Errors: {health_report['memory_health']['ecc_errors']}")
            logger.info(f"CPU Usage: {health_report['cpu_health']['cpu_usage_percent']}% ({health_report['cpu_health']['status']})")
            logger.info(f"Network Management Status: {health_report['network_health']['management_network']['status']}")
            logger.info(f"Network Ceph Status: {health_report['network_health']['ceph_network']['status']}")
            logger.info("================================")
        return health_report
    def _generate_detailed_description(self, issue: str, health_report: Dict[str, Any]) -> str:
        """
        Generate a detailed description for the issue based on the health report.
        :param issue: The issue description.
        :param health_report: The comprehensive health report from the checks.
        :return: A detailed description for the issue.
        """
        banner = """
=================================================================
AUTOMATED TICKET - Generated by Hardware Monitoring Service (hwmonDaemon)
Host: {}
Generated: {}
=================================================================
""".format(socket.gethostname(), datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        description = banner + issue + "\n\n"
        if "Disk" in issue:
            for partition in health_report.get('drives_health', {}).get('drives', []):
                if partition.get('mountpoint') in issue:
                    description += f"Disk Device: {partition['device']}\n"
                    description += f"Mount Point: {partition['mountpoint']}\n"
                    description += f"Total Space: {partition['total_space']}\n"
                    description += f"Used Space: {partition['used_space']}\n"
                    description += f"Free Space: {partition['free_space']}\n"
                    description += f"Usage Percent: {partition['usage_percent']}%\n"
                    if partition.get('smart_status') == 'UNHEALTHY':
                        try:
                            # Get additional disk information using smartctl
                            result = subprocess.run(
                                ['smartctl', '-a', partition['device']],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                text=True
                            )
                            output = result.stdout + result.stderr
                            description += "\nSMART Information:\n"
                            description += output
                        except Exception as e:
                            description += f"Error getting SMART information: {str(e)}\n"
                    break
        elif "Memory" in issue:
            memory_health = health_report.get('memory_health', {})
            description += f"Total Memory: {memory_health['total_memory']}\n"
            description += f"Used Memory: {memory_health['used_memory']}\n"
            description += f"Memory Usage Percent: {memory_health['memory_percent']}%\n"
        elif "CPU" in issue:
            cpu_health = health_report.get('cpu_health', {})
            description += f"CPU Usage Percent: {cpu_health['cpu_usage_percent']}%\n"
        elif "Network" in issue:
            network_health = health_report.get('network_health', {})
            for network in ['management_network', 'ceph_network']:
                if network_health[network]['issues']:
                    description += f"{network.replace('_', ' ').title()} Issues:\n"
                    description += "\n".join(network_health[network]['issues'])
                    description += "\n"
        return description
    def _create_tickets_for_issues(self, health_report: Dict[str, Any]):
        """
        Create tickets for detected issues with standardized parameters.
        """
        issues = self._detect_issues(health_report)
        if not issues:
            logger.info("No issues detected.")
            return
        hostname = socket.gethostname()
        action_type = self.TICKET_TEMPLATES['ACTION_TYPE']
        environment = self.TICKET_TEMPLATES['ENVIRONMENT']
        ticket_type = self.TICKET_TEMPLATES['TICKET_TYPE']
        for issue in issues:
            priority = self.PRIORITIES['MEDIUM']  # default
            category = self.TICKET_TEMPLATES['DEFAULT_CATEGORY']
            issue_type = self.TICKET_TEMPLATES['DEFAULT_ISSUE_TYPE']
            scope = self.TICKET_TEMPLATES['SCOPE_SINGLE']
            hardware_type = self.TICKET_TEMPLATES['HARDWARE_TYPE']  # default
            if "Disk" in issue:
                # Match the message strings produced by _detect_issues
                if "unhealthy SMART status" in issue:
                    priority = self.ISSUE_PRIORITIES['SMART_FAILURE']
                elif "(Warning)" in issue:
                    priority = self.ISSUE_PRIORITIES['DISK_WARNING']
                else:
                    priority = self.ISSUE_PRIORITIES['DISK_CRITICAL']
            elif "Network" in issue:
                hardware_type = self.TICKET_TEMPLATES['NETWORK_TYPE']
                priority = self.ISSUE_PRIORITIES['NETWORK_FAILURE']
                scope = self.TICKET_TEMPLATES['SCOPE_CLUSTER']
            elif "Uncorrectable ECC" in issue:
                priority = self.ISSUE_PRIORITIES['UNCORRECTABLE_ECC']
            elif "Correctable ECC" in issue:
                priority = self.ISSUE_PRIORITIES['CORRECTABLE_ECC']
            elif "CPU" in issue:
                priority = self.ISSUE_PRIORITIES['CPU_HIGH']
            # Create standardized ticket title
            ticket_title = f"[{hostname}]{action_type}{hardware_type} {issue} {scope}{environment}{ticket_type}"
            description = self._generate_detailed_description(issue, health_report)
            ticket_payload = {
                "title": ticket_title,
                "description": description,
                "priority": priority,
                "status": "Open",
                "category": category,
                "type": issue_type
            }
            if self.dry_run:
                logger.info("Dry-run mode enabled. Simulating ticket creation:")
                logger.info(json.dumps(ticket_payload, indent=4))
                print("Dry-run: Ticket payload:")
                print(json.dumps(ticket_payload, indent=4))
            else:
                try:
                    response = requests.post(
                        self.ticket_api_url,
                        json=ticket_payload,
                        headers={'Content-Type': 'application/json'}
                    )
                    logger.info(f"Response status code: {response.status_code}")
                    logger.info(f"Response body: {response.text}")
                    if response.status_code in [200, 201]:
                        logger.info(f"Ticket created successfully: {ticket_title}")
                    else:
                        logger.error(f"Failed to create ticket. Status code: {response.status_code}")
                        logger.error(f"Response: {response.text}")
                except Exception as e:
                    logger.error(f"Error creating ticket: {e}")
    def _detect_issues(self, health_report: Dict[str, Any]) -> List[str]:
        """
        Detect issues in the health report including non-critical issues.
        :param health_report: The comprehensive health report from the checks.
        :return: List of issue descriptions detected during checks.
        """
        issues = []
        # Check for drive-related issues
        for partition in health_report.get('drives_health', {}).get('drives', []):
            if partition.get('usage_status') == 'CRITICAL_HIGH_USAGE':
                issues.append(
                    f"Disk {partition['mountpoint']} is {partition['usage_percent']}% full"
                )
            elif partition.get('usage_status') == 'WARNING_HIGH_USAGE':
                issues.append(
                    f"Disk {partition['mountpoint']} is {partition['usage_percent']}% full (Warning)"
                )
            if partition.get('smart_status') == 'UNHEALTHY':
                issues.append(f"Disk {partition['mountpoint']} has an unhealthy SMART status")
        # Check for ECC memory errors
        memory_health = health_report.get('memory_health', {})
        if memory_health.get('has_ecc') and memory_health.get('ecc_errors'):
            issues.extend(memory_health['ecc_errors'])
        # Check for CPU-related issues
        cpu_health = health_report.get('cpu_health', {})
        if cpu_health and cpu_health.get('cpu_usage_percent', 0) > self.CONFIG['THRESHOLDS']['CPU_WARNING']:
            issues.append("CPU usage is above threshold")
        # Check for network-related issues
        network_health = health_report.get('network_health', {})
        for network in ['management_network', 'ceph_network']:
            if network_health.get(network, {}).get('issues'):
                issues.extend(network_health[network]['issues'])
        return issues
    def _is_physical_disk(self, device_path: str, mountpoint: str = '') -> bool:
        """
        Check if the device is a physical SATA, NVMe, or MMC disk, excluding system partitions.
        :param device_path: Path to the device
        :param mountpoint: Mount point of the partition, used to exclude boot partitions
        :return: Boolean indicating if it's a relevant physical disk
        """
        excluded_mounts = ['/boot', '/boot/efi']
        if any(mountpoint.startswith(mount) for mount in excluded_mounts):
            return False
        return bool(re.match(r'/dev/(sd[a-z]|nvme\d+n\d+|mmcblk\d+)', device_path))
    def _check_smart_health(self, device: str) -> Dict[str, Any]:
        """
        Check comprehensive SMART health metrics for a drive.
        """
        smart_health = {
            'status': 'HEALTHY',
            'issues': [],
            'temp': None,
            'attributes': {}
        }
        try:
            result = subprocess.run(
                ['smartctl', '-A', '-H', device],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            output = result.stdout
            # Check overall SMART status first
            if 'FAILED' in output and 'PASSED' not in output:
                smart_health['status'] = 'UNHEALTHY'
                smart_health['issues'].append("SMART overall health check failed")
            # Parse SMART attributes
            for line in output.split('\n'):
                if 'ATTRIBUTE_NAME' in line:
                    continue
                # Check for current failures only: smartctl marks these FAILING_NOW
                # in the WHEN_FAILED column (past failures show In_the_past)
                if 'FAILING_NOW' in line:
                    smart_health['status'] = 'UNHEALTHY'
                    smart_health['issues'].append(f"Current failure detected: {line.strip()}")
                # Monitor critical attributes
                for attr in ['Reallocated_Sector_Ct', 'Current_Pending_Sector',
                             'Offline_Uncorrectable', 'Reported_Uncorrect']:
                    if attr in line:
                        parts = line.split()
                        if len(parts) >= 10:
                            raw_value = int(parts[9])
                            if raw_value > 0:
                                smart_health['status'] = 'UNHEALTHY'
                                smart_health['issues'].append(f"{attr} has value {raw_value}")
                # Check temperature
                if 'Temperature_Celsius' in line or 'Airflow_Temperature_Cel' in line:
                    parts = line.split()
                    if len(parts) >= 10:
                        temp = int(parts[9])
                        smart_health['temp'] = temp
                        if temp > self.CONFIG['THRESHOLDS']['TEMPERATURE_WARNING']:
                            smart_health['issues'].append(f"High drive temperature: {temp}°C")
        except Exception as e:
            smart_health['status'] = 'ERROR'
            smart_health['issues'].append(f"Error checking SMART: {str(e)}")
        return smart_health
    def _check_drives_health(self) -> Dict[str, Any]:
        """
        Check overall health of physical SATA and NVMe drives including disk usage and SMART status.
        :return: Combined health report of all drives and their status.
        """
        drives_health = {'overall_status': 'NORMAL', 'drives': []}
        try:
            partitions = [p for p in psutil.disk_partitions()
                          if self._is_physical_disk(p.device, p.mountpoint)]
            overall_status = 'NORMAL'
            for partition in partitions:
                drive_report = {
                    'device': partition.device,
                    'mountpoint': partition.mountpoint
                }
                # Check disk usage
                usage = psutil.disk_usage(partition.mountpoint)
                disk_usage_status = 'NORMAL'
                if usage.percent > self.CONFIG['THRESHOLDS']['DISK_CRITICAL']:
                    disk_usage_status = 'CRITICAL_HIGH_USAGE'
                elif usage.percent > self.CONFIG['THRESHOLDS']['DISK_WARNING']:
                    disk_usage_status = 'WARNING_HIGH_USAGE'
                drive_report.update({
                    'total_space': self._convert_bytes(usage.total),
                    'used_space': self._convert_bytes(usage.used),
                    'free_space': self._convert_bytes(usage.free),
                    'usage_percent': usage.percent,
                    'usage_status': disk_usage_status
                })
                # Check SMART health
                smart_health = self._check_smart_health(partition.device)
                drive_report.update({
                    'smart_status': smart_health['status'],
                    'smart_issues': smart_health['issues'],
                    'temperature': smart_health['temp'],
                    'smart_attributes': smart_health['attributes']
                })
                # Update overall status
                if smart_health['status'] == 'UNHEALTHY' or disk_usage_status == 'CRITICAL_HIGH_USAGE':
                    overall_status = 'CRITICAL'
                elif disk_usage_status == 'WARNING_HIGH_USAGE' and overall_status != 'CRITICAL':
                    overall_status = 'WARNING'
                drives_health['drives'].append(drive_report)
            drives_health['overall_status'] = overall_status
        except Exception as e:
            logger.error(f"Error checking drives health: {str(e)}")
        return drives_health
    @staticmethod
    def _convert_bytes(bytes_value: int, suffix: str = 'B') -> str:
        """
        Convert bytes to a human-readable format.
        :param bytes_value: Number of bytes to convert.
        :param suffix: Suffix to append (default is 'B' for bytes).
        :return: Formatted string with the size in human-readable form.
        """
        for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
            if abs(bytes_value) < 1024.0:
                return f"{bytes_value:.1f}{unit}{suffix}"
            bytes_value /= 1024.0
        return f"{bytes_value:.1f}Y{suffix}"
    def _check_memory_usage(self) -> Dict[str, Any]:
        """
        Check memory usage and, if ECC memory is present, look for ECC errors.
        """
        vm = psutil.virtual_memory()
        memory_health = {
            'has_ecc': False,
            'ecc_errors': [],
            'status': 'OK',
            'total_memory': self._convert_bytes(vm.total),
            'used_memory': self._convert_bytes(vm.used),
            'memory_percent': vm.percent
        }
        try:
            # First check using dmidecode
            result = subprocess.run(
                ['dmidecode', '--type', 'memory'],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            if 'Error Correction Type: Multi-bit ECC' in result.stdout:
                memory_health['has_ecc'] = True
            # If dmidecode didn't find ECC, try the edac method as backup
            if not memory_health['has_ecc']:
                edac_path = '/sys/devices/system/edac/mc'
                if os.path.exists(edac_path) and os.listdir(edac_path):
                    for mc_dir in glob.glob('/sys/devices/system/edac/mc/mc[0-9]*'):
                        if os.path.exists(f"{mc_dir}/csrow0"):
                            memory_health['has_ecc'] = True
                            break
            # If ECC is present, check for errors
            if memory_health['has_ecc']:
                for mc_dir in glob.glob('/sys/devices/system/edac/mc/mc[0-9]*'):
                    if os.path.exists(f"{mc_dir}/csrow0"):
                        ue_count = self._read_ecc_count(f"{mc_dir}/csrow0/ue_count")
                        if ue_count > 0:
                            memory_health['status'] = 'CRITICAL'
                            memory_health['ecc_errors'].append(
                                f"Uncorrectable ECC errors detected in {os.path.basename(mc_dir)}: {ue_count}"
                            )
                        ce_count = self._read_ecc_count(f"{mc_dir}/csrow0/ce_count")
                        if ce_count > 0:
                            if memory_health['status'] != 'CRITICAL':
                                memory_health['status'] = 'WARNING'
                            memory_health['ecc_errors'].append(
                                f"Correctable ECC errors detected in {os.path.basename(mc_dir)}: {ce_count}"
                            )
        except Exception as e:
            memory_health['status'] = 'ERROR'
            memory_health['ecc_errors'].append(f"Error checking ECC status: {str(e)}")
        return memory_health
    def _read_ecc_count(self, filepath: str) -> int:
        """
        Read ECC error count from a file.
        :param filepath: Path to the ECC count file
        :return: Number of ECC errors (0 if the file cannot be read or parsed)
        """
        try:
            with open(filepath, 'r') as f:
                return int(f.read().strip())
        except (OSError, ValueError):
            return 0
    def _check_cpu_usage(self) -> Dict[str, Any]:
        """
        Check CPU usage and return health metrics.
        :return: Dictionary with CPU health metrics.
        """
        cpu_usage_percent = psutil.cpu_percent(interval=1)
        cpu_health = {
            'cpu_usage_percent': cpu_usage_percent,
            'status': 'OK' if cpu_usage_percent < self.CONFIG['THRESHOLDS']['CPU_WARNING'] else 'WARNING'
        }
        return cpu_health
    def _check_network_status(self) -> Dict[str, Any]:
        """
        Check the status of network interfaces and report any issues.
        :return: Dictionary containing network health metrics and any issues found.
        """
        network_health = {
            'management_network': {
                'issues': [],
                'status': 'OK',
                'latency': None
            },
            'ceph_network': {
                'issues': [],
                'status': 'OK',
                'latency': None
            }
        }
        try:
            # Check management network connectivity
            mgmt_result = subprocess.run(
                [
                    "ping",
                    "-c", str(self.CONFIG['NETWORKS']['PING_COUNT']),
                    "-W", str(self.CONFIG['NETWORKS']['PING_TIMEOUT']),
                    self.CONFIG['NETWORKS']['MANAGEMENT']
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            if mgmt_result.returncode != 0:
                network_health['management_network']['status'] = 'CRITICAL'
                network_health['management_network']['issues'].append(
                    "Management network is unreachable"
                )
            # Check Ceph network connectivity
            ceph_result = subprocess.run(
                [
                    "ping",
                    "-c", str(self.CONFIG['NETWORKS']['PING_COUNT']),
                    "-W", str(self.CONFIG['NETWORKS']['PING_TIMEOUT']),
                    self.CONFIG['NETWORKS']['CEPH']
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            if ceph_result.returncode != 0:
                network_health['ceph_network']['status'] = 'CRITICAL'
                network_health['ceph_network']['issues'].append(
                    "Ceph network is unreachable"
                )
            return network_health
        except Exception as e:
            logger.error(f"Network health check failed: {e}")
            # Keep the report shape consistent so callers can still index into it
            network_health['management_network']['status'] = 'ERROR'
            network_health['ceph_network']['status'] = 'ERROR'
            return network_health
def main():
    try:
        # Argument parser for CLI options
        parser = argparse.ArgumentParser(description="System Health Monitor")
        parser.add_argument(
            "--dry-run",
            action="store_true",
            help="Enable dry-run mode (simulate ticket creation without actual API calls)."
        )
        args = parser.parse_args()
        # Instantiate the SystemHealthMonitor class
        monitor = SystemHealthMonitor(
            ticket_api_url=SystemHealthMonitor.CONFIG['TICKET_API_URL'],
            dry_run=args.dry_run
        )
        # Run the health checks (drives, memory, CPU, and network)
        monitor.run()
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
        sys.exit(1)
if __name__ == "__main__":
    main()
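# Usage sketch (assumed deployment; the cron schedule and install path below are
# hypothetical): run() performs a single pass, so the script is meant to be invoked
# periodically, e.g. from cron:
#   */15 * * * * /usr/bin/env python3 /opt/hwmonDaemon/hwmonDaemon.py
# To inspect the checks and ticket payloads without calling the ticket API:
#   python3 hwmonDaemon.py --dry-run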