From 2957c3dd637190aa8339f101cb03fc099905647f Mon Sep 17 00:00:00 2001 From: Paul Wallrabe Date: Wed, 24 Jun 2026 11:36:28 +0200 Subject: [PATCH 01/16] fix(hooks): eliminate macOS PTY output race condition Replace start_new_session=True with process_group=0 to prevent macOS PTY revocation on subprocess exit. Restructure the reader loop to always attempt os.read() before checking the stop flag, preventing event-loop scheduling starvation from skipping all reads. These two changes address the root cause of the flaky macOS PTY tests (#560, #733, #821, #826) rather than the symptoms. The drain retry logic from #826 is no longer needed and is removed. Closes #821 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../jumpstarter/jumpstarter/exporter/hooks.py | 40 +----- .../jumpstarter/exporter/hooks_test.py | 127 ------------------ 2 files changed, 6 insertions(+), 161 deletions(-) diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py index 30dc9a0e0..4be6a9f5f 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py @@ -23,8 +23,6 @@ MAX_DRAIN_BYTES = 256 * 1024 DRAIN_TIMEOUT_SECONDS = 2.0 -DRAIN_MAX_EMPTY_POLLS = 10 - # Module-level reference to time.monotonic so tests can patch it without # affecting the asyncio event loop (which also uses time.monotonic). _monotonic = time.monotonic @@ -305,7 +303,7 @@ async def _execute_hook_process( # noqa: C901 stdout=child_fd, stderr=child_fd, env=hook_env, - start_new_session=True, # Equivalent to os.setsid() + process_group=0, close_fds=True, # Close inherited fds to prevent interference with gRPC connections ) except Exception as e: @@ -335,20 +333,12 @@ async def read_pty_output() -> None: # noqa: C901 start_time = _monotonic() try: - while not pty_state.reader_stop: + while True: try: - # Wait for fd to be readable with timeout with anyio.move_on_after(0.1): await anyio.wait_readable(parent_fd) - # Check stop flag immediately after timeout - # (main task may have signaled us to stop) - if pty_state.reader_stop: - logger.debug("read_pty_output: stop flag set, exiting") - break - read_count += 1 - # Log heartbeat every 2 seconds elapsed = _monotonic() - start_time if elapsed - last_heartbeat >= 2.0: logger.debug( @@ -356,27 +346,24 @@ async def read_pty_output() -> None: # noqa: C901 ) last_heartbeat = elapsed - # Read available data (non-blocking) try: chunk = os.read(parent_fd, 4096) if not chunk: - # EOF logger.debug("read_pty_output: EOF received") break buffer += chunk except BlockingIOError: - # No data available right now, continue loop + if pty_state.reader_stop: + logger.debug("read_pty_output: stop flag set and no data, exiting") + break continue except OSError as e: - # PTY closed or error logger.debug("read_pty_output: OSError on read: %s", e) break - # Process complete lines buffer = _flush_lines(buffer, output_lines) except OSError as e: - # PTY closed or read error logger.debug("read_pty_output: OSError in loop: %s", e) break finally: @@ -392,12 +379,7 @@ async def read_pty_output() -> None: # noqa: C901 try: drain_deadline = _monotonic() + DRAIN_TIMEOUT_SECONDS drained = 0 - consecutive_empty = 0 while drained < MAX_DRAIN_BYTES and _monotonic() < drain_deadline: - # Poll for readability with a short timeout. - # This avoids the race where a non-blocking read - # raises BlockingIOError because the macOS PTY - # kernel buffer hasn't delivered the data yet. remaining = drain_deadline - _monotonic() if remaining <= 0: break @@ -405,19 +387,9 @@ async def read_pty_output() -> None: # noqa: C901 try: readable, _, _ = select.select([parent_fd], [], [], timeout_s) except (ValueError, OSError): - # fd closed or invalid break if not readable: - # On macOS, data may not be available on the - # first select() call even though the subprocess - # has already written and exited. Keep retrying - # until we see several consecutive empty polls, - # which indicates the buffer is truly drained. - consecutive_empty += 1 - if consecutive_empty >= DRAIN_MAX_EMPTY_POLLS: - break - continue - consecutive_empty = 0 + break try: chunk = os.read(parent_fd, 4096) if not chunk: diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py index 5a5f6b4b0..d1bd49e4b 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py @@ -8,7 +8,6 @@ from jumpstarter.common import HOOK_WARNING_PREFIX, ExporterStatus from jumpstarter.config.exporter import HookConfigV1Alpha1, HookInstanceConfigV1Alpha1 from jumpstarter.exporter.hooks import ( - DRAIN_MAX_EMPTY_POLLS, DRAIN_TIMEOUT_SECONDS, MAX_DRAIN_BYTES, HookExecutionError, @@ -954,135 +953,9 @@ def flush_lines_with_drain_error(buffer, output_lines): result = await executor.execute_before_lease_hook(lease_scope) assert result is None - @macos_pty_xfail - async def test_drain_retries_empty_select_then_captures_data(self, lease_scope) -> None: - """Verify that the drain retries after empty select() calls and still - captures data that arrives later. - - Patches select.select to return empty for the first N calls (where - N < DRAIN_MAX_EMPTY_POLLS), then reports the fd as readable. The - hook output should still be captured despite the initial empty polls. - """ - import select as select_mod - - original_select = select_mod.select - state = _PtyTracker() - empty_count = 0 - empties_before_data = DRAIN_MAX_EMPTY_POLLS - 2 # e.g. 8 empties then data - - def select_with_delayed_ready(rlist, wlist, xlist, timeout=None): - nonlocal empty_count - if state.eof_seen and rlist and rlist[0] == state.parent_fd: - empty_count += 1 - if empty_count <= empties_before_data: - return ([], [], []) # simulate delayed data - return original_select(rlist, wlist, xlist, timeout) - - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo DELAYED_DRAIN_OK", timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - with ( - patch("pty.openpty", side_effect=state.tracking_openpty), - patch("os.read", side_effect=state.os_read_with_drain_data), - patch("jumpstarter.exporter.hooks.select.select", side_effect=select_with_delayed_ready), - patch("jumpstarter.exporter.hooks.logger") as mock_logger, - ): - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - info_calls = [str(call) for call in mock_logger.info.call_args_list] - assert any("DELAYED_DRAIN_OK" in call for call in info_calls) - - @macos_pty_xfail - async def test_drain_terminates_after_max_empty_polls(self, lease_scope) -> None: - """Verify the drain loop terminates after DRAIN_MAX_EMPTY_POLLS - consecutive empty select() results. - - Patches select.select to always return empty during the drain phase. - The hook should still complete (no hang) and the drain data should - not appear since it's never read. - """ - import select as select_mod - - original_select = select_mod.select - state = _PtyTracker(return_drain_data=False) - - def select_always_empty(rlist, wlist, xlist, timeout=None): - if state.eof_seen and rlist and rlist[0] == state.parent_fd: - return ([], [], []) # always empty - return original_select(rlist, wlist, xlist, timeout) - - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo MAX_EMPTY_TEST", timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - with ( - patch("pty.openpty", side_effect=state.tracking_openpty), - patch("os.read", side_effect=state.os_read_with_drain_data), - patch("jumpstarter.exporter.hooks.select.select", side_effect=select_always_empty), - patch("jumpstarter.exporter.hooks.logger") as mock_logger, - ): - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - # Main loop should have captured the output before drain - info_calls = [str(call) for call in mock_logger.info.call_args_list] - assert any("MAX_EMPTY_TEST" in call for call in info_calls) - - @macos_pty_xfail - async def test_drain_empty_counter_resets_on_data(self, lease_scope) -> None: - """Verify the consecutive empty poll counter resets when data arrives. - - Simulates an empty-data-empty pattern during drain: a few empty polls, - then data becomes readable, then more empty polls. The counter should - reset after data is read, so the drain should tolerate more than - DRAIN_MAX_EMPTY_POLLS total empties as long as they are not consecutive. - """ - import select as select_mod - - original_select = select_mod.select - state = _PtyTracker() - drain_select_call = 0 - # Pattern: 5 empties, then ready, then 5 more empties, then ready - # Total empties (10) >= DRAIN_MAX_EMPTY_POLLS but never consecutive - pattern = [False] * 5 + [True] + [False] * 5 + [True] - - def select_with_interleaved_empties(rlist, wlist, xlist, timeout=None): - nonlocal drain_select_call - if state.eof_seen and rlist and rlist[0] == state.parent_fd: - idx = drain_select_call - drain_select_call += 1 - if idx < len(pattern) and not pattern[idx]: - return ([], [], []) - return original_select(rlist, wlist, xlist, timeout) - - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo INTERLEAVE_TEST", timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - with ( - patch("pty.openpty", side_effect=state.tracking_openpty), - patch("os.read", side_effect=state.os_read_with_drain_data), - patch("jumpstarter.exporter.hooks.select.select", side_effect=select_with_interleaved_empties), - patch("jumpstarter.exporter.hooks.logger") as mock_logger, - ): - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - info_calls = [str(call) for call in mock_logger.info.call_args_list] - assert any("INTERLEAVE_TEST" in call for call in info_calls) - async def test_drain_constants_are_reasonable(self) -> None: assert MAX_DRAIN_BYTES == 256 * 1024 assert DRAIN_TIMEOUT_SECONDS == 2.0 - assert DRAIN_MAX_EMPTY_POLLS == 10 async def test_exec_default_is_none(self) -> None: """Test that the default exec is None (auto-detect).""" From 2225dbcc8488ef0052cfc364d2b30bee104a5ffb Mon Sep 17 00:00:00 2001 From: Paul Wallrabe Date: Wed, 24 Jun 2026 11:42:23 +0200 Subject: [PATCH 02/16] ci: temporarily enable macOS tests on PRs to validate PTY fix Run the full test matrix (all Python versions, Linux + macOS) on PRs to confirm the PTY race condition fix passes consistently on macOS. This will be reverted once 20 consecutive macOS passes are confirmed. Co-Authored-By: Claude Opus 4.6 (1M context) --- .github/workflows/python-tests.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python-tests.yaml b/.github/workflows/python-tests.yaml index 3fd327296..0ec7953f4 100644 --- a/.github/workflows/python-tests.yaml +++ b/.github/workflows/python-tests.yaml @@ -41,8 +41,8 @@ jobs: # Merge queue and workflow_dispatch run the full matrix # (all Python versions on both Linux and macOS). if [[ "${{ github.event_name }}" == "pull_request" ]]; then - echo 'python-versions=["3.12"]' >> "$GITHUB_OUTPUT" - echo 'runners=["ubuntu-24.04"]' >> "$GITHUB_OUTPUT" + echo 'python-versions=["3.11", "3.12", "3.13"]' >> "$GITHUB_OUTPUT" + echo 'runners=["ubuntu-24.04", "macos-15"]' >> "$GITHUB_OUTPUT" else echo 'python-versions=["3.11", "3.12", "3.13"]' >> "$GITHUB_OUTPUT" echo 'runners=["ubuntu-24.04", "macos-15"]' >> "$GITHUB_OUTPUT" From 35920d96c725eae149dd4f1433aaab52c92fe0b3 Mon Sep 17 00:00:00 2001 From: Paul Wallrabe Date: Wed, 24 Jun 2026 11:50:44 +0200 Subject: [PATCH 03/16] test(hooks): cover reader_stop + BlockingIOError exit path Add test for the grandchild scenario where the PTY slave is held open after the direct child exits. The reader gets BlockingIOError (no data) and exits via the stop flag. This covers lines 357-358 in hooks.py to satisfy the diff-coverage threshold. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../jumpstarter/exporter/hooks_test.py | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py index d1bd49e4b..7b5a9cc82 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py @@ -953,6 +953,49 @@ def flush_lines_with_drain_error(buffer, output_lines): result = await executor.execute_before_lease_hook(lease_scope) assert result is None + async def test_reader_exits_on_stop_flag_when_no_data(self, lease_scope) -> None: + """Verify the reader exits via the stop flag when os.read raises + BlockingIOError (no data available) and reader_stop is True. + + This covers the grandchild scenario where the PTY slave is held + open after the direct child exits: the main loop gets + BlockingIOError because no new data is being written, and the + stop flag causes it to exit cleanly. + """ + state = _PtyTracker(return_drain_data=False) + + def os_read_blocking_after_eof(fd, size): + if fd != state.parent_fd: + return state._original_os_read(fd, size) + if not state.eof_seen: + try: + data = state._original_os_read(fd, size) + except (BlockingIOError, OSError): + state.eof_seen = True + raise + if not data: + state.eof_seen = True + raise BlockingIOError("simulated grandchild holding PTY open") + return data + raise BlockingIOError("simulated grandchild holding PTY open") + + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1( + script="echo STOP_FLAG_TEST", timeout=10, + ), + ) + executor = HookExecutor(config=hook_config) + + with ( + patch("pty.openpty", side_effect=state.tracking_openpty), + patch("os.read", side_effect=os_read_blocking_after_eof), + patch("jumpstarter.exporter.hooks.logger") as mock_logger, + ): + result = await executor.execute_before_lease_hook(lease_scope) + assert result is None + info_calls = [str(call) for call in mock_logger.info.call_args_list] + assert any("STOP_FLAG_TEST" in call for call in info_calls) + async def test_drain_constants_are_reasonable(self) -> None: assert MAX_DRAIN_BYTES == 256 * 1024 assert DRAIN_TIMEOUT_SECONDS == 2.0 From f406817710e5e6c90b9ded75dd8faba38e41cf28 Mon Sep 17 00:00:00 2001 From: Paul Wallrabe Date: Wed, 24 Jun 2026 12:01:33 +0200 Subject: [PATCH 04/16] test(hooks): use real grandchild process to cover reader_stop path The previous test mocked os.read to simulate BlockingIOError, but on Linux the reader gets EOF before BlockingIOError. Use a real backgrounded subprocess (sleep &) to hold the PTY slave open, which forces the reader to exit via the reader_stop + BlockingIOError path. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../jumpstarter/exporter/hooks_test.py | 41 +++++-------------- 1 file changed, 10 insertions(+), 31 deletions(-) diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py index 7b5a9cc82..b0297cbc5 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py @@ -953,44 +953,23 @@ def flush_lines_with_drain_error(buffer, output_lines): result = await executor.execute_before_lease_hook(lease_scope) assert result is None - async def test_reader_exits_on_stop_flag_when_no_data(self, lease_scope) -> None: - """Verify the reader exits via the stop flag when os.read raises - BlockingIOError (no data available) and reader_stop is True. - - This covers the grandchild scenario where the PTY slave is held - open after the direct child exits: the main loop gets - BlockingIOError because no new data is being written, and the - stop flag causes it to exit cleanly. - """ - state = _PtyTracker(return_drain_data=False) - - def os_read_blocking_after_eof(fd, size): - if fd != state.parent_fd: - return state._original_os_read(fd, size) - if not state.eof_seen: - try: - data = state._original_os_read(fd, size) - except (BlockingIOError, OSError): - state.eof_seen = True - raise - if not data: - state.eof_seen = True - raise BlockingIOError("simulated grandchild holding PTY open") - return data - raise BlockingIOError("simulated grandchild holding PTY open") + async def test_reader_exits_on_stop_flag_when_grandchild_holds_pty(self, lease_scope) -> None: + """Verify the reader exits via the stop flag when a grandchild + process holds the PTY slave open after the direct child exits. + The backgrounded sleep inherits the PTY slave fd, preventing EOF + on the master. The reader gets BlockingIOError (no data from the + silent grandchild) and exits once reader_stop is set. + """ hook_config = HookConfigV1Alpha1( before_lease=HookInstanceConfigV1Alpha1( - script="echo STOP_FLAG_TEST", timeout=10, + script="echo STOP_FLAG_TEST; sleep 300 &", + timeout=10, ), ) executor = HookExecutor(config=hook_config) - with ( - patch("pty.openpty", side_effect=state.tracking_openpty), - patch("os.read", side_effect=os_read_blocking_after_eof), - patch("jumpstarter.exporter.hooks.logger") as mock_logger, - ): + with patch("jumpstarter.exporter.hooks.logger") as mock_logger: result = await executor.execute_before_lease_hook(lease_scope) assert result is None info_calls = [str(call) for call in mock_logger.info.call_args_list] From 2fa58a082123070c6ae474353793e2a463001198 Mon Sep 17 00:00:00 2001 From: Paul Wallrabe Date: Wed, 24 Jun 2026 12:47:32 +0200 Subject: [PATCH 05/16] ci: retrigger #2 From 775da070d7e59dae7938299941e79985e3c4815a Mon Sep 17 00:00:00 2001 From: Paul Wallrabe Date: Wed, 24 Jun 2026 13:01:47 +0200 Subject: [PATCH 06/16] fix(test): don't set eof_seen on transient BlockingIOError in PtyTracker BlockingIOError means "no data yet" (transient), not EOF. Setting eof_seen on BlockingIOError caused the mock to inject drain data during the main reader loop on macOS, where BlockingIOError can occur before the real EOF arrives. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../packages/jumpstarter/jumpstarter/exporter/hooks_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py index b0297cbc5..3f743adfe 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py @@ -58,7 +58,9 @@ def os_read_with_drain_data(self, fd, size): if not self.eof_seen: try: data = self._original_os_read(fd, size) - except (BlockingIOError, OSError): + except BlockingIOError: + raise + except OSError: self.eof_seen = True raise if not data: From ef8d2155c0a812ed51e4f67e59b3b0f0185d37b8 Mon Sep 17 00:00:00 2001 From: Paul Wallrabe Date: Wed, 24 Jun 2026 13:11:07 +0200 Subject: [PATCH 07/16] ci: run macOS pytest 20x to validate PTY fix Temporarily run only macOS runners with 20 repetitions via matrix index to confirm the PTY race condition fix is stable. Will be reverted after validation. Co-Authored-By: Claude Opus 4.6 (1M context) --- .github/workflows/python-tests.yaml | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/.github/workflows/python-tests.yaml b/.github/workflows/python-tests.yaml index 0ec7953f4..bc05eada1 100644 --- a/.github/workflows/python-tests.yaml +++ b/.github/workflows/python-tests.yaml @@ -42,7 +42,7 @@ jobs: # (all Python versions on both Linux and macOS). if [[ "${{ github.event_name }}" == "pull_request" ]]; then echo 'python-versions=["3.11", "3.12", "3.13"]' >> "$GITHUB_OUTPUT" - echo 'runners=["ubuntu-24.04", "macos-15"]' >> "$GITHUB_OUTPUT" + echo 'runners=["macos-15"]' >> "$GITHUB_OUTPUT" else echo 'python-versions=["3.11", "3.12", "3.13"]' >> "$GITHUB_OUTPUT" echo 'runners=["ubuntu-24.04", "macos-15"]' >> "$GITHUB_OUTPUT" @@ -53,14 +53,11 @@ jobs: if: needs.changes.outputs.should_run == 'true' || github.event_name == 'workflow_dispatch' runs-on: ${{ matrix.runs-on }} strategy: + fail-fast: false matrix: runs-on: ${{ fromJson(needs.changes.outputs.runners) }} - # Floor: oldest Python in supported platforms (RHEL 9 appstream) - # Ceiling: newest Python in latest Fedora - # Review on each RHEL/Fedora release - # PRs run only 3.12 on Linux; merge queue runs all versions - # on both Linux and macOS. python-version: ${{ fromJson(needs.changes.outputs.python-versions) }} + run-index: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] steps: - uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7 with: From 6a5e3a94936750cdc3fdcf45d98e07f423f3f619 Mon Sep 17 00:00:00 2001 From: Paul Wallrabe Date: Wed, 24 Jun 2026 13:38:21 +0200 Subject: [PATCH 08/16] fix(hooks): drain polls for full timeout instead of breaking on first empty On macOS, PTY internal buffer delivery can lag behind slave closure for very fast commands. The drain must keep polling (bounded by DRAIN_TIMEOUT_SECONDS) rather than breaking on the first empty select(), giving the kernel time to deliver remaining data. Co-Authored-By: Claude Opus 4.6 (1M context) --- python/packages/jumpstarter/jumpstarter/exporter/hooks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py index 4be6a9f5f..2325b4255 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py @@ -389,7 +389,7 @@ async def read_pty_output() -> None: # noqa: C901 except (ValueError, OSError): break if not readable: - break + continue try: chunk = os.read(parent_fd, 4096) if not chunk: From 49304056e1b81faf94575fe1868c57ae06870944 Mon Sep 17 00:00:00 2001 From: Paul Wallrabe Date: Wed, 24 Jun 2026 14:00:45 +0200 Subject: [PATCH 09/16] fix(hooks): extend reader grace period to DRAIN_TIMEOUT_SECONDS Replace the 0.2s grace period with DRAIN_TIMEOUT_SECONDS (2s) after subprocess exit. This gives the macOS PTY kernel buffer sufficient time to deliver data to the master fd before the reader_stop flag forces the reader to exit. The 0.2s window was the fundamental cause of the remaining failures: on loaded macOS CI runners, the PTY buffer delivery can lag behind slave closure by more than 200ms. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../jumpstarter/jumpstarter/exporter/hooks.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py index 2325b4255..5a297e3f5 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py @@ -450,22 +450,19 @@ async def wait_for_process() -> int: await anyio.sleep(0) with anyio.move_on_after(timeout) as cancel_scope: - # Run output reading and process waiting concurrently async with anyio.create_task_group() as tg: logger.debug("Task group created, starting tasks...") tg.start_soon(read_pty_output) logger.debug("Waiting for subprocess to complete...") returncode = await wait_for_process() logger.debug("Subprocess completed with code: %s", returncode) - # Give a brief moment for any final output to be read - await anyio.sleep(0.2) - # Signal the read task to stop via the dedicated stop flag. - # The read task checks this flag after each 0.1s timeout - # and also receives EOF when the subprocess exits. - # Note: pty_state.parent_fd_open stays True so the finally block - # properly closes parent_fd. + # After the subprocess exits, the PTY slave has no + # more writers. The reader will get EOF and exit. + # Set reader_stop after a grace period to handle + # grandchild processes that hold the PTY slave open. + await anyio.sleep(DRAIN_TIMEOUT_SECONDS) pty_state.reader_stop = True - logger.debug("Stop flag set, waiting for read task to exit") + logger.debug("Reader grace period expired, stop flag set") # Don't cancel - let the task exit naturally via EOF or flag check # Cancellation can cause unexpected side effects on gRPC connections From 853a47a6c1080f81109107a72a3fe14206240a93 Mon Sep 17 00:00:00 2001 From: Paul Wallrabe Date: Thu, 25 Jun 2026 16:13:50 +0200 Subject: [PATCH 10/16] ci: bump e2e timeout from 30m to 35m The PTY fix adds a 2s grace period after each hook subprocess exits, giving the macOS PTY kernel buffer time to deliver data. With ~24 hook e2e tests running both beforeLease and afterLease hooks, this adds ~2-3 minutes total. Bump the timeout to accommodate. Co-Authored-By: Claude Opus 4.6 (1M context) --- .github/workflows/e2e.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml index 7242f38cd..f200c4803 100644 --- a/.github/workflows/e2e.yaml +++ b/.github/workflows/e2e.yaml @@ -57,7 +57,7 @@ jobs: matrix: include: ${{ fromJson(needs.changes.outputs.e2e-matrix) }} runs-on: ${{ matrix.os }} - timeout-minutes: 30 + timeout-minutes: 35 steps: - name: Checkout repository uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7 @@ -96,7 +96,7 @@ jobs: matrix: include: ${{ fromJson(needs.changes.outputs.e2e-matrix) }} runs-on: ${{ matrix.os }} - timeout-minutes: 30 + timeout-minutes: 35 steps: - name: Checkout repository uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7 From 312bab21c2124cf3676b67336bd1c5c95104642a Mon Sep 17 00:00:00 2001 From: Paul Wallrabe Date: Thu, 25 Jun 2026 16:18:43 +0200 Subject: [PATCH 11/16] test(hooks): remove macos_pty_xfail markers Remove the xfail markers added by upstream for the macOS PTY race condition. The fix in this PR (process_group=0 + read-before-stop + drain with continue + extended grace period) addresses the root cause, making these markers unnecessary. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../jumpstarter/exporter/hooks_test.py | 43 +++++++------------ 1 file changed, 16 insertions(+), 27 deletions(-) diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py index 3f743adfe..15ee8d01d 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py @@ -1,5 +1,4 @@ import os -import sys from contextlib import nullcontext from unittest.mock import AsyncMock, MagicMock, patch @@ -18,16 +17,6 @@ pytestmark = pytest.mark.anyio -# Tests that spawn real subprocesses via PTY and assert on captured logger -# output are flaky on macOS due to a PTY kernel buffer timing race condition. -# See https://github.com/jumpstarter-dev/jumpstarter/issues/821 -# Targeted for proper fix in 0.10.0. -macos_pty_xfail = pytest.mark.xfail( - condition=sys.platform == "darwin", - reason="PTY output race condition on macOS (#821)", - strict=False, -) - class _PtyTracker: """Tracks PTY fd and EOF state for drain tests that need to intercept @@ -208,7 +197,7 @@ async def test_hook_timeout(self, lease_scope) -> None: assert "timed out after 1 seconds" in str(exc_info.value) assert exc_info.value.on_failure == "exit" - @macos_pty_xfail + async def test_hook_environment_variables(self, lease_scope) -> None: hook_config = HookConfigV1Alpha1( before_lease=HookInstanceConfigV1Alpha1( @@ -223,7 +212,7 @@ async def test_hook_environment_variables(self, lease_scope) -> None: assert any("LEASE_NAME=test-lease-123" in call for call in info_calls) assert any("CLIENT_NAME=test-client" in call for call in info_calls) - @macos_pty_xfail + async def test_real_time_output_logging(self, lease_scope) -> None: """Test that hook output is logged in real-time at INFO level.""" hook_config = HookConfigV1Alpha1( @@ -241,7 +230,7 @@ async def test_real_time_output_logging(self, lease_scope) -> None: assert any("Line 2" in call for call in info_calls) assert any("Line 3" in call for call in info_calls) - @macos_pty_xfail + async def test_post_lease_hook_execution_on_completion(self, lease_scope) -> None: """Test that post-lease hook executes when called directly.""" hook_config = HookConfigV1Alpha1( @@ -352,7 +341,7 @@ async def test_successful_hook_returns_none(self, lease_scope) -> None: result = await executor.execute_before_lease_hook(lease_scope) assert result is None - @macos_pty_xfail + async def test_exec_bash(self, lease_scope) -> None: """Test that exec=/bin/bash allows bash-specific syntax. @@ -374,7 +363,7 @@ async def test_exec_bash(self, lease_scope) -> None: info_calls = [str(call) for call in mock_logger.info.call_args_list] assert any("BASH_OK: world" in call for call in info_calls) - @macos_pty_xfail + async def test_exec_python3(self, lease_scope) -> None: """Test that exec=python3 runs inline Python. @@ -397,7 +386,7 @@ async def test_exec_python3(self, lease_scope) -> None: # Expected total: 0 + 1 + 4 + 9 == 14 assert any("PYTHON_OK: 14" in call for call in info_calls) - @macos_pty_xfail + async def test_script_file_sh(self, lease_scope, tmp_path) -> None: """Test that a .sh file auto-detects /bin/sh as interpreter.""" script_file = tmp_path / "hook_script.sh" @@ -420,7 +409,7 @@ async def test_script_file_sh(self, lease_scope, tmp_path) -> None: debug_calls = [str(call) for call in mock_logger.debug.call_args_list] assert any("Executing script file" in call for call in debug_calls) - @macos_pty_xfail + async def test_script_file_py_autodetects_python(self, lease_scope, tmp_path) -> None: """Test that a .py file auto-detects the exporter's Python as interpreter.""" import sys @@ -447,7 +436,7 @@ async def test_script_file_py_autodetects_python(self, lease_scope, tmp_path) -> # Verify it used the exporter's own Python interpreter assert any(sys.executable in call for call in debug_calls) - @macos_pty_xfail + async def test_script_file_py_exec_override(self, lease_scope, tmp_path) -> None: """Test that explicit exec overrides .py auto-detection.""" script_file = tmp_path / "hook_script.py" @@ -471,7 +460,7 @@ async def test_script_file_py_exec_override(self, lease_scope, tmp_path) -> None debug_calls = [str(call) for call in mock_logger.debug.call_args_list] assert not any("Auto-detected" in call for call in debug_calls) - @macos_pty_xfail + async def test_noninteractive_environment(self, lease_scope) -> None: """Test that hooks receive noninteractive environment variables. @@ -731,7 +720,7 @@ async def test_drain_handles_oserror_gracefully(self) -> None: assert output_lines == [] assert drained == 0 - @macos_pty_xfail + async def test_drain_captures_output_without_trailing_newline(self, lease_scope) -> None: """Verify output without a trailing newline is still captured.""" hook_config = HookConfigV1Alpha1( @@ -748,7 +737,7 @@ async def test_drain_captures_output_without_trailing_newline(self, lease_scope) info_calls = [str(call) for call in mock_logger.info.call_args_list] assert any("NO_NEWLINE_OUTPUT" in call for call in info_calls) - @macos_pty_xfail + async def test_drain_reads_data_remaining_in_pty_buffer(self, lease_scope) -> None: """Verify the drain loop inside read_pty_output reads data left in the PTY kernel buffer after the main read loop exits. @@ -813,7 +802,7 @@ def os_read_with_drain_data(fd, size): info_calls = [str(call) for call in mock_logger.info.call_args_list] assert any("DRAIN_CAPTURED" in call for call in info_calls) - @macos_pty_xfail + async def test_drain_select_oserror_exits_gracefully(self, lease_scope) -> None: """Verify the drain loop exits gracefully when select.select() raises OSError (e.g. fd closed during drain). @@ -850,7 +839,7 @@ def select_with_oserror(rlist, wlist, xlist, timeout=None): info_calls = [str(call) for call in mock_logger.info.call_args_list] assert any("SELECT_ERROR_TEST" in call for call in info_calls) - @macos_pty_xfail + async def test_drain_select_valueerror_exits_gracefully(self, lease_scope) -> None: """Verify the drain loop exits gracefully when select.select() raises ValueError (e.g. negative fd). @@ -885,7 +874,7 @@ def select_with_valueerror(rlist, wlist, xlist, timeout=None): info_calls = [str(call) for call in mock_logger.info.call_args_list] assert any("VALUEERROR_TEST" in call for call in info_calls) - @macos_pty_xfail + async def test_drain_exits_when_deadline_exceeded_before_select(self, lease_scope) -> None: """Verify the drain loop exits when the deadline is exceeded between the while condition and the remaining-time check (line: if remaining <= 0). @@ -920,7 +909,7 @@ async def test_drain_exits_when_deadline_exceeded_before_select(self, lease_scop # exited early due to remaining <= 0 before select could run assert not any("SHOULD_NOT_APPEAR" in call for call in info_calls) - @macos_pty_xfail + async def test_drain_exception_is_suppressed(self, lease_scope) -> None: """Verify that an unexpected exception raised during the drain is caught by the except-Exception handler and does not propagate to the caller. @@ -990,7 +979,7 @@ async def test_exec_default_is_none(self) -> None: class TestHookExecutorPRRegressions: """Regression tests for issues reported during PR review of hooks feature.""" - @macos_pty_xfail + async def test_infrastructure_messages_at_debug_not_info(self, lease_scope) -> None: """Issue A1: Hook infrastructure messages should be at DEBUG, not INFO. From 9b34914091655f9c998da84a16dfb132eb603362 Mon Sep 17 00:00:00 2001 From: Paul Wallrabe Date: Fri, 26 Jun 2026 11:10:20 +0200 Subject: [PATCH 12/16] fix(hooks): replace PTY with subprocess.PIPE for output capture The XNU kernel discards buffered PTY data when the slave fd closes without S_CTTYREF set (documented in Ruby #20682, pexpect #662, Apple Developer Forums #663632). This is POSIX-compliant behavior that no userspace I/O strategy can work around. Replace the PTY-based output capture with subprocess.PIPE, which uses kernel pipes instead of the PTY subsystem. Pipes guarantee data delivery before EOF on all platforms, eliminating the macOS race condition entirely. This removes ~400 lines of PTY management code (non-blocking polling, drain loops, retry logic, PtyState tracking) and replaces it with a simple blocking pipe read in a thread. Closes #821 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../jumpstarter/jumpstarter/exporter/hooks.py | 194 +------ .../jumpstarter/exporter/hooks_test.py | 472 +----------------- 2 files changed, 22 insertions(+), 644 deletions(-) diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py index 5a297e3f5..6eb91f786 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py @@ -2,8 +2,6 @@ import logging import os -import select -import time from collections.abc import Awaitable from dataclasses import dataclass from typing import TYPE_CHECKING, Callable, Literal @@ -21,11 +19,6 @@ logger = logging.getLogger(__name__) -MAX_DRAIN_BYTES = 256 * 1024 -DRAIN_TIMEOUT_SECONDS = 2.0 -# Module-level reference to time.monotonic so tests can patch it without -# affecting the asyncio event loop (which also uses time.monotonic). -_monotonic = time.monotonic def _flush_lines(buffer: bytes, output_lines: list[str]) -> bytes: @@ -71,19 +64,6 @@ def should_end_lease(self) -> bool: return self.on_failure in ("endLease", "exit") -@dataclass -class PtyState: - """Mutable state for PTY file descriptors and reader coordination. - - Tracks which fds are still open (for cleanup) and provides a separate - stop flag to signal the reader task without affecting fd lifecycle. - """ - - parent_fd_open: bool = True - child_fd_open: bool = True - reader_stop: bool = False - - @dataclass(kw_only=True) class HookExecutor: """Executes lifecycle hooks with access to the j CLI.""" @@ -230,52 +210,32 @@ async def _execute_hook_process( # noqa: C901 logging_session: Session, hook_type: Literal["before_lease", "after_lease"], ) -> str | None: - """Execute the hook process with the given environment and logging session. - - Uses subprocess with a PTY to force line buffering in the subprocess, - ensuring logs stream in real-time rather than being block-buffered. + """Execute the hook process and capture its output via pipes. Returns: Warning message string if hook failed with on_failure='warn', None otherwise """ - import pty import subprocess command = hook_config.script timeout = hook_config.timeout on_failure = hook_config.on_failure - # Exception handling error_msg: str | None = None cause: Exception | None = None timed_out = False - # Route hook output logs to the client via the session's log stream logger.debug("Entering log source context for %s", log_source) with logging_session.context_log_source(__name__, log_source): - # Create a PTY pair - this forces line buffering in the subprocess logger.debug("Starting hook subprocess...") - logger.debug("Creating PTY pair...") - try: - parent_fd, child_fd = pty.openpty() - except Exception as e: - logger.error("Failed to create PTY: %s", e, exc_info=True) - raise - logger.debug("PTY created: parent_fd=%d, child_fd=%d", parent_fd, child_fd) - - pty_state = PtyState() process: subprocess.Popen | None = None try: - # Use subprocess.Popen with the PTY child as stdin/stdout/stderr - # This avoids the issues with os.fork() in async contexts - # Determine interpreter and invocation mode script_stripped = command.strip() is_file = "\n" not in script_stripped and os.path.isfile(script_stripped) interpreter = hook_config.exec_ if is_file and interpreter is None: - # Auto-detect interpreter from file extension import sys ext = os.path.splitext(script_stripped)[1].lower() @@ -299,111 +259,36 @@ async def _execute_hook_process( # noqa: C901 try: process = subprocess.Popen( cmd, - stdin=child_fd, - stdout=child_fd, - stderr=child_fd, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, env=hook_env, process_group=0, - close_fds=True, # Close inherited fds to prevent interference with gRPC connections + close_fds=True, ) except Exception as e: logger.error("Failed to spawn subprocess: %s", e, exc_info=True) raise logger.debug("Subprocess spawned with PID %d", process.pid) - # Close child fd in parent process - subprocess has it now - os.close(child_fd) - pty_state.child_fd_open = False - logger.debug("Closed child_fd in parent process") output_lines: list[str] = [] - # Set parent fd to non-blocking mode - import fcntl - - flags = fcntl.fcntl(parent_fd, fcntl.F_GETFL) - fcntl.fcntl(parent_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - logger.debug("Parent fd set to non-blocking") - - async def read_pty_output() -> None: # noqa: C901 - """Read from PTY parent fd line by line using non-blocking I/O.""" - logger.debug("read_pty_output task started") + async def read_output() -> None: + """Read subprocess output line by line via pipe.""" buffer = b"" - read_count = 0 - last_heartbeat = 0 - - start_time = _monotonic() try: while True: - try: - with anyio.move_on_after(0.1): - await anyio.wait_readable(parent_fd) - - read_count += 1 - elapsed = _monotonic() - start_time - if elapsed - last_heartbeat >= 2.0: - logger.debug( - "read_pty_output: heartbeat at %.1fs, iterations=%d", elapsed, read_count - ) - last_heartbeat = elapsed - - try: - chunk = os.read(parent_fd, 4096) - if not chunk: - logger.debug("read_pty_output: EOF received") - break - buffer += chunk - except BlockingIOError: - if pty_state.reader_stop: - logger.debug("read_pty_output: stop flag set and no data, exiting") - break - continue - except OSError as e: - logger.debug("read_pty_output: OSError on read: %s", e) - break - - buffer = _flush_lines(buffer, output_lines) - - except OSError as e: - logger.debug("read_pty_output: OSError in loop: %s", e) + chunk = await anyio.to_thread.run_sync( + lambda: process.stdout.read(4096), + abandon_on_cancel=True, + ) + if not chunk: break - finally: - # Drain any remaining data from the PTY buffer. - # On macOS, PTY output may still be in the kernel buffer - # after the subprocess exits and the stop flag is set. - # Use select() with a timeout to poll for readability - # instead of immediately breaking on BlockingIOError, - # giving the macOS PTY kernel buffer time to deliver - # remaining data. - # Bound the drain to prevent spinning indefinitely if a - # grandchild process holds the PTY slave fd open. - try: - drain_deadline = _monotonic() + DRAIN_TIMEOUT_SECONDS - drained = 0 - while drained < MAX_DRAIN_BYTES and _monotonic() < drain_deadline: - remaining = drain_deadline - _monotonic() - if remaining <= 0: - break - timeout_s = min(remaining, 0.1) - try: - readable, _, _ = select.select([parent_fd], [], [], timeout_s) - except (ValueError, OSError): - break - if not readable: - continue - try: - chunk = os.read(parent_fd, 4096) - if not chunk: - break - buffer += chunk - drained += len(chunk) - except (BlockingIOError, OSError): - break - + buffer += chunk buffer = _flush_lines(buffer, output_lines) - except Exception: - logger.debug("read_pty_output: error during drain", exc_info=True) - - logger.debug("read_pty_output: exiting, processed %d iterations", read_count) + except OSError as e: + logger.debug("read_output: OSError: %s", e) + finally: if buffer: line_decoded = buffer.decode(errors="replace").rstrip() if line_decoded: @@ -411,75 +296,48 @@ async def read_pty_output() -> None: # noqa: C901 logger.info("%s", line_decoded) async def wait_for_process() -> int: - """Wait for the subprocess to complete. - - Ensures the subprocess is properly reaped even if cancelled, - preventing zombie processes. - """ + """Wait for the subprocess to complete.""" logger.debug("wait_for_process: waiting for PID %d", process.pid) try: result = await anyio.to_thread.run_sync(process.wait, abandon_on_cancel=True) logger.debug("wait_for_process: PID %d exited with code %d", process.pid, result) return result finally: - # Ensure subprocess is reaped on cancellation to prevent zombies if process.poll() is None: logger.debug("wait_for_process: cleaning up still-running PID %d", process.pid) try: process.terminate() - # Give it a moment to terminate gracefully for _ in range(10): if process.poll() is not None: break await anyio.sleep(0.1) - # Force kill if still running if process.poll() is None: logger.debug("wait_for_process: force killing PID %d", process.pid) process.kill() - # Final reap with non-abandoning wait await anyio.to_thread.run_sync(process.wait, abandon_on_cancel=False) except Exception as e: logger.debug("wait_for_process: error during cleanup: %s", e) - # Use move_on_after for timeout returncode: int | None = None - logger.debug("Starting PTY output reader and process waiter (timeout=%d)", timeout) - - # Yield to event loop to ensure other tasks can progress - # This helps prevent race conditions in task scheduling - await anyio.sleep(0) + logger.debug("Starting output reader and process waiter (timeout=%d)", timeout) with anyio.move_on_after(timeout) as cancel_scope: async with anyio.create_task_group() as tg: - logger.debug("Task group created, starting tasks...") - tg.start_soon(read_pty_output) - logger.debug("Waiting for subprocess to complete...") + tg.start_soon(read_output) returncode = await wait_for_process() logger.debug("Subprocess completed with code: %s", returncode) - # After the subprocess exits, the PTY slave has no - # more writers. The reader will get EOF and exit. - # Set reader_stop after a grace period to handle - # grandchild processes that hold the PTY slave open. - await anyio.sleep(DRAIN_TIMEOUT_SECONDS) - pty_state.reader_stop = True - logger.debug("Reader grace period expired, stop flag set") - # Don't cancel - let the task exit naturally via EOF or flag check - # Cancellation can cause unexpected side effects on gRPC connections if cancel_scope.cancelled_caught: timed_out = True error_msg = f"Hook timed out after {timeout} seconds" logger.error(error_msg) - # Terminate the process if process and process.poll() is None: process.terminate() - # Give it a moment to terminate gracefully try: with anyio.move_on_after(5): await anyio.to_thread.run_sync(process.wait, abandon_on_cancel=True) except Exception: pass - # Force kill if still running if process.poll() is None: process.kill() try: @@ -498,23 +356,13 @@ async def wait_for_process() -> int: cause = e logger.error(error_msg, exc_info=True) finally: - # Clean up file descriptors - only close those still open to avoid - # closing an unrelated fd that reused the same number. - if pty_state.parent_fd_open: - try: - os.close(parent_fd) - except OSError: - pass - if pty_state.child_fd_open: + if process and process.stdout: try: - os.close(child_fd) + process.stdout.close() except OSError: pass - # Handle failure inside context_log_source so the WARNING log is - # routed to the client as a hook log (visible without --exporter-logs). if error_msg is not None: - # For timeout, create a TimeoutError as the cause if timed_out and cause is None: cause = TimeoutError(error_msg) return self._handle_hook_failure(error_msg, on_failure, hook_type, cause) diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py index 15ee8d01d..e15b362f7 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py @@ -1,4 +1,3 @@ -import os from contextlib import nullcontext from unittest.mock import AsyncMock, MagicMock, patch @@ -7,89 +6,14 @@ from jumpstarter.common import HOOK_WARNING_PREFIX, ExporterStatus from jumpstarter.config.exporter import HookConfigV1Alpha1, HookInstanceConfigV1Alpha1 from jumpstarter.exporter.hooks import ( - DRAIN_TIMEOUT_SECONDS, - MAX_DRAIN_BYTES, HookExecutionError, HookExecutor, _flush_lines, - _monotonic, ) pytestmark = pytest.mark.anyio -class _PtyTracker: - """Tracks PTY fd and EOF state for drain tests that need to intercept - os.read and pty.openpty calls. - - When ``return_drain_data`` is True (default), the first os.read after EOF - returns ``b"SHOULD_NOT_APPEAR\\n"``; otherwise it returns ``b""``. - """ - - def __init__(self, *, return_drain_data: bool = True) -> None: - import pty - - self.parent_fd: int | None = None - self.eof_seen: bool = False - self._drain_data_returned: bool = False - self._return_drain_data = return_drain_data - self._original_openpty = pty.openpty - self._original_os_read = os.read - - def tracking_openpty(self): - parent, child = self._original_openpty() - self.parent_fd = parent - return parent, child - - def os_read_with_drain_data(self, fd, size): - if fd != self.parent_fd: - return self._original_os_read(fd, size) - if not self.eof_seen: - try: - data = self._original_os_read(fd, size) - except BlockingIOError: - raise - except OSError: - self.eof_seen = True - raise - if not data: - self.eof_seen = True - return b"" - return data - if self._return_drain_data and not self._drain_data_returned: - self._drain_data_returned = True - return b"SHOULD_NOT_APPEAR\n" - return b"" - - -class _DrainDeadlineClock: - """A callable that replaces ``_monotonic`` to simulate the drain - deadline being exceeded between the ``while`` condition check and the - ``remaining`` calculation. - - Only patches the hooks module's ``_monotonic`` reference, leaving - ``time.monotonic`` (used by the asyncio event loop) unaffected. - """ - - def __init__(self, real_monotonic, state: _PtyTracker) -> None: - self._real = real_monotonic - self._state = state - self._call_count = 0 - self._deadline: float | None = None - - def __call__(self) -> float: - real_time = self._real() - if not self._state.eof_seen: - return real_time - self._call_count += 1 - if self._call_count == 1: - self._deadline = real_time + DRAIN_TIMEOUT_SECONDS - return real_time - if self._call_count == 2: - return self._deadline - 0.001 # type: ignore[operator] - return self._deadline + 1.0 # type: ignore[operator] - - class TestFlushLines: def test_extracts_complete_lines(self) -> None: output: list[str] = [] @@ -560,168 +484,8 @@ async def test_before_lease_hook_endlease_handles_release_error(self, lease_scop assert lease_scope.skip_after_lease_hook is True mock_request_lease_release.assert_called_once() - async def test_pty_output_drained_after_stop_flag_set(self) -> None: - """Test that PTY drain captures data remaining after the stop flag is set. - - Simulates the macOS scenario where PTY output is still in the kernel - buffer after the subprocess exits and reader_stop is set. Uses a pipe - to inject data, sets reader_stop=True to skip the main loop, and - verifies the finally-block drain captures all lines. - """ - import fcntl - import time - - read_fd, write_fd = os.pipe() - try: - flags = fcntl.fcntl(read_fd, fcntl.F_GETFL) - fcntl.fcntl(read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - - os.write(write_fd, b"DRAIN_LINE_1\nDRAIN_LINE_2\nDRAIN_LINE_3\n") - os.close(write_fd) - write_fd = -1 - - output_lines: list[str] = [] - buffer = b"" - - drain_deadline = time.monotonic() + DRAIN_TIMEOUT_SECONDS - drained = 0 - while drained < MAX_DRAIN_BYTES and time.monotonic() < drain_deadline: - try: - chunk = os.read(read_fd, 4096) - if not chunk: - break - buffer += chunk - drained += len(chunk) - except (BlockingIOError, OSError): - break - - buffer = _flush_lines(buffer, output_lines) - - assert "DRAIN_LINE_1" in output_lines - assert "DRAIN_LINE_2" in output_lines - assert "DRAIN_LINE_3" in output_lines - finally: - os.close(read_fd) - if write_fd != -1: - os.close(write_fd) - - async def test_drain_respects_byte_limit(self) -> None: - """Verify the drain loop stops after MAX_DRAIN_BYTES to prevent - indefinite blocking when a grandchild process holds the PTY open. - - Directly tests the drain logic using a pipe with data exceeding the - byte limit. Uses non-blocking writes to fill the pipe without blocking. - """ - import fcntl - import time - - read_fd, write_fd = os.pipe() - try: - flags = fcntl.fcntl(read_fd, fcntl.F_GETFL) - fcntl.fcntl(read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - wflags = fcntl.fcntl(write_fd, fcntl.F_GETFL) - fcntl.fcntl(write_fd, fcntl.F_SETFL, wflags | os.O_NONBLOCK) - - total_written = 0 - chunk = b"X" * 4000 + b"\n" - try: - while True: - os.write(write_fd, chunk) - total_written += len(chunk) - except BlockingIOError: - pass - - assert total_written > 0 - - output_lines: list[str] = [] - buffer = b"" - drain_deadline = time.monotonic() + DRAIN_TIMEOUT_SECONDS - drained = 0 - while drained < MAX_DRAIN_BYTES and time.monotonic() < drain_deadline: - try: - data = os.read(read_fd, 4096) - if not data: - break - buffer += data - drained += len(data) - except (BlockingIOError, OSError): - break - - buffer = _flush_lines(buffer, output_lines) - - assert drained <= MAX_DRAIN_BYTES - assert len(output_lines) > 0 - finally: - os.close(read_fd) - os.close(write_fd) - - async def test_drain_completes_immediately_on_empty_buffer(self) -> None: - """Verify drain exits quickly when the PTY buffer is empty (EOF).""" - import time - - read_fd, write_fd = os.pipe() - os.close(write_fd) - try: - import fcntl - - flags = fcntl.fcntl(read_fd, fcntl.F_GETFL) - fcntl.fcntl(read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - - output_lines: list[str] = [] - buffer = b"" - start = time.monotonic() - - drain_deadline = time.monotonic() + DRAIN_TIMEOUT_SECONDS - drained = 0 - while drained < MAX_DRAIN_BYTES and time.monotonic() < drain_deadline: - try: - chunk = os.read(read_fd, 4096) - if not chunk: - break - buffer += chunk - drained += len(chunk) - except (BlockingIOError, OSError): - break - - buffer = _flush_lines(buffer, output_lines) - elapsed = time.monotonic() - start - - assert output_lines == [] - assert drained == 0 - assert elapsed < 0.5 - finally: - os.close(read_fd) - - async def test_drain_handles_oserror_gracefully(self) -> None: - """Verify drain exits gracefully when os.read raises OSError (e.g. EIO).""" - import time - - read_fd, write_fd = os.pipe() - os.close(write_fd) - os.close(read_fd) - - output_lines: list[str] = [] - buffer = b"" - - drain_deadline = time.monotonic() + DRAIN_TIMEOUT_SECONDS - drained = 0 - while drained < MAX_DRAIN_BYTES and time.monotonic() < drain_deadline: - try: - chunk = os.read(read_fd, 4096) - if not chunk: - break - buffer += chunk - drained += len(chunk) - except (BlockingIOError, OSError): - break - - buffer = _flush_lines(buffer, output_lines) - assert output_lines == [] - assert drained == 0 - - - async def test_drain_captures_output_without_trailing_newline(self, lease_scope) -> None: + async def test_output_captured_without_trailing_newline(self, lease_scope) -> None: """Verify output without a trailing newline is still captured.""" hook_config = HookConfigV1Alpha1( before_lease=HookInstanceConfigV1Alpha1( @@ -737,239 +501,6 @@ async def test_drain_captures_output_without_trailing_newline(self, lease_scope) info_calls = [str(call) for call in mock_logger.info.call_args_list] assert any("NO_NEWLINE_OUTPUT" in call for call in info_calls) - - async def test_drain_reads_data_remaining_in_pty_buffer(self, lease_scope) -> None: - """Verify the drain loop inside read_pty_output reads data left in the - PTY kernel buffer after the main read loop exits. - - Patches os.read so that, once the main loop has consumed the initial - subprocess output via EOF from the specific PTY fd, a subsequent read - returns additional data - simulating the macOS scenario where the - kernel buffers output that arrives after the reader stop flag is set. - """ - import pty - - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo MAIN_OUTPUT", - timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - original_os_read = os.read - original_openpty = pty.openpty - pty_parent_fd = None - eof_seen_on_pty = False - - def tracking_openpty(): - nonlocal pty_parent_fd - parent, child = original_openpty() - pty_parent_fd = parent - return parent, child - - drain_data_returned = False - - def os_read_with_drain_data(fd, size): - nonlocal eof_seen_on_pty, drain_data_returned - if fd != pty_parent_fd: - return original_os_read(fd, size) - if not eof_seen_on_pty: - try: - data = original_os_read(fd, size) - except (BlockingIOError, OSError): - if not eof_seen_on_pty: - eof_seen_on_pty = True - raise - if not data: - eof_seen_on_pty = True - return b"" - return data - if not drain_data_returned: - drain_data_returned = True - return b"DRAIN_CAPTURED\n" - return b"" - - with ( - patch("pty.openpty", side_effect=tracking_openpty), - patch("os.read", side_effect=os_read_with_drain_data), - patch("jumpstarter.exporter.hooks.logger") as mock_logger, - ): - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - assert pty_parent_fd is not None - assert eof_seen_on_pty - info_calls = [str(call) for call in mock_logger.info.call_args_list] - assert any("DRAIN_CAPTURED" in call for call in info_calls) - - - async def test_drain_select_oserror_exits_gracefully(self, lease_scope) -> None: - """Verify the drain loop exits gracefully when select.select() raises - OSError (e.g. fd closed during drain). - - Patches select.select inside the drain to raise OSError, simulating a - closed or invalid fd. The hook should still complete successfully. - """ - import select as select_mod - - original_select = select_mod.select - state = _PtyTracker() - - def select_with_oserror(rlist, wlist, xlist, timeout=None): - if state.eof_seen and rlist and rlist[0] == state.parent_fd: - raise OSError("simulated fd closed during drain") - return original_select(rlist, wlist, xlist, timeout) - - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo SELECT_ERROR_TEST", timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - with ( - patch("pty.openpty", side_effect=state.tracking_openpty), - patch("os.read", side_effect=state.os_read_with_drain_data), - patch("jumpstarter.exporter.hooks.select.select", side_effect=select_with_oserror), - patch("jumpstarter.exporter.hooks.logger") as mock_logger, - ): - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - assert state.eof_seen - info_calls = [str(call) for call in mock_logger.info.call_args_list] - assert any("SELECT_ERROR_TEST" in call for call in info_calls) - - - async def test_drain_select_valueerror_exits_gracefully(self, lease_scope) -> None: - """Verify the drain loop exits gracefully when select.select() raises - ValueError (e.g. negative fd). - - This covers the except (ValueError, OSError) handler in the drain loop. - """ - import select as select_mod - - original_select = select_mod.select - state = _PtyTracker(return_drain_data=False) - - def select_with_valueerror(rlist, wlist, xlist, timeout=None): - if state.eof_seen and rlist and rlist[0] == state.parent_fd: - raise ValueError("file descriptor cannot be a negative integer (-1)") - return original_select(rlist, wlist, xlist, timeout) - - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo VALUEERROR_TEST", timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - with ( - patch("pty.openpty", side_effect=state.tracking_openpty), - patch("os.read", side_effect=state.os_read_with_drain_data), - patch("jumpstarter.exporter.hooks.select.select", side_effect=select_with_valueerror), - patch("jumpstarter.exporter.hooks.logger") as mock_logger, - ): - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - info_calls = [str(call) for call in mock_logger.info.call_args_list] - assert any("VALUEERROR_TEST" in call for call in info_calls) - - - async def test_drain_exits_when_deadline_exceeded_before_select(self, lease_scope) -> None: - """Verify the drain loop exits when the deadline is exceeded between the - while condition and the remaining-time check (line: if remaining <= 0). - - Patches ``jumpstarter.exporter.hooks._monotonic`` (not ``time.monotonic`` - globally) to simulate a jump past the deadline after the while condition - passes but before the remaining check. Using the module-level - ``_monotonic`` reference avoids breaking the asyncio event loop, which - also relies on ``time.monotonic``. - """ - state = _PtyTracker() - clock = _DrainDeadlineClock(_monotonic, state) - - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo DEADLINE_TEST", timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - with ( - patch("pty.openpty", side_effect=state.tracking_openpty), - patch("os.read", side_effect=state.os_read_with_drain_data), - patch("jumpstarter.exporter.hooks._monotonic", side_effect=clock), - patch("jumpstarter.exporter.hooks.logger") as mock_logger, - ): - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - info_calls = [str(call) for call in mock_logger.info.call_args_list] - assert any("DEADLINE_TEST" in call for call in info_calls) - # SHOULD_NOT_APPEAR should not be in output because the drain - # exited early due to remaining <= 0 before select could run - assert not any("SHOULD_NOT_APPEAR" in call for call in info_calls) - - - async def test_drain_exception_is_suppressed(self, lease_scope) -> None: - """Verify that an unexpected exception raised during the drain is caught - by the except-Exception handler and does not propagate to the caller. - - Patches _flush_lines so that the second call (inside the drain) raises - a RuntimeError. The hook should still complete successfully because the - drain's except-Exception block suppresses it. - """ - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo BEFORE_DRAIN_ERROR", - timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - original_flush = _flush_lines - call_count = 0 - - def flush_lines_with_drain_error(buffer, output_lines): - nonlocal call_count - call_count += 1 - result = original_flush(buffer, output_lines) - if call_count > 1: - raise RuntimeError("simulated drain error") - return result - - with ( - patch("jumpstarter.exporter.hooks._flush_lines", side_effect=flush_lines_with_drain_error), - patch("jumpstarter.exporter.hooks.logger"), - ): - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - - async def test_reader_exits_on_stop_flag_when_grandchild_holds_pty(self, lease_scope) -> None: - """Verify the reader exits via the stop flag when a grandchild - process holds the PTY slave open after the direct child exits. - - The backgrounded sleep inherits the PTY slave fd, preventing EOF - on the master. The reader gets BlockingIOError (no data from the - silent grandchild) and exits once reader_stop is set. - """ - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo STOP_FLAG_TEST; sleep 300 &", - timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - with patch("jumpstarter.exporter.hooks.logger") as mock_logger: - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - info_calls = [str(call) for call in mock_logger.info.call_args_list] - assert any("STOP_FLAG_TEST" in call for call in info_calls) - - async def test_drain_constants_are_reasonable(self) -> None: - assert MAX_DRAIN_BYTES == 256 * 1024 - assert DRAIN_TIMEOUT_SECONDS == 2.0 - async def test_exec_default_is_none(self) -> None: """Test that the default exec is None (auto-detect).""" hook = HookInstanceConfigV1Alpha1(script="echo hello") @@ -1003,7 +534,6 @@ async def test_infrastructure_messages_at_debug_not_info(self, lease_scope) -> N # Infrastructure messages should be at DEBUG level infra_messages = [ "Starting hook subprocess", - "Creating PTY", "Spawning subprocess", "Subprocess spawned", "Hook executed successfully", From d3d3e76f757dd4b7bce6402c66e9ed126fa35e2f Mon Sep 17 00:00:00 2001 From: Paul Wallrabe Date: Fri, 26 Jun 2026 14:15:14 +0200 Subject: [PATCH 13/16] fix(hooks): use async non-blocking read on pipe fd for real-time output Replace thread-based process.stdout.read() with non-blocking os.read() + anyio.wait_readable() on the pipe fd. This delivers hook output line-by-line in real-time (like PTY did) instead of in a batch after subprocess exit. The thread-based reader had a scheduling race: the thread pool could delay the reader's first read until after the subprocess exited, causing all logger.info calls to happen in a burst right before the hook reported completion. The LogStream couldn't deliver the messages to the client in time. The async reader starts immediately (no thread pool dependency), reads each line as it's produced, and logs it via logger.info which the LogStream delivers to the client in real-time. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../jumpstarter/jumpstarter/exporter/hooks.py | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py index 6eb91f786..230887a13 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py @@ -273,21 +273,30 @@ async def _execute_hook_process( # noqa: C901 output_lines: list[str] = [] + import fcntl + + pipe_fd = process.stdout.fileno() + flags = fcntl.fcntl(pipe_fd, fcntl.F_GETFL) + fcntl.fcntl(pipe_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + async def read_output() -> None: - """Read subprocess output line by line via pipe.""" + """Read subprocess output via pipe using async non-blocking I/O.""" buffer = b"" try: while True: - chunk = await anyio.to_thread.run_sync( - lambda: process.stdout.read(4096), - abandon_on_cancel=True, - ) - if not chunk: + try: + with anyio.move_on_after(0.1): + await anyio.wait_readable(pipe_fd) + chunk = os.read(pipe_fd, 4096) + if not chunk: + break + buffer += chunk + except BlockingIOError: + continue + except OSError as e: + logger.debug("read_output: OSError: %s", e) break - buffer += chunk buffer = _flush_lines(buffer, output_lines) - except OSError as e: - logger.debug("read_output: OSError: %s", e) finally: if buffer: line_decoded = buffer.decode(errors="replace").rstrip() From 66a190accd5eb7a4bdb49964f2dc94e3e6fd9ceb Mon Sep 17 00:00:00 2001 From: Paul Wallrabe Date: Fri, 26 Jun 2026 15:44:57 +0200 Subject: [PATCH 14/16] fix(hooks): yield to event loop after reader to flush LogStream After the reader finishes and the task group exits, yield to the event loop before reporting the hook result. This lets the LogStream deliver pending messages to the client before the hook status changes to AVAILABLE, preventing the client from disconnecting before receiving the last output lines. Co-Authored-By: Claude Opus 4.6 (1M context) --- python/packages/jumpstarter/jumpstarter/exporter/hooks.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py index 230887a13..795f7fe4a 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py @@ -335,6 +335,9 @@ async def wait_for_process() -> int: tg.start_soon(read_output) returncode = await wait_for_process() logger.debug("Subprocess completed with code: %s", returncode) + # Yield to let the LogStream deliver any pending + # messages before reporting the hook result. + await anyio.sleep(0) if cancel_scope.cancelled_caught: timed_out = True From d89544f242ad9f1b9fdd795c42d44472c8f01223 Mon Sep 17 00:00:00 2001 From: Paul Wallrabe Date: Tue, 30 Jun 2026 10:46:37 +0200 Subject: [PATCH 15/16] revert(ci): remove temporary validation CI changes The PTY approach was replaced with subprocess.PIPE, so the 35m e2e timeout bump and the 20x macOS validation matrix are no longer needed. Co-Authored-By: Claude Opus 4.6 (1M context) --- .github/workflows/e2e.yaml | 4 ++-- .github/workflows/python-tests.yaml | 11 +++++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml index f200c4803..7242f38cd 100644 --- a/.github/workflows/e2e.yaml +++ b/.github/workflows/e2e.yaml @@ -57,7 +57,7 @@ jobs: matrix: include: ${{ fromJson(needs.changes.outputs.e2e-matrix) }} runs-on: ${{ matrix.os }} - timeout-minutes: 35 + timeout-minutes: 30 steps: - name: Checkout repository uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7 @@ -96,7 +96,7 @@ jobs: matrix: include: ${{ fromJson(needs.changes.outputs.e2e-matrix) }} runs-on: ${{ matrix.os }} - timeout-minutes: 35 + timeout-minutes: 30 steps: - name: Checkout repository uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7 diff --git a/.github/workflows/python-tests.yaml b/.github/workflows/python-tests.yaml index bc05eada1..3fd327296 100644 --- a/.github/workflows/python-tests.yaml +++ b/.github/workflows/python-tests.yaml @@ -41,8 +41,8 @@ jobs: # Merge queue and workflow_dispatch run the full matrix # (all Python versions on both Linux and macOS). if [[ "${{ github.event_name }}" == "pull_request" ]]; then - echo 'python-versions=["3.11", "3.12", "3.13"]' >> "$GITHUB_OUTPUT" - echo 'runners=["macos-15"]' >> "$GITHUB_OUTPUT" + echo 'python-versions=["3.12"]' >> "$GITHUB_OUTPUT" + echo 'runners=["ubuntu-24.04"]' >> "$GITHUB_OUTPUT" else echo 'python-versions=["3.11", "3.12", "3.13"]' >> "$GITHUB_OUTPUT" echo 'runners=["ubuntu-24.04", "macos-15"]' >> "$GITHUB_OUTPUT" @@ -53,11 +53,14 @@ jobs: if: needs.changes.outputs.should_run == 'true' || github.event_name == 'workflow_dispatch' runs-on: ${{ matrix.runs-on }} strategy: - fail-fast: false matrix: runs-on: ${{ fromJson(needs.changes.outputs.runners) }} + # Floor: oldest Python in supported platforms (RHEL 9 appstream) + # Ceiling: newest Python in latest Fedora + # Review on each RHEL/Fedora release + # PRs run only 3.12 on Linux; merge queue runs all versions + # on both Linux and macOS. python-version: ${{ fromJson(needs.changes.outputs.python-versions) }} - run-index: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] steps: - uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7 with: From 364426328e5d2adb0d150b5614d41e935f951cb1 Mon Sep 17 00:00:00 2001 From: Paul Wallrabe Date: Tue, 30 Jun 2026 17:10:46 +0200 Subject: [PATCH 16/16] test(hooks): add edge case tests for pipe-based output capture Cover pipe-specific edge cases that were untested after the PTY-to-pipe migration: stderr merge, large output spanning multiple reads, non-UTF8 decoding, rapid exit buffering, spawn failure cleanup, interleaved stdout/stderr, and grandchild process holding the pipe open past timeout. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../jumpstarter/exporter/hooks_test.py | 142 ++++++++++++++++++ 1 file changed, 142 insertions(+) diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py index e15b362f7..da54e9f51 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py @@ -1045,3 +1045,145 @@ async def mock_report_status(status, msg): assert len(hook_started_calls) == 1, ( f"BEFORE_LEASE_HOOK must be reported (hook must run) even when lease has ended, got: {status_calls}" ) + + +class TestPipeOutputEdgeCases: + """Edge cases for pipe-based output capture (PR #837).""" + + async def test_stderr_captured_via_pipe_merge(self, lease_scope) -> None: + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1( + script="echo STDOUT_LINE; echo STDERR_LINE >&2", + timeout=10, + ), + ) + executor = HookExecutor(config=hook_config) + + with patch("jumpstarter.exporter.hooks.logger") as mock_logger: + result = await executor.execute_before_lease_hook(lease_scope) + + assert result is None + info_calls = [str(call) for call in mock_logger.info.call_args_list] + assert any("STDOUT_LINE" in call for call in info_calls) + assert any("STDERR_LINE" in call for call in info_calls) + + async def test_large_output_spanning_multiple_reads(self, lease_scope) -> None: + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1( + script=( + "seq 1 200 | while read n; do " + "echo \"LINE_${n}_PADDING_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\"; " + "done" + ), + timeout=10, + ), + ) + executor = HookExecutor(config=hook_config) + + with patch("jumpstarter.exporter.hooks.logger") as mock_logger: + result = await executor.execute_before_lease_hook(lease_scope) + + assert result is None + info_calls = [str(call) for call in mock_logger.info.call_args_list] + assert any("LINE_1_" in call for call in info_calls) + assert any("LINE_100_" in call for call in info_calls) + assert any("LINE_200_" in call for call in info_calls) + + async def test_non_utf8_output_decoded_with_replacement(self, lease_scope) -> None: + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1( + exec_="python3", + script=( + "import sys; " + "sys.stdout.buffer.write(b'VALID_PREFIX\\x80VALID_SUFFIX\\n')" + ), + timeout=10, + ), + ) + executor = HookExecutor(config=hook_config) + + with patch("jumpstarter.exporter.hooks.logger") as mock_logger: + result = await executor.execute_before_lease_hook(lease_scope) + + assert result is None + info_calls = [str(call) for call in mock_logger.info.call_args_list] + matching = [ + call for call in info_calls + if "VALID_PREFIX" in call and "VALID_SUFFIX" in call + ] + assert len(matching) > 0 + + async def test_rapid_exit_with_buffered_output(self, lease_scope) -> None: + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1( + script="echo FAST_1; echo FAST_2; echo FAST_3; echo FAST_4; echo FAST_5", + timeout=10, + ), + ) + executor = HookExecutor(config=hook_config) + + with patch("jumpstarter.exporter.hooks.logger") as mock_logger: + result = await executor.execute_before_lease_hook(lease_scope) + + assert result is None + info_calls = [str(call) for call in mock_logger.info.call_args_list] + for i in range(1, 6): + assert any(f"FAST_{i}" in call for call in info_calls), ( + f"FAST_{i} was not captured" + ) + + async def test_spawn_failure_cleans_up_without_crash(self, lease_scope) -> None: + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1( + exec_="/nonexistent/interpreter", + script="echo should not run", + timeout=10, + on_failure="warn", + ), + ) + executor = HookExecutor(config=hook_config) + + result = await executor.execute_before_lease_hook(lease_scope) + assert result is not None + assert "error" in result.lower() + + async def test_interleaved_stdout_and_stderr_captured(self, lease_scope) -> None: + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1( + script=( + "echo OUT_1; echo ERR_1 >&2; " + "echo OUT_2; echo ERR_2 >&2; " + "echo OUT_3" + ), + timeout=10, + ), + ) + executor = HookExecutor(config=hook_config) + + with patch("jumpstarter.exporter.hooks.logger") as mock_logger: + result = await executor.execute_before_lease_hook(lease_scope) + + assert result is None + info_calls = [str(call) for call in mock_logger.info.call_args_list] + for label in ("OUT_1", "OUT_2", "OUT_3", "ERR_1", "ERR_2"): + assert any(label in call for call in info_calls), ( + f"{label} was not captured" + ) + + async def test_timeout_with_grandchild_holding_pipe(self, lease_scope) -> None: + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1( + script="echo GRANDCHILD_TEST; sleep 10 &", + timeout=2, + on_failure="warn", + ), + ) + executor = HookExecutor(config=hook_config) + + with patch("jumpstarter.exporter.hooks.logger") as mock_logger: + result = await executor.execute_before_lease_hook(lease_scope) + + assert result is not None + assert "timed out" in result.lower() + info_calls = [str(call) for call in mock_logger.info.call_args_list] + assert any("GRANDCHILD_TEST" in call for call in info_calls)