const express = require('express');
const http = require('http');
const WebSocket = require('ws');
const mysql = require('mysql2/promise');
const crypto = require('crypto');
require('dotenv').config();

const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

// Middleware
app.use(express.json());
app.use(express.static('public'));

/**
 * Generate a random UUID used as primary key for database rows and as
 * execution / command identifiers.
 * @returns {string} A UUID string from crypto.randomUUID().
 */
function generateUUID() {
  return crypto.randomUUID();
}

/**
 * Normalize the value of a JSON column read from the database.
 * The driver may hand back either the raw JSON text or an already-parsed
 * value, so both forms are accepted.
 * @param {*} value - Column value as returned by the driver.
 * @param {*} fallback - Value to return when the column is NULL/undefined.
 * @returns {*} The parsed value, or `fallback` for NULL/undefined.
 * @throws {SyntaxError} If `value` is a string that is not valid JSON.
 */
function parseJsonColumn(value, fallback) {
  if (value === null || value === undefined) {
    return fallback;
  }
  return typeof value === 'string' ? JSON.parse(value) : value;
}

/**
 * Check a worker-supplied API key against WORKER_API_KEY.
 * Rejects when the server has no key configured, so that a missing header can
 * never match a missing configuration value.
 * @param {*} providedKey - Value of the `x-api-key` request header.
 * @returns {boolean} True only when a key is configured and matches exactly.
 */
function isValidWorkerApiKey(providedKey) {
  const expectedKey = process.env.WORKER_API_KEY;
  if (!expectedKey || typeof providedKey !== 'string') {
    return false;
  }
  const expected = Buffer.from(expectedKey);
  const provided = Buffer.from(providedKey);
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === provided.length && crypto.timingSafeEqual(expected, provided);
}

// Database pool
const pool = mysql.createPool({
  host: process.env.DB_HOST,
  port: process.env.DB_PORT || 3306,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0
});

/**
 * Create the application tables if they do not exist yet.
 * @returns {Promise<void>}
 * @throws Re-throws any database error after logging it.
 */
async function initDatabase() {
  const connection = await pool.getConnection();
  try {
    await connection.query(`
      CREATE TABLE IF NOT EXISTS users (
        id VARCHAR(36) PRIMARY KEY,
        username VARCHAR(255) UNIQUE NOT NULL,
        display_name VARCHAR(255),
        email VARCHAR(255),
        groups TEXT,
        last_login TIMESTAMP,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      )
    `);

    // Database schema is managed manually - migrations removed after direct database fixes

    await connection.query(`
      CREATE TABLE IF NOT EXISTS workers (
        id VARCHAR(36) PRIMARY KEY,
        name VARCHAR(255) UNIQUE NOT NULL,
        status VARCHAR(50) NOT NULL,
        last_heartbeat TIMESTAMP NULL,
        api_key VARCHAR(255),
        metadata JSON,
        INDEX idx_status (status),
        INDEX idx_heartbeat (last_heartbeat)
      )
    `);

    await connection.query(`
      CREATE TABLE IF NOT EXISTS workflows (
        id VARCHAR(36) PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        description TEXT,
        definition JSON NOT NULL,
        created_by VARCHAR(255),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        INDEX idx_name (name)
      )
    `);

    await connection.query(`
      CREATE TABLE IF NOT EXISTS executions (
        id VARCHAR(36) PRIMARY KEY,
        workflow_id VARCHAR(36) NULL,
        status VARCHAR(50) NOT NULL,
        started_by VARCHAR(255),
        started_at TIMESTAMP NULL,
        completed_at TIMESTAMP NULL,
        logs JSON,
        INDEX idx_workflow (workflow_id),
        INDEX idx_status (status),
        INDEX idx_started (started_at)
      )
    `);

    await connection.query(`
      CREATE TABLE IF NOT EXISTS scheduled_commands (
        id VARCHAR(36) PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        command TEXT NOT NULL,
        worker_ids JSON NOT NULL,
        schedule_type VARCHAR(50) NOT NULL,
        schedule_value VARCHAR(255) NOT NULL,
        enabled BOOLEAN DEFAULT TRUE,
        created_by VARCHAR(255),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        last_run TIMESTAMP NULL,
        next_run TIMESTAMP NULL,
        INDEX idx_enabled (enabled),
        INDEX idx_next_run (next_run)
      )
    `);

    console.log('Database tables initialized successfully');
  } catch (error) {
    console.error('Database initialization error:', error);
    throw error;
  } finally {
    connection.release();
  }
}

/**
 * Delete finished executions ('completed' or 'failed') that started more than
 * EXECUTION_RETENTION_DAYS days ago (default 30). Errors are logged, not thrown.
 * @returns {Promise<void>}
 */
