`;
@@ -1913,21 +1958,29 @@
if (execution.waiting_for_input && execution.prompt) {
html += `
-
⏳ Waiting for Input
-
${execution.prompt.message}
-
- ${execution.prompt.options.map(opt =>
- `
`
+
Waiting for Input
+
${escapeHtml(execution.prompt.message || '')}
+
+ ${(execution.prompt.options || []).map(opt =>
+ ``
).join('')}
`;
}
-
+
if (execution.logs && execution.logs.length > 0) {
html += '
Execution Logs:
';
- execution.logs.forEach(log => {
- html += formatLogEntry(log);
+ // Pass executionId only to prompt steps that are still pending (no response after them)
+ const logs = execution.logs;
+ logs.forEach((log, idx) => {
+ let promptExecId = null;
+ if (log.action === 'prompt' && execution.waiting_for_input) {
+ // Only make interactive if this is the last prompt and no response follows
+ const hasResponse = logs.slice(idx + 1).some(l => l.action === 'prompt_response');
+ if (!hasResponse) promptExecId = executionId;
+ }
+ html += formatLogEntry(log, promptExecId);
});
}
@@ -1960,7 +2013,7 @@
}
}
- function formatLogEntry(log) {
+ function formatLogEntry(log, executionId = null) {
const timestamp = new Date(log.timestamp).toLocaleTimeString();
// Format based on log action type
@@ -2070,6 +2123,43 @@
`;
}
+ if (log.action === 'prompt') {
+ const optionsHtml = (log.options || []).map(opt => {
+ if (executionId) {
+ return `
`;
+ }
+ return `
`;
+ }).join('');
+ return `
+
+
[${timestamp}]
+
❓ Step ${log.step}: ${escapeHtml(log.step_name || 'Prompt')}
+
+
${escapeHtml(log.message || '')}
+
${optionsHtml}
+
+
+ `;
+ }
+
+ if (log.action === 'prompt_response') {
+ return `
+
+
[${timestamp}]
+
↪ Response: ${escapeHtml(log.response || '')}${log.responded_by ? `by ${escapeHtml(log.responded_by)}` : ''}
+
+ `;
+ }
+
+ if (log.action === 'step_skipped') {
+ return `
+
+
[${timestamp}]
+
⊘ Step ${log.step} Skipped${log.reason ? ': ' + escapeHtml(log.reason) : ''}
+
+ `;
+ }
+
// Fallback for unknown log types
return `
${JSON.stringify(log, null, 2)}`;
}
@@ -2184,17 +2274,18 @@
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ response })
});
-
+
if (res.ok) {
- closeModal('viewExecutionModal');
- alert('Response submitted!');
- refreshData();
+ showTerminalNotification(`Response submitted: ${response}`, 'success');
+ // Refresh the modal to show the next step
+ viewExecution(executionId);
} else {
- alert('Failed to submit response');
+ const data = await res.json().catch(() => ({}));
+ showTerminalNotification(data.error || 'Failed to submit response', 'error');
}
} catch (error) {
console.error('Error responding to prompt:', error);
- alert('Error submitting response');
+ showTerminalNotification('Error submitting response', 'error');
}
}
@@ -2348,6 +2439,68 @@
}
}
+ async function editWorkflow(workflowId) {
+ try {
+ const response = await fetch(`/api/workflows/${workflowId}`);
+ if (!response.ok) throw new Error('Workflow not found');
+ const wf = await response.json();
+ document.getElementById('editWorkflowId').value = wf.id;
+ document.getElementById('editWorkflowName').value = wf.name;
+ document.getElementById('editWorkflowDescription').value = wf.description || '';
+ document.getElementById('editWorkflowDefinition').value = JSON.stringify(wf.definition, null, 2);
+ document.getElementById('editWorkflowError').style.display = 'none';
+ document.getElementById('editWorkflowModal').classList.add('show');
+ } catch (error) {
+ console.error('Error loading workflow for edit:', error);
+ showTerminalNotification('Error loading workflow', 'error');
+ }
+ }
+
+ async function saveWorkflow() {
+ const id = document.getElementById('editWorkflowId').value;
+ const name = document.getElementById('editWorkflowName').value.trim();
+ const description = document.getElementById('editWorkflowDescription').value.trim();
+ const definitionText = document.getElementById('editWorkflowDefinition').value;
+ const errorEl = document.getElementById('editWorkflowError');
+
+ if (!name) {
+ errorEl.textContent = 'Name is required';
+ errorEl.style.display = 'block';
+ return;
+ }
+
+ let definition;
+ try {
+ definition = JSON.parse(definitionText);
+ } catch (e) {
+ errorEl.textContent = 'Invalid JSON: ' + e.message;
+ errorEl.style.display = 'block';
+ return;
+ }
+ errorEl.style.display = 'none';
+
+ try {
+ const response = await fetch(`/api/workflows/${id}`, {
+ method: 'PUT',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({ name, description, definition })
+ });
+
+ if (response.ok) {
+ closeModal('editWorkflowModal');
+ showTerminalNotification('Workflow saved!', 'success');
+ loadWorkflows();
+ } else {
+ const data = await response.json().catch(() => ({}));
+ errorEl.textContent = data.error || 'Failed to save workflow';
+ errorEl.style.display = 'block';
+ }
+ } catch (error) {
+ errorEl.textContent = 'Error saving workflow: ' + error.message;
+ errorEl.style.display = 'block';
+ }
+ }
+
function showCreateWorkflow() {
document.getElementById('createWorkflowModal').classList.add('show');
}
@@ -2715,8 +2868,25 @@
loadWorkflows();
}
+ if (data.type === 'workflow_updated') {
+ loadWorkflows();
+ }
+
+ if (data.type === 'execution_prompt') {
+ // If this execution is currently open, refresh to show the prompt
+ const executionModal = document.getElementById('viewExecutionModal');
+ if (executionModal && executionModal.classList.contains('show')) {
+ const currentId = executionModal.dataset.executionId;
+ if (currentId === data.execution_id) {
+ viewExecution(data.execution_id);
+ }
+ }
+ // Also update execution list so status indicators refresh
+ loadExecutions();
+ }
+
// Generic refresh for other message types
- if (!['command_result', 'workflow_result', 'worker_update', 'execution_started', 'execution_status', 'workflow_created', 'workflow_deleted'].includes(data.type)) {
+ if (!['command_result', 'workflow_result', 'worker_update', 'execution_started', 'execution_status', 'workflow_created', 'workflow_deleted', 'workflow_updated', 'execution_prompt'].includes(data.type)) {
refreshData();
}
} catch (error) {
diff --git a/server.js b/server.js
index ebf48ba..13920f1 100644
--- a/server.js
+++ b/server.js
@@ -508,92 +508,161 @@ function applyParams(command, params) {
});
}
+// Evaluate a condition string against execution state and params.
+function evalCondition(condition, state, params) {
+ try {
+ // eslint-disable-next-line no-new-func
+ return !!new Function('state', 'params', `return !!(${condition})`)(state, params);
+ } catch (e) {
+ return false;
+ }
+}
+
+// Per-execution mutable state (params + user-keyed state dict).
+// Survives across step boundaries; cleaned up when execution ends.
+const _executionState = new Map(); // executionId → { params, state }
+
+// Pending prompt resolvers — set when a prompt step is waiting for user input.
+const _executionPrompts = new Map(); // executionId → resolve fn
+
async function executeWorkflowSteps(executionId, workflowId, definition, username, params = {}) {
+ _executionState.set(executionId, { params, state: {} });
+
try {
console.log(`[Workflow] Starting execution ${executionId} for workflow ${workflowId}`);
const steps = definition.steps || [];
- let allStepsSucceeded = true;
- for (let i = 0; i < steps.length; i++) {
- const step = steps[i];
- console.log(`[Workflow] Execution ${executionId} - Step ${i + 1}/${steps.length}: ${step.name}`);
+ // Build step-id → index map for goto support
+ const stepIdMap = new Map();
+ steps.forEach((step, i) => { if (step.id) stepIdMap.set(step.id, i); });
+
+ let currentIndex = 0;
+
+ while (currentIndex < steps.length) {
+ const step = steps[currentIndex];
+ const execState = _executionState.get(executionId);
+ const stepLabel = step.name || step.id || `Step ${currentIndex + 1}`;
+ console.log(`[Workflow] ${executionId} — step ${currentIndex + 1}: ${stepLabel}`);
- // Add step start log
await addExecutionLog(executionId, {
- step: i + 1,
- step_name: step.name,
- action: 'step_started',
+ step: currentIndex + 1, step_name: stepLabel, action: 'step_started',
timestamp: new Date().toISOString()
});
+ broadcast({ type: 'workflow_step_started', execution_id: executionId,
+ step: currentIndex + 1, step_name: stepLabel });
- broadcast({
- type: 'workflow_step_started',
- execution_id: executionId,
- step: i + 1,
- step_name: step.name
- });
+ let gotoId = step.goto || null; // may be overridden by prompt routes
if (step.type === 'execute') {
- // Execute command step
- const success = await executeCommandStep(executionId, step, i + 1, params);
- if (!success) {
- allStepsSucceeded = false;
- break; // Stop workflow on failure
+ const condOk = !step.condition || evalCondition(step.condition, execState.state, execState.params);
+ if (condOk) {
+ const success = await executeCommandStep(executionId, step, currentIndex + 1, params);
+ if (!success) {
+ await updateExecutionStatus(executionId, 'failed');
+ return;
+ }
+ } else {
+ await addExecutionLog(executionId, {
+ step: currentIndex + 1, action: 'step_skipped',
+ reason: 'condition false', timestamp: new Date().toISOString()
+ });
}
- } else if (step.type === 'wait') {
- // Wait step (delay in seconds)
- const seconds = step.duration || 5;
- await addExecutionLog(executionId, {
- step: i + 1,
- action: 'waiting',
- duration: seconds,
- timestamp: new Date().toISOString()
- });
- await new Promise(resolve => setTimeout(resolve, seconds * 1000));
+
} else if (step.type === 'prompt') {
- // Interactive prompt step (not fully implemented, would need user interaction)
+ const response = await executePromptStep(executionId, step, currentIndex + 1);
+ if (response !== null) {
+ execState.state[step.key || 'lastPrompt'] = response;
+ execState.state.promptResponse = response; // backwards compat
+ // Prompt routes override goto
+ if (step.routes && step.routes[response]) {
+ gotoId = step.routes[response];
+ }
+ }
+
+ } else if (step.type === 'wait') {
+ const ms = (step.duration || 5) * 1000;
await addExecutionLog(executionId, {
- step: i + 1,
- action: 'prompt_skipped',
- message: 'Interactive prompts not yet supported',
+ step: currentIndex + 1, action: 'waiting', duration_ms: ms,
timestamp: new Date().toISOString()
});
+ await new Promise(r => setTimeout(r, ms));
}
- // Add step completion log
await addExecutionLog(executionId, {
- step: i + 1,
- step_name: step.name,
- action: 'step_completed',
+ step: currentIndex + 1, step_name: stepLabel, action: 'step_completed',
timestamp: new Date().toISOString()
});
+ broadcast({ type: 'workflow_step_completed', execution_id: executionId,
+ step: currentIndex + 1, step_name: stepLabel });
- broadcast({
- type: 'workflow_step_completed',
- execution_id: executionId,
- step: i + 1,
- step_name: step.name
- });
+ // Determine next step
+ if (gotoId === 'end' || gotoId === '__end__') {
+ break;
+ } else if (gotoId) {
+ if (stepIdMap.has(gotoId)) {
+ currentIndex = stepIdMap.get(gotoId);
+ } else {
+ await addExecutionLog(executionId, {
+ action: 'goto_error', target: gotoId,
+ message: `No step with id "${gotoId}" found`,
+ timestamp: new Date().toISOString()
+ });
+ break;
+ }
+ } else {
+ currentIndex++;
+ }
}
- // Mark execution as completed or failed
- const finalStatus = allStepsSucceeded ? 'completed' : 'failed';
- await updateExecutionStatus(executionId, finalStatus);
-
- console.log(`[Workflow] Execution ${executionId} finished with status: ${finalStatus}`);
+ await updateExecutionStatus(executionId, 'completed');
+ console.log(`[Workflow] Execution ${executionId} completed`);
} catch (error) {
console.error(`[Workflow] Execution ${executionId} error:`, error);
await addExecutionLog(executionId, {
- action: 'workflow_error',
- error: error.message,
+ action: 'workflow_error', error: error.message,
timestamp: new Date().toISOString()
});
await updateExecutionStatus(executionId, 'failed');
+ } finally {
+ _executionState.delete(executionId);
+ _executionPrompts.delete(executionId);
}
}
+// Pause execution and wait for user to respond via POST /api/executions/:id/respond.
+// Resolves with the chosen option string, or null on 60-minute timeout.
+async function executePromptStep(executionId, step, stepNumber) {
+ const message = step.message || 'Please choose an option:';
+ const options = step.options || ['Continue'];
+
+ await addExecutionLog(executionId, {
+ step: stepNumber, step_name: step.name, action: 'prompt',
+ message, options, timestamp: new Date().toISOString()
+ });
+
+ broadcast({
+ type: 'execution_prompt',
+ execution_id: executionId,
+ prompt: { message, options, step: stepNumber, step_name: step.name }
+ });
+
+ return new Promise(resolve => {
+ const timer = setTimeout(() => {
+ _executionPrompts.delete(executionId);
+ console.warn(`[Workflow] Prompt timed out for execution ${executionId}`);
+ resolve(null);
+ }, 60 * 60 * 1000); // 60 minute timeout
+
+ _executionPrompts.set(executionId, (response) => {
+ clearTimeout(timer);
+ _executionPrompts.delete(executionId);
+ resolve(response);
+ });
+ });
+}
+
async function executeCommandStep(executionId, step, stepNumber, params = {}) {
try {
let command = step.command;
@@ -755,6 +824,17 @@ app.get('/api/workflows', authenticateSSO, async (req, res) => {
}
});
+app.get('/api/workflows/:id', authenticateSSO, async (req, res) => {
+ try {
+ const [rows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [req.params.id]);
+ if (rows.length === 0) return res.status(404).json({ error: 'Not found' });
+ const wf = rows[0];
+ res.json({ ...wf, definition: JSON.parse(wf.definition || '{}') });
+ } catch (error) {
+ res.status(500).json({ error: error.message });
+ }
+});
+
app.post('/api/workflows', authenticateSSO, async (req, res) => {
try {
const { name, description, definition } = req.body;
@@ -942,6 +1022,10 @@ app.post('/api/executions/:id/abort', authenticateSSO, async (req, res) => {
// Update execution status to failed
await updateExecutionStatus(executionId, 'failed');
+ // Unblock any pending prompt so the thread can exit
+ const pending = _executionPrompts.get(executionId);
+ if (pending) pending(null);
+
console.log(`[Execution] Execution ${executionId} aborted by ${req.user.username}`);
res.json({ success: true });
@@ -951,6 +1035,64 @@ app.post('/api/executions/:id/abort', authenticateSSO, async (req, res) => {
}
});
+// Respond to a pending prompt in a running execution
+app.post('/api/executions/:id/respond', authenticateSSO, async (req, res) => {
+ try {
+ const { id } = req.params;
+ const { response } = req.body;
+
+ if (!response) return res.status(400).json({ error: 'response is required' });
+
+ const pending = _executionPrompts.get(id);
+ if (!pending) return res.status(404).json({ error: 'No pending prompt for this execution' });
+
+ await addExecutionLog(id, {
+ action: 'prompt_response', response,
+ responded_by: req.user.username,
+ timestamp: new Date().toISOString()
+ });
+
+ broadcast({ type: 'prompt_response', execution_id: id, response });
+
+ pending(response);
+ res.json({ success: true });
+ } catch (error) {
+ res.status(500).json({ error: error.message });
+ }
+});
+
+// Edit a workflow definition (admin only)
+app.put('/api/workflows/:id', authenticateSSO, async (req, res) => {
+ try {
+ if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin only' });
+
+ const { id } = req.params;
+ const { name, description, definition } = req.body;
+
+ if (!name || !definition) return res.status(400).json({ error: 'name and definition required' });
+
+ // Validate definition is parseable JSON
+ let defObj;
+ try {
+ defObj = typeof definition === 'string' ? JSON.parse(definition) : definition;
+ } catch (e) {
+ return res.status(400).json({ error: 'Invalid JSON in definition' });
+ }
+
+ const [result] = await pool.query(
+ 'UPDATE workflows SET name=?, description=?, definition=?, updated_at=NOW() WHERE id=?',
+ [name, description || '', JSON.stringify(defObj), id]
+ );
+
+ if (result.affectedRows === 0) return res.status(404).json({ error: 'Workflow not found' });
+
+ broadcast({ type: 'workflow_updated', workflow_id: id });
+ res.json({ success: true });
+ } catch (error) {
+ res.status(500).json({ error: error.message });
+ }
+});
+
// Scheduled Commands API
app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => {
try {
@@ -1120,10 +1262,23 @@ app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
}
const execution = rows[0];
+ const parsedLogs = JSON.parse(execution.logs || '[]');
+ const waitingForInput = _executionPrompts.has(req.params.id);
+ let pendingPrompt = null;
+ if (waitingForInput) {
+ for (let i = parsedLogs.length - 1; i >= 0; i--) {
+ if (parsedLogs[i].action === 'prompt') {
+ pendingPrompt = { message: parsedLogs[i].message, options: parsedLogs[i].options };
+ break;
+ }
+ }
+ }
res.json({
...execution,
- logs: JSON.parse(execution.logs || '[]')
+ logs: parsedLogs,
+ waiting_for_input: waitingForInput,
+ prompt: pendingPrompt,
});
} catch (error) {
res.status(500).json({ error: error.message });