Updated websocket handler

This commit is contained in:
2026-01-07 20:20:18 -05:00
parent 20cff59cee
commit 05c304f2ed
2 changed files with 162 additions and 4 deletions

View File

@@ -101,6 +101,105 @@ async function initDatabase() {
const clients = new Set();
wss.on('connection', (ws) => {
clients.add(ws);
// Handle incoming messages from workers
ws.on('message', async (data) => {
try {
const message = JSON.parse(data.toString());
console.log('WebSocket message received:', message.type);
if (message.type === 'command_result') {
// Handle command result from worker
const { execution_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
// Add result to execution logs
await addExecutionLog(execution_id, {
step: 'command_execution',
action: 'command_result',
worker_id: worker_id,
success: success,
stdout: stdout,
stderr: stderr,
duration: duration,
timestamp: timestamp || new Date().toISOString()
});
// Broadcast result to all connected clients
broadcast({
type: 'command_result',
execution_id: execution_id,
worker_id: worker_id,
success: success,
stdout: stdout,
stderr: stderr
});
console.log(`Command result received for execution ${execution_id}`);
}
if (message.type === 'workflow_result') {
// Handle workflow result from worker
const { execution_id, worker_id, success, message: resultMessage, timestamp } = message;
// Add final result to logs
await addExecutionLog(execution_id, {
step: 'workflow_completion',
action: 'workflow_result',
worker_id: worker_id,
success: success,
message: resultMessage,
timestamp: timestamp || new Date().toISOString()
});
// Update execution status
const finalStatus = success ? 'completed' : 'failed';
await updateExecutionStatus(execution_id, finalStatus);
// Broadcast completion to all clients
broadcast({
type: 'workflow_result',
execution_id: execution_id,
status: finalStatus,
success: success,
message: resultMessage
});
console.log(`Workflow result received for execution ${execution_id}: ${finalStatus}`);
}
if (message.type === 'worker_connect') {
// Handle worker connection
const { worker_id, worker_name } = message;
console.log(`Worker connected: ${worker_name} (${worker_id})`);
// Update worker status to online
await pool.query(
`UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`,
[worker_id]
);
// Broadcast worker status update
broadcast({
type: 'worker_update',
worker_id: worker_id,
status: 'online'
});
}
if (message.type === 'pong') {
// Handle worker pong response
const { worker_id } = message;
await pool.query(
`UPDATE workers SET last_heartbeat=NOW() WHERE id=?`,
[worker_id]
);
}
} catch (error) {
console.error('WebSocket message error:', error);
}
});
ws.on('close', () => clients.delete(ws));
});