Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions accord-core/src/main/java/accord/impl/SimpleProgressLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import javax.annotation.Nullable;

import accord.utils.AccordConfig;
import accord.utils.IntrusiveLinkedList;
import accord.utils.IntrusiveLinkedListNode;
import accord.coordinate.*;
Expand Down Expand Up @@ -88,11 +89,13 @@ boolean isAtLeastCommitted()
enum DisseminateStatus { NotExecuted, Durable, Done }

final Node node;
final AccordConfig config;
final List<Instance> instances = new CopyOnWriteArrayList<>();

public SimpleProgressLog(Node node)
public SimpleProgressLog(Node node, AccordConfig config)
{
this.node = node;
this.config = config;
}

class Instance extends IntrusiveLinkedList<Instance.State.Monitoring> implements ProgressLog, Runnable
Expand Down Expand Up @@ -822,7 +825,7 @@ void ensureScheduled()
return;

isScheduled = true;
node.scheduler().once(() -> commandStore.execute(PreLoadContext.empty(), ignore -> run()).begin(commandStore.agent()), 200L, TimeUnit.MILLISECONDS);
node.scheduler().once(() -> commandStore.execute(PreLoadContext.empty(), ignore -> run()).begin(commandStore.agent()), config.progress_log_scheduler_delay_in_ms, TimeUnit.MILLISECONDS);
}

@Override
Expand Down
8 changes: 5 additions & 3 deletions accord-core/src/main/java/accord/local/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
Expand All @@ -34,6 +35,7 @@
import accord.messages.*;
import accord.primitives.*;
import accord.primitives.Routable.Domain;
import accord.utils.AccordConfig;
import accord.utils.MapReduceConsume;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncResult;
Expand Down Expand Up @@ -131,9 +133,9 @@ public boolean isCoordinating(TxnId txnId, Ballot promised)
// TODO (expected, liveness): monitor the contents of this collection for stalled coordination, and excise them
private final Map<TxnId, AsyncResult<? extends Outcome>> coordinating = new ConcurrentHashMap<>();

