fix: LDAP empty-password guard, expires_minutes bounds, snapshot JSON safety, rate dict cleanup
Lint / Python (flake8) (push) Failing after 39s
Lint / JS (eslint) (push) Failing after 12s
Security / Python Security (bandit) (push) Successful in 41s
Test / Python Tests (pytest) (push) Failing after 1m28s
Lint / Notify on failure (push) Successful in 2s
Lint / Deploy (push) Has been skipped
Lint / Python (flake8) (push) Failing after 39s
Lint / JS (eslint) (push) Failing after 12s
Security / Python Security (bandit) (push) Successful in 41s
Test / Python Tests (pytest) (push) Failing after 1m28s
Lint / Notify on failure (push) Successful in 2s
Lint / Deploy (push) Has been skipped
- app.py: fail loudly if LDAP bind_pw is not configured rather than attempting anonymous bind - app.py: validate expires_minutes is 1–43200 (max 30 days) before storing suppression - app.py: wrap network_snapshot JSON parse in try/except so a corrupt DB value returns degraded page instead of 500 - app.py: prune _diag_rate entries inactive for >1h to prevent unbounded growth Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -182,7 +182,11 @@ def index():
|
|||||||
summary = db.get_status_summary()
|
summary = db.get_status_summary()
|
||||||
snapshot_raw = db.get_state('network_snapshot')
|
snapshot_raw = db.get_state('network_snapshot')
|
||||||
last_check = db.get_state('last_check', 'Never')
|
last_check = db.get_state('last_check', 'Never')
|
||||||
snapshot = json.loads(snapshot_raw) if snapshot_raw else {}
|
try:
|
||||||
|
snapshot = json.loads(snapshot_raw) if snapshot_raw else {}
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f'Failed to parse network_snapshot JSON: {e}')
|
||||||
|
snapshot = {}
|
||||||
suppressions = db.get_active_suppressions()
|
suppressions = db.get_active_suppressions()
|
||||||
_annotate_suppressions(events, suppressions)
|
_annotate_suppressions(events, suppressions)
|
||||||
recent_resolved = db.get_recent_resolved(hours=24, limit=10)
|
recent_resolved = db.get_recent_resolved(hours=24, limit=10)
|
||||||
@@ -327,13 +331,21 @@ def api_create_suppression():
|
|||||||
if len(target_detail) > 255:
|
if len(target_detail) > 255:
|
||||||
return jsonify({'error': 'target_detail must be 255 characters or fewer'}), 400
|
return jsonify({'error': 'target_detail must be 255 characters or fewer'}), 400
|
||||||
|
|
||||||
|
if expires_minutes is not None:
|
||||||
|
try:
|
||||||
|
expires_minutes = int(expires_minutes)
|
||||||
|
if expires_minutes <= 0 or expires_minutes > 43200:
|
||||||
|
return jsonify({'error': 'expires_minutes must be between 1 and 43200 (30 days)'}), 400
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
return jsonify({'error': 'expires_minutes must be a valid integer'}), 400
|
||||||
|
|
||||||
sup_id = db.create_suppression(
|
sup_id = db.create_suppression(
|
||||||
target_type=target_type,
|
target_type=target_type,
|
||||||
target_name=target_name,
|
target_name=target_name,
|
||||||
target_detail=target_detail,
|
target_detail=target_detail,
|
||||||
reason=reason,
|
reason=reason,
|
||||||
suppressed_by=user['username'],
|
suppressed_by=user['username'],
|
||||||
expires_minutes=int(expires_minutes) if expires_minutes else None,
|
expires_minutes=expires_minutes,
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
f'Suppression #{sup_id} created by {user["username"]}: '
|
f'Suppression #{sup_id} created by {user["username"]}: '
|
||||||
@@ -449,7 +461,10 @@ def api_diagnose_start():
|
|||||||
requesting_user = _get_user()['username']
|
requesting_user = _get_user()['username']
|
||||||
now = time.time()
|
now = time.time()
|
||||||
with _diag_lock:
|
with _diag_lock:
|
||||||
# Rate limit: max 5 diagnostic jobs per user per minute
|
# Rate limit: max 5 diagnostic jobs per user per minute; prune stale user entries
|
||||||
|
stale_users = [u for u, ts in _diag_rate.items() if not ts or max(ts) < now - 3600]
|
||||||
|
for u in stale_users:
|
||||||
|
del _diag_rate[u]
|
||||||
recent = [t for t in _diag_rate.get(requesting_user, []) if now - t < 60]
|
recent = [t for t in _diag_rate.get(requesting_user, []) if now - t < 60]
|
||||||
if len(recent) >= 5:
|
if len(recent) >= 5:
|
||||||
return jsonify({'error': 'Rate limit exceeded: max 5 diagnostics per minute'}), 429
|
return jsonify({'error': 'Rate limit exceeded: max 5 diagnostics per minute'}), 429
|
||||||
@@ -527,13 +542,18 @@ def api_avatar():
|
|||||||
return '', 404
|
return '', 404
|
||||||
|
|
||||||
# Query lldap
|
# Query lldap
|
||||||
|
bind_pw = ldap_cfg.get('bind_pw', '')
|
||||||
|
if not bind_pw:
|
||||||
|
logger.error('LDAP bind_pw not configured — avatar lookup disabled')
|
||||||
|
return '', 404
|
||||||
|
|
||||||
avatar_data = None
|
avatar_data = None
|
||||||
try:
|
try:
|
||||||
import ldap3
|
import ldap3
|
||||||
server = ldap3.Server(ldap_cfg['host'], port=int(ldap_cfg.get('port', 3890)))
|
server = ldap3.Server(ldap_cfg['host'], port=int(ldap_cfg.get('port', 3890)))
|
||||||
conn = ldap3.Connection(server,
|
conn = ldap3.Connection(server,
|
||||||
user=ldap_cfg['bind_dn'],
|
user=ldap_cfg['bind_dn'],
|
||||||
password=ldap_cfg.get('bind_pw', ''),
|
password=bind_pw,
|
||||||
auto_bind=True, receive_timeout=5)
|
auto_bind=True, receive_timeout=5)
|
||||||
safe_uid = ldap3.utils.conv.escape_filter_chars(username)
|
safe_uid = ldap3.utils.conv.escape_filter_chars(username)
|
||||||
conn.search(ldap_cfg.get('user_base', 'ou=people,dc=example,dc=com'),
|
conn.search(ldap_cfg.get('user_base', 'ou=people,dc=example,dc=com'),
|
||||||
|
|||||||
Reference in New Issue
Block a user