// Orchestration server: Express HTTP API + WebSocket hub for UI clients and
// remote workers, backed by MySQL. Authentication is delegated to an
// Authelia reverse proxy (see authenticateSSO).
const express = require('express');
const http = require('http');
const WebSocket = require('ws');
const mysql = require('mysql2/promise');
const crypto = require('crypto');
require('dotenv').config();

const app = express();
const server = http.createServer(app);
// The WebSocket server shares the HTTP server (same port).
const wss = new WebSocket.Server({ server });

// Middleware
app.use(express.json());
app.use(express.static('public'));

// UUID generator (RFC 4122 v4, cryptographically random).
function generateUUID() {
  return crypto.randomUUID();
}
// Database pool. Connection settings come from the environment (.env via
// dotenv); queueLimit 0 means an unbounded queue of waiting requests.
const pool = mysql.createPool({
  host: process.env.DB_HOST,
  // DB_PORT arrives as a string from the environment; coerce it explicitly.
  port: Number.parseInt(process.env.DB_PORT, 10) || 3306,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0
});
// Initialize database tables (idempotent: CREATE TABLE IF NOT EXISTS).
// Beyond table creation, the schema is managed manually - migrations were
// removed after direct database fixes.
async function initDatabase() {
  const connection = await pool.getConnection();
  try {
    // Users are upserted on SSO-authenticated requests (see authenticateSSO).
    // NOTE(review): `groups` is a reserved keyword in MySQL 8.0.2+ - confirm
    // the target server version, or back-quote the column name.
    await connection.query(`
      CREATE TABLE IF NOT EXISTS users (
        id VARCHAR(36) PRIMARY KEY,
        username VARCHAR(255) UNIQUE NOT NULL,
        display_name VARCHAR(255),
        email VARCHAR(255),
        groups TEXT,
        last_login TIMESTAMP,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      )
    `);

    // Workers register via /api/workers/heartbeat and connect over WebSocket.
    await connection.query(`
      CREATE TABLE IF NOT EXISTS workers (
        id VARCHAR(36) PRIMARY KEY,
        name VARCHAR(255) UNIQUE NOT NULL,
        status VARCHAR(50) NOT NULL,
        last_heartbeat TIMESTAMP NULL,
        api_key VARCHAR(255),
        metadata JSON,
        INDEX idx_status (status),
        INDEX idx_heartbeat (last_heartbeat)
      )
    `);

    await connection.query(`
      CREATE TABLE IF NOT EXISTS workflows (
        id VARCHAR(36) PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        description TEXT,
        definition JSON NOT NULL,
        created_by VARCHAR(255),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        INDEX idx_name (name)
      )
    `);

    // workflow_id is nullable: quick commands and scheduled commands create
    // executions without a parent workflow.
    await connection.query(`
      CREATE TABLE IF NOT EXISTS executions (
        id VARCHAR(36) PRIMARY KEY,
        workflow_id VARCHAR(36) NULL,
        status VARCHAR(50) NOT NULL,
        started_by VARCHAR(255),
        started_at TIMESTAMP NULL,
        completed_at TIMESTAMP NULL,
        logs JSON,
        INDEX idx_workflow (workflow_id),
        INDEX idx_status (status),
        INDEX idx_started (started_at)
      )
    `);

    await connection.query(`
      CREATE TABLE IF NOT EXISTS scheduled_commands (
        id VARCHAR(36) PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        command TEXT NOT NULL,
        worker_ids JSON NOT NULL,
        schedule_type VARCHAR(50) NOT NULL,
        schedule_value VARCHAR(255) NOT NULL,
        enabled BOOLEAN DEFAULT TRUE,
        created_by VARCHAR(255),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        last_run TIMESTAMP NULL,
        next_run TIMESTAMP NULL,
        INDEX idx_enabled (enabled),
        INDEX idx_next_run (next_run)
      )
    `);

    console.log('Database tables initialized successfully');
  } catch (error) {
    console.error('Database initialization error:', error);
    throw error; // let the caller decide whether startup can proceed
  } finally {
    connection.release();
  }
}
// Auto-cleanup old executions (runs hourly).
// Deletes finished executions older than EXECUTION_RETENTION_DAYS (default 1).
async function cleanupOldExecutions() {
  try {
    // Unset/invalid env parses to NaN, which falls back to 1 day.
    const retentionDays = Number.parseInt(process.env.EXECUTION_RETENTION_DAYS, 10) || 1;
    const [result] = await pool.query(
      `DELETE FROM executions
       WHERE status IN ('completed', 'failed')
       AND started_at < DATE_SUB(NOW(), INTERVAL ? DAY)`,
      [retentionDays]
    );
    if (result.affectedRows > 0) {
      console.log(`[Cleanup] Removed ${result.affectedRows} executions older than ${retentionDays} day(s)`);
    }
  } catch (error) {
    // Best-effort maintenance: log and keep the server running.
    console.error('[Cleanup] Error removing old executions:', error);
  }
}