public Node(Id id, MessageSink messageSink, ConfigurationService configService, LongSupplier nowSupplier,
public Node(Id id, AccordConfig config, MessageSink messageSink, ConfigurationService configService, LongSupplier nowSupplier,
Supplier<DataStore> dataSupplier, ShardDistributor shardDistributor, Agent agent, RandomSource random, Scheduler scheduler, TopologySorter.Supplier topologySorter,
Function<Node, ProgressLog.Factory> progressLogFactory, CommandStores.Factory factory)
BiFunction<Node, AccordConfig, ProgressLog.Factory> progressLogFactory, CommandStores.Factory factory)
{
this.id = id;
this.messageSink = messageSink;
Expand All @@ -145,7 +147,7 @@ public Node(Id id, MessageSink messageSink, ConfigurationService configService,
this.agent = agent;
this.random = random;
this.scheduler = scheduler;
this.commandStores = factory.create(this, agent, dataSupplier.get(), shardDistributor, progressLogFactory.apply(this));
this.commandStores = factory.create(this, agent, dataSupplier.get(), shardDistributor, progressLogFactory.apply(this, config));

configService.registerListener(this);
onTopologyUpdate(topology, false);
Expand Down
11 changes: 11 additions & 0 deletions accord-core/src/main/java/accord/utils/AccordConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package accord.utils;

public class AccordConfig {
public long progress_log_scheduler_delay_in_ms = 200L;

public AccordConfig(long progress_log_scheduler_delay_in_ms) {
this.progress_log_scheduler_delay_in_ms = progress_log_scheduler_delay_in_ms;
}

public AccordConfig() {}
}
3 changes: 2 additions & 1 deletion accord-core/src/test/java/accord/impl/basic/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import accord.messages.Request;
import accord.topology.TopologyRandomizer;
import accord.topology.Topology;
import accord.utils.AccordConfig;
import accord.utils.RandomSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -219,7 +220,7 @@ public static void run(Id[] nodes, Supplier<PendingQueue> queueSupplier, Consume
{
MessageSink messageSink = sinks.create(node, randomSupplier.get());
BurnTestConfigurationService configService = new BurnTestConfigurationService(node, messageSink, randomSupplier, topology, lookup::get, topologyUpdates);
lookup.put(node, new Node(node, messageSink, configService, nowSupplier.get(),
lookup.put(node, new Node(node, new AccordConfig(), messageSink, configService, nowSupplier.get(),
() -> new ListStore(node), new ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()),
new ListAgent(30L, onFailure),
randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
Expand Down
2 changes: 2 additions & 0 deletions accord-core/src/test/java/accord/impl/mock/MockCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import accord.local.Node.Id;
import accord.local.ShardDistributor;
import accord.primitives.Ranges;
import accord.utils.AccordConfig;
import accord.utils.DefaultRandom;
import accord.utils.EpochFunction;
import accord.utils.RandomSource;
Expand Down Expand Up @@ -105,6 +106,7 @@ private Node createNode(Id id, Topology topology)
MessageSink messageSink = messageSinkFactory.apply(id, this);
MockConfigurationService configurationService = new MockConfigurationService(messageSink, onFetchTopology, topology);
return new Node(id,
new AccordConfig(),
messageSink,
configurationService,
nowSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import accord.primitives.*;
import accord.topology.Topology;
import com.google.common.collect.Lists;

import accord.utils.AccordConfig;
import accord.utils.DefaultRandom;

import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -95,9 +97,9 @@ private static class NoOpProgressLog implements ProgressLog

private static Node createNode(Id id, CommandStoreSupport storeSupport)
{
return new Node(id, null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()),
return new Node(id, new AccordConfig(), null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()),
new MockCluster.Clock(100), () -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new DefaultRandom(), null,
SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
SizeOfIntersectionSorter.SUPPLIER, (ignore, ignore2) -> ignore3 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
}

@Test
Expand Down
2 changes: 2 additions & 0 deletions accord-core/src/test/java/accord/messages/PreAcceptTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import accord.impl.mock.MockCluster.Clock;
import accord.primitives.*;
import accord.topology.Topology;
import accord.utils.AccordConfig;
import accord.utils.DefaultRandom;
import accord.utils.EpochFunction;
import accord.utils.ThreadPoolScheduler;
Expand Down Expand Up @@ -69,6 +70,7 @@ private static Node createNode(Id nodeId, MessageSink messageSink, Clock clock)
MockStore store = new MockStore();
Scheduler scheduler = new ThreadPoolScheduler();
return new Node(nodeId,
new AccordConfig(),
messageSink,
new MockConfigurationService(messageSink, EpochFunction.noop(), TOPOLOGY),
clock,
Expand Down
3 changes: 2 additions & 1 deletion accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import accord.messages.Request;
import accord.api.Scheduler;
import accord.topology.Topology;
import accord.utils.AccordConfig;
import accord.utils.RandomSource;

// TODO (low priority, testing): merge with accord.impl.basic.Cluster
Expand Down Expand Up @@ -307,7 +308,7 @@ public static void run(Id[] nodes, QueueSupplier queueSupplier, Consumer<Packet>
for (Id node : nodes)
{
MessageSink messageSink = sinks.create(node, randomSupplier.get());
lookup.put(node, new Node(node, messageSink, new SimpleConfigService(topology),
lookup.put(node, new Node(node, new AccordConfig(), messageSink, new SimpleConfigService(topology),
nowSupplier.get(), MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
MaelstromAgent.INSTANCE,
randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
Expand Down
3 changes: 2 additions & 1 deletion accord-maelstrom/src/main/java/accord/maelstrom/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import accord.local.ShardDistributor;
import accord.messages.ReplyContext;
import accord.topology.Topology;
import accord.utils.AccordConfig;
import accord.utils.DefaultRandom;
import accord.utils.ThreadPoolScheduler;
import accord.maelstrom.Packet.Type;
Expand Down Expand Up @@ -160,7 +161,7 @@ public static void listen(TopologyFactory topologyFactory, Supplier<String> in,
MaelstromInit init = (MaelstromInit) packet.body;
topology = topologyFactory.toTopology(init.cluster);
sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err);
on = new Node(init.self, sink, new SimpleConfigService(topology), System::currentTimeMillis,
on = new Node(init.self, new AccordConfig(), sink, new SimpleConfigService(topology), System::currentTimeMillis,
MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
MaelstromAgent.INSTANCE, new DefaultRandom(), scheduler, SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new);
Expand Down