Compare commits
30 Commits
1f5c84f327
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| e7707d7edb | |||
| ea7a2d82e6 | |||
| 8224f3b6a4 | |||
| 9b31b7619f | |||
| 06eb2d2593 | |||
| aaeb59a8e2 | |||
| b85bd58c4b | |||
| 018752813a | |||
| 4511fac486 | |||
| 4e17fdbf8c | |||
| 5c41afed85 | |||
| 4baecc54d3 | |||
| 661c83a578 | |||
| c6e3e5704e | |||
| 9f972182b2 | |||
| fff50f19da | |||
| 8152a827e6 | |||
| bc3524e163 | |||
| f8ec651e73 | |||
| 7656f4a151 | |||
| e13fe9d22f | |||
| 8bda9672d6 | |||
| 4974730dc8 | |||
| df581e85a8 | |||
| e4574627f1 | |||
| 1d994dc8d6 | |||
| 02ed71e3e0 | |||
| cff058818e | |||
| 05c304f2ed | |||
| 20cff59cee |
477
README.md
477
README.md
@@ -1,240 +1,375 @@
|
|||||||
# PULSE - Pipelined Unified Logic & Server Engine
|
# PULSE - Pipelined Unified Logic & Server Engine
|
||||||
|
|
||||||
A distributed workflow orchestration platform for managing and executing complex multi-step operations across server clusters through an intuitive web interface.
|
A distributed workflow orchestration platform for managing and executing complex multi-step operations across server clusters through a retro terminal-themed web interface.
|
||||||
|
|
||||||
## Overview
|
## Overview
|
||||||
|
|
||||||
PULSE is a centralized workflow execution system designed to orchestrate operations across distributed infrastructure. It provides a powerful web-based interface for defining, managing, and executing workflows that can span multiple servers, require human interaction, and perform complex automation tasks at scale.
|
PULSE is a centralized workflow execution system designed to orchestrate operations across distributed infrastructure. It provides a powerful web-based interface with a vintage CRT terminal aesthetic for defining, managing, and executing workflows that can span multiple servers, require human interaction, and perform complex automation tasks at scale.
|
||||||
|
|
||||||
### Key Features
|
### Key Features
|
||||||
|
|
||||||
- **Interactive Workflow Management**: Define and execute multi-step workflows with conditional logic, user prompts, and decision points
|
- **🎨 Retro Terminal Interface**: Phosphor green CRT-style interface with scanlines, glow effects, and ASCII art
|
||||||
- **Distributed Execution**: Run commands and scripts across multiple worker nodes simultaneously
|
- **⚡ Quick Command Execution**: Instantly execute commands on any worker with built-in templates and command history
|
||||||
- **High Availability Architecture**: Deploy redundant worker nodes in LXC containers with Ceph storage for fault tolerance
|
- **📊 Real-Time Worker Monitoring**: Live system metrics including CPU, memory, load average, and active tasks
|
||||||
- **Web-Based Control Center**: Intuitive interface for workflow selection, monitoring, and interactive input
|
- **🔄 Interactive Workflow Management**: Define and execute multi-step workflows with conditional logic and user prompts
|
||||||
- **Flexible Worker Pool**: Scale horizontally by adding worker nodes as needed
|
- **🌐 Distributed Execution**: Run commands across multiple worker nodes simultaneously via WebSocket
|
||||||
- **Real-Time Monitoring**: Track workflow progress, view logs, and receive notifications
|
- **📈 Execution Tracking**: Comprehensive logging with formatted output, re-run capabilities, and JSON export
|
||||||
|
- **🔐 SSO Authentication**: Seamless integration with Authelia for enterprise authentication
|
||||||
|
- **🧹 Auto-Cleanup**: Automatic removal of old executions with configurable retention policies
|
||||||
|
- **🔔 Terminal Notifications**: Audio beeps and visual toasts for command completion events
|
||||||
|
|
||||||
## Architecture
|
## Architecture
|
||||||
|
|
||||||
PULSE consists of two core components:
|
PULSE consists of two core components:
|
||||||
|
|
||||||
### PULSE Server
|
### PULSE Server
|
||||||
|
**Location:** `10.10.10.65` (LXC Container ID: 122)
|
||||||
|
**Directory:** `/opt/pulse-server`
|
||||||
|
|
||||||
The central orchestration hub that:
|
The central orchestration hub that:
|
||||||
- Hosts the web interface for workflow management
|
- Hosts the retro terminal web interface
|
||||||
- Manages workflow definitions and execution state
|
- Manages workflow definitions and execution state
|
||||||
- Coordinates task distribution to worker nodes
|
- Coordinates task distribution to worker nodes via WebSocket
|
||||||
- Handles user interactions and input collection
|
- Handles user interactions through Authelia SSO
|
||||||
- Provides real-time status updates and logging
|
- Provides real-time status updates and logging
|
||||||
|
- Stores all data in MariaDB database
|
||||||
|
|
||||||
|
**Technology Stack:**
|
||||||
|
- Node.js 20.x
|
||||||
|
- Express.js (web framework)
|
||||||
|
- WebSocket (ws package) for real-time bidirectional communication
|
||||||
|
- MySQL2 (MariaDB driver)
|
||||||
|
- Authelia SSO integration
|
||||||
|
|
||||||
### PULSE Worker
|
### PULSE Worker
|
||||||
|
**Example:** `10.10.10.151` (LXC Container ID: 153, hostname: pulse-worker-01)
|
||||||
|
**Directory:** `/opt/pulse-worker`
|
||||||
|
|
||||||
Lightweight execution agents that:
|
Lightweight execution agents that:
|
||||||
- Connect to the PULSE server and await task assignments
|
- Connect to PULSE server via WebSocket with heartbeat monitoring
|
||||||
- Execute commands, scripts, and code on target infrastructure
|
- Execute shell commands and report results in real-time
|
||||||
- Report execution status and results back to the server
|
- Provide system metrics (CPU, memory, load, uptime)
|
||||||
- Support multiple concurrent workflow executions
|
- Support concurrent task execution with configurable limits
|
||||||
- Automatically reconnect and resume on failure
|
- Automatically reconnect on connection loss
|
||||||
|
|
||||||
|
**Technology Stack:**
|
||||||
|
- Node.js 20.x
|
||||||
|
- WebSocket client
|
||||||
|
- Child process execution
|
||||||
|
- System metrics collection
|
||||||
|
|
||||||
```
|
```
|
||||||
┌─────────────────────┐
|
┌─────────────────────────────────┐
|
||||||
│ PULSE Server │
|
│ PULSE Server (10.10.10.65) │
|
||||||
│ (Web Interface) │
|
│ Terminal Web Interface + API │
|
||||||
└──────────┬──────────┘
|
│ ┌───────────┐ ┌──────────┐ │
|
||||||
│
|
│ │ MariaDB │ │ Authelia │ │
|
||||||
┌──────┴───────┬──────────────┬──────────────┐
|
│ │ Database │ │ SSO │ │
|
||||||
│ │ │ │
|
│ └───────────┘ └──────────┘ │
|
||||||
┌───▼────┐ ┌───▼────┐ ┌───▼────┐ ┌───▼────┐
|
└────────────┬────────────────────┘
|
||||||
│ Worker │ │ Worker │ │ Worker │ │ Worker │
|
│ WebSocket
|
||||||
│ Node 1 │ │ Node 2 │ │ Node 3 │ │ Node N │
|
┌────────┴────────┬───────────┐
|
||||||
└────────┘ └────────┘ └────────┘ └────────┘
|
│ │ │
|
||||||
LXC Containers in Proxmox with Ceph
|
┌───▼────────┐ ┌───▼────┐ ┌──▼─────┐
|
||||||
|
│ Worker 1 │ │Worker 2│ │Worker N│
|
||||||
|
│10.10.10.151│ │ ... │ │ ... │
|
||||||
|
└────────────┘ └────────┘ └────────┘
|
||||||
|
LXC Containers in Proxmox with Ceph
|
||||||
```
|
```
|
||||||
|
|
||||||
## Deployment
|
## Installation
|
||||||
|
|
||||||
### Prerequisites
|
### Prerequisites
|
||||||
|
|
||||||
- **Proxmox VE Cluster**: Hypervisor environment for container deployment
|
- **Node.js 20.x** or higher
|
||||||
- **Ceph Storage**: Distributed storage backend for high availability
|
- **MariaDB 10.x** or higher
|
||||||
- **LXC Support**: Container runtime for worker node deployment
|
- **Authelia** configured for SSO (optional but recommended)
|
||||||
- **Network Connectivity**: Communication between server and workers
|
- **Network Connectivity** between server and workers
|
||||||
|
|
||||||
### Installation
|
### PULSE Server Setup
|
||||||
|
|
||||||
#### PULSE Server
|
|
||||||
```bash
|
```bash
|
||||||
# Clone the repository
|
# Clone repository
|
||||||
git clone https://github.com/yourusername/pulse.git
|
cd /opt
|
||||||
cd pulse
|
git clone <your-repo-url> pulse-server
|
||||||
|
cd pulse-server
|
||||||
|
|
||||||
# Install dependencies
|
# Install dependencies
|
||||||
npm install # or pip install -r requirements.txt
|
npm install
|
||||||
|
|
||||||
# Configure server settings
|
# Create .env file with configuration
|
||||||
cp config.example.yml config.yml
|
cat > .env << EOF
|
||||||
nano config.yml
|
# Server Configuration
|
||||||
|
PORT=8080
|
||||||
|
SECRET_KEY=your-secret-key-here
|
||||||
|
|
||||||
# Start the PULSE server
|
# MariaDB Configuration
|
||||||
npm start # or python server.py
|
DB_HOST=10.10.10.50
|
||||||
|
DB_PORT=3306
|
||||||
|
DB_NAME=pulse
|
||||||
|
DB_USER=pulse_user
|
||||||
|
DB_PASSWORD=your-db-password
|
||||||
|
|
||||||
|
# Worker API Key (for worker authentication)
|
||||||
|
WORKER_API_KEY=your-worker-api-key
|
||||||
|
|
||||||
|
# Auto-cleanup configuration (optional)
|
||||||
|
EXECUTION_RETENTION_DAYS=30
|
||||||
|
EOF
|
||||||
|
|
||||||
|
# Create systemd service
|
||||||
|
cat > /etc/systemd/system/pulse.service << EOF
|
||||||
|
[Unit]
|
||||||
|
Description=PULSE Workflow Orchestration Server
|
||||||
|
After=network.target
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=simple
|
||||||
|
User=root
|
||||||
|
WorkingDirectory=/opt/pulse-server
|
||||||
|
ExecStart=/usr/bin/node server.js
|
||||||
|
Restart=always
|
||||||
|
RestartSec=10
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
||||||
|
EOF
|
||||||
|
|
||||||
|
# Start service
|
||||||
|
systemctl daemon-reload
|
||||||
|
systemctl enable pulse.service
|
||||||
|
systemctl start pulse.service
|
||||||
```
|
```
|
||||||
|
|
||||||
#### PULSE Worker
|
### PULSE Worker Setup
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# On each worker node (LXC container)
|
# On each worker node
|
||||||
|
cd /opt
|
||||||
|
git clone <your-repo-url> pulse-worker
|
||||||
cd pulse-worker
|
cd pulse-worker
|
||||||
|
|
||||||
# Install dependencies
|
# Install dependencies
|
||||||
npm install # or pip install -r requirements.txt
|
npm install
|
||||||
|
|
||||||
# Configure worker connection
|
# Create .env file
|
||||||
cp worker-config.example.yml worker-config.yml
|
cat > .env << EOF
|
||||||
nano worker-config.yml
|
# Worker Configuration
|
||||||
|
WORKER_NAME=pulse-worker-01
|
||||||
|
PULSE_SERVER=http://10.10.10.65:8080
|
||||||
|
PULSE_WS=ws://10.10.10.65:8080
|
||||||
|
WORKER_API_KEY=your-worker-api-key
|
||||||
|
|
||||||
# Start the worker daemon
|
# Performance Settings
|
||||||
npm start # or python worker.py
|
HEARTBEAT_INTERVAL=30
|
||||||
```
|
MAX_CONCURRENT_TASKS=5
|
||||||
|
EOF
|
||||||
|
|
||||||
### High Availability Setup
|
# Create systemd service
|
||||||
|
cat > /etc/systemd/system/pulse-worker.service << EOF
|
||||||
|
[Unit]
|
||||||
|
Description=PULSE Worker Node
|
||||||
|
After=network.target
|
||||||
|
|
||||||
Deploy multiple worker nodes across Proxmox hosts:
|
[Service]
|
||||||
```bash
|
Type=simple
|
||||||
# Create LXC template
|
User=root
|
||||||
pct create 1000 local:vztmpl/ubuntu-22.04-standard_amd64.tar.zst \
|
WorkingDirectory=/opt/pulse-worker
|
||||||
--rootfs ceph-pool:8 \
|
ExecStart=/usr/bin/node worker.js
|
||||||
--memory 2048 \
|
Restart=always
|
||||||
--cores 2 \
|
RestartSec=10
|
||||||
--net0 name=eth0,bridge=vmbr0,ip=dhcp
|
|
||||||
|
|
||||||
# Clone for additional workers
|
[Install]
|
||||||
pct clone 1000 1001 --full --storage ceph-pool
|
WantedBy=multi-user.target
|
||||||
pct clone 1000 1002 --full --storage ceph-pool
|
EOF
|
||||||
pct clone 1000 1003 --full --storage ceph-pool
|
|
||||||
|
|
||||||
# Start all workers
|
# Start service
|
||||||
for i in {1000..1003}; do pct start $i; done
|
systemctl daemon-reload
|
||||||
|
systemctl enable pulse-worker.service
|
||||||
|
systemctl start pulse-worker.service
|
||||||
```
|
```
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
### Creating a Workflow
|
### Quick Command Execution
|
||||||
|
|
||||||
1. Access the PULSE web interface at `http://your-server:8080`
|
1. Access PULSE at `http://your-server:8080`
|
||||||
2. Navigate to **Workflows** → **Create New**
|
2. Navigate to **⚡ Quick Command** tab
|
||||||
3. Define workflow steps using the visual editor or YAML syntax
|
3. Select a worker from the dropdown
|
||||||
4. Specify execution targets (specific nodes, groups, or all workers)
|
4. Use **Templates** for pre-built commands or **History** for recent commands
|
||||||
5. Add interactive prompts where user input is required
|
5. Enter your command and click **Execute**
|
||||||
6. Save and activate the workflow
|
6. View results in the **Executions** tab
|
||||||
|
|
||||||
### Example Workflow
|
**Built-in Command Templates:**
|
||||||
```yaml
|
- System Info: `uname -a`
|
||||||
name: "System Update and Reboot"
|
- Disk Usage: `df -h`
|
||||||
description: "Update all servers in the cluster with user confirmation"
|
- Memory Usage: `free -h`
|
||||||
steps:
|
- CPU Info: `lscpu`
|
||||||
- name: "Check Current Versions"
|
- Running Processes: `ps aux --sort=-%mem | head -20`
|
||||||
type: "execute"
|
- Network Interfaces: `ip addr show`
|
||||||
targets: ["all"]
|
- Docker Containers: `docker ps -a`
|
||||||
command: "apt list --upgradable"
|
- System Logs: `tail -n 50 /var/log/syslog`
|
||||||
|
|
||||||
- name: "User Approval"
|
|
||||||
type: "prompt"
|
|
||||||
message: "Review available updates. Proceed with installation?"
|
|
||||||
options: ["Yes", "No", "Cancel"]
|
|
||||||
|
|
||||||
- name: "Install Updates"
|
|
||||||
type: "execute"
|
|
||||||
targets: ["all"]
|
|
||||||
command: "apt-get update && apt-get upgrade -y"
|
|
||||||
condition: "prompt_response == 'Yes'"
|
|
||||||
|
|
||||||
- name: "Reboot Confirmation"
|
|
||||||
type: "prompt"
|
|
||||||
message: "Updates complete. Reboot all servers?"
|
|
||||||
options: ["Yes", "No"]
|
|
||||||
|
|
||||||
- name: "Rolling Reboot"
|
|
||||||
type: "execute"
|
|
||||||
targets: ["all"]
|
|
||||||
command: "reboot"
|
|
||||||
strategy: "rolling"
|
|
||||||
condition: "prompt_response == 'Yes'"
|
|
||||||
```
|
|
||||||
|
|
||||||
### Running a Workflow
|
### Worker Monitoring
|
||||||
|
|
||||||
1. Select a workflow from the dashboard
|
The **Workers** tab displays real-time metrics for each worker:
|
||||||
2. Click **Execute**
|
- System information (OS, architecture, CPU cores)
|
||||||
3. Monitor progress in real-time
|
- Memory usage (used/total with percentage)
|
||||||
4. Respond to interactive prompts as they appear
|
- Load averages (1m, 5m, 15m)
|
||||||
5. View detailed logs for each execution step
|
- System uptime
|
||||||
|
- Active tasks vs. maximum concurrent capacity
|
||||||
|
|
||||||
## Configuration
|
### Execution Management
|
||||||
|
|
||||||
### Server Configuration (`config.yml`)
|
- **View Details**: Click any execution to see formatted logs with timestamps, status, and output
|
||||||
```yaml
|
- **Re-run Command**: Click "Re-run" button in execution details to repeat a command
|
||||||
server:
|
- **Download Logs**: Export execution data as JSON for auditing
|
||||||
host: "0.0.0.0"
|
- **Clear Completed**: Bulk delete finished executions
|
||||||
port: 8080
|
- **Auto-Cleanup**: Executions older than 30 days are automatically removed
|
||||||
secret_key: "your-secret-key"
|
|
||||||
|
|
||||||
database:
|
### Workflow Creation (Future Feature)
|
||||||
type: "postgresql"
|
|
||||||
host: "localhost"
|
|
||||||
port: 5432
|
|
||||||
name: "pulse"
|
|
||||||
|
|
||||||
workers:
|
1. Navigate to **Workflows** → **Create New**
|
||||||
heartbeat_interval: 30
|
2. Define workflow steps using JSON syntax
|
||||||
timeout: 300
|
3. Specify target workers
|
||||||
max_concurrent_tasks: 10
|
4. Add interactive prompts where needed
|
||||||
|
5. Save and execute
|
||||||
security:
|
|
||||||
enable_authentication: true
|
|
||||||
require_approval: true
|
|
||||||
```
|
|
||||||
|
|
||||||
### Worker Configuration (`worker-config.yml`)
|
|
||||||
```yaml
|
|
||||||
worker:
|
|
||||||
name: "worker-01"
|
|
||||||
server_url: "http://pulse-server:8080"
|
|
||||||
api_key: "worker-api-key"
|
|
||||||
|
|
||||||
resources:
|
|
||||||
max_cpu_percent: 80
|
|
||||||
max_memory_mb: 1024
|
|
||||||
|
|
||||||
executor:
|
|
||||||
shell: "/bin/bash"
|
|
||||||
working_directory: "/tmp/pulse"
|
|
||||||
timeout: 3600
|
|
||||||
```
|
|
||||||
|
|
||||||
## Features in Detail
|
## Features in Detail
|
||||||
|
|
||||||
### Interactive Workflows
|
### Terminal Aesthetic
|
||||||
- Pause execution to collect user input via web forms
|
- Phosphor green (#00ff41) on black (#0a0a0a) color scheme
|
||||||
- Display intermediate results for review
|
- CRT scanline animation effect
|
||||||
- Conditional branching based on user decisions
|
- Text glow and shadow effects
|
||||||
- Multi-choice prompts with validation
|
- ASCII box-drawing characters for borders
|
||||||
|
- Boot sequence animation on first load
|
||||||
|
- Hover effects with smooth transitions
|
||||||
|
|
||||||
### Mass Execution
|
### Real-Time Communication
|
||||||
- Run commands across all workers simultaneously
|
- WebSocket-based bidirectional communication
|
||||||
- Target specific node groups or individual servers
|
- Instant command result notifications
|
||||||
- Rolling execution for zero-downtime updates
|
- Live worker status updates
|
||||||
- Parallel and sequential execution strategies
|
- Terminal beep sounds for events
|
||||||
|
- Toast notifications with visual feedback
|
||||||
|
|
||||||
### Monitoring & Logging
|
### Execution Tracking
|
||||||
- Real-time workflow execution dashboard
|
- Formatted log display (not raw JSON)
|
||||||
- Detailed per-step logging and output capture
|
- Color-coded success/failure indicators
|
||||||
- Historical execution records and analytics
|
- Timestamp and duration for each step
|
||||||
- Alert notifications for failures or completion
|
- Scrollable output with syntax highlighting
|
||||||
|
- Persistent history with pagination
|
||||||
|
- Load More button for large execution lists
|
||||||
|
|
||||||
### Security
|
### Security
|
||||||
- Role-based access control (RBAC)
|
- Authelia SSO integration for user authentication
|
||||||
- API key authentication for workers
|
- API key authentication for workers
|
||||||
- Workflow approval requirements
|
- User session management
|
||||||
- Audit logging for all actions
|
- Admin-only operations (worker deletion, workflow management)
|
||||||
|
- Audit logging for all executions
|
||||||
|
|
||||||
|
### Performance
|
||||||
|
- Automatic cleanup of old executions (configurable retention)
|
||||||
|
- Pagination for large execution lists (50 at a time)
|
||||||
|
- Efficient WebSocket connection pooling
|
||||||
|
- Worker heartbeat monitoring
|
||||||
|
- Database connection pooling
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
### Environment Variables
|
||||||
|
|
||||||
|
**Server (.env):**
|
||||||
|
```bash
|
||||||
|
PORT=8080 # Server port
|
||||||
|
SECRET_KEY=<random-string> # Session secret
|
||||||
|
DB_HOST=10.10.10.50 # MariaDB host
|
||||||
|
DB_PORT=3306 # MariaDB port
|
||||||
|
DB_NAME=pulse # Database name
|
||||||
|
DB_USER=pulse_user # Database user
|
||||||
|
DB_PASSWORD=<password> # Database password
|
||||||
|
WORKER_API_KEY=<api-key> # Worker authentication key
|
||||||
|
EXECUTION_RETENTION_DAYS=30 # Auto-cleanup retention (default: 30)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Worker (.env):**
|
||||||
|
```bash
|
||||||
|
WORKER_NAME=pulse-worker-01 # Unique worker name
|
||||||
|
PULSE_SERVER=http://10.10.10.65:8080 # Server HTTP URL
|
||||||
|
PULSE_WS=ws://10.10.10.65:8080 # Server WebSocket URL
|
||||||
|
WORKER_API_KEY=<api-key> # Must match server key
|
||||||
|
HEARTBEAT_INTERVAL=30 # Heartbeat seconds (default: 30)
|
||||||
|
MAX_CONCURRENT_TASKS=5 # Max parallel tasks (default: 5)
|
||||||
|
```
|
||||||
|
|
||||||
|
## Database Schema
|
||||||
|
|
||||||
|
PULSE uses MariaDB with the following tables:
|
||||||
|
|
||||||
|
- **users**: User accounts from Authelia SSO
|
||||||
|
- **workers**: Worker node registry with metadata
|
||||||
|
- **workflows**: Workflow definitions (JSON)
|
||||||
|
- **executions**: Execution history with logs
|
||||||
|
|
||||||
|
See [Claude.md](Claude.md) for complete schema details.
|
||||||
|
|
||||||
|
## Troubleshooting
|
||||||
|
|
||||||
|
### Worker Not Connecting
|
||||||
|
```bash
|
||||||
|
# Check worker service status
|
||||||
|
systemctl status pulse-worker
|
||||||
|
|
||||||
|
# Check worker logs
|
||||||
|
journalctl -u pulse-worker -n 50 -f
|
||||||
|
|
||||||
|
# Verify API key matches server
|
||||||
|
grep WORKER_API_KEY /opt/pulse-worker/.env
|
||||||
|
```
|
||||||
|
|
||||||
|
### Commands Stuck in "Running"
|
||||||
|
- This was fixed in recent updates - restart the server:
|
||||||
|
```bash
|
||||||
|
systemctl restart pulse.service
|
||||||
|
```
|
||||||
|
|
||||||
|
### Clear All Executions
|
||||||
|
Use the database directly if needed:
|
||||||
|
```bash
|
||||||
|
mysql -h 10.10.10.50 -u pulse_user -p pulse
|
||||||
|
> DELETE FROM executions WHERE status IN ('completed', 'failed');
|
||||||
|
```
|
||||||
|
|
||||||
|
## Development
|
||||||
|
|
||||||
|
### Recent Updates
|
||||||
|
|
||||||
|
**Phase 1-6 Improvements:**
|
||||||
|
- Formatted log display with color-coding
|
||||||
|
- Worker system metrics monitoring
|
||||||
|
- Command templates and history
|
||||||
|
- Re-run and download execution features
|
||||||
|
- Auto-cleanup and pagination
|
||||||
|
- Terminal aesthetic refinements
|
||||||
|
- Audio notifications and visual toasts
|
||||||
|
|
||||||
|
See git history for detailed changelog.
|
||||||
|
|
||||||
|
### Future Enhancements
|
||||||
|
- Full workflow system implementation
|
||||||
|
- Multi-worker command execution
|
||||||
|
- Scheduled/cron job support
|
||||||
|
- Execution search and filtering
|
||||||
|
- Dark/light theme toggle
|
||||||
|
- Mobile-responsive design
|
||||||
|
- REST API documentation
|
||||||
|
- Webhook integrations
|
||||||
|
|
||||||
|
## License
|
||||||
|
|
||||||
|
MIT License - See LICENSE file for details
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
**PULSE** - Orchestrating your infrastructure, one heartbeat at a time.
|
**PULSE** - Orchestrating your infrastructure, one heartbeat at a time. ⚡
|
||||||
|
|
||||||
|
Built with retro terminal aesthetics 🖥️ | Powered by WebSockets 🔌 | Secured by Authelia 🔐
|
||||||
|
|||||||
1638
public/index.html
1638
public/index.html
File diff suppressed because it is too large
Load Diff
866
server.js
866
server.js
@@ -89,6 +89,24 @@ async function initDatabase() {
|
|||||||
)
|
)
|
||||||
`);
|
`);
|
||||||
|
|
||||||
|
await connection.query(`
|
||||||
|
CREATE TABLE IF NOT EXISTS scheduled_commands (
|
||||||
|
id VARCHAR(36) PRIMARY KEY,
|
||||||
|
name VARCHAR(255) NOT NULL,
|
||||||
|
command TEXT NOT NULL,
|
||||||
|
worker_ids JSON NOT NULL,
|
||||||
|
schedule_type VARCHAR(50) NOT NULL,
|
||||||
|
schedule_value VARCHAR(255) NOT NULL,
|
||||||
|
enabled BOOLEAN DEFAULT TRUE,
|
||||||
|
created_by VARCHAR(255),
|
||||||
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
last_run TIMESTAMP NULL,
|
||||||
|
next_run TIMESTAMP NULL,
|
||||||
|
INDEX idx_enabled (enabled),
|
||||||
|
INDEX idx_next_run (next_run)
|
||||||
|
)
|
||||||
|
`);
|
||||||
|
|
||||||
console.log('Database tables initialized successfully');
|
console.log('Database tables initialized successfully');
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Database initialization error:', error);
|
console.error('Database initialization error:', error);
|
||||||
@@ -98,6 +116,116 @@ async function initDatabase() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Auto-cleanup old executions (runs daily)
|
||||||
|
async function cleanupOldExecutions() {
|
||||||
|
try {
|
||||||
|
const retentionDays = parseInt(process.env.EXECUTION_RETENTION_DAYS) || 30;
|
||||||
|
const [result] = await pool.query(
|
||||||
|
`DELETE FROM executions
|
||||||
|
WHERE status IN ('completed', 'failed')
|
||||||
|
AND started_at < DATE_SUB(NOW(), INTERVAL ? DAY)`,
|
||||||
|
[retentionDays]
|
||||||
|
);
|
||||||
|
console.log(`[Cleanup] Removed ${result.affectedRows} executions older than ${retentionDays} days`);
|
||||||
|
} catch (error) {
|
||||||
|
console.error('[Cleanup] Error removing old executions:', error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run cleanup daily at 3 AM
|
||||||
|
setInterval(cleanupOldExecutions, 24 * 60 * 60 * 1000);
|
||||||
|
// Run cleanup on startup
|
||||||
|
cleanupOldExecutions();
|
||||||
|
|
||||||
|
// Scheduled Commands Processor
|
||||||
|
async function processScheduledCommands() {
|
||||||
|
try {
|
||||||
|
const [schedules] = await pool.query(
|
||||||
|
`SELECT * FROM scheduled_commands
|
||||||
|
WHERE enabled = TRUE
|
||||||
|
AND (next_run IS NULL OR next_run <= NOW())`
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const schedule of schedules) {
|
||||||
|
console.log(`[Scheduler] Running scheduled command: ${schedule.name}`);
|
||||||
|
|
||||||
|
const workerIds = JSON.parse(schedule.worker_ids);
|
||||||
|
|
||||||
|
// Execute command on each worker
|
||||||
|
for (const workerId of workerIds) {
|
||||||
|
const workerWs = workers.get(workerId);
|
||||||
|
if (workerWs && workerWs.readyState === WebSocket.OPEN) {
|
||||||
|
const executionId = generateUUID();
|
||||||
|
|
||||||
|
// Create execution record
|
||||||
|
await pool.query(
|
||||||
|
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
|
||||||
|
[executionId, null, 'running', `scheduler:${schedule.name}`, JSON.stringify([{
|
||||||
|
step: 'scheduled_command',
|
||||||
|
action: 'command_sent',
|
||||||
|
worker_id: workerId,
|
||||||
|
command: schedule.command,
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
}])]
|
||||||
|
);
|
||||||
|
|
||||||
|
// Send command to worker
|
||||||
|
workerWs.send(JSON.stringify({
|
||||||
|
type: 'execute_command',
|
||||||
|
execution_id: executionId,
|
||||||
|
command: schedule.command,
|
||||||
|
worker_id: workerId,
|
||||||
|
timeout: 300000 // 5 minute timeout for scheduled commands
|
||||||
|
}));
|
||||||
|
|
||||||
|
broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update last_run and calculate next_run
|
||||||
|
const nextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value);
|
||||||
|
await pool.query(
|
||||||
|
'UPDATE scheduled_commands SET last_run = NOW(), next_run = ? WHERE id = ?',
|
||||||
|
[nextRun, schedule.id]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
console.error('[Scheduler] Error processing scheduled commands:', error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function calculateNextRun(scheduleType, scheduleValue) {
|
||||||
|
const now = new Date();
|
||||||
|
|
||||||
|
if (scheduleType === 'interval') {
|
||||||
|
// Interval in minutes
|
||||||
|
const minutes = parseInt(scheduleValue);
|
||||||
|
return new Date(now.getTime() + minutes * 60000);
|
||||||
|
} else if (scheduleType === 'daily') {
|
||||||
|
// Daily at HH:MM
|
||||||
|
const [hours, minutes] = scheduleValue.split(':').map(Number);
|
||||||
|
const next = new Date(now);
|
||||||
|
next.setHours(hours, minutes, 0, 0);
|
||||||
|
|
||||||
|
// If time has passed today, schedule for tomorrow
|
||||||
|
if (next <= now) {
|
||||||
|
next.setDate(next.getDate() + 1);
|
||||||
|
}
|
||||||
|
return next;
|
||||||
|
} else if (scheduleType === 'hourly') {
|
||||||
|
// Every N hours
|
||||||
|
const hours = parseInt(scheduleValue);
|
||||||
|
return new Date(now.getTime() + hours * 3600000);
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run scheduler every minute
|
||||||
|
setInterval(processScheduledCommands, 60 * 1000);
|
||||||
|
// Initial run on startup
|
||||||
|
setTimeout(processScheduledCommands, 5000);
|
||||||
|
|
||||||
// WebSocket connections
|
// WebSocket connections
|
||||||
const clients = new Set();
|
const clients = new Set();
|
||||||
const workers = new Map(); // Map worker_id -> WebSocket connection
|
const workers = new Map(); // Map worker_id -> WebSocket connection
|
||||||
@@ -113,12 +241,13 @@ wss.on('connection', (ws) => {
|
|||||||
|
|
||||||
if (message.type === 'command_result') {
|
if (message.type === 'command_result') {
|
||||||
// Handle command result from worker
|
// Handle command result from worker
|
||||||
const { execution_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
|
const { execution_id, command_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
|
||||||
|
|
||||||
// Add result to execution logs
|
// Add result to execution logs
|
||||||
await addExecutionLog(execution_id, {
|
await addExecutionLog(execution_id, {
|
||||||
step: 'command_execution',
|
step: 'command_execution',
|
||||||
action: 'command_result',
|
action: 'command_result',
|
||||||
|
command_id: command_id, // Include command_id for workflow tracking
|
||||||
worker_id: worker_id,
|
worker_id: worker_id,
|
||||||
success: success,
|
success: success,
|
||||||
stdout: stdout,
|
stdout: stdout,
|
||||||
@@ -127,9 +256,14 @@ wss.on('connection', (ws) => {
|
|||||||
timestamp: timestamp || new Date().toISOString()
|
timestamp: timestamp || new Date().toISOString()
|
||||||
});
|
});
|
||||||
|
|
||||||
// Update execution status to completed or failed
|
// For non-workflow executions, update status immediately
|
||||||
const finalStatus = success ? 'completed' : 'failed';
|
// For workflow executions, the workflow engine will update status
|
||||||
await updateExecutionStatus(execution_id, finalStatus);
|
const [execution] = await pool.query('SELECT workflow_id FROM executions WHERE id = ?', [execution_id]);
|
||||||
|
if (execution.length > 0 && !execution[0].workflow_id) {
|
||||||
|
// Only update status for quick commands (no workflow_id)
|
||||||
|
const finalStatus = success ? 'completed' : 'failed';
|
||||||
|
await updateExecutionStatus(execution_id, finalStatus);
|
||||||
|
}
|
||||||
|
|
||||||
// Broadcast result to all connected clients
|
// Broadcast result to all connected clients
|
||||||
broadcast({
|
broadcast({
|
||||||
@@ -141,7 +275,7 @@ wss.on('connection', (ws) => {
|
|||||||
stderr: stderr
|
stderr: stderr
|
||||||
});
|
});
|
||||||
|
|
||||||
console.log(`Command result received for execution ${execution_id}: ${finalStatus}`);
|
console.log(`Command result received for execution ${execution_id}: ${success ? 'success' : 'failed'}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (message.type === 'workflow_result') {
|
if (message.type === 'workflow_result') {
|
||||||
@@ -252,6 +386,45 @@ function broadcast(data) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Helper function to add log entry to execution
|
||||||
|
async function addExecutionLog(executionId, logEntry) {
|
||||||
|
try {
|
||||||
|
const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
|
||||||
|
|
||||||
|
if (execution.length > 0) {
|
||||||
|
const logs = typeof execution[0].logs === 'string' ? JSON.parse(execution[0].logs) : execution[0].logs;
|
||||||
|
logs.push(logEntry);
|
||||||
|
|
||||||
|
await pool.query(
|
||||||
|
'UPDATE executions SET logs = ? WHERE id = ?',
|
||||||
|
[JSON.stringify(logs), executionId]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
console.error(`[Workflow] Error adding execution log:`, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function to update execution status
|
||||||
|
async function updateExecutionStatus(executionId, status) {
|
||||||
|
try {
|
||||||
|
await pool.query(
|
||||||
|
'UPDATE executions SET status = ?, completed_at = NOW() WHERE id = ?',
|
||||||
|
[status, executionId]
|
||||||
|
);
|
||||||
|
|
||||||
|
broadcast({
|
||||||
|
type: 'execution_status',
|
||||||
|
execution_id: executionId,
|
||||||
|
status: status
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log(`[Workflow] Execution ${executionId} status updated to: ${status}`);
|
||||||
|
} catch (error) {
|
||||||
|
console.error(`[Workflow] Error updating execution status:`, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Authelia SSO Middleware
|
// Authelia SSO Middleware
|
||||||
async function authenticateSSO(req, res, next) {
|
async function authenticateSSO(req, res, next) {
|
||||||
// Check for Authelia headers
|
// Check for Authelia headers
|
||||||
@@ -308,6 +481,237 @@ async function authenticateSSO(req, res, next) {
|
|||||||
next();
|
next();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Workflow Execution Engine
|
||||||
|
async function executeWorkflowSteps(executionId, workflowId, definition, username) {
|
||||||
|
try {
|
||||||
|
console.log(`[Workflow] Starting execution ${executionId} for workflow ${workflowId}`);
|
||||||
|
|
||||||
|
const steps = definition.steps || [];
|
||||||
|
let allStepsSucceeded = true;
|
||||||
|
|
||||||
|
for (let i = 0; i < steps.length; i++) {
|
||||||
|
const step = steps[i];
|
||||||
|
console.log(`[Workflow] Execution ${executionId} - Step ${i + 1}/${steps.length}: ${step.name}`);
|
||||||
|
|
||||||
|
// Add step start log
|
||||||
|
await addExecutionLog(executionId, {
|
||||||
|
step: i + 1,
|
||||||
|
step_name: step.name,
|
||||||
|
action: 'step_started',
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
});
|
||||||
|
|
||||||
|
broadcast({
|
||||||
|
type: 'workflow_step_started',
|
||||||
|
execution_id: executionId,
|
||||||
|
step: i + 1,
|
||||||
|
step_name: step.name
|
||||||
|
});
|
||||||
|
|
||||||
|
if (step.type === 'execute') {
|
||||||
|
// Execute command step
|
||||||
|
const success = await executeCommandStep(executionId, step, i + 1);
|
||||||
|
if (!success) {
|
||||||
|
allStepsSucceeded = false;
|
||||||
|
break; // Stop workflow on failure
|
||||||
|
}
|
||||||
|
} else if (step.type === 'wait') {
|
||||||
|
// Wait step (delay in seconds)
|
||||||
|
const seconds = step.duration || 5;
|
||||||
|
await addExecutionLog(executionId, {
|
||||||
|
step: i + 1,
|
||||||
|
action: 'waiting',
|
||||||
|
duration: seconds,
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
});
|
||||||
|
await new Promise(resolve => setTimeout(resolve, seconds * 1000));
|
||||||
|
} else if (step.type === 'prompt') {
|
||||||
|
// Interactive prompt step (not fully implemented, would need user interaction)
|
||||||
|
await addExecutionLog(executionId, {
|
||||||
|
step: i + 1,
|
||||||
|
action: 'prompt_skipped',
|
||||||
|
message: 'Interactive prompts not yet supported',
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add step completion log
|
||||||
|
await addExecutionLog(executionId, {
|
||||||
|
step: i + 1,
|
||||||
|
step_name: step.name,
|
||||||
|
action: 'step_completed',
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
});
|
||||||
|
|
||||||
|
broadcast({
|
||||||
|
type: 'workflow_step_completed',
|
||||||
|
execution_id: executionId,
|
||||||
|
step: i + 1,
|
||||||
|
step_name: step.name
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark execution as completed or failed
|
||||||
|
const finalStatus = allStepsSucceeded ? 'completed' : 'failed';
|
||||||
|
await updateExecutionStatus(executionId, finalStatus);
|
||||||
|
|
||||||
|
console.log(`[Workflow] Execution ${executionId} finished with status: ${finalStatus}`);
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
console.error(`[Workflow] Execution ${executionId} error:`, error);
|
||||||
|
await addExecutionLog(executionId, {
|
||||||
|
action: 'workflow_error',
|
||||||
|
error: error.message,
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
});
|
||||||
|
await updateExecutionStatus(executionId, 'failed');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function executeCommandStep(executionId, step, stepNumber) {
|
||||||
|
try {
|
||||||
|
const command = step.command;
|
||||||
|
const targets = step.targets || ['all'];
|
||||||
|
|
||||||
|
// Determine which workers to target
|
||||||
|
let targetWorkerIds = [];
|
||||||
|
|
||||||
|
if (targets.includes('all')) {
|
||||||
|
// Get all online workers
|
||||||
|
const [onlineWorkers] = await pool.query('SELECT id FROM workers WHERE status = ?', ['online']);
|
||||||
|
targetWorkerIds = onlineWorkers.map(w => w.id);
|
||||||
|
} else {
|
||||||
|
// Specific worker IDs or names
|
||||||
|
for (const target of targets) {
|
||||||
|
// Try to find by ID first, then by name
|
||||||
|
const [workerById] = await pool.query('SELECT id FROM workers WHERE id = ?', [target]);
|
||||||
|
if (workerById.length > 0) {
|
||||||
|
targetWorkerIds.push(workerById[0].id);
|
||||||
|
} else {
|
||||||
|
const [workerByName] = await pool.query('SELECT id FROM workers WHERE name = ?', [target]);
|
||||||
|
if (workerByName.length > 0) {
|
||||||
|
targetWorkerIds.push(workerByName[0].id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (targetWorkerIds.length === 0) {
|
||||||
|
await addExecutionLog(executionId, {
|
||||||
|
step: stepNumber,
|
||||||
|
action: 'no_workers',
|
||||||
|
message: 'No workers available for this step',
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
});
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute command on each target worker and wait for results
|
||||||
|
const results = [];
|
||||||
|
|
||||||
|
for (const workerId of targetWorkerIds) {
|
||||||
|
const workerWs = workers.get(workerId);
|
||||||
|
|
||||||
|
if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
|
||||||
|
await addExecutionLog(executionId, {
|
||||||
|
step: stepNumber,
|
||||||
|
action: 'worker_offline',
|
||||||
|
worker_id: workerId,
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send command to worker
|
||||||
|
const commandId = generateUUID();
|
||||||
|
|
||||||
|
await addExecutionLog(executionId, {
|
||||||
|
step: stepNumber,
|
||||||
|
action: 'command_sent',
|
||||||
|
worker_id: workerId,
|
||||||
|
command: command,
|
||||||
|
command_id: commandId,
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
});
|
||||||
|
|
||||||
|
workerWs.send(JSON.stringify({
|
||||||
|
type: 'execute_command',
|
||||||
|
execution_id: executionId,
|
||||||
|
command_id: commandId,
|
||||||
|
command: command,
|
||||||
|
worker_id: workerId,
|
||||||
|
timeout: 120000 // 2 minute timeout
|
||||||
|
}));
|
||||||
|
|
||||||
|
// Wait for command result (with timeout)
|
||||||
|
const result = await waitForCommandResult(executionId, commandId, 120000);
|
||||||
|
results.push(result);
|
||||||
|
|
||||||
|
if (!result.success) {
|
||||||
|
// Command failed, workflow should stop
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// All commands succeeded
|
||||||
|
return results.every(r => r.success);
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
console.error(`[Workflow] Error executing command step:`, error);
|
||||||
|
await addExecutionLog(executionId, {
|
||||||
|
step: stepNumber,
|
||||||
|
action: 'step_error',
|
||||||
|
error: error.message,
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
});
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function waitForCommandResult(executionId, commandId, timeout) {
|
||||||
|
return new Promise((resolve) => {
|
||||||
|
const startTime = Date.now();
|
||||||
|
|
||||||
|
const checkInterval = setInterval(async () => {
|
||||||
|
try {
|
||||||
|
// Check if we've received the command result in logs
|
||||||
|
const [execution] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
|
||||||
|
|
||||||
|
if (execution.length > 0) {
|
||||||
|
const logs = typeof execution[0].logs === 'string' ? JSON.parse(execution[0].logs) : execution[0].logs;
|
||||||
|
const resultLog = logs.find(log => log.command_id === commandId && log.action === 'command_result');
|
||||||
|
|
||||||
|
if (resultLog) {
|
||||||
|
clearInterval(checkInterval);
|
||||||
|
resolve({
|
||||||
|
success: resultLog.success,
|
||||||
|
stdout: resultLog.stdout,
|
||||||
|
stderr: resultLog.stderr
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check timeout
|
||||||
|
if (Date.now() - startTime > timeout) {
|
||||||
|
clearInterval(checkInterval);
|
||||||
|
resolve({
|
||||||
|
success: false,
|
||||||
|
error: 'Command timeout'
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
console.error('[Workflow] Error checking command result:', error);
|
||||||
|
clearInterval(checkInterval);
|
||||||
|
resolve({
|
||||||
|
success: false,
|
||||||
|
error: error.message
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}, 500); // Check every 500ms
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Routes - All protected by SSO
|
// Routes - All protected by SSO
|
||||||
app.get('/api/user', authenticateSSO, (req, res) => {
|
app.get('/api/user', authenticateSSO, (req, res) => {
|
||||||
res.json(req.user);
|
res.json(req.user);
|
||||||
@@ -326,15 +730,24 @@ app.post('/api/workflows', authenticateSSO, async (req, res) => {
|
|||||||
try {
|
try {
|
||||||
const { name, description, definition } = req.body;
|
const { name, description, definition } = req.body;
|
||||||
const id = generateUUID();
|
const id = generateUUID();
|
||||||
|
|
||||||
|
console.log('[Workflow] Creating workflow:', name);
|
||||||
|
console.log('[Workflow] Definition:', JSON.stringify(definition, null, 2));
|
||||||
|
|
||||||
await pool.query(
|
await pool.query(
|
||||||
'INSERT INTO workflows (id, name, description, definition, created_by) VALUES (?, ?, ?, ?, ?)',
|
'INSERT INTO workflows (id, name, description, definition, created_by) VALUES (?, ?, ?, ?, ?)',
|
||||||
[id, name, description, JSON.stringify(definition), req.user.username]
|
[id, name, description, JSON.stringify(definition), req.user.username]
|
||||||
);
|
);
|
||||||
|
|
||||||
|
console.log('[Workflow] Successfully inserted workflow:', id);
|
||||||
|
|
||||||
res.json({ id, name, description, definition });
|
res.json({ id, name, description, definition });
|
||||||
|
|
||||||
|
console.log('[Workflow] Broadcasting workflow_created');
|
||||||
broadcast({ type: 'workflow_created', workflow_id: id });
|
broadcast({ type: 'workflow_created', workflow_id: id });
|
||||||
|
console.log('[Workflow] Broadcast complete');
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
console.error('[Workflow] Error creating workflow:', error);
|
||||||
res.status(500).json({ error: error.message });
|
res.status(500).json({ error: error.message });
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -394,13 +807,29 @@ app.post('/api/executions', authenticateSSO, async (req, res) => {
|
|||||||
try {
|
try {
|
||||||
const { workflow_id } = req.body;
|
const { workflow_id } = req.body;
|
||||||
const id = generateUUID();
|
const id = generateUUID();
|
||||||
|
|
||||||
|
// Get workflow definition
|
||||||
|
const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflow_id]);
|
||||||
|
if (workflows.length === 0) {
|
||||||
|
return res.status(404).json({ error: 'Workflow not found' });
|
||||||
|
}
|
||||||
|
|
||||||
|
const workflow = workflows[0];
|
||||||
|
const definition = typeof workflow.definition === 'string' ? JSON.parse(workflow.definition) : workflow.definition;
|
||||||
|
|
||||||
|
// Create execution record
|
||||||
await pool.query(
|
await pool.query(
|
||||||
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
|
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
|
||||||
[id, workflow_id, 'running', req.user.username, JSON.stringify([])]
|
[id, workflow_id, 'running', req.user.username, JSON.stringify([])]
|
||||||
);
|
);
|
||||||
|
|
||||||
broadcast({ type: 'execution_started', execution_id: id, workflow_id });
|
broadcast({ type: 'execution_started', execution_id: id, workflow_id });
|
||||||
|
|
||||||
|
// Start workflow execution asynchronously
|
||||||
|
executeWorkflowSteps(id, workflow_id, definition, req.user.username).catch(err => {
|
||||||
|
console.error(`[Workflow] Execution ${id} failed:`, err);
|
||||||
|
});
|
||||||
|
|
||||||
res.json({ id, workflow_id, status: 'running' });
|
res.json({ id, workflow_id, status: 'running' });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
res.status(500).json({ error: error.message });
|
res.status(500).json({ error: error.message });
|
||||||
@@ -409,10 +838,25 @@ app.post('/api/executions', authenticateSSO, async (req, res) => {
|
|||||||
|
|
||||||
app.get('/api/executions', authenticateSSO, async (req, res) => {
|
app.get('/api/executions', authenticateSSO, async (req, res) => {
|
||||||
try {
|
try {
|
||||||
|
const limit = parseInt(req.query.limit) || 50;
|
||||||
|
const offset = parseInt(req.query.offset) || 0;
|
||||||
|
|
||||||
const [rows] = await pool.query(
|
const [rows] = await pool.query(
|
||||||
'SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ORDER BY e.started_at DESC LIMIT 50'
|
'SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ORDER BY e.started_at DESC LIMIT ? OFFSET ?',
|
||||||
|
[limit, offset]
|
||||||
);
|
);
|
||||||
res.json(rows);
|
|
||||||
|
// Get total count
|
||||||
|
const [countRows] = await pool.query('SELECT COUNT(*) as total FROM executions');
|
||||||
|
const total = countRows[0].total;
|
||||||
|
|
||||||
|
res.json({
|
||||||
|
executions: rows,
|
||||||
|
total: total,
|
||||||
|
limit: limit,
|
||||||
|
offset: offset,
|
||||||
|
hasMore: offset + rows.length < total
|
||||||
|
});
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
res.status(500).json({ error: error.message });
|
res.status(500).json({ error: error.message });
|
||||||
}
|
}
|
||||||
@@ -428,6 +872,104 @@ app.delete('/api/executions/:id', authenticateSSO, async (req, res) => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
app.post('/api/executions/:id/abort', authenticateSSO, async (req, res) => {
|
||||||
|
try {
|
||||||
|
const executionId = req.params.id;
|
||||||
|
|
||||||
|
// Check if execution exists and is running
|
||||||
|
const [execution] = await pool.query('SELECT status FROM executions WHERE id = ?', [executionId]);
|
||||||
|
|
||||||
|
if (execution.length === 0) {
|
||||||
|
return res.status(404).json({ error: 'Execution not found' });
|
||||||
|
}
|
||||||
|
|
||||||
|
if (execution[0].status !== 'running') {
|
||||||
|
return res.status(400).json({ error: 'Execution is not running' });
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add abort log entry
|
||||||
|
await addExecutionLog(executionId, {
|
||||||
|
action: 'execution_aborted',
|
||||||
|
aborted_by: req.user.username,
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
});
|
||||||
|
|
||||||
|
// Update execution status to failed
|
||||||
|
await updateExecutionStatus(executionId, 'failed');
|
||||||
|
|
||||||
|
console.log(`[Execution] Execution ${executionId} aborted by ${req.user.username}`);
|
||||||
|
|
||||||
|
res.json({ success: true });
|
||||||
|
} catch (error) {
|
||||||
|
console.error('[Execution] Error aborting execution:', error);
|
||||||
|
res.status(500).json({ error: error.message });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Scheduled Commands API
|
||||||
|
app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => {
|
||||||
|
try {
|
||||||
|
const [schedules] = await pool.query(
|
||||||
|
'SELECT * FROM scheduled_commands ORDER BY created_at DESC'
|
||||||
|
);
|
||||||
|
res.json(schedules);
|
||||||
|
} catch (error) {
|
||||||
|
res.status(500).json({ error: error.message });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
app.post('/api/scheduled-commands', authenticateSSO, async (req, res) => {
|
||||||
|
try {
|
||||||
|
const { name, command, worker_ids, schedule_type, schedule_value } = req.body;
|
||||||
|
|
||||||
|
if (!name || !command || !worker_ids || !schedule_type || !schedule_value) {
|
||||||
|
return res.status(400).json({ error: 'Missing required fields' });
|
||||||
|
}
|
||||||
|
|
||||||
|
const id = generateUUID();
|
||||||
|
const nextRun = calculateNextRun(schedule_type, schedule_value);
|
||||||
|
|
||||||
|
await pool.query(
|
||||||
|
`INSERT INTO scheduled_commands
|
||||||
|
(id, name, command, worker_ids, schedule_type, schedule_value, created_by, next_run)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||||
|
[id, name, command, JSON.stringify(worker_ids), schedule_type, schedule_value, req.user.username, nextRun]
|
||||||
|
);
|
||||||
|
|
||||||
|
res.json({ success: true, id });
|
||||||
|
} catch (error) {
|
||||||
|
res.status(500).json({ error: error.message });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
app.put('/api/scheduled-commands/:id/toggle', authenticateSSO, async (req, res) => {
|
||||||
|
try {
|
||||||
|
const { id } = req.params;
|
||||||
|
const [current] = await pool.query('SELECT enabled FROM scheduled_commands WHERE id = ?', [id]);
|
||||||
|
|
||||||
|
if (current.length === 0) {
|
||||||
|
return res.status(404).json({ error: 'Schedule not found' });
|
||||||
|
}
|
||||||
|
|
||||||
|
const newEnabled = !current[0].enabled;
|
||||||
|
await pool.query('UPDATE scheduled_commands SET enabled = ? WHERE id = ?', [newEnabled, id]);
|
||||||
|
|
||||||
|
res.json({ success: true, enabled: newEnabled });
|
||||||
|
} catch (error) {
|
||||||
|
res.status(500).json({ error: error.message });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
app.delete('/api/scheduled-commands/:id', authenticateSSO, async (req, res) => {
|
||||||
|
try {
|
||||||
|
const { id } = req.params;
|
||||||
|
await pool.query('DELETE FROM scheduled_commands WHERE id = ?', [id]);
|
||||||
|
res.json({ success: true });
|
||||||
|
} catch (error) {
|
||||||
|
res.status(500).json({ error: error.message });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// Health check (no auth required)
|
// Health check (no auth required)
|
||||||
app.get('/health', async (req, res) => {
|
app.get('/health', async (req, res) => {
|
||||||
try {
|
try {
|
||||||
@@ -467,299 +1009,6 @@ initDatabase().then(() => {
|
|||||||
|
|
||||||
// ============================================
|
// ============================================
|
||||||
// WORKFLOW EXECUTION ENGINE
|
// WORKFLOW EXECUTION ENGINE
|
||||||
// ============================================
|
|
||||||
|
|
||||||
// Store active workflow executions in memory
|
|
||||||
const activeExecutions = new Map();
|
|
||||||
|
|
||||||
// Execute workflow step by step
|
|
||||||
async function executeWorkflow(workflowId, executionId, userId, targetWorkers = 'all') {
|
|
||||||
try {
|
|
||||||
// Get workflow definition
|
|
||||||
const [workflows] = await pool.query('SELECT * FROM workflows WHERE id = ?', [workflowId]);
|
|
||||||
if (workflows.length === 0) {
|
|
||||||
throw new Error('Workflow not found');
|
|
||||||
}
|
|
||||||
|
|
||||||
const workflow = workflows[0];
|
|
||||||
const definition = JSON.parse(workflow.definition);
|
|
||||||
|
|
||||||
// Initialize execution state
|
|
||||||
const executionState = {
|
|
||||||
id: executionId,
|
|
||||||
workflowId: workflowId,
|
|
||||||
currentStep: 0,
|
|
||||||
steps: definition.steps || [],
|
|
||||||
results: [],
|
|
||||||
status: 'running',
|
|
||||||
waitingForInput: false,
|
|
||||||
targetWorkers: targetWorkers,
|
|
||||||
userId: userId
|
|
||||||
};
|
|
||||||
|
|
||||||
activeExecutions.set(executionId, executionState);
|
|
||||||
|
|
||||||
// Start executing steps
|
|
||||||
await executeNextStep(executionId);
|
|
||||||
|
|
||||||
} catch (error) {
|
|
||||||
console.error('Workflow execution error:', error);
|
|
||||||
await updateExecutionStatus(executionId, 'failed', error.message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Execute the next step in a workflow
|
|
||||||
async function executeNextStep(executionId) {
|
|
||||||
const state = activeExecutions.get(executionId);
|
|
||||||
if (!state) return;
|
|
||||||
|
|
||||||
// Check if we've completed all steps
|
|
||||||
if (state.currentStep >= state.steps.length) {
|
|
||||||
await updateExecutionStatus(executionId, 'completed');
|
|
||||||
activeExecutions.delete(executionId);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const step = state.steps[state.currentStep];
|
|
||||||
|
|
||||||
// Check if step has a condition
|
|
||||||
if (step.condition && !evaluateCondition(step.condition, state)) {
|
|
||||||
console.log(`[Workflow] Skipping step ${state.currentStep}: condition not met`);
|
|
||||||
state.currentStep++;
|
|
||||||
return executeNextStep(executionId);
|
|
||||||
}
|
|
||||||
|
|
||||||
console.log(`[Workflow] Executing step ${state.currentStep}: ${step.name}`);
|
|
||||||
|
|
||||||
try {
|
|
||||||
switch (step.type) {
|
|
||||||
case 'execute':
|
|
||||||
await executeCommandStep(executionId, step);
|
|
||||||
break;
|
|
||||||
case 'prompt':
|
|
||||||
await executePromptStep(executionId, step);
|
|
||||||
break;
|
|
||||||
case 'wait':
|
|
||||||
await executeWaitStep(executionId, step);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new Error(`Unknown step type: ${step.type}`);
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
await addExecutionLog(executionId, {
|
|
||||||
step: state.currentStep,
|
|
||||||
error: error.message,
|
|
||||||
timestamp: new Date().toISOString()
|
|
||||||
});
|
|
||||||
await updateExecutionStatus(executionId, 'failed', error.message);
|
|
||||||
activeExecutions.delete(executionId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Execute a command on workers
|
|
||||||
async function executeCommandStep(executionId, step) {
|
|
||||||
const state = activeExecutions.get(executionId);
|
|
||||||
|
|
||||||
// Get target workers
|
|
||||||
const [workers] = await pool.query(
|
|
||||||
'SELECT * FROM workers WHERE status = "online"'
|
|
||||||
);
|
|
||||||
|
|
||||||
if (workers.length === 0) {
|
|
||||||
throw new Error('No online workers available');
|
|
||||||
}
|
|
||||||
|
|
||||||
// Filter workers based on target
|
|
||||||
let targetWorkers = workers;
|
|
||||||
if (step.targets && step.targets[0] !== 'all') {
|
|
||||||
targetWorkers = workers.filter(w => step.targets.includes(w.name));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send command to workers via WebSocket
|
|
||||||
const commandMessage = {
|
|
||||||
type: 'execute_command',
|
|
||||||
execution_id: executionId,
|
|
||||||
step_index: state.currentStep,
|
|
||||||
command: step.command,
|
|
||||||
timeout: step.timeout || 300000
|
|
||||||
};
|
|
||||||
|
|
||||||
// Broadcast to target workers
|
|
||||||
clients.forEach(client => {
|
|
||||||
if (client.readyState === WebSocket.OPEN) {
|
|
||||||
client.send(JSON.stringify(commandMessage));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
await addExecutionLog(executionId, {
|
|
||||||
step: state.currentStep,
|
|
||||||
action: 'command_sent',
|
|
||||||
command: step.command,
|
|
||||||
workers: targetWorkers.map(w => w.name),
|
|
||||||
timestamp: new Date().toISOString()
|
|
||||||
});
|
|
||||||
|
|
||||||
// For now, move to next step immediately
|
|
||||||
// In production, we'd wait for worker responses
|
|
||||||
state.currentStep++;
|
|
||||||
|
|
||||||
// Small delay to allow command to execute
|
|
||||||
setTimeout(() => executeNextStep(executionId), 1000);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Execute a user prompt step
|
|
||||||
async function executePromptStep(executionId, step) {
|
|
||||||
const state = activeExecutions.get(executionId);
|
|
||||||
|
|
||||||
state.waitingForInput = true;
|
|
||||||
state.promptData = {
|
|
||||||
message: step.message,
|
|
||||||
options: step.options,
|
|
||||||
step: state.currentStep
|
|
||||||
};
|
|
||||||
|
|
||||||
await addExecutionLog(executionId, {
|
|
||||||
step: state.currentStep,
|
|
||||||
action: 'waiting_for_input',
|
|
||||||
prompt: step.message,
|
|
||||||
timestamp: new Date().toISOString()
|
|
||||||
});
|
|
||||||
|
|
||||||
// Notify frontend that input is needed
|
|
||||||
broadcast({
|
|
||||||
type: 'execution_prompt',
|
|
||||||
execution_id: executionId,
|
|
||||||
prompt: state.promptData
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Execute a wait/delay step
|
|
||||||
async function executeWaitStep(executionId, step) {
|
|
||||||
const state = activeExecutions.get(executionId);
|
|
||||||
const delay = step.duration || 1000;
|
|
||||||
|
|
||||||
await addExecutionLog(executionId, {
|
|
||||||
step: state.currentStep,
|
|
||||||
action: 'waiting',
|
|
||||||
duration: delay,
|
|
||||||
timestamp: new Date().toISOString()
|
|
||||||
});
|
|
||||||
|
|
||||||
state.currentStep++;
|
|
||||||
setTimeout(() => executeNextStep(executionId), delay);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handle user input for prompts
|
|
||||||
function handleUserInput(executionId, response) {
|
|
||||||
const state = activeExecutions.get(executionId);
|
|
||||||
if (!state || !state.waitingForInput) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
state.promptResponse = response;
|
|
||||||
state.waitingForInput = false;
|
|
||||||
state.currentStep++;
|
|
||||||
|
|
||||||
addExecutionLog(executionId, {
|
|
||||||
step: state.currentStep - 1,
|
|
||||||
action: 'user_response',
|
|
||||||
response: response,
|
|
||||||
timestamp: new Date().toISOString()
|
|
||||||
});
|
|
||||||
|
|
||||||
executeNextStep(executionId);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Evaluate conditions
|
|
||||||
function evaluateCondition(condition, state) {
|
|
||||||
try {
|
|
||||||
// Simple condition evaluation
|
|
||||||
// In production, use a proper expression evaluator
|
|
||||||
const promptResponse = state.promptResponse;
|
|
||||||
return eval(condition);
|
|
||||||
} catch (error) {
|
|
||||||
console.error('Condition evaluation error:', error);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper functions
|
|
||||||
async function updateExecutionStatus(executionId, status, error = null) {
|
|
||||||
const updates = { status };
|
|
||||||
if (status === 'completed' || status === 'failed') {
|
|
||||||
updates.completed_at = new Date();
|
|
||||||
}
|
|
||||||
if (error) {
|
|
||||||
// Add error to logs
|
|
||||||
await addExecutionLog(executionId, { error, timestamp: new Date().toISOString() });
|
|
||||||
}
|
|
||||||
|
|
||||||
await pool.query(
|
|
||||||
'UPDATE executions SET status = ?, completed_at = ? WHERE id = ?',
|
|
||||||
[status, updates.completed_at || null, executionId]
|
|
||||||
);
|
|
||||||
|
|
||||||
broadcast({
|
|
||||||
type: 'execution_status',
|
|
||||||
execution_id: executionId,
|
|
||||||
status: status
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async function addExecutionLog(executionId, logEntry) {
|
|
||||||
const [rows] = await pool.query('SELECT logs FROM executions WHERE id = ?', [executionId]);
|
|
||||||
if (rows.length === 0) return;
|
|
||||||
|
|
||||||
const logs = JSON.parse(rows[0].logs || '[]');
|
|
||||||
logs.push(logEntry);
|
|
||||||
|
|
||||||
await pool.query('UPDATE executions SET logs = ? WHERE id = ?', [
|
|
||||||
JSON.stringify(logs),
|
|
||||||
executionId
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ============================================
|
|
||||||
// API ROUTES - Add these to your server.js
|
|
||||||
// ============================================
|
|
||||||
|
|
||||||
// Start workflow execution
|
|
||||||
app.post('/api/executions', authenticateSSO, async (req, res) => {
|
|
||||||
try {
|
|
||||||
const { workflow_id, target_workers } = req.body;
|
|
||||||
const id = generateUUID();
|
|
||||||
|
|
||||||
await pool.query(
|
|
||||||
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
|
|
||||||
[id, workflow_id, 'running', req.user.username, JSON.stringify([])]
|
|
||||||
);
|
|
||||||
|
|
||||||
// Start execution
|
|
||||||
executeWorkflow(workflow_id, id, req.user.username, target_workers || 'all');
|
|
||||||
|
|
||||||
broadcast({ type: 'execution_started', execution_id: id, workflow_id });
|
|
||||||
res.json({ id, workflow_id, status: 'running' });
|
|
||||||
} catch (error) {
|
|
||||||
res.status(500).json({ error: error.message });
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Respond to workflow prompt
|
|
||||||
app.post('/api/executions/:id/respond', authenticateSSO, async (req, res) => {
|
|
||||||
try {
|
|
||||||
const { response } = req.body;
|
|
||||||
const success = handleUserInput(req.params.id, response);
|
|
||||||
|
|
||||||
if (success) {
|
|
||||||
res.json({ success: true });
|
|
||||||
} else {
|
|
||||||
res.status(400).json({ error: 'Execution not waiting for input' });
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
res.status(500).json({ error: error.message });
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Get execution details with logs
|
// Get execution details with logs
|
||||||
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
|
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
|
||||||
@@ -768,15 +1017,12 @@ app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
|
|||||||
if (rows.length === 0) {
|
if (rows.length === 0) {
|
||||||
return res.status(404).json({ error: 'Not found' });
|
return res.status(404).json({ error: 'Not found' });
|
||||||
}
|
}
|
||||||
|
|
||||||
const execution = rows[0];
|
const execution = rows[0];
|
||||||
const state = activeExecutions.get(req.params.id);
|
|
||||||
|
|
||||||
res.json({
|
res.json({
|
||||||
...execution,
|
...execution,
|
||||||
logs: JSON.parse(execution.logs || '[]'),
|
logs: JSON.parse(execution.logs || '[]')
|
||||||
waiting_for_input: state?.waitingForInput || false,
|
|
||||||
prompt: state?.promptData || null
|
|
||||||
});
|
});
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
res.status(500).json({ error: error.message });
|
res.status(500).json({ error: error.message });
|
||||||
|
|||||||
Reference in New Issue
Block a user