const axios = require('axios');
const WebSocket = require('ws');
const { exec } = require('child_process');
const { promisify } = require('util');
const os = require('os');
const crypto = require('crypto');
require('dotenv').config();

const execAsync = promisify(exec);

// Delay before a dropped WebSocket connection is re-established.
const RECONNECT_DELAY_MS = 5000;
// Command timeout applied when the incoming message does not carry one.
const DEFAULT_COMMAND_TIMEOUT_MS = 300000;
// Upper bound on captured stdout/stderr per command (10MB).
const MAX_COMMAND_OUTPUT_BYTES = 10 * 1024 * 1024;

/**
 * Parses a base-10 integer setting, falling back when the raw value is
 * missing, not a number, or below the allowed minimum.
 *
 * @param {string|undefined} raw - Raw value, e.g. from process.env.
 * @param {number} fallback - Value returned when `raw` is unusable.
 * @param {number} min - Smallest accepted value (inclusive).
 * @returns {number} The parsed integer or `fallback`.
 */
function parseIntegerSetting(raw, fallback, min) {
  const parsed = Number.parseInt(raw, 10);
  return Number.isInteger(parsed) && parsed >= min ? parsed : fallback;
}

/**
 * Worker process that reports heartbeats to the PULSE server over HTTP and
 * receives commands over a WebSocket connection.
 */
class PulseWorker {
  /**
   * Reads configuration from environment variables:
   * WORKER_NAME, PULSE_SERVER, PULSE_WS, WORKER_API_KEY,
   * HEARTBEAT_INTERVAL (seconds) and MAX_CONCURRENT_TASKS.
   */
  constructor() {
    this.workerId = crypto.randomUUID();
    this.workerName = process.env.WORKER_NAME || os.hostname();
    this.serverUrl = process.env.PULSE_SERVER || 'http://localhost:8080';
    this.wsUrl = process.env.PULSE_WS || 'ws://localhost:8080';
    this.apiKey = process.env.WORKER_API_KEY;
    // An unparsable or non-positive interval would make setInterval fire
    // almost continuously, so such values fall back to the 30s default.
    this.heartbeatInterval =
      parseIntegerSetting(process.env.HEARTBEAT_INTERVAL, 30, 1) * 1000;
    // 0 is accepted here: it makes the worker reject every command.
    this.maxConcurrentTasks = parseIntegerSetting(
      process.env.MAX_CONCURRENT_TASKS,
      5,
      0
    );
    this.activeTasks = 0;
    this.ws = null;
    this.heartbeatTimer = null;
    this.reconnectTimer = null;
    // Set by stop() so that closing the socket does not trigger a reconnect.
    this.stopping = false;
  }

  /**
   * Sends the initial heartbeat, starts the periodic heartbeat and opens
   * the WebSocket connection.
   *
   * @returns {Promise<void>}
   * @throws Rejects if the initial heartbeat request fails.
   */
  async start() {
    this.stopping = false;

    console.log(`[PULSE Worker] Starting worker: ${this.workerName}`);
    console.log(`[PULSE Worker] Worker ID: ${this.workerId}`);
    console.log(`[PULSE Worker] Server: ${this.serverUrl}`);

    // Send initial heartbeat
    await this.sendHeartbeat();

    // Start heartbeat timer
    this.startHeartbeat();

    // Connect to WebSocket for real-time commands
    this.connectWebSocket();

    console.log(`[PULSE Worker] Worker started successfully`);
  }

  /**
   * Schedules a heartbeat every `heartbeatInterval` milliseconds. Failures
   * are logged and do not stop the timer.
   */
  startHeartbeat() {
    this.heartbeatTimer = setInterval(async () => {
      try {
        await this.sendHeartbeat();
      } catch (error) {
        console.error('[PULSE Worker] Heartbeat failed:', error.message);
      }
    }, this.heartbeatInterval);
  }

