Fix ticket overview: priority display, impact indicators, non-drive detail boxes

- Replace emoji severity indicators (🔴🟡🟢) with ASCII tags ([CRIT]/[WARN]/[ OK ]/[ ?? ] for severity indicators, [CRIT]/[WARN]/[LOW] for impact levels)
- Fix banner priority to show actual P1-P5 level instead of hardcoded HIGH/MEDIUM
- Add LXC/container keyword detection to _get_issue_type()
- Rewrite _get_impact_level() with storage/CPU awareness to avoid false Critical
- Fix SMART description indentation with textwrap.dedent()
- Fix drive age showing "0 years" for drives < 1 year old (now shows months)
- Remove unused perf_metrics block
- Add structured boxed sections for CPU, Network, Container, and Ceph tickets
- Add _format_bytes_human() helper for LXC and Ceph storage display

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-06 19:52:51 -05:00
parent 70b02de104
commit 058ea5ad06

View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python3
import os, sys, json, requests, psutil, socket, subprocess, logging, argparse, urllib.request, re, glob, datetime, fcntl
import os, sys, json, requests, psutil, socket, subprocess, logging, argparse, urllib.request, re, glob, datetime, fcntl, textwrap
from typing import Dict, Any, List
# =============================================================================
@@ -422,10 +422,10 @@ class SystemHealthMonitor:
}
SEVERITY_INDICATORS = {
'CRITICAL': '🔴',
'WARNING': '🟡',
'HEALTHY': '🟢',
'UNKNOWN': ''
'CRITICAL': '[CRIT]',
'WARNING': '[WARN]',
'HEALTHY': '[ OK ]',
'UNKNOWN': '[ ?? ]'
}
SMART_DESCRIPTIONS = {
@@ -1096,22 +1096,31 @@ class SystemHealthMonitor:
return "Performance Issue"
elif "Network" in issue:
return "Network Issue"
elif any(kw in issue for kw in ["LXC", "storage usage", "container"]):
return "Container Storage Issue"
return "Hardware Issue"
def _get_impact_level(self, issue: str) -> str:
"""Determine impact level from issue description."""
issue_upper = issue.upper()
# Check storage/CPU warnings first so "critical storage" isn't caught as Critical
if any(kw in issue_upper for kw in ["STORAGE USAGE", "THRESHOLD", "CPU USAGE"]):
return "[WARN] Warning - Action Needed Soon"
if "CRITICAL" in issue_upper or "UNHEALTHY" in issue_upper or "HEALTH_ERR" in issue_upper:
return "🔴 Critical - Immediate Action Required"
return "[CRIT] Critical - Immediate Action Required"
elif "WARNING" in issue_upper or "HEALTH_WARN" in issue_upper or "DOWN" in issue_upper:
return "🟡 Warning - Action Needed Soon"
return "🟢 Low - Monitor Only"
return "[WARN] Warning - Action Needed Soon"
return "[LOW] Low - Monitor Only"
def _generate_detailed_description(self, issue: str, health_report: Dict[str, Any]) -> str:
def _generate_detailed_description(self, issue: str, health_report: Dict[str, Any], priority: str = '3') -> str:
"""Generate detailed ticket description with properly formatted ASCII art."""
hostname = socket.gethostname()
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
priority = "⚠ HIGH" if "CRITICAL" in issue else "● MEDIUM"
priority_labels = {
'1': '⚠ P1 - CRITICAL', '2': '⚠ P2 - HIGH',
'3': '● P3 - MEDIUM', '4': '● P4 - NORMAL', '5': '● P5 - LOW',
}
priority_display = priority_labels.get(priority, '● P3 - MEDIUM')
# Box width: all lines are exactly 80 chars
# border lines: ┏ + 78 ━ + ┓ = 80
@@ -1124,7 +1133,7 @@ class SystemHealthMonitor:
{'' * box_width}
┃ Host : {hostname:<{box_width - 14}}
┃ Generated : {timestamp:<{box_width - 14}}
┃ Priority : {priority:<{box_width - 14}}
┃ Priority : {priority_display:<{box_width - 14}}
{'' * box_width}"""
issue_type = self._get_issue_type(issue)
@@ -1141,13 +1150,13 @@ class SystemHealthMonitor:
# Add relevant SMART descriptions
for attr in self.SMART_DESCRIPTIONS:
if attr in issue:
description += f"\n{attr}:\n{self.SMART_DESCRIPTIONS[attr]}\n"
description += f"\n{attr}:\n{textwrap.dedent(self.SMART_DESCRIPTIONS[attr]).strip()}\n"
if "SMART" in issue:
description += """
description += "\n" + textwrap.dedent("""
SMART (Self-Monitoring, Analysis, and Reporting Technology) Attribute Details:
- Possible drive failure!
"""
""").strip() + "\n"
if "Drive" in issue and "/dev/" in issue:
try:
@@ -1165,7 +1174,18 @@ class SystemHealthMonitor:
power_on_hours = smart_data['attributes'].get('Power_On_Hours', 'N/A')
last_test_date = smart_data.get('last_test_date', 'N/A')
age = f"{int(power_on_hours/24/365) if isinstance(power_on_hours, (int, float)) else 'N/A'} years" if power_on_hours != 'N/A' else 'N/A'
if power_on_hours != 'N/A' and isinstance(power_on_hours, (int, float)):
total_days = power_on_hours / 24
years = int(total_days / 365)
months = int((total_days % 365) / 30)
if years >= 1:
age = f"{years} year{'s' if years != 1 else ''}, {months} month{'s' if months != 1 else ''}"
elif months >= 1:
age = f"{months} month{'s' if months != 1 else ''}"
else:
age = "< 1 month"
else:
age = 'N/A'
# Ensure all values are properly formatted strings
device_safe = device or 'N/A'
@@ -1186,14 +1206,6 @@ class SystemHealthMonitor:
{'' * box_width}
"""
if drive_info:
perf_metrics = {
'read_speed': drive_info.get('performance_metrics', {}).get('read_speed', 'N/A'),
'write_speed': drive_info.get('performance_metrics', {}).get('write_speed', 'N/A'),
'access_time': drive_info.get('performance_metrics', {}).get('access_time', 'N/A'),
'iops': drive_info.get('performance_metrics', {}).get('iops', 'N/A')
}
power_on_safe = f"{power_on_hours} hours" if power_on_hours != 'N/A' else 'N/A'
last_test_safe = last_test_date or 'N/A'
age_safe = age or 'N/A'
@@ -1264,39 +1276,140 @@ class SystemHealthMonitor:
description += f"\nError generating drive details: {str(e)}\n"
if "Temperature" in issue:
description += """
description += "\n" + textwrap.dedent("""
High drive temperatures can:
- Reduce drive lifespan
- Cause performance degradation
- Lead to data corruption in extreme cases
Optimal temperature range: 20-45°C
"""
""").strip() + "\n"
if "ECC" in issue:
description += """
description += "\n" + textwrap.dedent("""
ECC (Error Correction Code) Memory Issues:
- Correctable: Memory errors that were successfully fixed
- Uncorrectable: Serious memory errors that could not be corrected
Frequent ECC corrections may indicate degrading memory modules
"""
""").strip() + "\n"
if "CPU" in issue:
description += """
description += "\n" + textwrap.dedent("""
High CPU usage sustained over time can indicate:
- Resource constraints
- Runaway processes
- Need for performance optimization
- Potential cooling issues
"""
""").strip() + "\n"
# Add CPU STATUS box
cpu_health = health_report.get('cpu_health', {})
cpu_usage = cpu_health.get('cpu_usage_percent', 'N/A')
cpu_threshold = self.CONFIG['THRESHOLDS']['CPU_WARNING']
cpu_status = cpu_health.get('status', 'N/A')
cpu_usage_str = f"{cpu_usage}%" if isinstance(cpu_usage, (int, float)) else cpu_usage
description += f"""
┏━ CPU STATUS {'' * (box_width - 13)}
┃ Usage │ {cpu_usage_str:<61}
┃ Threshold │ {str(cpu_threshold) + '%':<61}
┃ Status │ {cpu_status:<61}
{'' * box_width}
"""
if "Network" in issue:
description += """
description += "\n" + textwrap.dedent("""
Network connectivity issues can impact:
- Cluster communication
- Data replication
- Service availability
- Management access
"""
""").strip() + "\n"
# Add NETWORK STATUS box
net_health = health_report.get('network_health', {})
mgmt = net_health.get('management_network', {})
ceph_net = net_health.get('ceph_network', {})
mgmt_status = mgmt.get('status', 'N/A')
ceph_status = ceph_net.get('status', 'N/A')
mgmt_latency = mgmt.get('latency')
mgmt_latency_str = f"{mgmt_latency}ms" if mgmt_latency is not None else 'N/A'
mgmt_issues = mgmt.get('issues', [])
ceph_issues = ceph_net.get('issues', [])
all_net_issues = mgmt_issues + ceph_issues
issues_str = '; '.join(all_net_issues) if all_net_issues else 'None'
# Truncate issues string to fit in box
if len(issues_str) > 61:
issues_str = issues_str[:58] + '...'
description += f"""
┏━ NETWORK STATUS {'' * (box_width - 17)}
┃ Management │ {mgmt_status:<61}
┃ Ceph Network │ {ceph_status:<61}
┃ Latency │ {mgmt_latency_str:<61}
┃ Issues │ {issues_str:<61}
{'' * box_width}
"""
if any(kw in issue for kw in ["LXC", "storage usage", "container"]):
# Add CONTAINER STORAGE box
lxc_health = health_report.get('lxc_health', {})
containers = lxc_health.get('containers', [])
for container in containers:
vmid = container.get('vmid', 'N/A')
for fs in container.get('filesystems', []):
mountpoint = fs.get('mountpoint', 'N/A')
usage_pct = fs.get('usage_percent', 0)
total_bytes = fs.get('total_space', 0)
used_bytes = fs.get('used_space', 0)
avail_bytes = fs.get('available', 0)
# Only show filesystems relevant to this issue
if mountpoint not in issue and vmid not in issue:
continue
total_str = self._format_bytes_human(total_bytes) if isinstance(total_bytes, (int, float)) else str(total_bytes)
used_str = self._format_bytes_human(used_bytes) if isinstance(used_bytes, (int, float)) else str(used_bytes)
free_str = self._format_bytes_human(avail_bytes) if isinstance(avail_bytes, (int, float)) else str(avail_bytes)
# Create 50-char usage meter (2% per block)
blocks = int(usage_pct / 2)
usage_meter = '' * blocks + '' * (50 - blocks)
usage_pct_str = f"{usage_pct:.1f}%"
description += f"""
┏━ CONTAINER STORAGE {'' * (box_width - 20)}
┃ VMID │ {vmid:<61}
┃ Mountpoint │ {mountpoint:<61}
┃ Usage Meter │ {usage_meter} {usage_pct_str:>10}
┃ Total │ {total_str:<61}
┃ Used │ {used_str:<61}
┃ Free │ {free_str:<61}
{'' * box_width}
"""
if any(kw in issue for kw in ["Ceph", "OSD", "ceph", "HEALTH_ERR", "HEALTH_WARN"]):
# Add CEPH CLUSTER STATUS box
ceph_health = health_report.get('ceph_health', {})
if ceph_health.get('is_ceph_node'):
cluster_health = ceph_health.get('cluster_health', 'N/A')
cluster_usage = ceph_health.get('cluster_usage', {})
usage_pct = cluster_usage.get('usage_percent', 'N/A') if cluster_usage else 'N/A'
total_bytes = cluster_usage.get('total_bytes', 0) if cluster_usage else 0
used_bytes = cluster_usage.get('used_bytes', 0) if cluster_usage else 0
total_str = self._format_bytes_human(total_bytes) if total_bytes else 'N/A'
used_str = self._format_bytes_human(used_bytes) if used_bytes else 'N/A'
usage_pct_str = f"{usage_pct}%" if isinstance(usage_pct, (int, float)) else usage_pct
osd_list = ceph_health.get('osd_status', [])
osd_total = len(osd_list)
osd_up = sum(1 for o in osd_list if o.get('status') == 'up')
osd_summary = f"{osd_up}/{osd_total} up" if osd_total > 0 else 'N/A'
description += f"""
┏━ CEPH CLUSTER STATUS {'' * (box_width - 22)}
┃ Health │ {cluster_health:<61}
┃ Usage │ {usage_pct_str:<61}
┃ Total │ {total_str:<61}
┃ Used │ {used_str:<61}
┃ OSDs │ {osd_summary:<61}
{'' * box_width}
"""
if "Disk" in issue:
for partition in health_report.get('drives_health', {}).get('drives', []):
@@ -1602,7 +1715,7 @@ class SystemHealthMonitor:
f"{environment['PRODUCTION']}"
f"{ticket_type_tag}"
)
description = self._generate_detailed_description(issue, health_report)
description = self._generate_detailed_description(issue, health_report, priority)
ticket_payload = {
"title": ticket_title,
@@ -1822,6 +1935,14 @@ class SystemHealthMonitor:
return True
return False
def _format_bytes_human(self, num_bytes):
    """Format a byte count into a human-readable string.

    The value is divided by 1024 until it fits the current unit and is
    rendered with one decimal place, e.g. ``1536`` -> ``"1.5 KB"``.

    :param num_bytes: Byte count (int or float; negative values keep
        their sign). Anything that cannot be converted to a float,
        such as ``None``, yields ``'N/A'`` instead of raising.
    :return: String such as ``"512.0 B"``, ``"1.0 MB"`` or ``'N/A'``.
    """
    try:
        value = float(num_bytes)
    except (TypeError, ValueError):
        # Callers already display 'N/A' for missing metrics; mirror that
        # rather than crashing ticket generation on a bad value.
        return 'N/A'

    for unit in ('B', 'KB', 'MB', 'GB', 'TB', 'PB'):
        # Compare the value as it will be *displayed* (one decimal place).
        # Otherwise 1048575 bytes would print as "1024.0 KB" instead of
        # rolling over to "1.0 MB".
        if abs(round(value, 1)) < 1024.0:
            return f"{value:.1f} {unit}"
        value /= 1024.0
    # Anything still >= 1024 PB is reported in exabytes.
    return f"{value:.1f} EB"
def _parse_size(self, size_str: str) -> float:
"""
Parse size string with units to bytes.