diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java index f768115ef0..d5135e2dbf 100644 --- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java +++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java @@ -29,6 +29,7 @@ import javax.annotation.Nullable; +import accord.utils.AccordConfig; import accord.utils.IntrusiveLinkedList; import accord.utils.IntrusiveLinkedListNode; import accord.coordinate.*; @@ -88,11 +89,13 @@ boolean isAtLeastCommitted() enum DisseminateStatus { NotExecuted, Durable, Done } final Node node; + final AccordConfig config; final List instances = new CopyOnWriteArrayList<>(); - public SimpleProgressLog(Node node) + public SimpleProgressLog(Node node, AccordConfig config) { this.node = node; + this.config = config; } class Instance extends IntrusiveLinkedList implements ProgressLog, Runnable @@ -822,7 +825,7 @@ void ensureScheduled() return; isScheduled = true; - node.scheduler().once(() -> commandStore.execute(PreLoadContext.empty(), ignore -> run()).begin(commandStore.agent()), 200L, TimeUnit.MILLISECONDS); + node.scheduler().once(() -> commandStore.execute(PreLoadContext.empty(), ignore -> run()).begin(commandStore.agent()), config.progress_log_scheduler_delay_in_ms, TimeUnit.MILLISECONDS); } @Override diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index fbf5987457..acdb8aad9a 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -34,6 +35,7 @@ import accord.messages.*; import accord.primitives.*; import accord.primitives.Routable.Domain; +import accord.utils.AccordConfig; import accord.utils.MapReduceConsume; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncResult; @@ -131,9 +133,9 @@ public boolean isCoordinating(TxnId txnId, Ballot promised) // TODO (expected, liveness): monitor the contents of this collection for stalled coordination, and excise them private final Map> coordinating = new ConcurrentHashMap<>(); - public Node(Id id, MessageSink messageSink, ConfigurationService configService, LongSupplier nowSupplier, + public Node(Id id, AccordConfig config, MessageSink messageSink, ConfigurationService configService, LongSupplier nowSupplier, Supplier dataSupplier, ShardDistributor shardDistributor, Agent agent, RandomSource random, Scheduler scheduler, TopologySorter.Supplier topologySorter, - Function progressLogFactory, CommandStores.Factory factory) + BiFunction progressLogFactory, CommandStores.Factory factory) { this.id = id; this.messageSink = messageSink; @@ -145,7 +147,7 @@ public Node(Id id, MessageSink messageSink, ConfigurationService configService, this.agent = agent; this.random = random; this.scheduler = scheduler; - this.commandStores = factory.create(this, agent, dataSupplier.get(), shardDistributor, progressLogFactory.apply(this)); + this.commandStores = factory.create(this, agent, dataSupplier.get(), shardDistributor, progressLogFactory.apply(this, config)); configService.registerListener(this); onTopologyUpdate(topology, false); diff --git a/accord-core/src/main/java/accord/utils/AccordConfig.java b/accord-core/src/main/java/accord/utils/AccordConfig.java new file mode 100644 index 0000000000..deff689e15 --- /dev/null +++ b/accord-core/src/main/java/accord/utils/AccordConfig.java @@ -0,0 +1,11 @@ +package accord.utils; + +public class AccordConfig { + public long progress_log_scheduler_delay_in_ms = 200L; + + public AccordConfig(long progress_log_scheduler_delay_in_ms) { + this.progress_log_scheduler_delay_in_ms = progress_log_scheduler_delay_in_ms; + } + + public AccordConfig() {} +} diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index 2898303396..7c2641e196 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -49,6 +49,7 @@ import accord.messages.Request; import accord.topology.TopologyRandomizer; import accord.topology.Topology; +import accord.utils.AccordConfig; import accord.utils.RandomSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -219,7 +220,7 @@ public static void run(Id[] nodes, Supplier queueSupplier, Consume { MessageSink messageSink = sinks.create(node, randomSupplier.get()); BurnTestConfigurationService configService = new BurnTestConfigurationService(node, messageSink, randomSupplier, topology, lookup::get, topologyUpdates); - lookup.put(node, new Node(node, messageSink, configService, nowSupplier.get(), + lookup.put(node, new Node(node, new AccordConfig(), messageSink, configService, nowSupplier.get(), () -> new ListStore(node), new ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()), new ListAgent(30L, onFailure), randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER, diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java index 2a8886427b..c3d4885e9d 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java +++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java @@ -26,6 +26,7 @@ import accord.local.Node.Id; import accord.local.ShardDistributor; import accord.primitives.Ranges; +import accord.utils.AccordConfig; import accord.utils.DefaultRandom; import accord.utils.EpochFunction; import accord.utils.RandomSource; @@ -105,6 +106,7 @@ private Node createNode(Id id, Topology topology) MessageSink messageSink = messageSinkFactory.apply(id, this); MockConfigurationService configurationService = new MockConfigurationService(messageSink, onFetchTopology, topology); return new Node(id, + new AccordConfig(), messageSink, configurationService, nowSupplier, diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java index be0df4d384..1af9741373 100644 --- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java +++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java @@ -30,6 +30,8 @@ import accord.primitives.*; import accord.topology.Topology; import com.google.common.collect.Lists; + +import accord.utils.AccordConfig; import accord.utils.DefaultRandom; import org.junit.jupiter.api.Assertions; @@ -95,9 +97,9 @@ private static class NoOpProgressLog implements ProgressLog private static Node createNode(Id id, CommandStoreSupport storeSupport) { - return new Node(id, null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()), + return new Node(id, new AccordConfig(), null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()), new MockCluster.Clock(100), () -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new DefaultRandom(), null, - SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new); + SizeOfIntersectionSorter.SUPPLIER, (ignore, ignore2) -> ignore3 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new); } @Test diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java b/accord-core/src/test/java/accord/messages/PreAcceptTest.java index ae339a1ec1..8401b918c7 100644 --- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java +++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java @@ -31,6 +31,7 @@ import accord.impl.mock.MockCluster.Clock; import accord.primitives.*; import accord.topology.Topology; +import accord.utils.AccordConfig; import accord.utils.DefaultRandom; import accord.utils.EpochFunction; import accord.utils.ThreadPoolScheduler; @@ -69,6 +70,7 @@ private static Node createNode(Id nodeId, MessageSink messageSink, Clock clock) MockStore store = new MockStore(); Scheduler scheduler = new ThreadPoolScheduler(); return new Node(nodeId, + new AccordConfig(), messageSink, new MockConfigurationService(messageSink, EpochFunction.noop(), TOPOLOGY), clock, diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java index 74de7eb176..4afc00edaf 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java @@ -53,6 +53,7 @@ import accord.messages.Request; import accord.api.Scheduler; import accord.topology.Topology; +import accord.utils.AccordConfig; import accord.utils.RandomSource; // TODO (low priority, testing): merge with accord.impl.basic.Cluster @@ -307,7 +308,7 @@ public static void run(Id[] nodes, QueueSupplier queueSupplier, Consumer for (Id node : nodes) { MessageSink messageSink = sinks.create(node, randomSupplier.get()); - lookup.put(node, new Node(node, messageSink, new SimpleConfigService(topology), + lookup.put(node, new Node(node, new AccordConfig(), messageSink, new SimpleConfigService(topology), nowSupplier.get(), MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()), MaelstromAgent.INSTANCE, randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER, diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java index 955645c83c..51b4347e6f 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java @@ -40,6 +40,7 @@ import accord.local.ShardDistributor; import accord.messages.ReplyContext; import accord.topology.Topology; +import accord.utils.AccordConfig; import accord.utils.DefaultRandom; import accord.utils.ThreadPoolScheduler; import accord.maelstrom.Packet.Type; @@ -160,7 +161,7 @@ public static void listen(TopologyFactory topologyFactory, Supplier in, MaelstromInit init = (MaelstromInit) packet.body; topology = topologyFactory.toTopology(init.cluster); sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err); - on = new Node(init.self, sink, new SimpleConfigService(topology), System::currentTimeMillis, + on = new Node(init.self, new AccordConfig(), sink, new SimpleConfigService(topology), System::currentTimeMillis, MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()), MaelstromAgent.INSTANCE, new DefaultRandom(), scheduler, SizeOfIntersectionSorter.SUPPLIER, SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new);