From 50453a350888cfaf6d52201cc9ee01fb34723768 Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Fri, 22 May 2026 01:54:58 +0200 Subject: [PATCH 1/6] fix(acp): override astep to keep post-prompt callbacks on caller thread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ACPAgent inherited the default AgentBase.astep, which routes the synchronous step() through loop.run_in_executor(None, self.step, ...). That moves every post-prompt callback (including telemetry's stats_callback, which re-enters state.lock) onto a worker thread — while LocalConversation.arun holds the conversation state's reentrant FIFOLock on the loop thread. The callback then blocks on a lock owned by the thread that is itself awaiting astep to return, deadlocking the conversation. See #3348 for the original symptom and #3350 for the architectural diagnosis. The fix overrides astep natively: the ACP conn.prompt(...) round-trip is scheduled on the portal loop via BlockingPortal.start_task_soon, and the result is awaited back on the caller's loop via asyncio.wrap_future. All post-prompt work — _record_usage, _finalize_successful_turn, on_event emission, status flips — runs on the caller thread, where the FIFOLock owner already lives, so re-entrant acquires succeed. Both step() and astep() now share extracted helpers (_do_acp_prompt, _finalize_successful_turn, _emit_turn_timeout, _emit_turn_error, _clear_turn_callbacks). Adds three regression tests in TestACPAgentAstep: - astep overrides AgentBase.astep (structural guarantee) - post-prompt on_event callbacks fire on caller thread, not portal - astep completes when invoked while caller holds state.lock Fixes #3348 Closes #3350 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/sdk/agent/acp_agent.py | 506 ++++++++++++------ tests/sdk/agent/test_acp_agent.py | 170 ++++++ 2 files changed, 517 insertions(+), 159 deletions(-) diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index 744c44fb34..d9bceb516d 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -1266,6 +1266,213 @@ def _build_acp_prompt( return None return blocks + # ------------------------------------------------------------------ + # Shared step / astep implementation + # ------------------------------------------------------------------ + # + # ``step`` (sync) and ``astep`` (async) both drive the same logical + # turn: send the latest user message to the ACP subprocess, await its + # response, emit FinishAction + ObservationEvent. The only thing + # they differ on is HOW they await ``self._conn.prompt(...)`` — the + # connection is bound to the AnyIO BlockingPortal's loop (set up in + # ``_start_acp_server``), which lives on its own thread. + # + # * ``step`` runs entirely on a single caller thread. It uses + # ``self._executor.run_async(...)`` to block the caller + # while the prompt coroutine runs on the portal loop. + # * ``astep`` runs on the caller's asyncio event loop (e.g. + # ``LocalConversation.arun``'s loop, which is also the + # FIFOLock owner). It uses + # ``portal.start_task_soon(...) + asyncio.wrap_future`` + # to schedule the prompt on the portal loop AND await + # its result back on the caller's loop — without leaving + # the caller's thread. All post-prompt work + # (``_record_usage`` → ``stats_callback``, event + # emission, status mutations) therefore happens on the + # caller's thread, where the conversation state lock is + # already held re-entrantly. This is what avoids the + # cross-thread state-lock deadlock described in #3348. + # + # The default ``AgentBase.astep`` would otherwise wrap ``step`` in + # ``loop.run_in_executor(None, self.step, ...)``, scheduling step on + # an unrelated executor thread while ``arun`` holds the state lock on + # the loop thread — every state-lock acquire inside step's call chain + # would then deadlock. + + async def _do_acp_prompt(self, prompt_blocks: list[Any]) -> PromptResponse: + """Single ACP ``conn.prompt`` call + optional UsageUpdate sync. + + Always runs on the portal's loop (where ``self._conn`` lives). + Does not implement retry — callers wrap this in their own retry + loop so the loop can use ``time.sleep`` (sync path) or + ``asyncio.sleep`` (async path) appropriately. + """ + usage_sync = self._client.prepare_usage_sync(self._session_id or "") + response = await self._conn.prompt(prompt_blocks, self._session_id) + if self._client.get_turn_usage_update(self._session_id or "") is None: + try: + await asyncio.wait_for(usage_sync.wait(), timeout=_USAGE_UPDATE_TIMEOUT) + except TimeoutError: + logger.warning( + "UsageUpdate not received within %.1fs for session %s", + _USAGE_UPDATE_TIMEOUT, + self._session_id, + ) + return response + + def _finalize_successful_turn( + self, + response: PromptResponse | None, + elapsed: float, + state: ConversationState, + on_event: ConversationCallbackType, + ) -> None: + """Common post-prompt bookkeeping. Sync — runs on caller thread.""" + session_id = self._session_id or "" + usage_update = self._client.pop_turn_usage_update(session_id) + self._record_usage( + response, + session_id, + elapsed=elapsed, + usage_update=usage_update, + ) + + # ACPToolCallEvents were already emitted live from + # _OpenHandsACPBridge.session_update as each ToolCallStart / + # ToolCallProgress notification arrived — no end-of-turn fan-out + # here. FinishAction closes out the turn below. + + response_text = "".join(self._client.accumulated_text) + thought_text = "".join(self._client.accumulated_thoughts) + if not response_text: + response_text = "(No response from ACP server)" + + # ACP step() boundaries are full remote assistant turns, not + # partial planning steps. Emit FinishAction to delimit that + # completed turn for eval/remote consumers, matching #2190. + finish_action = FinishAction(message=response_text) + tc_id = str(uuid.uuid4()) + action_event = ActionEvent( + source="agent", + thought=[], + reasoning_content=thought_text or None, + action=finish_action, + tool_name="finish", + tool_call_id=tc_id, + tool_call=MessageToolCall( + id=tc_id, + name="finish", + arguments=json.dumps({"message": response_text}), + origin="completion", + ), + llm_response_id=str(uuid.uuid4()), + ) + on_event(action_event) + on_event( + ObservationEvent( + observation=FinishObservation.from_text(text=response_text), + action_id=action_event.id, + tool_name="finish", + tool_call_id=tc_id, + ) + ) + state.execution_status = ConversationExecutionStatus.FINISHED + + def _emit_turn_timeout( + self, + elapsed: float, + state: ConversationState, + on_event: ConversationCallbackType, + ) -> None: + """Emit the error message + status flip when a turn times out.""" + logger.error( + "ACP prompt timed out after %.1fs (limit=%.0fs). " + "The ACP server may have completed its work but failed to " + "send the JSON-RPC response. Accumulated %d text chunks, " + "%d tool calls.", + elapsed, + self.acp_prompt_timeout, + len(self._client.accumulated_text), + len(self._client.accumulated_tool_calls), + ) + error_message = Message( + role="assistant", + content=[ + TextContent( + text=( + f"ACP prompt timed out after {elapsed:.0f}s. " + "The agent may have completed its work but " + "the response was not received." + ) + ) + ], + ) + # Close any tool cards left in flight from the timed-out attempt. + self._cancel_inflight_tool_calls() + on_event(MessageEvent(source="agent", llm_message=error_message)) + state.execution_status = ConversationExecutionStatus.ERROR + + def _emit_turn_error( + self, + exc: BaseException, + state: ConversationState, + on_event: ConversationCallbackType, + ) -> None: + """Emit the error event pair when a turn fails (non-timeout).""" + logger.error("ACP prompt failed: %s", exc, exc_info=True) + error_str = str(exc) + + # Close any tool cards left in flight before surfacing the error. + self._cancel_inflight_tool_calls() + + # Emit error as an agent message (existing behavior, preserved for + # consumers that inspect MessageEvents). + error_message = Message( + role="assistant", + content=[TextContent(text=f"ACP error: {exc}")], + ) + on_event(MessageEvent(source="agent", llm_message=error_message)) + + # Emit typed ConversationErrorEvent so RemoteConversation can + # report the actual error detail via _get_last_error_detail() + # instead of falling back to "Remote conversation ended with error". + is_aup = ( + "usage policy" in error_str.lower() or "content policy" in error_str.lower() + ) + on_event( + ConversationErrorEvent( + source="agent", + code="UsagePolicyRefusal" if is_aup else "ACPPromptError", + detail=error_str[:500], + ) + ) + state.execution_status = ConversationExecutionStatus.ERROR + + def _clear_turn_callbacks(self) -> None: + """Unwire per-turn callbacks so trailing session_update is a no-op. + + Called from both ``step`` and ``astep`` ``finally`` blocks. If the + ACP subprocess later dispatches a trailing ``session_update`` (e.g. + between turns), it fires on the portal thread with no FIFOLock held + by anyone — firing a stale ``on_event`` there would race with + other threads mutating ``state.events``. Clearing the callbacks + turns any such late update into a no-op emit. + """ + self._client.on_event = None + self._client.on_token = None + self._client.on_activity = None + + def _build_prompt_blocks_for_latest_user_message( + self, state: ConversationState + ) -> list[Any] | None: + """Walk events in reverse for the latest user MessageEvent.""" + for event in reversed(list(state.events)): + if isinstance(event, MessageEvent) and event.source == "user": + blocks = self._build_acp_prompt(event) + if blocks: + return blocks + return None + @observe(name="acp_agent.step", ignore_inputs=["conversation", "on_event"]) def step( self, @@ -1273,19 +1480,16 @@ def step( on_event: ConversationCallbackType, on_token: ConversationTokenCallbackType | None = None, ) -> None: - """Send the latest user message to the ACP server and emit the response.""" - state = conversation.state + """Send the latest user message to the ACP server and emit the response. - # Find the latest user message. Conversation implementations already - # attach per-turn AgentContext extensions to MessageEvent.extended_content; - # MessageEvent.to_llm_message() merges those extensions with the user text. - prompt_blocks = None - for event in reversed(list(state.events)): - if isinstance(event, MessageEvent) and event.source == "user": - prompt_blocks = self._build_acp_prompt(event) - if prompt_blocks: - break + Sync entry point — used by callers that don't already have an + asyncio loop running (CLI, eval harness, ``LocalConversation.run``). + ``LocalConversation.arun`` goes through :meth:`astep` instead, which + avoids the cross-thread state-lock deadlock described in #3348. + """ + state = conversation.state + prompt_blocks = self._build_prompt_blocks_for_latest_user_message(state) if prompt_blocks is None: logger.warning("No user message found; finishing conversation") state.execution_status = ConversationExecutionStatus.FINISHED @@ -1295,38 +1499,17 @@ def step( t0 = time.monotonic() try: - - async def _prompt() -> PromptResponse: - usage_sync = self._client.prepare_usage_sync(self._session_id or "") - response = await self._conn.prompt( - prompt_blocks, - self._session_id, - ) - if self._client.get_turn_usage_update(self._session_id or "") is None: - try: - await asyncio.wait_for( - usage_sync.wait(), timeout=_USAGE_UPDATE_TIMEOUT - ) - except TimeoutError: - logger.warning( - "UsageUpdate not received within %.1fs for session %s", - _USAGE_UPDATE_TIMEOUT, - self._session_id, - ) - return response - - # Send prompt to ACP server with retry logic for connection errors. - # Transient connection failures (network blips, server restarts) are - # retried to preserve session state and avoid losing progress. logger.info( "Sending ACP prompt (timeout=%.0fs, blocks=%d)", self.acp_prompt_timeout, len(prompt_blocks), ) - response: PromptResponse | None = None max_retries = _ACP_PROMPT_MAX_RETRIES + async def _prompt() -> PromptResponse: + return await self._do_acp_prompt(prompt_blocks) + for attempt in range(max_retries + 1): try: response = self._executor.run_async( @@ -1341,8 +1524,8 @@ async def _prompt() -> PromptResponse: min(attempt, len(_ACP_PROMPT_RETRY_DELAYS) - 1) ] logger.warning( - "ACP prompt failed with retriable error (attempt %d/%d), " - "retrying in %.0fs: %s", + "ACP prompt failed with retriable error " + "(attempt %d/%d), retrying in %.0fs: %s", attempt + 1, max_retries + 1, delay, @@ -1355,8 +1538,8 @@ async def _prompt() -> PromptResponse: raise except ACPRequestError as e: # Retry transient server errors (e.g. "Internal Server - # Error" from Gemini). These are JSON-RPC -32603 errors - # that indicate a server-side failure, not a client bug. + # Error" from Gemini). JSON-RPC -32603 = server-side + # failure, not a client bug. if ( e.code in _RETRIABLE_SERVER_ERROR_CODES and attempt < max_retries @@ -1365,8 +1548,8 @@ async def _prompt() -> PromptResponse: min(attempt, len(_ACP_PROMPT_RETRY_DELAYS) - 1) ] logger.warning( - "ACP prompt failed with server error (attempt %d/%d), " - "retrying in %.0fs: [%d] %s", + "ACP prompt failed with server error " + "(attempt %d/%d), retrying in %.0fs: [%d] %s", attempt + 1, max_retries + 1, delay, @@ -1381,135 +1564,140 @@ async def _prompt() -> PromptResponse: elapsed = time.monotonic() - t0 logger.info("ACP prompt returned in %.1fs", elapsed) + self._finalize_successful_turn(response, elapsed, state, on_event) + except TimeoutError: + self._emit_turn_timeout(time.monotonic() - t0, state, on_event) + except Exception as e: + self._emit_turn_error(e, state, on_event) + # Re-raise so LocalConversation.run()'s outer except handler + # breaks the loop, emits ConversationErrorEvent, and raises + # ConversationRunError — matching how the regular Agent works. + raise + finally: + self._clear_turn_callbacks() - session_id = self._session_id or "" - usage_update = self._client.pop_turn_usage_update(session_id) - self._record_usage( - response, - session_id, - elapsed=elapsed, - usage_update=usage_update, - ) - - # ACPToolCallEvents were already emitted live from - # _OpenHandsACPBridge.session_update as each ToolCallStart / - # ToolCallProgress notification arrived — no end-of-turn fan-out - # here. FinishAction closes out the turn below. + @observe(name="acp_agent.astep", ignore_inputs=["conversation", "on_event"]) + async def astep( + self, + conversation: LocalConversation, + on_event: ConversationCallbackType, + on_token: ConversationTokenCallbackType | None = None, + ) -> None: + """Native-async variant of :meth:`step`. + + Schedules the ACP ``conn.prompt(...)`` round-trip on the portal + loop (where the connection lives) via + ``BlockingPortal.start_task_soon`` and awaits the result back on + the caller's loop via ``asyncio.wrap_future``. Post-prompt work + (``_record_usage`` → telemetry callbacks → ``on_event`` calls → + state mutations) runs entirely on the caller's thread. + + Why this matters: ``LocalConversation.arun`` holds the + conversation state's ``FIFOLock`` on its own loop thread across + ``await self.agent.astep(...)``. The default + ``AgentBase.astep`` would wrap sync ``step`` in + ``loop.run_in_executor(None, self.step, ...)``, moving every + post-prompt callback (notably ``stats_callback``) to an + executor worker thread — a different thread from the lock + owner. Any ``with state:`` inside those callbacks then blocks + on a lock owned by a thread that is itself ``await``-ing + ``astep`` to return. See #3348 / #3350 for the full diagnosis. + Keeping post-prompt work on the caller's thread sidesteps the + whole class of cross-thread state-lock deadlocks. + """ + state = conversation.state - # Build response message - response_text = "".join(self._client.accumulated_text) - thought_text = "".join(self._client.accumulated_thoughts) + prompt_blocks = self._build_prompt_blocks_for_latest_user_message(state) + if prompt_blocks is None: + logger.warning("No user message found; finishing conversation") + state.execution_status = ConversationExecutionStatus.FINISHED + return - if not response_text: - response_text = "(No response from ACP server)" + self._reset_client_for_turn(on_token, on_event) - # ACP step() boundaries are full remote assistant turns, not - # partial planning steps. Emit FinishAction to delimit that - # completed turn for eval/remote consumers, matching #2190. - finish_action = FinishAction(message=response_text) - tc_id = str(uuid.uuid4()) - action_event = ActionEvent( - source="agent", - thought=[], - reasoning_content=thought_text or None, - action=finish_action, - tool_name="finish", - tool_call_id=tc_id, - tool_call=MessageToolCall( - id=tc_id, - name="finish", - arguments=json.dumps({"message": response_text}), - origin="completion", - ), - llm_response_id=str(uuid.uuid4()), - ) - on_event(action_event) - on_event( - ObservationEvent( - observation=FinishObservation.from_text(text=response_text), - action_id=action_event.id, - tool_name="finish", - tool_call_id=tc_id, - ) + t0 = time.monotonic() + try: + logger.info( + "Sending ACP prompt (timeout=%.0fs, blocks=%d, async)", + self.acp_prompt_timeout, + len(prompt_blocks), ) + portal = self._executor._ensure_portal() - state.execution_status = ConversationExecutionStatus.FINISHED + response: PromptResponse | None = None + max_retries = _ACP_PROMPT_MAX_RETRIES + for attempt in range(max_retries + 1): + try: + # Schedule the ACP prompt on the portal loop (where the + # connection lives) and await the result on the caller + # loop. ``asyncio.wait_for`` enforces the per-attempt + # timeout symmetrically with the sync path's + # ``run_async(timeout=...)``. + future = portal.start_task_soon(self._do_acp_prompt, prompt_blocks) + response = await asyncio.wait_for( + asyncio.wrap_future(future), + timeout=self.acp_prompt_timeout, + ) + break + except TimeoutError: + # ``asyncio.TimeoutError`` aliases ``TimeoutError`` on + # Python 3.11+ — same except clause catches both. + # Surface to the outer handler below so step and astep + # share error-event semantics. + raise TimeoutError( + f"ACP prompt timed out after {self.acp_prompt_timeout:.0f}s" + ) + except _RETRIABLE_CONNECTION_ERRORS as e: + if attempt < max_retries: + delay = _ACP_PROMPT_RETRY_DELAYS[ + min(attempt, len(_ACP_PROMPT_RETRY_DELAYS) - 1) + ] + logger.warning( + "ACP prompt failed with retriable error " + "(attempt %d/%d), retrying in %.0fs: %s", + attempt + 1, + max_retries + 1, + delay, + e, + ) + await asyncio.sleep(delay) + self._cancel_inflight_tool_calls() + self._reset_client_for_turn(on_token, on_event) + else: + raise + except ACPRequestError as e: + if ( + e.code in _RETRIABLE_SERVER_ERROR_CODES + and attempt < max_retries + ): + delay = _ACP_PROMPT_RETRY_DELAYS[ + min(attempt, len(_ACP_PROMPT_RETRY_DELAYS) - 1) + ] + logger.warning( + "ACP prompt failed with server error " + "(attempt %d/%d), retrying in %.0fs: [%d] %s", + attempt + 1, + max_retries + 1, + delay, + e.code, + e, + ) + await asyncio.sleep(delay) + self._cancel_inflight_tool_calls() + self._reset_client_for_turn(on_token, on_event) + else: + raise - except TimeoutError: elapsed = time.monotonic() - t0 - logger.error( - "ACP prompt timed out after %.1fs (limit=%.0fs). " - "The ACP server may have completed its work but failed to " - "send the JSON-RPC response. Accumulated %d text chunks, " - "%d tool calls.", - elapsed, - self.acp_prompt_timeout, - len(self._client.accumulated_text), - len(self._client.accumulated_tool_calls), - ) - error_message = Message( - role="assistant", - content=[ - TextContent( - text=( - f"ACP prompt timed out after {elapsed:.0f}s. " - "The agent may have completed its work but " - "the response was not received." - ) - ) - ], - ) - # Close any tool cards left in flight from the timed-out attempt. - self._cancel_inflight_tool_calls() - on_event(MessageEvent(source="agent", llm_message=error_message)) - state.execution_status = ConversationExecutionStatus.ERROR + logger.info("ACP prompt returned in %.1fs (async)", elapsed) + self._finalize_successful_turn(response, elapsed, state, on_event) + except TimeoutError: + self._emit_turn_timeout(time.monotonic() - t0, state, on_event) except Exception as e: - logger.error("ACP prompt failed: %s", e, exc_info=True) - error_str = str(e) - - # Close any tool cards left in flight before surfacing the error. - self._cancel_inflight_tool_calls() - - # Emit error as an agent message (existing behavior, preserved for - # consumers that inspect MessageEvents) - error_message = Message( - role="assistant", - content=[TextContent(text=f"ACP error: {e}")], - ) - on_event(MessageEvent(source="agent", llm_message=error_message)) - - # Emit typed ConversationErrorEvent so RemoteConversation can - # report the actual error detail via _get_last_error_detail() - # instead of falling back to "Remote conversation ended with error" - is_aup = ( - "usage policy" in error_str.lower() - or "content policy" in error_str.lower() - ) - on_event( - ConversationErrorEvent( - source="agent", - code="UsagePolicyRefusal" if is_aup else "ACPPromptError", - detail=error_str[:500], - ) - ) - - state.execution_status = ConversationExecutionStatus.ERROR - - # Re-raise so LocalConversation.run()'s outer except handler - # breaks the loop, emits ConversationErrorEvent, and raises - # ConversationRunError — matching how the regular Agent works + self._emit_turn_error(e, state, on_event) raise finally: - # Unwire the per-turn callbacks now that this step has finished - # emitting everything it's going to emit. If the ACP subprocess - # later dispatches a trailing ``session_update`` (e.g. between - # turns), it fires on the portal thread with no FIFOLock held - # by anyone — firing a stale ``on_event`` there would race - # with other threads mutating ``state.events``. Clearing the - # callbacks turns any such late update into a no-op emit. - self._client.on_event = None - self._client.on_token = None - self._client.on_activity = None + self._clear_turn_callbacks() def ask_agent(self, question: str) -> str | None: """Fork the ACP session, prompt the fork, and return the response.""" diff --git a/tests/sdk/agent/test_acp_agent.py b/tests/sdk/agent/test_acp_agent.py index 4d19773e85..600178d397 100644 --- a/tests/sdk/agent/test_acp_agent.py +++ b/tests/sdk/agent/test_acp_agent.py @@ -4,6 +4,7 @@ import asyncio import json +import threading import uuid from typing import Any from unittest.mock import AsyncMock, MagicMock, patch @@ -1299,6 +1300,175 @@ def _fake_run_async(_coro, **_kwargs): assert mock_client.on_token is None +# --------------------------------------------------------------------------- +# Async step (astep) — regression coverage for #3348 / #3350 +# --------------------------------------------------------------------------- + + +class TestACPAgentAstep: + """Native ``ACPAgent.astep`` must not fall back to + ``AgentBase.astep`` (which runs ``step`` via + ``loop.run_in_executor``). Doing so would move post-prompt work + (e.g. ``stats_callback``'s ``with state:`` re-entry) onto an + executor worker thread, deadlocking against + ``LocalConversation.arun`` which holds the state's reentrant + ``FIFOLock`` on the loop thread. See OpenHands/software-agent-sdk + issues #3348 and #3350. + """ + + def _make_conversation_with_message(self, tmp_path, text="Hello"): + state = _make_state(tmp_path) + state.events.append( + SystemPromptEvent( + source="agent", + system_prompt=TextContent(text="ACP-managed agent"), + tools=[], + ) + ) + state.events.append( + MessageEvent( + source="user", + llm_message=Message(role="user", content=[TextContent(text=text)]), + ) + ) + conversation = MagicMock() + conversation.state = state + return conversation + + def test_astep_overrides_default_agentbase_implementation(self): + """The architectural fix: ACPAgent must define its own ``astep``. + + If this assertion ever flips back, the default + ``AgentBase.astep`` will route ACP traffic through + ``loop.run_in_executor(None, self.step, ...)`` again, which + deadlocks against ``LocalConversation.arun``. + """ + assert ACPAgent.astep is not AgentBase.astep + + def test_astep_runs_post_prompt_callbacks_on_caller_thread(self, tmp_path): + """Post-prompt ``on_event`` callbacks fire on the caller thread. + + This is the structural property that defeats the #3348 deadlock: + ``stats_callback`` (registered downstream of ``_record_usage``) + does ``with state:`` — re-entering the conversation state's + FIFOLock. FIFOLock is reentrant **on the same thread**. If + ``astep`` schedules ``step`` on a worker thread (the buggy + default), the callback runs on a different thread from the + lock owner and blocks forever. Asserting on threading.get_ident + here is sufficient — the deadlock is downstream of this + invariant. + """ + from openhands.sdk.utils.async_executor import AsyncExecutor + + agent = _make_agent() + conversation = self._make_conversation_with_message(tmp_path) + + caller_thread_id = threading.get_ident() + prompt_thread_id: list[int] = [] + on_event_thread_ids: list[int] = [] + + mock_client = _OpenHandsACPBridge() + mock_client.get_turn_usage_update = MagicMock(return_value=object()) + agent._client = mock_client + agent._conn = MagicMock() + + async def _fake_prompt(prompt_blocks, session_id): + # This must run on the portal loop's thread, not the caller's. + prompt_thread_id.append(threading.get_ident()) + mock_client.accumulated_text.append("answer") + return None + + agent._conn.prompt = _fake_prompt + agent._session_id = "test-session" + + executor = AsyncExecutor() + try: + agent._executor = executor + + def _capture_event(event): + on_event_thread_ids.append(threading.get_ident()) + + asyncio.run(agent.astep(conversation, on_event=_capture_event)) + finally: + executor.close() + + # Prompt ran on the portal's thread — proving we actually crossed + # the loop boundary and didn't just await on the caller loop. + assert len(prompt_thread_id) == 1 + assert prompt_thread_id[0] != caller_thread_id + + # All on_event callbacks (ActionEvent + ObservationEvent) ran + # back on the caller's thread — proving post-prompt work was + # NOT moved to a worker thread. + assert len(on_event_thread_ids) >= 2 + for tid in on_event_thread_ids: + assert tid == caller_thread_id, ( + f"on_event ran on thread {tid} instead of caller " + f"{caller_thread_id} — astep regressed to thread-pool path" + ) + + assert ( + conversation.state.execution_status == ConversationExecutionStatus.FINISHED + ) + + def test_astep_does_not_deadlock_under_reentrant_state_lock(self, tmp_path): + """End-to-end: ``arun``-shaped caller can ``await astep`` while + holding ``state.lock``. + + This mirrors what ``LocalConversation.arun`` does — it holds + ``state.lock`` on the asyncio loop thread across the await. + With the architectural fix in place, post-prompt callbacks + re-enter the lock on the same thread (FIFOLock is reentrant) + and astep returns promptly. Without the fix, callbacks run on + a worker thread, hit the lock owned by the loop thread, and + the await never returns. + """ + from openhands.sdk.utils.async_executor import AsyncExecutor + + agent = _make_agent() + conversation = self._make_conversation_with_message(tmp_path) + state = conversation.state + + mock_client = _OpenHandsACPBridge() + mock_client.get_turn_usage_update = MagicMock(return_value=object()) + agent._client = mock_client + agent._conn = MagicMock() + + async def _fake_prompt(prompt_blocks, session_id): + mock_client.accumulated_text.append("done") + return None + + agent._conn.prompt = _fake_prompt + agent._session_id = "test-session" + + executor = AsyncExecutor() + try: + agent._executor = executor + + # Reproduce stats_callback's pattern: on each event, take the + # state lock briefly. Same-thread reentrancy must succeed. + def _capture_event(event): + with state: + pass + + async def _arun_shaped() -> None: + # arun holds the state lock across astep — same shape as + # LocalConversation.arun. + with state: + await asyncio.wait_for( + agent.astep(conversation, on_event=_capture_event), + timeout=10.0, + ) + + asyncio.run(_arun_shaped()) + finally: + executor.close() + + assert ( + conversation.state.execution_status == ConversationExecutionStatus.FINISHED + ) + + # --------------------------------------------------------------------------- # Cleanup # --------------------------------------------------------------------------- From b98ea9dd80b38f51d220f82a05d54d1a6d067ba5 Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Fri, 22 May 2026 10:03:18 +0200 Subject: [PATCH 2/6] fix(acp): marshal bridge callbacks and handle cancellation in astep MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses two concurrency gaps the review bot flagged on PR #3358: 1. Mid-turn bridge callbacks (ACPToolCallEvents, streamed tokens) were still firing on the portal thread. ``_OpenHandsACPBridge.session_update`` invokes the registered ``on_event`` / ``on_token`` synchronously from the portal loop's thread. Without marshalling, those callbacks would re-acquire ``state.lock`` from the wrong thread while ``arun()`` owns it on the caller loop — same deadlock class as #3348, just mid-turn. Fix: ``_make_caller_loop_marshaller`` wraps each bridge callback so * same-thread invocations (from ``_cancel_inflight_tool_calls`` / ``_finalize_successful_turn`` called directly inside astep) run synchronously, preserving sync invariants; * cross-thread invocations (the portal-thread case) hop back to the caller loop via ``loop.call_soon_threadsafe``, preserving FIFO ordering of bridge events. 2. ``asyncio.CancelledError`` was bypassing both turn-error handlers and going straight to ``finally``, which only clears callbacks. Any in-flight ACPToolCallEvents we had already streamed would stay ``pending`` / ``in_progress`` forever — ``LocalConversation._emit_orphaned_action_errors`` only patches ``ActionEvent``s, not ``ACPToolCallEvent``s. Fix: catch ``asyncio.CancelledError`` before the generic ``except Exception``, run ``_cancel_inflight_tool_calls()`` to emit terminal ``failed`` events on the caller thread (via the marshaller's same-thread branch), then re-raise so ``arun()`` can transition to PAUSED. Adds two regression tests in ``TestACPAgentAstep``: - ``test_astep_marshals_bridge_callbacks_to_caller_thread`` — fires an ACPToolCallEvent + a token chunk from the portal-thread fake prompt; asserts both land on the caller thread. - ``test_astep_emits_failed_tool_calls_on_cancellation`` — cancels a task with an in-flight tool call seeded mid-prompt; asserts a terminal ``failed`` ACPToolCallEvent is emitted before CancelledError re-raises. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/sdk/agent/acp_agent.py | 97 +++++++++- tests/sdk/agent/test_acp_agent.py | 176 ++++++++++++++++++ 2 files changed, 269 insertions(+), 4 deletions(-) diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index d9bceb516d..3be5bdafff 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -22,7 +22,7 @@ import threading import time import uuid -from collections.abc import Generator +from collections.abc import Callable, Generator from pathlib import Path from typing import TYPE_CHECKING, Any, Literal @@ -1462,6 +1462,59 @@ def _clear_turn_callbacks(self) -> None: self._client.on_token = None self._client.on_activity = None + def _make_caller_loop_marshaller( + self, + loop: asyncio.AbstractEventLoop, + caller_thread_id: int, + callback: Callable[..., Any] | None, + ) -> Callable[..., None] | None: + """Wrap ``callback`` so portal-thread invocations hop back to ``loop``. + + ``_OpenHandsACPBridge.session_update`` runs on the portal thread (the + connection's loop), and emits ``ACPToolCallEvent`` / streamed tokens + synchronously by calling whatever is wired into the bridge. In the + ``astep`` path, ``arun()`` holds the conversation state's reentrant + ``FIFOLock`` on the caller loop's thread — invoking the agent-server's + ``on_event`` from the portal thread would re-acquire the lock from + the wrong thread and deadlock the conversation (same failure class as + #3348, just mid-turn instead of post-turn). + + The returned wrapper: + + * Runs synchronously if invoked from the caller thread — direct + paths like ``_cancel_inflight_tool_calls`` / ``_finalize_successful_turn`` + go straight through, preserving sync invariants. + * Marshals to the caller loop via ``call_soon_threadsafe`` when + invoked from any other thread (the portal thread is the + motivating case, but third-party threads would be handled too). + ``call_soon_threadsafe`` is FIFO per loop, so ordering of bridge + callbacks is preserved relative to each other. + + Returns ``None`` if ``callback`` is ``None`` so callers can wire + ``self._reset_client_for_turn(None, None)`` semantics through + unchanged. + """ + if callback is None: + return None + + def _invoke(*args: Any, **kwargs: Any) -> None: + if threading.get_ident() == caller_thread_id: + callback(*args, **kwargs) + return + try: + loop.call_soon_threadsafe(lambda: callback(*args, **kwargs)) + except RuntimeError: + # Caller loop is closed (e.g. astep already returned and the + # bridge fired a trailing notification). Dropping is the + # right behavior — _clear_turn_callbacks should have already + # unwired this, but guard the race. + logger.debug( + "caller loop closed; dropping marshalled ACP callback", + exc_info=True, + ) + + return _invoke + def _build_prompt_blocks_for_latest_user_message( self, state: ConversationState ) -> list[Any] | None: @@ -1613,7 +1666,25 @@ async def astep( state.execution_status = ConversationExecutionStatus.FINISHED return - self._reset_client_for_turn(on_token, on_event) + # Marshal bridge callbacks back onto the caller loop. Without this, + # ``_OpenHandsACPBridge.session_update`` would fire ``on_event`` / + # ``on_token`` from the portal thread mid-turn (e.g. for each + # ``ACPToolCallEvent`` or streamed token), which re-acquires + # ``state.lock`` from the wrong thread while ``arun()`` owns it on + # the caller loop's thread. Bridge events now hop back via + # ``call_soon_threadsafe``; same-thread invocations (e.g. from + # ``_cancel_inflight_tool_calls`` called directly inside astep) run + # synchronously to preserve sync invariants in retry/error paths. + loop = asyncio.get_running_loop() + caller_thread_id = threading.get_ident() + marshalled_on_event = self._make_caller_loop_marshaller( + loop, caller_thread_id, on_event + ) + marshalled_on_token = self._make_caller_loop_marshaller( + loop, caller_thread_id, on_token + ) + assert marshalled_on_event is not None # on_event is required + self._reset_client_for_turn(marshalled_on_token, marshalled_on_event) t0 = time.monotonic() try: @@ -1662,7 +1733,9 @@ async def astep( ) await asyncio.sleep(delay) self._cancel_inflight_tool_calls() - self._reset_client_for_turn(on_token, on_event) + self._reset_client_for_turn( + marshalled_on_token, marshalled_on_event + ) else: raise except ACPRequestError as e: @@ -1684,13 +1757,29 @@ async def astep( ) await asyncio.sleep(delay) self._cancel_inflight_tool_calls() - self._reset_client_for_turn(on_token, on_event) + self._reset_client_for_turn( + marshalled_on_token, marshalled_on_event + ) else: raise elapsed = time.monotonic() - t0 logger.info("ACP prompt returned in %.1fs (async)", elapsed) self._finalize_successful_turn(response, elapsed, state, on_event) + except asyncio.CancelledError: + # Interrupt landed while astep was awaiting the portal future + # (or sleeping between retries). Close out any + # ``pending`` / ``in_progress`` ACPToolCallEvents we've already + # streamed — without this, those tool cards stay live in the + # event stream because ``LocalConversation._emit_orphaned_action_errors`` + # only patches ``ActionEvent``s, not ``ACPToolCallEvent``s. + # We're on the caller thread here (asyncio's cancel raises on + # the awaiter's loop), and ``self._client.on_event`` is the + # marshaller — its same-thread branch will invoke ``on_event`` + # synchronously, so the failed events are emitted before we + # re-raise. Re-raising lets ``arun()`` transition to PAUSED. + self._cancel_inflight_tool_calls() + raise except TimeoutError: self._emit_turn_timeout(time.monotonic() - t0, state, on_event) except Exception as e: diff --git a/tests/sdk/agent/test_acp_agent.py b/tests/sdk/agent/test_acp_agent.py index 600178d397..0d8ed1b692 100644 --- a/tests/sdk/agent/test_acp_agent.py +++ b/tests/sdk/agent/test_acp_agent.py @@ -1468,6 +1468,182 @@ async def _arun_shaped() -> None: conversation.state.execution_status == ConversationExecutionStatus.FINISHED ) + def test_astep_marshals_bridge_callbacks_to_caller_thread(self, tmp_path): + """Mid-turn bridge callbacks (ACPToolCallEvents, streamed tokens) + must NOT fire on the portal thread. + + Without marshalling, ``_OpenHandsACPBridge.session_update`` invokes + the registered ``on_event`` synchronously from the portal loop's + thread, re-acquiring ``state.lock`` from the wrong thread while + ``arun()`` owns it — same deadlock class as #3348, just mid-turn + rather than post-turn. This test stages an ACP prompt that fires + an ``ACPToolCallEvent`` and a token chunk from the portal loop and + asserts both land back on the caller thread. + """ + from openhands.sdk.event import ACPToolCallEvent + from openhands.sdk.utils.async_executor import AsyncExecutor + + agent = _make_agent() + conversation = self._make_conversation_with_message(tmp_path) + + caller_thread_id = threading.get_ident() + on_event_thread_ids: list[int] = [] + on_token_thread_ids: list[int] = [] + observed_event_kinds: list[str] = [] + + mock_client = _OpenHandsACPBridge() + mock_client.get_turn_usage_update = MagicMock(return_value=object()) + agent._client = mock_client + agent._conn = MagicMock() + + async def _fake_prompt(prompt_blocks, session_id): + # Mimic what the real bridge does inside session_update: + # fire an ACPToolCallEvent + a token chunk from the portal + # thread (= the thread we're on now). + if mock_client.on_event is not None: + mock_client.on_event( + ACPToolCallEvent( + tool_call_id="tc-1", + title="echo", + status="in_progress", + tool_kind=None, + raw_input=None, + raw_output=None, + content=None, + is_error=False, + ) + ) + if mock_client.on_token is not None: + mock_client.on_token("partial-chunk") + mock_client.accumulated_text.append("ok") + return None + + agent._conn.prompt = _fake_prompt + agent._session_id = "test-session" + + executor = AsyncExecutor() + try: + agent._executor = executor + + def _capture_event(event): + on_event_thread_ids.append(threading.get_ident()) + observed_event_kinds.append(type(event).__name__) + + def _capture_token(chunk): + on_token_thread_ids.append(threading.get_ident()) + + asyncio.run( + agent.astep( + conversation, + on_event=_capture_event, + on_token=_capture_token, + ) + ) + finally: + executor.close() + + # The portal-thread ACPToolCallEvent was marshalled — landed on the + # caller thread, not the portal thread. + assert "ACPToolCallEvent" in observed_event_kinds, ( + f"ACPToolCallEvent missing from observed events: {observed_event_kinds}" + ) + assert len(on_event_thread_ids) >= 3 # tool call + action + observation + for tid in on_event_thread_ids: + assert tid == caller_thread_id, ( + f"on_event fired on thread {tid} instead of caller " + f"{caller_thread_id} — bridge callback was not marshalled" + ) + # And on_token chunks also landed on the caller thread. + assert len(on_token_thread_ids) >= 1 + for tid in on_token_thread_ids: + assert tid == caller_thread_id, ( + f"on_token fired on thread {tid} instead of caller " + f"{caller_thread_id} — token callback was not marshalled" + ) + + def test_astep_emits_failed_tool_calls_on_cancellation(self, tmp_path): + """``asyncio.CancelledError`` during astep must close in-flight + ``ACPToolCallEvent``s as ``failed`` and re-raise. + + Otherwise the cancel races straight to ``finally`` (which only + clears callbacks) and any pending/in_progress tool cards stay + live forever — ``LocalConversation._emit_orphaned_action_errors`` + only patches ``ActionEvent``s, not ``ACPToolCallEvent``s. + """ + from openhands.sdk.event import ACPToolCallEvent + from openhands.sdk.utils.async_executor import AsyncExecutor + + agent = _make_agent() + conversation = self._make_conversation_with_message(tmp_path) + + emitted_events: list = [] + + mock_client = _OpenHandsACPBridge() + mock_client.get_turn_usage_update = MagicMock(return_value=object()) + agent._client = mock_client + agent._conn = MagicMock() + + async def _fake_prompt(prompt_blocks, session_id): + # Seed an in-flight tool call AFTER _reset_client_for_turn has + # run (which clears accumulated_tool_calls). In production + # the bridge accumulates these inside session_update as + # ToolCallStart / ToolCallProgress notifications arrive. + mock_client.accumulated_tool_calls.append( + { + "tool_call_id": "tc-cancel-1", + "title": "in-flight tool", + "status": "in_progress", + "tool_kind": None, + "raw_input": None, + "raw_output": None, + "content": None, + } + ) + # Block long enough for the cancel to land. Real ACP prompt + # would be similarly blocked in await self._conn.prompt(...). + await asyncio.sleep(60) + return None + + agent._conn.prompt = _fake_prompt + agent._session_id = "test-session" + + executor = AsyncExecutor() + + async def _run_with_cancel() -> None: + task = asyncio.create_task( + agent.astep(conversation, on_event=emitted_events.append) + ) + # Yield enough for astep to wire callbacks and start awaiting + # the prompt future, then cancel. + await asyncio.sleep(0.1) + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + try: + agent._executor = executor + asyncio.run(_run_with_cancel()) + finally: + executor.close() + + # The orphaned in_progress tool call was closed out with a + # terminal ``failed`` ACPToolCallEvent. + failed_tool_events = [ + e + for e in emitted_events + if isinstance(e, ACPToolCallEvent) + and e.tool_call_id == "tc-cancel-1" + and e.status == "failed" + ] + observed = [ + (type(e).__name__, getattr(e, "status", None)) for e in emitted_events + ] + assert len(failed_tool_events) == 1, ( + f"expected one terminal failed event for tc-cancel-1, " + f"got events: {observed}" + ) + assert failed_tool_events[0].is_error is True + # --------------------------------------------------------------------------- # Cleanup From 15319a4212966ed73ce1b0b5cd5219863ae680d1 Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Fri, 22 May 2026 10:56:10 +0200 Subject: [PATCH 3/6] refactor(acp): address review nits on astep portal access, exc chain, test timing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses three review suggestions on PR #3358: 1. ``AsyncExecutor._ensure_portal`` was private; ``astep`` reached into it to schedule the ACP prompt via ``start_task_soon``. Promote it to a public ``portal`` property so the contract is part of the API surface and callers don't need to know the internal field name. 2. ``asyncio.wait_for``'s ``TimeoutError`` is now re-raised ``from exc`` so ``__cause__`` survives, preserving the traceback chain for anyone inspecting it. Adds a comment noting that the portal task scheduled via ``start_task_soon`` is NOT cancelled here — it keeps running on the portal loop until the underlying ACP connection responds. This is benign: ``_clear_turn_callbacks`` already mutes bridge callbacks in the ``finally`` block, and asyncio silently discards a late ``wrap_future`` resolution. 3. ``test_astep_emits_failed_tool_calls_on_cancellation`` was using a 100 ms sleep as a sync point — flaky on heavily loaded CI runners. Replaced with an ``asyncio.Event`` set inside ``_fake_prompt`` and awaited (with a 5 s safety timeout) before ``task.cancel()``. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/sdk/agent/acp_agent.py | 15 +++- .../openhands/sdk/utils/async_executor.py | 11 +++ tests/sdk/agent/test_acp_agent.py | 70 ++++++++++++------- 3 files changed, 66 insertions(+), 30 deletions(-) diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index 3be5bdafff..c8ec6fd78c 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -1693,7 +1693,7 @@ async def astep( self.acp_prompt_timeout, len(prompt_blocks), ) - portal = self._executor._ensure_portal() + portal = self._executor.portal response: PromptResponse | None = None max_retries = _ACP_PROMPT_MAX_RETRIES @@ -1710,14 +1710,23 @@ async def astep( timeout=self.acp_prompt_timeout, ) break - except TimeoutError: + except TimeoutError as exc: # ``asyncio.TimeoutError`` aliases ``TimeoutError`` on # Python 3.11+ — same except clause catches both. # Surface to the outer handler below so step and astep # share error-event semantics. + # + # The portal task scheduled via ``start_task_soon`` is + # NOT cancelled by ``asyncio.wait_for`` — it keeps + # running on the portal loop until the underlying ACP + # connection eventually responds or itself times out. + # This is benign: ``_clear_turn_callbacks`` in the + # ``finally`` block has already muted the bridge + # callbacks, and asyncio silently discards a result + # arriving on an already-resolved wrap_future. raise TimeoutError( f"ACP prompt timed out after {self.acp_prompt_timeout:.0f}s" - ) + ) from exc except _RETRIABLE_CONNECTION_ERRORS as e: if attempt < max_retries: delay = _ACP_PROMPT_RETRY_DELAYS[ diff --git a/openhands-sdk/openhands/sdk/utils/async_executor.py b/openhands-sdk/openhands/sdk/utils/async_executor.py index bb71e010c0..ef6186e3e9 100644 --- a/openhands-sdk/openhands/sdk/utils/async_executor.py +++ b/openhands-sdk/openhands/sdk/utils/async_executor.py @@ -26,6 +26,17 @@ def __init__(self): self._lock = threading.Lock() self._atexit_registered = False + @property + def portal(self): + """The underlying ``BlockingPortal``, lazily started. + + Public accessor for callers that need to schedule work directly on + the portal loop (e.g. ``ACPAgent.astep`` uses + ``portal.start_task_soon`` + ``asyncio.wrap_future`` to bridge ACP + calls across loops). Equivalent to ``_ensure_portal()``. + """ + return self._ensure_portal() + def _ensure_portal(self): with self._lock: if self._portal is None: diff --git a/tests/sdk/agent/test_acp_agent.py b/tests/sdk/agent/test_acp_agent.py index 0d8ed1b692..990fd3ea7b 100644 --- a/tests/sdk/agent/test_acp_agent.py +++ b/tests/sdk/agent/test_acp_agent.py @@ -1583,39 +1583,55 @@ def test_astep_emits_failed_tool_calls_on_cancellation(self, tmp_path): agent._client = mock_client agent._conn = MagicMock() - async def _fake_prompt(prompt_blocks, session_id): - # Seed an in-flight tool call AFTER _reset_client_for_turn has - # run (which clears accumulated_tool_calls). In production - # the bridge accumulates these inside session_update as - # ToolCallStart / ToolCallProgress notifications arrive. - mock_client.accumulated_tool_calls.append( - { - "tool_call_id": "tc-cancel-1", - "title": "in-flight tool", - "status": "in_progress", - "tool_kind": None, - "raw_input": None, - "raw_output": None, - "content": None, - } - ) - # Block long enough for the cancel to land. Real ACP prompt - # would be similarly blocked in await self._conn.prompt(...). - await asyncio.sleep(60) - return None - - agent._conn.prompt = _fake_prompt - agent._session_id = "test-session" - executor = AsyncExecutor() async def _run_with_cancel() -> None: + # Signal the moment ``_fake_prompt`` has entered (after + # ``_reset_client_for_turn`` has run and the bridge + # callbacks are wired) so the cancel races deterministically. + # ``asyncio.Event`` MUST be created inside the running loop — + # creating it at module/test scope would bind it to the wrong + # loop and ``set()`` would silently no-op. + prompt_entered = asyncio.Event() + # ``set`` lives on the portal loop, ``wait`` lives on the + # caller loop — go through ``call_soon_threadsafe`` so the + # event sees a consistent loop owner. + caller_loop = asyncio.get_running_loop() + + async def _fake_prompt(prompt_blocks, session_id): + # Seed an in-flight tool call AFTER _reset_client_for_turn + # has run (which clears accumulated_tool_calls). In + # production the bridge accumulates these inside + # session_update as ToolCallStart / ToolCallProgress + # notifications arrive. + mock_client.accumulated_tool_calls.append( + { + "tool_call_id": "tc-cancel-1", + "title": "in-flight tool", + "status": "in_progress", + "tool_kind": None, + "raw_input": None, + "raw_output": None, + "content": None, + } + ) + caller_loop.call_soon_threadsafe(prompt_entered.set) + # Block long enough for the cancel to land. Real ACP + # prompt would be similarly blocked in + # ``await self._conn.prompt(...)``. + await asyncio.sleep(60) + return None + + agent._conn.prompt = _fake_prompt + agent._session_id = "test-session" + task = asyncio.create_task( agent.astep(conversation, on_event=emitted_events.append) ) - # Yield enough for astep to wire callbacks and start awaiting - # the prompt future, then cancel. - await asyncio.sleep(0.1) + # Deterministic: wait until the fake prompt has actually + # entered (and seeded the in-flight tool call) before + # cancelling. Removes the CI-load timing race. + await asyncio.wait_for(prompt_entered.wait(), timeout=5.0) task.cancel() with pytest.raises(asyncio.CancelledError): await task From 17a0bb644201e8dcf2fe68705e68ed4bdbb0f80b Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Fri, 22 May 2026 11:13:00 +0200 Subject: [PATCH 4/6] fix(acp): drop late marshalled callbacks + minor astep cleanups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses three more review comments on PR #3358: 1. (Important) Marshalled bridge callbacks could fire AFTER ``arun()`` released the state lock. A portal-thread ``session_update`` racing with prompt completion would queue a ``call_soon_threadsafe`` entry that the caller loop ticks later — potentially after astep's ``finally`` had already returned to ``arun``, which had then exited its ``with state:`` block. The user's ``on_event`` would then run outside the lock, appending stale ``ACPToolCallEvent``s to ``state.events`` without synchronization. Fix: turn the closure-based marshaller into ``_CallerLoopMarshaller`` (module-level class with ``__slots__`` for the hot path) that exposes ``deactivate()``. Both the entry call and the deferred dispatch short-circuit when inactive. ``astep``'s ``finally`` block calls ``deactivate()`` on each marshaller BEFORE ``_clear_turn_callbacks`` unwires the bridge, so any in-flight queued entry becomes a no-op before the loop has a chance to tick it. 2. ``assert marshalled_on_event is not None`` would silently disappear under ``python -O``. Replaced with an explicit ``raise TypeError`` so the invariant survives optimized runs. 3. ``AsyncExecutor.portal`` and ``_ensure_portal`` are now annotated ``-> BlockingPortal`` so static analysis can flag wrong-loop usage. New regression test ``test_astep_drops_late_bridge_callbacks_after_turn``: captures the marshaller during a turn, lets the turn complete, then fires the stashed marshaller from a worker thread to simulate a straggler ``session_update`` — asserts the user's ``on_event`` is NOT invoked. Fails without ``deactivate()``. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/sdk/agent/acp_agent.py | 125 ++++++++++++++---- .../openhands/sdk/utils/async_executor.py | 6 +- tests/sdk/agent/test_acp_agent.py | 95 +++++++++++++ 3 files changed, 199 insertions(+), 27 deletions(-) diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index c8ec6fd78c..dc4a06c792 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -343,6 +343,76 @@ async def _filter_jsonrpc_lines(source: Any, dest: Any) -> None: dest.feed_eof() +class _CallerLoopMarshaller: + """Thread-hopping wrapper for ``ACPAgent.astep``'s bridge callbacks. + + Constructed by :meth:`ACPAgent._make_caller_loop_marshaller` for each + per-turn ``on_event`` / ``on_token`` callback wired into + :class:`_OpenHandsACPBridge`. Same-thread invocations call through + synchronously (so ``_finalize_successful_turn`` / + ``_cancel_inflight_tool_calls`` keep their sync semantics); + portal-thread invocations hop back to the caller loop via + ``loop.call_soon_threadsafe``. + + Late-callback safety: ``deactivate()`` flips an internal flag that + short-circuits BOTH the entry call and the deferred dispatch. Any + ``call_soon_threadsafe`` entry queued before deactivation but not yet + ticked by the loop becomes a no-op, so portal-thread + ``session_update`` racing with prompt completion can no longer leak + stale ``ACPToolCallEvent``s into the event stream after ``arun()`` + has released the state lock. + """ + + __slots__ = ("_loop", "_caller_thread_id", "_callback", "_active") + + def __init__( + self, + loop: asyncio.AbstractEventLoop, + caller_thread_id: int, + callback: Callable[..., Any], + ) -> None: + self._loop = loop + self._caller_thread_id = caller_thread_id + self._callback = callback + self._active = True + + def __call__(self, *args: Any, **kwargs: Any) -> None: + if not self._active: + return + if threading.get_ident() == self._caller_thread_id: + self._callback(*args, **kwargs) + return + try: + # ``call_soon_threadsafe`` only takes positional ``*args``; + # wrap in a tiny adapter so we can carry kwargs through and + # re-check ``_active`` at dispatch time. + self._loop.call_soon_threadsafe(self._deferred, args, kwargs) + except RuntimeError: + # Caller loop is closed (e.g. astep already returned and the + # bridge fired a trailing notification). Dropping is the + # right behavior — ``_clear_turn_callbacks`` should have + # already unwired this, but guard the race. + logger.debug( + "caller loop closed; dropping marshalled ACP callback", + exc_info=True, + ) + + def _deferred(self, args: tuple[Any, ...], kwargs: dict[str, Any]) -> None: + if not self._active: + return + self._callback(*args, **kwargs) + + def deactivate(self) -> None: + """Drop any in-flight and future invocations. + + Idempotent. Must be called BEFORE ``_clear_turn_callbacks`` + unwires the bridge — otherwise a ``session_update`` racing with + teardown could schedule a callback via the still-wired marshaller + that fires after the state lock has been released. + """ + self._active = False + + class _OpenHandsACPBridge: """Bridge between OpenHands and ACP that accumulates session updates. @@ -1467,7 +1537,7 @@ def _make_caller_loop_marshaller( loop: asyncio.AbstractEventLoop, caller_thread_id: int, callback: Callable[..., Any] | None, - ) -> Callable[..., None] | None: + ) -> _CallerLoopMarshaller | None: """Wrap ``callback`` so portal-thread invocations hop back to ``loop``. ``_OpenHandsACPBridge.session_update`` runs on the portal thread (the @@ -1479,41 +1549,34 @@ def _make_caller_loop_marshaller( the wrong thread and deadlock the conversation (same failure class as #3348, just mid-turn instead of post-turn). - The returned wrapper: + The returned :class:`_CallerLoopMarshaller` is itself callable: - * Runs synchronously if invoked from the caller thread — direct - paths like ``_cancel_inflight_tool_calls`` / ``_finalize_successful_turn`` - go straight through, preserving sync invariants. + * Runs ``callback`` synchronously if invoked from the caller + thread — direct paths like ``_cancel_inflight_tool_calls`` / + ``_finalize_successful_turn`` go straight through, preserving + sync invariants. * Marshals to the caller loop via ``call_soon_threadsafe`` when invoked from any other thread (the portal thread is the motivating case, but third-party threads would be handled too). ``call_soon_threadsafe`` is FIFO per loop, so ordering of bridge callbacks is preserved relative to each other. + On turn cleanup, callers must invoke ``deactivate()`` BEFORE + clearing bridge wiring (see ``_clear_turn_callbacks``). After + deactivation, any ``call_soon_threadsafe`` entries the loop has + not yet ticked become no-ops — without this, a portal-thread + ``session_update`` racing with prompt completion can queue a + bridge callback that fires AFTER ``arun()`` has released the + state lock, leaking stale ``ACPToolCallEvent``s into the event + stream outside the lock. + Returns ``None`` if ``callback`` is ``None`` so callers can wire ``self._reset_client_for_turn(None, None)`` semantics through unchanged. """ if callback is None: return None - - def _invoke(*args: Any, **kwargs: Any) -> None: - if threading.get_ident() == caller_thread_id: - callback(*args, **kwargs) - return - try: - loop.call_soon_threadsafe(lambda: callback(*args, **kwargs)) - except RuntimeError: - # Caller loop is closed (e.g. astep already returned and the - # bridge fired a trailing notification). Dropping is the - # right behavior — _clear_turn_callbacks should have already - # unwired this, but guard the race. - logger.debug( - "caller loop closed; dropping marshalled ACP callback", - exc_info=True, - ) - - return _invoke + return _CallerLoopMarshaller(loop, caller_thread_id, callback) def _build_prompt_blocks_for_latest_user_message( self, state: ConversationState @@ -1683,7 +1746,11 @@ async def astep( marshalled_on_token = self._make_caller_loop_marshaller( loop, caller_thread_id, on_token ) - assert marshalled_on_event is not None # on_event is required + if marshalled_on_event is None: + # on_event is a required, non-Optional parameter — guard + # unconditionally so the invariant survives ``python -O`` + # (where ``assert`` is a no-op). + raise TypeError("on_event is required and must not be None") self._reset_client_for_turn(marshalled_on_token, marshalled_on_event) t0 = time.monotonic() @@ -1795,6 +1862,16 @@ async def astep( self._emit_turn_error(e, state, on_event) raise finally: + # Deactivate marshallers BEFORE unwiring the bridge. A + # portal-thread ``session_update`` racing with prompt + # completion could have queued a ``call_soon_threadsafe`` + # entry that hasn't ticked yet — without deactivation, it + # would fire on the next loop tick (potentially after + # ``arun()`` has exited its ``with state:`` block), leaking + # stale events into the stream outside the lock. + marshalled_on_event.deactivate() + if marshalled_on_token is not None: + marshalled_on_token.deactivate() self._clear_turn_callbacks() def ask_agent(self, question: str) -> str | None: diff --git a/openhands-sdk/openhands/sdk/utils/async_executor.py b/openhands-sdk/openhands/sdk/utils/async_executor.py index ef6186e3e9..e4cb1aa763 100644 --- a/openhands-sdk/openhands/sdk/utils/async_executor.py +++ b/openhands-sdk/openhands/sdk/utils/async_executor.py @@ -6,7 +6,7 @@ from typing import Any import anyio -from anyio.from_thread import start_blocking_portal +from anyio.from_thread import BlockingPortal, start_blocking_portal from openhands.sdk.logger import get_logger @@ -27,7 +27,7 @@ def __init__(self): self._atexit_registered = False @property - def portal(self): + def portal(self) -> BlockingPortal: """The underlying ``BlockingPortal``, lazily started. Public accessor for callers that need to schedule work directly on @@ -37,7 +37,7 @@ def portal(self): """ return self._ensure_portal() - def _ensure_portal(self): + def _ensure_portal(self) -> BlockingPortal: with self._lock: if self._portal is None: self._portal_cm = start_blocking_portal() diff --git a/tests/sdk/agent/test_acp_agent.py b/tests/sdk/agent/test_acp_agent.py index 990fd3ea7b..299a575e4a 100644 --- a/tests/sdk/agent/test_acp_agent.py +++ b/tests/sdk/agent/test_acp_agent.py @@ -1660,6 +1660,101 @@ async def _fake_prompt(prompt_blocks, session_id): ) assert failed_tool_events[0].is_error is True + def test_astep_drops_late_bridge_callbacks_after_turn(self, tmp_path): + """Pending ``call_soon_threadsafe`` bridge entries must NOT fire + after the turn ends. + + Reproduction: a portal-thread ``session_update`` racing with + prompt completion queues a marshalled callback via + ``call_soon_threadsafe``. The loop ticks it AFTER astep's + ``finally`` has run. Without the marshaller's ``deactivate()`` + gate, the user's ``on_event`` would be invoked outside ``arun()``'s + ``with state:`` block, leaking stale ``ACPToolCallEvent``s into + the event stream. With the gate, the late entry is dropped. + """ + from openhands.sdk.event import ACPToolCallEvent + from openhands.sdk.utils.async_executor import AsyncExecutor + + agent = _make_agent() + conversation = self._make_conversation_with_message(tmp_path) + user_callback_invocations: list = [] + + mock_client = _OpenHandsACPBridge() + mock_client.get_turn_usage_update = MagicMock(return_value=object()) + agent._client = mock_client + agent._conn = MagicMock() + + # Hold onto the marshaller wired into the bridge so we can + # simulate a portal-thread fire AFTER the turn's finally has + # deactivated it. + stashed_marshaller: list = [] + + async def _fake_prompt(prompt_blocks, session_id): + stashed_marshaller.append(mock_client.on_event) + mock_client.accumulated_text.append("ok") + return None + + agent._conn.prompt = _fake_prompt + agent._session_id = "test-session" + + executor = AsyncExecutor() + try: + agent._executor = executor + + def _capture(event): + user_callback_invocations.append(event) + + async def _drive() -> None: + await agent.astep(conversation, on_event=_capture) + # astep's finally has run deactivate() + _clear_turn_callbacks. + # Now simulate a stragglerportal-thread session_update that + # somehow still holds the (now stale) marshaller reference + # and fires. call_soon_threadsafe schedules onto our + # running loop; one more loop tick processes it. + assert len(stashed_marshaller) == 1 + marshaller = stashed_marshaller[0] + # Run from a worker thread so the cross-thread branch + # (call_soon_threadsafe) is exercised, not the same-thread + # shortcut. + done = threading.Event() + + def _fire_from_other_thread(): + marshaller( + ACPToolCallEvent( + tool_call_id="tc-late-1", + title="late", + status="in_progress", + tool_kind=None, + raw_input=None, + raw_output=None, + content=None, + is_error=False, + ) + ) + done.set() + + t = threading.Thread(target=_fire_from_other_thread) + t.start() + t.join() + # Yield twice so any pending call_soon_threadsafe entry + # gets a chance to tick. + await asyncio.sleep(0) + await asyncio.sleep(0) + + asyncio.run(_drive()) + finally: + executor.close() + + late_events = [ + e + for e in user_callback_invocations + if isinstance(e, ACPToolCallEvent) and e.tool_call_id == "tc-late-1" + ] + assert late_events == [], ( + f"late marshalled callback should have been dropped after " + f"deactivate(), but reached user on_event: {late_events}" + ) + # --------------------------------------------------------------------------- # Cleanup From e1b3264efd989aa170ce86f63f5b25d4f66f7afb Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Fri, 22 May 2026 11:35:48 +0200 Subject: [PATCH 5/6] docs(acp): correct wait_for timeout comment in astep MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous comment claimed the portal task scheduled via ``BlockingPortal.start_task_soon`` is NOT cancelled by ``asyncio.wait_for`` and that ``_clear_turn_callbacks`` has already muted bridge callbacks by the time the outer ``except TimeoutError`` runs. Both claims were wrong: 1. ``asyncio.wait_for`` does cancel the wrapped future on timeout, and that cancellation propagates through ``asyncio.wrap_future`` to the underlying ``concurrent.futures.Future`` returned by ``start_task_soon`` — AnyIO uses that to cancel the portal task, so ``_do_acp_prompt`` receives a ``CancelledError`` and tears down on the portal loop. Verified empirically: future.cancelled() == True, portal task observed CancelledError. 2. ``_clear_turn_callbacks`` runs in the ``finally`` block, which is AFTER the outer ``except TimeoutError`` (which is what calls ``_emit_turn_timeout`` → ``_cancel_inflight_tool_calls``). The marshallers are still active during that handler, which is exactly what lets ``_cancel_inflight_tool_calls`` route orphan-tool events to the user's ``on_event`` via the marshaller's same-thread branch. Updated the comment to describe the actual flow. No behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/sdk/agent/acp_agent.py | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index dc4a06c792..751b6991c5 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -1783,14 +1783,23 @@ async def astep( # Surface to the outer handler below so step and astep # share error-event semantics. # - # The portal task scheduled via ``start_task_soon`` is - # NOT cancelled by ``asyncio.wait_for`` — it keeps - # running on the portal loop until the underlying ACP - # connection eventually responds or itself times out. - # This is benign: ``_clear_turn_callbacks`` in the - # ``finally`` block has already muted the bridge - # callbacks, and asyncio silently discards a result - # arriving on an already-resolved wrap_future. + # ``asyncio.wait_for`` cancels the wrapped future on + # timeout; that cancellation propagates through + # ``asyncio.wrap_future`` to the underlying + # ``concurrent.futures.Future`` returned by + # ``BlockingPortal.start_task_soon``, which AnyIO uses + # to cancel the portal task — so ``_do_acp_prompt`` + # receives a ``CancelledError`` and tears down on the + # portal loop. Verified empirically: ``future.cancelled()`` + # is True after timeout and the portal task observes + # ``CancelledError``. The outer ``except TimeoutError`` + # below runs next (``_emit_turn_timeout`` → emits the + # error message, calls ``_cancel_inflight_tool_calls`` + # to close orphan tool cards on the caller thread via + # the marshaller's same-thread branch), and finally + # ``deactivate()`` + ``_clear_turn_callbacks`` drop any + # straggler bridge callback that races in before the + # portal task fully unwinds. raise TimeoutError( f"ACP prompt timed out after {self.acp_prompt_timeout:.0f}s" ) from exc From abd1ed0d4b5a1367d8a4b38026f79a6815de773a Mon Sep 17 00:00:00 2001 From: Debug Agent Date: Fri, 22 May 2026 14:20:59 +0200 Subject: [PATCH 6/6] refactor(acp): drop exc_info on caller-loop-closed log + add _deferred guard test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses two more review suggestions on PR #3358: 1. ``logger.debug("caller loop closed; dropping marshalled ACP callback", exc_info=True)`` emitted a full traceback for every trailing ``session_update`` race after astep exits — noisy and not useful at DEBUG since the message is self-explanatory. Dropped ``exc_info``. 2. ``test_astep_drops_late_bridge_callbacks_after_turn`` exercises only the ``__call__`` entry-guard (``if not self._active: return`` at the top), not the deferred-dispatch guard in ``_deferred``. By the time the late marshaller fires, astep's ``finally`` has already ``deactivate()``-ed, so the cross-thread branch is short-circuited at entry and ``call_soon_threadsafe`` is never reached. Added a sister test ``test_astep_deferred_dropped_when_deactivated_after_enqueue`` that reproduces the actual race (queue ``_deferred`` BEFORE deactivation, deactivate, then yield the loop) by manually calling ``loop.call_soon_threadsafe(marshaller._deferred, (event,), {})`` and then ``marshaller.deactivate()``. Verified the test fails without the ``if not self._active`` guard in ``_deferred``: with the guard stripped, the queued callback runs and the stale event reaches the user's ``on_event``. Also corrected the comment on the entry-guard test to match what it actually exercises. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openhands/sdk/agent/acp_agent.py | 1 - tests/sdk/agent/test_acp_agent.py | 104 ++++++++++++++++-- 2 files changed, 97 insertions(+), 8 deletions(-) diff --git a/openhands-sdk/openhands/sdk/agent/acp_agent.py b/openhands-sdk/openhands/sdk/agent/acp_agent.py index 751b6991c5..ae1dcb2358 100644 --- a/openhands-sdk/openhands/sdk/agent/acp_agent.py +++ b/openhands-sdk/openhands/sdk/agent/acp_agent.py @@ -394,7 +394,6 @@ def __call__(self, *args: Any, **kwargs: Any) -> None: # already unwired this, but guard the race. logger.debug( "caller loop closed; dropping marshalled ACP callback", - exc_info=True, ) def _deferred(self, args: tuple[Any, ...], kwargs: dict[str, Any]) -> None: diff --git a/tests/sdk/agent/test_acp_agent.py b/tests/sdk/agent/test_acp_agent.py index 299a575e4a..0d7eea4d40 100644 --- a/tests/sdk/agent/test_acp_agent.py +++ b/tests/sdk/agent/test_acp_agent.py @@ -1707,15 +1707,16 @@ def _capture(event): async def _drive() -> None: await agent.astep(conversation, on_event=_capture) # astep's finally has run deactivate() + _clear_turn_callbacks. - # Now simulate a stragglerportal-thread session_update that + # Now simulate a straggler portal-thread session_update that # somehow still holds the (now stale) marshaller reference - # and fires. call_soon_threadsafe schedules onto our - # running loop; one more loop tick processes it. + # and fires. ``deactivate()`` already set ``_active=False``, + # so the marshaller's ``__call__`` short-circuits at the + # entry guard — this test exercises that guard. The + # separate ``_deferred``-guard test below covers the case + # where a ``call_soon_threadsafe`` entry has already been + # queued before deactivation. assert len(stashed_marshaller) == 1 marshaller = stashed_marshaller[0] - # Run from a worker thread so the cross-thread branch - # (call_soon_threadsafe) is exercised, not the same-thread - # shortcut. done = threading.Event() def _fire_from_other_thread(): @@ -1737,7 +1738,7 @@ def _fire_from_other_thread(): t.start() t.join() # Yield twice so any pending call_soon_threadsafe entry - # gets a chance to tick. + # would get a chance to tick. await asyncio.sleep(0) await asyncio.sleep(0) @@ -1755,6 +1756,95 @@ def _fire_from_other_thread(): f"deactivate(), but reached user on_event: {late_events}" ) + def test_astep_deferred_dropped_when_deactivated_after_enqueue(self, tmp_path): + """Exercises ``_CallerLoopMarshaller._deferred``'s active guard. + + Sister test to ``test_astep_drops_late_bridge_callbacks_after_turn``, + which exercises only the entry-guard short-circuit (because by + the time it fires, ``deactivate()`` has already been called). + Here we manually queue ``marshaller._deferred`` via + ``call_soon_threadsafe`` BEFORE deactivation — same race as a + portal-thread ``session_update`` that fires while astep is still + in flight but whose loop tick lands after astep's ``finally``. + We then deactivate, yield the loop, and assert the user's + ``on_event`` was not invoked. + """ + from openhands.sdk.event import ACPToolCallEvent + from openhands.sdk.utils.async_executor import AsyncExecutor + + agent = _make_agent() + conversation = self._make_conversation_with_message(tmp_path) + user_callback_invocations: list = [] + stashed_marshaller: list = [] + + mock_client = _OpenHandsACPBridge() + mock_client.get_turn_usage_update = MagicMock(return_value=object()) + agent._client = mock_client + agent._conn = MagicMock() + + async def _fake_prompt(prompt_blocks, session_id): + stashed_marshaller.append(mock_client.on_event) + mock_client.accumulated_text.append("ok") + return None + + agent._conn.prompt = _fake_prompt + agent._session_id = "test-session" + + executor = AsyncExecutor() + try: + agent._executor = executor + + def _capture(event): + user_callback_invocations.append(event) + + async def _drive() -> None: + await agent.astep(conversation, on_event=_capture) + assert len(stashed_marshaller) == 1 + marshaller = stashed_marshaller[0] + + # astep's finally has already called deactivate(). Force + # ``_active`` back to True so we can simulate the race: + # ``__call__`` fired and enqueued ``_deferred`` via + # ``call_soon_threadsafe`` BEFORE deactivation lands. + marshaller._active = True + loop = asyncio.get_running_loop() + event = ACPToolCallEvent( + tool_call_id="tc-deferred-1", + title="late deferred", + status="in_progress", + tool_kind=None, + raw_input=None, + raw_output=None, + content=None, + is_error=False, + ) + # Queue _deferred directly — this is what __call__ would + # do via call_soon_threadsafe from another thread. + loop.call_soon_threadsafe(marshaller._deferred, (event,), {}) + + # Now flip _active to False BEFORE the loop ticks the + # queued entry — this is the race the _deferred guard + # exists to handle. + marshaller.deactivate() + + # Yield twice so the queued _deferred runs. + await asyncio.sleep(0) + await asyncio.sleep(0) + + asyncio.run(_drive()) + finally: + executor.close() + + deferred_events = [ + e + for e in user_callback_invocations + if isinstance(e, ACPToolCallEvent) and e.tool_call_id == "tc-deferred-1" + ] + assert deferred_events == [], ( + f"_deferred should have dropped the queued callback after " + f"deactivate(), but reached user on_event: {deferred_events}" + ) + # --------------------------------------------------------------------------- # Cleanup