// Run cleanup hourly
setInterval(cleanupOldExecutions, 60 * 60 * 1000);
// Run cleanup on startup (errors are handled inside the function)
cleanupOldExecutions();
// Scheduled Commands Processor
// Finds enabled schedules that are due, fans the command out to the
// configured workers (creating one execution record per worker), then
// records last_run and computes next_run.
async function processScheduledCommands() {
  try {
    const [schedules] = await pool.query(
      `SELECT * FROM scheduled_commands
       WHERE enabled = TRUE
       AND (next_run IS NULL OR next_run <= NOW())`
    );

    for (const schedule of schedules) {
      console.log(`[Scheduler] Running scheduled command: ${schedule.name}`);

      // mysql2 returns JSON columns already parsed; only parse strings
      // (same convention used in addExecutionLog).
      const workerIds = typeof schedule.worker_ids === 'string'
        ? JSON.parse(schedule.worker_ids)
        : schedule.worker_ids;

      // Execute command on each worker that is currently connected.
      for (const workerId of workerIds) {
        const workerWs = workers.get(workerId);
        if (workerWs && workerWs.readyState === WebSocket.OPEN) {
          const executionId = generateUUID();

          // Create execution record (workflow_id NULL: direct command).
          await pool.query(
            'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
            [executionId, null, 'running', `scheduler: ${schedule.name}`, JSON.stringify([{
              step: 'scheduled_command',
              action: 'command_sent',
              worker_id: workerId,
              command: schedule.command,
              timestamp: new Date().toISOString()
            }])]
          );

          // Send command to worker
          workerWs.send(JSON.stringify({
            type: 'execute_command',
            execution_id: executionId,
            command: schedule.command,
            worker_id: workerId,
            timeout: 300000 // 5 minute timeout for scheduled commands
          }));

          broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
        }
      }

      // Update last_run and calculate next_run
      const nextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value);
      await pool.query(
        'UPDATE scheduled_commands SET last_run = NOW(), next_run = ? WHERE id = ?',
        [nextRun, schedule.id]
      );
    }
  } catch (error) {
    console.error('[Scheduler] Error processing scheduled commands:', error);
  }
}
// Compute the next run time for a schedule.
//   'interval': scheduleValue = minutes between runs
//   'daily':    scheduleValue = "HH:MM" wall-clock time (server timezone)
//   'hourly':   scheduleValue = hours between runs
// Returns a Date, or null for an unknown schedule type.
function calculateNextRun(scheduleType, scheduleValue) {
  const now = new Date();
  if (scheduleType === 'interval') {
    // Interval in minutes (explicit radix; env/DB values are strings).
    const minutes = Number.parseInt(scheduleValue, 10);
    return new Date(now.getTime() + minutes * 60000);
  } else if (scheduleType === 'daily') {
    // Daily at HH:MM
    const [hours, minutes] = scheduleValue.split(':').map(Number);
    const next = new Date(now);
    next.setHours(hours, minutes, 0, 0);
    // If the time has already passed today, schedule for tomorrow.
    if (next <= now) {
      next.setDate(next.getDate() + 1);
    }
    return next;
  } else if (scheduleType === 'hourly') {
    // Every N hours
    const hours = Number.parseInt(scheduleValue, 10);
    return new Date(now.getTime() + hours * 3600000);
  }
  return null;
}
// Run the scheduler every minute; first pass shortly after startup so worker
// connections have a chance to register.
setInterval(processScheduledCommands, 60 * 1000);
setTimeout(processScheduledCommands, 5000);

// WebSocket connections
// Every connected socket (UI clients and workers) receives broadcasts.
const clients = new Set();
// Map worker_id -> WebSocket connection (keyed under both the worker's
// runtime id and its database id; see the 'worker_connect' handler).
const workers = new Map();
// WebSocket entry point. Both UI clients and workers connect here; workers
// identify themselves with a 'worker_connect' message and are tracked in
// the `workers` map under both their runtime id and their database id.
wss.on('connection', (ws) => {
  clients.add(ws);

  // Handle incoming messages (primarily from workers).
  ws.on('message', async (data) => {
    try {
      const message = JSON.parse(data.toString());
      console.log('WebSocket message received:', message.type);

      if (message.type === 'command_result') {
        // Result of a single command from a worker.
        const { execution_id, command_id, worker_id, success, stdout, stderr, duration, timestamp } = message;

        // Append the result to the execution's logs.
        await addExecutionLog(execution_id, {
          step: 'command_execution',
          action: 'command_result',
          command_id: command_id, // Include command_id for workflow tracking
          worker_id: worker_id,
          success: success,
          stdout: stdout,
          stderr: stderr,
          duration: duration,
          timestamp: timestamp || new Date().toISOString()
        });

        // For non-workflow executions, update status immediately.
        // For workflow executions, the workflow engine updates the status.
        const [execution] = await pool.query('SELECT workflow_id FROM executions WHERE id = ?', [execution_id]);
        if (execution.length > 0 && !execution[0].workflow_id) {
          const finalStatus = success ? 'completed' : 'failed';
          await updateExecutionStatus(execution_id, finalStatus);
        }

        // Broadcast result to all connected clients.
        broadcast({
          type: 'command_result',
          execution_id: execution_id,
          worker_id: worker_id,
          success: success,
          stdout: stdout,
          stderr: stderr
        });

        console.log(`Command result received for execution ${execution_id}: ${success ? 'success' : 'failed'}`);
      }

      if (message.type === 'workflow_result') {
        // Final outcome of a workflow run reported by a worker.
        const { execution_id, worker_id, success, message: resultMessage, timestamp } = message;

        await addExecutionLog(execution_id, {
          step: 'workflow_completion',
          action: 'workflow_result',
          worker_id: worker_id,
          success: success,
          message: resultMessage,
          timestamp: timestamp || new Date().toISOString()
        });

        const finalStatus = success ? 'completed' : 'failed';
        await updateExecutionStatus(execution_id, finalStatus);

        broadcast({
          type: 'workflow_result',
          execution_id: execution_id,
          status: finalStatus,
          success: success,
          message: resultMessage
        });

        console.log(`Workflow result received for execution ${execution_id}: ${finalStatus}`);
      }

      if (message.type === 'worker_connect') {
        // A worker announced itself; map its runtime id to its DB record.
        const { worker_id, worker_name } = message;
        console.log(`Worker connected: ${worker_name} (${worker_id})`);

        const [dbWorkers] = await pool.query(
          'SELECT id FROM workers WHERE name = ?',
          [worker_name]
        );

        if (dbWorkers.length > 0) {
          const dbWorkerId = dbWorkers[0].id;
          // Store the connection under BOTH ids so lookups by either work.
          workers.set(worker_id, ws); // Runtime ID
          workers.set(dbWorkerId, ws); // Database ID
          // Remember the mapping on the socket for cleanup on close.
          ws.workerId = worker_id;
          ws.dbWorkerId = dbWorkerId;
          console.log(`Mapped worker: runtime_id=${worker_id}, db_id=${dbWorkerId}, name=${worker_name}`);

          // Update worker status to online.
          await pool.query(
            `UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`,
            [dbWorkerId]
          );

          // Broadcast worker status update with the database id.
          broadcast({
            type: 'worker_update',
            worker_id: dbWorkerId,
            status: 'online'
          });
        } else {
          console.log(`Worker ${worker_name} not found in database, will be created on heartbeat`);
        }
      }

      if (message.type === 'pong') {
        // Keepalive reply from a worker.
        // NOTE(review): worker_id here may be the runtime id rather than the
        // workers-table primary key - confirm what the workers send.
        const { worker_id } = message;
        await pool.query(
          `UPDATE workers SET last_heartbeat=NOW() WHERE id=?`,
          [worker_id]
        );
      }
    } catch (error) {
      console.error('WebSocket message error:', error);
    }
  });

  ws.on('close', () => {
    clients.delete(ws);
    // Drop both map entries for a disconnected worker.
    if (ws.workerId) {
      workers.delete(ws.workerId);
      console.log(`Worker ${ws.workerId} (runtime ID) disconnected`);
    }
    if (ws.dbWorkerId) {
      workers.delete(ws.dbWorkerId);
      console.log(`Worker ${ws.dbWorkerId} (database ID) disconnected`);
    }
  });
});
// Broadcast to all connected clients.
// Serializes `data` once and delivers it to every client whose socket is
// currently open; closed/closing sockets are silently skipped.
function broadcast(data) {
  const payload = JSON.stringify(data);
  for (const client of clients) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(payload);
    }
  }
}
// Helper function to add a log entry to an execution's JSON `logs` column.
// NOTE(review): this is a read-modify-write, so concurrent appends to the
// same execution can lose entries - consider MySQL JSON_ARRAY_APPEND if
// that becomes a problem.
async function addExecutionLog(executionId, logEntry) {
  try {
    const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
    if (execution.length > 0) {
      const raw = execution[0].logs;
      // mysql2 may return JSON columns pre-parsed; also tolerate NULL,
      // which previously crashed the append (logs.push on null).
      const logs = raw == null ? [] : (typeof raw === 'string' ? JSON.parse(raw) : raw);
      logs.push(logEntry);
      await pool.query(
        'UPDATE executions SET logs = ? WHERE id = ?',
        [JSON.stringify(logs), executionId]
      );
    }
  } catch (error) {
    console.error(`[Workflow] Error adding execution log:`, error);
  }
}
// Helper function to update an execution's status.
// Sets completed_at to the current time and notifies every connected
// client of the change.
async function updateExecutionStatus(executionId, status) {
  const sql = 'UPDATE executions SET status = ?, completed_at = NOW() WHERE id = ?';
  try {
    await pool.query(sql, [status, executionId]);
    broadcast({ type: 'execution_status', execution_id: executionId, status: status });
    console.log(`[Workflow] Execution ${executionId} status updated to: ${status}`);
  } catch (error) {
    console.error(`[Workflow] Error updating execution status:`, error);
  }
}
// Authelia SSO Middleware.
// Trusts the Remote-* headers injected by the Authelia reverse proxy,
// requires membership in the admin or employee group, upserts the user
// record, and attaches `req.user` for downstream handlers.
async function authenticateSSO(req, res, next) {
  const remoteUser = req.headers['remote-user'];
  const remoteName = req.headers['remote-name'];
  const remoteEmail = req.headers['remote-email'];
  const remoteGroups = req.headers['remote-groups'];

  if (!remoteUser) {
    return res.status(401).json({
      error: 'Not authenticated',
      message: 'Please access this service through auth.lotusguild.org'
    });
  }

  // Check if user is in an allowed group (admin or employee).
  const groups = remoteGroups ? remoteGroups.split(',').map(g => g.trim()) : [];
  const allowedGroups = ['admin', 'employee'];
  const hasAccess = groups.some(g => allowedGroups.includes(g));
  if (!hasAccess) {
    return res.status(403).json({
      error: 'Access denied',
      message: 'You must be in admin or employee group'
    });
  }

  // Upsert the user row. username is UNIQUE, so the random id is only used
  // on first insert. A DB failure here must not block the request.
  try {
    const userId = generateUUID();
    await pool.query(
      `INSERT INTO users (id, username, display_name, email, groups, last_login)
       VALUES (?, ?, ?, ?, ?, NOW())
       ON DUPLICATE KEY UPDATE
         display_name = VALUES(display_name),
         email = VALUES(email),
         groups = VALUES(groups),
         last_login = NOW()`,
      [userId, remoteUser, remoteName, remoteEmail, remoteGroups]
    );
  } catch (error) {
    console.error('Error updating user:', error);
  }

  // Attach user info to the request.
  req.user = {
    username: remoteUser,
    name: remoteName || remoteUser,
    email: remoteEmail || '',
    groups: groups,
    isAdmin: groups.includes('admin')
  };
  next();
}
// Gandalf machine-to-machine API key auth.
// Uses a constant-time comparison so the key cannot be recovered
// byte-by-byte via response-timing differences. Also rejects requests
// outright when GANDALF_API_KEY is unset (previously undefined !== undefined
// was the only guard).
function authenticateGandalf(req, res, next) {
  const apiKey = req.headers['x-gandalf-api-key'];
  const expected = process.env.GANDALF_API_KEY;
  const valid =
    typeof apiKey === 'string' &&
    typeof expected === 'string' &&
    expected.length > 0 &&
    apiKey.length === expected.length && // timingSafeEqual requires equal lengths
    crypto.timingSafeEqual(Buffer.from(apiKey), Buffer.from(expected));
  if (!valid) {
    return res.status(401).json({ error: 'Unauthorized' });
  }
  req.user = { username: 'gandalf:link_stats', isAdmin: false };
  next();
}
// Workflow Execution Engine

