diff --git a/pkg/workloadmanager/handlers.go b/pkg/workloadmanager/handlers.go index 99761e07..b9748518 100644 --- a/pkg/workloadmanager/handlers.go +++ b/pkg/workloadmanager/handlers.go @@ -200,21 +200,22 @@ func (s *Server) createSandbox(ctx context.Context, dynamicClient dynamic.Interf sandboxRollbackFunc() }() - // agent-sandbox create pod with same name as sandbox if no warmpool is used - // so here we try to get pod IP by sandbox name first - // if warmpool is used, the pod name is stored in sandbox's annotation `agents.x-k8s.io/sandbox-pod-name` - // https://github.com/kubernetes-sigs/agent-sandbox/blob/3ab7fbcd85ad0d75c6e632ecd14bcaeda5e76e1e/controllers/sandbox_controller.go#L465 - sandboxPodName := sandbox.Name - if podName, exists := createdSandbox.Annotations[controllers.SandboxPodNameAnnotation]; exists { - sandboxPodName = podName - } - - podIP, err := s.k8sClient.GetSandboxPodIP(ctx, sandbox.Namespace, sandbox.Name, sandboxPodName) - if err != nil { - return nil, fmt.Errorf("failed to get sandbox %s/%s pod IP: %v", sandbox.Namespace, sandbox.Name, err) + // Prefer ServiceFQDN: K8s Endpoints controller keeps DNS in sync when pod is evicted/recreated. + // Fallback to pod IP for agent-sandbox without ServiceFQDN. + host := createdSandbox.Status.ServiceFQDN + if host == "" { + sandboxPodName := sandbox.Name + if podName, exists := createdSandbox.Annotations[controllers.SandboxPodNameAnnotation]; exists { + sandboxPodName = podName + } + var err error + host, err = s.k8sClient.GetSandboxPodIP(ctx, sandbox.Namespace, sandbox.Name, sandboxPodName) + if err != nil { + return nil, fmt.Errorf("failed to get sandbox %s/%s pod IP: %v", sandbox.Namespace, sandbox.Name, err) + } } - storeCacheInfo := buildSandboxInfo(createdSandbox, podIP, sandboxEntry) + storeCacheInfo := buildSandboxInfo(createdSandbox, host, sandboxEntry) response := &types.CreateSandboxResponse{ SessionID: sandboxEntry.SessionID, diff --git a/pkg/workloadmanager/handlers_test.go b/pkg/workloadmanager/handlers_test.go index a5f78712..cccafed7 100644 --- a/pkg/workloadmanager/handlers_test.go +++ b/pkg/workloadmanager/handlers_test.go @@ -83,13 +83,23 @@ func readySandbox() *sandboxv1alpha1.Sandbox { Annotations: map[string]string{controllers.SandboxPodNameAnnotation: "pod-1"}, CreationTimestamp: metav1.Now(), }, - Status: sandboxv1alpha1.SandboxStatus{Conditions: []metav1.Condition{{ - Type: string(sandboxv1alpha1.SandboxConditionReady), - Status: metav1.ConditionTrue, - }}}, + Status: sandboxv1alpha1.SandboxStatus{ + ServiceFQDN: "sandbox-1.ns-1.svc.cluster.local", + Conditions: []metav1.Condition{{ + Type: string(sandboxv1alpha1.SandboxConditionReady), + Status: metav1.ConditionTrue, + }}, + }, } } +// readySandboxNoFQDN returns a sandbox without ServiceFQDN (fallback to GetSandboxPodIP). +func readySandboxNoFQDN() *sandboxv1alpha1.Sandbox { + sb := readySandbox() + sb.Status.ServiceFQDN = "" + return sb +} + func makeEntry() *sandboxEntry { return &sandboxEntry{ Kind: types.AgentRuntimeKind, @@ -111,6 +121,7 @@ func TestServerCreateSandbox(t *testing.T) { updateErr error sendResult bool expectErr bool + noServiceFQDN bool // use sandbox without ServiceFQDN (triggers GetSandboxPodIP fallback) expectCreateCalls int expectClaimCalls int expectDeleteCalls int @@ -129,6 +140,13 @@ func TestServerCreateSandbox(t *testing.T) { expectClaimCalls: 1, expectUpdateCalls: 1, }, + { + name: "fallback to pod IP when ServiceFQDN empty", + sendResult: true, + noServiceFQDN: true, + expectCreateCalls: 1, + expectUpdateCalls: 1, + }, { name: "store placeholder fails", storeErr: errors.New("store failed"), @@ -148,12 +166,13 @@ func TestServerCreateSandbox(t *testing.T) { expectClaimCalls: 1, }, { - name: "pod ip lookup fails triggers rollback", + name: "pod ip lookup fails triggers rollback (no ServiceFQDN fallback)", podIPErr: errors.New("pod ip missing"), sendResult: true, expectErr: true, expectCreateCalls: 1, expectDeleteCalls: 1, + noServiceFQDN: true, }, { name: "update store fails triggers rollback", @@ -174,6 +193,9 @@ func TestServerCreateSandbox(t *testing.T) { resultChan := make(chan SandboxStatusUpdate, 1) sb := readySandbox() + if tt.noServiceFQDN { + sb = readySandboxNoFQDN() + } if tt.sendResult { resultChan <- SandboxStatusUpdate{Sandbox: sb.DeepCopy()} } @@ -241,7 +263,11 @@ func TestServerCreateSandbox(t *testing.T) { require.Equal(t, string(sb.UID), resp.SandboxID) require.Len(t, resp.EntryPoints, 1) require.Equal(t, "/api", resp.EntryPoints[0].Path) - require.Equal(t, "10.0.0.9:8080", resp.EntryPoints[0].Endpoint) + expectedEndpoint := "sandbox-1.ns-1.svc.cluster.local:8080" + if tt.noServiceFQDN { + expectedEndpoint = "10.0.0.9:8080" + } + require.Equal(t, expectedEndpoint, resp.EntryPoints[0].Endpoint) }) } } diff --git a/pkg/workloadmanager/sandbox_helper.go b/pkg/workloadmanager/sandbox_helper.go index 17d565bc..5cf33757 100644 --- a/pkg/workloadmanager/sandbox_helper.go +++ b/pkg/workloadmanager/sandbox_helper.go @@ -37,7 +37,8 @@ func buildSandboxPlaceHolder(sandboxCR *sandboxv1alpha1.Sandbox, entry *sandboxE } } -func buildSandboxInfo(sandbox *sandboxv1alpha1.Sandbox, podIP string, entry *sandboxEntry) *types.SandboxInfo { +// buildSandboxInfo builds SandboxInfo. host is ServiceFQDN (preferred, resilient to pod restart) or pod IP. +func buildSandboxInfo(sandbox *sandboxv1alpha1.Sandbox, host string, entry *sandboxEntry) *types.SandboxInfo { createdAt := sandbox.GetCreationTimestamp().Time expiresAt := createdAt.Add(DefaultSandboxTTL) if sandbox.Spec.Lifecycle.ShutdownTime != nil { @@ -48,7 +49,7 @@ func buildSandboxInfo(sandbox *sandboxv1alpha1.Sandbox, podIP string, entry *san accesses = append(accesses, types.SandboxEntryPoint{ Path: port.PathPrefix, Protocol: string(port.Protocol), - Endpoint: net.JoinHostPort(podIP, strconv.Itoa(int(port.Port))), + Endpoint: net.JoinHostPort(host, strconv.Itoa(int(port.Port))), }) } return &types.SandboxInfo{ diff --git a/pkg/workloadmanager/sandbox_helper_test.go b/pkg/workloadmanager/sandbox_helper_test.go index c58ade10..7d8aa463 100644 --- a/pkg/workloadmanager/sandbox_helper_test.go +++ b/pkg/workloadmanager/sandbox_helper_test.go @@ -28,7 +28,10 @@ import ( "github.com/volcano-sh/agentcube/pkg/common/types" ) -const sandboxHelperTestPodIP = "10.0.0.1" +const ( + sandboxHelperTestPodIP = "10.0.0.1" + sandboxHelperTestServiceFQDN = "test-sandbox.default.svc.cluster.local" +) // Note: TestBuildSandboxPlaceHolder and TestBuildSandboxPlaceHolder_CodeInterpreter // removed - they only verified that struct fields match input parameters, which is @@ -40,7 +43,7 @@ func TestBuildSandboxInfo_TableDriven(t *testing.T) { tests := []struct { name string setupSandbox func() *sandboxv1alpha1.Sandbox - podIP string + host string // ServiceFQDN or pod IP entry *sandboxEntry validateResult func(t *testing.T, result *types.SandboxInfo) }{ @@ -64,7 +67,7 @@ func TestBuildSandboxInfo_TableDriven(t *testing.T) { }, } }, - podIP: sandboxHelperTestPodIP, + host: sandboxHelperTestPodIP, entry: &sandboxEntry{ Kind: types.AgentRuntimeKind, SessionID: "test-session-123", @@ -90,6 +93,39 @@ func TestBuildSandboxInfo_TableDriven(t *testing.T) { assert.Equal(t, sandboxHelperTestPodIP+":9090", result.EntryPoints[1].Endpoint) }, }, + { + name: "sandbox with ServiceFQDN (resilient to pod restart)", + setupSandbox: func() *sandboxv1alpha1.Sandbox { + return &sandboxv1alpha1.Sandbox{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sandbox", + Namespace: "default", + UID: "test-uid-123", + CreationTimestamp: metav1.NewTime(now), + }, + Status: sandboxv1alpha1.SandboxStatus{ + ServiceFQDN: sandboxHelperTestServiceFQDN, + Conditions: []metav1.Condition{ + { + Type: string(sandboxv1alpha1.SandboxConditionReady), + Status: metav1.ConditionTrue, + }, + }, + }, + } + }, + host: sandboxHelperTestServiceFQDN, + entry: &sandboxEntry{ + Kind: types.AgentRuntimeKind, + SessionID: "test-session-123", + Ports: []runtimev1alpha1.TargetPort{ + {Port: 8080, Protocol: runtimev1alpha1.ProtocolTypeHTTP, PathPrefix: "/api"}, + }, + }, + validateResult: func(t *testing.T, result *types.SandboxInfo) { + assert.Equal(t, sandboxHelperTestServiceFQDN+":8080", result.EntryPoints[0].Endpoint) + }, + }, { name: "sandbox with shutdown time", setupSandbox: func() *sandboxv1alpha1.Sandbox { @@ -116,7 +152,7 @@ func TestBuildSandboxInfo_TableDriven(t *testing.T) { }, } }, - podIP: sandboxHelperTestPodIP, + host: sandboxHelperTestPodIP, entry: &sandboxEntry{ Kind: types.AgentRuntimeKind, SessionID: "test-session-123", @@ -148,7 +184,7 @@ func TestBuildSandboxInfo_TableDriven(t *testing.T) { }, } }, - podIP: sandboxHelperTestPodIP, + host: sandboxHelperTestPodIP, entry: &sandboxEntry{ Kind: types.AgentRuntimeKind, SessionID: "test-session-123", @@ -159,7 +195,7 @@ func TestBuildSandboxInfo_TableDriven(t *testing.T) { }, }, { - name: "sandbox with empty pod IP", + name: "sandbox with empty host", setupSandbox: func() *sandboxv1alpha1.Sandbox { return &sandboxv1alpha1.Sandbox{ ObjectMeta: metav1.ObjectMeta{ @@ -178,7 +214,7 @@ func TestBuildSandboxInfo_TableDriven(t *testing.T) { }, } }, - podIP: "", + host: "", entry: &sandboxEntry{ Kind: types.AgentRuntimeKind, SessionID: "test-session-123", @@ -199,7 +235,7 @@ func TestBuildSandboxInfo_TableDriven(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sandbox := tt.setupSandbox() - result := buildSandboxInfo(sandbox, tt.podIP, tt.entry) + result := buildSandboxInfo(sandbox, tt.host, tt.entry) tt.validateResult(t, result) }) } diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 4088aa99..e0d5beec 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -57,7 +57,11 @@ const ( // ownerKindSandboxWarmPool is the owner reference kind for SandboxWarmPool resources ownerKindSandboxWarmPool = "SandboxWarmPool" - agentcubeNamespace = "agentcube" + // sessionIDLabelKey is the label key for session ID on Sandbox (matches workloadmanager.SessionIdLabelKey) + sessionIDLabelKey = "runtime.agentcube.io/session-id" + + agentcubeNamespace = "agentcube" + e2eCodeInterpreterName = "e2e-code-interpreter" ) var ( @@ -792,7 +796,7 @@ func TestCodeInterpreterBasicInvocation(t *testing.T) { env := newTestEnv(t) namespace := agentcubeNamespace - name := "e2e-code-interpreter" + name := e2eCodeInterpreterName testCases := []struct { name string @@ -835,7 +839,7 @@ func TestCodeInterpreterFileOperations(t *testing.T) { env := newTestEnv(t) namespace := agentcubeNamespace - name := "e2e-code-interpreter" + name := e2eCodeInterpreterName // Create a session for file operations sessionID, err := env.createCodeInterpreterSession(namespace, name) @@ -938,6 +942,69 @@ print("Fibonacci sequence generated") }) } +// TestCodeInterpreterSandboxAccessAfterPodRestart verifies sandbox access resilience: +// when the pod is evicted/recreated, requests with the same session still succeed (ServiceFQDN). +func TestCodeInterpreterSandboxAccessAfterPodRestart(t *testing.T) { + env := newTestEnv(t) + ctx, err := newE2ETestContext() + require.NoError(t, err) + + namespace := agentcubeNamespace + name := e2eCodeInterpreterName + + sessionID, err := env.createCodeInterpreterSession(namespace, name) + require.NoError(t, err, "Failed to create code interpreter session") + + t.Cleanup(func() { + _ = env.deleteCodeInterpreterSession(sessionID) + }) + + // 1. Invoke once to verify sandbox is ready + req := &CodeInterpreterExecuteRequest{ + Command: []string{"echo", "before-pod-restart"}, + } + resp, err := env.invokeCodeInterpreter(namespace, name, sessionID, req) + require.NoError(t, err, "First invoke should succeed") + require.Contains(t, resp.Stdout, "before-pod-restart") + + // 2. Find sandbox and its pod + sandbox, err := ctx.getSandboxBySessionID(namespace, sessionID) + require.NoError(t, err, "Should find sandbox by session ID") + require.NotNil(t, sandbox) + + pod, err := ctx.getPodByOwner(namespace, "Sandbox", sandbox.Name) + require.NoError(t, err, "Should find pod owned by sandbox") + require.NotNil(t, pod) + + t.Logf("Deleting pod %s/%s to simulate eviction", pod.Namespace, pod.Name) + err = ctx.kubeClient.CoreV1().Pods(namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{}) + require.NoError(t, err, "Failed to delete pod") + + // 3. Wait for new pod to be ready (agent-sandbox reconciles and recreates) + require.Eventually(t, func() bool { + newPod, err := ctx.getPodByOwner(namespace, "Sandbox", sandbox.Name) + if err != nil || newPod == nil { + return false + } + for _, c := range newPod.Status.Conditions { + if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue { + return true + } + } + return false + }, 3*time.Minute, 5*time.Second, "New pod should become ready after recreation") + + // 4. Invoke again with same session - should succeed via ServiceFQDN + reqAfter := &CodeInterpreterExecuteRequest{ + Command: []string{"echo", "after-pod-restart"}, + } + respAfter, err := env.invokeCodeInterpreter(namespace, name, sessionID, reqAfter) + require.NoError(t, err, "Invoke after pod restart should succeed (ServiceFQDN resilience)") + require.Contains(t, respAfter.Stdout, "after-pod-restart") + + t.Logf("Sandbox access after pod restart verified successfully") +} + func (ctx *e2eTestContext) cleanupCodeInterpreter(t *testing.T, namespace, name, yamlPath string) { t.Log("Cleaning up code interpreter resources...") if err := ctx.deleteYamlFile(yamlPath); err != nil { @@ -1311,6 +1378,24 @@ func (ctx *e2eTestContext) getSandboxByOwner(namespace, ownerKind, ownerName str return found, nil } +// getSandboxBySessionID finds exactly one Sandbox with the given session ID label +func (ctx *e2eTestContext) getSandboxBySessionID(namespace, sessionID string) (*sandboxv1alpha1.Sandbox, error) { + sandboxList := &sandboxv1alpha1.SandboxList{} + err := ctx.ctrlClient.List(context.Background(), sandboxList, + client.InNamespace(namespace), + client.MatchingLabels{sessionIDLabelKey: sessionID}) + if err != nil { + return nil, fmt.Errorf("failed to list sandboxes: %w", err) + } + if len(sandboxList.Items) == 0 { + return nil, fmt.Errorf("no sandbox found with session ID %s", sessionID) + } + if len(sandboxList.Items) > 1 { + return nil, fmt.Errorf("found multiple sandboxes with session ID %s", sessionID) + } + return &sandboxList.Items[0], nil +} + // getPodByOwner finds exactly one Pod owned by the specified owner func (ctx *e2eTestContext) getPodByOwner(namespace, ownerKind, ownerName string) (*corev1.Pod, error) { podList, err := ctx.kubeClient.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{}) @@ -1505,7 +1590,7 @@ func TestCodeInterpreterBasicInvocationLoad(t *testing.T) { env := newTestEnv(t) namespace := agentcubeNamespace - name := "e2e-code-interpreter" + name := e2eCodeInterpreterName // Load test configuration const (