#!/usr/bin/env python3 """ Advanced Ceph OSD Replacement Candidate Analyzer This script identifies the best OSD replacement candidates by analyzing: - SMART health data (wear, errors, temperature) from ALL cluster nodes - Capacity utilization and imbalance - Host-level distribution and resilience - Age and performance metrics - PG distribution balance Usage: sudo python3 ceph_osd_analyzer.py [--class hdd|nvme] [--min-size 8] [--debug] """ import json import subprocess import sys import argparse from collections import defaultdict from datetime import datetime import re DEBUG = False class Colors: RED = '\033[91m' YELLOW = '\033[93m' GREEN = '\033[92m' BLUE = '\033[94m' CYAN = '\033[96m' BOLD = '\033[1m' END = '\033[0m' def run_command(cmd, parse_json=False, host=None): """Execute shell command locally or via SSH and return output""" try: if host: cmd = f"ssh -o StrictHostKeyChecking=no -o ConnectTimeout=5 {host} '{cmd}'" if DEBUG: print(f"{Colors.CYAN}DEBUG: Running: {cmd}{Colors.END}") result = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=True) if parse_json: return json.loads(result.stdout) return result.stdout.strip() except subprocess.CalledProcessError as e: if DEBUG: print(f"{Colors.YELLOW}DEBUG: Command failed: {cmd}{Colors.END}") if e.stderr: print(f"{Colors.YELLOW}DEBUG: stderr: {e.stderr[:200]}{Colors.END}") return None if parse_json else "" except json.JSONDecodeError as e: if DEBUG: print(f"{Colors.RED}Error parsing JSON from: {cmd}{Colors.END}") return None def get_osd_tree(): """Get OSD tree structure""" return run_command("ceph osd tree -f json", parse_json=True) def get_osd_df(): """Get OSD disk usage statistics""" return run_command("ceph osd df -f json", parse_json=True) def get_osd_metadata(osd_id): """Get metadata for specific OSD""" return run_command(f"ceph osd metadata osd.{osd_id} -f json", parse_json=True) # Performance metrics removed for simplicity def get_osd_host_mapping(osd_tree): """Build mapping of OSD ID to hostname""" osd_to_host = {} for node in osd_tree['nodes']: if node['type'] == 'host': host_name = node['name'] for child_id in node.get('children', []): osd_to_host[child_id] = host_name return osd_to_host def get_device_path_for_osd(osd_id, hostname): """Get the physical device path for an OSD on a host (resolve dm devices).""" metadata = get_osd_metadata(osd_id) if metadata: # Try 'bluestore_bdev_devices' first phys_dev = metadata.get('bluestore_bdev_devices') if phys_dev: device = f"/dev/{phys_dev.strip()}" if DEBUG: print(f"{Colors.GREEN}DEBUG: Found physical device from metadata: {device}{Colors.END}") return device # Fallback: follow the symlink result = run_command(f"readlink -f /var/lib/ceph/osd/ceph-{osd_id}/block", host=hostname) if result and result.startswith('/dev/'): # Check if it is a dm device, try to find underlying if '/dev/dm-' in result: base = run_command(f"lsblk -no pkname {result}", host=hostname) if base: device = f"/dev/{base.strip()}" if DEBUG: print(f"{Colors.GREEN}DEBUG: Resolved dm device {result} -> {device}{Colors.END}") return device else: if DEBUG: print(f"{Colors.GREEN}DEBUG: Using device symlink {result}{Colors.END}") return result # Last fallback: lsblk from block path result = run_command(f"lsblk -no pkname /var/lib/ceph/osd/ceph-{osd_id}/block", host=hostname) if result: device = f"/dev/{result.strip()}" if DEBUG: print(f"{Colors.GREEN}DEBUG: Found device from lsblk: {device}{Colors.END}") return device if DEBUG: print(f"{Colors.RED}DEBUG: Could not determine device for osd.{osd_id}{Colors.END}") return None def get_smart_data_remote(device_path, hostname): """Get SMART data from a remote host with proper device type detection.""" if not device_path: return None # Strip partition suffix base_device = re.sub(r'p?\d+$', '', device_path) # Detect type: NVMe or SATA if 'nvme' in base_device: dev_type = 'nvme' else: dev_type = 'sat' # sata/ata, compatible with SSD/HDD cmd = f"sudo smartctl -a -j -d {dev_type} {base_device} 2>/dev/null" result = run_command(cmd, host=hostname, parse_json=True) if DEBUG and result is None: print(f"{Colors.YELLOW}DEBUG: SMART data failed for {base_device} on {hostname}{Colors.END}") return result def get_device_health(osd_id, hostname): """Get device SMART health metrics from the appropriate host""" if DEBUG: print(f"{Colors.CYAN}DEBUG: Getting health for osd.{osd_id} on {hostname}{Colors.END}") # First try ceph's built-in health metrics data = run_command(f"ceph device query-daemon-health-metrics osd.{osd_id} -f json 2>/dev/null", parse_json=True) if data and ('ata_smart_attributes' in data or 'nvme_smart_health_information_log' in data): if DEBUG: print(f"{Colors.GREEN}DEBUG: Got SMART data from ceph device query{Colors.END}") return data # If that fails, get device path and query via SSH device_path = get_device_path_for_osd(osd_id, hostname) if DEBUG: print(f"{Colors.CYAN}DEBUG: Device path for osd.{osd_id}: {device_path}{Colors.END}") if device_path: smart_data = get_smart_data_remote(device_path, hostname) if smart_data and DEBUG: print(f"{Colors.GREEN}DEBUG: Got SMART data via SSH from {hostname}{Colors.END}") return smart_data return None def parse_smart_health(smart_data): """Parse SMART data and calculate health score""" score = 100.0 issues = [] metrics = {} if not smart_data: return 50.0, ["No SMART data available"], metrics # Check for HDD SMART data if 'ata_smart_attributes' in smart_data: attrs = smart_data['ata_smart_attributes'].get('table', []) for attr in attrs: attr_id = attr.get('id') name = attr.get('name', '') value = attr.get('value', 0) raw_value = attr.get('raw', {}).get('value', 0) # Reallocated Sectors (5) if attr_id == 5: metrics['reallocated_sectors'] = raw_value if raw_value > 0: score -= min(20, raw_value * 2) issues.append(f"Reallocated sectors: {raw_value}") # Spin Retry Count (10) elif attr_id == 10: metrics['spin_retry'] = raw_value if raw_value > 0: score -= min(15, raw_value * 3) issues.append(f"Spin retry count: {raw_value}") # Pending Sectors (197) elif attr_id == 197: metrics['pending_sectors'] = raw_value if raw_value > 0: score -= min(25, raw_value * 5) issues.append(f"Pending sectors: {raw_value}") # Uncorrectable Sectors (198) elif attr_id == 198: metrics['uncorrectable_sectors'] = raw_value if raw_value > 0: score -= min(30, raw_value * 5) issues.append(f"Uncorrectable sectors: {raw_value}") # Temperature (190, 194) elif attr_id in [190, 194]: # Only use valid temperature values if isinstance(raw_value, int) and 0 < raw_value < 100: metrics['temperature'] = raw_value if raw_value > 60: score -= min(10, (raw_value - 60) * 2) issues.append(f"High temperature: {raw_value}°C") # Power On Hours (9) elif attr_id == 9: metrics['power_on_hours'] = raw_value age_years = raw_value / 8760 metrics['age_years'] = age_years if age_years > 5: score -= min(15, (age_years - 5) * 3) issues.append(f"Drive age: {age_years:.1f} years") # Check for NVMe SMART data elif 'nvme_smart_health_information_log' in smart_data: nvme_health = smart_data['nvme_smart_health_information_log'] # Available spare spare = nvme_health.get('available_spare', 100) if spare < 50: score -= (100 - spare) * 0.5 issues.append(f"Low available spare: {spare}%") # Percentage used pct_used = nvme_health.get('percentage_used', 0) metrics['percentage_used'] = pct_used if pct_used > 80: score -= min(30, (pct_used - 80) * 1.5) issues.append(f"High wear: {pct_used}%") # Media errors media_errors = nvme_health.get('media_errors', 0) if media_errors > 0: score -= min(25, media_errors * 5) issues.append(f"Media errors: {media_errors}") # Temperature temp = nvme_health.get('temperature', 0) if 0 < temp < 150: # Valid temperature range metrics['temperature'] = temp if temp > 70: score -= min(10, (temp - 70) * 2) issues.append(f"High temperature: {temp}°C") return max(0, score), issues, metrics def calculate_capacity_score(osd_data, host_osds_data, osd_class): """Calculate score based on capacity optimization potential""" score = 0.0 factors = [] weight = osd_data.get('crush_weight', 0) utilization = osd_data.get('utilization', 0) # Small drives are better candidates if weight < 2: score += 40 factors.append(f"Very small drive ({weight:.1f}TB) - high capacity gain") elif weight < 5: score += 30 factors.append(f"Small drive ({weight:.1f}TB) - good capacity gain") elif weight < 10: score += 15 factors.append(f"Medium drive ({weight:.1f}TB)") else: score += 5 factors.append(f"Large drive ({weight:.1f}TB) - lower priority") # High utilization drives are harder to replace if utilization > 70: score -= 15 factors.append(f"High utilization ({utilization:.1f}%) - requires data migration") elif utilization > 50: score -= 8 factors.append(f"Medium utilization ({utilization:.1f}%)") # Host balance consideration same_class_osds = [o for o in host_osds_data if o.get('device_class') == osd_class] if same_class_osds: host_total_weight = sum(o.get('crush_weight', 0) for o in same_class_osds) host_avg_weight = host_total_weight / len(same_class_osds) if weight < host_avg_weight * 0.5: score += 15 factors.append(f"Below host average ({host_avg_weight:.1f}TB) - improves balance") return score, factors def calculate_resilience_score(osd_data, host_name, all_hosts_data, osd_tree): """Calculate score based on cluster resilience improvement""" score = 0.0 factors = [] osd_class = osd_data.get('device_class', 'hdd') # Count OSDs per host by class host_class_counts = {} for host_node in [n for n in osd_tree['nodes'] if n['type'] == 'host']: h_name = host_node['name'] host_osds = [osd_tree['nodes'][i] for i in range(len(osd_tree['nodes'])) if osd_tree['nodes'][i].get('id') in host_node.get('children', []) and osd_tree['nodes'][i].get('type') == 'osd'] host_class_counts[h_name] = { 'hdd': len([o for o in host_osds if o.get('device_class') == 'hdd' and o.get('status') == 'up']), 'nvme': len([o for o in host_osds if o.get('device_class') == 'nvme' and o.get('status') == 'up']) } if host_name not in host_class_counts: return 0, ["Host not found in cluster"] current_count = host_class_counts[host_name][osd_class] avg_count = sum(h[osd_class] for h in host_class_counts.values()) / len(host_class_counts) # Hosts with more OSDs are better candidates if current_count > avg_count * 1.2: score += 20 factors.append(f"Host has {current_count} {osd_class} OSDs (above average {avg_count:.1f})") elif current_count > avg_count: score += 10 factors.append(f"Host slightly above average {osd_class} count") # Check for down OSDs on same host host_node = next((n for n in osd_tree['nodes'] if n['type'] == 'host' and n['name'] == host_name), None) if host_node: down_osds = [osd_tree['nodes'][i] for i in range(len(osd_tree['nodes'])) if osd_tree['nodes'][i].get('id') in host_node.get('children', []) and osd_tree['nodes'][i].get('status') == 'down'] if down_osds: score += 15 factors.append(f"Host has {len(down_osds)} down OSD(s) - may have hardware issues") return score, factors # Performance metrics removed for simplicity def analyze_cluster(): """Main analysis function""" print(f"{Colors.BOLD}{Colors.CYAN}=== Ceph OSD Replacement Candidate Analyzer ==={Colors.END}\n") # Gather data print("Gathering cluster data...") osd_tree = get_osd_tree() osd_df = get_osd_df() if not osd_tree or not osd_df: print(f"{Colors.RED}Failed to gather cluster data{Colors.END}") return # Build OSD to host mapping osd_to_host = get_osd_host_mapping(osd_tree) # Parse OSD data osd_df_map = {node['id']: node for node in osd_df['nodes']} # Build host data map host_osds_map = defaultdict(list) for node in osd_tree['nodes']: if node['type'] == 'osd' and node.get('status') == 'up': host_name = osd_to_host.get(node['id']) if host_name: osd_df_data = osd_df_map.get(node['id'], {}) host_osds_map[host_name].append({ 'id': node['id'], 'device_class': node.get('device_class', 'hdd'), 'crush_weight': osd_df_data.get('crush_weight', 0) }) # Analyze each OSD candidates = [] failed_smart = [] print("Analyzing OSDs across all cluster nodes...\n") total_osds = len([n for n in osd_tree['nodes'] if n['type'] == 'osd' and n.get('status') == 'up']) current_osd = 0 for node in osd_tree['nodes']: if node['type'] != 'osd' or node.get('status') != 'up': continue current_osd += 1 osd_id = node['id'] osd_name = node['name'] device_class = node.get('device_class', 'hdd') host_name = osd_to_host.get(osd_id, 'unknown') print(f"[{current_osd}/{total_osds}] Analyzing {osd_name} on {host_name} ({device_class})...".ljust(80), end='\r') # Get OSD data osd_df_data = osd_df_map.get(osd_id, {}) # SMART health analysis health_data = get_device_health(osd_id, host_name) if not health_data: failed_smart.append((osd_name, host_name)) health_score, health_issues, health_metrics = parse_smart_health(health_data) # Capacity optimization score capacity_score, capacity_factors = calculate_capacity_score( osd_df_data, host_osds_map.get(host_name, []), device_class ) # Resilience score resilience_score, resilience_factors = calculate_resilience_score( node, host_name, host_osds_map, osd_tree ) # Calculate total score (weighted: 60% health, 30% capacity, 10% resilience) total_score = ( (100 - health_score) * 0.60 + # Health is most important capacity_score * 0.30 + # Capacity optimization resilience_score * 0.10 # Cluster resilience ) candidates.append({ 'osd_id': osd_id, 'osd_name': osd_name, 'host': host_name, 'device_class': device_class, 'weight': osd_df_data.get('crush_weight', 0), 'size': osd_df_data.get('kb', 0) / 1024 / 1024 / 1024, # TB 'utilization': osd_df_data.get('utilization', 0), 'total_score': total_score, 'health_score': health_score, 'health_issues': health_issues, 'health_metrics': health_metrics, 'capacity_factors': capacity_factors, 'resilience_factors': resilience_factors, }) print(" " * 80, end='\r') # Show SMART failures if any if failed_smart: print(f"\n{Colors.YELLOW}Note: Unable to retrieve SMART data for {len(failed_smart)} OSDs:{Colors.END}") for osd_name, host in failed_smart[:5]: print(f" - {osd_name} on {host}") if len(failed_smart) > 5: print(f" ... and {len(failed_smart) - 5} more") print() # Sort by total score candidates.sort(key=lambda x: x['total_score'], reverse=True) # Display results print(f"\n{Colors.BOLD}{Colors.CYAN}=== TOP REPLACEMENT CANDIDATES (ALL HOSTS) ==={Colors.END}\n") for rank, candidate in enumerate(candidates[:15], 1): score_color = Colors.RED if candidate['total_score'] > 50 else Colors.YELLOW if candidate['total_score'] > 30 else Colors.GREEN health_color = Colors.GREEN if candidate['health_score'] > 80 else Colors.YELLOW if candidate['health_score'] > 60 else Colors.RED print(f"{Colors.BOLD}#{rank} - {candidate['osd_name']} ({candidate['device_class'].upper()}){Colors.END}") print(f" Host: {candidate['host']}") print(f" Size: {candidate['size']:.2f} TB (weight: {candidate['weight']:.2f})") print(f" Utilization: {candidate['utilization']:.1f}%") print(f" {score_color}Replacement Score: {candidate['total_score']:.1f}/100{Colors.END}") print(f" {health_color}Health Score: {candidate['health_score']:.1f}/100{Colors.END}") if candidate['health_issues']: print(f" {Colors.RED}Health Issues:{Colors.END}") for issue in candidate['health_issues'][:3]: print(f" - {issue}") if candidate['capacity_factors']: print(f" Capacity Optimization:") for factor in candidate['capacity_factors'][:2]: print(f" • {factor}") if candidate['resilience_factors']: print(f" Host Distribution:") for factor in candidate['resilience_factors'][:2]: print(f" • {factor}") print() # Summary by class print(f"\n{Colors.BOLD}{Colors.CYAN}=== SUMMARY BY DEVICE CLASS ==={Colors.END}\n") for device_class in ['hdd', 'nvme']: class_candidates = [c for c in candidates if c['device_class'] == device_class] if class_candidates: top_candidate = class_candidates[0] print(f"{Colors.BOLD}{device_class.upper()}:{Colors.END}") print(f" Top candidate: {top_candidate['osd_name']} (score: {top_candidate['total_score']:.1f})") print(f" Host: {top_candidate['host']}") print(f" Capacity gain potential: {top_candidate['weight']:.2f} TB") print() # Summary by host print(f"\n{Colors.BOLD}{Colors.CYAN}=== TOP CANDIDATES BY HOST ==={Colors.END}\n") hosts_seen = set() for candidate in candidates: if candidate['host'] not in hosts_seen and len(hosts_seen) < 5: hosts_seen.add(candidate['host']) print(f"{Colors.BOLD}{candidate['host']}:{Colors.END}") print(f" Top candidate: {candidate['osd_name']} (score: {candidate['total_score']:.1f})") print(f" {candidate['device_class'].upper()}, {candidate['weight']:.2f} TB, {candidate['utilization']:.1f}% used") if candidate['health_issues']: print(f" Issues: {candidate['health_issues'][0]}") print() if __name__ == "__main__": parser = argparse.ArgumentParser(description='Analyze Ceph OSDs for replacement candidates across entire cluster') parser.add_argument('--class', dest='device_class', choices=['hdd', 'nvme'], help='Filter by device class') parser.add_argument('--min-size', type=float, default=0, help='Minimum OSD size in TB to consider') parser.add_argument('--debug', action='store_true', help='Enable debug output') args = parser.parse_args() if args.debug: DEBUG = True try: analyze_cluster() except KeyboardInterrupt: print(f"\n{Colors.YELLOW}Analysis interrupted{Colors.END}") sys.exit(0) except Exception as e: print(f"{Colors.RED}Error: {e}{Colors.END}") import traceback traceback.print_exc() sys.exit(1)