blob: 9ca29cb1906324dfbbb7d75a4bdf9d709cb6e4ed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.compaction;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Iterables;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.monitoring.runtime.instrumentation.common.collect.ImmutableList;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.BufferDecoratedKey;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.DiskBoundaries;
import org.apache.cassandra.db.compaction.unified.Controller;
import org.apache.cassandra.db.compaction.unified.UnifiedCompactionTask;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Splitter;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Interval;
import org.apache.cassandra.utils.Overlaps;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.TimeUUID;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import static org.apache.cassandra.io.sstable.format.SSTableReader.UNIQUE_IDENTIFIER_FACTORY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.RETURNS_SMART_NULLS;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
/**
* The unified compaction strategy is described in this design document:
*
* See CEP-26: https://cwiki.apache.org/confluence/display/CASSANDRA/CEP-26%3A+Unified+Compaction+Strategy
*/
public class UnifiedCompactionStrategyTest
{
private final static long ONE_MB = 1 << 20;
// Multiple disks can be used both with and without disk boundaries. We want to test both cases.
final String keyspace = "ks";
final String table = "tbl";
@Mock(answer = Answers.RETURNS_SMART_NULLS)
ColumnFamilyStore cfs;
@Mock(answer = Answers.RETURNS_SMART_NULLS)
CompactionStrategyManager csm;
ColumnFamilyStore.VersionedLocalRanges localRanges;
Tracker dataTracker;
long repairedAt;
IPartitioner partitioner;
Splitter splitter;
@BeforeClass
public static void setUpClass()
{
long seed = System.currentTimeMillis();
random.setSeed(seed);
System.out.println("Random seed: " + seed);
DatabaseDescriptor.daemonInitialization(); // because of all the static initialization in CFS
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
}
static final JDKRandomGenerator random = new JDKRandomGenerator();
@Before
public void setUp()
{
setUp(1);
}
protected void setUp(int numShards)
{
MockitoAnnotations.initMocks(this);
TableMetadata metadata = TableMetadata.builder(keyspace, table)
.addPartitionKeyColumn("pk", AsciiType.instance)
.build();
dataTracker = Tracker.newDummyTracker();
repairedAt = System.currentTimeMillis();
partitioner = DatabaseDescriptor.getPartitioner();
splitter = partitioner.splitter().orElse(null);
if (numShards > 1)
assertNotNull("Splitter is required with multiple compaction shards", splitter);
when(cfs.getPartitioner()).thenReturn(partitioner);
localRanges = cfs.fullWeightedRange(0, partitioner);
when(cfs.metadata()).thenReturn(metadata);
when(cfs.getTableName()).thenReturn(table);
when(cfs.localRangesWeighted()).thenReturn(localRanges);
when(cfs.getTracker()).thenReturn(dataTracker);
when(cfs.getLiveSSTables()).thenAnswer(request -> dataTracker.getView().select(SSTableSet.LIVE));
when(cfs.getSSTables(any())).thenAnswer(request -> dataTracker.getView().select(request.getArgument(0)));
when(cfs.getCompactionStrategyManager()).thenReturn(csm);
DiskBoundaries db = new DiskBoundaries(cfs, new Directories.DataDirectory[0], 0);
when(cfs.getDiskBoundaries()).thenReturn(db);
when(csm.onlyPurgeRepairedTombstones()).thenReturn(false);
}
@Test
public void testNoSSTables()
{
Controller controller = Mockito.mock(Controller.class);
long minimalSizeBytes = 2 << 20;
when(controller.getScalingParameter(anyInt())).thenReturn(4);
when(controller.getSurvivalFactor(anyInt())).thenReturn(1.0);
when(controller.getMaxLevelDensity(anyInt(), anyDouble())).thenCallRealMethod();
when(controller.getSurvivalFactor(anyInt())).thenReturn(1.0);
when(controller.getNumShards(anyDouble())).thenReturn(1);
when(controller.getBaseSstableSize(anyInt())).thenReturn((double) minimalSizeBytes);
when(controller.maxConcurrentCompactions()).thenReturn(1000); // let it generate as many candidates as it can
when(controller.maxThroughput()).thenReturn(Double.MAX_VALUE);
when(controller.maxSSTablesToCompact()).thenReturn(1000);
when(controller.random()).thenCallRealMethod();
UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(cfs, new HashMap<>(), controller);
assertNull(strategy.getNextBackgroundTask(FBUtilities.nowInSeconds()));
assertEquals(0, strategy.getEstimatedRemainingTasks());
}
@Test
public void testGetBucketsSameWUniqueArena()
{
final int m = 2; // minimal sorted run size in MB m
final Map<Integer, Integer> sstables = new TreeMap<>();
for (int i = 0; i < 20; i++)
{
int numSSTables = 2 + random.nextInt(18);
sstables.put(m * i, numSSTables);
}
// W = 3, o = 1 => F = 5, T = 5 => expected T sstables and 2 buckets: 0-10m, 10-50m
testGetBucketsOneArena(sstables, new int[] { 3 }, m, new int[] { 5, 5});
// W = 2, o = 1 => F = 4, T = 4 => expected T sstables and 3 buckets: 0-8m, 8-32m, 32-128m
testGetBucketsOneArena(sstables, new int[] { 2 }, m, new int[] { 4, 4, 4});
// W = 0, o = 1 => F = 2, T = 2 => expected 2 sstables and 5 buckets: 0-4m, 4-8m, 8-16m, 16-32m, 32-64m
testGetBucketsOneArena(sstables, new int[] { 0 }, m, new int[] { 2, 2, 2, 2, 2});
// W = -2, o = 1 => F = 4, T = 2 => expected 2 sstables and 3 buckets: 0-8mb, 8-32m, 32-128m
testGetBucketsOneArena(sstables, new int[] { -2 }, m, new int[] { 2, 2, 2});
// W = -3, o = 1 => F = 5, T = 2 => expected 2 sstables and 2 buckets: 0-10m, 10-50m
testGetBucketsOneArena(sstables, new int[] { -3 }, m, new int[] { 2, 2});
// remove sstables from 4m to 8m to create an empty bucket in the next call
sstables.remove(4); // 4m
sstables.remove(6); // 6m
sstables.remove(8); // 8m
// W = 0, o = 1 => F = 2, T = 2 => expected 2 sstables and 5 buckets: 0-4m, 4-8m, 8-16m, 16-32m, 32-64m
testGetBucketsOneArena(sstables, new int[] { 0 }, m, new int[] { 2, 2, 2, 2, 2});
}
@Test
public void testGetBucketsDifferentWsUniqueArena()
{
final int m = 2; // minimal sorted run size in MB m
final Map<Integer, Integer> sstables = new TreeMap<>();
for (int i : new int[] { 50, 100, 200, 400, 600, 800, 1000})
{
int numSSTables = 2 + random.nextInt(18);
sstables.put(i, numSSTables);
}
// W = [30, 2, -6], o = 1 => F = [32, 4, 8] , T = [32, 4, 2] => expected 3 buckets: 0-64m, 64-256m 256-2048m
testGetBucketsOneArena(sstables, new int[]{ 30, 2, -6 }, m, new int[] { 32, 4, 2});
// W = [30, 6, -8], o = 1 => F = [32, 8, 10] , T = [32, 8, 2] => expected 3 buckets: 0-64m, 64-544m 544-5440m
testGetBucketsOneArena(sstables, new int[]{ 30, 6, -8 }, m, new int[] { 32, 8, 2});
// W = [0, 0, 0, -2, -2], o = 1 => F = [2, 2, 2, 4, 4] , T = [2, 2, 2, 2, 2] => expected 6 buckets: 0-4m, 4-8m, 8-16m, 16-64m, 64-256m, 256-1024m
testGetBucketsOneArena(sstables, new int[]{ 0, 0, 0, -2, -2 }, m, new int[] { 2, 2, 2, 2, 2, 2});
}
private void testGetBucketsOneArena(Map<Integer, Integer> sstableMap, int[] Ws, int m, int[] expectedTs)
{
long minimalSizeBytes = m << 20;
Controller controller = Mockito.mock(Controller.class);
when(controller.getNumShards(anyDouble())).thenReturn(1);
when(controller.getBaseSstableSize(anyInt())).thenReturn((double) minimalSizeBytes);
when(controller.maxConcurrentCompactions()).thenReturn(1000); // let it generate as many candidates as it can
when(controller.maxThroughput()).thenReturn(Double.MAX_VALUE);
when(controller.maxSSTablesToCompact()).thenReturn(1000);
when(controller.getScalingParameter(anyInt())).thenAnswer(answer -> {
int index = answer.getArgument(0);
return Ws[index < Ws.length ? index : Ws.length - 1];
});
when(controller.getFanout(anyInt())).thenCallRealMethod();
when(controller.getThreshold(anyInt())).thenCallRealMethod();
when(controller.getMaxLevelDensity(anyInt(), anyDouble())).thenCallRealMethod();
when(controller.getSurvivalFactor(anyInt())).thenReturn(1.0);
when(controller.random()).thenCallRealMethod();
UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(cfs, new HashMap<>(), controller);
IPartitioner partitioner = cfs.getPartitioner();
DecoratedKey first = new BufferDecoratedKey(partitioner.getMinimumToken(), ByteBuffer.allocate(0));
DecoratedKey last = new BufferDecoratedKey(partitioner.getMaximumToken(), ByteBuffer.allocate(0));
List<SSTableReader> sstables = new ArrayList<>();
long dataSetSizeBytes = 0;
for (Map.Entry<Integer, Integer> entry : sstableMap.entrySet())
{
for (int i = 0; i < entry.getValue(); i++)
{
// we want a number > 0 and < 1 so that the sstable has always some size and never crosses the boundary to the next bucket
// so we leave a 1% margin, picking a number from 0.01 to 0.99
double rand = 0.01 + 0.98 * random.nextDouble();
long sizeOnDiskBytes = (entry.getKey() << 20) + (long) (minimalSizeBytes * rand);
dataSetSizeBytes += sizeOnDiskBytes;
sstables.add(mockSSTable(sizeOnDiskBytes, System.currentTimeMillis(), first, last));
}
}
strategy.addSSTables(sstables);
dataTracker.addInitialSSTables(sstables);
List<UnifiedCompactionStrategy.Level> levels = strategy.getLevels();
assertEquals(expectedTs.length, levels.size());
for (int i = 0; i < expectedTs.length; i++)
{
UnifiedCompactionStrategy.Level level = levels.get(i);
assertEquals(i, level.getIndex());
UnifiedCompactionStrategy.SelectionContext context = new UnifiedCompactionStrategy.SelectionContext(strategy.getController());
UnifiedCompactionStrategy.CompactionPick pick = level.getCompactionPick(context);
assertEquals(level.getSSTables().size() >= expectedTs[i], pick != null);
}
}
@Test
public void testPreserveLayout_W2_947()
{
testPreserveLayout(2, 947);
}
@Test
public void testPreserveLayout_WM2_947()
{
testPreserveLayout(-2, 947);
}
@Test
public void testPreserveLayout_W2_251()
{
testPreserveLayout(2, 251);
}
@Test
public void testPreserveLayout_WM2_251()
{
testPreserveLayout(-2, 251);
}
@Test
public void testPreserveLayout_W2_320()
{
testPreserveLayout(2, 320);
}
@Test
public void testPreserveLayout_WM2_320()
{
testPreserveLayout(-2, 320);
}
@Test
public void testPreserveLayout_WM2_947_128()
{
testLayout(-2, 947, 128);
}
@Test
public void testPreserveLayout_WM2_947_64()
{
testLayout(-2, 947, 64);
}
public void testPreserveLayout(int W, int numSSTables)
{
testLayout(W, numSSTables, 10000);
}
@Test
public void testMaxSSTablesToCompact()
{
testLayout(2, 944, 60);
testLayout(2, 944, 1000);
testLayout(2, 944, 100);
testLayout(2, 803, 200);
}
public void testLayout(int W, int numSSTables, int maxSSTablesToCompact)
{
int F = 2 + Math.abs(W);
int T = W < 0 ? 2 : F;
final long minSstableSizeBytes = 2L << 20; // 2 MB
final int numShards = 1;
final int levels = (int) Math.floor(Math.log(numSSTables) / Math.log(F)) + 1;
Controller controller = Mockito.mock(Controller.class);
when(controller.getScalingParameter(anyInt())).thenReturn(W);
when(controller.getFanout(anyInt())).thenCallRealMethod();
when(controller.getThreshold(anyInt())).thenCallRealMethod();
when(controller.getMaxLevelDensity(anyInt(), anyDouble())).thenCallRealMethod();
when(controller.getSurvivalFactor(anyInt())).thenReturn(1.0);
when(controller.getNumShards(anyDouble())).thenReturn(numShards);
when(controller.getBaseSstableSize(anyInt())).thenReturn((double) minSstableSizeBytes);
if (maxSSTablesToCompact >= numSSTables)
when(controller.maxConcurrentCompactions()).thenReturn(levels * (W < 0 ? 1 : F)); // make sure the work is assigned to different levels
else
when(controller.maxConcurrentCompactions()).thenReturn(1000); // make sure the work is assigned to different levels
when(controller.maxThroughput()).thenReturn(Double.MAX_VALUE);
when(controller.maxSSTablesToCompact()).thenReturn(maxSSTablesToCompact);
Random random = Mockito.mock(Random.class);
when(random.nextInt(anyInt())).thenReturn(0);
when(controller.random()).thenReturn(random);
UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(cfs, new HashMap<>(), controller);
List<SSTableReader> allSstables = new ArrayList<>();
List<SSTableReader> sstables = mockSSTables(numSSTables,
0,
System.currentTimeMillis(),
0);
allSstables.addAll(sstables);
strategy.addSSTables(allSstables);
dataTracker.addInitialSSTables(allSstables);
int num = numSSTables;
UnifiedCompactionStrategy.CompactionPick task;
while (true)
{
task = strategy.getNextCompactionPick(0); // do not check expiration
if (task == null)
break;
boolean layout = Math.min(num, maxSSTablesToCompact) > F * F;
int limit;
if (layout)
{
int forLimitLevel = (int) (Math.pow(F, Math.floor(Math.log(maxSSTablesToCompact) / Math.log(F))));
// for clarification see W < 0 case in layoutCompactions method
limit = W < 0 ? maxSSTablesToCompact / forLimitLevel * forLimitLevel : forLimitLevel;
}
else
limit = maxSSTablesToCompact;
int expected = num;
if (layout)
{
int forTopLevel = (int) (Math.pow(F, Math.floor(Math.log(num) / Math.log(F))));
expected = W > 0
? forTopLevel
: num / forTopLevel * forTopLevel;
}
expected = Math.min(expected, limit);
int count = task.size();
assertEquals(expected, count);
for (SSTableReader rdr : task)
strategy.removeSSTable(rdr);
num -= count;
}
// Check that we issue all the compactions
assertTrue(num < T);
}
private static Map<Long, Integer> mapFromPair(Pair<Long, Integer> ... pairs)
{
Map<Long, Integer> ret = new HashMap<>();
for (Pair<Long, Integer> pair : pairs)
{
ret.put(pair.left, pair.right);
}
return ret;
}
@Test
public void testGetNextBackgroundTasks()
{
Controller controller = Mockito.mock(Controller.class);
long minimalSizeBytes = 2 << 20;
when(controller.getScalingParameter(anyInt())).thenReturn(0);
when(controller.getFanout(anyInt())).thenCallRealMethod();
when(controller.getThreshold(anyInt())).thenCallRealMethod();
when(controller.getMaxLevelDensity(anyInt(), anyDouble())).thenCallRealMethod();
when(controller.getSurvivalFactor(anyInt())).thenReturn(1.0);
when(controller.getNumShards(anyDouble())).thenReturn(1);
when(controller.getBaseSstableSize(anyInt())).thenReturn((double) minimalSizeBytes);
when(controller.maxConcurrentCompactions()).thenReturn(1000); // let it generate as many candidates as it can
when(controller.maxThroughput()).thenReturn(Double.MAX_VALUE);
when(controller.maxSSTablesToCompact()).thenReturn(1000);
when(controller.random()).thenCallRealMethod();
UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(cfs, new HashMap<>(), controller);
IPartitioner partitioner = cfs.getPartitioner();
List<SSTableReader> sstables = createSStables(partitioner);
strategy.addSSTables(sstables);
dataTracker.addInitialSSTables(sstables);
AbstractCompactionTask task = strategy.getNextBackgroundTask(0);
assertSame(UnifiedCompactionTask.class, task.getClass());
task.transaction.abort();
}
private List<SSTableReader> createSStables(IPartitioner partitioner)
{
return createSStables(partitioner, mapFromPair(Pair.create(4 * ONE_MB, 4)), 10000);
}
private List<SSTableReader> createSStables(IPartitioner partitioner, int ttl)
{
return createSStables(partitioner, mapFromPair(Pair.create(4 * ONE_MB, 4)), ttl);
}
private List<SSTableReader> createSStables(IPartitioner partitioner, Map<Long, Integer> sstablesMap)
{
return createSStables(partitioner, sstablesMap, 10000);
}
// Used to make sure timestamps are not exactly the same, which disables expiration
int millisAdjustment = 0;
private List<SSTableReader> createSStables(IPartitioner partitioner,
Map<Long, Integer> sstablesMap,
int ttl)
{
List<SSTableReader> mockSSTables = new ArrayList<>();
Token min = partitioner.getMinimumToken();
Token max = partitioner.getMaximumToken();
ByteBuffer bb = ByteBuffer.allocate(0);
sstablesMap.forEach((size, num) -> {
Token first = min.getPartitioner().split(min, max, 0.01);
for (int i = 0; i < num; i++)
{
// pending repair
mockSSTables.add(mockSSTable(0,
size,
System.currentTimeMillis() + millisAdjustment++,
0.0,
new BufferDecoratedKey(first, bb),
new BufferDecoratedKey(max, bb),
ttl));
first = first.nextValidToken();
}
});
return mockSSTables;
}
@Test
public void testDropExpiredSSTables()
{
testDropExpiredFromBucket(1);
testDropExpiredAndCompactNonExpired();
}
private void testDropExpiredFromBucket(int numShards)
{
Controller controller = Mockito.mock(Controller.class);
long minimalSizeBytes = 2 << 20;
when(controller.getMaxLevelDensity(anyInt(), anyDouble())).thenCallRealMethod();
when(controller.getScalingParameter(anyInt())).thenReturn(3); // T=5
when(controller.getFanout(anyInt())).thenCallRealMethod();
when(controller.getThreshold(anyInt())).thenCallRealMethod();
when(controller.getSurvivalFactor(anyInt())).thenReturn(1.0);
when(controller.getNumShards(anyDouble())).thenReturn(numShards);
when(controller.getBaseSstableSize(anyInt())).thenReturn((double) minimalSizeBytes);
when(controller.maxConcurrentCompactions()).thenReturn(1000); // let it generate as many candidates as it can
when(controller.maxThroughput()).thenReturn(Double.MAX_VALUE);
when(controller.maxSSTablesToCompact()).thenReturn(1000);
when(controller.getIgnoreOverlapsInExpirationCheck()).thenReturn(false);
when(controller.random()).thenCallRealMethod();
UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(cfs, new HashMap<>(), controller);
strategy.startup();
List<SSTableReader> sstables = createSStables(cfs.getPartitioner());
// Tracker#addSSTables also tries to backup SSTables, so we use addInitialSSTables and notify explicitly
strategy.addSSTables(sstables);
dataTracker.addInitialSSTables(sstables);
try
{
// nothing to compact yet
assertNull(strategy.getNextCompactionPick(0));
long timestamp = sstables.get(sstables.size() - 1).getMaxLocalDeletionTime();
long expirationPoint = timestamp + 1;
UnifiedCompactionStrategy.CompactionPick pick = strategy.getNextCompactionPick(expirationPoint);
assertNotNull(pick);
assertEquals(sstables.size(), pick.size());
assertEquals(-1, pick.level);
}
finally
{
strategy.shutdown();
}
}
private void testDropExpiredAndCompactNonExpired()
{
Controller controller = Mockito.mock(Controller.class);
long minimalSizeBytes = 2 << 20;
when(controller.getMaxLevelDensity(anyInt(), anyDouble())).thenCallRealMethod();
when(controller.getScalingParameter(anyInt())).thenReturn(2);
when(controller.getFanout(anyInt())).thenCallRealMethod();
when(controller.getThreshold(anyInt())).thenCallRealMethod();
when(controller.getSurvivalFactor(anyInt())).thenReturn(1.0);
when(controller.getNumShards(anyDouble())).thenReturn(1);
when(controller.getBaseSstableSize(anyInt())).thenReturn((double) minimalSizeBytes);
when(controller.maxConcurrentCompactions()).thenReturn(1000); // let it generate as many candidates as it can
when(controller.maxThroughput()).thenReturn(Double.MAX_VALUE);
when(controller.getIgnoreOverlapsInExpirationCheck()).thenReturn(false);
when(controller.maxSSTablesToCompact()).thenReturn(1000);
when(controller.random()).thenCallRealMethod();
UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(cfs, new HashMap<>(), controller);
strategy.startup();
List<SSTableReader> expiredSSTables = createSStables(cfs.getPartitioner(), 1000);
List<SSTableReader> nonExpiredSSTables = createSStables(cfs.getPartitioner(), 0);
strategy.addSSTables(expiredSSTables);
strategy.addSSTables(nonExpiredSSTables.subList(0, 3));
dataTracker.addInitialSSTables(ImmutableList.copyOf(Iterables.concat(expiredSSTables, nonExpiredSSTables)));
long timestamp = expiredSSTables.get(expiredSSTables.size() - 1).getMaxLocalDeletionTime();
long expirationPoint = timestamp + 1;
try
{
UnifiedCompactionStrategy.CompactionPick pick = strategy.getNextCompactionPick(expirationPoint);
assertEquals(expiredSSTables.size(), pick.size());
assertEquals(-1, pick.level);
strategy.addSSTables(nonExpiredSSTables); // duplicates should be skipped
pick = strategy.getNextCompactionPick(expirationPoint);
assertEquals(expiredSSTables.size() + nonExpiredSSTables.size(), pick.size());
assertEquals(0, pick.level);
}
finally
{
strategy.shutdown();
}
}
@Test
public void testPending()
{
Controller controller = Mockito.mock(Controller.class);
when(controller.getScalingParameter(anyInt())).thenReturn(8); // F=10, T=10
when(controller.getFanout(anyInt())).thenCallRealMethod();
when(controller.getThreshold(anyInt())).thenCallRealMethod();
when(controller.maxSSTablesToCompact()).thenReturn(10); // same as fanout
long minimalSizeBytes = 2 << 20;
when(controller.getMaxLevelDensity(anyInt(), anyDouble())).thenCallRealMethod();
when(controller.getSurvivalFactor(anyInt())).thenReturn(1.0);
when(controller.getNumShards(anyDouble())).thenReturn(1);
when(controller.getBaseSstableSize(anyInt())).thenReturn((double) minimalSizeBytes);
when(controller.maxConcurrentCompactions()).thenReturn(1000); // let it generate as many candidates as it can
when(controller.maxThroughput()).thenReturn(Double.MAX_VALUE);
when(controller.getIgnoreOverlapsInExpirationCheck()).thenReturn(false);
when(controller.random()).thenCallRealMethod();
UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(cfs, new HashMap<>(), controller);
strategy.startup();
int count = 91;
List<SSTableReader> sstables = createSStables(cfs.getPartitioner(),
mapFromPair(Pair.create(4 * ONE_MB, count)));
strategy.addSSTables(sstables);
dataTracker.addInitialSSTables(sstables);
UnifiedCompactionStrategy.CompactionPick pick = strategy.getNextCompactionPick(0);
assertNotNull(pick);
assertEquals(9, strategy.getEstimatedRemainingTasks());
}
@Test
public void testMaximalSelection()
{
Set<SSTableReader> allSSTables = new HashSet<>();
allSSTables.addAll(mockNonOverlappingSSTables(10, 0, 100 << 20));
allSSTables.addAll(mockNonOverlappingSSTables(15, 1, 200 << 20));
allSSTables.addAll(mockNonOverlappingSSTables(25, 2, 400 << 20));
Controller controller = Mockito.mock(Controller.class);
UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(cfs, new HashMap<>(), controller);
strategy.addSSTables(allSSTables);
dataTracker.addInitialSSTables(allSSTables);
Collection<AbstractCompactionTask> tasks = strategy.getMaximalTask(0, false);
assertEquals(5, tasks.size()); // 5 (gcd of 10,15,25) common boundaries
for (AbstractCompactionTask task : tasks)
{
Set<SSTableReader> compacting = task.transaction.originals();
assertEquals(2 + 3 + 5, compacting.size()); // count / gcd sstables of each level
assertEquals((2 * 100L + 3 * 200 + 5 * 400) << 20, compacting.stream().mapToLong(SSTableReader::onDiskLength).sum());
// None of the selected sstables may intersect any in any other set.
for (AbstractCompactionTask task2 : tasks)
{
if (task == task2)
continue;
Set<SSTableReader> compacting2 = task2.transaction.originals();
for (SSTableReader r1 : compacting)
for (SSTableReader r2 : compacting2)
assertTrue(r1 + " intersects " + r2, r1.getFirst().compareTo(r2.getLast()) > 0 || r1.getLast().compareTo(r2.getFirst()) < 0);
}
}
}
@Test
public void testBucketSelectionSimple()
{
testBucketSelection(repeats(4, 10), repeats(10, 4), Overlaps.InclusionMethod.TRANSITIVE);
}
@Test
public void testBucketSelectionHalved()
{
testBucketSelection(repeats(4, arr(10, 5)), repeats(5, 6), Overlaps.InclusionMethod.TRANSITIVE);
testBucketSelection(repeats(4, arr(10, 5)), repeats(5, 6), Overlaps.InclusionMethod.SINGLE);
// When we take large sstables for one compaction, remaining overlaps don't have enough to trigger next
testBucketSelection(repeats(4, arr(10, 5)), repeats(5, 4), Overlaps.InclusionMethod.NONE, 10);
}
@Test
public void testBucketSelectionFives()
{
testBucketSelection(arr(25, 15, 10), repeats(5, arr(10)), Overlaps.InclusionMethod.TRANSITIVE);
testBucketSelection(arr(25, 15, 10), repeats(10, arr(6, 4)), Overlaps.InclusionMethod.SINGLE);
// When we take large sstables for one compaction, remaining overlaps don't have enough to trigger next
testBucketSelection(arr(25, 15, 10), repeats(10, arr(3)), Overlaps.InclusionMethod.NONE, 20);
}
@Test
public void testBucketSelectionMissing()
{
testBucketSelection(repeats(4,5), repeats(4, 4), Overlaps.InclusionMethod.TRANSITIVE, 3, 1);
}
@Test
public void testBucketSelectionHalvesMissing()
{
// Drop one half: still compact because of overlap
// Note: picks are returned right-to-left because the random mock always returns 0, picking the last bucket.
testBucketSelection(repeats(4, arr(6, 3)), arr(6, 6, 5), Overlaps.InclusionMethod.TRANSITIVE, 0, 1);
// Drop one full: don't compact
testBucketSelection(repeats(4, arr(3, 6)), arr(6, 6), Overlaps.InclusionMethod.TRANSITIVE, 5, 1);
// Drop two adjacent halves: don't compact
testBucketSelection(repeats(4, arr(6, 3)), arr(6, 6), Overlaps.InclusionMethod.TRANSITIVE, 4, 2, 3);
}
private int[] arr(int... values)
{
return values;
}
private int[] repeats(int count, int... values)
{
int[] rep = new int[count];
for (int i = 0; i < count; ++i)
rep[i] = values[i % values.length];
return rep;
}
public void testBucketSelection(int[] counts, int[] expecteds, Overlaps.InclusionMethod overlapInclusionMethod)
{
testBucketSelection(counts, expecteds, overlapInclusionMethod, 0);
}
public void testBucketSelection(int[] counts, int[] expecteds, Overlaps.InclusionMethod overlapInclusionMethod, int expectedRemaining, int... dropFromFirst)
{
Set<SSTableReader> allSSTables = new HashSet<>();
int fanout = counts.length;
for (int i = 0; i < fanout; ++i)
{
final int count = counts[i];
final List<SSTableReader> list = mockNonOverlappingSSTables(count, 0, (100 << 20) / count);
if (i == 0)
{
for (int k = dropFromFirst.length - 1; k >= 0; --k)
list.remove(dropFromFirst[k]);
}
allSSTables.addAll(list);
}
Controller controller = Mockito.mock(Controller.class);
when(controller.getScalingParameter(anyInt())).thenReturn(fanout - 2); // F=T=fanout
when(controller.getFanout(anyInt())).thenCallRealMethod();
when(controller.getThreshold(anyInt())).thenCallRealMethod();
when(controller.getMaxLevelDensity(anyInt(), anyDouble())).thenCallRealMethod();
when(controller.getSurvivalFactor(anyInt())).thenReturn(1.0);
when(controller.getNumShards(anyDouble())).thenReturn(1);
when(controller.getBaseSstableSize(anyInt())).thenReturn((double) (90 << 20));
when(controller.maxConcurrentCompactions()).thenReturn(1000); // let it generate as many candidates as it can
when(controller.maxThroughput()).thenReturn(Double.MAX_VALUE);
when(controller.getIgnoreOverlapsInExpirationCheck()).thenReturn(false);
when(controller.overlapInclusionMethod()).thenReturn(overlapInclusionMethod);
Random randomMock = Mockito.mock(Random.class);
when(randomMock.nextInt(anyInt())).thenReturn(0);
when(controller.random()).thenReturn(randomMock);
UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(cfs, new HashMap<>(), controller);
strategy.addSSTables(allSSTables);
dataTracker.addInitialSSTables(allSSTables);
List<UnifiedCompactionStrategy.CompactionPick> picks = new ArrayList<>();
while (true)
{
UnifiedCompactionStrategy.CompactionPick pick = strategy.getNextCompactionPick(0);
if (pick == null)
break;
strategy.removeSSTables(pick);
picks.add(pick);
}
assertEquals(expectedRemaining, strategy.getSSTables().size());
assertEquals(expecteds.length, picks.size());
int buckIdx = 0;
for (UnifiedCompactionStrategy.CompactionPick pick : picks)
{
int expectedCount = expecteds[buckIdx++];
assertEquals(expectedCount, pick.size()); // count / gcd sstables of each level
if (overlapInclusionMethod == Overlaps.InclusionMethod.TRANSITIVE)
{
// None of the selected sstables may intersect any in any other set.
for (UnifiedCompactionStrategy.CompactionPick pick2 : picks)
{
if (pick == pick2)
continue;
for (SSTableReader r1 : pick)
for (SSTableReader r2 : pick2)
assertTrue(r1 + " intersects " + r2, r1.getFirst().compareTo(r2.getLast()) > 0 || r1.getLast().compareTo(r2.getFirst()) < 0);
}
}
}
}
SSTableReader mockSSTable(int level, long bytesOnDisk, long timestamp, double hotness, DecoratedKey first, DecoratedKey last)
{
return mockSSTable(level, bytesOnDisk, timestamp, hotness, first, last, 0);
}
SSTableReader mockSSTable(long bytesOnDisk, long timestamp, DecoratedKey first, DecoratedKey last)
{
return mockSSTable(0, bytesOnDisk, timestamp, 0, first, last, 0);
}
SSTableReader mockSSTable(int level,
long bytesOnDisk,
long timestamp,
double hotness,
DecoratedKey first,
DecoratedKey last,
int ttl)
{
// We create a ton of mock SSTables that mockito is going to keep until the end of the test suite without stubOnly.
// Mockito keeps them alive to preserve the history of invocations which is not available for stubs. If we ever
// need history of invocations and remove stubOnly, we should also manually reset mocked SSTables in tearDown.
SSTableReader ret = Mockito.mock(SSTableReader.class, withSettings().stubOnly()
.defaultAnswer(RETURNS_SMART_NULLS));
when(ret.getSSTableLevel()).thenReturn(level);
when(ret.onDiskLength()).thenReturn(bytesOnDisk);
when(ret.uncompressedLength()).thenReturn(bytesOnDisk); // let's assume no compression
when(ret.getMaxTimestamp()).thenReturn(timestamp);
when(ret.getMinTimestamp()).thenReturn(timestamp);
when(ret.getFirst()).thenReturn(first);
when(ret.getLast()).thenReturn(last);
when(ret.getInterval()).thenReturn(new Interval<>(first, last, ret));
when(ret.instanceId()).thenReturn(TimeUUID.Generator.nextTimeUUID(UNIQUE_IDENTIFIER_FACTORY));
when(ret.compareTo(any())).thenCallRealMethod();
when(ret.isMarkedSuspect()).thenReturn(false);
when(ret.isRepaired()).thenReturn(false);
when(ret.getRepairedAt()).thenReturn(repairedAt);
when(ret.getPendingRepair()).thenReturn(null);
when(ret.isPendingRepair()).thenReturn(false);
when(ret.getColumnFamilyName()).thenReturn(table);
when(ret.toString()).thenReturn(String.format("Bytes on disk: %s, level %d, hotness %f, timestamp %d, first %s, last %s",
FBUtilities.prettyPrintMemory(bytesOnDisk), level, hotness, timestamp, first, last));
long deletionTime;
if (ttl > 0)
deletionTime = TimeUnit.MILLISECONDS.toSeconds(timestamp) + ttl;
else
deletionTime = Long.MAX_VALUE;
when(ret.getMinLocalDeletionTime()).thenReturn(deletionTime);
when(ret.getMaxLocalDeletionTime()).thenReturn(deletionTime);
when(ret.getMinTTL()).thenReturn(ttl);
when(ret.getMaxTTL()).thenReturn(ttl);
return ret;
}
List<SSTableReader> mockSSTables(int numSSTables, long bytesOnDisk, double hotness, long timestamp)
{
DecoratedKey first = new BufferDecoratedKey(partitioner.getMinimumToken(), ByteBuffer.allocate(0));
DecoratedKey last = new BufferDecoratedKey(partitioner.getMinimumToken(), ByteBuffer.allocate(0));
List<SSTableReader> sstables = new ArrayList<>();
for (int i = 0; i < numSSTables; i++)
{
long b = (long)(bytesOnDisk * 0.95 + bytesOnDisk * 0.05 * random.nextDouble()); // leave 5% variability
double h = hotness * 0.95 + hotness * 0.05 * random.nextDouble(); // leave 5% variability
sstables.add(mockSSTable(0, b, timestamp, h, first, last, 0));
}
return sstables;
}
List<SSTableReader> mockNonOverlappingSSTables(int numSSTables, int level, long bytesOnDisk)
{
if (!partitioner.splitter().isPresent())
throw new IllegalStateException(String.format("Cannot split ranges with current partitioner %s", partitioner));
ByteBuffer emptyBuffer = ByteBuffer.allocate(0);
long timestamp = System.currentTimeMillis();
List<SSTableReader> sstables = new ArrayList<>(numSSTables);
for (int i = 0; i < numSSTables; i++)
{
DecoratedKey first = new BufferDecoratedKey(boundary(numSSTables, i).nextValidToken(), emptyBuffer);
DecoratedKey last = new BufferDecoratedKey(boundary(numSSTables, i+1), emptyBuffer);
sstables.add(mockSSTable(level, bytesOnDisk, timestamp, 0., first, last));
timestamp+=10;
}
return sstables;
}
private Token boundary(int numSSTables, int i)
{
return partitioner.split(partitioner.getMinimumToken(), partitioner.getMaximumToken(), i * 1.0 / numSSTables);
}
}