// pulse/server.js (758 lines, 21 KiB, JavaScript)

const express = require('express');
const http = require('http');
const WebSocket = require('ws');
const mysql = require('mysql2/promise');
const crypto = require('crypto');
require('dotenv').config();
// Express app, the Node HTTP server wrapping it, and a WebSocket server attached to that same HTTP server
const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });
// Middleware
// Parse JSON request bodies and serve static files from the "public" directory
app.use(express.json());
app.use(express.static('public'));
// UUID generator
// Returns a fresh random UUID string from Node's crypto module; used for primary keys in this file.
function generateUUID() {
  const freshId = crypto.randomUUID();
  return freshId;
}
// Database pool
// Connection settings are read from the environment; DB_PORT falls back to 3306 when unset.
const poolSettings = {
  host: process.env.DB_HOST,
  port: process.env.DB_PORT || 3306,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0,
};
const pool = mysql.createPool(poolSettings);
// Initialize database tables
// Creates the users, workers, workflows and executions tables if they do not exist yet.
// All statements run on a single pooled connection, which is always released afterwards.
// Any failure is logged and rethrown so the caller can abort startup.
async function initDatabase() {
const connection = await pool.getConnection();
try {
// SSO users mirrored from the auth proxy headers (written by authenticateSSO)
await connection.query(`
CREATE TABLE IF NOT EXISTS users (
id VARCHAR(36) PRIMARY KEY,
username VARCHAR(255) UNIQUE NOT NULL,
display_name VARCHAR(255),
email VARCHAR(255),
groups TEXT,
last_login TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`);
// Workers, upserted by the heartbeat endpoint
await connection.query(`
CREATE TABLE IF NOT EXISTS workers (
id VARCHAR(36) PRIMARY KEY,
name VARCHAR(255) UNIQUE NOT NULL,
status VARCHAR(50) NOT NULL,
last_heartbeat TIMESTAMP NULL,
api_key VARCHAR(255),
metadata JSON,
INDEX idx_status (status),
INDEX idx_heartbeat (last_heartbeat)
)
`);
// Workflow definitions; must exist before executions because of the foreign key below
await connection.query(`
CREATE TABLE IF NOT EXISTS workflows (
id VARCHAR(36) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
definition JSON NOT NULL,
created_by VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_name (name)
)
`);
// Execution records; rows are removed automatically when their workflow is deleted (ON DELETE CASCADE)
await connection.query(`
CREATE TABLE IF NOT EXISTS executions (
id VARCHAR(36) PRIMARY KEY,
workflow_id VARCHAR(36) NOT NULL,
status VARCHAR(50) NOT NULL,
started_by VARCHAR(255),
started_at TIMESTAMP NULL,
completed_at TIMESTAMP NULL,
logs JSON,
FOREIGN KEY (workflow_id) REFERENCES workflows(id) ON DELETE CASCADE,
INDEX idx_workflow (workflow_id),
INDEX idx_status (status),
INDEX idx_started (started_at)
)
`);
console.log('Database tables initialized successfully');
} catch (error) {
console.error('Database initialization error:', error);
throw error;
} finally {
// Return the connection to the pool whether or not the DDL succeeded
connection.release();
}
}
// WebSocket connections
// Every connected socket is tracked here so broadcast() and the command senders can reach it.
const clients = new Set();
wss.on('connection', (ws) => {
  clients.add(ws);
  ws.on('close', () => clients.delete(ws));
  // Without an 'error' listener, an error event on the socket is thrown by the
  // EventEmitter and can take the whole server process down.
  ws.on('error', (error) => {
    console.error('WebSocket client error:', error);
    clients.delete(ws);
  });
});
// Broadcast to all connected clients
// `data` is serialized to JSON once and sent to every socket whose state is OPEN.
// A send failure on one socket is logged and does not stop delivery to the others.
function broadcast(data) {
  const payload = JSON.stringify(data);
  for (const client of clients) {
    if (client.readyState !== WebSocket.OPEN) {
      continue;
    }
    try {
      client.send(payload);
    } catch (error) {
      console.error('WebSocket send failed:', error);
    }
  }
}
// Authelia SSO Middleware
// Reads the Remote-User / Remote-Name / Remote-Email / Remote-Groups request headers.
//   - no Remote-User header            -> 401 JSON response
//   - not in "admin" or "employee"     -> 403 JSON response
//   - otherwise: upserts the user row (best effort; failures are only logged),
//     attaches `req.user` and calls `next()`.
async function authenticateSSO(req, res, next) {
  const {
    'remote-user': remoteUser,
    'remote-name': remoteName,
    'remote-email': remoteEmail,
    'remote-groups': remoteGroups,
  } = req.headers;

  if (!remoteUser) {
    return res.status(401).json({
      error: 'Not authenticated',
      message: 'Please access this service through auth.lotusguild.org',
    });
  }

  // The groups header is a comma-separated list; surrounding whitespace is trimmed per entry
  const groups = [];
  if (remoteGroups) {
    for (const rawGroup of remoteGroups.split(',')) {
      groups.push(rawGroup.trim());
    }
  }

  const allowedGroups = ['admin', 'employee'];
  const hasAccess = allowedGroups.some((allowed) => groups.includes(allowed));
  if (!hasAccess) {
    return res.status(403).json({
      error: 'Access denied',
      message: 'You must be in admin or employee group',
    });
  }

  // Insert the user, or refresh profile fields and last_login when the username already exists
  const upsertSql = [
    'INSERT INTO users (id, username, display_name, email, groups, last_login)',
    'VALUES (?, ?, ?, ?, ?, NOW())',
    'ON DUPLICATE KEY UPDATE',
    'display_name=VALUES(display_name),',
    'email=VALUES(email),',
    'groups=VALUES(groups),',
    'last_login=NOW()',
  ].join('\n');
  try {
    const userId = generateUUID();
    const upsertParams = [userId, remoteUser, remoteName, remoteEmail, remoteGroups];
    await pool.query(upsertSql, upsertParams);
  } catch (error) {
    // A bookkeeping failure must not block the request
    console.error('Error updating user:', error);
  }

  const isAdmin = groups.includes('admin');
  req.user = {
    username: remoteUser,
    name: remoteName || remoteUser,
    email: remoteEmail || '',
    groups: groups,
    isAdmin: isAdmin,
  };
  next();
}
// Routes
// Current user: returns the identity object that authenticateSSO attached to the request
app.get('/api/user', authenticateSSO, (req, res) => {
  const { user } = req;
  res.json(user);
});
// List all workflows, newest first. Responds 500 with the error message on failure.
app.get('/api/workflows', authenticateSSO, async (req, res) => {
  try {
    const queryResult = await pool.query('SELECT * FROM workflows ORDER BY created_at DESC');
    const workflows = queryResult[0];
    res.json(workflows);
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});
// Create a workflow from { name, description, definition } in the JSON body.
// Responds 400 when the name is missing/blank or the definition is not a JSON object,
// 500 on database errors, otherwise echoes the created workflow and broadcasts
// a `workflow_created` event.
app.post('/api/workflows', authenticateSSO, async (req, res) => {
  try {
    const { name, description, definition } = req.body ?? {};
    if (typeof name !== 'string' || name.trim() === '') {
      return res.status(400).json({ error: 'Workflow name is required' });
    }
    // The definition column is JSON NOT NULL and the engine reads `definition.steps`,
    // so reject anything that is not an object before it reaches the database.
    if (definition === null || typeof definition !== 'object') {
      return res.status(400).json({ error: 'Workflow definition must be a JSON object' });
    }
    const id = generateUUID();
    await pool.query(
      'INSERT INTO workflows (id, name, description, definition, created_by) VALUES (?, ?, ?, ?, ?)',
      [id, name, description ?? null, JSON.stringify(definition), req.user.username]
    );
    res.json({ id, name, description, definition });
    broadcast({ type: 'workflow_created', workflow_id: id });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Delete a workflow by id. Restricted to admins (403 otherwise); broadcasts
// `workflow_deleted` after responding.
app.delete('/api/workflows/:id', authenticateSSO, async (req, res) => {
  try {
    // Only admins can delete workflows
    if (req.user.isAdmin) {
      const workflowId = req.params.id;
      await pool.query('DELETE FROM workflows WHERE id = ?', [workflowId]);
      res.json({ success: true });
      broadcast({ type: 'workflow_deleted', workflow_id: req.params.id });
    } else {
      res.status(403).json({ error: 'Admin access required' });
    }
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});
// List workers ordered by name.
// Columns are selected explicitly so the stored `api_key` (written by the heartbeat
// endpoint) is never returned to API clients.
app.get('/api/workers', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query(
      'SELECT id, name, status, last_heartbeat, metadata FROM workers ORDER BY name'
    );
    res.json(rows);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Worker heartbeat (no SSO; authenticated with the X-API-Key header).
// Upserts the worker as 'online', refreshes last_heartbeat/metadata and broadcasts
// a `worker_update` event.
// Responds 401 for a missing/incorrect key or when no key is configured on the
// server, 400 when worker_id or name is missing, 500 on database errors.
app.post('/api/workers/heartbeat', async (req, res) => {
  try {
    const expectedKey = process.env.WORKER_API_KEY;
    const apiKey = req.headers['x-api-key'];
    // Verify API key.
    // An unset WORKER_API_KEY must never match a request that sends no header,
    // and the comparison runs on fixed-length digests so it is constant-time.
    let keyMatches = false;
    if (expectedKey && typeof apiKey === 'string') {
      const providedDigest = crypto.createHash('sha256').update(apiKey).digest();
      const expectedDigest = crypto.createHash('sha256').update(expectedKey).digest();
      keyMatches = crypto.timingSafeEqual(providedDigest, expectedDigest);
    }
    if (!keyMatches) {
      return res.status(401).json({ error: 'Invalid API key' });
    }

    const { worker_id, name, metadata } = req.body ?? {};
    if (typeof worker_id !== 'string' || worker_id === '' || typeof name !== 'string' || name === '') {
      return res.status(400).json({ error: 'worker_id and name are required' });
    }

    await pool.query(
      `INSERT INTO workers (id, name, status, last_heartbeat, api_key, metadata)
       VALUES (?, ?, 'online', NOW(), ?, ?)
       ON DUPLICATE KEY UPDATE
         status='online',
         last_heartbeat=NOW(),
         metadata=VALUES(metadata)`,
      [worker_id, name, apiKey, JSON.stringify(metadata)]
    );
    broadcast({ type: 'worker_update', worker_id, status: 'online' });
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// NOTE: POST /api/executions is intentionally NOT registered here.
// The "Start workflow execution" route further down handles it and also launches
// the workflow through executeWorkflow(). Express runs handlers for the same
// method and path in registration order, so an earlier copy at this position
// answered every request first and no workflow was ever executed.
// List the 50 most recently started executions, each with its workflow's name
// (null when no workflow row matches the LEFT JOIN).
app.get('/api/executions', authenticateSSO, async (req, res) => {
  const recentExecutionsSql =
    'SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ORDER BY e.started_at DESC LIMIT 50';
  try {
    const queryResult = await pool.query(recentExecutionsSql);
    const executions = queryResult[0];
    res.json(executions);
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});
// NOTE: GET /api/executions/:id is intentionally NOT registered here.
// The "Get execution details with logs" route further down handles it and returns
// parsed logs plus the live prompt state. Express runs handlers for the same
// method and path in registration order, so an earlier copy at this position
// answered every request first and that richer response was unreachable.
// Health check (no auth required)
// Runs a trivial query against the pool: 200 with database 'connected' when it
// succeeds, 500 with database 'disconnected' and the error message otherwise.
app.get('/health', async (req, res) => {
  const isoNow = () => new Date().toISOString();
  try {
    await pool.query('SELECT 1');
    const healthy = {
      status: 'ok',
      timestamp: isoNow(),
      database: 'connected',
      auth: 'authelia-sso',
    };
    res.json(healthy);
  } catch (err) {
    const unhealthy = {
      status: 'error',
      timestamp: isoNow(),
      database: 'disconnected',
      error: err.message,
    };
    res.status(500).json(unhealthy);
  }
});
// Start server
// The schema is created first; the HTTP/WebSocket server only starts listening
// once that succeeds. Any startup failure is logged and exits with code 1.
const HOST = process.env.HOST || '0.0.0.0';
const PORT = process.env.PORT || 8080;
initDatabase()
  .then(() => {
    const onListening = () => {
      const bannerLines = [
        `PULSE Server running on http://${HOST}:${PORT}`,
        `Connected to MariaDB at ${process.env.DB_HOST}`,
        `Authentication: Authelia SSO`,
        `Worker API Key configured: ${process.env.WORKER_API_KEY ? 'Yes' : 'No'}`,
      ];
      for (const line of bannerLines) {
        console.log(line);
      }
    };
    server.listen(PORT, HOST, onListening);
  })
  .catch((err) => {
    console.error('Failed to start server:', err);
    process.exit(1);
  });
// (file-viewer residue: commit timestamp 2025-11-30 13:03:18 -05:00)
// ============================================
// WORKFLOW EXECUTION ENGINE
// ============================================
// Store active workflow executions in memory
// Keyed by execution id; values are the state objects built in executeWorkflow
// (current step index, step list, prompt state). Entries are removed when an
// execution completes or fails.
const activeExecutions = new Map();
// Execute workflow step by step
// Loads the workflow definition, registers the in-memory execution state and
// starts the first step. Never rejects: any failure is logged, the in-memory
// state is dropped and the execution is marked 'failed'.
//   workflowId    - id of the row in `workflows`
//   executionId   - id of the already-inserted row in `executions`
//   userId        - username that started the run (stored on the state)
//   targetWorkers - stored on the state; defaults to 'all'
async function executeWorkflow(workflowId, executionId, userId, targetWorkers = 'all') {
  try {
    // Get workflow definition
    const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflowId]);
    if (workflows.length === 0) {
      throw new Error('Workflow not found');
    }
    const [workflow] = workflows;
    // Depending on the server/driver, a JSON column arrives either as text or as an
    // already-parsed value; JSON.parse on an object would throw.
    const rawDefinition = workflow.definition;
    const definition = typeof rawDefinition === 'string' ? JSON.parse(rawDefinition) : rawDefinition;
    // Initialize execution state
    const executionState = {
      id: executionId,
      workflowId: workflowId,
      currentStep: 0,
      // Anything other than an array of steps is treated as "no steps"
      steps: Array.isArray(definition?.steps) ? definition.steps : [],
      results: [],
      status: 'running',
      waitingForInput: false,
      targetWorkers: targetWorkers,
      userId: userId
    };
    activeExecutions.set(executionId, executionState);
    // Start executing steps
    await executeNextStep(executionId);
  } catch (error) {
    console.error('Workflow execution error:', error);
    // Do not leave a dead execution registered in memory
    activeExecutions.delete(executionId);
    try {
      await updateExecutionStatus(executionId, 'failed', error.message);
    } catch (statusError) {
      // Callers do not await this function, so a rejection here would be unhandled
      console.error('Failed to record workflow failure:', statusError);
    }
  }
}
// Execute the next step in a workflow
// Skips steps whose condition is not met, marks the execution 'completed' when the
// step list is exhausted, and dispatches on step.type ('execute' | 'prompt' | 'wait').
// Never rejects: this function is also invoked from timer callbacks, where a
// rejection would be unhandled. Any failure is logged to the execution, the
// execution is marked 'failed' and its in-memory state is removed.
async function executeNextStep(executionId) {
  const state = activeExecutions.get(executionId);
  if (!state) return;
  try {
    let step;
    // Iterate (rather than recurse) past skipped steps so a long run of skips cannot grow the stack
    for (;;) {
      // Check if we've completed all steps
      if (state.currentStep >= state.steps.length) {
        await updateExecutionStatus(executionId, 'completed');
        activeExecutions.delete(executionId);
        return;
      }
      step = state.steps[state.currentStep];
      if (step === null || typeof step !== 'object') {
        throw new Error(`Invalid step definition at index ${state.currentStep}`);
      }
      // Check if step has a condition
      if (!step.condition || evaluateCondition(step.condition, state)) {
        break;
      }
      console.log(`[Workflow] Skipping step ${state.currentStep}: condition not met`);
      state.currentStep++;
    }
    console.log(`[Workflow] Executing step ${state.currentStep}: ${step.name}`);
    switch (step.type) {
      case 'execute':
        await executeCommandStep(executionId, step);
        break;
      case 'prompt':
        await executePromptStep(executionId, step);
        break;
      case 'wait':
        await executeWaitStep(executionId, step);
        break;
      default:
        throw new Error(`Unknown step type: ${step.type}`);
    }
  } catch (error) {
    const failedStep = state.currentStep;
    activeExecutions.delete(executionId);
    // Each report is attempted independently: a failed log write must not prevent
    // the status update, and neither may escape as a rejection.
    const reports = [
      () => addExecutionLog(executionId, {
        step: failedStep,
        error: error.message,
        timestamp: new Date().toISOString()
      }),
      () => updateExecutionStatus(executionId, 'failed', error.message)
    ];
    for (const report of reports) {
      try {
        await report();
      } catch (reportError) {
        console.error('Failed to record workflow failure:', reportError);
      }
    }
  }
}
// Execute a command on workers
// Resolves the step's targets against the workers currently marked online, sends an
// `execute_command` message, logs it, advances the step counter and schedules the
// next step after one second.
// Throws when no worker is online or when none of the requested targets is online.
//
// NOTE(review): sockets in `clients` are not associated with worker identities in
// this file, so the message still goes to every open socket. The resolved worker
// names travel in `target_workers` so receivers can ignore commands not meant for them.
async function executeCommandStep(executionId, step) {
  const state = activeExecutions.get(executionId);
  // Get target workers
  const [workers] = await pool.query(
    "SELECT * FROM workers WHERE status = 'online'"
  );
  if (workers.length === 0) {
    throw new Error('No online workers available');
  }
  // Filter workers based on target; a missing/empty list or an 'all' entry means every online worker
  const wantsAll =
    !Array.isArray(step.targets) || step.targets.length === 0 || step.targets.includes('all');
  const targetWorkers = wantsAll
    ? workers
    : workers.filter((worker) => step.targets.includes(worker.name));
  if (targetWorkers.length === 0) {
    throw new Error('No online workers match the step targets');
  }
  const workerNames = targetWorkers.map((worker) => worker.name);
  // Send command to workers via WebSocket
  const commandMessage = {
    type: 'execute_command',
    execution_id: executionId,
    step_index: state.currentStep,
    command: step.command,
    timeout: step.timeout || 300000,
    target_workers: workerNames
  };
  const payload = JSON.stringify(commandMessage);
  clients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN) {
      client.send(payload);
    }
  });
  await addExecutionLog(executionId, {
    step: state.currentStep,
    action: 'command_sent',
    command: step.command,
    workers: workerNames,
    timestamp: new Date().toISOString()
  });
  // For now, move to next step immediately
  // In production, we'd wait for worker responses
  state.currentStep++;
  // Small delay to allow command to execute; the promise is observed so a failure
  // inside the next step cannot surface as an unhandled rejection
  setTimeout(() => {
    executeNextStep(executionId).catch((error) => {
      console.error('Workflow step error:', error);
    });
  }, 1000);
}
// Execute a user prompt step
// Marks the execution as waiting for input, records the prompt on the state, logs
// it and broadcasts an `execution_prompt` event. The step counter is not advanced
// here; handleUserInput does that when a response arrives.
async function executePromptStep(executionId, step) {
  const state = activeExecutions.get(executionId);
  state.waitingForInput = true;
  const { message, options } = step;
  state.promptData = { message, options, step: state.currentStep };
  const logEntry = {
    step: state.currentStep,
    action: 'waiting_for_input',
    prompt: message,
    timestamp: new Date().toISOString(),
  };
  await addExecutionLog(executionId, logEntry);
  // Notify frontend that input is needed
  const notification = {
    type: 'execution_prompt',
    execution_id: executionId,
    prompt: state.promptData,
  };
  broadcast(notification);
}
// Execute a wait/delay step
// Logs the wait, advances the step counter and schedules the next step after
// `step.duration` milliseconds (1000 when the duration is missing or 0).
async function executeWaitStep(executionId, step) {
  const state = activeExecutions.get(executionId);
  const delay = step.duration || 1000;
  await addExecutionLog(executionId, {
    step: state.currentStep,
    action: 'waiting',
    duration: delay,
    timestamp: new Date().toISOString()
  });
  state.currentStep++;
  // The timer callback is not awaited by anyone, so observe the promise here;
  // otherwise a failure in the next step becomes an unhandled rejection.
  setTimeout(() => {
    executeNextStep(executionId).catch((error) => {
      console.error('Workflow step error:', error);
    });
  }, delay);
}
// Handle user input for prompts
// Records `response` on the execution state, advances past the prompt step and
// resumes the workflow in the background.
// Returns false when the execution is unknown or not waiting for input, true otherwise.
function handleUserInput(executionId, response) {
  const state = activeExecutions.get(executionId);
  if (!state || !state.waitingForInput) {
    return false;
  }
  const answeredStep = state.currentStep;
  state.promptResponse = response;
  state.waitingForInput = false;
  state.currentStep++;
  // addExecutionLog is a read-modify-write of the logs column, so the next step
  // (which logs as well) must not start until this write has settled; otherwise
  // one of the two entries can be lost. Both promises are observed so nothing
  // surfaces as an unhandled rejection.
  addExecutionLog(executionId, {
    step: answeredStep,
    action: 'user_response',
    response: response,
    timestamp: new Date().toISOString()
  })
    .catch((error) => {
      console.error('Failed to record user response:', error);
    })
    .then(() => executeNextStep(executionId))
    .catch((error) => {
      console.error('Failed to resume workflow after user input:', error);
    });
  return true;
}
// Evaluate conditions
// Evaluates a step's `condition` string as JavaScript and returns the value of that
// expression (the caller uses it for truthiness). Any exception is logged and
// treated as false.
//
// SECURITY(review): `condition` comes from the stored workflow definition, which is
// supplied through POST /api/workflows, and it is handed to eval() unmodified, so it
// can run arbitrary code inside this server process. Replace with a restricted
// expression evaluator before exposing workflow creation to untrusted users.
function evaluateCondition(condition, state) {
try {
// Simple condition evaluation
// In production, use a proper expression evaluator
// Direct eval runs in this function's scope, so the expression can read
// `promptResponse`, `state` and `condition`; the local below exists only for that.
const promptResponse = state.promptResponse;
return eval(condition);
} catch (error) {
console.error('Condition evaluation error:', error);
return false;
}
}
// Helper functions
// Persist a new status for an execution and broadcast an `execution_status` event.
// completed_at is set to the current time for 'completed'/'failed' and to NULL for
// any other status. When `error` is truthy it is appended to the execution log first.
async function updateExecutionStatus(executionId, status, error = null) {
  const isTerminal = ['completed', 'failed'].includes(status);
  const completedAt = isTerminal ? new Date() : null;
  if (error) {
    // Add error to logs
    const errorEntry = { error, timestamp: new Date().toISOString() };
    await addExecutionLog(executionId, errorEntry);
  }
  const updateParams = [status, completedAt, executionId];
  await pool.query('UPDATE executions SET status = ?, completed_at = ? WHERE id = ?', updateParams);
  broadcast({ type: 'execution_status', execution_id: executionId, status: status });
}
// Append one entry to an execution's `logs` JSON array.
// Does nothing when the execution row does not exist.
// NOTE: this is a read-modify-write; concurrent calls for the same execution can
// overwrite each other, so callers should sequence their writes.
async function addExecutionLog(executionId, logEntry) {
  const [rows] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
  if (rows.length === 0) return;
  // Depending on the server/driver, the JSON column arrives as text or as an
  // already-parsed value; JSON.parse on an array/object would throw.
  const stored = rows[0].logs;
  const parsed = typeof stored === 'string' ? JSON.parse(stored || '[]') : stored;
  const logs = Array.isArray(parsed) ? parsed : [];
  logs.push(logEntry);
  await pool.query('UPDATE executions SET logs = ? WHERE id = ?', [
    JSON.stringify(logs),
    executionId
  ]);
}
// ============================================
// API ROUTES - workflow execution, prompt responses and worker management
// ============================================
// Start workflow execution
// Body: { workflow_id, target_workers? }. Inserts the execution row as 'running',
// launches the workflow in the background, broadcasts `execution_started` and
// responds with the new execution id.
// Responds 400 when workflow_id is missing, 500 on database errors.
app.post('/api/executions', authenticateSSO, async (req, res) => {
  try {
    const { workflow_id, target_workers } = req.body ?? {};
    if (typeof workflow_id !== 'string' || workflow_id === '') {
      return res.status(400).json({ error: 'workflow_id is required' });
    }
    const id = generateUUID();
    await pool.query(
      'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
      [id, workflow_id, 'running', req.user.username, JSON.stringify([])]
    );
    // Start execution in the background; the promise is observed so a failure
    // cannot surface as an unhandled rejection.
    executeWorkflow(workflow_id, id, req.user.username, target_workers || 'all').catch((error) => {
      console.error('Workflow execution error:', error);
    });
    broadcast({ type: 'execution_started', execution_id: id, workflow_id });
    res.json({ id, workflow_id, status: 'running' });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Respond to workflow prompt
// Body: { response }. Passes the answer to handleUserInput; responds 400 when the
// execution is not currently waiting for input.
app.post('/api/executions/:id/respond', authenticateSSO, async (req, res) => {
  try {
    const { response: userResponse } = req.body;
    const accepted = handleUserInput(req.params.id, userResponse);
    if (!accepted) {
      res.status(400).json({ error: 'Execution not waiting for input' });
      return;
    }
    res.json({ success: true });
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});
// Get execution details with logs
// Returns the execution row with `logs` as a parsed array, plus the live prompt
// state (`waiting_for_input`, `prompt`) when the execution is still active in memory.
// Responds 404 when the execution does not exist.
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Not found' });
    }
    const execution = rows[0];
    const state = activeExecutions.get(req.params.id);
    // Depending on the server/driver, the JSON column arrives as text or as an
    // already-parsed value; JSON.parse on an array/object would throw.
    const rawLogs = execution.logs;
    const parsedLogs = typeof rawLogs === 'string' ? JSON.parse(rawLogs || '[]') : rawLogs;
    res.json({
      ...execution,
      logs: parsedLogs ?? [],
      waiting_for_input: state?.waitingForInput || false,
      prompt: state?.promptData || null
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Delete worker (admin only)
// Responds 403 for non-admins; broadcasts `worker_deleted` after responding.
app.delete('/api/workers/:id', authenticateSSO, async (req, res) => {
  try {
    if (req.user.isAdmin) {
      const workerId = req.params.id;
      await pool.query('DELETE FROM workers WHERE id = ?', [workerId]);
      res.json({ success: true });
      broadcast({ type: 'worker_deleted', worker_id: req.params.id });
    } else {
      res.status(403).json({ error: 'Admin access required' });
    }
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});
// Send direct command to specific worker (for testing)
// Body: { command }. Sends an `execute_command` message tagged with the worker id
// to every open socket and responds with a generated execution id.
// Responds 400 when the command is missing or not a non-empty string.
// NOTE(review): unlike worker deletion, this route is not restricted to admins —
// confirm that any SSO user should be able to run ad-hoc commands.
app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
  try {
    const { command } = req.body ?? {};
    if (typeof command !== 'string' || command.trim() === '') {
      return res.status(400).json({ error: 'command is required' });
    }
    const executionId = generateUUID();
    // Send command via WebSocket
    const commandMessage = {
      type: 'execute_command',
      execution_id: executionId,
      command: command,
      worker_id: req.params.id,
      timeout: 60000
    };
    // Serialize once instead of once per socket
    const payload = JSON.stringify(commandMessage);
    clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(payload);
      }
    });
    res.json({ success: true, execution_id: executionId });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// ============================================
// EXAMPLE WORKFLOW DEFINITIONS
// ============================================
// Example 1: Simple command execution
// Sample definition only; nothing in this file reads it. It shows the three fields
// the engine looks at per step: `type` ('execute' | 'prompt' | 'wait'), the
// type-specific payload, and an optional `condition`.
const simpleWorkflow = {
name: "Update System Packages",
description: "Update all packages on target servers",
steps: [
{
name: "Update package list",
type: "execute",
targets: ["all"],
command: "apt update"
},
{
name: "User Approval",
type: "prompt",
message: "Packages updated. Proceed with upgrade?",
options: ["Yes", "No"]
},
{
name: "Upgrade packages",
type: "execute",
targets: ["all"],
command: "apt upgrade -y",
// Evaluated by evaluateCondition; `promptResponse` is the answer to the latest prompt
condition: "promptResponse === 'Yes'"
}
]
};
// Example 2: Complex workflow with conditions
// Sample definition only; nothing in this file reads it.
const backupWorkflow = {
  name: "Backup and Verify",
  description: "Create backup and verify integrity",
  steps: [
    {
      name: "Create backup",
      type: "execute",
      targets: ["all"],
      command: "tar -czf /tmp/backup-$(date +%Y%m%d).tar.gz /opt/pulse-worker"
    },
    {
      name: "Wait for backup",
      type: "wait",
      duration: 5000
    },
    {
      name: "Verify backup",
      type: "execute",
      targets: ["all"],
      // Verify exactly the archive created above. A `backup-*.tar.gz` glob expands to
      // several files once older backups exist, and tar then treats the extra names
      // as members to list, reporting a false failure.
      command: "tar -tzf /tmp/backup-$(date +%Y%m%d).tar.gz > /dev/null && echo 'Backup OK' || echo 'Backup FAILED'"
    },
    {
      name: "Cleanup decision",
      type: "prompt",
      message: "Backup complete. Delete old backups?",
      options: ["Yes", "No", "Cancel"]
    },
    {
      name: "Cleanup old backups",
      type: "execute",
      targets: ["all"],
      command: "find /tmp -name 'backup-*.tar.gz' -mtime +7 -delete",
      // Evaluated by evaluateCondition; `promptResponse` is the answer to the latest prompt
      condition: "promptResponse === 'Yes'"
    }
  ]
};