Files
analyzeOSDs/ceph_osd_analyzer.py

477 lines
18 KiB
Python
Raw Normal View History

2025-12-22 16:40:19 -05:00
#!/usr/bin/env python3
"""
Advanced Ceph OSD Replacement Candidate Analyzer
This script identifies the best OSD replacement candidates by analyzing:
- SMART health data (wear, errors, temperature)
- Capacity utilization and imbalance
- Host-level distribution and resilience
- Age and performance metrics
- PG distribution balance
Usage: sudo python3 ceph_osd_analyzer.py [--class hdd|nvme] [--min-size 8]
"""
import json
import subprocess
import sys
import argparse
from collections import defaultdict
from datetime import datetime
import re
class Colors:
    """ANSI escape sequences used to colourise terminal output.

    Wrap text as ``f"{Colors.RED}...{Colors.END}"``; ``END`` resets the
    terminal back to its default attributes.
    """

    # Foreground colours (bright variants).
    RED = '\033[91m'
    YELLOW = '\033[93m'
    GREEN = '\033[92m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'

    # Text attributes.
    BOLD = '\033[1m'
    END = '\033[0m'
def run_command(cmd, parse_json=False):
    """Run *cmd* through the shell and return what it wrote to stdout.

    Args:
        cmd: Shell command line (executed with ``shell=True``, so
            redirections such as ``2>/dev/null`` are honoured).
        parse_json: When true, decode stdout as JSON instead of returning
            the stripped text.

    Returns:
        The decoded JSON value or the stripped stdout string.  If the
        command exits non-zero an error is printed and ``None`` (JSON mode)
        or ``""`` (text mode) is returned; if stdout is not valid JSON an
        error is printed and ``None`` is returned.
    """
    try:
        completed = subprocess.run(
            cmd,
            shell=True,
            capture_output=True,
            text=True,
            check=True,
        )
        output = completed.stdout
        if not parse_json:
            return output.strip()
        return json.loads(output)
    except subprocess.CalledProcessError as exc:
        print(f"{Colors.RED}Error executing: {cmd}{Colors.END}")
        print(f"{Colors.RED}{exc.stderr}{Colors.END}")
        if parse_json:
            return None
        return ""
    except json.JSONDecodeError:
        print(f"{Colors.RED}Error parsing JSON from: {cmd}{Colors.END}")
        return None
def get_osd_tree():
    """Fetch the OSD tree from Ceph as parsed JSON.

    Returns the result of ``run_command`` for ``ceph osd tree -f json`` with
    JSON parsing enabled (``None`` when the command or the parsing fails).
    """
    command = "ceph osd tree -f json"
    return run_command(command, parse_json=True)
def get_osd_df():
    """Fetch per-OSD disk usage statistics from Ceph as parsed JSON.

    Returns the result of ``run_command`` for ``ceph osd df -f json`` with
    JSON parsing enabled (``None`` when the command or the parsing fails).
    """
    command = "ceph osd df -f json"
    return run_command(command, parse_json=True)
def get_osd_metadata(osd_id):
    """Fetch the metadata record of a single OSD as parsed JSON.

    Args:
        osd_id: Identifier interpolated into ``osd.<osd_id>``.

    Returns the result of ``run_command`` with JSON parsing enabled
    (``None`` when the command or the parsing fails).
    """
    command = f"ceph osd metadata osd.{osd_id} -f json"
    return run_command(command, parse_json=True)
def get_osd_perf():
    """Fetch OSD performance statistics from Ceph as parsed JSON.

    Returns the result of ``run_command`` for ``ceph osd perf -f json`` with
    JSON parsing enabled (``None`` when the command or the parsing fails).
    """
    command = "ceph osd perf -f json"
    return run_command(command, parse_json=True)
def get_pg_dump():
    """Fetch the placement-group dump from Ceph as parsed JSON.

    stderr of the ceph command is discarded by the shell redirection.
    Returns the result of ``run_command`` with JSON parsing enabled
    (``None`` when the command or the parsing fails).
    """
    command = "ceph pg dump -f json 2>/dev/null"
    return run_command(command, parse_json=True)
