blob: 6b5b8a45c918452ed4c116eca2e8e08eb7eef37c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.compaction;
import java.util.*;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.utils.JVMStabilityInspector;
/**
* Pluggable compaction strategy determines how SSTables get merged.
*
* There are two main goals:
* - perform background compaction constantly as needed; this typically makes a tradeoff between
* i/o done by compaction, and merging done at read time.
* - perform a full (maximum possible) compaction if requested by the user
*/
public abstract class AbstractCompactionStrategy
{
private static final Logger logger = LoggerFactory.getLogger(AbstractCompactionStrategy.class);
protected static final float DEFAULT_TOMBSTONE_THRESHOLD = 0.2f;
// minimum interval needed to perform tombstone removal compaction in seconds, default 86400 or 1 day.
protected static final long DEFAULT_TOMBSTONE_COMPACTION_INTERVAL = 86400;
protected static final boolean DEFAULT_UNCHECKED_TOMBSTONE_COMPACTION_OPTION = false;
protected static final boolean DEFAULT_LOG_ALL_OPTION = false;
protected static final String TOMBSTONE_THRESHOLD_OPTION = "tombstone_threshold";
protected static final String TOMBSTONE_COMPACTION_INTERVAL_OPTION = "tombstone_compaction_interval";
// disable range overlap check when deciding if an SSTable is candidate for tombstone compaction (CASSANDRA-6563)
protected static final String UNCHECKED_TOMBSTONE_COMPACTION_OPTION = "unchecked_tombstone_compaction";
protected static final String LOG_ALL_OPTION = "log_all";
protected static final String COMPACTION_ENABLED = "enabled";
public static final String ONLY_PURGE_REPAIRED_TOMBSTONES = "only_purge_repaired_tombstones";
protected Map<String, String> options;
protected final ColumnFamilyStore cfs;
protected float tombstoneThreshold;
protected long tombstoneCompactionInterval;
protected boolean uncheckedTombstoneCompaction;
protected boolean disableTombstoneCompactions = false;
protected boolean logAll = true;
private final Directories directories;
/**
* pause/resume/getNextBackgroundTask must synchronize. This guarantees that after pause completes,
* no new tasks will be generated; or put another way, pause can't run until in-progress tasks are
* done being created.
*
* This allows runWithCompactionsDisabled to be confident that after pausing, once in-progress
* tasks abort, it's safe to proceed with truncate/cleanup/etc.
*
* See CASSANDRA-3430
*/
protected boolean isActive = false;
protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
assert cfs != null;
this.cfs = cfs;
this.options = ImmutableMap.copyOf(options);
/* checks must be repeated here, as user supplied strategies might not call validateOptions directly */
try
{
validateOptions(options);
String optionValue = options.get(TOMBSTONE_THRESHOLD_OPTION);
tombstoneThreshold = optionValue == null ? DEFAULT_TOMBSTONE_THRESHOLD : Float.parseFloat(optionValue);
optionValue = options.get(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
tombstoneCompactionInterval = optionValue == null ? DEFAULT_TOMBSTONE_COMPACTION_INTERVAL : Long.parseLong(optionValue);
optionValue = options.get(UNCHECKED_TOMBSTONE_COMPACTION_OPTION);
uncheckedTombstoneCompaction = optionValue == null ? DEFAULT_UNCHECKED_TOMBSTONE_COMPACTION_OPTION : Boolean.parseBoolean(optionValue);
optionValue = options.get(LOG_ALL_OPTION);
logAll = optionValue == null ? DEFAULT_LOG_ALL_OPTION : Boolean.parseBoolean(optionValue);
if (!shouldBeEnabled())
this.disable();
}
catch (ConfigurationException e)
{
logger.warn("Error setting compaction strategy options ({}), defaults will be used", e.getMessage());
tombstoneThreshold = DEFAULT_TOMBSTONE_THRESHOLD;
tombstoneCompactionInterval = DEFAULT_TOMBSTONE_COMPACTION_INTERVAL;
uncheckedTombstoneCompaction = DEFAULT_UNCHECKED_TOMBSTONE_COMPACTION_OPTION;
}
directories = cfs.getDirectories();
}
public Directories getDirectories()
{
return directories;
}
/**
* For internal, temporary suspension of background compactions so that we can do exceptional
* things like truncate or major compaction
*/
public synchronized void pause()
{
isActive = false;
}
/**
* For internal, temporary suspension of background compactions so that we can do exceptional
* things like truncate or major compaction
*/
public synchronized void resume()
{
isActive = true;
}
/**
* Performs any extra initialization required
*/
public void startup()
{
isActive = true;
}
/**
* Releases any resources if this strategy is shutdown (when the CFS is reloaded after a schema change).
*/
public void shutdown()
{
isActive = false;
}
/**
* @param gcBefore throw away tombstones older than this
*
* @return the next background/minor compaction task to run; null if nothing to do.
*
* Is responsible for marking its sstables as compaction-pending.
*/
public abstract AbstractCompactionTask getNextBackgroundTask(final int gcBefore);
/**
* @param gcBefore throw away tombstones older than this
*
* @return a compaction task that should be run to compact this columnfamilystore
* as much as possible. Null if nothing to do.
*
* Is responsible for marking its sstables as compaction-pending.
*/
public abstract Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore, boolean splitOutput);
/**
* @param sstables SSTables to compact. Must be marked as compacting.
* @param gcBefore throw away tombstones older than this
*
* @return a compaction task corresponding to the requested sstables.
* Will not be null. (Will throw if user requests an invalid compaction.)
*
* Is responsible for marking its sstables as compaction-pending.
*/
public abstract AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore);
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, final int gcBefore, long maxSSTableBytes)
{
return new CompactionTask(cfs, txn, gcBefore);
}
/**
* @return the number of background tasks estimated to still be needed for this columnfamilystore
*/
public abstract int getEstimatedRemainingTasks();
/**
* @return size in bytes of the largest sstables for this strategy
*/
public abstract long getMaxSSTableBytes();
public void enable()
{
}
public void disable()
{
}
/**
* @return whether or not MeteredFlusher should be able to trigger memtable flushes for this CF.
*/
public boolean isAffectedByMeteredFlusher()
{
return true;
}
/**
* If not affected by MeteredFlusher (and handling flushing on its own), override to tell MF how much
* space to reserve for this CF, i.e., how much space to subtract from `memtable_total_space_in_mb` when deciding
* if other memtables should be flushed or not.
*/
public long getMemtableReservedSize()
{
return 0;
}
/**
* Handle a flushed memtable.
*
* @param memtable the flushed memtable
* @param sstables the written sstables. can be null or empty if the memtable was clean.
*/
public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
}
/**
* Filters SSTables that are to be blacklisted from the given collection
*
* @param originalCandidates The collection to check for blacklisted SSTables
* @return list of the SSTables with blacklisted ones filtered out
*/
public static Iterable<SSTableReader> filterSuspectSSTables(Iterable<SSTableReader> originalCandidates)
{
return Iterables.filter(originalCandidates, new Predicate<SSTableReader>()
{
public boolean apply(SSTableReader sstable)
{
return !sstable.isMarkedSuspect();
}
});
}
public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
{
return range == null ? getScanners(sstables, (Collection<Range<Token>>)null) : getScanners(sstables, Collections.singleton(range));
}
/**
* Returns a list of KeyScanners given sstables and a range on which to scan.
* The default implementation simply grab one SSTableScanner per-sstable, but overriding this method
* allow for a more memory efficient solution if we know the sstable don't overlap (see
* LeveledCompactionStrategy for instance).
*/
@SuppressWarnings("resource")
public ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
{
RateLimiter limiter = CompactionManager.instance.getRateLimiter();
ArrayList<ISSTableScanner> scanners = new ArrayList<ISSTableScanner>();
try
{
for (SSTableReader sstable : sstables)
scanners.add(sstable.getScanner(ranges, limiter));
}
catch (Throwable t)
{
try
{
new ScannerList(scanners).close();
}
catch (Throwable t2)
{
t.addSuppressed(t2);
}
throw t;
}
return new ScannerList(scanners);
}
public boolean shouldDefragment()
{
return false;
}
public String getName()
{
return getClass().getSimpleName();
}
public synchronized void replaceSSTables(Collection<SSTableReader> removed, Collection<SSTableReader> added)
{
for (SSTableReader remove : removed)
removeSSTable(remove);
for (SSTableReader add : added)
addSSTable(add);
}
public abstract void addSSTable(SSTableReader added);
public synchronized void addSSTables(Iterable<SSTableReader> added)
{
for (SSTableReader sstable : added)
addSSTable(sstable);
}
public abstract void removeSSTable(SSTableReader sstable);
public static class ScannerList implements AutoCloseable
{
public final List<ISSTableScanner> scanners;
public ScannerList(List<ISSTableScanner> scanners)
{
this.scanners = scanners;
}
public void close()
{
Throwable t = null;
for (ISSTableScanner scanner : scanners)
{
try
{
scanner.close();
}
catch (Throwable t2)
{
JVMStabilityInspector.inspectThrowable(t2);
if (t == null)
t = t2;
else
t.addSuppressed(t2);
}
}
if (t != null)
throw Throwables.propagate(t);
}
}
public ScannerList getScanners(Collection<SSTableReader> toCompact)
{
return getScanners(toCompact, (Collection<Range<Token>>)null);
}
/**
* Check if given sstable is worth dropping tombstones at gcBefore.
* Check is skipped if tombstone_compaction_interval time does not elapse since sstable creation and returns false.
*
* @param sstable SSTable to check
* @param gcBefore time to drop tombstones
* @return true if given sstable's tombstones are expected to be removed
*/
protected boolean worthDroppingTombstones(SSTableReader sstable, int gcBefore)
{
if (disableTombstoneCompactions || CompactionController.NEVER_PURGE_TOMBSTONES)
return false;
// since we use estimations to calculate, there is a chance that compaction will not drop tombstones actually.
// if that happens we will end up in infinite compaction loop, so first we check enough if enough time has
// elapsed since SSTable created.
if (System.currentTimeMillis() < sstable.getCreationTimeFor(Component.DATA) + tombstoneCompactionInterval * 1000)
return false;
double droppableRatio = sstable.getEstimatedDroppableTombstoneRatio(gcBefore);
if (droppableRatio <= tombstoneThreshold)
return false;
//sstable range overlap check is disabled. See CASSANDRA-6563.
if (uncheckedTombstoneCompaction)
return true;
Collection<SSTableReader> overlaps = cfs.getOverlappingLiveSSTables(Collections.singleton(sstable));
if (overlaps.isEmpty())
{
// there is no overlap, tombstones are safely droppable
return true;
}
else if (CompactionController.getFullyExpiredSSTables(cfs, Collections.singleton(sstable), overlaps, gcBefore).size() > 0)
{
return true;
}
else
{
// what percentage of columns do we expect to compact outside of overlap?
if (sstable.getIndexSummarySize() < 2)
{
// we have too few samples to estimate correct percentage
return false;
}
// first, calculate estimated keys that do not overlap
long keys = sstable.estimatedKeys();
Set<Range<Token>> ranges = new HashSet<Range<Token>>(overlaps.size());
for (SSTableReader overlap : overlaps)
ranges.add(new Range<>(overlap.first.getToken(), overlap.last.getToken()));
long remainingKeys = keys - sstable.estimatedKeysForRanges(ranges);
// next, calculate what percentage of columns we have within those keys
long columns = sstable.getEstimatedColumnCount().mean() * remainingKeys;
double remainingColumnsRatio = ((double) columns) / (sstable.getEstimatedColumnCount().count() * sstable.getEstimatedColumnCount().mean());
// return if we still expect to have droppable tombstones in rest of columns
return remainingColumnsRatio * droppableRatio > tombstoneThreshold;
}
}
public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
{
String threshold = options.get(TOMBSTONE_THRESHOLD_OPTION);
if (threshold != null)
{
try
{
float thresholdValue = Float.parseFloat(threshold);
if (thresholdValue < 0)
{
throw new ConfigurationException(String.format("%s must be greater than 0, but was %f", TOMBSTONE_THRESHOLD_OPTION, thresholdValue));
}
}
catch (NumberFormatException e)
{
throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", threshold, TOMBSTONE_THRESHOLD_OPTION), e);
}
}
String interval = options.get(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
if (interval != null)
{
try
{
long tombstoneCompactionInterval = Long.parseLong(interval);
if (tombstoneCompactionInterval < 0)
{
throw new ConfigurationException(String.format("%s must be greater than 0, but was %d", TOMBSTONE_COMPACTION_INTERVAL_OPTION, tombstoneCompactionInterval));
}
}
catch (NumberFormatException e)
{
throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", interval, TOMBSTONE_COMPACTION_INTERVAL_OPTION), e);
}
}
String unchecked = options.get(UNCHECKED_TOMBSTONE_COMPACTION_OPTION);
if (unchecked != null)
{
if (!unchecked.equalsIgnoreCase("true") && !unchecked.equalsIgnoreCase("false"))
throw new ConfigurationException(String.format("'%s' should be either 'true' or 'false', not '%s'", UNCHECKED_TOMBSTONE_COMPACTION_OPTION, unchecked));
}
String logAll = options.get(LOG_ALL_OPTION);
if (logAll != null)
{
if (!logAll.equalsIgnoreCase("true") && !logAll.equalsIgnoreCase("false"))
{
throw new ConfigurationException(String.format("'%s' should either be 'true' or 'false', not %s", LOG_ALL_OPTION, logAll));
}
}
String compactionEnabled = options.get(COMPACTION_ENABLED);
if (compactionEnabled != null)
{
if (!compactionEnabled.equalsIgnoreCase("true") && !compactionEnabled.equalsIgnoreCase("false"))
{
throw new ConfigurationException(String.format("enabled should either be 'true' or 'false', not %s", compactionEnabled));
}
}
Map<String, String> uncheckedOptions = new HashMap<String, String>(options);
uncheckedOptions.remove(TOMBSTONE_THRESHOLD_OPTION);
uncheckedOptions.remove(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
uncheckedOptions.remove(UNCHECKED_TOMBSTONE_COMPACTION_OPTION);
uncheckedOptions.remove(LOG_ALL_OPTION);
uncheckedOptions.remove(COMPACTION_ENABLED);
uncheckedOptions.remove(ONLY_PURGE_REPAIRED_TOMBSTONES);
uncheckedOptions.remove(CompactionParams.Option.PROVIDE_OVERLAPPING_TOMBSTONES.toString());
return uncheckedOptions;
}
public boolean shouldBeEnabled()
{
String optionValue = options.get(COMPACTION_ENABLED);
return optionValue == null || Boolean.parseBoolean(optionValue);
}
/**
* Method for grouping similar SSTables together, This will be used by
* anti-compaction to determine which SSTables should be anitcompacted
* as a group. If a given compaction strategy creates sstables which
* cannot be merged due to some constraint it must override this method.
*/
public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
{
int groupSize = 2;
List<SSTableReader> sortedSSTablesToGroup = new ArrayList<>(sstablesToGroup);
Collections.sort(sortedSSTablesToGroup, SSTableReader.sstableComparator);
Collection<Collection<SSTableReader>> groupedSSTables = new ArrayList<>();
Collection<SSTableReader> currGroup = new ArrayList<>();
for (SSTableReader sstable : sortedSSTablesToGroup)
{
currGroup.add(sstable);
if (currGroup.size() == groupSize)
{
groupedSSTables.add(currGroup);
currGroup = new ArrayList<>();
}
}
if (currGroup.size() != 0)
groupedSSTables.add(currGroup);
return groupedSSTables;
}
public CompactionLogger.Strategy strategyLogger()
{
return CompactionLogger.Strategy.none;
}
public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
long keyCount,
long repairedAt,
MetadataCollector meta,
SerializationHeader header,
Collection<Index> indexes,
LifecycleTransaction txn)
{
return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, indexes, txn);
}
public boolean supportsEarlyOpen()
{
return true;
}
}