/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.compaction;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import org.apache.cassandra.io.util.File;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.DiskBoundaries;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.AbstractStrategyHolder.TaskSupplier;
import org.apache.cassandra.db.compaction.PendingRepairManager.CleanupTask;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.notifications.INotification;
import org.apache.cassandra.notifications.INotificationConsumer;
import org.apache.cassandra.notifications.SSTableAddedNotification;
import org.apache.cassandra.notifications.SSTableDeletingNotification;
import org.apache.cassandra.notifications.SSTableListChangedNotification;
import org.apache.cassandra.notifications.SSTableMetadataChanged;
import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.TimeUUID;
import static org.apache.cassandra.db.compaction.AbstractStrategyHolder.GroupedSSTableContainer;
/**
* Manages the compaction strategies.
*
* SSTables are isolated from each other based on their incremental repair status (repaired, unrepaired, or pending repair)
* and directory (determined by their starting token). This class handles the routing between {@link AbstractStrategyHolder}
* instances based on repair status, and each {@link AbstractStrategyHolder} instance keeps a separate compaction strategy
* per directory and routes its sstables to the appropriate one. Note that {@link PendingRepairHolder} further divides
* sstables by their pending repair id.
*
* Operations on this class are guarded by a {@link ReentrantReadWriteLock}. This lock performs mutual exclusion on
* reads and writes to the following variables: {@link this#repaired}, {@link this#unrepaired}, {@link this#isActive},
* {@link this#params}, {@link this#currentBoundaries}. Whenever performing reads on these variables,
* the {@link this#readLock} should be acquired. Likewise, updates to these variables should be guarded by
* {@link this#writeLock}.
*
* Whenever the {@link DiskBoundaries} change, the compaction strategies must be reloaded, so in order to ensure
* the compaction strategy placement reflects the most up-to-date disk boundaries, call {@link this#maybeReloadDiskBoundaries()}
* before acquiring the read lock to access the strategies.
*
*/
public class CompactionStrategyManager implements INotificationConsumer
{
private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
public final CompactionLogger compactionLogger;
private final ColumnFamilyStore cfs;
private final boolean partitionSSTablesByTokenRange;
private final Supplier<DiskBoundaries> boundariesSupplier;
/**
* Performs mutual exclusion on the variables below
*/
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
/**
* Variables guarded by read and write lock above
*/
private final PendingRepairHolder transientRepairs;
private final PendingRepairHolder pendingRepairs;
private final CompactionStrategyHolder repaired;
private final CompactionStrategyHolder unrepaired;
private final ImmutableList<AbstractStrategyHolder> holders;
private volatile CompactionParams params;
private DiskBoundaries currentBoundaries;
private volatile boolean enabled;
private volatile boolean isActive = true;
/*
We keep a copy of the schema compaction parameters here to be able to decide if we
should update the compaction strategy in maybeReload() due to an ALTER.
If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
we will use the new compaction parameters.
*/
private volatile CompactionParams schemaCompactionParams;
private volatile boolean supportsEarlyOpen;
private volatile int fanout;
private volatile long maxSSTableSizeBytes;
private volatile String name;
public static int TWCS_BUCKET_COUNT_MAX = 128;
public CompactionStrategyManager(ColumnFamilyStore cfs)
{
this(cfs, cfs::getDiskBoundaries, cfs.getPartitioner().splitter().isPresent());
}
@VisibleForTesting
public CompactionStrategyManager(ColumnFamilyStore cfs, Supplier<DiskBoundaries> boundariesSupplier,
boolean partitionSSTablesByTokenRange)
{
AbstractStrategyHolder.DestinationRouter router = new AbstractStrategyHolder.DestinationRouter()
{
public int getIndexForSSTable(SSTableReader sstable)
{
return compactionStrategyIndexFor(sstable);
}
public int getIndexForSSTableDirectory(Descriptor descriptor)
{
return compactionStrategyIndexForDirectory(descriptor);
}
};
transientRepairs = new PendingRepairHolder(cfs, router, true);
pendingRepairs = new PendingRepairHolder(cfs, router, false);
repaired = new CompactionStrategyHolder(cfs, router, true);
unrepaired = new CompactionStrategyHolder(cfs, router, false);
holders = ImmutableList.of(transientRepairs, pendingRepairs, repaired, unrepaired);
cfs.getTracker().subscribe(this);
logger.trace("{} subscribed to the data tracker.", this);
this.cfs = cfs;
this.compactionLogger = new CompactionLogger(cfs, this);
this.boundariesSupplier = boundariesSupplier;
this.partitionSSTablesByTokenRange = partitionSSTablesByTokenRange;
params = cfs.metadata().params.compaction;
enabled = params.isEnabled();
reload(cfs.metadata().params.compaction);
}
/**
* Return the next background task
*
* Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
*/
public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
if (!isEnabled())
return null;
int numPartitions = getNumTokenPartitions();
// first try to promote/demote sstables from completed repairs
AbstractCompactionTask repairFinishedTask;
repairFinishedTask = pendingRepairs.getNextRepairFinishedTask();
if (repairFinishedTask != null)
return repairFinishedTask;
repairFinishedTask = transientRepairs.getNextRepairFinishedTask();
if (repairFinishedTask != null)
return repairFinishedTask;
// sort compaction task suppliers by remaining tasks descending
List<TaskSupplier> suppliers = new ArrayList<>(numPartitions * holders.size());
for (AbstractStrategyHolder holder : holders)
suppliers.addAll(holder.getBackgroundTaskSuppliers(gcBefore));
Collections.sort(suppliers);
// return the first non-null task
for (TaskSupplier supplier : suppliers)
{
AbstractCompactionTask task = supplier.getTask();
if (task != null)
return task;
}
return null;
}
finally
{
readLock.unlock();
}
}
/**
* Finds the oldest (by modification date) non-latest-version sstable on disk and creates an upgrade task for it.
* @return an upgrade task for the oldest such sstable that could be marked compacting, or null if automatic
* upgrades are disabled or no candidate is available
*/
@VisibleForTesting
@SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
AbstractCompactionTask findUpgradeSSTableTask()
{
if (!isEnabled() || !DatabaseDescriptor.automaticSSTableUpgrade())
return null;
Set<SSTableReader> compacting = cfs.getTracker().getCompacting();
List<SSTableReader> potentialUpgrade = cfs.getLiveSSTables()
.stream()
.filter(s -> !compacting.contains(s) && !s.descriptor.version.isLatestVersion())
.sorted((o1, o2) -> {
File f1 = new File(o1.descriptor.filenameFor(Component.DATA));
File f2 = new File(o2.descriptor.filenameFor(Component.DATA));
return Longs.compare(f1.lastModified(), f2.lastModified());
}).collect(Collectors.toList());
for (SSTableReader sstable : potentialUpgrade)
{
LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.UPGRADE_SSTABLES);
if (txn != null)
{
logger.debug("Running automatic sstable upgrade for {}", sstable);
return getCompactionStrategyFor(sstable).getCompactionTask(txn, Integer.MIN_VALUE, Long.MAX_VALUE);
}
}
return null;
}
public boolean isEnabled()
{
return enabled && isActive;
}
public boolean isActive()
{
return isActive;
}
public void resume()
{
writeLock.lock();
try
{
isActive = true;
}
finally
{
writeLock.unlock();
}
}
/**
* pause compaction while we cancel all ongoing compactions
*
* Separate call from enable/disable to not have to save the enabled-state externally
*/
public void pause()
{
writeLock.lock();
try
{
isActive = false;
}
finally
{
writeLock.unlock();
}
}
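/**
* Adds all live, non-early-opened sstables to their compaction strategies, starts the strategy holders and
* caches per-strategy properties (early-open support, LCS fanout, max sstable size, strategy name) so hot
* read paths can use them without taking the lock.
*/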
private void startup()
{
writeLock.lock();
try
{
for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
{
if (sstable.openReason != SSTableReader.OpenReason.EARLY)
compactionStrategyFor(sstable).addSSTable(sstable);
}
holders.forEach(AbstractStrategyHolder::startup);
supportsEarlyOpen = repaired.first().supportsEarlyOpen();
fanout = (repaired.first() instanceof LeveledCompactionStrategy) ? ((LeveledCompactionStrategy) repaired.first()).getLevelFanoutSize() : LeveledCompactionStrategy.DEFAULT_LEVEL_FANOUT_SIZE;
maxSSTableSizeBytes = repaired.first().getMaxSSTableBytes();
name = repaired.first().getName();
}
finally
{
writeLock.unlock();
}
if (repaired.first().logAll)
compactionLogger.enable();
}
/**
* Return the compaction strategy for the given sstable
*
* The strategy is chosen based on the sstable's repair status and the disk (token range) it belongs to.
* @param sstable the sstable to find a strategy for
* @return the compaction strategy instance managing the sstable
*/
public AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
{
maybeReloadDiskBoundaries();
return compactionStrategyFor(sstable);
}
@VisibleForTesting
AbstractCompactionStrategy compactionStrategyFor(SSTableReader sstable)
{
// should not call maybeReloadDiskBoundaries because it may be called from within lock
readLock.lock();
try
{
return getHolder(sstable).getStrategyFor(sstable);
}
finally
{
readLock.unlock();
}
}
/**
* Get the index of the correct compaction strategy for the given sstable. If the sstable's first token falls within
* a disk boundary, the sstable is assigned to that disk's compaction strategy.
*
* In the case we are upgrading, the first compaction strategy will get most files - we do not care about which disk
* the sstable is on currently (unless we don't know the local tokens yet). Once we start compacting we will write out
* sstables in the correct locations and give them to the correct compaction strategy instance.
*
* @param sstable the sstable to locate
* @return the index of the strategy (disk) the sstable belongs to, or 0 when sstables are not partitioned by token range
*/
int compactionStrategyIndexFor(SSTableReader sstable)
{
// should not call maybeReloadDiskBoundaries because it may be called from within lock
readLock.lock();
try
{
//We only have a single compaction strategy when sstables are not
//partitioned by token range
if (!partitionSSTablesByTokenRange)
return 0;
return currentBoundaries.getDiskIndex(sstable);
}
finally
{
readLock.unlock();
}
}
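/**
* Get the compaction strategy index for an sstable directory, based on which disk boundary the directory
* belongs to; always 0 when sstables are not partitioned by token range.
*/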
private int compactionStrategyIndexForDirectory(Descriptor descriptor)
{
readLock.lock();
try
{
return partitionSSTablesByTokenRange ? currentBoundaries.getBoundariesFromSSTableDirectory(descriptor) : 0;
}
finally
{
readLock.unlock();
}
}
@VisibleForTesting
CompactionStrategyHolder getRepairedUnsafe()
{
return repaired;
}
@VisibleForTesting
CompactionStrategyHolder getUnrepairedUnsafe()
{
return unrepaired;
}
@VisibleForTesting
PendingRepairHolder getPendingRepairsUnsafe()
{
return pendingRepairs;
}
@VisibleForTesting
PendingRepairHolder getTransientRepairsUnsafe()
{
return transientRepairs;
}
public boolean hasDataForPendingRepair(TimeUUID sessionID)
{
readLock.lock();
try
{
return pendingRepairs.hasDataForSession(sessionID) || transientRepairs.hasDataForSession(sessionID);
}
finally
{
readLock.unlock();
}
}
public void shutdown()
{
writeLock.lock();
try
{
isActive = false;
holders.forEach(AbstractStrategyHolder::shutdown);
compactionLogger.disable();
}
finally
{
writeLock.unlock();
}
}
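/**
* Reloads the compaction strategies iff the compaction parameters in the given table metadata differ from the
* schema parameters we last applied; locally (e.g. JMX) configured strategies are otherwise left untouched.
*/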
public void maybeReload(TableMetadata metadata)
{
// compare the old schema configuration to the new one, ignore any locally set changes.
if (metadata.params.compaction.equals(schemaCompactionParams))
return;
writeLock.lock();
try
{
// compare the old schema configuration to the new one, ignore any locally set changes.
if (metadata.params.compaction.equals(schemaCompactionParams))
return;
reload(metadata.params.compaction);
}
finally
{
writeLock.unlock();
}
}
/**
* Checks if the disk boundaries changed and reloads the compaction strategies
* to reflect the most up-to-date disk boundaries.
*
* This is typically called before acquiring the {@link this#readLock} to ensure the most up-to-date
* disk locations and boundaries are used.
*
* This should *never* be called by a thread holding the {@link this#readLock}, since it
* may acquire the {@link this#writeLock} to update the compaction strategies,
* which can cause a deadlock.
*/
//TODO improve this to reload after receiving a notification rather than trying to reload on every operation
@VisibleForTesting
protected void maybeReloadDiskBoundaries()
{
if (!currentBoundaries.isOutOfDate())
return;
writeLock.lock();
try
{
if (!currentBoundaries.isOutOfDate())
return;
reload(params);
}
finally
{
writeLock.unlock();
}
}
/**
* Reload the compaction strategies
*
* Called after changing configuration and at startup.
* @param newCompactionParams
*/
private void reload(CompactionParams newCompactionParams)
{
boolean enabledWithJMX = enabled && !shouldBeEnabled();
boolean disabledWithJMX = !enabled && shouldBeEnabled();
if (currentBoundaries != null)
{
if (!newCompactionParams.equals(schemaCompactionParams))
logger.debug("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
else if (currentBoundaries.isOutOfDate())
logger.debug("Recreating compaction strategy - disk boundaries are out of date for {}.{}.", cfs.keyspace.getName(), cfs.getTableName());
}
if (currentBoundaries == null || currentBoundaries.isOutOfDate())
currentBoundaries = boundariesSupplier.get();
setStrategy(newCompactionParams);
schemaCompactionParams = cfs.metadata().params.compaction;
if (disabledWithJMX || !shouldBeEnabled() && !enabledWithJMX)
disable();
else
enable();
startup();
}
private Iterable<AbstractCompactionStrategy> getAllStrategies()
{
return Iterables.concat(Iterables.transform(holders, AbstractStrategyHolder::allStrategies));
}
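/**
* @return the total number of sstables in level 0 across all strategy instances, or 0 if LCS is not in use
*/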
public int getUnleveledSSTables()
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
if (repaired.first() instanceof LeveledCompactionStrategy)
{
int count = 0;
for (AbstractCompactionStrategy strategy : getAllStrategies())
count += ((LeveledCompactionStrategy) strategy).getLevelSize(0);
return count;
}
}
finally
{
readLock.unlock();
}
return 0;
}
public int getLevelFanoutSize()
{
return fanout;
}
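/**
* @return the combined sstable count per LCS level across all strategy instances, or null if LCS is not in use
*/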
public int[] getSSTableCountPerLevel()
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
if (repaired.first() instanceof LeveledCompactionStrategy)
{
int[] res = new int[LeveledGenerations.MAX_LEVEL_COUNT];
for (AbstractCompactionStrategy strategy : getAllStrategies())
{
int[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
res = sumArrays(res, repairedCountPerLevel);
}
return res;
}
}
finally
{
readLock.unlock();
}
return null;
}
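/**
* @return the combined on-disk size in bytes per LCS level across all strategy instances, or null if LCS is not in use
*/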
public long[] getPerLevelSizeBytes()
{
readLock.lock();
try
{
if (repaired.first() instanceof LeveledCompactionStrategy)
{
long [] res = new long[LeveledGenerations.MAX_LEVEL_COUNT];
for (AbstractCompactionStrategy strategy : getAllStrategies())
{
long[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSizeBytes();
res = sumArrays(res, repairedCountPerLevel);
}
return res;
}
return null;
}
finally
{
readLock.unlock();
}
}
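/**
* @return the combined sstable count per TWCS bucket (newest bucket first, at most {@link #TWCS_BUCKET_COUNT_MAX}
* entries) across the repaired and unrepaired strategies, or null if TWCS is not in use
*/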
public int[] getSSTableCountPerTWCSBucket()
{
readLock.lock();
try
{
List<Map<Long, Integer>> countsByBucket = Stream.concat(
StreamSupport.stream(repaired.allStrategies().spliterator(), false),
StreamSupport.stream(unrepaired.allStrategies().spliterator(), false))
.filter((TimeWindowCompactionStrategy.class)::isInstance)
.map(s -> ((TimeWindowCompactionStrategy)s).getSSTableCountByBuckets())
.collect(Collectors.toList());
return countsByBucket.isEmpty() ? null : sumCountsByBucket(countsByBucket, TWCS_BUCKET_COUNT_MAX);
}
finally
{
readLock.unlock();
}
}
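/**
* Merges per-bucket counts from multiple TWCS instances, summing counts for identical bucket keys, and returns
* at most {@code max} totals ordered from the newest bucket to the oldest.
*/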
static int[] sumCountsByBucket(List<Map<Long, Integer>> countsByBucket, int max)
{
TreeMap<Long, Integer> merged = new TreeMap<>(Comparator.reverseOrder());
countsByBucket.stream().flatMap(e -> e.entrySet().stream()).forEach(e -> merged.merge(e.getKey(), e.getValue(), Integer::sum));
return merged.values().stream().limit(max).mapToInt(i -> i).toArray();
}
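/**
* Element-wise sum of two arrays that may differ in length; the result has the length of the longer array.
*/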
static int[] sumArrays(int[] a, int[] b)
{
int[] res = new int[Math.max(a.length, b.length)];
for (int i = 0; i < res.length; i++)
{
if (i < a.length && i < b.length)
res[i] = a[i] + b[i];
else if (i < a.length)
res[i] = a[i];
else
res[i] = b[i];
}
return res;
}
static long[] sumArrays(long[] a, long[] b)
{
long[] res = new long[Math.max(a.length, b.length)];
for (int i = 0; i < res.length; i++)
{
if (i < a.length && i < b.length)
res[i] = a[i] + b[i];
else if (i < a.length)
res[i] = a[i];
else
res[i] = b[i];
}
return res;
}
/**
* Should only be called holding the readLock
*/
private void handleFlushNotification(Iterable<SSTableReader> added)
{
for (SSTableReader sstable : added)
compactionStrategyFor(sstable).addSSTable(sstable);
}
private int getHolderIndex(SSTableReader sstable)
{
for (int i = 0; i < holders.size(); i++)
{
if (holders.get(i).managesSSTable(sstable))
return i;
}
throw new IllegalStateException("No holder claimed " + sstable);
}
private AbstractStrategyHolder getHolder(SSTableReader sstable)
{
for (AbstractStrategyHolder holder : holders)
{
if (holder.managesSSTable(sstable))
return holder;
}
throw new IllegalStateException("No holder claimed " + sstable);
}
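/**
* Maps sstable repair metadata to the holder responsible for it (repaired, unrepaired, pending or transient repair).
*/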
private AbstractStrategyHolder getHolder(long repairedAt, TimeUUID pendingRepair, boolean isTransient)
{
return getHolder(repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE,
pendingRepair != ActiveRepairService.NO_PENDING_REPAIR,
isTransient);
}
@VisibleForTesting
AbstractStrategyHolder getHolder(boolean isRepaired, boolean isPendingRepair, boolean isTransient)
{
for (AbstractStrategyHolder holder : holders)
{
if (holder.managesRepairedGroup(isRepaired, isPendingRepair, isTransient))
return holder;
}
throw new IllegalStateException(String.format("No holder claimed isPendingRepair: %s, isPendingRepair %s",
isRepaired, isPendingRepair));
}
@VisibleForTesting
ImmutableList<AbstractStrategyHolder> getHolders()
{
return holders;
}
/**
* Split sstables into a list of grouped sstable containers; the list index of the group an sstable
* lands in matches the list index of the holder that is responsible for it.
*/
public List<GroupedSSTableContainer> groupSSTables(Iterable<SSTableReader> sstables)
{
List<GroupedSSTableContainer> classified = new ArrayList<>(holders.size());
for (AbstractStrategyHolder holder : holders)
{
classified.add(holder.createGroupedSSTableContainer());
}
for (SSTableReader sstable : sstables)
{
classified.get(getHolderIndex(sstable)).add(sstable);
}
return classified;
}
/**
* Should only be called holding the readLock
*/
private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
{
List<GroupedSSTableContainer> addedGroups = groupSSTables(added);
List<GroupedSSTableContainer> removedGroups = groupSSTables(removed);
for (int i=0; i<holders.size(); i++)
{
holders.get(i).replaceSSTables(removedGroups.get(i), addedGroups.get(i));
}
}
/**
* Should only be called holding the readLock
*/
private void handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables)
{
List<GroupedSSTableContainer> groups = groupSSTables(sstables);
for (int i = 0; i < holders.size(); i++)
{
GroupedSSTableContainer group = groups.get(i);
if (group.isEmpty())
continue;
AbstractStrategyHolder dstHolder = holders.get(i);
for (AbstractStrategyHolder holder : holders)
{
if (holder != dstHolder)
holder.removeSSTables(group);
}
// Adding an sstable to another strategy may change its level, after which it could no longer be found
// (and removed) in the original LCS instance - so remove it from all other holders before adding it.
dstHolder.addSSTables(group);
}
}
/**
* Should only be called holding the readLock
*/
private void handleMetadataChangedNotification(SSTableReader sstable, StatsMetadata oldMetadata)
{
AbstractCompactionStrategy acs = getCompactionStrategyFor(sstable);
acs.metadataChanged(oldMetadata, sstable);
}
/**
* Should only be called holding the readLock
*/
private void handleDeletingNotification(SSTableReader deleted)
{
compactionStrategyFor(deleted).removeSSTable(deleted);
}
public void handleNotification(INotification notification, Object sender)
{
// we might race with reload adding/removing the sstables, which means that compaction strategies
// must handle double notifications.
maybeReloadDiskBoundaries();
readLock.lock();
try
{
if (notification instanceof SSTableAddedNotification)
{
SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
handleFlushNotification(flushedNotification.added);
}
else if (notification instanceof SSTableListChangedNotification)
{
SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
handleListChangedNotification(listChangedNotification.added, listChangedNotification.removed);
}
else if (notification instanceof SSTableRepairStatusChanged)
{
handleRepairStatusChangedNotification(((SSTableRepairStatusChanged) notification).sstables);
}
else if (notification instanceof SSTableDeletingNotification)
{
handleDeletingNotification(((SSTableDeletingNotification) notification).deleting);
}
else if (notification instanceof SSTableMetadataChanged)
{
SSTableMetadataChanged lcNotification = (SSTableMetadataChanged) notification;
handleMetadataChangedNotification(lcNotification.sstable, lcNotification.oldMetadata);
}
}
finally
{
readLock.unlock();
}
}
public void enable()
{
writeLock.lock();
try
{
// enable this last to make sure the strategies are ready to get calls.
enabled = true;
}
finally
{
writeLock.unlock();
}
}
public void disable()
{
writeLock.lock();
try
{
enabled = false;
}
finally
{
writeLock.unlock();
}
}
/**
* Create ISSTableScanners from the given sstables
*
* Delegates the call to the compaction strategies to allow LCS to create a scanner
* @param sstables the sstables to scan
* @param ranges token ranges to restrict the scanners to, or null to scan the sstables fully
* @return a ScannerList over the given sstables
*/
@SuppressWarnings("resource")
public AbstractCompactionStrategy.ScannerList maybeGetScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
{
maybeReloadDiskBoundaries();
List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
readLock.lock();
try
{
List<GroupedSSTableContainer> sstableGroups = groupSSTables(sstables);
for (int i = 0; i < holders.size(); i++)
{
AbstractStrategyHolder holder = holders.get(i);
GroupedSSTableContainer group = sstableGroups.get(i);
scanners.addAll(holder.getScanners(group, ranges));
}
}
catch (PendingRepairManager.IllegalSSTableArgumentException e)
{
ISSTableScanner.closeAllAndPropagate(scanners, new ConcurrentModificationException(e));
}
finally
{
readLock.unlock();
}
return new AbstractCompactionStrategy.ScannerList(scanners);
}
public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
{
while (true)
{
try
{
return maybeGetScanners(sstables, ranges);
}
catch (ConcurrentModificationException e)
{
logger.debug("SSTable repairedAt/pendingRepaired values changed while getting scanners");
}
}
}
public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
{
return getScanners(sstables, null);
}
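/**
* Groups the given sstables for anticompaction; since anticompaction only operates on unrepaired sstables,
* this delegates to the unrepaired holder.
*/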
public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
return unrepaired.groupForAnticompaction(sstablesToGroup);
}
finally
{
readLock.unlock();
}
}
public long getMaxSSTableBytes()
{
return maxSSTableSizeBytes;
}
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
validateForCompaction(txn.originals());
return compactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
}
finally
{
readLock.unlock();
}
}
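/**
* Verifies that all sstables in the proposed compaction share the same repair status, disk (directory) index and
* pending repair session, throwing UnsupportedOperationException otherwise.
*/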
private void validateForCompaction(Iterable<SSTableReader> input)
{
readLock.lock();
try
{
SSTableReader firstSSTable = Iterables.getFirst(input, null);
assert firstSSTable != null;
boolean repaired = firstSSTable.isRepaired();
int firstIndex = compactionStrategyIndexFor(firstSSTable);
boolean isPending = firstSSTable.isPendingRepair();
TimeUUID pendingRepair = firstSSTable.getSSTableMetadata().pendingRepair;
for (SSTableReader sstable : input)
{
if (sstable.isRepaired() != repaired)
throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
if (firstIndex != compactionStrategyIndexFor(sstable))
throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
if (isPending && !pendingRepair.equals(sstable.getSSTableMetadata().pendingRepair))
throw new UnsupportedOperationException("You can't compact sstables from different pending repair sessions");
}
}
finally
{
readLock.unlock();
}
}
public CompactionTasks getMaximalTasks(final int gcBefore, final boolean splitOutput)
{
maybeReloadDiskBoundaries();
// runWithCompactionsDisabled cancels active compactions and disables them, then we are able
// to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
// sstables are marked the compactions are re-enabled
return cfs.runWithCompactionsDisabled(() -> {
List<AbstractCompactionTask> tasks = new ArrayList<>();
readLock.lock();
try
{
for (AbstractStrategyHolder holder : holders)
{
tasks.addAll(holder.getMaximalTasks(gcBefore, splitOutput));
}
}
finally
{
readLock.unlock();
}
return CompactionTasks.create(tasks);
}, false, false);
}
/**
* Return a list of compaction tasks corresponding to the sstables requested. Split the sstables according
* to whether they are repaired or not, and by disk location. Return a task per disk location and repair status
* group.
*
* @param sstables the sstables to compact
* @param gcBefore throw away tombstones older than this timestamp (in seconds, typically now minus gc grace seconds)
* @return a list of compaction tasks corresponding to the sstables requested
*/
public CompactionTasks getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore)
{
maybeReloadDiskBoundaries();
List<AbstractCompactionTask> ret = new ArrayList<>();
readLock.lock();
try
{
List<GroupedSSTableContainer> groupedSSTables = groupSSTables(sstables);
for (int i = 0; i < holders.size(); i++)
{
ret.addAll(holders.get(i).getUserDefinedTasks(groupedSSTables.get(i), gcBefore));
}
return CompactionTasks.create(ret);
}
finally
{
readLock.unlock();
}
}
public int getEstimatedRemainingTasks()
{
maybeReloadDiskBoundaries();
int tasks = 0;
readLock.lock();
try
{
for (AbstractCompactionStrategy strategy : getAllStrategies())
tasks += strategy.getEstimatedRemainingTasks();
}
finally
{
readLock.unlock();
}
return tasks;
}
public boolean shouldBeEnabled()
{
return params.isEnabled();
}
public String getName()
{
return name;
}
public List<List<AbstractCompactionStrategy>> getStrategies()
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
return Arrays.asList(Lists.newArrayList(repaired.allStrategies()),
Lists.newArrayList(unrepaired.allStrategies()),
Lists.newArrayList(pendingRepairs.allStrategies()));
}
finally
{
readLock.unlock();
}
}
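/**
* Switches to a locally configured compaction strategy (e.g. set via JMX) without altering the schema; the local
* strategy stays in effect until the schema compaction parameters are changed (see {@link #maybeReload(TableMetadata)}).
*/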
public void setNewLocalCompactionStrategy(CompactionParams params)
{
logger.info("Switching local compaction strategy from {} to {}}", this.params, params);
writeLock.lock();
try
{
setStrategy(params);
if (shouldBeEnabled())
enable();
else
disable();
startup();
}
finally
{
writeLock.unlock();
}
}
private int getNumTokenPartitions()
{
return partitionSSTablesByTokenRange ? currentBoundaries.directories.size() : 1;
}
private void setStrategy(CompactionParams params)
{
int numPartitions = getNumTokenPartitions();
for (AbstractStrategyHolder holder : holders)
holder.setStrategy(params, numPartitions);
this.params = params;
}
public CompactionParams getCompactionParams()
{
return params;
}
public boolean onlyPurgeRepairedTombstones()
{
return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
}
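/**
* Creates an SSTableMultiWriter through the holder that matches the repair metadata (repaired, pending, transient)
* of the sstable about to be written.
*/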
public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
long keyCount,
long repairedAt,
TimeUUID pendingRepair,
boolean isTransient,
MetadataCollector collector,
SerializationHeader header,
Collection<Index> indexes,
LifecycleNewTracker lifecycleNewTracker)
{
SSTable.validateRepairedMetadata(repairedAt, pendingRepair, isTransient);
maybeReloadDiskBoundaries();
readLock.lock();
try
{
return getHolder(repairedAt, pendingRepair, isTransient).createSSTableMultiWriter(descriptor,
keyCount,
repairedAt,
pendingRepair,
isTransient,
collector,
header,
indexes,
lifecycleNewTracker);
}
finally
{
readLock.unlock();
}
}
public boolean isRepaired(AbstractCompactionStrategy strategy)
{
return repaired.getStrategyIndex(strategy) >= 0;
}
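/**
* Returns the data directory owned by the given strategy instance when sstables are partitioned by token range;
* falls back to all writeable data directories otherwise.
*/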
public List<String> getStrategyFolders(AbstractCompactionStrategy strategy)
{
readLock.lock();
try
{
Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
if (partitionSSTablesByTokenRange)
{
for (AbstractStrategyHolder holder : holders)
{
int idx = holder.getStrategyIndex(strategy);
if (idx >= 0)
return Collections.singletonList(locations[idx].location.absolutePath());
}
}
List<String> folders = new ArrayList<>(locations.length);
for (Directories.DataDirectory location : locations)
{
folders.add(location.location.absolutePath());
}
return folders;
}
finally
{
readLock.unlock();
}
}
public boolean supportsEarlyOpen()
{
return supportsEarlyOpen;
}
@VisibleForTesting
List<PendingRepairManager> getPendingRepairManagers()
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
return Lists.newArrayList(pendingRepairs.getManagers());
}
finally
{
readLock.unlock();
}
}
/**
* Mutates sstable repairedAt times and notifies listeners of the change with the writeLock held. Prevents races
* with other processes between when the metadata is changed and when sstables are moved between strategies.
*/
public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, TimeUUID pendingRepair, boolean isTransient) throws IOException
{
Set<SSTableReader> changed = new HashSet<>();
writeLock.lock();
try
{
for (SSTableReader sstable: sstables)
{
sstable.mutateRepairedAndReload(repairedAt, pendingRepair, isTransient);
verifyMetadata(sstable, repairedAt, pendingRepair, isTransient);
changed.add(sstable);
}
}
finally
{
try
{
// if there was an exception mutating repairedAt, we should still notify for the
// sstables that we were able to modify successfully before releasing the lock
cfs.getTracker().notifySSTableRepairedStatusChanged(changed);
}
finally
{
writeLock.unlock();
}
}
}
private static void verifyMetadata(SSTableReader sstable, long repairedAt, TimeUUID pendingRepair, boolean isTransient)
{
if (!Objects.equals(pendingRepair, sstable.getPendingRepair()))
throw new IllegalStateException(String.format("Failed setting pending repair to %s on %s (pending repair is %s)", pendingRepair, sstable, sstable.getPendingRepair()));
if (repairedAt != sstable.getRepairedAt())
throw new IllegalStateException(String.format("Failed setting repairedAt to %d on %s (repairedAt is %d)", repairedAt, sstable, sstable.getRepairedAt()));
if (isTransient != sstable.isTransient())
throw new IllegalStateException(String.format("Failed setting isTransient to %b on %s (isTransient is %b)", isTransient, sstable, sstable.isTransient()));
}
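/**
* Releases the sstable data held for the given repair sessions by the pending and transient repair managers and
* runs the resulting cleanup tasks, returning a combined summary.
*/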
public CleanupSummary releaseRepairData(Collection<TimeUUID> sessions)
{
List<CleanupTask> cleanupTasks = new ArrayList<>();
readLock.lock();
try
{
for (PendingRepairManager prm : Iterables.concat(pendingRepairs.getManagers(), transientRepairs.getManagers()))
cleanupTasks.add(prm.releaseSessionData(sessions));
}
finally
{
readLock.unlock();
}
CleanupSummary summary = new CleanupSummary(cfs, Collections.emptySet(), Collections.emptySet());
for (CleanupTask task : cleanupTasks)
summary = CleanupSummary.add(summary, task.cleanup());
return summary;
}
}