// blob: 832fb3e01fdbaa2e432078a49a4ce02a8733e86d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.h2.database;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.cache.query.index.Index;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyType;
import org.apache.ignite.internal.cache.query.index.sorted.IndexPlainRowImpl;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRowImpl;
import org.apache.ignite.internal.cache.query.index.sorted.IndexValueCursor;
import org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandler;
import org.apache.ignite.internal.cache.query.index.sorted.SortedIndexDefinition;
import org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexImpl;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyType;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyTypeRegistry;
import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKey;
import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKeyFactory;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.query.GridQueryRowDescriptor;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.H2Cursor;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.opt.H2CacheRow;
import org.apache.ignite.internal.processors.query.h2.opt.H2Row;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
import org.apache.ignite.internal.processors.query.h2.opt.join.CursorIteratorWrapper;
import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedLookupBatch;
import org.apache.ignite.internal.processors.query.h2.opt.join.RangeSource;
import org.apache.ignite.internal.processors.query.h2.opt.join.RangeStream;
import org.apache.ignite.internal.processors.query.h2.opt.join.SegmentKey;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.CIX2;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
import org.h2.engine.Session;
import org.h2.index.Cursor;
import org.h2.index.IndexCondition;
import org.h2.index.IndexLookupBatch;
import org.h2.index.IndexType;
import org.h2.message.DbException;
import org.h2.result.SearchRow;
import org.h2.table.IndexColumn;
import org.h2.table.TableFilter;
import org.h2.value.Value;
import org.jetbrains.annotations.NotNull;
import static java.util.Collections.singletonList;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK;
import static org.apache.ignite.internal.processors.tracing.SpanTags.ERROR;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_IDX;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_IDX_RANGE_ROWS;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_TABLE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_IDX_RANGE_REQ;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_IDX_RANGE_RESP;
import static org.h2.result.Row.MEMORY_CALCULATE;
/**
* H2 Index over {@link BPlusTree}.
*/
@SuppressWarnings({"unchecked"})
public class H2TreeIndex extends H2TreeIndexBase {
/** Underlying Ignite index that all lookups of this H2 index are delegated to. */
private final InlineIndexImpl queryIndex;
/** Kernal context, obtained from the cache context in the constructor. */
private final GridKernalContext ctx;
/** Cache context of the table this index belongs to. */
private final GridCacheContext<?, ?> cctx;
/** Table name. */
private final String tblName;
/** Index name (taken from the underlying Ignite index). */
private final String idxName;
/** Logger. */
private final IgniteLogger log;
/**
 * Message topic of this index: a tuple of {@link GridTopic#TOPIC_QUERY} and
 * the table identifier string joined with the index name by a dot.
 */
private final Object msgTopic;
/** Query context registry. */
private final QueryContextRegistry qryCtxRegistry;
/** Listener registered for {@link #msgTopic} in the constructor and removed in {@link #destroy()}. */
private final GridMessageListener msgLsnr;
/**
 * Handler passed to the indexing {@code send(...)} call made by {@link #send(Collection, Message)};
 * routes the given message to {@link #onMessage0(UUID, Object)} using the id of the given node.
 */
private final CIX2<ClusterNode, Message> locNodeHnd = new CIX2<ClusterNode, Message>() {
    @Override public void applyx(ClusterNode node, Message locMsg) {
        UUID srcNodeId = node.id();

        onMessage0(srcNodeId, locMsg);
    }
};
/**
 * Creates an H2 index over the given Ignite index and registers a message listener
 * on the topic of this index.
 *
 * @param queryIndex Underlying Ignite index.
 * @param tbl Table.
 * @param cols Index columns.
 * @param pk If {@code true} a primary key index type is used, otherwise a non-unique one.
 * @param log Logger.
 */
public H2TreeIndex(InlineIndexImpl queryIndex, GridH2Table tbl, IndexColumn[] cols, boolean pk, IgniteLogger log) {
    super(
        tbl,
        queryIndex.name(),
        cols,
        pk ? IndexType.createPrimaryKey(false, false) : IndexType.createNonUnique(false, false, false)
    );

    GridCacheContext<?, ?> cacheCtx = tbl.cacheContext();

    cctx = cacheCtx;
    ctx = cacheCtx.kernalContext();

    tblName = tbl.getName();
    idxName = queryIndex.name();

    this.log = log;
    this.queryIndex = queryIndex;

    IgniteH2Indexing h2Idx = (IgniteH2Indexing)ctx.query().getIndexing();

    qryCtxRegistry = h2Idx.queryContextRegistry();

    // Initialize distributed joins: the topic is unique per table and index.
    String topicSuffix = tbl.identifierString() + '.' + getName();

    msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, topicSuffix);

