diff --git a/public/index.html b/public/index.html index 245a9c9..b601fb4 100644 --- a/public/index.html +++ b/public/index.html @@ -574,6 +574,19 @@ font-family: var(--font-mono); margin-bottom: 14px; } + .prompt-output { + background: rgba(0, 0, 0, 0.4); + border: 1px solid rgba(0, 255, 65, 0.25); + color: var(--terminal-green); + font-family: var(--font-mono); + font-size: 0.78em; + padding: 10px; + max-height: 280px; + overflow-y: auto; + white-space: pre-wrap; + word-break: break-all; + margin-bottom: 14px; + } .prompt-opt-btn { padding: 7px 16px; margin: 4px 4px 4px 0; @@ -2002,13 +2015,15 @@ `; if (execution.waiting_for_input && execution.prompt) { + const promptOutput = execution.prompt.output || ''; html += `

Waiting for Input

+ ${promptOutput ? `
${escapeHtml(promptOutput)}
` : ''}

${escapeHtml(execution.prompt.message || '')}

${(execution.prompt.options || []).map(opt => - `` + `` ).join('')}
@@ -2172,7 +2187,7 @@ if (log.action === 'prompt') { const optionsHtml = (log.options || []).map(opt => { if (executionId) { - return ``; + return ``; } return ``; }).join(''); @@ -2181,6 +2196,7 @@
[${timestamp}]
❓ Step ${log.step}: ${escapeHtml(log.step_name || 'Prompt')}
+ ${log.output ? `
${escapeHtml(log.output)}
` : ''}
${escapeHtml(log.message || '')}
${optionsHtml}
diff --git a/server.js b/server.js index 61f3abd..96b0c83 100644 --- a/server.js +++ b/server.js @@ -188,7 +188,7 @@ async function initDatabase() { [exec.id] ); await connection.query( - "UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', CAST(? AS JSON)) WHERE id = ?", + "UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', JSON_EXTRACT(?, '$')) WHERE id = ?", [JSON.stringify({ action: 'server_restart_recovery', message: 'Execution marked failed due to server restart', timestamp: new Date().toISOString() }), exec.id] ); } @@ -415,6 +415,12 @@ wss.on('connection', (ws) => { } } + // Store last command output in execution state so prompt steps can surface it + const execStateForOutput = _executionState.get(execution_id); + if (execStateForOutput) { + execStateForOutput.state._lastCommandOutput = stdout || ''; + } + // Broadcast result to browser clients only broadcast({ type: 'command_result', @@ -564,7 +570,7 @@ function broadcast(data) { async function addExecutionLog(executionId, logEntry) { try { await pool.query( - "UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', CAST(? AS JSON)) WHERE id = ?", + "UPDATE executions SET logs = JSON_ARRAY_APPEND(COALESCE(logs, '[]'), '$', JSON_EXTRACT(?, '$')) WHERE id = ?", [JSON.stringify(logEntry), executionId] ); } catch (error) { @@ -899,15 +905,21 @@ async function executePromptStep(executionId, step, stepNumber) { const message = step.message || 'Please choose an option:'; const options = step.options || ['Continue']; - await addExecutionLog(executionId, { + // Include the last command output so the user can review results alongside the question + const execState = _executionState.get(executionId); + const lastOutput = (execState?.state?._lastCommandOutput) || null; + + const logEntry = { step: stepNumber, step_name: step.name, action: 'prompt', message, options, timestamp: new Date().toISOString() - }); + }; + if (lastOutput) logEntry.output = lastOutput; + await addExecutionLog(executionId, logEntry); broadcast({ type: 'execution_prompt', execution_id: executionId, - prompt: { message, options, step: stepNumber, step_name: step.name } + prompt: { message, options, step: stepNumber, step_name: step.name, output: lastOutput || undefined } }); return new Promise(resolve => { @@ -1648,7 +1660,11 @@ app.get('/api/executions/:id', authenticateSSO, async (req, res) => { if (waitingForInput) { for (let i = parsedLogs.length - 1; i >= 0; i--) { if (parsedLogs[i].action === 'prompt') { - pendingPrompt = { message: parsedLogs[i].message, options: parsedLogs[i].options }; + pendingPrompt = { + message: parsedLogs[i].message, + options: parsedLogs[i].options, + output: parsedLogs[i].output || null + }; break; } } diff --git a/worker/worker.js b/worker/worker.js new file mode 100644 index 0000000..1db7804 --- /dev/null +++ b/worker/worker.js @@ -0,0 +1,257 @@ +const axios = require('axios'); +const WebSocket = require('ws'); +const { exec } = require('child_process'); +const { promisify } = require('util'); +const os = require('os'); +const crypto = require('crypto'); +require('dotenv').config(); + +const execAsync = promisify(exec); + +class PulseWorker { + constructor() { + this.workerId = crypto.randomUUID(); + this.workerName = process.env.WORKER_NAME || os.hostname(); + this.serverUrl = process.env.PULSE_SERVER || 'http://localhost:8080'; + this.wsUrl = process.env.PULSE_WS || 'ws://localhost:8080'; + this.apiKey = process.env.WORKER_API_KEY; + this.heartbeatInterval = parseInt(process.env.HEARTBEAT_INTERVAL || '30') * 1000; + this.maxConcurrentTasks = parseInt(process.env.MAX_CONCURRENT_TASKS || '5'); + this.activeTasks = 0; + this.ws = null; + this.heartbeatTimer = null; + } + + async start() { + console.log(`[PULSE Worker] Starting worker: ${this.workerName}`); + console.log(`[PULSE Worker] Worker ID: ${this.workerId}`); + console.log(`[PULSE Worker] Server: ${this.serverUrl}`); + + // Send initial heartbeat + await this.sendHeartbeat(); + + // Start heartbeat timer + this.startHeartbeat(); + + // Connect to WebSocket for real-time commands + this.connectWebSocket(); + + console.log(`[PULSE Worker] Worker started successfully`); + } + + startHeartbeat() { + this.heartbeatTimer = setInterval(async () => { + try { + await this.sendHeartbeat(); + } catch (error) { + console.error('[PULSE Worker] Heartbeat failed:', error.message); + } + }, this.heartbeatInterval); + } + + async sendHeartbeat() { + const metadata = { + hostname: os.hostname(), + platform: os.platform(), + arch: os.arch(), + cpus: os.cpus().length, + totalMem: os.totalmem(), + freeMem: os.freemem(), + uptime: os.uptime(), + loadavg: os.loadavg(), + activeTasks: this.activeTasks, + maxConcurrentTasks: this.maxConcurrentTasks + }; + + try { + const response = await axios.post( + `${this.serverUrl}/api/workers/heartbeat`, + { + worker_id: this.workerId, + name: this.workerName, + metadata: metadata + }, + { + headers: { + 'X-API-Key': this.apiKey, + 'Content-Type': 'application/json' + } + } + ); + + console.log(`[PULSE Worker] Heartbeat sent - Status: online`); + return response.data; + } catch (error) { + console.error('[PULSE Worker] Heartbeat error:', error.message); + throw error; + } + } + + connectWebSocket() { + console.log(`[PULSE Worker] Connecting to WebSocket...`); + + this.ws = new WebSocket(this.wsUrl); + + this.ws.on('open', () => { + console.log('[PULSE Worker] WebSocket connected'); + // Identify this worker + this.ws.send(JSON.stringify({ + type: 'worker_connect', + worker_id: this.workerId, + worker_name: this.workerName + })); + }); + + this.ws.on('message', async (data) => { + try { + const message = JSON.parse(data.toString()); + await this.handleMessage(message); + } catch (error) { + console.error('[PULSE Worker] Message handling error:', error); + } + }); + + this.ws.on('close', () => { + console.log('[PULSE Worker] WebSocket disconnected, reconnecting...'); + setTimeout(() => this.connectWebSocket(), 5000); + }); + + this.ws.on('error', (error) => { + console.error('[PULSE Worker] WebSocket error:', error.message); + }); + } + + async handleMessage(message) { + console.log(`[PULSE Worker] Received message:`, message.type); + + switch (message.type) { + case 'execute_command': + await this.executeCommand(message); + break; + case 'execute_workflow': + await this.executeWorkflow(message); + break; + case 'ping': + this.sendPong(); + break; + default: + console.log(`[PULSE Worker] Unknown message type: ${message.type}`); + } + } + + async executeCommand(message) { + const { command, execution_id, command_id, timeout = 300000 } = message; + + if (this.activeTasks >= this.maxConcurrentTasks) { + console.log(`[PULSE Worker] Max concurrent tasks reached, rejecting command`); + return; + } + + this.activeTasks++; + console.log(`[PULSE Worker] Executing command (active tasks: ${this.activeTasks})`); + + try { + const startTime = Date.now(); + const { stdout, stderr } = await execAsync(command, { + timeout: timeout, + maxBuffer: 10 * 1024 * 1024 // 10MB buffer + }); + const duration = Date.now() - startTime; + + const result = { + type: 'command_result', + execution_id, + worker_id: this.workerId, + command_id, + success: true, + stdout: stdout, + stderr: stderr, + duration: duration, + timestamp: new Date().toISOString() + }; + + this.sendResult(result); + console.log(`[PULSE Worker] Command completed in ${duration}ms`); + } catch (error) { + const result = { + type: 'command_result', + execution_id, + worker_id: this.workerId, + command_id, + success: false, + error: error.message, + stdout: error.stdout || '', + stderr: error.stderr || '', + timestamp: new Date().toISOString() + }; + + this.sendResult(result); + console.error(`[PULSE Worker] Command failed:`, error.message); + } finally { + this.activeTasks--; + } + } + + async executeWorkflow(message) { + const { workflow, execution_id } = message; + + console.log(`[PULSE Worker] Executing workflow: ${workflow.name}`); + + // Workflow execution will be implemented in phase 2 + // For now, just acknowledge receipt + this.sendResult({ + type: 'workflow_result', + execution_id, + worker_id: this.workerId, + success: true, + message: 'Workflow execution not yet implemented', + timestamp: new Date().toISOString() + }); + } + + sendResult(result) { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.send(JSON.stringify(result)); + } + } + + sendPong() { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.send(JSON.stringify({ type: 'pong', worker_id: this.workerId })); + } + } + + async stop() { + console.log('[PULSE Worker] Shutting down...'); + + if (this.heartbeatTimer) { + clearInterval(this.heartbeatTimer); + } + + if (this.ws) { + this.ws.close(); + } + + console.log('[PULSE Worker] Shutdown complete'); + } +} + +// Start worker +const worker = new PulseWorker(); + +// Handle graceful shutdown +process.on('SIGTERM', async () => { + await worker.stop(); + process.exit(0); +}); + +process.on('SIGINT', async () => { + await worker.stop(); + process.exit(0); +}); + +// Start the worker +worker.start().catch((error) => { + console.error('[PULSE Worker] Fatal error:', error); + process.exit(1); +});