blob: f7b3f7a8b8fa32a8512dbd2d87b532d04e15e093 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.query;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cache.query.index.IndexQueryResultMeta;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.metric.IoStatisticsQueryHelper;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteReducer;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.INDEX;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SCAN;
/**
* Distributed query manager (for cache in REPLICATED / PARTITIONED cache mode).
*/
public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManager<K, V> {
/** */
private static final int MAX_CANCEL_IDS = 1000;
/** Query response frequency. */
private static final long RESEND_FREQ = 3000;
/** Query response attempts. */
private static final int RESEND_ATTEMPTS = 5;
/** Prefix for communication topic. */
private static final String TOPIC_PREFIX = "QUERY";
/** {request ID -> thread} */
private ConcurrentMap<Long, Thread> threads = new ConcurrentHashMap<>();
/** {request ID -> future} */
private final ConcurrentMap<Long, GridCacheDistributedQueryFuture<?, ?, ?>> futs =
new ConcurrentHashMap<>();
/** Received requests to cancel. */
private Collection<CancelMessageId> cancelIds =
new GridBoundedConcurrentOrderedSet<>(MAX_CANCEL_IDS);
/** Canceled queries. */
private Collection<Long> cancelled = new GridBoundedConcurrentOrderedSet<>(MAX_CANCEL_IDS);
/** Query response handler. */
private IgniteBiInClosure<UUID, GridCacheQueryResponse> resHnd = new CI2<UUID, GridCacheQueryResponse>() {
@Override public void apply(UUID nodeId, GridCacheQueryResponse res) {
processQueryResponse(nodeId, res);
}
};
/** Event listener. */
private GridLocalEventListener lsnr = new GridLocalEventListener() {
/** {@inheritDoc} */
@Override public void onEvent(Event evt) {
DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
for (GridCacheDistributedQueryFuture fut : futs.values())
fut.onNodeLeft(discoEvt.eventNode().id());
}
};
/** {@inheritDoc} */
@Override public void onKernalStart0() throws IgniteCheckedException {
super.onKernalStart0();
assert !cctx.isRecoveryMode() : "Registering message handlers in recovery mode [cacheName=" + cctx.name() + ']';
cctx.io().addCacheHandler(
cctx.cacheId(),
cctx.startTopologyVersion(),
GridCacheQueryRequest.class,
(CI2<UUID, GridCacheQueryRequest>)this::processQueryRequest);
cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
super.onKernalStop0(cancel);
cctx.events().removeListener(lsnr);
}
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
"Query was cancelled, client node disconnected.");
for (Map.Entry<Long, GridCacheDistributedQueryFuture<?, ?, ?>> e : futs.entrySet()) {
GridCacheDistributedQueryFuture<?, ?, ?> fut = e.getValue();
fut.onPage(null, null, null, err, true);
futs.remove(e.getKey(), fut);
}
}
/** {@inheritDoc} */
@Override public void printMemoryStats() {
super.printMemoryStats();
X.println(">>> threadsSize: " + threads.size());
X.println(">>> futsSize: " + futs.size());
}
/**
* Removes query future from futures map.
*
* @param reqId Request id.
*/
protected void removeQueryFuture(long reqId) {
futs.remove(reqId);
}
/**
* Gets query future from futures map.
*
* @param reqId Request id.
* @return Found future or null.
*/
protected GridCacheDistributedQueryFuture<?, ?, ?> getQueryFuture(long reqId) {
return futs.get(reqId);
}
/**
* Processes cache query request.
*
* @param sndId Sender node id.
* @param req Query request.
*/
@Override public void processQueryRequest(UUID sndId, GridCacheQueryRequest req) {
assert req.mvccSnapshot() != null || !cctx.mvccEnabled() || req.cancel() ||
(req.type() == null && !req.fields()) : req; // Last assertion means next page request.
if (req.cancel()) {
cancelIds.add(new CancelMessageId(req.id(), sndId));
if (req.fields())
removeFieldsQueryResult(sndId, req.id());
else
removeQueryResult(sndId, req.id());
}
else {
if (!cancelIds.contains(new CancelMessageId(req.id(), sndId))) {
if (!F.eq(req.cacheName(), cctx.name())) {
GridCacheQueryResponse res = new GridCacheQueryResponse(
cctx.cacheId(),
req.id(),
new IgniteCheckedException("Received request for incorrect cache [expected=" + cctx.name() +
", actual=" + req.cacheName()),
cctx.deploymentEnabled());
sendQueryResponse(sndId, res, 0);
}
else {
threads.put(req.id(), Thread.currentThread());
try {
GridCacheQueryInfo info = distributedQueryInfo(sndId, req);
if (info == null)
return;
if (req.fields())
runFieldsQuery(info);
else
runQuery(info);
}
catch (Throwable e) {
U.error(log(), "Failed to run query.", e);
sendQueryResponse(sndId, new GridCacheQueryResponse(cctx.cacheId(), req.id(), e.getCause(),
cctx.deploymentEnabled()), 0);
if (e instanceof Error)
throw (Error)e;
}
finally {
threads.remove(req.id());
}
}
}
}
}
/**
* @param sndId Sender node id.
* @param req Query request.
* @return Query info.
*/
@Nullable private GridCacheQueryInfo distributedQueryInfo(UUID sndId, GridCacheQueryRequest req) {
IgniteReducer<Object, Object> rdc = req.reducer();
IgniteClosure<Object, Object> trans = (IgniteClosure<Object, Object>)req.transformer();
ClusterNode sndNode = cctx.node(sndId);
if (sndNode == null)
return null;
GridCacheQueryAdapter<?> qry =
new GridCacheQueryAdapter<>(
cctx,
req.type(),
log,
req.pageSize(),
0,
req.includeBackups(),
false,
null,
req.keyValueFilter(),
req.partition() == -1 ? null : req.partition(),
req.className(),
req.clause(),
req.idxQryDesc(),
req.limit(),
req.includeMetaData(),
req.keepBinary(),
req.taskHash(),
req.mvccSnapshot(),
req.isDataPageScanEnabled()
);
return new GridCacheQueryInfo(
false,
trans,
rdc,
qry,
null,
sndId,
req.id(),
req.includeMetaData(),
req.allPages(),
req.arguments()
);
}
/**
* Sends cache query response.
*
* @param nodeId Node to send response.
* @param res Cache query response.
* @param timeout Message timeout.
* @return {@code true} if response was sent, {@code false} otherwise.
*/
private boolean sendQueryResponse(UUID nodeId, GridCacheQueryResponse res, long timeout) {
ClusterNode node = cctx.node(nodeId);
if (node == null)
return false;
int attempt = 1;
IgniteCheckedException err = null;
while (!Thread.currentThread().isInterrupted()) {
try {
if (log.isDebugEnabled())
log.debug("Send query response: " + res);
Object topic = topic(nodeId, res.requestId());
cctx.io().sendOrderedMessage(
node,
topic,
res,
GridIoPolicy.QUERY_POOL,
timeout > 0 ? timeout : Long.MAX_VALUE);
return true;
}
catch (ClusterTopologyCheckedException ignored) {
if (log.isDebugEnabled())
log.debug("Failed to send query response since node left grid [nodeId=" + nodeId +
", res=" + res + "]");
return false;
}
catch (IgniteCheckedException e) {
if (err == null)
err = e;
if (Thread.currentThread().isInterrupted())
break;
if (attempt < RESEND_ATTEMPTS) {
if (log.isDebugEnabled())
log.debug("Failed to send queries response (will try again) [nodeId=" + nodeId + ", res=" +
res + ", attempt=" + attempt + ", err=" + e + "]");
if (!Thread.currentThread().isInterrupted())
try {
U.sleep(RESEND_FREQ);
}
catch (IgniteInterruptedCheckedException e1) {
U.error(log,
"Waiting for queries response resending was interrupted (response will not be sent) " +
"[nodeId=" + nodeId + ", response=" + res + "]", e1);
return false;
}
}
else {
U.error(log, "Failed to sender cache response [nodeId=" + nodeId + ", response=" + res + "]", err);
return false;
}
}
attempt++;
}
return false;
}
/**
* Processes cache query response.
*
* @param sndId Sender node id.
* @param res Query response.
*/
@SuppressWarnings("unchecked")
private void processQueryResponse(UUID sndId, GridCacheQueryResponse res) {
if (log.isDebugEnabled())
log.debug("Received query response: " + res);
GridCacheQueryFutureAdapter fut = getQueryFuture(res.requestId());
if (fut != null)
if (res.fields())
((GridCacheDistributedFieldsQueryFuture)fut).onFieldsPage(
sndId,
res.metadata(),
(Collection<Map<String, Object>>)((Collection)res.data()),
res.error(),
res.isFinished());
else
fut.onPage(sndId, res.idxQryMetadata(), res.data(), res.error(), res.isFinished());
else if (!cancelled.contains(res.requestId()))
U.warn(log, "Received response for finished or unknown query [rmtNodeId=" + sndId +
", res=" + res + ']');
}
/** {@inheritDoc} */
@Override void onQueryFutureCanceled(long reqId) {
cancelled.add(reqId);
}
/** {@inheritDoc} */
@Override void onCancelAtStop() {
super.onCancelAtStop();
for (GridCacheQueryFutureAdapter fut : futs.values())
try {
fut.cancel();
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to cancel running query future: " + fut, e);
}
U.interrupt(threads.values());
}
/** {@inheritDoc} */
@Override protected boolean onPageReady(
boolean loc,
GridCacheQueryInfo qryInfo,
IndexQueryResultMeta idxQryMetadata,
Collection<?> data,
boolean finished,
Throwable e
) {
GridCacheLocalQueryFuture<?, ?, ?> fut = qryInfo.localQueryFuture();
if (loc)
assert fut != null;
if (e != null) {
if (loc)
fut.onPage(null, null, null, e, true);
else
sendQueryResponse(qryInfo.senderId(),
new GridCacheQueryResponse(cctx.cacheId(), qryInfo.requestId(), e, cctx.deploymentEnabled()),
qryInfo.query().timeout());
return true;
}
if (loc)
fut.onPage(null, null, data, null, finished);
else {
GridCacheQueryResponse res = new GridCacheQueryResponse(cctx.cacheId(), qryInfo.requestId(),
finished, /*fields*/false, cctx.deploymentEnabled());
if (qryInfo.query().type() == INDEX)
res.idxQryMetadata((IndexQueryResultMeta)idxQryMetadata);
res.data(data);
if (!sendQueryResponse(qryInfo.senderId(), res, qryInfo.query().timeout()))
return false;
}
return true;
}
/** {@inheritDoc} */
@SuppressWarnings("ConstantConditions")
@Override protected boolean onFieldsPageReady(boolean loc, GridCacheQueryInfo qryInfo,
@Nullable List<GridQueryFieldMetadata> metadata,
@Nullable Collection<?> entities,
@Nullable Collection<?> data,
boolean finished, @Nullable Throwable e) {
assert qryInfo != null;
if (e != null) {
if (loc) {
GridCacheLocalFieldsQueryFuture fut = (GridCacheLocalFieldsQueryFuture)qryInfo.localQueryFuture();
fut.onPage(null, null, null, e, true);
}
else
sendQueryResponse(qryInfo.senderId(),
new GridCacheQueryResponse(cctx.cacheId(), qryInfo.requestId(), e, cctx.deploymentEnabled()),
qryInfo.query().timeout());
return true;
}
if (loc) {
GridCacheLocalFieldsQueryFuture fut = (GridCacheLocalFieldsQueryFuture)qryInfo.localQueryFuture();
fut.onFieldsPage(null, metadata, data, null, finished);
}
else {
GridCacheQueryResponse res = new GridCacheQueryResponse(cctx.cacheId(), qryInfo.requestId(),
finished, qryInfo.reducer() == null, cctx.deploymentEnabled());
res.metadata(metadata);
res.data(entities != null ? entities : data);
if (!sendQueryResponse(qryInfo.senderId(), res, qryInfo.query().timeout()))
return false;
}
return true;
}
/** {@inheritDoc} */
@Override public CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, final Collection<ClusterNode> nodes) {
return queryDistributed(qry, nodes, false);
}
/** */
private CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, final Collection<ClusterNode> nodes, boolean fields) {
if (log.isDebugEnabled())
log.debug("Executing distributed query: " + qry);
long reqId = cctx.io().nextIoId();
final GridCacheDistributedQueryFuture fut = fields ?
new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry, nodes) :
new GridCacheDistributedQueryFuture(cctx, reqId, qry, nodes);
try {
fut.qry.query().validate();
final Object topic = topic(cctx.nodeId(), reqId);
cctx.io().addOrderedCacheHandler(cctx.shared(), topic, resHnd);
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
cctx.io().removeOrderedHandler(false, topic);
}
});
futs.put(reqId, fut);
if (cctx.kernalContext().clientDisconnected()) {
IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
cctx.kernalContext().cluster().clientReconnectFuture(),
"Query was cancelled, client node disconnected.");
fut.onError(err);
}
fut.startQuery();
}
catch (IgniteCheckedException e) {
fut.onError(e);
}
return fut;
}
/** {@inheritDoc} */
@SuppressWarnings({"unchecked"})
@Override public GridCloseableIterator scanQueryDistributed(final GridCacheQueryAdapter qry,
Collection<ClusterNode> nodes) throws IgniteCheckedException {
assert qry.type() == GridCacheQueryType.SCAN : qry;
assert qry.mvccSnapshot() != null || !cctx.mvccEnabled();
boolean performanceStatsEnabled = cctx.kernalContext().performanceStatistics().enabled();
long startTime = performanceStatsEnabled ? System.currentTimeMillis() : 0;
long startTimeNanos = performanceStatsEnabled ? System.nanoTime() : 0;
GridCloseableIterator locIter0 = null;
for (ClusterNode node : nodes) {
if (node.isLocal()) {
locIter0 = scanQueryLocal(qry, false);
Collection<ClusterNode> rmtNodes = new ArrayList<>(nodes.size() - 1);
for (ClusterNode n : nodes) {
// Equals by reference can be used here.
if (n != node)
rmtNodes.add(n);
}
nodes = rmtNodes;
break;
}
}
final GridCloseableIterator locIter = locIter0;
final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, qry.<K, V>transform(), null);
final CacheQueryFuture fut = queryDistributed(bean, nodes);
return new GridCloseableIteratorAdapter() {
/** */
private Object cur;
/** Logical reads. */
private long logicalReads;
/** Physical reads. */
private long physicalReads;
@Override protected Object onNext() throws IgniteCheckedException {
if (!onHasNext())
throw new NoSuchElementException();
Object e = cur;
cur = null;
return e;
}
@Override protected boolean onHasNext() throws IgniteCheckedException {
if (cur != null)
return true;
if (locIter != null) {
if (performanceStatsEnabled)
IoStatisticsQueryHelper.startGatheringQueryStatistics();
try {
if (locIter.hasNextX())
cur = locIter.nextX();
}
finally {
if (performanceStatsEnabled) {
IoStatisticsHolder stat = IoStatisticsQueryHelper.finishGatheringQueryStatistics();
logicalReads += stat.logicalReads();
physicalReads += stat.physicalReads();
}
}
}
return cur != null || (cur = convert(fut.next())) != null;
}
/**
* @param obj Entry to convert.
* @return Cache entry
*/
private Object convert(Object obj) {
if (qry.transform() != null)
return obj;
Map.Entry e = (Map.Entry)obj;
return e == null ? null : new CacheQueryEntry(e.getKey(), e.getValue());
}
@Override protected void onClose() throws IgniteCheckedException {
super.onClose();
if (locIter != null)
locIter.close();
if (fut != null)
fut.cancel();
if (performanceStatsEnabled) {
cctx.kernalContext().performanceStatistics().query(
SCAN,
cctx.name(),
((GridCacheDistributedQueryFuture)fut).requestId(),
startTime,
System.nanoTime() - startTimeNanos,
true);
if (logicalReads > 0 || physicalReads > 0) {
cctx.kernalContext().performanceStatistics().queryReads(
SCAN,
cctx.localNodeId(),
((GridCacheDistributedQueryFuture)fut).requestId(),
logicalReads,
physicalReads);
}
}
}
};
}
/** {@inheritDoc} */
@Override public CacheQueryFuture<?> queryFieldsLocal(GridCacheQueryBean qry) {
if (log.isDebugEnabled())
log.debug("Executing query on local node: " + qry);
GridCacheLocalFieldsQueryFuture fut = new GridCacheLocalFieldsQueryFuture(cctx, qry);
try {
qry.query().validate();
fut.execute();
}
catch (IgniteCheckedException e) {
fut.onDone(e);
}
return fut;
}
/** {@inheritDoc} */
@Override public CacheQueryFuture<?> queryFieldsDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) {
return queryDistributed(qry, nodes, true);
}
/**
* Sends query request.
*
* @param fut Distributed future.
* @param req Request.
* @param nodeIds Nodes.
* @throws IgniteCheckedException In case of error.
*/
// TODO IGNITE-15731: Refactor how CacheQueryReducer handles remote nodes.
public void sendRequest(
final GridCacheDistributedQueryFuture<?, ?, ?> fut,
final GridCacheQueryRequest req,
Collection<UUID> nodeIds
) throws IgniteCheckedException {
assert fut != null;
assert req != null;
assert nodeIds != null;
final UUID locNodeId = cctx.localNodeId();
boolean locNode = false;
Collection<UUID> rmtNodes = null;
for (UUID nodeId: nodeIds) {
if (nodeId.equals(locNodeId))
locNode = true;
else {
if (rmtNodes == null)
rmtNodes = new ArrayList<>(nodeIds.size());
rmtNodes.add(nodeId);
}
}
// Request should be sent to remote nodes before the query is processed on the local node.
// For example, a remote reducer has a state, we should not serialize and then send
// the reducer changed by the local node.
if (!F.isEmpty(rmtNodes)) {
for (UUID node : rmtNodes) {
try {
cctx.io().send(node, req, GridIoPolicy.QUERY_POOL);
}
catch (IgniteCheckedException e) {
if (cctx.io().checkNodeLeft(node, e, true)) {
fut.onNodeLeft(node);
if (fut.isDone())
return;
}
else
throw e;
}
}
}
if (locNode) {
cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
@Override public Object call() throws Exception {
req.beforeLocalExecution(cctx);
processQueryRequest(locNodeId, req);
return null;
}
}, GridIoPolicy.QUERY_POOL);
}
}
/**
* Gets topic for ordered response messages.
*
* @param nodeId Node ID.
* @param reqId Request ID.
* @return Topic.
*/
private Object topic(UUID nodeId, long reqId) {
return TOPIC_CACHE.topic(TOPIC_PREFIX, nodeId, reqId);
}
/**
* Cancel message ID.
*/
private static class CancelMessageId implements Comparable<CancelMessageId> {
/** Message ID. */
private long reqId;
/** Node ID. */
private UUID nodeId;
/**
* @param reqId Message ID.
* @param nodeId Node ID.
*/
private CancelMessageId(long reqId, UUID nodeId) {
this.reqId = reqId;
this.nodeId = nodeId;
}
/** {@inheritDoc} */
@Override public int compareTo(CancelMessageId m) {
int res = Long.compare(reqId, m.reqId);
if (res == 0)
res = m.nodeId.compareTo(nodeId);
return res;
}
/** {@inheritDoc} */
@Override public boolean equals(Object obj) {
if (obj == this)
return true;
CancelMessageId other = (CancelMessageId)obj;
return reqId == other.reqId && nodeId.equals(other.nodeId);
}
/** {@inheritDoc} */
@Override public int hashCode() {
return 31 * ((int)(reqId ^ (reqId >>> 32))) + nodeId.hashCode();
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CancelMessageId.class, this);
}
}
}