Fix flaky test in IndexSummaryManagerTest

Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for
CASSANDRA-12218
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index ddda430..95ade16 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -225,7 +225,9 @@
         Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
         try
         {
-            redistributeSummaries(compactingAndNonCompacting.left, compactingAndNonCompacting.right, this.memoryPoolBytes);
+            redistributeSummaries(new IndexSummaryRedistribution(compactingAndNonCompacting.left,
+                                                                 compactingAndNonCompacting.right,
+                                                                 this.memoryPoolBytes));
         }
         finally
         {
@@ -237,14 +239,14 @@
     /**
      * Attempts to fairly distribute a fixed pool of memory for index summaries across a set of SSTables based on
      * their recent read rates.
-     * @param transactions containing the sstables we are to redistribute the memory pool across
-     * @param memoryPoolBytes a size (in bytes) that the total index summary space usage should stay close to or
-     *                        under, if possible
+     * @param redistribution encapsulating the transactions containing the sstables we are to redistribute the
+     *                       memory pool across and a size (in bytes) that the total index summary space usage
+     *                       should stay close to or under, if possible
      * @return a list of new SSTableReader instances
      */
     @VisibleForTesting
-    public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes) throws IOException
+    public static List<SSTableReader> redistributeSummaries(IndexSummaryRedistribution redistribution) throws IOException
     {
-        return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes));
+        return CompactionManager.instance.runIndexSummaryRedistribution(redistribution);
     }
 }
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 0498c68..c0445d5 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -19,16 +19,8 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Joiner;
@@ -147,7 +139,7 @@
 
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), originalOffHeapSize * sstables.size());
+            sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), originalOffHeapSize * sstables.size()));
         }
         for (SSTableReader sstable : sstables)
             assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
@@ -255,7 +247,7 @@
         long summarySpace = sstable.getIndexSummaryOffHeapSize();
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
         {
-            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), summarySpace);
+            redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), summarySpace));
         }
 
         sstable = cfs.getLiveSSTables().iterator().next();
@@ -267,7 +259,7 @@
         int previousSize = sstable.getIndexSummarySize();
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
         {
-            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace * 1.5));
+            redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace * 1.5)));
         }
         sstable = cfs.getLiveSSTables().iterator().next();
         assertEquals(previousSize * 1.5, (double) sstable.getIndexSummarySize(), 1);
@@ -278,7 +270,7 @@
         cfs.metadata.minIndexInterval(originalMinIndexInterval);
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
         {
-            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace / 2.0));
+            redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace / 2.0)));
         }
         sstable = cfs.getLiveSSTables().iterator().next();
         assertEquals(originalMinIndexInterval * 2, sstable.getEffectiveIndexInterval(), 0.001);
@@ -291,7 +283,7 @@
         cfs.metadata.maxIndexInterval(originalMinIndexInterval * 4);
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
         {
-            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10);
+            redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10));
         }
         sstable = cfs.getLiveSSTables().iterator().next();
         assertEquals(cfs.metadata.params.minIndexInterval, sstable.getEffectiveIndexInterval(), 0.001);
@@ -314,7 +306,7 @@
 
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
         {
-            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10);
+            redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10));
         }
         sstables = new ArrayList<>(cfs.getLiveSSTables());
         for (SSTableReader sstable : sstables)
@@ -324,7 +316,7 @@
         cfs.metadata.maxIndexInterval(cfs.metadata.params.maxIndexInterval / 2);
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
         {
-            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1);
+            redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1));
         }
         sstables = new ArrayList<>(cfs.getLiveSSTables());
         for (SSTableReader sstable : sstables)
@@ -337,7 +329,7 @@
         cfs.metadata.maxIndexInterval(cfs.metadata.params.maxIndexInterval * 2);
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
         {
-            redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1);
+            redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1));
         }
         for (SSTableReader sstable : cfs.getLiveSSTables())
         {
@@ -368,7 +360,7 @@
         // there should be enough space to not downsample anything
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables));
+            sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables)));
         }
         for (SSTableReader sstable : sstables)
             assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
@@ -379,7 +371,7 @@
         assert sstables.size() == 4;
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2)));
+            sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2))));
         }
         for (SSTableReader sstable : sstables)
             assertEquals(BASE_SAMPLING_LEVEL / 2, sstable.getIndexSummarySamplingLevel());
