Files
pulse/server.js
Jared Vititoe 2f941d0b6f Fix: Remove duplicate old workflow execution code
Removed old/obsolete workflow execution system that was conflicting
with the new executeWorkflowSteps() engine:

Removed:
- activeExecutions Map (old tracking system)
- executeWorkflow() - old workflow executor
- executeNextStep() - old step processor
- executeCommandStep() - old command executor (duplicate)
- handleUserInput() - unimplemented prompt handler
- Duplicate app.post('/api/executions') endpoint
- app.post('/api/executions/:id/respond') endpoint

This was causing "Cannot read properties of undefined (reading 'target')"
error because the old code was being called instead of the new engine.

The new executeWorkflowSteps() engine is now the only workflow system.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 23:24:42 -05:00

1078 lines
33 KiB
JavaScript

const express = require('express');
const http = require('http');
const WebSocket = require('ws');
const mysql = require('mysql2/promise');
const crypto = require('crypto');
require('dotenv').config();
// Express application, the HTTP server that hosts it, and a WebSocket server
// attached to that same HTTP server.
const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });
// Middleware: parse JSON request bodies and serve static files from ./public
app.use(express.json());
app.use(express.static('public'));
/**
 * Produce a fresh random UUID string using the crypto module.
 * @returns {string} A newly generated UUID.
 */
function generateUUID() {
  const uuid = crypto.randomUUID();
  return uuid;
}
// Database pool
// Shared mysql2 connection pool used by every query in this file.
// Connection settings are read from environment variables.
const pool = mysql.createPool({
host: process.env.DB_HOST,
port: process.env.DB_PORT || 3306, // falls back to 3306 when DB_PORT is unset or empty
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0 // presumably "no limit on queued requests" - confirm against the mysql2 pool options
});
/**
 * Initialize database tables.
 *
 * Takes one connection from the pool and issues a CREATE TABLE IF NOT EXISTS
 * statement for each of: users, workers, workflows, executions and
 * scheduled_commands. The connection is always released afterwards.
 *
 * @returns {Promise<void>} Resolves once every statement has run.
 * @throws Re-throws any error raised while creating the tables (after logging it).
 */
async function initDatabase() {
const connection = await pool.getConnection();
try {
// Users seen through SSO (written by authenticateSSO)
await connection.query(`
CREATE TABLE IF NOT EXISTS users (
id VARCHAR(36) PRIMARY KEY,
username VARCHAR(255) UNIQUE NOT NULL,
display_name VARCHAR(255),
email VARCHAR(255),
groups TEXT,
last_login TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`);
// Database schema is managed manually - migrations removed after direct database fixes
// Registered workers and their last reported heartbeat
await connection.query(`
CREATE TABLE IF NOT EXISTS workers (
id VARCHAR(36) PRIMARY KEY,
name VARCHAR(255) UNIQUE NOT NULL,
status VARCHAR(50) NOT NULL,
last_heartbeat TIMESTAMP NULL,
api_key VARCHAR(255),
metadata JSON,
INDEX idx_status (status),
INDEX idx_heartbeat (last_heartbeat)
)
`);
// Workflow definitions (the step list is stored in the JSON `definition` column)
await connection.query(`
CREATE TABLE IF NOT EXISTS workflows (
id VARCHAR(36) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
definition JSON NOT NULL,
created_by VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_name (name)
)
`);
// Execution records; workflow_id is nullable because quick and scheduled commands
// in this file insert rows with a NULL workflow_id
await connection.query(`
CREATE TABLE IF NOT EXISTS executions (
id VARCHAR(36) PRIMARY KEY,
workflow_id VARCHAR(36) NULL,
status VARCHAR(50) NOT NULL,
started_by VARCHAR(255),
started_at TIMESTAMP NULL,
completed_at TIMESTAMP NULL,
logs JSON,
INDEX idx_workflow (workflow_id),
INDEX idx_status (status),
INDEX idx_started (started_at)
)
`);
// Commands run on a schedule by processScheduledCommands
await connection.query(`
CREATE TABLE IF NOT EXISTS scheduled_commands (
id VARCHAR(36) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
command TEXT NOT NULL,
worker_ids JSON NOT NULL,
schedule_type VARCHAR(50) NOT NULL,
schedule_value VARCHAR(255) NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
created_by VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_run TIMESTAMP NULL,
next_run TIMESTAMP NULL,
INDEX idx_enabled (enabled),
INDEX idx_next_run (next_run)
)
`);
console.log('Database tables initialized successfully');
} catch (error) {
console.error('Database initialization error:', error);
throw error;
} finally {
// Always hand the connection back to the pool, on success or failure
connection.release();
}
}
/**
 * Delete finished executions that are older than the retention window.
 *
 * The window is read from EXECUTION_RETENTION_DAYS and defaults to 30 days.
 * Only rows whose status is 'completed' or 'failed' are removed. Errors are
 * logged and swallowed so a failed cleanup never takes the process down.
 *
 * @returns {Promise<void>}
 */
