Add interactive workflow system with prompt steps, workflow editor
- executeWorkflowSteps: rewritten with step-id/goto branching support - executePromptStep: async pause via _executionPrompts Map, 60min timeout - POST /api/executions/:id/respond: resolves pending prompt from browser - PUT /api/workflows/🆔 admin-only workflow editing, broadcasts workflow_updated - GET /api/workflows/🆔 fetch single workflow for edit modal - GET /api/executions/🆔 now includes waiting_for_input + prompt fields - index.html: prompt/prompt_response/step_skipped log entry rendering - index.html: execution_prompt WebSocket handler refreshes open modal - index.html: workflow_updated WebSocket handler reloads workflow list - index.html: Edit button + modal for in-browser workflow editing - index.html: respondToPrompt keeps modal open, refreshes execution view - Interactive Link Troubleshooter v2 workflow: 45-step wizard with copper/fiber branches, clean/swap/reseat actions, re-test loops, CRC error path, performance diagnostics, SUCCESS/ESCALATE terminals Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
257
server.js
257
server.js
@@ -508,92 +508,161 @@ function applyParams(command, params) {
|
||||
});
|
||||
}
|
||||
|
||||
// Evaluate a condition string against execution state and params.
|
||||
function evalCondition(condition, state, params) {
|
||||
try {
|
||||
// eslint-disable-next-line no-new-func
|
||||
return !!new Function('state', 'params', `return !!(${condition})`)(state, params);
|
||||
} catch (e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Per-execution mutable state (params + user-keyed state dict).
|
||||
// Survives across step boundaries; cleaned up when execution ends.
|
||||
const _executionState = new Map(); // executionId → { params, state }
|
||||
|
||||
// Pending prompt resolvers — set when a prompt step is waiting for user input.
|
||||
const _executionPrompts = new Map(); // executionId → resolve fn
|
||||
|
||||
async function executeWorkflowSteps(executionId, workflowId, definition, username, params = {}) {
|
||||
_executionState.set(executionId, { params, state: {} });
|
||||
|
||||
try {
|
||||
console.log(`[Workflow] Starting execution ${executionId} for workflow ${workflowId}`);
|
||||
|
||||
const steps = definition.steps || [];
|
||||
let allStepsSucceeded = true;
|
||||
|
||||
for (let i = 0; i < steps.length; i++) {
|
||||
const step = steps[i];
|
||||
console.log(`[Workflow] Execution ${executionId} - Step ${i + 1}/${steps.length}: ${step.name}`);
|
||||
// Build step-id → index map for goto support
|
||||
const stepIdMap = new Map();
|
||||
steps.forEach((step, i) => { if (step.id) stepIdMap.set(step.id, i); });
|
||||
|
||||
let currentIndex = 0;
|
||||
|
||||
while (currentIndex < steps.length) {
|
||||
const step = steps[currentIndex];
|
||||
const execState = _executionState.get(executionId);
|
||||
const stepLabel = step.name || step.id || `Step ${currentIndex + 1}`;
|
||||
console.log(`[Workflow] ${executionId} — step ${currentIndex + 1}: ${stepLabel}`);
|
||||
|
||||
// Add step start log
|
||||
await addExecutionLog(executionId, {
|
||||
step: i + 1,
|
||||
step_name: step.name,
|
||||
action: 'step_started',
|
||||
step: currentIndex + 1, step_name: stepLabel, action: 'step_started',
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
broadcast({ type: 'workflow_step_started', execution_id: executionId,
|
||||
step: currentIndex + 1, step_name: stepLabel });
|
||||
|
||||
broadcast({
|
||||
type: 'workflow_step_started',
|
||||
execution_id: executionId,
|
||||
step: i + 1,
|
||||
step_name: step.name
|
||||
});
|
||||
let gotoId = step.goto || null; // may be overridden by prompt routes
|
||||
|
||||
if (step.type === 'execute') {
|
||||
// Execute command step
|
||||
const success = await executeCommandStep(executionId, step, i + 1, params);
|
||||
if (!success) {
|
||||
allStepsSucceeded = false;
|
||||
break; // Stop workflow on failure
|
||||
const condOk = !step.condition || evalCondition(step.condition, execState.state, execState.params);
|
||||
if (condOk) {
|
||||
const success = await executeCommandStep(executionId, step, currentIndex + 1, params);
|
||||
if (!success) {
|
||||
await updateExecutionStatus(executionId, 'failed');
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
await addExecutionLog(executionId, {
|
||||
step: currentIndex + 1, action: 'step_skipped',
|
||||
reason: 'condition false', timestamp: new Date().toISOString()
|
||||
});
|
||||
}
|
||||
} else if (step.type === 'wait') {
|
||||
// Wait step (delay in seconds)
|
||||
const seconds = step.duration || 5;
|
||||
await addExecutionLog(executionId, {
|
||||
step: i + 1,
|
||||
action: 'waiting',
|
||||
duration: seconds,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
await new Promise(resolve => setTimeout(resolve, seconds * 1000));
|
||||
|
||||
} else if (step.type === 'prompt') {
|
||||
// Interactive prompt step (not fully implemented, would need user interaction)
|
||||
const response = await executePromptStep(executionId, step, currentIndex + 1);
|
||||
if (response !== null) {
|
||||
execState.state[step.key || 'lastPrompt'] = response;
|
||||
execState.state.promptResponse = response; // backwards compat
|
||||
// Prompt routes override goto
|
||||
if (step.routes && step.routes[response]) {
|
||||
gotoId = step.routes[response];
|
||||
}
|
||||
}
|
||||
|
||||
} else if (step.type === 'wait') {
|
||||
const ms = (step.duration || 5) * 1000;
|
||||
await addExecutionLog(executionId, {
|
||||
step: i + 1,
|
||||
action: 'prompt_skipped',
|
||||
message: 'Interactive prompts not yet supported',
|
||||
step: currentIndex + 1, action: 'waiting', duration_ms: ms,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
await new Promise(r => setTimeout(r, ms));
|
||||
}
|
||||
|
||||
// Add step completion log
|
||||
await addExecutionLog(executionId, {
|
||||
step: i + 1,
|
||||
step_name: step.name,
|
||||
action: 'step_completed',
|
||||
step: currentIndex + 1, step_name: stepLabel, action: 'step_completed',
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
broadcast({ type: 'workflow_step_completed', execution_id: executionId,
|
||||
step: currentIndex + 1, step_name: stepLabel });
|
||||
|
||||
broadcast({
|
||||
type: 'workflow_step_completed',
|
||||
execution_id: executionId,
|
||||
step: i + 1,
|
||||
step_name: step.name
|
||||
});
|
||||
// Determine next step
|
||||
if (gotoId === 'end' || gotoId === '__end__') {
|
||||
break;
|
||||
} else if (gotoId) {
|
||||
if (stepIdMap.has(gotoId)) {
|
||||
currentIndex = stepIdMap.get(gotoId);
|
||||
} else {
|
||||
await addExecutionLog(executionId, {
|
||||
action: 'goto_error', target: gotoId,
|
||||
message: `No step with id "${gotoId}" found`,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
currentIndex++;
|
||||
}
|
||||
}
|
||||
|
||||
// Mark execution as completed or failed
|
||||
const finalStatus = allStepsSucceeded ? 'completed' : 'failed';
|
||||
await updateExecutionStatus(executionId, finalStatus);
|
||||
|
||||
console.log(`[Workflow] Execution ${executionId} finished with status: ${finalStatus}`);
|
||||
await updateExecutionStatus(executionId, 'completed');
|
||||
console.log(`[Workflow] Execution ${executionId} completed`);
|
||||
|
||||
} catch (error) {
|
||||
console.error(`[Workflow] Execution ${executionId} error:`, error);
|
||||
await addExecutionLog(executionId, {
|
||||
action: 'workflow_error',
|
||||
error: error.message,
|
||||
action: 'workflow_error', error: error.message,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
await updateExecutionStatus(executionId, 'failed');
|
||||
} finally {
|
||||
_executionState.delete(executionId);
|
||||
_executionPrompts.delete(executionId);
|
||||
}
|
||||
}
|
||||
|
||||
// Pause execution and wait for user to respond via POST /api/executions/:id/respond.
|
||||
// Resolves with the chosen option string, or null on 60-minute timeout.
|
||||
async function executePromptStep(executionId, step, stepNumber) {
|
||||
const message = step.message || 'Please choose an option:';
|
||||
const options = step.options || ['Continue'];
|
||||
|
||||
await addExecutionLog(executionId, {
|
||||
step: stepNumber, step_name: step.name, action: 'prompt',
|
||||
message, options, timestamp: new Date().toISOString()
|
||||
});
|
||||
|
||||
broadcast({
|
||||
type: 'execution_prompt',
|
||||
execution_id: executionId,
|
||||
prompt: { message, options, step: stepNumber, step_name: step.name }
|
||||
});
|
||||
|
||||
return new Promise(resolve => {
|
||||
const timer = setTimeout(() => {
|
||||
_executionPrompts.delete(executionId);
|
||||
console.warn(`[Workflow] Prompt timed out for execution ${executionId}`);
|
||||
resolve(null);
|
||||
}, 60 * 60 * 1000); // 60 minute timeout
|
||||
|
||||
_executionPrompts.set(executionId, (response) => {
|
||||
clearTimeout(timer);
|
||||
_executionPrompts.delete(executionId);
|
||||
resolve(response);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async function executeCommandStep(executionId, step, stepNumber, params = {}) {
|
||||
try {
|
||||
let command = step.command;
|
||||
@@ -755,6 +824,17 @@ app.get('/api/workflows', authenticateSSO, async (req, res) => {
|
||||
}
|
||||
});
|
||||
|
||||
app.get('/api/workflows/:id', authenticateSSO, async (req, res) => {
|
||||
try {
|
||||
const [rows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [req.params.id]);
|
||||
if (rows.length === 0) return res.status(404).json({ error: 'Not found' });
|
||||
const wf = rows[0];
|
||||
res.json({ ...wf, definition: JSON.parse(wf.definition || '{}') });
|
||||
} catch (error) {
|
||||
res.status(500).json({ error: error.message });
|
||||
}
|
||||
});
|
||||
|
||||
app.post('/api/workflows', authenticateSSO, async (req, res) => {
|
||||
try {
|
||||
const { name, description, definition } = req.body;
|
||||
@@ -942,6 +1022,10 @@ app.post('/api/executions/:id/abort', authenticateSSO, async (req, res) => {
|
||||
// Update execution status to failed
|
||||
await updateExecutionStatus(executionId, 'failed');
|
||||
|
||||
// Unblock any pending prompt so the thread can exit
|
||||
const pending = _executionPrompts.get(executionId);
|
||||
if (pending) pending(null);
|
||||
|
||||
console.log(`[Execution] Execution ${executionId} aborted by ${req.user.username}`);
|
||||
|
||||
res.json({ success: true });
|
||||
@@ -951,6 +1035,64 @@ app.post('/api/executions/:id/abort', authenticateSSO, async (req, res) => {
|
||||
}
|
||||
});
|
||||
|
||||
// Respond to a pending prompt in a running execution
|
||||
app.post('/api/executions/:id/respond', authenticateSSO, async (req, res) => {
|
||||
try {
|
||||
const { id } = req.params;
|
||||
const { response } = req.body;
|
||||
|
||||
if (!response) return res.status(400).json({ error: 'response is required' });
|
||||
|
||||
const pending = _executionPrompts.get(id);
|
||||
if (!pending) return res.status(404).json({ error: 'No pending prompt for this execution' });
|
||||
|
||||
await addExecutionLog(id, {
|
||||
action: 'prompt_response', response,
|
||||
responded_by: req.user.username,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
|
||||
broadcast({ type: 'prompt_response', execution_id: id, response });
|
||||
|
||||
pending(response);
|
||||
res.json({ success: true });
|
||||
} catch (error) {
|
||||
res.status(500).json({ error: error.message });
|
||||
}
|
||||
});
|
||||
|
||||
// Edit a workflow definition (admin only)
|
||||
app.put('/api/workflows/:id', authenticateSSO, async (req, res) => {
|
||||
try {
|
||||
if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin only' });
|
||||
|
||||
const { id } = req.params;
|
||||
const { name, description, definition } = req.body;
|
||||
|
||||
if (!name || !definition) return res.status(400).json({ error: 'name and definition required' });
|
||||
|
||||
// Validate definition is parseable JSON
|
||||
let defObj;
|
||||
try {
|
||||
defObj = typeof definition === 'string' ? JSON.parse(definition) : definition;
|
||||
} catch (e) {
|
||||
return res.status(400).json({ error: 'Invalid JSON in definition' });
|
||||
}
|
||||
|
||||
const [result] = await pool.query(
|
||||
'UPDATE workflows SET name=?, description=?, definition=?, updated_at=NOW() WHERE id=?',
|
||||
[name, description || '', JSON.stringify(defObj), id]
|
||||
);
|
||||
|
||||
if (result.affectedRows === 0) return res.status(404).json({ error: 'Workflow not found' });
|
||||
|
||||
broadcast({ type: 'workflow_updated', workflow_id: id });
|
||||
res.json({ success: true });
|
||||
} catch (error) {
|
||||
res.status(500).json({ error: error.message });
|
||||
}
|
||||
});
|
||||
|
||||
// Scheduled Commands API
|
||||
app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => {
|
||||
try {
|
||||
@@ -1120,10 +1262,23 @@ app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
|
||||
}
|
||||
|
||||
const execution = rows[0];
|
||||
const parsedLogs = JSON.parse(execution.logs || '[]');
|
||||
const waitingForInput = _executionPrompts.has(req.params.id);
|
||||
let pendingPrompt = null;
|
||||
if (waitingForInput) {
|
||||
for (let i = parsedLogs.length - 1; i >= 0; i--) {
|
||||
if (parsedLogs[i].action === 'prompt') {
|
||||
pendingPrompt = { message: parsedLogs[i].message, options: parsedLogs[i].options };
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
res.json({
|
||||
...execution,
|
||||
logs: JSON.parse(execution.logs || '[]')
|
||||
logs: parsedLogs,
|
||||
waiting_for_input: waitingForInput,
|
||||
prompt: pendingPrompt,
|
||||
});
|
||||
} catch (error) {
|
||||
res.status(500).json({ error: error.message });
|
||||
|
||||
Reference in New Issue
Block a user