#!/usr/bin/env python3 import os import json import requests import psutil import socket import subprocess import logging import argparse import re import glob import datetime import fcntl import textwrap import shutil from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Dict, Any, List # ============================================================================= # LOGGING SETUP # ============================================================================= logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') console_handler.setFormatter(formatter) logger.addHandler(console_handler) class SystemHealthMonitor: # ============================================================================= # CLASS CONSTANTS AND CONFIGURATION # ============================================================================= STANDARD_WIDTH = 80 PRIORITIES = { 'CRITICAL': '1', # P1 - Cluster outages, total system failure 'HIGH': '2', # P2 - Hardware failures, same-day response 'MEDIUM': '3', # P3 - Warnings, 1-3 day response 'NORMAL': '4', # P4 - Standard monitoring alerts 'LOW': '5' # P5 - Informational, minimal impact } ISSUE_PRIORITIES = { # P1 - Critical System Issues (cluster-wide impact) 'CLUSTER_FAILURE': PRIORITIES['CRITICAL'], 'MULTIPLE_DRIVE_FAILURE': PRIORITIES['CRITICAL'], 'RAID_DEGRADED': PRIORITIES['CRITICAL'], # P2 - Hardware Failures (same-day response) 'SMART_FAILURE': PRIORITIES['HIGH'], 'SMART_CRITICAL': PRIORITIES['HIGH'], 'DISK_CRITICAL': PRIORITIES['HIGH'], 'UNCORRECTABLE_ECC': PRIORITIES['HIGH'], 'NETWORK_FAILURE': PRIORITIES['HIGH'], 'TEMPERATURE_CRITICAL': PRIORITIES['HIGH'], 'SSD_WEAR_CRITICAL': PRIORITIES['HIGH'], 'NVME_SPARE_CRITICAL': PRIORITIES['HIGH'], 'FIRMWARE_CRITICAL': PRIORITIES['HIGH'], 'REALLOCATED_SECTOR': PRIORITIES['HIGH'], 'PENDING_SECTOR': 
PRIORITIES['HIGH'], # P3 - Warnings (1-3 day response) 'SMART_WARNING': PRIORITIES['MEDIUM'], 'DISK_WARNING': PRIORITIES['MEDIUM'], 'CORRECTABLE_ECC': PRIORITIES['MEDIUM'], 'TEMPERATURE_WARNING': PRIORITIES['MEDIUM'], 'SSD_WEAR_WARNING': PRIORITIES['MEDIUM'], 'NVME_SPARE_WARNING': PRIORITIES['MEDIUM'], 'LXC_STORAGE_CRITICAL': PRIORITIES['MEDIUM'], 'TREND_ALERT': PRIORITIES['MEDIUM'], # P4 - Normal Monitoring (standard response) 'CPU_HIGH': PRIORITIES['NORMAL'], 'LXC_STORAGE_WARNING': PRIORITIES['NORMAL'], 'SYSTEM_LOG_WARNING': PRIORITIES['NORMAL'], 'DRIVE_AGE_WARNING': PRIORITIES['NORMAL'], # P5 - Informational (minimal impact) 'TEMPERATURE_INFO': PRIORITIES['LOW'], 'DRIVE_AGE_INFO': PRIORITIES['LOW'], 'SSD_WEAR_INFO': PRIORITIES['LOW'], 'SYSTEM_LOG_INFO': PRIORITIES['LOW'], # Ceph cluster issues 'CEPH_HEALTH_ERR': PRIORITIES['CRITICAL'], # P1 - Cluster in error state 'CEPH_HEALTH_WARN': PRIORITIES['MEDIUM'], # P3 - Cluster warnings 'CEPH_OSD_DOWN': PRIORITIES['HIGH'], # P2 - OSD down (local node) 'CEPH_USAGE_CRITICAL': PRIORITIES['HIGH'], # P2 - Cluster near full 'CEPH_USAGE_WARNING': PRIORITIES['MEDIUM'], # P3 - Cluster usage high 'CEPH_PG_DEGRADED': PRIORITIES['HIGH'], # P2 - PGs degraded 'CEPH_MON_DOWN': PRIORITIES['HIGH'], # P2 - Monitor down # PBS (Proxmox Backup Server) issues 'PBS_ZFS_DEGRADED': PRIORITIES['CRITICAL'], # P1 - ZFS pool degraded 'PBS_ZFS_USAGE_CRITICAL': PRIORITIES['HIGH'], # P2 - ZFS pool near full 'PBS_ZFS_USAGE_WARNING': PRIORITIES['MEDIUM'], # P3 - ZFS pool usage high 'PBS_ZFS_ERRORS': PRIORITIES['HIGH'], # P2 - ZFS pool has errors 'PBS_BACKUP_FAILED': PRIORITIES['HIGH'], # P2 - Backup job failed 'PBS_GC_FAILED': PRIORITIES['MEDIUM'], # P3 - Garbage collection failed 'PBS_SYNC_FAILED': PRIORITIES['MEDIUM'] # P3 - Sync job failed } CONFIG = { 'TICKET_API_URL': 'http://10.10.10.45/create_ticket_api.php', 'TICKET_API_KEY': None, # Will be loaded from .env file 'THRESHOLDS': { 'DISK_CRITICAL': 90, 'DISK_WARNING': 80, 'LXC_CRITICAL': 90, 
'LXC_WARNING': 80, 'CPU_WARNING': 95, 'TEMPERATURE_WARNING': 65 }, 'NETWORKS': { 'MANAGEMENT': '10.10.10.1', 'CEPH': '10.10.90.1', 'PING_TIMEOUT': 1, 'PING_COUNT': 1 }, 'EXCLUDED_MOUNTS': [ '/media', '/mnt/pve/mediafs', '/opt/metube_downloads' ], 'EXCLUDED_PATTERNS': [ r'/media.*', r'/mnt/pve/mediafs.*', r'.*/media$', r'.*mediafs.*', r'.*/downloads.*' ], 'HISTORY_DIR': '/var/log/hwmonDaemon', 'HISTORY_RETENTION_DAYS': 30, 'INCLUDE_INFO_TICKETS': False, # Set True to create P5 tickets for INFO alerts 'PRIORITY_ESCALATION_THRESHOLD': 3, # Number of criticals to trigger P1 # Ceph monitoring settings 'CEPH_ENABLED': True, # Enable/disable Ceph health monitoring 'CEPH_TICKET_NODE': None, # Hostname of node designated to create cluster-wide Ceph tickets 'CEPH_USAGE_WARNING': 70, # Ceph cluster usage warning threshold % 'CEPH_USAGE_CRITICAL': 85, # Ceph cluster usage critical threshold % # Cluster identification for tickets 'CLUSTER_NAME': 'proxmox-cluster', # Name used in cluster-wide ticket titles instead of hostname # Prometheus metrics settings 'PROMETHEUS_ENABLED': False, # Enable Prometheus metrics export 'PROMETHEUS_PORT': 9101, # Port for Prometheus metrics HTTP server 'PROMETHEUS_TEXTFILE_PATH': None, # Path for textfile collector (alternative to HTTP) # SMART analysis thresholds 'NEW_DRIVE_HOURS_THRESHOLD': 720, # Hours to consider a drive "new" (~30 days) 'SMART_ERROR_RECENT_HOURS': 168, # Hours window for recent SMART errors (~1 week) # Storage limits 'HISTORY_MAX_BYTES': 52428800, # 50MB max storage for history files # Health check endpoint 'HEALTH_SERVER_ENABLED': False, # Enable HTTP health check endpoint 'HEALTH_SERVER_PORT': 9102, # Port for health check endpoint # PBS (Proxmox Backup Server) monitoring 'PBS_ENABLED': False, # Enable PBS health monitoring 'PBS_ZFS_WARNING': 80, # ZFS pool usage warning threshold % 'PBS_ZFS_CRITICAL': 90 # ZFS pool usage critical threshold % } @classmethod def load_env_config(cls): """Load configuration from .env file in 
/etc/hwmonDaemon/""" # Check for .env file in standard system location env_file = '/etc/hwmonDaemon/.env' if not os.path.exists(env_file): logger.warning(f".env file not found at {env_file} - API key required for ticket creation") return try: with open(env_file, 'r') as f: for line in f: line = line.strip() # Skip empty lines and comments if not line or line.startswith('#'): continue # Parse KEY=VALUE format if '=' in line: key, value = line.split('=', 1) key = key.strip() value = value.strip() # Update CONFIG if key exists if key == 'TICKET_API_KEY': cls.CONFIG['TICKET_API_KEY'] = value logger.info("✓ Loaded TICKET_API_KEY from .env") elif key == 'TICKET_API_URL': cls.CONFIG['TICKET_API_URL'] = value logger.info(f"✓ Loaded TICKET_API_URL: {value}") # Ceph settings elif key == 'CEPH_ENABLED': cls.CONFIG['CEPH_ENABLED'] = value.lower() in ('true', '1', 'yes') logger.info(f"✓ Loaded CEPH_ENABLED: {cls.CONFIG['CEPH_ENABLED']}") elif key == 'CEPH_TICKET_NODE': cls.CONFIG['CEPH_TICKET_NODE'] = value if value else None logger.info(f"✓ Loaded CEPH_TICKET_NODE: {value}") elif key == 'CEPH_USAGE_WARNING': try: cls.CONFIG['CEPH_USAGE_WARNING'] = int(value) except ValueError: logger.warning(f"Invalid CEPH_USAGE_WARNING value: {value}") elif key == 'CEPH_USAGE_CRITICAL': try: cls.CONFIG['CEPH_USAGE_CRITICAL'] = int(value) except ValueError: logger.warning(f"Invalid CEPH_USAGE_CRITICAL value: {value}") # Prometheus settings elif key == 'PROMETHEUS_ENABLED': cls.CONFIG['PROMETHEUS_ENABLED'] = value.lower() in ('true', '1', 'yes') logger.info(f"✓ Loaded PROMETHEUS_ENABLED: {cls.CONFIG['PROMETHEUS_ENABLED']}") elif key == 'PROMETHEUS_PORT': try: cls.CONFIG['PROMETHEUS_PORT'] = int(value) except ValueError: logger.warning(f"Invalid PROMETHEUS_PORT value: {value}") elif key == 'PROMETHEUS_TEXTFILE_PATH': cls.CONFIG['PROMETHEUS_TEXTFILE_PATH'] = value if value else None logger.info(f"✓ Loaded PROMETHEUS_TEXTFILE_PATH: {value}") # Cluster identification elif key == 'CLUSTER_NAME': 
cls.CONFIG['CLUSTER_NAME'] = value if value else 'proxmox-cluster' logger.info(f"✓ Loaded CLUSTER_NAME: {value}") elif key == 'NEW_DRIVE_HOURS_THRESHOLD': try: cls.CONFIG['NEW_DRIVE_HOURS_THRESHOLD'] = int(value) except ValueError: logger.warning(f"Invalid NEW_DRIVE_HOURS_THRESHOLD value: {value}") elif key == 'SMART_ERROR_RECENT_HOURS': try: cls.CONFIG['SMART_ERROR_RECENT_HOURS'] = int(value) except ValueError: logger.warning(f"Invalid SMART_ERROR_RECENT_HOURS value: {value}") elif key == 'HISTORY_MAX_BYTES': try: cls.CONFIG['HISTORY_MAX_BYTES'] = int(value) except ValueError: logger.warning(f"Invalid HISTORY_MAX_BYTES value: {value}") # PBS settings elif key == 'PBS_ENABLED': cls.CONFIG['PBS_ENABLED'] = value.lower() in ('true', '1', 'yes') logger.info(f"✓ Loaded PBS_ENABLED: {cls.CONFIG['PBS_ENABLED']}") elif key == 'PBS_ZFS_WARNING': try: cls.CONFIG['PBS_ZFS_WARNING'] = int(value) except ValueError: logger.warning(f"Invalid PBS_ZFS_WARNING value: {value}") elif key == 'PBS_ZFS_CRITICAL': try: cls.CONFIG['PBS_ZFS_CRITICAL'] = int(value) except ValueError: logger.warning(f"Invalid PBS_ZFS_CRITICAL value: {value}") # Health server settings elif key == 'HEALTH_SERVER_ENABLED': cls.CONFIG['HEALTH_SERVER_ENABLED'] = value.lower() in ('true', '1', 'yes') logger.info(f"✓ Loaded HEALTH_SERVER_ENABLED: {cls.CONFIG['HEALTH_SERVER_ENABLED']}") elif key == 'HEALTH_SERVER_PORT': try: cls.CONFIG['HEALTH_SERVER_PORT'] = int(value) except ValueError: logger.warning(f"Invalid HEALTH_SERVER_PORT value: {value}") except Exception as e: logger.error(f"Failed to load .env file: {e}") # Validate critical configuration api_key = cls.CONFIG.get('TICKET_API_KEY') if not api_key or api_key == 'your_api_key_here': logger.warning("TICKET_API_KEY is not configured - ticket creation will fail (dry-run will still work)") TICKET_TEMPLATES = { 'ACTION_TYPE': { 'AUTO': '[auto]', 'MANUAL': '[manual]' }, 'ENVIRONMENT': { 'PRODUCTION': '[production]' }, 'TICKET_TYPE': { 'ISSUE': '[issue]', # 
General issue (replaces invalid 'incident') 'PROBLEM': '[problem]', # Root cause investigation 'TASK': '[task]', # Planned work item 'MAINTENANCE': '[maintenance]', # Scheduled/preventive work 'UPGRADE': '[upgrade]' # Hardware/software upgrade }, 'HARDWARE_TYPE': { 'HARDWARE': '[hardware]' }, 'SOFTWARE_TYPE': { 'SOFTWARE': '[software]' }, 'NETWORK_TYPE': { 'NETWORK': '[network]' }, 'SCOPE': { 'SINGLE_NODE': '[single-node]', 'CLUSTER_WIDE': '[cluster-wide]' } } # Category and Type mappings for ticket API TICKET_CATEGORIES = { 'HARDWARE': 'Hardware', 'SOFTWARE': 'Software' } TICKET_TYPES = { 'ISSUE': 'Issue', # General issue/incident 'PROBLEM': 'Problem', # Root cause investigation needed 'TASK': 'Task', # Planned work item 'MAINTENANCE': 'Maintenance', # Scheduled/preventive work 'UPGRADE': 'Upgrade', # Hardware/software upgrade 'INSTALL': 'Install', # New installation 'REQUEST': 'Request' # Service or information request } PROBLEMATIC_FIRMWARE = { 'Samsung': { 'EVO860': ['RVT01B6Q', 'RVT02B6Q'], # Known issues with sudden performance drops 'EVO870': ['SVT01B6Q'], 'PM883': ['HXT7404Q'] # Known issues with TRIM }, 'Seagate': { 'ST8000NM': ['CC64'], # Known issues with NCQ 'ST12000NM': ['SN02'] }, 'WDC': { 'WD121KRYZ': ['01.01A01'], # RAID rebuild issues 'WD141KRYZ': ['02.01A02'] } } MANUFACTURER_SMART_PROFILES = { 'Western Digital': { 'aliases': ['WDC', 'Western Digital', 'HGST', 'Ultrastar'], 'attributes': { 'Raw_Read_Error_Rate': { 'monitor': False, 'description': 'WD drives use this as operation counter, not error count' }, 'Seek_Error_Rate': { 'monitor': False, 'description': 'WD drives use this as operation counter, not error count' } } }, 'Seagate': { 'aliases': ['Seagate', 'ST'], 'attributes': { 'Raw_Read_Error_Rate': { 'monitor': False, 'description': 'Seagate drives use this as operation counter' } } }, 'Ridata': { 'aliases': ['Ridata', 'Ritek', 'RIDATA', 'RITEK', 'SSD 512GB'], 'firmware_patterns': ['HT3618B7', 'HT36'], 'wear_leveling_behavior': 'countup', 
'wear_leveling_baseline': 0, 'wear_leveling_thresholds': { 'warning': 1000000000, # 1 billion - very conservative 'critical': 2000000000 # 2 billion - extremely conservative }, 'attributes': { 'Wear_Leveling_Count': { 'behavior': 'countup', 'baseline': 0, 'warning_threshold': 1000000000, 'critical_threshold': 2000000000, 'description': 'Total wear leveling operations (countup from 0)', 'ignore_on_new_drive': False, 'monitor': True # Include in health checks }, # These are operation counters, NOT actual failures - ignore completely 'Erase_Fail_Count_Chip': { 'monitor': False, # Skip monitoring entirely 'description': 'Operation counter, not actual failures - IGNORED' }, 'Program_Fail_Count_Chip': { 'monitor': False, # Skip monitoring entirely 'description': 'Operation counter, not actual failures - IGNORED' }, # ADD THIS: Regular Erase_Fail_Count is also an operation counter for Ridata 'Erase_Fail_Count': { 'monitor': False, # Skip monitoring entirely for Ridata 'description': 'Operation counter for Ridata drives, not actual failures - IGNORED' }, 'Program_Fail_Count': { 'monitor': False, # Skip monitoring entirely for Ridata 'description': 'Operation counter for Ridata drives, not actual failures - IGNORED' }, # These are the REAL failure counters - monitor with standard thresholds 'Program_Fail_Cnt_Total': { 'monitor': True, 'behavior': 'countup', 'baseline': 0, 'warning_threshold': 1, # Any failures are concerning 'critical_threshold': 5, 'description': 'Actual program failures (real failures)' }, 'Erase_Fail_Count_Total': { 'monitor': True, 'behavior': 'countup', 'baseline': 0, 'warning_threshold': 1, # Any failures are concerning 'critical_threshold': 5, 'description': 'Actual erase failures (real failures)' } } }, 'OOS': { 'aliases': ['OOS12000G', 'OOS'], 'attributes': { # These drives seem to report very high error rates normally 'Raw_Read_Error_Rate': { 'monitor': False, # Skip monitoring - seems to be a counter 'description': 'OOS drives report high values 
normally' }, 'Seek_Error_Rate': { 'monitor': False, # Skip monitoring - seems to be a counter 'description': 'OOS drives report high values normally' }, 'Command_Timeout': { 'warning_threshold': 100000000000, # 100 billion 'critical_threshold': 200000000000, # 200 billion 'description': 'OOS drives report very high timeout counters' } } }, 'Samsung': { 'aliases': ['Samsung', 'SAMSUNG'], 'wear_leveling_behavior': 'countup', 'wear_leveling_baseline': 0, 'wear_leveling_thresholds': { 'warning': 2000, 'critical': 3000 }, 'attributes': { 'Wear_Leveling_Count': { 'behavior': 'countup', 'baseline': 0, 'warning_threshold': 2000, 'critical_threshold': 3000, 'description': 'Total wear leveling operations performed', 'monitor': True }, # Standard monitoring for all other attributes 'Program_Fail_Count': { 'monitor': True, 'warning_threshold': 10, 'critical_threshold': 20 }, 'Erase_Fail_Count': { 'monitor': True, 'warning_threshold': 10, 'critical_threshold': 20 } } }, 'Intel': { 'aliases': ['Intel', 'INTEL'], 'wear_leveling_behavior': 'percentage', 'wear_leveling_baseline': 100, 'wear_leveling_thresholds': { 'warning': 30, 'critical': 10 }, 'attributes': { 'Media_Wearout_Indicator': { 'behavior': 'countdown', 'baseline': 100, 'warning_threshold': 30, 'critical_threshold': 10, 'description': 'Percentage of rated life remaining', 'monitor': True } } }, 'Micron': { 'aliases': ['Micron', 'MICRON', 'Crucial', 'CRUCIAL'], 'wear_leveling_behavior': 'percentage', 'wear_leveling_baseline': 100, 'wear_leveling_thresholds': { 'warning': 30, 'critical': 10 }, 'attributes': { # All attributes use default monitoring unless specified } }, 'Generic': { # Fallback for unknown manufacturers 'aliases': ['Unknown', 'Generic'], 'wear_leveling_behavior': 'unknown', 'wear_leveling_baseline': None, 'wear_leveling_thresholds': { 'warning': None, # Don't trigger on unknown 'critical': None }, 'attributes': { # All attributes use default monitoring } } } SEVERITY_INDICATORS = { 'CRITICAL': '[CRIT]', 
'WARNING': '[WARN]', 'HEALTHY': '[ OK ]', 'UNKNOWN': '[ ?? ]' } SMART_DESCRIPTIONS = { 'Reported_Uncorrect': """ Number of errors that could not be recovered using hardware ECC. Impact: - Indicates permanent data loss in affected sectors - High correlation with drive hardware failure - Critical reliability indicator Recommended Actions: 1. Backup critical data immediately 2. Check drive logs for related errors 3. Plan for drive replacement 4. Monitor for error count increases """, 'Reallocated_Sector_Ct': """ Number of sectors that have been reallocated due to errors. Impact: - High counts indicate degrading media - Each reallocation uses one of the drive's limited spare sectors - Rapid increases suggest accelerating drive wear Recommended Actions: 1. Monitor rate of increase 2. Check drive temperature 3. Plan replacement if count grows rapidly """, 'Current_Pending_Sector': """ Sectors waiting to be reallocated due to read/write errors. Impact: - Indicates potentially unstable sectors - May result in data loss if unrecoverable - Should be monitored for increases Recommended Actions: 1. Backup affected files 2. Run extended SMART tests 3. Monitor for conversion to reallocated sectors """, 'Offline_Uncorrectable': """ Count of uncorrectable errors detected during offline data collection. Impact: - Direct indicator of media reliability issues - May affect data integrity - High values suggest drive replacement needed Recommended Actions: 1. Run extended SMART tests 2. Check drive logs 3. Plan replacement if count is increasing """, 'Spin_Retry_Count': """ Number of spin start retry attempts. Impact: - Indicates potential motor or bearing issues - May predict imminent mechanical failure - Increasing values suggest degrading drive health Recommended Actions: 1. Monitor for rapid increases 2. Check drive temperature 3. Plan replacement if count grows rapidly """, 'Power_On_Hours': """ Total number of hours the device has been powered on. 
Impact: - Normal aging metric - Used to gauge overall drive lifetime - Compare against manufacturer's MTBF rating Recommended Actions: 1. Compare to warranty period 2. Plan replacement if approaching rated lifetime """, 'Media_Wearout_Indicator': """ Percentage of drive's rated life remaining (SSDs). Impact: - 100 indicates new drive - 0 indicates exceeded rated writes - Critical for SSD lifecycle management Recommended Actions: 1. Plan replacement below 20% 2. Monitor write workload 3. Consider workload redistribution """, 'Temperature_Celsius': """ Current drive temperature. Impact: - High temperatures accelerate wear - Optimal range: 20-45°C - Sustained high temps reduce lifespan Recommended Actions: 1. Check system cooling 2. Verify airflow 3. Monitor for sustained high temperatures """, 'Available_Spare': """ Percentage of spare blocks remaining (SSDs). Impact: - Critical for SSD endurance - Low values indicate approaching end-of-life - Rapid decreases suggest excessive writes Recommended Actions: 1. Plan replacement if below 20% 2. Monitor write patterns 3. Consider workload changes """, 'Program_Fail_Count': """ Number of flash program operation failures. Impact: - Indicates NAND cell reliability - Important for SSD health assessment - Increasing values suggest flash degradation Recommended Actions: 1. Monitor rate of increase 2. Check firmware updates 3. Plan replacement if rapidly increasing """, 'Erase_Fail_Count': """ Number of flash erase operation failures. Impact: - Related to NAND block health - Critical for SSD reliability - High counts suggest failing flash blocks Recommended Actions: 1. Monitor count increases 2. Check firmware version 3. Plan replacement if count is high """, 'Load_Cycle_Count': """ Number of power cycles and head load/unload events. Impact: - Normal operation metric - High counts may indicate power management issues - Compare against rated cycles (typically 600k-1M) Recommended Actions: 1. Review power management settings 2. 
Monitor rate of increase 3. Plan replacement near rated limit """, 'Wear_Leveling_Count': """ SSD block erase distribution metric. Impact: - Indicates wear pattern uniformity - Interpretation varies by manufacturer - Critical for SSD longevity Recommended Actions: 1. Monitor trend over time 2. Compare with manufacturer baseline 3. Check workload distribution Note: Different manufacturers use different counting methods: - Some count up from 0 (Samsung, etc.) - Others count down from baseline (Ridata, etc.) - Always check manufacturer specifications """ } # ============================================================================= # INITIALIZATION # ============================================================================= def __init__(self, ticket_api_url: str = 'http://10.10.10.45/create_ticket_api.php', dry_run: bool = False, verbose: bool = False): """ Initialize the system health monitor. :param ticket_api_url: URL for the ticket creation API. :param dry_run: If True, simulate API calls without sending requests. :param verbose: If True, enable DEBUG-level logging output. """ # Set log verbosity if verbose: logger.setLevel(logging.DEBUG) for handler in logger.handlers: handler.setLevel(logging.DEBUG) logger.debug("Verbose logging enabled") # Load environment configuration first (API keys, etc.) self.load_env_config() self.ticket_api_url = ticket_api_url self.dry_run = dry_run # Ensure history directory exists os.makedirs(self.CONFIG['HISTORY_DIR'], exist_ok=True) # Drive details cache (per-run, cleared on next execution) self._drive_details_cache = {} # Health check tracking self._last_check_timestamp = None self._last_check_status = 'unknown' # Check tool availability at startup self._available_tools = self._check_tool_availability() def _check_tool_availability(self) -> Dict[str, bool]: """Check which external tools are available on this system. Returns a dict mapping tool names to availability booleans. 
Logs warnings for missing required tools and info for missing optional tools. """ required_tools = { 'smartctl': 'smartmontools', 'lsblk': 'util-linux', } optional_tools = { 'nvme': 'nvme-cli', 'ceph': 'ceph-common', 'pct': 'pve-container', 'dmidecode': 'dmidecode', 'proxmox-backup-manager': 'proxmox-backup-server', 'zpool': 'zfsutils-linux', } availability = {} for tool, package in required_tools.items(): available = shutil.which(tool) is not None availability[tool] = available if not available: logger.warning(f"Required tool '{tool}' not found (install: apt install {package})") for tool, package in optional_tools.items(): available = shutil.which(tool) is not None availability[tool] = available if not available: logger.debug(f"Optional tool '{tool}' not found (install: apt install {package})") return availability def _enforce_storage_limit(self, history_dir: str, max_bytes: int = None): """ Delete oldest history files if directory exceeds size limit. :param history_dir: Directory containing history files :param max_bytes: Maximum directory size in bytes (default from CONFIG) """ if max_bytes is None: max_bytes = self.CONFIG.get('HISTORY_MAX_BYTES', 52428800) if not os.path.exists(history_dir): return try: total_size = 0 files_with_mtime = [] # Calculate total size and collect file metadata for f in os.listdir(history_dir): filepath = os.path.join(history_dir, f) if f.startswith('smart_history_') and f.endswith('.json'): try: stat = os.stat(filepath) total_size += stat.st_size files_with_mtime.append((filepath, stat.st_mtime, stat.st_size)) except (IOError, OSError) as e: logger.debug(f"Could not stat file {filepath}: {e}") # If over limit, delete oldest files first if total_size > max_bytes: # Sort by modification time (oldest first) files_with_mtime.sort(key=lambda x: x[1]) logger.info(f"History directory size ({total_size} bytes) exceeds limit ({max_bytes} bytes), cleaning up...") for filepath, mtime, file_size in files_with_mtime: if total_size <= max_bytes: 
break try: os.remove(filepath) total_size -= file_size logger.info(f"Removed old history file {os.path.basename(filepath)} (saved {file_size} bytes)") except (IOError, OSError) as e: logger.warning(f"Could not remove history file {filepath}: {e}") except Exception as e: logger.error(f"Error enforcing storage limit: {e}") # ============================================================================= # HEALTH CHECK ENDPOINT # ============================================================================= def _start_health_server(self): """Start a lightweight HTTP health check endpoint as a daemon thread.""" from http.server import HTTPServer, BaseHTTPRequestHandler import threading monitor = self class HealthHandler(BaseHTTPRequestHandler): def do_GET(self): if self.path == '/health': response = { 'status': monitor._last_check_status, 'hostname': socket.gethostname(), 'last_check': monitor._last_check_timestamp, 'uptime': datetime.datetime.now().isoformat() } self.send_response(200) self.send_header('Content-Type', 'application/json') self.end_headers() self.wfile.write(json.dumps(response).encode()) else: self.send_response(404) self.end_headers() def log_message(self, format, *args): logger.debug(f"Health server: {format % args}") port = self.CONFIG.get('HEALTH_SERVER_PORT', 9102) try: server = HTTPServer(('', port), HealthHandler) thread = threading.Thread(target=server.serve_forever, daemon=True) thread.start() logger.info(f"Health check endpoint started on port {port}") except OSError as e: logger.warning(f"Could not start health server on port {port}: {e}") # ============================================================================= # MAIN EXECUTION METHODS # ============================================================================= def run(self): """Perform a one-shot health check of the system.""" try: # Perform health checks and gather the report health_report = self.perform_health_checks() # Track last check for health endpoint 
self._last_check_timestamp = datetime.datetime.now().isoformat() self._last_check_status = health_report.get('drives_health', {}).get('overall_status', 'unknown') # Create tickets for any detected critical issues self._create_tickets_for_issues(health_report) # Export Prometheus metrics if enabled if self.CONFIG.get('PROMETHEUS_ENABLED', False): self.write_prometheus_metrics(health_report) except Exception as e: import traceback logger.error(f"Unexpected error during health check: {e}") logger.error(traceback.format_exc()) def perform_health_checks(self) -> Dict[str, Any]: """Perform comprehensive system health checks and return a report.""" health_report = { 'hostname': socket.gethostname(), 'timestamp': datetime.datetime.now().isoformat(), 'drives_health': self._check_drives_health(), 'memory_health': self._check_memory_usage(), 'cpu_health': self._check_cpu_usage(), 'network_health': self._check_network_status(), 'ceph_health': self._check_ceph_health(), 'lxc_health': self._check_lxc_storage(), 'system_health': self._check_system_drive_indicators(), 'pbs_health': self._check_pbs_health() } if self.dry_run: logger.info("\n=== System Health Summary ===") logger.info(f"Overall Drive Health: {health_report['drives_health']['overall_status']}") # Summarized drive information with usage logger.info("\nDrive Status:") for drive in health_report['drives_health']['drives']: issues = drive.get('smart_issues', []) temp = f", {drive.get('temperature')}°C" if drive.get('temperature') else "" status = "⚠️ " if issues else "✓ " # Disk usage information usage_info = "" if drive.get('partitions'): for partition in drive['partitions']: usage_info += f"\n └─ {partition['mountpoint']}: {partition['used_space']}/{partition['total_space']} ({partition['usage_percent']}% used)" logger.info(f"{status}{drive['device']}{temp} - SMART: {drive['smart_status']}{usage_info}") if issues: logger.info(f" Issues: {', '.join(issues)}") logger.info(f"\nMemory: 
{health_report['memory_health']['memory_percent']}% used") if health_report['memory_health'].get('has_ecc'): logger.info("ECC Memory: Present") if health_report['memory_health'].get('ecc_errors'): logger.info(f"ECC Errors: {len(health_report['memory_health']['ecc_errors'])} found") logger.info(f"\nCPU Usage: {health_report['cpu_health']['cpu_usage_percent']}%") logger.info("\nNetwork Status:") logger.info(f"Management: {health_report['network_health']['management_network']['status']}") logger.info(f"Ceph Network: {health_report['network_health']['ceph_network']['status']}") # Ceph cluster status ceph = health_report.get('ceph_health', {}) if ceph.get('is_ceph_node'): logger.info("\nCeph Cluster Status:") logger.info(f" Cluster Health: {ceph.get('cluster_health', 'UNKNOWN')}") if ceph.get('cluster_usage'): usage = ceph['cluster_usage'] logger.info(f" Cluster Usage: {usage.get('usage_percent', 0):.1f}%") logger.info(f" OSDs: {len(ceph.get('osd_status', []))} total") down_osds = [o for o in ceph.get('osd_status', []) if o.get('status') == 'down'] if down_osds: logger.info(f" ⚠️ Down OSDs: {len(down_osds)}") if ceph.get('cluster_wide_issues'): logger.info(f" ⚠️ Cluster-wide issues: {len(ceph['cluster_wide_issues'])}") if ceph.get('issues'): logger.info(f" ⚠️ Node-specific issues: {len(ceph['issues'])}") if health_report['system_health']['issues']: logger.info(f"\nSystem Issues: {len(health_report['system_health']['issues'])} found") # PBS status pbs = health_report.get('pbs_health', {}) if pbs.get('is_pbs_node'): logger.info("\nPBS Status:") for pool in pbs.get('zfs_pools', []): logger.info(f" ZFS Pool '{pool['name']}': {pool['usage_percent']}% used ({pool['used']}/{pool['total']})") if pbs.get('failed_tasks'): logger.info(f" Failed tasks: {len(pbs['failed_tasks'])}") if pbs.get('issues'): logger.info(f" Issues: {len(pbs['issues'])}") logger.info("\n=== End Summary ===") return health_report # 
============================================================================= # ENHANCED SMART ANALYSIS METHODS # ============================================================================= def _analyze_smart_trends(self, device: str, current_attributes: dict) -> List[str]: """Analyze SMART attribute trends to predict failures.""" issues = [] # Create safe filename from device path device_safe = device.replace('/', '_').replace('-', '_') historical_file = os.path.join(self.CONFIG['HISTORY_DIR'], f"smart_history_{device_safe}.json") try: # Enforce storage limit before writing self._enforce_storage_limit(self.CONFIG['HISTORY_DIR']) # Load historical data with file locking history = [] if os.path.exists(historical_file) and os.path.getsize(historical_file) > 0: file_mode = 'r+' else: file_mode = 'w+' with open(historical_file, file_mode) as f: # Acquire exclusive lock fcntl.flock(f.fileno(), fcntl.LOCK_EX) try: # Read existing data if file is not empty if os.path.getsize(historical_file) > 0: f.seek(0) try: history = json.load(f) except json.JSONDecodeError as e: logger.warning(f"Corrupted history file {historical_file}, starting fresh: {e}") history = [] # Add current reading current_reading = { 'timestamp': datetime.datetime.now().isoformat(), 'attributes': current_attributes } history.append(current_reading) # Keep only recent data (30 days default) cutoff_date = datetime.datetime.now() - datetime.timedelta(days=self.CONFIG['HISTORY_RETENTION_DAYS']) history = [h for h in history if datetime.datetime.fromisoformat(h['timestamp']) > cutoff_date] # Analyze trends for critical attributes if len(history) >= 3: # Need at least 3 data points for trend analysis critical_attrs = ['Reallocated_Sector_Ct', 'Current_Pending_Sector', 'Reported_Uncorrect', 'Offline_Uncorrectable', 'Program_Fail_Count', 'Erase_Fail_Count'] for attr in critical_attrs: if attr in current_attributes: # Get last week's values recent_history = history[-7:] if len(history) >= 7 else history values = 
[h['attributes'].get(attr, 0) for h in recent_history] if len(values) >= 3: # Check for rapid increase recent_increase = values[-1] - values[0] if recent_increase > 0: rate = recent_increase / len(values) # Different thresholds for different attributes if attr in ['Reallocated_Sector_Ct', 'Current_Pending_Sector']: if rate > 0.5: # More than 0.5 sectors per check issues.append(f"TREND ALERT: Rapid increase in {attr}: +{recent_increase} in {len(values)} checks") elif attr in ['Reported_Uncorrect', 'Offline_Uncorrectable']: if rate > 0.2: # Any consistent increase is concerning issues.append(f"TREND ALERT: Increasing {attr}: +{recent_increase} in {len(values)} checks") else: # Program/Erase fail counts if rate > 1: # More than 1 error per check issues.append(f"TREND ALERT: Rapid increase in {attr}: +{recent_increase} in {len(values)} checks") # Write updated history atomically f.seek(0) f.truncate() json.dump(history, f, indent=2) f.flush() finally: # Release lock fcntl.flock(f.fileno(), fcntl.LOCK_UN) except (IOError, OSError) as e: logger.debug(f"I/O error analyzing trends for {device}: {e}") except Exception as e: logger.error(f"Unexpected error analyzing trends for {device}: {e}") return issues def _check_thermal_health(self, device: str, temperature: int, drive_type: str = 'HDD') -> List[str]: """Enhanced thermal health checking with drive-type specific thresholds.""" issues = [] if temperature is None: return issues # Drive-type specific temperature thresholds - ADJUSTED TO BE LESS SENSITIVE if drive_type == 'SSD': temp_thresholds = {'warning': 70, 'critical': 85, 'optimal_max': 65} else: # HDD temp_thresholds = {'warning': 65, 'critical': 75, 'optimal_max': 60} if temperature >= temp_thresholds['critical']: issues.append(f"CRITICAL: Drive temperature {temperature}°C exceeds safe operating limit for {drive_type}") elif temperature >= temp_thresholds['warning']: issues.append(f"WARNING: Drive temperature {temperature}°C approaching thermal limit for 
{drive_type}") elif temperature > temp_thresholds['optimal_max']: issues.append(f"INFO: Drive temperature {temperature}°C above optimal range for {drive_type}") return issues def _analyze_error_patterns(self, device: str, smart_output: str) -> List[str]: """Analyze SMART error logs for failure patterns.""" issues = [] # Pattern matching for different error types error_patterns = { 'media_errors': [ r'UNC_ERR', r'ABRT_ERR', r'read error', r'write error', r'medium error' ], 'interface_errors': [ r'ICRC_ERR', r'interface CRC error', r'SATA link down', r'communication failure' ], 'timeout_errors': [ r'command timeout', r'NCQ error', r'device fault', r'reset required' ] } for error_type, patterns in error_patterns.items(): error_count = 0 for pattern in patterns: matches = re.findall(pattern, smart_output, re.IGNORECASE) error_count += len(matches) if error_count > 0: if error_count >= 10: issues.append(f"CRITICAL: Multiple {error_type} detected ({error_count} occurrences)") elif error_count >= 3: issues.append(f"WARNING: {error_type} detected ({error_count} occurrences)") elif error_count >= 1: issues.append(f"INFO: {error_type} detected ({error_count} occurrences)") return issues def _check_ssd_health(self, device: str, smart_attributes: dict) -> List[str]: """SSD-specific health checks for wear and endurance.""" issues = [] # Check wear leveling and endurance indicators wear_indicators = [ 'Media_Wearout_Indicator', 'SSD_Life_Left', 'Percent_Lifetime_Remain', 'Available_Spare', 'Available_Spare_Threshold' ] for indicator in wear_indicators: if indicator in smart_attributes: value = smart_attributes[indicator] # Handle percentage-based indicators (countdown from 100) if indicator in ['Media_Wearout_Indicator', 'SSD_Life_Left', 'Percent_Lifetime_Remain', 'Available_Spare']: if value <= 5: issues.append(f"CRITICAL: {indicator} at {value}% - SSD near end of life") elif value <= 15: issues.append(f"WARNING: {indicator} at {value}% - SSD showing significant wear") elif 
value <= 30: issues.append(f"INFO: {indicator} at {value}% - SSD wear monitoring recommended") # Check for excessive bad blocks bad_block_indicators = [ 'Runtime_Bad_Block', 'Factory_Bad_Block_Ct', 'Grown_Failing_Block_Ct', 'End-to-End_Error' ] for indicator in bad_block_indicators: if indicator in smart_attributes: value = smart_attributes[indicator] if value > 100: issues.append(f"WARNING: High {indicator}: {value}") elif value > 10: issues.append(f"INFO: Elevated {indicator}: {value}") # Check write amplification and endurance metrics endurance_indicators = [ 'Total_LBAs_Written', 'Total_LBAs_Read', 'Host_Program_NAND_Pages_Count', 'FTL_Program_NAND_Pages_Count' ] # Calculate write amplification if both host and FTL write counts are available host_writes = smart_attributes.get('Host_Program_NAND_Pages_Count', 0) ftl_writes = smart_attributes.get('FTL_Program_NAND_Pages_Count', 0) if host_writes > 0 and ftl_writes > 0: write_amplification = ftl_writes / host_writes if write_amplification > 5.0: issues.append(f"WARNING: High write amplification factor: {write_amplification:.2f}") elif write_amplification > 3.0: issues.append(f"INFO: Elevated write amplification factor: {write_amplification:.2f}") return issues def _check_system_drive_indicators(self) -> Dict[str, Any]: """Check system logs and kernel messages for drive issues.""" system_health = { 'status': 'OK', 'issues': [] } try: # Check dmesg for drive-related errors (last 1000 lines to avoid overwhelming output) result = subprocess.run(['dmesg', '-T', '--level=err,warn'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=10) if result.returncode == 0: error_patterns = [ (r'ata\d+.*failed command', 'ATA command failures'), (r'sd \w+.*Medium Error', 'SCSI medium errors'), (r'Buffer I/O error', 'Buffer I/O errors'), (r'critical medium error', 'Critical medium errors'), (r'unrecovered read error', 'Unrecovered read errors'), (r'Current_Pending_Sector.*increased', 'Pending sector increases'), 
(r'ata\d+.*SError:', 'SATA errors'), (r'nvme\d+.*I/O error', 'NVMe I/O errors') ] for pattern, description in error_patterns: matches = re.findall(pattern, result.stdout, re.IGNORECASE) if matches: count = len(matches) if count >= 5: system_health['status'] = 'CRITICAL' system_health['issues'].append(f"CRITICAL: {description} in system logs ({count} occurrences)") elif count >= 2: if system_health['status'] != 'CRITICAL': system_health['status'] = 'WARNING' system_health['issues'].append(f"WARNING: {description} in system logs ({count} occurrences)") else: system_health['issues'].append(f"INFO: {description} in system logs ({count} occurrences)") except subprocess.TimeoutExpired: system_health['issues'].append("WARNING: System log check timed out") except Exception as e: logger.debug(f"Error checking system drive indicators: {e}") system_health['issues'].append(f"ERROR: Failed to check system logs: {str(e)}") return system_health # ============================================================================= # DRIVE HEALTH CHECKING METHODS # ============================================================================= def _get_drive_details(self, device: str) -> Dict[str, str]: """Get detailed drive information using smartctl (cached per run).""" if device in self._drive_details_cache: return self._drive_details_cache[device] drive_details = { 'model': None, 'serial': None, 'capacity': None, 'firmware': None, 'type': None, # SSD or HDD 'smart_capable': False } try: # First check if device supports SMART capability_result = subprocess.run( ['smartctl', '-i', device], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 ) # Check if smartctl failed completely if capability_result.returncode not in [0, 4]: # 0 = success, 4 = some SMART errors but readable logger.debug(f"smartctl failed for {device}: return code {capability_result.returncode}") return drive_details output = capability_result.stdout # Check if SMART is supported if "SMART support is: 
Enabled" in output or "SMART support is: Available" in output: drive_details['smart_capable'] = True elif "SMART support is: Unavailable" in output or "does not support SMART" in output: logger.debug(f"Device {device} does not support SMART") return drive_details for line in output.split('\n'): if 'Device Model' in line or 'Model Number' in line: drive_details['model'] = line.split(':')[1].strip() elif 'Serial Number' in line: drive_details['serial'] = line.split(':')[1].strip() elif 'User Capacity' in line: # Extract capacity from brackets capacity_match = re.search(r'\[(.*?)\]', line) if capacity_match: drive_details['capacity'] = capacity_match.group(1) elif 'Firmware Version' in line: drive_details['firmware'] = line.split(':')[1].strip() elif 'Rotation Rate' in line: if 'Solid State Device' in line: drive_details['type'] = 'SSD' else: drive_details['type'] = 'HDD' except Exception as e: logger.debug(f"Error getting drive details for {device}: {e}") self._drive_details_cache[device] = drive_details return drive_details def _get_issue_type(self, issue: str) -> str: """Determine issue type from issue description.""" if "SMART" in issue: return "SMART Health Issue" elif "Drive" in issue: return "Storage Issue" elif any(kw in issue for kw in ["Ceph", "OSD", "ceph"]): return "Ceph Cluster Issue" elif "ECC" in issue: return "Memory Issue" elif "CPU" in issue: return "Performance Issue" elif "Network" in issue: return "Network Issue" elif any(kw in issue for kw in ["LXC", "storage usage", "container"]): return "Container Storage Issue" return "Hardware Issue" def _get_impact_level(self, issue: str) -> str: """Determine impact level from issue description.""" issue_upper = issue.upper() # Check storage/CPU warnings first so "critical storage" isn't caught as Critical if any(kw in issue_upper for kw in ["STORAGE USAGE", "THRESHOLD", "CPU USAGE"]): return "[WARN] Warning - Action Needed Soon" if "CRITICAL" in issue_upper or "UNHEALTHY" in issue_upper or "HEALTH_ERR" in 
    def _generate_detailed_description(self, issue: str, health_report: Dict[str, Any], priority: str = '3') -> str:
        """Generate detailed ticket description with properly formatted ASCII art.

        Builds a plain-text ticket body out of fixed-width (80-column) box
        sections. Sections are appended conditionally based on keywords found
        in *issue* (SMART, Drive, Temperature, ECC, CPU, Network, LXC, Ceph,
        Disk), so one issue string may produce several sections.

        :param issue: Human-readable issue string from _detect_issues().
        :param health_report: Full health report dict; sections pull their
            data from its 'drives_health', 'cpu_health', 'network_health',
            'lxc_health' and 'ceph_health' subtrees.
        :param priority: Ticket priority '1'..'5' (only used for the banner label).
        :return: The assembled multi-line description string.
        """
        hostname = socket.gethostname()
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        priority_labels = {
            '1': '⚠ P1 - CRITICAL',
            '2': '⚠ P2 - HIGH',
            '3': '● P3 - MEDIUM',
            '4': '● P4 - NORMAL',
            '5': '● P5 - LOW',
        }
        priority_display = priority_labels.get(priority, '● P3 - MEDIUM')
        # Box width: all lines are exactly 80 chars
        # border lines: ┏ + 78 ━ + ┓ = 80
        # content lines: prefix + field_width + ┃ = 80
        box_width = 78
        banner = f"""
