blob: 78607345d2a71f59d5cf7faf7b1f1efd94e9aa11 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicCheckUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.util.IgniteUtils.nl;
/**
* Cache communication manager.
*/
public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
/** Communication topic prefix for distributed queries. */
private static final String QUERY_TOPIC_PREFIX = "QUERY";
/** Message ID generator. */
private static final AtomicLong idGen = new AtomicLong();
/** */
private static final int MAX_STORED_PENDING_MESSAGES = 100;
/** Delay in milliseconds between retries. */
private long retryDelay;
/** Number of retries using to send messages. */
private int retryCnt;
/** */
private final MessageHandlers cacheHandlers = new MessageHandlers();
/** */
private final MessageHandlers grpHandlers = new MessageHandlers();
/** Stopping flag. */
private boolean stopping;
/** Mutex. */
private final StripedCompositeReadWriteLock rw =
new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors());
/** Deployment enabled. */
private boolean depEnabled;
/** */
private final List<GridCacheMessage> pendingMsgs = new ArrayList<>(MAX_STORED_PENDING_MESSAGES);
/**
* @param sb String builder.
*/
public void dumpPendingMessages(StringBuilder sb) {
synchronized (pendingMsgs) {
if (pendingMsgs.isEmpty())
return;
sb.append("Pending cache messages waiting for exchange [readyVer=").
append(cctx.exchange().readyAffinityVersion()).
append(", discoVer=").
append(cctx.discovery().topologyVersion()).append(']');
sb.append(nl());
for (GridCacheMessage msg : pendingMsgs) {
sb.append("Message [waitVer=").append(msg.topologyVersion()).append(", msg=").append(msg).append(']');
sb.append(nl());
}
}
}
/** Message listener. */
private GridMessageListener lsnr = new GridMessageListener() {
@Override public void onMessage(final UUID nodeId, final Object msg, final byte plc) {
if (log.isDebugEnabled())
log.debug("Received unordered cache communication message [nodeId=" + nodeId +
", locId=" + cctx.localNodeId() + ", msg=" + msg + ']');
final GridCacheMessage cacheMsg = (GridCacheMessage)msg;
AffinityTopologyVersion rmtAffVer = cacheMsg.topologyVersion();
AffinityTopologyVersion lastAffChangedVer = cacheMsg.lastAffinityChangedTopologyVersion();
cctx.exchange().lastAffinityChangedTopologyVersion(rmtAffVer, lastAffChangedVer);
IgniteInternalFuture<?> fut = null;
if (cacheMsg.partitionExchangeMessage()) {
if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
GridDhtAffinityAssignmentRequest msg0 = (GridDhtAffinityAssignmentRequest)cacheMsg;
assert cacheMsg.topologyVersion() != null : cacheMsg;
AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(cctx.localNode().order());
CacheGroupDescriptor desc = cctx.cache().cacheGroupDescriptors().get(msg0.groupId());
if (desc != null) {
if (desc.startTopologyVersion() != null)
startTopVer = desc.startTopologyVersion();
else if (desc.receivedFromStartVersion() != null)
startTopVer = desc.receivedFromStartVersion();
}
// Need to wait for exchange to avoid race between cache start and affinity request.
fut = cctx.exchange().affinityReadyFuture(startTopVer);
if (fut != null && !fut.isDone()) {
if (log.isDebugEnabled()) {
log.debug("Wait for exchange before processing message [msg=" + msg +
", node=" + nodeId +
", waitVer=" + startTopVer +
", cacheDesc=" + descriptorForMessage(cacheMsg) + ']');
}
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
@Override public void run() {
handleMessage(nodeId, cacheMsg, plc);
}
});
}
});
return;
}
}
long locTopVer = cctx.discovery().topologyVersion();
long rmtTopVer = cacheMsg.topologyVersion().topologyVersion();
if (locTopVer < rmtTopVer) {
if (log.isDebugEnabled())
log.debug("Received message has higher topology version [msg=" + msg +
", locTopVer=" + locTopVer + ", rmtTopVer=" + rmtTopVer + ']');
fut = cctx.discovery().topologyFuture(rmtTopVer);
}
}
else {
AffinityTopologyVersion locAffVer = cctx.exchange().readyAffinityVersion();
if (locAffVer.before(lastAffChangedVer)) {
IgniteLogger log = cacheMsg.messageLogger(cctx);
if (log.isDebugEnabled()) {
StringBuilder msg0 = new StringBuilder("Received message has higher affinity topology version [");
appendMessageInfo(cacheMsg, nodeId, msg0);
msg0.append(", locTopVer=").append(locAffVer).
append(", rmtTopVer=").append(rmtAffVer).
append(", lastAffChangedVer=").append(lastAffChangedVer).
append(']');
log.debug(msg0.toString());
}
fut = cctx.exchange().affinityReadyFuture(lastAffChangedVer);
}
}
if (fut != null && !fut.isDone()) {
synchronized (pendingMsgs) {
if (pendingMsgs.size() < MAX_STORED_PENDING_MESSAGES)
pendingMsgs.add(cacheMsg);
}
Thread curThread = Thread.currentThread();
final int stripe = curThread instanceof IgniteThread ? ((IgniteThread)curThread).stripe() : -1;
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> t) {
Runnable c = new Runnable() {
@Override public void run() {
synchronized (pendingMsgs) {
pendingMsgs.remove(cacheMsg);
}
IgniteLogger log = cacheMsg.messageLogger(cctx);
if (log.isDebugEnabled()) {
StringBuilder msg0 = new StringBuilder("Process cache message after wait for " +
"affinity topology version [");
appendMessageInfo(cacheMsg, nodeId, msg0).append(']');
log.debug(msg0.toString());
}
handleMessage(nodeId, cacheMsg, plc);
}
};
if (stripe >= 0)
cctx.kernalContext().pools().getStripedExecutorService().execute(stripe, c);
else {
try {
cctx.kernalContext().pools().poolForPolicy(plc).execute(c);
}
catch (IgniteCheckedException e) {
U.error(cacheMsg.messageLogger(cctx), "Failed to get pool for policy: " + plc, e);
}
}
}
});
return;
}
handleMessage(nodeId, cacheMsg, plc);
}
};
/**
* @param nodeId Sender node ID.
* @param cacheMsg Message.
* @param plc Message policy.
*/
private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, byte plc) {
handleMessage(nodeId, cacheMsg, cacheMsg.cacheGroupMessage() ? grpHandlers : cacheHandlers, plc);
}
/**
* @param nodeId Sender node ID.
* @param cacheMsg Message.
* @param msgHandlers Message handlers.
* @param plc Message policy.
*/
@SuppressWarnings("unchecked")
private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers, byte plc) {
Lock lock = rw.readLock();
lock.lock();
try {
int msgIdx = cacheMsg.lookupIndex();
IgniteBiInClosure<UUID, GridCacheMessage> c = null;
if (msgIdx >= 0) {
Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers;
IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheMsg.handlerId());
if (cacheClsHandlers != null)
c = cacheClsHandlers[msgIdx];
}
if (c == null)
c = msgHandlers.clsHandlers.get(new ListenerKey(cacheMsg.handlerId(), cacheMsg.getClass()));
if (c == null) {
if (processMissedHandler(nodeId, cacheMsg))
return;
IgniteLogger log = cacheMsg.messageLogger(cctx);
StringBuilder msg0 = new StringBuilder("Received message without registered handler (will ignore) [");
appendMessageInfo(cacheMsg, nodeId, msg0);
msg0.append(", locTopVer=").append(cctx.exchange().readyAffinityVersion()).
append(", msgTopVer=").append(cacheMsg.topologyVersion()).
append(", desc=").append(descriptorForMessage(cacheMsg)).
append(']');
msg0.append(nl()).append("Registered listeners:");
Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers;
for (Map.Entry<Integer, IgniteBiInClosure[]> e : idxClsHandlers0.entrySet())
msg0.append(nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue()));
if (cctx.kernalContext().isStopping()) {
if (log.isDebugEnabled())
log.debug(msg0.toString());
}
else {
U.error(log, msg0.toString());
try {
cacheMsg.onClassError(new IgniteCheckedException("Failed to find message handler for message: " + cacheMsg));
processFailedMessage(nodeId, cacheMsg, c, plc);
}
catch (Exception e) {
U.error(log, "Failed to process failed message: " + e, e);
}
}
return;
}
onMessage0(nodeId, cacheMsg, c, plc);
}
finally {
lock.unlock();
}
}
/**
* @param nodeId Node ID.
* @param cacheMsg Message.
* @return {@code True} if message processed.
*/
private boolean processMissedHandler(UUID nodeId, GridCacheMessage cacheMsg) {
// It is possible to receive reader update after client near cache was closed.
if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest) {
GridDhtAtomicAbstractUpdateRequest req = (GridDhtAtomicAbstractUpdateRequest)cacheMsg;
if (req.nearSize() > 0) {
List<KeyCacheObject> nearEvicted = new ArrayList<>(req.nearSize());
for (int i = 0; i < req.nearSize(); i++)
nearEvicted.add(req.nearKey(i));
GridDhtAtomicUpdateResponse dhtRes = new GridDhtAtomicUpdateResponse(req.cacheId(),
req.partition(),
req.futureId(),
false);
dhtRes.nearEvicted(nearEvicted);
sendMessageForMissedHandler(cacheMsg,
nodeId,
dhtRes,
nodeId,
GridIoPolicy.SYSTEM_POOL);
if (req.nearNodeId() != null) {
GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(),
req.partition(),
req.nearFutureId(),
nodeId,
req.flags());
sendMessageForMissedHandler(cacheMsg,
nodeId,
nearRes,
req.nearNodeId(),
GridIoPolicy.SYSTEM_POOL);
}
return true;
}
}
return false;
}
/**
* @param origMsg Message without handler.
* @param origMsgNode Node sent {@code origMsg}.
* @param nodeId Target node ID.
* @param msg Response.
* @param plc Policy.
*/
private void sendMessageForMissedHandler(
GridCacheMessage origMsg,
UUID origMsgNode,
GridCacheMessage msg,
UUID nodeId,
byte plc) {
IgniteLogger log = msg.messageLogger(cctx);
try {
if (log.isDebugEnabled()) {
log.debug("Received message without registered handler, " +
"send response [locTopVer=" + cctx.exchange().readyAffinityVersion() +
", msgTopVer=" + origMsg.topologyVersion() +
", node=" + origMsgNode +
", msg=" + origMsg +
", resNode=" + nodeId +
", res=" + msg + ']');
}
send(nodeId, msg, plc);
}
catch (ClusterTopologyCheckedException e) {
if (log.isDebugEnabled())
log.debug("Failed to send response, node left [nodeId=" + nodeId + ", msg=" + msg + ']');
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send response [nodeId=" + nodeId + ", msg=" + msg + ", err=" + e + ']');
}
}
/** {@inheritDoc} */
@Override public void start0() throws IgniteCheckedException {
retryDelay = cctx.gridConfig().getNetworkSendRetryDelay();
retryCnt = cctx.gridConfig().getNetworkSendRetryCount();
depEnabled = cctx.gridDeploy().enabled();
cctx.gridIO().addMessageListener(TOPIC_CACHE, lsnr);
}
/**
* @return Lock or {@code null} if node is stopping.
*/
@Nullable public Lock readLock() {
Lock lock = rw.readLock();
if (!lock.tryLock())
return null;
if (stopping) {
lock.unlock();
return null;
}
return lock;
}
/**
*
*/
public void writeLock() {
boolean interrupted = false;
// Busy wait is intentional.
while (true) {
try {
if (rw.writeLock().tryLock(200, TimeUnit.MILLISECONDS))
break;
else
Thread.sleep(200);
}
catch (InterruptedException ignore) {
// Preserve interrupt status & ignore.
// Note that interrupted flag is cleared.
interrupted = true;
}
}
if (interrupted)
Thread.currentThread().interrupt();
}
/**
*
*/
public void writeUnlock() {
rw.writeLock().unlock();
}
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
cctx.gridIO().removeMessageListener(TOPIC_CACHE);
for (Object ordTopic : cacheHandlers.orderedHandlers.keySet())
cctx.gridIO().removeMessageListener(ordTopic);
for (Object ordTopic : grpHandlers.orderedHandlers.keySet())
cctx.gridIO().removeMessageListener(ordTopic);
writeLock();
try {
stopping = true;
}
finally {
rw.writeLock().unlock();
}
}
/**
* @param nodeId Sender Node ID.
* @param cacheMsg Cache message.
* @param c Handler closure.
* @param plc Message policy.
*/
private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
final IgniteBiInClosure<UUID, GridCacheMessage> c, byte plc) {
try {
if (stopping) {
if (log.isDebugEnabled())
log.debug("Received cache communication message while stopping (will ignore) [nodeId=" +
nodeId + ", msg=" + cacheMsg + ']');
return;
}
if (depEnabled)
cctx.deploy().ignoreOwnership(true);
unmarshall(nodeId, cacheMsg);
if (cacheMsg.classError() != null)
processFailedMessage(nodeId, cacheMsg, c, plc);
else
processMessage(nodeId, cacheMsg, c);
}
catch (Throwable e) {
String msgStr;
try {
msgStr = String.valueOf(cacheMsg);
}
catch (Throwable e0) {
String clsName = cacheMsg.getClass().getName();
U.error(log, "Failed to log message due to an error: " + clsName, e0);
msgStr = clsName + "(failed to log message)";
}
U.error(log, "Failed to process message [senderId=" + nodeId + ", msg=" + msgStr + ']', e);
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
if (e instanceof Error)
throw (Error)e;
}
finally {
if (depEnabled)
cctx.deploy().ignoreOwnership(false);
}
}
/**
* Sends response on failed message.
*
* @param nodeId node id.
* @param res response.
* @param cctx shared context.
* @param plc grid io policy.
*/
private void sendResponseOnFailedMessage(UUID nodeId, GridCacheMessage res, GridCacheSharedContext cctx,
byte plc) {
try {
cctx.io().send(nodeId, res, plc);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send response to node (is node still alive?) [nodeId=" + nodeId +
",res=" + res + ']', e);
}
}
/**
* @param cacheMsg Cache message.
* @param nodeId Node ID.
* @param builder Message builder.
* @return Message builder.
*/
private StringBuilder appendMessageInfo(GridCacheMessage cacheMsg, UUID nodeId, StringBuilder builder) {
if (txId(cacheMsg) != null) {
builder.append("txId=").append(txId(cacheMsg)).
append(", dhtTxId=").append(dhtTxId(cacheMsg)).
append(", msg=").append(cacheMsg);
}
else if (atomicFututeId(cacheMsg) != null) {
builder.append("futId=").append(atomicFututeId(cacheMsg)).
append(", writeVer=").append(atomicWriteVersion(cacheMsg)).
append(", msg=").append(cacheMsg);
}
else
builder.append("msg=").append(cacheMsg);
builder.append(", node=").append(nodeId);
return builder;
}
/**
* @param cacheMsg Cache message.
* @return Transaction ID if applicable for message.
*/
@Nullable private GridCacheVersion txId(GridCacheMessage cacheMsg) {
if (cacheMsg instanceof GridDhtTxPrepareRequest)
return ((GridDhtTxPrepareRequest)cacheMsg).nearXidVersion();
else if (cacheMsg instanceof GridNearTxPrepareRequest)
return ((GridNearTxPrepareRequest)cacheMsg).version();
else if (cacheMsg instanceof GridNearTxPrepareResponse)
return ((GridNearTxPrepareResponse)cacheMsg).version();
else if (cacheMsg instanceof GridNearTxFinishRequest)
return ((GridNearTxFinishRequest)cacheMsg).version();
else if (cacheMsg instanceof GridNearTxFinishResponse)
return ((GridNearTxFinishResponse)cacheMsg).xid();
return null;
}
/**
* @param cacheMsg Cache message.
* @return Transaction ID if applicable for message.
*/
@Nullable private GridCacheVersion dhtTxId(GridCacheMessage cacheMsg) {
if (cacheMsg instanceof GridDhtTxPrepareRequest)
return ((GridDhtTxPrepareRequest)cacheMsg).version();
else if (cacheMsg instanceof GridDhtTxPrepareResponse)
return ((GridDhtTxPrepareResponse)cacheMsg).version();
else if (cacheMsg instanceof GridDhtTxFinishRequest)
return ((GridDhtTxFinishRequest)cacheMsg).version();
else if (cacheMsg instanceof GridDhtTxFinishResponse)
return ((GridDhtTxFinishResponse)cacheMsg).xid();
return null;
}
/**
* @param cacheMsg Cache message.
* @return Atomic future ID if applicable for message.
*/
@Nullable private Long atomicFututeId(GridCacheMessage cacheMsg) {
if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureId();
else if (cacheMsg instanceof GridNearAtomicUpdateResponse)
return ((GridNearAtomicUpdateResponse)cacheMsg).futureId();
else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureId();
else if (cacheMsg instanceof GridDhtAtomicUpdateResponse)
return ((GridDhtAtomicUpdateResponse)cacheMsg).futureId();
else if (cacheMsg instanceof GridNearAtomicCheckUpdateRequest)
return ((GridNearAtomicCheckUpdateRequest)cacheMsg).futureId();
return null;
}
/**
* @param cacheMsg Cache message.
* @return Atomic future ID if applicable for message.
*/
@Nullable private GridCacheVersion atomicWriteVersion(GridCacheMessage cacheMsg) {
if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).writeVersion();
return null;
}
/**
* Processes failed messages.
*
* @param nodeId Node ID.
* @param msg Message.
* @param c Closure.
* @param plc Message policy.
* @throws IgniteCheckedException If failed.
*/
private void processFailedMessage(UUID nodeId,
GridCacheMessage msg,
IgniteBiInClosure<UUID, GridCacheMessage> c,
byte plc)
throws IgniteCheckedException {
assert msg != null;
switch (msg.directType()) {
case 30: {
GridDhtLockRequest req = (GridDhtLockRequest)msg;
GridDhtLockResponse res = new GridDhtLockResponse(
req.cacheId(),
req.version(),
req.futureId(),
req.miniId(),
0,
false);
sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
case 34: {
GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg;
GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse(
req.partition(),
req.version(),
req.futureId(),
req.miniId(),
req.deployInfo() != null);
res.error(req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, req.policy());
}
break;
case 38: {
GridDhtAtomicUpdateRequest req = (GridDhtAtomicUpdateRequest)msg;
GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
req.cacheId(),
req.partition(),
req.futureId(),
false);
res.onError(req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, plc);
if (req.nearNodeId() != null) {
GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(),
req.partition(),
req.nearFutureId(),
nodeId,
req.flags());
nearRes.errors(new UpdateErrors(req.classError()));
sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, plc);
}
}
break;
case 40: {
GridNearAtomicFullUpdateRequest req = (GridNearAtomicFullUpdateRequest)msg;
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
req.cacheId(),
nodeId,
req.futureId(),
req.partition(),
false,
false);
res.error(req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
case 42: {
GridDhtForceKeysRequest req = (GridDhtForceKeysRequest)msg;
GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(
req.cacheId(),
req.futureId(),
req.miniId(),
false
);
res.error(req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
case 49: {
GridNearGetRequest req = (GridNearGetRequest)msg;
GridNearGetResponse res = new GridNearGetResponse(
req.cacheId(),
req.futureId(),
req.miniId(),
req.version(),
req.deployInfo() != null);
res.error(req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
case 50: {
GridNearGetResponse res = (GridNearGetResponse)msg;
CacheGetFuture fut = (CacheGetFuture)cctx.mvcc().future(res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
return;
}
res.error(res.classError());
fut.onResult(nodeId, res);
}
break;
case 51: {
GridNearLockRequest req = (GridNearLockRequest)msg;
GridNearLockResponse res = new GridNearLockResponse(
req.cacheId(),
req.version(),
req.futureId(),
req.miniId(),
false,
0,
req.classError(),
null,
false,
false);
sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
case 55: {
GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg;
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
req.partition(),
req.version(),
req.futureId(),
req.miniId(),
req.version(),
req.version(),
null,
null,
null,
false,
req.deployInfo() != null);
res.error(req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, req.policy());
}
break;
case 58: {
GridCacheQueryRequest req = (GridCacheQueryRequest)msg;
GridCacheQueryResponse res = new GridCacheQueryResponse(
req.cacheId(),
req.id(),
req.classError(),
cctx.deploymentEnabled());
ClusterNode node = cctx.node(nodeId);
if (node == null) {
U.error(log, "Failed to send message because node left grid [nodeId=" + nodeId +
", msg=" + msg + ']');
}
else {
cctx.io().sendOrderedMessage(
node,
TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()),
res,
plc,
Long.MAX_VALUE);
}
}
break;
case 114:
case 120: {
processMessage(nodeId, msg, c); // Will be handled by Rebalance Demander.
}
break;
case 116: {
GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg;
GridNearSingleGetResponse res = new GridNearSingleGetResponse(
req.cacheId(),
req.futureId(),
req.topologyVersion(),
null,
false,
req.deployInfo() != null);
res.error(req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
case 117: {
GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg;
GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)cctx.mvcc()
.future(new IgniteUuid(IgniteUuid.VM_ID, res.futureId()));
if (fut == null) {
if (log.isDebugEnabled())
log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
return;
}
res.error(res.classError());
fut.onResult(nodeId, res);
}
break;
case 125: {
GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg;
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
req.cacheId(),
nodeId,
req.futureId(),
req.partition(),
false,
false);
res.error(req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
case 126: {
GridNearAtomicSingleUpdateInvokeRequest req = (GridNearAtomicSingleUpdateInvokeRequest)msg;
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
req.cacheId(),
nodeId,
req.futureId(),
req.partition(),
false,
false);
res.error(req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
case 127: {
GridNearAtomicSingleUpdateFilterRequest req = (GridNearAtomicSingleUpdateFilterRequest)msg;
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
req.cacheId(),
nodeId,
req.futureId(),
req.partition(),
false,
false);
res.error(req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, plc);
}
break;
case 151: {
GridNearTxQueryEnlistRequest req = (GridNearTxQueryEnlistRequest)msg;
GridNearTxQueryEnlistResponse res = new GridNearTxQueryEnlistResponse(
req.cacheId(),
req.futureId(),
req.miniId(),
req.version(),
req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, plc);
break;
}
case 153: {
GridNearTxQueryResultsEnlistRequest req = (GridNearTxQueryResultsEnlistRequest)msg;
GridNearTxQueryEnlistResponse res = new GridNearTxQueryResultsEnlistResponse(
req.cacheId(),
req.futureId(),
req.miniId(),
req.version(),
req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, plc);
break;
}
case 155: /* GridDhtTxQueryEnlistRequest */
case 156: /* GridDhtTxQueryFirstEnlistRequest */ {
GridDhtTxQueryEnlistRequest req = (GridDhtTxQueryEnlistRequest)msg;
GridDhtTxQueryEnlistResponse res = new GridDhtTxQueryEnlistResponse(
req.cacheId(),
req.dhtFutureId(),
req.batchId(),
req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, plc);
break;
}
case 159: {
GridNearTxEnlistRequest req = (GridNearTxEnlistRequest)msg;
GridNearTxEnlistResponse res = new GridNearTxEnlistResponse(
req.cacheId(),
req.futureId(),
req.miniId(),
req.version(),
req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, plc);
break;
}
case -36: {
GridDhtAtomicSingleUpdateRequest req = (GridDhtAtomicSingleUpdateRequest)msg;
GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
req.cacheId(),
req.partition(),
req.futureId(),
false);
res.onError(req.classError());
sendResponseOnFailedMessage(nodeId, res, cctx, plc);
if (req.nearNodeId() != null) {
GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(),
req.partition(),
req.nearFutureId(),
nodeId,
req.flags());
nearRes.errors(new UpdateErrors(req.classError()));
sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, plc);
}
}
break;
default:
throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
+ msg + "]", msg.classError());
}
}
/**
* @param nodeId Node ID.
* @param msg Message.
* @param c Closure.
*/
private void processMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c) {
try {
c.apply(nodeId, msg);
if (log.isDebugEnabled())
log.debug("Finished processing cache communication message [nodeId=" + nodeId + ", msg=" + msg + ']');
}
catch (Throwable e) {
try {
U.error(log, "Failed processing message [senderId=" + nodeId + ", msg=" + msg + ']', e);
}
catch (Throwable e0) {
U.error(log, "Failed processing message [senderId=" + nodeId + ", msg=(failed to log message)", e);
U.error(log, "Failed to log message due to an error: ", e0);
}
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
throw e;
}
finally {
onMessageProcessed(msg);
}
}
/**
* @param msg Message.
*/
public void onMessageProcessed(GridCacheMessage msg) {
// Reset thread local context.
cctx.tm().resetContext();
GridCacheMvccManager mvcc = cctx.mvcc();
if (mvcc != null)
mvcc.contextReset();
// Unwind eviction notifications.
if (msg instanceof IgniteTxStateAware) {
IgniteTxState txState = ((IgniteTxStateAware)msg).txState();
if (txState != null)
txState.unwindEvicts(cctx);
}
else if (msg instanceof GridCacheIdMessage) {
GridCacheContext ctx = cctx.cacheContext(((GridCacheIdMessage)msg).cacheId());
if (ctx != null)
CU.unwindEvicts(ctx);
}
}
/**
* Pre-processes message prior to send.
*
* @param msg Message to send.
* @param destNodeId Destination node ID.
* @return {@code True} if should send message.
* @throws IgniteCheckedException If failed.
*/
private boolean onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws IgniteCheckedException {
if (msg.error() != null && cctx.kernalContext().isStopping())
return false;
if (msg.messageId() < 0)
// Generate and set message ID.
msg.messageId(idGen.incrementAndGet());
if (destNodeId == null || !cctx.localNodeId().equals(destNodeId)) {
msg.prepareMarshal(cctx);
if (msg instanceof GridCacheDeployable && msg.addDeploymentInfo())
cctx.deploy().prepare((GridCacheDeployable)msg);
}
return true;
}
/**
* @param nodeId Node ID.
* @param sndErr Send error.
* @return {@code True} if node left.
* @param ping {@code True} if try ping node.
* @throws IgniteClientDisconnectedCheckedException If ping failed.
*/
public boolean checkNodeLeft(UUID nodeId, IgniteCheckedException sndErr, boolean ping)
throws IgniteClientDisconnectedCheckedException {
return cctx.gridIO().checkNodeLeft(nodeId, sndErr, ping);
}
/**
* Sends communication message.
*
* @param node Node to send the message to.
* @param msg Message to send.
* @param plc IO policy.
* @throws IgniteCheckedException If sending failed.
* @throws ClusterTopologyCheckedException If receiver left.
*/
public void send(ClusterNode node, GridCacheMessage msg, byte plc) throws IgniteCheckedException {
assert !node.isLocal() : node;
msg.lastAffinityChangedTopologyVersion(cctx.exchange().lastAffinityChangedTopologyVersion(msg.topologyVersion()));
if (!onSend(msg, node.id()))
return;
if (log.isDebugEnabled())
log.debug("Sending cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
int cnt = 0;
while (cnt <= retryCnt) {
try {
cnt++;
cctx.gridIO().sendToGridTopic(node, TOPIC_CACHE, msg, plc);
return;
}
catch (ClusterTopologyCheckedException e) {
throw e;
}
catch (IgniteCheckedException e) {
if (!cctx.discovery().alive(node.id()) || !cctx.discovery().pingNode(node.id()))
throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e);
if (cnt == retryCnt || cctx.kernalContext().isStopping())
throw e;
else if (log.isDebugEnabled())
log.debug("Failed to send message to node (will retry): " + node.id());
}
U.sleep(retryDelay);
}
if (log.isDebugEnabled())
log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
}
/**
* Sends communication message.
*
* @param nodeId ID of node to send the message to.
* @param msg Message to send.
* @param plc IO policy.
* @throws IgniteCheckedException If sending failed.
*/
public void send(UUID nodeId, GridCacheMessage msg, byte plc) throws IgniteCheckedException {
ClusterNode n = cctx.discovery().node(nodeId);
if (n == null)
throw new ClusterTopologyCheckedException("Failed to send message because node left grid [nodeId=" + nodeId +
", msg=" + msg + ']');
send(n, msg, plc);
}
/**
* @param node Destination node.
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc IO policy.
* @param timeout Timeout to keep a message on receiving queue.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void sendOrderedMessage(ClusterNode node, Object topic, GridCacheMessage msg, byte plc,
long timeout) throws IgniteCheckedException {
if (!onSend(msg, node.id()))
return;
msg.lastAffinityChangedTopologyVersion(cctx.exchange().lastAffinityChangedTopologyVersion(msg.topologyVersion()));
int cnt = 0;
while (cnt <= retryCnt) {
try {
cnt++;
cctx.gridIO().sendOrderedMessage(node, topic, msg, plc, timeout, false);
if (log.isDebugEnabled())
log.debug("Sent ordered cache message [topic=" + topic + ", msg=" + msg +
", nodeId=" + node.id() + ']');
return;
}
catch (ClusterTopologyCheckedException e) {
throw e;
}
catch (IgniteCheckedException e) {
if (cctx.discovery().node(node.id()) == null)
throw new ClusterTopologyCheckedException("Node left grid while sending ordered message to: " + node.id(), e);
if (cnt == retryCnt)
throw e;
else if (log.isDebugEnabled())
log.debug("Failed to send message to node (will retry): " + node.id());
}
U.sleep(retryDelay);
}
}
/**
* @return ID that auto-grows based on local counter and counters received from other nodes.
*/
public long nextIoId() {
return idGen.incrementAndGet();
}
/**
* Sends message without retries and node ping in case of error.
*
* @param node Node to send message to.
* @param msg Message.
* @param plc IO policy.
* @throws IgniteCheckedException If send failed.
*/
void sendNoRetry(ClusterNode node,
GridCacheMessage msg,
byte plc)
throws IgniteCheckedException {
assert node != null;
assert msg != null;
if (!onSend(msg, null))
return;
msg.lastAffinityChangedTopologyVersion(cctx.exchange().lastAffinityChangedTopologyVersion(msg.topologyVersion()));
try {
cctx.gridIO().sendToGridTopic(node, TOPIC_CACHE, msg, plc);
if (log.isDebugEnabled())
log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
}
catch (ClusterTopologyCheckedException e) {
throw e;
}
catch (IgniteCheckedException e) {
if (!cctx.discovery().alive(node.id()))
throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e);
else
throw e;
}
}
/**
* @param hndId Message handler ID.
* @param type Type of message.
* @param c Handler.
*/
public <Msg extends GridCacheMessage> void addCacheHandler(
int hndId,
Class<Msg> type,
IgniteBiInClosure<UUID, ? super Msg> c
) {
assert !type.isAssignableFrom(GridCacheGroupIdMessage.class) : type;
addHandler(hndId, type, c, cacheHandlers);
}
/**
* @param hndId Message handler ID.
* @param type Type of message.
* @param c Handler.
*/
public <Msg extends GridCacheGroupIdMessage> void addCacheGroupHandler(
int hndId,
Class<Msg> type,
IgniteBiInClosure<UUID, ? super Msg> c
) {
assert !type.isAssignableFrom(GridCacheIdMessage.class) : type;
addHandler(hndId, type, c, grpHandlers);
}
/**
* @param hndId Message handler ID.
* @param type Type of message.
* @param c Handler.
* @param msgHandlers Message handlers.
*/
private <Msg extends GridCacheMessage> void addHandler(
int hndId,
Class<Msg> type,
IgniteBiInClosure<UUID, ? super Msg> c,
MessageHandlers msgHandlers
) {
int msgIdx = messageIndex(type);
if (msgIdx != -1) {
Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers;
IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.compute(hndId, (key, clsHandlers) -> {
if (clsHandlers == null)
clsHandlers = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX];
if (clsHandlers[msgIdx] != null)
return null;
clsHandlers[msgIdx] = c;
return clsHandlers;
});
if (cacheClsHandlers == null)
throw new IgniteException("Duplicate cache message ID found [hndId=" + hndId +
", type=" + type + ']');
return;
}
else {
ListenerKey key = new ListenerKey(hndId, type);
if (msgHandlers.clsHandlers.putIfAbsent(key,
(IgniteBiInClosure<UUID, GridCacheMessage>)c) != null)
assert false : "Handler for class already registered [hndId=" + hndId + ", cls=" + type +
", old=" + msgHandlers.clsHandlers.get(key) + ", new=" + c + ']';
}
IgniteLogger log0 = log;
if (log0 != null && log0.isTraceEnabled())
log0.trace(
"Registered cache communication handler [hndId=" + hndId + ", type=" + type +
", msgIdx=" + msgIdx + ", handler=" + c + ']');
}
/**
* @param cacheId Cache ID to remove handlers for.
*/
void removeCacheHandlers(int cacheId) {
removeHandlers(cacheHandlers, cacheId);
}
/**
* @param grpId Cache group ID to remove handlers for.
*/
void removeCacheGroupHandlers(int grpId) {
removeHandlers(grpHandlers, grpId);
}
/**
* @param msgHandlers Handlers.
* @param hndId ID to remove handlers for.
*/
private void removeHandlers(MessageHandlers msgHandlers, int hndId) {
assert hndId != 0;
msgHandlers.idxClsHandlers.remove(hndId);
for (Iterator<ListenerKey> iter = msgHandlers.clsHandlers.keySet().iterator(); iter.hasNext(); ) {
ListenerKey key = iter.next();
if (key.msgCls.equals(GridDhtAffinityAssignmentResponse.class))
continue;
if (key.hndId == hndId)
iter.remove();
}
}
/**
* @param cacheGrp {@code True} if cache group handler, {@code false} if cache handler.
* @param hndId Handler ID.
* @param type Message type.
*/
public void removeHandler(boolean cacheGrp, int hndId, Class<? extends GridCacheMessage> type) {
MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers;
msgHandlers.clsHandlers.remove(new ListenerKey(hndId, type));
}
/**
* @param msgCls Message class to check.
* @return Message index.
*/
private int messageIndex(Class<?> msgCls) {
try {
Integer msgIdx = U.field(msgCls, GridCacheMessage.CACHE_MSG_INDEX_FIELD_NAME);
if (msgIdx == null || msgIdx < 0)
return -1;
return msgIdx;
}
catch (IgniteCheckedException ignored) {
return -1;
}
}
/**
* @param cctx Context.
* @param topic Topic.
* @param c Handler.
*/
public void addOrderedCacheHandler(GridCacheSharedContext cctx, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheIdMessage> c) {
addOrderedHandler(cctx, false, topic, c);
}
/**
* @param cctx Context.
* @param topic Topic.
* @param c Handler.
*/
public void addOrderedCacheGroupHandler(
GridCacheSharedContext cctx,
Object topic,
IgniteBiInClosure<UUID, ? extends GridCacheGroupIdMessage> c
) {
addOrderedHandler(cctx, true, topic, c);
}
/**
* Adds ordered message handler.
*
* @param cctx Context.
* @param cacheGrp {@code True} if cache group message, {@code false} if cache message.
* @param topic Topic.
* @param c Handler.
*/
private void addOrderedHandler(
GridCacheSharedContext cctx,
boolean cacheGrp,
Object topic,
IgniteBiInClosure<UUID, ? extends GridCacheMessage> c
) {
MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers;
IgniteLogger log0 = log;
if (msgHandlers.orderedHandlers.putIfAbsent(topic, c) == null) {
cctx.gridIO().addMessageListener(topic, new OrderedMessageListener(
(IgniteBiInClosure<UUID, GridCacheMessage>)c));
if (log0 != null && log0.isTraceEnabled())
log0.trace("Registered ordered cache communication handler [topic=" + topic + ", handler=" + c + ']');
}
else if (log0 != null)
U.warn(log0, "Failed to register ordered cache communication handler because it is already " +
"registered for this topic [topic=" + topic + ", handler=" + c + ']');
}
/**
* Removed ordered message handler.
*
* @param cacheGrp {@code True} if cache group message, {@code false} if cache message.
* @param topic Topic.
*/
public void removeOrderedHandler(boolean cacheGrp, Object topic) {
MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers;
if (msgHandlers.orderedHandlers.remove(topic) != null) {
cctx.gridIO().removeMessageListener(topic);
if (log != null && log.isDebugEnabled())
log.debug("Unregistered ordered cache communication handler for topic:" + topic);
}
else if (log != null)
U.warn(log, "Failed to unregister ordered cache communication handler because it was not found " +
"for topic: " + topic);
}
/**
* @param nodeId Sender node ID.
* @param cacheMsg Message.
*/
@SuppressWarnings({"ErrorNotRethrown"})
private void unmarshall(UUID nodeId, GridCacheMessage cacheMsg) {
if (cctx.localNodeId().equals(nodeId))
return;
GridDeploymentInfo bean = cacheMsg.deployInfo();
try {
if (bean != null) {
assert depEnabled : "Received deployment info while peer class loading is disabled [nodeId=" + nodeId +
", msg=" + cacheMsg + ']';
cctx.deploy().p2pContext(
nodeId,
bean.classLoaderId(),
bean.userVersion(),
bean.deployMode(),
bean.participants()
);
if (log.isDebugEnabled())
log.debug("Set P2P context [senderId=" + nodeId + ", msg=" + cacheMsg + ']');
}
cacheMsg.finishUnmarshal(cctx, cctx.deploy().globalLoader());
}
catch (IgniteCheckedException e) {
cacheMsg.onClassError(e);
}
catch (BinaryObjectException e) {
cacheMsg.onClassError(new IgniteCheckedException(e));
}
catch (Error e) {
if (cacheMsg.ignoreClassErrors() && X.hasCause(e, NoClassDefFoundError.class,
UnsupportedClassVersionError.class))
cacheMsg.onClassError(new IgniteCheckedException("Failed to load class during unmarshalling: " + e, e));
else
throw e;
}
}
/**
* @param msg Message.
* @return Cache or group descriptor.
*/
private Object descriptorForMessage(GridCacheMessage msg) {
if (msg instanceof GridCacheIdMessage)
return cctx.cache().cacheDescriptor(((GridCacheIdMessage)msg).cacheId());
else if (msg instanceof GridCacheGroupIdMessage)
return cctx.cache().cacheGroupDescriptors().get(((GridCacheGroupIdMessage)msg).groupId());
return null;
}
/** {@inheritDoc} */
@Override public void printMemoryStats() {
X.println(">>> ");
X.println(">>> Cache IO manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ']');
X.println(">>> cacheClsHandlersSize: " + cacheHandlers.clsHandlers.size());
X.println(">>> cacheOrderedHandlersSize: " + cacheHandlers.orderedHandlers.size());
X.println(">>> cacheGrpClsHandlersSize: " + grpHandlers.clsHandlers.size());
X.println(">>> cacheGrpOrderedHandlersSize: " + grpHandlers.orderedHandlers.size());
}
/**
*
*/
static class MessageHandlers {
/** Indexed class handlers. */
volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new ConcurrentHashMap<>();
/** Handler registry. */
ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>>
clsHandlers = new ConcurrentHashMap<>();
/** Ordered handler registry. */
ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> orderedHandlers =
new ConcurrentHashMap<>();
}
/**
* Ordered message listener.
*/
private class OrderedMessageListener implements GridMessageListener {
/** */
private final IgniteBiInClosure<UUID, GridCacheMessage> c;
/**
* @param c Handler closure.
*/
OrderedMessageListener(IgniteBiInClosure<UUID, GridCacheMessage> c) {
this.c = c;
}
/** {@inheritDoc} */
@Override public void onMessage(final UUID nodeId, Object msg, byte plc) {
if (log.isDebugEnabled())
log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']');
Lock lock = rw.readLock();
lock.lock();
try {
GridCacheMessage cacheMsg = (GridCacheMessage)msg;
onMessage0(nodeId, cacheMsg, c, plc);
}
finally {
lock.unlock();
}
}
}
/**
*
*/
private static class ListenerKey {
/** Cache ID. */
private int hndId;
/** Message class. */
private Class<? extends GridCacheMessage> msgCls;
/**
* @param hndId Handler ID.
* @param msgCls Message class.
*/
private ListenerKey(int hndId, Class<? extends GridCacheMessage> msgCls) {
this.hndId = hndId;
this.msgCls = msgCls;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof ListenerKey))
return false;
ListenerKey that = (ListenerKey)o;
return hndId == that.hndId && msgCls.equals(that.msgCls);
}
/** {@inheritDoc} */
@Override public int hashCode() {
int res = hndId;
res = 31 * res + msgCls.hashCode();
return res;
}
}
}