Implement Complete Workflow Execution Engine

Added full workflow execution engine that actually runs workflow steps:

Server-Side (server.js):
- executeWorkflowSteps() - Main workflow orchestration function
- executeCommandStep() - Executes commands on target workers
- waitForCommandResult() - Polls for command completion
- Support for step types: execute, wait, prompt (prompt skipped for now)
- Sequential step execution with failure handling
- Worker targeting: "all" or specific worker IDs/names
- Automatic status updates (running -> completed/failed)
- Real-time WebSocket broadcasts for step progress
- Command result tracking with command_id for workflows
- Only updates status for non-workflow quick commands

Client-Side (index.html):
- Enhanced formatLogEntry() with workflow-specific log types
- step_started - Shows step number and name with amber color
- step_completed - Shows completion with green checkmark
- waiting - Displays wait duration
- no_workers - Error when no workers available
- worker_offline - Warning for offline workers
- workflow_error - Critical workflow errors
- Better visual feedback for workflow progress

Workflow Definition Format:
{
  "steps": [
    {
      "name": "Step Name",
      "type": "execute",
      "targets": ["all"] or ["worker-name"],
      "command": "your command here"
    },
    {
      "type": "wait",
      "duration": 5
    }
  ]
}

Features:
- Executes steps sequentially
- Stops on first failure
- Supports multiple workers per step
- Real-time progress updates
- Comprehensive logging
- Terminal-themed workflow logs

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-07 23:19:12 -05:00
parent 4511fac486
commit 018752813a
2 changed files with 324 additions and 7 deletions

View File

@@ -1835,6 +1835,70 @@
`; `;
} }
// Workflow step logs
if (log.action === 'step_started') {
return `
<div class="log-entry" style="border-left-color: var(--terminal-amber);">
<div class="log-timestamp">[${timestamp}]</div>
<div class="log-title" style="color: var(--terminal-amber);">▶️ Step ${log.step}: ${log.step_name}</div>
</div>
`;
}
if (log.action === 'step_completed') {
return `
<div class="log-entry" style="border-left-color: var(--terminal-green);">
<div class="log-timestamp">[${timestamp}]</div>
<div class="log-title" style="color: var(--terminal-green);">✓ Step ${log.step} Completed: ${log.step_name}</div>
</div>
`;
}
if (log.action === 'waiting') {
return `
<div class="log-entry" style="border-left-color: var(--terminal-amber);">
<div class="log-timestamp">[${timestamp}]</div>
<div class="log-title" style="color: var(--terminal-amber);">⏳ Waiting ${log.duration} seconds...</div>
</div>
`;
}
if (log.action === 'no_workers') {
return `
<div class="log-entry failed">
<div class="log-timestamp">[${timestamp}]</div>
<div class="log-title">✗ Step ${log.step}: No Workers Available</div>
<div class="log-details">
<div class="log-field">${escapeHtml(log.message)}</div>
</div>
</div>
`;
}
if (log.action === 'worker_offline') {
return `
<div class="log-entry" style="border-left-color: #ff4444;">
<div class="log-timestamp">[${timestamp}]</div>
<div class="log-title" style="color: #ff4444;">⚠️ Worker Offline</div>
<div class="log-details">
<div class="log-field"><span class="log-label">Worker ID:</span> ${log.worker_id}</div>
</div>
</div>
`;
}
if (log.action === 'workflow_error') {
return `
<div class="log-entry failed">
<div class="log-timestamp">[${timestamp}]</div>
<div class="log-title">✗ Workflow Error</div>
<div class="log-details">
<div class="log-field"><span class="log-label">Error:</span> ${escapeHtml(log.error)}</div>
</div>
</div>
`;
}
// Fallback for unknown log types // Fallback for unknown log types
return `<div class="log-entry"><pre>${JSON.stringify(log, null, 2)}</pre></div>`; return `<div class="log-entry"><pre>${JSON.stringify(log, null, 2)}</pre></div>`;
} }

267
server.js
View File

