Fix worker command execution and execution status updates
Changes: - Removed duplicate /api/executions/:id endpoint that didn't parse logs - Added workers Map to track worker_id -> WebSocket connection - Store worker connections when they send worker_connect message - Send commands to specific worker instead of broadcasting to all clients - Clean up workers Map when worker disconnects - Update execution status to completed/failed when command results arrive - Add proper error handling when worker is not connected Fixes: - execution.logs.forEach is not a function (logs now properly parsed) - Commands stuck in "running" status (now update to completed/failed) - Commands not reaching workers (now sent to specific worker WebSocket) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
49
server.js
49
server.js
@@ -100,6 +100,8 @@ async function initDatabase() {
|
|||||||
|
|
||||||
// WebSocket connections
|
// WebSocket connections
|
||||||
const clients = new Set();
|
const clients = new Set();
|
||||||
|
const workers = new Map(); // Map worker_id -> WebSocket connection
|
||||||
|
|
||||||
wss.on('connection', (ws) => {
|
wss.on('connection', (ws) => {
|
||||||
clients.add(ws);
|
clients.add(ws);
|
||||||
|
|
||||||
@@ -125,6 +127,10 @@ wss.on('connection', (ws) => {
|
|||||||
timestamp: timestamp || new Date().toISOString()
|
timestamp: timestamp || new Date().toISOString()
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Update execution status to completed or failed
|
||||||
|
const finalStatus = success ? 'completed' : 'failed';
|
||||||
|
await updateExecutionStatus(execution_id, finalStatus);
|
||||||
|
|
||||||
// Broadcast result to all connected clients
|
// Broadcast result to all connected clients
|
||||||
broadcast({
|
broadcast({
|
||||||
type: 'command_result',
|
type: 'command_result',
|
||||||
@@ -135,7 +141,7 @@ wss.on('connection', (ws) => {
|
|||||||
stderr: stderr
|
stderr: stderr
|
||||||
});
|
});
|
||||||
|
|
||||||
console.log(`Command result received for execution ${execution_id}`);
|
console.log(`Command result received for execution ${execution_id}: ${finalStatus}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (message.type === 'workflow_result') {
|
if (message.type === 'workflow_result') {
|
||||||
@@ -173,6 +179,9 @@ wss.on('connection', (ws) => {
|
|||||||
const { worker_id, worker_name } = message;
|
const { worker_id, worker_name } = message;
|
||||||
console.log(`Worker connected: ${worker_name} (${worker_id})`);
|
console.log(`Worker connected: ${worker_name} (${worker_id})`);
|
||||||
|
|
||||||
|
// Store worker WebSocket connection
|
||||||
|
workers.set(worker_id, ws);
|
||||||
|
|
||||||
// Update worker status to online
|
// Update worker status to online
|
||||||
await pool.query(
|
await pool.query(
|
||||||
`UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`,
|
`UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`,
|
||||||
@@ -201,7 +210,17 @@ wss.on('connection', (ws) => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
ws.on('close', () => clients.delete(ws));
|
ws.on('close', () => {
|
||||||
|
clients.delete(ws);
|
||||||
|
// Remove worker from workers map when disconnected
|
||||||
|
for (const [workerId, workerWs] of workers.entries()) {
|
||||||
|
if (workerWs === ws) {
|
||||||
|
workers.delete(workerId);
|
||||||
|
console.log(`Worker ${workerId} disconnected`);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
// Broadcast to all connected clients
|
// Broadcast to all connected clients
|
||||||
@@ -379,18 +398,6 @@ app.get('/api/executions', authenticateSSO, async (req, res) => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
|
|
||||||
try {
|
|
||||||
const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]);
|
|
||||||
if (rows.length === 0) {
|
|
||||||
return res.status(404).json({ error: 'Not found' });
|
|
||||||
}
|
|
||||||
res.json(rows[0]);
|
|
||||||
} catch (error) {
|
|
||||||
res.status(500).json({ error: error.message });
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Health check (no auth required)
|
// Health check (no auth required)
|
||||||
app.get('/health', async (req, res) => {
|
app.get('/health', async (req, res) => {
|
||||||
try {
|
try {
|
||||||
@@ -780,7 +787,7 @@ app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
|
|||||||
}])]
|
}])]
|
||||||
);
|
);
|
||||||
|
|
||||||
// Send command via WebSocket
|
// Send command via WebSocket to specific worker
|
||||||
const commandMessage = {
|
const commandMessage = {
|
||||||
type: 'execute_command',
|
type: 'execute_command',
|
||||||
execution_id: executionId,
|
execution_id: executionId,
|
||||||
@@ -789,11 +796,13 @@ app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
|
|||||||
timeout: 60000
|
timeout: 60000
|
||||||
};
|
};
|
||||||
|
|
||||||
clients.forEach(client => {
|
const workerWs = workers.get(workerId);
|
||||||
if (client.readyState === WebSocket.OPEN) {
|
if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
|
||||||
client.send(JSON.stringify(commandMessage));
|
return res.status(400).json({ error: 'Worker not connected' });
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
workerWs.send(JSON.stringify(commandMessage));
|
||||||
|
console.log(`Command sent to worker ${workerId}: ${command}`);
|
||||||
|
|
||||||
broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
|
broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
|
||||||
res.json({ success: true, execution_id: executionId });
|
res.json({ success: true, execution_id: executionId });
|
||||||
|
|||||||
Reference in New Issue
Block a user