┏{'━' * box_width}┓
┃{' HARDWARE MONITORING ALERT TICKET '.center(box_width)}┃
┣{'━' * box_width}┫
┃ Host : {hostname:<{box_width - 14}}┃
┃ Generated : {timestamp:<{box_width - 14}}┃
┃ Priority : {priority_display:<{box_width - 14}}┃
┗{'━' * box_width}┛"""
        issue_type = self._get_issue_type(issue)
        impact_level = self._get_impact_level(issue)
        executive_summary = f"""
┏━ EXECUTIVE SUMMARY {'━' * (box_width - 20)}┓
┃ Issue Type │ {issue_type:<60}┃
┃ Impact Level │ {impact_level:<60}┃
┗{'━' * box_width}┛"""
        description = banner + executive_summary
        # Add relevant SMART descriptions
        # (SMART_DESCRIPTIONS: class-level mapping attribute-name -> prose)
        for attr in self.SMART_DESCRIPTIONS:
            if attr in issue:
                description += f"\n{attr}:\n{textwrap.dedent(self.SMART_DESCRIPTIONS[attr]).strip()}\n"
        if "SMART" in issue:
            description += "\n" + textwrap.dedent("""
                SMART (Self-Monitoring, Analysis, and Reporting Technology) Attribute Details:
                - Possible drive failure!
            """).strip() + "\n"
        if "Drive" in issue and ("has SMART issues" in issue or "temperature is high" in issue):
            try:
                # Issue strings carry the drive serial (or device path for
                # non-SMART devices); recover it to locate the drive record.
                serial_match = re.search(r'Drive (\S+) (?:has SMART issues|temperature is high)', issue)
                drive_id = serial_match.group(1) if serial_match else None
                # Find drive_info by matching serial (or device path as fallback)
                device = None
                drive_info = None
                for d in health_report['drives_health']['drives']:
                    dd = self._get_drive_details(d['device'])
                    if (dd.get('serial') or d['device']) == drive_id:
                        device = d['device']
                        drive_info = d
                        break
                if drive_info:
                    drive_details = self._get_drive_details(device)
                    smart_data = {
                        'attributes': drive_info.get('smart_attributes', {}),
                        'performance_metrics': drive_info.get('performance_metrics', {}),
                        'last_test_date': drive_info.get('last_test_date', 'N/A')
                    }
                    power_on_hours = smart_data['attributes'].get('Power_On_Hours', 'N/A')
                    last_test_date = smart_data.get('last_test_date', 'N/A')
                    # Convert power-on hours to a rough human age
                    # (365-day years, 30-day months).
                    if power_on_hours != 'N/A' and isinstance(power_on_hours, (int, float)):
                        total_days = power_on_hours / 24
                        years = int(total_days / 365)
                        months = int((total_days % 365) / 30)
                        if years >= 1:
                            age = f"{years} year{'s' if years != 1 else ''}, {months} month{'s' if months != 1 else ''}"
                        elif months >= 1:
                            age = f"{months} month{'s' if months != 1 else ''}"
                        else:
                            age = "< 1 month"
                    else:
                        age = 'N/A'
                    # Ensure all values are properly formatted strings
                    device_safe = device or 'N/A'
                    model_safe = drive_details.get('model') or 'N/A'
                    serial_safe = drive_details.get('serial') or 'N/A'
                    capacity_safe = drive_details.get('capacity') or 'N/A'
                    type_safe = drive_details.get('type') or 'N/A'
                    firmware_safe = drive_details.get('firmware') or 'N/A'
                    description += f"""
┏━ DRIVE SPECIFICATIONS {'━' * (box_width - 23)}┓
┃ Device Path │ {device_safe:<61}┃
┃ Model │ {model_safe:<61}┃
┃ Serial │ {serial_safe:<61}┃
┃ Capacity │ {capacity_safe:<61}┃
┃ Type │ {type_safe:<61}┃
┃ Firmware │ {firmware_safe:<61}┃
┗{'━' * box_width}┛
"""
                    power_on_safe = f"{power_on_hours} hours" if power_on_hours != 'N/A' else 'N/A'
                    last_test_safe = last_test_date or 'N/A'
                    age_safe = age or 'N/A'
                    description += f"""
┏━ DRIVE TIMELINE {'━' * (box_width - 17)}┓
┃ Power-On Hours │ {power_on_safe:<56}┃
┃ Last SMART Test │ {last_test_safe:<56}┃
┃ Drive Age │ {age_safe:<56}┃
┗{'━' * box_width}┛
"""
                    smart_status_safe = drive_info.get('smart_status') or 'N/A'
                    # Properly handle temperature with None check
                    temp_value = drive_info.get('temperature')
                    temp_safe = f"{temp_value}°C" if temp_value is not None else 'N/A'
                    description += f"""
┏━ SMART STATUS {'━' * (box_width - 15)}┓
┃ Status │ {smart_status_safe:<62}┃
┃ Temperature │ {temp_safe:<62}┃
┗{'━' * box_width}┛
"""
                    if drive_info.get('smart_attributes'):
                        description += f"\n┏━ SMART ATTRIBUTES {'━' * (box_width - 19)}┓\n"
                        for attr, value in drive_info['smart_attributes'].items():
                            attr_safe = str(attr).replace('_', ' ') if attr else 'Unknown'
                            value_safe = str(value) if value is not None else 'N/A'
                            description += f"┃ {attr_safe:<27} │ {value_safe:<46}┃\n"
                        description += f"┗{'━' * box_width}┛\n"
                    if drive_info.get('partitions'):
                        for partition in drive_info['partitions']:
                            usage_percent = partition.get('usage_percent', 0)
                            # Create 50-char usage meter (2% per block)
                            blocks = int(usage_percent / 2)
                            usage_meter = '█' * blocks + '░' * (50 - blocks)
                            mountpoint_safe = partition.get('mountpoint') or 'N/A'
                            fstype_safe = partition.get('fstype') or 'N/A'
                            total_space_safe = partition.get('total_space') or 'N/A'
                            used_space_safe = partition.get('used_space') or 'N/A'
                            free_space_safe = partition.get('free_space') or 'N/A'
                            usage_pct_str = f"{usage_percent}%"
                            # Truncate mountpoint if too long for header
                            mountpoint_display = mountpoint_safe[:50] if len(mountpoint_safe) > 50 else mountpoint_safe
                            description += f"""
┏━ PARTITION: {mountpoint_display} {'━' * (box_width - 14 - len(mountpoint_display))}┓
┃ Filesystem │ {fstype_safe:<61}┃
┃ Usage Meter │ {usage_meter} {usage_pct_str:>10}┃
┃ Total Space │ {total_space_safe:<61}┃
┃ Used Space │ {used_space_safe:<61}┃
┃ Free Space │ {free_space_safe:<61}┃
┗{'━' * box_width}┛
"""
                    # _check_disk_firmware is defined elsewhere in this class;
                    # it flags firmware revisions with known field problems.
                    firmware_info = self._check_disk_firmware(device)
                    if firmware_info['is_problematic']:
                        description += f"\n┏━ FIRMWARE ALERTS {'━' * (box_width - 18)}┓\n"
                        for issue_item in firmware_info['known_issues']:
                            issue_safe = str(issue_item) if issue_item else 'Unknown issue'
                            description += f"┃ ⚠ {issue_safe:<{box_width - 4}}┃\n"
                        description += f"┗{'━' * box_width}┛\n"
            except Exception as e:
                # Never let a formatting failure block ticket creation —
                # degrade to an inline error note instead.
                description += f"\nError generating drive details: {str(e)}\n"
        if "Temperature" in issue:
            description += "\n" + textwrap.dedent("""
                High drive temperatures can:
                - Reduce drive lifespan
                - Cause performance degradation
                - Lead to data corruption in extreme cases
                Optimal temperature range: 20-45°C
            """).strip() + "\n"
        if "ECC" in issue:
            description += "\n" + textwrap.dedent("""
                ECC (Error Correction Code) Memory Issues:
                - Correctable: Memory errors that were successfully fixed
                - Uncorrectable: Serious memory errors that could not be corrected
                Frequent ECC corrections may indicate degrading memory modules
            """).strip() + "\n"
        if "CPU" in issue:
            description += "\n" + textwrap.dedent("""
                High CPU usage sustained over time can indicate:
                - Resource constraints
                - Runaway processes
                - Need for performance optimization
                - Potential cooling issues
            """).strip() + "\n"
            # Add CPU STATUS box
            cpu_health = health_report.get('cpu_health', {})
            cpu_usage = cpu_health.get('cpu_usage_percent', 'N/A')
            cpu_threshold = self.CONFIG['THRESHOLDS']['CPU_WARNING']
            cpu_status = cpu_health.get('status', 'N/A')
            cpu_usage_str = f"{cpu_usage}%" if isinstance(cpu_usage, (int, float)) else cpu_usage
            description += f"""
