diff --git a/e2e/test/hooks_test.go b/e2e/test/hooks_test.go index 8eda29567..14b2e7e56 100644 --- a/e2e/test/hooks_test.go +++ b/e2e/test/hooks_test.go @@ -159,6 +159,7 @@ var _ = Describe("Hooks E2E Tests", Label("hooks"), Ordered, func() { startHooksExporter("exporter-hooks-before-fail-endLease.yaml") out, err := Jmp("shell", "--client", "test-client-hooks", + "--retry-timeout", "0", "--selector", "example.com/board=hooks", "j", "power", "on") Expect(err).To(HaveOccurred()) Expect(out).To(MatchRegexp(`(beforeLease hook fail|Exporter shutting down|Connection to exporter lost)`)) @@ -171,6 +172,7 @@ var _ = Describe("Hooks E2E Tests", Label("hooks"), Ordered, func() { // First lease: shell should fail because beforeLease hook fails out, err := Jmp("shell", "--client", "test-client-hooks", + "--retry-timeout", "0", "--selector", "example.com/board=hooks", "j", "power", "on") Expect(err).To(HaveOccurred()) Expect(out).To(MatchRegexp(`(beforeLease hook fail|Connection to exporter lost)`)) @@ -181,6 +183,7 @@ var _ = Describe("Hooks E2E Tests", Label("hooks"), Ordered, func() { // Second lease: should also fail (hook still configured to fail), // but this proves the exporter accepted a new lease after recovery out2, err2 := Jmp("shell", "--client", "test-client-hooks", + "--retry-timeout", "0", "--selector", "example.com/board=hooks", "j", "power", "on") Expect(err2).To(HaveOccurred()) Expect(out2).To(MatchRegexp(`(beforeLease hook fail|Connection to exporter lost)`)) @@ -193,6 +196,7 @@ var _ = Describe("Hooks E2E Tests", Label("hooks"), Ordered, func() { startHooksExporter("exporter-hooks-before-fail-endLease-with-after.yaml") out, err := Jmp("shell", "--client", "test-client-hooks", + "--retry-timeout", "0", "--selector", "example.com/board=hooks", "j", "power", "on") Expect(err).To(HaveOccurred()) Expect(out).NotTo(ContainSubstring("AFTER_SHOULD_NOT_RUN")) @@ -205,6 +209,7 @@ var _ = Describe("Hooks E2E Tests", Label("hooks"), Ordered, func() { startHooksExporterSingle("exporter-hooks-before-fail-exit.yaml") out, err := Jmp("shell", "--client", "test-client-hooks", + "--retry-timeout", "0", "--selector", "example.com/board=hooks", "j", "power", "on") Expect(err).To(HaveOccurred()) Expect(out).To(MatchRegexp(`(beforeLease hook fail|Exporter shutting down|Connection to exporter lost)`)) @@ -239,6 +244,7 @@ var _ = Describe("Hooks E2E Tests", Label("hooks"), Ordered, func() { // Shell may succeed or fail; the key is the exporter exits _, _ = Jmp("shell", "--client", "test-client-hooks", + "--retry-timeout", "0", "--selector", "example.com/board=hooks", "j", "power", "on") Eventually(func() bool { @@ -360,6 +366,7 @@ print("PYTHON_HOOK: complete") startHooksExporter("exporter-hooks-none.yaml") out, err := RunCmd("timeout", "60", "jmp", "shell", + "--retry-timeout", "75", "--client", "test-client-hooks", "--selector", "example.com/board=hooks", "--duration", "10s", "--", "sleep", "30") @@ -372,6 +379,7 @@ print("PYTHON_HOOK: complete") startHooksExporter("exporter-hooks-slow-before.yaml") out, err := RunCmd("timeout", "60", "jmp", "shell", + "--retry-timeout", "75", "--client", "test-client-hooks", "--selector", "example.com/board=hooks", "--duration", "5s", "--", "sleep", "30") @@ -383,6 +391,7 @@ print("PYTHON_HOOK: complete") startHooksExporter("exporter-hooks-slow-before.yaml") out, err := RunCmd("timeout", "60", "jmp", "shell", + "--retry-timeout", "75", "--client", "test-client-hooks", "--selector", "example.com/board=hooks", "--duration", "12s", "--", "sleep", "30") @@ -411,6 +420,7 @@ print("PYTHON_HOOK: complete") startHooksExporterSingle("exporter-hooks-before-fail-exit-with-after.yaml") out, err := Jmp("shell", "--client", "test-client-hooks", + "--retry-timeout", "0", "--selector", "example.com/board=hooks", "j", "power", "on") Expect(err).To(HaveOccurred()) Expect(out).NotTo(ContainSubstring("AFTER_SHOULD_NOT_RUN")) diff --git a/python/packages/jumpstarter-cli/jumpstarter_cli/common.py b/python/packages/jumpstarter-cli/jumpstarter_cli/common.py index f634269ec..999df0431 100644 --- a/python/packages/jumpstarter-cli/jumpstarter_cli/common.py +++ b/python/packages/jumpstarter-cli/jumpstarter_cli/common.py @@ -145,6 +145,20 @@ def convert(self, value, param, ctx): ), ) +RETRY_TIMEOUT = DurationParamType(minimum=timedelta(seconds=0)) + +opt_retry_timeout = partial( + click.option, + "--retry-timeout", + "retry_timeout", + type=RETRY_TIMEOUT, + default=None, + help=( + "Override retry timeout for unreachable exporters (e.g., '5m', '30s', " + "'0' to disable). Env: JMP_RETRY_TIMEOUT. Default: 5m." + ), +) + opt_begin_time = click.option( "--begin-time", "begin_time", diff --git a/python/packages/jumpstarter-cli/jumpstarter_cli/shell.py b/python/packages/jumpstarter-cli/jumpstarter_cli/shell.py index f02fe5e9a..88004611c 100644 --- a/python/packages/jumpstarter-cli/jumpstarter_cli/shell.py +++ b/python/packages/jumpstarter-cli/jumpstarter_cli/shell.py @@ -1,6 +1,7 @@ import logging import os import sys +import time from contextlib import ExitStack from datetime import timedelta from types import SimpleNamespace @@ -23,12 +24,16 @@ ) from jumpstarter_cli_common.signal import signal_handler -from .common import opt_acquisition_timeout, opt_duration_partial, opt_exporter_name, opt_selector +from .common import opt_acquisition_timeout, opt_duration_partial, opt_exporter_name, opt_retry_timeout, opt_selector from .login import relogin_client from jumpstarter.client import DirectLease from jumpstarter.client.client import client_from_path from jumpstarter.common import HOOK_WARNING_PREFIX, ExporterStatus -from jumpstarter.common.exceptions import ConnectionError, ExporterOfflineError +from jumpstarter.common.exceptions import ( + ConnectionError, + ExporterOfflineError, + ExporterUnreachableError, +) from jumpstarter.common.utils import launch_shell from jumpstarter.config.client import ClientConfigV1Alpha1 from jumpstarter.config.env import JMP_LEASE @@ -41,7 +46,6 @@ _TOKEN_REFRESH_THRESHOLD_SECONDS = 120 - def _run_shell_only(lease, config, command, path: str) -> int: """Run just the shell command without log streaming.""" allow = config.drivers.allow if config is not None else getattr(lease, "allow", []) @@ -288,9 +292,14 @@ async def _run_shell_with_lease_async(lease, exporter_logs, config, command, can insecure=getattr(lease, "insecure", False), passphrase=getattr(lease, "passphrase", None), ) as client: - # Probe GetStatus before log stream so the server-side error - # from unsupported exporters is not streamed to the terminal. - await client.get_status_async() + try: + await client.get_status_async() + except grpc.aio.AioRpcError as e: + if e.code() in (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED): + raise ExporterUnreachableError( + f"Exporter {lease.exporter_name} did not respond to initial status check" + ) from e + raise # Start log streaming and status monitor together # The status monitor polls in the background for reliable status tracking @@ -424,7 +433,8 @@ async def _run_shell_with_lease_async(lease, exporter_logs, config, command, can async def _shell_with_signal_handling( # noqa: C901 - config, selector, exporter_name, lease_name, duration, exporter_logs, command, acquisition_timeout + config, selector, exporter_name, lease_name, duration, exporter_logs, command, acquisition_timeout, + retry_timeout=None, ): """Handle lease acquisition and shell execution with signal handling.""" exit_code = 0 @@ -447,23 +457,52 @@ async def _shell_with_signal_handling( # noqa: C901 try: try: async with anyio.from_thread.BlockingPortal() as portal: - async with config.lease_async( - selector, exporter_name, lease_name, duration, portal, acquisition_timeout - ) as lease: - lease_used = lease - - # Start token monitoring only once we're in the shell - tg.start_soon(_monitor_token_expiry, config, lease, tg.cancel_scope, token_state) - - exit_code = await _run_shell_with_lease_async( - lease, exporter_logs, config, command, tg.cancel_scope - ) - if lease.release and lease.name and token_state["expired_unrecovered"]: - _warn_about_expired_token(lease.name, selector) + connect_deadline = None + while True: + async with config.lease_async( + selector, exporter_name, lease_name, duration, portal, acquisition_timeout, + retry_timeout=retry_timeout, + ) as lease: + lease_used = lease + + # Start token monitoring only once we're in the shell + tg.start_soon(_monitor_token_expiry, config, lease, tg.cancel_scope, token_state) + + unreachable = None + try: + exit_code = await _run_shell_with_lease_async( + lease, exporter_logs, config, command, tg.cancel_scope + ) + except BaseExceptionGroup as eg: + unreachable = find_exception_in_group(eg, ExporterUnreachableError) + if unreachable is None: + raise + except ExporterUnreachableError as exc: + unreachable = exc + if unreachable is not None: + if connect_deadline is None: + connect_deadline = time.monotonic() + lease.retry_timeout + if time.monotonic() >= connect_deadline: + raise ExporterUnreachableError( + f"Exporter {lease.exporter_name} unreachable after " + f"{lease.retry_timeout:.0f}s of retrying" + ) from unreachable + logger.warning( + "Exporter %s is unreachable, releasing lease and retrying...", + lease.exporter_name, + ) + logger.debug("Unreachable cause: %s", unreachable) + continue # lease released by __aexit__, loop re-acquires + if lease.release and lease.name and token_state["expired_unrecovered"]: + _warn_about_expired_token(lease.name, selector) + break except BaseExceptionGroup as eg: for exc in eg.exceptions: if isinstance(exc, TimeoutError): raise exc from None + unreachable_exc = find_exception_in_group(eg, ExporterUnreachableError) + if unreachable_exc: + raise unreachable_exc from None offline_exc = find_exception_in_group(eg, ExporterOfflineError) if offline_exc: raise offline_exc from None @@ -580,9 +619,7 @@ async def _shell_direct_async( async with create_task_group() as tg: tg.start_soon(signal_handler, tg.cancel_scope) try: - exit_code = await _run_shell_with_lease_async( - lease, exporter_logs, config, command, tg.cancel_scope - ) + exit_code = await _run_shell_with_lease_async(lease, exporter_logs, config, command, tg.cancel_scope) except grpc.aio.AioRpcError as e: if e.code() == grpc.StatusCode.UNAUTHENTICATED: raise click.ClickException("Authentication failed: invalid or missing passphrase") from None @@ -607,6 +644,7 @@ async def _shell_direct_async( @opt_duration_partial(default=timedelta(minutes=30), show_default="00:30:00") @click.option("--exporter-logs", is_flag=True, help="Enable exporter log streaming") @opt_acquisition_timeout() +@opt_retry_timeout() # direct connection (no controller) @click.option( "--tls-grpc", @@ -637,6 +675,7 @@ def shell( duration, exporter_logs, acquisition_timeout, + retry_timeout, tls_grpc_address, tls_grpc_insecure, passphrase, @@ -685,6 +724,7 @@ def shell( exporter_logs, command, acquisition_timeout, + retry_timeout, ) sys.exit(exit_code) diff --git a/python/packages/jumpstarter-cli/jumpstarter_cli/shell_test.py b/python/packages/jumpstarter-cli/jumpstarter_cli/shell_test.py index 12bcd18ee..59779487f 100644 --- a/python/packages/jumpstarter-cli/jumpstarter_cli/shell_test.py +++ b/python/packages/jumpstarter-cli/jumpstarter_cli/shell_test.py @@ -27,7 +27,7 @@ from jumpstarter.client.grpc import Lease, LeaseList from jumpstarter.common import ExporterStatus -from jumpstarter.common.exceptions import ExporterOfflineError +from jumpstarter.common.exceptions import ExporterOfflineError, ExporterUnreachableError from jumpstarter.config.client import ClientConfigV1Alpha1 from jumpstarter.config.env import JMP_LEASE @@ -64,9 +64,14 @@ def __init__(self): self.token = None @asynccontextmanager - async def lease_async(self, selector, exporter_name, lease_name, duration, portal, acquisition_timeout): + async def lease_async( + self, selector, exporter_name, lease_name, duration, portal, + acquisition_timeout, retry_timeout=None, + ): self.captured = (selector, exporter_name, lease_name, duration, acquisition_timeout) - yield Mock() + m = Mock() + m.retry_timeout = 0.0 + yield m def test_shell_passes_exporter_name_to_lease_async(): @@ -99,11 +104,15 @@ async def test_shell_warns_when_expired_token_prevents_cleanup_on_normal_exit(): lease.name = "expired-lease" lease.lease_ended = False lease.lease_transferred = False + lease.retry_timeout = 0.0 config = _DummyConfig() @asynccontextmanager - async def lease_async(selector, exporter_name, lease_name, duration, portal, acquisition_timeout): + async def lease_async( + selector, exporter_name, lease_name, duration, portal, + acquisition_timeout, retry_timeout=None, + ): yield lease config.lease_async = lease_async @@ -150,6 +159,7 @@ def test_shell_requires_selector_or_name_when_no_leases(): duration=timedelta(minutes=1), exporter_logs=False, acquisition_timeout=None, + retry_timeout=None, tls_grpc_address=None, tls_grpc_insecure=False, passphrase=None, @@ -170,6 +180,7 @@ def test_shell_allows_existing_lease_name_without_selector_or_name(): duration=timedelta(minutes=1), exporter_logs=False, acquisition_timeout=None, + retry_timeout=None, tls_grpc_address=None, tls_grpc_insecure=False, passphrase=None, @@ -194,6 +205,7 @@ def test_shell_auto_connects_single_lease(): duration=timedelta(minutes=1), exporter_logs=False, acquisition_timeout=None, + retry_timeout=None, tls_grpc_address=None, tls_grpc_insecure=False, passphrase=None, @@ -221,6 +233,7 @@ def test_shell_no_leases_shows_guidance(): duration=timedelta(minutes=1), exporter_logs=False, acquisition_timeout=None, + retry_timeout=None, tls_grpc_address=None, tls_grpc_insecure=False, passphrase=None, @@ -261,6 +274,7 @@ def test_shell_multi_lease_no_tty_error(): duration=timedelta(minutes=1), exporter_logs=False, acquisition_timeout=None, + retry_timeout=None, tls_grpc_address=None, tls_grpc_insecure=False, passphrase=None, @@ -296,6 +310,7 @@ def test_shell_no_own_leases_among_others(): duration=timedelta(minutes=1), exporter_logs=False, acquisition_timeout=None, + retry_timeout=None, tls_grpc_address=None, tls_grpc_insecure=False, passphrase=None, @@ -317,6 +332,7 @@ def test_shell_allows_env_lease_without_selector_or_name(): duration=timedelta(minutes=1), exporter_logs=False, acquisition_timeout=None, + retry_timeout=None, tls_grpc_address=None, tls_grpc_insecure=False, passphrase=None, @@ -922,58 +938,208 @@ async def fake_client_from_path(*_a, **_kw): assert not monitor._connection_lost -class TestShellWithSignalHandlingLeaseTimeout: - async def test_exits_gracefully_when_lease_ended_and_exception_group(self): - """BaseExceptionGroup with lease_ended=True should produce exit code 0.""" +class TestShellWithSignalHandlingExceptionGroup: + """Tests for _shell_with_signal_handling's outer BaseExceptionGroup handler. + + This handler catches exceptions from the lease context — + i.e. unrecoverable task-group failures. We patch _run_shell_with_lease_async + directly to isolate this layer. + """ + + def _make_config_with_lease(self, lease): + config = _DummyConfig() + + @asynccontextmanager + async def lease_async( + selector, exporter_name, lease_name, duration, portal, + acquisition_timeout, retry_timeout=None, + ): + yield lease + + config.lease_async = lease_async + return config + + async def test_exits_gracefully_when_lease_ended(self): + """When the lease expired naturally, a task-group failure exits with code 0.""" lease = Mock() lease.release = True lease.name = "timeout-lease" lease.lease_ended = True lease.lease_transferred = False + config = self._make_config_with_lease(lease) + + async def fake_run(*_): + raise BaseExceptionGroup("test", [RuntimeError("simulated cancellation")]) + + with ( + patch("jumpstarter_cli.shell._monitor_token_expiry", new_callable=AsyncMock), + patch("jumpstarter_cli.shell._run_shell_with_lease_async", side_effect=fake_run), + ): + exit_code = await _shell_with_signal_handling( + config, None, None, None, timedelta(minutes=1), False, (), None + ) + + assert exit_code == 0 + + async def test_raises_offline_error_when_lease_active_and_task_group_fails(self): + """An active lease that hits a task-group failure surfaces ExporterOfflineError. + + _shell_with_signal_handling's outer create_task_group always wraps the raised + exception in a BaseExceptionGroup (anyio behaviour). The caller + (handle_exceptions_with_reauthentication) unwraps it for the user, so we check + that ExporterOfflineError is present inside the group. + """ + from jumpstarter_cli_common.exceptions import find_exception_in_group + + lease = Mock() + lease.release = True + lease.name = "active-lease" + lease.lease_ended = False + lease.lease_transferred = False + config = self._make_config_with_lease(lease) + + async def fake_run(*_): + raise BaseExceptionGroup("test", [RuntimeError("connection broken")]) + + with ( + patch("jumpstarter_cli.shell._monitor_token_expiry", new_callable=AsyncMock), + patch("jumpstarter_cli.shell._run_shell_with_lease_async", side_effect=fake_run), + ): + with pytest.raises(BaseExceptionGroup) as exc_info: + await _shell_with_signal_handling( + config, None, None, None, timedelta(minutes=1), False, (), None + ) + + assert isinstance(exc_info.value, BaseExceptionGroup) + offline_exc = find_exception_in_group(exc_info.value, ExporterOfflineError) + assert offline_exc is not None + assert "Connection to exporter lost" in str(offline_exc) + + +class TestRetryLoopTimeout: + """Tests for the retry_timeout bounded retry in _shell_with_signal_handling.""" + + async def test_retries_then_raises_on_timeout(self): + """ExporterUnreachableError retries until retry_timeout expires, then raises.""" + lease = Mock() + lease.release = True + lease.name = "test-lease" + lease.exporter_name = "test-exporter" + lease.retry_timeout = 0.3 + lease.lease_ended = False + lease.lease_transferred = False config = _DummyConfig() + state = {"call_count": 0} @asynccontextmanager - async def lease_async(selector, exporter_name, lease_name, duration, portal, acquisition_timeout): + async def lease_async( + selector, exporter_name, lease_name, duration, portal, + acquisition_timeout, retry_timeout=None, + ): yield lease config.lease_async = lease_async - async def fake_run_shell(*_args): - raise BaseExceptionGroup("test", [RuntimeError("simulated cancellation")]) + async def fake_run(*_): + state["call_count"] += 1 + raise ExporterUnreachableError("exporter dead") + + with ( + patch("jumpstarter_cli.shell._monitor_token_expiry", new_callable=AsyncMock), + patch("jumpstarter_cli.shell._run_shell_with_lease_async", side_effect=fake_run), + ): + with pytest.raises((ExporterUnreachableError, BaseExceptionGroup)) as exc_info: + await _shell_with_signal_handling( + config, None, None, None, timedelta(minutes=1), False, (), None + ) + + exc = exc_info.value + if isinstance(exc, BaseExceptionGroup): + from jumpstarter_cli_common.exceptions import find_exception_in_group + + exc = find_exception_in_group(exc, ExporterUnreachableError) + assert exc is not None + assert "after 0s of retrying" in str(exc) + assert state["call_count"] >= 1 + + async def test_retries_when_wrapped_in_exception_group(self): + """ExporterUnreachableError wrapped in BaseExceptionGroup is also retried.""" + lease = Mock() + lease.release = True + lease.name = "test-lease" + lease.exporter_name = "test-exporter" + lease.retry_timeout = 10.0 + lease.lease_ended = False + lease.lease_transferred = False + + config = _DummyConfig() + state = {"call_count": 0} + + @asynccontextmanager + async def lease_async( + selector, exporter_name, lease_name, duration, portal, + acquisition_timeout, retry_timeout=None, + ): + yield lease + + config.lease_async = lease_async + + async def fake_run(*_): + state["call_count"] += 1 + if state["call_count"] < 3: + raise BaseExceptionGroup( + "task group", [ExporterUnreachableError("exporter dead")] + ) + return 0 with ( patch("jumpstarter_cli.shell._monitor_token_expiry", new_callable=AsyncMock), - patch("jumpstarter_cli.shell._run_shell_with_lease_async", side_effect=fake_run_shell), + patch("jumpstarter_cli.shell._run_shell_with_lease_async", side_effect=fake_run), ): exit_code = await _shell_with_signal_handling( config, None, None, None, timedelta(minutes=1), False, (), None ) assert exit_code == 0 + assert state["call_count"] == 3 + + async def test_retry_succeeds_before_timeout(self): + """ExporterUnreachableError retries and succeeds when exporter comes back.""" + state = {"call_count": 0} - async def test_raises_offline_error_when_lease_not_ended_and_exception_group(self): - """BaseExceptionGroup with lease_ended=False should raise ExporterOfflineError.""" lease = Mock() lease.release = True - lease.name = "active-lease" + lease.name = "test-lease" + lease.exporter_name = "test-exporter" + lease.retry_timeout = 10.0 lease.lease_ended = False lease.lease_transferred = False config = _DummyConfig() @asynccontextmanager - async def lease_async(selector, exporter_name, lease_name, duration, portal, acquisition_timeout): + async def lease_async( + selector, exporter_name, lease_name, duration, portal, + acquisition_timeout, retry_timeout=None, + ): yield lease config.lease_async = lease_async - async def fake_run_shell(*_args): - raise BaseExceptionGroup("test", [RuntimeError("connection broken")]) + async def fake_run(*_): + state["call_count"] += 1 + if state["call_count"] < 3: + raise ExporterUnreachableError("exporter dead") + return 0 with ( patch("jumpstarter_cli.shell._monitor_token_expiry", new_callable=AsyncMock), - patch("jumpstarter_cli.shell._run_shell_with_lease_async", side_effect=fake_run_shell), + patch("jumpstarter_cli.shell._run_shell_with_lease_async", side_effect=fake_run), ): - with pytest.raises((ExporterOfflineError, BaseExceptionGroup)): - await _shell_with_signal_handling(config, None, None, None, timedelta(minutes=1), False, (), None) + exit_code = await _shell_with_signal_handling( + config, None, None, None, timedelta(minutes=1), False, (), None + ) + + assert exit_code == 0 + assert state["call_count"] == 3 diff --git a/python/packages/jumpstarter/jumpstarter/client/client.py b/python/packages/jumpstarter/jumpstarter/client/client.py index 980f2da39..2e197d78b 100644 --- a/python/packages/jumpstarter/jumpstarter/client/client.py +++ b/python/packages/jumpstarter/jumpstarter/client/client.py @@ -8,11 +8,12 @@ import grpc from anyio.from_thread import BlockingPortal from google.protobuf import empty_pb2 +from grpc.aio import AioRpcError from .grpc import MultipathExporterStub from jumpstarter.client import DriverClient from jumpstarter.client.base import StubDriverClient -from jumpstarter.common.exceptions import MissingDriverError +from jumpstarter.common.exceptions import ExporterUnreachableError, MissingDriverError from jumpstarter.common.grpc import _override_default_grpc_options, aio_secure_channel, ssl_channel_credentials from jumpstarter.common.importlib import import_class from jumpstarter.config.tls import TLSConfigV1Alpha1 @@ -59,6 +60,16 @@ async def client_from_path( interceptors = passphrase_client_interceptors(passphrase) + async def _connect(channel): + try: + return await client_from_channel(channel, portal, stack, allow, unsafe) + except AioRpcError as e: + if e.code() in (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED): + raise ExporterUnreachableError( + "Exporter did not respond to initial connection" + ) from e + raise + path = str(path) if _is_tcp_address(path): if insecure: @@ -67,19 +78,17 @@ async def client_from_path( options=_override_default_grpc_options(grpc_options), interceptors=interceptors, ) as channel: - yield await client_from_channel(channel, portal, stack, allow, unsafe) + yield await _connect(channel) else: tls = tls_config or TLSConfigV1Alpha1() credentials = await ssl_channel_credentials(path, tls) - async with aio_secure_channel( - path, credentials, grpc_options, interceptors=interceptors - ) as channel: - yield await client_from_channel(channel, portal, stack, allow, unsafe) + async with aio_secure_channel(path, credentials, grpc_options, interceptors=interceptors) as channel: + yield await _connect(channel) else: async with grpc.aio.secure_channel( f"unix://{path}", grpc.local_channel_credentials(grpc.LocalConnectionType.UDS) ) as channel: - yield await client_from_channel(channel, portal, stack, allow, unsafe) + yield await _connect(channel) async def client_from_channel( diff --git a/python/packages/jumpstarter/jumpstarter/client/client_test.py b/python/packages/jumpstarter/jumpstarter/client/client_test.py index b8de573bb..d6a4ccbb2 100644 --- a/python/packages/jumpstarter/jumpstarter/client/client_test.py +++ b/python/packages/jumpstarter/jumpstarter/client/client_test.py @@ -4,9 +4,12 @@ from unittest.mock import AsyncMock, MagicMock, patch from uuid import UUID, uuid4 +import grpc import pytest +from grpc.aio import AioRpcError -from jumpstarter.client.client import _is_tcp_address +from jumpstarter.client.client import _is_tcp_address, client_from_path +from jumpstarter.common.exceptions import ExporterUnreachableError pytestmark = pytest.mark.anyio @@ -282,3 +285,68 @@ async def test_client_from_channel_passes_description(self) -> None: assert client.description == "Test driver description" assert client.methods_description == {"method1": "Does something"} + + +class MockAioRpcError(AioRpcError): + def __init__(self, status_code, message=""): + self._status_code = status_code + self._message = message + self._code = status_code + self._details = message + self._debug_error_string = "" + + def code(self): + return self._status_code + + def details(self): + return self._message + + +class TestClientFromPathExporterUnreachable: + """Tests for client_from_path converting gRPC errors to ExporterUnreachableError.""" + + async def test_unavailable_raises_exporter_unreachable(self): + """GetReport failing with UNAVAILABLE is converted to ExporterUnreachableError.""" + mock_portal = MagicMock() + mock_stack = ExitStack() + + with patch( + "jumpstarter.client.client.client_from_channel", + side_effect=MockAioRpcError(grpc.StatusCode.UNAVAILABLE, "connection refused"), + ): + with pytest.raises(ExporterUnreachableError, match="did not respond"): + async with client_from_path( + "/tmp/test.sock", mock_portal, mock_stack, allow=[], unsafe=True + ): + pass + + async def test_deadline_exceeded_raises_exporter_unreachable(self): + """GetReport failing with DEADLINE_EXCEEDED is converted to ExporterUnreachableError.""" + mock_portal = MagicMock() + mock_stack = ExitStack() + + with patch( + "jumpstarter.client.client.client_from_channel", + side_effect=MockAioRpcError(grpc.StatusCode.DEADLINE_EXCEEDED, "timed out"), + ): + with pytest.raises(ExporterUnreachableError, match="did not respond"): + async with client_from_path( + "/tmp/test.sock", mock_portal, mock_stack, allow=[], unsafe=True + ): + pass + + async def test_other_grpc_errors_propagate_unchanged(self): + """Non-connection gRPC errors are not converted to ExporterUnreachableError.""" + mock_portal = MagicMock() + mock_stack = ExitStack() + + with patch( + "jumpstarter.client.client.client_from_channel", + side_effect=MockAioRpcError(grpc.StatusCode.INTERNAL, "internal error"), + ): + with pytest.raises(MockAioRpcError): + async with client_from_path( + "/tmp/test.sock", mock_portal, mock_stack, allow=[], unsafe=True + ): + pass + diff --git a/python/packages/jumpstarter/jumpstarter/client/lease.py b/python/packages/jumpstarter/jumpstarter/client/lease.py index fe16c84ad..7c899621e 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease.py @@ -94,6 +94,7 @@ class Lease(ContextManagerMixin, AsyncContextManagerMixin): client_name: str | None = None # Name of the current client, used for ownership validation acquisition_timeout: int = field(default=7200) # Timeout in seconds for lease acquisition, polled in 5s intervals dial_timeout: float = field(default=30.0) # Timeout in seconds for Dial retry loop when exporter not ready + retry_timeout: float = field(default=300.0) # Retry timeout for unreachable exporter (0 to disable) exporter_name: str = field(default="remote", init=False) # Populated during acquisition exporter_labels: dict[str, str] = field(default_factory=dict, init=False) # Populated during acquisition lease_ending_callback: Callable[[Self, timedelta], None] | None = field( @@ -348,7 +349,7 @@ async def handle_async(self, stream): attempt + 1, ) raise - delay = min(base_delay * (2**attempt), max_delay, remaining) + delay = min(base_delay * (2 ** min(attempt, 10)), max_delay, remaining) logger.debug( "Exporter not ready, retrying Dial in %.1fs (attempt %d, %.1fs remaining)", delay, @@ -367,7 +368,7 @@ async def handle_async(self, stream): attempt + 1, ) raise - delay = min(base_delay * (2**attempt), max_delay, remaining) + delay = min(base_delay * (2 ** min(attempt, 10)), max_delay, remaining) logger.warning( "Exporter unavailable, retrying Dial in %.1fs (attempt %d, %.1fs remaining)", delay, diff --git a/python/packages/jumpstarter/jumpstarter/client/lease_test.py b/python/packages/jumpstarter/jumpstarter/client/lease_test.py index dffd6a288..e031c17a3 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease_test.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease_test.py @@ -19,6 +19,9 @@ class MockAioRpcError(AioRpcError): def __init__(self, status_code, message=""): self._status_code = status_code self._message = message + self._code = status_code + self._details = message + self._debug_error_string = "" def code(self): return self._status_code diff --git a/python/packages/jumpstarter/jumpstarter/common/exceptions.py b/python/packages/jumpstarter/jumpstarter/common/exceptions.py index 830572811..b939cdbcc 100644 --- a/python/packages/jumpstarter/jumpstarter/common/exceptions.py +++ b/python/packages/jumpstarter/jumpstarter/common/exceptions.py @@ -51,6 +51,15 @@ class ExporterOfflineError(ConnectionError): pass +class ExporterUnreachableError(JumpstarterException): + """Raised when an exporter does not respond to the initial connection probe. + + Signals that the lease should be released and re-acquired. + """ + + pass + + class ConfigurationError(JumpstarterException): """Raised when a configuration error exists.""" diff --git a/python/packages/jumpstarter/jumpstarter/config/client.py b/python/packages/jumpstarter/jumpstarter/config/client.py index 96ec0909a..c25acf34c 100644 --- a/python/packages/jumpstarter/jumpstarter/config/client.py +++ b/python/packages/jumpstarter/jumpstarter/config/client.py @@ -24,7 +24,7 @@ from pydantic_settings import BaseSettings, NoDecode, SettingsConfigDict from .common import CONFIG_PATH, ObjectMeta -from .env import JMP_LEASE +from .env import JMP_LEASE, JMP_RETRY_TIMEOUT from .grpc import call_credentials from .shell import ShellConfigV1Alpha1 from .tls import TLSConfigV1Alpha1 @@ -105,6 +105,11 @@ class ClientConfigV1Alpha1Lease(BaseSettings): gt=0, exclude=True, # Internal field, not serialized to config files ) + retry_timeout: float = Field( + default=300.0, + description="Timeout in seconds for retrying when exporter is unreachable (0 to disable)", + ge=0, + ) class ClientConfigV1Alpha1(BaseSettings): @@ -314,6 +319,7 @@ async def lease_async( duration: timedelta, portal: BlockingPortal, acquisition_timeout: timedelta | None = None, + retry_timeout: timedelta | None = None, ): from jumpstarter.client import Lease @@ -328,6 +334,11 @@ async def lease_async( if acquisition_timeout is not None else self.leases.acquisition_timeout ) + retry_timeout_seconds = ( + retry_timeout.total_seconds() + if retry_timeout is not None + else float(os.environ.get(JMP_RETRY_TIMEOUT, self.leases.retry_timeout)) + ) async with Lease( channel=await self.channel(), namespace=self.metadata.namespace, @@ -344,6 +355,7 @@ async def lease_async( client_name=self.metadata.name, acquisition_timeout=acquisition_timeout_seconds, dial_timeout=self.leases.dial_timeout, + retry_timeout=retry_timeout_seconds, ) as lease: yield lease diff --git a/python/packages/jumpstarter/jumpstarter/config/client_config_test.py b/python/packages/jumpstarter/jumpstarter/config/client_config_test.py index 6f502179c..f97accbb9 100644 --- a/python/packages/jumpstarter/jumpstarter/config/client_config_test.py +++ b/python/packages/jumpstarter/jumpstarter/config/client_config_test.py @@ -332,6 +332,7 @@ def test_client_config_save_custom_lease_timeout(): use_profiles: false leases: acquisition_timeout: 3600 + retry_timeout: 300.0 """ config = ClientConfigV1Alpha1( alias="testclient", diff --git a/python/packages/jumpstarter/jumpstarter/config/env.py b/python/packages/jumpstarter/jumpstarter/config/env.py index 363f6feab..ab820cd99 100644 --- a/python/packages/jumpstarter/jumpstarter/config/env.py +++ b/python/packages/jumpstarter/jumpstarter/config/env.py @@ -16,3 +16,4 @@ JMP_GRPC_INSECURE = "JMP_GRPC_INSECURE" JUMPSTARTER_GRPC_INSECURE = "JUMPSTARTER_GRPC_INSECURE" JMP_GRPC_PASSPHRASE = "JMP_GRPC_PASSPHRASE" +JMP_RETRY_TIMEOUT = "JMP_RETRY_TIMEOUT" diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py index 30dc9a0e0..5564cea11 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py @@ -686,11 +686,6 @@ async def run_before_lease_hook( ExporterStatus.BEFORE_LEASE_HOOK_FAILED, f"beforeLease hook failed (on_failure=exit, shutting down): {e}", ) - await report_status( - ExporterStatus.OFFLINE, - "Exporter shutting down due to beforeLease hook failure", - ) - # Defer shutdown: sets _stop_requested=True, actual stop after lease cleanup shutdown(exit_code=1, wait_for_lease_exit=True, should_unregister=True) else: # on_failure='endLease' - report failure, release in finally block diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py index 5a5f6b4b0..631bdb7b4 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py @@ -1261,8 +1261,9 @@ async def test_before_hook_exit_reports_failed_not_available(self, lease_scope) """Issue E2: beforeLease fail+exit should report FAILED, not AVAILABLE. When beforeLease hook fails with on_failure=exit, the last status must be - BEFORE_LEASE_HOOK_FAILED. It should NOT report AVAILABLE, which would - incorrectly tell the controller the exporter is ready for new leases. + BEFORE_LEASE_HOOK_FAILED so the client can detect hook failure reliably. + It should NOT report AVAILABLE or OFFLINE, which would clobber the + failure status before the client can poll it. """ hook_config = HookConfigV1Alpha1( before_lease=HookInstanceConfigV1Alpha1(script="exit 1", timeout=10, on_failure="exit"), @@ -1282,16 +1283,9 @@ async def mock_report_status(status, msg): mock_shutdown, ) - # Last status should be OFFLINE (reported before shutdown to prevent new leases) last_status, _ = status_calls[-1] - assert last_status == ExporterStatus.OFFLINE, ( - f"Expected last status to be OFFLINE, got {last_status}" - ) - - # BEFORE_LEASE_HOOK_FAILED should also be present (reported before OFFLINE) - failed_statuses = [s for s, _ in status_calls if s == ExporterStatus.BEFORE_LEASE_HOOK_FAILED] - assert len(failed_statuses) > 0, ( - f"Expected BEFORE_LEASE_HOOK_FAILED status, got: {status_calls}" + assert last_status == ExporterStatus.BEFORE_LEASE_HOOK_FAILED, ( + f"Expected last status to be BEFORE_LEASE_HOOK_FAILED, got {last_status}" ) # AVAILABLE should never have been reported @@ -1379,11 +1373,12 @@ async def mock_report_status(status, msg): f"Expected LEASE_READY message to start with '{HOOK_WARNING_PREFIX}', got: '{msg}'" ) - async def test_before_hook_exit_reports_offline_before_shutdown(self, lease_scope) -> None: + async def test_before_hook_exit_reports_failed_before_shutdown(self, lease_scope) -> None: """When beforeLease hook fails with on_failure=exit, the exporter must - report OFFLINE status to the controller before initiating shutdown. - This prevents the controller from assigning new leases to a dying - exporter during the shutdown window. + report BEFORE_LEASE_HOOK_FAILED before initiating shutdown. OFFLINE is + NOT reported here — the shutdown path handles controller notification + during unregistration, avoiding a race where OFFLINE overwrites the + failure status before the client can poll it. """ hook_config = HookConfigV1Alpha1( before_lease=HookInstanceConfigV1Alpha1(script="exit 1", timeout=10, on_failure="exit"), @@ -1406,17 +1401,22 @@ def mock_shutdown(**kwargs): mock_shutdown, ) - offline_indices = [ - i for i, (s, _) in enumerate(status_calls) if s == ExporterStatus.OFFLINE + failed_indices = [ + i for i, (s, _) in enumerate(status_calls) if s == ExporterStatus.BEFORE_LEASE_HOOK_FAILED ] - assert len(offline_indices) > 0, ( - f"Expected OFFLINE status before shutdown, got: {status_calls}" + assert len(failed_indices) > 0, ( + f"Expected BEFORE_LEASE_HOOK_FAILED status before shutdown, got: {status_calls}" ) assert shutdown_called_at_index is not None, "shutdown was never called" - assert offline_indices[0] < shutdown_called_at_index, ( - f"OFFLINE (index {offline_indices[0]}) must be reported before " + assert failed_indices[0] < shutdown_called_at_index, ( + f"BEFORE_LEASE_HOOK_FAILED (index {failed_indices[0]}) must be reported before " f"shutdown (index {shutdown_called_at_index}). Statuses: {status_calls}" ) + offline_statuses = [s for s, _ in status_calls if s == ExporterStatus.OFFLINE] + assert len(offline_statuses) == 0, ( + f"OFFLINE should NOT be reported by hook handler (shutdown path handles it), " + f"got: {status_calls}" + ) async def test_after_hook_exit_reports_offline_before_shutdown(self, lease_scope) -> None: """When afterLease hook fails with on_failure=exit, OFFLINE must be