const express = require('express');
const http = require('http');
const WebSocket = require('ws');
const mysql = require('mysql2/promise');
const crypto = require('crypto');
require('dotenv').config();

const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

// Middleware
app.use(express.json());
app.use(express.static('public'));

// UUID generator
function generateUUID() {
  return crypto.randomUUID();
}

// Database pool
const pool = mysql.createPool({
  host: process.env.DB_HOST,
  port: process.env.DB_PORT || 3306,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0
});
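
// Environment keys read here and elsewhere in this file (values illustrative, not
// taken from a real deployment):
//   DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME  - MariaDB/MySQL connection
//   WORKER_API_KEY                                   - shared secret for worker heartbeats
//   EXECUTION_RETENTION_DAYS                         - cleanup window in days (defaults to 30)
//   PORT, HOST                                       - HTTP listener (defaults 8080 / 0.0.0.0)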
// Initialize database tables
async function initDatabase() {
  const connection = await pool.getConnection();
  try {
    await connection.query(`
      CREATE TABLE IF NOT EXISTS users (
        id VARCHAR(36) PRIMARY KEY,
        username VARCHAR(255) UNIQUE NOT NULL,
        display_name VARCHAR(255),
        email VARCHAR(255),
        groups TEXT,
        last_login TIMESTAMP,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      )
    `);

    // Database schema is managed manually - migrations removed after direct database fixes

    await connection.query(`
      CREATE TABLE IF NOT EXISTS workers (
        id VARCHAR(36) PRIMARY KEY,
        name VARCHAR(255) UNIQUE NOT NULL,
        status VARCHAR(50) NOT NULL,
        last_heartbeat TIMESTAMP NULL,
        api_key VARCHAR(255),
        metadata JSON,
        INDEX idx_status (status),
        INDEX idx_heartbeat (last_heartbeat)
      )
    `);

    await connection.query(`
      CREATE TABLE IF NOT EXISTS workflows (
        id VARCHAR(36) PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        description TEXT,
        definition JSON NOT NULL,
        created_by VARCHAR(255),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        INDEX idx_name (name)
      )
    `);

    await connection.query(`
      CREATE TABLE IF NOT EXISTS executions (
        id VARCHAR(36) PRIMARY KEY,
        workflow_id VARCHAR(36) NULL,
        status VARCHAR(50) NOT NULL,
        started_by VARCHAR(255),
        started_at TIMESTAMP NULL,
        completed_at TIMESTAMP NULL,
        logs JSON,
        INDEX idx_workflow (workflow_id),
        INDEX idx_status (status),
        INDEX idx_started (started_at)
      )
    `);

    await connection.query(`
      CREATE TABLE IF NOT EXISTS scheduled_commands (
        id VARCHAR(36) PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        command TEXT NOT NULL,
        worker_ids JSON NOT NULL,
        schedule_type VARCHAR(50) NOT NULL,
        schedule_value VARCHAR(255) NOT NULL,
        enabled BOOLEAN DEFAULT TRUE,
        created_by VARCHAR(255),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        last_run TIMESTAMP NULL,
        next_run TIMESTAMP NULL,
        INDEX idx_enabled (enabled),
        INDEX idx_next_run (next_run)
      )
    `);

    console.log('Database tables initialized successfully');
  } catch (error) {
    console.error('Database initialization error:', error);
    throw error;
  } finally {
    connection.release();
  }
}
// Auto-cleanup old executions (runs daily)
async function cleanupOldExecutions() {
  try {
    const retentionDays = parseInt(process.env.EXECUTION_RETENTION_DAYS, 10) || 30;
    const [result] = await pool.query(
      `DELETE FROM executions
       WHERE status IN ('completed', 'failed')
       AND started_at < DATE_SUB(NOW(), INTERVAL ? DAY)`,
      [retentionDays]
    );
    console.log(`[Cleanup] Removed ${result.affectedRows} executions older than ${retentionDays} days`);
  } catch (error) {
    console.error('[Cleanup] Error removing old executions:', error);
  }
}

// Run cleanup every 24 hours (the interval is anchored to process start, not to a fixed clock time)
setInterval(cleanupOldExecutions, 24 * 60 * 60 * 1000);
// Run cleanup on startup
cleanupOldExecutions();
// Scheduled Commands Processor
async function processScheduledCommands() {
  try {
    const [schedules] = await pool.query(
      `SELECT * FROM scheduled_commands
       WHERE enabled = TRUE
       AND (next_run IS NULL OR next_run <= NOW())`
    );

    for (const schedule of schedules) {
      console.log(`[Scheduler] Running scheduled command: ${schedule.name}`);

      // worker_ids is a JSON column; mysql2 may return it already parsed, so guard
      // before calling JSON.parse (same pattern used for execution logs below)
      const workerIds = typeof schedule.worker_ids === 'string'
        ? JSON.parse(schedule.worker_ids)
        : schedule.worker_ids;

      // Execute command on each worker
      for (const workerId of workerIds) {
        const workerWs = workers.get(workerId);
        if (workerWs && workerWs.readyState === WebSocket.OPEN) {
          const executionId = generateUUID();

          // Create execution record
          await pool.query(
            'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
            [executionId, null, 'running', `scheduler:${schedule.name}`, JSON.stringify([{
              step: 'scheduled_command',
              action: 'command_sent',
              worker_id: workerId,
              command: schedule.command,
              timestamp: new Date().toISOString()
            }])]
          );

          // Send command to worker
          workerWs.send(JSON.stringify({
            type: 'execute_command',
            execution_id: executionId,
            command: schedule.command,
            worker_id: workerId,
            timeout: 300000 // 5 minute timeout for scheduled commands
          }));

          broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
        }
      }

      // Update last_run and calculate next_run
      const nextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value);
      await pool.query(
        'UPDATE scheduled_commands SET last_run = NOW(), next_run = ? WHERE id = ?',
        [nextRun, schedule.id]
      );
    }
  } catch (error) {
    console.error('[Scheduler] Error processing scheduled commands:', error);
  }
}
function calculateNextRun(scheduleType, scheduleValue) {
  const now = new Date();

  if (scheduleType === 'interval') {
    // Interval in minutes
    const minutes = parseInt(scheduleValue, 10);
    return new Date(now.getTime() + minutes * 60000);
  } else if (scheduleType === 'daily') {
    // Daily at HH:MM
    const [hours, minutes] = scheduleValue.split(':').map(Number);
    const next = new Date(now);
    next.setHours(hours, minutes, 0, 0);

    // If the time has already passed today, schedule for tomorrow
    if (next <= now) {
      next.setDate(next.getDate() + 1);
    }
    return next;
  } else if (scheduleType === 'hourly') {
    // Every N hours
    const hours = parseInt(scheduleValue, 10);
    return new Date(now.getTime() + hours * 3600000);
  }

  return null;
}
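
// Worked examples of the schedule encodings handled above (illustrative values):
//   calculateNextRun('interval', '15')  -> now + 15 minutes
//   calculateNextRun('hourly', '6')     -> now + 6 hours
//   calculateNextRun('daily', '03:30')  -> 03:30 today, or tomorrow if that time has passed
// Any other schedule_type yields null, so that schedule will never be picked up again.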
// Run scheduler every minute
setInterval(processScheduledCommands, 60 * 1000);
// Initial run on startup
setTimeout(processScheduledCommands, 5000);

// WebSocket connections
const clients = new Set();
const workers = new Map(); // Map worker_id -> WebSocket connection
wss.on('connection', (ws) => {
  clients.add(ws);

  // Handle incoming messages from workers
  ws.on('message', async (data) => {
    try {
      const message = JSON.parse(data.toString());
      console.log('WebSocket message received:', message.type);

      if (message.type === 'command_result') {
        // Handle command result from worker
        const { execution_id, command_id, worker_id, success, stdout, stderr, duration, timestamp } = message;

        // Add result to execution logs
        await addExecutionLog(execution_id, {
          step: 'command_execution',
          action: 'command_result',
          command_id: command_id, // Include command_id for workflow tracking
          worker_id: worker_id,
          success: success,
          stdout: stdout,
          stderr: stderr,
          duration: duration,
          timestamp: timestamp || new Date().toISOString()
        });

        // For non-workflow executions, update status immediately.
        // For workflow executions, the workflow engine will update status.
        const [execution] = await pool.query('SELECT workflow_id FROM executions WHERE id = ?', [execution_id]);
        if (execution.length > 0 && !execution[0].workflow_id) {
          // Only update status for quick commands (no workflow_id)
          const finalStatus = success ? 'completed' : 'failed';
          await updateExecutionStatus(execution_id, finalStatus);
        }

        // Broadcast result to all connected clients
        broadcast({
          type: 'command_result',
          execution_id: execution_id,
          worker_id: worker_id,
          success: success,
          stdout: stdout,
          stderr: stderr
        });

        console.log(`Command result received for execution ${execution_id}: ${success ? 'success' : 'failed'}`);
      }

      if (message.type === 'workflow_result') {
        // Handle workflow result from worker
        const { execution_id, worker_id, success, message: resultMessage, timestamp } = message;

        // Add final result to logs
        await addExecutionLog(execution_id, {
          step: 'workflow_completion',
          action: 'workflow_result',
          worker_id: worker_id,
          success: success,
          message: resultMessage,
          timestamp: timestamp || new Date().toISOString()
        });

        // Update execution status
        const finalStatus = success ? 'completed' : 'failed';
        await updateExecutionStatus(execution_id, finalStatus);

        // Broadcast completion to all clients
        broadcast({
          type: 'workflow_result',
          execution_id: execution_id,
          status: finalStatus,
          success: success,
          message: resultMessage
        });

        console.log(`Workflow result received for execution ${execution_id}: ${finalStatus}`);
      }

      if (message.type === 'worker_connect') {
        // Handle worker connection
        const { worker_id, worker_name } = message;
        console.log(`Worker connected: ${worker_name} (${worker_id})`);

        // Find the database worker ID by name
        const [dbWorkers] = await pool.query(
          'SELECT id FROM workers WHERE name = ?',
          [worker_name]
        );

        if (dbWorkers.length > 0) {
          const dbWorkerId = dbWorkers[0].id;

          // Store worker WebSocket connection using BOTH IDs
          workers.set(worker_id, ws); // Runtime ID
          workers.set(dbWorkerId, ws); // Database ID

          // Store mapping for cleanup
          ws.workerId = worker_id;
          ws.dbWorkerId = dbWorkerId;

          console.log(`Mapped worker: runtime_id=${worker_id}, db_id=${dbWorkerId}, name=${worker_name}`);

          // Update worker status to online
          await pool.query(
            `UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`,
            [dbWorkerId]
          );

          // Broadcast worker status update with database ID
          broadcast({
            type: 'worker_update',
            worker_id: dbWorkerId,
            status: 'online'
          });
        } else {
          console.log(`Worker ${worker_name} not found in database, will be created on heartbeat`);
        }
      }

      if (message.type === 'pong') {
        // Handle worker pong response
        const { worker_id } = message;
        await pool.query(
          `UPDATE workers SET last_heartbeat=NOW() WHERE id=?`,
          [worker_id]
        );
      }

    } catch (error) {
      console.error('WebSocket message error:', error);
    }
  });

  ws.on('close', () => {
    clients.delete(ws);
    // Remove worker from workers map when disconnected (both runtime and db IDs)
    if (ws.workerId) {
      workers.delete(ws.workerId);
      console.log(`Worker ${ws.workerId} (runtime ID) disconnected`);
    }
    if (ws.dbWorkerId) {
      workers.delete(ws.dbWorkerId);
      console.log(`Worker ${ws.dbWorkerId} (database ID) disconnected`);
    }
  });
});
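
// Worker -> server message shapes consumed by the handler above (field lists taken
// from the destructuring; payload values are illustrative):
//   { "type": "worker_connect",  "worker_id": "...", "worker_name": "worker-01" }
//   { "type": "command_result",  "execution_id": "...", "command_id": "...", "worker_id": "...",
//     "success": true, "stdout": "...", "stderr": "", "duration": 1.2, "timestamp": "..." }
//   { "type": "workflow_result", "execution_id": "...", "worker_id": "...", "success": true,
//     "message": "...", "timestamp": "..." }
//   { "type": "pong",            "worker_id": "..." }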
// Broadcast to all connected clients
function broadcast(data) {
  clients.forEach(client => {
    if (client.readyState === WebSocket.OPEN) {
      client.send(JSON.stringify(data));
    }
  });
}
// Helper function to add a log entry to an execution
async function addExecutionLog(executionId, logEntry) {
  try {
    const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);

    if (execution.length > 0) {
      const logs = typeof execution[0].logs === 'string' ? JSON.parse(execution[0].logs) : execution[0].logs;
      logs.push(logEntry);

      await pool.query(
        'UPDATE executions SET logs = ? WHERE id = ?',
        [JSON.stringify(logs), executionId]
      );
    }
  } catch (error) {
    console.error(`[Workflow] Error adding execution log:`, error);
  }
}
// Helper function to update execution status
async function updateExecutionStatus(executionId, status) {
  try {
    await pool.query(
      'UPDATE executions SET status = ?, completed_at = NOW() WHERE id = ?',
      [status, executionId]
    );

    broadcast({
      type: 'execution_status',
      execution_id: executionId,
      status: status
    });

    console.log(`[Workflow] Execution ${executionId} status updated to: ${status}`);
  } catch (error) {
    console.error(`[Workflow] Error updating execution status:`, error);
  }
}
// Authelia SSO Middleware
async function authenticateSSO(req, res, next) {
  // Check for Authelia headers
  const remoteUser = req.headers['remote-user'];
  const remoteName = req.headers['remote-name'];
  const remoteEmail = req.headers['remote-email'];
  const remoteGroups = req.headers['remote-groups'];

  if (!remoteUser) {
    return res.status(401).json({
      error: 'Not authenticated',
      message: 'Please access this service through auth.lotusguild.org'
    });
  }

  // Check if the user is in an allowed group (admin or employee)
  const groups = remoteGroups ? remoteGroups.split(',').map(g => g.trim()) : [];
  const allowedGroups = ['admin', 'employee'];
  const hasAccess = groups.some(g => allowedGroups.includes(g));

  if (!hasAccess) {
    return res.status(403).json({
      error: 'Access denied',
      message: 'You must be in the admin or employee group'
    });
  }

  // Store/update user in database
  try {
    const userId = generateUUID();
    await pool.query(
      `INSERT INTO users (id, username, display_name, email, groups, last_login)
       VALUES (?, ?, ?, ?, ?, NOW())
       ON DUPLICATE KEY UPDATE
         display_name=VALUES(display_name),
         email=VALUES(email),
         groups=VALUES(groups),
         last_login=NOW()`,
      [userId, remoteUser, remoteName, remoteEmail, remoteGroups]
    );
  } catch (error) {
    console.error('Error updating user:', error);
  }

  // Attach user info to request
  req.user = {
    username: remoteUser,
    name: remoteName || remoteUser,
    email: remoteEmail || '',
    groups: groups,
    isAdmin: groups.includes('admin')
  };

  next();
}
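
// Example of the forwarded-auth headers this middleware reads (values illustrative).
// Authelia normally injects these via the reverse proxy (nginx auth_request or
// Traefik forwardAuth), so they should never be trusted on a directly exposed port:
//   Remote-User:   jdoe
//   Remote-Name:   Jane Doe
//   Remote-Email:  jdoe@example.org
//   Remote-Groups: admin,employee
// Note: a fresh UUID is generated on every request, but for an existing username the
// ON DUPLICATE KEY UPDATE path keeps the original row id.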
// Workflow Execution Engine
async function executeWorkflowSteps(executionId, workflowId, definition, username) {
  try {
    console.log(`[Workflow] Starting execution ${executionId} for workflow ${workflowId}`);

    const steps = definition.steps || [];
    let allStepsSucceeded = true;

    for (let i = 0; i < steps.length; i++) {
      const step = steps[i];
      console.log(`[Workflow] Execution ${executionId} - Step ${i + 1}/${steps.length}: ${step.name}`);

      // Add step start log
      await addExecutionLog(executionId, {
        step: i + 1,
        step_name: step.name,
        action: 'step_started',
        timestamp: new Date().toISOString()
      });

      broadcast({
        type: 'workflow_step_started',
        execution_id: executionId,
        step: i + 1,
        step_name: step.name
      });

      if (step.type === 'execute') {
        // Execute command step
        const success = await executeCommandStep(executionId, step, i + 1);
        if (!success) {
          allStepsSucceeded = false;
          break; // Stop workflow on failure
        }
      } else if (step.type === 'wait') {
        // Wait step (delay in seconds)
        const seconds = step.duration || 5;
        await addExecutionLog(executionId, {
          step: i + 1,
          action: 'waiting',
          duration: seconds,
          timestamp: new Date().toISOString()
        });
        await new Promise(resolve => setTimeout(resolve, seconds * 1000));
      } else if (step.type === 'prompt') {
        // Interactive prompt step (not fully implemented, would need user interaction)
        await addExecutionLog(executionId, {
          step: i + 1,
          action: 'prompt_skipped',
          message: 'Interactive prompts not yet supported',
          timestamp: new Date().toISOString()
        });
      }

      // Add step completion log
      await addExecutionLog(executionId, {
        step: i + 1,
        step_name: step.name,
        action: 'step_completed',
        timestamp: new Date().toISOString()
      });

      broadcast({
        type: 'workflow_step_completed',
        execution_id: executionId,
        step: i + 1,
        step_name: step.name
      });
    }

    // Mark execution as completed or failed
    const finalStatus = allStepsSucceeded ? 'completed' : 'failed';
    await updateExecutionStatus(executionId, finalStatus);

    console.log(`[Workflow] Execution ${executionId} finished with status: ${finalStatus}`);

  } catch (error) {
    console.error(`[Workflow] Execution ${executionId} error:`, error);
    await addExecutionLog(executionId, {
      action: 'workflow_error',
      error: error.message,
      timestamp: new Date().toISOString()
    });
    await updateExecutionStatus(executionId, 'failed');
  }
}
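
// Step shapes consumed by the loop above (derived from its branches; values illustrative):
//   { name: 'Step', type: 'execute', targets: ['all'], command: 'uptime' }
//   { name: 'Step', type: 'wait',    duration: 10 }                       // seconds
//   { name: 'Step', type: 'prompt',  message: '...', options: [...] }     // logged and skipped
// An unknown type falls through every branch and is logged as completed without doing anything.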
async function executeCommandStep(executionId, step, stepNumber) {
  try {
    const command = step.command;
    const targets = step.targets || ['all'];

    // Determine which workers to target
    let targetWorkerIds = [];

    if (targets.includes('all')) {
      // Get all online workers
      const [onlineWorkers] = await pool.query('SELECT id FROM workers WHERE status = ?', ['online']);
      targetWorkerIds = onlineWorkers.map(w => w.id);
    } else {
      // Specific worker IDs or names
      for (const target of targets) {
        // Try to find by ID first, then by name
        const [workerById] = await pool.query('SELECT id FROM workers WHERE id = ?', [target]);
        if (workerById.length > 0) {
          targetWorkerIds.push(workerById[0].id);
        } else {
          const [workerByName] = await pool.query('SELECT id FROM workers WHERE name = ?', [target]);
          if (workerByName.length > 0) {
            targetWorkerIds.push(workerByName[0].id);
          }
        }
      }
    }

    if (targetWorkerIds.length === 0) {
      await addExecutionLog(executionId, {
        step: stepNumber,
        action: 'no_workers',
        message: 'No workers available for this step',
        timestamp: new Date().toISOString()
      });
      return false;
    }

    // Execute command on each target worker and wait for results
    const results = [];

    for (const workerId of targetWorkerIds) {
      const workerWs = workers.get(workerId);

      if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
        await addExecutionLog(executionId, {
          step: stepNumber,
          action: 'worker_offline',
          worker_id: workerId,
          timestamp: new Date().toISOString()
        });
        continue;
      }

      // Send command to worker
      const commandId = generateUUID();

      await addExecutionLog(executionId, {
        step: stepNumber,
        action: 'command_sent',
        worker_id: workerId,
        command: command,
        command_id: commandId,
        timestamp: new Date().toISOString()
      });

      workerWs.send(JSON.stringify({
        type: 'execute_command',
        execution_id: executionId,
        command_id: commandId,
        command: command,
        worker_id: workerId,
        timeout: 120000 // 2 minute timeout
      }));

      // Wait for command result (with timeout)
      const result = await waitForCommandResult(executionId, commandId, 120000);
      results.push(result);

      if (!result.success) {
        // Command failed, workflow should stop
        return false;
      }
    }

    // All commands succeeded
    return results.every(r => r.success);

  } catch (error) {
    console.error(`[Workflow] Error executing command step:`, error);
    await addExecutionLog(executionId, {
      step: stepNumber,
      action: 'step_error',
      error: error.message,
      timestamp: new Date().toISOString()
    });
    return false;
  }
}
async function waitForCommandResult(executionId, commandId, timeout) {
  return new Promise((resolve) => {
    const startTime = Date.now();

    const checkInterval = setInterval(async () => {
      try {
        // Check if we've received the command result in logs
        const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);

        if (execution.length > 0) {
          const logs = typeof execution[0].logs === 'string' ? JSON.parse(execution[0].logs) : execution[0].logs;
          const resultLog = logs.find(log => log.command_id === commandId && log.action === 'command_result');

          if (resultLog) {
            clearInterval(checkInterval);
            resolve({
              success: resultLog.success,
              stdout: resultLog.stdout,
              stderr: resultLog.stderr
            });
            return;
          }
        }

        // Check timeout
        if (Date.now() - startTime > timeout) {
          clearInterval(checkInterval);
          resolve({
            success: false,
            error: 'Command timeout'
          });
        }
      } catch (error) {
        console.error('[Workflow] Error checking command result:', error);
        clearInterval(checkInterval);
        resolve({
          success: false,
          error: error.message
        });
      }
    }, 500); // Check every 500ms
  });
}
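
// Design note: rather than resolving from the WebSocket handler directly, the wait
// above polls the executions table every 500 ms for a log entry whose command_id
// matches. That keeps all state in the database at the cost of up to ~500 ms of
// extra latency and one SELECT per tick per in-flight command.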
// Routes - All protected by SSO
app.get('/api/user', authenticateSSO, (req, res) => {
  res.json(req.user);
});

app.get('/api/workflows', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM workflows ORDER BY created_at DESC');
    res.json(rows);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.post('/api/workflows', authenticateSSO, async (req, res) => {
  try {
    const { name, description, definition } = req.body;
    const id = generateUUID();

    console.log('[Workflow] Creating workflow:', name);
    console.log('[Workflow] Definition:', JSON.stringify(definition, null, 2));

    await pool.query(
      'INSERT INTO workflows (id, name, description, definition, created_by) VALUES (?, ?, ?, ?, ?)',
      [id, name, description, JSON.stringify(definition), req.user.username]
    );

    console.log('[Workflow] Successfully inserted workflow:', id);

    res.json({ id, name, description, definition });

    console.log('[Workflow] Broadcasting workflow_created');
    broadcast({ type: 'workflow_created', workflow_id: id });
    console.log('[Workflow] Broadcast complete');
  } catch (error) {
    console.error('[Workflow] Error creating workflow:', error);
    res.status(500).json({ error: error.message });
  }
});

app.delete('/api/workflows/:id', authenticateSSO, async (req, res) => {
  try {
    // Only admins can delete workflows
    if (!req.user.isAdmin) {
      return res.status(403).json({ error: 'Admin access required' });
    }

    await pool.query('DELETE FROM workflows WHERE id = ?', [req.params.id]);
    res.json({ success: true });
    broadcast({ type: 'workflow_deleted', workflow_id: req.params.id });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.get('/api/workers', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM workers ORDER BY name');
    res.json(rows);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.post('/api/workers/heartbeat', async (req, res) => {
  try {
    const { worker_id, name, metadata } = req.body;
    const apiKey = req.headers['x-api-key'];

    // Verify API key
    if (apiKey !== process.env.WORKER_API_KEY) {
      return res.status(401).json({ error: 'Invalid API key' });
    }

    await pool.query(
      `INSERT INTO workers (id, name, status, last_heartbeat, api_key, metadata)
       VALUES (?, ?, 'online', NOW(), ?, ?)
       ON DUPLICATE KEY UPDATE
         status='online',
         last_heartbeat=NOW(),
         metadata=VALUES(metadata)`,
      [worker_id, name, apiKey, JSON.stringify(metadata)]
    );

    broadcast({ type: 'worker_update', worker_id, status: 'online' });
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
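
// Example heartbeat call (illustrative host and values; PULSE_HOST is a placeholder):
//   curl -X POST http://PULSE_HOST:8080/api/workers/heartbeat \
//     -H 'Content-Type: application/json' \
//     -H "X-API-Key: $WORKER_API_KEY" \
//     -d '{"worker_id":"<uuid>","name":"worker-01","metadata":{"os":"linux"}}'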
app.post('/api/executions', authenticateSSO, async (req, res) => {
  try {
    const { workflow_id } = req.body;
    const id = generateUUID();

    // Get workflow definition
    const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflow_id]);
    if (workflows.length === 0) {
      return res.status(404).json({ error: 'Workflow not found' });
    }

    const workflow = workflows[0];
    const definition = typeof workflow.definition === 'string' ? JSON.parse(workflow.definition) : workflow.definition;

    // Create execution record
    await pool.query(
      'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
      [id, workflow_id, 'running', req.user.username, JSON.stringify([])]
    );

    broadcast({ type: 'execution_started', execution_id: id, workflow_id });

    // Start workflow execution asynchronously
    executeWorkflowSteps(id, workflow_id, definition, req.user.username).catch(err => {
      console.error(`[Workflow] Execution ${id} failed:`, err);
    });

    res.json({ id, workflow_id, status: 'running' });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
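
// Example run (illustrative): POST /api/executions with {"workflow_id":"<uuid>"}
// answers {"id":"<uuid>","workflow_id":"<uuid>","status":"running"} immediately;
// progress then arrives over the WebSocket as execution_started,
// workflow_step_started/workflow_step_completed and execution_status events.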
app.get('/api/executions', authenticateSSO, async (req, res) => {
  try {
    const limit = parseInt(req.query.limit, 10) || 50;
    const offset = parseInt(req.query.offset, 10) || 0;

    const [rows] = await pool.query(
      'SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ORDER BY e.started_at DESC LIMIT ? OFFSET ?',
      [limit, offset]
    );

    // Get total count
    const [countRows] = await pool.query('SELECT COUNT(*) as total FROM executions');
    const total = countRows[0].total;

    res.json({
      executions: rows,
      total: total,
      limit: limit,
      offset: offset,
      hasMore: offset + rows.length < total
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.delete('/api/executions/:id', authenticateSSO, async (req, res) => {
  try {
    await pool.query('DELETE FROM executions WHERE id = ?', [req.params.id]);
    broadcast({ type: 'execution_deleted', execution_id: req.params.id });
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
app.post('/api/executions/:id/abort', authenticateSSO, async (req, res) => {
  try {
    const executionId = req.params.id;

    // Check that the execution exists and is running
    const [execution] = await pool.query('SELECT status FROM executions WHERE id = ?', [executionId]);

    if (execution.length === 0) {
      return res.status(404).json({ error: 'Execution not found' });
    }

    if (execution[0].status !== 'running') {
      return res.status(400).json({ error: 'Execution is not running' });
    }

    // Add abort log entry
    await addExecutionLog(executionId, {
      action: 'execution_aborted',
      aborted_by: req.user.username,
      timestamp: new Date().toISOString()
    });

    // Update execution status to failed
    await updateExecutionStatus(executionId, 'failed');

    console.log(`[Execution] Execution ${executionId} aborted by ${req.user.username}`);

    res.json({ success: true });
  } catch (error) {
    console.error('[Execution] Error aborting execution:', error);
    res.status(500).json({ error: error.message });
  }
});
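
// Note: aborting only flips the stored status to 'failed'; no cancel message is sent
// to the worker, so a command that was already dispatched still runs to completion
// on the remote host.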
// Scheduled Commands API
app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => {
  try {
    const [schedules] = await pool.query(
      'SELECT * FROM scheduled_commands ORDER BY created_at DESC'
    );
    res.json(schedules);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.post('/api/scheduled-commands', authenticateSSO, async (req, res) => {
  try {
    const { name, command, worker_ids, schedule_type, schedule_value } = req.body;

    if (!name || !command || !worker_ids || !schedule_type || !schedule_value) {
      return res.status(400).json({ error: 'Missing required fields' });
    }

    const id = generateUUID();
    const nextRun = calculateNextRun(schedule_type, schedule_value);

    await pool.query(
      `INSERT INTO scheduled_commands
       (id, name, command, worker_ids, schedule_type, schedule_value, created_by, next_run)
       VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
      [id, name, command, JSON.stringify(worker_ids), schedule_type, schedule_value, req.user.username, nextRun]
    );

    res.json({ success: true, id });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.put('/api/scheduled-commands/:id/toggle', authenticateSSO, async (req, res) => {
  try {
    const { id } = req.params;
    const [current] = await pool.query('SELECT enabled FROM scheduled_commands WHERE id = ?', [id]);

    if (current.length === 0) {
      return res.status(404).json({ error: 'Schedule not found' });
    }

    const newEnabled = !current[0].enabled;
    await pool.query('UPDATE scheduled_commands SET enabled = ? WHERE id = ?', [newEnabled, id]);

    res.json({ success: true, enabled: newEnabled });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.delete('/api/scheduled-commands/:id', authenticateSSO, async (req, res) => {
  try {
    const { id } = req.params;
    await pool.query('DELETE FROM scheduled_commands WHERE id = ?', [id]);
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Health check (no auth required)
app.get('/health', async (req, res) => {
  try {
    await pool.query('SELECT 1');
    res.json({
      status: 'ok',
      timestamp: new Date().toISOString(),
      database: 'connected',
      auth: 'authelia-sso'
    });
  } catch (error) {
    res.status(500).json({
      status: 'error',
      timestamp: new Date().toISOString(),
      database: 'disconnected',
      error: error.message
    });
  }
});
// Start server
const PORT = process.env.PORT || 8080;
const HOST = process.env.HOST || '0.0.0.0';

initDatabase().then(() => {
  server.listen(PORT, HOST, () => {
    console.log(`PULSE Server running on http://${HOST}:${PORT}`);
    console.log(`Connected to MariaDB at ${process.env.DB_HOST}`);
    console.log(`Authentication: Authelia SSO`);
    console.log(`Worker API Key configured: ${process.env.WORKER_API_KEY ? 'Yes' : 'No'}`);
  });
}).catch(err => {
  console.error('Failed to start server:', err);
  process.exit(1);
});
// ============================================
// ADDITIONAL API ROUTES
// ============================================

// Get execution details with logs
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Not found' });
    }

    const execution = rows[0];

    res.json({
      ...execution,
      // logs is a JSON column; mysql2 may hand it back already parsed, so guard before parsing
      logs: typeof execution.logs === 'string' ? JSON.parse(execution.logs) : (execution.logs || [])
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Delete worker (admin only)
app.delete('/api/workers/:id', authenticateSSO, async (req, res) => {
  try {
    if (!req.user.isAdmin) {
      return res.status(403).json({ error: 'Admin access required' });
    }

    await pool.query('DELETE FROM workers WHERE id = ?', [req.params.id]);
    res.json({ success: true });
    broadcast({ type: 'worker_deleted', worker_id: req.params.id });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Send direct command to specific worker (for testing)
app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
  try {
    const { command } = req.body;
    const executionId = generateUUID();
    const workerId = req.params.id;

    // Check connectivity first so a dead worker doesn't leave an orphaned
    // 'running' execution record behind
    const workerWs = workers.get(workerId);
    if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
      return res.status(400).json({ error: 'Worker not connected' });
    }

    // Create execution record in database
    await pool.query(
      'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
      [executionId, null, 'running', req.user.username, JSON.stringify([{
        step: 'quick_command',
        action: 'command_sent',
        worker_id: workerId,
        command: command,
        timestamp: new Date().toISOString()
      }])]
    );

    // Send command via WebSocket to the specific worker
    const commandMessage = {
      type: 'execute_command',
      execution_id: executionId,
      command: command,
      worker_id: workerId,
      timeout: 60000
    };

    workerWs.send(JSON.stringify(commandMessage));
    console.log(`Command sent to worker ${workerId}: ${command}`);

    broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
    res.json({ success: true, execution_id: executionId });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// ============================================
// EXAMPLE WORKFLOW DEFINITIONS
// ============================================

// Example 1: Simple command execution
const simpleWorkflow = {
  name: "Update System Packages",
  description: "Update all packages on target servers",
  steps: [
    {
      name: "Update package list",
      type: "execute",
      targets: ["all"],
      command: "apt update"
    },
    {
      name: "User Approval",
      type: "prompt",
      message: "Packages updated. Proceed with upgrade?",
      options: ["Yes", "No"]
    },
    {
      name: "Upgrade packages",
      type: "execute",
      targets: ["all"],
      command: "apt upgrade -y",
      condition: "promptResponse === 'Yes'"
    }
  ]
};

// Example 2: Complex workflow with conditions
const backupWorkflow = {
  name: "Backup and Verify",
  description: "Create backup and verify integrity",
  steps: [
    {
      name: "Create backup",
      type: "execute",
      targets: ["all"],
      command: "tar -czf /tmp/backup-$(date +%Y%m%d).tar.gz /opt/pulse-worker"
    },
    {
      name: "Wait for backup",
      type: "wait",
      duration: 5 // wait durations are in seconds (see executeWorkflowSteps), not milliseconds
    },
    {
      name: "Verify backup",
      type: "execute",
      targets: ["all"],
      command: "tar -tzf /tmp/backup-*.tar.gz > /dev/null && echo 'Backup OK' || echo 'Backup FAILED'"
    },
    {
      name: "Cleanup decision",
      type: "prompt",
      message: "Backup complete. Delete old backups?",
      options: ["Yes", "No", "Cancel"]
    },
    {
      name: "Cleanup old backups",
      type: "execute",
      targets: ["all"],
      command: "find /tmp -name 'backup-*.tar.gz' -mtime +7 -delete",
      condition: "promptResponse === 'Yes'"
    }
  ]
};
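
// The two objects above are reference shapes only: prompt steps are currently logged
// and skipped by executeWorkflowSteps, and the `condition` field is not evaluated.
// A minimal sketch of registering one of them through the API (assumes the caller is
// already behind the Authelia proxy, e.g. running in the browser-side UI):
//
//   await fetch('/api/workflows', {
//     method: 'POST',
//     headers: { 'Content-Type': 'application/json' },
//     body: JSON.stringify({
//       name: simpleWorkflow.name,
//       description: simpleWorkflow.description,
//       definition: simpleWorkflow
//     })
//   });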