258 lines
6.8 KiB
JavaScript
258 lines
6.8 KiB
JavaScript
|
|
const axios = require('axios');
|
||
|
|
const WebSocket = require('ws');
|
||
|
|
const { exec } = require('child_process');
|
||
|
|
const { promisify } = require('util');
|
||
|
|
const os = require('os');
|
||
|
|
const crypto = require('crypto');
|
||
|
|
require('dotenv').config();
|
||
|
|
|
||
|
|
const execAsync = promisify(exec);
|
||
|
|
|
||
|
|
/**
 * PULSE worker agent.
 *
 * Reports liveness to the PULSE server with periodic HTTP heartbeats and
 * keeps a WebSocket open to receive commands ("execute_command",
 * "execute_workflow", "ping"). Command results are sent back over the same
 * WebSocket.
 *
 * Configuration is read from environment variables in the constructor:
 * WORKER_NAME, PULSE_SERVER, PULSE_WS, WORKER_API_KEY, HEARTBEAT_INTERVAL
 * (seconds) and MAX_CONCURRENT_TASKS.
 */
class PulseWorker {
  /**
   * Parse an integer setting, falling back when the value is missing,
   * not a number, or below the allowed minimum.
   *
   * @param {string|undefined} raw - Raw value (typically from process.env).
   * @param {number} fallback - Value used when `raw` is unusable.
   * @param {number} [min=0] - Smallest accepted value.
   * @returns {number} The parsed integer or `fallback`.
   */
  static parseIntSetting(raw, fallback, min = 0) {
    const parsed = Number.parseInt(raw ?? '', 10);
    return Number.isInteger(parsed) && parsed >= min ? parsed : fallback;
  }

  constructor() {
    this.workerId = crypto.randomUUID();
    this.workerName = process.env.WORKER_NAME || os.hostname();
    this.serverUrl = process.env.PULSE_SERVER || 'http://localhost:8080';
    this.wsUrl = process.env.PULSE_WS || 'ws://localhost:8080';
    this.apiKey = process.env.WORKER_API_KEY;
    // A NaN or zero delay would make setInterval fire continuously, so an
    // unusable HEARTBEAT_INTERVAL falls back to the 30 second default.
    this.heartbeatInterval =
      PulseWorker.parseIntSetting(process.env.HEARTBEAT_INTERVAL, 30, 1) * 1000;
    this.maxConcurrentTasks =
      PulseWorker.parseIntSetting(process.env.MAX_CONCURRENT_TASKS, 5, 0);
    this.activeTasks = 0;
    this.ws = null;
    this.heartbeatTimer = null;
    // Handle of the pending reconnect timeout, if any.
    this.reconnectTimer = null;
    // Set by stop() so that closing the socket does not trigger a reconnect.
    this.stopping = false;
  }

  /**
   * Start the worker: send one heartbeat, start the periodic heartbeat
   * timer and open the WebSocket connection.
   *
   * @returns {Promise<void>}
   * @throws Rejects with the heartbeat error if the initial heartbeat fails.
   */
  async start() {
    this.stopping = false;

    console.log(`[PULSE Worker] Starting worker: ${this.workerName}`);
    console.log(`[PULSE Worker] Worker ID: ${this.workerId}`);
    console.log(`[PULSE Worker] Server: ${this.serverUrl}`);

    // Send initial heartbeat
    await this.sendHeartbeat();

    // Start heartbeat timer
    this.startHeartbeat();

    // Connect to WebSocket for real-time commands
    this.connectWebSocket();

    console.log(`[PULSE Worker] Worker started successfully`);
  }

  /**
   * Start (or restart) the periodic heartbeat. Failures of individual
   * heartbeats are logged and do not stop the timer.
   */
  startHeartbeat() {
    // Avoid leaking a previous interval if this is called more than once.
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
    }

