Skip to content

Commit 13a3ae9

Browse files
committed
Fix:
- DurabilityQueue/ShardScheduler deadlock - MemtableCleanerThread.Cleanup assumes Boolean parameter is non-null, which is invalid if an exception has been thrown - AccordDurableOnFlush may be invoked while Accord is starting up, so should use AccordService.unsafeInstance - AccordCache shrink without lock regression - Cleanup system_accord compaction leftovers before starting up - system_accord_debug.txn order - system_accord_debug.txn_blocked_by order - system_accord_debug.shard_epochs order Improve: - Set DefaultProgressLog.setMode(Catchup) during Catchup - IdentityAccumulators only need to readLast, not readAll - Limit number of static segments we compact at once to sstable - If too many static segments on startup, wait for them to be compacted patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21053
1 parent 21382e6 commit 13a3ae9

File tree

19 files changed

+236
-76
lines changed

19 files changed

+236
-76
lines changed

src/java/org/apache/cassandra/config/AccordSpec.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ public enum MixedTimeSourceHandling
197197
public static class JournalSpec implements Params
198198
{
199199
public int segmentSize = 32 << 20;
200+
public int compactMaxSegments = 32;
200201
public FailurePolicy failurePolicy = FailurePolicy.STOP;
201202
public ReplayMode replayMode = ReplayMode.ONLY_NON_DURABLE;
202203
public FlushMode flushMode = FlushMode.PERIODIC;
@@ -225,6 +226,12 @@ public int segmentSize()
225226
return segmentSize;
226227
}
227228

229+
@Override
230+
public int compactMaxSegments()
231+
{
232+
return compactMaxSegments;
233+
}
234+
228235
@Override
229236
public FailurePolicy failurePolicy()
230237
{

src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@
146146
import org.apache.cassandra.tcm.ClusterMetadata;
147147
import org.apache.cassandra.tcm.membership.NodeId;
148148
import org.apache.cassandra.utils.LocalizeString;
149+
import org.apache.cassandra.utils.concurrent.Future;
149150

150151
import static accord.coordinate.Infer.InvalidIf.NotKnownToBeInvalid;
151152
import static accord.local.RedundantStatus.Property.GC_BEFORE;
@@ -169,6 +170,7 @@
169170
import static org.apache.cassandra.db.virtual.VirtualTable.Sorted.SORTED;
170171
import static org.apache.cassandra.db.virtual.VirtualTable.Sorted.UNSORTED;
171172
import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_ACCORD_DEBUG;
173+
import static org.apache.cassandra.service.accord.AccordService.toFuture;
172174
import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
173175

174176
// TODO (expected): split into separate classes in own package
@@ -713,7 +715,9 @@ private void addPartition(CommandStore commandStore, PartitionsCollector collect
713715
collector.partition(commandStore.id())
714716
.collect(rows -> {
715717
// TODO (desired): support maybe execute immediately with safeStore
716-
AccordService.getBlocking(commandStore.chain((PreLoadContext.Empty) metadata::toString, safeStore -> { addRows(safeStore, rows); }));
718+
Future<?> future = toFuture(commandStore.chain((PreLoadContext.Empty) metadata::toString, safeStore -> { addRows(safeStore, rows); }));
719+
if (!future.awaitUntilThrowUncheckedOnInterrupt(collector.deadlineNanos()))
720+
throw new InternalTimeoutException();
717721
});
718722
}
719723

@@ -1601,7 +1605,7 @@ private TxnTable()
16011605
"Accord per-CommandStore Transaction State",
16021606
"CREATE TABLE %s (\n" +
16031607
" command_store_id int,\n" +
1604-
" txn_id text,\n" +
1608+
" txn_id 'TxnIdUtf8Type',\n" +
16051609
" save_status text,\n" +
16061610
" route text,\n" +
16071611
" durability text,\n" +
@@ -1792,7 +1796,7 @@ protected void applyRowUpdate(Object[] partitionKeys, Object[] clusteringKeys, C
17921796
case TRY_EXECUTE:
17931797
run(txnId, commandStoreId, safeStore -> {
17941798
SafeCommand safeCommand = safeStore.unsafeGet(txnId);
1795-
Commands.maybeExecute(safeStore, safeCommand, safeCommand.current(), true, true, NotifyWaitingOnPlus.adapter(null, true, true));
1799+
Commands.maybeExecute(safeStore, safeCommand, safeCommand.current(), true, true, NotifyWaitingOnPlus.adapter(ignore -> {}, true, true));
17961800
return AsyncChains.success(null);
17971801
});
17981802
break;
@@ -2089,7 +2093,7 @@ protected TxnBlockedByTable()
20892093
" blocked_by_txn_id 'TxnIdUtf8Type',\n" +
20902094
" save_status text,\n" +
20912095
" execute_at text,\n" +
2092-
" PRIMARY KEY (txn_id, command_store_id, depth, blocked_by_key, blocked_by_txn_id)" +
2096+
" PRIMARY KEY (txn_id, depth, command_store_id, blocked_by_txn_id, blocked_by_key)" +
20932097
')', TxnIdUtf8Type.instance), BEST_EFFORT, ASC);
20942098
}
20952099