    msgLsnr = new GridMessageListener() {
        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
            GridSpinBusyLock busyLock = tbl.tableDescriptor().indexing().busyLock();

            // Process the message only if the busy lock could be entered.
            if (busyLock.enterBusy()) {
                try {
                    onMessage0(nodeId, msg);
                }
                finally {
                    busyLock.leaveBusy();
                }
            }
        }
    };

    ctx.io().addMessageListener(msgTopic, msgLsnr);
}
/** {@inheritDoc} The value is taken from the underlying Ignite index. */
@Override public int inlineSize() {
    final int inlineSize = queryIndex.inlineSize();

    return inlineSize;
}
/** {@inheritDoc} The value is taken from the underlying Ignite index. */
@Override public int segmentsCount() {
    final int segments = queryIndex.segmentsCount();

    return segments;
}
/**
 * {@inheritDoc}
 * <p>
 * This implementation ignores the given filter and always returns {@code 0}.
 */
@Override public long totalRowCount(IndexingQueryCacheFilter partsFilter) {
return 0;
}
/**
 * {@inheritDoc}
 * <p>
 * Both bounds are treated as inclusive. A {@code null} session is allowed, in which case
 * no query context is used for the lookup.
 */
@Override public Cursor find(Session ses, SearchRow lower, SearchRow upper) {
    assert lower == null || lower instanceof H2Row : lower;
    assert upper == null || upper instanceof H2Row : upper;

    try {
        T2<IndexRow, IndexRow> bounds = prepareIndexKeys(lower, upper);

        QueryContext qctx = null;

        if (ses != null)
            qctx = H2Utils.context(ses);

        GridCursor<IndexRow> idxRows =
            queryIndex.find(bounds.get1(), bounds.get2(), true, true, segment(qctx), idxQryContext(qctx));

        // Convert index rows to H2 rows lazily while the cursor is iterated.
        GridCursor<H2Row> h2Rows = new IndexValueCursor<>(idxRows, this::mapIndexRow);

        return new H2Cursor(h2Rows);
    }
    catch (IgniteCheckedException err) {
        throw DbException.convert(err);
    }
}
/**
 * Converts an H2 query context to an index query context.
 *
 * @param qctx H2 query context, possibly {@code null}.
 * @return Index query context built from the filter and MVCC snapshot of {@code qctx},
 *      or {@code null} if {@code qctx} is {@code null}.
 */
private IndexQueryContext idxQryContext(QueryContext qctx) {
    return qctx == null ? null : new IndexQueryContext(qctx.filter(), null, qctx.mvccSnapshot());
}
/**
 * Converts a pair of H2 search rows to a pair of index rows using the row handler of segment 0.
 *
 * @param lower Lower bound, possibly {@code null}.
 * @param upper Upper bound, possibly {@code null}.
 * @return Tuple of converted lower and upper bounds.
 */
private T2<IndexRow, IndexRow> prepareIndexKeys(SearchRow lower, SearchRow upper) {
    InlineIndexRowHandler hnd = queryIndex.segment(0).rowHandler();

    IndexRow lo = prepareIndexKey(lower, hnd);
    IndexRow hi = prepareIndexKey(upper, hnd);

    return new T2<>(lo, hi);
}
/**
 * Converts a single H2 search row to an index row.
 *
 * @param row Search row, possibly {@code null}.
 * @param rowHnd Row handler.
 * @return {@code null} for a {@code null} row, an {@link IndexRowImpl} for an {@link H2CacheRow},
 *      otherwise the result of {@link #preparePlainIndexKey(SearchRow, InlineIndexRowHandler)}.
 */