// Substitute {{param_name}} placeholders in a command string.
// Placeholders with no matching own property in `params` are left intact;
// substituted values are restricted to alphanumerics plus safe punctuation
// to keep them shell-safe. Throws on an unsafe value.
function applyParams(command, params) {
  return command.replace(/\{\{(\w+)\}\}/g, (match, key) => {
    // Object.hasOwn avoids matching inherited keys such as "constructor",
    // which the previous `key in params` check accepted.
    if (!Object.hasOwn(params, key)) return match;
    const val = String(params[key]).trim();
    if (!/^[a-zA-Z0-9._:@\-\/]+$/.test(val)) {
      throw new Error(`Unsafe value for workflow parameter "${key}"`);
    }
    return val;
  });
}
// Run a workflow's steps in order for a given execution.
// Supported step types:
//   'execute' - run a command on target workers (see executeCommandStep)
//   'wait'    - sleep step.duration seconds (default 5)
//   'prompt'  - logged and skipped; interactive prompts are not supported yet
// Execution stops at the first failed 'execute' step. Always resolves; the
// outcome is recorded via updateExecutionStatus.
async function executeWorkflowSteps(executionId, workflowId, definition, username, params = {}) {
  try {
    console.log(`[Workflow] Starting execution ${executionId} for workflow ${workflowId}`);
    const steps = definition.steps || [];
    let allStepsSucceeded = true;

    for (let i = 0; i < steps.length; i++) {
      const step = steps[i];
      console.log(`[Workflow] Execution ${executionId} - Step ${i + 1}/${steps.length}: ${step.name}`);

      // Record and announce step start.
      await addExecutionLog(executionId, {
        step: i + 1,
        step_name: step.name,
        action: 'step_started',
        timestamp: new Date().toISOString()
      });
      broadcast({
        type: 'workflow_step_started',
        execution_id: executionId,
        step: i + 1,
        step_name: step.name
      });

      if (step.type === 'execute') {
        const success = await executeCommandStep(executionId, step, i + 1, params);
        if (!success) {
          allStepsSucceeded = false;
          break; // Stop workflow on failure
        }
      } else if (step.type === 'wait') {
        // Wait step (delay in seconds).
        const seconds = step.duration || 5;
        await addExecutionLog(executionId, {
          step: i + 1,
          action: 'waiting',
          duration: seconds,
          timestamp: new Date().toISOString()
        });
        await new Promise(resolve => setTimeout(resolve, seconds * 1000));
      } else if (step.type === 'prompt') {
        // Interactive prompt step - would need user interaction.
        await addExecutionLog(executionId, {
          step: i + 1,
          action: 'prompt_skipped',
          message: 'Interactive prompts not yet supported',
          timestamp: new Date().toISOString()
        });
      }

      // Record and announce step completion.
      await addExecutionLog(executionId, {
        step: i + 1,
        step_name: step.name,
        action: 'step_completed',
        timestamp: new Date().toISOString()
      });
      broadcast({
        type: 'workflow_step_completed',
        execution_id: executionId,
        step: i + 1,
        step_name: step.name
      });
    }

    // Mark execution as completed or failed.
    const finalStatus = allStepsSucceeded ? 'completed' : 'failed';
    await updateExecutionStatus(executionId, finalStatus);
    console.log(`[Workflow] Execution ${executionId} finished with status: ${finalStatus}`);
  } catch (error) {
    console.error(`[Workflow] Execution ${executionId} error:`, error);
    await addExecutionLog(executionId, {
      action: 'workflow_error',
      error: error.message,
      timestamp: new Date().toISOString()
    });
    await updateExecutionStatus(executionId, 'failed');
  }
}
// Run one 'execute' workflow step: resolve the target workers, send the
// command (with {{param}} substitution applied) to each connected worker,
// and wait for each result in turn. Returns true only if every dispatched
// command succeeded; false on failure, no resolvable targets, or error.
async function executeCommandStep(executionId, step, stepNumber, params = {}) {
  try {
    let command = step.command;
    if (Object.keys(params).length > 0) {
      command = applyParams(command, params);
    }
    const targets = step.targets || ['all'];

    // Resolve targets to database worker ids.
    let targetWorkerIds = [];
    if (targets.includes('all')) {
      // All currently-online workers.
      const [onlineWorkers] = await pool.query('SELECT id FROM workers WHERE status = ?', ['online']);
      targetWorkerIds = onlineWorkers.map(w => w.id);
    } else {
      // Each target may be a worker id or a worker name; try id first.
      for (const target of targets) {
        const [workerById] = await pool.query('SELECT id FROM workers WHERE id = ?', [target]);
        if (workerById.length > 0) {
          targetWorkerIds.push(workerById[0].id);
        } else {
          const [workerByName] = await pool.query('SELECT id FROM workers WHERE name = ?', [target]);
          if (workerByName.length > 0) {
            targetWorkerIds.push(workerByName[0].id);
          }
        }
      }
    }

    if (targetWorkerIds.length === 0) {
      await addExecutionLog(executionId, {
        step: stepNumber,
        action: 'no_workers',
        message: 'No workers available for this step',
        timestamp: new Date().toISOString()
      });
      return false;
    }

    // Execute the command on each target sequentially, waiting for results.
    const results = [];
    for (const workerId of targetWorkerIds) {
      const workerWs = workers.get(workerId);
      if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
        // Worker known to the DB but not currently connected - skip it.
        await addExecutionLog(executionId, {
          step: stepNumber,
          action: 'worker_offline',
          worker_id: workerId,
          timestamp: new Date().toISOString()
        });
        continue;
      }

      // Send command to worker.
      const commandId = generateUUID();
      await addExecutionLog(executionId, {
        step: stepNumber,
        action: 'command_sent',
        worker_id: workerId,
        command: command,
        command_id: commandId,
        timestamp: new Date().toISOString()
      });
      workerWs.send(JSON.stringify({
        type: 'execute_command',
        execution_id: executionId,
        command_id: commandId,
        command: command,
        worker_id: workerId,
        timeout: 120000 // 2 minute timeout
      }));

      // Wait for command result (with matching timeout).
      const result = await waitForCommandResult(executionId, commandId, 120000);
      results.push(result);
      if (!result.success) {
        return false; // Command failed; caller stops the workflow.
      }
    }

    // NOTE(review): if every resolved worker was offline, `results` is empty
    // and this returns true (same as before) - confirm that is intended.
    return results.every(r => r.success);
  } catch (error) {
    console.error(`[Workflow] Error executing command step:`, error);
    await addExecutionLog(executionId, {
      step: stepNumber,
      action: 'step_error',
      error: error.message,
      timestamp: new Date().toISOString()
    });
    return false;
  }
}
// Poll the execution's logs every 500ms until a command_result entry for
// commandId appears, or `timeout` ms elapse. Always resolves (never
// rejects) with { success, stdout?, stderr? } or { success: false, error }.
async function waitForCommandResult(executionId, commandId, timeout) {
  return new Promise((resolve) => {
    const startTime = Date.now();
    const checkInterval = setInterval(async () => {
      try {
        // Check if the worker's result has been appended to the logs.
        const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
        if (execution.length > 0) {
          const raw = execution[0].logs;
          // Tolerate NULL logs (keep polling) and pre-parsed JSON columns;
          // previously a NULL here threw and aborted the wait as a failure.
          const logs = raw == null ? [] : (typeof raw === 'string' ? JSON.parse(raw) : raw);
          const resultLog = logs.find(log => log.command_id === commandId && log.action === 'command_result');
          if (resultLog) {
            clearInterval(checkInterval);
            resolve({
              success: resultLog.success,
              stdout: resultLog.stdout,
              stderr: resultLog.stderr
            });
            return;
          }
        }
        // Check timeout.
        if (Date.now() - startTime > timeout) {
          clearInterval(checkInterval);
          resolve({
            success: false,
            error: 'Command timeout'
          });
        }
      } catch (error) {
        console.error('[Workflow] Error checking command result:', error);
        clearInterval(checkInterval);
        resolve({
          success: false,
          error: error.message
        });
      }
    }, 500); // Check every 500ms
  });
}
// Routes - All protected by SSO