async function cleanupOldExecutions() {
  try {
    const retentionDays = Number.parseInt(process.env.EXECUTION_RETENTION_DAYS, 10) || 30;
    const [result] = await pool.query(
      `DELETE FROM executions
       WHERE status IN ('completed', 'failed')
       AND started_at < DATE_SUB(NOW(), INTERVAL ? DAY)`,
      [retentionDays]
    );
    console.log(`[Cleanup] Removed ${result.affectedRows} executions older than ${retentionDays} days`);
  } catch (error) {
    console.error('[Cleanup] Error removing old executions:', error);
  }
}

// Run cleanup every 24 hours, counted from process start (not at a fixed clock time)
setInterval(cleanupOldExecutions, 24 * 60 * 60 * 1000);
// Run cleanup on startup
cleanupOldExecutions();

/**
 * Run one due scheduled command: dispatch it to every listed worker that is
 * currently connected, then store last_run and the next run time.
 * A schedule whose type/value cannot produce a next run time is disabled
 * instead of being executed, because a NULL next_run would otherwise make it
 * fire on every scheduler tick.
 * @param {object} schedule - Row from the scheduled_commands table.
 * @returns {Promise<void>}
 */
async function runScheduledCommand(schedule) {
  const nextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value);
  if (nextRun === null) {
    console.error(
      `[Scheduler] Invalid schedule for "${schedule.name}" ` +
      `(${schedule.schedule_type}=${schedule.schedule_value}); disabling it`
    );
    await pool.query('UPDATE scheduled_commands SET enabled = FALSE WHERE id = ?', [schedule.id]);
    return;
  }

  console.log(`[Scheduler] Running scheduled command: ${schedule.name}`);

  const parsedWorkerIds = parseJsonColumn(schedule.worker_ids, []);
  const workerIds = Array.isArray(parsedWorkerIds) ? parsedWorkerIds : [];

  // Execute command on each worker
  for (const workerId of workerIds) {
    const workerWs = workers.get(workerId);
    if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
      console.log(`[Scheduler] Worker ${workerId} not connected, skipping "${schedule.name}"`);
      continue;
    }

    const executionId = generateUUID();

    // Create execution record
    await pool.query(
      'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
      [executionId, null, 'running', `scheduler:${schedule.name}`, JSON.stringify([{
        step: 'scheduled_command',
        action: 'command_sent',
        worker_id: workerId,
        command: schedule.command,
        timestamp: new Date().toISOString()
      }])]
    );

    // Send command to worker
    workerWs.send(JSON.stringify({
      type: 'execute_command',
      execution_id: executionId,
      command: schedule.command,
      worker_id: workerId,
      timeout: 300000 // 5 minute timeout for scheduled commands
    }));

    broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
  }

  // Update last_run and store the next run time
  await pool.query(
    'UPDATE scheduled_commands SET last_run = NOW(), next_run = ? WHERE id = ?',
    [nextRun, schedule.id]
  );
}

/**
 * Find all enabled schedules that are due (next_run is NULL or in the past)
 * and run them. Each schedule is handled independently so that one failing
 * schedule does not prevent the others from running. Errors are logged.
 * @returns {Promise<void>}
 */
async function processScheduledCommands() {
  try {
    const [schedules] = await pool.query(
      `SELECT * FROM scheduled_commands
       WHERE enabled = TRUE
       AND (next_run IS NULL OR next_run <= NOW())`
    );

    for (const schedule of schedules) {
      try {
        await runScheduledCommand(schedule);
      } catch (error) {
        console.error(`[Scheduler] Error running scheduled command ${schedule.name}:`, error);
      }
    }
  } catch (error) {
    console.error('[Scheduler] Error processing scheduled commands:', error);
  }
}

/**
 * Compute the next run time for a schedule, relative to the current time.
 * Supported types:
 *  - 'interval': value is a positive number of minutes
 *  - 'hourly':   value is a positive number of hours
 *  - 'daily':    value is 'HH:MM' (server local time)
 * @param {string} scheduleType - One of 'interval', 'hourly', 'daily'.
 * @param {string|number} scheduleValue - Value interpreted per the type.
 * @returns {Date|null} The next run time, or null when the type is unknown
 *   or the value is not valid for that type.
 */
function calculateNextRun(scheduleType, scheduleValue) {
  const now = new Date();

  if (scheduleType === 'interval' || scheduleType === 'hourly') {
    const amount = Number.parseInt(scheduleValue, 10);
    // A non-positive or non-numeric period would yield an invalid or
    // immediately-due date and make the schedule fire on every tick.
    if (!Number.isFinite(amount) || amount <= 0) {
      return null;
    }
    const unitMs = scheduleType === 'interval' ? 60000 : 3600000;
    return new Date(now.getTime() + amount * unitMs);
  }

  if (scheduleType === 'daily') {
    // Daily at HH:MM
    const [hours, minutes] = String(scheduleValue).split(':').map(Number);
    const isValidTime =
      Number.isInteger(hours) && hours >= 0 && hours <= 23 &&
      Number.isInteger(minutes) && minutes >= 0 && minutes <= 59;
    if (!isValidTime) {
      return null;
    }

    const next = new Date(now);
    next.setHours(hours, minutes, 0, 0);

    // If time has passed today, schedule for tomorrow
    if (next <= now) {
      next.setDate(next.getDate() + 1);
    }
    return next;
  }

  return null;
}

