Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 125 additions & 61 deletions src/aiperf/workers/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
CacheBustTarget,
CommAddress,
CommandType,
ConversationBranchMode,
MemoryMapFormat,
MessageType,
)
Expand Down Expand Up @@ -101,6 +102,27 @@ def _apply_cache_bust_to_system_message(
return system_message


def _content_has_marker_at_edge(
content: object, marker: str, *, is_prefix: bool
) -> bool:
"""Whether ``content`` already carries ``marker`` at the prefix/suffix edge.

The injection helpers run once per credit and several paths mutate a turn
object shared across the session's turns (delta-mode ``turn_list[0]``, and
the unconditional every-credit first-user mark used for seeded resumes), so
re-injecting the constant per-session marker must not stack it. This check
is exact (the per-session marker is constant; a fresh recycled play sees
pristine content and injects its own marker). Handles plain-string and
OpenAI multimodal list-of-parts content.
"""
if isinstance(content, str):
return content.startswith(marker) if is_prefix else content.endswith(marker)
if isinstance(content, list) and content:
marker_part = {"type": "text", "text": marker.strip()}
return content[0 if is_prefix else -1] == marker_part
return False


def _inject_marker_into_raw_messages(
raw_messages: list[dict], marker: str, *, is_prefix: bool
) -> None:
Expand All @@ -109,14 +131,17 @@ def _inject_marker_into_raw_messages(
No-op when raw_messages is empty or the first message is not a system role.
For multimodal content (``content`` is a list of parts), the marker is
inserted as a new ``{"type": "text", "text": marker}`` part at the start
(prefix) or end (suffix) of the parts list.
(prefix) or end (suffix) of the parts list. Idempotent via
:func:`_content_has_marker_at_edge`.
"""
if not raw_messages or not marker:
return
first = raw_messages[0]
if not isinstance(first, dict) or first.get("role") != "system":
return
content = first.get("content", "")
if _content_has_marker_at_edge(content, marker, is_prefix=is_prefix):
return
if isinstance(content, str):
raw_messages[0] = {
**first,
Expand All @@ -142,13 +167,18 @@ def _inject_marker_into_first_user_turn(
No-op when raw_messages is empty. For multimodal content (``content`` is
a list of parts), the marker is inserted as a new
``{"type": "text", "text": marker}`` part at the start (prefix) or end
(suffix) of the parts list.
(suffix) of the parts list. Idempotent via
:func:`_content_has_marker_at_edge` — FIRST_TURN_* injection runs every
credit (to mark seeded turn 0 on mid-trajectory resumes), so repeated calls
on the same shared turn must not stack the marker.
"""
if not raw_messages or not marker:
return
for idx, msg in enumerate(raw_messages):
if isinstance(msg, dict) and msg.get("role") == "user":
content = msg.get("content", "")
if _content_has_marker_at_edge(content, marker, is_prefix=is_prefix):
return
if isinstance(content, str):
raw_messages[idx] = {
**msg,
Expand Down Expand Up @@ -231,6 +261,8 @@ def _inject_marker_into_first_user_text(
first.contents = [marker.strip()]
return
existing = first.contents[0]
if _content_has_marker_at_edge(existing, marker, is_prefix=is_prefix):
return

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Text seed idempotency mismatch

Medium Severity

_content_has_marker_at_edge treats a full prefix/suffix marker on strings, but _inject_marker_into_first_user_text seeds marker-only synthetic turns with marker.strip() (no surrounding newlines). Because injection now runs every credit, a second call does not recognize the edge marker and prepends/appends the full marker again, stacking [rid:…] tokens on the shared turn.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 1f161a7. Configure here.

first.contents[0] = (marker + existing) if is_prefix else (existing + marker)


Expand Down Expand Up @@ -272,56 +304,113 @@ def _apply_cache_bust(
``turn_list`` where the system role lives in ``turn_list[0]`` and later
deltas start with the prior assistant response).

SYSTEM_* fallback: when ``target`` is ``SYSTEM_PREFIX`` / ``SYSTEM_SUFFIX``
and there is no system message anywhere (neither a Conversation-level
``system_message`` nor a leading ``role=="system"`` entry in any turn's
``raw_messages``), the marker is routed to the first user turn with the
same prefix/suffix orientation — i.e. SYSTEM_PREFIX falls back to a
first-user-turn prefix, SYSTEM_SUFFIX falls back to a first-user-turn
suffix. Without a system prompt the first user message is the prefix of
the entire wire payload, so this produces the same physical token-0
divergence without fabricating a system role. The fallback is gated on
``credit.turn_index == 0`` (matches FIRST_TURN_* semantics: marker only
affects the first turn's KV cache; later turns inherit).
Injection targets the *effective wire prefix* (see
:func:`_effective_prefix_turns`): the slice of ``turn_list`` that
``build_messages`` actually emits. A ``reset_context`` turn makes
``build_messages`` discard every prior turn, so the effective prefix begins
at the last such turn — which may sit mid-history (seeded on a resume,
never dispatched as the current turn), not just at ``turn_list[-1]``.
Marking the discarded turn 0 instead would leave the real prefix unmarked
and let recycled plays warm the server's cache on identical post-reset
bytes. ``SYSTEM_*`` targets are handled in
:func:`_apply_system_target_cache_bust`.
"""
marker = credit.cache_bust_marker
target = credit.cache_bust_target

if not marker or target == CacheBustTarget.NONE:
return system_message

# FORK children share the parent's KV cache by design: they seed turn_list
# from the parent (the SAME Turn objects) and must send the parent's exact
# prefix to hit its cache. The parent already injected its marker into those
# shared turns, so the child inherits it for free — re-busting here would
# diverge the child's prefix from the parent's (cache miss) AND mutate the
# parent's shared, read-only Turn objects (stacking markers). So cache-bust
# is a no-op for FORK children. SPAWN children start fresh (no shared turns)
# and root sessions own their prefix, so both are busted normally.
if (
session.parent_correlation_id is not None
and session.branch_mode == ConversationBranchMode.FORK
):
return system_message

is_prefix = target in (
CacheBustTarget.SYSTEM_PREFIX,
CacheBustTarget.FIRST_TURN_PREFIX,
)
prefix_turns = _effective_prefix_turns(session)

if target in (CacheBustTarget.SYSTEM_PREFIX, CacheBustTarget.SYSTEM_SUFFIX):
# Three sub-paths with intentionally different semantics:
# 1. Conversation-level system_message present: marker injected
# every turn (string mutation re-applied per credit).
# 2. raw_messages first dict has role=="system": marker injected
# every turn (raw mutation re-applied per credit). Under deltas
# that dict lives in turn_list[0]; under message-array it lives
# in turn_list[-1] (same single turn).
# 3. No system anywhere -> first-user-turn fallback: marker injected
# ONLY on turn_index == 0. Subsequent turns inherit via the
# inference server's prefix-cache hit, matching FIRST_TURN_*
# semantics. Re-injecting on every turn would drift token-0 on
# every credit and fragment the cache key.
if system_message is not None:
return _apply_cache_bust_to_system_message(system_message, marker, target)
raw_system = _find_first_system_message(session.turn_list)
if raw_system is not None:
_inject_marker_into_raw_messages(raw_system, marker, is_prefix=is_prefix)
elif credit.turn_index == 0:
_inject_marker_at_first_user(session.turn_list, marker, is_prefix=is_prefix)
return system_message
return _apply_system_target_cache_bust(
prefix_turns,
system_message=system_message,
marker=marker,
target=target,
is_prefix=is_prefix,
)

if credit.turn_index == 0:
_inject_marker_at_first_user(session.turn_list, marker, is_prefix=is_prefix)
# Mark the effective prefix's opening user turn every credit (idempotent).
# Unconditional rather than turn_index==0-gated so a seeded mid-trajectory
# resume (turn_list back-filled with turns 0..k_i at a credit whose
# turn_index > 0) still marks the true wire prefix.
_inject_marker_at_first_user(prefix_turns, marker, is_prefix=is_prefix)
return system_message


def _apply_system_target_cache_bust(
prefix_turns: list[Turn],
*,
system_message: str | None,
marker: str,
target: CacheBustTarget,
is_prefix: bool,
) -> str | None:
"""Inject a ``SYSTEM_PREFIX`` / ``SYSTEM_SUFFIX`` marker for one credit.

``prefix_turns`` is the effective wire prefix slice (see
:func:`_effective_prefix_turns`). Three sub-paths:
1. Conversation-level ``system_message`` present: marker applied every
turn (string mutation re-applied per credit). Unaffected by
``reset_context`` — the ``system_message`` rides on ``RequestInfo`` and
is re-emitted every turn independent of ``build_messages``' reset.
2. ``raw_messages`` first dict has ``role=="system"``: marker injected
into the first system message of the prefix slice.
3. No system in the slice -> first-user-turn fallback: marker injected
into the first user turn every credit (idempotent), matching
``FIRST_TURN_*`` semantics so a seeded mid-trajectory resume still
marks the prefix.

Returns the (possibly modified) ``system_message``.
"""
if system_message is not None:
return _apply_cache_bust_to_system_message(system_message, marker, target)
raw_system = _find_first_system_message(prefix_turns)
if raw_system is not None:
_inject_marker_into_raw_messages(raw_system, marker, is_prefix=is_prefix)
else:
_inject_marker_at_first_user(prefix_turns, marker, is_prefix=is_prefix)
return system_message


def _effective_prefix_turns(session: UserSession) -> list[Turn]:
"""The ``turn_list`` slice that forms the wire prefix for cache-bust.

``base_endpoint.build_messages`` restarts the message array at every
``reset_context`` turn that carries ``raw_messages`` (discarding everything
before it), so the effective prefix begins at the *last* such turn in
``turn_list`` — not turn 0, and not merely ``turn_list[-1]``: a reset can
sit mid-history (e.g. seeded into a mid-trajectory resume, where it is never
dispatched as the current turn). Returns the slice from that turn to the
end, or the whole ``turn_list`` when there is no reset.
"""
turns = session.turn_list
for i in range(len(turns) - 1, -1, -1):
if turns[i].reset_context and turns[i].raw_messages:
return turns[i:]
return turns


class Worker(BaseComponentService, ProcessHealthMixin):
"""Worker processes credits from the TimingManager and makes API calls to inference servers.

Expand Down Expand Up @@ -845,8 +934,7 @@ def _maybe_warn_cache_bust_silent_drop(
credit: Credit,
) -> None:
"""Emit a one-shot warning if cache-bust was requested but had nowhere
to land on this credit (e.g. SYSTEM_* on turn>0 with no system anywhere,
or empty session.turn_list).
to land on this credit (an empty ``session.turn_list``).

Rate-limited to once per worker via ``self._cache_bust_warning_shown`` —
the misconfiguration is identical for every credit, so a single
Expand All @@ -864,30 +952,6 @@ def _maybe_warn_cache_bust_silent_drop(
f"cache-bust target={target.value} requested but session.turn_list "
f"is empty — marker NOT injected (further occurrences suppressed)."
)
return
# SYSTEM_* on turn>0 with no system anywhere: the fallback is gated on
# turn_index==0 by design (see _apply_cache_bust comments), so the
# marker is intentionally NOT re-applied. Surface this once so users
# configuring cache-bust against a synthetic / no-system trace see why
# token-0 didn't drift.
if target in (CacheBustTarget.SYSTEM_PREFIX, CacheBustTarget.SYSTEM_SUFFIX):
if session.conversation.system_message is not None:
return
last_turn = session.turn_list[-1]
raw = last_turn.raw_messages
has_raw_system = bool(
raw and isinstance(raw[0], dict) and raw[0].get("role") == "system"
)
if not has_raw_system and credit.turn_index > 0:
self._cache_bust_warning_shown = True
self.warning(
f"cache-bust target={target.value} requested but trace has no "
f"system message (neither Conversation.system_message nor "
f"raw_messages[0].role=='system'); fallback to first-user-turn "
f"only fires on turn_index==0, so subsequent turns inherit the "
f"already-prefixed prompt. This is intentional (matches "
f"FIRST_TURN_* semantics) — further occurrences suppressed."
)

async def _execute_request(
self,
Expand Down
47 changes: 19 additions & 28 deletions tests/component_integration/test_agentic_replay_cache_bust.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,12 @@ def test_agentic_replay_cache_bust_marker_in_wire_payload(
across all turns of a session, distinct across sessions, and absent
from the trace turn bodies.

Note on ``FIRST_TURN_*`` semantics (spec §4.5): the worker only injects
the marker at ``credit.turn_index == 0``. Agentic_replay trajectories
that resume at ``k_i > 0`` therefore never see a FIRST_TURN_* marker —
only sessions that begin at turn 0 (recycled spawns and k_i=0
trajectories) carry one. We restrict the per-session continuity /
cross-session distinctness assertions to *marked* sessions for
FIRST_TURN_* and require at least one such marked session to exist.
SYSTEM_* applies on every turn, so marker coverage is universal.
Marker coverage is universal for every target: FIRST_TURN_* injects into
the effective wire prefix's opening user turn on every credit (including
mid-trajectory resumes at ``k_i > 0``, whose seeded turn 0 is the real
prefix), and SYSTEM_* applies every turn. So every profiling session must
carry exactly one marker — a regression of the seeded-resume fix would show
up here as an unmarked ``k_i > 0`` session.
"""
cmd = _build_cmd(weka_with_system_dir, cache_bust=target)
result = cli.run_sync(cmd, timeout=defaults.timeout)
Expand Down Expand Up @@ -329,10 +327,6 @@ def test_agentic_replay_cache_bust_marker_in_wire_payload(
f"got {len(by_session)}: {list(by_session.keys())}"
)

is_first_turn_target = target in (
CacheBustTarget.FIRST_TURN_PREFIX,
CacheBustTarget.FIRST_TURN_SUFFIX,
)
is_prefix_target = target in (
CacheBustTarget.SYSTEM_PREFIX,
CacheBustTarget.FIRST_TURN_PREFIX,
Expand Down Expand Up @@ -387,22 +381,19 @@ def test_agentic_replay_cache_bust_marker_in_wire_payload(
)
session_rids[xcorr] = next(iter(rids_in_session))

if is_first_turn_target:
# FIRST_TURN_* only fires when credit.turn_index == 0. With our
# 6-trace fixture + concurrency=3 + duration=8s, recycled sessions
# always start at turn 0, so at least one session must be marked.
assert len(session_rids) >= 1, (
f"target={target}: no session received a FIRST_TURN marker. "
f"Recycled sessions begin at turn_index=0 and must inject. "
f"Total sessions={len(by_session)}, "
f"unmarked={len(sessions_without_marker)}"
)
else:
# SYSTEM_* applies on every turn -> every session must be marked.
assert not sessions_without_marker, (
f"target={target}: SYSTEM_* must mark every session; "
f"unmarked={sessions_without_marker}"
)
# Every profiling session must carry a marker, for ALL targets. FIRST_TURN_*
# marks the effective prefix's opening user turn on every credit — including
# seeded mid-trajectory resumes at k_i > 0 — so an unmarked session here is a
# regression of the seeded-resume fix. SYSTEM_* applies every turn. (This
# fixture is linear, no FORK children — FORK inheritance is covered in the
# DAG cache-bust test and the worker unit tests.)
assert not sessions_without_marker, (
f"target={target}: every session must be marked, but these were not: "
f"{sessions_without_marker}. Total sessions={len(by_session)}."
)
assert len(session_rids) >= 1, (
f"target={target}: no marked sessions at all (fixture/run too small?)."
)

# Cross-session distinctness: among marked sessions we want >= 2 distinct
# rids whenever there are >= 2 marked sessions (which is the common case).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,14 @@ def weka_collision_fixture(tmp_path: Path) -> Path:


def _build_cmd(weka_dir: Path, *, duration: int) -> str:
"""Build an aiperf command tuned to drive >=50 distinct sessions.

4 traces x concurrency=3 plus a 6s benchmark window forces continuous
recycle of the small pool; 100+ recycles per trace are typical, which
means hundreds of x_correlation_ids each of which mints a fresh marker.
"""Build an aiperf command that drives many distinct recycled sessions.

4 traces x concurrency=3 over a multi-second benchmark window forces
continuous recycle of the small pool, so each completed session mints a
fresh marker. The exact session count is wall-clock-dependent (it scales
with machine speed); the assertion floor below is set well under what even
a loaded machine produces so the zero-collision contract -- not throughput
-- is what the test gates on.
"""
return f"""
aiperf profile
Expand Down Expand Up @@ -90,10 +93,13 @@ def test_no_marker_collisions_across_large_recycle_run(
Asserts (within PROFILING):
1. Every session has exactly one rid (intra-session marker continuity).
2. ``len(set(rids)) == len(rids)`` across all sessions (zero collisions).
3. >=50 distinct rids observed (smoke check that the run was big enough
to be a meaningful uniqueness test).
3. >=20 distinct rids observed -- a non-vacuity floor, set well below the
session count a loaded machine produces so it does not flake on
throughput. The zero-collision check (2) is the real regression bar:
the pre-fix 33% collision rate is caught with ~99.9% probability even
at 20 sessions, so this floor does not weaken detection.
"""
cmd = _build_cmd(weka_collision_fixture, duration=6)
cmd = _build_cmd(weka_collision_fixture, duration=10)
result = cli.run_sync(cmd, timeout=defaults.timeout)

assert result.exit_code == 0, (
Expand Down Expand Up @@ -129,9 +135,10 @@ def test_no_marker_collisions_across_large_recycle_run(
)
session_rids.append(next(iter(rids_in_session)))

assert len(session_rids) >= 50, (
f"Need >=50 sessions for a meaningful uniqueness test; "
f"got {len(session_rids)}. Increase duration or shrink fixture."
assert len(session_rids) >= 20, (
f"Need >=20 sessions for a non-vacuous uniqueness test; "
f"got {len(session_rids)}. Increase --benchmark-duration or shrink the "
f"fixture if a slower machine is under-producing sessions."
)

# The hard contract: zero duplicates across the entire run.
Expand Down
Loading
Loading