blob: 18f15c2d3d25fe45ea53b6af484c3b0a0932b89f [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ignite.internal.processors.query.h2.twostep;
import java.util.BitSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.CacheException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.h2.index.Cursor;
import org.h2.result.Row;
import org.h2.result.SearchRow;
import org.h2.value.Value;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static java.util.Objects.requireNonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_MAX_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_PAGE_ROWS;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_PAGE_FETCH;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_PAGE_WAIT;
/**
 * Base class for reducers of remote index lookup results.
 */
public abstract class AbstractReducer implements Reducer {
/** Merge table size limit, read from {@code IGNITE_SQL_MERGE_TABLE_MAX_SIZE} (fallback {@code 10_000}). */
static final int MAX_FETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_MAX_SIZE, 10_000);

/**
 * Number of rows per block of the fetched-row buffer, read from {@code IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE}
 * (fallback {@code 1024}). Validated at class initialization: it must be a positive power of 2 and strictly
 * less than {@link #MAX_FETCH_SIZE}.
 */
static int prefetchSize = getInteger(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE, 1024);

static {
    // Common "<property> (<value>)" prefix shared by both validation messages.
    String prop = IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + prefetchSize + ")";

    if (!U.isPow2(prefetchSize))
        throw new IllegalArgumentException(prop + " must be positive and a power of 2.");

    if (prefetchSize >= MAX_FETCH_SIZE) {
        throw new IllegalArgumentException(prop + " must be less than " +
            IGNITE_SQL_MERGE_TABLE_MAX_SIZE + " (" + MAX_FETCH_SIZE + ").");
    }
}
/** */
private static final AtomicReferenceFieldUpdater<AbstractReducer, ConcurrentMap> LAST_PAGES_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractReducer.class, ConcurrentMap.class, "lastPages");
/** */
private final GridKernalContext ctx;
/** DO NOT change name field of this field, updated through {@link #LAST_PAGES_UPDATER} */
private volatile ConcurrentMap<ReduceSourceKey, Integer> lastPages;
/** Row source nodes. */
protected Set<UUID> srcNodes;
/** */
private int pageSize;
* Will be r/w from query execution thread only, does not need to be threadsafe.
protected final ReduceBlockList<Row> fetched;
/** */
private Row lastEvictedRow;
/**
 * Creates a reducer whose fetched-row buffer is organized in blocks of {@link #prefetchSize} rows.
 *
 * @param ctx Kernal context.
 */
