From e166e3fcb40924b72f91d38fd4583972b20ad94c Mon Sep 17 00:00:00 2001 From: Jared Vititoe Date: Mon, 11 May 2026 08:50:51 -0400 Subject: [PATCH] fix: LDAP conn leak, health timing info, security headers, link_stats size guard - app.py: move conn.unbind() into finally block in api_avatar() so connection is always closed even if conn.search() throws - app.py: remove elapsed-time strings from /health response (unauthenticated endpoint no longer leaks monitor timing) - app.py: add after_request hook setting X-Content-Type-Options, X-Frame-Options, Referrer-Policy on all responses - app.py: add 10 MB size guard on link_stats before JSON parse; log actual exception on parse failure - app.py: wrap suppressions_page network_snapshot parse in try/except (same protection as index page) Co-Authored-By: Claude Sonnet 4.6 --- app.py | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/app.py b/app.py index 20dd900..4480717 100644 --- a/app.py +++ b/app.py @@ -94,6 +94,14 @@ def _config() -> dict: return _cfg +@app.after_request +def add_security_headers(response): + response.headers.setdefault('X-Content-Type-Options', 'nosniff') + response.headers.setdefault('X-Frame-Options', 'DENY') + response.headers.setdefault('Referrer-Policy', 'strict-origin-when-cross-origin') + return response + + def _daemon_ok(last_check: str) -> bool: """Return True if monitor last checked within 20 minutes.""" if not last_check or last_check == 'Never': @@ -225,7 +233,11 @@ def suppressions_page(): active = db.get_active_suppressions() history = db.get_suppression_history(limit=50) snapshot_raw = db.get_state('network_snapshot') - snapshot = json.loads(snapshot_raw) if snapshot_raw else {} + try: + snapshot = json.loads(snapshot_raw) if snapshot_raw else {} + except Exception as e: + logger.error(f'Failed to parse network_snapshot JSON: {e}') + snapshot = {} return render_template( 'suppressions.html', user=user, @@ -272,10 +284,13 @@ def api_network(): def api_links(): raw = db.get_state('link_stats') if raw: + if len(raw) > 10_000_000: + logger.error(f'link_stats exceeds 10 MB ({len(raw)} bytes); possible corruption') + return jsonify({'error': 'Invalid cached data'}), 503 try: return jsonify(json.loads(raw)) - except Exception: - logger.error('Failed to parse link_stats JSON') + except Exception as e: + logger.error(f'Failed to parse link_stats JSON: {e}') return jsonify({'hosts': {}, 'updated': None}) @@ -548,6 +563,7 @@ def api_avatar(): return '', 404 avatar_data = None + conn = None try: import ldap3 server = ldap3.Server(ldap_cfg['host'], port=int(ldap_cfg.get('port', 3890))) @@ -560,13 +576,18 @@ def api_avatar(): f'(uid={safe_uid})', attributes=['avatar']) if conn.entries and conn.entries[0]['avatar'].value: avatar_data = conn.entries[0]['avatar'].value - conn.unbind() except ImportError: logger.error('ldap3 not installed — run: pip install ldap3') return '', 404 except Exception as e: logger.error(f'LDAP avatar lookup failed for {username}: {e}') return '', 404 + finally: + if conn is not None: + try: + conn.unbind() + except Exception: + pass if not avatar_data or len(avatar_data) < 100: with open(sentinel, 'w'): pass @@ -613,10 +634,10 @@ def health(): ts = datetime.strptime(last_check, '%Y-%m-%d %H:%M:%S UTC').replace(tzinfo=timezone.utc) age_s = (datetime.now(timezone.utc) - ts).total_seconds() if age_s > 1200: - checks['monitor'] = f'stale ({int(age_s)}s since last check)' + checks['monitor'] = 'stale' overall = 'degraded' else: - checks['monitor'] = f'ok ({int(age_s)}s ago)' + checks['monitor'] = 'ok' else: checks['monitor'] = 'no data yet' except Exception as e: