From 05c304f2ed153206f1013abaf651c6c335bd92c6 Mon Sep 17 00:00:00 2001 From: Jared Vititoe Date: Wed, 7 Jan 2026 20:20:18 -0500 Subject: [PATCH] Updated websocket handler --- public/index.html | 67 ++++++++++++++++++++++++++++++-- server.js | 99 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 162 insertions(+), 4 deletions(-) diff --git a/public/index.html b/public/index.html index 8eabec5..3144c26 100644 --- a/public/index.html +++ b/public/index.html @@ -892,7 +892,9 @@ } document.getElementById('executionDetails').innerHTML = html; - document.getElementById('viewExecutionModal').classList.add('show'); + const modal = document.getElementById('viewExecutionModal'); + modal.dataset.executionId = executionId; + modal.classList.add('show'); } catch (error) { console.error('Error viewing execution:', error); alert('Error loading execution details'); @@ -1060,13 +1062,70 @@ function connectWebSocket() { const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; ws = new WebSocket(`${protocol}//${window.location.host}`); - + ws.onmessage = (event) => { const data = JSON.parse(event.data); console.log('WebSocket message:', data); - refreshData(); + + // Handle specific message types + if (data.type === 'command_result') { + // Display command result in real-time + console.log(`Command result received for execution ${data.execution_id}`); + console.log(`Success: ${data.success}`); + console.log(`Output: ${data.stdout}`); + if (data.stderr) { + console.log(`Error: ${data.stderr}`); + } + + // If viewing execution details, refresh that specific execution + const executionModal = document.getElementById('viewExecutionModal'); + if (executionModal && executionModal.classList.contains('show')) { + // Reload execution details to show new logs + const executionId = executionModal.dataset.executionId; + if (executionId === data.execution_id) { + viewExecution(executionId); + } + } + + // Refresh execution list to show updated status + loadExecutions(); + } + + if (data.type === 'workflow_result') { + console.log(`Workflow ${data.status} for execution ${data.execution_id}`); + + // Refresh execution list + loadExecutions(); + + // If viewing this execution, refresh details + const executionModal = document.getElementById('viewExecutionModal'); + if (executionModal && executionModal.classList.contains('show')) { + const executionId = executionModal.dataset.executionId; + if (executionId === data.execution_id) { + viewExecution(executionId); + } + } + } + + if (data.type === 'worker_update') { + console.log(`Worker ${data.worker_id} status: ${data.status}`); + loadWorkers(); + } + + if (data.type === 'execution_started' || data.type === 'execution_status') { + loadExecutions(); + } + + if (data.type === 'workflow_created' || data.type === 'workflow_deleted') { + loadWorkflows(); + } + + // Generic refresh for other message types + if (!['command_result', 'workflow_result', 'worker_update', 'execution_started', 'execution_status', 'workflow_created', 'workflow_deleted'].includes(data.type)) { + refreshData(); + } }; - + ws.onclose = () => { console.log('WebSocket closed, reconnecting...'); setTimeout(connectWebSocket, 5000); diff --git a/server.js b/server.js index 511738a..0a07443 100644 --- a/server.js +++ b/server.js @@ -101,6 +101,105 @@ async function initDatabase() { const clients = new Set(); wss.on('connection', (ws) => { clients.add(ws); + + // Handle incoming messages from workers + ws.on('message', async (data) => { + try { + const message = JSON.parse(data.toString()); + console.log('WebSocket message received:', message.type); + + if (message.type === 'command_result') { + // Handle command result from worker + const { execution_id, worker_id, success, stdout, stderr, duration, timestamp } = message; + + // Add result to execution logs + await addExecutionLog(execution_id, { + step: 'command_execution', + action: 'command_result', + worker_id: worker_id, + success: success, + stdout: stdout, + stderr: stderr, + duration: duration, + timestamp: timestamp || new Date().toISOString() + }); + + // Broadcast result to all connected clients + broadcast({ + type: 'command_result', + execution_id: execution_id, + worker_id: worker_id, + success: success, + stdout: stdout, + stderr: stderr + }); + + console.log(`Command result received for execution ${execution_id}`); + } + + if (message.type === 'workflow_result') { + // Handle workflow result from worker + const { execution_id, worker_id, success, message: resultMessage, timestamp } = message; + + // Add final result to logs + await addExecutionLog(execution_id, { + step: 'workflow_completion', + action: 'workflow_result', + worker_id: worker_id, + success: success, + message: resultMessage, + timestamp: timestamp || new Date().toISOString() + }); + + // Update execution status + const finalStatus = success ? 'completed' : 'failed'; + await updateExecutionStatus(execution_id, finalStatus); + + // Broadcast completion to all clients + broadcast({ + type: 'workflow_result', + execution_id: execution_id, + status: finalStatus, + success: success, + message: resultMessage + }); + + console.log(`Workflow result received for execution ${execution_id}: ${finalStatus}`); + } + + if (message.type === 'worker_connect') { + // Handle worker connection + const { worker_id, worker_name } = message; + console.log(`Worker connected: ${worker_name} (${worker_id})`); + + // Update worker status to online + await pool.query( + `UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`, + [worker_id] + ); + + // Broadcast worker status update + broadcast({ + type: 'worker_update', + worker_id: worker_id, + status: 'online' + }); + } + + if (message.type === 'pong') { + // Handle worker pong response + const { worker_id } = message; + await pool.query( + `UPDATE workers SET last_heartbeat=NOW() WHERE id=?`, + [worker_id] + ); + } + + } catch (error) { + console.error('WebSocket message error:', error); + } + }); + ws.on('close', () => clients.delete(ws)); });