blob: 03d41117450343dd3df9cb6b5725a70131e1b940 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.compaction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.service.ActiveRepairService;
public class PendingRepairHolder extends AbstractStrategyHolder
{
private final List<PendingRepairManager> managers = new ArrayList<>();
private final boolean isTransient;
public PendingRepairHolder(ColumnFamilyStore cfs, DestinationRouter router, boolean isTransient)
{
super(cfs, router);
this.isTransient = isTransient;
}
@Override
public void startup()
{
managers.forEach(PendingRepairManager::startup);
}
@Override
public void shutdown()
{
managers.forEach(PendingRepairManager::shutdown);
}
@Override
public void setStrategyInternal(CompactionParams params, int numTokenPartitions)
{
managers.clear();
for (int i = 0; i < numTokenPartitions; i++)
managers.add(new PendingRepairManager(cfs, params, isTransient));
}
@Override
public boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient)
{
Preconditions.checkArgument(!isPendingRepair || !isRepaired,
"SSTables cannot be both repaired and pending repair");
return isPendingRepair && (this.isTransient == isTransient);
}
@Override
public AbstractCompactionStrategy getStrategyFor(SSTableReader sstable)
{
Preconditions.checkArgument(managesSSTable(sstable), "Attempting to get compaction strategy from wrong holder");
return managers.get(router.getIndexForSSTable(sstable)).getOrCreate(sstable);
}
@Override
public Iterable<AbstractCompactionStrategy> allStrategies()
{
return Iterables.concat(Iterables.transform(managers, PendingRepairManager::getStrategies));
}
Iterable<AbstractCompactionStrategy> getStrategiesFor(UUID session)
{
List<AbstractCompactionStrategy> strategies = new ArrayList<>(managers.size());
for (PendingRepairManager manager : managers)
{
AbstractCompactionStrategy strategy = manager.get(session);
if (strategy != null)
strategies.add(strategy);
}
return strategies;
}
public Iterable<PendingRepairManager> getManagers()
{
return managers;
}
@Override
public Collection<TaskSupplier> getBackgroundTaskSuppliers(int gcBefore)
{
List<TaskSupplier> suppliers = new ArrayList<>(managers.size());
for (PendingRepairManager manager : managers)
suppliers.add(new TaskSupplier(manager.getMaxEstimatedRemainingTasks(), () -> manager.getNextBackgroundTask(gcBefore)));
return suppliers;
}
@Override
public Collection<AbstractCompactionTask> getMaximalTasks(int gcBefore, boolean splitOutput)
{
List<AbstractCompactionTask> tasks = new ArrayList<>(managers.size());
for (PendingRepairManager manager : managers)
{
Collection<AbstractCompactionTask> task = manager.getMaximalTasks(gcBefore, splitOutput);
if (task != null)
tasks.addAll(task);
}
return tasks;
}
@Override
public Collection<AbstractCompactionTask> getUserDefinedTasks(GroupedSSTableContainer sstables, int gcBefore)
{
List<AbstractCompactionTask> tasks = new ArrayList<>(managers.size());
for (int i = 0; i < managers.size(); i++)
{
if (sstables.isGroupEmpty(i))
continue;
tasks.addAll(managers.get(i).createUserDefinedTasks(sstables.getGroup(i), gcBefore));
}
return tasks;
}
AbstractCompactionTask getNextRepairFinishedTask()
{
List<TaskSupplier> repairFinishedSuppliers = getRepairFinishedTaskSuppliers();
if (!repairFinishedSuppliers.isEmpty())
{
Collections.sort(repairFinishedSuppliers);
for (TaskSupplier supplier : repairFinishedSuppliers)
{
AbstractCompactionTask task = supplier.getTask();
if (task != null)
return task;
}
}
return null;
}
private ArrayList<TaskSupplier> getRepairFinishedTaskSuppliers()
{
ArrayList<TaskSupplier> suppliers = new ArrayList<>(managers.size());
for (PendingRepairManager manager : managers)
{
int numPending = manager.getNumPendingRepairFinishedTasks();
if (numPending > 0)
{
suppliers.add(new TaskSupplier(numPending, manager::getNextRepairFinishedTask));
}
}
return suppliers;
}
@Override
public void addSSTables(GroupedSSTableContainer sstables)
{
Preconditions.checkArgument(sstables.numGroups() == managers.size());
for (int i = 0; i < managers.size(); i++)
{
if (!sstables.isGroupEmpty(i))
managers.get(i).addSSTables(sstables.getGroup(i));
}
}
@Override
public void removeSSTables(GroupedSSTableContainer sstables)
{
Preconditions.checkArgument(sstables.numGroups() == managers.size());
for (int i = 0; i < managers.size(); i++)
{
if (!sstables.isGroupEmpty(i))
managers.get(i).removeSSTables(sstables.getGroup(i));
}
}
@Override
public void replaceSSTables(GroupedSSTableContainer removed, GroupedSSTableContainer added)
{
Preconditions.checkArgument(removed.numGroups() == managers.size());
Preconditions.checkArgument(added.numGroups() == managers.size());
for (int i = 0; i < managers.size(); i++)
{
if (removed.isGroupEmpty(i) && added.isGroupEmpty(i))
continue;
if (removed.isGroupEmpty(i))
managers.get(i).addSSTables(added.getGroup(i));
else
managers.get(i).replaceSSTables(removed.getGroup(i), added.getGroup(i));
}
}
@Override
public List<ISSTableScanner> getScanners(GroupedSSTableContainer sstables, Collection<Range<Token>> ranges)
{
List<ISSTableScanner> scanners = new ArrayList<>(managers.size());
for (int i = 0; i < managers.size(); i++)
{
if (sstables.isGroupEmpty(i))
continue;
scanners.addAll(managers.get(i).getScanners(sstables.getGroup(i), ranges));
}
return scanners;
}
@Override
public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
long keyCount,
long repairedAt,
UUID pendingRepair,
boolean isTransient,
MetadataCollector collector,
SerializationHeader header,
Collection<Index> indexes,
LifecycleNewTracker lifecycleNewTracker)
{
Preconditions.checkArgument(repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE,
"PendingRepairHolder can't create sstablewriter with repaired at set");
Preconditions.checkArgument(pendingRepair != null,
"PendingRepairHolder can't create sstable writer without pendingRepair id");
// to avoid creating a compaction strategy for the wrong pending repair manager, we get the index based on where the sstable is to be written
AbstractCompactionStrategy strategy = managers.get(router.getIndexForSSTableDirectory(descriptor)).getOrCreate(pendingRepair);
return strategy.createSSTableMultiWriter(descriptor,
keyCount,
repairedAt,
pendingRepair,
isTransient,
collector,
header,
indexes,
lifecycleNewTracker);
}
@Override
public int getStrategyIndex(AbstractCompactionStrategy strategy)
{
for (int i = 0; i < managers.size(); i++)
{
if (managers.get(i).hasStrategy(strategy))
return i;
}
return -1;
}
public boolean hasDataForSession(UUID sessionID)
{
return Iterables.any(managers, prm -> prm.hasDataForSession(sessionID));
}
@Override
public boolean containsSSTable(SSTableReader sstable)
{
return Iterables.any(managers, prm -> prm.containsSSTable(sstable));
}
}