#!/usr/bin/env python3
"""
Advanced Ceph OSD Replacement Candidate Analyzer

This script identifies the best OSD replacement candidates by analyzing:
- SMART health data (wear, errors, temperature) from ALL cluster nodes
- Capacity utilization and imbalance
- Host-level distribution and resilience
- Age and performance metrics
- PG distribution balance

Usage:
    sudo python3 ceph_osd_analyzer.py [--class hdd|nvme] [--min-size 8]
"""

import json
import subprocess
import sys
import argparse
from collections import defaultdict
from datetime import datetime
import re


class Colors:
    RED = '\033[91m'
    YELLOW = '\033[93m'
    GREEN = '\033[92m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    BOLD = '\033[1m'
    END = '\033[0m'


def run_command(cmd, parse_json=False, host=None):
    """Execute shell command locally or via SSH and return output"""
    try:
        if host:
            cmd = f"ssh -o StrictHostKeyChecking=no -o ConnectTimeout=5 {host} '{cmd}'"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=True)
        if parse_json:
            return json.loads(result.stdout)
        return result.stdout.strip()
    except subprocess.CalledProcessError:
        if host:
            print(f"{Colors.YELLOW}Warning: Failed to execute on {host}: {cmd}{Colors.END}")
        return None if parse_json else ""
    except json.JSONDecodeError:
        print(f"{Colors.RED}Error parsing JSON from: {cmd}{Colors.END}")
        return None


def get_osd_tree():
    """Get OSD tree structure"""
    return run_command("ceph osd tree -f json", parse_json=True)


def get_osd_df():
    """Get OSD disk usage statistics"""
    return run_command("ceph osd df -f json", parse_json=True)


def get_osd_metadata(osd_id):
    """Get metadata for specific OSD"""
    return run_command(f"ceph osd metadata osd.{osd_id} -f json", parse_json=True)


def get_osd_perf():
    """Get OSD performance statistics"""
    return run_command("ceph osd perf -f json", parse_json=True)


def get_pg_dump():
    """Get PG dump for distribution analysis"""
    return run_command("ceph pg dump -f json 2>/dev/null", parse_json=True)


def get_osd_host_mapping(osd_tree):
    """Build mapping of OSD ID to hostname"""
    osd_to_host = {}
    for node in osd_tree['nodes']:
        if node['type'] == 'host':
            host_name = node['name']
            for child_id in node.get('children', []):
                osd_to_host[child_id] = host_name
    return osd_to_host


def get_device_path_for_osd(osd_id, hostname):
    """Get the device path for an OSD on a specific host"""
    # Try to get it from ceph metadata first
    metadata = get_osd_metadata(osd_id)
    if metadata:
        devices = metadata.get('devices', '')
        device_ids = metadata.get('device_ids', '')
        if devices:
            # "devices" is usually a comma-separated list of short kernel names
            # (e.g. "sdb"); normalize the first entry to a /dev path
            dev = devices.split(',')[0]
            return dev if dev.startswith('/dev/') else f"/dev/{dev}"
        elif device_ids:
            # device_ids format is like "VENDOR_MODEL_SERIAL"; it still has to be
            # resolved to an actual device path on the host, so fall through
            pass

    # Fallback: query the OSD on the remote host for its device
    cmd = (f"ceph-volume lvm list osd.{osd_id} -f json 2>/dev/null || "
           f"ceph-volume simple scan /var/lib/ceph/osd/ceph-{osd_id} 2>/dev/null")
    result = run_command(cmd, host=hostname, parse_json=False)
    if result:
        # Try to extract a device path from the output
        for line in result.split('\n'):
            if 'block_device' in line or 'device' in line:
                match = re.search(r'/dev/[a-z0-9]+', line)
                if match:
                    return match.group(0)

    # Last resort: try common patterns
    common_paths = [
        f"/dev/disk/by-partuuid/$(readlink /var/lib/ceph/osd/ceph-{osd_id}/block | xargs basename)",
        f"/var/lib/ceph/osd/ceph-{osd_id}/block"
    ]
    for path in common_paths:
        result = run_command(f"readlink -f {path} 2>/dev/null", host=hostname)
        if result and result.startswith('/dev/'):
            return result

    return None
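# Note (assumption, not verified here): on recent Ceph releases "ceph device ls-by-daemon osd.<id>"
# can map a daemon straight to its backing device; the lookup above deliberately avoids relying
# on it because its output layout differs between releases.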
def get_smart_data_remote(device_path, hostname):
    """Get SMART data from a remote host"""
    if not device_path:
        return None

    # Strip a trailing partition suffix if present ("p<N>" on NVMe namespaces, a bare
    # number on SATA/SAS disks) so smartctl is pointed at the whole disk
    if 'nvme' in device_path:
        base_device = re.sub(r'p\d+$', '', device_path)
    else:
        base_device = re.sub(r'\d+$', '', device_path)

    cmd = f"smartctl -a -j {base_device} 2>/dev/null"
    return run_command(cmd, host=hostname, parse_json=True)


def get_device_health(osd_id, hostname):
    """Get device SMART health metrics from the appropriate host"""
    # First try ceph's built-in health metrics
    data = run_command(
        f"ceph device query-daemon-health-metrics osd.{osd_id} -f json 2>/dev/null",
        parse_json=True)
    if isinstance(data, dict):
        # Depending on the release, the metrics may be returned directly or keyed
        # by device id; accept either shape
        for payload in [data] + [v for v in data.values() if isinstance(v, dict)]:
            if 'ata_smart_attributes' in payload or 'nvme_smart_health_information_log' in payload:
                return payload

    # If that fails, resolve the device path and query smartctl via SSH
    device_path = get_device_path_for_osd(osd_id, hostname)
    if device_path:
        return get_smart_data_remote(device_path, hostname)
    return None


def parse_smart_health(smart_data):
    """Parse SMART data and return (health score, issues, metrics)"""
    score = 100.0
    issues = []
    metrics = {}

    if not smart_data:
        return 50.0, ["No SMART data available"], metrics

    # ATA/SATA SMART attribute table
    if 'ata_smart_attributes' in smart_data:
        attrs = smart_data['ata_smart_attributes'].get('table', [])
        for attr in attrs:
            attr_id = attr.get('id')
            value = attr.get('value', 0)
            worst = attr.get('worst', 0)
            raw_value = attr.get('raw', {}).get('value', 0)

            # Reallocated Sectors (5)
            if attr_id == 5:
                metrics['reallocated_sectors'] = raw_value
                if raw_value > 0:
                    score -= min(20, raw_value * 2)
                    issues.append(f"Reallocated sectors: {raw_value}")
            # Spin Retry Count (10)
            elif attr_id == 10:
                metrics['spin_retry'] = raw_value
                if raw_value > 0:
                    score -= min(15, raw_value * 3)
                    issues.append(f"Spin retry count: {raw_value}")
            # Pending Sectors (197)
            elif attr_id == 197:
                metrics['pending_sectors'] = raw_value
                if raw_value > 0:
                    score -= min(25, raw_value * 5)
                    issues.append(f"Pending sectors: {raw_value}")
            # Uncorrectable Sectors (198)
            elif attr_id == 198:
                metrics['uncorrectable_sectors'] = raw_value
                if raw_value > 0:
                    score -= min(30, raw_value * 5)
                    issues.append(f"Uncorrectable sectors: {raw_value}")
            # Temperature (190, 194)
            elif attr_id in [190, 194]:
                metrics['temperature'] = raw_value
                if raw_value > 60:
                    score -= min(10, (raw_value - 60) * 2)
                    issues.append(f"High temperature: {raw_value}°C")
            # Power On Hours (9)
            elif attr_id == 9:
                metrics['power_on_hours'] = raw_value
                age_years = raw_value / 8760
                metrics['age_years'] = age_years
                if age_years > 5:
                    score -= min(15, (age_years - 5) * 3)
                    issues.append(f"Drive age: {age_years:.1f} years")
            # Wear leveling (for SSDs, 177)
            elif attr_id == 177 and value < worst:
                metrics['wear_leveling'] = value
                wear_percent = 100 - value
                if wear_percent > 20:
                    score -= min(20, wear_percent)
                    issues.append(f"Wear level: {wear_percent}%")

    # NVMe SMART data
    elif 'nvme_smart_health_information_log' in smart_data:
        nvme_health = smart_data['nvme_smart_health_information_log']

        # Available spare
        spare = nvme_health.get('available_spare', 100)
        if spare < 50:
            score -= (100 - spare) * 0.5
            issues.append(f"Low available spare: {spare}%")

        # Percentage used
        pct_used = nvme_health.get('percentage_used', 0)
        metrics['percentage_used'] = pct_used
        if pct_used > 80:
            score -= min(30, (pct_used - 80) * 1.5)
            issues.append(f"High wear: {pct_used}%")

        # Media errors
        media_errors = nvme_health.get('media_errors', 0)
        if media_errors > 0:
            score -= min(25, media_errors * 5)
            issues.append(f"Media errors: {media_errors}")

        # Temperature
        temp = nvme_health.get('temperature', 0)
        metrics['temperature'] = temp
        if temp > 70:
            score -= min(10, (temp - 70) * 2)
            issues.append(f"High temperature: {temp}°C")

    return max(0, score), issues, metrics
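# Worked example (synthetic, made-up values): a drive reporting 2 reallocated and
# 1 pending sector scores 100 - min(20, 2*2) - min(25, 1*5) = 91.0:
#
#   _sample = {"ata_smart_attributes": {"table": [
#       {"id": 5,   "value": 100, "worst": 100, "raw": {"value": 2}},
#       {"id": 197, "value": 100, "worst": 100, "raw": {"value": 1}},
#   ]}}
#   parse_smart_health(_sample)
#   # -> (91.0, ['Reallocated sectors: 2', 'Pending sectors: 1'], {...})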
def calculate_capacity_score(osd_data, host_osds_data, osd_class):
    """Calculate score based on capacity optimization potential"""
    score = 0.0
    factors = []

    weight = osd_data.get('crush_weight', 0)
    utilization = osd_data.get('utilization', 0)

    # Small drives are better candidates (more capacity gain)
    if weight < 2:
        score += 40
        factors.append(f"Very small drive ({weight}TB) - high capacity gain")
    elif weight < 5:
        score += 30
        factors.append(f"Small drive ({weight}TB) - good capacity gain")
    elif weight < 10:
        score += 15
        factors.append(f"Medium drive ({weight}TB)")
    else:
        score += 5
        factors.append(f"Large drive ({weight}TB) - lower priority")

    # High utilization drives are harder to replace
    if utilization > 70:
        score -= 15
        factors.append(f"High utilization ({utilization:.1f}%) - requires data migration")
    elif utilization > 50:
        score -= 8
        factors.append(f"Medium utilization ({utilization:.1f}%)")

    # Host balance consideration
    same_class_osds = [o for o in host_osds_data if o.get('device_class') == osd_class]
    if same_class_osds:
        host_total_weight = sum(o.get('crush_weight', 0) for o in same_class_osds)
        host_avg_weight = host_total_weight / len(same_class_osds)
        if weight < host_avg_weight * 0.5:
            score += 15
            factors.append(f"Below host average ({host_avg_weight:.1f}TB) - improves balance")

    return score, factors
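# Worked example: a 1.8 TB OSD at 45% utilization on a host whose same-class OSDs
# average 6 TB scores 40 (very small drive) + 15 (below half the host average) = 55.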
def calculate_resilience_score(osd_data, host_name, all_hosts_data, osd_tree):
    """Calculate score based on cluster resilience improvement"""
    score = 0.0
    factors = []
    osd_class = osd_data.get('device_class', 'hdd')

    # Count up OSDs per host, broken down by device class
    host_nodes = [n for n in osd_tree['nodes'] if n['type'] == 'host']
    osd_nodes_by_id = {n['id']: n for n in osd_tree['nodes'] if n.get('type') == 'osd'}
    host_class_counts = {}
    for host_node in host_nodes:
        h_name = host_node['name']
        host_osds = [osd_nodes_by_id[c] for c in host_node.get('children', [])
                     if c in osd_nodes_by_id]
        counts = defaultdict(int)
        for o in host_osds:
            if o.get('status') == 'up':
                counts[o.get('device_class', 'hdd')] += 1
        host_class_counts[h_name] = counts

    if host_name not in host_class_counts:
        return 0, ["Host not found in cluster"]

    current_count = host_class_counts[host_name].get(osd_class, 0)
    avg_count = sum(h.get(osd_class, 0) for h in host_class_counts.values()) / len(host_class_counts)

    # Hosts with more OSDs are better candidates for reduction
    if current_count > avg_count * 1.2:
        score += 20
        factors.append(f"Host has {current_count} {osd_class} OSDs (above average {avg_count:.1f})")
    elif current_count > avg_count:
        score += 10
        factors.append(f"Host slightly above average {osd_class} count")

    # Check for down OSDs on the same host (indicates potential hardware issues)
    host_node = next((n for n in host_nodes if n['name'] == host_name), None)
    if host_node:
        down_osds = [osd_nodes_by_id[c] for c in host_node.get('children', [])
                     if c in osd_nodes_by_id and osd_nodes_by_id[c].get('status') == 'down']
        if down_osds:
            score += 15
            factors.append(f"Host has {len(down_osds)} down OSD(s) - may have hardware issues")

    return score, factors


def calculate_performance_score(osd_perf_data, pg_count, avg_pg_count):
    """Calculate score based on performance metrics"""
    score = 0.0
    factors = []

    if not osd_perf_data:
        return 0, ["No performance data available"]

    commit_latency = osd_perf_data.get('commit_latency_ms', 0)
    apply_latency = osd_perf_data.get('apply_latency_ms', 0)

    # High latency indicates a slow drive
    if commit_latency > 50:
        score += 15
        factors.append(f"High commit latency ({commit_latency}ms)")
    elif commit_latency > 30:
        score += 8
        factors.append(f"Elevated commit latency ({commit_latency}ms)")

    if apply_latency > 50:
        score += 15
        factors.append(f"High apply latency ({apply_latency}ms)")

    # PG imbalance
    if pg_count > avg_pg_count * 1.3:
        score += 10
        factors.append(f"High PG count ({pg_count} vs avg {avg_pg_count:.0f})")
    elif pg_count < avg_pg_count * 0.7:
        score -= 5
        factors.append(f"Low PG count ({pg_count}) - already underutilized")

    return score, factors
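# Worked example of the weighting applied in analyze_cluster() below: an OSD with a
# health score of 70, capacity score 55, resilience score 20 and performance score 15
# gets (100 - 70) * 0.40 + 55 * 0.30 + 20 * 0.20 + 15 * 0.10 = 12 + 16.5 + 4 + 1.5 = 34.0.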
def analyze_cluster(device_class_filter=None, min_size_tb=0.0):
    """Main analysis function"""
    print(f"{Colors.BOLD}{Colors.CYAN}=== Ceph OSD Replacement Candidate Analyzer ==={Colors.END}\n")

    # Gather data
    print("Gathering cluster data...")
    osd_tree = get_osd_tree()
    osd_df = get_osd_df()
    osd_perf = get_osd_perf()

    if not osd_tree or not osd_df:
        print(f"{Colors.RED}Failed to gather cluster data{Colors.END}")
        return

    # Build OSD to host mapping
    osd_to_host = get_osd_host_mapping(osd_tree)

    # Parse OSD data
    osd_df_map = {node['id']: node for node in osd_df['nodes']}

    # "ceph osd perf" nests its stats differently between releases: either a top-level
    # "osd_perf_infos" list or one wrapped in "osdstats", with latencies under "perf_stats"
    perf_infos = []
    if osd_perf:
        perf_infos = osd_perf.get('osd_perf_infos') or osd_perf.get('osdstats', {}).get('osd_perf_infos', [])
    osd_perf_map = {p['id']: p.get('perf_stats', p) for p in perf_infos}

    # Calculate average PG count
    pg_counts = [node.get('pgs', 0) for node in osd_df['nodes'] if node.get('pgs', 0) > 0]
    avg_pg_count = sum(pg_counts) / len(pg_counts) if pg_counts else 0

    # Build host data map
    host_osds_map = defaultdict(list)
    for node in osd_tree['nodes']:
        if node['type'] == 'osd' and node.get('status') == 'up':
            host_name = osd_to_host.get(node['id'])
            if host_name:
                osd_df_data = osd_df_map.get(node['id'], {})
                host_osds_map[host_name].append({
                    'id': node['id'],
                    'device_class': node.get('device_class', 'hdd'),
                    'crush_weight': osd_df_data.get('crush_weight', 0)
                })

    # Select OSDs to analyze, honouring the --class and --min-size filters
    eligible_osds = []
    for node in osd_tree['nodes']:
        if node['type'] != 'osd' or node.get('status') != 'up':
            continue
        if device_class_filter and node.get('device_class', 'hdd') != device_class_filter:
            continue
        size_tb = osd_df_map.get(node['id'], {}).get('kb', 0) / 1024 / 1024 / 1024
        if size_tb < min_size_tb:
            continue
        eligible_osds.append(node)

    # Analyze each OSD
    candidates = []
    print("Analyzing OSDs across all cluster nodes...\n")
    total_osds = len(eligible_osds)

    for current_osd, node in enumerate(eligible_osds, 1):
        osd_id = node['id']
        osd_name = node['name']
        device_class = node.get('device_class', 'hdd')
        host_name = osd_to_host.get(osd_id, 'unknown')

        print(f"[{current_osd}/{total_osds}] Analyzing {osd_name} on {host_name} ({device_class})...".ljust(80), end='\r')

        # Get OSD data
        osd_df_data = osd_df_map.get(osd_id, {})
        osd_perf_data = osd_perf_map.get(osd_id, {})

        # SMART health analysis - query from the correct host
        health_data = get_device_health(osd_id, host_name)
        health_score, health_issues, health_metrics = parse_smart_health(health_data)

        # Capacity optimization score
        capacity_score, capacity_factors = calculate_capacity_score(
            osd_df_data, host_osds_map.get(host_name, []), device_class
        )

        # Resilience score
        resilience_score, resilience_factors = calculate_resilience_score(
            node, host_name, host_osds_map, osd_tree
        )

        # Performance score
        performance_score, performance_factors = calculate_performance_score(
            osd_perf_data, osd_df_data.get('pgs', 0), avg_pg_count
        )

        # Calculate total score (weighted)
        total_score = (
            (100 - health_score) * 0.40 +   # Health is most important
            capacity_score * 0.30 +         # Capacity optimization
            resilience_score * 0.20 +       # Cluster resilience
            performance_score * 0.10        # Performance issues
        )

        candidates.append({
            'osd_id': osd_id,
            'osd_name': osd_name,
            'host': host_name,
            'device_class': device_class,
            'weight': osd_df_data.get('crush_weight', 0),
            'size': osd_df_data.get('kb', 0) / 1024 / 1024 / 1024,  # TB
            'utilization': osd_df_data.get('utilization', 0),
            'pgs': osd_df_data.get('pgs', 0),
            'total_score': total_score,
            'health_score': health_score,
            'health_issues': health_issues,
            'health_metrics': health_metrics,
            'capacity_factors': capacity_factors,
            'resilience_factors': resilience_factors,
            'performance_factors': performance_factors,
        })

    print(" " * 80, end='\r')  # Clear the progress line

    # Sort by total score (descending)
    candidates.sort(key=lambda x: x['total_score'], reverse=True)

    # Display results
    print(f"\n{Colors.BOLD}{Colors.CYAN}=== TOP REPLACEMENT CANDIDATES (ALL HOSTS) ==={Colors.END}\n")

    for rank, candidate in enumerate(candidates[:15], 1):
        score_color = Colors.RED if candidate['total_score'] > 50 else Colors.YELLOW if candidate['total_score'] > 30 else Colors.GREEN
        health_color = Colors.GREEN if candidate['health_score'] > 80 else Colors.YELLOW if candidate['health_score'] > 60 else Colors.RED

        print(f"{Colors.BOLD}#{rank} - {candidate['osd_name']} ({candidate['device_class'].upper()}){Colors.END}")
        print(f"  Host: {candidate['host']}")
        print(f"  Size: {candidate['size']:.2f} TB (weight: {candidate['weight']:.2f})")
        print(f"  Utilization: {candidate['utilization']:.1f}% | PGs: {candidate['pgs']}")
        print(f"  {score_color}Replacement Score: {candidate['total_score']:.1f}/100{Colors.END}")
        print(f"  {health_color}Health Score: {candidate['health_score']:.1f}/100{Colors.END}")

        if candidate['health_issues']:
            print(f"  {Colors.RED}Health Issues:{Colors.END}")
            for issue in candidate['health_issues'][:3]:
                print(f"    - {issue}")

        if candidate['capacity_factors']:
            print("  Capacity Optimization:")
            for factor in candidate['capacity_factors'][:2]:
                print(f"    • {factor}")

        if candidate['resilience_factors']:
            print("  Resilience Impact:")
            for factor in candidate['resilience_factors'][:2]:
                print(f"    • {factor}")

        if candidate['performance_factors']:
            print("  Performance Metrics:")
            for factor in candidate['performance_factors'][:2]:
                print(f"    • {factor}")

        print()

    # Summary by class
    print(f"\n{Colors.BOLD}{Colors.CYAN}=== SUMMARY BY DEVICE CLASS ==={Colors.END}\n")
    for device_class in ['hdd', 'nvme']:
        class_candidates = [c for c in candidates if c['device_class'] == device_class]
        if class_candidates:
            top_candidate = class_candidates[0]
            print(f"{Colors.BOLD}{device_class.upper()}:{Colors.END}")
            print(f"  Top candidate: {top_candidate['osd_name']} (score: {top_candidate['total_score']:.1f})")
            print(f"  Host: {top_candidate['host']}")
            print(f"  Capacity gain potential: {top_candidate['weight']:.2f} TB")
            print()

    # Summary by host
    print(f"\n{Colors.BOLD}{Colors.CYAN}=== TOP CANDIDATES BY HOST ==={Colors.END}\n")
    hosts_seen = set()
    for candidate in candidates:
        if candidate['host'] not in hosts_seen and len(hosts_seen) < 5:
            hosts_seen.add(candidate['host'])
            print(f"{Colors.BOLD}{candidate['host']}:{Colors.END}")
            print(f"  Top candidate: {candidate['osd_name']} (score: {candidate['total_score']:.1f})")
            print(f"  {candidate['device_class'].upper()}, {candidate['weight']:.2f} TB, {candidate['utilization']:.1f}% used")
            if candidate['health_issues']:
                print(f"  Issues: {candidate['health_issues'][0]}")
            print()
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description='Analyze Ceph OSDs for replacement candidates across entire cluster')
    parser.add_argument('--class', dest='device_class', choices=['hdd', 'nvme'],
                        help='Filter by device class')
    parser.add_argument('--min-size', type=float, default=0,
                        help='Minimum OSD size in TB to consider')
    args = parser.parse_args()

    try:
        analyze_cluster(device_class_filter=args.device_class, min_size_tb=args.min_size)
    except KeyboardInterrupt:
        print(f"\n{Colors.YELLOW}Analysis interrupted{Colors.END}")
        sys.exit(0)
    except Exception as e:
        print(f"{Colors.RED}Error: {e}{Colors.END}")
        import traceback
        traceback.print_exc()
        sys.exit(1)