diff --git a/src/aiperf/dataset/loader/weka_parallel_convert.py b/src/aiperf/dataset/loader/weka_parallel_convert.py
index 7f7f323e87..57b94bf55f 100644
--- a/src/aiperf/dataset/loader/weka_parallel_convert.py
+++ b/src/aiperf/dataset/loader/weka_parallel_convert.py
@@ -504,8 +504,11 @@ def _process_task(task: _WekaTraceTask) -> _WekaProcessTaskResult:
         if cp["subagent_index"] in dropped_subagent_indices:
             continue
 
+        # Subagents share the parent trace's ``hash_id_scope: "local"``
+        # namespace (see _reconstruct_serial): scope on parent_trace_id, not
+        # the child session_id, so shared blocks decode identically.
         child_decode, child_partial, child_decode_text = _make_scope_helpers(
-            cp["session_id"], bs
+            cp["parent_trace_id"], bs
         )
         child_recon = ConversationReconstructor(
             block_size=bs,
diff --git a/src/aiperf/dataset/loader/weka_trace.py b/src/aiperf/dataset/loader/weka_trace.py
index a127ebb908..fc037993de 100644
--- a/src/aiperf/dataset/loader/weka_trace.py
+++ b/src/aiperf/dataset/loader/weka_trace.py
@@ -1173,11 +1173,13 @@ def _reconstruct_serial(
             if cp.subagent_index in dropped_per_trace.get(cp.parent_trace_id, set()):
                 continue
             child_model_map = model_map_per_trace.get(cp.parent_trace_id, {})
-            # Subagent has its own scope: tool_tokens/system_tokens differ from
-            # the parent, and its block_cache must not leak across subagents.
+            # ``hash_id_scope: "local"`` is one namespace per trace FILE: a
+            # subagent shares its parent trace's scope so a hash_id reused
+            # across parent and subagent (or across siblings) decodes to the
+            # same tokens, reproducing the real cross-agent shared prefix.
             pg = self.prompt_generator
             pg._cache.clear()
-            pg._hash_id_corpus_rng.set_trace_id(cp.session_id)
+            pg._hash_id_corpus_rng.set_trace_id(cp.parent_trace_id)
             # Sync for ``_decode_block_tokens``; see parent loop above.
             self._block_size = cp.block_size
 
diff --git a/src/aiperf/dataset/mmap_cache.py b/src/aiperf/dataset/mmap_cache.py
index d8680cfbcc..cf8b727faa 100644
--- a/src/aiperf/dataset/mmap_cache.py
+++ b/src/aiperf/dataset/mmap_cache.py
@@ -33,8 +33,9 @@
     (missing manifest.json) treats the entry as a MISS and overwrites it.
 
 Manifest version:
-    Bumped whenever the on-disk layout or the side-data schema changes.
-    Mismatches are treated as a MISS.
+    Bumped whenever the on-disk layout, the side-data schema, or the decoded
+    content the loaders produce for a given key changes. Mismatches are treated
+    as a MISS.
 """
 
 from __future__ import annotations
@@ -61,10 +62,15 @@
 
 _logger = AIPerfLogger(__name__)
 
-# Bump when cached side-data changes. Version 5 fixes Conversation.metadata()
-# projection of per-turn theoretical prefix-cache block counts, which realtime
-# profiling needs to report the trace-level infinite-cache hit rate.
-MANIFEST_VERSION = 5
+# Bump when the on-disk layout, side-data schema, OR the decoded content the
+# loaders produce for a given key changes -- the key has no source-code
+# component, so a content-semantics fix must bump this or warm caches keep
+# serving the old (wrong) dataset. Version 6 invalidates entries built before
+# the weka subagent hash_id-scope fix (subagents now share the parent trace's
+# scope, so shared blocks decode to different tokens than v5 produced).
+# Version 5 fixed the Conversation.metadata() projection of per-turn
+# theoretical prefix-cache block counts for realtime infinite-cache hit rate.
+MANIFEST_VERSION = 6
 MANIFEST_FILENAME = "manifest.json"
 INPUTS_JSON_FILENAME = "inputs.json"
 
diff --git a/tests/integration/test_weka_hash_id_scope.py b/tests/integration/test_weka_hash_id_scope.py
new file mode 100644
index 0000000000..8c33d957fc
--- /dev/null
+++ b/tests/integration/test_weka_hash_id_scope.py
@@ -0,0 +1,189 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+"""End-to-end benchmark that stress-tests the Weka ``hash_id_scope: "local"``
+contract on the wire.
+
+A Weka trace declares one hash_id namespace per trace FILE: the same hash_id
+must decode to identical tokens across the parent conversation and every
+subagent (spawn-mode child) conversation of that trace. This is what lets
+replay reproduce the cross-agent shared prefixes a real inference server serves
+from KV cache.
+
+The crafted trace below has a parent turn and TWO sibling subagents whose inner
+requests reference the EXACT same hash_id blocks as the parent's first turn
+(``[10, 11, 12]``), with ``tool_tokens == system_tokens == 0`` and
+``in == n*block_size`` so each first-turn prompt is purely the decoded blocks.
+Under the correct (shared) scope, all three first-turn requests render to
+byte-identical prompt text on the wire. A per-child decode scope (the bug this
+guards against) would decode the shared blocks under different seeds, so the
+sibling payloads -- and the parent vs child payloads -- would diverge.
+
+We run the real ``aiperf profile`` subprocess against the mock server, export
+raw records (``--export-level raw``), and assert on the ACTUAL request payloads.
+"""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+
+import pytest
+
+from tests.harness.utils import AIPerfCLI, AIPerfMockServer
+
+BLOCK_SIZE = 64
+SHARED_HASH_IDS = [10, 11, 12]
+SHARED_IN = BLOCK_SIZE * len(SHARED_HASH_IDS)  # exact tile -> no partial tail
+
+
+def _normal(t, in_tokens, hash_ids, *, stop="end_turn", out=32):
+    """Build a ``type: "n"`` request entry for the crafted trace."""
+    return {
+        "t": t,
+        "type": "n",
+        "model": "test-model",
+        "in": in_tokens,
+        "out": out,
+        "hash_ids": hash_ids,
+        "input_types": ["text"],
+        "output_types": ["text"],
+        "stop": stop,
+        "api_time": 1.0,
+        "think_time": 0.0,
+    }
+
+
+def _subagent(agent_id, t):
+    """Build a subagent entry whose single request reuses SHARED_HASH_IDS."""
+    return {
+        "t": t,
+        "type": "subagent",
+        "agent_id": agent_id,
+        "subagent_type": "Explore",
+        "duration_ms": 1000,
+        "total_tokens": 100,
+        "tool_use_count": 1,
+        "status": "completed",
+        "requests": [_normal(0.0, SHARED_IN, SHARED_HASH_IDS)],
+        "models": ["test-model"],
+        "tool_tokens": 0,
+        "system_tokens": 0,
+    }
+
+
+def _text_of(msg: dict) -> str | None:
+    """Return ``msg`` content as text (str or joined ``text`` parts), else None."""
+    c = msg.get("content")
+    if isinstance(c, str):
+        return c
+    if isinstance(c, list):
+        parts = [
+            p["text"]
+            for p in c
+            if isinstance(p, dict) and isinstance(p.get("text"), str)
+        ]
+        return "".join(parts) if parts else None
+    return None
+
+
+def _last_user_text(messages: list[dict]) -> str | None:
+    """Return the text of the last ``role == "user"`` message, or None."""
+    users = [m for m in messages if m.get("role") == "user"]
+    return _text_of(users[-1]) if users else None
+
+
+@pytest.mark.integration
+@pytest.mark.asyncio
+class TestWekaHashIdScopeEndToEnd:
+    async def test_subagents_share_parent_hash_id_scope_on_the_wire(
+        self,
+        cli: AIPerfCLI,
+        aiperf_mock_server: AIPerfMockServer,
+        tmp_path: Path,
+        monkeypatch: pytest.MonkeyPatch,
+    ):
+        """Shared hash_ids decode identically for the parent and both subagents."""
+        # Isolate the content-addressed mmap dataset cache to this run. The cache
+        # key is (input bytes, tokenizer, prompt/input settings) with no source
+        # component, so a global cache could otherwise replay a dataset built by
+        # a DIFFERENT loader version for the same trace -- masking a scope
+        # regression. A per-test dir forces a fresh load of the current code.
+        monkeypatch.setenv(
+            "AIPERF_DATASET_MMAP_CACHE_DIR", str(tmp_path / "mmap_cache")
+        )
+
+        trace = {
+            "id": "scope_stress",
+            "models": ["test-model"],
+            "block_size": BLOCK_SIZE,
+            "hash_id_scope": "local",
+            "tool_tokens": 0,
+            "system_tokens": 0,
+            "requests": [
+                _normal(0.0, SHARED_IN, SHARED_HASH_IDS, stop="tool_use"),
+                _subagent("agent_001", 2.0),
+                _subagent("agent_002", 3.0),
+                # A later parent turn so the subagents join (are dispatched)
+                # rather than being dropped for lack of a following turn.
+                _normal(20.0, BLOCK_SIZE * 4, [10, 11, 12, 13]),
+            ],
+        }
+        trace_file = tmp_path / "scope_stress.json"
+        trace_file.write_text(json.dumps(trace))
+
+        result = await cli.run(
+            f"""
+            aiperf profile \
+                --model test-model \
+                --url {aiperf_mock_server.url} \
+                --endpoint-type chat \
+                --input-file {trace_file} \
+                --custom-dataset-type weka_trace \
+                --request-count 1 \
+                --concurrency 1 \
+                --workers-max 1 \
+                --export-level raw \
+                --ui simple
+            """,
+            timeout=300.0,
+        )
+
+        assert result.raw_records is not None, (
+            "profile_export_raw.jsonl must exist when --export-level raw is set"
+        )
+
+        roots = [
+            r for r in result.raw_records if r.metadata.parent_correlation_id is None
+        ]
+        kids = [
+            r
+            for r in result.raw_records
+            if r.metadata.parent_correlation_id is not None
+        ]
+
+        # Both sibling subagents must have been dispatched as spawn children.
+        assert len(kids) == 2, (
+            f"expected 2 spawn-child records, got {len(kids)}: "
+            f"{[_last_user_text(r.payload.get('messages', [])) for r in kids]}"
+        )
+
+        # CORE SCOPE GUARD: the two siblings reference identical hash_id blocks,
+        # so under one shared trace scope they render byte-identical payloads.
+        # A per-child decode scope would make these diverge.
+        assert kids[0].payload["messages"] == kids[1].payload["messages"], (
+            "sibling subagents referencing the same hash_ids must render "
+            "identical prompts -- they share the parent trace's hash_id scope"
+        )
+
+        # PARENT<->CHILD SHARING: the subagents reference the parent's turn-0
+        # blocks exactly, so the child's user prompt must equal the parent's
+        # turn-0 user prompt (the shared blocks decode identically).
+        # Guard the min() below: on an empty list it raises a bare ValueError.
+        assert roots, "expected at least one root (parent) record in the raw export"
+        parent_turn0 = min(roots, key=lambda r: len(r.payload["messages"]))
+        assert _last_user_text(kids[0].payload["messages"]) == _last_user_text(
+            parent_turn0.payload["messages"]
+        ), (
+            "a subagent reusing the parent's hash_id blocks must decode them to "
+            "the same prompt text as the parent (shared hash_id scope)"
+        )
diff --git a/tests/unit/dataset/loader/test_weka_trace.py b/tests/unit/dataset/loader/test_weka_trace.py
index ac6288b39b..36a1333ca7 100644
--- a/tests/unit/dataset/loader/test_weka_trace.py
+++ b/tests/unit/dataset/loader/test_weka_trace.py
@@ -6,6 +6,7 @@
 
 import pytest
 
+from aiperf.common.hash_id_random_generator import HashIdRandomGenerator
 from aiperf.dataset.loader.weka_trace import WekaTraceLoader
 
 FIXTURES = Path(__file__).parents[3] / "fixtures" / "weka_traces"
@@ -1032,3 +1033,153 @@ def normal(
     trace_b_turns = conv_by_id["trace_idle_b"].turns
     assert trace_b_turns[0].timestamp == 150_000.0
     assert trace_b_turns[1].timestamp == 210_000.0
+
+
+# =============================================================================
+# hash_id_scope: subagents share the parent trace's hash_id namespace.
+#
+# A weka trace declares ``hash_id_scope: "local"`` == one hash_id namespace per
+# trace FILE: the same hash_id must decode to identical tokens across the parent
+# conversation and every subagent/sibling conversation of that trace, so replay
+# reproduces the cross-agent shared prefixes a real server serves from KV cache.
+#
+# NOTE: these tests deliberately wire the REAL, scope-sensitive
+# ``HashIdRandomGenerator`` (seeds from ``sha256(f"{seed}:{trace_id}:{hash_id}")``)
+# instead of ``stub_hash_id_corpus_rng``. The stub ignores ``set_trace_id``, so a
+# given hash_id decodes identically under any scope -- it cannot detect a
+# per-child scope regression.
+# =============================================================================
+
+
+def _wire_real_scope_rng(loader, *, block_size: int, seed: int = 1234) -> None:
+    """Wire a MagicMock prompt_generator backed by the real, scope-sensitive RNG.
+
+    ``tokenizer.decode`` is a token-reflecting string so identical token lists
+    round-trip to identical text and differing token lists to differing text --
+    letting a turn's ``raw_messages`` stand in for "what tokens this block decoded
+    to under the active scope".
+    """
+    pg = MagicMock()
+    pg._cache = {}
+    pg._tokenized_corpus = list(range(4096))
+    pg._corpus_size = 4096
+    pg._hash_id_corpus_rng = HashIdRandomGenerator(seed, _internal=True)
+    pg.tokenizer.decode.side_effect = lambda toks: "|".join(str(t) for t in toks)
+    loader.prompt_generator = pg
+    loader._tokenizer_name = "test-tok"
+    loader._trust_remote_code = False
+    loader._tokenizer_revision = None
+    loader._block_size = block_size
+
+
+def _write_trace(tmp_path: Path, trace: dict) -> str:
+    """Write ``trace`` as JSON to ``tmp_path/<id>.json`` and return the path string."""
+    p = tmp_path / f"{trace['id']}.json"
+    p.write_text(json.dumps(trace))
+    return str(p)
+
+
+def _normal_req(
+    *, t: float, in_tokens: int, hash_ids: list[int], stop: str = "end_turn"
+):
+    """Build a ``type: "n"`` trace request dict for model ``m``."""
+    return {
+        "t": t,
+        "type": "n",
+        "model": "m",
+        "in": in_tokens,
+        "out": 10,
+        "hash_ids": hash_ids,
+        "input_types": ["text"],
+        "output_types": ["text"],
+        "stop": stop,
+        "api_time": 1.0,
+        "think_time": 0.0,
+    }
+
+
+def _subagent(*, agent_id: str, t: float, in_tokens: int, hash_ids: list[int]):
+    """Build a ``type: "subagent"`` entry wrapping one inner ``_normal_req``."""
+    return {
+        "t": t,
+        "type": "subagent",
+        "agent_id": agent_id,
+        "subagent_type": "Explore",
+        "duration_ms": 1000,
+        "total_tokens": 100,
+        "tool_use_count": 1,
+        "status": "completed",
+        "requests": [_normal_req(t=0.0, in_tokens=in_tokens, hash_ids=hash_ids)],
+        "models": ["m"],
+        "tool_tokens": 0,
+        "system_tokens": 0,
+    }
+
+
+def test_convert_to_conversations_subagent_inherits_parent_hash_id_scope(tmp_path):
+    """A hash_id shared by a parent request and a subagent inner request decodes
+    to identical tokens -- the subagent shares the parent trace's scope, it does
+    NOT get a private per-child decode scope."""
+    bs = 16
+    shared = [100, 101, 102]
+    trace = {
+        "id": "trace_scope",
+        "models": ["m"],
+        "block_size": bs,
+        "hash_id_scope": "local",
+        "requests": [
+            _normal_req(t=0.0, in_tokens=bs * len(shared), hash_ids=shared),
+            _subagent(
+                agent_id="agent_001",
+                t=2.0,
+                in_tokens=bs * len(shared),
+                hash_ids=shared,
+            ),
+        ],
+    }
+    loader = WekaTraceLoader(
+        filename=_write_trace(tmp_path, trace), user_config=_mk_user_config()
+    )
+    _wire_real_scope_rng(loader, block_size=bs)
+
+    convs = loader.convert_to_conversations(loader.load_dataset())
+    parent = next(c for c in convs if c.session_id == "trace_scope")
+    child = next(c for c in convs if c.parent_conversation_id == "trace_scope")
+
+    # in == n*bs and tool/system == 0, so turn-0 content is PURELY the decoded
+    # shared blocks (no partial tail, no system segment). Equal iff same scope.
+    assert child.turns[0].raw_messages == parent.turns[0].raw_messages
+
+
+def test_convert_to_conversations_sibling_subagents_share_hash_id_scope(tmp_path):
+    """Two sibling subagents that reference the same hash_id blocks decode them
+    identically -- both share the parent trace's scope, not per-agent scopes.
+    Sibling sharing is the dominant cross-conversation block-reuse mode in real
+    captures, so this is the case a per-child scope regression corrupts most."""
+    bs = 16
+    shared = [200, 201]
+    trace = {
+        "id": "trace_sib",
+        "models": ["m"],
+        "block_size": bs,
+        "hash_id_scope": "local",
+        "requests": [
+            _normal_req(t=0.0, in_tokens=bs * 3, hash_ids=[1, 2, 3], stop="tool_use"),
+            _subagent(
+                agent_id="agent_001", t=2.0, in_tokens=bs * len(shared), hash_ids=shared
+            ),
+            _subagent(
+                agent_id="agent_002", t=3.0, in_tokens=bs * len(shared), hash_ids=shared
+            ),
+        ],
+    }
+    loader = WekaTraceLoader(
+        filename=_write_trace(tmp_path, trace), user_config=_mk_user_config()
+    )
+    _wire_real_scope_rng(loader, block_size=bs)
+
+    convs = loader.convert_to_conversations(loader.load_dataset())
+    children = [c for c in convs if c.parent_conversation_id == "trace_sib"]
+    assert len(children) == 2
+    sib_a, sib_b = children
+    assert sib_a.turns[0].raw_messages == sib_b.turns[0].raw_messages