// Run scheduler every minute
setInterval(processScheduledCommands, 60 * 1000);
// Initial run on startup
setTimeout(processScheduledCommands, 5000);

// WebSocket connections
const clients = new Set();
const workers = new Map(); // Map worker_id -> WebSocket connection

// NOTE(review): nothing in this file authenticates WebSocket peers; any
// connected socket can send worker_connect / command_result messages.
// Confirm that access to the WebSocket endpoint is restricted upstream.
wss.on('connection', (ws) => {
  clients.add(ws);

  // Handle incoming messages from workers
  ws.on('message', async (data) => {
    try {
      const message = JSON.parse(data.toString());
      console.log('WebSocket message received:', message.type);

      if (message.type === 'command_result') {
        // Handle command result from worker
        const { execution_id, command_id, worker_id, success, stdout, stderr, duration, timestamp } = message;

        // Add result to execution logs
        await addExecutionLog(execution_id, {
          step: 'command_execution',
          action: 'command_result',
          command_id: command_id, // Include command_id for workflow tracking
          worker_id: worker_id,
          success: success,
          stdout: stdout,
          stderr: stderr,
          duration: duration,
          timestamp: timestamp || new Date().toISOString()
        });

        // For non-workflow executions, update status immediately
        // For workflow executions, the workflow engine will update status
        const [execution] = await pool.query('SELECT workflow_id FROM executions WHERE id = ?', [execution_id]);
        if (execution.length > 0 && !execution[0].workflow_id) {
          // Only update status for quick commands (no workflow_id)
          const finalStatus = success ? 'completed' : 'failed';
          await updateExecutionStatus(execution_id, finalStatus);
        }

        // Broadcast result to all connected clients
        broadcast({
          type: 'command_result',
          execution_id: execution_id,
          worker_id: worker_id,
          success: success,
          stdout: stdout,
          stderr: stderr
        });

        console.log(`Command result received for execution ${execution_id}: ${success ? 'success' : 'failed'}`);
      }

      if (message.type === 'workflow_result') {
        // Handle workflow result from worker
        const { execution_id, worker_id, success, message: resultMessage, timestamp } = message;

        // Add final result to logs
        await addExecutionLog(execution_id, {
          step: 'workflow_completion',
          action: 'workflow_result',
          worker_id: worker_id,
          success: success,
          message: resultMessage,
          timestamp: timestamp || new Date().toISOString()
        });

        // Update execution status
        const finalStatus = success ? 'completed' : 'failed';
        await updateExecutionStatus(execution_id, finalStatus);

        // Broadcast completion to all clients
        broadcast({
          type: 'workflow_result',
          execution_id: execution_id,
          status: finalStatus,
          success: success,
          message: resultMessage
        });

        console.log(`Workflow result received for execution ${execution_id}: ${finalStatus}`);
      }

      if (message.type === 'worker_connect') {
        // Handle worker connection
        const { worker_id, worker_name } = message;
        console.log(`Worker connected: ${worker_name} (${worker_id})`);

        // Find the database worker ID by name
        const [dbWorkers] = await pool.query(
          'SELECT id FROM workers WHERE name = ?',
          [worker_name]
        );

        if (dbWorkers.length > 0) {
          const dbWorkerId = dbWorkers[0].id;

          // Store worker WebSocket connection using BOTH IDs
          workers.set(worker_id, ws); // Runtime ID
          workers.set(dbWorkerId, ws); // Database ID

          // Store mapping for cleanup
          ws.workerId = worker_id;
          ws.dbWorkerId = dbWorkerId;

          console.log(`Mapped worker: runtime_id=${worker_id}, db_id=${dbWorkerId}, name=${worker_name}`);

          // Update worker status to online
          await pool.query(
            `UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`,
            [dbWorkerId]
          );

          // Broadcast worker status update with database ID
          broadcast({
            type: 'worker_update',
            worker_id: dbWorkerId,
            status: 'online'
          });
        } else {
          console.log(`Worker ${worker_name} not found in database, will be created on heartbeat`);
        }
      }

      if (message.type === 'pong') {
        // Handle worker pong response
        const { worker_id } = message;
        // Rows in `workers` are keyed by the database ID, which can differ from
        // the runtime ID the worker reports; prefer the ID resolved at connect.
        const heartbeatWorkerId = ws.dbWorkerId || worker_id;
        if (heartbeatWorkerId) {
          await pool.query(
            `UPDATE workers SET last_heartbeat=NOW() WHERE id=?`,
            [heartbeatWorkerId]
          );
        }
      }
    } catch (error) {
      console.error('WebSocket message error:', error);
    }
  });

  ws.on('close', () => {
    clients.delete(ws);

    // Remove worker from workers map when disconnected (both runtime and db IDs).
    // Only drop an entry that still points at this socket: after a reconnect the
    // same IDs map to the new socket, and a late close of the old one must not
    // unregister it.
    if (ws.workerId) {
      if (workers.get(ws.workerId) === ws) {
        workers.delete(ws.workerId);
      }
      console.log(`Worker ${ws.workerId} (runtime ID) disconnected`);
    }
    if (ws.dbWorkerId) {
      if (workers.get(ws.dbWorkerId) === ws) {
        workers.delete(ws.dbWorkerId);
      }
      console.log(`Worker ${ws.dbWorkerId} (database ID) disconnected`);
    }
  });
});

