| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.db.compaction; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.stream.Collectors; |
| |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| import com.google.common.io.Files; |
| import org.junit.AfterClass; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.SchemaLoader; |
| import org.apache.cassandra.Util; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.db.ColumnFamilyStore; |
| import org.apache.cassandra.db.DecoratedKey; |
| import org.apache.cassandra.db.Directories; |
| import org.apache.cassandra.db.DiskBoundaries; |
| import org.apache.cassandra.db.Keyspace; |
| import org.apache.cassandra.db.PartitionPosition; |
| import org.apache.cassandra.db.RowUpdateBuilder; |
| import org.apache.cassandra.db.compaction.AbstractStrategyHolder.GroupedSSTableContainer; |
| import org.apache.cassandra.dht.ByteOrderedPartitioner; |
| import org.apache.cassandra.dht.IPartitioner; |
| import org.apache.cassandra.io.sstable.format.SSTableReader; |
| import org.apache.cassandra.io.util.File; |
| import org.apache.cassandra.notifications.SSTableAddedNotification; |
| import org.apache.cassandra.notifications.SSTableDeletingNotification; |
| import org.apache.cassandra.schema.CompactionParams; |
| import org.apache.cassandra.schema.KeyspaceParams; |
| import org.apache.cassandra.service.StorageService; |
| import org.apache.cassandra.utils.ByteBufferUtil; |
| |
| import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertSame; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| public class CompactionStrategyManagerTest |
| { |
| private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManagerTest.class); |
| |
| private static final String KS_PREFIX = "Keyspace1"; |
| private static final String TABLE_PREFIX = "CF_STANDARD"; |
| |
| private static IPartitioner originalPartitioner; |
| private static boolean backups; |
| |
| @BeforeClass |
| public static void beforeClass() |
| { |
| SchemaLoader.prepareServer(); |
| backups = DatabaseDescriptor.isIncrementalBackupsEnabled(); |
| DatabaseDescriptor.setIncrementalBackupsEnabled(false); |
| /** |
| * We use byte ordered partitioner in this test to be able to easily infer an SSTable |
| * disk assignment based on its generation - See {@link this#getSSTableIndex(Integer[], SSTableReader)} |
| */ |
| originalPartitioner = StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance); |
| SchemaLoader.createKeyspace(KS_PREFIX, |
| KeyspaceParams.simple(1), |
| SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX) |
| .compaction(CompactionParams.stcs(Collections.emptyMap()))); |
| } |
| |
| @Before |
| public void setUp() throws Exception |
| { |
| ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX); |
| cfs.truncateBlocking(); |
| } |
| |
| @AfterClass |
| public static void afterClass() |
| { |
| DatabaseDescriptor.setPartitionerUnsafe(originalPartitioner); |
| DatabaseDescriptor.setIncrementalBackupsEnabled(backups); |
| } |
| |
| @Test |
| public void testSSTablesAssignedToCorrectCompactionStrategy() throws IOException |
| { |
| // Creates 100 SSTables with keys 0-99 |
| int numSSTables = 100; |
| ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX); |
| cfs.disableAutoCompaction(); |
| Set<SSTableReader> previousSSTables = cfs.getLiveSSTables(); |
| for (int i = 0; i < numSSTables; i++) |
| { |
| createSSTableWithKey(KS_PREFIX, TABLE_PREFIX, i); |
| Set<SSTableReader> currentSSTables = cfs.getLiveSSTables(); |
| Set<SSTableReader> newSSTables = Sets.difference(currentSSTables, previousSSTables); |
| assertEquals(1, newSSTables.size()); |
| if (i % 3 == 0) |
| { |
| //make 1 third of sstables repaired |
| cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, System.currentTimeMillis(), null, false); |
| } |
| else if (i % 3 == 1) |
| { |
| //make 1 third of sstables pending repair |
| cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, 0, nextTimeUUID(), false); |
| } |
| previousSSTables = currentSSTables; |
| } |
| |
| // Creates a CompactionStrategymanager with different numbers of disks and check |
| // if the SSTables are assigned to the correct compaction strategies |
| for (int numDisks = 2; numDisks < 10; numDisks++) |
| { |
| testSSTablesAssignedToCorrectCompactionStrategy(numSSTables, numDisks); |
| } |
| } |
| |
| public void testSSTablesAssignedToCorrectCompactionStrategy(int numSSTables, int numDisks) |
| { |
| // Create a mock CFS with the given number of disks |
| MockCFS cfs = createJBODMockCFS(numDisks); |
| //Check that CFS will contain numSSTables |
| assertEquals(numSSTables, cfs.getLiveSSTables().size()); |
| |
| // Creates a compaction strategy manager with an external boundary supplier |
| final Integer[] boundaries = computeBoundaries(numSSTables, numDisks); |
| |
| MockBoundaryManager mockBoundaryManager = new MockBoundaryManager(cfs, boundaries); |
| logger.debug("Boundaries for {} disks is {}", numDisks, Arrays.toString(boundaries)); |
| CompactionStrategyManager csm = new CompactionStrategyManager(cfs, mockBoundaryManager::getBoundaries, |
| true); |
| |
| // Check that SSTables are assigned to the correct Compaction Strategy |
| for (SSTableReader reader : cfs.getLiveSSTables()) |
| { |
| verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader); |
| } |
| |
| for (int delta = 1; delta <= 3; delta++) |
| { |
| // Update disk boundaries |
| Integer[] previousBoundaries = Arrays.copyOf(boundaries, boundaries.length); |
| updateBoundaries(mockBoundaryManager, boundaries, delta); |
| |
| // Check that SSTables are still assigned to the previous boundary layout |
| logger.debug("Old boundaries: {} New boundaries: {}", Arrays.toString(previousBoundaries), Arrays.toString(boundaries)); |
| for (SSTableReader reader : cfs.getLiveSSTables()) |
| { |
| verifySSTableIsAssignedToCorrectStrategy(previousBoundaries, csm, reader); |
| } |
| |
| // Reload CompactionStrategyManager so new disk boundaries will be loaded |
| csm.maybeReloadDiskBoundaries(); |
| |
| for (SSTableReader reader : cfs.getLiveSSTables()) |
| { |
| // Check that SSTables are assigned to the new boundary layout |
| verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader); |
| |
| // Remove SSTable and check that it will be removed from the correct compaction strategy |
| csm.handleNotification(new SSTableDeletingNotification(reader), this); |
| assertFalse(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader)); |
| |
| // Add SSTable again and check that is correctly assigned |
| csm.handleNotification(new SSTableAddedNotification(Collections.singleton(reader), null), this); |
| verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader); |
| } |
| } |
| } |
| |
| @Test |
| public void testAutomaticUpgradeConcurrency() throws Exception |
| { |
| ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX); |
| DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(true); |
| DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(1); |
| |
| // latch to block CompactionManager.BackgroundCompactionCandidate#maybeRunUpgradeTask |
| // inside the currentlyBackgroundUpgrading check - with max_concurrent_auto_upgrade_tasks = 1 this will make |
| // sure that BackgroundCompactionCandidate#maybeRunUpgradeTask returns false until the latch has been counted down |
| CountDownLatch latch = new CountDownLatch(1); |
| AtomicInteger upgradeTaskCount = new AtomicInteger(0); |
| MockCFSForCSM mock = new MockCFSForCSM(cfs, latch, upgradeTaskCount); |
| |
| CompactionManager.BackgroundCompactionCandidate r = CompactionManager.instance.getBackgroundCompactionCandidate(mock); |
| CompactionStrategyManager mgr = mock.getCompactionStrategyManager(); |
| // basic idea is that we start a thread which will be able to get in to the currentlyBackgroundUpgrading-guarded |
| // code in CompactionManager, then we try to run a bunch more of the upgrade tasks which should return false |
| // due to the currentlyBackgroundUpgrading count being >= max_concurrent_auto_upgrade_tasks |
| Thread t = new Thread(() -> r.maybeRunUpgradeTask(mgr)); |
| t.start(); |
| Thread.sleep(100); // let the thread start and grab the task |
| assertEquals(1, CompactionManager.instance.currentlyBackgroundUpgrading.get()); |
| assertFalse(r.maybeRunUpgradeTask(mgr)); |
| assertFalse(r.maybeRunUpgradeTask(mgr)); |
| latch.countDown(); |
| t.join(); |
| assertEquals(1, upgradeTaskCount.get()); // we should only call findUpgradeSSTableTask once when concurrency = 1 |
| assertEquals(0, CompactionManager.instance.currentlyBackgroundUpgrading.get()); |
| |
| DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(false); |
| } |
| |
| @Test |
| public void testAutomaticUpgradeConcurrency2() throws Exception |
| { |
| ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX); |
| DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(true); |
| DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(2); |
| // latch to block CompactionManager.BackgroundCompactionCandidate#maybeRunUpgradeTask |
| // inside the currentlyBackgroundUpgrading check - with max_concurrent_auto_upgrade_tasks = 1 this will make |
| // sure that BackgroundCompactionCandidate#maybeRunUpgradeTask returns false until the latch has been counted down |
| CountDownLatch latch = new CountDownLatch(1); |
| AtomicInteger upgradeTaskCount = new AtomicInteger(); |
| MockCFSForCSM mock = new MockCFSForCSM(cfs, latch, upgradeTaskCount); |
| |
| CompactionManager.BackgroundCompactionCandidate r = CompactionManager.instance.getBackgroundCompactionCandidate(mock); |
| CompactionStrategyManager mgr = mock.getCompactionStrategyManager(); |
| |
| // basic idea is that we start 2 threads who will be able to get in to the currentlyBackgroundUpgrading-guarded |
| // code in CompactionManager, then we try to run a bunch more of the upgrade task which should return false |
| // due to the currentlyBackgroundUpgrading count being >= max_concurrent_auto_upgrade_tasks |
| Thread t = new Thread(() -> r.maybeRunUpgradeTask(mgr)); |
| t.start(); |
| Thread t2 = new Thread(() -> r.maybeRunUpgradeTask(mgr)); |
| t2.start(); |
| Thread.sleep(100); // let the threads start and grab the task |
| assertEquals(2, CompactionManager.instance.currentlyBackgroundUpgrading.get()); |
| assertFalse(r.maybeRunUpgradeTask(mgr)); |
| assertFalse(r.maybeRunUpgradeTask(mgr)); |
| assertFalse(r.maybeRunUpgradeTask(mgr)); |
| assertEquals(2, CompactionManager.instance.currentlyBackgroundUpgrading.get()); |
| latch.countDown(); |
| t.join(); |
| t2.join(); |
| assertEquals(2, upgradeTaskCount.get()); |
| assertEquals(0, CompactionManager.instance.currentlyBackgroundUpgrading.get()); |
| |
| DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(1); |
| DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(false); |
| } |
| |
| private static void assertHolderExclusivity(boolean isRepaired, boolean isPendingRepair, boolean isTransient, Class<? extends AbstractStrategyHolder> expectedType) |
| { |
| ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX); |
| CompactionStrategyManager csm = cfs.getCompactionStrategyManager(); |
| |
| AbstractStrategyHolder holder = csm.getHolder(isRepaired, isPendingRepair, isTransient); |
| assertNotNull(holder); |
| assertSame(expectedType, holder.getClass()); |
| |
| int matches = 0; |
| for (AbstractStrategyHolder other : csm.getHolders()) |
| { |
| if (other.managesRepairedGroup(isRepaired, isPendingRepair, isTransient)) |
| { |
| assertSame("holder assignment should be mutually exclusive", holder, other); |
| matches++; |
| } |
| } |
| assertEquals(1, matches); |
| } |
| |
| private static void assertInvalieHolderConfig(boolean isRepaired, boolean isPendingRepair, boolean isTransient) |
| { |
| ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX); |
| CompactionStrategyManager csm = cfs.getCompactionStrategyManager(); |
| try |
| { |
| csm.getHolder(isRepaired, isPendingRepair, isTransient); |
| fail("Expected IllegalArgumentException"); |
| } |
| catch (IllegalArgumentException e) |
| { |
| // expected |
| } |
| } |
| |
| /** |
| * If an sstable can be be assigned to a strategy holder, it shouldn't be possibly to |
| * assign it to any of the other holders. |
| */ |
| @Test |
| public void testMutualExclusiveHolderClassification() throws Exception |
| { |
| assertHolderExclusivity(false, false, false, CompactionStrategyHolder.class); |
| assertHolderExclusivity(true, false, false, CompactionStrategyHolder.class); |
| assertHolderExclusivity(false, true, false, PendingRepairHolder.class); |
| assertHolderExclusivity(false, true, true, PendingRepairHolder.class); |
| assertInvalieHolderConfig(true, true, false); |
| assertInvalieHolderConfig(true, true, true); |
| assertInvalieHolderConfig(false, false, true); |
| assertInvalieHolderConfig(true, false, true); |
| } |
| |
| PartitionPosition forKey(int key) |
| { |
| DecoratedKey dk = Util.dk(String.format("%04d", key)); |
| return dk.getToken().minKeyBound(); |
| } |
| |
| /** |
| * Test that csm.groupSSTables correctly groups sstables by repaired status and directory |
| */ |
| @Test |
| public void groupSSTables() throws Exception |
| { |
| final int numDir = 4; |
| ColumnFamilyStore cfs = createJBODMockCFS(numDir); |
| Keyspace.open(cfs.keyspace.getName()).getColumnFamilyStore(cfs.name).disableAutoCompaction(); |
| assertTrue(cfs.getLiveSSTables().isEmpty()); |
| List<SSTableReader> transientRepairs = new ArrayList<>(); |
| List<SSTableReader> pendingRepair = new ArrayList<>(); |
| List<SSTableReader> unrepaired = new ArrayList<>(); |
| List<SSTableReader> repaired = new ArrayList<>(); |
| |
| for (int i = 0; i < numDir; i++) |
| { |
| int key = 100 * i; |
| transientRepairs.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++)); |
| pendingRepair.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++)); |
| unrepaired.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++)); |
| repaired.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++)); |
| } |
| |
| cfs.getCompactionStrategyManager().mutateRepaired(transientRepairs, 0, nextTimeUUID(), true); |
| cfs.getCompactionStrategyManager().mutateRepaired(pendingRepair, 0, nextTimeUUID(), false); |
| cfs.getCompactionStrategyManager().mutateRepaired(repaired, 1000, null, false); |
| |
| DiskBoundaries boundaries = new DiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations(), |
| Lists.newArrayList(forKey(100), forKey(200), forKey(300)), |
| 10, 10); |
| |
| CompactionStrategyManager csm = new CompactionStrategyManager(cfs, () -> boundaries, true); |
| |
| List<GroupedSSTableContainer> grouped = csm.groupSSTables(Iterables.concat( transientRepairs, pendingRepair, repaired, unrepaired)); |
| |
| for (int x=0; x<grouped.size(); x++) |
| { |
| GroupedSSTableContainer group = grouped.get(x); |
| AbstractStrategyHolder holder = csm.getHolders().get(x); |
| for (int y=0; y<numDir; y++) |
| { |
| SSTableReader sstable = Iterables.getOnlyElement(group.getGroup(y)); |
| assertTrue(holder.managesSSTable(sstable)); |
| SSTableReader expected; |
| if (sstable.isRepaired()) |
| expected = repaired.get(y); |
| else if (sstable.isPendingRepair()) |
| { |
| if (sstable.isTransient()) |
| { |
| expected = transientRepairs.get(y); |
| } |
| else |
| { |
| expected = pendingRepair.get(y); |
| } |
| } |
| else |
| expected = unrepaired.get(y); |
| |
| assertSame(expected, sstable); |
| } |
| } |
| } |
| |
| private MockCFS createJBODMockCFS(int disks) |
| { |
| // Create #disks data directories to simulate JBOD |
| Directories.DataDirectory[] directories = new Directories.DataDirectory[disks]; |
| for (int i = 0; i < disks; ++i) |
| { |
| File tempDir = new File(Files.createTempDir()); |
| tempDir.deleteOnExit(); |
| directories[i] = new Directories.DataDirectory(tempDir); |
| } |
| |
| ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX); |
| MockCFS mockCFS = new MockCFS(cfs, new Directories(cfs.metadata(), directories)); |
| mockCFS.disableAutoCompaction(); |
| mockCFS.addSSTables(cfs.getLiveSSTables()); |
| return mockCFS; |
| } |
| |
| /** |
| * Updates the boundaries with a delta |
| */ |
| private void updateBoundaries(MockBoundaryManager boundaryManager, Integer[] boundaries, int delta) |
| { |
| for (int j = 0; j < boundaries.length - 1; j++) |
| { |
| if ((j + delta) % 2 == 0) |
| boundaries[j] -= delta; |
| else |
| boundaries[j] += delta; |
| } |
| boundaryManager.invalidateBoundaries(); |
| } |
| |
| private void verifySSTableIsAssignedToCorrectStrategy(Integer[] boundaries, CompactionStrategyManager csm, SSTableReader reader) |
| { |
| // Check that sstable is assigned to correct disk |
| int index = getSSTableIndex(boundaries, reader); |
| assertEquals(index, csm.compactionStrategyIndexFor(reader)); |
| // Check that compaction strategy actually contains SSTable |
| assertTrue(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader)); |
| } |
| |
| /** |
| * Creates disk boundaries such that each disk receives |
| * an equal amount of SSTables |
| */ |
| private Integer[] computeBoundaries(int numSSTables, int numDisks) |
| { |
| Integer[] result = new Integer[numDisks]; |
| int sstablesPerRange = numSSTables / numDisks; |
| result[0] = sstablesPerRange; |
| for (int i = 1; i < numDisks; i++) |
| { |
| result[i] = result[i - 1] + sstablesPerRange; |
| } |
| result[numDisks - 1] = numSSTables; // make last boundary alwyays be the number of SSTables to prevent rounding errors |
| return result; |
| } |
| |
| private int getSSTableIndex(Integer[] boundaries, SSTableReader reader) |
| { |
| int index = 0; |
| int firstKey = Integer.parseInt(new String(ByteBufferUtil.getArray(reader.first.getKey()))); |
| while (boundaries[index] <= firstKey) |
| index++; |
| logger.debug("Index for SSTable {} on boundary {} is {}", reader.descriptor.id, Arrays.toString(boundaries), index); |
| return index; |
| } |
| |
| |
| |
| class MockBoundaryManager |
| { |
| private final ColumnFamilyStore cfs; |
| private Integer[] positions; |
| private DiskBoundaries boundaries; |
| |
| public MockBoundaryManager(ColumnFamilyStore cfs, Integer[] positions) |
| { |
| this.cfs = cfs; |
| this.positions = positions; |
| this.boundaries = createDiskBoundaries(cfs, positions); |
| } |
| |
| public void invalidateBoundaries() |
| { |
| boundaries.invalidate(); |
| } |
| |
| public DiskBoundaries getBoundaries() |
| { |
| if (boundaries.isOutOfDate()) |
| boundaries = createDiskBoundaries(cfs, positions); |
| return boundaries; |
| } |
| |
| private DiskBoundaries createDiskBoundaries(ColumnFamilyStore cfs, Integer[] boundaries) |
| { |
| List<PartitionPosition> positions = Arrays.stream(boundaries).map(b -> Util.token(String.format(String.format("%04d", b))).minKeyBound()).collect(Collectors.toList()); |
| return new DiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations(), positions, 0, 0); |
| } |
| } |
| |
| private static SSTableReader createSSTableWithKey(String keyspace, String table, int key) |
| { |
| long timestamp = System.currentTimeMillis(); |
| DecoratedKey dk = Util.dk(String.format("%04d", key)); |
| ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); |
| new RowUpdateBuilder(cfs.metadata(), timestamp, dk.getKey()) |
| .clustering(Integer.toString(key)) |
| .add("val", "val") |
| .build() |
| .applyUnsafe(); |
| Set<SSTableReader> before = cfs.getLiveSSTables(); |
| Util.flush(cfs); |
| Set<SSTableReader> after = cfs.getLiveSSTables(); |
| return Iterables.getOnlyElement(Sets.difference(after, before)); |
| } |
| |
| // just to be able to override the data directories |
| private static class MockCFS extends ColumnFamilyStore |
| { |
| MockCFS(ColumnFamilyStore cfs, Directories dirs) |
| { |
| super(cfs.keyspace, cfs.getTableName(), Util.newSeqGen(), cfs.metadata, dirs, false, false, true); |
| } |
| } |
| |
| private static class MockCFSForCSM extends ColumnFamilyStore |
| { |
| private final CountDownLatch latch; |
| private final AtomicInteger upgradeTaskCount; |
| |
| private MockCFSForCSM(ColumnFamilyStore cfs, CountDownLatch latch, AtomicInteger upgradeTaskCount) |
| { |
| super(cfs.keyspace, cfs.name, Util.newSeqGen(10), cfs.metadata, cfs.getDirectories(), true, false, false); |
| this.latch = latch; |
| this.upgradeTaskCount = upgradeTaskCount; |
| } |
| @Override |
| public CompactionStrategyManager getCompactionStrategyManager() |
| { |
| return new MockCSM(this, latch, upgradeTaskCount); |
| } |
| } |
| |
| private static class MockCSM extends CompactionStrategyManager |
| { |
| private final CountDownLatch latch; |
| private final AtomicInteger upgradeTaskCount; |
| |
| private MockCSM(ColumnFamilyStore cfs, CountDownLatch latch, AtomicInteger upgradeTaskCount) |
| { |
| super(cfs); |
| this.latch = latch; |
| this.upgradeTaskCount = upgradeTaskCount; |
| } |
| |
| @Override |
| public AbstractCompactionTask findUpgradeSSTableTask() |
| { |
| try |
| { |
| latch.await(); |
| upgradeTaskCount.incrementAndGet(); |
| } |
| catch (InterruptedException e) |
| { |
| throw new RuntimeException(e); |
| } |
| return null; |
| } |
| } |
| } |