Compare commits

...

7 Commits

Author SHA1 Message Date
0990e5b807 Add parse/route step types, wider modal, automated link troubleshooter
- Add 'parse' step type: reads KEY=VALUE lines from last command output into execution state
- Add 'route' step type: evaluates JS conditions list, auto-jumps to first match with no user input
- Support condition field on parse steps (skip when conditional execute was also skipped)
- Widen execution detail modal to min(1100px, 96vw) to eliminate horizontal scrolling
- Show last command output in prompt boxes so user has context before clicking
- Add formatLogEntry support for parse_complete and route_taken log actions
- Apply applyParams() substitution to prompt messages ({{server_name}}, {{iface}}, etc.)
- Fix prompt button onclick using data-opt attribute to avoid JSON double-quote breakage

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 19:29:28 -04:00
2290d52f8b Fix prompt buttons, add command output to prompt steps, add worker to repo
- Fix onclick buttons broken by JSON.stringify double-quotes inside HTML
  attributes — use data-opt attribute + this.dataset.opt instead
- Track last command stdout in execution state when command_result arrives
- executePromptStep: include last command output in log entry and broadcast
  so users can review results alongside the question in the same view
- GET /api/executions/🆔 propagate output field to pending prompt response
- Add .prompt-output CSS class for scrollable terminal-style output block
- Fix MariaDB CAST(? AS JSON) → JSON_EXTRACT(?, '$') (MariaDB 10.11 compat)
- Add worker/worker.js to repo (deployed on pulse-worker-01 / LXC 153)
  Fix: worker was not echoing command_id back in result — resolvers always
  got undefined, causing every workflow step to timeout and fail

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 19:06:02 -04:00
3f6e04d1ab Apply LotusGuild design system convergence (aesthetic_diff.md)
- §5: Section headers now ╠═══ TITLE ═══╣ (was ═══ TITLE ═══)
- §8+§18: Replace inline-style showTerminalNotification() with lt.toast.*
  delegate wrapper; load base.js from /base.js
- §12: Fix --text-muted #008822→#00bb33 (WCAG AA contrast)

base.js symlinked from web_template into public/ so lt.* is available.
showTerminalNotification() is kept as a thin wrapper so all existing
call sites continue to work unchanged.

README: Remove completed pending items (toast, text-muted, position)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-14 21:40:36 -04:00
18a6b642d1 Fix all-workers-offline silent success bug, noisy logging, UX polish
server.js:
- Fix bug: when all targeted workers disconnect before step runs, results[] was empty
  and results.every() returned true vacuously (silent false success). Now tracks sentCount
  and fails with 'no_workers' log if nothing was actually dispatched
- Remove per-message console.log on every WebSocket message (high noise)
- Only log a warning for failed commands (not every success)

index.html:
- loadSchedules() catch now shows error message in scheduleList (was silent)
- abortExecution() shows server's error message from JSON body instead of generic string
  (e.g. "Execution is not running" instead of "Failed to abort execution")

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-13 13:48:35 -04:00
8ff3700601 Fix XSS, add per-step timeout/retry to workflow engine
index.html:
- Escape started_by in execution list, execution cards, and execution detail modal
- Escape schedule name in schedules list

server.js:
- Per-step timeout: step.timeout (seconds) overrides global COMMAND_TIMEOUT_MS (5s-600s range)
- Per-step retry: step.retries (max 5) with step.retryDelayMs (max 30s) re-sends command on failure
  with command_retry log entries showing attempt/max_retries progress

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 17:36:35 -04:00
ba5ba6f899 Security and reliability fixes: XSS, race conditions, logging
- Fix XSS in workflow list: escape name/description/created_by with escapeHtml()
- Fix broadcast() race: snapshot browserClients Set before iterating
- Fix waitForCommandResult: use position-based matching (worker omits command_id in result)
- Fix scheduler duplicate-run race: atomic UPDATE with affectedRows check
- Suppress toast alerts for automated executions (gandalf:/scheduler: prefix)
- Add waiting_for_input + prompt fields to GET /executions/:id
- Add GET /api/workflows/:id endpoint
- Add interactive prompt UI: respondToPrompt(), WebSocket execution_prompt handler
- Add workflow editor modal (admin-only)
- Remove sensitive console.log calls (WS payload, command output)
- Show error messages in list containers on load failure
- evalCondition: log warning instead of silently swallowing errors
- Worker reconnect: clean up stale entries for same dbWorkerId
- Null-check execState before continuing workflow steps

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 17:30:32 -04:00
a596c6075c Fix ESC handler, form reset, and scheduler next-run countdown
- Fix ESC key handler to use .modal.show class selector instead of style.display check
- Reset create workflow form fields when opening the create modal
- Show relative countdown (e.g. "in 5m") alongside next run timestamp in scheduler list

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 11:26:48 -04:00
5 changed files with 565 additions and 105 deletions

View File

