Skip to content

Commit 65a3bd2

Browse files
authored
KAFKA-19926: Apply rebalance delay also for later epochs if the group is empty (#21011)
In #20755 we introduce a initial rebalance delay for streams group, now we would like to expand it to all rebalance when group is empty to avoid frequent rebalance in a small time period Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
1 parent 58d62d1 commit 65a3bd2

File tree

2 files changed

+62
-4
lines changed

2 files changed

+62
-4
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2043,8 +2043,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
20432043
}
20442044

20452045
// Schedule initial rebalance delay for new streams groups to coalesce joins.
2046-
boolean isInitialRebalance = (group.groupEpoch() == 0);
2047-
if (isInitialRebalance) {
2046+
if (group.isEmpty()) {
20482047
int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId);
20492048
if (initialDelayMs > 0) {
20502049
timer.scheduleIfAbsent(
@@ -2064,9 +2063,9 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
20642063
TasksTuple targetAssignment;
20652064
if (groupEpoch > group.assignmentEpoch()) {
20662065
boolean initialDelayActive = timer.isScheduled(streamsInitialRebalanceKey(groupId));
2067-
if (initialDelayActive && group.assignmentEpoch() == 0) {
2066+
if (initialDelayActive) {
20682067
// During initial rebalance delay, return empty assignment to first joining members.
2069-
targetAssignmentEpoch = 1;
2068+
targetAssignmentEpoch = Math.max(1, group.assignmentEpoch());
20702069
targetAssignment = TasksTuple.EMPTY;
20712070
} else {
20722071
targetAssignment = updateStreamsTargetAssignment(

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16732,6 +16732,7 @@ public void testStreamsGroupMemberJoiningWithStaleTopology() {
1673216732
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
1673316733
.withStreamsGroupTaskAssignors(List.of(assignor))
1673416734
.withMetadataImage(metadataImage)
16735+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0)
1673516736
.withStreamsGroup(
1673616737
new StreamsGroupBuilder(groupId, 10)
1673716738
.withTopology(StreamsTopology.fromHeartbeatRequest(topology1))
@@ -17536,6 +17537,64 @@ public void testStreamsInitialRebalanceDelayEmptyDuringDelayAssignsAfterTimer()
1753617537
);
1753717538
}
1753817539

17540+
@Test
17541+
public void testStreamsRebalanceDelayWhenJoiningEmptyGroupWithNonZeroEpoch() {
17542+
String groupId = "fooup";
17543+
String memberId = Uuid.randomUuid().toString();
17544+
String subtopology1 = "subtopology1";
17545+
String fooTopicName = "foo";
17546+
Uuid fooTopicId = Uuid.randomUuid();
17547+
Topology topology = new Topology().setSubtopologies(List.of(
17548+
new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
17549+
));
17550+
17551+
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
17552+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
17553+
.withStreamsGroupTaskAssignors(List.of(assignor))
17554+
.withMetadataImage(new MetadataImageBuilder()
17555+
.addTopic(fooTopicId, fooTopicName, 2)
17556+
.buildCoordinatorMetadataImage())
17557+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 1000)
17558+
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10))
17559+
.build();
17560+
17561+
StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
17562+
assertTrue(group.isEmpty());
17563+
assertEquals(10, group.groupEpoch());
17564+
17565+
assignor.prepareGroupAssignment(
17566+
Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
17567+
17568+
CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result;
17569+
17570+
result = context.streamsGroupHeartbeat(
17571+
new StreamsGroupHeartbeatRequestData()
17572+
.setGroupId(groupId)
17573+
.setMemberId(memberId)
17574+
.setMemberEpoch(0)
17575+
.setRebalanceTimeoutMs(10000)
17576+
.setTopology(topology)
17577+
.setActiveTasks(List.of())
17578+
.setStandbyTasks(List.of())
17579+
.setWarmupTasks(List.of()));
17580+
17581+
int memberEpoch = result.response().data().memberEpoch();
17582+
assertTrue(result.response().data().activeTasks().isEmpty());
17583+
17584+
context.sleep(2000);
17585+
17586+
result = context.streamsGroupHeartbeat(
17587+
new StreamsGroupHeartbeatRequestData()
17588+
.setGroupId(groupId)
17589+
.setMemberId(memberId)
17590+
.setMemberEpoch(memberEpoch)
17591+
.setActiveTasks(List.of())
17592+
.setStandbyTasks(List.of())
17593+
.setWarmupTasks(List.of()));
17594+
17595+
assertFalse(result.response().data().activeTasks().isEmpty());
17596+
}
17597+
1753917598
@Test
1754017599
public void testStreamsReconciliationProcess() {
1754117600
String groupId = "fooup";

0 commit comments

Comments
 (0)