Skip to content

Commit 63a777a

Browse files
AntonVasantdajac
authored andcommitted
KAFKA-19891: Bump group epoch when member regex subscription transitions from non-empty to empty (#21013)
This PR fixes an issue in GroupMetadataManager#maybeUpdateRegularExpressions where a member’s regex subscription transition from non-empty → empty did not trigger a group epoch bump. The method previously returned REGEX_UPDATED, which does not cause consumerGroupHeartbeat to increment the group epoch. Fix The patch updates the logic to return: REGEX_UPDATED_AND_RESOLVED when: the updated regex subscription text is empty, and the previous subscription was non-empty. This ensures that consumerGroupHeartbeat correctly bumps the group epoch, keeping the group metadata consistent. Tests Several tests were updated to align with the corrected behavior. Tests that previously expected no epoch bump were failing, and have now been adjusted to expect the new, correct logic. JIRA https://issues.apache.org/jira/browse/KAFKA-19891 Impact Fixes coordinator state correctness for regex-subscribing consumer groups Ensures group epoch bumps happen for all relevant subscription transitions Backward compatible No public API changes Reviewers: Sean Quah <squah@confluent.io>, David Jacot <djacot@confluent.io>
1 parent 5df5907 commit 63a777a

File tree

2 files changed

+83
-0
lines changed

2 files changed

+83
-0
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3314,6 +3314,8 @@ private UpdateRegularExpressionsResult maybeUpdateRegularExpressions(
33143314
updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
33153315
}
33163316
}
3317+
} else if (isNotEmpty(oldSubscribedTopicRegex)) {
3318+
updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
33173319
}
33183320
}
33193321

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21486,6 +21486,87 @@ foooTopicName, computeTopicHash(foooTopicName, new KRaftCoordinatorMetadataImage
2148621486
);
2148721487
}
2148821488

21489+
@Test
21490+
public void testConsumerGroupMemberClearsRegex() {
21491+
String groupId = "fooup";
21492+
String memberId1 = Uuid.randomUuid().toString();
21493+
21494+
Uuid fooTopicId = Uuid.randomUuid();
21495+
String fooTopicName = "foo";
21496+
21497+
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
21498+
.addTopic(fooTopicId, fooTopicName, 6)
21499+
.buildCoordinatorMetadataImage(12345L);
21500+
21501+
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
21502+
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
21503+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
21504+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
21505+
.withMetadataImage(metadataImage)
21506+
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
21507+
.withMember(new ConsumerGroupMember.Builder(memberId1)
21508+
.setState(MemberState.STABLE)
21509+
.setMemberEpoch(10)
21510+
.setPreviousMemberEpoch(10)
21511+
.setClientId(DEFAULT_CLIENT_ID)
21512+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
21513+
.setRebalanceTimeoutMs(5000)
21514+
.setSubscribedTopicRegex("foo*")
21515+
.setServerAssignorName("range")
21516+
.setAssignedPartitions(mkAssignment(
21517+
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
21518+
.build())
21519+
.withAssignment(memberId1, mkAssignment(
21520+
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
21521+
.withAssignmentEpoch(10))
21522+
.build();
21523+
21524+
// Member 1 updates its new regular expression.
21525+
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
21526+
new ConsumerGroupHeartbeatRequestData()
21527+
.setGroupId(groupId)
21528+
.setMemberId(memberId1)
21529+
.setMemberEpoch(10)
21530+
.setRebalanceTimeoutMs(5000)
21531+
.setSubscribedTopicRegex("")
21532+
.setServerAssignor("range")
21533+
.setTopicPartitions(List.of()));
21534+
21535+
assertResponseEquals(
21536+
new ConsumerGroupHeartbeatResponseData()
21537+
.setMemberId(memberId1)
21538+
.setMemberEpoch(11)
21539+
.setHeartbeatIntervalMs(5000)
21540+
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
21541+
.setTopicPartitions(List.of())
21542+
),
21543+
result.response()
21544+
);
21545+
21546+
ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1)
21547+
.setState(MemberState.STABLE)
21548+
.setMemberEpoch(11)
21549+
.setPreviousMemberEpoch(10)
21550+
.setClientId(DEFAULT_CLIENT_ID)
21551+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
21552+
.setRebalanceTimeoutMs(5000)
21553+
.setSubscribedTopicRegex("")
21554+
.setServerAssignorName("range")
21555+
.build();
21556+
21557+
List<CoordinatorRecord> expectedRecords = List.of(
21558+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
21559+
// previous expression is deleted
21560+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"),
21561+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
21562+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Map.of()),
21563+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11),
21564+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1)
21565+
);
21566+
21567+
assertRecordsEquals(expectedRecords, result.records());
21568+
}
21569+
2148921570
@Test
2149021571
public void testConsumerMemberWithRegexReplacedByClassicMemberWithSameSubscription() {
2149121572
String groupId = "fooup";

0 commit comments

Comments
 (0)