Security hardening, bug fixes, and backend improvements

Security:
- validateWebhookUrl() rejects non-http/https and private/internal IPs
- Validate webhook URL on workflow create and update
- Replace error.message in all HTTP 500 responses with 'Internal server error'
- Add requireJSON middleware (HTTP 415 if Content-Type wrong) on POST/PUT routes
- Reject missing API keys in worker heartbeat (not just wrong ones)
- Validate prompt response against allowed options before accepting

Bugs fixed:
- goto infinite loop protection: stepVisits[] counter, fails at GOTO_MAX_VISITS (100)
- wait step: validate duration (no NaN/negative), cap at WAIT_STEP_MAX_MS (24h)
- _executionPrompts now stores {resolve, options} for option validation
- JSON.parse wrapped in try/catch: workflows/:id, executions/:id, internal/executions/:id, POST /api/executions, scheduler worker_ids
- pong handler uses ws.dbWorkerId (set on connect) not message.worker_id
- Worker disconnect now marks worker offline in DB and broadcasts update
- command validation (type + empty check) on POST /api/workers/:id/command
- workflow_id required check on POST /api/executions

Performance & reliability:
- markStaleWorkersOffline() runs every 60s, marks workers without recent heartbeat offline
- Named constants: PROMPT_TIMEOUT_MS, COMMAND_TIMEOUT_MS, QUICK_CMD_TIMEOUT_MS,
  WEBHOOK_TIMEOUT_MS, WAIT_STEP_MAX_MS, GOTO_MAX_VISITS, WORKER_STALE_MINUTES

New features:
- GET /api/health (auth required): version, uptime, worker counts
- DELETE /api/executions/completed: bulk delete finished executions (admin)
- schedule_value positive-integer validation for interval/hourly schedule types
- Request logging middleware: [HTTP] METHOD /path STATUS Xms

Code quality:
- All console.log on error paths changed to console.error
- Removed stray debug console.log in POST /api/workflows

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-12 10:59:07 -04:00
parent 658caa9f7e
commit ba75a61bd2

323
server.js
View File

