Correction of deleted code

2025-05-29 19:04:45 -04:00
parent 1e6260a899
commit 5ac12fd6b7


@@ -1311,4 +1311,473 @@ class SystemHealthMonitor:
logger.debug(f"=== SMART Health Check for {device} ===")
logger.debug(f"Manufacturer profile: {manufacturer_profile.get('aliases', ['Unknown'])[0] if manufacturer_profile else 'None'}")
logger.debug("Raw SMART attributes:")
for attr, value in smart_health['attributes'].items():
logger.debug(f"{attr}: {value}")
logger.debug(f"Temperature: {smart_health['temp']}°C")
logger.debug(f"Is new drive: {is_new_drive}")
logger.debug(f"Detected Issues: {smart_health['issues']}")
logger.debug("=== End SMART Check ===\n")
# Special handling for NVMe drives
if 'nvme' in device:
try:
nvme_result = subprocess.run(
['nvme', 'smart-log', device],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=10
)
logger.debug(f"NVMe smart-log raw output for {device}:")
logger.debug(nvme_result.stdout)
# Initialize the temperature attribute
if smart_health['temp'] is None:
smart_health['attributes']['Temperature_Celsius'] = None
for line in nvme_result.stdout.split('\n'):
# Fix the NoneType error by checking if line exists and has content
if line and line.strip() and 'temperature' in line.lower():
try:
temp_str = line.split(':')[1].strip() if ':' in line else line.strip()
logger.debug(f"Raw temperature string: {temp_str}")
# Extract the first integer in the string as the temperature
match = re.search(r'\d+', temp_str)
if match:
temp_value = int(match.group())
logger.debug(f"Parsed temperature value: {temp_value}")
# Set both temperature fields
smart_health['temp'] = temp_value
smart_health['attributes']['Temperature_Celsius'] = temp_value
logger.debug(f"Final temperature recorded: {smart_health['temp']}")
break
except (ValueError, IndexError, AttributeError) as e:
logger.debug(f"Error parsing NVMe temperature from line '{line}': {e}")
continue
except subprocess.TimeoutExpired:
logger.debug(f"NVMe smart-log for {device} timed out")
except Exception as e:
logger.debug(f"Error getting NVMe smart data for {device}: {e}")
except subprocess.TimeoutExpired:
smart_health['status'] = 'ERROR'
smart_health['issues'].append("SMART check timed out")
except Exception as e:
smart_health['status'] = 'ERROR'
smart_health['severity'] = 'UNKNOWN'
smart_health['issues'].append(f"Error checking SMART: {str(e)}")
logger.debug(f"Exception in _check_smart_health for {device}: {e}")
import traceback
logger.debug(traceback.format_exc())
return smart_health
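The NVMe branch above pulls the drive temperature out of `nvme smart-log` text output. A minimal standalone sketch of the same parsing idea, using a hypothetical sample line rather than real device output:

import re
from typing import Optional

def parse_nvme_temp(line: str) -> Optional[int]:
    """Return the first integer after the colon in a temperature line, or None."""
    if not line or 'temperature' not in line.lower():
        return None
    value = line.split(':', 1)[1] if ':' in line else line
    match = re.search(r'\d+', value)
    return int(match.group()) if match else None

# Hypothetical smart-log line; real output varies by drive and nvme-cli version.
print(parse_nvme_temp("temperature : 38 C"))  # -> 38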
def _check_drives_health(self) -> Dict[str, Any]:
drives_health = {'overall_status': 'NORMAL', 'drives': []}
try:
# Get only valid physical disks
physical_disks = self._get_all_disks()
logger.debug(f"Checking physical disks: {physical_disks}")
if not physical_disks:
logger.warning("No valid physical disks found for monitoring")
drives_health['overall_status'] = 'WARNING'
return drives_health
# Get ALL partition information including device mapper
partitions = psutil.disk_partitions(all=True)
# Create mapping of base devices to their partitions
device_partitions = {}
for part in partitions:
# Extract the base device (e.g., /dev/sda from /dev/sda1, /dev/nvme0n1 from /dev/nvme0n1p2)
base_device = re.match(r'(/dev/(?:nvme\d+n\d+|[a-z]+))', part.device)
if base_device:
base_dev = base_device.group(1)
if base_dev not in device_partitions:
device_partitions[base_dev] = []
device_partitions[base_dev].append(part)
overall_status = 'NORMAL'
for disk in physical_disks:
drive_report = {
'device': disk,
'partitions': [],
'smart_status': 'UNKNOWN',
'usage_percent': 0
}
# Add partition information if available
if disk in device_partitions:
total_used = 0
total_space = 0
for partition in device_partitions[disk]:
try:
usage = psutil.disk_usage(partition.mountpoint)
total_used += usage.used
total_space += usage.total
part_info = {
'device': partition.device,
'mountpoint': partition.mountpoint,
'fstype': partition.fstype,
'total_space': self._convert_bytes(usage.total),
'used_space': self._convert_bytes(usage.used),
'free_space': self._convert_bytes(usage.free),
'usage_percent': usage.percent
}
drive_report['partitions'].append(part_info)
except Exception as e:
logger.debug(f"Error getting partition usage for {partition.device}: {e}")
# Calculate overall drive usage percentage
if total_space > 0:
drive_report['usage_percent'] = (total_used / total_space) * 100
# Check SMART health
smart_health = self._check_smart_health(disk)
drive_report.update({
'smart_status': smart_health['status'],
'smart_issues': smart_health['issues'],
'temperature': smart_health['temp'],
'smart_attributes': smart_health['attributes']
})
# Only report issues for drives that should be monitored
if smart_health['status'] == 'UNHEALTHY':
overall_status = 'CRITICAL'
elif smart_health['status'] == 'ERROR':
# Don't escalate overall status for ERROR drives (might be virtual)
logger.debug(f"Drive {disk} returned ERROR status, skipping from issue detection")
elif smart_health['issues'] and smart_health['status'] not in ['ERROR', 'NOT_SUPPORTED']:
if overall_status != 'CRITICAL':
overall_status = 'WARNING'
drives_health['drives'].append(drive_report)
drives_health['overall_status'] = overall_status
except Exception as e:
logger.error(f"Error checking drives health: {str(e)}")
return drives_health
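The partition-to-disk mapping above hinges on extracting the base device name from a partition path. A small sketch of that extraction with the NVMe-aware pattern used above (the example paths are illustrative):

import re

# NVMe partitions end in 'p<N>' (e.g. /dev/nvme0n1p2), SATA/SAS in a digit (e.g. /dev/sda1).
BASE_RE = re.compile(r'(/dev/(?:nvme\d+n\d+|[a-z]+))')

for dev in ['/dev/sda1', '/dev/nvme0n1p2', '/dev/mapper/pve-root']:
    m = BASE_RE.match(dev)
    print(dev, '->', m.group(1) if m else None)
# /dev/sda1 -> /dev/sda
# /dev/nvme0n1p2 -> /dev/nvme0n1
# /dev/mapper/pve-root -> /dev/mapper  (device-mapper paths need separate handling)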
@staticmethod
def _convert_bytes(bytes_value: int, suffix: str = 'B') -> str:
"""
Convert bytes to a human-readable format.
:param bytes_value: Number of bytes to convert.
:param suffix: Suffix to append (default is 'B' for bytes).
:return: Formatted string with the size in human-readable form.
"""
for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
if abs(bytes_value) < 1024.0:
return f"{bytes_value:.1f}{unit}{suffix}"
bytes_value /= 1024.0
return f"{bytes_value:.1f}Y{suffix}"
def _convert_size_to_bytes(self, size_str: str) -> float:
"""Convert a size string with a single-letter unit suffix (e.g. '1.5G') to bytes."""
units = {'B': 1, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4}
size_str = size_str.strip()
if size_str and size_str[-1].isdigit():
# No unit suffix: treat the value as plain bytes
return float(size_str)
size = float(size_str[:-1])
unit = size_str[-1].upper()
return size * units[unit]
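A quick sanity check of the two size helpers above (values are illustrative):

# _convert_bytes walks up the 1024 ladder until the value fits
SystemHealthMonitor._convert_bytes(1536)         # '1.5KB'
SystemHealthMonitor._convert_bytes(3 * 1024**3)  # '3.0GB'

# _convert_size_to_bytes reverses a single-letter-unit string (instance method)
# monitor._convert_size_to_bytes('1.5K')  -> 1536.0
# monitor._convert_size_to_bytes('3G')    -> 3221225472.0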
def _check_memory_usage(self) -> Dict[str, Any]:
"""
Check memory usage and, if ECC memory is present, scan for ECC errors.
"""
memory_health = {
'has_ecc': False,
'ecc_errors': [],
'status': 'OK',
'total_memory': self._convert_bytes(psutil.virtual_memory().total),
'used_memory': self._convert_bytes(psutil.virtual_memory().used),
'memory_percent': psutil.virtual_memory().percent
}
try:
# First check using dmidecode
result = subprocess.run(
['dmidecode', '--type', 'memory'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
if 'Error Correction Type: Multi-bit ECC' in result.stdout:
memory_health['has_ecc'] = True
# If dmidecode didn't find ECC, try the edac method as backup
if not memory_health['has_ecc']:
edac_path = '/sys/devices/system/edac/mc'
if os.path.exists(edac_path) and os.listdir(edac_path):
for mc_dir in glob.glob('/sys/devices/system/edac/mc/mc[0-9]*'):
if os.path.exists(f"{mc_dir}/csrow0"):
memory_health['has_ecc'] = True
break
# If ECC is present, check for errors
if memory_health['has_ecc']:
for mc_dir in glob.glob('/sys/devices/system/edac/mc/mc[0-9]*'):
if os.path.exists(f"{mc_dir}/csrow0"):
ue_count = self._read_ecc_count(f"{mc_dir}/csrow0/ue_count")
if ue_count > 0:
memory_health['status'] = 'CRITICAL'
memory_health['ecc_errors'].append(
f"Uncorrectable ECC errors detected in {os.path.basename(mc_dir)}: {ue_count}"
)
ce_count = self._read_ecc_count(f"{mc_dir}/csrow0/ce_count")
if ce_count > 0:
if memory_health['status'] != 'CRITICAL':
memory_health['status'] = 'WARNING'
memory_health['ecc_errors'].append(
f"Correctable ECC errors detected in {os.path.basename(mc_dir)}: {ce_count}"
)
except Exception as e:
memory_health['status'] = 'ERROR'
memory_health['ecc_errors'].append(f"Error checking ECC status: {str(e)}")
return memory_health
def _read_ecc_count(self, filepath: str) -> int:
"""
Read ECC error count from a file.
:param filepath: Path to the ECC count file
:return: Number of ECC errors
"""
try:
with open(filepath, 'r') as f:
return int(f.read().strip())
except (OSError, ValueError):
return 0
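For reference, the EDAC counters read above live under per-controller csrow directories. A minimal sketch that sums correctable/uncorrectable counts across all controllers, following the kernel's EDAC sysfs layout:

import glob
import os

def total_ecc_counts() -> dict:
    """Sum ce/ue error counts across all EDAC memory controllers."""
    totals = {'ce': 0, 'ue': 0}
    for csrow in glob.glob('/sys/devices/system/edac/mc/mc[0-9]*/csrow[0-9]*'):
        for kind in ('ce', 'ue'):
            try:
                with open(os.path.join(csrow, f'{kind}_count')) as f:
                    totals[kind] += int(f.read().strip())
            except (OSError, ValueError):
                pass  # controller without readable counts
    return totals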
def _check_cpu_usage(self) -> Dict[str, Any]:
"""
Check CPU usage and return health metrics.
:return: Dictionary with CPU health metrics.
"""
cpu_usage_percent = psutil.cpu_percent(interval=1)
cpu_health = {
'cpu_usage_percent': cpu_usage_percent,
'status': 'OK' if cpu_usage_percent < self.CONFIG['THRESHOLDS']['CPU_WARNING'] else 'WARNING'
}
return cpu_health
def _check_network_status(self) -> Dict[str, Any]:
"""
Check the status of network interfaces and report any issues.
:return: Dictionary containing network health metrics and any issues found.
"""
network_health = {
'management_network': {
'issues': [],
'status': 'OK',
'latency': None
},
'ceph_network': {
'issues': [],
'status': 'OK',
'latency': None
}
}
try:
# Check management network connectivity
mgmt_result = subprocess.run(
[
"ping",
"-c", str(self.CONFIG['NETWORKS']['PING_COUNT']),
"-W", str(self.CONFIG['NETWORKS']['PING_TIMEOUT']),
self.CONFIG['NETWORKS']['MANAGEMENT']
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
if mgmt_result.returncode != 0:
network_health['management_network']['status'] = 'CRITICAL'
network_health['management_network']['issues'].append(
"Management network is unreachable"
)
# Check Ceph network connectivity
ceph_result = subprocess.run(
[
"ping",
"-c", str(self.CONFIG['NETWORKS']['PING_COUNT']),
"-W", str(self.CONFIG['NETWORKS']['PING_TIMEOUT']),
self.CONFIG['NETWORKS']['CEPH']
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
if ceph_result.returncode != 0:
network_health['ceph_network']['status'] = 'CRITICAL'
network_health['ceph_network']['issues'].append(
"Ceph network is unreachable"
)
return network_health
except Exception as e:
logger.error(f"Network health check failed: {e}")
return {
'status': 'ERROR',
'error': str(e)
}
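Note that the `latency` fields above are initialized but never populated. If that is wanted, the average round-trip time can be read from ping's summary line; a hedged sketch (the `rtt min/avg/max/mdev` summary is the format printed by iputils ping on Linux):

import re
import subprocess
from typing import Optional

def ping_avg_latency_ms(host: str, count: int = 3, timeout: int = 2) -> Optional[float]:
    """Return the average RTT in ms reported by ping, or None on failure."""
    result = subprocess.run(
        ['ping', '-c', str(count), '-W', str(timeout), host],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )
    if result.returncode != 0:
        return None
    # iputils summary: "rtt min/avg/max/mdev = 0.321/0.402/0.512/0.071 ms"
    match = re.search(r'= [\d.]+/([\d.]+)/', result.stdout)
    return float(match.group(1)) if match else None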
def _check_lxc_storage(self) -> Dict[str, Any]:
"""
Check storage utilization for all running LXC containers
"""
logger.debug("Starting LXC storage check")
lxc_health = {
'status': 'OK',
'containers': [],
'issues': []
}
try:
result = subprocess.run(
['pct', 'list'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
logger.debug(f"pct list output:\n{result.stdout}")
for line in result.stdout.split('\n')[1:]:
if not line.strip():
continue
parts = line.split()
if len(parts) < 2:
logger.debug(f"Skipping invalid line: {line}")
continue
vmid, status = parts[0], parts[1]
if status.lower() == 'running':
logger.debug(f"Checking container {vmid} disk usage")
disk_info = subprocess.run(
['pct', 'df', vmid],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
container_info = {
'vmid': vmid,
'filesystems': []
}
for fs_line in disk_info.stdout.split('\n')[1:]:
if not fs_line.strip() or 'MP' in fs_line:
continue
# Split the per-filesystem line, not the outer container line
columns = fs_line.split()
logger.debug(f"Split columns: {columns}")
if len(columns) >= 6:
try:
# Skip excluded mounts
if columns[0].startswith('appPool:') or '/mnt/pve/mediaf' in columns[0]:
continue
# The mountpoint is the last column
mountpoint = columns[-1]
# Skip excluded mountpoints
if self._is_excluded_mount(mountpoint):
logger.debug(f"Skipping excluded mount: {mountpoint}")
continue
# Parse size values safely
total_space = self._parse_size(columns[-5])
used_space = self._parse_size(columns[-4])
available_space = self._parse_size(columns[-3])
# Parse percentage safely
try:
usage_percent = float(columns[-2].rstrip('%'))
except (ValueError, IndexError):
# Calculate percentage if parsing fails
usage_percent = (used_space / total_space * 100) if total_space > 0 else 0
filesystem = {
'mountpoint': mountpoint,
'total_space': total_space,
'used_space': used_space,
'available': available_space,
'usage_percent': usage_percent
}
container_info['filesystems'].append(filesystem)
# Check thresholds
if usage_percent >= self.CONFIG['THRESHOLDS']['LXC_CRITICAL']:
lxc_health['status'] = 'CRITICAL'
issue = f"LXC {vmid} critical storage usage: {usage_percent:.1f}% on {mountpoint}"
lxc_health['issues'].append(issue)
elif usage_percent >= self.CONFIG['THRESHOLDS']['LXC_WARNING']:
if lxc_health['status'] != 'CRITICAL':
lxc_health['status'] = 'WARNING'
issue = f"LXC {vmid} high storage usage: {usage_percent:.1f}% on {mountpoint}"
lxc_health['issues'].append(issue)
logger.debug(f"Filesystem details: {filesystem}")
except Exception as e:
logger.debug(f"Error processing line: {str(e)}")
logger.debug(f"Full exception: {repr(e)}")
continue
# Only add container info if we have filesystem data
if container_info['filesystems']:
lxc_health['containers'].append(container_info)
logger.debug(f"Added container info for VMID {vmid}")
logger.debug("=== LXC Storage Check Summary ===")
logger.debug(f"Status: {lxc_health['status']}")
logger.debug(f"Total containers checked: {len(lxc_health['containers'])}")
logger.debug(f"Issues found: {len(lxc_health['issues'])}")
logger.debug("=== End LXC Storage Check ===")
except Exception as e:
logger.debug(f"Critical error during LXC storage check: {str(e)}")
lxc_health['status'] = 'ERROR'
error_msg = f"Error checking LXC storage: {str(e)}"
lxc_health['issues'].append(error_msg)
return lxc_health
def main():
parser = argparse.ArgumentParser(description="System Health Monitor")
parser.add_argument(
"--dry-run",
action="store_true",
help="Enable dry-run mode (simulate ticket creation without actual API calls)."
)
args = parser.parse_args()
monitor = SystemHealthMonitor(
ticket_api_url=SystemHealthMonitor.CONFIG['TICKET_API_URL'],
dry_run=args.dry_run
)
monitor.run()
if __name__ == "__main__":
main()
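Assuming the script is saved as system_health_monitor.py (the filename is not shown in this diff), a simulated run that skips real ticket API calls would look like:

python3 system_health_monitor.py --dry-run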