analyzeOSDs/ceph_osd_analyzer.py
Jared Vititoe 03374fa784 Add USB drive SMART support with multiple bridge chipset attempts
**Issue**: osd.2 is a USB-connected 1 TB drive whose SMART data could not be read.

The error was: "Read Device Identity failed: scsi error unsupported field"
This is typical of USB-attached drives that need bridge-specific flags.

**Solution**: Added USB transport detection and multiple fallback methods:
- SAT (SCSI-ATA Translation) - covers the most common USB bridges
- usbjmicron - JMicron USB bridge chipsets
- usbcypress - Cypress USB bridge chipsets
- Generic USB fallback
- SCSI passthrough

Also added a USB/SAT attempt for unknown transport types as a final fallback (see the sketch below).
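
For reference, the retry strategy amounts to roughly this (device path and type list are illustrative, not the exact code in this commit):

```python
import json
import subprocess

DEVICE = "/dev/sdX"  # hypothetical USB-attached drive

# smartctl device-type flags tried in order until one yields parseable SMART JSON
for dev_type in ("sat", "usbjmicron", "usbcypress", "scsi"):
    proc = subprocess.run(
        ["sudo", "smartctl", "-a", "-j", "-d", dev_type, DEVICE],
        capture_output=True, text=True,
    )
    try:
        data = json.loads(proc.stdout)
    except json.JSONDecodeError:
        continue  # bridge rejected this device type; try the next one
    if "ata_smart_attributes" in data:
        print(f"SMART readable via -d {dev_type}")
        break
```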

**Debug Enhancement**:
- Now shows detected transport type in debug output
- Helps diagnose why SMART fails

**Note**: USB drives in Ceph clusters are unconventional but functional.
This OSD appears to be temporary/supplemental storage capacity.

If SMART still fails after this update, the USB bridge may be incompatible
with smartmontools, which is acceptable for temporary storage.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-06 15:16:35 -05:00

648 lines · 26 KiB · Python

#!/usr/bin/env python3
"""
Advanced Ceph OSD Replacement Candidate Analyzer

This script identifies the best OSD replacement candidates by analyzing:
- SMART health data (wear, errors, temperature) from ALL cluster nodes
- Capacity utilization and imbalance
- Host-level distribution and resilience
- Drive age (power-on hours)

Usage: sudo python3 ceph_osd_analyzer.py [--class hdd|nvme] [--min-size 8] [--debug]
"""
import argparse
import json
import shlex
import subprocess
import sys
from collections import defaultdict

DEBUG = False


class Colors:
    RED = '\033[91m'
    YELLOW = '\033[93m'
    GREEN = '\033[92m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    BOLD = '\033[1m'
    END = '\033[0m'


def run_command(cmd, parse_json=False, host=None):
    """Execute a shell command locally or via SSH and return its output."""
    try:
        if host:
            # Quote the whole command so embedded quotes (awk scripts, etc.) survive SSH.
            cmd = f"ssh -o StrictHostKeyChecking=no -o ConnectTimeout=5 {host} {shlex.quote(cmd)}"
        if DEBUG:
            print(f"{Colors.CYAN}DEBUG: Running: {cmd}{Colors.END}")
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=True)
        if parse_json:
            return json.loads(result.stdout)
        return result.stdout.strip()
    except subprocess.CalledProcessError as e:
        if DEBUG:
            print(f"{Colors.YELLOW}DEBUG: Command failed: {cmd}{Colors.END}")
            if e.stderr:
                print(f"{Colors.YELLOW}DEBUG: stderr: {e.stderr[:200]}{Colors.END}")
        return None if parse_json else ""
    except json.JSONDecodeError:
        if DEBUG:
            print(f"{Colors.RED}Error parsing JSON from: {cmd}{Colors.END}")
        return None
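

# Example (illustrative): run_command("ceph -s -f json", parse_json=True) returns a
# parsed dict from the local node; passing host="node2" (hypothetical hostname) runs
# the same command over SSH instead.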


def get_osd_tree():
    """Get the OSD tree structure."""
    return run_command("ceph osd tree -f json", parse_json=True)


def get_osd_df():
    """Get OSD disk usage statistics."""
    return run_command("ceph osd df -f json", parse_json=True)


def get_osd_metadata(osd_id):
    """Get metadata for a specific OSD."""
    return run_command(f"ceph osd metadata osd.{osd_id} -f json", parse_json=True)


# Performance metrics removed for simplicity


def get_osd_host_mapping(osd_tree):
    """Build a mapping of OSD ID to hostname."""
    osd_to_host = {}
    for node in osd_tree['nodes']:
        if node['type'] == 'host':
            host_name = node['name']
            for child_id in node.get('children', []):
                osd_to_host[child_id] = host_name
    return osd_to_host
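

# The CRUSH tree JSON lists host buckets such as
#   {"id": -3, "type": "host", "name": "node1", "children": [0, 1, 2]}
# where the children ids refer to the OSD nodes in the same 'nodes' array.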


def get_device_path_for_osd(osd_id, hostname):
    """Get the physical device path for an OSD on a host (resolving dm devices)."""
    metadata = get_osd_metadata(osd_id)
    if metadata:
        # Try 'bluestore_bdev_devices' first
        phys_dev = metadata.get('bluestore_bdev_devices')
        if phys_dev:
            device = f"/dev/{phys_dev.strip()}"
            if DEBUG:
                print(f"{Colors.GREEN}DEBUG: Found physical device from metadata: {device}{Colors.END}")
            return device
        # Also try the 'devices' field, which sometimes has the info
        devices = metadata.get('devices')
        if devices:
            # 'devices' might be comma-separated
            first_dev = devices.split(',')[0].strip()
            if first_dev and not first_dev.startswith('dm-'):
                device = f"/dev/{first_dev}" if not first_dev.startswith('/dev/') else first_dev
                if DEBUG:
                    print(f"{Colors.GREEN}DEBUG: Found device from metadata.devices: {device}{Colors.END}")
                return device
    # Fallback: follow the symlink
    result = run_command(f"readlink -f /var/lib/ceph/osd/ceph-{osd_id}/block", host=hostname)
    if result and result.startswith('/dev/'):
        # If it is a dm device, try to find the underlying disk
        if '/dev/dm-' in result or '/dev/mapper/' in result:
            # Try multiple methods to resolve the dm device
            base = run_command(f"lsblk -no pkname {result}", host=hostname)
            if not base:
                # Alternative: use ls -l on /dev/mapper
                base = run_command(f"ls -l {result} | awk '{{print $NF}}' | xargs basename", host=hostname)
            if base:
                device = f"/dev/{base.strip()}"
                if DEBUG:
                    print(f"{Colors.GREEN}DEBUG: Resolved dm device {result} -> {device}{Colors.END}")
                return device
        else:
            if DEBUG:
                print(f"{Colors.GREEN}DEBUG: Using device symlink {result}{Colors.END}")
            return result
    # Try alternative: lsblk with PKNAME (parent kernel name)
    result = run_command(f"lsblk -no pkname /var/lib/ceph/osd/ceph-{osd_id}/block 2>/dev/null", host=hostname)
    if result:
        device = f"/dev/{result.strip()}"
        if DEBUG:
            print(f"{Colors.GREEN}DEBUG: Found device from lsblk pkname: {device}{Colors.END}")
        return device
    # Last resort: try to get it from ceph-volume lvm list
    result = run_command(f"ceph-volume lvm list | grep -A 20 'osd id.*{osd_id}' | grep 'devices' | awk '{{print $2}}'", host=hostname)
    if result:
        device = result.strip()
        if DEBUG:
            print(f"{Colors.GREEN}DEBUG: Found device from ceph-volume: {device}{Colors.END}")
        return device
    if DEBUG:
        print(f"{Colors.RED}DEBUG: Could not determine device for osd.{osd_id}{Colors.END}")
    return None


def get_smart_data_remote(device_path, hostname):
    """Get SMART data from a remote host, with multiple fallback methods."""
    if not device_path:
        return None
    # Determine the device transport type
    tran = run_command(f"lsblk -no tran {device_path} 2>/dev/null", host=hostname)
    tran = tran.strip() if tran else ""
    # Try different command variations based on device type
    commands_to_try = []
    if tran == "nvme" or "nvme" in device_path:
        commands_to_try = [
            f"sudo smartctl -a -j {device_path} -d nvme",
            f"smartctl -a -j {device_path} -d nvme",  # Try without sudo
            f"sudo smartctl -a -j {device_path}",
        ]
    elif tran == "usb":
        # USB-connected drives need special device type flags
        commands_to_try = [
            f"sudo smartctl -a -j {device_path} -d sat",         # SAT (SCSI-ATA Translation)
            f"sudo smartctl -a -j {device_path} -d usbjmicron",  # JMicron USB bridge
            f"sudo smartctl -a -j {device_path} -d usbcypress",  # Cypress USB bridge
            f"sudo smartctl -a -j {device_path} -d usb",         # Generic USB (not accepted by every smartctl version; harmless if rejected)
            f"sudo smartctl -a -j {device_path} -d scsi",        # SCSI passthrough
            f"sudo smartctl -a -j {device_path}",                # Auto-detect
        ]
    elif tran == "sata":
        commands_to_try = [
            f"sudo smartctl -a -j {device_path}",
            f"smartctl -a -j {device_path}",
            f"sudo smartctl -a -j {device_path} -d ata",
        ]
    else:
        # Unknown or no transport; try generic approaches, including USB/SAT
        commands_to_try = [
            f"sudo smartctl -a -j {device_path}",
            f"smartctl -a -j {device_path}",
            f"sudo smartctl -a -j {device_path} -d sat",   # Try USB/SAT
            f"sudo smartctl -a -j {device_path} -d auto",
        ]
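    # Note: smartctl uses a bitmask exit status and may exit non-zero even when it
    # successfully read SMART data from a failing drive. Because run_command uses
    # check=True, such reads fall through to "no data" -- which still surfaces the
    # drive, since a missing SMART read is scored as critical below.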
    # Try each command until one succeeds
    for cmd in commands_to_try:
        result = run_command(f"{cmd} 2>/dev/null", host=hostname, parse_json=True)
        if result and ('ata_smart_attributes' in result or 'nvme_smart_health_information_log' in result):
            if DEBUG:
                print(f"{Colors.GREEN}DEBUG: SMART success with: {cmd}{Colors.END}")
            return result
    if DEBUG:
        print(f"{Colors.RED}DEBUG: All SMART methods failed for {device_path} on {hostname}{Colors.END}")
        print(f"{Colors.YELLOW}DEBUG: Transport type detected: {tran if tran else 'unknown'}{Colors.END}")
    return None


def get_device_health(osd_id, hostname):
    """Get device SMART health metrics from the appropriate host."""
    if DEBUG:
        print(f"{Colors.CYAN}DEBUG: Getting health for osd.{osd_id} on {hostname}{Colors.END}")
    # First try Ceph's built-in health metrics
    data = run_command(f"ceph device query-daemon-health-metrics osd.{osd_id} -f json 2>/dev/null", parse_json=True)
    if data:
        # Ceph returns the data nested under the device ID; extract it
        if isinstance(data, dict) and len(data) > 0:
            # Get the first (and usually only) device entry
            device_data = next(iter(data.values()))
            if device_data and ('ata_smart_attributes' in device_data or 'nvme_smart_health_information_log' in device_data):
                if DEBUG:
                    print(f"{Colors.GREEN}DEBUG: Got SMART data from ceph device query (nested format){Colors.END}")
                return device_data
        # Also check whether the data is already in the right format (backward compatibility)
        if 'ata_smart_attributes' in data or 'nvme_smart_health_information_log' in data:
            if DEBUG:
                print(f"{Colors.GREEN}DEBUG: Got SMART data from ceph device query (direct format){Colors.END}")
            return data
    # If that fails, get the device path and query via SSH
    device_path = get_device_path_for_osd(osd_id, hostname)
    if DEBUG:
        print(f"{Colors.CYAN}DEBUG: Device path for osd.{osd_id}: {device_path}{Colors.END}")
    if device_path:
        smart_data = get_smart_data_remote(device_path, hostname)
        if smart_data and DEBUG:
            print(f"{Colors.GREEN}DEBUG: Got SMART data via SSH from {hostname}{Colors.END}")
        return smart_data
    return None


def parse_smart_health(smart_data):
    """Parse SMART data and calculate a health score."""
    score = 100.0
    issues = []
    metrics = {}
    if not smart_data:
        # CRITICAL: a failed SMART read is a red flag -- it could indicate drive issues
        return 0.0, ["CRITICAL: No SMART data available - drive may be failing"], metrics
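    # smartctl -j reports ATA attributes in table entries shaped like:
    #   {"id": 5, "name": "Reallocated_Sector_Ct", "value": 100, "worst": 100,
    #    "thresh": 10, "raw": {"value": 0, "string": "0"}}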
    # Check for HDD SMART data
    if 'ata_smart_attributes' in smart_data:
        attrs = smart_data['ata_smart_attributes'].get('table', [])
        for attr in attrs:
            attr_id = attr.get('id')
            name = attr.get('name', '')
            value = attr.get('value', 0)
            raw_value = attr.get('raw', {}).get('value', 0)
            # Reallocated Sectors (5) - CRITICAL indicator of imminent failure
            if attr_id == 5:
                metrics['reallocated_sectors'] = raw_value
                if raw_value > 0:
                    # ANY reallocated sectors are a severe problem
                    if raw_value >= 10:
                        score -= 95  # Drive is failing; near-zero health
                    elif raw_value >= 5:
                        score -= 85  # Critical failure imminent
                    else:
                        score -= 70  # Even 1-4 sectors is very serious
                    issues.append(f"CRITICAL: Reallocated sectors: {raw_value} - DRIVE FAILING")
            # Spin Retry Count (10) - CRITICAL
            elif attr_id == 10:
                metrics['spin_retry'] = raw_value
                if raw_value > 0:
                    score -= min(40, raw_value * 10)
                    issues.append(f"CRITICAL: Spin retry count: {raw_value}")
            # Pending Sectors (197) - CRITICAL
            elif attr_id == 197:
                metrics['pending_sectors'] = raw_value
                if raw_value > 0:
                    score -= min(60, raw_value * 10)
                    issues.append(f"CRITICAL: Pending sectors: {raw_value}")
            # Uncorrectable Sectors (198) - CRITICAL
            elif attr_id == 198:
                metrics['uncorrectable_sectors'] = raw_value
                if raw_value > 0:
                    score -= min(70, raw_value * 15)
                    issues.append(f"CRITICAL: Uncorrectable sectors: {raw_value}")
            # Temperature (190, 194)
            elif attr_id in [190, 194]:
                # Only use valid temperature values
                if isinstance(raw_value, int) and 0 < raw_value < 100:
                    metrics['temperature'] = raw_value
                    if raw_value > 60:
                        score -= min(10, (raw_value - 60) * 2)
                        issues.append(f"High temperature: {raw_value}°C")
            # Power On Hours (9)
            elif attr_id == 9:
                metrics['power_on_hours'] = raw_value
                age_years = raw_value / 8760
                metrics['age_years'] = age_years
                if age_years > 5:
                    score -= min(15, (age_years - 5) * 3)
                    issues.append(f"Drive age: {age_years:.1f} years")
    # Check for NVMe SMART data
    elif 'nvme_smart_health_information_log' in smart_data:
        nvme_health = smart_data['nvme_smart_health_information_log']
        # Available spare
        spare = nvme_health.get('available_spare', 100)
        if spare < 50:
            score -= (100 - spare) * 0.5
            issues.append(f"Low available spare: {spare}%")
        # Percentage used
        pct_used = nvme_health.get('percentage_used', 0)
        metrics['percentage_used'] = pct_used
        if pct_used > 80:
            score -= min(30, (pct_used - 80) * 1.5)
            issues.append(f"High wear: {pct_used}%")
        # Media errors - CRITICAL for NVMe
        media_errors = nvme_health.get('media_errors', 0)
        if media_errors > 0:
            score -= min(60, media_errors * 10)
            issues.append(f"CRITICAL: Media errors: {media_errors}")
        # Temperature
        temp = nvme_health.get('temperature', 0)
        if 0 < temp < 150:  # Valid temperature range
            metrics['temperature'] = temp
            if temp > 70:
                score -= min(10, (temp - 70) * 2)
                issues.append(f"High temperature: {temp}°C")
    return max(0, score), issues, metrics


def calculate_capacity_score(osd_data, host_osds_data, osd_class):
    """Calculate a score based on capacity optimization potential."""
    score = 0.0
    factors = []
    weight = osd_data.get('crush_weight', 0)
    utilization = osd_data.get('utilization', 0)
    # Small drives are better candidates
    if weight < 2:
        score += 40
        factors.append(f"Very small drive ({weight:.1f}TB) - high capacity gain")
    elif weight < 5:
        score += 30
        factors.append(f"Small drive ({weight:.1f}TB) - good capacity gain")
    elif weight < 10:
        score += 15
        factors.append(f"Medium drive ({weight:.1f}TB)")
    else:
        score += 5
        factors.append(f"Large drive ({weight:.1f}TB) - lower priority")
    # Highly utilized drives are harder to replace
    if utilization > 70:
        score -= 15
        factors.append(f"High utilization ({utilization:.1f}%) - requires data migration")
    elif utilization > 50:
        score -= 8
        factors.append(f"Medium utilization ({utilization:.1f}%)")
    # Host balance consideration
    same_class_osds = [o for o in host_osds_data if o.get('device_class') == osd_class]
    if same_class_osds:
        host_total_weight = sum(o.get('crush_weight', 0) for o in same_class_osds)
        host_avg_weight = host_total_weight / len(same_class_osds)
        if weight < host_avg_weight * 0.5:
            score += 15
            factors.append(f"Below host average ({host_avg_weight:.1f}TB) - improves balance")
    return score, factors
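

# Illustrative: a 1 TB drive (crush weight ~0.9) at 20% utilization scores
# 40 (very small) with no utilization penalty; if it also sits below half the
# host's average weight, the total capacity score reaches 55.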


def calculate_resilience_score(osd_data, host_name, all_hosts_data, osd_tree):
    """Calculate a score based on cluster resilience improvement."""
    score = 0.0
    factors = []
    osd_class = osd_data.get('device_class', 'hdd')
    # Count OSDs per host by class
    host_class_counts = {}
    for host_node in [n for n in osd_tree['nodes'] if n['type'] == 'host']:
        h_name = host_node['name']
        host_osds = [node for node in osd_tree['nodes']
                     if node.get('id') in host_node.get('children', [])
                     and node.get('type') == 'osd']
        host_class_counts[h_name] = {
            'hdd': len([o for o in host_osds if o.get('device_class') == 'hdd' and o.get('status') == 'up']),
            'nvme': len([o for o in host_osds if o.get('device_class') == 'nvme' and o.get('status') == 'up']),
        }
    if host_name not in host_class_counts:
        return 0, ["Host not found in cluster"]
    current_count = host_class_counts[host_name][osd_class]
    avg_count = sum(h[osd_class] for h in host_class_counts.values()) / len(host_class_counts)
    # Hosts with more OSDs are better candidates
    if current_count > avg_count * 1.2:
        score += 20
        factors.append(f"Host has {current_count} {osd_class} OSDs (above average {avg_count:.1f})")
    elif current_count > avg_count:
        score += 10
        factors.append(f"Host slightly above average {osd_class} count")
    # Check for down OSDs on the same host
    host_node = next((n for n in osd_tree['nodes'] if n['type'] == 'host' and n['name'] == host_name), None)
    if host_node:
        down_osds = [node for node in osd_tree['nodes']
                     if node.get('id') in host_node.get('children', [])
                     and node.get('status') == 'down']
        if down_osds:
            score += 15
            factors.append(f"Host has {len(down_osds)} down OSD(s) - may have hardware issues")
    return score, factors


# Performance metrics removed for simplicity
def analyze_cluster(device_class_filter=None, min_size=0):
    """Main analysis function."""
    print(f"{Colors.BOLD}{Colors.CYAN}=== Ceph OSD Replacement Candidate Analyzer ==={Colors.END}\n")
    # Gather data
    print("Gathering cluster data...")
    osd_tree = get_osd_tree()
    osd_df = get_osd_df()
    if not osd_tree or not osd_df:
        print(f"{Colors.RED}Failed to gather cluster data{Colors.END}")
        return
    # Build the OSD-to-host mapping
    osd_to_host = get_osd_host_mapping(osd_tree)
    # Parse OSD data
    osd_df_map = {node['id']: node for node in osd_df['nodes']}
    # Build the host data map
    host_osds_map = defaultdict(list)
    for node in osd_tree['nodes']:
        if node['type'] == 'osd' and node.get('status') == 'up':
            host_name = osd_to_host.get(node['id'])
            if host_name:
                osd_df_data = osd_df_map.get(node['id'], {})
                host_osds_map[host_name].append({
                    'id': node['id'],
                    'device_class': node.get('device_class', 'hdd'),
                    'crush_weight': osd_df_data.get('crush_weight', 0),
                })
    # Analyze each OSD
    candidates = []
    failed_smart = []
    print("Analyzing OSDs across all cluster nodes...\n")
    total_osds = len([n for n in osd_tree['nodes'] if n['type'] == 'osd' and n.get('status') == 'up'])
    current_osd = 0
    for node in osd_tree['nodes']:
        if node['type'] != 'osd' or node.get('status') != 'up':
            continue
        current_osd += 1
        osd_id = node['id']
        osd_name = node['name']
        device_class = node.get('device_class', 'hdd')
        host_name = osd_to_host.get(osd_id, 'unknown')
        # Get OSD usage data
        osd_df_data = osd_df_map.get(osd_id, {})
        # Apply CLI filters (--class / --min-size), which were previously parsed but unused
        if device_class_filter and device_class != device_class_filter:
            continue
        if min_size and osd_df_data.get('crush_weight', 0) < min_size:
            continue
        print(f"[{current_osd}/{total_osds}] Analyzing {osd_name} on {host_name} ({device_class})...".ljust(80), end='\r')
        # SMART health analysis
        health_data = get_device_health(osd_id, host_name)
        if not health_data:
            failed_smart.append((osd_name, host_name))
        health_score, health_issues, health_metrics = parse_smart_health(health_data)
        # Capacity optimization score
        capacity_score, capacity_factors = calculate_capacity_score(
            osd_df_data, host_osds_map.get(host_name, []), device_class
        )
        # Resilience score
        resilience_score, resilience_factors = calculate_resilience_score(
            node, host_name, host_osds_map, osd_tree
        )
        # Calculate the total score with revised weights
        # Priority: failed drives > small failing drives > small drives > any failing
        has_health_issues = len(health_issues) > 0
        has_critical_issues = any(
            'CRITICAL:' in issue and ('Reallocated' in issue or 'Uncorrectable' in issue or 'Pending' in issue)
            for issue in health_issues
        )
        is_small = osd_df_data.get('crush_weight', 0) < 5
        # Base scoring: 80% health, 15% capacity, 5% resilience
        base_score = (
            (100 - health_score) * 0.80 +  # Health is critical
            capacity_score * 0.15 +        # Capacity matters for small drives
            resilience_score * 0.05        # Cluster resilience (minor)
        )
        # Apply bonuses for priority combinations
        if health_score == 0:  # Failed SMART reads
            if is_small:
                base_score += 30  # Failed SMART + small = top priority
            else:
                base_score += 20  # Failed SMART alone is still critical
        elif has_critical_issues:  # Reallocated/pending/uncorrectable sectors
            if is_small:
                base_score += 25  # Critical issues + small drive
            else:
                base_score += 20  # Critical issues alone
        elif has_health_issues and is_small:
            base_score += 15  # Small + beginning to fail
        total_score = min(100, base_score)  # Cap at 100
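        # Illustrative arithmetic (hypothetical small drive with no SMART data):
        #   health_score=0, capacity_score=40, resilience_score=0
        #   base = 100*0.80 + 40*0.15 + 0*0.05 = 86; +30 bonus -> 116, capped at 100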
        candidates.append({
            'osd_id': osd_id,
            'osd_name': osd_name,
            'host': host_name,
            'device_class': device_class,
            'weight': osd_df_data.get('crush_weight', 0),
            'size': osd_df_data.get('kb', 0) / 1024 / 1024 / 1024,  # KB -> TB
            'utilization': osd_df_data.get('utilization', 0),
            'total_score': total_score,
            'health_score': health_score,
            'health_issues': health_issues,
            'health_metrics': health_metrics,
            'capacity_factors': capacity_factors,
            'resilience_factors': resilience_factors,
        })
    print(" " * 80, end='\r')
    # Show SMART failures, if any
    if failed_smart:
        print(f"\n{Colors.YELLOW}Note: Unable to retrieve SMART data for {len(failed_smart)} OSDs:{Colors.END}")
        for osd_name, host in failed_smart[:5]:
            print(f"  - {osd_name} on {host}")
        if len(failed_smart) > 5:
            print(f"  ... and {len(failed_smart) - 5} more")
        print()
    # Sort by total score
    candidates.sort(key=lambda x: x['total_score'], reverse=True)
    # Display results
    print(f"\n{Colors.BOLD}{Colors.CYAN}=== TOP REPLACEMENT CANDIDATES (ALL HOSTS) ==={Colors.END}\n")
    for rank, candidate in enumerate(candidates[:15], 1):
        score_color = Colors.RED if candidate['total_score'] > 50 else Colors.YELLOW if candidate['total_score'] > 30 else Colors.GREEN
        health_color = Colors.GREEN if candidate['health_score'] > 80 else Colors.YELLOW if candidate['health_score'] > 60 else Colors.RED
        print(f"{Colors.BOLD}#{rank} - {candidate['osd_name']} ({candidate['device_class'].upper()}){Colors.END}")
        print(f"  Host: {candidate['host']}")
        print(f"  Size: {candidate['size']:.2f} TB (weight: {candidate['weight']:.2f})")
        print(f"  Utilization: {candidate['utilization']:.1f}%")
        print(f"  {score_color}Replacement Score: {candidate['total_score']:.1f}/100{Colors.END}")
        print(f"  {health_color}Health Score: {candidate['health_score']:.1f}/100{Colors.END}")
        if candidate['health_issues']:
            print(f"  {Colors.RED}Health Issues:{Colors.END}")
            for issue in candidate['health_issues'][:3]:
                print(f"    - {issue}")
        if candidate['capacity_factors']:
            print("  Capacity Optimization:")
            for factor in candidate['capacity_factors'][:2]:
                print(f"    - {factor}")
        if candidate['resilience_factors']:
            print("  Host Distribution:")
            for factor in candidate['resilience_factors'][:2]:
                print(f"    - {factor}")
        print()
    # Summary by class
    print(f"\n{Colors.BOLD}{Colors.CYAN}=== SUMMARY BY DEVICE CLASS ==={Colors.END}\n")
    for device_class in ['hdd', 'nvme']:
        class_candidates = [c for c in candidates if c['device_class'] == device_class]
        if class_candidates:
            top_candidate = class_candidates[0]
            print(f"{Colors.BOLD}{device_class.upper()}:{Colors.END}")
            print(f"  Top candidate: {top_candidate['osd_name']} (score: {top_candidate['total_score']:.1f})")
            print(f"  Host: {top_candidate['host']}")
            print(f"  Capacity gain potential: {top_candidate['weight']:.2f} TB")
            print()
    # Summary by host
    print(f"\n{Colors.BOLD}{Colors.CYAN}=== TOP CANDIDATES BY HOST ==={Colors.END}\n")
    hosts_seen = set()
    for candidate in candidates:
        if candidate['host'] not in hosts_seen and len(hosts_seen) < 5:
            hosts_seen.add(candidate['host'])
            print(f"{Colors.BOLD}{candidate['host']}:{Colors.END}")
            print(f"  Top candidate: {candidate['osd_name']} (score: {candidate['total_score']:.1f})")
            print(f"  {candidate['device_class'].upper()}, {candidate['weight']:.2f} TB, {candidate['utilization']:.1f}% used")
            if candidate['health_issues']:
                print(f"    Issues: {candidate['health_issues'][0]}")
            print()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Analyze Ceph OSDs for replacement candidates across the entire cluster')
    parser.add_argument('--class', dest='device_class', choices=['hdd', 'nvme'],
                        help='Filter by device class')
    parser.add_argument('--min-size', type=float, default=0,
                        help='Minimum OSD size in TB to consider')
    parser.add_argument('--debug', action='store_true',
                        help='Enable debug output')
    args = parser.parse_args()
    if args.debug:
        DEBUG = True
    try:
        analyze_cluster(device_class_filter=args.device_class, min_size=args.min_size)
    except KeyboardInterrupt:
        print(f"\n{Colors.YELLOW}Analysis interrupted{Colors.END}")
        sys.exit(0)
    except Exception as e:
        print(f"{Colors.RED}Error: {e}{Colors.END}")
        import traceback
        traceback.print_exc()
        sys.exit(1)