/**
 * Send a JSON message to every open WebSocket connection.
 * @param {object} data - Payload; serialized once with JSON.stringify.
 */
function broadcast(data) {
  const payload = JSON.stringify(data);
  clients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN) {
      client.send(payload);
    }
  });
}

/**
 * Append a log entry to an execution's `logs` JSON array.
 * Does nothing when the execution does not exist. Errors are logged, not thrown.
 * @param {string} executionId - ID of the execution row.
 * @param {object} logEntry - Entry to append.
 * @returns {Promise<void>}
 */
async function addExecutionLog(executionId, logEntry) {
  try {
    const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);

    if (execution.length > 0) {
      // A NULL logs column is treated as an empty log
      const logs = parseJsonColumn(execution[0].logs, []);
      logs.push(logEntry);

      await pool.query(
        'UPDATE executions SET logs = ? WHERE id = ?',
        [JSON.stringify(logs), executionId]
      );
    }
  } catch (error) {
    console.error(`[Workflow] Error adding execution log:`, error);
  }
}

/**
 * Set an execution's status, stamp completed_at, and broadcast the change.
 * Errors are logged, not thrown.
 * @param {string} executionId - ID of the execution row.
 * @param {string} status - New status value.
 * @returns {Promise<void>}
 */
async function updateExecutionStatus(executionId, status) {
  try {
    await pool.query(
      'UPDATE executions SET status = ?, completed_at = NOW() WHERE id = ?',
      [status, executionId]
    );

    broadcast({
      type: 'execution_status',
      execution_id: executionId,
      status: status
    });

    console.log(`[Workflow] Execution ${executionId} status updated to: ${status}`);
  } catch (error) {
    console.error(`[Workflow] Error updating execution status:`, error);
  }
}

/**
 * Authelia SSO middleware.
 * Reads the Remote-User / Remote-Name / Remote-Email / Remote-Groups request
 * headers, requires membership in the 'admin' or 'employee' group, upserts the
 * user row, and attaches `req.user`.
 * NOTE(review): the headers are trusted as-is; this is only safe if the service
 * is reachable exclusively through the authenticating proxy - confirm.
 * @param {object} req - Express request.
 * @param {object} res - Express response.
 * @param {Function} next - Express next callback.
 */
async function authenticateSSO(req, res, next) {
  // Check for Authelia headers
  const remoteUser = req.headers['remote-user'];
  const remoteName = req.headers['remote-name'];
  const remoteEmail = req.headers['remote-email'];
  const remoteGroups = req.headers['remote-groups'];

  if (!remoteUser) {
    return res.status(401).json({
      error: 'Not authenticated',
      message: 'Please access this service through auth.lotusguild.org'
    });
  }

  // Check if user is in allowed groups (admin or employee)
  const groups = remoteGroups ? remoteGroups.split(',').map(g => g.trim()) : [];
  const allowedGroups = ['admin', 'employee'];
  const hasAccess = groups.some(g => allowedGroups.includes(g));

  if (!hasAccess) {
    return res.status(403).json({
      error: 'Access denied',
      message: 'You must be in admin or employee group'
    });
  }

  // Store/update user in database (best effort: a failure here does not block the request)
  try {
    const userId = generateUUID();
    await pool.query(
      `INSERT INTO users (id, username, display_name, email, groups, last_login)
       VALUES (?, ?, ?, ?, ?, NOW())
       ON DUPLICATE KEY UPDATE
       display_name=VALUES(display_name),
       email=VALUES(email),
       groups=VALUES(groups),
       last_login=NOW()`,
      [userId, remoteUser, remoteName, remoteEmail, remoteGroups]
    );
  } catch (error) {
    console.error('Error updating user:', error);
  }

  // Attach user info to request
  req.user = {
    username: remoteUser,
    name: remoteName || remoteUser,
    email: remoteEmail || '',
    groups: groups,
    isAdmin: groups.includes('admin')
  };

  next();
}

