const express = require('express');
const http = require('http');
const WebSocket = require('ws');
const mysql = require('mysql2/promise');
const crypto = require('crypto');
require('dotenv').config();

const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

// Middleware
app.use(express.json());
app.use(express.static('public'));

/**
 * Generate a random RFC 4122 version 4 UUID.
 *
 * @returns {string} A new UUID string.
 */
function generateUUID() {
  return crypto.randomUUID();
}

/**
 * Decode the value of a JSON column.
 *
 * Depending on the server and driver, a JSON column may come back either as
 * raw JSON text or as an already-decoded value, so both forms are accepted.
 *
 * @param {*} value - Raw column value read from a result row.
 * @param {*} fallback - Value returned when the column is NULL or empty.
 * @returns {*} The decoded value, or `fallback`.
 * @throws {SyntaxError} If `value` is a string that is not valid JSON.
 */
function parseJsonColumn(value, fallback) {
  if (value === null || value === undefined || value === '') {
    return fallback;
  }
  if (typeof value === 'string') {
    return JSON.parse(value);
  }
  return value;
}

/**
 * Check a worker-supplied API key against WORKER_API_KEY.
 *
 * Fails closed: when WORKER_API_KEY is not configured, or no key was sent,
 * the request is rejected (a plain `!==` comparison would accept a missing
 * header whenever the environment variable is also missing).
 *
 * @param {*} providedKey - Value of the `x-api-key` request header.
 * @returns {boolean} True only when a configured key matches exactly.
 */
function isValidWorkerApiKey(providedKey) {
  const expectedKey = process.env.WORKER_API_KEY;
  if (!expectedKey || typeof providedKey !== 'string' || providedKey === '') {
    return false;
  }
  // Hash both sides so the buffers have equal length, which
  // crypto.timingSafeEqual requires, then compare in constant time.
  const expectedDigest = crypto.createHash('sha256').update(expectedKey).digest();
  const providedDigest = crypto.createHash('sha256').update(providedKey).digest();
  return crypto.timingSafeEqual(expectedDigest, providedDigest);
}

// Database pool
const pool = mysql.createPool({
  host: process.env.DB_HOST,
  port: Number.parseInt(process.env.DB_PORT, 10) || 3306,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0
});

/**
 * Create the users, workers, workflows and executions tables if they do not
 * already exist.
 *
 * @returns {Promise<void>}
 * @throws Re-throws any database error after logging it.
 */
async function initDatabase() {
  const connection = await pool.getConnection();
  try {
    // `groups` is quoted because GROUPS is a reserved word in MySQL 8.0.2+.
    await connection.query(`
      CREATE TABLE IF NOT EXISTS users (
        id VARCHAR(36) PRIMARY KEY,
        username VARCHAR(255) UNIQUE NOT NULL,
        display_name VARCHAR(255),
        email VARCHAR(255),
        \`groups\` TEXT,
        last_login TIMESTAMP,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      )
    `);

    await connection.query(`
      CREATE TABLE IF NOT EXISTS workers (
        id VARCHAR(36) PRIMARY KEY,
        name VARCHAR(255) UNIQUE NOT NULL,
        status VARCHAR(50) NOT NULL,
        last_heartbeat TIMESTAMP NULL,
        api_key VARCHAR(255),
        metadata JSON,
        INDEX idx_status (status),
        INDEX idx_heartbeat (last_heartbeat)
      )
    `);

    await connection.query(`
      CREATE TABLE IF NOT EXISTS workflows (
        id VARCHAR(36) PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        description TEXT,
        definition JSON NOT NULL,
        created_by VARCHAR(255),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        INDEX idx_name (name)
      )
    `);

    await connection.query(`
      CREATE TABLE IF NOT EXISTS executions (
        id VARCHAR(36) PRIMARY KEY,
        workflow_id VARCHAR(36) NOT NULL,
        status VARCHAR(50) NOT NULL,
        started_by VARCHAR(255),
        started_at TIMESTAMP NULL,
        completed_at TIMESTAMP NULL,
        logs JSON,
        FOREIGN KEY (workflow_id) REFERENCES workflows(id) ON DELETE CASCADE,
        INDEX idx_workflow (workflow_id),
        INDEX idx_status (status),
        INDEX idx_started (started_at)
      )
    `);

    console.log('Database tables initialized successfully');
  } catch (error) {
    console.error('Database initialization error:', error);
    throw error;
  } finally {
    connection.release();
  }
}

