Skip to content

Commit af934aa

Browse files
authored
MINOR: Rename CoordinatorRuntime.activeTopicPartitions to activeCoordinators (#21010)
The coordinator runtime uses the term coordinator in all APIs so `CoordinatorRuntime.activeTopicPartitions` feels a bit weird. This patch renames it to `CoordinatorRuntime.activeCoordinators` to follow the convention. It also makes the `state` field within the context volatile as it is accesses without the log in that method. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Lianet Magrans <lmagrans@confluent.io>
1 parent a9cc8f2 commit af934aa

File tree

3 files changed

+6
-11
lines changed

3 files changed

+6
-11
lines changed

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -578,7 +578,7 @@ class CoordinatorContext {
578578
/**
579579
* The current state.
580580
*/
581-
CoordinatorState state;
581+
volatile CoordinatorState state;
582582

583583
/**
584584
* The current epoch of the coordinator. This represents
@@ -2611,14 +2611,9 @@ public void close() throws Exception {
26112611
}
26122612

26132613
/**
2614-
* Util method which returns all the topic partitions for which
2615-
* the state machine is in active state.
2616-
* <p>
2617-
* This could be useful if the caller does not have a specific
2618-
* target internal topic partition.
2619-
* @return List of {@link TopicPartition} whose coordinators are active
2614+
* @return List of {@link TopicPartition} whose coordinators are active.
26202615
*/
2621-
public List<TopicPartition> activeTopicPartitions() {
2616+
public List<TopicPartition> activeCoordinators() {
26222617
if (coordinators == null || coordinators.isEmpty()) {
26232618
return List.of();
26242619
}

share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ public void run() {
301301
return;
302302
}
303303
List<CompletableFuture<Void>> futures = new ArrayList<>();
304-
runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp)));
304+
runtime.activeCoordinators().forEach(tp -> futures.add(performRecordPruning(tp)));
305305

306306
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[]{}))
307307
.whenComplete((res, exp) -> {

share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ class ShareCoordinatorServiceTest {
9393
@SuppressWarnings("unchecked")
9494
private CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> mockRuntime() {
9595
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mock(CoordinatorRuntime.class);
96-
when(runtime.activeTopicPartitions())
96+
when(runtime.activeCoordinators())
9797
.thenReturn(List.of(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)));
9898
return runtime;
9999
}
@@ -1533,7 +1533,7 @@ public void testRecordPruningTaskPeriodicityWithSomeFailures() throws Exception
15331533
TopicPartition tp1 = new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0);
15341534
TopicPartition tp2 = new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1);
15351535

1536-
when(runtime.activeTopicPartitions())
1536+
when(runtime.activeCoordinators())
15371537
.thenReturn(List.of(tp1, tp2));
15381538

15391539
when(writer.deleteRecords(

0 commit comments

Comments
 (0)