Fix prompt buttons, add command output to prompt steps, add worker to repo

- Fix onclick buttons broken by JSON.stringify double-quotes inside HTML
  attributes — use data-opt attribute + this.dataset.opt instead
- Track last command stdout in execution state when command_result arrives
- executePromptStep: include last command output in log entry and broadcast
  so users can review results alongside the question in the same view
- GET /api/executions/🆔 propagate output field to pending prompt response
- Add .prompt-output CSS class for scrollable terminal-style output block
- Fix MariaDB CAST(? AS JSON) → JSON_EXTRACT(?, '$') (MariaDB 10.11 compat)
- Add worker/worker.js to repo (deployed on pulse-worker-01 / LXC 153)
  Fix: worker was not echoing command_id back in result — resolvers always
  got undefined, causing every workflow step to timeout and fail

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-17 19:06:02 -04:00
parent 3f6e04d1ab
commit 2290d52f8b
3 changed files with 297 additions and 8 deletions

View File

@@ -574,6 +574,19 @@
font-family: var(--font-mono); font-family: var(--font-mono);
margin-bottom: 14px; margin-bottom: 14px;
} }
.prompt-output {
background: rgba(0, 0, 0, 0.4);
border: 1px solid rgba(0, 255, 65, 0.25);
color: var(--terminal-green);
font-family: var(--font-mono);
font-size: 0.78em;
padding: 10px;
max-height: 280px;
overflow-y: auto;
white-space: pre-wrap;
word-break: break-all;
margin-bottom: 14px;
}
.prompt-opt-btn { .prompt-opt-btn {
padding: 7px 16px; padding: 7px 16px;
margin: 4px 4px 4px 0; margin: 4px 4px 4px 0;
@@ -2002,13 +2015,15 @@
`; `;
if (execution.waiting_for_input && execution.prompt) { if (execution.waiting_for_input && execution.prompt) {
const promptOutput = execution.prompt.output || '';
html += ` html += `
<div class="prompt-box"> <div class="prompt-box">
<h3>Waiting for Input</h3> <h3>Waiting for Input</h3>
${promptOutput ? `<pre class="prompt-output">${escapeHtml(promptOutput)}</pre>` : ''}
<p>${escapeHtml(execution.prompt.message || '')}</p> <p>${escapeHtml(execution.prompt.message || '')}</p>
<div style="margin-top: 10px;"> <div style="margin-top: 10px;">
${(execution.prompt.options || []).map(opt => ${(execution.prompt.options || []).map(opt =>
`<button class="prompt-opt-btn" onclick="respondToPrompt('${executionId}', ${JSON.stringify(opt)})">${escapeHtml(opt)}</button>` `<button class="prompt-opt-btn" data-opt="${opt.replace(/&/g,'&amp;').replace(/"/g,'&quot;')}" onclick="respondToPrompt('${executionId}', this.dataset.opt)">${escapeHtml(opt)}</button>`
).join('')} ).join('')}
</div> </div>
</div> </div>
@@ -2172,7 +2187,7 @@
if (log.action === 'prompt') { if (log.action === 'prompt') {
const optionsHtml = (log.options || []).map(opt => { const optionsHtml = (log.options || []).map(opt => {
if (executionId) { if (executionId) {
return `<button class="prompt-opt-btn" onclick="respondToPrompt('${executionId}', ${JSON.stringify(opt)})">${escapeHtml(opt)}</button>`; return `<button class="prompt-opt-btn" data-opt="${opt.replace(/&/g,'&amp;').replace(/"/g,'&quot;')}" onclick="respondToPrompt('${executionId}', this.dataset.opt)">${escapeHtml(opt)}</button>`;
} }
return `<button class="prompt-opt-btn answered" disabled>${escapeHtml(opt)}</button>`; return `<button class="prompt-opt-btn answered" disabled>${escapeHtml(opt)}</button>`;
}).join(''); }).join('');
@@ -2181,6 +2196,7 @@
<div class="log-timestamp">[${timestamp}]</div> <div class="log-timestamp">[${timestamp}]</div>
<div class="log-title" style="color: var(--terminal-cyan);">❓ Step ${log.step}: ${escapeHtml(log.step_name || 'Prompt')}</div> <div class="log-title" style="color: var(--terminal-cyan);">❓ Step ${log.step}: ${escapeHtml(log.step_name || 'Prompt')}</div>
<div class="log-details"> <div class="log-details">
${log.output ? `<pre class="log-output" style="max-height:300px;overflow-y:auto;margin-bottom:10px;">${escapeHtml(log.output)}</pre>` : ''}
<div style="color: var(--terminal-green); margin-bottom: 10px;">${escapeHtml(log.message || '')}</div> <div style="color: var(--terminal-green); margin-bottom: 10px;">${escapeHtml(log.message || '')}</div>
<div>${optionsHtml}</div> <div>${optionsHtml}</div>
</div> </div>

View File

@@ -188,7 +188,7 @@ async function initDatabase() {
[exec.id] [exec.id]
); );
await connection.query( await connection.query(
"UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', CAST(? AS JSON)) WHERE id = ?", "UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', JSON_EXTRACT(?, '$')) WHERE id = ?",
[JSON.stringify({ action: 'server_restart_recovery', message: 'Execution marked failed due to server restart', timestamp: new Date().toISOString() }), exec.id] [JSON.stringify({ action: 'server_restart_recovery', message: 'Execution marked failed due to server restart', timestamp: new Date().toISOString() }), exec.id]
); );
} }
@@ -415,6 +415,12 @@ wss.on('connection', (ws) => {
} }
} }
// Store last command output in execution state so prompt steps can surface it
const execStateForOutput = _executionState.get(execution_id);
if (execStateForOutput) {
execStateForOutput.state._lastCommandOutput = stdout || '';
}
// Broadcast result to browser clients only // Broadcast result to browser clients only
broadcast({ broadcast({
type: 'command_result', type: 'command_result',
@@ -564,7 +570,7 @@ function broadcast(data) {
async function addExecutionLog(executionId, logEntry) { async function addExecutionLog(executionId, logEntry) {
try { try {
await pool.query( await pool.query(
"UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', CAST(? AS JSON)) WHERE id = ?", "UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', JSON_EXTRACT(?, '$')) WHERE id = ?",
[JSON.stringify(logEntry), executionId] [JSON.stringify(logEntry), executionId]
); );
} catch (error) { } catch (error) {
@@ -899,15 +905,21 @@ async function executePromptStep(executionId, step, stepNumber) {
const message = step.message || 'Please choose an option:'; const message = step.message || 'Please choose an option:';
const options = step.options || ['Continue']; const options = step.options || ['Continue'];
await addExecutionLog(executionId, { // Include the last command output so the user can review results alongside the question
const execState = _executionState.get(executionId);
const lastOutput = (execState?.state?._lastCommandOutput) || null;
const logEntry = {
step: stepNumber, step_name: step.name, action: 'prompt', step: stepNumber, step_name: step.name, action: 'prompt',
message, options, timestamp: new Date().toISOString() message, options, timestamp: new Date().toISOString()
}); };
if (lastOutput) logEntry.output = lastOutput;
await addExecutionLog(executionId, logEntry);
broadcast({ broadcast({
type: 'execution_prompt', type: 'execution_prompt',
execution_id: executionId, execution_id: executionId,
prompt: { message, options, step: stepNumber, step_name: step.name } prompt: { message, options, step: stepNumber, step_name: step.name, output: lastOutput || undefined }
}); });
return new Promise(resolve => { return new Promise(resolve => {
@@ -1648,7 +1660,11 @@ app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
if (waitingForInput) { if (waitingForInput) {
for (let i = parsedLogs.length - 1; i >= 0; i--) { for (let i = parsedLogs.length - 1; i >= 0; i--) {
if (parsedLogs[i].action === 'prompt') { if (parsedLogs[i].action === 'prompt') {
pendingPrompt = { message: parsedLogs[i].message, options: parsedLogs[i].options }; pendingPrompt = {
message: parsedLogs[i].message,
options: parsedLogs[i].options,
output: parsedLogs[i].output || null
};
break; break;
} }
} }

257
worker/worker.js Normal file
View File

@@ -0,0 +1,257 @@
const axios = require('axios');
const WebSocket = require('ws');
const { exec } = require('child_process');
const { promisify } = require('util');
const os = require('os');
const crypto = require('crypto');
require('dotenv').config();
const execAsync = promisify(exec);
class PulseWorker {
constructor() {
this.workerId = crypto.randomUUID();
this.workerName = process.env.WORKER_NAME || os.hostname();
this.serverUrl = process.env.PULSE_SERVER || 'http://localhost:8080';
this.wsUrl = process.env.PULSE_WS || 'ws://localhost:8080';
this.apiKey = process.env.WORKER_API_KEY;
this.heartbeatInterval = parseInt(process.env.HEARTBEAT_INTERVAL || '30') * 1000;
this.maxConcurrentTasks = parseInt(process.env.MAX_CONCURRENT_TASKS || '5');
this.activeTasks = 0;
this.ws = null;
this.heartbeatTimer = null;
}
async start() {
console.log(`[PULSE Worker] Starting worker: ${this.workerName}`);
console.log(`[PULSE Worker] Worker ID: ${this.workerId}`);
console.log(`[PULSE Worker] Server: ${this.serverUrl}`);
// Send initial heartbeat
await this.sendHeartbeat();
// Start heartbeat timer
this.startHeartbeat();
// Connect to WebSocket for real-time commands
this.connectWebSocket();
console.log(`[PULSE Worker] Worker started successfully`);
}
startHeartbeat() {
this.heartbeatTimer = setInterval(async () => {
try {
await this.sendHeartbeat();
} catch (error) {
console.error('[PULSE Worker] Heartbeat failed:', error.message);
}
}, this.heartbeatInterval);
}
async sendHeartbeat() {
const metadata = {
hostname: os.hostname(),
platform: os.platform(),
arch: os.arch(),
cpus: os.cpus().length,
totalMem: os.totalmem(),
freeMem: os.freemem(),
uptime: os.uptime(),
loadavg: os.loadavg(),
activeTasks: this.activeTasks,
maxConcurrentTasks: this.maxConcurrentTasks
};
try {
const response = await axios.post(
`${this.serverUrl}/api/workers/heartbeat`,
{
worker_id: this.workerId,
name: this.workerName,
metadata: metadata
},
{
headers: {
'X-API-Key': this.apiKey,
'Content-Type': 'application/json'
}
}
);
console.log(`[PULSE Worker] Heartbeat sent - Status: online`);
return response.data;
} catch (error) {
console.error('[PULSE Worker] Heartbeat error:', error.message);
throw error;
}
}
connectWebSocket() {
console.log(`[PULSE Worker] Connecting to WebSocket...`);
this.ws = new WebSocket(this.wsUrl);
this.ws.on('open', () => {
console.log('[PULSE Worker] WebSocket connected');
// Identify this worker
this.ws.send(JSON.stringify({
type: 'worker_connect',
worker_id: this.workerId,
worker_name: this.workerName
}));
});
this.ws.on('message', async (data) => {
try {
const message = JSON.parse(data.toString());
await this.handleMessage(message);
} catch (error) {
console.error('[PULSE Worker] Message handling error:', error);
}
});
this.ws.on('close', () => {
console.log('[PULSE Worker] WebSocket disconnected, reconnecting...');
setTimeout(() => this.connectWebSocket(), 5000);
});
this.ws.on('error', (error) => {
console.error('[PULSE Worker] WebSocket error:', error.message);
});
}
async handleMessage(message) {
console.log(`[PULSE Worker] Received message:`, message.type);
switch (message.type) {
case 'execute_command':
await this.executeCommand(message);
break;
case 'execute_workflow':
await this.executeWorkflow(message);
break;
case 'ping':
this.sendPong();
break;
default:
console.log(`[PULSE Worker] Unknown message type: ${message.type}`);
}
}
async executeCommand(message) {
const { command, execution_id, command_id, timeout = 300000 } = message;
if (this.activeTasks >= this.maxConcurrentTasks) {
console.log(`[PULSE Worker] Max concurrent tasks reached, rejecting command`);
return;
}
this.activeTasks++;
console.log(`[PULSE Worker] Executing command (active tasks: ${this.activeTasks})`);
try {
const startTime = Date.now();
const { stdout, stderr } = await execAsync(command, {
timeout: timeout,
maxBuffer: 10 * 1024 * 1024 // 10MB buffer
});
const duration = Date.now() - startTime;
const result = {
type: 'command_result',
execution_id,
worker_id: this.workerId,
command_id,
success: true,
stdout: stdout,
stderr: stderr,
duration: duration,
timestamp: new Date().toISOString()
};
this.sendResult(result);
console.log(`[PULSE Worker] Command completed in ${duration}ms`);
} catch (error) {
const result = {
type: 'command_result',
execution_id,
worker_id: this.workerId,
command_id,
success: false,
error: error.message,
stdout: error.stdout || '',
stderr: error.stderr || '',
timestamp: new Date().toISOString()
};
this.sendResult(result);
console.error(`[PULSE Worker] Command failed:`, error.message);
} finally {
this.activeTasks--;
}
}
async executeWorkflow(message) {
const { workflow, execution_id } = message;
console.log(`[PULSE Worker] Executing workflow: ${workflow.name}`);
// Workflow execution will be implemented in phase 2
// For now, just acknowledge receipt
this.sendResult({
type: 'workflow_result',
execution_id,
worker_id: this.workerId,
success: true,
message: 'Workflow execution not yet implemented',
timestamp: new Date().toISOString()
});
}
sendResult(result) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(result));
}
}
sendPong() {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: 'pong', worker_id: this.workerId }));
}
}
async stop() {
console.log('[PULSE Worker] Shutting down...');
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
}
if (this.ws) {
this.ws.close();
}
console.log('[PULSE Worker] Shutdown complete');
}
}
// Start worker
const worker = new PulseWorker();
// Handle graceful shutdown
process.on('SIGTERM', async () => {
await worker.stop();
process.exit(0);
});
process.on('SIGINT', async () => {
await worker.stop();
process.exit(0);
});
// Start the worker
worker.start().catch((error) => {
console.error('[PULSE Worker] Fatal error:', error);
process.exit(1);
});