Compare commits

...

23 Commits

Author SHA1 Message Date
56f8047322 Phase 10: Command Scheduler
Added comprehensive command scheduling system:

Backend:
- New scheduled_commands database table
- Scheduler processor runs every minute
- Support for three schedule types: interval, hourly, daily
- calculateNextRun() function for intelligent scheduling
- API endpoints: GET, POST, PUT (toggle), DELETE
- Executions automatically created and tracked
- Enable/disable schedules without deleting

Frontend:
- New Scheduler tab in navigation
- Create Schedule modal with worker selection
- Dynamic schedule input based on type
- Schedule list showing status, next/last run times
- Enable/Disable toggle for each schedule
- Delete schedule functionality
- Terminal-themed scheduler UI
- Integration with existing worker and execution systems

Schedule Types:
- Interval: Every X minutes (e.g., 30 for every 30 min)
- Hourly: Every X hours (e.g., 2 for every 2 hours)
- Daily: At specific time (e.g., 03:00 for 3 AM daily)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 23:13:27 -05:00
bf908568e3 Phase 9: Execution Diff View
Added powerful execution comparison and diff view:

- Compare Mode toggle button in executions tab
- Multi-select up to 5 executions for comparison
- Visual selection indicators with checkmarks
- Comparison modal with summary table (status, duration, timestamps)
- Side-by-side output view for all selected executions
- Line-by-line diff analysis for 2-execution comparisons
- Highlights identical vs. different lines
- Shows identical/different line counts
- Color-coded diff (green for exec 1, amber for exec 2)
- Perfect for comparing same command across workers
- Terminal-themed comparison UI

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 23:08:55 -05:00
ffa124ed9d Phase 8: Execution Search & Filtering
Added comprehensive search and filtering for execution history:

- Search bar to filter by command text, execution ID, or workflow name
- Status filter dropdown (All, Running, Completed, Failed, Waiting)
- Real-time client-side filtering as user types
- Filter statistics showing X of Y executions
- Clear Filters button to reset all filters
- Extracts command text from logs for quick command searches
- Maintains all executions in memory for instant filtering
- Terminal-themed filter UI matching existing aesthetic

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 23:06:43 -05:00
84edea8027 Phase 7: Multi-Worker Command Execution
Added ability to execute commands on multiple workers simultaneously:

- Added execution mode selector (Single/Multiple Workers)
- Multi-worker mode with checkbox list for worker selection
- Helper buttons: Select All, Online Only, Clear All
- Sequential execution across selected workers
- Results summary showing success/fail count per worker
- Updated command history to track multi-worker executions
- Terminal beep feedback based on overall success/failure
- Maintained backward compatibility with single worker mode

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 23:03:45 -05:00
095bfb65ab Fix clearCompletedExecutions for new pagination API format
Changes:
- Handle new pagination response format (data.executions vs data)
- Request up to 1000 executions to ensure all are checked
- Track successful deletions count
- Use terminal notification instead of alert
- Better error handling for individual delete failures

Fixes regression from Phase 5 pagination changes.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:55:13 -05:00
c619add705 Phase 6: Terminal aesthetic refinements and notifications
Changes:
- Added blinking terminal cursor animation
- Smooth hover effects for execution/worker/workflow items
- Hover animation: background highlight + border expand + slide
- Loading pulse animation for loading states
- Slide-in animation for log entries
- Terminal beep sound using Web Audio API (different tones for success/error)
- Real-time terminal notifications for command completion
- Toast-style notifications with green glow effects
- Auto-dismiss after 3 seconds with fade-out
- Visual and audio feedback for user actions

Sound features:
- 800Hz tone for success (higher pitch)
- 200Hz tone for errors (lower pitch)
- 440Hz tone for info (standard A note)
- 100ms duration, exponential fade-out
- Graceful fallback if Web Audio API not supported

Notification features:
- Fixed position top-right
- Terminal-themed styling with glow
- Color-coded: green for success, red for errors
- Icons: ✓ success, ✗ error, ℹ info
- Smooth animations (slide-in, fade-out)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:52:51 -05:00
e6a6b7e359 Phase 5: Auto-cleanup and pagination for executions
Changes:
Server-side:
- Added automatic cleanup of old executions (runs daily)
- Configurable retention period via EXECUTION_RETENTION_DAYS env var (default: 30 days)
- Cleanup runs on server startup and every 24 hours
- Only cleans completed/failed executions, keeps running ones
- Added pagination support to /api/executions endpoint
- Returns total count, limit, offset, and hasMore flag

Client-side:
- Implemented "Load More" button for execution pagination
- Loads 50 executions at a time
- Appends additional executions when "Load More" clicked
- Shows total execution count info
- Backward compatible with old API format

Benefits:
- Automatic database maintenance
- Prevents execution table from growing indefinitely
- Better performance with large execution histories
- User can browse all executions via pagination
- Configurable retention policy per deployment

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:50:39 -05:00
d25ba27f24 Phase 4: Execution detail enhancements with re-run and download
Changes:
- Added "Re-run Command" button to execution details modal
- Added "Download Logs" button to export execution data as JSON
- Re-run automatically switches to Quick Command tab and pre-fills form
- Download includes all execution metadata and logs
- Buttons only show for applicable execution types
- Terminal-themed button styling

