Fix flaky test in IndexSummaryManagerTest
Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for
CASSANDRA-12218
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index ddda430..95ade16 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -225,7 +225,9 @@
Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
try
{
- redistributeSummaries(compactingAndNonCompacting.left, compactingAndNonCompacting.right, this.memoryPoolBytes);
+ redistributeSummaries(new IndexSummaryRedistribution(compactingAndNonCompacting.left,
+ compactingAndNonCompacting.right,
+ this.memoryPoolBytes));
}
finally
{
@@ -237,14 +239,14 @@
/**
* Attempts to fairly distribute a fixed pool of memory for index summaries across a set of SSTables based on
* their recent read rates.
- * @param transactions containing the sstables we are to redistribute the memory pool across
- * @param memoryPoolBytes a size (in bytes) that the total index summary space usage should stay close to or
- * under, if possible
+ * @param redistribution encapsulating the transactions containing the sstables we are to redistribute the
+ * memory pool across and a size (in bytes) that the total index summary space usage
+ * should stay close to or under, if possible
* @return a list of new SSTableReader instances
*/
@VisibleForTesting
- public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes) throws IOException
+ public static List<SSTableReader> redistributeSummaries(IndexSummaryRedistribution redistribution) throws IOException
{
- return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes));
+ return CompactionManager.instance.runIndexSummaryRedistribution(redistribution);
}
}
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 0498c68..c0445d5 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -19,16 +19,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Joiner;
@@ -147,7 +139,7 @@
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
- sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), originalOffHeapSize * sstables.size());
+ sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), originalOffHeapSize * sstables.size()));
}
for (SSTableReader sstable : sstables)
assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
@@ -255,7 +247,7 @@
long summarySpace = sstable.getIndexSummaryOffHeapSize();
try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
{
- redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), summarySpace);
+ redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), summarySpace));
}
sstable = cfs.getLiveSSTables().iterator().next();
@@ -267,7 +259,7 @@
int previousSize = sstable.getIndexSummarySize();
try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
{
- redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace * 1.5));
+ redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace * 1.5)));
}
sstable = cfs.getLiveSSTables().iterator().next();
assertEquals(previousSize * 1.5, (double) sstable.getIndexSummarySize(), 1);
@@ -278,7 +270,7 @@
cfs.metadata.minIndexInterval(originalMinIndexInterval);
try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
{
- redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace / 2.0));
+ redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) Math.ceil(summarySpace / 2.0)));
}
sstable = cfs.getLiveSSTables().iterator().next();
assertEquals(originalMinIndexInterval * 2, sstable.getEffectiveIndexInterval(), 0.001);
@@ -291,7 +283,7 @@
cfs.metadata.maxIndexInterval(originalMinIndexInterval * 4);
try (LifecycleTransaction txn = cfs.getTracker().tryModify(asList(sstable), OperationType.UNKNOWN))
{
- redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10);
+ redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10));
}
sstable = cfs.getLiveSSTables().iterator().next();
assertEquals(cfs.metadata.params.minIndexInterval, sstable.getEffectiveIndexInterval(), 0.001);
@@ -314,7 +306,7 @@
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
- redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10);
+ redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10));
}
sstables = new ArrayList<>(cfs.getLiveSSTables());
for (SSTableReader sstable : sstables)
@@ -324,7 +316,7 @@
cfs.metadata.maxIndexInterval(cfs.metadata.params.maxIndexInterval / 2);
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
- redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1);
+ redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1));
}
sstables = new ArrayList<>(cfs.getLiveSSTables());
for (SSTableReader sstable : sstables)
@@ -337,7 +329,7 @@
cfs.metadata.maxIndexInterval(cfs.metadata.params.maxIndexInterval * 2);
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
- redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1);
+ redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 1));
}
for (SSTableReader sstable : cfs.getLiveSSTables())
{
@@ -368,7 +360,7 @@
// there should be enough space to not downsample anything
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
- sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables));
+ sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables)));
}
for (SSTableReader sstable : sstables)
assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
@@ -379,7 +371,7 @@
assert sstables.size() == 4;
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
- sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2)));
+ sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2))));
}
for (SSTableReader sstable : sstables)
assertEquals(BASE_SAMPLING_LEVEL / 2, sstable.getIndexSummarySamplingLevel());
@@ -388,7 +380,7 @@
// everything should get cut to a quarter
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
- sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 4)));
+ sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 4))));
}
for (SSTableReader sstable : sstables)
assertEquals(BASE_SAMPLING_LEVEL / 4, sstable.getIndexSummarySamplingLevel());
@@ -397,7 +389,7 @@
// upsample back up to half
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
- sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2) + 4));
+ sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * (numSSTables / 2) + 4)));
}
assert sstables.size() == 4;
for (SSTableReader sstable : sstables)
@@ -407,7 +399,7 @@
// upsample back up to the original index summary
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
- sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables));
+ sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * numSSTables)));
}
for (SSTableReader sstable : sstables)
assertEquals(BASE_SAMPLING_LEVEL, sstable.getIndexSummarySamplingLevel());
@@ -419,7 +411,7 @@
sstables.get(1).overrideReadMeter(new RestorableMeter(50.0, 50.0));
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
- sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3));
+ sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3)));
}
Collections.sort(sstables, hotnessComparator);
assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel());
@@ -435,7 +427,7 @@
sstables.get(1).overrideReadMeter(new RestorableMeter(higherRate, higherRate));
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
- sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3));
+ sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3)));
}
Collections.sort(sstables, hotnessComparator);
assertEquals(BASE_SAMPLING_LEVEL / 2, sstables.get(0).getIndexSummarySamplingLevel());
@@ -453,7 +445,7 @@
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
- sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3) + 50);
+ sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (singleSummaryOffHeapSpace * 3) + 50));
}
Collections.sort(sstables, hotnessComparator);
@@ -477,7 +469,7 @@
sstables.get(3).overrideReadMeter(new RestorableMeter(128.0, 128.0));
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
- sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (92.0 / BASE_SAMPLING_LEVEL))));
+ sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), (long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (92.0 / BASE_SAMPLING_LEVEL)))));
}
Collections.sort(sstables, hotnessComparator);
assertEquals(1, sstables.get(0).getIndexSummarySize()); // at the min sampling level
@@ -490,7 +482,7 @@
// Don't leave enough space for even the minimal index summaries
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
- sstables = redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10);
+ sstables = redistributeSummaries(redistribution(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), 10));
}
for (SSTableReader sstable : sstables)
assertEquals(1, sstable.getIndexSummarySize()); // at the min sampling level
@@ -625,6 +617,9 @@
// everything should get cut in half
final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
+ // barrier to control when redistribution runs
+ final CountDownLatch barrier = new CountDownLatch(1);
+
Thread t = new Thread(new Runnable()
{
public void run()
@@ -634,7 +629,10 @@
// Don't leave enough space for even the minimal index summaries
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
- redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), singleSummaryOffHeapSpace);
+ redistributeSummaries(new ObservableRedistribution(Collections.EMPTY_LIST,
+ of(cfs.metadata.cfId, txn),
+ singleSummaryOffHeapSpace,
+ barrier));
}
}
catch (CompactionInterruptedException ex)
@@ -649,7 +647,13 @@
t.start();
while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive())
Thread.sleep(1);
+        // to ensure that the stop condition check in IndexSummaryRedistribution::redistributeSummaries
+        // happens *after* the halt request has been issued to the CompactionManager, don't allow the
+        // redistribution to proceed until stopCompaction has been called.
CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
+ // allows the redistribution to proceed
+ barrier.countDown();
+
t.join();
assertNotNull("Expected compaction interrupted exception", exception.get());
@@ -664,4 +668,37 @@
validateData(cfs, numRows);
}
+
+ private static IndexSummaryRedistribution redistribution(List<SSTableReader> compacting,
+ Map<UUID, LifecycleTransaction> transactions,
+ long memoryPoolBytes)
+ {
+ return new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes);
+ }
+
+    private static class ObservableRedistribution extends IndexSummaryRedistribution
+    {
+        private final CountDownLatch barrier; // released by the test thread to let redistribution proceed
+        public ObservableRedistribution(List<SSTableReader> compacting,
+                                        Map<UUID, LifecycleTransaction> transactions,
+                                        long memoryPoolBytes,
+                                        CountDownLatch barrier)
+        {
+            super(compacting, transactions, memoryPoolBytes);
+            this.barrier = barrier;
+        }
+
+        public List<SSTableReader> redistributeSummaries() throws IOException
+        {
+            try
+            {
+                barrier.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException("Interrupted waiting on test barrier", e);
+            }
+            return super.redistributeSummaries();
+        }
+    }
}