def get_device_health(osd_id):
    """Return SMART health metrics for the device backing an OSD.

    The Ceph device-health query is tried first.  When it yields nothing,
    the OSD metadata is consulted for the backing device and smartctl is
    queried directly through ``get_smart_data_direct``.

    Args:
        osd_id: Identifier interpolated into ``osd.<osd_id>``.

    Returns:
        The parsed health data, or the (falsy) result of the Ceph query when
        no fallback source is available.
    """
    data = run_command(
        f"ceph device query-daemon-health-metrics osd.{osd_id} -f json 2>/dev/null",
        parse_json=True,
    )
    if data:
        return data

    # Fallback: query the drive directly.  This only works when the script
    # runs on the host that owns the device.
    metadata = get_osd_metadata(osd_id)
    if not metadata:
        return data
    device_path = _osd_device_path(metadata)
    if device_path:
        return get_smart_data_direct(device_path)
    return data


def _osd_device_path(metadata):
    """Derive a device path usable by smartctl from OSD metadata, or None.

    NOTE(review): ``ceph osd metadata`` is expected to report ``devices`` as
    comma-separated kernel names ("sdb") and ``device_ids`` as
    ``name=model_serial`` pairs; neither is a path, so passing the raw
    ``device_ids`` string to smartctl cannot work.  Confirm the field
    formats against the Ceph release in use.
    """
    raw = metadata.get('devices') or metadata.get('device_ids') or ''
    # Only the first listed device is inspected.
    first_entry = str(raw).split(',')[0].strip()
    # Drop a trailing "=<device id>" if present.
    name = first_entry.split('=')[0].strip()
    if not name:
        return None
    # Values that already look like a path are passed through unchanged.
    return name if name.startswith('/') else f"/dev/{name}"
def get_smart_data_direct(device_path):
    """Fallback: read SMART data straight from a device with smartctl.

    smartctl is invoked without a shell (the path is passed as a single
    argument, so it cannot be interpreted as shell syntax).  Its exit status
    is a bitmask: bits 0 and 1 mean the command line was invalid or the
    device could not be opened; higher bits flag drive health conditions
    while the JSON report is still produced.  Only the former are treated as
    failure, so reports of unhealthy drives are not thrown away.

    Args:
        device_path: Path of the block device to query.

    Returns:
        The parsed smartctl JSON report, or ``None`` when no path was given,
        smartctl could not be run, the device could not be opened, or the
        output is not valid JSON.
    """
    if not device_path:
        return None
    try:
        completed = subprocess.run(
            ["smartctl", "-a", "-j", str(device_path)],
            capture_output=True,
            text=True,
        )
    except OSError:
        # smartctl is not installed or not executable; stay best-effort.
        return None
    if completed.returncode & 0b11:
        return None
    try:
        return json.loads(completed.stdout)
    except json.JSONDecodeError:
        return None
def get_osd_pool_mapping():
    """Build a mapping of OSD -> set of pool ids it currently serves.

    Walks ``pg_map.pg_stats`` of the PG dump; the pool id is the part of each
    ``pgid`` before the first dot and is recorded for every OSD listed in
    that PG's ``acting`` set.

    Returns:
        A ``defaultdict(set)`` keyed by the OSD entries of ``acting``; empty
        when the dump is unavailable or lacks the expected sections.
    """
    dump = get_pg_dump()
    pools_by_osd = defaultdict(set)

    if not dump:
        return pools_by_osd
    if 'pg_map' not in dump or 'pg_stats' not in dump['pg_map']:
        return pools_by_osd

    for stat in dump['pg_map']['pg_stats']:
        pool_part = stat['pgid'].partition('.')[0]
        for member in stat.get('acting', []):
            pools_by_osd[member].add(int(pool_part))
    return pools_by_osd
