Created by ${w.created_by || 'Unknown'} on ${safeDate(w.created_at)?.toLocaleString() ?? 'N/A'}
Created by ${escapeHtml(w.created_by || 'Unknown')} on ${safeDate(w.created_at)?.toLocaleString() ?? 'N/A'}
${currentUser && currentUser.isAdmin ?
@@ -1298,6 +1299,7 @@
document.getElementById('workflowList').innerHTML = html;
} catch (error) {
console.error('Error loading workflows:', error);
+ document.getElementById('workflowList').innerHTML = '
⚠ Failed to load workflows
';
}
}
@@ -1544,6 +1546,7 @@
} catch (error) {
console.error('Error loading executions:', error);
+ document.getElementById('executionList').innerHTML = '
⚠ Failed to load executions
';
}
}
@@ -2942,18 +2945,9 @@
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
- console.log('WebSocket message:', data);
// Handle specific message types
if (data.type === 'command_result') {
- // Display command result in real-time
- console.log(`Command result received for execution ${data.execution_id}`);
- console.log(`Success: ${data.success}`);
- console.log(`Output: ${data.stdout}`);
- if (data.stderr) {
- console.log(`Error: ${data.stderr}`);
- }
-
// Show terminal notification only for manual executions
if (!data.is_automated) {
if (data.success) {
@@ -2978,7 +2972,6 @@
}
if (data.type === 'workflow_result') {
- console.log(`Workflow ${data.status} for execution ${data.execution_id}`);
// Refresh execution list
loadExecutions();
@@ -2994,7 +2987,6 @@
}
if (data.type === 'worker_update') {
- console.log(`Worker ${data.worker_id} status: ${data.status}`);
loadWorkers();
}
diff --git a/server.js b/server.js
index 80d325f..16a26ff 100644
--- a/server.js
+++ b/server.js
@@ -237,7 +237,20 @@ async function processScheduledCommands() {
);
for (const schedule of schedules) {
- // Prevent overlapping execution — skip if a previous run is still active
+ // Atomically claim this run by advancing next_run before doing any work.
+ // If two scheduler instances race, only the one that updates a row proceeds.
+ const claimNextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value);
+ const [claimed] = await pool.query(
+ `UPDATE scheduled_commands SET next_run = ?, last_run = NOW()
+ WHERE id = ? AND (next_run IS NULL OR next_run <= NOW())`,
+ [claimNextRun, schedule.id]
+ );
+ if (claimed.affectedRows === 0) {
+ console.log(`[Scheduler] Skipping "${schedule.name}" - already claimed by another run`);
+ continue;
+ }
+
+ // Also skip if a previous run is still active
const [runningExecs] = await pool.query(
"SELECT id FROM executions WHERE started_by = ? AND status = 'running'",
[`scheduler:${schedule.name}`]
@@ -291,18 +304,7 @@ async function processScheduledCommands() {
}
}
- // Update last_run and calculate next_run
- let nextRun;
- try {
- nextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value);
- } catch (err) {
- console.error(`[Scheduler] Invalid schedule config for "${schedule.name}": ${err.message}`);
- continue;
- }
- await pool.query(
- 'UPDATE scheduled_commands SET last_run = NOW(), next_run = ? WHERE id = ?',
- [nextRun, schedule.id]
- );
+ // next_run and last_run already updated atomically above when we claimed the slot
}
} catch (error) {
console.error('[Scheduler] Error processing scheduled commands:', error);
@@ -480,6 +482,12 @@ wss.on('connection', (ws) => {
if (dbWorkers.length > 0) {
const dbWorkerId = dbWorkers[0].id;
+ // Clean up any stale entry for this db worker before storing the new one
+ // (handles reconnect: old runtime-ID entry would otherwise linger).
+ for (const [key, val] of workers) {
+ if (val.dbWorkerId === dbWorkerId && val !== ws) workers.delete(key);
+ }
+
// Store worker WebSocket connection using BOTH IDs
workers.set(worker_id, ws); // Runtime ID
workers.set(dbWorkerId, ws); // Database ID
@@ -543,7 +551,10 @@ wss.on('connection', (ws) => {
// Broadcast to browser clients only (NOT worker agents)
function broadcast(data) {
- browserClients.forEach(client => {
+ // Snapshot the Set before iterating — a close event during iteration would
+ // otherwise modify the Set in-place, causing skipped or double-visited entries.
+ const snapshot = Array.from(browserClients);
+ snapshot.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(data));
}
@@ -716,6 +727,7 @@ function evalCondition(condition, state, params) {
const context = vm.createContext({ state, params, promptResponse: state.promptResponse });
return !!vm.runInNewContext(condition, context, { timeout: 100 });
} catch (e) {
+ console.warn(`[Workflow] evalCondition error (treated as false): ${e.message} — condition: ${condition}`);
return false;
}
}
@@ -772,6 +784,16 @@ async function executeWorkflowSteps(executionId, workflowId, definition, usernam
const step = steps[currentIndex];
const execState = _executionState.get(executionId);
+ if (!execState) {
+ // State was lost (server restart or bug) — fail cleanly rather than throwing TypeError
+ await addExecutionLog(executionId, {
+ action: 'workflow_error',
+ error: 'Execution state lost unexpectedly; aborting.',
+ timestamp: new Date().toISOString()
+ });
+ await updateExecutionStatus(executionId, 'failed');
+ return;
+ }
const stepLabel = step.name || step.id || `Step ${currentIndex + 1}`;
console.log(`[Workflow] ${executionId} — step ${currentIndex + 1}: ${stepLabel}`);