diff --git a/public/index.html b/public/index.html index 8f0a524..c5eb7a5 100644 --- a/public/index.html +++ b/public/index.html @@ -1835,6 +1835,70 @@ `; } + // Workflow step logs + if (log.action === 'step_started') { + return ` +
+
[${timestamp}]
+
▶️ Step ${log.step}: ${log.step_name}
+
+ `; + } + + if (log.action === 'step_completed') { + return ` +
+
[${timestamp}]
+
✓ Step ${log.step} Completed: ${log.step_name}
+
+ `; + } + + if (log.action === 'waiting') { + return ` +
+
[${timestamp}]
+
⏳ Waiting ${log.duration} seconds...
+
+ `; + } + + if (log.action === 'no_workers') { + return ` +
+
[${timestamp}]
+
✗ Step ${log.step}: No Workers Available
+
+
${escapeHtml(log.message)}
+
+
+ `; + } + + if (log.action === 'worker_offline') { + return ` +
+
[${timestamp}]
+
⚠️ Worker Offline
+
+
Worker ID: ${log.worker_id}
+
+
+ `; + } + + if (log.action === 'workflow_error') { + return ` +
+
[${timestamp}]
+
✗ Workflow Error
+
+
Error: ${escapeHtml(log.error)}
+
+
+ `; + } + // Fallback for unknown log types return `
${JSON.stringify(log, null, 2)}
`; } diff --git a/server.js b/server.js index c121b15..7858c7a 100644 --- a/server.js +++ b/server.js @@ -241,12 +241,13 @@ wss.on('connection', (ws) => { if (message.type === 'command_result') { // Handle command result from worker - const { execution_id, worker_id, success, stdout, stderr, duration, timestamp } = message; + const { execution_id, command_id, worker_id, success, stdout, stderr, duration, timestamp } = message; // Add result to execution logs await addExecutionLog(execution_id, { step: 'command_execution', action: 'command_result', + command_id: command_id, // Include command_id for workflow tracking worker_id: worker_id, success: success, stdout: stdout, @@ -255,9 +256,14 @@ wss.on('connection', (ws) => { timestamp: timestamp || new Date().toISOString() }); - // Update execution status to completed or failed - const finalStatus = success ? 'completed' : 'failed'; - await updateExecutionStatus(execution_id, finalStatus); + // For non-workflow executions, update status immediately + // For workflow executions, the workflow engine will update status + const [execution] = await pool.query('SELECT workflow_id FROM executions WHERE id = ?', [execution_id]); + if (execution.length > 0 && !execution[0].workflow_id) { + // Only update status for quick commands (no workflow_id) + const finalStatus = success ? 'completed' : 'failed'; + await updateExecutionStatus(execution_id, finalStatus); + } // Broadcast result to all connected clients broadcast({ @@ -269,7 +275,7 @@ wss.on('connection', (ws) => { stderr: stderr }); - console.log(`Command result received for execution ${execution_id}: ${finalStatus}`); + console.log(`Command result received for execution ${execution_id}: ${success ? 'success' : 'failed'}`); } if (message.type === 'workflow_result') { @@ -436,6 +442,237 @@ async function authenticateSSO(req, res, next) { next(); } +// Workflow Execution Engine +async function executeWorkflowSteps(executionId, workflowId, definition, username) { + try { + console.log(`[Workflow] Starting execution ${executionId} for workflow ${workflowId}`); + + const steps = definition.steps || []; + let allStepsSucceeded = true; + + for (let i = 0; i < steps.length; i++) { + const step = steps[i]; + console.log(`[Workflow] Execution ${executionId} - Step ${i + 1}/${steps.length}: ${step.name}`); + + // Add step start log + await addExecutionLog(executionId, { + step: i + 1, + step_name: step.name, + action: 'step_started', + timestamp: new Date().toISOString() + }); + + broadcast({ + type: 'workflow_step_started', + execution_id: executionId, + step: i + 1, + step_name: step.name + }); + + if (step.type === 'execute') { + // Execute command step + const success = await executeCommandStep(executionId, step, i + 1); + if (!success) { + allStepsSucceeded = false; + break; // Stop workflow on failure + } + } else if (step.type === 'wait') { + // Wait step (delay in seconds) + const seconds = step.duration || 5; + await addExecutionLog(executionId, { + step: i + 1, + action: 'waiting', + duration: seconds, + timestamp: new Date().toISOString() + }); + await new Promise(resolve => setTimeout(resolve, seconds * 1000)); + } else if (step.type === 'prompt') { + // Interactive prompt step (not fully implemented, would need user interaction) + await addExecutionLog(executionId, { + step: i + 1, + action: 'prompt_skipped', + message: 'Interactive prompts not yet supported', + timestamp: new Date().toISOString() + }); + } + + // Add step completion log + await addExecutionLog(executionId, { + step: i + 1, + step_name: step.name, + action: 'step_completed', + timestamp: new Date().toISOString() + }); + + broadcast({ + type: 'workflow_step_completed', + execution_id: executionId, + step: i + 1, + step_name: step.name + }); + } + + // Mark execution as completed or failed + const finalStatus = allStepsSucceeded ? 'completed' : 'failed'; + await updateExecutionStatus(executionId, finalStatus); + + console.log(`[Workflow] Execution ${executionId} finished with status: ${finalStatus}`); + + } catch (error) { + console.error(`[Workflow] Execution ${executionId} error:`, error); + await addExecutionLog(executionId, { + action: 'workflow_error', + error: error.message, + timestamp: new Date().toISOString() + }); + await updateExecutionStatus(executionId, 'failed'); + } +} + +async function executeCommandStep(executionId, step, stepNumber) { + try { + const command = step.command; + const targets = step.targets || ['all']; + + // Determine which workers to target + let targetWorkerIds = []; + + if (targets.includes('all')) { + // Get all online workers + const [onlineWorkers] = await pool.query('SELECT id FROM workers WHERE status = ?', ['online']); + targetWorkerIds = onlineWorkers.map(w => w.id); + } else { + // Specific worker IDs or names + for (const target of targets) { + // Try to find by ID first, then by name + const [workerById] = await pool.query('SELECT id FROM workers WHERE id = ?', [target]); + if (workerById.length > 0) { + targetWorkerIds.push(workerById[0].id); + } else { + const [workerByName] = await pool.query('SELECT id FROM workers WHERE name = ?', [target]); + if (workerByName.length > 0) { + targetWorkerIds.push(workerByName[0].id); + } + } + } + } + + if (targetWorkerIds.length === 0) { + await addExecutionLog(executionId, { + step: stepNumber, + action: 'no_workers', + message: 'No workers available for this step', + timestamp: new Date().toISOString() + }); + return false; + } + + // Execute command on each target worker and wait for results + const results = []; + + for (const workerId of targetWorkerIds) { + const workerWs = workers.get(workerId); + + if (!workerWs || workerWs.readyState !== WebSocket.OPEN) { + await addExecutionLog(executionId, { + step: stepNumber, + action: 'worker_offline', + worker_id: workerId, + timestamp: new Date().toISOString() + }); + continue; + } + + // Send command to worker + const commandId = generateUUID(); + + await addExecutionLog(executionId, { + step: stepNumber, + action: 'command_sent', + worker_id: workerId, + command: command, + command_id: commandId, + timestamp: new Date().toISOString() + }); + + workerWs.send(JSON.stringify({ + type: 'execute_command', + execution_id: executionId, + command_id: commandId, + command: command, + worker_id: workerId, + timeout: 120000 // 2 minute timeout + })); + + // Wait for command result (with timeout) + const result = await waitForCommandResult(executionId, commandId, 120000); + results.push(result); + + if (!result.success) { + // Command failed, workflow should stop + return false; + } + } + + // All commands succeeded + return results.every(r => r.success); + + } catch (error) { + console.error(`[Workflow] Error executing command step:`, error); + await addExecutionLog(executionId, { + step: stepNumber, + action: 'step_error', + error: error.message, + timestamp: new Date().toISOString() + }); + return false; + } +} + +async function waitForCommandResult(executionId, commandId, timeout) { + return new Promise((resolve) => { + const startTime = Date.now(); + + const checkInterval = setInterval(async () => { + try { + // Check if we've received the command result in logs + const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]); + + if (execution.length > 0) { + const logs = typeof execution[0].logs === 'string' ? JSON.parse(execution[0].logs) : execution[0].logs; + const resultLog = logs.find(log => log.command_id === commandId && log.action === 'command_result'); + + if (resultLog) { + clearInterval(checkInterval); + resolve({ + success: resultLog.success, + stdout: resultLog.stdout, + stderr: resultLog.stderr + }); + return; + } + } + + // Check timeout + if (Date.now() - startTime > timeout) { + clearInterval(checkInterval); + resolve({ + success: false, + error: 'Command timeout' + }); + } + } catch (error) { + console.error('[Workflow] Error checking command result:', error); + clearInterval(checkInterval); + resolve({ + success: false, + error: error.message + }); + } + }, 500); // Check every 500ms + }); +} + // Routes - All protected by SSO app.get('/api/user', authenticateSSO, (req, res) => { res.json(req.user); @@ -522,13 +759,29 @@ app.post('/api/executions', authenticateSSO, async (req, res) => { try { const { workflow_id } = req.body; const id = generateUUID(); - + + // Get workflow definition + const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflow_id]); + if (workflows.length === 0) { + return res.status(404).json({ error: 'Workflow not found' }); + } + + const workflow = workflows[0]; + const definition = typeof workflow.definition === 'string' ? JSON.parse(workflow.definition) : workflow.definition; + + // Create execution record await pool.query( 'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)', [id, workflow_id, 'running', req.user.username, JSON.stringify([])] ); - + broadcast({ type: 'execution_started', execution_id: id, workflow_id }); + + // Start workflow execution asynchronously + executeWorkflowSteps(id, workflow_id, definition, req.user.username).catch(err => { + console.error(`[Workflow] Execution ${id} failed:`, err); + }); + res.json({ id, workflow_id, status: 'running' }); } catch (error) { res.status(500).json({ error: error.message });