blob: 95fc7b85b08f18420c96e0c99cc385c402af8e7c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.compaction;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import com.google.common.base.Preconditions;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.schema.CompactionParams;
/**
* Wrapper that's aware of how sstables are divided between separate strategies,
* and provides a standard interface to them
*
* not threadsafe, calls must be synchronized by caller
*/
public abstract class AbstractStrategyHolder
{
public static class TaskSupplier implements Comparable<TaskSupplier>
{
private final int numRemaining;
private final Supplier<AbstractCompactionTask> supplier;
TaskSupplier(int numRemaining, Supplier<AbstractCompactionTask> supplier)
{
this.numRemaining = numRemaining;
this.supplier = supplier;
}
public AbstractCompactionTask getTask()
{
return supplier.get();
}
public int compareTo(TaskSupplier o)
{
return o.numRemaining - numRemaining;
}
}
public static interface DestinationRouter
{
int getIndexForSSTable(SSTableReader sstable);
int getIndexForSSTableDirectory(Descriptor descriptor);
}
/**
* Maps sstables to their token partition bucket
*/
public static class GroupedSSTableContainer
{
private final AbstractStrategyHolder holder;
private final Set<SSTableReader>[] groups;
private GroupedSSTableContainer(AbstractStrategyHolder holder)
{
this.holder = holder;
Preconditions.checkArgument(holder.numTokenPartitions > 0, "numTokenPartitions not set");
groups = new Set[holder.numTokenPartitions];
}
void add(SSTableReader sstable)
{
Preconditions.checkArgument(holder.managesSSTable(sstable), "this strategy holder doesn't manage %s", sstable);
int idx = holder.router.getIndexForSSTable(sstable);
Preconditions.checkState(idx >= 0 && idx < holder.numTokenPartitions, "Invalid sstable index (%s) for %s", idx, sstable);
if (groups[idx] == null)
groups[idx] = new HashSet<>();
groups[idx].add(sstable);
}
public int numGroups()
{
return groups.length;
}
public Set<SSTableReader> getGroup(int i)
{
Preconditions.checkArgument(i >= 0 && i < groups.length);
Set<SSTableReader> group = groups[i];
return group != null ? group : Collections.emptySet();
}
boolean isGroupEmpty(int i)
{
return getGroup(i).isEmpty();
}
boolean isEmpty()
{
for (int i = 0; i < groups.length; i++)
if (!isGroupEmpty(i))
return false;
return true;
}
}
protected final ColumnFamilyStore cfs;
final DestinationRouter router;
private int numTokenPartitions = -1;
AbstractStrategyHolder(ColumnFamilyStore cfs, DestinationRouter router)
{
this.cfs = cfs;
this.router = router;
}
public abstract void startup();
public abstract void shutdown();
final void setStrategy(CompactionParams params, int numTokenPartitions)
{
Preconditions.checkArgument(numTokenPartitions > 0, "at least one token partition required");
shutdown();
this.numTokenPartitions = numTokenPartitions;
setStrategyInternal(params, numTokenPartitions);
}
protected abstract void setStrategyInternal(CompactionParams params, int numTokenPartitions);
/**
* SSTables are grouped by their repaired and pending repair status. This method determines if this holder
* holds the sstable for the given repaired/grouped statuses. Holders should be mutually exclusive in the
* groups they deal with. IOW, if one holder returns true for a given isRepaired/isPendingRepair combo,
* none of the others should.
*/
public abstract boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient);
public boolean managesSSTable(SSTableReader sstable)
{
return managesRepairedGroup(sstable.isRepaired(), sstable.isPendingRepair(), sstable.isTransient());
}
public abstract AbstractCompactionStrategy getStrategyFor(SSTableReader sstable);
public abstract Iterable<AbstractCompactionStrategy> allStrategies();
public abstract Collection<TaskSupplier> getBackgroundTaskSuppliers(int gcBefore);
public abstract Collection<AbstractCompactionTask> getMaximalTasks(int gcBefore, boolean splitOutput);
public abstract Collection<AbstractCompactionTask> getUserDefinedTasks(GroupedSSTableContainer sstables, int gcBefore);
public GroupedSSTableContainer createGroupedSSTableContainer()
{
return new GroupedSSTableContainer(this);
}
public abstract void addSSTables(GroupedSSTableContainer sstables);
public abstract void removeSSTables(GroupedSSTableContainer sstables);
public abstract void replaceSSTables(GroupedSSTableContainer removed, GroupedSSTableContainer added);
public abstract List<ISSTableScanner> getScanners(GroupedSSTableContainer sstables, Collection<Range<Token>> ranges);
public abstract SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
long keyCount,
long repairedAt,
UUID pendingRepair,
boolean isTransient,
MetadataCollector collector,
SerializationHeader header,
Collection<Index> indexes,
LifecycleNewTracker lifecycleNewTracker);
/**
* Return the directory index the given compaction strategy belongs to, or -1
* if it's not held by this holder
*/
public abstract int getStrategyIndex(AbstractCompactionStrategy strategy);
public abstract boolean containsSSTable(SSTableReader sstable);
}