/**
 * Workflow execution engine: run the steps of a workflow definition in order.
 * Step types handled: 'execute' (run a command on workers; a failure stops the
 * workflow), 'wait' (delay of `duration` seconds, default 5) and 'prompt'
 * (logged as skipped). The final status is written via updateExecutionStatus.
 * @param {string} executionId - ID of the execution row.
 * @param {string} workflowId - ID of the workflow (used for logging).
 * @param {object} definition - Workflow definition; `steps` is read from it.
 * @param {string} username - User who started the execution (currently unused).
 * @returns {Promise<void>}
 */
async function executeWorkflowSteps(executionId, workflowId, definition, username) {
  try {
    console.log(`[Workflow] Starting execution ${executionId} for workflow ${workflowId}`);

    const steps = definition.steps || [];
    let allStepsSucceeded = true;

    for (let i = 0; i < steps.length; i++) {
      const step = steps[i];
      console.log(`[Workflow] Execution ${executionId} - Step ${i + 1}/${steps.length}: ${step.name}`);

      // Add step start log
      await addExecutionLog(executionId, {
        step: i + 1,
        step_name: step.name,
        action: 'step_started',
        timestamp: new Date().toISOString()
      });

      broadcast({
        type: 'workflow_step_started',
        execution_id: executionId,
        step: i + 1,
        step_name: step.name
      });

      if (step.type === 'execute') {
        // Execute command step
        const success = await executeCommandStep(executionId, step, i + 1);
        if (!success) {
          allStepsSucceeded = false;
          break; // Stop workflow on failure
        }
      } else if (step.type === 'wait') {
        // Wait step (delay in seconds)
        const seconds = step.duration || 5;
        await addExecutionLog(executionId, {
          step: i + 1,
          action: 'waiting',
          duration: seconds,
          timestamp: new Date().toISOString()
        });
        await new Promise(resolve => setTimeout(resolve, seconds * 1000));
      } else if (step.type === 'prompt') {
        // Interactive prompt step (not fully implemented, would need user interaction)
        await addExecutionLog(executionId, {
          step: i + 1,
          action: 'prompt_skipped',
          message: 'Interactive prompts not yet supported',
          timestamp: new Date().toISOString()
        });
      }

      // Add step completion log
      await addExecutionLog(executionId, {
        step: i + 1,
        step_name: step.name,
        action: 'step_completed',
        timestamp: new Date().toISOString()
      });

      broadcast({
        type: 'workflow_step_completed',
        execution_id: executionId,
        step: i + 1,
        step_name: step.name
      });
    }

    // Mark execution as completed or failed
    const finalStatus = allStepsSucceeded ? 'completed' : 'failed';
    await updateExecutionStatus(executionId, finalStatus);

    console.log(`[Workflow] Execution ${executionId} finished with status: ${finalStatus}`);
  } catch (error) {
    console.error(`[Workflow] Execution ${executionId} error:`, error);
    await addExecutionLog(executionId, {
      action: 'workflow_error',
      error: error.message,
      timestamp: new Date().toISOString()
    });
    await updateExecutionStatus(executionId, 'failed');
  }
}

/**
 * Run one 'execute' step: resolve the target workers, send the command to each
 * connected one in turn, and wait for each result before continuing.
 * @param {string} executionId - ID of the execution row.
 * @param {object} step - Step definition; reads `command` and `targets`
 *   (defaults to ['all'], meaning every worker with status 'online').
 * @param {number} stepNumber - 1-based step index used in log entries.
 * @returns {Promise<boolean>} True only if the command was dispatched to at
 *   least one worker and every dispatched command succeeded.
 */
