diff --git a/server.js b/server.js index 7858c7a..80e1d6b 100644 --- a/server.js +++ b/server.js @@ -927,299 +927,6 @@ initDatabase().then(() => { // ============================================ // WORKFLOW EXECUTION ENGINE -// ============================================ - -// Store active workflow executions in memory -const activeExecutions = new Map(); - -// Execute workflow step by step -async function executeWorkflow(workflowId, executionId, userId, targetWorkers = 'all') { - try { - // Get workflow definition - const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflowId]); - if (workflows.length === 0) { - throw new Error('Workflow not found'); - } - - const workflow = workflows[0]; - const definition = JSON.parse(workflow.definition); - - // Initialize execution state - const executionState = { - id: executionId, - workflowId: workflowId, - currentStep: 0, - steps: definition.steps || [], - results: [], - status: 'running', - waitingForInput: false, - targetWorkers: targetWorkers, - userId: userId - }; - - activeExecutions.set(executionId, executionState); - - // Start executing steps - await executeNextStep(executionId); - - } catch (error) { - console.error('Workflow execution error:', error); - await updateExecutionStatus(executionId, 'failed', error.message); - } -} - -// Execute the next step in a workflow -async function executeNextStep(executionId) { - const state = activeExecutions.get(executionId); - if (!state) return; - - // Check if we've completed all steps - if (state.currentStep >= state.steps.length) { - await updateExecutionStatus(executionId, 'completed'); - activeExecutions.delete(executionId); - return; - } - - const step = state.steps[state.currentStep]; - - // Check if step has a condition - if (step.condition && !evaluateCondition(step.condition, state)) { - console.log(`[Workflow] Skipping step ${state.currentStep}: condition not met`); - state.currentStep++; - return executeNextStep(executionId); - } - - console.log(`[Workflow] Executing step ${state.currentStep}: ${step.name}`); - - try { - switch (step.type) { - case 'execute': - await executeCommandStep(executionId, step); - break; - case 'prompt': - await executePromptStep(executionId, step); - break; - case 'wait': - await executeWaitStep(executionId, step); - break; - default: - throw new Error(`Unknown step type: ${step.type}`); - } - } catch (error) { - await addExecutionLog(executionId, { - step: state.currentStep, - error: error.message, - timestamp: new Date().toISOString() - }); - await updateExecutionStatus(executionId, 'failed', error.message); - activeExecutions.delete(executionId); - } -} - -// Execute a command on workers -async function executeCommandStep(executionId, step) { - const state = activeExecutions.get(executionId); - - // Get target workers - const [workers] = await pool.query( - 'SELECT * FROM workers WHERE status = "online"' - ); - - if (workers.length === 0) { - throw new Error('No online workers available'); - } - - // Filter workers based on target - let targetWorkers = workers; - if (step.targets && step.targets[0] !== 'all') { - targetWorkers = workers.filter(w => step.targets.includes(w.name)); - } - - // Send command to workers via WebSocket - const commandMessage = { - type: 'execute_command', - execution_id: executionId, - step_index: state.currentStep, - command: step.command, - timeout: step.timeout || 300000 - }; - - // Broadcast to target workers - clients.forEach(client => { - if (client.readyState === WebSocket.OPEN) { - client.send(JSON.stringify(commandMessage)); - } - }); - - await addExecutionLog(executionId, { - step: state.currentStep, - action: 'command_sent', - command: step.command, - workers: targetWorkers.map(w => w.name), - timestamp: new Date().toISOString() - }); - - // For now, move to next step immediately - // In production, we'd wait for worker responses - state.currentStep++; - - // Small delay to allow command to execute - setTimeout(() => executeNextStep(executionId), 1000); -} - -// Execute a user prompt step -async function executePromptStep(executionId, step) { - const state = activeExecutions.get(executionId); - - state.waitingForInput = true; - state.promptData = { - message: step.message, - options: step.options, - step: state.currentStep - }; - - await addExecutionLog(executionId, { - step: state.currentStep, - action: 'waiting_for_input', - prompt: step.message, - timestamp: new Date().toISOString() - }); - - // Notify frontend that input is needed - broadcast({ - type: 'execution_prompt', - execution_id: executionId, - prompt: state.promptData - }); -} - -// Execute a wait/delay step -async function executeWaitStep(executionId, step) { - const state = activeExecutions.get(executionId); - const delay = step.duration || 1000; - - await addExecutionLog(executionId, { - step: state.currentStep, - action: 'waiting', - duration: delay, - timestamp: new Date().toISOString() - }); - - state.currentStep++; - setTimeout(() => executeNextStep(executionId), delay); -} - -// Handle user input for prompts -function handleUserInput(executionId, response) { - const state = activeExecutions.get(executionId); - if (!state || !state.waitingForInput) { - return false; - } - - state.promptResponse = response; - state.waitingForInput = false; - state.currentStep++; - - addExecutionLog(executionId, { - step: state.currentStep - 1, - action: 'user_response', - response: response, - timestamp: new Date().toISOString() - }); - - executeNextStep(executionId); - return true; -} - -// Evaluate conditions -function evaluateCondition(condition, state) { - try { - // Simple condition evaluation - // In production, use a proper expression evaluator - const promptResponse = state.promptResponse; - return eval(condition); - } catch (error) { - console.error('Condition evaluation error:', error); - return false; - } -} - -// Helper functions -async function updateExecutionStatus(executionId, status, error = null) { - const updates = { status }; - if (status === 'completed' || status === 'failed') { - updates.completed_at = new Date(); - } - if (error) { - // Add error to logs - await addExecutionLog(executionId, { error, timestamp: new Date().toISOString() }); - } - - await pool.query( - 'UPDATE executions SET status = ?, completed_at = ? WHERE id = ?', - [status, updates.completed_at || null, executionId] - ); - - broadcast({ - type: 'execution_status', - execution_id: executionId, - status: status - }); -} - -async function addExecutionLog(executionId, logEntry) { - const [rows] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]); - if (rows.length === 0) return; - - const logs = JSON.parse(rows[0].logs || '[]'); - logs.push(logEntry); - - await pool.query('UPDATE executions SET logs = ? WHERE id = ?', [ - JSON.stringify(logs), - executionId - ]); -} - -// ============================================ -// API ROUTES - Add these to your server.js -// ============================================ - -// Start workflow execution -app.post('/api/executions', authenticateSSO, async (req, res) => { - try { - const { workflow_id, target_workers } = req.body; - const id = generateUUID(); - - await pool.query( - 'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)', - [id, workflow_id, 'running', req.user.username, JSON.stringify([])] - ); - - // Start execution - executeWorkflow(workflow_id, id, req.user.username, target_workers || 'all'); - - broadcast({ type: 'execution_started', execution_id: id, workflow_id }); - res.json({ id, workflow_id, status: 'running' }); - } catch (error) { - res.status(500).json({ error: error.message }); - } -}); - -// Respond to workflow prompt -app.post('/api/executions/:id/respond', authenticateSSO, async (req, res) => { - try { - const { response } = req.body; - const success = handleUserInput(req.params.id, response); - - if (success) { - res.json({ success: true }); - } else { - res.status(400).json({ error: 'Execution not waiting for input' }); - } - } catch (error) { - res.status(500).json({ error: error.message }); - } -}); // Get execution details with logs app.get('/api/executions/:id', authenticateSSO, async (req, res) => {