blob: caf375b956ea7134f3309df42f4befa75366b06a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.index.composites;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
import org.apache.cassandra.db.index.PerColumnSecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Secondary-index searcher for CQL3 composite ("sparse composite") indexes.
 *
 * Index rows are keyed by the indexed value; each index column name is a composite of
 * (base partition key, clustering prefix), where {@code prefixSize} components of the
 * base comparator form the clustering prefix. Searching picks the most selective EQ
 * predicate, pages through its index row, and re-reads + re-validates each matching
 * logical row from the base column family.
 */
public class CompositesSearcher extends SecondaryIndexSearcher
{
    private static final Logger logger = LoggerFactory.getLogger(CompositesSearcher.class);

    // Number of leading components of the base comparator that make up the clustering
    // prefix stored in each index entry (the component after the partition key; the
    // remaining component of a base column name is the actual CQL3 column name).
    private final int prefixSize;

    public CompositesSearcher(SecondaryIndexManager indexManager, Set<ByteBuffer> columns, int prefixSize)
    {
        super(indexManager, columns);
        this.prefixSize = prefixSize;
    }

    /**
     * Returns the EQ expression of {@code clause} expected to be the most selective,
     * i.e. whose index has the fewest mean columns per index row, or {@code null} if
     * no expression of the clause is an EQ predicate indexed by this searcher.
     */
    private IndexExpression highestSelectivityPredicate(List<IndexExpression> clause)
    {
        IndexExpression best = null;
        int bestMeanCount = Integer.MAX_VALUE;
        for (IndexExpression expression : clause)
        {
            // Skip columns belonging to a different index type
            if (!columns.contains(expression.column_name))
                continue;

            SecondaryIndex index = indexManager.getIndexForColumn(expression.column_name);
            // Only EQ predicates can be answered directly by the index
            if (index == null || (expression.op != IndexOperator.EQ))
                continue;

            // Fewer mean columns per index row implies a more selective index.
            // (Renamed from 'columns', which shadowed the inherited Set<ByteBuffer> field.)
            int meanColumns = index.getIndexCfs().getMeanColumns();
            if (meanColumns < bestMeanCount)
            {
                best = expression;
                bestMeanCount = meanColumns;
            }
        }
        return best;
    }

    /** Returns true if at least one expression of {@code clause} can be served by this searcher. */
    public boolean isIndexing(List<IndexExpression> clause)
    {
        return highestSelectivityPredicate(clause) != null;
    }