@@ -2111,7 +2115,7 @@ protected void collect(PartitionsCollector collector)
21112115
DebugBlockedTxns.visit(AccordService.unsafeInstance(), txnId, maxDepth, collector.deadlineNanos(), txn -> {
21122116
String keyStr = txn.blockedViaKey == null ? "" : txn.blockedViaKey.toString();
21132117
String txnIdStr = txn.txnId == null || txn.txnId.equals(txnId) ? "" : txn.txnId.toString();
2114-
rows.add(txn.commandStoreId, txn.depth, keyStr, txnIdStr)
2118+
rows.add(txn.depth, txn.commandStoreId, txnIdStr, keyStr)
21152119
.eagerCollect(columns -> {
21162120
columns.add("save_status", txn.saveStatus, TO_STRING)
21172121
.add("execute_at", txn.executeAt, TO_STRING);
@@ -2160,8 +2164,9 @@ private ShardEpochsTable()
21602164
" quorum_fast_privileged_deps int,\n" +
21612165
" quorum_fast_privileged_nodeps int,\n" +
21622166
" token_end 'TokenUtf8Type',\n" +
2163-
" PRIMARY KEY (table_id, token_start, epoch_start)" +
2164-
')', UTF8Type.instance), FAIL, ASC);
2167+
" PRIMARY KEY (table_id, token_start, epoch_start))" +
2168+
" WITH CLUSTERING ORDER BY (token_start ASC, epoch_start DESC);"
2169+
, UTF8Type.instance), FAIL, ASC);
21652170
}
21662171

21672172
@Override

src/java/org/apache/cassandra/journal/Compactor.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
package org.apache.cassandra.journal;
1919

2020
import java.io.IOException;
21+
import java.util.ArrayList;
2122
import java.util.Collection;
22-
import java.util.HashSet;
23-
import java.util.Set;
23+
import java.util.List;
2424
import java.util.concurrent.Future;
2525
import java.util.concurrent.TimeUnit;
2626

@@ -29,6 +29,7 @@
2929

3030
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
3131
import org.apache.cassandra.concurrent.Shutdownable;
32+
import org.apache.cassandra.utils.concurrent.WaitQueue;
3233

3334
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
3435

@@ -40,6 +41,7 @@ public final class Compactor<K, V> implements Runnable, Shutdownable
4041
private final SegmentCompactor<K, V> segmentCompactor;
4142
private final ScheduledExecutorPlus executor;
4243
private Future<?> scheduled;
44+
public final WaitQueue compacted = WaitQueue.newWaitQueue();
4345

