blob: af8aaf431e74a1985fef223c1669df27263584d5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.calcite.exec;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyType;
import org.apache.ignite.internal.cache.query.index.sorted.IndexPlainRowImpl;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
import org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandler;
import org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndex;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyType;
import org.apache.ignite.internal.cache.query.index.sorted.inline.io.InlineIO;
import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKey;
import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKeyFactory;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.RangeIterable;
import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
import org.jetbrains.annotations.Nullable;
/**
 * Scan on index.
 *
 * <p>Local partitions involved in the scan are reserved when {@link #iterator()} is called and released on
 * {@link #close()} (or immediately, if creating the iterator fails). When all required columns are available as
 * fixed-length inlined index keys, rows are built from the inlined keys instead of reading cache rows.
 */
public class IndexScan<Row> extends AbstractIndexScan<Row, IndexRow> {
    /** Kernal context. */
    private final GridKernalContext kctx;

    /** Cache context. */
    private final GridCacheContext<?, ?> cctx;

    /** Table descriptor. */
    private final CacheTableDescriptor desc;

    /** Factory for result rows. */
    private final RowFactory<Row> factory;

    /** Query topology version. */
    private final AffinityTopologyVersion topVer;

    /** Partitions to scan (must be non-null for partitioned caches). */
    private final int[] parts;

    /** MVCC snapshot. */
    private final MvccSnapshot mvccSnapshot;

    /** Currently reserved local partitions, or {@code null} if nothing is reserved. */
    private volatile List<GridDhtLocalPartition> reserved;

    /** Required columns, or {@code null} if all columns are required. */
    private final ImmutableBitSet requiredColumns;

    /** Physical index. */
    protected final InlineIndex idx;

    /** Mapping from index keys to row fields. */
    private final ImmutableIntList idxFieldMapping;

    /** Mapping from row fields to index keys, or {@code null} if inlined keys can't be used. */
    private final int[] fieldIdxMapping;

    /** Types of key fields stored in index. */
    private final Type[] fieldsStoreTypes;

    /**
     * @param ectx Execution context.
     * @param desc Table descriptor.
     * @param idx Physical index.
     * @param idxFieldMapping Mapping from index keys to row fields.
     * @param parts Partitions to scan.
     * @param ranges Index scan bounds.
     * @param requiredColumns Required columns, or {@code null} to read all columns.
     */
    public IndexScan(
        ExecutionContext<Row> ectx,
        CacheTableDescriptor desc,
        InlineIndex idx,
        ImmutableIntList idxFieldMapping,
        int[] parts,
        RangeIterable<Row> ranges,
        @Nullable ImmutableBitSet requiredColumns
    ) {
        this(ectx, desc, new TreeIndexWrapper(idx), idxFieldMapping, parts, ranges, requiredColumns);
    }

    /**
     * @param ectx Execution context.
     * @param desc Table descriptor.
     * @param treeIdx Physical index wrapper.
     * @param idxFieldMapping Mapping from index keys to row fields.
     * @param parts Partitions to scan.
     * @param ranges Index scan bounds.
     * @param requiredColumns Required columns, or {@code null} to read all columns.
     */
    protected IndexScan(
        ExecutionContext<Row> ectx,
        CacheTableDescriptor desc,
        TreeIndexWrapper treeIdx,
        ImmutableIntList idxFieldMapping,
        int[] parts,
        RangeIterable<Row> ranges,
        @Nullable ImmutableBitSet requiredColumns
    ) {
        super(
            ectx,
            desc.rowType(ectx.getTypeFactory(), requiredColumns),
            treeIdx,
            ranges
        );

        this.desc = desc;
        this.idx = treeIdx.idx;
        cctx = desc.cacheContext();
        kctx = cctx.kernalContext();

        factory = ectx.rowHandler().factory(ectx.getTypeFactory(), rowType);
        topVer = ectx.topologyVersion();
        this.parts = parts;
        mvccSnapshot = ectx.mvccSnapshot();
        this.requiredColumns = requiredColumns;
        this.idxFieldMapping = idxFieldMapping;

        IgniteTypeFactory typeFactory = ectx.getTypeFactory();

        // Full (non-projected) table row type: index keys are mapped to fields of this row.
        RelDataType srcRowType = desc.rowType(typeFactory, null);

        int srcFieldsCnt = srcRowType.getFieldCount();

        fieldsStoreTypes = new Type[srcFieldsCnt];

        for (int i = 0; i < srcFieldsCnt; i++)
            fieldsStoreTypes[i] = typeFactory.getResultClass(srcRowType.getFieldList().get(i).getType());

        fieldIdxMapping = fieldToInlinedKeysMapping(srcFieldsCnt);
    }

