CEP-15: (C*) Implement TopologySorter to prioritise hosts based on DynamicSnitch and/or topology layout (#72)
patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-18929
diff --git a/accord-core/src/main/java/accord/topology/Topologies.java b/accord-core/src/main/java/accord/topology/Topologies.java
index 5832cce..1d729d2 100644
--- a/accord-core/src/main/java/accord/topology/Topologies.java
+++ b/accord-core/src/main/java/accord/topology/Topologies.java
@@ -260,29 +260,25 @@
private final List<Topology> topologies;
private final int maxShardsPerEpoch;
- public Multi(TopologySorter.Supplier sorter, int initialCapacity)
+ public Multi(TopologySorter.Supplier sorter, Topology... topologies)
{
- this.topologies = new ArrayList<>(initialCapacity);
- this.supplier = sorter;
- this.sorter = sorter.get(this);
+ this(sorter, Arrays.asList(topologies));
+ }
+
+ public Multi(TopologySorter.Supplier sorter, List<Topology> input)
+ {
+ this.topologies = new ArrayList<>(input.size());
+ for (Topology topology : input)
+ {
+ Invariants.checkArgument(topologies.isEmpty() || topology.epoch == topologies.get(topologies.size() - 1).epoch - 1);
+ topologies.add(topology);
+ }
int maxShardsPerEpoch = 0;
for (int i = 0 ; i < topologies.size() ; ++i)
maxShardsPerEpoch = Math.max(maxShardsPerEpoch, topologies.get(i).size());
this.maxShardsPerEpoch = maxShardsPerEpoch;
- }
-
- public Multi(TopologySorter.Supplier sorter, Topology... topologies)
- {
- this(sorter, topologies.length);
- for (Topology topology : topologies)
- add(topology);
- }
-
- public Multi(TopologySorter.Supplier sorter, List<Topology> topologies)
- {
- this(sorter, topologies.size());
- for (Topology topology : topologies)
- add(topology);
+ this.supplier = sorter;
+ this.sorter = sorter.get(this);
}
@Override
@@ -392,12 +388,6 @@
return maxShardsPerEpoch;
}
- public void add(Topology topology)
- {
- Invariants.checkArgument(topologies.isEmpty() || topology.epoch == topologies.get(topologies.size() - 1).epoch - 1);
- topologies.add(topology);
- }
-
@Override
public boolean equals(Object obj)
{
@@ -422,4 +412,38 @@
return sorter.compare(node1, node2, shards);
}
}
+
+ class Builder
+ {
+ private final List<Topology> topologies;
+
+ public Builder(int initialCapacity)
+ {
+ topologies = new ArrayList<>(initialCapacity);
+ }
+
+ public void add(Topology topology)
+ {
+ Invariants.checkArgument(topologies.isEmpty() || topology.epoch == topologies.get(topologies.size() - 1).epoch - 1);
+ topologies.add(topology);
+ }
+
+ public boolean isEmpty()
+ {
+ return topologies.isEmpty();
+ }
+
+ public Topologies build(TopologySorter.Supplier sorter)
+ {
+ switch (topologies.size())
+ {
+ case 0:
+ throw new IllegalStateException("Unable to build an empty Topologies");
+ case 1:
+ return new Single(sorter, topologies.get(0));
+ default:
+ return new Multi(sorter, topologies);
+ }
+ }
+ }
}
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java
index a940b10..dcb499c 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -517,7 +517,7 @@
int i = (int)(snapshot.currentEpoch - maxEpoch);
int maxi = (int)(Math.min(1 + snapshot.currentEpoch - minEpoch, snapshot.epochs.length));
- Topologies.Multi topologies = new Topologies.Multi(sorter, maxi - i);
+ Topologies.Builder topologies = new Topologies.Builder(maxi - i);
Unseekables<?> remaining = select;
while (i < maxi)
@@ -528,7 +528,7 @@
}
if (i == snapshot.epochs.length)
- return topologies;
+ return topologies.build(sorter);
// include any additional epochs to reach sufficiency
EpochState prev = snapshot.epochs[maxi - 1];
@@ -538,14 +538,14 @@
remaining = remaining.subtract(sufficient);
remaining = remaining.subtract(prev.addedRanges);
if (remaining.isEmpty())
- return topologies;
+ return topologies.build(sorter);
EpochState next = snapshot.epochs[i++];
topologies.add(next.global.forSelection(remaining));
prev = next;
} while (i < snapshot.epochs.length);
- return topologies;
+ return topologies.build(sorter);
}
public Topologies preciseEpochs(Unseekables<?> select, long minEpoch, long maxEpoch)
@@ -556,7 +556,7 @@
return new Single(sorter, snapshot.get(minEpoch).global.forSelection(select));
int count = (int)(1 + maxEpoch - minEpoch);
- Topologies.Multi topologies = new Topologies.Multi(sorter, count);
+ Topologies.Builder topologies = new Topologies.Builder(count);
for (int i = count - 1 ; i >= 0 ; --i)
{
EpochState epochState = snapshot.get(minEpoch + i);
@@ -564,10 +564,9 @@
select = select.subtract(epochState.addedRanges);
}
- for (int i = count - 1 ; i >= 0 ; --i)
Invariants.checkState(!topologies.isEmpty(), "Unable to find an epoch that contained %s", select);
- return topologies;
+ return topologies.build(sorter);
}
public Topologies forEpoch(Unseekables<?> select, long epoch)
diff --git a/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java b/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
index e32097e..95021c1 100644
--- a/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
+++ b/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
@@ -44,9 +44,7 @@
Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
Topology topology2 = topology(2, shard(range, idList(3, 4, 5), idSet(4, 5)));
- Topologies.Multi topologies = new Topologies.Multi((TopologySorter.StaticSorter)(a, b, s)->0);
- topologies.add(topology2);
- topologies.add(topology1);
+ Topologies.Multi topologies = new Topologies.Multi((TopologySorter.StaticSorter)(a, b, s)->0, topology2, topology1);
// 3 remains a member across both topologies, so can process requests without waiting for latest topology data
Assertions.assertEquals(scope(150), ((PartialKeyRoute)TxnRequest.computeScope(id(3), topologies, route)).toParticipants());
@@ -75,9 +73,7 @@
shard(range1, idList(4, 5, 6), idSet(4, 5)),
shard(range2, idList(1, 2, 3), idSet(1, 2)) );
- Topologies.Multi topologies = new Topologies.Multi((TopologySorter.StaticSorter)(a,b,s)->0);
- topologies.add(topology2);
- topologies.add(topology1);
+ Topologies.Multi topologies = new Topologies.Multi((TopologySorter.StaticSorter)(a,b,s)->0, topology2, topology1);
Assertions.assertEquals(scope(150, 250), ((PartialKeyRoute)TxnRequest.computeScope(id(1), topologies, route)).toParticipants());
Assertions.assertEquals(2, TxnRequest.computeWaitForEpoch(id(1), topologies, route));