diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java index 08fc1d590e826..babeef3e45f7c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java +++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java @@ -27,10 +27,12 @@ import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.requests.ListOffsetsResponse; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.Exit; @@ -116,6 +118,8 @@ public class StreamsResetter { + "*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that " + "you run this once with \"--dry-run\" to preview your changes before making them.\n\n"; + private static final int MAX_REMOVE_MEMBERS_FROM_CONSUMER_GROUP_RETRIES = 3; + private final List allTopics = new LinkedList<>(); public static void main(final String[] args) { @@ -149,7 +153,7 @@ public int execute(final String[] args, final Properties config) { properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerValue); try (Admin adminClient = Admin.create(properties)) { - maybeDeleteActiveConsumers(groupId, adminClient, options); + maybeDeleteActiveConsumers(groupId, adminClient, options.hasForce()); allTopics.clear(); allTopics.addAll(adminClient.listTopics().names().get(60, TimeUnit.SECONDS)); @@ -177,30 +181,42 @@ public int execute(final String[] args, final Properties config) { } } - private void maybeDeleteActiveConsumers(final String groupId, - final Admin adminClient, - final StreamsResetterOptions options) + // visible for testing + void maybeDeleteActiveConsumers(final String groupId, + final Admin adminClient, + final boolean force) throws ExecutionException, InterruptedException { - final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups( - Set.of(groupId), - new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000)); - try { - final List members = - new ArrayList<>(describeResult.describedGroups().get(groupId).get().members()); - if (!members.isEmpty()) { - if (options.hasForce()) { - System.out.println("Force deleting all active members in the group: " + groupId); - adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all().get(); - } else { - throw new IllegalStateException("Consumer group '" + groupId + "' is still active " - + "and has following members: " + members + ". " - + "Make sure to stop all running application instances before running the reset tool." - + " You can use option '--force' to remove active members from the group."); + int retries = 0; + while (true) { + final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups( + Set.of(groupId), + new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000)); + try { + final List members = + new ArrayList<>(describeResult.describedGroups().get(groupId).get().members()); + if (!members.isEmpty()) { + if (force) { + System.out.println("Force deleting all active members in the group: " + groupId); + adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all().get(); + } else { + throw new IllegalStateException("Consumer group '" + groupId + "' is still active " + + "and has following members: " + members + ". " + + "Make sure to stop all running application instances before running the reset tool." + + " You can use option '--force' to remove active members from the group."); + } + } + break; + } catch (ExecutionException ee) { + // If the group ID is not found, this is not an error case + if (ee.getCause() instanceof GroupIdNotFoundException) { + break; + } + // if a member is unknown, it may mean that it left the group itself. Retrying to confirm. + if (ee.getCause() instanceof KafkaException ke && ke.getCause() instanceof UnknownMemberIdException) { + if (retries++ < MAX_REMOVE_MEMBERS_FROM_CONSUMER_GROUP_RETRIES) { + continue; + } } - } - } catch (ExecutionException ee) { - // If the group ID is not found, this is not an error case - if (!(ee.getCause() instanceof GroupIdNotFoundException)) { throw ee; } } diff --git a/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java b/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java index ef8875f7c49eb..020b7c1250f15 100644 --- a/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java @@ -16,21 +16,33 @@ */ package org.apache.kafka.tools; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.clients.admin.MemberToRemove; import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupResult; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.message.LeaveGroupRequestData; +import org.apache.kafka.common.protocol.Errors; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.lang.reflect.Constructor; import java.time.Duration; import java.time.Instant; import java.util.HashMap; @@ -41,7 +53,15 @@ import java.util.concurrent.ExecutionException; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @Timeout(value = 600) public class StreamsResetterTest { @@ -293,6 +313,102 @@ public void testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset() { assertEquals(beginningAndEndOffset, position); } + @Test + public void shouldRetryToRemoveMembersOnUnknownMemberIdExceptionAndForce() throws Exception { + final String groupId = "groupId"; + + final Admin adminClient = mock(Admin.class); + final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class); + + when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription)))); + when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any())) + .thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of(new LeaveGroupRequestData.MemberIdentity(), Errors.UNKNOWN_MEMBER_ID)), Set.of())) + .thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of()), Set.of())); + when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class))); + + streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true); + + verify(adminClient, times(2)).removeMembersFromConsumerGroup(eq(groupId), any()); + } + + @Test + public void shouldFailAfterTooManyRetries() throws Exception { + final String groupId = "groupId"; + + final Admin adminClient = mock(Admin.class); + final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class); + + when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription)))); + when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any())) + .thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of(new LeaveGroupRequestData.MemberIdentity(), Errors.UNKNOWN_MEMBER_ID)), Set.of())); + when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class))); + + assertThrows(ExecutionException.class, () -> streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true)); + + verify(adminClient, times(4)).removeMembersFromConsumerGroup(eq(groupId), any()); + } + + @Test + public void shouldFailIfThereAreMembersAndNotForce() throws Exception { + final String groupId = "groupId"; + + final Admin adminClient = mock(Admin.class); + final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class); + + when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription)))); + when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class))); + + assertThrows(IllegalStateException.class, () -> streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, false)); + + verify(adminClient, never()).removeMembersFromConsumerGroup(eq(groupId), any()); + } + + @Test + public void shouldRemoveIfThereAreMembersAndForce() throws Exception { + final String groupId = "groupId"; + + final Admin adminClient = mock(Admin.class); + final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class); + + when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription)))); + when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any())) + .thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of()), Set.of())); + when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class))); + + streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true); + + verify(adminClient).removeMembersFromConsumerGroup(eq(groupId), any()); + } + + @Test + public void shouldIgnoreGroupIdNotFoundException() throws Exception { + final String groupId = "groupId"; + + final Admin adminClient = mock(Admin.class); + final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class); + + final KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.completeExceptionally(new GroupIdNotFoundException(groupId)); + when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any())) + .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, future))); + when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class))); + + streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true); + + verify(adminClient, never()).removeMembersFromConsumerGroup(eq(groupId), any()); + } + + private RemoveMembersFromConsumerGroupResult createRemoveMembersFromConsumerGroupResult(final KafkaFuture> future, + final Set memberInfos) throws Exception { + final Constructor constructor = RemoveMembersFromConsumerGroupResult.class.getDeclaredConstructor(KafkaFuture.class, Set.class); + constructor.setAccessible(true); + return constructor.newInstance(future, memberInfos); + } + private Cluster createCluster(final int numNodes) { final HashMap nodes = new HashMap<>(); for (int i = 0; i < numNodes; ++i) { @@ -315,4 +431,4 @@ public synchronized Map offsetsForTimes(fina return topicPartitionToOffsetAndTimestamp; } } -} \ No newline at end of file +}