/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.log4j.Logger;
import org.apache.commons.collections.PredicateUtils;
import org.apache.commons.collections.iterators.CollatingIterator;
import org.apache.commons.collections.iterators.FilterIterator;
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.*;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.AntiEntropyService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
public class CompactionManager implements CompactionManagerMBean
{
public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.db:type=CompactionManager";
private static final Logger logger = Logger.getLogger(CompactionManager.class);
public static final CompactionManager instance;
private int minimumCompactionThreshold = 4; // compact this many sstables min at a time
private int maximumCompactionThreshold = 32; // compact this many sstables max at a time
static
{
instance = new CompactionManager();
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
{
mbs.registerMBean(instance, new ObjectName(MBEAN_OBJECT_NAME));
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
private CompactionExecutor executor = new CompactionExecutor();
private Map<ColumnFamilyStore, Integer> estimatedCompactions = new NonBlockingHashMap<ColumnFamilyStore, Integer>();
/**
* Call this whenever a compaction might be needed on the given columnfamily.
* It's okay to over-call (within reason) since the compactions are single-threaded,
* and if a call is unnecessary, it will just be no-oped in the bucketing phase.
*/
public Future<Integer> submitMinorIfNeeded(final ColumnFamilyStore cfs)
{
Callable<Integer> callable = new Callable<Integer>()
{
public Integer call() throws IOException
{
if (minimumCompactionThreshold <= 0 || maximumCompactionThreshold <= 0)
{
logger.debug("Compaction is currently disabled.");
return 0;
}
logger.debug("Checking to see if compaction of " + cfs.columnFamily_ + " would be useful");
Set<List<SSTableReader>> buckets = getBuckets(
convertSSTablesToPairs(cfs.getSSTables()), 50L * 1024L * 1024L);
updateEstimateFor(cfs, buckets);
for (List<SSTableReader> sstables : buckets)
{
if (sstables.size() >= minimumCompactionThreshold)
{
// if we have too many to compact all at once, compact older ones first -- this avoids
// re-compacting files we just created.
Collections.sort(sstables);
return doCompaction(cfs, sstables.subList(0, Math.min(sstables.size(), maximumCompactionThreshold)), getDefaultGCBefore());
}
}
return 0;
}
};
return executor.submit(callable);
}
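/**
* Records how many minor compactions the given column family is expected to need, based on the
* current buckets: one compaction per bucket at or above the minimum threshold, plus extra runs
* when a bucket holds more sstables than can be compacted at once.
*/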
private void updateEstimateFor(ColumnFamilyStore cfs, Set<List<SSTableReader>> buckets)
{
int n = 0;
for (List<SSTableReader> sstables : buckets)
{
if (sstables.size() >= minimumCompactionThreshold)
{
// Math.max guards against a divide-by-zero if the two thresholds have been set to the same value via JMX
n += 1 + sstables.size() / Math.max(1, maximumCompactionThreshold - minimumCompactionThreshold);
}
}
estimatedCompactions.put(cfs, n);
}
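/**
* Submits a cleanup compaction that removes keys this node is no longer responsible for
* (see doCleanupCompaction).
*/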
public Future<Object> submitCleanup(final ColumnFamilyStore cfStore)
{
Callable<Object> runnable = new Callable<Object>()
{
public Object call() throws IOException
{
doCleanupCompaction(cfStore);
return this;
}
};
return executor.submit(runnable);
}
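/**
* Submits an anti-compaction of the column family's sstables for the given ranges, optionally
* writing the result into the streaming subdirectory for transfer to the target node.
*/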
public Future<List<String>> submitAnticompaction(final ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress target)
{
Callable<List<String>> callable = new Callable<List<String>>()
{
public List<String> call() throws IOException
{
return doAntiCompaction(cfStore, cfStore.getSSTables(), ranges, target);
}
};
return executor.submit(callable);
}
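/**
* Submits a major compaction of all sstables in the column family, using the default GC grace cutoff.
*/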
public Future submitMajor(final ColumnFamilyStore cfStore)
{
return submitMajor(cfStore, 0, getDefaultGCBefore());
}
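/**
* Submits a major compaction. If skip is positive, sstables larger than skip GB are left out;
* tombstones older than gcBefore (seconds since epoch) are eligible for removal.
*/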
public Future submitMajor(final ColumnFamilyStore cfStore, final long skip, final int gcBefore)
{
Callable<Object> callable = new Callable<Object>()
{
public Object call() throws IOException
{
Collection<SSTableReader> sstables;
if (skip > 0)
{
sstables = new ArrayList<SSTableReader>();
for (SSTableReader sstable : cfStore.getSSTables())
{
if (sstable.length() < skip * 1024L * 1024L * 1024L)
{
sstables.add(sstable);
}
}
}
else
{
sstables = cfStore.getSSTables();
}
doCompaction(cfStore, sstables, gcBefore);
return this;
}
};
return executor.submit(callable);
}
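/**
* Submits a read-only validation compaction that feeds the merged rows to the given
* AntiEntropyService validator (see doValidationCompaction).
*/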
public Future submitValidation(final ColumnFamilyStore cfStore, final AntiEntropyService.Validator validator)
{
Callable<Object> callable = new Callable<Object>()
{
public Object call() throws IOException
{
doValidationCompaction(cfStore, validator);
return this;
}
};
return executor.submit(callable);
}
/**
* Gets the minimum number of sstables in queue before compaction kicks off
*/
public int getMinimumCompactionThreshold()
{
return minimumCompactionThreshold;
}
/**
* Sets the minimum number of sstables in queue before compaction kicks off
*/
public void setMinimumCompactionThreshold(int threshold)
{
minimumCompactionThreshold = threshold;
}
/**
* Gets the maximum number of sstables to compact at once
*/
public int getMaximumCompactionThreshold()
{
return maximumCompactionThreshold;
}
/**
* Sets the maximum number of sstables to compact at once
*/
public void setMaximumCompactionThreshold(int threshold)
{
maximumCompactionThreshold = threshold;
}
public void disableAutoCompaction()
{
minimumCompactionThreshold = 0;
maximumCompactionThreshold = 0;
}
/**
* For internal use and testing only. The rest of the system should go through the submit* methods,
* which are properly serialized.
*/
int doCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore) throws IOException
{
// The collection of sstables passed may be empty (but not null); even if
// it is not empty, it may compact down to nothing if all rows are deleted.
Table table = cfs.getTable();
if (DatabaseDescriptor.isSnapshotBeforeCompaction())
table.snapshot("compact-" + cfs.columnFamily_);
logger.info("Compacting [" + StringUtils.join(sstables, ",") + "]");
String compactionFileLocation = table.getDataFileLocation(cfs.getExpectedCompactedFileSize(sstables));
// If the compaction file path is null that means we have no space left for this compaction.
// try again w/o the largest one.
List<SSTableReader> smallerSSTables = new ArrayList<SSTableReader>(sstables);
while (compactionFileLocation == null && smallerSSTables.size() > 1)
{
logger.warn("insufficient space to compact all requested files " + StringUtils.join(smallerSSTables, ", "));
smallerSSTables.remove(cfs.getMaxSizeFile(smallerSSTables));
compactionFileLocation = table.getDataFileLocation(cfs.getExpectedCompactedFileSize(smallerSSTables));
}
if (compactionFileLocation == null)
{
logger.error("insufficient space to compact even the two smallest files, aborting");
return 0;
}
sstables = smallerSSTables;
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
boolean major = cfs.isCompleteSSTables(sstables);
long startTime = System.currentTimeMillis();
long totalKeysWritten = 0;
// TODO the int cast here is potentially buggy
int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)SSTableReader.getApproximateKeyCount(sstables));
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
SSTableWriter writer;
CompactionIterator ci = new CompactionIterator(cfs, sstables, gcBefore, major); // retain a handle so we can call close()
Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
executor.beginCompaction(cfs, ci);
Map<DecoratedKey, SSTable.PositionSize> cachedKeys = new HashMap<DecoratedKey, SSTable.PositionSize>();
try
{
if (!nni.hasNext())
{
// don't mark compacted in the finally block, since if there _is_ nondeleted data,
// we need to sync it (via closeAndOpen) first, so there is no period during which
// a crash could cause data loss.
cfs.markCompacted(sstables);
return 0;
}
String newFilename = new File(compactionFileLocation, cfs.getTempSSTableFileName()).getAbsolutePath();
writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
while (nni.hasNext())
{
CompactionIterator.CompactedRow row = nni.next();
long prevpos = writer.getFilePointer();
writer.append(row.key, row.buffer);
totalKeysWritten++;
long rowsize = writer.getFilePointer() - prevpos;
if (rowsize > DatabaseDescriptor.getRowWarningThreshold())
logger.warn("Large row " + row.key.key + " in " + cfs.getColumnFamilyName() + " " + rowsize + " bytes");
cfs.addToCompactedRowStats(rowsize);
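// if this key was cached in any of the input sstables, remember its position in the new file
// so the key cache can be re-primed once the compacted sstable is opened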
for (SSTableReader sstable : sstables)
{
if (sstable.getCachedPosition(row.key) != null)
{
cachedKeys.put(row.key, new SSTable.PositionSize(prevpos, rowsize));
break;
}
}
}
}
finally
{
ci.close();
}
SSTableReader ssTable = writer.closeAndOpenReader();
cfs.replaceCompactedSSTables(sstables, Arrays.asList(ssTable));
for (Entry<DecoratedKey, SSTable.PositionSize> entry : cachedKeys.entrySet())
ssTable.cacheKey(entry.getKey(), entry.getValue());
submitMinorIfNeeded(cfs);
String format = "Compacted to %s. %d/%d bytes for %d keys. Time: %dms";
long dTime = System.currentTimeMillis() - startTime;
logger.info(String.format(format, writer.getFilename(), SSTable.getTotalBytes(sstables), ssTable.length(), totalKeysWritten, dTime));
return sstables.size();
}
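/**
* Shared implementation for doAntiCompaction and doCleanupCompaction: writes the rows that fall
* within the given ranges to a new sstable (in the streaming subdirectory if a target node is
* given) and returns the writer, or null if no rows matched.
*/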
private SSTableWriter antiCompactionHelper(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress target)
throws IOException
{
Table table = cfs.getTable();
logger.info("AntiCompacting [" + StringUtils.join(sstables, ",") + "]");
// Calculate the expected compacted filesize; the requested ranges are assumed to cover roughly half the input
long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(sstables) / 2;
String compactionFileLocation = table.getDataFileLocation(expectedRangeFileSize);
if (compactionFileLocation == null)
{
throw new UnsupportedOperationException("disk full");
}
if (target != null)
{
// compacting for streaming: send to subdirectory
compactionFileLocation = compactionFileLocation + File.separator + DatabaseDescriptor.STREAMING_SUBDIR;
}
long totalKeysWritten = 0;
long startTime = System.currentTimeMillis();
int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstables) / 2));
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
SSTableWriter writer = null;
CompactionIterator ci = new AntiCompactionIterator(cfs, sstables, ranges, getDefaultGCBefore(), cfs.isCompleteSSTables(sstables));
Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
executor.beginCompaction(cfs, ci);
try
{
while (nni.hasNext())
{
CompactionIterator.CompactedRow row = nni.next();
if (writer == null)
{
FileUtils.createDirectory(compactionFileLocation);
String newFilename = new File(compactionFileLocation, cfs.getTempSSTableFileName()).getAbsolutePath();
writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
}
writer.append(row.key, row.buffer);
totalKeysWritten++;
}
}
finally
{
ci.close();
}
if (writer != null)
{
List<String> filenames = writer.getAllFilenames();
String format = "AntiCompacted to %s. %d/%d bytes for %d keys. Time: %dms.";
long dTime = System.currentTimeMillis() - startTime;
long length = new File(filenames.get(filenames.size() - 1)).length(); // Data file is last in the list
logger.info(String.format(format, writer.getFilename(), SSTable.getTotalBytes(sstables), length, totalKeysWritten, dTime));
}
return writer;
}
/**
* Performs the anti-compaction: writes out an sstable containing only the keys that fall within
* the given ranges. If a target node is specified the file is written to the streaming
* subdirectory for transfer to that node; otherwise it is kept locally as a compacted file with
* the out-of-range keys removed.
*
* @param cfs the column family store to anti-compact
* @param sstables the sstables to read from
* @param ranges the token ranges whose keys should be kept
* @param target the node the resulting file will be streamed to, or null to keep it locally
* @return the component filenames of the new sstable, or an empty list if no rows matched
* @throws java.io.IOException
*/
private List<String> doAntiCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress target)
throws IOException
{
List<String> filenames = new ArrayList<String>(SSTable.FILES_ON_DISK);
SSTableWriter writer = antiCompactionHelper(cfs, sstables, ranges, target);
if (writer != null)
{
writer.close();
filenames = writer.getAllFilenames();
}
return filenames;
}
/**
* Like doAntiCompaction(), but returns an List of SSTableReaders instead of a list of filenames.
* @throws java.io.IOException
*/
private List<SSTableReader> doAntiCompactionReturnReaders(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress target)
throws IOException
{
List<SSTableReader> results = new ArrayList<SSTableReader>(1);
SSTableWriter writer = antiCompactionHelper(cfs, sstables, ranges, target);
if (writer != null)
{
results.add(writer.closeAndOpenReader());
}
return results;
}
/**
* Rewrites each sstable, keeping only the keys in ranges this node is responsible for and
* discarding the rest.
*
* @throws IOException
*/
private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
{
Collection<SSTableReader> originalSSTables = cfs.getSSTables();
List<SSTableReader> sstables = doAntiCompactionReturnReaders(cfs, originalSSTables, StorageService.instance.getLocalRanges(cfs.getTable().name), null);
if (!sstables.isEmpty())
{
cfs.replaceCompactedSSTables(originalSSTables, sstables);
}
}
/**
* Performs a readonly "compaction" of all sstables in order to validate complete rows,
* but without writing the merge result
*/
private void doValidationCompaction(ColumnFamilyStore cfs, AntiEntropyService.Validator validator) throws IOException
{
Collection<SSTableReader> sstables = cfs.getSSTables();
CompactionIterator ci = new CompactionIterator(cfs, sstables, getDefaultGCBefore(), true);
executor.beginCompaction(cfs, ci);
try
{
Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
// validate the CF as we iterate over it
validator.prepare(cfs);
while (nni.hasNext())
{
CompactionIterator.CompactedRow row = nni.next();
validator.add(row);
}
validator.complete();
}
finally
{
ci.close();
}
}
/*
* Group files of similar size into buckets.
*/
static <T> Set<List<T>> getBuckets(Iterable<Pair<T, Long>> files, long min)
{
// Sort the list in order to get deterministic results during the grouping below
List<Pair<T, Long>> sortedFiles = new ArrayList<Pair<T, Long>>();
for (Pair<T, Long> pair: files)
sortedFiles.add(pair);
Collections.sort(sortedFiles, new Comparator<Pair<T, Long>>()
{
public int compare(Pair<T, Long> p1, Pair<T, Long> p2)
{
return p1.right.compareTo(p2.right);
}
});
Map<List<T>, Long> buckets = new HashMap<List<T>, Long>();
for (Pair<T, Long> pair: sortedFiles)
{
long size = pair.right;
boolean bFound = false;
// look for a bucket containing similar-sized files:
// group in the same bucket if it's w/in 50% of the average for this bucket,
// or this file and the bucket are all considered "small" (less than `min`)
for (Entry<List<T>, Long> entry : buckets.entrySet())
{
List<T> bucket = entry.getKey();
long averageSize = entry.getValue();
if ((size > (averageSize / 2) && size < (3 * averageSize) / 2)
|| (size < min && averageSize < min))
{
// remove and re-add because adding changes the hash
buckets.remove(bucket);
long totalSize = bucket.size() * averageSize;
averageSize = (totalSize + size) / (bucket.size() + 1);
bucket.add(pair.left);
buckets.put(bucket, averageSize);
bFound = true;
break;
}
}
// no similar bucket found; put it in a new one
if (!bFound)
{
ArrayList<T> bucket = new ArrayList<T>();
bucket.add(pair.left);
buckets.put(bucket, size);
}
}
return buckets.keySet();
}
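/**
* Pairs each sstable with its on-disk length, the form expected by getBuckets().
*/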
private static Collection<Pair<SSTableReader, Long>> convertSSTablesToPairs(Collection<SSTableReader> collection)
{
Collection<Pair<SSTableReader, Long>> tablePairs = new HashSet<Pair<SSTableReader, Long>>();
for (SSTableReader table : collection)
tablePairs.add(new Pair<SSTableReader, Long>(table, table.length()));
return tablePairs;
}
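/**
* Returns the default tombstone GC cutoff: the current time in seconds minus the configured GC grace period.
*/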
public static int getDefaultGCBefore()
{
return (int)(System.currentTimeMillis() / 1000) - DatabaseDescriptor.getGcGraceInSeconds();
}
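/**
* A CompactionIterator that only sees rows whose keys fall within the requested ranges,
* by filtering each sstable's scanner before collating.
*/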
private static class AntiCompactionIterator extends CompactionIterator
{
private Set<SSTableScanner> scanners;
public AntiCompactionIterator(ColumnFamilyStore cfStore, Collection<SSTableReader> sstables, Collection<Range> ranges, int gcBefore, boolean isMajor)
throws IOException
{
super(cfStore, getCollatedRangeIterator(sstables, ranges), gcBefore, isMajor);
}
private static Iterator getCollatedRangeIterator(Collection<SSTableReader> sstables, final Collection<Range> ranges)
throws IOException
{
org.apache.commons.collections.Predicate rangesPredicate = new org.apache.commons.collections.Predicate()
{
public boolean evaluate(Object row)
{
return Range.isTokenInRanges(((IteratingRow)row).getKey().token, ranges);
}
};
CollatingIterator iter = FBUtilities.<IteratingRow>getCollatingIterator();
for (SSTableReader sstable : sstables)
{
SSTableScanner scanner = sstable.getScanner(FILE_BUFFER_SIZE);
iter.addIterator(new FilterIterator(scanner, rangesPredicate));
}
return iter;
}
public Iterable<SSTableScanner> getScanners()
{
if (scanners == null)
{
scanners = new HashSet<SSTableScanner>();
for (Object o : ((CollatingIterator)source).getIterators())
{
scanners.add((SSTableScanner)((FilterIterator)o).getIterator());
}
}
return scanners;
}
}
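/**
* Refreshes compaction estimates for every column family, then schedules any needed minor compactions.
*/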
public void checkAllColumnFamilies() throws IOException
{
// perform estimates
for (final ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
Runnable runnable = new Runnable()
{
public void run()
{
logger.debug("Estimating compactions for " + cfs.columnFamily_);
final Set<List<SSTableReader>> buckets =
getBuckets(convertSSTablesToPairs(cfs.getSSTables()), 50L * 1024L * 1024L);
updateEstimateFor(cfs, buckets);
}
};
executor.submit(runnable);
}
// actually schedule compactions. done in a second pass so all the estimates occur before we
// bog down the executor in actual compactions.
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
submitMinorIfNeeded(cfs);
}
}
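/**
* Single-threaded executor that runs all compactions and tracks the column family and
* CompactionIterator currently in progress, for reporting through the MBean.
*/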
private class CompactionExecutor extends DebuggableThreadPoolExecutor
{
private volatile ColumnFamilyStore cfs;
private volatile CompactionIterator ci;
public CompactionExecutor()
{
super("COMPACTION-POOL", DatabaseDescriptor.getCompactionPriority());
}
@Override
public void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r, t);
cfs = null;
ci = null;
}
void beginCompaction(ColumnFamilyStore cfs, CompactionIterator ci)
{
this.cfs = cfs;
this.ci = ci;
}
public String getColumnFamilyName()
{
return cfs == null ? null : cfs.getColumnFamilyName();
}
public Long getBytesTotal()
{
return ci == null ? null : ci.getTotalBytes();
}
public Long getBytesCompleted()
{
return ci == null ? null : ci.getBytesRead();
}
}
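// The methods below expose the in-progress compaction and the pending-task estimate
// through the CompactionManagerMBean interface.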
public String getColumnFamilyInProgress()
{
return executor.getColumnFamilyName();
}
public Long getBytesTotalInProgress()
{
return executor.getBytesTotal();
}
public Long getBytesCompacted()
{
return executor.getBytesCompleted();
}
public int getPendingTasks()
{
int n = 0;
for (Integer i : estimatedCompactions.values())
{
n += i;
}
return n;
}
}