AbstractReducer(GridKernalContext ctx) {
    this.fetched = new ReduceBlockList<>(prefetchSize);
    this.ctx = ctx;
}
/** {@inheritDoc} */
@Override public void setSources(Map<ClusterNode, BitSet> nodesToSegmentsCnt) {
assert srcNodes == null;
srcNodes = new HashSet<>(nodesToSegmentsCnt.size());
for (ClusterNode node : nodesToSegmentsCnt.keySet()) {
if (!srcNodes.add(
throw new IllegalStateException();
/** {@inheritDoc} */
@Override public Set<UUID> sources() {
return srcNodes;
/** {@inheritDoc} */
@Override public boolean hasSource(UUID nodeId) {
return srcNodes.contains(nodeId);
/** {@inheritDoc} */
@Override public void setPageSize(int pageSize) {
this.pageSize = pageSize;
/** {@inheritDoc} */
@Override public void onFailure(UUID nodeId, final CacheException e) {
if (nodeId == null)
nodeId = F.first(srcNodes);
addPage0(new ReduceResultPage(null, nodeId, null) {
@Override public boolean isFail() {
return true;
@Override public void fetchNextPage() {
if (e == null)
throw e;
/** {@inheritDoc} */
@Override public final Cursor find(@Nullable SearchRow first, @Nullable SearchRow last) {
checkBounds(lastEvictedRow, first, last);
if (fetchedAll())
return findAllFetched(fetched, first, last);
return findInStream(first, last);
/**
 * Looks up rows between the given bounds in the remote streams. {@code find} uses this while
 * not all rows have been fetched yet.
 *
 * @param first Lower bound row; may be {@code null}.
 * @param last Upper bound row; may be {@code null}.
 * @return Cursor over remote streams.
 */
protected abstract Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last);
/**
 * Looks up rows between the given bounds among the rows already fetched. {@code find} uses this once
 * all rows have been fetched.
 *
 * @param fetched Fetched data.
 * @param first Lower bound row; may be {@code null}.
 * @param last Upper bound row; may be {@code null}.
 * @return Cursor over fetched data.
 */
protected abstract Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last);
* @param lastEvictedRow Last evicted fetched row.
* @param first Lower bound.
* @param last Upper bound.
protected void checkBounds(Row lastEvictedRow, SearchRow first, SearchRow last) {
if (lastEvictedRow != null)
throw new IgniteException("Fetched result set was too large. " +
IGNITE_SQL_MERGE_TABLE_MAX_SIZE + "(" + MAX_FETCH_SIZE + ") should be increased.");
* @param evictedBlock Evicted block.
protected void onBlockEvict(@NotNull List<Row> evictedBlock) {
assert evictedBlock.size() == prefetchSize;
// Remember the last row (it will be max row) from the evicted block.
lastEvictedRow = requireNonNull(last(evictedBlock));
* @param l List.
* @return Last element.
static <Z> Z last(List<Z> l) {
return l.get(l.size() - 1);
/** {@inheritDoc} */
@Override public void addPage(ReduceResultPage page) {
/**
 * Subclass hook that stores a page in this reducer. {@code onFailure} also calls it directly
 * with synthetic failure pages.
 *
 * @param page Page.
 */
protected abstract void addPage0(ReduceResultPage page);
* Fails index if any source node is left.
private void checkSourceNodesAlive() {
for (UUID nodeId : srcNodes) {
if (!ctx.discovery().alive(nodeId)) {
onFailure(nodeId, null);
* @param e Error.
public void fail(final CacheException e) {
for (UUID nodeId : srcNodes)
onFailure(nodeId, e);
* @param page Page.
private void markLastPage(ReduceResultPage page) {
GridQueryNextPageResponse res = page.response();
if (!res.last()) {
UUID nodeId = page.source();
initLastPages(nodeId, res);
ConcurrentMap<ReduceSourceKey, Integer> lp = lastPages;
if (lp == null)
return; // It was not initialized --> wait for last page flag.
Integer lastPage = lp.get(new ReduceSourceKey(nodeId, res.segmentId()));
if (lastPage == null)
return; // This node may use the new protocol --> wait for last page flag.
if (lastPage != {
assert lastPage >;
return; // This is not the last page.
* @param nodeId Node ID.
* @param res Response.
private void initLastPages(UUID nodeId, GridQueryNextPageResponse res) {
int allRows = res.allRows();
// If the old protocol we send all rows number in the page 0, other pages have -1.
// In the new protocol we do not know it and always have -1, except terminating page,
// which has -2. Thus we have to init page counters only when we receive positive value
// in the first page.
if (allRows < 0 || != 0)
ConcurrentMap<ReduceSourceKey, Integer> lp = lastPages;
if (lp == null && !LAST_PAGES_UPDATER.compareAndSet(this, null, lp = new ConcurrentHashMap<>()))
lp = lastPages;
assert pageSize > 0 : pageSize;
int lastPage = allRows == 0 ? 0 : (allRows - 1) / pageSize;
assert lastPage >= 0 : lastPage;
if (lp.put(new ReduceSourceKey(nodeId, res.segmentId()), lastPage) != null)
throw new IllegalStateException();
* @param lastPage Real last page.
* @return Created dummy page.
protected final ReduceResultPage createDummyLastPage(ReduceResultPage lastPage) {
assert !lastPage.isDummyLast(); // It must be a real last page.
return new ReduceResultPage(ctx, lastPage.source(), null).setLast(true);
* @param queue Queue to poll.
* @param iter Current iterator.
* @return The same or new iterator.
protected final Iterator<Value[]> pollNextIterator(Pollable<ReduceResultPage> queue, Iterator<Value[]> iter) {
if (!iter.hasNext()) {
try (TraceSurroundings ignored =, MTC.span()))) {
ReduceResultPage page = takeNextPage(queue);
if (!page.isLast())
page.fetchNextPage(); // Failed will throw an exception here.
iter = page.rows();
MTC.span().addTag(SQL_PAGE_ROWS, () -> Integer.toString(page.rowsInPage()));
// The received iterator must be empty in the dummy last page or on failure.
assert iter.hasNext() || page.isDummyLast() || page.isFail();
return iter;
* @param queue Queue to poll.
* @return Next page.
private ReduceResultPage takeNextPage(Pollable<ReduceResultPage> queue) {
try (TraceSurroundings ignored =, MTC.span()))) {
ReduceResultPage page;
for (;;) {
try {
page = queue.poll(500, TimeUnit.MILLISECONDS);
catch (InterruptedException e) {
throw new CacheException("Query execution was interrupted.", e);
if (page != null)
return page;
/**
 * Minimal blocking-poll abstraction over a page source, as consumed by {@code takeNextPage}.
 */
interface Pollable<E> {
/**
 * Polls for the next element, returning {@code null} if none is available within the timeout.
 *
 * @param timeout Timeout.
 * @param unit Time unit.
 * @return Polled value or {@code null} if none.
 * @throws InterruptedException If interrupted.
 */
E poll(long timeout, TimeUnit unit) throws InterruptedException;