From d07dd8d4bef8b24aa598c6242b1a1379b8fcf748 Mon Sep 17 00:00:00 2001 From: Timon de Groot Date: Mon, 19 Jan 2026 16:01:43 +0100 Subject: [PATCH] Brancher: ping host before logbook poll This makes it possible to reuse a brancher node that does not have any items in the logbook anymore, which typically happens on older Brancher nodes. Fixes #185 --- grumphp.yml | 1 + psalm.xml | 2 +- src/Brancher/BrancherHypernodeManager.php | 162 ++++++++--- src/Brancher/SshPoller.php | 44 +++ .../Brancher/BrancherHypernodeManagerTest.php | 261 ++++++++++++++++++ tests/Unit/Brancher/TestSshPoller.php | 73 +++++ 6 files changed, 507 insertions(+), 36 deletions(-) create mode 100644 src/Brancher/SshPoller.php create mode 100644 tests/Unit/Brancher/BrancherHypernodeManagerTest.php create mode 100644 tests/Unit/Brancher/TestSshPoller.php diff --git a/grumphp.yml b/grumphp.yml index a67c5db..820fad0 100644 --- a/grumphp.yml +++ b/grumphp.yml @@ -5,6 +5,7 @@ grumphp: warning_severity: 0 whitelist_patterns: - /^src\/(.*)/ + - /^tests\/(.*)/ triggered_by: [php] psalm: config: psalm.xml diff --git a/psalm.xml b/psalm.xml index 4961e8f..c5218c1 100644 --- a/psalm.xml +++ b/psalm.xml @@ -1,6 +1,6 @@ log = $log; - $this->hypernodeClient = HypernodeClientFactory::create(getenv('HYPERNODE_API_TOKEN') ?: ''); + $this->hypernodeClient = $hypernodeClient + ?? HypernodeClientFactory::create(getenv('HYPERNODE_API_TOKEN') ?: ''); + $this->sshPoller = $sshPoller ?? new SshPoller(); } /** @@ -105,6 +122,11 @@ public function createForHypernode(string $hypernode, array $data = []): string /** * Wait for brancher Hypernode to become available. * + * This method first attempts a quick SSH connectivity check. If the brancher is already + * reachable (e.g., when reusing an existing brancher), it returns early. Otherwise, it + * falls back to polling the API logbook for delivery status, then performs a final SSH + * reachability check. + * * @param string $brancherHypernode Name of the brancher Hypernode * @param int $timeout Maximum time to wait for availability * @param int $reachabilityCheckCount Number of consecutive successful checks required @@ -121,24 +143,58 @@ public function waitForAvailability( int $reachabilityCheckCount = 6, int $reachabilityCheckInterval = 10 ): void { - $latest = microtime(true); - $timeElapsed = 0; + $latest = $this->sshPoller->microtime(); + $timeElapsed = 0.0; + + // Phase 1: SSH-first check, early return for reused delivered branchers + $this->log->info( + sprintf('Attempting SSH connectivity check for brancher Hypernode %s...', $brancherHypernode) + ); + + $isReachable = $this->pollSshConnectivity( + $brancherHypernode, + self::PRE_POLL_SUCCESS_COUNT, + self::PRE_POLL_FAIL_COUNT, + $reachabilityCheckInterval, + $timeElapsed, + $latest, + $timeout + ); + if ($isReachable) { + $this->log->info( + sprintf('Brancher Hypernode %s is reachable!', $brancherHypernode) + ); + return; + } + + $this->log->info( + sprintf( + 'SSH check inconclusive for brancher Hypernode %s, falling back to delivery check...', + $brancherHypernode + ) + ); + + // Phase 2: Wait for delivery by polling the logbook $resolved = false; $interval = 3; $allowedErrorWindow = 3; + $logbookStartTime = $timeElapsed; while ($timeElapsed < $timeout) { - $now = microtime(true); + $now = $this->sshPoller->microtime(); $timeElapsed += $now - $latest; $latest = $now; try { $flows = $this->hypernodeClient->logbook->getList($brancherHypernode); - $relevantFlows = array_filter($flows, fn(Flow $flow) => in_array($flow->name, ["ensure_app", "ensure_copied_app"], true)); + $relevantFlows = array_filter( + $flows, + fn(Flow $flow) => in_array($flow->name, self::RELEVANT_FLOW_NAMES, true) + ); $failedFlows = array_filter($relevantFlows, fn(Flow $flow) => $flow->isReverted()); $completedFlows = array_filter($relevantFlows, fn(Flow $flow) => $flow->isComplete()); - if (count($failedFlows) === count($relevantFlows)) { + if (count($relevantFlows) > 0 && count($failedFlows) === count($relevantFlows)) { throw new CreateBrancherHypernodeFailedException(); } @@ -151,9 +207,9 @@ public function waitForAvailability( // Otherwise, there's an error, and it should be propagated. if ($e->getCode() !== 404) { throw $e; - } elseif ($timeElapsed < $allowedErrorWindow) { + } elseif (($timeElapsed - $logbookStartTime) < $allowedErrorWindow) { // Sometimes we get an error where the logbook is not yet available, but it will be soon. - // We allow a small window for this to happen, and then we throw an exception. + // We allow a small window for this to happen, and then we continue polling. $this->log->info( sprintf( 'Got an expected exception during the allowed error window of HTTP code %d, waiting for %s to become available.', @@ -161,11 +217,16 @@ public function waitForAvailability( $brancherHypernode ) ); - continue; } } - sleep($interval); + $this->sshPoller->sleep($interval); + } + + if (!$resolved) { + throw new TimeoutException( + sprintf('Timed out waiting for brancher Hypernode %s to be delivered', $brancherHypernode) + ); } $this->log->info( @@ -175,35 +236,75 @@ public function waitForAvailability( ) ); - if (!$resolved) { + // Phase 3: Final SSH reachability check + $isReachable = $this->pollSshConnectivity( + $brancherHypernode, + $reachabilityCheckCount, + 0, // No max failures, rely on timeout + $reachabilityCheckInterval, + $timeElapsed, + $latest, + $timeout + ); + if (!$isReachable) { throw new TimeoutException( - sprintf('Timed out waiting for brancher Hypernode %s to be delivered', $brancherHypernode) + sprintf('Timed out waiting for brancher Hypernode %s to become reachable', $brancherHypernode) ); } + $this->log->info( + sprintf('Brancher Hypernode %s became reachable!', $brancherHypernode) + ); + } + + /** + * Poll SSH connectivity until we get enough consecutive successes or hit a limit. + * + * @param string $brancherHypernode Hostname to check + * @param int $requiredConsecutiveSuccesses Number of consecutive successes required + * @param int $maxFailedAttempts Maximum failed attempts before giving up (0 = no limit, use timeout only) + * @param int $checkInterval Seconds between checks + * @param float $timeElapsed Reference to track elapsed time + * @param float $latest Reference to track latest timestamp + * @param int $timeout Maximum time allowed + * @return bool True if SSH check succeeded, false if we should fall back to other methods + */ + private function pollSshConnectivity( + string $brancherHypernode, + int $requiredConsecutiveSuccesses, + int $maxFailedAttempts, + int $checkInterval, + float &$timeElapsed, + float &$latest, + int $timeout + ): bool { $consecutiveSuccesses = 0; + $failedAttempts = 0; + while ($timeElapsed < $timeout) { - $now = microtime(true); + $now = $this->sshPoller->microtime(); $timeElapsed += $now - $latest; $latest = $now; - $connection = @fsockopen(sprintf("%s.hypernode.io", $brancherHypernode), 22); - if ($connection) { - fclose($connection); + // Check if we've hit the max failed attempts limit (0 = unlimited) + if ($maxFailedAttempts > 0 && $failedAttempts >= $maxFailedAttempts) { + return false; + } + + if ($this->sshPoller->poll($brancherHypernode)) { $consecutiveSuccesses++; $this->log->info( sprintf( 'Brancher Hypernode %s reachability check %d/%d succeeded.', $brancherHypernode, $consecutiveSuccesses, - $reachabilityCheckCount + $requiredConsecutiveSuccesses ) ); - if ($consecutiveSuccesses >= $reachabilityCheckCount) { - break; + if ($consecutiveSuccesses >= $requiredConsecutiveSuccesses) { + return true; } - sleep($reachabilityCheckInterval); } else { if ($consecutiveSuccesses > 0) { $this->log->info( @@ -211,27 +312,18 @@ public function waitForAvailability( 'Brancher Hypernode %s reachability check failed, resetting counter (was at %d/%d).', $brancherHypernode, $consecutiveSuccesses, - $reachabilityCheckCount + $requiredConsecutiveSuccesses ) ); } $consecutiveSuccesses = 0; - sleep($reachabilityCheckInterval); + $failedAttempts++; } - } - if ($consecutiveSuccesses < $reachabilityCheckCount) { - throw new TimeoutException( - sprintf('Timed out waiting for brancher Hypernode %s to become reachable', $brancherHypernode) - ); + $this->sshPoller->sleep($checkInterval); } - $this->log->info( - sprintf( - 'Brancher Hypernode %s became reachable!', - $brancherHypernode - ) - ); + return false; } /** diff --git a/src/Brancher/SshPoller.php b/src/Brancher/SshPoller.php new file mode 100644 index 0000000..33c4526 --- /dev/null +++ b/src/Brancher/SshPoller.php @@ -0,0 +1,44 @@ +logger = $this->createMock(LoggerInterface::class); + $this->hypernodeClient = $this->createMock(HypernodeClient::class); + $this->logbook = $this->createMock(Logbook::class); + $this->hypernodeClient->logbook = $this->logbook; + $this->sshPoller = new TestSshPoller(); + $this->sshPoller->setMicrotime(1000.0); + + $this->manager = new BrancherHypernodeManager( + $this->logger, + $this->hypernodeClient, + $this->sshPoller + ); + } + + public function testSshFirstCheckSucceedsReturnsEarly(): void + { + $this->sshPoller->pollResults = [true, true, true]; + + $this->logbook->expects($this->never())->method('getList'); + + $this->manager->waitForAvailability('test-brancher', 1500, 6, 10); + + $this->assertSame(3, $this->sshPoller->pollCount); + } + + public function testSshFirstCheckFailsFallsBackToLogbook(): void + { + $this->sshPoller->pollResults = array_merge( + array_fill(0, 5, false), + array_fill(0, 6, true) + ); + + $flow = $this->createFlow('ensure_app', Flow::STATE_SUCCESS); + $this->logbook->expects($this->once()) + ->method('getList') + ->with('test-brancher') + ->willReturn([$flow]); + + $this->manager->waitForAvailability('test-brancher', 1500, 6, 10); + + $this->assertSame(11, $this->sshPoller->pollCount); + } + + public function testLogbookAllFlowsRevertedThrowsException(): void + { + $this->sshPoller->pollResults = array_fill(0, 5, false); + + $flow = $this->createFlow('ensure_app', Flow::STATE_REVERTED); + $this->logbook->expects($this->once()) + ->method('getList') + ->with('test-brancher') + ->willReturn([$flow]); + + $this->expectException(CreateBrancherHypernodeFailedException::class); + + $this->manager->waitForAvailability('test-brancher', 1500, 6, 10); + } + + public function testTimeoutDuringDeliveryThrowsException(): void + { + $this->sshPoller->pollResults = array_fill(0, 5, false); + $this->sshPoller->sleepTimeAdvance = 10; + + $flow = $this->createFlow('ensure_app', Flow::STATE_RUNNING); + $this->logbook->method('getList') + ->with('test-brancher') + ->willReturn([$flow]); + + $this->expectException(TimeoutException::class); + $this->expectExceptionMessage('Timed out waiting for brancher Hypernode test-brancher to be delivered'); + + $this->manager->waitForAvailability('test-brancher', 30, 6, 10); + } + + public function testTimeoutDuringReachabilityThrowsException(): void + { + $this->sshPoller->pollResults = array_fill(0, 100, false); + $this->sshPoller->sleepTimeAdvance = 10; + + $flow = $this->createFlow('ensure_app', Flow::STATE_SUCCESS); + $this->logbook->method('getList') + ->with('test-brancher') + ->willReturn([$flow]); + + $this->expectException(TimeoutException::class); + $this->expectExceptionMessage('Timed out waiting for brancher Hypernode test-brancher to become reachable'); + + $this->manager->waitForAvailability('test-brancher', 100, 6, 10); + } + + public function testLogbook404DuringAllowedWindowContinuesPolling(): void + { + $this->sshPoller->pollResults = array_merge( + array_fill(0, 5, false), + array_fill(0, 6, true) + ); + + $response = $this->createMock(ResponseInterface::class); + $response->method('getStatusCode')->willReturn(404); + $response->method('getBody')->willReturn('Not Found'); + $exception404 = new HypernodeApiClientException($response); + + $flow = $this->createFlow('ensure_app', Flow::STATE_SUCCESS); + $this->logbook->expects($this->exactly(2)) + ->method('getList') + ->with('test-brancher') + ->willReturnOnConsecutiveCalls( + $this->throwException($exception404), + [$flow] + ); + + $this->manager->waitForAvailability('test-brancher', 1500, 6, 10); + + $this->assertSame(11, $this->sshPoller->pollCount); + } + + public function testLogbookNon404ErrorPropagates(): void + { + $this->sshPoller->pollResults = array_fill(0, 5, false); + + $response = $this->createMock(ResponseInterface::class); + $response->method('getStatusCode')->willReturn(500); + $response->method('getBody')->willReturn('Internal Server Error'); + $exception500 = new HypernodeApiClientException($response); + + $this->logbook->expects($this->once()) + ->method('getList') + ->with('test-brancher') + ->willThrowException($exception500); + + $this->expectException(HypernodeApiClientException::class); + + $this->manager->waitForAvailability('test-brancher', 1500, 6, 10); + } + + public function testSshFirstCheckIntermittentFailuresResetCounter(): void + { + $this->sshPoller->pollResults = [true, true, false, true, true, true]; + + $this->logbook->expects($this->never())->method('getList'); + + $this->manager->waitForAvailability('test-brancher', 1500, 6, 10); + + $this->assertSame(6, $this->sshPoller->pollCount); + } + + public function testSshFirstCheckExhaustsMaxFailuresBeforeFallback(): void + { + $this->sshPoller->pollResults = array_merge( + array_fill(0, 5, false), + array_fill(0, 6, true) + ); + + $flow = $this->createFlow('ensure_app', Flow::STATE_SUCCESS); + $this->logbook->expects($this->once()) + ->method('getList') + ->willReturn([$flow]); + + $this->manager->waitForAvailability('test-brancher', 1500, 6, 10); + + $this->assertSame(11, $this->sshPoller->pollCount); + } + + public function testMultipleFlowsAllMustComplete(): void + { + $this->sshPoller->pollResults = array_merge( + array_fill(0, 5, false), + array_fill(0, 6, true) + ); + + $flowComplete = $this->createFlow('ensure_app', Flow::STATE_SUCCESS); + $flowRunning = $this->createFlow('ensure_copied_app', Flow::STATE_RUNNING); + $flowComplete2 = $this->createFlow('ensure_copied_app', Flow::STATE_SUCCESS); + + $this->logbook->expects($this->exactly(2)) + ->method('getList') + ->willReturnOnConsecutiveCalls( + [$flowComplete, $flowRunning], + [$flowComplete, $flowComplete2] + ); + + $this->manager->waitForAvailability('test-brancher', 1500, 6, 10); + + $this->assertSame(11, $this->sshPoller->pollCount); + } + + public function testEmptyLogbookContinuesPolling(): void + { + $this->sshPoller->pollResults = array_merge( + array_fill(0, 5, false), + array_fill(0, 6, true) + ); + + $flow = $this->createFlow('ensure_app', Flow::STATE_SUCCESS); + $this->logbook->expects($this->exactly(2)) + ->method('getList') + ->willReturnOnConsecutiveCalls([], [$flow]); + + $this->manager->waitForAvailability('test-brancher', 1500, 6, 10); + + $this->assertSame(11, $this->sshPoller->pollCount); + } + + public function testIrrelevantFlowsAreIgnored(): void + { + $this->sshPoller->pollResults = array_merge( + array_fill(0, 5, false), + array_fill(0, 6, true) + ); + + $irrelevantFlow = $this->createFlow('some_other_flow', Flow::STATE_RUNNING); + $relevantFlow = $this->createFlow('ensure_app', Flow::STATE_SUCCESS); + + $this->logbook->expects($this->exactly(2)) + ->method('getList') + ->willReturnOnConsecutiveCalls( + [$irrelevantFlow], + [$relevantFlow, $irrelevantFlow] + ); + + $this->manager->waitForAvailability('test-brancher', 1500, 6, 10); + + $this->assertSame(11, $this->sshPoller->pollCount); + } + + private function createFlow(string $name, string $state): Flow + { + return new Flow([ + 'name' => $name, + 'state' => $state, + ]); + } +} diff --git a/tests/Unit/Brancher/TestSshPoller.php b/tests/Unit/Brancher/TestSshPoller.php new file mode 100644 index 0000000..11c084b --- /dev/null +++ b/tests/Unit/Brancher/TestSshPoller.php @@ -0,0 +1,73 @@ +currentMicrotime = $time; + } + + /** + * Advance microtime by a specific amount + */ + public function advanceMicrotime(float $seconds): void + { + $this->currentMicrotime += $seconds; + } + + public function poll(string $hostname): bool + { + $this->pollCount++; + if ($this->pollIndex >= count($this->pollResults)) { + return false; // Default to false if we run out of results + } + return $this->pollResults[$this->pollIndex++]; + } + + public function sleep(int $seconds): void + { + $this->sleepCount++; + $this->sleepCalls[] = $seconds; + + // Advance simulated time + if ($this->sleepTimeAdvance > 0) { + $this->currentMicrotime += $this->sleepTimeAdvance; + } else { + $this->currentMicrotime += $seconds; + } + // Don't actually sleep in tests + } + + public function microtime(): float + { + return $this->currentMicrotime; + } +}