private IndexRow prepareIndexKey(SearchRow row, InlineIndexRowHandler rowHnd) {
    if (row == null)
        return null;

    if (!(row instanceof H2CacheRow))
        return preparePlainIndexKey(row, rowHnd);

    CacheDataRow dataRow = (CacheDataRow)row;
    IndexKey[] cachedKeys = getCachedKeys((H2CacheRow)row);

    return new IndexRowImpl(rowHnd, dataRow, cachedKeys);
}
/**
 * Collects values already cached in the row so that index keys do not have to be extracted
 * from the Ignite cache a second time.
 *
 * @param row Cache row.
 * @return Array sized by the number of index columns; filled from the start up to (excluding)
 *      the first column that has no cached value, remaining elements are {@code null}.
 */
private IndexKey[] getCachedKeys(H2CacheRow row) {
    final int colsCnt = columnIds.length;

    IndexKey[] keys = new IndexKey[colsCnt];

    for (int pos = 0; pos < colsCnt; pos++) {
        Object cachedVal = row.getCached(columnIds[pos]);

        // Stop at the first column without a cached value.
        if (cachedVal == null)
            return keys;

        keys[pos] = IndexKeyFactory.wrap(
            cachedVal, columns[pos].getType(), cctx.cacheObjectContext(), queryIndex.keyTypeSettings());
    }

    return keys;
}
/**
 * Builds an index search row from a plain H2 search row (one that is not an {@link H2CacheRow}).
 * <p>
 * Index columns are processed in order. Processing stops at the first column for which the
 * search row has no value, or whose value can be neither converted to nor compared with the
 * column type; keys of the remaining columns stay {@code null}.
 *
 * @param row Search row. Must not be {@code null}: the only caller handles {@code null} rows itself.
 * @param rowHnd Row handler providing index key definitions.
 * @return Index row holding the prepared keys.
 */
private IndexRow preparePlainIndexKey(SearchRow row, InlineIndexRowHandler rowHnd) {
    assert row != null;

    int idxColsLen = indexColumns.length;

    IndexKey[] keys = new IndexKey[idxColsLen];

    for (int i = 0; i < idxColsLen; ++i) {
        int colId = indexColumns[i].column.getColumnId();

        Value v = row.getValue(colId);

        if (v == null)
            break;

        // Column type is needed only when there is a value to bind.
        IndexKeyType colType = rowHnd.indexKeyDefinitions().get(i).idxType();

        // If it's possible to convert search row to index value type - do it. In this case converted value
        // can be used for the inline search.
        // Otherwise, we can use search row as index find bound only if types have the same comparison rules.
        // If types have different comparison rules (for example, '2' > '10' for strings and 2 < 10 for integers)
        // best we can do here is leave search bound empty. In this case index scan by bounds can be extended to
        // full index scan and rows will be filtered out by original condition on H2 level.
        if (colType.code() != v.getType()) {
            if (Value.getHigherOrder(colType.code(), v.getType()) == colType.code())
                v = v.convertTo(colType.code());
            else {
                InlineIndexKeyType colKeyType = InlineIndexKeyTypeRegistry.get(colType, queryIndex.keyTypeSettings());

                IndexKey idxKey = IndexKeyFactory.wrap(v.getObject(), v.getType(), cctx.cacheObjectContext(),
                    queryIndex.keyTypeSettings());

                if (colKeyType.isComparableTo(idxKey)) {
                    keys[i] = idxKey;

                    continue;
                }

                LT.warn(log, "Provided value can't be used as index search bound due to column data type " +
                    "mismatch. This can lead to full index scans instead of range index scans. [index=" +
                    idxName + ", colType=" + colType + ", valType=" + IndexKeyType.forCode(v.getType()) + ']');

                break;
            }
        }

        keys[i] = IndexKeyFactory.wrap(
            v.getObject(), v.getType(), cctx.cacheObjectContext(), queryIndex.keyTypeSettings());
    }

    return new IndexPlainRowImpl(keys, rowHnd);
}
/**
 * Converts an index row to an H2 row.
 *
 * @param row Index row, possibly {@code null}.
 * @return H2 cache row wrapping the cache data row of {@code row}, or {@code null} for a {@code null} row.
 */
