|
40 | 40 | import org.apache.ratis.util.TimeDuration; |
41 | 41 |
|
42 | 42 | import java.io.IOException; |
| 43 | +import java.util.ArrayList; |
| 44 | +import java.util.Collections; |
43 | 45 | import java.util.List; |
| 46 | +import java.util.Map; |
| 47 | +import java.util.NavigableMap; |
| 48 | +import java.util.TreeMap; |
44 | 49 | import java.util.concurrent.TimeUnit; |
45 | 50 |
|
46 | 51 | /** Server utilities for internal use. */ |
47 | 52 | public final class ServerImplUtils { |
| 53 | + /** The consecutive indices within the same term. */ |
| 54 | + static class ConsecutiveIndices { |
| 55 | + /** Convert the given entries to a list of {@link ConsecutiveIndices} */ |
| 56 | + static List<ConsecutiveIndices> convert(List<LogEntryProto> entries) { |
| 57 | + if (entries == null || entries.isEmpty()) { |
| 58 | + return Collections.emptyList(); |
| 59 | + } |
| 60 | + |
| 61 | + List<ConsecutiveIndices> indices = null; |
| 62 | + |
| 63 | + LogEntryProto previous = entries.get(0); |
| 64 | + long startIndex = previous.getIndex(); |
| 65 | + int count = 1; |
| 66 | + |
| 67 | + for (int i = 1; i < entries.size(); i++) { |
| 68 | + final LogEntryProto current = entries.get(i); |
| 69 | + // validate if the indices are consecutive |
| 70 | + Preconditions.assertSame(previous.getIndex() + 1, current.getIndex(), "index"); |
| 71 | + |
| 72 | + if (current.getTerm() == previous.getTerm()) { |
| 73 | + count++; |
| 74 | + } else { |
| 75 | + // validate if the terms are increasing |
| 76 | + Preconditions.assertTrue(previous.getTerm() < current.getTerm(), "term"); |
| 77 | + if (indices == null) { |
| 78 | + indices = new ArrayList<>(); |
| 79 | + } |
| 80 | + indices.add(new ConsecutiveIndices(previous.getTerm(), startIndex, count)); |
| 81 | + |
| 82 | + startIndex = current.getIndex(); |
| 83 | + count = 1; |
| 84 | + } |
| 85 | + previous = current; |
| 86 | + } |
| 87 | + |
| 88 | + final ConsecutiveIndices last = new ConsecutiveIndices(previous.getTerm(), startIndex, count); |
| 89 | + if (indices == null) { |
| 90 | + return Collections.singletonList(last); |
| 91 | + } else { |
| 92 | + indices.add(last); |
| 93 | + return indices; |
| 94 | + } |
| 95 | + } |
| 96 | + |
| 97 | + private final long term; |
| 98 | + private final long startIndex; |
| 99 | + private final int count; |
| 100 | + |
| 101 | + ConsecutiveIndices(long term, long startIndex, int count) { |
| 102 | + Preconditions.assertTrue(count > 0, () -> "count = " + count + " <= 0 "); |
| 103 | + this.term = term; |
| 104 | + this.startIndex = startIndex; |
| 105 | + this.count = count; |
| 106 | + } |
| 107 | + |
| 108 | + long getNextIndex() { |
| 109 | + return startIndex + count; |
| 110 | + } |
| 111 | + |
| 112 | + Long getTerm(long index) { |
| 113 | + final long diff = index - startIndex; |
| 114 | + return diff < 0 || diff >= count ? null: term; |
| 115 | + } |
| 116 | + } |
| 117 | + |
| 118 | + /** A data structure to support the {@link #contains(TermIndex)} method. */ |
| 119 | + static class NavigableIndices { |
| 120 | + private final NavigableMap<Long, ConsecutiveIndices> map = new TreeMap<>(); |
| 121 | + |
| 122 | + boolean contains(TermIndex ti) { |
| 123 | + final Long term = getTerm(ti.getIndex()); |
| 124 | + return term != null && term == ti.getTerm(); |
| 125 | + } |
| 126 | + |
| 127 | + synchronized Long getTerm(long index) { |
| 128 | + if (map.isEmpty()) { |
| 129 | + return null; |
| 130 | + } |
| 131 | + |
| 132 | + final Map.Entry<Long, ConsecutiveIndices> floorEntry = map.floorEntry(index); |
| 133 | + if (floorEntry == null) { |
| 134 | + return null; |
| 135 | + } |
| 136 | + return floorEntry.getValue().getTerm(index); |
| 137 | + } |
| 138 | + |
| 139 | + synchronized void append(List<ConsecutiveIndices> entriesTermIndices) { |
| 140 | + for(ConsecutiveIndices indices : entriesTermIndices) { |
| 141 | + // validate startIndex |
| 142 | + final Map.Entry<Long, ConsecutiveIndices> lastEntry = map.lastEntry(); |
| 143 | + if (lastEntry != null) { |
| 144 | + Preconditions.assertSame(lastEntry.getValue().getNextIndex(), indices.startIndex, "startIndex"); |
| 145 | + } |
| 146 | + map.put(indices.startIndex, indices); |
| 147 | + } |
| 148 | + } |
| 149 | + |
| 150 | + synchronized void removeExisting(List<ConsecutiveIndices> entriesTermIndices) { |
| 151 | + for(ConsecutiveIndices indices : entriesTermIndices) { |
| 152 | + final ConsecutiveIndices removed = map.remove(indices.startIndex); |
| 153 | + Preconditions.assertSame(indices, removed, "removed"); |
| 154 | + } |
| 155 | + } |
| 156 | + } |
| 157 | + |
48 | 158 | private ServerImplUtils() { |
49 | 159 | //Never constructed |
50 | 160 | } |
|
0 commit comments