From a103c33e1f5d8302f9b7760f2d829bc69ec39227 Mon Sep 17 00:00:00 2001 From: szywilliam Date: Sun, 26 Jan 2025 09:35:18 +0800 Subject: [PATCH 1/2] apply review patch --- .../ratis/server/impl/RaftServerImpl.java | 19 ++- .../ratis/server/impl/ServerImplUtils.java | 111 ++++++++++++++++++ 2 files changed, 125 insertions(+), 5 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index c67329cae1..03f12b6fad 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -17,7 +17,6 @@ */ package org.apache.ratis.server.impl; -import java.util.concurrent.CountDownLatch; import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.metrics.Timekeeper; @@ -25,11 +24,11 @@ import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult; import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; -import org.apache.ratis.proto.RaftProtos.LogInfoProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult; import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.LogInfoProto; import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase; import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto; @@ -82,6 +81,8 @@ import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.server.impl.LeaderElection.Phase; import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry; +import org.apache.ratis.server.impl.ServerImplUtils.ConsecutiveIndices; +import org.apache.ratis.server.impl.ServerImplUtils.NavigableIndices; import org.apache.ratis.server.leader.LeaderState; import org.apache.ratis.server.metrics.LeaderElectionMetrics; import org.apache.ratis.server.metrics.RaftServerMetricsImpl; @@ -128,6 +129,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; @@ -260,6 +262,7 @@ public long[] getFollowerMatchIndices() { private final ThreadGroup threadGroup; private final AtomicReference> appendLogFuture; + private final NavigableIndices appendLogTermIndices = new NavigableIndices(); RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option) throws IOException { @@ -1687,10 +1690,16 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde }); } private CompletableFuture appendLog(ReferenceCountedObject> entriesRef) { - entriesRef.retain(); + final List entries = entriesRef.retain(); + final List entriesTermIndices = ConsecutiveIndices.convert(entries); + appendLogTermIndices.append(entriesTermIndices); + return appendLogFuture.updateAndGet(f -> f.thenCompose( ignored -> JavaUtils.allOf(state.getLog().append(entriesRef)))) - .whenComplete((v, e) -> entriesRef.release()); + .whenComplete((v, e) -> { + entriesRef.release(); + appendLogTermIndices.removeExisting(entriesTermIndices); + }); } private long checkInconsistentAppendEntries(TermIndex previous, List entries) { @@ -1717,7 +1726,7 @@ private long checkInconsistentAppendEntries(TermIndex previous, List convert(List entries) { + if (entries == null || entries.isEmpty()) { + return Collections.emptyList(); + } + + List indices = null; + + LogEntryProto previous = entries.get(0); + long startIndex = previous.getIndex(); + int count = 1; + + for (int i = 1; i < entries.size(); i++) { + final LogEntryProto current = entries.get(i); + // validate if the indices are consecutive + Preconditions.assertSame(previous.getIndex() + 1, current.getIndex(), "index"); + + if (current.getTerm() == previous.getTerm()) { + count++; + } else { + // validate if the terms are increasing + Preconditions.assertTrue(previous.getTerm() < current.getTerm(), "term"); + if (indices == null) { + indices = new ArrayList<>(); + } + indices.add(new ConsecutiveIndices(previous.getTerm(), startIndex, count)); + + startIndex = current.getIndex(); + count = 1; + } + previous = current; + } + + final ConsecutiveIndices last = new ConsecutiveIndices(previous.getTerm(), startIndex, count); + if (indices == null) { + return Collections.singletonList(last); + } else { + indices.add(last); + return indices; + } + } + + private final long term; + private final long startIndex; + private final int count; + + ConsecutiveIndices(long term, long startIndex, int count) { + Preconditions.assertTrue(count > 0, () -> "count = " + count + " <= 0 "); + this.term = term; + this.startIndex = startIndex; + this.count = count; + } + + long getNextIndex() { + return startIndex + count; + } + + Long getTerm(long index) { + final long diff = index - startIndex; + return diff < 0 || diff >= count ? null: term; + } + } + + /** A data structure to support the {@link #contains(TermIndex)} method. */ + static class NavigableIndices { + private final NavigableMap map = new TreeMap<>(); + + boolean contains(TermIndex ti) { + final Long term = getTerm(ti.getIndex()); + return term != null && term == ti.getTerm(); + } + + synchronized Long getTerm(long index) { + if (map.isEmpty()) { + return null; + } + + final Map.Entry floorEntry = map.floorEntry(index); + if (floorEntry == null) { + return null; + } + return floorEntry.getValue().getTerm(index); + } + + synchronized void append(List entriesTermIndices) { + for(ConsecutiveIndices indices : entriesTermIndices) { + // validate index0 + final long index0 = indices.startIndex; + final Map.Entry lastEntry = map.lastEntry(); + if (lastEntry != null) { + Preconditions.assertSame(lastEntry.getValue().getNextIndex(), index0, "index0"); + } + map.put(index0, indices); + } + } + + synchronized void removeExisting(List entriesTermIndices) { + for(ConsecutiveIndices indices : entriesTermIndices) { + final ConsecutiveIndices removed = map.remove(indices.startIndex); + Preconditions.assertSame(indices, removed, "removed"); + } + } + } + private ServerImplUtils() { //Never constructed } From 8212ecc898772512c9b577fea0fba4998ded4dee Mon Sep 17 00:00:00 2001 From: szywilliam Date: Thu, 20 Feb 2025 19:38:53 +0800 Subject: [PATCH 2/2] address review issues --- .../org/apache/ratis/server/impl/RaftServerImpl.java | 10 +++++++--- .../org/apache/ratis/server/impl/RaftServerProxy.java | 3 ++- .../org/apache/ratis/server/impl/ServerImplUtils.java | 7 +++---- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 03f12b6fad..a6798c48b4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -114,6 +114,7 @@ import org.apache.ratis.util.ReferenceCountedObject; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.function.CheckedSupplier; +import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import java.io.File; import java.io.IOException; @@ -1690,10 +1691,13 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde }); } private CompletableFuture appendLog(ReferenceCountedObject> entriesRef) { - final List entries = entriesRef.retain(); - final List entriesTermIndices = ConsecutiveIndices.convert(entries); - appendLogTermIndices.append(entriesTermIndices); + final List entriesTermIndices; + try(UncheckedAutoCloseableSupplier> entries = entriesRef.retainAndReleaseOnClose()) { + entriesTermIndices = ConsecutiveIndices.convert(entries.get()); + appendLogTermIndices.append(entriesTermIndices); + } + entriesRef.retain(); return appendLogFuture.updateAndGet(f -> f.thenCompose( ignored -> JavaUtils.allOf(state.getLog().append(entriesRef)))) .whenComplete((v, e) -> { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 6b41c8c2ac..523a418337 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -653,7 +653,8 @@ public CompletableFuture appendEntriesAsync( try { final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId()); return getImplFuture(groupId) - .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.appendEntriesAsync(requestRef))); + .thenCompose(impl -> JavaUtils.callAsUnchecked( + () -> impl.appendEntriesAsync(requestRef), CompletionException::new)); } finally { requestRef.release(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index 976a050085..c5010a5346 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -138,13 +138,12 @@ synchronized Long getTerm(long index) { synchronized void append(List entriesTermIndices) { for(ConsecutiveIndices indices : entriesTermIndices) { - // validate index0 - final long index0 = indices.startIndex; + // validate startIndex final Map.Entry lastEntry = map.lastEntry(); if (lastEntry != null) { - Preconditions.assertSame(lastEntry.getValue().getNextIndex(), index0, "index0"); + Preconditions.assertSame(lastEntry.getValue().getNextIndex(), indices.startIndex, "startIndex"); } - map.put(index0, indices); + map.put(indices.startIndex, indices); } }