2025-11-29 19:26:20 -05:00
|
|
|
const express = require('express');
|
|
|
|
|
const http = require('http');
|
|
|
|
|
const WebSocket = require('ws');
|
|
|
|
|
const mysql = require('mysql2/promise');
|
|
|
|
|
const crypto = require('crypto');
|
|
|
|
|
require('dotenv').config();
|
|
|
|
|
|
|
|
|
|
// Core server objects: the Express app, the HTTP server wrapping it,
// and a WebSocket server attached to the same listener/port.
const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });
|
|
|
|
|
|
|
|
|
|
// Middleware
app.use(express.json()); // parse JSON request bodies
app.use(express.static('public')); // serve the frontend from ./public
|
|
|
|
|
|
|
|
|
|
/**
 * Generate an RFC 4122 version-4 UUID string.
 * Thin wrapper over Node's built-in crypto.randomUUID().
 * @returns {string} e.g. "3f2b1c4e-...".
 */
function generateUUID() {
  return crypto.randomUUID();
}
|
|
|
|
|
|
|
|
|
|
// Database pool
// All connection settings come from the environment (.env via dotenv).
const pool = mysql.createPool({
  host: process.env.DB_HOST,
  port: process.env.DB_PORT || 3306,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  waitForConnections: true, // queue callers rather than erroring when all connections are busy
  connectionLimit: 10,
  queueLimit: 0 // 0 = unlimited queue
});
|
|
|
|
|
|
|
|
|
|
// Initialize database tables
/**
 * Create the four application tables if they do not already exist:
 * users (SSO profiles), workers (agents reporting heartbeats),
 * workflows (stored definitions), executions (runs + JSON logs).
 * Throws on failure so startup can abort; always releases the
 * borrowed connection.
 * NOTE(review): the unquoted `groups` column is a reserved word in
 * MySQL 8+ (works on MariaDB) — confirm the target server.
 */
async function initDatabase() {
  const connection = await pool.getConnection();
  try {
    // SSO user profiles, upserted on each authenticated request.
    await connection.query(`
      CREATE TABLE IF NOT EXISTS users (
        id VARCHAR(36) PRIMARY KEY,
        username VARCHAR(255) UNIQUE NOT NULL,
        display_name VARCHAR(255),
        email VARCHAR(255),
        groups TEXT,
        last_login TIMESTAMP,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      )
    `);

    // Worker agents; status/heartbeat maintained by the heartbeat route.
    await connection.query(`
      CREATE TABLE IF NOT EXISTS workers (
        id VARCHAR(36) PRIMARY KEY,
        name VARCHAR(255) UNIQUE NOT NULL,
        status VARCHAR(50) NOT NULL,
        last_heartbeat TIMESTAMP NULL,
        api_key VARCHAR(255),
        metadata JSON,
        INDEX idx_status (status),
        INDEX idx_heartbeat (last_heartbeat)
      )
    `);

    // Workflow definitions stored as JSON documents.
    await connection.query(`
      CREATE TABLE IF NOT EXISTS workflows (
        id VARCHAR(36) PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        description TEXT,
        definition JSON NOT NULL,
        created_by VARCHAR(255),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        INDEX idx_name (name)
      )
    `);

    // Execution runs; cascade-deleted with their workflow.
    await connection.query(`
      CREATE TABLE IF NOT EXISTS executions (
        id VARCHAR(36) PRIMARY KEY,
        workflow_id VARCHAR(36) NOT NULL,
        status VARCHAR(50) NOT NULL,
        started_by VARCHAR(255),
        started_at TIMESTAMP NULL,
        completed_at TIMESTAMP NULL,
        logs JSON,
        FOREIGN KEY (workflow_id) REFERENCES workflows(id) ON DELETE CASCADE,
        INDEX idx_workflow (workflow_id),
        INDEX idx_status (status),
        INDEX idx_started (started_at)
      )
    `);

    console.log('Database tables initialized successfully');
  } catch (error) {
    console.error('Database initialization error:', error);
    throw error;
  } finally {
    connection.release();
  }
}
|
|
|
|
|
|
|
|
|
|
// WebSocket connections
// Every connected socket (browsers and workers alike) lives in one Set
// so broadcast() can reach all of them; sockets remove themselves on close.
const clients = new Set();
wss.on('connection', (ws) => {
  clients.add(ws);
  ws.on('close', () => clients.delete(ws));
});
|
|
|
|
|
|
|
|
|
|
/**
 * Broadcast a JSON-serializable payload to every connected client.
 * Sockets not currently OPEN are skipped.
 * @param {object} data - message object to serialize and send.
 */