private H2Row mapIndexRow(IndexRow row) {
    return row == null ? null : new H2CacheRow(rowDescriptor(), row.cacheDataRow());
}
/**
 * {@inheritDoc}
 * <p>
 * Counts rows of the segment selected by the query context of the session.
 */
@Override public long getRowCount(Session ses) {
    try {
        QueryContext qctx = H2Utils.context(ses);

        int seg = segment(qctx);
        IndexQueryContext idxCtx = idxQryContext(qctx);

        return queryIndex.count(seg, idxCtx);
    }
    catch (IgniteCheckedException err) {
        throw DbException.convert(err);
    }
}
/**
 * {@inheritDoc}
 * <p>
 * When {@code b} is {@code true} the first row is looked up, otherwise the last one.
 */
@Override public Cursor findFirstOrLast(Session ses, boolean b) {
    try {
        QueryContext qctx = H2Utils.context(ses);

        IndexQueryContext idxCtx = idxQryContext(qctx);

        GridCursor<IndexRow> rows;

        if (b)
            rows = queryIndex.findFirst(segment(qctx), idxCtx);
        else
            rows = queryIndex.findLast(segment(qctx), idxCtx);

        return new H2Cursor(new IndexValueCursor<>(rows, this::mapIndexRow));
    }
    catch (IgniteCheckedException err) {
        throw DbException.convert(err);
    }
}
/**
 * Checks whether the given row is expired at the current moment: its expire time is positive
 * and not later than the current time.
 *
 * @param row Row to check.
 * @return {@code true} if the row is expired.
 * @throws NullPointerException If provided row is {@code null}.
 */
private static boolean isExpired(@NotNull H2Row row) {
    // Non-positive expire time never counts as expired.
    if (row.expireTime() <= 0)
        return false;

    return row.expireTime() <= U.currentTimeMillis();
}
/**
 * {@inheritDoc}
 * <p>
 * The message listener of this index is removed even if the parent's destroy fails.
 */
@Override public void destroy() {
    try {
        super.destroy();
    }
    finally {
        GridMessageListener lsnr = msgLsnr;

        if (lsnr != null)
            ctx.io().removeMessageListener(msgTopic, lsnr);
    }
}
/**
 * {@inheritDoc}
 *
 * @throws IllegalStateException Always: this operation must not be invoked on this index.
 */
@Override public H2CacheRow put(H2CacheRow row) {
throw new IllegalStateException("Must not be invoked.");
}
/**
 * {@inheritDoc}
 *
 * @throws IllegalStateException Always: this operation must not be invoked on this index.
 */
@Override public boolean putx(H2CacheRow row) {
throw new IllegalStateException("Must not be invoked.");
}
/**
 * {@inheritDoc}
 *
 * @throws IllegalStateException Always: this operation must not be invoked on this index.
 */
@Override public boolean removex(SearchRow row) {
throw new IllegalStateException("Must not be invoked.");
}
/**
 * {@inheritDoc}
 * <p>
 * A batch is created only when the session has a query context with a distributed join context
 * and the table is partitioned; otherwise {@code null} is returned. The batch is flagged as
 * unicast when the filter has an equality condition on the affinity column, the key column or
 * the alternative key column.
 */
