Files
hwmonDaemon/hwmonDaemon.py
Jared Vititoe 7383a0c674 Escape special characters in Prometheus metric labels
Add escape function to sanitize backslashes, double quotes, and newlines
in label values per Prometheus text format spec. Prevents corrupted
metrics output from model names or paths containing these characters.

Resolves #10

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 12:57:37 -05:00

3543 lines
162 KiB
Python

#!/usr/bin/env python3
import argparse
import datetime
import fcntl
import glob
import json
import logging
import os
import re
import socket
import subprocess
import sys
import textwrap
import urllib.request
from typing import Any, Dict, List, Optional

import psutil
import requests
# =============================================================================
# LOGGING SETUP
# =============================================================================
# Module-level logger: everything down to DEBUG goes to the console (stderr)
# with a timestamped format.  Handlers are attached once at import time.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
class SystemHealthMonitor:
# =============================================================================
# CLASS CONSTANTS AND CONFIGURATION
# =============================================================================
STANDARD_WIDTH = 80
PRIORITIES = {
'CRITICAL': '1', # P1 - Cluster outages, total system failure
'HIGH': '2', # P2 - Hardware failures, same-day response
'MEDIUM': '3', # P3 - Warnings, 1-3 day response
'NORMAL': '4', # P4 - Standard monitoring alerts
'LOW': '5' # P5 - Informational, minimal impact
}
ISSUE_PRIORITIES = {
# P1 - Critical System Issues (cluster-wide impact)
'CLUSTER_FAILURE': PRIORITIES['CRITICAL'],
'MULTIPLE_DRIVE_FAILURE': PRIORITIES['CRITICAL'],
'RAID_DEGRADED': PRIORITIES['CRITICAL'],
# P2 - Hardware Failures (same-day response)
'SMART_FAILURE': PRIORITIES['HIGH'],
'SMART_CRITICAL': PRIORITIES['HIGH'],
'DISK_CRITICAL': PRIORITIES['HIGH'],
'UNCORRECTABLE_ECC': PRIORITIES['HIGH'],
'NETWORK_FAILURE': PRIORITIES['HIGH'],
'TEMPERATURE_CRITICAL': PRIORITIES['HIGH'],
'SSD_WEAR_CRITICAL': PRIORITIES['HIGH'],
'NVME_SPARE_CRITICAL': PRIORITIES['HIGH'],
'FIRMWARE_CRITICAL': PRIORITIES['HIGH'],
'REALLOCATED_SECTOR': PRIORITIES['HIGH'],
'PENDING_SECTOR': PRIORITIES['HIGH'],
# P3 - Warnings (1-3 day response)
'SMART_WARNING': PRIORITIES['MEDIUM'],
'DISK_WARNING': PRIORITIES['MEDIUM'],
'CORRECTABLE_ECC': PRIORITIES['MEDIUM'],
'TEMPERATURE_WARNING': PRIORITIES['MEDIUM'],
'SSD_WEAR_WARNING': PRIORITIES['MEDIUM'],
'NVME_SPARE_WARNING': PRIORITIES['MEDIUM'],
'LXC_STORAGE_CRITICAL': PRIORITIES['MEDIUM'],
'TREND_ALERT': PRIORITIES['MEDIUM'],
# P4 - Normal Monitoring (standard response)
'CPU_HIGH': PRIORITIES['NORMAL'],
'LXC_STORAGE_WARNING': PRIORITIES['NORMAL'],
'SYSTEM_LOG_WARNING': PRIORITIES['NORMAL'],
'DRIVE_AGE_WARNING': PRIORITIES['NORMAL'],
# P5 - Informational (minimal impact)
'TEMPERATURE_INFO': PRIORITIES['LOW'],
'DRIVE_AGE_INFO': PRIORITIES['LOW'],
'SSD_WEAR_INFO': PRIORITIES['LOW'],
'SYSTEM_LOG_INFO': PRIORITIES['LOW'],
# Ceph cluster issues
'CEPH_HEALTH_ERR': PRIORITIES['CRITICAL'], # P1 - Cluster in error state
'CEPH_HEALTH_WARN': PRIORITIES['MEDIUM'], # P3 - Cluster warnings
'CEPH_OSD_DOWN': PRIORITIES['HIGH'], # P2 - OSD down (local node)
'CEPH_USAGE_CRITICAL': PRIORITIES['HIGH'], # P2 - Cluster near full
'CEPH_USAGE_WARNING': PRIORITIES['MEDIUM'], # P3 - Cluster usage high
'CEPH_PG_DEGRADED': PRIORITIES['HIGH'], # P2 - PGs degraded
'CEPH_MON_DOWN': PRIORITIES['HIGH'] # P2 - Monitor down
}
CONFIG = {
'TICKET_API_URL': 'http://10.10.10.45/create_ticket_api.php',
'TICKET_API_KEY': None, # Will be loaded from .env file
'THRESHOLDS': {
'DISK_CRITICAL': 90,
'DISK_WARNING': 80,
'LXC_CRITICAL': 90,
'LXC_WARNING': 80,
'CPU_WARNING': 95,
'TEMPERATURE_WARNING': 65
},
'NETWORKS': {
'MANAGEMENT': '10.10.10.1',
'CEPH': '10.10.90.1',
'PING_TIMEOUT': 1,
'PING_COUNT': 1
},
'EXCLUDED_MOUNTS': [
'/media',
'/mnt/pve/mediafs',
'/opt/metube_downloads'
],
'EXCLUDED_PATTERNS': [
r'/media.*',
r'/mnt/pve/mediafs.*',
r'.*/media$',
r'.*mediafs.*',
r'.*/downloads.*'
],
'HISTORY_DIR': '/var/log/hwmonDaemon',
'HISTORY_RETENTION_DAYS': 30,
'INCLUDE_INFO_TICKETS': False, # Set True to create P5 tickets for INFO alerts
'PRIORITY_ESCALATION_THRESHOLD': 3, # Number of criticals to trigger P1
# Ceph monitoring settings
'CEPH_ENABLED': True, # Enable/disable Ceph health monitoring
'CEPH_TICKET_NODE': None, # Hostname of node designated to create cluster-wide Ceph tickets
'CEPH_USAGE_WARNING': 70, # Ceph cluster usage warning threshold %
'CEPH_USAGE_CRITICAL': 85, # Ceph cluster usage critical threshold %
# Cluster identification for tickets
'CLUSTER_NAME': 'proxmox-cluster', # Name used in cluster-wide ticket titles instead of hostname
# Prometheus metrics settings
'PROMETHEUS_ENABLED': False, # Enable Prometheus metrics export
'PROMETHEUS_PORT': 9101, # Port for Prometheus metrics HTTP server
'PROMETHEUS_TEXTFILE_PATH': None, # Path for textfile collector (alternative to HTTP)
# SMART analysis thresholds
'NEW_DRIVE_HOURS_THRESHOLD': 720, # Hours to consider a drive "new" (~30 days)
'SMART_ERROR_RECENT_HOURS': 168 # Hours window for recent SMART errors (~1 week)
}
@classmethod
def load_env_config(cls):
    """Load configuration from .env file in /etc/hwmonDaemon/.

    Parses KEY=VALUE lines (blank lines and '#' comments are skipped)
    and overrides the matching cls.CONFIG entries; unknown keys are
    ignored.  A malformed numeric value is now logged and skipped
    instead of raising out of the loop into the broad outer except,
    which previously silently discarded every setting after the bad
    line.
    """
    # Check for .env file in standard system location
    env_file = '/etc/hwmonDaemon/.env'
    if not os.path.exists(env_file):
        logger.warning(f".env file not found at {env_file} - API key required for ticket creation")
        return

    def _parse_int(key: str, raw: str):
        """Return int(raw), or None (with a warning) when not a valid integer."""
        try:
            return int(raw)
        except ValueError:
            logger.warning(f"Invalid integer for {key} in .env: {raw!r} - ignored")
            return None

    # CONFIG keys whose env value must be an integer (env key == CONFIG key).
    int_keys = ('CEPH_USAGE_WARNING', 'CEPH_USAGE_CRITICAL', 'PROMETHEUS_PORT',
                'NEW_DRIVE_HOURS_THRESHOLD', 'SMART_ERROR_RECENT_HOURS')
    try:
        with open(env_file, 'r') as f:
            for line in f:
                line = line.strip()
                # Skip empty lines and comments
                if not line or line.startswith('#'):
                    continue
                # Parse KEY=VALUE format
                if '=' not in line:
                    continue
                key, value = line.split('=', 1)
                key = key.strip()
                value = value.strip()
                # Update CONFIG if key exists
                if key == 'TICKET_API_KEY':
                    cls.CONFIG['TICKET_API_KEY'] = value
                    logger.info("✓ Loaded TICKET_API_KEY from .env")
                elif key == 'TICKET_API_URL':
                    cls.CONFIG['TICKET_API_URL'] = value
                    logger.info(f"✓ Loaded TICKET_API_URL: {value}")
                # Ceph settings
                elif key == 'CEPH_ENABLED':
                    cls.CONFIG['CEPH_ENABLED'] = value.lower() in ('true', '1', 'yes')
                    logger.info(f"✓ Loaded CEPH_ENABLED: {cls.CONFIG['CEPH_ENABLED']}")
                elif key == 'CEPH_TICKET_NODE':
                    # Empty value means "no designated ticket node"
                    cls.CONFIG['CEPH_TICKET_NODE'] = value if value else None
                    logger.info(f"✓ Loaded CEPH_TICKET_NODE: {value}")
                # Prometheus settings
                elif key == 'PROMETHEUS_ENABLED':
                    cls.CONFIG['PROMETHEUS_ENABLED'] = value.lower() in ('true', '1', 'yes')
                    logger.info(f"✓ Loaded PROMETHEUS_ENABLED: {cls.CONFIG['PROMETHEUS_ENABLED']}")
                elif key == 'PROMETHEUS_TEXTFILE_PATH':
                    cls.CONFIG['PROMETHEUS_TEXTFILE_PATH'] = value if value else None
                    logger.info(f"✓ Loaded PROMETHEUS_TEXTFILE_PATH: {value}")
                # Cluster identification
                elif key == 'CLUSTER_NAME':
                    cls.CONFIG['CLUSTER_NAME'] = value if value else 'proxmox-cluster'
                    logger.info(f"✓ Loaded CLUSTER_NAME: {value}")
                elif key in int_keys:
                    parsed = _parse_int(key, value)
                    if parsed is not None:
                        cls.CONFIG[key] = parsed
    except Exception as e:
        logger.error(f"Failed to load .env file: {e}")
TICKET_TEMPLATES = {
'ACTION_TYPE': {
'AUTO': '[auto]',
'MANUAL': '[manual]'
},
'ENVIRONMENT': {
'PRODUCTION': '[production]'
},
'TICKET_TYPE': {
'ISSUE': '[issue]', # General issue (replaces invalid 'incident')
'PROBLEM': '[problem]', # Root cause investigation
'TASK': '[task]', # Planned work item
'MAINTENANCE': '[maintenance]', # Scheduled/preventive work
'UPGRADE': '[upgrade]' # Hardware/software upgrade
},
'HARDWARE_TYPE': {
'HARDWARE': '[hardware]'
},
'SOFTWARE_TYPE': {
'SOFTWARE': '[software]'
},
'NETWORK_TYPE': {
'NETWORK': '[network]'
},
'SCOPE': {
'SINGLE_NODE': '[single-node]',
'CLUSTER_WIDE': '[cluster-wide]'
}
}
# Category and Type mappings for ticket API
TICKET_CATEGORIES = {
'HARDWARE': 'Hardware',
'SOFTWARE': 'Software'
}
TICKET_TYPES = {
'ISSUE': 'Issue', # General issue/incident
'PROBLEM': 'Problem', # Root cause investigation needed
'TASK': 'Task', # Planned work item
'MAINTENANCE': 'Maintenance', # Scheduled/preventive work
'UPGRADE': 'Upgrade', # Hardware/software upgrade
'INSTALL': 'Install', # New installation
'REQUEST': 'Request' # Service or information request
}
PROBLEMATIC_FIRMWARE = {
'Samsung': {
'EVO860': ['RVT01B6Q', 'RVT02B6Q'], # Known issues with sudden performance drops
'EVO870': ['SVT01B6Q'],
'PM883': ['HXT7404Q'] # Known issues with TRIM
},
'Seagate': {
'ST8000NM': ['CC64'], # Known issues with NCQ
'ST12000NM': ['SN02']
},
'WDC': {
'WD121KRYZ': ['01.01A01'], # RAID rebuild issues
'WD141KRYZ': ['02.01A02']
}
}
MANUFACTURER_SMART_PROFILES = {
'Western Digital': {
'aliases': ['WDC', 'Western Digital', 'HGST', 'Ultrastar'],
'attributes': {
'Raw_Read_Error_Rate': {
'monitor': False,
'description': 'WD drives use this as operation counter, not error count'
},
'Seek_Error_Rate': {
'monitor': False,
'description': 'WD drives use this as operation counter, not error count'
}
}
},
'Seagate': {
'aliases': ['Seagate', 'ST'],
'attributes': {
'Raw_Read_Error_Rate': {
'monitor': False,
'description': 'Seagate drives use this as operation counter'
}
}
},
'Ridata': {
'aliases': ['Ridata', 'Ritek', 'RIDATA', 'RITEK', 'SSD 512GB'],
'firmware_patterns': ['HT3618B7', 'HT36'],
'wear_leveling_behavior': 'countup',
'wear_leveling_baseline': 0,
'wear_leveling_thresholds': {
'warning': 1000000000, # 1 billion - very conservative
'critical': 2000000000 # 2 billion - extremely conservative
},
'attributes': {
'Wear_Leveling_Count': {
'behavior': 'countup',
'baseline': 0,
'warning_threshold': 1000000000,
'critical_threshold': 2000000000,
'description': 'Total wear leveling operations (countup from 0)',
'ignore_on_new_drive': False,
'monitor': True # Include in health checks
},
# These are operation counters, NOT actual failures - ignore completely
'Erase_Fail_Count_Chip': {
'monitor': False, # Skip monitoring entirely
'description': 'Operation counter, not actual failures - IGNORED'
},
'Program_Fail_Count_Chip': {
'monitor': False, # Skip monitoring entirely
'description': 'Operation counter, not actual failures - IGNORED'
},
# ADD THIS: Regular Erase_Fail_Count is also an operation counter for Ridata
'Erase_Fail_Count': {
'monitor': False, # Skip monitoring entirely for Ridata
'description': 'Operation counter for Ridata drives, not actual failures - IGNORED'
},
'Program_Fail_Count': {
'monitor': False, # Skip monitoring entirely for Ridata
'description': 'Operation counter for Ridata drives, not actual failures - IGNORED'
},
# These are the REAL failure counters - monitor with standard thresholds
'Program_Fail_Cnt_Total': {
'monitor': True,
'behavior': 'countup',
'baseline': 0,
'warning_threshold': 1, # Any failures are concerning
'critical_threshold': 5,
'description': 'Actual program failures (real failures)'
},
'Erase_Fail_Count_Total': {
'monitor': True,
'behavior': 'countup',
'baseline': 0,
'warning_threshold': 1, # Any failures are concerning
'critical_threshold': 5,
'description': 'Actual erase failures (real failures)'
}
}
},
'OOS': {
'aliases': ['OOS12000G', 'OOS'],
'attributes': {
# These drives seem to report very high error rates normally
'Raw_Read_Error_Rate': {
'monitor': False, # Skip monitoring - seems to be a counter
'description': 'OOS drives report high values normally'
},
'Seek_Error_Rate': {
'monitor': False, # Skip monitoring - seems to be a counter
'description': 'OOS drives report high values normally'
},
'Command_Timeout': {
'warning_threshold': 100000000000, # 100 billion
'critical_threshold': 200000000000, # 200 billion
'description': 'OOS drives report very high timeout counters'
}
}
},
'Samsung': {
'aliases': ['Samsung', 'SAMSUNG'],
'wear_leveling_behavior': 'countup',
'wear_leveling_baseline': 0,
'wear_leveling_thresholds': {
'warning': 2000,
'critical': 3000
},
'attributes': {
'Wear_Leveling_Count': {
'behavior': 'countup',
'baseline': 0,
'warning_threshold': 2000,
'critical_threshold': 3000,
'description': 'Total wear leveling operations performed',
'monitor': True
},
# Standard monitoring for all other attributes
'Program_Fail_Count': {
'monitor': True,
'warning_threshold': 10,
'critical_threshold': 20
},
'Erase_Fail_Count': {
'monitor': True,
'warning_threshold': 10,
'critical_threshold': 20
}
}
},
'Intel': {
'aliases': ['Intel', 'INTEL'],
'wear_leveling_behavior': 'percentage',
'wear_leveling_baseline': 100,
'wear_leveling_thresholds': {
'warning': 30,
'critical': 10
},
'attributes': {
'Media_Wearout_Indicator': {
'behavior': 'countdown',
'baseline': 100,
'warning_threshold': 30,
'critical_threshold': 10,
'description': 'Percentage of rated life remaining',
'monitor': True
}
}
},
'Micron': {
'aliases': ['Micron', 'MICRON', 'Crucial', 'CRUCIAL'],
'wear_leveling_behavior': 'percentage',
'wear_leveling_baseline': 100,
'wear_leveling_thresholds': {
'warning': 30,
'critical': 10
},
'attributes': {
# All attributes use default monitoring unless specified
}
},
'Generic': { # Fallback for unknown manufacturers
'aliases': ['Unknown', 'Generic'],
'wear_leveling_behavior': 'unknown',
'wear_leveling_baseline': None,
'wear_leveling_thresholds': {
'warning': None, # Don't trigger on unknown
'critical': None
},
'attributes': {
# All attributes use default monitoring
}
}
}
SEVERITY_INDICATORS = {
'CRITICAL': '[CRIT]',
'WARNING': '[WARN]',
'HEALTHY': '[ OK ]',
'UNKNOWN': '[ ?? ]'
}
SMART_DESCRIPTIONS = {
'Reported_Uncorrect': """
Number of errors that could not be recovered using hardware ECC.
Impact:
- Indicates permanent data loss in affected sectors
- High correlation with drive hardware failure
- Critical reliability indicator
Recommended Actions:
1. Backup critical data immediately
2. Check drive logs for related errors
3. Plan for drive replacement
4. Monitor for error count increases
""",
'Reallocated_Sector_Ct': """
Number of sectors that have been reallocated due to errors.
Impact:
- High counts indicate degrading media
- Each reallocation uses one of the drive's limited spare sectors
- Rapid increases suggest accelerating drive wear
Recommended Actions:
1. Monitor rate of increase
2. Check drive temperature
3. Plan replacement if count grows rapidly
""",
'Current_Pending_Sector': """
Sectors waiting to be reallocated due to read/write errors.
Impact:
- Indicates potentially unstable sectors
- May result in data loss if unrecoverable
- Should be monitored for increases
Recommended Actions:
1. Backup affected files
2. Run extended SMART tests
3. Monitor for conversion to reallocated sectors
""",
'Offline_Uncorrectable': """
Count of uncorrectable errors detected during offline data collection.
Impact:
- Direct indicator of media reliability issues
- May affect data integrity
- High values suggest drive replacement needed
Recommended Actions:
1. Run extended SMART tests
2. Check drive logs
3. Plan replacement if count is increasing
""",
'Spin_Retry_Count': """
Number of spin start retry attempts.
Impact:
- Indicates potential motor or bearing issues
- May predict imminent mechanical failure
- Increasing values suggest degrading drive health
Recommended Actions:
1. Monitor for rapid increases
2. Check drive temperature
3. Plan replacement if count grows rapidly
""",
'Power_On_Hours': """
Total number of hours the device has been powered on.
Impact:
- Normal aging metric
- Used to gauge overall drive lifetime
- Compare against manufacturer's MTBF rating
Recommended Actions:
1. Compare to warranty period
2. Plan replacement if approaching rated lifetime
""",
'Media_Wearout_Indicator': """
Percentage of drive's rated life remaining (SSDs).
Impact:
- 100 indicates new drive
- 0 indicates exceeded rated writes
- Critical for SSD lifecycle management
Recommended Actions:
1. Plan replacement below 20%
2. Monitor write workload
3. Consider workload redistribution
""",
'Temperature_Celsius': """
Current drive temperature.
Impact:
- High temperatures accelerate wear
- Optimal range: 20-45°C
- Sustained high temps reduce lifespan
Recommended Actions:
1. Check system cooling
2. Verify airflow
3. Monitor for sustained high temperatures
""",
'Available_Spare': """
Percentage of spare blocks remaining (SSDs).
Impact:
- Critical for SSD endurance
- Low values indicate approaching end-of-life
- Rapid decreases suggest excessive writes
Recommended Actions:
1. Plan replacement if below 20%
2. Monitor write patterns
3. Consider workload changes
""",
'Program_Fail_Count': """
Number of flash program operation failures.
Impact:
- Indicates NAND cell reliability
- Important for SSD health assessment
- Increasing values suggest flash degradation
Recommended Actions:
1. Monitor rate of increase
2. Check firmware updates
3. Plan replacement if rapidly increasing
""",
'Erase_Fail_Count': """
Number of flash erase operation failures.
Impact:
- Related to NAND block health
- Critical for SSD reliability
- High counts suggest failing flash blocks
Recommended Actions:
1. Monitor count increases
2. Check firmware version
3. Plan replacement if count is high
""",
'Load_Cycle_Count': """
Number of power cycles and head load/unload events.
Impact:
- Normal operation metric
- High counts may indicate power management issues
- Compare against rated cycles (typically 600k-1M)
Recommended Actions:
1. Review power management settings
2. Monitor rate of increase
3. Plan replacement near rated limit
""",
'Wear_Leveling_Count': """
SSD block erase distribution metric.
Impact:
- Indicates wear pattern uniformity
- Interpretation varies by manufacturer
- Critical for SSD longevity
Recommended Actions:
1. Monitor trend over time
2. Compare with manufacturer baseline
3. Check workload distribution
Note: Different manufacturers use different counting methods:
- Some count up from 0 (Samsung, etc.)
- Others count down from baseline (Ridata, etc.)
- Always check manufacturer specifications
"""
}
# =============================================================================
# INITIALIZATION
# =============================================================================
def __init__(self, ticket_api_url: str = 'http://10.10.10.45/create_ticket_api.php', dry_run: bool = False):
    """
    Initialize the system health monitor.
    :param ticket_api_url: URL for the ticket creation API.
        NOTE(review): this parameter always wins over any TICKET_API_URL
        that load_env_config() loads into CONFIG - confirm that precedence
        is intended.
    :param dry_run: If True, simulate API calls without sending requests.
    """
    # Load environment configuration first (API keys, etc.)
    self.load_env_config()
    self.ticket_api_url = ticket_api_url
    self.dry_run = dry_run
    # Ensure history directory exists
    os.makedirs(self.CONFIG['HISTORY_DIR'], exist_ok=True)
