/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.compaction;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.MockSchema;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.UpdateBuilder;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.notifications.SSTableAddedNotification;
import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.Validator;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import static java.util.Collections.singleton;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@RunWith(OrderedJUnit4ClassRunner.class)
public class LeveledCompactionStrategyTest
{
private static final Logger logger = LoggerFactory.getLogger(LeveledCompactionStrategyTest.class);
private static final String KEYSPACE1 = "LeveledCompactionStrategyTest";
private static final String CF_STANDARDDLEVELED = "StandardLeveled";
private Keyspace keyspace;
private ColumnFamilyStore cfs;
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
// Disable tombstone histogram rounding for tests
System.setProperty("cassandra.streaminghistogram.roundseconds", "1");
SchemaLoader.prepareServer();
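        // Use a tiny 1 MB sstable size so that the 100 KB test values quickly overflow
        // L0 and push data into L1 and L2 (see sstable_size_in_mb below).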
SchemaLoader.createKeyspace(KEYSPACE1,
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARDDLEVELED)
.compaction(CompactionParams.lcs(Collections.singletonMap("sstable_size_in_mb", "1"))));
}
@Before
public void enableCompaction()
{
keyspace = Keyspace.open(KEYSPACE1);
cfs = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED);
cfs.enableAutoCompaction();
}
/**
* Since we use StandardLeveled CF for every test, we want to clean up after the test.
*/
@After
    public void truncateStandardLeveled()
{
cfs.truncateBlocking();
}
/**
* Ensure that the grouping operation preserves the levels of grouped tables
*/
@Test
    public void testGrouperLevels() throws Exception
    {
ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
        // Need entropy to prevent compression, so the on-disk size is predictable whether compression is enabled or disabled
new Random().nextBytes(value.array());
// Enough data to have a level 1 and 2
int rows = 40;
int columns = 20;
        // Add enough data to trigger multiple sstables per level
for (int r = 0; r < rows; r++)
{
UpdateBuilder update = UpdateBuilder.create(cfs.metadata, String.valueOf(r));
for (int c = 0; c < columns; c++)
update.newRow("column" + c).add("val", value);
update.applyUnsafe();
cfs.forceBlockingFlush();
}
waitForLeveling(cfs);
CompactionStrategyManager strategyManager = cfs.getCompactionStrategyManager();
// Checking we're not completely bad at math
int l1Count = strategyManager.getSSTableCountPerLevel()[1];
int l2Count = strategyManager.getSSTableCountPerLevel()[2];
        if (l1Count == 0 || l2Count == 0)
        {
            logger.error("L1 ({} sstables) or L2 ({} sstables) is empty; expected both to be non-empty", l1Count, l2Count);
            Assert.fail();
        }
Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(cfs.getLiveSSTables());
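        // The groups are expected to be level-pure: anticompacted sstables keep their
        // original level, so a group spanning levels could reintroduce overlaps within
        // a level once the results are re-added to the manifest.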
for (Collection<SSTableReader> sstableGroup : groupedSSTables)
{
            int groupLevel = -1;
            for (SSTableReader sstable : sstableGroup)
            {
                int tableLevel = sstable.getSSTableLevel();
                if (groupLevel == -1)
                    groupLevel = tableLevel;
                assertEquals("anticompaction group mixes sstables from different levels", groupLevel, tableLevel);
            }
}
}
/*
* This exercises in particular the code of #4142
*/
@Test
public void testValidationMultipleSSTablePerLevel() throws Exception
{
byte [] b = new byte[100 * 1024];
new Random().nextBytes(b);
ByteBuffer value = ByteBuffer.wrap(b); // 100 KB value, make it easy to have multiple files
// Enough data to have a level 1 and 2
int rows = 40;
int columns = 20;
        // Add enough data to trigger multiple sstables per level
for (int r = 0; r < rows; r++)
{
UpdateBuilder update = UpdateBuilder.create(cfs.metadata, String.valueOf(r));
for (int c = 0; c < columns; c++)
update.newRow("column" + c).add("val", value);
update.applyUnsafe();
cfs.forceBlockingFlush();
}
waitForLeveling(cfs);
CompactionStrategyManager strategyManager = cfs.getCompactionStrategyManager();
// Checking we're not completely bad at math
assertTrue(strategyManager.getSSTableCountPerLevel()[1] > 0);
assertTrue(strategyManager.getSSTableCountPerLevel()[2] > 0);
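        // a range whose start and end tokens are equal wraps around and covers the
        // entire ring, so the validation below touches every sstable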
Range<Token> range = new Range<>(Util.token(""), Util.token(""));
int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds());
UUID parentRepSession = UUID.randomUUID();
ActiveRepairService.instance.registerParentRepairSession(parentRepSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis(), true);
RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range));
Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore);
CompactionManager.instance.submitValidation(cfs, validator).get();
}
    /**
     * Wait for leveled compaction to quiesce on the given column family store
     */
public static void waitForLeveling(ColumnFamilyStore cfs) throws InterruptedException
{
CompactionStrategyManager strategyManager = cfs.getCompactionStrategyManager();
while (true)
{
            // since we run several compaction strategies we wait until L0 in all strategies is empty
            // and at least one L1+ level is non-empty. In these tests we always run a single data
            // directory with only unrepaired data, so that should be good enough
boolean allL0Empty = true;
boolean anyL1NonEmpty = false;
for (List<AbstractCompactionStrategy> strategies : strategyManager.getStrategies())
{
for (AbstractCompactionStrategy strategy : strategies)
{
if (!(strategy instanceof LeveledCompactionStrategy))
return;
// note that we check > 1 here, if there is too little data in L0, we don't compact it up to L1
if (((LeveledCompactionStrategy)strategy).getLevelSize(0) > 1)
allL0Empty = false;
for (int i = 1; i < 5; i++)
if (((LeveledCompactionStrategy)strategy).getLevelSize(i) > 0)
anyL1NonEmpty = true;
}
}
if (allL0Empty && anyL1NonEmpty)
return;
Thread.sleep(100);
}
}
@Test
public void testCompactionProgress() throws Exception
{
// make sure we have SSTables in L1
byte [] b = new byte[100 * 1024];
new Random().nextBytes(b);
ByteBuffer value = ByteBuffer.wrap(b);
int rows = 2;
int columns = 10;
for (int r = 0; r < rows; r++)
{
UpdateBuilder update = UpdateBuilder.create(cfs.metadata, String.valueOf(r));
for (int c = 0; c < columns; c++)
update.newRow("column" + c).add("val", value);
update.applyUnsafe();
cfs.forceBlockingFlush();
}
waitForLeveling(cfs);
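        // getStrategies().get(1) holds the strategies for unrepaired sstables (index 0
        // is for repaired ones, see testNewRepairedSSTable); all data written above is unrepaired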
LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getStrategies().get(1).get(0);
        assertTrue(strategy.getLevelSize(1) > 0);
// get LeveledScanner for level 1 sstables
Collection<SSTableReader> sstables = strategy.manifest.getLevel(1);
List<ISSTableScanner> scanners = strategy.getScanners(sstables).scanners;
assertEquals(1, scanners.size()); // should be one per level
ISSTableScanner scanner = scanners.get(0);
// scan through to the end
while (scanner.hasNext())
scanner.next();
// scanner.getCurrentPosition should be equal to total bytes of L1 sstables
        assertEquals(SSTableReader.getTotalUncompressedBytes(sstables), scanner.getCurrentPosition());
}
@Test
public void testMutateLevel() throws Exception
{
cfs.disableAutoCompaction();
ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
// Enough data to have a level 1 and 2
int rows = 40;
int columns = 20;
        // Add enough data to trigger multiple sstables per level
for (int r = 0; r < rows; r++)
{
UpdateBuilder update = UpdateBuilder.create(cfs.metadata, String.valueOf(r));
for (int c = 0; c < columns; c++)
update.newRow("column" + c).add("val", value);
update.applyUnsafe();
cfs.forceBlockingFlush();
}
cfs.forceBlockingFlush();
LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getStrategies().get(1).get(0);
cfs.forceMajorCompaction();
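        // Move every sstable to L6 by rewriting its on-disk metadata: the manifest has to
        // drop the reader before the rewrite and re-add it after reloadSSTableMetadata(),
        // since the level is cached in the reader's stats metadata.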
for (SSTableReader s : cfs.getLiveSSTables())
{
assertTrue(s.getSSTableLevel() != 6 && s.getSSTableLevel() > 0);
strategy.manifest.remove(s);
s.descriptor.getMetadataSerializer().mutateLevel(s.descriptor, 6);
s.reloadSSTableMetadata();
strategy.manifest.addSSTables(Collections.singleton(s));
}
        // verify that all sstables in the changed set are now level 6
for (SSTableReader s : cfs.getLiveSSTables())
assertEquals(6, s.getSSTableLevel());
int[] levels = strategy.manifest.getAllLevelSize();
        // verify that the manifest has the correct number of sstables in level 6
assertEquals(cfs.getLiveSSTables().size(), levels[6]);
}
@Test
public void testNewRepairedSSTable() throws Exception
{
byte [] b = new byte[100 * 1024];
new Random().nextBytes(b);
ByteBuffer value = ByteBuffer.wrap(b); // 100 KB value, make it easy to have multiple files
// Enough data to have a level 1 and 2
int rows = 40;
int columns = 20;
        // Add enough data to trigger multiple sstables per level
for (int r = 0; r < rows; r++)
{
UpdateBuilder update = UpdateBuilder.create(cfs.metadata, String.valueOf(r));
for (int c = 0; c < columns; c++)
update.newRow("column" + c).add("val", value);
update.applyUnsafe();
cfs.forceBlockingFlush();
}
waitForLeveling(cfs);
cfs.disableAutoCompaction();
        while (CompactionManager.instance.isCompacting(Arrays.asList(cfs)))
Thread.sleep(100);
CompactionStrategyManager manager = cfs.getCompactionStrategyManager();
List<List<AbstractCompactionStrategy>> strategies = manager.getStrategies();
LeveledCompactionStrategy repaired = (LeveledCompactionStrategy) strategies.get(0).get(0);
LeveledCompactionStrategy unrepaired = (LeveledCompactionStrategy) strategies.get(1).get(0);
        assertEquals(0, repaired.manifest.getLevelCount());
assertEquals(2, unrepaired.manifest.getLevelCount());
assertTrue(manager.getSSTableCountPerLevel()[1] > 0);
assertTrue(manager.getSSTableCountPerLevel()[2] > 0);
for (SSTableReader sstable : cfs.getLiveSSTables())
assertFalse(sstable.isRepaired());
int sstableCount = unrepaired.manifest.getSSTables().size();
// we only have unrepaired sstables:
assertEquals(sstableCount, cfs.getLiveSSTables().size());
SSTableReader sstable1 = unrepaired.manifest.getLevel(2).iterator().next();
SSTableReader sstable2 = unrepaired.manifest.getLevel(1).iterator().next();
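        // Mark sstable1 as repaired on disk, reload its metadata and notify the strategy
        // manager; the manager should then move it from the unrepaired manifest to the
        // same level in the repaired manifest.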
sstable1.descriptor.getMetadataSerializer().mutateRepairedAt(sstable1.descriptor, System.currentTimeMillis());
sstable1.reloadSSTableMetadata();
assertTrue(sstable1.isRepaired());
manager.handleNotification(new SSTableRepairStatusChanged(Arrays.asList(sstable1)), this);
int repairedSSTableCount = repaired.manifest.getSSTables().size();
assertEquals(1, repairedSSTableCount);
// make sure the repaired sstable ends up in the same level in the repaired manifest:
assertTrue(repaired.manifest.getLevel(2).contains(sstable1));
// and that it is gone from unrepaired
assertFalse(unrepaired.manifest.getLevel(2).contains(sstable1));
unrepaired.removeSSTable(sstable2);
manager.handleNotification(new SSTableAddedNotification(singleton(sstable2)), this);
assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2));
assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
}
@Test
public void testTokenRangeCompaction() throws Exception
{
// Remove any existing data so we can start out clean with predictable number of sstables
cfs.truncateBlocking();
        // Disable auto compaction so Cassandra does not compact the sstables behind our backs
CompactionManager.instance.disableAutoCompaction();
ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
DecoratedKey key1 = Util.dk(String.valueOf(1));
DecoratedKey key2 = Util.dk(String.valueOf(2));
List<DecoratedKey> keys = new ArrayList<>(Arrays.asList(key1, key2));
int numIterations = 10;
int columns = 2;
// Add enough data to trigger multiple sstables.
// create 10 sstables that contain data for both key1 and key2
        for (int i = 0; i < numIterations; i++)
        {
            for (DecoratedKey key : keys)
            {
UpdateBuilder update = UpdateBuilder.create(cfs.metadata, key);
for (int c = 0; c < columns; c++)
update.newRow("column" + c).add("val", value);
update.applyUnsafe();
}
cfs.forceBlockingFlush();
}
// create 20 more sstables with 10 containing data for key1 and other 10 containing data for key2
        for (int i = 0; i < numIterations; i++)
        {
            for (DecoratedKey key : keys)
            {
UpdateBuilder update = UpdateBuilder.create(cfs.metadata, key);
for (int c = 0; c < columns; c++)
update.newRow("column" + c).add("val", value);
update.applyUnsafe();
cfs.forceBlockingFlush();
}
}
// We should have a total of 30 sstables by now
assertEquals(30, cfs.getLiveSSTables().size());
        // Compact just the tables with key2
        // Bit hackish to use the key1.token as the prior key but works in BytesToken
        Range<Token> tokenRange = new Range<>(key1.getToken(), key2.getToken());
Collection<Range<Token>> tokenRanges = new ArrayList<>(Arrays.asList(tokenRange));
cfs.forceCompactionForTokenRange(tokenRanges);
        while (CompactionManager.instance.isCompacting(Arrays.asList(cfs)))
        {
Thread.sleep(100);
}
        // 20 tables that have key2 should have been compacted into 1 table, resulting in 11 (30 - 20 + 1)
assertEquals(11, cfs.getLiveSSTables().size());
        // Compact just the tables with key1. At this point all 11 tables should have key1
        // Same hack as above: key2.token is the prior key, so the wrapping (key2, key1] range selects key1
        Range<Token> tokenRange2 = new Range<>(key2.getToken(), key1.getToken());
Collection<Range<Token>> tokenRanges2 = new ArrayList<>(Arrays.asList(tokenRange2));
cfs.forceCompactionForTokenRange(tokenRanges2);
        while (CompactionManager.instance.isCompacting(Arrays.asList(cfs)))
        {
Thread.sleep(100);
}
// the 11 tables containing key1 should all compact to 1 table
assertEquals(1, cfs.getLiveSSTables().size());
}
@Test
public void testCompactionCandidateOrdering() throws Exception
{
// add some data
byte [] b = new byte[100 * 1024];
new Random().nextBytes(b);
ByteBuffer value = ByteBuffer.wrap(b);
int rows = 4;
int columns = 10;
// Just keep sstables in L0 for this test
cfs.disableAutoCompaction();
for (int r = 0; r < rows; r++)
{
UpdateBuilder update = UpdateBuilder.create(cfs.metadata, String.valueOf(r));
for (int c = 0; c < columns; c++)
update.newRow("column" + c).add("val", value);
update.applyUnsafe();
cfs.forceBlockingFlush();
}
        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getStrategies().get(1).get(0);
// get readers for level 0 sstables
Collection<SSTableReader> sstables = strategy.manifest.getLevel(0);
Collection<SSTableReader> sortedCandidates = strategy.manifest.ageSortedSSTables(sstables);
assertTrue(String.format("More than 1 sstable required for test, found: %d .", sortedCandidates.size()), sortedCandidates.size() > 1);
long lastMaxTimeStamp = Long.MIN_VALUE;
for (SSTableReader sstable : sortedCandidates)
{
assertTrue(String.format("SStables not sorted into oldest to newest by maxTimestamp. Current sstable: %d , last sstable: %d", sstable.getMaxTimestamp(), lastMaxTimeStamp),
sstable.getMaxTimestamp() > lastMaxTimeStamp);
lastMaxTimeStamp = sstable.getMaxTimestamp();
}
}
@Test
public void testAddingOverlapping()
{
ColumnFamilyStore cfs = MockSchema.newCFS();
LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions());
List<SSTableReader> currentLevel = new ArrayList<>();
int gen = 1;
currentLevel.add(MockSchema.sstableWithLevel(gen++, 10, 20, 1, cfs));
currentLevel.add(MockSchema.sstableWithLevel(gen++, 21, 30, 1, cfs));
currentLevel.add(MockSchema.sstableWithLevel(gen++, 51, 100, 1, cfs));
currentLevel.add(MockSchema.sstableWithLevel(gen++, 80, 120, 1, cfs));
currentLevel.add(MockSchema.sstableWithLevel(gen++, 90, 150, 1, cfs));
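        // [10, 20], [21, 30] and [51, 100] do not overlap and can stay in L1, while
        // [80, 120] and [90, 150] overlap [51, 100] and should be demoted to L0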
lm.addSSTables(currentLevel);
assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
assertLevelsEqual(lm.getLevel(0), currentLevel.subList(3, 5));
List<SSTableReader> newSSTables = new ArrayList<>();
        // this sstable's last token is the same as the first token of L1 above, so it should get sent to L0:
newSSTables.add(MockSchema.sstableWithLevel(gen++, 5, 10, 1, cfs));
lm.addSSTables(newSSTables);
assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
assertEquals(0, newSSTables.get(0).getSSTableLevel());
assertTrue(lm.getLevel(0).containsAll(newSSTables));
newSSTables.clear();
newSSTables.add(MockSchema.sstableWithLevel(gen++, 30, 40, 1, cfs));
lm.addSSTables(newSSTables);
assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
assertEquals(0, newSSTables.get(0).getSSTableLevel());
assertTrue(lm.getLevel(0).containsAll(newSSTables));
newSSTables.clear();
newSSTables.add(MockSchema.sstableWithLevel(gen++, 100, 140, 1, cfs));
lm.addSSTables(newSSTables);
assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
assertEquals(0, newSSTables.get(0).getSSTableLevel());
assertTrue(lm.getLevel(0).containsAll(newSSTables));
newSSTables.clear();
newSSTables.add(MockSchema.sstableWithLevel(gen++, 100, 140, 1, cfs));
newSSTables.add(MockSchema.sstableWithLevel(gen++, 120, 140, 1, cfs));
lm.addSSTables(newSSTables);
List<SSTableReader> newL1 = new ArrayList<>(currentLevel.subList(0, 3));
newL1.add(newSSTables.get(1));
assertLevelsEqual(lm.getLevel(1), newL1);
newSSTables.remove(1);
assertTrue(newSSTables.stream().allMatch(s -> s.getSSTableLevel() == 0));
assertTrue(lm.getLevel(0).containsAll(newSSTables));
}
@Test
public void singleTokenSSTableTest()
{
ColumnFamilyStore cfs = MockSchema.newCFS();
LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions());
List<SSTableReader> expectedL1 = new ArrayList<>();
int gen = 1;
// single sstable, single token (100)
expectedL1.add(MockSchema.sstableWithLevel(gen++, 100, 100, 1, cfs));
lm.addSSTables(expectedL1);
List<SSTableReader> expectedL0 = new ArrayList<>();
// should get moved to L0:
expectedL0.add(MockSchema.sstableWithLevel(gen++, 99, 101, 1, cfs));
expectedL0.add(MockSchema.sstableWithLevel(gen++, 100, 101, 1, cfs));
expectedL0.add(MockSchema.sstableWithLevel(gen++, 99, 100, 1, cfs));
expectedL0.add(MockSchema.sstableWithLevel(gen++, 100, 100, 1, cfs));
lm.addSSTables(expectedL0);
assertLevelsEqual(expectedL0, lm.getLevel(0));
assertTrue(expectedL0.stream().allMatch(s -> s.getSSTableLevel() == 0));
assertLevelsEqual(expectedL1, lm.getLevel(1));
assertTrue(expectedL1.stream().allMatch(s -> s.getSSTableLevel() == 1));
        // sstables that do not overlap the single-token [100, 100] sstable should be added straight to L1:
expectedL1.add(MockSchema.sstableWithLevel(gen++, 98, 99, 1, cfs));
expectedL1.add(MockSchema.sstableWithLevel(gen++, 101, 101, 1, cfs));
lm.addSSTables(expectedL1.subList(1, expectedL1.size()));
assertLevelsEqual(expectedL1, lm.getLevel(1));
}
@Test
public void randomMultiLevelAddTest()
{
int iterations = 100;
int levelCount = 9;
ColumnFamilyStore cfs = MockSchema.newCFS();
LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions());
long seed = System.currentTimeMillis();
Random r = new Random(seed);
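        // include the seed in every failure message so a failing randomized run can be reproduced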
List<SSTableReader> newLevels = generateNewRandomLevels(cfs, 40, levelCount, 0, r);
int sstableCount = newLevels.size();
lm.addSSTables(newLevels);
int [] expectedLevelSizes = lm.getAllLevelSize();
for (int j = 0; j < iterations; j++)
{
newLevels = generateNewRandomLevels(cfs, 20, levelCount, sstableCount, r);
sstableCount += newLevels.size();
int[] canAdd = canAdd(lm, newLevels, levelCount);
for (int i = 0; i < levelCount; i++)
expectedLevelSizes[i] += canAdd[i];
lm.addSSTables(newLevels);
}
        // and verify that no level contains overlapping sstables
int actualSSTableCount = 0;
for (int i = 0; i < levelCount; i++)
{
actualSSTableCount += lm.getLevelSize(i);
List<SSTableReader> level = new ArrayList<>(lm.getLevel(i));
int lvl = i;
assertTrue(level.stream().allMatch(s -> s.getSSTableLevel() == lvl));
if (i > 0)
{
level.sort(SSTableReader.sstableComparator);
SSTableReader prev = null;
for (SSTableReader sstable : level)
{
if (prev != null && sstable.first.compareTo(prev.last) <= 0)
{
String levelStr = level.stream().map(s -> String.format("[%s, %s]", s.first, s.last)).collect(Collectors.joining(", "));
String overlap = String.format("sstable [%s, %s] overlaps with [%s, %s] in level %d (%s) ", sstable.first, sstable.last, prev.first, prev.last, i, levelStr);
Assert.fail("[seed = "+seed+"] overlap in level "+lvl+": " + overlap);
}
prev = sstable;
}
}
}
assertEquals(sstableCount, actualSSTableCount);
for (int i = 0; i < levelCount; i++)
assertEquals("[seed = " + seed + "] wrong sstable count in level = " + i, expectedLevelSizes[i], lm.getLevel(i).size());
}
private static List<SSTableReader> generateNewRandomLevels(ColumnFamilyStore cfs, int maxSSTableCountPerLevel, int levelCount, int startGen, Random r)
{
List<SSTableReader> newLevels = new ArrayList<>();
for (int level = 0; level < levelCount; level++)
{
int numLevelSSTables = r.nextInt(maxSSTableCountPerLevel) + 1;
List<Integer> tokens = new ArrayList<>(numLevelSSTables * 2);
for (int i = 0; i < numLevelSSTables * 2; i++)
tokens.add(r.nextInt(4000));
Collections.sort(tokens);
for (int i = 0; i < tokens.size() - 1; i += 2)
{
SSTableReader sstable = MockSchema.sstableWithLevel(++startGen, tokens.get(i), tokens.get(i + 1), level, cfs);
newLevels.add(sstable);
}
}
return newLevels;
}
    /**
     * Brute-force check of whether each new sstable can be added to its intended level in the manifest
     *
     * @return the number of sstables expected to be added to each level
     */
private static int[] canAdd(LeveledManifest lm, List<SSTableReader> newSSTables, int levelCount)
{
Map<Integer, Collection<SSTableReader>> sstableGroups = new HashMap<>();
newSSTables.forEach(s -> sstableGroups.computeIfAbsent(s.getSSTableLevel(), k -> new ArrayList<>()).add(s));
int[] canAdd = new int[levelCount];
for (Map.Entry<Integer, Collection<SSTableReader>> lvlGroup : sstableGroups.entrySet())
{
int level = lvlGroup.getKey();
if (level == 0)
{
canAdd[0] += lvlGroup.getValue().size();
continue;
}
List<SSTableReader> newLevel = new ArrayList<>(lm.getLevel(level));
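            // tentatively add each sstable and scan the sorted level for overlap; the
            // manifest is expected to send an overlapping sstable to L0 instead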
for (SSTableReader sstable : lvlGroup.getValue())
{
newLevel.add(sstable);
newLevel.sort(SSTableReader.sstableComparator);
SSTableReader prev = null;
boolean kept = true;
for (SSTableReader sst : newLevel)
{
if (prev != null && prev.last.compareTo(sst.first) >= 0)
{
newLevel.remove(sstable);
kept = false;
break;
}
prev = sst;
}
if (kept)
canAdd[level] += 1;
else
canAdd[0] += 1;
}
}
return canAdd;
}
private static void assertLevelsEqual(Collection<SSTableReader> l1, Collection<SSTableReader> l2)
{
assertEquals(l1.size(), l2.size());
assertEquals(new HashSet<>(l1), new HashSet<>(l2));
}
@Test
public void testHighestLevelHasMoreDataThanSupported()
{
ColumnFamilyStore cfs = MockSchema.newCFS();
int fanoutSize = 2; // to generate less sstables
LeveledManifest lm = new LeveledManifest(cfs, 1, fanoutSize, new SizeTieredCompactionStrategyOptions());
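        // with maxSSTableSize = 1 MB, level N holds at most fanoutSize^N megabytes, so
        // exceeding that by a fraction makes the level a compaction candidate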
// generate data for L7 to trigger compaction
int l7 = 7;
int maxBytesForL7 = (int) (Math.pow(fanoutSize, l7) * 1024 * 1024);
int sstablesSizeForL7 = (int) (maxBytesForL7 * 1.001) + 1;
        List<SSTableReader> sstablesOnL7 = Collections.singletonList(MockSchema.sstableWithLevel(1, sstablesSizeForL7, l7, cfs));
lm.addSSTables(sstablesOnL7);
// generate data for L8 to trigger compaction
int l8 = 8;
int maxBytesForL8 = (int) (Math.pow(fanoutSize, l8) * 1024 * 1024);
int sstablesSizeForL8 = (int) (maxBytesForL8 * 1.001) + 1;
        List<SSTableReader> sstablesOnL8 = Collections.singletonList(MockSchema.sstableWithLevel(2, sstablesSizeForL8, l8, cfs));
lm.addSSTables(sstablesOnL8);
        // compaction of the L8 sstables is not supposed to run because there is no higher level
        // to promote sstables into; that's why we expect compaction candidates from L7 only
Collection<SSTableReader> compactionCandidates = lm.getCompactionCandidates().sstables;
assertThat(compactionCandidates).containsAll(sstablesOnL7);
assertThat(compactionCandidates).doesNotContainAnyElementsOf(sstablesOnL8);
}
@Test
public void testReduceScopeL0L1() throws IOException
{
ColumnFamilyStore cfs = MockSchema.newCFS();
Map<String, String> localOptions = new HashMap<>();
localOptions.put("class", "LeveledCompactionStrategy");
localOptions.put("sstable_size_in_mb", "1");
cfs.setCompactionParameters(localOptions);
List<SSTableReader> l1sstables = new ArrayList<>();
for (int i = 0; i < 10; i++)
{
SSTableReader l1sstable = MockSchema.sstable(i, 1 * 1024 * 1024, cfs);
l1sstable.descriptor.getMetadataSerializer().mutateLevel(l1sstable.descriptor, 1);
l1sstable.reloadSSTableMetadata();
l1sstables.add(l1sstable);
}
List<SSTableReader> l0sstables = new ArrayList<>();
for (int i = 10; i < 20; i++)
l0sstables.add(MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs));
try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, Iterables.concat(l0sstables, l1sstables)))
{
            CompactionTask task = new LeveledCompactionTask(cfs, txn, 1, 0, 1024 * 1024, false);
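            // each reduceScopeForLimitedSpace call should drop exactly one L0 sstable,
            // largest first, never touching L1, until a single L0 sstable remains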
SSTableReader lastRemoved = null;
boolean removed = true;
for (int i = 0; i < l0sstables.size(); i++)
{
Set<SSTableReader> before = new HashSet<>(txn.originals());
removed = task.reduceScopeForLimitedSpace(0);
SSTableReader removedSSTable = Iterables.getOnlyElement(Sets.difference(before, txn.originals()), null);
if (removed)
{
assertNotNull(removedSSTable);
assertTrue(lastRemoved == null || removedSSTable.onDiskLength() < lastRemoved.onDiskLength());
assertEquals(0, removedSSTable.getSSTableLevel());
Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals());
Set<SSTableReader> l1after = sstables.right;
assertEquals(l1after, new HashSet<>(l1sstables)); // we don't touch L1
assertEquals(before.size() - 1, txn.originals().size());
lastRemoved = removedSSTable;
}
else
{
assertNull(removedSSTable);
Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals());
Set<SSTableReader> l0after = sstables.left;
Set<SSTableReader> l1after = sstables.right;
assertEquals(l1after, new HashSet<>(l1sstables)); // we don't touch L1
assertEquals(1, l0after.size()); // and we stop reducing once there is a single sstable left
}
}
assertFalse(removed);
}
}
@Test
public void testReduceScopeL0()
{
List<SSTableReader> l0sstables = new ArrayList<>();
for (int i = 10; i < 20; i++)
l0sstables.add(MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs));
try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, l0sstables))
{
            CompactionTask task = new LeveledCompactionTask(cfs, txn, 0, 0, 1024 * 1024, false);
SSTableReader lastRemoved = null;
boolean removed = true;
for (int i = 0; i < l0sstables.size(); i++)
{
Set<SSTableReader> before = new HashSet<>(txn.originals());
removed = task.reduceScopeForLimitedSpace(0);
SSTableReader removedSSTable = Sets.difference(before, txn.originals()).stream().findFirst().orElse(null);
if (removed)
{
assertNotNull(removedSSTable);
assertTrue(lastRemoved == null || removedSSTable.onDiskLength() < lastRemoved.onDiskLength());
assertEquals(0, removedSSTable.getSSTableLevel());
assertEquals(before.size() - 1, txn.originals().size());
lastRemoved = removedSSTable;
}
else
{
assertNull(removedSSTable);
Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals());
Set<SSTableReader> l0after = sstables.left;
assertEquals(1, l0after.size()); // and we stop reducing once there is a single sstable left
}
}
assertFalse(removed);
}
}
@Test
public void testNoHighLevelReduction() throws IOException
{
List<SSTableReader> sstables = new ArrayList<>();
int i = 1;
for (; i < 5; i++)
{
SSTableReader sstable = MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs);
sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 1);
sstable.reloadSSTableMetadata();
sstables.add(sstable);
}
for (; i < 10; i++)
{
SSTableReader sstable = MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs);
sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 2);
sstable.reloadSSTableMetadata();
sstables.add(sstable);
}
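        // reduceScopeForLimitedSpace should only ever drop L0 sstables, so a transaction
        // containing nothing but L1 and L2 sstables must keep its full scope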
try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, sstables))
{
CompactionTask task = new LeveledCompactionTask(cfs, txn, 0, 0, 1024 * 1024, false);
assertFalse(task.reduceScopeForLimitedSpace(0));
assertEquals(new HashSet<>(sstables), txn.originals());
}
}
    private Pair<Set<SSTableReader>, Set<SSTableReader>> groupByLevel(Iterable<SSTableReader> sstables)
    {
        Set<SSTableReader> l0 = new HashSet<>();
        Set<SSTableReader> l1 = new HashSet<>();
        for (SSTableReader sstable : sstables)
        {
            switch (sstable.getSSTableLevel())
            {
                case 0:
                    l0.add(sstable);
                    break;
                case 1:
                    l1.add(sstable);
                    break;
                default:
                    throw new RuntimeException("expected only L0 and L1 sstables, got level " + sstable.getSSTableLevel());
            }
        }
        return Pair.create(l0, l1);
    }
}