| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.index.internal.keys; |
| |
| import java.nio.ByteBuffer; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.config.CFMetaData; |
| import org.apache.cassandra.config.ColumnDefinition; |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.filter.ColumnFilter; |
| import org.apache.cassandra.db.filter.DataLimits; |
| import org.apache.cassandra.db.filter.RowFilter; |
| import org.apache.cassandra.db.partitions.ImmutableBTreePartition; |
| import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; |
| import org.apache.cassandra.db.rows.*; |
| import org.apache.cassandra.index.internal.CassandraIndex; |
| import org.apache.cassandra.index.internal.CassandraIndexSearcher; |
| import org.apache.cassandra.utils.concurrent.OpOrder; |
| |
| public class KeysSearcher extends CassandraIndexSearcher |
| { |
| private static final Logger logger = LoggerFactory.getLogger(KeysSearcher.class); |
| |
| public KeysSearcher(ReadCommand command, |
| RowFilter.Expression expression, |
| CassandraIndex indexer) |
| { |
| super(command, expression, indexer); |
| } |
| |
| protected UnfilteredPartitionIterator queryDataFromIndex(final DecoratedKey indexKey, |
| final RowIterator indexHits, |
| final ReadCommand command, |
| final ReadExecutionController executionController) |
| { |
| assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW; |
| |
| return new UnfilteredPartitionIterator() |
| { |
| private UnfilteredRowIterator next; |
| |
| public boolean isForThrift() |
| { |
| return command.isForThrift(); |
| } |
| |
| public CFMetaData metadata() |
| { |
| return command.metadata(); |
| } |
| |
| public boolean hasNext() |
| { |
| return prepareNext(); |
| } |
| |
| public UnfilteredRowIterator next() |
| { |
| if (next == null) |
| prepareNext(); |
| |
| UnfilteredRowIterator toReturn = next; |
| next = null; |
| return toReturn; |
| } |
| |
| private boolean prepareNext() |
| { |
| while (next == null && indexHits.hasNext()) |
| { |
| Row hit = indexHits.next(); |
| DecoratedKey key = index.baseCfs.decorateKey(hit.clustering().get(0)); |
| if (!command.selectsKey(key)) |
| continue; |
| |
| ColumnFilter extendedFilter = getExtendedFilter(command.columnFilter()); |
| SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(), |
| index.baseCfs.metadata, |
| command.nowInSec(), |
| extendedFilter, |
| command.rowFilter(), |
| DataLimits.NONE, |
| key, |
| command.clusteringIndexFilter(key), |
| null); |
| |
| @SuppressWarnings("resource") // filterIfStale closes it's iterator if either it materialize it or if it returns null. |
| // Otherwise, we close right away if empty, and if it's assigned to next it will be called either |
| // by the next caller of next, or through closing this iterator is this come before. |
| UnfilteredRowIterator dataIter = filterIfStale(dataCmd.queryMemtableAndDisk(index.baseCfs, executionController), |
| hit, |
| indexKey.getKey(), |
| executionController.writeOpOrderGroup(), |
| isForThrift(), |
| command.nowInSec()); |
| |
| if (dataIter != null) |
| { |
| if (dataIter.isEmpty()) |
| dataIter.close(); |
| else |
| next = dataIter; |
| } |
| } |
| return next != null; |
| } |
| |
| public void remove() |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| public void close() |
| { |
| indexHits.close(); |
| if (next != null) |
| next.close(); |
| } |
| }; |
| } |
| |
| private ColumnFilter getExtendedFilter(ColumnFilter initialFilter) |
| { |
| if (command.columnFilter().fetches(index.getIndexedColumn())) |
| return initialFilter; |
| |
| ColumnFilter.Builder builder = ColumnFilter.selectionBuilder(); |
| builder.addAll(initialFilter.fetchedColumns()); |
| builder.add(index.getIndexedColumn()); |
| return builder.build(); |
| } |
| |
| private UnfilteredRowIterator filterIfStale(UnfilteredRowIterator iterator, |
| Row indexHit, |
| ByteBuffer indexedValue, |
| OpOrder.Group writeOp, |
| boolean isForThrift, |
| int nowInSec) |
| { |
| if (isForThrift) |
| { |
| // The data we got has gone though ThrifResultsMerger, so we're looking for the row whose clustering |
| // is the indexed name and so we need to materialize the partition. |
| ImmutableBTreePartition result = ImmutableBTreePartition.create(iterator); |
| iterator.close(); |
| Row data = result.getRow(Clustering.make(index.getIndexedColumn().name.bytes)); |
| if (data == null) |
| return null; |
| |
| // for thrift tables, we need to compare the index entry against the compact value column, |
| // not the column actually designated as the indexed column so we don't use the index function |
| // lib for the staleness check like we do in every other case |
| Cell baseData = data.getCell(index.baseCfs.metadata.compactValueColumn()); |
| if (baseData == null || !baseData.isLive(nowInSec) || index.getIndexedColumn().type.compare(indexedValue, baseData.value()) != 0) |
| { |
| // Index is stale, remove the index entry and ignore |
| index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue), |
| Clustering.make(index.getIndexedColumn().name.bytes), |
| new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec), |
| writeOp); |
| return null; |
| } |
| else |
| { |
| if (command.columnFilter().fetches(index.getIndexedColumn())) |
| return result.unfilteredIterator(); |
| |
| // The query on the base table used an extended column filter to ensure that the |
| // indexed column was actually read for use in the staleness check, before |
| // returning the results we must filter the base table partition so that it |
| // contains only the originally requested columns. See CASSANDRA-11523 |
| ClusteringComparator comparator = result.metadata().comparator; |
| Slices.Builder slices = new Slices.Builder(comparator); |
| for (ColumnDefinition selected : command.columnFilter().fetchedColumns()) |
| slices.add(Slice.make(comparator, selected.name.bytes)); |
| return result.unfilteredIterator(ColumnFilter.all(command.metadata()), slices.build(), false); |
| } |
| } |
| else |
| { |
| if (!iterator.metadata().isCompactTable()) |
| { |
| logger.warn("Non-composite index was used on the table '{}' during the query. Starting from Cassandra 4.0, only " + |
| "composite indexes will be supported. If compact flags were dropped for this table, drop and re-create " + |
| "the index.", iterator.metadata().cfName); |
| } |
| |
| Row data = iterator.staticRow(); |
| if (index.isStale(data, indexedValue, nowInSec)) |
| { |
| // Index is stale, remove the index entry and ignore |
| index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue), |
| makeIndexClustering(iterator.partitionKey().getKey(), Clustering.EMPTY), |
| new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec), |
| writeOp); |
| iterator.close(); |
| return null; |
| } |
| else |
| { |
| return iterator; |
| } |
| } |
| } |
| } |