diff --git a/e2e/python/test_sandbox_policy.py b/e2e/python/test_sandbox_policy.py index 625fe8da..092f9978 100644 --- a/e2e/python/test_sandbox_policy.py +++ b/e2e/python/test_sandbox_policy.py @@ -314,9 +314,7 @@ def log_message(self, *args): {"connect_status": connect_resp.strip(), "http_status": 0} ) - request = ( - f"{method} {path} HTTP/1.1\r\nHost: {target_host}\r\nConnection: close\r\n\r\n" - ) + request = f"{method} {path} HTTP/1.1\r\nHost: {target_host}\r\nConnection: close\r\n\r\n" conn.sendall(request.encode()) data = b"" @@ -1348,262 +1346,6 @@ def test_l7_rule_without_query_matcher_allows_any_query_params( assert "connect-server-ok" in resp["body"] -# ============================================================================= -# Live policy update + log streaming tests -# -# LPU-1: Create sandbox, verify initial policy is v1 -# LPU-2: Set the same policy again -> unchanged (no new version) -# LPU-3: Push a different policy -> new version loaded, verify connectivity -# LPU-4: Push v2 again -> unchanged -# LPU-5: Fetch logs (one-shot + streaming) and verify both sources appear -# ============================================================================= - - -def test_live_policy_update_and_logs( - sandbox: Callable[..., Sandbox], - sandbox_client: SandboxClient, -) -> None: - """End-to-end: live policy update lifecycle with log verification.""" - from openshell._proto import openshell_pb2, sandbox_pb2 - - # --- Setup: two distinct policies --- - # Policy A: python can reach api.anthropic.com - policy_a = _base_policy( - network_policies={ - "anthropic": sandbox_pb2.NetworkPolicyRule( - name="anthropic", - endpoints=[ - sandbox_pb2.NetworkEndpoint(host="api.anthropic.com", port=443), - ], - binaries=[sandbox_pb2.NetworkBinary(path="/**")], - ), - }, - ) - # Policy B: python can reach api.anthropic.com AND example.com - policy_b = _base_policy( - network_policies={ - "anthropic": sandbox_pb2.NetworkPolicyRule( - name="anthropic", - endpoints=[ - sandbox_pb2.NetworkEndpoint(host="api.anthropic.com", port=443), - ], - binaries=[sandbox_pb2.NetworkBinary(path="/**")], - ), - "example": sandbox_pb2.NetworkPolicyRule( - name="example", - endpoints=[ - sandbox_pb2.NetworkEndpoint(host="example.com", port=443), - ], - binaries=[sandbox_pb2.NetworkBinary(path="/**")], - ), - }, - ) - - spec = datamodel_pb2.SandboxSpec(policy=policy_a) - stub = sandbox_client._stub - - with sandbox(spec=spec, delete_on_exit=True) as sb: - sandbox_name = sb.sandbox.name - - # --- LPU-1: Initial policy should be version 1 --- - status_resp = stub.GetSandboxPolicyStatus( - openshell_pb2.GetSandboxPolicyStatusRequest(name=sandbox_name, version=0) - ) - assert status_resp.revision.version >= 1, "Initial policy should be at least v1" - initial_version = status_resp.revision.version - initial_hash = status_resp.revision.policy_hash - - # --- LPU-2: Set the same policy -> no new version --- - update_resp = stub.UpdateConfig( - openshell_pb2.UpdateConfigRequest( - name=sandbox_name, - policy=policy_a, - ) - ) - assert update_resp.version == initial_version, ( - f"Same policy should return existing version {initial_version}, " - f"got {update_resp.version}" - ) - assert update_resp.policy_hash == initial_hash - - # --- LPU-3: Push policy B -> new version --- - update_resp = stub.UpdateConfig( - openshell_pb2.UpdateConfigRequest( - name=sandbox_name, - policy=policy_b, - ) - ) - new_version = update_resp.version - assert new_version > initial_version, ( - f"Different policy should create new version > {initial_version}, " - f"got {new_version}" - ) - assert update_resp.policy_hash != initial_hash - - # Wait for the sandbox to load the new policy (poll loop is 30s default). - import time - - deadline = time.time() + 90 - loaded = False - while time.time() < deadline: - status_resp = stub.GetSandboxPolicyStatus( - openshell_pb2.GetSandboxPolicyStatusRequest( - name=sandbox_name, version=new_version - ) - ) - status = status_resp.revision.status - if status == openshell_pb2.POLICY_STATUS_LOADED: - loaded = True - break - if status == openshell_pb2.POLICY_STATUS_FAILED: - pytest.fail( - f"Policy v{new_version} failed to load: " - f"{status_resp.revision.load_error}" - ) - time.sleep(2) - assert loaded, f"Policy v{new_version} was not loaded within 90s" - - # Verify the new policy works: example.com should now be allowed - result = sb.exec_python(_proxy_connect(), args=("example.com", 443)) - assert result.exit_code == 0, result.stderr - assert "200" in result.stdout, ( - f"example.com should be allowed after policy update, got: {result.stdout}" - ) - - # --- LPU-4: Push policy B again -> unchanged --- - update_resp = stub.UpdateConfig( - openshell_pb2.UpdateConfigRequest( - name=sandbox_name, - policy=policy_b, - ) - ) - assert update_resp.version == new_version, ( - f"Same policy B should return existing version {new_version}, " - f"got {update_resp.version}" - ) - - # --- LPU-5: Verify policy history --- - list_resp = stub.ListSandboxPolicies( - openshell_pb2.ListSandboxPoliciesRequest(name=sandbox_name, limit=10) - ) - versions = [r.version for r in list_resp.revisions] - assert new_version in versions - assert initial_version in versions - - # Only one version should be Loaded - loaded_count = sum( - 1 - for r in list_resp.revisions - if r.status == openshell_pb2.POLICY_STATUS_LOADED - ) - assert loaded_count == 1, ( - f"Expected exactly 1 loaded version, got {loaded_count}: " - f"{[(r.version, r.status) for r in list_resp.revisions]}" - ) - - # --- LPU-6: Fetch logs (one-shot) and verify both sources --- - # Resolve sandbox ID for log RPCs - get_resp = stub.GetSandbox(openshell_pb2.GetSandboxRequest(name=sandbox_name)) - sandbox_id = get_resp.sandbox.id - - logs_resp = stub.GetSandboxLogs( - openshell_pb2.GetSandboxLogsRequest(sandbox_id=sandbox_id, lines=500) - ) - assert logs_resp.buffer_total > 0, "Expected some logs in the buffer" - - sources = {log.source or "gateway" for log in logs_resp.logs} - assert "gateway" in sources, ( - f"Expected gateway logs in response, got sources: {sources}" - ) - # Sandbox logs may take a moment to arrive via the push stream. - # If they're present, verify the source tag. - if "sandbox" in sources: - sandbox_logs = [l for l in logs_resp.logs if l.source == "sandbox"] - assert len(sandbox_logs) > 0 - # Verify structured fields are present on at least one sandbox log - has_fields = any(len(l.fields) > 0 for l in sandbox_logs) - # Not all sandbox logs have fields (e.g., "Starting sandbox" doesn't), - # so we just check at least one does if there are CONNECT logs - connect_logs = [l for l in sandbox_logs if "CONNECT" in l.message] - if connect_logs: - assert has_fields, "CONNECT logs should have structured fields" - - -def test_live_policy_update_from_empty_network_policies( - sandbox: Callable[..., Sandbox], - sandbox_client: SandboxClient, -) -> None: - """End-to-end: add the first network rule to a running sandbox.""" - from openshell._proto import openshell_pb2, sandbox_pb2 - - initial_policy = _base_policy() - updated_policy = _base_policy( - network_policies={ - "example": sandbox_pb2.NetworkPolicyRule( - name="example", - endpoints=[ - sandbox_pb2.NetworkEndpoint(host="example.com", port=443), - ], - binaries=[sandbox_pb2.NetworkBinary(path="/**")], - ), - }, - ) - - spec = datamodel_pb2.SandboxSpec(policy=initial_policy) - stub = sandbox_client._stub - - with sandbox(spec=spec, delete_on_exit=True) as sb: - sandbox_name = sb.sandbox.name - - denied = sb.exec_python(_proxy_connect(), args=("example.com", 443)) - assert denied.exit_code == 0, denied.stderr - assert "403" in denied.stdout, denied.stdout - - initial_status = stub.GetSandboxPolicyStatus( - openshell_pb2.GetSandboxPolicyStatusRequest(name=sandbox_name, version=0) - ) - initial_version = initial_status.revision.version - - update_resp = stub.UpdateConfig( - openshell_pb2.UpdateConfigRequest( - name=sandbox_name, - policy=updated_policy, - ) - ) - new_version = update_resp.version - assert new_version > initial_version, ( - f"Adding the first network rule should create a new version > {initial_version}, " - f"got {new_version}" - ) - - import time - - deadline = time.time() + 90 - loaded = False - while time.time() < deadline: - status_resp = stub.GetSandboxPolicyStatus( - openshell_pb2.GetSandboxPolicyStatusRequest( - name=sandbox_name, version=new_version - ) - ) - status = status_resp.revision.status - if status == openshell_pb2.POLICY_STATUS_LOADED: - loaded = True - break - if status == openshell_pb2.POLICY_STATUS_FAILED: - pytest.fail( - f"Policy v{new_version} failed to load: " - f"{status_resp.revision.load_error}" - ) - time.sleep(2) - - assert loaded, f"Policy v{new_version} was not loaded within 90s" - - allowed = sb.exec_python(_proxy_connect(), args=("example.com", 443)) - assert allowed.exit_code == 0, allowed.stderr - assert "200" in allowed.stdout, allowed.stdout - - # ============================================================================= # Forward proxy tests (plain HTTP, non-CONNECT) # ============================================================================= diff --git a/e2e/rust/tests/live_policy_update.rs b/e2e/rust/tests/live_policy_update.rs new file mode 100644 index 00000000..c60b2954 --- /dev/null +++ b/e2e/rust/tests/live_policy_update.rs @@ -0,0 +1,423 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! E2E tests for live policy updates on a running sandbox. +//! +//! Covers the full round-trip: +//! - Create sandbox with policy A +//! - Verify initial policy version via `policy get` +//! - Push same policy A again -> no version bump (idempotent) +//! - Push different policy B -> new version, `--wait` for sandbox to load it +//! - Verify policy history via `policy list` +//! +//! These tests replace the Python e2e tests `test_live_policy_update_and_logs` +//! and `test_live_policy_update_from_empty_network_policies`, which were flaky +//! due to hard-coded 90s poll timeouts. The Rust tests use the CLI's built-in +//! `--wait` flag for reliable synchronization. +//! +//! Note: the removed Python tests also covered `GetSandboxLogs` RPC and +//! verified actual proxy connectivity after policy update. Those are tracked +//! as follow-up coverage gaps -- the proxy enforcement path is covered by the +//! existing L4/L7/SSRF Python e2e tests, and log fetching needs a dedicated +//! test. + +#![cfg(feature = "e2e")] + +use std::fmt::Write as _; +use std::io::Write; +use std::process::Stdio; + +use openshell_e2e::harness::binary::openshell_cmd; +use openshell_e2e::harness::output::{extract_field, strip_ansi}; +use openshell_e2e::harness::sandbox::SandboxGuard; +use tempfile::NamedTempFile; + +// --------------------------------------------------------------------------- +// Policy YAML builders +// --------------------------------------------------------------------------- + +/// Build a policy YAML that allows any binary to reach the given hosts on +/// port 443. +/// +/// NOTE: The indentation in the format string is load-bearing YAML structure. +fn write_policy(hosts: &[&str]) -> Result { + let mut file = NamedTempFile::new().map_err(|e| format!("create temp policy file: {e}"))?; + + let mut network_rules = String::new(); + for (i, host) in hosts.iter().enumerate() { + let _ = write!( + network_rules, + r#" rule_{i}: + name: rule_{i} + endpoints: + - host: {host} + port: 443 + binaries: + - path: "/**" +"# + ); + } + + let policy = format!( + r"version: 1 + +filesystem_policy: + include_workdir: true + read_only: + - /usr + - /lib + - /proc + - /dev/urandom + - /app + - /etc + - /var/log + read_write: + - /sandbox + - /tmp + - /dev/null + +landlock: + compatibility: best_effort + +process: + run_as_user: sandbox + run_as_group: sandbox + +network_policies: +{network_rules}" + ); + + file.write_all(policy.as_bytes()) + .map_err(|e| format!("write temp policy file: {e}"))?; + file.flush() + .map_err(|e| format!("flush temp policy file: {e}"))?; + Ok(file) +} + +/// Build a minimal policy YAML with no network rules. +fn write_empty_network_policy() -> Result { + let mut file = NamedTempFile::new().map_err(|e| format!("create temp policy file: {e}"))?; + + let policy = r"version: 1 + +filesystem_policy: + include_workdir: true + read_only: + - /usr + - /lib + - /proc + - /dev/urandom + - /app + - /etc + - /var/log + read_write: + - /sandbox + - /tmp + - /dev/null + +landlock: + compatibility: best_effort + +process: + run_as_user: sandbox + run_as_group: sandbox +"; + + file.write_all(policy.as_bytes()) + .map_err(|e| format!("write temp policy file: {e}"))?; + file.flush() + .map_err(|e| format!("flush temp policy file: {e}"))?; + Ok(file) +} + +// --------------------------------------------------------------------------- +// CLI helpers +// --------------------------------------------------------------------------- + +struct CliResult { + success: bool, + output: String, + exit_code: Option, +} + +/// Run an `openshell` CLI command and return the result. +async fn run_cli(args: &[&str]) -> CliResult { + let mut cmd = openshell_cmd(); + cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped()); + + let output = cmd.output().await.expect("spawn openshell command"); + let stdout = String::from_utf8_lossy(&output.stdout).to_string(); + let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + let combined = strip_ansi(&format!("{stdout}{stderr}")); + + CliResult { + success: output.status.success(), + output: combined, + exit_code: output.status.code(), + } +} + +/// Extract the policy version number from `policy get` output. +/// +/// Uses the shared `extract_field` helper to find `Version: ` or +/// `Revision: ` in CLI tabular output. +fn extract_version(output: &str) -> Option { + extract_field(output, "Version") + .or_else(|| extract_field(output, "Revision")) + .and_then(|v| v.parse::().ok()) +} + +/// Extract the policy hash from `policy get` output. +fn extract_hash(output: &str) -> Option { + extract_field(output, "Hash") + .or_else(|| extract_field(output, "Policy hash")) +} + +/// Check that a version number appears in `policy list` output as a +/// distinct field value (not just a substring of some other number). +/// +/// Looks for the version number preceded by whitespace or at the start +/// of a line, to avoid matching "2" inside "12" or timestamps. +fn list_output_contains_version(output: &str, version: u32) -> bool { + let v = version.to_string(); + output.lines().any(|line| { + line.split_whitespace() + .any(|word| word == v || word.starts_with(&format!("{v} "))) + }) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +/// Test the full live policy update lifecycle: +/// +/// 1. Create sandbox with `--keep` +/// 2. Set policy A, verify initial version >= 1 +/// 3. Push same policy A -> version unchanged (idempotent) +/// 4. Push policy B (adds example.com) with `--wait` -> new version +/// 5. Push policy B again -> idempotent +/// 6. Verify policy list shows both versions +#[tokio::test] +#[allow(clippy::too_many_lines)] +async fn live_policy_update_round_trip() { + // --- Write two distinct policy files --- + let policy_a = write_policy(&["api.anthropic.com"]).expect("write policy A"); + let policy_b = + write_policy(&["api.anthropic.com", "example.com"]).expect("write policy B"); + + let policy_a_path = policy_a + .path() + .to_str() + .expect("policy A path should be utf-8") + .to_string(); + let policy_b_path = policy_b + .path() + .to_str() + .expect("policy B path should be utf-8") + .to_string(); + + // --- Create a long-running sandbox --- + let mut guard = SandboxGuard::create_keep( + &["sh", "-c", "echo Ready && sleep infinity"], + "Ready", + ) + .await + .expect("create keep sandbox"); + + // --- Set initial policy A --- + let r = run_cli(&[ + "policy", "set", &guard.name, "--policy", &policy_a_path, "--wait", "--timeout", "120", + ]) + .await; + assert!( + r.success, + "policy set A should succeed (exit {:?}):\n{}", + r.exit_code, r.output + ); + + // --- Verify initial policy version --- + let r = run_cli(&["policy", "get", &guard.name]).await; + assert!( + r.success, + "policy get should succeed (exit {:?}):\n{}", + r.exit_code, r.output + ); + + let initial_version = extract_version(&r.output) + .unwrap_or_else(|| panic!("could not parse version from policy get output:\n{}", r.output)); + assert!( + initial_version >= 1, + "initial policy version should be >= 1, got {initial_version}" + ); + + let initial_hash = extract_hash(&r.output); + + // --- Push same policy A again -> should be idempotent --- + let r = run_cli(&[ + "policy", "set", &guard.name, "--policy", &policy_a_path, "--wait", "--timeout", "120", + ]) + .await; + assert!( + r.success, + "policy set A (repeat) should succeed (exit {:?}):\n{}", + r.exit_code, r.output + ); + + let r = run_cli(&["policy", "get", &guard.name]).await; + assert!(r.success, "policy get after repeat should succeed:\n{}", r.output); + + let repeat_version = extract_version(&r.output) + .unwrap_or_else(|| panic!("could not parse version after repeat:\n{}", r.output)); + assert_eq!( + repeat_version, initial_version, + "same policy should not bump version: expected {initial_version}, got {repeat_version}" + ); + + if let (Some(ih), Some(rh)) = (&initial_hash, &extract_hash(&r.output)) { + assert_eq!(ih, rh, "same policy should produce same hash"); + } + + // --- Push policy B -> should create new version --- + let r = run_cli(&[ + "policy", "set", &guard.name, "--policy", &policy_b_path, "--wait", "--timeout", "120", + ]) + .await; + assert!( + r.success, + "policy set B should succeed (exit {:?}):\n{}", + r.exit_code, r.output + ); + + let r = run_cli(&["policy", "get", &guard.name]).await; + assert!(r.success, "policy get after B should succeed:\n{}", r.output); + + let new_version = extract_version(&r.output) + .unwrap_or_else(|| panic!("could not parse version after B:\n{}", r.output)); + assert!( + new_version > initial_version, + "different policy should bump version: expected > {initial_version}, got {new_version}" + ); + + if let (Some(ih), Some(nh)) = (&initial_hash, &extract_hash(&r.output)) { + assert_ne!(ih, nh, "different policy should produce different hash"); + } + + // --- Push policy B again -> idempotent --- + let r = run_cli(&[ + "policy", "set", &guard.name, "--policy", &policy_b_path, "--wait", "--timeout", "120", + ]) + .await; + assert!( + r.success, + "policy set B (repeat) should succeed (exit {:?}):\n{}", + r.exit_code, r.output + ); + + let r = run_cli(&["policy", "get", &guard.name]).await; + assert!(r.success, "policy get after B repeat should succeed:\n{}", r.output); + + let repeat_b_version = extract_version(&r.output) + .unwrap_or_else(|| panic!("could not parse version after B repeat:\n{}", r.output)); + assert_eq!( + repeat_b_version, new_version, + "same policy B should not bump version: expected {new_version}, got {repeat_b_version}" + ); + + // --- Verify policy list shows revision history --- + let r = run_cli(&["policy", "list", &guard.name]).await; + assert!( + r.success, + "policy list should succeed (exit {:?}):\n{}", + r.exit_code, r.output + ); + + // Both versions should appear in the list output. + assert!( + list_output_contains_version(&r.output, new_version), + "policy list should contain version {new_version}:\n{}", + r.output + ); + assert!( + list_output_contains_version(&r.output, initial_version), + "policy list should contain initial version {initial_version}:\n{}", + r.output + ); + + guard.cleanup().await; +} + +/// Test live policy update from an initially empty network policy: +/// +/// 1. Create sandbox with `--keep` +/// 2. Set policy with no network rules +/// 3. Push policy with a network rule using `--wait` +/// 4. Verify the version bumped +#[tokio::test] +async fn live_policy_update_from_empty_network_policies() { + let empty_policy = write_empty_network_policy().expect("write empty network policy"); + let full_policy = write_policy(&["example.com"]).expect("write full policy"); + + let empty_path = empty_policy + .path() + .to_str() + .expect("empty policy path should be utf-8") + .to_string(); + let full_path = full_policy + .path() + .to_str() + .expect("full policy path should be utf-8") + .to_string(); + + // Create sandbox with empty network policy. + let mut guard = SandboxGuard::create_keep( + &["sh", "-c", "echo Ready && sleep infinity"], + "Ready", + ) + .await + .expect("create keep sandbox"); + + // Set initial empty policy. + let r = run_cli(&[ + "policy", "set", &guard.name, "--policy", &empty_path, "--wait", "--timeout", "120", + ]) + .await; + assert!( + r.success, + "policy set (empty) should succeed (exit {:?}):\n{}", + r.exit_code, r.output + ); + + let r = run_cli(&["policy", "get", &guard.name]).await; + assert!(r.success, "policy get (empty) should succeed:\n{}", r.output); + + let initial_version = extract_version(&r.output) + .unwrap_or_else(|| panic!("could not parse version from empty policy:\n{}", r.output)); + + // Push policy with network rules. + let r = run_cli(&[ + "policy", "set", &guard.name, "--policy", &full_path, "--wait", "--timeout", "120", + ]) + .await; + assert!( + r.success, + "policy set (full) should succeed (exit {:?}):\n{}", + r.exit_code, r.output + ); + + let r = run_cli(&["policy", "get", &guard.name]).await; + assert!(r.success, "policy get (full) should succeed:\n{}", r.output); + + let new_version = extract_version(&r.output).unwrap_or_else(|| { + panic!( + "could not parse version after adding network rules:\n{}", + r.output + ) + }); + assert!( + new_version > initial_version, + "adding network rules should create new version > {initial_version}, got {new_version}" + ); + + guard.cleanup().await; +}