| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.index.internal.composites; |
| |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.config.CFMetaData; |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter; |
| import org.apache.cassandra.db.filter.DataLimits; |
| import org.apache.cassandra.db.filter.RowFilter; |
| import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; |
| import org.apache.cassandra.db.rows.*; |
| import org.apache.cassandra.db.transform.Transformation; |
| import org.apache.cassandra.index.internal.CassandraIndex; |
| import org.apache.cassandra.index.internal.CassandraIndexSearcher; |
| import org.apache.cassandra.index.internal.IndexEntry; |
| import org.apache.cassandra.utils.btree.BTreeSet; |
| import org.apache.cassandra.utils.concurrent.OpOrder; |
| |
| |
/**
 * Index searcher for "composite" (CQL3) secondary indexes.
 * <p>
 * Given the hits read from the index table for a single indexed value
 * ({@code indexKey}), this searcher groups the hits by base-table partition,
 * issues one {@link SinglePartitionReadCommand} per partition for the matching
 * clusterings, and lazily streams back the resulting partitions. Rows whose
 * index entry turns out to be stale (the base row no longer carries the indexed
 * value, or is shadowed by a deletion) are filtered out of the results, and the
 * corresponding stale index entries are deleted as a side effect when each
 * partition iterator is closed.
 */
public class CompositesSearcher extends CassandraIndexSearcher
{
    private static final Logger logger = LoggerFactory.getLogger(CompositesSearcher.class);

    public CompositesSearcher(ReadCommand command,
                              RowFilter.Expression expression,
                              CassandraIndex index)
    {
        super(command, expression, index);
    }

    /**
     * Checks whether an index hit is actually selected by the read command,
     * i.e. both its base partition key and its base clustering fall within the
     * command's restrictions. Needed because we query a slice of the index and
     * may pick up hits outside the requested clustering bounds.
     */
    private boolean isMatchingEntry(DecoratedKey partitionKey, IndexEntry entry, ReadCommand command)
    {
        return command.selectsKey(partitionKey) && command.selectsClustering(partitionKey, entry.indexedEntryClustering);
    }

