CEP-15: Add Accord configuration stub
Patch by Jacek Lewandowski; reviewed by David Capwell for CASSANDRA-18221
diff --git a/accord-core/src/main/java/accord/config/LocalConfig.java b/accord-core/src/main/java/accord/config/LocalConfig.java
new file mode 100644
index 0000000..0ced3b9
--- /dev/null
+++ b/accord-core/src/main/java/accord/config/LocalConfig.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.config;
+
+import java.time.Duration;
+
+public interface LocalConfig
+{
+ default Duration getProgressLogScheduleDelay()
+ {
+ return Duration.ofSeconds(1);
+ }
+}
diff --git a/accord-core/src/main/java/accord/config/MutableLocalConfig.java b/accord-core/src/main/java/accord/config/MutableLocalConfig.java
new file mode 100644
index 0000000..f3c4278
--- /dev/null
+++ b/accord-core/src/main/java/accord/config/MutableLocalConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.config;
+
+import java.time.Duration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@VisibleForTesting
+public class MutableLocalConfig implements LocalConfig
+{
+ private volatile Duration progressLogScheduleDelay = LocalConfig.super.getProgressLogScheduleDelay();
+
+ public void setProgressLogScheduleDelay(Duration progressLogScheduleDelay)
+ {
+ this.progressLogScheduleDelay = progressLogScheduleDelay;
+ }
+
+ @Override
+ public Duration getProgressLogScheduleDelay()
+ {
+ return progressLogScheduleDelay;
+ }
+}
diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
index 89291a1..420d54d 100644
--- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
@@ -18,6 +18,7 @@
package accord.impl;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -679,7 +680,8 @@
return;
isScheduled = true;
- node.scheduler().once(() -> commandStore.execute(empty(), ignore -> run()).begin(commandStore.agent()), 1L, TimeUnit.SECONDS);
+ Duration delay = node.localConfig().getProgressLogScheduleDelay();
+ node.scheduler().once(() -> commandStore.execute(empty(), ignore -> run()).begin(commandStore.agent()), delay.toNanos(), TimeUnit.NANOSECONDS);
}
@Override
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 8e4237f..f0ee2bd 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -49,6 +49,7 @@
import accord.api.RoutingKey;
import accord.api.Scheduler;
import accord.api.TopologySorter;
+import accord.config.LocalConfig;
import accord.coordinate.CoordinateTransaction;
import accord.coordinate.MaybeRecover;
import accord.coordinate.Outcome;
@@ -145,6 +146,7 @@
private final AtomicReference<Timestamp> now;
private final Agent agent;
private final RandomSource random;
+ private final LocalConfig localConfig;
// TODO (expected, consider): this really needs to be thought through some more, as it needs to be per-instance in some cases, and per-node in others
private final Scheduler scheduler;
@@ -155,9 +157,10 @@
public Node(Id id, MessageSink messageSink, LocalMessage.Handler localMessageHandler,
ConfigurationService configService, LongSupplier nowSupplier, ToLongFunction<TimeUnit> nowTimeUnit,
Supplier<DataStore> dataSupplier, ShardDistributor shardDistributor, Agent agent, RandomSource random, Scheduler scheduler, TopologySorter.Supplier topologySorter,
- Function<Node, ProgressLog.Factory> progressLogFactory, CommandStores.Factory factory)
+ Function<Node, ProgressLog.Factory> progressLogFactory, CommandStores.Factory factory, LocalConfig localConfig)
{
this.id = id;
+ this.localConfig = localConfig;
this.messageSink = messageSink;
this.localMessageHandler = localMessageHandler;
this.configService = configService;
@@ -173,6 +176,11 @@
configService.registerListener(this);
}
+ public LocalConfig localConfig()
+ {
+ return localConfig;
+ }
+
/**
* This starts the node for tests and makes sure that the provided topology is acknowledged correctly. This method is not
* safe for production systems as it doesn't handle restarts and partially acknowledged histories
diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java
index 77a8bd8..6d42388 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -27,6 +27,7 @@
import accord.api.MessageSink;
import accord.api.Scheduler;
+import accord.config.LocalConfig;
import accord.impl.InMemoryCommandStores;
import accord.impl.IntKey;
import accord.impl.SimpleProgressLog;
@@ -38,6 +39,7 @@
import accord.local.Node;
import accord.local.NodeTimeService;
import accord.local.ShardDistributor;
+import accord.config.MutableLocalConfig;
import accord.messages.LocalMessage;
import accord.primitives.Keys;
import accord.primitives.Range;
@@ -140,6 +142,7 @@
{
MockStore store = new MockStore();
Scheduler scheduler = new ThreadPoolScheduler();
+ LocalConfig localConfig = new MutableLocalConfig();
Node node = new Node(nodeId,
messageSink,
LocalMessage::process,
@@ -153,7 +156,8 @@
scheduler,
SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new,
- InMemoryCommandStores.Synchronized::new);
+ InMemoryCommandStores.Synchronized::new,
+ localConfig);
awaitUninterruptibly(node.unsafeStart());
return node;
}
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index a7eff1c..efeadfe 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -35,6 +35,7 @@
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import accord.config.LocalConfig;
import accord.impl.MessageListener;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
@@ -55,6 +56,7 @@
import accord.local.Node.Id;
import accord.local.NodeTimeService;
import accord.local.ShardDistributor;
+import accord.config.MutableLocalConfig;
import accord.messages.LocalMessage;
import accord.messages.MessageType;
import accord.messages.Message;
@@ -229,11 +231,12 @@
MessageSink messageSink = sinks.create(id, randomSupplier.get());
LongSupplier nowSupplier = nowSupplierSupplier.get();
BurnTestConfigurationService configService = new BurnTestConfigurationService(id, executor, randomSupplier, topology, lookup::get, topologyUpdates);
+ LocalConfig localConfig = new MutableLocalConfig();
Node node = new Node(id, messageSink, LocalMessage::process, configService, nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier),
() -> new ListStore(id), new ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()),
executor.agent(),
randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
- SimpleProgressLog::new, DelayedCommandStores.factory(sinks.pending));
+ SimpleProgressLog::new, DelayedCommandStores.factory(sinks.pending), localConfig);
lookup.put(id, node);
CoordinateDurabilityScheduling durability = new CoordinateDurabilityScheduling(node);
// TODO (desired): randomise
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index fdd8a91..4b3d198 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -34,6 +34,7 @@
import accord.NetworkFilter;
import accord.api.MessageSink;
+import accord.config.LocalConfig;
import accord.impl.InMemoryCommandStores;
import accord.impl.IntKey;
import accord.impl.SimpleProgressLog;
@@ -45,6 +46,7 @@
import accord.local.Node.Id;
import accord.local.NodeTimeService;
import accord.local.ShardDistributor;
+import accord.config.MutableLocalConfig;
import accord.messages.Callback;
import accord.messages.LocalMessage;
import accord.messages.Reply;
@@ -119,6 +121,7 @@
MockStore store = new MockStore();
MessageSink messageSink = messageSinkFactory.apply(id, this);
MockConfigurationService configurationService = new MockConfigurationService(messageSink, onFetchTopology, topology);
+ LocalConfig localConfig = new MutableLocalConfig();
Node node = new Node(id,
messageSink,
LocalMessage::process,
@@ -132,7 +135,8 @@
new ThreadPoolScheduler(),
SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new,
- InMemoryCommandStores.SingleThread::new);
+ InMemoryCommandStores.SingleThread::new,
+ localConfig);
awaitUninterruptibly(node.unsafeStart());
node.onTopologyUpdate(topology, true);
return node;
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index 15061ea..ef573d1 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -32,6 +32,7 @@
import accord.api.RoutingKey;
import accord.api.Scheduler;
import accord.api.TestableConfigurationService;
+import accord.config.LocalConfig;
import accord.impl.InMemoryCommandStore;
import accord.impl.InMemoryCommandStores;
import accord.impl.IntKey;
@@ -43,6 +44,7 @@
import accord.impl.mock.MockStore;
import accord.local.Node.Id;
import accord.local.SaveStatus.LocalExecution;
+import accord.config.MutableLocalConfig;
import accord.primitives.FullKeyRoute;
import accord.primitives.Keys;
import accord.primitives.Participants;
@@ -109,10 +111,11 @@
private static Node createNode(Id id, CommandStoreSupport storeSupport)
{
MockCluster.Clock clock = new MockCluster.Clock(100);
+ LocalConfig localConfig = new MutableLocalConfig();
Node node = new Node(id, null, null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()),
clock, NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock),
() -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED,
- SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
+ SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new, localConfig);
awaitUninterruptibly(node.unsafeStart());
node.onTopologyUpdate(storeSupport.local.get(), true);
return node;
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 1af7d4c..0184765 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -41,6 +41,8 @@
import accord.api.MessageSink;
import accord.api.Scheduler;
+import accord.config.LocalConfig;
+import accord.config.MutableLocalConfig;
import accord.impl.InMemoryCommandStores;
import accord.impl.SimpleProgressLog;
import accord.impl.SizeOfIntersectionSorter;
@@ -314,12 +316,13 @@
{
MessageSink messageSink = sinks.create(node, randomSupplier.get());
LongSupplier nowSupplier = nowSupplierSupplier.get();
+ LocalConfig localConfig = new MutableLocalConfig();
lookup.put(node, new Node(node, messageSink, LocalMessage::process, new SimpleConfigService(topology),
nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, nowSupplier),
MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
MaelstromAgent.INSTANCE,
randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
- SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new));
+ SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new, localConfig));
}
AsyncResult<?> startup = AsyncChains.reduce(lookup.values().stream().map(Node::unsafeStart).collect(toList()), (a, b) -> null).beginAsResult();
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 871123a..40e73ff 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -32,6 +32,8 @@
import accord.api.MessageSink;
import accord.api.Scheduler;
+import accord.config.LocalConfig;
+import accord.config.MutableLocalConfig;
import accord.coordinate.Timeout;
import accord.impl.InMemoryCommandStores;
import accord.impl.SimpleProgressLog;
@@ -173,11 +175,12 @@
MaelstromInit init = (MaelstromInit) packet.body;
topology = topologyFactory.toTopology(init.cluster);
sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err);
+ LocalConfig localConfig = new MutableLocalConfig();
on = new Node(init.self, sink, LocalMessage::process, new SimpleConfigService(topology),
System::currentTimeMillis, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, System::currentTimeMillis),
MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
MaelstromAgent.INSTANCE, new DefaultRandom(), scheduler, SizeOfIntersectionSorter.SUPPLIER,
- SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new);
+ SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new, localConfig);
awaitUninterruptibly(on.unsafeStart());
err.println("Initialized node " + init.self);
err.flush();