From 2f941d0b6fc5ba6378db58a412bad66ac57d726a Mon Sep 17 00:00:00 2001 From: Jared Vititoe Date: Wed, 7 Jan 2026 23:24:42 -0500 Subject: [PATCH] Fix: Remove duplicate old workflow execution code Removed old/obsolete workflow execution system that was conflicting with the new executeWorkflowSteps() engine: Removed: - activeExecutions Map (old tracking system) - executeWorkflow() - old workflow executor - executeNextStep() - old step processor - executeCommandStep() - old command executor (duplicate) - handleUserInput() - unimplemented prompt handler - Duplicate app.post('/api/executions') endpoint - app.post('/api/executions/:id/respond') endpoint This was causing "Cannot read properties of undefined (reading 'target')" error because the old code was being called instead of the new engine. The new executeWorkflowSteps() engine is now the only workflow system. Co-Authored-By: Claude Sonnet 4.5 --- server.js | 293 ------------------------------------------------------ 1 file changed, 293 deletions(-) diff --git a/server.js b/server.js index 7858c7a..80e1d6b 100644 --- a/server.js +++ b/server.js @@ -927,299 +927,6 @@ initDatabase().then(() => { // ============================================ // WORKFLOW EXECUTION ENGINE -// ============================================ - -// Store active workflow executions in memory -const activeExecutions = new Map(); - -// Execute workflow step by step -async function executeWorkflow(workflowId, executionId, userId, targetWorkers = 'all') { - try { - // Get workflow definition - const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflowId]); - if (workflows.length === 0) { - throw new Error('Workflow not found'); - } - - const workflow = workflows[0]; - const definition = JSON.parse(workflow.definition); - - // Initialize execution state - const executionState = { - id: executionId, - workflowId: workflowId, - currentStep: 0, - steps: definition.steps || [], - results: [], - status: 'running', - waitingForInput: false, - targetWorkers: targetWorkers, - userId: userId - }; - - activeExecutions.set(executionId, executionState); - - // Start executing steps - await executeNextStep(executionId); - - } catch (error) { - console.error('Workflow execution error:', error); - await updateExecutionStatus(executionId, 'failed', error.message); - } -} - -// Execute the next step in a workflow -async function executeNextStep(executionId) { - const state = activeExecutions.get(executionId); - if (!state) return; - - // Check if we've completed all steps - if (state.currentStep >= state.steps.length) { - await updateExecutionStatus(executionId, 'completed'); - activeExecutions.delete(executionId); - return; - } - - const step = state.steps[state.currentStep]; - - // Check if step has a condition - if (step.condition && !evaluateCondition(step.condition, state)) { - console.log(`[Workflow] Skipping step ${state.currentStep}: condition not met`); - state.currentStep++; - return executeNextStep(executionId); - } - - console.log(`[Workflow] Executing step ${state.currentStep}: ${step.name}`); - - try { - switch (step.type) { - case 'execute': - await executeCommandStep(executionId, step); - break; - case 'prompt': - await executePromptStep(executionId, step); - break; - case 'wait': - await executeWaitStep(executionId, step); - break; - default: - throw new Error(`Unknown step type: ${step.type}`); - } - } catch (error) { - await addExecutionLog(executionId, { - step: state.currentStep, - error: error.message, - timestamp: new Date().toISOString() - }); - await updateExecutionStatus(executionId, 'failed', error.message); - activeExecutions.delete(executionId); - } -} - -// Execute a command on workers -async function executeCommandStep(executionId, step) { - const state = activeExecutions.get(executionId); - - // Get target workers - const [workers] = await pool.query( - 'SELECT * FROM workers WHERE status = "online"' - ); - - if (workers.length === 0) { - throw new Error('No online workers available'); - } - - // Filter workers based on target - let targetWorkers = workers; - if (step.targets && step.targets[0] !== 'all') { - targetWorkers = workers.filter(w => step.targets.includes(w.name)); - } - - // Send command to workers via WebSocket - const commandMessage = { - type: 'execute_command', - execution_id: executionId, - step_index: state.currentStep, - command: step.command, - timeout: step.timeout || 300000 - }; - - // Broadcast to target workers - clients.forEach(client => { - if (client.readyState === WebSocket.OPEN) { - client.send(JSON.stringify(commandMessage)); - } - }); - - await addExecutionLog(executionId, { - step: state.currentStep, - action: 'command_sent', - command: step.command, - workers: targetWorkers.map(w => w.name), - timestamp: new Date().toISOString() - }); - - // For now, move to next step immediately - // In production, we'd wait for worker responses - state.currentStep++; - - // Small delay to allow command to execute - setTimeout(() => executeNextStep(executionId), 1000); -} - -// Execute a user prompt step -async function executePromptStep(executionId, step) { - const state = activeExecutions.get(executionId); - - state.waitingForInput = true; - state.promptData = { - message: step.message, - options: step.options, - step: state.currentStep - }; - - await addExecutionLog(executionId, { - step: state.currentStep, - action: 'waiting_for_input', - prompt: step.message, - timestamp: new Date().toISOString() - }); - - // Notify frontend that input is needed - broadcast({ - type: 'execution_prompt', - execution_id: executionId, - prompt: state.promptData - }); -} - -// Execute a wait/delay step -async function executeWaitStep(executionId, step) { - const state = activeExecutions.get(executionId); - const delay = step.duration || 1000; - - await addExecutionLog(executionId, { - step: state.currentStep, - action: 'waiting', - duration: delay, - timestamp: new Date().toISOString() - }); - - state.currentStep++; - setTimeout(() => executeNextStep(executionId), delay); -} - -// Handle user input for prompts -function handleUserInput(executionId, response) { - const state = activeExecutions.get(executionId); - if (!state || !state.waitingForInput) { - return false; - } - - state.promptResponse = response; - state.waitingForInput = false; - state.currentStep++; - - addExecutionLog(executionId, { - step: state.currentStep - 1, - action: 'user_response', - response: response, - timestamp: new Date().toISOString() - }); - - executeNextStep(executionId); - return true; -} - -// Evaluate conditions -function evaluateCondition(condition, state) { - try { - // Simple condition evaluation - // In production, use a proper expression evaluator - const promptResponse = state.promptResponse; - return eval(condition); - } catch (error) { - console.error('Condition evaluation error:', error); - return false; - } -} - -// Helper functions -async function updateExecutionStatus(executionId, status, error = null) { - const updates = { status }; - if (status === 'completed' || status === 'failed') { - updates.completed_at = new Date(); - } - if (error) { - // Add error to logs - await addExecutionLog(executionId, { error, timestamp: new Date().toISOString() }); - } - - await pool.query( - 'UPDATE executions SET status = ?, completed_at = ? WHERE id = ?', - [status, updates.completed_at || null, executionId] - ); - - broadcast({ - type: 'execution_status', - execution_id: executionId, - status: status - }); -} - -async function addExecutionLog(executionId, logEntry) { - const [rows] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]); - if (rows.length === 0) return; - - const logs = JSON.parse(rows[0].logs || '[]'); - logs.push(logEntry); - - await pool.query('UPDATE executions SET logs = ? WHERE id = ?', [ - JSON.stringify(logs), - executionId - ]); -} - -// ============================================ -// API ROUTES - Add these to your server.js -// ============================================ - -// Start workflow execution -app.post('/api/executions', authenticateSSO, async (req, res) => { - try { - const { workflow_id, target_workers } = req.body; - const id = generateUUID(); - - await pool.query( - 'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)', - [id, workflow_id, 'running', req.user.username, JSON.stringify([])] - ); - - // Start execution - executeWorkflow(workflow_id, id, req.user.username, target_workers || 'all'); - - broadcast({ type: 'execution_started', execution_id: id, workflow_id }); - res.json({ id, workflow_id, status: 'running' }); - } catch (error) { - res.status(500).json({ error: error.message }); - } -}); - -// Respond to workflow prompt -app.post('/api/executions/:id/respond', authenticateSSO, async (req, res) => { - try { - const { response } = req.body; - const success = handleUserInput(req.params.id, response); - - if (success) { - res.json({ success: true }); - } else { - res.status(400).json({ error: 'Execution not waiting for input' }); - } - } catch (error) { - res.status(500).json({ error: error.message }); - } -}); // Get execution details with logs app.get('/api/executions/:id', authenticateSSO, async (req, res) => {