diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py index 30dc9a0e0..795f7fe4a 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py @@ -2,8 +2,6 @@ import logging import os -import select -import time from collections.abc import Awaitable from dataclasses import dataclass from typing import TYPE_CHECKING, Callable, Literal @@ -21,13 +19,6 @@ logger = logging.getLogger(__name__) -MAX_DRAIN_BYTES = 256 * 1024 -DRAIN_TIMEOUT_SECONDS = 2.0 -DRAIN_MAX_EMPTY_POLLS = 10 - -# Module-level reference to time.monotonic so tests can patch it without -# affecting the asyncio event loop (which also uses time.monotonic). -_monotonic = time.monotonic def _flush_lines(buffer: bytes, output_lines: list[str]) -> bytes: @@ -73,19 +64,6 @@ def should_end_lease(self) -> bool: return self.on_failure in ("endLease", "exit") -@dataclass -class PtyState: - """Mutable state for PTY file descriptors and reader coordination. - - Tracks which fds are still open (for cleanup) and provides a separate - stop flag to signal the reader task without affecting fd lifecycle. - """ - - parent_fd_open: bool = True - child_fd_open: bool = True - reader_stop: bool = False - - @dataclass(kw_only=True) class HookExecutor: """Executes lifecycle hooks with access to the j CLI.""" @@ -232,52 +210,32 @@ async def _execute_hook_process( # noqa: C901 logging_session: Session, hook_type: Literal["before_lease", "after_lease"], ) -> str | None: - """Execute the hook process with the given environment and logging session. - - Uses subprocess with a PTY to force line buffering in the subprocess, - ensuring logs stream in real-time rather than being block-buffered. + """Execute the hook process and capture its output via pipes. Returns: Warning message string if hook failed with on_failure='warn', None otherwise """ - import pty import subprocess command = hook_config.script timeout = hook_config.timeout on_failure = hook_config.on_failure - # Exception handling error_msg: str | None = None cause: Exception | None = None timed_out = False - # Route hook output logs to the client via the session's log stream logger.debug("Entering log source context for %s", log_source) with logging_session.context_log_source(__name__, log_source): - # Create a PTY pair - this forces line buffering in the subprocess logger.debug("Starting hook subprocess...") - logger.debug("Creating PTY pair...") - try: - parent_fd, child_fd = pty.openpty() - except Exception as e: - logger.error("Failed to create PTY: %s", e, exc_info=True) - raise - logger.debug("PTY created: parent_fd=%d, child_fd=%d", parent_fd, child_fd) - - pty_state = PtyState() process: subprocess.Popen | None = None try: - # Use subprocess.Popen with the PTY child as stdin/stdout/stderr - # This avoids the issues with os.fork() in async contexts - # Determine interpreter and invocation mode script_stripped = command.strip() is_file = "\n" not in script_stripped and os.path.isfile(script_stripped) interpreter = hook_config.exec_ if is_file and interpreter is None: - # Auto-detect interpreter from file extension import sys ext = os.path.splitext(script_stripped)[1].lower() @@ -301,137 +259,45 @@ async def _execute_hook_process( # noqa: C901 try: process = subprocess.Popen( cmd, - stdin=child_fd, - stdout=child_fd, - stderr=child_fd, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, env=hook_env, - start_new_session=True, # Equivalent to os.setsid() - close_fds=True, # Close inherited fds to prevent interference with gRPC connections + process_group=0, + close_fds=True, ) except Exception as e: logger.error("Failed to spawn subprocess: %s", e, exc_info=True) raise logger.debug("Subprocess spawned with PID %d", process.pid) - # Close child fd in parent process - subprocess has it now - os.close(child_fd) - pty_state.child_fd_open = False - logger.debug("Closed child_fd in parent process") output_lines: list[str] = [] - # Set parent fd to non-blocking mode import fcntl - flags = fcntl.fcntl(parent_fd, fcntl.F_GETFL) - fcntl.fcntl(parent_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - logger.debug("Parent fd set to non-blocking") + pipe_fd = process.stdout.fileno() + flags = fcntl.fcntl(pipe_fd, fcntl.F_GETFL) + fcntl.fcntl(pipe_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - async def read_pty_output() -> None: # noqa: C901 - """Read from PTY parent fd line by line using non-blocking I/O.""" - logger.debug("read_pty_output task started") + async def read_output() -> None: + """Read subprocess output via pipe using async non-blocking I/O.""" buffer = b"" - read_count = 0 - last_heartbeat = 0 - - start_time = _monotonic() try: - while not pty_state.reader_stop: + while True: try: - # Wait for fd to be readable with timeout with anyio.move_on_after(0.1): - await anyio.wait_readable(parent_fd) - - # Check stop flag immediately after timeout - # (main task may have signaled us to stop) - if pty_state.reader_stop: - logger.debug("read_pty_output: stop flag set, exiting") + await anyio.wait_readable(pipe_fd) + chunk = os.read(pipe_fd, 4096) + if not chunk: break - - read_count += 1 - # Log heartbeat every 2 seconds - elapsed = _monotonic() - start_time - if elapsed - last_heartbeat >= 2.0: - logger.debug( - "read_pty_output: heartbeat at %.1fs, iterations=%d", elapsed, read_count - ) - last_heartbeat = elapsed - - # Read available data (non-blocking) - try: - chunk = os.read(parent_fd, 4096) - if not chunk: - # EOF - logger.debug("read_pty_output: EOF received") - break - buffer += chunk - except BlockingIOError: - # No data available right now, continue loop - continue - except OSError as e: - # PTY closed or error - logger.debug("read_pty_output: OSError on read: %s", e) - break - - # Process complete lines - buffer = _flush_lines(buffer, output_lines) - + buffer += chunk + except BlockingIOError: + continue except OSError as e: - # PTY closed or read error - logger.debug("read_pty_output: OSError in loop: %s", e) + logger.debug("read_output: OSError: %s", e) break - finally: - # Drain any remaining data from the PTY buffer. - # On macOS, PTY output may still be in the kernel buffer - # after the subprocess exits and the stop flag is set. - # Use select() with a timeout to poll for readability - # instead of immediately breaking on BlockingIOError, - # giving the macOS PTY kernel buffer time to deliver - # remaining data. - # Bound the drain to prevent spinning indefinitely if a - # grandchild process holds the PTY slave fd open. - try: - drain_deadline = _monotonic() + DRAIN_TIMEOUT_SECONDS - drained = 0 - consecutive_empty = 0 - while drained < MAX_DRAIN_BYTES and _monotonic() < drain_deadline: - # Poll for readability with a short timeout. - # This avoids the race where a non-blocking read - # raises BlockingIOError because the macOS PTY - # kernel buffer hasn't delivered the data yet. - remaining = drain_deadline - _monotonic() - if remaining <= 0: - break - timeout_s = min(remaining, 0.1) - try: - readable, _, _ = select.select([parent_fd], [], [], timeout_s) - except (ValueError, OSError): - # fd closed or invalid - break - if not readable: - # On macOS, data may not be available on the - # first select() call even though the subprocess - # has already written and exited. Keep retrying - # until we see several consecutive empty polls, - # which indicates the buffer is truly drained. - consecutive_empty += 1 - if consecutive_empty >= DRAIN_MAX_EMPTY_POLLS: - break - continue - consecutive_empty = 0 - try: - chunk = os.read(parent_fd, 4096) - if not chunk: - break - buffer += chunk - drained += len(chunk) - except (BlockingIOError, OSError): - break - buffer = _flush_lines(buffer, output_lines) - except Exception: - logger.debug("read_pty_output: error during drain", exc_info=True) - - logger.debug("read_pty_output: exiting, processed %d iterations", read_count) + finally: if buffer: line_decoded = buffer.decode(errors="replace").rstrip() if line_decoded: @@ -439,78 +305,51 @@ async def read_pty_output() -> None: # noqa: C901 logger.info("%s", line_decoded) async def wait_for_process() -> int: - """Wait for the subprocess to complete. - - Ensures the subprocess is properly reaped even if cancelled, - preventing zombie processes. - """ + """Wait for the subprocess to complete.""" logger.debug("wait_for_process: waiting for PID %d", process.pid) try: result = await anyio.to_thread.run_sync(process.wait, abandon_on_cancel=True) logger.debug("wait_for_process: PID %d exited with code %d", process.pid, result) return result finally: - # Ensure subprocess is reaped on cancellation to prevent zombies if process.poll() is None: logger.debug("wait_for_process: cleaning up still-running PID %d", process.pid) try: process.terminate() - # Give it a moment to terminate gracefully for _ in range(10): if process.poll() is not None: break await anyio.sleep(0.1) - # Force kill if still running if process.poll() is None: logger.debug("wait_for_process: force killing PID %d", process.pid) process.kill() - # Final reap with non-abandoning wait await anyio.to_thread.run_sync(process.wait, abandon_on_cancel=False) except Exception as e: logger.debug("wait_for_process: error during cleanup: %s", e) - # Use move_on_after for timeout returncode: int | None = None - logger.debug("Starting PTY output reader and process waiter (timeout=%d)", timeout) - - # Yield to event loop to ensure other tasks can progress - # This helps prevent race conditions in task scheduling - await anyio.sleep(0) + logger.debug("Starting output reader and process waiter (timeout=%d)", timeout) with anyio.move_on_after(timeout) as cancel_scope: - # Run output reading and process waiting concurrently async with anyio.create_task_group() as tg: - logger.debug("Task group created, starting tasks...") - tg.start_soon(read_pty_output) - logger.debug("Waiting for subprocess to complete...") + tg.start_soon(read_output) returncode = await wait_for_process() logger.debug("Subprocess completed with code: %s", returncode) - # Give a brief moment for any final output to be read - await anyio.sleep(0.2) - # Signal the read task to stop via the dedicated stop flag. - # The read task checks this flag after each 0.1s timeout - # and also receives EOF when the subprocess exits. - # Note: pty_state.parent_fd_open stays True so the finally block - # properly closes parent_fd. - pty_state.reader_stop = True - logger.debug("Stop flag set, waiting for read task to exit") - # Don't cancel - let the task exit naturally via EOF or flag check - # Cancellation can cause unexpected side effects on gRPC connections + # Yield to let the LogStream deliver any pending + # messages before reporting the hook result. + await anyio.sleep(0) if cancel_scope.cancelled_caught: timed_out = True error_msg = f"Hook timed out after {timeout} seconds" logger.error(error_msg) - # Terminate the process if process and process.poll() is None: process.terminate() - # Give it a moment to terminate gracefully try: with anyio.move_on_after(5): await anyio.to_thread.run_sync(process.wait, abandon_on_cancel=True) except Exception: pass - # Force kill if still running if process.poll() is None: process.kill() try: @@ -529,23 +368,13 @@ async def wait_for_process() -> int: cause = e logger.error(error_msg, exc_info=True) finally: - # Clean up file descriptors - only close those still open to avoid - # closing an unrelated fd that reused the same number. - if pty_state.parent_fd_open: - try: - os.close(parent_fd) - except OSError: - pass - if pty_state.child_fd_open: + if process and process.stdout: try: - os.close(child_fd) + process.stdout.close() except OSError: pass - # Handle failure inside context_log_source so the WARNING log is - # routed to the client as a hook log (visible without --exporter-logs). if error_msg is not None: - # For timeout, create a TimeoutError as the cause if timed_out and cause is None: cause = TimeoutError(error_msg) return self._handle_hook_failure(error_msg, on_failure, hook_type, cause) diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py index 5a5f6b4b0..da54e9f51 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py @@ -1,5 +1,3 @@ -import os -import sys from contextlib import nullcontext from unittest.mock import AsyncMock, MagicMock, patch @@ -8,97 +6,13 @@ from jumpstarter.common import HOOK_WARNING_PREFIX, ExporterStatus from jumpstarter.config.exporter import HookConfigV1Alpha1, HookInstanceConfigV1Alpha1 from jumpstarter.exporter.hooks import ( - DRAIN_MAX_EMPTY_POLLS, - DRAIN_TIMEOUT_SECONDS, - MAX_DRAIN_BYTES, HookExecutionError, HookExecutor, _flush_lines, - _monotonic, ) pytestmark = pytest.mark.anyio -# Tests that spawn real subprocesses via PTY and assert on captured logger -# output are flaky on macOS due to a PTY kernel buffer timing race condition. -# See https://github.com/jumpstarter-dev/jumpstarter/issues/821 -# Targeted for proper fix in 0.10.0. -macos_pty_xfail = pytest.mark.xfail( - condition=sys.platform == "darwin", - reason="PTY output race condition on macOS (#821)", - strict=False, -) - - -class _PtyTracker: - """Tracks PTY fd and EOF state for drain tests that need to intercept - os.read and pty.openpty calls. - - When ``return_drain_data`` is True (default), the first os.read after EOF - returns ``b"SHOULD_NOT_APPEAR\\n"``; otherwise it returns ``b""``. - """ - - def __init__(self, *, return_drain_data: bool = True) -> None: - import pty - - self.parent_fd: int | None = None - self.eof_seen: bool = False - self._drain_data_returned: bool = False - self._return_drain_data = return_drain_data - self._original_openpty = pty.openpty - self._original_os_read = os.read - - def tracking_openpty(self): - parent, child = self._original_openpty() - self.parent_fd = parent - return parent, child - - def os_read_with_drain_data(self, fd, size): - if fd != self.parent_fd: - return self._original_os_read(fd, size) - if not self.eof_seen: - try: - data = self._original_os_read(fd, size) - except (BlockingIOError, OSError): - self.eof_seen = True - raise - if not data: - self.eof_seen = True - return b"" - return data - if self._return_drain_data and not self._drain_data_returned: - self._drain_data_returned = True - return b"SHOULD_NOT_APPEAR\n" - return b"" - - -class _DrainDeadlineClock: - """A callable that replaces ``_monotonic`` to simulate the drain - deadline being exceeded between the ``while`` condition check and the - ``remaining`` calculation. - - Only patches the hooks module's ``_monotonic`` reference, leaving - ``time.monotonic`` (used by the asyncio event loop) unaffected. - """ - - def __init__(self, real_monotonic, state: _PtyTracker) -> None: - self._real = real_monotonic - self._state = state - self._call_count = 0 - self._deadline: float | None = None - - def __call__(self) -> float: - real_time = self._real() - if not self._state.eof_seen: - return real_time - self._call_count += 1 - if self._call_count == 1: - self._deadline = real_time + DRAIN_TIMEOUT_SECONDS - return real_time - if self._call_count == 2: - return self._deadline - 0.001 # type: ignore[operator] - return self._deadline + 1.0 # type: ignore[operator] - class TestFlushLines: def test_extracts_complete_lines(self) -> None: @@ -207,7 +121,7 @@ async def test_hook_timeout(self, lease_scope) -> None: assert "timed out after 1 seconds" in str(exc_info.value) assert exc_info.value.on_failure == "exit" - @macos_pty_xfail + async def test_hook_environment_variables(self, lease_scope) -> None: hook_config = HookConfigV1Alpha1( before_lease=HookInstanceConfigV1Alpha1( @@ -222,7 +136,7 @@ async def test_hook_environment_variables(self, lease_scope) -> None: assert any("LEASE_NAME=test-lease-123" in call for call in info_calls) assert any("CLIENT_NAME=test-client" in call for call in info_calls) - @macos_pty_xfail + async def test_real_time_output_logging(self, lease_scope) -> None: """Test that hook output is logged in real-time at INFO level.""" hook_config = HookConfigV1Alpha1( @@ -240,7 +154,7 @@ async def test_real_time_output_logging(self, lease_scope) -> None: assert any("Line 2" in call for call in info_calls) assert any("Line 3" in call for call in info_calls) - @macos_pty_xfail + async def test_post_lease_hook_execution_on_completion(self, lease_scope) -> None: """Test that post-lease hook executes when called directly.""" hook_config = HookConfigV1Alpha1( @@ -351,7 +265,7 @@ async def test_successful_hook_returns_none(self, lease_scope) -> None: result = await executor.execute_before_lease_hook(lease_scope) assert result is None - @macos_pty_xfail + async def test_exec_bash(self, lease_scope) -> None: """Test that exec=/bin/bash allows bash-specific syntax. @@ -373,7 +287,7 @@ async def test_exec_bash(self, lease_scope) -> None: info_calls = [str(call) for call in mock_logger.info.call_args_list] assert any("BASH_OK: world" in call for call in info_calls) - @macos_pty_xfail + async def test_exec_python3(self, lease_scope) -> None: """Test that exec=python3 runs inline Python. @@ -396,7 +310,7 @@ async def test_exec_python3(self, lease_scope) -> None: # Expected total: 0 + 1 + 4 + 9 == 14 assert any("PYTHON_OK: 14" in call for call in info_calls) - @macos_pty_xfail + async def test_script_file_sh(self, lease_scope, tmp_path) -> None: """Test that a .sh file auto-detects /bin/sh as interpreter.""" script_file = tmp_path / "hook_script.sh" @@ -419,7 +333,7 @@ async def test_script_file_sh(self, lease_scope, tmp_path) -> None: debug_calls = [str(call) for call in mock_logger.debug.call_args_list] assert any("Executing script file" in call for call in debug_calls) - @macos_pty_xfail + async def test_script_file_py_autodetects_python(self, lease_scope, tmp_path) -> None: """Test that a .py file auto-detects the exporter's Python as interpreter.""" import sys @@ -446,7 +360,7 @@ async def test_script_file_py_autodetects_python(self, lease_scope, tmp_path) -> # Verify it used the exporter's own Python interpreter assert any(sys.executable in call for call in debug_calls) - @macos_pty_xfail + async def test_script_file_py_exec_override(self, lease_scope, tmp_path) -> None: """Test that explicit exec overrides .py auto-detection.""" script_file = tmp_path / "hook_script.py" @@ -470,7 +384,7 @@ async def test_script_file_py_exec_override(self, lease_scope, tmp_path) -> None debug_calls = [str(call) for call in mock_logger.debug.call_args_list] assert not any("Auto-detected" in call for call in debug_calls) - @macos_pty_xfail + async def test_noninteractive_environment(self, lease_scope) -> None: """Test that hooks receive noninteractive environment variables. @@ -570,168 +484,8 @@ async def test_before_lease_hook_endlease_handles_release_error(self, lease_scop assert lease_scope.skip_after_lease_hook is True mock_request_lease_release.assert_called_once() - async def test_pty_output_drained_after_stop_flag_set(self) -> None: - """Test that PTY drain captures data remaining after the stop flag is set. - - Simulates the macOS scenario where PTY output is still in the kernel - buffer after the subprocess exits and reader_stop is set. Uses a pipe - to inject data, sets reader_stop=True to skip the main loop, and - verifies the finally-block drain captures all lines. - """ - import fcntl - import time - - read_fd, write_fd = os.pipe() - try: - flags = fcntl.fcntl(read_fd, fcntl.F_GETFL) - fcntl.fcntl(read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - - os.write(write_fd, b"DRAIN_LINE_1\nDRAIN_LINE_2\nDRAIN_LINE_3\n") - os.close(write_fd) - write_fd = -1 - - output_lines: list[str] = [] - buffer = b"" - - drain_deadline = time.monotonic() + DRAIN_TIMEOUT_SECONDS - drained = 0 - while drained < MAX_DRAIN_BYTES and time.monotonic() < drain_deadline: - try: - chunk = os.read(read_fd, 4096) - if not chunk: - break - buffer += chunk - drained += len(chunk) - except (BlockingIOError, OSError): - break - - buffer = _flush_lines(buffer, output_lines) - - assert "DRAIN_LINE_1" in output_lines - assert "DRAIN_LINE_2" in output_lines - assert "DRAIN_LINE_3" in output_lines - finally: - os.close(read_fd) - if write_fd != -1: - os.close(write_fd) - - async def test_drain_respects_byte_limit(self) -> None: - """Verify the drain loop stops after MAX_DRAIN_BYTES to prevent - indefinite blocking when a grandchild process holds the PTY open. - - Directly tests the drain logic using a pipe with data exceeding the - byte limit. Uses non-blocking writes to fill the pipe without blocking. - """ - import fcntl - import time - - read_fd, write_fd = os.pipe() - try: - flags = fcntl.fcntl(read_fd, fcntl.F_GETFL) - fcntl.fcntl(read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - wflags = fcntl.fcntl(write_fd, fcntl.F_GETFL) - fcntl.fcntl(write_fd, fcntl.F_SETFL, wflags | os.O_NONBLOCK) - - total_written = 0 - chunk = b"X" * 4000 + b"\n" - try: - while True: - os.write(write_fd, chunk) - total_written += len(chunk) - except BlockingIOError: - pass - - assert total_written > 0 - - output_lines: list[str] = [] - buffer = b"" - drain_deadline = time.monotonic() + DRAIN_TIMEOUT_SECONDS - drained = 0 - while drained < MAX_DRAIN_BYTES and time.monotonic() < drain_deadline: - try: - data = os.read(read_fd, 4096) - if not data: - break - buffer += data - drained += len(data) - except (BlockingIOError, OSError): - break - - buffer = _flush_lines(buffer, output_lines) - - assert drained <= MAX_DRAIN_BYTES - assert len(output_lines) > 0 - finally: - os.close(read_fd) - os.close(write_fd) - - async def test_drain_completes_immediately_on_empty_buffer(self) -> None: - """Verify drain exits quickly when the PTY buffer is empty (EOF).""" - import time - - read_fd, write_fd = os.pipe() - os.close(write_fd) - try: - import fcntl - - flags = fcntl.fcntl(read_fd, fcntl.F_GETFL) - fcntl.fcntl(read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - - output_lines: list[str] = [] - buffer = b"" - start = time.monotonic() - - drain_deadline = time.monotonic() + DRAIN_TIMEOUT_SECONDS - drained = 0 - while drained < MAX_DRAIN_BYTES and time.monotonic() < drain_deadline: - try: - chunk = os.read(read_fd, 4096) - if not chunk: - break - buffer += chunk - drained += len(chunk) - except (BlockingIOError, OSError): - break - - buffer = _flush_lines(buffer, output_lines) - elapsed = time.monotonic() - start - - assert output_lines == [] - assert drained == 0 - assert elapsed < 0.5 - finally: - os.close(read_fd) - - async def test_drain_handles_oserror_gracefully(self) -> None: - """Verify drain exits gracefully when os.read raises OSError (e.g. EIO).""" - import time - - read_fd, write_fd = os.pipe() - os.close(write_fd) - os.close(read_fd) - - output_lines: list[str] = [] - buffer = b"" - - drain_deadline = time.monotonic() + DRAIN_TIMEOUT_SECONDS - drained = 0 - while drained < MAX_DRAIN_BYTES and time.monotonic() < drain_deadline: - try: - chunk = os.read(read_fd, 4096) - if not chunk: - break - buffer += chunk - drained += len(chunk) - except (BlockingIOError, OSError): - break - - buffer = _flush_lines(buffer, output_lines) - - assert output_lines == [] - assert drained == 0 - @macos_pty_xfail - async def test_drain_captures_output_without_trailing_newline(self, lease_scope) -> None: + async def test_output_captured_without_trailing_newline(self, lease_scope) -> None: """Verify output without a trailing newline is still captured.""" hook_config = HookConfigV1Alpha1( before_lease=HookInstanceConfigV1Alpha1( @@ -747,343 +501,6 @@ async def test_drain_captures_output_without_trailing_newline(self, lease_scope) info_calls = [str(call) for call in mock_logger.info.call_args_list] assert any("NO_NEWLINE_OUTPUT" in call for call in info_calls) - @macos_pty_xfail - async def test_drain_reads_data_remaining_in_pty_buffer(self, lease_scope) -> None: - """Verify the drain loop inside read_pty_output reads data left in the - PTY kernel buffer after the main read loop exits. - - Patches os.read so that, once the main loop has consumed the initial - subprocess output via EOF from the specific PTY fd, a subsequent read - returns additional data - simulating the macOS scenario where the - kernel buffers output that arrives after the reader stop flag is set. - """ - import pty - - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo MAIN_OUTPUT", - timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - original_os_read = os.read - original_openpty = pty.openpty - pty_parent_fd = None - eof_seen_on_pty = False - - def tracking_openpty(): - nonlocal pty_parent_fd - parent, child = original_openpty() - pty_parent_fd = parent - return parent, child - - drain_data_returned = False - - def os_read_with_drain_data(fd, size): - nonlocal eof_seen_on_pty, drain_data_returned - if fd != pty_parent_fd: - return original_os_read(fd, size) - if not eof_seen_on_pty: - try: - data = original_os_read(fd, size) - except (BlockingIOError, OSError): - if not eof_seen_on_pty: - eof_seen_on_pty = True - raise - if not data: - eof_seen_on_pty = True - return b"" - return data - if not drain_data_returned: - drain_data_returned = True - return b"DRAIN_CAPTURED\n" - return b"" - - with ( - patch("pty.openpty", side_effect=tracking_openpty), - patch("os.read", side_effect=os_read_with_drain_data), - patch("jumpstarter.exporter.hooks.logger") as mock_logger, - ): - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - assert pty_parent_fd is not None - assert eof_seen_on_pty - info_calls = [str(call) for call in mock_logger.info.call_args_list] - assert any("DRAIN_CAPTURED" in call for call in info_calls) - - @macos_pty_xfail - async def test_drain_select_oserror_exits_gracefully(self, lease_scope) -> None: - """Verify the drain loop exits gracefully when select.select() raises - OSError (e.g. fd closed during drain). - - Patches select.select inside the drain to raise OSError, simulating a - closed or invalid fd. The hook should still complete successfully. - """ - import select as select_mod - - original_select = select_mod.select - state = _PtyTracker() - - def select_with_oserror(rlist, wlist, xlist, timeout=None): - if state.eof_seen and rlist and rlist[0] == state.parent_fd: - raise OSError("simulated fd closed during drain") - return original_select(rlist, wlist, xlist, timeout) - - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo SELECT_ERROR_TEST", timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - with ( - patch("pty.openpty", side_effect=state.tracking_openpty), - patch("os.read", side_effect=state.os_read_with_drain_data), - patch("jumpstarter.exporter.hooks.select.select", side_effect=select_with_oserror), - patch("jumpstarter.exporter.hooks.logger") as mock_logger, - ): - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - assert state.eof_seen - info_calls = [str(call) for call in mock_logger.info.call_args_list] - assert any("SELECT_ERROR_TEST" in call for call in info_calls) - - @macos_pty_xfail - async def test_drain_select_valueerror_exits_gracefully(self, lease_scope) -> None: - """Verify the drain loop exits gracefully when select.select() raises - ValueError (e.g. negative fd). - - This covers the except (ValueError, OSError) handler in the drain loop. - """ - import select as select_mod - - original_select = select_mod.select - state = _PtyTracker(return_drain_data=False) - - def select_with_valueerror(rlist, wlist, xlist, timeout=None): - if state.eof_seen and rlist and rlist[0] == state.parent_fd: - raise ValueError("file descriptor cannot be a negative integer (-1)") - return original_select(rlist, wlist, xlist, timeout) - - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo VALUEERROR_TEST", timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - with ( - patch("pty.openpty", side_effect=state.tracking_openpty), - patch("os.read", side_effect=state.os_read_with_drain_data), - patch("jumpstarter.exporter.hooks.select.select", side_effect=select_with_valueerror), - patch("jumpstarter.exporter.hooks.logger") as mock_logger, - ): - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - info_calls = [str(call) for call in mock_logger.info.call_args_list] - assert any("VALUEERROR_TEST" in call for call in info_calls) - - @macos_pty_xfail - async def test_drain_exits_when_deadline_exceeded_before_select(self, lease_scope) -> None: - """Verify the drain loop exits when the deadline is exceeded between the - while condition and the remaining-time check (line: if remaining <= 0). - - Patches ``jumpstarter.exporter.hooks._monotonic`` (not ``time.monotonic`` - globally) to simulate a jump past the deadline after the while condition - passes but before the remaining check. Using the module-level - ``_monotonic`` reference avoids breaking the asyncio event loop, which - also relies on ``time.monotonic``. - """ - state = _PtyTracker() - clock = _DrainDeadlineClock(_monotonic, state) - - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo DEADLINE_TEST", timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - with ( - patch("pty.openpty", side_effect=state.tracking_openpty), - patch("os.read", side_effect=state.os_read_with_drain_data), - patch("jumpstarter.exporter.hooks._monotonic", side_effect=clock), - patch("jumpstarter.exporter.hooks.logger") as mock_logger, - ): - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - info_calls = [str(call) for call in mock_logger.info.call_args_list] - assert any("DEADLINE_TEST" in call for call in info_calls) - # SHOULD_NOT_APPEAR should not be in output because the drain - # exited early due to remaining <= 0 before select could run - assert not any("SHOULD_NOT_APPEAR" in call for call in info_calls) - - @macos_pty_xfail - async def test_drain_exception_is_suppressed(self, lease_scope) -> None: - """Verify that an unexpected exception raised during the drain is caught - by the except-Exception handler and does not propagate to the caller. - - Patches _flush_lines so that the second call (inside the drain) raises - a RuntimeError. The hook should still complete successfully because the - drain's except-Exception block suppresses it. - """ - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo BEFORE_DRAIN_ERROR", - timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - original_flush = _flush_lines - call_count = 0 - - def flush_lines_with_drain_error(buffer, output_lines): - nonlocal call_count - call_count += 1 - result = original_flush(buffer, output_lines) - if call_count > 1: - raise RuntimeError("simulated drain error") - return result - - with ( - patch("jumpstarter.exporter.hooks._flush_lines", side_effect=flush_lines_with_drain_error), - patch("jumpstarter.exporter.hooks.logger"), - ): - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - - @macos_pty_xfail - async def test_drain_retries_empty_select_then_captures_data(self, lease_scope) -> None: - """Verify that the drain retries after empty select() calls and still - captures data that arrives later. - - Patches select.select to return empty for the first N calls (where - N < DRAIN_MAX_EMPTY_POLLS), then reports the fd as readable. The - hook output should still be captured despite the initial empty polls. - """ - import select as select_mod - - original_select = select_mod.select - state = _PtyTracker() - empty_count = 0 - empties_before_data = DRAIN_MAX_EMPTY_POLLS - 2 # e.g. 8 empties then data - - def select_with_delayed_ready(rlist, wlist, xlist, timeout=None): - nonlocal empty_count - if state.eof_seen and rlist and rlist[0] == state.parent_fd: - empty_count += 1 - if empty_count <= empties_before_data: - return ([], [], []) # simulate delayed data - return original_select(rlist, wlist, xlist, timeout) - - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo DELAYED_DRAIN_OK", timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - with ( - patch("pty.openpty", side_effect=state.tracking_openpty), - patch("os.read", side_effect=state.os_read_with_drain_data), - patch("jumpstarter.exporter.hooks.select.select", side_effect=select_with_delayed_ready), - patch("jumpstarter.exporter.hooks.logger") as mock_logger, - ): - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - info_calls = [str(call) for call in mock_logger.info.call_args_list] - assert any("DELAYED_DRAIN_OK" in call for call in info_calls) - - @macos_pty_xfail - async def test_drain_terminates_after_max_empty_polls(self, lease_scope) -> None: - """Verify the drain loop terminates after DRAIN_MAX_EMPTY_POLLS - consecutive empty select() results. - - Patches select.select to always return empty during the drain phase. - The hook should still complete (no hang) and the drain data should - not appear since it's never read. - """ - import select as select_mod - - original_select = select_mod.select - state = _PtyTracker(return_drain_data=False) - - def select_always_empty(rlist, wlist, xlist, timeout=None): - if state.eof_seen and rlist and rlist[0] == state.parent_fd: - return ([], [], []) # always empty - return original_select(rlist, wlist, xlist, timeout) - - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo MAX_EMPTY_TEST", timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - with ( - patch("pty.openpty", side_effect=state.tracking_openpty), - patch("os.read", side_effect=state.os_read_with_drain_data), - patch("jumpstarter.exporter.hooks.select.select", side_effect=select_always_empty), - patch("jumpstarter.exporter.hooks.logger") as mock_logger, - ): - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - # Main loop should have captured the output before drain - info_calls = [str(call) for call in mock_logger.info.call_args_list] - assert any("MAX_EMPTY_TEST" in call for call in info_calls) - - @macos_pty_xfail - async def test_drain_empty_counter_resets_on_data(self, lease_scope) -> None: - """Verify the consecutive empty poll counter resets when data arrives. - - Simulates an empty-data-empty pattern during drain: a few empty polls, - then data becomes readable, then more empty polls. The counter should - reset after data is read, so the drain should tolerate more than - DRAIN_MAX_EMPTY_POLLS total empties as long as they are not consecutive. - """ - import select as select_mod - - original_select = select_mod.select - state = _PtyTracker() - drain_select_call = 0 - # Pattern: 5 empties, then ready, then 5 more empties, then ready - # Total empties (10) >= DRAIN_MAX_EMPTY_POLLS but never consecutive - pattern = [False] * 5 + [True] + [False] * 5 + [True] - - def select_with_interleaved_empties(rlist, wlist, xlist, timeout=None): - nonlocal drain_select_call - if state.eof_seen and rlist and rlist[0] == state.parent_fd: - idx = drain_select_call - drain_select_call += 1 - if idx < len(pattern) and not pattern[idx]: - return ([], [], []) - return original_select(rlist, wlist, xlist, timeout) - - hook_config = HookConfigV1Alpha1( - before_lease=HookInstanceConfigV1Alpha1( - script="echo INTERLEAVE_TEST", timeout=10, - ), - ) - executor = HookExecutor(config=hook_config) - - with ( - patch("pty.openpty", side_effect=state.tracking_openpty), - patch("os.read", side_effect=state.os_read_with_drain_data), - patch("jumpstarter.exporter.hooks.select.select", side_effect=select_with_interleaved_empties), - patch("jumpstarter.exporter.hooks.logger") as mock_logger, - ): - result = await executor.execute_before_lease_hook(lease_scope) - assert result is None - info_calls = [str(call) for call in mock_logger.info.call_args_list] - assert any("INTERLEAVE_TEST" in call for call in info_calls) - - async def test_drain_constants_are_reasonable(self) -> None: - assert MAX_DRAIN_BYTES == 256 * 1024 - assert DRAIN_TIMEOUT_SECONDS == 2.0 - assert DRAIN_MAX_EMPTY_POLLS == 10 - async def test_exec_default_is_none(self) -> None: """Test that the default exec is None (auto-detect).""" hook = HookInstanceConfigV1Alpha1(script="echo hello") @@ -1093,7 +510,7 @@ async def test_exec_default_is_none(self) -> None: class TestHookExecutorPRRegressions: """Regression tests for issues reported during PR review of hooks feature.""" - @macos_pty_xfail + async def test_infrastructure_messages_at_debug_not_info(self, lease_scope) -> None: """Issue A1: Hook infrastructure messages should be at DEBUG, not INFO. @@ -1117,7 +534,6 @@ async def test_infrastructure_messages_at_debug_not_info(self, lease_scope) -> N # Infrastructure messages should be at DEBUG level infra_messages = [ "Starting hook subprocess", - "Creating PTY", "Spawning subprocess", "Subprocess spawned", "Hook executed successfully", @@ -1629,3 +1045,145 @@ async def mock_report_status(status, msg): assert len(hook_started_calls) == 1, ( f"BEFORE_LEASE_HOOK must be reported (hook must run) even when lease has ended, got: {status_calls}" ) + + +class TestPipeOutputEdgeCases: + """Edge cases for pipe-based output capture (PR #837).""" + + async def test_stderr_captured_via_pipe_merge(self, lease_scope) -> None: + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1( + script="echo STDOUT_LINE; echo STDERR_LINE >&2", + timeout=10, + ), + ) + executor = HookExecutor(config=hook_config) + + with patch("jumpstarter.exporter.hooks.logger") as mock_logger: + result = await executor.execute_before_lease_hook(lease_scope) + + assert result is None + info_calls = [str(call) for call in mock_logger.info.call_args_list] + assert any("STDOUT_LINE" in call for call in info_calls) + assert any("STDERR_LINE" in call for call in info_calls) + + async def test_large_output_spanning_multiple_reads(self, lease_scope) -> None: + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1( + script=( + "seq 1 200 | while read n; do " + "echo \"LINE_${n}_PADDING_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\"; " + "done" + ), + timeout=10, + ), + ) + executor = HookExecutor(config=hook_config) + + with patch("jumpstarter.exporter.hooks.logger") as mock_logger: + result = await executor.execute_before_lease_hook(lease_scope) + + assert result is None + info_calls = [str(call) for call in mock_logger.info.call_args_list] + assert any("LINE_1_" in call for call in info_calls) + assert any("LINE_100_" in call for call in info_calls) + assert any("LINE_200_" in call for call in info_calls) + + async def test_non_utf8_output_decoded_with_replacement(self, lease_scope) -> None: + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1( + exec_="python3", + script=( + "import sys; " + "sys.stdout.buffer.write(b'VALID_PREFIX\\x80VALID_SUFFIX\\n')" + ), + timeout=10, + ), + ) + executor = HookExecutor(config=hook_config) + + with patch("jumpstarter.exporter.hooks.logger") as mock_logger: + result = await executor.execute_before_lease_hook(lease_scope) + + assert result is None + info_calls = [str(call) for call in mock_logger.info.call_args_list] + matching = [ + call for call in info_calls + if "VALID_PREFIX" in call and "VALID_SUFFIX" in call + ] + assert len(matching) > 0 + + async def test_rapid_exit_with_buffered_output(self, lease_scope) -> None: + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1( + script="echo FAST_1; echo FAST_2; echo FAST_3; echo FAST_4; echo FAST_5", + timeout=10, + ), + ) + executor = HookExecutor(config=hook_config) + + with patch("jumpstarter.exporter.hooks.logger") as mock_logger: + result = await executor.execute_before_lease_hook(lease_scope) + + assert result is None + info_calls = [str(call) for call in mock_logger.info.call_args_list] + for i in range(1, 6): + assert any(f"FAST_{i}" in call for call in info_calls), ( + f"FAST_{i} was not captured" + ) + + async def test_spawn_failure_cleans_up_without_crash(self, lease_scope) -> None: + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1( + exec_="/nonexistent/interpreter", + script="echo should not run", + timeout=10, + on_failure="warn", + ), + ) + executor = HookExecutor(config=hook_config) + + result = await executor.execute_before_lease_hook(lease_scope) + assert result is not None + assert "error" in result.lower() + + async def test_interleaved_stdout_and_stderr_captured(self, lease_scope) -> None: + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1( + script=( + "echo OUT_1; echo ERR_1 >&2; " + "echo OUT_2; echo ERR_2 >&2; " + "echo OUT_3" + ), + timeout=10, + ), + ) + executor = HookExecutor(config=hook_config) + + with patch("jumpstarter.exporter.hooks.logger") as mock_logger: + result = await executor.execute_before_lease_hook(lease_scope) + + assert result is None + info_calls = [str(call) for call in mock_logger.info.call_args_list] + for label in ("OUT_1", "OUT_2", "OUT_3", "ERR_1", "ERR_2"): + assert any(label in call for call in info_calls), ( + f"{label} was not captured" + ) + + async def test_timeout_with_grandchild_holding_pipe(self, lease_scope) -> None: + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1( + script="echo GRANDCHILD_TEST; sleep 10 &", + timeout=2, + on_failure="warn", + ), + ) + executor = HookExecutor(config=hook_config) + + with patch("jumpstarter.exporter.hooks.logger") as mock_logger: + result = await executor.execute_before_lease_hook(lease_scope) + + assert result is not None + assert "timed out" in result.lower() + info_calls = [str(call) for call in mock_logger.info.call_args_list] + assert any("GRANDCHILD_TEST" in call for call in info_calls)