Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/aiperf/common/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,15 @@ class _RecordSettings(BaseSettings):
default=300.0,
description="Timeout in seconds for processing record results",
)
STRIP_PAYLOAD_BYTES: bool = Field(
default=False,
description="When True, workers omit canonical request payload bytes from "
"RecordContext after the request has been sent. This substantially reduces "
"record-pipeline memory for very large prompts, but disables client-side "
"input tokenization, media counting from request bodies, and raw request "
"payload export. Intended for text-only runs that use server-reported "
"token counts.",
)


class _ServerMetricsSettings(BaseSettings):
Expand Down
11 changes: 9 additions & 2 deletions src/aiperf/workers/inference_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import orjson

from aiperf.common.environment import Environment
from aiperf.common.mixins import AIPerfLifecycleMixin
from aiperf.common.models import (
ErrorDetails,
Expand Down Expand Up @@ -68,6 +69,7 @@ def __init__(self, model_endpoint: ModelEndpointInfo, service_id: str, **kwargs)
)

# Create endpoint and transport instances
self.strip_record_payload_bytes = Environment.RECORD.STRIP_PAYLOAD_BYTES
EndpointClass = plugins.get_class(
PluginType.ENDPOINT, self.model_endpoint.endpoint.type
)
Expand Down Expand Up @@ -221,7 +223,8 @@ def _enrich_request_record(
hop to the record processor.

The tokeniser and the raw-record exporter both read
``request_info.payload_bytes``; ``osl_mismatch`` reads
``request_info.payload_bytes`` unless
``AIPERF_RECORD_STRIP_PAYLOAD_BYTES`` is enabled; ``osl_mismatch`` reads
``max_tokens``; image/audio/video metrics derive their counts from
the endpoint's single-pass ``extract_payload_inputs`` at
parse-time. ``turns`` is never populated on the attached context
Expand All @@ -238,6 +241,10 @@ def _enrich_request_record(
else None
)

payload_bytes = (
None if self.strip_record_payload_bytes else request_info.payload_bytes
)

record.request_info = RecordContext(
credit_num=request_info.credit_num,
credit_phase=request_info.credit_phase,
Expand All @@ -248,7 +255,7 @@ def _enrich_request_record(
credit_issued_ns=request_info.credit_issued_ns,
agent_depth=request_info.agent_depth,
parent_correlation_id=request_info.parent_correlation_id,
payload_bytes=request_info.payload_bytes,
payload_bytes=payload_bytes,
max_tokens=max_tokens,
audio_duration_seconds=audio_duration_seconds,
cache_bust_marker=request_info.cache_bust_marker,
Expand Down
13 changes: 4 additions & 9 deletions src/aiperf/workers/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,15 +696,10 @@ async def _process_credit(self, credit_context: CreditContext) -> None:

try:
# Payload bytes fast path: bypass session/conversation deserialization.
# Skipped for DAG descendants (agent_depth > 0) so their turn_list
# goes through session_manager — FORK children need parent-seeded
# accumulation and all multi-turn children need session state.
context_mode_requires_session = credit_context.credit.agent_depth > 0
if (
self._is_payload_bytes
and self._dataset_client is not None
and not context_mode_requires_session
):
# PAYLOAD_BYTES entries are already full wire payloads for verbatim
# replay, including DAG children/subagents, so no accumulated
# turn_list is needed to construct the current request.
if self._is_payload_bytes and self._dataset_client is not None:
conversation_id = credit_context.credit.conversation_id
turn_index = credit_context.credit.turn_index
payload_bytes = await self._dataset_client.get_payload_bytes(
Expand Down
37 changes: 37 additions & 0 deletions tests/unit/workers/test_inference_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pytest import param

from aiperf.common.enums import CreditPhase, ModelSelectionStrategy
from aiperf.common.environment import Environment
from aiperf.common.models.dataset_models import Text, Turn
from aiperf.common.models.model_endpoint_info import (
EndpointInfo,
Expand Down Expand Up @@ -442,3 +443,39 @@ def test_enrich_downcasts_to_slim_record_context(
assert not hasattr(ctx, attr), (
f"RecordContext must not carry pre-send field {attr!r}"
)

def test_enrich_can_strip_payload_bytes_for_large_prompt_runs(
    self, inference_client, model_endpoint, monkeypatch
):
    """Payload stripping leaves the enriched record context without bytes.

    With ``STRIP_PAYLOAD_BYTES`` switched on, the context attached to the
    record must report ``payload_bytes`` as ``None``, while the originating
    ``RequestInfo`` keeps its own payload untouched.
    """
    # Enable the setting, then mirror it onto the client attribute that
    # ``_enrich_request_record`` consults.
    monkeypatch.setattr(Environment.RECORD, "STRIP_PAYLOAD_BYTES", True)
    inference_client.strip_record_payload_bytes = (
        Environment.RECORD.STRIP_PAYLOAD_BYTES
    )

    user_turn = Turn(texts=[Text(contents=["x"])], role="user", model="test-model")
    wire_body = b'{"model":"x","messages":[{"role":"user","content":"x"}]}'
    sent_info = RequestInfo(
        model_endpoint=model_endpoint,
        turns=[user_turn],
        turn_index=0,
        credit_num=7,
        credit_phase=CreditPhase.PROFILING,
        x_request_id="rid",
        x_correlation_id="cid",
        conversation_id="conv",
        payload_bytes=wire_body,
    )
    raw_record = RequestRecord(
        request_info=sent_info,
        start_perf_ns=1000,
        timestamp_ns=1000,
        end_perf_ns=2000,
    )

    result = inference_client._enrich_request_record(
        record=raw_record, request_info=sent_info
    )

    # The slim context exists but no longer carries the request body...
    stripped_context = result.request_info
    assert stripped_context is not None
    assert stripped_context.payload_bytes is None
    # ...and the source RequestInfo was not mutated in the process.
    assert sent_info.payload_bytes is not None
53 changes: 33 additions & 20 deletions tests/unit/workers/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,27 +321,26 @@ class TestProcessCreditFastPathRouting:
"""Worker's payload-bytes fast path routing.

The fast path (read ``payload_bytes`` directly from the dataset
client, bypass session/conversation deserialisation) is gated on
two conditions:
1. ``self._is_payload_bytes`` is True (mmap format is PAYLOAD_BYTES)
2. ``credit_context.credit.agent_depth == 0`` (not a DAG descendant)

DAG descendants (``agent_depth > 0``) must go through the session
path even under PAYLOAD_BYTES mmap so FORK children can seed their
``UserSession.turn_list`` from the parent session's local state.
client, bypass session/conversation deserialisation) is gated on the
mmap format. PAYLOAD_BYTES entries are already full wire payloads, so
DAG descendants do not need the session path to accumulate context.
"""

def _make_credit_context(
self, agent_depth: int, conversation_id: str = "conv-xyz"
self,
agent_depth: int,
conversation_id: str = "conv-xyz",
turn_index: int = 0,
num_turns: int = 1,
) -> CreditContext:
return CreditContext(
credit=Credit(
id=1,
phase=CreditPhase.PROFILING,
conversation_id=conversation_id,
x_correlation_id="xcorr",
turn_index=0,
num_turns=1,
turn_index=turn_index,
num_turns=num_turns,
issued_at_ns=0,
agent_depth=agent_depth,
),
Expand Down Expand Up @@ -370,12 +369,20 @@ async def test_root_credit_uses_fast_path_when_payload_bytes_mode(
execute.assert_called_once()
session_path.assert_not_called()

async def test_child_credit_forced_to_session_path(self, monkeypatch, mock_worker):
"""agent_depth > 0 must bypass the fast path even when
PAYLOAD_BYTES mmap is active. FORK children need the parent's
session-local turn_list, which is inaccessible from the fast path.
async def test_child_credit_uses_fast_path_when_payload_bytes_mode(
self, monkeypatch, mock_worker
):
"""agent_depth > 0 still uses only the current pre-encoded payload.

PAYLOAD_BYTES is a verbatim replay format: each turn's bytes already
contain the complete request body. Forcing DAG children through the
session path reconstructs every turn in the child conversation and
retains duplicated full-history payloads in worker memory.
"""
mock_client = AsyncMock()
mock_client.get_payload_bytes = AsyncMock(
return_value=b'{"model":"x","messages":[{"role":"user","content":"t2"}]}'
)
mock_worker._dataset_client = mock_client
mock_worker._is_payload_bytes = True

Expand All @@ -384,12 +391,18 @@ async def test_child_credit_forced_to_session_path(self, monkeypatch, mock_worke
monkeypatch.setattr(mock_worker, "_execute_request", execute)
monkeypatch.setattr(mock_worker, "_process_credit_with_session", session_path)

await mock_worker._process_credit(self._make_credit_context(agent_depth=1))
await mock_worker._process_credit(
self._make_credit_context(
agent_depth=1,
conversation_id="child-conv",
turn_index=2,
num_turns=10,
)
)

# Fast path never consulted the dataset client for bytes.
mock_client.get_payload_bytes.assert_not_called()
execute.assert_not_called()
session_path.assert_called_once()
mock_client.get_payload_bytes.assert_called_once_with("child-conv", 2)
execute.assert_called_once()
session_path.assert_not_called()

async def test_non_payload_bytes_mode_always_session_path(
self, monkeypatch, mock_worker
Expand Down
Loading