def _enforce_storage_limit(self, history_dir: str, max_bytes: int = 10485760):
    """
    Delete oldest history files if directory exceeds size limit (default 10MB).
    :param history_dir: Directory containing history files
    :param max_bytes: Maximum directory size in bytes (default 10MB)
    """
    if not os.path.exists(history_dir):
        return
    try:
        candidates = []
        dir_bytes = 0
        # Collect (path, mtime, size) for every smart_history_*.json file.
        for name in os.listdir(history_dir):
            if not (name.startswith('smart_history_') and name.endswith('.json')):
                continue
            path = os.path.join(history_dir, name)
            try:
                info = os.stat(path)
            except (IOError, OSError) as e:
                logger.debug(f"Could not stat file {path}: {e}")
                continue
            dir_bytes += info.st_size
            candidates.append((path, info.st_mtime, info.st_size))
        # Under the cap - nothing to clean up.
        if dir_bytes <= max_bytes:
            return
        logger.info(f"History directory size ({dir_bytes} bytes) exceeds limit ({max_bytes} bytes), cleaning up...")
        # Remove oldest files first until we drop back under the limit.
        for path, _mtime, size in sorted(candidates, key=lambda item: item[1]):
            if dir_bytes <= max_bytes:
                break
            try:
                os.remove(path)
                dir_bytes -= size
                logger.info(f"Removed old history file {os.path.basename(path)} (saved {size} bytes)")
            except (IOError, OSError) as e:
                logger.warning(f"Could not remove history file {path}: {e}")
    except Exception as e:
        logger.error(f"Error enforcing storage limit: {e}")
# =============================================================================
# MAIN EXECUTION METHODS
# =============================================================================
def run(self):
    """Perform a one-shot health check of the system."""
    try:
        report = self.perform_health_checks()
        # File tickets for any critical issues the checks uncovered.
        self._create_tickets_for_issues(report)
        # Optionally publish the report as Prometheus metrics.
        if self.CONFIG.get('PROMETHEUS_ENABLED', False):
            self.write_prometheus_metrics(report)
    except Exception as e:
        import traceback
        logger.error(f"Unexpected error during health check: {e}")
        logger.error(traceback.format_exc())
def perform_health_checks(self) -> Dict[str, Any]:
    """Perform comprehensive system health checks and return a report.

    Aggregates drive, memory, CPU, network, Ceph, LXC-storage and
    system-log health into a single dict keyed by subsystem.  In
    dry-run mode a condensed human-readable summary is also logged.
    :return: Health report dict (hostname, timestamp, per-subsystem results).
    """
    health_report = {
        'hostname': socket.gethostname(),
        'timestamp': datetime.datetime.now().isoformat(),
        'drives_health': self._check_drives_health(),
        'memory_health': self._check_memory_usage(),
        'cpu_health': self._check_cpu_usage(),
        'network_health': self._check_network_status(),
        'ceph_health': self._check_ceph_health(),
        'lxc_health': self._check_lxc_storage(),
        'system_health': self._check_system_drive_indicators()
    }
    # Dry-run: log a human-readable summary instead of acting on the report.
    if self.dry_run:
        logger.info("\n=== System Health Summary ===")
        logger.info(f"Overall Drive Health: {health_report['drives_health']['overall_status']}")
        # Summarized drive information with usage
        logger.info("\nDrive Status:")
        for drive in health_report['drives_health']['drives']:
            issues = drive.get('smart_issues', [])
            temp = f", {drive.get('temperature')}°C" if drive.get('temperature') else ""
            status = "⚠️ " if issues else ""
            # Disk usage information
            usage_info = ""
            if drive.get('partitions'):
                for partition in drive['partitions']:
                    usage_info += f"\n └─ {partition['mountpoint']}: {partition['used_space']}/{partition['total_space']} ({partition['usage_percent']}% used)"
            logger.info(f"{status}{drive['device']}{temp} - SMART: {drive['smart_status']}{usage_info}")
            if issues:
                logger.info(f" Issues: {', '.join(issues)}")
        logger.info(f"\nMemory: {health_report['memory_health']['memory_percent']}% used")
        if health_report['memory_health'].get('has_ecc'):
            logger.info("ECC Memory: Present")
            if health_report['memory_health'].get('ecc_errors'):
                logger.info(f"ECC Errors: {len(health_report['memory_health']['ecc_errors'])} found")
        logger.info(f"\nCPU Usage: {health_report['cpu_health']['cpu_usage_percent']}%")
        logger.info("\nNetwork Status:")
        logger.info(f"Management: {health_report['network_health']['management_network']['status']}")
        logger.info(f"Ceph Network: {health_report['network_health']['ceph_network']['status']}")
        # Ceph cluster status (only shown when this host is a Ceph node)
        ceph = health_report.get('ceph_health', {})
        if ceph.get('is_ceph_node'):
            logger.info("\nCeph Cluster Status:")
            logger.info(f" Cluster Health: {ceph.get('cluster_health', 'UNKNOWN')}")
            if ceph.get('cluster_usage'):
                usage = ceph['cluster_usage']
                logger.info(f" Cluster Usage: {usage.get('usage_percent', 0):.1f}%")
            logger.info(f" OSDs: {len(ceph.get('osd_status', []))} total")
            down_osds = [o for o in ceph.get('osd_status', []) if o.get('status') == 'down']
            if down_osds:
                logger.info(f" ⚠️ Down OSDs: {len(down_osds)}")
            if ceph.get('cluster_wide_issues'):
                logger.info(f" ⚠️ Cluster-wide issues: {len(ceph['cluster_wide_issues'])}")
            if ceph.get('issues'):
                logger.info(f" ⚠️ Node-specific issues: {len(ceph['issues'])}")
        if health_report['system_health']['issues']:
            logger.info(f"\nSystem Issues: {len(health_report['system_health']['issues'])} found")
        logger.info("\n=== End Summary ===")
    return health_report
# =============================================================================
# ENHANCED SMART ANALYSIS METHODS
# =============================================================================
def _analyze_smart_trends(self, device: str, current_attributes: dict) -> List[str]:
    """Analyze SMART attribute trends to predict failures.

    Appends the current reading to a per-device JSON history file
    (guarded with an exclusive flock so concurrent runs cannot corrupt
    it), prunes entries older than the retention window, and flags
    critical attributes whose values are rising quickly.
    :param device: Block device path, e.g. '/dev/sda'.
    :param current_attributes: Current SMART attribute name -> value map.
    :return: List of "TREND ALERT" strings (empty when nothing notable).
    """
    issues = []
    # Create safe filename from device path
    device_safe = device.replace('/', '_').replace('-', '_')
    historical_file = os.path.join(self.CONFIG['HISTORY_DIR'], f"smart_history_{device_safe}.json")
    try:
        # Enforce storage limit before writing
        self._enforce_storage_limit(self.CONFIG['HISTORY_DIR'])
        # Load historical data with file locking
        history = []
        # 'r+' keeps existing content readable; 'w+' creates/truncates empty files
        if os.path.exists(historical_file) and os.path.getsize(historical_file) > 0:
            file_mode = 'r+'
        else:
            file_mode = 'w+'
        with open(historical_file, file_mode) as f:
            # Acquire exclusive lock
            fcntl.flock(f.fileno(), fcntl.LOCK_EX)
            try:
                # Read existing data if file is not empty
                if os.path.getsize(historical_file) > 0:
                    f.seek(0)
                    try:
                        history = json.load(f)
                    except json.JSONDecodeError as e:
                        logger.warning(f"Corrupted history file {historical_file}, starting fresh: {e}")
                        history = []
                # Add current reading
                current_reading = {
                    'timestamp': datetime.datetime.now().isoformat(),
                    'attributes': current_attributes
                }
                history.append(current_reading)
                # Keep only recent data (30 days default)
                cutoff_date = datetime.datetime.now() - datetime.timedelta(days=self.CONFIG['HISTORY_RETENTION_DAYS'])
                history = [h for h in history if datetime.datetime.fromisoformat(h['timestamp']) > cutoff_date]
                # Analyze trends for critical attributes
                if len(history) >= 3:  # Need at least 3 data points for trend analysis
                    critical_attrs = ['Reallocated_Sector_Ct', 'Current_Pending_Sector', 'Reported_Uncorrect',
                                      'Offline_Uncorrectable', 'Program_Fail_Count', 'Erase_Fail_Count']
                    for attr in critical_attrs:
                        if attr in current_attributes:
                            # Get last week's values
                            recent_history = history[-7:] if len(history) >= 7 else history
                            values = [h['attributes'].get(attr, 0) for h in recent_history]
                            if len(values) >= 3:
                                # Check for rapid increase
                                recent_increase = values[-1] - values[0]
                                if recent_increase > 0:
                                    # Average growth per check drives the tiered thresholds below
                                    rate = recent_increase / len(values)
                                    # Different thresholds for different attributes
                                    if attr in ['Reallocated_Sector_Ct', 'Current_Pending_Sector']:
                                        if rate > 0.5:  # More than 0.5 sectors per check
                                            issues.append(f"TREND ALERT: Rapid increase in {attr}: +{recent_increase} in {len(values)} checks")
                                    elif attr in ['Reported_Uncorrect', 'Offline_Uncorrectable']:
                                        if rate > 0.2:  # Any consistent increase is concerning
                                            issues.append(f"TREND ALERT: Increasing {attr}: +{recent_increase} in {len(values)} checks")
                                    else:  # Program/Erase fail counts
                                        if rate > 1:  # More than 1 error per check
                                            issues.append(f"TREND ALERT: Rapid increase in {attr}: +{recent_increase} in {len(values)} checks")
                # Write updated history atomically
                f.seek(0)
                f.truncate()
                json.dump(history, f, indent=2)
                f.flush()
            finally:
                # Release lock
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)
    except (IOError, OSError) as e:
        logger.debug(f"I/O error analyzing trends for {device}: {e}")
    except Exception as e:
        logger.error(f"Unexpected error analyzing trends for {device}: {e}")
    return issues
def _check_thermal_health(self, device: str, temperature: int, drive_type: str = 'HDD') -> List[str]:
"""Enhanced thermal health checking with drive-type specific thresholds."""
issues = []
if temperature is None:
return issues
# Drive-type specific temperature thresholds - ADJUSTED TO BE LESS SENSITIVE
if drive_type == 'SSD':
temp_thresholds = {'warning': 70, 'critical': 85, 'optimal_max': 65}
else: # HDD
temp_thresholds = {'warning': 65, 'critical': 75, 'optimal_max': 60}
if temperature >= temp_thresholds['critical']:
issues.append(f"CRITICAL: Drive temperature {temperature}°C exceeds safe operating limit for {drive_type}")
elif temperature >= temp_thresholds['warning']:
issues.append(f"WARNING: Drive temperature {temperature}°C approaching thermal limit for {drive_type}")
elif temperature > temp_thresholds['optimal_max']:
issues.append(f"INFO: Drive temperature {temperature}°C above optimal range for {drive_type}")
return issues
def _analyze_error_patterns(self, device: str, smart_output: str) -> List[str]:
"""Analyze SMART error logs for failure patterns."""
issues = []
# Pattern matching for different error types
error_patterns = {
'media_errors': [
r'UNC_ERR',
r'ABRT_ERR',
r'read error',
r'write error',
r'medium error'
],
'interface_errors': [
r'ICRC_ERR',
r'interface CRC error',
r'SATA link down',
r'communication failure'
],
'timeout_errors': [
r'command timeout',
r'NCQ error',
r'device fault',
r'reset required'
]
}
for error_type, patterns in error_patterns.items():
error_count = 0
for pattern in patterns:
matches = re.findall(pattern, smart_output, re.IGNORECASE)
error_count += len(matches)
if error_count > 0:
if error_count >= 10:
issues.append(f"CRITICAL: Multiple {error_type} detected ({error_count} occurrences)")
elif error_count >= 3:
issues.append(f"WARNING: {error_type} detected ({error_count} occurrences)")
elif error_count >= 1:
issues.append(f"INFO: {error_type} detected ({error_count} occurrences)")
return issues
def _check_ssd_health(self, device: str, smart_attributes: dict) -> List[str]:
"""SSD-specific health checks for wear and endurance."""
issues = []
# Check wear leveling and endurance indicators
wear_indicators = [
'Media_Wearout_Indicator',
'SSD_Life_Left',
'Percent_Lifetime_Remain',
'Available_Spare',
'Available_Spare_Threshold'
]
for indicator in wear_indicators:
if indicator in smart_attributes:
value = smart_attributes[indicator]
# Handle percentage-based indicators (countdown from 100)
if indicator in ['Media_Wearout_Indicator', 'SSD_Life_Left', 'Percent_Lifetime_Remain', 'Available_Spare']:
if value <= 5:
issues.append(f"CRITICAL: {indicator} at {value}% - SSD near end of life")
elif value <= 15:
issues.append(f"WARNING: {indicator} at {value}% - SSD showing significant wear")
elif value <= 30:
issues.append(f"INFO: {indicator} at {value}% - SSD wear monitoring recommended")
# Check for excessive bad blocks
bad_block_indicators = [
'Runtime_Bad_Block',
'Factory_Bad_Block_Ct',
'Grown_Failing_Block_Ct',
'End-to-End_Error'
]
for indicator in bad_block_indicators:
if indicator in smart_attributes:
value = smart_attributes[indicator]
if value > 100:
issues.append(f"WARNING: High {indicator}: {value}")
elif value > 10:
issues.append(f"INFO: Elevated {indicator}: {value}")
# Check write amplification and endurance metrics
endurance_indicators = [
'Total_LBAs_Written',
'Total_LBAs_Read',
'Host_Program_NAND_Pages_Count',
'FTL_Program_NAND_Pages_Count'
]
# Calculate write amplification if both host and FTL write counts are available
host_writes = smart_attributes.get('Host_Program_NAND_Pages_Count', 0)
ftl_writes = smart_attributes.get('FTL_Program_NAND_Pages_Count', 0)
if host_writes > 0 and ftl_writes > 0:
write_amplification = ftl_writes / host_writes
if write_amplification > 5.0:
issues.append(f"WARNING: High write amplification factor: {write_amplification:.2f}")
elif write_amplification > 3.0:
issues.append(f"INFO: Elevated write amplification factor: {write_amplification:.2f}")
return issues
def _check_system_drive_indicators(self) -> Dict[str, Any]:
    """Check system logs and kernel messages for drive issues."""
    report = {
        'status': 'OK',
        'issues': []
    }
    # (regex, human description) pairs for drive-related kernel messages.
    kernel_patterns = [
        (r'ata\d+.*failed command', 'ATA command failures'),
        (r'sd \w+.*Medium Error', 'SCSI medium errors'),
        (r'Buffer I/O error', 'Buffer I/O errors'),
        (r'critical medium error', 'Critical medium errors'),
        (r'unrecovered read error', 'Unrecovered read errors'),
        (r'Current_Pending_Sector.*increased', 'Pending sector increases'),
        (r'ata\d+.*SError:', 'SATA errors'),
        (r'nvme\d+.*I/O error', 'NVMe I/O errors')
    ]
    try:
        # Only error/warning level kernel messages are scanned.
        result = subprocess.run(['dmesg', '-T', '--level=err,warn'],
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=10)
        if result.returncode == 0:
            for pattern, description in kernel_patterns:
                count = len(re.findall(pattern, result.stdout, re.IGNORECASE))
                if count == 0:
                    continue
                if count >= 5:
                    report['status'] = 'CRITICAL'
                    report['issues'].append(f"CRITICAL: {description} in system logs ({count} occurrences)")
                elif count >= 2:
                    # Never downgrade an already-CRITICAL status to WARNING.
                    if report['status'] != 'CRITICAL':
                        report['status'] = 'WARNING'
                    report['issues'].append(f"WARNING: {description} in system logs ({count} occurrences)")
                else:
                    report['issues'].append(f"INFO: {description} in system logs ({count} occurrences)")
    except subprocess.TimeoutExpired:
        report['issues'].append("WARNING: System log check timed out")
    except Exception as e:
        logger.debug(f"Error checking system drive indicators: {e}")
        report['issues'].append(f"ERROR: Failed to check system logs: {str(e)}")
    return report
# =============================================================================
# DRIVE HEALTH CHECKING METHODS
# =============================================================================
def _get_drive_details(self, device: str) -> Dict[str, str]:
    """Get detailed drive information using smartctl.

    :param device: Block device path, e.g. '/dev/sda'.
    :return: Dict with 'model', 'serial', 'capacity', 'firmware', 'type'
             ('SSD'/'HDD') and a 'smart_capable' flag; fields remain None
             when smartctl cannot provide them.
    """
    drive_details = {
        'model': None,
        'serial': None,
        'capacity': None,
        'firmware': None,
        'type': None,  # SSD or HDD
        'smart_capable': False
    }
    try:
        # First check if device supports SMART
        capability_result = subprocess.run(
            ['smartctl', '-i', device],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=30
        )
        # Check if smartctl failed completely
        # (0 = success, 4 = some SMART errors but output still readable)
        if capability_result.returncode not in [0, 4]:
            logger.debug(f"smartctl failed for {device}: return code {capability_result.returncode}")
            return drive_details
        output = capability_result.stdout
        # Check if SMART is supported
        if "SMART support is: Enabled" in output or "SMART support is: Available" in output:
            drive_details['smart_capable'] = True
        elif "SMART support is: Unavailable" in output or "does not support SMART" in output:
            logger.debug(f"Device {device} does not support SMART")
            return drive_details
        for line in output.split('\n'):
            # Use split(':', 1) so values that themselves contain ':' are
            # not truncated at the second colon (split(':')[1] was lossy).
            if 'Device Model' in line or 'Model Number' in line:
                drive_details['model'] = line.split(':', 1)[1].strip()
            elif 'Serial Number' in line:
                drive_details['serial'] = line.split(':', 1)[1].strip()
            elif 'User Capacity' in line:
                # Extract human-readable capacity from brackets
                capacity_match = re.search(r'\[(.*?)\]', line)
                if capacity_match:
                    drive_details['capacity'] = capacity_match.group(1)
            elif 'Firmware Version' in line:
                drive_details['firmware'] = line.split(':', 1)[1].strip()
            elif 'Rotation Rate' in line:
                # smartctl reports 'Solid State Device' for SSDs here.
                if 'Solid State Device' in line:
                    drive_details['type'] = 'SSD'
                else:
                    drive_details['type'] = 'HDD'
    except Exception as e:
        logger.debug(f"Error getting drive details for {device}: {e}")
    return drive_details
def _get_issue_type(self, issue: str) -> str:
"""Determine issue type from issue description."""
if "SMART" in issue:
return "SMART Health Issue"
elif "Drive" in issue:
return "Storage Issue"
elif any(kw in issue for kw in ["Ceph", "OSD", "ceph"]):
return "Ceph Cluster Issue"
elif "ECC" in issue:
return "Memory Issue"
elif "CPU" in issue:
return "Performance Issue"
elif "Network" in issue:
return "Network Issue"
elif any(kw in issue for kw in ["LXC", "storage usage", "container"]):
return "Container Storage Issue"
return "Hardware Issue"
def _get_impact_level(self, issue: str) -> str:
"""Determine impact level from issue description."""
issue_upper = issue.upper()
# Check storage/CPU warnings first so "critical storage" isn't caught as Critical
if any(kw in issue_upper for kw in ["STORAGE USAGE", "THRESHOLD", "CPU USAGE"]):
return "[WARN] Warning - Action Needed Soon"
if "CRITICAL" in issue_upper or "UNHEALTHY" in issue_upper or "HEALTH_ERR" in issue_upper:
return "[CRIT] Critical - Immediate Action Required"
elif "WARNING" in issue_upper or "HEALTH_WARN" in issue_upper or "DOWN" in issue_upper:
return "[WARN] Warning - Action Needed Soon"
return "[LOW] Low - Monitor Only"
def _generate_detailed_description(self, issue: str, health_report: Dict[str, Any], priority: str = '3') -> str:
    """Generate detailed ticket description with properly formatted ASCII art.

    Builds a banner and an executive summary, then keyword-matches *issue*
    to decide which status boxes to append (drive specs/timeline/SMART,
    temperature, ECC, CPU, network, container storage, Ceph, disk metrics),
    pulling the supporting data from *health_report*.

    :param issue: Issue description string produced by _detect_issues.
    :param health_report: Full health report dict the issue came from.
    :param priority: Ticket priority digit '1'-'5' (default '3' / P3).
    :return: Multi-line ticket description text.
    """
    hostname = socket.gethostname()
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Display labels for the numeric priority shown in the banner header.
    priority_labels = {
        '1': '⚠ P1 - CRITICAL', '2': '⚠ P2 - HIGH',
        '3': '● P3 - MEDIUM', '4': '● P4 - NORMAL', '5': '● P5 - LOW',
    }
    priority_display = priority_labels.get(priority, '● P3 - MEDIUM')
    # Box width: all lines are exactly 80 chars
    # border lines: ┏ + 78 ━ + ┓ = 80
    # content lines: prefix + field_width + ┃ = 80
    box_width = 78
    banner = f"""
{'' * box_width}
{' HARDWARE MONITORING ALERT TICKET '.center(box_width)}
{'' * box_width}
┃ Host : {hostname:<{box_width - 14}}
┃ Generated : {timestamp:<{box_width - 14}}
┃ Priority : {priority_display:<{box_width - 14}}
{'' * box_width}"""
    issue_type = self._get_issue_type(issue)
    impact_level = self._get_impact_level(issue)
    executive_summary = f"""
┏━ EXECUTIVE SUMMARY {'' * (box_width - 20)}
┃ Issue Type │ {issue_type:<60}
┃ Impact Level │ {impact_level:<60}
{'' * box_width}"""
    description = banner + executive_summary
    # Add relevant SMART attribute explanations when the attribute name
    # appears verbatim in the issue text.
    for attr in self.SMART_DESCRIPTIONS:
        if attr in issue:
            description += f"\n{attr}:\n{textwrap.dedent(self.SMART_DESCRIPTIONS[attr]).strip()}\n"
    if "SMART" in issue:
        description += "\n" + textwrap.dedent("""
SMART (Self-Monitoring, Analysis, and Reporting Technology) Attribute Details:
- Possible drive failure!
""").strip() + "\n"
    # Drive-specific boxes: specs, timeline, SMART status/attributes,
    # partitions and firmware alerts for the device named in the issue.
    if "Drive" in issue and "/dev/" in issue:
        try:
            # Pull the device path out of the issue text, then look up its record.
            device = re.search(r'/dev/[a-zA-Z0-9]+', issue).group(0) if '/dev/' in issue else None
            drive_info = next((d for d in health_report['drives_health']['drives'] if d['device'] == device), None)
            if drive_info:
                drive_details = self._get_drive_details(device)
                smart_data = {
                    'attributes': drive_info.get('smart_attributes', {}),
                    'performance_metrics': drive_info.get('performance_metrics', {}),
                    'last_test_date': drive_info.get('last_test_date', 'N/A')
                }
                power_on_hours = smart_data['attributes'].get('Power_On_Hours', 'N/A')
                last_test_date = smart_data.get('last_test_date', 'N/A')
                # Convert power-on hours to an approximate human-readable age.
                if power_on_hours != 'N/A' and isinstance(power_on_hours, (int, float)):
                    total_days = power_on_hours / 24
                    years = int(total_days / 365)
                    months = int((total_days % 365) / 30)
                    if years >= 1:
                        age = f"{years} year{'s' if years != 1 else ''}, {months} month{'s' if months != 1 else ''}"
                    elif months >= 1:
                        age = f"{months} month{'s' if months != 1 else ''}"
                    else:
                        age = "< 1 month"
                else:
                    age = 'N/A'
                # Ensure all values are properly formatted strings
                device_safe = device or 'N/A'
                model_safe = drive_details.get('model') or 'N/A'
                serial_safe = drive_details.get('serial') or 'N/A'
                capacity_safe = drive_details.get('capacity') or 'N/A'
                type_safe = drive_details.get('type') or 'N/A'
                firmware_safe = drive_details.get('firmware') or 'N/A'
                description += f"""
┏━ DRIVE SPECIFICATIONS {'' * (box_width - 23)}
┃ Device Path │ {device_safe:<61}
┃ Model │ {model_safe:<61}
┃ Serial │ {serial_safe:<61}
┃ Capacity │ {capacity_safe:<61}
┃ Type │ {type_safe:<61}
┃ Firmware │ {firmware_safe:<61}
{'' * box_width}
"""
                power_on_safe = f"{power_on_hours} hours" if power_on_hours != 'N/A' else 'N/A'
                last_test_safe = last_test_date or 'N/A'
                age_safe = age or 'N/A'
                description += f"""
┏━ DRIVE TIMELINE {'' * (box_width - 17)}
┃ Power-On Hours │ {power_on_safe:<56}
┃ Last SMART Test │ {last_test_safe:<56}
┃ Drive Age │ {age_safe:<56}
{'' * box_width}
"""
                smart_status_safe = drive_info.get('smart_status') or 'N/A'
                # Properly handle temperature with None check
                temp_value = drive_info.get('temperature')
                temp_safe = f"{temp_value}°C" if temp_value is not None else 'N/A'
                description += f"""
┏━ SMART STATUS {'' * (box_width - 15)}
┃ Status │ {smart_status_safe:<62}
┃ Temperature │ {temp_safe:<62}
{'' * box_width}
"""
                if drive_info.get('smart_attributes'):
                    description += f"\n┏━ SMART ATTRIBUTES {'' * (box_width - 19)}\n"
                    for attr, value in drive_info['smart_attributes'].items():
                        attr_safe = str(attr).replace('_', ' ') if attr else 'Unknown'
                        value_safe = str(value) if value is not None else 'N/A'
                        description += f"{attr_safe:<27}{value_safe:<46}\n"
                    description += f"{'' * box_width}\n"
                if drive_info.get('partitions'):
                    for partition in drive_info['partitions']:
                        usage_percent = partition.get('usage_percent', 0)
                        # Create 50-char usage meter (2% per block)
                        blocks = int(usage_percent / 2)
                        usage_meter = '' * blocks + '' * (50 - blocks)
                        mountpoint_safe = partition.get('mountpoint') or 'N/A'
                        fstype_safe = partition.get('fstype') or 'N/A'
                        total_space_safe = partition.get('total_space') or 'N/A'
                        used_space_safe = partition.get('used_space') or 'N/A'
                        free_space_safe = partition.get('free_space') or 'N/A'
                        usage_pct_str = f"{usage_percent}%"
                        # Truncate mountpoint if too long for header
                        mountpoint_display = mountpoint_safe[:50] if len(mountpoint_safe) > 50 else mountpoint_safe
                        description += f"""
┏━ PARTITION: {mountpoint_display} {'' * (box_width - 14 - len(mountpoint_display))}
┃ Filesystem │ {fstype_safe:<61}
┃ Usage Meter │ {usage_meter} {usage_pct_str:>10}
┃ Total Space │ {total_space_safe:<61}
┃ Used Space │ {used_space_safe:<61}
┃ Free Space │ {free_space_safe:<61}
{'' * box_width}
"""
                firmware_info = self._check_disk_firmware(device)
                if firmware_info['is_problematic']:
                    description += f"\n┏━ FIRMWARE ALERTS {'' * (box_width - 18)}\n"
                    for issue_item in firmware_info['known_issues']:
                        issue_safe = str(issue_item) if issue_item else 'Unknown issue'
                        description += f"┃ ⚠ {issue_safe:<{box_width - 4}}\n"
                    description += f"{'' * box_width}\n"
        except Exception as e:
            # Never let a formatting failure block ticket creation; append the
            # error text to the description instead.
            description += f"\nError generating drive details: {str(e)}\n"
    if "Temperature" in issue:
        description += "\n" + textwrap.dedent("""
High drive temperatures can:
- Reduce drive lifespan
- Cause performance degradation
- Lead to data corruption in extreme cases
Optimal temperature range: 20-45°C
""").strip() + "\n"
    if "ECC" in issue:
        description += "\n" + textwrap.dedent("""
ECC (Error Correction Code) Memory Issues:
- Correctable: Memory errors that were successfully fixed
- Uncorrectable: Serious memory errors that could not be corrected
Frequent ECC corrections may indicate degrading memory modules
""").strip() + "\n"
    if "CPU" in issue:
        description += "\n" + textwrap.dedent("""
High CPU usage sustained over time can indicate:
- Resource constraints
- Runaway processes
- Need for performance optimization
- Potential cooling issues
""").strip() + "\n"
        # Add CPU STATUS box
        cpu_health = health_report.get('cpu_health', {})
        cpu_usage = cpu_health.get('cpu_usage_percent', 'N/A')
        cpu_threshold = self.CONFIG['THRESHOLDS']['CPU_WARNING']
        cpu_status = cpu_health.get('status', 'N/A')
        cpu_usage_str = f"{cpu_usage}%" if isinstance(cpu_usage, (int, float)) else cpu_usage
        description += f"""
┏━ CPU STATUS {'' * (box_width - 13)}
┃ Usage │ {cpu_usage_str:<61}
┃ Threshold │ {str(cpu_threshold) + '%':<61}
┃ Status │ {cpu_status:<61}
{'' * box_width}
"""
    if "Network" in issue:
        description += "\n" + textwrap.dedent("""
Network connectivity issues can impact:
- Cluster communication
- Data replication
- Service availability
- Management access
""").strip() + "\n"
        # Add NETWORK STATUS box
        net_health = health_report.get('network_health', {})
        mgmt = net_health.get('management_network', {})
        ceph_net = net_health.get('ceph_network', {})
        mgmt_status = mgmt.get('status', 'N/A')
        ceph_status = ceph_net.get('status', 'N/A')
        mgmt_latency = mgmt.get('latency')
        mgmt_latency_str = f"{mgmt_latency}ms" if mgmt_latency is not None else 'N/A'
        mgmt_issues = mgmt.get('issues', [])
        ceph_issues = ceph_net.get('issues', [])
        all_net_issues = mgmt_issues + ceph_issues
        issues_str = '; '.join(all_net_issues) if all_net_issues else 'None'
        # Truncate issues string to fit in box
        if len(issues_str) > 61:
            issues_str = issues_str[:58] + '...'
        description += f"""
┏━ NETWORK STATUS {'' * (box_width - 17)}
┃ Management │ {mgmt_status:<61}
┃ Ceph Network │ {ceph_status:<61}
┃ Latency │ {mgmt_latency_str:<61}
┃ Issues │ {issues_str:<61}
{'' * box_width}
"""
    if any(kw in issue for kw in ["LXC", "storage usage", "container"]):
        # Add CONTAINER STORAGE box for each filesystem mentioned in the issue
        lxc_health = health_report.get('lxc_health', {})
        containers = lxc_health.get('containers', [])
        for container in containers:
            vmid = container.get('vmid', 'N/A')
            for fs in container.get('filesystems', []):
                mountpoint = fs.get('mountpoint', 'N/A')
                usage_pct = fs.get('usage_percent', 0)
                total_bytes = fs.get('total_space', 0)
                used_bytes = fs.get('used_space', 0)
                avail_bytes = fs.get('available', 0)
                # Only show filesystems relevant to this issue
                # NOTE(review): 'vmid not in issue' assumes vmid is a str;
                # an int vmid would raise TypeError here — confirm upstream.
                if mountpoint not in issue and vmid not in issue:
                    continue
                total_str = self._format_bytes_human(total_bytes) if isinstance(total_bytes, (int, float)) else str(total_bytes)
                used_str = self._format_bytes_human(used_bytes) if isinstance(used_bytes, (int, float)) else str(used_bytes)
                free_str = self._format_bytes_human(avail_bytes) if isinstance(avail_bytes, (int, float)) else str(avail_bytes)
                # Create 50-char usage meter (2% per block)
                blocks = int(usage_pct / 2)
                usage_meter = '' * blocks + '' * (50 - blocks)
                usage_pct_str = f"{usage_pct:.1f}%"
                description += f"""
┏━ CONTAINER STORAGE {'' * (box_width - 20)}
┃ VMID │ {vmid:<61}
┃ Mountpoint │ {mountpoint:<61}
┃ Usage Meter │ {usage_meter} {usage_pct_str:>10}
┃ Total │ {total_str:<61}
┃ Used │ {used_str:<61}
┃ Free │ {free_str:<61}
{'' * box_width}
"""
    if any(kw in issue for kw in ["Ceph", "OSD", "ceph", "HEALTH_ERR", "HEALTH_WARN"]):
        # Add CEPH CLUSTER STATUS box (only meaningful on Ceph nodes)
        ceph_health = health_report.get('ceph_health', {})
        if ceph_health.get('is_ceph_node'):
            cluster_health = ceph_health.get('cluster_health', 'N/A')
            cluster_usage = ceph_health.get('cluster_usage', {})
            usage_pct = cluster_usage.get('usage_percent', 'N/A') if cluster_usage else 'N/A'
            total_bytes = cluster_usage.get('total_bytes', 0) if cluster_usage else 0
            used_bytes = cluster_usage.get('used_bytes', 0) if cluster_usage else 0
            total_str = self._format_bytes_human(total_bytes) if total_bytes else 'N/A'
            used_str = self._format_bytes_human(used_bytes) if used_bytes else 'N/A'
            usage_pct_str = f"{usage_pct}%" if isinstance(usage_pct, (int, float)) else usage_pct
            osd_list = ceph_health.get('osd_status', [])
            osd_total = len(osd_list)
            osd_up = sum(1 for o in osd_list if o.get('status') == 'up')
            osd_summary = f"{osd_up}/{osd_total} up" if osd_total > 0 else 'N/A'
            description += f"""
┏━ CEPH CLUSTER STATUS {'' * (box_width - 22)}
┃ Health │ {cluster_health:<61}
┃ Usage │ {usage_pct_str:<61}
┃ Total │ {total_str:<61}
┃ Used │ {used_str:<61}
┃ OSDs │ {osd_summary:<61}
{'' * box_width}
"""
    if "Disk" in issue:
        # Plain-text metrics block for the drive record whose mountpoint
        # appears in the issue text.
        # NOTE(review): assumes 'mountpoint' is always a str here; a None
        # value would make the 'in issue' test raise TypeError — confirm.
        for partition in health_report.get('drives_health', {}).get('drives', []):
            if partition.get('mountpoint') in issue:
                description += f"\n=== Disk Metrics ===\n"
                description += f"Disk Device: {partition['device']}\n"
                description += f"Mount Point: {partition['mountpoint']}\n"
                description += f"Total Space: {partition['total_space']}\n"
                description += f"Used Space: {partition['used_space']}\n"
                description += f"Free Space: {partition['free_space']}\n"
                description += f"Usage Percent: {partition['usage_percent']}%\n"
    return description
def _count_critical_issues(self, health_report: Dict[str, Any]) -> int:
"""Count total critical issues across all health checks for P1 escalation."""
count = 0
# Manufacturer operation counters to exclude (same as in _detect_issues)
manufacturer_counters = [
'Seek_Error_Rate', 'Command_Timeout', 'Raw_Read_Error_Rate'
]
# Count drive failures
for drive in health_report.get('drives_health', {}).get('drives', []):
if drive.get('smart_status') == 'UNHEALTHY':
count += 1
# Only count critical issues that aren't manufacturer operation counters
for issue in drive.get('smart_issues', []):
if 'critical' in issue.lower():
# Skip manufacturer operation counters
if not any(counter in issue for counter in manufacturer_counters):
count += 1
# Count ECC errors
if health_report.get('memory_health', {}).get('status') == 'CRITICAL':
count += 1
# Count network failures
net = health_report.get('network_health', {})
if net.get('management_network', {}).get('status') == 'CRITICAL':
count += 1
if net.get('ceph_network', {}).get('status') == 'CRITICAL':
count += 1
# Count LXC critical issues
if health_report.get('lxc_health', {}).get('status') == 'CRITICAL':
count += 1
return count
def _determine_ticket_priority(self, issue: str, health_report: Dict[str, Any]) -> str:
    """Map an issue description to a ticket priority digit.

    P1 = cluster outages / multiple simultaneous failures,
    P2 = hardware failures requiring same-day response,
    P3 = warnings (respond within 1-3 days),
    P4 = normal monitoring alerts,
    P5 = informational / minimal impact.
    """
    text = issue.lower()
    # Escalate to P1 when enough critical findings coincide on this node.
    critical_total = self._count_critical_issues(health_report)
    threshold = self.CONFIG.get('PRIORITY_ESCALATION_THRESHOLD', 3)
    if critical_total >= threshold:
        logger.info(f"P1 escalation triggered: {critical_total} critical issues detected")
        return self.PRIORITIES['CRITICAL']

    def matches(*keywords):
        # True when any keyword appears in the lowercased issue text.
        return any(kw in text for kw in keywords)

    # P1 - scenarios that threaten the whole cluster.
    if matches('raid degraded', 'multiple drive',
               'both networks unreachable',
               'health_err'):  # Ceph cluster error
        return self.PRIORITIES['CRITICAL']
    # P2 - hardware failures needing same-day response.
    if matches('smart failure', 'smart overall health check failed',
               'drive failure', 'disk failure',
               'uncorrectable ecc', 'hardware failure',
               'critical temperature', 'firmware issue',
               'reallocated_sector', 'pending_sector', 'offline_uncorrectable',
               'critical available_spare', 'critical wear',
               'critical reallocated', 'critical current_pending',
               'network is unreachable',
               'osd is down', 'osd down',     # Ceph OSD down
               'cluster usage critical'):     # Ceph usage critical
        return self.PRIORITIES['HIGH']
    # P2 - Ceph OSD issues (checked explicitly since 'down' is in issue text).
    if '[ceph]' in text and 'down' in text:
        return self.PRIORITIES['HIGH']
    # P2 - SMART issue text carrying a critical indicator.
    if 'smart issues' in text and matches('critical', 'failed', 'reallocated',
                                          'pending', 'uncorrectable', 'offline'):
        return self.PRIORITIES['HIGH']
    # P3 - warnings to address within days.
    if matches('warning', 'high temperature', 'correctable ecc',
               'trend alert', 'critical storage usage',
               'low available_spare', 'high wear',
               'health_warn', 'cluster usage warning'):  # Ceph warnings
        return self.PRIORITIES['MEDIUM']
    # P4 - routine monitoring alerts.
    if matches('cpu usage', 'high storage usage', 'system log', 'drive age'):
        return self.PRIORITIES['NORMAL']
    # P5 - informational / minimal impact.
    if matches('info:', 'info ', 'above optimal', 'monitor only'):
        return self.PRIORITIES['LOW']
    # Unknown issues default to P3 (conservative approach).
    return self.PRIORITIES['MEDIUM']
def _categorize_issue(self, issue: str) -> tuple:
    """Resolve an issue description to its ticket classification.

    Returns:
        tuple: (category, ticket_type, issue_tag, ticket_type_tag)
        - category: 'Hardware', 'Software', etc.
        - ticket_type: 'Issue' (unplanned degradation) or 'Problem'
          (needs investigation)
        - issue_tag / ticket_type_tag: the bracketed title tags.
    """
    text = issue.lower()

    def has(*keywords):
        # True when any keyword appears in the lowercased issue text.
        return any(kw in text for kw in keywords)

    # Physical hardware problems (drives, memory, temperature, firmware).
    if has('smart', 'drive', 'disk', '/dev/', 'sector', 'temperature',
           'firmware', 'power_on_hours', 'reallocated', 'pending',
           'ecc', 'memory', 'high_fly_writes', 'spin_retry', 'current_pending',
           'nvme'):
        # Errors/failures are Issues (unplanned degradation); warnings are
        # Problems (need investigation).
        severe = has('critical', 'failed', 'failure', 'error')
        return (
            self.TICKET_CATEGORIES['HARDWARE'],
            self.TICKET_TYPES['ISSUE'] if severe else self.TICKET_TYPES['PROBLEM'],
            self.TICKET_TEMPLATES['HARDWARE_TYPE']['HARDWARE'],
            self.TICKET_TEMPLATES['TICKET_TYPE']['ISSUE'] if severe else self.TICKET_TEMPLATES['TICKET_TYPE']['PROBLEM'],
        )
    # Application / OS / container issues.
    if has('lxc', 'container', 'storage usage', 'cpu usage', 'process',
           'application', 'service', 'daemon'):
        # Critical storage/CPU is service degradation; warning level needs
        # investigation before it becomes critical.
        severe = 'critical' in text
        return (
            self.TICKET_CATEGORIES['SOFTWARE'],
            self.TICKET_TYPES['ISSUE'] if severe else self.TICKET_TYPES['PROBLEM'],
            self.TICKET_TEMPLATES['SOFTWARE_TYPE']['SOFTWARE'],
            self.TICKET_TEMPLATES['TICKET_TYPE']['ISSUE'] if severe else self.TICKET_TEMPLATES['TICKET_TYPE']['PROBLEM'],
        )
    # Network connectivity / infrastructure (categorized as Hardware).
    if has('network', 'connectivity', 'unreachable', 'latency', 'packet loss',
           'interface', 'link down'):
        severe = has('failure', 'down', 'unreachable', 'critical')
        return (
            self.TICKET_CATEGORIES['HARDWARE'],
            self.TICKET_TYPES['ISSUE'] if severe else self.TICKET_TYPES['PROBLEM'],
            self.TICKET_TEMPLATES['NETWORK_TYPE']['NETWORK'],
            self.TICKET_TEMPLATES['TICKET_TYPE']['ISSUE'] if severe else self.TICKET_TEMPLATES['TICKET_TYPE']['PROBLEM'],
        )
    # Ceph storage-cluster issues (categorized as Hardware).
    if has('ceph', 'osd', 'health_err', 'health_warn', 'cluster usage'):
        severe = has('health_err', 'down', 'critical', 'error')
        return (
            self.TICKET_CATEGORIES['HARDWARE'],
            self.TICKET_TYPES['ISSUE'] if severe else self.TICKET_TYPES['PROBLEM'],
            '[ceph]',
            self.TICKET_TEMPLATES['TICKET_TYPE']['ISSUE'] if severe else self.TICKET_TEMPLATES['TICKET_TYPE']['PROBLEM'],
        )
    # Default: Hardware Problem (for undefined cases).
    return (
        self.TICKET_CATEGORIES['HARDWARE'],
        self.TICKET_TYPES['PROBLEM'],
        self.TICKET_TEMPLATES['HARDWARE_TYPE']['HARDWARE'],
        self.TICKET_TEMPLATES['TICKET_TYPE']['PROBLEM'],
    )
# =============================================================================
# TICKET CREATION METHODS
# =============================================================================
def _create_tickets_for_issues(self, health_report: Dict[str, Any]):
    """Create tickets for detected issues.

    Detects issues in *health_report*, then for each one builds a tagged
    title, priority, category and detailed description and POSTs it to
    the ticket API (or only logs the payload in dry-run mode). Duplicate
    tickets reported by the API are skipped.

    :param health_report: Full health report from all checks.
    """
    issues = self._detect_issues(health_report)
    if not issues:
        logger.info("No issues detected.")
        return
    hostname = socket.gethostname()
    action_type = self.TICKET_TEMPLATES['ACTION_TYPE']
    environment = self.TICKET_TEMPLATES['ENVIRONMENT']
    for issue in issues:
        # Use the comprehensive priority determination function
        priority = self._determine_ticket_priority(issue, health_report)
        # Get proper categorization for this issue
        category, ticket_type, issue_tag, ticket_type_tag = self._categorize_issue(issue)
        # Determine scope: cluster-wide for Ceph cluster issues, single-node otherwise
        is_cluster_wide = '[cluster-wide]' in issue
        scope = self.TICKET_TEMPLATES['SCOPE']['CLUSTER_WIDE'] if is_cluster_wide else self.TICKET_TEMPLATES['SCOPE']['SINGLE_NODE']
        # Clean issue text for title: drop the [cluster-wide] marker, and
        # always drop the [ceph] marker since _categorize_issue already
        # contributes it via issue_tag (stripping it unconditionally also
        # avoids a duplicated tag on node-local Ceph issues).
        clean_issue = issue
        if is_cluster_wide:
            clean_issue = clean_issue.replace('[cluster-wide] ', '').replace('[cluster-wide]', '')
        clean_issue = clean_issue.replace('[ceph] ', '').replace('[ceph]', '')
        # Extract drive capacity if this is a drive-related issue
        drive_size = ""
        if "Drive" in issue and "/dev/" in issue:
            device_match = re.search(r'/dev/[a-zA-Z0-9]+', issue)
            if device_match:
                device = device_match.group(0)
                drive_details = self._get_drive_details(device)
                if drive_details['capacity']:
                    drive_size = f"[{drive_details['capacity']}] "
            else:
                logger.warning(f"Could not extract device from issue: {issue}")
        # Build ticket title with proper categorization.
        # Add space after issue_tag if drive_size is empty (for non-drive issues)
        issue_separator = drive_size if drive_size else " "
        # Use cluster name for cluster-wide issues instead of individual hostname
        # so every node generates the same ticket title for deduplication.
        cluster_name = self.CONFIG.get('CLUSTER_NAME', 'proxmox-cluster')
        ticket_source = f"[{cluster_name}]" if is_cluster_wide else f"[{hostname}]"
        ticket_title = (
            f"{ticket_source}"
            f"{action_type['AUTO']}"
            f"{issue_tag}"
            f"{issue_separator}"
            f"{clean_issue}"
            f"{scope}"
            f"{environment['PRODUCTION']}"
            f"{ticket_type_tag}"
        )
        description = self._generate_detailed_description(issue, health_report, priority)
        ticket_payload = {
            "title": ticket_title,
            "description": description,
            "priority": priority,
            "status": "Open",
            "category": category,
            "type": ticket_type
        }
        if self.dry_run:
            logger.info("Dry-run mode enabled. Simulating ticket creation:")
            logger.info(json.dumps(ticket_payload, indent=4))
        else:
            try:
                response = requests.post(
                    self.ticket_api_url,
                    json=ticket_payload,
                    headers={
                        'Content-Type': 'application/json',
                        'Authorization': f'Bearer {self.CONFIG["TICKET_API_KEY"]}'
                    },
                    timeout=10  # 10 second timeout for API calls
                )
                try:
                    response_data = response.json()
                except ValueError as e:
                    # requests raises its own JSONDecodeError (a ValueError
                    # subclass in requests>=2.27; simplejson's variant in
                    # older setups). Catching ValueError covers every case,
                    # whereas json.JSONDecodeError alone could miss some.
                    logger.error(f"Invalid JSON response from ticket API: {e}")
                    continue
                if response_data.get('success'):
                    logger.info(f"Ticket created successfully: {ticket_title}")
                    logger.info(f"Ticket ID: {response_data.get('ticket_id')}")
                elif response_data.get('error') == 'Duplicate ticket':
                    logger.info(f"Duplicate ticket detected - existing ticket ID: {response_data.get('existing_ticket_id')}")
                    continue
                else:
                    logger.error(f"Failed to create ticket: {response_data.get('error')}")
            except Exception as e:
                logger.error(f"Error creating ticket: {e}")
