Compare commits

20 commits: `main...84edea8027`

| SHA1 |
|---|
| 84edea8027 |
| 095bfb65ab |
| c619add705 |
| e6a6b7e359 |
| d25ba27f24 |
| 76b0a6d0d3 |
| adbbec2631 |
| 8b8d2c6312 |
| 1f5c84f327 |
| e03f8d6287 |
| 2097b73404 |
| 6d15e4d240 |
| 7896b40d91 |
| e2dc371bfe |
| df0184facf |
| a8be111e04 |
| b3806545bd |
| 2767087e27 |
| a1cf8ac90b |
| 9e842624e1 |
.gitignore (vendored): 3 lines changed
@@ -30,3 +30,6 @@ Thumbs.db
 *.swp
 *.swo
 *~
+
+# Claude
+Claude.md
README.md: 477 lines changed
@@ -1,240 +1,375 @@
 # PULSE - Pipelined Unified Logic & Server Engine

-A distributed workflow orchestration platform for managing and executing complex multi-step operations across server clusters through an intuitive web interface.
+A distributed workflow orchestration platform for managing and executing complex multi-step operations across server clusters through a retro terminal-themed web interface.

 ## Overview

-PULSE is a centralized workflow execution system designed to orchestrate operations across distributed infrastructure. It provides a powerful web-based interface for defining, managing, and executing workflows that can span multiple servers, require human interaction, and perform complex automation tasks at scale.
+PULSE is a centralized workflow execution system designed to orchestrate operations across distributed infrastructure. It provides a powerful web-based interface with a vintage CRT terminal aesthetic for defining, managing, and executing workflows that can span multiple servers, require human interaction, and perform complex automation tasks at scale.

 ### Key Features

-- **Interactive Workflow Management**: Define and execute multi-step workflows with conditional logic, user prompts, and decision points
-- **Distributed Execution**: Run commands and scripts across multiple worker nodes simultaneously
-- **High Availability Architecture**: Deploy redundant worker nodes in LXC containers with Ceph storage for fault tolerance
-- **Web-Based Control Center**: Intuitive interface for workflow selection, monitoring, and interactive input
-- **Flexible Worker Pool**: Scale horizontally by adding worker nodes as needed
-- **Real-Time Monitoring**: Track workflow progress, view logs, and receive notifications
+- **🎨 Retro Terminal Interface**: Phosphor green CRT-style interface with scanlines, glow effects, and ASCII art
+- **⚡ Quick Command Execution**: Instantly execute commands on any worker with built-in templates and command history
+- **📊 Real-Time Worker Monitoring**: Live system metrics including CPU, memory, load average, and active tasks
+- **🔄 Interactive Workflow Management**: Define and execute multi-step workflows with conditional logic and user prompts
+- **🌐 Distributed Execution**: Run commands across multiple worker nodes simultaneously via WebSocket
+- **📈 Execution Tracking**: Comprehensive logging with formatted output, re-run capabilities, and JSON export
+- **🔐 SSO Authentication**: Seamless integration with Authelia for enterprise authentication
+- **🧹 Auto-Cleanup**: Automatic removal of old executions with configurable retention policies
+- **🔔 Terminal Notifications**: Audio beeps and visual toasts for command completion events
 ## Architecture

 PULSE consists of two core components:

 ### PULSE Server
 **Location:** `10.10.10.65` (LXC Container ID: 122)
 **Directory:** `/opt/pulse-server`

 The central orchestration hub that:
-- Hosts the web interface for workflow management
+- Hosts the retro terminal web interface
 - Manages workflow definitions and execution state
-- Coordinates task distribution to worker nodes
-- Handles user interactions and input collection
+- Coordinates task distribution to worker nodes via WebSocket
+- Handles user interactions through Authelia SSO
 - Provides real-time status updates and logging
+- Stores all data in MariaDB database

 **Technology Stack:**
 - Node.js 20.x
 - Express.js (web framework)
 - WebSocket (ws package) for real-time bidirectional communication
 - MySQL2 (MariaDB driver)
 - Authelia SSO integration

 ### PULSE Worker
 **Example:** `10.10.10.151` (LXC Container ID: 153, hostname: pulse-worker-01)
 **Directory:** `/opt/pulse-worker`

 Lightweight execution agents that:
-- Connect to the PULSE server and await task assignments
-- Execute commands, scripts, and code on target infrastructure
-- Report execution status and results back to the server
-- Support multiple concurrent workflow executions
-- Automatically reconnect and resume on failure
+- Connect to PULSE server via WebSocket with heartbeat monitoring
+- Execute shell commands and report results in real-time
+- Provide system metrics (CPU, memory, load, uptime)
+- Support concurrent task execution with configurable limits
+- Automatically reconnect on connection loss

 **Technology Stack:**
 - Node.js 20.x
 - WebSocket client
 - Child process execution
 - System metrics collection
 ```
-          ┌─────────────────────┐
-          │    PULSE Server     │
-          │   (Web Interface)   │
-          └──────────┬──────────┘
-                     │
-       ┌──────┴───────┬──────────────┬──────────────┐
-       │              │              │              │
-   ┌───▼────┐     ┌───▼────┐     ┌───▼────┐     ┌───▼────┐
-   │ Worker │     │ Worker │     │ Worker │     │ Worker │
-   │ Node 1 │     │ Node 2 │     │ Node 3 │     │ Node N │
-   └────────┘     └────────┘     └────────┘     └────────┘
-          LXC Containers in Proxmox with Ceph
+┌─────────────────────────────────┐
+│   PULSE Server (10.10.10.65)    │
+│  Terminal Web Interface + API   │
+│  ┌───────────┐   ┌──────────┐   │
+│  │  MariaDB  │   │ Authelia │   │
+│  │ Database  │   │   SSO    │   │
+│  └───────────┘   └──────────┘   │
+└────────────┬────────────────────┘
+             │ WebSocket
+    ┌────────┴────────┬───────────┐
+    │                 │           │
+┌───▼────────┐    ┌───▼────┐  ┌──▼─────┐
+│  Worker 1  │    │Worker 2│  │Worker N│
+│10.10.10.151│    │  ...   │  │  ...   │
+└────────────┘    └────────┘  └────────┘
+   LXC Containers in Proxmox with Ceph
 ```
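The server and workers in the diagram exchange typed JSON messages over that WebSocket channel. The message types below (`worker_connect`, `pong`, `command_result`, `workflow_result`, `execute_command`) come from the server.js diff later in this comparison; the `makeMessage` helper itself is only an illustrative sketch, not part of the codebase.

```javascript
// Sketch of the JSON envelopes exchanged over the PULSE WebSocket channel.
// The type names are taken from server.js; this helper is illustrative.
function makeMessage(type, fields) {
  const known = ['worker_connect', 'pong', 'command_result', 'workflow_result', 'execute_command'];
  if (!known.includes(type)) throw new Error(`unknown message type: ${type}`);
  // Every envelope carries its type plus a timestamp and type-specific fields.
  return JSON.stringify({ type, timestamp: new Date().toISOString(), ...fields });
}

// Example: what a worker might send right after opening its socket.
const hello = makeMessage('worker_connect', {
  worker_id: 'runtime-1234',        // hypothetical runtime ID
  worker_name: 'pulse-worker-01',
});
```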
-## Deployment
+## Installation

 ### Prerequisites

-- **Proxmox VE Cluster**: Hypervisor environment for container deployment
-- **Ceph Storage**: Distributed storage backend for high availability
-- **LXC Support**: Container runtime for worker node deployment
-- **Network Connectivity**: Communication between server and workers
+- **Node.js 20.x** or higher
+- **MariaDB 10.x** or higher
+- **Authelia** configured for SSO (optional but recommended)
+- **Network Connectivity** between server and workers

-### Installation
+### PULSE Server Setup

-#### PULSE Server
 ```bash
-# Clone the repository
-git clone https://github.com/yourusername/pulse.git
-cd pulse
+# Clone repository
+cd /opt
+git clone <your-repo-url> pulse-server
+cd pulse-server

 # Install dependencies
-npm install  # or pip install -r requirements.txt
+npm install

-# Configure server settings
-cp config.example.yml config.yml
-nano config.yml
+# Create .env file with configuration
+cat > .env << EOF
+# Server Configuration
+PORT=8080
+SECRET_KEY=your-secret-key-here

-# Start the PULSE server
-npm start  # or python server.py
+# MariaDB Configuration
+DB_HOST=10.10.10.50
+DB_PORT=3306
+DB_NAME=pulse
+DB_USER=pulse_user
+DB_PASSWORD=your-db-password
+
+# Worker API Key (for worker authentication)
+WORKER_API_KEY=your-worker-api-key
+
+# Auto-cleanup configuration (optional)
+EXECUTION_RETENTION_DAYS=30
+EOF
+
+# Create systemd service
+cat > /etc/systemd/system/pulse.service << EOF
+[Unit]
+Description=PULSE Workflow Orchestration Server
+After=network.target
+
+[Service]
+Type=simple
+User=root
+WorkingDirectory=/opt/pulse-server
+ExecStart=/usr/bin/node server.js
+Restart=always
+RestartSec=10
+
+[Install]
+WantedBy=multi-user.target
+EOF
+
+# Start service
+systemctl daemon-reload
+systemctl enable pulse.service
+systemctl start pulse.service
 ```
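`EXECUTION_RETENTION_DAYS` is read with `parseInt(...) || 30` (per the cleanup code in the server.js diff further down), so unset, empty, or non-numeric values fall back to the 30-day default. A standalone sketch of that parsing rule:

```javascript
// Mirrors the server's retention parsing: parseInt(...) || 30.
// Note the quirk: a value of "0" parses to 0, which is falsy, so it
// also falls back to the 30-day default rather than disabling cleanup.
function retentionDays(env) {
  return parseInt(env.EXECUTION_RETENTION_DAYS) || 30;
}
```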
-#### PULSE Worker
+### PULSE Worker Setup

 ```bash
-# On each worker node (LXC container)
+# On each worker node
 cd /opt
 git clone <your-repo-url> pulse-worker
 cd pulse-worker

 # Install dependencies
-npm install  # or pip install -r requirements.txt
+npm install

-# Configure worker connection
-cp worker-config.example.yml worker-config.yml
-nano worker-config.yml
+# Create .env file
+cat > .env << EOF
+# Worker Configuration
+WORKER_NAME=pulse-worker-01
+PULSE_SERVER=http://10.10.10.65:8080
+PULSE_WS=ws://10.10.10.65:8080
+WORKER_API_KEY=your-worker-api-key

-# Start the worker daemon
-npm start  # or python worker.py
-```
+# Performance Settings
+HEARTBEAT_INTERVAL=30
+MAX_CONCURRENT_TASKS=5
+EOF

-### High Availability Setup
+# Create systemd service
+cat > /etc/systemd/system/pulse-worker.service << EOF
+[Unit]
+Description=PULSE Worker Node
+After=network.target

-Deploy multiple worker nodes across Proxmox hosts:
-```bash
-# Create LXC template
-pct create 1000 local:vztmpl/ubuntu-22.04-standard_amd64.tar.zst \
-  --rootfs ceph-pool:8 \
-  --memory 2048 \
-  --cores 2 \
-  --net0 name=eth0,bridge=vmbr0,ip=dhcp
+[Service]
+Type=simple
+User=root
+WorkingDirectory=/opt/pulse-worker
+ExecStart=/usr/bin/node worker.js
+Restart=always
+RestartSec=10

-# Clone for additional workers
-pct clone 1000 1001 --full --storage ceph-pool
-pct clone 1000 1002 --full --storage ceph-pool
-pct clone 1000 1003 --full --storage ceph-pool
+[Install]
+WantedBy=multi-user.target
+EOF

-# Start all workers
-for i in {1000..1003}; do pct start $i; done
+# Start service
+systemctl daemon-reload
+systemctl enable pulse-worker.service
+systemctl start pulse-worker.service
 ```
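`MAX_CONCURRENT_TASKS` caps how many tasks a worker runs in parallel. The worker source is not part of this diff, so the gate below is a sketch of assumed semantics: accept a new task only while the active count is below the configured limit (default 5, matching the `.env` above).

```javascript
// Illustrative concurrency gate for MAX_CONCURRENT_TASKS.
// Assumed semantics; worker.js itself is not shown in this comparison.
function canAcceptTask(activeTasks, env) {
  const max = parseInt(env.MAX_CONCURRENT_TASKS) || 5; // default 5
  return activeTasks < max;
}
```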
 ## Usage

-### Creating a Workflow
+### Quick Command Execution

-1. Access the PULSE web interface at `http://your-server:8080`
-2. Navigate to **Workflows** → **Create New**
-3. Define workflow steps using the visual editor or YAML syntax
-4. Specify execution targets (specific nodes, groups, or all workers)
-5. Add interactive prompts where user input is required
-6. Save and activate the workflow
+1. Access PULSE at `http://your-server:8080`
+2. Navigate to **⚡ Quick Command** tab
+3. Select a worker from the dropdown
+4. Use **Templates** for pre-built commands or **History** for recent commands
+5. Enter your command and click **Execute**
+6. View results in the **Executions** tab

-### Example Workflow
-```yaml
-name: "System Update and Reboot"
-description: "Update all servers in the cluster with user confirmation"
-steps:
-  - name: "Check Current Versions"
-    type: "execute"
-    targets: ["all"]
-    command: "apt list --upgradable"
-
-  - name: "User Approval"
-    type: "prompt"
-    message: "Review available updates. Proceed with installation?"
-    options: ["Yes", "No", "Cancel"]
-
-  - name: "Install Updates"
-    type: "execute"
-    targets: ["all"]
-    command: "apt-get update && apt-get upgrade -y"
-    condition: "prompt_response == 'Yes'"
-
-  - name: "Reboot Confirmation"
-    type: "prompt"
-    message: "Updates complete. Reboot all servers?"
-    options: ["Yes", "No"]
-
-  - name: "Rolling Reboot"
-    type: "execute"
-    targets: ["all"]
-    command: "reboot"
-    strategy: "rolling"
-    condition: "prompt_response == 'Yes'"
-```
+**Built-in Command Templates:**
+- System Info: `uname -a`
+- Disk Usage: `df -h`
+- Memory Usage: `free -h`
+- CPU Info: `lscpu`
+- Running Processes: `ps aux --sort=-%mem | head -20`
+- Network Interfaces: `ip addr show`
+- Docker Containers: `docker ps -a`
+- System Logs: `tail -n 50 /var/log/syslog`
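The template list added above maps labels to shell one-liners. The label-to-command pairs are taken verbatim from the README; the `Map`-based lookup itself is an assumed sketch, not the UI's actual code.

```javascript
// Template labels → shell commands, as listed in the README.
// The lookup structure is illustrative only.
const COMMAND_TEMPLATES = new Map([
  ['System Info', 'uname -a'],
  ['Disk Usage', 'df -h'],
  ['Memory Usage', 'free -h'],
  ['CPU Info', 'lscpu'],
  ['Running Processes', 'ps aux --sort=-%mem | head -20'],
  ['Network Interfaces', 'ip addr show'],
  ['Docker Containers', 'docker ps -a'],
  ['System Logs', 'tail -n 50 /var/log/syslog'],
]);

function templateCommand(label) {
  const cmd = COMMAND_TEMPLATES.get(label);
  if (cmd === undefined) throw new Error(`no template named "${label}"`);
  return cmd;
}
```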
-### Running a Workflow
+### Worker Monitoring

-1. Select a workflow from the dashboard
-2. Click **Execute**
-3. Monitor progress in real-time
-4. Respond to interactive prompts as they appear
-5. View detailed logs for each execution step
+The **Workers** tab displays real-time metrics for each worker:
+- System information (OS, architecture, CPU cores)
+- Memory usage (used/total with percentage)
+- Load averages (1m, 5m, 15m)
+- System uptime
+- Active tasks vs. maximum concurrent capacity

-## Configuration
+### Execution Management

-### Server Configuration (`config.yml`)
-```yaml
-server:
-  host: "0.0.0.0"
-  port: 8080
-  secret_key: "your-secret-key"
-
-database:
-  type: "postgresql"
-  host: "localhost"
-  port: 5432
-  name: "pulse"
-
-workers:
-  heartbeat_interval: 30
-  timeout: 300
-  max_concurrent_tasks: 10
-
-security:
-  enable_authentication: true
-  require_approval: true
-```
+- **View Details**: Click any execution to see formatted logs with timestamps, status, and output
+- **Re-run Command**: Click "Re-run" button in execution details to repeat a command
+- **Download Logs**: Export execution data as JSON for auditing
+- **Clear Completed**: Bulk delete finished executions
+- **Auto-Cleanup**: Executions older than 30 days are automatically removed

-### Worker Configuration (`worker-config.yml`)
-```yaml
-worker:
-  name: "worker-01"
-  server_url: "http://pulse-server:8080"
-  api_key: "worker-api-key"
-
-resources:
-  max_cpu_percent: 80
-  max_memory_mb: 1024
-
-executor:
-  shell: "/bin/bash"
-  working_directory: "/tmp/pulse"
-  timeout: 3600
-```
+### Workflow Creation (Future Feature)
+
+1. Navigate to **Workflows** → **Create New**
+2. Define workflow steps using JSON syntax
+3. Specify target workers
+4. Add interactive prompts where needed
+5. Save and execute
 ## Features in Detail

-### Interactive Workflows
-- Pause execution to collect user input via web forms
-- Display intermediate results for review
-- Conditional branching based on user decisions
-- Multi-choice prompts with validation
+### Terminal Aesthetic
+- Phosphor green (#00ff41) on black (#0a0a0a) color scheme
+- CRT scanline animation effect
+- Text glow and shadow effects
+- ASCII box-drawing characters for borders
+- Boot sequence animation on first load
+- Hover effects with smooth transitions

-### Mass Execution
-- Run commands across all workers simultaneously
-- Target specific node groups or individual servers
-- Rolling execution for zero-downtime updates
-- Parallel and sequential execution strategies
+### Real-Time Communication
+- WebSocket-based bidirectional communication
+- Instant command result notifications
+- Live worker status updates
+- Terminal beep sounds for events
+- Toast notifications with visual feedback

-### Monitoring & Logging
-- Real-time workflow execution dashboard
-- Detailed per-step logging and output capture
-- Historical execution records and analytics
-- Alert notifications for failures or completion
+### Execution Tracking
+- Formatted log display (not raw JSON)
+- Color-coded success/failure indicators
+- Timestamp and duration for each step
+- Scrollable output with syntax highlighting
+- Persistent history with pagination
+- Load More button for large execution lists

 ### Security
-- Role-based access control (RBAC)
+- Authelia SSO integration for user authentication
 - API key authentication for workers
-- Workflow approval requirements
-- Audit logging for all actions
+- User session management
+- Admin-only operations (worker deletion, workflow management)
+- Audit logging for all executions

+### Performance
+- Automatic cleanup of old executions (configurable retention)
+- Pagination for large execution lists (50 at a time)
+- Efficient WebSocket connection pooling
+- Worker heartbeat monitoring
+- Database connection pooling
+## Configuration
+
+### Environment Variables
+
+**Server (.env):**
+```bash
+PORT=8080                       # Server port
+SECRET_KEY=<random-string>      # Session secret
+DB_HOST=10.10.10.50             # MariaDB host
+DB_PORT=3306                    # MariaDB port
+DB_NAME=pulse                   # Database name
+DB_USER=pulse_user              # Database user
+DB_PASSWORD=<password>          # Database password
+WORKER_API_KEY=<api-key>        # Worker authentication key
+EXECUTION_RETENTION_DAYS=30     # Auto-cleanup retention (default: 30)
+```
+
+**Worker (.env):**
+```bash
+WORKER_NAME=pulse-worker-01            # Unique worker name
+PULSE_SERVER=http://10.10.10.65:8080   # Server HTTP URL
+PULSE_WS=ws://10.10.10.65:8080         # Server WebSocket URL
+WORKER_API_KEY=<api-key>               # Must match server key
+HEARTBEAT_INTERVAL=30                  # Heartbeat seconds (default: 30)
+MAX_CONCURRENT_TASKS=5                 # Max parallel tasks (default: 5)
+```
+
+## Database Schema
+
+PULSE uses MariaDB with the following tables:
+
+- **users**: User accounts from Authelia SSO
+- **workers**: Worker node registry with metadata
+- **workflows**: Workflow definitions (JSON)
+- **executions**: Execution history with logs
+
+See [Claude.md](Claude.md) for complete schema details.
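The `executions` table stores its logs as a JSON array of step entries. The field names below (`step`, `action`, `worker_id`, `command`, `timestamp`) match what the server.js diff in this comparison writes for a quick command; the builder function is illustrative, since the server inlines the object literal rather than using a helper.

```javascript
// Shape of one execution-log entry as written for a quick command.
// Field names follow server.js; the helper itself is a sketch.
function quickCommandLogEntry(workerId, command) {
  return {
    step: 'quick_command',
    action: 'command_sent',
    worker_id: workerId,
    command: command,
    timestamp: new Date().toISOString(),
  };
}
```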
+## Troubleshooting
+
+### Worker Not Connecting
+```bash
+# Check worker service status
+systemctl status pulse-worker
+
+# Check worker logs
+journalctl -u pulse-worker -n 50 -f
+
+# Verify API key matches server
+grep WORKER_API_KEY /opt/pulse-worker/.env
+```
+
+### Commands Stuck in "Running"
+This was fixed in recent updates; if it recurs, restart the server:
+```bash
+systemctl restart pulse.service
+```
+
+### Clear All Executions
+Use the database directly if needed:
+```bash
+mysql -h 10.10.10.50 -u pulse_user -p pulse
+> DELETE FROM executions WHERE status IN ('completed', 'failed');
+```
+## Development
+
+### Recent Updates
+
+**Phase 1-6 Improvements:**
+- Formatted log display with color-coding
+- Worker system metrics monitoring
+- Command templates and history
+- Re-run and download execution features
+- Auto-cleanup and pagination
+- Terminal aesthetic refinements
+- Audio notifications and visual toasts
+
+See git history for detailed changelog.
+
+### Future Enhancements
+- Full workflow system implementation
+- Multi-worker command execution
+- Scheduled/cron job support
+- Execution search and filtering
+- Dark/light theme toggle
+- Mobile-responsive design
+- REST API documentation
+- Webhook integrations

 ## License

 MIT License - See LICENSE file for details

 ---

-**PULSE** - Orchestrating your infrastructure, one heartbeat at a time.
+**PULSE** - Orchestrating your infrastructure, one heartbeat at a time. ⚡
+
+Built with retro terminal aesthetics 🖥️ | Powered by WebSockets 🔌 | Secured by Authelia 🔐
public/index.html: 1565 lines changed (diff suppressed because it is too large)
server.js: 227 lines changed
@@ -46,6 +46,8 @@ async function initDatabase() {
     )
   `);

+  // Database schema is managed manually - migrations removed after direct database fixes
+
   await connection.query(`
     CREATE TABLE IF NOT EXISTS workers (
       id VARCHAR(36) PRIMARY KEY,
@@ -75,13 +77,12 @@ async function initDatabase() {
   await connection.query(`
     CREATE TABLE IF NOT EXISTS executions (
       id VARCHAR(36) PRIMARY KEY,
-      workflow_id VARCHAR(36) NOT NULL,
+      workflow_id VARCHAR(36) NULL,
       status VARCHAR(50) NOT NULL,
      started_by VARCHAR(255),
       started_at TIMESTAMP NULL,
       completed_at TIMESTAMP NULL,
       logs JSON,
-      FOREIGN KEY (workflow_id) REFERENCES workflows(id) ON DELETE CASCADE,
       INDEX idx_workflow (workflow_id),
       INDEX idx_status (status),
       INDEX idx_started (started_at)
@@ -97,11 +98,170 @@ async function initDatabase() {
   }
 }

+// Auto-cleanup old executions (runs daily)
+async function cleanupOldExecutions() {
+  try {
+    const retentionDays = parseInt(process.env.EXECUTION_RETENTION_DAYS) || 30;
+    const [result] = await pool.query(
+      `DELETE FROM executions
+       WHERE status IN ('completed', 'failed')
+       AND started_at < DATE_SUB(NOW(), INTERVAL ? DAY)`,
+      [retentionDays]
+    );
+    console.log(`[Cleanup] Removed ${result.affectedRows} executions older than ${retentionDays} days`);
+  } catch (error) {
+    console.error('[Cleanup] Error removing old executions:', error);
+  }
+}
+
+// Run cleanup every 24 hours (interval starts at server boot, not a fixed clock time)
+setInterval(cleanupOldExecutions, 24 * 60 * 60 * 1000);
+// Run cleanup on startup
+cleanupOldExecutions();
 // WebSocket connections
 const clients = new Set();
+const workers = new Map(); // Map worker_id -> WebSocket connection

 wss.on('connection', (ws) => {
   clients.add(ws);
-  ws.on('close', () => clients.delete(ws));
+
+  // Handle incoming messages from workers
+  ws.on('message', async (data) => {
+    try {
+      const message = JSON.parse(data.toString());
+      console.log('WebSocket message received:', message.type);
+
+      if (message.type === 'command_result') {
+        // Handle command result from worker
+        const { execution_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
+
+        // Add result to execution logs
+        await addExecutionLog(execution_id, {
+          step: 'command_execution',
+          action: 'command_result',
+          worker_id: worker_id,
+          success: success,
+          stdout: stdout,
+          stderr: stderr,
+          duration: duration,
+          timestamp: timestamp || new Date().toISOString()
+        });
+
+        // Update execution status to completed or failed
+        const finalStatus = success ? 'completed' : 'failed';
+        await updateExecutionStatus(execution_id, finalStatus);
+
+        // Broadcast result to all connected clients
+        broadcast({
+          type: 'command_result',
+          execution_id: execution_id,
+          worker_id: worker_id,
+          success: success,
+          stdout: stdout,
+          stderr: stderr
+        });
+
+        console.log(`Command result received for execution ${execution_id}: ${finalStatus}`);
+      }
+
+      if (message.type === 'workflow_result') {
+        // Handle workflow result from worker
+        const { execution_id, worker_id, success, message: resultMessage, timestamp } = message;
+
+        // Add final result to logs
+        await addExecutionLog(execution_id, {
+          step: 'workflow_completion',
+          action: 'workflow_result',
+          worker_id: worker_id,
+          success: success,
+          message: resultMessage,
+          timestamp: timestamp || new Date().toISOString()
+        });
+
+        // Update execution status
+        const finalStatus = success ? 'completed' : 'failed';
+        await updateExecutionStatus(execution_id, finalStatus);
+
+        // Broadcast completion to all clients
+        broadcast({
+          type: 'workflow_result',
+          execution_id: execution_id,
+          status: finalStatus,
+          success: success,
+          message: resultMessage
+        });
+
+        console.log(`Workflow result received for execution ${execution_id}: ${finalStatus}`);
+      }
+
+      if (message.type === 'worker_connect') {
+        // Handle worker connection
+        const { worker_id, worker_name } = message;
+        console.log(`Worker connected: ${worker_name} (${worker_id})`);
+
+        // Find the database worker ID by name
+        const [dbWorkers] = await pool.query(
+          'SELECT id FROM workers WHERE name = ?',
+          [worker_name]
+        );
+
+        if (dbWorkers.length > 0) {
+          const dbWorkerId = dbWorkers[0].id;
+
+          // Store worker WebSocket connection using BOTH IDs
+          workers.set(worker_id, ws);   // Runtime ID
+          workers.set(dbWorkerId, ws);  // Database ID
+
+          // Store mapping for cleanup
+          ws.workerId = worker_id;
+          ws.dbWorkerId = dbWorkerId;
+
+          console.log(`Mapped worker: runtime_id=${worker_id}, db_id=${dbWorkerId}, name=${worker_name}`);
+
+          // Update worker status to online
+          await pool.query(
+            `UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`,
+            [dbWorkerId]
+          );
+
+          // Broadcast worker status update with database ID
+          broadcast({
+            type: 'worker_update',
+            worker_id: dbWorkerId,
+            status: 'online'
+          });
+        } else {
+          console.log(`Worker ${worker_name} not found in database, will be created on heartbeat`);
+        }
+      }
+      if (message.type === 'pong') {
+        // Handle worker pong response
+        const { worker_id } = message;
+        await pool.query(
+          `UPDATE workers SET last_heartbeat=NOW() WHERE id=?`,
+          [worker_id]
+        );
+      }
+
+    } catch (error) {
+      console.error('WebSocket message error:', error);
+    }
+  });
+
+  ws.on('close', () => {
+    clients.delete(ws);
+    // Remove worker from workers map when disconnected (both runtime and db IDs)
+    if (ws.workerId) {
+      workers.delete(ws.workerId);
+      console.log(`Worker ${ws.workerId} (runtime ID) disconnected`);
+    }
+    if (ws.dbWorkerId) {
+      workers.delete(ws.dbWorkerId);
+      console.log(`Worker ${ws.dbWorkerId} (database ID) disconnected`);
+    }
+  });
 });

 // Broadcast to all connected clients
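The dual-ID bookkeeping above (one socket registered under both its runtime ID and its database ID, and both keys removed on close) can be modeled in isolation. The plain object below stands in for the `ws` connection; the helper functions are a sketch, since server.js inlines this logic in its handlers.

```javascript
// Minimal model of the server's worker registry: one connection keyed
// under both IDs, with both keys removed on disconnect.
function registerWorker(workers, socket, runtimeId, dbId) {
  workers.set(runtimeId, socket); // Runtime ID
  workers.set(dbId, socket);      // Database ID
  // Remember both keys on the socket so close() can clean up.
  socket.workerId = runtimeId;
  socket.dbWorkerId = dbId;
}

function unregisterWorker(workers, socket) {
  if (socket.workerId) workers.delete(socket.workerId);
  if (socket.dbWorkerId) workers.delete(socket.dbWorkerId);
}
```

Registering under both IDs lets the command endpoint look a worker up by the database ID that the REST API exposes, while worker-originated messages can still be matched by runtime ID.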
@@ -270,22 +430,35 @@ app.post('/api/executions', authenticateSSO, async (req, res) => {

 app.get('/api/executions', authenticateSSO, async (req, res) => {
   try {
+    const limit = parseInt(req.query.limit) || 50;
+    const offset = parseInt(req.query.offset) || 0;
+
     const [rows] = await pool.query(
-      'SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ORDER BY e.started_at DESC LIMIT 50'
+      'SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ORDER BY e.started_at DESC LIMIT ? OFFSET ?',
+      [limit, offset]
     );
-    res.json(rows);
+
+    // Get total count
+    const [countRows] = await pool.query('SELECT COUNT(*) as total FROM executions');
+    const total = countRows[0].total;
+
+    res.json({
+      executions: rows,
+      total: total,
+      limit: limit,
+      offset: offset,
+      hasMore: offset + rows.length < total
+    });
   } catch (error) {
     res.status(500).json({ error: error.message });
   }
 });

-app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
+app.delete('/api/executions/:id', authenticateSSO, async (req, res) => {
   try {
     const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]);
     if (rows.length === 0) {
       return res.status(404).json({ error: 'Not found' });
     }
-    res.json(rows[0]);
+    await pool.query('DELETE FROM executions WHERE id = ?', [req.params.id]);
+    broadcast({ type: 'execution_deleted', execution_id: req.params.id });
+    res.json({ success: true });
   } catch (error) {
     res.status(500).json({ error: error.message });
   }
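The `hasMore` flag returned by `GET /api/executions` follows directly from offset, page size, and total: it is true while `offset + rows.length < total`. A standalone sketch of that arithmetic, with the returned row count derived the way `LIMIT ? OFFSET ?` would produce it:

```javascript
// Pagination math behind the executions endpoint: LIMIT/OFFSET yields
// min(limit, total - offset) rows (never negative), and hasMore is true
// while offset + returned rows < total.
function pageMeta(total, limit, offset) {
  const returned = Math.max(0, Math.min(limit, total - offset));
  return { total, limit, offset, hasMore: offset + returned < total };
}
```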
@@ -666,22 +839,38 @@ app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
   try {
     const { command } = req.body;
     const executionId = generateUUID();
+    const workerId = req.params.id;

-    // Send command via WebSocket
+    // Create execution record in database
+    await pool.query(
+      'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
+      [executionId, null, 'running', req.user.username, JSON.stringify([{
+        step: 'quick_command',
+        action: 'command_sent',
+        worker_id: workerId,
+        command: command,
+        timestamp: new Date().toISOString()
+      }])]
+    );
+
+    // Send command via WebSocket to specific worker
     const commandMessage = {
       type: 'execute_command',
       execution_id: executionId,
       command: command,
-      worker_id: req.params.id,
+      worker_id: workerId,
       timeout: 60000
     };

-    clients.forEach(client => {
-      if (client.readyState === WebSocket.OPEN) {
-        client.send(JSON.stringify(commandMessage));
-      }
-    });
+    const workerWs = workers.get(workerId);
+    if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
+      return res.status(400).json({ error: 'Worker not connected' });
+    }
+
+    workerWs.send(JSON.stringify(commandMessage));
+    console.log(`Command sent to worker ${workerId}: ${command}`);

     broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
     res.json({ success: true, execution_id: executionId });
   } catch (error) {
     res.status(500).json({ error: error.message });