Compare commits
14 Commits
main
...
adbbec2631
| Author | SHA1 | Date | |
|---|---|---|---|
| adbbec2631 | |||
| 8b8d2c6312 | |||
| 1f5c84f327 | |||
| e03f8d6287 | |||
| 2097b73404 | |||
| 6d15e4d240 | |||
| 7896b40d91 | |||
| e2dc371bfe | |||
| df0184facf | |||
| a8be111e04 | |||
| b3806545bd | |||
| 2767087e27 | |||
| a1cf8ac90b | |||
| 9e842624e1 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -30,3 +30,6 @@ Thumbs.db
|
||||
*.swp
|
||||
*.swo
|
||||
*~
|
||||
|
||||
# Claude
|
||||
Claude.md
|
||||
File diff suppressed because it is too large
Load Diff
187
server.js
187
server.js
@@ -46,6 +46,8 @@ async function initDatabase() {
|
||||
)
|
||||
`);
|
||||
|
||||
// Database schema is managed manually - migrations removed after direct database fixes
|
||||
|
||||
await connection.query(`
|
||||
CREATE TABLE IF NOT EXISTS workers (
|
||||
id VARCHAR(36) PRIMARY KEY,
|
||||
@@ -75,13 +77,12 @@ async function initDatabase() {
|
||||
await connection.query(`
|
||||
CREATE TABLE IF NOT EXISTS executions (
|
||||
id VARCHAR(36) PRIMARY KEY,
|
||||
workflow_id VARCHAR(36) NOT NULL,
|
||||
workflow_id VARCHAR(36) NULL,
|
||||
status VARCHAR(50) NOT NULL,
|
||||
started_by VARCHAR(255),
|
||||
started_at TIMESTAMP NULL,
|
||||
completed_at TIMESTAMP NULL,
|
||||
logs JSON,
|
||||
FOREIGN KEY (workflow_id) REFERENCES workflows(id) ON DELETE CASCADE,
|
||||
INDEX idx_workflow (workflow_id),
|
||||
INDEX idx_status (status),
|
||||
INDEX idx_started (started_at)
|
||||
@@ -99,9 +100,147 @@ async function initDatabase() {
|
||||
|
||||
// WebSocket connections
|
||||
const clients = new Set();
|
||||
const workers = new Map(); // Map worker_id -> WebSocket connection
|
||||
|
||||
wss.on('connection', (ws) => {
|
||||
clients.add(ws);
|
||||
ws.on('close', () => clients.delete(ws));
|
||||
|
||||
// Handle incoming messages from workers
|
||||
ws.on('message', async (data) => {
|
||||
try {
|
||||
const message = JSON.parse(data.toString());
|
||||
console.log('WebSocket message received:', message.type);
|
||||
|
||||
if (message.type === 'command_result') {
|
||||
// Handle command result from worker
|
||||
const { execution_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
|
||||
|
||||
// Add result to execution logs
|
||||
await addExecutionLog(execution_id, {
|
||||
step: 'command_execution',
|
||||
action: 'command_result',
|
||||
worker_id: worker_id,
|
||||
success: success,
|
||||
stdout: stdout,
|
||||
stderr: stderr,
|
||||
duration: duration,
|
||||
timestamp: timestamp || new Date().toISOString()
|
||||
});
|
||||
|
||||
// Update execution status to completed or failed
|
||||
const finalStatus = success ? 'completed' : 'failed';
|
||||
await updateExecutionStatus(execution_id, finalStatus);
|
||||
|
||||
// Broadcast result to all connected clients
|
||||
broadcast({
|
||||
type: 'command_result',
|
||||
execution_id: execution_id,
|
||||
worker_id: worker_id,
|
||||
success: success,
|
||||
stdout: stdout,
|
||||
stderr: stderr
|
||||
});
|
||||
|
||||
console.log(`Command result received for execution ${execution_id}: ${finalStatus}`);
|
||||
}
|
||||
|
||||
if (message.type === 'workflow_result') {
|
||||
// Handle workflow result from worker
|
||||
const { execution_id, worker_id, success, message: resultMessage, timestamp } = message;
|
||||
|
||||
// Add final result to logs
|
||||
await addExecutionLog(execution_id, {
|
||||
step: 'workflow_completion',
|
||||
action: 'workflow_result',
|
||||
worker_id: worker_id,
|
||||
success: success,
|
||||
message: resultMessage,
|
||||
timestamp: timestamp || new Date().toISOString()
|
||||
});
|
||||
|
||||
// Update execution status
|
||||
const finalStatus = success ? 'completed' : 'failed';
|
||||
await updateExecutionStatus(execution_id, finalStatus);
|
||||
|
||||
// Broadcast completion to all clients
|
||||
broadcast({
|
||||
type: 'workflow_result',
|
||||
execution_id: execution_id,
|
||||
status: finalStatus,
|
||||
success: success,
|
||||
message: resultMessage
|
||||
});
|
||||
|
||||
console.log(`Workflow result received for execution ${execution_id}: ${finalStatus}`);
|
||||
}
|
||||
|
||||
if (message.type === 'worker_connect') {
|
||||
// Handle worker connection
|
||||
const { worker_id, worker_name } = message;
|
||||
console.log(`Worker connected: ${worker_name} (${worker_id})`);
|
||||
|
||||
// Find the database worker ID by name
|
||||
const [dbWorkers] = await pool.query(
|
||||
'SELECT id FROM workers WHERE name = ?',
|
||||
[worker_name]
|
||||
);
|
||||
|
||||
if (dbWorkers.length > 0) {
|
||||
const dbWorkerId = dbWorkers[0].id;
|
||||
|
||||
// Store worker WebSocket connection using BOTH IDs
|
||||
workers.set(worker_id, ws); // Runtime ID
|
||||
workers.set(dbWorkerId, ws); // Database ID
|
||||
|
||||
// Store mapping for cleanup
|
||||
ws.workerId = worker_id;
|
||||
ws.dbWorkerId = dbWorkerId;
|
||||
|
||||
console.log(`Mapped worker: runtime_id=${worker_id}, db_id=${dbWorkerId}, name=${worker_name}`);
|
||||
|
||||
// Update worker status to online
|
||||
await pool.query(
|
||||
`UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`,
|
||||
[dbWorkerId]
|
||||
);
|
||||
|
||||
// Broadcast worker status update with database ID
|
||||
broadcast({
|
||||
type: 'worker_update',
|
||||
worker_id: dbWorkerId,
|
||||
status: 'online'
|
||||
});
|
||||
} else {
|
||||
console.log(`Worker ${worker_name} not found in database, will be created on heartbeat`);
|
||||
}
|
||||
}
|
||||
|
||||
if (message.type === 'pong') {
|
||||
// Handle worker pong response
|
||||
const { worker_id } = message;
|
||||
await pool.query(
|
||||
`UPDATE workers SET last_heartbeat=NOW() WHERE id=?`,
|
||||
[worker_id]
|
||||
);
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
console.error('WebSocket message error:', error);
|
||||
}
|
||||
});
|
||||
|
||||
ws.on('close', () => {
|
||||
clients.delete(ws);
|
||||
// Remove worker from workers map when disconnected (both runtime and db IDs)
|
||||
if (ws.workerId) {
|
||||
workers.delete(ws.workerId);
|
||||
console.log(`Worker ${ws.workerId} (runtime ID) disconnected`);
|
||||
}
|
||||
if (ws.dbWorkerId) {
|
||||
workers.delete(ws.dbWorkerId);
|
||||
console.log(`Worker ${ws.dbWorkerId} (database ID) disconnected`);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Broadcast to all connected clients
|
||||
@@ -279,13 +418,11 @@ app.get('/api/executions', authenticateSSO, async (req, res) => {
|
||||
}
|
||||
});
|
||||
|
||||
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
|
||||
app.delete('/api/executions/:id', authenticateSSO, async (req, res) => {
|
||||
try {
|
||||
const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]);
|
||||
if (rows.length === 0) {
|
||||
return res.status(404).json({ error: 'Not found' });
|
||||
}
|
||||
res.json(rows[0]);
|
||||
await pool.query('DELETE FROM executions WHERE id = ?', [req.params.id]);
|
||||
broadcast({ type: 'execution_deleted', execution_id: req.params.id });
|
||||
res.json({ success: true });
|
||||
} catch (error) {
|
||||
res.status(500).json({ error: error.message });
|
||||
}
|
||||
@@ -666,22 +803,38 @@ app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
|
||||
try {
|
||||
const { command } = req.body;
|
||||
const executionId = generateUUID();
|
||||
|
||||
// Send command via WebSocket
|
||||
const workerId = req.params.id;
|
||||
|
||||
// Create execution record in database
|
||||
await pool.query(
|
||||
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
|
||||
[executionId, null, 'running', req.user.username, JSON.stringify([{
|
||||
step: 'quick_command',
|
||||
action: 'command_sent',
|
||||
worker_id: workerId,
|
||||
command: command,
|
||||
timestamp: new Date().toISOString()
|
||||
}])]
|
||||
);
|
||||
|
||||
// Send command via WebSocket to specific worker
|
||||
const commandMessage = {
|
||||
type: 'execute_command',
|
||||
execution_id: executionId,
|
||||
command: command,
|
||||
worker_id: req.params.id,
|
||||
worker_id: workerId,
|
||||
timeout: 60000
|
||||
};
|
||||
|
||||
clients.forEach(client => {
|
||||
if (client.readyState === WebSocket.OPEN) {
|
||||
client.send(JSON.stringify(commandMessage));
|
||||
}
|
||||
});
|
||||
const workerWs = workers.get(workerId);
|
||||
if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
|
||||
return res.status(400).json({ error: 'Worker not connected' });
|
||||
}
|
||||
|
||||
workerWs.send(JSON.stringify(commandMessage));
|
||||
console.log(`Command sent to worker ${workerId}: ${command}`);
|
||||
|
||||
broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
|
||||
res.json({ success: true, execution_id: executionId });
|
||||
} catch (error) {
|
||||
res.status(500).json({ error: error.message });
|
||||
|
||||
Reference in New Issue
Block a user