async function executeCommandStep(executionId, step, stepNumber) {
  try {
    const command = step.command;
    const targets = step.targets || ['all'];

    // Determine which workers to target
    let targetWorkerIds = [];

    if (targets.includes('all')) {
      // Get all online workers
      const [onlineWorkers] = await pool.query('SELECT id FROM workers WHERE status = ?', ['online']);
      targetWorkerIds = onlineWorkers.map(w => w.id);
    } else {
      // Specific worker IDs or names
      for (const target of targets) {
        // Try to find by ID first, then by name
        const [workerById] = await pool.query('SELECT id FROM workers WHERE id = ?', [target]);
        if (workerById.length > 0) {
          targetWorkerIds.push(workerById[0].id);
        } else {
          const [workerByName] = await pool.query('SELECT id FROM workers WHERE name = ?', [target]);
          if (workerByName.length > 0) {
            targetWorkerIds.push(workerByName[0].id);
          }
        }
      }
    }

    if (targetWorkerIds.length === 0) {
      await addExecutionLog(executionId, {
        step: stepNumber,
        action: 'no_workers',
        message: 'No workers available for this step',
        timestamp: new Date().toISOString()
      });
      return false;
    }

    // Execute command on each target worker and wait for results
    const results = [];

    for (const workerId of targetWorkerIds) {
      const workerWs = workers.get(workerId);

      if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
        await addExecutionLog(executionId, {
          step: stepNumber,
          action: 'worker_offline',
          worker_id: workerId,
          timestamp: new Date().toISOString()
        });
        continue;
      }

      // Send command to worker
      const commandId = generateUUID();

      await addExecutionLog(executionId, {
        step: stepNumber,
        action: 'command_sent',
        worker_id: workerId,
        command: command,
        command_id: commandId,
        timestamp: new Date().toISOString()
      });

      workerWs.send(JSON.stringify({
        type: 'execute_command',
        execution_id: executionId,
        command_id: commandId,
        command: command,
        worker_id: workerId,
        timeout: 120000 // 2 minute timeout
      }));

      // Wait for command result (with timeout)
      const result = await waitForCommandResult(executionId, commandId, 120000);
      results.push(result);

      if (!result.success) {
        // Command failed, workflow should stop
        return false;
      }
    }

    // If every target was unreachable nothing actually ran; an empty result
    // list must not count as success.
    return results.length > 0 && results.every(r => r.success);
  } catch (error) {
    console.error(`[Workflow] Error executing command step:`, error);
    await addExecutionLog(executionId, {
      step: stepNumber,
      action: 'step_error',
      error: error.message,
      timestamp: new Date().toISOString()
    });
    return false;
  }
}

/**
 * Poll the execution's logs every 500ms until a 'command_result' entry with
 * the given command_id appears or the timeout elapses. Polls run strictly one
 * after another, so a slow query never overlaps with the next poll.
 * @param {string} executionId - ID of the execution row.
 * @param {string} commandId - ID of the command whose result is awaited.
 * @param {number} timeout - Maximum wait in milliseconds.
 * @returns {Promise<object>} `{ success, stdout, stderr }` from the result log,
 *   or `{ success: false, error }` on timeout or query failure. Never rejects.
 */
async function waitForCommandResult(executionId, commandId, timeout) {
  const pollIntervalMs = 500;
  const startTime = Date.now();

  try {
    for (;;) {
      await new Promise(resolve => setTimeout(resolve, pollIntervalMs));

      // Check if we've received the command result in logs
      const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);

      if (execution.length > 0) {
        const logs = parseJsonColumn(execution[0].logs, []);
        const resultLog = logs.find(log => log.command_id === commandId && log.action === 'command_result');

        if (resultLog) {
          return {
            success: resultLog.success,
            stdout: resultLog.stdout,
            stderr: resultLog.stderr
          };
        }
      }

      // Check timeout
      if (Date.now() - startTime > timeout) {
        return { success: false, error: 'Command timeout' };
      }
    }
  } catch (error) {
    console.error('[Workflow] Error checking command result:', error);
    return { success: false, error: error.message };
  }
}

// Routes - All protected by SSO
app.get('/api/user', authenticateSSO, (req, res) => {
  res.json(req.user);
});

