From 334699ef47f54b5a1d189302e163447753de1b4b Mon Sep 17 00:00:00 2001
From: Ryan Lymburner
Date: Fri, 13 Jun 2025 11:29:29 -0700
Subject: [PATCH] Added integration test and additional logging around pod IP
 updates

---
 pkg/deploy/lattice/targets_manager.go         |  12 +-
 pkg/gateway/model_build_targets.go            |  14 ++
 test/suites/integration/pod_ip_update_test.go | 176 ++++++++++++++++++
 3 files changed, 201 insertions(+), 1 deletion(-)
 create mode 100644 test/suites/integration/pod_ip_update_test.go

diff --git a/pkg/deploy/lattice/targets_manager.go b/pkg/deploy/lattice/targets_manager.go
index eabe70b2..3bf4df33 100644
--- a/pkg/deploy/lattice/targets_manager.go
+++ b/pkg/deploy/lattice/targets_manager.go
@@ -59,13 +59,21 @@ func (s *defaultTargetsManager) Update(ctx context.Context, modelTargets *model.
 			modelTg.ID(), modelTargets.Spec.StackTargetGroupId)
 	}
 
-	s.log.Debugf(ctx, "Creating targets for target group %s", modelTg.Status.Id)
+	s.log.Debugf(ctx, "Updating targets for target group %s with %d desired targets", modelTg.Status.Id, len(modelTargets.Spec.TargetList))
 
 	latticeTargets, err := s.List(ctx, modelTg)
 	if err != nil {
 		return err
 	}
+	s.log.Debugf(ctx, "Found %d existing targets in VPC Lattice for target group %s", len(latticeTargets), modelTg.Status.Id)
+
 	staleTargets := s.findStaleTargets(modelTargets, latticeTargets)
+	if len(staleTargets) > 0 {
+		s.log.Infof(ctx, "Found %d stale targets to deregister from target group %s", len(staleTargets), modelTg.Status.Id)
+		for _, target := range staleTargets {
+			s.log.Debugf(ctx, "Stale target: %s:%d", target.TargetIP, target.Port)
+		}
+	}
 
 	err1 := s.deregisterTargets(ctx, modelTg, staleTargets)
 	err2 := s.registerTargets(ctx, modelTg, modelTargets.Spec.TargetList)
@@ -92,6 +100,8 @@ func (s *defaultTargetsManager) findStaleTargets(
 			TargetIP: aws.StringValue(target.Id),
 			Port:     aws.Int64Value(target.Port),
 		}
+		// Consider targets stale if they are not in the current model set and not already draining
+		// This ensures that when pods are recreated with new IPs, old IPs are properly deregistered
 		if aws.StringValue(target.Status) != vpclattice.TargetStatusDraining && !modelSet.Contains(ipPort) {
 			staleTargets = append(staleTargets, ipPort)
 		}
diff --git a/pkg/gateway/model_build_targets.go b/pkg/gateway/model_build_targets.go
index 15d081c3..30c97713 100644
--- a/pkg/gateway/model_build_targets.go
+++ b/pkg/gateway/model_build_targets.go
@@ -178,16 +178,26 @@ func (t *latticeTargetsModelBuildTask) getTargetListFromEndpoints(ctx context.Co
 		return nil, err
 	}
 
