From ea1b4e9c4e249b1b65afe94d2026b855dc980749 Mon Sep 17 00:00:00 2001 From: Anthony Casagrande Date: Fri, 12 Jun 2026 10:53:16 -0700 Subject: [PATCH 1/9] fix(agentic): dispatch all profiling trajectories concurrently at startup _execute_profiling was serially awaiting each trajectory's first credit, blocking trajectory K+1 until K's dispatch completed. With N=256 and slow per-trajectory work (snapshot materialization, tokenization), this caused the full concurrency target to take ~54s to reach on a real cluster instead of bursting at t=0. Replace the serial for-loop with asyncio.gather over a new _dispatch_one_profiling_trajectory helper so all trajectories begin their initial credit issuance concurrently. Regression test verifies that N terminal-root snapshot trajectories all reach issue_credit simultaneously rather than one-at-a-time. Root cause introduced in commit f47bd5537e (Cam Quilici, 2026-06-03). Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Anthony Casagrande --- .../timing/strategies/agentic_replay.py | 69 +++++++----- .../timing/strategies/test_agentic_replay.py | 104 ++++++++++++++++++ 2 files changed, 144 insertions(+), 29 deletions(-) diff --git a/src/aiperf/timing/strategies/agentic_replay.py b/src/aiperf/timing/strategies/agentic_replay.py index fe59dab46..65fa49186 100644 --- a/src/aiperf/timing/strategies/agentic_replay.py +++ b/src/aiperf/timing/strategies/agentic_replay.py @@ -260,43 +260,54 @@ async def _execute_warmup(self) -> None: async def _execute_profiling(self) -> None: """Resume each trajectory at ``k_i + 1`` to seed the steady state. - Subsequent turns and recycle-pool sessions are dispatched from - handle_credit_return. + All trajectories are dispatched concurrently so the full concurrency + target is reached immediately rather than serializing over N credit + round-trips. Subsequent turns and recycle-pool sessions are dispatched + from handle_credit_return. """ self.info( f"PROFILING execute: resuming {len(self.conversation_source.trajectories)} " f"trajectory sessions" ) - for lane, trajectory in enumerate(self.conversation_source.trajectories): - if trajectory.snapshot is not None: - await self._dispatch_snapshot_for_profiling(trajectory, lane) - continue + await asyncio.gather( + *[ + self._dispatch_one_profiling_trajectory(lane, trajectory) + for lane, trajectory in enumerate(self.conversation_source.trajectories) + ] + ) - session = self.conversation_source.session_for(trajectory) - self._correlation_to_lane[session.x_correlation_id] = lane - self._active_traces[trajectory.conversation_id] += 1 - self._mint_marker_for_session( - session.x_correlation_id, trajectory.conversation_id, lane + async def _dispatch_one_profiling_trajectory( + self, lane: int, trajectory: Trajectory + ) -> None: + if trajectory.snapshot is not None: + await self._dispatch_snapshot_for_profiling(trajectory, lane) + return + + session = self.conversation_source.session_for(trajectory) + self._correlation_to_lane[session.x_correlation_id] = lane + self._active_traces[trajectory.conversation_id] += 1 + self._mint_marker_for_session( + session.x_correlation_id, trajectory.conversation_id, lane + ) + resume_index = trajectory.start_turn_index + 1 + num_turns = len(session.metadata.turns) + + if resume_index >= num_turns: + # Trajectory's k_i was already the last turn (rare: happens + # only for very short traces). Skip directly to recycle. + self.debug( + lambda cid=trajectory.conversation_id, + k=trajectory.start_turn_index, + n=num_turns: f"Trajectory {cid} k_i={k} >= last turn (n={n}); recycling immediately" ) - resume_index = trajectory.start_turn_index + 1 - num_turns = len(session.metadata.turns) - - if resume_index >= num_turns: - # Trajectory's k_i was already the last turn (rare: happens - # only for very short traces). Skip directly to recycle. - self.debug( - lambda cid=trajectory.conversation_id, - k=trajectory.start_turn_index, - n=num_turns: f"Trajectory {cid} k_i={k} >= last turn (n={n}); recycling immediately" - ) - await self._spawn_from_recycle_or_id( - trajectory.conversation_id, - finished_correlation_id=session.x_correlation_id, - ) - continue + await self._spawn_from_recycle_or_id( + trajectory.conversation_id, + finished_correlation_id=session.x_correlation_id, + ) + return - turn = self._build_turn_for_session(session, resume_index) - await self.credit_issuer.issue_credit(turn) + turn = self._build_turn_for_session(session, resume_index) + await self.credit_issuer.issue_credit(turn) async def handle_credit_return( self, credit: Credit, *, error: str | None = None diff --git a/tests/unit/timing/strategies/test_agentic_replay.py b/tests/unit/timing/strategies/test_agentic_replay.py index 50f84bace..5195ef5fd 100644 --- a/tests/unit/timing/strategies/test_agentic_replay.py +++ b/tests/unit/timing/strategies/test_agentic_replay.py @@ -616,6 +616,110 @@ async def capture(turn): assert issued[0].x_correlation_id != "warmed-root" +@pytest.mark.asyncio +async def test_terminal_root_snapshot_recycles_are_concurrent_not_serial(): + """Regression: terminal-root recycles must not serialize profiling startup. + + Commit f47bd5537e introduced `await _spawn_from_recycle_or_id(...)` inside + the per-trajectory loop in _dispatch_snapshot_for_profiling. With N terminal- + root trajectories this blocked the Kth dispatch until the (K-1)th recycle + completed, causing all 256 sessions to trickle in over ~54 s instead of + bursting at t=0 on a real cluster. + """ + N = 3 + # N single-turn traces become terminal roots: snapshot root is at turn 0 + # (the only turn), so _snapshot_continuation_after_warmup drops the state + # (resume_index=1 >= len(turns)=1) and it lands in terminal_roots. + # Extra traces give the recycle queue enough depth for N concurrent pops. + conversations = [ + ConversationMetadata( + conversation_id=f"trace_{i}", + turns=[TurnMetadata(timestamp_ms=float(i * 1000))], + ) + for i in range(N * 2) + ] + ds = DatasetMetadata( + conversations=conversations, + sampling_strategy=DatasetSamplingStrategy.SEQUENTIAL, + ) + trajectories = [ + Trajectory( + conversation_id=f"trace_{i}", + start_turn_index=0, + snapshot=TrajectorySnapshot( + t_star_ms=float(i * 1000), + states=( + ConversationState( + conversation_id=f"trace_{i}", + x_correlation_id=f"warmed-{i}", + next_turn_index=0, + ), + ), + ), + ) + for i in range(N) + ] + + src = TrajectorySource.__new__(TrajectorySource) + src._dataset_metadata = ds + src._dataset_sampler = MagicMock() + src._metadata_lookup = {c.conversation_id: c for c in conversations} + src._random_seed = 0 + src._target_size = N + src.trajectories = trajectories + + # Gate that blocks each recycled credit until we release it. + # in_flight tracks how many issue_credit calls are simultaneously blocked + # at the gate — if serial only 1 is ever in-flight, if concurrent all N are. + gate = asyncio.Event() + in_flight = 0 + max_in_flight = 0 + + async def gated_issue_credit(turn): + nonlocal in_flight, max_in_flight + in_flight += 1 + max_in_flight = max(max_in_flight, in_flight) + await gate.wait() + in_flight -= 1 + return True + + issuer = AsyncMock() + issuer.issue_credit.side_effect = gated_issue_credit + stop_checker = MagicMock() + stop_checker.can_start_new_session.return_value = True + cfg = MagicMock() + cfg.phase = CreditPhase.PROFILING + cfg.concurrency = N + + strategy = AgenticReplayStrategy( + config=cfg, + conversation_source=src, + scheduler=MagicMock(), + stop_checker=stop_checker, + credit_issuer=issuer, + lifecycle=MagicMock(), + ) + + await strategy.setup_phase() + task = asyncio.create_task(strategy.execute_phase()) + + # Pump the event loop enough times for all concurrent dispatches to reach + # the gate. With the serial bug only 1 will be in-flight; with the fix + # all N will be blocked at gate.wait() simultaneously. + for _ in range(N + 4): + await asyncio.sleep(0) + + concurrent_at_gate = in_flight + gate.set() + await task + + assert concurrent_at_gate == N, ( + f"Expected {N} recycled credits in-flight simultaneously at profiling " + f"startup, but only {concurrent_at_gate} were. Terminal-root recycles " + "appear to be dispatched serially, blocking the startup loop." + ) + + @pytest.mark.asyncio async def test_profiling_skips_trajectory_at_last_turn_and_recycles(): """If k_i is already the last turn, k_i+1 is out of range. Recycle immediately.""" From c0bf05e6a16300c09016cc959337118f28d11bb6 Mon Sep 17 00:00:00 2001 From: Anthony Casagrande Date: Fri, 12 Jun 2026 11:50:12 -0700 Subject: [PATCH 2/9] fix(agentic): contain per-lane dispatch failures in profiling startup gather A bare asyncio.gather re-raises the first lane's exception while the remaining dispatch coroutines keep running detached - issuing credits and mutating strategy state in a phase that is already failing, and unreachable by the phase runner's cancellation. Sibling exceptions were also silently swallowed. Gather with return_exceptions=True, log every failed lane with its trace_id, and re-raise the first failure only after all lanes settle. Matches the established fan-out pattern in branch_orchestrator. Co-Authored-By: Claude Fable 5 Signed-off-by: Anthony Casagrande --- .../timing/strategies/agentic_replay.py | 24 +++++++-- .../timing/strategies/test_agentic_replay.py | 50 +++++++++++++++++++ 2 files changed, 71 insertions(+), 3 deletions(-) diff --git a/src/aiperf/timing/strategies/agentic_replay.py b/src/aiperf/timing/strategies/agentic_replay.py index 65fa49186..e40eabdc5 100644 --- a/src/aiperf/timing/strategies/agentic_replay.py +++ b/src/aiperf/timing/strategies/agentic_replay.py @@ -269,12 +269,30 @@ async def _execute_profiling(self) -> None: f"PROFILING execute: resuming {len(self.conversation_source.trajectories)} " f"trajectory sessions" ) - await asyncio.gather( - *[ + # return_exceptions=True keeps ownership of every lane until it + # settles: a bare gather would re-raise the first failure while the + # sibling coroutines keep issuing credits into a failing phase, + # unreachable by the phase runner's cancellation. + results = await asyncio.gather( + *( self._dispatch_one_profiling_trajectory(lane, trajectory) for lane, trajectory in enumerate(self.conversation_source.trajectories) - ] + ), + return_exceptions=True, ) + first_error: BaseException | None = None + for lane, result in enumerate(results): + if not isinstance(result, BaseException): + continue + trace_id = self.conversation_source.trajectories[lane].conversation_id + self.error( + f"PROFILING dispatch failed for lane {lane} " + f"(trace_id={trace_id!r}): {result!r}" + ) + if first_error is None: + first_error = result + if first_error is not None: + raise first_error async def _dispatch_one_profiling_trajectory( self, lane: int, trajectory: Trajectory diff --git a/tests/unit/timing/strategies/test_agentic_replay.py b/tests/unit/timing/strategies/test_agentic_replay.py index 5195ef5fd..5cd03c6a3 100644 --- a/tests/unit/timing/strategies/test_agentic_replay.py +++ b/tests/unit/timing/strategies/test_agentic_replay.py @@ -720,6 +720,56 @@ async def gated_issue_credit(turn): ) +@pytest.mark.asyncio +async def test_profiling_dispatch_error_waits_for_siblings_and_reraises(): + """One lane's dispatch failure must not detach the sibling dispatches. + + execute_phase must keep ownership of every sibling lane until it settles, + then re-raise the failure. A bare gather would return the exception while + the remaining lanes keep issuing credits into a failing phase unsupervised. + """ + N = 3 + trajectories = [ + Trajectory(conversation_id=f"trace_{i}", start_turn_index=0) for i in range(N) + ] + gate = asyncio.Event() + completed: list[str] = [] + + async def gated_issue_credit(turn): + if turn.conversation_id == "trace_1": + raise RuntimeError("lane boom") + await gate.wait() + completed.append(turn.conversation_id) + return True + + issuer = AsyncMock() + issuer.issue_credit.side_effect = gated_issue_credit + strategy, _, _, _ = _make_strategy( + phase=CreditPhase.PROFILING, + trajectories=trajectories, + num_traces=N, + turns_per_trace=4, + issuer=issuer, + ) + await strategy.setup_phase() + task = asyncio.create_task(strategy.execute_phase()) + for _ in range(N + 4): + await asyncio.sleep(0) + + # Lane 1 has already raised, but lanes 0 and 2 are still blocked at the + # gate: phase execution must still be in flight rather than finished + # with an exception while its siblings run detached. + assert not task.done(), ( + "execute_phase finished while sibling dispatches were still in " + "flight - one lane's failure detached the remaining lanes" + ) + + gate.set() + with pytest.raises(RuntimeError, match="lane boom"): + await asyncio.wait_for(task, timeout=5.0) + assert sorted(completed) == ["trace_0", "trace_2"] + + @pytest.mark.asyncio async def test_profiling_skips_trajectory_at_last_turn_and_recycles(): """If k_i is already the last turn, k_i+1 is out of range. Recycle immediately.""" From 5e2d20b0d976ba023810021a1aa5aeffbc4b4c1a Mon Sep 17 00:00:00 2001 From: Anthony Casagrande Date: Fri, 12 Jun 2026 11:58:07 -0700 Subject: [PATCH 3/9] fix(agentic): pre-register live trajectory lanes before profiling dispatch A lane that recycles immediately at PROFILING startup (terminal-root snapshot or k_i at the last turn) pops the recycle queue before later lanes have registered themselves in _active_traces, so the pop's duplicate-session guard sees count 0 for a still-live trace and spawns a second concurrent session of it - also stealing that trace's pass=0 cache-bust digest from the continuing warmup session. Pre-register every lane's trace in _active_traces during setup_phase (exactly the _lanes_per_trace multiset) instead of lane-by-lane during dispatch, so the guard sees final counts from t=0 regardless of dispatch interleaving. Tests that hand-simulated the old per-lane increments drop them accordingly. Co-Authored-By: Claude Fable 5 Signed-off-by: Anthony Casagrande --- .../timing/strategies/agentic_replay.py | 23 ++--- .../timing/strategies/test_agentic_replay.py | 86 +++++++++++++++++-- .../test_agentic_replay_context_overflow.py | 10 +-- ...test_agentic_replay_recycle_adversarial.py | 38 +++----- 4 files changed, 107 insertions(+), 50 deletions(-) diff --git a/src/aiperf/timing/strategies/agentic_replay.py b/src/aiperf/timing/strategies/agentic_replay.py index e40eabdc5..48d59b5b8 100644 --- a/src/aiperf/timing/strategies/agentic_replay.py +++ b/src/aiperf/timing/strategies/agentic_replay.py @@ -176,10 +176,12 @@ async def setup_phase(self) -> None: TimingManager construction time. PROFILING: build the FIFO recycle queue with the FULL set of loader - trace_ids (including trajectory ids). Trajectories run live at - PROFILING start (resumed at k_i+1); the pop loop in - ``_spawn_from_recycle_or_id`` skips trace_ids whose session is - currently active so we never spawn a duplicate concurrent session. + trace_ids (including trajectory ids), and pre-register every live + trajectory lane in ``_active_traces``. Trajectories run live at + PROFILING start (resumed at k_i+1); pre-registering them here - + rather than lane-by-lane during dispatch - means a lane that + recycles immediately at startup can never pop a trace whose own + lane simply hasn't dispatched yet (a duplicate concurrent session). """ if self.config.phase == CreditPhase.PROFILING: if not self.conversation_source.trajectories: @@ -188,6 +190,7 @@ async def setup_phase(self) -> None: "WARMUP must complete with at least one trajectory before " "PROFILING can start. Check loader output and warmup failures." ) + self._active_traces.update(self._lanes_per_trace) self._recycle_queue = asyncio.Queue() # Recycle pool spans the FULL dataset, not (full - trajectories). # Trajectories run live at PROFILING start (resumed at k_i+1) and @@ -303,7 +306,7 @@ async def _dispatch_one_profiling_trajectory( session = self.conversation_source.session_for(trajectory) self._correlation_to_lane[session.x_correlation_id] = lane - self._active_traces[trajectory.conversation_id] += 1 + # The lane's trace was pre-registered in _active_traces by setup_phase. self._mint_marker_for_session( session.x_correlation_id, trajectory.conversation_id, lane ) @@ -425,8 +428,9 @@ async def _spawn_from_recycle_or_id( The initial recycle queue spans the full dataset pool (including trajectory trace_ids whose sessions are running live at PROFILING - start). The pop loop skips trace_ids in ``_active_traces`` and - re-enqueues them to avoid duplicate concurrent sessions. + start; every live lane is pre-registered in ``_active_traces`` by + ``setup_phase``). The pop loop skips trace_ids in ``_active_traces`` + and re-enqueues them to avoid duplicate concurrent sessions. """ # Prune unconditionally so every early-return path leaves dicts clean. self._session_marker.pop(finished_correlation_id, None) @@ -481,10 +485,10 @@ async def _dispatch_snapshot_for_profiling( ) -> None: warmup_snapshot = self._get_snapshot(trajectory) snapshot = self._snapshot_continuation_after_warmup(trajectory) + # Each lane's single root session (continuing or terminal) was + # pre-registered in _active_traces by setup_phase. for state in snapshot.states: self._correlation_to_lane[state.x_correlation_id] = lane - if state.agent_depth == 0: - self._active_traces[state.conversation_id] += 1 self._mint_marker_for_session( state.x_correlation_id, state.conversation_id, lane ) @@ -501,7 +505,6 @@ async def _dispatch_snapshot_for_profiling( ] for state in terminal_roots: self._correlation_to_lane[state.x_correlation_id] = lane - self._active_traces[state.conversation_id] += 1 self._mint_marker_for_session( state.x_correlation_id, state.conversation_id, lane ) diff --git a/tests/unit/timing/strategies/test_agentic_replay.py b/tests/unit/timing/strategies/test_agentic_replay.py index 5cd03c6a3..54a1c852f 100644 --- a/tests/unit/timing/strategies/test_agentic_replay.py +++ b/tests/unit/timing/strategies/test_agentic_replay.py @@ -54,13 +54,14 @@ def _build_real_trajectory_source( num_traces: int, turns_per_trace: int, trajectories: list[Trajectory], + dataset: DatasetMetadata | None = None, ) -> TrajectorySource: """Build a real TrajectorySource with deterministic trajectories. We construct the source via __new__ + manual init so we control the trajectories exactly (avoid randomization in tests). """ - ds = _make_dataset(num_traces, turns_per_trace) + ds = dataset if dataset is not None else _make_dataset(num_traces, turns_per_trace) src = TrajectorySource.__new__(TrajectorySource) src._dataset_metadata = ds @@ -81,8 +82,11 @@ def _make_strategy( issuer: AsyncMock | None = None, scheduler: MagicMock | None = None, user_config: object | None = None, + dataset: DatasetMetadata | None = None, ) -> tuple[AgenticReplayStrategy, AsyncMock, MagicMock, TrajectorySource]: - src = _build_real_trajectory_source(num_traces, turns_per_trace, trajectories) + src = _build_real_trajectory_source( + num_traces, turns_per_trace, trajectories, dataset=dataset + ) cfg = MagicMock() cfg.phase = phase cfg.concurrency = len(trajectories) @@ -720,6 +724,73 @@ async def gated_issue_credit(turn): ) +@pytest.mark.asyncio +async def test_startup_recycle_does_not_pop_live_trajectory_trace(): + """A startup recycle must not spawn a trace whose own lane hasn't dispatched. + + setup_phase pre-registers every live trajectory lane in _active_traces: + without that, an early lane that recycles immediately pops the queue head + before later lanes register themselves, spawning a duplicate concurrent + session of a trace that its own lane is about to resume. + """ + # Lane 0: single-turn snapshot on trace_1 -> terminal root, recycles at + # startup. Lane 1: plain trajectory resuming trace_0 at k_i+1. Dataset + # order puts trace_0 at the queue head, so a full-pool queue would hand + # lane 0's recycle a fresh turn-0 session of the still-live trace_0. + conversations = [ + ConversationMetadata( + conversation_id="trace_0", + turns=[TurnMetadata(), TurnMetadata()], + ), + ConversationMetadata( + conversation_id="trace_1", + turns=[TurnMetadata()], + ), + ] + ds = DatasetMetadata( + conversations=conversations, + sampling_strategy=DatasetSamplingStrategy.SEQUENTIAL, + ) + trajectories = [ + Trajectory( + conversation_id="trace_1", + start_turn_index=0, + snapshot=TrajectorySnapshot( + t_star_ms=0.0, + states=( + ConversationState( + conversation_id="trace_1", + x_correlation_id="warmed-1", + next_turn_index=0, + ), + ), + ), + ), + Trajectory(conversation_id="trace_0", start_turn_index=0), + ] + captured: list[tuple[str, int]] = [] + + async def capture(turn): + captured.append((turn.conversation_id, turn.turn_index)) + return True + + issuer = AsyncMock() + issuer.issue_credit.side_effect = capture + strategy, _, _, _ = _make_strategy( + phase=CreditPhase.PROFILING, + trajectories=trajectories, + issuer=issuer, + dataset=ds, + ) + await strategy.setup_phase() + await strategy.execute_phase() + + # Lane 0's recycle must spawn its own just-finished trace_1 (the only + # eligible trace), never a turn-0 duplicate of the live trace_0. + assert sorted(captured) == [("trace_0", 1), ("trace_1", 0)] + assert strategy._active_traces["trace_0"] <= 1 + + @pytest.mark.asyncio async def test_profiling_dispatch_error_waits_for_siblings_and_reraises(): """One lane's dispatch failure must not detach the sibling dispatches. @@ -916,11 +987,9 @@ async def capture(turn): # Register the in-flight session's lane bookkeeping (normally done by # _execute_profiling); handle_credit_return's recycle path now requires - # finished_correlation_id to be in _correlation_to_lane. Also seed - # _active_traces so the new full-pool pop loop's skip-active-on-pop - # logic mirrors a real run. + # finished_correlation_id to be in _correlation_to_lane. _active_traces + # was already pre-registered by setup_phase. strategy._correlation_to_lane["xcorr"] = 0 - strategy._active_traces["trace_0"] += 1 issuer.issue_credit.reset_mock() issued_sessions.clear() @@ -1002,10 +1071,9 @@ async def capture(turn): assert strategy._recycle_queue.qsize() == 1 # Register the in-flight session's lane (normally done by _execute_profiling). - # Also seed _active_traces so the new pop loop skips trace_0 while it is - # nominally alive — discard happens at the top of _spawn_from_recycle_or_id. + # _active_traces was already pre-registered by setup_phase — discard + # happens at the top of _spawn_from_recycle_or_id. strategy._correlation_to_lane["xcorr"] = 0 - strategy._active_traces["trace_0"] += 1 issuer.issue_credit.reset_mock() issued_sessions.clear() diff --git a/tests/unit/timing/strategies/test_agentic_replay_context_overflow.py b/tests/unit/timing/strategies/test_agentic_replay_context_overflow.py index a418c6785..df664a486 100644 --- a/tests/unit/timing/strategies/test_agentic_replay_context_overflow.py +++ b/tests/unit/timing/strategies/test_agentic_replay_context_overflow.py @@ -137,11 +137,10 @@ async def capture(turn): issuer=issuer, ) await strategy.setup_phase() - # Seed lane and _active_traces (the new full-pool pop loop skips alive - # trace_ids). The finishing trace is discarded from _active_traces at + # Seed the lane mapping; _active_traces was pre-registered by + # setup_phase. The finishing trace is discarded from _active_traces at # the top of _spawn_from_recycle_or_id before the pop loop runs. strategy._correlation_to_lane["xcorr"] = 0 - strategy._active_traces["trace_0"] += 1 # Mid-trajectory turn (index 2 of 5) errors with context-overflow. mid = _make_credit(conversation_id="trace_0", turn_index=2, num_turns=5) @@ -230,11 +229,10 @@ async def capture(turn): issuer=issuer, ) await strategy.setup_phase() - # Seed lane and _active_traces (the new full-pool pop loop skips alive - # trace_ids; the finishing trace is discarded at the top of + # Seed the lane mapping; _active_traces was pre-registered by + # setup_phase (the finishing trace is discarded at the top of # _spawn_from_recycle_or_id before the pop loop runs). strategy._correlation_to_lane["xcorr"] = 0 - strategy._active_traces["trace_0"] += 1 final = _make_credit(conversation_id="trace_0", turn_index=2, num_turns=3) await strategy.handle_credit_return( diff --git a/tests/unit/timing/strategies/test_agentic_replay_recycle_adversarial.py b/tests/unit/timing/strategies/test_agentic_replay_recycle_adversarial.py index 8d3431cb1..b2e0d8d17 100644 --- a/tests/unit/timing/strategies/test_agentic_replay_recycle_adversarial.py +++ b/tests/unit/timing/strategies/test_agentic_replay_recycle_adversarial.py @@ -155,9 +155,8 @@ async def capture(turn): assert strategy._recycle_queue.qsize() == 1 # Register the in-flight session's lane (normally done by _execute_profiling). - # Seed _active_traces so the new pop loop skips trace_0 while it is alive. + # _active_traces was already pre-registered by setup_phase. strategy._correlation_to_lane["xcorr"] = 0 - strategy._active_traces["trace_0"] += 1 # Final turn (last index = 2 of num_turns=3) final = _make_credit(conversation_id="trace_0", turn_index=2, num_turns=3) @@ -212,13 +211,10 @@ async def capture(turn): # Register lane bookkeeping for both in-flight sessions (normally seeded by # _execute_profiling). handle_credit_return's recycle path requires - # finished_correlation_id to be in _correlation_to_lane. Seed - # _active_traces too: the new full-pool pop loop skips trace_ids whose - # session is currently alive, mirroring _execute_profiling behavior. + # finished_correlation_id to be in _correlation_to_lane. _active_traces + # was already pre-registered by setup_phase. strategy._correlation_to_lane["xcorr_a"] = 0 strategy._correlation_to_lane["xcorr_b"] = 1 - strategy._active_traces["trace_0"] += 1 - strategy._active_traces["trace_1"] += 1 # Two parallel consumers complete. We use asyncio.gather to drive them # concurrently within the same event-loop tick. asyncio.Queue is non-blocking @@ -287,12 +283,10 @@ async def test_burst_of_ten_completions_preserves_completion_order(): # Full pool: queue holds all 12 traces at setup. assert strategy._recycle_queue.qsize() == 12 - # Register lane bookkeeping for the 10 in-flight sessions. Seed - # _active_traces too so the new pop loop skips trace_ids whose session - # is alive (mirroring _execute_profiling). + # Register lane bookkeeping for the 10 in-flight sessions. + # _active_traces was already pre-registered by setup_phase. for i in range(10): strategy._correlation_to_lane[f"xcorr_{i}"] = i - strategy._active_traces[f"trace_{i}"] += 1 # Fire 10 completions in completion order: trace_0..trace_9 finish in order. for i in range(10): @@ -368,11 +362,10 @@ async def capture(turn): # Full pool: queue holds all 70 traces at setup. assert len(initial_queue) == 70 - # Register lane bookkeeping for the 50 in-flight sessions. Seed - # _active_traces too so the new pop loop skips alive trace_ids. + # Register lane bookkeeping for the 50 in-flight sessions. + # _active_traces was already pre-registered by setup_phase. for i in range(50): strategy._correlation_to_lane[f"xcorr_{i}"] = i - strategy._active_traces[f"trace_{i}"] += 1 finals = [ _make_credit( @@ -475,11 +468,10 @@ async def test_recycle_during_cooldown_does_not_start_new_sessions(): # Full pool: queue holds all 5 traces at setup. assert initial_size == 5 - # Register the in-flight session's lane bookkeeping. Seed _active_traces - # so the cooldown gate is reached after the discard at the top of - # _spawn_from_recycle_or_id. + # Register the in-flight session's lane bookkeeping. _active_traces was + # pre-registered by setup_phase; the cooldown gate is reached after the + # discard at the top of _spawn_from_recycle_or_id. strategy._correlation_to_lane["xcorr"] = 0 - strategy._active_traces["trace_0"] += 1 # Final turn arrives during cooldown. final = _make_credit(conversation_id="trace_0", turn_index=1, num_turns=2) @@ -550,16 +542,13 @@ async def capture(turn): # dispatched session's (trace_id, correlation_id) to the tail. from collections import deque - # Seed the trajectory's correlation_ids and _active_traces: - # handle_credit_return now requires finished_correlation_id to be present - # in _correlation_to_lane, and the new full-pool pop loop skips trace_ids - # in _active_traces. Mimic _execute_profiling's bookkeeping for the - # initial trajectory cohort. + # Seed the trajectory's correlation_ids: handle_credit_return requires + # finished_correlation_id to be present in _correlation_to_lane. + # _active_traces was already pre-registered by setup_phase. in_flight: deque[tuple[str, str]] = deque() for lane in range(trajectory_count): corr = f"xcorr_traj_{lane}" strategy._correlation_to_lane[corr] = lane - strategy._active_traces[f"trace_{lane}"] += 1 in_flight.append((f"trace_{lane}", corr)) total_completions = 1500 @@ -907,7 +896,6 @@ async def capture(turn): ) await strategy.setup_phase() strategy._correlation_to_lane["xcorr"] = 0 - strategy._active_traces["trace_0"] += 1 # Root credit: agent_depth defaults to 0 via _make_credit. root_final = _make_credit(conversation_id="trace_0", turn_index=1, num_turns=2) From 3dfa3b47ca6a4b7ed737c9d032fec042952b5fa9 Mon Sep 17 00:00:00 2001 From: Anthony Casagrande Date: Fri, 12 Jun 2026 12:01:33 -0700 Subject: [PATCH 4/9] fix(agentic): keep cache-bust markers stable for sessions continuing across phases Marker coherence across the WARMUP -> PROFILING boundary relied on both phases re-minting in identical positional order from per-phase counters. The phases mint different sets: WARMUP skips waiting_on_children states while PROFILING mints every continuation state, so under wrap-fill with mixed ready/waiting roots on a shared trace the continuing session's re-mint lands on a different pass number - its marker rotates mid-session at the phase boundary and the warmed KV prefix becomes unreachable for the measured turns. Move _recycle_pass/_session_marker to a CacheBustLedger on the shared TrajectorySource and reuse an already-minted marker for the same x_correlation_id. Continuing sessions now keep their WARMUP marker by identity rather than by mint-order replay, and the never-restarting pass counter guarantees fresh sessions cannot collide with warmed digests. Co-Authored-By: Claude Fable 5 Signed-off-by: Anthony Casagrande --- .../timing/strategies/agentic_replay.py | 47 +++++----- src/aiperf/timing/trajectory_source.py | 31 +++++++ .../timing/strategies/test_agentic_replay.py | 92 +++++++++++++++++++ 3 files changed, 149 insertions(+), 21 deletions(-) diff --git a/src/aiperf/timing/strategies/agentic_replay.py b/src/aiperf/timing/strategies/agentic_replay.py index 48d59b5b8..c986230a8 100644 --- a/src/aiperf/timing/strategies/agentic_replay.py +++ b/src/aiperf/timing/strategies/agentic_replay.py @@ -132,17 +132,18 @@ def __init__( # Cache-bust state. WARMUP and PROFILING construct distinct strategy # instances (PhaseRunner builds a fresh AgenticReplayStrategy per # phase), while the shared TrajectorySource keeps each sampled lane's - # x_correlation_id stable across the phase boundary. The MARKER text is - # also warmup-coherent: the digest is computed from - # ``(benchmark_id, recycle_pass, trajectory_index, trace_id)`` — - # phase-agnostic — so warmup turn k_i and profile turn k_i+1 get - # the same marker within the continued session. That preserves the - # KV-cache lineage warmup is meant to prime. - # trajectory_index is stable per "lane" (slot in the trajectory list) - # and reused on recycle, so the digest changes only across recycle - # passes for a given trace_id. - self._recycle_pass: dict[str, int] = {} - self._session_marker: dict[str, str | None] = {} + # x_correlation_id stable across the phase boundary AND carries the + # marker ledger across it. A session continuing into PROFILING reuses + # the exact marker minted for it during WARMUP (see + # ``_mint_marker_for_session``), so warmup turn k_i and profile turn + # k_i+1 share the same marker within the continued session - the + # KV-cache lineage warmup is meant to prime is preserved by identity, + # not by replaying mint order. New sessions draw from the shared + # ``recycle_pass`` counter, which never restarts, so a recycled + # session's digest can never collide with a warmed one. + ledger = conversation_source.cache_bust_ledger + self._recycle_pass: dict[str, int] = ledger.recycle_pass + self._session_marker: dict[str, str | None] = ledger.session_marker self._correlation_to_lane: dict[str, int] = {} self._cache_bust_target: CacheBustTarget = ( user_config.input.prompt.cache_bust.target @@ -679,22 +680,26 @@ def _build_turn_for_session( def _mint_marker_for_session( self, x_correlation_id: str, trace_id: str, trajectory_index: int ) -> str | None: - """Mint and store a per-session cache-bust marker. + """Mint (or reuse) and store a per-session cache-bust marker. Returns None when the feature is disabled (target=NONE), in which case the session map records None so callers can unconditionally look it up. Increments _recycle_pass[trace_id] each time a new session is minted for the same trace_id, so digest rotates across - recycles within a single phase. - - The strategy is constructed FRESH for each phase (per the - TimingStrategyProtocol contract; PhaseRunner builds a new instance for - WARMUP and another for PROFILING). Both phases start with empty - ``_recycle_pass``, so the first mint for a given trace_id in PROFILING - produces ``pass=0`` — matching WARMUP's pass=0 digest for the same - (trace_id, lane) pair. The shared ``TrajectorySource`` also preserves - the lane's x_correlation_id across the phase boundary. + recycles. + + Both ``_session_marker`` and ``_recycle_pass`` live on the shared + ``TrajectorySource`` ledger, surviving the WARMUP -> PROFILING + boundary (strategies are constructed fresh per phase). A session + whose x_correlation_id was already minted - a continuing lane + resuming at k_i+1 - keeps its WARMUP marker verbatim instead of + re-minting, so a continued session's digest can never rotate at the + phase boundary regardless of mint order. The pass counter never + restarts, so fresh sessions (recycles, parents unblocked after t*) + can never collide with a warmed digest. """ + if x_correlation_id in self._session_marker: + return self._session_marker[x_correlation_id] if self._cache_bust_target == CacheBustTarget.NONE: self._session_marker[x_correlation_id] = None return None diff --git a/src/aiperf/timing/trajectory_source.py b/src/aiperf/timing/trajectory_source.py index a6c6c576e..63aeb70ad 100644 --- a/src/aiperf/timing/trajectory_source.py +++ b/src/aiperf/timing/trajectory_source.py @@ -81,6 +81,24 @@ class Trajectory: ) +@dataclass(slots=True) +class CacheBustLedger: + """Cross-phase cache-bust marker state. + + Lives on the shared ``TrajectorySource`` (constructed once at + TimingManager level) so the WARMUP and PROFILING strategy instances see + one ledger. A session that continues across the phase boundary keeps the + exact marker minted for it during WARMUP, and new sessions draw pass + numbers from a counter that never restarts - so a recycled session's + digest can never collide with a warmed one. + """ + + recycle_pass: dict[str, int] = field(default_factory=dict) + """Next-pass counter per trace_id; incremented on every fresh mint.""" + session_marker: dict[str, str | None] = field(default_factory=dict) + """Minted marker per live x_correlation_id (None when cache-bust is off).""" + + @dataclass(slots=True, frozen=True) class _BranchRuntime: branch_id: str @@ -189,6 +207,19 @@ def __init__( self._log_trajectory_summary() + @property + def cache_bust_ledger(self) -> CacheBustLedger: + """Marker ledger shared by the WARMUP and PROFILING strategy instances. + + Created lazily so sources built through ``__new__`` in tests get a + ledger on first access without extra setup. + """ + ledger = getattr(self, "_cache_bust_ledger", None) + if ledger is None: + ledger = CacheBustLedger() + self._cache_bust_ledger = ledger + return ledger + def _log_trajectory_summary(self) -> None: """Log a one-block table of every trajectory's start position. diff --git a/tests/unit/timing/strategies/test_agentic_replay.py b/tests/unit/timing/strategies/test_agentic_replay.py index 54a1c852f..db4c48390 100644 --- a/tests/unit/timing/strategies/test_agentic_replay.py +++ b/tests/unit/timing/strategies/test_agentic_replay.py @@ -724,6 +724,98 @@ async def gated_issue_credit(turn): ) +@pytest.mark.asyncio +async def test_continuing_session_keeps_warmup_marker_across_phase_boundary(): + """A continued session's cache-bust marker must not rotate at the boundary. + + Under wrap-fill, two lanes share trace_X: lane 0's root is blocked on + children at t* (skipped by WARMUP dispatch) while lane 1's root is ready. + WARMUP therefore mints only lane 1; PROFILING mints both. Positional + re-minting hands lane 0 the pass=0 digest and bumps lane 1's continuing + session to pass=1 - the warmed KV prefix becomes unreachable for the + measured turns. The marker minted in WARMUP must be reused verbatim for + the same x_correlation_id in PROFILING. + """ + ds = _make_dataset(num_traces=1, turns_per_trace=3) + trajectories = [ + Trajectory( + conversation_id="trace_0", + start_turn_index=0, + snapshot=TrajectorySnapshot( + t_star_ms=0.0, + states=( + ConversationState( + conversation_id="trace_0", + x_correlation_id="A-root", + next_turn_index=2, + waiting_on_children=True, + join_target_turn_index=2, + ), + ), + ), + ), + Trajectory( + conversation_id="trace_0", + start_turn_index=0, + snapshot=TrajectorySnapshot( + t_star_ms=0.0, + states=( + ConversationState( + conversation_id="trace_0", + x_correlation_id="B-root", + next_turn_index=0, + ), + ), + ), + ), + ] + src = _build_real_trajectory_source(1, 3, trajectories, dataset=ds) + user_config = MagicMock() + user_config.input.prompt.cache_bust.target = CacheBustTarget.FIRST_TURN_PREFIX + user_config.benchmark_id = "bench" + + def _strategy_for(phase: CreditPhase, issuer: AsyncMock) -> AgenticReplayStrategy: + cfg = MagicMock() + cfg.phase = phase + return AgenticReplayStrategy( + config=cfg, + conversation_source=src, + scheduler=MagicMock(), + stop_checker=MagicMock(), + credit_issuer=issuer, + lifecycle=MagicMock(), + user_config=user_config, + ) + + warmup_issuer = AsyncMock() + warmup = _strategy_for(CreditPhase.WARMUP, warmup_issuer) + await warmup.setup_phase() + await warmup.execute_phase() + warmup_turns = [c.args[0] for c in warmup_issuer.issue_credit.await_args_list] + warmup_marker = warmup._session_marker["B-root"] + assert warmup_marker is not None + assert [t.cache_bust_marker for t in warmup_turns] == [warmup_marker] + + profiling_issuer = AsyncMock() + profiling = _strategy_for(CreditPhase.PROFILING, profiling_issuer) + await profiling.setup_phase() + await profiling.execute_phase() + profiling_turns = { + t.conversation_id: t + for c in profiling_issuer.issue_credit.await_args_list + for t in [c.args[0]] + if t.turn_index == 1 + } + continuing = profiling_turns["trace_0"] + assert continuing.cache_bust_marker == warmup_marker, ( + "Continuing session's marker rotated at the WARMUP->PROFILING " + "boundary - the warmed KV prefix is unreachable for measured turns" + ) + # The unblocked lane-0 parent is a distinct session and must NOT share + # the continuing session's digest. + assert profiling._session_marker["A-root"] != warmup_marker + + @pytest.mark.asyncio async def test_startup_recycle_does_not_pop_live_trajectory_trace(): """A startup recycle must not spawn a trace whose own lane hasn't dispatched. From 9479d72c53f1fb615ad56fb9e07413f89ce76b47 Mon Sep 17 00:00:00 2001 From: Anthony Casagrande Date: Fri, 12 Jun 2026 12:04:21 -0700 Subject: [PATCH 5/9] fix(dag): seed pre-t* completed prereqs as satisfied in snapshot joins _ensure_seeded_join pre-seeds every prereq key declared on a gated turn as unregistered, and seed_snapshot only registers keys for children still alive at t*. When two branch groups share one join turn and t* falls between their completions, the already-finished group's key could never register (PrereqState.is_done requires registered=True), so the gate was permanently unsatisfiable: the parent lane silently wedged for the entire phase, surfacing only as a misleading outstanding=0 abandoned-join warning at cleanup while has_pending_branch_work kept the phase-end drain waiting until the forced-completion timeout. Record each prereq's spawning turn in the index; snapshot seeding marks keys whose spawning turn fired before the parent's resume position as registered (expected=0 -> satisfied unless live children re-register). Keys whose spawning turn replays after t* stay unregistered so a gate can still not fire before its branch has spawned. Co-Authored-By: Claude Fable 5 Signed-off-by: Anthony Casagrande --- src/aiperf/timing/branch_orchestrator.py | 23 ++- tests/unit/timing/test_branch_orchestrator.py | 158 ++++++++++++++++++ 2 files changed, 180 insertions(+), 1 deletion(-) diff --git a/src/aiperf/timing/branch_orchestrator.py b/src/aiperf/timing/branch_orchestrator.py index 77b6bae3b..d87509d5a 100644 --- a/src/aiperf/timing/branch_orchestrator.py +++ b/src/aiperf/timing/branch_orchestrator.py @@ -258,6 +258,12 @@ def __init__( # early when one branch completes before another branch's spawning # turn has been reached. self._gated_turn_prereq_keys: dict[tuple[str, int], set[str]] = {} + # (conv_id, gated_turn_idx, prereq_key) -> spawning turn index. + # Snapshot seeding consults this to tell prereqs whose spawning turn + # already fired before t* (their children either appear live in the + # snapshot or completed entirely pre-t*) apart from prereqs whose + # spawning turn will fire during replay. + self._prereq_spawning_turn: dict[tuple[str, int, str], int] = {} # Defense-in-depth duplicate detection against future loaders that # bypass ``validate_for_orchestrator_v1``. A given # ``(branch_id, gated_turn_idx)`` tuple must not appear twice — that @@ -294,6 +300,9 @@ def _build_prereq_index(self) -> None: self._gated_turn_prereq_keys.setdefault( (conv.conversation_id, gated_idx), set() ).add(prereq_key) + self._prereq_spawning_turn[ + (conv.conversation_id, gated_idx, prereq_key) + ] = spawning_idx def get_branch_ids(self, credit) -> list[str]: """Look up the completed turn's ``branch_ids`` from metadata. @@ -514,7 +523,19 @@ def _ensure_seeded_join( for prereq_key in self._gated_turn_prereq_keys.get( (parent_state.conversation_id, gated_idx), set() ): - pending.outstanding[prereq_key] = PrereqState() + state = PrereqState() + spawning_idx = self._prereq_spawning_turn.get( + (parent_state.conversation_id, gated_idx, prereq_key) + ) + if spawning_idx is not None and spawning_idx < parent_state.next_turn_index: + # The spawning turn fired before t* and will never replay. + # Children still alive at t* re-register with expected + # counts during this same seeding pass; a branch with no + # live children completed entirely pre-t* and must seed as + # satisfied, or the gate is permanently unsatisfiable and + # the parent lane silently wedges for the whole phase. + state.registered = True + pending.outstanding[prereq_key] = state if ( parent_state.waiting_on_children diff --git a/tests/unit/timing/test_branch_orchestrator.py b/tests/unit/timing/test_branch_orchestrator.py index 597c88bf9..9145d8e0e 100644 --- a/tests/unit/timing/test_branch_orchestrator.py +++ b/tests/unit/timing/test_branch_orchestrator.py @@ -124,6 +124,164 @@ def get_metadata(self, conversation_id): assert released.gated_turn_index == 2 +def _two_branch_join_meta() -> tuple[ + ConversationMetadata, dict[str, ConversationMetadata] +]: + """Parent with branches A (spawned turn 1) and B (spawned turn 2), both + gating turn 3. Children are one-turn SPAWN conversations.""" + parent_meta = ConversationMetadata( + conversation_id="parent", + turns=[ + TurnMetadata(timestamp_ms=0.0), + TurnMetadata(timestamp_ms=10_000.0, branch_ids=["A"]), + TurnMetadata(timestamp_ms=20_000.0, branch_ids=["B"]), + TurnMetadata( + timestamp_ms=30_000.0, + prerequisites=[ + TurnPrerequisite(kind=PrerequisiteKind.SPAWN_JOIN, branch_id="A"), + TurnPrerequisite(kind=PrerequisiteKind.SPAWN_JOIN, branch_id="B"), + ], + ), + ], + branches=[ + ConversationBranchInfo( + branch_id="A", + child_conversation_ids=["a_child"], + mode=ConversationBranchMode.SPAWN, + start_timestamp_ms=11_000.0, + ), + ConversationBranchInfo( + branch_id="B", + child_conversation_ids=["b_child"], + mode=ConversationBranchMode.SPAWN, + start_timestamp_ms=21_000.0, + ), + ], + ) + children = { + cid: ConversationMetadata( + conversation_id=cid, + turns=[TurnMetadata(timestamp_ms=ts)], + is_root=False, + agent_depth=1, + parent_conversation_id="parent", + ) + for cid, ts in [("a_child", 11_000.0), ("b_child", 21_000.0)] + } + return parent_meta, children + + +def _source_for(parent_meta, children): + class _Source: + dataset_metadata = DatasetMetadata( + conversations=[parent_meta, *children.values()], + sampling_strategy=DatasetSamplingStrategy.SEQUENTIAL, + ) + + def get_metadata(self, conversation_id): + return {"parent": parent_meta, **children}[conversation_id] + + return _Source() + + +@pytest.mark.asyncio +async def test_seed_snapshot_prereq_completed_before_t_star_does_not_wedge_gate(): + """A prereq group that fully completed before t* must not wedge the join. + + Two branches share the turn-3 gate. t* falls between their completions: + branch A's child finished pre-t* (absent from the snapshot) while branch + B's child is live. A's spawning turn fired before the parent's resume + position and can never re-fire during replay, so its prereq key must be + seeded as satisfied - otherwise the gate is permanently unsatisfiable and + the parent lane silently wedges for the entire phase. + """ + parent_meta, children = _two_branch_join_meta() + issuer = MagicMock() + issuer.dispatch_join_turn = AsyncMock(return_value=True) + orch = BranchOrchestrator( + conversation_source=_source_for(parent_meta, children), credit_issuer=issuer + ) + states = ( + ConversationState( + conversation_id="parent", + x_correlation_id="parent-corr", + next_turn_index=3, + waiting_on_children=True, + join_target_turn_index=3, + ), + ConversationState( + conversation_id="b_child", + x_correlation_id="b-corr", + next_turn_index=0, + agent_depth=1, + parent_correlation_id="parent-corr", + join_target_turn_index=3, + branch_id="B", + branch_mode=ConversationBranchMode.SPAWN, + ), + ) + orch.seed_snapshot(states) + + pending = orch._active_joins["parent-corr"] + assert pending.outstanding["SPAWN_JOIN:A"].is_done, ( + "Branch A completed before t* but its prereq was seeded unsatisfiable" + ) + assert not pending.is_satisfied # B's live child is still outstanding. + + await orch.on_child_leaf_reached("b-corr") + + issuer.dispatch_join_turn.assert_awaited_once() + released = issuer.dispatch_join_turn.await_args.args[0] + assert released.parent_x_correlation_id == "parent-corr" + assert released.gated_turn_index == 3 + assert "parent-corr" not in orch._active_joins + + +@pytest.mark.asyncio +async def test_seed_snapshot_keeps_unfired_future_prereq_unregistered(): + """A prereq whose spawning turn replays after t* must stay unregistered. + + The parent resumes at turn 2 (B's spawning turn), so branch B will fire + during replay: seeding it as satisfied would release the gate before B's + children even spawn. Only branches whose spawning turn fired before the + parent's resume position may be auto-satisfied. + """ + parent_meta, children = _two_branch_join_meta() + issuer = MagicMock() + issuer.dispatch_join_turn = AsyncMock(return_value=True) + orch = BranchOrchestrator( + conversation_source=_source_for(parent_meta, children), credit_issuer=issuer + ) + states = ( + ConversationState( + conversation_id="parent", + x_correlation_id="parent-corr", + next_turn_index=2, + ), + ConversationState( + conversation_id="a_child", + x_correlation_id="a-corr", + next_turn_index=0, + agent_depth=1, + parent_correlation_id="parent-corr", + join_target_turn_index=3, + branch_id="A", + branch_mode=ConversationBranchMode.SPAWN, + ), + ) + orch.seed_snapshot(states) + + pending = orch._future_joins["parent-corr"][3] + assert pending.outstanding["SPAWN_JOIN:A"].expected == 1 + assert not pending.outstanding["SPAWN_JOIN:B"].registered + + await orch.on_child_leaf_reached("a-corr") + + # A is done but B has not even spawned yet: the gate must hold. + issuer.dispatch_join_turn.assert_not_awaited() + assert orch._future_joins["parent-corr"][3] is pending + + @pytest.mark.asyncio async def test_intercept_with_spawn_dispatches_children_and_registers_sticky(): """Phase 1 semantics: intercept returns False after a pure-spawn with no From 04318d6764a65c18a6df021008993bfc28423d20 Mon Sep 17 00:00:00 2001 From: Anthony Casagrande Date: Fri, 12 Jun 2026 12:13:33 -0700 Subject: [PATCH 6/9] fix(dag): stop double-dispatching the gated turn when zero children land When every child of a gated branch failed to start, the drained-gate path dispatched the gated turn via _release_blocked_join, but _maybe_suspend_parent then returned False (the gate was already popped), so the callback handler fell through to the strategy's normal continuation and dispatched the identical turn again - a duplicate request whose doubled returns propagate per-turn and end in the double-recycle RuntimeError at the trajectory's final turn. Pop drained gates silently instead: with the gate gone, intercept returns False and the strategy's continuation owns the single dispatch. No hang is reintroduced - the original Phase 0 deadlock came from leaving an unsatisfiable join registered, and the pop still prevents that. Tests that encoded the False+dispatched combination now assert single ownership. Co-Authored-By: Claude Fable 5 Signed-off-by: Anthony Casagrande --- src/aiperf/timing/branch_orchestrator.py | 27 ++++++------- .../timing/test_dag_timing_pathology.py | 16 +++++--- .../timing/test_branch_orchestrator_phase0.py | 39 +++++++++---------- 3 files changed, 40 insertions(+), 42 deletions(-) diff --git a/src/aiperf/timing/branch_orchestrator.py b/src/aiperf/timing/branch_orchestrator.py index d87509d5a..b5cdf07f5 100644 --- a/src/aiperf/timing/branch_orchestrator.py +++ b/src/aiperf/timing/branch_orchestrator.py @@ -783,22 +783,22 @@ async def _spawn_children_and_register_gates( self.stats.children_errored += 1 self.stats.children_spawned -= 1 - # If no children at all landed (all failed), check for gates that - # are now zero-outstanding and dispatch the gated turn immediately - # to avoid hanging the parent. + # If no children at all landed (all failed), pop gates that are now + # zero-outstanding so the parent is not left suspended on a join + # that can never fire via the child-leaf decrement path. gates_for_parent = self._future_joins.get(parent_corr, {}) - drained_gates: list[PendingBranchJoin] = [] for gated_idx, pending in list(gates_for_parent.items()): # A gate may be vestigial (created this call and immediately # satisfied) if every child under every prereq rolled back. if pending.is_satisfied: - # Only fire NOW if the gate is the parent's IMMEDIATE next - # turn. A delayed gate (intervening turns precede it) must not - # dispatch out of order: pop it silently and let the parent - # advance turns normally; when it reaches the (now un-gated) - # turn, the strategy sends it as an ordinary continuation. - if gated_idx == credit.turn_index + 1: - drained_gates.append(pending) + # Pop silently regardless of position. With the gate gone, + # _maybe_suspend_parent returns False and the strategy's + # normal continuation dispatches the (now un-gated) turn as + # an ordinary next turn - exactly once. Dispatching the + # immediate-next gate here as well double-dispatched the + # same turn: intercept still returned False, so the callback + # handler fell through to handle_credit_return -> + # _dispatch_next_turn for the identical turn_index. self._pop_future_join(parent_corr, gated_idx) # If no successful children AND no gated turns, release the # reserved parent state so the parent can drain. @@ -820,11 +820,6 @@ async def _spawn_children_and_register_gates( del self._descendant_counts[parent_corr] self._notify_drain() # all-children-rolled-back path: no credit return follows - for pending in drained_gates: - # Zero-outstanding gate with no way to fire via child-leaf - # decrement: dispatch immediately (matches Phase 0 hang-fix). - await self._release_blocked_join(pending) - def _ensure_future_join( self, credit, diff --git a/tests/component_integration/timing/test_dag_timing_pathology.py b/tests/component_integration/timing/test_dag_timing_pathology.py index f54196314..349760940 100644 --- a/tests/component_integration/timing/test_dag_timing_pathology.py +++ b/tests/component_integration/timing/test_dag_timing_pathology.py @@ -656,7 +656,9 @@ async def test_orchestrator_zero_child_branch_with_gate_does_not_hang() -> None: SPAWN_JOIN against it. The orchestrator's expected_gates path must create a future-join with an unregistered PrereqState seed (from _gated_turn_prereq_keys) AND mark it registered with expected=0 — so - is_done is True and the gate does NOT block the parent.""" + is_done is True and the gate does NOT block the parent. The drained + gate is popped silently: intercept returns False and the strategy's + normal continuation owns the single dispatch of the gated turn.""" branch = ConversationBranchInfo( branch_id="root:0", child_conversation_ids=[], # zero children @@ -681,12 +683,14 @@ async def test_orchestrator_zero_child_branch_with_gate_does_not_hang() -> None: orch = BranchOrchestrator(conversation_source=cs, credit_issuer=issuer) s = await orch.intercept(_mk_credit("root", "p", turn_index=0, num_turns=2)) - # No children -> the expected_gates path fires the join immediately, so - # by the time intercept returns the gate has been drained and the parent - # is NOT suspended. + # No children -> the gate drains at spawn time and is popped silently, so + # by the time intercept returns the parent is NOT suspended and the + # strategy's continuation path dispatches turn 1 exactly once. assert s is False, "zero-child branch must not deadlock parent at next turn" - issuer.dispatch_join_turn.assert_awaited_once() - assert orch.stats.parents_resumed == 1 + issuer.dispatch_join_turn.assert_not_awaited() + assert "p" not in orch._active_joins + assert not orch._future_joins.get("p") + assert orch.stats.parents_resumed == 0 assert orch.stats.parents_suspended == 0 diff --git a/tests/unit/timing/test_branch_orchestrator_phase0.py b/tests/unit/timing/test_branch_orchestrator_phase0.py index 6e53f4e23..01c82283a 100644 --- a/tests/unit/timing/test_branch_orchestrator_phase0.py +++ b/tests/unit/timing/test_branch_orchestrator_phase0.py @@ -8,9 +8,11 @@ - ``dispatch_join_turn`` propagates ``parent_branch_mode`` and ``parent_has_forks_on_gated_turn`` from :class:`PendingBranchJoin` instead of hardcoding FORK. -- ``BranchOrchestrator.intercept`` dispatches the gated join turn immediately - when every ``start_branch_child`` call fails (no children landed), instead - of registering a dead pending join that hangs the parent. +- ``BranchOrchestrator.intercept`` pops the drained gate silently and returns + False when every ``start_branch_child`` call fails (no children landed), + so the strategy's normal continuation dispatches the gated turn exactly + once - neither hanging the parent on a dead pending join nor + double-dispatching it from the orchestrator. """ from __future__ import annotations @@ -122,11 +124,14 @@ async def _try_issue(turn: TurnToSend) -> bool: @pytest.mark.asyncio -async def test_intercept_all_children_failed_with_gate_does_not_hang(): +async def test_intercept_all_children_failed_defers_gated_turn_to_strategy(): """When every ``start_branch_child`` raises on a parent turn whose next - turn is gated, the future join has zero outstanding children and - would never fire via the child-leaf decrement path. The orchestrator - must dispatch the gated turn immediately.""" + turn is gated, the future join has zero outstanding children and would + never fire via the child-leaf decrement path. The drained gate must be + popped silently and intercept must return False: the strategy's normal + continuation then dispatches turn k+1 exactly once. Dispatching the join + here AND returning False sent the same turn twice (the callback handler + falls through to handle_credit_return -> _dispatch_next_turn).""" branch = ConversationBranchInfo( branch_id="root:0", child_conversation_ids=["a", "b"], @@ -163,25 +168,19 @@ async def test_intercept_all_children_failed_with_gate_does_not_hang(): branch_mode=ConversationBranchMode.FORK, ) - # No children landed; the gate was drained at spawn time and the join - # fired immediately (not deferred). Parent's next turn is turn 1 but - # intercept returns False because the join already dispatched (the - # future/active join entries are gone). + # All children errored before any landed: the gate is "satisfied" with + # zero outstanding, popped silently, and the parent is not suspended. result = await orch.intercept(credit) - # Since all children errored before any landed, the gate was "satisfied" - # with zero outstanding and dispatched immediately. No suspension. assert result is False - # Gated turn dispatched exactly once. - assert issuer.dispatch_join_turn.await_count == 1 - dispatched_pending = issuer.dispatch_join_turn.await_args.args[0] - assert dispatched_pending.gated_turn_index == 1 - assert dispatched_pending.total_outstanding == 0 + # The single dispatch of turn 1 belongs to the strategy's continuation + # path; the orchestrator must not issue the join turn itself. + issuer.dispatch_join_turn.assert_not_awaited() - # No leaked per-parent state. + # No leaked per-parent state to wedge phase-end draining. assert "root-corr" not in orch._active_joins assert "root-corr" not in orch._future_joins assert "root-corr" not in orch._descendant_counts - assert orch.stats.parents_resumed == 1 + assert orch.stats.parents_resumed == 0 assert orch.stats.children_errored == 2 assert orch.stats.children_spawned == 0 From d9683ad4855ac958092de5370974532830e5a37d Mon Sep 17 00:00:00 2001 From: Anthony Casagrande Date: Fri, 12 Jun 2026 12:13:44 -0700 Subject: [PATCH 7/9] test: track AgentX scenario idle-gap cap rename in CLI e2e assertion Commit 932b4bc switched the inferencex-agentx-mvp scenario from inter_turn_delay_cap_seconds to trace_idle_gap_cap_seconds but the e2e log assertion still expected the old auto-set line, failing the test on every run since. Co-Authored-By: Claude Fable 5 Signed-off-by: Anthony Casagrande --- tests/component_integration/test_agentic_replay_cli_e2e.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/component_integration/test_agentic_replay_cli_e2e.py b/tests/component_integration/test_agentic_replay_cli_e2e.py index 4f0ef9874..2b4703d79 100644 --- a/tests/component_integration/test_agentic_replay_cli_e2e.py +++ b/tests/component_integration/test_agentic_replay_cli_e2e.py @@ -253,8 +253,10 @@ def test_agentic_replay_cli_scenario_unsafe_override_runs_to_completion( "validator must log timing_mode auto-set under --scenario " "(covers the read-only-property setter path against real UserConfig)" ) - assert "auto-set --inter-turn-delay-cap-seconds=60.0" in log_text, ( - "validator must auto-set inter-turn-delay-cap when unset" + assert "auto-set --trace-idle-gap-cap-seconds=60.0" in log_text, ( + "validator must auto-set the per-trace idle-gap cap when unset " + "(the AgentX scenario locks trace_idle_gap_cap_seconds, not the " + "inter-turn delay cap, since 932b4bc)" ) assert result.json is not None, "JSON export must exist" From d57156110aac9759ab3ae7c7012cc4cfb2747f76 Mon Sep 17 00:00:00 2001 From: Anthony Casagrande Date: Fri, 12 Jun 2026 12:17:02 -0700 Subject: [PATCH 8/9] test(agentic): harden profiling-startup concurrency regression tests - Assert the N-at-the-gate count is produced by exactly N distinct turn-0 sessions (await_count + distinct correlation ids), so the assertion cannot be satisfied by a double-dispatching lane masking a lost one. - Bound task completion with wait_for so a wedge regression fails in 5s instead of hanging to the global timeout. - Add a companion test covering the snapshot-less k_i+1 resume branch, which previously had no concurrency coverage (verified to fail against a serial dispatch loop). - Rebuild on the file's _make_strategy/_build_real_trajectory_source helpers, dropping the inline TrajectorySource.__new__ copy, the dead max_in_flight/seed/target-size setup, and the wrong queue-depth comment (N traces suffice for N concurrent pops; depth is conserved). - Match _dispatch_one_profiling_trajectory's parameter order to its sibling (trajectory, lane), drop the now-pointless default-arg capture ceremony in the debug lambda, and add the missing docstring. Co-Authored-By: Claude Fable 5 Signed-off-by: Anthony Casagrande --- .../timing/strategies/agentic_replay.py | 19 +-- .../timing/strategies/test_agentic_replay.py | 115 +++++++++++------- 2 files changed, 80 insertions(+), 54 deletions(-) diff --git a/src/aiperf/timing/strategies/agentic_replay.py b/src/aiperf/timing/strategies/agentic_replay.py index c986230a8..cffa3acc5 100644 --- a/src/aiperf/timing/strategies/agentic_replay.py +++ b/src/aiperf/timing/strategies/agentic_replay.py @@ -265,9 +265,9 @@ async def _execute_profiling(self) -> None: """Resume each trajectory at ``k_i + 1`` to seed the steady state. All trajectories are dispatched concurrently so the full concurrency - target is reached immediately rather than serializing over N credit - round-trips. Subsequent turns and recycle-pool sessions are dispatched - from handle_credit_return. + target is reached as fast as slot limits allow, rather than + serializing over N credit round-trips. Subsequent turns and + recycle-pool sessions are dispatched from handle_credit_return. """ self.info( f"PROFILING execute: resuming {len(self.conversation_source.trajectories)} " @@ -279,7 +279,7 @@ async def _execute_profiling(self) -> None: # unreachable by the phase runner's cancellation. results = await asyncio.gather( *( - self._dispatch_one_profiling_trajectory(lane, trajectory) + self._dispatch_one_profiling_trajectory(trajectory, lane) for lane, trajectory in enumerate(self.conversation_source.trajectories) ), return_exceptions=True, @@ -299,8 +299,9 @@ async def _execute_profiling(self) -> None: raise first_error async def _dispatch_one_profiling_trajectory( - self, lane: int, trajectory: Trajectory + self, trajectory: Trajectory, lane: int ) -> None: + """Dispatch one lane's initial PROFILING credit (run under gather).""" if trajectory.snapshot is not None: await self._dispatch_snapshot_for_profiling(trajectory, lane) return @@ -318,9 +319,11 @@ async def _dispatch_one_profiling_trajectory( # Trajectory's k_i was already the last turn (rare: happens # only for very short traces). Skip directly to recycle. self.debug( - lambda cid=trajectory.conversation_id, - k=trajectory.start_turn_index, - n=num_turns: f"Trajectory {cid} k_i={k} >= last turn (n={n}); recycling immediately" + lambda: ( + f"Trajectory {trajectory.conversation_id} " + f"k_i={trajectory.start_turn_index} >= last turn " + f"(n={num_turns}); recycling immediately" + ) ) await self._spawn_from_recycle_or_id( trajectory.conversation_id, diff --git a/tests/unit/timing/strategies/test_agentic_replay.py b/tests/unit/timing/strategies/test_agentic_replay.py index db4c48390..18cdd644f 100644 --- a/tests/unit/timing/strategies/test_agentic_replay.py +++ b/tests/unit/timing/strategies/test_agentic_replay.py @@ -624,34 +624,21 @@ async def capture(turn): async def test_terminal_root_snapshot_recycles_are_concurrent_not_serial(): """Regression: terminal-root recycles must not serialize profiling startup. - Commit f47bd5537e introduced `await _spawn_from_recycle_or_id(...)` inside - the per-trajectory loop in _dispatch_snapshot_for_profiling. With N terminal- - root trajectories this blocked the Kth dispatch until the (K-1)th recycle - completed, causing all 256 sessions to trickle in over ~54 s instead of - bursting at t=0 on a real cluster. + Commit f47bd5537e introduced an awaited recycle per trajectory in the + startup dispatch loop. With N terminal-root trajectories this blocked the + Kth dispatch until the (K-1)th recycle completed, causing all 256 sessions + to trickle in over ~54 s instead of bursting at t=0 on a real cluster. """ N = 3 # N single-turn traces become terminal roots: snapshot root is at turn 0 # (the only turn), so _snapshot_continuation_after_warmup drops the state # (resume_index=1 >= len(turns)=1) and it lands in terminal_roots. - # Extra traces give the recycle queue enough depth for N concurrent pops. - conversations = [ - ConversationMetadata( - conversation_id=f"trace_{i}", - turns=[TurnMetadata(timestamp_ms=float(i * 1000))], - ) - for i in range(N * 2) - ] - ds = DatasetMetadata( - conversations=conversations, - sampling_strategy=DatasetSamplingStrategy.SEQUENTIAL, - ) trajectories = [ Trajectory( conversation_id=f"trace_{i}", start_turn_index=0, snapshot=TrajectorySnapshot( - t_star_ms=float(i * 1000), + t_star_ms=0.0, states=( ConversationState( conversation_id=f"trace_{i}", @@ -664,44 +651,29 @@ async def test_terminal_root_snapshot_recycles_are_concurrent_not_serial(): for i in range(N) ] - src = TrajectorySource.__new__(TrajectorySource) - src._dataset_metadata = ds - src._dataset_sampler = MagicMock() - src._metadata_lookup = {c.conversation_id: c for c in conversations} - src._random_seed = 0 - src._target_size = N - src.trajectories = trajectories - - # Gate that blocks each recycled credit until we release it. - # in_flight tracks how many issue_credit calls are simultaneously blocked - # at the gate — if serial only 1 is ever in-flight, if concurrent all N are. + # Gate that blocks each recycled credit until we release it. in_flight + # counts issue_credit calls simultaneously blocked at the gate - if + # serial only 1 is ever in-flight, if concurrent all N are. gate = asyncio.Event() in_flight = 0 - max_in_flight = 0 + captured: list = [] async def gated_issue_credit(turn): - nonlocal in_flight, max_in_flight + nonlocal in_flight in_flight += 1 - max_in_flight = max(max_in_flight, in_flight) + captured.append(turn) await gate.wait() in_flight -= 1 return True issuer = AsyncMock() issuer.issue_credit.side_effect = gated_issue_credit - stop_checker = MagicMock() - stop_checker.can_start_new_session.return_value = True - cfg = MagicMock() - cfg.phase = CreditPhase.PROFILING - cfg.concurrency = N - - strategy = AgenticReplayStrategy( - config=cfg, - conversation_source=src, - scheduler=MagicMock(), - stop_checker=stop_checker, - credit_issuer=issuer, - lifecycle=MagicMock(), + strategy, _, _, _ = _make_strategy( + phase=CreditPhase.PROFILING, + trajectories=trajectories, + num_traces=N, + turns_per_trace=1, + issuer=issuer, ) await strategy.setup_phase() @@ -715,13 +687,64 @@ async def gated_issue_credit(turn): concurrent_at_gate = in_flight gate.set() - await task + await asyncio.wait_for(task, timeout=5.0) assert concurrent_at_gate == N, ( f"Expected {N} recycled credits in-flight simultaneously at profiling " f"startup, but only {concurrent_at_gate} were. Terminal-root recycles " "appear to be dispatched serially, blocking the startup loop." ) + # Exactly one recycled turn-0 session per lane, each a distinct session - + # N-at-the-gate must not be satisfiable by the wrong N credits. + assert issuer.issue_credit.await_count == N + assert all(turn.turn_index == 0 for turn in captured) + assert len({turn.x_correlation_id for turn in captured}) == N + + +@pytest.mark.asyncio +async def test_plain_trajectory_resumes_are_concurrent_not_serial(): + """The k_i+1 resume path (timestamp-less trajectories) must burst at t=0. + + Companion to the terminal-root regression test: a refactor that + re-serializes only the snapshot-less resume dispatch would otherwise + ship green. + """ + N = 3 + trajectories = [ + Trajectory(conversation_id=f"trace_{i}", start_turn_index=0) for i in range(N) + ] + gate = asyncio.Event() + in_flight = 0 + + async def gated_issue_credit(turn): + nonlocal in_flight + in_flight += 1 + await gate.wait() + in_flight -= 1 + return True + + issuer = AsyncMock() + issuer.issue_credit.side_effect = gated_issue_credit + strategy, _, _, _ = _make_strategy( + phase=CreditPhase.PROFILING, + trajectories=trajectories, + num_traces=N, + turns_per_trace=4, + issuer=issuer, + ) + await strategy.setup_phase() + task = asyncio.create_task(strategy.execute_phase()) + for _ in range(N + 4): + await asyncio.sleep(0) + + concurrent_at_gate = in_flight + gate.set() + await asyncio.wait_for(task, timeout=5.0) + + assert concurrent_at_gate == N, ( + f"Expected {N} resume credits in-flight simultaneously at profiling " + f"startup, but only {concurrent_at_gate} were." + ) @pytest.mark.asyncio From 92026e5028800ccb33ae3819440cbe549478e335 Mon Sep 17 00:00:00 2001 From: Anthony Casagrande Date: Fri, 12 Jun 2026 12:28:29 -0700 Subject: [PATCH 9/9] test: make overflow-gate exporter helper event-loop independent _export_and_load_sync used asyncio.get_event_loop().run_until_complete, which raises 'no current event loop' on Python 3.12 whenever the xdist worker previously ran an in-process CLI test (e.g. the agentic-replay e2e) that consumed the main thread's loop. The failure only appeared when test scheduling co-located the two files, making the suite flaky under -n auto. asyncio.run creates a fresh loop per call. Co-Authored-By: Claude Fable 5 Signed-off-by: Anthony Casagrande --- .../test_context_overflow_runtime_gate.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/component_integration/test_context_overflow_runtime_gate.py b/tests/component_integration/test_context_overflow_runtime_gate.py index a07de5587..ed43732d9 100644 --- a/tests/component_integration/test_context_overflow_runtime_gate.py +++ b/tests/component_integration/test_context_overflow_runtime_gate.py @@ -66,7 +66,10 @@ def _export_and_load_sync(aggregate: AggregateResult, tmp_path: Path) -> dict: config = AggregateExporterConfig(result=aggregate, output_dir=tmp_path) exporter = AggregateConfidenceJsonExporter(config) - out_path = asyncio.get_event_loop().run_until_complete(exporter.export()) + # asyncio.run creates a fresh loop: get_event_loop() raises on 3.12 when + # a previously-run in-process CLI test (e.g. the agentic-replay e2e) has + # consumed the main thread's loop and the xdist worker reuses the process. + out_path = asyncio.run(exporter.export()) with open(out_path) as f: return json.load(f)