@Override public IndexLookupBatch createLookupBatch(TableFilter[] filters, int filter) {
    TableFilter curFilter = filters[filter];

    QueryContext qctx = H2Utils.context(curFilter.getSession());

    boolean distributedJoin =
        qctx != null && qctx.distributedJoinContext() != null && getTable().isPartitioned();

    if (!distributedJoin)
        return null;

    IndexColumn affCol = getTable().getAffinityKeyColumn();
    GridQueryRowDescriptor desc = getTable().rowDescriptor();

    // No affinity column: no unicast, and the affinity column id is reported as -1.
    if (affCol == null)
        return new DistributedLookupBatch(this, cctx, false, -1);

    int affColId = affCol.column.getColumnId();

    int[] masks = curFilter.getMasks();

    boolean ucast = masks != null && (
        (masks[affColId] & IndexCondition.EQUALITY) != 0 ||
        (masks[QueryUtils.KEY_COL] & IndexCondition.EQUALITY) != 0 ||
        (masks[desc.getAlternativeColumnId(QueryUtils.KEY_COL)] & IndexCondition.EQUALITY) != 0
    );

    return new DistributedLookupBatch(this, cctx, ucast, affColId);
}
/**
 * Sends the message to the given nodes on the topic of this index.
 *
 * @param nodes Nodes.
 * @param msg Message.
 * @throws RuntimeException Retry exception produced by {@code H2Utils.retryException(...)}
 *      if the underlying send reports failure.
 */
public void send(Collection<ClusterNode> nodes, Message msg) {
    boolean sent = getTable().tableDescriptor().indexing().send(
        msgTopic,
        -1,
        nodes,
        msg,
        null,
        locNodeHnd,
        GridIoPolicy.IDX_POOL,
        false
    );

    if (sent)
        return;

    throw H2Utils.retryException("Failed to send message to nodes: " + nodes);
}
/**
 * Dispatches an incoming message to the range request or range response handler.
 * Messages from nodes unknown to discovery are dropped. Failures are logged; only
 * {@link Error}s are propagated to the caller.
 *
 * @param nodeId Source node ID.
 * @param msg Message.
 */
private void onMessage0(UUID nodeId, Object msg) {
    ClusterNode sender = ctx.discovery().node(nodeId);

    if (sender == null)
        return;

    try {
        if (msg instanceof GridH2IndexRangeRequest) {
            GridH2IndexRangeRequest req = (GridH2IndexRangeRequest)msg;

            onIndexRangeRequest(sender, req);
        }
        else if (msg instanceof GridH2IndexRangeResponse) {
            GridH2IndexRangeResponse resp = (GridH2IndexRangeResponse)msg;

            onIndexRangeResponse(sender, resp);
        }
    }
    catch (Throwable err) {
        U.error(log, "Failed to handle message[nodeId=" + nodeId + ", msg=" + msg + "]", err);

        if (err instanceof Error)
            throw (Error)err;
    }
}
/**
 * Handles an index range request: collects row ranges for the requested bounds (or continues
 * a previously started range source) and sends a {@link GridH2IndexRangeResponse} back to the
 * requesting node.
 * <p>
 * The response status is {@code STATUS_NOT_FOUND} when no shared query context is registered
 * for the request, {@code STATUS_ERROR} (with the error text) when range processing fails, and
 * {@code STATUS_OK} otherwise.
 *
 * @param node Requesting node.
 * @param msg Request message.
 */