@@ -241,12 +241,13 @@ wss.on('connection', (ws) => {
if (message.type === 'command_result') { if (message.type === 'command_result') {
// Handle command result from worker // Handle command result from worker
const { execution_id, worker_id, success, stdout, stderr, duration, timestamp } = message; const { execution_id, command_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
// Add result to execution logs // Add result to execution logs
await addExecutionLog(execution_id, { await addExecutionLog(execution_id, {
step: 'command_execution', step: 'command_execution',
action: 'command_result', action: 'command_result',
command_id: command_id, // Include command_id for workflow tracking
worker_id: worker_id, worker_id: worker_id,
success: success, success: success,
stdout: stdout, stdout: stdout,
@@ -255,9 +256,14 @@ wss.on('connection', (ws) => {
timestamp: timestamp || new Date().toISOString() timestamp: timestamp || new Date().toISOString()
}); });
// Update execution status to completed or failed // For non-workflow executions, update status immediately
const finalStatus = success ? 'completed' : 'failed'; // For workflow executions, the workflow engine will update status
await updateExecutionStatus(execution_id, finalStatus); const [execution] = await pool.query('SELECT workflow_id FROM executions WHERE id = ?', [execution_id]);
if (execution.length > 0 && !execution[0].workflow_id) {
// Only update status for quick commands (no workflow_id)
const finalStatus = success ? 'completed' : 'failed';
await updateExecutionStatus(execution_id, finalStatus);
}
// Broadcast result to all connected clients // Broadcast result to all connected clients
broadcast({ broadcast({
@@ -269,7 +275,7 @@ wss.on('connection', (ws) => {
stderr: stderr stderr: stderr
}); });
console.log(`Command result received for execution ${execution_id}: ${finalStatus}`); console.log(`Command result received for execution ${execution_id}: ${success ? 'success' : 'failed'}`);
} }
if (message.type === 'workflow_result') { if (message.type === 'workflow_result') {
@@ -436,6 +442,237 @@ async function authenticateSSO(req, res, next) {
next(); next();
} }
// Workflow Execution Engine
async function executeWorkflowSteps(executionId, workflowId, definition, username) {
try {
console.log(`[Workflow] Starting execution ${executionId} for workflow ${workflowId}`);
const steps = definition.steps || [];
let allStepsSucceeded = true;
for (let i = 0; i < steps.length; i++) {
const step = steps[i];
console.log(`[Workflow] Execution ${executionId} - Step ${i + 1}/${steps.length}: ${step.name}`);
// Add step start log
await addExecutionLog(executionId, {
step: i + 1,
step_name: step.name,
action: 'step_started',
timestamp: new Date().toISOString()
});
broadcast({
type: 'workflow_step_started',
execution_id: executionId,
step: i + 1,
step_name: step.name
});
if (step.type === 'execute') {
// Execute command step
const success = await executeCommandStep(executionId, step, i + 1);
if (!success) {
allStepsSucceeded = false;
break; // Stop workflow on failure
}
} else if (step.type === 'wait') {
// Wait step (delay in seconds)
const seconds = step.duration || 5;
await addExecutionLog(executionId, {
step: i + 1,
action: 'waiting',
duration: seconds,
timestamp: new Date().toISOString()
});
await new Promise(resolve => setTimeout(resolve, seconds * 1000));
} else if (step.type === 'prompt') {
// Interactive prompt step (not fully implemented, would need user interaction)
await addExecutionLog(executionId, {
step: i + 1,
action: 'prompt_skipped',
message: 'Interactive prompts not yet supported',
timestamp: new Date().toISOString()
});
}
// Add step completion log
await addExecutionLog(executionId, {
step: i + 1,
step_name: step.name,
action: 'step_completed',
timestamp: new Date().toISOString()
});
broadcast({
type: 'workflow_step_completed',
execution_id: executionId,
step: i + 1,
step_name: step.name
});
}
// Mark execution as completed or failed
const finalStatus = allStepsSucceeded ? 'completed' : 'failed';
await updateExecutionStatus(executionId, finalStatus);
console.log(`[Workflow] Execution ${executionId} finished with status: ${finalStatus}`);
} catch (error) {
console.error(`[Workflow] Execution ${executionId} error:`, error);
await addExecutionLog(executionId, {
action: 'workflow_error',
error: error.message,
timestamp: new Date().toISOString()
});
await updateExecutionStatus(executionId, 'failed');
}
}
async function executeCommandStep(executionId, step, stepNumber) {
try {
const command = step.command;
const targets = step.targets || ['all'];
// Determine which workers to target
let targetWorkerIds = [];
if (targets.includes('all')) {
// Get all online workers
const [onlineWorkers] = await pool.query('SELECT id FROM workers WHERE status = ?', ['online']);
targetWorkerIds = onlineWorkers.map(w => w.id);
} else {
// Specific worker IDs or names
for (const target of targets) {
// Try to find by ID first, then by name
const [workerById] = await pool.query('SELECT id FROM workers WHERE id = ?', [target]);
if (workerById.length > 0) {
targetWorkerIds.push(workerById[0].id);
} else {
const [workerByName] = await pool.query('SELECT id FROM workers WHERE name = ?', [target]);
if (workerByName.length > 0) {
targetWorkerIds.push(workerByName[0].id);
}
}
}
}
if (targetWorkerIds.length === 0) {
await addExecutionLog(executionId, {
step: stepNumber,
action: 'no_workers',
message: 'No workers available for this step',
timestamp: new Date().toISOString()
});
return false;
}
// Execute command on each target worker and wait for results
const results = [];
for (const workerId of targetWorkerIds) {
const workerWs = workers.get(workerId);
if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
await addExecutionLog(executionId, {
step: stepNumber,
action: 'worker_offline',
worker_id: workerId,
timestamp: new Date().toISOString()
});
continue;
}
// Send command to worker
const commandId = generateUUID();
await addExecutionLog(executionId, {
step: stepNumber,
action: 'command_sent',
worker_id: workerId,
command: command,
command_id: commandId,
timestamp: new Date().toISOString()
});
workerWs.send(JSON.stringify({
type: 'execute_command',
execution_id: executionId,
command_id: commandId,
command: command,
worker_id: workerId,
timeout: 120000 // 2 minute timeout
}));
// Wait for command result (with timeout)
const result = await waitForCommandResult(executionId, commandId, 120000);
results.push(result);
if (!result.success) {
// Command failed, workflow should stop
return false;
}
}
// All commands succeeded
return results.every(r => r.success);
} catch (error) {
console.error(`[Workflow] Error executing command step:`, error);
await addExecutionLog(executionId, {
step: stepNumber,
action: 'step_error',
error: error.message,
timestamp: new Date().toISOString()
});
return false;
}
}
async function waitForCommandResult(executionId, commandId, timeout) {
return new Promise((resolve) => {
const startTime = Date.now();
const checkInterval = setInterval(async () => {
try {
// Check if we've received the command result in logs
const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
if (execution.length > 0) {
const logs = typeof execution[0].logs === 'string' ? JSON.parse(execution[0].logs) : execution[0].logs;
const resultLog = logs.find(log => log.command_id === commandId && log.action === 'command_result');
if (resultLog) {
clearInterval(checkInterval);
resolve({
success: resultLog.success,
stdout: resultLog.stdout,
stderr: resultLog.stderr
});
return;
}
}
// Check timeout
if (Date.now() - startTime > timeout) {
clearInterval(checkInterval);
resolve({
success: false,
error: 'Command timeout'
});
}
} catch (error) {
console.error('[Workflow] Error checking command result:', error);
clearInterval(checkInterval);
resolve({
success: false,
error: error.message
});
}
}, 500); // Check every 500ms
});
}
// Routes - All protected by SSO // Routes - All protected by SSO
app.get('/api/user', authenticateSSO, (req, res) => { app.get('/api/user', authenticateSSO, (req, res) => {
res.json(req.user); res.json(req.user);
@@ -522,13 +759,29 @@ app.post('/api/executions', authenticateSSO, async (req, res) => {
try { try {
const { workflow_id } = req.body; const { workflow_id } = req.body;
const id = generateUUID(); const id = generateUUID();
// Get workflow definition
const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflow_id]);
if (workflows.length === 0) {
return res.status(404).json({ error: 'Workflow not found' });
}
const workflow = workflows[0];
const definition = typeof workflow.definition === 'string' ? JSON.parse(workflow.definition) : workflow.definition;
// Create execution record
await pool.query( await pool.query(
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)', 'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
[id, workflow_id, 'running', req.user.username, JSON.stringify([])] [id, workflow_id, 'running', req.user.username, JSON.stringify([])]
); );
broadcast({ type: 'execution_started', execution_id: id, workflow_id }); broadcast({ type: 'execution_started', execution_id: id, workflow_id });
// Start workflow execution asynchronously
executeWorkflowSteps(id, workflow_id, definition, req.user.username).catch(err => {
console.error(`[Workflow] Execution ${id} failed:`, err);
});
res.json({ id, workflow_id, status: 'running' }); res.json({ id, workflow_id, status: 'running' });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); res.status(500).json({ error: error.message });