diff --git a/crates/temper-authz/src/engine.rs b/crates/temper-authz/src/engine.rs index b82d63a2..8b0f09e4 100644 --- a/crates/temper-authz/src/engine.rs +++ b/crates/temper-authz/src/engine.rs @@ -225,18 +225,38 @@ impl AuthzEngine { // include tenant-defined custom attrs that can't be predicted by a // static schema. Policy-level type checking suffices. - let entities = match Entity::new(principal_uid.clone(), principal_attrs, HashSet::new()) { - Ok(entity) => match Entities::from_entities([entity], None) { - Ok(e) => e, + let principal_entity = + match Entity::new(principal_uid.clone(), principal_attrs, HashSet::new()) { + Ok(entity) => entity, Err(e) => { return AuthzDecision::Deny(AuthzDenial::EngineError(format!( - "failed to build entity store: {e}" + "failed to build principal entity: {e}" ))); } - }, + }; + + // Build resource entity with attributes so Cedar can resolve + // `resource.Field` references (e.g. `resource.AgentId == principal.id`). + let mut resource_entity_attrs: HashMap = + HashMap::new(); + for (key, value) in resource_attrs { + insert_json_as_cedar(&mut resource_entity_attrs, key.clone(), value); + } + let resource_entity = + match Entity::new(resource_uid.clone(), resource_entity_attrs, HashSet::new()) { + Ok(entity) => entity, + Err(e) => { + return AuthzDecision::Deny(AuthzDenial::EngineError(format!( + "failed to build resource entity: {e}" + ))); + } + }; + + let entities = match Entities::from_entities([principal_entity, resource_entity], None) { + Ok(e) => e, Err(e) => { return AuthzDecision::Deny(AuthzDenial::EngineError(format!( - "failed to build principal entity: {e}" + "failed to build entity store: {e}" ))); } }; diff --git a/crates/temper-server/src/adapters/claude_code.rs b/crates/temper-server/src/adapters/claude_code.rs index 8090ee12..cd69e526 100644 --- a/crates/temper-server/src/adapters/claude_code.rs +++ b/crates/temper-server/src/adapters/claude_code.rs @@ -53,7 +53,7 @@ async fn run_claude( ctx: &AdapterContext, resume: Option<&str>, ) -> Result { - let started = Instant::now(); + let started = Instant::now(); // determinism-ok: wall-clock timing for adapter duration let command_name = ctx .integration_config @@ -79,12 +79,6 @@ async fn run_claude( command.arg("--add-dir").arg(skills_path); } - if let Some(extra_args) = ctx.integration_config.get("args") { - for arg in extra_args.split_whitespace() { - command.arg(arg); - } - } - if let Some(workdir) = ctx.integration_config.get("workdir") && !workdir.trim().is_empty() { diff --git a/crates/temper-server/src/adapters/codex.rs b/crates/temper-server/src/adapters/codex.rs index badaae17..d8a5ad10 100644 --- a/crates/temper-server/src/adapters/codex.rs +++ b/crates/temper-server/src/adapters/codex.rs @@ -18,7 +18,7 @@ impl AgentAdapter for CodexAdapter { } async fn execute(&self, ctx: AdapterContext) -> Result { - let started = Instant::now(); + let started = Instant::now(); // determinism-ok: wall-clock timing for adapter duration let command_name = ctx .integration_config @@ -62,12 +62,6 @@ impl AgentAdapter for CodexAdapter { } } - if let Some(extra_args) = ctx.integration_config.get("args") { - for arg in extra_args.split_whitespace() { - command.arg(arg); - } - } - let output = command.output().await.map_err(|e| { AdapterError::Invocation(format!("failed to spawn '{command_name}': {e}")) })?; diff --git a/crates/temper-server/src/adapters/http_webhook.rs b/crates/temper-server/src/adapters/http_webhook.rs index 73e55945..6f805e19 100644 --- a/crates/temper-server/src/adapters/http_webhook.rs +++ b/crates/temper-server/src/adapters/http_webhook.rs @@ -1,11 +1,34 @@ //! Generic HTTP adapter. +use std::net::IpAddr; // determinism-ok: IP parsing for SSRF prevention, no network I/O use std::time::Instant; use async_trait::async_trait; use super::{AdapterContext, AdapterError, AdapterResult, AgentAdapter}; +/// Check whether a URL targets a private/loopback address (SSRF prevention). +fn is_private_url(url: &str) -> bool { + let domain = temper_wasm::authorized_host::extract_domain(url); + if let Ok(ip) = domain.parse::() { + return ip.is_loopback() || is_private_ip(ip); + } + matches!(domain, "localhost" | "metadata.google.internal") +} + +/// Returns true for RFC-1918, link-local, and cloud metadata IPs. +fn is_private_ip(ip: IpAddr) -> bool { + match ip { + IpAddr::V4(v4) => { + v4.is_loopback() // 127.0.0.0/8 + || v4.is_private() // 10/8, 172.16/12, 192.168/16 + || v4.is_link_local() // 169.254/16 (AWS metadata) + || v4.octets()[0] == 0 // 0.0.0.0/8 + } + IpAddr::V6(v6) => v6.is_loopback(), // ::1 + } +} + /// Adapter implementation for generic HTTP callback execution. #[derive(Debug, Default)] pub struct HttpWebhookAdapter; @@ -17,7 +40,7 @@ impl AgentAdapter for HttpWebhookAdapter { } async fn execute(&self, ctx: AdapterContext) -> Result { - let started = Instant::now(); + let started = Instant::now(); // determinism-ok: wall-clock timing for adapter duration let url = ctx .integration_config @@ -28,6 +51,16 @@ impl AgentAdapter for HttpWebhookAdapter { AdapterError::Invocation("missing adapter config key 'url'".to_string()) })?; + let allow_private = ctx + .integration_config + .get("allow_private_urls") + .is_some_and(|v| v == "true"); + if !allow_private && is_private_url(&url) { + return Err(AdapterError::Invocation(format!( + "SSRF blocked: adapter URL targets private/loopback address: {url}" + ))); + } + let method = ctx .integration_config .get("method") diff --git a/crates/temper-server/src/adapters/openclaw.rs b/crates/temper-server/src/adapters/openclaw.rs index 2c1ddecf..713e09de 100644 --- a/crates/temper-server/src/adapters/openclaw.rs +++ b/crates/temper-server/src/adapters/openclaw.rs @@ -21,7 +21,7 @@ impl AgentAdapter for OpenClawAdapter { } async fn execute(&self, ctx: AdapterContext) -> Result { - let started = Instant::now(); + let started = Instant::now(); // determinism-ok: wall-clock timing for adapter duration let gateway_url = ctx .integration_config @@ -82,6 +82,18 @@ impl AgentAdapter for OpenClawAdapter { .await .map_err(|e| AdapterError::Execution(format!("openclaw send failed: {e}")))?; + // Signal the gateway that we are waiting for the agent response. + let wait_msg = serde_json::json!({ + "type": "agent.wait", + "id": request_id, + }); + socket + .send(Message::Text(wait_msg.to_string().into())) + .await + .map_err(|e| { + AdapterError::Execution(format!("openclaw agent.wait send failed: {e}")) + })?; + let mut last_payload = serde_json::json!({}); let mut terminal_seen = false; diff --git a/crates/temper-server/src/state/dispatch/adapter.rs b/crates/temper-server/src/state/dispatch/adapter.rs index e6fa3684..cebb880e 100644 --- a/crates/temper-server/src/state/dispatch/adapter.rs +++ b/crates/temper-server/src/state/dispatch/adapter.rs @@ -156,7 +156,18 @@ impl crate::state::ServerState { let secrets = self .secrets_vault .as_ref() - .map(|vault| vault.get_tenant_secrets(&tenant)) + .map(|vault| { + let all = vault.get_tenant_secrets(&tenant); + // Only expose secrets referenced via {secret:KEY} in integration config. + let referenced: std::collections::BTreeMap = all + .into_iter() + .filter(|(key, _)| { + let pattern = format!("{{secret:{key}}}"); + integration.config.values().any(|v| v.contains(&pattern)) + }) + .collect(); + referenced + }) .unwrap_or_default(); let adapter_ctx = AdapterContext { diff --git a/crates/temper-server/tests/adapter_dispatch.rs b/crates/temper-server/tests/adapter_dispatch.rs index b1b03aca..34fb36a4 100644 --- a/crates/temper-server/tests/adapter_dispatch.rs +++ b/crates/temper-server/tests/adapter_dispatch.rs @@ -90,6 +90,7 @@ on_success = "AdapterSucceeded" on_failure = "AdapterFailed" url = "{url}/execute" method = "POST" +allow_private_urls = "true" "#, url = mock_server.uri() ); @@ -163,6 +164,7 @@ on_success = "AdapterSucceeded" on_failure = "AdapterFailed" url = "{url}/execute" method = "POST" +allow_private_urls = "true" "#, url = mock_server.uri() ); @@ -251,6 +253,7 @@ on_success = "AdapterSucceeded" on_failure = "AdapterFailed" url = "{url}/execute" method = "POST" +allow_private_urls = "true" "#, url = mock_server.uri() ); diff --git a/crates/temper-server/tests/orchestration_e2e.rs b/crates/temper-server/tests/orchestration_e2e.rs new file mode 100644 index 00000000..14c0753f --- /dev/null +++ b/crates/temper-server/tests/orchestration_e2e.rs @@ -0,0 +1,659 @@ +//! End-to-end integration test for the agent-orchestration OS app. +//! +//! Exercises the full HeartbeatRun lifecycle using the actual IOA specs, +//! CSDL, and Cedar policies from the OS app bundle with an HTTP adapter +//! backed by wiremock. + +use temper_runtime::ActorSystem; +use temper_runtime::tenant::TenantId; +use temper_server::ServerState; +use temper_server::registry::SpecRegistry; +use temper_server::request_context::AgentContext; +use temper_server::state::DispatchExtOptions; +use temper_spec::csdl::parse_csdl; +use wiremock::matchers::{method, path}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +// Embed the actual OS app specs (same as the bundled app). +const HEARTBEAT_IOA: &str = + include_str!("../../../os-apps/agent-orchestration/specs/heartbeat_run.ioa.toml"); +const ORG_IOA: &str = + include_str!("../../../os-apps/agent-orchestration/specs/organization.ioa.toml"); +const BUDGET_IOA: &str = + include_str!("../../../os-apps/agent-orchestration/specs/budget_ledger.ioa.toml"); +const CSDL_XML: &str = include_str!("../../../os-apps/agent-orchestration/specs/model.csdl.xml"); + +fn build_orchestration_state() -> ServerState { + let mut registry = SpecRegistry::new(); + let csdl = parse_csdl(CSDL_XML).expect("orchestration CSDL should parse"); + registry.register_tenant( + "default", + csdl, + CSDL_XML.to_string(), + &[ + ("HeartbeatRun", HEARTBEAT_IOA), + ("Organization", ORG_IOA), + ("BudgetLedger", BUDGET_IOA), + ], + ); + + let system = ActorSystem::new("orchestration-e2e"); + ServerState::from_registry(system, registry) +} + +fn supervisor_ctx() -> AgentContext { + AgentContext { + agent_id: Some("haku".to_string()), + session_id: Some("test-session".to_string()), + agent_type: Some("supervisor".to_string()), + } +} + +fn worker_ctx(agent_id: &str) -> AgentContext { + AgentContext { + agent_id: Some(agent_id.to_string()), + session_id: Some("worker-session".to_string()), + agent_type: Some("worker".to_string()), + } +} + +/// Helper: dispatch an action and assert success. +async fn dispatch_ok( + state: &ServerState, + entity_type: &str, + entity_id: &str, + action: &str, + params: serde_json::Value, + agent_ctx: &AgentContext, + await_integration: bool, +) -> temper_server::entity_actor::EntityResponse { + let response = state + .dispatch_tenant_action_ext( + &TenantId::default(), + entity_type, + entity_id, + action, + params, + DispatchExtOptions { + agent_ctx, + await_integration, + }, + ) + .await + .unwrap_or_else(|e| panic!("{entity_type}({entity_id}).{action} failed: {e}")); + assert!( + response.success, + "{entity_type}({entity_id}).{action} returned success=false: {:?}", + response.error + ); + response +} + +// ─────────────────────────────────────────────────────────────────── +// Test 1: Full HeartbeatRun lifecycle with HTTP adapter +// Schedule → ApproveBudget → CheckIn → StartExecution (adapter fires) +// → RecordResult (via adapter callback) → Complete +// ─────────────────────────────────────────────────────────────────── +#[tokio::test(flavor = "multi_thread")] +async fn heartbeat_run_full_lifecycle_with_http_adapter() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/agent/execute")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "callback_params": { + "result": "task completed successfully" + } + }))) + .mount(&mock_server) + .await; + + // Patch the HeartbeatRun spec to use HTTP adapter with wiremock URL. + let patched_heartbeat = HEARTBEAT_IOA.replace( + r#"adapter = "{field:adapter_type}""#, + &format!( + "adapter = \"http\"\nurl = \"{}/agent/execute\"\nmethod = \"POST\"\nallow_private_urls = \"true\"", + mock_server.uri() + ), + ); + + let mut registry = SpecRegistry::new(); + let csdl = parse_csdl(CSDL_XML).expect("CSDL should parse"); + registry.register_tenant( + "default", + csdl, + CSDL_XML.to_string(), + &[ + ("HeartbeatRun", &patched_heartbeat), + ("Organization", ORG_IOA), + ("BudgetLedger", BUDGET_IOA), + ], + ); + let system = ActorSystem::new("heartbeat-e2e"); + let state = ServerState::from_registry(system, registry); + + let sup = supervisor_ctx(); + let agent_id = "worker-claude-1"; + let worker = worker_ctx(agent_id); + + // Step 1: Schedule the run (supervisor). + let resp = dispatch_ok( + &state, + "HeartbeatRun", + "run-1", + "Schedule", + serde_json::json!({ + "agent_id": agent_id, + "org_id": "org-1", + "wake_reason": "issue-42", + "max_turns": 10, + "adapter_type": "http" + }), + &sup, + false, + ) + .await; + assert_eq!(resp.state.status, "Scheduled"); + assert_eq!( + resp.state + .fields + .get("agent_bound") + .and_then(|v| v.as_bool()), + Some(true) + ); + + // Step 2: Approve budget (supervisor). + let resp = dispatch_ok( + &state, + "HeartbeatRun", + "run-1", + "ApproveBudget", + serde_json::json!({}), + &sup, + false, + ) + .await; + assert_eq!(resp.state.status, "Scheduled"); + assert_eq!( + resp.state + .fields + .get("budget_approved") + .and_then(|v| v.as_bool()), + Some(true) + ); + + // Step 3: Agent checks in (worker). + let resp = dispatch_ok( + &state, + "HeartbeatRun", + "run-1", + "CheckIn", + serde_json::json!({}), + &worker, + false, + ) + .await; + assert_eq!(resp.state.status, "CheckingIn"); + + // Step 4: Start execution (worker) — triggers adapter integration inline. + // The adapter calls the wiremock HTTP endpoint, which returns a success + // callback with `result = "task completed successfully"`. + // The dispatch pipeline fires RecordResult automatically. + let resp = dispatch_ok( + &state, + "HeartbeatRun", + "run-1", + "StartExecution", + serde_json::json!({}), + &worker, + true, // await_integration: wait for adapter + callback + ) + .await; + + // After StartExecution + adapter success callback (RecordResult), + // the entity should be in Working with has_result = true. + assert_eq!(resp.state.status, "Working"); + assert_eq!( + resp.state + .fields + .get("has_result") + .and_then(|v| v.as_bool()), + Some(true) + ); + + // Step 5: Complete the run (worker). + let resp = dispatch_ok( + &state, + "HeartbeatRun", + "run-1", + "Complete", + serde_json::json!({}), + &worker, + false, + ) + .await; + assert_eq!(resp.state.status, "Completed"); + + // Verify the mock was called exactly once. + let received = mock_server.received_requests().await.unwrap(); + assert_eq!( + received.len(), + 1, + "adapter should have made exactly one HTTP call" + ); +} + +// ─────────────────────────────────────────────────────────────────── +// Test 2: Adapter failure triggers Fail callback +// ─────────────────────────────────────────────────────────────────── +#[tokio::test(flavor = "multi_thread")] +async fn heartbeat_run_adapter_failure_triggers_fail_callback() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/agent/execute")) + .respond_with(ResponseTemplate::new(500).set_body_string("agent crashed")) + .mount(&mock_server) + .await; + + let patched_heartbeat = HEARTBEAT_IOA.replace( + r#"adapter = "{field:adapter_type}""#, + &format!( + "adapter = \"http\"\nurl = \"{}/agent/execute\"\nmethod = \"POST\"\nallow_private_urls = \"true\"", + mock_server.uri() + ), + ); + + let mut registry = SpecRegistry::new(); + let csdl = parse_csdl(CSDL_XML).expect("CSDL should parse"); + registry.register_tenant( + "default", + csdl, + CSDL_XML.to_string(), + &[ + ("HeartbeatRun", &patched_heartbeat), + ("Organization", ORG_IOA), + ("BudgetLedger", BUDGET_IOA), + ], + ); + let system = ActorSystem::new("heartbeat-failure-e2e"); + let state = ServerState::from_registry(system, registry); + + let sup = supervisor_ctx(); + let worker = worker_ctx("worker-claude-2"); + + // Walk to Working state. + dispatch_ok( + &state, + "HeartbeatRun", + "run-fail", + "Schedule", + serde_json::json!({ + "agent_id": "worker-claude-2", + "org_id": "org-1", + "wake_reason": "test", + "max_turns": 5, + "adapter_type": "http" + }), + &sup, + false, + ) + .await; + + dispatch_ok( + &state, + "HeartbeatRun", + "run-fail", + "ApproveBudget", + serde_json::json!({}), + &sup, + false, + ) + .await; + + dispatch_ok( + &state, + "HeartbeatRun", + "run-fail", + "CheckIn", + serde_json::json!({}), + &worker, + false, + ) + .await; + + // StartExecution — adapter returns 500, on_failure triggers Fail action. + let resp = dispatch_ok( + &state, + "HeartbeatRun", + "run-fail", + "StartExecution", + serde_json::json!({}), + &worker, + true, + ) + .await; + + // The adapter failure callback fires Fail, moving to Failed state. + assert_eq!(resp.state.status, "Failed"); + let error_msg = resp + .state + .fields + .get("error_message") + .and_then(|v| v.as_str()) + .unwrap_or_default(); + assert!( + !error_msg.is_empty(), + "error_message should be set on failure" + ); +} + +// ─────────────────────────────────────────────────────────────────── +// Test 3: Organization lifecycle +// Setup → Configure → Activate → AddMember → RecordCost → +// IncrementActiveRuns → DecrementActiveRuns → ResetBudgetCycle +// ─────────────────────────────────────────────────────────────────── +#[tokio::test(flavor = "multi_thread")] +async fn organization_full_lifecycle() { + let state = build_orchestration_state(); + let sup = supervisor_ctx(); + + // Configure org. + let resp = dispatch_ok( + &state, + "Organization", + "org-1", + "Configure", + serde_json::json!({ + "name": "Temper Agents", + "description": "Agent orchestration org", + "budget_monthly_cents": "100000" + }), + &sup, + false, + ) + .await; + assert_eq!(resp.state.status, "Setup"); + assert_eq!( + resp.state.fields.get("name").and_then(|v| v.as_str()), + Some("Temper Agents") + ); + + // Activate. + let resp = dispatch_ok( + &state, + "Organization", + "org-1", + "Activate", + serde_json::json!({}), + &sup, + false, + ) + .await; + assert_eq!(resp.state.status, "Active"); + + // Add a member. + let resp = dispatch_ok( + &state, + "Organization", + "org-1", + "AddMember", + serde_json::json!({"member_id": "claude-code-1"}), + &sup, + false, + ) + .await; + assert_eq!( + resp.state + .fields + .get("member_count") + .and_then(|v| v.as_i64()), + Some(1) + ); + + // Record cost (any agent can do this). + let worker = worker_ctx("agent-1"); + let resp = dispatch_ok( + &state, + "Organization", + "org-1", + "RecordCost", + serde_json::json!({ + "amount_cents": 500, + "budget_consumed_cents": "500" + }), + &worker, + false, + ) + .await; + assert_eq!( + resp.state + .fields + .get("budget_consumed_cents") + .and_then(|v| v.as_str()), + Some("500") + ); + + // Increment active runs. + let resp = dispatch_ok( + &state, + "Organization", + "org-1", + "IncrementActiveRuns", + serde_json::json!({}), + &worker, + false, + ) + .await; + assert_eq!( + resp.state + .fields + .get("active_runs") + .and_then(|v| v.as_i64()), + Some(1) + ); + + // Decrement active runs. + let resp = dispatch_ok( + &state, + "Organization", + "org-1", + "DecrementActiveRuns", + serde_json::json!({}), + &worker, + false, + ) + .await; + assert_eq!( + resp.state + .fields + .get("active_runs") + .and_then(|v| v.as_i64()), + Some(0) + ); + + // Reset budget cycle. + let resp = dispatch_ok( + &state, + "Organization", + "org-1", + "ResetBudgetCycle", + serde_json::json!({"budget_consumed_cents": "0"}), + &sup, + false, + ) + .await; + assert_eq!( + resp.state + .fields + .get("budget_consumed_cents") + .and_then(|v| v.as_str()), + Some("0") + ); +} + +// ─────────────────────────────────────────────────────────────────── +// Test 4: BudgetLedger append-only recording +// ─────────────────────────────────────────────────────────────────── +#[tokio::test(flavor = "multi_thread")] +async fn budget_ledger_records_cost_entry() { + let state = build_orchestration_state(); + let worker = worker_ctx("agent-1"); + + let resp = dispatch_ok( + &state, + "BudgetLedger", + "ledger-1", + "Record", + serde_json::json!({ + "org_id": "org-1", + "agent_id": "claude-code-1", + "run_id": "run-1", + "amount_cents": "250", + "tokens": "5000", + "category": "execution" + }), + &worker, + false, + ) + .await; + + assert_eq!(resp.state.status, "Recorded"); + assert_eq!( + resp.state.fields.get("org_id").and_then(|v| v.as_str()), + Some("org-1") + ); + assert_eq!( + resp.state + .fields + .get("amount_cents") + .and_then(|v| v.as_str()), + Some("250") + ); +} + +// ─────────────────────────────────────────────────────────────────── +// Test 5: Guard enforcement — CheckIn requires budget_approved + agent_bound +// ─────────────────────────────────────────────────────────────────── +#[tokio::test(flavor = "multi_thread")] +async fn heartbeat_run_guard_blocks_checkin_without_approval() { + let state = build_orchestration_state(); + let sup = supervisor_ctx(); + let worker = worker_ctx("worker-1"); + + // Schedule but DON'T approve budget. + dispatch_ok( + &state, + "HeartbeatRun", + "run-guard", + "Schedule", + serde_json::json!({ + "agent_id": "worker-1", + "org_id": "org-1", + "wake_reason": "test", + "max_turns": 5, + "adapter_type": "http" + }), + &sup, + false, + ) + .await; + + // CheckIn should fail because budget not approved. + let result = state + .dispatch_tenant_action_ext( + &TenantId::default(), + "HeartbeatRun", + "run-guard", + "CheckIn", + serde_json::json!({}), + DispatchExtOptions { + agent_ctx: &worker, + await_integration: false, + }, + ) + .await; + + match result { + Ok(resp) => { + assert!(!resp.success, "CheckIn should fail without budget approval"); + } + Err(_) => { + // Also acceptable — guard violation may surface as error. + } + } +} + +// ─────────────────────────────────────────────────────────────────── +// Test 6: Cancellation from any pre-terminal state +// ─────────────────────────────────────────────────────────────────── +#[tokio::test(flavor = "multi_thread")] +async fn heartbeat_run_cancel_from_working() { + let state = build_orchestration_state(); + let sup = supervisor_ctx(); + let worker = worker_ctx("worker-cancel"); + + // Walk to Working without triggering adapter (use non-await mode). + dispatch_ok( + &state, + "HeartbeatRun", + "run-cancel", + "Schedule", + serde_json::json!({ + "agent_id": "worker-cancel", + "org_id": "org-1", + "wake_reason": "test", + "max_turns": 5, + "adapter_type": "http" + }), + &sup, + false, + ) + .await; + dispatch_ok( + &state, + "HeartbeatRun", + "run-cancel", + "ApproveBudget", + serde_json::json!({}), + &sup, + false, + ) + .await; + dispatch_ok( + &state, + "HeartbeatRun", + "run-cancel", + "CheckIn", + serde_json::json!({}), + &worker, + false, + ) + .await; + // StartExecution without await — adapter fires in background (will fail since + // no mock server, but we don't care because we're cancelling). + let _ = state + .dispatch_tenant_action_ext( + &TenantId::default(), + "HeartbeatRun", + "run-cancel", + "StartExecution", + serde_json::json!({}), + DispatchExtOptions { + agent_ctx: &worker, + await_integration: false, + }, + ) + .await; + + // Cancel from Working. + let resp = dispatch_ok( + &state, + "HeartbeatRun", + "run-cancel", + "Cancel", + serde_json::json!({}), + &worker, + false, + ) + .await; + assert_eq!(resp.state.status, "Cancelled"); +} diff --git a/os-apps/agent-orchestration/policies/orchestration.cedar b/os-apps/agent-orchestration/policies/orchestration.cedar index 12ee5b8b..c539997d 100644 --- a/os-apps/agent-orchestration/policies/orchestration.cedar +++ b/os-apps/agent-orchestration/policies/orchestration.cedar @@ -40,6 +40,14 @@ permit( resource is Organization ); +permit( + principal, + action == Action::"create", + resource is Organization +) when { + ["supervisor", "human"].contains(principal.agent_type) +}; + permit( principal, action in [Action::"Configure", Action::"Activate", Action::"AddMember", Action::"RemoveMember", Action::"ResetBudgetCycle", Action::"Pause", Action::"Resume", Action::"Archive"], @@ -50,7 +58,7 @@ permit( permit( principal, - action == Action::"RecordCost", + action in [Action::"RecordCost", Action::"IncrementActiveRuns", Action::"DecrementActiveRuns"], resource is Organization ); diff --git a/os-apps/agent-orchestration/specs/model.csdl.xml b/os-apps/agent-orchestration/specs/model.csdl.xml index 79c07e81..13c7710d 100644 --- a/os-apps/agent-orchestration/specs/model.csdl.xml +++ b/os-apps/agent-orchestration/specs/model.csdl.xml @@ -41,8 +41,8 @@ - - + + @@ -75,6 +75,8 @@ + + @@ -135,11 +137,23 @@ + + + + + + + + + + + + @@ -163,8 +177,8 @@ - - + + diff --git a/os-apps/agent-orchestration/specs/organization.ioa.toml b/os-apps/agent-orchestration/specs/organization.ioa.toml index e81b6ac0..a4298d7d 100644 --- a/os-apps/agent-orchestration/specs/organization.ioa.toml +++ b/os-apps/agent-orchestration/specs/organization.ioa.toml @@ -77,11 +77,27 @@ from = ["Active"] params = ["amount_cents", "budget_consumed_cents"] hint = "Record budget consumption for orchestration work using caller-provided cumulative totals." +[[action]] +name = "IncrementActiveRuns" +kind = "input" +from = ["Active"] +hint = "Increment count of active heartbeat runs." +effect = [{ type = "increment", var = "active_runs" }] + +[[action]] +name = "DecrementActiveRuns" +kind = "input" +from = ["Active"] +guard = "active_runs > 0" +hint = "Decrement count of active heartbeat runs." +effect = [{ type = "decrement", var = "active_runs" }] + [[action]] name = "ResetBudgetCycle" kind = "input" from = ["Active", "Paused"] -hint = "Mark budget cycle rollover." +params = ["budget_consumed_cents"] +hint = "Reset budget consumption for a new billing cycle. Caller passes budget_consumed_cents=0." [[action]] name = "Pause"