def _detect_issues(self, health_report: Dict[str, Any]) -> List[str]:
    """
    Detect issues in the health report including non-critical issues.
    :param health_report: The comprehensive health report from the checks.
    :return: List of issue descriptions detected during checks.
    """
    issues = []
    # --- Drive-related issues --------------------------------------------
    for drive in health_report.get('drives_health', {}).get('drives', []):
        # Skip drives with ERROR or NOT_SUPPORTED status - these are likely virtual/unsupported devices
        if drive.get('smart_status') in ['ERROR', 'NOT_SUPPORTED']:
            logger.debug(f"Skipping issue detection for drive {drive['device']} with status {drive.get('smart_status')}")
            continue
        # Only report issues for drives with valid SMART status
        if drive.get('smart_issues') and drive.get('smart_status') in ['HEALTHY', 'UNHEALTHY', 'UNKNOWN']:
            # Filter out generic error messages and manufacturer-specific false positives
            filtered_issues = []
            for issue in drive['smart_issues']:
                # Skip generic errors
                if any(skip_phrase in issue for skip_phrase in [
                    "Error checking SMART:",
                    "Unable to read device information",
                    "SMART not supported",
                    "timed out"
                ]):
                    continue
                # Skip manufacturer-specific operation counters (not actual errors)
                # These are monitored attributes that manufacturers use as counters
                if any(counter_name in issue for counter_name in [
                    "Seek_Error_Rate",     # Seagate/WD use as operation counter
                    "Command_Timeout",     # OOS/Seagate use as operation counter
                    "Raw_Read_Error_Rate"  # Seagate/WD use as operation counter
                ]):
                    logger.debug(f"Filtering manufacturer operation counter from issues: {issue}")
                    continue
                filtered_issues.append(issue)
            if filtered_issues:
                issues.append(f"Drive {drive['device']} has SMART issues: {', '.join(filtered_issues)}")
        # Check temperature regardless of SMART status
        if drive.get('temperature') and drive['temperature'] > self.CONFIG['THRESHOLDS']['TEMPERATURE_WARNING']:
            issues.append(f"Drive {drive['device']} temperature is high: {drive['temperature']}°C")
    # --- ECC memory errors -----------------------------------------------
    memory_health = health_report.get('memory_health', {})
    if memory_health.get('has_ecc') and memory_health.get('ecc_errors'):
        issues.extend(memory_health['ecc_errors'])
    # --- CPU usage --------------------------------------------------------
    cpu_health = health_report.get('cpu_health', {})
    cpu_threshold = self.CONFIG['THRESHOLDS']['CPU_WARNING']
    if cpu_health and cpu_health.get('cpu_usage_percent', 0) > cpu_threshold:
        # Report the configured threshold instead of the previously
        # hard-coded "95%", which was wrong whenever CPU_WARNING was
        # configured to a different value.
        issues.append(f"CPU usage is above threshold of {cpu_threshold}%")
    # --- Network issues ---------------------------------------------------
    network_health = health_report.get('network_health', {})
    for network in ['management_network', 'ceph_network']:
        if network_health.get(network, {}).get('issues'):
            issues.extend(network_health[network]['issues'])
    # --- LXC container issues --------------------------------------------
    lxc_health = health_report.get('lxc_health', {})
    if lxc_health.get('status') in ['WARNING', 'CRITICAL']:
        issues.extend(lxc_health.get('issues', []))
    # --- System-level (kernel log) drive issues --------------------------
    system_health = health_report.get('system_health', {})
    if system_health.get('issues'):
        issues.extend(system_health['issues'])
    # --- Ceph cluster issues ---------------------------------------------
    ceph_health = health_report.get('ceph_health', {})
    if ceph_health.get('is_ceph_node'):
        hostname = socket.gethostname()
        designated_node = self.CONFIG.get('CEPH_TICKET_NODE')
        # Cluster-wide issues: only create tickets from designated node (or first node if not set)
        # The [cluster-wide] tag ensures deduplication in tinker_tickets API
        if ceph_health.get('cluster_wide_issues'):
            # If no designated node, all nodes can report (API deduplicates)
            # If designated node is set, only that node creates tickets
            if not designated_node or hostname == designated_node:
                for issue in ceph_health['cluster_wide_issues']:
                    # Add [cluster-wide] marker for API deduplication
                    issues.append(f"[cluster-wide] [ceph] {issue}")
            else:
                logger.debug(f"Skipping cluster-wide Ceph issues (designated node: {designated_node})")
        # Node-specific issues: always report from the affected node
        if ceph_health.get('issues'):
            for issue in ceph_health['issues']:
                issues.append(f"[ceph] {issue}")
    logger.info("=== Issue Detection Started ===")
    # Use .get() chains so a missing report section cannot crash the summary
    # logging (the detection code above already tolerates missing sections).
    logger.info(f"Checking drives: {len(health_report.get('drives_health', {}).get('drives', []))} found")
    logger.info(f"Memory status: {health_report.get('memory_health', {}).get('status')}")
    logger.info(f"CPU status: {health_report.get('cpu_health', {}).get('status')}")
    logger.info(f"Network status: {health_report.get('network_health', {})}")
    logger.info(f"System status: {health_report.get('system_health', {}).get('status')}")
    logger.info(f"Detected issues (pre-filter): {issues}")
    # Filter out INFO-level issues unless configured to include them
    if not self.CONFIG.get('INCLUDE_INFO_TICKETS', False):
        actionable_issues = []
        for issue in issues:
            # Skip INFO-level issues (P5 candidates that shouldn't create tickets)
            if any(info_marker in issue.lower() for info_marker in [
                'info:', 'info ', 'above optimal', 'monitor only'
            ]):
                logger.debug(f"Filtering INFO-level issue: {issue}")
                continue
            actionable_issues.append(issue)
        issues = actionable_issues
        logger.info(f"Filtered to actionable issues: {issues}")
    logger.info("=== Issue Detection Completed ===\n")
    return issues
# =============================================================================
# DISK AND STORAGE UTILITY METHODS
# =============================================================================
def _get_all_disks(self) -> List[str]:
    """Get all physical disks using multiple detection methods.

    Combines lsblk output with direct /dev globbing so a failure of one
    method still yields results; virtual devices (Ceph RBD) and
    partitions are excluded.

    :return: List of device paths such as '/dev/sda' or '/dev/nvme0n1'.
    """
    disks = set()
    # Method 1: Use lsblk to get physical disks, excluding virtual devices
    try:
        result = subprocess.run(
            ['lsblk', '-d', '-n', '-o', 'NAME,TYPE'],
            stdout=subprocess.PIPE,
            text=True,
            timeout=10  # don't hang the whole health check if lsblk stalls
        )
        for line in result.stdout.strip().split('\n'):
            if line:
                parts = line.split()
                if len(parts) >= 2:
                    name, device_type = parts[0], parts[1]
                    # Only include actual disks, exclude virtual devices
                    if device_type == 'disk' and not name.startswith('rbd'):
                        disks.add(f"/dev/{name}")
        logger.debug(f"Physical disks found via lsblk: {disks}")
    except Exception as e:
        logger.debug(f"lsblk detection failed: {e}")
    # Method 2: Direct device scanning for physical devices only.
    # glob is imported at module level; the previous per-iteration
    # 'import glob' inside this loop was redundant.
    for pattern in ['/dev/sd[a-z]', '/dev/nvme[0-9]n[0-9]']:
        try:
            matches = glob.glob(pattern)
            # Filter out partitions (devices ending in numbers for sd*, already filtered for nvme)
            if 'sd' in pattern:
                matches = [d for d in matches if not d[-1].isdigit()]
            disks.update(matches)
            logger.debug(f"Disks found via glob {pattern}: {matches}")
        except Exception as e:
            logger.debug(f"Glob detection failed for {pattern}: {e}")
    return list(disks)
