blob: 70fb45d6081fc03c0b5af26117a5dc149196093e [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.internal.managers.communication;
import java.nio.channels.Channel;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BooleanSupplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteComponentType;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccMessage;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.processors.tracing.SpanTags;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.communication.tcp.internal.CommunicationListenerEx;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static;
import static;
import static;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
import static org.apache.ignite.internal.GridTopic.TOPIC_IO_TEST;
import static org.apache.ignite.internal.IgniteFeatures.CHANNEL_COMMUNICATION;
import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.DATA_STREAMER_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SCHEMA_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SERVICE_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.isReservedGridIoPolicy;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static;
import static org.apache.ignite.internal.processors.tracing.SpanType.COMMUNICATION_ORDERED_PROCESS;
import static org.apache.ignite.internal.processors.tracing.SpanType.COMMUNICATION_REGULAR_PROCESS;
import static org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable.traceName;
import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.threadProcessingMessage;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
* This class represents the internal grid communication (<em>input</em> and <em>output</em>) manager
* which is placed as a layer of indirection between the {@link IgniteKernal} and {@link CommunicationSpi}.
* The IO manager is responsible for controlling CommunicationSPI which in turn is responsible
* for exchanging data between Ignite nodes.
* <h2>Data exchanging</h2>
* <p>
* Communication manager provides a rich API for data exchanging between a pair of cluster nodes. Two types
* of communication <em>Message-based communication</em> and <em>File-based communication</em> are available.
* Each of them support sending data to an arbitrary topic on the remote node (see {@link GridTopic} for an
* additional information about Ignite topics).
* <h3>Message-based communication</h3>
* <p>
* {@link Message} and {@link GridTopic} are used to provide a topic-based messaging protocol
* between cluster nodes. All of messages used for data exchanging can be devided into two general types:
* <em>internal</em> and <em>user</em> messages.
* <p>
* <em>Internal message</em> communication is used by Ignite Kernal. Please, refer to appropriate methods below:
* <ul>
* <li>{@link #sendToGridTopic(ClusterNode, GridTopic, Message, byte)}</li>
* <li>{@link #sendOrderedMessage(ClusterNode, Object, Message, byte, long, boolean)}</li>
* <li>{@link #addMessageListener(Object, GridMessageListener)}</li>
* </ul>
* <p>
* <em>User message</em> communication is directly exposed to the {@link IgniteMessaging} API and provides
* for user functionality for topic-based message exchanging among nodes within the cluser defined
* by {@link ClusterGroup}. Please, refer to appropriate methods below:
* <ul>
* <li>{@link #sendToCustomTopic(ClusterNode, Object, Message, byte)}</li>
* <li>{@link #addUserMessageListener(Object, IgniteBiPredicate, UUID)}</li>
* </ul>
* <h3>File-based communication</h3>
* <p>
* Sending or receiving binary data (represented by a <em>File</em>) over a <em>SocketChannel</em> is only
* possible when the build-in {@link TcpCommunicationSpi} implementation of Communication SPI is used and
* both local and remote nodes are {@link IgniteFeatures#CHANNEL_COMMUNICATION CHANNEL_COMMUNICATION} feature
* support. To ensue that the remote node satisfies all conditions the {@link #fileTransmissionSupported(ClusterNode)}
* method must be called prior to data sending.
* <p>
* It is possible to receive a set of files on a particular topic (any of {@link GridTopic}) on the remote node.
* A transmission handler for desired topic must be registered prior to opening transmission sender to it.
* Methods below are used to register handlers and open new transmissions:
* <ul>
* <li>{@link #addTransmissionHandler(Object, TransmissionHandler)}</li>
* <li>{@link #removeTransmissionHandler(Object)}</li>
* <li>{@link #openTransmissionSender(UUID, Object)}</li>
* </ul>
* <p>
* Each transmission sender opens a new transmission session to remote node prior to sending files over it.
* (see description of {@link TransmissionSender} for details). The {@link TransmissionSender}
* will send all files within single session syncronously one by one.
* <p>
* <em>NOTE.</em> It is important to call <em>close()</em> method or use <em>try-with-resource</em>
* statement to release all resources once you've done with the transmission session. This ensures that all
* resources are released on remote node in a proper way (i.e. transmission handlers are closed).
* <p>
* @see TcpCommunicationSpi
* @see IgniteMessaging
* @see TransmissionHandler
public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializable>> {
/** Io communication metrics registry name. */
public static final String COMM_METRICS = metricName("io", "communication");
/** Outbound message queue size metric name. */
public static final String OUTBOUND_MSG_QUEUE_CNT = "OutboundMessagesQueueSize";
/** Sent messages count metric name. */
public static final String SENT_MSG_CNT = "SentMessagesCount";
/** Sent bytes count metric name. */
public static final String SENT_BYTES_CNT = "SentBytesCount";
/** Received messages count metric name. */
public static final String RCVD_MSGS_CNT = "ReceivedMessagesCount";
/** Received bytes count metric name. */
public static final String RCVD_BYTES_CNT = "ReceivedBytesCount";
/** Empty array of message factories. */
public static final MessageFactory[] EMPTY = {};
/** Max closed topics to store. */
public static final int MAX_CLOSED_TOPICS = 10240;
/** Direct protocol version attribute name. */
public static final String DIRECT_PROTO_VER_ATTR = "";
/** Direct protocol version. */
public static final byte DIRECT_PROTO_VER = 3;
/** Current IO policy. */
private static final ThreadLocal<Byte> CUR_PLC = new ThreadLocal<>();
* Default chunk size in bytes used for sending\receiving files over a {@link SocketChannel}.
* Setting the transfer chunk size more than <tt>1 MB</tt> is meaningless because there is
* no asymptotic benefit. What you're trying to achieve with larger transfer chunk sizes is
* fewer thread context switches, and every time we double the transfer size you have
* the context switch cost.
* <p>
* Default value is {@code 256Kb}.
private static final int DFLT_CHUNK_SIZE_BYTES = 256 * 1024;
/** Mutex to achieve consistency of transmission handlers and receiver contexts. */
private final Object rcvMux = new Object();
/** Map of registered handlers per each IO topic. */
private final ConcurrentMap<Object, TransmissionHandler> topicTransmissionHnds = new ConcurrentHashMap<>();
/** The map of already known channel read contexts by its registered topics. */
private final ConcurrentMap<Object, ReceiverContext> rcvCtxs = new ConcurrentHashMap<>();
/** The map of sessions which are currently writing files and their corresponding interruption flags. */
private final ConcurrentMap<T2<UUID, IgniteUuid>, AtomicBoolean> senderStopFlags = new ConcurrentHashMap<>();
* Default factory to provide IO operation interface over files for further transmission them between nodes.
* Some implementations of file senders\receivers are using the zero-copy approach to transfer bytes
* from a file to the given {@link SocketChannel} and vice-versa. So, it is necessary to produce an {@link FileIO}
* implementation based on {@link FileChannel} which is reflected in Ignite project as {@link RandomAccessFileIO}.
* @see FileChannel#transferTo(long, long, WritableByteChannel)
private final FileIOFactory fileIoFactory = new RandomAccessFileIOFactory();
/** The maximum number of retry attempts (read or write attempts). */
private final int retryCnt;
/** Network timeout in milliseconds. */
private final int netTimeoutMs;
/** Listeners by topic. */
private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap<>();
/** System listeners. */
private volatile GridMessageListener[] sysLsnrs;
/** Mutex for system listeners. */
private final Object sysLsnrsMux = new Object();
/** Disconnect listeners. */
private final Collection<GridDisconnectListener> disconnectLsnrs = new ConcurrentLinkedQueue<>();
/** Pool processor. */
private final PoolProcessor pools;
/** Discovery listener. */
private GridLocalEventListener discoLsnr;
/** */
private final ConcurrentMap<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> msgSetMap =
new ConcurrentHashMap<>();
/** Local node ID. */
private final UUID locNodeId;
/** Cache for messages that were received prior to discovery. */
private final ConcurrentMap<UUID, Deque<DelayedMessage>> waitMap = new ConcurrentHashMap<>();
/** Communication message listener. */
private CommunicationListenerEx<Serializable> commLsnr;
/** Grid marshaller. */
private final Marshaller marsh;
/** Busy lock. */
private final ReadWriteLock busyLock =
new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors());
/** Lock to sync maps access. */
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/** Fully started flag. When set to true, can send and receive messages. */
private volatile boolean started;
/** Closed topics. */
private final GridBoundedConcurrentLinkedHashSet<Object> closedTopics =
new GridBoundedConcurrentLinkedHashSet<>(MAX_CLOSED_TOPICS, MAX_CLOSED_TOPICS, 0.75f, 256,
/** */
private MessageFactory msgFactory;
/** */
private MessageFormatter formatter;
/** Stopping flag. */
private volatile boolean stopping;
/** */
private final AtomicReference<ConcurrentHashMap<Long, IoTestFuture>> ioTestMap = new AtomicReference<>();
/** */
private final AtomicLong ioTestId = new AtomicLong();
/** No-op runnable. */
private static final IgniteRunnable NOOP = () -> {};
* @param ctx Grid kernal context.
public GridIoManager(GridKernalContext ctx) {
super(ctx, ctx.config().getCommunicationSpi());
pools = ctx.pools();
assert pools != null;
locNodeId = ctx.localNodeId();
marsh = ctx.config().getMarshaller();
synchronized (sysLsnrsMux) {
sysLsnrs = new GridMessageListener[GridTopic.values().length];
retryCnt = ctx.config().getNetworkSendRetryCount();
netTimeoutMs = (int)ctx.config().getNetworkTimeout();
* @return Message factory.
public MessageFactory messageFactory() {
assert msgFactory != null;
return msgFactory;
* @return Message writer factory.
public MessageFormatter formatter() {
assert formatter != null;
return formatter;
* Resets metrics for this manager.
public void resetMetrics() {
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
MessageFormatter[] formatterExt = ctx.plugins().extensions(MessageFormatter.class);
if (formatterExt != null && formatterExt.length > 0) {
if (formatterExt.length > 1)
throw new IgniteCheckedException("More than one MessageFormatter extension is defined. Check your " +
"plugins configuration and make sure that only one of them provides custom message format.");
formatter = formatterExt[0];
else {
formatter = new MessageFormatter() {
@Override public MessageWriter writer(UUID rmtNodeId) throws IgniteCheckedException {
assert rmtNodeId != null;
return new DirectMessageWriter(U.directProtocolVersion(ctx, rmtNodeId));
@Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory)
throws IgniteCheckedException {
return new DirectMessageReader(msgFactory,
rmtNodeId != null ? U.directProtocolVersion(ctx, rmtNodeId) : DIRECT_PROTO_VER);
MessageFactory[] msgs = ctx.plugins().extensions(MessageFactory.class);
if (msgs == null)
msgs = EMPTY;
List<MessageFactory> compMsgs = new ArrayList<>();
compMsgs.add(new GridIoMessageFactory());
for (IgniteComponentType compType : IgniteComponentType.values()) {
MessageFactory f = compType.messageFactory();
if (f != null)
if (!compMsgs.isEmpty())
msgs = F.concat(msgs, compMsgs.toArray(new MessageFactory[compMsgs.size()]));
msgFactory = new IgniteMessageFactoryImpl(msgs);
MetricRegistry ioMetric = ctx.metric().registry(COMM_METRICS);
CommunicationSpi spi = ctx.config().getCommunicationSpi();
ioMetric.register(OUTBOUND_MSG_QUEUE_CNT, spi::getOutboundMessagesQueueSize,
"Outbound messages queue size.");
ioMetric.register(SENT_MSG_CNT, spi::getSentMessagesCount, "Sent messages count.");
ioMetric.register(SENT_BYTES_CNT, spi::getSentBytesCount, "Sent bytes count.");
ioMetric.register(RCVD_MSGS_CNT, spi::getReceivedMessagesCount,
"Received messages count.");
ioMetric.register(RCVD_BYTES_CNT, spi::getReceivedBytesCount, "Received bytes count.");
getSpi().setListener(commLsnr = new CommunicationListenerEx<Serializable>() {
@Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
try {
onMessage0(nodeId, (GridIoMessage)msg, msgC);
catch (ClassCastException ignored) {
U.error(log, "Communication manager received message of unknown type (will ignore): " +
msg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
"which is illegal - make sure to send messages only via GridProjection API.");
@Override public void onDisconnected(UUID nodeId) {
for (GridDisconnectListener lsnr : disconnectLsnrs)
@Override public void onChannelOpened(UUID rmtNodeId, Serializable initMsg, Channel channel) {
try {
onChannelOpened0(rmtNodeId, (GridIoMessage)initMsg, channel);
catch (ClassCastException ignored) {
U.error(log, "Communication manager received message of unknown type (will ignore): " +
initMsg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
"which is illegal - make sure to send messages only via GridProjection API.");
if (log.isDebugEnabled())
addMessageListener(GridTopic.TOPIC_IO_TEST, new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg, byte plc) {
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null)
IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg;
if (msg0.request()) {
IgniteIoTestMessage res = new IgniteIoTestMessage(, false, null);
try {
sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL);
catch (IgniteCheckedException e) {
U.error(log, "Failed to send IO test response [msg=" + msg0 + "]", e);
else {
IoTestFuture fut = ioTestMap().get(;
if (fut == null)
U.warn(log, "Failed to find IO test future [msg=" + msg0 + ']');
* @param nodes Nodes.
* @param payload Payload.
* @param procFromNioThread If {@code true} message is processed from NIO thread.
* @return Response future.
public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) {
long id = ioTestId.getAndIncrement();
IoTestFuture fut = new IoTestFuture(id, nodes.size());
IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload);
ioTestMap().put(id, fut);
for (int i = 0; i < nodes.size(); i++) {
ClusterNode node = nodes.get(i);
try {
sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
catch (IgniteCheckedException e) {
return new GridFinishedFuture(e);
return fut;
* @param node Node.
* @param payload Payload.
* @param procFromNioThread If {@code true} message is processed from NIO thread.
* @return Response future.
public IgniteInternalFuture<List<IgniteIoTestMessage>> sendIoTest(
ClusterNode node,
byte[] payload,
boolean procFromNioThread
) {
long id = ioTestId.getAndIncrement();
IoTestFuture fut = new IoTestFuture(id, 1);
IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload);
ioTestMap().put(id, fut);
try {
sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
catch (IgniteCheckedException e) {
return new GridFinishedFuture(e);
return fut;
* @return IO test futures map.
private ConcurrentHashMap<Long, IoTestFuture> ioTestMap() {
ConcurrentHashMap<Long, IoTestFuture> map = ioTestMap.get();
if (map == null) {
if (!ioTestMap.compareAndSet(null, map = new ConcurrentHashMap<>()))
map = ioTestMap.get();
return map;
* @param warmup Warmup duration in milliseconds.
* @param duration Test duration in milliseconds.
* @param threads Thread count.
* @param latencyLimit Max latency in nanoseconds.
* @param rangesCnt Ranges count in resulting histogram.
* @param payLoadSize Payload size in bytes.
* @param procFromNioThread {@code True} to process requests in NIO threads.
* @param nodes Nodes participating in test.
public void runIoTest(
final long warmup,
final long duration,
final int threads,
final long latencyLimit,
final int rangesCnt,
final int payLoadSize,
final boolean procFromNioThread,
final List<ClusterNode> nodes
) {
ExecutorService svc = Executors.newFixedThreadPool(threads + 1);
final AtomicBoolean warmupFinished = new AtomicBoolean();
final AtomicBoolean done = new AtomicBoolean();
final CyclicBarrier bar = new CyclicBarrier(threads + 1);
final LongAdder cnt = new LongAdder();
final long sleepDuration = 5000;
final byte[] payLoad = new byte[payLoadSize];
final Map<UUID, IoTestThreadLocalNodeResults>[] res = new Map[threads];
boolean failed = true;
try {
svc.execute(new Runnable() {
@Override public void run() {
boolean failed = true;
try {
long start = System.currentTimeMillis();
if (log.isInfoEnabled())"IO test started " +
"[warmup=" + warmup +
", duration=" + duration +
", threads=" + threads +
", latencyLimit=" + latencyLimit +
", rangesCnt=" + rangesCnt +
", payLoadSize=" + payLoadSize +
", procFromNioThreads=" + procFromNioThread + ']'
for (;;) {
if (!warmupFinished.get() && System.currentTimeMillis() - start > warmup) {
if (log.isInfoEnabled())"IO test warmup finished.");
start = System.currentTimeMillis();
if (warmupFinished.get() && System.currentTimeMillis() - start > duration) {
if (log.isInfoEnabled())"IO test finished, will wait for all threads to finish.");
failed = false;
if (log.isInfoEnabled())"IO test [opsCnt/sec=" + (cnt.sumThenReset() * 1000 / sleepDuration) +
", warmup=" + !warmupFinished.get() +
", elapsed=" + (System.currentTimeMillis() - start) + ']');
// At this point all threads have finished the test and
// stored data to the resulting array of maps.
// Need to iterate it over and sum values for all threads.
catch (InterruptedException | BrokenBarrierException e) {
U.error(log, "IO test failed.", e);
finally {
if (failed)
for (int i = 0; i < threads; i++) {
final int i0 = i;
res[i] = U.newHashMap(nodes.size());
svc.execute(new Runnable() {
@Override public void run() {
boolean failed = true;
ThreadLocalRandom rnd = ThreadLocalRandom.current();
int size = nodes.size();
Map<UUID, IoTestThreadLocalNodeResults> res0 = res[i0];
try {
boolean warmupFinished0 = false;
for (;;) {
if (done.get())
if (!warmupFinished0)
warmupFinished0 = warmupFinished.get();
ClusterNode node = nodes.get(rnd.nextInt(size));
List<IgniteIoTestMessage> msgs = sendIoTest(node, payLoad, procFromNioThread).get();
for (IgniteIoTestMessage msg : msgs) {
UUID nodeId = msg.senderNodeId();
assert nodeId != null;
IoTestThreadLocalNodeResults nodeRes = res0.get(nodeId);
if (nodeRes == null)
nodeRes = new IoTestThreadLocalNodeResults(rangesCnt, latencyLimit));
failed = false;
catch (Exception e) {
U.error(log, "IO test worker thread failed.", e);
finally {
if (failed)
failed = false;
finally {
if (failed)
U.shutdownNow(GridIoManager.class, svc, log);
* @param rawRes Resulting map.
private void printIoTestResults(
Map<UUID, IoTestThreadLocalNodeResults>[] rawRes
) {
Map<UUID, IoTestNodeResults> res = new HashMap<>();
for (Map<UUID, IoTestThreadLocalNodeResults> r : rawRes) {
for (Entry<UUID, IoTestThreadLocalNodeResults> e : r.entrySet()) {
IoTestNodeResults r0 = res.get(e.getKey());
if (r0 == null)
res.put(e.getKey(), r0 = new IoTestNodeResults());
SimpleDateFormat dateFmt = new SimpleDateFormat("HH:mm:ss,SSS");
StringBuilder b = new StringBuilder(
.append("IO test results (round-trip count per each latency bin).")
for (Entry<UUID, IoTestNodeResults> e : res.entrySet()) {
ClusterNode node = ctx.discovery().node(e.getKey());
long binLatencyMcs = e.getValue().binLatencyMcs();
b.append("Node ID: ").append(e.getKey()).append(" (addrs=")
.append(node != null ? node.addresses().toString() : "n/a")
.append(", binLatency=").append(binLatencyMcs).append("mcs")
b.append("Latency bin, mcs | Count exclusive | Percentage exclusive | " +
"Count inclusive | Percentage inclusive ").append(;
long[] nodeRes = e.getValue().resLatency;
long sum = 0;
for (int i = 0; i < nodeRes.length; i++)
sum += nodeRes[i];
long curSum = 0;
for (int i = 0; i < nodeRes.length; i++) {
curSum += nodeRes[i];
if (i < nodeRes.length - 1)
b.append(String.format("<%11d mcs | %15d | %19.6f%% | %15d | %19.6f%%\n",
(i + 1) * binLatencyMcs,
nodeRes[i], (100.0 * nodeRes[i]) / sum,
curSum, (100.0 * curSum) / sum));
b.append(String.format(">%11d mcs | %15d | %19.6f%% | %15d | %19.6f%%\n",
i * binLatencyMcs,
nodeRes[i], (100.0 * nodeRes[i]) / sum,
curSum, (100.0 * curSum) / sum));
b.append("Total latency (ns): ").append(
.append(String.format("%15d", e.getValue().totalLatency)).append(;
b.append("Max latencies (ns):").append(;
format(b, e.getValue().maxLatency, dateFmt);
b.append("Max request send queue times (ns):").append(;
format(b, e.getValue().maxReqSendQueueTime, dateFmt);
b.append("Max request receive queue times (ns):").append(;
format(b, e.getValue().maxReqRcvQueueTime, dateFmt);
b.append("Max response send queue times (ns):").append(;
format(b, e.getValue().maxResSendQueueTime, dateFmt);
b.append("Max response receive queue times (ns):").append(;
format(b, e.getValue().maxResRcvQueueTime, dateFmt);
b.append("Max request wire times (millis):").append(;
format(b, e.getValue().maxReqWireTimeMillis, dateFmt);
b.append("Max response wire times (millis):").append(;
format(b, e.getValue().maxResWireTimeMillis, dateFmt);
if (log.isInfoEnabled());
* @param b Builder.
* @param pairs Pairs to format.
* @param dateFmt Formatter.
private void format(StringBuilder b, Collection<IgnitePair<Long>> pairs, SimpleDateFormat dateFmt) {
for (IgnitePair<Long> p : pairs) {
b.append(String.format("%15d", p.get1())).append(" ")
.append(dateFmt.format(new Date(p.get2()))).append(;
/** {@inheritDoc} */
@Override public void onKernalStart0() throws IgniteCheckedException {
discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
assert evt instanceof DiscoveryEvent : "Invalid event: " + evt;
DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
UUID nodeId = discoEvt.eventNode().id();
switch (evt.type()) {
assert waitMap.get(nodeId) == null; // We can't receive messages from undiscovered nodes.
try {
// Stop all writer sessions.
for (Map.Entry<T2<UUID, IgniteUuid>, AtomicBoolean> writeSesEntry: senderStopFlags.entrySet()) {
if (writeSesEntry.getKey().get1().equals(nodeId))
synchronized (rcvMux) {
// Clear the context on the uploader node left.
Iterator<Map.Entry<Object, ReceiverContext>> it = rcvCtxs.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Object, ReceiverContext> e =;
if (nodeId.equals(e.getValue().rmtNodeId)) {
new ClusterTopologyCheckedException("Remote node left the grid. " +
"Receiver has been stopped : " + nodeId));
finally {
for (Map.Entry<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> e :
msgSetMap.entrySet()) {
ConcurrentMap<UUID, GridCommunicationMessageSet> map = e.getValue();
GridCommunicationMessageSet set;
boolean empty;
synchronized (map) {
set = map.remove(nodeId);
empty = map.isEmpty();
if (set != null) {
if (log.isDebugEnabled())
log.debug("Removed message set due to node leaving grid: " + set);
// Unregister timeout listener.
// Node may still send stale messages for this topic
// even after discovery notification is done.
if (empty)
msgSetMap.remove(e.getKey(), map);
// Clean up delayed and ordered messages (need exclusive lock).
try {
Deque<DelayedMessage> waitList = waitMap.remove(nodeId);
if (log.isDebugEnabled())
log.debug("Removed messages from discovery startup delay list " +
"(sender node left topology): " + waitList);
finally {
assert false : "Unexpected event: " + evt;
ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
// Make sure that there are no stale messages due to window between communication
// manager start and kernal start.
// 1. Process wait list.
Collection<Collection<DelayedMessage>> delayedMsgs = new ArrayList<>();
try {
started = true;
for (Entry<UUID, Deque<DelayedMessage>> e : waitMap.entrySet()) {
if (ctx.discovery().node(e.getKey()) != null) {
Deque<DelayedMessage> waitList = waitMap.remove(e.getKey());
if (log.isDebugEnabled())
log.debug("Processing messages from discovery startup delay list: " + waitList);
if (waitList != null)
finally {
// After write lock released.
if (!delayedMsgs.isEmpty()) {
for (Collection<DelayedMessage> col : delayedMsgs)
for (DelayedMessage msg : col)
commLsnr.onMessage(msg.nodeId(), msg.message(), msg.callback());
// 2. Process messages sets.
for (Map.Entry<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> e : msgSetMap.entrySet()) {
ConcurrentMap<UUID, GridCommunicationMessageSet> map = e.getValue();
for (GridCommunicationMessageSet set : map.values()) {
if (ctx.discovery().node(set.nodeId()) == null) {
// All map modifications should be synced for consistency.
boolean rmv;
synchronized (map) {
rmv = map.remove(set.nodeId(), set);
if (rmv) {
if (log.isDebugEnabled())
log.debug("Removed message set due to node leaving grid: " + set);
// Unregister timeout listener.
boolean rmv;
synchronized (map) {
rmv = map.isEmpty();
if (rmv) {
msgSetMap.remove(e.getKey(), map);
// Node may still send stale messages for this topic
// even after discovery notification is done.
/** {@inheritDoc} */
@Override public void onKernalStop0(boolean cancel) {
// No more communication messages.
boolean interrupted = false;
// Busy wait is intentional.
while (true) {
try {
if (busyLock.writeLock().tryLock(200, TimeUnit.MILLISECONDS))
catch (InterruptedException ignore) {
// Preserve interrupt status & ignore.
// Note that interrupted flag is cleared.
interrupted = true;
try {
if (interrupted)
GridEventStorageManager evtMgr = ctx.event();
if (evtMgr != null && discoLsnr != null)
stopping = true;
Set<ReceiverContext> rcvs;
synchronized (rcvMux) {
rcvs = new HashSet<>(rcvCtxs.values());
for (ReceiverContext rctx : rcvs) {
interruptRecevier(rctx, new NodeStoppingException("Local node io manager requested to be stopped: "
+ ctx.localNodeId()));
finally {
/** {@inheritDoc} */
@Override public void stop(boolean cancel) throws IgniteCheckedException {
if (log.isDebugEnabled())
* @param rmtNodeId The remote node id.
* @param initMsg Message with additional channel params.
* @param channel The channel to notify listeners with.
private void onChannelOpened0(UUID rmtNodeId, GridIoMessage initMsg, Channel channel) {
Lock busyLock0 = busyLock.readLock();
try {
if (stopping) {
if (log.isDebugEnabled()) {
log.debug("Received communication channel create event while node stopping (will ignore) " +
"[rmtNodeId=" + rmtNodeId + ", initMsg=" + initMsg + ']');
if (initMsg.topic() == null) {
int topicOrd = initMsg.topicOrdinal();
initMsg.topic(topicOrd >= 0 ? GridTopic.fromOrdinal(topicOrd) :
U.unmarshal(marsh, initMsg.topicBytes(), U.resolveClassLoader(ctx.config())));
byte plc = initMsg.policy();
pools.poolForPolicy(plc).execute(new Runnable() {
@Override public void run() {
processOpenedChannel(initMsg.topic(), rmtNodeId, (SessionChannelMessage)initMsg.message(),
catch (IgniteCheckedException e) {
U.error(log, "Failed to process channel creation event due to exception " +
"[rmtNodeId=" + rmtNodeId + ", initMsg=" + initMsg + ']', e);
finally {
* @param nodeId Node ID.
* @param msg Message bytes.
* @param msgC Closure to call when message processing finished.
private void onMessage0(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) {
assert nodeId != null;
assert msg != null;
Lock busyLock0 = busyLock.readLock();
try {
if (stopping) {
if (log.isDebugEnabled())
log.debug("Received communication message while stopping (will ignore) [nodeId=" +
nodeId + ", msg=" + msg + ']');
if (msg.topic() == null) {
int topicOrd = msg.topicOrdinal();
msg.topic(topicOrd >= 0 ? GridTopic.fromOrdinal(topicOrd) :
U.unmarshal(marsh, msg.topicBytes(), U.resolveClassLoader(ctx.config())));
if (!started) {
try {
if (!started) { // Sets to true in write lock, so double checking.
// Received message before valid context is set to manager.
if (log.isDebugEnabled())
log.debug("Adding message to waiting list [senderId=" + nodeId +
", msg=" + msg + ']');
Deque<DelayedMessage> list = F.<UUID, Deque<DelayedMessage>>addIfAbsent(
assert list != null;
list.add(new DelayedMessage(nodeId, msg, msgC));
finally {
// If message is P2P, then process in P2P service.
// This is done to avoid extra waiting and potential deadlocks
// as thread pool may not have any available threads to give.
byte plc = msg.policy();
switch (plc) {
case P2P_POOL: {
processP2PMessage(nodeId, msg, msgC);
case IDX_POOL:
if (msg.isOrdered())
processOrderedMessage(nodeId, msg, plc, msgC);
processRegularMessage(nodeId, msg, plc, msgC);
assert plc >= 0 : "Negative policy [plc=" + plc + ", msg=" + msg + ']';
if (isReservedGridIoPolicy(plc))
throw new IgniteCheckedException("Failed to process message with policy of reserved range. " +
"[policy=" + plc + ']');
if (msg.isOrdered())
processOrderedMessage(nodeId, msg, plc, msgC);
processRegularMessage(nodeId, msg, plc, msgC);
catch (IgniteCheckedException e) {
U.error(log, "Failed to process message (will ignore): " + msg, e);
finally {
* @param nodeId Node ID.
* @param msg Message.
* @param msgC Closure to call when message processing finished.
private void processP2PMessage(
final UUID nodeId,
final GridIoMessage msg,
final IgniteRunnable msgC
) {
Runnable c = new Runnable() {
@Override public void run() {
try {
threadProcessingMessage(true, msgC);
GridMessageListener lsnr = listenerGet0(msg.topic());
if (lsnr == null)
Object obj = msg.message();
assert obj != null;
invokeListener(msg.policy(), lsnr, nodeId, obj, secSubjId(msg));
finally {
threadProcessingMessage(false, null);;
try {
catch (RejectedExecutionException e) {
U.error(log, "Failed to process P2P message due to execution rejection. Increase the upper bound " +
"on 'ExecutorService' provided by 'IgniteConfiguration.getPeerClassLoadingThreadPoolSize()'. " +
"Will attempt to process message in the listener thread instead.", e);;
* @param nodeId Node ID.
* @param msg Message.
* @param plc Execution policy.
* @param msgC Closure to call when message processing finished.
* @throws IgniteCheckedException If failed.
private void processRegularMessage(
final UUID nodeId,
final GridIoMessage msg,
final byte plc,
final IgniteRunnable msgC
) throws IgniteCheckedException {
Runnable c = new TraceRunnable(ctx.tracing(), COMMUNICATION_REGULAR_PROCESS) {
@Override public void execute() {
try {
MTC.span().addTag(SpanTags.MESSAGE, () -> traceName(msg));
threadProcessingMessage(true, msgC);
processRegularMessage0(msg, nodeId);
finally {
threadProcessingMessage(false, null);;
@Override public String toString() {
return "Message closure [msg=" + msg + ']';
MTC.span().addLog(() -> "Regular process queued");
if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) {
IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message();
if (msg0.processFromNioThread());
ctx.getStripedExecutorService().execute(-1, c);
if (msg.topicOrdinal() == TOPIC_CACHE_COORDINATOR.ordinal()) {
MvccMessage msg0 = (MvccMessage)msg.message();
// see IGNITE-8609
/*if (msg0.processedFromNioThread());
ctx.getStripedExecutorService().execute(-1, c);
final int part = msg.partition(); // Store partition to avoid possible recalculation.
if (plc == GridIoPolicy.SYSTEM_POOL && part != GridIoMessage.STRIPE_DISABLED_PART) {
ctx.getStripedExecutorService().execute(part, c);
if (plc == GridIoPolicy.DATA_STREAMER_POOL && part != GridIoMessage.STRIPE_DISABLED_PART) {
ctx.getDataStreamerExecutorService().execute(part, c);
if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) {
IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message();
if (msg0.processFromNioThread()) {;
try {
String execName = msg.executorName();
if (execName != null) {
Executor exec = pools.customExecutor(execName);
if (exec != null) {
else {
LT.warn(log, "Custom executor doesn't exist (message will be processed in default " +
"thread pool): " + execName);
catch (RejectedExecutionException e) {
if (!ctx.isStopping()) {
U.error(log, "Failed to process regular message due to execution rejection. Will attempt to process " +
"message in the listener thread instead.", e);;
else if (log.isDebugEnabled())
log.debug("Failed to process regular message due to execution rejection: " + msg);
* @param msg Message.
* @param nodeId Node ID.
private void processRegularMessage0(GridIoMessage msg, UUID nodeId) {
GridMessageListener lsnr = listenerGet0(msg.topic());
if (lsnr == null)
Object obj = msg.message();
assert obj != null;
invokeListener(msg.policy(), lsnr, nodeId, obj, secSubjId(msg));
* Get listener.
* @param topic Topic.
* @return Listener.
@Nullable private GridMessageListener listenerGet0(Object topic) {
if (topic instanceof GridTopic)
return sysLsnrs[systemListenerIndex(topic)];
return lsnrMap.get(topic);
* Put listener if it is absent.
* @param topic Topic.
* @param lsnr Listener.
* @return Old listener (if any).
@Nullable private GridMessageListener listenerPutIfAbsent0(Object topic, GridMessageListener lsnr) {
if (topic instanceof GridTopic) {
synchronized (sysLsnrsMux) {
int idx = systemListenerIndex(topic);
GridMessageListener old = sysLsnrs[idx];
if (old == null)
changeSystemListener(idx, lsnr);
return old;
return lsnrMap.putIfAbsent(topic, lsnr);
* Remove listener.
* @param topic Topic.
* @return Removed listener (if any).
@Nullable private GridMessageListener listenerRemove0(Object topic) {
if (topic instanceof GridTopic) {
synchronized (sysLsnrsMux) {
int idx = systemListenerIndex(topic);
GridMessageListener old = sysLsnrs[idx];
if (old != null)
changeSystemListener(idx, null);
return old;
return lsnrMap.remove(topic);
* Remove listener if it matches expected value.
* @param topic Topic.
* @param exp Listener.
* @return Result.
private boolean listenerRemove0(Object topic, GridMessageListener exp) {
if (topic instanceof GridTopic) {
synchronized (sysLsnrsMux) {
return systemListenerChange(topic, exp, null);
return lsnrMap.remove(topic, exp);
* Replace listener.
* @param topic Topic.
* @param exp Old value.
* @param newVal New value.
* @return Result.
private boolean listenerReplace0(Object topic, GridMessageListener exp, GridMessageListener newVal) {
if (topic instanceof GridTopic) {
synchronized (sysLsnrsMux) {
return systemListenerChange(topic, exp, newVal);
return lsnrMap.replace(topic, exp, newVal);
* Change system listener.
* @param topic Topic.
* @param exp Expected value.
* @param newVal New value.
* @return Result.
private boolean systemListenerChange(Object topic, GridMessageListener exp, GridMessageListener newVal) {
assert Thread.holdsLock(sysLsnrsMux);
assert topic instanceof GridTopic;
int idx = systemListenerIndex(topic);
GridMessageListener old = sysLsnrs[idx];
if (old != null && old.equals(exp)) {
changeSystemListener(idx, newVal);
return true;
return false;
* Change systme listener at the given index.
* @param idx Index.
* @param lsnr Listener.
private void changeSystemListener(int idx, @Nullable GridMessageListener lsnr) {
assert Thread.holdsLock(sysLsnrsMux);
GridMessageListener[] res = new GridMessageListener[sysLsnrs.length];
System.arraycopy(sysLsnrs, 0, res, 0, sysLsnrs.length);
res[idx] = lsnr;
sysLsnrs = res;
* Get index of a system listener.
* @param topic Topic.
* @return Index.
private int systemListenerIndex(Object topic) {
assert topic instanceof GridTopic;
return ((GridTopic)topic).ordinal();
* @param nodeId Node ID.
* @param msg Ordered message.
* @param plc Execution policy.
* @param msgC Closure to call when message processing finished ({@code null} for sync processing).
* @throws IgniteCheckedException If failed.
private void processOrderedMessage(
final UUID nodeId,
final GridIoMessage msg,
final byte plc,
@Nullable final IgniteRunnable msgC
) throws IgniteCheckedException {
assert msg != null;
long timeout = msg.timeout();
boolean skipOnTimeout = msg.skipOnTimeout();
boolean isNew = false;
ConcurrentMap<UUID, GridCommunicationMessageSet> map;
GridCommunicationMessageSet set = null;
while (true) {
map = msgSetMap.get(msg.topic());
if (map == null) {
set = new GridCommunicationMessageSet(plc, msg.topic(), nodeId, timeout, skipOnTimeout, msg, msgC);
map = new ConcurrentHashMap0<>();
map.put(nodeId, set);
ConcurrentMap<UUID, GridCommunicationMessageSet> old = msgSetMap.putIfAbsent(
msg.topic(), map);
if (old != null)
map = old;
else {
isNew = true;
// Put succeeded.
boolean rmv = false;
synchronized (map) {
if (map.isEmpty())
rmv = true;
else {
set = map.get(nodeId);
if (set == null) {
GridCommunicationMessageSet old = map.putIfAbsent(nodeId,
set = new GridCommunicationMessageSet(plc, msg.topic(),
nodeId, timeout, skipOnTimeout, msg, msgC));
assert old == null;
isNew = true;
// Put succeeded.
if (rmv)
msgSetMap.remove(msg.topic(), map);
else {
assert set != null;
assert !isNew;
set.add(msg, msgC);
if (isNew && ctx.discovery().node(nodeId) == null) {
if (log.isDebugEnabled())
log.debug("Message is ignored as sender has left the grid: " + msg);
assert map != null;
boolean rmv;
synchronized (map) {
rmv = map.isEmpty();
if (rmv)
msgSetMap.remove(msg.topic(), map);
if (isNew && set.endTime() != Long.MAX_VALUE)
final GridMessageListener lsnr = listenerGet0(msg.topic());
if (lsnr == null) {
if (closedTopics.contains(msg.topic())) {
if (log.isDebugEnabled())
log.debug("Message is ignored as it came for the closed topic: " + msg);
assert map != null;
msgSetMap.remove(msg.topic(), map);
else if (log.isDebugEnabled()) {
// Note that we simply keep messages if listener is not
// registered yet, until one will be registered.
log.debug("Received message for unknown listener (messages will be kept until a " +
"listener is registered): " + msg);
// Mark the message as processed, otherwise reading from the connection
// may stop.
if (msgC != null);
if (msgC == null) {
// Message from local node can be processed in sync manner.
assert locNodeId.equals(nodeId);
unwindMessageSet(set, lsnr);
final GridCommunicationMessageSet msgSet0 = set;
Runnable c = new Runnable() {
@Override public void run() {
try {
threadProcessingMessage(true, msgC);
unwindMessageSet(msgSet0, lsnr);
finally {
threadProcessingMessage(false, null);
try {
MTC.span().addLog(() -> "Ordered process queued");
catch (RejectedExecutionException e) {
U.error(log, "Failed to process ordered message due to execution rejection. " +
"Increase the upper bound on executor service provided by corresponding " +
"configuration property. Will attempt to process message in the listener " +
"thread instead [msgPlc=" + plc + ']', e);;
* @param msgSet Message set to unwind.
* @param lsnr Listener to notify.
private void unwindMessageSet(GridCommunicationMessageSet msgSet, GridMessageListener lsnr) {
// Loop until message set is empty or
// another thread owns the reservation.
while (true) {
if (msgSet.reserve()) {
try {
finally {
// Check outside of reservation block.
if (!msgSet.changed()) {
if (log.isDebugEnabled())
log.debug("Message set has not been changed: " + msgSet);
else {
if (log.isDebugEnabled())
log.debug("Another thread owns reservation: " + msgSet);
* Invoke message listener.
* @param plc Policy.
* @param lsnr Listener.
* @param nodeId Node ID.
* @param msg Message.
* @param secSubjId Security subject that will be used to open a security session.
private void invokeListener(Byte plc, GridMessageListener lsnr, UUID nodeId, Object msg, UUID secSubjId) {
MTC.span().addLog(() -> "Invoke listener");
Byte oldPlc = CUR_PLC.get();
boolean change = !F.eq(oldPlc, plc);
if (change)
UUID newSecSubjId = secSubjId != null ? secSubjId : nodeId;
try (OperationSecurityContext s = {
lsnr.onMessage(nodeId, msg, plc);
finally {
if (change)
* @return Current IO policy
@Nullable public static Byte currentPolicy() {
return CUR_PLC.get();
* @param nodeId Node ID.
* @param sndErr Send error.
* @param ping {@code True} if try ping node.
* @return {@code True} if node left.
* @throws IgniteClientDisconnectedCheckedException If ping failed.
public boolean checkNodeLeft(UUID nodeId, IgniteCheckedException sndErr, boolean ping)
throws IgniteClientDisconnectedCheckedException
return sndErr instanceof ClusterTopologyCheckedException ||
ctx.discovery().node(nodeId) == null ||
(ping && !ctx.discovery().pingNode(nodeId));
* @param remoteId The remote node to connect to.
* @param topic The remote topic to connect to.
* @return The channel instance to communicate with remote.
public TransmissionSender openTransmissionSender(UUID remoteId, Object topic) {
return new TransmissionSender(remoteId, topic);
* @param topic The {@link GridTopic} to register handler to.
* @param hnd Handler which will handle file upload requests.
public void addTransmissionHandler(Object topic, TransmissionHandler hnd) {
TransmissionHandler hnd0 = topicTransmissionHnds.putIfAbsent(topic, hnd);
assert hnd0 == null : "The topic already have an appropriate session handler [topic=" + topic + ']';
* @param topic The topic to erase handler from.
public void removeTransmissionHandler(Object topic) {
ReceiverContext rcvCtx0;
synchronized (rcvMux) {
rcvCtx0 = rcvCtxs.remove(topic);
new IgniteCheckedException("Receiver has been closed due to removing corresponding transmission handler " +
"on local node [nodeId=" + ctx.localNodeId() + ']'));
* This method must be used prior to opening a {@link TransmissionSender} by calling
* {@link #openTransmissionSender(UUID, Object)} to ensure that remote and local nodes
* are fully support direct {@link SocketChannel} connection to transfer data.
* @param node Remote node to check.
* @return {@code true} if a file can be sent over socket channel directly.
public boolean fileTransmissionSupported(ClusterNode node) {
return ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi) &&
nodeSupports(node, CHANNEL_COMMUNICATION);
* @param nodeId Destination node to connect to.
* @param topic Topic to send the request to.
* @param initMsg Channel initialization message.
* @return Established {@link Channel} to use.
* @throws IgniteCheckedException If fails.
private IgniteInternalFuture<Channel> openChannel(
UUID nodeId,
Object topic,
Message initMsg
) throws IgniteCheckedException {
assert nodeId != null;
assert topic != null;
assert !locNodeId.equals(nodeId) : "Channel cannot be opened to the local node itself: " + nodeId;
assert (CommunicationSpi)getSpi() instanceof TcpCommunicationSpi : "Only TcpCommunicationSpi supports direct " +
"connections between nodes: " + getSpi().getClass();
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null)
throw new ClusterTopologyCheckedException("Failed to open a new channel to remote node (node left): " + nodeId);
int topicOrd = topic instanceof GridTopic ? ((Enum<GridTopic>)topic).ordinal() : -1;
GridIoMessage ioMsg = createGridIoMessage(topic,
try {
if (topicOrd < 0)
ioMsg.topicBytes(U.marshal(marsh, topic));
return ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).openChannel(node, ioMsg);
catch (IgniteSpiException e) {
if (e.getCause() instanceof ClusterTopologyCheckedException)
throw (ClusterTopologyCheckedException)e.getCause();
if (!ctx.discovery().alive(node))
throw new ClusterTopologyCheckedException("Failed to create channel (node left): " +, e);
throw new IgniteCheckedException("Failed to create channel (node may have left the grid or " +
"TCP connection cannot be established due to unknown issues) " +
"[node=" + node + ", topic=" + topic + ']', e);
* @param node Destination node.
* @param topic Topic to send the message to.
* @param topicOrd GridTopic enumeration ordinal.
* @param msg Message to send.
* @param plc Type of processing.
* @param ordered Ordered flag.
* @param timeout Timeout.
* @param skipOnTimeout Whether message can be skipped on timeout.
* @param ackC Ack closure.
* @param async If {@code true} message for local node will be processed in pool, otherwise in current thread.
* @throws IgniteCheckedException Thrown in case of any errors.
private void send(
ClusterNode node,
Object topic,
int topicOrd,
Message msg,
byte plc,
boolean ordered,
long timeout,
boolean skipOnTimeout,
IgniteInClosure<IgniteException> ackC,
boolean async
) throws IgniteCheckedException {
assert node != null;
assert topic != null;
assert msg != null;
assert !async || msg instanceof GridIoUserMessage : msg; // Async execution was added only for IgniteMessaging.
assert topicOrd >= 0 || !(topic instanceof GridTopic) : msg;
try (TraceSurroundings ignored = support(null)) {
MTC.span().addLog(() -> "Create communication msg - " + traceName(msg));
GridIoMessage ioMsg = createGridIoMessage(topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout);
if (locNodeId.equals( {
assert plc != P2P_POOL;
CommunicationListener commLsnr = this.commLsnr;
if (commLsnr == null)
throw new IgniteCheckedException("Trying to send message when grid is not fully started.");
if (ordered)
processOrderedMessage(locNodeId, ioMsg, plc, null);
else if (async)
processRegularMessage(locNodeId, ioMsg, plc, NOOP);
processRegularMessage0(ioMsg, locNodeId);
if (ackC != null)
else {
if (topicOrd < 0)
ioMsg.topicBytes(U.marshal(marsh, topic));
try {
if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi)
((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackC);
getSpi().sendMessage(node, ioMsg);
catch (IgniteSpiException e) {
if (e.getCause() instanceof ClusterTopologyCheckedException)
throw (ClusterTopologyCheckedException)e.getCause();
if (!ctx.discovery().alive(node))
throw new ClusterTopologyCheckedException("Failed to send message, node left: " +, e);
throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +
"TCP connection cannot be established due to firewall issues) " +
"[node=" + node + ", topic=" + topic +
", msg=" + msg + ", policy=" + plc + ']', e);
/** */
private @NotNull GridIoMessage createGridIoMessage(
Object topic,
int topicOrd,
Message msg,
byte plc,
boolean ordered,
long timeout,
boolean skipOnTimeout) {
if ( {
UUID secSubjId = null;
UUID curSecSubjId =;
if (!locNodeId.equals(curSecSubjId))
secSubjId = curSecSubjId;
return new GridIoSecurityAwareMessage(secSubjId, plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
return new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
* @param nodeId Id of destination node.
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
* @throws IgniteCheckedException Thrown in case of any errors.
public void sendToCustomTopic(UUID nodeId, Object topic, Message msg, byte plc)
throws IgniteCheckedException {
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null)
throw new ClusterTopologyCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
sendToCustomTopic(node, topic, msg, plc);
* @param nodeId Id of destination node.
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
* @throws IgniteCheckedException Thrown in case of any errors.
public void sendToGridTopic(UUID nodeId, GridTopic topic, Message msg, byte plc)
throws IgniteCheckedException {
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null)
throw new ClusterTopologyCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
* @param node Destination node.
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
* @throws IgniteCheckedException Thrown in case of any errors.
public void sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc)
throws IgniteCheckedException {
send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
* @param node Destination node.
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
* @throws IgniteCheckedException Thrown in case of any errors.
public void sendToCustomTopic(ClusterNode node, Object topic, Message msg, byte plc)
throws IgniteCheckedException {
send(node, topic, -1, msg, plc, false, 0, false, null, false);
* @param node Destination node.
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
* @param span Current span for tracing.
* @throws IgniteCheckedException Thrown in case of any errors.
public void sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc, Span span)
throws IgniteCheckedException {
send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
* @param node Destination node.
* @param topic Topic to send the message to.
* @param topicOrd GridTopic enumeration ordinal.
* @param msg Message to send.
* @param plc Type of processing. *
* @throws IgniteCheckedException Thrown in case of any errors.
public void sendGeneric(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
throws IgniteCheckedException {
send(node, topic, topicOrd, msg, plc, false, 0, false, null, false);
* @param node Destination node.
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
* @param timeout Timeout to keep a message on receiving queue.
* @param skipOnTimeout Whether message can be skipped on timeout.
* @throws IgniteCheckedException Thrown in case of any errors.
public void sendOrderedMessage(
ClusterNode node,
Object topic,
Message msg,
byte plc,
long timeout,
boolean skipOnTimeout
) throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false);
* @param node Destination nodes.
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
* @param ackC Ack closure.
* @throws IgniteCheckedException Thrown in case of any errors.
public void sendToGridTopic(ClusterNode node,
GridTopic topic,
Message msg,
byte plc,
IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException
send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false);
* @param nodes Destination node.
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
* @param timeout Timeout to keep a message on receiving queue.
* @param skipOnTimeout Whether message can be skipped on timeout.
* @throws IgniteCheckedException Thrown in case of any errors.
void sendOrderedMessageToGridTopic(
Collection<? extends ClusterNode> nodes,
GridTopic topic,
Message msg,
byte plc,
long timeout,
boolean skipOnTimeout
throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
IgniteCheckedException err = null;
for (ClusterNode node : nodes) {
try {
send(node, topic, topic.ordinal(), msg, plc, true, timeout, skipOnTimeout, null, false);
catch (IgniteCheckedException e) {
if (err == null)
err = e;
if (err != null)
throw err;
* @param nodes Destination nodes.
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
* @throws IgniteCheckedException Thrown in case of any errors.
public void sendToGridTopic(
Collection<? extends ClusterNode> nodes,
GridTopic topic,
Message msg,
byte plc
) throws IgniteCheckedException {
IgniteCheckedException err = null;
for (ClusterNode node : nodes) {
try {
send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
catch (IgniteCheckedException e) {
if (err == null)
err = e;
if (err != null)
throw err;
* @param node Destination node.
* @param topic Topic to send the message to.
* @param msg Message to send.
* @param plc Type of processing.
* @param timeout Timeout to keep a message on receiving queue.
* @param skipOnTimeout Whether message can be skipped on timeout.
* @param ackC Ack closure.
* @throws IgniteCheckedException Thrown in case of any errors.
public void sendOrderedMessage(
ClusterNode node,
Object topic,
Message msg,
byte plc,
long timeout,
boolean skipOnTimeout,
IgniteInClosure<IgniteException> ackC
) throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false);
* Sends a peer deployable user message.
* @param nodes Destination nodes.
* @param msg Message to send.
* @param topic Message topic to use.
* @param ordered Is message ordered?
* @param timeout Message timeout in milliseconds for ordered messages.
* @param async Async flag.
* @throws IgniteCheckedException Thrown in case of any errors.
public void sendUserMessage(Collection<? extends ClusterNode> nodes,
Object msg,
@Nullable Object topic,
boolean ordered,
long timeout,
boolean async) throws IgniteCheckedException
boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(locNodeId);
byte[] serMsg = null;
byte[] serTopic = null;
if (!loc) {
serMsg = U.marshal(marsh, msg);
if (topic != null)
serTopic = U.marshal(marsh, topic);
GridDeployment dep = null;
String depClsName = null;
if (ctx.config().isPeerClassLoadingEnabled()) {
Class<?> cls0 = U.detectClass(msg);
if (U.isJdk(cls0) && topic != null)
cls0 = U.detectClass(topic);
dep = ctx.deploy().deploy(cls0, U.detectClassLoader(cls0));
if (dep == null)
throw new IgniteDeploymentCheckedException("Failed to deploy user message: " + msg);
depClsName = cls0.getName();
Message ioMsg = new GridIoUserMessage(
dep != null ? dep.classLoaderId() : null,
dep != null ? dep.deployMode() : null,
dep != null ? dep.userVersion() : null,
dep != null ? dep.participants() : null);
if (ordered)
sendOrderedMessageToGridTopic(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
else if (loc) {
else {
ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
Collection<? extends ClusterNode> rmtNodes = F.view(nodes, F.remoteNodes(locNodeId));
if (!rmtNodes.isEmpty())
sendToGridTopic(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
// Will call local listeners in current thread synchronously or through pool,
// depending async flag, so must go the last
// to allow remote nodes execute the requested operation in parallel.
if (locNode != null) {
public void addUserMessageListener(final @Nullable Object topic, final @Nullable IgniteBiPredicate<UUID, ?> p) {
addUserMessageListener(topic, p, ctx.localNodeId());
* @param topic Topic to subscribe to.
* @param p Message predicate.
public void addUserMessageListener(
final @Nullable Object topic,
final @Nullable IgniteBiPredicate<UUID, ?> p,
final UUID nodeId
) {
if (p != null) {
try {
if (p instanceof PlatformMessageFilter)
new GridUserMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p, nodeId));
catch (IgniteCheckedException e) {
throw new IgniteException(e);
* @param topic Topic to unsubscribe from.
* @param p Message predicate.
public void removeUserMessageListener(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p) {
new GridUserMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p));
* @param topic Listener's topic.
* @param lsnr Listener to add.
public void addMessageListener(GridTopic topic, GridMessageListener lsnr) {
addMessageListener((Object)topic, lsnr);
* @param lsnr Listener to add.
public void addDisconnectListener(GridDisconnectListener lsnr) {
* @param lsnr Listener to remove.
public void removeDisconnectListener(GridDisconnectListener lsnr) {
* @param topic Listener's topic.
* @param lsnr Listener to add.
public void addMessageListener(Object topic, final GridMessageListener lsnr) {
assert lsnr != null;
assert topic != null;
// Make sure that new topic is not in the list of closed topics.
GridMessageListener lsnrs;
for (;;) {
lsnrs = listenerPutIfAbsent0(topic, lsnr);
if (lsnrs == null) {
lsnrs = lsnr;
assert lsnrs != null;
if (!(lsnrs instanceof ArrayListener)) { // We are putting the second listener, creating array.
GridMessageListener arrLsnr = new ArrayListener(lsnrs, lsnr);
if (listenerReplace0(topic, lsnrs, arrLsnr)) {
lsnrs = arrLsnr;
else {
if (((ArrayListener)lsnrs).add(lsnr))
// Add operation failed because array is already empty and is about to be removed, helping and retrying.
listenerRemove0(topic, lsnrs);
Map<UUID, GridCommunicationMessageSet> map = msgSetMap.get(topic);
Collection<GridCommunicationMessageSet> msgSets = map != null ? map.values() : null;
if (msgSets != null) {
final GridMessageListener lsnrs0 = lsnrs;
try {
for (final GridCommunicationMessageSet msgSet : msgSets) {
new Runnable() {
@Override public void run() {
unwindMessageSet(msgSet, lsnrs0);
catch (RejectedExecutionException e) {
U.error(log, "Failed to process delayed message due to execution rejection. Increase the upper bound " +
"on executor service provided in 'IgniteConfiguration.getPublicThreadPoolSize()'). Will attempt to " +
"process message in the listener thread instead.", e);
for (GridCommunicationMessageSet msgSet : msgSets)
unwindMessageSet(msgSet, lsnr);
catch (IgniteCheckedException ice) {
throw new IgniteException(ice);
* @param topic Message topic.
* @return Whether or not listener was indeed removed.
public boolean removeMessageListener(GridTopic topic) {
return removeMessageListener((Object)topic);
* @param topic Message topic.
* @return Whether or not listener was indeed removed.
public boolean removeMessageListener(Object topic) {
return removeMessageListener(topic, null);
* @param topic Listener's topic.
* @param lsnr Listener to remove.
* @return Whether or not the lsnr was removed.
public boolean removeMessageListener(GridTopic topic, @Nullable GridMessageListener lsnr) {
return removeMessageListener((Object)topic, lsnr);
* @param topic Listener's topic.
* @param lsnr Listener to remove.
* @return Whether or not the lsnr was removed.
public boolean removeMessageListener(Object topic, @Nullable GridMessageListener lsnr) {
assert topic != null;
boolean rmv = true;
Collection<GridCommunicationMessageSet> msgSets = null;
// If listener is null, then remove all listeners.
if (lsnr == null) {
lsnr = listenerRemove0(topic);
rmv = lsnr != null;
Map<UUID, GridCommunicationMessageSet> map = msgSetMap.remove(topic);
if (map != null)
msgSets = map.values();
else {
for (;;) {
GridMessageListener lsnrs = listenerGet0(topic);
// If removing listener before subscription happened.
if (lsnrs == null) {
Map<UUID, GridCommunicationMessageSet> map = msgSetMap.remove(topic);
if (map != null)
msgSets = map.values();
rmv = false;
else {
boolean empty = false;
if (!(lsnrs instanceof ArrayListener)) {
if (lsnrs.equals(lsnr)) {
if (!listenerRemove0(topic, lsnrs))
continue; // Retry because it can be packed to array listener.
empty = true;
rmv = false;
else {
ArrayListener arrLsnr = (ArrayListener)lsnrs;
if (arrLsnr.remove(lsnr))
empty = arrLsnr.isEmpty();
// Listener was not found.
rmv = false;
if (empty)
listenerRemove0(topic, lsnrs);
// If removing last subscribed listener.
if (empty) {
Map<UUID, GridCommunicationMessageSet> map = msgSetMap.remove(topic);
if (map != null)
msgSets = map.values();
if (msgSets != null)
for (GridCommunicationMessageSet msgSet : msgSets)
if (rmv && log.isDebugEnabled())
log.debug("Removed message listener [topic=" + topic + ", lsnr=" + lsnr + ']');
if (lsnr instanceof ArrayListener) {
for (GridMessageListener childLsnr : ((ArrayListener)lsnr).arr)
return rmv;
* Closes a listener, if applicable.
* @param lsnr Listener.
private void closeListener(GridMessageListener lsnr) {
if (lsnr instanceof GridUserMessageListener) {
GridUserMessageListener userLsnr = (GridUserMessageListener)lsnr;
if (userLsnr.predLsnr instanceof PlatformMessageFilter)
* Gets sent messages count.
* @return Sent messages count.
public int getSentMessagesCount() {
return getSpi().getSentMessagesCount();
* Gets sent bytes count.
* @return Sent bytes count.
public long getSentBytesCount() {
return getSpi().getSentBytesCount();
* Gets received messages count.
* @return Received messages count.
public int getReceivedMessagesCount() {
return getSpi().getReceivedMessagesCount();
* Gets received bytes count.
* @return Received bytes count.
public long getReceivedBytesCount() {
return getSpi().getReceivedBytesCount();
* Gets outbound messages queue size.
* @return Outbound messages queue size.
public int getOutboundMessagesQueueSize() {
return getSpi().getOutboundMessagesQueueSize();
* @param rctx Receiver context to use.
* @param ex Exception to close receiver with.
private void interruptRecevier(ReceiverContext rctx, Exception ex) {
if (rctx == null)
if (rctx.interrupted.compareAndSet(false, true)) {
if (rctx.timeoutObj != null)
rctx.lastState = rctx.lastState == null ?
new TransmissionMeta(ex) : rctx.lastState.error(ex);
rctx.hnd.onException(rctx.rmtNodeId, ex);
U.error(log, "Receiver has been interrupted due to an exception occurred [ctx=" + rctx + ']', ex);
* @param topic Topic to which the channel is created.
* @param rmtNodeId Remote node id.
* @param initMsg Channel initialization message with additional params.
* @param ch Channel instance.
private void processOpenedChannel(Object topic, UUID rmtNodeId, SessionChannelMessage initMsg, SocketChannel ch) {
ReceiverContext rcvCtx = null;
ObjectInputStream in = null;
ObjectOutputStream out = null;
try {
if (stopping) {
throw new NodeStoppingException("Local node is stopping. Channel will be closed [topic=" + topic +
", channel=" + ch + ']');
if (initMsg == null || initMsg.sesId() == null) {
U.warn(log, "There is no initial message provied for given topic. Opened channel will be closed " +
"[rmtNodeId=" + rmtNodeId + ", topic=" + topic + ", initMsg=" + initMsg + ']');
configureChannel(ch, netTimeoutMs);
in = new ObjectInputStream(ch.socket().getInputStream());
out = new ObjectOutputStream(ch.socket().getOutputStream());
IgniteUuid newSesId = initMsg.sesId();
synchronized (rcvMux) {
TransmissionHandler hnd = topicTransmissionHnds.get(topic);
if (hnd == null) {
U.warn(log, "There is no handler for a given topic. Channel will be closed [rmtNodeId=" + rmtNodeId +
", topic=" + topic + ']');
rcvCtx = rcvCtxs.computeIfAbsent(topic, t -> new ReceiverContext(rmtNodeId, hnd, newSesId));
// Do not allow multiple connections for the same session.
if (!newSesId.equals(rcvCtx.sesId)) {
IgniteCheckedException err = new IgniteCheckedException("Requested topic is busy by another transmission. " +
"It's not allowed to process different sessions over the same topic simultaneously. " +
"Channel will be closed [initMsg=" + initMsg + ", channel=" + ch + ", nodeId=" + rmtNodeId + ']');
U.error(log, err);
out.writeObject(new TransmissionMeta(err));
if (log.isDebugEnabled()) {
log.debug("Trasmission open a new channel [rmtNodeId=" + rmtNodeId + ", topic=" + topic +
", initMsg=" + initMsg + ']');
if (!rcvCtx.lock.tryLock(netTimeoutMs, TimeUnit.MILLISECONDS))
throw new IgniteException("Wait for the previous receiver finished its work timeouted: " + rcvCtx);
try {
if (rcvCtx.timeoutObj != null)
// Send previous context state to sync remote and local node.
out.writeObject(rcvCtx.lastState == null ? new TransmissionMeta() : rcvCtx.lastState);
if (rcvCtx.lastState == null || rcvCtx.lastState.error() == null)
receiveFromChannel(topic, rcvCtx, in, out, ch);
interruptRecevier(rcvCtxs.remove(topic), rcvCtx.lastState.error());
finally {
catch (Throwable t) {
U.error(log, "Download session cannot be finished due to an unexpected error [ctx=" + rcvCtx + ']', t);
// Do not remove receiver context here, since sender will recconect to get this error.
interruptRecevier(rcvCtx, new IgniteCheckedException("Channel processing error [nodeId=" + rmtNodeId + ']', t));
finally {
* @param topic Topic handler related to.
* @param rcvCtx Receiver read context.
* @throws NodeStoppingException If processing fails.
* @throws InterruptedException If thread interrupted.
private void receiveFromChannel(
Object topic,
ReceiverContext rcvCtx,
ObjectInputStream in,
ObjectOutputStream out,
ReadableByteChannel ch
) throws NodeStoppingException, InterruptedException {
try {
while (true) {
if (Thread.currentThread().isInterrupted())
throw new InterruptedException("The thread has been interrupted. Stop downloading file.");
if (stopping)
throw new NodeStoppingException("Operation has been cancelled (node is stopping)");
boolean exit = in.readBoolean();
if (exit) {
ReceiverContext rcv = rcvCtxs.remove(topic);
assert rcv != null;
TransmissionMeta meta = (TransmissionMeta)in.readObject();
if (rcvCtx.rcv == null) {
rcvCtx.rcv = createReceiver(rcvCtx.rmtNodeId,
() -> stopping || rcvCtx.interrupted.get());
rcvCtx.lastState = meta;
validate(rcvCtx.lastState, meta);
try {
long startTime = U.currentTimeMillis();
// Write processing ack.
U.log(log, "File has been received " +
"[name=" + rcvCtx.rcv.state().name() + ", transferred=" + rcvCtx.rcv.transferred() +
", time=" + (double)((U.currentTimeMillis() - startTime) / 1000) + " sec" +
", rmtId=" + rcvCtx.rmtNodeId + ']');
rcvCtx.rcv = null;
catch (Throwable e) {
rcvCtx.lastState = rcvCtx.rcv.state();
throw e;
catch (ClassNotFoundException e) {
throw new IgniteException(e);
catch (IOException e) {
// Waiting for re-establishing connection.
U.warn(log, "Сonnection from the remote node lost. Will wait for the new one to continue file " +
"receive [nodeId=" + rcvCtx.rmtNodeId + ", sesKey=" + rcvCtx.sesId + ']', e);
long startTs = U.currentTimeMillis();
boolean added = ctx.timeout().addTimeoutObject(rcvCtx.timeoutObj = new GridTimeoutObject() {
@Override public IgniteUuid timeoutId() {
return rcvCtx.sesId;
@Override public long endTime() {
return startTs + netTimeoutMs;
@Override public void onTimeout() {
interruptRecevier(rcvCtxs.remove(topic), new IgniteCheckedException("Receiver is closed due to " +
"waiting for the reconnect has been timeouted"));
assert added;
* @param prev Previous available transmission meta.
* @param next Next transmission meta.
private void validate(TransmissionMeta prev, TransmissionMeta next) {
A.ensure(, "Attempt to load different file " +
"[prev=" + prev + ", next=" + next + ']');
A.ensure(prev.offset() == next.offset(),
"The next chunk offest is incorrect [prev=" + prev + ", meta=" + next + ']');
A.ensure(prev.count() == next.count(), " The count of bytes to transfer for " +
"the next chunk is incorrect [prev=" + prev + ", next=" + next + ']');
A.ensure(prev.policy() == next.policy(), "Attemt to continue file upload with" +
" different transmission policy [prev=" + prev + ", next=" + next + ']');
* @param nodeId Remote node id.
* @param hnd Current handler instance which produces file handlers.
* @param meta Meta information about file pending to receive to create appropriate receiver.
* @param stopChecker Process interrupt checker.
* @return Chunk data recevier.
private TransmissionReceiver createReceiver(
UUID nodeId,
TransmissionHandler hnd,
TransmissionMeta meta,
BooleanSupplier stopChecker
) {
switch (meta.policy()) {
case FILE:
return new FileReceiver(
hnd.fileHandler(nodeId, meta),
hnd.filePath(nodeId, meta),
case CHUNK:
return new ChunkReceiver(
hnd.chunkHandler(nodeId, meta),
throw new IllegalStateException("The type of transmission policy is unknown. An implementation " +
"required: " + meta.policy());
* @param channel Socket channel to configure blocking mode.
* @param timeout Ignite network configuration timeout.
* @throws IOException If fails.
private static void configureChannel(SocketChannel channel, int timeout) throws IOException {
// Timeout must be enabled prior to entering the blocking mode to have effect.
* Dumps SPI stats to diagnostic logs in case TcpCommunicationSpi is used, no-op otherwise.
public void dumpStats() {
CommunicationSpi spi = getSpi();
if (spi instanceof TcpCommunicationSpi)
/** {@inheritDoc} */
@Override public void printMemoryStats() {
X.println(">>> IO manager memory stats [igniteInstanceName=" + ctx.igniteInstanceName() + ']');
X.println(">>> lsnrMapSize: " + lsnrMap.size());
X.println(">>> msgSetMapSize: " + msgSetMap.size());
X.println(">>> closedTopicsSize: " + closedTopics.sizex());
X.println(">>> discoWaitMapSize: " + waitMap.size());
* Read context holds all the information about current transfer read from channel process.
private static class ReceiverContext {
/** The remote node input channel came from. */
private final UUID rmtNodeId;
/** Current sesssion handler. */
private final TransmissionHandler hnd;
/** Unique session request id. */
private final IgniteUuid sesId;
/** Flag indicates that current file handling process must be interrupted. */
private final AtomicBoolean interrupted = new AtomicBoolean();
/** Only one thread can handle receiver context. */
private final Lock lock = new ReentrantLock();
/** Last infinished downloading object. */
private TransmissionReceiver rcv;
/** Last saved state about file data processing. */
private TransmissionMeta lastState;
/** Close receiver timeout object. */
private GridTimeoutObject timeoutObj;
* @param rmtNodeId Remote node id.
* @param hnd Channel handler of current topic.
* @param sesId Unique session request id.
public ReceiverContext(UUID rmtNodeId, TransmissionHandler hnd, IgniteUuid sesId) {
this.rmtNodeId = rmtNodeId;
this.hnd = hnd;
this.sesId = sesId;
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ReceiverContext.class, this);
* Сlass represents an implementation of transmission file writer. Each new instance of transmission sender
* will establish a new connection with unique transmission session identifier to the remote node and given
* topic (an arbitraty {@link GridTopic} can be used).
* <h2>Zero-copy approach</h2>
* <p>
* Current implementation of transmission sender is based on file zero-copy approach (the {@link FileSender}
* is used under the hood). It is much more efficient than a simple loop that reads data from
* given file and writes it to the target socket channel. But if operating system does not support zero-copy
* file transfer, sending a file with {@link TransmissionSender} might fail or yield worse performance.
* <p>
* Please, refer to <a href=""></a>
* or {@link FileChannel#transferTo(long, long, WritableByteChannel)} for details of such approach.
* <h2>File and Chunk handlers</h2>
* <p>
* It is possible to choose a file handler prior to sendig the file to remote node within opened transmission
* session. There are two types of handlers available:
* {@link TransmissionHandler#chunkHandler(UUID, TransmissionMeta)} and
* {@link TransmissionHandler#fileHandler(UUID, TransmissionMeta)}. You can use an appropriate
* {@link TransmissionPolicy} for {@link #send(File, long, long, Map, TransmissionPolicy)} method to switch
* between them.
* <h2>Exceptions handling</h2>
* <p>
* Each transmission can have two different high-level types of exception which are handled differently:
* <ul>
* <li><em>transport</em> exception(e.g. some network issues)</li>
* <li><em>application</em>\<em>handler</em> level exception</li>
* </ul>
* <h3><em>Application</em> exceptions</h3>
* <p>
* The transmission will be stopped immediately and wrapping <em>IgniteExcpetion</em> thrown in case of
* any <em>application</em> exception occured.
* <h3><em>Transport</em> exceptions</h3>
* <p>
* All transport level exceptions of transmission file sender will require transmission to be reconnected.
* For instance, when the local node closes the socket connection in orderly way, but the file is not fully
* handled by remote node, the read operation over the same socket endpoint will return <tt>-1</tt>. Such
* result will be consideread as an {@link IOException} by handler and it will wait for reestablishing connection
* to continue file loading.
* <p>
* Another example, the transmission sender gets the <em>Connection reset by peer</em> IOException message.
* This means that the remote node you are connected to has to reset the connection. This is usually caused by a
* high amount of traffic on the host, but may be caused by a server error or the remote node has exhausted
* system resources as well. Such {@link IOException} will be considered as <em>reconnection required</em>.
* <h3>Timeout exceptions</h3>
* <p>
* For read operations over the {@link InputStream} or write operation through the {@link OutputStream}
* the {@link Socket#setSoTimeout(int)} will be used and an {@link SocketTimeoutException} will be
* thrown when the timeout occured. The default value is taken from {@link IgniteConfiguration#getNetworkTimeout()}.
* <p>
* If reconnection is not occurred withing configured timeout interval the timeout object will be fired which
* clears corresponding to the used topic the {@link GridIoManager.ReceiverContext}.
* <h2>Release resources</h2>
* <p>
* It is important to call <em>close()</em> method or use <em>try-with-resource</em> statement to release
* all resources once you've done with sending files.
* @see FileChannel#transferTo(long, long, WritableByteChannel)
public class TransmissionSender implements Closeable {
/** Remote node id to connect to. */
private final UUID rmtId;
/** Remote topic to connect to. */
private final Object topic;
/** Current unique session identifier to transfer files to remote node. */
private T2<UUID, IgniteUuid> sesKey;
/** Instance of opened writable channel to work with. */
private WritableByteChannel channel;
/** Decorated with data operations socket of output channel. */
private ObjectOutput out;
/** Decorated with data operations socket of input channel. */
private ObjectInput in;
* @param rmtId The remote node to connect to.
* @param topic The remote topic to connect to.
public TransmissionSender(
UUID rmtId,
Object topic
) {
this.rmtId = rmtId;
this.topic = topic;
sesKey = new T2<>(rmtId, IgniteUuid.randomUuid());
* @return The synchronization meta if case connection has been reset.
* @throws IgniteCheckedException If fails.
* @throws IOException If fails.
private TransmissionMeta connect() throws IgniteCheckedException, IOException {
SocketChannel channel = (SocketChannel)openChannel(rmtId,
new SessionChannelMessage(sesKey.get2()))
configureChannel(channel, netTimeoutMs); = (WritableByteChannel)channel;
out = new ObjectOutputStream(channel.socket().getOutputStream());
in = new ObjectInputStream(channel.socket().getInputStream());
TransmissionMeta syncMeta;
try {
// Synchronize state between remote and local nodes.
syncMeta = (TransmissionMeta)in.readObject();
catch (ClassNotFoundException e) {
throw new IgniteException(e);
return syncMeta;
* @param file Source file to send to remote.
* @param params Additional file params.
* @param plc The policy of handling data on remote.
* @throws IgniteCheckedException If fails.
public void send(
File file,
Map<String, Serializable> params,
TransmissionPolicy plc
) throws IgniteCheckedException, InterruptedException, IOException {
send(file, 0, file.length(), params, plc);
* @param file Source file to send to remote.
* @param plc The policy of handling data on remote.
* @throws IgniteCheckedException If fails.
public void send(
File file,
TransmissionPolicy plc
) throws IgniteCheckedException, InterruptedException, IOException {
send(file, 0, file.length(), new HashMap<>(), plc);
* @param file Source file to send to remote.
* @param offset Position to start trasfer at.
* @param cnt Number of bytes to transfer.
* @param params Additional file params.
* @param plc The policy of handling data on remote.
* @throws IgniteCheckedException If fails.
public void send(
File file,
long offset,
long cnt,
Map<String, Serializable> params,
TransmissionPolicy plc
) throws IgniteCheckedException, InterruptedException, IOException {
long startTime = U.currentTimeMillis();
int retries = 0;
senderStopFlags.putIfAbsent(sesKey, new AtomicBoolean());
try (FileSender snd = new FileSender(file,
() -> stopping || senderStopFlags.get(sesKey).get(),
) {
if (log.isDebugEnabled()) {
log.debug("Start writing file to remote node [file=" + file.getName() +
", rmtNodeId=" + rmtId + ", topic=" + topic + ']');
while (true) {
if (Thread.currentThread().isInterrupted())
throw new InterruptedException("The thread has been interrupted. Stop uploading file.");
if (stopping)
throw new NodeStoppingException("Operation has been cancelled (node is stopping)");
try {
TransmissionMeta rcvMeta = null;
// In/out streams are not null if file has been sent successfully.
if (out == null && in == null) {
rcvMeta = connect();
assert rcvMeta != null : "Remote receiver has not sent its meta";
// Stop in case of any error occurred on remote node during file processing.
if (rcvMeta.error() != null)
throw rcvMeta.error();
snd.send(channel, out, rcvMeta);
// Read file received acknowledge.
boolean written = in.readBoolean();
assert written : "File is not fully written: " + file.getAbsolutePath();
catch (IOException e) {
if (retries >= retryCnt) {
throw new IgniteException("The number of retry attempts to upload file exceeded " +
"the limit: " + retryCnt, e);
// Re-establish the new connection to continue upload.
U.warn(log, "Connection lost while writing a file to remote node and " +
"will be reestablished [rmtId=" + rmtId + ", file=" + file.getName() +
", sesKey=" + sesKey + ", retries=" + retries +
", transferred=" + snd.transferred() +
", total=" + snd.state().count() + ']', e);
U.log(log, "File has been sent to remote node [name=" + file.getName() +
", uploadTime=" + (double)((U.currentTimeMillis() - startTime) / 1000) + " sec, retries=" + retries +
", transferred=" + snd.transferred() + ", rmtId=" + rmtId + ']');
catch (InterruptedException e) {
throw e;
catch (IgniteCheckedException e) {
if (X.hasCause(e, TransmissionCancelledException.class)) {
throw new TransmissionCancelledException("File transmission has been cancelled on the remote node " +
"[rmtId=" + rmtId + ", file=" + file.getName() + ", sesKey=" + sesKey + ", retries=" + retries +
", cause='" + e.getCause(TransmissionCancelledException.class).getMessage() + "']");
throw new IgniteCheckedException("Exception while sending file [rmtId=" + rmtId +
", file=" + file.getName() + ", sesKey=" + sesKey + ", retries=" + retries + ']', e);
catch (Throwable e) {
if (stopping)
throw new NodeStoppingException("Operation has been cancelled (node is stopping)");
if (senderStopFlags.get(sesKey).get())
throw new ClusterTopologyCheckedException("Remote node left the cluster: " + rmtId, e);
throw new IgniteException("Unexpected exception while sending file to the remote node. The process stopped " +
"[rmtId=" + rmtId + ", file=" + file.getName() + ", sesKey=" + sesKey + ']', e);
/** {@inheritDoc} */
@Override public void close() throws IOException {
try {
ObjectOutput out0 = out;
if (out0 == null)
U.log(log, "Close file writer session: " + sesKey);
// Send transmission close flag.
catch (IOException e) {
U.warn(log, "An exception while writing close session flag occured. " +
" Session close operation has been ignored", e);
finally {
/** Close channel and relese resources. */
private void closeChannelQuiet() {
out = null;
in = null;
channel = null;
* Linked chain of listeners.
private static class ArrayListener implements GridMessageListener {
/** */
private volatile GridMessageListener[] arr;
* @param arr Array of listeners.
ArrayListener(GridMessageListener... arr) {
this.arr = arr;
* Passes message to the whole chain.
* @param nodeId Node ID.
* @param msg Message.
@Override public void onMessage(UUID nodeId, Object msg, byte plc) {
GridMessageListener[] arr0 = arr;
if (arr0 == null)
for (GridMessageListener l : arr0)
l.onMessage(nodeId, msg, plc);
* @return {@code true} If this instance is empty.
boolean isEmpty() {
return arr == null;
* @param l Listener.
* @return {@code true} If listener was removed.
synchronized boolean remove(GridMessageListener l) {
GridMessageListener[] arr0 = arr;
if (arr0 == null)
return false;
if (arr0.length == 1) {
if (!arr0[0].equals(l))
return false;
arr = null;
return true;
for (int i = 0; i < arr0.length; i++) {
if (arr0[i].equals(l)) {
int newLen = arr0.length - 1;
if (i == newLen) // Remove last.
arr = Arrays.copyOf(arr0, newLen);
else {
GridMessageListener[] arr1 = new GridMessageListener[newLen];
if (i != 0) // Not remove first.
System.arraycopy(arr0, 0, arr1, 0, i);
System.arraycopy(arr0, i + 1, arr1, i, newLen - i);
arr = arr1;
return true;
return false;
* @param l Listener.
* @return {@code true} if listener was added. Add can fail if this instance is empty and is about to be removed
* from map.
synchronized boolean add(GridMessageListener l) {
GridMessageListener[] arr0 = arr;
if (arr0 == null)
return false;
int oldLen = arr0.length;
arr0 = Arrays.copyOf(arr0, oldLen + 1);
arr0[oldLen] = l;
arr = arr0;
return true;
* This class represents a message listener wrapper that knows about peer deployment.
private class GridUserMessageListener implements GridMessageListener {
/** Predicate listeners. */
private final IgniteBiPredicate<UUID, Object> predLsnr;
/** User message topic. */
private final Object topic;
/** Initial node id. */
private final UUID initNodeId;
* @param topic User topic.
* @param predLsnr Predicate listener.
* @param initNodeId Node id that registered given listener.
GridUserMessageListener(@Nullable Object topic, @Nullable IgniteBiPredicate<UUID, Object> predLsnr,
@Nullable UUID initNodeId) {
this.topic = topic;
this.predLsnr = predLsnr;
this.initNodeId = initNodeId;
* @param topic User topic.
* @param predLsnr Predicate listener.
GridUserMessageListener(@Nullable Object topic, @Nullable IgniteBiPredicate<UUID, Object> predLsnr) {
this(topic, predLsnr, null);
/** {@inheritDoc} */
@Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (!(msg instanceof GridIoUserMessage)) {
U.error(log, "Received unknown message (potentially fatal problem): " + msg);
GridIoUserMessage ioMsg = (GridIoUserMessage)msg;
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null) {
U.warn(log, "Failed to resolve sender node (did the node left grid?): " + nodeId);
Lock lock = busyLock.readLock();
try {
if (stopping) {
if (log.isDebugEnabled())
log.debug("Received user message while stopping (will ignore) [nodeId=" +
nodeId + ", msg=" + msg + ']');
Object msgBody = ioMsg.body();
assert msgBody != null || ioMsg.bodyBytes() != null;
try {
byte[] msgTopicBytes = ioMsg.topicBytes();
Object msgTopic = ioMsg.topic();
GridDeployment dep = ioMsg.deployment();
if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
ioMsg.deploymentClassName() != null) {
dep = ctx.deploy().getGlobalDeployment(
if (dep == null)
throw new IgniteDeploymentCheckedException(
"Failed to obtain deployment information for user message. " +
"If you are using custom message or topic class, try implementing " +
"GridPeerDeployAware interface. [msg=" + ioMsg + ']');
ioMsg.deployment(dep); // Cache deployment.
// Unmarshall message topic if needed.
if (msgTopic == null && msgTopicBytes != null) {
msgTopic = U.unmarshal(marsh, msgTopicBytes,
U.resolveClassLoader(dep != null ? dep.classLoader() : null, ctx.config()));
ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
if (!F.eq(topic, msgTopic))
if (msgBody == null) {
msgBody = U.unmarshal(marsh, ioMsg.bodyBytes(),
U.resolveClassLoader(dep != null ? dep.classLoader() : null, ctx.config()));
ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
// Resource injection.
if (dep != null)
ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody);
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" +
msg + ']', e);
if (msgBody != null) {
if (predLsnr != null) {
try (OperationSecurityContext s = {
if (!predLsnr.apply(nodeId, msgBody))
removeMessageListener(TOPIC_COMM_USER, this);
finally {
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
GridUserMessageListener l = (GridUserMessageListener)o;
return F.eq(predLsnr, l.predLsnr) && F.eq(topic, l.topic);
/** {@inheritDoc} */
@Override public int hashCode() {
int res = predLsnr != null ? predLsnr.hashCode() : 0;
res = 31 * res + (topic != null ? topic.hashCode() : 0);
return res;
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridUserMessageListener.class, this);
* Ordered communication message set.
private class GridCommunicationMessageSet implements GridTimeoutObject {
/** */
private final UUID nodeId;
/** */
private long endTime;
/** */
private final IgniteUuid timeoutId;
/** */
private final Object topic;
/** */
private final byte plc;
/** */
private final Queue<OrderedMessageContainer> msgs = new ConcurrentLinkedDeque<>();
/** */
private final AtomicBoolean reserved = new AtomicBoolean();
/** */
private final long timeout;
/** */
private final boolean skipOnTimeout;
/** */
private long lastTs;
* @param plc Communication policy.
* @param topic Communication topic.
* @param nodeId Node ID.
* @param timeout Timeout.
* @param skipOnTimeout Whether message can be skipped on timeout.
* @param msg Message to add immediately.
* @param msgC Message closure (may be {@code null}).
byte plc,
Object topic,
UUID nodeId,
long timeout,
boolean skipOnTimeout,
GridIoMessage msg,
@Nullable IgniteRunnable msgC
) {
assert nodeId != null;
assert topic != null;
assert msg != null;
this.plc = plc;
this.nodeId = nodeId;
this.topic = topic;
this.timeout = timeout == 0 ? ctx.config().getNetworkTimeout() : timeout;
this.skipOnTimeout = skipOnTimeout;
endTime = endTime(timeout);
timeoutId = IgniteUuid.randomUuid();
lastTs = U.currentTimeMillis();
msgs.add(new OrderedMessageContainer(msg, lastTs, msgC, MTC.span()));
/** {@inheritDoc} */
@Override public IgniteUuid timeoutId() {
return timeoutId;
/** {@inheritDoc} */
@Override public long endTime() {
return endTime;
/** {@inheritDoc} */
@Override public void onTimeout() {
GridMessageListener lsnr = listenerGet0(topic);
if (lsnr != null) {
long delta = 0;
if (skipOnTimeout) {
while (true) {
delta = 0;
boolean unwind = false;
synchronized (this) {
if (!msgs.isEmpty()) {
delta = U.currentTimeMillis() - lastTs;
if (delta >= timeout)
unwind = true;
if (unwind)
unwindMessageSet(this, lsnr);
// Someone is still listening to messages, so delay set removal.
endTime = endTime(timeout - delta);
if (log.isDebugEnabled())
log.debug("Removing message set due to timeout: " + this);
ConcurrentMap<UUID, GridCommunicationMessageSet> map = msgSetMap.get(topic);
if (map != null) {
boolean rmv;
synchronized (map) {
rmv = map.remove(nodeId, this) && map.isEmpty();
if (rmv)
msgSetMap.remove(topic, map);
* @return ID of node that sent the messages in the set.
UUID nodeId() {
return nodeId;
* @return Communication policy.
byte policy() {
return plc;
* @return Message topic.
Object topic() {
return topic;
* @return {@code True} if successful.
boolean reserve() {
return reserved.compareAndSet(false, true);
* @return {@code True} if set is reserved.
boolean reserved() {
return reserved.get();
* Releases reservation.
void release() {
assert reserved.get() : "Message set was not reserved: " + this;
* @param lsnr Listener to notify.
void unwind(GridMessageListener lsnr) {
assert reserved.get();
for (OrderedMessageContainer mc = msgs.poll(); mc != null; mc = msgs.poll()) {
try (TraceSurroundings ignore = support(ctx.tracing().create(
try {
OrderedMessageContainer fmc = mc;
MTC.span().addTag(SpanTags.MESSAGE, () -> traceName(fmc.message));
invokeListener(plc, lsnr, nodeId, mc.message.message(), secSubjId(mc.message));
finally {
if (mc.closure != null);
* @param msg Message to add.
* @param msgC Message closure (may be {@code null}).
void add(
GridIoMessage msg,
@Nullable IgniteRunnable msgC
) {
msgs.add(new OrderedMessageContainer(msg, U.currentTimeMillis(), msgC, MTC.span()));
* @return {@code True} if set has messages to unwind.
boolean changed() {
return !msgs.isEmpty();
* Calculates end time with overflow check.
* @param timeout Timeout in milliseconds.
* @return End time in milliseconds.
private long endTime(long timeout) {
long endTime = U.currentTimeMillis() + timeout;
// Account for overflow.
if (endTime < 0)
endTime = Long.MAX_VALUE;
return endTime;
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCommunicationMessageSet.class, this);
* DTO for handling of communication message.
private static class OrderedMessageContainer {
/** */
GridIoMessage message;
/** */
long addedTime;
/** */
IgniteRunnable closure;
/** */
Span parentSpan;
* @param msg Received message.
* @param addedTime Time of added to queue.
* @param c Message closure.
* @param parentSpan Span of process which added this message.
private OrderedMessageContainer(GridIoMessage msg, Long addedTime, IgniteRunnable c, Span parentSpan) {
this.message = msg;
this.addedTime = addedTime;
this.closure = c;
this.parentSpan = parentSpan;
private static class ConcurrentHashMap0<K, V> extends ConcurrentHashMap<K, V> {
/** */
private static final long serialVersionUID = 0L;
/** */
private int hash;
* @param o Object to be compared for equality with this map.
* @return {@code True} only for {@code this}.
@Override public boolean equals(Object o) {
return o == this;
* @return Identity hash code.
@Override public int hashCode() {
if (hash == 0) {
int hash0 = System.identityHashCode(this);
hash = hash0 != 0 ? hash0 : -1;
return hash;
private static class DelayedMessage {
/** */
private final UUID nodeId;
/** */
private final GridIoMessage msg;
/** */
private final IgniteRunnable msgC;
* @param nodeId Node ID.
* @param msg Message.
* @param msgC Callback.
private DelayedMessage(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) {
this.nodeId = nodeId;
this.msg = msg;
this.msgC = msgC;
* @return Message char.
public IgniteRunnable callback() {
return msgC;
* @return Message.
public GridIoMessage message() {
return msg;
* @return Node id.
public UUID nodeId() {
return nodeId;
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DelayedMessage.class, this, super.toString());
private class IoTestFuture extends GridFutureAdapter<List<IgniteIoTestMessage>> {
/** */
private final long id;
/** */
private final int cntr;
/** */
private final List<IgniteIoTestMessage> ress;
* @param id ID.
* @param cntr Counter.
IoTestFuture(long id, int cntr) {
assert cntr > 0 : cntr; = id;
this.cntr = cntr;
ress = new ArrayList<>(cntr);
void onResponse(IgniteIoTestMessage res) {
boolean complete;
synchronized (this) {
complete = cntr == ress.size();
if (complete)
/** {@inheritDoc} */
@Override public boolean onDone(List<IgniteIoTestMessage> res, @Nullable Throwable err) {
if (super.onDone(res, err)) {
return true;
return false;
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IoTestFuture.class, this);
private static class IoTestThreadLocalNodeResults {
/** */
private final long[] resLatency;
/** */
private final int rangesCnt;
/** */
private long totalLatency;
/** */
private long maxLatency;
/** */
private long maxLatencyTs;
/** */
private long maxReqSendQueueTime;
/** */
private long maxReqSendQueueTimeTs;
/** */
private long maxReqRcvQueueTime;
/** */
private long maxReqRcvQueueTimeTs;
/** */
private long maxResSendQueueTime;
/** */
private long maxResSendQueueTimeTs;
/** */
private long maxResRcvQueueTime;
/** */
private long maxResRcvQueueTimeTs;
/** */
private long maxReqWireTimeMillis;
/** */
private long maxReqWireTimeTs;
/** */
private long maxResWireTimeMillis;
/** */
private long maxResWireTimeTs;
/** */
private final long latencyLimit;
* @param rangesCnt Ranges count.
* @param latencyLimit
public IoTestThreadLocalNodeResults(int rangesCnt, long latencyLimit) {
this.rangesCnt = rangesCnt;
this.latencyLimit = latencyLimit;
resLatency = new long[rangesCnt + 1];
* @param msg
public void onResult(IgniteIoTestMessage msg) {
long now = System.currentTimeMillis();
long latency = msg.responseProcessedTs() - msg.requestCreateTs();
int idx = latency >= latencyLimit ?
rangesCnt /* Timed out. */ :
(int)Math.floor((1.0 * latency) / ((1.0 * latencyLimit) / rangesCnt));
totalLatency += latency;
if (maxLatency < latency) {
maxLatency = latency;
maxLatencyTs = now;
long reqSndQueueTime = msg.requestSendTs() - msg.requestCreateTs();
if (maxReqSendQueueTime < reqSndQueueTime) {
maxReqSendQueueTime = reqSndQueueTime;
maxReqSendQueueTimeTs = now;
long reqRcvQueueTime = msg.requestProcessTs() - msg.requestReceiveTs();
if (maxReqRcvQueueTime < reqRcvQueueTime) {
maxReqRcvQueueTime = reqRcvQueueTime;
maxReqRcvQueueTimeTs = now;
long resSndQueueTime = msg.responseSendTs() - msg.requestProcessTs();
if (maxResSendQueueTime < resSndQueueTime) {
maxResSendQueueTime = resSndQueueTime;
maxResSendQueueTimeTs = now;
long resRcvQueueTime = msg.responseProcessedTs() - msg.responseReceiveTs();
if (maxResRcvQueueTime < resRcvQueueTime) {
maxResRcvQueueTime = resRcvQueueTime;
maxResRcvQueueTimeTs = now;
long reqWireTimeMillis = msg.requestReceivedTsMillis() - msg.requestSendTsMillis();
if (maxReqWireTimeMillis < reqWireTimeMillis) {
maxReqWireTimeMillis = reqWireTimeMillis;
maxReqWireTimeTs = now;
long resWireTimeMillis = msg.responseReceivedTsMillis() - msg.requestSendTsMillis();
if (maxResWireTimeMillis < resWireTimeMillis) {
maxResWireTimeMillis = resWireTimeMillis;
maxResWireTimeTs = now;
private static class IoTestNodeResults {
/** */
private long latencyLimit;
/** */
private long[] resLatency;
/** */
private long totalLatency;
/** */
private Collection<IgnitePair<Long>> maxLatency = new ArrayList<>();
/** */
private Collection<IgnitePair<Long>> maxReqSendQueueTime = new ArrayList<>();
/** */
private Collection<IgnitePair<Long>> maxReqRcvQueueTime = new ArrayList<>();
/** */
private Collection<IgnitePair<Long>> maxResSendQueueTime = new ArrayList<>();
/** */
private Collection<IgnitePair<Long>> maxResRcvQueueTime = new ArrayList<>();
/** */
private Collection<IgnitePair<Long>> maxReqWireTimeMillis = new ArrayList<>();
/** */
private Collection<IgnitePair<Long>> maxResWireTimeMillis = new ArrayList<>();
* @param res Node results to add.
public void add(IoTestThreadLocalNodeResults res) {
if (resLatency == null) {
resLatency = res.resLatency.clone();
latencyLimit = res.latencyLimit;
else {
assert latencyLimit == res.latencyLimit;
assert resLatency.length == res.resLatency.length;
for (int i = 0; i < resLatency.length; i++)
resLatency[i] += res.resLatency[i];
totalLatency += res.totalLatency;
maxLatency.add(F.pair(res.maxLatency, res.maxLatencyTs));
maxReqSendQueueTime.add(F.pair(res.maxReqSendQueueTime, res.maxReqSendQueueTimeTs));
maxReqRcvQueueTime.add(F.pair(res.maxReqRcvQueueTime, res.maxReqRcvQueueTimeTs));
maxResSendQueueTime.add(F.pair(res.maxResSendQueueTime, res.maxResSendQueueTimeTs));
maxResRcvQueueTime.add(F.pair(res.maxResRcvQueueTime, res.maxResRcvQueueTimeTs));
maxReqWireTimeMillis.add(F.pair(res.maxReqWireTimeMillis, res.maxReqWireTimeTs));
maxResWireTimeMillis.add(F.pair(res.maxResWireTimeMillis, res.maxResWireTimeTs));
* @return Bin latency in microseconds.
public long binLatencyMcs() {
if (resLatency == null)
throw new IllegalStateException();
return latencyLimit / (1000 * (resLatency.length - 1));
* @return Security subject id.
private UUID secSubjId(GridIoMessage msg) {
if ( {
assert msg instanceof GridIoSecurityAwareMessage;
return ((GridIoSecurityAwareMessage) msg).secSubjId();
return null;