@@ -33,6 +33,58 @@ const executionLimiter = rateLimit({
}); });
app.use('/api/', apiLimiter); app.use('/api/', apiLimiter);
// Named constants for timeouts and limits
const PROMPT_TIMEOUT_MS = 60 * 60 * 1000; // 60 min — how long a prompt waits for user input
const COMMAND_TIMEOUT_MS = 120_000; // 2 min — workflow step command timeout
const QUICK_CMD_TIMEOUT_MS = 60_000; // 1 min — quick/direct/gandalf command timeout
const WEBHOOK_TIMEOUT_MS = 10_000; // 10 s — outbound webhook HTTP timeout
const WAIT_STEP_MAX_MS = 24 * 60 * 60_000; // 24 h — cap on workflow wait step
const GOTO_MAX_VISITS = 100; // max times a step may be revisited via goto
const WORKER_STALE_MINUTES = 5; // minutes before a worker is marked offline
const SERVER_VERSION = '1.0.0';
// Request logging middleware
app.use((req, res, next) => {
const start = Date.now();
res.on('finish', () => {
console.log(`[HTTP] ${req.method} ${req.path} ${res.statusCode} ${Date.now() - start}ms`);
});
next();
});
// Content-Type guard for JSON endpoints
function requireJSON(req, res, next) {
const ct = req.headers['content-type'] || '';
if (!ct.includes('application/json')) {
return res.status(415).json({ error: 'Content-Type must be application/json' });
}
next();
}
// Validate and parse a webhook URL; returns { ok, url, reason }
function validateWebhookUrl(raw) {
if (!raw) return { ok: true, url: null };
let url;
try { url = new URL(raw); } catch { return { ok: false, reason: 'Invalid URL format' }; }
if (!['http:', 'https:'].includes(url.protocol)) {
return { ok: false, reason: 'Webhook URL must use http or https' };
}
const host = url.hostname.toLowerCase();
if (
host === 'localhost' ||
host === '::1' ||
/^127\./.test(host) ||
/^10\./.test(host) ||
/^192\.168\./.test(host) ||
/^172\.(1[6-9]|2\d|3[01])\./.test(host) ||
/^169\.254\./.test(host) ||
/^fe80:/i.test(host)
) {
return { ok: false, reason: 'Webhook URL must not point to a private/internal address' };
}
return { ok: true, url };
}
// Database pool // Database pool
const pool = mysql.createPool({ const pool = mysql.createPool({
host: process.env.DB_HOST, host: process.env.DB_HOST,
@@ -198,9 +250,15 @@ async function processScheduledCommands() {
console.log(`[Scheduler] Running scheduled command: ${schedule.name}`); console.log(`[Scheduler] Running scheduled command: ${schedule.name}`);
// Handle both string (raw SQL) and object (auto-parsed by MySQL2 JSON column) // Handle both string (raw SQL) and object (auto-parsed by MySQL2 JSON column)
const workerIds = typeof schedule.worker_ids === 'string' let workerIds;
? JSON.parse(schedule.worker_ids) try {
: schedule.worker_ids; workerIds = typeof schedule.worker_ids === 'string'
? JSON.parse(schedule.worker_ids)
: schedule.worker_ids;
} catch {
console.error(`[Scheduler] Invalid worker_ids JSON for "${schedule.name}" — skipping`);
continue;
}
// Execute command on each worker // Execute command on each worker
for (const workerId of workerIds) { for (const workerId of workerIds) {
@@ -226,7 +284,7 @@ async function processScheduledCommands() {
execution_id: executionId, execution_id: executionId,
command: schedule.command, command: schedule.command,
worker_id: workerId, worker_id: workerId,
timeout: 300000 // 5 minute timeout for scheduled commands timeout: 5 * 60_000 // SCHEDULED_CMD_TIMEOUT
})); }));
broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null }); broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
@@ -286,6 +344,25 @@ setInterval(processScheduledCommands, 60 * 1000);
// Initial run on startup // Initial run on startup
setTimeout(processScheduledCommands, 5000); setTimeout(processScheduledCommands, 5000);
// Mark workers offline when their heartbeat goes stale
async function markStaleWorkersOffline() {
try {
const [stale] = await pool.query(
`SELECT id FROM workers WHERE status = 'online'
AND last_heartbeat < DATE_SUB(NOW(), INTERVAL ? MINUTE)`,
[WORKER_STALE_MINUTES]
);
for (const w of stale) {
await pool.query(`UPDATE workers SET status='offline' WHERE id=?`, [w.id]);
broadcast({ type: 'worker_update', worker_id: w.id, status: 'offline' });
console.log(`[Worker] Marked stale worker ${w.id} offline`);
}
} catch (error) {
console.error('[Worker] Stale check error:', error);
}
}
setInterval(markStaleWorkersOffline, 60_000);
// WebSocket connections // WebSocket connections
const browserClients = new Set(); // Browser UI connections const browserClients = new Set(); // Browser UI connections
const workerClients = new Set(); // Worker agent connections const workerClients = new Set(); // Worker agent connections
@@ -431,12 +508,14 @@ wss.on('connection', (ws) => {
} }
if (message.type === 'pong') { if (message.type === 'pong') {
// Handle worker pong response // Use the DB worker ID stored on connect; fall back to message payload
const { worker_id } = message; const dbId = ws.dbWorkerId || message.worker_id;
await pool.query( if (dbId) {
`UPDATE workers SET last_heartbeat=NOW() WHERE id=?`, await pool.query(
[worker_id] `UPDATE workers SET last_heartbeat=NOW() WHERE id=?`,
); [dbId]
);
}
} }
} catch (error) { } catch (error) {
@@ -447,7 +526,6 @@ wss.on('connection', (ws) => {
ws.on('close', () => { ws.on('close', () => {
browserClients.delete(ws); browserClients.delete(ws);
workerClients.delete(ws); workerClients.delete(ws);
// Remove worker from workers map when disconnected (both runtime and db IDs)
if (ws.workerId) { if (ws.workerId) {
workers.delete(ws.workerId); workers.delete(ws.workerId);
console.log(`Worker ${ws.workerId} (runtime ID) disconnected`); console.log(`Worker ${ws.workerId} (runtime ID) disconnected`);
@@ -455,6 +533,10 @@ wss.on('connection', (ws) => {
if (ws.dbWorkerId) { if (ws.dbWorkerId) {
workers.delete(ws.dbWorkerId); workers.delete(ws.dbWorkerId);
console.log(`Worker ${ws.dbWorkerId} (database ID) disconnected`); console.log(`Worker ${ws.dbWorkerId} (database ID) disconnected`);
// Mark worker offline in DB
pool.query(`UPDATE workers SET status='offline' WHERE id=?`, [ws.dbWorkerId])
.then(() => broadcast({ type: 'worker_update', worker_id: ws.dbWorkerId, status: 'offline' }))
.catch(err => console.error('[Worker] Failed to mark worker offline:', err));
} }
}); });
}); });
@@ -520,6 +602,12 @@ async function fireWebhook(executionId, status) {
if (!rows.length || !rows[0].webhook_url) return; if (!rows.length || !rows[0].webhook_url) return;
const exec = rows[0]; const exec = rows[0];
const { ok, url, reason } = validateWebhookUrl(exec.webhook_url);
if (!ok) {
console.warn(`[Webhook] Skipping invalid stored URL for execution ${executionId}: ${reason}`);
return;
}
const payload = { const payload = {
execution_id: executionId, execution_id: executionId,
workflow_id: exec.workflow_id, workflow_id: exec.workflow_id,
@@ -531,15 +619,12 @@ async function fireWebhook(executionId, status) {
timestamp: new Date().toISOString() timestamp: new Date().toISOString()
}; };
const url = new URL(exec.webhook_url); const response = await fetch(exec.webhook_url, {
const body = JSON.stringify(payload);
const options = {
method: 'POST', method: 'POST',
headers: { 'Content-Type': 'application/json', 'User-Agent': 'PULSE-Webhook/1.0' } headers: { 'Content-Type': 'application/json', 'User-Agent': 'PULSE-Webhook/1.0' },
}; body: JSON.stringify(payload),
signal: AbortSignal.timeout(WEBHOOK_TIMEOUT_MS)
// Use native fetch (Node 18+) });
const response = await fetch(exec.webhook_url, { ...options, body, signal: AbortSignal.timeout(10000) });
console.log(`[Webhook] ${url.hostname} responded ${response.status} for execution ${executionId}`); console.log(`[Webhook] ${url.hostname} responded ${response.status} for execution ${executionId}`);
} }
@@ -660,8 +745,20 @@ async function executeWorkflowSteps(executionId, workflowId, definition, usernam
steps.forEach((step, i) => { if (step.id) stepIdMap.set(step.id, i); }); steps.forEach((step, i) => { if (step.id) stepIdMap.set(step.id, i); });
let currentIndex = 0; let currentIndex = 0;
const stepVisits = new Array(steps.length).fill(0);
while (currentIndex < steps.length) { while (currentIndex < steps.length) {
// Detect goto infinite loops
stepVisits[currentIndex] = (stepVisits[currentIndex] || 0) + 1;
if (stepVisits[currentIndex] > GOTO_MAX_VISITS) {
await addExecutionLog(executionId, {
action: 'workflow_error',
error: `Infinite loop detected at step ${currentIndex + 1} (visited ${stepVisits[currentIndex]} times)`,
timestamp: new Date().toISOString()
});
await updateExecutionStatus(executionId, 'failed');
return;
}
// Enforce global execution timeout // Enforce global execution timeout
if (Date.now() > executionDeadline) { if (Date.now() > executionDeadline) {
await addExecutionLog(executionId, { await addExecutionLog(executionId, {
@@ -723,7 +820,9 @@ async function executeWorkflowSteps(executionId, workflowId, definition, usernam
} }
} else if (step.type === 'wait') { } else if (step.type === 'wait') {
const ms = (step.duration || 5) * 1000; let ms = parseFloat(step.duration || 5) * 1000;
if (isNaN(ms) || ms < 0) ms = 5000;
if (ms > WAIT_STEP_MAX_MS) ms = WAIT_STEP_MAX_MS;
await addExecutionLog(executionId, { await addExecutionLog(executionId, {
step: currentIndex + 1, action: 'waiting', duration_ms: ms, step: currentIndex + 1, action: 'waiting', duration_ms: ms,
timestamp: new Date().toISOString() timestamp: new Date().toISOString()
@@ -795,12 +894,15 @@ async function executePromptStep(executionId, step, stepNumber) {
_executionPrompts.delete(executionId); _executionPrompts.delete(executionId);
console.warn(`[Workflow] Prompt timed out for execution ${executionId}`); console.warn(`[Workflow] Prompt timed out for execution ${executionId}`);
resolve(null); resolve(null);
}, 60 * 60 * 1000); // 60 minute timeout }, PROMPT_TIMEOUT_MS);
_executionPrompts.set(executionId, (response) => { _executionPrompts.set(executionId, {
clearTimeout(timer); resolve: (response) => {
_executionPrompts.delete(executionId); clearTimeout(timer);
resolve(response); _executionPrompts.delete(executionId);
resolve(response);
},
options
}); });
}); });
} }
@@ -880,11 +982,11 @@ async function executeCommandStep(executionId, step, stepNumber, params = {}) {
command_id: commandId, command_id: commandId,
command: command, command: command,
worker_id: workerId, worker_id: workerId,
timeout: 120000 // 2 minute timeout timeout: COMMAND_TIMEOUT_MS
})); }));
// Wait for command result (with timeout) // Wait for command result (with timeout)
const result = await waitForCommandResult(executionId, commandId, 120000); const result = await waitForCommandResult(executionId, commandId, COMMAND_TIMEOUT_MS);
results.push(result); results.push(result);
if (!result.success) { if (!result.success) {
@@ -936,7 +1038,8 @@ app.get('/api/workflows', authenticateSSO, async (req, res) => {
const [rows] = await pool.query('SELECT * FROM workflows ORDER BY created_at DESC'); const [rows] = await pool.query('SELECT * FROM workflows ORDER BY created_at DESC');
res.json(rows); res.json(rows);
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); console.error('[API] GET /api/workflows error:', error);
res.status(500).json({ error: 'Internal server error' });
} }
}); });
@@ -945,18 +1048,24 @@ app.get('/api/workflows/:id', authenticateSSO, async (req, res) => {
const [rows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [req.params.id]); const [rows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [req.params.id]);
if (rows.length === 0) return res.status(404).json({ error: 'Not found' }); if (rows.length === 0) return res.status(404).json({ error: 'Not found' });
const wf = rows[0]; const wf = rows[0];
res.json({ ...wf, definition: JSON.parse(wf.definition || '{}') }); let definition = {};
try { definition = JSON.parse(wf.definition || '{}'); } catch { /* corrupt definition — return empty */ }
res.json({ ...wf, definition });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); console.error('[API] GET /api/workflows/:id error:', error);
res.status(500).json({ error: 'Internal server error' });
} }
}); });
app.post('/api/workflows', authenticateSSO, async (req, res) => { app.post('/api/workflows', authenticateSSO, requireJSON, async (req, res) => {
try { try {
const { name, description, definition, webhook_url } = req.body; const { name, description, definition, webhook_url } = req.body;
const id = crypto.randomUUID(); if (!name || !definition) return res.status(400).json({ error: 'name and definition are required' });
console.log('[Workflow] Creating workflow:', name); const webhookCheck = validateWebhookUrl(webhook_url);
if (!webhookCheck.ok) return res.status(400).json({ error: webhookCheck.reason });
const id = crypto.randomUUID();
await pool.query( await pool.query(
'INSERT INTO workflows (id, name, description, definition, webhook_url, created_by) VALUES (?, ?, ?, ?, ?, ?)', 'INSERT INTO workflows (id, name, description, definition, webhook_url, created_by) VALUES (?, ?, ?, ?, ?, ?)',
@@ -964,13 +1073,10 @@ app.post('/api/workflows', authenticateSSO, async (req, res) => {
); );
res.json({ id, name, description, definition, webhook_url: webhook_url || null }); res.json({ id, name, description, definition, webhook_url: webhook_url || null });
console.log('[Workflow] Broadcasting workflow_created');
broadcast({ type: 'workflow_created', workflow_id: id }); broadcast({ type: 'workflow_created', workflow_id: id });
console.log('[Workflow] Broadcast complete');
} catch (error) { } catch (error) {
console.error('[Workflow] Error creating workflow:', error); console.error('[Workflow] Error creating workflow:', error);
res.status(500).json({ error: error.message }); res.status(500).json({ error: 'Internal server error' });
} }
}); });
@@ -985,7 +1091,7 @@ app.delete('/api/workflows/:id', authenticateSSO, async (req, res) => {
res.json({ success: true }); res.json({ success: true });
broadcast({ type: 'workflow_deleted', workflow_id: req.params.id }); broadcast({ type: 'workflow_deleted', workflow_id: req.params.id });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); res.status(500).json({ error: 'Internal server error' });
} }
}); });
@@ -994,7 +1100,8 @@ app.get('/api/workers', authenticateSSO, async (req, res) => {
const [rows] = await pool.query('SELECT id, name, status, last_heartbeat, metadata FROM workers ORDER BY name'); const [rows] = await pool.query('SELECT id, name, status, last_heartbeat, metadata FROM workers ORDER BY name');
res.json(rows); res.json(rows);
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); console.error('[API] GET /api/workers error:', error);
res.status(500).json({ error: 'Internal server error' });
} }
}); });
@@ -1003,8 +1110,8 @@ app.post('/api/workers/heartbeat', async (req, res) => {
const { worker_id, name, metadata } = req.body; const { worker_id, name, metadata } = req.body;
const apiKey = req.headers['x-api-key']; const apiKey = req.headers['x-api-key'];
// Verify API key // Verify API key — reject missing or wrong keys
if (apiKey !== process.env.WORKER_API_KEY) { if (!apiKey || apiKey !== process.env.WORKER_API_KEY) {
return res.status(401).json({ error: 'Invalid API key' }); return res.status(401).json({ error: 'Invalid API key' });
} }
@@ -1021,13 +1128,15 @@ app.post('/api/workers/heartbeat', async (req, res) => {
broadcast({ type: 'worker_update', worker_id, status: 'online' }); broadcast({ type: 'worker_update', worker_id, status: 'online' });
res.json({ success: true }); res.json({ success: true });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); console.error('[Worker] Heartbeat error:', error);
res.status(500).json({ error: 'Internal server error' });
} }
}); });
app.post('/api/executions', executionLimiter, authenticateSSO, async (req, res) => { app.post('/api/executions', executionLimiter, authenticateSSO, requireJSON, async (req, res) => {
try { try {
const { workflow_id, params = {}, dry_run = false } = req.body; const { workflow_id, params = {}, dry_run = false } = req.body;
if (!workflow_id) return res.status(400).json({ error: 'workflow_id is required' });
const id = crypto.randomUUID(); const id = crypto.randomUUID();
// Get workflow definition // Get workflow definition
@@ -1037,7 +1146,12 @@ app.post('/api/executions', executionLimiter, authenticateSSO, async (req, res)
} }
const workflow = workflows[0]; const workflow = workflows[0];
const definition = typeof workflow.definition === 'string' ? JSON.parse(workflow.definition) : workflow.definition; let definition;
try {
definition = typeof workflow.definition === 'string' ? JSON.parse(workflow.definition) : workflow.definition;
} catch {
return res.status(500).json({ error: 'Workflow definition is corrupt' });
}
// Validate required params // Validate required params
const paramDefs = definition.params || []; const paramDefs = definition.params || [];
@@ -1069,7 +1183,8 @@ app.post('/api/executions', executionLimiter, authenticateSSO, async (req, res)
res.json({ id, workflow_id, status: 'running', dry_run: !!dry_run }); res.json({ id, workflow_id, status: 'running', dry_run: !!dry_run });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); console.error('[Execution] Error starting execution:', error);
res.status(500).json({ error: 'Internal server error' });
} }
}); });
@@ -1138,7 +1253,21 @@ app.get('/api/executions', authenticateSSO, async (req, res) => {
hasMore: offset + rows.length < total hasMore: offset + rows.length < total
}); });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); res.status(500).json({ error: 'Internal server error' });
}
});
app.delete('/api/executions/completed', authenticateSSO, async (req, res) => {
try {
if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin access required' });
const [result] = await pool.query(
"DELETE FROM executions WHERE status IN ('completed', 'failed')"
);
broadcast({ type: 'executions_bulk_deleted' });
res.json({ success: true, deleted: result.affectedRows });
} catch (error) {
console.error('[Execution] Bulk delete error:', error);
res.status(500).json({ error: 'Internal server error' });
} }
}); });
@@ -1149,7 +1278,8 @@ app.delete('/api/executions/:id', authenticateSSO, async (req, res) => {
broadcast({ type: 'execution_deleted', execution_id: req.params.id }); broadcast({ type: 'execution_deleted', execution_id: req.params.id });
res.json({ success: true }); res.json({ success: true });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); console.error('[Execution] Error deleting execution:', error);
res.status(500).json({ error: 'Internal server error' });
} }
}); });
@@ -1180,14 +1310,14 @@ app.post('/api/executions/:id/abort', authenticateSSO, async (req, res) => {
// Unblock any pending prompt so the thread can exit // Unblock any pending prompt so the thread can exit
const pending = _executionPrompts.get(executionId); const pending = _executionPrompts.get(executionId);
if (pending) pending(null); if (pending) pending.resolve(null);
console.log(`[Execution] Execution ${executionId} aborted by ${req.user.username}`); console.log(`[Execution] Execution ${executionId} aborted by ${req.user.username}`);
res.json({ success: true }); res.json({ success: true });
} catch (error) { } catch (error) {
console.error('[Execution] Error aborting execution:', error); console.error('[Execution] Error aborting execution:', error);
res.status(500).json({ error: error.message }); res.status(500).json({ error: 'Internal server error' });
} }
}); });
@@ -1197,11 +1327,20 @@ app.post('/api/executions/:id/respond', authenticateSSO, async (req, res) => {
const { id } = req.params; const { id } = req.params;
const { response } = req.body; const { response } = req.body;
if (!response) return res.status(400).json({ error: 'response is required' }); if (!response || typeof response !== 'string') {
return res.status(400).json({ error: 'response is required' });
}
const pending = _executionPrompts.get(id); const pending = _executionPrompts.get(id);
if (!pending) return res.status(404).json({ error: 'No pending prompt for this execution' }); if (!pending) return res.status(404).json({ error: 'No pending prompt for this execution' });
// Validate response is one of the allowed options
if (pending.options && !pending.options.includes(response)) {
return res.status(400).json({
error: `Invalid response. Allowed values: ${pending.options.join(', ')}`
});
}
await addExecutionLog(id, { await addExecutionLog(id, {
action: 'prompt_response', response, action: 'prompt_response', response,
responded_by: req.user.username, responded_by: req.user.username,
@@ -1210,15 +1349,15 @@ app.post('/api/executions/:id/respond', authenticateSSO, async (req, res) => {
broadcast({ type: 'prompt_response', execution_id: id, response }); broadcast({ type: 'prompt_response', execution_id: id, response });
pending(response); pending.resolve(response);
res.json({ success: true }); res.json({ success: true });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); res.status(500).json({ error: 'Internal server error' });
} }
}); });
// Edit a workflow definition (admin only) // Edit a workflow definition (admin only)
app.put('/api/workflows/:id', authenticateSSO, async (req, res) => { app.put('/api/workflows/:id', authenticateSSO, requireJSON, async (req, res) => {
try { try {
if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin only' }); if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin only' });
@@ -1227,11 +1366,13 @@ app.put('/api/workflows/:id', authenticateSSO, async (req, res) => {
if (!name || !definition) return res.status(400).json({ error: 'name and definition required' }); if (!name || !definition) return res.status(400).json({ error: 'name and definition required' });
// Validate definition is parseable JSON const webhookCheck = validateWebhookUrl(webhook_url);
if (!webhookCheck.ok) return res.status(400).json({ error: webhookCheck.reason });
let defObj; let defObj;
try { try {
defObj = typeof definition === 'string' ? JSON.parse(definition) : definition; defObj = typeof definition === 'string' ? JSON.parse(definition) : definition;
} catch (e) { } catch {
return res.status(400).json({ error: 'Invalid JSON in definition' }); return res.status(400).json({ error: 'Invalid JSON in definition' });
} }
@@ -1245,7 +1386,8 @@ app.put('/api/workflows/:id', authenticateSSO, async (req, res) => {
broadcast({ type: 'workflow_updated', workflow_id: id }); broadcast({ type: 'workflow_updated', workflow_id: id });
res.json({ success: true }); res.json({ success: true });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); console.error('[Workflow] Error updating workflow:', error);
res.status(500).json({ error: 'Internal server error' });
} }
}); });
@@ -1257,11 +1399,11 @@ app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => {
); );
res.json(schedules); res.json(schedules);
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); res.status(500).json({ error: 'Internal server error' });
} }
}); });
app.post('/api/scheduled-commands', authenticateSSO, async (req, res) => { app.post('/api/scheduled-commands', authenticateSSO, requireJSON, async (req, res) => {
try { try {
if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin access required' }); if (!req.user.isAdmin) return res.status(403).json({ error: 'Admin access required' });
const { name, command, worker_ids, schedule_type, schedule_value } = req.body; const { name, command, worker_ids, schedule_type, schedule_value } = req.body;
@@ -1270,6 +1412,14 @@ app.post('/api/scheduled-commands', authenticateSSO, async (req, res) => {
return res.status(400).json({ error: 'Missing required fields' }); return res.status(400).json({ error: 'Missing required fields' });
} }
// Validate schedule_value for numeric types
if (schedule_type === 'interval' || schedule_type === 'hourly') {
const n = parseInt(schedule_value, 10);
if (!Number.isInteger(n) || n <= 0) {
return res.status(400).json({ error: `schedule_value for type "${schedule_type}" must be a positive integer` });
}
}
const id = crypto.randomUUID(); const id = crypto.randomUUID();
const nextRun = calculateNextRun(schedule_type, schedule_value); const nextRun = calculateNextRun(schedule_type, schedule_value);
@@ -1282,7 +1432,7 @@ app.post('/api/scheduled-commands', authenticateSSO, async (req, res) => {
res.json({ success: true, id }); res.json({ success: true, id });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); res.status(500).json({ error: 'Internal server error' });
} }
}); });
@@ -1301,7 +1451,7 @@ app.put('/api/scheduled-commands/:id/toggle', authenticateSSO, async (req, res)
res.json({ success: true, enabled: newEnabled }); res.json({ success: true, enabled: newEnabled });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); res.status(500).json({ error: 'Internal server error' });
} }
}); });
@@ -1312,7 +1462,7 @@ app.delete('/api/scheduled-commands/:id', authenticateSSO, async (req, res) => {
await pool.query('DELETE FROM scheduled_commands WHERE id = ?', [id]); await pool.query('DELETE FROM scheduled_commands WHERE id = ?', [id]);
res.json({ success: true }); res.json({ success: true });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); res.status(500).json({ error: 'Internal server error' });
} }
}); });
@@ -1347,12 +1497,12 @@ app.post('/api/internal/command', authenticateGandalf, async (req, res) => {
execution_id: executionId, execution_id: executionId,
command: command, command: command,
worker_id: worker_id, worker_id: worker_id,
timeout: 60000 timeout: QUICK_CMD_TIMEOUT_MS
})); }));
res.json({ execution_id: executionId }); res.json({ execution_id: executionId });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); res.status(500).json({ error: 'Internal server error' });
} }
}); });
@@ -1363,12 +1513,32 @@ app.get('/api/internal/executions/:id', authenticateGandalf, async (req, res) =>
return res.status(404).json({ error: 'Not found' }); return res.status(404).json({ error: 'Not found' });
} }
const execution = rows[0]; const execution = rows[0];
let logs = [];
try { logs = JSON.parse(execution.logs || '[]'); } catch { logs = []; }
res.json({ ...execution, logs });
} catch (error) {
console.error('[API] GET /api/internal/executions/:id error:', error);
res.status(500).json({ error: 'Internal server error' });
}
});
// Detailed health + stats (auth required)
app.get('/api/health', authenticateSSO, async (req, res) => {
try {
await pool.query('SELECT 1');
const [workerRows] = await pool.query('SELECT status, COUNT(*) as count FROM workers GROUP BY status');
const workerCounts = Object.fromEntries(workerRows.map(r => [r.status, Number(r.count)]));
res.json({ res.json({
...execution, status: 'ok',
logs: JSON.parse(execution.logs || '[]') version: SERVER_VERSION,
uptime_seconds: Math.floor(process.uptime()),
timestamp: new Date().toISOString(),
workers: { online: workerCounts.online || 0, offline: workerCounts.offline || 0 },
database: 'connected'
}); });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); console.error('[Health] /api/health error:', error);
res.status(500).json({ status: 'error', timestamp: new Date().toISOString() });
} }
}); });
@@ -1387,7 +1557,7 @@ app.get('/health', async (req, res) => {
status: 'error', status: 'error',
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
database: 'disconnected', database: 'disconnected',
error: error.message error: 'Internal server error'
}); });
} }
}); });
@@ -1401,7 +1571,10 @@ app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
} }
const execution = rows[0]; const execution = rows[0];
const parsedLogs = typeof execution.logs === 'string' ? JSON.parse(execution.logs || '[]') : (execution.logs || []); let parsedLogs = [];
try {
parsedLogs = typeof execution.logs === 'string' ? JSON.parse(execution.logs || '[]') : (execution.logs || []);
} catch { parsedLogs = []; }
const waitingForInput = _executionPrompts.has(req.params.id); const waitingForInput = _executionPrompts.has(req.params.id);
let pendingPrompt = null; let pendingPrompt = null;
if (waitingForInput) { if (waitingForInput) {
@@ -1420,7 +1593,8 @@ app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
prompt: pendingPrompt, prompt: pendingPrompt,
}); });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); console.error('[API] GET /api/executions/:id error:', error);
res.status(500).json({ error: 'Internal server error' });
} }
}); });
@@ -1435,14 +1609,17 @@ app.delete('/api/workers/:id', authenticateSSO, async (req, res) => {
res.json({ success: true }); res.json({ success: true });
broadcast({ type: 'worker_deleted', worker_id: req.params.id }); broadcast({ type: 'worker_deleted', worker_id: req.params.id });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); res.status(500).json({ error: 'Internal server error' });
} }
}); });
// Send direct command to specific worker // Send direct command to specific worker
app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => { app.post('/api/workers/:id/command', authenticateSSO, requireJSON, async (req, res) => {
try { try {
const { command } = req.body; const { command } = req.body;
if (!command || typeof command !== 'string' || !command.trim()) {
return res.status(400).json({ error: 'command is required' });
}
const executionId = crypto.randomUUID(); const executionId = crypto.randomUUID();
const workerId = req.params.id; const workerId = req.params.id;
@@ -1464,7 +1641,7 @@ app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
execution_id: executionId, execution_id: executionId,
command: command, command: command,
worker_id: workerId, worker_id: workerId,
timeout: 60000 timeout: QUICK_CMD_TIMEOUT_MS
}; };
const workerWs = workers.get(workerId); const workerWs = workers.get(workerId);
@@ -1478,7 +1655,7 @@ app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null }); broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
res.json({ success: true, execution_id: executionId }); res.json({ success: true, execution_id: executionId });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); res.status(500).json({ error: 'Internal server error' });
} }
}); });