From b63bcd3a72e8b72ebb8cceead88ae77837a0d24d Mon Sep 17 00:00:00 2001 From: weireweire <20922698+weireweire@users.noreply.github.com> Date: Thu, 11 Jun 2026 15:39:34 +0800 Subject: [PATCH 1/2] Reduce record payload memory for large prompts --- src/aiperf/common/environment.py | 9 +++++ src/aiperf/workers/inference_client.py | 11 ++++-- tests/unit/workers/test_inference_client.py | 37 +++++++++++++++++++++ 3 files changed, 55 insertions(+), 2 deletions(-) diff --git a/src/aiperf/common/environment.py b/src/aiperf/common/environment.py index b88449c6c8..32899a8926 100644 --- a/src/aiperf/common/environment.py +++ b/src/aiperf/common/environment.py @@ -630,6 +630,15 @@ class _RecordSettings(BaseSettings): default=300.0, description="Timeout in seconds for processing record results", ) + STRIP_PAYLOAD_BYTES: bool = Field( + default=False, + description="When True, workers omit canonical request payload bytes from " + "RecordContext after the request has been sent. This substantially reduces " + "record-pipeline memory for very large prompts, but disables client-side " + "input tokenization, media counting from request bodies, and raw request " + "payload export. Intended for text-only runs that use server-reported " + "token counts.", + ) class _ServerMetricsSettings(BaseSettings): diff --git a/src/aiperf/workers/inference_client.py b/src/aiperf/workers/inference_client.py index c6583e4d13..0a8ca225ec 100644 --- a/src/aiperf/workers/inference_client.py +++ b/src/aiperf/workers/inference_client.py @@ -9,6 +9,7 @@ import orjson +from aiperf.common.environment import Environment from aiperf.common.mixins import AIPerfLifecycleMixin from aiperf.common.models import ( ErrorDetails, @@ -68,6 +69,7 @@ def __init__(self, model_endpoint: ModelEndpointInfo, service_id: str, **kwargs) ) # Create endpoint and transport instances + self.strip_record_payload_bytes = Environment.RECORD.STRIP_PAYLOAD_BYTES EndpointClass = plugins.get_class( PluginType.ENDPOINT, self.model_endpoint.endpoint.type ) @@ -221,7 +223,8 @@ def _enrich_request_record( hop to the record processor. The tokeniser and the raw-record exporter both read - ``request_info.payload_bytes``; ``osl_mismatch`` reads + ``request_info.payload_bytes`` unless + ``AIPERF_RECORD_STRIP_PAYLOAD_BYTES`` is enabled; ``osl_mismatch`` reads ``max_tokens``; image/audio/video metrics derive their counts from the endpoint's single-pass ``extract_payload_inputs`` at parse-time. ``turns`` is never populated on the attached context @@ -238,6 +241,10 @@ def _enrich_request_record( else None ) + payload_bytes = ( + None if self.strip_record_payload_bytes else request_info.payload_bytes + ) + record.request_info = RecordContext( credit_num=request_info.credit_num, credit_phase=request_info.credit_phase, @@ -248,7 +255,7 @@ def _enrich_request_record( credit_issued_ns=request_info.credit_issued_ns, agent_depth=request_info.agent_depth, parent_correlation_id=request_info.parent_correlation_id, - payload_bytes=request_info.payload_bytes, + payload_bytes=payload_bytes, max_tokens=max_tokens, audio_duration_seconds=audio_duration_seconds, cache_bust_marker=request_info.cache_bust_marker, diff --git a/tests/unit/workers/test_inference_client.py b/tests/unit/workers/test_inference_client.py index 244a62487d..74f8d4e532 100644 --- a/tests/unit/workers/test_inference_client.py +++ b/tests/unit/workers/test_inference_client.py @@ -8,6 +8,7 @@ from pytest import param from aiperf.common.enums import CreditPhase, ModelSelectionStrategy +from aiperf.common.environment import Environment from aiperf.common.models.dataset_models import Text, Turn from aiperf.common.models.model_endpoint_info import ( EndpointInfo, @@ -442,3 +443,39 @@ def test_enrich_downcasts_to_slim_record_context( assert not hasattr(ctx, attr), ( f"RecordContext must not carry pre-send field {attr!r}" ) + + def test_enrich_can_strip_payload_bytes_for_large_prompt_runs( + self, inference_client, model_endpoint, monkeypatch + ): + """Opt-in memory mode omits huge request payloads from record messages.""" + monkeypatch.setattr(Environment.RECORD, "STRIP_PAYLOAD_BYTES", True) + inference_client.strip_record_payload_bytes = ( + Environment.RECORD.STRIP_PAYLOAD_BYTES + ) + + turn = Turn(texts=[Text(contents=["x"])], role="user", model="test-model") + request_info = RequestInfo( + model_endpoint=model_endpoint, + turns=[turn], + turn_index=0, + credit_num=7, + credit_phase=CreditPhase.PROFILING, + x_request_id="rid", + x_correlation_id="cid", + conversation_id="conv", + payload_bytes=b'{"model":"x","messages":[{"role":"user","content":"x"}]}', + ) + record = RequestRecord( + request_info=request_info, + start_perf_ns=1000, + timestamp_ns=1000, + end_perf_ns=2000, + ) + + enriched = inference_client._enrich_request_record( + record=record, request_info=request_info + ) + + assert enriched.request_info is not None + assert enriched.request_info.payload_bytes is None + assert request_info.payload_bytes is not None From 3bcd555dfedc18899c8c1465c3ad8d3c82307192 Mon Sep 17 00:00:00 2001 From: weireweire <20922698+weireweire@users.noreply.github.com> Date: Thu, 11 Jun 2026 17:47:29 +0800 Subject: [PATCH 2/2] Avoid session reconstruction for payload-byte turns --- src/aiperf/workers/worker.py | 13 +++----- tests/unit/workers/test_worker.py | 53 +++++++++++++++++++------------ 2 files changed, 37 insertions(+), 29 deletions(-) diff --git a/src/aiperf/workers/worker.py b/src/aiperf/workers/worker.py index 695fd0045b..9f560f4d32 100644 --- a/src/aiperf/workers/worker.py +++ b/src/aiperf/workers/worker.py @@ -696,15 +696,10 @@ async def _process_credit(self, credit_context: CreditContext) -> None: try: # Payload bytes fast path: bypass session/conversation deserialization. - # Skipped for DAG descendants (agent_depth > 0) so their turn_list - # goes through session_manager — FORK children need parent-seeded - # accumulation and all multi-turn children need session state. - context_mode_requires_session = credit_context.credit.agent_depth > 0 - if ( - self._is_payload_bytes - and self._dataset_client is not None - and not context_mode_requires_session - ): + # PAYLOAD_BYTES entries are already full wire payloads for verbatim + # replay, including DAG children/subagents, so no accumulated + # turn_list is needed to construct the current request. + if self._is_payload_bytes and self._dataset_client is not None: conversation_id = credit_context.credit.conversation_id turn_index = credit_context.credit.turn_index payload_bytes = await self._dataset_client.get_payload_bytes( diff --git a/tests/unit/workers/test_worker.py b/tests/unit/workers/test_worker.py index e931000208..de29d8c1df 100644 --- a/tests/unit/workers/test_worker.py +++ b/tests/unit/workers/test_worker.py @@ -321,18 +321,17 @@ class TestProcessCreditFastPathRouting: """Worker's payload-bytes fast path routing. The fast path (read ``payload_bytes`` directly from the dataset - client, bypass session/conversation deserialisation) is gated on - two conditions: - 1. ``self._is_payload_bytes`` is True (mmap format is PAYLOAD_BYTES) - 2. ``credit_context.credit.agent_depth == 0`` (not a DAG descendant) - - DAG descendants (``agent_depth > 0``) must go through the session - path even under PAYLOAD_BYTES mmap so FORK children can seed their - ``UserSession.turn_list`` from the parent session's local state. + client, bypass session/conversation deserialisation) is gated on the + mmap format. PAYLOAD_BYTES entries are already full wire payloads, so + DAG descendants do not need the session path to accumulate context. """ def _make_credit_context( - self, agent_depth: int, conversation_id: str = "conv-xyz" + self, + agent_depth: int, + conversation_id: str = "conv-xyz", + turn_index: int = 0, + num_turns: int = 1, ) -> CreditContext: return CreditContext( credit=Credit( @@ -340,8 +339,8 @@ def _make_credit_context( phase=CreditPhase.PROFILING, conversation_id=conversation_id, x_correlation_id="xcorr", - turn_index=0, - num_turns=1, + turn_index=turn_index, + num_turns=num_turns, issued_at_ns=0, agent_depth=agent_depth, ), @@ -370,12 +369,20 @@ async def test_root_credit_uses_fast_path_when_payload_bytes_mode( execute.assert_called_once() session_path.assert_not_called() - async def test_child_credit_forced_to_session_path(self, monkeypatch, mock_worker): - """agent_depth > 0 must bypass the fast path even when - PAYLOAD_BYTES mmap is active. FORK children need the parent's - session-local turn_list, which is inaccessible from the fast path. + async def test_child_credit_uses_fast_path_when_payload_bytes_mode( + self, monkeypatch, mock_worker + ): + """agent_depth > 0 still uses only the current pre-encoded payload. + + PAYLOAD_BYTES is a verbatim replay format: each turn's bytes already + contain the complete request body. Forcing DAG children through the + session path reconstructs every turn in the child conversation and + retains duplicated full-history payloads in worker memory. """ mock_client = AsyncMock() + mock_client.get_payload_bytes = AsyncMock( + return_value=b'{"model":"x","messages":[{"role":"user","content":"t2"}]}' + ) mock_worker._dataset_client = mock_client mock_worker._is_payload_bytes = True @@ -384,12 +391,18 @@ async def test_child_credit_forced_to_session_path(self, monkeypatch, mock_worke monkeypatch.setattr(mock_worker, "_execute_request", execute) monkeypatch.setattr(mock_worker, "_process_credit_with_session", session_path) - await mock_worker._process_credit(self._make_credit_context(agent_depth=1)) + await mock_worker._process_credit( + self._make_credit_context( + agent_depth=1, + conversation_id="child-conv", + turn_index=2, + num_turns=10, + ) + ) - # Fast path never consulted the dataset client for bytes. - mock_client.get_payload_bytes.assert_not_called() - execute.assert_not_called() - session_path.assert_called_once() + mock_client.get_payload_bytes.assert_called_once_with("child-conv", 2) + execute.assert_called_once() + session_path.assert_not_called() async def test_non_payload_bytes_mode_always_session_path( self, monkeypatch, mock_worker