blob: 2114d4249e0925a49d89620fe9e467596041b989 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.index.internal.keys;
import java.nio.ByteBuffer;
import java.util.NoSuchElementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.index.internal.CassandraIndex;
import org.apache.cassandra.index.internal.CassandraIndexSearcher;
import org.apache.cassandra.schema.TableMetadata;
/**
 * Index searcher for keys indexes.
 * <p>
 * Every row read from the index is turned into a single-partition read of the base table: the first
 * clustering component of the index row is decorated as a base table partition key and that partition
 * is queried. Index entries that no longer match the base data are deleted and skipped.
 */
public class KeysSearcher extends CassandraIndexSearcher
{
    private static final Logger logger = LoggerFactory.getLogger(KeysSearcher.class);

    /**
     * @param command    the read command being served through the index
     * @param expression the row filter expression handled by the index
     * @param indexer    the index to search
     */
    public KeysSearcher(ReadCommand command,
                        RowFilter.Expression expression,
                        CassandraIndex indexer)
    {
        super(command, expression, indexer);
    }

    /**
     * Builds an iterator over the base table partitions referenced by the supplied index hits.
     * <p>
     * Hits are resolved lazily, one at a time. A hit produces no partition when its key is not selected
     * by {@code command}, when the index entry is found to be stale, or when the base table read is empty.
     *
     * @param indexKey            key of the index partition being read; its raw bytes are used as the indexed value
     *                            when checking for staleness
     * @param indexHits           rows read from the index; must not carry a static row. Closed when the returned
     *                            iterator is closed
     * @param command             supplies the key selection, column filter, row filter, clustering filter and
     *                            {@code nowInSec} used for each base table read
     * @param executionController controller under which the base table is read and from which the write context
     *                            for stale-entry deletions is obtained
     * @return an iterator over the non-empty, non-stale base table partitions matching the index hits
     */
    protected UnfilteredPartitionIterator queryDataFromIndex(final DecoratedKey indexKey,
                                                             final RowIterator indexHits,
                                                             final ReadCommand command,
                                                             final ReadExecutionController executionController)
    {
        assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;

        return new UnfilteredPartitionIterator()
        {
            // Partition prepared by prepareNext() but not yet handed out by next(); null when none is pending.
            private UnfilteredRowIterator next;

            public TableMetadata metadata()
            {
                return command.metadata();
            }

            public boolean hasNext()
            {
                return prepareNext();
            }

            /**
             * @throws NoSuchElementException if there is no further partition to return
             */
            public UnfilteredRowIterator next()
            {
                // Honour the Iterator contract instead of handing out null once the hits are exhausted.
                if (next == null && !prepareNext())
                    throw new NoSuchElementException();

                UnfilteredRowIterator toReturn = next;
                next = null;
                return toReturn;
            }

            /**
             * Advances through the index hits until one yields a partition to return, or the hits run out.
             *
             * @return true if a partition is pending in {@code next}
             */
            private boolean prepareNext()
            {
                while (next == null && indexHits.hasNext())
                {
                    Row hit = indexHits.next();
                    DecoratedKey key = index.baseCfs.decorateKey(hit.clustering().bufferAt(0));
                    if (!command.selectsKey(key))
                        continue;

                    // The indexed column must be fetched so that the staleness check can be performed.
                    ColumnFilter extendedFilter = getExtendedFilter(command.columnFilter());
                    SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata(),
                                                                                           command.nowInSec(),
                                                                                           extendedFilter,
                                                                                           command.rowFilter(),
                                                                                           DataLimits.NONE,
                                                                                           key,
                                                                                           command.clusteringIndexFilter(key),
                                                                                           null);

                    @SuppressWarnings("resource") // filterIfStale closes the iterator itself when it returns null.
                                                  // Otherwise we close it right away if it is empty; if it is assigned to
                                                  // next, it is closed either by whoever consumes it from next(), or by
                                                  // close() on this iterator if that happens first.
                    UnfilteredRowIterator dataIter = filterIfStale(dataCmd.queryMemtableAndDisk(index.baseCfs, executionController),
                                                                   hit,
                                                                   indexKey.getKey(),
                                                                   executionController.getWriteContext(),
                                                                   command.nowInSec());

                    if (dataIter != null)
                    {
                        if (dataIter.isEmpty())
                            dataIter.close();
                        else
                            next = dataIter;
                    }
                }
                return next != null;
            }

            public void remove()
            {
                throw new UnsupportedOperationException();
            }

            public void close()
            {
                try
                {
                    indexHits.close();
                }
                finally
                {
                    // Release a prepared-but-unconsumed partition even if closing the index hits failed.
                    if (next != null)
                        next.close();
                }
            }
        };
    }

    /**
     * Returns a column filter that is guaranteed to fetch the indexed column.
     *
     * @param initialFilter the filter requested by the query
     * @return {@code initialFilter} itself if it already fetches the indexed column, otherwise a new selection
     *         filter over the columns fetched by {@code initialFilter} plus the indexed column
     */
    private ColumnFilter getExtendedFilter(ColumnFilter initialFilter)
    {
        // Test the filter we were given, so the check and the returned value always refer to the same object.
        if (initialFilter.fetches(index.getIndexedColumn()))
            return initialFilter;

        ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
        builder.addAll(initialFilter.fetchedColumns());
        builder.add(index.getIndexedColumn());
        return builder.build();
    }

    /**
     * Checks the base table data against the indexed value and drops the index entry if it is stale.
     * <p>
     * The staleness check is made against the static row of {@code iterator}.
     * NOTE(review): presumably tables carrying a keys index keep the indexed data in the static row - confirm.
     *
     * @param iterator     the base table partition read for the index hit
     * @param indexHit     the index row that led to this partition; its primary key liveness timestamp is used
     *                     as the timestamp of the deletion issued for a stale entry
     * @param indexedValue the raw value under which the hit was indexed
     * @param ctx          write context under which a stale entry is deleted
     * @param nowInSec     current time in seconds, used for the staleness check and the deletion time
     * @return {@code iterator}, untouched and still open, if the entry is valid; {@code null} if the entry was
     *         stale, in which case the index entry has been deleted and {@code iterator} has been closed
     */
    private UnfilteredRowIterator filterIfStale(UnfilteredRowIterator iterator,
                                                Row indexHit,
                                                ByteBuffer indexedValue,
                                                WriteContext ctx,
                                                int nowInSec)
    {
        Row data = iterator.staticRow();
        if (index.isStale(data, indexedValue, nowInSec))
        {
            // Index is stale, remove the index entry and ignore
            index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue),
                                   makeIndexClustering(iterator.partitionKey().getKey(), Clustering.EMPTY),
                                   new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
                                   ctx);
            iterator.close();
            return null;
        }
        else
        {
            return iterator;
        }
    }
}