From 33d776a24da268c22388f573fdbab36168f4c092 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 3 Dec 2025 16:17:51 -0800 Subject: [PATCH 1/4] MINOR: added retries if removeMembersFromConsumerGroup failed with UnknownMemberIdException. --- .../RemoveMembersFromConsumerGroupResult.java | 2 +- .../apache/kafka/tools/StreamsResetter.java | 60 ++++++++----- .../kafka/tools/StreamsResetterTest.java | 90 ++++++++++++++++++- 3 files changed, 127 insertions(+), 25 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java index 3845e2f6aac62..58547e390dc26 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java @@ -35,7 +35,7 @@ public class RemoveMembersFromConsumerGroupResult { private final KafkaFuture> future; private final Set memberInfos; - RemoveMembersFromConsumerGroupResult(KafkaFuture> future, + public RemoveMembersFromConsumerGroupResult(KafkaFuture> future, Set memberInfos) { this.future = future; this.memberInfos = memberInfos; diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java index 08fc1d590e826..ce4f585349926 100644 --- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java +++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java @@ -27,10 +27,12 @@ import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.requests.ListOffsetsResponse; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.Exit; @@ -149,7 +151,7 @@ public int execute(final String[] args, final Properties config) { properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerValue); try (Admin adminClient = Admin.create(properties)) { - maybeDeleteActiveConsumers(groupId, adminClient, options); + maybeDeleteActiveConsumers(groupId, adminClient, options.hasForce()); allTopics.clear(); allTopics.addAll(adminClient.listTopics().names().get(60, TimeUnit.SECONDS)); @@ -177,30 +179,42 @@ public int execute(final String[] args, final Properties config) { } } - private void maybeDeleteActiveConsumers(final String groupId, - final Admin adminClient, - final StreamsResetterOptions options) + // visible for testing + void maybeDeleteActiveConsumers(final String groupId, + final Admin adminClient, + final boolean force) throws ExecutionException, InterruptedException { - final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups( - Set.of(groupId), - new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000)); - try { - final List members = - new ArrayList<>(describeResult.describedGroups().get(groupId).get().members()); - if (!members.isEmpty()) { - if (options.hasForce()) { - System.out.println("Force deleting all active members in the group: " + groupId); - adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all().get(); - } else { - throw new IllegalStateException("Consumer group '" + groupId + "' is still active " - + "and has following members: " + members + ". " - + "Make sure to stop all running application instances before running the reset tool." - + " You can use option '--force' to remove active members from the group."); + int retries = 0; + while (true) { + final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups( + Set.of(groupId), + new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000)); + try { + final List members = + new ArrayList<>(describeResult.describedGroups().get(groupId).get().members()); + if (!members.isEmpty()) { + if (force) { + System.out.println("Force deleting all active members in the group: " + groupId); + adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all().get(); + } else { + throw new IllegalStateException("Consumer group '" + groupId + "' is still active " + + "and has following members: " + members + ". " + + "Make sure to stop all running application instances before running the reset tool." + + " You can use option '--force' to remove active members from the group."); + } + } + return; + } catch (ExecutionException ee) { + // If the group ID is not found, this is not an error case + if (ee.getCause() instanceof GroupIdNotFoundException) { + return; + } + // if a member is unknown, it may mean that it left the group itself. Retrying to confirm. + if (ee.getCause() instanceof KafkaException ke && ke.getCause() instanceof UnknownMemberIdException) { + if (retries++ < 3) { + continue; + } } - } - } catch (ExecutionException ee) { - // If the group ID is not found, this is not an error case - if (!(ee.getCause() instanceof GroupIdNotFoundException)) { throw ee; } } diff --git a/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java b/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java index ef8875f7c49eb..a4abe88b090cd 100644 --- a/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java @@ -16,7 +16,12 @@ */ package org.apache.kafka.tools; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult; +import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupResult; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.MockConsumer; @@ -27,6 +32,10 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.message.LeaveGroupRequestData; +import org.apache.kafka.common.protocol.Errors; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -41,7 +50,15 @@ import java.util.concurrent.ExecutionException; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @Timeout(value = 600) public class StreamsResetterTest { @@ -293,6 +310,77 @@ public void testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset() { assertEquals(beginningAndEndOffset, position); } + @Test + public void shouldRetryToRemoveMembersOnUnknownMemberIdExceptionAndForce() throws Exception { + final String groupId = "groupId"; + + final Admin adminClient = mock(Admin.class); + final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class); + + when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription)))); + when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any())) + .thenReturn(new RemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of(new LeaveGroupRequestData.MemberIdentity(), Errors.UNKNOWN_MEMBER_ID)), Set.of())) + .thenReturn(new RemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of()), Set.of())); + when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class))); + + streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true); + + verify(adminClient, times(2)).removeMembersFromConsumerGroup(eq(groupId), any()); + } + + @Test + public void shouldFailIfThereAreMembersAndNotForce() throws Exception { + final String groupId = "groupId"; + + final Admin adminClient = mock(Admin.class); + final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class); + + when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription)))); + when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class))); + + assertThrows(IllegalStateException.class, () -> streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, false)); + + verify(adminClient, never()).removeMembersFromConsumerGroup(eq(groupId), any()); + } + + @Test + public void shouldRemoveIfThereAreMembersAndForce() throws Exception { + final String groupId = "groupId"; + + final Admin adminClient = mock(Admin.class); + final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class); + + when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription)))); + when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any())) + .thenReturn(new RemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of()), Set.of())); + when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class))); + + streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true); + + verify(adminClient).removeMembersFromConsumerGroup(eq(groupId), any()); + } + + @Test + public void shouldIgnoreGroupIdNotFoundException() throws Exception { + final String groupId = "groupId"; + + final Admin adminClient = mock(Admin.class); + final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class); + + final KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.completeExceptionally(new GroupIdNotFoundException(groupId)); + when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, future))); + when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class))); + + streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true); + + verify(adminClient, never()).removeMembersFromConsumerGroup(eq(groupId), any()); + } + private Cluster createCluster(final int numNodes) { final HashMap nodes = new HashMap<>(); for (int i = 0; i < numNodes; ++i) { @@ -315,4 +403,4 @@ public synchronized Map offsetsForTimes(fina return topicPartitionToOffsetAndTimestamp; } } -} \ No newline at end of file +} From 7016288c891c9f11972cd78c72aef41688e4ec56 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 3 Dec 2025 23:46:35 -0800 Subject: [PATCH 2/4] Addressed comments. --- .../RemoveMembersFromConsumerGroupResult.java | 2 +- .../apache/kafka/tools/StreamsResetter.java | 8 +++++--- .../kafka/tools/StreamsResetterTest.java | 18 ++++++++++++++++++ 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java index 58547e390dc26..560d0ab0c9b37 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java @@ -36,7 +36,7 @@ public class RemoveMembersFromConsumerGroupResult { private final Set memberInfos; public RemoveMembersFromConsumerGroupResult(KafkaFuture> future, - Set memberInfos) { + Set memberInfos) { this.future = future; this.memberInfos = memberInfos; } diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java index ce4f585349926..babeef3e45f7c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java +++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java @@ -118,6 +118,8 @@ public class StreamsResetter { + "*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that " + "you run this once with \"--dry-run\" to preview your changes before making them.\n\n"; + private static final int MAX_REMOVE_MEMBERS_FROM_CONSUMER_GROUP_RETRIES = 3; + private final List allTopics = new LinkedList<>(); public static void main(final String[] args) { @@ -203,15 +205,15 @@ void maybeDeleteActiveConsumers(final String groupId, + " You can use option '--force' to remove active members from the group."); } } - return; + break; } catch (ExecutionException ee) { // If the group ID is not found, this is not an error case if (ee.getCause() instanceof GroupIdNotFoundException) { - return; + break; } // if a member is unknown, it may mean that it left the group itself. Retrying to confirm. if (ee.getCause() instanceof KafkaException ke && ke.getCause() instanceof UnknownMemberIdException) { - if (retries++ < 3) { + if (retries++ < MAX_REMOVE_MEMBERS_FROM_CONSUMER_GROUP_RETRIES) { continue; } } diff --git a/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java b/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java index a4abe88b090cd..187603ec98e83 100644 --- a/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java @@ -329,6 +329,24 @@ public void shouldRetryToRemoveMembersOnUnknownMemberIdExceptionAndForce() throw verify(adminClient, times(2)).removeMembersFromConsumerGroup(eq(groupId), any()); } + @Test + public void shouldFailAfterTooManyRetries() throws Exception { + final String groupId = "groupId"; + + final Admin adminClient = mock(Admin.class); + final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class); + + when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription)))); + when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any())) + .thenReturn(new RemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of(new LeaveGroupRequestData.MemberIdentity(), Errors.UNKNOWN_MEMBER_ID)), Set.of())); + when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class))); + + assertThrows(ExecutionException.class, () -> streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true)); + + verify(adminClient, times(4)).removeMembersFromConsumerGroup(eq(groupId), any()); + } + @Test public void shouldFailIfThereAreMembersAndNotForce() throws Exception { final String groupId = "groupId"; From cac7bb742d2451164394b336885a47b727f9f05d Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 4 Dec 2025 09:46:23 -0800 Subject: [PATCH 3/4] Replaced a direct object creation with reflection to avoid modifications in the public API. --- .../RemoveMembersFromConsumerGroupResult.java | 4 ++-- .../kafka/tools/StreamsResetterTest.java | 18 ++++++++++++++---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java index 560d0ab0c9b37..3845e2f6aac62 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java @@ -35,8 +35,8 @@ public class RemoveMembersFromConsumerGroupResult { private final KafkaFuture> future; private final Set memberInfos; - public RemoveMembersFromConsumerGroupResult(KafkaFuture> future, - Set memberInfos) { + RemoveMembersFromConsumerGroupResult(KafkaFuture> future, + Set memberInfos) { this.future = future; this.memberInfos = memberInfos; } diff --git a/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java b/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java index 187603ec98e83..63a59fcc8bfd3 100644 --- a/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult; import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.clients.admin.MemberToRemove; import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupResult; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -28,6 +29,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; @@ -40,6 +42,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.lang.reflect.Constructor; import java.time.Duration; import java.time.Instant; import java.util.HashMap; @@ -320,8 +323,8 @@ public void shouldRetryToRemoveMembersOnUnknownMemberIdExceptionAndForce() throw when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any())) .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription)))); when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any())) - .thenReturn(new RemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of(new LeaveGroupRequestData.MemberIdentity(), Errors.UNKNOWN_MEMBER_ID)), Set.of())) - .thenReturn(new RemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of()), Set.of())); + .thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of(new LeaveGroupRequestData.MemberIdentity(), Errors.UNKNOWN_MEMBER_ID)), Set.of())) + .thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of()), Set.of())); when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class))); streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true); @@ -339,7 +342,7 @@ public void shouldFailAfterTooManyRetries() throws Exception { when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any())) .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription)))); when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any())) - .thenReturn(new RemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of(new LeaveGroupRequestData.MemberIdentity(), Errors.UNKNOWN_MEMBER_ID)), Set.of())); + .thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of(new LeaveGroupRequestData.MemberIdentity(), Errors.UNKNOWN_MEMBER_ID)), Set.of())); when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class))); assertThrows(ExecutionException.class, () -> streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true)); @@ -373,7 +376,7 @@ public void shouldRemoveIfThereAreMembersAndForce() throws Exception { when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any())) .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription)))); when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any())) - .thenReturn(new RemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of()), Set.of())); + .thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of()), Set.of())); when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class))); streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true); @@ -399,6 +402,13 @@ public void shouldIgnoreGroupIdNotFoundException() throws Exception { verify(adminClient, never()).removeMembersFromConsumerGroup(eq(groupId), any()); } + private RemoveMembersFromConsumerGroupResult createRemoveMembersFromConsumerGroupResult(final KafkaFuture> future, + final Set memberInfos) throws Exception { + final Constructor constructor = RemoveMembersFromConsumerGroupResult.class.getDeclaredConstructor(KafkaFuture.class, Set.class); + constructor.setAccessible(true); + return constructor.newInstance(future, memberInfos); + } + private Cluster createCluster(final int numNodes) { final HashMap nodes = new HashMap<>(); for (int i = 0; i < numNodes; ++i) { From 681d72953240281031de29cdf90e1208cb521a94 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 4 Dec 2025 22:08:26 -0800 Subject: [PATCH 4/4] Fixed spotlessJavaCheck failure. --- .../test/java/org/apache/kafka/tools/StreamsResetterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java b/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java index 63a59fcc8bfd3..020b7c1250f15 100644 --- a/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java @@ -33,11 +33,11 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; - import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.protocol.Errors; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout;