From 8bda9672d6d81b8463e40a7d688694799b5e23fd Mon Sep 17 00:00:00 2001 From: Jared Vititoe Date: Wed, 7 Jan 2026 22:23:02 -0500 Subject: [PATCH] Fix worker command execution and execution status updates Changes: - Removed duplicate /api/executions/:id endpoint that didn't parse logs - Added workers Map to track worker_id -> WebSocket connection - Store worker connections when they send worker_connect message - Send commands to specific worker instead of broadcasting to all clients - Clean up workers Map when worker disconnects - Update execution status to completed/failed when command results arrive - Add proper error handling when worker is not connected Fixes: - execution.logs.forEach is not a function (logs now properly parsed) - Commands stuck in "running" status (now update to completed/failed) - Commands not reaching workers (now sent to specific worker WebSocket) Co-Authored-By: Claude Sonnet 4.5 --- server.js | 49 +++++++++++++++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/server.js b/server.js index f247418..be39cc2 100644 --- a/server.js +++ b/server.js @@ -100,6 +100,8 @@ async function initDatabase() { // WebSocket connections const clients = new Set(); +const workers = new Map(); // Map worker_id -> WebSocket connection + wss.on('connection', (ws) => { clients.add(ws); @@ -125,6 +127,10 @@ wss.on('connection', (ws) => { timestamp: timestamp || new Date().toISOString() }); + // Update execution status to completed or failed + const finalStatus = success ? 'completed' : 'failed'; + await updateExecutionStatus(execution_id, finalStatus); + // Broadcast result to all connected clients broadcast({ type: 'command_result', @@ -135,7 +141,7 @@ wss.on('connection', (ws) => { stderr: stderr }); - console.log(`Command result received for execution ${execution_id}`); + console.log(`Command result received for execution ${execution_id}: ${finalStatus}`); } if (message.type === 'workflow_result') { @@ -173,6 +179,9 @@ wss.on('connection', (ws) => { const { worker_id, worker_name } = message; console.log(`Worker connected: ${worker_name} (${worker_id})`); + // Store worker WebSocket connection + workers.set(worker_id, ws); + // Update worker status to online await pool.query( `UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`, @@ -201,7 +210,17 @@ wss.on('connection', (ws) => { } }); - ws.on('close', () => clients.delete(ws)); + ws.on('close', () => { + clients.delete(ws); + // Remove worker from workers map when disconnected + for (const [workerId, workerWs] of workers.entries()) { + if (workerWs === ws) { + workers.delete(workerId); + console.log(`Worker ${workerId} disconnected`); + break; + } + } + }); }); // Broadcast to all connected clients @@ -379,18 +398,6 @@ app.get('/api/executions', authenticateSSO, async (req, res) => { } }); -app.get('/api/executions/:id', authenticateSSO, async (req, res) => { - try { - const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]); - if (rows.length === 0) { - return res.status(404).json({ error: 'Not found' }); - } - res.json(rows[0]); - } catch (error) { - res.status(500).json({ error: error.message }); - } -}); - // Health check (no auth required) app.get('/health', async (req, res) => { try { @@ -780,7 +787,7 @@ app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => { }])] ); - // Send command via WebSocket + // Send command via WebSocket to specific worker const commandMessage = { type: 'execute_command', execution_id: executionId, @@ -789,11 +796,13 @@ app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => { timeout: 60000 }; - clients.forEach(client => { - if (client.readyState === WebSocket.OPEN) { - client.send(JSON.stringify(commandMessage)); - } - }); + const workerWs = workers.get(workerId); + if (!workerWs || workerWs.readyState !== WebSocket.OPEN) { + return res.status(400).json({ error: 'Worker not connected' }); + } + + workerWs.send(JSON.stringify(commandMessage)); + console.log(`Command sent to worker ${workerId}: ${command}`); broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null }); res.json({ success: true, execution_id: executionId });