Skip to content

Commit d27d90c

Browse files
authored
MINOR: Refactor OffsetFetch path (#21009)
The `GroupCoordinator` interface has two methods to fetch offsets: `fetchOffsets` and `fetchAllOffsets`. They have the exact same signature and the implementation in `GroupCoordinatorService` is exactly the same, modulo the name of the operation. The path refactors the path to simplify it and reuse more code. We could further refactor `KafkaApis` but let's do this in a follow-up in order to keep this change small and simple. Reviewers: Sean Quah <squah@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
1 parent 98b11d7 commit d27d90c

File tree

8 files changed

+99
-125
lines changed

8 files changed

+99
-125
lines changed

clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,10 @@ public static boolean useTopicIds(short version) {
325325
return version >= TOPIC_ID_MIN_VERSION;
326326
}
327327

328+
public static boolean requestAllOffsets(OffsetFetchRequestData.OffsetFetchRequestGroup request) {
329+
return request.topics() == null;
330+
}
331+
328332
@Override
329333
public OffsetFetchRequestData data() {
330334
return data;

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,7 +1014,7 @@ class KafkaApis(val requestChannel: RequestChannel,
10141014

10151015
val futures = new mutable.ArrayBuffer[CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]](groups.size)
10161016
groups.forEach { groupOffsetFetch =>
1017-
val isAllPartitions = groupOffsetFetch.topics == null
1017+
val isAllPartitions = OffsetFetchRequest.requestAllOffsets(groupOffsetFetch)
10181018
if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupOffsetFetch.groupId)) {
10191019
futures += CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
10201020
groupOffsetFetch,
@@ -1050,7 +1050,7 @@ class KafkaApis(val requestChannel: RequestChannel,
10501050
): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
10511051
val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion)
10521052

1053-
groupCoordinator.fetchAllOffsets(
1053+
groupCoordinator.fetchOffsets(
10541054
requestContext,
10551055
groupFetchRequest,
10561056
requireStable

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9032,7 +9032,7 @@ class KafkaApisTest extends Logging {
90329032
)).thenReturn(group1Future)
90339033

90349034
val group2Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
9035-
when(groupCoordinator.fetchAllOffsets(
9035+
when(groupCoordinator.fetchOffsets(
90369036
requestChannelRequest.context,
90379037
new OffsetFetchRequestData.OffsetFetchRequestGroup()
90389038
.setGroupId("group-2")
@@ -9041,7 +9041,7 @@ class KafkaApisTest extends Logging {
90419041
)).thenReturn(group2Future)
90429042

90439043
val group3Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
9044-
when(groupCoordinator.fetchAllOffsets(
9044+
when(groupCoordinator.fetchOffsets(
90459045
requestChannelRequest.context,
90469046
new OffsetFetchRequestData.OffsetFetchRequestGroup()
90479047
.setGroupId("group-3")
@@ -9050,7 +9050,7 @@ class KafkaApisTest extends Logging {
90509050
)).thenReturn(group3Future)
90519051

90529052
val group4Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
9053-
when(groupCoordinator.fetchAllOffsets(
9053+
when(groupCoordinator.fetchOffsets(
90549054
requestChannelRequest.context,
90559055
new OffsetFetchRequestData.OffsetFetchRequestGroup()
90569056
.setGroupId("group-4")
@@ -9190,7 +9190,7 @@ class KafkaApisTest extends Logging {
91909190
)).thenReturn(group1Future)
91919191

91929192
val group2Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
9193-
when(groupCoordinator.fetchAllOffsets(
9193+
when(groupCoordinator.fetchOffsets(
91949194
requestChannelRequest.context,
91959195
new OffsetFetchRequestData.OffsetFetchRequestGroup()
91969196
.setGroupId("group-2")
@@ -9384,7 +9384,7 @@ class KafkaApisTest extends Logging {
93849384
val requestChannelRequest = makeRequest(version)
93859385

93869386
val future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
9387-
when(groupCoordinator.fetchAllOffsets(
9387+
when(groupCoordinator.fetchOffsets(
93889388
requestChannelRequest.context,
93899389
new OffsetFetchRequestData.OffsetFetchRequestGroup()
93909390
.setGroupId("group-1")
@@ -9546,7 +9546,7 @@ class KafkaApisTest extends Logging {
95469546

95479547
// group-3 is allowed and bar is allowed.
95489548
val group3Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
9549-
when(groupCoordinator.fetchAllOffsets(
9549+
when(groupCoordinator.fetchOffsets(
95509550
requestChannelRequest.context,
95519551
new OffsetFetchRequestData.OffsetFetchRequestGroup()
95529552
.setGroupId("group-3")

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -286,21 +286,6 @@ CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchOffsets
286286
boolean requireStable
287287
);
288288

289-
/**
290-
* Fetch all offsets for a given Group.
291-
*
292-
* @param context The request context.
293-
* @param request The OffsetFetchRequestGroup request.
294-
*
295-
* @return A future yielding the results.
296-
* The error codes of the results are set to indicate the errors occurred during the execution.
297-
*/
298-
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchAllOffsets(
299-
AuthorizableRequestContext context,
300-
OffsetFetchRequestData.OffsetFetchRequestGroup request,
301-
boolean requireStable
302-
);
303-
304289
/**
305290
* Describe the Share Group Offsets for a given group.
306291
*

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java

Lines changed: 6 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.apache.kafka.common.requests.DescribeGroupsRequest;
7373
import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
7474
import org.apache.kafka.common.requests.OffsetCommitRequest;
75+
import org.apache.kafka.common.requests.OffsetFetchRequest;
7576
import org.apache.kafka.common.requests.OffsetFetchResponse;
7677
import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
7778
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
@@ -1629,6 +1630,8 @@ public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetch
16291630
));
16301631
}
16311632

1633+
var name = OffsetFetchRequest.requestAllOffsets(request) ? "fetch-all-offsets" : "fetch-offsets";
1634+
16321635
// The require stable flag when set tells the broker to hold on returning unstable
16331636
// (or uncommitted) offsets. In the previous implementation of the group coordinator,
16341637
// the UNSTABLE_OFFSET_COMMIT error is returned when unstable offsets are present. As
@@ -1639,86 +1642,28 @@ public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetch
16391642
// the pending offsets are committed. Otherwise, we use a read operation.
16401643
if (requireStable) {
16411644
return runtime.scheduleWriteOperation(
1642-
"fetch-offsets",
1645+
name,
16431646
topicPartitionFor(request.groupId()),
16441647
Duration.ofMillis(config.offsetCommitTimeoutMs()),
16451648
coordinator -> new CoordinatorResult<>(
16461649
List.of(),
16471650
coordinator.fetchOffsets(request, Long.MAX_VALUE)
16481651
)
16491652
).exceptionally(exception -> handleOffsetFetchException(
1650-
"fetch-offsets",
1653+
name,
16511654
context,
16521655
request,
16531656
exception
16541657
));
16551658
} else {
16561659
return runtime.scheduleReadOperation(
1657-
"fetch-offsets",
1660+
name,
16581661
topicPartitionFor(request.groupId()),
16591662
(coordinator, offset) -> coordinator.fetchOffsets(request, offset)
16601663
);
16611664
}
16621665
}
16631666

1664-
/**
1665-
* See {@link GroupCoordinator#fetchAllOffsets(AuthorizableRequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}.
1666-
*/
1667-
@Override
1668-
public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchAllOffsets(
1669-
AuthorizableRequestContext context,
1670-
OffsetFetchRequestData.OffsetFetchRequestGroup request,
1671-
boolean requireStable
1672-
) {
1673-
if (!isActive.get()) {
1674-
return CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
1675-
request,
1676-
Errors.COORDINATOR_NOT_AVAILABLE,
1677-
context.requestVersion()
1678-
));
1679-
}
1680-
1681-
// For backwards compatibility, we support fetch commits for the empty group id.
1682-
if (request.groupId() == null) {
1683-
return CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
1684-
request,
1685-
Errors.INVALID_GROUP_ID,
1686-
context.requestVersion()
1687-
));
1688-
}
1689-
1690-
// The require stable flag when set tells the broker to hold on returning unstable
1691-
// (or uncommitted) offsets. In the previous implementation of the group coordinator,
1692-
// the UNSTABLE_OFFSET_COMMIT error is returned when unstable offsets are present. As
1693-
// the new implementation relies on timeline data structures, the coordinator does not
1694-
// really know whether offsets are stable or not so it is hard to return the same error.
1695-
// Instead, we use a write operation when the flag is set to guarantee that the fetch
1696-
// is based on all the available offsets and to ensure that the response waits until
1697-
// the pending offsets are committed. Otherwise, we use a read operation.
1698-
if (requireStable) {
1699-
return runtime.scheduleWriteOperation(
1700-
"fetch-all-offsets",
1701-
topicPartitionFor(request.groupId()),
1702-
Duration.ofMillis(config.offsetCommitTimeoutMs()),
1703-
coordinator -> new CoordinatorResult<>(
1704-
List.of(),
1705-
coordinator.fetchAllOffsets(request, Long.MAX_VALUE)
1706-
)
1707-
).exceptionally(exception -> handleOffsetFetchException(
1708-
"fetch-all-offsets",
1709-
context,
1710-
request,
1711-
exception
1712-
));
1713-
} else {
1714-
return runtime.scheduleReadOperation(
1715-
"fetch-all-offsets",
1716-
topicPartitionFor(request.groupId()),
1717-
(coordinator, offset) -> coordinator.fetchAllOffsets(request, offset)
1718-
);
1719-
}
1720-
}
1721-
17221667
/**
17231668
* See {@link GroupCoordinator#describeShareGroupOffsets(AuthorizableRequestContext, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}.
17241669
*/

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
5858
import org.apache.kafka.common.protocol.ApiMessage;
5959
import org.apache.kafka.common.protocol.Errors;
60+
import org.apache.kafka.common.requests.OffsetFetchRequest;
6061
import org.apache.kafka.common.requests.TransactionResult;
6162
import org.apache.kafka.common.utils.LogContext;
6263
import org.apache.kafka.common.utils.Time;
@@ -806,23 +807,11 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchOffsets(
806807
OffsetFetchRequestData.OffsetFetchRequestGroup request,
807808
long epoch
808809
) throws ApiException {
809-
return offsetMetadataManager.fetchOffsets(request, epoch);
810-
}
811-
812-
/**
813-
* Fetch all offsets for a given group.
814-
*
815-
* @param request The OffsetFetchRequestGroup request.
816-
* @param epoch The epoch (or offset) used to read from the
817-
* timeline data structure.
818-
*
819-
* @return A List of OffsetFetchResponseTopics response.
820-
*/
821-
public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
822-
OffsetFetchRequestData.OffsetFetchRequestGroup request,
823-
long epoch
824-
) throws ApiException {
825-
return offsetMetadataManager.fetchAllOffsets(request, epoch);
810+
if (OffsetFetchRequest.requestAllOffsets(request)) {
811+
return offsetMetadataManager.fetchAllOffsets(request, epoch);
812+
} else {
813+
return offsetMetadataManager.fetchOffsets(request, epoch);
814+
}
826815
}
827816

828817
/**

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1722,11 +1722,14 @@ public void testFetchOffsets(
17221722
OffsetFetchRequestData.OffsetFetchRequestGroup request =
17231723
new OffsetFetchRequestData.OffsetFetchRequestGroup()
17241724
.setGroupId("group");
1725-
if (!fetchAllOffsets) {
1726-
request
1727-
.setTopics(List.of(new OffsetFetchRequestData.OffsetFetchRequestTopics()
1728-
.setName("foo")
1729-
.setPartitionIndexes(List.of(0))));
1725+
1726+
if (fetchAllOffsets) {
1727+
request.setTopics(null);
1728+
} else {
1729+
request.setTopics(List.of(new OffsetFetchRequestData.OffsetFetchRequestTopics()
1730+
.setName("foo")
1731+
.setPartitionIndexes(List.of(0))
1732+
));
17301733
}
17311734

17321735
OffsetFetchResponseData.OffsetFetchResponseGroup response =
@@ -1753,9 +1756,7 @@ public void testFetchOffsets(
17531756
)).thenReturn(CompletableFuture.completedFuture(response));
17541757
}
17551758

1756-
TriFunction<RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, Boolean, CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>> fetchOffsets =
1757-
fetchAllOffsets ? service::fetchAllOffsets : service::fetchOffsets;
1758-
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = fetchOffsets.apply(
1759+
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = service.fetchOffsets(
17591760
requestContext(ApiKeys.OFFSET_FETCH),
17601761
request,
17611762
requireStable
@@ -1784,16 +1785,17 @@ public void testFetchOffsetsWhenNotStarted(
17841785
OffsetFetchRequestData.OffsetFetchRequestGroup request =
17851786
new OffsetFetchRequestData.OffsetFetchRequestGroup()
17861787
.setGroupId("group");
1787-
if (!fetchAllOffsets) {
1788-
request
1789-
.setTopics(List.of(new OffsetFetchRequestData.OffsetFetchRequestTopics()
1790-
.setName("foo")
1791-
.setPartitionIndexes(List.of(0))));
1788+
1789+
if (fetchAllOffsets) {
1790+
request.setTopics(null);
1791+
} else {
1792+
request.setTopics(List.of(new OffsetFetchRequestData.OffsetFetchRequestTopics()
1793+
.setName("foo")
1794+
.setPartitionIndexes(List.of(0))
1795+
));
17921796
}
17931797

1794-
TriFunction<RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, Boolean, CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>> fetchOffsets =
1795-
fetchAllOffsets ? service::fetchAllOffsets : service::fetchOffsets;
1796-
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = fetchOffsets.apply(
1798+
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = service.fetchOffsets(
17971799
requestContext(ApiKeys.OFFSET_FETCH),
17981800
request,
17991801
requireStable
@@ -1834,11 +1836,14 @@ public void testFetchOffsetsWithWrappedError(
18341836
OffsetFetchRequestData.OffsetFetchRequestGroup request =
18351837
new OffsetFetchRequestData.OffsetFetchRequestGroup()
18361838
.setGroupId("group");
1837-
if (!fetchAllOffsets) {
1838-
request
1839-
.setTopics(List.of(new OffsetFetchRequestData.OffsetFetchRequestTopics()
1840-
.setName("foo")
1841-
.setPartitionIndexes(List.of(0))));
1839+
1840+
if (fetchAllOffsets) {
1841+
request.setTopics(null);
1842+
} else {
1843+
request.setTopics(List.of(new OffsetFetchRequestData.OffsetFetchRequestTopics()
1844+
.setName("foo")
1845+
.setPartitionIndexes(List.of(0))
1846+
));
18421847
}
18431848

18441849
when(runtime.scheduleWriteOperation(
@@ -1848,9 +1853,7 @@ public void testFetchOffsetsWithWrappedError(
18481853
ArgumentMatchers.any()
18491854
)).thenReturn(FutureUtils.failedFuture(new CompletionException(error.exception())));
18501855

1851-
TriFunction<RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, Boolean, CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>> fetchOffsets =
1852-
fetchAllOffsets ? service::fetchAllOffsets : service::fetchOffsets;
1853-
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = fetchOffsets.apply(
1856+
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> future = service.fetchOffsets(
18541857
requestContext(ApiKeys.OFFSET_FETCH),
18551858
request,
18561859
true

0 commit comments

Comments
 (0)