Skip to content

Commit 13e0d1b

Browse files
author
Ryan Lymburner
authored
Added integration test and additional logging around pod IP updates (#771)
1 parent 7ddc642 commit 13e0d1b

File tree

3 files changed

+201
-1
lines changed

3 files changed

+201
-1
lines changed

pkg/deploy/lattice/targets_manager.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,21 @@ func (s *defaultTargetsManager) Update(ctx context.Context, modelTargets *model.
5959
modelTg.ID(), modelTargets.Spec.StackTargetGroupId)
6060
}
6161

62-
s.log.Debugf(ctx, "Creating targets for target group %s", modelTg.Status.Id)
62+
s.log.Debugf(ctx, "Updating targets for target group %s with %d desired targets", modelTg.Status.Id, len(modelTargets.Spec.TargetList))
6363

6464
latticeTargets, err := s.List(ctx, modelTg)
6565
if err != nil {
6666
return err
6767
}
68+
s.log.Debugf(ctx, "Found %d existing targets in VPC Lattice for target group %s", len(latticeTargets), modelTg.Status.Id)
69+
6870
staleTargets := s.findStaleTargets(modelTargets, latticeTargets)
71+
if len(staleTargets) > 0 {
72+
s.log.Infof(ctx, "Found %d stale targets to deregister from target group %s", len(staleTargets), modelTg.Status.Id)
73+
for _, target := range staleTargets {
74+
s.log.Debugf(ctx, "Stale target: %s:%d", target.TargetIP, target.Port)
75+
}
76+
}
6977

7078
err1 := s.deregisterTargets(ctx, modelTg, staleTargets)
7179
err2 := s.registerTargets(ctx, modelTg, modelTargets.Spec.TargetList)
@@ -92,6 +100,8 @@ func (s *defaultTargetsManager) findStaleTargets(
92100
TargetIP: aws.StringValue(target.Id),
93101
Port: aws.Int64Value(target.Port),
94102
}
103+
// Consider targets stale if they are not in the current model set and not already draining
104+
// This ensures that when pods are recreated with new IPs, old IPs are properly deregistered
95105
if aws.StringValue(target.Status) != vpclattice.TargetStatusDraining && !modelSet.Contains(ipPort) {
96106
staleTargets = append(staleTargets, ipPort)
97107
}

pkg/gateway/model_build_targets.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,16 +178,26 @@ func (t *latticeTargetsModelBuildTask) getTargetListFromEndpoints(ctx context.Co
178178
return nil, err
179179
}
180180

