blob: f4619f2a502ce418b00d143ced333657e78c02c1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.util.*;
import com.google.common.collect.Iterables;
import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
import org.apache.cassandra.db.columniterator.LazyColumnIterator;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.MergeIterator;
public class RowIteratorFactory
{
private static final Comparator<OnDiskAtomIterator> COMPARE_BY_KEY = new Comparator<OnDiskAtomIterator>()
{
public int compare(OnDiskAtomIterator o1, OnDiskAtomIterator o2)
{
return DecoratedKey.comparator.compare(o1.getKey(), o2.getKey());
}
};
/**
* Get a row iterator over the provided memtables and sstables, between the provided keys
* and filtered by the queryfilter.
* @param memtables Memtables pending flush.
* @param sstables SStables to scan through.
* @param range The data range to fetch
* @param cfs
* @return A row iterator following all the given restrictions
*/
public static CloseableIterator<Row> getIterator(final Iterable<Memtable> memtables,
final Collection<SSTableReader> sstables,
final DataRange range,
final ColumnFamilyStore cfs,
final long now)
{
// fetch data from current memtable, historical memtables, and SSTables in the correct order.
final List<CloseableIterator<OnDiskAtomIterator>> iterators = new ArrayList<>(Iterables.size(memtables) + sstables.size());
for (Memtable memtable : memtables)
iterators.add(new ConvertToColumnIterator(range, memtable.getEntryIterator(range.startKey(), range.stopKey())));
for (SSTableReader sstable : sstables)
iterators.add(sstable.getScanner(range));
// reduce rows from all sources into a single row
return MergeIterator.get(iterators, COMPARE_BY_KEY, new MergeIterator.Reducer<OnDiskAtomIterator, Row>()
{
private final int gcBefore = cfs.gcBefore(now);
private final List<OnDiskAtomIterator> colIters = new ArrayList<>();
private DecoratedKey key;
private ColumnFamily returnCF;
@Override
protected void onKeyChange()
{
this.returnCF = ArrayBackedSortedColumns.factory.create(cfs.metadata, range.columnFilter.isReversed());
}
public void reduce(OnDiskAtomIterator current)
{
this.colIters.add(current);
this.key = current.getKey();
this.returnCF.delete(current.getColumnFamily());
}
protected Row getReduced()
{
// First check if this row is in the rowCache. If it is and it covers our filter, we can skip the rest
ColumnFamily cached = cfs.getRawCachedRow(key);
IDiskAtomFilter filter = range.columnFilter(key.getKey());
if (cached == null || !cfs.isFilterFullyCoveredBy(filter, cached, now))
{
// not cached: collate
QueryFilter.collateOnDiskAtom(returnCF, colIters, filter, key, gcBefore, now);
}
else
{
QueryFilter keyFilter = new QueryFilter(key, cfs.name, filter, now);
returnCF = cfs.filterColumnFamily(cached, keyFilter);
}
Row rv = new Row(key, returnCF);
colIters.clear();
key = null;
return rv;
}
});
}
/**
* Get a ColumnIterator for a specific key in the memtable.
*/
private static class ConvertToColumnIterator implements CloseableIterator<OnDiskAtomIterator>
{
private final DataRange range;
private final Iterator<Map.Entry<DecoratedKey, ColumnFamily>> iter;
public ConvertToColumnIterator(DataRange range, Iterator<Map.Entry<DecoratedKey, ColumnFamily>> iter)
{
this.range = range;
this.iter = iter;
}
public boolean hasNext()
{
return iter.hasNext();
}
/*
* Note that when doing get_paged_slice, we reset the start of the queryFilter after we've fetched the
* first row. This means that this iterator should not use in any way the filter to fetch a row before
* we call next(). Which prevents us for using guava AbstractIterator.
* This is obviously rather fragile and we should consider refactoring that code, but such refactor will go
* deep into the storage engine code so this will have to do until then.
*/
public OnDiskAtomIterator next()
{
final Map.Entry<DecoratedKey, ColumnFamily> entry = iter.next();
return new LazyColumnIterator(entry.getKey(), new IColumnIteratorFactory()
{
public OnDiskAtomIterator create()
{
return range.columnFilter(entry.getKey().getKey()).getColumnIterator(entry.getKey(), entry.getValue());
}
});
}
public void remove()
{
throw new UnsupportedOperationException();
}
public void close()
{
// pass
}
}
}