diff --git a/package-lock.json b/package-lock.json index 4ca1c48..8818f17 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,8 +9,10 @@ "version": "1.0.0", "license": "ISC", "dependencies": { + "cron-parser": "^5.5.0", "dotenv": "^17.2.3", "express": "^5.1.0", + "express-rate-limit": "^8.3.1", "mysql2": "^3.15.3", "ws": "^8.18.3" } @@ -139,6 +141,17 @@ "node": ">=6.6.0" } }, + "node_modules/cron-parser": { + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-5.5.0.tgz", + "integrity": "sha512-oML4lKUXxizYswqmxuOCpgFS8BNUJpIu6k/2HVHyaL8Ynnf3wdf9tkns0yRdJLSIjkJ+b0DXHMZEHGpMwjnPww==", + "dependencies": { + "luxon": "^3.7.1" + }, + "engines": { + "node": ">=18" + } + }, "node_modules/debug": { "version": "4.4.3", "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz", @@ -302,6 +315,23 @@ "url": "https://opencollective.com/express" } }, + "node_modules/express-rate-limit": { + "version": "8.3.1", + "resolved": "https://registry.npmjs.org/express-rate-limit/-/express-rate-limit-8.3.1.tgz", + "integrity": "sha512-D1dKN+cmyPWuvB+G2SREQDzPY1agpBIcTa9sJxOPMCNeH3gwzhqJRDWCXW3gg0y//+LQ/8j52JbMROWyrKdMdw==", + "dependencies": { + "ip-address": "10.1.0" + }, + "engines": { + "node": ">= 16" + }, + "funding": { + "url": "https://github.com/sponsors/express-rate-limit" + }, + "peerDependencies": { + "express": ">= 4.11" + } + }, "node_modules/finalhandler": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/finalhandler/-/finalhandler-2.1.0.tgz", @@ -470,6 +500,14 @@ "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", "license": "ISC" }, + "node_modules/ip-address": { + "version": "10.1.0", + "resolved": "https://registry.npmjs.org/ip-address/-/ip-address-10.1.0.tgz", + "integrity": "sha512-XXADHxXmvT9+CRxhXg56LJovE+bmWnEWB78LB83VZTprKTmaC5QfruXocxzTZ2Kl0DNwKuBdlIhjL8LeY8Sf8Q==", + "engines": { + "node": ">= 12" + } + }, "node_modules/ipaddr.js": { 
"version": "1.9.1", "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz", @@ -521,6 +559,14 @@ "url": "https://github.com/sponsors/wellwelwel" } }, + "node_modules/luxon": { + "version": "3.7.2", + "resolved": "https://registry.npmjs.org/luxon/-/luxon-3.7.2.tgz", + "integrity": "sha512-vtEhXh/gNjI9Yg1u4jX/0YVPMvxzHuGgCm6tC5kZyb08yjGWGnqAjGJvcXbqQR2P3MyMEFnRbpcdFS6PBcLqew==", + "engines": { + "node": ">=12" + } + }, "node_modules/math-intrinsics": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/math-intrinsics/-/math-intrinsics-1.1.0.tgz", diff --git a/package.json b/package.json index 02f0bcf..a9bf370 100644 --- a/package.json +++ b/package.json @@ -10,8 +10,10 @@ "license": "ISC", "description": "", "dependencies": { + "cron-parser": "^5.5.0", "dotenv": "^17.2.3", "express": "^5.1.0", + "express-rate-limit": "^8.3.1", "mysql2": "^3.15.3", "ws": "^8.18.3" } diff --git a/public/index.html b/public/index.html index 3d1e1c6..e29577f 100644 --- a/public/index.html +++ b/public/index.html @@ -975,6 +975,8 @@ } ] } + + @@ -1022,6 +1024,10 @@

▶ RUN WORKFLOW

Fill in required parameters