    this.heartbeatTimer = setInterval(async () => {
      try {
        await this.sendHeartbeat();
      } catch (error) {
        console.error('[PULSE Worker] Heartbeat failed:', error.message);
      }
    }, this.heartbeatInterval);
  }

  /**
   * POST a heartbeat with host metadata to `/api/workers/heartbeat`.
   *
   * @returns {Promise<*>} The response body returned by the server.
   * @throws Rethrows the request error after logging it.
   */
  async sendHeartbeat() {
    const metadata = {
      hostname: os.hostname(),
      platform: os.platform(),
      arch: os.arch(),
      cpus: os.cpus().length,
      totalMem: os.totalmem(),
      freeMem: os.freemem(),
      uptime: os.uptime(),
      loadavg: os.loadavg(),
      activeTasks: this.activeTasks,
      maxConcurrentTasks: this.maxConcurrentTasks
    };

    try {
      const response = await axios.post(
        `${this.serverUrl}/api/workers/heartbeat`,
        {
          worker_id: this.workerId,
          name: this.workerName,
          metadata: metadata
        },
        {
          headers: {
            'X-API-Key': this.apiKey,
            'Content-Type': 'application/json'
          },
          // Bound the request so a hung server cannot pile up one pending
          // request per heartbeat interval.
          timeout: this.heartbeatInterval
        }
      );

      console.log(`[PULSE Worker] Heartbeat sent - Status: online`);
      return response.data;
    } catch (error) {
      console.error('[PULSE Worker] Heartbeat error:', error.message);
      throw error;
    }
  }

  /**
   * Open the WebSocket connection, identify this worker on open, dispatch
   * incoming messages, and reconnect after 5 seconds when the connection
   * closes. Does nothing once stop() has been called.
   */
  connectWebSocket() {
    if (this.stopping) {
      return;
    }

    console.log(`[PULSE Worker] Connecting to WebSocket...`);

    const socket = new WebSocket(this.wsUrl);
    this.ws = socket;

    socket.on('open', () => {
      console.log('[PULSE Worker] WebSocket connected');
      // Identify this worker
      socket.send(JSON.stringify({
        type: 'worker_connect',
        worker_id: this.workerId,
        worker_name: this.workerName
      }));
    });

    socket.on('message', async (data) => {
      try {
        const message = JSON.parse(data.toString());
        await this.handleMessage(message);
      } catch (error) {
        console.error('[PULSE Worker] Message handling error:', error);
      }
    });

    socket.on('close', () => {
      // A close caused by stop() must not bring the connection back.
      if (this.stopping) {
        console.log('[PULSE Worker] WebSocket disconnected');
        return;
      }

      console.log('[PULSE Worker] WebSocket disconnected, reconnecting...');
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null;
        this.connectWebSocket();
      }, 5000);
    });

    socket.on('error', (error) => {
      console.error('[PULSE Worker] WebSocket error:', error.message);
    });
  }

  /**
   * Dispatch one decoded WebSocket message by its `type`.
   *
   * @param {{type: string}} message - Decoded message from the server.
   * @returns {Promise<void>}
   */
  async handleMessage(message) {
    console.log(`[PULSE Worker] Received message:`, message.type);

    switch (message.type) {
      case 'execute_command':
        await this.executeCommand(message);
        break;
      case 'execute_workflow':
        await this.executeWorkflow(message);
        break;
      case 'ping':
        this.sendPong();
        break;
      default:
        console.log(`[PULSE Worker] Unknown message type: ${message.type}`);
    }
  }

  /**
   * Run a shell command and report a `command_result` message.
   *
   * When the worker is already at `maxConcurrentTasks`, the command is not
   * run and a failed `command_result` is reported so the sender is not left
   * waiting for a result that will never arrive.
   *
   * NOTE(review): `command` is passed to a shell exactly as received over
   * the WebSocket — confirm the server side authenticates and authorizes
   * whoever can issue commands.
   *
   * @param {object} message
   * @param {string} message.command - Shell command line to execute.
   * @param {*} message.execution_id - Echoed back in the result.
   * @param {*} message.command_id - Echoed back in the result.
   * @param {number} [message.timeout=300000] - Timeout in milliseconds.
   * @returns {Promise<void>}
   */
  async executeCommand(message) {
    const { command, execution_id, command_id, timeout = 300000 } = message;

    if (this.activeTasks >= this.maxConcurrentTasks) {
      console.log(`[PULSE Worker] Max concurrent tasks reached, rejecting command`);
      this.sendResult({
        type: 'command_result',
        execution_id,
        worker_id: this.workerId,
        command_id,
        success: false,
        error: 'Max concurrent tasks reached, command rejected',
        stdout: '',
        stderr: '',
        timestamp: new Date().toISOString()
      });
      return;
    }

    this.activeTasks++;
    console.log(`[PULSE Worker] Executing command (active tasks: ${this.activeTasks})`);

    // Taken outside the try block so the failure path can report a duration too.
    const startTime = Date.now();

    try {
      const { stdout, stderr } = await execAsync(command, {
        timeout: timeout,
        maxBuffer: 10 * 1024 * 1024 // 10MB buffer
      });
      const duration = Date.now() - startTime;

      this.sendResult({
        type: 'command_result',
        execution_id,
        worker_id: this.workerId,
        command_id,
        success: true,
        stdout: stdout,
        stderr: stderr,
        duration: duration,
        timestamp: new Date().toISOString()
      });
      console.log(`[PULSE Worker] Command completed in ${duration}ms`);
    } catch (error) {
      this.sendResult({
        type: 'command_result',
        execution_id,
        worker_id: this.workerId,
        command_id,
        success: false,
        error: error.message,
        stdout: error.stdout || '',
        stderr: error.stderr || '',
        duration: Date.now() - startTime,
        timestamp: new Date().toISOString()
      });
      console.error(`[PULSE Worker] Command failed:`, error.message);
    } finally {
      this.activeTasks--;
    }
  }

  /**
   * Acknowledge a workflow request. Workflows are not executed yet; a
   * `workflow_result` placeholder is sent back.
   *
   * @param {object} message
   * @param {{name: string}} [message.workflow] - Workflow description.
   * @param {*} message.execution_id - Echoed back in the result.
   * @returns {Promise<void>}
   */
  async executeWorkflow(message) {
    const { workflow, execution_id } = message;

    // Optional chaining: a message without `workflow` is still acknowledged
    // instead of throwing before the result is sent.
    console.log(`[PULSE Worker] Executing workflow: ${workflow?.name}`);

    // Workflow execution will be implemented in phase 2
    // For now, just acknowledge receipt
    this.sendResult({
      type: 'workflow_result',
      execution_id,
      worker_id: this.workerId,
      success: true,
      message: 'Workflow execution not yet implemented',
      timestamp: new Date().toISOString()
    });
  }

  /**
   * Send a result object over the WebSocket as JSON. If the socket is not
   * open the result is dropped and a warning is logged.
   *
   * @param {object} result - JSON-serializable result message.
   */
  sendResult(result) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(result));
      return;
    }

    console.warn(
      `[PULSE Worker] WebSocket not open, dropping ${result && result.type} message`
    );
  }

  /**
   * Reply to a server "ping" with a "pong" when the socket is open.
   */
  sendPong() {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify({ type: 'pong', worker_id: this.workerId }));
    }
  }

  /**
   * Stop the worker: cancel the heartbeat and any pending reconnect, then
   * close the WebSocket without reconnecting.
   *
   * @returns {Promise<void>}
   */
  async stop() {
    console.log('[PULSE Worker] Shutting down...');

    // Must be set before closing so the 'close' handler does not reconnect.
    this.stopping = true;

    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }

    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    if (this.ws) {
      this.ws.close();
    }

    console.log('[PULSE Worker] Shutdown complete');
  }
}
|
||
|
|
|
||
|
|
// Create the single worker instance for this process
const worker = new PulseWorker();

// Graceful shutdown: both termination signals stop the worker, then exit cleanly
for (const signal of ['SIGTERM', 'SIGINT']) {
  process.on(signal, async () => {
    await worker.stop();
    process.exit(0);
  });
}

// Launch the worker; a startup failure is logged and ends the process with code 1
worker.start().catch((error) => {
  console.error('[PULSE Worker] Fatal error:', error);
  process.exit(1);
});
|