diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index c67329cae1..a6798c48b4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -17,7 +17,6 @@ */ package org.apache.ratis.server.impl; -import java.util.concurrent.CountDownLatch; import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.metrics.Timekeeper; @@ -25,11 +24,11 @@ import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult; import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; -import org.apache.ratis.proto.RaftProtos.LogInfoProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult; import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.LogInfoProto; import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase; import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto; @@ -82,6 +81,8 @@ import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.server.impl.LeaderElection.Phase; import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry; +import org.apache.ratis.server.impl.ServerImplUtils.ConsecutiveIndices; +import org.apache.ratis.server.impl.ServerImplUtils.NavigableIndices; import org.apache.ratis.server.leader.LeaderState; import org.apache.ratis.server.metrics.LeaderElectionMetrics; import org.apache.ratis.server.metrics.RaftServerMetricsImpl; @@ -113,6 +114,7 @@ import org.apache.ratis.util.ReferenceCountedObject; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.function.CheckedSupplier; +import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import java.io.File; import java.io.IOException; @@ -128,6 +130,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; @@ -260,6 +263,7 @@ public long[] getFollowerMatchIndices() { private final ThreadGroup threadGroup; private final AtomicReference> appendLogFuture; + private final NavigableIndices appendLogTermIndices = new NavigableIndices(); RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option) throws IOException { @@ -1687,10 +1691,19 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde }); } private CompletableFuture appendLog(ReferenceCountedObject> entriesRef) { + final List entriesTermIndices; + try(UncheckedAutoCloseableSupplier> entries = entriesRef.retainAndReleaseOnClose()) { + entriesTermIndices = ConsecutiveIndices.convert(entries.get()); + appendLogTermIndices.append(entriesTermIndices); + } + entriesRef.retain(); return appendLogFuture.updateAndGet(f -> f.thenCompose( ignored -> JavaUtils.allOf(state.getLog().append(entriesRef)))) - .whenComplete((v, e) -> entriesRef.release()); + .whenComplete((v, e) -> { + entriesRef.release(); + appendLogTermIndices.removeExisting(entriesTermIndices); + }); } private long checkInconsistentAppendEntries(TermIndex previous, List entries) { @@ -1717,7 +1730,7 @@ private long checkInconsistentAppendEntries(TermIndex previous, List appendEntriesAsync( try { final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId()); return getImplFuture(groupId) - .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.appendEntriesAsync(requestRef))); + .thenCompose(impl -> JavaUtils.callAsUnchecked( + () -> impl.appendEntriesAsync(requestRef), CompletionException::new)); } finally { requestRef.release(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index e26c6e0ab1..c5010a5346 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -40,11 +40,121 @@ import org.apache.ratis.util.TimeDuration; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; /** Server utilities for internal use. */ public final class ServerImplUtils { + /** The consecutive indices within the same term. */ + static class ConsecutiveIndices { + /** Convert the given entries to a list of {@link ConsecutiveIndices} */ + static List convert(List entries) { + if (entries == null || entries.isEmpty()) { + return Collections.emptyList(); + } + + List indices = null; + + LogEntryProto previous = entries.get(0); + long startIndex = previous.getIndex(); + int count = 1; + + for (int i = 1; i < entries.size(); i++) { + final LogEntryProto current = entries.get(i); + // validate if the indices are consecutive + Preconditions.assertSame(previous.getIndex() + 1, current.getIndex(), "index"); + + if (current.getTerm() == previous.getTerm()) { + count++; + } else { + // validate if the terms are increasing + Preconditions.assertTrue(previous.getTerm() < current.getTerm(), "term"); + if (indices == null) { + indices = new ArrayList<>(); + } + indices.add(new ConsecutiveIndices(previous.getTerm(), startIndex, count)); + + startIndex = current.getIndex(); + count = 1; + } + previous = current; + } + + final ConsecutiveIndices last = new ConsecutiveIndices(previous.getTerm(), startIndex, count); + if (indices == null) { + return Collections.singletonList(last); + } else { + indices.add(last); + return indices; + } + } + + private final long term; + private final long startIndex; + private final int count; + + ConsecutiveIndices(long term, long startIndex, int count) { + Preconditions.assertTrue(count > 0, () -> "count = " + count + " <= 0 "); + this.term = term; + this.startIndex = startIndex; + this.count = count; + } + + long getNextIndex() { + return startIndex + count; + } + + Long getTerm(long index) { + final long diff = index - startIndex; + return diff < 0 || diff >= count ? null: term; + } + } + + /** A data structure to support the {@link #contains(TermIndex)} method. */ + static class NavigableIndices { + private final NavigableMap map = new TreeMap<>(); + + boolean contains(TermIndex ti) { + final Long term = getTerm(ti.getIndex()); + return term != null && term == ti.getTerm(); + } + + synchronized Long getTerm(long index) { + if (map.isEmpty()) { + return null; + } + + final Map.Entry floorEntry = map.floorEntry(index); + if (floorEntry == null) { + return null; + } + return floorEntry.getValue().getTerm(index); + } + + synchronized void append(List entriesTermIndices) { + for(ConsecutiveIndices indices : entriesTermIndices) { + // validate startIndex + final Map.Entry lastEntry = map.lastEntry(); + if (lastEntry != null) { + Preconditions.assertSame(lastEntry.getValue().getNextIndex(), indices.startIndex, "startIndex"); + } + map.put(indices.startIndex, indices); + } + } + + synchronized void removeExisting(List entriesTermIndices) { + for(ConsecutiveIndices indices : entriesTermIndices) { + final ConsecutiveIndices removed = map.remove(indices.startIndex); + Preconditions.assertSame(indices, removed, "removed"); + } + } + } + private ServerImplUtils() { //Never constructed }