Fix NoneType errors in SMART/NVMe checks; skip devices without SMART support and add smartctl/nvme timeouts

This commit is contained in:
2025-05-29 12:44:55 -04:00
parent f8784eddd2
commit 95a5a8227a

View File

@ -448,31 +448,53 @@ class SystemHealthMonitor:
'serial': None,
'capacity': None,
'firmware': None,
'type': None # SSD or HDD
'type': None, # SSD or HDD
'smart_capable': False
}
try:
result = subprocess.run(
# First check if device supports SMART
capability_result = subprocess.run(
['smartctl', '-i', device],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
for line in result.stdout.split('\n'):
if 'Device Model' in line:
# Check if smartctl failed completely
if capability_result.returncode not in [0, 4]: # 0 = success, 4 = some SMART errors but readable
logger.debug(f"smartctl failed for {device}: return code {capability_result.returncode}")
return drive_details
output = capability_result.stdout
# Check if SMART is supported
if "SMART support is: Enabled" in output or "SMART support is: Available" in output:
drive_details['smart_capable'] = True
elif "SMART support is: Unavailable" in output or "does not support SMART" in output:
logger.debug(f"Device {device} does not support SMART")
return drive_details
for line in output.split('\n'):
if 'Device Model' in line or 'Model Number' in line:
drive_details['model'] = line.split(':')[1].strip()
elif 'Serial Number' in line:
drive_details['serial'] = line.split(':')[1].strip()
elif 'User Capacity' in line:
drive_details['capacity'] = line.split('[')[1].split(']')[0]
# Extract capacity from brackets
capacity_match = re.search(r'\[(.*?)\]', line)
if capacity_match:
drive_details['capacity'] = capacity_match.group(1)
elif 'Firmware Version' in line:
drive_details['firmware'] = line.split(':')[1].strip()
elif 'Rotation Rate' in line:
drive_details['type'] = 'SSD' if 'Solid State Device' in line else 'HDD'
if 'Solid State Device' in line:
drive_details['type'] = 'SSD'
else:
drive_details['type'] = 'HDD'
except Exception as e:
logger.debug(f"Error getting drive details: {e}")
logger.debug(f"Error getting drive details for {device}: {e}")
return drive_details
@ -801,8 +823,28 @@ class SystemHealthMonitor:
# Check for drive-related issues
for drive in health_report.get('drives_health', {}).get('drives', []):
if drive.get('smart_issues'):
issues.append(f"Drive {drive['device']} has SMART issues: {', '.join(drive['smart_issues'])}")
# Skip drives with ERROR or NOT_SUPPORTED status - these are likely virtual/unsupported devices
if drive.get('smart_status') in ['ERROR', 'NOT_SUPPORTED']:
logger.debug(f"Skipping issue detection for drive {drive['device']} with status {drive.get('smart_status')}")
continue
# Only report issues for drives with valid SMART status
if drive.get('smart_issues') and drive.get('smart_status') in ['HEALTHY', 'UNHEALTHY', 'UNKNOWN']:
# Filter out generic error messages that don't indicate real hardware issues
filtered_issues = []
for issue in drive['smart_issues']:
if not any(skip_phrase in issue for skip_phrase in [
"Error checking SMART:",
"Unable to read device information",
"SMART not supported",
"timed out"
]):
filtered_issues.append(issue)
if filtered_issues:
issues.append(f"Drive {drive['device']} has SMART issues: {', '.join(filtered_issues)}")
# Check temperature regardless of SMART status
if drive.get('temperature') and drive['temperature'] > self.CONFIG['THRESHOLDS']['TEMPERATURE_WARNING']:
issues.append(f"Drive {drive['device']} temperature is high: {drive['temperature']}°C")
@ -1086,10 +1128,10 @@ class SystemHealthMonitor:
def _check_smart_health(self, device: str) -> Dict[str, Any]:
"""
Enhanced SMART health check with manufacturer-specific thresholds.
Enhanced SMART health check with better error handling.
"""
smart_health = {
'status': 'HEALTHY',
'status': 'UNKNOWN',
'severity': 'NORMAL',
'issues': [],
'temp': None,
@ -1098,8 +1140,19 @@ class SystemHealthMonitor:
}
try:
# Get drive details first to determine manufacturer
# First verify the device is SMART-capable
drive_details = self._get_drive_details(device)
if not drive_details.get('smart_capable', False):
smart_health['status'] = 'NOT_SUPPORTED'
smart_health['issues'].append("SMART not supported on this device")
return smart_health
# If we have no model info, the device might not be responding properly
if not drive_details.get('model'):
smart_health['status'] = 'ERROR'
smart_health['issues'].append("Unable to read device information")
return smart_health
manufacturer_profile = self._get_manufacturer_profile(
drive_details.get('model', ''),
drive_details.get('manufacturer', '')
@ -1112,12 +1165,13 @@ class SystemHealthMonitor:
smart_health['severity'] = 'WARNING'
smart_health['issues'].extend(firmware_info['known_issues'])
# Get detailed SMART data
# Get detailed SMART data with timeout
result = subprocess.run(
['smartctl', '-A', '-H', '-l', 'error', '-l', 'background', device],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
text=True,
timeout=30 # 30 second timeout
)
output = result.stdout
@ -1127,6 +1181,10 @@ class SystemHealthMonitor:
smart_health['status'] = 'UNHEALTHY'
smart_health['severity'] = 'CRITICAL'
smart_health['issues'].append("SMART overall health check failed")
elif 'PASSED' in output:
smart_health['status'] = 'HEALTHY'
else:
smart_health['status'] = 'UNKNOWN'
# Parse SMART attributes with manufacturer-specific handling
power_on_hours = 0
@ -1240,9 +1298,9 @@ class SystemHealthMonitor:
if recent_errors:
smart_health['severity'] = 'WARNING'
smart_health['issues'].extend(recent_errors)
logger.debug(f"=== SMART Health Check for {device} ===")
logger.debug(f"Manufacturer profile: {manufacturer_profile.get('aliases', ['Unknown'])[0]}")
logger.debug(f"Manufacturer profile: {manufacturer_profile.get('aliases', ['Unknown'])[0] if manufacturer_profile else 'None'}")
logger.debug("Raw SMART attributes:")
for attr, value in smart_health['attributes'].items():
logger.debug(f"{attr}: {value}")
@ -1253,38 +1311,58 @@ class SystemHealthMonitor:
# Special handling for NVMe drives
if 'nvme' in device:
nvme_result = subprocess.run(
['nvme', 'smart-log', device],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
logger.debug(f"NVMe smart-log raw output for {device}:")
logger.debug(nvme_result.stdout)
# Add this line to initialize the temperature attribute
smart_health['attributes']['Temperature_Celsius'] = None
for line in nvme_result.stdout.split('\n'):
if 'temperature' in line.lower():
temp_str = line.split(':')[1].strip()
logger.debug(f"Raw temperature string: {temp_str}")
# Extract first temperature value
temp_value = int(''.join(c for c in temp_str if c.isdigit())[0:2])
logger.debug(f"Parsed temperature value: {temp_value}")
# Set both temperature fields
smart_health['temp'] = temp_value
smart_health['attributes']['Temperature_Celsius'] = temp_value
logger.debug(f"Final temperature recorded: {smart_health['temp']}")
break
try:
nvme_result = subprocess.run(
['nvme', 'smart-log', device],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=10
)
logger.debug(f"NVMe smart-log raw output for {device}:")
logger.debug(nvme_result.stdout)
# Initialize the temperature attribute
if smart_health['temp'] is None:
smart_health['attributes']['Temperature_Celsius'] = None
for line in nvme_result.stdout.split('\n'):
# Skip blank lines and only parse lines that mention temperature (str.split never yields None, so this is a blank-line guard)
if line and line.strip() and 'temperature' in line.lower():
try:
temp_str = line.split(':')[1].strip() if ':' in line else line.strip()
logger.debug(f"Raw temperature string: {temp_str}")
# Extract first temperature value more safely
digits = ''.join(c for c in temp_str if c.isdigit())
if len(digits) >= 2:
temp_value = int(digits[:2])
logger.debug(f"Parsed temperature value: {temp_value}")
# Set both temperature fields
smart_health['temp'] = temp_value
smart_health['attributes']['Temperature_Celsius'] = temp_value
logger.debug(f"Final temperature recorded: {smart_health['temp']}")
break
except (ValueError, IndexError, AttributeError) as e:
logger.debug(f"Error parsing NVMe temperature from line '{line}': {e}")
continue
except subprocess.TimeoutExpired:
logger.debug(f"NVMe smart-log for {device} timed out")
except Exception as e:
logger.debug(f"Error getting NVMe smart data for {device}: {e}")
except subprocess.TimeoutExpired:
smart_health['status'] = 'ERROR'
smart_health['issues'].append("SMART check timed out")
except Exception as e:
smart_health['status'] = 'ERROR'
smart_health['severity'] = 'UNKNOWN'
smart_health['issues'].append(f"Error checking SMART: {str(e)}")
logger.debug(f"Exception in _check_smart_health for {device}: {e}")
import traceback
logger.debug(traceback.format_exc())
return smart_health
@ -1292,11 +1370,15 @@ class SystemHealthMonitor:
drives_health = {'overall_status': 'NORMAL', 'drives': []}
try:
# Get physical disks only
physical_disks = [disk for disk in self._get_all_disks()
if disk.startswith(('/dev/sd', '/dev/nvme'))]
# Get only valid physical disks
physical_disks = self._get_all_disks()
logger.debug(f"Checking physical disks: {physical_disks}")
if not physical_disks:
logger.warning("No valid physical disks found for monitoring")
drives_health['overall_status'] = 'WARNING'
return drives_health
# Get ALL partition information including device mapper
partitions = psutil.disk_partitions(all=True)
@ -1355,10 +1437,15 @@ class SystemHealthMonitor:
'smart_attributes': smart_health['attributes']
})
# Only report issues for drives that should be monitored
if smart_health['status'] == 'UNHEALTHY':
overall_status = 'CRITICAL'
elif smart_health['issues'] and overall_status != 'CRITICAL':
overall_status = 'WARNING'
elif smart_health['status'] == 'ERROR':
# Don't escalate overall status for ERROR drives (might be virtual)
logger.debug(f"Drive {disk} returned ERROR status, skipping from issue detection")
elif smart_health['issues'] and smart_health['status'] not in ['ERROR', 'NOT_SUPPORTED']:
if overall_status != 'CRITICAL':
overall_status = 'WARNING'
drives_health['drives'].append(drive_report)