// WebSocket connections
// NOTE(review): sockets are accepted without any authentication, and every
// connected socket receives every broadcast (including command messages).
// Confirm that the reverse proxy restricts who can open a WebSocket.
const clients = new Set();

wss.on('connection', (ws) => {
  clients.add(ws);
  ws.on('close', () => clients.delete(ws));
  // Without a listener, an 'error' event emitted on the socket would throw.
  ws.on('error', (error) => {
    console.error('WebSocket client error:', error);
  });
});

/**
 * Send a JSON-serialised message to every connected WebSocket client whose
 * socket is open.
 *
 * @param {object} data - Message to serialise and send.
 */
function broadcast(data) {
  // Serialise once rather than once per client.
  const payload = JSON.stringify(data);
  for (const client of clients) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(payload);
    }
  }
}

/**
 * Authelia SSO middleware.
 *
 * Reads the Remote-User / Remote-Name / Remote-Email / Remote-Groups request
 * headers, rejects requests without a user (401) or without membership of the
 * `admin` or `employee` group (403), upserts the user row, and attaches
 * `req.user` for downstream handlers.
 *
 * SECURITY NOTE(review): these headers are trusted as-is. That is only safe
 * if this service is reachable exclusively through the authenticating proxy,
 * which must strip client-supplied Remote-* headers - confirm deployment.
 */
async function authenticateSSO(req, res, next) {
  // Check for Authelia headers
  const remoteUser = req.headers['remote-user'];
  const remoteName = req.headers['remote-name'];
  const remoteEmail = req.headers['remote-email'];
  const remoteGroups = req.headers['remote-groups'];

  if (!remoteUser) {
    return res.status(401).json({
      error: 'Not authenticated',
      message: 'Please access this service through auth.lotusguild.org'
    });
  }

  // Check if user is in allowed groups (admin or employee)
  const groups = remoteGroups ? remoteGroups.split(',').map(g => g.trim()) : [];
  const allowedGroups = ['admin', 'employee'];
  const hasAccess = groups.some(g => allowedGroups.includes(g));

  if (!hasAccess) {
    return res.status(403).json({
      error: 'Access denied',
      message: 'You must be in admin or employee group'
    });
  }

  // Store/update user in database. Deliberately best-effort: a failure here
  // is logged but does not block the request.
  try {
    // The generated id is only used when the username is new; for an
    // existing username the ON DUPLICATE KEY branch keeps the stored id.
    const userId = generateUUID();
    await pool.query(
      `INSERT INTO users (id, username, display_name, email, \`groups\`, last_login)
       VALUES (?, ?, ?, ?, ?, NOW())
       ON DUPLICATE KEY UPDATE
         display_name = VALUES(display_name),
         email = VALUES(email),
         \`groups\` = VALUES(\`groups\`),
         last_login = NOW()`,
      [userId, remoteUser, remoteName, remoteEmail, remoteGroups]
    );
  } catch (error) {
    console.error('Error updating user:', error);
  }

  // Attach user info to request
  req.user = {
    username: remoteUser,
    name: remoteName || remoteUser,
    email: remoteEmail || '',
    groups: groups,
    isAdmin: groups.includes('admin')
  };

  next();
}

// Routes - All protected by SSO

// Return the authenticated user's profile.
app.get('/api/user', authenticateSSO, (req, res) => {
  res.json(req.user);
});