app.get('/api/workflows', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM workflows ORDER BY created_at DESC');
    res.json(rows);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.post('/api/workflows', authenticateSSO, async (req, res) => {
  try {
    const { name, description, definition } = req.body;

    // Both columns are NOT NULL; reject bad input up front instead of
    // surfacing a database error as a 500.
    if (!name || definition === null || typeof definition !== 'object') {
      return res.status(400).json({ error: 'Missing required fields' });
    }

    const id = generateUUID();

    console.log('[Workflow] Creating workflow:', name);
    console.log('[Workflow] Definition:', JSON.stringify(definition, null, 2));

    await pool.query(
      'INSERT INTO workflows (id, name, description, definition, created_by) VALUES (?, ?, ?, ?, ?)',
      [id, name, description, JSON.stringify(definition), req.user.username]
    );

    console.log('[Workflow] Successfully inserted workflow:', id);

    res.json({ id, name, description, definition });

    console.log('[Workflow] Broadcasting workflow_created');
    broadcast({ type: 'workflow_created', workflow_id: id });
    console.log('[Workflow] Broadcast complete');
  } catch (error) {
    console.error('[Workflow] Error creating workflow:', error);
    res.status(500).json({ error: error.message });
  }
});

app.delete('/api/workflows/:id', authenticateSSO, async (req, res) => {
  try {
    // Only admins can delete workflows
    if (!req.user.isAdmin) {
      return res.status(403).json({ error: 'Admin access required' });
    }

    await pool.query('DELETE FROM workflows WHERE id = ?', [req.params.id]);
    res.json({ success: true });
    broadcast({ type: 'workflow_deleted', workflow_id: req.params.id });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.get('/api/workers', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM workers ORDER BY name');
    res.json(rows);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Worker heartbeat: authenticated by API key instead of SSO
app.post('/api/workers/heartbeat', async (req, res) => {
  try {
    const { worker_id, name, metadata } = req.body;
    const apiKey = req.headers['x-api-key'];

    // Verify API key (fails closed when WORKER_API_KEY is not configured)
    if (!isValidWorkerApiKey(apiKey)) {
      return res.status(401).json({ error: 'Invalid API key' });
    }

    if (!worker_id || !name) {
      return res.status(400).json({ error: 'Missing required fields' });
    }

    await pool.query(
      `INSERT INTO workers (id, name, status, last_heartbeat, api_key, metadata)
       VALUES (?, ?, 'online', NOW(), ?, ?)
       ON DUPLICATE KEY UPDATE
       status='online',
       last_heartbeat=NOW(),
       metadata=VALUES(metadata)`,
      [worker_id, name, apiKey, metadata === undefined ? null : JSON.stringify(metadata)]
    );

    broadcast({ type: 'worker_update', worker_id, status: 'online' });
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.post('/api/executions', authenticateSSO, async (req, res) => {
  try {
    const { workflow_id } = req.body;
    const id = generateUUID();

    // Get workflow definition
    const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflow_id]);
    if (workflows.length === 0) {
      return res.status(404).json({ error: 'Workflow not found' });
    }

    const workflow = workflows[0];
    const definition = parseJsonColumn(workflow.definition, {});

    // Create execution record
    await pool.query(
      'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
      [id, workflow_id, 'running', req.user.username, JSON.stringify([])]
    );

    broadcast({ type: 'execution_started', execution_id: id, workflow_id });

    // Start workflow execution asynchronously
    executeWorkflowSteps(id, workflow_id, definition, req.user.username).catch(err => {
      console.error(`[Workflow] Execution ${id} failed:`, err);
    });

    res.json({ id, workflow_id, status: 'running' });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.get('/api/executions', authenticateSSO, async (req, res) => {
  try {
    // Negative values would make the LIMIT/OFFSET query fail, so clamp them
    const limit = Math.max(Number.parseInt(req.query.limit, 10) || 50, 1);
    const offset = Math.max(Number.parseInt(req.query.offset, 10) || 0, 0);

    const [rows] = await pool.query(
      'SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ORDER BY e.started_at DESC LIMIT ? OFFSET ?',
      [limit, offset]
    );

    // Get total count
    const [countRows] = await pool.query('SELECT COUNT(*) as total FROM executions');
    const total = countRows[0].total;

    res.json({
      executions: rows,
      total: total,
      limit: limit,
      offset: offset,
      hasMore: offset + rows.length < total
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.delete('/api/executions/:id', authenticateSSO, async (req, res) => {
  try {
    await pool.query('DELETE FROM executions WHERE id = ?', [req.params.id]);
    broadcast({ type: 'execution_deleted', execution_id: req.params.id });
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Scheduled Commands API
app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => {
  try {
    const [schedules] = await pool.query(
      'SELECT * FROM scheduled_commands ORDER BY created_at DESC'
    );
    res.json(schedules);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.post('/api/scheduled-commands', authenticateSSO, async (req, res) => {
  try {
    const { name, command, worker_ids, schedule_type, schedule_value } = req.body;

    if (!name || !command || !worker_ids || !schedule_type || !schedule_value) {
      return res.status(400).json({ error: 'Missing required fields' });
    }

    const id = generateUUID();
    const nextRun = calculateNextRun(schedule_type, schedule_value);

    // A schedule without a computable next run would fire on every scheduler tick
    if (nextRun === null) {
      return res.status(400).json({ error: 'Invalid schedule_type or schedule_value' });
    }

    await pool.query(
      `INSERT INTO scheduled_commands
       (id, name, command, worker_ids, schedule_type, schedule_value, created_by, next_run)
       VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
      [id, name, command, JSON.stringify(worker_ids), schedule_type, schedule_value, req.user.username, nextRun]
    );

    res.json({ success: true, id });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.put('/api/scheduled-commands/:id/toggle', authenticateSSO, async (req, res) => {
  try {
    const { id } = req.params;
    const [current] = await pool.query('SELECT enabled FROM scheduled_commands WHERE id = ?', [id]);

    if (current.length === 0) {
      return res.status(404).json({ error: 'Schedule not found' });
    }

    const newEnabled = !current[0].enabled;
    await pool.query('UPDATE scheduled_commands SET enabled = ? WHERE id = ?', [newEnabled, id]);

    res.json({ success: true, enabled: newEnabled });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.delete('/api/scheduled-commands/:id', authenticateSSO, async (req, res) => {
  try {
    const { id } = req.params;
    await pool.query('DELETE FROM scheduled_commands WHERE id = ?', [id]);
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Health check (no auth required)
app.get('/health', async (req, res) => {
  try {
    await pool.query('SELECT 1');
    res.json({
      status: 'ok',
      timestamp: new Date().toISOString(),
      database: 'connected',
      auth: 'authelia-sso'
    });
  } catch (error) {
    res.status(500).json({
      status: 'error',
      timestamp: new Date().toISOString(),
      database: 'disconnected',
      error: error.message
    });
  }
});

// Start server
const PORT = process.env.PORT || 8080;
const HOST = process.env.HOST || '0.0.0.0';

initDatabase().then(() => {
  server.listen(PORT, HOST, () => {
    console.log(`PULSE Server running on http://${HOST}:${PORT}`);
    console.log(`Connected to MariaDB at ${process.env.DB_HOST}`);
    console.log(`Authentication: Authelia SSO`);
    console.log(`Worker API Key configured: ${process.env.WORKER_API_KEY ? 'Yes' : 'No'}`);
  });
}).catch(err => {
  console.error('Failed to start server:', err);
  process.exit(1);
});

// ============================================
// ADDITIONAL EXECUTION AND WORKER ROUTES
// (registered synchronously at module load, before the server starts listening)
// ============================================

// Get execution details with logs
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Not found' });
    }

    const execution = rows[0];

    res.json({
      ...execution,
      // logs may arrive as JSON text or already parsed; NULL becomes []
      logs: parseJsonColumn(execution.logs, [])
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Delete worker (admin only)
app.delete('/api/workers/:id', authenticateSSO, async (req, res) => {
  try {
    if (!req.user.isAdmin) {
      return res.status(403).json({ error: 'Admin access required' });
    }

    await pool.query('DELETE FROM workers WHERE id = ?', [req.params.id]);
    res.json({ success: true });
    broadcast({ type: 'worker_deleted', worker_id: req.params.id });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Send direct command to specific worker (for testing)
app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
  try {
    const { command } = req.body;
    const workerId = req.params.id;

    if (typeof command !== 'string' || command.trim() === '') {
      return res.status(400).json({ error: 'Missing required fields' });
    }

    // Check connectivity BEFORE creating the execution record; otherwise a
    // request for a disconnected worker leaves a row stuck in 'running'.
    const workerWs = workers.get(workerId);
    if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
      return res.status(400).json({ error: 'Worker not connected' });
    }

    const executionId = generateUUID();

    // Create execution record in database
    await pool.query(
      'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
      [executionId, null, 'running', req.user.username, JSON.stringify([{
        step: 'quick_command',
        action: 'command_sent',
        worker_id: workerId,
        command: command,
        timestamp: new Date().toISOString()
      }])]
    );

    // Send command via WebSocket to specific worker
    const commandMessage = {
      type: 'execute_command',
      execution_id: executionId,
      command: command,
      worker_id: workerId,
      timeout: 60000
    };

    workerWs.send(JSON.stringify(commandMessage));
    console.log(`Command sent to worker ${workerId}: ${command}`);

    broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
    res.json({ success: true, execution_id: executionId });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// ============================================
// EXAMPLE WORKFLOW DEFINITIONS
// (reference data only; not used by the code above)
// ============================================

// Example 1: Simple command execution
const simpleWorkflow = {
  name: "Update System Packages",
  description: "Update all packages on target servers",
  steps: [
    {
      name: "Update package list",
      type: "execute",
      targets: ["all"],
      command: "apt update"
    },
    {
      name: "User Approval",
      type: "prompt",
      message: "Packages updated. Proceed with upgrade?",
      options: ["Yes", "No"]
    },
    {
      name: "Upgrade packages",
      type: "execute",
      targets: ["all"],
      command: "apt upgrade -y",
      condition: "promptResponse === 'Yes'"
    }
  ]
};

// Example 2: Complex workflow with conditions
const backupWorkflow = {
  name: "Backup and Verify",
  description: "Create backup and verify integrity",
  steps: [
    {
      name: "Create backup",
      type: "execute",
      targets: ["all"],
      command: "tar -czf /tmp/backup-$(date +%Y%m%d).tar.gz /opt/pulse-worker"
    },
    {
      name: "Wait for backup",
      type: "wait",
      duration: 5 // seconds: the engine multiplies wait durations by 1000
    },
    {
      name: "Verify backup",
      type: "execute",
      targets: ["all"],
      command: "tar -tzf /tmp/backup-*.tar.gz > /dev/null && echo 'Backup OK' || echo 'Backup FAILED'"
    },
    {
      name: "Cleanup decision",
      type: "prompt",
      message: "Backup complete. Delete old backups?",
      options: ["Yes", "No", "Cancel"]
    },
    {
      name: "Cleanup old backups",
      type: "execute",
      targets: ["all"],
      command: "find /tmp -name 'backup-*.tar.gz' -mtime +7 -delete",
      condition: "promptResponse === 'Yes'"
    }
  ]
};