Security hardening, bug fixes, and performance improvements

Security fixes:
- Replace new Function() condition eval with vm.runInNewContext() (RCE fix)
- Add admin checks to DELETE /api/executions/:id and to all scheduled-commands endpoints
- Remove api_key from GET /api/workers response (was exposed to all employees)
- Separate browserClients/workerClients sets; broadcast() now sends to browsers only
- Add worker WebSocket auth: reject if api_key provided but invalid
- Fix XSS: escapeHtml() on step_name, duration, worker_id, user info, execution_id

Bug fixes:
- Replace DB-polling waitForCommandResult with event-driven _commandResolvers Map
- Replace non-atomic addExecutionLog with JSON_ARRAY_APPEND (fixes concurrent write race)
- Add stale execution recovery on startup: running→failed with log entry
- Fix calculateNextRun returning null for unknown types (now throws)
- Fix scheduler overlap: skip if previous execution still running
- Fix JSON double-parse on worker_ids column
- Fix switchTab() bare event.target reference
- Fix selectedExecutions Array→Set (O(1) lookups, fixes performance regression)
- Fix param modal event listener leak (delegated handler, removes before re-adding)
- Add ws.onerror handler (was silently swallowing WebSocket errors)
- Move server startup (server.listen) to the end of the file so all routes are registered before listening

Performance/cleanup:
- DB connection pool 10→50
- EXECUTION_RETENTION_DAYS default 1→30 (matches docs)
- Remove unused packages: bcryptjs, body-parser, cors, js-yaml, jsonwebtoken
- Remove generateUUID() wrapper, use crypto.randomUUID() directly
- Remove dead example workflow constants
- Add ESC key handler to close modals
- Fix clearCompletedExecutions limit 1000→9999
- Add security notice to README.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-11 22:53:25 -04:00
parent 0fee118d1d
commit 58c172e131
5 changed files with 169 additions and 394 deletions

296
server.js
View File

@@ -3,6 +3,7 @@ const http = require('http');
const WebSocket = require('ws');
const mysql = require('mysql2/promise');
const crypto = require('crypto');
const vm = require('vm');
require('dotenv').config();
const app = express();
@@ -13,11 +14,6 @@ const wss = new WebSocket.Server({ server });
app.use(express.json());
app.use(express.static('public'));
// UUID generator
/**
 * Return a freshly generated RFC 4122 version-4 UUID string.
 * Thin convenience wrapper over Node's crypto.randomUUID().
 */