async function cleanupOldExecutions() {
  try {
    const configuredDays = Number.parseInt(process.env.EXECUTION_RETENTION_DAYS, 10);
    // Missing, non-numeric, zero or negative settings fall back to 30 days. A negative
    // interval would move the cutoff into the future and delete every finished execution.
    const retentionDays = configuredDays > 0 ? configuredDays : 30;
    const [result] = await pool.query(
      [
        'DELETE FROM executions',
        "WHERE status IN ('completed', 'failed')",
        'AND started_at < DATE_SUB(NOW(), INTERVAL ? DAY)'
      ].join('\n'),
      [retentionDays]
    );
    console.log(`[Cleanup] Removed ${result.affectedRows} executions older than ${retentionDays} days`);
  } catch (error) {
    console.error('[Cleanup] Error removing old executions:', error);
  }
}
// Run cleanup every 24 hours, measured from process start (the interval is not aligned to a clock time)
setInterval(cleanupOldExecutions, 24 * 60 * 60 * 1000);
// Run cleanup once at module load.
// NOTE(review): this call happens before initDatabase() (invoked further down) has completed;
// any resulting query error is caught and logged inside cleanupOldExecutions.
cleanupOldExecutions();
/**
 * Scheduled Commands Processor.
 *
 * Loads every enabled schedule whose next_run is NULL or in the past and, for each
 * targeted worker with an open WebSocket, records an execution row and sends the
 * command. Afterwards the schedule's last_run / next_run are updated.
 *
 * Each schedule is processed in its own try/catch so that one broken schedule
 * (for example malformed worker_ids) cannot prevent the remaining ones from running.
 *
 * @returns {Promise<void>} Never rejects; all errors are logged.
 */
async function processScheduledCommands() {
  try {
    const [schedules] = await pool.query(
      [
        'SELECT * FROM scheduled_commands',
        'WHERE enabled = TRUE',
        'AND (next_run IS NULL OR next_run <= NOW())'
      ].join('\n')
    );
    for (const schedule of schedules) {
      try {
        console.log(`[Scheduler] Running scheduled command: ${schedule.name}`);
        // The driver may hand back the JSON column either as text or already parsed
        const workerIds = typeof schedule.worker_ids === 'string'
          ? JSON.parse(schedule.worker_ids)
          : schedule.worker_ids;
        if (!Array.isArray(workerIds)) {
          throw new TypeError(`worker_ids for schedule ${schedule.id} is not an array`);
        }
        // Execute command on each worker that is currently connected
        for (const workerId of workerIds) {
          const workerWs = workers.get(workerId);
          if (workerWs && workerWs.readyState === WebSocket.OPEN) {
            const executionId = generateUUID();
            // Create execution record
            await pool.query(
              'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
              [executionId, null, 'running', `scheduler:${schedule.name}`, JSON.stringify([{
                step: 'scheduled_command',
                action: 'command_sent',
                worker_id: workerId,
                command: schedule.command,
                timestamp: new Date().toISOString()
              }])]
            );
            // Send command to worker
            workerWs.send(JSON.stringify({
              type: 'execute_command',
              execution_id: executionId,
              command: schedule.command,
              worker_id: workerId,
              timeout: 300000 // 5 minute timeout for scheduled commands
            }));
            broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
          }
        }
        // Update last_run and calculate next_run
        const nextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value);
        await pool.query(
          'UPDATE scheduled_commands SET last_run = NOW(), next_run = ? WHERE id = ?',
          [nextRun, schedule.id]
        );
      } catch (error) {
        // Isolate the failure to this schedule and carry on with the next one
        console.error(`[Scheduler] Error running scheduled command ${schedule.name}:`, error);
      }
    }
  } catch (error) {
    console.error('[Scheduler] Error processing scheduled commands:', error);
  }
}
/**
 * Compute when a schedule should fire next, relative to the current time.
 *
 * Supported schedule types:
 *  - 'interval': scheduleValue is a number of minutes
 *  - 'hourly':   scheduleValue is a number of hours
 *  - 'daily':    scheduleValue is a local time of day as "HH:MM"
 *
 * @param {string} scheduleType - One of 'interval', 'daily', 'hourly'.
 * @param {string|number} scheduleValue - Value interpreted according to scheduleType.
 * @returns {Date|null} The next run time, or null when the type is unknown or the
 *   value cannot be turned into a valid future time (instead of an Invalid Date).
 */
function calculateNextRun(scheduleType, scheduleValue) {
  const now = new Date();
  const value = String(scheduleValue ?? '').trim();

  if (scheduleType === 'interval' || scheduleType === 'hourly') {
    const amount = Number.parseInt(value, 10);
    // Reject NaN, zero and negative amounts: they can never produce a future run time
    if (!(amount > 0)) {
      return null;
    }
    const unitMs = scheduleType === 'interval' ? 60000 : 3600000;
    return new Date(now.getTime() + amount * unitMs);
  }

  if (scheduleType === 'daily') {
    // Daily at HH:MM
    const [hours, minutes] = value.split(':').map(Number);
    const isValidTime = Number.isInteger(hours) && Number.isInteger(minutes) &&
      hours >= 0 && hours <= 23 && minutes >= 0 && minutes <= 59;
    if (!isValidTime) {
      return null;
    }
    const next = new Date(now);
    next.setHours(hours, minutes, 0, 0);
    // If time has passed today, schedule for tomorrow
    if (next <= now) {
      next.setDate(next.getDate() + 1);
    }
    return next;
  }

  return null;
}
// Run scheduler every minute
setInterval(processScheduledCommands, 60 * 1000);
// Initial run 5 seconds after startup
setTimeout(processScheduledCommands, 5000);
// WebSocket connections
const clients = new Set(); // every open WebSocket connection (added on connect, removed on close)
const workers = new Map(); // Map worker_id -> WebSocket connection
// Handle every new WebSocket connection. The socket is tracked in `clients` for
// broadcasts, and worker messages (command_result, workflow_result, worker_connect,
// pong) are processed here.
// NOTE(review): nothing in this handler authenticates the socket, so any connected peer
// can send worker messages such as command_result - confirm that is acceptable.
wss.on('connection', (ws) => {
  clients.add(ws);

  // Handle incoming messages from workers
  ws.on('message', async (data) => {
    try {
      const message = JSON.parse(data.toString());
      console.log('WebSocket message received:', message.type);

      if (message.type === 'command_result') {
        // Handle command result from worker
        const { execution_id, command_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
        // Add result to execution logs
        await addExecutionLog(execution_id, {
          step: 'command_execution',
          action: 'command_result',
          command_id: command_id, // Include command_id for workflow tracking
          worker_id: worker_id,
          success: success,
          stdout: stdout,
          stderr: stderr,
          duration: duration,
          timestamp: timestamp || new Date().toISOString()
        });
        // For non-workflow executions, update status immediately
        // For workflow executions, the workflow engine will update status
        const [execution] = await pool.query('SELECT workflow_id FROM executions WHERE id = ?', [execution_id]);
        if (execution.length > 0 && !execution[0].workflow_id) {
          // Only update status for quick commands (no workflow_id)
          const finalStatus = success ? 'completed' : 'failed';
          await updateExecutionStatus(execution_id, finalStatus);
        }
        // Broadcast result to all connected clients
        broadcast({
          type: 'command_result',
          execution_id: execution_id,
          worker_id: worker_id,
          success: success,
          stdout: stdout,
          stderr: stderr
        });
        console.log(`Command result received for execution ${execution_id}: ${success ? 'success' : 'failed'}`);
      }

      if (message.type === 'workflow_result') {
        // Handle workflow result from worker
        const { execution_id, worker_id, success, message: resultMessage, timestamp } = message;
        // Add final result to logs
        await addExecutionLog(execution_id, {
          step: 'workflow_completion',
          action: 'workflow_result',
          worker_id: worker_id,
          success: success,
          message: resultMessage,
          timestamp: timestamp || new Date().toISOString()
        });
        // Update execution status
        const finalStatus = success ? 'completed' : 'failed';
        await updateExecutionStatus(execution_id, finalStatus);
        // Broadcast completion to all clients
        broadcast({
          type: 'workflow_result',
          execution_id: execution_id,
          status: finalStatus,
          success: success,
          message: resultMessage
        });
        console.log(`Workflow result received for execution ${execution_id}: ${finalStatus}`);
      }

      if (message.type === 'worker_connect') {
        // Handle worker connection
        const { worker_id, worker_name } = message;
        console.log(`Worker connected: ${worker_name} (${worker_id})`);
        // Find the database worker ID by name
        const [dbWorkers] = await pool.query(
          'SELECT id FROM workers WHERE name = ?',
          [worker_name]
        );
        if (dbWorkers.length > 0) {
          const dbWorkerId = dbWorkers[0].id;
          // Store worker WebSocket connection using BOTH IDs
          workers.set(worker_id, ws); // Runtime ID
          workers.set(dbWorkerId, ws); // Database ID
          // Store mapping for cleanup
          ws.workerId = worker_id;
          ws.dbWorkerId = dbWorkerId;
          console.log(`Mapped worker: runtime_id=${worker_id}, db_id=${dbWorkerId}, name=${worker_name}`);
          // Update worker status to online
          await pool.query(
            `UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`,
            [dbWorkerId]
          );
          // Broadcast worker status update with database ID
          broadcast({
            type: 'worker_update',
            worker_id: dbWorkerId,
            status: 'online'
          });
        } else {
          console.log(`Worker ${worker_name} not found in database, will be created on heartbeat`);
        }
      }

      if (message.type === 'pong') {
        // Handle worker pong response.
        // Prefer the database ID recorded at worker_connect: the ID a worker reports is its
        // runtime ID, which is not necessarily the primary key of its row in `workers`.
        const heartbeatWorkerId = ws.dbWorkerId ?? message.worker_id;
        await pool.query(
          `UPDATE workers SET last_heartbeat=NOW() WHERE id=?`,
          [heartbeatWorkerId]
        );
      }
    } catch (error) {
      console.error('WebSocket message error:', error);
    }
  });

  ws.on('close', () => {
    clients.delete(ws);
    // Remove worker from workers map when disconnected (both runtime and db IDs).
    // Only drop a mapping that still points at this socket: a worker that has already
    // reconnected owns a newer socket under the same ID and must stay reachable.
    if (ws.workerId) {
      if (workers.get(ws.workerId) === ws) {
        workers.delete(ws.workerId);
      }
      console.log(`Worker ${ws.workerId} (runtime ID) disconnected`);
    }
    if (ws.dbWorkerId) {
      if (workers.get(ws.dbWorkerId) === ws) {
        workers.delete(ws.dbWorkerId);
      }
      console.log(`Worker ${ws.dbWorkerId} (database ID) disconnected`);
    }
  });
});
/**
 * Send a JSON-serialised payload to every tracked socket that is currently open.
 * Sockets in any other state are skipped.
 * @param {*} data - Value to serialise with JSON.stringify and send.
 */
function broadcast(data) {
  for (const socket of clients) {
    if (socket.readyState !== WebSocket.OPEN) {
      continue;
    }
    socket.send(JSON.stringify(data));
  }
}
/**
 * Authelia SSO middleware.
 *
 * Reads the Remote-User / Remote-Name / Remote-Email / Remote-Groups request headers,
 * answers 401 when no user header is present and 403 when the user is in neither the
 * 'admin' nor the 'employee' group. Otherwise the user row is upserted (failures are
 * only logged), `req.user` is populated and `next()` is called.
 *
 * @param {object} req - Express request; `req.user` is set on success.
 * @param {object} res - Express response.
 * @param {Function} next - Express continuation callback.
 */
async function authenticateSSO(req, res, next) {
  const {
    'remote-user': remoteUser,
    'remote-name': remoteName,
    'remote-email': remoteEmail,
    'remote-groups': remoteGroups
  } = req.headers;

  // No user header means the request did not come through the SSO proxy
  if (!remoteUser) {
    return res.status(401).json({
      error: 'Not authenticated',
      message: 'Please access this service through auth.lotusguild.org'
    });
  }

  // Membership in at least one allowed group is required
  const allowedGroups = ['admin', 'employee'];
  const groups = remoteGroups ? remoteGroups.split(',').map((entry) => entry.trim()) : [];
  const hasAccess = groups.some((group) => allowedGroups.includes(group));
  if (!hasAccess) {
    return res.status(403).json({
      error: 'Access denied',
      message: 'You must be in admin or employee group'
    });
  }

  // Best-effort upsert of the user record; a database failure does not block the request
  try {
    const upsertSql = [
      'INSERT INTO users (id, username, display_name, email, groups, last_login)',
      'VALUES (?, ?, ?, ?, ?, NOW())',
      'ON DUPLICATE KEY UPDATE',
      'display_name=VALUES(display_name),',
      'email=VALUES(email),',
      'groups=VALUES(groups),',
      'last_login=NOW()'
    ].join('\n');
    const userId = generateUUID();
    await pool.query(upsertSql, [userId, remoteUser, remoteName, remoteEmail, remoteGroups]);
  } catch (error) {
    console.error('Error updating user:', error);
  }

  // Expose the caller's identity to downstream handlers
  req.user = {
    username: remoteUser,
    name: remoteName || remoteUser,
    email: remoteEmail || '',
    groups: groups,
    isAdmin: groups.includes('admin')
  };
  next();
}
/**
 * Workflow Execution Engine.
 *
 * Walks the steps of a workflow definition in order. For every step a start entry is
 * logged and broadcast, the step is carried out according to its `type`
 * ('execute', 'wait', 'prompt'; other types do nothing), and a completion entry is
 * logged and broadcast. A failed 'execute' step ends the run immediately. The
 * execution is finally marked 'completed' or 'failed'.
 *
 * @param {string} executionId - ID of the execution row being driven.
 * @param {string} workflowId - ID of the workflow (used for logging only).
 * @param {object} definition - Workflow definition; `definition.steps` is the step list.
 * @param {string} username - User who started the run (not used by this function).
 * @returns {Promise<void>}
 */
async function executeWorkflowSteps(executionId, workflowId, definition, username) {
  const isoNow = () => new Date().toISOString();
  try {
    console.log(`[Workflow] Starting execution ${executionId} for workflow ${workflowId}`);
    const steps = definition.steps || [];
    let allStepsSucceeded = true;

    for (let index = 0; index < steps.length; index += 1) {
      const step = steps[index];
      const stepNumber = index + 1;
      console.log(`[Workflow] Execution ${executionId} - Step ${stepNumber}/${steps.length}: ${step.name}`);

      // Announce the start of the step
      await addExecutionLog(executionId, {
        step: stepNumber,
        step_name: step.name,
        action: 'step_started',
        timestamp: isoNow()
      });
      broadcast({
        type: 'workflow_step_started',
        execution_id: executionId,
        step: stepNumber,
        step_name: step.name
      });

      let stepFailed = false;
      switch (step.type) {
        case 'execute':
          // Run the command on the targeted workers
          stepFailed = !(await executeCommandStep(executionId, step, stepNumber));
          break;
        case 'wait': {
          // Pause for the configured number of seconds (5 when unset)
          const seconds = step.duration || 5;
          await addExecutionLog(executionId, {
            step: stepNumber,
            action: 'waiting',
            duration: seconds,
            timestamp: isoNow()
          });
          await new Promise((resolve) => setTimeout(resolve, seconds * 1000));
          break;
        }
        case 'prompt':
          // Interactive prompts are not supported; record that the step was skipped
          await addExecutionLog(executionId, {
            step: stepNumber,
            action: 'prompt_skipped',
            message: 'Interactive prompts not yet supported',
            timestamp: isoNow()
          });
          break;
        default:
          break;
      }

      if (stepFailed) {
        // Stop the workflow on failure, without a completion entry for this step
        allStepsSucceeded = false;
        break;
      }

      // Announce the completion of the step
      await addExecutionLog(executionId, {
        step: stepNumber,
        step_name: step.name,
        action: 'step_completed',
        timestamp: isoNow()
      });
      broadcast({
        type: 'workflow_step_completed',
        execution_id: executionId,
        step: stepNumber,
        step_name: step.name
      });
    }

    // Record the overall outcome
    const finalStatus = allStepsSucceeded ? 'completed' : 'failed';
    await updateExecutionStatus(executionId, finalStatus);
    console.log(`[Workflow] Execution ${executionId} finished with status: ${finalStatus}`);
  } catch (error) {
    console.error(`[Workflow] Execution ${executionId} error:`, error);
    await addExecutionLog(executionId, {
      action: 'workflow_error',
      error: error.message,
      timestamp: isoNow()
    });
    await updateExecutionStatus(executionId, 'failed');
  }
}
/**
 * Run one 'execute' workflow step: send the step's command to each targeted worker
 * in turn and wait for every result before moving on.
 *
 * Targets default to ['all'] (every worker whose status is 'online'); otherwise each
 * target is matched against a worker ID first and a worker name second.
 *
 * @param {string} executionId - Execution the step belongs to.
 * @param {object} step - Step definition; reads `step.command` and `step.targets`.
 * @param {number} stepNumber - 1-based step index used in log entries.
 * @returns {Promise<boolean>} true only when the command was actually sent to at least
 *   one worker and every worker that received it reported success. false when no
 *   worker matched, when every matched worker was offline, when any command failed or
 *   timed out, or when an error occurred.
 */
async function executeCommandStep(executionId, step, stepNumber) {
  try {
    const command = step.command;
    const targets = step.targets || ['all'];
    // Determine which workers to target
    let targetWorkerIds = [];
    if (targets.includes('all')) {
      // Get all online workers
      const [onlineWorkers] = await pool.query('SELECT id FROM workers WHERE status = ?', ['online']);
      targetWorkerIds = onlineWorkers.map(w => w.id);
    } else {
      // Specific worker IDs or names
      for (const target of targets) {
        // Try to find by ID first, then by name
        const [workerById] = await pool.query('SELECT id FROM workers WHERE id = ?', [target]);
        if (workerById.length > 0) {
          targetWorkerIds.push(workerById[0].id);
        } else {
          const [workerByName] = await pool.query('SELECT id FROM workers WHERE name = ?', [target]);
          if (workerByName.length > 0) {
            targetWorkerIds.push(workerByName[0].id);
          }
        }
      }
    }
    if (targetWorkerIds.length === 0) {
      await addExecutionLog(executionId, {
        step: stepNumber,
        action: 'no_workers',
        message: 'No workers available for this step',
        timestamp: new Date().toISOString()
      });
      return false;
    }
    // Execute command on each target worker and wait for results
    let dispatchedCount = 0;
    for (const workerId of targetWorkerIds) {
      const workerWs = workers.get(workerId);
      if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
        await addExecutionLog(executionId, {
          step: stepNumber,
          action: 'worker_offline',
          worker_id: workerId,
          timestamp: new Date().toISOString()
        });
        continue;
      }
      // Send command to worker
      const commandId = generateUUID();
      await addExecutionLog(executionId, {
        step: stepNumber,
        action: 'command_sent',
        worker_id: workerId,
        command: command,
        command_id: commandId,
        timestamp: new Date().toISOString()
      });
      workerWs.send(JSON.stringify({
        type: 'execute_command',
        execution_id: executionId,
        command_id: commandId,
        command: command,
        worker_id: workerId,
        timeout: 120000 // 2 minute timeout
      }));
      dispatchedCount += 1;
      // Wait for command result (with timeout)
      const result = await waitForCommandResult(executionId, commandId, 120000);
      if (!result.success) {
        // Command failed, workflow should stop
        return false;
      }
    }
    if (dispatchedCount === 0) {
      // Every targeted worker was offline: nothing ran, so the step must not count as a success
      await addExecutionLog(executionId, {
        step: stepNumber,
        action: 'no_workers',
        message: 'No connected workers available for this step',
        timestamp: new Date().toISOString()
      });
      return false;
    }
    // All dispatched commands succeeded
    return true;
  } catch (error) {
    console.error(`[Workflow] Error executing command step:`, error);
    await addExecutionLog(executionId, {
      step: stepNumber,
      action: 'step_error',
      error: error.message,
      timestamp: new Date().toISOString()
    });
    return false;
  }
}
/**
 * Poll an execution's logs until a 'command_result' entry for the given command
 * appears, or until the timeout elapses.
 *
 * Polls run strictly one after another (the next poll is only scheduled once the
 * previous query has finished), so a slow database cannot cause overlapping queries.
 * An execution whose logs are still empty or NULL is treated as "no result yet".
 *
 * @param {string} executionId - Execution whose logs are inspected.
 * @param {string} commandId - Command ID to look for.
 * @param {number} timeout - Maximum time to wait, in milliseconds.
 * @param {number} [pollIntervalMs=500] - Delay before each poll, in milliseconds.
 * @returns {Promise<{success: boolean, stdout?: string, stderr?: string, error?: string}>}
 *   The worker's result, or `{ success: false, error }` on timeout or query failure.
 *   The returned promise never rejects.
 */
