CEP-15: (C*) Implement TopologySorter to prioritise hosts based on DynamicSnitch and/or topology layout (#72)

patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-18929
diff --git a/accord-core/src/main/java/accord/topology/Topologies.java b/accord-core/src/main/java/accord/topology/Topologies.java
index 5832cce..1d729d2 100644
--- a/accord-core/src/main/java/accord/topology/Topologies.java
+++ b/accord-core/src/main/java/accord/topology/Topologies.java
@@ -260,29 +260,25 @@
         private final List<Topology> topologies;
         private final int maxShardsPerEpoch;
 
-        public Multi(TopologySorter.Supplier sorter, int initialCapacity)
+        public Multi(TopologySorter.Supplier sorter, Topology... topologies)
         {
-            this.topologies = new ArrayList<>(initialCapacity);
-            this.supplier = sorter;
-            this.sorter = sorter.get(this);
+            this(sorter, Arrays.asList(topologies));
+        }
+
+        public Multi(TopologySorter.Supplier sorter, List<Topology> input)
+        {
+            this.topologies = new ArrayList<>(input.size());
+            for (Topology topology : input)
+            {
+                Invariants.checkArgument(topologies.isEmpty() || topology.epoch == topologies.get(topologies.size() - 1).epoch - 1);
+                topologies.add(topology);
+            }
             int maxShardsPerEpoch = 0;
             for (int i = 0 ; i < topologies.size() ; ++i)
                 maxShardsPerEpoch = Math.max(maxShardsPerEpoch, topologies.get(i).size());
             this.maxShardsPerEpoch = maxShardsPerEpoch;
-        }
-
-        public Multi(TopologySorter.Supplier sorter, Topology... topologies)
-        {
-            this(sorter, topologies.length);
-            for (Topology topology : topologies)
-                add(topology);
-        }
-
-        public Multi(TopologySorter.Supplier sorter, List<Topology> topologies)
-        {
-            this(sorter, topologies.size());
-            for (Topology topology : topologies)
-                add(topology);
+            this.supplier = sorter;
+            this.sorter = sorter.get(this);
         }
 
         @Override
@@ -392,12 +388,6 @@
             return maxShardsPerEpoch;
         }
 
-        public void add(Topology topology)
-        {
-            Invariants.checkArgument(topologies.isEmpty() || topology.epoch == topologies.get(topologies.size() - 1).epoch - 1);
-            topologies.add(topology);
-        }
-
         @Override
         public boolean equals(Object obj)
         {
@@ -422,4 +412,38 @@
             return sorter.compare(node1, node2, shards);
         }
     }
+
+    class Builder
+    {
+        private final List<Topology> topologies;
+
+        public Builder(int initialCapacity)
+        {
+            topologies = new ArrayList<>(initialCapacity);
+        }
+
+        public void add(Topology topology)
+        {
+            Invariants.checkArgument(topologies.isEmpty() || topology.epoch == topologies.get(topologies.size() - 1).epoch - 1);
+            topologies.add(topology);
+        }
+
+        public boolean isEmpty()
+        {
+            return topologies.isEmpty();
+        }
+
+        public Topologies build(TopologySorter.Supplier sorter)
+        {
+            switch (topologies.size())
+            {
+                case 0:
+                    throw new IllegalStateException("Unable to build an empty Topologies");
+                case 1:
+                    return new Single(sorter, topologies.get(0));
+                default:
+                    return new Multi(sorter, topologies);
+            }
+        }
+    }
 }
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java
index a940b10..dcb499c 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -517,7 +517,7 @@
 
         int i = (int)(snapshot.currentEpoch - maxEpoch);
         int maxi = (int)(Math.min(1 + snapshot.currentEpoch - minEpoch, snapshot.epochs.length));
-        Topologies.Multi topologies = new Topologies.Multi(sorter, maxi - i);
+        Topologies.Builder topologies = new Topologies.Builder(maxi - i);
 
         Unseekables<?> remaining = select;
         while (i < maxi)
@@ -528,7 +528,7 @@
         }
 
         if (i == snapshot.epochs.length)
-            return topologies;
+            return topologies.build(sorter);
 
         // include any additional epochs to reach sufficiency
         EpochState prev = snapshot.epochs[maxi - 1];
@@ -538,14 +538,14 @@
             remaining = remaining.subtract(sufficient);
             remaining = remaining.subtract(prev.addedRanges);
             if (remaining.isEmpty())
-                return topologies;
+                return topologies.build(sorter);
 
             EpochState next = snapshot.epochs[i++];
             topologies.add(next.global.forSelection(remaining));
             prev = next;
         } while (i < snapshot.epochs.length);
 
-        return topologies;
+        return topologies.build(sorter);
     }
 
     public Topologies preciseEpochs(Unseekables<?> select, long minEpoch, long maxEpoch)
@@ -556,7 +556,7 @@
             return new Single(sorter, snapshot.get(minEpoch).global.forSelection(select));
 
         int count = (int)(1 + maxEpoch - minEpoch);
-        Topologies.Multi topologies = new Topologies.Multi(sorter, count);
+        Topologies.Builder topologies = new Topologies.Builder(count);
         for (int i = count - 1 ; i >= 0 ; --i)
         {
             EpochState epochState = snapshot.get(minEpoch + i);
@@ -564,10 +564,9 @@
             select = select.subtract(epochState.addedRanges);
         }
 
-        for (int i = count - 1 ; i >= 0 ; --i)
         Invariants.checkState(!topologies.isEmpty(), "Unable to find an epoch that contained %s", select);
 
-        return topologies;
+        return topologies.build(sorter);
     }
 
     public Topologies forEpoch(Unseekables<?> select, long epoch)
diff --git a/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java b/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
index e32097e..95021c1 100644
--- a/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
+++ b/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
@@ -44,9 +44,7 @@
         Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
         Topology topology2 = topology(2, shard(range, idList(3, 4, 5), idSet(4, 5)));
 
-        Topologies.Multi topologies = new Topologies.Multi((TopologySorter.StaticSorter)(a, b, s)->0);
-        topologies.add(topology2);
-        topologies.add(topology1);
+        Topologies.Multi topologies = new Topologies.Multi((TopologySorter.StaticSorter)(a, b, s)->0, topology2, topology1);
 
         // 3 remains a member across both topologies, so can process requests without waiting for latest topology data
         Assertions.assertEquals(scope(150), ((PartialKeyRoute)TxnRequest.computeScope(id(3), topologies, route)).toParticipants());
@@ -75,9 +73,7 @@
                                       shard(range1, idList(4, 5, 6), idSet(4, 5)),
                                       shard(range2, idList(1, 2, 3), idSet(1, 2)) );
 
-        Topologies.Multi topologies = new Topologies.Multi((TopologySorter.StaticSorter)(a,b,s)->0);
-        topologies.add(topology2);
-        topologies.add(topology1);
+        Topologies.Multi topologies = new Topologies.Multi((TopologySorter.StaticSorter)(a,b,s)->0, topology2, topology1);
 
         Assertions.assertEquals(scope(150, 250), ((PartialKeyRoute)TxnRequest.computeScope(id(1), topologies, route)).toParticipants());
         Assertions.assertEquals(2, TxnRequest.computeWaitForEpoch(id(1), topologies, route));