Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static AsyncResult<Timestamp> fetchMaxConflict(Node node, Seekables<?, ?>
@Override
void contact(Set<Node.Id> nodes, Topologies topologies, Callback<GetMaxConflictOk> callback)
{
node.send(nodes, to -> new GetMaxConflict(to, topologies, route, keysOrRanges, executionEpoch));
node.send(nodes, to -> new GetMaxConflict(to, topologies, route, keysOrRanges, executionEpoch), callback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.util.List;
import java.util.Map;

import accord.local.SafeCommandStore;
import accord.messages.ReadData;
import accord.utils.async.AsyncChain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,7 +40,6 @@
import accord.messages.ReadData.CommitOrReadNack;
import accord.messages.ReadData.ReadOk;
import accord.messages.ReadData.ReadReply;
import accord.messages.WaitUntilAppliedAndReadData;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
import accord.primitives.Ranges;
Expand All @@ -50,6 +52,7 @@
import accord.utils.async.AsyncResults;
import javax.annotation.Nullable;

import static accord.local.SaveStatus.Applied;
import static accord.messages.ReadData.CommitOrReadNack.Insufficient;
import static accord.primitives.Routables.Slice.Minimal;

Expand Down Expand Up @@ -231,16 +234,37 @@ void abort(Ranges abort)
// TODO (expected): implement abort
}

public static class FetchRequest extends WaitUntilAppliedAndReadData
public static class FetchRequest extends ReadData
{
private static final ExecuteOn EXECUTE_ON = new ExecuteOn(Applied, Applied);
public final PartialTxn read;

public final PartialDeps partialDeps;

public FetchRequest(long sourceEpoch, TxnId syncId, Ranges ranges, PartialDeps partialDeps, PartialTxn partialTxn)
{
super(syncId, ranges, sourceEpoch, partialTxn);
super(syncId, ranges, sourceEpoch);
this.read = partialTxn;
this.partialDeps = partialDeps;
}

@Override
protected ExecuteOn executeOn()
{
return EXECUTE_ON;
}

@Override
public ReadType kind()
{
return ReadType.waitUntilApplied;
}

@Override
protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn, Ranges unavailable) {
return read.read(safeStore, executeAt, unavailable);
}

@Override
protected void readComplete(CommandStore commandStore, Data result, Ranges unavailable)
{
Expand Down
33 changes: 25 additions & 8 deletions accord-core/src/main/java/accord/local/SerializerSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public static Command reconstruct(RangesForEpoch rangesForEpoch, Mutable attrs,
private static Command.PreAccepted preAccepted(RangesForEpoch rangesForEpoch, Mutable attrs, Timestamp executeAt, Ballot promised, MessageProvider messageProvider)
{
Set<MessageType> witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
checkState(!witnessed.isEmpty());
checkState(!witnessed.isEmpty(), "PreAccepted message types not witnessed; witnessed is ", new LoggedMessageProvider(messageProvider));
attrs.partialTxn(txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider));
return Command.PreAccepted.preAccepted(attrs, executeAt, promised);
}
Expand Down Expand Up @@ -225,7 +225,7 @@ private static <I, O> O extract(RangesForEpoch rangesForEpoch, SaveStatus status
{
case PreAccepted:
witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
checkState(!witnessed.isEmpty());
checkState(!witnessed.isEmpty(), "Unable to find PreAccept types; witnessed %s", new LoggedMessageProvider(messageProvider));
return withContents.apply(param, txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider), null, null, null);

case AcceptedInvalidate:
Expand All @@ -237,7 +237,7 @@ private static <I, O> O extract(RangesForEpoch rangesForEpoch, SaveStatus status
if (status.known.isDefinitionKnown())
{
witnessed = messageProvider.test(PRE_ACCEPT_TYPES);
checkState(!witnessed.isEmpty());
checkState(!witnessed.isEmpty(), "Unable to find PreAccept types; witnessed %s", new LoggedMessageProvider(messageProvider));
txn = txnFromPreAcceptOrBeginRecover(rangesForEpoch, witnessed, messageProvider);
}

Expand All @@ -258,7 +258,7 @@ private static <I, O> O extract(RangesForEpoch rangesForEpoch, SaveStatus status
}
else
{
Invariants.checkState(witnessed.contains(COMMIT_SLOW_PATH_REQ));
Invariants.checkState(witnessed.contains(COMMIT_SLOW_PATH_REQ), "Unable to find COMMIT_SLOW_PATH_REQ; witnessed %s", new LoggedMessageProvider(messageProvider));
commit = messageProvider.commitSlowPath();
}

Expand All @@ -284,14 +284,14 @@ else if (witnessed.contains(STABLE_FAST_PATH_REQ))
}
else
{
checkState(witnessed.contains(STABLE_SLOW_PATH_REQ));
checkState(witnessed.contains(STABLE_SLOW_PATH_REQ), "Unable to find STABLE_SLOW_PATH_REQ; witnessed %s", new LoggedMessageProvider(messageProvider));
if (witnessed.contains(COMMIT_SLOW_PATH_REQ))
{
commit = messageProvider.commitSlowPath();
}
else
{
checkState(witnessed.contains(COMMIT_MAXIMAL_REQ));
checkState(witnessed.contains(COMMIT_MAXIMAL_REQ), "Unable to find COMMIT_MAXIMAL_REQ; witnessed %s", new LoggedMessageProvider(messageProvider));
commit = messageProvider.commitMaximal();
}
}
Expand All @@ -316,7 +316,7 @@ else if (witnessed.contains(PROPAGATE_APPLY_MSG))
}
else
{
checkState(witnessed.contains(APPLY_MINIMAL_REQ));
checkState(witnessed.contains(APPLY_MINIMAL_REQ), "Unable to find APPLY_MINIMAL_REQ; witnessed %s", new LoggedMessageProvider(messageProvider));
Apply apply = messageProvider.applyMinimal();
Commit commit;
if (witnessed.contains(STABLE_MAXIMAL_REQ))
Expand All @@ -338,7 +338,7 @@ else if (witnessed.contains(STABLE_FAST_PATH_REQ))
}
else
{
throw illegalState("Invalid state: insufficient stable or commit messages found to reconstruct PreApplied or greater SaveStatus");
throw illegalState("Invalid state: insufficient stable or commit messages found to reconstruct PreApplied or greater SaveStatus; witnessed " + witnessed);
}

return sliceAndApply(rangesForEpoch, messageProvider, witnessed, commit, withContents, param, apply.writes, apply.result);
Expand Down Expand Up @@ -422,6 +422,7 @@ public interface WaitingOnProvider
public interface MessageProvider
{
Set<MessageType> test(Set<MessageType> messages);
Set<MessageType> all();

PreAccept preAccept();

Expand All @@ -447,4 +448,20 @@ public interface MessageProvider

Propagate propagateApply();
}

private static class LoggedMessageProvider
{
private final MessageProvider messageProvider;

private LoggedMessageProvider(MessageProvider messageProvider)
{
this.messageProvider = messageProvider;
}

@Override
public String toString()
{
return messageProvider.all().toString();
}
}
}
4 changes: 2 additions & 2 deletions accord-core/src/main/java/accord/messages/GetMaxConflict.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void accept(GetMaxConflictOk result, Throwable failure)
@Override
public MessageType type()
{
return MessageType.GET_EPHEMERAL_READ_DEPS_REQ;
return MessageType.GET_MAX_CONFLICT_REQ;
}

@Override
Expand Down Expand Up @@ -137,7 +137,7 @@ public String toString()
@Override
public MessageType type()
{
return MessageType.GET_EPHEMERAL_READ_DEPS_RSP;
return MessageType.GET_MAX_CONFLICT_RSP;
}
}
}
2 changes: 2 additions & 0 deletions accord-core/src/main/java/accord/messages/MessageType.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class MessageType
public static final MessageType GET_DEPS_RSP = remote("GET_DEPS_RSP", false);
public static final MessageType GET_EPHEMERAL_READ_DEPS_REQ = remote("GET_EPHEMERAL_READ_DEPS_REQ", false);
public static final MessageType GET_EPHEMERAL_READ_DEPS_RSP = remote("GET_EPHEMERAL_READ_DEPS_RSP", false);
public static final MessageType GET_MAX_CONFLICT_REQ = remote("GET_MAX_CONFLICT_REQ", false);
public static final MessageType GET_MAX_CONFLICT_RSP = remote("GET_MAX_CONFLICT_RSP", false);
public static final MessageType COMMIT_SLOW_PATH_REQ = remote("COMMIT_SLOW_PATH_REQ", true);
public static final MessageType COMMIT_MAXIMAL_REQ = remote("COMMIT_MAXIMAL_REQ", true );
public static final MessageType STABLE_FAST_PATH_REQ = remote("STABLE_FAST_PATH_REQ", true);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;