┏━ CPU STATUS {'━' * (box_width - 13)}┓
┃ Usage │ {cpu_usage_str:<61}┃
┃ Threshold │ {str(cpu_threshold) + '%':<61}┃
┃ Status │ {cpu_status:<61}┃
┗{'━' * box_width}┛
"""
        if "Network" in issue:
            description += "\n" + textwrap.dedent("""
                Network connectivity issues can impact:
                - Cluster communication
                - Data replication
                - Service availability
                - Management access
            """).strip() + "\n"
            # Add NETWORK STATUS box
            net_health = health_report.get('network_health', {})
            mgmt = net_health.get('management_network', {})
            ceph_net = net_health.get('ceph_network', {})
            mgmt_status = mgmt.get('status', 'N/A')
            ceph_status = ceph_net.get('status', 'N/A')
            mgmt_latency = mgmt.get('latency')
            mgmt_latency_str = f"{mgmt_latency}ms" if mgmt_latency is not None else 'N/A'
            mgmt_issues = mgmt.get('issues', [])
            ceph_issues = ceph_net.get('issues', [])
            all_net_issues = mgmt_issues + ceph_issues
            issues_str = '; '.join(all_net_issues) if all_net_issues else 'None'
            # Truncate issues string to fit in box
            if len(issues_str) > 61:
                issues_str = issues_str[:58] + '...'
            description += f"""
┏━ NETWORK STATUS {'━' * (box_width - 17)}┓
┃ Management │ {mgmt_status:<61}┃
┃ Ceph Network │ {ceph_status:<61}┃
┃ Latency │ {mgmt_latency_str:<61}┃
┃ Issues │ {issues_str:<61}┃
┗{'━' * box_width}┛
"""
        if any(kw in issue for kw in ["LXC", "storage usage", "container"]):
            # Add CONTAINER STORAGE box
            lxc_health = health_report.get('lxc_health', {})
            containers = lxc_health.get('containers', [])
            for container in containers:
                vmid = container.get('vmid', 'N/A')
                for fs in container.get('filesystems', []):
                    mountpoint = fs.get('mountpoint', 'N/A')
                    usage_pct = fs.get('usage_percent', 0)
                    total_bytes = fs.get('total_space', 0)
                    used_bytes = fs.get('used_space', 0)
                    avail_bytes = fs.get('available', 0)
                    # Only show filesystems relevant to this issue
                    if mountpoint not in issue and vmid not in issue:
                        continue
                    total_str = self._format_bytes_human(total_bytes) if isinstance(total_bytes, (int, float)) else str(total_bytes)
                    used_str = self._format_bytes_human(used_bytes) if isinstance(used_bytes, (int, float)) else str(used_bytes)
                    free_str = self._format_bytes_human(avail_bytes) if isinstance(avail_bytes, (int, float)) else str(avail_bytes)
                    # Create 50-char usage meter (2% per block)
                    blocks = int(usage_pct / 2)
                    usage_meter = '█' * blocks + '░' * (50 - blocks)
                    usage_pct_str = f"{usage_pct:.1f}%"
                    description += f"""
┏━ CONTAINER STORAGE {'━' * (box_width - 20)}┓
┃ VMID │ {vmid:<61}┃
┃ Mountpoint │ {mountpoint:<61}┃
┃ Usage Meter │ {usage_meter} {usage_pct_str:>10}┃
┃ Total │ {total_str:<61}┃
┃ Used │ {used_str:<61}┃
┃ Free │ {free_str:<61}┃
┗{'━' * box_width}┛
"""
        if any(kw in issue for kw in ["Ceph", "OSD", "ceph", "HEALTH_ERR", "HEALTH_WARN"]):
            # Add CEPH CLUSTER STATUS box
            ceph_health = health_report.get('ceph_health', {})
            if ceph_health.get('is_ceph_node'):
                cluster_health = ceph_health.get('cluster_health', 'N/A')
                cluster_usage = ceph_health.get('cluster_usage', {})
                usage_pct = cluster_usage.get('usage_percent', 'N/A') if cluster_usage else 'N/A'
                total_bytes = cluster_usage.get('total_bytes', 0) if cluster_usage else 0
                used_bytes = cluster_usage.get('used_bytes', 0) if cluster_usage else 0
                total_str = self._format_bytes_human(total_bytes) if total_bytes else 'N/A'
                used_str = self._format_bytes_human(used_bytes) if used_bytes else 'N/A'
                usage_pct_str = f"{usage_pct}%" if isinstance(usage_pct, (int, float)) else usage_pct
                osd_list = ceph_health.get('osd_status', [])
                osd_total = len(osd_list)
                osd_up = sum(1 for o in osd_list if o.get('status') == 'up')
                osd_summary = f"{osd_up}/{osd_total} up" if osd_total > 0 else 'N/A'
                description += f"""
