diff --git a/src/aiperf/timing/branch_orchestrator.py b/src/aiperf/timing/branch_orchestrator.py index 77b6bae3b4..b5cdf07f51 100644 --- a/src/aiperf/timing/branch_orchestrator.py +++ b/src/aiperf/timing/branch_orchestrator.py @@ -258,6 +258,12 @@ def __init__( # early when one branch completes before another branch's spawning # turn has been reached. self._gated_turn_prereq_keys: dict[tuple[str, int], set[str]] = {} + # (conv_id, gated_turn_idx, prereq_key) -> spawning turn index. + # Snapshot seeding consults this to tell prereqs whose spawning turn + # already fired before t* (their children either appear live in the + # snapshot or completed entirely pre-t*) apart from prereqs whose + # spawning turn will fire during replay. + self._prereq_spawning_turn: dict[tuple[str, int, str], int] = {} # Defense-in-depth duplicate detection against future loaders that # bypass ``validate_for_orchestrator_v1``. A given # ``(branch_id, gated_turn_idx)`` tuple must not appear twice — that @@ -294,6 +300,9 @@ def _build_prereq_index(self) -> None: self._gated_turn_prereq_keys.setdefault( (conv.conversation_id, gated_idx), set() ).add(prereq_key) + self._prereq_spawning_turn[ + (conv.conversation_id, gated_idx, prereq_key) + ] = spawning_idx def get_branch_ids(self, credit) -> list[str]: """Look up the completed turn's ``branch_ids`` from metadata. @@ -514,7 +523,19 @@ def _ensure_seeded_join( for prereq_key in self._gated_turn_prereq_keys.get( (parent_state.conversation_id, gated_idx), set() ): - pending.outstanding[prereq_key] = PrereqState() + state = PrereqState() + spawning_idx = self._prereq_spawning_turn.get( + (parent_state.conversation_id, gated_idx, prereq_key) + ) + if spawning_idx is not None and spawning_idx < parent_state.next_turn_index: + # The spawning turn fired before t* and will never replay. 
+ # Children still alive at t* re-register with expected + # counts during this same seeding pass; a branch with no + # live children completed entirely pre-t* and must seed as + # satisfied, or the gate is permanently unsatisfiable and + # the parent lane silently wedges for the whole phase. + state.registered = True + pending.outstanding[prereq_key] = state if ( parent_state.waiting_on_children @@ -762,22 +783,22 @@ async def _spawn_children_and_register_gates( self.stats.children_errored += 1 self.stats.children_spawned -= 1 - # If no children at all landed (all failed), check for gates that - # are now zero-outstanding and dispatch the gated turn immediately - # to avoid hanging the parent. + # If no children at all landed (all failed), pop gates that are now + # zero-outstanding so the parent is not left suspended on a join + # that can never fire via the child-leaf decrement path. gates_for_parent = self._future_joins.get(parent_corr, {}) - drained_gates: list[PendingBranchJoin] = [] for gated_idx, pending in list(gates_for_parent.items()): # A gate may be vestigial (created this call and immediately # satisfied) if every child under every prereq rolled back. if pending.is_satisfied: - # Only fire NOW if the gate is the parent's IMMEDIATE next - # turn. A delayed gate (intervening turns precede it) must not - # dispatch out of order: pop it silently and let the parent - # advance turns normally; when it reaches the (now un-gated) - # turn, the strategy sends it as an ordinary continuation. - if gated_idx == credit.turn_index + 1: - drained_gates.append(pending) + # Pop silently regardless of position. With the gate gone, + # _maybe_suspend_parent returns False and the strategy's + # normal continuation dispatches the (now un-gated) turn as + # an ordinary next turn - exactly once. 
Dispatching the + # immediate-next gate here as well double-dispatched the + # same turn: intercept still returned False, so the callback + # handler fell through to handle_credit_return -> + # _dispatch_next_turn for the identical turn_index. self._pop_future_join(parent_corr, gated_idx) # If no successful children AND no gated turns, release the # reserved parent state so the parent can drain. @@ -799,11 +820,6 @@ async def _spawn_children_and_register_gates( del self._descendant_counts[parent_corr] self._notify_drain() # all-children-rolled-back path: no credit return follows - for pending in drained_gates: - # Zero-outstanding gate with no way to fire via child-leaf - # decrement: dispatch immediately (matches Phase 0 hang-fix). - await self._release_blocked_join(pending) - def _ensure_future_join( self, credit, diff --git a/src/aiperf/timing/strategies/agentic_replay.py b/src/aiperf/timing/strategies/agentic_replay.py index fe59dab462..cffa3acc53 100644 --- a/src/aiperf/timing/strategies/agentic_replay.py +++ b/src/aiperf/timing/strategies/agentic_replay.py @@ -132,17 +132,18 @@ def __init__( # Cache-bust state. WARMUP and PROFILING construct distinct strategy # instances (PhaseRunner builds a fresh AgenticReplayStrategy per # phase), while the shared TrajectorySource keeps each sampled lane's - # x_correlation_id stable across the phase boundary. The MARKER text is - # also warmup-coherent: the digest is computed from - # ``(benchmark_id, recycle_pass, trajectory_index, trace_id)`` — - # phase-agnostic — so warmup turn k_i and profile turn k_i+1 get - # the same marker within the continued session. That preserves the - # KV-cache lineage warmup is meant to prime. - # trajectory_index is stable per "lane" (slot in the trajectory list) - # and reused on recycle, so the digest changes only across recycle - # passes for a given trace_id. 
- self._recycle_pass: dict[str, int] = {} - self._session_marker: dict[str, str | None] = {} + # x_correlation_id stable across the phase boundary AND carries the + # marker ledger across it. A session continuing into PROFILING reuses + # the exact marker minted for it during WARMUP (see + # ``_mint_marker_for_session``), so warmup turn k_i and profile turn + # k_i+1 share the same marker within the continued session - the + # KV-cache lineage warmup is meant to prime is preserved by identity, + # not by replaying mint order. New sessions draw from the shared + # ``recycle_pass`` counter, which never restarts, so a recycled + # session's digest can never collide with a warmed one. + ledger = conversation_source.cache_bust_ledger + self._recycle_pass: dict[str, int] = ledger.recycle_pass + self._session_marker: dict[str, str | None] = ledger.session_marker self._correlation_to_lane: dict[str, int] = {} self._cache_bust_target: CacheBustTarget = ( user_config.input.prompt.cache_bust.target @@ -176,10 +177,12 @@ async def setup_phase(self) -> None: TimingManager construction time. PROFILING: build the FIFO recycle queue with the FULL set of loader - trace_ids (including trajectory ids). Trajectories run live at - PROFILING start (resumed at k_i+1); the pop loop in - ``_spawn_from_recycle_or_id`` skips trace_ids whose session is - currently active so we never spawn a duplicate concurrent session. + trace_ids (including trajectory ids), and pre-register every live + trajectory lane in ``_active_traces``. Trajectories run live at + PROFILING start (resumed at k_i+1); pre-registering them here - + rather than lane-by-lane during dispatch - means a lane that + recycles immediately at startup can never pop a trace whose own + lane simply hasn't dispatched yet (a duplicate concurrent session). 
""" if self.config.phase == CreditPhase.PROFILING: if not self.conversation_source.trajectories: @@ -188,6 +191,7 @@ async def setup_phase(self) -> None: "WARMUP must complete with at least one trajectory before " "PROFILING can start. Check loader output and warmup failures." ) + self._active_traces.update(self._lanes_per_trace) self._recycle_queue = asyncio.Queue() # Recycle pool spans the FULL dataset, not (full - trajectories). # Trajectories run live at PROFILING start (resumed at k_i+1) and @@ -260,43 +264,75 @@ async def _execute_warmup(self) -> None: async def _execute_profiling(self) -> None: """Resume each trajectory at ``k_i + 1`` to seed the steady state. - Subsequent turns and recycle-pool sessions are dispatched from - handle_credit_return. + All trajectories are dispatched concurrently so the full concurrency + target is reached as fast as slot limits allow, rather than + serializing over N credit round-trips. Subsequent turns and + recycle-pool sessions are dispatched from handle_credit_return. """ self.info( f"PROFILING execute: resuming {len(self.conversation_source.trajectories)} " f"trajectory sessions" ) - for lane, trajectory in enumerate(self.conversation_source.trajectories): - if trajectory.snapshot is not None: - await self._dispatch_snapshot_for_profiling(trajectory, lane) + # return_exceptions=True keeps ownership of every lane until it + # settles: a bare gather would re-raise the first failure while the + # sibling coroutines keep issuing credits into a failing phase, + # unreachable by the phase runner's cancellation. 
+ results = await asyncio.gather( + *( + self._dispatch_one_profiling_trajectory(trajectory, lane) + for lane, trajectory in enumerate(self.conversation_source.trajectories) + ), + return_exceptions=True, + ) + first_error: BaseException | None = None + for lane, result in enumerate(results): + if not isinstance(result, BaseException): continue - - session = self.conversation_source.session_for(trajectory) - self._correlation_to_lane[session.x_correlation_id] = lane - self._active_traces[trajectory.conversation_id] += 1 - self._mint_marker_for_session( - session.x_correlation_id, trajectory.conversation_id, lane + trace_id = self.conversation_source.trajectories[lane].conversation_id + self.error( + f"PROFILING dispatch failed for lane {lane} " + f"(trace_id={trace_id!r}): {result!r}" ) - resume_index = trajectory.start_turn_index + 1 - num_turns = len(session.metadata.turns) - - if resume_index >= num_turns: - # Trajectory's k_i was already the last turn (rare: happens - # only for very short traces). Skip directly to recycle. - self.debug( - lambda cid=trajectory.conversation_id, - k=trajectory.start_turn_index, - n=num_turns: f"Trajectory {cid} k_i={k} >= last turn (n={n}); recycling immediately" - ) - await self._spawn_from_recycle_or_id( - trajectory.conversation_id, - finished_correlation_id=session.x_correlation_id, + if first_error is None: + first_error = result + if first_error is not None: + raise first_error + + async def _dispatch_one_profiling_trajectory( + self, trajectory: Trajectory, lane: int + ) -> None: + """Dispatch one lane's initial PROFILING credit (run under gather).""" + if trajectory.snapshot is not None: + await self._dispatch_snapshot_for_profiling(trajectory, lane) + return + + session = self.conversation_source.session_for(trajectory) + self._correlation_to_lane[session.x_correlation_id] = lane + # The lane's trace was pre-registered in _active_traces by setup_phase. 
+ self._mint_marker_for_session( + session.x_correlation_id, trajectory.conversation_id, lane + ) + resume_index = trajectory.start_turn_index + 1 + num_turns = len(session.metadata.turns) + + if resume_index >= num_turns: + # Trajectory's k_i was already the last turn (rare: happens + # only for very short traces). Skip directly to recycle. + self.debug( + lambda: ( + f"Trajectory {trajectory.conversation_id} " + f"k_i={trajectory.start_turn_index} >= last turn " + f"(n={num_turns}); recycling immediately" ) - continue + ) + await self._spawn_from_recycle_or_id( + trajectory.conversation_id, + finished_correlation_id=session.x_correlation_id, + ) + return - turn = self._build_turn_for_session(session, resume_index) - await self.credit_issuer.issue_credit(turn) + turn = self._build_turn_for_session(session, resume_index) + await self.credit_issuer.issue_credit(turn) async def handle_credit_return( self, credit: Credit, *, error: str | None = None @@ -396,8 +432,9 @@ async def _spawn_from_recycle_or_id( The initial recycle queue spans the full dataset pool (including trajectory trace_ids whose sessions are running live at PROFILING - start). The pop loop skips trace_ids in ``_active_traces`` and - re-enqueues them to avoid duplicate concurrent sessions. + start; every live lane is pre-registered in ``_active_traces`` by + ``setup_phase``). The pop loop skips trace_ids in ``_active_traces`` + and re-enqueues them to avoid duplicate concurrent sessions. """ # Prune unconditionally so every early-return path leaves dicts clean. self._session_marker.pop(finished_correlation_id, None) @@ -452,10 +489,10 @@ async def _dispatch_snapshot_for_profiling( ) -> None: warmup_snapshot = self._get_snapshot(trajectory) snapshot = self._snapshot_continuation_after_warmup(trajectory) + # Each lane's single root session (continuing or terminal) was + # pre-registered in _active_traces by setup_phase. 
for state in snapshot.states: self._correlation_to_lane[state.x_correlation_id] = lane - if state.agent_depth == 0: - self._active_traces[state.conversation_id] += 1 self._mint_marker_for_session( state.x_correlation_id, state.conversation_id, lane ) @@ -472,7 +509,6 @@ async def _dispatch_snapshot_for_profiling( ] for state in terminal_roots: self._correlation_to_lane[state.x_correlation_id] = lane - self._active_traces[state.conversation_id] += 1 self._mint_marker_for_session( state.x_correlation_id, state.conversation_id, lane ) @@ -647,22 +683,26 @@ def _build_turn_for_session( def _mint_marker_for_session( self, x_correlation_id: str, trace_id: str, trajectory_index: int ) -> str | None: - """Mint and store a per-session cache-bust marker. + """Mint (or reuse) and store a per-session cache-bust marker. Returns None when the feature is disabled (target=NONE), in which case the session map records None so callers can unconditionally look it up. Increments _recycle_pass[trace_id] each time a new session is minted for the same trace_id, so digest rotates across - recycles within a single phase. - - The strategy is constructed FRESH for each phase (per the - TimingStrategyProtocol contract; PhaseRunner builds a new instance for - WARMUP and another for PROFILING). Both phases start with empty - ``_recycle_pass``, so the first mint for a given trace_id in PROFILING - produces ``pass=0`` — matching WARMUP's pass=0 digest for the same - (trace_id, lane) pair. The shared ``TrajectorySource`` also preserves - the lane's x_correlation_id across the phase boundary. + recycles. + + Both ``_session_marker`` and ``_recycle_pass`` live on the shared + ``TrajectorySource`` ledger, surviving the WARMUP -> PROFILING + boundary (strategies are constructed fresh per phase). 
A session + whose x_correlation_id was already minted - a continuing lane + resuming at k_i+1 - keeps its WARMUP marker verbatim instead of + re-minting, so a continued session's digest can never rotate at the + phase boundary regardless of mint order. The pass counter never + restarts, so fresh sessions (recycles, parents unblocked after t*) + can never collide with a warmed digest. """ + if x_correlation_id in self._session_marker: + return self._session_marker[x_correlation_id] if self._cache_bust_target == CacheBustTarget.NONE: self._session_marker[x_correlation_id] = None return None diff --git a/src/aiperf/timing/trajectory_source.py b/src/aiperf/timing/trajectory_source.py index a6c6c576ee..63aeb70ad5 100644 --- a/src/aiperf/timing/trajectory_source.py +++ b/src/aiperf/timing/trajectory_source.py @@ -81,6 +81,24 @@ class Trajectory: ) +@dataclass(slots=True) +class CacheBustLedger: + """Cross-phase cache-bust marker state. + + Lives on the shared ``TrajectorySource`` (constructed once at + TimingManager level) so the WARMUP and PROFILING strategy instances see + one ledger. A session that continues across the phase boundary keeps the + exact marker minted for it during WARMUP, and new sessions draw pass + numbers from a counter that never restarts - so a recycled session's + digest can never collide with a warmed one. + """ + + recycle_pass: dict[str, int] = field(default_factory=dict) + """Next-pass counter per trace_id; incremented on every fresh mint.""" + session_marker: dict[str, str | None] = field(default_factory=dict) + """Minted marker per live x_correlation_id (None when cache-bust is off).""" + + @dataclass(slots=True, frozen=True) class _BranchRuntime: branch_id: str @@ -189,6 +207,19 @@ def __init__( self._log_trajectory_summary() + @property + def cache_bust_ledger(self) -> CacheBustLedger: + """Marker ledger shared by the WARMUP and PROFILING strategy instances. 
+ + Created lazily so sources built through ``__new__`` in tests get a + ledger on first access without extra setup. + """ + ledger = getattr(self, "_cache_bust_ledger", None) + if ledger is None: + ledger = CacheBustLedger() + self._cache_bust_ledger = ledger + return ledger + def _log_trajectory_summary(self) -> None: """Log a one-block table of every trajectory's start position. diff --git a/tests/component_integration/test_agentic_replay_cli_e2e.py b/tests/component_integration/test_agentic_replay_cli_e2e.py index 4f0ef98740..2b4703d793 100644 --- a/tests/component_integration/test_agentic_replay_cli_e2e.py +++ b/tests/component_integration/test_agentic_replay_cli_e2e.py @@ -253,8 +253,10 @@ def test_agentic_replay_cli_scenario_unsafe_override_runs_to_completion( "validator must log timing_mode auto-set under --scenario " "(covers the read-only-property setter path against real UserConfig)" ) - assert "auto-set --inter-turn-delay-cap-seconds=60.0" in log_text, ( - "validator must auto-set inter-turn-delay-cap when unset" + assert "auto-set --trace-idle-gap-cap-seconds=60.0" in log_text, ( + "validator must auto-set the per-trace idle-gap cap when unset " + "(the AgentX scenario locks trace_idle_gap_cap_seconds, not the " + "inter-turn delay cap, since 932b4bc)" ) assert result.json is not None, "JSON export must exist" diff --git a/tests/component_integration/test_context_overflow_runtime_gate.py b/tests/component_integration/test_context_overflow_runtime_gate.py index a07de5587e..ed43732d9b 100644 --- a/tests/component_integration/test_context_overflow_runtime_gate.py +++ b/tests/component_integration/test_context_overflow_runtime_gate.py @@ -66,7 +66,10 @@ def _export_and_load_sync(aggregate: AggregateResult, tmp_path: Path) -> dict: config = AggregateExporterConfig(result=aggregate, output_dir=tmp_path) exporter = AggregateConfidenceJsonExporter(config) - out_path = asyncio.get_event_loop().run_until_complete(exporter.export()) + # asyncio.run creates a 
fresh loop: get_event_loop() raises on 3.12 when + # a previously-run in-process CLI test (e.g. the agentic-replay e2e) has + # consumed the main thread's loop and the xdist worker reuses the process. + out_path = asyncio.run(exporter.export()) with open(out_path) as f: return json.load(f) diff --git a/tests/component_integration/timing/test_dag_timing_pathology.py b/tests/component_integration/timing/test_dag_timing_pathology.py index f541963149..3497609402 100644 --- a/tests/component_integration/timing/test_dag_timing_pathology.py +++ b/tests/component_integration/timing/test_dag_timing_pathology.py @@ -656,7 +656,9 @@ async def test_orchestrator_zero_child_branch_with_gate_does_not_hang() -> None: SPAWN_JOIN against it. The orchestrator's expected_gates path must create a future-join with an unregistered PrereqState seed (from _gated_turn_prereq_keys) AND mark it registered with expected=0 — so - is_done is True and the gate does NOT block the parent.""" + is_done is True and the gate does NOT block the parent. The drained + gate is popped silently: intercept returns False and the strategy's + normal continuation owns the single dispatch of the gated turn.""" branch = ConversationBranchInfo( branch_id="root:0", child_conversation_ids=[], # zero children @@ -681,12 +683,14 @@ async def test_orchestrator_zero_child_branch_with_gate_does_not_hang() -> None: orch = BranchOrchestrator(conversation_source=cs, credit_issuer=issuer) s = await orch.intercept(_mk_credit("root", "p", turn_index=0, num_turns=2)) - # No children -> the expected_gates path fires the join immediately, so - # by the time intercept returns the gate has been drained and the parent - # is NOT suspended. + # No children -> the gate drains at spawn time and is popped silently, so + # by the time intercept returns the parent is NOT suspended and the + # strategy's continuation path dispatches turn 1 exactly once. 
assert s is False, "zero-child branch must not deadlock parent at next turn" - issuer.dispatch_join_turn.assert_awaited_once() - assert orch.stats.parents_resumed == 1 + issuer.dispatch_join_turn.assert_not_awaited() + assert "p" not in orch._active_joins + assert not orch._future_joins.get("p") + assert orch.stats.parents_resumed == 0 assert orch.stats.parents_suspended == 0 diff --git a/tests/unit/timing/strategies/test_agentic_replay.py b/tests/unit/timing/strategies/test_agentic_replay.py index 50f84bace6..18cdd644fd 100644 --- a/tests/unit/timing/strategies/test_agentic_replay.py +++ b/tests/unit/timing/strategies/test_agentic_replay.py @@ -54,13 +54,14 @@ def _build_real_trajectory_source( num_traces: int, turns_per_trace: int, trajectories: list[Trajectory], + dataset: DatasetMetadata | None = None, ) -> TrajectorySource: """Build a real TrajectorySource with deterministic trajectories. We construct the source via __new__ + manual init so we control the trajectories exactly (avoid randomization in tests). 
""" - ds = _make_dataset(num_traces, turns_per_trace) + ds = dataset if dataset is not None else _make_dataset(num_traces, turns_per_trace) src = TrajectorySource.__new__(TrajectorySource) src._dataset_metadata = ds @@ -81,8 +82,11 @@ def _make_strategy( issuer: AsyncMock | None = None, scheduler: MagicMock | None = None, user_config: object | None = None, + dataset: DatasetMetadata | None = None, ) -> tuple[AgenticReplayStrategy, AsyncMock, MagicMock, TrajectorySource]: - src = _build_real_trajectory_source(num_traces, turns_per_trace, trajectories) + src = _build_real_trajectory_source( + num_traces, turns_per_trace, trajectories, dataset=dataset + ) cfg = MagicMock() cfg.phase = phase cfg.concurrency = len(trajectories) @@ -616,6 +620,342 @@ async def capture(turn): assert issued[0].x_correlation_id != "warmed-root" +@pytest.mark.asyncio +async def test_terminal_root_snapshot_recycles_are_concurrent_not_serial(): + """Regression: terminal-root recycles must not serialize profiling startup. + + Commit f47bd5537e introduced an awaited recycle per trajectory in the + startup dispatch loop. With N terminal-root trajectories this blocked the + Kth dispatch until the (K-1)th recycle completed, causing all 256 sessions + to trickle in over ~54 s instead of bursting at t=0 on a real cluster. + """ + N = 3 + # N single-turn traces become terminal roots: snapshot root is at turn 0 + # (the only turn), so _snapshot_continuation_after_warmup drops the state + # (resume_index=1 >= len(turns)=1) and it lands in terminal_roots. + trajectories = [ + Trajectory( + conversation_id=f"trace_{i}", + start_turn_index=0, + snapshot=TrajectorySnapshot( + t_star_ms=0.0, + states=( + ConversationState( + conversation_id=f"trace_{i}", + x_correlation_id=f"warmed-{i}", + next_turn_index=0, + ), + ), + ), + ) + for i in range(N) + ] + + # Gate that blocks each recycled credit until we release it. 
in_flight + # counts issue_credit calls simultaneously blocked at the gate - if + # serial only 1 is ever in-flight, if concurrent all N are. + gate = asyncio.Event() + in_flight = 0 + captured: list = [] + + async def gated_issue_credit(turn): + nonlocal in_flight + in_flight += 1 + captured.append(turn) + await gate.wait() + in_flight -= 1 + return True + + issuer = AsyncMock() + issuer.issue_credit.side_effect = gated_issue_credit + strategy, _, _, _ = _make_strategy( + phase=CreditPhase.PROFILING, + trajectories=trajectories, + num_traces=N, + turns_per_trace=1, + issuer=issuer, + ) + + await strategy.setup_phase() + task = asyncio.create_task(strategy.execute_phase()) + + # Pump the event loop enough times for all concurrent dispatches to reach + # the gate. With the serial bug only 1 will be in-flight; with the fix + # all N will be blocked at gate.wait() simultaneously. + for _ in range(N + 4): + await asyncio.sleep(0) + + concurrent_at_gate = in_flight + gate.set() + await asyncio.wait_for(task, timeout=5.0) + + assert concurrent_at_gate == N, ( + f"Expected {N} recycled credits in-flight simultaneously at profiling " + f"startup, but only {concurrent_at_gate} were. Terminal-root recycles " + "appear to be dispatched serially, blocking the startup loop." + ) + # Exactly one recycled turn-0 session per lane, each a distinct session - + # N-at-the-gate must not be satisfiable by the wrong N credits. + assert issuer.issue_credit.await_count == N + assert all(turn.turn_index == 0 for turn in captured) + assert len({turn.x_correlation_id for turn in captured}) == N + + +@pytest.mark.asyncio +async def test_plain_trajectory_resumes_are_concurrent_not_serial(): + """The k_i+1 resume path (timestamp-less trajectories) must burst at t=0. + + Companion to the terminal-root regression test: a refactor that + re-serializes only the snapshot-less resume dispatch would otherwise + ship green. 
+ """ + N = 3 + trajectories = [ + Trajectory(conversation_id=f"trace_{i}", start_turn_index=0) for i in range(N) + ] + gate = asyncio.Event() + in_flight = 0 + + async def gated_issue_credit(turn): + nonlocal in_flight + in_flight += 1 + await gate.wait() + in_flight -= 1 + return True + + issuer = AsyncMock() + issuer.issue_credit.side_effect = gated_issue_credit + strategy, _, _, _ = _make_strategy( + phase=CreditPhase.PROFILING, + trajectories=trajectories, + num_traces=N, + turns_per_trace=4, + issuer=issuer, + ) + await strategy.setup_phase() + task = asyncio.create_task(strategy.execute_phase()) + for _ in range(N + 4): + await asyncio.sleep(0) + + concurrent_at_gate = in_flight + gate.set() + await asyncio.wait_for(task, timeout=5.0) + + assert concurrent_at_gate == N, ( + f"Expected {N} resume credits in-flight simultaneously at profiling " + f"startup, but only {concurrent_at_gate} were." + ) + + +@pytest.mark.asyncio +async def test_continuing_session_keeps_warmup_marker_across_phase_boundary(): + """A continued session's cache-bust marker must not rotate at the boundary. + + Under wrap-fill, two lanes share one trace (trace_0 below): lane 0's root is blocked on + children at t* (skipped by WARMUP dispatch) while lane 1's root is ready. + WARMUP therefore mints only lane 1; PROFILING mints both. Positional + re-minting hands lane 0 the pass=0 digest and bumps lane 1's continuing + session to pass=1 - the warmed KV prefix becomes unreachable for the + measured turns. The marker minted in WARMUP must be reused verbatim for + the same x_correlation_id in PROFILING.
+ """ + ds = _make_dataset(num_traces=1, turns_per_trace=3) + trajectories = [ + Trajectory( + conversation_id="trace_0", + start_turn_index=0, + snapshot=TrajectorySnapshot( + t_star_ms=0.0, + states=( + ConversationState( + conversation_id="trace_0", + x_correlation_id="A-root", + next_turn_index=2, + waiting_on_children=True, + join_target_turn_index=2, + ), + ), + ), + ), + Trajectory( + conversation_id="trace_0", + start_turn_index=0, + snapshot=TrajectorySnapshot( + t_star_ms=0.0, + states=( + ConversationState( + conversation_id="trace_0", + x_correlation_id="B-root", + next_turn_index=0, + ), + ), + ), + ), + ] + src = _build_real_trajectory_source(1, 3, trajectories, dataset=ds) + user_config = MagicMock() + user_config.input.prompt.cache_bust.target = CacheBustTarget.FIRST_TURN_PREFIX + user_config.benchmark_id = "bench" + + def _strategy_for(phase: CreditPhase, issuer: AsyncMock) -> AgenticReplayStrategy: + cfg = MagicMock() + cfg.phase = phase + return AgenticReplayStrategy( + config=cfg, + conversation_source=src, + scheduler=MagicMock(), + stop_checker=MagicMock(), + credit_issuer=issuer, + lifecycle=MagicMock(), + user_config=user_config, + ) + + warmup_issuer = AsyncMock() + warmup = _strategy_for(CreditPhase.WARMUP, warmup_issuer) + await warmup.setup_phase() + await warmup.execute_phase() + warmup_turns = [c.args[0] for c in warmup_issuer.issue_credit.await_args_list] + warmup_marker = warmup._session_marker["B-root"] + assert warmup_marker is not None + assert [t.cache_bust_marker for t in warmup_turns] == [warmup_marker] + + profiling_issuer = AsyncMock() + profiling = _strategy_for(CreditPhase.PROFILING, profiling_issuer) + await profiling.setup_phase() + await profiling.execute_phase() + profiling_turns = { + t.conversation_id: t + for c in profiling_issuer.issue_credit.await_args_list + for t in [c.args[0]] + if t.turn_index == 1 + } + continuing = profiling_turns["trace_0"] + assert continuing.cache_bust_marker == warmup_marker, ( + 
"Continuing session's marker rotated at the WARMUP->PROFILING " + "boundary - the warmed KV prefix is unreachable for measured turns" + ) + # The unblocked lane-0 parent is a distinct session and must NOT share + # the continuing session's digest. + assert profiling._session_marker["A-root"] != warmup_marker + + +@pytest.mark.asyncio +async def test_startup_recycle_does_not_pop_live_trajectory_trace(): + """A startup recycle must not spawn a trace whose own lane hasn't dispatched. + + setup_phase pre-registers every live trajectory lane in _active_traces: + without that, an early lane that recycles immediately pops the queue head + before later lanes register themselves, spawning a duplicate concurrent + session of a trace that its own lane is about to resume. + """ + # Lane 0: single-turn snapshot on trace_1 -> terminal root, recycles at + # startup. Lane 1: plain trajectory resuming trace_0 at k_i+1. Dataset + # order puts trace_0 at the queue head, so a full-pool queue would hand + # lane 0's recycle a fresh turn-0 session of the still-live trace_0. 
+ conversations = [ + ConversationMetadata( + conversation_id="trace_0", + turns=[TurnMetadata(), TurnMetadata()], + ), + ConversationMetadata( + conversation_id="trace_1", + turns=[TurnMetadata()], + ), + ] + ds = DatasetMetadata( + conversations=conversations, + sampling_strategy=DatasetSamplingStrategy.SEQUENTIAL, + ) + trajectories = [ + Trajectory( + conversation_id="trace_1", + start_turn_index=0, + snapshot=TrajectorySnapshot( + t_star_ms=0.0, + states=( + ConversationState( + conversation_id="trace_1", + x_correlation_id="warmed-1", + next_turn_index=0, + ), + ), + ), + ), + Trajectory(conversation_id="trace_0", start_turn_index=0), + ] + captured: list[tuple[str, int]] = [] + + async def capture(turn): + captured.append((turn.conversation_id, turn.turn_index)) + return True + + issuer = AsyncMock() + issuer.issue_credit.side_effect = capture + strategy, _, _, _ = _make_strategy( + phase=CreditPhase.PROFILING, + trajectories=trajectories, + issuer=issuer, + dataset=ds, + ) + await strategy.setup_phase() + await strategy.execute_phase() + + # Lane 0's recycle must spawn its own just-finished trace_1 (the only + # eligible trace), never a turn-0 duplicate of the live trace_0. + assert sorted(captured) == [("trace_0", 1), ("trace_1", 0)] + assert strategy._active_traces["trace_0"] <= 1 + + +@pytest.mark.asyncio +async def test_profiling_dispatch_error_waits_for_siblings_and_reraises(): + """One lane's dispatch failure must not detach the sibling dispatches. + + execute_phase must keep ownership of every sibling lane until it settles, + then re-raise the failure. A bare gather would re-raise the first failure immediately while + the remaining lanes keep issuing credits into a failing phase unsupervised.
+ """ + N = 3 + trajectories = [ + Trajectory(conversation_id=f"trace_{i}", start_turn_index=0) for i in range(N) + ] + gate = asyncio.Event() + completed: list[str] = [] + + async def gated_issue_credit(turn): + if turn.conversation_id == "trace_1": + raise RuntimeError("lane boom") + await gate.wait() + completed.append(turn.conversation_id) + return True + + issuer = AsyncMock() + issuer.issue_credit.side_effect = gated_issue_credit + strategy, _, _, _ = _make_strategy( + phase=CreditPhase.PROFILING, + trajectories=trajectories, + num_traces=N, + turns_per_trace=4, + issuer=issuer, + ) + await strategy.setup_phase() + task = asyncio.create_task(strategy.execute_phase()) + for _ in range(N + 4): + await asyncio.sleep(0) + + # Lane 1 has already raised, but lanes 0 and 2 are still blocked at the + # gate: phase execution must still be in flight rather than finished + # with an exception while its siblings run detached. + assert not task.done(), ( + "execute_phase finished while sibling dispatches were still in " + "flight - one lane's failure detached the remaining lanes" + ) + + gate.set() + with pytest.raises(RuntimeError, match="lane boom"): + await asyncio.wait_for(task, timeout=5.0) + assert sorted(completed) == ["trace_0", "trace_2"] + + @pytest.mark.asyncio async def test_profiling_skips_trajectory_at_last_turn_and_recycles(): """If k_i is already the last turn, k_i+1 is out of range. Recycle immediately.""" @@ -762,11 +1102,9 @@ async def capture(turn): # Register the in-flight session's lane bookkeeping (normally done by # _execute_profiling); handle_credit_return's recycle path now requires - # finished_correlation_id to be in _correlation_to_lane. Also seed - # _active_traces so the new full-pool pop loop's skip-active-on-pop - # logic mirrors a real run. + # finished_correlation_id to be in _correlation_to_lane. _active_traces + # was already pre-registered by setup_phase. 
strategy._correlation_to_lane["xcorr"] = 0 - strategy._active_traces["trace_0"] += 1 issuer.issue_credit.reset_mock() issued_sessions.clear() @@ -848,10 +1186,9 @@ async def capture(turn): assert strategy._recycle_queue.qsize() == 1 # Register the in-flight session's lane (normally done by _execute_profiling). - # Also seed _active_traces so the new pop loop skips trace_0 while it is - # nominally alive — discard happens at the top of _spawn_from_recycle_or_id. + # _active_traces was already pre-registered by setup_phase — discard + # happens at the top of _spawn_from_recycle_or_id. strategy._correlation_to_lane["xcorr"] = 0 - strategy._active_traces["trace_0"] += 1 issuer.issue_credit.reset_mock() issued_sessions.clear() diff --git a/tests/unit/timing/strategies/test_agentic_replay_context_overflow.py b/tests/unit/timing/strategies/test_agentic_replay_context_overflow.py index a418c67857..df664a486d 100644 --- a/tests/unit/timing/strategies/test_agentic_replay_context_overflow.py +++ b/tests/unit/timing/strategies/test_agentic_replay_context_overflow.py @@ -137,11 +137,10 @@ async def capture(turn): issuer=issuer, ) await strategy.setup_phase() - # Seed lane and _active_traces (the new full-pool pop loop skips alive - # trace_ids). The finishing trace is discarded from _active_traces at + # Seed the lane mapping; _active_traces was pre-registered by + # setup_phase. The finishing trace is discarded from _active_traces at # the top of _spawn_from_recycle_or_id before the pop loop runs. strategy._correlation_to_lane["xcorr"] = 0 - strategy._active_traces["trace_0"] += 1 # Mid-trajectory turn (index 2 of 5) errors with context-overflow. 
mid = _make_credit(conversation_id="trace_0", turn_index=2, num_turns=5) @@ -230,11 +229,10 @@ async def capture(turn): issuer=issuer, ) await strategy.setup_phase() - # Seed lane and _active_traces (the new full-pool pop loop skips alive - # trace_ids; the finishing trace is discarded at the top of + # Seed the lane mapping; _active_traces was pre-registered by + # setup_phase (the finishing trace is discarded at the top of # _spawn_from_recycle_or_id before the pop loop runs). strategy._correlation_to_lane["xcorr"] = 0 - strategy._active_traces["trace_0"] += 1 final = _make_credit(conversation_id="trace_0", turn_index=2, num_turns=3) await strategy.handle_credit_return( diff --git a/tests/unit/timing/strategies/test_agentic_replay_recycle_adversarial.py b/tests/unit/timing/strategies/test_agentic_replay_recycle_adversarial.py index 8d3431cb19..b2e0d8d172 100644 --- a/tests/unit/timing/strategies/test_agentic_replay_recycle_adversarial.py +++ b/tests/unit/timing/strategies/test_agentic_replay_recycle_adversarial.py @@ -155,9 +155,8 @@ async def capture(turn): assert strategy._recycle_queue.qsize() == 1 # Register the in-flight session's lane (normally done by _execute_profiling). - # Seed _active_traces so the new pop loop skips trace_0 while it is alive. + # _active_traces was already pre-registered by setup_phase. strategy._correlation_to_lane["xcorr"] = 0 - strategy._active_traces["trace_0"] += 1 # Final turn (last index = 2 of num_turns=3) final = _make_credit(conversation_id="trace_0", turn_index=2, num_turns=3) @@ -212,13 +211,10 @@ async def capture(turn): # Register lane bookkeeping for both in-flight sessions (normally seeded by # _execute_profiling). handle_credit_return's recycle path requires - # finished_correlation_id to be in _correlation_to_lane. Seed - # _active_traces too: the new full-pool pop loop skips trace_ids whose - # session is currently alive, mirroring _execute_profiling behavior. 
+ # finished_correlation_id to be in _correlation_to_lane. _active_traces + # was already pre-registered by setup_phase. strategy._correlation_to_lane["xcorr_a"] = 0 strategy._correlation_to_lane["xcorr_b"] = 1 - strategy._active_traces["trace_0"] += 1 - strategy._active_traces["trace_1"] += 1 # Two parallel consumers complete. We use asyncio.gather to drive them # concurrently within the same event-loop tick. asyncio.Queue is non-blocking @@ -287,12 +283,10 @@ async def test_burst_of_ten_completions_preserves_completion_order(): # Full pool: queue holds all 12 traces at setup. assert strategy._recycle_queue.qsize() == 12 - # Register lane bookkeeping for the 10 in-flight sessions. Seed - # _active_traces too so the new pop loop skips trace_ids whose session - # is alive (mirroring _execute_profiling). + # Register lane bookkeeping for the 10 in-flight sessions. + # _active_traces was already pre-registered by setup_phase. for i in range(10): strategy._correlation_to_lane[f"xcorr_{i}"] = i - strategy._active_traces[f"trace_{i}"] += 1 # Fire 10 completions in completion order: trace_0..trace_9 finish in order. for i in range(10): @@ -368,11 +362,10 @@ async def capture(turn): # Full pool: queue holds all 70 traces at setup. assert len(initial_queue) == 70 - # Register lane bookkeeping for the 50 in-flight sessions. Seed - # _active_traces too so the new pop loop skips alive trace_ids. + # Register lane bookkeeping for the 50 in-flight sessions. + # _active_traces was already pre-registered by setup_phase. for i in range(50): strategy._correlation_to_lane[f"xcorr_{i}"] = i - strategy._active_traces[f"trace_{i}"] += 1 finals = [ _make_credit( @@ -475,11 +468,10 @@ async def test_recycle_during_cooldown_does_not_start_new_sessions(): # Full pool: queue holds all 5 traces at setup. assert initial_size == 5 - # Register the in-flight session's lane bookkeeping. 
Seed _active_traces - # so the cooldown gate is reached after the discard at the top of - # _spawn_from_recycle_or_id. + # Register the in-flight session's lane bookkeeping. _active_traces was + # pre-registered by setup_phase; the cooldown gate is reached after the + # discard at the top of _spawn_from_recycle_or_id. strategy._correlation_to_lane["xcorr"] = 0 - strategy._active_traces["trace_0"] += 1 # Final turn arrives during cooldown. final = _make_credit(conversation_id="trace_0", turn_index=1, num_turns=2) @@ -550,16 +542,13 @@ async def capture(turn): # dispatched session's (trace_id, correlation_id) to the tail. from collections import deque - # Seed the trajectory's correlation_ids and _active_traces: - # handle_credit_return now requires finished_correlation_id to be present - # in _correlation_to_lane, and the new full-pool pop loop skips trace_ids - # in _active_traces. Mimic _execute_profiling's bookkeeping for the - # initial trajectory cohort. + # Seed the trajectory's correlation_ids: handle_credit_return requires + # finished_correlation_id to be present in _correlation_to_lane. + # _active_traces was already pre-registered by setup_phase. in_flight: deque[tuple[str, str]] = deque() for lane in range(trajectory_count): corr = f"xcorr_traj_{lane}" strategy._correlation_to_lane[corr] = lane - strategy._active_traces[f"trace_{lane}"] += 1 in_flight.append((f"trace_{lane}", corr)) total_completions = 1500 @@ -907,7 +896,6 @@ async def capture(turn): ) await strategy.setup_phase() strategy._correlation_to_lane["xcorr"] = 0 - strategy._active_traces["trace_0"] += 1 # Root credit: agent_depth defaults to 0 via _make_credit. 
root_final = _make_credit(conversation_id="trace_0", turn_index=1, num_turns=2) diff --git a/tests/unit/timing/test_branch_orchestrator.py b/tests/unit/timing/test_branch_orchestrator.py index 597c88bf93..9145d8e0e7 100644 --- a/tests/unit/timing/test_branch_orchestrator.py +++ b/tests/unit/timing/test_branch_orchestrator.py @@ -124,6 +124,164 @@ def get_metadata(self, conversation_id): assert released.gated_turn_index == 2 +def _two_branch_join_meta() -> tuple[ + ConversationMetadata, dict[str, ConversationMetadata] +]: + """Parent with branches A (spawned turn 1) and B (spawned turn 2), both + gating turn 3. Children are one-turn SPAWN conversations.""" + parent_meta = ConversationMetadata( + conversation_id="parent", + turns=[ + TurnMetadata(timestamp_ms=0.0), + TurnMetadata(timestamp_ms=10_000.0, branch_ids=["A"]), + TurnMetadata(timestamp_ms=20_000.0, branch_ids=["B"]), + TurnMetadata( + timestamp_ms=30_000.0, + prerequisites=[ + TurnPrerequisite(kind=PrerequisiteKind.SPAWN_JOIN, branch_id="A"), + TurnPrerequisite(kind=PrerequisiteKind.SPAWN_JOIN, branch_id="B"), + ], + ), + ], + branches=[ + ConversationBranchInfo( + branch_id="A", + child_conversation_ids=["a_child"], + mode=ConversationBranchMode.SPAWN, + start_timestamp_ms=11_000.0, + ), + ConversationBranchInfo( + branch_id="B", + child_conversation_ids=["b_child"], + mode=ConversationBranchMode.SPAWN, + start_timestamp_ms=21_000.0, + ), + ], + ) + children = { + cid: ConversationMetadata( + conversation_id=cid, + turns=[TurnMetadata(timestamp_ms=ts)], + is_root=False, + agent_depth=1, + parent_conversation_id="parent", + ) + for cid, ts in [("a_child", 11_000.0), ("b_child", 21_000.0)] + } + return parent_meta, children + + +def _source_for(parent_meta, children): + class _Source: + dataset_metadata = DatasetMetadata( + conversations=[parent_meta, *children.values()], + sampling_strategy=DatasetSamplingStrategy.SEQUENTIAL, + ) + + def get_metadata(self, conversation_id): + return {"parent": parent_meta, 
**children}[conversation_id] + + return _Source() + + +@pytest.mark.asyncio +async def test_seed_snapshot_prereq_completed_before_t_star_does_not_wedge_gate(): + """A prereq group that fully completed before t* must not wedge the join. + + Two branches share the turn-3 gate. t* falls between their completions: + branch A's child finished pre-t* (absent from the snapshot) while branch + B's child is live. A's spawning turn fired before the parent's resume + position and can never re-fire during replay, so its prereq key must be + seeded as satisfied - otherwise the gate is permanently unsatisfiable and + the parent lane silently wedges for the entire phase. + """ + parent_meta, children = _two_branch_join_meta() + issuer = MagicMock() + issuer.dispatch_join_turn = AsyncMock(return_value=True) + orch = BranchOrchestrator( + conversation_source=_source_for(parent_meta, children), credit_issuer=issuer + ) + states = ( + ConversationState( + conversation_id="parent", + x_correlation_id="parent-corr", + next_turn_index=3, + waiting_on_children=True, + join_target_turn_index=3, + ), + ConversationState( + conversation_id="b_child", + x_correlation_id="b-corr", + next_turn_index=0, + agent_depth=1, + parent_correlation_id="parent-corr", + join_target_turn_index=3, + branch_id="B", + branch_mode=ConversationBranchMode.SPAWN, + ), + ) + orch.seed_snapshot(states) + + pending = orch._active_joins["parent-corr"] + assert pending.outstanding["SPAWN_JOIN:A"].is_done, ( + "Branch A completed before t* but its prereq was seeded unsatisfiable" + ) + assert not pending.is_satisfied # B's live child is still outstanding. 
+ + await orch.on_child_leaf_reached("b-corr") + + issuer.dispatch_join_turn.assert_awaited_once() + released = issuer.dispatch_join_turn.await_args.args[0] + assert released.parent_x_correlation_id == "parent-corr" + assert released.gated_turn_index == 3 + assert "parent-corr" not in orch._active_joins + + +@pytest.mark.asyncio +async def test_seed_snapshot_keeps_unfired_future_prereq_unregistered(): + """A prereq whose spawning turn replays after t* must stay unregistered. + + The parent resumes at turn 2 (B's spawning turn), so branch B will fire + during replay: seeding it as satisfied would release the gate before B's + children even spawn. Only branches whose spawning turn fired before the + parent's resume position may be auto-satisfied. + """ + parent_meta, children = _two_branch_join_meta() + issuer = MagicMock() + issuer.dispatch_join_turn = AsyncMock(return_value=True) + orch = BranchOrchestrator( + conversation_source=_source_for(parent_meta, children), credit_issuer=issuer + ) + states = ( + ConversationState( + conversation_id="parent", + x_correlation_id="parent-corr", + next_turn_index=2, + ), + ConversationState( + conversation_id="a_child", + x_correlation_id="a-corr", + next_turn_index=0, + agent_depth=1, + parent_correlation_id="parent-corr", + join_target_turn_index=3, + branch_id="A", + branch_mode=ConversationBranchMode.SPAWN, + ), + ) + orch.seed_snapshot(states) + + pending = orch._future_joins["parent-corr"][3] + assert pending.outstanding["SPAWN_JOIN:A"].expected == 1 + assert not pending.outstanding["SPAWN_JOIN:B"].registered + + await orch.on_child_leaf_reached("a-corr") + + # A is done but B has not even spawned yet: the gate must hold. 
+ issuer.dispatch_join_turn.assert_not_awaited() + assert orch._future_joins["parent-corr"][3] is pending + + @pytest.mark.asyncio async def test_intercept_with_spawn_dispatches_children_and_registers_sticky(): """Phase 1 semantics: intercept returns False after a pure-spawn with no diff --git a/tests/unit/timing/test_branch_orchestrator_phase0.py b/tests/unit/timing/test_branch_orchestrator_phase0.py index 6e53f4e234..01c82283a4 100644 --- a/tests/unit/timing/test_branch_orchestrator_phase0.py +++ b/tests/unit/timing/test_branch_orchestrator_phase0.py @@ -8,9 +8,11 @@ - ``dispatch_join_turn`` propagates ``parent_branch_mode`` and ``parent_has_forks_on_gated_turn`` from :class:`PendingBranchJoin` instead of hardcoding FORK. -- ``BranchOrchestrator.intercept`` dispatches the gated join turn immediately - when every ``start_branch_child`` call fails (no children landed), instead - of registering a dead pending join that hangs the parent. +- ``BranchOrchestrator.intercept`` pops the drained gate silently and returns + False when every ``start_branch_child`` call fails (no children landed), + so the strategy's normal continuation dispatches the gated turn exactly + once - neither hanging the parent on a dead pending join nor + double-dispatching it from the orchestrator. """ from __future__ import annotations @@ -122,11 +124,14 @@ async def _try_issue(turn: TurnToSend) -> bool: @pytest.mark.asyncio -async def test_intercept_all_children_failed_with_gate_does_not_hang(): +async def test_intercept_all_children_failed_defers_gated_turn_to_strategy(): """When every ``start_branch_child`` raises on a parent turn whose next - turn is gated, the future join has zero outstanding children and - would never fire via the child-leaf decrement path. The orchestrator - must dispatch the gated turn immediately.""" + turn is gated, the future join has zero outstanding children and would + never fire via the child-leaf decrement path. 
The drained gate must be + popped silently and intercept must return False: the strategy's normal + continuation then dispatches turn k+1 exactly once. Dispatching the join + here AND returning False sent the same turn twice (the callback handler + falls through to handle_credit_return -> _dispatch_next_turn).""" branch = ConversationBranchInfo( branch_id="root:0", child_conversation_ids=["a", "b"], @@ -163,25 +168,19 @@ async def test_intercept_all_children_failed_with_gate_does_not_hang(): branch_mode=ConversationBranchMode.FORK, ) - # No children landed; the gate was drained at spawn time and the join - # fired immediately (not deferred). Parent's next turn is turn 1 but - # intercept returns False because the join already dispatched (the - # future/active join entries are gone). + # All children errored before any landed: the gate is "satisfied" with + # zero outstanding, popped silently, and the parent is not suspended. result = await orch.intercept(credit) - # Since all children errored before any landed, the gate was "satisfied" - # with zero outstanding and dispatched immediately. No suspension. assert result is False - # Gated turn dispatched exactly once. - assert issuer.dispatch_join_turn.await_count == 1 - dispatched_pending = issuer.dispatch_join_turn.await_args.args[0] - assert dispatched_pending.gated_turn_index == 1 - assert dispatched_pending.total_outstanding == 0 + # The single dispatch of turn 1 belongs to the strategy's continuation + # path; the orchestrator must not issue the join turn itself. + issuer.dispatch_join_turn.assert_not_awaited() - # No leaked per-parent state. + # No leaked per-parent state to wedge phase-end draining. assert "root-corr" not in orch._active_joins assert "root-corr" not in orch._future_joins assert "root-corr" not in orch._descendant_counts - assert orch.stats.parents_resumed == 1 + assert orch.stats.parents_resumed == 0 assert orch.stats.children_errored == 2 assert orch.stats.children_spawned == 0