4446
Compactor(Journal<K, V> journal, SegmentCompactor<K, V> segmentCompactor)
4547
{
@@ -73,11 +75,18 @@ public synchronized void updateCompactionPeriod(int period, TimeUnit units)
7375
@Override
7476
public void run()
7577
{
76-
Set<StaticSegment<K, V>> toCompact = new HashSet<>();
78+
List<StaticSegment<K, V>> toCompact = new ArrayList<>();
7779
journal.segments().selectStatic(toCompact);
7880
if (toCompact.isEmpty())
7981
return;
8082

83+
int limit = journal.params.compactMaxSegments();
84+
if (toCompact.size() > limit)
85+
{
86+
toCompact.sort(StaticSegment::compareTo);
87+
toCompact.subList(limit, toCompact.size()).clear();
88+
}
89+
8190
try
8291
{
8392
Collection<StaticSegment<K, V>> newSegments = segmentCompactor.compact(toCompact);
@@ -88,6 +97,8 @@ public void run()
8897
journal.replaceCompactedSegments(toCompact, newSegments);
8998
for (StaticSegment<K, V> segment : toCompact)
9099
segment.discard(journal);
100+
101+
compacted.signalAll();
91102
}
92103
catch (IOException e)
93104
{

src/java/org/apache/cassandra/journal/Journal.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,27 @@ public void start()
222222
"Unexpected journal state after initialization", state);
223223
flusher.start();
224224
compactor.start();
225+
226+
final int maxSegments = 100;
227+
if (segments.get().count(Segment::isStatic) > maxSegments)
228+
{
229+
while (true)
230+
{
231+
WaitQueue.Signal signal = compactor.compacted.register();
232+
int count = segments.get().count(Segment::isStatic);
233+
if (count <= maxSegments)
234+
{
235+
signal.cancel();
236+
logger.info("Only {} static segments; continuing with startup", count);
237+
break;
238+
}
239+
else
240+
{
241+
logger.info("Too many ({}) static segments; waiting until some compacted before starting up", count);
242+
signal.awaitThrowUncheckedOnInterrupt();
243+
}
244+
}
245+
}
225246
}
226247

227248
@VisibleForTesting

src/java/org/apache/cassandra/journal/Params.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ enum ReplayMode { RESET, ALL, ONLY_NON_DURABLE }
3131
*/
3232
int segmentSize();
3333

34+
/**
35+
* @return maximum number of static segments to compact at once to sstable
36+
*/
37+
int compactMaxSegments();
38+
3439
/**
3540
* @return this journal's {@link FailurePolicy}
3641
*/

src/java/org/apache/cassandra/journal/Segments.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,17 @@ Iterable<Segment<K, V>> all()
102102
return this.segments.values();
103103
}
104104

105+
public int count(Predicate<? super Segment<K, V>> predicate)
106+
{
107+
int count = 0;
108+
for (Segment<K, V> segment : segments.values())
109+
{
110+
if (predicate.test(segment))
111+
++count;
112+
}
113+
return count;
114+
}
115+
105116
/**
106117
* Returns segments in timestamp order. Will allocate and sort the segment collection.
107118
*/

src/java/org/apache/cassandra/service/StartupChecks.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,12 @@ public void execute(StartupChecksOptions options) throws StartupException
725725
for (TableMetadata cfm : Schema.instance.getTablesAndViews(SchemaConstants.SYSTEM_KEYSPACE_NAME))
726726
ColumnFamilyStore.scrubDataDirectories(cfm);
727727

728+
if (DatabaseDescriptor.getAccordTransactionsEnabled())
729+
{
730+
for (TableMetadata cfm : Schema.instance.getTablesAndViews(SchemaConstants.ACCORD_KEYSPACE_NAME))
731+
ColumnFamilyStore.scrubDataDirectories(cfm);
732+
}
733+
728734
try
729735
{
730736
SystemKeyspace.checkHealth();

src/java/org/apache/cassandra/service/accord/AccordCache.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,8 +259,7 @@ private <K, V> void shrinkOrEvict(Lock lock, AccordCacheEntry<K, V> node)
259259
{
260260
//noinspection LockAcquiredButNotSafelyReleased
261261
lock.lock();
262-
node.tryApplyShrink(cur, upd);
263-
queue.addLast(node);
262+
node.tryApplyShrink(cur, upd, queue);
264263
}
265264
}
266265
}

src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.common.primitives.Ints;
3131

3232
import accord.utils.ArrayBuffers.BufferList;
33+
import accord.utils.IntrusiveLinkedList;
3334
import accord.utils.IntrusiveLinkedListNode;
3435
import accord.utils.Invariants;
3536
import accord.utils.async.Cancellable;
@@ -595,10 +596,14 @@ public Throwable failure()
595596
return ((FailedToSave)state).cause;
596597
}
597598

598-
void tryApplyShrink(Object cur, Object upd)
599+
void tryApplyShrink(Object cur, Object upd, IntrusiveLinkedList<AccordCacheEntry<?,?>> queue)
599600
{
601+
if (references() > 0 || !isUnqueued())
602+
return;
603+
600604
if (isLoaded() && unwrap() == cur && upd != cur && upd != null)
601605
applyShrink(owner.parent(), cur, upd);
606+
queue.addLast(this);
602607
}
603608

604609
private void applyShrink(AccordCache.Type<K, V, ?> parent, Object cur, Object upd)

0 commit comments

Comments
 (0)