Phase 10: Command Scheduler
Added comprehensive command scheduling system: Backend: - New scheduled_commands database table - Scheduler processor runs every minute - Support for three schedule types: interval, hourly, daily - calculateNextRun() function for intelligent scheduling - API endpoints: GET, POST, PUT (toggle), DELETE - Executions automatically created and tracked - Enable/disable schedules without deleting Frontend: - New Scheduler tab in navigation - Create Schedule modal with worker selection - Dynamic schedule input based on type - Schedule list showing status, next/last run times - Enable/Disable toggle for each schedule - Delete schedule functionality - Terminal-themed scheduler UI - Integration with existing worker and execution systems Schedule Types: - Interval: Every X minutes (e.g., 30 for every 30 min) - Hourly: Every X hours (e.g., 2 for every 2 hours) - Daily: At specific time (e.g., 03:00 for 3 AM daily) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
171
server.js
171
server.js
@@ -89,6 +89,24 @@ async function initDatabase() {
|
||||
)
|
||||
`);
|
||||
|
||||
await connection.query(`
|
||||
CREATE TABLE IF NOT EXISTS scheduled_commands (
|
||||
id VARCHAR(36) PRIMARY KEY,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
command TEXT NOT NULL,
|
||||
worker_ids JSON NOT NULL,
|
||||
schedule_type VARCHAR(50) NOT NULL,
|
||||
schedule_value VARCHAR(255) NOT NULL,
|
||||
enabled BOOLEAN DEFAULT TRUE,
|
||||
created_by VARCHAR(255),
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
last_run TIMESTAMP NULL,
|
||||
next_run TIMESTAMP NULL,
|
||||
INDEX idx_enabled (enabled),
|
||||
INDEX idx_next_run (next_run)
|
||||
)
|
||||
`);
|
||||
|
||||
console.log('Database tables initialized successfully');
|
||||
} catch (error) {
|
||||
console.error('Database initialization error:', error);
|
||||
@@ -119,6 +137,95 @@ setInterval(cleanupOldExecutions, 24 * 60 * 60 * 1000);
|
||||
// Run cleanup on startup
|
||||
cleanupOldExecutions();
|
||||
|
||||
// Scheduled Commands Processor
|
||||
async function processScheduledCommands() {
|
||||
try {
|
||||
const [schedules] = await pool.query(
|
||||
`SELECT * FROM scheduled_commands
|
||||
WHERE enabled = TRUE
|
||||
AND (next_run IS NULL OR next_run <= NOW())`
|
||||
);
|
||||
|
||||
for (const schedule of schedules) {
|
||||
console.log(`[Scheduler] Running scheduled command: ${schedule.name}`);
|
||||
|
||||
const workerIds = JSON.parse(schedule.worker_ids);
|
||||
|
||||
// Execute command on each worker
|
||||
for (const workerId of workerIds) {
|
||||
const workerWs = workers.get(workerId);
|
||||
if (workerWs && workerWs.readyState === WebSocket.OPEN) {
|
||||
const executionId = generateUUID();
|
||||
|
||||
// Create execution record
|
||||
await pool.query(
|
||||
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
|
||||
[executionId, null, 'running', `scheduler:${schedule.name}`, JSON.stringify([{
|
||||
step: 'scheduled_command',
|
||||
action: 'command_sent',
|
||||
worker_id: workerId,
|
||||
command: schedule.command,
|
||||
timestamp: new Date().toISOString()
|
||||
}])]
|
||||
);
|
||||
|
||||
// Send command to worker
|
||||
workerWs.send(JSON.stringify({
|
||||
type: 'execute_command',
|
||||
execution_id: executionId,
|
||||
command: schedule.command,
|
||||
worker_id: workerId,
|
||||
timeout: 300000 // 5 minute timeout for scheduled commands
|
||||
}));
|
||||
|
||||
broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
|
||||
}
|
||||
}
|
||||
|
||||
// Update last_run and calculate next_run
|
||||
const nextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value);
|
||||
await pool.query(
|
||||
'UPDATE scheduled_commands SET last_run = NOW(), next_run = ? WHERE id = ?',
|
||||
[nextRun, schedule.id]
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[Scheduler] Error processing scheduled commands:', error);
|
||||
}
|
||||
}
|
||||
|
||||
function calculateNextRun(scheduleType, scheduleValue) {
|
||||
const now = new Date();
|
||||
|
||||
if (scheduleType === 'interval') {
|
||||
// Interval in minutes
|
||||
const minutes = parseInt(scheduleValue);
|
||||
return new Date(now.getTime() + minutes * 60000);
|
||||
} else if (scheduleType === 'daily') {
|
||||
// Daily at HH:MM
|
||||
const [hours, minutes] = scheduleValue.split(':').map(Number);
|
||||
const next = new Date(now);
|
||||
next.setHours(hours, minutes, 0, 0);
|
||||
|
||||
// If time has passed today, schedule for tomorrow
|
||||
if (next <= now) {
|
||||
next.setDate(next.getDate() + 1);
|
||||
}
|
||||
return next;
|
||||
} else if (scheduleType === 'hourly') {
|
||||
// Every N hours
|
||||
const hours = parseInt(scheduleValue);
|
||||
return new Date(now.getTime() + hours * 3600000);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
// Run scheduler every minute
|
||||
setInterval(processScheduledCommands, 60 * 1000);
|
||||
// Initial run on startup
|
||||
setTimeout(processScheduledCommands, 5000);
|
||||
|
||||
// WebSocket connections
|
||||
const clients = new Set();
|
||||
const workers = new Map(); // Map worker_id -> WebSocket connection
|
||||
@@ -464,6 +571,70 @@ app.delete('/api/executions/:id', authenticateSSO, async (req, res) => {
|
||||
}
|
||||
});
|
||||
|
||||
// Scheduled Commands API
|
||||
app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => {
|
||||
try {
|
||||
const [schedules] = await pool.query(
|
||||
'SELECT * FROM scheduled_commands ORDER BY created_at DESC'
|
||||
);
|
||||
res.json(schedules);
|
||||
} catch (error) {
|
||||
res.status(500).json({ error: error.message });
|
||||
}
|
||||
});
|
||||
|
||||
app.post('/api/scheduled-commands', authenticateSSO, async (req, res) => {
|
||||
try {
|
||||
const { name, command, worker_ids, schedule_type, schedule_value } = req.body;
|
||||
|
||||
if (!name || !command || !worker_ids || !schedule_type || !schedule_value) {
|
||||
return res.status(400).json({ error: 'Missing required fields' });
|
||||
}
|
||||
|
||||
const id = generateUUID();
|
||||
const nextRun = calculateNextRun(schedule_type, schedule_value);
|
||||
|
||||
await pool.query(
|
||||
`INSERT INTO scheduled_commands
|
||||
(id, name, command, worker_ids, schedule_type, schedule_value, created_by, next_run)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
[id, name, command, JSON.stringify(worker_ids), schedule_type, schedule_value, req.user.username, nextRun]
|
||||
);
|
||||
|
||||
res.json({ success: true, id });
|
||||
} catch (error) {
|
||||
res.status(500).json({ error: error.message });
|
||||
}
|
||||
});
|
||||
|
||||
app.put('/api/scheduled-commands/:id/toggle', authenticateSSO, async (req, res) => {
|
||||
try {
|
||||
const { id } = req.params;
|
||||
const [current] = await pool.query('SELECT enabled FROM scheduled_commands WHERE id = ?', [id]);
|
||||
|
||||
if (current.length === 0) {
|
||||
return res.status(404).json({ error: 'Schedule not found' });
|
||||
}
|
||||
|
||||
const newEnabled = !current[0].enabled;
|
||||
await pool.query('UPDATE scheduled_commands SET enabled = ? WHERE id = ?', [newEnabled, id]);
|
||||
|
||||
res.json({ success: true, enabled: newEnabled });
|
||||
} catch (error) {
|
||||
res.status(500).json({ error: error.message });
|
||||
}
|
||||
});
|
||||
|
||||
app.delete('/api/scheduled-commands/:id', authenticateSSO, async (req, res) => {
|
||||
try {
|
||||
const { id } = req.params;
|
||||
await pool.query('DELETE FROM scheduled_commands WHERE id = ?', [id]);
|
||||
res.json({ success: true });
|
||||
} catch (error) {
|
||||
res.status(500).json({ error: error.message });
|
||||
}
|
||||
});
|
||||
|
||||
// Health check (no auth required)
|
||||
app.get('/health', async (req, res) => {
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user