Skip to content

Commit 2a2465d

Browse files
Support scale to zero rabbitMQ
1 parent 045c77d commit 2a2465d

File tree

4 files changed

+347
-6
lines changed

4 files changed

+347
-6
lines changed

controllers/rabbitmqcluster_controller.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,23 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
201201
if err := builder.Update(sts); err != nil {
202202
return ctrl.Result{}, err
203203
}
204-
if r.scaleDown(ctx, rabbitmqCluster, current, sts) {
205-
// return when cluster scale down detected; unsupported operation
206-
return ctrl.Result{}, nil
204+
if r.scaleToZero(current, sts) {
205+
err := r.saveReplicasBeforeZero(ctx, rabbitmqCluster, current)
206+
if err != nil {
207+
return ctrl.Result{}, err
208+
}
209+
} else {
210+
if r.scaleDown(ctx, rabbitmqCluster, current, sts) {
211+
// return when cluster scale down detected; unsupported operation
212+
return ctrl.Result{}, nil
213+
}
214+
}
215+
if r.scaleFromZero(current, sts) {
216+
if r.scaleDownFromZero(ctx, rabbitmqCluster, sts) {
217+
// return when cluster scale down from zero detected; unsupported operation
218+
return ctrl.Result{}, nil
219+
}
220+
r.removeReplicasBeforeZeroAnnotationIfExists(ctx, rabbitmqCluster)
207221
}
208222
}
209223

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package controllers
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"strconv"
8+
9+
ctrl "sigs.k8s.io/controller-runtime"
10+
11+
"github.com/rabbitmq/cluster-operator/v2/api/v1beta1"
12+
"github.com/rabbitmq/cluster-operator/v2/internal/status"
13+
appsv1 "k8s.io/api/apps/v1"
14+
corev1 "k8s.io/api/core/v1"
15+
)
16+
17+
const beforeZeroReplicasConfigured = "rabbitmq.com/before-zero-replicas-configured"
18+
19+
// scaleToZero checks if the desired replicas is zero and the current replicas is not zero.
20+
func (r *RabbitmqClusterReconciler) scaleToZero(current, sts *appsv1.StatefulSet) bool {
21+
currentReplicas := *current.Spec.Replicas
22+
desiredReplicas := *sts.Spec.Replicas
23+
return desiredReplicas == 0 && currentReplicas > 0
24+
}
25+
26+
// scaleFromZero checks if the current replicas is zero and the desired replicas is greater than zero.
27+
func (r *RabbitmqClusterReconciler) scaleFromZero(current, sts *appsv1.StatefulSet) bool {
28+
currentReplicas := *current.Spec.Replicas
29+
desiredReplicas := *sts.Spec.Replicas
30+
return currentReplicas == 0 && desiredReplicas > 0
31+
}
32+
33+
// scaleDownFromZero checks if the current replicas is desired replicas would be greatter than replicas configured before zero state.
34+
func (r *RabbitmqClusterReconciler) scaleDownFromZero(ctx context.Context, cluster *v1beta1.RabbitmqCluster, sts *appsv1.StatefulSet) bool {
35+
var err error
36+
var beforeZeroReplicas int64
37+
desiredReplicas := *sts.Spec.Replicas
38+
annotationValue, ok := cluster.Annotations[beforeZeroReplicasConfigured]
39+
if !ok {
40+
return false
41+
}
42+
43+
beforeZeroReplicas, err = strconv.ParseInt(annotationValue, 10, 32)
44+
if err != nil {
45+
msg := "Failed to convert string to integer for before-zero-replicas-configuration annotation"
46+
reason := "TransformErrorOperation"
47+
err = r.recordEventsAndSetCondition(ctx, cluster, status.ReconcileSuccess, corev1.ConditionFalse, corev1.EventTypeWarning, reason, msg)
48+
if err != nil {
49+
return true
50+
}
51+
return true
52+
}
53+
54+
if desiredReplicas < int32(beforeZeroReplicas) {
55+
msg := fmt.Sprintf("Cluster Scale down not supported; tried to scale cluster from %d nodes to %d nodes", int32(beforeZeroReplicas), desiredReplicas)
56+
reason := "UnsupportedOperation"
57+
err = r.recordEventsAndSetCondition(ctx, cluster, status.ReconcileSuccess, corev1.ConditionFalse, corev1.EventTypeWarning, reason, msg)
58+
if err != nil {
59+
return true
60+
}
61+
return true
62+
}
63+
return false
64+
}
65+
66+
// saveReplicasBeforeZero saves the current replicas count in an annotation before scaling down to zero.
67+
// This is used to prevent scaling down when the cluster change from zero replicas to a number less than the saved replicas count.
68+
func (r *RabbitmqClusterReconciler) saveReplicasBeforeZero(ctx context.Context, cluster *v1beta1.RabbitmqCluster, current *appsv1.StatefulSet) error {
69+
var err error
70+
currentReplicas := *current.Spec.Replicas
71+
logger := ctrl.LoggerFrom(ctx)
72+
msg := "Cluster Scale down to 0 replicas."
73+
reason := "ScaleDownToZero"
74+
logger.Info(msg)
75+
err = r.updateAnnotation(ctx, cluster, cluster.Namespace, cluster.Name, beforeZeroReplicasConfigured, fmt.Sprint(currentReplicas))
76+
r.Recorder.Event(cluster, corev1.EventTypeNormal, reason, msg)
77+
return err
78+
}
79+
80+
// If the annotation rabbitmq.com/before-zero-replicas-configured exists it will be deleted.
81+
func (r *RabbitmqClusterReconciler) removeReplicasBeforeZeroAnnotationIfExists(ctx context.Context, cluster *v1beta1.RabbitmqCluster) {
82+
if _, ok := cluster.Annotations[beforeZeroReplicasConfigured]; ok {
83+
r.deleteAnnotation(ctx, cluster, beforeZeroReplicasConfigured)
84+
}
85+
}
86+
87+
func (r *RabbitmqClusterReconciler) recordEventsAndSetCondition(ctx context.Context, cluster *v1beta1.RabbitmqCluster, condType status.RabbitmqClusterConditionType, condStatus corev1.ConditionStatus, eventType, reason, msg string) error {
88+
logger := ctrl.LoggerFrom(ctx)
89+
var statusErr error
90+
logger.Error(errors.New(reason), msg)
91+
r.Recorder.Event(cluster, eventType, reason, msg)
92+
cluster.Status.SetCondition(condType, condStatus, reason, msg)
93+
if statusErr := r.Status().Update(ctx, cluster); statusErr != nil {
94+
logger.Error(statusErr, "Failed to update ReconcileSuccess condition state")
95+
}
96+
return statusErr
97+
98+
}
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
package controllers_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
. "github.com/onsi/ginkgo/v2"
8+
. "github.com/onsi/gomega"
9+
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/v2/api/v1beta1"
10+
"github.com/rabbitmq/cluster-operator/v2/internal/status"
11+
apierrors "k8s.io/apimachinery/pkg/api/errors"
12+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
"k8s.io/apimachinery/pkg/types"
14+
"k8s.io/utils/ptr"
15+
runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"
16+
)
17+
18+
// This spec covers scaling an existing cluster down to zero: it creates a
// RabbitmqCluster with 2 replicas, updates spec.replicas to 0, expects the
// "server" StatefulSet to end up with 0 replicas, and expects the
// ReconcileSuccess condition to read True / Success / "Finish reconciling".
var _ = Describe("Cluster scale to zero", func() {
19+
var (
20+
cluster *rabbitmqv1beta1.RabbitmqCluster
21+
defaultNamespace = "default"
22+
ctx = context.Background()
23+
)
24+
25+
// Delete the cluster created by the spec and wait until the API reports it as not found.
AfterEach(func() {
26+
Expect(client.Delete(ctx, cluster)).To(Succeed())
27+
waitForClusterDeletion(ctx, cluster, client)
28+
Eventually(func() bool {
29+
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
30+
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
31+
return apierrors.IsNotFound(err)
32+
}).Should(BeTrue())
33+
})
34+
35+
It("scale to zero", func() {
36+
By("update statefulSet replicas to zero", func() {
37+
// Start from a cluster that has 2 replicas.
cluster = &rabbitmqv1beta1.RabbitmqCluster{
38+
ObjectMeta: metav1.ObjectMeta{
39+
Name: "rabbitmq-to-zero",
40+
Namespace: defaultNamespace,
41+
},
42+
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
43+
Replicas: ptr.To(int32(2)),
44+
},
45+
}
46+
Expect(client.Create(ctx, cluster)).To(Succeed())
47+
waitForClusterCreation(ctx, cluster, client)
48+
49+
// Request the scale to zero on the custom resource.
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
50+
r.Spec.Replicas = ptr.To(int32(0))
51+
})).To(Succeed())
52+
53+
// Poll the "server" StatefulSet until its replica count becomes 0.
Eventually(func() int32 {
54+
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
55+
Expect(err).NotTo(HaveOccurred())
56+
return *sts.Spec.Replicas
57+
}, 10, 1).Should(Equal(int32(0)))
58+
59+
})
60+
61+
By("setting ReconcileSuccess to 'true'", func() {
62+
// Render the ReconcileSuccess condition as a single string so that
// status, reason and message are asserted together.
Eventually(func() string {
63+
rabbit := &rabbitmqv1beta1.RabbitmqCluster{}
64+
Expect(client.Get(ctx, runtimeClient.ObjectKey{
65+
Name: cluster.Name,
66+
Namespace: defaultNamespace,
67+
}, rabbit)).To(Succeed())
68+
69+
for i := range rabbit.Status.Conditions {
70+
if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess {
71+
return fmt.Sprintf(
72+
"ReconcileSuccess status: %s, with reason: %s and message: %s",
73+
rabbit.Status.Conditions[i].Status,
74+
rabbit.Status.Conditions[i].Reason,
75+
rabbit.Status.Conditions[i].Message)
76+
}
77+
}
78+
return "ReconcileSuccess status: condition not present"
79+
}, 0).Should(Equal("ReconcileSuccess status: True, " +
80+
"with reason: Success " +
81+
"and message: Finish reconciling"))
82+
})
83+
})
84+
})
85+
86+
// This spec covers scaling a cluster back up from zero to its previous size:
// it creates a RabbitmqCluster with 0 replicas that already carries the
// rabbitmq.com/before-zero-replicas-configured annotation set to "2", updates
// spec.replicas to 2, expects the "server" StatefulSet to end up with 2
// replicas, and expects the ReconcileSuccess condition to read
// True / Success / "Finish reconciling".
var _ = Describe("Cluster scale from zero", func() {
87+
var (
88+
cluster *rabbitmqv1beta1.RabbitmqCluster
89+
defaultNamespace = "default"
90+
ctx = context.Background()
91+
)
92+
93+
// Delete the cluster created by the spec and wait until the API reports it as not found.
AfterEach(func() {
94+
Expect(client.Delete(ctx, cluster)).To(Succeed())
95+
waitForClusterDeletion(ctx, cluster, client)
96+
Eventually(func() bool {
97+
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
98+
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
99+
return apierrors.IsNotFound(err)
100+
}).Should(BeTrue())
101+
})
102+
103+
It("scale from zero", func() {
104+
By("update statefulSet replicas from zero", func() {
105+
// The annotation is set by hand here to represent a cluster that had
// 2 replicas before it was scaled to zero.
cluster = &rabbitmqv1beta1.RabbitmqCluster{
106+
ObjectMeta: metav1.ObjectMeta{
107+
Name: "rabbitmq-from-zero",
108+
Namespace: defaultNamespace,
109+
Annotations: map[string]string{
110+
"rabbitmq.com/before-zero-replicas-configured": "2",
111+
},
112+
},
113+
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
114+
Replicas: ptr.To(int32(0)),
115+
},
116+
}
117+
Expect(client.Create(ctx, cluster)).To(Succeed())
118+
waitForClusterCreation(ctx, cluster, client)
119+
120+
// Scale back up to the same replica count recorded in the annotation.
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
121+
r.Spec.Replicas = ptr.To(int32(2))
122+
})).To(Succeed())
123+
124+
// Poll the "server" StatefulSet until its replica count becomes 2.
Eventually(func() int32 {
125+
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
126+
Expect(err).NotTo(HaveOccurred())
127+
return *sts.Spec.Replicas
128+
}, 10, 1).Should(Equal(int32(2)))
129+
130+
})
131+
132+
By("setting ReconcileSuccess to 'true'", func() {
133+
// Render the ReconcileSuccess condition as a single string so that
// status, reason and message are asserted together.
Eventually(func() string {
134+
rabbit := &rabbitmqv1beta1.RabbitmqCluster{}
135+
Expect(client.Get(ctx, runtimeClient.ObjectKey{
136+
Name: cluster.Name,
137+
Namespace: defaultNamespace,
138+
}, rabbit)).To(Succeed())
139+
140+
for i := range rabbit.Status.Conditions {
141+
if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess {
142+
return fmt.Sprintf(
143+
"ReconcileSuccess status: %s, with reason: %s and message: %s",
144+
rabbit.Status.Conditions[i].Status,
145+
rabbit.Status.Conditions[i].Reason,
146+
rabbit.Status.Conditions[i].Message)
147+
}
148+
}
149+
return "ReconcileSuccess status: condition not present"
150+
}, 0).Should(Equal("ReconcileSuccess status: True, " +
151+
"with reason: Success " +
152+
"and message: Finish reconciling"))
153+
})
154+
})
155+
})
156+
157+
// This spec covers the rejected case: scaling up from zero to fewer replicas
// than the cluster had before. It creates a RabbitmqCluster with 0 replicas and
// the rabbitmq.com/before-zero-replicas-configured annotation set to "2",
// updates spec.replicas to 1, and then expects that the "server" StatefulSet
// stays at 0 replicas, that an "UnsupportedOperation" event mentioning
// "Cluster Scale down not supported" exists, and that the ReconcileSuccess
// condition reads False / UnsupportedOperation with the "from 2 nodes to 1
// nodes" message.
var _ = Describe("Cluster scale from zero to less replicas configured", Ordered, func() {
158+
var (
159+
cluster *rabbitmqv1beta1.RabbitmqCluster
160+
defaultNamespace = "default"
161+
ctx = context.Background()
162+
)
163+
164+
// Delete the cluster created by the spec and wait until the API reports it as not found.
AfterEach(func() {
165+
Expect(client.Delete(ctx, cluster)).To(Succeed())
166+
waitForClusterDeletion(ctx, cluster, client)
167+
Eventually(func() bool {
168+
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
169+
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
170+
return apierrors.IsNotFound(err)
171+
}).Should(BeTrue())
172+
})
173+
174+
It("scale from zero to less replicas", func() {
175+
By("update statefulSet replicas from zero", func() {
176+
// The annotation is set by hand here to represent a cluster that had
// 2 replicas before it was scaled to zero.
cluster = &rabbitmqv1beta1.RabbitmqCluster{
177+
ObjectMeta: metav1.ObjectMeta{
178+
Name: "rabbitmq-from-zero-to-less",
179+
Namespace: defaultNamespace,
180+
Annotations: map[string]string{
181+
"rabbitmq.com/before-zero-replicas-configured": "2",
182+
},
183+
},
184+
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
185+
Replicas: ptr.To(int32(0)),
186+
},
187+
}
188+
Expect(client.Create(ctx, cluster)).To(Succeed())
189+
waitForClusterCreation(ctx, cluster, client)
190+
191+
// Ask for 1 replica, which is fewer than the 2 recorded in the annotation.
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
192+
r.Spec.Replicas = ptr.To(int32(1))
193+
})).To(Succeed())
194+
195+
// Consistently (not Eventually): the StatefulSet must remain at 0
// replicas for the whole observation window.
Consistently(func() int32 {
196+
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
197+
Expect(err).NotTo(HaveOccurred())
198+
return *sts.Spec.Replicas
199+
}, 10, 1).Should(Equal(int32(0)))
200+
201+
})
202+
203+
By("setting 'Warning' events", func() {
204+
Expect(aggregateEventMsgs(ctx, cluster, "UnsupportedOperation")).To(
205+
ContainSubstring("Cluster Scale down not supported"))
206+
})
207+
208+
By("setting ReconcileSuccess to 'false'", func() {
209+
// Render the ReconcileSuccess condition as a single string so that
// status, reason and message are asserted together.
Eventually(func() string {
210+
rabbit := &rabbitmqv1beta1.RabbitmqCluster{}
211+
Expect(client.Get(ctx, runtimeClient.ObjectKey{
212+
Name: cluster.Name,
213+
Namespace: defaultNamespace,
214+
}, rabbit)).To(Succeed())
215+
216+
for i := range rabbit.Status.Conditions {
217+
if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess {
218+
return fmt.Sprintf(
219+
"ReconcileSuccess status: %s, with reason: %s and message: %s",
220+
rabbit.Status.Conditions[i].Status,
221+
rabbit.Status.Conditions[i].Reason,
222+
rabbit.Status.Conditions[i].Message)
223+
}
224+
}
225+
return "ReconcileSuccess status: condition not present"
226+
}, 0).Should(Equal("ReconcileSuccess status: False, " +
227+
"with reason: UnsupportedOperation " +
228+
"and message: Cluster Scale down not supported; tried to scale cluster from 2 nodes to 1 nodes"))
229+
})
230+
})
231+
})

go.mod

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
module github.com/rabbitmq/cluster-operator/v2
22

3-
go 1.24.2
4-
5-
toolchain go1.24.3
3+
go 1.24.4
64

75
require (
86
github.com/cloudflare/cfssl v1.6.5

0 commit comments

Comments
 (0)