def parse_smart_health(smart_data):
    """Turn a SMART report into a health score.

    Args:
        smart_data: Parsed smartctl-style JSON (dict).  Either the report
            itself or a mapping whose values are reports (the first value
            containing a recognised section is used).
            NOTE(review): the Ceph device-health query is believed to key
            its result by device id, which is why the wrapped form is
            accepted - confirm against the Ceph release in use.

    Returns:
        ``(score, issues, metrics)`` where ``score`` runs from 0 (worst) to
        100 (healthy), ``issues`` is a list of human-readable findings and
        ``metrics`` holds the raw values that were extracted.  Missing or
        unrecognised data scores a neutral 50.
    """
    metrics = {}
    if not smart_data:
        return 50.0, ["No SMART data available"], metrics

    report = _select_smart_report(smart_data)
    if report is None:
        # Unknown is not the same as healthy: treat it like missing data
        # rather than silently awarding a perfect score.
        return 50.0, ["Unrecognized SMART data format"], metrics

    score = 100.0
    issues = []

    if 'ata_smart_attributes' in report:
        attr_temps = {}
        for attr in report['ata_smart_attributes'].get('table', []):
            attr_id = attr.get('id')
            value = attr.get('value', 0)
            raw_value = attr.get('raw', {}).get('value', 0)

            # Reallocated Sectors (5)
            if attr_id == 5:
                metrics['reallocated_sectors'] = raw_value
                if raw_value > 0:
                    score -= min(20, raw_value * 2)
                    issues.append(f"Reallocated sectors: {raw_value}")
            # Spin Retry Count (10)
            elif attr_id == 10:
                metrics['spin_retry'] = raw_value
                if raw_value > 0:
                    score -= min(15, raw_value * 3)
                    issues.append(f"Spin retry count: {raw_value}")
            # Pending Sectors (197)
            elif attr_id == 197:
                metrics['pending_sectors'] = raw_value
                if raw_value > 0:
                    score -= min(25, raw_value * 5)
                    issues.append(f"Pending sectors: {raw_value}")
            # Uncorrectable Sectors (198)
            elif attr_id == 198:
                metrics['uncorrectable_sectors'] = raw_value
                if raw_value > 0:
                    score -= min(30, raw_value * 5)
                    issues.append(f"Uncorrectable sectors: {raw_value}")
            # Temperature (190, 194): only collected here and scored once
            # after the loop.  Some drives pack min/max readings into the
            # upper bytes of the raw value; the low byte is the current one.
            elif attr_id in (190, 194):
                attr_temps[attr_id] = int(raw_value) & 0xFF
            # Power On Hours (9)
            elif attr_id == 9:
                metrics['power_on_hours'] = raw_value
                age_years = raw_value / 8760
                metrics['age_years'] = age_years
                if age_years > 5:
                    score -= min(15, (age_years - 5) * 3)
                    issues.append(f"Drive age: {age_years:.1f} years")
            # Wear leveling (for SSDs, 177).  The normalised value counts
            # down from 100; it can never be below "worst", so no such
            # precondition is applied.
            elif attr_id == 177:
                metrics['wear_leveling'] = value
                wear_percent = 100 - value
                if wear_percent > 20:
                    score -= min(20, wear_percent)
                    issues.append(f"Wear level: {wear_percent}%")

        # Prefer smartctl's own decoded temperature when the report has one,
        # then attribute 194, then attribute 190.
        reported = report.get('temperature')
        if isinstance(reported, dict) and reported.get('current') is not None:
            temperature = reported['current']
        else:
            temperature = attr_temps.get(194, attr_temps.get(190))
        if temperature is not None:
            metrics['temperature'] = temperature
            if temperature > 60:
                score -= min(10, (temperature - 60) * 2)
                issues.append(f"High temperature: {temperature}°C")
    else:
        # NVMe SMART data
        nvme_health = report['nvme_smart_health_information_log']

        # Available spare
        spare = nvme_health.get('available_spare', 100)
        if spare < 50:
            score -= (100 - spare) * 0.5
            issues.append(f"Low available spare: {spare}%")

        # Percentage used
        pct_used = nvme_health.get('percentage_used', 0)
        metrics['percentage_used'] = pct_used
        if pct_used > 80:
            score -= min(30, (pct_used - 80) * 1.5)
            issues.append(f"High wear: {pct_used}%")

        # Media errors
        media_errors = nvme_health.get('media_errors', 0)
        if media_errors > 0:
            score -= min(25, media_errors * 5)
            issues.append(f"Media errors: {media_errors}")

        # Temperature
        temp = nvme_health.get('temperature', 0)
        metrics['temperature'] = temp
        if temp > 70:
            score -= min(10, (temp - 70) * 2)
            issues.append(f"High temperature: {temp}°C")

    return max(0, score), issues, metrics


