Compare commits
23 Commits
main
...
56f8047322
| Author | SHA1 | Date | |
|---|---|---|---|
| 56f8047322 | |||
| bf908568e3 | |||
| ffa124ed9d | |||
| 84edea8027 | |||
| 095bfb65ab | |||
| c619add705 | |||
| e6a6b7e359 | |||
| d25ba27f24 | |||
| 76b0a6d0d3 | |||
| adbbec2631 | |||
| 8b8d2c6312 | |||
| 1f5c84f327 | |||
| e03f8d6287 | |||
| 2097b73404 | |||
| 6d15e4d240 | |||
| 7896b40d91 | |||
| e2dc371bfe | |||
| df0184facf | |||
| a8be111e04 | |||
| b3806545bd | |||
| 2767087e27 | |||
| a1cf8ac90b | |||
| 9e842624e1 |
@@ -1780,11 +1780,6 @@
|
|||||||
// Add action buttons
|
// Add action buttons
|
||||||
html += '<div style="margin-top: 20px; padding-top: 15px; border-top: 1px solid var(--terminal-green); display: flex; gap: 10px;">';
|
html += '<div style="margin-top: 20px; padding-top: 15px; border-top: 1px solid var(--terminal-green); display: flex; gap: 10px;">';
|
||||||
|
|
||||||
// Abort button (only for running executions)
|
|
||||||
if (execution.status === 'running') {
|
|
||||||
html += `<button onclick="abortExecution('${executionId}')" style="background-color: var(--terminal-red); border-color: var(--terminal-red);">[ ⛔ Abort Execution ]</button>`;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Re-run button (only for quick commands with command in logs)
|
// Re-run button (only for quick commands with command in logs)
|
||||||
const commandLog = execution.logs?.find(l => l.action === 'command_sent');
|
const commandLog = execution.logs?.find(l => l.action === 'command_sent');
|
||||||
if (commandLog && commandLog.command) {
|
if (commandLog && commandLog.command) {
|
||||||
@@ -1840,82 +1835,6 @@
|
|||||||
`;
|
`;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Workflow step logs
|
|
||||||
if (log.action === 'step_started') {
|
|
||||||
return `
|
|
||||||
<div class="log-entry" style="border-left-color: var(--terminal-amber);">
|
|
||||||
<div class="log-timestamp">[${timestamp}]</div>
|
|
||||||
<div class="log-title" style="color: var(--terminal-amber);">▶️ Step ${log.step}: ${log.step_name}</div>
|
|
||||||
</div>
|
|
||||||
`;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (log.action === 'step_completed') {
|
|
||||||
return `
|
|
||||||
<div class="log-entry" style="border-left-color: var(--terminal-green);">
|
|
||||||
<div class="log-timestamp">[${timestamp}]</div>
|
|
||||||
<div class="log-title" style="color: var(--terminal-green);">✓ Step ${log.step} Completed: ${log.step_name}</div>
|
|
||||||
</div>
|
|
||||||
`;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (log.action === 'waiting') {
|
|
||||||
return `
|
|
||||||
<div class="log-entry" style="border-left-color: var(--terminal-amber);">
|
|
||||||
<div class="log-timestamp">[${timestamp}]</div>
|
|
||||||
<div class="log-title" style="color: var(--terminal-amber);">⏳ Waiting ${log.duration} seconds...</div>
|
|
||||||
</div>
|
|
||||||
`;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (log.action === 'no_workers') {
|
|
||||||
return `
|
|
||||||
<div class="log-entry failed">
|
|
||||||
<div class="log-timestamp">[${timestamp}]</div>
|
|
||||||
<div class="log-title">✗ Step ${log.step}: No Workers Available</div>
|
|
||||||
<div class="log-details">
|
|
||||||
<div class="log-field">${escapeHtml(log.message)}</div>
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
`;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (log.action === 'worker_offline') {
|
|
||||||
return `
|
|
||||||
<div class="log-entry" style="border-left-color: #ff4444;">
|
|
||||||
<div class="log-timestamp">[${timestamp}]</div>
|
|
||||||
<div class="log-title" style="color: #ff4444;">⚠️ Worker Offline</div>
|
|
||||||
<div class="log-details">
|
|
||||||
<div class="log-field"><span class="log-label">Worker ID:</span> ${log.worker_id}</div>
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
`;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (log.action === 'workflow_error') {
|
|
||||||
return `
|
|
||||||
<div class="log-entry failed">
|
|
||||||
<div class="log-timestamp">[${timestamp}]</div>
|
|
||||||
<div class="log-title">✗ Workflow Error</div>
|
|
||||||
<div class="log-details">
|
|
||||||
<div class="log-field"><span class="log-label">Error:</span> ${escapeHtml(log.error)}</div>
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
`;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (log.action === 'execution_aborted') {
|
|
||||||
return `
|
|
||||||
<div class="log-entry" style="border-left-color: var(--terminal-red);">
|
|
||||||
<div class="log-timestamp">[${timestamp}]</div>
|
|
||||||
<div class="log-title" style="color: var(--terminal-red);">⛔ Execution Aborted</div>
|
|
||||||
<div class="log-details">
|
|
||||||
<div class="log-field"><span class="log-label">Aborted by:</span> ${escapeHtml(log.aborted_by)}</div>
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
`;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fallback for unknown log types
|
// Fallback for unknown log types
|
||||||
return `<div class="log-entry"><pre>${JSON.stringify(log, null, 2)}</pre></div>`;
|
return `<div class="log-entry"><pre>${JSON.stringify(log, null, 2)}</pre></div>`;
|
||||||
}
|
}
|
||||||
@@ -1969,28 +1888,6 @@
|
|||||||
document.getElementById('quickCommand').scrollIntoView({ behavior: 'smooth' });
|
document.getElementById('quickCommand').scrollIntoView({ behavior: 'smooth' });
|
||||||
}
|
}
|
||||||
|
|
||||||
async function abortExecution(executionId) {
|
|
||||||
if (!confirm('Are you sure you want to abort this execution? This will mark it as failed.')) return;
|
|
||||||
|
|
||||||
try {
|
|
||||||
const response = await fetch(`/api/executions/${executionId}/abort`, {
|
|
||||||
method: 'POST',
|
|
||||||
headers: { 'Content-Type': 'application/json' }
|
|
||||||
});
|
|
||||||
|
|
||||||
if (response.ok) {
|
|
||||||
showTerminalNotification('Execution aborted', 'success');
|
|
||||||
closeModal('viewExecutionModal');
|
|
||||||
refreshData();
|
|
||||||
} else {
|
|
||||||
alert('Failed to abort execution');
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
console.error('Error aborting execution:', error);
|
|
||||||
alert('Error aborting execution');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function downloadExecutionLogs(executionId) {
|
async function downloadExecutionLogs(executionId) {
|
||||||
try {
|
try {
|
||||||
const response = await fetch(`/api/executions/${executionId}`);
|
const response = await fetch(`/api/executions/${executionId}`);
|
||||||
@@ -2208,31 +2105,25 @@
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let definition;
|
|
||||||
try {
|
|
||||||
definition = JSON.parse(definitionText);
|
|
||||||
} catch (error) {
|
|
||||||
alert('Invalid JSON definition: ' + error.message);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
const definition = JSON.parse(definitionText);
|
||||||
|
|
||||||
const response = await fetch('/api/workflows', {
|
const response = await fetch('/api/workflows', {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
headers: { 'Content-Type': 'application/json' },
|
headers: { 'Content-Type': 'application/json' },
|
||||||
body: JSON.stringify({ name, description, definition })
|
body: JSON.stringify({ name, description, definition })
|
||||||
});
|
});
|
||||||
|
|
||||||
if (response.ok) {
|
if (response.ok) {
|
||||||
|
alert('Workflow created!');
|
||||||
closeModal('createWorkflowModal');
|
closeModal('createWorkflowModal');
|
||||||
switchTab('workflows');
|
switchTab('workflows');
|
||||||
showTerminalNotification('Workflow created successfully!', 'success');
|
|
||||||
refreshData();
|
refreshData();
|
||||||
} else {
|
} else {
|
||||||
alert('Failed to create workflow');
|
alert('Failed to create workflow');
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
alert('Error creating workflow: ' + error.message);
|
alert('Invalid JSON definition: ' + error.message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2395,30 +2286,11 @@
|
|||||||
document.getElementById(tabName).classList.add('active');
|
document.getElementById(tabName).classList.add('active');
|
||||||
}
|
}
|
||||||
|
|
||||||
async function refreshData() {
|
function refreshData() {
|
||||||
try {
|
loadWorkers();
|
||||||
await loadWorkers();
|
loadWorkflows();
|
||||||
} catch (e) {
|
loadExecutions();
|
||||||
console.error('Error loading workers:', e);
|
loadSchedules();
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
await loadWorkflows();
|
|
||||||
} catch (e) {
|
|
||||||
console.error('Error loading workflows:', e);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
await loadExecutions();
|
|
||||||
} catch (e) {
|
|
||||||
console.error('Error loading executions:', e);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
await loadSchedules();
|
|
||||||
} catch (e) {
|
|
||||||
console.error('Error loading schedules:', e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Terminal beep sound (Web Audio API)
|
// Terminal beep sound (Web Audio API)
|
||||||
@@ -2497,9 +2369,8 @@
|
|||||||
ws = new WebSocket(`${protocol}//${window.location.host}`);
|
ws = new WebSocket(`${protocol}//${window.location.host}`);
|
||||||
|
|
||||||
ws.onmessage = (event) => {
|
ws.onmessage = (event) => {
|
||||||
try {
|
const data = JSON.parse(event.data);
|
||||||
const data = JSON.parse(event.data);
|
console.log('WebSocket message:', data);
|
||||||
console.log('WebSocket message:', data);
|
|
||||||
|
|
||||||
// Handle specific message types
|
// Handle specific message types
|
||||||
if (data.type === 'command_result') {
|
if (data.type === 'command_result') {
|
||||||
@@ -2561,13 +2432,9 @@
|
|||||||
loadWorkflows();
|
loadWorkflows();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generic refresh for other message types
|
// Generic refresh for other message types
|
||||||
if (!['command_result', 'workflow_result', 'worker_update', 'execution_started', 'execution_status', 'workflow_created', 'workflow_deleted'].includes(data.type)) {
|
if (!['command_result', 'workflow_result', 'worker_update', 'execution_started', 'execution_status', 'workflow_created', 'workflow_deleted'].includes(data.type)) {
|
||||||
refreshData();
|
refreshData();
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
console.error('Error handling WebSocket message:', error);
|
|
||||||
console.error('Stack trace:', error.stack);
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
655
server.js
655
server.js
@@ -241,13 +241,12 @@ wss.on('connection', (ws) => {
|
|||||||
|
|
||||||
if (message.type === 'command_result') {
|
if (message.type === 'command_result') {
|
||||||
// Handle command result from worker
|
// Handle command result from worker
|
||||||
const { execution_id, command_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
|
const { execution_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
|
||||||
|
|
||||||
// Add result to execution logs
|
// Add result to execution logs
|
||||||
await addExecutionLog(execution_id, {
|
await addExecutionLog(execution_id, {
|
||||||
step: 'command_execution',
|
step: 'command_execution',
|
||||||
action: 'command_result',
|
action: 'command_result',
|
||||||
command_id: command_id, // Include command_id for workflow tracking
|
|
||||||
worker_id: worker_id,
|
worker_id: worker_id,
|
||||||
success: success,
|
success: success,
|
||||||
stdout: stdout,
|
stdout: stdout,
|
||||||
@@ -256,14 +255,9 @@ wss.on('connection', (ws) => {
|
|||||||
timestamp: timestamp || new Date().toISOString()
|
timestamp: timestamp || new Date().toISOString()
|
||||||
});
|
});
|
||||||
|
|
||||||
// For non-workflow executions, update status immediately
|
// Update execution status to completed or failed
|
||||||
// For workflow executions, the workflow engine will update status
|
const finalStatus = success ? 'completed' : 'failed';
|
||||||
const [execution] = await pool.query('SELECT workflow_id FROM executions WHERE id = ?', [execution_id]);
|
await updateExecutionStatus(execution_id, finalStatus);
|
||||||
if (execution.length > 0 && !execution[0].workflow_id) {
|
|
||||||
// Only update status for quick commands (no workflow_id)
|
|
||||||
const finalStatus = success ? 'completed' : 'failed';
|
|
||||||
await updateExecutionStatus(execution_id, finalStatus);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Broadcast result to all connected clients
|
// Broadcast result to all connected clients
|
||||||
broadcast({
|
broadcast({
|
||||||
@@ -275,7 +269,7 @@ wss.on('connection', (ws) => {
|
|||||||
stderr: stderr
|
stderr: stderr
|
||||||
});
|
});
|
||||||
|
|
||||||
console.log(`Command result received for execution ${execution_id}: ${success ? 'success' : 'failed'}`);
|
console.log(`Command result received for execution ${execution_id}: ${finalStatus}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (message.type === 'workflow_result') {
|
if (message.type === 'workflow_result') {
|
||||||
@@ -386,45 +380,6 @@ function broadcast(data) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Helper function to add log entry to execution
|
|
||||||
async function addExecutionLog(executionId, logEntry) {
|
|
||||||
try {
|
|
||||||
const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
|
|
||||||
|
|
||||||
if (execution.length > 0) {
|
|
||||||
const logs = typeof execution[0].logs === 'string' ? JSON.parse(execution[0].logs) : execution[0].logs;
|
|
||||||
logs.push(logEntry);
|
|
||||||
|
|
||||||
await pool.query(
|
|
||||||
'UPDATE executions SET logs = ? WHERE id = ?',
|
|
||||||
[JSON.stringify(logs), executionId]
|
|
||||||
);
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
console.error(`[Workflow] Error adding execution log:`, error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper function to update execution status
|
|
||||||
async function updateExecutionStatus(executionId, status) {
|
|
||||||
try {
|
|
||||||
await pool.query(
|
|
||||||
'UPDATE executions SET status = ?, completed_at = NOW() WHERE id = ?',
|
|
||||||
[status, executionId]
|
|
||||||
);
|
|
||||||
|
|
||||||
broadcast({
|
|
||||||
type: 'execution_status',
|
|
||||||
execution_id: executionId,
|
|
||||||
status: status
|
|
||||||
});
|
|
||||||
|
|
||||||
console.log(`[Workflow] Execution ${executionId} status updated to: ${status}`);
|
|
||||||
} catch (error) {
|
|
||||||
console.error(`[Workflow] Error updating execution status:`, error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Authelia SSO Middleware
|
// Authelia SSO Middleware
|
||||||
async function authenticateSSO(req, res, next) {
|
async function authenticateSSO(req, res, next) {
|
||||||
// Check for Authelia headers
|
// Check for Authelia headers
|
||||||
@@ -481,237 +436,6 @@ async function authenticateSSO(req, res, next) {
|
|||||||
next();
|
next();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Workflow Execution Engine
|
|
||||||
async function executeWorkflowSteps(executionId, workflowId, definition, username) {
|
|
||||||
try {
|
|
||||||
console.log(`[Workflow] Starting execution ${executionId} for workflow ${workflowId}`);
|
|
||||||
|
|
||||||
const steps = definition.steps || [];
|
|
||||||
let allStepsSucceeded = true;
|
|
||||||
|
|
||||||
for (let i = 0; i < steps.length; i++) {
|
|
||||||
const step = steps[i];
|
|
||||||
console.log(`[Workflow] Execution ${executionId} - Step ${i + 1}/${steps.length}: ${step.name}`);
|
|
||||||
|
|
||||||
// Add step start log
|
|
||||||
await addExecutionLog(executionId, {
|
|
||||||
step: i + 1,
|
|
||||||
step_name: step.name,
|
|
||||||
action: 'step_started',
|
|
||||||
timestamp: new Date().toISOString()
|
|
||||||
});
|
|
||||||
|
|
||||||
broadcast({
|
|
||||||
type: 'workflow_step_started',
|
|
||||||
execution_id: executionId,
|
|
||||||
step: i + 1,
|
|
||||||
step_name: step.name
|
|
||||||
});
|
|
||||||
|
|
||||||
if (step.type === 'execute') {
|
|
||||||
// Execute command step
|
|
||||||
const success = await executeCommandStep(executionId, step, i + 1);
|
|
||||||
if (!success) {
|
|
||||||
allStepsSucceeded = false;
|
|
||||||
break; // Stop workflow on failure
|
|
||||||
}
|
|
||||||
} else if (step.type === 'wait') {
|
|
||||||
// Wait step (delay in seconds)
|
|
||||||
const seconds = step.duration || 5;
|
|
||||||
await addExecutionLog(executionId, {
|
|
||||||
step: i + 1,
|
|
||||||
action: 'waiting',
|
|
||||||
duration: seconds,
|
|
||||||
timestamp: new Date().toISOString()
|
|
||||||
});
|
|
||||||
await new Promise(resolve => setTimeout(resolve, seconds * 1000));
|
|
||||||
} else if (step.type === 'prompt') {
|
|
||||||
// Interactive prompt step (not fully implemented, would need user interaction)
|
|
||||||
await addExecutionLog(executionId, {
|
|
||||||
step: i + 1,
|
|
||||||
action: 'prompt_skipped',
|
|
||||||
message: 'Interactive prompts not yet supported',
|
|
||||||
timestamp: new Date().toISOString()
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add step completion log
|
|
||||||
await addExecutionLog(executionId, {
|
|
||||||
step: i + 1,
|
|
||||||
step_name: step.name,
|
|
||||||
action: 'step_completed',
|
|
||||||
timestamp: new Date().toISOString()
|
|
||||||
});
|
|
||||||
|
|
||||||
broadcast({
|
|
||||||
type: 'workflow_step_completed',
|
|
||||||
execution_id: executionId,
|
|
||||||
step: i + 1,
|
|
||||||
step_name: step.name
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mark execution as completed or failed
|
|
||||||
const finalStatus = allStepsSucceeded ? 'completed' : 'failed';
|
|
||||||
await updateExecutionStatus(executionId, finalStatus);
|
|
||||||
|
|
||||||
console.log(`[Workflow] Execution ${executionId} finished with status: ${finalStatus}`);
|
|
||||||
|
|
||||||
} catch (error) {
|
|
||||||
console.error(`[Workflow] Execution ${executionId} error:`, error);
|
|
||||||
await addExecutionLog(executionId, {
|
|
||||||
action: 'workflow_error',
|
|
||||||
error: error.message,
|
|
||||||
timestamp: new Date().toISOString()
|
|
||||||
});
|
|
||||||
await updateExecutionStatus(executionId, 'failed');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function executeCommandStep(executionId, step, stepNumber) {
|
|
||||||
try {
|
|
||||||
const command = step.command;
|
|
||||||
const targets = step.targets || ['all'];
|
|
||||||
|
|
||||||
// Determine which workers to target
|
|
||||||
let targetWorkerIds = [];
|
|
||||||
|
|
||||||
if (targets.includes('all')) {
|
|
||||||
// Get all online workers
|
|
||||||
const [onlineWorkers] = await pool.query('SELECT id FROM workers WHERE status = ?', ['online']);
|
|
||||||
targetWorkerIds = onlineWorkers.map(w => w.id);
|
|
||||||
} else {
|
|
||||||
// Specific worker IDs or names
|
|
||||||
for (const target of targets) {
|
|
||||||
// Try to find by ID first, then by name
|
|
||||||
const [workerById] = await pool.query('SELECT id FROM workers WHERE id = ?', [target]);
|
|
||||||
if (workerById.length > 0) {
|
|
||||||
targetWorkerIds.push(workerById[0].id);
|
|
||||||
} else {
|
|
||||||
const [workerByName] = await pool.query('SELECT id FROM workers WHERE name = ?', [target]);
|
|
||||||
if (workerByName.length > 0) {
|
|
||||||
targetWorkerIds.push(workerByName[0].id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (targetWorkerIds.length === 0) {
|
|
||||||
await addExecutionLog(executionId, {
|
|
||||||
step: stepNumber,
|
|
||||||
action: 'no_workers',
|
|
||||||
message: 'No workers available for this step',
|
|
||||||
timestamp: new Date().toISOString()
|
|
||||||
});
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Execute command on each target worker and wait for results
|
|
||||||
const results = [];
|
|
||||||
|
|
||||||
for (const workerId of targetWorkerIds) {
|
|
||||||
const workerWs = workers.get(workerId);
|
|
||||||
|
|
||||||
if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
|
|
||||||
await addExecutionLog(executionId, {
|
|
||||||
step: stepNumber,
|
|
||||||
action: 'worker_offline',
|
|
||||||
worker_id: workerId,
|
|
||||||
timestamp: new Date().toISOString()
|
|
||||||
});
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send command to worker
|
|
||||||
const commandId = generateUUID();
|
|
||||||
|
|
||||||
await addExecutionLog(executionId, {
|
|
||||||
step: stepNumber,
|
|
||||||
action: 'command_sent',
|
|
||||||
worker_id: workerId,
|
|
||||||
command: command,
|
|
||||||
command_id: commandId,
|
|
||||||
timestamp: new Date().toISOString()
|
|
||||||
});
|
|
||||||
|
|
||||||
workerWs.send(JSON.stringify({
|
|
||||||
type: 'execute_command',
|
|
||||||
execution_id: executionId,
|
|
||||||
command_id: commandId,
|
|
||||||
command: command,
|
|
||||||
worker_id: workerId,
|
|
||||||
timeout: 120000 // 2 minute timeout
|
|
||||||
}));
|
|
||||||
|
|
||||||
// Wait for command result (with timeout)
|
|
||||||
const result = await waitForCommandResult(executionId, commandId, 120000);
|
|
||||||
results.push(result);
|
|
||||||
|
|
||||||
if (!result.success) {
|
|
||||||
// Command failed, workflow should stop
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// All commands succeeded
|
|
||||||
return results.every(r => r.success);
|
|
||||||
|
|
||||||
} catch (error) {
|
|
||||||
console.error(`[Workflow] Error executing command step:`, error);
|
|
||||||
await addExecutionLog(executionId, {
|
|
||||||
step: stepNumber,
|
|
||||||
action: 'step_error',
|
|
||||||
error: error.message,
|
|
||||||
timestamp: new Date().toISOString()
|
|
||||||
});
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function waitForCommandResult(executionId, commandId, timeout) {
|
|
||||||
return new Promise((resolve) => {
|
|
||||||
const startTime = Date.now();
|
|
||||||
|
|
||||||
const checkInterval = setInterval(async () => {
|
|
||||||
try {
|
|
||||||
// Check if we've received the command result in logs
|
|
||||||
const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
|
|
||||||
|
|
||||||
if (execution.length > 0) {
|
|
||||||
const logs = typeof execution[0].logs === 'string' ? JSON.parse(execution[0].logs) : execution[0].logs;
|
|
||||||
const resultLog = logs.find(log => log.command_id === commandId && log.action === 'command_result');
|
|
||||||
|
|
||||||
if (resultLog) {
|
|
||||||
clearInterval(checkInterval);
|
|
||||||
resolve({
|
|
||||||
success: resultLog.success,
|
|
||||||
stdout: resultLog.stdout,
|
|
||||||
stderr: resultLog.stderr
|
|
||||||
});
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check timeout
|
|
||||||
if (Date.now() - startTime > timeout) {
|
|
||||||
clearInterval(checkInterval);
|
|
||||||
resolve({
|
|
||||||
success: false,
|
|
||||||
error: 'Command timeout'
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
console.error('[Workflow] Error checking command result:', error);
|
|
||||||
clearInterval(checkInterval);
|
|
||||||
resolve({
|
|
||||||
success: false,
|
|
||||||
error: error.message
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}, 500); // Check every 500ms
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Routes - All protected by SSO
|
// Routes - All protected by SSO
|
||||||
app.get('/api/user', authenticateSSO, (req, res) => {
|
app.get('/api/user', authenticateSSO, (req, res) => {
|
||||||
res.json(req.user);
|
res.json(req.user);
|
||||||
@@ -730,24 +454,15 @@ app.post('/api/workflows', authenticateSSO, async (req, res) => {
|
|||||||
try {
|
try {
|
||||||
const { name, description, definition } = req.body;
|
const { name, description, definition } = req.body;
|
||||||
const id = generateUUID();
|
const id = generateUUID();
|
||||||
|
|
||||||
console.log('[Workflow] Creating workflow:', name);
|
|
||||||
console.log('[Workflow] Definition:', JSON.stringify(definition, null, 2));
|
|
||||||
|
|
||||||
await pool.query(
|
await pool.query(
|
||||||
'INSERT INTO workflows (id, name, description, definition, created_by) VALUES (?, ?, ?, ?, ?)',
|
'INSERT INTO workflows (id, name, description, definition, created_by) VALUES (?, ?, ?, ?, ?)',
|
||||||
[id, name, description, JSON.stringify(definition), req.user.username]
|
[id, name, description, JSON.stringify(definition), req.user.username]
|
||||||
);
|
);
|
||||||
|
|
||||||
console.log('[Workflow] Successfully inserted workflow:', id);
|
|
||||||
|
|
||||||
res.json({ id, name, description, definition });
|
res.json({ id, name, description, definition });
|
||||||
|
|
||||||
console.log('[Workflow] Broadcasting workflow_created');
|
|
||||||
broadcast({ type: 'workflow_created', workflow_id: id });
|
broadcast({ type: 'workflow_created', workflow_id: id });
|
||||||
console.log('[Workflow] Broadcast complete');
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('[Workflow] Error creating workflow:', error);
|
|
||||||
res.status(500).json({ error: error.message });
|
res.status(500).json({ error: error.message });
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -807,29 +522,13 @@ app.post('/api/executions', authenticateSSO, async (req, res) => {
|
|||||||
try {
|
try {
|
||||||
const { workflow_id } = req.body;
|
const { workflow_id } = req.body;
|
||||||
const id = generateUUID();
|
const id = generateUUID();
|
||||||
|
|
||||||
// Get workflow definition
|
|
||||||
const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflow_id]);
|
|
||||||
if (workflows.length === 0) {
|
|
||||||
return res.status(404).json({ error: 'Workflow not found' });
|
|
||||||
}
|
|
||||||
|
|
||||||
const workflow = workflows[0];
|
|
||||||
const definition = typeof workflow.definition === 'string' ? JSON.parse(workflow.definition) : workflow.definition;
|
|
||||||
|
|
||||||
// Create execution record
|
|
||||||
await pool.query(
|
await pool.query(
|
||||||
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
|
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
|
||||||
[id, workflow_id, 'running', req.user.username, JSON.stringify([])]
|
[id, workflow_id, 'running', req.user.username, JSON.stringify([])]
|
||||||
);
|
);
|
||||||
|
|
||||||
broadcast({ type: 'execution_started', execution_id: id, workflow_id });
|
broadcast({ type: 'execution_started', execution_id: id, workflow_id });
|
||||||
|
|
||||||
// Start workflow execution asynchronously
|
|
||||||
executeWorkflowSteps(id, workflow_id, definition, req.user.username).catch(err => {
|
|
||||||
console.error(`[Workflow] Execution ${id} failed:`, err);
|
|
||||||
});
|
|
||||||
|
|
||||||
res.json({ id, workflow_id, status: 'running' });
|
res.json({ id, workflow_id, status: 'running' });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
res.status(500).json({ error: error.message });
|
res.status(500).json({ error: error.message });
|
||||||
@@ -872,40 +571,6 @@ app.delete('/api/executions/:id', authenticateSSO, async (req, res) => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
app.post('/api/executions/:id/abort', authenticateSSO, async (req, res) => {
|
|
||||||
try {
|
|
||||||
const executionId = req.params.id;
|
|
||||||
|
|
||||||
// Check if execution exists and is running
|
|
||||||
const [execution] = await pool.query('SELECT status FROM executions WHERE id = ?', [executionId]);
|
|
||||||
|
|
||||||
if (execution.length === 0) {
|
|
||||||
return res.status(404).json({ error: 'Execution not found' });
|
|
||||||
}
|
|
||||||
|
|
||||||
if (execution[0].status !== 'running') {
|
|
||||||
return res.status(400).json({ error: 'Execution is not running' });
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add abort log entry
|
|
||||||
await addExecutionLog(executionId, {
|
|
||||||
action: 'execution_aborted',
|
|
||||||
aborted_by: req.user.username,
|
|
||||||
timestamp: new Date().toISOString()
|
|
||||||
});
|
|
||||||
|
|
||||||
// Update execution status to failed
|
|
||||||
await updateExecutionStatus(executionId, 'failed');
|
|
||||||
|
|
||||||
console.log(`[Execution] Execution ${executionId} aborted by ${req.user.username}`);
|
|
||||||
|
|
||||||
res.json({ success: true });
|
|
||||||
} catch (error) {
|
|
||||||
console.error('[Execution] Error aborting execution:', error);
|
|
||||||
res.status(500).json({ error: error.message });
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Scheduled Commands API
|
// Scheduled Commands API
|
||||||
app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => {
|
app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => {
|
||||||
try {
|
try {
|
||||||
@@ -1009,6 +674,299 @@ initDatabase().then(() => {
|
|||||||
|
|
||||||
// ============================================
|
// ============================================
|
||||||
// WORKFLOW EXECUTION ENGINE
|
// WORKFLOW EXECUTION ENGINE
|
||||||
|
// ============================================
|
||||||
|
|
||||||
|
// Store active workflow executions in memory
|
||||||
|
const activeExecutions = new Map();
|
||||||
|
|
||||||
|
// Execute workflow step by step
|
||||||
|
async function executeWorkflow(workflowId, executionId, userId, targetWorkers = 'all') {
|
||||||
|
try {
|
||||||
|
// Get workflow definition
|
||||||
|
const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflowId]);
|
||||||
|
if (workflows.length === 0) {
|
||||||
|
throw new Error('Workflow not found');
|
||||||
|
}
|
||||||
|
|
||||||
|
const workflow = workflows[0];
|
||||||
|
const definition = JSON.parse(workflow.definition);
|
||||||
|
|
||||||
|
// Initialize execution state
|
||||||
|
const executionState = {
|
||||||
|
id: executionId,
|
||||||
|
workflowId: workflowId,
|
||||||
|
currentStep: 0,
|
||||||
|
steps: definition.steps || [],
|
||||||
|
results: [],
|
||||||
|
status: 'running',
|
||||||
|
waitingForInput: false,
|
||||||
|
targetWorkers: targetWorkers,
|
||||||
|
userId: userId
|
||||||
|
};
|
||||||
|
|
||||||
|
activeExecutions.set(executionId, executionState);
|
||||||
|
|
||||||
|
// Start executing steps
|
||||||
|
await executeNextStep(executionId);
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Workflow execution error:', error);
|
||||||
|
await updateExecutionStatus(executionId, 'failed', error.message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute the next step in a workflow
|
||||||
|
async function executeNextStep(executionId) {
|
||||||
|
const state = activeExecutions.get(executionId);
|
||||||
|
if (!state) return;
|
||||||
|
|
||||||
|
// Check if we've completed all steps
|
||||||
|
if (state.currentStep >= state.steps.length) {
|
||||||
|
await updateExecutionStatus(executionId, 'completed');
|
||||||
|
activeExecutions.delete(executionId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const step = state.steps[state.currentStep];
|
||||||
|
|
||||||
|
// Check if step has a condition
|
||||||
|
if (step.condition && !evaluateCondition(step.condition, state)) {
|
||||||
|
console.log(`[Workflow] Skipping step ${state.currentStep}: condition not met`);
|
||||||
|
state.currentStep++;
|
||||||
|
return executeNextStep(executionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`[Workflow] Executing step ${state.currentStep}: ${step.name}`);
|
||||||
|
|
||||||
|
try {
|
||||||
|
switch (step.type) {
|
||||||
|
case 'execute':
|
||||||
|
await executeCommandStep(executionId, step);
|
||||||
|
break;
|
||||||
|
case 'prompt':
|
||||||
|
await executePromptStep(executionId, step);
|
||||||
|
break;
|
||||||
|
case 'wait':
|
||||||
|
await executeWaitStep(executionId, step);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new Error(`Unknown step type: ${step.type}`);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
await addExecutionLog(executionId, {
|
||||||
|
step: state.currentStep,
|
||||||
|
error: error.message,
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
});
|
||||||
|
await updateExecutionStatus(executionId, 'failed', error.message);
|
||||||
|
activeExecutions.delete(executionId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute a command on workers
|
||||||
|
async function executeCommandStep(executionId, step) {
|
||||||
|
const state = activeExecutions.get(executionId);
|
||||||
|
|
||||||
|
// Get target workers
|
||||||
|
const [workers] = await pool.query(
|
||||||
|
'SELECT * FROM workers WHERE status = "online"'
|
||||||
|
);
|
||||||
|
|
||||||
|
if (workers.length === 0) {
|
||||||
|
throw new Error('No online workers available');
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter workers based on target
|
||||||
|
let targetWorkers = workers;
|
||||||
|
if (step.targets && step.targets[0] !== 'all') {
|
||||||
|
targetWorkers = workers.filter(w => step.targets.includes(w.name));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send command to workers via WebSocket
|
||||||
|
const commandMessage = {
|
||||||
|
type: 'execute_command',
|
||||||
|
execution_id: executionId,
|
||||||
|
step_index: state.currentStep,
|
||||||
|
command: step.command,
|
||||||
|
timeout: step.timeout || 300000
|
||||||
|
};
|
||||||
|
|
||||||
|
// Broadcast to target workers
|
||||||
|
clients.forEach(client => {
|
||||||
|
if (client.readyState === WebSocket.OPEN) {
|
||||||
|
client.send(JSON.stringify(commandMessage));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
await addExecutionLog(executionId, {
|
||||||
|
step: state.currentStep,
|
||||||
|
action: 'command_sent',
|
||||||
|
command: step.command,
|
||||||
|
workers: targetWorkers.map(w => w.name),
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
});
|
||||||
|
|
||||||
|
// For now, move to next step immediately
|
||||||
|
// In production, we'd wait for worker responses
|
||||||
|
state.currentStep++;
|
||||||
|
|
||||||
|
// Small delay to allow command to execute
|
||||||
|
setTimeout(() => executeNextStep(executionId), 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute a user prompt step
|
||||||
|
async function executePromptStep(executionId, step) {
|
||||||
|
const state = activeExecutions.get(executionId);
|
||||||
|
|
||||||
|
state.waitingForInput = true;
|
||||||
|
state.promptData = {
|
||||||
|
message: step.message,
|
||||||
|
options: step.options,
|
||||||
|
step: state.currentStep
|
||||||
|
};
|
||||||
|
|
||||||
|
await addExecutionLog(executionId, {
|
||||||
|
step: state.currentStep,
|
||||||
|
action: 'waiting_for_input',
|
||||||
|
prompt: step.message,
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
});
|
||||||
|
|
||||||
|
// Notify frontend that input is needed
|
||||||
|
broadcast({
|
||||||
|
type: 'execution_prompt',
|
||||||
|
execution_id: executionId,
|
||||||
|
prompt: state.promptData
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute a wait/delay step
|
||||||
|
async function executeWaitStep(executionId, step) {
|
||||||
|
const state = activeExecutions.get(executionId);
|
||||||
|
const delay = step.duration || 1000;
|
||||||
|
|
||||||
|
await addExecutionLog(executionId, {
|
||||||
|
step: state.currentStep,
|
||||||
|
action: 'waiting',
|
||||||
|
duration: delay,
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
});
|
||||||
|
|
||||||
|
state.currentStep++;
|
||||||
|
setTimeout(() => executeNextStep(executionId), delay);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle user input for prompts
|
||||||
|
function handleUserInput(executionId, response) {
|
||||||
|
const state = activeExecutions.get(executionId);
|
||||||
|
if (!state || !state.waitingForInput) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
state.promptResponse = response;
|
||||||
|
state.waitingForInput = false;
|
||||||
|
state.currentStep++;
|
||||||
|
|
||||||
|
addExecutionLog(executionId, {
|
||||||
|
step: state.currentStep - 1,
|
||||||
|
action: 'user_response',
|
||||||
|
response: response,
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
});
|
||||||
|
|
||||||
|
executeNextStep(executionId);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Evaluate conditions
|
||||||
|
function evaluateCondition(condition, state) {
|
||||||
|
try {
|
||||||
|
// Simple condition evaluation
|
||||||
|
// In production, use a proper expression evaluator
|
||||||
|
const promptResponse = state.promptResponse;
|
||||||
|
return eval(condition);
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Condition evaluation error:', error);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper functions
|
||||||
|
async function updateExecutionStatus(executionId, status, error = null) {
|
||||||
|
const updates = { status };
|
||||||
|
if (status === 'completed' || status === 'failed') {
|
||||||
|
updates.completed_at = new Date();
|
||||||
|
}
|
||||||
|
if (error) {
|
||||||
|
// Add error to logs
|
||||||
|
await addExecutionLog(executionId, { error, timestamp: new Date().toISOString() });
|
||||||
|
}
|
||||||
|
|
||||||
|
await pool.query(
|
||||||
|
'UPDATE executions SET status = ?, completed_at = ? WHERE id = ?',
|
||||||
|
[status, updates.completed_at || null, executionId]
|
||||||
|
);
|
||||||
|
|
||||||
|
broadcast({
|
||||||
|
type: 'execution_status',
|
||||||
|
execution_id: executionId,
|
||||||
|
status: status
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async function addExecutionLog(executionId, logEntry) {
|
||||||
|
const [rows] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
|
||||||
|
if (rows.length === 0) return;
|
||||||
|
|
||||||
|
const logs = JSON.parse(rows[0].logs || '[]');
|
||||||
|
logs.push(logEntry);
|
||||||
|
|
||||||
|
await pool.query('UPDATE executions SET logs = ? WHERE id = ?', [
|
||||||
|
JSON.stringify(logs),
|
||||||
|
executionId
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================
|
||||||
|
// API ROUTES - Add these to your server.js
|
||||||
|
// ============================================
|
||||||
|
|
||||||
|
// Start workflow execution
|
||||||
|
app.post('/api/executions', authenticateSSO, async (req, res) => {
|
||||||
|
try {
|
||||||
|
const { workflow_id, target_workers } = req.body;
|
||||||
|
const id = generateUUID();
|
||||||
|
|
||||||
|
await pool.query(
|
||||||
|
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
|
||||||
|
[id, workflow_id, 'running', req.user.username, JSON.stringify([])]
|
||||||
|
);
|
||||||
|
|
||||||
|
// Start execution
|
||||||
|
executeWorkflow(workflow_id, id, req.user.username, target_workers || 'all');
|
||||||
|
|
||||||
|
broadcast({ type: 'execution_started', execution_id: id, workflow_id });
|
||||||
|
res.json({ id, workflow_id, status: 'running' });
|
||||||
|
} catch (error) {
|
||||||
|
res.status(500).json({ error: error.message });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Respond to workflow prompt
|
||||||
|
app.post('/api/executions/:id/respond', authenticateSSO, async (req, res) => {
|
||||||
|
try {
|
||||||
|
const { response } = req.body;
|
||||||
|
const success = handleUserInput(req.params.id, response);
|
||||||
|
|
||||||
|
if (success) {
|
||||||
|
res.json({ success: true });
|
||||||
|
} else {
|
||||||
|
res.status(400).json({ error: 'Execution not waiting for input' });
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
res.status(500).json({ error: error.message });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// Get execution details with logs
|
// Get execution details with logs
|
||||||
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
|
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
|
||||||
@@ -1017,12 +975,15 @@ app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
|
|||||||
if (rows.length === 0) {
|
if (rows.length === 0) {
|
||||||
return res.status(404).json({ error: 'Not found' });
|
return res.status(404).json({ error: 'Not found' });
|
||||||
}
|
}
|
||||||
|
|
||||||
const execution = rows[0];
|
const execution = rows[0];
|
||||||
|
const state = activeExecutions.get(req.params.id);
|
||||||
|
|
||||||
res.json({
|
res.json({
|
||||||
...execution,
|
...execution,
|
||||||
logs: JSON.parse(execution.logs || '[]')
|
logs: JSON.parse(execution.logs || '[]'),
|
||||||
|
waiting_for_input: state?.waitingForInput || false,
|
||||||
|
prompt: state?.promptData || null
|
||||||
});
|
});
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
res.status(500).json({ error: error.message });
|
res.status(500).json({ error: error.message });
|
||||||
|
|||||||
Reference in New Issue
Block a user