/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;

import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.marshal.CounterColumnType;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.memory.HeapAllocator;
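
/**
 * Collates the data for a single partition read from the relevant memtables and sstables,
 * applying the query filter and tombstones, and tracks how many sstables were touched
 * (exposed via getSstablesIterated()).
 *
 * A minimal usage sketch (illustrative; this mirrors how ColumnFamilyStore drives a read):
 * <pre>{@code
 * CollationController controller = new CollationController(cfs, filter, gcBefore);
 * ColumnFamily result = controller.getTopLevelColumns(true);
 * int touched = controller.getSstablesIterated();
 * }</pre>
 */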
public class CollationController
{
private final ColumnFamilyStore cfs;
private final QueryFilter filter;
private final int gcBefore;
private int sstablesIterated = 0;

public CollationController(ColumnFamilyStore cfs, QueryFilter filter, int gcBefore)
{
this.cfs = cfs;
this.filter = filter;
this.gcBefore = gcBefore;
}

public ColumnFamily getTopLevelColumns(boolean copyOnHeap)
{
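// the time-ordered path is only safe when a newer cell fully supersedes an older one;
// counter cells are merged from shards spread across many sstables, so counter tables
// always take the collectAllData path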
return filter.filter instanceof NamesQueryFilter
&& cfs.metadata.getDefaultValidator() != CounterColumnType.instance
? collectTimeOrderedData(copyOnHeap)
: collectAllData(copyOnHeap);
}

/**
* Collects data in order of recency, using each sstable's maxTimestamp metadata.
* Once we have data for all requested columns that is newer than the newest remaining
* maxTimestamp, we stop.
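* For example, if the filter names cells {a, b} and the memtable already holds values for
* both that are newer than every sstable's maxTimestamp, we return before reading any sstable.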
*/
private ColumnFamily collectTimeOrderedData(boolean copyOnHeap)
{
final ColumnFamily container = ArrayBackedSortedColumns.factory.create(cfs.metadata, filter.filter.isReversed());
List<OnDiskAtomIterator> iterators = new ArrayList<>();
boolean isEmpty = true;
Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(filter.key));
DeletionInfo returnDeletionInfo = container.deletionInfo();
try
{
Tracing.trace("Merging memtable contents");
for (Memtable memtable : view.memtables)
{
ColumnFamily cf = memtable.getColumnFamily(filter.key);
if (cf != null)
{
filter.delete(container.deletionInfo(), cf);
isEmpty = false;
Iterator<Cell> iter = filter.getIterator(cf);
while (iter.hasNext())
{
Cell cell = iter.next();
if (copyOnHeap)
cell = cell.localCopy(cfs.metadata, HeapAllocator.instance);
container.addColumn(cell);
}
}
}
// avoid changing the filter columns of the original filter
// (reduceNameFilter removes columns that are known to be irrelevant)
NamesQueryFilter namesFilter = (NamesQueryFilter) filter.filter;
TreeSet<CellName> filterColumns = new TreeSet<>(namesFilter.columns);
QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns), filter.timestamp);
/* add the SSTables on disk */
Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
boolean onlyUnrepaired = true;
// read sorted sstables
for (SSTableReader sstable : view.sstables)
{
// if we've already seen a row tombstone with a timestamp greater
// than the most recent update to this sstable, we're done, since the rest of the sstables
// will also be older
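// (e.g. a partition-level deletion at timestamp 50 supersedes everything in any sstable
// whose newest write is older than 50)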
if (sstable.getMaxTimestamp() < returnDeletionInfo.getTopLevelDeletion().markedForDeleteAt)
break;
long currentMaxTs = sstable.getMaxTimestamp();
reduceNameFilter(reducedFilter, container, currentMaxTs);
if (((NamesQueryFilter) reducedFilter.filter).columns.isEmpty())
break;
if (sstable.isRepaired())
onlyUnrepaired = false;
Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation);
sstable.incrementReadCount();
OnDiskAtomIterator iter = reducedFilter.getSSTableColumnIterator(sstable);
iterators.add(iter);
isEmpty = false;
if (iter.getColumnFamily() != null)
{
container.delete(iter.getColumnFamily());
sstablesIterated++;
while (iter.hasNext())
container.addAtom(iter.next());
}
}
// we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently)
// and "there used to be data, but it's gone now" (we should cache the empty CF so we don't need to rebuild it the slow way)
if (isEmpty)
return null;
// do a final collate over the cells gathered in the container
ColumnFamily returnCF = container.cloneMeShallow();
Tracing.trace("Collating all results");
filter.collateOnDiskAtom(returnCF, container.iterator(), gcBefore);
// "hoist up" the requested data into a more recent sstable
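// (re-applying the merged result as a regular mutation puts the whole row into a single
// memtable, and hence eventually a single new sstable, so future reads touch fewer sstables)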
if (sstablesIterated > cfs.getMinimumCompactionThreshold()
&& onlyUnrepaired
&& !cfs.isAutoCompactionDisabled()
&& cfs.getCompactionStrategy().shouldDefragment())
{
// !!WARNING!! if we stop copying our data to a heap-managed object,
// we will need to track the lifetime of this mutation as well
Tracing.trace("Defragmenting requested data");
final Mutation mutation = new Mutation(cfs.keyspace.getName(), filter.key.getKey(), returnCF.cloneMe());
StageManager.getStage(Stage.MUTATION).execute(new Runnable()
{
public void run()
{
// skipping commitlog and index updates is fine since we're just de-fragmenting existing data
Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
}
});
}
// Caller is responsible for final removeDeletedCF. This is important for cacheRow to work correctly.
return returnCF;
}
finally
{
for (OnDiskAtomIterator iter : iterators)
FileUtils.closeQuietly(iter);
}
}

/**
 * Removes columns from the given filter for which the container already holds data newer than
 * sstableTimestamp, so older sstables need not be searched for them.
 *
 * @param filter the reduced query filter (must wrap a NamesQueryFilter)
 * @param container the results collected so far
 * @param sstableTimestamp the maxTimestamp of the next sstable to be read
 */
private void reduceNameFilter(QueryFilter filter, ColumnFamily container, long sstableTimestamp)
{
if (container == null)
return;
SearchIterator<CellName, Cell> searchIter = container.searchIterator();
for (Iterator<CellName> iterator = ((NamesQueryFilter) filter.filter).columns.iterator(); iterator.hasNext() && searchIter.hasNext(); )
{
CellName filterColumn = iterator.next();
Cell cell = searchIter.next(filterColumn);
if (cell != null && cell.timestamp() > sstableTimestamp)
iterator.remove();
}
}

/**
* Collects data the brute-force way: gets an iterator for the filter in question
* from every memtable and sstable, then merges them together.
*/
private ColumnFamily collectAllData(boolean copyOnHeap)
{
Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(filter.key));
List<Iterator<? extends OnDiskAtom>> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
ColumnFamily returnCF = ArrayBackedSortedColumns.factory.create(cfs.metadata, filter.filter.isReversed());
DeletionInfo returnDeletionInfo = returnCF.deletionInfo();
try
{
Tracing.trace("Merging memtable tombstones");
for (Memtable memtable : view.memtables)
{
final ColumnFamily cf = memtable.getColumnFamily(filter.key);
if (cf != null)
{
filter.delete(returnDeletionInfo, cf);
Iterator<Cell> iter = filter.getIterator(cf);
if (copyOnHeap)
{
iter = Iterators.transform(iter, new Function<Cell, Cell>()
{
public Cell apply(Cell cell)
{
return cell.localCopy(cf.metadata, HeapAllocator.instance);
}
});
}
iterators.add(iter);
}
}
/*
 * We can't eliminate full sstables based on the timestamp of what we've already read like
 * in collectTimeOrderedData, but we still want to eliminate sstables whose maxTimestamp is older
 * than the most recent row tombstone we've read. We still rely on the sstable ordering by
 * maxTimestamp: if
 *   maxTimestamp_s0 > maxTimestamp_s1,
 * then s1 (read second) cannot contain a row tombstone such that
 *   timestamp(tombstone) > maxTimestamp_s0,
 * since we necessarily have
 *   timestamp(tombstone) <= maxTimestamp_s1.
 * In other words, iterating in maxTimestamp order allows us to do our mostRecentTombstone
 * elimination in one pass, and minimizes the number of sstables for which we read a rowTombstone.
 */
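// Concrete illustration (hypothetical timestamps): if s0 (maxTimestamp=30) is read first and
// yields a row tombstone at 20, then s1 (maxTimestamp=10) and everything sorted after it are
// eliminated without being read, since their contents are all at timestamps <= 10 < 20.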
Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
List<SSTableReader> skippedSSTables = null;
long minTimestamp = Long.MAX_VALUE;
int nonIntersectingSSTables = 0;
for (SSTableReader sstable : view.sstables)
{
minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
// if we've already seen a row tombstone with a timestamp greater
// than the most recent update to this sstable, we can skip it
if (sstable.getMaxTimestamp() < returnDeletionInfo.getTopLevelDeletion().markedForDeleteAt)
break;
if (!filter.shouldInclude(sstable))
{
nonIntersectingSSTables++;
// sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE)
{
if (skippedSSTables == null)
skippedSSTables = new ArrayList<>();
skippedSSTables.add(sstable);
}
continue;
}
sstable.incrementReadCount();
OnDiskAtomIterator iter = filter.getSSTableColumnIterator(sstable);
iterators.add(iter);
if (iter.getColumnFamily() != null)
{
ColumnFamily cf = iter.getColumnFamily();
returnCF.delete(cf);
sstablesIterated++;
}
}
int includedDueToTombstones = 0;
// Check for row tombstone in the skipped sstables
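// (a skipped sstable whose maxTimestamp is <= the minimum data timestamp read above cannot
// hold a row tombstone that affects the result, so it is passed over below)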
if (skippedSSTables != null)
{
for (SSTableReader sstable : skippedSSTables)
{
if (sstable.getMaxTimestamp() <= minTimestamp)
continue;
sstable.incrementReadCount();
OnDiskAtomIterator iter = filter.getSSTableColumnIterator(sstable);
ColumnFamily cf = iter.getColumnFamily();
// we are only interested in row-level tombstones here, and only if markedForDeleteAt is larger than minTimestamp
if (cf != null && cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt > minTimestamp)
{
includedDueToTombstones++;
iterators.add(iter);
returnCF.delete(cf.deletionInfo().getTopLevelDeletion());
sstablesIterated++;
}
else
{
FileUtils.closeQuietly(iter);
}
}
}
if (Tracing.isTracing())
Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
// we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently)
// and "there used to be data, but it's gone now" (we should cache the empty CF so we don't need to rebuild it the slow way)
if (iterators.isEmpty())
return null;
Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated);
filter.collateOnDiskAtom(returnCF, iterators, gcBefore);
// Caller is responsible for final removeDeletedCF. This is important for cacheRow to work correctly.
return returnCF;
}
finally
{
for (Object iter : iterators)
if (iter instanceof Closeable)
FileUtils.closeQuietly((Closeable) iter);
}
}

public int getSstablesIterated()
{
return sstablesIterated;
}
}