    /**
     * Builds a lazy partition iterator over the base-table data referenced by the
     * index hits. Hits are consumed partition-by-partition: all consecutive hits
     * for the same base partition are gathered, the base data for those
     * clusterings is read in a single command, and stale entries are filtered
     * (and scheduled for deletion) via {@link #filterStaleEntries}.
     *
     * @param indexKey   the (decorated) indexed value whose hits we're resolving
     * @param indexHits  rows from the index partition for {@code indexKey}
     * @param command    the original read command (restrictions, filters, nowInSec)
     * @param orderGroup read/write op groups used for the base reads and for the
     *                   stale-entry deletions
     */
    protected UnfilteredPartitionIterator queryDataFromIndex(final DecoratedKey indexKey,
                                                             final RowIterator indexHits,
                                                             final ReadCommand command,
                                                             final ReadOrderGroup orderGroup)
    {
        // Index partitions hold only regular rows (one per hit); a static row would be a bug upstream.
        assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;

        return new UnfilteredPartitionIterator()
        {
            // One-entry look-ahead: the first index hit of the NEXT base partition,
            // left over from the grouping loop in prepareNext().
            private IndexEntry nextEntry;

            // The next non-empty base partition to return, or null if not yet prepared.
            private UnfilteredRowIterator next;

            public boolean isForThrift()
            {
                return command.isForThrift();
            }

            public CFMetaData metadata()
            {
                return command.metadata();
            }

            public boolean hasNext()
            {
                return prepareNext();
            }

            public UnfilteredRowIterator next()
            {
                if (next == null)
                    prepareNext();

                // Hand off ownership of the iterator to the caller; clear 'next' so
                // close() doesn't double-close it.
                UnfilteredRowIterator toReturn = next;
                next = null;
                return toReturn;
            }

            /**
             * Advances to the next non-empty base partition, setting {@code next}.
             * Returns false when the index hits are exhausted. May consume several
             * groups of hits if their base reads come back empty (all stale).
             */
            private boolean prepareNext()
            {
                while (true)
                {
                    if (next != null)
                        return true;

                    if (nextEntry == null)
                    {
                        if (!indexHits.hasNext())
                            return false;

                        nextEntry = index.decodeEntry(indexKey, indexHits.next());
                    }

                    // Gather all index hits belonging to the same partition and query the data for those hits.
                    // TODO: it's much more efficient to do 1 read for all hits to the same partition than doing
                    // 1 read per index hit. However, this basically mean materializing all hits for a partition
                    // in memory so we should consider adding some paging mechanism. However, index hits should
                    // be relatively small so it's much better than the previous code that was materializing all
                    // *data* for a given partition.
                    BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(index.baseCfs.getComparator());
                    List<IndexEntry> entries = new ArrayList<>();
                    DecoratedKey partitionKey = index.baseCfs.decorateKey(nextEntry.indexedKey);

                    // Drain hits while they point at the same base partition; the first hit of a
                    // different partition is kept in 'nextEntry' for the following iteration.
                    while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
                    {
                        // We're queried a slice of the index, but some hits may not match some of the clustering column constraints
                        if (isMatchingEntry(partitionKey, nextEntry, command))
                        {
                            clusterings.add(nextEntry.indexedEntryClustering);
                            entries.add(nextEntry);
                        }

                        nextEntry = indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null;
                    }

                    // Because we've eliminated entries that don't match the clustering columns, it's possible we added nothing
                    if (clusterings.isEmpty())
                        continue;

                    // Query the gathered index hits. We still need to filter stale hits from the resulting query.
                    ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
                    SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
                                                                                          index.baseCfs.metadata,
                                                                                          command.nowInSec(),
                                                                                          command.columnFilter(),
                                                                                          command.rowFilter(),
                                                                                          DataLimits.NONE,
                                                                                          partitionKey,
                                                                                          filter,
                                                                                          null);
                    @SuppressWarnings("resource") // We close right away if empty, and if it's assign to next it will be called either
                                                  // by the next caller of next, or through closing this iterator is this come before.
                    UnfilteredRowIterator dataIter =
                        filterStaleEntries(dataCmd.queryMemtableAndDisk(index.baseCfs,
                                                                        orderGroup.baseReadOpOrderGroup()),
                                           indexKey.getKey(),
                                           entries,
                                           orderGroup.writeOpOrderGroup(),
                                           command.nowInSec());

                    // All hits for this partition were stale (or shadowed); skip to the next group.
                    if (dataIter.isEmpty())
                    {
                        dataIter.close();
                        continue;
                    }

                    next = dataIter;
                    return true;
                }
            }

            public void remove()
            {
                throw new UnsupportedOperationException();
            }

            public void close()
            {
                // Close the index hits first, then any prepared-but-unconsumed partition.
                indexHits.close();
                if (next != null)
                    next.close();
            }
        };
    }

    /**
     * Issues a deletion for each of the given stale index entries, tombstoning
     * them with their original timestamp so they cannot shadow newer entries.
     */
    private void deleteAllEntries(final List<IndexEntry> entries, final OpOrder.Group writeOp, final int nowInSec)
    {
        entries.forEach(entry ->
            index.deleteStaleEntry(entry.indexValue,
                                   entry.indexClustering,
                                   new DeletionTime(entry.timestamp, nowInSec),
                                   writeOp));
    }

    /**
     * Wraps the base-table data read for a group of index hits, removing rows
     * whose index entry is stale and collecting those stale entries so they can
     * be deleted when the partition is closed.
     * <p>
     * The {@code entries} list is in clustering order and corresponds 1:1 to the
     * clusterings requested from the base table, which is what allows the
     * single-pass cursor ({@code entriesIdx}) in the transformation below.
     *
     * @param dataIter   base-table rows for the queried clusterings
     * @param indexValue the raw indexed value, used to detect staleness
     * @param entries    the index entries that produced this read, in clustering order
     * @param writeOp    write op group under which stale entries are deleted
     * @param nowInSec   query time, used for staleness checks and deletion times
     */
    private UnfilteredRowIterator filterStaleEntries(UnfilteredRowIterator dataIter,
                                                     final ByteBuffer indexValue,
                                                     final List<IndexEntry> entries,
                                                     final OpOrder.Group writeOp,
                                                     final int nowInSec)
    {
        // collect stale index entries and delete them when we close this iterator
        final List<IndexEntry> staleEntries = new ArrayList<>();

        // if there is a partition level delete in the base table, we need to filter
        // any index entries which would be shadowed by it
        if (!dataIter.partitionLevelDeletion().isLive())
        {
            DeletionTime deletion = dataIter.partitionLevelDeletion();
            entries.forEach(e -> {
                if (deletion.deletes(e.timestamp))
                    staleEntries.add(e);
            });
        }

        ClusteringComparator comparator = dataIter.metadata().comparator;
        class Transform extends Transformation
        {
            // Cursor into 'entries'; advances monotonically because both the entries
            // and the returned rows are in clustering order.
            private int entriesIdx;

            @Override
            public Row applyToRow(Row row)
            {
                // Always call findEntry (even for non-stale rows) so the cursor advances
                // and entries with no corresponding base row get flagged as stale.
                IndexEntry entry = findEntry(row.clustering());
                if (!index.isStale(row, indexValue, nowInSec))
                    return row;

                // Returning null drops the row from the result; its entry gets deleted on close.
                staleEntries.add(entry);
                return null;
            }

            /**
             * Returns the index entry matching the given row clustering, marking as
             * stale any intervening entries that have no corresponding base row.
             */
            private IndexEntry findEntry(Clustering clustering)
            {
                assert entriesIdx < entries.size();
                while (entriesIdx < entries.size())
                {
                    IndexEntry entry = entries.get(entriesIdx++);
                    Clustering indexedEntryClustering = entry.indexedEntryClustering;
                    // The entries are in clustering order. So that the requested entry should be the
                    // next entry, the one at 'entriesIdx'. However, we can have stale entries, entries
                    // that have no corresponding row in the base table typically because of a range
                    // tombstone or partition level deletion. Delete such stale entries.
                    int cmp = comparator.compare(indexedEntryClustering, clustering);
                    assert cmp <= 0; // this would means entries are not in clustering order, which shouldn't happen
                    if (cmp == 0)
                        return entry;
                    else
                    {
                        // COMPACT COMPOSITE tables support null values in there clustering key but
                        // those tables do not support static columns. By consequence if a table
                        // has some static columns and all its clustering key elements are null
                        // it means that the partition exists and contains only static data
                        if (!dataIter.metadata().hasStaticColumns() || !containsOnlyNullValues(indexedEntryClustering))
                            staleEntries.add(entry);
                    }
                }
                // entries correspond to the rows we've queried, so we shouldn't have a row that has no corresponding entry.
                throw new AssertionError();
            }

            /**
             * Checks whether every component of the clustering is null (the
             * "static-only partition" encoding for compact composite tables).
             */
            private boolean containsOnlyNullValues(Clustering indexedEntryClustering)
            {
                int i = 0;
                for (; i < indexedEntryClustering.size() && indexedEntryClustering.get(i) == null; i++);
                return i == indexedEntryClustering.size();
            }

            @Override
            public void onPartitionClose()
            {
                // Side effect on close: purge every stale entry discovered while iterating.
                deleteAllEntries(staleEntries, writeOp, nowInSec);
            }
        }

        return Transformation.apply(dataIter, new Transform());
    }
}