// List all workflows, newest first.
app.get('/api/workflows', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM workflows ORDER BY created_at DESC');
    res.json(rows);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Create a workflow.
app.post('/api/workflows', authenticateSSO, async (req, res) => {
  try {
    const { name, description, definition } = req.body ?? {};

    // Both columns are NOT NULL; reject early with a 400 instead of
    // surfacing a database error as a 500.
    if (typeof name !== 'string' || name.trim() === '' || definition === undefined || definition === null) {
      return res.status(400).json({ error: 'name and definition are required' });
    }

    const id = generateUUID();

    await pool.query(
      'INSERT INTO workflows (id, name, description, definition, created_by) VALUES (?, ?, ?, ?, ?)',
      [id, name, description, JSON.stringify(definition), req.user.username]
    );

    res.json({ id, name, description, definition });
    broadcast({ type: 'workflow_created', workflow_id: id });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Delete a workflow (admin only).
app.delete('/api/workflows/:id', authenticateSSO, async (req, res) => {
  try {
    // Only admins can delete workflows
    if (!req.user.isAdmin) {
      return res.status(403).json({ error: 'Admin access required' });
    }

    await pool.query('DELETE FROM workflows WHERE id = ?', [req.params.id]);
    res.json({ success: true });
    broadcast({ type: 'workflow_deleted', workflow_id: req.params.id });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// List workers.
app.get('/api/workers', authenticateSSO, async (req, res) => {
  try {
    // Columns are listed explicitly so the stored api_key (the shared worker
    // secret) is never returned to API consumers.
    const [rows] = await pool.query(
      'SELECT id, name, status, last_heartbeat, metadata FROM workers ORDER BY name'
    );
    res.json(rows);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Worker heartbeat. Authenticated with the x-api-key header, not SSO.
app.post('/api/workers/heartbeat', async (req, res) => {
  try {
    const { worker_id, name, metadata } = req.body ?? {};
    const apiKey = req.headers['x-api-key'];

    // Verify API key
    if (!isValidWorkerApiKey(apiKey)) {
      return res.status(401).json({ error: 'Invalid API key' });
    }

    if (!worker_id || !name) {
      return res.status(400).json({ error: 'worker_id and name are required' });
    }

    await pool.query(
      `INSERT INTO workers (id, name, status, last_heartbeat, api_key, metadata)
       VALUES (?, ?, 'online', NOW(), ?, ?)
       ON DUPLICATE KEY UPDATE
         status = 'online',
         last_heartbeat = NOW(),
         metadata = VALUES(metadata)`,
      [worker_id, name, apiKey, JSON.stringify(metadata)]
    );

    broadcast({ type: 'worker_update', worker_id, status: 'online' });
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// NOTE: POST /api/executions and GET /api/executions/:id are registered once,
// in the "API ROUTES" section below. They used to be registered here as well;
// Express runs the first matching handler, so those earlier copies answered
// every request and the later ones (which actually start the workflow and
// expose prompt state) were never reached.

// List the 50 most recently started executions with their workflow name.
app.get('/api/executions', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query(
      'SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ORDER BY e.started_at DESC LIMIT 50'
    );
    res.json(rows);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Health check (no auth required)
app.get('/health', async (req, res) => {
  try {
    await pool.query('SELECT 1');
    res.json({
      status: 'ok',
      timestamp: new Date().toISOString(),
      database: 'connected',
      auth: 'authelia-sso'
    });
  } catch (error) {
    res.status(500).json({
      status: 'error',
      timestamp: new Date().toISOString(),
      database: 'disconnected',
      error: error.message
    });
  }
});

// Start server
const PORT = process.env.PORT || 8080;
const HOST = process.env.HOST || '0.0.0.0';

initDatabase().then(() => {
  server.listen(PORT, HOST, () => {
    console.log(`PULSE Server running on http://${HOST}:${PORT}`);
    console.log(`Connected to MariaDB at ${process.env.DB_HOST}`);
    console.log(`Authentication: Authelia SSO`);
    console.log(`Worker API Key configured: ${process.env.WORKER_API_KEY ? 'Yes' : 'No'}`);
  });
}).catch(err => {
  console.error('Failed to start server:', err);
  process.exit(1);
});

// ============================================
// WORKFLOW EXECUTION ENGINE
// ============================================

// Store active workflow executions in memory
const activeExecutions = new Map();

/**
 * Mark an execution as failed after an unexpected engine error.
 *
 * Never rejects, so it is safe to use as the final handler of a promise
 * chain: a failure while recording the failure is only logged.
 *
 * @param {string} executionId - Execution to fail.
 * @param {Error} error - The error that aborted the execution.
 * @returns {Promise<void>}
 */
async function failExecution(executionId, error) {
  console.error('Workflow execution error:', error);
  activeExecutions.delete(executionId);
  try {
    await updateExecutionStatus(executionId, 'failed', error.message);
  } catch (statusError) {
    console.error('Failed to record execution failure:', statusError);
  }
}

/**
 * Run the next step of an execution after a delay.
 *
 * The timer callback is not awaited by anyone, so a rejection is caught here
 * and routed to failExecution instead of becoming an unhandled rejection.
 *
 * @param {string} executionId - Execution to advance.
 * @param {number} delayMs - Delay in milliseconds before the next step runs.
 */
function scheduleNextStep(executionId, delayMs) {
  setTimeout(() => {
    executeNextStep(executionId).catch((error) => failExecution(executionId, error));
  }, delayMs);
}

/**
 * Load a workflow definition, register in-memory execution state and start
 * running its steps. Errors are recorded on the execution; the returned
 * promise does not reject.
 *
 * @param {string} workflowId - Id of the workflow to run.
 * @param {string} executionId - Id of the executions row tracking this run.
 * @param {string} userId - Identifier of the user who started the run.
 * @param {*} [targetWorkers='all'] - Stored on the execution state; not read
 *   by any step handler in this file.
 * @returns {Promise<void>}
 */
async function executeWorkflow(workflowId, executionId, userId, targetWorkers = 'all') {
  try {
    // Get workflow definition
    const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflowId]);
    if (workflows.length === 0) {
      throw new Error('Workflow not found');
    }

    const workflow = workflows[0];
    const definition = parseJsonColumn(workflow.definition, {});

    // Initialize execution state
    const executionState = {
      id: executionId,
      workflowId: workflowId,
      currentStep: 0,
      steps: definition.steps || [],
      results: [],
      status: 'running',
      waitingForInput: false,
      targetWorkers: targetWorkers,
      userId: userId
    };

    activeExecutions.set(executionId, executionState);

    // Start executing steps
    await executeNextStep(executionId);
  } catch (error) {
    await failExecution(executionId, error);
  }
}

/**
 * Execute the current step of an execution, or complete the execution when
 * all steps are done. Steps whose condition evaluates falsy are skipped.
 *
 * @param {string} executionId - Execution to advance.
 * @returns {Promise<void>}
 */
async function executeNextStep(executionId) {
  const state = activeExecutions.get(executionId);
  if (!state) return;

  // Check if we've completed all steps
  if (state.currentStep >= state.steps.length) {
    await updateExecutionStatus(executionId, 'completed');
    activeExecutions.delete(executionId);
    return;
  }

  const step = state.steps[state.currentStep];

  // Check if step has a condition
  if (step.condition && !evaluateCondition(step.condition, state)) {
    console.log(`[Workflow] Skipping step ${state.currentStep}: condition not met`);
    state.currentStep++;
    return executeNextStep(executionId);
  }

  console.log(`[Workflow] Executing step ${state.currentStep}: ${step.name}`);

  try {
    switch (step.type) {
      case 'execute':
        await executeCommandStep(executionId, step);
        break;
      case 'prompt':
        await executePromptStep(executionId, step);
        break;
      case 'wait':
        await executeWaitStep(executionId, step);
        break;
      default:
        throw new Error(`Unknown step type: ${step.type}`);
    }
  } catch (error) {
    await addExecutionLog(executionId, {
      step: state.currentStep,
      error: error.message,
      timestamp: new Date().toISOString()
    });
    await updateExecutionStatus(executionId, 'failed', error.message);
    activeExecutions.delete(executionId);
  }
}

/**
 * Send a step's command to workers over WebSocket and schedule the next step.
 *
 * @param {string} executionId - Execution the step belongs to.
 * @param {object} step - Step definition; reads `command`, `targets`, `timeout`.
 * @returns {Promise<void>}
 * @throws {Error} If no worker is online, or none of the targeted workers is.
 */
async function executeCommandStep(executionId, step) {
  const state = activeExecutions.get(executionId);
  if (!state) return;

  // Get target workers
  const [workers] = await pool.query('SELECT * FROM workers WHERE status = ?', ['online']);

  if (workers.length === 0) {
    throw new Error('No online workers available');
  }

  // Filter workers based on target. A bare string is treated as a one-element
  // list; a missing `targets` means every online worker.
  let requestedTargets = ['all'];
  if (Array.isArray(step.targets)) {
    requestedTargets = step.targets;
  } else if (step.targets) {
    requestedTargets = [step.targets];
  }

  let targetWorkers = workers;
  if (requestedTargets[0] !== 'all') {
    targetWorkers = workers.filter(w => requestedTargets.includes(w.name));
  }

  // Do not fall through to broadcasting when nothing matched: the message
  // reaches every connected client, so a targeted command would otherwise
  // go out with no intended recipient.
  if (targetWorkers.length === 0) {
    throw new Error('No online workers match the step targets');
  }

  const targetWorkerNames = targetWorkers.map(w => w.name);

  // Send command to workers via WebSocket. Sockets are not mapped to worker
  // identities here, so the message goes to every client and carries
  // `target_workers` so that recipients can tell whether it is meant for them.
  broadcast({
    type: 'execute_command',
    execution_id: executionId,
    step_index: state.currentStep,
    command: step.command,
    timeout: step.timeout || 300000,
    target_workers: targetWorkerNames
  });

  await addExecutionLog(executionId, {
    step: state.currentStep,
    action: 'command_sent',
    command: step.command,
    workers: targetWorkerNames,
    timestamp: new Date().toISOString()
  });

  // For now, move to next step immediately
  // In production, we'd wait for worker responses
  state.currentStep++;

  // Small delay to allow command to execute
  scheduleNextStep(executionId, 1000);
}

/**
 * Pause an execution on a user prompt and notify connected clients.
 * The execution resumes when handleUserInput is called for it.
 *
 * @param {string} executionId - Execution the step belongs to.
 * @param {object} step - Step definition; reads `message` and `options`.
 * @returns {Promise<void>}
 */
async function executePromptStep(executionId, step) {
  const state = activeExecutions.get(executionId);
  if (!state) return;

  state.waitingForInput = true;
  state.promptData = {
    message: step.message,
    options: step.options,
    step: state.currentStep
  };

  await addExecutionLog(executionId, {
    step: state.currentStep,
    action: 'waiting_for_input',
    prompt: step.message,
    timestamp: new Date().toISOString()
  });

  // Notify frontend that input is needed
  broadcast({
    type: 'execution_prompt',
    execution_id: executionId,
    prompt: state.promptData
  });
}

/**
 * Log a wait step and schedule the next step after the step's duration.
 *
 * @param {string} executionId - Execution the step belongs to.
 * @param {object} step - Step definition; `duration` is the delay in
 *   milliseconds and defaults to 1000 when missing or falsy.
 * @returns {Promise<void>}
 */
async function executeWaitStep(executionId, step) {
  const state = activeExecutions.get(executionId);
  if (!state) return;

  const delay = step.duration || 1000;

  await addExecutionLog(executionId, {
    step: state.currentStep,
    action: 'waiting',
    duration: delay,
    timestamp: new Date().toISOString()
  });

  state.currentStep++;
  scheduleNextStep(executionId, delay);
}

/**
 * Record a user's answer to a prompt and resume the execution.
 *
 * Returns synchronously; logging and the next step run in the background.
 *
 * @param {string} executionId - Execution waiting on a prompt.
 * @param {*} response - The user's answer; exposed to step conditions as
 *   `promptResponse`.
 * @returns {boolean} False when the execution is unknown or is not waiting
 *   for input, true otherwise.
 */
function handleUserInput(executionId, response) {
  const state = activeExecutions.get(executionId);
  if (!state || !state.waitingForInput) {
    return false;
  }

  const answeredStep = state.currentStep;

  state.promptResponse = response;
  state.waitingForInput = false;
  // Clear the prompt so it is no longer reported once it has been answered.
  state.promptData = null;
  state.currentStep++;

  // The log write must finish before the next step starts: both perform a
  // read-modify-write of the same `logs` column, and running them
  // concurrently could lose an entry. A failed log write is reported but does
  // not stop the workflow.
  addExecutionLog(executionId, {
    step: answeredStep,
    action: 'user_response',
    response: response,
    timestamp: new Date().toISOString()
  })
    .catch((error) => {
      console.error('Failed to record user response:', error);
    })
    .then(() => executeNextStep(executionId))
    .catch((error) => failExecution(executionId, error));

  return true;
}

/**
 * Evaluate a step condition against the execution state.
 *
 * @param {string} condition - JavaScript expression; may reference
 *   `promptResponse` (the most recent prompt answer).
 * @param {object} state - In-memory execution state.
 * @returns {*} The expression's value, or false if evaluation throws.
 */
function evaluateCondition(condition, state) {
  try {
    // Simple condition evaluation
    // In production, use a proper expression evaluator
    //
    // SECURITY: `condition` comes from a stored workflow definition, which
    // any authenticated user can create via POST /api/workflows, and eval()
    // runs it with full access to this process. Replace with a restricted
    // expression evaluator before exposing this to untrusted users.
    //
    // `promptResponse` looks unused but must stay: it is read by the
    // evaluated expression through direct eval's access to local scope.
    const promptResponse = state.promptResponse;
    return eval(condition);
  } catch (error) {
    console.error('Condition evaluation error:', error);
    return false;
  }
}

// Helper functions

/**
 * Persist an execution's status and broadcast the change.
 *
 * @param {string} executionId - Execution to update.
 * @param {string} status - New status; 'completed' and 'failed' also set
 *   completed_at, any other status writes NULL to it.
 * @param {?string} [error=null] - Error message appended to the logs.
 * @returns {Promise<void>}
 */
async function updateExecutionStatus(executionId, status, error = null) {
  const isTerminal = status === 'completed' || status === 'failed';
  const completedAt = isTerminal ? new Date() : null;

  if (error) {
    // Add error to logs
    await addExecutionLog(executionId, {
      error,
      timestamp: new Date().toISOString()
    });
  }

  await pool.query(
    'UPDATE executions SET status = ?, completed_at = ? WHERE id = ?',
    [status, completedAt, executionId]
  );

  broadcast({
    type: 'execution_status',
    execution_id: executionId,
    status: status
  });
}

/**
 * Append an entry to an execution's JSON log array.
 *
 * This is a read-modify-write with no locking, so callers must not run two
 * appends for the same execution concurrently.
 *
 * @param {string} executionId - Execution whose log is extended.
 * @param {object} logEntry - Entry to append.
 * @returns {Promise<void>} Resolves without writing if the row does not exist.
 */
async function addExecutionLog(executionId, logEntry) {
  const [rows] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);

  if (rows.length === 0) return;

  const storedLogs = parseJsonColumn(rows[0].logs, []);
  const logs = Array.isArray(storedLogs) ? storedLogs : [];
  logs.push(logEntry);

  await pool.query('UPDATE executions SET logs = ? WHERE id = ?', [
    JSON.stringify(logs),
    executionId
  ]);
}

// ============================================
// API ROUTES
// ============================================

// Start workflow execution
app.post('/api/executions', authenticateSSO, async (req, res) => {
  try {
    const { workflow_id, target_workers } = req.body ?? {};

    if (!workflow_id) {
      return res.status(400).json({ error: 'workflow_id is required' });
    }

    const id = generateUUID();

    await pool.query(
      'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
      [id, workflow_id, 'running', req.user.username, JSON.stringify([])]
    );

    broadcast({ type: 'execution_started', execution_id: id, workflow_id });

    // Start execution in the background; the response does not wait for it.
    executeWorkflow(workflow_id, id, req.user.username, target_workers || 'all')
      .catch((error) => failExecution(id, error));

    res.json({ id, workflow_id, status: 'running' });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Respond to workflow prompt
app.post('/api/executions/:id/respond', authenticateSSO, async (req, res) => {
  try {
    const { response } = req.body ?? {};
    const success = handleUserInput(req.params.id, response);

    if (success) {
      res.json({ success: true });
    } else {
      res.status(400).json({ error: 'Execution not waiting for input' });
    }
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Get execution details with logs
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Not found' });
    }

    const execution = rows[0];
    const state = activeExecutions.get(req.params.id);

    res.json({
      ...execution,
      logs: parseJsonColumn(execution.logs, []),
      waiting_for_input: state?.waitingForInput || false,
      prompt: state?.promptData || null
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Delete worker (admin only)
app.delete('/api/workers/:id', authenticateSSO, async (req, res) => {
  try {
    if (!req.user.isAdmin) {
      return res.status(403).json({ error: 'Admin access required' });
    }

    await pool.query('DELETE FROM workers WHERE id = ?', [req.params.id]);
    res.json({ success: true });
    broadcast({ type: 'worker_deleted', worker_id: req.params.id });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Send direct command to specific worker (for testing)
// NOTE(review): any authenticated user can reach this route, not only
// admins - confirm that is intended before production use.
app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
  try {
    const { command } = req.body ?? {};

    if (typeof command !== 'string' || command.trim() === '') {
      return res.status(400).json({ error: 'command is required' });
    }

    const executionId = generateUUID();

    // Send command via WebSocket. It goes to every connected client and
    // carries `worker_id` to identify the intended recipient.
    broadcast({
      type: 'execute_command',
      execution_id: executionId,
      command: command,
      worker_id: req.params.id,
      timeout: 60000
    });

    res.json({ success: true, execution_id: executionId });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// ============================================
// EXAMPLE WORKFLOW DEFINITIONS
// ============================================
// These constants are reference examples of the `definition` shape accepted
// by POST /api/workflows; nothing in this file reads them.

// Example 1: Simple command execution
const simpleWorkflow = {
  name: "Update System Packages",
  description: "Update all packages on target servers",
  steps: [
    {
      name: "Update package list",
      type: "execute",
      targets: ["all"],
      command: "apt update"
    },
    {
      name: "User Approval",
      type: "prompt",
      message: "Packages updated. Proceed with upgrade?",
      options: ["Yes", "No"]
    },
    {
      name: "Upgrade packages",
      type: "execute",
      targets: ["all"],
      command: "apt upgrade -y",
      condition: "promptResponse === 'Yes'"
    }
  ]
};

// Example 2: Complex workflow with conditions
const backupWorkflow = {
  name: "Backup and Verify",
  description: "Create backup and verify integrity",
  steps: [
    {
      name: "Create backup",
      type: "execute",
      targets: ["all"],
      command: "tar -czf /tmp/backup-$(date +%Y%m%d).tar.gz /opt/pulse-worker"
    },
    {
      name: "Wait for backup",
      type: "wait",
      duration: 5000
    },
    {
      name: "Verify backup",
      type: "execute",
      targets: ["all"],
      command: "tar -tzf /tmp/backup-*.tar.gz > /dev/null && echo 'Backup OK' || echo 'Backup FAILED'"
    },
    {
      name: "Cleanup decision",
      type: "prompt",
      message: "Backup complete. Delete old backups?",
      options: ["Yes", "No", "Cancel"]
    },
    {
      name: "Cleanup old backups",
      type: "execute",
      targets: ["all"],
      command: "find /tmp -name 'backup-*.tar.gz' -mtime +7 -delete",
      condition: "promptResponse === 'Yes'"
    }
  ]
};