blob: 9f2bc2ea75fbbe2b3083b6d3c38eeec8af48d1b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.compaction;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import com.google.common.collect.Iterables;
import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.notifications.SSTableAddedNotification;
import org.apache.cassandra.notifications.SSTableDeletingNotification;
import org.apache.cassandra.notifications.SSTableListChangedNotification;
import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.FBUtilities;
/**
* Tests CompactionStrategyManager's handling of pending repair sstables
*/
public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingRepairTest
{
private boolean transientContains(SSTableReader sstable)
{
return csm.getTransientRepairsUnsafe().containsSSTable(sstable);
}
private boolean pendingContains(SSTableReader sstable)
{
return csm.getPendingRepairsUnsafe().containsSSTable(sstable);
}
private boolean repairedContains(SSTableReader sstable)
{
return csm.getRepairedUnsafe().containsSSTable(sstable);
}
private boolean unrepairedContains(SSTableReader sstable)
{
return csm.getUnrepairedUnsafe().containsSSTable(sstable);
}
private boolean hasPendingStrategiesFor(UUID sessionID)
{
return !Iterables.isEmpty(csm.getPendingRepairsUnsafe().getStrategiesFor(sessionID));
}
private boolean hasTransientStrategiesFor(UUID sessionID)
{
return !Iterables.isEmpty(csm.getTransientRepairsUnsafe().getStrategiesFor(sessionID));
}
/**
* Pending repair strategy should be created when we encounter a new pending id
*/
@Test
public void sstableAdded()
{
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
Assert.assertTrue(Iterables.isEmpty(csm.getPendingRepairsUnsafe().allStrategies()));
SSTableReader sstable = makeSSTable(true);
Assert.assertFalse(sstable.isRepaired());
Assert.assertFalse(sstable.isPendingRepair());
mutateRepaired(sstable, repairID, false);
Assert.assertFalse(sstable.isRepaired());
Assert.assertTrue(sstable.isPendingRepair());
Assert.assertFalse(hasPendingStrategiesFor(repairID));
Assert.assertFalse(hasTransientStrategiesFor(repairID));
// add the sstable
csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
Assert.assertFalse(repairedContains(sstable));
Assert.assertFalse(unrepairedContains(sstable));
Assert.assertTrue(pendingContains(sstable));
Assert.assertTrue(hasPendingStrategiesFor(repairID));
Assert.assertFalse(hasTransientStrategiesFor(repairID));
}
@Test
public void sstableListChangedAddAndRemove()
{
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable1 = makeSSTable(true);
mutateRepaired(sstable1, repairID, false);
SSTableReader sstable2 = makeSSTable(true);
mutateRepaired(sstable2, repairID, false);
Assert.assertFalse(repairedContains(sstable1));
Assert.assertFalse(unrepairedContains(sstable1));
Assert.assertFalse(repairedContains(sstable2));
Assert.assertFalse(unrepairedContains(sstable2));
Assert.assertFalse(hasPendingStrategiesFor(repairID));
Assert.assertFalse(hasTransientStrategiesFor(repairID));
// add only
SSTableListChangedNotification notification;
notification = new SSTableListChangedNotification(Collections.singleton(sstable1),
Collections.emptyList(),
OperationType.COMPACTION);
csm.handleNotification(notification, cfs.getTracker());
Assert.assertFalse(repairedContains(sstable1));
Assert.assertFalse(unrepairedContains(sstable1));
Assert.assertTrue(pendingContains(sstable1));
Assert.assertFalse(repairedContains(sstable2));
Assert.assertFalse(unrepairedContains(sstable2));
Assert.assertFalse(pendingContains(sstable2));
Assert.assertTrue(hasPendingStrategiesFor(repairID));
Assert.assertFalse(hasTransientStrategiesFor(repairID));
// remove and add
notification = new SSTableListChangedNotification(Collections.singleton(sstable2),
Collections.singleton(sstable1),
OperationType.COMPACTION);
csm.handleNotification(notification, cfs.getTracker());
Assert.assertFalse(repairedContains(sstable1));
Assert.assertFalse(unrepairedContains(sstable1));
Assert.assertFalse(pendingContains(sstable1));
Assert.assertFalse(repairedContains(sstable2));
Assert.assertFalse(unrepairedContains(sstable2));
Assert.assertTrue(pendingContains(sstable2));
}
@Test
public void sstableRepairStatusChanged()
{
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
// add as unrepaired
SSTableReader sstable = makeSSTable(false);
Assert.assertTrue(unrepairedContains(sstable));
Assert.assertFalse(repairedContains(sstable));
Assert.assertFalse(hasPendingStrategiesFor(repairID));
Assert.assertFalse(hasTransientStrategiesFor(repairID));
SSTableRepairStatusChanged notification;
// change to pending repaired
mutateRepaired(sstable, repairID, false);
notification = new SSTableRepairStatusChanged(Collections.singleton(sstable));
csm.handleNotification(notification, cfs.getTracker());
Assert.assertFalse(unrepairedContains(sstable));
Assert.assertFalse(repairedContains(sstable));
Assert.assertTrue(hasPendingStrategiesFor(repairID));
Assert.assertFalse(hasTransientStrategiesFor(repairID));
Assert.assertTrue(pendingContains(sstable));
// change to repaired
mutateRepaired(sstable, System.currentTimeMillis());
notification = new SSTableRepairStatusChanged(Collections.singleton(sstable));
csm.handleNotification(notification, cfs.getTracker());
Assert.assertFalse(unrepairedContains(sstable));
Assert.assertTrue(repairedContains(sstable));
Assert.assertFalse(pendingContains(sstable));
}
@Test
public void sstableDeleted()
{
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
mutateRepaired(sstable, repairID, false);
csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
Assert.assertTrue(pendingContains(sstable));
// delete sstable
SSTableDeletingNotification notification = new SSTableDeletingNotification(sstable);
csm.handleNotification(notification, cfs.getTracker());
Assert.assertFalse(pendingContains(sstable));
Assert.assertFalse(unrepairedContains(sstable));
Assert.assertFalse(repairedContains(sstable));
}
/**
* CompactionStrategyManager.getStrategies should include
* pending repair strategies when appropriate
*/
@Test
public void getStrategies()
{
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
List<List<AbstractCompactionStrategy>> strategies;
strategies = csm.getStrategies();
Assert.assertEquals(3, strategies.size());
Assert.assertTrue(strategies.get(2).isEmpty());
SSTableReader sstable = makeSSTable(true);
mutateRepaired(sstable, repairID, false);
csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
strategies = csm.getStrategies();
Assert.assertEquals(3, strategies.size());
Assert.assertFalse(strategies.get(2).isEmpty());
}
/**
* Tests that finalized repairs result in cleanup compaction tasks
* which reclassify the sstables as repaired
*/
@Test
public void cleanupCompactionFinalized()
{
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
mutateRepaired(sstable, repairID, false);
csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
LocalSessionAccessor.finalizeUnsafe(repairID);
Assert.assertTrue(hasPendingStrategiesFor(repairID));
Assert.assertFalse(hasTransientStrategiesFor(repairID));
Assert.assertTrue(pendingContains(sstable));
Assert.assertTrue(sstable.isPendingRepair());
Assert.assertFalse(sstable.isRepaired());
cfs.getCompactionStrategyManager().enable(); // enable compaction to fetch next background task
AbstractCompactionTask compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds());
Assert.assertNotNull(compactionTask);
Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
// run the compaction
compactionTask.execute(ActiveCompactionsTracker.NOOP);
Assert.assertTrue(repairedContains(sstable));
Assert.assertFalse(unrepairedContains(sstable));
Assert.assertFalse(pendingContains(sstable));
Assert.assertFalse(hasPendingStrategiesFor(repairID));
Assert.assertFalse(hasTransientStrategiesFor(repairID));
// sstable should have pendingRepair cleared, and repairedAt set correctly
long expectedRepairedAt = ActiveRepairService.instance.getParentRepairSession(repairID).repairedAt;
Assert.assertFalse(sstable.isPendingRepair());
Assert.assertTrue(sstable.isRepaired());
Assert.assertEquals(expectedRepairedAt, sstable.getSSTableMetadata().repairedAt);
}
/**
* Tests that failed repairs result in cleanup compaction tasks
* which reclassify the sstables as unrepaired
*/
@Test
public void cleanupCompactionFailed()
{
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
mutateRepaired(sstable, repairID, false);
csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
LocalSessionAccessor.failUnsafe(repairID);
Assert.assertTrue(hasPendingStrategiesFor(repairID));
Assert.assertFalse(hasTransientStrategiesFor(repairID));
Assert.assertTrue(pendingContains(sstable));
Assert.assertTrue(sstable.isPendingRepair());
Assert.assertFalse(sstable.isRepaired());
cfs.getCompactionStrategyManager().enable(); // enable compaction to fetch next background task
AbstractCompactionTask compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds());
Assert.assertNotNull(compactionTask);
Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
// run the compaction
compactionTask.execute(ActiveCompactionsTracker.NOOP);
Assert.assertFalse(repairedContains(sstable));
Assert.assertTrue(unrepairedContains(sstable));
Assert.assertFalse(hasPendingStrategiesFor(repairID));
Assert.assertFalse(hasTransientStrategiesFor(repairID));
// sstable should have pendingRepair cleared, and repairedAt set correctly
Assert.assertFalse(sstable.isPendingRepair());
Assert.assertFalse(sstable.isRepaired());
Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt);
}
@Test
public void finalizedSessionTransientCleanup()
{
Assert.assertTrue(cfs.getLiveSSTables().isEmpty());
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
mutateRepaired(sstable, repairID, true);
csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
LocalSessionAccessor.finalizeUnsafe(repairID);
Assert.assertFalse(hasPendingStrategiesFor(repairID));
Assert.assertTrue(hasTransientStrategiesFor(repairID));
Assert.assertTrue(transientContains(sstable));
Assert.assertFalse(pendingContains(sstable));
Assert.assertFalse(repairedContains(sstable));
Assert.assertFalse(unrepairedContains(sstable));
cfs.getCompactionStrategyManager().enable(); // enable compaction to fetch next background task
AbstractCompactionTask compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds());
Assert.assertNotNull(compactionTask);
Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
// run the compaction
compactionTask.execute(ActiveCompactionsTracker.NOOP);
Assert.assertTrue(cfs.getLiveSSTables().isEmpty());
Assert.assertFalse(hasPendingStrategiesFor(repairID));
Assert.assertFalse(hasTransientStrategiesFor(repairID));
}
@Test
public void failedSessionTransientCleanup()
{
Assert.assertTrue(cfs.getLiveSSTables().isEmpty());
UUID repairID = registerSession(cfs, true, true);
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
mutateRepaired(sstable, repairID, true);
csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
LocalSessionAccessor.failUnsafe(repairID);
Assert.assertFalse(hasPendingStrategiesFor(repairID));
Assert.assertTrue(hasTransientStrategiesFor(repairID));
Assert.assertTrue(transientContains(sstable));
Assert.assertFalse(pendingContains(sstable));
Assert.assertFalse(repairedContains(sstable));
Assert.assertFalse(unrepairedContains(sstable));
cfs.getCompactionStrategyManager().enable(); // enable compaction to fetch next background task
AbstractCompactionTask compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds());
Assert.assertNotNull(compactionTask);
Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
// run the compaction
compactionTask.execute(ActiveCompactionsTracker.NOOP);
Assert.assertFalse(cfs.getLiveSSTables().isEmpty());
Assert.assertFalse(hasPendingStrategiesFor(repairID));
Assert.assertFalse(hasTransientStrategiesFor(repairID));
Assert.assertFalse(transientContains(sstable));
Assert.assertFalse(pendingContains(sstable));
Assert.assertFalse(repairedContains(sstable));
Assert.assertTrue(unrepairedContains(sstable));
}
}