Compare commits

..

12 Commits

1f5c84f327 Add execution cleanup functionality
Changes:
- Added DELETE /api/executions/:id endpoint
- Added "Clear Completed" button to Executions tab
- Deletes all completed and failed executions
- Broadcasts execution_deleted event to update all clients
- Shows count of deleted executions

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:36:51 -05:00
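
A sketch of what the bulk "Clear Completed" handler behind this commit could look like. The route path and response shape are assumptions; `pool`, `broadcast`, and `authenticateSSO` all appear in the server.js diff below:

```js
// Hypothetical bulk-clear endpoint (exact path/response not shown in the diff).
app.delete('/api/executions', authenticateSSO, async (req, res) => {
  try {
    const [result] = await pool.query(
      "DELETE FROM executions WHERE status IN ('completed', 'failed')"
    );
    // Tell every connected client to refresh its Executions tab.
    broadcast({ type: 'execution_deleted', count: result.affectedRows });
    // Return the count so the UI can show how many were removed.
    res.json({ deleted: result.affectedRows });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});
```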
e03f8d6287 Fix worker ID mapping - use database ID for command routing
Problem:
- Workers generate random UUID on startup (runtime ID)
- Database stores workers with persistent IDs (database ID)
- UI sends commands using database ID
- Server couldn't find worker connection (stored by runtime ID)
- Result: 400 Bad Request "Worker not connected"

Solution:
- When worker connects, look up database ID by worker name
- Store WebSocket connection in Map using BOTH IDs:
  * Runtime ID (from worker_connect message)
  * Database ID (from database lookup by name)
- Commands from UI use database ID → finds correct WebSocket
- Cleanup both IDs when worker disconnects

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:29:23 -05:00
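
In sketch form, the dual-key registration this commit describes might look like the following inside the server's async WebSocket message handler (the exact `worker_connect` field names are assumptions):

```js
// Register the socket under BOTH IDs so commands addressed by the
// persistent database ID still find this connection.
if (message.type === 'worker_connect') {
  ws.runtimeWorkerId = message.worker_id; // random UUID from this boot
  workers.set(ws.runtimeWorkerId, ws);

  const [rows] = await pool.query('SELECT id FROM workers WHERE name = ?', [message.name]);
  if (rows.length > 0) {
    ws.dbWorkerId = rows[0].id; // persistent database ID
    workers.set(ws.dbWorkerId, ws);
  }
}

// On disconnect, clean up both entries.
ws.on('close', () => {
  workers.delete(ws.runtimeWorkerId);
  if (ws.dbWorkerId) workers.delete(ws.dbWorkerId);
});
```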
2097b73404 Fix worker command execution and execution status updates
Changes:
- Removed duplicate /api/executions/:id endpoint that didn't parse logs
- Added workers Map to track worker_id -> WebSocket connection
- Store worker connections when they send worker_connect message
- Send commands to specific worker instead of broadcasting to all clients
- Clean up workers Map when worker disconnects
- Update execution status to completed/failed when command results arrive
- Add proper error handling when worker is not connected

Fixes:
- execution.logs.forEach is not a function (logs now properly parsed)
- Commands stuck in "running" status (now update to completed/failed)
- Commands not reaching workers (now sent to specific worker WebSocket)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:23:02 -05:00
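
The "send to a specific worker" path, roughly. The route and request payload here are illustrative, but the `execute_command` message shape and the 400 "Worker not connected" response match the commits above:

```js
// Send a quick command to one specific worker instead of broadcasting
// to every connected client.
app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
  const workerWs = workers.get(req.params.id);
  if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
    // Fail fast so the command never gets stuck in "running".
    return res.status(400).json({ error: 'Worker not connected' });
  }
  const executionId = generateUUID();
  workerWs.send(JSON.stringify({
    type: 'execute_command',
    execution_id: executionId,
    command: req.body.command,
    worker_id: req.params.id
  }));
  res.json({ id: executionId, status: 'running' });
});
```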
6d15e4d240 Remove database migrations after direct schema fixes
Changes:
- Removed all migration code from server.js
- Database schema fixed directly via MySQL:
  * Dropped users.role column (SSO only)
  * Dropped users.password column (SSO only)
  * Added executions.started_by column
  * Added workflows.created_by column
  * All tables now match expected schema
- Server startup will be faster without migrations

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:11:07 -05:00
7896b40d91 Remove password column from users table
Changes:
- Drop password column from users table (SSO authentication only)
- PULSE uses Authelia SSO, not password-based authentication
- Fixes 500 error: Field 'password' doesn't have a default value

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:01:58 -05:00
e2dc371bfe Add last_login column to users table migration
Changes:
- Add last_login TIMESTAMP column to existing users table
- Complete the users table migration with all required columns
- Fixes 500 error: Unknown column 'last_login' in 'INSERT INTO'

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 20:33:04 -05:00
df0184facf Add migration to update users table schema
Changes:
- Add display_name, email, and groups columns to existing users table
- Handle MariaDB lack of IF NOT EXISTS in ALTER TABLE
- Gracefully skip columns that already exist
- Fixes 500 error when authenticating users

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 20:28:47 -05:00
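
Since the commit notes that this `ALTER TABLE` form rejects `IF NOT EXISTS`, "gracefully skip columns that already exist" presumably means catching the duplicate-column error. A sketch using the mysql2 error code:

```js
// Add a column, tolerating ER_DUP_FIELDNAME (MySQL errno 1060) so the
// migration can be re-run without failing.
async function addColumnIfMissing(table, columnDef) {
  try {
    await pool.query(`ALTER TABLE ${table} ADD COLUMN ${columnDef}`);
  } catch (error) {
    if (error.code !== 'ER_DUP_FIELDNAME') throw error; // real failure
  }
}

await addColumnIfMissing('users', 'display_name VARCHAR(255)');
await addColumnIfMissing('users', 'email VARCHAR(255)');
await addColumnIfMissing('users', '`groups` TEXT');
```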
a8be111e04 Allow NULL workflow_id in executions table for quick commands
Changes:
- Modified executions table schema to allow NULL workflow_id
- Removed foreign key constraint that prevented NULL values
- Added migration to update existing table structure
- Quick commands can now be stored without a workflow reference

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 20:27:02 -05:00
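
The schema change amounts to something like this. The foreign-key constraint name is hypothetical; it would have to be read from `SHOW CREATE TABLE executions` first:

```js
// Drop the FK that blocked NULLs, then relax the column definition
// so quick commands can be stored without a workflow reference.
await pool.query('ALTER TABLE executions DROP FOREIGN KEY executions_ibfk_1');
await pool.query('ALTER TABLE executions MODIFY workflow_id VARCHAR(36) NULL');
```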
b3806545bd Fix quick command executions not appearing in execution tab
Changes:
- Create execution record in database when quick command is sent
- Store initial log entry with command details
- Broadcast execution_started event to update UI
- Display quick commands as "[Quick Command]" in execution list
- Fix worker communication to properly track all executions

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 20:24:11 -05:00
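
Creating the execution record for a quick command likely mirrors the scheduler path visible in the server.js diff below; in sketch form:

```js
// Record the quick command as an execution row so the UI can track it.
const executionId = generateUUID();
await pool.query(
  'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
  [executionId, null, 'running', req.user.username, JSON.stringify([{
    step: 'quick_command',
    action: 'command_sent',
    command: req.body.command,
    timestamp: new Date().toISOString()
  }])]
);
// Notify all clients that a new execution has started.
broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
```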
2767087e27 Updated websocket handler 2026-01-07 20:20:18 -05:00
a1cf8ac90b updates aesthetic 2026-01-07 20:12:16 -05:00
9e842624e1 Claude md file 2026-01-07 19:57:16 -05:00
4 changed files with 1960 additions and 2387 deletions

Claude.md: 1372 (new file; diff suppressed because it is too large)

README.md: 469

@@ -1,375 +1,240 @@
 # PULSE - Pipelined Unified Logic & Server Engine
-A distributed workflow orchestration platform for managing and executing complex multi-step operations across server clusters through a retro terminal-themed web interface.
+A distributed workflow orchestration platform for managing and executing complex multi-step operations across server clusters through an intuitive web interface.
 ## Overview
-PULSE is a centralized workflow execution system designed to orchestrate operations across distributed infrastructure. It provides a powerful web-based interface with a vintage CRT terminal aesthetic for defining, managing, and executing workflows that can span multiple servers, require human interaction, and perform complex automation tasks at scale.
+PULSE is a centralized workflow execution system designed to orchestrate operations across distributed infrastructure. It provides a powerful web-based interface for defining, managing, and executing workflows that can span multiple servers, require human interaction, and perform complex automation tasks at scale.
 ### Key Features
-- **🎨 Retro Terminal Interface**: Phosphor green CRT-style interface with scanlines, glow effects, and ASCII art
-- **⚡ Quick Command Execution**: Instantly execute commands on any worker with built-in templates and command history
-- **📊 Real-Time Worker Monitoring**: Live system metrics including CPU, memory, load average, and active tasks
-- **🔄 Interactive Workflow Management**: Define and execute multi-step workflows with conditional logic and user prompts
-- **🌐 Distributed Execution**: Run commands across multiple worker nodes simultaneously via WebSocket
-- **📈 Execution Tracking**: Comprehensive logging with formatted output, re-run capabilities, and JSON export
-- **🔐 SSO Authentication**: Seamless integration with Authelia for enterprise authentication
-- **🧹 Auto-Cleanup**: Automatic removal of old executions with configurable retention policies
-- **🔔 Terminal Notifications**: Audio beeps and visual toasts for command completion events
+- **Interactive Workflow Management**: Define and execute multi-step workflows with conditional logic, user prompts, and decision points
+- **Distributed Execution**: Run commands and scripts across multiple worker nodes simultaneously
+- **High Availability Architecture**: Deploy redundant worker nodes in LXC containers with Ceph storage for fault tolerance
+- **Web-Based Control Center**: Intuitive interface for workflow selection, monitoring, and interactive input
+- **Flexible Worker Pool**: Scale horizontally by adding worker nodes as needed
+- **Real-Time Monitoring**: Track workflow progress, view logs, and receive notifications
 ## Architecture
 PULSE consists of two core components:
 ### PULSE Server
-**Location:** `10.10.10.65` (LXC Container ID: 122)
-**Directory:** `/opt/pulse-server`
 The central orchestration hub that:
-- Hosts the retro terminal web interface
+- Hosts the web interface for workflow management
 - Manages workflow definitions and execution state
-- Coordinates task distribution to worker nodes via WebSocket
-- Handles user interactions through Authelia SSO
+- Coordinates task distribution to worker nodes
+- Handles user interactions and input collection
 - Provides real-time status updates and logging
-- Stores all data in MariaDB database
-**Technology Stack:**
-- Node.js 20.x
-- Express.js (web framework)
-- WebSocket (ws package) for real-time bidirectional communication
-- MySQL2 (MariaDB driver)
-- Authelia SSO integration
 ### PULSE Worker
-**Example:** `10.10.10.151` (LXC Container ID: 153, hostname: pulse-worker-01)
-**Directory:** `/opt/pulse-worker`
 Lightweight execution agents that:
-- Connect to PULSE server via WebSocket with heartbeat monitoring
-- Execute shell commands and report results in real-time
-- Provide system metrics (CPU, memory, load, uptime)
-- Support concurrent task execution with configurable limits
-- Automatically reconnect on connection loss
+- Connect to the PULSE server and await task assignments
+- Execute commands, scripts, and code on target infrastructure
+- Report execution status and results back to the server
+- Support multiple concurrent workflow executions
+- Automatically reconnect and resume on failure
-**Technology Stack:**
-- Node.js 20.x
-- WebSocket client
-- Child process execution
-- System metrics collection
 ```
 [Architecture diagram. Old version: PULSE Server (10.10.10.65) hosting the terminal web interface + API, MariaDB database, and Authelia SSO, connected over WebSocket to Worker 1 (10.10.10.151) through Worker N. New version: PULSE Server (Web Interface) fanned out to Worker Nodes 1-N. Both captioned: LXC Containers in Proxmox with Ceph.]
 ```
-## Installation
+## Deployment
 ### Prerequisites
-- **Node.js 20.x** or higher
-- **MariaDB 10.x** or higher
-- **Authelia** configured for SSO (optional but recommended)
-- **Network Connectivity** between server and workers
+- **Proxmox VE Cluster**: Hypervisor environment for container deployment
+- **Ceph Storage**: Distributed storage backend for high availability
+- **LXC Support**: Container runtime for worker node deployment
+- **Network Connectivity**: Communication between server and workers
-### PULSE Server Setup
+### Installation
+#### PULSE Server
 ```bash
-# Clone repository
-cd /opt
-git clone <your-repo-url> pulse-server
-cd pulse-server
+# Clone the repository
+git clone https://github.com/yourusername/pulse.git
+cd pulse
 # Install dependencies
-npm install
+npm install # or pip install -r requirements.txt
-# Create .env file with configuration
-cat > .env << EOF
-# Server Configuration
-PORT=8080
-SECRET_KEY=your-secret-key-here
-# MariaDB Configuration
-DB_HOST=10.10.10.50
-DB_PORT=3306
-DB_NAME=pulse
-DB_USER=pulse_user
-DB_PASSWORD=your-db-password
-# Worker API Key (for worker authentication)
-WORKER_API_KEY=your-worker-api-key
-# Auto-cleanup configuration (optional)
-EXECUTION_RETENTION_DAYS=30
-EOF
-# Create systemd service
-cat > /etc/systemd/system/pulse.service << EOF
-[Unit]
-Description=PULSE Workflow Orchestration Server
-After=network.target
-[Service]
-Type=simple
-User=root
-WorkingDirectory=/opt/pulse-server
-ExecStart=/usr/bin/node server.js
-Restart=always
-RestartSec=10
-[Install]
-WantedBy=multi-user.target
-EOF
-# Start service
-systemctl daemon-reload
-systemctl enable pulse.service
-systemctl start pulse.service
+# Configure server settings
+cp config.example.yml config.yml
+nano config.yml
+# Start the PULSE server
+npm start # or python server.py
 ```
-### PULSE Worker Setup
+#### PULSE Worker
 ```bash
-# On each worker node
+# On each worker node (LXC container)
-cd /opt
-git clone <your-repo-url> pulse-worker
 cd pulse-worker
 # Install dependencies
-npm install
+npm install # or pip install -r requirements.txt
-# Create .env file
-cat > .env << EOF
-# Worker Configuration
-WORKER_NAME=pulse-worker-01
-PULSE_SERVER=http://10.10.10.65:8080
-PULSE_WS=ws://10.10.10.65:8080
-WORKER_API_KEY=your-worker-api-key
-# Performance Settings
-HEARTBEAT_INTERVAL=30
-MAX_CONCURRENT_TASKS=5
-EOF
-# Create systemd service
-cat > /etc/systemd/system/pulse-worker.service << EOF
-[Unit]
-Description=PULSE Worker Node
-After=network.target
-[Service]
-Type=simple
-User=root
-WorkingDirectory=/opt/pulse-worker
-ExecStart=/usr/bin/node worker.js
-Restart=always
-RestartSec=10
-[Install]
-WantedBy=multi-user.target
-EOF
-# Start service
-systemctl daemon-reload
-systemctl enable pulse-worker.service
-systemctl start pulse-worker.service
+# Configure worker connection
+cp worker-config.example.yml worker-config.yml
+nano worker-config.yml
+# Start the worker daemon
+npm start # or python worker.py
+```
+### High Availability Setup
+Deploy multiple worker nodes across Proxmox hosts:
+```bash
+# Create LXC template
+pct create 1000 local:vztmpl/ubuntu-22.04-standard_amd64.tar.zst \
+  --rootfs ceph-pool:8 \
+  --memory 2048 \
+  --cores 2 \
+  --net0 name=eth0,bridge=vmbr0,ip=dhcp
+# Clone for additional workers
+pct clone 1000 1001 --full --storage ceph-pool
+pct clone 1000 1002 --full --storage ceph-pool
+pct clone 1000 1003 --full --storage ceph-pool
+# Start all workers
+for i in {1000..1003}; do pct start $i; done
 ```
 ## Usage
-### Quick Command Execution
-1. Access PULSE at `http://your-server:8080`
-2. Navigate to **⚡ Quick Command** tab
-3. Select a worker from the dropdown
-4. Use **Templates** for pre-built commands or **History** for recent commands
-5. Enter your command and click **Execute**
-6. View results in the **Executions** tab
-**Built-in Command Templates:**
-- System Info: `uname -a`
-- Disk Usage: `df -h`
-- Memory Usage: `free -h`
-- CPU Info: `lscpu`
-- Running Processes: `ps aux --sort=-%mem | head -20`
-- Network Interfaces: `ip addr show`
-- Docker Containers: `docker ps -a`
-- System Logs: `tail -n 50 /var/log/syslog`
-### Worker Monitoring
-The **Workers** tab displays real-time metrics for each worker:
-- System information (OS, architecture, CPU cores)
-- Memory usage (used/total with percentage)
-- Load averages (1m, 5m, 15m)
-- System uptime
-- Active tasks vs. maximum concurrent capacity
-### Execution Management
-- **View Details**: Click any execution to see formatted logs with timestamps, status, and output
-- **Re-run Command**: Click "Re-run" button in execution details to repeat a command
-- **Download Logs**: Export execution data as JSON for auditing
-- **Clear Completed**: Bulk delete finished executions
-- **Auto-Cleanup**: Executions older than 30 days are automatically removed
-### Workflow Creation (Future Feature)
-1. Navigate to **Workflows** → **Create New**
-2. Define workflow steps using JSON syntax
-3. Specify target workers
-4. Add interactive prompts where needed
-5. Save and execute
+### Creating a Workflow
+1. Access the PULSE web interface at `http://your-server:8080`
+2. Navigate to **Workflows** → **Create New**
+3. Define workflow steps using the visual editor or YAML syntax
+4. Specify execution targets (specific nodes, groups, or all workers)
+5. Add interactive prompts where user input is required
+6. Save and activate the workflow
+### Example Workflow
+```yaml
+name: "System Update and Reboot"
+description: "Update all servers in the cluster with user confirmation"
+steps:
+  - name: "Check Current Versions"
+    type: "execute"
+    targets: ["all"]
+    command: "apt list --upgradable"
+  - name: "User Approval"
+    type: "prompt"
+    message: "Review available updates. Proceed with installation?"
+    options: ["Yes", "No", "Cancel"]
+  - name: "Install Updates"
+    type: "execute"
+    targets: ["all"]
+    command: "apt-get update && apt-get upgrade -y"
+    condition: "prompt_response == 'Yes'"
+  - name: "Reboot Confirmation"
+    type: "prompt"
+    message: "Updates complete. Reboot all servers?"
+    options: ["Yes", "No"]
+  - name: "Rolling Reboot"
+    type: "execute"
+    targets: ["all"]
+    command: "reboot"
+    strategy: "rolling"
+    condition: "prompt_response == 'Yes'"
+```
+### Running a Workflow
+1. Select a workflow from the dashboard
+2. Click **Execute**
+3. Monitor progress in real-time
+4. Respond to interactive prompts as they appear
+5. View detailed logs for each execution step
-## Features in Detail
-### Terminal Aesthetic
-- Phosphor green (#00ff41) on black (#0a0a0a) color scheme
-- CRT scanline animation effect
-- Text glow and shadow effects
-- ASCII box-drawing characters for borders
-- Boot sequence animation on first load
-- Hover effects with smooth transitions
-### Real-Time Communication
-- WebSocket-based bidirectional communication
-- Instant command result notifications
-- Live worker status updates
-- Terminal beep sounds for events
-- Toast notifications with visual feedback
-### Execution Tracking
-- Formatted log display (not raw JSON)
-- Color-coded success/failure indicators
-- Timestamp and duration for each step
-- Scrollable output with syntax highlighting
-- Persistent history with pagination
-- Load More button for large execution lists
-### Security
-- Authelia SSO integration for user authentication
-- API key authentication for workers
-- User session management
-- Admin-only operations (worker deletion, workflow management)
-- Audit logging for all executions
-### Performance
-- Automatic cleanup of old executions (configurable retention)
-- Pagination for large execution lists (50 at a time)
-- Efficient WebSocket connection pooling
-- Worker heartbeat monitoring
-- Database connection pooling
 ## Configuration
-### Environment Variables
-**Server (.env):**
-```bash
-PORT=8080                    # Server port
-SECRET_KEY=<random-string>   # Session secret
-DB_HOST=10.10.10.50          # MariaDB host
-DB_PORT=3306                 # MariaDB port
-DB_NAME=pulse                # Database name
-DB_USER=pulse_user           # Database user
-DB_PASSWORD=<password>       # Database password
-WORKER_API_KEY=<api-key>     # Worker authentication key
-EXECUTION_RETENTION_DAYS=30  # Auto-cleanup retention (default: 30)
-```
-**Worker (.env):**
-```bash
-WORKER_NAME=pulse-worker-01  # Unique worker name
-PULSE_SERVER=http://10.10.10.65:8080  # Server HTTP URL
-PULSE_WS=ws://10.10.10.65:8080  # Server WebSocket URL
-WORKER_API_KEY=<api-key>     # Must match server key
-HEARTBEAT_INTERVAL=30        # Heartbeat seconds (default: 30)
-MAX_CONCURRENT_TASKS=5       # Max parallel tasks (default: 5)
-```
+### Server Configuration (`config.yml`)
+```yaml
+server:
+  host: "0.0.0.0"
+  port: 8080
+  secret_key: "your-secret-key"
+database:
+  type: "postgresql"
+  host: "localhost"
+  port: 5432
+  name: "pulse"
+workers:
+  heartbeat_interval: 30
+  timeout: 300
+  max_concurrent_tasks: 10
+security:
+  enable_authentication: true
+  require_approval: true
+```
+### Worker Configuration (`worker-config.yml`)
+```yaml
+worker:
+  name: "worker-01"
+  server_url: "http://pulse-server:8080"
+  api_key: "worker-api-key"
+resources:
+  max_cpu_percent: 80
+  max_memory_mb: 1024
+executor:
+  shell: "/bin/bash"
+  working_directory: "/tmp/pulse"
+  timeout: 3600
+```
-## Database Schema
-PULSE uses MariaDB with the following tables:
-- **users**: User accounts from Authelia SSO
-- **workers**: Worker node registry with metadata
-- **workflows**: Workflow definitions (JSON)
-- **executions**: Execution history with logs
-See [Claude.md](Claude.md) for complete schema details.
-## Troubleshooting
-### Worker Not Connecting
-```bash
-# Check worker service status
-systemctl status pulse-worker
-# Check worker logs
-journalctl -u pulse-worker -n 50 -f
-# Verify API key matches server
-grep WORKER_API_KEY /opt/pulse-worker/.env
-```
-### Commands Stuck in "Running"
-- This was fixed in recent updates - restart the server:
-```bash
-systemctl restart pulse.service
-```
-### Clear All Executions
-Use the database directly if needed:
-```bash
-mysql -h 10.10.10.50 -u pulse_user -p pulse
-> DELETE FROM executions WHERE status IN ('completed', 'failed');
-```
-## Development
-### Recent Updates
-**Phase 1-6 Improvements:**
-- Formatted log display with color-coding
-- Worker system metrics monitoring
-- Command templates and history
-- Re-run and download execution features
-- Auto-cleanup and pagination
-- Terminal aesthetic refinements
-- Audio notifications and visual toasts
-See git history for detailed changelog.
-### Future Enhancements
-- Full workflow system implementation
-- Multi-worker command execution
-- Scheduled/cron job support
-- Execution search and filtering
-- Dark/light theme toggle
-- Mobile-responsive design
-- REST API documentation
-- Webhook integrations
-## License
-MIT License - See LICENSE file for details
+## Features in Detail
+### Interactive Workflows
+- Pause execution to collect user input via web forms
+- Display intermediate results for review
+- Conditional branching based on user decisions
+- Multi-choice prompts with validation
+### Mass Execution
+- Run commands across all workers simultaneously
+- Target specific node groups or individual servers
+- Rolling execution for zero-downtime updates
+- Parallel and sequential execution strategies
+### Monitoring & Logging
+- Real-time workflow execution dashboard
+- Detailed per-step logging and output capture
+- Historical execution records and analytics
+- Alert notifications for failures or completion
+### Security
+- Role-based access control (RBAC)
+- API key authentication for workers
+- Workflow approval requirements
+- Audit logging for all actions
 ---
 **PULSE** - Orchestrating your infrastructure, one heartbeat at a time.
-Built with retro terminal aesthetics 🖥️ | Powered by WebSockets 🔌 | Secured by Authelia 🔐

File diff suppressed because it is too large

server.js: 854

@@ -89,24 +89,6 @@ async function initDatabase() {
)
`);
await connection.query(`
CREATE TABLE IF NOT EXISTS scheduled_commands (
id VARCHAR(36) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
command TEXT NOT NULL,
worker_ids JSON NOT NULL,
schedule_type VARCHAR(50) NOT NULL,
schedule_value VARCHAR(255) NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
created_by VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_run TIMESTAMP NULL,
next_run TIMESTAMP NULL,
INDEX idx_enabled (enabled),
INDEX idx_next_run (next_run)
)
`);
console.log('Database tables initialized successfully');
} catch (error) {
console.error('Database initialization error:', error);
@@ -116,116 +98,6 @@ async function initDatabase() {
}
}
// Auto-cleanup old executions (runs daily)
async function cleanupOldExecutions() {
try {
const retentionDays = parseInt(process.env.EXECUTION_RETENTION_DAYS) || 30;
const [result] = await pool.query(
`DELETE FROM executions
WHERE status IN ('completed', 'failed')
AND started_at < DATE_SUB(NOW(), INTERVAL ? DAY)`,
[retentionDays]
);
console.log(`[Cleanup] Removed ${result.affectedRows} executions older than ${retentionDays} days`);
} catch (error) {
console.error('[Cleanup] Error removing old executions:', error);
}
}
// Run cleanup daily at 3 AM
setInterval(cleanupOldExecutions, 24 * 60 * 60 * 1000);
// Run cleanup on startup
cleanupOldExecutions();
// Scheduled Commands Processor
async function processScheduledCommands() {
try {
const [schedules] = await pool.query(
`SELECT * FROM scheduled_commands
WHERE enabled = TRUE
AND (next_run IS NULL OR next_run <= NOW())`
);
for (const schedule of schedules) {
console.log(`[Scheduler] Running scheduled command: ${schedule.name}`);
const workerIds = JSON.parse(schedule.worker_ids);
// Execute command on each worker
for (const workerId of workerIds) {
const workerWs = workers.get(workerId);
if (workerWs && workerWs.readyState === WebSocket.OPEN) {
const executionId = generateUUID();
// Create execution record
await pool.query(
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
[executionId, null, 'running', `scheduler:${schedule.name}`, JSON.stringify([{
step: 'scheduled_command',
action: 'command_sent',
worker_id: workerId,
command: schedule.command,
timestamp: new Date().toISOString()
}])]
);
// Send command to worker
workerWs.send(JSON.stringify({
type: 'execute_command',
execution_id: executionId,
command: schedule.command,
worker_id: workerId,
timeout: 300000 // 5 minute timeout for scheduled commands
}));
broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
}
}
// Update last_run and calculate next_run
const nextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value);
await pool.query(
'UPDATE scheduled_commands SET last_run = NOW(), next_run = ? WHERE id = ?',
[nextRun, schedule.id]
);
}
} catch (error) {
console.error('[Scheduler] Error processing scheduled commands:', error);
}
}
function calculateNextRun(scheduleType, scheduleValue) {
const now = new Date();
if (scheduleType === 'interval') {
// Interval in minutes
const minutes = parseInt(scheduleValue);
return new Date(now.getTime() + minutes * 60000);
} else if (scheduleType === 'daily') {
// Daily at HH:MM
const [hours, minutes] = scheduleValue.split(':').map(Number);
const next = new Date(now);
next.setHours(hours, minutes, 0, 0);
// If time has passed today, schedule for tomorrow
if (next <= now) {
next.setDate(next.getDate() + 1);
}
return next;
} else if (scheduleType === 'hourly') {
// Every N hours
const hours = parseInt(scheduleValue);
return new Date(now.getTime() + hours * 3600000);
}
return null;
}
// Run scheduler every minute
setInterval(processScheduledCommands, 60 * 1000);
// Initial run on startup
setTimeout(processScheduledCommands, 5000);
// WebSocket connections
const clients = new Set();
const workers = new Map(); // Map worker_id -> WebSocket connection
@@ -241,13 +113,12 @@ wss.on('connection', (ws) => {
if (message.type === 'command_result') {
// Handle command result from worker
-const { execution_id, command_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
+const { execution_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
// Add result to execution logs
await addExecutionLog(execution_id, {
step: 'command_execution',
action: 'command_result',
-command_id: command_id, // Include command_id for workflow tracking
worker_id: worker_id,
success: success,
stdout: stdout,
@@ -256,14 +127,9 @@ wss.on('connection', (ws) => {
timestamp: timestamp || new Date().toISOString()
});
-// For non-workflow executions, update status immediately
-// For workflow executions, the workflow engine will update status
-const [execution] = await pool.query('SELECT workflow_id FROM executions WHERE id = ?', [execution_id]);
-if (execution.length > 0 && !execution[0].workflow_id) {
-  // Only update status for quick commands (no workflow_id)
-  const finalStatus = success ? 'completed' : 'failed';
-  await updateExecutionStatus(execution_id, finalStatus);
-}
+// Update execution status to completed or failed
+const finalStatus = success ? 'completed' : 'failed';
+await updateExecutionStatus(execution_id, finalStatus);
// Broadcast result to all connected clients
broadcast({
@@ -275,7 +141,7 @@ wss.on('connection', (ws) => {
stderr: stderr
});
-console.log(`Command result received for execution ${execution_id}: ${success ? 'success' : 'failed'}`);
+console.log(`Command result received for execution ${execution_id}: ${finalStatus}`);
}
if (message.type === 'workflow_result') {
@@ -386,45 +252,6 @@ function broadcast(data) {
});
}
// Helper function to add log entry to execution
async function addExecutionLog(executionId, logEntry) {
try {
const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
if (execution.length > 0) {
const logs = typeof execution[0].logs === 'string' ? JSON.parse(execution[0].logs) : execution[0].logs;
logs.push(logEntry);
await pool.query(
'UPDATE executions SET logs = ? WHERE id = ?',
[JSON.stringify(logs), executionId]
);
}
} catch (error) {
console.error(`[Workflow] Error adding execution log:`, error);
}
}
// Helper function to update execution status
async function updateExecutionStatus(executionId, status) {
try {
await pool.query(
'UPDATE executions SET status = ?, completed_at = NOW() WHERE id = ?',
[status, executionId]
);
broadcast({
type: 'execution_status',
execution_id: executionId,
status: status
});
console.log(`[Workflow] Execution ${executionId} status updated to: ${status}`);
} catch (error) {
console.error(`[Workflow] Error updating execution status:`, error);
}
}
// Authelia SSO Middleware
async function authenticateSSO(req, res, next) {
// Check for Authelia headers
@@ -481,237 +308,6 @@ async function authenticateSSO(req, res, next) {
next();
}
// Workflow Execution Engine
async function executeWorkflowSteps(executionId, workflowId, definition, username) {
try {
console.log(`[Workflow] Starting execution ${executionId} for workflow ${workflowId}`);
const steps = definition.steps || [];
let allStepsSucceeded = true;
for (let i = 0; i < steps.length; i++) {
const step = steps[i];
console.log(`[Workflow] Execution ${executionId} - Step ${i + 1}/${steps.length}: ${step.name}`);
// Add step start log
await addExecutionLog(executionId, {
step: i + 1,
step_name: step.name,
action: 'step_started',
timestamp: new Date().toISOString()
});
broadcast({
type: 'workflow_step_started',
execution_id: executionId,
step: i + 1,
step_name: step.name
});
if (step.type === 'execute') {
// Execute command step
const success = await executeCommandStep(executionId, step, i + 1);
if (!success) {
allStepsSucceeded = false;
break; // Stop workflow on failure
}
} else if (step.type === 'wait') {
// Wait step (delay in seconds)
const seconds = step.duration || 5;
await addExecutionLog(executionId, {
step: i + 1,
action: 'waiting',
duration: seconds,
timestamp: new Date().toISOString()
});
await new Promise(resolve => setTimeout(resolve, seconds * 1000));
} else if (step.type === 'prompt') {
// Interactive prompt step (not fully implemented, would need user interaction)
await addExecutionLog(executionId, {
step: i + 1,
action: 'prompt_skipped',
message: 'Interactive prompts not yet supported',
timestamp: new Date().toISOString()
});
}
// Add step completion log
await addExecutionLog(executionId, {
step: i + 1,
step_name: step.name,
action: 'step_completed',
timestamp: new Date().toISOString()
});
broadcast({
type: 'workflow_step_completed',
execution_id: executionId,
step: i + 1,
step_name: step.name
});
}
// Mark execution as completed or failed
const finalStatus = allStepsSucceeded ? 'completed' : 'failed';
await updateExecutionStatus(executionId, finalStatus);
console.log(`[Workflow] Execution ${executionId} finished with status: ${finalStatus}`);
} catch (error) {
console.error(`[Workflow] Execution ${executionId} error:`, error);
await addExecutionLog(executionId, {
action: 'workflow_error',
error: error.message,
timestamp: new Date().toISOString()
});
await updateExecutionStatus(executionId, 'failed');
}
}
async function executeCommandStep(executionId, step, stepNumber) {
try {
const command = step.command;
const targets = step.targets || ['all'];
// Determine which workers to target
let targetWorkerIds = [];
if (targets.includes('all')) {
// Get all online workers
const [onlineWorkers] = await pool.query('SELECT id FROM workers WHERE status = ?', ['online']);
targetWorkerIds = onlineWorkers.map(w => w.id);
} else {
// Specific worker IDs or names
for (const target of targets) {
// Try to find by ID first, then by name
const [workerById] = await pool.query('SELECT id FROM workers WHERE id = ?', [target]);
if (workerById.length > 0) {
targetWorkerIds.push(workerById[0].id);
} else {
const [workerByName] = await pool.query('SELECT id FROM workers WHERE name = ?', [target]);
if (workerByName.length > 0) {
targetWorkerIds.push(workerByName[0].id);
}
}
}
}
if (targetWorkerIds.length === 0) {
await addExecutionLog(executionId, {
step: stepNumber,
action: 'no_workers',
message: 'No workers available for this step',
timestamp: new Date().toISOString()
});
return false;
}
// Execute command on each target worker and wait for results
const results = [];
for (const workerId of targetWorkerIds) {
const workerWs = workers.get(workerId);
if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
await addExecutionLog(executionId, {
step: stepNumber,
action: 'worker_offline',
worker_id: workerId,
timestamp: new Date().toISOString()
});
continue;
}
// Send command to worker
const commandId = generateUUID();
await addExecutionLog(executionId, {
step: stepNumber,
action: 'command_sent',
worker_id: workerId,
command: command,
command_id: commandId,
timestamp: new Date().toISOString()
});
workerWs.send(JSON.stringify({
type: 'execute_command',
execution_id: executionId,
command_id: commandId,
command: command,
worker_id: workerId,
timeout: 120000 // 2 minute timeout
}));
// Wait for command result (with timeout)
const result = await waitForCommandResult(executionId, commandId, 120000);
results.push(result);
if (!result.success) {
// Command failed, workflow should stop
return false;
}
}
// All commands succeeded
return results.every(r => r.success);
} catch (error) {
console.error(`[Workflow] Error executing command step:`, error);
await addExecutionLog(executionId, {
step: stepNumber,
action: 'step_error',
error: error.message,
timestamp: new Date().toISOString()
});
return false;
}
}
async function waitForCommandResult(executionId, commandId, timeout) {
return new Promise((resolve) => {
const startTime = Date.now();
const checkInterval = setInterval(async () => {
try {
// Check if we've received the command result in logs
const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
if (execution.length > 0) {
const logs = typeof execution[0].logs === 'string' ? JSON.parse(execution[0].logs) : execution[0].logs;
const resultLog = logs.find(log => log.command_id === commandId && log.action === 'command_result');
if (resultLog) {
clearInterval(checkInterval);
resolve({
success: resultLog.success,
stdout: resultLog.stdout,
stderr: resultLog.stderr
});
return;
}
}
// Check timeout
if (Date.now() - startTime > timeout) {
clearInterval(checkInterval);
resolve({
success: false,
error: 'Command timeout'
});
}
} catch (error) {
console.error('[Workflow] Error checking command result:', error);
clearInterval(checkInterval);
resolve({
success: false,
error: error.message
});
}
}, 500); // Check every 500ms
});
}
// Routes - All protected by SSO
app.get('/api/user', authenticateSSO, (req, res) => {
res.json(req.user);
@@ -731,23 +327,14 @@ app.post('/api/workflows', authenticateSSO, async (req, res) => {
const { name, description, definition } = req.body;
const id = generateUUID();
-console.log('[Workflow] Creating workflow:', name);
-console.log('[Workflow] Definition:', JSON.stringify(definition, null, 2));
await pool.query(
'INSERT INTO workflows (id, name, description, definition, created_by) VALUES (?, ?, ?, ?, ?)',
[id, name, description, JSON.stringify(definition), req.user.username]
);
-console.log('[Workflow] Successfully inserted workflow:', id);
res.json({ id, name, description, definition });
-console.log('[Workflow] Broadcasting workflow_created');
broadcast({ type: 'workflow_created', workflow_id: id });
-console.log('[Workflow] Broadcast complete');
} catch (error) {
-console.error('[Workflow] Error creating workflow:', error);
res.status(500).json({ error: error.message });
}
});
@@ -808,28 +395,12 @@ app.post('/api/executions', authenticateSSO, async (req, res) => {
const { workflow_id } = req.body;
const id = generateUUID();
-// Get workflow definition
-const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflow_id]);
-if (workflows.length === 0) {
-  return res.status(404).json({ error: 'Workflow not found' });
-}
-const workflow = workflows[0];
-const definition = typeof workflow.definition === 'string' ? JSON.parse(workflow.definition) : workflow.definition;
-// Create execution record
await pool.query(
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
[id, workflow_id, 'running', req.user.username, JSON.stringify([])]
);
broadcast({ type: 'execution_started', execution_id: id, workflow_id });
-// Start workflow execution asynchronously
-executeWorkflowSteps(id, workflow_id, definition, req.user.username).catch(err => {
-  console.error(`[Workflow] Execution ${id} failed:`, err);
-});
res.json({ id, workflow_id, status: 'running' });
} catch (error) {
res.status(500).json({ error: error.message });
@@ -838,25 +409,10 @@ app.post('/api/executions', authenticateSSO, async (req, res) => {
app.get('/api/executions', authenticateSSO, async (req, res) => {
try {
-const limit = parseInt(req.query.limit) || 50;
-const offset = parseInt(req.query.offset) || 0;
const [rows] = await pool.query(
-'SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ORDER BY e.started_at DESC LIMIT ? OFFSET ?',
-[limit, offset]
+'SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ORDER BY e.started_at DESC LIMIT 50'
);
-// Get total count
-const [countRows] = await pool.query('SELECT COUNT(*) as total FROM executions');
-const total = countRows[0].total;
-res.json({
-  executions: rows,
-  total: total,
-  limit: limit,
-  offset: offset,
-  hasMore: offset + rows.length < total
-});
+res.json(rows);
} catch (error) {
res.status(500).json({ error: error.message });
}
@@ -872,104 +428,6 @@ app.delete('/api/executions/:id', authenticateSSO, async (req, res) => {
}
});
app.post('/api/executions/:id/abort', authenticateSSO, async (req, res) => {
try {
const executionId = req.params.id;
// Check if execution exists and is running
const [execution] = await pool.query('SELECT status FROM executions WHERE id = ?', [executionId]);
if (execution.length === 0) {
return res.status(404).json({ error: 'Execution not found' });
}
if (execution[0].status !== 'running') {
return res.status(400).json({ error: 'Execution is not running' });
}
// Add abort log entry
await addExecutionLog(executionId, {
action: 'execution_aborted',
aborted_by: req.user.username,
timestamp: new Date().toISOString()
});
// Update execution status to failed
await updateExecutionStatus(executionId, 'failed');
console.log(`[Execution] Execution ${executionId} aborted by ${req.user.username}`);
res.json({ success: true });
} catch (error) {
console.error('[Execution] Error aborting execution:', error);
res.status(500).json({ error: error.message });
}
});
// Scheduled Commands API
app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => {
try {
const [schedules] = await pool.query(
'SELECT * FROM scheduled_commands ORDER BY created_at DESC'
);
res.json(schedules);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.post('/api/scheduled-commands', authenticateSSO, async (req, res) => {
try {
const { name, command, worker_ids, schedule_type, schedule_value } = req.body;
if (!name || !command || !worker_ids || !schedule_type || !schedule_value) {
return res.status(400).json({ error: 'Missing required fields' });
}
const id = generateUUID();
const nextRun = calculateNextRun(schedule_type, schedule_value);
await pool.query(
`INSERT INTO scheduled_commands
(id, name, command, worker_ids, schedule_type, schedule_value, created_by, next_run)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
[id, name, command, JSON.stringify(worker_ids), schedule_type, schedule_value, req.user.username, nextRun]
);
res.json({ success: true, id });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.put('/api/scheduled-commands/:id/toggle', authenticateSSO, async (req, res) => {
try {
const { id } = req.params;
const [current] = await pool.query('SELECT enabled FROM scheduled_commands WHERE id = ?', [id]);
if (current.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
const newEnabled = !current[0].enabled;
await pool.query('UPDATE scheduled_commands SET enabled = ? WHERE id = ?', [newEnabled, id]);
res.json({ success: true, enabled: newEnabled });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.delete('/api/scheduled-commands/:id', authenticateSSO, async (req, res) => {
try {
const { id } = req.params;
await pool.query('DELETE FROM scheduled_commands WHERE id = ?', [id]);
res.json({ success: true });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
// Health check (no auth required)
app.get('/health', async (req, res) => {
try {
@@ -1009,6 +467,299 @@ initDatabase().then(() => {
// ============================================
// WORKFLOW EXECUTION ENGINE
// ============================================
// Store active workflow executions in memory
const activeExecutions = new Map();
// Execute workflow step by step
async function executeWorkflow(workflowId, executionId, userId, targetWorkers = 'all') {
try {
// Get workflow definition
const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflowId]);
if (workflows.length === 0) {
throw new Error('Workflow not found');
}
const workflow = workflows[0];
const definition = JSON.parse(workflow.definition);
// Initialize execution state
const executionState = {
id: executionId,
workflowId: workflowId,
currentStep: 0,
steps: definition.steps || [],
results: [],
status: 'running',
waitingForInput: false,
targetWorkers: targetWorkers,
userId: userId
};
activeExecutions.set(executionId, executionState);
// Start executing steps
await executeNextStep(executionId);
} catch (error) {
console.error('Workflow execution error:', error);
await updateExecutionStatus(executionId, 'failed', error.message);
}
}
// Execute the next step in a workflow
async function executeNextStep(executionId) {
const state = activeExecutions.get(executionId);
if (!state) return;
// Check if we've completed all steps
if (state.currentStep >= state.steps.length) {
await updateExecutionStatus(executionId, 'completed');
activeExecutions.delete(executionId);
return;
}
const step = state.steps[state.currentStep];
// Check if step has a condition
if (step.condition && !evaluateCondition(step.condition, state)) {
console.log(`[Workflow] Skipping step ${state.currentStep}: condition not met`);
state.currentStep++;
return executeNextStep(executionId);
}
console.log(`[Workflow] Executing step ${state.currentStep}: ${step.name}`);
try {
switch (step.type) {
case 'execute':
await executeCommandStep(executionId, step);
break;
case 'prompt':
await executePromptStep(executionId, step);
break;
case 'wait':
await executeWaitStep(executionId, step);
break;
default:
throw new Error(`Unknown step type: ${step.type}`);
}
} catch (error) {
await addExecutionLog(executionId, {
step: state.currentStep,
error: error.message,
timestamp: new Date().toISOString()
});
await updateExecutionStatus(executionId, 'failed', error.message);
activeExecutions.delete(executionId);
}
}
// Execute a command on workers
async function executeCommandStep(executionId, step) {
const state = activeExecutions.get(executionId);
// Get target workers
const [workers] = await pool.query(
'SELECT * FROM workers WHERE status = "online"'
);
if (workers.length === 0) {
throw new Error('No online workers available');
}
// Filter workers based on target
let targetWorkers = workers;
if (step.targets && step.targets[0] !== 'all') {
targetWorkers = workers.filter(w => step.targets.includes(w.name));
}
// Send command to workers via WebSocket
const commandMessage = {
type: 'execute_command',
execution_id: executionId,
step_index: state.currentStep,
command: step.command,
timeout: step.timeout || 300000
};
// Broadcast to target workers
clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(commandMessage));
}
});
await addExecutionLog(executionId, {
step: state.currentStep,
action: 'command_sent',
command: step.command,
workers: targetWorkers.map(w => w.name),
timestamp: new Date().toISOString()
});
// For now, move to next step immediately
// In production, we'd wait for worker responses
state.currentStep++;
// Small delay to allow command to execute
setTimeout(() => executeNextStep(executionId), 1000);
}
// Execute a user prompt step
async function executePromptStep(executionId, step) {
const state = activeExecutions.get(executionId);
state.waitingForInput = true;
state.promptData = {
message: step.message,
options: step.options,
step: state.currentStep
};
await addExecutionLog(executionId, {
step: state.currentStep,
action: 'waiting_for_input',
prompt: step.message,
timestamp: new Date().toISOString()
});
// Notify frontend that input is needed
broadcast({
type: 'execution_prompt',
execution_id: executionId,
prompt: state.promptData
});
}
// Execute a wait/delay step
async function executeWaitStep(executionId, step) {
const state = activeExecutions.get(executionId);
const delay = step.duration || 1000;
await addExecutionLog(executionId, {
step: state.currentStep,
action: 'waiting',
duration: delay,
timestamp: new Date().toISOString()
});
state.currentStep++;
setTimeout(() => executeNextStep(executionId), delay);
}
// Handle user input for prompts
function handleUserInput(executionId, response) {
const state = activeExecutions.get(executionId);
if (!state || !state.waitingForInput) {
return false;
}
state.promptResponse = response;
state.waitingForInput = false;
state.currentStep++;
addExecutionLog(executionId, {
step: state.currentStep - 1,
action: 'user_response',
response: response,
timestamp: new Date().toISOString()
});
executeNextStep(executionId);
return true;
}
// Evaluate conditions
function evaluateCondition(condition, state) {
try {
// Simple condition evaluation
// In production, use a proper expression evaluator
const promptResponse = state.promptResponse;
return eval(condition);
} catch (error) {
console.error('Condition evaluation error:', error);
return false;
}
}
// Helper functions
async function updateExecutionStatus(executionId, status, error = null) {
const updates = { status };
if (status === 'completed' || status === 'failed') {
updates.completed_at = new Date();
}
if (error) {
// Add error to logs
await addExecutionLog(executionId, { error, timestamp: new Date().toISOString() });
}
await pool.query(
'UPDATE executions SET status = ?, completed_at = ? WHERE id = ?',
[status, updates.completed_at || null, executionId]
);
broadcast({
type: 'execution_status',
execution_id: executionId,
status: status
});
}
async function addExecutionLog(executionId, logEntry) {
const [rows] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
if (rows.length === 0) return;
const logs = JSON.parse(rows[0].logs || '[]');
logs.push(logEntry);
await pool.query('UPDATE executions SET logs = ? WHERE id = ?', [
JSON.stringify(logs),
executionId
]);
}
// ============================================
// API ROUTES - Add these to your server.js
// ============================================
// Start workflow execution
app.post('/api/executions', authenticateSSO, async (req, res) => {
try {
const { workflow_id, target_workers } = req.body;
const id = generateUUID();
await pool.query(
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
[id, workflow_id, 'running', req.user.username, JSON.stringify([])]
);
// Start execution
executeWorkflow(workflow_id, id, req.user.username, target_workers || 'all');
broadcast({ type: 'execution_started', execution_id: id, workflow_id });
res.json({ id, workflow_id, status: 'running' });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
// Respond to workflow prompt
app.post('/api/executions/:id/respond', authenticateSSO, async (req, res) => {
try {
const { response } = req.body;
const success = handleUserInput(req.params.id, response);
if (success) {
res.json({ success: true });
} else {
res.status(400).json({ error: 'Execution not waiting for input' });
}
} catch (error) {
res.status(500).json({ error: error.message });
}
});
// Get execution details with logs
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
@@ -1019,10 +770,13 @@ app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
}
const execution = rows[0];
+const state = activeExecutions.get(req.params.id);
res.json({
...execution,
-logs: JSON.parse(execution.logs || '[]')
+logs: JSON.parse(execution.logs || '[]'),
+waiting_for_input: state?.waitingForInput || false,
+prompt: state?.promptData || null
});
} catch (error) {
res.status(500).json({ error: error.message });