fix: LDAP conn leak, health timing info, security headers, link_stats size guard
Lint / Python (flake8) (push) Failing after 51s
Lint / JS (eslint) (push) Successful in 7s
Security / Python Security (bandit) (push) Successful in 1m2s
Test / Python Tests (pytest) (push) Failing after 1m21s
Lint / Notify on failure (push) Successful in 2s
Lint / Deploy (push) Has been skipped
Lint / Python (flake8) (push) Failing after 51s
Lint / JS (eslint) (push) Successful in 7s
Security / Python Security (bandit) (push) Successful in 1m2s
Test / Python Tests (pytest) (push) Failing after 1m21s
Lint / Notify on failure (push) Successful in 2s
Lint / Deploy (push) Has been skipped
- app.py: move conn.unbind() into finally block in api_avatar() so connection is always closed even if conn.search() throws - app.py: remove elapsed-time strings from /health response (unauthenticated endpoint no longer leaks monitor timing) - app.py: add after_request hook setting X-Content-Type-Options, X-Frame-Options, Referrer-Policy on all responses - app.py: add 10 MB size guard on link_stats before JSON parse; log actual exception on parse failure - app.py: wrap suppressions_page network_snapshot parse in try/except (same protection as index page) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -94,6 +94,14 @@ def _config() -> dict:
|
||||
return _cfg
|
||||
|
||||
|
||||
@app.after_request
|
||||
def add_security_headers(response):
|
||||
response.headers.setdefault('X-Content-Type-Options', 'nosniff')
|
||||
response.headers.setdefault('X-Frame-Options', 'DENY')
|
||||
response.headers.setdefault('Referrer-Policy', 'strict-origin-when-cross-origin')
|
||||
return response
|
||||
|
||||
|
||||
def _daemon_ok(last_check: str) -> bool:
|
||||
"""Return True if monitor last checked within 20 minutes."""
|
||||
if not last_check or last_check == 'Never':
|
||||
@@ -225,7 +233,11 @@ def suppressions_page():
|
||||
active = db.get_active_suppressions()
|
||||
history = db.get_suppression_history(limit=50)
|
||||
snapshot_raw = db.get_state('network_snapshot')
|
||||
snapshot = json.loads(snapshot_raw) if snapshot_raw else {}
|
||||
try:
|
||||
snapshot = json.loads(snapshot_raw) if snapshot_raw else {}
|
||||
except Exception as e:
|
||||
logger.error(f'Failed to parse network_snapshot JSON: {e}')
|
||||
snapshot = {}
|
||||
return render_template(
|
||||
'suppressions.html',
|
||||
user=user,
|
||||
@@ -272,10 +284,13 @@ def api_network():
|
||||
def api_links():
|
||||
raw = db.get_state('link_stats')
|
||||
if raw:
|
||||
if len(raw) > 10_000_000:
|
||||
logger.error(f'link_stats exceeds 10 MB ({len(raw)} bytes); possible corruption')
|
||||
return jsonify({'error': 'Invalid cached data'}), 503
|
||||
try:
|
||||
return jsonify(json.loads(raw))
|
||||
except Exception:
|
||||
logger.error('Failed to parse link_stats JSON')
|
||||
except Exception as e:
|
||||
logger.error(f'Failed to parse link_stats JSON: {e}')
|
||||
return jsonify({'hosts': {}, 'updated': None})
|
||||
|
||||
|
||||
@@ -548,6 +563,7 @@ def api_avatar():
|
||||
return '', 404
|
||||
|
||||
avatar_data = None
|
||||
conn = None
|
||||
try:
|
||||
import ldap3
|
||||
server = ldap3.Server(ldap_cfg['host'], port=int(ldap_cfg.get('port', 3890)))
|
||||
@@ -560,13 +576,18 @@ def api_avatar():
|
||||
f'(uid={safe_uid})', attributes=['avatar'])
|
||||
if conn.entries and conn.entries[0]['avatar'].value:
|
||||
avatar_data = conn.entries[0]['avatar'].value
|
||||
conn.unbind()
|
||||
except ImportError:
|
||||
logger.error('ldap3 not installed — run: pip install ldap3')
|
||||
return '', 404
|
||||
except Exception as e:
|
||||
logger.error(f'LDAP avatar lookup failed for {username}: {e}')
|
||||
return '', 404
|
||||
finally:
|
||||
if conn is not None:
|
||||
try:
|
||||
conn.unbind()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not avatar_data or len(avatar_data) < 100:
|
||||
with open(sentinel, 'w'): pass
|
||||
@@ -613,10 +634,10 @@ def health():
|
||||
ts = datetime.strptime(last_check, '%Y-%m-%d %H:%M:%S UTC').replace(tzinfo=timezone.utc)
|
||||
age_s = (datetime.now(timezone.utc) - ts).total_seconds()
|
||||
if age_s > 1200:
|
||||
checks['monitor'] = f'stale ({int(age_s)}s since last check)'
|
||||
checks['monitor'] = 'stale'
|
||||
overall = 'degraded'
|
||||
else:
|
||||
checks['monitor'] = f'ok ({int(age_s)}s ago)'
|
||||
checks['monitor'] = 'ok'
|
||||
else:
|
||||
checks['monitor'] = 'no data yet'
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user