function broadcast(data) {
  const payload = JSON.stringify(data);
  for (const client of clients) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(payload);
    }
  }
}
|
|
|
|
|
|
|
|
|
|
// Authelia SSO Middleware
/**
 * Authenticate a request via the Remote-* headers set by the Authelia
 * reverse proxy (remote-user / remote-name / remote-email / remote-groups).
 * 401 when no user header is present; 403 unless the user is in the
 * `admin` or `employee` group. On success the user row is upserted
 * (best-effort — auth proceeds even if the DB write fails) and
 * `req.user` is populated for downstream handlers.
 *
 * NOTE(review): this trusts the headers unconditionally — it assumes the
 * server is only reachable through the proxy. If clients can hit this
 * port directly they can spoof these headers; confirm network topology.
 */
async function authenticateSSO(req, res, next) {
  // Node lower-cases incoming header names.
  const remoteUser = req.headers['remote-user'];
  const remoteName = req.headers['remote-name'];
  const remoteEmail = req.headers['remote-email'];
  const remoteGroups = req.headers['remote-groups'];

  if (!remoteUser) {
    return res.status(401).json({
      error: 'Not authenticated',
      message: 'Please access this service through auth.lotusguild.org'
    });
  }

  // Check if user is in allowed groups (admin or employee)
  const groups = remoteGroups ? remoteGroups.split(',').map(g => g.trim()) : [];
  const allowedGroups = ['admin', 'employee'];
  const hasAccess = groups.some(g => allowedGroups.includes(g));

  if (!hasAccess) {
    return res.status(403).json({
      error: 'Access denied',
      message: 'You must be in admin or employee group'
    });
  }

  // Store/update user in database. A fresh id is generated each request
  // but discarded by ON DUPLICATE KEY once the username exists.
  // `groups` is backtick-quoted: it is a reserved word in MySQL 8+.
  try {
    const userId = generateUUID();
    await pool.query(
      `INSERT INTO users (id, username, display_name, email, \`groups\`, last_login)
       VALUES (?, ?, ?, ?, ?, NOW())
       ON DUPLICATE KEY UPDATE
       display_name=VALUES(display_name),
       email=VALUES(email),
       \`groups\`=VALUES(\`groups\`),
       last_login=NOW()`,
      [userId, remoteUser, remoteName, remoteEmail, remoteGroups]
    );
  } catch (error) {
    console.error('Error updating user:', error);
  }

  // Attach user info to request
  req.user = {
    username: remoteUser,
    name: remoteName || remoteUser,
    email: remoteEmail || '',
    groups: groups,
    isAdmin: groups.includes('admin')
  };

  next();
}
|
|
|
|
|
|
|
|
|
|
// Routes - All protected by SSO

