diff --git a/public/index.html b/public/index.html
index 8eabec5..3144c26 100644
--- a/public/index.html
+++ b/public/index.html
@@ -892,7 +892,9 @@
}
document.getElementById('executionDetails').innerHTML = html;
- document.getElementById('viewExecutionModal').classList.add('show');
+ const modal = document.getElementById('viewExecutionModal');
+ modal.dataset.executionId = executionId;
+ modal.classList.add('show');
} catch (error) {
console.error('Error viewing execution:', error);
alert('Error loading execution details');
@@ -1060,13 +1062,70 @@
function connectWebSocket() {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
ws = new WebSocket(`${protocol}//${window.location.host}`);
-
+
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('WebSocket message:', data);
- refreshData();
+
+ // Handle specific message types
+ if (data.type === 'command_result') {
+ // Display command result in real-time
+ console.log(`Command result received for execution ${data.execution_id}`);
+ console.log(`Success: ${data.success}`);
+ console.log(`Output: ${data.stdout}`);
+ if (data.stderr) {
+ console.log(`Error: ${data.stderr}`);
+ }
+
+ // If viewing execution details, refresh that specific execution
+ const executionModal = document.getElementById('viewExecutionModal');
+ if (executionModal && executionModal.classList.contains('show')) {
+ // Reload execution details to show new logs
+ const executionId = executionModal.dataset.executionId;
+ if (executionId === data.execution_id) {
+ viewExecution(executionId);
+ }
+ }
+
+ // Refresh execution list to show updated status
+ loadExecutions();
+ }
+
+ if (data.type === 'workflow_result') {
+ console.log(`Workflow ${data.status} for execution ${data.execution_id}`);
+
+ // Refresh execution list
+ loadExecutions();
+
+ // If viewing this execution, refresh details
+ const executionModal = document.getElementById('viewExecutionModal');
+ if (executionModal && executionModal.classList.contains('show')) {
+ const executionId = executionModal.dataset.executionId;
+ if (executionId === data.execution_id) {
+ viewExecution(executionId);
+ }
+ }
+ }
+
+ if (data.type === 'worker_update') {
+ console.log(`Worker ${data.worker_id} status: ${data.status}`);
+ loadWorkers();
+ }
+
+ if (data.type === 'execution_started' || data.type === 'execution_status') {
+ loadExecutions();
+ }
+
+ if (data.type === 'workflow_created' || data.type === 'workflow_deleted') {
+ loadWorkflows();
+ }
+
+ // Generic refresh for other message types
+ if (!['command_result', 'workflow_result', 'worker_update', 'execution_started', 'execution_status', 'workflow_created', 'workflow_deleted'].includes(data.type)) {
+ refreshData();
+ }
};
-
+
ws.onclose = () => {
console.log('WebSocket closed, reconnecting...');
setTimeout(connectWebSocket, 5000);
diff --git a/server.js b/server.js
index 511738a..0a07443 100644
--- a/server.js
+++ b/server.js
@@ -101,6 +101,105 @@ async function initDatabase() {
const clients = new Set();
wss.on('connection', (ws) => {
clients.add(ws);
+
+ // Handle incoming messages from workers
+ ws.on('message', async (data) => {
+ try {
+ const message = JSON.parse(data.toString());
+ console.log('WebSocket message received:', message.type);
+
+ if (message.type === 'command_result') {
+ // Handle command result from worker
+ const { execution_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
+
+ // Add result to execution logs
+ await addExecutionLog(execution_id, {
+ step: 'command_execution',
+ action: 'command_result',
+ worker_id: worker_id,
+ success: success,
+ stdout: stdout,
+ stderr: stderr,
+ duration: duration,
+ timestamp: timestamp || new Date().toISOString()
+ });
+
+ // Broadcast result to all connected clients
+ broadcast({
+ type: 'command_result',
+ execution_id: execution_id,
+ worker_id: worker_id,
+ success: success,
+ stdout: stdout,
+ stderr: stderr
+ });
+
+ console.log(`Command result received for execution ${execution_id}`);
+ }
+
+ if (message.type === 'workflow_result') {
+ // Handle workflow result from worker
+ const { execution_id, worker_id, success, message: resultMessage, timestamp } = message;
+
+ // Add final result to logs
+ await addExecutionLog(execution_id, {
+ step: 'workflow_completion',
+ action: 'workflow_result',
+ worker_id: worker_id,
+ success: success,
+ message: resultMessage,
+ timestamp: timestamp || new Date().toISOString()
+ });
+
+ // Update execution status
+ const finalStatus = success ? 'completed' : 'failed';
+ await updateExecutionStatus(execution_id, finalStatus);
+
+ // Broadcast completion to all clients
+ broadcast({
+ type: 'workflow_result',
+ execution_id: execution_id,
+ status: finalStatus,
+ success: success,
+ message: resultMessage
+ });
+
+ console.log(`Workflow result received for execution ${execution_id}: ${finalStatus}`);
+ }
+
+ if (message.type === 'worker_connect') {
+ // Handle worker connection
+ const { worker_id, worker_name } = message;
+ console.log(`Worker connected: ${worker_name} (${worker_id})`);
+
+ // Update worker status to online
+ await pool.query(
+ `UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`,
+ [worker_id]
+ );
+
+ // Broadcast worker status update
+ broadcast({
+ type: 'worker_update',
+ worker_id: worker_id,
+ status: 'online'
+ });
+ }
+
+ if (message.type === 'pong') {
+ // Handle worker pong response
+ const { worker_id } = message;
+ await pool.query(
+ `UPDATE workers SET last_heartbeat=NOW() WHERE id=?`,
+ [worker_id]
+ );
+ }
+
+ } catch (error) {
+ console.error('WebSocket message error:', error);
+ }
+ });
+
ws.on('close', () => clients.delete(ws));
});