private void onIndexRangeRequest(final ClusterNode node, final GridH2IndexRangeRequest msg) {
// We don't use try with resources on purpose - the catch block must also be executed in the context of this span.
TraceSurroundings trace = MTC.support(ctx.tracing().create(SQL_IDX_RANGE_REQ, MTC.span()));
Span span = MTC.span();
try {
span.addTag(SQL_IDX, () -> idxName);
span.addTag(SQL_TABLE, () -> tblName);
// The response echoes the identifiers of the request.
GridH2IndexRangeResponse res = new GridH2IndexRangeResponse();
res.originNodeId(msg.originNodeId());
res.queryId(msg.queryId());
res.originSegmentId(msg.originSegmentId());
res.segment(msg.segment());
res.batchLookupId(msg.batchLookupId());
QueryContext qctx = qryCtxRegistry.getShared(
msg.originNodeId(),
msg.queryId(),
msg.originSegmentId()
);
if (qctx == null)
res.status(STATUS_NOT_FOUND);
else {
DistributedJoinContext joinCtx = qctx.distributedJoinContext();
assert joinCtx != null;
// Any failure below is reported to the requester as STATUS_ERROR instead of being thrown.
try {
RangeSource src;
if (msg.bounds() != null) {
// This is the first request containing all the search rows.
assert !msg.bounds().isEmpty() : "empty bounds";
src = new RangeSource(this, msg.bounds(), msg.segment(), idxQryContext(qctx));
}
else {
// This is request to fetch next portion of data.
src = joinCtx.getSource(node.id(), msg.segment(), msg.batchLookupId());
assert src != null;
}
List<GridH2RowRange> ranges = new ArrayList<>();
// Remaining row budget for this response, initially the page size of the join context.
int maxRows = joinCtx.pageSize();
assert maxRows > 0 : maxRows;
while (maxRows > 0) {
GridH2RowRange range = src.next(maxRows);
if (range == null)
break;
ranges.add(range);
// Ranges without rows do not consume the budget.
if (range.rows() != null)
maxRows -= range.rows().size();
}
assert !ranges.isEmpty();
if (src.hasMoreRows()) {
// Save source for future fetches.
// Only done for the first request (bounds present).
// NOTE(review): presumably for follow-up requests the source is already registered - confirm.
if (msg.bounds() != null)
joinCtx.putSource(node.id(), msg.segment(), msg.batchLookupId(), src);
}
else if (msg.bounds() == null) {
// Drop saved source.
joinCtx.putSource(node.id(), msg.segment(), msg.batchLookupId(), null);
}
res.ranges(ranges);
res.status(STATUS_OK);
// Tag the span with the total number of rows over all collected ranges.
span.addTag(SQL_IDX_RANGE_ROWS, () ->
Integer.toString(ranges.stream().mapToInt(GridH2RowRange::rowsSize).sum()));
}
catch (Throwable th) {
span.addTag(ERROR, th::getMessage);
U.error(log, "Failed to process request: " + msg, th);
res.error(th.getClass() + ": " + th.getMessage());
res.status(STATUS_ERROR);
}
}
// The response is sent for every status, including STATUS_NOT_FOUND and STATUS_ERROR.
send(singletonList(node), res);
}
catch (Throwable th) {
// Failures outside of range processing (e.g. in send) are tagged on the span and rethrown.
span.addTag(ERROR, th::getMessage);
throw th;
}
finally {
if (trace != null)
trace.close();
}
}
/**
 * Handles an index range response by passing it to the range stream registered for the
 * responding node and segment. The response is silently dropped when there is no shared
 * query context for it or no streams for its batch lookup id.
 *
 * @param node Responded node.
 * @param msg Response message.
 */
private void onIndexRangeResponse(ClusterNode node, GridH2IndexRangeResponse msg) {
    try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_IDX_RANGE_RESP, MTC.span()))) {
        QueryContext qctx =
            qryCtxRegistry.getShared(msg.originNodeId(), msg.queryId(), msg.originSegmentId());

        if (qctx != null) {
            DistributedJoinContext joinCtx = qctx.distributedJoinContext();

            assert joinCtx != null;

            Map<SegmentKey, RangeStream> streams = joinCtx.getStreams(msg.batchLookupId());

            if (streams != null) {
                SegmentKey streamKey = new SegmentKey(node, msg.segment());

                RangeStream stream = streams.get(streamKey);

                assert stream != null;

                stream.onResponse(msg);
            }
        }
    }
}
/**
 * Finds rows within the given bounds (both inclusive) in the given segment. Used by distributed joins.
 *
 * @param bounds Bounds.
 * @param segment Segment.
 * @param qryCtx Index query context.
 * @return Iterator over the found rows; empty if the index lookup returns {@code null}.
 */
