blob: 46ba4fbce177f0382a7a4fd62dc9adb03b5cfa21 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.query;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.cache.query.index.IndexQueryResultMeta;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.query.reducer.IndexQueryReducer;
import org.apache.ignite.internal.processors.cache.query.reducer.NodePageStream;
import org.apache.ignite.internal.processors.cache.query.reducer.TextQueryReducer;
import org.apache.ignite.internal.processors.cache.query.reducer.UnsortedCacheQueryReducer;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.internal.U;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.INDEX;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
/**
* Distributed query future.
*/
public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutureAdapter<K, V, R> {
/** */
private final long reqId;
/** Helps to send cache query requests to other nodes. */
private final GridCacheDistributedQueryManager<K, V> qryMgr;
/** Collection of streams. */
private final Map<UUID, NodePageStream<R>> streams;
/** Count of streams that finish receiving remote pages. */
private final AtomicInteger noRemotePagesStreamsCnt = new AtomicInteger();
/** Count down this latch when every node responses on initial cache query request. */
private final CountDownLatch firstPageLatch = new CountDownLatch(1);
/** Set of nodes that deliver their first page. */
private Set<UUID> rcvdFirstPage = ConcurrentHashMap.newKeySet();
/** Metadata for IndexQuery. */
private final CompletableFuture<IndexQueryResultMeta> idxQryMetaFut;
/**
* @param ctx Cache context.
* @param reqId Request ID.
* @param qry Query.
*/
protected GridCacheDistributedQueryFuture(
GridCacheContext<K, V> ctx,
long reqId,
GridCacheQueryBean qry,
Collection<ClusterNode> nodes
) {
super(ctx, qry, false);
assert reqId > 0;
this.reqId = reqId;
qryMgr = (GridCacheDistributedQueryManager<K, V>)ctx.queries();
if (qry.query().partition() != null)
nodes = Collections.singletonList(node(nodes));
streams = new ConcurrentHashMap<>(nodes.size());
for (ClusterNode node : nodes) {
streams.computeIfAbsent(node.id(), nodeId ->
new NodePageStream<>(nodeId, () -> requestPages(nodeId), () -> cancelPages(nodeId)));
}
Map<UUID, NodePageStream<R>> streamsMap = Collections.unmodifiableMap(streams);
if (qry.query().type() == INDEX) {
idxQryMetaFut = new CompletableFuture<>();
reducer = new IndexQueryReducer<>(qry.query().idxQryDesc().valType(), streamsMap, cctx, idxQryMetaFut);
}
else {
idxQryMetaFut = null;
reducer = qry.query().type() == TEXT ? new TextQueryReducer<>(streamsMap) : new UnsortedCacheQueryReducer<>(streamsMap);
}
}
/**
* @return Nodes for query execution.
*/
private ClusterNode node(Collection<ClusterNode> nodes) {
ClusterNode rmtNode = null;
for (ClusterNode node : nodes) {
if (node.isLocal())
return node;
rmtNode = node;
}
return rmtNode;
}
/** {@inheritDoc} */
@Override protected void cancelQuery(Throwable err) {
firstPageLatch.countDown();
for (NodePageStream<R> s : streams.values())
s.cancel(err);
cctx.queries().onQueryFutureCanceled(reqId);
clear();
}
/** {@inheritDoc} */
@Override protected void onNodeLeft(UUID nodeId) {
NodePageStream<R> stream = streams.get(nodeId);
if (stream != null && stream.hasRemotePages())
onError(new ClusterTopologyCheckedException("Remote node has left topology: " + nodeId));
}
/** {@inheritDoc} */
@Override protected void onPage(UUID nodeId, Collection<R> data, boolean last) {
synchronized (firstPageLatch) {
if (rcvdFirstPage != null) {
rcvdFirstPage.add(nodeId);
if (rcvdFirstPage.size() == streams.size()) {
firstPageLatch.countDown();
rcvdFirstPage.clear();
rcvdFirstPage = null;
}
}
}
NodePageStream<R> stream = streams.get(nodeId);
if (stream == null)
return;
stream.addPage(data, last);
if (last) {
int cnt;
do {
cnt = noRemotePagesStreamsCnt.get();
} while (!noRemotePagesStreamsCnt.compareAndSet(cnt, cnt + 1));
if (cnt + 1 >= streams.size())
onDone();
}
}
/** {@inheritDoc} */
@Override protected void onMeta(IndexQueryResultMeta metaData) {
if (metaData != null)
idxQryMetaFut.complete(metaData);
}
/** {@inheritDoc} */
@Override public void awaitFirstItemAvailable() throws IgniteCheckedException {
U.await(firstPageLatch);
if (isDone() && error() != null)
// Throw the exception if future failed.
super.get();
}
/** {@inheritDoc} */
@Override public Collection<R> get() throws IgniteCheckedException {
return get0();
}
/** {@inheritDoc} */
@Override public Collection<R> get(long timeout, TimeUnit unit) throws IgniteCheckedException {
return get0();
}
/** {@inheritDoc} */
@Override public Collection<R> getUninterruptibly() throws IgniteCheckedException {
return get0();
}
/**
* Completion of distributed query future depends on user that iterates over query result with lazy page loading.
* Then {@link #get()} can lock on unpredictably long period of time. So we should avoid call it.
*/
private Collection<R> get0() {
throw new IgniteIllegalStateException("Unexpected lock on iterator over distributed cache query result.");
}
/** {@inheritDoc} */
@Override void clear() {
assert isDone() : this;
GridCacheDistributedQueryManager<K, V> qryMgr = (GridCacheDistributedQueryManager<K, V>)cctx.queries();
if (qryMgr != null)
qryMgr.removeQueryFuture(reqId);
}
/** @return Request ID. */
long requestId() {
return reqId;
}
/**
* Send initial query request to query nodes.
*/
public void startQuery() {
try {
GridCacheQueryRequest req = GridCacheQueryRequest.startQueryRequest(cctx, reqId, this);
qryMgr.sendRequest(this, req, streams.keySet());
}
catch (IgniteCheckedException e) {
onError(e);
}
}
/**
* Send request to fetch new pages.
*
* @param nodeId Node to send request.
*/
private void requestPages(UUID nodeId) {
try {
GridCacheQueryRequest req = GridCacheQueryRequest.pageRequest(cctx, reqId, query().query(), fields());
qryMgr.sendRequest(this, req, Collections.singletonList(nodeId));
}
catch (IgniteCheckedException e) {
onError(e);
}
}
/**
* Cancels remote pages.
*
* @param nodeId Node to send request.
*/
// TODO IGNITE-15731: Refactor how CacheQueryReducer handles remote nodes.
private void cancelPages(UUID nodeId) {
try {
GridCacheQueryRequest req = GridCacheQueryRequest.cancelRequest(cctx, reqId, fields());
if (nodeId.equals(cctx.localNodeId())) {
// Process cancel query directly (without sending) for local node,
cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
@Override public Object call() {
qryMgr.processQueryRequest(cctx.localNodeId(), req);
return null;
}
});
}
else {
try {
cctx.io().send(nodeId, req, cctx.ioPolicy());
}
catch (IgniteCheckedException e) {
if (cctx.io().checkNodeLeft(nodeId, e, false)) {
if (log.isDebugEnabled())
log.debug("Failed to send cancel request, node failed: " + nodeId);
}
else
U.error(log, "Failed to send cancel request [node=" + nodeId + ']', e);
}
}
}
catch (IgniteCheckedException e) {
U.error(logger(), "Failed to send cancel request (will cancel query in any case).", e);
}
}
/** {@inheritDoc} */
@Override protected void onError(Throwable err) {
if (onDone(err)) {
streams.values().forEach(s -> s.cancel(err));
if (idxQryMetaFut != null && !idxQryMetaFut.isDone())
idxQryMetaFut.completeExceptionally(err);
firstPageLatch.countDown();
}
}
}