const express = require('express');
const http = require('http');
const WebSocket = require('ws');
const mysql = require('mysql2/promise');
const crypto = require('crypto');
require('dotenv').config();

const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

// Middleware
app.use(express.json());
app.use(express.static('public'));

/**
 * Generate a random UUID used as the primary key for new rows.
 *
 * @returns {string} UUID string produced by crypto.randomUUID().
 */
function generateUUID() {
  return crypto.randomUUID();
}

/**
 * Tell whether a request-body field is a usable scalar for a text column.
 *
 * Objects and arrays are rejected on purpose: mysql2's `query()` escapes
 * placeholders client-side and expands object values into `key = value`
 * lists, which would change the shape of the SQL statement.
 *
 * @param {unknown} value - Field taken from the parsed JSON body.
 * @returns {boolean} True for non-empty strings and finite numbers.
 */
function isPresentScalar(value) {
  if (typeof value === 'string') {
    return value.length > 0;
  }
  return typeof value === 'number' && Number.isFinite(value);
}

/**
 * Check a worker-supplied API key against WORKER_API_KEY.
 *
 * Fails closed when the environment variable is not configured; a plain
 * `!==` comparison would treat "no header" and "no configured key" as equal.
 * Both sides are hashed first so the buffers handed to timingSafeEqual
 * always have the same length (it throws on a length mismatch).
 *
 * @param {unknown} candidate - Value of the `x-api-key` request header.
 * @returns {boolean} True only when a key is configured and matches.
 */
function isValidWorkerApiKey(candidate) {
  const expected = process.env.WORKER_API_KEY;
  if (!expected || typeof candidate !== 'string') {
    return false;
  }
  const candidateDigest = crypto.createHash('sha256').update(candidate).digest();
  const expectedDigest = crypto.createHash('sha256').update(expected).digest();
  return crypto.timingSafeEqual(candidateDigest, expectedDigest);
}

// Database pool
const pool = mysql.createPool({
  host: process.env.DB_HOST,
  port: process.env.DB_PORT || 3306,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0
});

/**
 * Create the users, workers, workflows and executions tables if they do
 * not exist yet.
 *
 * @returns {Promise<void>} Resolves once all four statements have run.
 * @throws Re-throws any database error after logging it; the pooled
 *   connection is released in every case.
 */
async function initDatabase() {
  const connection = await pool.getConnection();
  try {
    // `groups` is quoted because GROUPS is a reserved word in MySQL 8.0.2+;
    // the backticks are harmless on servers where it is not reserved.
    await connection.query(`
      CREATE TABLE IF NOT EXISTS users (
        id VARCHAR(36) PRIMARY KEY,
        username VARCHAR(255) UNIQUE NOT NULL,
        display_name VARCHAR(255),
        email VARCHAR(255),
        \`groups\` TEXT,
        last_login TIMESTAMP,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      )
    `);

    await connection.query(`
      CREATE TABLE IF NOT EXISTS workers (
        id VARCHAR(36) PRIMARY KEY,
        name VARCHAR(255) UNIQUE NOT NULL,
        status VARCHAR(50) NOT NULL,
        last_heartbeat TIMESTAMP NULL,
        api_key VARCHAR(255),
        metadata JSON,
        INDEX idx_status (status),
        INDEX idx_heartbeat (last_heartbeat)
      )
    `);

    await connection.query(`
      CREATE TABLE IF NOT EXISTS workflows (
        id VARCHAR(36) PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        description TEXT,
        definition JSON NOT NULL,
        created_by VARCHAR(255),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        INDEX idx_name (name)
      )
    `);

    await connection.query(`
      CREATE TABLE IF NOT EXISTS executions (
        id VARCHAR(36) PRIMARY KEY,
        workflow_id VARCHAR(36) NOT NULL,
        status VARCHAR(50) NOT NULL,
        started_by VARCHAR(255),
        started_at TIMESTAMP NULL,
        completed_at TIMESTAMP NULL,
        logs JSON,
        FOREIGN KEY (workflow_id) REFERENCES workflows(id) ON DELETE CASCADE,
        INDEX idx_workflow (workflow_id),
        INDEX idx_status (status),
        INDEX idx_started (started_at)
      )
    `);

    console.log('Database tables initialized successfully');
  } catch (error) {
    console.error('Database initialization error:', error);
    throw error;
  } finally {
    connection.release();
  }
}

// WebSocket connections
const clients = new Set();

wss.on('connection', (ws) => {
  clients.add(ws);
  ws.on('close', () => clients.delete(ws));
  // An 'error' event with no listener is thrown by EventEmitter, so a single
  // misbehaving socket could otherwise take the whole process down.
  ws.on('error', (error) => {
    console.error('WebSocket client error:', error);
  });
});

