blob: 1d3d18c312efc638c48d509ad6f725b69dde11bf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.compaction;
import java.util.*;
import java.util.concurrent.Callable;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.notifications.*;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.service.ActiveRepairService;
/**
* Manages the compaction strategies.
*
* Currently has two instances of actual compaction strategies - one for repaired data and one for
* unrepaired data. This is done to be able to totally separate the different sets of sstables.
*/
public class CompactionStrategyManager implements INotificationConsumer
{
private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
private final ColumnFamilyStore cfs;
private volatile AbstractCompactionStrategy repaired;
private volatile AbstractCompactionStrategy unrepaired;
private volatile boolean enabled = true;
public boolean isActive = true;
private volatile CompactionParams params;
/*
We keep a copy of the schema compaction parameters here to be able to decide if we
should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
we will use the new compaction parameters.
*/
private CompactionParams schemaCompactionParams;
public CompactionStrategyManager(ColumnFamilyStore cfs)
{
cfs.getTracker().subscribe(this);
logger.trace("{} subscribed to the data tracker.", this);
this.cfs = cfs;
reload(cfs.metadata);
params = cfs.metadata.params.compaction;
enabled = params.isEnabled();
}
/**
* Return the next background task
*
* Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
*
*/
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
if (!isEnabled())
return null;
maybeReload(cfs.metadata);
if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
{
AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
if (repairedTask != null)
return repairedTask;
return unrepaired.getNextBackgroundTask(gcBefore);
}
else
{
AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
if (unrepairedTask != null)
return unrepairedTask;
return repaired.getNextBackgroundTask(gcBefore);
}
}
public boolean isEnabled()
{
return enabled && isActive;
}
public synchronized void resume()
{
isActive = true;
}
/**
* pause compaction while we cancel all ongoing compactions
*
* Separate call from enable/disable to not have to save the enabled-state externally
*/
public synchronized void pause()
{
isActive = false;
}
private void startup()
{
for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
{
if (sstable.openReason != SSTableReader.OpenReason.EARLY)
getCompactionStrategyFor(sstable).addSSTable(sstable);
}
repaired.startup();
unrepaired.startup();
}
/**
* return the compaction strategy for the given sstable
*
* returns differently based on the repaired status
* @param sstable
* @return
*/
private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
{
if (sstable.isRepaired())
return repaired;
else
return unrepaired;
}
public void shutdown()
{
isActive = false;
repaired.shutdown();
unrepaired.shutdown();
}
public synchronized void maybeReload(CFMetaData metadata)
{
// compare the old schema configuration to the new one, ignore any locally set changes.
if (metadata.params.compaction.equals(schemaCompactionParams))
return;
reload(metadata);
}
/**
* Reload the compaction strategies
*
* Called after changing configuration and at startup.
* @param metadata
*/
public synchronized void reload(CFMetaData metadata)
{
boolean disabledWithJMX = !enabled && shouldBeEnabled();
setStrategy(metadata.params.compaction);
schemaCompactionParams = metadata.params.compaction;
if (disabledWithJMX || !shouldBeEnabled())
disable();
else
enable();
startup();
}
public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
cfs.getTracker().replaceFlushed(memtable, sstables);
if (sstables != null && !sstables.isEmpty())
CompactionManager.instance.submitBackground(cfs);
}
public int getUnleveledSSTables()
{
if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
{
int count = 0;
count += ((LeveledCompactionStrategy)repaired).getLevelSize(0);
count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
return count;
}
return 0;
}
public synchronized int[] getSSTableCountPerLevel()
{
if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
{
int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
res = sumArrays(res, repairedCountPerLevel);
int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
res = sumArrays(res, unrepairedCountPerLevel);
return res;
}
return null;
}
private static int[] sumArrays(int[] a, int[] b)
{
int[] res = new int[Math.max(a.length, b.length)];
for (int i = 0; i < res.length; i++)
{
if (i < a.length && i < b.length)
res[i] = a[i] + b[i];
else if (i < a.length)
res[i] = a[i];
else
res[i] = b[i];
}
return res;
}
public boolean shouldDefragment()
{
assert repaired.getClass().equals(unrepaired.getClass());
return repaired.shouldDefragment();
}
public Directories getDirectories()
{
assert repaired.getClass().equals(unrepaired.getClass());
return repaired.getDirectories();
}
public synchronized void handleNotification(INotification notification, Object sender)
{
if (notification instanceof SSTableAddedNotification)
{
SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
for (SSTableReader sstable : flushedNotification.added)
{
if (sstable.isRepaired())
repaired.addSSTable(sstable);
else
unrepaired.addSSTable(sstable);
}
}
else if (notification instanceof SSTableListChangedNotification)
{
SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
Set<SSTableReader> repairedRemoved = new HashSet<>();
Set<SSTableReader> repairedAdded = new HashSet<>();
Set<SSTableReader> unrepairedRemoved = new HashSet<>();
Set<SSTableReader> unrepairedAdded = new HashSet<>();
for (SSTableReader sstable : listChangedNotification.removed)
{
if (sstable.isRepaired())
repairedRemoved.add(sstable);
else
unrepairedRemoved.add(sstable);
}
for (SSTableReader sstable : listChangedNotification.added)
{
if (sstable.isRepaired())
repairedAdded.add(sstable);
else
unrepairedAdded.add(sstable);
}
if (!repairedRemoved.isEmpty())
{
repaired.replaceSSTables(repairedRemoved, repairedAdded);
}
else
{
for (SSTableReader sstable : repairedAdded)
repaired.addSSTable(sstable);
}
if (!unrepairedRemoved.isEmpty())
{
unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded);
}
else
{
for (SSTableReader sstable : unrepairedAdded)
unrepaired.addSSTable(sstable);
}
}
else if (notification instanceof SSTableRepairStatusChanged)
{
for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
{
if (sstable.isRepaired())
{
unrepaired.removeSSTable(sstable);
repaired.addSSTable(sstable);
}
else
{
repaired.removeSSTable(sstable);
unrepaired.addSSTable(sstable);
}
}
}
else if (notification instanceof SSTableDeletingNotification)
{
SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
if (sstable.isRepaired())
repaired.removeSSTable(sstable);
else
unrepaired.removeSSTable(sstable);
}
}
public void enable()
{
if (repaired != null)
repaired.enable();
if (unrepaired != null)
unrepaired.enable();
// enable this last to make sure the strategies are ready to get calls.
enabled = true;
}
public void disable()
{
// disable this first avoid asking disabled strategies for compaction tasks
enabled = false;
if (repaired != null)
repaired.disable();
if (unrepaired != null)
unrepaired.disable();
}
/**
* Create ISSTableScanner from the given sstables
*
* Delegates the call to the compaction strategies to allow LCS to create a scanner
* @param sstables
* @param ranges
* @return
*/
@SuppressWarnings("resource")
public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
{
List<SSTableReader> repairedSSTables = new ArrayList<>();
List<SSTableReader> unrepairedSSTables = new ArrayList<>();
for (SSTableReader sstable : sstables)
{
if (sstable.isRepaired())
repairedSSTables.add(sstable);
else
unrepairedSSTables.add(sstable);
}
Set<ISSTableScanner> scanners = new HashSet<>(sstables.size());
AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, ranges);
AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, ranges);
scanners.addAll(repairedScanners.scanners);
scanners.addAll(unrepairedScanners.scanners);
return new AbstractCompactionStrategy.ScannerList(new ArrayList<>(scanners));
}
public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
{
return getScanners(sstables, null);
}
public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
{
return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
}
public long getMaxSSTableBytes()
{
return unrepaired.getMaxSSTableBytes();
}
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
{
return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
}
public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
{
// runWithCompactionsDisabled cancels active compactions and disables them, then we are able
// to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
// sstables are marked the compactions are re-enabled
return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>()
{
@Override
public Collection<AbstractCompactionTask> call() throws Exception
{
synchronized (CompactionStrategyManager.this)
{
Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput);
Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput);
if (repairedTasks == null && unrepairedTasks == null)
return null;
if (repairedTasks == null)
return unrepairedTasks;
if (unrepairedTasks == null)
return repairedTasks;
List<AbstractCompactionTask> tasks = new ArrayList<>();
tasks.addAll(repairedTasks);
tasks.addAll(unrepairedTasks);
return tasks;
}
}
}, false, false);
}
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
{
return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
}
public int getEstimatedRemainingTasks()
{
int tasks = 0;
tasks += repaired.getEstimatedRemainingTasks();
tasks += unrepaired.getEstimatedRemainingTasks();
return tasks;
}
public boolean shouldBeEnabled()
{
return params.isEnabled();
}
public String getName()
{
return unrepaired.getName();
}
public List<AbstractCompactionStrategy> getStrategies()
{
return Arrays.asList(repaired, unrepaired);
}
public synchronized void setNewLocalCompactionStrategy(CompactionParams params)
{
logger.info("Switching local compaction strategy from {} to {}}", this.params, params);
setStrategy(params);
if (shouldBeEnabled())
enable();
else
disable();
startup();
}
private void setStrategy(CompactionParams params)
{
if (repaired != null)
repaired.shutdown();
if (unrepaired != null)
unrepaired.shutdown();
repaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
unrepaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
this.params = params;
}
public CompactionParams getCompactionParams()
{
return params;
}
public boolean onlyPurgeRepairedTombstones()
{
return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
}
public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
{
if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
{
return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker);
}
else
{
return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker);
}
}
public boolean supportsEarlyOpen()
{
return repaired.supportsEarlyOpen();
}
}