Files
pulse/worker/worker.js

258 lines
6.8 KiB
JavaScript
Raw Permalink Normal View History

const axios = require('axios');
const WebSocket = require('ws');
const { exec } = require('child_process');
const { promisify } = require('util');
const os = require('os');
const crypto = require('crypto');
require('dotenv').config();
const execAsync = promisify(exec);
class PulseWorker {
constructor() {
this.workerId = crypto.randomUUID();
this.workerName = process.env.WORKER_NAME || os.hostname();
this.serverUrl = process.env.PULSE_SERVER || 'http://localhost:8080';
this.wsUrl = process.env.PULSE_WS || 'ws://localhost:8080';
this.apiKey = process.env.WORKER_API_KEY;
this.heartbeatInterval = parseInt(process.env.HEARTBEAT_INTERVAL || '30') * 1000;
this.maxConcurrentTasks = parseInt(process.env.MAX_CONCURRENT_TASKS || '5');
this.activeTasks = 0;
this.ws = null;
this.heartbeatTimer = null;
}
async start() {
console.log(`[PULSE Worker] Starting worker: ${this.workerName}`);
console.log(`[PULSE Worker] Worker ID: ${this.workerId}`);
console.log(`[PULSE Worker] Server: ${this.serverUrl}`);
// Send initial heartbeat
await this.sendHeartbeat();
// Start heartbeat timer
this.startHeartbeat();
// Connect to WebSocket for real-time commands
this.connectWebSocket();
console.log(`[PULSE Worker] Worker started successfully`);
}
startHeartbeat() {
this.heartbeatTimer = setInterval(async () => {
try {
await this.sendHeartbeat();
} catch (error) {
console.error('[PULSE Worker] Heartbeat failed:', error.message);
}
}, this.heartbeatInterval);
}
async sendHeartbeat() {
const metadata = {
hostname: os.hostname(),
platform: os.platform(),
arch: os.arch(),
cpus: os.cpus().length,
totalMem: os.totalmem(),
freeMem: os.freemem(),
uptime: os.uptime(),
loadavg: os.loadavg(),
activeTasks: this.activeTasks,
maxConcurrentTasks: this.maxConcurrentTasks
};
try {
const response = await axios.post(
`${this.serverUrl}/api/workers/heartbeat`,
{
worker_id: this.workerId,
name: this.workerName,
metadata: metadata
},
{
headers: {
'X-API-Key': this.apiKey,
'Content-Type': 'application/json'
}
}
);
console.log(`[PULSE Worker] Heartbeat sent - Status: online`);
return response.data;
} catch (error) {
console.error('[PULSE Worker] Heartbeat error:', error.message);
throw error;
}
}
connectWebSocket() {
console.log(`[PULSE Worker] Connecting to WebSocket...`);
this.ws = new WebSocket(this.wsUrl);
this.ws.on('open', () => {
console.log('[PULSE Worker] WebSocket connected');
// Identify this worker
this.ws.send(JSON.stringify({
type: 'worker_connect',
worker_id: this.workerId,
worker_name: this.workerName
}));
});
this.ws.on('message', async (data) => {
try {
const message = JSON.parse(data.toString());
await this.handleMessage(message);
} catch (error) {
console.error('[PULSE Worker] Message handling error:', error);
}
});
this.ws.on('close', () => {
console.log('[PULSE Worker] WebSocket disconnected, reconnecting...');
setTimeout(() => this.connectWebSocket(), 5000);
});
this.ws.on('error', (error) => {
console.error('[PULSE Worker] WebSocket error:', error.message);
});
}
async handleMessage(message) {
console.log(`[PULSE Worker] Received message:`, message.type);
switch (message.type) {
case 'execute_command':
await this.executeCommand(message);
break;
case 'execute_workflow':
await this.executeWorkflow(message);
break;
case 'ping':
this.sendPong();
break;
default:
console.log(`[PULSE Worker] Unknown message type: ${message.type}`);
}
}
async executeCommand(message) {
const { command, execution_id, command_id, timeout = 300000 } = message;
if (this.activeTasks >= this.maxConcurrentTasks) {
console.log(`[PULSE Worker] Max concurrent tasks reached, rejecting command`);
return;
}
this.activeTasks++;
console.log(`[PULSE Worker] Executing command (active tasks: ${this.activeTasks})`);
try {
const startTime = Date.now();
const { stdout, stderr } = await execAsync(command, {
timeout: timeout,
maxBuffer: 10 * 1024 * 1024 // 10MB buffer
});
const duration = Date.now() - startTime;
const result = {
type: 'command_result',
execution_id,
worker_id: this.workerId,
command_id,
success: true,
stdout: stdout,
stderr: stderr,
duration: duration,
timestamp: new Date().toISOString()
};
this.sendResult(result);
console.log(`[PULSE Worker] Command completed in ${duration}ms`);
} catch (error) {
const result = {
type: 'command_result',
execution_id,
worker_id: this.workerId,
command_id,
success: false,
error: error.message,
stdout: error.stdout || '',
stderr: error.stderr || '',
timestamp: new Date().toISOString()
};
this.sendResult(result);
console.error(`[PULSE Worker] Command failed:`, error.message);
} finally {
this.activeTasks--;
}
}
async executeWorkflow(message) {
const { workflow, execution_id } = message;
console.log(`[PULSE Worker] Executing workflow: ${workflow.name}`);
// Workflow execution will be implemented in phase 2
// For now, just acknowledge receipt
this.sendResult({
type: 'workflow_result',
execution_id,
worker_id: this.workerId,
success: true,
message: 'Workflow execution not yet implemented',
timestamp: new Date().toISOString()
});
}
sendResult(result) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(result));
}
}
sendPong() {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: 'pong', worker_id: this.workerId }));
}
}
async stop() {
console.log('[PULSE Worker] Shutting down...');
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
}
if (this.ws) {
this.ws.close();
}
console.log('[PULSE Worker] Shutdown complete');
}
}
// Start worker
const worker = new PulseWorker();
// Handle graceful shutdown
process.on('SIGTERM', async () => {
await worker.stop();
process.exit(0);
});
process.on('SIGINT', async () => {
await worker.stop();
process.exit(0);
});
// Start the worker
worker.start().catch((error) => {
console.error('[PULSE Worker] Fatal error:', error);
process.exit(1);
});