Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pkg/deploy/lattice/targets_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,21 @@ func (s *defaultTargetsManager) Update(ctx context.Context, modelTargets *model.
modelTg.ID(), modelTargets.Spec.StackTargetGroupId)
}

s.log.Debugf(ctx, "Creating targets for target group %s", modelTg.Status.Id)
s.log.Debugf(ctx, "Updating targets for target group %s with %d desired targets", modelTg.Status.Id, len(modelTargets.Spec.TargetList))

latticeTargets, err := s.List(ctx, modelTg)
if err != nil {
return err
}
s.log.Debugf(ctx, "Found %d existing targets in VPC Lattice for target group %s", len(latticeTargets), modelTg.Status.Id)

staleTargets := s.findStaleTargets(modelTargets, latticeTargets)
if len(staleTargets) > 0 {
s.log.Infof(ctx, "Found %d stale targets to deregister from target group %s", len(staleTargets), modelTg.Status.Id)
for _, target := range staleTargets {
s.log.Debugf(ctx, "Stale target: %s:%d", target.TargetIP, target.Port)
}
}

err1 := s.deregisterTargets(ctx, modelTg, staleTargets)
err2 := s.registerTargets(ctx, modelTg, modelTargets.Spec.TargetList)
Expand All @@ -92,6 +100,8 @@ func (s *defaultTargetsManager) findStaleTargets(
TargetIP: aws.StringValue(target.Id),
Port: aws.Int64Value(target.Port),
}
// Consider targets stale if they are not in the current model set and not already draining
// This ensures that when pods are recreated with new IPs, old IPs are properly deregistered
if aws.StringValue(target.Status) != vpclattice.TargetStatusDraining && !modelSet.Contains(ipPort) {
staleTargets = append(staleTargets, ipPort)
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/gateway/model_build_targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,26 @@ func (t *latticeTargetsModelBuildTask) getTargetListFromEndpoints(ctx context.Co
return nil, err
}

t.log.Debugf(ctx, "Found %d EndpointSlices for service %s/%s", len(epSlices.Items), t.service.Namespace, t.service.Name)

var targetList []model.Target
for _, epSlice := range epSlices.Items {
t.log.Debugf(ctx, "Processing EndpointSlice %s with %d endpoints", epSlice.Name, len(epSlice.Endpoints))
for _, port := range epSlice.Ports {
// Note that the Endpoint's port name is from ServicePort, but the actual registered port
// is from Pods(targets).
if _, ok := servicePortNames[aws.StringValue(port.Name)]; ok || skipMatch {
for _, ep := range epSlice.Endpoints {
// Log endpoint conditions for debugging
t.log.Debugf(ctx, "Endpoint conditions - Ready: %v, Serving: %v, Terminating: %v",
aws.BoolValue(ep.Conditions.Ready),
aws.BoolValue(ep.Conditions.Serving),
aws.BoolValue(ep.Conditions.Terminating))

for _, address := range ep.Addresses {
// Do not model terminating endpoints so that they can deregister.
if aws.BoolValue(ep.Conditions.Terminating) {
t.log.Debugf(ctx, "Skipping terminating endpoint %s", address)
continue
}
target := model.Target{
Expand All @@ -197,13 +207,17 @@ func (t *latticeTargetsModelBuildTask) getTargetListFromEndpoints(ctx context.Co
}
if ep.TargetRef != nil && ep.TargetRef.Kind == "Pod" {
target.TargetRef = types.NamespacedName{Namespace: ep.TargetRef.Namespace, Name: ep.TargetRef.Name}
t.log.Debugf(ctx, "Adding target %s:%d for pod %s/%s", address, aws.Int32Value(port.Port), ep.TargetRef.Namespace, ep.TargetRef.Name)
} else {
t.log.Debugf(ctx, "Adding target %s:%d (no pod reference)", address, aws.Int32Value(port.Port))
}
targetList = append(targetList, target)
}
}
}
}
}
t.log.Debugf(ctx, "Built %d targets from EndpointSlices for service %s/%s", len(targetList), t.service.Namespace, t.service.Name)
return targetList, nil
}

Expand Down
176 changes: 176 additions & 0 deletions test/suites/integration/pod_ip_update_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package integration

import (
"fmt"
"os"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/vpclattice"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/samber/lo"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/aws/aws-application-networking-k8s/pkg/model/core"
"github.com/aws/aws-application-networking-k8s/test/pkg/test"
gwv1 "sigs.k8s.io/gateway-api/apis/v1"
"sigs.k8s.io/gateway-api/apis/v1alpha2"
)

// Pod IP Update test reproduces the stale-target scenario: scaling a
// Deployment to zero and back up recreates its pod with a new IP, and the
// controller must deregister the old IP from the VPC Lattice target group and
// register the new one. Specs are Ordered because later specs read state
// (initialPodIPs, initialTargets) captured by earlier specs.
var _ = Describe("Pod IP Update test", Ordered, func() {
	var (
		httpsDeployment1 *appsv1.Deployment
		httpsSvc1        *v1.Service
		tlsRoute         *v1alpha2.TLSRoute
		// Pod IPs and Lattice targets observed before the scale-down;
		// compared against the post-scale-up state in the final spec.
		initialPodIPs  []string
		initialTargets []*vpclattice.TargetSummary
	)

	// Creates the HTTPS app, its Service, and a TLSRoute (TLS passthrough on
	// port 443) that the controller reconciles into a Lattice service.
	It("Set up k8s resource for TLS passthrough", func() {
		httpsDeployment1, httpsSvc1 = testFramework.NewHttpsApp(test.HTTPsAppOptions{Name: "tls-passthrough-test", Namespace: k8snamespace})
		tlsRoute = testFramework.NewTLSRoute(k8snamespace, testGateway, []v1alpha2.TLSRouteRule{
			{
				BackendRefs: []gwv1.BackendRef{
					{
						BackendObjectReference: gwv1.BackendObjectReference{
							Name:      v1alpha2.ObjectName(httpsSvc1.Name),
							Namespace: lo.ToPtr(gwv1.Namespace(httpsSvc1.Namespace)),
							Kind:      lo.ToPtr(gwv1.Kind("Service")),
							Port:      lo.ToPtr(gwv1.PortNumber(443)),
						},
					},
				},
			},
		})
		// Create Kubernetes API Objects
		testFramework.ExpectCreated(ctx,
			tlsRoute,
			httpsSvc1,
			httpsDeployment1,
		)
	})

	// Verifies the Lattice service/target group were created (TCP protocol,
	// correct VPC), then records the single pod's IP and the registered
	// targets as the "before" baseline.
	It("Verify initial Lattice resource and capture pod IPs", func() {
		route, _ := core.NewRoute(tlsRoute)
		vpcLatticeService := testFramework.GetVpcLatticeService(ctx, route)
		fmt.Printf("vpcLatticeService: %v \n", vpcLatticeService)

		tgSummary := testFramework.GetTCPTargetGroup(ctx, httpsSvc1)
		tg, err := testFramework.LatticeClient.GetTargetGroup(&vpclattice.GetTargetGroupInput{
			TargetGroupIdentifier: aws.String(*tgSummary.Id),
		})
		Expect(err).To(BeNil())
		Expect(tg).NotTo(BeNil())
		Expect(*tgSummary.VpcIdentifier).To(Equal(os.Getenv("CLUSTER_VPC_ID")))
		Expect(*tgSummary.Protocol).To(Equal("TCP"))

		// Capture initial targets and pod IPs
		initialTargets = testFramework.GetTargets(ctx, tgSummary, httpsDeployment1)
		Expect(len(initialTargets)).To(BeNumerically(">", 0))

		// Get initial pod IPs; the deployment is expected to run exactly one pod.
		pods := testFramework.GetPodsByDeploymentName(httpsDeployment1.Name, httpsDeployment1.Namespace)
		Expect(len(pods)).To(BeEquivalentTo(1))
		for _, pod := range pods {
			initialPodIPs = append(initialPodIPs, pod.Status.PodIP)
		}

		fmt.Printf("Initial pod IPs: %v\n", initialPodIPs)
		fmt.Printf("Initial target count: %d\n", len(initialTargets))
		for _, target := range initialTargets {
			fmt.Printf("Initial target: %s:%d\n", *target.Id, *target.Port)
		}
	})

	// Forces pod recreation (and therefore a new pod IP) by scaling the
	// Deployment 1 -> 0 -> 1, waiting for each transition to complete.
	It("Scale deployment to zero and back to one", func() {
		// Scale down to zero. Re-Get first so the update is applied against
		// the latest resourceVersion.
		testFramework.Get(ctx, types.NamespacedName{Name: httpsDeployment1.Name, Namespace: httpsDeployment1.Namespace}, httpsDeployment1)
		replicas := int32(0)
		httpsDeployment1.Spec.Replicas = &replicas
		testFramework.ExpectUpdated(ctx, httpsDeployment1)

		// Wait for pods to be terminated
		Eventually(func(g Gomega) {
			pods := testFramework.GetPodsByDeploymentName(httpsDeployment1.Name, httpsDeployment1.Namespace)
			g.Expect(len(pods)).To(BeEquivalentTo(0))
		}).WithTimeout(2 * time.Minute).WithOffset(1).Should(Succeed())

		fmt.Println("Deployment scaled down to zero")

		// Scale back up to one
		testFramework.Get(ctx, types.NamespacedName{Name: httpsDeployment1.Name, Namespace: httpsDeployment1.Namespace}, httpsDeployment1)
		replicas = int32(1)
		httpsDeployment1.Spec.Replicas = &replicas
		testFramework.ExpectUpdated(ctx, httpsDeployment1)

		// Wait for new pod to be ready (Running phase) before checking targets.
		Eventually(func(g Gomega) {
			pods := testFramework.GetPodsByDeploymentName(httpsDeployment1.Name, httpsDeployment1.Namespace)
			g.Expect(len(pods)).To(BeEquivalentTo(1))
			g.Expect(pods[0].Status.Phase).To(Equal(v1.PodRunning))
		}).WithTimeout(3 * time.Minute).WithOffset(1).Should(Succeed())

		fmt.Println("Deployment scaled back up to one")
	})

	// The core assertions: the recreated pod has a different IP, the new IP
	// is registered in the target group, and the old IP is no longer among
	// the non-draining targets.
	It("Verify new pod IPs are different and targets are updated", func() {
		// Get new pod IPs
		var newPodIPs []string
		pods := testFramework.GetPodsByDeploymentName(httpsDeployment1.Name, httpsDeployment1.Namespace)
		Expect(len(pods)).To(BeEquivalentTo(1))
		for _, pod := range pods {
			newPodIPs = append(newPodIPs, pod.Status.PodIP)
		}

		fmt.Printf("New pod IPs: %v\n", newPodIPs)

		// Verify pod IPs have changed (this is the core issue we're testing)
		// NOTE(review): assumes the recreated pod never reuses the old IP —
		// generally true for CNI IPAM, but could flake; verify in CI.
		Expect(newPodIPs).NotTo(Equal(initialPodIPs))

		// Get updated targets from VPC Lattice
		tgSummary := testFramework.GetTCPTargetGroup(ctx, httpsSvc1)

		// Wait for targets to be updated in VPC Lattice; reconciliation plus
		// Lattice-side deregistration can take a while, hence the 5m timeout.
		Eventually(func(g Gomega) {
			newTargets := testFramework.GetTargets(ctx, tgSummary, httpsDeployment1)
			fmt.Printf("Current target count: %d\n", len(newTargets))
			for _, target := range newTargets {
				fmt.Printf("Current target: %s:%d (status: %s)\n", *target.Id, *target.Port, *target.Status)
			}

			// Verify that targets reflect the new pod IPs
			targetIPs := make([]string, 0)
			for _, target := range newTargets {
				// Only consider healthy or initial targets, not draining ones
				if *target.Status != vpclattice.TargetStatusDraining {
					targetIPs = append(targetIPs, *target.Id)
				}
			}

			// The key assertion: new pod IPs should be registered as targets
			for _, newPodIP := range newPodIPs {
				g.Expect(targetIPs).To(ContainElement(newPodIP),
					fmt.Sprintf("New pod IP %s should be registered as a target", newPodIP))
			}

			// Old pod IPs should not be in active targets (they should be draining or removed)
			for _, oldPodIP := range initialPodIPs {
				g.Expect(targetIPs).NotTo(ContainElement(oldPodIP),
					fmt.Sprintf("Old pod IP %s should not be in active targets", oldPodIP))
			}
		}).WithTimeout(5 * time.Minute).WithOffset(1).Should(Succeed())

		fmt.Println("Target registration successfully updated with new pod IPs")
	})

	// Clean up all created resources and wait until they are gone.
	AfterAll(func() {
		testFramework.ExpectDeletedThenNotFound(ctx,
			tlsRoute,
			httpsDeployment1,
			httpsSvc1,
		)
	})
})
Loading