def _is_excluded_mount(self, mountpoint: str) -> bool:
"""Check if a mountpoint should be excluded from monitoring."""
# Check exact matches
if mountpoint in self.CONFIG['EXCLUDED_MOUNTS']:
return True
# Check patterns
for pattern in self.CONFIG['EXCLUDED_PATTERNS']:
if re.match(pattern, mountpoint):
return True
return False
def _format_bytes_human(self, num_bytes):
"""Format a byte count into a human-readable string."""
for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
if abs(num_bytes) < 1024.0:
return f"{num_bytes:.1f} {unit}"
num_bytes /= 1024.0
return f"{num_bytes:.1f} EB"
def _parse_size(self, size_str: str) -> float:
"""
Parse size string with units to bytes.
:param size_str: String containing size with unit (e.g. '15.7G', '21.8T')
:return: Size in bytes as float
"""
try:
# Skip non-size strings
if not isinstance(size_str, str):
logger.debug(f"Not a string: {size_str}")
return 0.0
if not any(unit in size_str.upper() for unit in ['B', 'K', 'M', 'G', 'T']):
logger.debug(f"No valid size unit found in: {size_str}")
return 0.0
# Define multipliers for units
multipliers = {
'B': 1,
'K': 1024,
'M': 1024**2,
'G': 1024**3,
'T': 1024**4
}
# Extract numeric value and unit
match = re.match(r'(\d+\.?\d*)', size_str)
if not match:
logger.debug(f"Could not extract numeric value from: {size_str}")
return 0.0
value = float(match.group(1))
unit_match = re.search(r'([BKMGT])', size_str.upper())
if not unit_match:
logger.debug(f"Could not extract unit from: {size_str}")
return 0.0
unit = unit_match.group(1)
# Convert to bytes
bytes_value = value * multipliers.get(unit, 0)
return bytes_value
except (ValueError, AttributeError, TypeError) as e:
logger.debug(f"Failed to parse size string: {size_str}")
logger.debug(f"P**** error details: {str(e)}")
return 0.0
def _is_physical_disk(self, device_path):
"""
Check if the device is a physical disk, excluding logical volumes and special devices.
:param device_path: Path to the device
:return: Boolean indicating if it's a relevant physical disk
"""
logger.debug(f"Checking device: {device_path}")
# Exclude known non-physical or special devices
excluded_patterns = [
r'/dev/mapper/', # LVM devices
r'/dev/dm-', # Device mapper devices
r'/dev/loop', # Loop devices
r'/dev/rbd', # Ceph RBD devices
r'/boot', # Boot partitions
r'/boot/efi', # EFI partitions
r'[0-9]+$' # Partition numbers
]
if any(re.search(pattern, device_path) for pattern in excluded_patterns):
logger.debug(f"Device {device_path} excluded due to pattern match")
return False
# Match physical devices
physical_patterns = [
r'/dev/sd[a-z]+$', # SATA/SAS drives
r'/dev/nvme\d+n\d+$', # NVMe drives
r'/dev/mmcblk\d+$', # MMC/SD cards
r'/dev/hd[a-z]+$' # IDE drives (legacy)
]
is_physical = any(re.match(pattern, device_path) for pattern in physical_patterns)
logger.debug(f"Device {device_path} physical disk check result: {is_physical}")
return is_physical
def _check_disk_firmware(self, device: str) -> Dict[str, Any]:
"""Check disk firmware version against known problematic versions."""
firmware_info = {
'version': None,
'model': None,
'manufacturer': None,
'is_problematic': False,
'known_issues': []
}
MANUFACTURER_PATTERNS = {
'Western Digital': ['WDC', 'Western Digital', 'Ultrastar'],
'Samsung': ['Samsung', 'SAMSUNG'],
'Seagate': ['Seagate', 'ST'],
'Intel': ['Intel', 'INTEL'],
'Micron': ['Micron', 'Crucial'],
'Toshiba': ['Toshiba', 'TOSHIBA']
}
try:
result = subprocess.run(
['smartctl', '-i', device],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=30
)
model_line = None
for line in result.stdout.split('\n'):
if 'Firmware Version:' in line:
firmware_info['version'] = line.split(':')[1].strip()
elif 'Model Family:' in line:
model_line = line
firmware_info['model'] = line.split(':')[1].strip()
elif 'Device Model:' in line and not firmware_info['model']:
model_line = line
firmware_info['model'] = line.split(':')[1].strip()
# Determine manufacturer
if model_line:
for manufacturer, patterns in MANUFACTURER_PATTERNS.items():
if any(pattern in model_line for pattern in patterns):
firmware_info['manufacturer'] = manufacturer
break
# Check against known problematic versions
if firmware_info['manufacturer'] and firmware_info['model']:
# Check if manufacturer exists in our problematic firmware database
if firmware_info['manufacturer'] in self.PROBLEMATIC_FIRMWARE:
for model, versions in self.PROBLEMATIC_FIRMWARE[firmware_info['manufacturer']].items():
if model in firmware_info['model'] and firmware_info['version'] in versions:
firmware_info['is_problematic'] = True
firmware_info['known_issues'].append(
f"Known problematic firmware version {firmware_info['version']} "
f"for {firmware_info['model']}"
)
logger.debug(f"=== Firmware Check for {device} ===")
logger.debug(f"Firmware version: {firmware_info['version']}")
logger.debug(f"Model: {firmware_info['model']}")
logger.debug(f"Manufacturer: {firmware_info['manufacturer']}")
logger.debug(f"Known issues: {firmware_info['known_issues']}")
logger.debug("=== End Firmware Check ===\n")
except subprocess.TimeoutExpired:
logger.warning(f"smartctl -i timed out for {device}")
except Exception as e:
firmware_info['known_issues'].append(f"Error checking firmware: {str(e)}")
return firmware_info
# =============================================================================
# SMART HEALTH CHECKING METHODS
# =============================================================================
def _parse_smart_value(self, raw_value: str) -> int:
"""Parse SMART values handling different formats including NVMe temperature readings."""
try:
# Handle temperature values with °C
if isinstance(raw_value, str) and '°C' in raw_value:
# Extract only the numeric portion before °C
temp_value = raw_value.split('°C')[0].strip()
return int(temp_value)
# Handle time format (e.g., '15589h+17m+33.939s')
if 'h+' in raw_value:
return int(raw_value.split('h+')[0])
# Handle hex values
if '0x' in raw_value:
return int(raw_value, 16)
# Handle basic numbers
return int(raw_value)
except ValueError:
logger.debug(f"Could not parse SMART value: {raw_value}")
return 0
def _detect_manufacturer(self, model: str, serial: str = None) -> str:
"""Enhanced manufacturer detection based on model and serial patterns."""
if not model:
return 'Unknown'
model_upper = model.upper()
# Western Digital patterns (including HGST which WD acquired)
if any(pattern in model_upper for pattern in ['WDC', 'WD-', 'HGST', 'WESTERN DIGITAL']):
return 'Western Digital'
# Seagate patterns
elif any(pattern in model_upper for pattern in ['ST', 'SEAGATE']):
return 'Seagate'
# Samsung patterns
elif 'SAMSUNG' in model_upper:
return 'Samsung'
# Intel patterns
elif any(pattern in model_upper for pattern in ['INTEL', 'SSDSC']):
return 'Intel'
# Micron/Crucial patterns
elif any(pattern in model_upper for pattern in ['CRUCIAL', 'MICRON', 'CT']):
return 'Micron'
# Toshiba patterns
elif 'TOSHIBA' in model_upper:
return 'Toshiba'
# Ridata/Ritek patterns (for your existing special handling)
elif any(pattern in model_upper for pattern in ['RIDATA', 'RITEK']):
return 'Ridata'
# OOS patterns (for your existing special handling)
elif 'OOS' in model_upper:
return 'OOS'
return 'Unknown'
def _get_manufacturer_profile(self, model: str, manufacturer: str = None, firmware: str = None) -> Dict[str, Any]:
"""Get manufacturer-specific SMART profile based on drive model/manufacturer/firmware."""
logger.debug(f"Looking for profile - Model: '{model}', Manufacturer: '{manufacturer}', Firmware: '{firmware}'")
# First, try to detect manufacturer if not provided
if not manufacturer:
manufacturer = self._detect_manufacturer(model)
logger.debug(f"Auto-detected manufacturer: {manufacturer}")
# Check each manufacturer profile
for mfg, profile in self.MANUFACTURER_SMART_PROFILES.items():
# Check firmware patterns first (most specific for OEM drives like RiData)
if firmware and 'firmware_patterns' in profile:
for pattern in profile['firmware_patterns']:
if firmware.startswith(pattern) or pattern in firmware:
logger.debug(f"Matched manufacturer profile: {mfg} for firmware pattern '{pattern}' in '{firmware}'")
return profile
# Check if detected manufacturer matches this profile
if manufacturer and manufacturer in profile['aliases']:
logger.debug(f"Matched manufacturer profile: {mfg} for detected manufacturer '{manufacturer}'")
return profile
# Check model/manufacturer aliases (fallback)
for alias in profile['aliases']:
if alias.lower() in model.lower() or (manufacturer and alias.lower() in manufacturer.lower()):
logger.debug(f"Matched manufacturer profile: {mfg} for model alias '{alias}' in '{model}'")
return profile
# Return generic profile if no match
logger.debug(f"No specific profile found for Model: '{model}', Manufacturer: '{manufacturer}', Firmware: '{firmware}', using Generic profile")
return self.MANUFACTURER_SMART_PROFILES['Generic']
def _should_monitor_attribute(self, attr_name: str, manufacturer_profile: dict) -> bool:
"""Check if an attribute should be monitored based on manufacturer profile."""
if not manufacturer_profile:
return True # Default: monitor everything
attr_config = manufacturer_profile.get('attributes', {}).get(attr_name, {})
# Check if explicitly set to not monitor
if attr_config.get('monitor') is False:
logger.debug(f"Skipping monitoring for {attr_name} - explicitly disabled")
return False
return True # Default: monitor unless explicitly disabled
def _get_attribute_thresholds(self, attr_name: str, manufacturer_profile: dict) -> dict:
"""Get attribute-specific thresholds, falling back to defaults."""
# Check for manufacturer-specific thresholds first
if manufacturer_profile:
attr_config = manufacturer_profile.get('attributes', {}).get(attr_name, {})
if 'warning_threshold' in attr_config and 'critical_threshold' in attr_config:
return {
'warning': attr_config['warning_threshold'],
'critical': attr_config['critical_threshold'],
'behavior': attr_config.get('behavior', 'countup')
}
# Enhanced BASE_SMART_THRESHOLDS with manufacturer-specific handling
BASE_SMART_THRESHOLDS = {
'Reallocated_Sector_Ct': {'warning': 5, 'critical': 10},
'Current_Pending_Sector': {'warning': 1, 'critical': 5},
'Offline_Uncorrectable': {'warning': 1, 'critical': 2},
'Reported_Uncorrect': {'warning': 1, 'critical': 10},
'Spin_Retry_Count': {'warning': 1, 'critical': 5},
'Power_Cycle_Count': {'warning': 5000, 'critical': 10000},
'Power_On_Hours': {'warning': 61320, 'critical': 70080},
'Temperature_Celsius': {'warning': 65, 'critical': 75},
'Available_Spare': {'warning': 30, 'critical': 10},
'Program_Fail_Count': {'warning': 10, 'critical': 20},
'Erase_Fail_Count': {'warning': 10, 'critical': 20},
'Load_Cycle_Count': {'warning': 900000, 'critical': 1000000},
'SSD_Life_Left': {'warning': 30, 'critical': 10},
'Program_Fail_Cnt_Total': {'warning': 1, 'critical': 5},
'Erase_Fail_Count_Total': {'warning': 1, 'critical': 5},
# ADJUSTED: More lenient thresholds for error rates on unknown drives
'Raw_Read_Error_Rate': {'warning': 10000000, 'critical': 100000000}, # Raised significantly
'Seek_Error_Rate': {'warning': 10000000, 'critical': 100000000}, # Raised significantly
'Command_Timeout': {'warning': 100, 'critical': 1000}, # Raised significantly
'High_Fly_Writes': {'warning': 1, 'critical': 5},
'Airflow_Temperature_Cel': {'warning': 65, 'critical': 75},
'G_Sense_Error_Rate': {'warning': 100, 'critical': 1000},
'Power-Off_Retract_Count': {'warning': 100000, 'critical': 500000},
'Head_Flying_Hours': {'warning': 50000, 'critical': 70000},
'Runtime_Bad_Block': {'warning': 10, 'critical': 100},
'Factory_Bad_Block_Ct': {'warning': 50, 'critical': 200},
'Grown_Failing_Block_Ct': {'warning': 10, 'critical': 50},
'End-to-End_Error': {'warning': 1, 'critical': 5}
}
if attr_name in BASE_SMART_THRESHOLDS:
return {
'warning': BASE_SMART_THRESHOLDS[attr_name]['warning'],
'critical': BASE_SMART_THRESHOLDS[attr_name]['critical'],
'behavior': 'countup'
}
return None # No thresholds defined
def _is_new_drive(self, power_on_hours: int) -> bool:
"""Determine if a drive is considered "new" based on power-on hours."""
return power_on_hours < self.CONFIG['NEW_DRIVE_HOURS_THRESHOLD']
    def _check_smart_health(self, device: str) -> Dict[str, Any]:
        """Enhanced SMART health check with better error handling and predictive analysis.

        Runs ``smartctl -A -H -l error -l background`` against *device*,
        parses the attribute table with manufacturer-specific thresholds,
        scans the error log for recent entries, and feeds the collected
        attributes into the trend/SSD/thermal/error-pattern helpers.

        :param device: Block device path, e.g. '/dev/sda'.
        :return: Dict with 'status' (HEALTHY/UNHEALTHY/UNKNOWN/ERROR/
                 NOT_SUPPORTED/SKIPPED), 'severity' (NORMAL/WARNING/
                 CRITICAL/UNKNOWN), 'issues' (list of strings), 'temp',
                 'attributes' (raw parsed values) and 'manufacturer_profile'.
        """
        smart_health = {
            'status': 'UNKNOWN',
            'severity': 'NORMAL',
            'issues': [],
            'temp': None,
            'attributes': {},
            'manufacturer_profile': None
        }
        try:
            # Skip virtual devices
            if '/dev/rbd' in device or '/dev/dm-' in device or '/dev/mapper/' in device:
                smart_health['status'] = 'NOT_SUPPORTED'
                smart_health['issues'].append("Virtual device - SMART not applicable")
                return smart_health
            # First verify the device is SMART-capable
            drive_details = self._get_drive_details(device)
            if not drive_details.get('smart_capable', False):
                smart_health['status'] = 'NOT_SUPPORTED'
                smart_health['issues'].append("SMART not supported on this device")
                return smart_health
            # Special handling for NVMe devices
            if 'nvme' in device:
                return self._check_nvme_smart_health(device)
            # If we have no model info, the device might not be responding properly
            if not drive_details.get('model'):
                smart_health['status'] = 'ERROR'
                smart_health['issues'].append("Unable to read device information")
                return smart_health
            # Skip Ridata drives entirely - unreliable and being replaced
            manufacturer = self._detect_manufacturer(drive_details.get('model', ''))
            if manufacturer == 'Ridata':
                smart_health['status'] = 'SKIPPED'
                smart_health['issues'].append("Ridata drive - monitoring disabled (unreliable hardware)")
                logger.debug(f"Skipping SMART monitoring for Ridata drive {device}")
                return smart_health
            logger.debug(f"Drive details for {device}: {drive_details}")
            # Resolve the manufacturer-specific threshold profile.
            manufacturer_profile = self._get_manufacturer_profile(
                drive_details.get('model', ''),
                drive_details.get('manufacturer', ''),
                drive_details.get('firmware', '')
            )
            smart_health['manufacturer_profile'] = manufacturer_profile
            logger.debug(f"Selected manufacturer profile for {device}: {manufacturer_profile.get('aliases', ['Unknown'])[0] if manufacturer_profile else 'None'}")
            # Get firmware information
            firmware_info = self._check_disk_firmware(device)
            if firmware_info['is_problematic']:
                smart_health['severity'] = 'WARNING'
                smart_health['issues'].extend(firmware_info['known_issues'])
            # Get detailed SMART data with timeout
            result = subprocess.run(
                ['smartctl', '-A', '-H', '-l', 'error', '-l', 'background', device],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                timeout=30
            )
            output = result.stdout
            # Check overall health status
            if 'FAILED' in output and 'PASSED' not in output:
                smart_health['status'] = 'UNHEALTHY'
                smart_health['severity'] = 'CRITICAL'
                smart_health['issues'].append("SMART overall health check failed")
            elif 'PASSED' in output:
                smart_health['status'] = 'HEALTHY'
            else:
                smart_health['status'] = 'UNKNOWN'
            # Parse SMART attributes with manufacturer-specific handling
            power_on_hours = 0
            # First pass: collect all SMART attributes with priority for _Total versions
            smart_attributes_raw = {}
            for line in output.split('\n'):
                # Extract Power_On_Hours first to determine if drive is new
                if 'Power_On_Hours' in line:
                    parts = line.split()
                    # smartctl attribute rows have >= 10 columns; RAW_VALUE is column 10
                    if len(parts) >= 10:
                        power_on_hours = self._parse_smart_value(parts[9])
                        smart_attributes_raw['Power_On_Hours'] = power_on_hours
                # Handle SMART attributes with preference for _Total versions
                for attr in ['Erase_Fail_Count', 'Program_Fail_Count']:
                    # Check for _Total version first (more accurate)
                    if f'{attr}_Total' in line:
                        parts = line.split()
                        if len(parts) >= 10:
                            raw_value = self._parse_smart_value(parts[9])
                            smart_attributes_raw[f'{attr}_Total'] = raw_value  # Store as _Total
                            logger.debug(f"Found {attr}_Total: {raw_value}")
                        break
                    # Only use non-_Total version if _Total not found AND not Ridata
                    elif attr in line and f'{attr}_Total' not in smart_attributes_raw:
                        # Check if this is a Ridata drive and should skip regular counters
                        if manufacturer_profile and manufacturer_profile.get('aliases', [{}])[0] == 'Ridata':
                            logger.debug(f"Skipping {attr} for Ridata drive - using _Total version only")
                            continue
                        parts = line.split()
                        if len(parts) >= 10:
                            raw_value = self._parse_smart_value(parts[9])
                            smart_attributes_raw[attr] = raw_value
                            logger.debug(f"Found {attr} (non-Total): {raw_value}")
            smart_health['attributes'] = smart_attributes_raw
            # Check if this is a new drive
            is_new_drive = self._is_new_drive(power_on_hours)
            logger.debug(f"Drive {device} power-on hours: {power_on_hours}, is_new_drive: {is_new_drive}")
            # Parse remaining SMART attributes
            for line in output.split('\n'):
                # Handle manufacturer-specific Wear_Leveling_Count
                if 'Wear_Leveling_Count' in line:
                    parts = line.split()
                    if len(parts) >= 10:
                        raw_value = self._parse_smart_value(parts[9])
                        smart_health['attributes']['Wear_Leveling_Count'] = raw_value
                        # Get manufacturer-specific thresholds
                        wear_attr = manufacturer_profile.get('attributes', {}).get('Wear_Leveling_Count', {})
                        # Skip evaluation if this is a new drive and manufacturer profile says to ignore
                        if is_new_drive and wear_attr.get('ignore_on_new_drive', False):
                            logger.debug(f"Skipping Wear_Leveling_Count evaluation for new drive: {raw_value}")
                            continue
                        warning_threshold = wear_attr.get('warning_threshold')
                        critical_threshold = wear_attr.get('critical_threshold')
                        if warning_threshold and critical_threshold:
                            # 'countup' counters grow toward failure; 'countdown'
                            # counters shrink toward failure.
                            behavior = wear_attr.get('behavior', 'countup')
                            if behavior == 'countup':
                                if raw_value >= critical_threshold:
                                    smart_health['severity'] = 'CRITICAL'
                                    smart_health['issues'].append(f"Critical wear leveling count: {raw_value}")
                                elif raw_value >= warning_threshold:
                                    if smart_health['severity'] != 'CRITICAL':
                                        smart_health['severity'] = 'WARNING'
                                    smart_health['issues'].append(f"High wear leveling count: {raw_value}")
                            elif behavior == 'countdown':
                                if raw_value <= critical_threshold:
                                    smart_health['severity'] = 'CRITICAL'
                                    smart_health['issues'].append(f"Critical wear leveling remaining: {raw_value}")
                                elif raw_value <= warning_threshold:
                                    if smart_health['severity'] != 'CRITICAL':
                                        smart_health['severity'] = 'WARNING'
                                    smart_health['issues'].append(f"Low wear leveling remaining: {raw_value}")
            # Handle all SMART attributes with manufacturer-specific logic
            ALL_SMART_ATTRIBUTES = [
                'Reallocated_Sector_Ct', 'Current_Pending_Sector', 'Offline_Uncorrectable',
                'Reported_Uncorrect', 'Spin_Retry_Count', 'Power_Cycle_Count', 'Power_On_Hours',
                'Temperature_Celsius', 'Available_Spare', 'Program_Fail_Count', 'Erase_Fail_Count',
                'Load_Cycle_Count', 'SSD_Life_Left', 'Program_Fail_Cnt_Total', 'Erase_Fail_Count_Total',
                'Program_Fail_Count_Chip', 'Erase_Fail_Count_Chip',
                'Raw_Read_Error_Rate', 'Seek_Error_Rate', 'Command_Timeout', 'High_Fly_Writes',
                'Airflow_Temperature_Cel', 'G_Sense_Error_Rate', 'Power-Off_Retract_Count',
                'Head_Flying_Hours', 'Runtime_Bad_Block', 'Factory_Bad_Block_Ct',
                'Grown_Failing_Block_Ct', 'End-to-End_Error'
            ]
            for line in output.split('\n'):
                for attr in ALL_SMART_ATTRIBUTES:
                    if attr in line and attr not in ['Wear_Leveling_Count']:  # Wear_Leveling handled separately above
                        # Check if we should monitor this attribute
                        if not self._should_monitor_attribute(attr, manufacturer_profile):
                            logger.debug(f"Skipping {attr} - disabled for this manufacturer")
                            continue
                        parts = line.split()
                        if len(parts) >= 10:
                            raw_value = self._parse_smart_value(parts[9])
                            smart_health['attributes'][attr] = raw_value
                            # Get manufacturer-specific or default thresholds
                            attr_thresholds = self._get_attribute_thresholds(attr, manufacturer_profile)
                            if not attr_thresholds:
                                continue
                            # Apply thresholds based on behavior
                            if attr == 'Temperature_Celsius':
                                smart_health['temp'] = raw_value
                                if raw_value >= attr_thresholds['critical']:
                                    smart_health['severity'] = 'CRITICAL'
                                    smart_health['issues'].append(f"Critical temperature: {raw_value}°C")
                                elif raw_value >= attr_thresholds['warning']:
                                    if smart_health['severity'] != 'CRITICAL':
                                        smart_health['severity'] = 'WARNING'
                                    smart_health['issues'].append(f"High temperature: {raw_value}°C")
                            else:
                                # Handle countup/countdown behavior
                                behavior = attr_thresholds.get('behavior', 'countup')
                                if behavior == 'countup':
                                    if raw_value >= attr_thresholds['critical']:
                                        smart_health['severity'] = 'CRITICAL'
                                        smart_health['issues'].append(f"Critical {attr}: {raw_value}")
                                    elif raw_value >= attr_thresholds['warning']:
                                        if smart_health['severity'] != 'CRITICAL':
                                            smart_health['severity'] = 'WARNING'
                                        smart_health['issues'].append(f"Warning {attr}: {raw_value}")
                                elif behavior == 'countdown':
                                    if raw_value <= attr_thresholds['critical']:
                                        smart_health['severity'] = 'CRITICAL'
                                        smart_health['issues'].append(f"Critical {attr}: {raw_value}")
                                    elif raw_value <= attr_thresholds['warning']:
                                        if smart_health['severity'] != 'CRITICAL':
                                            smart_health['severity'] = 'WARNING'
                                        smart_health['issues'].append(f"Warning {attr}: {raw_value}")
            # Check for recent SMART errors
            error_log_pattern = r"Error \d+ occurred at disk power-on lifetime: (\d+) hours"
            error_matches = re.finditer(error_log_pattern, output)
            recent_errors = []
            for match in error_matches:
                error_hour = int(match.group(1))
                current_hours = smart_health['attributes'].get('Power_On_Hours', 0)
                # Only surface errors that happened within the configured window.
                if current_hours - error_hour < self.CONFIG['SMART_ERROR_RECENT_HOURS']:
                    recent_errors.append(match.group(0))
            if recent_errors:
                smart_health['severity'] = 'WARNING'
                smart_health['issues'].extend(recent_errors)
            # Enhanced analysis methods
            if smart_health['attributes']:
                # Trend analysis for predictive failure detection
                trend_issues = self._analyze_smart_trends(device, smart_health['attributes'])
                smart_health['issues'].extend(trend_issues)
                # SSD-specific checks
                drive_type = drive_details.get('type', 'HDD')
                if drive_type == 'SSD':
                    ssd_issues = self._check_ssd_health(device, smart_health['attributes'])
                    smart_health['issues'].extend(ssd_issues)
            # Enhanced temperature analysis
            if smart_health['temp']:
                drive_type = drive_details.get('type', 'HDD')
                thermal_issues = self._check_thermal_health(device, smart_health['temp'], drive_type)
                smart_health['issues'].extend(thermal_issues)
            # Error pattern analysis
            error_pattern_issues = self._analyze_error_patterns(device, output)
            smart_health['issues'].extend(error_pattern_issues)
            logger.debug(f"=== SMART Health Check for {device} ===")
            logger.debug(f"Manufacturer profile: {manufacturer_profile.get('aliases', ['Unknown'])[0] if manufacturer_profile else 'None'}")
            logger.debug("Raw SMART attributes:")
            for attr, value in smart_health['attributes'].items():
                logger.debug(f"{attr}: {value}")
            logger.debug(f"Temperature: {smart_health['temp']}°C")
            logger.debug(f"Is new drive: {is_new_drive}")
            logger.debug(f"Detected Issues: {smart_health['issues']}")
            logger.debug("=== End SMART Check ===\n")
            # Special handling for NVMe drives
            # NOTE(review): this branch looks unreachable — NVMe devices
            # already returned via _check_nvme_smart_health at the top of
            # this method. Confirm before removing.
            if 'nvme' in device:
                try:
                    nvme_result = subprocess.run(
                        ['nvme', 'smart-log', device],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        text=True,
                        timeout=10
                    )
                    logger.debug(f"NVMe smart-log raw output for {device}:")
                    logger.debug(nvme_result.stdout)
                    # Initialize the temperature attribute
                    if smart_health['temp'] is None:
                        smart_health['attributes']['Temperature_Celsius'] = None
                    for line in nvme_result.stdout.split('\n'):
                        # Fix the NoneType error by checking if line exists and has content
                        if line and line.strip() and 'temperature' in line.lower():
                            try:
                                temp_str = line.split(':')[1].strip() if ':' in line else line.strip()
                                logger.debug(f"Raw temperature string: {temp_str}")
                                # Extract the first complete number from temperature string
                                temp_match = re.search(r'(\d+)', temp_str)
                                if temp_match:
                                    temp_value = int(temp_match.group(1))
                                    logger.debug(f"Parsed temperature value: {temp_value}")
                                    # Set both temperature fields
                                    smart_health['temp'] = temp_value
                                    smart_health['attributes']['Temperature_Celsius'] = temp_value
                                    logger.debug(f"Final temperature recorded: {smart_health['temp']}")
                                    break
                            except (ValueError, IndexError, AttributeError) as e:
                                logger.debug(f"Error parsing NVMe temperature from line '{line}': {e}")
                                continue
                except subprocess.TimeoutExpired:
                    logger.debug(f"NVMe smart-log for {device} timed out")
                except Exception as e:
                    logger.debug(f"Error getting NVMe smart data for {device}: {e}")
        except subprocess.TimeoutExpired:
            smart_health['status'] = 'ERROR'
            smart_health['issues'].append("SMART check timed out")
        except Exception as e:
            smart_health['status'] = 'ERROR'
            smart_health['severity'] = 'UNKNOWN'
            smart_health['issues'].append(f"Error checking SMART: {str(e)}")
            logger.debug(f"Exception in _check_smart_health for {device}: {e}")
            import traceback
            logger.debug(traceback.format_exc())
        return smart_health