// Return the authenticated user's profile as built by authenticateSSO.
app.get('/api/user', authenticateSSO, (req, res) => {
  res.json(req.user);
});
|
|
|
|
|
|
|
|
|
|
// List all workflow definitions, newest first.
app.get('/api/workflows', authenticateSSO, async (req, res) => {
  try {
    const [workflows] = await pool.query(
      'SELECT * FROM workflows ORDER BY created_at DESC'
    );
    res.json(workflows);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
|
|
|
|
|
|
|
|
|
|
// Create a new workflow.
// Body: { name (required), description, definition (required object) }.
// The definition is stored as JSON text; a workflow_created event is
// broadcast to connected clients on success.
app.post('/api/workflows', authenticateSSO, async (req, res) => {
  try {
    const { name, description, definition } = req.body;

    // Validate up front: name is NOT NULL and definition is JSON NOT NULL
    // in the schema, so missing values previously surfaced as 500s.
    if (!name || typeof name !== 'string') {
      return res.status(400).json({ error: 'name is required' });
    }
    if (definition === undefined || definition === null) {
      return res.status(400).json({ error: 'definition is required' });
    }

    const id = generateUUID();
    await pool.query(
      'INSERT INTO workflows (id, name, description, definition, created_by) VALUES (?, ?, ?, ?, ?)',
      [id, name, description, JSON.stringify(definition), req.user.username]
    );

    res.json({ id, name, description, definition });
    broadcast({ type: 'workflow_created', workflow_id: id });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
|
|
|
|
|
|
|
|
|
|
// Delete a workflow (admin only). Its executions are removed by the
// ON DELETE CASCADE foreign key.
app.delete('/api/workflows/:id', authenticateSSO, async (req, res) => {
  try {
    // Only admins can delete workflows
    if (!req.user.isAdmin) {
      return res.status(403).json({ error: 'Admin access required' });
    }

    const workflowId = req.params.id;
    await pool.query('DELETE FROM workflows WHERE id = ?', [workflowId]);
    res.json({ success: true });
    broadcast({ type: 'workflow_deleted', workflow_id: workflowId });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
|
|
|
|
|
|
|
|
|
|
// List every registered worker, ordered alphabetically by name.
app.get('/api/workers', authenticateSSO, async (req, res) => {
  try {
    const [workerRows] = await pool.query('SELECT * FROM workers ORDER BY name');
    res.json(workerRows);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
|
|
|
|
|
|
|
|
|
|
// Worker heartbeat. Machine-authenticated with the shared X-Api-Key
// header rather than SSO. Upserts the worker row as 'online' and
// broadcasts a worker_update event.
app.post('/api/workers/heartbeat', async (req, res) => {
  try {
    const { worker_id, name, metadata } = req.body;
    const apiKey = req.headers['x-api-key'];

    // Verify API key. If WORKER_API_KEY is not configured, refuse ALL
    // requests: the original `apiKey !== process.env.WORKER_API_KEY`
    // check silently accepted key-less requests when the env var was
    // unset (undefined !== undefined is false) — an auth bypass.
    if (!process.env.WORKER_API_KEY || apiKey !== process.env.WORKER_API_KEY) {
      return res.status(401).json({ error: 'Invalid API key' });
    }

    // id is the PRIMARY KEY and name is NOT NULL — validate explicitly.
    if (!worker_id || !name) {
      return res.status(400).json({ error: 'worker_id and name are required' });
    }

    await pool.query(
      `INSERT INTO workers (id, name, status, last_heartbeat, api_key, metadata)
       VALUES (?, ?, 'online', NOW(), ?, ?)
       ON DUPLICATE KEY UPDATE
       status='online',
       last_heartbeat=NOW(),
       metadata=VALUES(metadata)`,
      [worker_id, name, apiKey, JSON.stringify(metadata)]
    );

    broadcast({ type: 'worker_update', worker_id, status: 'online' });
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
|
|
|
|
|
|
|
|
|
|
// Start a workflow execution.
// NOTE(review): a second POST /api/executions handler is registered
// later in this file; Express dispatches to the FIRST match, so that
// duplicate never ran — but it was the only version that started the
// engine. Its executeWorkflow call and target_workers handling are
// merged in here so inserted executions actually progress.
app.post('/api/executions', authenticateSSO, async (req, res) => {
  try {
    const { workflow_id, target_workers } = req.body;
    if (!workflow_id) {
      return res.status(400).json({ error: 'workflow_id is required' });
    }

    const id = generateUUID();
    await pool.query(
      'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
      [id, workflow_id, 'running', req.user.username, JSON.stringify([])]
    );

    // Fire-and-forget: executeWorkflow (a hoisted function declaration
    // defined later in the file) handles its own errors and marks the
    // execution 'failed' on problems.
    executeWorkflow(workflow_id, id, req.user.username, target_workers || 'all');

    broadcast({ type: 'execution_started', execution_id: id, workflow_id });
    res.json({ id, workflow_id, status: 'running' });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
|
|
|
|
|
|
|
|
|
|
// List the 50 most recent executions, joined with their workflow names.
app.get('/api/executions', authenticateSSO, async (req, res) => {
  try {
    const sql =
      'SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ORDER BY e.started_at DESC LIMIT 50';
    const [executions] = await pool.query(sql);
    res.json(executions);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
|
|
|
|
|
|
|
|
|
|
// Get a single execution, including parsed logs and any live prompt
// state held by the in-memory engine.
// NOTE(review): a richer duplicate of this route later in the file was
// shadowed by this one (Express uses the first match); its extra fields
// (parsed logs, waiting_for_input, prompt) are merged in here.
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Not found' });
    }

    const execution = rows[0];
    const state = activeExecutions.get(req.params.id);

    // mysql2 may hand the JSON `logs` column back already parsed;
    // only JSON.parse when we actually received a string.
    const logs = typeof execution.logs === 'string'
      ? JSON.parse(execution.logs || '[]')
      : (execution.logs || []);

    res.json({
      ...execution,
      logs,
      waiting_for_input: state?.waitingForInput || false,
      prompt: state?.promptData || null
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
|
|
|
|
|
|
|
|
|
|
// Health check (no auth required)
// Probes the database with SELECT 1 so monitors can detect DB outages.
app.get('/health', async (req, res) => {
  try {
    await pool.query('SELECT 1');
    res.json({
      status: 'ok',
      timestamp: new Date().toISOString(),
      database: 'connected',
      auth: 'authelia-sso'
    });
  } catch (error) {
    const failure = {
      status: 'error',
      timestamp: new Date().toISOString(),
      database: 'disconnected',
      error: error.message
    };
    res.status(500).json(failure);
  }
});
|
|
|
|
|
|
|
|
|
|
// Start server
const PORT = process.env.PORT || 8080;
const HOST = process.env.HOST || '0.0.0.0';

// Ensure the schema exists before accepting traffic; exit non-zero on
// failure so a process supervisor can restart us.
initDatabase().then(() => {
  server.listen(PORT, HOST, () => {
    console.log(`PULSE Server running on http://${HOST}:${PORT}`);
    console.log(`Connected to MariaDB at ${process.env.DB_HOST}`);
    console.log(`Authentication: Authelia SSO`);
    console.log(`Worker API Key configured: ${process.env.WORKER_API_KEY ? 'Yes' : 'No'}`);
  });
}).catch(err => {
  console.error('Failed to start server:', err);
  process.exit(1);
});
|
2025-11-30 13:03:18 -05:00
|
|
|
|
|
|
|
|
|
|
|
|
|
// ============================================
// WORKFLOW EXECUTION ENGINE
// ============================================

// Store active workflow executions in memory.
// Keyed by execution id; values hold the step cursor, prompt state and
// results. NOTE: state is lost on restart — rows left 'running' in the
// DB will not resume.
const activeExecutions = new Map();
|
|
|
|
|
|
|
|
|
|
// Execute workflow step by step.
/**
 * Load a workflow definition, seed the in-memory execution state, and
 * start the step loop. Any failure marks the execution 'failed'.
 * @param {string} workflowId
 * @param {string} executionId - row already inserted by the caller
 * @param {string} userId - username that started the run
 * @param {string|string[]} targetWorkers - 'all' or worker names
 */
async function executeWorkflow(workflowId, executionId, userId, targetWorkers = 'all') {
  try {
    // Get workflow definition
    const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflowId]);
    if (workflows.length === 0) {
      throw new Error('Workflow not found');
    }

    const workflow = workflows[0];
    // mysql2 typically returns JSON columns already parsed into objects,
    // in which case JSON.parse() would throw. Parse only when a string.
    const definition = typeof workflow.definition === 'string'
      ? JSON.parse(workflow.definition)
      : workflow.definition;

    // Initialize execution state
    const executionState = {
      id: executionId,
      workflowId: workflowId,
      currentStep: 0,
      steps: definition.steps || [],
      results: [],
      status: 'running',
      waitingForInput: false,
      targetWorkers: targetWorkers,
      userId: userId
    };

    activeExecutions.set(executionId, executionState);

    // Start executing steps
    await executeNextStep(executionId);

  } catch (error) {
    console.error('Workflow execution error:', error);
    await updateExecutionStatus(executionId, 'failed', error.message);
  }
}
|
|
|
|
|
|
|
|
|
|
// Execute the next step in a workflow.
// Recursive driver: each call runs the step at state.currentStep. The
// per-type handlers advance the cursor and schedule the next call
// (execute/wait) or pause until handleUserInput resumes (prompt);
// completion and failure are finalized here.
async function executeNextStep(executionId) {
  const state = activeExecutions.get(executionId);
  if (!state) return; // finished, failed, or never started

  // Check if we've completed all steps
  if (state.currentStep >= state.steps.length) {
    await updateExecutionStatus(executionId, 'completed');
    activeExecutions.delete(executionId);
    return;
  }

  const step = state.steps[state.currentStep];

  // Check if step has a condition; unmet conditions skip the step and
  // recurse immediately on the next one.
  if (step.condition && !evaluateCondition(step.condition, state)) {
    console.log(`[Workflow] Skipping step ${state.currentStep}: condition not met`);
    state.currentStep++;
    return executeNextStep(executionId);
  }

  console.log(`[Workflow] Executing step ${state.currentStep}: ${step.name}`);

  try {
    // Dispatch on step type.
    switch (step.type) {
      case 'execute':
        await executeCommandStep(executionId, step);
        break;
      case 'prompt':
        await executePromptStep(executionId, step);
        break;
      case 'wait':
        await executeWaitStep(executionId, step);
        break;
      default:
        throw new Error(`Unknown step type: ${step.type}`);
    }
  } catch (error) {
    // Record the failure, mark the execution failed, and drop the
    // in-memory state so no further steps run.
    await addExecutionLog(executionId, {
      step: state.currentStep,
      error: error.message,
      timestamp: new Date().toISOString()
    });
    await updateExecutionStatus(executionId, 'failed', error.message);
    activeExecutions.delete(executionId);
  }
}
|
|
|
|
|
|
|
|
|
|
// Execute a command on workers.
// Resolves the set of online workers, optionally filters by the step's
// `targets` list, pushes an execute_command message over WebSocket,
// logs the send, then advances after a fixed 1s delay.
async function executeCommandStep(executionId, step) {
  const state = activeExecutions.get(executionId);

  // Get target workers
  const [workers] = await pool.query(
    'SELECT * FROM workers WHERE status = "online"'
  );

  if (workers.length === 0) {
    throw new Error('No online workers available');
  }

  // Filter workers based on target ('all' or a list of worker names)
  let targetWorkers = workers;
  if (step.targets && step.targets[0] !== 'all') {
    targetWorkers = workers.filter(w => step.targets.includes(w.name));
  }

  // Send command to workers via WebSocket
  const commandMessage = {
    type: 'execute_command',
    execution_id: executionId,
    step_index: state.currentStep,
    command: step.command,
    timeout: step.timeout || 300000 // 5 min default
  };

  // Broadcast to target workers
  // NOTE(review): this sends to EVERY open socket (browsers included)
  // and the message carries no target list, so recipients cannot tell
  // whether they are addressed — confirm worker-side filtering.
  clients.forEach(client => {
    if (client.readyState === WebSocket.OPEN) {
      client.send(JSON.stringify(commandMessage));
    }
  });

  await addExecutionLog(executionId, {
    step: state.currentStep,
    action: 'command_sent',
    command: step.command,
    workers: targetWorkers.map(w => w.name),
    timestamp: new Date().toISOString()
  });

  // For now, move to next step immediately
  // In production, we'd wait for worker responses
  state.currentStep++;

  // Small delay to allow command to execute
  setTimeout(() => executeNextStep(executionId), 1000);
}
|
|
|
|
|
|
|
|
|
|
// Execute a user prompt step.
// Pauses the execution: marks the in-memory state as waiting for input,
// records the prompt in the execution logs, and notifies connected
// clients so the UI can ask the question.
async function executePromptStep(executionId, step) {
  const state = activeExecutions.get(executionId);

  const prompt = {
    message: step.message,
    options: step.options,
    step: state.currentStep
  };
  state.waitingForInput = true;
  state.promptData = prompt;

  const logEntry = {
    step: state.currentStep,
    action: 'waiting_for_input',
    prompt: step.message,
    timestamp: new Date().toISOString()
  };
  await addExecutionLog(executionId, logEntry);

  // Notify frontend that input is needed
  broadcast({
    type: 'execution_prompt',
    execution_id: executionId,
    prompt: state.promptData
  });
}
|
|
|
|
|
|
|
|
|
|
// Execute a wait/delay step.
// Logs the pause, advances the cursor, and schedules the next step
// after step.duration ms (default 1000).
async function executeWaitStep(executionId, step) {
  const state = activeExecutions.get(executionId);
  const pauseMs = step.duration || 1000;

  const entry = {
    step: state.currentStep,
    action: 'waiting',
    duration: pauseMs,
    timestamp: new Date().toISOString()
  };
  await addExecutionLog(executionId, entry);

  state.currentStep += 1;
  setTimeout(() => executeNextStep(executionId), pauseMs);
}
|
|
|
|
|
|
|
|
|
|
// Handle user input for prompts.
/**
 * Resume an execution that is paused on a prompt step.
 * @param {string} executionId
 * @param {*} response - the user's answer; stored as state.promptResponse
 *   for later condition evaluation.
 * @returns {boolean} true when the execution was waiting and is resumed;
 *   false for unknown ids or executions not awaiting input.
 */
function handleUserInput(executionId, response) {
  const state = activeExecutions.get(executionId);
  if (!state || !state.waitingForInput) {
    return false;
  }

  state.promptResponse = response;
  state.waitingForInput = false;
  state.currentStep++;

  // This function is deliberately synchronous so the HTTP route can
  // answer immediately; the two async calls below are fire-and-forget.
  // Attach .catch handlers so a DB failure surfaces in the logs instead
  // of becoming an unhandled promise rejection (the original left both
  // promises floating).
  addExecutionLog(executionId, {
    step: state.currentStep - 1,
    action: 'user_response',
    response: response,
    timestamp: new Date().toISOString()
  }).catch(err => console.error('Failed to log user response:', err));

  executeNextStep(executionId).catch(err => console.error('Failed to resume execution:', err));

  return true;
}
|
|
|
|
|
|
|
|
|
|
// Evaluate conditions
/**
 * Evaluate a workflow step condition against the execution state.
 * Supported forms (covering the conditions used by the example
 * workflows in this file):
 *   "promptResponse === 'Yes'"   (also ==)
 *   "promptResponse !== 'No'"    (also !=)
 *   "promptResponse"             (truthiness)
 * Anything else logs a warning and evaluates to false.
 *
 * SECURITY: the original implementation ran eval() on the condition
 * string, i.e. arbitrary server-side code execution for anyone able to
 * author a workflow definition. This constrained evaluator removes that.
 * @returns {boolean}
 */
function evaluateCondition(condition, state) {
  try {
    const promptResponse = state.promptResponse;
    const expr = String(condition).trim();

    if (expr === 'promptResponse') {
      return Boolean(promptResponse);
    }

    const match = expr.match(/^promptResponse\s*(===|!==|==|!=)\s*(['"])(.*)\2$/);
    if (match) {
      const equals = promptResponse === match[3];
      return (match[1] === '===' || match[1] === '==') ? equals : !equals;
    }

    console.error('Unsupported condition (treated as false):', expr);
    return false;
  } catch (error) {
    console.error('Condition evaluation error:', error);
    return false;
  }
}
|
|
|
|
|
|
|
|
|
|
// Helper functions

/**
 * Persist a status change for an execution and broadcast it.
 * Terminal statuses ('completed'/'failed') stamp completed_at with now;
 * any other status writes completed_at as NULL. When `error` is given
 * it is appended to the execution's logs first.
 * NOTE(review): `updates.status` is set but the query binds `status`
 * directly — the object effectively only carries completed_at.
 */
async function updateExecutionStatus(executionId, status, error = null) {
  const updates = { status };
  if (status === 'completed' || status === 'failed') {
    updates.completed_at = new Date();
  }
  if (error) {
    // Add error to logs
    await addExecutionLog(executionId, { error, timestamp: new Date().toISOString() });
  }

  await pool.query(
    'UPDATE executions SET status = ?, completed_at = ? WHERE id = ?',
    [status, updates.completed_at || null, executionId]
  );

  broadcast({
    type: 'execution_status',
    execution_id: executionId,
    status: status
  });
}
|
|
|
|
|
|
|
|
|
|
/**
 * Append one entry to an execution's JSON `logs` column.
 * Read-modify-write; silently a no-op when the execution row is missing.
 * @param {string} executionId
 * @param {object} logEntry - JSON-serializable log entry
 */
async function addExecutionLog(executionId, logEntry) {
  const [rows] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
  if (rows.length === 0) return;

  // mysql2 typically returns JSON columns already parsed (an array
  // here), and JSON.parse() on that value would throw — parse only
  // when we actually received a string.
  const raw = rows[0].logs;
  const logs = typeof raw === 'string' ? JSON.parse(raw || '[]') : (raw || []);
  logs.push(logEntry);

  await pool.query('UPDATE executions SET logs = ? WHERE id = ?', [
    JSON.stringify(logs),
    executionId
  ]);
}
|
|
|
|
|
|
|
|
|
|
// ============================================
// API ROUTES - Add these to your server.js
// ============================================

// Start workflow execution
// NOTE(review): SHADOWED ROUTE — an earlier app.post('/api/executions')
// is registered above, and Express dispatches to the first match, so
// this handler never runs. It is, however, the only version that starts
// the engine (executeWorkflow); the two registrations should be
// consolidated into one.
app.post('/api/executions', authenticateSSO, async (req, res) => {
  try {
    const { workflow_id, target_workers } = req.body;
    const id = generateUUID();

    await pool.query(
      'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
      [id, workflow_id, 'running', req.user.username, JSON.stringify([])]
    );

    // Start execution (fire-and-forget; executeWorkflow handles errors)
    executeWorkflow(workflow_id, id, req.user.username, target_workers || 'all');

    broadcast({ type: 'execution_started', execution_id: id, workflow_id });
    res.json({ id, workflow_id, status: 'running' });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
|
|
|
|
|
|
|
|
|
|
// Respond to workflow prompt
// Feeds the user's answer into the in-memory engine; 400 when the
// execution is unknown or not currently waiting for input.
app.post('/api/executions/:id/respond', authenticateSSO, async (req, res) => {
  try {
    const { response } = req.body;
    const accepted = handleUserInput(req.params.id, response);

    if (!accepted) {
      return res.status(400).json({ error: 'Execution not waiting for input' });
    }
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
|
|
|
|
|
|
|
|
|
|
// Get execution details with logs
// NOTE(review): SHADOWED ROUTE — an earlier GET /api/executions/:id is
// registered above and wins, so this richer version (parsed logs plus
// live prompt state) never runs. Also note JSON.parse(execution.logs)
// will throw if mysql2 returns the JSON column pre-parsed — verify
// driver behavior before consolidating the two routes.
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Not found' });
    }

    const execution = rows[0];
    const state = activeExecutions.get(req.params.id);

    res.json({
      ...execution,
      logs: JSON.parse(execution.logs || '[]'),
      waiting_for_input: state?.waitingForInput || false,
      prompt: state?.promptData || null
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
|
|
|
|
|
|
|
|
|
|
// Delete worker (admin only)
app.delete('/api/workers/:id', authenticateSSO, async (req, res) => {
  try {
    if (!req.user.isAdmin) {
      return res.status(403).json({ error: 'Admin access required' });
    }

    const workerId = req.params.id;
    await pool.query('DELETE FROM workers WHERE id = ?', [workerId]);
    res.json({ success: true });
    broadcast({ type: 'worker_deleted', worker_id: workerId });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
|
|
|
|
|
|
|
|
|
|
// Send direct command to specific worker (for testing).
// Validates that a command string is present, then pushes an
// execute_command message (tagged with worker_id so the targeted worker
// can filter) to every open WebSocket.
app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
  try {
    const { command } = req.body;

    // Reject empty/missing commands instead of broadcasting a no-op.
    if (!command || typeof command !== 'string') {
      return res.status(400).json({ error: 'command is required' });
    }

    const executionId = generateUUID();

    // Send command via WebSocket
    const commandMessage = {
      type: 'execute_command',
      execution_id: executionId,
      command: command,
      worker_id: req.params.id,
      timeout: 60000
    };

    clients.forEach(client => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(JSON.stringify(commandMessage));
      }
    });

    res.json({ success: true, execution_id: executionId });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
|
|
|
|
|
|
|
|
|
|
// ============================================
// EXAMPLE WORKFLOW DEFINITIONS
// ============================================

// Example 1: Simple command execution
// Demonstrates the three step types the engine supports: execute,
// prompt (pauses for user input), and a conditional execute gated on
// the previous prompt response.
// NOTE(review): this const is not referenced anywhere in this file —
// kept as documentation of the expected definition shape.
const simpleWorkflow = {
  name: "Update System Packages",
  description: "Update all packages on target servers",
  steps: [
    {
      name: "Update package list",
      type: "execute",
      targets: ["all"],
      command: "apt update"
    },
    {
      name: "User Approval",
      type: "prompt",
      message: "Packages updated. Proceed with upgrade?",
      options: ["Yes", "No"]
    },
    {
      // Runs only if the user answered "Yes" to the prompt above.
      name: "Upgrade packages",
      type: "execute",
      targets: ["all"],
      command: "apt upgrade -y",
      condition: "promptResponse === 'Yes'"
    }
  ]
};
|
|
|
|
|
|
|
|
|
|
// Example 2: Complex workflow with conditions
// Adds a 'wait' step between backup creation and verification.
// NOTE(review): this const is not referenced anywhere in this file —
// kept as documentation of the expected definition shape.
const backupWorkflow = {
  name: "Backup and Verify",
  description: "Create backup and verify integrity",
  steps: [
    {
      name: "Create backup",
      type: "execute",
      targets: ["all"],
      command: "tar -czf /tmp/backup-$(date +%Y%m%d).tar.gz /opt/pulse-worker"
    },
    {
      // Fixed pause (ms) before verification.
      name: "Wait for backup",
      type: "wait",
      duration: 5000
    },
    {
      name: "Verify backup",
      type: "execute",
      targets: ["all"],
      command: "tar -tzf /tmp/backup-*.tar.gz > /dev/null && echo 'Backup OK' || echo 'Backup FAILED'"
    },
    {
      name: "Cleanup decision",
      type: "prompt",
      message: "Backup complete. Delete old backups?",
      options: ["Yes", "No", "Cancel"]
    },
    {
      // Runs only if the user answered "Yes" above.
      name: "Cleanup old backups",
      type: "execute",
      targets: ["all"],
      command: "find /tmp -name 'backup-*.tar.gz' -mtime +7 -delete",
      condition: "promptResponse === 'Yes'"
    }
  ]
};
|