181+
t.log.Debugf(ctx, "Found %d EndpointSlices for service %s/%s", len(epSlices.Items), t.service.Namespace, t.service.Name)
182+
181183
var targetList []model.Target
182184
for _, epSlice := range epSlices.Items {
185+
t.log.Debugf(ctx, "Processing EndpointSlice %s with %d endpoints", epSlice.Name, len(epSlice.Endpoints))
183186
for _, port := range epSlice.Ports {
184187
// Note that the Endpoint's port name is from ServicePort, but the actual registered port
185188
// is from Pods(targets).
186189
if _, ok := servicePortNames[aws.StringValue(port.Name)]; ok || skipMatch {
187190
for _, ep := range epSlice.Endpoints {
191+
// Log endpoint conditions for debugging
192+
t.log.Debugf(ctx, "Endpoint conditions - Ready: %v, Serving: %v, Terminating: %v",
193+
aws.BoolValue(ep.Conditions.Ready),
194+
aws.BoolValue(ep.Conditions.Serving),
195+
aws.BoolValue(ep.Conditions.Terminating))
196+
188197
for _, address := range ep.Addresses {
189198
// Do not model terminating endpoints so that they can deregister.
190199
if aws.BoolValue(ep.Conditions.Terminating) {
200+
t.log.Debugf(ctx, "Skipping terminating endpoint %s", address)
191201
continue
192202
}
193203
target := model.Target{
@@ -197,13 +207,17 @@ func (t *latticeTargetsModelBuildTask) getTargetListFromEndpoints(ctx context.Co
197207
}
198208
if ep.TargetRef != nil && ep.TargetRef.Kind == "Pod" {
199209
target.TargetRef = types.NamespacedName{Namespace: ep.TargetRef.Namespace, Name: ep.TargetRef.Name}
210+
t.log.Debugf(ctx, "Adding target %s:%d for pod %s/%s", address, aws.Int32Value(port.Port), ep.TargetRef.Namespace, ep.TargetRef.Name)
211+
} else {
212+
t.log.Debugf(ctx, "Adding target %s:%d (no pod reference)", address, aws.Int32Value(port.Port))
200213
}
201214
targetList = append(targetList, target)
202215
}
203216
}
204217
}
205218
}
206219
}
220+
t.log.Debugf(ctx, "Built %d targets from EndpointSlices for service %s/%s", len(targetList), t.service.Namespace, t.service.Name)
207221
return targetList, nil
208222
}
209223

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package integration
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"time"
7+
8+
"github.com/aws/aws-sdk-go/aws"
9+
"github.com/aws/aws-sdk-go/service/vpclattice"
10+
. "github.com/onsi/ginkgo/v2"
11+
. "github.com/onsi/gomega"
12+
"github.com/samber/lo"
13+
appsv1 "k8s.io/api/apps/v1"
14+
v1 "k8s.io/api/core/v1"
15+
"k8s.io/apimachinery/pkg/types"
16+
17+
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
18+
"github.com/aws/aws-application-networking-k8s/test/pkg/test"
19+
gwv1 "sigs.k8s.io/gateway-api/apis/v1"
20+
"sigs.k8s.io/gateway-api/apis/v1alpha2"
21+
)
22+
23+
var _ = Describe("Pod IP Update test", Ordered, func() {
24+
var (
25+
httpsDeployment1 *appsv1.Deployment
26+
httpsSvc1 *v1.Service
27+
tlsRoute *v1alpha2.TLSRoute
28+
initialPodIPs []string
29+
initialTargets []*vpclattice.TargetSummary
30+
)
31+
32+
It("Set up k8s resource for TLS passthrough", func() {
33+
httpsDeployment1, httpsSvc1 = testFramework.NewHttpsApp(test.HTTPsAppOptions{Name: "tls-passthrough-test", Namespace: k8snamespace})
34+
tlsRoute = testFramework.NewTLSRoute(k8snamespace, testGateway, []v1alpha2.TLSRouteRule{
35+
{
36+
BackendRefs: []gwv1.BackendRef{
37+
{
38+
BackendObjectReference: gwv1.BackendObjectReference{
39+
Name: v1alpha2.ObjectName(httpsSvc1.Name),
40+
Namespace: lo.ToPtr(gwv1.Namespace(httpsSvc1.Namespace)),
41+
Kind: lo.ToPtr(gwv1.Kind("Service")),
42+
Port: lo.ToPtr(gwv1.PortNumber(443)),
43+
},
44+
},
45+
},
46+
},
47+
})
48+
// Create Kubernetes API Objects
49+
testFramework.ExpectCreated(ctx,
50+
tlsRoute,
51+
httpsSvc1,
52+
httpsDeployment1,
53+
)
54+
})
55+
56+
It("Verify initial Lattice resource and capture pod IPs", func() {
57+
route, _ := core.NewRoute(tlsRoute)
58+
vpcLatticeService := testFramework.GetVpcLatticeService(ctx, route)
59+
fmt.Printf("vpcLatticeService: %v \n", vpcLatticeService)
60+
61+
tgSummary := testFramework.GetTCPTargetGroup(ctx, httpsSvc1)
62+
tg, err := testFramework.LatticeClient.GetTargetGroup(&vpclattice.GetTargetGroupInput{
63+
TargetGroupIdentifier: aws.String(*tgSummary.Id),
64+
})
65+
Expect(err).To(BeNil())
66+
Expect(tg).NotTo(BeNil())
67+
Expect(*tgSummary.VpcIdentifier).To(Equal(os.Getenv("CLUSTER_VPC_ID")))
68+
Expect(*tgSummary.Protocol).To(Equal("TCP"))
69+
70+
// Capture initial targets and pod IPs
71+
initialTargets = testFramework.GetTargets(ctx, tgSummary, httpsDeployment1)
72+
Expect(len(initialTargets)).To(BeNumerically(">", 0))
73+
74+
// Get initial pod IPs
75+
pods := testFramework.GetPodsByDeploymentName(httpsDeployment1.Name, httpsDeployment1.Namespace)
76+
Expect(len(pods)).To(BeEquivalentTo(1))
77+
for _, pod := range pods {
78+
initialPodIPs = append(initialPodIPs, pod.Status.PodIP)
79+
}
80+
81+
fmt.Printf("Initial pod IPs: %v\n", initialPodIPs)
82+
fmt.Printf("Initial target count: %d\n", len(initialTargets))
83+
for _, target := range initialTargets {
84+
fmt.Printf("Initial target: %s:%d\n", *target.Id, *target.Port)
85+
}
86+
})
87+
88+
It("Scale deployment to zero and back to one", func() {
89+
// Scale down to zero
90+
testFramework.Get(ctx, types.NamespacedName{Name: httpsDeployment1.Name, Namespace: httpsDeployment1.Namespace}, httpsDeployment1)
91+
replicas := int32(0)
92+
httpsDeployment1.Spec.Replicas = &replicas
93+
testFramework.ExpectUpdated(ctx, httpsDeployment1)
94+
95+
// Wait for pods to be terminated
96+
Eventually(func(g Gomega) {
97+
pods := testFramework.GetPodsByDeploymentName(httpsDeployment1.Name, httpsDeployment1.Namespace)
98+
g.Expect(len(pods)).To(BeEquivalentTo(0))
99+
}).WithTimeout(2 * time.Minute).WithOffset(1).Should(Succeed())
100+
101+
fmt.Println("Deployment scaled down to zero")
102+
103+
// Scale back up to one
104+
testFramework.Get(ctx, types.NamespacedName{Name: httpsDeployment1.Name, Namespace: httpsDeployment1.Namespace}, httpsDeployment1)
105+
replicas = int32(1)
106+
httpsDeployment1.Spec.Replicas = &replicas
107+
testFramework.ExpectUpdated(ctx, httpsDeployment1)
108+
109+
// Wait for new pod to be ready
110+
Eventually(func(g Gomega) {
111+
pods := testFramework.GetPodsByDeploymentName(httpsDeployment1.Name, httpsDeployment1.Namespace)
112+
g.Expect(len(pods)).To(BeEquivalentTo(1))
113+
g.Expect(pods[0].Status.Phase).To(Equal(v1.PodRunning))
114+
}).WithTimeout(3 * time.Minute).WithOffset(1).Should(Succeed())
115+
116+
fmt.Println("Deployment scaled back up to one")
117+
})
118+
119+
It("Verify new pod IPs are different and targets are updated", func() {
120+
// Get new pod IPs
121+
var newPodIPs []string
122+
pods := testFramework.GetPodsByDeploymentName(httpsDeployment1.Name, httpsDeployment1.Namespace)
123+
Expect(len(pods)).To(BeEquivalentTo(1))
124+
for _, pod := range pods {
125+
newPodIPs = append(newPodIPs, pod.Status.PodIP)
126+
}
127+
128+
fmt.Printf("New pod IPs: %v\n", newPodIPs)
129+
130+
// Verify pod IPs have changed (this is the core issue we're testing)
131+
Expect(newPodIPs).NotTo(Equal(initialPodIPs))
132+
133+
// Get updated targets from VPC Lattice
134+
tgSummary := testFramework.GetTCPTargetGroup(ctx, httpsSvc1)
135+
136+
// Wait for targets to be updated in VPC Lattice
137+
Eventually(func(g Gomega) {
138+
newTargets := testFramework.GetTargets(ctx, tgSummary, httpsDeployment1)
139+
fmt.Printf("Current target count: %d\n", len(newTargets))
140+
for _, target := range newTargets {
141+
fmt.Printf("Current target: %s:%d (status: %s)\n", *target.Id, *target.Port, *target.Status)
142+
}
143+
144+
// Verify that targets reflect the new pod IPs
145+
targetIPs := make([]string, 0)
146+
for _, target := range newTargets {
147+
// Only consider healthy or initial targets, not draining ones
148+
if *target.Status != vpclattice.TargetStatusDraining {
149+
targetIPs = append(targetIPs, *target.Id)
150+
}
151+
}
152+
153+
// The key assertion: new pod IPs should be registered as targets
154+
for _, newPodIP := range newPodIPs {
155+
g.Expect(targetIPs).To(ContainElement(newPodIP),
156+
fmt.Sprintf("New pod IP %s should be registered as a target", newPodIP))
157+
}
158+
159+
// Old pod IPs should not be in active targets (they should be draining or removed)
160+
for _, oldPodIP := range initialPodIPs {
161+
g.Expect(targetIPs).NotTo(ContainElement(oldPodIP),
162+
fmt.Sprintf("Old pod IP %s should not be in active targets", oldPodIP))
163+
}
164+
}).WithTimeout(5 * time.Minute).WithOffset(1).Should(Succeed())
165+
166+
fmt.Println("Target registration successfully updated with new pod IPs")
167+
})
168+
169+
AfterAll(func() {
170+
testFramework.ExpectDeletedThenNotFound(ctx,
171+
tlsRoute,
172+
httpsDeployment1,
173+
httpsSvc1,
174+
)
175+
})
176+
})

0 commit comments

Comments
 (0)