/**
 * Send a JSON-serialised message to every connected WebSocket client whose
 * socket is currently open.
 *
 * A failure to send to one client is logged and does not stop delivery to
 * the others or propagate to the caller (routes call this after they have
 * already written their HTTP response).
 *
 * @param {object} data - Message to serialise with JSON.stringify.
 * @returns {void}
 */
function broadcast(data) {
  // Serialise once instead of once per client.
  const payload = JSON.stringify(data);
  for (const client of clients) {
    if (client.readyState !== WebSocket.OPEN) {
      continue;
    }
    try {
      client.send(payload);
    } catch (error) {
      console.error('WebSocket send failed:', error);
    }
  }
}

/**
 * Authelia SSO middleware.
 *
 * Reads the Remote-User / Remote-Name / Remote-Email / Remote-Groups request
 * headers, rejects requests without a user (401) or without membership in
 * the `admin` or `employee` group (403), upserts the user row, and attaches
 * `req.user` for downstream handlers.
 *
 * NOTE(review): these headers are only trustworthy if a reverse proxy sets
 * them and strips client-supplied copies — confirm against the deployment.
 *
 * @param {import('express').Request} req
 * @param {import('express').Response} res
 * @param {import('express').NextFunction} next
 * @returns {Promise<void>}
 */
async function authenticateSSO(req, res, next) {
  // Check for Authelia headers
  const remoteUser = req.headers['remote-user'];
  const remoteName = req.headers['remote-name'];
  const remoteEmail = req.headers['remote-email'];
  const remoteGroups = req.headers['remote-groups'];

  if (!remoteUser) {
    return res.status(401).json({
      error: 'Not authenticated',
      message: 'Please access this service through auth.lotusguild.org'
    });
  }

  // Check if user is in allowed groups (admin or employee)
  const groups = remoteGroups ? remoteGroups.split(',').map(g => g.trim()) : [];
  const allowedGroups = ['admin', 'employee'];
  const hasAccess = groups.some(g => allowedGroups.includes(g));

  if (!hasAccess) {
    return res.status(403).json({
      error: 'Access denied',
      message: 'You must be in admin or employee group'
    });
  }

  // Store/update user in database. Best effort: a failure here is logged and
  // the request still proceeds. The generated id is only used when the
  // username is new; otherwise the duplicate-key branch updates the row.
  try {
    const userId = generateUUID();
    await pool.query(
      `INSERT INTO users (id, username, display_name, email, \`groups\`, last_login)
       VALUES (?, ?, ?, ?, ?, NOW())
       ON DUPLICATE KEY UPDATE
         display_name=VALUES(display_name),
         email=VALUES(email),
         \`groups\`=VALUES(\`groups\`),
         last_login=NOW()`,
      [userId, remoteUser, remoteName, remoteEmail, remoteGroups]
    );
  } catch (error) {
    console.error('Error updating user:', error);
  }

  // Attach user info to request
  req.user = {
    username: remoteUser,
    name: remoteName || remoteUser,
    email: remoteEmail || '',
    groups: groups,
    isAdmin: groups.includes('admin')
  };

  next();
}

// Routes - All protected by SSO

// Return the authenticated user's profile as built by authenticateSSO.
app.get('/api/user', authenticateSSO, (req, res) => {
  res.json(req.user);
});