function generateUUID() {
  const id = crypto.randomUUID();
  return id;
}
// Database pool
const pool = mysql.createPool({
host: process.env.DB_HOST,
@@ -26,7 +22,7 @@ const pool = mysql.createPool({
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
waitForConnections: true,
connectionLimit: 10,
connectionLimit: 50,
queueLimit: 0
});
@@ -107,6 +103,22 @@ async function initDatabase() {
)
`);
// Recover stale executions from a previous server crash
const [staleExecs] = await connection.query("SELECT id FROM executions WHERE status = 'running'");
if (staleExecs.length > 0) {
for (const exec of staleExecs) {
await connection.query(
"UPDATE executions SET status = 'failed', completed_at = NOW() WHERE id = ?",
[exec.id]
);
await connection.query(
"UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', CAST(? AS JSON)) WHERE id = ?",
[JSON.stringify({ action: 'server_restart_recovery', message: 'Execution marked failed due to server restart', timestamp: new Date().toISOString() }), exec.id]
);
}
console.log(`[Recovery] Marked ${staleExecs.length} stale execution(s) as failed`);
}
console.log('Database tables initialized successfully');
} catch (error) {
console.error('Database initialization error:', error);
@@ -119,7 +131,7 @@ async function initDatabase() {
// Auto-cleanup old executions (runs hourly)
async function cleanupOldExecutions() {
try {
const retentionDays = parseInt(process.env.EXECUTION_RETENTION_DAYS) || 1;
const retentionDays = parseInt(process.env.EXECUTION_RETENTION_DAYS) || 30;
const [result] = await pool.query(
`DELETE FROM executions
WHERE status IN ('completed', 'failed')
@@ -149,15 +161,28 @@ async function processScheduledCommands() {
);
for (const schedule of schedules) {
// Prevent overlapping execution — skip if a previous run is still active
const [runningExecs] = await pool.query(
"SELECT id FROM executions WHERE started_by = ? AND status = 'running'",
[`scheduler:${schedule.name}`]
);
if (runningExecs.length > 0) {
console.log(`[Scheduler] Skipping "${schedule.name}" - previous execution still running`);
continue;
}
console.log(`[Scheduler] Running scheduled command: ${schedule.name}`);
const workerIds = JSON.parse(schedule.worker_ids);
// Handle both string (raw SQL) and object (auto-parsed by MySQL2 JSON column)
const workerIds = typeof schedule.worker_ids === 'string'
? JSON.parse(schedule.worker_ids)
: schedule.worker_ids;
// Execute command on each worker
for (const workerId of workerIds) {
const workerWs = workers.get(workerId);
if (workerWs && workerWs.readyState === WebSocket.OPEN) {
const executionId = generateUUID();
const executionId = crypto.randomUUID();
// Create execution record
await pool.query(
@@ -185,7 +210,13 @@ async function processScheduledCommands() {
}
// Update last_run and calculate next_run
const nextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value);
let nextRun;
try {
nextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value);
} catch (err) {
console.error(`[Scheduler] Invalid schedule config for "${schedule.name}": ${err.message}`);
continue;
}
await pool.query(
'UPDATE scheduled_commands SET last_run = NOW(), next_run = ? WHERE id = ?',
[nextRun, schedule.id]
@@ -202,10 +233,12 @@ function calculateNextRun(scheduleType, scheduleValue) {
if (scheduleType === 'interval') {
// Interval in minutes
const minutes = parseInt(scheduleValue);
if (isNaN(minutes) || minutes <= 0) throw new Error(`Invalid interval value: ${scheduleValue}`);
return new Date(now.getTime() + minutes * 60000);
} else if (scheduleType === 'daily') {
// Daily at HH:MM
const [hours, minutes] = scheduleValue.split(':').map(Number);
if (isNaN(hours) || isNaN(minutes)) throw new Error(`Invalid daily time format: ${scheduleValue}`);
const next = new Date(now);
next.setHours(hours, minutes, 0, 0);
@@ -217,10 +250,11 @@ function calculateNextRun(scheduleType, scheduleValue) {
} else if (scheduleType === 'hourly') {
// Every N hours
const hours = parseInt(scheduleValue);
if (isNaN(hours) || hours <= 0) throw new Error(`Invalid hourly value: ${scheduleValue}`);
return new Date(now.getTime() + hours * 3600000);
}
return null;
throw new Error(`Unknown schedule type: ${scheduleType}`);
}
// Run scheduler every minute
@@ -229,11 +263,13 @@ setInterval(processScheduledCommands, 60 * 1000);
setTimeout(processScheduledCommands, 5000);
// WebSocket connections
const clients = new Set();
const browserClients = new Set(); // Browser UI connections
const workerClients = new Set(); // Worker agent connections
const workers = new Map(); // Map worker_id -> WebSocket connection
wss.on('connection', (ws) => {
clients.add(ws);
// Default to browser client until worker_connect identifies it as a worker
browserClients.add(ws);
// Handle incoming messages from workers
ws.on('message', async (data) => {
@@ -269,7 +305,15 @@ wss.on('connection', (ws) => {
await updateExecutionStatus(execution_id, finalStatus);
}
// Broadcast result to all connected clients
// Resolve any pending event-driven command promise (eliminates DB polling)
if (command_id) {
const resolver = _commandResolvers.get(command_id);
if (resolver) {
resolver({ success, stdout, stderr });
}
}
// Broadcast result to browser clients only
broadcast({
type: 'command_result',
execution_id: execution_id,
@@ -314,8 +358,16 @@ wss.on('connection', (ws) => {
}
if (message.type === 'worker_connect') {
// Handle worker connection
const { worker_id, worker_name } = message;
// Authenticate worker — reject if api_key is provided but wrong
const { worker_id, worker_name, api_key } = message;
if (api_key && api_key !== process.env.WORKER_API_KEY) {
console.warn(`[Security] Worker connection rejected: invalid API key from "${worker_name}"`);
ws.close(4001, 'Unauthorized');
return;
}
// Move from browser set to worker set
browserClients.delete(ws);
workerClients.add(ws);
console.log(`Worker connected: ${worker_name} (${worker_id})`);
// Find the database worker ID by name
@@ -369,7 +421,8 @@ wss.on('connection', (ws) => {
});
ws.on('close', () => {
clients.delete(ws);
browserClients.delete(ws);
workerClients.delete(ws);
// Remove worker from workers map when disconnected (both runtime and db IDs)
if (ws.workerId) {
workers.delete(ws.workerId);
@@ -382,29 +435,22 @@ wss.on('connection', (ws) => {
});
});
// Broadcast to all connected clients
// Broadcast to browser clients only (NOT worker agents)
// Broadcast a message to browser UI clients only (NOT worker agents).
// Workers are tracked in workerClients/workers and must not receive UI
// update traffic. Clients whose socket is not OPEN are silently skipped.
// NOTE: the diff rendering had dropped +/- markers, leaving both the old
// `clients.forEach` and new `browserClients.forEach` lines in this span;
// this is the post-commit implementation.
function broadcast(data) {
  // Serialize once instead of once per client — payload is loop-invariant.
  const payload = JSON.stringify(data);
  browserClients.forEach(client => {
    if (client.readyState === WebSocket.OPEN) {
      client.send(payload);
    }
  });
}
// Helper function to add log entry to execution
// Helper function to add log entry to execution (atomic — no read-modify-write race condition)
// Append a single log entry to an execution's JSON `logs` column.
// Uses JSON_ARRAY_APPEND so the append happens atomically in the database —
// no read-modify-write race when concurrent steps log to the same execution.
// COALESCE seeds an empty array when `logs` is still NULL.
// Errors are logged and swallowed deliberately: a failed log write must not
// abort the workflow step that produced the entry.
// NOTE: the rendered diff span mixed the removed polling read-modify-write
// code with the new atomic statement; this is the post-commit implementation.
async function addExecutionLog(executionId, logEntry) {
  try {
    await pool.query(
      "UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', CAST(? AS JSON)) WHERE id = ?",
      [JSON.stringify(logEntry), executionId]
    );
  } catch (error) {
    console.error(`[Workflow] Error adding execution log:`, error);
  }
}
@@ -459,7 +505,7 @@ async function authenticateSSO(req, res, next) {
// Store/update user in database
try {
const userId = generateUUID();
const userId = crypto.randomUUID();
await pool.query(
`INSERT INTO users (id, username, display_name, email, groups, last_login)
VALUES (?, ?, ?, ?, ?, NOW())
@@ -512,10 +558,11 @@ function applyParams(command, params) {
}
// Evaluate a condition string against execution state and params.
// Uses vm.runInNewContext with a timeout to avoid arbitrary code execution risk.
// Evaluate a workflow step condition string (e.g. "promptResponse === 'Yes'")
// against execution state and params. Runs in vm.runInNewContext with a
// 100ms timeout instead of new Function() to limit arbitrary-code risk and
// runaway expressions.
// NOTE(review): the vm module is not a hard security boundary against hostile
// input — conditions should still come only from trusted workflow authors.
// Returns a strict boolean; any evaluation/syntax error yields false.
// NOTE: the rendered diff span contained both the removed new Function() line
// and the vm replacement; this is the post-commit implementation.
function evalCondition(condition, state, params) {
  try {
    const context = vm.createContext({ state, params, promptResponse: state.promptResponse });
    return !!vm.runInNewContext(condition, context, { timeout: 100 });
  } catch (e) {
    return false;
  }
}
@@ -527,6 +574,7 @@ const _executionState = new Map(); // executionId → { params, state }
// Pending prompt resolvers — set when a prompt step is waiting for user input.
const _executionPrompts = new Map(); // executionId → resolve fn
const _commandResolvers = new Map(); // commandId → resolve fn (event-driven result delivery)
async function executeWorkflowSteps(executionId, workflowId, definition, username, params = {}) {
_executionState.set(executionId, { params, state: {} });
@@ -724,7 +772,7 @@ async function executeCommandStep(executionId, step, stepNumber, params = {}) {
}
// Send command to worker
const commandId = generateUUID();
const commandId = crypto.randomUUID();
await addExecutionLog(executionId, {
step: stepNumber,
@@ -769,52 +817,21 @@ async function executeCommandStep(executionId, step, stepNumber, params = {}) {
}
}
// Wait for a command result using event-driven promise resolution.
// The resolver is stored in _commandResolvers and called immediately when
// command_result arrives via WebSocket — no DB polling required.
// Wait for a worker's command result using event-driven promise resolution.
// A resolver is registered in _commandResolvers keyed by commandId; the
// WebSocket 'command_result' handler invokes it the moment the result arrives
// — no DB polling. On timeout the resolver is discarded and a
// { success: false, error: 'Command timeout' } result is returned instead.
// Always resolves (never rejects), so callers can `await` without try/catch.
// `executionId` is unused by the event-driven path but kept for interface
// compatibility with existing call sites.
// NOTE: the rendered diff span interleaved the removed 500ms polling loop
// with the new resolver code; this is the post-commit implementation.
async function waitForCommandResult(executionId, commandId, timeout) {
  return new Promise((resolve) => {
    const timer = setTimeout(() => {
      _commandResolvers.delete(commandId);
      resolve({ success: false, error: 'Command timeout' });
    }, timeout);
    _commandResolvers.set(commandId, (result) => {
      clearTimeout(timer);
      _commandResolvers.delete(commandId);
      resolve(result);
    });
  });
}
@@ -846,7 +863,7 @@ app.get('/api/workflows/:id', authenticateSSO, async (req, res) => {
app.post('/api/workflows', authenticateSSO, async (req, res) => {
try {
const { name, description, definition } = req.body;
const id = generateUUID();
const id = crypto.randomUUID();
console.log('[Workflow] Creating workflow:', name);
console.log('[Workflow] Definition:', JSON.stringify(definition, null, 2));
@@ -886,7 +903,7 @@ app.delete('/api/workflows/:id', authenticateSSO, async (req, res) => {
app.get('/api/workers', authenticateSSO, async (req, res) => {
try {
const [rows] = await pool.query('SELECT * FROM workers ORDER BY name');
const [rows] = await pool.query('SELECT id, name, status, last_heartbeat, metadata FROM workers ORDER BY name');
res.json(rows);
} catch (error) {
res.status(500).json({ error: error.message });
@@ -923,7 +940,7 @@ app.post('/api/workers/heartbeat', async (req, res) => {
app.post('/api/executions', authenticateSSO, async (req, res) => {
try {
const { workflow_id, params = {} } = req.body;
const id = generateUUID();
const id = crypto.randomUUID();
// Get workflow definition
const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflow_id]);
@@ -997,6 +1014,7 @@ app.get('/api/executions', authenticateSSO, async (req, res) => {
app.delete('/api/executions/:id', authenticateSSO, async (req, res) => {
try {
if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin access required' });
await pool.query('DELETE FROM executions WHERE id = ?', [req.params.id]);
broadcast({ type: 'execution_deleted', execution_id: req.params.id });
res.json({ success: true });
@@ -1115,13 +1133,14 @@ app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => {
app.post('/api/scheduled-commands', authenticateSSO, async (req, res) => {
try {
if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin access required' });
const { name, command, worker_ids, schedule_type, schedule_value } = req.body;
if (!name || !command || !worker_ids || !schedule_type || !schedule_value) {
return res.status(400).json({ error: 'Missing required fields' });
}
const id = generateUUID();
const id = crypto.randomUUID();
const nextRun = calculateNextRun(schedule_type, schedule_value);
await pool.query(
@@ -1139,6 +1158,7 @@ app.post('/api/scheduled-commands', authenticateSSO, async (req, res) => {
app.put('/api/scheduled-commands/:id/toggle', authenticateSSO, async (req, res) => {
try {
if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin access required' });
const { id } = req.params;
const [current] = await pool.query('SELECT enabled FROM scheduled_commands WHERE id = ?', [id]);
@@ -1157,6 +1177,7 @@ app.put('/api/scheduled-commands/:id/toggle', authenticateSSO, async (req, res)
app.delete('/api/scheduled-commands/:id', authenticateSSO, async (req, res) => {
try {
if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin access required' });
const { id } = req.params;
await pool.query('DELETE FROM scheduled_commands WHERE id = ?', [id]);
res.json({ success: true });
@@ -1178,7 +1199,7 @@ app.post('/api/internal/command', authenticateGandalf, async (req, res) => {
return res.status(400).json({ error: 'Worker not connected' });
}
const executionId = generateUUID();
const executionId = crypto.randomUUID();
await pool.query(
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
@@ -1241,26 +1262,6 @@ app.get('/health', async (req, res) => {
}
});
// Start server
// NOTE(review): duplicate of the startup block at the end of this file — in
// the commit this copy is the one being REMOVED. It ran before the routes
// defined below it were registered, which is the "misplaced routes" bug the
// commit message describes.
const PORT = process.env.PORT || 8080;
const HOST = process.env.HOST || '0.0.0.0';
// Initialize the database schema first; only begin listening once it succeeds.
initDatabase().then(() => {
server.listen(PORT, HOST, () => {
console.log(`PULSE Server running on http://${HOST}:${PORT}`);
console.log(`Connected to MariaDB at ${process.env.DB_HOST}`);
console.log(`Authentication: Authelia SSO`);
console.log(`Worker API Key configured: ${process.env.WORKER_API_KEY ? 'Yes' : 'No'}`);
});
}).catch(err => {
// Fatal: the server cannot run without a working database connection.
console.error('Failed to start server:', err);
process.exit(1);
});
// ============================================
// WORKFLOW EXECUTION ENGINE
// Get execution details with logs
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
try {
@@ -1270,7 +1271,7 @@ app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
}
const execution = rows[0];
const parsedLogs = JSON.parse(execution.logs || '[]');
const parsedLogs = typeof execution.logs === 'string' ? JSON.parse(execution.logs || '[]') : (execution.logs || []);
const waitingForInput = _executionPrompts.has(req.params.id);
let pendingPrompt = null;
if (waitingForInput) {
@@ -1299,7 +1300,7 @@ app.delete('/api/workers/:id', authenticateSSO, async (req, res) => {
if (!req.user.isAdmin) {
return res.status(403).json({ error: 'Admin access required' });
}
await pool.query('DELETE FROM workers WHERE id = ?', [req.params.id]);
res.json({ success: true });
broadcast({ type: 'worker_deleted', worker_id: req.params.id });
@@ -1308,11 +1309,11 @@ app.delete('/api/workers/:id', authenticateSSO, async (req, res) => {
}
});
// Send direct command to specific worker (for testing)
// Send direct command to specific worker
app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
try {
const { command } = req.body;
const executionId = generateUUID();
const executionId = crypto.randomUUID();
const workerId = req.params.id;
// Create execution record in database
@@ -1351,71 +1352,18 @@ app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
}
});
// ============================================
// EXAMPLE WORKFLOW DEFINITIONS
// ============================================
// Start server
const PORT = process.env.PORT || 8080;
const HOST = process.env.HOST || '0.0.0.0';
// Example 1: Simple command execution
// Dead example constant (never referenced at runtime; removed by this commit).
// Demonstrates a three-step workflow: refresh package lists on all targets,
// prompt the operator for approval, then run the upgrade step only when the
// prompt answer satisfies the condition expression.
const simpleWorkflow = {
name: "Update System Packages",
description: "Update all packages on target servers",
steps: [
{
// Step 1: unconditional command on every worker
name: "Update package list",
type: "execute",
targets: ["all"],
command: "apt update"
},
{
// Step 2: pauses execution until the operator picks an option
name: "User Approval",
type: "prompt",
message: "Packages updated. Proceed with upgrade?",
options: ["Yes", "No"]
},
{
// Step 3: gated on the prompt answer via the condition string
name: "Upgrade packages",
type: "execute",
targets: ["all"],
command: "apt upgrade -y",
condition: "promptResponse === 'Yes'"
}
]
};
// Example 2: Complex workflow with conditions
// Dead example constant (never referenced at runtime; removed by this commit).
// Demonstrates a longer workflow mixing execute, wait, and prompt step types,
// with a conditional cleanup step gated on the prompt answer.
const backupWorkflow = {
name: "Backup and Verify",
description: "Create backup and verify integrity",
steps: [
{
// Step 1: create a dated tarball of the worker install directory
name: "Create backup",
type: "execute",
targets: ["all"],
command: "tar -czf /tmp/backup-$(date +%Y%m%d).tar.gz /opt/pulse-worker"
},
{
// Step 2: fixed 5-second pause (duration is in milliseconds)
name: "Wait for backup",
type: "wait",
duration: 5000
},
{
// Step 3: list the archive to verify it is readable
name: "Verify backup",
type: "execute",
targets: ["all"],
command: "tar -tzf /tmp/backup-*.tar.gz > /dev/null && echo 'Backup OK' || echo 'Backup FAILED'"
},
{
// Step 4: ask the operator whether to prune old archives
name: "Cleanup decision",
type: "prompt",
message: "Backup complete. Delete old backups?",
options: ["Yes", "No", "Cancel"]
},
{
// Step 5: delete week-old backups, only on an explicit "Yes"
name: "Cleanup old backups",
type: "execute",
targets: ["all"],
command: "find /tmp -name 'backup-*.tar.gz' -mtime +7 -delete",
condition: "promptResponse === 'Yes'"
}
]
};
// Initialize the database schema, then start the HTTP/WebSocket server.
// Placed after all route registrations; any startup failure is fatal.
(async () => {
  try {
    await initDatabase();
    server.listen(PORT, HOST, () => {
      console.log(`PULSE Server running on http://${HOST}:${PORT}`);
      console.log(`Connected to MariaDB at ${process.env.DB_HOST}`);
      console.log(`Authentication: Authelia SSO`);
      console.log(`Worker API Key configured: ${process.env.WORKER_API_KEY ? 'Yes' : 'No'}`);
    });
  } catch (err) {
    console.error('Failed to start server:', err);
    process.exit(1);
  }
})();