const express = require('express'); const http = require('http'); const WebSocket = require('ws'); const mysql = require('mysql2/promise'); const crypto = require('crypto'); const vm = require('vm'); const rateLimit = require('express-rate-limit'); const cronParser = require('cron-parser'); require('dotenv').config(); const app = express(); const server = http.createServer(app); const wss = new WebSocket.Server({ server }); // Middleware app.use(express.json()); app.use(express.static('public')); // Rate limiting const apiLimiter = rateLimit({ windowMs: 15 * 60 * 1000, // 15 minutes max: 300, standardHeaders: true, legacyHeaders: false, message: { error: 'Too many requests, please try again later' } }); const executionLimiter = rateLimit({ windowMs: 60 * 1000, // 1 minute max: 20, standardHeaders: true, legacyHeaders: false, message: { error: 'Too many execution requests, please slow down' } }); app.use('/api/', apiLimiter); // Named constants for timeouts and limits const PROMPT_TIMEOUT_MS = 60 * 60 * 1000; // 60 min — how long a prompt waits for user input const COMMAND_TIMEOUT_MS = 120_000; // 2 min — workflow step command timeout const QUICK_CMD_TIMEOUT_MS = 60_000; // 1 min — quick/direct/gandalf command timeout const WEBHOOK_TIMEOUT_MS = 10_000; // 10 s — outbound webhook HTTP timeout const WAIT_STEP_MAX_MS = 24 * 60 * 60_000; // 24 h — cap on workflow wait step const GOTO_MAX_VISITS = 100; // max times a step may be revisited via goto const WORKER_STALE_MINUTES = 5; // minutes before a worker is marked offline const SERVER_VERSION = '1.0.0'; // Request logging middleware app.use((req, res, next) => { const start = Date.now(); res.on('finish', () => { console.log(`[HTTP] ${req.method} ${req.path} ${res.statusCode} ${Date.now() - start}ms`); }); next(); }); // Content-Type guard for JSON endpoints function requireJSON(req, res, next) { const ct = req.headers['content-type'] || ''; if (!ct.includes('application/json')) { return res.status(415).json({ error: 'Content-Type must be application/json' }); } next(); } // Validate and parse a webhook URL; returns { ok, url, reason } function validateWebhookUrl(raw) { if (!raw) return { ok: true, url: null }; let url; try { url = new URL(raw); } catch { return { ok: false, reason: 'Invalid URL format' }; } if (!['http:', 'https:'].includes(url.protocol)) { return { ok: false, reason: 'Webhook URL must use http or https' }; } const host = url.hostname.toLowerCase(); if ( host === 'localhost' || host === '::1' || /^127\./.test(host) || /^10\./.test(host) || /^192\.168\./.test(host) || /^172\.(1[6-9]|2\d|3[01])\./.test(host) || /^169\.254\./.test(host) || /^fe80:/i.test(host) ) { return { ok: false, reason: 'Webhook URL must not point to a private/internal address' }; } return { ok: true, url }; } // Database pool const pool = mysql.createPool({ host: process.env.DB_HOST, port: process.env.DB_PORT || 3306, user: process.env.DB_USER, password: process.env.DB_PASSWORD, database: process.env.DB_NAME, waitForConnections: true, connectionLimit: 50, queueLimit: 0 }); // Initialize database tables async function initDatabase() { const connection = await pool.getConnection(); try { await connection.query(` CREATE TABLE IF NOT EXISTS users ( id VARCHAR(36) PRIMARY KEY, username VARCHAR(255) UNIQUE NOT NULL, display_name VARCHAR(255), email VARCHAR(255), groups TEXT, last_login TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) `); // Database schema is managed manually - migrations removed after direct database fixes await connection.query(` CREATE TABLE IF NOT EXISTS workers ( id VARCHAR(36) PRIMARY KEY, name VARCHAR(255) UNIQUE NOT NULL, status VARCHAR(50) NOT NULL, last_heartbeat TIMESTAMP NULL, api_key VARCHAR(255), metadata JSON, INDEX idx_status (status), INDEX idx_heartbeat (last_heartbeat) ) `); await connection.query(` CREATE TABLE IF NOT EXISTS workflows ( id VARCHAR(36) PRIMARY KEY, name VARCHAR(255) NOT NULL, description TEXT, definition JSON NOT NULL, webhook_url VARCHAR(500) NULL, created_by VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_name (name) ) `); // Add webhook_url to existing workflows table if missing await connection.query(` ALTER TABLE workflows ADD COLUMN IF NOT EXISTS webhook_url VARCHAR(500) NULL `).catch(() => {}); await connection.query(` CREATE TABLE IF NOT EXISTS executions ( id VARCHAR(36) PRIMARY KEY, workflow_id VARCHAR(36) NULL, status VARCHAR(50) NOT NULL, started_by VARCHAR(255), started_at TIMESTAMP NULL, completed_at TIMESTAMP NULL, logs JSON, INDEX idx_workflow (workflow_id), INDEX idx_status (status), INDEX idx_started (started_at) ) `); await connection.query(` CREATE TABLE IF NOT EXISTS scheduled_commands ( id VARCHAR(36) PRIMARY KEY, name VARCHAR(255) NOT NULL, command TEXT NOT NULL, worker_ids JSON NOT NULL, schedule_type VARCHAR(50) NOT NULL, schedule_value VARCHAR(255) NOT NULL, enabled BOOLEAN DEFAULT TRUE, created_by VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_run TIMESTAMP NULL, next_run TIMESTAMP NULL, INDEX idx_enabled (enabled), INDEX idx_next_run (next_run) ) `); // Recover stale executions from a previous server crash const [staleExecs] = await connection.query("SELECT id FROM executions WHERE status = 'running'"); if (staleExecs.length > 0) { for (const exec of staleExecs) { await connection.query( "UPDATE executions SET status = 'failed', completed_at = NOW() WHERE id = ?", [exec.id] ); await connection.query( "UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', CAST(? AS JSON)) WHERE id = ?", [JSON.stringify({ action: 'server_restart_recovery', message: 'Execution marked failed due to server restart', timestamp: new Date().toISOString() }), exec.id] ); } console.log(`[Recovery] Marked ${staleExecs.length} stale execution(s) as failed`); } console.log('Database tables initialized successfully'); } catch (error) { console.error('Database initialization error:', error); throw error; } finally { connection.release(); } } // Auto-cleanup old executions (runs hourly) async function cleanupOldExecutions() { try { const retentionDays = parseInt(process.env.EXECUTION_RETENTION_DAYS) || 30; const [result] = await pool.query( `DELETE FROM executions WHERE status IN ('completed', 'failed') AND started_at < DATE_SUB(NOW(), INTERVAL ? DAY)`, [retentionDays] ); if (result.affectedRows > 0) { console.log(`[Cleanup] Removed ${result.affectedRows} executions older than ${retentionDays} day(s)`); } } catch (error) { console.error('[Cleanup] Error removing old executions:', error); } } // Run cleanup hourly setInterval(cleanupOldExecutions, 60 * 60 * 1000); // Run cleanup on startup cleanupOldExecutions(); // Scheduled Commands Processor async function processScheduledCommands() { try { const [schedules] = await pool.query( `SELECT * FROM scheduled_commands WHERE enabled = TRUE AND (next_run IS NULL OR next_run <= NOW())` ); for (const schedule of schedules) { // Atomically claim this run by advancing next_run before doing any work. // If two scheduler instances race, only the one that updates a row proceeds. const claimNextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value); const [claimed] = await pool.query( `UPDATE scheduled_commands SET next_run = ?, last_run = NOW() WHERE id = ? AND (next_run IS NULL OR next_run <= NOW())`, [claimNextRun, schedule.id] ); if (claimed.affectedRows === 0) { console.log(`[Scheduler] Skipping "${schedule.name}" - already claimed by another run`); continue; } // Also skip if a previous run is still active const [runningExecs] = await pool.query( "SELECT id FROM executions WHERE started_by = ? AND status = 'running'", [`scheduler:${schedule.name}`] ); if (runningExecs.length > 0) { console.log(`[Scheduler] Skipping "${schedule.name}" - previous execution still running`); continue; } console.log(`[Scheduler] Running scheduled command: ${schedule.name}`); // Handle both string (raw SQL) and object (auto-parsed by MySQL2 JSON column) let workerIds; try { workerIds = typeof schedule.worker_ids === 'string' ? JSON.parse(schedule.worker_ids) : schedule.worker_ids; } catch { console.error(`[Scheduler] Invalid worker_ids JSON for "${schedule.name}" — skipping`); continue; } // Execute command on each worker for (const workerId of workerIds) { const workerWs = workers.get(workerId); if (workerWs && workerWs.readyState === WebSocket.OPEN) { const executionId = crypto.randomUUID(); // Create execution record await pool.query( 'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)', [executionId, null, 'running', `scheduler:${schedule.name}`, JSON.stringify([{ step: 'scheduled_command', action: 'command_sent', worker_id: workerId, command: schedule.command, timestamp: new Date().toISOString() }])] ); // Send command to worker workerWs.send(JSON.stringify({ type: 'execute_command', execution_id: executionId, command: schedule.command, worker_id: workerId, timeout: 5 * 60_000 // SCHEDULED_CMD_TIMEOUT })); broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null }); } } // next_run and last_run already updated atomically above when we claimed the slot } } catch (error) { console.error('[Scheduler] Error processing scheduled commands:', error); } } function calculateNextRun(scheduleType, scheduleValue) { const now = new Date(); if (scheduleType === 'interval') { // Interval in minutes const minutes = parseInt(scheduleValue); if (isNaN(minutes) || minutes <= 0) throw new Error(`Invalid interval value: ${scheduleValue}`); return new Date(now.getTime() + minutes * 60000); } else if (scheduleType === 'daily') { // Daily at HH:MM const [hours, minutes] = scheduleValue.split(':').map(Number); if (isNaN(hours) || isNaN(minutes)) throw new Error(`Invalid daily time format: ${scheduleValue}`); const next = new Date(now); next.setHours(hours, minutes, 0, 0); if (next <= now) next.setDate(next.getDate() + 1); return next; } else if (scheduleType === 'hourly') { // Every N hours const hours = parseInt(scheduleValue); if (isNaN(hours) || hours <= 0) throw new Error(`Invalid hourly value: ${scheduleValue}`); return new Date(now.getTime() + hours * 3600000); } else if (scheduleType === 'cron') { // Full cron expression e.g. "0 2 * * 0" (Sundays at 2am) const interval = cronParser.parseExpression(scheduleValue, { currentDate: now }); return interval.next().toDate(); } throw new Error(`Unknown schedule type: ${scheduleType}`); } // Run scheduler every minute setInterval(processScheduledCommands, 60 * 1000); // Initial run on startup setTimeout(processScheduledCommands, 5000); // Mark workers offline when their heartbeat goes stale async function markStaleWorkersOffline() { try { const [stale] = await pool.query( `SELECT id FROM workers WHERE status = 'online' AND last_heartbeat < DATE_SUB(NOW(), INTERVAL ? MINUTE)`, [WORKER_STALE_MINUTES] ); for (const w of stale) { await pool.query(`UPDATE workers SET status='offline' WHERE id=?`, [w.id]); broadcast({ type: 'worker_update', worker_id: w.id, status: 'offline' }); console.log(`[Worker] Marked stale worker ${w.id} offline`); } } catch (error) { console.error('[Worker] Stale check error:', error); } } setInterval(markStaleWorkersOffline, 60_000); // WebSocket connections const browserClients = new Set(); // Browser UI connections const workerClients = new Set(); // Worker agent connections const workers = new Map(); // Map worker_id -> WebSocket connection wss.on('connection', (ws) => { // Default to browser client until worker_connect identifies it as a worker browserClients.add(ws); // Handle incoming messages from workers ws.on('message', async (data) => { try { const message = JSON.parse(data.toString()); console.log('WebSocket message received:', message.type); if (message.type === 'command_result') { // Handle command result from worker const { execution_id, command_id, worker_id, success, stdout, stderr, duration, timestamp } = message; // Add result to execution logs await addExecutionLog(execution_id, { step: 'command_execution', action: 'command_result', command_id: command_id, // Include command_id for workflow tracking worker_id: worker_id, success: success, stdout: stdout, stderr: stderr, duration: duration, timestamp: timestamp || new Date().toISOString() }); // For non-workflow executions, update status immediately // For workflow executions, the workflow engine will update status const [execution] = await pool.query('SELECT workflow_id, started_by FROM executions WHERE id = ?', [execution_id]); const startedBy = execution.length > 0 ? (execution[0].started_by || '') : ''; const isAutomated = startedBy.startsWith('gandalf:') || startedBy.startsWith('scheduler:'); if (execution.length > 0 && !execution[0].workflow_id) { // Only update status for quick commands (no workflow_id) const finalStatus = success ? 'completed' : 'failed'; await updateExecutionStatus(execution_id, finalStatus); } // Resolve any pending event-driven command promise (eliminates DB polling) if (command_id) { const resolver = _commandResolvers.get(command_id); if (resolver) { resolver({ success, stdout, stderr }); } } // Broadcast result to browser clients only broadcast({ type: 'command_result', execution_id: execution_id, worker_id: worker_id, success: success, stdout: stdout, stderr: stderr, is_automated: isAutomated, }); console.log(`Command result received for execution ${execution_id}: ${success ? 'success' : 'failed'}`); } if (message.type === 'workflow_result') { // Handle workflow result from worker const { execution_id, worker_id, success, message: resultMessage, timestamp } = message; // Add final result to logs await addExecutionLog(execution_id, { step: 'workflow_completion', action: 'workflow_result', worker_id: worker_id, success: success, message: resultMessage, timestamp: timestamp || new Date().toISOString() }); // Update execution status const finalStatus = success ? 'completed' : 'failed'; await updateExecutionStatus(execution_id, finalStatus); // Broadcast completion to all clients broadcast({ type: 'workflow_result', execution_id: execution_id, status: finalStatus, success: success, message: resultMessage }); console.log(`Workflow result received for execution ${execution_id}: ${finalStatus}`); } if (message.type === 'worker_connect') { // Authenticate worker — reject if api_key is provided but wrong const { worker_id, worker_name, api_key } = message; if (api_key && api_key !== process.env.WORKER_API_KEY) { console.warn(`[Security] Worker connection rejected: invalid API key from "${worker_name}"`); ws.close(4001, 'Unauthorized'); return; } // Move from browser set to worker set browserClients.delete(ws); workerClients.add(ws); console.log(`Worker connected: ${worker_name} (${worker_id})`); // Find the database worker ID by name const [dbWorkers] = await pool.query( 'SELECT id FROM workers WHERE name = ?', [worker_name] ); if (dbWorkers.length > 0) { const dbWorkerId = dbWorkers[0].id; // Clean up any stale entry for this db worker before storing the new one // (handles reconnect: old runtime-ID entry would otherwise linger). for (const [key, val] of workers) { if (val.dbWorkerId === dbWorkerId && val !== ws) workers.delete(key); } // Store worker WebSocket connection using BOTH IDs workers.set(worker_id, ws); // Runtime ID workers.set(dbWorkerId, ws); // Database ID // Store mapping for cleanup ws.workerId = worker_id; ws.dbWorkerId = dbWorkerId; console.log(`Mapped worker: runtime_id=${worker_id}, db_id=${dbWorkerId}, name=${worker_name}`); // Update worker status to online await pool.query( `UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`, [dbWorkerId] ); // Broadcast worker status update with database ID broadcast({ type: 'worker_update', worker_id: dbWorkerId, status: 'online' }); } else { console.log(`Worker ${worker_name} not found in database, will be created on heartbeat`); } } if (message.type === 'pong') { // Use the DB worker ID stored on connect; fall back to message payload const dbId = ws.dbWorkerId || message.worker_id; if (dbId) { await pool.query( `UPDATE workers SET last_heartbeat=NOW() WHERE id=?`, [dbId] ); } } } catch (error) { console.error('WebSocket message error:', error); } }); ws.on('close', () => { browserClients.delete(ws); workerClients.delete(ws); if (ws.workerId) { workers.delete(ws.workerId); console.log(`Worker ${ws.workerId} (runtime ID) disconnected`); } if (ws.dbWorkerId) { workers.delete(ws.dbWorkerId); console.log(`Worker ${ws.dbWorkerId} (database ID) disconnected`); // Mark worker offline in DB pool.query(`UPDATE workers SET status='offline' WHERE id=?`, [ws.dbWorkerId]) .then(() => broadcast({ type: 'worker_update', worker_id: ws.dbWorkerId, status: 'offline' })) .catch(err => console.error('[Worker] Failed to mark worker offline:', err)); } }); }); // Broadcast to browser clients only (NOT worker agents) function broadcast(data) { // Snapshot the Set before iterating — a close event during iteration would // otherwise modify the Set in-place, causing skipped or double-visited entries. const snapshot = Array.from(browserClients); snapshot.forEach(client => { if (client.readyState === WebSocket.OPEN) { client.send(JSON.stringify(data)); } }); } // Helper function to add log entry to execution (atomic — no read-modify-write race condition) async function addExecutionLog(executionId, logEntry) { try { await pool.query( "UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', CAST(? AS JSON)) WHERE id = ?", [JSON.stringify(logEntry), executionId] ); } catch (error) { console.error(`[Workflow] Error adding execution log:`, error); } } // Helper function to update execution status async function updateExecutionStatus(executionId, status) { try { await pool.query( 'UPDATE executions SET status = ?, completed_at = NOW() WHERE id = ?', [status, executionId] ); broadcast({ type: 'execution_status', execution_id: executionId, status: status }); console.log(`[Workflow] Execution ${executionId} status updated to: ${status}`); // Fire webhook if configured on the workflow and execution reached a terminal state if (status === 'completed' || status === 'failed') { fireWebhook(executionId, status).catch(err => { console.error(`[Webhook] Delivery error for execution ${executionId}:`, err.message); }); } } catch (error) { console.error(`[Workflow] Error updating execution status:`, error); } } // Fire optional webhook on workflow completion/failure async function fireWebhook(executionId, status) { const [rows] = await pool.query( `SELECT e.id, e.workflow_id, e.started_by, e.started_at, e.completed_at, w.webhook_url, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id WHERE e.id = ?`, [executionId] ); if (!rows.length || !rows[0].webhook_url) return; const exec = rows[0]; const { ok, url, reason } = validateWebhookUrl(exec.webhook_url); if (!ok) { console.warn(`[Webhook] Skipping invalid stored URL for execution ${executionId}: ${reason}`); return; } const payload = { execution_id: executionId, workflow_id: exec.workflow_id, workflow_name: exec.workflow_name, status, started_by: exec.started_by, started_at: exec.started_at, completed_at: exec.completed_at, timestamp: new Date().toISOString() }; const response = await fetch(exec.webhook_url, { method: 'POST', headers: { 'Content-Type': 'application/json', 'User-Agent': 'PULSE-Webhook/1.0' }, body: JSON.stringify(payload), signal: AbortSignal.timeout(WEBHOOK_TIMEOUT_MS) }); console.log(`[Webhook] ${url.hostname} responded ${response.status} for execution ${executionId}`); } // Authelia SSO Middleware async function authenticateSSO(req, res, next) { // Check for Authelia headers const remoteUser = req.headers['remote-user']; const remoteName = req.headers['remote-name']; const remoteEmail = req.headers['remote-email']; const remoteGroups = req.headers['remote-groups']; if (!remoteUser) { return res.status(401).json({ error: 'Not authenticated', message: 'Please access this service through auth.lotusguild.org' }); } // Check if user is in allowed groups (admin or employee) const groups = remoteGroups ? remoteGroups.split(',').map(g => g.trim()) : []; const allowedGroups = ['admin', 'employee']; const hasAccess = groups.some(g => allowedGroups.includes(g)); if (!hasAccess) { return res.status(403).json({ error: 'Access denied', message: 'You must be in admin or employee group' }); } // Store/update user in database try { const userId = crypto.randomUUID(); await pool.query( `INSERT INTO users (id, username, display_name, email, groups, last_login) VALUES (?, ?, ?, ?, ?, NOW()) ON DUPLICATE KEY UPDATE display_name=VALUES(display_name), email=VALUES(email), groups=VALUES(groups), last_login=NOW()`, [userId, remoteUser, remoteName, remoteEmail, remoteGroups] ); } catch (error) { console.error('Error updating user:', error); } // Attach user info to request req.user = { username: remoteUser, name: remoteName || remoteUser, email: remoteEmail || '', groups: groups, isAdmin: groups.includes('admin') }; next(); } // Gandalf machine-to-machine API key auth function authenticateGandalf(req, res, next) { const apiKey = req.headers['x-gandalf-api-key']; if (!apiKey || apiKey !== process.env.GANDALF_API_KEY) { return res.status(401).json({ error: 'Unauthorized' }); } req.user = { username: 'gandalf:link_stats', isAdmin: false }; next(); } // Workflow Execution Engine // Substitute {{param_name}} placeholders in a command string. // Only alphanumeric + safe punctuation allowed in substituted values. function applyParams(command, params) { return command.replace(/\{\{(\w+)\}\}/g, (match, key) => { if (!(key in params)) return match; const val = String(params[key]).trim(); if (!/^[a-zA-Z0-9._:@\-\/]+$/.test(val)) { throw new Error(`Unsafe value for workflow parameter "${key}"`); } return val; }); } // Evaluate a condition string against execution state and params. // Uses vm.runInNewContext with a timeout to avoid arbitrary code execution risk. function evalCondition(condition, state, params) { try { const context = vm.createContext({ state, params, promptResponse: state.promptResponse }); return !!vm.runInNewContext(condition, context, { timeout: 100 }); } catch (e) { console.warn(`[Workflow] evalCondition error (treated as false): ${e.message} — condition: ${condition}`); return false; } } // Per-execution mutable state (params + user-keyed state dict). // Survives across step boundaries; cleaned up when execution ends. const _executionState = new Map(); // executionId → { params, state } // Pending prompt resolvers — set when a prompt step is waiting for user input. const _executionPrompts = new Map(); // executionId → resolve fn const _commandResolvers = new Map(); // commandId → resolve fn (event-driven result delivery) async function executeWorkflowSteps(executionId, workflowId, definition, username, params = {}, dryRun = false) { _executionState.set(executionId, { params, state: {} }); // Global execution timeout const maxMinutes = parseInt(process.env.EXECUTION_MAX_MINUTES) || 60; const executionDeadline = Date.now() + maxMinutes * 60000; try { console.log(`[Workflow] Starting execution ${executionId} for workflow ${workflowId}${dryRun ? ' [DRY RUN]' : ''}`); const steps = definition.steps || []; // Build step-id → index map for goto support const stepIdMap = new Map(); steps.forEach((step, i) => { if (step.id) stepIdMap.set(step.id, i); }); let currentIndex = 0; const stepVisits = new Array(steps.length).fill(0); while (currentIndex < steps.length) { // Detect goto infinite loops stepVisits[currentIndex] = (stepVisits[currentIndex] || 0) + 1; if (stepVisits[currentIndex] > GOTO_MAX_VISITS) { await addExecutionLog(executionId, { action: 'workflow_error', error: `Infinite loop detected at step ${currentIndex + 1} (visited ${stepVisits[currentIndex]} times)`, timestamp: new Date().toISOString() }); await updateExecutionStatus(executionId, 'failed'); return; } // Enforce global execution timeout if (Date.now() > executionDeadline) { await addExecutionLog(executionId, { action: 'execution_timeout', message: `Execution exceeded maximum runtime of ${maxMinutes} minutes`, timestamp: new Date().toISOString() }); await updateExecutionStatus(executionId, 'failed'); return; } const step = steps[currentIndex]; const execState = _executionState.get(executionId); if (!execState) { // State was lost (server restart or bug) — fail cleanly rather than throwing TypeError await addExecutionLog(executionId, { action: 'workflow_error', error: 'Execution state lost unexpectedly; aborting.', timestamp: new Date().toISOString() }); await updateExecutionStatus(executionId, 'failed'); return; } const stepLabel = step.name || step.id || `Step ${currentIndex + 1}`; console.log(`[Workflow] ${executionId} — step ${currentIndex + 1}: ${stepLabel}`); await addExecutionLog(executionId, { step: currentIndex + 1, step_name: stepLabel, action: 'step_started', timestamp: new Date().toISOString() }); broadcast({ type: 'workflow_step_started', execution_id: executionId, step: currentIndex + 1, step_name: stepLabel }); let gotoId = step.goto || null; // may be overridden by prompt routes if (step.type === 'execute') { const condOk = !step.condition || evalCondition(step.condition, execState.state, execState.params); if (condOk) { if (dryRun) { await addExecutionLog(executionId, { step: currentIndex + 1, step_name: stepLabel, action: 'dry_run_skipped', command: step.command, targets: step.targets || ['all'], message: '[DRY RUN] Command not executed', timestamp: new Date().toISOString() }); } else { const success = await executeCommandStep(executionId, step, currentIndex + 1, params); if (!success) { await updateExecutionStatus(executionId, 'failed'); return; } } } else { await addExecutionLog(executionId, { step: currentIndex + 1, action: 'step_skipped', reason: 'condition false', timestamp: new Date().toISOString() }); } } else if (step.type === 'prompt') { const response = await executePromptStep(executionId, step, currentIndex + 1); if (response !== null) { execState.state[step.key || 'lastPrompt'] = response; execState.state.promptResponse = response; // backwards compat // Prompt routes override goto if (step.routes && step.routes[response]) { gotoId = step.routes[response]; } } } else if (step.type === 'wait') { let ms = parseFloat(step.duration || 5) * 1000; if (isNaN(ms) || ms < 0) ms = 5000; if (ms > WAIT_STEP_MAX_MS) ms = WAIT_STEP_MAX_MS; await addExecutionLog(executionId, { step: currentIndex + 1, action: 'waiting', duration_ms: ms, timestamp: new Date().toISOString() }); await new Promise(r => setTimeout(r, ms)); } await addExecutionLog(executionId, { step: currentIndex + 1, step_name: stepLabel, action: 'step_completed', timestamp: new Date().toISOString() }); broadcast({ type: 'workflow_step_completed', execution_id: executionId, step: currentIndex + 1, step_name: stepLabel }); // Determine next step if (gotoId === 'end' || gotoId === '__end__') { break; } else if (gotoId) { if (stepIdMap.has(gotoId)) { currentIndex = stepIdMap.get(gotoId); } else { await addExecutionLog(executionId, { action: 'goto_error', target: gotoId, message: `No step with id "${gotoId}" found`, timestamp: new Date().toISOString() }); break; } } else { currentIndex++; } } await updateExecutionStatus(executionId, 'completed'); console.log(`[Workflow] Execution ${executionId} completed`); } catch (error) { console.error(`[Workflow] Execution ${executionId} error:`, error); await addExecutionLog(executionId, { action: 'workflow_error', error: error.message, timestamp: new Date().toISOString() }); await updateExecutionStatus(executionId, 'failed'); } finally { _executionState.delete(executionId); _executionPrompts.delete(executionId); } } // Pause execution and wait for user to respond via POST /api/executions/:id/respond. // Resolves with the chosen option string, or null on 60-minute timeout. async function executePromptStep(executionId, step, stepNumber) { const message = step.message || 'Please choose an option:'; const options = step.options || ['Continue']; await addExecutionLog(executionId, { step: stepNumber, step_name: step.name, action: 'prompt', message, options, timestamp: new Date().toISOString() }); broadcast({ type: 'execution_prompt', execution_id: executionId, prompt: { message, options, step: stepNumber, step_name: step.name } }); return new Promise(resolve => { const timer = setTimeout(() => { _executionPrompts.delete(executionId); console.warn(`[Workflow] Prompt timed out for execution ${executionId}`); resolve(null); }, PROMPT_TIMEOUT_MS); _executionPrompts.set(executionId, { resolve: (response) => { clearTimeout(timer); _executionPrompts.delete(executionId); resolve(response); }, options }); }); } async function executeCommandStep(executionId, step, stepNumber, params = {}) { try { let command = step.command; if (Object.keys(params).length > 0) { command = applyParams(command, params); } const targets = step.targets || ['all']; // Determine which workers to target let targetWorkerIds = []; if (targets.includes('all')) { // Get all online workers const [onlineWorkers] = await pool.query('SELECT id FROM workers WHERE status = ?', ['online']); targetWorkerIds = onlineWorkers.map(w => w.id); } else { // Specific worker IDs or names for (const target of targets) { // Try to find by ID first, then by name const [workerById] = await pool.query('SELECT id FROM workers WHERE id = ?', [target]); if (workerById.length > 0) { targetWorkerIds.push(workerById[0].id); } else { const [workerByName] = await pool.query('SELECT id FROM workers WHERE name = ?', [target]); if (workerByName.length > 0) { targetWorkerIds.push(workerByName[0].id); } } } } if (targetWorkerIds.length === 0) { await addExecutionLog(executionId, { step: stepNumber, action: 'no_workers', message: 'No workers available for this step', timestamp: new Date().toISOString() }); return false; } // Execute command on each target worker and wait for results const results = []; for (const workerId of targetWorkerIds) { const workerWs = workers.get(workerId); if (!workerWs || workerWs.readyState !== WebSocket.OPEN) { await addExecutionLog(executionId, { step: stepNumber, action: 'worker_offline', worker_id: workerId, timestamp: new Date().toISOString() }); continue; } // Send command to worker const commandId = crypto.randomUUID(); await addExecutionLog(executionId, { step: stepNumber, action: 'command_sent', worker_id: workerId, command: command, command_id: commandId, timestamp: new Date().toISOString() }); workerWs.send(JSON.stringify({ type: 'execute_command', execution_id: executionId, command_id: commandId, command: command, worker_id: workerId, timeout: COMMAND_TIMEOUT_MS })); // Wait for command result (with timeout) const result = await waitForCommandResult(executionId, commandId, COMMAND_TIMEOUT_MS); results.push(result); if (!result.success) { // Command failed, workflow should stop return false; } } // All commands succeeded return results.every(r => r.success); } catch (error) { console.error(`[Workflow] Error executing command step:`, error); await addExecutionLog(executionId, { step: stepNumber, action: 'step_error', error: error.message, timestamp: new Date().toISOString() }); return false; } } // Wait for a command result using event-driven promise resolution. // The resolver is stored in _commandResolvers and called immediately when // command_result arrives via WebSocket — no DB polling required. async function waitForCommandResult(executionId, commandId, timeout) { return new Promise((resolve) => { const timer = setTimeout(() => { _commandResolvers.delete(commandId); resolve({ success: false, error: 'Command timeout' }); }, timeout); _commandResolvers.set(commandId, (result) => { clearTimeout(timer); _commandResolvers.delete(commandId); resolve(result); }); }); } // Routes - All protected by SSO app.get('/api/user', authenticateSSO, (req, res) => { res.json(req.user); }); app.get('/api/workflows', authenticateSSO, async (req, res) => { try { const [rows] = await pool.query('SELECT * FROM workflows ORDER BY created_at DESC'); res.json(rows); } catch (error) { console.error('[API] GET /api/workflows error:', error); res.status(500).json({ error: 'Internal server error' }); } }); app.get('/api/workflows/:id', authenticateSSO, async (req, res) => { try { const [rows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [req.params.id]); if (rows.length === 0) return res.status(404).json({ error: 'Not found' }); const wf = rows[0]; let definition = {}; try { definition = JSON.parse(wf.definition || '{}'); } catch { /* corrupt definition — return empty */ } res.json({ ...wf, definition }); } catch (error) { console.error('[API] GET /api/workflows/:id error:', error); res.status(500).json({ error: 'Internal server error' }); } }); app.post('/api/workflows', authenticateSSO, requireJSON, async (req, res) => { try { const { name, description, definition, webhook_url } = req.body; if (!name || !definition) return res.status(400).json({ error: 'name and definition are required' }); const webhookCheck = validateWebhookUrl(webhook_url); if (!webhookCheck.ok) return res.status(400).json({ error: webhookCheck.reason }); const id = crypto.randomUUID(); await pool.query( 'INSERT INTO workflows (id, name, description, definition, webhook_url, created_by) VALUES (?, ?, ?, ?, ?, ?)', [id, name, description, JSON.stringify(definition), webhook_url || null, req.user.username] ); res.json({ id, name, description, definition, webhook_url: webhook_url || null }); broadcast({ type: 'workflow_created', workflow_id: id }); } catch (error) { console.error('[Workflow] Error creating workflow:', error); res.status(500).json({ error: 'Internal server error' }); } }); app.delete('/api/workflows/:id', authenticateSSO, async (req, res) => { try { // Only admins can delete workflows if (!req.user.isAdmin) { return res.status(403).json({ error: 'Admin access required' }); } await pool.query('DELETE FROM workflows WHERE id = ?', [req.params.id]); res.json({ success: true }); broadcast({ type: 'workflow_deleted', workflow_id: req.params.id }); } catch (error) { res.status(500).json({ error: 'Internal server error' }); } }); app.get('/api/workers', authenticateSSO, async (req, res) => { try { const [rows] = await pool.query('SELECT id, name, status, last_heartbeat, metadata FROM workers ORDER BY name'); res.json(rows); } catch (error) { console.error('[API] GET /api/workers error:', error); res.status(500).json({ error: 'Internal server error' }); } }); app.post('/api/workers/heartbeat', async (req, res) => { try { const { worker_id, name, metadata } = req.body; const apiKey = req.headers['x-api-key']; // Verify API key — reject missing or wrong keys if (!apiKey || apiKey !== process.env.WORKER_API_KEY) { return res.status(401).json({ error: 'Invalid API key' }); } await pool.query( `INSERT INTO workers (id, name, status, last_heartbeat, api_key, metadata) VALUES (?, ?, 'online', NOW(), ?, ?) ON DUPLICATE KEY UPDATE status='online', last_heartbeat=NOW(), metadata=VALUES(metadata)`, [worker_id, name, apiKey, JSON.stringify(metadata)] ); broadcast({ type: 'worker_update', worker_id, status: 'online' }); res.json({ success: true }); } catch (error) { console.error('[Worker] Heartbeat error:', error); res.status(500).json({ error: 'Internal server error' }); } }); app.post('/api/executions', executionLimiter, authenticateSSO, requireJSON, async (req, res) => { try { const { workflow_id, params = {}, dry_run = false } = req.body; if (!workflow_id) return res.status(400).json({ error: 'workflow_id is required' }); const id = crypto.randomUUID(); // Get workflow definition const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflow_id]); if (workflows.length === 0) { return res.status(404).json({ error: 'Workflow not found' }); } const workflow = workflows[0]; let definition; try { definition = typeof workflow.definition === 'string' ? JSON.parse(workflow.definition) : workflow.definition; } catch { return res.status(500).json({ error: 'Workflow definition is corrupt' }); } // Validate required params const paramDefs = definition.params || []; for (const pd of paramDefs) { if (pd.required && !params[pd.name]) { return res.status(400).json({ error: `Missing required parameter: ${pd.label || pd.name}` }); } } // Create execution record const initLogs = Object.keys(params).length > 0 ? [{ action: 'params', params, timestamp: new Date().toISOString() }] : []; await pool.query( 'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)', [id, workflow_id, 'running', req.user.username, JSON.stringify(initLogs)] ); if (dry_run) { broadcast({ type: 'execution_started', execution_id: id, workflow_id, dry_run: true }); } else { broadcast({ type: 'execution_started', execution_id: id, workflow_id }); } // Start workflow execution asynchronously executeWorkflowSteps(id, workflow_id, definition, req.user.username, params, dry_run).catch(err => { console.error(`[Workflow] Execution ${id} failed:`, err); }); res.json({ id, workflow_id, status: 'running', dry_run: !!dry_run }); } catch (error) { console.error('[Execution] Error starting execution:', error); res.status(500).json({ error: 'Internal server error' }); } }); app.get('/api/executions', authenticateSSO, async (req, res) => { try { const limit = Math.min(parseInt(req.query.limit) || 50, 1000); const offset = parseInt(req.query.offset) || 0; const conditions = []; const queryParams = []; // Existing filter if (req.query.hide_internal === 'true') { conditions.push("e.started_by NOT LIKE 'gandalf:%' AND e.started_by NOT LIKE 'scheduler:%'"); } // New filters if (req.query.status) { conditions.push('e.status = ?'); queryParams.push(req.query.status); } if (req.query.workflow_id) { conditions.push('e.workflow_id = ?'); queryParams.push(req.query.workflow_id); } if (req.query.started_by) { conditions.push('e.started_by = ?'); queryParams.push(req.query.started_by); } if (req.query.after) { conditions.push('e.started_at >= ?'); queryParams.push(new Date(req.query.after)); } if (req.query.before) { conditions.push('e.started_at <= ?'); queryParams.push(new Date(req.query.before)); } if (req.query.search) { conditions.push('(w.name LIKE ? OR e.started_by LIKE ?)'); const term = `%${req.query.search}%`; queryParams.push(term, term); } const whereClause = conditions.length ? `WHERE ${conditions.join(' AND ')}` : ''; const [rows] = await pool.query( `SELECT e.id, e.workflow_id, e.status, e.started_by, e.started_at, e.completed_at, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ${whereClause} ORDER BY e.started_at DESC LIMIT ? OFFSET ?`, [...queryParams, limit, offset] ); const [countRows] = await pool.query( `SELECT COUNT(*) as total FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ${whereClause}`, queryParams ); const total = countRows[0].total; res.json({ executions: rows, total, limit, offset, hasMore: offset + rows.length < total }); } catch (error) { res.status(500).json({ error: 'Internal server error' }); } }); app.delete('/api/executions/completed', authenticateSSO, async (req, res) => { try { if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin access required' }); const [result] = await pool.query( "DELETE FROM executions WHERE status IN ('completed', 'failed')" ); broadcast({ type: 'executions_bulk_deleted' }); res.json({ success: true, deleted: result.affectedRows }); } catch (error) { console.error('[Execution] Bulk delete error:', error); res.status(500).json({ error: 'Internal server error' }); } }); app.delete('/api/executions/:id', authenticateSSO, async (req, res) => { try { if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin access required' }); await pool.query('DELETE FROM executions WHERE id = ?', [req.params.id]); broadcast({ type: 'execution_deleted', execution_id: req.params.id }); res.json({ success: true }); } catch (error) { console.error('[Execution] Error deleting execution:', error); res.status(500).json({ error: 'Internal server error' }); } }); app.post('/api/executions/:id/abort', authenticateSSO, async (req, res) => { try { const executionId = req.params.id; // Check if execution exists and is running const [execution] = await pool.query('SELECT status FROM executions WHERE id = ?', [executionId]); if (execution.length === 0) { return res.status(404).json({ error: 'Execution not found' }); } if (execution[0].status !== 'running') { return res.status(400).json({ error: 'Execution is not running' }); } // Add abort log entry await addExecutionLog(executionId, { action: 'execution_aborted', aborted_by: req.user.username, timestamp: new Date().toISOString() }); // Update execution status to failed await updateExecutionStatus(executionId, 'failed'); // Unblock any pending prompt so the thread can exit const pending = _executionPrompts.get(executionId); if (pending) pending.resolve(null); console.log(`[Execution] Execution ${executionId} aborted by ${req.user.username}`); res.json({ success: true }); } catch (error) { console.error('[Execution] Error aborting execution:', error); res.status(500).json({ error: 'Internal server error' }); } }); // Respond to a pending prompt in a running execution app.post('/api/executions/:id/respond', authenticateSSO, async (req, res) => { try { const { id } = req.params; const { response } = req.body; if (!response || typeof response !== 'string') { return res.status(400).json({ error: 'response is required' }); } const pending = _executionPrompts.get(id); if (!pending) return res.status(404).json({ error: 'No pending prompt for this execution' }); // Validate response is one of the allowed options if (pending.options && !pending.options.includes(response)) { return res.status(400).json({ error: `Invalid response. Allowed values: ${pending.options.join(', ')}` }); } await addExecutionLog(id, { action: 'prompt_response', response, responded_by: req.user.username, timestamp: new Date().toISOString() }); broadcast({ type: 'prompt_response', execution_id: id, response }); pending.resolve(response); res.json({ success: true }); } catch (error) { res.status(500).json({ error: 'Internal server error' }); } }); // Edit a workflow definition (admin only) app.put('/api/workflows/:id', authenticateSSO, requireJSON, async (req, res) => { try { if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin only' }); const { id } = req.params; const { name, description, definition, webhook_url } = req.body; if (!name || !definition) return res.status(400).json({ error: 'name and definition required' }); const webhookCheck = validateWebhookUrl(webhook_url); if (!webhookCheck.ok) return res.status(400).json({ error: webhookCheck.reason }); let defObj; try { defObj = typeof definition === 'string' ? JSON.parse(definition) : definition; } catch { return res.status(400).json({ error: 'Invalid JSON in definition' }); } const [result] = await pool.query( 'UPDATE workflows SET name=?, description=?, definition=?, webhook_url=?, updated_at=NOW() WHERE id=?', [name, description || '', JSON.stringify(defObj), webhook_url || null, id] ); if (result.affectedRows === 0) return res.status(404).json({ error: 'Workflow not found' }); broadcast({ type: 'workflow_updated', workflow_id: id }); res.json({ success: true }); } catch (error) { console.error('[Workflow] Error updating workflow:', error); res.status(500).json({ error: 'Internal server error' }); } }); // Scheduled Commands API app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => { try { const [schedules] = await pool.query( 'SELECT * FROM scheduled_commands ORDER BY created_at DESC' ); res.json(schedules); } catch (error) { res.status(500).json({ error: 'Internal server error' }); } }); app.post('/api/scheduled-commands', authenticateSSO, requireJSON, async (req, res) => { try { if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin access required' }); const { name, command, worker_ids, schedule_type, schedule_value } = req.body; if (!name || !command || !worker_ids || !schedule_type || !schedule_value) { return res.status(400).json({ error: 'Missing required fields' }); } // Validate schedule_value for numeric types if (schedule_type === 'interval' || schedule_type === 'hourly') { const n = parseInt(schedule_value, 10); if (!Number.isInteger(n) || n <= 0) { return res.status(400).json({ error: `schedule_value for type "${schedule_type}" must be a positive integer` }); } } const id = crypto.randomUUID(); const nextRun = calculateNextRun(schedule_type, schedule_value); await pool.query( `INSERT INTO scheduled_commands (id, name, command, worker_ids, schedule_type, schedule_value, created_by, next_run) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, [id, name, command, JSON.stringify(worker_ids), schedule_type, schedule_value, req.user.username, nextRun] ); res.json({ success: true, id }); } catch (error) { res.status(500).json({ error: 'Internal server error' }); } }); app.put('/api/scheduled-commands/:id/toggle', authenticateSSO, async (req, res) => { try { if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin access required' }); const { id } = req.params; const [current] = await pool.query('SELECT enabled FROM scheduled_commands WHERE id = ?', [id]); if (current.length === 0) { return res.status(404).json({ error: 'Schedule not found' }); } const newEnabled = !current[0].enabled; await pool.query('UPDATE scheduled_commands SET enabled = ? WHERE id = ?', [newEnabled, id]); res.json({ success: true, enabled: newEnabled }); } catch (error) { res.status(500).json({ error: 'Internal server error' }); } }); app.delete('/api/scheduled-commands/:id', authenticateSSO, async (req, res) => { try { if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin access required' }); const { id } = req.params; await pool.query('DELETE FROM scheduled_commands WHERE id = ?', [id]); res.json({ success: true }); } catch (error) { res.status(500).json({ error: 'Internal server error' }); } }); // Internal M2M API for Gandalf app.post('/api/internal/command', authenticateGandalf, async (req, res) => { try { const { worker_id, command } = req.body; if (!worker_id || !command) { return res.status(400).json({ error: 'worker_id and command are required' }); } const workerWs = workers.get(worker_id); if (!workerWs || workerWs.readyState !== WebSocket.OPEN) { return res.status(400).json({ error: 'Worker not connected' }); } const executionId = crypto.randomUUID(); await pool.query( 'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)', [executionId, null, 'running', req.user.username, JSON.stringify([{ step: 'internal_command', action: 'command_sent', worker_id: worker_id, command: command, timestamp: new Date().toISOString() }])] ); workerWs.send(JSON.stringify({ type: 'execute_command', execution_id: executionId, command: command, worker_id: worker_id, timeout: QUICK_CMD_TIMEOUT_MS })); res.json({ execution_id: executionId }); } catch (error) { res.status(500).json({ error: 'Internal server error' }); } }); app.get('/api/internal/executions/:id', authenticateGandalf, async (req, res) => { try { const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]); if (rows.length === 0) { return res.status(404).json({ error: 'Not found' }); } const execution = rows[0]; let logs = []; try { logs = JSON.parse(execution.logs || '[]'); } catch { logs = []; } res.json({ ...execution, logs }); } catch (error) { console.error('[API] GET /api/internal/executions/:id error:', error); res.status(500).json({ error: 'Internal server error' }); } }); // Detailed health + stats (auth required) app.get('/api/health', authenticateSSO, async (req, res) => { try { await pool.query('SELECT 1'); const [workerRows] = await pool.query('SELECT status, COUNT(*) as count FROM workers GROUP BY status'); const workerCounts = Object.fromEntries(workerRows.map(r => [r.status, Number(r.count)])); res.json({ status: 'ok', version: SERVER_VERSION, uptime_seconds: Math.floor(process.uptime()), timestamp: new Date().toISOString(), workers: { online: workerCounts.online || 0, offline: workerCounts.offline || 0 }, database: 'connected' }); } catch (error) { console.error('[Health] /api/health error:', error); res.status(500).json({ status: 'error', timestamp: new Date().toISOString() }); } }); // Health check (no auth required) app.get('/health', async (req, res) => { try { await pool.query('SELECT 1'); res.json({ status: 'ok', timestamp: new Date().toISOString(), database: 'connected', auth: 'authelia-sso' }); } catch (error) { res.status(500).json({ status: 'error', timestamp: new Date().toISOString(), database: 'disconnected', error: 'Internal server error' }); } }); // Get execution details with logs app.get('/api/executions/:id', authenticateSSO, async (req, res) => { try { const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]); if (rows.length === 0) { return res.status(404).json({ error: 'Not found' }); } const execution = rows[0]; let parsedLogs = []; try { parsedLogs = typeof execution.logs === 'string' ? JSON.parse(execution.logs || '[]') : (execution.logs || []); } catch { parsedLogs = []; } const waitingForInput = _executionPrompts.has(req.params.id); let pendingPrompt = null; if (waitingForInput) { for (let i = parsedLogs.length - 1; i >= 0; i--) { if (parsedLogs[i].action === 'prompt') { pendingPrompt = { message: parsedLogs[i].message, options: parsedLogs[i].options }; break; } } } res.json({ ...execution, logs: parsedLogs, waiting_for_input: waitingForInput, prompt: pendingPrompt, }); } catch (error) { console.error('[API] GET /api/executions/:id error:', error); res.status(500).json({ error: 'Internal server error' }); } }); // Delete worker (admin only) app.delete('/api/workers/:id', authenticateSSO, async (req, res) => { try { if (!req.user.isAdmin) { return res.status(403).json({ error: 'Admin access required' }); } await pool.query('DELETE FROM workers WHERE id = ?', [req.params.id]); res.json({ success: true }); broadcast({ type: 'worker_deleted', worker_id: req.params.id }); } catch (error) { res.status(500).json({ error: 'Internal server error' }); } }); // Send direct command to specific worker app.post('/api/workers/:id/command', authenticateSSO, requireJSON, async (req, res) => { try { const { command } = req.body; if (!command || typeof command !== 'string' || !command.trim()) { return res.status(400).json({ error: 'command is required' }); } const executionId = crypto.randomUUID(); const workerId = req.params.id; // Create execution record in database await pool.query( 'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)', [executionId, null, 'running', req.user.username, JSON.stringify([{ step: 'quick_command', action: 'command_sent', worker_id: workerId, command: command, timestamp: new Date().toISOString() }])] ); // Send command via WebSocket to specific worker const commandMessage = { type: 'execute_command', execution_id: executionId, command: command, worker_id: workerId, timeout: QUICK_CMD_TIMEOUT_MS }; const workerWs = workers.get(workerId); if (!workerWs || workerWs.readyState !== WebSocket.OPEN) { return res.status(400).json({ error: 'Worker not connected' }); } workerWs.send(JSON.stringify(commandMessage)); console.log(`Command sent to worker ${workerId}: ${command}`); broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null }); res.json({ success: true, execution_id: executionId }); } catch (error) { res.status(500).json({ error: 'Internal server error' }); } }); // Start server const PORT = process.env.PORT || 8080; const HOST = process.env.HOST || '0.0.0.0'; initDatabase().then(() => { server.listen(PORT, HOST, () => { console.log(`PULSE Server running on http://${HOST}:${PORT}`); console.log(`Connected to MariaDB at ${process.env.DB_HOST}`); console.log(`Authentication: Authelia SSO`); console.log(`Worker API Key configured: ${process.env.WORKER_API_KEY ? 'Yes' : 'No'}`); }); }).catch(err => { console.error('Failed to start server:', err); process.exit(1); });