Updated websocket handler

This commit is contained in:
2026-01-07 20:20:18 -05:00
parent a1cf8ac90b
commit 2767087e27
2 changed files with 162 additions and 4 deletions

View File

@@ -892,7 +892,9 @@
} }
document.getElementById('executionDetails').innerHTML = html; document.getElementById('executionDetails').innerHTML = html;
document.getElementById('viewExecutionModal').classList.add('show'); const modal = document.getElementById('viewExecutionModal');
modal.dataset.executionId = executionId;
modal.classList.add('show');
} catch (error) { } catch (error) {
console.error('Error viewing execution:', error); console.error('Error viewing execution:', error);
alert('Error loading execution details'); alert('Error loading execution details');
@@ -1060,13 +1062,70 @@
function connectWebSocket() { function connectWebSocket() {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
ws = new WebSocket(`${protocol}//${window.location.host}`); ws = new WebSocket(`${protocol}//${window.location.host}`);
ws.onmessage = (event) => { ws.onmessage = (event) => {
const data = JSON.parse(event.data); const data = JSON.parse(event.data);
console.log('WebSocket message:', data); console.log('WebSocket message:', data);
refreshData();
// Handle specific message types
if (data.type === 'command_result') {
// Display command result in real-time
console.log(`Command result received for execution ${data.execution_id}`);
console.log(`Success: ${data.success}`);
console.log(`Output: ${data.stdout}`);
if (data.stderr) {
console.log(`Error: ${data.stderr}`);
}
// If viewing execution details, refresh that specific execution
const executionModal = document.getElementById('viewExecutionModal');
if (executionModal && executionModal.classList.contains('show')) {
// Reload execution details to show new logs
const executionId = executionModal.dataset.executionId;
if (executionId === data.execution_id) {
viewExecution(executionId);
}
}
// Refresh execution list to show updated status
loadExecutions();
}
if (data.type === 'workflow_result') {
console.log(`Workflow ${data.status} for execution ${data.execution_id}`);
// Refresh execution list
loadExecutions();
// If viewing this execution, refresh details
const executionModal = document.getElementById('viewExecutionModal');
if (executionModal && executionModal.classList.contains('show')) {
const executionId = executionModal.dataset.executionId;
if (executionId === data.execution_id) {
viewExecution(executionId);
}
}
}
if (data.type === 'worker_update') {
console.log(`Worker ${data.worker_id} status: ${data.status}`);
loadWorkers();
}
if (data.type === 'execution_started' || data.type === 'execution_status') {
loadExecutions();
}
if (data.type === 'workflow_created' || data.type === 'workflow_deleted') {
loadWorkflows();
}
// Generic refresh for other message types
if (!['command_result', 'workflow_result', 'worker_update', 'execution_started', 'execution_status', 'workflow_created', 'workflow_deleted'].includes(data.type)) {
refreshData();
}
}; };
ws.onclose = () => { ws.onclose = () => {
console.log('WebSocket closed, reconnecting...'); console.log('WebSocket closed, reconnecting...');
setTimeout(connectWebSocket, 5000); setTimeout(connectWebSocket, 5000);

View File

@@ -101,6 +101,105 @@ async function initDatabase() {
const clients = new Set(); const clients = new Set();
wss.on('connection', (ws) => { wss.on('connection', (ws) => {
clients.add(ws); clients.add(ws);
// Handle incoming messages from workers
ws.on('message', async (data) => {
try {
const message = JSON.parse(data.toString());
console.log('WebSocket message received:', message.type);
if (message.type === 'command_result') {
// Handle command result from worker
const { execution_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
// Add result to execution logs
await addExecutionLog(execution_id, {
step: 'command_execution',
action: 'command_result',
worker_id: worker_id,
success: success,
stdout: stdout,
stderr: stderr,
duration: duration,
timestamp: timestamp || new Date().toISOString()
});
// Broadcast result to all connected clients
broadcast({
type: 'command_result',
execution_id: execution_id,
worker_id: worker_id,
success: success,
stdout: stdout,
stderr: stderr
});
console.log(`Command result received for execution ${execution_id}`);
}
if (message.type === 'workflow_result') {
// Handle workflow result from worker
const { execution_id, worker_id, success, message: resultMessage, timestamp } = message;
// Add final result to logs
await addExecutionLog(execution_id, {
step: 'workflow_completion',
action: 'workflow_result',
worker_id: worker_id,
success: success,
message: resultMessage,
timestamp: timestamp || new Date().toISOString()
});
// Update execution status
const finalStatus = success ? 'completed' : 'failed';
await updateExecutionStatus(execution_id, finalStatus);
// Broadcast completion to all clients
broadcast({
type: 'workflow_result',
execution_id: execution_id,
status: finalStatus,
success: success,
message: resultMessage
});
console.log(`Workflow result received for execution ${execution_id}: ${finalStatus}`);
}
if (message.type === 'worker_connect') {
// Handle worker connection
const { worker_id, worker_name } = message;
console.log(`Worker connected: ${worker_name} (${worker_id})`);
// Update worker status to online
await pool.query(
`UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`,
[worker_id]
);
// Broadcast worker status update
broadcast({
type: 'worker_update',
worker_id: worker_id,
status: 'online'
});
}
if (message.type === 'pong') {
// Handle worker pong response
const { worker_id } = message;
await pool.query(
`UPDATE workers SET last_heartbeat=NOW() WHERE id=?`,
[worker_id]
);
}
} catch (error) {
console.error('WebSocket message error:', error);
}
});
ws.on('close', () => clients.delete(ws)); ws.on('close', () => clients.delete(ws));
}); });