def _select_smart_report(smart_data):
    """Return the dict that holds a recognised SMART section, or None."""
    sections = ('ata_smart_attributes', 'nvme_smart_health_information_log')
    if not isinstance(smart_data, dict):
        return None
    if any(key in smart_data for key in sections):
        return smart_data
    # One level of wrapping (e.g. a mapping of device id -> report).
    for nested in smart_data.values():
        if isinstance(nested, dict) and any(key in nested for key in sections):
            return nested
    return None
def calculate_capacity_score(osd_data, host_osds, osd_class):
    """Score how much capacity would be gained by replacing an OSD.

    Args:
        osd_data: Dict providing ``crush_weight`` and ``utilization`` for
            the OSD (missing keys count as 0).
        host_osds: Dicts describing the OSDs on the same host; only entries
            whose ``device_class`` equals *osd_class* are averaged.
        osd_class: Device class of the OSD being scored.

    Returns:
        ``(score, factors)``: a numeric score (higher means a better
        replacement candidate) and the human-readable reasons behind it.
    """
    score = 0.0
    factors = []
    weight = osd_data.get('crush_weight', 0)
    utilization = osd_data.get('utilization', 0)

    # Small drives are better candidates (more capacity gain)
    if weight < 2:
        score += 40
        factors.append(f"Very small drive ({weight}TB) - high capacity gain")
    elif weight < 5:
        score += 30
        factors.append(f"Small drive ({weight}TB) - good capacity gain")
    elif weight < 10:
        score += 15
        factors.append(f"Medium drive ({weight}TB)")
    else:
        score += 5
        factors.append(f"Large drive ({weight}TB) - lower priority")

    # High utilization drives are harder to replace
    if utilization > 70:
        score -= 15
        factors.append(f"High utilization ({utilization:.1f}%) - requires data migration")
    elif utilization > 50:
        score -= 8
        factors.append(f"Medium utilization ({utilization:.1f}%)")

    # Host balance consideration.  The average is guarded on the filtered
    # list itself: a non-empty host with no OSD of this class must not
    # divide by zero.
    same_class_weights = [
        peer.get('crush_weight', 0)
        for peer in (host_osds or [])
        if peer.get('device_class') == osd_class
    ]
    if same_class_weights:
        host_avg_weight = sum(same_class_weights) / len(same_class_weights)
    else:
        host_avg_weight = 0
    if weight < host_avg_weight * 0.5:
        score += 15
        factors.append(f"Below host average ({host_avg_weight:.1f}TB) - improves balance")

    return score, factors