@@ -4,6 +4,22 @@ A distributed workflow orchestration platform for managing and executing complex
> **Security Notice:** This repository is hosted on Gitea and is version-controlled. **Never commit secrets, credentials, passwords, API keys, or any sensitive information to this repo.** All sensitive configuration belongs exclusively in `.env` files which are listed in `.gitignore` and must never be committed. This includes database passwords, worker API keys, webhook secrets, and internal IP details.
**Design System**: [web_template](https://code.lotusguild.org/LotusGuild/web_template) — shared CSS, JS, and layout patterns for all LotusGuild apps
## Styling & Layout
PULSE uses the **LotusGuild Terminal Design System**. For all styling, component, and layout documentation see:
- [`web_template/README.md`](https://code.lotusguild.org/LotusGuild/web_template/src/branch/main/README.md) — full component reference, CSS variables, JS API
- [`web_template/base.css`](https://code.lotusguild.org/LotusGuild/web_template/src/branch/main/base.css) — unified CSS (`.lt-*` classes)
- [`web_template/base.js`](https://code.lotusguild.org/LotusGuild/web_template/src/branch/main/base.js) — `window.lt` utilities (toast, modal, WebSocket helpers, fetch)
- [`web_template/aesthetic_diff.md`](https://code.lotusguild.org/LotusGuild/web_template/src/branch/main/aesthetic_diff.md) — cross-app divergence analysis and convergence guide
- [`web_template/node/middleware.js`](https://code.lotusguild.org/LotusGuild/web_template/src/branch/main/node/middleware.js) — Express auth, CSRF, CSP nonce middleware
**Pending convergence items (see aesthetic_diff.md):**
- Extract inline `<style>` from `public/index.html` into `public/style.css` and extend `base.css`
- Use `lt.autoRefresh.start(refreshData, 30000)` instead of raw `setInterval`
## Overview
PULSE is a centralized workflow execution system designed to orchestrate operations across distributed infrastructure. It provides a powerful web-based interface with a vintage CRT terminal aesthetic for defining, managing, and executing workflows that can span multiple servers, require human interaction, and perform complex automation tasks at scale.
@@ -307,12 +323,34 @@ MAX_CONCURRENT_TASKS=5 # Max parallel tasks (default: 5)
PULSE uses MariaDB with the following tables:
- **users**: User accounts from Authelia SSO
- **workers**: Worker node registry with metadata
- **workflows**: Workflow definitions (JSON)
- **executions**: Execution history with logs
| Table | Purpose |
|-------|---------|
| `users` | User accounts synced from Authelia SSO |
| `workers` | Worker node registry with connection metadata |
| `workflows` | Workflow definitions stored as JSON |
| `executions` | Execution history with logs, status, and timestamps |
See [Claude.md](Claude.md) for complete schema details.
### `executions` Table Key Columns
| Column | Description |
|--------|-------------|
| `id` | Auto-increment primary key |
| `worker_id` | Foreign key to workers |
| `command` | The command that was executed |
| `status` | `running`, `completed`, `failed` |
| `output` | Command output / log (JSON or text) |
| `created_at` | Execution start timestamp |
| `completed_at` | Execution end timestamp |
### `workers` Table Key Columns
| Column | Description |
|--------|-------------|
| `id` | Auto-increment primary key |
| `name` | Worker name (from `WORKER_NAME` env) |
| `last_seen` | Last heartbeat timestamp |
| `status` | `online`, `offline` |
| `metadata` | JSON blob of system info |
## Troubleshooting

1
public/base.js Symbolic link
View File

@@ -0,0 +1 @@
/root/code/web_template/base.js

View File

@@ -20,7 +20,7 @@
--bg-terminal-border: #003300;
--text-primary: #00ff41;
--text-secondary: #00cc33;
--text-muted: #008822;
--text-muted: #00bb33;
/* Border & UI */
--border-color: #00ff41;
@@ -271,11 +271,11 @@
text-shadow: var(--glow-amber);
}
.card h3::before {
content: '═══ ';
content: '═══ ';
color: var(--terminal-green);
}
.card h3::after {
content: ' ═══';
content: ' ═══';
color: var(--terminal-green);
}
.status {
@@ -474,9 +474,9 @@
padding: 0;
border: 3px double var(--terminal-green);
border-radius: 0;
max-width: 600px;
width: 90%;
max-height: 85vh;
max-width: min(1100px, 96vw);
width: 96vw;
max-height: 90vh;
overflow-y: auto;
box-shadow: 0 0 30px rgba(0, 255, 65, 0.3);
position: relative;
@@ -574,6 +574,19 @@
font-family: var(--font-mono);
margin-bottom: 14px;
}
.prompt-output {
background: rgba(0, 0, 0, 0.4);
border: 1px solid rgba(0, 255, 65, 0.25);
color: var(--terminal-green);
font-family: var(--font-mono);
font-size: 0.78em;
padding: 10px;
max-height: 280px;
overflow-y: auto;
white-space: pre-wrap;
word-break: break-all;
margin-bottom: 14px;
}
.prompt-opt-btn {
padding: 7px 16px;
margin: 4px 4px 4px 0;
@@ -689,6 +702,28 @@
border-color: #330000;
}
/* parse_complete and route_taken log entries */
.log-parse-table {
display: grid;
grid-template-columns: max-content 1fr;
gap: 2px 12px;
font-size: 0.82em;
margin-top: 6px;
}
.log-parse-key { color: var(--terminal-amber); opacity: .8; }
.log-parse-val { color: var(--terminal-green); word-break: break-all; }
.log-route-label {
color: var(--terminal-cyan);
font-size: 0.9em;
margin-top: 4px;
}
.log-route-goto {
color: var(--terminal-amber);
font-size: 0.82em;
opacity: .75;
margin-top: 2px;
}
.log-entry code {
background: var(--bg-terminal);
padding: 2px 6px;
@@ -803,6 +838,7 @@
animation: slide-in 0.3s ease-out;
}
</style>
<script src="/base.js"></script>
</head>
<body>
<div class="container">
@@ -1254,6 +1290,7 @@
document.getElementById('workerList').innerHTML = fullHtml;
} catch (error) {
console.error('Error loading workers:', error);
document.getElementById('workerList').innerHTML = '<div class="empty" style="color:var(--terminal-red);">⚠ Failed to load workers</div>';
}
}
@@ -1283,9 +1320,9 @@
const def = _workflowRegistry[w.id] || {};
return `
<div class="workflow-item">
<div class="workflow-name">${w.name}${paramBadge(def)}</div>
<div class="workflow-desc">${w.description || 'No description'}</div>
<div class="timestamp">Created by ${w.created_by || 'Unknown'} on ${safeDate(w.created_at)?.toLocaleString() ?? 'N/A'}</div>
<div class="workflow-name">${escapeHtml(w.name)}${paramBadge(def)}</div>
<div class="workflow-desc">${escapeHtml(w.description || 'No description')}</div>
<div class="timestamp">Created by ${escapeHtml(w.created_by || 'Unknown')} on ${safeDate(w.created_at)?.toLocaleString() ?? 'N/A'}</div>
<div style="margin-top: 10px;">
<button onclick="executeWorkflow('${w.id}')">▶️ Execute</button>
${currentUser && currentUser.isAdmin ?
@@ -1298,6 +1335,7 @@
document.getElementById('workflowList').innerHTML = html;
} catch (error) {
console.error('Error loading workflows:', error);
document.getElementById('workflowList').innerHTML = '<div class="empty" style="color:var(--terminal-red);">⚠ Failed to load workflows</div>';
}
}
@@ -1326,14 +1364,22 @@
scheduleDesc = `Cron: ${s.schedule_value}`;
}
const nextRun = safeDate(s.next_run)?.toLocaleString() ?? 'Not scheduled';
const nextRunDate = safeDate(s.next_run);
const nextRunIn = nextRunDate ? (() => {
const secs = Math.round((nextRunDate - Date.now()) / 1000);
if (secs <= 0) return 'now';
if (secs < 60) return `${secs}s`;
if (secs < 3600) return `${Math.round(secs/60)}m`;
return `${Math.round(secs/3600)}h`;
})() : null;
const nextRun = nextRunDate ? `${nextRunDate.toLocaleString()}${nextRunIn ? ` (in ${nextRunIn})` : ''}` : 'Not scheduled';
const lastRun = safeDate(s.last_run)?.toLocaleString() ?? 'Never';
return `
<div class="workflow-item" style="opacity: ${s.enabled ? 1 : 0.6};">
<div style="display: flex; justify-content: space-between; align-items: start;">
<div style="flex: 1;">
<div class="workflow-name">${s.name}</div>
<div class="workflow-name">${escapeHtml(s.name || '')}</div>
<div style="color: var(--terminal-green); font-family: var(--font-mono); font-size: 0.9em; margin: 8px 0;">
Command: <code>${escapeHtml(s.command)}</code>
</div>
@@ -1366,6 +1412,7 @@
document.getElementById('scheduleList').innerHTML = html;
} catch (error) {
console.error('Error loading schedules:', error);
document.getElementById('scheduleList').innerHTML = '<div class="empty" style="color:var(--terminal-red);">⚠ Failed to load schedules</div>';
}
}
@@ -1519,7 +1566,7 @@
<div class="execution-item" onclick="viewExecution('${e.id}')">
<span class="status ${e.status}">${e.status}</span>
<strong>${e.workflow_name || '[Quick Command]'}</strong>
<div class="timestamp">by ${e.started_by} at ${safeDate(e.started_at)?.toLocaleString() ?? 'N/A'}</div>
<div class="timestamp">by ${escapeHtml(e.started_by || '')} at ${safeDate(e.started_at)?.toLocaleString() ?? 'N/A'}</div>
</div>
`).join('');
document.getElementById('dashExecutions').innerHTML = dashHtml;
@@ -1536,6 +1583,7 @@
} catch (error) {
console.error('Error loading executions:', error);
document.getElementById('executionList').innerHTML = '<div class="empty" style="color:var(--terminal-red);">⚠ Failed to load executions</div>';
}
}
@@ -1644,7 +1692,7 @@
<span class="status ${e.status}">${e.status}</span>
<strong>${e.workflow_name || '[Quick Command]'}</strong>
<div class="timestamp">
Started by ${e.started_by} at ${safeDate(e.started_at)?.toLocaleString() ?? 'N/A'}
Started by ${escapeHtml(e.started_by || '')} at ${safeDate(e.started_at)?.toLocaleString() ?? 'N/A'}
${e.completed_at ? ` • Completed at ${safeDate(e.completed_at)?.toLocaleString() ?? 'N/A'}` : elapsed}
</div>
</div>
@@ -1985,17 +2033,19 @@
<div><strong>Status:</strong> <span class="status ${execution.status}">${execution.status}</span></div>
<div><strong>Started:</strong> ${safeDate(execution.started_at)?.toLocaleString() ?? 'N/A'}</div>
${execution.completed_at ? `<div><strong>Completed:</strong> ${safeDate(execution.completed_at)?.toLocaleString() ?? 'N/A'}</div>` : ''}
<div><strong>Started by:</strong> ${execution.started_by}</div>
<div><strong>Started by:</strong> ${escapeHtml(execution.started_by || '')}</div>
`;
if (execution.waiting_for_input && execution.prompt) {
const promptOutput = execution.prompt.output || '';
html += `
<div class="prompt-box">
<h3>Waiting for Input</h3>
${promptOutput ? `<pre class="prompt-output">${escapeHtml(promptOutput)}</pre>` : ''}
<p>${escapeHtml(execution.prompt.message || '')}</p>
<div style="margin-top: 10px;">
${(execution.prompt.options || []).map(opt =>
`<button class="prompt-opt-btn" onclick="respondToPrompt('${executionId}', ${JSON.stringify(opt)})">${escapeHtml(opt)}</button>`
`<button class="prompt-opt-btn" data-opt="${opt.replace(/&/g,'&amp;').replace(/"/g,'&quot;')}" onclick="respondToPrompt('${executionId}', this.dataset.opt)">${escapeHtml(opt)}</button>`
).join('')}
</div>
</div>
@@ -2108,6 +2158,31 @@
`;
}
if (log.action === 'parse_complete') {
const pairs = log.parsed || {};
const keys = Object.keys(pairs);
const tableRows = keys.map(k =>
`<div class="log-parse-key">${escapeHtml(k)}</div><div class="log-parse-val">${escapeHtml(pairs[k])}</div>`
).join('');
return `
<div class="log-entry" style="border-left-color: #444; opacity:.85;">
<div class="log-timestamp">[${timestamp}]</div>
<div class="log-title" style="color:#666;">⚙ Parsed ${keys.length} variable${keys.length !== 1 ? 's' : ''}</div>
${keys.length > 0 ? `<div class="log-details"><div class="log-parse-table">${tableRows}</div></div>` : ''}
</div>
`;
}
if (log.action === 'route_taken') {
return `
<div class="log-entry" style="border-left-color: var(--terminal-cyan); opacity:.9;">
<div class="log-timestamp">[${timestamp}]</div>
<div class="log-title" style="color: var(--terminal-cyan);">⇒ Auto-route: Step ${log.step}</div>
${log.label ? `<div class="log-details"><div class="log-route-label">${escapeHtml(log.label)}</div>${log.goto ? `<div class="log-route-goto">→ ${escapeHtml(log.goto)}</div>` : ''}</div>` : ''}
</div>
`;
}
if (log.action === 'no_workers') {
return `
<div class="log-entry failed">
@@ -2159,7 +2234,7 @@
if (log.action === 'prompt') {
const optionsHtml = (log.options || []).map(opt => {
if (executionId) {
return `<button class="prompt-opt-btn" onclick="respondToPrompt('${executionId}', ${JSON.stringify(opt)})">${escapeHtml(opt)}</button>`;
return `<button class="prompt-opt-btn" data-opt="${opt.replace(/&/g,'&amp;').replace(/"/g,'&quot;')}" onclick="respondToPrompt('${executionId}', this.dataset.opt)">${escapeHtml(opt)}</button>`;
}
return `<button class="prompt-opt-btn answered" disabled>${escapeHtml(opt)}</button>`;
}).join('');
@@ -2168,6 +2243,7 @@
<div class="log-timestamp">[${timestamp}]</div>
<div class="log-title" style="color: var(--terminal-cyan);">❓ Step ${log.step}: ${escapeHtml(log.step_name || 'Prompt')}</div>
<div class="log-details">
${log.output ? `<pre class="log-output" style="max-height:300px;overflow-y:auto;margin-bottom:10px;">${escapeHtml(log.output)}</pre>` : ''}
<div style="color: var(--terminal-green); margin-bottom: 10px;">${escapeHtml(log.message || '')}</div>
<div>${optionsHtml}</div>
</div>
@@ -2358,7 +2434,8 @@
closeModal('viewExecutionModal');
refreshData();
} else {
alert('Failed to abort execution');
const err = await response.json().catch(() => ({}));
alert(err.error || 'Failed to abort execution');
}
} catch (error) {
console.error('Error aborting execution:', error);
@@ -2635,6 +2712,10 @@
}
function showCreateWorkflow() {
document.getElementById('workflowName').value = '';
document.getElementById('workflowDescription').value = '';
document.getElementById('workflowDefinition').value = '';
document.getElementById('workflowWebhookUrl').value = '';
document.getElementById('createWorkflowModal').classList.add('show');
}
@@ -2882,45 +2963,12 @@
}
}
// Show terminal notification
// Show terminal notification — delegates to lt.toast from base.js
function showTerminalNotification(message, type = 'info') {
const notification = document.createElement('div');
notification.style.cssText = `
position: fixed;
top: 80px;
right: 20px;
background: #001a00;
border: 2px solid var(--terminal-green);
color: var(--terminal-green);
padding: 15px 20px;
font-family: var(--font-mono);
z-index: 10000;
animation: slide-in 0.3s ease-out;
box-shadow: 0 0 20px rgba(0, 255, 65, 0.3);
`;
if (type === 'error') {
notification.style.borderColor = '#ff4444';
notification.style.color = '#ff4444';
message = '✗ ' + message;
} else if (type === 'success') {
message = '✓ ' + message;
} else {
message = ' ' + message;
}
notification.textContent = message;
document.body.appendChild(notification);
// Play beep
terminalBeep(type);
// Remove after 3 seconds
setTimeout(() => {
notification.style.opacity = '0';
notification.style.transition = 'opacity 0.5s';
setTimeout(() => notification.remove(), 500);
}, 3000);
if (type === 'success') return lt.toast.success(message);
if (type === 'error') return lt.toast.error(message);
if (type === 'warning') return lt.toast.warning(message);
return lt.toast.info(message);
}
function connectWebSocket() {
@@ -2930,18 +2978,9 @@
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
console.log('WebSocket message:', data);
// Handle specific message types
if (data.type === 'command_result') {
// Display command result in real-time
console.log(`Command result received for execution ${data.execution_id}`);
console.log(`Success: ${data.success}`);
console.log(`Output: ${data.stdout}`);
if (data.stderr) {
console.log(`Error: ${data.stderr}`);
}
// Show terminal notification only for manual executions
if (!data.is_automated) {
if (data.success) {
@@ -2966,7 +3005,6 @@
}
if (data.type === 'workflow_result') {
console.log(`Workflow ${data.status} for execution ${data.execution_id}`);
// Refresh execution list
loadExecutions();
@@ -2982,7 +3020,6 @@
}
if (data.type === 'worker_update') {
console.log(`Worker ${data.worker_id} status: ${data.status}`);
loadWorkers();
}
@@ -3038,10 +3075,8 @@
// Close any open modal on ESC key
document.addEventListener('keydown', (e) => {
if (e.key === 'Escape') {
document.querySelectorAll('.modal').forEach(modal => {
if (modal.style.display && modal.style.display !== 'none') {
modal.style.display = 'none';
}
document.querySelectorAll('.modal.show').forEach(modal => {
modal.classList.remove('show');
});
}
});

189
server.js
View File

@@ -188,7 +188,7 @@ async function initDatabase() {
[exec.id]
);
await connection.query(
"UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', CAST(? AS JSON)) WHERE id = ?",
"UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', JSON_EXTRACT(?, '$')) WHERE id = ?",
[JSON.stringify({ action: 'server_restart_recovery', message: 'Execution marked failed due to server restart', timestamp: new Date().toISOString() }), exec.id]
);
}
@@ -237,7 +237,20 @@ async function processScheduledCommands() {
);
for (const schedule of schedules) {
// Prevent overlapping execution — skip if a previous run is still active
// Atomically claim this run by advancing next_run before doing any work.
// If two scheduler instances race, only the one that updates a row proceeds.
const claimNextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value);
const [claimed] = await pool.query(
`UPDATE scheduled_commands SET next_run = ?, last_run = NOW()
WHERE id = ? AND (next_run IS NULL OR next_run <= NOW())`,
[claimNextRun, schedule.id]
);
if (claimed.affectedRows === 0) {
console.log(`[Scheduler] Skipping "${schedule.name}" - already claimed by another run`);
continue;
}
// Also skip if a previous run is still active
const [runningExecs] = await pool.query(
"SELECT id FROM executions WHERE started_by = ? AND status = 'running'",
[`scheduler:${schedule.name}`]
@@ -291,18 +304,7 @@ async function processScheduledCommands() {
}
}
// Update last_run and calculate next_run
let nextRun;
try {
nextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value);
} catch (err) {
console.error(`[Scheduler] Invalid schedule config for "${schedule.name}": ${err.message}`);
continue;
}
await pool.query(
'UPDATE scheduled_commands SET last_run = NOW(), next_run = ? WHERE id = ?',
[nextRun, schedule.id]
);
// next_run and last_run already updated atomically above when we claimed the slot
}
} catch (error) {
console.error('[Scheduler] Error processing scheduled commands:', error);
@@ -376,7 +378,6 @@ wss.on('connection', (ws) => {
ws.on('message', async (data) => {
try {
const message = JSON.parse(data.toString());
console.log('WebSocket message received:', message.type);
if (message.type === 'command_result') {
// Handle command result from worker
@@ -414,6 +415,12 @@ wss.on('connection', (ws) => {
}
}
// Store last command output in execution state so prompt steps can surface it
const execStateForOutput = _executionState.get(execution_id);
if (execStateForOutput) {
execStateForOutput.state._lastCommandOutput = stdout || '';
}
// Broadcast result to browser clients only
broadcast({
type: 'command_result',
@@ -425,7 +432,7 @@ wss.on('connection', (ws) => {
is_automated: isAutomated,
});
console.log(`Command result received for execution ${execution_id}: ${success ? 'success' : 'failed'}`);
if (!success) console.warn(`[Worker] Command failed for execution ${execution_id} on worker ${worker_id}`);
}
if (message.type === 'workflow_result') {
@@ -480,6 +487,12 @@ wss.on('connection', (ws) => {
if (dbWorkers.length > 0) {
const dbWorkerId = dbWorkers[0].id;
// Clean up any stale entry for this db worker before storing the new one
// (handles reconnect: old runtime-ID entry would otherwise linger).
for (const [key, val] of workers) {
if (val.dbWorkerId === dbWorkerId && val !== ws) workers.delete(key);
}
// Store worker WebSocket connection using BOTH IDs
workers.set(worker_id, ws); // Runtime ID
workers.set(dbWorkerId, ws); // Database ID
@@ -543,7 +556,10 @@ wss.on('connection', (ws) => {
// Broadcast to browser clients only (NOT worker agents)
function broadcast(data) {
browserClients.forEach(client => {
// Snapshot the Set before iterating — a close event during iteration would
// otherwise modify the Set in-place, causing skipped or double-visited entries.
const snapshot = Array.from(browserClients);
snapshot.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(data));
}
@@ -554,7 +570,7 @@ function broadcast(data) {
async function addExecutionLog(executionId, logEntry) {
try {
await pool.query(
"UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', CAST(? AS JSON)) WHERE id = ?",
"UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', JSON_EXTRACT(?, '$')) WHERE id = ?",
[JSON.stringify(logEntry), executionId]
);
} catch (error) {
@@ -716,6 +732,7 @@ function evalCondition(condition, state, params) {
const context = vm.createContext({ state, params, promptResponse: state.promptResponse });
return !!vm.runInNewContext(condition, context, { timeout: 100 });
} catch (e) {
console.warn(`[Workflow] evalCondition error (treated as false): ${e.message} — condition: ${condition}`);
return false;
}
}
@@ -772,6 +789,16 @@ async function executeWorkflowSteps(executionId, workflowId, definition, usernam
const step = steps[currentIndex];
const execState = _executionState.get(executionId);
if (!execState) {
// State was lost (server restart or bug) — fail cleanly rather than throwing TypeError
await addExecutionLog(executionId, {
action: 'workflow_error',
error: 'Execution state lost unexpectedly; aborting.',
timestamp: new Date().toISOString()
});
await updateExecutionStatus(executionId, 'failed');
return;
}
const stepLabel = step.name || step.id || `Step ${currentIndex + 1}`;
console.log(`[Workflow] ${executionId} — step ${currentIndex + 1}: ${stepLabel}`);
@@ -828,6 +855,47 @@ async function executeWorkflowSteps(executionId, workflowId, definition, usernam
timestamp: new Date().toISOString()
});
await new Promise(r => setTimeout(r, ms));
} else if (step.type === 'parse') {
// Parse KEY=VALUE lines from last command output into execution state.
// Matches lines of the form: UPPER_KEY=value (key must start with uppercase letter)
const condOkParse = !step.condition || evalCondition(step.condition, execState?.state || {}, execState?.params || {});
const parseState = condOkParse ? _executionState.get(executionId) : null;
if (parseState) {
const output = parseState.state._lastCommandOutput || '';
const parsed = {};
for (const line of output.split('\n')) {
const m = line.match(/^([A-Z][A-Z0-9_]*)=(.*)$/);
if (m) {
const k = m[1].toLowerCase();
const v = m[2].trim();
parsed[k] = v;
parseState.state[k] = v;
}
}
await addExecutionLog(executionId, {
step: currentIndex + 1, step_name: stepLabel, action: 'parse_complete',
parsed_count: Object.keys(parsed).length, parsed,
timestamp: new Date().toISOString()
});
}
} else if (step.type === 'route') {
// Evaluate conditions in order; jump to the first match (no user input needed).
const routeState = execState?.state || {};
const routeParams = execState?.params || {};
let routeTaken = null;
for (const cond of (step.conditions || [])) {
const matches = cond.default || evalCondition(cond.if || 'false', routeState, routeParams);
if (matches) { routeTaken = cond; gotoId = cond.goto || null; break; }
}
await addExecutionLog(executionId, {
step: currentIndex + 1, step_name: stepLabel, action: 'route_taken',
condition: routeTaken?.if || 'default',
label: routeTaken?.label || null,
goto: gotoId,
timestamp: new Date().toISOString()
});
}
await addExecutionLog(executionId, {
@@ -875,18 +943,28 @@ async function executeWorkflowSteps(executionId, workflowId, definition, usernam
// Pause execution and wait for user to respond via POST /api/executions/:id/respond.
// Resolves with the chosen option string, or null on 60-minute timeout.
async function executePromptStep(executionId, step, stepNumber) {
const message = step.message || 'Please choose an option:';
const execState = _executionState.get(executionId);
// Apply param substitution to the message so {{server_name}}, {{iface}} etc. work
let message = step.message || 'Please choose an option:';
if (execState && Object.keys(execState.params).length > 0) {
message = applyParams(message, execState.params);
}
const options = step.options || ['Continue'];
await addExecutionLog(executionId, {
// Include the last command output so the user can review results alongside the question
const lastOutput = (execState?.state?._lastCommandOutput) || null;
const logEntry = {
step: stepNumber, step_name: step.name, action: 'prompt',
message, options, timestamp: new Date().toISOString()
});
};
if (lastOutput) logEntry.output = lastOutput;
await addExecutionLog(executionId, logEntry);
broadcast({
type: 'execution_prompt',
execution_id: executionId,
prompt: { message, options, step: stepNumber, step_name: step.name }
prompt: { message, options, step: stepNumber, step_name: step.name, output: lastOutput || undefined }
});
return new Promise(resolve => {
@@ -950,6 +1028,7 @@ async function executeCommandStep(executionId, step, stepNumber, params = {}) {
// Execute command on each target worker and wait for results
const results = [];
let sentCount = 0;
for (const workerId of targetWorkerIds) {
const workerWs = workers.get(workerId);
@@ -963,9 +1042,10 @@ async function executeCommandStep(executionId, step, stepNumber, params = {}) {
});
continue;
}
sentCount++;
// Send command to worker
const commandId = crypto.randomUUID();
let commandId = crypto.randomUUID();
await addExecutionLog(executionId, {
step: stepNumber,
@@ -976,27 +1056,72 @@ async function executeCommandStep(executionId, step, stepNumber, params = {}) {
timestamp: new Date().toISOString()
});
// Per-step timeout override: step.timeout (seconds) overrides global default
const stepTimeoutMs = step.timeout
? Math.min(Math.max(parseInt(step.timeout, 10) * 1000, 5000), 600000)
: COMMAND_TIMEOUT_MS;
workerWs.send(JSON.stringify({
type: 'execute_command',
execution_id: executionId,
command_id: commandId,
command: command,
worker_id: workerId,
timeout: COMMAND_TIMEOUT_MS
timeout: stepTimeoutMs
}));
// Wait for command result (with timeout)
const result = await waitForCommandResult(executionId, commandId, COMMAND_TIMEOUT_MS);
// Per-step retry: step.retries (default 0) with step.retryDelayMs (default 2000)
const maxRetries = Math.min(parseInt(step.retries || 0, 10), 5);
const retryDelayMs = Math.min(parseInt(step.retryDelayMs || 2000, 10), 30000);
let result;
let attempt = 0;
while (true) {
result = await waitForCommandResult(executionId, commandId, stepTimeoutMs);
if (result.success || attempt >= maxRetries) break;
attempt++;
await addExecutionLog(executionId, {
step: stepNumber,
action: 'command_retry',
worker_id: workerId,
attempt,
max_retries: maxRetries,
message: `Command failed, retrying (attempt ${attempt}/${maxRetries})...`,
timestamp: new Date().toISOString()
});
await new Promise(r => setTimeout(r, retryDelayMs));
// Re-send command for retry
const retryCommandId = crypto.randomUUID();
await addExecutionLog(executionId, {
step: stepNumber, action: 'command_sent', worker_id: workerId,
command: command, command_id: retryCommandId, timestamp: new Date().toISOString()
});
workerWs.send(JSON.stringify({
type: 'execute_command', execution_id: executionId, command_id: retryCommandId,
command: command, worker_id: workerId, timeout: stepTimeoutMs
}));
commandId = retryCommandId; // update for next waitForCommandResult call
}
results.push(result);
if (!result.success) {
// Command failed, workflow should stop
// Command failed after all retries, workflow should stop
return false;
}
}
// All commands succeeded
return results.every(r => r.success);
// If every target worker was offline, treat as step failure
if (sentCount === 0) {
await addExecutionLog(executionId, {
step: stepNumber,
action: 'no_workers',
message: 'All target workers were offline when step executed',
timestamp: new Date().toISOString()
});
return false;
}
return true; // all sent commands succeeded (failures return false above)
} catch (error) {
console.error(`[Workflow] Error executing command step:`, error);
@@ -1580,7 +1705,11 @@ app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
if (waitingForInput) {
for (let i = parsedLogs.length - 1; i >= 0; i--) {
if (parsedLogs[i].action === 'prompt') {
pendingPrompt = { message: parsedLogs[i].message, options: parsedLogs[i].options };
pendingPrompt = {
message: parsedLogs[i].message,
options: parsedLogs[i].options,
output: parsedLogs[i].output || null
};
break;
}
}

257
worker/worker.js Normal file
View File

@@ -0,0 +1,257 @@
const axios = require('axios');
const WebSocket = require('ws');
const { exec } = require('child_process');
const { promisify } = require('util');
const os = require('os');
const crypto = require('crypto');
require('dotenv').config();
const execAsync = promisify(exec);
class PulseWorker {
constructor() {
this.workerId = crypto.randomUUID();
this.workerName = process.env.WORKER_NAME || os.hostname();
this.serverUrl = process.env.PULSE_SERVER || 'http://localhost:8080';
this.wsUrl = process.env.PULSE_WS || 'ws://localhost:8080';
this.apiKey = process.env.WORKER_API_KEY;
this.heartbeatInterval = parseInt(process.env.HEARTBEAT_INTERVAL || '30') * 1000;
this.maxConcurrentTasks = parseInt(process.env.MAX_CONCURRENT_TASKS || '5');
this.activeTasks = 0;
this.ws = null;
this.heartbeatTimer = null;
}
async start() {
console.log(`[PULSE Worker] Starting worker: ${this.workerName}`);
console.log(`[PULSE Worker] Worker ID: ${this.workerId}`);
console.log(`[PULSE Worker] Server: ${this.serverUrl}`);
// Send initial heartbeat
await this.sendHeartbeat();
// Start heartbeat timer
this.startHeartbeat();
// Connect to WebSocket for real-time commands
this.connectWebSocket();
console.log(`[PULSE Worker] Worker started successfully`);
}
startHeartbeat() {
this.heartbeatTimer = setInterval(async () => {
try {
await this.sendHeartbeat();
} catch (error) {
console.error('[PULSE Worker] Heartbeat failed:', error.message);
}
}, this.heartbeatInterval);
}
async sendHeartbeat() {
const metadata = {
hostname: os.hostname(),
platform: os.platform(),
arch: os.arch(),
cpus: os.cpus().length,
totalMem: os.totalmem(),
freeMem: os.freemem(),
uptime: os.uptime(),
loadavg: os.loadavg(),
activeTasks: this.activeTasks,
maxConcurrentTasks: this.maxConcurrentTasks
};
try {
const response = await axios.post(
`${this.serverUrl}/api/workers/heartbeat`,
{
worker_id: this.workerId,
name: this.workerName,
metadata: metadata
},
{
headers: {
'X-API-Key': this.apiKey,
'Content-Type': 'application/json'
}
}
);
console.log(`[PULSE Worker] Heartbeat sent - Status: online`);
return response.data;
} catch (error) {
console.error('[PULSE Worker] Heartbeat error:', error.message);
throw error;
}
}
connectWebSocket() {
console.log(`[PULSE Worker] Connecting to WebSocket...`);
this.ws = new WebSocket(this.wsUrl);
this.ws.on('open', () => {
console.log('[PULSE Worker] WebSocket connected');
// Identify this worker
this.ws.send(JSON.stringify({
type: 'worker_connect',
worker_id: this.workerId,
worker_name: this.workerName
}));
});
this.ws.on('message', async (data) => {
try {
const message = JSON.parse(data.toString());
await this.handleMessage(message);
} catch (error) {
console.error('[PULSE Worker] Message handling error:', error);
}
});
this.ws.on('close', () => {
console.log('[PULSE Worker] WebSocket disconnected, reconnecting...');
setTimeout(() => this.connectWebSocket(), 5000);
});
this.ws.on('error', (error) => {
console.error('[PULSE Worker] WebSocket error:', error.message);
});
}
async handleMessage(message) {
console.log(`[PULSE Worker] Received message:`, message.type);
switch (message.type) {
case 'execute_command':
await this.executeCommand(message);
break;
case 'execute_workflow':
await this.executeWorkflow(message);
break;
case 'ping':
this.sendPong();
break;
default:
console.log(`[PULSE Worker] Unknown message type: ${message.type}`);
}
}
async executeCommand(message) {
const { command, execution_id, command_id, timeout = 300000 } = message;
if (this.activeTasks >= this.maxConcurrentTasks) {
console.log(`[PULSE Worker] Max concurrent tasks reached, rejecting command`);
return;
}
this.activeTasks++;
console.log(`[PULSE Worker] Executing command (active tasks: ${this.activeTasks})`);
try {
const startTime = Date.now();
const { stdout, stderr } = await execAsync(command, {
timeout: timeout,
maxBuffer: 10 * 1024 * 1024 // 10MB buffer
});
const duration = Date.now() - startTime;
const result = {
type: 'command_result',
execution_id,
worker_id: this.workerId,
command_id,
success: true,
stdout: stdout,
stderr: stderr,
duration: duration,
timestamp: new Date().toISOString()
};
this.sendResult(result);
console.log(`[PULSE Worker] Command completed in ${duration}ms`);
} catch (error) {
const result = {
type: 'command_result',
execution_id,
worker_id: this.workerId,
command_id,
success: false,
error: error.message,
stdout: error.stdout || '',
stderr: error.stderr || '',
timestamp: new Date().toISOString()
};
this.sendResult(result);
console.error(`[PULSE Worker] Command failed:`, error.message);
} finally {
this.activeTasks--;
}
}
async executeWorkflow(message) {
const { workflow, execution_id } = message;
console.log(`[PULSE Worker] Executing workflow: ${workflow.name}`);
// Workflow execution will be implemented in phase 2
// For now, just acknowledge receipt
this.sendResult({
type: 'workflow_result',
execution_id,
worker_id: this.workerId,
success: true,
message: 'Workflow execution not yet implemented',
timestamp: new Date().toISOString()
});
}
sendResult(result) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(result));
}
}
sendPong() {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: 'pong', worker_id: this.workerId }));
}
}
async stop() {
console.log('[PULSE Worker] Shutting down...');
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
}
if (this.ws) {
this.ws.close();
}
console.log('[PULSE Worker] Shutdown complete');
}
}
// Start worker
const worker = new PulseWorker();
// Handle graceful shutdown
process.on('SIGTERM', async () => {
await worker.stop();
process.exit(0);
});
process.on('SIGINT', async () => {
await worker.stop();
process.exit(0);
});
// Start the worker
worker.start().catch((error) => {
console.error('[PULSE Worker] Fatal error:', error);
process.exit(1);
});