From 58c172e1311cbdfd1565cc6bbb1bed8bd086e9fb Mon Sep 17 00:00:00 2001 From: Jared Vititoe Date: Wed, 11 Mar 2026 22:53:25 -0400 Subject: [PATCH] Security hardening, bug fixes, and performance improvements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Security fixes: - Replace new Function() condition eval with vm.runInNewContext() (RCE fix) - Add admin checks to DELETE executions, all scheduled-commands endpoints - Remove api_key from GET /api/workers response (was exposed to all employees) - Separate browserClients/workerClients sets; broadcast() now sends to browsers only - Add worker WebSocket auth: reject if api_key provided but invalid - Fix XSS: escapeHtml() on step_name, duration, worker_id, user info, execution_id Bug fixes: - Replace DB-polling waitForCommandResult with event-driven _commandResolvers Map - Replace non-atomic addExecutionLog with JSON_ARRAY_APPEND (fixes concurrent write race) - Add stale execution recovery on startup: running→failed with log entry - Fix calculateNextRun returning null for unknown types (now throws) - Fix scheduler overlap: skip if previous execution still running - Fix JSON double-parse on worker_ids column - Fix switchTab() bare event.target reference - Fix selectedExecutions Array→Set (O(1) lookups, fixes performance regression) - Fix param modal event listener leak (delegated handler, removes before re-adding) - Add ws.onerror handler (was silently swallowing WebSocket errors) - Move misplaced routes to before server.listen() Performance/cleanup: - DB connection pool 10→50 - EXECUTION_RETENTION_DAYS default 1→30 (matches docs) - Remove unused packages: bcryptjs, body-parser, cors, js-yaml, jsonwebtoken - Remove generateUUID() wrapper, use crypto.randomUUID() directly - Remove dead example workflow constants - Add ESC key handler to close modals - Fix clearCompletedExecutions limit 1000→9999 - Add security notice to README.md Co-Authored-By: Claude Sonnet 4.6 --- README.md | 2 + package-lock.json | 186 ----------------------------- package.json | 5 - public/index.html | 74 +++++++----- server.js | 296 +++++++++++++++++++--------------------------- 5 files changed, 169 insertions(+), 394 deletions(-) diff --git a/README.md b/README.md index 56dca34..516c011 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ A distributed workflow orchestration platform for managing and executing complex multi-step operations across server clusters through a retro terminal-themed web interface. +> **Security Notice:** This repository is hosted on Gitea and is version-controlled. **Never commit secrets, credentials, passwords, API keys, or any sensitive information to this repo.** All sensitive configuration belongs exclusively in `.env` files which are listed in `.gitignore` and must never be committed. This includes database passwords, worker API keys, webhook secrets, and internal IP details. + ## Overview PULSE is a centralized workflow execution system designed to orchestrate operations across distributed infrastructure. It provides a powerful web-based interface with a vintage CRT terminal aesthetic for defining, managing, and executing workflows that can span multiple servers, require human interaction, and perform complex automation tasks at scale. diff --git a/package-lock.json b/package-lock.json index 36e8fec..4ca1c48 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,13 +9,8 @@ "version": "1.0.0", "license": "ISC", "dependencies": { - "bcryptjs": "^3.0.3", - "body-parser": "^2.2.1", - "cors": "^2.8.5", "dotenv": "^17.2.3", "express": "^5.1.0", - "js-yaml": "^4.1.1", - "jsonwebtoken": "^9.0.2", "mysql2": "^3.15.3", "ws": "^8.18.3" } @@ -33,12 +28,6 @@ "node": ">= 0.6" } }, - "node_modules/argparse": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz", - "integrity": "sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q==", - "license": "Python-2.0" - }, "node_modules/aws-ssl-profiles": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/aws-ssl-profiles/-/aws-ssl-profiles-1.1.2.tgz", @@ -48,15 +37,6 @@ "node": ">= 6.0.0" } }, - "node_modules/bcryptjs": { - "version": "3.0.3", - "resolved": "https://registry.npmjs.org/bcryptjs/-/bcryptjs-3.0.3.tgz", - "integrity": "sha512-GlF5wPWnSa/X5LKM1o0wz0suXIINz1iHRLvTS+sLyi7XPbe5ycmYI3DlZqVGZZtDgl4DmasFg7gOB3JYbphV5g==", - "license": "BSD-3-Clause", - "bin": { - "bcrypt": "bin/bcrypt" - } - }, "node_modules/body-parser": { "version": "2.2.1", "resolved": "https://registry.npmjs.org/body-parser/-/body-parser-2.2.1.tgz", @@ -81,12 +61,6 @@ "url": "https://opencollective.com/express" } }, - "node_modules/buffer-equal-constant-time": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/buffer-equal-constant-time/-/buffer-equal-constant-time-1.0.1.tgz", - "integrity": "sha512-zRpUiDwd/xk6ADqPMATG8vc9VPrkck7T07OIx0gnjmJAnHnTVXNQG3vfvWNuiZIkwu9KrKdA1iJKfsfTVxE6NA==", - "license": "BSD-3-Clause" - }, "node_modules/bytes": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.2.tgz", @@ -165,19 +139,6 @@ "node": ">=6.6.0" } }, - "node_modules/cors": { - "version": "2.8.5", - "resolved": "https://registry.npmjs.org/cors/-/cors-2.8.5.tgz", - "integrity": "sha512-KIHbLJqu73RGr/hnbrO9uBeixNGuvSQjul/jdFvS/KFSIH1hWVd1ng7zOHx+YrEfInLG7q4n6GHQ9cDtxv/P6g==", - "license": "MIT", - "dependencies": { - "object-assign": "^4", - "vary": "^1" - }, - "engines": { - "node": ">= 0.10" - } - }, "node_modules/debug": { "version": "4.4.3", "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz", @@ -239,15 +200,6 @@ "node": ">= 0.4" } }, - "node_modules/ecdsa-sig-formatter": { - "version": "1.0.11", - "resolved": "https://registry.npmjs.org/ecdsa-sig-formatter/-/ecdsa-sig-formatter-1.0.11.tgz", - "integrity": "sha512-nagl3RYrbNv6kQkeJIpt6NJZy8twLB/2vtz6yN9Z4vRKHN4/QZJIEbqohALSgwKdnksuY3k5Addp5lg8sVoVcQ==", - "license": "Apache-2.0", - "dependencies": { - "safe-buffer": "^5.0.1" - } - }, "node_modules/ee-first": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/ee-first/-/ee-first-1.1.1.tgz", @@ -539,103 +491,6 @@ "integrity": "sha512-Ks/IoX00TtClbGQr4TWXemAnktAQvYB7HzcCxDGqEZU6oCmb2INHuOoKxbtR+HFkmYWBKv/dOZtGRiAjDhj92g==", "license": "MIT" }, - "node_modules/js-yaml": { - "version": "4.1.1", - "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.1.tgz", - "integrity": "sha512-qQKT4zQxXl8lLwBtHMWwaTcGfFOZviOJet3Oy/xmGk2gZH677CJM9EvtfdSkgWcATZhj/55JZ0rmy3myCT5lsA==", - "license": "MIT", - "dependencies": { - "argparse": "^2.0.1" - }, - "bin": { - "js-yaml": "bin/js-yaml.js" - } - }, - "node_modules/jsonwebtoken": { - "version": "9.0.2", - "resolved": "https://registry.npmjs.org/jsonwebtoken/-/jsonwebtoken-9.0.2.tgz", - "integrity": "sha512-PRp66vJ865SSqOlgqS8hujT5U4AOgMfhrwYIuIhfKaoSCZcirrmASQr8CX7cUg+RMih+hgznrjp99o+W4pJLHQ==", - "license": "MIT", - "dependencies": { - "jws": "^3.2.2", - "lodash.includes": "^4.3.0", - "lodash.isboolean": "^3.0.3", - "lodash.isinteger": "^4.0.4", - "lodash.isnumber": "^3.0.3", - "lodash.isplainobject": "^4.0.6", - "lodash.isstring": "^4.0.1", - "lodash.once": "^4.0.0", - "ms": "^2.1.1", - "semver": "^7.5.4" - }, - "engines": { - "node": ">=12", - "npm": ">=6" - } - }, - "node_modules/jwa": { - "version": "1.4.2", - "resolved": "https://registry.npmjs.org/jwa/-/jwa-1.4.2.tgz", - "integrity": "sha512-eeH5JO+21J78qMvTIDdBXidBd6nG2kZjg5Ohz/1fpa28Z4CcsWUzJ1ZZyFq/3z3N17aZy+ZuBoHljASbL1WfOw==", - "license": "MIT", - "dependencies": { - "buffer-equal-constant-time": "^1.0.1", - "ecdsa-sig-formatter": "1.0.11", - "safe-buffer": "^5.0.1" - } - }, - "node_modules/jws": { - "version": "3.2.2", - "resolved": "https://registry.npmjs.org/jws/-/jws-3.2.2.tgz", - "integrity": "sha512-YHlZCB6lMTllWDtSPHz/ZXTsi8S00usEV6v1tjq8tOUZzw7DpSDWVXjXDre6ed1w/pd495ODpHZYSdkRTsa0HA==", - "license": "MIT", - "dependencies": { - "jwa": "^1.4.1", - "safe-buffer": "^5.0.1" - } - }, - "node_modules/lodash.includes": { - "version": "4.3.0", - "resolved": "https://registry.npmjs.org/lodash.includes/-/lodash.includes-4.3.0.tgz", - "integrity": "sha512-W3Bx6mdkRTGtlJISOvVD/lbqjTlPPUDTMnlXZFnVwi9NKJ6tiAk6LVdlhZMm17VZisqhKcgzpO5Wz91PCt5b0w==", - "license": "MIT" - }, - "node_modules/lodash.isboolean": { - "version": "3.0.3", - "resolved": "https://registry.npmjs.org/lodash.isboolean/-/lodash.isboolean-3.0.3.tgz", - "integrity": "sha512-Bz5mupy2SVbPHURB98VAcw+aHh4vRV5IPNhILUCsOzRmsTmSQ17jIuqopAentWoehktxGd9e/hbIXq980/1QJg==", - "license": "MIT" - }, - "node_modules/lodash.isinteger": { - "version": "4.0.4", - "resolved": "https://registry.npmjs.org/lodash.isinteger/-/lodash.isinteger-4.0.4.tgz", - "integrity": "sha512-DBwtEWN2caHQ9/imiNeEA5ys1JoRtRfY3d7V9wkqtbycnAmTvRRmbHKDV4a0EYc678/dia0jrte4tjYwVBaZUA==", - "license": "MIT" - }, - "node_modules/lodash.isnumber": { - "version": "3.0.3", - "resolved": "https://registry.npmjs.org/lodash.isnumber/-/lodash.isnumber-3.0.3.tgz", - "integrity": "sha512-QYqzpfwO3/CWf3XP+Z+tkQsfaLL/EnUlXWVkIk5FUPc4sBdTehEqZONuyRt2P67PXAk+NXmTBcc97zw9t1FQrw==", - "license": "MIT" - }, - "node_modules/lodash.isplainobject": { - "version": "4.0.6", - "resolved": "https://registry.npmjs.org/lodash.isplainobject/-/lodash.isplainobject-4.0.6.tgz", - "integrity": "sha512-oSXzaWypCMHkPC3NvBEaPHf0KsA5mvPrOPgQWDsbg8n7orZ290M0BmC/jgRZ4vcJ6DTAhjrsSYgdsW/F+MFOBA==", - "license": "MIT" - }, - "node_modules/lodash.isstring": { - "version": "4.0.1", - "resolved": "https://registry.npmjs.org/lodash.isstring/-/lodash.isstring-4.0.1.tgz", - "integrity": "sha512-0wJxfxH1wgO3GrbuP+dTTk7op+6L41QCXbGINEmD+ny/G/eCqGzxyCsh7159S+mgDDcoarnBw6PC1PS5+wUGgw==", - "license": "MIT" - }, - "node_modules/lodash.once": { - "version": "4.1.1", - "resolved": "https://registry.npmjs.org/lodash.once/-/lodash.once-4.1.1.tgz", - "integrity": "sha512-Sb487aTOCr9drQVL8pIxOzVhafOjZN9UU54hiN8PU3uAiSV7lx1yYNpbNmex2PK6dSJoNTSJUUswT651yww3Mg==", - "license": "MIT" - }, "node_modules/long": { "version": "5.3.2", "resolved": "https://registry.npmjs.org/long/-/long-5.3.2.tgz", @@ -768,15 +623,6 @@ "node": ">= 0.6" } }, - "node_modules/object-assign": { - "version": "4.1.1", - "resolved": "https://registry.npmjs.org/object-assign/-/object-assign-4.1.1.tgz", - "integrity": "sha512-rJgTQnkUnH1sFw8yT6VSU3zD3sWmu6sZhIseY8VX+GRu3P6F7Fu+JNDoXfklElbLJSnc3FUQHVe4cU5hj+BcUg==", - "license": "MIT", - "engines": { - "node": ">=0.10.0" - } - }, "node_modules/object-inspect": { "version": "1.13.4", "resolved": "https://registry.npmjs.org/object-inspect/-/object-inspect-1.13.4.tgz", @@ -897,44 +743,12 @@ "node": ">= 18" } }, - "node_modules/safe-buffer": { - "version": "5.2.1", - "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", - "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==", - "funding": [ - { - "type": "github", - "url": "https://github.com/sponsors/feross" - }, - { - "type": "patreon", - "url": "https://www.patreon.com/feross" - }, - { - "type": "consulting", - "url": "https://feross.org/support" - } - ], - "license": "MIT" - }, "node_modules/safer-buffer": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz", "integrity": "sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==", "license": "MIT" }, - "node_modules/semver": { - "version": "7.7.3", - "resolved": "https://registry.npmjs.org/semver/-/semver-7.7.3.tgz", - "integrity": "sha512-SdsKMrI9TdgjdweUSR9MweHA4EJ8YxHn8DFaDisvhVlUOe4BF1tLD7GAj0lIqWVl+dPb/rExr0Btby5loQm20Q==", - "license": "ISC", - "bin": { - "semver": "bin/semver.js" - }, - "engines": { - "node": ">=10" - } - }, "node_modules/send": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/send/-/send-1.2.0.tgz", diff --git a/package.json b/package.json index fa6884d..02f0bcf 100644 --- a/package.json +++ b/package.json @@ -10,13 +10,8 @@ "license": "ISC", "description": "", "dependencies": { - "bcryptjs": "^3.0.3", - "body-parser": "^2.2.1", - "cors": "^2.8.5", "dotenv": "^17.2.3", "express": "^5.1.0", - "js-yaml": "^4.1.1", - "jsonwebtoken": "^9.0.2", "mysql2": "^3.15.3", "ws": "^8.18.3" } diff --git a/public/index.html b/public/index.html index 5518eab..3d1e1c6 100644 --- a/public/index.html +++ b/public/index.html @@ -1095,7 +1095,7 @@ let workers = []; let allExecutions = []; // Store all loaded executions for filtering let compareMode = false; - let selectedExecutions = []; + let selectedExecutions = new Set(); async function loadUser() { try { @@ -1104,10 +1104,10 @@ currentUser = await response.json(); document.getElementById('userInfo').innerHTML = ` -
${currentUser.name}
- -
${currentUser.groups.map(g => - `${g}` +
${escapeHtml(currentUser.name || '')}
+ +
${(currentUser.groups || []).map(g => + `${escapeHtml(g)}` ).join('')}
`; return true; @@ -1593,7 +1593,7 @@ const fullHtml = filtered.length === 0 ? '
No executions match your filters
' : filtered.map(e => { - const isSelected = selectedExecutions.includes(e.id); + const isSelected = selectedExecutions.has(e.id); const clickHandler = compareMode ? `toggleExecutionSelection('${e.id}')` : `viewExecution('${e.id}')`; const selectedStyle = isSelected ? 'background: rgba(255, 176, 0, 0.2); border-left-width: 5px; border-left-color: var(--terminal-amber);' : ''; @@ -1625,7 +1625,7 @@ function toggleCompareMode() { compareMode = !compareMode; - selectedExecutions = []; + selectedExecutions = new Set(); const btn = document.getElementById('compareModeBtn'); const compareBtn = document.getElementById('compareBtn'); @@ -1649,31 +1649,29 @@ } function toggleExecutionSelection(executionId) { - const index = selectedExecutions.indexOf(executionId); - - if (index > -1) { - selectedExecutions.splice(index, 1); + if (selectedExecutions.has(executionId)) { + selectedExecutions.delete(executionId); } else { - if (selectedExecutions.length >= 5) { + if (selectedExecutions.size >= 5) { showTerminalNotification('Maximum 5 executions can be compared', 'error'); return; } - selectedExecutions.push(executionId); + selectedExecutions.add(executionId); } renderFilteredExecutions(); // Update compare button text const compareBtn = document.getElementById('compareBtn'); - if (selectedExecutions.length >= 2) { - compareBtn.textContent = `[ ⚖️ Compare Selected (${selectedExecutions.length}) ]`; + if (selectedExecutions.size >= 2) { + compareBtn.textContent = `[ ⚖️ Compare Selected (${selectedExecutions.size}) ]`; } else { compareBtn.textContent = '[ ⚖️ Compare Selected ]'; } } async function compareSelectedExecutions() { - if (selectedExecutions.length < 2) { + if (selectedExecutions.size < 2) { showTerminalNotification('Please select at least 2 executions to compare', 'error'); return; } @@ -1837,7 +1835,7 @@ if (!confirm('Delete all completed and failed executions?')) return; try { - const response = await fetch('/api/executions?limit=1000'); // Get all executions + const response = await fetch('/api/executions?limit=9999'); // Get all executions const data = await response.json(); const executions = data.executions || data; // Handle new pagination format @@ -1911,10 +1909,10 @@ ${p.required ? 'required' : ''}>
`).join(''); document.getElementById('paramModal').style.display = 'flex'; - // Focus first input; Enter key submits - form.querySelectorAll('input').forEach(inp => { - inp.addEventListener('keydown', e => { if (e.key === 'Enter') submitParamForm(); }); - }); + // Focus first input; Enter key submits — use single delegated listener to avoid duplicates + if (form._keydownHandler) form.removeEventListener('keydown', form._keydownHandler); + form._keydownHandler = (e) => { if (e.key === 'Enter' && e.target.tagName === 'INPUT') submitParamForm(); }; + form.addEventListener('keydown', form._keydownHandler); const first = form.querySelector('input'); if (first) setTimeout(() => first.focus(), 50); } @@ -2052,7 +2050,7 @@ return `
[${timestamp}]
-
▶️ Step ${log.step}: ${log.step_name}
+
▶️ Step ${log.step}: ${escapeHtml(log.step_name || '')}
`; } @@ -2061,7 +2059,7 @@ return `
[${timestamp}]
-
✓ Step ${log.step} Completed: ${log.step_name}
+
✓ Step ${log.step} Completed: ${escapeHtml(log.step_name || '')}
`; } @@ -2070,7 +2068,7 @@ return `
[${timestamp}]
-
⏳ Waiting ${log.duration} seconds...
+
⏳ Waiting ${escapeHtml(String(log.duration || 0))} seconds...
`; } @@ -2093,7 +2091,7 @@
[${timestamp}]
⚠️ Worker Offline
-
Worker ID: ${log.worker_id}
+
Worker ID: ${escapeHtml(log.worker_id || '')}
`; @@ -2583,7 +2581,7 @@
✓ Command sent successfully!
- Execution ID: ${data.execution_id} + Execution ID: ${escapeHtml(String(data.execution_id || ''))}
Check the Executions tab to see the results @@ -2697,9 +2695,12 @@ function switchTab(tabName) { document.querySelectorAll('.tab').forEach(t => t.classList.remove('active')); document.querySelectorAll('.tab-content').forEach(c => c.classList.remove('active')); - - event.target.classList.add('active'); - document.getElementById(tabName).classList.add('active'); + + // Find the button by its onclick attribute rather than relying on bare `event` + const tabBtn = document.querySelector(`.tab[onclick*="'${tabName}'"]`); + if (tabBtn) tabBtn.classList.add('active'); + const tabContent = document.getElementById(tabName); + if (tabContent) tabContent.classList.add('active'); } async function refreshData() { @@ -2901,8 +2902,23 @@ console.log('WebSocket closed, reconnecting...'); setTimeout(connectWebSocket, 5000); }; + + ws.onerror = (error) => { + console.error('[WebSocket] Connection error:', error); + }; } + // Close any open modal on ESC key + document.addEventListener('keydown', (e) => { + if (e.key === 'Escape') { + document.querySelectorAll('.modal').forEach(modal => { + if (modal.style.display && modal.style.display !== 'none') { + modal.style.display = 'none'; + } + }); + } + }); + // Initialize loadUser().then((success) => { if (success) { diff --git a/server.js b/server.js index dfc7405..4ce01ac 100644 --- a/server.js +++ b/server.js @@ -3,6 +3,7 @@ const http = require('http'); const WebSocket = require('ws'); const mysql = require('mysql2/promise'); const crypto = require('crypto'); +const vm = require('vm'); require('dotenv').config(); const app = express(); @@ -13,11 +14,6 @@ const wss = new WebSocket.Server({ server }); app.use(express.json()); app.use(express.static('public')); -// UUID generator -function generateUUID() { - return crypto.randomUUID(); -} - // Database pool const pool = mysql.createPool({ host: process.env.DB_HOST, @@ -26,7 +22,7 @@ const pool = mysql.createPool({ password: process.env.DB_PASSWORD, database: process.env.DB_NAME, waitForConnections: true, - connectionLimit: 10, + connectionLimit: 50, queueLimit: 0 }); @@ -107,6 +103,22 @@ async function initDatabase() { ) `); + // Recover stale executions from a previous server crash + const [staleExecs] = await connection.query("SELECT id FROM executions WHERE status = 'running'"); + if (staleExecs.length > 0) { + for (const exec of staleExecs) { + await connection.query( + "UPDATE executions SET status = 'failed', completed_at = NOW() WHERE id = ?", + [exec.id] + ); + await connection.query( + "UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', CAST(? AS JSON)) WHERE id = ?", + [JSON.stringify({ action: 'server_restart_recovery', message: 'Execution marked failed due to server restart', timestamp: new Date().toISOString() }), exec.id] + ); + } + console.log(`[Recovery] Marked ${staleExecs.length} stale execution(s) as failed`); + } + console.log('Database tables initialized successfully'); } catch (error) { console.error('Database initialization error:', error); @@ -119,7 +131,7 @@ async function initDatabase() { // Auto-cleanup old executions (runs hourly) async function cleanupOldExecutions() { try { - const retentionDays = parseInt(process.env.EXECUTION_RETENTION_DAYS) || 1; + const retentionDays = parseInt(process.env.EXECUTION_RETENTION_DAYS) || 30; const [result] = await pool.query( `DELETE FROM executions WHERE status IN ('completed', 'failed') @@ -149,15 +161,28 @@ async function processScheduledCommands() { ); for (const schedule of schedules) { + // Prevent overlapping execution — skip if a previous run is still active + const [runningExecs] = await pool.query( + "SELECT id FROM executions WHERE started_by = ? AND status = 'running'", + [`scheduler:${schedule.name}`] + ); + if (runningExecs.length > 0) { + console.log(`[Scheduler] Skipping "${schedule.name}" - previous execution still running`); + continue; + } + console.log(`[Scheduler] Running scheduled command: ${schedule.name}`); - const workerIds = JSON.parse(schedule.worker_ids); + // Handle both string (raw SQL) and object (auto-parsed by MySQL2 JSON column) + const workerIds = typeof schedule.worker_ids === 'string' + ? JSON.parse(schedule.worker_ids) + : schedule.worker_ids; // Execute command on each worker for (const workerId of workerIds) { const workerWs = workers.get(workerId); if (workerWs && workerWs.readyState === WebSocket.OPEN) { - const executionId = generateUUID(); + const executionId = crypto.randomUUID(); // Create execution record await pool.query( @@ -185,7 +210,13 @@ async function processScheduledCommands() { } // Update last_run and calculate next_run - const nextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value); + let nextRun; + try { + nextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value); + } catch (err) { + console.error(`[Scheduler] Invalid schedule config for "${schedule.name}": ${err.message}`); + continue; + } await pool.query( 'UPDATE scheduled_commands SET last_run = NOW(), next_run = ? WHERE id = ?', [nextRun, schedule.id] @@ -202,10 +233,12 @@ function calculateNextRun(scheduleType, scheduleValue) { if (scheduleType === 'interval') { // Interval in minutes const minutes = parseInt(scheduleValue); + if (isNaN(minutes) || minutes <= 0) throw new Error(`Invalid interval value: ${scheduleValue}`); return new Date(now.getTime() + minutes * 60000); } else if (scheduleType === 'daily') { // Daily at HH:MM const [hours, minutes] = scheduleValue.split(':').map(Number); + if (isNaN(hours) || isNaN(minutes)) throw new Error(`Invalid daily time format: ${scheduleValue}`); const next = new Date(now); next.setHours(hours, minutes, 0, 0); @@ -217,10 +250,11 @@ function calculateNextRun(scheduleType, scheduleValue) { } else if (scheduleType === 'hourly') { // Every N hours const hours = parseInt(scheduleValue); + if (isNaN(hours) || hours <= 0) throw new Error(`Invalid hourly value: ${scheduleValue}`); return new Date(now.getTime() + hours * 3600000); } - return null; + throw new Error(`Unknown schedule type: ${scheduleType}`); } // Run scheduler every minute @@ -229,11 +263,13 @@ setInterval(processScheduledCommands, 60 * 1000); setTimeout(processScheduledCommands, 5000); // WebSocket connections -const clients = new Set(); +const browserClients = new Set(); // Browser UI connections +const workerClients = new Set(); // Worker agent connections const workers = new Map(); // Map worker_id -> WebSocket connection wss.on('connection', (ws) => { - clients.add(ws); + // Default to browser client until worker_connect identifies it as a worker + browserClients.add(ws); // Handle incoming messages from workers ws.on('message', async (data) => { @@ -269,7 +305,15 @@ wss.on('connection', (ws) => { await updateExecutionStatus(execution_id, finalStatus); } - // Broadcast result to all connected clients + // Resolve any pending event-driven command promise (eliminates DB polling) + if (command_id) { + const resolver = _commandResolvers.get(command_id); + if (resolver) { + resolver({ success, stdout, stderr }); + } + } + + // Broadcast result to browser clients only broadcast({ type: 'command_result', execution_id: execution_id, @@ -314,8 +358,16 @@ wss.on('connection', (ws) => { } if (message.type === 'worker_connect') { - // Handle worker connection - const { worker_id, worker_name } = message; + // Authenticate worker — reject if api_key is provided but wrong + const { worker_id, worker_name, api_key } = message; + if (api_key && api_key !== process.env.WORKER_API_KEY) { + console.warn(`[Security] Worker connection rejected: invalid API key from "${worker_name}"`); + ws.close(4001, 'Unauthorized'); + return; + } + // Move from browser set to worker set + browserClients.delete(ws); + workerClients.add(ws); console.log(`Worker connected: ${worker_name} (${worker_id})`); // Find the database worker ID by name @@ -369,7 +421,8 @@ wss.on('connection', (ws) => { }); ws.on('close', () => { - clients.delete(ws); + browserClients.delete(ws); + workerClients.delete(ws); // Remove worker from workers map when disconnected (both runtime and db IDs) if (ws.workerId) { workers.delete(ws.workerId); @@ -382,29 +435,22 @@ wss.on('connection', (ws) => { }); }); -// Broadcast to all connected clients +// Broadcast to browser clients only (NOT worker agents) function broadcast(data) { - clients.forEach(client => { + browserClients.forEach(client => { if (client.readyState === WebSocket.OPEN) { client.send(JSON.stringify(data)); } }); } -// Helper function to add log entry to execution +// Helper function to add log entry to execution (atomic — no read-modify-write race condition) async function addExecutionLog(executionId, logEntry) { try { - const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]); - - if (execution.length > 0) { - const logs = typeof execution[0].logs === 'string' ? JSON.parse(execution[0].logs) : execution[0].logs; - logs.push(logEntry); - - await pool.query( - 'UPDATE executions SET logs = ? WHERE id = ?', - [JSON.stringify(logs), executionId] - ); - } + await pool.query( + "UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', CAST(? AS JSON)) WHERE id = ?", + [JSON.stringify(logEntry), executionId] + ); } catch (error) { console.error(`[Workflow] Error adding execution log:`, error); } @@ -459,7 +505,7 @@ async function authenticateSSO(req, res, next) { // Store/update user in database try { - const userId = generateUUID(); + const userId = crypto.randomUUID(); await pool.query( `INSERT INTO users (id, username, display_name, email, groups, last_login) VALUES (?, ?, ?, ?, ?, NOW()) @@ -512,10 +558,11 @@ function applyParams(command, params) { } // Evaluate a condition string against execution state and params. +// Uses vm.runInNewContext with a timeout to avoid arbitrary code execution risk. function evalCondition(condition, state, params) { try { - // eslint-disable-next-line no-new-func - return !!new Function('state', 'params', `return !!(${condition})`)(state, params); + const context = vm.createContext({ state, params, promptResponse: state.promptResponse }); + return !!vm.runInNewContext(condition, context, { timeout: 100 }); } catch (e) { return false; } @@ -527,6 +574,7 @@ const _executionState = new Map(); // executionId → { params, state } // Pending prompt resolvers — set when a prompt step is waiting for user input. const _executionPrompts = new Map(); // executionId → resolve fn +const _commandResolvers = new Map(); // commandId → resolve fn (event-driven result delivery) async function executeWorkflowSteps(executionId, workflowId, definition, username, params = {}) { _executionState.set(executionId, { params, state: {} }); @@ -724,7 +772,7 @@ async function executeCommandStep(executionId, step, stepNumber, params = {}) { } // Send command to worker - const commandId = generateUUID(); + const commandId = crypto.randomUUID(); await addExecutionLog(executionId, { step: stepNumber, @@ -769,52 +817,21 @@ async function executeCommandStep(executionId, step, stepNumber, params = {}) { } } +// Wait for a command result using event-driven promise resolution. +// The resolver is stored in _commandResolvers and called immediately when +// command_result arrives via WebSocket — no DB polling required. async function waitForCommandResult(executionId, commandId, timeout) { return new Promise((resolve) => { - const startTime = Date.now(); + const timer = setTimeout(() => { + _commandResolvers.delete(commandId); + resolve({ success: false, error: 'Command timeout' }); + }, timeout); - const checkInterval = setInterval(async () => { - try { - // Check if we've received the command result in logs - const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]); - - if (execution.length > 0) { - const logs = typeof execution[0].logs === 'string' ? JSON.parse(execution[0].logs) : execution[0].logs; - // Find the command_sent entry for this commandId, then look for the next command_result after it. - // (Worker doesn't echo command_id back, so we can't match by command_id directly.) - const sentIdx = logs.findIndex(l => l.command_id === commandId && l.action === 'command_sent'); - const resultLog = sentIdx >= 0 - ? logs.slice(sentIdx + 1).find(l => l.action === 'command_result') - : logs.find(l => l.command_id === commandId && l.action === 'command_result'); - - if (resultLog) { - clearInterval(checkInterval); - resolve({ - success: resultLog.success, - stdout: resultLog.stdout, - stderr: resultLog.stderr - }); - return; - } - } - - // Check timeout - if (Date.now() - startTime > timeout) { - clearInterval(checkInterval); - resolve({ - success: false, - error: 'Command timeout' - }); - } - } catch (error) { - console.error('[Workflow] Error checking command result:', error); - clearInterval(checkInterval); - resolve({ - success: false, - error: error.message - }); - } - }, 500); // Check every 500ms + _commandResolvers.set(commandId, (result) => { + clearTimeout(timer); + _commandResolvers.delete(commandId); + resolve(result); + }); }); } @@ -846,7 +863,7 @@ app.get('/api/workflows/:id', authenticateSSO, async (req, res) => { app.post('/api/workflows', authenticateSSO, async (req, res) => { try { const { name, description, definition } = req.body; - const id = generateUUID(); + const id = crypto.randomUUID(); console.log('[Workflow] Creating workflow:', name); console.log('[Workflow] Definition:', JSON.stringify(definition, null, 2)); @@ -886,7 +903,7 @@ app.delete('/api/workflows/:id', authenticateSSO, async (req, res) => { app.get('/api/workers', authenticateSSO, async (req, res) => { try { - const [rows] = await pool.query('SELECT * FROM workers ORDER BY name'); + const [rows] = await pool.query('SELECT id, name, status, last_heartbeat, metadata FROM workers ORDER BY name'); res.json(rows); } catch (error) { res.status(500).json({ error: error.message }); @@ -923,7 +940,7 @@ app.post('/api/workers/heartbeat', async (req, res) => { app.post('/api/executions', authenticateSSO, async (req, res) => { try { const { workflow_id, params = {} } = req.body; - const id = generateUUID(); + const id = crypto.randomUUID(); // Get workflow definition const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflow_id]); @@ -997,6 +1014,7 @@ app.get('/api/executions', authenticateSSO, async (req, res) => { app.delete('/api/executions/:id', authenticateSSO, async (req, res) => { try { + if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin access required' }); await pool.query('DELETE FROM executions WHERE id = ?', [req.params.id]); broadcast({ type: 'execution_deleted', execution_id: req.params.id }); res.json({ success: true }); @@ -1115,13 +1133,14 @@ app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => { app.post('/api/scheduled-commands', authenticateSSO, async (req, res) => { try { + if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin access required' }); const { name, command, worker_ids, schedule_type, schedule_value } = req.body; if (!name || !command || !worker_ids || !schedule_type || !schedule_value) { return res.status(400).json({ error: 'Missing required fields' }); } - const id = generateUUID(); + const id = crypto.randomUUID(); const nextRun = calculateNextRun(schedule_type, schedule_value); await pool.query( @@ -1139,6 +1158,7 @@ app.post('/api/scheduled-commands', authenticateSSO, async (req, res) => { app.put('/api/scheduled-commands/:id/toggle', authenticateSSO, async (req, res) => { try { + if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin access required' }); const { id } = req.params; const [current] = await pool.query('SELECT enabled FROM scheduled_commands WHERE id = ?', [id]); @@ -1157,6 +1177,7 @@ app.put('/api/scheduled-commands/:id/toggle', authenticateSSO, async (req, res) app.delete('/api/scheduled-commands/:id', authenticateSSO, async (req, res) => { try { + if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin access required' }); const { id } = req.params; await pool.query('DELETE FROM scheduled_commands WHERE id = ?', [id]); res.json({ success: true }); @@ -1178,7 +1199,7 @@ app.post('/api/internal/command', authenticateGandalf, async (req, res) => { return res.status(400).json({ error: 'Worker not connected' }); } - const executionId = generateUUID(); + const executionId = crypto.randomUUID(); await pool.query( 'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)', @@ -1241,26 +1262,6 @@ app.get('/health', async (req, res) => { } }); -// Start server -const PORT = process.env.PORT || 8080; -const HOST = process.env.HOST || '0.0.0.0'; - -initDatabase().then(() => { - server.listen(PORT, HOST, () => { - console.log(`PULSE Server running on http://${HOST}:${PORT}`); - console.log(`Connected to MariaDB at ${process.env.DB_HOST}`); - console.log(`Authentication: Authelia SSO`); - console.log(`Worker API Key configured: ${process.env.WORKER_API_KEY ? 'Yes' : 'No'}`); - }); -}).catch(err => { - console.error('Failed to start server:', err); - process.exit(1); -}); - - -// ============================================ -// WORKFLOW EXECUTION ENGINE - // Get execution details with logs app.get('/api/executions/:id', authenticateSSO, async (req, res) => { try { @@ -1270,7 +1271,7 @@ app.get('/api/executions/:id', authenticateSSO, async (req, res) => { } const execution = rows[0]; - const parsedLogs = JSON.parse(execution.logs || '[]'); + const parsedLogs = typeof execution.logs === 'string' ? JSON.parse(execution.logs || '[]') : (execution.logs || []); const waitingForInput = _executionPrompts.has(req.params.id); let pendingPrompt = null; if (waitingForInput) { @@ -1299,7 +1300,7 @@ app.delete('/api/workers/:id', authenticateSSO, async (req, res) => { if (!req.user.isAdmin) { return res.status(403).json({ error: 'Admin access required' }); } - + await pool.query('DELETE FROM workers WHERE id = ?', [req.params.id]); res.json({ success: true }); broadcast({ type: 'worker_deleted', worker_id: req.params.id }); @@ -1308,11 +1309,11 @@ app.delete('/api/workers/:id', authenticateSSO, async (req, res) => { } }); -// Send direct command to specific worker (for testing) +// Send direct command to specific worker app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => { try { const { command } = req.body; - const executionId = generateUUID(); + const executionId = crypto.randomUUID(); const workerId = req.params.id; // Create execution record in database @@ -1351,71 +1352,18 @@ app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => { } }); -// ============================================ -// EXAMPLE WORKFLOW DEFINITIONS -// ============================================ +// Start server +const PORT = process.env.PORT || 8080; +const HOST = process.env.HOST || '0.0.0.0'; -// Example 1: Simple command execution -const simpleWorkflow = { - name: "Update System Packages", - description: "Update all packages on target servers", - steps: [ - { - name: "Update package list", - type: "execute", - targets: ["all"], - command: "apt update" - }, - { - name: "User Approval", - type: "prompt", - message: "Packages updated. Proceed with upgrade?", - options: ["Yes", "No"] - }, - { - name: "Upgrade packages", - type: "execute", - targets: ["all"], - command: "apt upgrade -y", - condition: "promptResponse === 'Yes'" - } - ] -}; - -// Example 2: Complex workflow with conditions -const backupWorkflow = { - name: "Backup and Verify", - description: "Create backup and verify integrity", - steps: [ - { - name: "Create backup", - type: "execute", - targets: ["all"], - command: "tar -czf /tmp/backup-$(date +%Y%m%d).tar.gz /opt/pulse-worker" - }, - { - name: "Wait for backup", - type: "wait", - duration: 5000 - }, - { - name: "Verify backup", - type: "execute", - targets: ["all"], - command: "tar -tzf /tmp/backup-*.tar.gz > /dev/null && echo 'Backup OK' || echo 'Backup FAILED'" - }, - { - name: "Cleanup decision", - type: "prompt", - message: "Backup complete. Delete old backups?", - options: ["Yes", "No", "Cancel"] - }, - { - name: "Cleanup old backups", - type: "execute", - targets: ["all"], - command: "find /tmp -name 'backup-*.tar.gz' -mtime +7 -delete", - condition: "promptResponse === 'Yes'" - } - ] -}; \ No newline at end of file +initDatabase().then(() => { + server.listen(PORT, HOST, () => { + console.log(`PULSE Server running on http://${HOST}:${PORT}`); + console.log(`Connected to MariaDB at ${process.env.DB_HOST}`); + console.log(`Authentication: Authelia SSO`); + console.log(`Worker API Key configured: ${process.env.WORKER_API_KEY ? 'Yes' : 'No'}`); + }); +}).catch(err => { + console.error('Failed to start server:', err); + process.exit(1); +});