def calculate_resilience_score(osd_data, host_data, all_hosts):
    """Score how replacing an OSD would affect host-level balance.

    Args:
        osd_data: Dict describing the OSD; ``device_class`` defaults to
            ``'hdd'``.
        host_data: Dict of the host that owns the OSD (needs ``name``; its
            ``children`` are inspected for down OSDs).
        all_hosts: Dicts of every host, each with ``name`` and ``children``.

    Only ``children`` entries that are dicts (with ``device_class`` and
    ``status``) are counted; other entries, such as bare ids, are ignored
    instead of raising.  Any device class is supported, not only hdd/nvme.

    Returns:
        ``(score, factors)``: a numeric score (higher means a better
        replacement candidate) and the human-readable reasons behind it.
    """
    score = 0.0
    factors = []
    host_name = host_data['name']
    osd_class = osd_data.get('device_class', 'hdd')

    def child_nodes(host):
        return [child for child in host.get('children', []) if isinstance(child, dict)]

    def up_count(host):
        return sum(
            1
            for child in child_nodes(host)
            if child.get('device_class') == osd_class and child.get('status') == 'up'
        )

    # Count running OSDs of this class per host
    host_counts = {host['name']: up_count(host) for host in all_hosts}
    current_count = host_counts.get(host_name, up_count(host_data))
    avg_count = sum(host_counts.values()) / len(host_counts) if host_counts else 0

    # Hosts with more OSDs are better candidates for reduction
    if current_count > avg_count * 1.2:
        score += 20
        factors.append(f"Host has {current_count} {osd_class} OSDs (above average {avg_count:.1f})")
    elif current_count > avg_count:
        score += 10
        factors.append(f"Host slightly above average {osd_class} count")

    # Check for down OSDs on same host (indicates potential issues)
    down_osds = [child for child in child_nodes(host_data) if child.get('status') == 'down']
    if down_osds:
        score += 15
        factors.append(f"Host has {len(down_osds)} down OSD(s) - may have hardware issues")

    return score, factors
def calculate_performance_score(osd_perf_data, pg_count, avg_pg_count):
    """Score an OSD on latency and placement-group balance.

    Args:
        osd_perf_data: Per-OSD performance entry.  ``commit_latency_ms`` and
            ``apply_latency_ms`` are read from its ``perf_stats`` sub-dict
            when present, otherwise from the entry itself.
            NOTE(review): ``ceph osd perf -f json`` is believed to nest the
            latencies under ``perf_stats``; confirm against the Ceph release
            in use.
        pg_count: Number of PGs on this OSD.
        avg_pg_count: Average number of PGs per OSD.

    Returns:
        ``(score, factors)``: a numeric score (higher means a better
        replacement candidate) and the human-readable reasons behind it.
        With no performance data the result is
        ``(0, ["No performance data available"])``.
    """
    score = 0.0
    factors = []
    if not osd_perf_data:
        return 0, ["No performance data available"]

    nested = osd_perf_data.get('perf_stats')
    stats = nested if isinstance(nested, dict) else osd_perf_data
    commit_latency = stats.get('commit_latency_ms', 0)
    apply_latency = stats.get('apply_latency_ms', 0)

    # High latency indicates slow drive
    if commit_latency > 50:
        score += 15
        factors.append(f"High commit latency ({commit_latency}ms)")
    elif commit_latency > 30:
        score += 8
        factors.append(f"Elevated commit latency ({commit_latency}ms)")
    if apply_latency > 50:
        score += 15
        factors.append(f"High apply latency ({apply_latency}ms)")

    # PG imbalance
    if pg_count > avg_pg_count * 1.3:
        score += 10
        factors.append(f"High PG count ({pg_count} vs avg {avg_pg_count:.0f})")
    elif pg_count < avg_pg_count * 0.7:
        score -= 5
        factors.append(f"Low PG count ({pg_count}) - already underutilized")

    return score, factors