import javax.annotation.Nullable;

import static accord.coordinate.tracking.RequestStatus.Success;
import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
import static accord.primitives.Routables.Slice.Minimal;
Expand Down Expand Up @@ -344,6 +346,7 @@ private Notifications pending(long epoch)
return pending.get(idx);
}

@Nullable
private EpochState get(long epoch)
{
int index = indexOf(epoch);
Expand Down Expand Up @@ -587,7 +590,9 @@ public Topologies preciseEpochs(Unseekables<?> select, long minEpoch, long maxEp
{
Epochs snapshot = epochs;

TopologyMismatch tm = TopologyMismatch.checkForMismatch(snapshot.get(maxEpoch).global(), select);
EpochState maxState = snapshot.get(maxEpoch);
Invariants.checkState(maxState != null, "Unable to find epoch %d; known epochs are %d -> %d", maxEpoch, snapshot.minEpoch(), snapshot.currentEpoch);
TopologyMismatch tm = TopologyMismatch.checkForMismatch(maxState.global(), select);
if (tm != null)
throw tm;

Expand Down
7 changes: 6 additions & 1 deletion accord-core/src/main/java/accord/utils/Invariants.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,14 @@ public static boolean debug()
return DEBUG;
}

public static IllegalStateException createIllegalState(String msg)
{
return new IllegalStateException(msg);
}

public static IllegalStateException illegalState(String msg)
{
throw new IllegalStateException(msg);
throw createIllegalState(msg);
}

public static IllegalStateException illegalState()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import accord.api.VisibleForImplementation;
import accord.utils.Invariants;

import static accord.utils.Invariants.createIllegalState;
import static accord.utils.Invariants.illegalState;

public class AsyncResults
Expand Down Expand Up @@ -95,7 +96,7 @@ private void notify(Listener<V> listener, Result<V> result)
}
if (failures != null)
{
IllegalStateException f = illegalState("Callbacks threw");
IllegalStateException f = createIllegalState("Callbacks threw");
failures.forEach(f::addSuppressed);
throw f;
}
Expand Down Expand Up @@ -147,7 +148,7 @@ void setResult(V result, Throwable failure)
{
if (!trySetResult(result, failure))
{
IllegalStateException f = illegalState("Result has already been set on " + this);
IllegalStateException f = createIllegalState("Result has already been set on " + this);
if (failure != null)
f.addSuppressed(failure);
throw f;
Expand Down Expand Up @@ -226,7 +227,7 @@ public Throwable failure()
{
Result<V> result = getResult();
if (result.failure == null)
throw illegalState("Result succeeded");
illegalState("Result succeeded");
return result.failure;
}

Expand Down
4 changes: 4 additions & 0 deletions accord-core/src/test/java/accord/impl/list/ListRead.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,17 @@ public AsyncChain<Data> read(Seekable key, SafeCommandStore safeStore, Timestamp
{
default: throw new AssertionError();
case Key:
if (!keys.contains((Key)key))
throw new IllegalArgumentException("Attempted to read key " + key + " which is outside of the expected range " + keys);
Timestamped<int[]> data = s.get(unavailable, executeAt, (Key)key);
logger.trace("READ on {} at {} key:{} -> {}", s.node, executeAt, key, data);
Invariants.checkState(isEphemeralRead || data.timestamp.compareTo(executeAt) < 0,
"Data timestamp %s >= execute at %s", data.timestamp, executeAt);
result.put((Key)key, data);
break;
case Range:
if (!keys.containsAll(Ranges.single((Range)key)))
throw new IllegalArgumentException("Attempted to read range " + key + " which is outside of the expected range " + keys);
for (Map.Entry<Key, Timestamped<int[]>> e : s.get(unavailable, executeAt, (Range)key))
result.put(e.getKey(), e.getValue());
}
Expand Down