From 0d8a5c5dc79be8475412e33eb4db6df16ffeb4ec Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Tue, 16 Jun 2026 10:18:51 +0000 Subject: [PATCH] feat(adaptive): add agent context propagation Signed-off-by: Ishan Dhanani --- .../adaptive/src/agent_context_intercept.rs | 76 ++++++++ crates/adaptive/src/config.rs | 47 +++++ crates/adaptive/src/context_helpers.rs | 30 +++ crates/adaptive/src/lib.rs | 25 ++- crates/adaptive/src/plugin_component.rs | 11 ++ crates/adaptive/src/runtime/features.rs | 43 +++- .../unit/agent_context_intercept_tests.rs | 183 ++++++++++++++++++ crates/adaptive/tests/unit/config_tests.rs | 13 ++ .../tests/unit/context_helpers_tests.rs | 55 ++++++ .../tests/unit/plugin_component_tests.rs | 14 +- .../tests/unit/runtime_features_tests.rs | 89 +++++++-- crates/adaptive/tests/unit/runtime_tests.rs | 1 + crates/cli/src/session.rs | 62 +++++- crates/cli/tests/coverage/session_tests.rs | 137 ++++++++++++- crates/node/adaptive.d.ts | 20 ++ crates/node/adaptive.js | 21 ++ crates/node/tests/adaptive_tests.mjs | 9 + crates/node/tests/typed_tests.mjs | 2 + crates/wasm/tests-js/adaptive_tests.mjs | 5 + crates/wasm/wrappers/esm/adaptive.d.ts | 20 ++ crates/wasm/wrappers/esm/adaptive.js | 20 ++ crates/wasm/wrappers/nodejs/adaptive.js | 21 ++ docs/adaptive-plugin/about.mdx | 2 + docs/adaptive-plugin/agent-context.mdx | 60 ++++++ docs/adaptive-plugin/configuration.mdx | 34 +++- .../plugin-configuration-files.mdx | 1 + go/nemo_relay/adaptive.go | 16 ++ go/nemo_relay/adaptive/adaptive.go | 8 + go/nemo_relay/adaptive/optimizer_test.go | 2 + go/nemo_relay/adaptive_test.go | 12 +- python/nemo_relay/adaptive.py | 30 +++ python/nemo_relay/adaptive.pyi | 21 ++ python/tests/test_adaptive.py | 6 + python/tests/test_adaptive_config.py | 2 + 34 files changed, 1057 insertions(+), 41 deletions(-) create mode 100644 crates/adaptive/src/agent_context_intercept.rs create mode 100644 crates/adaptive/tests/unit/agent_context_intercept_tests.rs create mode 100644 docs/adaptive-plugin/agent-context.mdx diff --git a/crates/adaptive/src/agent_context_intercept.rs b/crates/adaptive/src/agent_context_intercept.rs new file mode 100644 index 00000000..847d1e2a --- /dev/null +++ b/crates/adaptive/src/agent_context_intercept.rs @@ -0,0 +1,76 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Opt-in request intercept for copying scope-local agent context into LLM requests. + +use std::sync::Arc; + +use nemo_relay::api::llm::LlmRequest; +use nemo_relay::api::runtime::LlmRequestInterceptFn; +use nemo_relay::codec::request::AnnotatedLlmRequest; +use serde_json::Value as Json; + +use crate::config::AgentContextComponentConfig; +use crate::context_helpers::resolve_agent_context; + +/// Opt-in LLM request intercept that injects canonical agent context into the request body. +pub struct AgentContextIntercept { + inject_body_path: String, +} + +impl AgentContextIntercept { + /// Creates a new agent-context request intercept from component config. + pub fn new(config: AgentContextComponentConfig) -> Self { + Self { + inject_body_path: config.inject_body_path, + } + } + + /// Converts this intercept into an [`LlmRequestInterceptFn`] suitable for registration. + pub fn into_request_fn(self) -> LlmRequestInterceptFn { + let inject_body_path = self.inject_body_path; + Arc::new( + move |_name: &str, mut request: LlmRequest, annotated: Option| { + if let Some(agent_context) = resolve_agent_context() { + insert_json_path_if_absent( + &mut request.content, + &inject_body_path, + &agent_context, + ); + } + Ok((request, annotated)) + }, + ) + } +} + +fn insert_json_path_if_absent(root: &mut Json, path: &str, value: &Json) { + let parts = path + .split('.') + .filter(|part| !part.is_empty()) + .collect::>(); + insert_json_parts_if_absent(root, &parts, value); +} + +fn insert_json_parts_if_absent(root: &mut Json, parts: &[&str], value: &Json) { + let Some((head, tail)) = parts.split_first() else { + return; + }; + let Some(object) = root.as_object_mut() else { + return; + }; + if tail.is_empty() { + object + .entry((*head).to_string()) + .or_insert_with(|| value.clone()); + return; + } + let child = object + .entry((*head).to_string()) + .or_insert_with(|| Json::Object(serde_json::Map::new())); + insert_json_parts_if_absent(child, tail, value); +} + +#[cfg(test)] +#[path = "../tests/unit/agent_context_intercept_tests.rs"] +mod tests; diff --git a/crates/adaptive/src/config.rs b/crates/adaptive/src/config.rs index c24bc4e5..9cdd57e1 100644 --- a/crates/adaptive/src/config.rs +++ b/crates/adaptive/src/config.rs @@ -26,6 +26,9 @@ pub struct AdaptiveConfig { /// Built-in LLM hint injection settings. #[serde(default, skip_serializing_if = "Option::is_none")] pub adaptive_hints: Option, + /// Built-in agent context propagation settings. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub agent_context: Option, /// Built-in tool scheduling settings. #[serde(default, skip_serializing_if = "Option::is_none")] pub tool_parallelism: Option, @@ -45,6 +48,7 @@ impl Default for AdaptiveConfig { state: None, telemetry: None, adaptive_hints: None, + agent_context: None, tool_parallelism: None, acg: None, policy: ConfigPolicy::default(), @@ -136,6 +140,30 @@ impl Default for AdaptiveHintsComponentConfig { } } +/// Typed helper for agent context propagation settings. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AgentContextComponentConfig { + /// Intercept priority. Lower values run first. + #[serde(default = "default_priority")] + pub priority: i32, + /// Whether later request intercepts should be skipped after this one runs. + #[serde(default)] + pub break_chain: bool, + /// JSON path used when injecting request-body agent context. + #[serde(default = "default_agent_context_path")] + pub inject_body_path: String, +} + +impl Default for AgentContextComponentConfig { + fn default() -> Self { + Self { + priority: default_priority(), + break_chain: false, + inject_body_path: default_agent_context_path(), + } + } +} + /// Typed helper for tool parallelism settings. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ToolParallelismComponentConfig { @@ -200,6 +228,10 @@ fn default_adaptive_hints_path() -> String { "nvext.agent_hints".to_string() } +fn default_agent_context_path() -> String { + "nvext.agent_context".to_string() +} + fn default_tool_parallelism_mode() -> String { "observe_only".to_string() } @@ -240,6 +272,13 @@ nemo_relay::editor_config! { nested: AdaptiveHintsComponentConfig, default: AdaptiveHintsComponentConfig, }, + agent_context => { + label: "agent_context", + kind: Section, + optional: true, + nested: AgentContextComponentConfig, + default: AgentContextComponentConfig, + }, tool_parallelism => { label: "tool_parallelism", kind: Section, @@ -297,6 +336,14 @@ nemo_relay::editor_config! { } } +nemo_relay::editor_config! { + impl AgentContextComponentConfig { + priority => { label: "priority", kind: Integer }, + break_chain => { label: "break_chain", kind: Boolean }, + inject_body_path => { label: "inject_body_path", kind: String }, + } +} + nemo_relay::editor_config! { impl ToolParallelismComponentConfig { priority => { label: "priority", kind: Integer }, diff --git a/crates/adaptive/src/context_helpers.rs b/crates/adaptive/src/context_helpers.rs index 95367c67..a2a732e1 100644 --- a/crates/adaptive/src/context_helpers.rs +++ b/crates/adaptive/src/context_helpers.rs @@ -9,6 +9,7 @@ //! - [`extract_scope_path`]: collects function names from the scope stack for trie lookup //! - [`read_manual_latency_sensitivity`]: walks all scopes for manual `latency_sensitive` annotations //! - [`resolve_agent_id`]: returns the first Agent scope name from the scope stack +//! - [`resolve_agent_context`]: returns the nearest scope-local agent context //! //! All functions are safe to call from sync contexts (intercepts are sync closures). //! They acquire a read lock on the scope stack, which is always fast. @@ -20,11 +21,15 @@ use nemo_relay::api::runtime::current_scope_stack; use nemo_relay::api::scope::ScopeType; +use serde_json::Value as Json; use uuid::Uuid; /// Metadata key path for manual latency sensitivity annotation. pub const LATENCY_SENSITIVITY_POINTER: &str = "/nemo_relay_adaptive/latency_sensitivity"; +/// Metadata key path for the canonical agent context object. +pub const AGENT_CONTEXT_POINTER: &str = "/nemo_relay/agent_context"; + /// Session-local scope identity used to coordinate warm-first cohorts. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct SharedParentScopeIdentity { @@ -169,6 +174,31 @@ pub fn resolve_agent_id() -> Option { .map(|s| s.name.clone()) } +/// Resolves the nearest canonical agent context from the current scope stack. +/// +/// Producers attach this object to scope metadata at +/// `/nemo_relay/agent_context`. Request intercepts read the nearest active +/// value, so child agent scopes override their parent turn context. +/// +/// # Returns +/// A cloned JSON object when one is visible on the current scope stack. +/// Returns `None` when no context exists or the scope stack cannot be read. +pub fn resolve_agent_context() -> Option { + let stack_handle = current_scope_stack(); + let stack = match stack_handle.read() { + Ok(s) => s, + Err(_) => return None, + }; + stack.scopes().iter().rev().find_map(|scope| { + scope + .metadata + .as_ref() + .and_then(|metadata| metadata.pointer(AGENT_CONTEXT_POINTER)) + .filter(|value| value.is_object()) + .cloned() + }) +} + /// Resolves the session-local identity used by warm-first cohort coordination. /// /// The shared parent must come from the parent scope, not the current scope's diff --git a/crates/adaptive/src/lib.rs b/crates/adaptive/src/lib.rs index 9843a2e5..99da4e56 100644 --- a/crates/adaptive/src/lib.rs +++ b/crates/adaptive/src/lib.rs @@ -14,6 +14,7 @@ pub mod acg_component; pub mod acg_learner; pub mod acg_profile; pub mod adaptive_hints_intercept; +pub mod agent_context_intercept; pub mod cache_diagnostics; pub mod config; pub mod context_helpers; @@ -36,12 +37,13 @@ pub mod trie; pub mod types; pub use config::{ - AcgComponentConfig, AdaptiveConfig, AdaptiveHintsComponentConfig, BackendSpec, StateConfig, - TelemetryComponentConfig, ToolParallelismComponentConfig, + AcgComponentConfig, AdaptiveConfig, AdaptiveHintsComponentConfig, AgentContextComponentConfig, + BackendSpec, StateConfig, TelemetryComponentConfig, ToolParallelismComponentConfig, }; pub use context_helpers::{ - LATENCY_SENSITIVITY_POINTER, extract_scope_path, read_manual_latency_sensitivity, - resolve_agent_id, resolve_shared_parent_scope_identity, set_latency_sensitivity, + AGENT_CONTEXT_POINTER, LATENCY_SENSITIVITY_POINTER, extract_scope_path, + read_manual_latency_sensitivity, resolve_agent_context, resolve_agent_id, + resolve_shared_parent_scope_identity, set_latency_sensitivity, }; pub use error::{AdaptiveError, Result}; #[cfg(feature = "redis-backend")] @@ -50,3 +52,18 @@ pub use runtime::features::AdaptiveRuntime; pub use storage::erased::AnyBackend; pub use storage::memory::InMemoryBackend; pub use storage::traits::{StorageBackend, StorageBackendDyn}; + +#[cfg(test)] +pub(crate) mod test_support { + use tokio::sync::{Mutex, MutexGuard}; + + static GLOBAL_RUNTIME_MUTEX: Mutex<()> = Mutex::const_new(()); + + pub(crate) async fn lock_global_runtime() -> MutexGuard<'static, ()> { + GLOBAL_RUNTIME_MUTEX.lock().await + } + + pub(crate) fn blocking_lock_global_runtime() -> MutexGuard<'static, ()> { + GLOBAL_RUNTIME_MUTEX.blocking_lock() + } +} diff --git a/crates/adaptive/src/plugin_component.rs b/crates/adaptive/src/plugin_component.rs index 62ccfd4b..ce17842d 100644 --- a/crates/adaptive/src/plugin_component.rs +++ b/crates/adaptive/src/plugin_component.rs @@ -180,6 +180,7 @@ fn validate_adaptive_plugin_config(plugin_config: &Map) -> Vec) -> Vec Self { + Self { + name: format!("adaptive_{runtime_id}_agent_context_request"), + priority: config.priority, + break_chain: config.break_chain, + config, + } + } +} + +impl AdaptiveFeature for AgentContextFeature { + fn register<'a>( + &'a mut self, + ctx: &'a mut RegistrationContext<'_>, + ) -> Pin> + Send + 'a>> { + Box::pin(async move { + let intercept = AgentContextIntercept::new(self.config.clone()); + ctx.register_llm_request_intercept( + &self.name, + self.priority, + self.break_chain, + intercept.into_request_fn(), + ) + }) + } +} + struct ToolParallelismFeature { name: String, priority: i32, diff --git a/crates/adaptive/tests/unit/agent_context_intercept_tests.rs b/crates/adaptive/tests/unit/agent_context_intercept_tests.rs new file mode 100644 index 00000000..c3be0ddc --- /dev/null +++ b/crates/adaptive/tests/unit/agent_context_intercept_tests.rs @@ -0,0 +1,183 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Unit tests for agent context request injection. + +use super::*; + +use nemo_relay::api::runtime::set_thread_scope_stack; +use nemo_relay::api::runtime::{LlmRequestInterceptFn, create_scope_stack}; +use nemo_relay::api::scope::{PopScopeParams, PushScopeParams, ScopeType, pop_scope, push_scope}; +use serde_json::json; + +fn reset_scope_stack() { + set_thread_scope_stack(create_scope_stack()); +} + +#[test] +fn agent_context_intercept_injects_nearest_scope_context() { + reset_scope_stack(); + let parent_context = json!({ + "session_type_id": "codex", + "session_id": "session-1", + "trajectory_id": "session-1:turn:1" + }); + let child_context = json!({ + "session_type_id": "codex", + "session_id": "session-1", + "trajectory_id": "worker-1", + "parent_trajectory_id": "session-1:turn:1" + }); + let parent = push_scope( + PushScopeParams::builder() + .name("turn") + .scope_type(ScopeType::Agent) + .metadata(json!({ "nemo_relay": { "agent_context": parent_context } })) + .build(), + ) + .unwrap(); + let child = push_scope( + PushScopeParams::builder() + .name("subagent:worker-1") + .scope_type(ScopeType::Agent) + .parent(&parent) + .metadata(json!({ "nemo_relay": { "agent_context": child_context } })) + .build(), + ) + .unwrap(); + + let request_fn = + AgentContextIntercept::new(AgentContextComponentConfig::default()).into_request_fn(); + let (request, annotated) = request_fn( + "openai.responses", + LlmRequest { + headers: serde_json::Map::new(), + content: json!({ "model": "test", "nvext": { "agent_hints": { "priority": 1 } } }), + }, + None, + ) + .unwrap(); + + assert_eq!( + request.content["nvext"]["agent_context"], + json!({ + "session_type_id": "codex", + "session_id": "session-1", + "trajectory_id": "worker-1", + "parent_trajectory_id": "session-1:turn:1" + }) + ); + assert_eq!( + request.content["nvext"]["agent_hints"]["priority"], + json!(1) + ); + assert!(annotated.is_none()); + + pop_scope(PopScopeParams::builder().handle_uuid(&child.uuid).build()).unwrap(); + pop_scope(PopScopeParams::builder().handle_uuid(&parent.uuid).build()).unwrap(); + reset_scope_stack(); +} + +#[test] +fn agent_context_intercept_preserves_existing_request_context() { + reset_scope_stack(); + let scope = push_scope( + PushScopeParams::builder() + .name("turn") + .scope_type(ScopeType::Agent) + .metadata(json!({ + "nemo_relay": { + "agent_context": { + "session_type_id": "codex", + "session_id": "session-1", + "trajectory_id": "session-1:turn:1" + } + } + })) + .build(), + ) + .unwrap(); + + let existing = json!({ + "session_type_id": "client", + "session_id": "client-session", + "trajectory_id": "client-trajectory" + }); + let request_fn = + AgentContextIntercept::new(AgentContextComponentConfig::default()).into_request_fn(); + let (request, _) = request_fn( + "openai.responses", + LlmRequest { + headers: serde_json::Map::new(), + content: json!({ "nvext": { "agent_context": existing } }), + }, + None, + ) + .unwrap(); + + assert_eq!( + request.content["nvext"]["agent_context"], + json!({ + "session_type_id": "client", + "session_id": "client-session", + "trajectory_id": "client-trajectory" + }) + ); + + pop_scope(PopScopeParams::builder().handle_uuid(&scope.uuid).build()).unwrap(); + reset_scope_stack(); +} + +#[test] +fn agent_context_intercept_noops_without_scope_context_or_object_body() { + reset_scope_stack(); + let request_fn = + AgentContextIntercept::new(AgentContextComponentConfig::default()).into_request_fn(); + + let (request, _) = request_fn( + "openai.responses", + LlmRequest { + headers: serde_json::Map::new(), + content: json!({ "model": "test" }), + }, + None, + ) + .unwrap(); + assert_eq!(request.content, json!({ "model": "test" })); + + let scope = push_scope( + PushScopeParams::builder() + .name("turn") + .scope_type(ScopeType::Agent) + .metadata(json!({ + "nemo_relay": { + "agent_context": { + "session_type_id": "codex", + "session_id": "session-1", + "trajectory_id": "session-1:turn:1" + } + } + })) + .build(), + ) + .unwrap(); + let (scalar_request, _) = request_fn( + "openai.responses", + LlmRequest { + headers: serde_json::Map::new(), + content: json!("scalar"), + }, + None, + ) + .unwrap(); + assert_eq!(scalar_request.content, json!("scalar")); + + pop_scope(PopScopeParams::builder().handle_uuid(&scope.uuid).build()).unwrap(); + reset_scope_stack(); +} + +#[test] +fn agent_context_intercept_request_fn_type_compiles() { + let _request_fn: LlmRequestInterceptFn = + AgentContextIntercept::new(AgentContextComponentConfig::default()).into_request_fn(); +} diff --git a/crates/adaptive/tests/unit/config_tests.rs b/crates/adaptive/tests/unit/config_tests.rs index 22f8ed6f..63c39e52 100644 --- a/crates/adaptive/tests/unit/config_tests.rs +++ b/crates/adaptive/tests/unit/config_tests.rs @@ -13,6 +13,7 @@ fn test_adaptive_config_defaults() { assert_eq!(config.version, 1); assert!(config.telemetry.is_none()); assert!(config.adaptive_hints.is_none()); + assert!(config.agent_context.is_none()); assert!(config.tool_parallelism.is_none()); assert_eq!( config.policy.unknown_component, @@ -26,6 +27,11 @@ fn test_typed_section_helpers_default() { assert_eq!(adaptive_hints.priority, 100); assert!(adaptive_hints.inject_header); + let agent_context = AgentContextComponentConfig::default(); + assert_eq!(agent_context.priority, 100); + assert!(!agent_context.break_chain); + assert_eq!(agent_context.inject_body_path, "nvext.agent_context"); + let tool_parallelism = ToolParallelismComponentConfig::default(); assert_eq!(tool_parallelism.mode, "observe_only"); } @@ -56,6 +62,7 @@ fn test_adaptive_config_deserialization_applies_field_defaults() { assert!(config.state.is_none()); assert!(config.telemetry.is_none()); assert!(config.adaptive_hints.is_none()); + assert!(config.agent_context.is_none()); assert!(config.tool_parallelism.is_none()); } @@ -67,6 +74,11 @@ fn test_component_configs_deserialize_with_default_helpers() { assert!(adaptive_hints.inject_header); assert_eq!(adaptive_hints.inject_body_path, "nvext.agent_hints"); + let agent_context: AgentContextComponentConfig = serde_json::from_value(json!({})).unwrap(); + assert_eq!(agent_context.priority, 100); + assert!(!agent_context.break_chain); + assert_eq!(agent_context.inject_body_path, "nvext.agent_context"); + let tool_parallelism: ToolParallelismComponentConfig = serde_json::from_value(json!({})).unwrap(); assert_eq!(tool_parallelism.priority, 100); @@ -88,6 +100,7 @@ fn test_adaptive_editor_schema_covers_canonical_options() { "state", "telemetry", "adaptive_hints", + "agent_context", "tool_parallelism", "acg", "policy", diff --git a/crates/adaptive/tests/unit/context_helpers_tests.rs b/crates/adaptive/tests/unit/context_helpers_tests.rs index ced5a039..1b1bdd78 100644 --- a/crates/adaptive/tests/unit/context_helpers_tests.rs +++ b/crates/adaptive/tests/unit/context_helpers_tests.rs @@ -5,6 +5,7 @@ use super::*; use nemo_relay::api::runtime::{create_scope_stack, set_thread_scope_stack}; +use nemo_relay::api::scope::{PopScopeParams, PushScopeParams, ScopeType, pop_scope, push_scope}; #[test] fn test_latency_sensitivity_pointer_is_valid_json_pointer() { @@ -74,10 +75,64 @@ fn test_helpers_return_defaults_when_scope_stack_lock_is_poisoned() { assert!(extract_scope_path().is_empty()); assert_eq!(read_manual_latency_sensitivity(), None); assert_eq!(resolve_agent_id(), None); + assert_eq!(resolve_agent_context(), None); set_thread_scope_stack(create_scope_stack()); } +#[test] +fn test_resolve_agent_context_uses_nearest_scope_metadata() { + set_thread_scope_stack(create_scope_stack()); + let parent = push_scope( + PushScopeParams::builder() + .name("parent") + .scope_type(ScopeType::Agent) + .metadata(serde_json::json!({ + "nemo_relay": { + "agent_context": { + "session_type_id": "codex", + "session_id": "session-1", + "trajectory_id": "session-1:turn:1" + } + } + })) + .build(), + ) + .unwrap(); + let child = push_scope( + PushScopeParams::builder() + .name("child") + .scope_type(ScopeType::Agent) + .parent(&parent) + .metadata(serde_json::json!({ + "nemo_relay": { + "agent_context": { + "session_type_id": "codex", + "session_id": "session-1", + "trajectory_id": "worker-1", + "parent_trajectory_id": "session-1:turn:1" + } + } + })) + .build(), + ) + .unwrap(); + + assert_eq!( + resolve_agent_context(), + Some(serde_json::json!({ + "session_type_id": "codex", + "session_id": "session-1", + "trajectory_id": "worker-1", + "parent_trajectory_id": "session-1:turn:1" + })) + ); + + pop_scope(PopScopeParams::builder().handle_uuid(&child.uuid).build()).unwrap(); + pop_scope(PopScopeParams::builder().handle_uuid(&parent.uuid).build()).unwrap(); + set_thread_scope_stack(create_scope_stack()); +} + #[test] fn test_set_latency_sensitivity_ignores_non_object_metadata() { let stack_handle = current_scope_stack(); diff --git a/crates/adaptive/tests/unit/plugin_component_tests.rs b/crates/adaptive/tests/unit/plugin_component_tests.rs index 03fe3f00..9f29e6fe 100644 --- a/crates/adaptive/tests/unit/plugin_component_tests.rs +++ b/crates/adaptive/tests/unit/plugin_component_tests.rs @@ -5,8 +5,6 @@ use super::*; -use std::sync::{Mutex, OnceLock}; - use nemo_relay::api::llm::LlmRequest; use nemo_relay::api::llm::llm_request_intercepts; use nemo_relay::api::runtime::NemoRelayContextState; @@ -14,14 +12,6 @@ use nemo_relay::api::runtime::global_context; use nemo_relay::plugin::{DiagnosticLevel, UnsupportedBehavior, clear_plugin_configuration}; use nemo_relay::plugin::{Plugin, PluginRegistrationContext, rollback_registrations}; use serde_json::json; -use tokio::sync::Mutex as AsyncMutex; - -static TEST_MUTEX: OnceLock> = OnceLock::new(); -static ASYNC_TEST_MUTEX: AsyncMutex<()> = AsyncMutex::const_new(()); - -fn test_mutex() -> &'static Mutex<()> { - TEST_MUTEX.get_or_init(|| Mutex::new(())) -} fn reset_global() { let _ = clear_plugin_configuration(); @@ -91,7 +81,7 @@ fn validate_adaptive_plugin_config_reports_unknown_fields_and_backend_errors() { #[test] fn register_adaptive_component_is_idempotent_and_deregisters_cleanly() { - let _guard = test_mutex().lock().unwrap(); + let _guard = crate::test_support::blocking_lock_global_runtime(); let _ = clear_plugin_configuration(); let _ = deregister_adaptive_component(); @@ -351,7 +341,7 @@ fn validate_adaptive_plugin_config_reports_component_specific_unknown_fields() { #[tokio::test(flavor = "current_thread")] async fn adaptive_plugin_registers_runtime_and_rolls_back_registration() { - let _guard = ASYNC_TEST_MUTEX.lock().await; + let _guard = crate::test_support::lock_global_runtime().await; reset_global(); let plugin = AdaptivePlugin; diff --git a/crates/adaptive/tests/unit/runtime_features_tests.rs b/crates/adaptive/tests/unit/runtime_features_tests.rs index c6481860..3413e1d5 100644 --- a/crates/adaptive/tests/unit/runtime_features_tests.rs +++ b/crates/adaptive/tests/unit/runtime_features_tests.rs @@ -28,7 +28,6 @@ use nemo_relay::error::FlowError; use nemo_relay::plugin::{ConfigPolicy, DiagnosticLevel, UnsupportedBehavior}; use nemo_relay::plugin::{clear_plugin_configuration, rollback_registrations}; use serde_json::json; -use tokio::sync::Mutex; use crate::acg::profile::{BlockStabilityScore, StabilityClass}; use crate::acg::prompt_ir::SpanId; @@ -42,8 +41,6 @@ use crate::types::plan::{ExecutionPlan, ParallelGroup}; use crate::types::records::RunRecord; use tokio_stream::StreamExt; -static TEST_MUTEX: Mutex<()> = Mutex::const_new(()); - fn reset_global() { let _ = clear_plugin_configuration(); let ctx = global_context(); @@ -427,7 +424,7 @@ async fn adaptive_runtime_new_rejects_invalid_configs_with_joined_errors() { #[tokio::test(flavor = "current_thread")] async fn registration_context_take_event_receiver_only_allows_one_consumer() { - let _lock = TEST_MUTEX.lock().await; + let _lock = crate::test_support::lock_global_runtime().await; reset_global(); let mut runtime = AdaptiveRuntime::new(AdaptiveConfig::default()) @@ -444,7 +441,7 @@ async fn registration_context_take_event_receiver_only_allows_one_consumer() { #[tokio::test(flavor = "current_thread")] async fn telemetry_feature_registers_subscriber_and_starts_drain_task() { - let _lock = TEST_MUTEX.lock().await; + let _lock = crate::test_support::lock_global_runtime().await; reset_global(); let mut runtime = AdaptiveRuntime::new(AdaptiveConfig { @@ -484,7 +481,7 @@ async fn telemetry_feature_registers_subscriber_and_starts_drain_task() { #[tokio::test(flavor = "current_thread")] async fn telemetry_feature_requires_backend() { - let _lock = TEST_MUTEX.lock().await; + let _lock = crate::test_support::lock_global_runtime().await; reset_global(); let mut runtime = AdaptiveRuntime::new(AdaptiveConfig::default()) @@ -506,7 +503,7 @@ async fn telemetry_feature_requires_backend() { #[tokio::test(flavor = "current_thread")] async fn adaptive_hints_feature_registers_request_intercept() { - let _lock = TEST_MUTEX.lock().await; + let _lock = crate::test_support::lock_global_runtime().await; reset_global(); let mut runtime = AdaptiveRuntime::new(AdaptiveConfig::default()) @@ -560,9 +557,73 @@ async fn adaptive_hints_feature_registers_request_intercept() { assert_llm_request_intercept_absent(&name); } +#[tokio::test(flavor = "current_thread")] +async fn agent_context_feature_registers_request_intercept() { + let _lock = crate::test_support::lock_global_runtime().await; + reset_global(); + + let mut runtime = AdaptiveRuntime::new(AdaptiveConfig::default()) + .await + .unwrap(); + let mut feature = AgentContextFeature::new( + AgentContextComponentConfig { + priority: 7, + break_chain: true, + ..AgentContextComponentConfig::default() + }, + Uuid::now_v7(), + ); + let name = feature.name.clone(); + + let mut ctx = RegistrationContext::new(&mut runtime); + feature.register(&mut ctx).await.unwrap(); + assert_llm_request_intercept_registered(&name); + + let scope = nemo_relay::api::scope::push_scope( + nemo_relay::api::scope::PushScopeParams::builder() + .name("turn") + .scope_type(nemo_relay::api::scope::ScopeType::Agent) + .metadata(json!({ + "nemo_relay": { + "agent_context": { + "session_type_id": "codex", + "session_id": "session-1", + "trajectory_id": "session-1:turn:1" + } + } + })) + .build(), + ) + .unwrap(); + + let request = llm_request_intercepts( + "model", + LlmRequest { + headers: serde_json::Map::new(), + content: json!({}), + }, + ) + .unwrap(); + assert_eq!( + request.content["nvext"]["agent_context"]["trajectory_id"], + json!("session-1:turn:1") + ); + + nemo_relay::api::scope::pop_scope( + nemo_relay::api::scope::PopScopeParams::builder() + .handle_uuid(&scope.uuid) + .build(), + ) + .unwrap(); + + let mut registrations = ctx.finish(); + rollback_registrations(&mut registrations); + assert_llm_request_intercept_absent(&name); +} + #[tokio::test(flavor = "current_thread")] async fn tool_parallelism_feature_registers_execution_intercept() { - let _lock = TEST_MUTEX.lock().await; + let _lock = crate::test_support::lock_global_runtime().await; reset_global(); let mut runtime = AdaptiveRuntime::new(AdaptiveConfig::default()) @@ -611,7 +672,7 @@ async fn tool_parallelism_feature_registers_execution_intercept() { #[tokio::test(flavor = "current_thread")] async fn adaptive_runtime_register_survives_hot_cache_seed_failures() { - let _lock = TEST_MUTEX.lock().await; + let _lock = crate::test_support::lock_global_runtime().await; reset_global(); let config = AdaptiveConfig { @@ -653,7 +714,7 @@ async fn adaptive_runtime_register_survives_hot_cache_seed_failures() { #[tokio::test(flavor = "current_thread")] async fn adaptive_runtime_register_is_idempotent_for_active_features() { - let _lock = TEST_MUTEX.lock().await; + let _lock = crate::test_support::lock_global_runtime().await; reset_global(); let mut runtime = AdaptiveRuntime::new(AdaptiveConfig { @@ -678,7 +739,7 @@ async fn adaptive_runtime_register_is_idempotent_for_active_features() { #[tokio::test(flavor = "current_thread")] async fn adaptive_runtime_register_rolls_back_when_telemetry_receiver_is_missing() { - let _lock = TEST_MUTEX.lock().await; + let _lock = crate::test_support::lock_global_runtime().await; reset_global(); let mut runtime = AdaptiveRuntime::new(AdaptiveConfig { @@ -702,7 +763,7 @@ async fn adaptive_runtime_register_rolls_back_when_telemetry_receiver_is_missing #[tokio::test(flavor = "current_thread")] async fn registration_context_registers_all_supported_callback_types() { - let _lock = TEST_MUTEX.lock().await; + let _lock = crate::test_support::lock_global_runtime().await; reset_global(); let mut runtime = AdaptiveRuntime::new(AdaptiveConfig::default()) @@ -811,7 +872,7 @@ async fn adaptive_runtime_helper_methods_cover_report_wait_for_idle_and_feature_ #[tokio::test(flavor = "current_thread")] async fn acg_feature_registers_execution_and_stream_intercepts() { - let _lock = TEST_MUTEX.lock().await; + let _lock = crate::test_support::lock_global_runtime().await; reset_global(); let mut runtime = AdaptiveRuntime::new(AdaptiveConfig::default()) @@ -918,7 +979,7 @@ async fn acg_feature_registers_execution_and_stream_intercepts() { #[tokio::test(flavor = "current_thread")] async fn adaptive_runtime_register_feature_rolls_back_partial_registrations_and_abort_handle() { - let _lock = TEST_MUTEX.lock().await; + let _lock = crate::test_support::lock_global_runtime().await; reset_global(); let mut runtime = AdaptiveRuntime::new(AdaptiveConfig::default()) diff --git a/crates/adaptive/tests/unit/runtime_tests.rs b/crates/adaptive/tests/unit/runtime_tests.rs index 5802a58a..06aec718 100644 --- a/crates/adaptive/tests/unit/runtime_tests.rs +++ b/crates/adaptive/tests/unit/runtime_tests.rs @@ -581,6 +581,7 @@ async fn adaptive_runtime_build_cache_request_facts_keeps_missing_stability_sema #[tokio::test(flavor = "current_thread")] async fn adaptive_runtime_bind_scope_requires_registration_and_passes_through_without_state() { + let _lock = crate::test_support::lock_global_runtime().await; reset_runtime_context(); let mut runtime = AdaptiveRuntime::new(AdaptiveConfig { agent_id: Some("agent-1".to_string()), diff --git a/crates/cli/src/session.rs b/crates/cli/src/session.rs index b944cd55..825d6c7b 100644 --- a/crates/cli/src/session.rs +++ b/crates/cli/src/session.rs @@ -388,15 +388,15 @@ impl SessionManager { let config = self.default_config.session_config_from_headers(headers); self.resolve_start_alias(&mut start, config.clone()).await?; let mut sessions = self.inner.lock().await; + let inferred_agent_kind = alignment::agent_kind_for_gateway_provider(&start.provider); let session_id = start .session_id .clone() .or_else(|| single_active_session_id(&sessions)) - .unwrap_or_else(|| format!("{}-gateway", AgentKind::Gateway.as_str())); + .unwrap_or_else(|| format!("{}-gateway", inferred_agent_kind.as_str())); // Match `start_llm`: when this path creates a brand-new session (real agent's gateway // request beats its SessionStart hook), label the session by the provider so ATIF and // Phoenix scopes carry the agent identity instead of freezing on "gateway". - let inferred_agent_kind = alignment::agent_kind_for_gateway_provider(&start.provider); let created_session = !sessions.contains_key(&session_id); let session = sessions .entry(session_id.clone()) @@ -1038,6 +1038,12 @@ impl Session { owner.subagent_id.as_deref(), owner.status, ); + let call_stack = owner + .subagent_id + .as_ref() + .and_then(|subagent_id| self.subagent_stacks.get(subagent_id)) + .cloned() + .unwrap_or_else(|| stack.clone()); let metadata = merge_metadata( llm_correlation_metadata( start.metadata, @@ -1049,7 +1055,7 @@ impl Session { owner.metadata, ); Ok(GatewayCallPrep { - scope_stack: stack.clone(), + scope_stack: call_stack, session_id: self.session_id.clone(), provider_name: start.provider, request: start.request, @@ -1175,6 +1181,7 @@ impl Session { "turn_source": turn_source, }), ); + let metadata = self.with_agent_context_metadata(metadata, None); let turn_name = self.turn_scope_name(); let scope = push_scope( PushScopeParams::builder() @@ -1214,6 +1221,54 @@ impl Session { ) } + fn with_agent_context_metadata( + &self, + mut metadata: Value, + owner_subagent_id: Option<&str>, + ) -> Value { + let Some(agent_context) = self.agent_context(owner_subagent_id) else { + return metadata; + }; + if !metadata.is_object() { + metadata = merge_metadata(metadata, json!({})); + } + let Some(object) = metadata.as_object_mut() else { + return merge_metadata( + metadata, + json!({ "nemo_relay": { "agent_context": agent_context } }), + ); + }; + let nemo_relay = object.entry("nemo_relay").or_insert_with(|| json!({})); + if !nemo_relay.is_object() { + *nemo_relay = json!({}); + } + let Some(nemo_relay) = nemo_relay.as_object_mut() else { + return metadata; + }; + nemo_relay.insert("agent_context".into(), agent_context); + metadata + } + + fn agent_context(&self, owner_subagent_id: Option<&str>) -> Option { + // Transparent coding-agent traffic can reach the gateway before, or without, a reliable + // SessionStart hook. Provider inference still identifies those sessions; generic gateway + // traffic remains context-free. + if self.agent_kind == AgentKind::Gateway || self.turn_index == 0 { + return None; + } + let root_trajectory_id = format!("{}:turn:{}", self.session_id, self.turn_index); + let mut context = json!({ + "session_type_id": self.agent_kind.as_str(), + "session_id": self.session_id, + "trajectory_id": root_trajectory_id.clone(), + }); + if let Some(subagent_id) = owner_subagent_id { + context["trajectory_id"] = json!(subagent_id); + context["parent_trajectory_id"] = json!(root_trajectory_id); + } + Some(context) + } + async fn end_turn(&mut self, event: SessionEvent) -> Result<(), CliError> { if let Some(subagent_id) = alignment::aliased_turn_subagent_id(&event) { self.close_subagent_scope(&subagent_id, event.payload) @@ -1385,6 +1440,7 @@ impl Session { event.metadata, json!({ "nemo_relay_scope_role": "subagent" }), ); + let metadata = self.with_agent_context_metadata(metadata, Some(&subagent_id)); let subagent_stack = create_scope_stack(); let scope = TASK_SCOPE_STACK .scope(subagent_stack.clone(), async { diff --git a/crates/cli/tests/coverage/session_tests.rs b/crates/cli/tests/coverage/session_tests.rs index cdab66c4..1d031fea 100644 --- a/crates/cli/tests/coverage/session_tests.rs +++ b/crates/cli/tests/coverage/session_tests.rs @@ -272,6 +272,17 @@ fn active_turn_scope(session: &Session) -> &ScopeHandle { .expect("expected active turn scope") } +fn prep_agent_context(prep: &GatewayCallPrep) -> Option { + prep.scope_stack + .read() + .unwrap() + .top() + .metadata + .as_ref() + .and_then(|metadata| metadata.pointer("/nemo_relay/agent_context")) + .cloned() +} + async fn alignment_alias(manager: &SessionManager, session_id: &str) -> Option { manager.alignment.lock().await.alias_for_session(session_id) } @@ -755,6 +766,130 @@ async fn codex_turn_is_agent_scope_with_turn_role_metadata() { ); } +#[tokio::test] +async fn gateway_call_prep_records_agent_context_on_scope_metadata() { + let gateway_only_manager = SessionManager::new(session_test_config()); + let gateway_only = gateway_only_manager + .prepare_gateway_call( + &HeaderMap::new(), + LlmGatewayStart { + session_id: None, + ..llm_start_with_responses_task("ignored", "Inspect the repo.") + }, + ) + .await + .unwrap(); + assert_eq!( + gateway_only.session_id, "codex-gateway", + "provider-inferred coding-agent sessions should not use the generic gateway id" + ); + assert_eq!( + prep_agent_context(&gateway_only), + Some(json!({ + "session_type_id": "codex", + "session_id": "codex-gateway", + "trajectory_id": "codex-gateway:turn:1" + })) + ); + gateway_only_manager + .finish_gateway_call(&gateway_only.session_id, false) + .await; + + let manager = SessionManager::new(session_test_config()); + manager + .apply_events( + &HeaderMap::new(), + vec![NormalizedEvent::AgentStarted(codex_session_event( + "codex-context", + "SessionStart", + json!({}), + ))], + ) + .await + .unwrap(); + + let root = manager + .prepare_gateway_call( + &HeaderMap::new(), + llm_start_with_responses_task("codex-context", "Inspect the repo."), + ) + .await + .unwrap(); + assert_eq!( + prep_agent_context(&root), + Some(json!({ + "session_type_id": "codex", + "session_id": "codex-context", + "trajectory_id": "codex-context:turn:1" + })) + ); + { + let sessions = manager.inner.lock().await; + let turn = active_turn_scope(sessions.get("codex-context").unwrap()); + assert_eq!( + turn.metadata + .as_ref() + .unwrap() + .pointer("/nemo_relay/agent_context"), + Some(&json!({ + "session_type_id": "codex", + "session_id": "codex-context", + "trajectory_id": "codex-context:turn:1" + })) + ); + } + manager.finish_gateway_call("codex-context", false).await; + + manager + .apply_events( + &HeaderMap::new(), + vec![NormalizedEvent::SubagentStarted(SubagentEvent { + session_id: "codex-context".into(), + agent_kind: AgentKind::Codex, + event_name: "SubagentStart".into(), + subagent_id: "worker-1".into(), + payload: json!({}), + metadata: json!({}), + })], + ) + .await + .unwrap(); + let child = manager + .prepare_gateway_call( + &HeaderMap::new(), + LlmGatewayStart { + subagent_id: Some("worker-1".into()), + ..llm_start_with_responses_task("codex-context", "Inspect the repo.") + }, + ) + .await + .unwrap(); + assert_eq!( + prep_agent_context(&child), + Some(json!({ + "session_type_id": "codex", + "session_id": "codex-context", + "trajectory_id": "worker-1", + "parent_trajectory_id": "codex-context:turn:1" + })) + ); + manager.finish_gateway_call("codex-context", false).await; + + let synthetic_manager = SessionManager::new(session_test_config()); + let synthetic = synthetic_manager + .prepare_gateway_call( + &HeaderMap::new(), + LlmGatewayStart { + session_id: None, + provider: "custom".into(), + ..llm_start() + }, + ) + .await + .unwrap(); + assert_eq!(prep_agent_context(&synthetic), None); +} + #[test] fn apply_start_alias_overrides_conflicting_subagent_id() { let mut start = llm_start(); @@ -3801,7 +3936,7 @@ async fn claude_startup_probe_only_session_is_pruned_after_finish() { .await .unwrap(); assert_eq!( - next.session_id, "gateway-gateway", + next.session_id, "codex-gateway", "probe-only sessions must not become the single-active fallback" ); } diff --git a/crates/node/adaptive.d.ts b/crates/node/adaptive.d.ts index a2ad4d30..ce5426d3 100644 --- a/crates/node/adaptive.d.ts +++ b/crates/node/adaptive.d.ts @@ -31,6 +31,13 @@ export interface AdaptiveHintsConfig { inject_body_path?: string; } +/** Built-in agent context propagation settings. */ +export interface AgentContextConfig { + priority?: number; + break_chain?: boolean; + inject_body_path?: string; +} + /** Built-in adaptive tool scheduling settings. */ export interface ToolParallelismConfig { priority?: number; @@ -59,6 +66,7 @@ export interface Config { state?: StateConfig; telemetry?: TelemetryConfig; adaptive_hints?: AdaptiveHintsConfig; + agent_context?: AgentContextConfig; tool_parallelism?: ToolParallelismConfig; acg?: AcgConfig; policy?: ConfigPolicy; @@ -220,6 +228,18 @@ export declare function telemetryConfig(config?: TelemetryConfig): TelemetryConf * of the chain, and writes hints to `nvext.agent_hints`. */ export declare function adaptiveHintsConfig(config?: AdaptiveHintsConfig): AdaptiveHintsConfig; +/** + * Create agent-context propagation settings with defaults applied. + * + * Merges caller-supplied overrides onto the default config used by the + * agent-context request intercept. + * + * @param config - Partial agent-context settings to override. + * @returns A normalized agent-context config object. + * @remarks By default the injector runs at priority `100`, preserves the rest + * of the chain, and writes context to `nvext.agent_context`. + */ +export declare function agentContextConfig(config?: AgentContextConfig): AgentContextConfig; /** * Create adaptive tool-parallelism settings with defaults applied. * diff --git a/crates/node/adaptive.js b/crates/node/adaptive.js index 8392f46a..8a53d82e 100644 --- a/crates/node/adaptive.js +++ b/crates/node/adaptive.js @@ -101,6 +101,26 @@ function adaptiveHintsConfig(config = {}) { }; } +/** + * Create agent-context propagation settings with defaults applied. + * + * Merges caller-supplied overrides onto the default config used by the + * agent-context request intercept. + * + * @param {object} [config={}] - Partial agent-context settings to override. + * @returns {object} A normalized agent-context config object. + * @remarks By default the injector runs at priority `100`, preserves the rest + * of the chain, and writes context to `nvext.agent_context`. + */ +function agentContextConfig(config = {}) { + return { + priority: 100, + break_chain: false, + inject_body_path: 'nvext.agent_context', + ...config, + }; +} + /** * Create adaptive tool-parallelism settings with defaults applied. * @@ -203,6 +223,7 @@ module.exports = { redisBackend, telemetryConfig, adaptiveHintsConfig, + agentContextConfig, toolParallelismConfig, acgConfig, ComponentSpec, diff --git a/crates/node/tests/adaptive_tests.mjs b/crates/node/tests/adaptive_tests.mjs index 540590fa..d22bca4d 100644 --- a/crates/node/tests/adaptive_tests.mjs +++ b/crates/node/tests/adaptive_tests.mjs @@ -140,6 +140,7 @@ describe('core plugins', () => { backend: adaptive.inMemoryBackend(), }, adaptive_hints: adaptive.adaptiveHintsConfig(), + agent_context: adaptive.agentContextConfig(), }), plugin.ComponentSpec(pluginKind, { priority: 17, @@ -160,6 +161,14 @@ describe('core plugins', () => { plugin.deregister(pluginKind); } }); + + it('builds an agent context config with defaults', () => { + assert.deepEqual(adaptive.agentContextConfig(), { + priority: 100, + break_chain: false, + inject_body_path: 'nvext.agent_context', + }); + }); }); describe('adaptive helpers', () => { diff --git a/crates/node/tests/typed_tests.mjs b/crates/node/tests/typed_tests.mjs index 2e2f9b65..fcaa0913 100644 --- a/crates/node/tests/typed_tests.mjs +++ b/crates/node/tests/typed_tests.mjs @@ -209,6 +209,7 @@ describe('adaptive typed helpers', () => { learners: ['latency_sensitivity'], }); config.adaptive_hints = adaptive.adaptiveHintsConfig(); + config.agent_context = adaptive.agentContextConfig(); config.tool_parallelism = adaptive.toolParallelismConfig(); config.acg = adaptive.acgConfig(); @@ -232,6 +233,7 @@ describe('adaptive typed helpers', () => { learners: ['latency_sensitivity'], }), adaptive_hints: adaptive.adaptiveHintsConfig(), + agent_context: adaptive.agentContextConfig(), tool_parallelism: adaptive.toolParallelismConfig(), acg: adaptive.acgConfig({ provider: 'openai', diff --git a/crates/wasm/tests-js/adaptive_tests.mjs b/crates/wasm/tests-js/adaptive_tests.mjs index 7f7fdbd4..50b477c8 100644 --- a/crates/wasm/tests-js/adaptive_tests.mjs +++ b/crates/wasm/tests-js/adaptive_tests.mjs @@ -11,6 +11,11 @@ test('WebAssembly adaptive wrappers expose default config and helper defaults', version: 1, }); assert.equal(adaptive.adaptiveHintsConfig().priority, 100); + assert.deepEqual(adaptive.agentContextConfig(), { + priority: 100, + break_chain: false, + inject_body_path: 'nvext.agent_context', + }); assert.equal(adaptive.toolParallelismConfig().mode, 'observe_only'); assert.deepEqual(adaptive.acgConfig(), { provider: 'passthrough', diff --git a/crates/wasm/wrappers/esm/adaptive.d.ts b/crates/wasm/wrappers/esm/adaptive.d.ts index 96cadb5c..d2e39aac 100644 --- a/crates/wasm/wrappers/esm/adaptive.d.ts +++ b/crates/wasm/wrappers/esm/adaptive.d.ts @@ -32,6 +32,13 @@ export interface AdaptiveHintsConfig { inject_body_path?: string; } +/** Built-in agent context propagation settings. */ +export interface AgentContextConfig { + priority?: number; + break_chain?: boolean; + inject_body_path?: string; +} + /** Built-in adaptive tool scheduling settings. */ export interface ToolParallelismConfig { priority?: number; @@ -60,6 +67,7 @@ export interface Config { state?: StateConfig; telemetry?: TelemetryConfig; adaptive_hints?: AdaptiveHintsConfig; + agent_context?: AgentContextConfig; tool_parallelism?: ToolParallelismConfig; acg?: AcgConfig; policy?: ConfigPolicy; @@ -221,6 +229,18 @@ export declare function telemetryConfig(config?: TelemetryConfig): TelemetryConf * of the chain, and writes hints to `nvext.agent_hints`. */ export declare function adaptiveHintsConfig(config?: AdaptiveHintsConfig): AdaptiveHintsConfig; +/** + * Create agent-context propagation settings with defaults applied. + * + * Merges caller-supplied overrides onto the default config used by the + * agent-context request intercept. + * + * @param config - Partial agent-context settings to override. + * @returns A normalized agent-context config object. + * @remarks By default the injector runs at priority `100`, preserves the rest + * of the chain, and writes context to `nvext.agent_context`. + */ +export declare function agentContextConfig(config?: AgentContextConfig): AgentContextConfig; /** * Create adaptive tool-parallelism settings with defaults applied. * diff --git a/crates/wasm/wrappers/esm/adaptive.js b/crates/wasm/wrappers/esm/adaptive.js index 0b74de50..2510964d 100644 --- a/crates/wasm/wrappers/esm/adaptive.js +++ b/crates/wasm/wrappers/esm/adaptive.js @@ -104,6 +104,26 @@ export function adaptiveHintsConfig(config = {}) { }; } +/** + * Create agent-context propagation settings with defaults applied. + * + * Merges caller-supplied overrides onto the default config used by the + * agent-context request intercept. + * + * @param {object} [config={}] - Partial agent-context settings to override. + * @returns {object} A normalized agent-context config object. + * @remarks By default the injector runs at priority `100`, preserves the rest + * of the chain, and writes context to `nvext.agent_context`. + */ +export function agentContextConfig(config = {}) { + return { + priority: 100, + break_chain: false, + inject_body_path: 'nvext.agent_context', + ...config, + }; +} + /** * Create adaptive tool-parallelism settings with defaults applied. * diff --git a/crates/wasm/wrappers/nodejs/adaptive.js b/crates/wasm/wrappers/nodejs/adaptive.js index 1f22aa3c..0a7f4dae 100644 --- a/crates/wasm/wrappers/nodejs/adaptive.js +++ b/crates/wasm/wrappers/nodejs/adaptive.js @@ -101,6 +101,26 @@ function adaptiveHintsConfig(config = {}) { }; } +/** + * Create agent-context propagation settings with defaults applied. + * + * Merges caller-supplied overrides onto the default config used by the + * agent-context request intercept. + * + * @param {object} [config={}] - Partial agent-context settings to override. + * @returns {object} A normalized agent-context config object. + * @remarks By default the injector runs at priority `100`, preserves the rest + * of the chain, and writes context to `nvext.agent_context`. + */ +function agentContextConfig(config = {}) { + return { + priority: 100, + break_chain: false, + inject_body_path: 'nvext.agent_context', + ...config, + }; +} + /** * Create adaptive tool-parallelism settings with defaults applied. * @@ -202,6 +222,7 @@ exports.inMemoryBackend = inMemoryBackend; exports.redisBackend = redisBackend; exports.telemetryConfig = telemetryConfig; exports.adaptiveHintsConfig = adaptiveHintsConfig; +exports.agentContextConfig = agentContextConfig; exports.toolParallelismConfig = toolParallelismConfig; exports.acgConfig = acgConfig; exports.ComponentSpec = ComponentSpec; diff --git a/docs/adaptive-plugin/about.mdx b/docs/adaptive-plugin/about.mdx index b053c8dc..19be767d 100644 --- a/docs/adaptive-plugin/about.mdx +++ b/docs/adaptive-plugin/about.mdx @@ -51,6 +51,8 @@ If instrumentation is not in place yet, start with cache planning accomplishes. - [Adaptive Hints](/adaptive-plugin/adaptive-hints) explains request hint injection and how downstream model paths can consume the hints. +- [Agent Context](/adaptive-plugin/agent-context) explains request context propagation from + scope metadata to provider-compatible request bodies. State, telemetry, tool parallelism, and policy are whole-plugin configuration areas. They are documented on [Adaptive Configuration](/adaptive-plugin/configuration) rather diff --git a/docs/adaptive-plugin/agent-context.mdx b/docs/adaptive-plugin/agent-context.mdx new file mode 100644 index 00000000..f3f0a58c --- /dev/null +++ b/docs/adaptive-plugin/agent-context.mdx @@ -0,0 +1,60 @@ +--- +title: "Agent Context" +description: "Propagate scope-local agent identity into managed LLM requests." +position: 5 +--- +{/* SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +SPDX-License-Identifier: Apache-2.0 */} + + +Use Agent Context when managed LLM requests need the active agent/session +identity copied from NeMo Relay scope metadata into a provider-compatible +request body. + +Agent Context registers as an LLM request intercept. Producers attach the +canonical context object to scope metadata at +`/nemo_relay/agent_context`; the intercept copies the nearest active value to +the configured request body path when the request does not already contain a +value there. + +The canonical context object uses these fields: + +| Field | Notes | +|---|---| +| `session_type_id` | Agent or harness kind, such as `codex` or `cursor`. | +| `session_id` | Stable root session identifier. | +| `trajectory_id` | Active turn or child trajectory identifier. | +| `parent_trajectory_id` | Optional parent trajectory for subagents. | + +## `plugins.toml` Example + +```toml +version = 1 + +[[components]] +kind = "adaptive" +enabled = true + +[components.config] +version = 1 + +[components.config.agent_context] +priority = 100 +break_chain = false +inject_body_path = "nvext.agent_context" +``` + +This configuration copies scope-local agent context into outgoing managed LLM +requests while allowing later request intercepts to continue running. + +## Fields + +| Field | Default | Notes | +|---|---|---| +| `priority` | `100` | Request intercept priority. Lower values run earlier. | +| `break_chain` | `false` | Whether this intercept stops later request intercepts. | +| `inject_body_path` | `nvext.agent_context` | JSON body path for request-body context injection. | + +Enable this only on provider paths that can consume or safely ignore the +configured body field. For Dynamo OpenAI-compatible endpoints, use +`nvext.agent_context`. diff --git a/docs/adaptive-plugin/configuration.mdx b/docs/adaptive-plugin/configuration.mdx index 55621aef..18cd5794 100644 --- a/docs/adaptive-plugin/configuration.mdx +++ b/docs/adaptive-plugin/configuration.mdx @@ -30,13 +30,15 @@ The top-level adaptive object contains: | `state` | Adaptive state backend. | | `telemetry` | Adaptive subscriber and learner settings. | | `adaptive_hints` | Request hint-injection behavior. | +| `agent_context` | Scope-local agent context propagation behavior. | | `tool_parallelism` | Tool scheduling observation or scheduling behavior. | | `acg` | Adaptive Cache Governor prompt-cache planning. | | `policy` | Adaptive-local handling for unknown fields and unsupported values. | -The requested area pages cover [Adaptive Cache Governor (ACG)](/adaptive-plugin/acg) and -[Adaptive Hints](/adaptive-plugin/adaptive-hints). State, telemetry, tool parallelism, and -policy remain whole-plugin settings: +The requested area pages cover [Adaptive Cache Governor (ACG)](/adaptive-plugin/acg), +[Adaptive Hints](/adaptive-plugin/adaptive-hints), and +[Agent Context](/adaptive-plugin/agent-context). State, telemetry, tool +parallelism, and policy remain whole-plugin settings: - Use `state.backend.kind = "in_memory"` for local experiments. - Use Redis state when learned state must survive restarts or be shared across @@ -76,6 +78,11 @@ break_chain = false inject_header = true inject_body_path = "nvext.agent_hints" +[components.config.agent_context] +priority = 100 +break_chain = false +inject_body_path = "nvext.agent_context" + [components.config.acg] provider = "passthrough" observation_window = 100 @@ -116,6 +123,9 @@ adaptive_config = nemo_relay.adaptive.AdaptiveConfig( adaptive_hints=nemo_relay.adaptive.AdaptiveHintsConfig( inject_body_path="nvext.agent_hints", ), + agent_context=nemo_relay.adaptive.AgentContextConfig( + inject_body_path="nvext.agent_context", + ), acg=nemo_relay.adaptive.AcgConfig(provider="passthrough"), ) @@ -147,6 +157,9 @@ adaptiveConfig.tool_parallelism = adaptive.toolParallelismConfig({ mode: "observ adaptiveConfig.adaptive_hints = adaptive.adaptiveHintsConfig({ inject_body_path: "nvext.agent_hints", }); +adaptiveConfig.agent_context = adaptive.agentContextConfig({ + inject_body_path: "nvext.agent_context", +}); adaptiveConfig.acg = adaptive.acgConfig({ provider: "passthrough" }); const pluginConfig = plugin.defaultConfig(); @@ -172,6 +185,7 @@ use nemo_relay_adaptive::{ TelemetryComponentConfig, ToolParallelismComponentConfig, AdaptiveHintsComponentConfig, + AgentContextComponentConfig, AcgComponentConfig, }; @@ -189,6 +203,10 @@ adaptive.adaptive_hints = Some(AdaptiveHintsComponentConfig { inject_body_path: "nvext.agent_hints".into(), ..AdaptiveHintsComponentConfig::default() }); +adaptive.agent_context = Some(AgentContextComponentConfig { + inject_body_path: "nvext.agent_context".into(), + ..AgentContextComponentConfig::default() +}); adaptive.acg = Some(AcgComponentConfig { provider: "passthrough".into(), ..AcgComponentConfig::default() @@ -229,6 +247,9 @@ adaptive_config = nemo_relay.adaptive.AdaptiveConfig( adaptive_hints=nemo_relay.adaptive.AdaptiveHintsConfig( inject_body_path="nvext.agent_hints", ), + agent_context=nemo_relay.adaptive.AgentContextConfig( + inject_body_path="nvext.agent_context", + ), acg=nemo_relay.adaptive.AcgConfig(provider="passthrough"), ) @@ -252,7 +273,8 @@ activating adaptive behavior from Node.js. ```rust use nemo_relay_adaptive::{ AcgComponentConfig, AdaptiveConfig, AdaptiveHintsComponentConfig, AdaptiveRuntime, - BackendSpec, StateConfig, TelemetryComponentConfig, ToolParallelismComponentConfig, + AgentContextComponentConfig, BackendSpec, StateConfig, TelemetryComponentConfig, + ToolParallelismComponentConfig, }; let mut adaptive = AdaptiveConfig::default(); @@ -269,6 +291,10 @@ adaptive.adaptive_hints = Some(AdaptiveHintsComponentConfig { inject_body_path: "nvext.agent_hints".into(), ..AdaptiveHintsComponentConfig::default() }); +adaptive.agent_context = Some(AgentContextComponentConfig { + inject_body_path: "nvext.agent_context".into(), + ..AgentContextComponentConfig::default() +}); adaptive.acg = Some(AcgComponentConfig { provider: "passthrough".into(), ..AcgComponentConfig::default() diff --git a/docs/build-plugins/plugin-configuration-files.mdx b/docs/build-plugins/plugin-configuration-files.mdx index 803cb7e0..3be80553 100644 --- a/docs/build-plugins/plugin-configuration-files.mdx +++ b/docs/build-plugins/plugin-configuration-files.mdx @@ -307,3 +307,4 @@ Use the component guides for field-level configuration: - [Adaptive Configuration](/adaptive-plugin/configuration) - [Adaptive Cache Governor (ACG)](/adaptive-plugin/acg) - [Adaptive Hints](/adaptive-plugin/adaptive-hints) +- [Agent Context](/adaptive-plugin/agent-context) diff --git a/go/nemo_relay/adaptive.go b/go/nemo_relay/adaptive.go index eb7ce81f..65f5e6f6 100644 --- a/go/nemo_relay/adaptive.go +++ b/go/nemo_relay/adaptive.go @@ -15,6 +15,7 @@ type AdaptiveConfig struct { State *AdaptiveStateConfig `json:"state,omitempty"` Telemetry *TelemetryConfig `json:"telemetry,omitempty"` AdaptiveHints *AdaptiveHintsConfig `json:"adaptive_hints,omitempty"` + AgentContext *AgentContextConfig `json:"agent_context,omitempty"` ToolParallelism *ToolParallelismConfig `json:"tool_parallelism,omitempty"` Acg *AcgConfig `json:"acg,omitempty"` Policy *ConfigPolicy `json:"policy,omitempty"` @@ -45,6 +46,13 @@ type AdaptiveHintsConfig struct { InjectBodyPath string `json:"inject_body_path,omitempty"` } +// AgentContextConfig configures built-in agent context propagation. +type AgentContextConfig struct { + Priority int32 `json:"priority,omitempty"` + BreakChain bool `json:"break_chain,omitempty"` + InjectBodyPath string `json:"inject_body_path,omitempty"` +} + // ToolParallelismConfig configures built-in adaptive tool scheduling. type ToolParallelismConfig struct { Priority int32 `json:"priority,omitempty"` @@ -110,6 +118,14 @@ func NewAdaptiveHintsConfig() AdaptiveHintsConfig { } } +// NewAgentContextConfig returns default agent context propagation settings. +func NewAgentContextConfig() AgentContextConfig { + return AgentContextConfig{ + Priority: 100, + InjectBodyPath: "nvext.agent_context", + } +} + // NewToolParallelismConfig returns default adaptive tool scheduling settings. func NewToolParallelismConfig() ToolParallelismConfig { return ToolParallelismConfig{ diff --git a/go/nemo_relay/adaptive/adaptive.go b/go/nemo_relay/adaptive/adaptive.go index 49306422..8dae8f53 100644 --- a/go/nemo_relay/adaptive/adaptive.go +++ b/go/nemo_relay/adaptive/adaptive.go @@ -43,6 +43,9 @@ type TelemetryConfig = nemo_relay.TelemetryConfig // AdaptiveHintsConfig configures built-in adaptive hint injection. type AdaptiveHintsConfig = nemo_relay.AdaptiveHintsConfig +// AgentContextConfig configures built-in agent context propagation. +type AgentContextConfig = nemo_relay.AgentContextConfig + // ToolParallelismConfig configures built-in adaptive tool scheduling. type ToolParallelismConfig = nemo_relay.ToolParallelismConfig @@ -98,6 +101,11 @@ func NewAdaptiveHintsConfig() AdaptiveHintsConfig { return nemo_relay.NewAdaptiveHintsConfig() } +// NewAgentContextConfig returns default agent context propagation settings. +func NewAgentContextConfig() AgentContextConfig { + return nemo_relay.NewAgentContextConfig() +} + // NewToolParallelismConfig returns default adaptive tool scheduling settings. func NewToolParallelismConfig() ToolParallelismConfig { return nemo_relay.NewToolParallelismConfig() diff --git a/go/nemo_relay/adaptive/optimizer_test.go b/go/nemo_relay/adaptive/optimizer_test.go index 7d18e321..22a94d04 100644 --- a/go/nemo_relay/adaptive/optimizer_test.go +++ b/go/nemo_relay/adaptive/optimizer_test.go @@ -20,6 +20,8 @@ func TestConfigBuilders(t *testing.T) { config.Telemetry = &telemetry adaptiveHints := NewAdaptiveHintsConfig() config.AdaptiveHints = &adaptiveHints + agentContext := NewAgentContextConfig() + config.AgentContext = &agentContext toolParallelism := NewToolParallelismConfig() config.ToolParallelism = &toolParallelism acg := NewAcgConfig() diff --git a/go/nemo_relay/adaptive_test.go b/go/nemo_relay/adaptive_test.go index fba86b6f..385a5bbf 100644 --- a/go/nemo_relay/adaptive_test.go +++ b/go/nemo_relay/adaptive_test.go @@ -19,7 +19,7 @@ func TestNewAdaptiveConfigDefaults(t *testing.T) { if config.Version != 1 { t.Fatalf("expected version 1, got %d", config.Version) } - if config.Telemetry != nil || config.AdaptiveHints != nil || config.ToolParallelism != nil || config.Acg != nil { + if config.Telemetry != nil || config.AdaptiveHints != nil || config.AgentContext != nil || config.ToolParallelism != nil || config.Acg != nil { t.Fatal("expected adaptive feature sections to default to nil") } } @@ -34,6 +34,9 @@ func TestAdaptiveHelperConstructors(t *testing.T) { hints := NewAdaptiveHintsConfig() assertAdaptiveHintsDefaults(t, hints) + agentContext := NewAgentContextConfig() + assertAgentContextDefaults(t, agentContext) + parallelism := NewToolParallelismConfig() assertToolParallelismDefaults(t, parallelism) @@ -80,6 +83,13 @@ func assertAdaptiveHintsDefaults(t *testing.T, hints AdaptiveHintsConfig) { } } +func assertAgentContextDefaults(t *testing.T, config AgentContextConfig) { + t.Helper() + if config.Priority != 100 || config.InjectBodyPath != "nvext.agent_context" { + t.Fatalf("unexpected agent context defaults: %#v", config) + } +} + func assertToolParallelismDefaults(t *testing.T, parallelism ToolParallelismConfig) { t.Helper() if parallelism.Priority != 100 || parallelism.Mode != "observe_only" { diff --git a/python/nemo_relay/adaptive.py b/python/nemo_relay/adaptive.py index e4f37545..728a64b7 100644 --- a/python/nemo_relay/adaptive.py +++ b/python/nemo_relay/adaptive.py @@ -178,6 +178,32 @@ def to_dict(self) -> JsonObject: ) +@dataclass(slots=True) +class AgentContextConfig: + """Built-in agent context propagation settings. + + Args: + priority: Intercept priority. Lower values run first. + break_chain: Whether to stop later request intercepts after this one. + inject_body_path: JSON body path used when injecting request-body + agent context. + """ + + priority: int = 100 + break_chain: bool = False + inject_body_path: str = "nvext.agent_context" + + def to_dict(self) -> JsonObject: + """Serialize this agent-context config to the canonical JSON object shape.""" + return _normalize_object( + { + "priority": self.priority, + "break_chain": self.break_chain, + "inject_body_path": self.inject_body_path, + } + ) + + @dataclass(slots=True) class ToolParallelismConfig: """Built-in adaptive tool scheduling settings. @@ -261,6 +287,7 @@ class AdaptiveConfig: state: Adaptive state backend configuration. telemetry: Built-in adaptive telemetry settings. adaptive_hints: Built-in LLM hint-injection settings. + agent_context: Built-in agent context propagation settings. tool_parallelism: Built-in tool scheduling settings. acg: Adaptive Cache Governor settings. policy: Unsupported-config policy applied within the adaptive config. @@ -275,6 +302,7 @@ class AdaptiveConfig: state: StateConfig | None = None telemetry: TelemetryConfig | None = None adaptive_hints: AdaptiveHintsConfig | None = None + agent_context: AgentContextConfig | None = None tool_parallelism: ToolParallelismConfig | None = None acg: AcgConfig | None = None policy: ConfigPolicy = field(default_factory=ConfigPolicy) @@ -287,6 +315,7 @@ def to_dict(self) -> JsonObject: "state": _normalize(self.state), "telemetry": _normalize(self.telemetry), "adaptive_hints": _normalize(self.adaptive_hints), + "agent_context": _normalize(self.agent_context), "tool_parallelism": _normalize(self.tool_parallelism), "acg": _normalize(self.acg), "policy": self.policy.to_dict(), @@ -379,6 +408,7 @@ def set_latency_sensitivity(level: int) -> None: "AcgStabilityThresholds", "AdaptiveConfig", "AdaptiveHintsConfig", + "AgentContextConfig", "ADAPTIVE_PLUGIN_KIND", "BackendSpec", "ConfigDiagnostic", diff --git a/python/nemo_relay/adaptive.pyi b/python/nemo_relay/adaptive.pyi index 892f7260..08ac2708 100644 --- a/python/nemo_relay/adaptive.pyi +++ b/python/nemo_relay/adaptive.pyi @@ -125,6 +125,25 @@ class AdaptiveHintsConfig: """Serialize this adaptive-hints config to the canonical JSON object shape.""" ... +@dataclass(slots=True) +class AgentContextConfig: + """Built-in agent context propagation settings. + + Args: + priority: Intercept priority. Lower values run first. + break_chain: Whether to stop later request intercepts after this one. + inject_body_path: JSON body path used when injecting request-body + agent context. + """ + + priority: int = ... + break_chain: bool = ... + inject_body_path: str = ... + + def to_dict(self) -> JsonObject: + """Serialize this agent-context config to the canonical JSON object shape.""" + ... + @dataclass(slots=True) class ToolParallelismConfig: """Built-in adaptive tool scheduling settings. @@ -191,6 +210,7 @@ class AdaptiveConfig: state: Adaptive state backend configuration. telemetry: Built-in adaptive telemetry subscriber settings. adaptive_hints: Built-in adaptive request-hints configuration. + agent_context: Built-in agent context propagation settings. tool_parallelism: Built-in adaptive tool-scheduling configuration. acg: Adaptive Cache Governor configuration. policy: Policy for unsupported adaptive configuration. @@ -201,6 +221,7 @@ class AdaptiveConfig: state: StateConfig | None = ... telemetry: TelemetryConfig | None = ... adaptive_hints: AdaptiveHintsConfig | None = ... + agent_context: AgentContextConfig | None = ... tool_parallelism: ToolParallelismConfig | None = ... acg: AcgConfig | None = ... policy: ConfigPolicy = ... diff --git a/python/tests/test_adaptive.py b/python/tests/test_adaptive.py index f3fb9254..9fc3398a 100644 --- a/python/tests/test_adaptive.py +++ b/python/tests/test_adaptive.py @@ -17,6 +17,7 @@ AcgStabilityThresholds, AdaptiveConfig, AdaptiveHintsConfig, + AgentContextConfig, BackendSpec, ComponentSpec, StateConfig, @@ -59,6 +60,11 @@ class NestedHint: def test_section_helpers(self): assert TelemetryConfig(learners=["latency_sensitivity"]).to_dict() == {"learners": ["latency_sensitivity"]} assert AdaptiveHintsConfig().to_dict()["priority"] == 100 + assert AgentContextConfig().to_dict() == { + "priority": 100, + "break_chain": False, + "inject_body_path": "nvext.agent_context", + } assert ToolParallelismConfig().to_dict()["mode"] == "observe_only" def test_adaptive_component_wraps_as_plugin_component(self): diff --git a/python/tests/test_adaptive_config.py b/python/tests/test_adaptive_config.py index eec629c4..df3875b4 100644 --- a/python/tests/test_adaptive_config.py +++ b/python/tests/test_adaptive_config.py @@ -12,6 +12,7 @@ AcgConfig, AcgStabilityThresholds, AdaptiveConfig, + AgentContextConfig, BackendSpec, ComponentSpec, ConfigPolicy, @@ -125,6 +126,7 @@ def test_in_memory_state_produces_clean_report(self): AdaptiveConfig( state=StateConfig(backend=BackendSpec.in_memory()), telemetry=TelemetryConfig(), + agent_context=AgentContextConfig(), ) ) ]