def analyze_cluster(device_class=None, min_size=0):
    """Score every running OSD as a replacement candidate and print a report.

    Args:
        device_class: When given, only OSDs whose device class equals this
            value are analysed.
        min_size: OSDs whose size in TB (derived from the ``kb`` field of
            the usage data) is below this value are skipped.

    Nothing is returned; the ranked report is written to stdout.
    """
    print(f"{Colors.BOLD}{Colors.CYAN}=== Ceph OSD Replacement Candidate Analyzer ==={Colors.END}\n")

    # Gather data
    print("Gathering cluster data...")
    osd_tree = get_osd_tree()
    osd_df = get_osd_df()
    osd_perf = get_osd_perf()
    if not osd_tree or not osd_df:
        print(f"{Colors.RED}Failed to gather cluster data{Colors.END}")
        return

    # Index the raw data by OSD id so each lookup below is O(1)
    tree_nodes = osd_tree['nodes']
    node_by_id = {node['id']: node for node in tree_nodes}
    osd_df_map = {node['id']: node for node in osd_df['nodes']}
    osd_perf_map = _index_osd_perf(osd_perf)

    # Calculate average PG count
    pg_counts = [node['pgs'] for node in osd_df['nodes'] if node.get('pgs', 0) > 0]
    avg_pg_count = sum(pg_counts) / len(pg_counts) if pg_counts else 0

    # Hosts with their children resolved from ids to node dicts; the
    # resilience scorer reads device_class/status from each child.
    hosts = _resolve_hosts(tree_nodes, node_by_id)

    # Analyze each OSD
    candidates = []
    print("Analyzing OSDs...\n")
    for host in hosts:
        host_osds = host['children']
        for osd_node in host_osds:
            if osd_node.get('status') != 'up':
                continue
            osd_class = osd_node.get('device_class', 'hdd')
            if device_class and osd_class != device_class:
                continue

            osd_id_num = osd_node['id']
            osd_name = osd_node['name']
            osd_df_data = osd_df_map.get(osd_id_num, {})
            size_tb = osd_df_data.get('kb', 0) / 1024 / 1024 / 1024  # KiB -> TB
            # Filter before the comparatively slow SMART query.
            if min_size and size_tb < min_size:
                continue

            print(f"Analyzing {osd_name} ({osd_class})...", end='\r')
            osd_perf_data = osd_perf_map.get(osd_id_num, {})

            # SMART health analysis
            health_data = get_device_health(osd_id_num)
            health_score, health_issues, health_metrics = parse_smart_health(health_data)

            # Capacity optimization score
            capacity_score, capacity_factors = calculate_capacity_score(
                osd_df_data, host_osds, osd_class
            )

            # Resilience score
            resilience_score, resilience_factors = calculate_resilience_score(
                osd_node, host, hosts
            )

            # Performance score
            performance_score, performance_factors = calculate_performance_score(
                osd_perf_data, osd_df_data.get('pgs', 0), avg_pg_count
            )

            # Calculate total score (weighted)
            total_score = (
                (100 - health_score) * 0.40 +  # Health is most important
                capacity_score * 0.30 +        # Capacity optimization
                resilience_score * 0.20 +      # Cluster resilience
                performance_score * 0.10       # Performance issues
            )

            candidates.append({
                'osd_id': osd_id_num,
                'osd_name': osd_name,
                'host': host['name'],
                'device_class': osd_class,
                'weight': osd_df_data.get('crush_weight', 0),
                'size': size_tb,
                'utilization': osd_df_data.get('utilization', 0),
                'pgs': osd_df_data.get('pgs', 0),
                'total_score': total_score,
                'health_score': health_score,
                'health_issues': health_issues,
                'health_metrics': health_metrics,
                'capacity_factors': capacity_factors,
                'resilience_factors': resilience_factors,
                'performance_factors': performance_factors,
            })

    print(" " * 80, end='\r')  # Clear the line

    # Sort by total score (descending)
    candidates.sort(key=lambda x: x['total_score'], reverse=True)

    _print_candidates(candidates)
    _print_class_summary(candidates)


def _index_osd_perf(osd_perf):
    """Map OSD id -> perf entry, with any ``perf_stats`` fields flattened in."""
    if not osd_perf:
        return {}
    infos = osd_perf.get('osd_perf_infos')
    if infos is None:
        # NOTE(review): some Ceph releases are believed to wrap the list in
        # an "osdstats" object - confirm against the release in use.
        infos = (osd_perf.get('osdstats') or {}).get('osd_perf_infos', [])
    indexed = {}
    for entry in infos:
        flat = dict(entry)
        nested = entry.get('perf_stats')
        if isinstance(nested, dict):
            # Expose the latencies at the top level of the entry as well.
            flat.update(nested)
        indexed[entry['id']] = flat
    return indexed