// List all workflows, newest first.
app.get('/api/workflows', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM workflows ORDER BY created_at DESC');
    res.json(rows);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Create a workflow. Responds 400 when name or definition is missing or
// description is not text, instead of surfacing a database constraint error.
app.post('/api/workflows', authenticateSSO, async (req, res) => {
  try {
    const { name, description, definition } = req.body ?? {};

    if (!isPresentScalar(name)) {
      return res.status(400).json({ error: 'name is required' });
    }
    if (definition === undefined) {
      return res.status(400).json({ error: 'definition is required' });
    }
    if (description != null && typeof description !== 'string') {
      return res.status(400).json({ error: 'description must be a string' });
    }

    const id = generateUUID();
    await pool.query(
      'INSERT INTO workflows (id, name, description, definition, created_by) VALUES (?, ?, ?, ?, ?)',
      [id, name, description, JSON.stringify(definition), req.user.username]
    );

    res.json({ id, name, description, definition });
    broadcast({ type: 'workflow_created', workflow_id: id });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Delete a workflow by id (admin only).
app.delete('/api/workflows/:id', authenticateSSO, async (req, res) => {
  try {
    // Only admins can delete workflows
    if (!req.user.isAdmin) {
      return res.status(403).json({ error: 'Admin access required' });
    }

    await pool.query('DELETE FROM workflows WHERE id = ?', [req.params.id]);
    res.json({ success: true });
    broadcast({ type: 'workflow_deleted', workflow_id: req.params.id });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// List workers ordered by name. The stored api_key column is removed from
// each row so the shared worker secret is never returned to API clients.
app.get('/api/workers', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM workers ORDER BY name');
    res.json(rows.map(({ api_key: _apiKey, ...worker }) => worker));
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Worker heartbeat: authenticated by the x-api-key header rather than SSO.
// Upserts the worker row as 'online' and notifies WebSocket clients.
app.post('/api/workers/heartbeat', async (req, res) => {
  try {
    const { worker_id, name, metadata } = req.body ?? {};
    const apiKey = req.headers['x-api-key'];

    // Verify API key
    if (!isValidWorkerApiKey(apiKey)) {
      return res.status(401).json({ error: 'Invalid API key' });
    }

    if (!isPresentScalar(worker_id) || !isPresentScalar(name)) {
      return res.status(400).json({ error: 'worker_id and name are required' });
    }

    // Absent metadata is stored as SQL NULL rather than the string "undefined".
    const metadataJson = metadata === undefined ? null : JSON.stringify(metadata);

    await pool.query(
      `INSERT INTO workers (id, name, status, last_heartbeat, api_key, metadata)
       VALUES (?, ?, 'online', NOW(), ?, ?)
       ON DUPLICATE KEY UPDATE
         status='online',
         last_heartbeat=NOW(),
         metadata=VALUES(metadata)`,
      [worker_id, name, apiKey, metadataJson]
    );

    broadcast({ type: 'worker_update', worker_id, status: 'online' });
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Start an execution of a workflow. Responds 400 when workflow_id is missing
// and 404 when the foreign key to workflows cannot be satisfied.
app.post('/api/executions', authenticateSSO, async (req, res) => {
  try {
    const { workflow_id } = req.body ?? {};

    if (!isPresentScalar(workflow_id)) {
      return res.status(400).json({ error: 'workflow_id is required' });
    }

    const id = generateUUID();
    await pool.query(
      'INSERT INTO executions (id, workflow_id, status, started_by, started_at, logs) VALUES (?, ?, ?, ?, NOW(), ?)',
      [id, workflow_id, 'running', req.user.username, JSON.stringify([])]
    );

    broadcast({ type: 'execution_started', execution_id: id, workflow_id });
    res.json({ id, workflow_id, status: 'running' });
  } catch (error) {
    // Foreign-key violation: the referenced workflow does not exist.
    if (error.code === 'ER_NO_REFERENCED_ROW_2') {
      return res.status(404).json({ error: 'Workflow not found' });
    }
    res.status(500).json({ error: error.message });
  }
});

// List the 50 most recently started executions with their workflow name.
app.get('/api/executions', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query(
      'SELECT e.*, w.name as workflow_name FROM executions e LEFT JOIN workflows w ON e.workflow_id = w.id ORDER BY e.started_at DESC LIMIT 50'
    );
    res.json(rows);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Fetch a single execution by id; 404 when no row matches.
app.get('/api/executions/:id', authenticateSSO, async (req, res) => {
  try {
    const [rows] = await pool.query('SELECT * FROM executions WHERE id = ?', [req.params.id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Not found' });
    }
    res.json(rows[0]);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Health check (no auth required)
app.get('/health', async (req, res) => {
  try {
    await pool.query('SELECT 1');
    res.json({
      status: 'ok',
      timestamp: new Date().toISOString(),
      database: 'connected',
      auth: 'authelia-sso'
    });
  } catch (error) {
    res.status(500).json({
      status: 'error',
      timestamp: new Date().toISOString(),
      database: 'disconnected',
      error: error.message
    });
  }
});

// Start server
const PORT = process.env.PORT || 8080;
const HOST = process.env.HOST || '0.0.0.0';

// Only begin listening once the schema is in place; exit non-zero otherwise.
initDatabase().then(() => {
  server.listen(PORT, HOST, () => {
    console.log(`PULSE Server running on http://${HOST}:${PORT}`);
    console.log(`Connected to MariaDB at ${process.env.DB_HOST}`);
    console.log(`Authentication: Authelia SSO`);
    console.log(`Worker API Key configured: ${process.env.WORKER_API_KEY ? 'Yes' : 'No'}`);
  });
}).catch(err => {
  console.error('Failed to start server:', err);
  process.exit(1);
});