// Current authenticated user's identity (derived from the SSO headers).
app.get('/api/user', authenticateSSO, (req, res) => {
  res.json(req.user);
});

// List all workflows, newest first.
app.get('/api/workflows', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM workflows ORDER BY created_at DESC');
    res.json(rows);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Create a workflow. Body: { name, description, definition }.
app.post('/api/workflows', authenticateSSO, async (req, res) => {
  try {
    const { name, description, definition } = req.body;
    // Reject obviously invalid payloads before touching the database
    // (definition is a NOT NULL JSON column).
    if (!name || !definition) {
      return res.status(400).json({ error: 'name and definition are required' });
    }
    const id = generateUUID();

    console.log('[Workflow] Creating workflow:', name);
    console.log('[Workflow] Definition:', JSON.stringify(definition, null, 2));

    await pool.query(
      'INSERT INTO workflows (id, name, description, definition, created_by) VALUES (?, ?, ?, ?, ?)',
      [id, name, description, JSON.stringify(definition), req.user.username]
    );
    console.log('[Workflow] Successfully inserted workflow:', id);

    res.json({ id, name, description, definition });
    broadcast({ type: 'workflow_created', workflow_id: id });
  } catch (error) {
    console.error('[Workflow] Error creating workflow:', error);
    res.status(500).json({ error: error.message });
  }
});
// Delete a workflow (admin only).
app.delete('/api/workflows/:id', authenticateSSO, async (req, res) => {
  try {
    // Only admins can delete workflows.
    if (!req.user.isAdmin) {
      return res.status(403).json({ error: 'Admin access required' });
    }
    const workflowId = req.params.id;
    await pool.query('DELETE FROM workflows WHERE id = ?', [workflowId]);
    res.json({ success: true });
    broadcast({ type: 'workflow_deleted', workflow_id: workflowId });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// List all workers, alphabetically by name.
app.get('/api/workers', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM workers ORDER BY name');
    res.json(rows);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Worker heartbeat (no SSO - authenticated with the shared worker API key).
// Upserts the worker row and marks it online.
app.post('/api/workers/heartbeat', async (req, res) => {
  try {
    const { worker_id, name, metadata } = req.body;
    const apiKey = req.headers['x-api-key'];

    // Verify the API key with a constant-time comparison (the previous !==
    // check leaked timing, and accepted the case where WORKER_API_KEY and
    // the header were both undefined).
    const expected = process.env.WORKER_API_KEY;
    const keyOk =
      typeof apiKey === 'string' &&
      typeof expected === 'string' &&
      expected.length > 0 &&
      apiKey.length === expected.length &&
      crypto.timingSafeEqual(Buffer.from(apiKey), Buffer.from(expected));
    if (!keyOk) {
      return res.status(401).json({ error: 'Invalid API key' });
    }
    // id and name are PRIMARY KEY / UNIQUE NOT NULL columns.
    if (!worker_id || !name) {
      return res.status(400).json({ error: 'worker_id and name are required' });
    }

    await pool.query(
      `INSERT INTO workers (id, name, status, last_heartbeat, api_key, metadata)
       VALUES (?, ?, 'online', NOW(), ?, ?)
       ON DUPLICATE KEY UPDATE
         status = 'online',
         last_heartbeat = NOW(),
         metadata = VALUES(metadata)`,
      [worker_id, name, apiKey, JSON.stringify(metadata)]
    );
    broadcast({ type: 'worker_update', worker_id, status: 'online' });
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Start a workflow execution. Body: { workflow_id, params? }.
// Responds immediately with the execution id; steps run asynchronously.
app.post('/api/executions', authenticateSSO, async (req, res) => {
  try {
    const { workflow_id, params = {} } = req.body;
    if (!workflow_id) {
      return res.status(400).json({ error: 'workflow_id is required' });
    }
    const id = generateUUID();

    // Get workflow definition.
    const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflow_id]);
    if (workflows.length === 0) {
      return res.status(404).json({ error: 'Workflow not found' });
    }
    const workflow = workflows[0];
    const definition = typeof workflow.definition === 'string' ? JSON.parse(workflow.definition) : workflow.definition;

    // Validate required params declared by the workflow definition.
    const paramDefs = definition.params || [];
    for (const pd of paramDefs) {
      if (pd.required && !params[pd.name]) {
        return res.status(400).json({ error: `Missing required parameter: ${pd.label || pd.name}` });
      }
    }

    // Create the execution record; record the supplied params in the log.
    const initLogs = Object.keys(params).length > 0
      ? [{ action: 'params', params, timestamp: new Date().toISOString() }]
      : [];
    await pool.query(
      'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
      [id, workflow_id, 'running', req.user.username, JSON.stringify(initLogs)]
    );

    broadcast({ type: 'execution_started', execution_id: id, workflow_id });

    // Start workflow execution asynchronously; errors are logged rather
    // than sent here - the execution record tracks the outcome.
    executeWorkflowSteps(id, workflow_id, definition, req.user.username, params).catch(err => {
      console.error(`[Workflow] Execution ${id} failed:`, err);
    });

    res.json({ id, workflow_id, status: 'running' });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// List executions with pagination. Query: limit, offset, hide_internal.
// hide_internal=true filters out scheduler- and gandalf-started runs.
app.get('/api/executions', authenticateSSO, async (req, res) => {
  try {
    // Explicit radix, and clamp so negative query values cannot produce
    // invalid LIMIT/OFFSET clauses (previously a 500 from MySQL).
    const limit = Math.max(Number.parseInt(req.query.limit, 10) || 50, 1);
    const offset = Math.max(Number.parseInt(req.query.offset, 10) || 0, 0);
    const hideInternal = req.query.hide_internal === 'true';
    const whereClause = hideInternal
      ? "WHERE started_by NOT LIKE 'gandalf:%' AND started_by NOT LIKE 'scheduler:%'"
      : '';

    const [rows] = await pool.query(
      `SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ${whereClause} ORDER BY e.started_at DESC LIMIT ? OFFSET ?`,
      [limit, offset]
    );

    // Total count under the same filter, so the client can page.
    const [countRows] = await pool.query(`SELECT COUNT(*) as total FROM executions ${whereClause}`);
    const total = countRows[0].total;

    res.json({
      executions: rows,
      total: total,
      limit: limit,
      offset: offset,
      hasMore: offset + rows.length < total
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Delete an execution record.
app.delete('/api/executions/:id', authenticateSSO, async (req, res) => {
  try {
    await pool.query('DELETE FROM executions WHERE id = ?', [req.params.id]);
    broadcast({ type: 'execution_deleted', execution_id: req.params.id });
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Abort a running execution: logs who aborted it and marks it failed.
// NOTE(review): this only updates the record - no cancel message is sent to
// the worker, so the remote command may keep running; confirm intent.
app.post('/api/executions/:id/abort', authenticateSSO, async (req, res) => {
  try {
    const executionId = req.params.id;
    // The execution must exist and still be running.
    const [execution] = await pool.query('SELECT status FROM executions WHERE id = ?', [executionId]);
    if (execution.length === 0) {
      return res.status(404).json({ error: 'Execution not found' });
    }
    if (execution[0].status !== 'running') {
      return res.status(400).json({ error: 'Execution is not running' });
    }

    await addExecutionLog(executionId, {
      action: 'execution_aborted',
      aborted_by: req.user.username,
      timestamp: new Date().toISOString()
    });
    await updateExecutionStatus(executionId, 'failed');

    console.log(`[Execution] Execution ${executionId} aborted by ${req.user.username}`);
    res.json({ success: true });
  } catch (error) {
    console.error('[Execution] Error aborting execution:', error);
    res.status(500).json({ error: error.message });
  }
});
// Scheduled Commands API

// List all scheduled commands, newest first.
app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => {
  try {
    const [schedules] = await pool.query(
      'SELECT * FROM scheduled_commands ORDER BY created_at DESC'
    );
    res.json(schedules);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Create a scheduled command.
// Body: { name, command, worker_ids, schedule_type, schedule_value }.
app.post('/api/scheduled-commands', authenticateSSO, async (req, res) => {
  try {
    const { name, command, worker_ids, schedule_type, schedule_value } = req.body;
    if (!name || !command || !worker_ids || !schedule_type || !schedule_value) {
      return res.status(400).json({ error: 'Missing required fields' });
    }

    // Reject unknown schedule types / unparsable values up front; otherwise
    // calculateNextRun yields null or an Invalid Date that only failed
    // later inside the INSERT.
    const nextRun = calculateNextRun(schedule_type, schedule_value);
    if (!(nextRun instanceof Date) || Number.isNaN(nextRun.getTime())) {
      return res.status(400).json({ error: 'Invalid schedule_type or schedule_value' });
    }

    const id = generateUUID();
    await pool.query(
      `INSERT INTO scheduled_commands
       (id, name, command, worker_ids, schedule_type, schedule_value, created_by, next_run)
       VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
      [id, name, command, JSON.stringify(worker_ids), schedule_type, schedule_value, req.user.username, nextRun]
    );
    res.json({ success: true, id });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Flip a schedule's enabled flag; echoes the new value back to the caller.
app.put('/api/scheduled-commands/:id/toggle', authenticateSSO, async (req, res) => {
  try {
    const scheduleId = req.params.id;
    const [rows] = await pool.query('SELECT enabled FROM scheduled_commands WHERE id = ?', [scheduleId]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Schedule not found' });
    }
    const enabled = !rows[0].enabled;
    await pool.query('UPDATE scheduled_commands SET enabled = ? WHERE id = ?', [enabled, scheduleId]);
    res.json({ success: true, enabled });
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});
// Delete a scheduled command. Returns 404 when the id does not exist.
app.delete('/api/scheduled-commands/:id', authenticateSSO, async (req, res) => {
  try {
    const { id } = req.params;
    const [result] = await pool.query('DELETE FROM scheduled_commands WHERE id = ?', [id]);
    // Previously this reported success even when nothing was deleted.
    if (result.affectedRows === 0) {
      return res.status(404).json({ error: 'Schedule not found' });
    }
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// 2026-03-03 16:04:22 -05:00
// Internal M2M API for Gandalf
// Dispatch a one-off command to a connected worker and record it as an execution.
app.post('/api/internal/command', authenticateGandalf, async (req, res) => {
  try {
    const { worker_id: workerId, command } = req.body;
    if (!workerId || !command) {
      return res.status(400).json({ error: 'worker_id and command are required' });
    }

    // Refuse up front when the target worker has no open socket.
    const ws = workers.get(workerId);
    if (!ws || ws.readyState !== WebSocket.OPEN) {
      return res.status(400).json({ error: 'Worker not connected' });
    }

    const executionId = generateUUID();
    const initialLog = [{
      step: 'internal_command',
      action: 'command_sent',
      worker_id: workerId,
      command: command,
      timestamp: new Date().toISOString()
    }];
    // Persist the execution as 'running' before dispatching the command.
    await pool.query(
      'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
      [executionId, null, 'running', req.user.username, JSON.stringify(initialLog)]
    );

    ws.send(JSON.stringify({
      type: 'execute_command',
      execution_id: executionId,
      command: command,
      worker_id: workerId,
      timeout: 60000
    }));

    res.json({ execution_id: executionId });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Fetch a single execution by id (logs parsed from their stored JSON string).
app.get('/api/internal/executions/:id', authenticateGandalf, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Not found' });
    }
    const record = rows[0];
    const logs = JSON.parse(record.logs || '[]');
    res.json({ ...record, logs });
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});
// 2025-11-29 19:26:20 -05:00
// Health check (no auth required)
// Reports ok/error depending on whether the database answers a trivial query.
app.get('/health', async (req, res) => {
  try {
    await pool.query('SELECT 1');
    res.json({
      status: 'ok',
      timestamp: new Date().toISOString(),
      database: 'connected',
      auth: 'authelia-sso'
    });
  } catch (err) {
    res.status(500).json({
      status: 'error',
      timestamp: new Date().toISOString(),
      database: 'disconnected',
      error: err.message
    });
  }
});
// Start server
const PORT = process.env.PORT || 8080;
const HOST = process.env.HOST || '0.0.0.0';
// Initialize the schema first; only begin accepting traffic once it succeeds.
(async () => {
  try {
    await initDatabase();
    server.listen(PORT, HOST, () => {
      console.log(`PULSE Server running on http://${HOST}:${PORT}`);
      console.log(`Connected to MariaDB at ${process.env.DB_HOST}`);
      console.log(`Authentication: Authelia SSO`);
      console.log(`Worker API Key configured: ${process.env.WORKER_API_KEY ? 'Yes' : 'No'}`);
    });
  } catch (err) {
    console.error('Failed to start server:', err);
    process.exit(1);
  }
})();
// 2025-11-30 13:03:18 -05:00
// ============================================
// WORKFLOW EXECUTION ENGINE
// ============================================
// Get execution details with logs.
// NOTE: removed stray timestamp residue lines (merge artifacts) that made
// this handler a syntax error; the logic itself is unchanged.
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Not found' });
    }

    const execution = rows[0];

    // logs is stored as a JSON string; surface it to clients as an array.
    res.json({
      ...execution,
      logs: JSON.parse(execution.logs || '[]')
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Delete worker (admin only). Returns 404 when the worker id does not exist.
app.delete('/api/workers/:id', authenticateSSO, async (req, res) => {
  try {
    if (!req.user.isAdmin) {
      return res.status(403).json({ error: 'Admin access required' });
    }
    const [result] = await pool.query('DELETE FROM workers WHERE id = ?', [req.params.id]);
    // Previously this reported success even for unknown worker ids.
    if (result.affectedRows === 0) {
      return res.status(404).json({ error: 'Worker not found' });
    }
    // Notify connected UI clients, then complete the request.
    broadcast({ type: 'worker_deleted', worker_id: req.params.id });
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// Send direct command to specific worker (for testing).
// Creates an execution record, forwards the command over the worker's
// WebSocket, and broadcasts an execution_started event to UI clients.
// NOTE: removed stray timestamp residue lines (merge artifacts) that made
// this handler a syntax error.
app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
  try {
    const { command } = req.body;
    const workerId = req.params.id;
    if (!command) {
      return res.status(400).json({ error: 'command is required' });
    }

    // Check the worker connection BEFORE inserting the execution record.
    // The previous order inserted a 'running' row and then returned 400,
    // leaving an orphaned execution whenever the worker was offline
    // (the internal M2M endpoint already checks first; this matches it).
    const workerWs = workers.get(workerId);
    if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
      return res.status(400).json({ error: 'Worker not connected' });
    }

    const executionId = generateUUID();
    // Create execution record in database
    await pool.query(
      'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
      [executionId, null, 'running', req.user.username, JSON.stringify([{
        step: 'quick_command',
        action: 'command_sent',
        worker_id: workerId,
        command: command,
        timestamp: new Date().toISOString()
      }])]
    );

    // Send command via WebSocket to the specific worker.
    workerWs.send(JSON.stringify({
      type: 'execute_command',
      execution_id: executionId,
      command: command,
      worker_id: workerId,
      timeout: 60000
    }));
    console.log(`Command sent to worker ${workerId}: ${command}`);

    broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
    res.json({ success: true, execution_id: executionId });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
// ============================================
// EXAMPLE WORKFLOW DEFINITIONS
// ============================================
// Example 1: Simple command execution
const simpleWorkflow = {
name : "Update System Packages" ,
description : "Update all packages on target servers" ,
steps : [
{
name : "Update package list" ,
type : "execute" ,
targets : [ "all" ] ,
command : "apt update"
} ,
{
name : "User Approval" ,
type : "prompt" ,
message : "Packages updated. Proceed with upgrade?" ,
options : [ "Yes" , "No" ]
} ,
{
name : "Upgrade packages" ,
type : "execute" ,
targets : [ "all" ] ,
command : "apt upgrade -y" ,
condition : "promptResponse === 'Yes'"
}
]
} ;
// Example 2: Complex workflow with conditions
const backupWorkflow = {
name : "Backup and Verify" ,
description : "Create backup and verify integrity" ,
steps : [
{
name : "Create backup" ,
type : "execute" ,
targets : [ "all" ] ,
command : "tar -czf /tmp/backup-$(date +%Y%m%d).tar.gz /opt/pulse-worker"
} ,
{
name : "Wait for backup" ,
type : "wait" ,
duration : 5000
} ,
{
name : "Verify backup" ,
type : "execute" ,
targets : [ "all" ] ,
command : "tar -tzf /tmp/backup-*.tar.gz > /dev/null && echo 'Backup OK' || echo 'Backup FAILED'"
} ,
{
name : "Cleanup decision" ,
type : "prompt" ,
message : "Backup complete. Delete old backups?" ,
options : [ "Yes" , "No" , "Cancel" ]
} ,
{
name : "Cleanup old backups" ,
type : "execute" ,
targets : [ "all" ] ,
command : "find /tmp -name 'backup-*.tar.gz' -mtime +7 -delete" ,
condition : "promptResponse === 'Yes'"
}
]
} ;