+
+ + +
@@ -1871,17 +1879,18 @@ } else { const name = document.querySelector(`[onclick="executeWorkflow('${workflowId}')"]`) ?.closest('.workflow-item')?.querySelector('.workflow-name')?.textContent || 'this workflow'; - if (!confirm(`Execute: ${name}?`)) return; - await startExecution(workflowId, {}); + const choice = confirm(`Execute: ${name}?\n\nClick OK to run normally, or Cancel to abort.\n(Use the workflow's Run button with dry-run checkbox for a dry run.)`); + if (!choice) return; + await startExecution(workflowId, {}, false); } } - async function startExecution(workflowId, params) { + async function startExecution(workflowId, params, dryRun = false) { try { const response = await fetch('/api/executions', { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ workflow_id: workflowId, params }) + body: JSON.stringify({ workflow_id: workflowId, params, dry_run: dryRun }) }); if (response.ok) { switchTab('executions'); @@ -1919,6 +1928,7 @@ function closeParamModal() { document.getElementById('paramModal').style.display = 'none'; + document.getElementById('paramDryRun').checked = false; _pendingExecWorkflowId = null; } @@ -1937,8 +1947,10 @@ } if (val) params[p.name] = val; } + const dryRun = document.getElementById('paramDryRun').checked; + const wfId = _pendingExecWorkflowId; closeParamModal(); - await startExecution(_pendingExecWorkflowId, params); + await startExecution(wfId, params, dryRun); } async function viewExecution(executionId) { @@ -2446,6 +2458,7 @@ document.getElementById('editWorkflowName').value = wf.name; document.getElementById('editWorkflowDescription').value = wf.description || ''; document.getElementById('editWorkflowDefinition').value = JSON.stringify(wf.definition, null, 2); + document.getElementById('editWorkflowWebhookUrl').value = wf.webhook_url || ''; document.getElementById('editWorkflowError').style.display = 'none'; document.getElementById('editWorkflowModal').classList.add('show'); } catch (error) { 
@@ -2459,6 +2472,7 @@ const name = document.getElementById('editWorkflowName').value.trim(); const description = document.getElementById('editWorkflowDescription').value.trim(); const definitionText = document.getElementById('editWorkflowDefinition').value; + const webhook_url = document.getElementById('editWorkflowWebhookUrl').value.trim() || null; const errorEl = document.getElementById('editWorkflowError'); if (!name) { @@ -2481,7 +2495,7 @@ const response = await fetch(`/api/workflows/${id}`, { method: 'PUT', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ name, description, definition }) + body: JSON.stringify({ name, description, definition, webhook_url }) }); if (response.ok) { @@ -2507,12 +2521,13 @@ const name = document.getElementById('workflowName').value; const description = document.getElementById('workflowDescription').value; const definitionText = document.getElementById('workflowDefinition').value; - + const webhook_url = document.getElementById('workflowWebhookUrl').value.trim() || null; + if (!name || !definitionText) { alert('Name and definition are required'); return; } - + let definition; try { definition = JSON.parse(definitionText); @@ -2525,7 +2540,7 @@ const response = await fetch('/api/workflows', { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ name, description, definition }) + body: JSON.stringify({ name, description, definition, webhook_url }) }); if (response.ok) { diff --git a/server.js b/server.js index 4ce01ac..eaa2efc 100644 --- a/server.js +++ b/server.js @@ -4,6 +4,8 @@ const WebSocket = require('ws'); const mysql = require('mysql2/promise'); const crypto = require('crypto'); const vm = require('vm'); +const rateLimit = require('express-rate-limit'); +const cronParser = require('cron-parser'); require('dotenv').config(); const app = express(); @@ -14,6 +16,23 @@ const wss = new WebSocket.Server({ server }); app.use(express.json()); 
app.use(express.static('public')); +// Rate limiting +const apiLimiter = rateLimit({ + windowMs: 15 * 60 * 1000, // 15 minutes + max: 300, + standardHeaders: true, + legacyHeaders: false, + message: { error: 'Too many requests, please try again later' } +}); +const executionLimiter = rateLimit({ + windowMs: 60 * 1000, // 1 minute + max: 20, + standardHeaders: true, + legacyHeaders: false, + message: { error: 'Too many execution requests, please slow down' } +}); +app.use('/api/', apiLimiter); + // Database pool const pool = mysql.createPool({ host: process.env.DB_HOST, @@ -63,12 +82,17 @@ async function initDatabase() { name VARCHAR(255) NOT NULL, description TEXT, definition JSON NOT NULL, + webhook_url VARCHAR(500) NULL, created_by VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_name (name) ) `); + // Add webhook_url to existing workflows table (MySQL has no ADD COLUMN IF NOT EXISTS; duplicate-column error is swallowed below) + await connection.query(` + ALTER TABLE workflows ADD COLUMN webhook_url VARCHAR(500) NULL + `).catch(() => {}); await connection.query(` CREATE TABLE IF NOT EXISTS executions ( @@ -241,17 +265,17 @@ function calculateNextRun(scheduleType, scheduleValue) { if (isNaN(hours) || isNaN(minutes)) throw new Error(`Invalid daily time format: ${scheduleValue}`); const next = new Date(now); next.setHours(hours, minutes, 0, 0); - - // If time has passed today, schedule for tomorrow - if (next <= now) { - next.setDate(next.getDate() + 1); - } + if (next <= now) next.setDate(next.getDate() + 1); return next; } else if (scheduleType === 'hourly') { // Every N hours const hours = parseInt(scheduleValue); if (isNaN(hours) || hours <= 0) throw new Error(`Invalid hourly value: ${scheduleValue}`); return new Date(now.getTime() + hours * 3600000); + } else if (scheduleType === 'cron') { + // Full cron expression e.g. 
"0 2 * * 0" (Sundays at 2am) + const interval = cronParser.CronExpressionParser.parse(scheduleValue, { currentDate: now }); + return interval.next().toDate(); } throw new Error(`Unknown schedule type: ${scheduleType}`); @@ -471,11 +495,54 @@ async function updateExecutionStatus(executionId, status) { }); console.log(`[Workflow] Execution ${executionId} status updated to: ${status}`); + + // Fire webhook if configured on the workflow and execution reached a terminal state + if (status === 'completed' || status === 'failed') { + fireWebhook(executionId, status).catch(err => { + console.error(`[Webhook] Delivery error for execution ${executionId}:`, err.message); + }); + } } catch (error) { console.error(`[Workflow] Error updating execution status:`, error); } } +// Fire optional webhook on workflow completion/failure (NOTE(review): webhook_url is user-supplied and fetched server-side — SSRF risk; consider an allowlist) +async function fireWebhook(executionId, status) { + const [rows] = await pool.query( + `SELECT e.id, e.workflow_id, e.started_by, e.started_at, e.completed_at, + w.webhook_url, w.name as workflow_name + FROM executions e + LEFT JOIN workflows w ON e.workflow_id = w.id + WHERE e.id = ?`, + [executionId] + ); + if (!rows.length || !rows[0].webhook_url) return; + + const exec = rows[0]; + const payload = { + execution_id: executionId, + workflow_id: exec.workflow_id, + workflow_name: exec.workflow_name, + status, + started_by: exec.started_by, + started_at: exec.started_at, + completed_at: exec.completed_at, + timestamp: new Date().toISOString() + }; + + const url = new URL(exec.webhook_url); + const body = JSON.stringify(payload); + const options = { + method: 'POST', + headers: { 'Content-Type': 'application/json', 'User-Agent': 'PULSE-Webhook/1.0' } + }; + + // Use native fetch (Node 18+) + const response = await fetch(exec.webhook_url, { ...options, body, signal: AbortSignal.timeout(10000) }); + console.log(`[Webhook] ${url.hostname} responded ${response.status} for execution ${executionId}`); +} + // Authelia SSO Middleware async function authenticateSSO(req, 
res, next) { // Check for Authelia headers @@ -576,11 +643,15 @@ const _executionState = new Map(); // executionId → { params, state } const _executionPrompts = new Map(); // executionId → resolve fn const _commandResolvers = new Map(); // commandId → resolve fn (event-driven result delivery) -async function executeWorkflowSteps(executionId, workflowId, definition, username, params = {}) { +async function executeWorkflowSteps(executionId, workflowId, definition, username, params = {}, dryRun = false) { _executionState.set(executionId, { params, state: {} }); + // Global execution timeout + const maxMinutes = parseInt(process.env.EXECUTION_MAX_MINUTES) || 60; + const executionDeadline = Date.now() + maxMinutes * 60000; + try { - console.log(`[Workflow] Starting execution ${executionId} for workflow ${workflowId}`); + console.log(`[Workflow] Starting execution ${executionId} for workflow ${workflowId}${dryRun ? ' [DRY RUN]' : ''}`); const steps = definition.steps || []; @@ -591,6 +662,17 @@ async function executeWorkflowSteps(executionId, workflowId, definition, usernam let currentIndex = 0; while (currentIndex < steps.length) { + // Enforce global execution timeout + if (Date.now() > executionDeadline) { + await addExecutionLog(executionId, { + action: 'execution_timeout', + message: `Execution exceeded maximum runtime of ${maxMinutes} minutes`, + timestamp: new Date().toISOString() + }); + await updateExecutionStatus(executionId, 'failed'); + return; + } + const step = steps[currentIndex]; const execState = _executionState.get(executionId); const stepLabel = step.name || step.id || `Step ${currentIndex + 1}`; @@ -608,10 +690,19 @@ async function executeWorkflowSteps(executionId, workflowId, definition, usernam if (step.type === 'execute') { const condOk = !step.condition || evalCondition(step.condition, execState.state, execState.params); if (condOk) { - const success = await executeCommandStep(executionId, step, currentIndex + 1, params); - if (!success) { - await 
updateExecutionStatus(executionId, 'failed'); - return; + if (dryRun) { + await addExecutionLog(executionId, { + step: currentIndex + 1, step_name: stepLabel, action: 'dry_run_skipped', + command: step.command, targets: step.targets || ['all'], + message: '[DRY RUN] Command not executed', + timestamp: new Date().toISOString() + }); + } else { + const success = await executeCommandStep(executionId, step, currentIndex + 1, params); + if (!success) { + await updateExecutionStatus(executionId, 'failed'); + return; + } } } else { await addExecutionLog(executionId, { @@ -862,20 +953,17 @@ app.get('/api/workflows/:id', authenticateSSO, async (req, res) => { app.post('/api/workflows', authenticateSSO, async (req, res) => { try { - const { name, description, definition } = req.body; + const { name, description, definition, webhook_url } = req.body; const id = crypto.randomUUID(); console.log('[Workflow] Creating workflow:', name); - console.log('[Workflow] Definition:', JSON.stringify(definition, null, 2)); await pool.query( - 'INSERT INTO workflows (id, name, description, definition, created_by) VALUES (?, ?, ?, ?, ?)', - [id, name, description, JSON.stringify(definition), req.user.username] + 'INSERT INTO workflows (id, name, description, definition, webhook_url, created_by) VALUES (?, ?, ?, ?, ?, ?)', + [id, name, description, JSON.stringify(definition), webhook_url || null, req.user.username] ); - console.log('[Workflow] Successfully inserted workflow:', id); - - res.json({ id, name, description, definition }); + res.json({ id, name, description, definition, webhook_url: webhook_url || null }); console.log('[Workflow] Broadcasting workflow_created'); broadcast({ type: 'workflow_created', workflow_id: id }); @@ -937,9 +1025,9 @@ app.post('/api/workers/heartbeat', async (req, res) => { } }); -app.post('/api/executions', authenticateSSO, async (req, res) => { +app.post('/api/executions', executionLimiter, authenticateSSO, async (req, res) => { try { - const { workflow_id, 
params = {} } = req.body; + const { workflow_id, params = {}, dry_run = false } = req.body; const id = crypto.randomUUID(); // Get workflow definition @@ -968,14 +1056,18 @@ app.post('/api/executions', authenticateSSO, async (req, res) => { [id, workflow_id, 'running', req.user.username, JSON.stringify(initLogs)] ); - broadcast({ type: 'execution_started', execution_id: id, workflow_id }); + if (dry_run) { + broadcast({ type: 'execution_started', execution_id: id, workflow_id, dry_run: true }); + } else { + broadcast({ type: 'execution_started', execution_id: id, workflow_id }); + } // Start workflow execution asynchronously - executeWorkflowSteps(id, workflow_id, definition, req.user.username, params).catch(err => { + executeWorkflowSteps(id, workflow_id, definition, req.user.username, params, dry_run).catch(err => { console.error(`[Workflow] Execution ${id} failed:`, err); }); - res.json({ id, workflow_id, status: 'running' }); + res.json({ id, workflow_id, status: 'running', dry_run: !!dry_run }); } catch (error) { res.status(500).json({ error: error.message }); } @@ -983,28 +1075,66 @@ app.post('/api/executions', authenticateSSO, async (req, res) => { app.get('/api/executions', authenticateSSO, async (req, res) => { try { - const limit = parseInt(req.query.limit) || 50; + const limit = Math.min(parseInt(req.query.limit) || 50, 1000); const offset = parseInt(req.query.offset) || 0; - const hideInternal = req.query.hide_internal === 'true'; - const whereClause = hideInternal - ? 
"WHERE started_by NOT LIKE 'gandalf:%' AND started_by NOT LIKE 'scheduler:%'" - : ''; + const conditions = []; + const queryParams = []; + + // Existing filter + if (req.query.hide_internal === 'true') { + conditions.push("e.started_by NOT LIKE 'gandalf:%' AND e.started_by NOT LIKE 'scheduler:%'"); + } + // New filters + if (req.query.status) { + conditions.push('e.status = ?'); + queryParams.push(req.query.status); + } + if (req.query.workflow_id) { + conditions.push('e.workflow_id = ?'); + queryParams.push(req.query.workflow_id); + } + if (req.query.started_by) { + conditions.push('e.started_by = ?'); + queryParams.push(req.query.started_by); + } + if (req.query.after) { + conditions.push('e.started_at >= ?'); + queryParams.push(new Date(req.query.after)); + } + if (req.query.before) { + conditions.push('e.started_at <= ?'); + queryParams.push(new Date(req.query.before)); + } + if (req.query.search) { + conditions.push('(w.name LIKE ? OR e.started_by LIKE ?)'); + const term = `%${req.query.search}%`; + queryParams.push(term, term); + } + + const whereClause = conditions.length ? `WHERE ${conditions.join(' AND ')}` : ''; const [rows] = await pool.query( - `SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ${whereClause} ORDER BY e.started_at DESC LIMIT ? OFFSET ?`, - [limit, offset] + `SELECT e.id, e.workflow_id, e.status, e.started_by, e.started_at, e.completed_at, + w.name as workflow_name + FROM executions e + LEFT JOIN workflows w ON e.workflow_id = w.id + ${whereClause} + ORDER BY e.started_at DESC LIMIT ? 
OFFSET ?`, + [...queryParams, limit, offset] ); - // Get total count - const [countRows] = await pool.query(`SELECT COUNT(*) as total FROM executions ${whereClause}`); + const [countRows] = await pool.query( + `SELECT COUNT(*) as total FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ${whereClause}`, + queryParams + ); const total = countRows[0].total; res.json({ executions: rows, - total: total, - limit: limit, - offset: offset, + total, + limit, + offset, hasMore: offset + rows.length < total }); } catch (error) { @@ -1093,7 +1223,7 @@ app.put('/api/workflows/:id', authenticateSSO, async (req, res) => { if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin only' }); const { id } = req.params; - const { name, description, definition } = req.body; + const { name, description, definition, webhook_url } = req.body; if (!name || !definition) return res.status(400).json({ error: 'name and definition required' }); @@ -1106,8 +1236,8 @@ app.put('/api/workflows/:id', authenticateSSO, async (req, res) => { } const [result] = await pool.query( - 'UPDATE workflows SET name=?, description=?, definition=?, updated_at=NOW() WHERE id=?', - [name, description || '', JSON.stringify(defObj), id] + 'UPDATE workflows SET name=?, description=?, definition=?, webhook_url=?, updated_at=NOW() WHERE id=?', + [name, description || '', JSON.stringify(defObj), webhook_url || null, id] ); if (result.affectedRows === 0) return res.status(404).json({ error: 'Workflow not found' });