diff --git a/cmd/flagger/main.go b/cmd/flagger/main.go index fa7bb1a94..8984c1f0f 100644 --- a/cmd/flagger/main.go +++ b/cmd/flagger/main.go @@ -88,6 +88,9 @@ var ( kubeconfigServiceMesh string clusterName string noCrossNamespaceRefs bool + istioMultiClusterEnabled bool + istioMultiClusterLabel string + istioMultiClusterNs string ) func init() { @@ -123,6 +126,9 @@ func init() { flag.StringVar(&kubeconfigServiceMesh, "kubeconfig-service-mesh", "", "Path to a kubeconfig for the service mesh control plane cluster.") flag.StringVar(&clusterName, "cluster-name", "", "Cluster name to be included in alert msgs.") flag.BoolVar(&noCrossNamespaceRefs, "no-cross-namespace-refs", false, "When set to true, Flagger can only refer to resources in the same namespace.") + flag.BoolVar(&istioMultiClusterEnabled, "istio-multicluster-enabled", false, "Enable Istio multi-cluster support.") + flag.StringVar(&istioMultiClusterLabel, "istio-multicluster-secret-label", "istio/multiCluster=true", "Label on secrets for Istio multi-cluster discovery.") + flag.StringVar(&istioMultiClusterNs, "istio-multicluster-secret-namespace", "istio-system", "Namespace where Istio multi-cluster secrets are located.") } func main() { @@ -228,7 +234,13 @@ func main() { setOwnerRefs = false } - routerFactory := router.NewFactory(cfg, kubeClient, flaggerClient, knativeClient, ingressAnnotationsPrefix, ingressClass, logger, meshClient, setOwnerRefs) + var clusterManager *router.ClusterManager + if istioMultiClusterEnabled { + clusterManager = router.NewClusterManager(kubeClient, meshClient, logger, istioMultiClusterLabel, istioMultiClusterNs) + clusterManager.Start(stopCh) + } + + routerFactory := router.NewFactory(cfg, kubeClient, flaggerClient, knativeClient, ingressAnnotationsPrefix, ingressClass, logger, meshClient, setOwnerRefs, clusterManager) var configTracker canary.Tracker if enableConfigTracking { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 39c72c373..3d50bd64f 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -159,9 +159,9 @@ func NewController( ctrl.enqueue(new) } else if !newCanary.DeletionTimestamp.IsZero() && hasFinalizer(&newCanary) || - !hasFinalizer(&newCanary) && newCanary.Spec.RevertOnDeletion { + !hasFinalizer(&newCanary) && (newCanary.Spec.RevertOnDeletion || ctrl.routerFactory.IsMultiClusterEnabled()) { // If this was marked for deletion and has finalizers enqueue for finalizing or - // if this canary doesn't have finalizers and RevertOnDeletion is true updated speck enqueue + // if this canary doesn't have finalizers and RevertOnDeletion is true (or multi-cluster is enabled) updated speck enqueue ctrl.enqueue(new) } @@ -266,8 +266,8 @@ func (c *Controller) syncHandler(key string) error { return fmt.Errorf("invalid canary spec: %s", err) } - // Finalize if canary has been marked for deletion and revert is desired - if cd.Spec.RevertOnDeletion && cd.ObjectMeta.DeletionTimestamp != nil { + // Finalize if canary has been marked for deletion and revert is desired (or multi-cluster is enabled) + if (cd.Spec.RevertOnDeletion || c.routerFactory.IsMultiClusterEnabled()) && cd.ObjectMeta.DeletionTimestamp != nil { // If finalizers have been previously removed proceed if !hasFinalizer(cd) { c.logger.Infof("Canary %s.%s has been finalized", cd.Name, cd.Namespace) @@ -306,8 +306,8 @@ func (c *Controller) syncHandler(key string) error { c.canaries.Store(fmt.Sprintf("%s.%s", cd.Name, cd.Namespace), cd) - // If opt in for revertOnDeletion add finalizer if not present - if cd.Spec.RevertOnDeletion && !hasFinalizer(cd) { + // If opt in for revertOnDeletion (or multi-cluster is enabled) add finalizer if not present + if (cd.Spec.RevertOnDeletion || c.routerFactory.IsMultiClusterEnabled()) && !hasFinalizer(cd) { if err := c.addFinalizer(cd); err != nil { return fmt.Errorf("unable to add finalizer to canary %s.%s: %w", cd.Name, cd.Namespace, err) } diff --git a/pkg/controller/finalizer.go b/pkg/controller/finalizer.go index 57ab04863..33ebb3674 100644 --- a/pkg/controller/finalizer.go +++ b/pkg/controller/finalizer.go @@ -18,8 +18,8 @@ package controller import ( "context" - "errors" "fmt" + "strings" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -80,12 +80,12 @@ func (c *Controller) finalize(old interface{}) error { Cap: canary.GetAnalysisInterval(), Steps: 4, } - retry.OnError(backoff, func(err error) bool { - return err.Error() == "retriable error" + err = retry.OnError(backoff, func(err error) bool { + return strings.Contains(err.Error(), "retriable: ") }, func() error { retriable, err := canaryController.IsCanaryReady(canary) if err != nil && retriable { - return errors.New("retriable error") + return fmt.Errorf("retriable: %w", err) } return err }) diff --git a/pkg/controller/multi_cluster_controller_test.go b/pkg/controller/multi_cluster_controller_test.go new file mode 100644 index 000000000..831f1ef15 --- /dev/null +++ b/pkg/controller/multi_cluster_controller_test.go @@ -0,0 +1,68 @@ +package controller + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/fluxcd/flagger/pkg/router" +) + +func TestController_MultiClusterFinalizer(t *testing.T) { + // 1. Setup Canary WITHOUT RevertOnDeletion + canary := newDeploymentTestCanary() + canary.Spec.RevertOnDeletion = false + + // 2. Setup Fixture with Multi-Cluster ENABLED + // We'll mock the ClusterManager directly in the factory + mocks := newDeploymentFixture(canary) + + // Inject ClusterManager to simulate multi-cluster enabled + cm := router.NewClusterManager(mocks.kubeClient, mocks.flaggerClient, mocks.logger, "istio/multiCluster=true", "istio-system") + rf := router.NewFactory(nil, mocks.kubeClient, mocks.flaggerClient, nil, "", "", mocks.logger, mocks.flaggerClient, true, cm) + + mocks.ctrl.routerFactory = rf // Override with our multi-cluster factory + + // 3. Sync the canary - this should ADD the finalizer because multi-cluster is enabled + key := fmt.Sprintf("%s/%s", canary.Namespace, canary.Name) + err := mocks.ctrl.syncHandler(key) + require.NoError(t, err) + + // 4. Verify Finalizer was added + c, err := mocks.flaggerClient.FlaggerV1beta1().Canaries(canary.Namespace).Get(context.Background(), canary.Name, metav1.GetOptions{}) + require.NoError(t, err) + assert.True(t, hasFinalizer(c), "Finalizer should be added in multi-cluster mode even if RevertOnDeletion is false") + + // Initialize canary (creates primary, set status etc) + mocks.ctrl.advanceCanary(canary.Name, canary.Namespace) + mocks.makePrimaryReady(t) + mocks.makeCanaryReady(t) + mocks.ctrl.advanceCanary(canary.Name, canary.Namespace) + + // Update informer indexer + c, _ = mocks.flaggerClient.FlaggerV1beta1().Canaries(canary.Namespace).Get(context.Background(), canary.Name, metav1.GetOptions{}) + err = mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Update(c) + require.NoError(t, err) + + // 5. Mark for Deletion + now := metav1.Now() + c.DeletionTimestamp = &now + c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries(canary.Namespace).Update(context.Background(), c, metav1.UpdateOptions{}) + require.NoError(t, err) + // Update informer indexer + err = mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Update(c) + require.NoError(t, err) + + // 6. Sync again - this should TRIGGER finalization + err = mocks.ctrl.syncHandler(key) + require.NoError(t, err) + + // 7. Verify Finalizer was removed (after successful finalization) + c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries(canary.Namespace).Get(context.Background(), canary.Name, metav1.GetOptions{}) + require.NoError(t, err) + assert.False(t, hasFinalizer(c), "Finalizer should be removed after successful finalization") +} diff --git a/pkg/controller/scheduler.go b/pkg/controller/scheduler.go index 12a94befc..26bd23daf 100644 --- a/pkg/controller/scheduler.go +++ b/pkg/controller/scheduler.go @@ -550,9 +550,6 @@ func (c *Controller) runPromotionTrafficShift(canary *flaggerv1.Canary, canaryCo } } } - - return - } func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller, diff --git a/pkg/controller/scheduler_daemonset_fixture_test.go b/pkg/controller/scheduler_daemonset_fixture_test.go index afbd0881b..843f4fa6a 100644 --- a/pkg/controller/scheduler_daemonset_fixture_test.go +++ b/pkg/controller/scheduler_daemonset_fixture_test.go @@ -92,7 +92,7 @@ func newDaemonSetFixture(c *flaggerv1.Canary) daemonSetFixture { } // init router - rf := router.NewFactory(nil, kubeClient, flaggerClient, nil, "annotationsPrefix", "", logger, flaggerClient, true) + rf := router.NewFactory(nil, kubeClient, flaggerClient, nil, "annotationsPrefix", "", logger, flaggerClient, true, nil) // init observer observerFactory, _ := observers.NewFactory(testMetricsServerURL) diff --git a/pkg/controller/scheduler_deployment_fixture_test.go b/pkg/controller/scheduler_deployment_fixture_test.go index 4efb9b639..b29695df4 100644 --- a/pkg/controller/scheduler_deployment_fixture_test.go +++ b/pkg/controller/scheduler_deployment_fixture_test.go @@ -121,7 +121,7 @@ func newDeploymentFixture(c *flaggerv1.Canary) fixture { } // init router - rf := router.NewFactory(nil, kubeClient, flaggerClient, nil, "annotationsPrefix", "", logger, flaggerClient, true) + rf := router.NewFactory(nil, kubeClient, flaggerClient, nil, "annotationsPrefix", "", logger, flaggerClient, true, nil) // init observer observerFactory, _ := observers.NewFactory(testMetricsServerURL) diff --git a/pkg/loadtester/gate.go b/pkg/loadtester/gate.go index 324124894..46842b49a 100644 --- a/pkg/loadtester/gate.go +++ b/pkg/loadtester/gate.go @@ -16,7 +16,12 @@ limitations under the License. package loadtester -import "sync" +import ( + "fmt" + "sync" + + flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" +) type GateStorage struct { backend string @@ -45,3 +50,15 @@ func (gs *GateStorage) isOpen(key string) (locked bool) { } return } + +func gateKey(payload *flaggerv1.CanaryWebhookPayload) string { + key := fmt.Sprintf("%s.%s", payload.Name, payload.Namespace) + if payload.Checksum != "" { + return fmt.Sprintf("%s.%s", key, payload.Checksum) + } + return key +} + +func rollbackKey(payload *flaggerv1.CanaryWebhookPayload) string { + return fmt.Sprintf("rollback.%s", gateKey(payload)) +} diff --git a/pkg/loadtester/server.go b/pkg/loadtester/server.go index 5f7a3674e..716855391 100644 --- a/pkg/loadtester/server.go +++ b/pkg/loadtester/server.go @@ -66,7 +66,7 @@ func ListenAndServe(port string, timeout time.Duration, logger *zap.SugaredLogge return } - canaryName := fmt.Sprintf("%s.%s", canary.Name, canary.Namespace) + canaryName := gateKey(canary) approved := gate.isOpen(canaryName) if approved { w.WriteHeader(http.StatusOK) @@ -102,7 +102,7 @@ func ListenAndServe(port string, timeout time.Duration, logger *zap.SugaredLogge return } - canaryName := fmt.Sprintf("%s.%s", canary.Name, canary.Namespace) + canaryName := gateKey(canary) gate.open(canaryName) w.WriteHeader(http.StatusAccepted) @@ -133,7 +133,7 @@ func ListenAndServe(port string, timeout time.Duration, logger *zap.SugaredLogge return } - canaryName := fmt.Sprintf("%s.%s", canary.Name, canary.Namespace) + canaryName := gateKey(canary) gate.close(canaryName) w.WriteHeader(http.StatusAccepted) @@ -164,7 +164,7 @@ func ListenAndServe(port string, timeout time.Duration, logger *zap.SugaredLogge return } - canaryName := fmt.Sprintf("rollback.%s.%s", canary.Name, canary.Namespace) + canaryName := rollbackKey(canary) approved := gate.isOpen(canaryName) if approved { w.WriteHeader(http.StatusOK) @@ -199,7 +199,7 @@ func ListenAndServe(port string, timeout time.Duration, logger *zap.SugaredLogge return } - canaryName := fmt.Sprintf("rollback.%s.%s", canary.Name, canary.Namespace) + canaryName := rollbackKey(canary) gate.open(canaryName) w.WriteHeader(http.StatusAccepted) @@ -229,7 +229,7 @@ func ListenAndServe(port string, timeout time.Duration, logger *zap.SugaredLogge return } - canaryName := fmt.Sprintf("rollback.%s.%s", canary.Name, canary.Namespace) + canaryName := rollbackKey(canary) gate.close(canaryName) w.WriteHeader(http.StatusAccepted) diff --git a/pkg/loadtester/server_test.go b/pkg/loadtester/server_test.go index f45fe8b14..be7195898 100644 --- a/pkg/loadtester/server_test.go +++ b/pkg/loadtester/server_test.go @@ -84,6 +84,28 @@ func TestServer_HandleNewBashTaskCmdExitNonZero(t *testing.T) { assert.Equal(t, "command false failed: : exit status 1", resp.Body.String()) } +func TestGateStorageScopesApprovalByChecksum(t *testing.T) { + gate := NewGateStorage("in-memory") + current := &flaggerv1.CanaryWebhookPayload{Name: "podinfo", Namespace: "test", Checksum: "abc123"} + stale := &flaggerv1.CanaryWebhookPayload{Name: "podinfo", Namespace: "test", Checksum: "def456"} + + gate.open(gateKey(current)) + + assert.True(t, gate.isOpen(gateKey(current))) + assert.False(t, gate.isOpen(gateKey(stale))) +} + +func TestGateStorageScopesRollbackByChecksum(t *testing.T) { + gate := NewGateStorage("in-memory") + current := &flaggerv1.CanaryWebhookPayload{Name: "podinfo", Namespace: "test", Checksum: "abc123"} + stale := &flaggerv1.CanaryWebhookPayload{Name: "podinfo", Namespace: "test", Checksum: "def456"} + + gate.open(rollbackKey(current)) + + assert.True(t, gate.isOpen(rollbackKey(current))) + assert.False(t, gate.isOpen(rollbackKey(stale))) +} + func newJsonRequest(method string, url string, v interface{}) *http.Request { payload, _ := json.Marshal(v) req, _ := http.NewRequest(method, url, bytes.NewReader(payload)) diff --git a/pkg/router/factory.go b/pkg/router/factory.go index 838cd2bfe..63b9c4aed 100644 --- a/pkg/router/factory.go +++ b/pkg/router/factory.go @@ -38,6 +38,7 @@ type Factory struct { ingressClass string logger *zap.SugaredLogger setOwnerRefs bool + clusterManager *ClusterManager } func NewFactory(kubeConfig *restclient.Config, kubeClient kubernetes.Interface, @@ -47,7 +48,8 @@ func NewFactory(kubeConfig *restclient.Config, kubeClient kubernetes.Interface, ingressClass string, logger *zap.SugaredLogger, meshClient clientset.Interface, - setOwnerRefs bool) *Factory { + setOwnerRefs bool, + clusterManager *ClusterManager) *Factory { return &Factory{ kubeConfig: kubeConfig, meshClient: meshClient, @@ -58,6 +60,7 @@ func NewFactory(kubeConfig *restclient.Config, kubeClient kubernetes.Interface, ingressClass: ingressClass, logger: logger, setOwnerRefs: setOwnerRefs, + clusterManager: clusterManager, } } @@ -109,11 +112,12 @@ func (factory *Factory) MeshRouter(provider string, labelSelector string) Interf } case provider == flaggerv1.IstioProvider: return &IstioRouter{ - logger: factory.logger, - flaggerClient: factory.flaggerClient, - kubeClient: factory.kubeClient, - istioClient: factory.meshClient, - setOwnerRefs: factory.setOwnerRefs, + logger: factory.logger, + flaggerClient: factory.flaggerClient, + kubeClient: factory.kubeClient, + istioClients: factory.getMeshClients(), + setOwnerRefs: factory.setOwnerRefs, + clusterManager: factory.clusterManager, } case strings.HasPrefix(provider, flaggerv1.SMIProvider+":v1alpha1"): mesh := strings.TrimPrefix(provider, flaggerv1.SMIProvider+":v1alpha1:") @@ -225,11 +229,24 @@ func (factory *Factory) MeshRouter(provider string, labelSelector string) Interf return &NopRouter{} default: return &IstioRouter{ - logger: factory.logger, - flaggerClient: factory.flaggerClient, - kubeClient: factory.kubeClient, - istioClient: factory.meshClient, - setOwnerRefs: factory.setOwnerRefs, + logger: factory.logger, + flaggerClient: factory.flaggerClient, + kubeClient: factory.kubeClient, + istioClients: factory.getMeshClients(), + setOwnerRefs: factory.setOwnerRefs, + clusterManager: factory.clusterManager, } } } + +func (factory *Factory) getMeshClients() []clientset.Interface { + if factory.clusterManager != nil { + return factory.clusterManager.GetClients() + } + return []clientset.Interface{factory.meshClient} +} + +// IsMultiClusterEnabled returns true if multi-cluster support is enabled +func (factory *Factory) IsMultiClusterEnabled() bool { + return factory.clusterManager != nil +} diff --git a/pkg/router/istio.go b/pkg/router/istio.go index 6a63c2bc9..d6709bcde 100644 --- a/pkg/router/istio.go +++ b/pkg/router/istio.go @@ -40,11 +40,12 @@ import ( // IstioRouter is managing Istio virtual services type IstioRouter struct { - kubeClient kubernetes.Interface - istioClient clientset.Interface - flaggerClient clientset.Interface - logger *zap.SugaredLogger - setOwnerRefs bool + kubeClient kubernetes.Interface + istioClients []clientset.Interface + flaggerClient clientset.Interface + logger *zap.SugaredLogger + setOwnerRefs bool + clusterManager *ClusterManager } const cookieHeader = "Cookie" @@ -53,31 +54,65 @@ const maxAgeAttr = "Max-Age" var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") +// resolveNamespace checks if a namespace exists on a remote cluster. +// For the local cluster, always returns the original namespace. +func (ir *IstioRouter) resolveNamespace(cluster MultiClusterClient, namespace string) string { + if ir.clusterManager != nil { + return ir.clusterManager.ResolveNamespace(cluster, namespace) + } + return namespace +} + // Reconcile creates or updates the Istio virtual service and destination rules func (ir *IstioRouter) Reconcile(canary *flaggerv1.Canary) error { + clusters := ir.getClusters() + if len(clusters) == 0 { + return fmt.Errorf("no Istio clients available") + } + + // Local cluster (first client) — must succeed + if err := ir.reconcileWithClient(canary, clusters[0].IstioClient); err != nil { + return err + } + + // Remote clusters — best effort, with automated namespace creation + for i := 1; i < len(clusters); i++ { + ns := ir.resolveNamespace(clusters[i], canary.Namespace) + remoteCanary := canary.DeepCopy() + remoteCanary.Namespace = ns + if err := ir.reconcileWithClient(remoteCanary, clusters[i].IstioClient); err != nil { + ir.logger.Warnf("Multi-cluster: failed to reconcile on remote cluster %d: %v", i, err) + } else { + ir.logger.Infof("Multi-cluster: successfully reconciled on remote cluster %d in namespace %s", i, ns) + } + } + return nil +} + +func (ir *IstioRouter) reconcileWithClient(canary *flaggerv1.Canary, istioClient clientset.Interface) error { _, primaryName, canaryName := canary.GetServiceNames() - if err := ir.reconcileDestinationRule(canary, canaryName); err != nil { + if err := ir.reconcileDestinationRule(canary, canaryName, istioClient); err != nil { return fmt.Errorf("reconcileDestinationRule failed: %w", err) } - if err := ir.reconcileDestinationRule(canary, primaryName); err != nil { + if err := ir.reconcileDestinationRule(canary, primaryName, istioClient); err != nil { return fmt.Errorf("reconcileDestinationRule failed: %w", err) } - if err := ir.reconcileVirtualService(canary); err != nil { + if err := ir.reconcileVirtualService(canary, istioClient); err != nil { return fmt.Errorf("reconcileVirtualService failed: %w", err) } return nil } -func (ir *IstioRouter) reconcileDestinationRule(canary *flaggerv1.Canary, name string) error { +func (ir *IstioRouter) reconcileDestinationRule(canary *flaggerv1.Canary, name string, istioClient clientset.Interface) error { newSpec := istiov1beta1.DestinationRuleSpec{ Host: name, TrafficPolicy: canary.Spec.Service.TrafficPolicy, } - destinationRule, err := ir.istioClient.NetworkingV1beta1().DestinationRules(canary.Namespace).Get(context.TODO(), name, metav1.GetOptions{}) + destinationRule, err := istioClient.NetworkingV1beta1().DestinationRules(canary.Namespace).Get(context.TODO(), name, metav1.GetOptions{}) // insert if errors.IsNotFound(err) { destinationRule = &istiov1beta1.DestinationRule{ @@ -96,7 +131,7 @@ func (ir *IstioRouter) reconcileDestinationRule(canary *flaggerv1.Canary, name s }), } } - _, err = ir.istioClient.NetworkingV1beta1().DestinationRules(canary.Namespace).Create(context.TODO(), destinationRule, metav1.CreateOptions{}) + _, err = istioClient.NetworkingV1beta1().DestinationRules(canary.Namespace).Create(context.TODO(), destinationRule, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("DestinationRule %s.%s create error: %w", name, canary.Namespace, err) } @@ -112,7 +147,7 @@ func (ir *IstioRouter) reconcileDestinationRule(canary *flaggerv1.Canary, name s if diff := cmp.Diff(newSpec, destinationRule.Spec); diff != "" { clone := destinationRule.DeepCopy() clone.Spec = newSpec - _, err = ir.istioClient.NetworkingV1beta1().DestinationRules(canary.Namespace).Update(context.TODO(), clone, metav1.UpdateOptions{}) + _, err = istioClient.NetworkingV1beta1().DestinationRules(canary.Namespace).Update(context.TODO(), clone, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("DestinationRule %s.%s update error: %w", name, canary.Namespace, err) } @@ -141,7 +176,7 @@ func canaryToL4Match(canary *flaggerv1.Canary) []istiov1beta1.L4MatchAttributes return match } -func (ir *IstioRouter) reconcileVirtualService(canary *flaggerv1.Canary) error { +func (ir *IstioRouter) reconcileVirtualService(canary *flaggerv1.Canary, istioClient clientset.Interface) error { apexName, primaryName, canaryName := canary.GetServiceNames() if canary.Spec.Service.Delegation { @@ -260,7 +295,7 @@ func (ir *IstioRouter) reconcileVirtualService(canary *flaggerv1.Canary) error { } } - virtualService, err := ir.istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Get(context.TODO(), apexName, metav1.GetOptions{}) + virtualService, err := istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Get(context.TODO(), apexName, metav1.GetOptions{}) // insert if errors.IsNotFound(err) { virtualService = &istiov1beta1.VirtualService{ @@ -281,7 +316,7 @@ func (ir *IstioRouter) reconcileVirtualService(canary *flaggerv1.Canary) error { }), } } - _, err = ir.istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Create(context.TODO(), virtualService, metav1.CreateOptions{}) + _, err = istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Create(context.TODO(), virtualService, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("VirtualService %s.%s create error: %w", apexName, canary.Namespace, err) } @@ -359,7 +394,7 @@ func (ir *IstioRouter) reconcileVirtualService(canary *flaggerv1.Canary) error { vtClone.ObjectMeta.Annotations[configAnnotation] = string(b) } - _, err = ir.istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Update(context.TODO(), vtClone, metav1.UpdateOptions{}) + _, err = istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Update(context.TODO(), vtClone, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("VirtualService %s.%s update error: %w", apexName, canary.Namespace, err) } @@ -380,7 +415,7 @@ func (ir *IstioRouter) GetRoutes(canary *flaggerv1.Canary) ( ) { apexName, primaryName, canaryName := canary.GetServiceNames() vs := &istiov1beta1.VirtualService{} - vs, err = ir.istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Get(context.TODO(), apexName, metav1.GetOptions{}) + vs, err = ir.getClusters()[0].IstioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Get(context.TODO(), apexName, metav1.GetOptions{}) if err != nil { err = fmt.Errorf("VirtualService %s.%s get query error %v", apexName, canary.Namespace, err) return @@ -471,10 +506,41 @@ func (ir *IstioRouter) SetRoutes( primaryWeight int, canaryWeight int, mirrored bool, +) error { + clusters := ir.getClusters() + if len(clusters) == 0 { + return fmt.Errorf("no Istio clients available") + } + + // Local cluster (first client) — must succeed + if err := ir.setRoutesWithClient(canary, primaryWeight, canaryWeight, mirrored, clusters[0].IstioClient); err != nil { + return err + } + + // Remote clusters — best effort, with automated namespace creation + for i := 1; i < len(clusters); i++ { + ns := ir.resolveNamespace(clusters[i], canary.Namespace) + remoteCanary := canary.DeepCopy() + remoteCanary.Namespace = ns + if err := ir.setRoutesWithClient(remoteCanary, primaryWeight, canaryWeight, mirrored, clusters[i].IstioClient); err != nil { + ir.logger.Warnf("Multi-cluster: failed to set routes on remote cluster %d: %v", i, err) + } else { + ir.logger.Infof("Multi-cluster: successfully set routes on remote cluster %d in namespace %s", i, ns) + } + } + return nil +} + +func (ir *IstioRouter) setRoutesWithClient( + canary *flaggerv1.Canary, + primaryWeight int, + canaryWeight int, + mirrored bool, + istioClient clientset.Interface, ) error { apexName, primaryName, canaryName := canary.GetServiceNames() - vs, err := ir.istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Get(context.TODO(), apexName, metav1.GetOptions{}) + vs, err := istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Get(context.TODO(), apexName, metav1.GetOptions{}) if err != nil { return fmt.Errorf("VirtualService %s.%s get query error %v", apexName, canary.Namespace, err) } @@ -494,7 +560,7 @@ func (ir *IstioRouter) SetRoutes( weightedRoute, } - vs, err = ir.istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Update(context.TODO(), vsCopy, metav1.UpdateOptions{}) + vs, err = istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Update(context.TODO(), vsCopy, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("VirtualService %s.%s update failed: %w", apexName, canary.Namespace, err) } @@ -567,7 +633,7 @@ func (ir *IstioRouter) SetRoutes( } } - vs, err = ir.istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Update(context.TODO(), vsCopy, metav1.UpdateOptions{}) + vs, err = istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Update(context.TODO(), vsCopy, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("VirtualService %s.%s update failed: %w", apexName, canary.Namespace, err) } @@ -575,11 +641,47 @@ func (ir *IstioRouter) SetRoutes( } func (ir *IstioRouter) Finalize(canary *flaggerv1.Canary) error { - // Need to see if I can get the annotation orig-configuration - apexName, _, _ := canary.GetServiceNames() + clusters := ir.getClusters() + if len(clusters) == 0 { + return fmt.Errorf("no Istio clients available") + } + + // Local cluster (first client) — must succeed + if err := ir.finalizeWithClient(canary, clusters[0].IstioClient, false); err != nil { + return err + } + + // Remote clusters — best effort, with automated namespace creation + for i := 1; i < len(clusters); i++ { + ns := ir.resolveNamespace(clusters[i], canary.Namespace) + remoteCanary := canary.DeepCopy() + remoteCanary.Namespace = ns + if err := ir.finalizeWithClient(remoteCanary, clusters[i].IstioClient, true); err != nil { + ir.logger.Warnf("Multi-cluster: failed to finalize on remote cluster %d: %v", i, err) + } else { + ir.logger.Infof("Multi-cluster: successfully finalized on remote cluster %d in namespace %s", i, ns) + } + } + return nil +} + +func (ir *IstioRouter) finalizeWithClient(canary *flaggerv1.Canary, istioClient clientset.Interface, isRemote bool) error { + apexName, primaryName, canaryName := canary.GetServiceNames() - vs, err := ir.istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Get(context.TODO(), apexName, metav1.GetOptions{}) + // On remote clusters, DestinationRules must be explicitly deleted as OwnerReferences don't work across clusters + if isRemote { + for _, name := range []string{primaryName, canaryName} { + if err := istioClient.NetworkingV1beta1().DestinationRules(canary.Namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { + ir.logger.Warnf("Multi-cluster: failed to delete DestinationRule %s on remote cluster: %v", name, err) + } + } + } + + vs, err := istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Get(context.TODO(), apexName, metav1.GetOptions{}) if err != nil { + if errors.IsNotFound(err) && isRemote { + return nil + } return fmt.Errorf("VirtualService %s.%s get query error: %w", apexName, canary.Namespace, err) } @@ -597,6 +699,13 @@ func (ir *IstioRouter) Finalize(canary *flaggerv1.Canary) error { apexName, canary.Namespace, configAnnotation) } } else { + // If original configuration is not found and it's a remote cluster, delete the VirtualService + if isRemote { + if err := istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Delete(context.TODO(), apexName, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("Multi-cluster: failed to delete VirtualService %s on remote cluster: %w", apexName, err) + } + return nil + } ir.logger.Warnf("VirtualService %s.%s original configuration not found, unable to revert", apexName, canary.Namespace) return nil } @@ -604,13 +713,27 @@ func (ir *IstioRouter) Finalize(canary *flaggerv1.Canary) error { clone := vs.DeepCopy() clone.Spec = storedSpec - _, err = ir.istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Update(context.TODO(), clone, metav1.UpdateOptions{}) + _, err = istioClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Update(context.TODO(), clone, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("VirtualService %s.%s update error: %w", apexName, canary.Namespace, err) } return nil } +func (ir *IstioRouter) getClusters() []MultiClusterClient { + if ir.clusterManager != nil { + return ir.clusterManager.GetMultiClusterClients() + } + var clients []MultiClusterClient + for _, c := range ir.istioClients { + clients = append(clients, MultiClusterClient{ + IstioClient: c, + KubeClient: ir.kubeClient, + }) + } + return clients +} + // mergeMatchConditions appends the URI match rules to canary conditions func mergeMatchConditions(canary, defaults []istiov1beta1.HTTPMatchRequest) []istiov1beta1.HTTPMatchRequest { if len(defaults) == 0 { diff --git a/pkg/router/istio_test.go b/pkg/router/istio_test.go index 1c2423bd3..dde3e03f8 100644 --- a/pkg/router/istio_test.go +++ b/pkg/router/istio_test.go @@ -33,6 +33,7 @@ import ( flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" istiov1alpha1 "github.com/fluxcd/flagger/pkg/apis/istio/common/v1alpha1" istiov1beta1 "github.com/fluxcd/flagger/pkg/apis/istio/v1beta1" + clientset "github.com/fluxcd/flagger/pkg/client/clientset/versioned" ) func TestUnmarshalVirtualService(t *testing.T) { @@ -80,7 +81,7 @@ func TestIstioRouter_Sync(t *testing.T) { router := &IstioRouter{ logger: mocks.logger, flaggerClient: mocks.flaggerClient, - istioClient: mocks.meshClient, + istioClients: []clientset.Interface{mocks.meshClient}, kubeClient: mocks.kubeClient, } @@ -148,7 +149,7 @@ func TestIstioRouter_SetRoutes(t *testing.T) { router := &IstioRouter{ logger: mocks.logger, flaggerClient: mocks.flaggerClient, - istioClient: mocks.meshClient, + istioClients: []clientset.Interface{mocks.meshClient}, kubeClient: mocks.kubeClient, } @@ -418,7 +419,7 @@ func TestIstioRouter_getSessionAffinityRoutes(t *testing.T) { router := &IstioRouter{ logger: mocks.logger, flaggerClient: mocks.flaggerClient, - istioClient: mocks.meshClient, + istioClients: []clientset.Interface{mocks.meshClient}, kubeClient: mocks.kubeClient, } @@ -490,7 +491,7 @@ func TestIstioRouter_getSessionAffinityRoutes(t *testing.T) { router := &IstioRouter{ logger: mocks.logger, flaggerClient: mocks.flaggerClient, - istioClient: mocks.meshClient, + istioClients: []clientset.Interface{mocks.meshClient}, kubeClient: mocks.kubeClient, } @@ -579,7 +580,7 @@ func TestIstioRouter_GetRoutes(t *testing.T) { router := &IstioRouter{ logger: mocks.logger, flaggerClient: mocks.flaggerClient, - istioClient: mocks.meshClient, + istioClients: []clientset.Interface{mocks.meshClient}, kubeClient: mocks.kubeClient, } @@ -635,7 +636,7 @@ func TestIstioRouter_HTTPRequestHeaders(t *testing.T) { router := &IstioRouter{ logger: mocks.logger, flaggerClient: mocks.flaggerClient, - istioClient: mocks.meshClient, + istioClients: []clientset.Interface{mocks.meshClient}, kubeClient: mocks.kubeClient, } @@ -655,7 +656,7 @@ func TestIstioRouter_CORS(t *testing.T) { router := &IstioRouter{ logger: mocks.logger, flaggerClient: mocks.flaggerClient, - istioClient: mocks.meshClient, + istioClients: []clientset.Interface{mocks.meshClient}, kubeClient: mocks.kubeClient, } @@ -675,7 +676,7 @@ func TestIstioRouter_ABTest(t *testing.T) { router := &IstioRouter{ logger: mocks.logger, flaggerClient: mocks.flaggerClient, - istioClient: mocks.meshClient, + istioClients: []clientset.Interface{mocks.meshClient}, kubeClient: mocks.kubeClient, } @@ -725,7 +726,7 @@ func TestIstioRouter_GatewayPort(t *testing.T) { router := &IstioRouter{ logger: mocks.logger, flaggerClient: mocks.flaggerClient, - istioClient: mocks.meshClient, + istioClients: []clientset.Interface{mocks.meshClient}, kubeClient: mocks.kubeClient, } @@ -749,7 +750,7 @@ func TestIstioRouter_Delegate(t *testing.T) { router := &IstioRouter{ logger: mocks.logger, flaggerClient: mocks.flaggerClient, - istioClient: mocks.meshClient, + istioClients: []clientset.Interface{mocks.meshClient}, kubeClient: mocks.kubeClient, } @@ -780,7 +781,7 @@ func TestIstioRouter_Delegate(t *testing.T) { router := &IstioRouter{ logger: mocks.logger, flaggerClient: mocks.flaggerClient, - istioClient: mocks.meshClient, + istioClients: []clientset.Interface{mocks.meshClient}, kubeClient: mocks.kubeClient, } @@ -794,7 +795,7 @@ func TestIstioRouter_Finalize(t *testing.T) { router := &IstioRouter{ logger: mocks.logger, flaggerClient: mocks.flaggerClient, - istioClient: mocks.meshClient, + istioClients: []clientset.Interface{mocks.meshClient}, kubeClient: mocks.kubeClient, } @@ -847,7 +848,7 @@ func TestIstioRouter_Finalize(t *testing.T) { for _, table := range tables { var err error if table.createVS { - vs, err := router.istioClient.NetworkingV1beta1().VirtualServices(table.canary.Namespace).Get(context.TODO(), table.canary.Name, metav1.GetOptions{}) + vs, err := router.getClusters()[0].IstioClient.NetworkingV1beta1().VirtualServices(table.canary.Namespace).Get(context.TODO(), table.canary.Name, metav1.GetOptions{}) require.NoError(t, err) if vs.Annotations == nil { @@ -862,7 +863,7 @@ func TestIstioRouter_Finalize(t *testing.T) { case "kubectl": vs.Annotations[kubectlAnnotation] = `{"apiVersion": "networking.istio.io/v1beta1","kind": "VirtualService","metadata": {"annotations": {},"name": "podinfo","namespace": "test"}, "spec": {"gateways": ["istio-system/ingressgateway"],"hosts": ["podinfo"],"http": [{"route": [{"destination": {"host": "podinfo"}}]}]}}` } - _, err = router.istioClient.NetworkingV1beta1().VirtualServices(table.canary.Namespace).Update(context.TODO(), vs, metav1.UpdateOptions{}) + _, err = router.getClusters()[0].IstioClient.NetworkingV1beta1().VirtualServices(table.canary.Namespace).Update(context.TODO(), vs, metav1.UpdateOptions{}) require.NoError(t, err) } @@ -879,7 +880,7 @@ func TestIstioRouter_Finalize(t *testing.T) { } if table.spec != nil { - vs, err := router.istioClient.NetworkingV1beta1().VirtualServices(table.canary.Namespace).Get(context.TODO(), table.canary.Name, metav1.GetOptions{}) + vs, err := router.getClusters()[0].IstioClient.NetworkingV1beta1().VirtualServices(table.canary.Namespace).Get(context.TODO(), table.canary.Name, metav1.GetOptions{}) require.NoError(t, err) require.Equal(t, *table.spec, vs.Spec) } @@ -891,7 +892,7 @@ func TestIstioRouter_Match(t *testing.T) { router := &IstioRouter{ logger: mocks.logger, flaggerClient: mocks.flaggerClient, - istioClient: mocks.meshClient, + istioClients: []clientset.Interface{mocks.meshClient}, kubeClient: mocks.kubeClient, } @@ -1011,7 +1012,7 @@ func TestIstioRouter_SetRoutesTCP(t *testing.T) { router := &IstioRouter{ logger: mocks.logger, flaggerClient: mocks.flaggerClient, - istioClient: mocks.meshClient, + istioClients: []clientset.Interface{mocks.meshClient}, kubeClient: mocks.kubeClient, } @@ -1051,7 +1052,7 @@ func TestIstioRouter_GetRoutesTCP(t *testing.T) { router := &IstioRouter{ logger: mocks.logger, flaggerClient: mocks.flaggerClient, - istioClient: mocks.meshClient, + istioClients: []clientset.Interface{mocks.meshClient}, kubeClient: mocks.kubeClient, } diff --git a/pkg/router/multi_cluster.go b/pkg/router/multi_cluster.go new file mode 100644 index 000000000..b0bde8de4 --- /dev/null +++ b/pkg/router/multi_cluster.go @@ -0,0 +1,217 @@ +/* +Copyright 2020 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package router + +import ( + "context" + "sync" + "time" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" + + clientset "github.com/fluxcd/flagger/pkg/client/clientset/versioned" +) + +// MultiClusterClient holds both clients for a remote cluster +type MultiClusterClient struct { + IstioClient clientset.Interface + KubeClient kubernetes.Interface +} + +// ClusterManager manages Istio clients for multiple clusters discovered via Secrets +type ClusterManager struct { + kubeClient kubernetes.Interface + meshClient clientset.Interface + logger *zap.SugaredLogger + secretLabel string + secretNamespace string + clients *sync.Map +} + +// NewClusterManager returns a new ClusterManager +func NewClusterManager( + kubeClient kubernetes.Interface, + meshClient clientset.Interface, + logger *zap.SugaredLogger, + secretLabel string, + secretNamespace string, +) *ClusterManager { + return &ClusterManager{ + kubeClient: kubeClient, + meshClient: meshClient, + logger: logger, + secretLabel: secretLabel, + secretNamespace: secretNamespace, + clients: new(sync.Map), + } +} + +// Start watching for multi-cluster secrets +func (cm *ClusterManager) Start(stopCh <-chan struct{}) { + cm.logger.Infof("Starting multi-cluster manager, watching for secrets with label %s in namespace %s", cm.secretLabel, cm.secretNamespace) + + watchlist := cache.NewFilteredListWatchFromClient( + cm.kubeClient.CoreV1().RESTClient(), + "secrets", + cm.secretNamespace, + func(options *metav1.ListOptions) { + options.LabelSelector = cm.secretLabel + }, + ) + + _, informer := cache.NewInformer( + watchlist, + &corev1.Secret{}, + time.Minute*5, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + secret := obj.(*corev1.Secret) + cm.addCluster(secret) + }, + UpdateFunc: func(old, new interface{}) { + secret := new.(*corev1.Secret) + cm.addCluster(secret) + }, + DeleteFunc: func(obj interface{}) { + secret := obj.(*corev1.Secret) + cm.removeCluster(secret) + }, + }, + ) + + go informer.Run(stopCh) + + if !cache.WaitForCacheSync(stopCh, informer.HasSynced) { + cm.logger.Error("Timed out waiting for multi-cluster secrets cache to sync") + } +} + +func (cm *ClusterManager) addCluster(secret *corev1.Secret) { + for name, data := range secret.Data { + // Istio remote secrets contain the kubeconfig in a key that is the cluster name + // We skip keys like 'token' as they are not kubeconfigs + if name == "token" { + continue + } + + cm.logger.Infof("Multi-cluster manager: found cluster %s in secret %s", name, secret.Name) + + config, err := clientcmd.RESTConfigFromKubeConfig(data) + if err != nil { + cm.logger.Errorf("Multi-cluster manager: error building kubeconfig for cluster %s: %v", name, err) + continue + } + + client, err := clientset.NewForConfig(config) + if err != nil { + cm.logger.Errorf("Multi-cluster manager: error building Istio clientset for cluster %s: %v", name, err) + continue + } + + kubeClient, err := kubernetes.NewForConfig(config) + if err != nil { + cm.logger.Errorf("Multi-cluster manager: error building kubernetes clientset for cluster %s: %v", name, err) + continue + } + + cm.clients.Store(name, &MultiClusterClient{ + IstioClient: client, + KubeClient: kubeClient, + }) + } +} + +func (cm *ClusterManager) removeCluster(secret *corev1.Secret) { + for name := range secret.Data { + if name == "token" { + continue + } + cm.logger.Infof("Multi-cluster manager: removing cluster %s", name) + cm.clients.Delete(name) + } +} + +// GetClients returns all Istio clients including the local one +func (cm *ClusterManager) GetClients() []clientset.Interface { + var clients []clientset.Interface + clients = append(clients, cm.meshClient) + + cm.clients.Range(func(key, value interface{}) bool { + rc := value.(*MultiClusterClient) + clients = append(clients, rc.IstioClient) + return true + }) + + return clients +} + +// GetMultiClusterClients returns all cluster clients (Istio + Kube) including the local one +func (cm *ClusterManager) GetMultiClusterClients() []MultiClusterClient { + var clients []MultiClusterClient + clients = append(clients, MultiClusterClient{ + IstioClient: cm.meshClient, + KubeClient: cm.kubeClient, + }) + + cm.clients.Range(func(key, value interface{}) bool { + rc := value.(*MultiClusterClient) + clients = append(clients, *rc) + return true + }) + + return clients +} + +// ResolveNamespace checks if a namespace exists on a remote cluster. +// If it does not exist, it creates it with istio-injection=enabled label. +func (cm *ClusterManager) ResolveNamespace(cluster MultiClusterClient, namespace string) string { + // For the local cluster, always use the canary namespace + if cluster.IstioClient == cm.meshClient { + return namespace + } + + kubeClient := cluster.KubeClient + if kubeClient == nil { + cm.logger.Warnf("Multi-cluster: could not find kube client for remote cluster, using namespace %s", namespace) + return namespace + } + + _, err := kubeClient.CoreV1().Namespaces().Get(context.TODO(), namespace, metav1.GetOptions{}) + if err != nil { + cm.logger.Infof("Multi-cluster: namespace %s not found on remote cluster, creating it", namespace) + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + Labels: map[string]string{ + "istio-injection": "enabled", + }, + }, + } + _, err = kubeClient.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{}) + if err != nil { + cm.logger.Errorf("Multi-cluster: failed to create namespace %s on remote cluster: %v", namespace, err) + return namespace + } + } + + return namespace +} diff --git a/pkg/router/multi_cluster_test.go b/pkg/router/multi_cluster_test.go new file mode 100644 index 000000000..247f8e55c --- /dev/null +++ b/pkg/router/multi_cluster_test.go @@ -0,0 +1,269 @@ +/* +Copyright 2020 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package router + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + + istiov1beta1 "github.com/fluxcd/flagger/pkg/apis/istio/v1beta1" + clientset "github.com/fluxcd/flagger/pkg/client/clientset/versioned" + fakeFlagger "github.com/fluxcd/flagger/pkg/client/clientset/versioned/fake" +) + +func TestClusterManager_AddRemoveCluster(t *testing.T) { + logger, _ := zap.NewDevelopment() + sugar := logger.Sugar() + + kubeClient := fake.NewSimpleClientset() + meshClient := fakeFlagger.NewSimpleClientset() + + cm := NewClusterManager(kubeClient, meshClient, sugar, "istio/multiCluster=true", "istio-system") + + // Minimal kubeconfig that satisfies parsing + kubeconfig := ` +apiVersion: v1 +clusters: +- cluster: + server: https://localhost:8443 + name: remote-cluster +contexts: +- context: + cluster: remote-cluster + user: remote-user + name: remote-cluster +current-context: remote-cluster +kind: Config +users: +- name: remote-user + user: + token: fake-token +` + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "istio-remote-secret", + Namespace: "istio-system", + Labels: map[string]string{ + "istio/multiCluster": "true", + }, + }, + Data: map[string][]byte{ + "remote-cluster": []byte(kubeconfig), + }, + } + + // Before adding: only local client + clients := cm.GetClients() + assert.Len(t, clients, 1, "Should have 1 client (local) before adding") + + // Add remote cluster + cm.addCluster(secret) + clients = cm.GetClients() + assert.Len(t, clients, 2, "Should have 2 clients (local + remote) after adding") + + // Re-add same cluster (should update, not duplicate) + cm.addCluster(secret) + clients = cm.GetClients() + assert.Len(t, clients, 2, "Should still have 2 clients after re-adding") + + // Remove remote cluster + cm.removeCluster(secret) + clients = cm.GetClients() + assert.Len(t, clients, 1, "Should have 1 client (local) after removal") +} + +func TestClusterManager_ResolveNamespace(t *testing.T) { + logger, _ := zap.NewDevelopment() + sugar := logger.Sugar() + + kubeClient := fake.NewSimpleClientset() + meshClient := fakeFlagger.NewSimpleClientset() + + cm := NewClusterManager(kubeClient, meshClient, sugar, "istio/multiCluster=true", "istio-system") + + // Local cluster always resolves to original namespace + assert.Equal(t, "original", cm.ResolveNamespace(MultiClusterClient{IstioClient: meshClient, KubeClient: kubeClient}, "original")) + + // Manually inject a remote cluster with fake clients to avoid panic and allow state control + remoteMeshClient := fakeFlagger.NewSimpleClientset() + remoteKubeClient := fake.NewSimpleClientset() + cm.clients.Store("remote-cluster", &MultiClusterClient{ + IstioClient: remoteMeshClient, + KubeClient: remoteKubeClient, + }) + + // Get the injected client + remoteClient := remoteMeshClient + + // Test creation when namespace doesn't exist + assert.Equal(t, "non-existent", cm.ResolveNamespace(MultiClusterClient{IstioClient: remoteClient, KubeClient: remoteKubeClient}, "non-existent")) + ns, err := remoteKubeClient.CoreV1().Namespaces().Get(context.TODO(), "non-existent", metav1.GetOptions{}) + require.NoError(t, err) + assert.Equal(t, "enabled", ns.Labels["istio-injection"]) + + // Test success when namespace exists + _, err = remoteKubeClient.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "existent"}, + }, metav1.CreateOptions{}) + require.NoError(t, err) + assert.Equal(t, "existent", cm.ResolveNamespace(MultiClusterClient{IstioClient: remoteClient, KubeClient: remoteKubeClient}, "existent")) +} + +func TestClusterManager_SkipsTokenKey(t *testing.T) { + logger, _ := zap.NewDevelopment() + sugar := logger.Sugar() + + kubeClient := fake.NewSimpleClientset() + meshClient := fakeFlagger.NewSimpleClientset() + + cm := NewClusterManager(kubeClient, meshClient, sugar, "istio/multiCluster=true", "istio-system") + + // Secret with only a "token" key — should be skipped + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "istio-remote-secret-token-only", + Namespace: "istio-system", + }, + Data: map[string][]byte{ + "token": []byte("some-token"), + }, + } + + cm.addCluster(secret) + clients := cm.GetClients() + assert.Len(t, clients, 1, "Should still have only 1 client (local) — token key is skipped") +} + +func TestClusterManager_InvalidKubeconfig(t *testing.T) { + logger, _ := zap.NewDevelopment() + sugar := logger.Sugar() + + kubeClient := fake.NewSimpleClientset() + meshClient := fakeFlagger.NewSimpleClientset() + + cm := NewClusterManager(kubeClient, meshClient, sugar, "istio/multiCluster=true", "istio-system") + + // Secret with invalid kubeconfig data + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bad-secret", + Namespace: "istio-system", + }, + Data: map[string][]byte{ + "remote-cluster": []byte("this is not a valid kubeconfig"), + }, + } + + cm.addCluster(secret) + clients := cm.GetClients() + assert.Len(t, clients, 1, "Should still have only 1 client (local) — invalid kubeconfig is skipped") +} + +func TestIstioRouter_FinalizeMultiCluster(t *testing.T) { + logger, _ := zap.NewDevelopment() + sugar := logger.Sugar() + + kubeClient := fake.NewSimpleClientset() + meshClient := fakeFlagger.NewSimpleClientset() + + cm := NewClusterManager(kubeClient, meshClient, sugar, "istio/multiCluster=true", "istio-system") + + // Inject a remote cluster + remoteMeshClient := fakeFlagger.NewSimpleClientset() + remoteKubeClient := fake.NewSimpleClientset() + cm.clients.Store("remote-cluster", &MultiClusterClient{ + IstioClient: remoteMeshClient, + KubeClient: remoteKubeClient, + }) + + router := &IstioRouter{ + logger: sugar, + kubeClient: kubeClient, + istioClients: []clientset.Interface{meshClient}, + clusterManager: cm, + } + + canary := newTestCanary() + + // 1. Setup local cluster VirtualService (with annotation so it reverts) + vs := &istiov1beta1.VirtualService{ + ObjectMeta: metav1.ObjectMeta{ + Name: canary.Name, + Namespace: canary.Namespace, + Annotations: map[string]string{ + kubectlAnnotation: "{}", + }, + }, + } + _, err := meshClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Create(context.TODO(), vs, metav1.CreateOptions{}) + require.NoError(t, err) + + // 2. Setup remote cluster resources (replicated) + // DestinationRules (created by Flagger) + drPrimary := &istiov1beta1.DestinationRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: canary.Name + "-primary", + Namespace: canary.Namespace, + }, + } + drCanary := &istiov1beta1.DestinationRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: canary.Name + "-canary", + Namespace: canary.Namespace, + }, + } + _, err = remoteMeshClient.NetworkingV1beta1().DestinationRules(canary.Namespace).Create(context.TODO(), drPrimary, metav1.CreateOptions{}) + require.NoError(t, err) + _, err = remoteMeshClient.NetworkingV1beta1().DestinationRules(canary.Namespace).Create(context.TODO(), drCanary, metav1.CreateOptions{}) + require.NoError(t, err) + + // VirtualService (created by Flagger, no orig-config) + remoteVS := &istiov1beta1.VirtualService{ + ObjectMeta: metav1.ObjectMeta{ + Name: canary.Name, + Namespace: canary.Namespace, + }, + } + _, err = remoteMeshClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Create(context.TODO(), remoteVS, metav1.CreateOptions{}) + require.NoError(t, err) + + // 3. Finalize + err = router.Finalize(canary) + require.NoError(t, err) + + // 4. Verify Local: VirtualService should still exist (reverted) + _, err = meshClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Get(context.TODO(), canary.Name, metav1.GetOptions{}) + assert.NoError(t, err, "Local VirtualService should still exist") + + // 5. Verify Remote: DestinationRules should be DELETED + _, err = remoteMeshClient.NetworkingV1beta1().DestinationRules(canary.Namespace).Get(context.TODO(), canary.Name+"-primary", metav1.GetOptions{}) + assert.True(t, errors.IsNotFound(err), "Remote primary DestinationRule should be deleted") + _, err = remoteMeshClient.NetworkingV1beta1().DestinationRules(canary.Namespace).Get(context.TODO(), canary.Name+"-canary", metav1.GetOptions{}) + assert.True(t, errors.IsNotFound(err), "Remote canary DestinationRule should be deleted") + + // 6. Verify Remote: VirtualService should be DELETED (no orig-config) + _, err = remoteMeshClient.NetworkingV1beta1().VirtualServices(canary.Namespace).Get(context.TODO(), canary.Name, metav1.GetOptions{}) + assert.True(t, errors.IsNotFound(err), "Remote VirtualService should be deleted") +}