blob: 86170a1050bbe043cec582d6e1406200bf41ad80 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.compaction;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.DiskBoundaries;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.notifications.*;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.service.ActiveRepairService;
/**
* Manages the compaction strategies.
*
* Currently has two instances of actual compaction strategies per data directory - one for repaired data and one for
* unrepaired data. This is done to be able to totally separate the different sets of sstables.
*
* Operations on this class are guarded by a {@link ReentrantReadWriteLock}. This lock performs mutual exclusion on
* reads and writes to the following variables: {@link this#repaired}, {@link this#unrepaired}, {@link this#isActive},
* {@link this#params}, {@link this#currentBoundaries}. Whenever performing reads on these variables,
* the {@link this#readLock} should be acquired. Likewise, updates to these variables should be guarded by
* {@link this#writeLock}.
*
* Whenever the {@link DiskBoundaries} change, the compaction strategies must be reloaded, so in order to ensure
* the compaction strategy placement reflect most up-to-date disk boundaries, call {@link this#maybeReloadDiskBoundaries()}
* before acquiring the read lock to acess the strategies.
*
*/
public class CompactionStrategyManager implements INotificationConsumer
{
private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
public final CompactionLogger compactionLogger;
private final ColumnFamilyStore cfs;
private final boolean partitionSSTablesByTokenRange;
private final Supplier<DiskBoundaries> boundariesSupplier;
/**
* Performs mutual exclusion on the variables below
*/
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
/**
* Variables guarded by read and write lock above
*/
//TODO check possibility of getting rid of these locks by encapsulating these in an immutable atomic object
private final List<AbstractCompactionStrategy> repaired = new ArrayList<>();
private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
private volatile CompactionParams params;
private DiskBoundaries currentBoundaries;
private volatile boolean enabled = true;
private volatile boolean isActive = true;
/**
We keep a copy of the schema compaction parameters here to be able to decide if we
should update the compaction strategy in {@link this#maybeReload(CFMetaData)} due to an ALTER.
If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
we will use the new compaction parameters.
**/
private volatile CompactionParams schemaCompactionParams;
private boolean shouldDefragment;
private boolean supportsEarlyOpen;
private int fanout;
public CompactionStrategyManager(ColumnFamilyStore cfs)
{
this(cfs, cfs::getDiskBoundaries, cfs.getPartitioner().splitter().isPresent());
}
@VisibleForTesting
public CompactionStrategyManager(ColumnFamilyStore cfs, Supplier<DiskBoundaries> boundariesSupplier,
boolean partitionSSTablesByTokenRange)
{
cfs.getTracker().subscribe(this);
logger.trace("{} subscribed to the data tracker.", this);
this.cfs = cfs;
this.compactionLogger = new CompactionLogger(cfs, this);
this.boundariesSupplier = boundariesSupplier;
this.partitionSSTablesByTokenRange = partitionSSTablesByTokenRange;
params = cfs.metadata.params.compaction;
enabled = params.isEnabled();
reload(cfs.metadata.params.compaction);
}
/**
* Return the next background task
*
* Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
*
*/
public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
if (!isEnabled())
return null;
List<AbstractCompactionStrategy> strategies = new ArrayList<>();
strategies.addAll(repaired);
strategies.addAll(unrepaired);
Collections.sort(strategies, (o1, o2) -> Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks()));
for (AbstractCompactionStrategy strategy : strategies)
{
AbstractCompactionTask task = strategy.getNextBackgroundTask(gcBefore);
if (task != null)
return task;
}
}
finally
{
readLock.unlock();
}
return null;
}
public boolean isEnabled()
{
return enabled && isActive;
}
public boolean isActive()
{
return isActive;
}
public void resume()
{
writeLock.lock();
try
{
isActive = true;
}
finally
{
writeLock.unlock();
}
}
/**
* pause compaction while we cancel all ongoing compactions
*
* Separate call from enable/disable to not have to save the enabled-state externally
*/
public void pause()
{
writeLock.lock();
try
{
isActive = false;
}
finally
{
writeLock.unlock();
}
}
private void startup()
{
writeLock.lock();
try
{
for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
{
if (sstable.openReason != SSTableReader.OpenReason.EARLY)
compactionStrategyFor(sstable).addSSTable(sstable);
}
repaired.forEach(AbstractCompactionStrategy::startup);
unrepaired.forEach(AbstractCompactionStrategy::startup);
shouldDefragment = repaired.get(0).shouldDefragment();
supportsEarlyOpen = repaired.get(0).supportsEarlyOpen();
fanout = (repaired.get(0) instanceof LeveledCompactionStrategy) ? ((LeveledCompactionStrategy) repaired.get(0)).getLevelFanoutSize() : LeveledCompactionStrategy.DEFAULT_LEVEL_FANOUT_SIZE;
}
finally
{
writeLock.unlock();
}
repaired.forEach(AbstractCompactionStrategy::startup);
unrepaired.forEach(AbstractCompactionStrategy::startup);
if (Stream.concat(repaired.stream(), unrepaired.stream()).anyMatch(cs -> cs.logAll))
compactionLogger.enable();
}
/**
* return the compaction strategy for the given sstable
*
* returns differently based on the repaired status and which vnode the compaction strategy belongs to
* @param sstable
* @return
*/
protected AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
{
maybeReloadDiskBoundaries();
return compactionStrategyFor(sstable);
}
@VisibleForTesting
protected AbstractCompactionStrategy compactionStrategyFor(SSTableReader sstable)
{
// should not call maybeReloadDiskBoundaries because it may be called from within lock
readLock.lock();
try
{
int index = compactionStrategyIndexFor(sstable);
if (sstable.isRepaired())
return repaired.get(index);
else
return unrepaired.get(index);
}
finally
{
readLock.unlock();
}
}
/**
* Get the correct compaction strategy for the given sstable. If the first token starts within a disk boundary, we
* will add it to that compaction strategy.
*
* In the case we are upgrading, the first compaction strategy will get most files - we do not care about which disk
* the sstable is on currently (unless we don't know the local tokens yet). Once we start compacting we will write out
* sstables in the correct locations and give them to the correct compaction strategy instance.
*
* @param sstable
* @return
*/
@VisibleForTesting
protected int compactionStrategyIndexFor(SSTableReader sstable)
{
// should not call maybeReload because it may be called from within lock
readLock.lock();
try
{
//We only have a single compaction strategy when sstables are not
//partitioned by token range
if (!partitionSSTablesByTokenRange)
return 0;
return currentBoundaries.getDiskIndex(sstable);
}
finally
{
readLock.unlock();
}
}
public void shutdown()
{
writeLock.lock();
try
{
isActive = false;
repaired.forEach(AbstractCompactionStrategy::shutdown);
unrepaired.forEach(AbstractCompactionStrategy::shutdown);
compactionLogger.disable();
}
finally
{
writeLock.unlock();
}
}
public void maybeReload(CFMetaData metadata)
{
// compare the old schema configuration to the new one, ignore any locally set changes.
if (metadata.params.compaction.equals(schemaCompactionParams))
return;
writeLock.lock();
try
{
// compare the old schema configuration to the new one, ignore any locally set changes.
if (metadata.params.compaction.equals(schemaCompactionParams))
return;
reload(metadata.params.compaction);
}
finally
{
writeLock.unlock();
}
}
/**
* Checks if the disk boundaries changed and reloads the compaction strategies
* to reflect the most up-to-date disk boundaries.
*
* This is typically called before acquiring the {@link this#readLock} to ensure the most up-to-date
* disk locations and boundaries are used.
*
* This should *never* be called inside by a thread holding the {@link this#readLock}, since it
* will potentially acquire the {@link this#writeLock} to update the compaction strategies
* what can cause a deadlock.
*/
//TODO improve this to reload after receiving a notification rather than trying to reload on every operation
@VisibleForTesting
protected boolean maybeReloadDiskBoundaries()
{
if (!currentBoundaries.isOutOfDate())
return false;
writeLock.lock();
try
{
if (!currentBoundaries.isOutOfDate())
return false;
reload(params);
return true;
}
finally
{
writeLock.unlock();
}
}
/**
* Reload the compaction strategies
*
* Called after changing configuration and at startup.
* @param newCompactionParams
*/
private void reload(CompactionParams newCompactionParams)
{
boolean enabledWithJMX = enabled && !shouldBeEnabled();
boolean disabledWithJMX = !enabled && shouldBeEnabled();
if (currentBoundaries != null)
{
if (!newCompactionParams.equals(schemaCompactionParams))
logger.debug("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
else if (currentBoundaries.isOutOfDate())
logger.debug("Recreating compaction strategy - disk boundaries are out of date for {}.{}.", cfs.keyspace.getName(), cfs.getTableName());
}
if (currentBoundaries == null || currentBoundaries.isOutOfDate())
currentBoundaries = boundariesSupplier.get();
setStrategy(newCompactionParams);
schemaCompactionParams = cfs.metadata.params.compaction;
if (disabledWithJMX || !shouldBeEnabled() && !enabledWithJMX)
disable();
else
enable();
startup();
}
public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
cfs.getTracker().replaceFlushed(memtable, sstables);
if (sstables != null && !sstables.isEmpty())
CompactionManager.instance.submitBackground(cfs);
}
public int getUnleveledSSTables()
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
{
int count = 0;
for (AbstractCompactionStrategy strategy : repaired)
count += ((LeveledCompactionStrategy) strategy).getLevelSize(0);
for (AbstractCompactionStrategy strategy : unrepaired)
count += ((LeveledCompactionStrategy) strategy).getLevelSize(0);
return count;
}
}
finally
{
readLock.unlock();
}
return 0;
}
public int getLevelFanoutSize()
{
return fanout;
}
public int[] getSSTableCountPerLevel()
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
{
int[] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
for (AbstractCompactionStrategy strategy : repaired)
{
int[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
res = sumArrays(res, repairedCountPerLevel);
}
for (AbstractCompactionStrategy strategy : unrepaired)
{
int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
res = sumArrays(res, unrepairedCountPerLevel);
}
return res;
}
}
finally
{
readLock.unlock();
}
return null;
}
private static int[] sumArrays(int[] a, int[] b)
{
int[] res = new int[Math.max(a.length, b.length)];
for (int i = 0; i < res.length; i++)
{
if (i < a.length && i < b.length)
res[i] = a[i] + b[i];
else if (i < a.length)
res[i] = a[i];
else
res[i] = b[i];
}
return res;
}
public boolean shouldDefragment()
{
return shouldDefragment;
}
public Directories getDirectories()
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
return repaired.get(0).getDirectories();
}
finally
{
readLock.unlock();
}
}
private void handleFlushNotification(Iterable<SSTableReader> added)
{
// If reloaded, SSTables will be placed in their correct locations
// so there is no need to process notification
if (maybeReloadDiskBoundaries())
return;
readLock.lock();
try
{
for (SSTableReader sstable : added)
compactionStrategyFor(sstable).addSSTable(sstable);
}
finally
{
readLock.unlock();
}
}
private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
{
// If reloaded, SSTables will be placed in their correct locations
// so there is no need to process notification
if (maybeReloadDiskBoundaries())
return;
readLock.lock();
try
{
// a bit of gymnastics to be able to replace sstables in compaction strategies
// we use this to know that a compaction finished and where to start the next compaction in LCS
int locationSize = partitionSSTablesByTokenRange? currentBoundaries.directories.size() : 1;
List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
for (int i = 0; i < locationSize; i++)
{
repairedRemoved.add(new HashSet<>());
repairedAdded.add(new HashSet<>());
unrepairedRemoved.add(new HashSet<>());
unrepairedAdded.add(new HashSet<>());
}
for (SSTableReader sstable : removed)
{
int i = compactionStrategyIndexFor(sstable);
if (sstable.isRepaired())
repairedRemoved.get(i).add(sstable);
else
unrepairedRemoved.get(i).add(sstable);
}
for (SSTableReader sstable : added)
{
int i = compactionStrategyIndexFor(sstable);
if (sstable.isRepaired())
repairedAdded.get(i).add(sstable);
else
unrepairedAdded.get(i).add(sstable);
}
for (int i = 0; i < locationSize; i++)
{
if (!repairedRemoved.get(i).isEmpty())
repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i));
else
repaired.get(i).addSSTables(repairedAdded.get(i));
if (!unrepairedRemoved.get(i).isEmpty())
unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), unrepairedAdded.get(i));
else
unrepaired.get(i).addSSTables(unrepairedAdded.get(i));
}
}
finally
{
readLock.unlock();
}
}
private void handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables)
{
// If reloaded, SSTables will be placed in their correct locations
// so there is no need to process notification
if (maybeReloadDiskBoundaries())
return;
// we need a write lock here since we move sstables from one strategy instance to another
readLock.lock();
try
{
for (SSTableReader sstable : sstables)
{
int index = compactionStrategyIndexFor(sstable);
if (sstable.isRepaired())
{
unrepaired.get(index).removeSSTable(sstable);
repaired.get(index).addSSTable(sstable);
}
else
{
repaired.get(index).removeSSTable(sstable);
unrepaired.get(index).addSSTable(sstable);
}
}
}
finally
{
readLock.unlock();
}
}
private void handleDeletingNotification(SSTableReader deleted)
{
// If reloaded, SSTables will be placed in their correct locations
// so there is no need to process notification
if (maybeReloadDiskBoundaries())
return;
readLock.lock();
try
{
compactionStrategyFor(deleted).removeSSTable(deleted);
}
finally
{
readLock.unlock();
}
}
public void handleNotification(INotification notification, Object sender)
{
if (notification instanceof SSTableAddedNotification)
{
handleFlushNotification(((SSTableAddedNotification) notification).added);
}
else if (notification instanceof SSTableListChangedNotification)
{
SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
handleListChangedNotification(listChangedNotification.added, listChangedNotification.removed);
}
else if (notification instanceof SSTableRepairStatusChanged)
{
handleRepairStatusChangedNotification(((SSTableRepairStatusChanged) notification).sstables);
}
else if (notification instanceof SSTableDeletingNotification)
{
handleDeletingNotification(((SSTableDeletingNotification) notification).deleting);
}
}
public void enable()
{
writeLock.lock();
try
{
if (repaired != null)
repaired.forEach(AbstractCompactionStrategy::enable);
if (unrepaired != null)
unrepaired.forEach(AbstractCompactionStrategy::enable);
// enable this last to make sure the strategies are ready to get calls.
enabled = true;
}
finally
{
writeLock.unlock();
}
}
public void disable()
{
writeLock.lock();
try
{
// disable this first avoid asking disabled strategies for compaction tasks
enabled = false;
if (repaired != null)
repaired.forEach(AbstractCompactionStrategy::disable);
if (unrepaired != null)
unrepaired.forEach(AbstractCompactionStrategy::disable);
}
finally
{
writeLock.unlock();
}
}
/**
* Create ISSTableScanners from the given sstables
*
* Delegates the call to the compaction strategies to allow LCS to create a scanner
* @param sstables
* @param ranges
* @return
*/
@SuppressWarnings("resource")
public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
assert repaired.size() == unrepaired.size();
List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
for (int i = 0; i < repaired.size(); i++)
{
repairedSSTables.add(new HashSet<>());
unrepairedSSTables.add(new HashSet<>());
}
for (SSTableReader sstable : sstables)
{
if (sstable.isRepaired())
repairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
else
unrepairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
}
List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
for (int i = 0; i < repairedSSTables.size(); i++)
{
if (!repairedSSTables.get(i).isEmpty())
scanners.addAll(repaired.get(i).getScanners(repairedSSTables.get(i), ranges).scanners);
}
for (int i = 0; i < unrepairedSSTables.size(); i++)
{
if (!unrepairedSSTables.get(i).isEmpty())
scanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), ranges).scanners);
}
return new AbstractCompactionStrategy.ScannerList(scanners);
}
finally
{
readLock.unlock();
}
}
public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
{
return getScanners(sstables, null);
}
public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
anticompactionGroups.addAll(unrepaired.get(group.getKey()).groupSSTablesForAntiCompaction(group.getValue()));
return anticompactionGroups;
}
finally
{
readLock.unlock();
}
}
public long getMaxSSTableBytes()
{
readLock.lock();
try
{
return unrepaired.get(0).getMaxSSTableBytes();
}
finally
{
readLock.unlock();
}
}
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
validateForCompaction(txn.originals());
return compactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
}
finally
{
readLock.unlock();
}
}
private void validateForCompaction(Iterable<SSTableReader> input)
{
readLock.lock();
try
{
SSTableReader firstSSTable = Iterables.getFirst(input, null);
assert firstSSTable != null;
boolean repaired = firstSSTable.isRepaired();
int firstIndex = compactionStrategyIndexFor(firstSSTable);
for (SSTableReader sstable : input)
{
if (sstable.isRepaired() != repaired)
throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
if (firstIndex != compactionStrategyIndexFor(sstable))
throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
}
}
finally
{
readLock.unlock();
}
}
public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
{
maybeReloadDiskBoundaries();
// runWithCompactionsDisabled cancels active compactions and disables them, then we are able
// to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
// sstables are marked the compactions are re-enabled
return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>()
{
@Override
public Collection<AbstractCompactionTask> call()
{
List<AbstractCompactionTask> tasks = new ArrayList<>();
readLock.lock();
try
{
for (AbstractCompactionStrategy strategy : repaired)
{
Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput);
if (task != null)
tasks.addAll(task);
}
for (AbstractCompactionStrategy strategy : unrepaired)
{
Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput);
if (task != null)
tasks.addAll(task);
}
}
finally
{
readLock.unlock();
}
if (tasks.isEmpty())
return null;
return tasks;
}
}, false, false);
}
/**
* Return a list of compaction tasks corresponding to the sstables requested. Split the sstables according
* to whether they are repaired or not, and by disk location. Return a task per disk location and repair status
* group.
*
* @param sstables the sstables to compact
* @param gcBefore gc grace period, throw away tombstones older than this
* @return a list of compaction tasks corresponding to the sstables requested
*/
public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore)
{
return getUserDefinedTasks(sstables, gcBefore, false);
}
public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore, boolean validateForCompaction)
{
maybeReloadDiskBoundaries();
List<AbstractCompactionTask> ret = new ArrayList<>();
readLock.lock();
try
{
if (validateForCompaction)
validateForCompaction(sstables);
Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
.filter(s -> !s.isMarkedSuspect() && s.isRepaired())
.collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
.filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
.collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())
ret.add(repaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
for (Map.Entry<Integer, List<SSTableReader>> group : unrepairedSSTables.entrySet())
ret.add(unrepaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
return ret;
}
finally
{
readLock.unlock();
}
}
/**
* @deprecated use {@link #getUserDefinedTasks(Collection, int)} instead.
*/
@Deprecated()
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
{
List<AbstractCompactionTask> tasks = getUserDefinedTasks(sstables, gcBefore, true);
assert tasks.size() == 1;
return tasks.get(0);
}
public int getEstimatedRemainingTasks()
{
maybeReloadDiskBoundaries();
int tasks = 0;
readLock.lock();
try
{
for (AbstractCompactionStrategy strategy : repaired)
tasks += strategy.getEstimatedRemainingTasks();
for (AbstractCompactionStrategy strategy : unrepaired)
tasks += strategy.getEstimatedRemainingTasks();
}
finally
{
readLock.unlock();
}
return tasks;
}
public boolean shouldBeEnabled()
{
return params.isEnabled();
}
public String getName()
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
return unrepaired.get(0).getName();
}
finally
{
readLock.unlock();
}
}
public List<List<AbstractCompactionStrategy>> getStrategies()
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
return Arrays.asList(repaired, unrepaired);
}
finally
{
readLock.unlock();
}
}
public void setNewLocalCompactionStrategy(CompactionParams params)
{
logger.info("Switching local compaction strategy from {} to {}}", this.params, params);
writeLock.lock();
try
{
setStrategy(params);
if (shouldBeEnabled())
enable();
else
disable();
startup();
}
finally
{
writeLock.unlock();
}
}
private void setStrategy(CompactionParams params)
{
repaired.forEach(AbstractCompactionStrategy::shutdown);
unrepaired.forEach(AbstractCompactionStrategy::shutdown);
repaired.clear();
unrepaired.clear();
if (partitionSSTablesByTokenRange)
{
for (int i = 0; i < currentBoundaries.directories.size(); i++)
{
repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
}
}
else
{
repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
}
this.params = params;
}
public CompactionParams getCompactionParams()
{
return params;
}
public boolean onlyPurgeRepairedTombstones()
{
return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
}
public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
long keyCount,
long repairedAt,
MetadataCollector collector,
SerializationHeader header,
Collection<Index> indexes,
LifecycleNewTracker lifecycleNewTracker)
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
{
return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, lifecycleNewTracker);
}
else
{
return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, lifecycleNewTracker);
}
}
finally
{
readLock.unlock();
}
}
public boolean isRepaired(AbstractCompactionStrategy strategy)
{
readLock.lock();
try
{
return repaired.contains(strategy);
}
finally
{
readLock.unlock();
}
}
public List<String> getStrategyFolders(AbstractCompactionStrategy strategy)
{
readLock.lock();
try
{
List<Directories.DataDirectory> locations = currentBoundaries.directories;
if (partitionSSTablesByTokenRange)
{
int unrepairedIndex = unrepaired.indexOf(strategy);
if (unrepairedIndex > 0)
{
return Collections.singletonList(locations.get(unrepairedIndex).location.getAbsolutePath());
}
int repairedIndex = repaired.indexOf(strategy);
if (repairedIndex > 0)
{
return Collections.singletonList(locations.get(repairedIndex).location.getAbsolutePath());
}
}
List<String> folders = new ArrayList<>(locations.size());
for (Directories.DataDirectory location : locations)
{
folders.add(location.location.getAbsolutePath());
}
return folders;
}
finally
{
readLock.unlock();
}
}
public boolean supportsEarlyOpen()
{
return supportsEarlyOpen;
}
}