/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.compaction;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Collections2;
import com.google.common.collect.ConcurrentHashMultiset;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.concurrent.WrappedExecutorPlus;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.DiskBoundaries;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.db.lifecycle.WrappedLifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.view.ViewBuilderTask;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.index.SecondaryIndexBuilder;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.repair.NoSuchRepairSessionException;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.apache.cassandra.utils.concurrent.Refs;
import static java.util.Collections.singleton;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.concurrent.FutureTask.callable;
import static org.apache.cassandra.config.DatabaseDescriptor.getConcurrentCompactors;
import static org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutor.compactionThreadGroup;
import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
/**
* <p>
* A singleton which manages a private executor of ongoing compactions.
* </p>
* Scheduling for compaction is accomplished by swapping sstables to be compacted into
* a set via Tracker. New scheduling attempts will ignore currently compacting
* sstables.
*/
public class CompactionManager implements CompactionManagerMBean
{
public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.db:type=CompactionManager";
private static final Logger logger = LoggerFactory.getLogger(CompactionManager.class);
public static final CompactionManager instance;
@VisibleForTesting
public final AtomicInteger currentlyBackgroundUpgrading = new AtomicInteger(0);
public static final int NO_GC = Integer.MIN_VALUE;
public static final int GC_ALL = Integer.MAX_VALUE;
static
{
instance = new CompactionManager();
MBeanWrapper.instance.registerMBean(instance, MBEAN_OBJECT_NAME);
}
private final CompactionExecutor executor = new CompactionExecutor();
private final ValidationExecutor validationExecutor = new ValidationExecutor();
private final CompactionExecutor cacheCleanupExecutor = new CacheCleanupExecutor();
private final CompactionExecutor viewBuildExecutor = new ViewBuildExecutor();
private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor, viewBuildExecutor);
@VisibleForTesting
final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create();
public final ActiveCompactions active = new ActiveCompactions();
// used to temporarily pause non-strategy managed compactions (like index summary redistribution)
private final AtomicInteger globalCompactionPauseCount = new AtomicInteger(0);
private final RateLimiter compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE);
public CompactionMetrics getMetrics()
{
return metrics;
}
/**
* Gets the compaction rate limiter.
* The rate unit is bytes per second.
*
* @return RateLimiter with rate limit set
*/
public RateLimiter getRateLimiter()
{
setRateInBytes(DatabaseDescriptor.getCompactionThroughputBytesPerSec());
return compactionRateLimiter;
}
/**
* Sets the rate for the rate limiter. When compaction_throughput is 0 or the node is bootstrapping,
* this sets the rate to Double.MAX_VALUE bytes per second.
* @param throughputMbPerSec throughput to set in MiB/s
* @deprecated Use setRateInBytes instead
*/
@Deprecated
public void setRate(final double throughputMbPerSec)
{
setRateInBytes(throughputMbPerSec * 1024.0 * 1024);
}
/**
* Sets the rate for the rate limiter. When compaction_throughput is 0 or the node is bootstrapping,
* this sets the rate to Double.MAX_VALUE bytes per second.
* @param throughputBytesPerSec throughput to set in B/s
*/
public void setRateInBytes(final double throughputBytesPerSec)
{
double throughput = throughputBytesPerSec;
// if throughput is set to 0, throttling is disabled
if (throughput == 0 || StorageService.instance.isBootstrapMode())
throughput = Double.MAX_VALUE;
if (compactionRateLimiter.getRate() != throughput)
compactionRateLimiter.setRate(throughput);
}
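// Illustrative sketch (the acquiring code below is an assumption, not a call site in this class):
// compacting code obtains the shared limiter and charges bytes against it, e.g.
//
//     RateLimiter limiter = CompactionManager.instance.getRateLimiter();
//     limiter.acquire(4096); // blocks until roughly 4096 bytes of budget are available
//
// Each getRateLimiter() call refreshes the limit from
// DatabaseDescriptor.getCompactionThroughputBytesPerSec(), so configuration changes take effect
// the next time the limiter is requested; a throughput of 0 disables throttling entirely.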
/**
* Call this whenever a compaction might be needed on the given columnfamily.
* It's okay to over-call (within reason); if a call is unnecessary, it will
* turn into a no-op in the bucketing/candidate-scan phase.
*/
public List<Future<?>> submitBackground(final ColumnFamilyStore cfs)
{
if (cfs.isAutoCompactionDisabled())
{
logger.trace("Autocompaction is disabled");
return Collections.emptyList();
}
/**
* If a CF is currently being compacted, and there are no idle threads, submitBackground should be a no-op;
* we can wait for the current compaction to finish and re-submit when more information is available.
* Otherwise, we should submit at least one task to prevent starvation by busier CFs, and more if there
* are still idle threads. (CASSANDRA-4310)
*/
int count = compactingCF.count(cfs);
if (count > 0 && executor.getActiveTaskCount() >= executor.getMaximumPoolSize())
{
logger.trace("Background compaction is still running for {}.{} ({} remaining). Skipping",
cfs.keyspace.getName(), cfs.name, count);
return Collections.emptyList();
}
logger.trace("Scheduling a background task check for {}.{} with {}",
cfs.keyspace.getName(),
cfs.name,
cfs.getCompactionStrategyManager().getName());
List<Future<?>> futures = new ArrayList<>(1);
Future<?> fut = executor.submitIfRunning(new BackgroundCompactionCandidate(cfs), "background task");
if (!fut.isCancelled())
futures.add(fut);
else
compactingCF.remove(cfs);
return futures;
}
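// Usage sketch (illustrative; the actual call sites live elsewhere, e.g. when a flush or a
// finished compaction makes new sstables visible): callers simply invoke
//
//     CompactionManager.instance.submitBackground(cfs);
//
// and rely on the candidate scan described above to turn redundant calls into no-ops.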
public boolean isCompacting(Iterable<ColumnFamilyStore> cfses, Predicate<SSTableReader> sstablePredicate)
{
for (ColumnFamilyStore cfs : cfses)
if (cfs.getTracker().getCompacting().stream().anyMatch(sstablePredicate))
return true;
return false;
}
@VisibleForTesting
public boolean hasOngoingOrPendingTasks()
{
if (!active.getCompactions().isEmpty() || !compactingCF.isEmpty())
return true;
int pendingTasks = executor.getPendingTaskCount() +
validationExecutor.getPendingTaskCount() +
viewBuildExecutor.getPendingTaskCount() +
cacheCleanupExecutor.getPendingTaskCount();
if (pendingTasks > 0)
return true;
int activeTasks = executor.getActiveTaskCount() +
validationExecutor.getActiveTaskCount() +
viewBuildExecutor.getActiveTaskCount() +
cacheCleanupExecutor.getActiveTaskCount();
return activeTasks > 0;
}
/**
* Shuts down both compaction and validation executors, cancels running compactions / validations,
* and waits for tasks to complete if they were not cancelable.
*/
public void forceShutdown()
{
// shutdown executors to prevent further submission
executor.shutdown();
validationExecutor.shutdown();
viewBuildExecutor.shutdown();
cacheCleanupExecutor.shutdown();
// interrupt compactions and validations
for (Holder compactionHolder : active.getCompactions())
{
compactionHolder.stop();
}
// wait for tasks to terminate
// compaction tasks are interrupted above, so this should be fairly quick,
// but we still wait for any tasks that could not be interrupted to complete.
for (ExecutorService exec : Arrays.asList(executor, validationExecutor, viewBuildExecutor, cacheCleanupExecutor))
{
try
{
if (!exec.awaitTermination(1, TimeUnit.MINUTES))
logger.warn("Failed to wait for compaction executors shutdown");
}
catch (InterruptedException e)
{
logger.error("Interrupted while waiting for tasks to be terminated", e);
}
}
}
public void finishCompactionsAndShutdown(long timeout, TimeUnit unit) throws InterruptedException
{
executor.shutdown();
executor.awaitTermination(timeout, unit);
}
// the actual sstables to compact are not determined until we run the BackgroundCompactionCandidate; that way, if new sstables
// are created between task submission and execution, we execute against the most up-to-date information
@VisibleForTesting
class BackgroundCompactionCandidate implements Runnable
{
private final ColumnFamilyStore cfs;
BackgroundCompactionCandidate(ColumnFamilyStore cfs)
{
compactingCF.add(cfs);
this.cfs = cfs;
}
public void run()
{
boolean ranCompaction = false;
try
{
logger.trace("Checking {}.{}", cfs.keyspace.getName(), cfs.name);
if (!cfs.isValid())
{
logger.trace("Aborting compaction for dropped CF");
return;
}
CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs, FBUtilities.nowInSeconds()));
if (task == null)
{
if (DatabaseDescriptor.automaticSSTableUpgrade())
ranCompaction = maybeRunUpgradeTask(strategy);
}
else
{
task.execute(active);
ranCompaction = true;
}
}
finally
{
compactingCF.remove(cfs);
}
if (ranCompaction) // only submit background if we actually ran a compaction - otherwise we end up in an infinite loop submitting noop background tasks
submitBackground(cfs);
}
boolean maybeRunUpgradeTask(CompactionStrategyManager strategy)
{
logger.debug("Checking for upgrade tasks {}.{}", cfs.keyspace.getName(), cfs.getTableName());
try
{
if (currentlyBackgroundUpgrading.incrementAndGet() <= DatabaseDescriptor.maxConcurrentAutoUpgradeTasks())
{
AbstractCompactionTask upgradeTask = strategy.findUpgradeSSTableTask();
if (upgradeTask != null)
{
upgradeTask.execute(active);
return true;
}
}
}
finally
{
currentlyBackgroundUpgrading.decrementAndGet();
}
logger.trace("No tasks available");
return false;
}
}
@VisibleForTesting
public BackgroundCompactionCandidate getBackgroundCompactionCandidate(ColumnFamilyStore cfs)
{
return new BackgroundCompactionCandidate(cfs);
}
/**
* Run an operation over all sstables using jobs threads
*
* @param cfs the column family store to run the operation on
* @param operation the operation to run
* @param jobs the number of threads to use - 0 means use all available. It never uses more than concurrent_compactors threads
* @return status of the operation
* @throws ExecutionException
* @throws InterruptedException
*/
@SuppressWarnings("resource")
private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, int jobs, OperationType operationType) throws ExecutionException, InterruptedException
{
logger.info("Starting {} for {}.{}", operationType, cfs.keyspace.getName(), cfs.getTableName());
List<LifecycleTransaction> transactions = new ArrayList<>();
List<Future<?>> futures = new ArrayList<>();
try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType))
{
if (compacting == null)
return AllSSTableOpStatus.UNABLE_TO_CANCEL;
Iterable<SSTableReader> sstables = Lists.newArrayList(operation.filterSSTables(compacting));
if (Iterables.isEmpty(sstables))
{
logger.info("No sstables to {} for {}.{}", operationType.name(), cfs.keyspace.getName(), cfs.name);
return AllSSTableOpStatus.SUCCESSFUL;
}
for (final SSTableReader sstable : sstables)
{
final LifecycleTransaction txn = compacting.split(singleton(sstable));
transactions.add(txn);
Callable<Object> callable = new Callable<Object>()
{
@Override
public Object call() throws Exception
{
operation.execute(txn);
return this;
}
};
Future<?> fut = executor.submitIfRunning(callable, "parallel sstable operation");
if (!fut.isCancelled())
futures.add(fut);
else
return AllSSTableOpStatus.ABORTED;
if (jobs > 0 && futures.size() == jobs)
{
Future<?> f = FBUtilities.waitOnFirstFuture(futures);
futures.remove(f);
}
}
FBUtilities.waitOnFutures(futures);
assert compacting.originals().isEmpty();
logger.info("Finished {} for {}.{} successfully", operationType, cfs.keyspace.getName(), cfs.getTableName());
return AllSSTableOpStatus.SUCCESSFUL;
}
finally
{
// wait on any unfinished futures to make sure we don't close an ongoing transaction
try
{
FBUtilities.waitOnFutures(futures);
}
catch (Throwable t)
{
// these are handled/logged in CompactionExecutor#afterExecute
}
Throwable fail = Throwables.close(null, transactions);
if (fail != null)
logger.error("Failed to cleanup lifecycle transactions ({} for {}.{})", operationType, cfs.keyspace.getName(), cfs.getTableName(), fail);
}
}
private static interface OneSSTableOperation
{
Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction);
void execute(LifecycleTransaction input) throws IOException;
}
public enum AllSSTableOpStatus
{
SUCCESSFUL(0),
ABORTED(1),
UNABLE_TO_CANCEL(2);
public final int statusCode;
AllSSTableOpStatus(int statusCode)
{
this.statusCode = statusCode;
}
}
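// Illustrative sketch of how a caller might consume this status (the surrounding call is an
// assumption for illustration only):
//
//     AllSSTableOpStatus status = CompactionManager.instance.performScrub(cfs, true, true, 1);
//     if (status != AllSSTableOpStatus.SUCCESSFUL)
//         logger.warn("Scrub did not complete cleanly: {}", status.statusCode);
//
// SUCCESSFUL(0) means every selected sstable was processed, ABORTED(1) means the operation gave
// up (e.g. the executor rejected a task), and UNABLE_TO_CANCEL(2) means the sstables could not
// be marked as compacting because ongoing compactions could not be cancelled.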
public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData,
int jobs)
throws InterruptedException, ExecutionException
{
return performScrub(cfs, skipCorrupted, checkData, false, jobs);
}
public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData,
final boolean reinsertOverflowedTTL, int jobs)
throws InterruptedException, ExecutionException
{
return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
{
@Override
public Iterable<SSTableReader> filterSSTables(LifecycleTransaction input)
{
return input.originals();
}
@Override
public void execute(LifecycleTransaction input)
{
scrubOne(cfs, input, skipCorrupted, checkData, reinsertOverflowedTTL, active);
}
}, jobs, OperationType.SCRUB);
}
public AllSSTableOpStatus performVerify(ColumnFamilyStore cfs, Verifier.Options options) throws InterruptedException, ExecutionException
{
assert !cfs.isIndex();
return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
{
@Override
public Iterable<SSTableReader> filterSSTables(LifecycleTransaction input)
{
return input.originals();
}
@Override
public void execute(LifecycleTransaction input)
{
verifyOne(cfs, input.onlyOne(), options, active);
}
}, 0, OperationType.VERIFY);
}
public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs,
final boolean skipIfCurrentVersion,
final long skipIfOlderThanTimestamp,
final boolean skipIfCompressionMatches,
int jobs) throws InterruptedException, ExecutionException
{
return performSSTableRewrite(cfs, (sstable) -> {
// Skip if descriptor version matches current version
if (skipIfCurrentVersion && sstable.descriptor.version.equals(sstable.descriptor.getFormat().getLatestVersion()))
return false;
// Skip if SSTable creation time is past given timestamp
if (sstable.getCreationTimeFor(Component.DATA) > skipIfOlderThanTimestamp)
return false;
TableMetadata metadata = cfs.metadata.get();
// Skip if SSTable compression parameters match current ones
if (skipIfCompressionMatches &&
((!sstable.compression && !metadata.params.compression.isEnabled()) ||
(sstable.compression && metadata.params.compression.equals(sstable.getCompressionMetadata().parameters))))
return false;
return true;
}, jobs);
}
/**
* Performs an SSTable rewrite.
* @param sstableFilter sstables for which the predicate returns {@code false} will be excluded
*/
public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, Predicate<SSTableReader> sstableFilter, int jobs) throws InterruptedException, ExecutionException
{
return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
{
@Override
public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
{
List<SSTableReader> sortedSSTables = Lists.newArrayList(transaction.originals());
Collections.sort(sortedSSTables, SSTableReader.sizeComparator.reversed());
Iterator<SSTableReader> iter = sortedSSTables.iterator();
while (iter.hasNext())
{
SSTableReader sstable = iter.next();
if (!sstableFilter.test(sstable))
{
transaction.cancel(sstable);
iter.remove();
}
}
return sortedSSTables;
}
@Override
public void execute(LifecycleTransaction txn)
{
AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
task.setUserDefined(true);
task.setCompactionType(OperationType.UPGRADE_SSTABLES);
task.execute(active);
}
}, jobs, OperationType.UPGRADE_SSTABLES);
}
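// Illustrative usage of the predicate overload (the filter shown is an assumption, not an
// existing call site): rewrite only sstables that are still on an old format version and leave
// everything else untouched:
//
//     CompactionManager.instance.performSSTableRewrite(cfs,
//         s -> !s.descriptor.version.equals(s.descriptor.getFormat().getLatestVersion()), 2);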
public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore, int jobs) throws InterruptedException, ExecutionException
{
assert !cfStore.isIndex();
Keyspace keyspace = cfStore.keyspace;
// if the local ranges are empty, it means no data should remain
final RangesAtEndpoint replicas = StorageService.instance.getLocalReplicas(keyspace.getName());
final Set<Range<Token>> allRanges = replicas.ranges();
final Set<Range<Token>> transientRanges = replicas.onlyTransient().ranges();
final Set<Range<Token>> fullRanges = replicas.onlyFull().ranges();
final boolean hasIndexes = cfStore.indexManager.hasIndexes();
return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
{
@Override
public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
{
List<SSTableReader> sortedSSTables = Lists.newArrayList(transaction.originals());
Iterator<SSTableReader> sstableIter = sortedSSTables.iterator();
int totalSSTables = 0;
int skippedSStables = 0;
while (sstableIter.hasNext())
{
SSTableReader sstable = sstableIter.next();
boolean needsCleanupFull = needsCleanup(sstable, fullRanges);
boolean needsCleanupTransient = !transientRanges.isEmpty() && sstable.isRepaired() && needsCleanup(sstable, transientRanges);
// Skip the sstable if there are no ranges for which it needs cleanup, either because it doesn't
// intersect the owned ranges or because it isn't repaired (required for transient-range cleanup).
totalSSTables++;
if (!needsCleanupFull && !needsCleanupTransient)
{
logger.debug("Skipping {} ([{}, {}]) for cleanup; all rows should be kept. Needs cleanup full ranges: {} Needs cleanup transient ranges: {} Repaired: {}",
sstable,
sstable.first.getToken(),
sstable.last.getToken(),
needsCleanupFull,
needsCleanupTransient,
sstable.isRepaired());
sstableIter.remove();
transaction.cancel(sstable);
skippedSStables++;
}
}
logger.info("Skipping cleanup for {}/{} sstables for {}.{} since they are fully contained in owned ranges (full ranges: {}, transient ranges: {})",
skippedSStables, totalSSTables, cfStore.keyspace.getName(), cfStore.getTableName(), fullRanges, transientRanges);
sortedSSTables.sort(SSTableReader.sizeComparator);
return sortedSSTables;
}
@Override
public void execute(LifecycleTransaction txn) throws IOException
{
CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, allRanges, transientRanges, txn.onlyOne().isRepaired(), FBUtilities.nowInSeconds());
doCleanupOne(cfStore, txn, cleanupStrategy, replicas.ranges(), hasIndexes);
}
}, jobs, OperationType.CLEANUP);
}
public AllSSTableOpStatus performGarbageCollection(final ColumnFamilyStore cfStore, TombstoneOption tombstoneOption, int jobs) throws InterruptedException, ExecutionException
{
assert !cfStore.isIndex();
return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
{
@Override
public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
{
Iterable<SSTableReader> originals = transaction.originals();
if (cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
originals = Iterables.filter(originals, SSTableReader::isRepaired);
List<SSTableReader> sortedSSTables = Lists.newArrayList(originals);
Collections.sort(sortedSSTables, SSTableReader.maxTimestampAscending);
return sortedSSTables;
}
@Override
public void execute(LifecycleTransaction txn) throws IOException
{
logger.debug("Garbage collecting {}", txn.originals());
CompactionTask task = new CompactionTask(cfStore, txn, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()))
{
@Override
protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
{
return new CompactionController(cfStore, toCompact, gcBefore, null, tombstoneOption);
}
@Override
protected int getLevel()
{
return txn.onlyOne().getSSTableLevel();
}
};
task.setUserDefined(true);
task.setCompactionType(OperationType.GARBAGE_COLLECT);
task.execute(active);
}
}, jobs, OperationType.GARBAGE_COLLECT);
}
public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs, int jobs) throws ExecutionException, InterruptedException
{
if (!cfs.getPartitioner().splitter().isPresent())
{
logger.info("Partitioner does not support splitting");
return AllSSTableOpStatus.ABORTED;
}
if (StorageService.instance.getLocalReplicas(cfs.keyspace.getName()).isEmpty())
{
logger.info("Relocate cannot run before a node has joined the ring");
return AllSSTableOpStatus.ABORTED;
}
final DiskBoundaries diskBoundaries = cfs.getDiskBoundaries();
return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
{
@Override
public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
{
Set<SSTableReader> originals = Sets.newHashSet(transaction.originals());
Set<SSTableReader> needsRelocation = originals.stream().filter(s -> !inCorrectLocation(s)).collect(Collectors.toSet());
transaction.cancel(Sets.difference(originals, needsRelocation));
Map<Integer, List<SSTableReader>> groupedByDisk = groupByDiskIndex(needsRelocation);
int maxSize = 0;
for (List<SSTableReader> diskSSTables : groupedByDisk.values())
maxSize = Math.max(maxSize, diskSSTables.size());
List<SSTableReader> mixedSSTables = new ArrayList<>();
for (int i = 0; i < maxSize; i++)
for (List<SSTableReader> diskSSTables : groupedByDisk.values())
if (i < diskSSTables.size())
mixedSSTables.add(diskSSTables.get(i));
return mixedSSTables;
}
public Map<Integer, List<SSTableReader>> groupByDiskIndex(Set<SSTableReader> needsRelocation)
{
return needsRelocation.stream().collect(Collectors.groupingBy((s) -> diskBoundaries.getDiskIndex(s)));
}
private boolean inCorrectLocation(SSTableReader sstable)
{
if (!cfs.getPartitioner().splitter().isPresent())
return true;
// Compare the expected data directory for the sstable with its current data directory
Directories.DataDirectory currentDirectory = cfs.getDirectories().getDataDirectoryForFile(sstable.descriptor);
return diskBoundaries.isInCorrectLocation(sstable, currentDirectory);
}
@Override
public void execute(LifecycleTransaction txn)
{
logger.debug("Relocating {}", txn.originals());
AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
task.setUserDefined(true);
task.setCompactionType(OperationType.RELOCATE);
task.execute(active);
}
}, jobs, OperationType.RELOCATE);
}
/**
* Splits the given token ranges of the given sstables into a pending repair silo
*/
public Future<Void> submitPendingAntiCompaction(ColumnFamilyStore cfs,
RangesAtEndpoint tokenRanges,
Refs<SSTableReader> sstables,
LifecycleTransaction txn,
TimeUUID sessionId,
BooleanSupplier isCancelled)
{
Runnable runnable = new WrappedRunnable()
{
protected void runMayThrow() throws Exception
{
try (TableMetrics.TableTimer.Context ctx = cfs.metric.anticompactionTime.time())
{
performAnticompaction(cfs, tokenRanges, sstables, txn, sessionId, isCancelled);
}
}
};
Future<Void> task = null;
try
{
task = executor.submitIfRunning(runnable, "pending anticompaction");
return task;
}
finally
{
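// if the executor rejected the runnable (or submission failed outright), nothing else will
// clean up, so release the sstable references and abort the transaction here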
if (task == null || task.isCancelled())
{
sstables.release();
txn.abort();
}
}
}
/**
* for sstables that are fully contained in the given ranges, just rewrite their metadata with
* the pending repair id and remove them from the transaction
*/
private static void mutateFullyContainedSSTables(ColumnFamilyStore cfs,
Refs<SSTableReader> refs,
Iterator<SSTableReader> sstableIterator,
Collection<Range<Token>> ranges,
LifecycleTransaction txn,
TimeUUID sessionID,
boolean isTransient) throws IOException
{
if (ranges.isEmpty())
return;
List<Range<Token>> normalizedRanges = Range.normalize(ranges);
Set<SSTableReader> fullyContainedSSTables = findSSTablesToAnticompact(sstableIterator, normalizedRanges, sessionID);
cfs.metric.bytesMutatedAnticompaction.inc(SSTableReader.getTotalBytes(fullyContainedSSTables));
cfs.getCompactionStrategyManager().mutateRepaired(fullyContainedSSTables, UNREPAIRED_SSTABLE, sessionID, isTransient);
// since we're just re-writing the sstable metadata for the fully contained sstables, we don't want
// them obsoleted when the anti-compaction is complete. So they're removed from the transaction here
txn.cancel(fullyContainedSSTables);
refs.release(fullyContainedSSTables);
}
/**
* Make sure the {@code validatedForRepair} are marked for compaction before calling this.
*
* Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)).
*
* @param cfs
* @param replicas token ranges to be repaired
* @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them.
* @param sessionID the repair session we're anti-compacting for
* @param isCancelled function that indicates if active anti-compaction should be canceled
* @throws InterruptedException
* @throws IOException
*/
public void performAnticompaction(ColumnFamilyStore cfs,
RangesAtEndpoint replicas,
Refs<SSTableReader> validatedForRepair,
LifecycleTransaction txn,
TimeUUID sessionID,
BooleanSupplier isCancelled) throws IOException
{
try
{
ActiveRepairService.ParentRepairSession prs;
try
{
prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
}
catch (NoSuchRepairSessionException e)
{
throw new CompactionInterruptedException(e.getMessage());
}
Preconditions.checkArgument(!prs.isPreview(), "Cannot anticompact for previews");
Preconditions.checkArgument(!replicas.isEmpty(), "No ranges to anti-compact");
if (logger.isInfoEnabled())
logger.info("{} Starting anticompaction for {}.{} on {}/{} sstables", PreviewKind.NONE.logPrefix(sessionID), cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables().size());
if (logger.isTraceEnabled())
logger.trace("{} Starting anticompaction for ranges {}", PreviewKind.NONE.logPrefix(sessionID), replicas);
Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
validateSSTableBoundsForAnticompaction(sessionID, sstables, replicas);
mutateFullyContainedSSTables(cfs, validatedForRepair, sstables.iterator(), replicas.onlyFull().ranges(), txn, sessionID, false);
mutateFullyContainedSSTables(cfs, validatedForRepair, sstables.iterator(), replicas.onlyTransient().ranges(), txn, sessionID, true);
assert txn.originals().equals(sstables);
if (!sstables.isEmpty())
doAntiCompaction(cfs, replicas, txn, sessionID, isCancelled);
txn.finish();
}
finally
{
validatedForRepair.release();
txn.close();
}
logger.info("{} Completed anticompaction successfully", PreviewKind.NONE.logPrefix(sessionID));
}
static void validateSSTableBoundsForAnticompaction(TimeUUID sessionID,
Collection<SSTableReader> sstables,
RangesAtEndpoint ranges)
{
List<Range<Token>> normalizedRanges = Range.normalize(ranges.ranges());
for (SSTableReader sstable : sstables)
{
Bounds<Token> bounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken());
if (!Iterables.any(normalizedRanges, r -> (r.contains(bounds.left) && r.contains(bounds.right)) || r.intersects(bounds)))
{
// this should never happen - in PendingAntiCompaction#getSSTables we select all sstables that intersect the repaired ranges, and that can't have changed here
String message = String.format("%s SSTable %s (%s) does not intersect repaired ranges %s, this sstable should not have been included.",
PreviewKind.NONE.logPrefix(sessionID), sstable, bounds, normalizedRanges);
logger.error(message);
throw new IllegalStateException(message);
}
}
}
@VisibleForTesting
static Set<SSTableReader> findSSTablesToAnticompact(Iterator<SSTableReader> sstableIterator, List<Range<Token>> normalizedRanges, TimeUUID parentRepairSession)
{
Set<SSTableReader> fullyContainedSSTables = new HashSet<>();
while (sstableIterator.hasNext())
{
SSTableReader sstable = sstableIterator.next();
Bounds<Token> sstableBounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken());
for (Range<Token> r : normalizedRanges)
{
// ranges are normalized - no wrap around - if first and last are contained we know that all tokens are contained in the range
if (r.contains(sstable.first.getToken()) && r.contains(sstable.last.getToken()))
{
logger.info("{} SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, r);
fullyContainedSSTables.add(sstable);
sstableIterator.remove();
break;
}
else if (r.intersects(sstableBounds))
{
logger.info("{} SSTable {} ({}) will be anticompacted on range {}", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, sstableBounds, r);
}
}
}
return fullyContainedSSTables;
}
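// Worked example (illustrative, made-up tokens): with a single normalized range (0, 100], an
// sstable covering tokens [10, 90] is fully contained, so it is pulled out of the iterator and
// only has its repair metadata mutated, while an sstable covering [50, 150] merely intersects
// the range and stays in the transaction for the actual anticompaction pass.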
public void performMaximal(final ColumnFamilyStore cfStore, boolean splitOutput)
{
FBUtilities.waitOnFutures(submitMaximal(cfStore, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()), splitOutput));
}
@SuppressWarnings("resource") // the tasks are executed in parallel on the executor, making sure that they get closed
public List<Future<?>> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore, boolean splitOutput)
{
// here we compute the task off the compaction executor, so having that present doesn't
// confuse runWithCompactionsDisabled -- i.e., we don't want to deadlock ourselves, waiting
// for ourselves to finish/acknowledge cancellation before continuing.
CompactionTasks tasks = cfStore.getCompactionStrategyManager().getMaximalTasks(gcBefore, splitOutput);
if (tasks.isEmpty())
return Collections.emptyList();
List<Future<?>> futures = new ArrayList<>();
int nonEmptyTasks = 0;
for (final AbstractCompactionTask task : tasks)
{
if (task.transaction.originals().size() > 0)
nonEmptyTasks++;
Runnable runnable = new WrappedRunnable()
{
protected void runMayThrow()
{
task.execute(active);
}
};
Future<?> fut = executor.submitIfRunning(runnable, "maximal task");
if (!fut.isCancelled())
futures.add(fut);
}
if (nonEmptyTasks > 1)
logger.info("Major compaction will not result in a single sstable - repaired and unrepaired data is kept separate and compaction runs per data_file_directory.");
return futures;
}
public void forceCompaction(ColumnFamilyStore cfStore, Supplier<Collection<SSTableReader>> sstablesFn, com.google.common.base.Predicate<SSTableReader> sstablesPredicate)
{
Callable<CompactionTasks> taskCreator = () -> {
Collection<SSTableReader> sstables = sstablesFn.get();
if (sstables == null || sstables.isEmpty())
{
logger.debug("No sstables found for the provided token range");
return CompactionTasks.empty();
}
return cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()));
};
try (CompactionTasks tasks = cfStore.runWithCompactionsDisabled(taskCreator,
sstablesPredicate,
false,
false,
false))
{
if (tasks.isEmpty())
return;
Runnable runnable = new WrappedRunnable()
{
protected void runMayThrow()
{
for (AbstractCompactionTask task : tasks)
if (task != null)
task.execute(active);
}
};
FBUtilities.waitOnFuture(executor.submitIfRunning(runnable, "force compaction for token range"));
}
}
/**
* Forces a major compaction of specified token ranges of the specified column family.
* <p>
* The token ranges will be interpreted as closed intervals to match the closed interval defined by the first and
* last keys of an sstable, even though the {@link Range} class is supposed to be half-open by definition.
*
* @param cfStore The column family store to be compacted.
* @param ranges The token ranges to be compacted, interpreted as closed intervals.
*/
public void forceCompactionForTokenRange(ColumnFamilyStore cfStore, Collection<Range<Token>> ranges)
{
forceCompaction(cfStore,
() -> sstablesInBounds(cfStore, ranges),
sstable -> new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges));
}
/**
* Returns the sstables of the specified column family store that intersect with the specified token ranges.
* <p>
* The token ranges will be interpreted as closed intervals to match the closed interval defined by the first and
* last keys of an sstable, even though the {@link Range} class is supposed to be half-open by definition.
*/
private static Collection<SSTableReader> sstablesInBounds(ColumnFamilyStore cfs, Collection<Range<Token>> tokenRangeCollection)
{
final Set<SSTableReader> sstables = new HashSet<>();
Iterable<SSTableReader> liveTables = cfs.getTracker().getView().select(SSTableSet.LIVE);
SSTableIntervalTree tree = SSTableIntervalTree.build(liveTables);
for (Range<Token> tokenRange : tokenRangeCollection)
{
if (!AbstractBounds.strictlyWrapsAround(tokenRange.left, tokenRange.right))
{
Iterable<SSTableReader> ssTableReaders = View.sstablesInBounds(tokenRange.left.minKeyBound(), tokenRange.right.maxKeyBound(), tree);
Iterables.addAll(sstables, ssTableReaders);
}
else
{
// Searching an interval tree will not return the correct results for a wrapping range
// so we have to unwrap it first
for (Range<Token> unwrappedRange : tokenRange.unwrap())
{
Iterable<SSTableReader> ssTableReaders = View.sstablesInBounds(unwrappedRange.left.minKeyBound(), unwrappedRange.right.maxKeyBound(), tree);
Iterables.addAll(sstables, ssTableReaders);
}
}
}
return sstables;
}
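// Example (illustrative): a wrapping range such as (100, 50] cannot be queried against the
// interval tree directly, so it is unwrapped into (100, min] and (min, 50] and the results of
// the two lookups are unioned, which is what the else branch above does.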
public void forceCompactionForKey(ColumnFamilyStore cfStore, DecoratedKey key)
{
forceCompaction(cfStore, () -> sstablesWithKey(cfStore, key), sstable -> sstable.maybePresent(key));
}
private static Collection<SSTableReader> sstablesWithKey(ColumnFamilyStore cfs, DecoratedKey key)
{
final Set<SSTableReader> sstables = new HashSet<>();
Iterable<SSTableReader> liveTables = cfs.getTracker().getView().liveSSTablesInBounds(key.getToken().minKeyBound(),
key.getToken().maxKeyBound());
for (SSTableReader sstable : liveTables)
{
if (sstable.maybePresent(key))
sstables.add(sstable);
}
return sstables.isEmpty() ? Collections.emptyList() : sstables;
}
public void forceUserDefinedCompaction(String dataFiles)
{
String[] filenames = dataFiles.split(",");
Multimap<ColumnFamilyStore, Descriptor> descriptors = ArrayListMultimap.create();
for (String filename : filenames)
{
// extract keyspace and columnfamily name from filename
Descriptor desc = Descriptor.fromFilename(filename.trim());
if (Schema.instance.getTableMetadataRef(desc) == null)
{
logger.warn("Schema does not exist for file {}. Skipping.", filename);
continue;
}
// group by keyspace/columnfamily
ColumnFamilyStore cfs = Keyspace.open(desc.ksname).getColumnFamilyStore(desc.cfname);
descriptors.put(cfs, cfs.getDirectories().find(new File(filename.trim()).name()));
}
List<Future<?>> futures = new ArrayList<>(descriptors.size());
int nowInSec = FBUtilities.nowInSeconds();
for (ColumnFamilyStore cfs : descriptors.keySet())
futures.add(submitUserDefined(cfs, descriptors.get(cfs), getDefaultGcBefore(cfs, nowInSec)));
FBUtilities.waitOnFutures(futures);
}
public void forceUserDefinedCleanup(String dataFiles)
{
String[] filenames = dataFiles.split(",");
HashMap<ColumnFamilyStore, Descriptor> descriptors = Maps.newHashMap();
for (String filename : filenames)
{
// extract keyspace and columnfamily name from filename
Descriptor desc = Descriptor.fromFilename(filename.trim());
if (Schema.instance.getTableMetadataRef(desc) == null)
{
logger.warn("Schema does not exist for file {}. Skipping.", filename);
continue;
}
// group by keyspace/columnfamily
ColumnFamilyStore cfs = Keyspace.open(desc.ksname).getColumnFamilyStore(desc.cfname);
desc = cfs.getDirectories().find(new File(filename.trim()).name());
if (desc != null)
descriptors.put(cfs, desc);
}
if (!StorageService.instance.isJoined())
{
logger.error("Cleanup cannot run before a node has joined the ring");
return;
}
for (Map.Entry<ColumnFamilyStore,Descriptor> entry : descriptors.entrySet())
{
ColumnFamilyStore cfs = entry.getKey();
Keyspace keyspace = cfs.keyspace;
final RangesAtEndpoint replicas = StorageService.instance.getLocalReplicas(keyspace.getName());
final Set<Range<Token>> allRanges = replicas.ranges();
final Set<Range<Token>> transientRanges = replicas.onlyTransient().ranges();
boolean hasIndexes = cfs.indexManager.hasIndexes();
SSTableReader sstable = lookupSSTable(cfs, entry.getValue());
if (sstable == null)
{
logger.warn("Will not clean {}, it is not an active sstable", entry.getValue());
}
else
{
CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, allRanges, transientRanges, sstable.isRepaired(), FBUtilities.nowInSeconds());
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.CLEANUP))
{
doCleanupOne(cfs, txn, cleanupStrategy, allRanges, hasIndexes);
}
catch (IOException e)
{
logger.error("forceUserDefinedCleanup failed: {}", e.getLocalizedMessage());
}
}
}
}
public Future<?> submitUserDefined(final ColumnFamilyStore cfs, final Collection<Descriptor> dataFiles, final int gcBefore)
{
Runnable runnable = new WrappedRunnable()
{
protected void runMayThrow() throws Exception
{
// look up the sstables now that we're on the compaction executor, so we don't try to re-compact
// something that was already being compacted earlier.
Collection<SSTableReader> sstables = new ArrayList<>(dataFiles.size());
for (Descriptor desc : dataFiles)
{
// inefficient but not in a performance sensitive path
SSTableReader sstable = lookupSSTable(cfs, desc);
if (sstable == null)
{
logger.info("Will not compact {}: it is not an active sstable", desc);
}
else
{
sstables.add(sstable);
}
}
if (sstables.isEmpty())
{
logger.info("No files to compact for user defined compaction");
}
else
{
try (CompactionTasks tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore))
{
for (AbstractCompactionTask task : tasks)
{
if (task != null)
task.execute(active);
}
}
}
}
};
return executor.submitIfRunning(runnable, "user defined task");
}
// This acquires a reference on the sstable
// This is not efficient, do not use in any critical path
private SSTableReader lookupSSTable(final ColumnFamilyStore cfs, Descriptor descriptor)
{
for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
{
if (sstable.descriptor.equals(descriptor))
return sstable;
}
return null;
}
public Future<?> submitValidation(Callable<Object> validation)
{
return validationExecutor.submitIfRunning(validation, "validation");
}
/* Used in tests. */
public void disableAutoCompaction()
{
for (String ksname : Schema.instance.getNonSystemKeyspaces().names())
{
for (ColumnFamilyStore cfs : Keyspace.open(ksname).getColumnFamilyStores())
cfs.disableAutoCompaction();
}
}
@VisibleForTesting
void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL, ActiveCompactionsTracker activeCompactions)
{
CompactionInfo.Holder scrubInfo = null;
try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, checkData, reinsertOverflowedTTL))
{
scrubInfo = scrubber.getScrubInfo();
activeCompactions.beginCompaction(scrubInfo);
scrubber.scrub();
}
finally
{
if (scrubInfo != null)
activeCompactions.finishCompaction(scrubInfo);
}
}
@VisibleForTesting
void verifyOne(ColumnFamilyStore cfs, SSTableReader sstable, Verifier.Options options, ActiveCompactionsTracker activeCompactions)
{
CompactionInfo.Holder verifyInfo = null;
try (Verifier verifier = new Verifier(cfs, sstable, false, options))
{
verifyInfo = verifier.getVerifyInfo();
activeCompactions.beginCompaction(verifyInfo);
verifier.verify();
}
finally
{
if (verifyInfo != null)
activeCompactions.finishCompaction(verifyInfo);
}
}
/**
* Determines if a cleanup would actually remove any data in this SSTable based
* on a set of owned ranges.
*/
@VisibleForTesting
public static boolean needsCleanup(SSTableReader sstable, Collection<Range<Token>> ownedRanges)
{
if (ownedRanges.isEmpty())
{
return true; // all data will be cleaned
}
// unwrap and sort the ranges by LHS token
List<Range<Token>> sortedRanges = Range.normalize(ownedRanges);
// see if there are any keys LTE the token for the start of the first range
// (token range ownership is exclusive on the LHS.)
Range<Token> firstRange = sortedRanges.get(0);
if (sstable.first.getToken().compareTo(firstRange.left) <= 0)
return true;
// then, iterate over all owned ranges and see if the next key beyond the end of the owned
// range falls before the start of the next range
for (int i = 0; i < sortedRanges.size(); i++)
{
Range<Token> range = sortedRanges.get(i);
if (range.right.isMinimum())
{
// we split a wrapping range and this is the second half.
// there can't be any keys beyond this (and this is the last range)
return false;
}
DecoratedKey firstBeyondRange = sstable.firstKeyBeyond(range.right.maxKeyBound());
if (firstBeyondRange == null)
{
// we ran off the end of the sstable looking for the next key; we don't need to check any more ranges
return false;
}
if (i == (sortedRanges.size() - 1))
{
// we're at the last range and we found a key beyond the end of the range
return true;
}
Range<Token> nextRange = sortedRanges.get(i + 1);
if (firstBeyondRange.getToken().compareTo(nextRange.left) <= 0)
{
// we found a key in between the owned ranges
return true;
}
}
return false;
}
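// Worked example (illustrative, made-up tokens): with normalized owned ranges (0, 100] and
// (200, 300] and an sstable spanning tokens [150, 250], the first token (150) is greater than 0,
// so we reach the loop; firstKeyBeyond(100) returns the key at 150, and since 150 <= 200 (the
// left bound of the next owned range) there is data between the owned ranges, so the method
// returns true, i.e. cleanup would actually remove rows from this sstable.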
/**
* This function goes over a file and removes the keys that the node is not responsible for
* and only keeps keys that this node is responsible for.
*
* @throws IOException
*/
private void doCleanupOne(final ColumnFamilyStore cfs,
LifecycleTransaction txn,
CleanupStrategy cleanupStrategy,
Collection<Range<Token>> allRanges,
boolean hasIndexes) throws IOException
{
assert !cfs.isIndex();
SSTableReader sstable = txn.onlyOne();
// if the sstable doesn't intersect the owned ranges (e.g. because they are empty) and there are no indexes, the entire sstable is discarded
if (!hasIndexes && !new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(allRanges))
{
txn.obsoleteOriginals();
txn.finish();
logger.info("SSTable {} ([{}, {}]) does not intersect the owned ranges ({}), dropping it", sstable, sstable.first.getToken(), sstable.last.getToken(), allRanges);
return;
}
long start = nanoTime();
long totalkeysWritten = 0;
long expectedBloomFilterSize = Math.max(cfs.metadata().params.minIndexInterval,
SSTableReader.getApproximateKeyCount(txn.originals()));
if (logger.isTraceEnabled())
logger.trace("Expected bloom filter size : {}", expectedBloomFilterSize);
logger.info("Cleaning up {}", sstable);
File compactionFileLocation = sstable.descriptor.directory;
RateLimiter limiter = getRateLimiter();
double compressionRatio = sstable.getCompressionRatio();
if (compressionRatio == MetadataCollector.NO_COMPRESSION_RATIO)
compressionRatio = 1.0;
List<SSTableReader> finished;
int nowInSec = FBUtilities.nowInSeconds();
try (SSTableRewriter writer = SSTableRewriter.construct(cfs, txn, false, sstable.maxDataAge);
ISSTableScanner scanner = cleanupStrategy.getScanner(sstable);
CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs, nowInSec));
Refs<SSTableReader> refs = Refs.ref(Collections.singleton(sstable));
CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, nextTimeUUID(), active, null))
{
StatsMetadata metadata = sstable.getSSTableMetadata();
writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, metadata.isTransient, sstable, txn));
long lastBytesScanned = 0;
while (ci.hasNext())
{
try (UnfilteredRowIterator partition = ci.next();
UnfilteredRowIterator notCleaned = cleanupStrategy.cleanup(partition))
{
if (notCleaned == null)
continue;
if (writer.append(notCleaned) != null)
totalkeysWritten++;
long bytesScanned = scanner.getBytesScanned();
compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio);
lastBytesScanned = bytesScanned;
}
}
// flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
cfs.indexManager.flushAllIndexesBlocking();
finished = writer.finish();
}
if (!finished.isEmpty())
{
String format = "Cleaned up to %s. %s to %s (~%d%% of original) for %,d keys. Time: %,dms.";
long dTime = TimeUnit.NANOSECONDS.toMillis(nanoTime() - start);
long startsize = sstable.onDiskLength();
long endsize = 0;
for (SSTableReader newSstable : finished)
endsize += newSstable.onDiskLength();
double ratio = (double) endsize / (double) startsize;
logger.info(String.format(format, finished.get(0).getFilename(), FBUtilities.prettyPrintMemory(startsize),
FBUtilities.prettyPrintMemory(endsize), (int) (ratio * 100), totalkeysWritten, dTime));
}
}
static void compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio)
{
long lengthRead = (long) ((bytesScanned - lastBytesScanned) * compressionRatio) + 1;
while (lengthRead >= Integer.MAX_VALUE)
{
limiter.acquire(Integer.MAX_VALUE);
lengthRead -= Integer.MAX_VALUE;
}
if (lengthRead > 0)
{
limiter.acquire((int) lengthRead);
}
}
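// Arithmetic example (illustrative): if the scanner advanced by 3 GiB of uncompressed data since
// the last call and the compression ratio is 1.0, lengthRead is ~3 GiB, which exceeds
// Integer.MAX_VALUE; Guava's RateLimiter.acquire(int) only accepts an int, so one
// Integer.MAX_VALUE-sized chunk is acquired in the loop and the remaining ~1 GiB afterwards.
// Multiplying by the compression ratio scales the scanner's progress towards an approximate
// on-disk figure, and the +1 guarantees at least one permit is requested per call.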
private static abstract class CleanupStrategy
{
protected final Collection<Range<Token>> ranges;
protected final int nowInSec;
protected CleanupStrategy(Collection<Range<Token>> ranges, int nowInSec)
{
this.ranges = ranges;
this.nowInSec = nowInSec;
}
public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, Collection<Range<Token>> transientRanges, boolean isRepaired, int nowInSec)
{
if (cfs.indexManager.hasIndexes())
{
if (!transientRanges.isEmpty())
{
//Shouldn't have been possible to create this situation
throw new AssertionError("Can't have indexes and transient ranges");
}
return new Full(cfs, ranges, nowInSec);
}
return new Bounded(cfs, ranges, transientRanges, isRepaired, nowInSec);
}
public abstract ISSTableScanner getScanner(SSTableReader sstable);
public abstract UnfilteredRowIterator cleanup(UnfilteredRowIterator partition);
private static final class Bounded extends CleanupStrategy
{
private final Collection<Range<Token>> transientRanges;
private final boolean isRepaired;
public Bounded(final ColumnFamilyStore cfs, Collection<Range<Token>> ranges, Collection<Range<Token>> transientRanges, boolean isRepaired, int nowInSec)
{
super(ranges, nowInSec);
instance.cacheCleanupExecutor.submit(new Runnable()
{
@Override
public void run()
{
cfs.cleanupCache();
}
});
this.transientRanges = transientRanges;
this.isRepaired = isRepaired;
}
@Override
public ISSTableScanner getScanner(SSTableReader sstable)
{
//If transient replication is enabled and there are transient ranges
//then cleanup should remove any partitions that are repaired and in the transient range
//as they should already be synchronized at other full replicas.
//So just don't scan the portion of the table containing the repaired transient ranges
Collection<Range<Token>> rangesToScan = ranges;
if (isRepaired)
{
rangesToScan = Collections2.filter(ranges, range -> !transientRanges.contains(range));
}
return sstable.getScanner(rangesToScan);
}
@Override
public UnfilteredRowIterator cleanup(UnfilteredRowIterator partition)
{
return partition;
}
}
private static final class Full extends CleanupStrategy
{
private final ColumnFamilyStore cfs;
public Full(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, int nowInSec)
{
super(ranges, nowInSec);
this.cfs = cfs;
}
@Override
public ISSTableScanner getScanner(SSTableReader sstable)
{
return sstable.getScanner();
}
@Override
public UnfilteredRowIterator cleanup(UnfilteredRowIterator partition)
{
if (Range.isInRanges(partition.partitionKey().getToken(), ranges))
return partition;
cfs.invalidateCachedPartition(partition.partitionKey());
cfs.indexManager.deletePartition(partition, nowInSec);
return null;
}
}
}
public static SSTableWriter createWriter(ColumnFamilyStore cfs,
File compactionFileLocation,
long expectedBloomFilterSize,
long repairedAt,
TimeUUID pendingRepair,
boolean isTransient,
SSTableReader sstable,
LifecycleTransaction txn)
{
FileUtils.createDirectory(compactionFileLocation);
return SSTableWriter.create(cfs.metadata,
cfs.newSSTableDescriptor(compactionFileLocation),
expectedBloomFilterSize,
repairedAt,
pendingRepair,
isTransient,
sstable.getSSTableLevel(),
sstable.header,
cfs.indexManager.listIndexes(),
txn);
}
public static SSTableWriter createWriterForAntiCompaction(ColumnFamilyStore cfs,
File compactionFileLocation,
int expectedBloomFilterSize,
long repairedAt,
TimeUUID pendingRepair,
boolean isTransient,
Collection<SSTableReader> sstables,
ILifecycleTransaction txn)
{
FileUtils.createDirectory(compactionFileLocation);
int minLevel = Integer.MAX_VALUE;
// if all sstables have the same level, we can compact them together without creating overlap during anticompaction
// note that we only anticompact from unrepaired sstables, which are not leveled, but we still keep the original level
// after first migration to be able to drop the sstables back in their original place in the repaired sstable manifest
for (SSTableReader sstable : sstables)
{
if (minLevel == Integer.MAX_VALUE)
minLevel = sstable.getSSTableLevel();
if (minLevel != sstable.getSSTableLevel())
{
minLevel = 0;
break;
}
}
return SSTableWriter.create(cfs.newSSTableDescriptor(compactionFileLocation),
(long) expectedBloomFilterSize,
repairedAt,
pendingRepair,
isTransient,
cfs.metadata,
new MetadataCollector(sstables, cfs.metadata().comparator, minLevel),
SerializationHeader.make(cfs.metadata(), sstables),
cfs.indexManager.listIndexes(),
txn);
}
/**
* Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
* will store the non-repaired ranges. Once anticompaction is completed, the original sstable is marked as compacted
* and subsequently deleted.
* @param cfs
* @param txn a transaction over the repaired sstables to anticompact
* @param ranges full and transient ranges to be placed into one of the new sstables. The repaired table will be tracked via
* the {@link org.apache.cassandra.io.sstable.metadata.StatsMetadata#pendingRepair} field.
* @param pendingRepair the repair session we're anti-compacting for
* @param isCancelled function that indicates if active anti-compaction should be canceled
*/
private void doAntiCompaction(ColumnFamilyStore cfs,
RangesAtEndpoint ranges,
LifecycleTransaction txn,
TimeUUID pendingRepair,
BooleanSupplier isCancelled)
{
int originalCount = txn.originals().size();
logger.info("Performing anticompaction on {} sstables for {}", originalCount, pendingRepair);
//Group SSTables
Set<SSTableReader> sstables = txn.originals();
// Repairs can take place on both unrepaired (incremental + full) and repaired (full) data.
// Although anti-compaction could work on repaired sstables as well and would result in having more accurate
// repairedAt values for these, we still avoid anti-compacting already repaired sstables, as we currently don't
// make use of any actual repairedAt value and splitting up sstables just for that is not worth it at this point.
Set<SSTableReader> unrepairedSSTables = sstables.stream().filter((s) -> !s.isRepaired()).collect(Collectors.toSet());
cfs.metric.bytesAnticompacted.inc(SSTableReader.getTotalBytes(unrepairedSSTables));
Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(unrepairedSSTables);
// iterate over sstables to check if the full / transient / unrepaired ranges intersect them.
int antiCompactedSSTableCount = 0;
for (Collection<SSTableReader> sstableGroup : groupedSSTables)
{
try (LifecycleTransaction groupTxn = txn.split(sstableGroup))
{
int antiCompacted = antiCompactGroup(cfs, ranges, groupTxn, pendingRepair, isCancelled);
antiCompactedSSTableCount += antiCompacted;
}
}
String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s) for {}.";
logger.info(format, originalCount, antiCompactedSSTableCount, pendingRepair);
}
@VisibleForTesting
int antiCompactGroup(ColumnFamilyStore cfs,
RangesAtEndpoint ranges,
LifecycleTransaction txn,
TimeUUID pendingRepair,
BooleanSupplier isCancelled)
{
Preconditions.checkArgument(!ranges.isEmpty(), "need at least one full or transient range");
long groupMaxDataAge = -1;
for (Iterator<SSTableReader> i = txn.originals().iterator(); i.hasNext();)
{
SSTableReader sstable = i.next();
if (groupMaxDataAge < sstable.maxDataAge)
groupMaxDataAge = sstable.maxDataAge;
}
if (txn.originals().size() == 0)
{
logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
return 0;
}
logger.info("Anticompacting {} in {}.{} for {}", txn.originals(), cfs.keyspace.getName(), cfs.getTableName(), pendingRepair);
Set<SSTableReader> sstableAsSet = txn.originals();
File destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
int nowInSec = FBUtilities.nowInSeconds();
RateLimiter limiter = getRateLimiter();
/**
* HACK WARNING
*
* We have multiple writers operating over the same Transaction, producing different sets of sstables that all
* logically replace the transaction's originals. The SSTableRewriter assumes it has exclusive control over
* the transaction state, and this will lead to temporarily inconsistent sstable/tracker state if we do not
* take special measures to avoid it.
*
* Specifically, if a number of rewriters have prepareToCommit() invoked in sequence, then two problematic things happen:
* 1. The obsoleteOriginals() call of the first rewriter immediately removes the originals from the tracker, despite
* their having been only partially replaced. To avoid this, we must either avoid obsoleteOriginals() or checkpoint()
* 2. The LifecycleTransaction may only have prepareToCommit() invoked once, and this also invokes checkpoint().
*
* Similarly, commit() would finalise partially complete on-disk state.
*
* To avoid these problems, we introduce a SharedTxn that proxies all calls onto the underlying transaction
* except prepareToCommit(), checkpoint(), obsoleteOriginals(), and commit().
* We then invoke these methods directly once each of the rewriters has updated the transaction
* with their share of replacements.
*
* Note that for the same essential reason we also explicitly disable early open.
* By noop-ing checkpoint we avoid any of the problems with early open, but by continuing to explicitly
* disable it we also prevent any of the extra associated work from being performed.
*/
class SharedTxn extends WrappedLifecycleTransaction
{
public SharedTxn(ILifecycleTransaction delegate) { super(delegate); }
public Throwable commit(Throwable accumulate) { return accumulate; }
public void prepareToCommit() {}
public void checkpoint() {}
public void obsoleteOriginals() {}
public void close() {}
}
CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
try (SharedTxn sharedTxn = new SharedTxn(txn);
SSTableRewriter fullWriter = SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
SSTableRewriter transWriter = SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
SSTableRewriter unrepairedWriter = SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(txn.originals());
CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs, nowInSec));
CompactionIterator ci = getAntiCompactionIterator(scanners.scanners, controller, nowInSec, nextTimeUUID(), active, isCancelled))
{
int expectedBloomFilterSize = Math.max(cfs.metadata().params.minIndexInterval, (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
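// The three writers below share the transaction and split the group's data three ways: fully repaired
// ranges (tracked as pending repair, non-transient), transiently repaired ranges (pending repair, transient),
// and everything else, which stays unrepaired.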
fullWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, UNREPAIRED_SSTABLE, pendingRepair, false, sstableAsSet, txn));
transWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, UNREPAIRED_SSTABLE, pendingRepair, true, sstableAsSet, txn));
unrepairedWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, false, sstableAsSet, txn));
Predicate<Token> fullChecker = !ranges.onlyFull().isEmpty() ? new Range.OrderedRangeContainmentChecker(ranges.onlyFull().ranges()) : t -> false;
Predicate<Token> transChecker = !ranges.onlyTransient().isEmpty() ? new Range.OrderedRangeContainmentChecker(ranges.onlyTransient().ranges()) : t -> false;
double compressionRatio = scanners.getCompressionRatio();
if (compressionRatio == MetadataCollector.NO_COMPRESSION_RATIO)
compressionRatio = 1.0;
long lastBytesScanned = 0;
while (ci.hasNext())
{
try (UnfilteredRowIterator partition = ci.next())
{
Token token = partition.partitionKey().getToken();
// if this row is contained in the full or transient ranges, append it to the appropriate sstable
if (fullChecker.test(token))
{
fullWriter.append(partition);
}
else if (transChecker.test(token))
{
transWriter.append(partition);
}
else
{
// otherwise, append it to the unrepaired sstable
unrepairedWriter.append(partition);
}
long bytesScanned = scanners.getTotalBytesScanned();
compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio);
lastBytesScanned = bytesScanned;
}
}
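// Per the HACK WARNING above: all three rewriters have now registered their replacements against the
// shared transaction, so checkpoint / obsoleteOriginals / prepareToCommit are invoked exactly once here,
// directly on the underlying transaction.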
fullWriter.prepareToCommit();
transWriter.prepareToCommit();
unrepairedWriter.prepareToCommit();
txn.checkpoint();
txn.obsoleteOriginals();
txn.prepareToCommit();
List<SSTableReader> fullSSTables = new ArrayList<>(fullWriter.finished());
List<SSTableReader> transSSTables = new ArrayList<>(transWriter.finished());
List<SSTableReader> unrepairedSSTables = new ArrayList<>(unrepairedWriter.finished());
fullWriter.commit();
transWriter.commit();
unrepairedWriter.commit();
txn.commit();
logger.info("Anticompacted {} in {}.{} to full = {}, transient = {}, unrepaired = {} for {}",
sstableAsSet,
cfs.keyspace.getName(),
cfs.getTableName(),
fullSSTables,
transSSTables,
unrepairedSSTables,
pendingRepair);
return fullSSTables.size() + transSSTables.size() + unrepairedSSTables.size();
}
catch (Throwable e)
{
if (e instanceof CompactionInterruptedException && isCancelled.getAsBoolean())
{
logger.info("Anticompaction has been canceled for session {}", pendingRepair);
logger.trace(e.getMessage(), e);
}
else
{
JVMStabilityInspector.inspectThrowable(e);
logger.error("Error anticompacting " + txn + " for " + pendingRepair, e);
}
throw e;
}
}
@VisibleForTesting
public static CompactionIterator getAntiCompactionIterator(List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, TimeUUID timeUUID, ActiveCompactionsTracker activeCompactions, BooleanSupplier isCancelled)
{
return new CompactionIterator(OperationType.ANTICOMPACTION, scanners, controller, nowInSec, timeUUID, activeCompactions, null)
{
public boolean isStopRequested()
{
return super.isStopRequested() || isCancelled.getAsBoolean();
}
};
}
@VisibleForTesting
Future<?> submitIndexBuild(final SecondaryIndexBuilder builder, ActiveCompactionsTracker activeCompactions)
{
Runnable runnable = new Runnable()
{
public void run()
{
activeCompactions.beginCompaction(builder);
try
{
builder.build();
}
finally
{
activeCompactions.finishCompaction(builder);
}
}
};
return executor.submitIfRunning(runnable, "index build");
}
/**
* Not scheduled, because it performs work disjoint from sstable compaction.
*/
public Future<?> submitIndexBuild(final SecondaryIndexBuilder builder)
{
return submitIndexBuild(builder, active);
}
public Future<?> submitCacheWrite(final AutoSavingCache.Writer writer)
{
return submitCacheWrite(writer, active);
}
Future<?> submitCacheWrite(final AutoSavingCache.Writer writer, ActiveCompactionsTracker activeCompactions)
{
Runnable runnable = new Runnable()
{
public void run()
{
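// flushInProgress acts as a guard so that at most one save per cache type runs at a time;
// if another flush of this cache type is already running, skip this one.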
if (!AutoSavingCache.flushInProgress.add(writer.cacheType()))
{
logger.trace("Cache flushing was already in progress: skipping {}", writer.getCompactionInfo());
return;
}
try
{
activeCompactions.beginCompaction(writer);
try
{
writer.saveCache();
}
finally
{
activeCompactions.finishCompaction(writer);
}
}
finally
{
AutoSavingCache.flushInProgress.remove(writer.cacheType());
}
}
};
return executor.submitIfRunning(runnable, "cache write");
}
public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException
{
return runIndexSummaryRedistribution(redistribution, active);
}
@VisibleForTesting
List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution, ActiveCompactionsTracker activeCompactions) throws IOException
{
activeCompactions.beginCompaction(redistribution);
try
{
return redistribution.redistributeSummaries();
}
finally
{
activeCompactions.finishCompaction(redistribution);
}
}
public static int getDefaultGcBefore(ColumnFamilyStore cfs, int nowInSec)
{
// 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to
// add any GcGrace however since 2ndary indexes are local to a node.
return cfs.isIndex() ? nowInSec : cfs.gcBefore(nowInSec);
}
public Future<Long> submitViewBuilder(final ViewBuilderTask task)
{
return submitViewBuilder(task, active);
}
@VisibleForTesting
Future<Long> submitViewBuilder(final ViewBuilderTask task, ActiveCompactionsTracker activeCompactions)
{
return viewBuildExecutor.submitIfRunning(() -> {
activeCompactions.beginCompaction(task);
try
{
return task.call();
}
finally
{
activeCompactions.finishCompaction(task);
}
}, "view build");
}
public int getActiveCompactions()
{
return active.getCompactions().size();
}
public static boolean isCompactor(Thread thread)
{
return thread.getThreadGroup().getParent() == compactionThreadGroup;
}
// TODO: this is a bit ugly, but no uglier than it was
static class CompactionExecutor extends WrappedExecutorPlus
{
static final ThreadGroup compactionThreadGroup = executorFactory().newThreadGroup("compaction");
public CompactionExecutor()
{
this(executorFactory(), getConcurrentCompactors(), "CompactionExecutor", Integer.MAX_VALUE);
}
public CompactionExecutor(int threads, String name, int queueSize)
{
this(executorFactory(), threads, name, queueSize);
}
protected CompactionExecutor(ExecutorFactory executorFactory, int threads, String name, int queueSize)
{
super(executorFactory
.withJmxInternal()
.configurePooled(name, threads)
.withThreadGroup(compactionThreadGroup)
.withQueueLimit(queueSize).build());
}
public Future<Void> submitIfRunning(Runnable task, String name)
{
return submitIfRunning(callable(name, task), name);
}
/**
* Submit the task, but only if the executor has not been shut down. If the executor has
* been shut down, or in case of a rejected execution exception, return a cancelled future.
*
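* <p>Illustrative usage sketch (the task body and its name here are hypothetical):
* <pre>{@code
* Future<Boolean> f = executor.submitIfRunning(() -> Boolean.TRUE, "example task");
* if (f.isCancelled())
*     logger.trace("executor shut down; example task was not submitted");
* }</pre>
*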
* @param task - the task to submit
* @param name - the task name to use in log messages
*
* @return the future that will deliver the task result, or a future that has already been
* cancelled if the task could not be submitted.
*/
public <T> Future<T> submitIfRunning(Callable<T> task, String name)
{
try
{
return submit(task);
}
catch (RejectedExecutionException ex)
{
if (isShutdown())
logger.info("Executor has shut down, could not submit {}", name);
else
logger.error("Failed to submit {}", name, ex);
return ImmediateFuture.cancelled();
}
}
public void execute(Runnable command)
{
executor.execute(command);
}
public <T> Future<T> submit(Callable<T> task)
{
return executor.submit(task);
}
public <T> Future<T> submit(Runnable task, T result)
{
return submit(callable(task, result));
}
public Future<?> submit(Runnable task)
{
return submit(task, null);
}
}
// TODO: pull out relevant parts of CompactionExecutor and move to ValidationManager
public static class ValidationExecutor extends CompactionExecutor
{
// CompactionExecutor, and by extension ValidationExecutor, use ExecutorPlus's
// default RejectedExecutionHandler which blocks the submitting thread when the work queue is
// full. The calling thread in this case is AntiEntropyStage, so in most cases we don't actually
// want to block when the ValidationExecutor is saturated as this prevents progress on all
// repair tasks and may cause repair sessions to time out. Also, it can lead to references to
// heavyweight validation responses containing merkle trees being held for extended periods which
// increases GC pressure. Using LinkedBlockingQueue instead of the default SynchronousQueue allows
// tasks to be submitted without blocking the caller, but will always prefer queueing to creating
// new threads if the pool already has at least `corePoolSize` threads running. For this
// reason we set corePoolSize to the maximum desired concurrency, but allow idle core threads to
// be terminated.
public ValidationExecutor()
{
super(DatabaseDescriptor.getConcurrentValidations(),
"ValidationExecutor",
Integer.MAX_VALUE);
}
public void adjustPoolSize()
{
setMaximumPoolSize(DatabaseDescriptor.getConcurrentValidations());
setCorePoolSize(DatabaseDescriptor.getConcurrentValidations());
}
}
private static class ViewBuildExecutor extends CompactionExecutor
{
public ViewBuildExecutor()
{
super(DatabaseDescriptor.getConcurrentViewBuilders(), "ViewBuildExecutor", Integer.MAX_VALUE);
}
}
private static class CacheCleanupExecutor extends CompactionExecutor
{
public CacheCleanupExecutor()
{
super(1, "CacheCleanupExecutor", Integer.MAX_VALUE);
}
}
public void incrementAborted()
{
metrics.compactionsAborted.inc();
}
public void incrementCompactionsReduced()
{
metrics.compactionsReduced.inc();
}
public void incrementSstablesDropppedFromCompactions(long num)
{
metrics.sstablesDropppedFromCompactions.inc(num);
}
public List<Map<String, String>> getCompactions()
{
List<Holder> compactionHolders = active.getCompactions();
List<Map<String, String>> out = new ArrayList<Map<String, String>>(compactionHolders.size());
for (CompactionInfo.Holder ci : compactionHolders)
out.add(ci.getCompactionInfo().asMap());
return out;
}
public List<String> getCompactionSummary()
{
List<Holder> compactionHolders = active.getCompactions();
List<String> out = new ArrayList<String>(compactionHolders.size());
for (CompactionInfo.Holder ci : compactionHolders)
out.add(ci.getCompactionInfo().toString());
return out;
}
public TabularData getCompactionHistory()
{
try
{
return SystemKeyspace.getCompactionHistory();
}
catch (OpenDataException e)
{
throw new RuntimeException(e);
}
}
public long getTotalBytesCompacted()
{
return metrics.bytesCompacted.getCount();
}
public long getTotalCompactionsCompleted()
{
return metrics.totalCompactionsCompleted.getCount();
}
public int getPendingTasks()
{
return metrics.pendingTasks.getValue();
}
public long getCompletedTasks()
{
return metrics.completedTasks.getValue();
}
public void stopCompaction(String type)
{
OperationType operation = OperationType.valueOf(type);
for (Holder holder : active.getCompactions())
{
if (holder.getCompactionInfo().getTaskType() == operation)
holder.stop();
}
}
public void stopCompactionById(String compactionId)
{
for (Holder holder : active.getCompactions())
{
TimeUUID holderId = holder.getCompactionInfo().getTaskId();
if (holderId != null && holderId.equals(TimeUUID.fromString(compactionId)))
holder.stop();
}
}
public void setConcurrentCompactors(int value)
{
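// The ordering of the two calls in each branch is deliberate: grow the maximum before the core size,
// and shrink the core before the maximum, so that the (assumed) corePoolSize <= maximumPoolSize
// invariant of the underlying pool holds at every intermediate step.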
if (value > executor.getCorePoolSize())
{
// we are increasing the value
executor.setMaximumPoolSize(value);
executor.setCorePoolSize(value);
}
else if (value < executor.getCorePoolSize())
{
// we are reducing the value
executor.setCorePoolSize(value);
executor.setMaximumPoolSize(value);
}
}
public void setConcurrentValidations()
{
validationExecutor.adjustPoolSize();
}
public void setConcurrentViewBuilders(int value)
{
if (value > viewBuildExecutor.getCorePoolSize())
{
// we are increasing the value
viewBuildExecutor.setMaximumPoolSize(value);
viewBuildExecutor.setCorePoolSize(value);
}
else if (value < viewBuildExecutor.getCorePoolSize())
{
// we are reducing the value
viewBuildExecutor.setCorePoolSize(value);
viewBuildExecutor.setMaximumPoolSize(value);
}
}
public int getCoreCompactorThreads()
{
return executor.getCorePoolSize();
}
public void setCoreCompactorThreads(int number)
{
executor.setCorePoolSize(number);
}
public int getMaximumCompactorThreads()
{
return executor.getMaximumPoolSize();
}
public void setMaximumCompactorThreads(int number)
{
executor.setMaximumPoolSize(number);
}
public int getCoreValidationThreads()
{
return validationExecutor.getCorePoolSize();
}
public void setCoreValidationThreads(int number)
{
validationExecutor.setCorePoolSize(number);
}
public int getMaximumValidatorThreads()
{
return validationExecutor.getMaximumPoolSize();
}
public void setMaximumValidatorThreads(int number)
{
validationExecutor.setMaximumPoolSize(number);
}
public boolean getDisableSTCSInL0()
{
return DatabaseDescriptor.getDisableSTCSInL0();
}
public void setDisableSTCSInL0(boolean disabled)
{
if (disabled != DatabaseDescriptor.getDisableSTCSInL0())
logger.info("Changing STCS in L0 disabled from {} to {}", DatabaseDescriptor.getDisableSTCSInL0(), disabled);
DatabaseDescriptor.setDisableSTCSInL0(disabled);
}
public int getCoreViewBuildThreads()
{
return viewBuildExecutor.getCorePoolSize();
}
public void setCoreViewBuildThreads(int number)
{
viewBuildExecutor.setCorePoolSize(number);
}
public int getMaximumViewBuildThreads()
{
return viewBuildExecutor.getMaximumPoolSize();
}
public void setMaximumViewBuildThreads(int number)
{
viewBuildExecutor.setMaximumPoolSize(number);
}
public boolean getAutomaticSSTableUpgradeEnabled()
{
return DatabaseDescriptor.automaticSSTableUpgrade();
}
public void setAutomaticSSTableUpgradeEnabled(boolean enabled)
{
DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(enabled);
}
public int getMaxConcurrentAutoUpgradeTasks()
{
return DatabaseDescriptor.maxConcurrentAutoUpgradeTasks();
}
public void setMaxConcurrentAutoUpgradeTasks(int value)
{
try
{
DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(value);
}
catch (ConfigurationException e)
{
throw new RuntimeException(e.getMessage());
}
}
/**
* Try to stop all of the compactions for the given ColumnFamilies.
*
* Note that this method does not wait for all compactions to finish; you'll need to loop against
* isCompacting if you want that behavior.
*
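* <p>Illustrative usage sketch (the {@code metadatas} collection and the accept-all predicate are hypothetical):
* <pre>{@code
* // stop any compaction touching these tables, but leave validations running
* CompactionManager.instance.interruptCompactionFor(metadatas, sstable -> true, false);
* }</pre>
*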
* @param columnFamilies The ColumnFamilies to try to stop compaction upon.
* @param sstablePredicate the sstable predicate to match on
* @param interruptValidation true if validation operations for repair should also be interrupted
*/
public void interruptCompactionFor(Iterable<TableMetadata> columnFamilies, Predicate<SSTableReader> sstablePredicate, boolean interruptValidation)
{
assert columnFamilies != null;
// interrupt in-progress compactions
for (Holder compactionHolder : active.getCompactions())
{
CompactionInfo info = compactionHolder.getCompactionInfo();
if ((info.getTaskType() == OperationType.VALIDATION) && !interruptValidation)
continue;
if (info.getTableMetadata() == null || Iterables.contains(columnFamilies, info.getTableMetadata()))
{
if (info.shouldStop(sstablePredicate))
compactionHolder.stop();
}
}
}
public void interruptCompactionForCFs(Iterable<ColumnFamilyStore> cfss, Predicate<SSTableReader> sstablePredicate, boolean interruptValidation)
{
List<TableMetadata> metadata = new ArrayList<>();
for (ColumnFamilyStore cfs : cfss)
metadata.add(cfs.metadata());
interruptCompactionFor(metadata, sstablePredicate, interruptValidation);
}
public void waitForCessation(Iterable<ColumnFamilyStore> cfss, Predicate<SSTableReader> sstablePredicate)
{
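// Poll for up to one minute, sleeping 1ms between checks, until no matching compactions remain.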
long start = nanoTime();
long delay = TimeUnit.MINUTES.toNanos(1);
while (nanoTime() - start < delay)
{
if (CompactionManager.instance.isCompacting(cfss, sstablePredicate))
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
else
break;
}
}
public List<CompactionInfo> getSSTableTasks()
{
return active.getCompactions()
.stream()
.map(CompactionInfo.Holder::getCompactionInfo)
.filter(task -> task.getTaskType() != OperationType.COUNTER_CACHE_SAVE
&& task.getTaskType() != OperationType.KEY_CACHE_SAVE
&& task.getTaskType() != OperationType.ROW_CACHE_SAVE)
.collect(Collectors.toList());
}
/**
* Returns whether "global" compactions should be paused; used by ColumnFamilyStore#runWithCompactionsDisabled.
*
* A global compaction is one that includes several/all tables; currently only IndexSummaryBuilder.
*/
public boolean isGlobalCompactionPaused()
{
return globalCompactionPauseCount.get() > 0;
}
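// Illustrative usage sketch: the returned pauser is AutoCloseable and its close() undoes the pause,
// so a caller can scope the pause with try-with-resources, e.g.
//   try (CompactionPauser ignored = CompactionManager.instance.pauseGlobalCompaction())
//   {
//       // work that must not overlap with "global" compactions
//   }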
public CompactionPauser pauseGlobalCompaction()
{
CompactionPauser pauser = globalCompactionPauseCount::decrementAndGet;
globalCompactionPauseCount.incrementAndGet();
return pauser;
}
public interface CompactionPauser extends AutoCloseable
{
public void close();
}
}