┏━ CEPH CLUSTER STATUS {'━' * (box_width - 22)}┓
┃ Health │ {cluster_health:<61}┃
┃ Usage │ {usage_pct_str:<61}┃
┃ Total │ {total_str:<61}┃
┃ Used │ {used_str:<61}┃
┃ OSDs │ {osd_summary:<61}┃
┗{'━' * box_width}┛
"""
        if "Disk" in issue:
            # NOTE(review): partition.get('mountpoint') may be None here, and
            # `None in issue` raises TypeError — confirm drive records always
            # carry a mountpoint before relying on this branch.
            for partition in health_report.get('drives_health', {}).get('drives', []):
                if partition.get('mountpoint') in issue:
                    description += "\n=== Disk Metrics ===\n"
                    description += f"Disk Device: {partition['device']}\n"
                    description += f"Mount Point: {partition['mountpoint']}\n"
                    description += f"Total Space: {partition['total_space']}\n"
                    description += f"Used Space: {partition['used_space']}\n"
                    description += f"Free Space: {partition['free_space']}\n"
                    description += f"Usage Percent: {partition['usage_percent']}%\n"
        return description
description += f"Usage Percent: {partition['usage_percent']}%\n" return description def _count_critical_issues(self, health_report: Dict[str, Any]) -> int: """Count total critical issues across all health checks for P1 escalation.""" count = 0 # Manufacturer operation counters to exclude (same as in _detect_issues) manufacturer_counters = [ 'Seek_Error_Rate', 'Command_Timeout', 'Raw_Read_Error_Rate' ] # Count drive failures for drive in health_report.get('drives_health', {}).get('drives', []): if drive.get('smart_status') == 'UNHEALTHY': count += 1 # Only count critical issues that aren't manufacturer operation counters for issue in drive.get('smart_issues', []): if 'critical' in issue.lower(): # Skip manufacturer operation counters if not any(counter in issue for counter in manufacturer_counters): count += 1 # Count ECC errors if health_report.get('memory_health', {}).get('status') == 'CRITICAL': count += 1 # Count network failures net = health_report.get('network_health', {}) if net.get('management_network', {}).get('status') == 'CRITICAL': count += 1 if net.get('ceph_network', {}).get('status') == 'CRITICAL': count += 1 # Count LXC critical issues if health_report.get('lxc_health', {}).get('status') == 'CRITICAL': count += 1 return count def _determine_ticket_priority(self, issue: str, health_report: Dict[str, Any]) -> str: """ Determine ticket priority based on issue type, severity, and context. 
P1 = Cluster outages, multiple simultaneous failures P2 = Hardware failures requiring same-day response P3 = Warnings requiring response within 1-3 days P4 = Normal monitoring alerts P5 = Informational/minimal impact """ issue_lower = issue.lower() # Count total critical issues for escalation logic critical_count = self._count_critical_issues(health_report) escalation_threshold = self.CONFIG.get('PRIORITY_ESCALATION_THRESHOLD', 3) # P1 - Multiple simultaneous critical failures (cluster risk) if critical_count >= escalation_threshold: logger.info(f"P1 escalation triggered: {critical_count} critical issues detected") return self.PRIORITIES['CRITICAL'] # P1 # P1 - Specific cluster-affecting scenarios if any(keyword in issue_lower for keyword in [ 'raid degraded', 'multiple drive', 'both networks unreachable', 'health_err' # Ceph cluster error ]): return self.PRIORITIES['CRITICAL'] # P1 # P2 - Hardware failures requiring same-day response if any(keyword in issue_lower for keyword in [ 'smart failure', 'smart overall health check failed', 'drive failure', 'disk failure', 'uncorrectable ecc', 'hardware failure', 'critical temperature', 'firmware issue', 'reallocated_sector', 'pending_sector', 'offline_uncorrectable', 'critical available_spare', 'critical wear', 'critical reallocated', 'critical current_pending', 'network is unreachable', 'osd is down', 'osd down', # Ceph OSD down 'cluster usage critical', # Ceph usage critical 'zfs pool', 'backup failed', # PBS critical issues 'usage critical' # PBS ZFS critical usage ]): return self.PRIORITIES['HIGH'] # P2 # P2 - Ceph OSD issues (need to check explicitly since 'down' is in issue text) if '[ceph]' in issue_lower and 'down' in issue_lower: return self.PRIORITIES['HIGH'] # P2 # P2 - SMART issues with critical indicators if 'smart issues' in issue_lower and any(error_type in issue_lower for error_type in [ 'critical', 'failed', 'reallocated', 'pending', 'uncorrectable', 'offline' ]): return self.PRIORITIES['HIGH'] # P2 # P3 
- Warnings requiring attention within days if any(keyword in issue_lower for keyword in [ 'warning', 'high temperature', 'correctable ecc', 'trend alert', 'critical storage usage', 'low available_spare', 'high wear', 'health_warn', 'cluster usage warning', # Ceph warnings 'gc failed', 'sync failed', 'usage high' # PBS warnings ]): return self.PRIORITIES['MEDIUM'] # P3 # P4 - Normal monitoring alerts if any(keyword in issue_lower for keyword in [ 'cpu usage', 'high storage usage', 'system log', 'drive age' ]): return self.PRIORITIES['NORMAL'] # P4 # P5 - Informational/minimal impact if any(keyword in issue_lower for keyword in [ 'info:', 'info ', 'above optimal', 'monitor only' ]): return self.PRIORITIES['LOW'] # P5 # Default to P3 for unknown issues (conservative approach) return self.PRIORITIES['MEDIUM'] def _categorize_issue(self, issue: str) -> tuple: """ Determine the correct category, type, and tags for an issue. Returns: tuple: (category, ticket_type, issue_tag, ticket_type_tag) - category: 'Hardware', 'Software', 'Network', etc. - ticket_type: 'Issue', 'Problem', 'Task', 'Maintenance', etc. - issue_tag: '[hardware]', '[software]', '[network]' - ticket_type_tag: '[issue]', '[problem]', etc. 
""" issue_lower = issue.lower() # Hardware Issues - Physical hardware problems if any(keyword in issue_lower for keyword in [ 'smart', 'drive', 'disk', '/dev/', 'sector', 'temperature', 'firmware', 'power_on_hours', 'reallocated', 'pending', 'ecc', 'memory', 'high_fly_writes', 'spin_retry', 'current_pending', 'nvme' ]): # SMART errors/failures are issues (unplanned degradation) if any(error in issue_lower for error in ['critical', 'failed', 'failure', 'error']): return ( self.TICKET_CATEGORIES['HARDWARE'], self.TICKET_TYPES['ISSUE'], self.TICKET_TEMPLATES['HARDWARE_TYPE']['HARDWARE'], self.TICKET_TEMPLATES['TICKET_TYPE']['ISSUE'] ) # SMART warnings are problems (need investigation) else: return ( self.TICKET_CATEGORIES['HARDWARE'], self.TICKET_TYPES['PROBLEM'], self.TICKET_TEMPLATES['HARDWARE_TYPE']['HARDWARE'], self.TICKET_TEMPLATES['TICKET_TYPE']['PROBLEM'] ) # Software Issues - Application/OS/Container issues if any(keyword in issue_lower for keyword in [ 'lxc', 'container', 'storage usage', 'cpu usage', 'process', 'application', 'service', 'daemon' ]): # Critical storage/CPU is an issue (service degradation) if 'critical' in issue_lower: return ( self.TICKET_CATEGORIES['SOFTWARE'], self.TICKET_TYPES['ISSUE'], self.TICKET_TEMPLATES['SOFTWARE_TYPE']['SOFTWARE'], self.TICKET_TEMPLATES['TICKET_TYPE']['ISSUE'] ) # Warning level is a problem (needs investigation before it becomes critical) else: return ( self.TICKET_CATEGORIES['SOFTWARE'], self.TICKET_TYPES['PROBLEM'], self.TICKET_TEMPLATES['SOFTWARE_TYPE']['SOFTWARE'], self.TICKET_TEMPLATES['TICKET_TYPE']['PROBLEM'] ) # Network Issues - Network connectivity/infrastructure (categorized as Hardware) if any(keyword in issue_lower for keyword in [ 'network', 'connectivity', 'unreachable', 'latency', 'packet loss', 'interface', 'link down' ]): # Network failures are issues if any(error in issue_lower for error in ['failure', 'down', 'unreachable', 'critical']): return ( self.TICKET_CATEGORIES['HARDWARE'], 
self.TICKET_TYPES['ISSUE'], self.TICKET_TEMPLATES['NETWORK_TYPE']['NETWORK'], self.TICKET_TEMPLATES['TICKET_TYPE']['ISSUE'] ) # Network warnings are problems else: return ( self.TICKET_CATEGORIES['HARDWARE'], self.TICKET_TYPES['PROBLEM'], self.TICKET_TEMPLATES['NETWORK_TYPE']['NETWORK'], self.TICKET_TEMPLATES['TICKET_TYPE']['PROBLEM'] ) # Ceph Issues - Storage cluster issues (categorized as Hardware) if any(keyword in issue_lower for keyword in [ 'ceph', 'osd', 'health_err', 'health_warn', 'cluster usage' ]): # Ceph errors are issues (unplanned degradation) if any(error in issue_lower for error in [ 'health_err', 'down', 'critical', 'error' ]): return ( self.TICKET_CATEGORIES['HARDWARE'], self.TICKET_TYPES['ISSUE'], '[ceph]', self.TICKET_TEMPLATES['TICKET_TYPE']['ISSUE'] ) # Ceph warnings are problems (need investigation) else: return ( self.TICKET_CATEGORIES['HARDWARE'], self.TICKET_TYPES['PROBLEM'], '[ceph]', self.TICKET_TEMPLATES['TICKET_TYPE']['PROBLEM'] ) # PBS Issues - Backup server issues (categorized as Hardware for storage, Software for tasks) if any(keyword in issue_lower for keyword in [ 'pbs', 'zfs pool', 'backup failed', 'gc failed', 'sync failed' ]): if any(error in issue_lower for error in [ 'degraded', 'critical', 'failed', 'errors' ]): return ( self.TICKET_CATEGORIES['HARDWARE'], self.TICKET_TYPES['ISSUE'], '[pbs]', self.TICKET_TEMPLATES['TICKET_TYPE']['ISSUE'] ) else: return ( self.TICKET_CATEGORIES['HARDWARE'], self.TICKET_TYPES['PROBLEM'], '[pbs]', self.TICKET_TEMPLATES['TICKET_TYPE']['PROBLEM'] ) # Default: Hardware Problem (for undefined cases) return ( self.TICKET_CATEGORIES['HARDWARE'], self.TICKET_TYPES['PROBLEM'], self.TICKET_TEMPLATES['HARDWARE_TYPE']['HARDWARE'], self.TICKET_TEMPLATES['TICKET_TYPE']['PROBLEM'] ) # ============================================================================= # TICKET CREATION METHODS # ============================================================================= def _create_tickets_for_issues(self, 
    # =========================================================================
    # TICKET CREATION METHODS
    # =========================================================================

    def _create_tickets_for_issues(self, health_report: Dict[str, Any]):
        """Create tickets for detected issues.

        Runs _detect_issues(), then for each issue derives a priority,
        category and a deduplication-stable title, and POSTs a ticket to the
        configured API (or just logs the payload in dry-run mode).

        :param health_report: Full health report produced by the check methods.
        """
        issues = self._detect_issues(health_report)
        if not issues:
            logger.info("No issues detected.")
            return
        hostname = socket.gethostname()
        action_type = self.TICKET_TEMPLATES['ACTION_TYPE']
        environment = self.TICKET_TEMPLATES['ENVIRONMENT']
        for issue in issues:
            # Use the comprehensive priority determination function
            priority = self._determine_ticket_priority(issue, health_report)
            # Get proper categorization for this issue
            category, ticket_type, issue_tag, ticket_type_tag = self._categorize_issue(issue)
            # Determine scope: cluster-wide for Ceph cluster issues, single-node otherwise
            is_cluster_wide = '[cluster-wide]' in issue
            scope = self.TICKET_TEMPLATES['SCOPE']['CLUSTER_WIDE'] if is_cluster_wide else self.TICKET_TEMPLATES['SCOPE']['SINGLE_NODE']
            # Clean issue text for title (remove [cluster-wide] and [ceph] markers if present)
            clean_issue = issue
            if is_cluster_wide:
                clean_issue = clean_issue.replace('[cluster-wide] ', '').replace('[cluster-wide]', '')
            # Remove [ceph] marker since _categorize_issue adds it as issue_tag
            clean_issue = clean_issue.replace('[ceph] ', '').replace('[ceph]', '')
            # Strip ever-changing SMART counters from the title so the title stays
            # stable across runs and doesn't trigger hourly "Title updated" comment spam.
            # The counter values are already captured in the ticket description.
            clean_issue = re.sub(r':\s*(?:Warning|Critical)\s+\w+:\s*\d+', '', clean_issue).strip(': ').strip()
            # Extract drive capacity if this is a drive-related issue.
            # Issue strings now use serial numbers; find the matching drive by serial.
            drive_size = ""
            issue_serial = None
            if "Drive" in issue and ("has SMART issues" in issue or "temperature is high" in issue):
                serial_match = re.search(r'Drive (\S+) (?:has SMART issues|temperature is high)', issue)
                if serial_match:
                    issue_serial = serial_match.group(1)
                    # Find the device path for this serial via the details cache
                    matched_device = None
                    for d in health_report.get('drives_health', {}).get('drives', []):
                        dd = self._get_drive_details(d['device'])
                        if (dd.get('serial') or d['device']) == issue_serial:
                            matched_device = d['device']
                            break
                    if matched_device:
                        drive_details = self._get_drive_details(matched_device)
                        if drive_details['capacity']:
                            drive_size = f"[{drive_details['capacity']}] "
                    else:
                        logger.warning(f"Could not find device for drive id '{issue_serial}' in issue: {issue}")
            # Build ticket title with proper categorization
            # Add space after issue_tag if drive_size is empty (for non-drive issues)
            issue_separator = drive_size if drive_size else " "
            # Use cluster name for cluster-wide issues instead of individual hostname
            # This ensures all nodes generate the same ticket title for deduplication
            cluster_name = self.CONFIG.get('CLUSTER_NAME', 'proxmox-cluster')
            ticket_source = f"[{cluster_name}]" if is_cluster_wide else f"[{hostname}]"
            ticket_title = (
                f"{ticket_source}"
                f"{action_type['AUTO']}"
                f"{issue_tag}"
                f"{issue_separator}"
                f"{clean_issue}"
                f"{scope}"
                f"{environment['PRODUCTION']}"
                f"{ticket_type_tag}"
            )
            description = self._generate_detailed_description(issue, health_report, priority)
            # NOTE: The ticket API (create_ticket_api.php) deduplicates using a SHA-256 hash of:
            # issue_category + environment_tags + hostname (excluded for [cluster-wide]) + serial
            # Serial is preferred over device path — it remains stable across reboots and
            # device-letter reassignments. Falls back to /dev/sdX for non-SMART-capable devices.
            ticket_payload = {
                "title": ticket_title,
                "description": description,
                "priority": priority,
                "status": "Open",
                "category": category,
                "type": ticket_type,
                "serial": issue_serial,  # drive serial for stable dedup; None for non-drive issues
            }
            if self.dry_run:
                logger.info("Dry-run mode enabled. Simulating ticket creation:")
                logger.info(json.dumps(ticket_payload, indent=4))
            else:
                try:
                    response = requests.post(
                        self.ticket_api_url,
                        json=ticket_payload,
                        headers={
                            'Content-Type': 'application/json',
                            'Authorization': f'Bearer {self.CONFIG["TICKET_API_KEY"]}'
                        },
                        timeout=10  # 10 second timeout for API calls
                    )
                    try:
                        response_data = response.json()
                    except json.JSONDecodeError as e:
                        logger.error(f"Invalid JSON response from ticket API: {e}")
                        continue
                    if response_data.get('success'):
                        logger.info(f"Ticket created successfully: {ticket_title}")
                        logger.info(f"Ticket ID: {response_data.get('ticket_id')}")
                    elif response_data.get('error') == 'Duplicate ticket':
                        # Duplicate is the API's dedup working as intended — not an error
                        logger.info(f"Duplicate ticket detected - existing ticket ID: {response_data.get('existing_ticket_id')}")
                        continue
                    else:
                        logger.error(f"Failed to create ticket: {response_data.get('error')}")
                except Exception as e:
                    # Network/API failures must never abort the monitoring run
                    logger.error(f"Error creating ticket: {e}")
""" issues = [] # Check for drive-related issues for drive in health_report.get('drives_health', {}).get('drives', []): # Skip drives with ERROR or NOT_SUPPORTED status - these are likely virtual/unsupported devices if drive.get('smart_status') in ['ERROR', 'NOT_SUPPORTED']: logger.debug(f"Skipping issue detection for drive {drive['device']} with status {drive.get('smart_status')}") continue # Only report issues for drives with valid SMART status if drive.get('smart_issues') and drive.get('smart_status') in ['HEALTHY', 'UNHEALTHY', 'UNKNOWN', 'REPLACEMENT_NEEDED']: # Filter out generic error messages and manufacturer-specific false positives filtered_issues = [] for issue in drive['smart_issues']: # Skip generic errors if any(skip_phrase in issue for skip_phrase in [ "Error checking SMART:", "Unable to read device information", "SMART not supported", "timed out" ]): continue # Skip manufacturer-specific operation counters (not actual errors) # These are monitored attributes that manufacturers use as counters if any(counter_name in issue for counter_name in [ "Seek_Error_Rate", # Seagate/WD use as operation counter "Command_Timeout", # OOS/Seagate use as operation counter "Raw_Read_Error_Rate" # Seagate/WD use as operation counter ]): logger.debug(f"Filtering manufacturer operation counter from issues: {issue}") continue filtered_issues.append(issue) if filtered_issues: drive_details = self._get_drive_details(drive['device']) drive_id = drive_details.get('serial') or drive['device'] issues.append(f"Drive {drive_id} has SMART issues: {', '.join(filtered_issues)}") # Check temperature regardless of SMART status if drive.get('temperature') and drive['temperature'] > self.CONFIG['THRESHOLDS']['TEMPERATURE_WARNING']: drive_details = self._get_drive_details(drive['device']) drive_id = drive_details.get('serial') or drive['device'] issues.append(f"Drive {drive_id} temperature is high: {drive['temperature']}°C") # Check for ECC memory errors memory_health = 
health_report.get('memory_health', {}) if memory_health.get('has_ecc') and memory_health.get('ecc_errors'): issues.extend(memory_health['ecc_errors']) # Check for CPU-related issues cpu_health = health_report.get('cpu_health', {}) if cpu_health and cpu_health.get('cpu_usage_percent', 0) > self.CONFIG['THRESHOLDS']['CPU_WARNING']: issues.append("CPU usage is above threshold of 95%") # Check for network-related issues network_health = health_report.get('network_health', {}) for network in ['management_network', 'ceph_network']: if network_health.get(network, {}).get('issues'): issues.extend(network_health[network]['issues']) lxc_health = health_report.get('lxc_health', {}) if lxc_health.get('status') in ['WARNING', 'CRITICAL']: issues.extend(lxc_health.get('issues', [])) # Check for system-level drive issues system_health = health_report.get('system_health', {}) if system_health.get('issues'): issues.extend(system_health['issues']) # Check for Ceph cluster issues ceph_health = health_report.get('ceph_health', {}) if ceph_health.get('is_ceph_node'): hostname = socket.gethostname() designated_node = self.CONFIG.get('CEPH_TICKET_NODE') # Cluster-wide issues: only create tickets from designated node (or first node if not set) # The [cluster-wide] tag + CLUSTER_NAME in ticket title ensures cross-node deduplication # in the tinker_tickets API (dedup hash excludes hostname for cluster-wide issues) if ceph_health.get('cluster_wide_issues'): # If no designated node, all nodes can report (API deduplicates) # If designated node is set, only that node creates tickets if not designated_node or hostname == designated_node: for issue in ceph_health['cluster_wide_issues']: # Add [cluster-wide] marker for API deduplication issues.append(f"[cluster-wide] [ceph] {issue}") else: logger.debug(f"Skipping cluster-wide Ceph issues (designated node: {designated_node})") # Node-specific issues: always report from the affected node if ceph_health.get('issues'): for issue in 
ceph_health['issues']: issues.append(f"[ceph] {issue}") # Check for PBS issues pbs_health = health_report.get('pbs_health', {}) if pbs_health.get('is_pbs_node') and pbs_health.get('issues'): for issue in pbs_health['issues']: issues.append(f"[pbs] {issue.get('issue', str(issue))}") logger.info("=== Issue Detection Started ===") logger.info(f"Checking drives: {len(health_report['drives_health']['drives'])} found") logger.info(f"Memory status: {health_report['memory_health']['status']}") logger.info(f"CPU status: {health_report['cpu_health']['status']}") logger.info(f"Network status: {health_report['network_health']}") logger.info(f"System status: {health_report['system_health']['status']}") logger.info(f"Detected issues (pre-filter): {issues}") # Filter out INFO-level issues unless configured to include them if not self.CONFIG.get('INCLUDE_INFO_TICKETS', False): actionable_issues = [] for issue in issues: # Skip INFO-level issues (P5 candidates that shouldn't create tickets) if any(info_marker in issue.lower() for info_marker in [ 'info:', 'info ', 'above optimal', 'monitor only' ]): logger.debug(f"Filtering INFO-level issue: {issue}") continue actionable_issues.append(issue) issues = actionable_issues logger.info(f"Filtered to actionable issues: {issues}") logger.info("=== Issue Detection Completed ===\n") return issues # ============================================================================= # DISK AND STORAGE UTILITY METHODS # ============================================================================= def _get_all_disks(self) -> List[str]: """Get all physical disks using lsblk with full device paths.""" disks = set() try: result = subprocess.run( ['lsblk', '-d', '-n', '-o', 'NAME,TYPE', '-p'], stdout=subprocess.PIPE, text=True, timeout=10 ) for line in result.stdout.strip().split('\n'): if line: parts = line.split() if len(parts) >= 2 and parts[1] == 'disk' and not parts[0].startswith('/dev/rbd'): disks.add(parts[0]) logger.debug(f"Physical disks found: 
{disks}") except subprocess.TimeoutExpired: logger.error("lsblk timed out during disk detection") except Exception as e: logger.error(f"Failed to detect disks: {e}") return sorted(disks) def _is_excluded_mount(self, mountpoint: str) -> bool: """Check if a mountpoint should be excluded from monitoring.""" # Check exact matches if mountpoint in self.CONFIG['EXCLUDED_MOUNTS']: return True # Check patterns for pattern in self.CONFIG['EXCLUDED_PATTERNS']: if re.match(pattern, mountpoint): return True return False def _format_bytes_human(self, num_bytes): """Format a byte count into a human-readable string.""" for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']: if abs(num_bytes) < 1024.0: return f"{num_bytes:.1f} {unit}" num_bytes /= 1024.0 return f"{num_bytes:.1f} EB" def _parse_size(self, size_str: str) -> float: """ Parse size string with units to bytes. :param size_str: String containing size with unit (e.g. '15.7G', '21.8T') :return: Size in bytes as float """ try: # Skip non-size strings if not isinstance(size_str, str): logger.debug(f"Not a string: {size_str}") return 0.0 if not any(unit in size_str.upper() for unit in ['B', 'K', 'M', 'G', 'T']): logger.debug(f"No valid size unit found in: {size_str}") return 0.0 # Define multipliers for units multipliers = { 'B': 1, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4 } # Extract numeric value and unit match = re.match(r'(\d+\.?\d*)', size_str) if not match: logger.debug(f"Could not extract numeric value from: {size_str}") return 0.0 value = float(match.group(1)) unit_match = re.search(r'([BKMGT])', size_str.upper()) if not unit_match: logger.debug(f"Could not extract unit from: {size_str}") return 0.0 unit = unit_match.group(1) # Convert to bytes bytes_value = value * multipliers.get(unit, 0) return bytes_value except (ValueError, AttributeError, TypeError) as e: logger.debug(f"Failed to parse size string: {size_str}") logger.debug(f"P**** error details: {str(e)}") return 0.0 def _is_physical_disk(self, 
device_path): """ Check if the device is a physical disk, excluding logical volumes and special devices. :param device_path: Path to the device :return: Boolean indicating if it's a relevant physical disk """ logger.debug(f"Checking device: {device_path}") # Exclude known non-physical or special devices excluded_patterns = [ r'/dev/mapper/', # LVM devices r'/dev/dm-', # Device mapper devices r'/dev/loop', # Loop devices r'/dev/rbd', # Ceph RBD devices r'/boot', # Boot partitions r'/boot/efi', # EFI partitions r'[0-9]+$' # Partition numbers ] if any(re.search(pattern, device_path) for pattern in excluded_patterns): logger.debug(f"Device {device_path} excluded due to pattern match") return False # Match physical devices physical_patterns = [ r'/dev/sd[a-z]+$', # SATA/SAS drives r'/dev/nvme\d+n\d+$', # NVMe drives r'/dev/mmcblk\d+$', # MMC/SD cards r'/dev/hd[a-z]+$' # IDE drives (legacy) ] is_physical = any(re.match(pattern, device_path) for pattern in physical_patterns) logger.debug(f"Device {device_path} physical disk check result: {is_physical}") return is_physical def _check_disk_firmware(self, device: str) -> Dict[str, Any]: """Check disk firmware version against known problematic versions.""" firmware_info = { 'version': None, 'model': None, 'manufacturer': None, 'is_problematic': False, 'known_issues': [] } MANUFACTURER_PATTERNS = { 'Western Digital': ['WDC', 'Western Digital', 'Ultrastar'], 'Samsung': ['Samsung', 'SAMSUNG'], 'Seagate': ['Seagate', 'ST'], 'Intel': ['Intel', 'INTEL'], 'Micron': ['Micron', 'Crucial'], 'Toshiba': ['Toshiba', 'TOSHIBA'] } try: result = subprocess.run( ['smartctl', '-i', device], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 ) model_line = None for line in result.stdout.split('\n'): if 'Firmware Version:' in line: firmware_info['version'] = line.split(':')[1].strip() elif 'Model Family:' in line: model_line = line firmware_info['model'] = line.split(':')[1].strip() elif 'Device Model:' in line and not 
firmware_info['model']: model_line = line firmware_info['model'] = line.split(':')[1].strip() # Determine manufacturer if model_line: for manufacturer, patterns in MANUFACTURER_PATTERNS.items(): if any(pattern in model_line for pattern in patterns): firmware_info['manufacturer'] = manufacturer break # Check against known problematic versions if firmware_info['manufacturer'] and firmware_info['model']: # Check if manufacturer exists in our problematic firmware database if firmware_info['manufacturer'] in self.PROBLEMATIC_FIRMWARE: for model, versions in self.PROBLEMATIC_FIRMWARE[firmware_info['manufacturer']].items(): if model in firmware_info['model'] and firmware_info['version'] in versions: firmware_info['is_problematic'] = True firmware_info['known_issues'].append( f"Known problematic firmware version {firmware_info['version']} " f"for {firmware_info['model']}" ) logger.debug(f"=== Firmware Check for {device} ===") logger.debug(f"Firmware version: {firmware_info['version']}") logger.debug(f"Model: {firmware_info['model']}") logger.debug(f"Manufacturer: {firmware_info['manufacturer']}") logger.debug(f"Known issues: {firmware_info['known_issues']}") logger.debug("=== End Firmware Check ===\n") except subprocess.TimeoutExpired: logger.warning(f"smartctl -i timed out for {device}") except Exception as e: firmware_info['known_issues'].append(f"Error checking firmware: {str(e)}") return firmware_info # ============================================================================= # SMART HEALTH CHECKING METHODS # ============================================================================= def _parse_smart_value(self, raw_value: str) -> int: """Parse SMART values handling different formats including NVMe temperature readings.""" try: # Handle temperature values with °C if isinstance(raw_value, str) and '°C' in raw_value: # Extract only the numeric portion before °C temp_value = raw_value.split('°C')[0].strip() return int(temp_value) # Handle time format (e.g., 
'15589h+17m+33.939s') if 'h+' in raw_value: return int(raw_value.split('h+')[0]) # Handle hex values if '0x' in raw_value: return int(raw_value, 16) # Handle basic numbers return int(raw_value) except ValueError: logger.debug(f"Could not parse SMART value: {raw_value}") return 0 def _detect_manufacturer(self, model: str, serial: str = None) -> str: """Enhanced manufacturer detection based on model and serial patterns.""" if not model: return 'Unknown' model_upper = model.upper() # Western Digital patterns (including HGST which WD acquired) if any(pattern in model_upper for pattern in ['WDC', 'WD-', 'HGST', 'WESTERN DIGITAL']): return 'Western Digital' # Seagate patterns elif any(pattern in model_upper for pattern in ['ST', 'SEAGATE']): return 'Seagate' # Samsung patterns elif 'SAMSUNG' in model_upper: return 'Samsung' # Intel patterns elif any(pattern in model_upper for pattern in ['INTEL', 'SSDSC']): return 'Intel' # Micron/Crucial patterns elif any(pattern in model_upper for pattern in ['CRUCIAL', 'MICRON', 'CT']): return 'Micron' # Toshiba patterns elif 'TOSHIBA' in model_upper: return 'Toshiba' # Ridata/Ritek patterns (for your existing special handling) elif any(pattern in model_upper for pattern in ['RIDATA', 'RITEK']): return 'Ridata' # OOS patterns (for your existing special handling) elif 'OOS' in model_upper: return 'OOS' return 'Unknown' def _get_manufacturer_profile(self, model: str, manufacturer: str = None, firmware: str = None) -> Dict[str, Any]: """Get manufacturer-specific SMART profile based on drive model/manufacturer/firmware.""" logger.debug(f"Looking for profile - Model: '{model}', Manufacturer: '{manufacturer}', Firmware: '{firmware}'") # First, try to detect manufacturer if not provided if not manufacturer: manufacturer = self._detect_manufacturer(model) logger.debug(f"Auto-detected manufacturer: {manufacturer}") # Check each manufacturer profile for mfg, profile in self.MANUFACTURER_SMART_PROFILES.items(): # Check firmware patterns first 
(most specific for OEM drives like RiData) if firmware and 'firmware_patterns' in profile: for pattern in profile['firmware_patterns']: if firmware.startswith(pattern) or pattern in firmware: logger.debug(f"Matched manufacturer profile: {mfg} for firmware pattern '{pattern}' in '{firmware}'") return profile # Check if detected manufacturer matches this profile if manufacturer and manufacturer in profile['aliases']: logger.debug(f"Matched manufacturer profile: {mfg} for detected manufacturer '{manufacturer}'") return profile # Check model/manufacturer aliases (fallback) for alias in profile['aliases']: if alias.lower() in model.lower() or (manufacturer and alias.lower() in manufacturer.lower()): logger.debug(f"Matched manufacturer profile: {mfg} for model alias '{alias}' in '{model}'") return profile # Return generic profile if no match logger.debug(f"No specific profile found for Model: '{model}', Manufacturer: '{manufacturer}', Firmware: '{firmware}', using Generic profile") return self.MANUFACTURER_SMART_PROFILES['Generic'] def _should_monitor_attribute(self, attr_name: str, manufacturer_profile: dict) -> bool: """Check if an attribute should be monitored based on manufacturer profile.""" if not manufacturer_profile: return True # Default: monitor everything attr_config = manufacturer_profile.get('attributes', {}).get(attr_name, {}) # Check if explicitly set to not monitor if attr_config.get('monitor') is False: logger.debug(f"Skipping monitoring for {attr_name} - explicitly disabled") return False return True # Default: monitor unless explicitly disabled def _get_attribute_thresholds(self, attr_name: str, manufacturer_profile: dict) -> dict: """Get attribute-specific thresholds, falling back to defaults.""" # Check for manufacturer-specific thresholds first if manufacturer_profile: attr_config = manufacturer_profile.get('attributes', {}).get(attr_name, {}) if 'warning_threshold' in attr_config and 'critical_threshold' in attr_config: return { 'warning': 
attr_config['warning_threshold'], 'critical': attr_config['critical_threshold'], 'behavior': attr_config.get('behavior', 'countup') } # Enhanced BASE_SMART_THRESHOLDS with manufacturer-specific handling BASE_SMART_THRESHOLDS = { 'Reallocated_Sector_Ct': {'warning': 5, 'critical': 10}, 'Current_Pending_Sector': {'warning': 1, 'critical': 5}, 'Offline_Uncorrectable': {'warning': 1, 'critical': 2}, 'Reported_Uncorrect': {'warning': 1, 'critical': 10}, 'Spin_Retry_Count': {'warning': 1, 'critical': 5}, 'Power_Cycle_Count': {'warning': 5000, 'critical': 10000}, 'Power_On_Hours': {'warning': 61320, 'critical': 70080}, 'Temperature_Celsius': {'warning': 65, 'critical': 75}, 'Available_Spare': {'warning': 30, 'critical': 10}, 'Program_Fail_Count': {'warning': 10, 'critical': 20}, 'Erase_Fail_Count': {'warning': 10, 'critical': 20}, 'Load_Cycle_Count': {'warning': 900000, 'critical': 1000000}, 'SSD_Life_Left': {'warning': 30, 'critical': 10}, 'Program_Fail_Cnt_Total': {'warning': 1, 'critical': 5}, 'Erase_Fail_Count_Total': {'warning': 1, 'critical': 5}, # ADJUSTED: More lenient thresholds for error rates on unknown drives 'Raw_Read_Error_Rate': {'warning': 10000000, 'critical': 100000000}, # Raised significantly 'Seek_Error_Rate': {'warning': 10000000, 'critical': 100000000}, # Raised significantly 'Command_Timeout': {'warning': 100, 'critical': 1000}, # Raised significantly 'High_Fly_Writes': {'warning': 1, 'critical': 5}, 'Airflow_Temperature_Cel': {'warning': 65, 'critical': 75}, 'G_Sense_Error_Rate': {'warning': 100, 'critical': 1000}, 'Power-Off_Retract_Count': {'warning': 100000, 'critical': 500000}, 'Head_Flying_Hours': {'warning': 50000, 'critical': 70000}, 'Runtime_Bad_Block': {'warning': 10, 'critical': 100}, 'Factory_Bad_Block_Ct': {'warning': 50, 'critical': 200}, 'Grown_Failing_Block_Ct': {'warning': 10, 'critical': 50}, 'End-to-End_Error': {'warning': 1, 'critical': 5} } if attr_name in BASE_SMART_THRESHOLDS: return { 'warning': 
BASE_SMART_THRESHOLDS[attr_name]['warning'], 'critical': BASE_SMART_THRESHOLDS[attr_name]['critical'], 'behavior': 'countup' } return None # No thresholds defined def _is_new_drive(self, power_on_hours: int) -> bool: """Determine if a drive is considered "new" based on power-on hours.""" return power_on_hours < self.CONFIG['NEW_DRIVE_HOURS_THRESHOLD'] def _check_smart_health(self, device: str) -> Dict[str, Any]: """Enhanced SMART health check with better error handling and predictive analysis.""" smart_health = { 'status': 'UNKNOWN', 'severity': 'NORMAL', 'issues': [], 'temp': None, 'attributes': {}, 'manufacturer_profile': None } try: # Skip virtual devices if '/dev/rbd' in device or '/dev/dm-' in device or '/dev/mapper/' in device: smart_health['status'] = 'NOT_SUPPORTED' smart_health['issues'].append("Virtual device - SMART not applicable") return smart_health # First verify the device is SMART-capable drive_details = self._get_drive_details(device) if not drive_details.get('smart_capable', False): smart_health['status'] = 'NOT_SUPPORTED' smart_health['issues'].append("SMART not supported on this device") return smart_health # Special handling for NVMe devices if 'nvme' in device: return self._check_nvme_smart_health(device) # If we have no model info, the device might not be responding properly if not drive_details.get('model'): smart_health['status'] = 'ERROR' smart_health['issues'].append("Unable to read device information") return smart_health # Ridata drives - known unreliable hardware, flag for replacement manufacturer = self._detect_manufacturer(drive_details.get('model', '')) if manufacturer == 'Ridata': smart_health['status'] = 'REPLACEMENT_NEEDED' smart_health['severity'] = 'WARNING' smart_health['issues'].append("Ridata drive detected - known unreliable hardware, replacement recommended") logger.info(f"Ridata drive {device} flagged for replacement") return smart_health logger.debug(f"Drive details for {device}: {drive_details}") manufacturer_profile = 
self._get_manufacturer_profile( drive_details.get('model', ''), drive_details.get('manufacturer', ''), drive_details.get('firmware', '') ) smart_health['manufacturer_profile'] = manufacturer_profile logger.debug(f"Selected manufacturer profile for {device}: {manufacturer_profile.get('aliases', ['Unknown'])[0] if manufacturer_profile else 'None'}") # Get firmware information firmware_info = self._check_disk_firmware(device) if firmware_info['is_problematic']: smart_health['severity'] = 'WARNING' smart_health['issues'].extend(firmware_info['known_issues']) # Get detailed SMART data with timeout result = subprocess.run( ['smartctl', '-A', '-H', '-l', 'error', '-l', 'background', device], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 ) output = result.stdout # Check overall health status if 'FAILED' in output and 'PASSED' not in output: smart_health['status'] = 'UNHEALTHY' smart_health['severity'] = 'CRITICAL' smart_health['issues'].append("SMART overall health check failed") elif 'PASSED' in output: smart_health['status'] = 'HEALTHY' else: smart_health['status'] = 'UNKNOWN' # Parse SMART attributes with manufacturer-specific handling power_on_hours = 0 # First pass: collect all SMART attributes with priority for _Total versions smart_attributes_raw = {} for line in output.split('\n'): # Extract Power_On_Hours first to determine if drive is new if 'Power_On_Hours' in line: parts = line.split() if len(parts) >= 10: power_on_hours = self._parse_smart_value(parts[9]) smart_attributes_raw['Power_On_Hours'] = power_on_hours # Handle SMART attributes with preference for _Total versions for attr in ['Erase_Fail_Count', 'Program_Fail_Count']: # Check for _Total version first (more accurate) if f'{attr}_Total' in line: parts = line.split() if len(parts) >= 10: raw_value = self._parse_smart_value(parts[9]) smart_attributes_raw[f'{attr}_Total'] = raw_value # Store as _Total logger.debug(f"Found {attr}_Total: {raw_value}") break # Only use non-_Total 
version if _Total not found AND not Ridata elif attr in line and f'{attr}_Total' not in smart_attributes_raw: # Check if this is a Ridata drive and should skip regular counters if manufacturer_profile and manufacturer_profile.get('aliases', [{}])[0] == 'Ridata': logger.debug(f"Skipping {attr} for Ridata drive - using _Total version only") continue parts = line.split() if len(parts) >= 10: raw_value = self._parse_smart_value(parts[9]) smart_attributes_raw[attr] = raw_value logger.debug(f"Found {attr} (non-Total): {raw_value}") smart_health['attributes'] = smart_attributes_raw # Check if this is a new drive is_new_drive = self._is_new_drive(power_on_hours) logger.debug(f"Drive {device} power-on hours: {power_on_hours}, is_new_drive: {is_new_drive}") # Parse remaining SMART attributes for line in output.split('\n'): # Handle manufacturer-specific Wear_Leveling_Count if 'Wear_Leveling_Count' in line: parts = line.split() if len(parts) >= 10: raw_value = self._parse_smart_value(parts[9]) smart_health['attributes']['Wear_Leveling_Count'] = raw_value # Get manufacturer-specific thresholds wear_attr = manufacturer_profile.get('attributes', {}).get('Wear_Leveling_Count', {}) # Skip evaluation if this is a new drive and manufacturer profile says to ignore if is_new_drive and wear_attr.get('ignore_on_new_drive', False): logger.debug(f"Skipping Wear_Leveling_Count evaluation for new drive: {raw_value}") continue warning_threshold = wear_attr.get('warning_threshold') critical_threshold = wear_attr.get('critical_threshold') if warning_threshold and critical_threshold: behavior = wear_attr.get('behavior', 'countup') if behavior == 'countup': if raw_value >= critical_threshold: smart_health['severity'] = 'CRITICAL' smart_health['issues'].append(f"Critical wear leveling count: {raw_value}") elif raw_value >= warning_threshold: if smart_health['severity'] != 'CRITICAL': smart_health['severity'] = 'WARNING' smart_health['issues'].append(f"High wear leveling count: {raw_value}") elif 
behavior == 'countdown': if raw_value <= critical_threshold: smart_health['severity'] = 'CRITICAL' smart_health['issues'].append(f"Critical wear leveling remaining: {raw_value}") elif raw_value <= warning_threshold: if smart_health['severity'] != 'CRITICAL': smart_health['severity'] = 'WARNING' smart_health['issues'].append(f"Low wear leveling remaining: {raw_value}") # Handle all SMART attributes with manufacturer-specific logic ALL_SMART_ATTRIBUTES = [ 'Reallocated_Sector_Ct', 'Current_Pending_Sector', 'Offline_Uncorrectable', 'Reported_Uncorrect', 'Spin_Retry_Count', 'Power_Cycle_Count', 'Power_On_Hours', 'Temperature_Celsius', 'Available_Spare', 'Program_Fail_Count', 'Erase_Fail_Count', 'Load_Cycle_Count', 'SSD_Life_Left', 'Program_Fail_Cnt_Total', 'Erase_Fail_Count_Total', 'Program_Fail_Count_Chip', 'Erase_Fail_Count_Chip', 'Raw_Read_Error_Rate', 'Seek_Error_Rate', 'Command_Timeout', 'High_Fly_Writes', 'Airflow_Temperature_Cel', 'G_Sense_Error_Rate', 'Power-Off_Retract_Count', 'Head_Flying_Hours', 'Runtime_Bad_Block', 'Factory_Bad_Block_Ct', 'Grown_Failing_Block_Ct', 'End-to-End_Error' ] for line in output.split('\n'): for attr in ALL_SMART_ATTRIBUTES: if attr in line and attr not in ['Wear_Leveling_Count']: # Wear_Leveling handled separately above # Check if we should monitor this attribute if not self._should_monitor_attribute(attr, manufacturer_profile): logger.debug(f"Skipping {attr} - disabled for this manufacturer") continue parts = line.split() if len(parts) >= 10: raw_value = self._parse_smart_value(parts[9]) smart_health['attributes'][attr] = raw_value # Get manufacturer-specific or default thresholds attr_thresholds = self._get_attribute_thresholds(attr, manufacturer_profile) if not attr_thresholds: continue # Apply thresholds based on behavior if attr == 'Temperature_Celsius': smart_health['temp'] = raw_value if raw_value >= attr_thresholds['critical']: smart_health['severity'] = 'CRITICAL' smart_health['issues'].append(f"Critical temperature: 
{raw_value}°C") elif raw_value >= attr_thresholds['warning']: if smart_health['severity'] != 'CRITICAL': smart_health['severity'] = 'WARNING' smart_health['issues'].append(f"High temperature: {raw_value}°C") else: # Handle countup/countdown behavior behavior = attr_thresholds.get('behavior', 'countup') if behavior == 'countup': if raw_value >= attr_thresholds['critical']: smart_health['severity'] = 'CRITICAL' smart_health['issues'].append(f"Critical {attr}: {raw_value}") elif raw_value >= attr_thresholds['warning']: if smart_health['severity'] != 'CRITICAL': smart_health['severity'] = 'WARNING' smart_health['issues'].append(f"Warning {attr}: {raw_value}") elif behavior == 'countdown': if raw_value <= attr_thresholds['critical']: smart_health['severity'] = 'CRITICAL' smart_health['issues'].append(f"Critical {attr}: {raw_value}") elif raw_value <= attr_thresholds['warning']: if smart_health['severity'] != 'CRITICAL': smart_health['severity'] = 'WARNING' smart_health['issues'].append(f"Warning {attr}: {raw_value}") # Check for recent SMART errors error_log_pattern = r"Error \d+ occurred at disk power-on lifetime: (\d+) hours" error_matches = re.finditer(error_log_pattern, output) recent_errors = [] for match in error_matches: error_hour = int(match.group(1)) current_hours = smart_health['attributes'].get('Power_On_Hours', 0) if current_hours - error_hour < self.CONFIG['SMART_ERROR_RECENT_HOURS']: recent_errors.append(match.group(0)) if recent_errors: smart_health['severity'] = 'WARNING' smart_health['issues'].extend(recent_errors) # Enhanced analysis methods if smart_health['attributes']: # Trend analysis for predictive failure detection trend_issues = self._analyze_smart_trends(device, smart_health['attributes']) smart_health['issues'].extend(trend_issues) # SSD-specific checks drive_type = drive_details.get('type', 'HDD') if drive_type == 'SSD': ssd_issues = self._check_ssd_health(device, smart_health['attributes']) smart_health['issues'].extend(ssd_issues) # 
Enhanced temperature analysis if smart_health['temp']: drive_type = drive_details.get('type', 'HDD') thermal_issues = self._check_thermal_health(device, smart_health['temp'], drive_type) smart_health['issues'].extend(thermal_issues) # Error pattern analysis error_pattern_issues = self._analyze_error_patterns(device, output) smart_health['issues'].extend(error_pattern_issues) logger.debug(f"=== SMART Health Check for {device} ===") logger.debug(f"Manufacturer profile: {manufacturer_profile.get('aliases', ['Unknown'])[0] if manufacturer_profile else 'None'}") logger.debug("Raw SMART attributes:") for attr, value in smart_health['attributes'].items(): logger.debug(f"{attr}: {value}") logger.debug(f"Temperature: {smart_health['temp']}°C") logger.debug(f"Is new drive: {is_new_drive}") logger.debug(f"Detected Issues: {smart_health['issues']}") logger.debug("=== End SMART Check ===\n") # Special handling for NVMe drives (requires nvme-cli) if 'nvme' in device and self._available_tools.get('nvme'): try: nvme_result = subprocess.run( ['nvme', 'smart-log', device], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=10 ) logger.debug(f"NVMe smart-log raw output for {device}:") logger.debug(nvme_result.stdout) # Initialize the temperature attribute if smart_health['temp'] is None: smart_health['attributes']['Temperature_Celsius'] = None for line in nvme_result.stdout.split('\n'): # Fix the NoneType error by checking if line exists and has content if line and line.strip() and 'temperature' in line.lower(): try: temp_str = line.split(':')[1].strip() if ':' in line else line.strip() logger.debug(f"Raw temperature string: {temp_str}") # Extract the first complete number from temperature string temp_match = re.search(r'(\d+)', temp_str) if temp_match: temp_value = int(temp_match.group(1)) logger.debug(f"Parsed temperature value: {temp_value}") # Set both temperature fields smart_health['temp'] = temp_value smart_health['attributes']['Temperature_Celsius'] = 
temp_value logger.debug(f"Final temperature recorded: {smart_health['temp']}") break except (ValueError, IndexError, AttributeError) as e: logger.debug(f"Error parsing NVMe temperature from line '{line}': {e}") continue except subprocess.TimeoutExpired: logger.debug(f"NVMe smart-log for {device} timed out") except Exception as e: logger.debug(f"Error getting NVMe smart data for {device}: {e}") except subprocess.TimeoutExpired: smart_health['status'] = 'ERROR' smart_health['issues'].append("SMART check timed out") except Exception as e: smart_health['status'] = 'ERROR' smart_health['severity'] = 'UNKNOWN' smart_health['issues'].append(f"Error checking SMART: {str(e)}") logger.debug(f"Exception in _check_smart_health for {device}: {e}") import traceback logger.debug(traceback.format_exc()) return smart_health def _check_nvme_smart_health(self, device: str) -> Dict[str, Any]: """Dedicated NVMe SMART health check.""" smart_health = { 'status': 'UNKNOWN', 'severity': 'NORMAL', 'issues': [], 'temp': None, 'attributes': {}, 'manufacturer_profile': None } if not self._available_tools.get('nvme'): logger.debug(f"nvme-cli not available, skipping NVMe health check for {device}") return smart_health try: # Use nvme-cli for NVMe devices result = subprocess.run( ['nvme', 'smart-log', device], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 ) if result.returncode == 0: smart_health['status'] = 'HEALTHY' # Parse NVMe smart log output for line in result.stdout.split('\n'): if 'temperature' in line.lower(): # Extract temperature temp_match = re.search(r'(\d+)', line) if temp_match: smart_health['temp'] = int(temp_match.group(1)) smart_health['attributes']['Temperature_Celsius'] = smart_health['temp'] elif 'available_spare' in line.lower(): spare_match = re.search(r'(\d+)%', line) if spare_match: spare_pct = int(spare_match.group(1)) smart_health['attributes']['Available_Spare'] = spare_pct if spare_pct < 10: smart_health['severity'] = 'CRITICAL' 
smart_health['issues'].append(f"Critical Available_Spare: {spare_pct}%") elif spare_pct < 30: smart_health['severity'] = 'WARNING' smart_health['issues'].append(f"Low Available_Spare: {spare_pct}%") # Enhanced NVMe analysis if smart_health['attributes']: # Trend analysis for NVMe devices trend_issues = self._analyze_smart_trends(device, smart_health['attributes']) smart_health['issues'].extend(trend_issues) # SSD-specific checks for NVMe ssd_issues = self._check_ssd_health(device, smart_health['attributes']) smart_health['issues'].extend(ssd_issues) # Enhanced temperature analysis for NVMe if smart_health['temp']: thermal_issues = self._check_thermal_health(device, smart_health['temp'], 'SSD') smart_health['issues'].extend(thermal_issues) else: smart_health['status'] = 'ERROR' smart_health['issues'].append("Failed to read NVMe SMART data") except subprocess.TimeoutExpired: smart_health['status'] = 'ERROR' smart_health['issues'].append("NVMe SMART check timed out") except Exception as e: smart_health['status'] = 'ERROR' smart_health['issues'].append(f"Error checking NVMe SMART: {str(e)}") return smart_health def _check_drives_health(self) -> Dict[str, Any]: """Check health of all drives in the system.""" drives_health = {'overall_status': 'NORMAL', 'drives': []} if not self._available_tools.get('smartctl') or not self._available_tools.get('lsblk'): logger.warning("Drive health checks skipped: smartctl or lsblk not available") drives_health['overall_status'] = 'UNKNOWN' return drives_health try: # Get only valid physical disks physical_disks = self._get_all_disks() logger.debug(f"Checking physical disks: {physical_disks}") if not physical_disks: logger.warning("No valid physical disks found for monitoring") drives_health['overall_status'] = 'WARNING' return drives_health # Get ALL partition information including device mapper partitions = psutil.disk_partitions(all=True) # Create mapping of base devices to their partitions device_partitions = {} for part in 
partitions: # Extract base device (e.g., /dev/sda from /dev/sda1) base_device = re.match(r'(/dev/[a-z]+)', part.device) if base_device: base_dev = base_device.group(1) if base_dev not in device_partitions: device_partitions[base_dev] = [] device_partitions[base_dev].append(part) # Run SMART checks in parallel across all drives smart_results = {} max_workers = min(8, len(physical_disks)) with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(self._check_smart_health, disk): disk for disk in physical_disks} for future in as_completed(futures): disk = futures[future] try: smart_results[disk] = future.result() except Exception as e: logger.error(f"SMART check failed for {disk}: {e}") smart_results[disk] = {'status': 'ERROR', 'issues': [str(e)], 'temp': None, 'attributes': {}} # Build drive reports in original disk order overall_status = 'NORMAL' for disk in physical_disks: drive_report = { 'device': disk, 'partitions': [], 'smart_status': 'UNKNOWN', 'usage_percent': 0 } # Add partition information if available if disk in device_partitions: total_used = 0 total_space = 0 for partition in device_partitions[disk]: try: usage = psutil.disk_usage(partition.mountpoint) total_used += usage.used total_space += usage.total part_info = { 'device': partition.device, 'mountpoint': partition.mountpoint, 'fstype': partition.fstype, 'total_space': self._convert_bytes(usage.total), 'used_space': self._convert_bytes(usage.used), 'free_space': self._convert_bytes(usage.free), 'usage_percent': usage.percent } drive_report['partitions'].append(part_info) except Exception as e: logger.debug(f"Error getting partition usage for {partition.device}: {e}") # Calculate overall drive usage percentage if total_space > 0: drive_report['usage_percent'] = (total_used / total_space) * 100 # Use pre-fetched SMART results smart_health = smart_results.get(disk, {'status': 'ERROR', 'issues': [], 'temp': None, 'attributes': {}}) drive_report.update({ 'smart_status': 
smart_health['status'], 'smart_issues': smart_health['issues'], 'temperature': smart_health['temp'], 'smart_attributes': smart_health['attributes'] }) # Only report issues for drives that should be monitored if smart_health['status'] == 'UNHEALTHY': overall_status = 'CRITICAL' elif smart_health['status'] == 'ERROR': # Don't escalate overall status for ERROR drives (might be virtual) logger.debug(f"Drive {disk} returned ERROR status, skipping from issue detection") elif smart_health['issues'] and smart_health['status'] not in ['ERROR', 'NOT_SUPPORTED']: if overall_status != 'CRITICAL': overall_status = 'WARNING' drives_health['drives'].append(drive_report) drives_health['overall_status'] = overall_status except Exception as e: logger.error(f"Error checking drives health: {str(e)}") return drives_health # ============================================================================= # SYSTEM HEALTH CHECKING METHODS # ============================================================================= @staticmethod def _convert_bytes(bytes_value: int, suffix: str = 'B') -> str: """ Convert bytes to a human-readable format. :param bytes_value: Number of bytes to convert. :param suffix: Suffix to append (default is 'B' for bytes). :return: Formatted string with the size in human-readable form. 
""" for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']: if abs(bytes_value) < 1024.0: return f"{bytes_value:.1f}{unit}{suffix}" bytes_value /= 1024.0 return f"{bytes_value:.1f}Y{suffix}" def _convert_size_to_bytes(self, size_str: str) -> float: """Convert size string with units to bytes.""" units = {'B': 1, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4} size = float(size_str[:-1]) unit = size_str[-1].upper() return size * units[unit] def _check_memory_usage(self) -> Dict[str, Any]: """Check for ECC memory errors if ECC memory is present.""" memory_health = { 'has_ecc': False, 'ecc_errors': [], 'status': 'OK', 'total_memory': self._convert_bytes(psutil.virtual_memory().total), 'used_memory': self._convert_bytes(psutil.virtual_memory().used), 'memory_percent': psutil.virtual_memory().percent } try: # First check using dmidecode (if available) if self._available_tools.get('dmidecode'): result = subprocess.run( ['dmidecode', '--type', 'memory'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 ) if 'Error Correction Type: Multi-bit ECC' in result.stdout: memory_health['has_ecc'] = True # If dmidecode unavailable or didn't find ECC, try the edac method as backup if not memory_health['has_ecc']: edac_path = '/sys/devices/system/edac/mc' if os.path.exists(edac_path) and os.listdir(edac_path): for mc_dir in glob.glob('/sys/devices/system/edac/mc/mc[0-9]*'): if os.path.exists(f"{mc_dir}/csrow0"): memory_health['has_ecc'] = True break # If ECC is present, check for errors if memory_health['has_ecc']: for mc_dir in glob.glob('/sys/devices/system/edac/mc/mc[0-9]*'): if os.path.exists(f"{mc_dir}/csrow0"): ue_count = self._read_ecc_count(f"{mc_dir}/csrow0/ue_count") if ue_count > 0: memory_health['status'] = 'CRITICAL' memory_health['ecc_errors'].append( f"Uncorrectable ECC errors detected in {os.path.basename(mc_dir)}: {ue_count}" ) ce_count = self._read_ecc_count(f"{mc_dir}/csrow0/ce_count") if ce_count > 0: if memory_health['status'] != 'CRITICAL': 
memory_health['status'] = 'WARNING' memory_health['ecc_errors'].append( f"Correctable ECC errors detected in {os.path.basename(mc_dir)}: {ce_count}" ) except Exception as e: memory_health['status'] = 'ERROR' memory_health['ecc_errors'].append(f"Error checking ECC status: {str(e)}") return memory_health def _read_ecc_count(self, filepath: str) -> int: """ Read ECC error count from a file. :param filepath: Path to the ECC count file :return: Number of ECC errors """ try: with open(filepath, 'r') as f: return int(f.read().strip()) except (IOError, OSError, ValueError) as e: logger.debug(f"Could not read ECC count from {filepath}: {e}") return 0 def _check_cpu_usage(self) -> Dict[str, Any]: """ Check CPU usage and return health metrics. :return: Dictionary with CPU health metrics. """ cpu_usage_percent = psutil.cpu_percent(interval=1) cpu_health = { 'cpu_usage_percent': cpu_usage_percent, 'status': 'OK' if cpu_usage_percent < self.CONFIG['THRESHOLDS']['CPU_WARNING'] else 'WARNING' } return cpu_health def _check_network_status(self) -> Dict[str, Any]: """ Check the status of network interfaces and report any issues. :return: Dictionary containing network health metrics and any issues found. 
""" network_health = { 'management_network': { 'issues': [], 'status': 'OK', 'latency': None }, 'ceph_network': { 'issues': [], 'status': 'OK', 'latency': None } } try: # Check management network connectivity mgmt_result = subprocess.run( [ "ping", "-c", str(self.CONFIG['NETWORKS']['PING_COUNT']), "-W", str(self.CONFIG['NETWORKS']['PING_TIMEOUT']), self.CONFIG['NETWORKS']['MANAGEMENT'] ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 # 30 second timeout for subprocess ) if mgmt_result.returncode != 0: network_health['management_network']['status'] = 'CRITICAL' network_health['management_network']['issues'].append( "Management network is unreachable" ) # Check Ceph network connectivity ceph_result = subprocess.run( [ "ping", "-c", str(self.CONFIG['NETWORKS']['PING_COUNT']), "-W", str(self.CONFIG['NETWORKS']['PING_TIMEOUT']), self.CONFIG['NETWORKS']['CEPH'] ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 # 30 second timeout for subprocess ) if ceph_result.returncode != 0: network_health['ceph_network']['status'] = 'CRITICAL' network_health['ceph_network']['issues'].append( "Ceph network is unreachable" ) return network_health except Exception as e: logger.error(f"Network health check failed: {e}") return { 'status': 'ERROR', 'error': str(e) } def _check_ceph_health(self) -> Dict[str, Any]: """ Check Ceph cluster health if this node is part of a Ceph cluster. Returns health status, cluster info, and any issues detected. Cluster-wide issues use [cluster-wide] tag for cross-node deduplication. 
""" ceph_health = { 'status': 'OK', 'is_ceph_node': False, 'cluster_health': None, 'cluster_usage': None, 'osd_status': [], 'mon_status': [], 'issues': [], 'cluster_wide_issues': [] # Issues affecting entire cluster; use CLUSTER_NAME for dedup } # Check if Ceph monitoring is enabled if not self.CONFIG.get('CEPH_ENABLED', True): logger.debug("Ceph monitoring disabled in config") return ceph_health # Check if ceph CLI is available if not self._available_tools.get('ceph'): logger.debug("Ceph CLI not found - not a Ceph node") return ceph_health ceph_health['is_ceph_node'] = True hostname = socket.gethostname() try: # Get cluster health status health_result = subprocess.run( ['ceph', 'health', '--format=json'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 ) if health_result.returncode == 0: try: health_data = json.loads(health_result.stdout) ceph_health['cluster_health'] = health_data.get('status', 'UNKNOWN') # Check cluster health status if ceph_health['cluster_health'] == 'HEALTH_ERR': ceph_health['status'] = 'CRITICAL' # This is a cluster-wide issue ceph_health['cluster_wide_issues'].append( f"Ceph cluster HEALTH_ERR: {health_data.get('summary', {}).get('message', 'Unknown error')}" ) elif ceph_health['cluster_health'] == 'HEALTH_WARN': if ceph_health['status'] != 'CRITICAL': ceph_health['status'] = 'WARNING' # Extract warning messages checks = health_data.get('checks', {}) for check_name, check_data in checks.items(): severity = check_data.get('severity', 'HEALTH_WARN') message = check_data.get('summary', {}).get('message', check_name) ceph_health['cluster_wide_issues'].append( f"Ceph HEALTH_WARN: {message}" ) except json.JSONDecodeError as e: logger.warning(f"Failed to parse ceph health JSON: {e}") # Get cluster usage (ceph df) df_result = subprocess.run( ['ceph', 'df', '--format=json'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 ) if df_result.returncode == 0: try: df_data = json.loads(df_result.stdout) stats = 
df_data.get('stats', {}) total_bytes = stats.get('total_bytes', 0) total_used = stats.get('total_used_raw_bytes', 0) if total_bytes > 0: usage_percent = (total_used / total_bytes) * 100 ceph_health['cluster_usage'] = { 'total_bytes': total_bytes, 'used_bytes': total_used, 'usage_percent': round(usage_percent, 2) } # Check usage thresholds if usage_percent >= self.CONFIG.get('CEPH_USAGE_CRITICAL', 85): ceph_health['status'] = 'CRITICAL' ceph_health['cluster_wide_issues'].append( f"Ceph cluster usage critical: {usage_percent:.1f}%" ) elif usage_percent >= self.CONFIG.get('CEPH_USAGE_WARNING', 70): if ceph_health['status'] != 'CRITICAL': ceph_health['status'] = 'WARNING' ceph_health['cluster_wide_issues'].append( f"Ceph cluster usage warning: {usage_percent:.1f}%" ) except json.JSONDecodeError as e: logger.warning(f"Failed to parse ceph df JSON: {e}") # Get OSD status (check for down OSDs on this node) osd_result = subprocess.run( ['ceph', 'osd', 'tree', '--format=json'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 ) if osd_result.returncode == 0: try: osd_data = json.loads(osd_result.stdout) nodes = osd_data.get('nodes', []) # Find OSDs on this host host_id = None for node in nodes: if node.get('type') == 'host' and node.get('name') == hostname: host_id = node.get('id') break # Check OSD status for this host for node in nodes: if node.get('type') == 'osd': osd_info = { 'id': node.get('id'), 'name': node.get('name'), 'status': node.get('status', 'unknown'), 'reweight': node.get('reweight', 1.0) } # Check if OSD belongs to this host (by checking parent in tree) # Simplified: just track all OSDs for now ceph_health['osd_status'].append(osd_info) # Check for down OSDs - this is a cluster-wide issue # All nodes see the same OSD down, so treat as cluster-wide if node.get('status') == 'down': ceph_health['status'] = 'CRITICAL' # Cluster-wide issue - OSD down affects entire cluster # Do NOT include detecting hostname in message to enable 
deduplication ceph_health['cluster_wide_issues'].append( f"Ceph OSD {node.get('name')} is DOWN" ) except json.JSONDecodeError as e: logger.warning(f"Failed to parse ceph osd tree JSON: {e}") # Get monitor status mon_result = subprocess.run( ['ceph', 'mon', 'stat', '--format=json'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 ) if mon_result.returncode == 0: try: mon_data = json.loads(mon_result.stdout) ceph_health['mon_status'] = { 'quorum': mon_data.get('quorum', []), 'quorum_names': mon_data.get('quorum_names', []) } except json.JSONDecodeError as e: logger.warning(f"Failed to parse ceph mon stat JSON: {e}") logger.debug("=== Ceph Health Check ===") logger.debug(f"Is Ceph node: {ceph_health['is_ceph_node']}") logger.debug(f"Cluster health: {ceph_health['cluster_health']}") logger.debug(f"Cluster usage: {ceph_health['cluster_usage']}") logger.debug(f"Status: {ceph_health['status']}") logger.debug(f"Issues: {ceph_health['issues']}") logger.debug(f"Cluster-wide issues: {ceph_health['cluster_wide_issues']}") logger.debug("=== End Ceph Health Check ===") except subprocess.TimeoutExpired: ceph_health['status'] = 'ERROR' ceph_health['issues'].append("Ceph health check timed out") except Exception as e: ceph_health['status'] = 'ERROR' ceph_health['issues'].append(f"Error checking Ceph health: {str(e)}") logger.error(f"Ceph health check failed: {e}") return ceph_health # ============================================================================= # PBS (PROXMOX BACKUP SERVER) HEALTH CHECKS # ============================================================================= def _check_pbs_health(self) -> Dict[str, Any]: """ Check Proxmox Backup Server health including ZFS pools and task status. Returns health status for ZFS pools, failed backup/GC/sync jobs. Only active when PBS_ENABLED=true and relevant tools are available. 
""" pbs_health = { 'status': 'OK', 'is_pbs_node': False, 'zfs_pools': [], 'failed_tasks': [], 'issues': [] } if not self.CONFIG.get('PBS_ENABLED', False): logger.debug("PBS monitoring disabled in config") return pbs_health if not self._available_tools.get('zpool'): logger.debug("zpool not available - skipping PBS ZFS checks") return pbs_health pbs_health['is_pbs_node'] = True # Check ZFS pool status try: result = subprocess.run( ['zpool', 'status', '-p'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 ) if result.returncode == 0: current_pool = None for line in result.stdout.splitlines(): line_stripped = line.strip() if line_stripped.startswith('pool:'): current_pool = line_stripped.split(':', 1)[1].strip() elif line_stripped.startswith('state:') and current_pool: state = line_stripped.split(':', 1)[1].strip() if state != 'ONLINE': pbs_health['status'] = 'CRITICAL' pbs_health['issues'].append({ 'type': 'PBS_ZFS_DEGRADED', 'severity': 'CRITICAL', 'device': current_pool, 'issue': f"ZFS pool '{current_pool}' state: {state}" }) elif line_stripped.startswith('errors:') and current_pool: if 'No known data errors' not in line_stripped: pbs_health['issues'].append({ 'type': 'PBS_ZFS_ERRORS', 'severity': 'WARNING', 'device': current_pool, 'issue': f"ZFS pool '{current_pool}' has errors: {line_stripped}" }) except subprocess.TimeoutExpired: logger.warning("zpool status timed out") except Exception as e: logger.error(f"Error checking ZFS pool status: {e}") # Check ZFS pool usage try: result = subprocess.run( ['zpool', 'list', '-Hp'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 ) if result.returncode == 0: for line in result.stdout.splitlines(): parts = line.split('\t') if len(parts) >= 8: pool_name = parts[0] try: total_bytes = int(parts[1]) used_bytes = int(parts[2]) usage_pct = (used_bytes / total_bytes * 100) if total_bytes > 0 else 0 except (ValueError, ZeroDivisionError): continue pool_info = { 'name': pool_name, 'total': 
self._convert_bytes(total_bytes), 'used': self._convert_bytes(used_bytes), 'usage_percent': round(usage_pct, 1), 'health': parts[9] if len(parts) > 9 else 'UNKNOWN' } pbs_health['zfs_pools'].append(pool_info) if usage_pct >= self.CONFIG['PBS_ZFS_CRITICAL']: pbs_health['status'] = 'CRITICAL' pbs_health['issues'].append({ 'type': 'PBS_ZFS_USAGE_CRITICAL', 'severity': 'CRITICAL', 'device': pool_name, 'issue': f"ZFS pool '{pool_name}' usage critical: {usage_pct:.1f}%" }) elif usage_pct >= self.CONFIG['PBS_ZFS_WARNING']: if pbs_health['status'] != 'CRITICAL': pbs_health['status'] = 'WARNING' pbs_health['issues'].append({ 'type': 'PBS_ZFS_USAGE_WARNING', 'severity': 'WARNING', 'device': pool_name, 'issue': f"ZFS pool '{pool_name}' usage high: {usage_pct:.1f}%" }) except subprocess.TimeoutExpired: logger.warning("zpool list timed out") except Exception as e: logger.error(f"Error checking ZFS pool usage: {e}") # Check failed PBS tasks (requires proxmox-backup-manager) if self._available_tools.get('proxmox-backup-manager'): try: result = subprocess.run( ['proxmox-backup-manager', 'task', 'list', '--output-format', 'json'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 ) if result.returncode == 0: try: tasks = json.loads(result.stdout) for task in tasks: task_status = task.get('status', '') task_type = task.get('worker_type', '') task_id = task.get('worker_id', '') if task_status and task_status != 'OK': failed_task = { 'type': task_type, 'id': task_id, 'status': task_status, 'starttime': task.get('starttime', ''), 'endtime': task.get('endtime', '') } pbs_health['failed_tasks'].append(failed_task) # Categorize by task type if 'backup' in task_type.lower(): issue_type = 'PBS_BACKUP_FAILED' severity = 'CRITICAL' elif 'gc' in task_type.lower() or 'garbage' in task_type.lower(): issue_type = 'PBS_GC_FAILED' severity = 'WARNING' elif 'sync' in task_type.lower(): issue_type = 'PBS_SYNC_FAILED' severity = 'WARNING' else: issue_type = 'PBS_BACKUP_FAILED' 
severity = 'WARNING' pbs_health['issues'].append({ 'type': issue_type, 'severity': severity, 'device': f"task-{task_type}", 'issue': f"PBS {task_type} failed: {task_id} - {task_status}" }) if severity == 'CRITICAL': pbs_health['status'] = 'CRITICAL' elif pbs_health['status'] == 'OK': pbs_health['status'] = 'WARNING' except json.JSONDecodeError as e: logger.warning(f"Failed to parse PBS task list JSON: {e}") except subprocess.TimeoutExpired: logger.warning("proxmox-backup-manager task list timed out") except Exception as e: logger.error(f"Error checking PBS tasks: {e}") return pbs_health # ============================================================================= # PROMETHEUS METRICS EXPORT # ============================================================================= def export_prometheus_metrics(self, health_report: Dict[str, Any]) -> str: """ Export health report as Prometheus metrics in text format. Metrics follow Prometheus naming conventions: - hwmon_* prefix for all metrics - Labels for dimensions (device, hostname, container, etc.) 
Returns: str: Prometheus text format metrics """ hostname = health_report.get('hostname', socket.gethostname()) metrics = [] # Helper to format labels with proper Prometheus escaping def labels(**kwargs) -> str: def escape(value): return str(value).replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n') pairs = [f'{k}="{escape(v)}"' for k, v in kwargs.items() if v is not None] return '{' + ','.join(pairs) + '}' if pairs else '' # === System Info === metrics.append('# HELP hwmon_info System information') metrics.append('# TYPE hwmon_info gauge') metrics.append(f'hwmon_info{labels(hostname=hostname)} 1') # === Drive Metrics === metrics.append('# HELP hwmon_drive_smart_healthy SMART health status (1=healthy, 0=unhealthy)') metrics.append('# TYPE hwmon_drive_smart_healthy gauge') metrics.append('# HELP hwmon_drive_temperature_celsius Drive temperature in Celsius') metrics.append('# TYPE hwmon_drive_temperature_celsius gauge') metrics.append('# HELP hwmon_drive_size_bytes Drive total size in bytes') metrics.append('# TYPE hwmon_drive_size_bytes gauge') metrics.append('# HELP hwmon_drive_smart_issues_total Number of SMART issues detected') metrics.append('# TYPE hwmon_drive_smart_issues_total gauge') for drive in health_report.get('drives_health', {}).get('drives', []): device = drive.get('device', 'unknown') drive_labels = labels(hostname=hostname, device=device) # SMART health status smart_status = drive.get('smart_status', 'UNKNOWN') healthy = 1 if smart_status == 'HEALTHY' else 0 metrics.append(f'hwmon_drive_smart_healthy{drive_labels} {healthy}') # Temperature if drive.get('temperature'): metrics.append(f'hwmon_drive_temperature_celsius{drive_labels} {drive["temperature"]}') # Drive size (convert human-readable to bytes if possible) if drive.get('capacity'): capacity_bytes = self._parse_size_to_bytes(drive['capacity']) if capacity_bytes: metrics.append(f'hwmon_drive_size_bytes{drive_labels} {capacity_bytes}') # Issue count issues_count = 
len(drive.get('smart_issues', [])) metrics.append(f'hwmon_drive_smart_issues_total{drive_labels} {issues_count}') # === CPU Metrics === cpu = health_report.get('cpu_health', {}) metrics.append('# HELP hwmon_cpu_usage_percent CPU usage percentage') metrics.append('# TYPE hwmon_cpu_usage_percent gauge') if cpu.get('cpu_usage_percent') is not None: metrics.append(f'hwmon_cpu_usage_percent{labels(hostname=hostname)} {cpu["cpu_usage_percent"]}') # === Memory Metrics === mem = health_report.get('memory_health', {}) metrics.append('# HELP hwmon_memory_usage_percent Memory usage percentage') metrics.append('# TYPE hwmon_memory_usage_percent gauge') if mem.get('memory_percent') is not None: metrics.append(f'hwmon_memory_usage_percent{labels(hostname=hostname)} {mem["memory_percent"]}') metrics.append('# HELP hwmon_memory_has_ecc Whether ECC memory is present (1=yes, 0=no)') metrics.append('# TYPE hwmon_memory_has_ecc gauge') has_ecc = 1 if mem.get('has_ecc') else 0 metrics.append(f'hwmon_memory_has_ecc{labels(hostname=hostname)} {has_ecc}') if mem.get('has_ecc'): metrics.append('# HELP hwmon_memory_ecc_errors_total Total ECC errors detected') metrics.append('# TYPE hwmon_memory_ecc_errors_total gauge') ecc_errors = len(mem.get('ecc_errors', [])) metrics.append(f'hwmon_memory_ecc_errors_total{labels(hostname=hostname)} {ecc_errors}') # === Network Metrics === net = health_report.get('network_health', {}) metrics.append('# HELP hwmon_network_status Network status (1=OK, 0=issue)') metrics.append('# TYPE hwmon_network_status gauge') for net_type in ['management_network', 'ceph_network']: net_info = net.get(net_type, {}) status = 1 if net_info.get('status') == 'OK' else 0 net_name = net_type.replace('_network', '') metrics.append(f'hwmon_network_status{labels(hostname=hostname, network=net_name)} {status}') # === Ceph Metrics === ceph = health_report.get('ceph_health', {}) if ceph.get('is_ceph_node'): metrics.append('# HELP hwmon_ceph_cluster_healthy Ceph cluster health 
(1=healthy, 0=warning/error)') metrics.append('# TYPE hwmon_ceph_cluster_healthy gauge') ceph_healthy = 1 if ceph.get('cluster_health') == 'HEALTH_OK' else 0 metrics.append(f'hwmon_ceph_cluster_healthy{labels(hostname=hostname)} {ceph_healthy}') if ceph.get('cluster_usage'): usage = ceph['cluster_usage'] metrics.append('# HELP hwmon_ceph_cluster_usage_percent Ceph cluster usage percentage') metrics.append('# TYPE hwmon_ceph_cluster_usage_percent gauge') metrics.append(f'hwmon_ceph_cluster_usage_percent{labels(hostname=hostname)} {usage.get("usage_percent", 0)}') metrics.append('# HELP hwmon_ceph_cluster_bytes_total Ceph cluster total bytes') metrics.append('# TYPE hwmon_ceph_cluster_bytes_total gauge') metrics.append(f'hwmon_ceph_cluster_bytes_total{labels(hostname=hostname)} {usage.get("total_bytes", 0)}') metrics.append('# HELP hwmon_ceph_cluster_bytes_used Ceph cluster used bytes') metrics.append('# TYPE hwmon_ceph_cluster_bytes_used gauge') metrics.append(f'hwmon_ceph_cluster_bytes_used{labels(hostname=hostname)} {usage.get("used_bytes", 0)}') metrics.append('# HELP hwmon_ceph_osd_total Total number of OSDs') metrics.append('# TYPE hwmon_ceph_osd_total gauge') osd_count = len(ceph.get('osd_status', [])) metrics.append(f'hwmon_ceph_osd_total{labels(hostname=hostname)} {osd_count}') metrics.append('# HELP hwmon_ceph_osd_down Number of down OSDs') metrics.append('# TYPE hwmon_ceph_osd_down gauge') down_osds = len([o for o in ceph.get('osd_status', []) if o.get('status') == 'down']) metrics.append(f'hwmon_ceph_osd_down{labels(hostname=hostname)} {down_osds}') # === LXC Metrics === lxc = health_report.get('lxc_health', {}) if lxc.get('containers'): metrics.append('# HELP hwmon_lxc_storage_usage_percent LXC container storage usage percentage') metrics.append('# TYPE hwmon_lxc_storage_usage_percent gauge') for container in lxc['containers']: vmid = container.get('vmid', 'unknown') for fs in container.get('filesystems', []): mountpoint = fs.get('mountpoint', '/') usage 
= fs.get('usage_percent', 0) metrics.append(f'hwmon_lxc_storage_usage_percent{labels(hostname=hostname, vmid=vmid, mountpoint=mountpoint)} {usage}') # === PBS Metrics === pbs = health_report.get('pbs_health', {}) if pbs.get('is_pbs_node'): metrics.append('# HELP hwmon_pbs_zfs_usage_percent PBS ZFS pool usage percentage') metrics.append('# TYPE hwmon_pbs_zfs_usage_percent gauge') for pool in pbs.get('zfs_pools', []): metrics.append(f'hwmon_pbs_zfs_usage_percent{labels(hostname=hostname, pool=pool["name"])} {pool["usage_percent"]}') metrics.append('# HELP hwmon_pbs_failed_tasks_total PBS failed task count') metrics.append('# TYPE hwmon_pbs_failed_tasks_total gauge') metrics.append(f'hwmon_pbs_failed_tasks_total{labels(hostname=hostname)} {len(pbs.get("failed_tasks", []))}') # === Issue Summary Metrics === metrics.append('# HELP hwmon_issues_total Total number of issues detected') metrics.append('# TYPE hwmon_issues_total gauge') system_issues = len(health_report.get('system_health', {}).get('issues', [])) ceph_issues = len(ceph.get('issues', [])) + len(ceph.get('cluster_wide_issues', [])) lxc_issues = len(lxc.get('issues', [])) pbs_issues = len(pbs.get('issues', [])) total_issues = system_issues + ceph_issues + lxc_issues + pbs_issues metrics.append(f'hwmon_issues_total{labels(hostname=hostname)} {total_issues}') return '\n'.join(metrics) + '\n' def _parse_size_to_bytes(self, size_str: str) -> int: """Parse human-readable size string to bytes.""" if not size_str: return 0 size_str = size_str.strip().upper() multipliers = { 'B': 1, 'KB': 1024, 'MB': 1024**2, 'GB': 1024**3, 'TB': 1024**4, 'PB': 1024**5, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4, 'P': 1024**5 } try: for suffix, mult in sorted(multipliers.items(), key=lambda x: -len(x[0])): if size_str.endswith(suffix): num = float(size_str[:-len(suffix)].strip()) return int(num * mult) return int(float(size_str)) except (ValueError, TypeError): return 0 def write_prometheus_metrics(self, health_report: 
Dict[str, Any]) -> bool: """ Write Prometheus metrics to configured destination. If PROMETHEUS_TEXTFILE_PATH is set, writes to that file for node_exporter. Otherwise, logs the metrics (for debugging or other use). Returns: bool: True if metrics were written successfully """ if not self.CONFIG.get('PROMETHEUS_ENABLED', False): return False try: metrics = self.export_prometheus_metrics(health_report) textfile_path = self.CONFIG.get('PROMETHEUS_TEXTFILE_PATH') if textfile_path: # Write to textfile for node_exporter textfile collector # Write to temp file first, then atomic rename import tempfile temp_fd, temp_path = tempfile.mkstemp( dir=os.path.dirname(textfile_path), prefix='.hwmon_metrics_' ) try: with os.fdopen(temp_fd, 'w') as f: f.write(metrics) os.rename(temp_path, textfile_path) logger.info(f"Prometheus metrics written to {textfile_path}") except Exception: os.unlink(temp_path) raise else: # Just log metrics (for debugging) logger.debug("Prometheus metrics generated:\n" + metrics) return True except Exception as e: logger.error(f"Failed to write Prometheus metrics: {e}") return False def _check_lxc_storage(self) -> Dict[str, Any]: """ Check storage utilization for all running LXC containers """ logger.debug("Starting LXC storage check") lxc_health = { 'status': 'OK', 'containers': [], 'issues': [] } if not self._available_tools.get('pct'): logger.debug("pct not available - not a PVE node or pve-container not installed") return lxc_health try: result = subprocess.run( ['pct', 'list'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 # 30 second timeout ) logger.debug(f"pct list output:\n{result.stdout}") for line in result.stdout.split('\n')[1:]: if not line.strip(): continue parts = line.split() if len(parts) < 2: logger.debug(f"Skipping invalid line: {line}") continue vmid, status = parts[0], parts[1] if status.lower() == 'running': logger.debug(f"Checking container {vmid} disk usage") disk_info = subprocess.run( ['pct', 'df', vmid], 
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30 # 30 second timeout per container ) container_info = { 'vmid': vmid, 'filesystems': [] } for fs_line in disk_info.stdout.split('\n')[1:]: if not fs_line.strip() or 'MP' in fs_line: continue # Parse df output using regex for reliable column extraction match = re.match( r'(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\d+\.?\d*)%?\s+(.*)', fs_line.strip() ) if not match: logger.debug(f"Could not parse filesystem line: {fs_line}") continue pool, device_col, total_str, used_str, avail_str, percent_str, mountpoint = match.groups() try: # Skip excluded mounts if pool.startswith('appPool:') or '/mnt/pve/mediaf' in device_col: continue mountpoint = mountpoint.strip() # Skip excluded mountpoints if self._is_excluded_mount(mountpoint): logger.debug(f"Skipping excluded mount: {mountpoint}") continue # Parse size values from named regex groups total_space = self._parse_size(total_str) used_space = self._parse_size(used_str) available_space = self._parse_size(avail_str) # Parse percentage from regex group try: usage_percent = float(percent_str) except ValueError: # Calculate percentage if parsing fails usage_percent = (used_space / total_space * 100) if total_space > 0 else 0 filesystem = { 'mountpoint': mountpoint, 'total_space': total_space, 'used_space': used_space, 'available': available_space, 'usage_percent': usage_percent } container_info['filesystems'].append(filesystem) # Check thresholds if usage_percent >= self.CONFIG['THRESHOLDS']['LXC_CRITICAL']: lxc_health['status'] = 'CRITICAL' issue = f"LXC {vmid} critical storage usage: {usage_percent:.1f}% on {mountpoint}" lxc_health['issues'].append(issue) elif usage_percent >= self.CONFIG['THRESHOLDS']['LXC_WARNING']: if lxc_health['status'] != 'CRITICAL': lxc_health['status'] = 'WARNING' issue = f"LXC {vmid} high storage usage: {usage_percent:.1f}% on {mountpoint}" lxc_health['issues'].append(issue) logger.debug(f"Filesystem details: {filesystem}") except 
Exception as e: logger.debug(f"Error processing line: {str(e)}") logger.debug(f"Full exception: {repr(e)}") continue # Only add container info if we have filesystem data if container_info['filesystems']: lxc_health['containers'].append(container_info) logger.debug(f"Added container info for VMID {vmid}") logger.debug("=== LXC Storage Check Summary ===") logger.debug(f"Status: {lxc_health['status']}") logger.debug(f"Total containers checked: {len(lxc_health['containers'])}") logger.debug(f"Issues found: {len(lxc_health['issues'])}") logger.debug("=== End LXC Storage Check ===") except Exception as e: logger.debug(f"Critical error during LXC storage check: {str(e)}") lxc_health['status'] = 'ERROR' error_msg = f"Error checking LXC storage: {str(e)}" lxc_health['issues'].append(error_msg) return lxc_health def main(): parser = argparse.ArgumentParser(description="System Health Monitor") parser.add_argument( "--dry-run", action="store_true", help="Enable dry-run mode (simulate ticket creation without actual API calls)." ) parser.add_argument( "--metrics", action="store_true", help="Output Prometheus metrics to stdout and exit." ) parser.add_argument( "--export-json", type=str, metavar="FILE", help="Export health report to JSON file." ) parser.add_argument( "-v", "--verbose", action="store_true", help="Enable verbose (DEBUG) logging output." ) parser.add_argument( "--health-server", action="store_true", help="Start HTTP health check endpoint (default port 9102)." 
) args = parser.parse_args() monitor = SystemHealthMonitor( ticket_api_url=SystemHealthMonitor.CONFIG['TICKET_API_URL'], dry_run=args.dry_run, verbose=args.verbose ) # Start health server if requested via CLI or .env if args.health_server or monitor.CONFIG.get('HEALTH_SERVER_ENABLED', False): monitor._start_health_server() if args.metrics: # Just output metrics to stdout health_report = monitor.perform_health_checks() print(monitor.export_prometheus_metrics(health_report)) elif args.export_json: # Export health report as JSON import json health_report = monitor.perform_health_checks() with open(args.export_json, 'w') as f: json.dump(health_report, f, indent=2, default=str) logger.info(f"Health report exported to {args.export_json}") else: monitor.run() if __name__ == "__main__": main()