Compare commits
14 Commits
main
...
adbbec2631
| Author | SHA1 | Date | |
|---|---|---|---|
| adbbec2631 | |||
| 8b8d2c6312 | |||
| 1f5c84f327 | |||
| e03f8d6287 | |||
| 2097b73404 | |||
| 6d15e4d240 | |||
| 7896b40d91 | |||
| e2dc371bfe | |||
| df0184facf | |||
| a8be111e04 | |||
| b3806545bd | |||
| 2767087e27 | |||
| a1cf8ac90b | |||
| 9e842624e1 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -30,3 +30,6 @@ Thumbs.db
|
|||||||
*.swp
|
*.swp
|
||||||
*.swo
|
*.swo
|
||||||
*~
|
*~
|
||||||
|
|
||||||
|
# Claude
|
||||||
|
Claude.md
|
||||||
File diff suppressed because it is too large
Load Diff
185
server.js
185
server.js
@@ -46,6 +46,8 @@ async function initDatabase() {
|
|||||||
)
|
)
|
||||||
`);
|
`);
|
||||||
|
|
||||||
|
// Database schema is managed manually - migrations removed after direct database fixes
|
||||||
|
|
||||||
await connection.query(`
|
await connection.query(`
|
||||||
CREATE TABLE IF NOT EXISTS workers (
|
CREATE TABLE IF NOT EXISTS workers (
|
||||||
id VARCHAR(36) PRIMARY KEY,
|
id VARCHAR(36) PRIMARY KEY,
|
||||||
@@ -75,13 +77,12 @@ async function initDatabase() {
|
|||||||
await connection.query(`
|
await connection.query(`
|
||||||
CREATE TABLE IF NOT EXISTS executions (
|
CREATE TABLE IF NOT EXISTS executions (
|
||||||
id VARCHAR(36) PRIMARY KEY,
|
id VARCHAR(36) PRIMARY KEY,
|
||||||
workflow_id VARCHAR(36) NOT NULL,
|
workflow_id VARCHAR(36) NULL,
|
||||||
status VARCHAR(50) NOT NULL,
|
status VARCHAR(50) NOT NULL,
|
||||||
started_by VARCHAR(255),
|
started_by VARCHAR(255),
|
||||||
started_at TIMESTAMP NULL,
|
started_at TIMESTAMP NULL,
|
||||||
completed_at TIMESTAMP NULL,
|
completed_at TIMESTAMP NULL,
|
||||||
logs JSON,
|
logs JSON,
|
||||||
FOREIGN KEY (workflow_id) REFERENCES workflows(id) ON DELETE CASCADE,
|
|
||||||
INDEX idx_workflow (workflow_id),
|
INDEX idx_workflow (workflow_id),
|
||||||
INDEX idx_status (status),
|
INDEX idx_status (status),
|
||||||
INDEX idx_started (started_at)
|
INDEX idx_started (started_at)
|
||||||
@@ -99,9 +100,147 @@ async function initDatabase() {
|
|||||||
|
|
||||||
// WebSocket connections
|
// WebSocket connections
|
||||||
const clients = new Set();
|
const clients = new Set();
|
||||||
|
const workers = new Map(); // Map worker_id -> WebSocket connection
|
||||||
|
|
||||||
wss.on('connection', (ws) => {
|
wss.on('connection', (ws) => {
|
||||||
clients.add(ws);
|
clients.add(ws);
|
||||||
ws.on('close', () => clients.delete(ws));
|
|
||||||
|
// Handle incoming messages from workers
|
||||||
|
ws.on('message', async (data) => {
|
||||||
|
try {
|
||||||
|
const message = JSON.parse(data.toString());
|
||||||
|
console.log('WebSocket message received:', message.type);
|
||||||
|
|
||||||
|
if (message.type === 'command_result') {
|
||||||
|
// Handle command result from worker
|
||||||
|
const { execution_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
|
||||||
|
|
||||||
|
// Add result to execution logs
|
||||||
|
await addExecutionLog(execution_id, {
|
||||||
|
step: 'command_execution',
|
||||||
|
action: 'command_result',
|
||||||
|
worker_id: worker_id,
|
||||||
|
success: success,
|
||||||
|
stdout: stdout,
|
||||||
|
stderr: stderr,
|
||||||
|
duration: duration,
|
||||||
|
timestamp: timestamp || new Date().toISOString()
|
||||||
|
});
|
||||||
|
|
||||||
|
// Update execution status to completed or failed
|
||||||
|
const finalStatus = success ? 'completed' : 'failed';
|
||||||
|
await updateExecutionStatus(execution_id, finalStatus);
|
||||||
|
|
||||||
|
// Broadcast result to all connected clients
|
||||||
|
broadcast({
|
||||||
|
type: 'command_result',
|
||||||
|
execution_id: execution_id,
|
||||||
|
worker_id: worker_id,
|
||||||
|
success: success,
|
||||||
|
stdout: stdout,
|
||||||
|
stderr: stderr
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log(`Command result received for execution ${execution_id}: ${finalStatus}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (message.type === 'workflow_result') {
|
||||||
|
// Handle workflow result from worker
|
||||||
|
const { execution_id, worker_id, success, message: resultMessage, timestamp } = message;
|
||||||
|
|
||||||
|
// Add final result to logs
|
||||||
|
await addExecutionLog(execution_id, {
|
||||||
|
step: 'workflow_completion',
|
||||||
|
action: 'workflow_result',
|
||||||
|
worker_id: worker_id,
|
||||||
|
success: success,
|
||||||
|
message: resultMessage,
|
||||||
|
timestamp: timestamp || new Date().toISOString()
|
||||||
|
});
|
||||||
|
|
||||||
|
// Update execution status
|
||||||
|
const finalStatus = success ? 'completed' : 'failed';
|
||||||
|
await updateExecutionStatus(execution_id, finalStatus);
|
||||||
|
|
||||||
|
// Broadcast completion to all clients
|
||||||
|
broadcast({
|
||||||
|
type: 'workflow_result',
|
||||||
|
execution_id: execution_id,
|
||||||
|
status: finalStatus,
|
||||||
|
success: success,
|
||||||
|
message: resultMessage
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log(`Workflow result received for execution ${execution_id}: ${finalStatus}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (message.type === 'worker_connect') {
|
||||||
|
// Handle worker connection
|
||||||
|
const { worker_id, worker_name } = message;
|
||||||
|
console.log(`Worker connected: ${worker_name} (${worker_id})`);
|
||||||
|
|
||||||
|
// Find the database worker ID by name
|
||||||
|
const [dbWorkers] = await pool.query(
|
||||||
|
'SELECT id FROM workers WHERE name = ?',
|
||||||
|
[worker_name]
|
||||||
|
);
|
||||||
|
|
||||||
|
if (dbWorkers.length > 0) {
|
||||||
|
const dbWorkerId = dbWorkers[0].id;
|
||||||
|
|
||||||
|
// Store worker WebSocket connection using BOTH IDs
|
||||||
|
workers.set(worker_id, ws); // Runtime ID
|
||||||
|
workers.set(dbWorkerId, ws); // Database ID
|
||||||
|
|
||||||
|
// Store mapping for cleanup
|
||||||
|
ws.workerId = worker_id;
|
||||||
|
ws.dbWorkerId = dbWorkerId;
|
||||||
|
|
||||||
|
console.log(`Mapped worker: runtime_id=${worker_id}, db_id=${dbWorkerId}, name=${worker_name}`);
|
||||||
|
|
||||||
|
// Update worker status to online
|
||||||
|
await pool.query(
|
||||||
|
`UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`,
|
||||||
|
[dbWorkerId]
|
||||||
|
);
|
||||||
|
|
||||||
|
// Broadcast worker status update with database ID
|
||||||
|
broadcast({
|
||||||
|
type: 'worker_update',
|
||||||
|
worker_id: dbWorkerId,
|
||||||
|
status: 'online'
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
console.log(`Worker ${worker_name} not found in database, will be created on heartbeat`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (message.type === 'pong') {
|
||||||
|
// Handle worker pong response
|
||||||
|
const { worker_id } = message;
|
||||||
|
await pool.query(
|
||||||
|
`UPDATE workers SET last_heartbeat=NOW() WHERE id=?`,
|
||||||
|
[worker_id]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
console.error('WebSocket message error:', error);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
ws.on('close', () => {
|
||||||
|
clients.delete(ws);
|
||||||
|
// Remove worker from workers map when disconnected (both runtime and db IDs)
|
||||||
|
if (ws.workerId) {
|
||||||
|
workers.delete(ws.workerId);
|
||||||
|
console.log(`Worker ${ws.workerId} (runtime ID) disconnected`);
|
||||||
|
}
|
||||||
|
if (ws.dbWorkerId) {
|
||||||
|
workers.delete(ws.dbWorkerId);
|
||||||
|
console.log(`Worker ${ws.dbWorkerId} (database ID) disconnected`);
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
// Broadcast to all connected clients
|
// Broadcast to all connected clients
|
||||||
@@ -279,13 +418,11 @@ app.get('/api/executions', authenticateSSO, async (req, res) => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
|
app.delete('/api/executions/:id', authenticateSSO, async (req, res) => {
|
||||||
try {
|
try {
|
||||||
const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]);
|
await pool.query('DELETE FROM executions WHERE id = ?', [req.params.id]);
|
||||||
if (rows.length === 0) {
|
broadcast({ type: 'execution_deleted', execution_id: req.params.id });
|
||||||
return res.status(404).json({ error: 'Not found' });
|
res.json({ success: true });
|
||||||
}
|
|
||||||
res.json(rows[0]);
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
res.status(500).json({ error: error.message });
|
res.status(500).json({ error: error.message });
|
||||||
}
|
}
|
||||||
@@ -666,22 +803,38 @@ app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
|
|||||||
try {
|
try {
|
||||||
const { command } = req.body;
|
const { command } = req.body;
|
||||||
const executionId = generateUUID();
|
const executionId = generateUUID();
|
||||||
|
const workerId = req.params.id;
|
||||||
|
|
||||||
// Send command via WebSocket
|
// Create execution record in database
|
||||||
|
await pool.query(
|
||||||
|
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
|
||||||
|
[executionId, null, 'running', req.user.username, JSON.stringify([{
|
||||||
|
step: 'quick_command',
|
||||||
|
action: 'command_sent',
|
||||||
|
worker_id: workerId,
|
||||||
|
command: command,
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
}])]
|
||||||
|
);
|
||||||
|
|
||||||
|
// Send command via WebSocket to specific worker
|
||||||
const commandMessage = {
|
const commandMessage = {
|
||||||
type: 'execute_command',
|
type: 'execute_command',
|
||||||
execution_id: executionId,
|
execution_id: executionId,
|
||||||
command: command,
|
command: command,
|
||||||
worker_id: req.params.id,
|
worker_id: workerId,
|
||||||
timeout: 60000
|
timeout: 60000
|
||||||
};
|
};
|
||||||
|
|
||||||
clients.forEach(client => {
|
const workerWs = workers.get(workerId);
|
||||||
if (client.readyState === WebSocket.OPEN) {
|
if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
|
||||||
client.send(JSON.stringify(commandMessage));
|
return res.status(400).json({ error: 'Worker not connected' });
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
|
workerWs.send(JSON.stringify(commandMessage));
|
||||||
|
console.log(`Command sent to worker ${workerId}: ${command}`);
|
||||||
|
|
||||||
|
broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
|
||||||
res.json({ success: true, execution_id: executionId });
|
res.json({ success: true, execution_id: executionId });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
res.status(500).json({ error: error.message });
|
res.status(500).json({ error: error.message });
|
||||||
|
|||||||
Reference in New Issue
Block a user