def _resolve_hosts(tree_nodes, node_by_id):
    """Return copies of the host nodes with child ids replaced by node dicts."""
    resolved = []
    for node in tree_nodes:
        if node['type'] != 'host':
            continue
        children = [
            node_by_id[child_id]
            for child_id in node.get('children', [])
            if child_id in node_by_id
        ]
        resolved.append(dict(node, children=children))
    return resolved


def _print_candidates(candidates, limit=15):
    """Print the detailed report for the highest-ranked candidates."""
    print(f"\n{Colors.BOLD}{Colors.CYAN}=== TOP REPLACEMENT CANDIDATES ==={Colors.END}\n")
    if not candidates:
        print("No OSDs matched the selection.")
        return

    factor_sections = (
        (" Capacity Optimization:", 'capacity_factors'),
        (" Resilience Impact:", 'resilience_factors'),
        (" Performance Metrics:", 'performance_factors'),
    )
    for rank, candidate in enumerate(candidates[:limit], 1):
        score_color = Colors.RED if candidate['total_score'] > 50 else Colors.YELLOW if candidate['total_score'] > 30 else Colors.GREEN
        health_color = Colors.GREEN if candidate['health_score'] > 80 else Colors.YELLOW if candidate['health_score'] > 60 else Colors.RED
        print(f"{Colors.BOLD}#{rank} - {candidate['osd_name']} ({candidate['device_class'].upper()}){Colors.END}")
        print(f" Host: {candidate['host']}")
        print(f" Size: {candidate['size']:.2f} TB (weight: {candidate['weight']:.2f})")
        print(f" Utilization: {candidate['utilization']:.1f}% | PGs: {candidate['pgs']}")
        print(f" {score_color}Replacement Score: {candidate['total_score']:.1f}/100{Colors.END}")
        print(f" {health_color}Health Score: {candidate['health_score']:.1f}/100{Colors.END}")
        if candidate['health_issues']:
            print(f" {Colors.RED}Health Issues:{Colors.END}")
            for issue in candidate['health_issues'][:3]:
                print(f" - {issue}")
        for heading, key in factor_sections:
            if candidate[key]:
                print(heading)
                # Same bullet style as the health issues above.
                for factor in candidate[key][:2]:
                    print(f" - {factor}")
        print()


def _print_class_summary(candidates):
    """Print the top candidate of every device class present in the results."""
    print(f"\n{Colors.BOLD}{Colors.CYAN}=== SUMMARY BY DEVICE CLASS ==={Colors.END}\n")
    for class_name in sorted({c['device_class'] for c in candidates}):
        # candidates is already sorted, so the first match is the best one.
        top_candidate = next(c for c in candidates if c['device_class'] == class_name)
        print(f"{Colors.BOLD}{class_name.upper()}:{Colors.END}")
        print(f" Top candidate: {top_candidate['osd_name']} (score: {top_candidate['total_score']:.1f})")
        print(f" Host: {top_candidate['host']}")
        print(f" Capacity gain potential: {top_candidate['weight']:.2f} TB")
        print()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Analyze Ceph OSDs for replacement candidates')
    parser.add_argument('--class', dest='device_class', choices=['hdd', 'nvme'],
                        help='Filter by device class')
    parser.add_argument('--min-size', type=float, default=0,
                        help='Minimum OSD size in TB to consider')
    args = parser.parse_args()
    try:
        # Forward the filters; previously they were parsed and then ignored.
        analyze_cluster(device_class=args.device_class, min_size=args.min_size)
    except KeyboardInterrupt:
        print(f"\n{Colors.YELLOW}Analysis interrupted{Colors.END}")
        sys.exit(0)
    except Exception as e:
        print(f"{Colors.RED}Error: {e}{Colors.END}")
        import traceback
        traceback.print_exc()
        sys.exit(1)