+	t.log.Debugf(ctx, "Found %d EndpointSlices for service %s/%s", len(epSlices.Items), t.service.Namespace, t.service.Name)
+
 	var targetList []model.Target
 	for _, epSlice := range epSlices.Items {
+		t.log.Debugf(ctx, "Processing EndpointSlice %s with %d endpoints", epSlice.Name, len(epSlice.Endpoints))
 		for _, port := range epSlice.Ports {
 			// Note that the Endpoint's port name is from ServicePort, but the actual registered port
 			// is from Pods(targets).
 			if _, ok := servicePortNames[aws.StringValue(port.Name)]; ok || skipMatch {
 				for _, ep := range epSlice.Endpoints {
+					// Log endpoint conditions for debugging
+					t.log.Debugf(ctx, "Endpoint conditions - Ready: %v, Serving: %v, Terminating: %v",
+						aws.BoolValue(ep.Conditions.Ready),
+						aws.BoolValue(ep.Conditions.Serving),
+						aws.BoolValue(ep.Conditions.Terminating))
+
 					for _, address := range ep.Addresses {
 						// Do not model terminating endpoints so that they can deregister.
 						if aws.BoolValue(ep.Conditions.Terminating) {
+							t.log.Debugf(ctx, "Skipping terminating endpoint %s", address)
 							continue
 						}
 						target := model.Target{
@@ -197,6 +207,9 @@ func (t *latticeTargetsModelBuildTask) getTargetListFromEndpoints(ctx context.Co
 						}
 						if ep.TargetRef != nil && ep.TargetRef.Kind == "Pod" {
 							target.TargetRef = types.NamespacedName{Namespace: ep.TargetRef.Namespace, Name: ep.TargetRef.Name}
+							t.log.Debugf(ctx, "Adding target %s:%d for pod %s/%s", address, aws.Int32Value(port.Port), ep.TargetRef.Namespace, ep.TargetRef.Name)
+						} else {
+							t.log.Debugf(ctx, "Adding target %s:%d (no pod reference)", address, aws.Int32Value(port.Port))
 						}
 						targetList = append(targetList, target)
 					}
@@ -204,6 +217,7 @@ func (t *latticeTargetsModelBuildTask) getTargetListFromEndpoints(ctx context.Co
 			}
 		}
 	}
+	t.log.Debugf(ctx, "Built %d targets from EndpointSlices for service %s/%s", len(targetList), t.service.Namespace, t.service.Name)
 
 	return targetList, nil
 }
diff --git a/test/suites/integration/pod_ip_update_test.go b/test/suites/integration/pod_ip_update_test.go
new file mode 100644
index 00000000..0bccfd23
--- /dev/null
+++ b/test/suites/integration/pod_ip_update_test.go
@@ -0,0 +1,176 @@
+package integration
+
+import (
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/vpclattice"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	"github.com/samber/lo"
+	appsv1 "k8s.io/api/apps/v1"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
+
+	"github.com/aws/aws-application-networking-k8s/pkg/model/core"
+	"github.com/aws/aws-application-networking-k8s/test/pkg/test"
+	gwv1 "sigs.k8s.io/gateway-api/apis/v1"
+	"sigs.k8s.io/gateway-api/apis/v1alpha2"
+)
+
+var _ = Describe("Pod IP Update test", Ordered, func() {
+	var (
+		httpsDeployment1 *appsv1.Deployment
+		httpsSvc1        *v1.Service
+		tlsRoute         *v1alpha2.TLSRoute
+		initialPodIPs    []string
+		initialTargets   []*vpclattice.TargetSummary
+	)
+
+	It("Set up k8s resource for TLS passthrough", func() {
+		httpsDeployment1, httpsSvc1 = testFramework.NewHttpsApp(test.HTTPsAppOptions{Name: "tls-passthrough-test", Namespace: k8snamespace})
+		tlsRoute = testFramework.NewTLSRoute(k8snamespace, testGateway, []v1alpha2.TLSRouteRule{
+			{
+				BackendRefs: []gwv1.BackendRef{
+					{
+						BackendObjectReference: gwv1.BackendObjectReference{
+							Name:      v1alpha2.ObjectName(httpsSvc1.Name),
+							Namespace: lo.ToPtr(gwv1.Namespace(httpsSvc1.Namespace)),
+							Kind:      lo.ToPtr(gwv1.Kind("Service")),
+							Port:      lo.ToPtr(gwv1.PortNumber(443)),
+						},
+					},
+				},
+			},
+		})
+		// Create Kubernetes API Objects
+		testFramework.ExpectCreated(ctx,
+			tlsRoute,
+			httpsSvc1,
+			httpsDeployment1,
+		)
+	})
+
+	It("Verify initial Lattice resource and capture pod IPs", func() {
+		route, _ := core.NewRoute(tlsRoute)
+		vpcLatticeService := testFramework.GetVpcLatticeService(ctx, route)
+		fmt.Printf("vpcLatticeService: %v \n", vpcLatticeService)
+
+		tgSummary := testFramework.GetTCPTargetGroup(ctx, httpsSvc1)
+		tg, err := testFramework.LatticeClient.GetTargetGroup(&vpclattice.GetTargetGroupInput{
+			TargetGroupIdentifier: aws.String(*tgSummary.Id),
+		})
+		Expect(err).To(BeNil())
+		Expect(tg).NotTo(BeNil())
+		Expect(*tgSummary.VpcIdentifier).To(Equal(os.Getenv("CLUSTER_VPC_ID")))
+		Expect(*tgSummary.Protocol).To(Equal("TCP"))
+
+		// Capture initial targets and pod IPs
+		initialTargets = testFramework.GetTargets(ctx, tgSummary, httpsDeployment1)
+		Expect(len(initialTargets)).To(BeNumerically(">", 0))
+
+		// Get initial pod IPs
+		pods := testFramework.GetPodsByDeploymentName(httpsDeployment1.Name, httpsDeployment1.Namespace)
+		Expect(len(pods)).To(BeEquivalentTo(1))
+		for _, pod := range pods {
+			initialPodIPs = append(initialPodIPs, pod.Status.PodIP)
+		}
+
+		fmt.Printf("Initial pod IPs: %v\n", initialPodIPs)
+		fmt.Printf("Initial target count: %d\n", len(initialTargets))
+		for _, target := range initialTargets {
+			fmt.Printf("Initial target: %s:%d\n", *target.Id, *target.Port)
+		}
+	})
+
+	It("Scale deployment to zero and back to one", func() {
+		// Scale down to zero
+		testFramework.Get(ctx, types.NamespacedName{Name: httpsDeployment1.Name, Namespace: httpsDeployment1.Namespace}, httpsDeployment1)
+		replicas := int32(0)
+		httpsDeployment1.Spec.Replicas = &replicas
+		testFramework.ExpectUpdated(ctx, httpsDeployment1)
+
+		// Wait for pods to be terminated
+		Eventually(func(g Gomega) {
+			pods := testFramework.GetPodsByDeploymentName(httpsDeployment1.Name, httpsDeployment1.Namespace)
+			g.Expect(len(pods)).To(BeEquivalentTo(0))
+		}).WithTimeout(2 * time.Minute).WithOffset(1).Should(Succeed())
+
+		fmt.Println("Deployment scaled down to zero")
+
+		// Scale back up to one
+		testFramework.Get(ctx, types.NamespacedName{Name: httpsDeployment1.Name, Namespace: httpsDeployment1.Namespace}, httpsDeployment1)
+		replicas = int32(1)
+		httpsDeployment1.Spec.Replicas = &replicas
+		testFramework.ExpectUpdated(ctx, httpsDeployment1)
+
+		// Wait for new pod to be ready
+		Eventually(func(g Gomega) {
+			pods := testFramework.GetPodsByDeploymentName(httpsDeployment1.Name, httpsDeployment1.Namespace)
+			g.Expect(len(pods)).To(BeEquivalentTo(1))
+			g.Expect(pods[0].Status.Phase).To(Equal(v1.PodRunning))
+		}).WithTimeout(3 * time.Minute).WithOffset(1).Should(Succeed())
+
+		fmt.Println("Deployment scaled back up to one")
+	})
+
+	It("Verify new pod IPs are different and targets are updated", func() {
+		// Get new pod IPs
+		var newPodIPs []string
+		pods := testFramework.GetPodsByDeploymentName(httpsDeployment1.Name, httpsDeployment1.Namespace)
+		Expect(len(pods)).To(BeEquivalentTo(1))
+		for _, pod := range pods {
+			newPodIPs = append(newPodIPs, pod.Status.PodIP)
+		}
+
+		fmt.Printf("New pod IPs: %v\n", newPodIPs)
+
+		// Verify pod IPs have changed (this is the core issue we're testing)
+		Expect(newPodIPs).NotTo(Equal(initialPodIPs))
+
+		// Get updated targets from VPC Lattice
+		tgSummary := testFramework.GetTCPTargetGroup(ctx, httpsSvc1)
+
+		// Wait for targets to be updated in VPC Lattice
+		Eventually(func(g Gomega) {
+			newTargets := testFramework.GetTargets(ctx, tgSummary, httpsDeployment1)
+			fmt.Printf("Current target count: %d\n", len(newTargets))
+			for _, target := range newTargets {
+				fmt.Printf("Current target: %s:%d (status: %s)\n", *target.Id, *target.Port, *target.Status)
+			}
+
+			// Verify that targets reflect the new pod IPs
+			targetIPs := make([]string, 0)
+			for _, target := range newTargets {
+				// Only consider healthy or initial targets, not draining ones
+				if *target.Status != vpclattice.TargetStatusDraining {
+					targetIPs = append(targetIPs, *target.Id)
+				}
+			}
+
+			// The key assertion: new pod IPs should be registered as targets
+			for _, newPodIP := range newPodIPs {
+				g.Expect(targetIPs).To(ContainElement(newPodIP),
+					fmt.Sprintf("New pod IP %s should be registered as a target", newPodIP))
+			}
+
+			// Old pod IPs should not be in active targets (they should be draining or removed)
+			for _, oldPodIP := range initialPodIPs {
+				g.Expect(targetIPs).NotTo(ContainElement(oldPodIP),
+					fmt.Sprintf("Old pod IP %s should not be in active targets", oldPodIP))
+			}
+		}).WithTimeout(5 * time.Minute).WithOffset(1).Should(Succeed())
+
+		fmt.Println("Target registration successfully updated with new pod IPs")
+	})
+
+	AfterAll(func() {
+		testFramework.ExpectDeletedThenNotFound(ctx,
+			tlsRoute,
+			httpsDeployment1,
+			httpsSvc1,
+		)
+	})
+})