Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 1 addition & 259 deletions e2e/python/test_sandbox_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,7 @@ def log_message(self, *args):
{"connect_status": connect_resp.strip(), "http_status": 0}
)

request = (
f"{method} {path} HTTP/1.1\r\nHost: {target_host}\r\nConnection: close\r\n\r\n"
)
request = f"{method} {path} HTTP/1.1\r\nHost: {target_host}\r\nConnection: close\r\n\r\n"
conn.sendall(request.encode())

data = b""
Expand Down Expand Up @@ -1348,262 +1346,6 @@ def test_l7_rule_without_query_matcher_allows_any_query_params(
assert "connect-server-ok" in resp["body"]


# =============================================================================
# Live policy update + log streaming tests
#
# LPU-1: Create sandbox, verify the initial policy version is at least v1
# LPU-2: Set the same policy again -> unchanged (no new version)
# LPU-3: Push a different policy -> new version loaded, verify connectivity
# LPU-4: Push the updated policy again -> unchanged
# LPU-5: List policy history -> both versions present, exactly one loaded
# LPU-6: Fetch logs (one-shot) and verify the gateway source appears
# =============================================================================


def test_live_policy_update_and_logs(
    sandbox: Callable[..., Sandbox],
    sandbox_client: SandboxClient,
) -> None:
    """End-to-end: live policy update lifecycle with log verification.

    Creates a sandbox with policy A (api.anthropic.com only), then:

    * LPU-1: reads the initial policy revision and records version + hash.
    * LPU-2: re-submits policy A and expects the same version and hash back.
    * LPU-3: submits policy B (adds example.com), expects a higher version
      and a different hash, polls until that version reports LOADED, and
      checks that a proxy CONNECT to example.com:443 now reports "200".
    * LPU-4: re-submits policy B and expects the version to be unchanged.
    * LPU-5: lists policy revisions and expects both versions to be present
      with exactly one in LOADED status.
    * LPU-6: fetches logs once and expects gateway-sourced entries; sandbox
      entries are only checked when present.
    """
    import time

    from openshell._proto import openshell_pb2, sandbox_pb2

    # --- Setup: two distinct policies ---
    # Policy A: python can reach api.anthropic.com
    policy_a = _base_policy(
        network_policies={
            "anthropic": sandbox_pb2.NetworkPolicyRule(
                name="anthropic",
                endpoints=[
                    sandbox_pb2.NetworkEndpoint(host="api.anthropic.com", port=443),
                ],
                binaries=[sandbox_pb2.NetworkBinary(path="/**")],
            ),
        },
    )
    # Policy B: python can reach api.anthropic.com AND example.com
    policy_b = _base_policy(
        network_policies={
            "anthropic": sandbox_pb2.NetworkPolicyRule(
                name="anthropic",
                endpoints=[
                    sandbox_pb2.NetworkEndpoint(host="api.anthropic.com", port=443),
                ],
                binaries=[sandbox_pb2.NetworkBinary(path="/**")],
            ),
            "example": sandbox_pb2.NetworkPolicyRule(
                name="example",
                endpoints=[
                    sandbox_pb2.NetworkEndpoint(host="example.com", port=443),
                ],
                binaries=[sandbox_pb2.NetworkBinary(path="/**")],
            ),
        },
    )

    spec = datamodel_pb2.SandboxSpec(policy=policy_a)
    stub = sandbox_client._stub

    with sandbox(spec=spec, delete_on_exit=True) as sb:
        sandbox_name = sb.sandbox.name

        # --- LPU-1: Initial policy should be version 1 ---
        status_resp = stub.GetSandboxPolicyStatus(
            openshell_pb2.GetSandboxPolicyStatusRequest(name=sandbox_name, version=0)
        )
        assert status_resp.revision.version >= 1, "Initial policy should be at least v1"
        initial_version = status_resp.revision.version
        initial_hash = status_resp.revision.policy_hash

        # --- LPU-2: Set the same policy -> no new version ---
        update_resp = stub.UpdateConfig(
            openshell_pb2.UpdateConfigRequest(
                name=sandbox_name,
                policy=policy_a,
            )
        )
        assert update_resp.version == initial_version, (
            f"Same policy should return existing version {initial_version}, "
            f"got {update_resp.version}"
        )
        assert update_resp.policy_hash == initial_hash

        # --- LPU-3: Push policy B -> new version ---
        update_resp = stub.UpdateConfig(
            openshell_pb2.UpdateConfigRequest(
                name=sandbox_name,
                policy=policy_b,
            )
        )
        new_version = update_resp.version
        assert new_version > initial_version, (
            f"Different policy should create new version > {initial_version}, "
            f"got {new_version}"
        )
        assert update_resp.policy_hash != initial_hash

        # Wait for the sandbox to load the new policy (poll loop is 30s default).
        # Use the monotonic clock so wall-clock adjustments on the test host
        # cannot shorten or extend the 90s budget.
        deadline = time.monotonic() + 90
        loaded = False
        while time.monotonic() < deadline:
            status_resp = stub.GetSandboxPolicyStatus(
                openshell_pb2.GetSandboxPolicyStatusRequest(
                    name=sandbox_name, version=new_version
                )
            )
            status = status_resp.revision.status
            if status == openshell_pb2.POLICY_STATUS_LOADED:
                loaded = True
                break
            if status == openshell_pb2.POLICY_STATUS_FAILED:
                pytest.fail(
                    f"Policy v{new_version} failed to load: "
                    f"{status_resp.revision.load_error}"
                )
            time.sleep(2)
        assert loaded, f"Policy v{new_version} was not loaded within 90s"

        # Verify the new policy works: example.com should now be allowed
        result = sb.exec_python(_proxy_connect(), args=("example.com", 443))
        assert result.exit_code == 0, result.stderr
        assert "200" in result.stdout, (
            f"example.com should be allowed after policy update, got: {result.stdout}"
        )

        # --- LPU-4: Push policy B again -> unchanged ---
        update_resp = stub.UpdateConfig(
            openshell_pb2.UpdateConfigRequest(
                name=sandbox_name,
                policy=policy_b,
            )
        )
        assert update_resp.version == new_version, (
            f"Same policy B should return existing version {new_version}, "
            f"got {update_resp.version}"
        )

        # --- LPU-5: Verify policy history ---
        list_resp = stub.ListSandboxPolicies(
            openshell_pb2.ListSandboxPoliciesRequest(name=sandbox_name, limit=10)
        )
        versions = [r.version for r in list_resp.revisions]
        assert new_version in versions
        assert initial_version in versions

        # Only one version should be Loaded
        loaded_count = sum(
            1
            for r in list_resp.revisions
            if r.status == openshell_pb2.POLICY_STATUS_LOADED
        )
        assert loaded_count == 1, (
            f"Expected exactly 1 loaded version, got {loaded_count}: "
            f"{[(r.version, r.status) for r in list_resp.revisions]}"
        )

        # --- LPU-6: Fetch logs (one-shot) and verify both sources ---
        # Resolve sandbox ID for log RPCs
        get_resp = stub.GetSandbox(openshell_pb2.GetSandboxRequest(name=sandbox_name))
        sandbox_id = get_resp.sandbox.id

        logs_resp = stub.GetSandboxLogs(
            openshell_pb2.GetSandboxLogsRequest(sandbox_id=sandbox_id, lines=500)
        )
        assert logs_resp.buffer_total > 0, "Expected some logs in the buffer"

        # Entries with an empty source are counted as gateway logs.
        sources = {log.source or "gateway" for log in logs_resp.logs}
        assert "gateway" in sources, (
            f"Expected gateway logs in response, got sources: {sources}"
        )
        # Sandbox logs may take a moment to arrive via the push stream.
        # If they're present, verify the source tag.
        if "sandbox" in sources:
            sandbox_logs = [
                entry for entry in logs_resp.logs if entry.source == "sandbox"
            ]
            assert len(sandbox_logs) > 0
            # Not all sandbox logs have fields (e.g., "Starting sandbox" doesn't),
            # so we just check at least one does if there are CONNECT logs
            connect_logs = [
                entry for entry in sandbox_logs if "CONNECT" in entry.message
            ]
            if connect_logs:
                # Verify structured fields are present on at least one sandbox log
                has_fields = any(len(entry.fields) > 0 for entry in sandbox_logs)
                assert has_fields, "CONNECT logs should have structured fields"


def test_live_policy_update_from_empty_network_policies(
    sandbox: Callable[..., Sandbox],
    sandbox_client: SandboxClient,
) -> None:
    """End-to-end: add the first network rule to a running sandbox.

    Starts a sandbox from ``_base_policy()`` with no network policies passed,
    checks that a proxy CONNECT to example.com:443 reports "403", pushes a
    policy that adds an example.com rule, waits for the new version to report
    LOADED, and checks that the same CONNECT now reports "200".
    """
    import time

    from openshell._proto import openshell_pb2, sandbox_pb2

    initial_policy = _base_policy()
    updated_policy = _base_policy(
        network_policies={
            "example": sandbox_pb2.NetworkPolicyRule(
                name="example",
                endpoints=[
                    sandbox_pb2.NetworkEndpoint(host="example.com", port=443),
                ],
                binaries=[sandbox_pb2.NetworkBinary(path="/**")],
            ),
        },
    )

    spec = datamodel_pb2.SandboxSpec(policy=initial_policy)
    stub = sandbox_client._stub

    with sandbox(spec=spec, delete_on_exit=True) as sb:
        sandbox_name = sb.sandbox.name

        # Before the update, the CONNECT attempt is expected to be rejected.
        denied = sb.exec_python(_proxy_connect(), args=("example.com", 443))
        assert denied.exit_code == 0, denied.stderr
        assert "403" in denied.stdout, denied.stdout

        initial_status = stub.GetSandboxPolicyStatus(
            openshell_pb2.GetSandboxPolicyStatusRequest(name=sandbox_name, version=0)
        )
        initial_version = initial_status.revision.version

        update_resp = stub.UpdateConfig(
            openshell_pb2.UpdateConfigRequest(
                name=sandbox_name,
                policy=updated_policy,
            )
        )
        new_version = update_resp.version
        assert new_version > initial_version, (
            f"Adding the first network rule should create a new version > {initial_version}, "
            f"got {new_version}"
        )

        # Poll until the new version is loaded. The monotonic clock keeps the
        # 90s budget immune to wall-clock adjustments on the test host.
        deadline = time.monotonic() + 90
        loaded = False
        while time.monotonic() < deadline:
            status_resp = stub.GetSandboxPolicyStatus(
                openshell_pb2.GetSandboxPolicyStatusRequest(
                    name=sandbox_name, version=new_version
                )
            )
            status = status_resp.revision.status
            if status == openshell_pb2.POLICY_STATUS_LOADED:
                loaded = True
                break
            if status == openshell_pb2.POLICY_STATUS_FAILED:
                pytest.fail(
                    f"Policy v{new_version} failed to load: "
                    f"{status_resp.revision.load_error}"
                )
            time.sleep(2)

        assert loaded, f"Policy v{new_version} was not loaded within 90s"

        # After the update, the same CONNECT attempt should succeed.
        allowed = sb.exec_python(_proxy_connect(), args=("example.com", 443))
        assert allowed.exit_code == 0, allowed.stderr
        assert "200" in allowed.stdout, allowed.stdout


# =============================================================================
# Forward proxy tests (plain HTTP, non-CONNECT)
# =============================================================================
Expand Down
Loading
Loading