Features:
- Re-run: Quickly repeat a previous command on same worker
- Download: Export execution logs for auditing/debugging
- JSON format includes: execution_id, status, timestamps, logs
- Filename includes execution ID and date for easy organization

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:49:20 -05:00
76b0a6d0d3 Phase 3: Quick command enhancements with templates and history
Changes:
- Added command templates modal with 12 common system commands
- Added command history tracking (stored in localStorage)
- History saves last 50 commands with timestamp and worker name
- Template categories: system info, disk/memory, network, Docker, logs
- Click templates to auto-fill command field
- Click history items to reuse previous commands
- Terminal-themed modals with green/amber styling
- History persists across browser sessions

Templates included:
- System: uname, uptime, CPU info, processes
- Resources: df -h, free -h, memory usage
- Network: ip addr, active connections
- Docker: container list
- Logs: syslog tail, who is logged in, last logins

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:45:40 -05:00
adbbec2631 Phase 2: Enhanced worker status display with metadata
Changes:
- Show worker system metrics in dashboard and worker list
- Display CPU cores, memory usage, load average, uptime
- Added formatBytes() to display memory in human-readable format
- Added formatUptime() to show uptime as days/hours/minutes
- Added getTimeAgo() to show relative last-seen time
- Improved worker list with detailed metadata panel
- Show active tasks vs max concurrent tasks
- Terminal-themed styling for metadata display
- Amber labels for metadata fields

Benefits:
- See worker health at a glance
- Monitor resource usage (CPU, RAM, load)
- Track worker activity (active tasks)
- Better operational visibility

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:43:13 -05:00
8b8d2c6312 Phase 1: Improve log display formatting
Changes:
- Added formatLogEntry() function to parse and format log entries
- Replaced raw JSON display with readable formatted logs
- Added specific formatting for command_sent and command_result logs
- Show timestamp, status, duration, stdout/stderr in organized layout
- Color-coded success (green) and failure (red) states
- Added scrollable output sections with max-height
- Syntax highlighting for command code blocks
- Terminal-themed styling with green/amber colors

Benefits:
- Much easier to read execution logs
- Clear visual distinction between sent/result logs
- Professional terminal aesthetic maintained
- Better UX for debugging command execution

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:41:29 -05:00
1f5c84f327 Add execution cleanup functionality
Changes:
- Added DELETE /api/executions/:id endpoint
- Added "Clear Completed" button to Executions tab
- Deletes all completed and failed executions
- Broadcasts execution_deleted event to update all clients
- Shows count of deleted executions

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:36:51 -05:00
e03f8d6287 Fix worker ID mapping - use database ID for command routing
Problem:
- Workers generate random UUID on startup (runtime ID)
- Database stores workers with persistent IDs (database ID)
- UI sends commands using database ID
- Server couldn't find worker connection (stored by runtime ID)
- Result: 400 Bad Request "Worker not connected"

Solution:
- When worker connects, look up database ID by worker name
- Store WebSocket connection in Map using BOTH IDs:
  * Runtime ID (from worker_connect message)
  * Database ID (from database lookup by name)
- Commands from UI use database ID → finds correct WebSocket
- Cleanup both IDs when worker disconnects

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:29:23 -05:00
2097b73404 Fix worker command execution and execution status updates
Changes:
- Removed duplicate /api/executions/:id endpoint that didn't parse logs
- Added workers Map to track worker_id -> WebSocket connection
- Store worker connections when they send worker_connect message
- Send commands to specific worker instead of broadcasting to all clients
- Clean up workers Map when worker disconnects
- Update execution status to completed/failed when command results arrive
- Add proper error handling when worker is not connected

Fixes:
- execution.logs.forEach is not a function (logs now properly parsed)
- Commands stuck in "running" status (now update to completed/failed)
- Commands not reaching workers (now sent to specific worker WebSocket)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:23:02 -05:00
6d15e4d240 Remove database migrations after direct schema fixes
Changes:
- Removed all migration code from server.js
- Database schema fixed directly via MySQL:
  * Dropped users.role column (SSO only)
  * Dropped users.password column (SSO only)
  * Added executions.started_by column
  * Added workflows.created_by column
  * All tables now match expected schema
- Server startup will be faster without migrations

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:11:07 -05:00
7896b40d91 Remove password column from users table
Changes:
- Drop password column from users table (SSO authentication only)
- PULSE uses Authelia SSO, not password-based authentication
- Fixes 500 error: Field 'password' doesn't have a default value

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 22:01:58 -05:00
e2dc371bfe Add last_login column to users table migration
Changes:
- Add last_login TIMESTAMP column to existing users table
- Complete the users table migration with all required columns
- Fixes 500 error: Unknown column 'last_login' in 'INSERT INTO'

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 20:33:04 -05:00
df0184facf Add migration to update users table schema
Changes:
- Add display_name, email, and groups columns to existing users table
- Handle MariaDB lack of IF NOT EXISTS in ALTER TABLE
- Gracefully skip columns that already exist
- Fixes 500 error when authenticating users

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 20:28:47 -05:00
a8be111e04 Allow NULL workflow_id in executions table for quick commands
Changes:
- Modified executions table schema to allow NULL workflow_id
- Removed foreign key constraint that prevented NULL values
- Added migration to update existing table structure
- Quick commands can now be stored without a workflow reference

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 20:27:02 -05:00
b3806545bd Fix quick command executions not appearing in execution tab
Changes:
- Create execution record in database when quick command is sent
- Store initial log entry with command details
- Broadcast execution_started event to update UI
- Display quick commands as "[Quick Command]" in execution list
- Fix worker communication to properly track all executions

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-07 20:24:11 -05:00
2767087e27 Updated websocket handler 2026-01-07 20:20:18 -05:00
a1cf8ac90b updates aesthetic 2026-01-07 20:12:16 -05:00
9e842624e1 Claude md file 2026-01-07 19:57:16 -05:00
5 changed files with 4041 additions and 360 deletions