  /**
   * Posts this worker's identity and host metrics to
   * `${serverUrl}/api/workers/heartbeat`.
   *
   * @returns {Promise<*>} The response body returned by the server.
   * @throws Rethrows the request error after logging it.
   */
  async sendHeartbeat() {
    const metadata = {
      hostname: os.hostname(),
      platform: os.platform(),
      arch: os.arch(),
      cpus: os.cpus().length,
      totalMem: os.totalmem(),
      freeMem: os.freemem(),
      uptime: os.uptime(),
      loadavg: os.loadavg(),
      activeTasks: this.activeTasks,
      maxConcurrentTasks: this.maxConcurrentTasks,
    };

    try {
      const response = await axios.post(
        `${this.serverUrl}/api/workers/heartbeat`,
        {
          worker_id: this.workerId,
          name: this.workerName,
          metadata,
        },
        {
          headers: {
            'X-API-Key': this.apiKey,
            'Content-Type': 'application/json',
          },
        }
      );

      console.log(`[PULSE Worker] Heartbeat sent - Status: online`);
      return response.data;
    } catch (error) {
      console.error('[PULSE Worker] Heartbeat error:', error.message);
      throw error;
    }
  }

  /**
   * Opens the WebSocket connection, identifies this worker once connected,
   * and dispatches incoming messages. A closed connection is retried after
   * RECONNECT_DELAY_MS unless the worker is stopping.
   */
  connectWebSocket() {
    console.log(`[PULSE Worker] Connecting to WebSocket...`);

    // Handlers use the local reference so they always act on the socket
    // that emitted the event, even after this.ws has been replaced.
    const socket = new WebSocket(this.wsUrl);
    this.ws = socket;

    socket.on('open', () => {
      console.log('[PULSE Worker] WebSocket connected');

      // Identify this worker
      socket.send(
        JSON.stringify({
          type: 'worker_connect',
          worker_id: this.workerId,
          worker_name: this.workerName,
        })
      );
    });

    socket.on('message', async (data) => {
      try {
        const message = JSON.parse(data.toString());
        await this.handleMessage(message);
      } catch (error) {
        console.error('[PULSE Worker] Message handling error:', error);
      }
    });

    socket.on('close', () => {
      if (this.stopping) {
        // stop() closed the socket on purpose; do not reconnect.
        console.log('[PULSE Worker] WebSocket disconnected');
        return;
      }

      console.log('[PULSE Worker] WebSocket disconnected, reconnecting...');
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null;
        this.connectWebSocket();
      }, RECONNECT_DELAY_MS);
    });

    socket.on('error', (error) => {
      console.error('[PULSE Worker] WebSocket error:', error.message);
    });
  }

  /**
   * Routes a parsed WebSocket message by its `type` field.
   *
   * @param {{type: string}} message - Parsed message from the server.
   * @returns {Promise<void>}
   */
  async handleMessage(message) {
    console.log(`[PULSE Worker] Received message:`, message.type);

    switch (message.type) {
      case 'execute_command':
        await this.executeCommand(message);
        break;
      case 'execute_workflow':
        await this.executeWorkflow(message);
        break;
      case 'ping':
        this.sendPong();
        break;
      default:
        console.log(`[PULSE Worker] Unknown message type: ${message.type}`);
    }
  }

  /**
   * Runs a shell command and reports a `command_result` message.
   *
   * NOTE(review): `command` is passed to a shell exactly as received, and
   * the WebSocket connection opened by this class sends no credentials
   * during the handshake. Confirm that the server side restricts who can
   * issue `execute_command` messages.
   *
   * @param {object} message - Incoming `execute_command` message.
   * @param {string} message.command - Shell command line to run.
   * @param {*} message.execution_id - Echoed back in the result.
   * @param {*} message.command_id - Echoed back in the result.
   * @param {number} [message.timeout=300000] - Timeout in milliseconds.
   * @returns {Promise<void>}
   */
  async executeCommand(message) {
    const {
      command,
      execution_id,
      command_id,
      timeout = DEFAULT_COMMAND_TIMEOUT_MS,
    } = message;

    if (this.activeTasks >= this.maxConcurrentTasks) {
      console.log(`[PULSE Worker] Max concurrent tasks reached, rejecting command`);
      // Report the rejection so the execution is not left unanswered.
      this.sendResult({
        type: 'command_result',
        execution_id,
        worker_id: this.workerId,
        command_id,
        success: false,
        error: 'Worker is at max concurrent tasks; command rejected',
        stdout: '',
        stderr: '',
        timestamp: new Date().toISOString(),
      });
      return;
    }

    this.activeTasks++;
    console.log(`[PULSE Worker] Executing command (active tasks: ${this.activeTasks})`);

    // Taken outside the try block so failures can report a duration too.
    const startTime = Date.now();

    try {
      const { stdout, stderr } = await execAsync(command, {
        timeout,
        maxBuffer: MAX_COMMAND_OUTPUT_BYTES,
      });

      const duration = Date.now() - startTime;

      this.sendResult({
        type: 'command_result',
        execution_id,
        worker_id: this.workerId,
        command_id,
        success: true,
        stdout,
        stderr,
        duration,
        timestamp: new Date().toISOString(),
      });

      console.log(`[PULSE Worker] Command completed in ${duration}ms`);
    } catch (error) {
      this.sendResult({
        type: 'command_result',
        execution_id,
        worker_id: this.workerId,
        command_id,
        success: false,
        error: error.message,
        // exec attaches any captured output to the error object.
        stdout: error.stdout || '',
        stderr: error.stderr || '',
        duration: Date.now() - startTime,
        timestamp: new Date().toISOString(),
      });

      console.error(`[PULSE Worker] Command failed:`, error.message);
    } finally {
      this.activeTasks--;
    }
  }

  /**
   * Acknowledges a workflow request. Workflow execution itself is not
   * implemented yet; only a placeholder `workflow_result` is sent.
   *
   * @param {object} message - Incoming `execute_workflow` message.
   * @param {{name: string}} message.workflow - Workflow description.
   * @param {*} message.execution_id - Echoed back in the result.
   * @returns {Promise<void>}
   */
  async executeWorkflow(message) {
    const { workflow, execution_id } = message;

    console.log(`[PULSE Worker] Executing workflow: ${workflow.name}`);

    // Workflow execution will be implemented in phase 2
    // For now, just acknowledge receipt
    this.sendResult({
      type: 'workflow_result',
      execution_id,
      worker_id: this.workerId,
      success: true,
      message: 'Workflow execution not yet implemented',
      timestamp: new Date().toISOString(),
    });
  }

  /**
   * Sends a result object over the WebSocket if it is open; otherwise the
   * result is dropped and a warning is logged.
   *
   * @param {object} result - JSON-serializable result message.
   */
  sendResult(result) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(result));
      return;
    }

    console.warn(
      `[PULSE Worker] WebSocket not open, dropping ${result.type} for execution ${result.execution_id}`
    );
  }

  /**
   * Replies to a server `ping` with a `pong` when the socket is open.
   */
  sendPong() {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify({ type: 'pong', worker_id: this.workerId }));
    }
  }

  /**
   * Stops the heartbeat, cancels any pending reconnect and closes the
   * WebSocket connection.
   *
   * @returns {Promise<void>}
   */
  async stop() {
    console.log('[PULSE Worker] Shutting down...');

    // Must be set before closing so the 'close' handler skips reconnecting.
    this.stopping = true;

    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }

    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    if (this.ws) {
      this.ws.close();
    }

    console.log('[PULSE Worker] Shutdown complete');
  }
}

// Start worker
const worker = new PulseWorker();

// Handle graceful shutdown
for (const signal of ['SIGTERM', 'SIGINT']) {
  process.on(signal, async () => {
    await worker.stop();
    process.exit(0);
  });
}

// Start the worker
worker.start().catch((error) => {
  console.error('[PULSE Worker] Fatal error:', error);
  process.exit(1);
});