def _check_nvme_smart_health(self, device: str) -> Dict[str, Any]:
"""Dedicated NVMe SMART health check."""
smart_health = {
'status': 'UNKNOWN',
'severity': 'NORMAL',
'issues': [],
'temp': None,
'attributes': {},
'manufacturer_profile': None
}
try:
# Use nvme-cli for NVMe devices
result = subprocess.run(
['nvme', 'smart-log', device],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=30
)
if result.returncode == 0:
smart_health['status'] = 'HEALTHY'
# Parse NVMe smart log output
for line in result.stdout.split('\n'):
if 'temperature' in line.lower():
# Extract temperature
temp_match = re.search(r'(\d+)', line)
if temp_match:
smart_health['temp'] = int(temp_match.group(1))
smart_health['attributes']['Temperature_Celsius'] = smart_health['temp']
elif 'available_spare' in line.lower():
spare_match = re.search(r'(\d+)%', line)
if spare_match:
spare_pct = int(spare_match.group(1))
smart_health['attributes']['Available_Spare'] = spare_pct
if spare_pct < 10:
smart_health['severity'] = 'CRITICAL'
smart_health['issues'].append(f"Critical Available_Spare: {spare_pct}%")
elif spare_pct < 30:
smart_health['severity'] = 'WARNING'
smart_health['issues'].append(f"Low Available_Spare: {spare_pct}%")
# Enhanced NVMe analysis
if smart_health['attributes']:
# Trend analysis for NVMe devices
trend_issues = self._analyze_smart_trends(device, smart_health['attributes'])
smart_health['issues'].extend(trend_issues)
# SSD-specific checks for NVMe
ssd_issues = self._check_ssd_health(device, smart_health['attributes'])
smart_health['issues'].extend(ssd_issues)
# Enhanced temperature analysis for NVMe
if smart_health['temp']:
thermal_issues = self._check_thermal_health(device, smart_health['temp'], 'SSD')
smart_health['issues'].extend(thermal_issues)
else:
smart_health['status'] = 'ERROR'
smart_health['issues'].append("Failed to read NVMe SMART data")
except subprocess.TimeoutExpired:
smart_health['status'] = 'ERROR'
smart_health['issues'].append("NVMe SMART check timed out")
except Exception as e:
smart_health['status'] = 'ERROR'
smart_health['issues'].append(f"Error checking NVMe SMART: {str(e)}")
return smart_health
    def _check_drives_health(self) -> Dict[str, Any]:
        """Check health of all drives in the system.

        Builds a per-drive report combining partition usage (psutil) and
        SMART health (_check_smart_health), escalating 'overall_status'
        from NORMAL to WARNING/CRITICAL based on the per-drive results.

        :return: Dict with 'overall_status' (NORMAL/WARNING/CRITICAL) and
                 'drives', a list of per-device report dicts.
        """
        drives_health = {'overall_status': 'NORMAL', 'drives': []}
        try:
            # Get only valid physical disks
            physical_disks = self._get_all_disks()
            logger.debug(f"Checking physical disks: {physical_disks}")
            if not physical_disks:
                logger.warning("No valid physical disks found for monitoring")
                drives_health['overall_status'] = 'WARNING'
                return drives_health
            # Get ALL partition information including device mapper
            partitions = psutil.disk_partitions(all=True)
            # Create mapping of base devices to their partitions
            device_partitions = {}
            for part in partitions:
                # Extract base device (e.g., /dev/sda from /dev/sda1)
                # NOTE(review): this regex only groups letter-named devices
                # (/dev/sdX); NVMe partitions (/dev/nvme0n1p1) won't be
                # associated with their disk here — confirm intended.
                base_device = re.match(r'(/dev/[a-z]+)', part.device)
                if base_device:
                    base_dev = base_device.group(1)
                    if base_dev not in device_partitions:
                        device_partitions[base_dev] = []
                    device_partitions[base_dev].append(part)
            overall_status = 'NORMAL'
            for disk in physical_disks:
                drive_report = {
                    'device': disk,
                    'partitions': [],
                    'smart_status': 'UNKNOWN',
                    'usage_percent': 0
                }
                # Add partition information if available
                if disk in device_partitions:
                    total_used = 0
                    total_space = 0
                    for partition in device_partitions[disk]:
                        try:
                            usage = psutil.disk_usage(partition.mountpoint)
                            total_used += usage.used
                            total_space += usage.total
                            part_info = {
                                'device': partition.device,
                                'mountpoint': partition.mountpoint,
                                'fstype': partition.fstype,
                                'total_space': self._convert_bytes(usage.total),
                                'used_space': self._convert_bytes(usage.used),
                                'free_space': self._convert_bytes(usage.free),
                                'usage_percent': usage.percent
                            }
                            drive_report['partitions'].append(part_info)
                        except Exception as e:
                            # Unmounted/inaccessible partitions are skipped quietly.
                            logger.debug(f"Error getting partition usage for {partition.device}: {e}")
                    # Calculate overall drive usage percentage
                    if total_space > 0:
                        drive_report['usage_percent'] = (total_used / total_space) * 100
                # Check SMART health
                smart_health = self._check_smart_health(disk)
                drive_report.update({
                    'smart_status': smart_health['status'],
                    'smart_issues': smart_health['issues'],
                    'temperature': smart_health['temp'],
                    'smart_attributes': smart_health['attributes']
                })
                # Only report issues for drives that should be monitored
                if smart_health['status'] == 'UNHEALTHY':
                    overall_status = 'CRITICAL'
                elif smart_health['status'] == 'ERROR':
                    # Don't escalate overall status for ERROR drives (might be virtual)
                    logger.debug(f"Drive {disk} returned ERROR status, skipping from issue detection")
                elif smart_health['issues'] and smart_health['status'] not in ['ERROR', 'NOT_SUPPORTED']:
                    if overall_status != 'CRITICAL':
                        overall_status = 'WARNING'
                drives_health['drives'].append(drive_report)
            drives_health['overall_status'] = overall_status
        except Exception as e:
            # Best-effort: on unexpected failure return whatever was gathered.
            logger.error(f"Error checking drives health: {str(e)}")
        return drives_health
# =============================================================================
# SYSTEM HEALTH CHECKING METHODS
# =============================================================================
@staticmethod
def _convert_bytes(bytes_value: int, suffix: str = 'B') -> str:
"""
Convert bytes to a human-readable format.
:param bytes_value: Number of bytes to convert.
:param suffix: Suffix to append (default is 'B' for bytes).
:return: Formatted string with the size in human-readable form.
"""
for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
if abs(bytes_value) < 1024.0:
return f"{bytes_value:.1f}{unit}{suffix}"
bytes_value /= 1024.0
return f"{bytes_value:.1f}Y{suffix}"
def _convert_size_to_bytes(self, size_str: str) -> float:
"""Convert size string with units to bytes."""
units = {'B': 1, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4}
size = float(size_str[:-1])
unit = size_str[-1].upper()
return size * units[unit]
    def _check_memory_usage(self) -> Dict[str, Any]:
        """Check for ECC memory errors if ECC memory is present.

        Also reports overall memory usage via psutil. ECC presence is
        detected first through ``dmidecode --type memory`` and then, as a
        fallback, via the EDAC sysfs tree; error counters are read from
        the first csrow of each memory controller.

        :return: Dict with 'has_ecc', 'ecc_errors', 'status'
                 (OK/WARNING/CRITICAL/ERROR) and usage figures.
        """
        memory_health = {
            'has_ecc': False,
            'ecc_errors': [],
            'status': 'OK',
            'total_memory': self._convert_bytes(psutil.virtual_memory().total),
            'used_memory': self._convert_bytes(psutil.virtual_memory().used),
            'memory_percent': psutil.virtual_memory().percent
        }
        try:
            # First check using dmidecode
            result = subprocess.run(
                ['dmidecode', '--type', 'memory'],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                timeout=30
            )
            if 'Error Correction Type: Multi-bit ECC' in result.stdout:
                memory_health['has_ecc'] = True
            # If dmidecode didn't find ECC, try the edac method as backup
            if not memory_health['has_ecc']:
                edac_path = '/sys/devices/system/edac/mc'
                if os.path.exists(edac_path) and os.listdir(edac_path):
                    for mc_dir in glob.glob('/sys/devices/system/edac/mc/mc[0-9]*'):
                        # Presence of a csrow node implies an EDAC-managed
                        # (ECC-capable) memory controller.
                        if os.path.exists(f"{mc_dir}/csrow0"):
                            memory_health['has_ecc'] = True
                            break
            # If ECC is present, check for errors
            if memory_health['has_ecc']:
                for mc_dir in glob.glob('/sys/devices/system/edac/mc/mc[0-9]*'):
                    if os.path.exists(f"{mc_dir}/csrow0"):
                        # Uncorrectable errors are always CRITICAL.
                        ue_count = self._read_ecc_count(f"{mc_dir}/csrow0/ue_count")
                        if ue_count > 0:
                            memory_health['status'] = 'CRITICAL'
                            memory_health['ecc_errors'].append(
                                f"Uncorrectable ECC errors detected in {os.path.basename(mc_dir)}: {ue_count}"
                            )
                        # Correctable errors only warn, and never downgrade
                        # an existing CRITICAL status.
                        ce_count = self._read_ecc_count(f"{mc_dir}/csrow0/ce_count")
                        if ce_count > 0:
                            if memory_health['status'] != 'CRITICAL':
                                memory_health['status'] = 'WARNING'
                            memory_health['ecc_errors'].append(
                                f"Correctable ECC errors detected in {os.path.basename(mc_dir)}: {ce_count}"
                            )
        except Exception as e:
            memory_health['status'] = 'ERROR'
            memory_health['ecc_errors'].append(f"Error checking ECC status: {str(e)}")
        return memory_health
def _read_ecc_count(self, filepath: str) -> int:
"""
Read ECC error count from a file.
:param filepath: Path to the ECC count file
:return: Number of ECC errors
"""
try:
with open(filepath, 'r') as f:
return int(f.read().strip())
except (IOError, OSError, ValueError) as e:
logger.debug(f"Could not read ECC count from {filepath}: {e}")
return 0
def _check_cpu_usage(self) -> Dict[str, Any]:
    """
    Sample CPU utilisation and classify it against the warning threshold.

    :return: Dict with 'cpu_usage_percent' (float) and 'status'
             ('OK' below THRESHOLDS.CPU_WARNING, else 'WARNING').
    """
    # psutil blocks for 1 second to measure utilisation over a real interval.
    usage = psutil.cpu_percent(interval=1)
    warning_limit = self.CONFIG['THRESHOLDS']['CPU_WARNING']
    return {
        'cpu_usage_percent': usage,
        'status': 'OK' if usage < warning_limit else 'WARNING'
    }
def _check_network_status(self) -> Dict[str, Any]:
    """
    Check the status of network interfaces and report any issues.

    Pings the configured management and Ceph network targets; an
    unreachable target marks that network CRITICAL.  The 'latency'
    field is reserved but currently never populated.

    :return: Dict with 'management_network' and 'ceph_network' entries
             (each: issues, status, latency), or on unexpected failure a
             fallback dict {'status': 'ERROR', 'error': <message>}.
    """
    network_health = {
        'management_network': {
            'issues': [],
            'status': 'OK',
            'latency': None
        },
        'ceph_network': {
            'issues': [],
            'status': 'OK',
            'latency': None
        }
    }
    try:
        # Check management network connectivity
        if not self._ping_target(self.CONFIG['NETWORKS']['MANAGEMENT']):
            network_health['management_network']['status'] = 'CRITICAL'
            network_health['management_network']['issues'].append(
                "Management network is unreachable"
            )
        # Check Ceph network connectivity
        if not self._ping_target(self.CONFIG['NETWORKS']['CEPH']):
            network_health['ceph_network']['status'] = 'CRITICAL'
            network_health['ceph_network']['issues'].append(
                "Ceph network is unreachable"
            )
        return network_health
    except Exception as e:
        # NOTE: the error fallback has a different shape than the normal
        # return value; callers must tolerate both.
        logger.error(f"Network health check failed: {e}")
        return {
            'status': 'ERROR',
            'error': str(e)
        }