    /**
     * Checks if we can use inlined index keys instead of cache row iteration and returns fields to keys mapping.
     *
     * @param srcFieldsCnt Number of fields in the full (non-projected) table row.
     * @return Mapping from target row fields to inlined index keys, or {@code null} if inlined index keys
     * should not be used.
     */
    private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) {
        InlineIndexRowHandler rowHnd = idx.segment(0).rowHandler();

        List<InlineIndexKeyType> inlinedKeys = rowHnd.inlineIndexKeyTypes();

        // Since inline scan doesn't check expire time, allow it only if expired entries are eagerly removed.
        if (!cctx.config().isEagerTtl())
            return null;

        // Even if we need only a subset of inlined keys, we are required to read the full inlined row, since this
        // row also participates in comparison with other rows when the cursor processes the next index page.
        if (inlinedKeys.size() < rowHnd.indexKeyDefinitions().size() ||
            inlinedKeys.size() < (requiredColumns == null ? srcFieldsCnt : requiredColumns.cardinality()))
            return null;

        for (InlineIndexKeyType keyType : inlinedKeys) {
            // Variable length types may be not fully inlined, so it's probably better to read the full cache row
            // directly instead of trying to read the inlined value and then falling back to cache row reading.
            // Inlined JAVA_OBJECT can't be compared with the full cache row in case of hash collision, this can lead
            // to issues when processing the next index page in cursor if the current page was concurrently split.
            if (keyType.keySize() < 0 || keyType.type() == IndexKeyType.JAVA_OBJECT)
                return null;
        }

        ImmutableBitSet reqCols = requiredColumns == null ? ImmutableBitSet.range(0, srcFieldsCnt) :
            requiredColumns;

        int[] fieldIdxMapping = new int[rowType.getFieldCount()];

        for (int i = 0, j = reqCols.nextSetBit(0); j != -1; j = reqCols.nextSetBit(j + 1), i++) {
            // j = source field index, i = target field index.
            int keyIdx = idxFieldMapping.indexOf(j);

            if (keyIdx >= 0 && keyIdx < inlinedKeys.size())
                fieldIdxMapping[i] = keyIdx;
            else
                return null;
        }

        return fieldIdxMapping;
    }

    /** {@inheritDoc} */
    @Override public synchronized Iterator<Row> iterator() {
        reserve();

        try {
            return super.iterator();
        }
        catch (Exception e) {
            // Don't keep partitions reserved if the iterator can't be created.
            release();

            throw e;
        }
    }

    /** {@inheritDoc} */
    @Override protected IndexRow row2indexRow(Row bound) {
        if (bound == null)
            return null;

        InlineIndexRowHandler idxRowHnd = idx.segment(0).rowHandler();
        RowHandler<Row> rowHnd = ectx.rowHandler();

        IndexKey[] keys = new IndexKey[idxRowHnd.indexKeyDefinitions().size()];

        assert keys.length >= idxFieldMapping.size() : "Unexpected index keys [keys.length=" + keys.length +
            ", idxFieldMapping.size()=" + idxFieldMapping.size() + ']';

        // Stays true if every bound field is unspecified, in which case there is no search row at all.
        boolean nullSearchRow = true;

        for (int i = 0; i < idxFieldMapping.size(); ++i) {
            int fieldIdx = idxFieldMapping.getInt(i);
            Object key = rowHnd.get(fieldIdx, bound);

            if (key != ectx.unspecifiedValue()) {
                key = TypeUtils.fromInternal(ectx, key, fieldsStoreTypes[fieldIdx]);

                keys[i] = IndexKeyFactory.wrap(key, idxRowHnd.indexKeyDefinitions().get(i).idxType(),
                    cctx.cacheObjectContext(), idxRowHnd.indexKeyTypeSettings());

                nullSearchRow = false;
            }
        }

        return nullSearchRow ? null : new IndexPlainRowImpl(keys, idxRowHnd);
    }

    /** {@inheritDoc} */
    @Override protected Row indexRow2Row(IndexRow row) throws IgniteCheckedException {
        if (row.indexPlainRow())
            return inlineIndexRow2Row(row);
        else
            return desc.toRow(ectx, row.cacheDataRow(), factory, requiredColumns);
    }

    /**
     * Builds a result row from inlined index keys using {@link #fieldIdxMapping}.
     *
     * @param row Index row holding inlined keys.
     * @return Result row.
     */
    private Row inlineIndexRow2Row(IndexRow row) {
        RowHandler<Row> hnd = ectx.rowHandler();

        Row res = factory.create();

        for (int i = 0; i < fieldIdxMapping.length; i++)
            hnd.set(i, res, TypeUtils.toInternal(ectx, row.key(fieldIdxMapping[i]).key()));

        return res;
    }

    /** {@inheritDoc} */
    @Override public void close() {
        release();
    }

    /**
     * Reserves local partitions involved in the scan. Does nothing if partitions are already reserved.
     *
     * @throws ClusterTopologyException If the topology has changed relative to the query topology version, or if
     *      some partition is missing, can't be reserved or is not in {@link GridDhtPartitionState#OWNING} state.
     */
    private synchronized void reserve() {
        if (reserved != null)
            return;

        GridDhtPartitionTopology top = cctx.topology();

        top.readLock();

        // Everything after acquiring the read lock is inside try/finally, so the lock is released on every exit
        // path, including unexpected exceptions and errors raised while checking topology or collecting partitions.
        try {
            GridDhtTopologyFuture topFut = top.topologyVersionFuture();

            boolean done = topFut.isDone();

            if (!done || !(topFut.topologyVersion().compareTo(topVer) >= 0
                && cctx.shared().exchange().lastAffinityChangedTopologyVersion(topFut.initialVersion())
                    .compareTo(topVer) <= 0))
                throw new ClusterTopologyException("Topology was changed. Please retry on stable topology.");

            List<GridDhtLocalPartition> toReserve = partitionsToReserve(top);

            reserved = new ArrayList<>(toReserve.size());

            for (GridDhtLocalPartition part : toReserve) {
                if (part == null || !part.reserve()) {
                    throw new ClusterTopologyException(
                        "Failed to reserve partition for query execution. Retry on stable topology."
                    );
                }
                else if (part.state() != GridDhtPartitionState.OWNING) {
                    part.release();

                    throw new ClusterTopologyException(
                        "Failed to reserve partition for query execution. Retry on stable topology."
                    );
                }

                reserved.add(part);
            }
        }
        catch (Exception e) {
            // Roll back partitions reserved so far (no-op if nothing has been reserved yet).
            release();

            throw e;
        }
        finally {
            top.readUnlock();
        }
    }

    /**
     * Collects local partitions to reserve for the scan. Called under the topology read lock.
     *
     * @param top Partition topology.
     * @return Local partitions to reserve; may contain {@code null} elements, which the caller rejects.
     */
    private List<GridDhtLocalPartition> partitionsToReserve(GridDhtPartitionTopology top) {
        if (cctx.isReplicated()) {
            int partsCnt = cctx.affinity().partitions();

            List<GridDhtLocalPartition> res = new ArrayList<>(partsCnt);

            for (int i = 0; i < partsCnt; i++)
                res.add(top.localPartition(i));

            return res;
        }

        if (cctx.isPartitioned()) {
            assert parts != null;

            List<GridDhtLocalPartition> res = new ArrayList<>(parts.length);

            for (int part : parts)
                res.add(top.localPartition(part));

            return res;
        }

        return Collections.emptyList();
    }

    /** Releases reserved partitions, if any. */
    private synchronized void release() {
        if (reserved == null)
            return;

        for (GridDhtLocalPartition part : reserved)
            part.release();

        reserved = null;
    }

    /** {@inheritDoc} */
    @Override protected IndexQueryContext indexQueryContext() {
        IndexingQueryFilter filter = new IndexingQueryFilterImpl(kctx, topVer, parts);

        InlineIndexRowHandler rowHnd = idx.segment(0).rowHandler();

        InlineIndexRowFactory rowFactory = isInlineScan() ?
            new InlineIndexRowFactory(rowHnd.inlineIndexKeyTypes().toArray(new InlineIndexKeyType[0]), rowHnd) : null;

        // Inline scan is only allowed with eager TTL (see fieldToInlinedKeysMapping), so the expiration
        // filter is only needed when cache rows are read.
        BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter = isInlineScan() ? null : createNotExpiredRowFilter();

        return new IndexQueryContext(filter, rowFilter, rowFactory, mvccSnapshot);
    }

    /**
     * @return {@code true} if the scan builds rows from inlined index keys (with fallback to cache rows when keys
     * turn out not to be fully inlined), {@code false} if cache rows are always read.
     */
    public boolean isInlineScan() {
        return fieldIdxMapping != null;
    }

    /** Index row factory that creates rows from inlined index keys where possible. */
    private static class InlineIndexRowFactory implements BPlusTree.TreeRowFactory<IndexRow, IndexRow> {
        /** Inline key types. */
        private final InlineIndexKeyType[] keyTypes;

        /** Index row handler. */
        private final InlineIndexRowHandler idxRowHnd;

        /** Read full cache index row instead of inlined values (sticky once set). */
        private boolean useCacheRow;

        /**
         * @param keyTypes Inline key types.
         * @param idxRowHnd Index row handler.
         */
        private InlineIndexRowFactory(
            InlineIndexKeyType[] keyTypes,
            InlineIndexRowHandler idxRowHnd
        ) {
            this.keyTypes = keyTypes;
            this.idxRowHnd = idxRowHnd;
        }

        /** {@inheritDoc} */
        @Override public IndexRow create(
            BPlusTree<IndexRow, IndexRow> tree,
            BPlusIO<IndexRow> io,
            long pageAddr,
            int idx
        ) throws IgniteCheckedException {
            if (useCacheRow)
                return io.getLookupRow(tree, pageAddr, idx);

            int inlineSize = ((InlineIO)io).inlineSize();
            int rowOffset = io.offset(idx);
            int keyOffset = 0;

            IndexKey[] keys = new IndexKey[keyTypes.length];

            // Check that all required keys are inlined before creating the index row.
            for (int keyIdx = 0; keyIdx < keyTypes.length; keyIdx++) {
                InlineIndexKeyType keyType = keyTypes[keyIdx];

                if (!keyType.inlinedFullValue(pageAddr, rowOffset + keyOffset, inlineSize - keyOffset)) {
                    // Since we are checking only fixed-length keys, this condition means that for all rows the current
                    // key type is not fully inlined, so fall back to the cache index row.
                    useCacheRow = true;

                    return io.getLookupRow(tree, pageAddr, idx);
                }

                keys[keyIdx] = keyType.get(pageAddr, rowOffset + keyOffset, inlineSize - keyOffset);

                keyOffset += keyType.inlineSize(pageAddr, rowOffset + keyOffset);
            }

            return new IndexPlainRowImpl(keys, idxRowHnd);
        }
    }

    /**
     * Creates row filter to skip null values in the first index column.
     *
     * @param idx Index.
     * @param checkExpired Whether to also skip expired rows. When {@code true}, the inlined null check is skipped
     *      and the cache row is always read.
     * @return Row filter.
     */
    public static BPlusTree.TreeRowClosure<IndexRow, IndexRow> createNotNullRowFilter(
        InlineIndex idx,
        boolean checkExpired
    ) {
        List<InlineIndexKeyType> inlineKeyTypes = idx.segment(0).rowHandler().inlineIndexKeyTypes();

        InlineIndexKeyType keyType = F.isEmpty(inlineKeyTypes) ? null : inlineKeyTypes.get(0);

        return new BPlusTree.TreeRowClosure<IndexRow, IndexRow>() {
            /** {@inheritDoc} */
            @Override public boolean apply(
                BPlusTree<IndexRow, IndexRow> tree,
                BPlusIO<IndexRow> io,
                long pageAddr,
                int idx
            ) throws IgniteCheckedException {
                // Fast path: decide on the inlined value of the first key without reading the cache row.
                if (!checkExpired && keyType != null && io instanceof InlineIO) {
                    Boolean keyIsNull = keyType.isNull(pageAddr, io.offset(idx), ((InlineIO)io).inlineSize());

                    if (keyIsNull == Boolean.TRUE)
                        return false;
                }

                IndexRow idxRow = io.getLookupRow(tree, pageAddr, idx);

                if (checkExpired &&
                    idxRow.cacheDataRow().expireTime() > 0 &&
                    idxRow.cacheDataRow().expireTime() <= U.currentTimeMillis())
                    return false;

                return idxRow.key(0).type() != IndexKeyType.NULL;
            }
        };
    }

    /**
     * Creates row filter to skip expired rows.
     *
     * @return Row filter.
     */
    public static BPlusTree.TreeRowClosure<IndexRow, IndexRow> createNotExpiredRowFilter() {
        return (tree, io, pageAddr, idx) -> {
            IndexRow idxRow = io.getLookupRow(tree, pageAddr, idx);

            // Skip expired.
            return !(idxRow.cacheDataRow().expireTime() > 0 &&
                idxRow.cacheDataRow().expireTime() <= U.currentTimeMillis());
        };
    }

    /** Adapter of {@link InlineIndex} to {@link TreeIndex}. */
    protected static class TreeIndexWrapper implements TreeIndex<IndexRow> {
        /** Underlying index. */
        protected final InlineIndex idx;

        /**
         * @param idx Underlying index.
         */
        protected TreeIndexWrapper(InlineIndex idx) {
            this.idx = idx;
        }

        /** {@inheritDoc} */
        @Override public GridCursor<IndexRow> find(
            IndexRow lower,
            IndexRow upper,
            boolean lowerInclude,
            boolean upperInclude,
            IndexQueryContext qctx
        ) {
            try {
                return idx.find(lower, upper, lowerInclude, upperInclude, qctx);
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException("Failed to find index rows", e);
            }
        }
    }
}