From b29b70d88b9d209d25c9064de5da0b170c55cd44 Mon Sep 17 00:00:00 2001 From: Jared Vititoe Date: Sun, 15 Mar 2026 09:19:07 -0400 Subject: [PATCH] Improve Pulse execution reliability: retry logic, better logging, SSH hardening MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit monitor.py / diagnose.py PulseClient.run_command: - Add automatic single retry on submit failure, explicit Pulse failure (status=failed/timed_out), and poll timeout — handles transient SSH or Pulse hiccups without dropping the whole collection cycle - Log execution_id and full Pulse URL on every failure so failed runs can be found in the Pulse UI immediately - Handle 'timed_out' and 'cancelled' Pulse statuses explicitly (previously only 'failed' was caught; others would spin until local deadline) - Poll every 2s instead of 1s to reduce Pulse API chatter SSH command options (_ssh_batch + diagnose.py): - Add BatchMode=yes: aborts immediately instead of hanging on a password prompt if key auth fails - Add ServerAliveInterval=10 ServerAliveCountMax=2: SSH drops a dead or unresponsive connection within ~20s instead of sitting silent until the 45s Pulse timeout expires (keepalives are answered by sshd, so a remote command that hangs on a healthy host is still bounded only by the Pulse timeout) Co-Authored-By: Claude Sonnet 4.6 --- diagnose.py | 4 +++- monitor.py | 43 ++++++++++++++++++++++++++++++++++++------- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/diagnose.py b/diagnose.py index 79c7d9c..30e2fb4 100644 --- a/diagnose.py +++ b/diagnose.py @@ -77,7 +77,9 @@ class DiagnosticsRunner: return ( f'ssh -o StrictHostKeyChecking=no -o ConnectTimeout=5 ' - f'-o LogLevel=ERROR root@{ip_q} \'{remote_cmd}\'' + f'-o BatchMode=yes -o LogLevel=ERROR ' + f'-o ServerAliveInterval=10 -o ServerAliveCountMax=2 ' + f'root@{ip_q} \'{remote_cmd}\'' ) # ------------------------------------------------------------------ diff --git a/monitor.py b/monitor.py index 6961ab7..3a12544 100644 --- a/monitor.py +++ b/monitor.py @@ -246,11 +246,16 @@ class PulseClient: 'Content-Type': 'application/json', }) 
- def run_command(self, command: str) -> Optional[str]: - """Submit *command* to Pulse, poll until done, return stdout or None.""" + def run_command(self, command: str, _retry: bool = True) -> Optional[str]: + """Submit *command* to Pulse, poll until done, return stdout or None. + + Retries once automatically on transient submit failures or timeouts. + """ self.last_execution_id = None if not self.url or not self.api_key or not self.worker_id: return None + + # Submit try: resp = self.session.post( f'{self.url}/api/internal/command', @@ -262,11 +267,16 @@ class PulseClient: self.last_execution_id = execution_id except Exception as e: logger.error(f'Pulse command submit failed: {e}') + if _retry: + logger.info('Retrying Pulse command submit in 5s...') + time.sleep(5) + return self.run_command(command, _retry=False) return None + # Poll deadline = time.time() + self.timeout while time.time() < deadline: - time.sleep(1) + time.sleep(2) try: r = self.session.get( f'{self.url}/api/internal/executions/{execution_id}', @@ -281,11 +291,28 @@ class PulseClient: if entry.get('action') == 'command_result': return entry.get('stdout', '') return '' - if status == 'failed': + if status in ('failed', 'timed_out', 'cancelled'): + logger.error( + f'Pulse execution {execution_id} ended with status={status!r}; ' + f'view at {self.url}/executions/{execution_id}' + ) + if _retry and status != 'cancelled': + logger.info('Retrying failed Pulse command in 5s...') + time.sleep(5) + return self.run_command(command, _retry=False) return None except Exception as e: - logger.error(f'Pulse poll failed: {e}') - logger.warning(f'Pulse command timed out after {self.timeout}s') + logger.error(f'Pulse poll failed for {execution_id}: {e}') + + logger.warning( + f'Pulse command timed out after {self.timeout}s ' + f'(execution_id={execution_id}); ' + f'view at {self.url}/executions/{execution_id}' + ) + if _retry: + logger.info('Retrying timed-out Pulse command in 5s...') + time.sleep(5) + return 
self.run_command(command, _retry=False) return None @@ -336,7 +363,9 @@ class LinkStatsCollector: ssh_cmd = ( f'ssh -o StrictHostKeyChecking=no -o ConnectTimeout=5 ' - f'-o LogLevel=ERROR root@{ip} "{shell_cmd}"' + f'-o BatchMode=yes -o LogLevel=ERROR ' + f'-o ServerAliveInterval=10 -o ServerAliveCountMax=2 ' + f'root@{ip} "{shell_cmd}"' ) output = self.pulse.run_command(ssh_cmd) if output is None: