From f3cb4e29652194533edfc518173a4b32baa132c1 Mon Sep 17 00:00:00 2001 From: vbedi Date: Wed, 29 Oct 2025 18:43:51 +0000 Subject: [PATCH 1/2] Add allow-takeover-from annotation to enable automated VPC Lattice service takeover between clusters --- docs/guides/advanced-configurations.md | 39 ++ pkg/aws/cloud.go | 9 +- pkg/aws/cloud_mocks.go | 14 + pkg/aws/services/tagging.go | 24 +- pkg/aws/services/tagging_mocks.go | 8 +- pkg/aws/services/tagging_test.go | 67 ++- .../predicates/allowtakeoverfrom_predicate.go | 31 ++ .../allowtakeoverfrom_predicate_test.go | 158 +++++++ pkg/controllers/route_controller.go | 2 +- .../access_log_subscription_manager.go | 2 +- .../access_log_subscription_manager_test.go | 4 +- pkg/deploy/lattice/listener_manager.go | 9 +- pkg/deploy/lattice/listener_manager_test.go | 58 ++- pkg/deploy/lattice/rule_manager.go | 13 +- pkg/deploy/lattice/rule_manager_test.go | 89 +++- pkg/deploy/lattice/service_manager.go | 49 ++- pkg/deploy/lattice/service_manager_test.go | 141 +++++- pkg/deploy/lattice/service_network_manager.go | 2 +- .../lattice/service_network_manager_test.go | 8 +- pkg/deploy/lattice/target_group_manager.go | 2 +- .../lattice/target_group_manager_test.go | 10 +- pkg/gateway/model_build_lattice_service.go | 23 + pkg/k8s/utils.go | 3 + pkg/model/lattice/service.go | 1 + .../integration/service_takeover_test.go | 404 ++++++++++++++++++ 25 files changed, 1100 insertions(+), 70 deletions(-) create mode 100644 pkg/controllers/predicates/allowtakeoverfrom_predicate.go create mode 100644 pkg/controllers/predicates/allowtakeoverfrom_predicate_test.go create mode 100644 test/suites/integration/service_takeover_test.go diff --git a/docs/guides/advanced-configurations.md b/docs/guides/advanced-configurations.md index 7f18fa44..e109954f 100644 --- a/docs/guides/advanced-configurations.md +++ b/docs/guides/advanced-configurations.md @@ -132,3 +132,42 @@ spec: port: 80 targetPort: 8090 ``` + +### Blue/Green Multi-Cluster Migration with Service Takeover + +For blue/green cluster migrations, the controller supports automated takeover of VPC Lattice services using the `allow-takeover-from` annotation. This eliminates the need for manual ManagedBy tag changes during cluster migrations. + +#### Migration Workflow + +1. Blue cluster creates HTTPRoute +2. Blue cluster exports service using ServiceExport (creates standalone target group for cross-cluster access) +3. Green cluster imports blue service using ServiceImport (references the exported target group from blue cluster) +4. Green cluster creates HTTPRoute with takeover annotation to claim the existing VPC Lattice service: + +```yaml +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: inventory-service + annotations: + application-networking.k8s.aws/allow-takeover-from: "123456789012/blue-cluster/vpc-0abc123def456789" +spec: + parentRefs: + - name: my-gateway + rules: + - matches: + - path: + type: PathPrefix + value: /inventory + backendRefs: + - name: inventory-ver1 + kind: ServiceImport + weight: 90 + - name: inventory-ver2 + kind: Service + port: 80 + weight: 10 +``` + +5. Controller takes over the VPC Lattice service and updates it to reflect traffic weights in green HTTPRoute +6. Controller updates ManagedBy tag on service, service network service association, listeners, and rules to transfer ownership diff --git a/pkg/aws/cloud.go b/pkg/aws/cloud.go index 9562f58a..e54f3243 100644 --- a/pkg/aws/cloud.go +++ b/pkg/aws/cloud.go @@ -52,6 +52,9 @@ type Cloud interface { // MergeTags creates a new tag map by merging baseTags and additionalTags. // BaseTags will override additionalTags for any duplicate keys. MergeTags(baseTags services.Tags, additionalTags services.Tags) services.Tags + + // GetManagedByFromTags extracts the ManagedBy tag value from a tags map + GetManagedByFromTags(tags services.Tags) string } // NewCloud constructs new Cloud implementation. @@ -168,7 +171,7 @@ func (c *defaultCloud) getTags(ctx context.Context, arn string) (services.Tags, return resp.Tags, nil } -func (c *defaultCloud) getManagedByFromTags(tags services.Tags) string { +func (c *defaultCloud) GetManagedByFromTags(tags services.Tags) string { tag, ok := tags[TagManagedBy] if !ok || tag == nil { return "" @@ -181,7 +184,7 @@ func (c *defaultCloud) IsArnManaged(ctx context.Context, arn string) (bool, erro if err != nil { return false, err } - return c.isOwner(c.getManagedByFromTags(tags)), nil + return c.isOwner(c.GetManagedByFromTags(tags)), nil } func (c *defaultCloud) TryOwn(ctx context.Context, arn string) (bool, error) { @@ -195,7 +198,7 @@ func (c *defaultCloud) TryOwn(ctx context.Context, arn string) (bool, error) { func (c *defaultCloud) TryOwnFromTags(ctx context.Context, arn string, tags services.Tags) (bool, error) { // For resources that need backwards compatibility - not having managedBy is considered as owned by controller. - managedBy := c.getManagedByFromTags(tags) + managedBy := c.GetManagedByFromTags(tags) if managedBy == "" { err := c.ownResource(ctx, arn) if err != nil { diff --git a/pkg/aws/cloud_mocks.go b/pkg/aws/cloud_mocks.go index 26293d05..8a923453 100644 --- a/pkg/aws/cloud_mocks.go +++ b/pkg/aws/cloud_mocks.go @@ -77,6 +77,20 @@ func (mr *MockCloudMockRecorder) DefaultTagsMergedWith(arg0 interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DefaultTagsMergedWith", reflect.TypeOf((*MockCloud)(nil).DefaultTagsMergedWith), arg0) } +// GetManagedByFromTags mocks base method. +func (m *MockCloud) GetManagedByFromTags(arg0 map[string]*string) string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetManagedByFromTags", arg0) + ret0, _ := ret[0].(string) + return ret0 +} + +// GetManagedByFromTags indicates an expected call of GetManagedByFromTags. +func (mr *MockCloudMockRecorder) GetManagedByFromTags(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetManagedByFromTags", reflect.TypeOf((*MockCloud)(nil).GetManagedByFromTags), arg0) +} + // IsArnManaged mocks base method. func (m *MockCloud) IsArnManaged(arg0 context.Context, arg1 string) (bool, error) { m.ctrl.T.Helper() diff --git a/pkg/aws/services/tagging.go b/pkg/aws/services/tagging.go index 518326cb..d8272ddc 100644 --- a/pkg/aws/services/tagging.go +++ b/pkg/aws/services/tagging.go @@ -37,7 +37,7 @@ type Tagging interface { FindResourcesByTags(ctx context.Context, resourceType ResourceType, tags Tags) ([]string, error) // Updates tags for a given resource ARN - UpdateTags(ctx context.Context, resourceArn string, newTags Tags) error + UpdateTags(ctx context.Context, resourceArn string, additionalTags Tags, awsManagedTags Tags) error } type defaultTagging struct { @@ -170,14 +170,21 @@ func convertTagsToFilter(tags Tags) []*taggingapi.TagFilter { return filters } -func (t *defaultTagging) UpdateTags(ctx context.Context, resourceArn string, newTags Tags) error { +func (t *defaultTagging) UpdateTags(ctx context.Context, resourceArn string, additionalTags Tags, awsManagedTags Tags) error { existingTags, err := t.GetTagsForArns(ctx, []string{resourceArn}) if err != nil { return fmt.Errorf("failed to get existing tags: %w", err) } currentTags := k8s.GetNonAWSManagedTags(existingTags[resourceArn]) - filteredNewTags := k8s.GetNonAWSManagedTags(newTags) + filteredNewTags := k8s.GetNonAWSManagedTags(additionalTags) + + for key, value := range awsManagedTags { + if existingValue, exists := existingTags[resourceArn][key]; exists { + currentTags[key] = existingValue + } + filteredNewTags[key] = value + } tagsToAdd, tagsToRemove := k8s.CalculateTagDifference(currentTags, filteredNewTags) @@ -204,7 +211,7 @@ func (t *defaultTagging) UpdateTags(ctx context.Context, resourceArn string, new return nil } -func (t *latticeTagging) UpdateTags(ctx context.Context, resourceArn string, newTags Tags) error { +func (t *latticeTagging) UpdateTags(ctx context.Context, resourceArn string, additionalTags Tags, awsManagedTags Tags) error { existingTags, err := t.ListTagsForResourceWithContext(ctx, &vpclattice.ListTagsForResourceInput{ ResourceArn: aws.String(resourceArn), }) @@ -213,7 +220,14 @@ func (t *latticeTagging) UpdateTags(ctx context.Context, resourceArn string, new } currentTags := k8s.GetNonAWSManagedTags(existingTags.Tags) - filteredNewTags := k8s.GetNonAWSManagedTags(newTags) + filteredNewTags := k8s.GetNonAWSManagedTags(additionalTags) + + for key, value := range awsManagedTags { + if existingValue, exists := existingTags.Tags[key]; exists { + currentTags[key] = existingValue + } + filteredNewTags[key] = value + } tagsToAdd, tagsToRemove := k8s.CalculateTagDifference(currentTags, filteredNewTags) diff --git a/pkg/aws/services/tagging_mocks.go b/pkg/aws/services/tagging_mocks.go index 1f452a19..786f6596 100644 --- a/pkg/aws/services/tagging_mocks.go +++ b/pkg/aws/services/tagging_mocks.go @@ -65,15 +65,15 @@ func (mr *MockTaggingMockRecorder) GetTagsForArns(arg0, arg1 interface{}) *gomoc } // UpdateTags mocks base method. -func (m *MockTagging) UpdateTags(arg0 context.Context, arg1 string, arg2 map[string]*string) error { +func (m *MockTagging) UpdateTags(arg0 context.Context, arg1 string, arg2, arg3 map[string]*string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateTags", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "UpdateTags", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) return ret0 } // UpdateTags indicates an expected call of UpdateTags. -func (mr *MockTaggingMockRecorder) UpdateTags(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockTaggingMockRecorder) UpdateTags(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTags", reflect.TypeOf((*MockTagging)(nil).UpdateTags), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTags", reflect.TypeOf((*MockTagging)(nil).UpdateTags), arg0, arg1, arg2, arg3) } diff --git a/pkg/aws/services/tagging_test.go b/pkg/aws/services/tagging_test.go index 2ec02123..9bbf4d09 100644 --- a/pkg/aws/services/tagging_test.go +++ b/pkg/aws/services/tagging_test.go @@ -436,87 +436,118 @@ func TestLatticeTagging_UpdateTags(t *testing.T) { name string resourceArn string existingTags Tags - newTags Tags + additionalTags Tags + awsManagedTags Tags expectedTagCalls int expectedUntagCalls int expectError bool description string }{ { - name: "nil new tags removes all existing additional tags", + name: "nil additional tags removes all existing additional tags", resourceArn: "arn:aws:vpc-lattice:us-west-2:123456789:service/svc-123", existingTags: Tags{ "Environment": aws.String("Dev"), "Project": aws.String("MyApp"), "application-networking.k8s.aws/ManagedBy": aws.String("123456789/cluster/vpc-123"), }, - newTags: nil, + additionalTags: nil, + awsManagedTags: nil, expectedTagCalls: 0, expectedUntagCalls: 1, expectError: false, - description: "should remove all additional tags when newTags is nil", + description: "should remove all additional tags when additionalTags is nil", }, { - name: "add new tags when no existing additional tags", + name: "add new additional tags when no existing additional tags", resourceArn: "arn:aws:vpc-lattice:us-west-2:123456789:service/svc-123", existingTags: Tags{ "application-networking.k8s.aws/ManagedBy": aws.String("123456789/cluster/vpc-123"), }, - newTags: Tags{ + additionalTags: Tags{ "Environment": aws.String("Dev"), "Project": aws.String("MyApp"), }, + awsManagedTags: nil, expectedTagCalls: 1, expectedUntagCalls: 0, expectError: false, - description: "should add new tags when no existing additional tags", + description: "should add new additional tags when no existing additional tags", }, { - name: "update existing additional tags", + name: "update AWS managed tags only", + resourceArn: "arn:aws:vpc-lattice:us-west-2:123456789:service/svc-123", + existingTags: Tags{ + "Environment": aws.String("Dev"), + "application-networking.k8s.aws/ManagedBy": aws.String("old-cluster/old-vpc"), + }, + additionalTags: Tags{ + "Environment": aws.String("Dev"), + }, + awsManagedTags: Tags{ + "application-networking.k8s.aws/ManagedBy": aws.String("new-cluster/new-vpc"), + }, + expectedTagCalls: 1, + expectedUntagCalls: 0, + expectError: false, + description: "should update AWS managed tags when provided", + }, + { + name: "update both additional and AWS managed tags", resourceArn: "arn:aws:vpc-lattice:us-west-2:123456789:service/svc-123", existingTags: Tags{ "Environment": aws.String("Dev"), "Project": aws.String("OldApp"), - "application-networking.k8s.aws/ManagedBy": aws.String("123456789/cluster/vpc-123"), + "application-networking.k8s.aws/ManagedBy": aws.String("old-cluster/old-vpc"), }, - newTags: Tags{ + additionalTags: Tags{ "Environment": aws.String("Prod"), "Project": aws.String("NewApp"), }, + awsManagedTags: Tags{ + "application-networking.k8s.aws/ManagedBy": aws.String("new-cluster/new-vpc"), + }, expectedTagCalls: 1, expectedUntagCalls: 0, expectError: false, - description: "should update changed additional tag values", + description: "should update both additional and AWS managed tags", }, { - name: "no changes needed", + name: "no changes needed with AWS managed tags", resourceArn: "arn:aws:vpc-lattice:us-west-2:123456789:service/svc-123", existingTags: Tags{ "Environment": aws.String("Dev"), "Project": aws.String("MyApp"), "application-networking.k8s.aws/ManagedBy": aws.String("123456789/cluster/vpc-123"), }, - newTags: Tags{ + additionalTags: Tags{ "Environment": aws.String("Dev"), "Project": aws.String("MyApp"), }, + awsManagedTags: Tags{ + "application-networking.k8s.aws/ManagedBy": aws.String("123456789/cluster/vpc-123"), + }, expectedTagCalls: 0, expectedUntagCalls: 0, expectError: false, description: "should not make API calls when no changes needed", }, { - name: "filters out AWS managed tags from new tags", + name: "filters out AWS managed tags from additional tags", resourceArn: "arn:aws:vpc-lattice:us-west-2:123456789:service/svc-123", existingTags: Tags{}, - newTags: Tags{ + additionalTags: Tags{ "application-networking.k8s.aws/ManagedBy": aws.String("test-override"), "application-networking.k8s.aws/RouteType": aws.String("http"), + "Environment": aws.String("Dev"), }, - expectedTagCalls: 0, + awsManagedTags: Tags{ + "application-networking.k8s.aws/ManagedBy": aws.String("correct-value"), + }, + expectedTagCalls: 1, expectedUntagCalls: 0, expectError: false, - description: "should filter out AWS managed tags from new tags, resulting in no API calls", + description: "should filter out AWS managed tags from additional tags but include them from awsManagedTags", }, } @@ -542,7 +573,7 @@ func TestLatticeTagging_UpdateTags(t *testing.T) { Return(nil, nil).Times(tt.expectedTagCalls) } - err := lt.UpdateTags(ctx, tt.resourceArn, tt.newTags) + err := lt.UpdateTags(ctx, tt.resourceArn, tt.additionalTags, tt.awsManagedTags) if tt.expectError { assert.Error(t, err, tt.description) diff --git a/pkg/controllers/predicates/allowtakeoverfrom_predicate.go b/pkg/controllers/predicates/allowtakeoverfrom_predicate.go new file mode 100644 index 00000000..44791bb7 --- /dev/null +++ b/pkg/controllers/predicates/allowtakeoverfrom_predicate.go @@ -0,0 +1,31 @@ +package predicates + +import ( + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + "github.com/aws/aws-application-networking-k8s/pkg/k8s" +) + +var AllowTakeoverFromAnnotationChangedPredicate = predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + oldAnnotations := e.ObjectOld.GetAnnotations() + newAnnotations := e.ObjectNew.GetAnnotations() + + oldAllowTakeoverFromAnnotation := getAllowTakeoverFromAnnotation(oldAnnotations) + newAllowTakeoverFromAnnotation := getAllowTakeoverFromAnnotation(newAnnotations) + + return oldAllowTakeoverFromAnnotation != newAllowTakeoverFromAnnotation + }, + CreateFunc: func(e event.CreateEvent) bool { + annotations := e.Object.GetAnnotations() + return getAllowTakeoverFromAnnotation(annotations) != "" + }, +} + +func getAllowTakeoverFromAnnotation(annotations map[string]string) string { + if annotations == nil { + return "" + } + return annotations[k8s.AllowTakeoverFromAnnotation] +} diff --git a/pkg/controllers/predicates/allowtakeoverfrom_predicate_test.go b/pkg/controllers/predicates/allowtakeoverfrom_predicate_test.go new file mode 100644 index 00000000..16797a71 --- /dev/null +++ b/pkg/controllers/predicates/allowtakeoverfrom_predicate_test.go @@ -0,0 +1,158 @@ +package predicates + +import ( + "testing" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/event" + gwv1 "sigs.k8s.io/gateway-api/apis/v1" + + "github.com/aws/aws-application-networking-k8s/pkg/k8s" +) + +func TestAllowTakeoverFromAnnotationChangedPredicate_UpdateFunc(t *testing.T) { + tests := []struct { + name string + oldAnnotations map[string]string + newAnnotations map[string]string + expected bool + }{ + { + name: "annotation added", + oldAnnotations: nil, + newAnnotations: map[string]string{ + k8s.AllowTakeoverFromAnnotation: "123456789012/old-cluster/vpc-123", + }, + expected: true, + }, + { + name: "annotation removed", + oldAnnotations: map[string]string{ + k8s.AllowTakeoverFromAnnotation: "123456789012/old-cluster/vpc-123", + }, + newAnnotations: nil, + expected: true, + }, + { + name: "annotation unchanged", + oldAnnotations: map[string]string{ + k8s.AllowTakeoverFromAnnotation: "123456789012/cluster/vpc-123", + }, + newAnnotations: map[string]string{ + k8s.AllowTakeoverFromAnnotation: "123456789012/cluster/vpc-123", + }, + expected: false, + }, + { + name: "no annotation in both", + oldAnnotations: nil, + newAnnotations: nil, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + oldRoute := &gwv1.HTTPRoute{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: tt.oldAnnotations, + }, + } + + newRoute := &gwv1.HTTPRoute{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: tt.newAnnotations, + }, + } + + updateEvent := event.UpdateEvent{ + ObjectOld: oldRoute, + ObjectNew: newRoute, + } + + result := AllowTakeoverFromAnnotationChangedPredicate.Update(updateEvent) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestAllowTakeoverFromAnnotationChangedPredicate_CreateFunc(t *testing.T) { + tests := []struct { + name string + annotations map[string]string + expected bool + }{ + { + name: "create with takeover annotation", + annotations: map[string]string{ + k8s.AllowTakeoverFromAnnotation: "123456789012/cluster/vpc-123", + }, + expected: true, + }, + { + name: "create without takeover annotation", + annotations: nil, + expected: false, + }, + { + name: "create with empty takeover annotation", + annotations: map[string]string{ + k8s.AllowTakeoverFromAnnotation: "", + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + route := &gwv1.HTTPRoute{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: tt.annotations, + }, + } + + createEvent := event.CreateEvent{ + Object: route, + } + + result := AllowTakeoverFromAnnotationChangedPredicate.Create(createEvent) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestGetAllowTakeoverFromAnnotation(t *testing.T) { + tests := []struct { + name string + annotations map[string]string + expected string + }{ + { + name: "nil annotations", + annotations: nil, + expected: "", + }, + { + name: "annotation present", + annotations: map[string]string{ + k8s.AllowTakeoverFromAnnotation: "123456789012/cluster/vpc-123", + }, + expected: "123456789012/cluster/vpc-123", + }, + { + name: "annotation empty", + annotations: map[string]string{ + k8s.AllowTakeoverFromAnnotation: "", + }, + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := getAllowTakeoverFromAnnotation(tt.annotations) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/pkg/controllers/route_controller.go b/pkg/controllers/route_controller.go index 326d44d0..56b65d03 100644 --- a/pkg/controllers/route_controller.go +++ b/pkg/controllers/route_controller.go @@ -116,7 +116,7 @@ func RegisterAllRouteControllers( svcImportEventHandler := eventhandlers.NewServiceImportEventHandler(log, mgrClient) builder := ctrl.NewControllerManagedBy(mgr). - For(routeInfo.gatewayApiType, builder.WithPredicates(predicate.Or(predicates.NewRouteChangedPredicate(), predicates.AdditionalTagsAnnotationChangedPredicate))). + For(routeInfo.gatewayApiType, builder.WithPredicates(predicate.Or(predicates.NewRouteChangedPredicate(), predicates.AdditionalTagsAnnotationChangedPredicate, predicates.AllowTakeoverFromAnnotationChangedPredicate))). Watches(&gwv1.Gateway{}, gwEventHandler). Watches(&corev1.Service{}, svcEventHandler.MapToRoute(routeInfo.routeType)). Watches(&anv1alpha1.ServiceImport{}, svcImportEventHandler.MapToRoute(routeInfo.routeType)). diff --git a/pkg/deploy/lattice/access_log_subscription_manager.go b/pkg/deploy/lattice/access_log_subscription_manager.go index fb6fd144..9f9bab24 100644 --- a/pkg/deploy/lattice/access_log_subscription_manager.go +++ b/pkg/deploy/lattice/access_log_subscription_manager.go @@ -150,7 +150,7 @@ func (m *defaultAccessLogSubscriptionManager) Update( } updateALSOutput, err := vpcLatticeSess.UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput) if err == nil { - err = m.cloud.Tagging().UpdateTags(ctx, *updateALSOutput.Arn, accessLogSubscription.Spec.AdditionalTags) + err = m.cloud.Tagging().UpdateTags(ctx, *updateALSOutput.Arn, accessLogSubscription.Spec.AdditionalTags, nil) if err != nil { return nil, fmt.Errorf("failed to update tags for access log subscription %s: %w", *updateALSOutput.Arn, err) } diff --git a/pkg/deploy/lattice/access_log_subscription_manager_test.go b/pkg/deploy/lattice/access_log_subscription_manager_test.go index 4fb2f64e..c48358ca 100644 --- a/pkg/deploy/lattice/access_log_subscription_manager_test.go +++ b/pkg/deploy/lattice/access_log_subscription_manager_test.go @@ -326,7 +326,7 @@ func TestAccessLogSubscriptionManager(t *testing.T) { mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName).Return(serviceNetworkInfo, nil) mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(updateALSOutput, nil) - mockTagging.EXPECT().UpdateTags(ctx, accessLogSubscriptionArn, gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, accessLogSubscriptionArn, gomock.Any(), nil).Return(nil) mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) resp, err := mgr.Update(ctx, accessLogSubscription) @@ -635,7 +635,7 @@ func Test_AccessLogSubscriptionManager_WithAdditionalTags_Update(t *testing.T) { mockLattice.EXPECT().FindServiceNetwork(ctx, sourceName).Return(serviceNetworkInfo, nil) mockLattice.EXPECT().UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput).Return(updateALSOutput, nil) - mockTagging.EXPECT().UpdateTags(ctx, accessLogSubscriptionArn, accessLogSubscription.Spec.AdditionalTags).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, accessLogSubscriptionArn, accessLogSubscription.Spec.AdditionalTags, nil).Return(nil) mgr := NewAccessLogSubscriptionManager(gwlog.FallbackLogger, cloud) resp, err := mgr.Update(ctx, accessLogSubscription) diff --git a/pkg/deploy/lattice/listener_manager.go b/pkg/deploy/lattice/listener_manager.go index 0776f3f3..948ab166 100644 --- a/pkg/deploy/lattice/listener_manager.go +++ b/pkg/deploy/lattice/listener_manager.go @@ -77,7 +77,14 @@ func (d *defaultListenerManager) Upsert( ServiceId: latticeSvcId, } - err = d.cloud.Tagging().UpdateTags(ctx, aws.StringValue(latticeListenerSummary.Arn), modelListener.Spec.AdditionalTags) + var awsManagedTags services.Tags + if modelSvc.Spec.AllowTakeoverFrom != "" { + awsManagedTags = services.Tags{ + pkg_aws.TagManagedBy: d.cloud.DefaultTags()[pkg_aws.TagManagedBy], + } + } + + err = d.cloud.Tagging().UpdateTags(ctx, aws.StringValue(latticeListenerSummary.Arn), modelListener.Spec.AdditionalTags, awsManagedTags) if err != nil { return model.ListenerStatus{}, fmt.Errorf("failed to update tags for listener %s due to %s", aws.StringValue(latticeListenerSummary.Id), err) } diff --git a/pkg/deploy/lattice/listener_manager_test.go b/pkg/deploy/lattice/listener_manager_test.go index 25ce656a..a51642a7 100644 --- a/pkg/deploy/lattice/listener_manager_test.go +++ b/pkg/deploy/lattice/listener_manager_test.go @@ -144,7 +144,7 @@ func Test_UpsertListener_DoNotNeedToUpdateExistingHTTPAndHTTPSListener(t *testin }, }}, nil) - mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", gomock.Any(), nil).Return(nil) mockLattice.EXPECT().GetListenerWithContext(ctx, gomock.Any()).Times(0) mockLattice.EXPECT().UpdateListenerWithContext(ctx, gomock.Any()).Times(0) @@ -279,7 +279,7 @@ func Test_UpsertListener_Update_TLS_PASSTHROUGHListener(t *testing.T) { }, }}, nil) - mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", gomock.Any(), nil).Return(nil) mockLattice.EXPECT().GetListenerWithContext(ctx, &vpclattice.GetListenerInput{ ServiceIdentifier: aws.String("svc-id"), @@ -748,7 +748,7 @@ func Test_ListenerManager_WithAdditionalTags_UpdateHTTP(t *testing.T) { }, }}, nil) - mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", ml.Spec.AdditionalTags).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", ml.Spec.AdditionalTags, nil).Return(nil) // No UpdateListener call expected for HTTP listeners (only tags are updated) mockLattice.EXPECT().UpdateListenerWithContext(ctx, gomock.Any()).Times(0) @@ -807,7 +807,7 @@ func Test_ListenerManager_WithAdditionalTags_UpdateTLSPassthrough(t *testing.T) }, }}, nil) - mockTagging.EXPECT().UpdateTags(ctx, "existing-tls-arn", ml.Spec.AdditionalTags).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "existing-tls-arn", ml.Spec.AdditionalTags, nil).Return(nil) mockLattice.EXPECT().GetListenerWithContext(ctx, gomock.Any()).Return( &vpclattice.GetListenerOutput{ @@ -835,3 +835,53 @@ func Test_ListenerManager_WithAdditionalTags_UpdateTLSPassthrough(t *testing.T) assert.Equal(t, "existing-tls-arn", status.ListenerArn) assert.Equal(t, "existing-tls-id", status.Id) } + +func Test_ListenerManager_WithTakeoverAnnotation_UpdateTags(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + ctx := context.TODO() + mockLattice := mocks.NewMockLattice(c) + mockTagging := mocks.NewMockTagging(c) + cloud := pkg_aws.NewDefaultCloudWithTagging(mockLattice, mockTagging, TestCloudConfig) + + ml := &model.Listener{ + Spec: model.ListenerSpec{ + Protocol: vpclattice.ListenerProtocolHttp, + Port: 8080, + DefaultAction: &model.DefaultAction{ + FixedResponseStatusCode: aws.Int64(404), + }, + AdditionalTags: mocks.Tags{ + "Environment": &[]string{"Takeover"}[0], + }, + }, + } + + ms := &model.Service{ + Spec: model.ServiceSpec{ + AllowTakeoverFrom: "other-account/other-cluster/other-vpc", + }, + Status: &model.ServiceStatus{Id: "svc-id"}, + } + + mockLattice.EXPECT().ListListenersWithContext(ctx, gomock.Any()).Return( + &vpclattice.ListListenersOutput{Items: []*vpclattice.ListenerSummary{ + { + Arn: aws.String("existing-arn"), + Id: aws.String("existing-id"), + Name: aws.String("existing-name"), + Port: aws.Int64(8080), + }, + }}, nil) + + expectedAwsManagedTags := mocks.Tags{ + pkg_aws.TagManagedBy: cloud.DefaultTags()[pkg_aws.TagManagedBy], + } + mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", ml.Spec.AdditionalTags, expectedAwsManagedTags).Return(nil) + + lm := NewListenerManager(gwlog.FallbackLogger, cloud) + status, err := lm.Upsert(ctx, ml, ms) + assert.Nil(t, err) + assert.Equal(t, "existing-arn", status.ListenerArn) + assert.Equal(t, "existing-id", status.Id) +} diff --git a/pkg/deploy/lattice/rule_manager.go b/pkg/deploy/lattice/rule_manager.go index 63cbcc7a..eff5ade3 100644 --- a/pkg/deploy/lattice/rule_manager.go +++ b/pkg/deploy/lattice/rule_manager.go @@ -11,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go/service/vpclattice" pkg_aws "github.com/aws/aws-application-networking-k8s/pkg/aws" + "github.com/aws/aws-application-networking-k8s/pkg/aws/services" "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" @@ -194,7 +195,7 @@ func (r *defaultRuleManager) Upsert( if matchingRule == nil { return r.create(ctx, currentLatticeRules, latticeRuleFromModel, latticeServiceId, latticeListenerId, modelRule) } else { - return r.updateIfNeeded(ctx, latticeRuleFromModel, matchingRule, latticeServiceId, latticeListenerId, modelRule) + return r.updateIfNeeded(ctx, latticeRuleFromModel, matchingRule, latticeServiceId, latticeListenerId, modelRule, modelSvc) } } @@ -205,6 +206,7 @@ func (r *defaultRuleManager) updateIfNeeded( latticeSvcId string, latticeListenerId string, modelRule *model.Rule, + modelSvc *model.Service, ) (model.RuleStatus, error) { updatedRuleStatus := model.RuleStatus{ Name: aws.StringValue(matchingRule.Name), @@ -215,7 +217,14 @@ func (r *defaultRuleManager) updateIfNeeded( Priority: aws.Int64Value(matchingRule.Priority), } - err := r.cloud.Tagging().UpdateTags(ctx, aws.StringValue(matchingRule.Arn), modelRule.Spec.AdditionalTags) + var awsManagedTags services.Tags + if modelSvc.Spec.AllowTakeoverFrom != "" { + awsManagedTags = services.Tags{ + pkg_aws.TagManagedBy: r.cloud.DefaultTags()[pkg_aws.TagManagedBy], + } + } + + err := r.cloud.Tagging().UpdateTags(ctx, aws.StringValue(matchingRule.Arn), modelRule.Spec.AdditionalTags, awsManagedTags) if err != nil { return model.RuleStatus{}, fmt.Errorf("failed to update tags for rule %s: %w", aws.StringValue(matchingRule.Id), err) } diff --git a/pkg/deploy/lattice/rule_manager_test.go b/pkg/deploy/lattice/rule_manager_test.go index 44196846..44ca7b5f 100644 --- a/pkg/deploy/lattice/rule_manager_test.go +++ b/pkg/deploy/lattice/rule_manager_test.go @@ -166,7 +166,7 @@ func Test_Create(t *testing.T) { }, }, nil) - mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", gomock.Any(), nil).Return(nil) mockLattice.EXPECT().UpdateRuleWithContext(ctx, gomock.Any()).Return( &vpclattice.UpdateRuleOutput{ @@ -209,7 +209,7 @@ func Test_Create(t *testing.T) { }, }, nil) - mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", gomock.Any(), nil).Return(nil) mockLattice.EXPECT().UpdateRuleWithContext(ctx, gomock.Any()).Return( &vpclattice.UpdateRuleOutput{ @@ -253,7 +253,7 @@ func Test_Create(t *testing.T) { }, }, nil) // <-- should be an exact match, no update required - mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", gomock.Any(), nil).Return(nil) rm := NewRuleManager(gwlog.FallbackLogger, cloudWithTagging) ruleStatus, err := rm.Upsert(ctx, r, l, svc) @@ -533,7 +533,7 @@ func Test_RuleManager_WithAdditionalTags_Update(t *testing.T) { }, }, nil) - mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", r.Spec.AdditionalTags).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", r.Spec.AdditionalTags, nil).Return(nil) mockLattice.EXPECT().UpdateRuleWithContext(ctx, gomock.Any()).Return( &vpclattice.UpdateRuleOutput{ @@ -615,7 +615,7 @@ func Test_RuleManager_WithAdditionalTags_UpdateNoActionChange(t *testing.T) { }, nil) // Mock UpdateTags call for additional tags (should still be called even if no action update) - mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", r.Spec.AdditionalTags).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", r.Spec.AdditionalTags, nil).Return(nil) // No UpdateRule call expected since action matches mockLattice.EXPECT().UpdateRuleWithContext(ctx, gomock.Any()).Times(0) @@ -625,3 +625,82 @@ func Test_RuleManager_WithAdditionalTags_UpdateNoActionChange(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "existing-arn", ruleStatus.Arn) } + +func Test_RuleManager_WithTakeoverAnnotation_UpdateTags(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + ctx := context.TODO() + mockLattice := mocks.NewMockLattice(c) + mockTagging := mocks.NewMockTagging(c) + cloud := pkg_aws.NewDefaultCloudWithTagging(mockLattice, mockTagging, TestCloudConfig) + + svc := &model.Service{ + Spec: model.ServiceSpec{ + AllowTakeoverFrom: "other-account/other-cluster/other-vpc", + }, + Status: &model.ServiceStatus{Id: "svc-id"}, + } + + l := &model.Listener{ + Spec: model.ListenerSpec{ + Port: 80, + Protocol: "HTTP", + }, + Status: &model.ListenerStatus{Id: "listener-id"}, + } + + r := &model.Rule{ + Spec: model.RuleSpec{ + Priority: 1, + Method: "POST", + Action: model.RuleAction{ + TargetGroups: []*model.RuleTargetGroup{ + { + LatticeTgId: "tg-id", + Weight: 1, + }, + }, + }, + AdditionalTags: mocks.Tags{ + "Environment": &[]string{"Takeover"}[0], + }, + }, + } + + mockLattice.EXPECT().GetRulesAsList(ctx, gomock.Any()).Return( + []*vpclattice.GetRuleOutput{ + { + Id: aws.String("existing-id"), + Arn: aws.String("existing-arn"), + Match: &vpclattice.RuleMatch{ + HttpMatch: &vpclattice.HttpMatch{ + Method: aws.String("POST"), + }, + }, + Action: &vpclattice.RuleAction{ + Forward: &vpclattice.ForwardAction{ + TargetGroups: []*vpclattice.WeightedTargetGroup{ + { + TargetGroupIdentifier: aws.String("tg-id"), + Weight: aws.Int64(1), + }, + }, + }, + }, + Name: aws.String("existing-name"), + Priority: aws.Int64(1), + }, + }, nil) + + expectedAwsManagedTags := mocks.Tags{ + pkg_aws.TagManagedBy: cloud.DefaultTags()[pkg_aws.TagManagedBy], + } + mockTagging.EXPECT().UpdateTags(ctx, "existing-arn", r.Spec.AdditionalTags, expectedAwsManagedTags).Return(nil) + + mockLattice.EXPECT().UpdateRuleWithContext(ctx, gomock.Any()).Times(0) + + rm := NewRuleManager(gwlog.FallbackLogger, cloud) + ruleStatus, err := rm.Upsert(ctx, r, l, svc) + assert.Nil(t, err) + assert.Equal(t, "existing-arn", ruleStatus.Arn) +} diff --git a/pkg/deploy/lattice/service_manager.go b/pkg/deploy/lattice/service_manager.go index 223b4521..40f9be60 100644 --- a/pkg/deploy/lattice/service_manager.go +++ b/pkg/deploy/lattice/service_manager.go @@ -149,10 +149,21 @@ func (m *defaultServiceManager) checkAndUpdateTags(ctx context.Context, svc *Ser return err } if !owned { - return services.NewConflictError("service", svc.Spec.RouteNamespace+"/"+svc.Spec.RouteName, - fmt.Sprintf("Found existing resource not owned by controller: %s", *svcSum.Arn)) + canTakeover := m.canTakeoverService(svc, tagsResp.Tags) + if canTakeover { + currentOwner := m.cloud.GetManagedByFromTags(tagsResp.Tags) + newOwner := m.cloud.DefaultTags()[pkg_aws.TagManagedBy] + err = m.transferServiceOwnership(ctx, svcSum.Arn, newOwner) + if err != nil { + return fmt.Errorf("failed to takeover service %s from %s to %s: %w", svc.LatticeServiceName(), currentOwner, *newOwner, err) + } + m.log.Infof(ctx, "Successfully took over service %s from %s to %s", svc.LatticeServiceName(), currentOwner, *newOwner) + return nil + } else { + return services.NewConflictError("service", svc.Spec.RouteNamespace+"/"+svc.Spec.RouteName, + fmt.Sprintf("Found existing resource not owned by controller: %s", *svcSum.Arn)) + } } - tagFields := model.ServiceTagFieldsFromTags(tagsResp.Tags) switch { case tagFields.RouteName == "" && tagFields.RouteNamespace == "": @@ -174,7 +185,7 @@ func (m *defaultServiceManager) checkAndUpdateTags(ctx context.Context, svc *Ser } func (m *defaultServiceManager) updateServiceAndAssociations(ctx context.Context, svc *Service, svcSum *SvcSummary) (ServiceInfo, error) { - err := m.cloud.Tagging().UpdateTags(ctx, aws.StringValue(svcSum.Arn), svc.Spec.AdditionalTags) + err := m.cloud.Tagging().UpdateTags(ctx, aws.StringValue(svcSum.Arn), svc.Spec.AdditionalTags, nil) if err != nil { return ServiceInfo{}, fmt.Errorf("failed to update tags for service %s: %w", aws.StringValue(svcSum.Id), err) } @@ -235,8 +246,15 @@ func (m *defaultServiceManager) updateAssociations(ctx context.Context, svc *Ser } } + var awsManagedTags services.Tags + if svc.Spec.AllowTakeoverFrom != "" { + awsManagedTags = services.Tags{ + pkg_aws.TagManagedBy: m.cloud.DefaultTags()[pkg_aws.TagManagedBy], + } + } + for _, assoc := range toUpdate { - err := m.cloud.Tagging().UpdateTags(ctx, aws.StringValue(assoc.Arn), svc.Spec.AdditionalTags) + err := m.cloud.Tagging().UpdateTags(ctx, aws.StringValue(assoc.Arn), svc.Spec.AdditionalTags, awsManagedTags) if err != nil { return fmt.Errorf("failed to update tags for association %s: %w", aws.StringValue(assoc.Arn), err) } @@ -437,3 +455,24 @@ func (m *defaultServiceManager) Delete(ctx context.Context, svc *Service) error } return nil } + +func (m *defaultServiceManager) canTakeoverService(svc *Service, serviceTags services.Tags) bool { + takeoverFrom := svc.Spec.AllowTakeoverFrom + if takeoverFrom == "" { + return false + } + + currentOwner := m.cloud.GetManagedByFromTags(serviceTags) + + return currentOwner == takeoverFrom +} + +func (m *defaultServiceManager) transferServiceOwnership(ctx context.Context, serviceArn *string, newOwner *string) error { + _, err := m.cloud.Lattice().TagResourceWithContext(ctx, &vpclattice.TagResourceInput{ + ResourceArn: serviceArn, + Tags: map[string]*string{ + pkg_aws.TagManagedBy: newOwner, + }, + }) + return err +} diff --git a/pkg/deploy/lattice/service_manager_test.go b/pkg/deploy/lattice/service_manager_test.go index fb56864c..38c6470b 100644 --- a/pkg/deploy/lattice/service_manager_test.go +++ b/pkg/deploy/lattice/service_manager_test.go @@ -142,7 +142,7 @@ func TestServiceManagerInteg(t *testing.T) { }). Times(1) // for service only - mockTagging.EXPECT().UpdateTags(ctx, "svc-arn", gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "svc-arn", gomock.Any(), gomock.Any()).Return(nil) // 3 associations exist in lattice: keep, delete, and foreign mockLattice.EXPECT(). @@ -161,7 +161,7 @@ func TestServiceManagerInteg(t *testing.T) { }). Times(1) - mockTagging.EXPECT().UpdateTags(ctx, "sn-keep-arn", gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "sn-keep-arn", gomock.Any(), gomock.Any()).Return(nil) // return managed by gateway controller tags for all associations except for foreign and foreign ram mockLattice.EXPECT().ListTagsForResourceWithContext(gomock.Any(), gomock.Any()). @@ -259,7 +259,7 @@ func TestServiceManagerInteg(t *testing.T) { Tags: svc.Spec.ToTags(), })).Times(1) - mockTagging.EXPECT().UpdateTags(ctx, "svc-arn", gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "svc-arn", gomock.Any(), gomock.Any()).Return(nil) mockLattice.EXPECT().ListServiceNetworkServiceAssociationsAsList(gomock.Any(), gomock.Any()).Times(1) mockLattice.EXPECT(). @@ -358,7 +358,7 @@ func TestServiceManagerInteg(t *testing.T) { }). Times(1) // for service only - mockTagging.EXPECT().UpdateTags(ctx, "standalone-svc-arn", gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "standalone-svc-arn", gomock.Any(), gomock.Any()).Return(nil) // no associations exist for standalone service mockLattice.EXPECT(). @@ -710,7 +710,7 @@ func Test_ServiceManager_WithAdditionalTags_UpdateService(t *testing.T) { }). Times(1) - mockTagging.EXPECT().UpdateTags(ctx, "svc-arn", svc.Spec.AdditionalTags).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "svc-arn", svc.Spec.AdditionalTags, gomock.Any()).Return(nil) mockLattice.EXPECT(). ListServiceNetworkServiceAssociationsAsList(gomock.Any(), gomock.Any()). @@ -726,7 +726,7 @@ func Test_ServiceManager_WithAdditionalTags_UpdateService(t *testing.T) { }). Times(1) - mockTagging.EXPECT().UpdateTags(ctx, "assoc-arn", svc.Spec.AdditionalTags).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "assoc-arn", svc.Spec.AdditionalTags, gomock.Any()).Return(nil) status, err := m.Upsert(ctx, svc) assert.Nil(t, err) @@ -776,7 +776,7 @@ func Test_ServiceManager_WithAdditionalTags_UpdateAssociations(t *testing.T) { }). Times(1) - mockTagging.EXPECT().UpdateTags(ctx, "svc-arn", svc.Spec.AdditionalTags).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "svc-arn", svc.Spec.AdditionalTags, gomock.Any()).Return(nil) mockLattice.EXPECT(). ListServiceNetworkServiceAssociationsAsList(gomock.Any(), gomock.Any()). @@ -798,7 +798,7 @@ func Test_ServiceManager_WithAdditionalTags_UpdateAssociations(t *testing.T) { }). Times(1) - mockTagging.EXPECT().UpdateTags(ctx, "sn-keep-arn", svc.Spec.AdditionalTags).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "sn-keep-arn", svc.Spec.AdditionalTags, gomock.Any()).Return(nil) mockLattice.EXPECT().ListTagsForResourceWithContext(gomock.Any(), gomock.Any()). DoAndReturn(func(_ context.Context, req *vpclattice.ListTagsForResourceInput, _ ...interface{}) (*vpclattice.ListTagsForResourceOutput, error) { @@ -849,3 +849,128 @@ func Test_ServiceManager_WithAdditionalTags_UpdateAssociations(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "svc-arn", status.Arn) } + +func Test_ServiceManager_ServiceTakeover(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + + mockLattice := mocks.NewMockLattice(c) + mockTagging := mocks.NewMockTagging(c) + cfg := pkg_aws.CloudConfig{VpcId: "vpc-id", AccountId: "account-id", ClusterName: "cluster"} + cl := pkg_aws.NewDefaultCloudWithTagging(mockLattice, mockTagging, cfg) + ctx := context.Background() + m := NewServiceManager(gwlog.FallbackLogger, cl) + + svc := &Service{ + Spec: model.ServiceSpec{ + ServiceTagFields: model.ServiceTagFields{ + RouteName: "takeover-svc", + RouteNamespace: "ns", + RouteType: core.HttpRouteType, + }, + ServiceNetworkNames: []string{"sn"}, + AllowTakeoverFrom: "other-account/other-cluster/other-vpc", + }, + } + + mockLattice.EXPECT(). + FindService(gomock.Any(), gomock.Any()). + Return(&vpclattice.ServiceSummary{ + Arn: aws.String("svc-arn"), + Id: aws.String("svc-id"), + Name: aws.String(svc.LatticeServiceName()), + }, nil). + Times(1) + + mockLattice.EXPECT().ListTagsForResourceWithContext(gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, req *vpclattice.ListTagsForResourceInput, _ ...interface{}) (*vpclattice.ListTagsForResourceOutput, error) { + return &vpclattice.ListTagsForResourceOutput{ + Tags: map[string]*string{ + pkg_aws.TagManagedBy: aws.String("other-account/other-cluster/other-vpc"), + }, + }, nil + }). + Times(1) + + mockLattice.EXPECT().TagResourceWithContext(gomock.Any(), gomock.Eq(&vpclattice.TagResourceInput{ + ResourceArn: aws.String("svc-arn"), + Tags: map[string]*string{ + pkg_aws.TagManagedBy: cl.DefaultTags()[pkg_aws.TagManagedBy], + }, + })).Times(1) + + mockTagging.EXPECT().UpdateTags(ctx, "svc-arn", svc.Spec.AdditionalTags, gomock.Any()).Return(nil) + + mockLattice.EXPECT().ListServiceNetworkServiceAssociationsAsList(gomock.Any(), gomock.Any()). + Return([]*SnSvcAssocSummary{ + { + Arn: aws.String("assoc-arn"), + Id: aws.String("assoc-id"), + ServiceNetworkName: aws.String("sn"), + Status: aws.String(vpclattice.ServiceNetworkServiceAssociationStatusActive), + }, + }, nil).Times(1) + + expectedAwsManagedTags := mocks.Tags{ + pkg_aws.TagManagedBy: cl.DefaultTags()[pkg_aws.TagManagedBy], + } + mockTagging.EXPECT().UpdateTags(ctx, "assoc-arn", svc.Spec.AdditionalTags, expectedAwsManagedTags).Return(nil) + + status, err := m.Upsert(ctx, svc) + assert.Nil(t, err) + assert.Equal(t, "svc-arn", status.Arn) +} + +func Test_ServiceManager_ServiceTakeover_NotAllowed(t *testing.T) { + c := gomock.NewController(t) + defer c.Finish() + + mockLattice := mocks.NewMockLattice(c) + mockTagging := mocks.NewMockTagging(c) + cfg := pkg_aws.CloudConfig{VpcId: "vpc-id", AccountId: "account-id", ClusterName: "cluster"} + cl := pkg_aws.NewDefaultCloudWithTagging(mockLattice, mockTagging, cfg) + ctx := context.Background() + m := NewServiceManager(gwlog.FallbackLogger, cl) + + svc := &Service{ + Spec: model.ServiceSpec{ + ServiceTagFields: model.ServiceTagFields{ + RouteName: "conflict-svc", + RouteNamespace: "ns", + RouteType: core.HttpRouteType, + }, + ServiceNetworkNames: []string{"sn"}, + AllowTakeoverFrom: "expected-owner/expected-cluster/expected-vpc", + }, + } + + mockLattice.EXPECT(). + FindService(gomock.Any(), gomock.Any()). + Return(&vpclattice.ServiceSummary{ + Arn: aws.String("svc-arn"), + Id: aws.String("svc-id"), + Name: aws.String(svc.LatticeServiceName()), + }, nil). + Times(1) + + mockLattice.EXPECT().ListTagsForResourceWithContext(gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, req *vpclattice.ListTagsForResourceInput, _ ...interface{}) (*vpclattice.ListTagsForResourceOutput, error) { + return &vpclattice.ListTagsForResourceOutput{ + Tags: map[string]*string{ + pkg_aws.TagManagedBy: aws.String("different-owner/different-cluster/different-vpc"), + }, + }, nil + }). + Times(1) + + mockLattice.EXPECT().TagResourceWithContext(gomock.Any(), gomock.Any()).Times(0) + + mockTagging.EXPECT().UpdateTags(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(0) + + status, err := m.Upsert(ctx, svc) + assert.NotNil(t, err) + assert.True(t, mocks.IsConflictError(err)) + assert.Contains(t, err.Error(), "Found existing resource not owned by controller") + assert.Contains(t, err.Error(), "svc-arn") + assert.Equal(t, "", status.Arn) +} diff --git a/pkg/deploy/lattice/service_network_manager.go b/pkg/deploy/lattice/service_network_manager.go index 26d8d222..feddaa76 100644 --- a/pkg/deploy/lattice/service_network_manager.go +++ b/pkg/deploy/lattice/service_network_manager.go @@ -233,7 +233,7 @@ func (m *defaultServiceNetworkManager) updateServiceNetworkVpcAssociation(ctx co return model.ServiceNetworkStatus{}, err } - err = m.cloud.Tagging().UpdateTags(ctx, aws.StringValue(snva.Arn), additionalTags) + err = m.cloud.Tagging().UpdateTags(ctx, aws.StringValue(snva.Arn), additionalTags, nil) if err != nil { return model.ServiceNetworkStatus{}, fmt.Errorf("failed to update tags for service network vpc association %s: %w", aws.StringValue(snva.Id), err) } diff --git a/pkg/deploy/lattice/service_network_manager_test.go b/pkg/deploy/lattice/service_network_manager_test.go index a9455e09..ed086bf9 100644 --- a/pkg/deploy/lattice/service_network_manager_test.go +++ b/pkg/deploy/lattice/service_network_manager_test.go @@ -525,7 +525,7 @@ func Test_defaultServiceNetworkManager_CreateOrUpdate_SnExists_SnvaExists_Update Tags: cloud.DefaultTags(), }, nil) - mockTagging.EXPECT().UpdateTags(ctx, gomock.Any(), gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, gomock.Any(), gomock.Any(), nil).Return(nil) mockLattice.EXPECT().CreateServiceNetworkServiceAssociationWithContext(ctx, gomock.Any()).MaxTimes(0) mockLattice.EXPECT().UpdateServiceNetworkVpcAssociationWithContext(ctx, gomock.Any()).Return(&vpclattice.UpdateServiceNetworkVpcAssociationOutput{ @@ -591,7 +591,7 @@ func Test_defaultServiceNetworkManager_CreateOrUpdate_SnExists_SnvaExists_Securi Tags: cloud.DefaultTags(), }, nil) - mockTagging.EXPECT().UpdateTags(ctx, gomock.Any(), gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, snvaArn, nil, nil).Return(nil) mockLattice.EXPECT().CreateServiceNetworkServiceAssociationWithContext(ctx, gomock.Any()).Times(0) mockLattice.EXPECT().UpdateServiceNetworkVpcAssociationWithContext(ctx, gomock.Any()).Times(0) @@ -695,7 +695,7 @@ func Test_defaultServiceNetworkManager_CreateOrUpdate_SnExists_SnvaExists_Cannot Tags: cloud.DefaultTags(), }, nil) - mockTagging.EXPECT().UpdateTags(ctx, gomock.Any(), gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, snvaArn, gomock.Any(), nil).Return(nil) mockLattice.EXPECT().CreateServiceNetworkServiceAssociationWithContext(ctx, gomock.Any()).Times(0) updateSNVAError := errors.New("InvalidParameterException SecurityGroupIds cannot be empty") @@ -762,7 +762,7 @@ func Test_UpsertVpcAssociation_WithAdditionalTags_ExistingAssociation(t *testing "Project": &[]string{"SNManager"}[0], } - mockTagging.EXPECT().UpdateTags(ctx, snvaArn, additionalTags).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, snvaArn, additionalTags, nil).Return(nil) mockLattice.EXPECT().UpdateServiceNetworkVpcAssociationWithContext(ctx, gomock.Any()).Times(0) diff --git a/pkg/deploy/lattice/target_group_manager.go b/pkg/deploy/lattice/target_group_manager.go index 6f958f29..b7cb37a7 100644 --- a/pkg/deploy/lattice/target_group_manager.go +++ b/pkg/deploy/lattice/target_group_manager.go @@ -132,7 +132,7 @@ func (s *defaultTargetGroupManager) create(ctx context.Context, modelTg *model.T func (s *defaultTargetGroupManager) update(ctx context.Context, targetGroup *model.TargetGroup, latticeTg *vpclattice.GetTargetGroupOutput) (model.TargetGroupStatus, error) { healthCheckConfig := targetGroup.Spec.HealthCheckConfig - err := s.awsCloud.Tagging().UpdateTags(ctx, aws.StringValue(latticeTg.Arn), targetGroup.Spec.AdditionalTags) + err := s.awsCloud.Tagging().UpdateTags(ctx, aws.StringValue(latticeTg.Arn), targetGroup.Spec.AdditionalTags, nil) if err != nil { return model.TargetGroupStatus{}, fmt.Errorf("failed to update tags for target group %s: %w", aws.StringValue(latticeTg.Id), err) } diff --git a/pkg/deploy/lattice/target_group_manager_test.go b/pkg/deploy/lattice/target_group_manager_test.go index 109a9f87..4a90f9fb 100644 --- a/pkg/deploy/lattice/target_group_manager_test.go +++ b/pkg/deploy/lattice/target_group_manager_test.go @@ -252,7 +252,7 @@ func Test_CreateTargetGroup_TGActive_UpdateHealthCheck(t *testing.T) { mockTagging.EXPECT().FindResourcesByTags(ctx, gomock.Any(), gomock.Any()).Return([]string{arn}, nil) mockLattice.EXPECT().GetTargetGroupWithContext(ctx, gomock.Any()).Return(&tgOutput, nil) - mockTagging.EXPECT().UpdateTags(ctx, arn, tgSpec.AdditionalTags).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, arn, tgSpec.AdditionalTags, nil).Return(nil) if tt.wantErr { mockLattice.EXPECT().UpdateTargetGroupWithContext(ctx, gomock.Any()).Return(nil, errors.New("error")) @@ -323,7 +323,7 @@ func Test_CreateTargetGroup_TGActive_HealthCheckSame(t *testing.T) { mockTagging.EXPECT().FindResourcesByTags(ctx, gomock.Any(), gomock.Any()).Return([]string{"arn"}, nil) mockLattice.EXPECT().GetTargetGroupWithContext(ctx, gomock.Any()).Return(&tgOutput, nil) - mockTagging.EXPECT().UpdateTags(ctx, gomock.Any(), gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "arn", gomock.Any(), nil).Return(nil) mockLattice.EXPECT().UpdateTargetGroupWithContext(ctx, gomock.Any()).Times(0) @@ -1236,7 +1236,7 @@ func Test_update_ServiceExportWithPolicy_Integration(t *testing.T) { mockTagging := mocks.NewMockTagging(c) cloud := pkg_aws.NewDefaultCloudWithTagging(mockLattice, mockTagging, TestCloudConfig) - mockTagging.EXPECT().UpdateTags(ctx, gomock.Any(), gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "arn:aws:vpc-lattice:us-west-2:123456789012:targetgroup/tg-12345", gomock.Any(), nil).Return(nil) // Since we don't have a real k8s k8sClient in this test, we'll test the case where // no k8sClient is available (which should fall back to default behavior) @@ -1455,7 +1455,7 @@ func Test_update_ServiceExportWithPolicyResolution(t *testing.T) { }, } - mockTagging.EXPECT().UpdateTags(ctx, gomock.Any(), gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "arn:aws:vpc-lattice:us-west-2:123456789012:targetgroup/tg-12345", gomock.Any(), nil).Return(nil) tgManager := NewTargetGroupManager(gwlog.FallbackLogger, cloud, k8sClient) @@ -1639,7 +1639,7 @@ func Test_update_BackwardsCompatibility(t *testing.T) { }, } - mockTagging.EXPECT().UpdateTags(ctx, gomock.Any(), gomock.Any()).Return(nil) + mockTagging.EXPECT().UpdateTags(ctx, "arn:aws:vpc-lattice:us-west-2:123456789012:targetgroup/tg-12345", gomock.Any(), nil).Return(nil) tgManager := NewTargetGroupManager(gwlog.FallbackLogger, cloud, k8sClient) diff --git a/pkg/gateway/model_build_lattice_service.go b/pkg/gateway/model_build_lattice_service.go index 07a821aa..ff471f6e 100644 --- a/pkg/gateway/model_build_lattice_service.go +++ b/pkg/gateway/model_build_lattice_service.go @@ -189,6 +189,12 @@ func (t *latticeServiceModelBuildTask) buildLatticeService(ctx context.Context) } spec.CustomerCertARN = certArn + allowTakeoverFrom, err := t.getAllowTakeoverFrom(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get takeover annotation: %w", err) + } + spec.AllowTakeoverFrom = allowTakeoverFrom + svc, err := model.NewLatticeService(t.stack, spec) if err != nil { return nil, err @@ -276,3 +282,20 @@ func (t *latticeServiceModelBuildTask) isStandaloneMode(ctx context.Context) (bo t.route.Namespace(), t.route.Name(), standalone) return standalone, nil } + +func (t *latticeServiceModelBuildTask) getAllowTakeoverFrom(ctx context.Context) (string, error) { + annotations := t.route.K8sObject().GetAnnotations() + if annotations == nil { + return "", nil + } + + takeoverFrom := annotations[k8s.AllowTakeoverFromAnnotation] + if takeoverFrom == "" { + return "", nil + } + + t.log.Debugf(ctx, "Found allow-takeover-from annotation: %s for route %s/%s", + takeoverFrom, t.route.Namespace(), t.route.Name()) + + return takeoverFrom, nil +} diff --git a/pkg/k8s/utils.go b/pkg/k8s/utils.go index 5ba0a93e..991732be 100644 --- a/pkg/k8s/utils.go +++ b/pkg/k8s/utils.go @@ -32,6 +32,9 @@ const ( // Additional tags TagsAnnotationKey = AnnotationPrefix + "tags" + // HttpRoute takeover annotation + AllowTakeoverFromAnnotation = AnnotationPrefix + "allow-takeover-from" + // AWS tag validation limits maxTagKeyLength = 128 maxTagValueLength = 256 diff --git a/pkg/model/lattice/service.go b/pkg/model/lattice/service.go index 961b0658..250bdd3c 100644 --- a/pkg/model/lattice/service.go +++ b/pkg/model/lattice/service.go @@ -19,6 +19,7 @@ type ServiceSpec struct { CustomerDomainName string `json:"customerdomainname"` CustomerCertARN string `json:"customercertarn"` AdditionalTags services.Tags `json:"additionaltags,omitempty"` + AllowTakeoverFrom string `json:"allowtakeoverfrom,omitempty"` } type ServiceStatus struct { diff --git a/test/suites/integration/service_takeover_test.go b/test/suites/integration/service_takeover_test.go new file mode 100644 index 00000000..124271c7 --- /dev/null +++ b/test/suites/integration/service_takeover_test.go @@ -0,0 +1,404 @@ +package integration + +import ( + "fmt" + "log" + "strings" + "time" + + "github.com/aws/aws-application-networking-k8s/test/pkg/test" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/vpclattice" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/samber/lo" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + gwv1 "sigs.k8s.io/gateway-api/apis/v1" +) + +var _ = Describe("Service Takeover Test", Ordered, func() { + var ( + preCreatedServiceArn *string + preCreatedListenerArn *string + preCreatedRuleArn *string + preCreatedAssociationArn *string + preCreatedTargetGroupArn *string + + deployment1 *appsv1.Deployment + service1 *v1.Service + deployment2 *appsv1.Deployment + service2 *v1.Service + httpRoute *gwv1.HTTPRoute + + originalManagedBy = "685175429445/blue-controller/vpc-0e19af3ab36ee2915" + serviceName = "inventory-e2e-test" + ) + + It("Create lattice resources simulating HttpRoute created by blue controller", func() { + serviceResp, err := testFramework.LatticeClient.CreateService(&vpclattice.CreateServiceInput{ + Name: aws.String(serviceName), + Tags: map[string]*string{ + "application-networking.k8s.aws/ManagedBy": aws.String(originalManagedBy), + "application-networking.k8s.aws/RouteName": aws.String("inventory"), + "application-networking.k8s.aws/RouteNamespace": aws.String(k8snamespace), + "application-networking.k8s.aws/RouteType": aws.String("http"), + "application-networking.k8s.aws/ClusterName": aws.String("blue-cluster"), + }, + }) + Expect(err).To(BeNil()) + preCreatedServiceArn = serviceResp.Arn + + Eventually(func(g Gomega) { + getServiceResp, err := testFramework.LatticeClient.GetService(&vpclattice.GetServiceInput{ + ServiceIdentifier: preCreatedServiceArn, + }) + g.Expect(err).To(BeNil()) + g.Expect(*getServiceResp.Status).To(Equal("ACTIVE")) + }).WithTimeout(2 * time.Minute).Should(Succeed()) + + listenerResp, err := testFramework.LatticeClient.CreateListener(&vpclattice.CreateListenerInput{ + ServiceIdentifier: preCreatedServiceArn, + Name: aws.String("inventory-listener"), + Protocol: aws.String("HTTP"), + Port: aws.Int64(80), + DefaultAction: &vpclattice.RuleAction{ + FixedResponse: &vpclattice.FixedResponseAction{ + StatusCode: aws.Int64(404), + }, + }, + Tags: map[string]*string{ + "application-networking.k8s.aws/ManagedBy": aws.String(originalManagedBy), + }, + }) + Expect(err).To(BeNil()) + preCreatedListenerArn = listenerResp.Arn + + targetGroupResp, err := testFramework.LatticeClient.CreateTargetGroup(&vpclattice.CreateTargetGroupInput{ + Name: aws.String("inventory-takeover-tg"), + Type: aws.String("IP"), + Config: &vpclattice.TargetGroupConfig{ + Protocol: aws.String("HTTP"), + Port: aws.Int64(80), + VpcIdentifier: aws.String(testFramework.Cloud.Config().VpcId), + }, + Tags: map[string]*string{ + "application-networking.k8s.aws/ManagedBy": aws.String(originalManagedBy), + }, + }) + Expect(err).To(BeNil()) + preCreatedTargetGroupArn = targetGroupResp.Arn + + Eventually(func(g Gomega) { + getTargetGroupResp, err := testFramework.LatticeClient.GetTargetGroup(&vpclattice.GetTargetGroupInput{ + TargetGroupIdentifier: preCreatedTargetGroupArn, + }) + g.Expect(err).To(BeNil()) + g.Expect(*getTargetGroupResp.Status).To(Equal("ACTIVE")) + }).WithTimeout(2 * time.Minute).Should(Succeed()) + + _, err = testFramework.LatticeClient.RegisterTargets(&vpclattice.RegisterTargetsInput{ + TargetGroupIdentifier: preCreatedTargetGroupArn, + Targets: []*vpclattice.Target{ + { + Id: aws.String("192.168.1.100"), + Port: aws.Int64(8090), + }, + }, + }) + Expect(err).To(BeNil()) + + ruleResp, err := testFramework.LatticeClient.CreateRule(&vpclattice.CreateRuleInput{ + ServiceIdentifier: preCreatedServiceArn, + ListenerIdentifier: preCreatedListenerArn, + Name: aws.String("inventory-rule"), + Priority: aws.Int64(1), + Match: &vpclattice.RuleMatch{ + HttpMatch: &vpclattice.HttpMatch{ + PathMatch: &vpclattice.PathMatch{ + Match: &vpclattice.PathMatchType{ + Prefix: aws.String("/"), + }, + CaseSensitive: aws.Bool(true), + }, + }, + }, + Action: &vpclattice.RuleAction{ + Forward: &vpclattice.ForwardAction{ + TargetGroups: []*vpclattice.WeightedTargetGroup{ + { + TargetGroupIdentifier: preCreatedTargetGroupArn, + Weight: aws.Int64(100), + }, + }, + }, + }, + Tags: map[string]*string{ + "application-networking.k8s.aws/ManagedBy": aws.String(originalManagedBy), + }, + }) + Expect(err).To(BeNil()) + preCreatedRuleArn = ruleResp.Arn + + associationResp, err := testFramework.LatticeClient.CreateServiceNetworkServiceAssociation(&vpclattice.CreateServiceNetworkServiceAssociationInput{ + ServiceIdentifier: preCreatedServiceArn, + ServiceNetworkIdentifier: testServiceNetwork.Id, + Tags: map[string]*string{ + "application-networking.k8s.aws/ManagedBy": aws.String(originalManagedBy), + }, + }) + Expect(err).To(BeNil()) + preCreatedAssociationArn = associationResp.Arn + + Eventually(func(g Gomega) { + getAssocResp, err := testFramework.LatticeClient.GetServiceNetworkServiceAssociation(&vpclattice.GetServiceNetworkServiceAssociationInput{ + ServiceNetworkServiceAssociationIdentifier: preCreatedAssociationArn, + }) + g.Expect(err).To(BeNil()) + g.Expect(*getAssocResp.Status).To(Equal("ACTIVE")) + }).WithTimeout(2 * time.Minute).Should(Succeed()) + }) + + It("Creating HTTPRoute without takeover annotation should fail", func() { + deployment1, service1 = testFramework.NewHttpApp(test.HTTPAppOptions{ + Name: "backend-1", + Namespace: k8snamespace, + Port: 80, + TargetPort: 8090, + }) + deployment2, service2 = testFramework.NewHttpApp(test.HTTPAppOptions{ + Name: "backend-2", + Namespace: k8snamespace, + Port: 80, + TargetPort: 8090, + }) + testFramework.ExpectCreated(ctx, deployment1, service1, deployment2, service2) + + httpRoute = &gwv1.HTTPRoute{ + ObjectMeta: metav1.ObjectMeta{ + Name: "inventory", + Namespace: k8snamespace, + }, + Spec: gwv1.HTTPRouteSpec{ + CommonRouteSpec: gwv1.CommonRouteSpec{ + ParentRefs: []gwv1.ParentReference{ + { + Name: gwv1.ObjectName(testGateway.Name), + SectionName: lo.ToPtr(gwv1.SectionName("http")), + }, + }, + }, + Rules: []gwv1.HTTPRouteRule{ + { + BackendRefs: []gwv1.HTTPBackendRef{ + { + BackendRef: gwv1.BackendRef{ + BackendObjectReference: gwv1.BackendObjectReference{ + Name: gwv1.ObjectName(service1.Name), + Kind: lo.ToPtr(gwv1.Kind("Service")), + Port: lo.ToPtr(gwv1.PortNumber(80)), + }, + Weight: lo.ToPtr(int32(50)), + }, + }, + { + BackendRef: gwv1.BackendRef{ + BackendObjectReference: gwv1.BackendObjectReference{ + Name: gwv1.ObjectName(service2.Name), + Kind: lo.ToPtr(gwv1.Kind("Service")), + Port: lo.ToPtr(gwv1.PortNumber(80)), + }, + Weight: lo.ToPtr(int32(50)), + }, + }, + }, + }, + }, + }, + } + testFramework.ExpectCreated(ctx, httpRoute) + + Eventually(func(g Gomega) { + events := &corev1.EventList{} + err := testFramework.List(ctx, events, client.InNamespace(k8snamespace)) + g.Expect(err).To(BeNil()) + + found := false + for _, event := range events.Items { + if event.InvolvedObject.Name == httpRoute.Name && + event.Reason == "FailedDeployModel" { + if strings.Contains(event.Message, "Found existing resource not owned by controller") { + found = true + break + } + } + } + g.Expect(found).To(BeTrue()) + }).WithTimeout(2 * time.Minute).Should(Succeed()) + }) + + It("Adding takeover annotation to HttpRoute should allow HttpRoute to takeover the service", func() { + Eventually(func(g Gomega) { + err := testFramework.Get(ctx, client.ObjectKeyFromObject(httpRoute), httpRoute) + g.Expect(err).To(BeNil()) + + httpRoute.Annotations = map[string]string{ + "application-networking.k8s.aws/allow-takeover-from": originalManagedBy, + } + err = testFramework.Update(ctx, httpRoute) + g.Expect(err).To(BeNil()) + }).WithTimeout(2 * time.Minute).Should(Succeed()) + }) + + It("Verify takeover completed successfully", func() { + currentManagedBy := fmt.Sprintf("%s/%s/%s", + testFramework.Cloud.Config().AccountId, + testFramework.Cloud.Config().ClusterName, + testFramework.Cloud.Config().VpcId) + + Eventually(func(g Gomega) { + getRuleResp, err := testFramework.LatticeClient.GetRule(&vpclattice.GetRuleInput{ + ServiceIdentifier: preCreatedServiceArn, + ListenerIdentifier: preCreatedListenerArn, + RuleIdentifier: preCreatedRuleArn, + }) + g.Expect(err).To(BeNil()) + + // Verify service ManagedBy tag updated + serviceTags, err := testFramework.LatticeClient.ListTagsForResource(&vpclattice.ListTagsForResourceInput{ + ResourceArn: preCreatedServiceArn, + }) + g.Expect(err).To(BeNil()) + managedByTag := serviceTags.Tags["application-networking.k8s.aws/ManagedBy"] + g.Expect(*managedByTag).To(Equal(currentManagedBy)) + + // Verify rule now has 2 target groups + g.Expect(getRuleResp.Action.Forward).ToNot(BeNil()) + g.Expect(len(getRuleResp.Action.Forward.TargetGroups)).To(Equal(2)) + g.Expect(*getRuleResp.Action.Forward.TargetGroups[0].Weight).To(Equal(int64(50))) + g.Expect(*getRuleResp.Action.Forward.TargetGroups[1].Weight).To(Equal(int64(50))) + + // Verify original target group is no longer referenced in the rule + for _, tg := range getRuleResp.Action.Forward.TargetGroups { + g.Expect(*tg.TargetGroupIdentifier).ToNot(Equal(*preCreatedTargetGroupArn)) + } + + // Verify rule ManagedBy tag updated + ruleTags, err := testFramework.LatticeClient.ListTagsForResource(&vpclattice.ListTagsForResourceInput{ + ResourceArn: preCreatedRuleArn, + }) + g.Expect(err).To(BeNil()) + ruleManagedByTag := ruleTags.Tags["application-networking.k8s.aws/ManagedBy"] + g.Expect(*ruleManagedByTag).To(Equal(currentManagedBy)) + + // Verify listener ManagedBy tag updated + listenerTags, err := testFramework.LatticeClient.ListTagsForResource(&vpclattice.ListTagsForResourceInput{ + ResourceArn: preCreatedListenerArn, + }) + g.Expect(err).To(BeNil()) + listenerManagedByTag := listenerTags.Tags["application-networking.k8s.aws/ManagedBy"] + g.Expect(*listenerManagedByTag).To(Equal(currentManagedBy)) + + // Verify service network service association ManagedBy tag updated + assocTags, err := testFramework.LatticeClient.ListTagsForResource(&vpclattice.ListTagsForResourceInput{ + ResourceArn: preCreatedAssociationArn, + }) + g.Expect(err).To(BeNil()) + assocManagedByTag := assocTags.Tags["application-networking.k8s.aws/ManagedBy"] + g.Expect(*assocManagedByTag).To(Equal(currentManagedBy)) + + }).WithTimeout(5 * time.Minute).Should(Succeed()) + }) + + AfterAll(func() { + if httpRoute != nil { + testFramework.ExpectDeletedThenNotFound(ctx, httpRoute) + } + if deployment1 != nil && service1 != nil && deployment2 != nil && service2 != nil { + testFramework.ExpectDeletedThenNotFound(ctx, deployment1, service1, deployment2, service2) + } + if preCreatedRuleArn != nil { + _, err := testFramework.LatticeClient.DeleteRule(&vpclattice.DeleteRuleInput{ + ServiceIdentifier: preCreatedServiceArn, + ListenerIdentifier: preCreatedListenerArn, + RuleIdentifier: preCreatedRuleArn, + }) + if err != nil { + if reqErr, ok := err.(awserr.RequestFailure); !ok || reqErr.StatusCode() != 404 { + log.Printf("Failed to delete rule %s: %v", *preCreatedRuleArn, err) + } + } + } + + if preCreatedListenerArn != nil { + _, err := testFramework.LatticeClient.DeleteListener(&vpclattice.DeleteListenerInput{ + ServiceIdentifier: preCreatedServiceArn, + ListenerIdentifier: preCreatedListenerArn, + }) + if err != nil { + if reqErr, ok := err.(awserr.RequestFailure); !ok || reqErr.StatusCode() != 404 { + log.Printf("Failed to delete listener %s: %v", *preCreatedListenerArn, err) + } + } + } + + if preCreatedAssociationArn != nil { + _, err := testFramework.LatticeClient.DeleteServiceNetworkServiceAssociation(&vpclattice.DeleteServiceNetworkServiceAssociationInput{ + ServiceNetworkServiceAssociationIdentifier: preCreatedAssociationArn, + }) + if err != nil { + if reqErr, ok := err.(awserr.RequestFailure); !ok || reqErr.StatusCode() != 404 { + log.Printf("Failed to delete association %s: %v", *preCreatedAssociationArn, err) + } + } else { + Eventually(func(g Gomega) { + _, err := testFramework.LatticeClient.GetServiceNetworkServiceAssociation(&vpclattice.GetServiceNetworkServiceAssociationInput{ + ServiceNetworkServiceAssociationIdentifier: preCreatedAssociationArn, + }) + g.Expect(err).To(HaveOccurred()) + }).WithTimeout(2 * time.Minute).Should(Succeed()) + } + } + + if preCreatedServiceArn != nil { + _, err := testFramework.LatticeClient.DeleteService(&vpclattice.DeleteServiceInput{ + ServiceIdentifier: preCreatedServiceArn, + }) + if err != nil { + if reqErr, ok := err.(awserr.RequestFailure); !ok || reqErr.StatusCode() != 404 { + log.Printf("Failed to delete service %s: %v", *preCreatedServiceArn, err) + } + } + } + + if preCreatedTargetGroupArn != nil { + _, err := testFramework.LatticeClient.DeregisterTargets(&vpclattice.DeregisterTargetsInput{ + TargetGroupIdentifier: preCreatedTargetGroupArn, + Targets: []*vpclattice.Target{ + { + Id: aws.String("192.168.1.100"), + Port: aws.Int64(8090), + }, + }, + }) + if err != nil { + if reqErr, ok := err.(awserr.RequestFailure); !ok || reqErr.StatusCode() != 404 { + log.Printf("Failed to deregister targets from %s: %v", *preCreatedTargetGroupArn, err) + } + } + + _, err = testFramework.LatticeClient.DeleteTargetGroup(&vpclattice.DeleteTargetGroupInput{ + TargetGroupIdentifier: preCreatedTargetGroupArn, + }) + if err != nil { + if reqErr, ok := err.(awserr.RequestFailure); !ok || reqErr.StatusCode() != 404 { + log.Printf("Failed to delete target group %s: %v", *preCreatedTargetGroupArn, err) + } + } + } + }) +}) From 253992397fa1ddfa33aa42837f1b7805eeec154e Mon Sep 17 00:00:00 2001 From: vbedi Date: Thu, 30 Oct 2025 20:15:42 +0000 Subject: [PATCH 2/2] address review comments --- docs/guides/advanced-configurations.md | 2 +- pkg/deploy/lattice/service_manager.go | 3 +-- test/suites/integration/service_takeover_test.go | 15 +++++++-------- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/docs/guides/advanced-configurations.md b/docs/guides/advanced-configurations.md index e109954f..bc6673ed 100644 --- a/docs/guides/advanced-configurations.md +++ b/docs/guides/advanced-configurations.md @@ -135,7 +135,7 @@ spec: ### Blue/Green Multi-Cluster Migration with Service Takeover -For blue/green cluster migrations, the controller supports automated takeover of VPC Lattice services using the `allow-takeover-from` annotation. This eliminates the need for manual ManagedBy tag changes during cluster migrations. +For blue/green cluster migrations, the controller supports automated takeover of VPC Lattice services using the `application-networking.k8s.aws/allow-takeover-from` annotation. The annotation value must match the value of the `application-networking.k8s.aws/ManagedBy` tag on the VPC Lattice Service, which has the format `{AWS_ACCOUNT_ID}/{CLUSTER_NAME}/{VPC_ID}` (e.g., "123456789012/blue-cluster/vpc-0abc123def456789"). This eliminates the need for manual ManagedBy tag changes during cluster migrations. #### Migration Workflow diff --git a/pkg/deploy/lattice/service_manager.go b/pkg/deploy/lattice/service_manager.go index 40f9be60..1be5b5e7 100644 --- a/pkg/deploy/lattice/service_manager.go +++ b/pkg/deploy/lattice/service_manager.go @@ -149,8 +149,7 @@ func (m *defaultServiceManager) checkAndUpdateTags(ctx context.Context, svc *Ser return err } if !owned { - canTakeover := m.canTakeoverService(svc, tagsResp.Tags) - if canTakeover { + if m.canTakeoverService(svc, tagsResp.Tags) { currentOwner := m.cloud.GetManagedByFromTags(tagsResp.Tags) newOwner := m.cloud.DefaultTags()[pkg_aws.TagManagedBy] err = m.transferServiceOwnership(ctx, svcSum.Arn, newOwner) diff --git a/test/suites/integration/service_takeover_test.go b/test/suites/integration/service_takeover_test.go index 124271c7..aa4749a6 100644 --- a/test/suites/integration/service_takeover_test.go +++ b/test/suites/integration/service_takeover_test.go @@ -4,7 +4,6 @@ import ( "fmt" "log" "strings" - "time" "github.com/aws/aws-application-networking-k8s/test/pkg/test" "github.com/aws/aws-sdk-go/aws" @@ -59,7 +58,7 @@ var _ = Describe("Service Takeover Test", Ordered, func() { }) g.Expect(err).To(BeNil()) g.Expect(*getServiceResp.Status).To(Equal("ACTIVE")) - }).WithTimeout(2 * time.Minute).Should(Succeed()) + }).Should(Succeed()) listenerResp, err := testFramework.LatticeClient.CreateListener(&vpclattice.CreateListenerInput{ ServiceIdentifier: preCreatedServiceArn, @@ -99,7 +98,7 @@ var _ = Describe("Service Takeover Test", Ordered, func() { }) g.Expect(err).To(BeNil()) g.Expect(*getTargetGroupResp.Status).To(Equal("ACTIVE")) - }).WithTimeout(2 * time.Minute).Should(Succeed()) + }).Should(Succeed()) _, err = testFramework.LatticeClient.RegisterTargets(&vpclattice.RegisterTargetsInput{ TargetGroupIdentifier: preCreatedTargetGroupArn, @@ -160,7 +159,7 @@ var _ = Describe("Service Takeover Test", Ordered, func() { }) g.Expect(err).To(BeNil()) g.Expect(*getAssocResp.Status).To(Equal("ACTIVE")) - }).WithTimeout(2 * time.Minute).Should(Succeed()) + }).Should(Succeed()) }) It("Creating HTTPRoute without takeover annotation should fail", func() { @@ -238,7 +237,7 @@ var _ = Describe("Service Takeover Test", Ordered, func() { } } g.Expect(found).To(BeTrue()) - }).WithTimeout(2 * time.Minute).Should(Succeed()) + }).Should(Succeed()) }) It("Adding takeover annotation to HttpRoute should allow HttpRoute to takeover the service", func() { @@ -251,7 +250,7 @@ var _ = Describe("Service Takeover Test", Ordered, func() { } err = testFramework.Update(ctx, httpRoute) g.Expect(err).To(BeNil()) - }).WithTimeout(2 * time.Minute).Should(Succeed()) + }).Should(Succeed()) }) It("Verify takeover completed successfully", func() { @@ -311,7 +310,7 @@ var _ = Describe("Service Takeover Test", Ordered, func() { assocManagedByTag := assocTags.Tags["application-networking.k8s.aws/ManagedBy"] g.Expect(*assocManagedByTag).To(Equal(currentManagedBy)) - }).WithTimeout(5 * time.Minute).Should(Succeed()) + }).Should(Succeed()) }) AfterAll(func() { @@ -360,7 +359,7 @@ var _ = Describe("Service Takeover Test", Ordered, func() { ServiceNetworkServiceAssociationIdentifier: preCreatedAssociationArn, }) g.Expect(err).To(HaveOccurred()) - }).WithTimeout(2 * time.Minute).Should(Succeed()) + }).Should(Succeed()) } }