3
.gitignore vendored
View File

@@ -30,3 +30,6 @@ Thumbs.db
*.swp *.swp
*.swo *.swo
*~ *~
# Claude
Claude.md

1372
Claude.md Normal file

File diff suppressed because it is too large Load Diff

477
README.md
View File

@@ -1,240 +1,375 @@
# PULSE - Pipelined Unified Logic & Server Engine # PULSE - Pipelined Unified Logic & Server Engine
A distributed workflow orchestration platform for managing and executing complex multi-step operations across server clusters through an intuitive web interface. A distributed workflow orchestration platform for managing and executing complex multi-step operations across server clusters through a retro terminal-themed web interface.
## Overview ## Overview
PULSE is a centralized workflow execution system designed to orchestrate operations across distributed infrastructure. It provides a powerful web-based interface for defining, managing, and executing workflows that can span multiple servers, require human interaction, and perform complex automation tasks at scale. PULSE is a centralized workflow execution system designed to orchestrate operations across distributed infrastructure. It provides a powerful web-based interface with a vintage CRT terminal aesthetic for defining, managing, and executing workflows that can span multiple servers, require human interaction, and perform complex automation tasks at scale.
### Key Features ### Key Features
- **Interactive Workflow Management**: Define and execute multi-step workflows with conditional logic, user prompts, and decision points - **🎨 Retro Terminal Interface**: Phosphor green CRT-style interface with scanlines, glow effects, and ASCII art
- **Distributed Execution**: Run commands and scripts across multiple worker nodes simultaneously - **⚡ Quick Command Execution**: Instantly execute commands on any worker with built-in templates and command history
- **High Availability Architecture**: Deploy redundant worker nodes in LXC containers with Ceph storage for fault tolerance - **📊 Real-Time Worker Monitoring**: Live system metrics including CPU, memory, load average, and active tasks
- **Web-Based Control Center**: Intuitive interface for workflow selection, monitoring, and interactive input - **🔄 Interactive Workflow Management**: Define and execute multi-step workflows with conditional logic and user prompts
- **Flexible Worker Pool**: Scale horizontally by adding worker nodes as needed - **🌐 Distributed Execution**: Run commands across multiple worker nodes simultaneously via WebSocket
- **Real-Time Monitoring**: Track workflow progress, view logs, and receive notifications - **📈 Execution Tracking**: Comprehensive logging with formatted output, re-run capabilities, and JSON export
- **🔐 SSO Authentication**: Seamless integration with Authelia for enterprise authentication
- **🧹 Auto-Cleanup**: Automatic removal of old executions with configurable retention policies
- **🔔 Terminal Notifications**: Audio beeps and visual toasts for command completion events
## Architecture ## Architecture
PULSE consists of two core components: PULSE consists of two core components:
### PULSE Server ### PULSE Server
**Location:** `10.10.10.65` (LXC Container ID: 122)
**Directory:** `/opt/pulse-server`
The central orchestration hub that: The central orchestration hub that:
- Hosts the web interface for workflow management - Hosts the retro terminal web interface
- Manages workflow definitions and execution state - Manages workflow definitions and execution state
- Coordinates task distribution to worker nodes - Coordinates task distribution to worker nodes via WebSocket
- Handles user interactions and input collection - Handles user interactions through Authelia SSO
- Provides real-time status updates and logging - Provides real-time status updates and logging
- Stores all data in MariaDB database
**Technology Stack:**
- Node.js 20.x
- Express.js (web framework)
- WebSocket (ws package) for real-time bidirectional communication
- MySQL2 (MariaDB driver)
- Authelia SSO integration
### PULSE Worker ### PULSE Worker
**Example:** `10.10.10.151` (LXC Container ID: 153, hostname: pulse-worker-01)
**Directory:** `/opt/pulse-worker`
Lightweight execution agents that: Lightweight execution agents that:
- Connect to the PULSE server and await task assignments - Connect to PULSE server via WebSocket with heartbeat monitoring
- Execute commands, scripts, and code on target infrastructure - Execute shell commands and report results in real-time
- Report execution status and results back to the server - Provide system metrics (CPU, memory, load, uptime)
- Support multiple concurrent workflow executions - Support concurrent task execution with configurable limits
- Automatically reconnect and resume on failure - Automatically reconnect on connection loss
**Technology Stack:**
- Node.js 20.x
- WebSocket client
- Child process execution
- System metrics collection
``` ```
┌─────────────────────┐ ┌─────────────────────────────────
│ PULSE Server │ PULSE Server (10.10.10.65)
(Web Interface) Terminal Web Interface + API
──────────────────── │ ┌───────────┐ ┌──────────┐ │
│ MariaDB │ │ Authelia │
┌──────┴───────┬──────────────┬──────────────┐ │ Database │ │ SSO │ │
│ │ │ │ └───────────┘ └──────────┘
───────┐ ┌───────┐ ┌───────┐ ┌─────── ────────────────────────────────
│ Worker │ │ Worker │ │ Worker │ │ Worker │ │ WebSocket
│ Node 1 │ │ Node 2 │ │ Node 3 │ │ Node N │ ┌────────┴────────┬───────────┐
└────────┘ └────────┘ └────────┘ └────────┘ │ │ │
LXC Containers in Proxmox with Ceph ┌───▼────────┐ ┌───▼────┐ ┌──▼─────┐
│ Worker 1 │ │Worker 2│ │Worker N│
│10.10.10.151│ │ ... │ │ ... │
└────────────┘ └────────┘ └────────┘
LXC Containers in Proxmox with Ceph
``` ```
## Deployment ## Installation
### Prerequisites ### Prerequisites
- **Proxmox VE Cluster**: Hypervisor environment for container deployment - **Node.js 20.x** or higher
- **Ceph Storage**: Distributed storage backend for high availability - **MariaDB 10.x** or higher
- **LXC Support**: Container runtime for worker node deployment - **Authelia** configured for SSO (optional but recommended)
- **Network Connectivity**: Communication between server and workers - **Network Connectivity** between server and workers
### Installation ### PULSE Server Setup
#### PULSE Server
```bash ```bash
# Clone the repository # Clone repository
git clone https://github.com/yourusername/pulse.git cd /opt
cd pulse git clone <your-repo-url> pulse-server
cd pulse-server
# Install dependencies # Install dependencies
npm install # or pip install -r requirements.txt npm install
# Configure server settings # Create .env file with configuration
cp config.example.yml config.yml cat > .env << EOF
nano config.yml # Server Configuration
PORT=8080
SECRET_KEY=your-secret-key-here
# Start the PULSE server # MariaDB Configuration
npm start # or python server.py DB_HOST=10.10.10.50
DB_PORT=3306
DB_NAME=pulse
DB_USER=pulse_user
DB_PASSWORD=your-db-password
# Worker API Key (for worker authentication)
WORKER_API_KEY=your-worker-api-key
# Auto-cleanup configuration (optional)
EXECUTION_RETENTION_DAYS=30
EOF
# Create systemd service
cat > /etc/systemd/system/pulse.service << EOF
[Unit]
Description=PULSE Workflow Orchestration Server
After=network.target
[Service]
Type=simple
User=root
WorkingDirectory=/opt/pulse-server
ExecStart=/usr/bin/node server.js
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
# Start service
systemctl daemon-reload
systemctl enable pulse.service
systemctl start pulse.service
``` ```
#### PULSE Worker ### PULSE Worker Setup
```bash ```bash
# On each worker node (LXC container) # On each worker node
cd /opt
git clone <your-repo-url> pulse-worker
cd pulse-worker cd pulse-worker
# Install dependencies # Install dependencies
npm install # or pip install -r requirements.txt npm install
# Configure worker connection # Create .env file
cp worker-config.example.yml worker-config.yml cat > .env << EOF
nano worker-config.yml # Worker Configuration
WORKER_NAME=pulse-worker-01
PULSE_SERVER=http://10.10.10.65:8080
PULSE_WS=ws://10.10.10.65:8080
WORKER_API_KEY=your-worker-api-key
# Start the worker daemon # Performance Settings
npm start # or python worker.py HEARTBEAT_INTERVAL=30
``` MAX_CONCURRENT_TASKS=5
EOF
### High Availability Setup # Create systemd service
cat > /etc/systemd/system/pulse-worker.service << EOF
[Unit]
Description=PULSE Worker Node
After=network.target
Deploy multiple worker nodes across Proxmox hosts: [Service]
```bash Type=simple
# Create LXC template User=root
pct create 1000 local:vztmpl/ubuntu-22.04-standard_amd64.tar.zst \ WorkingDirectory=/opt/pulse-worker
--rootfs ceph-pool:8 \ ExecStart=/usr/bin/node worker.js
--memory 2048 \ Restart=always
--cores 2 \ RestartSec=10
--net0 name=eth0,bridge=vmbr0,ip=dhcp
# Clone for additional workers [Install]
pct clone 1000 1001 --full --storage ceph-pool WantedBy=multi-user.target
pct clone 1000 1002 --full --storage ceph-pool EOF
pct clone 1000 1003 --full --storage ceph-pool
# Start all workers # Start service
for i in {1000..1003}; do pct start $i; done systemctl daemon-reload
systemctl enable pulse-worker.service
systemctl start pulse-worker.service
``` ```
## Usage ## Usage
### Creating a Workflow ### Quick Command Execution
1. Access the PULSE web interface at `http://your-server:8080` 1. Access PULSE at `http://your-server:8080`
2. Navigate to **Workflows****Create New** 2. Navigate to **⚡ Quick Command** tab
3. Define workflow steps using the visual editor or YAML syntax 3. Select a worker from the dropdown
4. Specify execution targets (specific nodes, groups, or all workers) 4. Use **Templates** for pre-built commands or **History** for recent commands
5. Add interactive prompts where user input is required 5. Enter your command and click **Execute**
6. Save and activate the workflow 6. View results in the **Executions** tab
### Example Workflow **Built-in Command Templates:**
```yaml - System Info: `uname -a`
name: "System Update and Reboot" - Disk Usage: `df -h`
description: "Update all servers in the cluster with user confirmation" - Memory Usage: `free -h`
steps: - CPU Info: `lscpu`
- name: "Check Current Versions" - Running Processes: `ps aux --sort=-%mem | head -20`
type: "execute" - Network Interfaces: `ip addr show`
targets: ["all"] - Docker Containers: `docker ps -a`
command: "apt list --upgradable" - System Logs: `tail -n 50 /var/log/syslog`
- name: "User Approval"
type: "prompt"
message: "Review available updates. Proceed with installation?"
options: ["Yes", "No", "Cancel"]
- name: "Install Updates"
type: "execute"
targets: ["all"]
command: "apt-get update && apt-get upgrade -y"
condition: "prompt_response == 'Yes'"
- name: "Reboot Confirmation"
type: "prompt"
message: "Updates complete. Reboot all servers?"
options: ["Yes", "No"]
- name: "Rolling Reboot"
type: "execute"
targets: ["all"]
command: "reboot"
strategy: "rolling"
condition: "prompt_response == 'Yes'"
```
### Running a Workflow ### Worker Monitoring
1. Select a workflow from the dashboard The **Workers** tab displays real-time metrics for each worker:
2. Click **Execute** - System information (OS, architecture, CPU cores)
3. Monitor progress in real-time - Memory usage (used/total with percentage)
4. Respond to interactive prompts as they appear - Load averages (1m, 5m, 15m)
5. View detailed logs for each execution step - System uptime
- Active tasks vs. maximum concurrent capacity
## Configuration ### Execution Management
### Server Configuration (`config.yml`) - **View Details**: Click any execution to see formatted logs with timestamps, status, and output
```yaml - **Re-run Command**: Click "Re-run" button in execution details to repeat a command
server: - **Download Logs**: Export execution data as JSON for auditing
host: "0.0.0.0" - **Clear Completed**: Bulk delete finished executions
port: 8080 - **Auto-Cleanup**: Executions older than 30 days are automatically removed
secret_key: "your-secret-key"
database: ### Workflow Creation (Future Feature)
type: "postgresql"
host: "localhost"
port: 5432
name: "pulse"
workers: 1. Navigate to **Workflows****Create New**
heartbeat_interval: 30 2. Define workflow steps using JSON syntax
timeout: 300 3. Specify target workers
max_concurrent_tasks: 10 4. Add interactive prompts where needed
5. Save and execute
security:
enable_authentication: true
require_approval: true
```
### Worker Configuration (`worker-config.yml`)
```yaml
worker:
name: "worker-01"
server_url: "http://pulse-server:8080"
api_key: "worker-api-key"
resources:
max_cpu_percent: 80
max_memory_mb: 1024
executor:
shell: "/bin/bash"
working_directory: "/tmp/pulse"
timeout: 3600
```
## Features in Detail ## Features in Detail
### Interactive Workflows ### Terminal Aesthetic
- Pause execution to collect user input via web forms - Phosphor green (#00ff41) on black (#0a0a0a) color scheme
- Display intermediate results for review - CRT scanline animation effect
- Conditional branching based on user decisions - Text glow and shadow effects
- Multi-choice prompts with validation - ASCII box-drawing characters for borders
- Boot sequence animation on first load
- Hover effects with smooth transitions
### Mass Execution ### Real-Time Communication
- Run commands across all workers simultaneously - WebSocket-based bidirectional communication
- Target specific node groups or individual servers - Instant command result notifications
- Rolling execution for zero-downtime updates - Live worker status updates
- Parallel and sequential execution strategies - Terminal beep sounds for events
- Toast notifications with visual feedback
### Monitoring & Logging ### Execution Tracking
- Real-time workflow execution dashboard - Formatted log display (not raw JSON)
- Detailed per-step logging and output capture - Color-coded success/failure indicators
- Historical execution records and analytics - Timestamp and duration for each step
- Alert notifications for failures or completion - Scrollable output with syntax highlighting
- Persistent history with pagination
- Load More button for large execution lists
### Security ### Security
- Role-based access control (RBAC) - Authelia SSO integration for user authentication
- API key authentication for workers - API key authentication for workers
- Workflow approval requirements - User session management
- Audit logging for all actions - Admin-only operations (worker deletion, workflow management)
- Audit logging for all executions
### Performance
- Automatic cleanup of old executions (configurable retention)
- Pagination for large execution lists (50 at a time)
- Efficient WebSocket connection pooling
- Worker heartbeat monitoring
- Database connection pooling
## Configuration
### Environment Variables
**Server (.env):**
```bash
PORT=8080 # Server port
SECRET_KEY=<random-string> # Session secret
DB_HOST=10.10.10.50 # MariaDB host
DB_PORT=3306 # MariaDB port
DB_NAME=pulse # Database name
DB_USER=pulse_user # Database user
DB_PASSWORD=<password> # Database password
WORKER_API_KEY=<api-key> # Worker authentication key
EXECUTION_RETENTION_DAYS=30 # Auto-cleanup retention (default: 30)
```
**Worker (.env):**
```bash
WORKER_NAME=pulse-worker-01 # Unique worker name
PULSE_SERVER=http://10.10.10.65:8080 # Server HTTP URL
PULSE_WS=ws://10.10.10.65:8080 # Server WebSocket URL
WORKER_API_KEY=<api-key> # Must match server key
HEARTBEAT_INTERVAL=30 # Heartbeat seconds (default: 30)
MAX_CONCURRENT_TASKS=5 # Max parallel tasks (default: 5)
```
## Database Schema
PULSE uses MariaDB with the following tables:
- **users**: User accounts from Authelia SSO
- **workers**: Worker node registry with metadata
- **workflows**: Workflow definitions (JSON)
- **executions**: Execution history with logs
See [Claude.md](Claude.md) for complete schema details.
## Troubleshooting
### Worker Not Connecting
```bash
# Check worker service status
systemctl status pulse-worker
# Check worker logs
journalctl -u pulse-worker -n 50 -f
# Verify API key matches server
grep WORKER_API_KEY /opt/pulse-worker/.env
```
### Commands Stuck in "Running"
- This was fixed in recent updates - restart the server:
```bash
systemctl restart pulse.service
```
### Clear All Executions
Use the database directly if needed:
```bash
mysql -h 10.10.10.50 -u pulse_user -p pulse
> DELETE FROM executions WHERE status IN ('completed', 'failed');
```
## Development
### Recent Updates
**Phase 1-6 Improvements:**
- Formatted log display with color-coding
- Worker system metrics monitoring
- Command templates and history
- Re-run and download execution features
- Auto-cleanup and pagination
- Terminal aesthetic refinements
- Audio notifications and visual toasts
See git history for detailed changelog.
### Future Enhancements
- Full workflow system implementation
- Multi-worker command execution
- Scheduled/cron job support
- Execution search and filtering
- Dark/light theme toggle
- Mobile-responsive design
- REST API documentation
- Webhook integrations
## License
MIT License - See LICENSE file for details
--- ---
**PULSE** - Orchestrating your infrastructure, one heartbeat at a time. **PULSE** - Orchestrating your infrastructure, one heartbeat at a time.
Built with retro terminal aesthetics 🖥️ | Powered by WebSockets 🔌 | Secured by Authelia 🔐

File diff suppressed because it is too large Load Diff

396
server.js
View File

@@ -46,6 +46,8 @@ async function initDatabase() {
) )
`); `);
// Database schema is managed manually - migrations removed after direct database fixes
await connection.query(` await connection.query(`
CREATE TABLE IF NOT EXISTS workers ( CREATE TABLE IF NOT EXISTS workers (
id VARCHAR(36) PRIMARY KEY, id VARCHAR(36) PRIMARY KEY,
@@ -75,19 +77,36 @@ async function initDatabase() {
await connection.query(` await connection.query(`
CREATE TABLE IF NOT EXISTS executions ( CREATE TABLE IF NOT EXISTS executions (
id VARCHAR(36) PRIMARY KEY, id VARCHAR(36) PRIMARY KEY,
workflow_id VARCHAR(36) NOT NULL, workflow_id VARCHAR(36) NULL,
status VARCHAR(50) NOT NULL, status VARCHAR(50) NOT NULL,
started_by VARCHAR(255), started_by VARCHAR(255),
started_at TIMESTAMP NULL, started_at TIMESTAMP NULL,
completed_at TIMESTAMP NULL, completed_at TIMESTAMP NULL,
logs JSON, logs JSON,
FOREIGN KEY (workflow_id) REFERENCES workflows(id) ON DELETE CASCADE,
INDEX idx_workflow (workflow_id), INDEX idx_workflow (workflow_id),
INDEX idx_status (status), INDEX idx_status (status),
INDEX idx_started (started_at) INDEX idx_started (started_at)
) )
`); `);
await connection.query(`
CREATE TABLE IF NOT EXISTS scheduled_commands (
id VARCHAR(36) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
command TEXT NOT NULL,
worker_ids JSON NOT NULL,
schedule_type VARCHAR(50) NOT NULL,
schedule_value VARCHAR(255) NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
created_by VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_run TIMESTAMP NULL,
next_run TIMESTAMP NULL,
INDEX idx_enabled (enabled),
INDEX idx_next_run (next_run)
)
`);
console.log('Database tables initialized successfully'); console.log('Database tables initialized successfully');
} catch (error) { } catch (error) {
console.error('Database initialization error:', error); console.error('Database initialization error:', error);
@@ -97,11 +116,259 @@ async function initDatabase() {
} }
} }
// Auto-cleanup old executions (runs daily)
async function cleanupOldExecutions() {
try {
const retentionDays = parseInt(process.env.EXECUTION_RETENTION_DAYS) || 30;
const [result] = await pool.query(
`DELETE FROM executions
WHERE status IN ('completed', 'failed')
AND started_at < DATE_SUB(NOW(), INTERVAL ? DAY)`,
[retentionDays]
);
console.log(`[Cleanup] Removed ${result.affectedRows} executions older than ${retentionDays} days`);
} catch (error) {
console.error('[Cleanup] Error removing old executions:', error);
}
}
// Run cleanup daily at 3 AM
setInterval(cleanupOldExecutions, 24 * 60 * 60 * 1000);
// Run cleanup on startup
cleanupOldExecutions();
// Scheduled Commands Processor
async function processScheduledCommands() {
try {
const [schedules] = await pool.query(
`SELECT * FROM scheduled_commands
WHERE enabled = TRUE
AND (next_run IS NULL OR next_run <= NOW())`
);
for (const schedule of schedules) {
console.log(`[Scheduler] Running scheduled command: ${schedule.name}`);
const workerIds = JSON.parse(schedule.worker_ids);
// Execute command on each worker
for (const workerId of workerIds) {
const workerWs = workers.get(workerId);
if (workerWs && workerWs.readyState === WebSocket.OPEN) {
const executionId = generateUUID();
// Create execution record
await pool.query(
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
[executionId, null, 'running', `scheduler:${schedule.name}`, JSON.stringify([{
step: 'scheduled_command',
action: 'command_sent',
worker_id: workerId,
command: schedule.command,
timestamp: new Date().toISOString()
}])]
);
// Send command to worker
workerWs.send(JSON.stringify({
type: 'execute_command',
execution_id: executionId,
command: schedule.command,
worker_id: workerId,
timeout: 300000 // 5 minute timeout for scheduled commands
}));
broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
}
}
// Update last_run and calculate next_run
const nextRun = calculateNextRun(schedule.schedule_type, schedule.schedule_value);
await pool.query(
'UPDATE scheduled_commands SET last_run = NOW(), next_run = ? WHERE id = ?',
[nextRun, schedule.id]
);
}
} catch (error) {
console.error('[Scheduler] Error processing scheduled commands:', error);
}
}
function calculateNextRun(scheduleType, scheduleValue) {
const now = new Date();
if (scheduleType === 'interval') {
// Interval in minutes
const minutes = parseInt(scheduleValue);
return new Date(now.getTime() + minutes * 60000);
} else if (scheduleType === 'daily') {
// Daily at HH:MM
const [hours, minutes] = scheduleValue.split(':').map(Number);
const next = new Date(now);
next.setHours(hours, minutes, 0, 0);
// If time has passed today, schedule for tomorrow
if (next <= now) {
next.setDate(next.getDate() + 1);
}
return next;
} else if (scheduleType === 'hourly') {
// Every N hours
const hours = parseInt(scheduleValue);
return new Date(now.getTime() + hours * 3600000);
}
return null;
}
// Run scheduler every minute
setInterval(processScheduledCommands, 60 * 1000);
// Initial run on startup
setTimeout(processScheduledCommands, 5000);
// WebSocket connections // WebSocket connections
const clients = new Set(); const clients = new Set();
const workers = new Map(); // Map worker_id -> WebSocket connection
wss.on('connection', (ws) => { wss.on('connection', (ws) => {
clients.add(ws); clients.add(ws);
ws.on('close', () => clients.delete(ws));
// Handle incoming messages from workers
ws.on('message', async (data) => {
try {
const message = JSON.parse(data.toString());
console.log('WebSocket message received:', message.type);
if (message.type === 'command_result') {
// Handle command result from worker
const { execution_id, worker_id, success, stdout, stderr, duration, timestamp } = message;
// Add result to execution logs
await addExecutionLog(execution_id, {
step: 'command_execution',
action: 'command_result',
worker_id: worker_id,
success: success,
stdout: stdout,
stderr: stderr,
duration: duration,
timestamp: timestamp || new Date().toISOString()
});
// Update execution status to completed or failed
const finalStatus = success ? 'completed' : 'failed';
await updateExecutionStatus(execution_id, finalStatus);
// Broadcast result to all connected clients
broadcast({
type: 'command_result',
execution_id: execution_id,
worker_id: worker_id,
success: success,
stdout: stdout,
stderr: stderr
});
console.log(`Command result received for execution ${execution_id}: ${finalStatus}`);
}
if (message.type === 'workflow_result') {
// Handle workflow result from worker
const { execution_id, worker_id, success, message: resultMessage, timestamp } = message;
// Add final result to logs
await addExecutionLog(execution_id, {
step: 'workflow_completion',
action: 'workflow_result',
worker_id: worker_id,
success: success,
message: resultMessage,
timestamp: timestamp || new Date().toISOString()
});
// Update execution status
const finalStatus = success ? 'completed' : 'failed';
await updateExecutionStatus(execution_id, finalStatus);
// Broadcast completion to all clients
broadcast({
type: 'workflow_result',
execution_id: execution_id,
status: finalStatus,
success: success,
message: resultMessage
});
console.log(`Workflow result received for execution ${execution_id}: ${finalStatus}`);
}
if (message.type === 'worker_connect') {
// Handle worker connection
const { worker_id, worker_name } = message;
console.log(`Worker connected: ${worker_name} (${worker_id})`);
// Find the database worker ID by name
const [dbWorkers] = await pool.query(
'SELECT id FROM workers WHERE name = ?',
[worker_name]
);
if (dbWorkers.length > 0) {
const dbWorkerId = dbWorkers[0].id;
// Store worker WebSocket connection using BOTH IDs
workers.set(worker_id, ws); // Runtime ID
workers.set(dbWorkerId, ws); // Database ID
// Store mapping for cleanup
ws.workerId = worker_id;
ws.dbWorkerId = dbWorkerId;
console.log(`Mapped worker: runtime_id=${worker_id}, db_id=${dbWorkerId}, name=${worker_name}`);
// Update worker status to online
await pool.query(
`UPDATE workers SET status='online', last_heartbeat=NOW() WHERE id=?`,
[dbWorkerId]
);
// Broadcast worker status update with database ID
broadcast({
type: 'worker_update',
worker_id: dbWorkerId,
status: 'online'
});
} else {
console.log(`Worker ${worker_name} not found in database, will be created on heartbeat`);
}
}
if (message.type === 'pong') {
// Handle worker pong response
const { worker_id } = message;
await pool.query(
`UPDATE workers SET last_heartbeat=NOW() WHERE id=?`,
[worker_id]
);
}
} catch (error) {
console.error('WebSocket message error:', error);
}
});
ws.on('close', () => {
clients.delete(ws);
// Remove worker from workers map when disconnected (both runtime and db IDs)
if (ws.workerId) {
workers.delete(ws.workerId);
console.log(`Worker ${ws.workerId} (runtime ID) disconnected`);
}
if (ws.dbWorkerId) {
workers.delete(ws.dbWorkerId);
console.log(`Worker ${ws.dbWorkerId} (database ID) disconnected`);
}
});
}); });
// Broadcast to all connected clients // Broadcast to all connected clients
@@ -270,22 +537,99 @@ app.post('/api/executions', authenticateSSO, async (req, res) => {
app.get('/api/executions', authenticateSSO, async (req, res) => { app.get('/api/executions', authenticateSSO, async (req, res) => {
try { try {
const limit = parseInt(req.query.limit) || 50;
const offset = parseInt(req.query.offset) || 0;
const [rows] = await pool.query( const [rows] = await pool.query(
'SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ORDER BY e.started_at DESC LIMIT 50' 'SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ORDER BY e.started_at DESC LIMIT ? OFFSET ?',
[limit, offset]
); );
res.json(rows);
// Get total count
const [countRows] = await pool.query('SELECT COUNT(*) as total FROM executions');
const total = countRows[0].total;
res.json({
executions: rows,
total: total,
limit: limit,
offset: offset,
hasMore: offset + rows.length < total
});
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); res.status(500).json({ error: error.message });
} }
}); });
app.get('/api/executions/:id', authenticateSSO, async (req, res) => { app.delete('/api/executions/:id', authenticateSSO, async (req, res) => {
try { try {
const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]); await pool.query('DELETE FROM executions WHERE id = ?', [req.params.id]);
if (rows.length === 0) { broadcast({ type: 'execution_deleted', execution_id: req.params.id });
return res.status(404).json({ error: 'Not found' }); res.json({ success: true });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
// Scheduled Commands API
app.get('/api/scheduled-commands', authenticateSSO, async (req, res) => {
try {
const [schedules] = await pool.query(
'SELECT * FROM scheduled_commands ORDER BY created_at DESC'
);
res.json(schedules);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.post('/api/scheduled-commands', authenticateSSO, async (req, res) => {
try {
const { name, command, worker_ids, schedule_type, schedule_value } = req.body;
if (!name || !command || !worker_ids || !schedule_type || !schedule_value) {
return res.status(400).json({ error: 'Missing required fields' });
} }
res.json(rows[0]);
const id = generateUUID();
const nextRun = calculateNextRun(schedule_type, schedule_value);
await pool.query(
`INSERT INTO scheduled_commands
(id, name, command, worker_ids, schedule_type, schedule_value, created_by, next_run)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
[id, name, command, JSON.stringify(worker_ids), schedule_type, schedule_value, req.user.username, nextRun]
);
res.json({ success: true, id });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.put('/api/scheduled-commands/:id/toggle', authenticateSSO, async (req, res) => {
try {
const { id } = req.params;
const [current] = await pool.query('SELECT enabled FROM scheduled_commands WHERE id = ?', [id]);
if (current.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
const newEnabled = !current[0].enabled;
await pool.query('UPDATE scheduled_commands SET enabled = ? WHERE id = ?', [newEnabled, id]);
res.json({ success: true, enabled: newEnabled });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.delete('/api/scheduled-commands/:id', authenticateSSO, async (req, res) => {
try {
const { id } = req.params;
await pool.query('DELETE FROM scheduled_commands WHERE id = ?', [id]);
res.json({ success: true });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); res.status(500).json({ error: error.message });
} }
@@ -666,22 +1010,38 @@ app.post('/api/workers/:id/command', authenticateSSO, async (req, res) => {
try { try {
const { command } = req.body; const { command } = req.body;
const executionId = generateUUID(); const executionId = generateUUID();
const workerId = req.params.id;
// Send command via WebSocket
// Create execution record in database
await pool.query(
'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
[executionId, null, 'running', req.user.username, JSON.stringify([{
step: 'quick_command',
action: 'command_sent',
worker_id: workerId,
command: command,
timestamp: new Date().toISOString()
}])]
);
// Send command via WebSocket to specific worker
const commandMessage = { const commandMessage = {
type: 'execute_command', type: 'execute_command',
execution_id: executionId, execution_id: executionId,
command: command, command: command,
worker_id: req.params.id, worker_id: workerId,
timeout: 60000 timeout: 60000
}; };
clients.forEach(client => { const workerWs = workers.get(workerId);
if (client.readyState === WebSocket.OPEN) { if (!workerWs || workerWs.readyState !== WebSocket.OPEN) {
client.send(JSON.stringify(commandMessage)); return res.status(400).json({ error: 'Worker not connected' });
} }
});
workerWs.send(JSON.stringify(commandMessage));
console.log(`Command sent to worker ${workerId}: ${command}`);
broadcast({ type: 'execution_started', execution_id: executionId, workflow_id: null });
res.json({ success: true, execution_id: executionId }); res.json({ success: true, execution_id: executionId });
} catch (error) { } catch (error) {
res.status(500).json({ error: error.message }); res.status(500).json({ error: error.message });