Implement Complete Workflow Execution Engine
Added full workflow execution engine that actually runs workflow steps:
Server-Side (server.js):
- executeWorkflowSteps() - Main workflow orchestration function
- executeCommandStep() - Executes commands on target workers
- waitForCommandResult() - Polls for command completion
- Support for step types: execute, wait, prompt (prompt skipped for now)
- Sequential step execution with failure handling
- Worker targeting: "all" or specific worker IDs/names
- Automatic status updates (running -> completed/failed)
- Real-time WebSocket broadcasts for step progress
- Command result tracking with command_id for workflows
- Only updates status for non-workflow quick commands
Client-Side (index.html):
- Enhanced formatLogEntry() with workflow-specific log types
- step_started - Shows step number and name with amber color
- step_completed - Shows completion with green checkmark
- waiting - Displays wait duration
- no_workers - Error when no workers available
- worker_offline - Warning for offline workers
- workflow_error - Critical workflow errors
- Better visual feedback for workflow progress
Workflow Definition Format:
{
"steps": [
{
"name": "Step Name",
"type": "execute",
"targets": ["all"] or ["worker-name"],
"command": "your command here"
},
{
"type": "wait",
"duration": 5
}
]
}
Features:
- Executes steps sequentially
- Stops on first failure
- Supports multiple workers per step
- Real-time progress updates
- Comprehensive logging
- Terminal-themed workflow logs
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1835,6 +1835,70 @@
|
||||
`;
|
||||
}
|
||||
|
||||
// Workflow step logs
|
||||
if (log.action === 'step_started') {
|
||||
return `
|
||||
<div class="log-entry" style="border-left-color: var(--terminal-amber);">
|
||||
<div class="log-timestamp">[${timestamp}]</div>
|
||||
<div class="log-title" style="color: var(--terminal-amber);">▶️ Step ${log.step}: ${log.step_name}</div>
|
||||
</div>
|
||||
`;
|
||||
}
|
||||
|
||||
if (log.action === 'step_completed') {
|
||||
return `
|
||||
<div class="log-entry" style="border-left-color: var(--terminal-green);">
|
||||
<div class="log-timestamp">[${timestamp}]</div>
|
||||
<div class="log-title" style="color: var(--terminal-green);">✓ Step ${log.step} Completed: ${log.step_name}</div>
|
||||
</div>
|
||||
`;
|
||||
}
|
||||
|
||||
if (log.action === 'waiting') {
|
||||
return `
|
||||
<div class="log-entry" style="border-left-color: var(--terminal-amber);">
|
||||
<div class="log-timestamp">[${timestamp}]</div>
|
||||
<div class="log-title" style="color: var(--terminal-amber);">⏳ Waiting ${log.duration} seconds...</div>
|
||||
</div>
|
||||
`;
|
||||
}
|
||||
|
||||
if (log.action === 'no_workers') {
|
||||
return `
|
||||
<div class="log-entry failed">
|
||||
<div class="log-timestamp">[${timestamp}]</div>
|
||||
<div class="log-title">✗ Step ${log.step}: No Workers Available</div>
|
||||
<div class="log-details">
|
||||
<div class="log-field">${escapeHtml(log.message)}</div>
|
||||
</div>
|
||||
</div>
|
||||
`;
|
||||
}
|
||||
|
||||
if (log.action === 'worker_offline') {
|
||||
return `
|
||||
<div class="log-entry" style="border-left-color: #ff4444;">
|
||||
<div class="log-timestamp">[${timestamp}]</div>
|
||||
<div class="log-title" style="color: #ff4444;">⚠️ Worker Offline</div>
|
||||
<div class="log-details">
|
||||
<div class="log-field"><span class="log-label">Worker ID:</span> ${log.worker_id}</div>
|
||||
</div>
|
||||
</div>
|
||||
`;
|
||||
}
|
||||
|
||||
if (log.action === 'workflow_error') {
|
||||
return `
|
||||
<div class="log-entry failed">
|
||||
<div class="log-timestamp">[${timestamp}]</div>
|
||||
<div class="log-title">✗ Workflow Error</div>
|
||||
<div class="log-details">
|
||||
<div class="log-field"><span class="log-label">Error:</span> ${escapeHtml(log.error)}</div>
|
||||
</div>
|
||||
</div>
|
||||
`;
|
||||
}
|
||||
|
||||
// Fallback for unknown log types
|
||||
return `<div class="log-entry"><pre>${JSON.stringify(log, null, 2)}</pre></div>`;
|
||||
}
|
||||
|
||||
267
server.js
267
server.js
@@ -241,12 +241,13 @@ wss.on('connection', (ws) => {
|
||||
|
||||
if (message.type === 'command_result') {
|
||||
// Handle command result from worker
|
||||
const { execution_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
|
||||
const { execution_id, command_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
|
||||
|
||||
// Add result to execution logs
|
||||
await addExecutionLog(execution_id, {
|
||||
step: 'command_execution',
|
||||
action: 'command_result',
|
||||
command_id: command_id, // Include command_id for workflow tracking
|
||||
worker_id: worker_id,
|
||||
success: success,
|
||||
stdout: stdout,
|
||||
@@ -255,9 +256,14 @@ wss.on('connection', (ws) => {
|
||||
timestamp: timestamp || new Date().toISOString()
|
||||
});
|
||||
|
||||
// Update execution status to completed or failed
|
||||
const finalStatus = success ? 'completed' : 'failed';
|
||||
await updateExecutionStatus(execution_id, finalStatus);
|
||||
// For non-workflow executions, update status immediately
|
||||
// For workflow executions, the workflow engine will update status
|
||||
const [execution] = await pool.query('SELECT workflow_id FROM executions WHERE id = ?', [execution_id]);
|
||||
if (execution.length > 0 && !execution[0].workflow_id) {
|
||||
// Only update status for quick commands (no workflow_id)
|
||||
const finalStatus = success ? 'completed' : 'failed';
|
||||
await updateExecutionStatus(execution_id, finalStatus);
|
||||
}
|
||||
|
||||
// Broadcast result to all connected clients
|
||||
broadcast({
|
||||
@@ -269,7 +275,7 @@ wss.on('connection', (ws) => {
|
||||
stderr: stderr
|
||||
});
|
||||
|
||||
console.log(`Command result received for execution ${execution_id}: ${finalStatus}`);
|
||||
console.log(`Command result received for execution ${execution_id}: ${success ? 'success' : 'failed'}`);
|
||||
}
|
||||
|
||||
if (message.type === 'workflow_result') {
|
||||
@@ -436,6 +442,237 @@ async function authenticateSSO(req, res, next) {
|
||||
next();
|
||||
}
|
||||
|
||||
// Workflow Execution Engine
|
||||
async function executeWorkflowSteps(executionId, workflowId, definition, username) {
|
||||
try {
|
||||
console.log(`[Workflow] Starting execution ${executionId} for workflow ${workflowId}`);
|
||||
|
||||
const steps = definition.steps || [];
|
||||
let allStepsSucceeded = true;
|
||||
|
||||
for (let i = 0; i < steps.length; i++) {
|
||||
const step = steps[i];
|
||||
console.log(`[Workflow] Execution ${executionId} - Step ${i + 1}/${steps.length}: ${step.name}`);
|
||||
|
||||
// Add step start log
|
||||
await addExecutionLog(executionId, {
|
||||
step: i + 1,
|
||||
step_name: step.name,
|
||||
action: 'step_started',
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
|
||||
broadcast({
|
||||
type: 'workflow_step_started',
|
||||
execution_id: executionId,
|
||||
step: i + 1,
|
||||
step_name: step.name
|
||||
});
|
||||
|
||||
if (step.type === 'execute') {
|
||||
// Execute command step
|
||||
const success = await executeCommandStep(executionId, step, i + 1);
|
||||
if (!success) {
|
||||
allStepsSucceeded = false;
|
||||
break; // Stop workflow on failure
|
||||
}
|
||||
} else if (step.type === 'wait') {
|
||||
// Wait step (delay in seconds)
|
||||
const seconds = step.duration || 5;
|
||||
await addExecutionLog(executionId, {
|
||||
step: i + 1,
|
||||
action: 'waiting',
|
||||
duration: seconds,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
await new Promise(resolve => setTimeout(resolve, seconds * 1000));
|
||||
} else if (step.type === 'prompt') {
|
||||
// Interactive prompt step (not fully implemented, would need user interaction)
|
||||
await addExecutionLog(executionId, {
|
||||
step: i + 1,
|
||||
action: 'prompt_skipped',
|
||||
message: 'Interactive prompts not yet supported',
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
}
|
||||
|
||||
// Add step completion log
|
||||
await addExecutionLog(executionId, {
|
||||
step: i + 1,
|
||||
step_name: step.name,
|
||||
action: 'step_completed',
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
|
||||
broadcast({
|
||||
type: 'workflow_step_completed',
|
||||
execution_id: executionId,
|
||||
step: i + 1,
|
||||
step_name: step.name
|
||||
});
|
||||
}
|
||||
|
||||
// Mark execution as completed or failed
|
||||
const finalStatus = allStepsSucceeded ? 'completed' : 'failed';
|
||||
await updateExecutionStatus(executionId, finalStatus);
|
||||
|
||||
console.log(`[Workflow] Execution ${executionId} finished with status: ${finalStatus}`);
|
||||
|
||||
} catch (error) {
|
||||
console.error(`[Workflow] Execution ${executionId} error:`, error);
|
||||
await addExecutionLog(executionId, {
|
||||
action: 'workflow_error',
|
||||
error: error.message,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
await updateExecutionStatus(executionId, 'failed');
|
||||
}
|
||||
}
|
||||
|
||||
async function executeCommandStep(executionId, step, stepNumber) {
|
||||
try {
|
||||
const command = step.command;
|
||||
const targets = step.targets || ['all'];
|
||||
|
||||
// Determine which workers to target
|
||||
let targetWorkerIds = [];
|
||||
|
||||
if (targets.includes('all')) {
|
||||
// Get all online workers
|
||||
const [onlineWorkers] = await pool.query('SELECT id FROM workers WHERE status = ?', ['online']);
|
||||
targetWorkerIds = onlineWorkers.map(w => w.id);
|
||||
} else {
|
||||
// Specific worker IDs or names
|
||||
for (const target of targets) {
|
||||
// Try to find by ID first, then by name
|
||||
const [workerById] = await pool.query('SELECT id FROM workers WHERE id = ?', [target]);
|
||||
if (workerById.length > 0) {
|
||||
targetWorkerIds.push(workerById[0].id);
|
||||
} else {
|
||||
const [workerByName] = await pool.query('SELECT id FROM workers WHERE name = ?', [target]);
|
||||
if (workerByName.length > 0) {
|
||||
targetWorkerIds.push(workerByName[0].id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (targetWorkerIds.length === 0) {
|
||||
await addExecutionLog(executionId, {
|
||||
step: stepNumber,
|
||||
action: 'no_workers',
|
||||
message: 'No workers available for this step',
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
return false;
|
||||
}
|
||||
|
||||
// Execute command on each target worker and wait for results
|
||||
const results = [];
|
||||
|
||||
for (const workerId of targetWorkerIds) {
|
||||
const workerWs = workers.get(workerId);
|
||||
|
||||
if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
|
||||
await addExecutionLog(executionId, {
|
||||
step: stepNumber,
|
||||
action: 'worker_offline',
|
||||
worker_id: workerId,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
// Send command to worker
|
||||
const commandId = generateUUID();
|
||||
|
||||
await addExecutionLog(executionId, {
|
||||
step: stepNumber,
|
||||
action: 'command_sent',
|
||||
worker_id: workerId,
|
||||
command: command,
|
||||
command_id: commandId,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
|
||||
workerWs.send(JSON.stringify({
|
||||
type: 'execute_command',
|
||||
execution_id: executionId,
|
||||
command_id: commandId,
|
||||
command: command,
|
||||
worker_id: workerId,
|
||||
timeout: 120000 // 2 minute timeout
|
||||
}));
|
||||
|
||||
// Wait for command result (with timeout)
|
||||
const result = await waitForCommandResult(executionId, commandId, 120000);
|
||||
results.push(result);
|
||||
|
||||
if (!result.success) {
|
||||
// Command failed, workflow should stop
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// All commands succeeded
|
||||
return results.every(r => r.success);
|
||||
|
||||
} catch (error) {
|
||||
console.error(`[Workflow] Error executing command step:`, error);
|
||||
await addExecutionLog(executionId, {
|
||||
step: stepNumber,
|
||||
action: 'step_error',
|
||||
error: error.message,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async function waitForCommandResult(executionId, commandId, timeout) {
|
||||
return new Promise((resolve) => {
|
||||
const startTime = Date.now();
|
||||
|
||||
const checkInterval = setInterval(async () => {
|
||||
try {
|
||||
// Check if we've received the command result in logs
|
||||
const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
|
||||
|
||||
if (execution.length > 0) {
|
||||
const logs = typeof execution[0].logs === 'string' ? JSON.parse(execution[0].logs) : execution[0].logs;
|
||||
const resultLog = logs.find(log => log.command_id === commandId && log.action === 'command_result');
|
||||
|
||||
if (resultLog) {
|
||||
clearInterval(checkInterval);
|
||||
resolve({
|
||||
success: resultLog.success,
|
||||
stdout: resultLog.stdout,
|
||||
stderr: resultLog.stderr
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Check timeout
|
||||
if (Date.now() - startTime > timeout) {
|
||||
clearInterval(checkInterval);
|
||||
resolve({
|
||||
success: false,
|
||||
error: 'Command timeout'
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[Workflow] Error checking command result:', error);
|
||||
clearInterval(checkInterval);
|
||||
resolve({
|
||||
success: false,
|
||||
error: error.message
|
||||
});
|
||||
}
|
||||
}, 500); // Check every 500ms
|
||||
});
|
||||
}
|
||||
|
||||
// Routes - All protected by SSO
|
||||
app.get('/api/user', authenticateSSO, (req, res) => {
|
||||
res.json(req.user);
|
||||
@@ -522,13 +759,29 @@ app.post('/api/executions', authenticateSSO, async (req, res) => {
|
||||
try {
|
||||
const { workflow_id } = req.body;
|
||||
const id = generateUUID();
|
||||
|
||||
|
||||
// Get workflow definition
|
||||
const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflow_id]);
|
||||
if (workflows.length === 0) {
|
||||
return res.status(404).json({ error: 'Workflow not found' });
|
||||
}
|
||||
|
||||
const workflow = workflows[0];
|
||||
const definition = typeof workflow.definition === 'string' ? JSON.parse(workflow.definition) : workflow.definition;
|
||||
|
||||
// Create execution record
|
||||
await pool.query(
|
||||
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
|
||||
[id, workflow_id, 'running', req.user.username, JSON.stringify([])]
|
||||
);
|
||||
|
||||
|
||||
broadcast({ type: 'execution_started', execution_id: id, workflow_id });
|
||||
|
||||
// Start workflow execution asynchronously
|
||||
executeWorkflowSteps(id, workflow_id, definition, req.user.username).catch(err => {
|
||||
console.error(`[Workflow] Execution ${id} failed:`, err);
|
||||
});
|
||||
|
||||
res.json({ id, workflow_id, status: 'running' });
|
||||
} catch (error) {
|
||||
res.status(500).json({ error: error.message });
|
||||
|
||||
Reference in New Issue
Block a user