def _ping_target(self, address: str) -> bool:
    """
    Ping *address* using the configured packet count and timeout.

    :param address: IP or hostname to probe.
    :return: True when ping exits 0 (reachable), False otherwise.
    :raises subprocess.TimeoutExpired: if ping hangs past 30 seconds.
    """
    result = subprocess.run(
        [
            "ping",
            "-c", str(self.CONFIG['NETWORKS']['PING_COUNT']),
            "-W", str(self.CONFIG['NETWORKS']['PING_TIMEOUT']),
            address
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        timeout=30  # 30 second timeout for subprocess
    )
    return result.returncode == 0
def _check_ceph_health(self) -> Dict[str, Any]:
    """
    Check Ceph cluster health if this node is part of a Ceph cluster.
    Returns health status, cluster info, and any issues detected.
    Cluster-wide issues use [cluster-wide] tag for cross-node deduplication.

    Shells out to the ``ceph`` CLI four times (health, df, osd tree,
    mon stat), each with JSON output and a 30 s timeout.  A disabled
    CEPH_ENABLED flag or a missing ``ceph`` binary short-circuits to an
    all-OK report with is_ceph_node=False.

    :return: Dict with keys: status ('OK'/'WARNING'/'CRITICAL'/'ERROR'),
             is_ceph_node, cluster_health, cluster_usage, osd_status,
             mon_status, issues (node-local), cluster_wide_issues
             (identical on every node, for deduplication upstream).
    """
    import shutil
    ceph_health = {
        'status': 'OK',
        'is_ceph_node': False,
        'cluster_health': None,  # raw Ceph status string, e.g. 'HEALTH_OK'
        'cluster_usage': None,   # filled from `ceph df` when available
        'osd_status': [],
        'mon_status': [],        # NOTE(review): replaced by a dict after `ceph mon stat`
        'issues': [],
        'cluster_wide_issues': []  # Issues that apply to entire cluster
    }
    # Check if Ceph monitoring is enabled
    if not self.CONFIG.get('CEPH_ENABLED', True):
        logger.debug("Ceph monitoring disabled in config")
        return ceph_health
    # Check if ceph CLI is available
    if not shutil.which('ceph'):
        logger.debug("Ceph CLI not found - not a Ceph node")
        return ceph_health
    ceph_health['is_ceph_node'] = True
    hostname = socket.gethostname()
    try:
        # Get cluster health status
        health_result = subprocess.run(
            ['ceph', 'health', '--format=json'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=30
        )
        if health_result.returncode == 0:
            try:
                health_data = json.loads(health_result.stdout)
                ceph_health['cluster_health'] = health_data.get('status', 'UNKNOWN')
                # Check cluster health status
                if ceph_health['cluster_health'] == 'HEALTH_ERR':
                    ceph_health['status'] = 'CRITICAL'
                    # This is a cluster-wide issue
                    ceph_health['cluster_wide_issues'].append(
                        f"Ceph cluster HEALTH_ERR: {health_data.get('summary', {}).get('message', 'Unknown error')}"
                    )
                elif ceph_health['cluster_health'] == 'HEALTH_WARN':
                    # Never downgrade an already-CRITICAL status.
                    if ceph_health['status'] != 'CRITICAL':
                        ceph_health['status'] = 'WARNING'
                    # Extract warning messages
                    checks = health_data.get('checks', {})
                    for check_name, check_data in checks.items():
                        # NOTE(review): 'severity' is read but never used below.
                        severity = check_data.get('severity', 'HEALTH_WARN')
                        message = check_data.get('summary', {}).get('message', check_name)
                        ceph_health['cluster_wide_issues'].append(
                            f"Ceph HEALTH_WARN: {message}"
                        )
            except json.JSONDecodeError as e:
                logger.warning(f"Failed to parse ceph health JSON: {e}")
        # Get cluster usage (ceph df)
        df_result = subprocess.run(
            ['ceph', 'df', '--format=json'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=30
        )
        if df_result.returncode == 0:
            try:
                df_data = json.loads(df_result.stdout)
                stats = df_data.get('stats', {})
                total_bytes = stats.get('total_bytes', 0)
                total_used = stats.get('total_used_raw_bytes', 0)
                if total_bytes > 0:
                    usage_percent = (total_used / total_bytes) * 100
                    ceph_health['cluster_usage'] = {
                        'total_bytes': total_bytes,
                        'used_bytes': total_used,
                        'usage_percent': round(usage_percent, 2)
                    }
                    # Check usage thresholds (defaults: 85% critical, 70% warning)
                    if usage_percent >= self.CONFIG.get('CEPH_USAGE_CRITICAL', 85):
                        ceph_health['status'] = 'CRITICAL'
                        ceph_health['cluster_wide_issues'].append(
                            f"Ceph cluster usage critical: {usage_percent:.1f}%"
                        )
                    elif usage_percent >= self.CONFIG.get('CEPH_USAGE_WARNING', 70):
                        if ceph_health['status'] != 'CRITICAL':
                            ceph_health['status'] = 'WARNING'
                        ceph_health['cluster_wide_issues'].append(
                            f"Ceph cluster usage warning: {usage_percent:.1f}%"
                        )
            except json.JSONDecodeError as e:
                logger.warning(f"Failed to parse ceph df JSON: {e}")
        # Get OSD status (check for down OSDs on this node)
        osd_result = subprocess.run(
            ['ceph', 'osd', 'tree', '--format=json'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=30
        )
        if osd_result.returncode == 0:
            try:
                osd_data = json.loads(osd_result.stdout)
                nodes = osd_data.get('nodes', [])
                # Find OSDs on this host
                # NOTE(review): host_id is computed but never used below;
                # the per-host OSD filter was never finished.
                host_id = None
                for node in nodes:
                    if node.get('type') == 'host' and node.get('name') == hostname:
                        host_id = node.get('id')
                        break
                # Check OSD status for this host
                for node in nodes:
                    if node.get('type') == 'osd':
                        osd_info = {
                            'id': node.get('id'),
                            'name': node.get('name'),
                            'status': node.get('status', 'unknown'),
                            'reweight': node.get('reweight', 1.0)
                        }
                        # Check if OSD belongs to this host (by checking parent in tree)
                        # Simplified: just track all OSDs for now
                        ceph_health['osd_status'].append(osd_info)
                        # Check for down OSDs - this is a cluster-wide issue
                        # All nodes see the same OSD down, so treat as cluster-wide
                        if node.get('status') == 'down':
                            ceph_health['status'] = 'CRITICAL'
                            # Cluster-wide issue - OSD down affects entire cluster
                            # Do NOT include detecting hostname in message to enable deduplication
                            ceph_health['cluster_wide_issues'].append(
                                f"Ceph OSD {node.get('name')} is DOWN"
                            )
            except json.JSONDecodeError as e:
                logger.warning(f"Failed to parse ceph osd tree JSON: {e}")
        # Get monitor status
        mon_result = subprocess.run(
            ['ceph', 'mon', 'stat', '--format=json'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=30
        )
        if mon_result.returncode == 0:
            try:
                mon_data = json.loads(mon_result.stdout)
                # 'mon_status' was initialised as a list but becomes a dict here.
                ceph_health['mon_status'] = {
                    'quorum': mon_data.get('quorum', []),
                    'quorum_names': mon_data.get('quorum_names', [])
                }
            except json.JSONDecodeError as e:
                logger.warning(f"Failed to parse ceph mon stat JSON: {e}")
        logger.debug(f"=== Ceph Health Check ===")
        logger.debug(f"Is Ceph node: {ceph_health['is_ceph_node']}")
        logger.debug(f"Cluster health: {ceph_health['cluster_health']}")
        logger.debug(f"Cluster usage: {ceph_health['cluster_usage']}")
        logger.debug(f"Status: {ceph_health['status']}")
        logger.debug(f"Issues: {ceph_health['issues']}")
        logger.debug(f"Cluster-wide issues: {ceph_health['cluster_wide_issues']}")
        logger.debug("=== End Ceph Health Check ===")
    except subprocess.TimeoutExpired:
        # Any single ceph command exceeding 30 s aborts the whole check.
        ceph_health['status'] = 'ERROR'
        ceph_health['issues'].append("Ceph health check timed out")
    except Exception as e:
        ceph_health['status'] = 'ERROR'
        ceph_health['issues'].append(f"Error checking Ceph health: {str(e)}")
        logger.error(f"Ceph health check failed: {e}")
    return ceph_health
# =============================================================================
# PROMETHEUS METRICS EXPORT
# =============================================================================
def export_prometheus_metrics(self, health_report: Dict[str, Any]) -> str:
    """
    Export health report as Prometheus metrics in text format.

    Metrics follow Prometheus naming conventions:
    - hwmon_* prefix for all metrics
    - Labels for dimensions (device, hostname, container, etc.)

    :param health_report: Aggregated health data (plain dicts/lists).
    :return: Prometheus text exposition format, newline-terminated.
    """
    # Only fall back to the local hostname when the report carries none
    # (previously socket.gethostname() was evaluated unconditionally as
    # the eager default argument of dict.get).
    hostname = health_report.get('hostname')
    if hostname is None:
        hostname = socket.gethostname()
    metrics = []

    # Helper to format labels with proper Prometheus escaping
    def labels(**kwargs) -> str:
        def escape(value):
            # Per the Prometheus text format, backslash, double quote and
            # newline must be escaped inside label values.
            return str(value).replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n')
        pairs = [f'{k}="{escape(v)}"' for k, v in kwargs.items() if v is not None]
        return '{' + ','.join(pairs) + '}' if pairs else ''

    # === System Info ===
    metrics.append('# HELP hwmon_info System information')
    metrics.append('# TYPE hwmon_info gauge')
    metrics.append(f'hwmon_info{labels(hostname=hostname)} 1')
    # === Drive Metrics ===
    metrics.append('# HELP hwmon_drive_smart_healthy SMART health status (1=healthy, 0=unhealthy)')
    metrics.append('# TYPE hwmon_drive_smart_healthy gauge')
    metrics.append('# HELP hwmon_drive_temperature_celsius Drive temperature in Celsius')
    metrics.append('# TYPE hwmon_drive_temperature_celsius gauge')
    metrics.append('# HELP hwmon_drive_size_bytes Drive total size in bytes')
    metrics.append('# TYPE hwmon_drive_size_bytes gauge')
    metrics.append('# HELP hwmon_drive_smart_issues_total Number of SMART issues detected')
    metrics.append('# TYPE hwmon_drive_smart_issues_total gauge')
    for drive in health_report.get('drives_health', {}).get('drives', []):
        device = drive.get('device', 'unknown')
        drive_labels = labels(hostname=hostname, device=device)
        # SMART health status
        smart_status = drive.get('smart_status', 'UNKNOWN')
        healthy = 1 if smart_status == 'HEALTHY' else 0
        metrics.append(f'hwmon_drive_smart_healthy{drive_labels} {healthy}')
        # Temperature — fix: 0 °C is a valid reading, so test for presence
        # rather than truthiness.
        if drive.get('temperature') is not None:
            metrics.append(f'hwmon_drive_temperature_celsius{drive_labels} {drive["temperature"]}')
        # Drive size (convert human-readable to bytes if possible)
        if drive.get('capacity'):
            capacity_bytes = self._parse_size_to_bytes(drive['capacity'])
            if capacity_bytes:
                metrics.append(f'hwmon_drive_size_bytes{drive_labels} {capacity_bytes}')
        # Issue count
        issues_count = len(drive.get('smart_issues', []))
        metrics.append(f'hwmon_drive_smart_issues_total{drive_labels} {issues_count}')
    # === CPU Metrics ===
    cpu = health_report.get('cpu_health', {})
    metrics.append('# HELP hwmon_cpu_usage_percent CPU usage percentage')
    metrics.append('# TYPE hwmon_cpu_usage_percent gauge')
    if cpu.get('cpu_usage_percent') is not None:
        metrics.append(f'hwmon_cpu_usage_percent{labels(hostname=hostname)} {cpu["cpu_usage_percent"]}')
    # === Memory Metrics ===
    mem = health_report.get('memory_health', {})
    metrics.append('# HELP hwmon_memory_usage_percent Memory usage percentage')
    metrics.append('# TYPE hwmon_memory_usage_percent gauge')
    if mem.get('memory_percent') is not None:
        metrics.append(f'hwmon_memory_usage_percent{labels(hostname=hostname)} {mem["memory_percent"]}')
    metrics.append('# HELP hwmon_memory_has_ecc Whether ECC memory is present (1=yes, 0=no)')
    metrics.append('# TYPE hwmon_memory_has_ecc gauge')
    has_ecc = 1 if mem.get('has_ecc') else 0
    metrics.append(f'hwmon_memory_has_ecc{labels(hostname=hostname)} {has_ecc}')
    if mem.get('has_ecc'):
        metrics.append('# HELP hwmon_memory_ecc_errors_total Total ECC errors detected')
        metrics.append('# TYPE hwmon_memory_ecc_errors_total gauge')
        # NOTE: counts distinct error messages, not the raw EDAC counters.
        ecc_errors = len(mem.get('ecc_errors', []))
        metrics.append(f'hwmon_memory_ecc_errors_total{labels(hostname=hostname)} {ecc_errors}')
    # === Network Metrics ===
    net = health_report.get('network_health', {})
    metrics.append('# HELP hwmon_network_status Network status (1=OK, 0=issue)')
    metrics.append('# TYPE hwmon_network_status gauge')
    for net_type in ['management_network', 'ceph_network']:
        net_info = net.get(net_type, {})
        status = 1 if net_info.get('status') == 'OK' else 0
        net_name = net_type.replace('_network', '')
        metrics.append(f'hwmon_network_status{labels(hostname=hostname, network=net_name)} {status}')
    # === Ceph Metrics ===
    ceph = health_report.get('ceph_health', {})
    if ceph.get('is_ceph_node'):
        metrics.append('# HELP hwmon_ceph_cluster_healthy Ceph cluster health (1=healthy, 0=warning/error)')
        metrics.append('# TYPE hwmon_ceph_cluster_healthy gauge')
        ceph_healthy = 1 if ceph.get('cluster_health') == 'HEALTH_OK' else 0
        metrics.append(f'hwmon_ceph_cluster_healthy{labels(hostname=hostname)} {ceph_healthy}')
        if ceph.get('cluster_usage'):
            usage = ceph['cluster_usage']
            metrics.append('# HELP hwmon_ceph_cluster_usage_percent Ceph cluster usage percentage')
            metrics.append('# TYPE hwmon_ceph_cluster_usage_percent gauge')
            metrics.append(f'hwmon_ceph_cluster_usage_percent{labels(hostname=hostname)} {usage.get("usage_percent", 0)}')
            metrics.append('# HELP hwmon_ceph_cluster_bytes_total Ceph cluster total bytes')
            metrics.append('# TYPE hwmon_ceph_cluster_bytes_total gauge')
            metrics.append(f'hwmon_ceph_cluster_bytes_total{labels(hostname=hostname)} {usage.get("total_bytes", 0)}')
            metrics.append('# HELP hwmon_ceph_cluster_bytes_used Ceph cluster used bytes')
            metrics.append('# TYPE hwmon_ceph_cluster_bytes_used gauge')
            metrics.append(f'hwmon_ceph_cluster_bytes_used{labels(hostname=hostname)} {usage.get("used_bytes", 0)}')
        metrics.append('# HELP hwmon_ceph_osd_total Total number of OSDs')
        metrics.append('# TYPE hwmon_ceph_osd_total gauge')
        osd_count = len(ceph.get('osd_status', []))
        metrics.append(f'hwmon_ceph_osd_total{labels(hostname=hostname)} {osd_count}')
        metrics.append('# HELP hwmon_ceph_osd_down Number of down OSDs')
        metrics.append('# TYPE hwmon_ceph_osd_down gauge')
        down_osds = len([o for o in ceph.get('osd_status', []) if o.get('status') == 'down'])
        metrics.append(f'hwmon_ceph_osd_down{labels(hostname=hostname)} {down_osds}')
    # === LXC Metrics ===
    lxc = health_report.get('lxc_health', {})
    if lxc.get('containers'):
        metrics.append('# HELP hwmon_lxc_storage_usage_percent LXC container storage usage percentage')
        metrics.append('# TYPE hwmon_lxc_storage_usage_percent gauge')
        for container in lxc['containers']:
            vmid = container.get('vmid', 'unknown')
            for fs in container.get('filesystems', []):
                mountpoint = fs.get('mountpoint', '/')
                usage = fs.get('usage_percent', 0)
                metrics.append(f'hwmon_lxc_storage_usage_percent{labels(hostname=hostname, vmid=vmid, mountpoint=mountpoint)} {usage}')
    # === Issue Summary Metrics ===
    metrics.append('# HELP hwmon_issues_total Total number of issues detected')
    metrics.append('# TYPE hwmon_issues_total gauge')
    system_issues = len(health_report.get('system_health', {}).get('issues', []))
    ceph_issues = len(ceph.get('issues', [])) + len(ceph.get('cluster_wide_issues', []))
    lxc_issues = len(lxc.get('issues', []))
    total_issues = system_issues + ceph_issues + lxc_issues
    metrics.append(f'hwmon_issues_total{labels(hostname=hostname)} {total_issues}')
    return '\n'.join(metrics) + '\n'
def _parse_size_to_bytes(self, size_str: str) -> int:
"""Parse human-readable size string to bytes."""
if not size_str:
return 0
size_str = size_str.strip().upper()
multipliers = {
'B': 1,
'KB': 1024,
'MB': 1024**2,
'GB': 1024**3,
'TB': 1024**4,
'PB': 1024**5,
'K': 1024,
'M': 1024**2,
'G': 1024**3,
'T': 1024**4,
'P': 1024**5
}
try:
for suffix, mult in sorted(multipliers.items(), key=lambda x: -len(x[0])):
if size_str.endswith(suffix):
num = float(size_str[:-len(suffix)].strip())
return int(num * mult)
return int(float(size_str))
except (ValueError, TypeError):
return 0
def write_prometheus_metrics(self, health_report: Dict[str, Any]) -> bool:
    """
    Write Prometheus metrics to configured destination.

    If PROMETHEUS_TEXTFILE_PATH is set, the metrics are published
    atomically (temp file + rename) for node_exporter's textfile
    collector; otherwise they are only logged at DEBUG level.

    :param health_report: Health data to render as metrics.
    :return: True if metrics were produced/written successfully,
             False when disabled or on error.
    """
    if not self.CONFIG.get('PROMETHEUS_ENABLED', False):
        return False
    try:
        rendered = self.export_prometheus_metrics(health_report)
        target = self.CONFIG.get('PROMETHEUS_TEXTFILE_PATH')
        if not target:
            # No textfile configured: just log metrics (for debugging).
            logger.debug("Prometheus metrics generated:\n" + rendered)
            return True
        # Atomic publish: write a sibling temp file, then rename over the
        # target so readers never observe a half-written file.
        import tempfile
        temp_fd, temp_path = tempfile.mkstemp(
            dir=os.path.dirname(target),
            prefix='.hwmon_metrics_'
        )
        try:
            with os.fdopen(temp_fd, 'w') as handle:
                handle.write(rendered)
            os.rename(temp_path, target)
            logger.info(f"Prometheus metrics written to {target}")
        except Exception:
            # Clean up the orphaned temp file before propagating.
            os.unlink(temp_path)
            raise
        return True
    except Exception as e:
        logger.error(f"Failed to write Prometheus metrics: {e}")
        return False
def _check_lxc_storage(self) -> Dict[str, Any]:
    """
    Check storage utilization for all running LXC containers.

    Parses ``pct list`` to find running containers, then runs
    ``pct df <vmid>`` for each and compares per-filesystem usage
    against the LXC_WARNING / LXC_CRITICAL thresholds from CONFIG.

    :return: Dict with keys: status ('OK'/'WARNING'/'CRITICAL'/'ERROR'),
             containers (per-VMID filesystem details), issues.
    """
    logger.debug("Starting LXC storage check")
    lxc_health = {
        'status': 'OK',
        'containers': [],
        'issues': []
    }
    try:
        result = subprocess.run(
            ['pct', 'list'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=30  # 30 second timeout
        )
        logger.debug(f"pct list output:\n{result.stdout}")
        # Skip the header row of `pct list`; each data row starts with
        # "<vmid> <status> ..."
        for line in result.stdout.split('\n')[1:]:
            if not line.strip():
                continue
            parts = line.split()
            if len(parts) < 2:
                logger.debug(f"Skipping invalid line: {line}")
                continue
            vmid, status = parts[0], parts[1]
            if status.lower() == 'running':
                logger.debug(f"Checking container {vmid} disk usage")
                disk_info = subprocess.run(
                    ['pct', 'df', vmid],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,
                    timeout=30  # 30 second timeout per container
                )
                container_info = {
                    'vmid': vmid,
                    'filesystems': []
                }
                # NOTE(review): 'MP' in fs_line is meant to drop the header
                # row, but would also drop any data row containing 'MP'
                # anywhere in its text — confirm against real `pct df` output.
                for fs_line in disk_info.stdout.split('\n')[1:]:
                    if not fs_line.strip() or 'MP' in fs_line:
                        continue
                    # Fix: Use fs_line instead of line, and columns consistently
                    columns = fs_line.split()
                    if len(columns) >= 6:
                        try:
                            # Skip excluded mounts by checking the first column
                            # (site-specific exclusions: 'appPool:' volumes and
                            # the /mnt/pve/mediaf share).
                            if columns[0].startswith('appPool:') or '/mnt/pve/mediaf' in columns[1]:
                                continue
                            # Get the mountpoint (last column)
                            mountpoint = columns[-1]
                            # Skip excluded mountpoints
                            if self._is_excluded_mount(mountpoint):
                                logger.debug(f"Skipping excluded mount: {mountpoint}")
                                continue
                            # Parse size values safely - use correct column indices
                            total_space = self._parse_size(columns[2])  # 3rd column
                            used_space = self._parse_size(columns[3])  # 4th column
                            available_space = self._parse_size(columns[4])  # 5th column
                            # Parse percentage safely
                            try:
                                usage_percent = float(columns[5].rstrip('%'))  # 6th column
                            except (ValueError, IndexError):
                                # Calculate percentage if parsing fails
                                usage_percent = (used_space / total_space * 100) if total_space > 0 else 0
                            filesystem = {
                                'mountpoint': mountpoint,
                                'total_space': total_space,
                                'used_space': used_space,
                                'available': available_space,
                                'usage_percent': usage_percent
                            }
                            container_info['filesystems'].append(filesystem)
                            # Check thresholds
                            if usage_percent >= self.CONFIG['THRESHOLDS']['LXC_CRITICAL']:
                                lxc_health['status'] = 'CRITICAL'
                                issue = f"LXC {vmid} critical storage usage: {usage_percent:.1f}% on {mountpoint}"
                                lxc_health['issues'].append(issue)
                            elif usage_percent >= self.CONFIG['THRESHOLDS']['LXC_WARNING']:
                                # Never downgrade an already-CRITICAL status.
                                if lxc_health['status'] != 'CRITICAL':
                                    lxc_health['status'] = 'WARNING'
                                issue = f"LXC {vmid} high storage usage: {usage_percent:.1f}% on {mountpoint}"
                                lxc_health['issues'].append(issue)
                            logger.debug(f"Filesystem details: {filesystem}")
                        except Exception as e:
                            # Best-effort parsing: a bad row is logged and skipped.
                            logger.debug(f"Error processing line: {str(e)}")
                            logger.debug(f"Full exception: {repr(e)}")
                            continue
                # Only add container info if we have filesystem data
                if container_info['filesystems']:
                    lxc_health['containers'].append(container_info)
                    logger.debug(f"Added container info for VMID {vmid}")
        logger.debug("=== LXC Storage Check Summary ===")
        logger.debug(f"Status: {lxc_health['status']}")
        logger.debug(f"Total containers checked: {len(lxc_health['containers'])}")
        logger.debug(f"Issues found: {len(lxc_health['issues'])}")
        logger.debug("=== End LXC Storage Check ===")
    except Exception as e:
        logger.debug(f"Critical error during LXC storage check: {str(e)}")
        lxc_health['status'] = 'ERROR'
        error_msg = f"Error checking LXC storage: {str(e)}"
        lxc_health['issues'].append(error_msg)
    return lxc_health
def main():
    """Command-line entry point for the system health monitor.

    Modes:
      --metrics      one-shot: print Prometheus metrics to stdout and exit
      --export-json  one-shot: dump the health report to a JSON file
      (default)      run the monitor loop via monitor.run()
    """
    parser = argparse.ArgumentParser(description="System Health Monitor")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Enable dry-run mode (simulate ticket creation without actual API calls)."
    )
    parser.add_argument(
        "--metrics",
        action="store_true",
        help="Output Prometheus metrics to stdout and exit."
    )
    parser.add_argument(
        "--export-json",
        type=str,
        metavar="FILE",
        help="Export health report to JSON file."
    )
    args = parser.parse_args()
    monitor = SystemHealthMonitor(
        ticket_api_url=SystemHealthMonitor.CONFIG['TICKET_API_URL'],
        dry_run=args.dry_run
    )
    if args.metrics:
        # Just output metrics to stdout
        health_report = monitor.perform_health_checks()
        print(monitor.export_prometheus_metrics(health_report))
    elif args.export_json:
        # Export health report as JSON.  json is already imported at module
        # level; the previous function-local re-import was redundant.
        health_report = monitor.perform_health_checks()
        with open(args.export_json, 'w') as f:
            # default=str stringifies non-JSON types (e.g. datetimes).
            json.dump(health_report, f, indent=2, default=str)
        logger.info(f"Health report exported to {args.export_json}")
    else:
        monitor.run()


if __name__ == "__main__":
    main()