Fix worker command execution and execution status updates

Changes:
- Removed duplicate /api/executions/:id endpoint that didn't parse logs
- Added workers Map to track worker_id -> WebSocket connection
- Store worker connections when they send worker_connect message
- Send commands to specific worker instead of broadcasting to all clients
- Clean up workers Map when worker disconnects
- Update execution status to completed/failed when command results arrive
- Add proper error handling when worker is not connected

Fixes:
- execution.logs.forEach is not a function (logs now properly parsed)
- Commands stuck in "running" status (now update to completed/failed)
- Commands not reaching workers (now sent to specific worker WebSocket)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-07 22:23:02 -05:00
parent 6d15e4d240
commit 2097b73404

View File

@@ -100,6 +100,8 @@ async function initDatabase() {
// WebSocket connections // WebSocket connections
const clients = new Set(); const clients = new Set();
const workers = new Map(); // Map worker_id -> WebSocket connection
wss.on('connection', (ws) => { wss.on('connection', (ws) => {
clients.add(ws); clients.add(ws);
@@ -125,6 +127,10 @@ wss.on('connection', (ws) => {
timestamp: timestamp || new Date().toISOString() timestamp: timestamp || new Date().toISOString()
}); });
// Update execution status to completed or failed
const finalStatus = success ? 'completed' : 'failed';
await updateExecutionStatus(execution_id, finalStatus);
// Broadcast result to all connected clients // Broadcast result to all connected clients
broadcast({ broadcast({
type: 'command_result', type: 'command_result',
@@ -135,7 +141,7 @@ wss.on('connection', (ws) => {
stderr: stderr stderr: stderr
}); });
console.log(`Command result received for execution ${execution_id}`); console.log(`Command result received for execution ${execution_id}: ${finalStatus}`);
} }
if (message.type === 'workflow_result') { if (message.type === 'workflow_result') {
@@ -173,6 +179,9 @@ wss.on('connection', (ws) => {
const { worker_id, worker_name } = message; const { worker_id, worker_name } = message;
console.log(`Worker connected: ${worker_name} (${worker_id})`); console.log(`Worker connected: ${worker_name} (${worker_id})`);
// Store worker WebSocket connection
workers.set(worker_id, ws);
// Update worker status to online // Update worker status to online
await pool.query( await pool.query(
`UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`, `UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`,
@@ -201,7 +210,17 @@ wss.on('connection', (ws) => {
} }
}); });
ws.on('close', () => clients.delete(ws)); ws.on('close', () => {
clients.delete(ws);
// Remove worker from workers map when disconnected
for (const [workerId, workerWs] of workers.entries()) {
if (workerWs === ws) {
workers.delete(workerId);
console.log(`Worker ${workerId} disconnected`);
break;
}
}
});
}); });
// Broadcast to all connected clients // Broadcast to all connected clients
@@ -379,18 +398,6 @@ app.get('/api/executions', authenticateSSO, async (req, res) => {
} }
}); });
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
try {
const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]);
if (rows.length === 0) {
return res.status(404).json({ error: 'Not found' });
}
res.json(rows[0]);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
// Health check (no auth required) // Health check (no auth required)
app.get('/health', async (req, res) => { app.get('/health', async (req, res) => {
try { try {
@@ -780,7 +787,7 @@ app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
}])] }])]
); );
// Send command via WebSocket // Send command via WebSocket to specific worker
const commandMessage = { const commandMessage = {
type: 'execute_command', type: 'execute_command',
execution_id: executionId, execution_id: executionId,
@@ -789,11 +796,13 @@ app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
timeout: 60000 timeout: 60000
}; };
clients.forEach(client => { const workerWs = workers.get(workerId);
if (client.readyState === WebSocket.OPEN) { if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
client.send(JSON.stringify(commandMessage)); return res.status(400).json({ error: 'Worker not connected' });
} }
});
workerWs.send(JSON.stringify(commandMessage));
console.log(`Command sent to worker ${workerId}: ${command}`);
broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null }); broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
res.json({ success: true, execution_id: executionId }); res.json({ success: true, execution_id: executionId });