async function waitForCommandResult(executionId, commandId, timeout, pollIntervalMs = 500) {
  const startTime = Date.now();
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  try {
    for (;;) {
      await pause(pollIntervalMs);
      // Check if we've received the command result in logs
      const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
      if (execution.length > 0) {
        const rawLogs = execution[0].logs;
        const logs = typeof rawLogs === 'string' ? JSON.parse(rawLogs) : rawLogs;
        const resultLog = Array.isArray(logs)
          ? logs.find(log => log.command_id === commandId && log.action === 'command_result')
          : undefined;
        if (resultLog) {
          return {
            success: resultLog.success,
            stdout: resultLog.stdout,
            stderr: resultLog.stderr
          };
        }
      }
      // Check timeout
      if (Date.now() - startTime > timeout) {
        return {
          success: false,
          error: 'Command timeout'
        };
      }
    }
  } catch (error) {
    console.error('[Workflow] Error checking command result:', error);
    return {
      success: false,
      error: error.message
    };
  }
}
// Routes - protected by SSO unless noted otherwise on the individual route
// Return the identity that authenticateSSO attached to the request
app.get('/api/user', authenticateSSO, (req, res) => {
  const { user } = req;
  res.json(user);
});
// List all workflows, most recently created first
app.get('/api/workflows', authenticateSSO, async (req, res) => {
  try {
    const queryResult = await pool.query('SELECT * FROM workflows ORDER BY created_at DESC');
    const workflowRows = queryResult[0];
    return void res.json(workflowRows);
  } catch (error) {
    const payload = { error: error.message };
    res.status(500).json(payload);
  }
});
// Create a workflow. Requires a non-empty `name` and a `definition`; `description`
// is optional. Invalid input is rejected with 400 instead of surfacing as a database
// error, matching the validation done by POST /api/scheduled-commands.
app.post('/api/workflows', authenticateSSO, async (req, res) => {
  try {
    const { name, description, definition } = req.body || {};
    const hasName = typeof name === 'string' && name.trim() !== '';
    // typeof null is 'object', so null must be excluded explicitly
    const hasDefinition = typeof definition === 'object' && definition !== null;
    if (!hasName || !hasDefinition) {
      return res.status(400).json({ error: 'Missing required fields' });
    }
    const id = generateUUID();
    await pool.query(
      'INSERT INTO workflows (id, name, description, definition, created_by) VALUES (?, ?, ?, ?, ?)',
      [id, name, description ?? null, JSON.stringify(definition), req.user.username]
    );
    res.json({ id, name, description, definition });
    broadcast({ type: 'workflow_created', workflow_id: id });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Delete a workflow by ID (admins only) and notify connected clients
app.delete('/api/workflows/:id', authenticateSSO, async (req, res) => {
  try {
    const callerIsAdmin = req.user.isAdmin;
    if (!callerIsAdmin) {
      return res.status(403).json({ error: 'Admin access required' });
    }
    const workflowId = req.params.id;
    await pool.query('DELETE FROM workflows WHERE id = ?', [workflowId]);
    res.json({ success: true });
    broadcast({ type: 'workflow_deleted', workflow_id: workflowId });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// List all workers ordered by name.
// The stored api_key is removed from every row before responding: the heartbeat route
// writes the shared worker API key into that column, and it must not be exposed to
// every authenticated user.
app.get('/api/workers', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM workers ORDER BY name');
    const sanitized = rows.map(({ api_key, ...worker }) => worker);
    res.json(sanitized);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Worker heartbeat (not SSO-protected; authenticated with the X-API-Key header).
// Registers the worker if needed and marks it online.
app.post('/api/workers/heartbeat', async (req, res) => {
  try {
    const { worker_id, name, metadata } = req.body || {};
    const apiKey = req.headers['x-api-key'];
    // Verify API key. An unset WORKER_API_KEY must reject everything: with a plain
    // `!==` comparison a request without the header would match the missing key.
    // The comparison is constant-time to avoid leaking the key through timing.
    const expectedBytes = Buffer.from(process.env.WORKER_API_KEY || '');
    const providedBytes = Buffer.from(typeof apiKey === 'string' ? apiKey : '');
    const keyIsValid = expectedBytes.length > 0 &&
      providedBytes.length === expectedBytes.length &&
      crypto.timingSafeEqual(providedBytes, expectedBytes);
    if (!keyIsValid) {
      return res.status(401).json({ error: 'Invalid API key' });
    }
    if (!worker_id || !name) {
      return res.status(400).json({ error: 'Missing required fields' });
    }
    await pool.query(
      [
        'INSERT INTO workers (id, name, status, last_heartbeat, api_key, metadata)',
        "VALUES (?, ?, 'online', NOW(), ?, ?)",
        'ON DUPLICATE KEY UPDATE',
        "status='online',",
        'last_heartbeat=NOW(),',
        'metadata=VALUES(metadata)'
      ].join('\n'),
      [worker_id, name, apiKey, JSON.stringify(metadata)]
    );
    broadcast({ type: 'worker_update', worker_id, status: 'online' });
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Start a workflow execution: look up the workflow, record a 'running' execution,
// announce it, kick off the step engine in the background and answer immediately.
app.post('/api/executions', authenticateSSO, async (req, res) => {
  try {
    const workflowId = req.body.workflow_id;
    const executionId = generateUUID();

    // Load the workflow; unknown IDs are a 404
    const [matches] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflowId]);
    if (matches.length === 0) {
      return res.status(404).json({ error: 'Workflow not found' });
    }
    const [workflow] = matches;
    let definition = workflow.definition;
    if (typeof definition === 'string') {
      definition = JSON.parse(definition);
    }

    // Persist the execution with an empty log list
    await pool.query(
      'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
      [executionId, workflowId, 'running', req.user.username, JSON.stringify([])]
    );
    broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: workflowId });

    // Fire and forget: the engine reports progress through logs and broadcasts
    executeWorkflowSteps(executionId, workflowId, definition, req.user.username).catch((err) => {
      console.error(`[Workflow] Execution ${executionId} failed:`, err);
    });

    res.json({ id: executionId, workflow_id: workflowId, status: 'running' });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// List executions, newest first, with pagination.
// Query parameters: `limit` (default 50) and `offset` (default 0). Values that are
// missing, non-numeric or not positive fall back to the defaults so that a negative
// number can no longer reach the LIMIT/OFFSET clause and cause a SQL error.
app.get('/api/executions', authenticateSSO, async (req, res) => {
  try {
    const requestedLimit = Number.parseInt(req.query.limit, 10);
    const requestedOffset = Number.parseInt(req.query.offset, 10);
    const limit = requestedLimit > 0 ? requestedLimit : 50;
    const offset = requestedOffset > 0 ? requestedOffset : 0;
    const [rows] = await pool.query(
      'SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ORDER BY e.started_at DESC LIMIT ? OFFSET ?',
      [limit, offset]
    );
    // Get total count
    const [countRows] = await pool.query('SELECT COUNT(*) as total FROM executions');
    const total = countRows[0].total;
    res.json({
      executions: rows,
      total: total,
      limit: limit,
      offset: offset,
      hasMore: offset + rows.length < total
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Delete one execution record and tell connected clients about it
app.delete('/api/executions/:id', authenticateSSO, async (req, res) => {
  try {
    const executionId = req.params.id;
    await pool.query('DELETE FROM executions WHERE id = ?', [executionId]);
    broadcast({ type: 'execution_deleted', execution_id: executionId });
    res.json({ success: true });
  } catch (error) {
    const payload = { error: error.message };
    res.status(500).json(payload);
  }
});
// Scheduled Commands API
// List every scheduled command, most recently created first
app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => {
  try {
    const listSql = 'SELECT * FROM scheduled_commands ORDER BY created_at DESC';
    const queryResult = await pool.query(listSql);
    res.json(queryResult[0]);
  } catch (error) {
    const payload = { error: error.message };
    res.status(500).json(payload);
  }
});
// Create a scheduled command.
// Besides requiring all fields, this rejects (400) a worker_ids value that is not a
// non-empty array and any schedule for which no valid next run time can be computed.
// A schedule stored with a NULL next_run is picked up by the scheduler on every tick,
// so an unusable schedule would otherwise fire every minute.
app.post('/api/scheduled-commands', authenticateSSO, async (req, res) => {
  try {
    const { name, command, worker_ids, schedule_type, schedule_value } = req.body || {};
    if (!name || !command || !worker_ids || !schedule_type || !schedule_value) {
      return res.status(400).json({ error: 'Missing required fields' });
    }
    if (!Array.isArray(worker_ids) || worker_ids.length === 0) {
      return res.status(400).json({ error: 'worker_ids must be a non-empty array' });
    }
    // Stringify so numeric values (e.g. 5 for an interval) are handled like "5"
    const nextRun = calculateNextRun(schedule_type, String(schedule_value));
    if (!(nextRun instanceof Date) || Number.isNaN(nextRun.getTime())) {
      return res.status(400).json({ error: 'Invalid schedule_type or schedule_value' });
    }
    const id = generateUUID();
    await pool.query(
      [
        'INSERT INTO scheduled_commands',
        '(id, name, command, worker_ids, schedule_type, schedule_value, created_by, next_run)',
        'VALUES (?, ?, ?, ?, ?, ?, ?, ?)'
      ].join('\n'),
      [id, name, command, JSON.stringify(worker_ids), schedule_type, schedule_value, req.user.username, nextRun]
    );
    res.json({ success: true, id });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Flip the `enabled` flag of a scheduled command and report the new value
app.put('/api/scheduled-commands/:id/toggle', authenticateSSO, async (req, res) => {
  try {
    const scheduleId = req.params.id;
    const [matches] = await pool.query('SELECT enabled FROM scheduled_commands WHERE id = ?', [scheduleId]);
    if (matches.length === 0) {
      return res.status(404).json({ error: 'Schedule not found' });
    }
    const [schedule] = matches;
    const newEnabled = !schedule.enabled;
    await pool.query('UPDATE scheduled_commands SET enabled = ? WHERE id = ?', [newEnabled, scheduleId]);
    res.json({ success: true, enabled: newEnabled });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Remove a scheduled command by ID
app.delete('/api/scheduled-commands/:id', authenticateSSO, async (req, res) => {
  try {
    const scheduleId = req.params.id;
    await pool.query('DELETE FROM scheduled_commands WHERE id = ?', [scheduleId]);
    res.json({ success: true });
  } catch (error) {
    const payload = { error: error.message };
    res.status(500).json(payload);
  }
});
// Health check (no auth required): reports whether a trivial query against the
// database succeeds
app.get('/health', async (req, res) => {
  try {
    await pool.query('SELECT 1');
    const healthy = {
      status: 'ok',
      timestamp: new Date().toISOString(),
      database: 'connected',
      auth: 'authelia-sso'
    };
    res.json(healthy);
  } catch (error) {
    const unhealthy = {
      status: 'error',
      timestamp: new Date().toISOString(),
      database: 'disconnected',
      error: error.message
    };
    res.status(500).json(unhealthy);
  }
});
// Start server: create the tables first, then begin listening. Any failure during
// startup is logged and terminates the process with exit code 1.
const PORT = process.env.PORT || 8080;
const HOST = process.env.HOST || '0.0.0.0';
(async () => {
  try {
    await initDatabase();
    server.listen(PORT, HOST, () => {
      const workerKeyConfigured = process.env.WORKER_API_KEY ? 'Yes' : 'No';
      console.log(`PULSE Server running on http://${HOST}:${PORT}`);
      console.log(`Connected to MariaDB at ${process.env.DB_HOST}`);
      console.log(`Authentication: Authelia SSO`);
      console.log(`Worker API Key configured: ${workerKeyConfigured}`);
    });
  } catch (err) {
    console.error('Failed to start server:', err);
    process.exit(1);
  }
})();
// ============================================
// EXECUTION DETAILS AND WORKER MANAGEMENT
// ============================================
// Get execution details with logs.
// The interactive-prompt state this route used to read no longer exists in this file,
// so `waiting_for_input` and `prompt` are reported as constants; they are kept in the
// response so existing consumers still find the fields.
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Not found' });
    }
    const execution = rows[0];
    // The JSON column may arrive as text or already parsed; empty/NULL logs become []
    const rawLogs = execution.logs;
    const parsedLogs = typeof rawLogs === 'string' && rawLogs !== '' ? JSON.parse(rawLogs) : rawLogs;
    res.json({
      ...execution,
      logs: parsedLogs || [],
      waiting_for_input: false,
      prompt: null
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Delete worker (admin only) and notify connected clients
app.delete('/api/workers/:id', authenticateSSO, async (req, res) => {
  try {
    const callerIsAdmin = req.user.isAdmin;
    if (!callerIsAdmin) {
      return res.status(403).json({ error: 'Admin access required' });
    }
    const workerId = req.params.id;
    await pool.query('DELETE FROM workers WHERE id = ?', [workerId]);
    res.json({ success: true });
    broadcast({ type: 'worker_deleted', worker_id: workerId });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Send direct command to specific worker (for testing).
// The request is validated and the worker's connection is checked BEFORE the execution
// row is written. Otherwise a request for an offline worker would leave behind an
// execution stuck in 'running' that nothing ever completes.
app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
  try {
    const { command } = req.body || {};
    const workerId = req.params.id;
    if (typeof command !== 'string' || command.trim() === '') {
      return res.status(400).json({ error: 'Command is required' });
    }
    const workerWs = workers.get(workerId);
    if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
      return res.status(400).json({ error: 'Worker not connected' });
    }
    const executionId = generateUUID();
    // Create execution record in database
    await pool.query(
      'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
      [executionId, null, 'running', req.user.username, JSON.stringify([{
        step: 'quick_command',
        action: 'command_sent',
        worker_id: workerId,
        command: command,
        timestamp: new Date().toISOString()
      }])]
    );
    // Send command via WebSocket to specific worker
    const commandMessage = {
      type: 'execute_command',
      execution_id: executionId,
      command: command,
      worker_id: workerId,
      timeout: 60000
    };
    workerWs.send(JSON.stringify(commandMessage));
    console.log(`Command sent to worker ${workerId}: ${command}`);
    broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
    res.json({ success: true, execution_id: executionId });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// ============================================
// EXAMPLE WORKFLOW DEFINITIONS
// ============================================
// Reference definitions only; nothing in the visible part of this file uses them.
// NOTE(review): the step engine in this file logs 'prompt' steps as skipped and does
// not read `condition`, so the conditional steps below would run unconditionally.

// Example 1: Simple command execution
const simpleWorkflow = {
  name: "Update System Packages",
  description: "Update all packages on target servers",
  steps: [
    {
      name: "Update package list",
      type: "execute",
      targets: ["all"],
      command: "apt update"
    },
    {
      name: "User Approval",
      type: "prompt",
      message: "Packages updated. Proceed with upgrade?",
      options: ["Yes", "No"]
    },
    {
      name: "Upgrade packages",
      type: "execute",
      targets: ["all"],
      command: "apt upgrade -y",
      condition: "promptResponse === 'Yes'"
    }
  ]
};

// Example 2: Complex workflow with conditions
const backupWorkflow = {
  name: "Backup and Verify",
  description: "Create backup and verify integrity",
  steps: [
    {
      name: "Create backup",
      type: "execute",
      targets: ["all"],
      command: "tar -czf /tmp/backup-$(date +%Y%m%d).tar.gz /opt/pulse-worker"
    },
    {
      name: "Wait for backup",
      type: "wait",
      // Wait durations are interpreted as SECONDS by the step engine (5000 would be ~83 minutes)
      duration: 5
    },
    {
      name: "Verify backup",
      type: "execute",
      targets: ["all"],
      command: "tar -tzf /tmp/backup-*.tar.gz > /dev/null && echo 'Backup OK' || echo 'Backup FAILED'"
    },
    {
      name: "Cleanup decision",
      type: "prompt",
      message: "Backup complete. Delete old backups?",
      options: ["Yes", "No", "Cancel"]
    },
    {
      name: "Cleanup old backups",
      type: "execute",
      targets: ["all"],
      command: "find /tmp -name 'backup-*.tar.gz' -mtime +7 -delete",
      condition: "promptResponse === 'Yes'"
    }
  ]
};