public Iterator<H2Row> findForSegment(GridH2RowRangeBounds bounds, int segment, IndexQueryContext qryCtx) {
    T2<IndexRow, IndexRow> keys = prepareIndexKeys(toSearchRow(bounds.first()), toSearchRow(bounds.last()));

    try {
        GridCursor<IndexRow> rows = queryIndex.find(keys.get1(), keys.get2(), true, true, segment, qryCtx);

        // Substitute an empty cursor for a missing result.
        if (rows == null)
            rows = IndexValueCursor.EMPTY;

        H2Cursor h2Cur = new H2Cursor(new IndexValueCursor<>(rows, this::mapIndexRow));

        return new CursorIteratorWrapper(h2Cur);
    }
    catch (IgniteCheckedException err) {
        throw DbException.convert(err);
    }
}
/**
 * Converts a row message to an H2 search row. Values of the message are assigned to the index
 * columns in order; index columns beyond the number of message values are left unset.
 *
 * @param msg Row message, possibly {@code null}.
 * @return Search row or {@code null} if the message is {@code null}.
 * @throws CacheException If a value of the message cannot be unmarshalled.
 */
private SearchRow toSearchRow(GridH2RowMessage msg) {
    if (msg == null)
        return null;

    Value[] rowVals = new Value[getTable().getColumns().length];

    assert rowVals.length > 0;

    List<GridH2ValueMessage> msgVals = msg.values();

    try {
        for (int i = 0; i < indexColumns.length && i < msgVals.size(); i++) {
            int colId = indexColumns[i].column.getColumnId();

            rowVals[colId] = msgVals.get(i).value(ctx);
        }
    }
    catch (IgniteCheckedException err) {
        throw new CacheException(err);
    }

    return database.createRow(rowVals, MEMORY_CALCULATE);
}
/**
 * Converts an H2 search row to a row message. Values of index columns are added in order
 * up to (excluding) the first column that has no value in the row.
 *
 * @param row Search row, possibly {@code null}.
 * @return Row message or {@code null} if the row is {@code null}.
 * @throws CacheException If a value cannot be converted to a message.
 */
public GridH2RowMessage toSearchRowMessage(SearchRow row) {
    if (row == null)
        return null;

    List<GridH2ValueMessage> msgVals = new ArrayList<>(indexColumns.length);

    try {
        for (int i = 0; i < indexColumns.length; i++) {
            Value v = row.getValue(indexColumns[i].column.getColumnId());

            if (v == null)
                break;

            msgVals.add(GridH2ValueMessageFactory.toMessage(v));
        }
    }
    catch (IgniteCheckedException err) {
        throw new CacheException(err);
    }

    GridH2RowMessage rowMsg = new GridH2RowMessage();

    rowMsg.values(msgVals);

    return rowMsg;
}
/**
 * Returns the number of elements in the tree as reported by the total count of the
 * underlying Ignite index.
 *
 * @return Number of elements in the tree.
 * @throws IgniteCheckedException If failed.
 */
public long size() throws IgniteCheckedException {
    final long total = queryIndex.totalCount();

    return total;
}
/**
 * Creates a new H2 index over the given Ignite index that shares the table, index columns
 * and logger of this index.
 *
 * @param inlineIndex Underlying Ignite index for the new H2 index.
 * @param idxDef Index definition; only its primary flag is used.
 * @return New index.
 * @throws IgniteCheckedException If failed.
 */
public H2TreeIndex createCopy(InlineIndexImpl inlineIndex, SortedIndexDefinition idxDef) throws IgniteCheckedException {
    boolean primary = idxDef.primary();

    return new H2TreeIndex(inlineIndex, tbl, indexColumns, primary, log);
}
/**
 * @return Id of the underlying Ignite index.
 */
public UUID indexId() {
    final UUID id = queryIndex.id();

    return id;
}
/**
 * @return Underlying Ignite index.
 */
public InlineIndexImpl index() {
    return this.queryIndex;
}
/**
 * {@inheritDoc}
 * <p>
 * The underlying Ignite index is returned when it is an instance of the requested class;
 * otherwise the request is passed to the parent implementation.
 */
@Override public <T extends Index> T unwrap(Class<T> clazz) {
    return clazz.isInstance(queryIndex) ? clazz.cast(queryIndex) : super.unwrap(clazz);
}
}