Accord TCM integration
Patch by Blake Eggleston; Reviewed by David Capwell for CASSANDRA-18444
diff --git a/accord-core/src/main/java/accord/api/ConfigurationService.java b/accord-core/src/main/java/accord/api/ConfigurationService.java
index 7b868ac..f7ecfec 100644
--- a/accord-core/src/main/java/accord/api/ConfigurationService.java
+++ b/accord-core/src/main/java/accord/api/ConfigurationService.java
@@ -118,6 +118,12 @@
* This should be invoked on each replica once EpochReady.coordination has returned on a replica.
*/
void onEpochSyncComplete(Node.Id node, long epoch);
+
+ /**
+ * Called when the configuration service is meant to truncate it's topology data up to (but not including)
+ * the given epoch
+ */
+ void truncateTopologyUntil(long epoch);
}
void registerListener(Listener listener);
diff --git a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
new file mode 100644
index 0000000..6d2aaf3
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+public abstract class AbstractConfigurationService implements ConfigurationService
+{
+ private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class);
+
+ protected final Node.Id node;
+
+ protected final EpochHistory epochs = new EpochHistory();
+
+ protected final List<Listener> listeners = new ArrayList<>();
+
+ static class EpochState
+ {
+ private final long epoch;
+ private final AsyncResult.Settable<Topology> received = AsyncResults.settable();
+ private final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable();
+
+ private Topology topology = null;
+
+ public EpochState(long epoch)
+ {
+ this.epoch = epoch;
+ }
+
+ public long epoch()
+ {
+ return epoch;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "EpochState{" + epoch + '}';
+ }
+ }
+
+ @VisibleForTesting
+ protected static class EpochHistory
+ {
+ // TODO (low priority): move pendingEpochs / FetchTopology into here?
+ private List<EpochState> epochs = new ArrayList<>();
+
+ protected long lastReceived = 0;
+ private long lastAcknowledged = 0;
+
+ long minEpoch()
+ {
+ return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
+ }
+
+ long maxEpoch()
+ {
+ int size = epochs.size();
+ return size == 0 ? 0L : epochs.get(size - 1).epoch;
+ }
+
+ @VisibleForTesting
+ EpochState atIndex(int idx)
+ {
+ return epochs.get(idx);
+ }
+
+ @VisibleForTesting
+ int size()
+ {
+ return epochs.size();
+ }
+
+ EpochState getOrCreate(long epoch)
+ {
+ Invariants.checkArgument(epoch > 0);
+ if (epochs.isEmpty())
+ {
+ EpochState state = new EpochState(epoch);
+ epochs.add(state);
+ return state;
+ }
+
+ long minEpoch = minEpoch();
+ if (epoch < minEpoch)
+ {
+ int prepend = Ints.checkedCast(minEpoch - epoch);
+ List<EpochState> next = new ArrayList<>(epochs.size() + prepend);
+ for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++)
+ next.add(new EpochState(addEpoch));
+ next.addAll(epochs);
+ epochs = next;
+ minEpoch = minEpoch();
+ Invariants.checkState(minEpoch == epoch);
+ }
+ long maxEpoch = maxEpoch();
+ int idx = Ints.checkedCast(epoch - minEpoch);
+
+ // add any missing epochs
+ for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++)
+ epochs.add(new EpochState(addEpoch));
+
+ return epochs.get(idx);
+ }
+
+ public EpochHistory receive(Topology topology)
+ {
+ long epoch = topology.epoch();
+ Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0);
+ lastReceived = epoch;
+ EpochState state = getOrCreate(epoch);
+ if (state != null)
+ {
+ state.topology = topology;
+ state.received.setSuccess(topology);
+ }
+ return this;
+ }
+
+ AsyncResult<Topology> receiveFuture(long epoch)
+ {
+ return getOrCreate(epoch).received;
+ }
+
+ Topology topologyFor(long epoch)
+ {
+ return getOrCreate(epoch).topology;
+ }
+
+ public EpochHistory acknowledge(long epoch)
+ {
+ Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0);
+ lastAcknowledged = epoch;
+ getOrCreate(epoch).acknowledged.setSuccess(null);
+ return this;
+ }
+
+ AsyncResult<Void> acknowledgeFuture(long epoch)
+ {
+ return getOrCreate(epoch).acknowledged;
+ }
+
+ void truncateUntil(long epoch)
+ {
+ Invariants.checkArgument(epoch <= maxEpoch());
+ long minEpoch = minEpoch();
+ int toTrim = Ints.checkedCast(epoch - minEpoch);
+ if (toTrim <=0)
+ return;
+
+ epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size()));
+ }
+ }
+
+ public AbstractConfigurationService(Node.Id node)
+ {
+ this.node = node;
+ }
+
+ @Override
+ public synchronized void registerListener(Listener listener)
+ {
+ listeners.add(listener);
+ }
+
+ @Override
+ public synchronized Topology currentTopology()
+ {
+ return epochs.topologyFor(epochs.lastReceived);
+ }
+
+ @Override
+ public synchronized Topology getTopologyForEpoch(long epoch)
+ {
+ return epochs.topologyFor(epoch);
+ }
+
+ protected abstract void fetchTopologyInternal(long epoch);
+
+ @Override
+ public synchronized void fetchTopologyForEpoch(long epoch)
+ {
+ if (epoch <= epochs.lastReceived)
+ return;
+
+ fetchTopologyInternal(epoch);
+ }
+
+ protected abstract void epochSyncComplete(Topology topology );
+
+ @Override
+ public synchronized void acknowledgeEpoch(EpochReady ready)
+ {
+ ready.metadata.addCallback(() -> epochs.acknowledge(ready.epoch));
+ ready.coordination.addCallback(() -> epochSyncComplete(epochs.getOrCreate(ready.epoch).topology));
+ }
+
+ protected void topologyUpdatePreListenerNotify(Topology topology) {}
+ protected void topologyUpdatePostListenerNotify(Topology topology) {}
+
+ public synchronized AsyncResult<Void> reportTopology(Topology topology)
+ {
+ long lastReceived = epochs.lastReceived;
+ if (topology.epoch() <= lastReceived)
+ return AsyncResults.success(null);
+
+ if (lastReceived > 0 && topology.epoch() > lastReceived + 1)
+ {
+ fetchTopologyForEpoch(lastReceived + 1);
+ epochs.receiveFuture(lastReceived + 1).addCallback(() -> reportTopology(topology));
+ return AsyncResults.success(null);
+ }
+
+ long lastAcked = epochs.lastAcknowledged;
+ if (lastAcked > 0 && topology.epoch() > lastAcked + 1)
+ {
+ epochs.acknowledgeFuture(lastAcked + 1).addCallback(() -> reportTopology(topology));
+ return AsyncResults.success(null);
+ }
+ logger.trace("Epoch {} received by {}", topology.epoch(), node);
+
+ epochs.receive(topology);
+ topologyUpdatePreListenerNotify(topology);
+ for (Listener listener : listeners)
+ listener.onTopologyUpdate(topology);
+ topologyUpdatePostListenerNotify(topology);
+ return AsyncResults.success(null);
+ }
+
+ protected void epochSyncCompletePreListenerNotify(Node.Id node, long epoch) {}
+
+ public synchronized void epochSyncComplete(Node.Id node, long epoch)
+ {
+ epochSyncCompletePreListenerNotify(node, epoch);
+ for (Listener listener : listeners)
+ listener.onEpochSyncComplete(node, epoch);
+ }
+
+ protected void truncateTopologiesPreListenerNotify(long epoch) {}
+ protected void truncateTopologiesPostListenerNotify(long epoch) {}
+
+ public synchronized void truncateTopologiesUntil(long epoch)
+ {
+ truncateTopologiesPreListenerNotify(epoch);
+ for (Listener listener : listeners)
+ listener.truncateTopologyUntil(epoch);
+ truncateTopologiesPostListenerNotify(epoch);
+ epochs.truncateUntil(epoch);
+ }
+}
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 6e97c04..01e83eb 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -141,7 +141,6 @@
this.configService = configService;
this.topology = new TopologyManager(topologySorter, id);
this.nowSupplier = nowSupplier;
- Topology topology = configService.currentTopology();
this.now = new AtomicReference<>(Timestamp.fromValues(topology.epoch(), nowSupplier.getAsLong(), id));
this.agent = agent;
this.random = random;
@@ -199,6 +198,12 @@
topology.onEpochSyncComplete(node, epoch);
}
+ @Override
+ public void truncateTopologyUntil(long epoch)
+ {
+ topology.truncateTopologyUntil(epoch);
+ }
+
public void withEpoch(long epoch, Runnable runnable)
{
if (topology.hasEpoch(epoch))
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index 728553c..56b251e 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -103,7 +103,7 @@
@Override
public String toString()
{
- return "Topology{" + "epoch=" + epoch + ", " + super.toString() + '}';
+ return "Topology{" + "epoch=" + epoch + ", " + Arrays.toString(shards) + '}';
}
@Override
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java
index aea8c53..9c083a5 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -68,6 +68,7 @@
this.global = checkArgument(global, !global.isSubset());
this.local = global.forNode(node).trim();
Invariants.checkArgument(!global().isSubset());
+ // TODO: can we just track sync for local ranges here?
this.syncTracker = new QuorumTracker(new Single(sorter, global()));
this.syncComplete = syncComplete;
this.prevSynced = prevSynced;
@@ -129,6 +130,7 @@
private static class Epochs
{
+ private static final Epochs EMPTY = new Epochs(new EpochState[0]);
private final long currentEpoch;
private final EpochState[] epochs;
// nodes we've received sync complete notifications from, for epochs we do not yet have topologies for.
@@ -174,6 +176,16 @@
return current().epoch + 1;
}
+ public long minEpoch()
+ {
+ return currentEpoch - epochs.length + 1;
+ }
+
+ public long epoch()
+ {
+ return currentEpoch;
+ }
+
public Topology current()
{
return epochs.length > 0 ? epochs[0].global() : Topology.EMPTY;
@@ -197,6 +209,8 @@
else
{
EpochState state = get(epoch);
+ if (state == null)
+ return;
state.recordSyncComplete(node);
for (epoch++ ; state.syncComplete() && epoch <= currentEpoch; epoch++)
{
@@ -223,14 +237,15 @@
{
this.sorter = sorter;
this.node = node;
- this.epochs = new Epochs(new EpochState[0]);
+ this.epochs = Epochs.EMPTY;
}
public synchronized void onTopologyUpdate(Topology topology)
{
Epochs current = epochs;
- checkArgument(topology.epoch == current.nextEpoch(), "Expected topology update %d to be %d", topology.epoch, current.nextEpoch());
+ checkArgument(topology.epoch == current.nextEpoch() || epochs == Epochs.EMPTY,
+ "Expected topology update %d to be %d", topology.epoch, current.nextEpoch());
EpochState[] nextEpochs = new EpochState[current.epochs.length + 1];
List<Set<Id>> pendingSync = new ArrayList<>(current.pendingSyncComplete);
Set<Id> alreadySyncd = Collections.emptySet();
@@ -274,6 +289,22 @@
epochs.syncComplete(node, epoch);
}
+ public synchronized void truncateTopologyUntil(long epoch)
+ {
+ Epochs current = epochs;
+ checkArgument(current.epoch() >= epoch);
+
+ if (current.minEpoch() >= epoch)
+ return;
+
+ int newLen = current.epochs.length - (int) (epoch - current.minEpoch());
+ Invariants.checkState(current.epochs[newLen - 1].syncComplete());
+
+ EpochState[] nextEpochs = new EpochState[newLen];
+ System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen);
+ epochs = new Epochs(nextEpochs, current.pendingSyncComplete, current.futureEpochFutures);
+ }
+
public TopologySorter.Supplier sorter()
{
return sorter;
diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java
index b5f3863..3b55416 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -138,17 +138,17 @@
MockStore store = new MockStore();
Scheduler scheduler = new ThreadPoolScheduler();
Node node = new Node(nodeId,
- messageSink,
- new MockConfigurationService(messageSink, EpochFunction.noop(), topology),
- clock,
- () -> store,
- new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()),
- new TestAgent(),
- new DefaultRandom(),
- scheduler,
- SizeOfIntersectionSorter.SUPPLIER,
- SimpleProgressLog::new,
- InMemoryCommandStores.Synchronized::new);
+ messageSink,
+ new MockConfigurationService(messageSink, EpochFunction.noop(), topology),
+ clock,
+ () -> store,
+ new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()),
+ new TestAgent(),
+ new DefaultRandom(),
+ scheduler,
+ SizeOfIntersectionSorter.SUPPLIER,
+ SimpleProgressLog::new,
+ InMemoryCommandStores.Synchronized::new);
awaitUninterruptibly(node.start());
return node;
}
diff --git a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
index 0c5aa2a..855366f 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
@@ -20,17 +20,13 @@
import accord.api.TestableConfigurationService;
import accord.local.AgentExecutor;
+import accord.impl.AbstractConfigurationService;
import accord.utils.RandomSource;
import accord.local.Node;
import accord.messages.*;
import accord.topology.Topology;
-import accord.utils.Invariants;
-import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -38,124 +34,22 @@
import java.util.function.Function;
import java.util.function.Supplier;
-public class BurnTestConfigurationService implements TestableConfigurationService
+public class BurnTestConfigurationService extends AbstractConfigurationService implements TestableConfigurationService
{
- private static final Logger logger = LoggerFactory.getLogger(BurnTestConfigurationService.class);
-
- private final Node.Id node;
private final AgentExecutor executor;
private final Function<Node.Id, Node> lookup;
private final Supplier<RandomSource> randomSupplier;
- private final Map<Long, FetchTopology> pendingEpochs = new HashMap<>();
-
- private final EpochHistory epochs = new EpochHistory();
- private final List<Listener> listeners = new ArrayList<>();
private final TopologyUpdates topologyUpdates;
-
- private static class EpochState
- {
- private final long epoch;
- private final AsyncResult.Settable<Topology> received = AsyncResults.settable();
- private final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable();
- private final AsyncResult.Settable<Void> synced = AsyncResults.settable();
-
- private Topology topology = null;
-
- public EpochState(long epoch)
- {
- this.epoch = epoch;
- }
- }
-
- private static class EpochHistory
- {
- // TODO (low priority): move pendingEpochs / FetchTopology into here?
- private final List<EpochState> epochs = new ArrayList<>();
-
- private long lastReceived = 0;
- private long lastAcknowledged = 0;
- private long lastSyncd = 0;
-
- private EpochState get(long epoch)
- {
- for (long addEpoch = epochs.size() - 1; addEpoch <= epoch; addEpoch++)
- epochs.add(new EpochState(addEpoch));
- return epochs.get((int) epoch);
- }
-
- EpochHistory receive(Topology topology)
- {
- long epoch = topology.epoch();
- Invariants.checkState(epoch == 0 || lastReceived == epoch - 1);
- lastReceived = epoch;
- EpochState state = get(epoch);
- state.topology = topology;
- state.received.setSuccess(topology);
- return this;
- }
-
- AsyncResult<Topology> receiveFuture(long epoch)
- {
- return get(epoch).received;
- }
-
- Topology topologyFor(long epoch)
- {
- return get(epoch).topology;
- }
-
- EpochHistory acknowledge(long epoch)
- {
- Invariants.checkState(epoch == 0 || lastAcknowledged == epoch - 1);
- lastAcknowledged = epoch;
- get(epoch).acknowledged.setSuccess(null);
- return this;
- }
-
- AsyncResult<Void> acknowledgeFuture(long epoch)
- {
- return get(epoch).acknowledged;
- }
-
- EpochHistory syncComplete(long epoch)
- {
- Invariants.checkState(epoch == 0 || lastSyncd == epoch - 1);
- EpochState state = get(epoch);
- Invariants.checkState(state.received.isDone());
- Invariants.checkState(state.acknowledged.isDone());
- lastSyncd = epoch;
- get(epoch).synced.setSuccess(null);
- return this;
- }
- }
+ private final Map<Long, FetchTopology> pendingEpochs = new HashMap<>();
public BurnTestConfigurationService(Node.Id node, AgentExecutor executor, Supplier<RandomSource> randomSupplier, Topology topology, Function<Node.Id, Node> lookup, TopologyUpdates topologyUpdates)
{
- this.node = node;
+ super(node);
this.executor = executor;
this.randomSupplier = randomSupplier;
this.lookup = lookup;
this.topologyUpdates = topologyUpdates;
- epochs.receive(Topology.EMPTY).acknowledge(0).syncComplete(0);
- epochs.receive(topology).acknowledge(1).syncComplete(1);
- }
-
- @Override
- public synchronized void registerListener(Listener listener)
- {
- listeners.add(listener);
- }
-
- @Override
- public synchronized Topology currentTopology()
- {
- return epochs.topologyFor(epochs.lastReceived);
- }
-
- @Override
- public synchronized Topology getTopologyForEpoch(long epoch)
- {
- return epochs.topologyFor(epoch);
+ reportTopology(topology);
}
private static class FetchTopologyRequest implements Request
@@ -257,58 +151,29 @@
}
@Override
- public synchronized void fetchTopologyForEpoch(long epoch)
+ protected void fetchTopologyInternal(long epoch)
{
- if (epoch <= epochs.lastReceived)
- return;
-
- for (long e = epochs.lastReceived + 1; e < epoch ; ++e)
- pendingEpochs.computeIfAbsent(epoch, FetchTopology::new);
+ pendingEpochs.computeIfAbsent(epoch, FetchTopology::new);
}
@Override
- public synchronized void acknowledgeEpoch(EpochReady ready)
+ protected void epochSyncComplete(Topology topology)
{
- ready.metadata.addCallback(() -> epochs.acknowledge(ready.epoch));
- ready.coordination.addCallback(() -> topologyUpdates.syncComplete(lookup.apply(node), epochs.get(ready.epoch).topology.nodes(), ready.epoch));
+ topologyUpdates.syncComplete(lookup.apply(node), topology.nodes(), topology.epoch());
+ }
+
+ @Override
+ protected void topologyUpdatePostListenerNotify(Topology topology)
+ {
+ FetchTopology fetch = pendingEpochs.remove(topology.epoch());
+ if (fetch == null)
+ return;
+
+ fetch.setSuccess(null);
}
private Node originator()
{
return lookup.apply(node);
}
-
- @Override
- public synchronized AsyncResult<Void> reportTopology(Topology topology)
- {
- long lastReceived = epochs.lastReceived;
- if (topology.epoch() <= lastReceived)
- return AsyncResults.success(null);
-
- if (topology.epoch() > lastReceived + 1)
- {
- fetchTopologyForEpoch(lastReceived + 1);
- epochs.receiveFuture(lastReceived + 1).addCallback(() -> reportTopology(topology));
- return AsyncResults.success(null);
- }
-
- long lastAcked = epochs.lastAcknowledged;
- if (topology.epoch() > lastAcked + 1)
- {
- epochs.acknowledgeFuture(lastAcked + 1).addCallback(() -> reportTopology(topology));
- return AsyncResults.success(null);
- }
- logger.trace("Epoch {} received by {}", topology.epoch(), node);
-
- epochs.receive(topology);
- for (Listener listener : listeners)
- listener.onTopologyUpdate(topology);
-
- FetchTopology fetch = pendingEpochs.remove(topology.epoch());
- if (fetch == null)
- return AsyncResults.success(null);
-
- fetch.setSuccess(null);
- return AsyncResults.success(null);
- }
}
diff --git a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
new file mode 100644
index 0000000..d0e6ee0
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import accord.api.ConfigurationService.EpochReady;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import accord.api.ConfigurationService;
+import accord.impl.AbstractConfigurationService.EpochHistory;
+import accord.local.Node.Id;
+import accord.primitives.Range;
+import accord.topology.Shard;
+import accord.topology.Topology;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class AbstractConfigurationServiceTest
+{
+ public static class TestListener implements ConfigurationService.Listener
+ {
+ private final ConfigurationService parent;
+ private final boolean ackTopologies;
+ final Map<Long, Topology> topologies = new HashMap<>();
+ final Map<Long, Set<Id>> syncCompletes = new HashMap<>();
+ final Set<Long> truncates = new HashSet<>();
+
+ public TestListener(ConfigurationService parent, boolean ackTopologies)
+ {
+ this.parent = parent;
+ this.ackTopologies = ackTopologies;
+ }
+
+ @Override
+ public AsyncResult<Void> onTopologyUpdate(Topology topology)
+ {
+ if (topologies.put(topology.epoch(), topology) != null)
+ Assertions.fail("Received topology twice for epoch " + topology.epoch());
+ if (ackTopologies)
+ parent.acknowledgeEpoch(EpochReady.done(topology.epoch()));
+ return AsyncResults.success(null);
+ }
+
+ @Override
+ public void onEpochSyncComplete(Id node, long epoch)
+ {
+ Set<Id> synced = syncCompletes.computeIfAbsent(epoch, e -> new HashSet<>());
+ if (!synced.add(node))
+ Assertions.fail(String.format("Recieved multiple syncs for epoch %s from %s", epoch, node));
+ }
+
+ @Override
+ public void truncateTopologyUntil(long epoch)
+ {
+ if (!truncates.add(epoch))
+ Assertions.fail(String.format("Recieved multiple truncates for epoch", epoch));
+ }
+
+ public void assertNoTruncates()
+ {
+ Assertions.assertTrue(truncates.isEmpty());
+ }
+
+ public void assertTruncates(Long... epochs)
+ {
+ Assertions.assertEquals(ImmutableSet.copyOf(epochs), truncates);
+ }
+
+ public void assertSyncsFor(Long... epochs)
+ {
+ Assertions.assertEquals(ImmutableSet.copyOf(epochs), syncCompletes.keySet());
+ }
+
+ public void assertSyncsForEpoch(long epoch, Id... nodes)
+ {
+ Assertions.assertEquals(ImmutableSet.copyOf(nodes), syncCompletes.get(epoch));
+ }
+
+ public void assertTopologiesFor(Long... epochs)
+ {
+ Assertions.assertEquals(ImmutableSet.copyOf(epochs), topologies.keySet());
+ }
+
+ public void assertTopologyForEpoch(long epoch, Topology topology)
+ {
+ Assertions.assertEquals(topology, topologies.get(epoch));
+ }
+ }
+
+ private static class TestableConfigurationService extends AbstractConfigurationService
+ {
+ final Set<Long> syncStarted = new HashSet<>();
+ final Set<Long> epochsFetched = new HashSet<>();
+
+ public TestableConfigurationService(Id node)
+ {
+ super(node);
+ }
+
+ @Override
+ protected void fetchTopologyInternal(long epoch)
+ {
+ epochsFetched.add(epoch);
+ }
+
+ @Override
+ protected void epochSyncComplete(Topology topology)
+ {
+ if (!syncStarted.add(topology.epoch()))
+ Assertions.fail("Sync started multiple times for " + topology.epoch());
+ }
+
+ @Override
+ protected void topologyUpdatePostListenerNotify(Topology topology)
+ {
+ acknowledgeEpoch(EpochReady.done(topology.epoch()));
+ }
+ }
+
+ private static final Id ID1 = new Id(1);
+ private static final Id ID2 = new Id(2);
+ private static final Id ID3 = new Id(3);
+ private static final List<Id> NODES = ImmutableList.of(ID1, ID2, ID3);
+ private static final Range RANGE = IntKey.range(0, 100);
+
+ private static Shard shard(Range range, List<Id> nodes, Set<Id> fastPath)
+ {
+ return new Shard(range, nodes, fastPath);
+ }
+
+ private static Topology topology(long epoch, Range range, List<Id> nodes, Set<Id> fastPath)
+ {
+ return new Topology(epoch, shard(range, nodes, fastPath));
+ }
+
+ private static Topology topology(long epoch, Id... fastPath)
+ {
+ return topology(epoch, RANGE, NODES, ImmutableSet.copyOf(fastPath));
+ }
+
+ private static Topology topology(long epoch, int... fastPath)
+ {
+ Set<Id> fpSet = Arrays.stream(fastPath).mapToObj(Id::new).collect(Collectors.toSet());
+ return topology(epoch, RANGE, NODES, fpSet);
+ }
+
+ private static final Topology TOPOLOGY1 = topology(1, 1, 2, 3);
+ private static final Topology TOPOLOGY2 = topology(2, 1, 2);
+ private static final Topology TOPOLOGY3 = topology(3, 1, 3);
+ private static final Topology TOPOLOGY4 = topology(4, 2, 3);
+
+ @Test
+ public void getTopologyTest()
+ {
+ TestableConfigurationService service = new TestableConfigurationService(ID1);
+ TestListener listener = new TestListener(service, false);
+ service.registerListener(listener);
+ service.reportTopology(TOPOLOGY1);
+ service.reportTopology(TOPOLOGY2);
+ service.reportTopology(TOPOLOGY3);
+ service.reportTopology(TOPOLOGY4);
+
+ listener.assertNoTruncates();
+ listener.assertTopologiesFor(1L, 2L, 3L, 4L);
+ Assertions.assertSame(TOPOLOGY1, service.getTopologyForEpoch(1));
+ Assertions.assertSame(TOPOLOGY2, service.getTopologyForEpoch(2));
+ Assertions.assertSame(TOPOLOGY3, service.getTopologyForEpoch(3));
+ Assertions.assertSame(TOPOLOGY4, service.getTopologyForEpoch(4));
+ }
+
+ /**
+ * check everything works properly if we start loading after epoch 1 has
+ * been removed
+ */
+ @Test
+ public void loadAfterTruncate()
+ {
+ TestableConfigurationService service = new TestableConfigurationService(ID1);
+ TestListener listener = new TestListener(service, false);
+ service.registerListener(listener);
+ service.reportTopology(TOPOLOGY3);
+ service.reportTopology(TOPOLOGY4);
+
+ listener.assertNoTruncates();
+ listener.assertTopologiesFor(3L, 4L);
+ Assertions.assertSame(TOPOLOGY3, service.getTopologyForEpoch(3));
+ Assertions.assertSame(TOPOLOGY4, service.getTopologyForEpoch(4));
+ }
+
+ /**
+ * If we receive topology epochs out of order for some reason, we should
+ * reorder with callbacks
+ */
+ @Test
+ public void awaitOutOfOrderTopologies()
+ {
+ TestableConfigurationService service = new TestableConfigurationService(ID1);
+
+ TestListener listener = new TestListener(service, false);
+ service.registerListener(listener);
+
+ service.reportTopology(TOPOLOGY1);
+ service.reportTopology(TOPOLOGY3);
+ listener.assertTopologiesFor(1L);
+ Assertions.assertEquals(ImmutableSet.of(2L), service.epochsFetched);
+
+ service.reportTopology(TOPOLOGY2);
+ listener.assertTopologiesFor(1L, 2L, 3L);
+
+ }
+
+ private static void assertHistoryEpochs(EpochHistory history, long... expected)
+ {
+ Assertions.assertEquals(history.size(), expected.length);
+ if (expected.length == 0)
+ return;
+
+ Assertions.assertEquals(expected[0], history.minEpoch());
+ Assertions.assertEquals(expected[expected.length - 1], history.maxEpoch());
+
+ for (int i=0; i<expected.length; i++)
+ Assertions.assertEquals(expected[i], history.atIndex(i).epoch());
+ }
+
+ @Test
+ public void epochHistoryAppend()
+ {
+ EpochHistory history = new EpochHistory();
+ Assertions.assertEquals(0, history.size());
+
+ history.getOrCreate(5);
+ assertHistoryEpochs(history, 5);
+
+ history.getOrCreate(6);
+ assertHistoryEpochs(history, 5, 6);
+
+ history.getOrCreate(8);
+ assertHistoryEpochs(history, 5, 6, 7, 8);
+ }
+
+ @Test
+ public void epochHistoryPrepend()
+ {
+ EpochHistory history = new EpochHistory();
+ Assertions.assertEquals(0, history.size());
+
+ history.getOrCreate(5);
+ history.getOrCreate(6);
+ assertHistoryEpochs(history, 5, 6);
+
+ history.getOrCreate(3);
+ assertHistoryEpochs(history, 3, 4, 5, 6);
+ }
+
+ @Test
+ public void epochHistoryTruncate()
+ {
+ EpochHistory history = new EpochHistory();
+ Assertions.assertEquals(0, history.size());
+
+ history.getOrCreate(1);
+ history.getOrCreate(2);
+ history.getOrCreate(3);
+ history.getOrCreate(4);
+ history.getOrCreate(5);
+ history.getOrCreate(6);
+
+ assertHistoryEpochs(history, 1, 2, 3, 4, 5, 6);
+
+ history.truncateUntil(4);
+ assertHistoryEpochs(history, 4, 5, 6);
+
+ history.getOrCreate(7);
+ assertHistoryEpochs(history, 4, 5, 6, 7);
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index d6a244d..d2bc233 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -119,6 +119,7 @@
SimpleProgressLog::new,
InMemoryCommandStores.SingleThread::new);
awaitUninterruptibly(node.start());
+ node.onTopologyUpdate(topology);
return node;
}
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index 18bb226..c39f46a 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -100,6 +100,7 @@
new MockCluster.Clock(100), () -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new DefaultRandom(), null,
SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
awaitUninterruptibly(node.start());
+ node.onTopologyUpdate(storeSupport.local.get());
return node;
}
diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
index b3a6994..ec4f58d 100644
--- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
+++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
@@ -196,7 +196,8 @@
messageSink.assertHistorySizes(0, 1);
Assertions.assertEquals(ID3, messageSink.responses.get(0).to);
PartialDeps expectedDeps = new PartialDeps(Ranges.of(range(0, 12)), KeyDeps.NONE, RangeDeps.NONE);
- Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId2, Timestamp.fromValues(1, 110, ID1), expectedDeps),
+ Timestamp expectedTs = Timestamp.fromValues(1, 110, ID1).withExtraFlags(txnId2.flags());
+ Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId2, expectedTs, expectedDeps),
messageSink.responses.get(0).payload);
}
finally
diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
index ec85d15..bec8468 100644
--- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
@@ -200,4 +200,67 @@
Assertions.assertEquals(topologies(topology2, topology(1, shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)))),
actual);
}
+
+ @Test
+ void incompleteTopologyHistory()
+ {
+ Topology topology5 = topology(5,
+ shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)),
+ shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)));
+ Topology topology6 = topology(6,
+ shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)),
+ shard(range(200, 300), idList(4, 5, 6), idSet(5, 6)));
+
+ TopologyManager service = new TopologyManager(SUPPLIER, ID);
+ service.onTopologyUpdate(topology5);
+ service.onTopologyUpdate(topology6);
+
+ Assertions.assertSame(topology6, service.getEpochStateUnsafe(6).global());
+ Assertions.assertSame(topology5, service.getEpochStateUnsafe(5).global());
+ for (int i=1; i<=6; i++) service.onEpochSyncComplete(id(i), 5);
+ Assertions.assertTrue(service.getEpochStateUnsafe(5).syncComplete());
+ Assertions.assertNull(service.getEpochStateUnsafe(4));
+
+ service.onEpochSyncComplete(id(1), 4);
+ }
+
+ private static void markTopologySynced(TopologyManager service, long epoch)
+ {
+ service.getEpochStateUnsafe(epoch).global().nodes().forEach(id -> service.onEpochSyncComplete(id, epoch));
+ }
+
+ private static void addAndMarkSynced(TopologyManager service, Topology topology)
+ {
+ service.onTopologyUpdate(topology);
+ markTopologySynced(service, topology.epoch());
+ }
+
+ @Test
+ void truncateTopologyHistory()
+ {
+ Range range = range(100, 200);
+ TopologyManager service = new TopologyManager(SUPPLIER, ID);
+ addAndMarkSynced(service, topology(1, shard(range, idList(1, 2, 3), idSet(1, 2))));
+ addAndMarkSynced(service, topology(2, shard(range, idList(1, 2, 3), idSet(2, 3))));
+ addAndMarkSynced(service, topology(3, shard(range, idList(1, 2, 3), idSet(1, 2))));
+ addAndMarkSynced(service, topology(4, shard(range, idList(1, 2, 3), idSet(1, 3))));
+
+ Assertions.assertTrue(service.hasEpoch(1));
+ Assertions.assertTrue(service.hasEpoch(2));
+ Assertions.assertTrue(service.hasEpoch(3));
+ Assertions.assertTrue(service.hasEpoch(4));
+
+ service.truncateTopologyUntil(3);
+ Assertions.assertFalse(service.hasEpoch(1));
+ Assertions.assertFalse(service.hasEpoch(2));
+ Assertions.assertTrue(service.hasEpoch(3));
+ Assertions.assertTrue(service.hasEpoch(4));
+
+ }
+
+ @Test
+ void truncateTopologyCantTruncateUnsyncedEpochs()
+ {
+
+ }
}