    @Override
    public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean maxIsColumns)
    {
        assert clause != null && !clause.isEmpty();
        ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, maxIsColumns, false);
        return baseCfs.filter(getIndexedIterator(range, filter), filter);
    }

    /**
     * Builds an iterator over the base-table rows matching {@code filter} within {@code range},
     * driven by the most selective indexed EQ predicate of the filter's clause.
     */
    public ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
    {
        // Start with the most-restrictive indexed clause, then apply remaining clauses
        // to each row matching that clause.
        // TODO: allow merge join instead of just one index + loop
        final IndexExpression primary = highestSelectivityPredicate(filter.getClause());
        final SecondaryIndex index = indexManager.getIndexForColumn(primary.column_name);
        assert index != null;
        final DecoratedKey indexKey = index.getIndexKeyFor(primary.value);

        /*
         * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
         * the indexed row unfortunately (which will be inefficient), because we have no way to intuit the smallest
         * possible key having a given token. A fix would be to actually store the token along the key in the
         * indexed row.
         */
        ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
        ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;

        final CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
        final CompositeType indexComparator = (CompositeType)index.getIndexCfs().getComparator();

        // Translate the base-table key range into a slice of the index row:
        // index entry names are (base key, clustering prefix) composites.
        CompositeType.Builder builder = null;
        if (startKey.remaining() > 0)
        {
            builder = indexComparator.builder().add(startKey);
            // For names filter, we have no choice but to query from the beginning of the key. This can be highly inefficient however.
            if (filter.originalFilter() instanceof SliceQueryFilter)
            {
                ByteBuffer[] components = baseComparator.split(((SliceQueryFilter)filter.originalFilter()).start());
                for (int i = 0; i < Math.min(prefixSize, components.length); ++i)
                    builder.add(components[i]);
            }
        }
        final ByteBuffer startPrefix = startKey.remaining() == 0 ? ByteBufferUtil.EMPTY_BYTE_BUFFER : builder.build();

        if (endKey.remaining() > 0)
        {
            builder = indexComparator.builder().add(endKey);
            // For names filter, we have no choice but to query until the end of the key. This can be highly inefficient however.
            if (filter.originalFilter() instanceof SliceQueryFilter)
            {
                ByteBuffer[] components = baseComparator.split(((SliceQueryFilter)filter.originalFilter()).finish());
                for (int i = 0; i < Math.min(prefixSize, components.length); ++i)
                    builder.add(components[i]);
            }
        }
        final ByteBuffer endPrefix = endKey.remaining() == 0 ? ByteBufferUtil.EMPTY_BYTE_BUFFER : builder.buildAsEndOfRange();

        // We will need to filter clustering keys based on the user filter. If
        // it is a names filter, we are really interested in the clustering
        // part, not the actual column name (NOTE: this is a hack that assumes CQL3).
        final SliceQueryFilter originalFilter;
        if (filter.originalFilter() instanceof SliceQueryFilter)
        {
            originalFilter = (SliceQueryFilter)filter.originalFilter();
        }
        else
        {
            // Names filter: derive a slice covering the clustering prefix of the first name.
            ByteBuffer first = ((NamesQueryFilter)filter.originalFilter()).columns.iterator().next();
            ByteBuffer[] components = baseComparator.split(first);
            builder = baseComparator.builder();
            // Add all except the last component, since it's the column name
            for (int i = 0; i < components.length - 1; i++)
                builder.add(components[i]);
            originalFilter = new SliceQueryFilter(builder.copy().build(), builder.copy().buildAsEndOfRange(), false, Integer.MAX_VALUE);
        }

        return new ColumnFamilyStore.AbstractScanIterator()
        {
            // Where the next index page starts; initially the same object as startPrefix
            // (identity is used below to detect whether we have paged at least once).
            private ByteBuffer lastSeenPrefix = startPrefix;
            private Deque<IColumn> indexColumns;
            private final QueryPath path = new QueryPath(baseCfs.columnFamily);
            // Number of index entries returned by the last page read; MAX_VALUE forces the first read.
            private int columnsRead = Integer.MAX_VALUE;
            private final int meanColumns = Math.max(index.getIndexCfs().getMeanColumns(), 1);
            // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses
            private final int rowsPerQuery = Math.max(Math.min(filter.maxRows(), filter.maxColumns() / meanColumns), 2);

            public boolean needsFiltering()
            {
                // Rows are fully validated here, so CFS.filter need not re-filter them.
                return false;
            }

            private Row makeReturn(DecoratedKey key, ColumnFamily data)
            {
                if (data == null)
                {
                    return endOfData();
                }
                else
                {
                    assert key != null;
                    return new Row(key, data);
                }
            }

            protected Row computeNext()
            {
                /*
                 * Our internal index code is wired toward internal rows. So we need to accumulate all results for a given
                 * row before returning from this method. Which unfortunately means that this method has to do what
                 * CFS.filter does for KeysIndex.
                 */
                DecoratedKey currentKey = null;
                ColumnFamily data = null;
                int columnsCount = 0;
                int limit = ((SliceQueryFilter)filter.initialFilter()).count;

                while (true)
                {
                    // Did we get more columns than needed to respect the user limit?
                    // (but we still need to return what was fetched already)
                    if (columnsCount > limit)
                        return makeReturn(currentKey, data);

                    if (indexColumns == null || indexColumns.isEmpty())
                    {
                        // A short page means the previous read exhausted the index row.
                        if (columnsRead < rowsPerQuery)
                        {
                            logger.trace("Read only {} (< {}) last page through, must be done", columnsRead, rowsPerQuery);
                            return makeReturn(currentKey, data);
                        }

                        if (logger.isTraceEnabled() && (index instanceof AbstractSimplePerColumnSecondaryIndex))
                            logger.trace("Scanning index {} starting with {}",
                                         ((AbstractSimplePerColumnSecondaryIndex)index).expressionString(primary), indexComparator.getString(lastSeenPrefix));

                        QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
                                                                             new QueryPath(index.getIndexCfs().getColumnFamilyName()),
                                                                             lastSeenPrefix,
                                                                             endPrefix,
                                                                             false,
                                                                             rowsPerQuery);
                        ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
                        if (indexRow == null)
                            return makeReturn(currentKey, data);

                        Collection<IColumn> sortedColumns = indexRow.getSortedColumns();
                        columnsRead = sortedColumns.size();
                        indexColumns = new ArrayDeque<IColumn>(sortedColumns);
                        IColumn firstColumn = sortedColumns.iterator().next();

                        // Paging is racy, so it is possible the first column of a page is not the last seen one.
                        // (Reference inequality with startPrefix means we have paged at least once.)
                        if (lastSeenPrefix != startPrefix && lastSeenPrefix.equals(firstColumn.name()))
                        {
                            // skip the row we already saw w/ the last page of results
                            indexColumns.poll();
                            logger.trace("Skipping {}", indexComparator.getString(firstColumn.name()));
                        }
                        else if (range instanceof Range && !indexColumns.isEmpty() && firstColumn.name().equals(startPrefix))
                        {
                            // skip key excluded by range (Range is left-exclusive)
                            indexColumns.poll();
                            logger.trace("Skipping first key as range excludes it");
                        }
                    }

                    while (!indexColumns.isEmpty() && columnsCount <= limit)
                    {
                        IColumn column = indexColumns.poll();
                        lastSeenPrefix = column.name();
                        if (column.isMarkedForDelete())
                        {
                            logger.trace("skipping {}", column.name());
                            continue;
                        }

                        // Index entry name = (base partition key, clustering prefix)
                        ByteBuffer[] components = indexComparator.split(lastSeenPrefix);
                        DecoratedKey dk = baseCfs.partitioner.decorateKey(components[0]);

                        // Are we done for this row?
                        if (currentKey == null)
                        {
                            currentKey = dk;
                        }
                        else if (!currentKey.equals(dk))
                        {
                            DecoratedKey previousKey = currentKey;
                            currentKey = dk;

                            // We're done with the previous row, return it if it had data, continue otherwise
                            indexColumns.addFirst(column);
                            if (data == null)
                                continue;
                            else
                                return makeReturn(previousKey, data);
                        }

                        if (!range.right.isMinimum(baseCfs.partitioner) && range.right.compareTo(dk) < 0)
                        {
                            logger.trace("Reached end of assigned scan range");
                            return endOfData();
                        }
                        if (!range.contains(dk))
                        {
                            logger.debug("Skipping entry {} outside of assigned scan range", dk.token);
                            continue;
                        }

                        logger.trace("Adding index hit to current row for {}", indexComparator.getString(lastSeenPrefix));
                        // For sparse composites, we're good querying the whole logical row
                        // Obviously if this index is used for other usage, that might be inefficient
                        CompositeType.Builder builder = baseComparator.builder();
                        for (int i = 0; i < prefixSize; i++)
                            builder.add(components[i + 1]);

                        // Does this "row" match the user original filter
                        ByteBuffer start = builder.copy().build();
                        if (!originalFilter.includes(baseComparator, start))
                            continue;

                        SliceQueryFilter dataFilter = new SliceQueryFilter(start, builder.copy().buildAsEndOfRange(), false, Integer.MAX_VALUE);
                        ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, path, dataFilter));
                        if (newData != null)
                        {
                            ByteBuffer baseColumnName = builder.copy().add(primary.column_name).build();
                            ByteBuffer indexedValue = indexKey.key;

                            // The base row may have been updated since the index entry was written;
                            // if so, clean up the stale entry and skip the row.
                            if (isIndexValueStale(newData, baseColumnName, indexedValue))
                            {
                                // delete the index entry w/ its own timestamp
                                IColumn dummyColumn = new Column(baseColumnName, indexedValue, column.timestamp());
                                ((PerColumnSecondaryIndex) index).delete(dk.key, dummyColumn);
                                continue;
                            }

                            // Re-check the remaining (non-primary) clauses against the fetched data
                            if (!filter.isSatisfiedBy(newData, builder))
                                continue;

                            if (data == null)
                                data = ColumnFamily.create(baseCfs.metadata);
                            data.resolve(newData);
                            columnsCount += newData.getLiveColumnCount();
                        }
                    }
                }
            }

            public void close() throws IOException {}
        };
    }
}