Removed old/obsolete workflow execution system that was conflicting
with the new executeWorkflowSteps() engine:
Removed:
- activeExecutions Map (old tracking system)
- executeWorkflow() - old workflow executor
- executeNextStep() - old step processor
- executeCommandStep() - old command executor (duplicate)
- handleUserInput() - unimplemented prompt handler
- Duplicate app.post('/api/executions') endpoint
- app.post('/api/executions/:id/respond') endpoint
This was causing "Cannot read properties of undefined (reading 'target')"
error because the old code was being called instead of the new engine.
The new executeWorkflowSteps() engine is now the only workflow system.
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1078 lines
33 KiB
JavaScript
1078 lines
33 KiB
JavaScript
const express = require('express');
|
|
const http = require('http');
|
|
const WebSocket = require('ws');
|
|
const mysql = require('mysql2/promise');
|
|
const crypto = require('crypto');
|
|
require('dotenv').config();
|
|
|
|
const app = express();
|
|
const server = http.createServer(app);
|
|
const wss = new WebSocket.Server({ server });
|
|
|
|
// Middleware
|
|
app.use(express.json());
|
|
app.use(express.static('public'));
|
|
|
|
// UUID generator
|
|
function generateUUID() {
|
|
return crypto.randomUUID();
|
|
}
|
|
|
|
// Database pool
|
|
const pool = mysql.createPool({
|
|
host: process.env.DB_HOST,
|
|
port: process.env.DB_PORT || 3306,
|
|
user: process.env.DB_USER,
|
|
password: process.env.DB_PASSWORD,
|
|
database: process.env.DB_NAME,
|
|
waitForConnections: true,
|
|
connectionLimit: 10,
|
|
queueLimit: 0
|
|
});
|
|
|
|
// Initialize database tables
|
|
async function initDatabase() {
|
|
const connection = await pool.getConnection();
|
|
try {
|
|
await connection.query(`
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
id VARCHAR(36) PRIMARY KEY,
|
|
username VARCHAR(255) UNIQUE NOT NULL,
|
|
display_name VARCHAR(255),
|
|
email VARCHAR(255),
|
|
groups TEXT,
|
|
last_login TIMESTAMP,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
`);
|
|
|
|
// Database schema is managed manually - migrations removed after direct database fixes
|
|
|
|
await connection.query(`
|
|
CREATE TABLE IF NOT EXISTS workers (
|
|
id VARCHAR(36) PRIMARY KEY,
|
|
name VARCHAR(255) UNIQUE NOT NULL,
|
|
status VARCHAR(50) NOT NULL,
|
|
last_heartbeat TIMESTAMP NULL,
|
|
api_key VARCHAR(255),
|
|
metadata JSON,
|
|
INDEX idx_status (status),
|
|
INDEX idx_heartbeat (last_heartbeat)
|
|
)
|
|
`);
|
|
|
|
await connection.query(`
|
|
CREATE TABLE IF NOT EXISTS workflows (
|
|
id VARCHAR(36) PRIMARY KEY,
|
|
name VARCHAR(255) NOT NULL,
|
|
description TEXT,
|
|
definition JSON NOT NULL,
|
|
created_by VARCHAR(255),
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
INDEX idx_name (name)
|
|
)
|
|
`);
|
|
|
|
await connection.query(`
|
|
CREATE TABLE IF NOT EXISTS executions (
|
|
id VARCHAR(36) PRIMARY KEY,
|
|
workflow_id VARCHAR(36) NULL,
|
|
status VARCHAR(50) NOT NULL,
|
|
started_by VARCHAR(255),
|
|
started_at TIMESTAMP NULL,
|
|
completed_at TIMESTAMP NULL,
|
|
logs JSON,
|
|
INDEX idx_workflow (workflow_id),
|
|
INDEX idx_status (status),
|
|
INDEX idx_started (started_at)
|
|
)
|
|
`);
|
|
|
|
await connection.query(`
|
|
CREATE TABLE IF NOT EXISTS scheduled_commands (
|
|
id VARCHAR(36) PRIMARY KEY,
|
|
name VARCHAR(255) NOT NULL,
|
|
command TEXT NOT NULL,
|
|
worker_ids JSON NOT NULL,
|
|
schedule_type VARCHAR(50) NOT NULL,
|
|
schedule_value VARCHAR(255) NOT NULL,
|
|
enabled BOOLEAN DEFAULT TRUE,
|
|
created_by VARCHAR(255),
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
last_run TIMESTAMP NULL,
|
|
next_run TIMESTAMP NULL,
|
|
INDEX idx_enabled (enabled),
|
|
INDEX idx_next_run (next_run)
|
|
)
|
|
`);
|
|
|
|
console.log('Database tables initialized successfully');
|
|
} catch (error) {
|
|
console.error('Database initialization error:', error);
|
|
throw error;
|
|
} finally {
|
|
connection.release();
|
|
}
|
|
}
|
|
|
|
// Auto-cleanup old executions (runs daily)
|
|
async function cleanupOldExecutions() {
|
|
try {
|
|
const retentionDays = parseInt(process.env.EXECUTION_RETENTION_DAYS) || 30;
|
|
const [result] = await pool.query(
|
|
`DELETE FROM executions
|
|
WHERE status IN ('completed', 'failed')
|
|
AND started_at < DATE_SUB(NOW(), INTERVAL ? DAY)`,
|
|
[retentionDays]
|
|
);
|
|
console.log(`[Cleanup] Removed ${result.affectedRows} executions older than ${retentionDays} days`);
|
|
} catch (error) {
|
|
console.error('[Cleanup] Error removing old executions:', error);
|
|
}
|
|
}
|
|
|
|
// Run cleanup daily at 3 AM
|
|
setInterval(cleanupOldExecutions, 24 * 60 * 60 * 1000);
|
|
// Run cleanup on startup
|
|
cleanupOldExecutions();
|
|
|
|
// Scheduled Commands Processor
|
|
async function processScheduledCommands() {
|
|
try {
|
|
const [schedules] = await pool.query(
|
|
`SELECT * FROM scheduled_commands
|
|
WHERE enabled = TRUE
|
|
AND (next_run IS NULL OR next_run <= NOW())`
|
|
);
|
|
|
|
for (const schedule of schedules) {
|
|
console.log(`[Scheduler] Running scheduled command: ${schedule.name}`);
|
|
|
|
const workerIds = JSON.parse(schedule.worker_ids);
|
|
|
|
// Execute command on each worker
|
|
for (const workerId of workerIds) {
|
|
const workerWs = workers.get(workerId);
|
|
if (workerWs && workerWs.readyState === WebSocket.OPEN) {
|
|
const executionId = generateUUID();
|
|
|
|
// Create execution record
|
|
await pool.query(
|
|
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
|
|
[executionId, null, 'running', `scheduler:${schedule.name}`, JSON.stringify([{
|
|
step: 'scheduled_command',
|
|
action: 'command_sent',
|
|
worker_id: workerId,
|
|
command: schedule.command,
|
|
timestamp: new Date().toISOString()
|
|
}])]
|
|
);
|
|
|
|
// Send command to worker
|
|
workerWs.send(JSON.stringify({
|
|
type: 'execute_command',
|
|
execution_id: executionId,
|
|
command: schedule.command,
|
|
worker_id: workerId,
|
|
timeout: 300000 // 5 minute timeout for scheduled commands
|
|
}));
|
|
|
|
broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
|
|
}
|
|
}
|
|
|
|
// Update last_run and calculate next_run
|
|
const nextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value);
|
|
await pool.query(
|
|
'UPDATE scheduled_commands SET last_run = NOW(), next_run = ? WHERE id = ?',
|
|
[nextRun, schedule.id]
|
|
);
|
|
}
|
|
} catch (error) {
|
|
console.error('[Scheduler] Error processing scheduled commands:', error);
|
|
}
|
|
}
|
|
|
|
function calculateNextRun(scheduleType, scheduleValue) {
|
|
const now = new Date();
|
|
|
|
if (scheduleType === 'interval') {
|
|
// Interval in minutes
|
|
const minutes = parseInt(scheduleValue);
|
|
return new Date(now.getTime() + minutes * 60000);
|
|
} else if (scheduleType === 'daily') {
|
|
// Daily at HH:MM
|
|
const [hours, minutes] = scheduleValue.split(':').map(Number);
|
|
const next = new Date(now);
|
|
next.setHours(hours, minutes, 0, 0);
|
|
|
|
// If time has passed today, schedule for tomorrow
|
|
if (next <= now) {
|
|
next.setDate(next.getDate() + 1);
|
|
}
|
|
return next;
|
|
} else if (scheduleType === 'hourly') {
|
|
// Every N hours
|
|
const hours = parseInt(scheduleValue);
|
|
return new Date(now.getTime() + hours * 3600000);
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
// Run scheduler every minute
|
|
setInterval(processScheduledCommands, 60 * 1000);
|
|
// Initial run on startup
|
|
setTimeout(processScheduledCommands, 5000);
|
|
|
|
// WebSocket connections
|
|
const clients = new Set();
|
|
const workers = new Map(); // Map worker_id -> WebSocket connection
|
|
|
|
wss.on('connection', (ws) => {
|
|
clients.add(ws);
|
|
|
|
// Handle incoming messages from workers
|
|
ws.on('message', async (data) => {
|
|
try {
|
|
const message = JSON.parse(data.toString());
|
|
console.log('WebSocket message received:', message.type);
|
|
|
|
if (message.type === 'command_result') {
|
|
// Handle command result from worker
|
|
const { execution_id, command_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
|
|
|
|
// Add result to execution logs
|
|
await addExecutionLog(execution_id, {
|
|
step: 'command_execution',
|
|
action: 'command_result',
|
|
command_id: command_id, // Include command_id for workflow tracking
|
|
worker_id: worker_id,
|
|
success: success,
|
|
stdout: stdout,
|
|
stderr: stderr,
|
|
duration: duration,
|
|
timestamp: timestamp || new Date().toISOString()
|
|
});
|
|
|
|
// For non-workflow executions, update status immediately
|
|
// For workflow executions, the workflow engine will update status
|
|
const [execution] = await pool.query('SELECT workflow_id FROM executions WHERE id = ?', [execution_id]);
|
|
if (execution.length > 0 && !execution[0].workflow_id) {
|
|
// Only update status for quick commands (no workflow_id)
|
|
const finalStatus = success ? 'completed' : 'failed';
|
|
await updateExecutionStatus(execution_id, finalStatus);
|
|
}
|
|
|
|
// Broadcast result to all connected clients
|
|
broadcast({
|
|
type: 'command_result',
|
|
execution_id: execution_id,
|
|
worker_id: worker_id,
|
|
success: success,
|
|
stdout: stdout,
|
|
stderr: stderr
|
|
});
|
|
|
|
console.log(`Command result received for execution ${execution_id}: ${success ? 'success' : 'failed'}`);
|
|
}
|
|
|
|
if (message.type === 'workflow_result') {
|
|
// Handle workflow result from worker
|
|
const { execution_id, worker_id, success, message: resultMessage, timestamp } = message;
|
|
|
|
// Add final result to logs
|
|
await addExecutionLog(execution_id, {
|
|
step: 'workflow_completion',
|
|
action: 'workflow_result',
|
|
worker_id: worker_id,
|
|
success: success,
|
|
message: resultMessage,
|
|
timestamp: timestamp || new Date().toISOString()
|
|
});
|
|
|
|
// Update execution status
|
|
const finalStatus = success ? 'completed' : 'failed';
|
|
await updateExecutionStatus(execution_id, finalStatus);
|
|
|
|
// Broadcast completion to all clients
|
|
broadcast({
|
|
type: 'workflow_result',
|
|
execution_id: execution_id,
|
|
status: finalStatus,
|
|
success: success,
|
|
message: resultMessage
|
|
});
|
|
|
|
console.log(`Workflow result received for execution ${execution_id}: ${finalStatus}`);
|
|
}
|
|
|
|
if (message.type === 'worker_connect') {
|
|
// Handle worker connection
|
|
const { worker_id, worker_name } = message;
|
|
console.log(`Worker connected: ${worker_name} (${worker_id})`);
|
|
|
|
// Find the database worker ID by name
|
|
const [dbWorkers] = await pool.query(
|
|
'SELECT id FROM workers WHERE name = ?',
|
|
[worker_name]
|
|
);
|
|
|
|
if (dbWorkers.length > 0) {
|
|
const dbWorkerId = dbWorkers[0].id;
|
|
|
|
// Store worker WebSocket connection using BOTH IDs
|
|
workers.set(worker_id, ws); // Runtime ID
|
|
workers.set(dbWorkerId, ws); // Database ID
|
|
|
|
// Store mapping for cleanup
|
|
ws.workerId = worker_id;
|
|
ws.dbWorkerId = dbWorkerId;
|
|
|
|
console.log(`Mapped worker: runtime_id=${worker_id}, db_id=${dbWorkerId}, name=${worker_name}`);
|
|
|
|
// Update worker status to online
|
|
await pool.query(
|
|
`UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`,
|
|
[dbWorkerId]
|
|
);
|
|
|
|
// Broadcast worker status update with database ID
|
|
broadcast({
|
|
type: 'worker_update',
|
|
worker_id: dbWorkerId,
|
|
status: 'online'
|
|
});
|
|
} else {
|
|
console.log(`Worker ${worker_name} not found in database, will be created on heartbeat`);
|
|
}
|
|
}
|
|
|
|
if (message.type === 'pong') {
|
|
// Handle worker pong response
|
|
const { worker_id } = message;
|
|
await pool.query(
|
|
`UPDATE workers SET last_heartbeat=NOW() WHERE id=?`,
|
|
[worker_id]
|
|
);
|
|
}
|
|
|
|
} catch (error) {
|
|
console.error('WebSocket message error:', error);
|
|
}
|
|
});
|
|
|
|
ws.on('close', () => {
|
|
clients.delete(ws);
|
|
// Remove worker from workers map when disconnected (both runtime and db IDs)
|
|
if (ws.workerId) {
|
|
workers.delete(ws.workerId);
|
|
console.log(`Worker ${ws.workerId} (runtime ID) disconnected`);
|
|
}
|
|
if (ws.dbWorkerId) {
|
|
workers.delete(ws.dbWorkerId);
|
|
console.log(`Worker ${ws.dbWorkerId} (database ID) disconnected`);
|
|
}
|
|
});
|
|
});
|
|
|
|
// Broadcast to all connected clients
|
|
function broadcast(data) {
|
|
clients.forEach(client => {
|
|
if (client.readyState === WebSocket.OPEN) {
|
|
client.send(JSON.stringify(data));
|
|
}
|
|
});
|
|
}
|
|
|
|
// Authelia SSO Middleware
|
|
async function authenticateSSO(req, res, next) {
|
|
// Check for Authelia headers
|
|
const remoteUser = req.headers['remote-user'];
|
|
const remoteName = req.headers['remote-name'];
|
|
const remoteEmail = req.headers['remote-email'];
|
|
const remoteGroups = req.headers['remote-groups'];
|
|
|
|
if (!remoteUser) {
|
|
return res.status(401).json({
|
|
error: 'Not authenticated',
|
|
message: 'Please access this service through auth.lotusguild.org'
|
|
});
|
|
}
|
|
|
|
// Check if user is in allowed groups (admin or employee)
|
|
const groups = remoteGroups ? remoteGroups.split(',').map(g => g.trim()) : [];
|
|
const allowedGroups = ['admin', 'employee'];
|
|
const hasAccess = groups.some(g => allowedGroups.includes(g));
|
|
|
|
if (!hasAccess) {
|
|
return res.status(403).json({
|
|
error: 'Access denied',
|
|
message: 'You must be in admin or employee group'
|
|
});
|
|
}
|
|
|
|
// Store/update user in database
|
|
try {
|
|
const userId = generateUUID();
|
|
await pool.query(
|
|
`INSERT INTO users (id, username, display_name, email, groups, last_login)
|
|
VALUES (?, ?, ?, ?, ?, NOW())
|
|
ON DUPLICATE KEY UPDATE
|
|
display_name=VALUES(display_name),
|
|
email=VALUES(email),
|
|
groups=VALUES(groups),
|
|
last_login=NOW()`,
|
|
[userId, remoteUser, remoteName, remoteEmail, remoteGroups]
|
|
);
|
|
} catch (error) {
|
|
console.error('Error updating user:', error);
|
|
}
|
|
|
|
// Attach user info to request
|
|
req.user = {
|
|
username: remoteUser,
|
|
name: remoteName || remoteUser,
|
|
email: remoteEmail || '',
|
|
groups: groups,
|
|
isAdmin: groups.includes('admin')
|
|
};
|
|
|
|
next();
|
|
}
|
|
|
|
// Workflow Execution Engine
|
|
async function executeWorkflowSteps(executionId, workflowId, definition, username) {
|
|
try {
|
|
console.log(`[Workflow] Starting execution ${executionId} for workflow ${workflowId}`);
|
|
|
|
const steps = definition.steps || [];
|
|
let allStepsSucceeded = true;
|
|
|
|
for (let i = 0; i < steps.length; i++) {
|
|
const step = steps[i];
|
|
console.log(`[Workflow] Execution ${executionId} - Step ${i + 1}/${steps.length}: ${step.name}`);
|
|
|
|
// Add step start log
|
|
await addExecutionLog(executionId, {
|
|
step: i + 1,
|
|
step_name: step.name,
|
|
action: 'step_started',
|
|
timestamp: new Date().toISOString()
|
|
});
|
|
|
|
broadcast({
|
|
type: 'workflow_step_started',
|
|
execution_id: executionId,
|
|
step: i + 1,
|
|
step_name: step.name
|
|
});
|
|
|
|
if (step.type === 'execute') {
|
|
// Execute command step
|
|
const success = await executeCommandStep(executionId, step, i + 1);
|
|
if (!success) {
|
|
allStepsSucceeded = false;
|
|
break; // Stop workflow on failure
|
|
}
|
|
} else if (step.type === 'wait') {
|
|
// Wait step (delay in seconds)
|
|
const seconds = step.duration || 5;
|
|
await addExecutionLog(executionId, {
|
|
step: i + 1,
|
|
action: 'waiting',
|
|
duration: seconds,
|
|
timestamp: new Date().toISOString()
|
|
});
|
|
await new Promise(resolve => setTimeout(resolve, seconds * 1000));
|
|
} else if (step.type === 'prompt') {
|
|
// Interactive prompt step (not fully implemented, would need user interaction)
|
|
await addExecutionLog(executionId, {
|
|
step: i + 1,
|
|
action: 'prompt_skipped',
|
|
message: 'Interactive prompts not yet supported',
|
|
timestamp: new Date().toISOString()
|
|
});
|
|
}
|
|
|
|
// Add step completion log
|
|
await addExecutionLog(executionId, {
|
|
step: i + 1,
|
|
step_name: step.name,
|
|
action: 'step_completed',
|
|
timestamp: new Date().toISOString()
|
|
});
|
|
|
|
broadcast({
|
|
type: 'workflow_step_completed',
|
|
execution_id: executionId,
|
|
step: i + 1,
|
|
step_name: step.name
|
|
});
|
|
}
|
|
|
|
// Mark execution as completed or failed
|
|
const finalStatus = allStepsSucceeded ? 'completed' : 'failed';
|
|
await updateExecutionStatus(executionId, finalStatus);
|
|
|
|
console.log(`[Workflow] Execution ${executionId} finished with status: ${finalStatus}`);
|
|
|
|
} catch (error) {
|
|
console.error(`[Workflow] Execution ${executionId} error:`, error);
|
|
await addExecutionLog(executionId, {
|
|
action: 'workflow_error',
|
|
error: error.message,
|
|
timestamp: new Date().toISOString()
|
|
});
|
|
await updateExecutionStatus(executionId, 'failed');
|
|
}
|
|
}
|
|
|
|
async function executeCommandStep(executionId, step, stepNumber) {
|
|
try {
|
|
const command = step.command;
|
|
const targets = step.targets || ['all'];
|
|
|
|
// Determine which workers to target
|
|
let targetWorkerIds = [];
|
|
|
|
if (targets.includes('all')) {
|
|
// Get all online workers
|
|
const [onlineWorkers] = await pool.query('SELECT id FROM workers WHERE status = ?', ['online']);
|
|
targetWorkerIds = onlineWorkers.map(w => w.id);
|
|
} else {
|
|
// Specific worker IDs or names
|
|
for (const target of targets) {
|
|
// Try to find by ID first, then by name
|
|
const [workerById] = await pool.query('SELECT id FROM workers WHERE id = ?', [target]);
|
|
if (workerById.length > 0) {
|
|
targetWorkerIds.push(workerById[0].id);
|
|
} else {
|
|
const [workerByName] = await pool.query('SELECT id FROM workers WHERE name = ?', [target]);
|
|
if (workerByName.length > 0) {
|
|
targetWorkerIds.push(workerByName[0].id);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (targetWorkerIds.length === 0) {
|
|
await addExecutionLog(executionId, {
|
|
step: stepNumber,
|
|
action: 'no_workers',
|
|
message: 'No workers available for this step',
|
|
timestamp: new Date().toISOString()
|
|
});
|
|
return false;
|
|
}
|
|
|
|
// Execute command on each target worker and wait for results
|
|
const results = [];
|
|
|
|
for (const workerId of targetWorkerIds) {
|
|
const workerWs = workers.get(workerId);
|
|
|
|
if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
|
|
await addExecutionLog(executionId, {
|
|
step: stepNumber,
|
|
action: 'worker_offline',
|
|
worker_id: workerId,
|
|
timestamp: new Date().toISOString()
|
|
});
|
|
continue;
|
|
}
|
|
|
|
// Send command to worker
|
|
const commandId = generateUUID();
|
|
|
|
await addExecutionLog(executionId, {
|
|
step: stepNumber,
|
|
action: 'command_sent',
|
|
worker_id: workerId,
|
|
command: command,
|
|
command_id: commandId,
|
|
timestamp: new Date().toISOString()
|
|
});
|
|
|
|
workerWs.send(JSON.stringify({
|
|
type: 'execute_command',
|
|
execution_id: executionId,
|
|
command_id: commandId,
|
|
command: command,
|
|
worker_id: workerId,
|
|
timeout: 120000 // 2 minute timeout
|
|
}));
|
|
|
|
// Wait for command result (with timeout)
|
|
const result = await waitForCommandResult(executionId, commandId, 120000);
|
|
results.push(result);
|
|
|
|
if (!result.success) {
|
|
// Command failed, workflow should stop
|
|
return false;
|
|
}
|
|
}
|
|
|
|
// All commands succeeded
|
|
return results.every(r => r.success);
|
|
|
|
} catch (error) {
|
|
console.error(`[Workflow] Error executing command step:`, error);
|
|
await addExecutionLog(executionId, {
|
|
step: stepNumber,
|
|
action: 'step_error',
|
|
error: error.message,
|
|
timestamp: new Date().toISOString()
|
|
});
|
|
return false;
|
|
}
|
|
}
|
|
|
|
async function waitForCommandResult(executionId, commandId, timeout) {
|
|
return new Promise((resolve) => {
|
|
const startTime = Date.now();
|
|
|
|
const checkInterval = setInterval(async () => {
|
|
try {
|
|
// Check if we've received the command result in logs
|
|
const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
|
|
|
|
if (execution.length > 0) {
|
|
const logs = typeof execution[0].logs === 'string' ? JSON.parse(execution[0].logs) : execution[0].logs;
|
|
const resultLog = logs.find(log => log.command_id === commandId && log.action === 'command_result');
|
|
|
|
if (resultLog) {
|
|
clearInterval(checkInterval);
|
|
resolve({
|
|
success: resultLog.success,
|
|
stdout: resultLog.stdout,
|
|
stderr: resultLog.stderr
|
|
});
|
|
return;
|
|
}
|
|
}
|
|
|
|
// Check timeout
|
|
if (Date.now() - startTime > timeout) {
|
|
clearInterval(checkInterval);
|
|
resolve({
|
|
success: false,
|
|
error: 'Command timeout'
|
|
});
|
|
}
|
|
} catch (error) {
|
|
console.error('[Workflow] Error checking command result:', error);
|
|
clearInterval(checkInterval);
|
|
resolve({
|
|
success: false,
|
|
error: error.message
|
|
});
|
|
}
|
|
}, 500); // Check every 500ms
|
|
});
|
|
}
|
|
|
|
// Routes - All protected by SSO
|
|
app.get('/api/user', authenticateSSO, (req, res) => {
|
|
res.json(req.user);
|
|
});
|
|
|
|
app.get('/api/workflows', authenticateSSO, async (req, res) => {
|
|
try {
|
|
const [rows] = await pool.query('SELECT * FROM workflows ORDER BY created_at DESC');
|
|
res.json(rows);
|
|
} catch (error) {
|
|
res.status(500).json({ error: error.message });
|
|
}
|
|
});
|
|
|
|
app.post('/api/workflows', authenticateSSO, async (req, res) => {
|
|
try {
|
|
const { name, description, definition } = req.body;
|
|
const id = generateUUID();
|
|
|
|
await pool.query(
|
|
'INSERT INTO workflows (id, name, description, definition, created_by) VALUES (?, ?, ?, ?, ?)',
|
|
[id, name, description, JSON.stringify(definition), req.user.username]
|
|
);
|
|
|
|
res.json({ id, name, description, definition });
|
|
broadcast({ type: 'workflow_created', workflow_id: id });
|
|
} catch (error) {
|
|
res.status(500).json({ error: error.message });
|
|
}
|
|
});
|
|
|
|
app.delete('/api/workflows/:id', authenticateSSO, async (req, res) => {
|
|
try {
|
|
// Only admins can delete workflows
|
|
if (!req.user.isAdmin) {
|
|
return res.status(403).json({ error: 'Admin access required' });
|
|
}
|
|
|
|
await pool.query('DELETE FROM workflows WHERE id = ?', [req.params.id]);
|
|
res.json({ success: true });
|
|
broadcast({ type: 'workflow_deleted', workflow_id: req.params.id });
|
|
} catch (error) {
|
|
res.status(500).json({ error: error.message });
|
|
}
|
|
});
|
|
|
|
app.get('/api/workers', authenticateSSO, async (req, res) => {
|
|
try {
|
|
const [rows] = await pool.query('SELECT * FROM workers ORDER BY name');
|
|
res.json(rows);
|
|
} catch (error) {
|
|
res.status(500).json({ error: error.message });
|
|
}
|
|
});
|
|
|
|
app.post('/api/workers/heartbeat', async (req, res) => {
|
|
try {
|
|
const { worker_id, name, metadata } = req.body;
|
|
const apiKey = req.headers['x-api-key'];
|
|
|
|
// Verify API key
|
|
if (apiKey !== process.env.WORKER_API_KEY) {
|
|
return res.status(401).json({ error: 'Invalid API key' });
|
|
}
|
|
|
|
await pool.query(
|
|
`INSERT INTO workers (id, name, status, last_heartbeat, api_key, metadata)
|
|
VALUES (?, ?, 'online', NOW(), ?, ?)
|
|
ON DUPLICATE KEY UPDATE
|
|
status='online',
|
|
last_heartbeat=NOW(),
|
|
metadata=VALUES(metadata)`,
|
|
[worker_id, name, apiKey, JSON.stringify(metadata)]
|
|
);
|
|
|
|
broadcast({ type: 'worker_update', worker_id, status: 'online' });
|
|
res.json({ success: true });
|
|
} catch (error) {
|
|
res.status(500).json({ error: error.message });
|
|
}
|
|
});
|
|
|
|
app.post('/api/executions', authenticateSSO, async (req, res) => {
|
|
try {
|
|
const { workflow_id } = req.body;
|
|
const id = generateUUID();
|
|
|
|
// Get workflow definition
|
|
const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflow_id]);
|
|
if (workflows.length === 0) {
|
|
return res.status(404).json({ error: 'Workflow not found' });
|
|
}
|
|
|
|
const workflow = workflows[0];
|
|
const definition = typeof workflow.definition === 'string' ? JSON.parse(workflow.definition) : workflow.definition;
|
|
|
|
// Create execution record
|
|
await pool.query(
|
|
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
|
|
[id, workflow_id, 'running', req.user.username, JSON.stringify([])]
|
|
);
|
|
|
|
broadcast({ type: 'execution_started', execution_id: id, workflow_id });
|
|
|
|
// Start workflow execution asynchronously
|
|
executeWorkflowSteps(id, workflow_id, definition, req.user.username).catch(err => {
|
|
console.error(`[Workflow] Execution ${id} failed:`, err);
|
|
});
|
|
|
|
res.json({ id, workflow_id, status: 'running' });
|
|
} catch (error) {
|
|
res.status(500).json({ error: error.message });
|
|
}
|
|
});
|
|
|
|
app.get('/api/executions', authenticateSSO, async (req, res) => {
|
|
try {
|
|
const limit = parseInt(req.query.limit) || 50;
|
|
const offset = parseInt(req.query.offset) || 0;
|
|
|
|
const [rows] = await pool.query(
|
|
'SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ORDER BY e.started_at DESC LIMIT ? OFFSET ?',
|
|
[limit, offset]
|
|
);
|
|
|
|
// Get total count
|
|
const [countRows] = await pool.query('SELECT COUNT(*) as total FROM executions');
|
|
const total = countRows[0].total;
|
|
|
|
res.json({
|
|
executions: rows,
|
|
total: total,
|
|
limit: limit,
|
|
offset: offset,
|
|
hasMore: offset + rows.length < total
|
|
});
|
|
} catch (error) {
|
|
res.status(500).json({ error: error.message });
|
|
}
|
|
});
|
|
|
|
app.delete('/api/executions/:id', authenticateSSO, async (req, res) => {
|
|
try {
|
|
await pool.query('DELETE FROM executions WHERE id = ?', [req.params.id]);
|
|
broadcast({ type: 'execution_deleted', execution_id: req.params.id });
|
|
res.json({ success: true });
|
|
} catch (error) {
|
|
res.status(500).json({ error: error.message });
|
|
}
|
|
});
|
|
|
|
// Scheduled Commands API
|
|
app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => {
|
|
try {
|
|
const [schedules] = await pool.query(
|
|
'SELECT * FROM scheduled_commands ORDER BY created_at DESC'
|
|
);
|
|
res.json(schedules);
|
|
} catch (error) {
|
|
res.status(500).json({ error: error.message });
|
|
}
|
|
});
|
|
|
|
app.post('/api/scheduled-commands', authenticateSSO, async (req, res) => {
|
|
try {
|
|
const { name, command, worker_ids, schedule_type, schedule_value } = req.body;
|
|
|
|
if (!name || !command || !worker_ids || !schedule_type || !schedule_value) {
|
|
return res.status(400).json({ error: 'Missing required fields' });
|
|
}
|
|
|
|
const id = generateUUID();
|
|
const nextRun = calculateNextRun(schedule_type, schedule_value);
|
|
|
|
await pool.query(
|
|
`INSERT INTO scheduled_commands
|
|
(id, name, command, worker_ids, schedule_type, schedule_value, created_by, next_run)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
[id, name, command, JSON.stringify(worker_ids), schedule_type, schedule_value, req.user.username, nextRun]
|
|
);
|
|
|
|
res.json({ success: true, id });
|
|
} catch (error) {
|
|
res.status(500).json({ error: error.message });
|
|
}
|
|
});
|
|
|
|
app.put('/api/scheduled-commands/:id/toggle', authenticateSSO, async (req, res) => {
|
|
try {
|
|
const { id } = req.params;
|
|
const [current] = await pool.query('SELECT enabled FROM scheduled_commands WHERE id = ?', [id]);
|
|
|
|
if (current.length === 0) {
|
|
return res.status(404).json({ error: 'Schedule not found' });
|
|
}
|
|
|
|
const newEnabled = !current[0].enabled;
|
|
await pool.query('UPDATE scheduled_commands SET enabled = ? WHERE id = ?', [newEnabled, id]);
|
|
|
|
res.json({ success: true, enabled: newEnabled });
|
|
} catch (error) {
|
|
res.status(500).json({ error: error.message });
|
|
}
|
|
});
|
|
|
|
app.delete('/api/scheduled-commands/:id', authenticateSSO, async (req, res) => {
|
|
try {
|
|
const { id } = req.params;
|
|
await pool.query('DELETE FROM scheduled_commands WHERE id = ?', [id]);
|
|
res.json({ success: true });
|
|
} catch (error) {
|
|
res.status(500).json({ error: error.message });
|
|
}
|
|
});
|
|
|
|
// Health check (no auth required)
|
|
app.get('/health', async (req, res) => {
|
|
try {
|
|
await pool.query('SELECT 1');
|
|
res.json({
|
|
status: 'ok',
|
|
timestamp: new Date().toISOString(),
|
|
database: 'connected',
|
|
auth: 'authelia-sso'
|
|
});
|
|
} catch (error) {
|
|
res.status(500).json({
|
|
status: 'error',
|
|
timestamp: new Date().toISOString(),
|
|
database: 'disconnected',
|
|
error: error.message
|
|
});
|
|
}
|
|
});
|
|
|
|
// Start server
|
|
const PORT = process.env.PORT || 8080;
|
|
const HOST = process.env.HOST || '0.0.0.0';
|
|
|
|
initDatabase().then(() => {
|
|
server.listen(PORT, HOST, () => {
|
|
console.log(`PULSE Server running on http://${HOST}:${PORT}`);
|
|
console.log(`Connected to MariaDB at ${process.env.DB_HOST}`);
|
|
console.log(`Authentication: Authelia SSO`);
|
|
console.log(`Worker API Key configured: ${process.env.WORKER_API_KEY ? 'Yes' : 'No'}`);
|
|
});
|
|
}).catch(err => {
|
|
console.error('Failed to start server:', err);
|
|
process.exit(1);
|
|
});
|
|
|
|
|
|
// ============================================
|
|
// WORKFLOW EXECUTION ENGINE
|
|
|
|
// Get execution details with logs
|
|
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
|
|
try {
|
|
const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]);
|
|
if (rows.length === 0) {
|
|
return res.status(404).json({ error: 'Not found' });
|
|
}
|
|
|
|
const execution = rows[0];
|
|
const state = activeExecutions.get(req.params.id);
|
|
|
|
res.json({
|
|
...execution,
|
|
logs: JSON.parse(execution.logs || '[]'),
|
|
waiting_for_input: state?.waitingForInput || false,
|
|
prompt: state?.promptData || null
|
|
});
|
|
} catch (error) {
|
|
res.status(500).json({ error: error.message });
|
|
}
|
|
});
|
|
|
|
// Delete worker (admin only)
|
|
app.delete('/api/workers/:id', authenticateSSO, async (req, res) => {
|
|
try {
|
|
if (!req.user.isAdmin) {
|
|
return res.status(403).json({ error: 'Admin access required' });
|
|
}
|
|
|
|
await pool.query('DELETE FROM workers WHERE id = ?', [req.params.id]);
|
|
res.json({ success: true });
|
|
broadcast({ type: 'worker_deleted', worker_id: req.params.id });
|
|
} catch (error) {
|
|
res.status(500).json({ error: error.message });
|
|
}
|
|
});
|
|
|
|
// Send direct command to specific worker (for testing)
|
|
app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
|
|
try {
|
|
const { command } = req.body;
|
|
const executionId = generateUUID();
|
|
const workerId = req.params.id;
|
|
|
|
// Create execution record in database
|
|
await pool.query(
|
|
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
|
|
[executionId, null, 'running', req.user.username, JSON.stringify([{
|
|
step: 'quick_command',
|
|
action: 'command_sent',
|
|
worker_id: workerId,
|
|
command: command,
|
|
timestamp: new Date().toISOString()
|
|
}])]
|
|
);
|
|
|
|
// Send command via WebSocket to specific worker
|
|
const commandMessage = {
|
|
type: 'execute_command',
|
|
execution_id: executionId,
|
|
command: command,
|
|
worker_id: workerId,
|
|
timeout: 60000
|
|
};
|
|
|
|
const workerWs = workers.get(workerId);
|
|
if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
|
|
return res.status(400).json({ error: 'Worker not connected' });
|
|
}
|
|
|
|
workerWs.send(JSON.stringify(commandMessage));
|
|
console.log(`Command sent to worker ${workerId}: ${command}`);
|
|
|
|
broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
|
|
res.json({ success: true, execution_id: executionId });
|
|
} catch (error) {
|
|
res.status(500).json({ error: error.message });
|
|
}
|
|
});
|
|
|
|
// ============================================
|
|
// EXAMPLE WORKFLOW DEFINITIONS
|
|
// ============================================
|
|
|
|
// Example 1: Simple command execution
|
|
const simpleWorkflow = {
|
|
name: "Update System Packages",
|
|
description: "Update all packages on target servers",
|
|
steps: [
|
|
{
|
|
name: "Update package list",
|
|
type: "execute",
|
|
targets: ["all"],
|
|
command: "apt update"
|
|
},
|
|
{
|
|
name: "User Approval",
|
|
type: "prompt",
|
|
message: "Packages updated. Proceed with upgrade?",
|
|
options: ["Yes", "No"]
|
|
},
|
|
{
|
|
name: "Upgrade packages",
|
|
type: "execute",
|
|
targets: ["all"],
|
|
command: "apt upgrade -y",
|
|
condition: "promptResponse === 'Yes'"
|
|
}
|
|
]
|
|
};
|
|
|
|
// Example 2: Complex workflow with conditions
|
|
const backupWorkflow = {
|
|
name: "Backup and Verify",
|
|
description: "Create backup and verify integrity",
|
|
steps: [
|
|
{
|
|
name: "Create backup",
|
|
type: "execute",
|
|
targets: ["all"],
|
|
command: "tar -czf /tmp/backup-$(date +%Y%m%d).tar.gz /opt/pulse-worker"
|
|
},
|
|
{
|
|
name: "Wait for backup",
|
|
type: "wait",
|
|
duration: 5000
|
|
},
|
|
{
|
|
name: "Verify backup",
|
|
type: "execute",
|
|
targets: ["all"],
|
|
command: "tar -tzf /tmp/backup-*.tar.gz > /dev/null && echo 'Backup OK' || echo 'Backup FAILED'"
|
|
},
|
|
{
|
|
name: "Cleanup decision",
|
|
type: "prompt",
|
|
message: "Backup complete. Delete old backups?",
|
|
options: ["Yes", "No", "Cancel"]
|
|
},
|
|
{
|
|
name: "Cleanup old backups",
|
|
type: "execute",
|
|
targets: ["all"],
|
|
command: "find /tmp -name 'backup-*.tar.gz' -mtime +7 -delete",
|
|
condition: "promptResponse === 'Yes'"
|
|
}
|
|
]
|
|
}; |