@@ -388,7 +380,7 @@
         // everything should get cut to a quarter
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 4)));
+            sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 4))));
         }
         for (SSTableReader sstable : sstables)
             assertEquals(BASE_SAMPLING_LEVEL / 4, sstable.getIndexSummarySamplingLevel());
@@ -397,7 +389,7 @@
         // upsample back up to half
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2) + 4));
+            sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2) + 4)));
         }
         assert sstables.size() == 4;
         for (SSTableReader sstable : sstables)
@@ -407,7 +399,7 @@
         // upsample back up to the original index summary
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables));
+            sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables)));
         }
         for (SSTableReader sstable : sstables)
             assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
@@ -419,7 +411,7 @@
         sstables.get(1).overrideReadMeter(new RestorableMeter(50.0, 50.0));
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3));
+            sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3)));
         }
         Collections.sort(sstables, hotnessComparator);
         assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel());
@@ -435,7 +427,7 @@
         sstables.get(1).overrideReadMeter(new RestorableMeter(higherRate, higherRate));
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3));
+            sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3)));
         }
         Collections.sort(sstables, hotnessComparator);
         assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel());
@@ -453,7 +445,7 @@
 
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3) + 50);
+            sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3) + 50));
         }
         Collections.sort(sstables, hotnessComparator);
 
@@ -477,7 +469,7 @@
         sstables.get(3).overrideReadMeter(new RestorableMeter(128.0, 128.0));
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (92.0 / BASE_SAMPLING_LEVEL))));
+            sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (92.0 / BASE_SAMPLING_LEVEL)))));
         }
         Collections.sort(sstables, hotnessComparator);
         assertEquals(1, sstables.get(0).getIndexSummarySize());  // at the min sampling level
@@ -490,7 +482,7 @@
         // Don't leave enough space for even the minimal index summaries
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
         {
-            sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10);
+            sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10));
         }
         for (SSTableReader sstable : sstables)
             assertEquals(1, sstable.getIndexSummarySize());  // at the min sampling level
@@ -625,6 +617,9 @@
         // everything should get cut in half
         final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
 
+        // barrier to control when redistribution runs
+        final CountDownLatch barrier = new CountDownLatch(1);
+
         Thread t = new Thread(new Runnable()
         {
             public void run()
@@ -634,7 +629,10 @@
                     // Don't leave enough space for even the minimal index summaries
                     try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
                     {
-                        redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), singleSummaryOffHeapSpace);
+                        redistributeSummaries(new ObservableRedistribution(Collections.EMPTY_LIST,
+                                                                           of(cfs.metadata.cfId, txn),
+                                                                           singleSummaryOffHeapSpace,
+                                                                           barrier));
                     }
                 }
                 catch (CompactionInterruptedException ex)
@@ -649,7 +647,13 @@
         t.start();
         while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive())
             Thread.sleep(1);
+        // to ensure that the stop condition check in IndexSummaryRedistribution::redistributeSummaries
+        // is made *after* the halt request is made to the CompactionManager, don't allow the redistribution
+        // to proceed until stopCompaction has been called.
         CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
+        // allows the redistribution to proceed
+        barrier.countDown();
+
         t.join();
 
         assertNotNull("Expected compaction interrupted exception", exception.get());
@@ -664,4 +668,37 @@
 
         validateData(cfs, numRows);
     }
+
+    private static IndexSummaryRedistribution redistribution(List<SSTableReader> compacting,
+                                                             Map<UUID, LifecycleTransaction> transactions,
+                                                             long memoryPoolBytes)
+    {
+        return new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes);
+    }
+
+    private static class ObservableRedistribution extends IndexSummaryRedistribution
+    {
+        CountDownLatch barrier;
+        public ObservableRedistribution(List<SSTableReader> compacting,
+                                        Map<UUID, LifecycleTransaction> transactions,
+                                        long memoryPoolBytes,
+                                        CountDownLatch barrier)
+        {
+            super(compacting, transactions, memoryPoolBytes);
+            this.barrier = barrier;
+        }
+
+        public List<SSTableReader> redistributeSummaries() throws IOException
+        {
+            try
+            {
+                barrier.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException("Interrupted waiting on test barrier");
+            }
+            return super.redistributeSummaries();
+        }
+    }
 }