/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.spi.communication.tcp;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgniteTooManyOpenFilesException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.NoopTracing;
import org.apache.ignite.internal.processors.tracing.SpanTags;
import org.apache.ignite.internal.processors.tracing.Tracing;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.ipc.IpcEndpoint;
import org.apache.ignite.internal.util.ipc.IpcToNioAdapter;
import org.apache.ignite.internal.util.ipc.shmem.IpcOutOfSystemResourcesException;
import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
import org.apache.ignite.internal.util.nio.GridDirectParser;
import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
import org.apache.ignite.internal.util.nio.GridNioFilter;
import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioServerListener;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.nio.GridSelectorNioSessionImpl;
import org.apache.ignite.internal.util.nio.GridNioTracerFilter;
import org.apache.ignite.internal.util.nio.GridShmemCommunicationClient;
import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.ExponentialBackoffTimeoutStrategy;
import org.apache.ignite.spi.IgnitePortProtocol;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiConsistencyChecked;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.TimeoutStrategy;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.internal.CommunicationListenerEx;
import org.apache.ignite.spi.communication.tcp.internal.ConnectionKey;
import org.apache.ignite.spi.communication.tcp.internal.HandshakeException;
import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture;
import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationNodeConnectionCheckFuture;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage;
import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;

import static java.util.Collections.emptyList;
import static java.util.Objects.nonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable.traceName;
import static org.apache.ignite.internal.IgniteFeatures.CHANNEL_COMMUNICATION;
import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
import static org.apache.ignite.plugin.extensions.communication.Message.DIRECT_TYPE_SIZE;
import static org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.SES_FUT_META;
import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NEED_WAIT;
import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NODE_STOPPING;
import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.UNKNOWN_NODE;

/**
 * <tt>TcpCommunicationSpi</tt> is default communication SPI which uses
 * TCP/IP protocol and Java NIO to communicate with other nodes.
 * <p>
 * To enable communication with other nodes, this SPI adds {@link #ATTR_ADDRS}
 * and {@link #ATTR_PORT} local node attributes (see {@link ClusterNode#attributes()}.
 * <p>
 * At startup, this SPI tries to start listening to local port specified by
 * {@link #setLocalPort(int)} method. If local port is occupied, then SPI will
 * automatically increment the port number until it can successfully bind for
 * listening. {@link #setLocalPortRange(int)} configuration parameter controls
 * maximum number of ports that SPI will try before it fails. Port range comes
 * very handy when starting multiple grid nodes on the same machine or even
 * in the same VM. In this case all nodes can be brought up without a single
 * change in configuration.
 * <p>
 * This SPI caches connections to remote nodes so it does not have to reconnect every
 * time a message is sent. By default, idle connections are kept active for
 * {@link #DFLT_IDLE_CONN_TIMEOUT} period and then are closed. Use
 * {@link #setIdleConnectionTimeout(long)} configuration parameter to configure
 * you own idle connection timeout.
 * <h1 class="header">Failure Detection</h1>
 * Configuration defaults (see Configuration section below and
 * {@link IgniteConfiguration#getFailureDetectionTimeout()}) for details) are chosen to make possible for
 * communication SPI work reliably on most of hardware and virtual deployments, but this has made failure detection
 * time worse.
 * <p>
 * If it's needed to tune failure detection then it's highly recommended to do this using
 * {@link IgniteConfiguration#setFailureDetectionTimeout(long)}. This failure timeout automatically controls the
 * following parameters: {@link #getConnectTimeout()}, {@link #getMaxConnectTimeout()},
 * {@link #getReconnectCount()}. If any of those parameters is set explicitly, then the failure timeout setting will be
 * ignored.
 * <p>
 * If it's required to perform advanced settings of failure detection and
 * {@link IgniteConfiguration#getFailureDetectionTimeout()} is unsuitable then various {@code TcpCommunicationSpi}
 * configuration parameters may be used.
 * <h1 class="header">Configuration</h1>
 * <h2 class="header">Mandatory</h2>
 * This SPI has no mandatory configuration parameters.
 * <h2 class="header">Optional</h2>
 * The following configuration parameters are optional:
 * <ul>
 * <li>Address resolver (see {@link #setAddressResolver(AddressResolver)}</li>
 * <li>Node local IP address (see {@link #setLocalAddress(String)})</li>
 * <li>Node local port number (see {@link #setLocalPort(int)})</li>
 * <li>Local port range (see {@link #setLocalPortRange(int)}</li>
 * <li>Use paired connections (see {@link #setUsePairedConnections(boolean)}</li>
 * <li>Connections per node (see {@link #setConnectionsPerNode(int)})</li>
 * <li>Shared memory port (see {@link #setSharedMemoryPort(int)}</li>
 * <li>Idle connection timeout (see {@link #setIdleConnectionTimeout(long)})</li>
 * <li>Direct or heap buffer allocation (see {@link #setDirectBuffer(boolean)})</li>
 * <li>Direct or heap buffer allocation for sending (see {@link #setDirectSendBuffer(boolean)})</li>
 * <li>Count of selectors and selector threads for NIO server (see {@link #setSelectorsCount(int)})</li>
 * <li>Selector thread busy-loop iterations (see {@link #setSelectorSpins(long)}</li>
 * <li>{@code TCP_NODELAY} socket option for sockets (see {@link #setTcpNoDelay(boolean)})</li>
 * <li>Filter reachable addresses (see {@link #setFilterReachableAddresses(boolean)} </li>
 * <li>Message queue limit (see {@link #setMessageQueueLimit(int)})</li>
 * <li>Slow client queue limit (see {@link #setSlowClientQueueLimit(int)})</li>
 * <li>Connect timeout (see {@link #setConnectTimeout(long)})</li>
 * <li>Maximum connect timeout (see {@link #setMaxConnectTimeout(long)})</li>
 * <li>Reconnect attempts count (see {@link #setReconnectCount(int)})</li>
 * <li>Socket receive buffer size (see {@link #setSocketReceiveBuffer(int)})</li>
 * <li>Socket send buffer size (see {@link #setSocketSendBuffer(int)})</li>
 * <li>Socket write timeout (see {@link #setSocketWriteTimeout(long)})</li>
 * <li>Number of received messages after which acknowledgment is sent (see {@link #setAckSendThreshold(int)})</li>
 * <li>Maximum number of unacknowledged messages (see {@link #setUnacknowledgedMessagesBufferSize(int)})</li>
 * </ul>
 * <h2 class="header">Java Example</h2>
 * TcpCommunicationSpi is used by default and should be explicitly configured
 * only if some SPI configuration parameters need to be overridden.
 * <pre name="code" class="java">
 * TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
 *
 * // Override local port.
 * commSpi.setLocalPort(4321);
 *
 * IgniteConfiguration cfg = new IgniteConfiguration();
 *
 * // Override default communication SPI.
 * cfg.setCommunicationSpi(commSpi);
 *
 * // Start grid.
 * Ignition.start(cfg);
 * </pre>
 * <h2 class="header">Spring Example</h2>
 * TcpCommunicationSpi can be configured from Spring XML configuration file:
 * <pre name="code" class="xml">
 * &lt;bean id="grid.custom.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true"&gt;
 *         ...
 *         &lt;property name="communicationSpi"&gt;
 *             &lt;bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi"&gt;
 *                 &lt;!-- Override local port. --&gt;
 *                 &lt;property name="localPort" value="4321"/&gt;
 *             &lt;/bean&gt;
 *         &lt;/property&gt;
 *         ...
 * &lt;/bean&gt;
 * </pre>
 * <p>
 * <img src="http://ignite.apache.org/images/spring-small.png">
 * <br>
 * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
 * @see CommunicationSpi
 */
@IgniteSpiMultipleInstancesSupport(true)
@IgniteSpiConsistencyChecked(optional = false)
public class TcpCommunicationSpi extends IgniteSpiAdapter implements CommunicationSpi<Message> {
    /** Time threshold to log too long connection establish. */
    private static final int CONNECTION_ESTABLISH_THRESHOLD_MS = 100;

    /** IPC error message. */
    public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " +
        "(switching to TCP, may be slower).";

    /** Node attribute that is mapped to node IP addresses (value is <tt>comm.tcp.addrs</tt>). */
    public static final String ATTR_ADDRS = "comm.tcp.addrs";

    /** Node attribute that is mapped to node host names (value is <tt>comm.tcp.host.names</tt>). */
    public static final String ATTR_HOST_NAMES = "comm.tcp.host.names";

    /** Node attribute that is mapped to node port number (value is <tt>comm.tcp.port</tt>). */
    public static final String ATTR_PORT = "comm.tcp.port";

    /** Node attribute that is mapped to node port number (value is <tt>comm.shmem.tcp.port</tt>). */
    public static final String ATTR_SHMEM_PORT = "comm.shmem.tcp.port";

    /** Node attribute that is mapped to node's external addresses (value is <tt>comm.tcp.ext-addrs</tt>). */
    public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs";

    /** */
    public static final String ATTR_PAIRED_CONN = "comm.tcp.pairedConnection";

    /** Default port which node sets listener to (value is <tt>47100</tt>). */
    public static final int DFLT_PORT = 47100;

    /** Default port which node sets listener for shared memory connections (value is <tt>48100</tt>). */
    public static final int DFLT_SHMEM_PORT = -1;

    /** Default idle connection timeout (value is <tt>10</tt>min). */
    public static final long DFLT_IDLE_CONN_TIMEOUT = 10 * 60_000;

    /** Default socket send and receive buffer size. */
    public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;

    /** Default connection timeout (value is <tt>5000</tt>ms). */
    public static final long DFLT_CONN_TIMEOUT = 5000;

    /** Default Maximum connection timeout (value is <tt>600,000</tt>ms). */
    public static final long DFLT_MAX_CONN_TIMEOUT = 10 * 60 * 1000;

    /** Default reconnect attempts count (value is <tt>10</tt>). */
    public static final int DFLT_RECONNECT_CNT = 10;

    /** Default message queue limit per connection (for incoming and outgoing . */
    public static final int DFLT_MSG_QUEUE_LIMIT = GridNioServer.DFLT_SEND_QUEUE_LIMIT;

    /**
     * Default count of selectors for TCP server equals to
     * {@code "Math.max(4, Runtime.getRuntime().availableProcessors() / 2)"}.
     */
    public static final int DFLT_SELECTORS_CNT = Math.max(4, Runtime.getRuntime().availableProcessors() / 2);

    /** Default initial connect/handshake timeout in case of failure detection enabled. */
    private static final int DFLT_INITIAL_TIMEOUT = 500;

    /** Default initial delay in case of target node is still out of topology. */
    private static final int DFLT_NEED_WAIT_DELAY = 200;

    /** Default delay between reconnects attempts in case of temporary network issues. */
    private static final int DFLT_RECONNECT_DELAY = 50;

    /**
     * Version when client is ready to wait to connect to server (could be needed when client tries to open connection
     * before it starts being visible for server)
     */
    private static final IgniteProductVersion VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT = IgniteProductVersion.fromString("2.1.4");

    /** Connection index meta for session. */
    public static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();

    /** Node consistent id meta for session. */
    public static final int CONSISTENT_ID_META = GridNioSessionMetaKey.nextUniqueKey();

    /** Message tracker meta for session. */
    private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();

    /** Channel meta used for establishing channel connections. */
    private static final int CHANNEL_FUT_META = GridNioSessionMetaKey.nextUniqueKey();

    /**
     * Default local port range (value is <tt>100</tt>).
     * See {@link #setLocalPortRange(int)} for details.
     */
    public static final int DFLT_PORT_RANGE = 100;

    /** Default value for {@code TCP_NODELAY} socket option (value is <tt>true</tt>). */
    public static final boolean DFLT_TCP_NODELAY = true;

    /** Default value for {@code FILTER_REACHABLE_ADDRESSES} socket option (value is <tt>false</tt>). */
    public static final boolean DFLT_FILTER_REACHABLE_ADDRESSES = false;

    /** Default received messages threshold for sending ack. */
    public static final int DFLT_ACK_SND_THRESHOLD = 32;

    /** Default socket write timeout. */
    public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000;

    /** Default connections per node. */
    public static final int DFLT_CONN_PER_NODE = 1;

    /** Maximum {@link GridNioSession} connections per node. */
    public static final int MAX_CONN_PER_NODE = 1024;

    /** No-op runnable. */
    private static final IgniteRunnable NOOP = () -> {};

    /** Node ID message type. */
    public static final short NODE_ID_MSG_TYPE = -1;

    /** Recovery last received ID message type. */
    public static final short RECOVERY_LAST_ID_MSG_TYPE = -2;

    /** Handshake message type. */
    public static final short HANDSHAKE_MSG_TYPE = -3;

    /** Handshake wait message type. */
    public static final short HANDSHAKE_WAIT_MSG_TYPE = -28;

    /** Communication metrics group name. */
    public static final String COMMUNICATION_METRICS_GROUP_NAME = MetricUtils.metricName("communication", "tcp");

    /** */
    public static final String SENT_MESSAGES_METRIC_NAME = "sentMessagesCount";

    /** */
    public static final String SENT_MESSAGES_METRIC_DESC = "Total number of messages sent by current node";

    /** */
    public static final String RECEIVED_MESSAGES_METRIC_NAME = "receivedMessagesCount";

    /** */
    public static final String RECEIVED_MESSAGES_METRIC_DESC = "Total number of messages received by current node";

    /** */
    public static final String SENT_MESSAGES_BY_TYPE_METRIC_NAME = "sentMessagesByType";

    /** */
    public static final String SENT_MESSAGES_BY_TYPE_METRIC_DESC =
        "Total number of messages with given type sent by current node";

    /** */
    public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME = "receivedMessagesByType";

    /** */
    public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC =
        "Total number of messages with given type received by current node";

    /** */
    public static final String SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME = "sentMessagesToNode";

    /** */
    public static final String SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC =
        "Total number of messages sent by current node to the given node";

    /** */
    public static final String RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME = "receivedMessagesFromNode";

    /** */
    public static final String RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC =
        "Total number of messages received by current node from the given node";

    /** */
    private ConnectGateway connectGate;

    /** */
    private ConnectionPolicy connPlc = new FirstConnectionPolicy();

    /** Channel connection index provider. */
    private ConnectionPolicy chConnPlc;

    /** */
    private boolean enableForcibleNodeKill = getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);

    /** */
    private boolean enableTroubleshootingLog = getBoolean(IgniteSystemProperties.IGNITE_TROUBLESHOOTING_LOGGER);

    /** Server listener. */
    private final GridNioServerListener<Message> srvLsnr =
        new GridNioServerListenerAdapter<Message>() {
            @Override public void onSessionWriteTimeout(GridNioSession ses) {
                LT.warn(log, "Communication SPI session write timed out (consider increasing " +
                    "'socketWriteTimeout' " + "configuration property) [remoteAddr=" + ses.remoteAddress() +
                    ", writeTimeout=" + sockWriteTimeout + ']');

                if (log.isDebugEnabled())
                    log.debug("Closing communication SPI session on write timeout [remoteAddr=" + ses.remoteAddress() +
                        ", writeTimeout=" + sockWriteTimeout + ']');

                ses.close();
            }

            @Override public void onConnected(GridNioSession ses) {
                if (ses.accepted()) {
                    if (log.isInfoEnabled())
                        log.info("Accepted incoming communication connection [locAddr=" + ses.localAddress() +
                            ", rmtAddr=" + ses.remoteAddress() + ']');

                    try {
                        boolean client = Boolean.TRUE.equals(ignite().configuration().isClientMode());

                        if (client || ctxInitLatch.getCount() == 0 || !isHandshakeWaitSupported()) {
                            if (log.isDebugEnabled())
                                log.debug("Sending local node ID to newly accepted session: " + ses);

                            ses.sendNoFuture(nodeIdMessage(), null);
                        }
                        else {
                            if (log.isDebugEnabled())
                                log.debug("Sending handshake wait message to newly accepted session: " + ses);

                            ses.sendNoFuture(new HandshakeWaitMessage(), null);
                        }
                    }
                    catch (IgniteCheckedException e) {
                        U.error(log, "Failed to send message: " + e, e);
                    }
                }
                else {
                    if (log.isInfoEnabled())
                        log.info("Established outgoing communication connection [locAddr=" + ses.localAddress() +
                            ", rmtAddr=" + ses.remoteAddress() + ']');
                }
            }

            @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
                ConnectionKey connId = ses.meta(CONN_IDX_META);

                if (connId != null) {
                    if (connId.dummy())
                        return;

                    UUID id = connId.nodeId();

                    GridCommunicationClient[] nodeClients = clients.get(id);

                    if (nodeClients != null) {
                        for (GridCommunicationClient client : nodeClients) {
                            if (client instanceof GridTcpNioCommunicationClient &&
                                ((GridTcpNioCommunicationClient)client).session() == ses) {
                                client.close();

                                removeNodeClient(id, client);
                            }
                        }
                    }

                    if (!stopping) {
                        GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();

                        if (outDesc != null) {
                            if (outDesc.nodeAlive(getSpiContext().node(id))) {
                                if (!outDesc.messagesRequests().isEmpty()) {
                                    if (log.isDebugEnabled())
                                        log.debug("Session was closed but there are unacknowledged messages, " +
                                            "will try to reconnect [rmtNode=" + outDesc.node().id() + ']');

                                    DisconnectedSessionInfo disconnectData =
                                        new DisconnectedSessionInfo(outDesc, connId.connectionIndex());

                                    commWorker.addProcessDisconnectRequest(disconnectData);
                                }
                            }
                            else
                                outDesc.onNodeLeft();
                        }
                    }

                    CommunicationListener<Message> lsnr0 = lsnr;

                    if (lsnr0 != null)
                        lsnr0.onDisconnected(id);
                }
            }

            /**
             * @param ses Session.
             * @param msg Message.
             */
            private void onFirstMessage(final GridNioSession ses, Message msg) {
                UUID sndId;

                ConnectionKey connKey;

                if (msg instanceof NodeIdMessage) {
                    sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0);
                    connKey = new ConnectionKey(sndId, 0, -1);
                }
                else {
                    assert msg instanceof HandshakeMessage : msg;

                    HandshakeMessage msg0 = (HandshakeMessage)msg;

                    sndId = ((HandshakeMessage)msg).nodeId();
                    connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount());
                }

                if (log.isDebugEnabled())
                    log.debug("Remote node ID received: " + sndId);

                final ClusterNode rmtNode = getSpiContext().node(sndId);

                if (rmtNode == null) {
                    DiscoverySpi discoverySpi = ignite().configuration().getDiscoverySpi();

                    boolean unknownNode = true;

                    if (discoverySpi instanceof TcpDiscoverySpi) {
                        TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi)discoverySpi;

                        ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId);

                        if (node0 != null) {
                            assert node0.isClient() : node0;

                            if (node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0)
                                unknownNode = false;
                        }
                    }
                    else if (discoverySpi instanceof IgniteDiscoverySpi)
                        unknownNode = !((IgniteDiscoverySpi)discoverySpi).knownNode(sndId);

                    if (unknownNode) {
                        U.warn(log, "Close incoming connection, unknown node [nodeId=" + sndId + ", ses=" + ses + ']');

                        ses.send(new RecoveryLastReceivedMessage(UNKNOWN_NODE)).listen(new CI1<IgniteInternalFuture<?>>() {
                            @Override public void apply(IgniteInternalFuture<?> fut) {
                                ses.close();
                            }
                        });
                    }
                    else {
                        ses.send(new RecoveryLastReceivedMessage(NEED_WAIT)).listen(new CI1<IgniteInternalFuture<?>>() {
                            @Override public void apply(IgniteInternalFuture<?> fut) {
                                ses.close();
                            }
                        });
                    }

                    return;
                }

                ses.addMeta(CONSISTENT_ID_META, rmtNode.consistentId());

                final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey);

                assert old == null;

                ClusterNode locNode = getSpiContext().localNode();

                if (ses.remoteAddress() == null)
                    return;

                assert msg instanceof HandshakeMessage : msg;

                HandshakeMessage msg0 = (HandshakeMessage)msg;

                if (log.isDebugEnabled())
                    log.debug("Received handshake message [locNodeId=" + locNode.id() + ", rmtNodeId=" + sndId +
                        ", msg=" + msg0 + ']');

                if (isChannelConnIdx(msg0.connectionIndex()))
                    ses.send(new RecoveryLastReceivedMessage(0));
                else if (usePairedConnections(rmtNode)) {
                    final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey);

                    ConnectClosureNew c = new ConnectClosureNew(ses, recoveryDesc, rmtNode);

                    boolean reserve = recoveryDesc.tryReserve(msg0.connectCount(), c);

                    if (reserve)
                        connectedNew(recoveryDesc, ses, true);
                    else {
                        if (c.failed) {
                            ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));

                            closeStaleConnections(connKey);
                        }
                    }
                }
                else {
                    assert connKey.connectionIndex() >= 0 : connKey;

                    GridCommunicationClient[] curClients = clients.get(sndId);

                    GridCommunicationClient oldClient =
                        curClients != null && connKey.connectionIndex() < curClients.length ?
                            curClients[connKey.connectionIndex()] :
                            null;

                    boolean hasShmemClient = false;

                    if (oldClient != null) {
                        if (oldClient instanceof GridTcpNioCommunicationClient) {
                            if (log.isInfoEnabled())
                                log.info("Received incoming connection when already connected " +
                                    "to this node, rejecting [locNode=" + locNode.id() +
                                    ", rmtNode=" + sndId + ']');

                            ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));

                            closeStaleConnections(connKey);

                            return;
                        }
                        else {
                            assert oldClient instanceof GridShmemCommunicationClient;

                            hasShmemClient = true;
                        }
                    }

                    GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();

                    GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);

                    final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey);

                    if (oldFut == null) {
                        curClients = clients.get(sndId);

                        oldClient = curClients != null && connKey.connectionIndex() < curClients.length ?
                            curClients[connKey.connectionIndex()] : null;

                        if (oldClient != null) {
                            if (oldClient instanceof GridTcpNioCommunicationClient) {
                                assert oldClient.connectionIndex() == connKey.connectionIndex() : oldClient;

                                if (log.isInfoEnabled())
                                    log.info("Received incoming connection when already connected " +
                                        "to this node, rejecting [locNode=" + locNode.id() +
                                        ", rmtNode=" + sndId + ']');

                                ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));

                                closeStaleConnections(connKey);

                                fut.onDone(oldClient);

                                return;
                            }
                            else {
                                assert oldClient instanceof GridShmemCommunicationClient;

                                hasShmemClient = true;
                            }
                        }

                        boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
                            new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut));

                        if (log.isDebugEnabled())
                            log.debug("Received incoming connection from remote node " +
                                "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved +
                                ", recovery=" + recoveryDesc + ']');

                        if (reserved) {
                            try {
                                GridTcpNioCommunicationClient client =
                                    connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);

                                fut.onDone(client);
                            }
                            finally {
                                clientFuts.remove(connKey, fut);
                            }
                        }
                    }
                    else {
                        if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
                            if (log.isInfoEnabled()) {
                                log.info("Received incoming connection from remote node while " +
                                    "connecting to this node, rejecting [locNode=" + locNode.id() +
                                    ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() +
                                    ", rmtNodeOrder=" + rmtNode.order() + ']');
                            }

                            ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
                        }
                        else {
                            // The code below causes a race condition between shmem and TCP (see IGNITE-1294)
                            boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
                                new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut));

                            if (reserved)
                                connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
                        }
                    }
                }
            }

            /**
             * @param connKey Connection key.
             */
            private void closeStaleConnections(ConnectionKey connKey) {
                for (GridNioSession ses0 : nioSrvr.sessions()) {
                    ConnectionKey key0 = ses0.meta(CONN_IDX_META);

                    if (ses0.accepted() && key0 != null &&
                        key0.nodeId().equals(connKey.nodeId()) &&
                        key0.connectionIndex() == connKey.connectionIndex() &&
                        key0.connectCount() < connKey.connectCount())
                        ses0.close();
                }
            }

            @Override public void onMessageSent(GridNioSession ses, Message msg) {
                Object consistentId = ses.meta(CONSISTENT_ID_META);

                if (consistentId != null)
                    metricsLsnr.onMessageSent(msg, consistentId);
            }

            private void onChannelCreate(
                GridSelectorNioSessionImpl ses,
                ConnectionKey connKey,
                Message msg
            ) {
                cleanupLocalNodeRecoveryDescriptor(connKey);

                ses.send(msg)
                    .listen(sendFut -> {
                        if (sendFut.error() != null) {
                            U.error(log, "Fail to send channel creation response to the remote node. " +
                                "Session will be closed [nodeId=" + connKey.nodeId() +
                                ", idx=" + connKey.connectionIndex() + ']', sendFut.error());

                            ses.close();

                            return;
                        }

                        ses.closeSocketOnSessionClose(false);

                        // Close session and send response.
                        ses.close().listen(closeFut -> {
                            if (closeFut.error() != null) {
                                U.error(log, "Nio session has not been properly closed " +
                                        "[nodeId=" + connKey.nodeId() + ", idx=" + connKey.connectionIndex() + ']',
                                    closeFut.error());

                                U.closeQuiet(ses.key().channel());

                                return;
                            }

                            notifyChannelEvtListener(connKey.nodeId(), ses.key().channel(), msg);
                        });
                    });
            }

            @Override public void onMessage(final GridNioSession ses, Message msg) {
                MTC.span().addLog(() -> "Communication received");
                MTC.span().addTag(SpanTags.MESSAGE, () -> traceName(msg));

                ConnectionKey connKey = ses.meta(CONN_IDX_META);

                if (connKey == null) {
                    assert ses.accepted() : ses;

                    if (!connectGate.tryEnter()) {
                        if (log.isDebugEnabled())
                            log.debug("Close incoming connection, failed to enter gateway.");

                        ses.send(new RecoveryLastReceivedMessage(NODE_STOPPING)).listen(new CI1<IgniteInternalFuture<?>>() {
                            @Override public void apply(IgniteInternalFuture<?> fut) {
                                ses.close();
                            }
                        });

                        return;
                    }

                    try {
                        onFirstMessage(ses, msg);
                    }
                    finally {
                        connectGate.leave();
                    }
                }
                else {
                    Object consistentId = ses.meta(CONSISTENT_ID_META);

                    assert consistentId != null;

                    if (isChannelConnIdx(connKey.connectionIndex())) {
                        if (ses.meta(CHANNEL_FUT_META) == null)
                            onChannelCreate((GridSelectorNioSessionImpl)ses, connKey, msg);
                        else {
                            GridFutureAdapter<Channel> fut = ses.meta(CHANNEL_FUT_META);
                            GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;

                            ses0.closeSocketOnSessionClose(false);

                            ses0.close().listen(f -> {
                                if (f.error() != null) {
                                    fut.onDone(f.error());

                                    return;
                                }

                                fut.onDone(ses0.key().channel());
                            });
                        }

                        return;
                    }

                    if (msg instanceof RecoveryLastReceivedMessage) {
                        metricsLsnr.onMessageReceived(msg, consistentId);

                        GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor();

                        if (recovery != null) {
                            RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg;

                            if (log.isDebugEnabled()) {
                                log.debug("Received recovery acknowledgement [rmtNode=" + connKey.nodeId() +
                                    ", connIdx=" + connKey.connectionIndex() +
                                    ", rcvCnt=" + msg0.received() + ']');
                            }

                            recovery.ackReceived(msg0.received());
                        }

                        return;
                    }
                    else {
                        GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();

                        if (recovery != null) {
                            long rcvCnt = recovery.onReceived();

                            if (rcvCnt % ackSndThreshold == 0) {
                                if (log.isDebugEnabled()) {
                                    log.debug("Send recovery acknowledgement [rmtNode=" + connKey.nodeId() +
                                        ", connIdx=" + connKey.connectionIndex() +
                                        ", rcvCnt=" + rcvCnt + ']');
                                }

                                ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));

                                recovery.lastAcknowledged(rcvCnt);
                            }
                        }
                        else if (connKey.dummy()) {
                            assert msg instanceof NodeIdMessage : msg;

                            TcpCommunicationNodeConnectionCheckFuture fut = ses.meta(SES_FUT_META);

                            assert fut != null : msg;

                            fut.onConnected(U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0));

                            nioSrvr.closeFromWorkerThread(ses);

                            return;
                        }
                    }

                    metricsLsnr.onMessageReceived(msg, consistentId);

                    IgniteRunnable c;

                    if (msgQueueLimit > 0) {
                        GridNioMessageTracker tracker = ses.meta(TRACKER_META);

                        if (tracker == null) {
                            GridNioMessageTracker old = ses.addMeta(TRACKER_META, tracker =
                                new GridNioMessageTracker(ses, msgQueueLimit));

                            assert old == null;
                        }

                        tracker.onMessageReceived();

                        c = tracker;
                    }
                    else
                        c = NOOP;

                    notifyListener(connKey.nodeId(), msg, c);
                }
            }

            /** {@inheritDoc} */
            @Override public void onFailure(FailureType failureType, Throwable failure) {
                if (ignite instanceof IgniteEx)
                    ((IgniteEx)ignite).context().failure().process(new FailureContext(failureType, failure));
            }

            /**
             * @param recovery Recovery descriptor.
             * @param ses Session.
             * @param node Node.
             * @param rcvCnt Number of received messages.
             * @param sndRes If {@code true} sends response for recovery handshake.
             * @param createClient If {@code true} creates NIO communication client.
             * @return Client.
             */
            private GridTcpNioCommunicationClient connected(
                GridNioRecoveryDescriptor recovery,
                GridNioSession ses,
                ClusterNode node,
                long rcvCnt,
                boolean sndRes,
                boolean createClient) {
                ConnectionKey connKey = ses.meta(CONN_IDX_META);

                assert connKey != null && connKey.connectionIndex() >= 0 : connKey;
                assert !usePairedConnections(node);

                recovery.onHandshake(rcvCnt);

                ses.inRecoveryDescriptor(recovery);
                ses.outRecoveryDescriptor(recovery);

                nioSrvr.resend(ses);

                try {
                    if (sndRes)
                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
                }
                catch (IgniteCheckedException e) {
                    U.error(log, "Failed to send message: " + e, e);
                }

                recovery.onConnected();

                GridTcpNioCommunicationClient client = null;

                if (createClient) {
                    client = new GridTcpNioCommunicationClient(connKey.connectionIndex(), ses, log);

                    addNodeClient(node, connKey.connectionIndex(), client);
                }

                return client;
            }

            /**
             * @param recovery Recovery descriptor.
             * @param ses Session.
             * @param sndRes If {@code true} sends response for recovery handshake.
             */
            private void connectedNew(
                GridNioRecoveryDescriptor recovery,
                GridNioSession ses,
                boolean sndRes) {
                try {
                    ses.inRecoveryDescriptor(recovery);

                    if (sndRes)
                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));

                    recovery.onConnected();
                }
                catch (IgniteCheckedException e) {
                    U.error(log, "Failed to send message: " + e, e);
                }
            }

            /**
             *
             */
            class ConnectClosureNew implements IgniteInClosure<Boolean> {
                /** */
                private static final long serialVersionUID = 0L;

                /** */
                private final GridNioSession ses;

                /** */
                private final GridNioRecoveryDescriptor recoveryDesc;

                /** */
                private final ClusterNode rmtNode;

                /** */
                private boolean failed;

                /**
                 * @param ses Incoming session.
                 * @param recoveryDesc Recovery descriptor.
                 * @param rmtNode Remote node.
                 */
                ConnectClosureNew(GridNioSession ses,
                    GridNioRecoveryDescriptor recoveryDesc,
                    ClusterNode rmtNode) {
                    this.ses = ses;
                    this.recoveryDesc = recoveryDesc;
                    this.rmtNode = rmtNode;
                }

                /** {@inheritDoc} */
                @Override public void apply(Boolean success) {
                    try {
                        failed = !success;

                        if (success) {
                            IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
                                @Override public void apply(IgniteInternalFuture<?> msgFut) {
                                    try {
                                        msgFut.get();

                                        connectedNew(recoveryDesc, ses, false);
                                    }
                                    catch (IgniteCheckedException e) {
                                        if (log.isDebugEnabled())
                                            log.debug("Failed to send recovery handshake " +
                                                "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');

                                        recoveryDesc.release();
                                    }
                                }
                            };

                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr);
                        }
                        else
                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
                    }
                    catch (IgniteCheckedException e) {
                        U.error(log, "Failed to send message: " + e, e);
                    }
                }
            }

            /**
             *
             */
            @SuppressWarnings("PackageVisibleInnerClass")
            class ConnectClosure implements IgniteInClosure<Boolean> {
                /** */
                private static final long serialVersionUID = 0L;

                /** */
                private final GridNioSession ses;

                /** */
                private final GridNioRecoveryDescriptor recoveryDesc;

                /** */
                private final ClusterNode rmtNode;

                /** */
                private final HandshakeMessage msg;

                /** */
                private final GridFutureAdapter<GridCommunicationClient> fut;

                /** */
                private final boolean createClient;

                /** */
                private final ConnectionKey connKey;

                /**
                 * @param ses Incoming session.
                 * @param recoveryDesc Recovery descriptor.
                 * @param rmtNode Remote node.
                 * @param connKey Connection key.
                 * @param msg Handshake message.
                 * @param createClient If {@code true} creates NIO communication client..
                 * @param fut Connect future.
                 */
                ConnectClosure(GridNioSession ses,
                    GridNioRecoveryDescriptor recoveryDesc,
                    ClusterNode rmtNode,
                    ConnectionKey connKey,
                    HandshakeMessage msg,
                    boolean createClient,
                    GridFutureAdapter<GridCommunicationClient> fut) {
                    this.ses = ses;
                    this.recoveryDesc = recoveryDesc;
                    this.rmtNode = rmtNode;
                    this.connKey = connKey;
                    this.msg = msg;
                    this.createClient = createClient;
                    this.fut = fut;
                }

                /** {@inheritDoc} */
                @Override public void apply(Boolean success) {
                    if (success) {
                        try {
                            IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
                                @Override public void apply(IgniteInternalFuture<?> msgFut) {
                                    try {
                                        msgFut.get();

                                        GridTcpNioCommunicationClient client =
                                            connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient);

                                        fut.onDone(client);
                                    }
                                    catch (IgniteCheckedException e) {
                                        if (log.isDebugEnabled())
                                            log.debug("Failed to send recovery handshake " +
                                                "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');

                                        recoveryDesc.release();

                                        fut.onDone();
                                    }
                                    finally {
                                        clientFuts.remove(connKey, fut);
                                    }
                                }
                            };

                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr);
                        }
                        catch (IgniteCheckedException e) {
                            U.error(log, "Failed to send message: " + e, e);
                        }
                    }
                    else {
                        try {
                            fut.onDone();
                        }
                        finally {
                            clientFuts.remove(connKey, fut);
                        }
                    }
                }
            }
        };

    /** Logger. */
    @LoggerResource
    private IgniteLogger log;

    /** Logger. */
    @LoggerResource(categoryName = "org.apache.ignite.internal.diagnostic")
    private IgniteLogger diagnosticLog;

    /** Local IP address. */
    private String locAddr;

    /** Complex variable that represents this node IP address. */
    private volatile InetAddress locHost;

    /** Local port which node uses. */
    private int locPort = DFLT_PORT;

    /** Local port range. */
    private int locPortRange = DFLT_PORT_RANGE;

    /** Local port which node uses to accept shared memory connections. */
    private int shmemPort = DFLT_SHMEM_PORT;

    /** Allocate direct buffer or heap buffer. */
    private boolean directBuf = true;

    /** Allocate direct buffer or heap buffer. */
    private boolean directSndBuf;

    /** Idle connection timeout. */
    private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT;

    /** Connect timeout. */
    private long connTimeout = DFLT_CONN_TIMEOUT;

    /** Maximum connect timeout. */
    private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT;

    /** Reconnect attempts count. */
    private int reconCnt = DFLT_RECONNECT_CNT;

    /** Socket send buffer. */
    private int sockSndBuf = DFLT_SOCK_BUF_SIZE;

    /** Socket receive buffer. */
    private int sockRcvBuf = DFLT_SOCK_BUF_SIZE;

    /** Message queue limit. */
    private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT;

    /** Slow client queue limit. */
    private int slowClientQueueLimit;

    /** NIO server. */
    private GridNioServer<Message> nioSrvr;

    /** Shared memory server. */
    private IpcSharedMemoryServerEndpoint shmemSrv;

    /** */
    private boolean usePairedConnections;

    /** */
    private int connectionsPerNode = DFLT_CONN_PER_NODE;

    /** {@code TCP_NODELAY} option value for created sockets. */
    private boolean tcpNoDelay = DFLT_TCP_NODELAY;

    /** {@code FILTER_REACHABLE_ADDRESSES} option value for created sockets. */
    private boolean filterReachableAddresses = DFLT_FILTER_REACHABLE_ADDRESSES;

    /** Number of received messages after which acknowledgment is sent. */
    private int ackSndThreshold = DFLT_ACK_SND_THRESHOLD;

    /** Maximum number of unacknowledged messages. */
    private int unackedMsgsBufSize;

    /** Socket write timeout. */
    private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;

    /** Recovery and idle clients handler. */
    private CommunicationWorker commWorker;

    /** Shared memory accept worker. */
    private ShmemAcceptWorker shmemAcceptWorker;

    /** Shared memory workers. */
    private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque<>();

    /** Clients. */
    private final ConcurrentMap<UUID, GridCommunicationClient[]> clients = GridConcurrentFactory.newMap();

    /** SPI listener. */
    private volatile CommunicationListener<Message> lsnr;

    /** Bound port. */
    private int boundTcpPort = -1;

    /** Bound port for shared memory server. */
    private int boundTcpShmemPort = -1;

    /** Count of selectors to use in TCP server. */
    private int selectorsCnt = DFLT_SELECTORS_CNT;

    /**
     * Defines how many non-blocking {@code selector.selectNow()} should be made before
     * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
     * Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
     */
    private long selectorSpins = IgniteSystemProperties.getLong("IGNITE_SELECTOR_SPINS", 0L);

    /** Address resolver. */
    private AddressResolver addrRslvr;

    /** Context initialization latch. */
    private final CountDownLatch ctxInitLatch = new CountDownLatch(1);

    /** Stopping flag (set to {@code true} when SPI gets stopping signal). */
    private volatile boolean stopping;

    /** Statistics. */
    private TcpCommunicationMetricsListener metricsLsnr;

    /** Client connect futures. */
    private final ConcurrentMap<ConnectionKey, GridFutureAdapter<GridCommunicationClient>> clientFuts =
        GridConcurrentFactory.newMap();

    /** */
    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap();

    /** */
    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> outRecDescs = GridConcurrentFactory.newMap();

    /** */
    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> inRecDescs = GridConcurrentFactory.newMap();

    /** */
    private final GridLocalEventListener discoLsnr = new DiscoveryListener();

    /** Tracing. */
    protected Tracing tracing;

    /**
     * @return {@code True} if ssl enabled.
     */
    private boolean isSslEnabled() {
        return ignite.configuration().getSslContextFactory() != null;
    }

    /**
     * Sets address resolver.
     *
     * @param addrRslvr Address resolver.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setAddressResolver(AddressResolver addrRslvr) {
        // Injection should not override value already set by Spring or user.
        if (this.addrRslvr == null)
            this.addrRslvr = addrRslvr;

        return this;
    }

    /**
     * See {@link #setAddressResolver(AddressResolver)}.
     *
     * @return Address resolver.
     */
    public AddressResolver getAddressResolver() {
        return addrRslvr;
    }

    /**
     * Injects resources.
     *
     * @param ignite Ignite.
     */
    @IgniteInstanceResource
    @Override protected void injectResources(Ignite ignite) {
        super.injectResources(ignite);

        if (ignite != null) {
            setAddressResolver(ignite.configuration().getAddressResolver());
            setLocalAddress(ignite.configuration().getLocalHost());
            tracing = ignite instanceof IgniteEx ? ((IgniteEx)ignite).context().tracing() : new NoopTracing();
        }
    }

    /**
     * Sets local host address for socket binding. Note that one node could have
     * additional addresses beside the loopback one. This configuration
     * parameter is optional.
     *
     * @param locAddr IP address. Default value is any available local
     *      IP address.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setLocalAddress(String locAddr) {
        // Injection should not override value already set by Spring or user.
        if (this.locAddr == null)
            this.locAddr = locAddr;

        return this;
    }

    /**
     * See {@link #setLocalAddress(String)}.
     *
     * @return Grid node IP address.
     */
    public String getLocalAddress() {
        return locAddr;
    }

    /**
     * Sets local port for socket binding.
     * <p>
     * If not provided, default value is {@link #DFLT_PORT}.
     *
     * @param locPort Port number.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setLocalPort(int locPort) {
        this.locPort = locPort;

        return this;
    }

    /**
     * See {@link #setLocalPort(int)}.
     *
     * @return Port number.
     */
    public int getLocalPort() {
        return locPort;
    }

    /**
     * Sets local port range for local host ports (value must greater than or equal to <tt>0</tt>).
     * If provided local port (see {@link #setLocalPort(int)}} is occupied,
     * implementation will try to increment the port number for as long as it is less than
     * initial value plus this range.
     * <p>
     * If port range value is <tt>0</tt>, then implementation will try bind only to the port provided by
     * {@link #setLocalPort(int)} method and fail if binding to this port did not succeed.
     * <p>
     * Local port range is very useful during development when more than one grid nodes need to run
     * on the same physical machine.
     * <p>
     * If not provided, default value is {@link #DFLT_PORT_RANGE}.
     *
     * @param locPortRange New local port range.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setLocalPortRange(int locPortRange) {
        this.locPortRange = locPortRange;

        return this;
    }

    /**
     * See {@link #setLocalPortRange(int)}.
     *
     * @return Local Port range.
     */
    public int getLocalPortRange() {
        return locPortRange;
    }

    /**
     * See {@link #setUsePairedConnections(boolean)}.
     *
     * @return {@code true} to use paired connections and {@code false} otherwise.
     */
    public boolean isUsePairedConnections() {
        return usePairedConnections;
    }

    /**
     * Set this to {@code true} if {@code TcpCommunicationSpi} should
     * maintain connection for outgoing and incoming messages separately.
     * In this case total number of connections between local and each remote node
     * is {@link #getConnectionsPerNode()} * 2.
     * <p>
     * Set this to {@code false} if each connection of {@link #getConnectionsPerNode()}
     * should be used for outgoing and incoming messages. In this case total number
     * of connections between local and each remote node is {@link #getConnectionsPerNode()}.
     * <p>
     * Default is {@code false}.
     *
     * @param usePairedConnections {@code true} to use paired connections and {@code false} otherwise.
     * @return {@code this} for chaining.
     * @see #getConnectionsPerNode()
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setUsePairedConnections(boolean usePairedConnections) {
        this.usePairedConnections = usePairedConnections;

        return this;
    }

    /**
     * Sets number of connections to each remote node. if {@link #isUsePairedConnections()}
     * is {@code true} then number of connections is doubled and half is used for incoming and
     * half for outgoing messages.
     *
     * @param maxConnectionsPerNode Number of connections per node.
     * @return {@code this} for chaining.
     * @see #isUsePairedConnections()
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setConnectionsPerNode(int maxConnectionsPerNode) {
        this.connectionsPerNode = maxConnectionsPerNode;

        return this;
    }

    /**
     * See {@link #setConnectionsPerNode(int)}.
     *
     * @return Number of connections per node.
     */
    public int getConnectionsPerNode() {
        return connectionsPerNode;
    }

    /**
     * Sets local port to accept shared memory connections.
     * <p>
     * If set to {@code -1} shared memory communication will be disabled.
     * <p>
     * If not provided, default value is {@link #DFLT_SHMEM_PORT}.
     *
     * @param shmemPort Port number.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setSharedMemoryPort(int shmemPort) {
        this.shmemPort = shmemPort;

        return this;
    }

    /**
     * See {@link #setSharedMemoryPort(int)}.
     *
     * @return Port number.
     */
    public int getSharedMemoryPort() {
        return shmemPort;
    }

    /**
     * Sets maximum idle connection timeout upon which a connection
     * to client will be closed.
     * <p>
     * If not provided, default value is {@link #DFLT_IDLE_CONN_TIMEOUT}.
     *
     * @param idleConnTimeout Maximum idle connection time.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setIdleConnectionTimeout(long idleConnTimeout) {
        this.idleConnTimeout = idleConnTimeout;

        return this;
    }

    /**
     * See {@link #setIdleConnectionTimeout(long)}.
     *
     * @return Maximum idle connection time.
     */
    public long getIdleConnectionTimeout() {
        return idleConnTimeout;
    }

    /**
     * See {@link #setSocketWriteTimeout(long)}.
     *
     * @return Socket write timeout for TCP connections.
     */
    public long getSocketWriteTimeout() {
        return sockWriteTimeout;
    }

    /**
     * Sets socket write timeout for TCP connection. If message can not be written to
     * socket within this time then connection is closed and reconnect is attempted.
     * <p>
     * Default to {@link #DFLT_SOCK_WRITE_TIMEOUT}.
     *
     * @param sockWriteTimeout Socket write timeout for TCP connection.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setSocketWriteTimeout(long sockWriteTimeout) {
        this.sockWriteTimeout = sockWriteTimeout;

        return this;
    }

    /**
     * See {@link #setAckSendThreshold(int)}.
     *
     * @return Number of received messages after which acknowledgment is sent.
     */
    public int getAckSendThreshold() {
        return ackSndThreshold;
    }

    /**
     * Sets number of received messages per connection to node after which acknowledgment message is sent.
     * <p>
     * Default to {@link #DFLT_ACK_SND_THRESHOLD}.
     *
     * @param ackSndThreshold Number of received messages after which acknowledgment is sent.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setAckSendThreshold(int ackSndThreshold) {
        this.ackSndThreshold = ackSndThreshold;

        return this;
    }

    /**
     * See {@link #setUnacknowledgedMessagesBufferSize(int)}.
     *
     * @return Maximum number of unacknowledged messages.
     */
    public int getUnacknowledgedMessagesBufferSize() {
        return unackedMsgsBufSize;
    }

    /**
     * Sets maximum number of stored unacknowledged messages per connection to node.
     * If number of unacknowledged messages exceeds this number then connection to node is
     * closed and reconnect is attempted.
     *
     * @param unackedMsgsBufSize Maximum number of unacknowledged messages.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setUnacknowledgedMessagesBufferSize(int unackedMsgsBufSize) {
        this.unackedMsgsBufSize = unackedMsgsBufSize;

        return this;
    }

    /**
     * Sets connect timeout used when establishing connection
     * with remote nodes.
     * <p>
     * {@code 0} is interpreted as infinite timeout.
     * <p>
     * If not provided, default value is {@link #DFLT_CONN_TIMEOUT}.
     * <p>
     * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
     *
     * @param connTimeout Connect timeout.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setConnectTimeout(long connTimeout) {
        this.connTimeout = connTimeout;

        failureDetectionTimeoutEnabled(false);

        return this;
    }

    /**
     * See {@link #setConnectTimeout(long)}.
     *
     * @return Connect timeout.
     */
    public long getConnectTimeout() {
        return connTimeout;
    }

    /**
     * Sets maximum connect timeout. If handshake is not established within connect timeout,
     * then SPI tries to repeat handshake procedure with increased connect timeout.
     * Connect timeout can grow till maximum timeout value,
     * if maximum timeout value is reached then the handshake is considered as failed.
     * <p>
     * {@code 0} is interpreted as infinite timeout.
     * <p>
     * If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}.
     * <p>
     * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
     *
     * @param maxConnTimeout Maximum connect timeout.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setMaxConnectTimeout(long maxConnTimeout) {
        this.maxConnTimeout = maxConnTimeout;

        failureDetectionTimeoutEnabled(false);

        return this;
    }

    /**
     * Gets maximum connect timeout.
     *
     * @return Maximum connect timeout.
     */
    public long getMaxConnectTimeout() {
        return maxConnTimeout;
    }

    /**
     * Sets maximum number of reconnect attempts used when establishing connection
     * with remote nodes.
     * <p>
     * If not provided, default value is {@link #DFLT_RECONNECT_CNT}.
     * <p>
     * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
     *
     * @param reconCnt Maximum number of reconnection attempts.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setReconnectCount(int reconCnt) {
        this.reconCnt = reconCnt;

        failureDetectionTimeoutEnabled(false);

        return this;
    }

    /**
     * Gets maximum number of reconnect attempts used when establishing connection
     * with remote nodes.
     *
     * @return Reconnects count.
     */
    public int getReconnectCount() {
        return reconCnt;
    }

    /**
     * Sets flag to allocate direct or heap buffer in SPI.
     * If value is {@code true}, then SPI will use {@link ByteBuffer#allocateDirect(int)} call.
     * Otherwise, SPI will use {@link ByteBuffer#allocate(int)} call.
     * <p>
     * If not provided, default value is {@code true}.
     *
     * @param directBuf Flag indicates to allocate direct or heap buffer in SPI.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setDirectBuffer(boolean directBuf) {
        this.directBuf = directBuf;

        return this;
    }

    /**
     * Gets flag that indicates whether direct or heap allocated buffer is used.
     *
     * @return Flag that indicates whether direct or heap allocated buffer is used.
     */
    public boolean isDirectBuffer() {
        return directBuf;
    }

    /**
     * Gets flag defining whether direct send buffer should be used.
     *
     * @return {@code True} if direct buffers should be used.
     */
    public boolean isDirectSendBuffer() {
        return directSndBuf;
    }

    /**
     * Sets whether to use direct buffer for sending.
     *
     * If not provided default is {@code false}.
     *
     * @param directSndBuf {@code True} to use direct buffers for send.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setDirectSendBuffer(boolean directSndBuf) {
        this.directSndBuf = directSndBuf;

        return this;
    }

    /**
     * Sets the count of selectors te be used in TCP server.
     * <p/>
     * If not provided, default value is {@link #DFLT_SELECTORS_CNT}.
     *
     * @param selectorsCnt Selectors count.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setSelectorsCount(int selectorsCnt) {
        this.selectorsCnt = selectorsCnt;

        return this;
    }

    /**
     * See {@link #setSelectorsCount(int)}.
     *
     * @return Count of selectors in TCP server.
     */
    public int getSelectorsCount() {
        return selectorsCnt;
    }

    /**
     * See {@link #setSelectorSpins(long)}.
     *
     * @return Selector thread busy-loop iterations.
     */
    public long getSelectorSpins() {
        return selectorSpins;
    }

    /**
     * Defines how many non-blocking {@code selector.selectNow()} should be made before
     * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
     * Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
     *
     * @param selectorSpins Selector thread busy-loop iterations.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setSelectorSpins(long selectorSpins) {
        this.selectorSpins = selectorSpins;

        return this;
    }

    /**
     * Sets value for {@code TCP_NODELAY} socket option. Each
     * socket will be opened using provided value.
     * <p>
     * Setting this option to {@code true} disables Nagle's algorithm
     * for socket decreasing latency and delivery time for small messages.
     * <p>
     * For systems that work under heavy network load it is advisable to
     * set this value to {@code false}.
     * <p>
     * If not provided, default value is {@link #DFLT_TCP_NODELAY}.
     *
     * @param tcpNoDelay {@code True} to disable TCP delay.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setTcpNoDelay(boolean tcpNoDelay) {
        this.tcpNoDelay = tcpNoDelay;

        return this;
    }

    /**
     * Gets value for {@code TCP_NODELAY} socket option.
     *
     * @return {@code True} if TCP delay is disabled.
     */
    public boolean isTcpNoDelay() {
        return tcpNoDelay;
    }

    /**
     * Gets value for {@code FILTER_REACHABLE_ADDRESSES} socket option.
     *
     * @return {@code True} if needed to filter reachable addresses.
     */
    public boolean isFilterReachableAddresses() {
        return filterReachableAddresses;
    }

    /**
     * Setting this option to {@code true} enables filter for reachable
     * addresses on creating tcp client.
     * <p>
     * Usually its advised to set this value to {@code false}.
     * <p>
     * If not provided, default value is {@link #DFLT_FILTER_REACHABLE_ADDRESSES}.
     *
     * @param filterReachableAddresses {@code True} to filter reachable addresses.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setFilterReachableAddresses(boolean filterReachableAddresses) {
        this.filterReachableAddresses = filterReachableAddresses;

        return this;
    }

    /**
     * Sets receive buffer size for sockets created or accepted by this SPI.
     * <p>
     * If not provided, default is {@link #DFLT_SOCK_BUF_SIZE}.
     *
     * @param sockRcvBuf Socket receive buffer size.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setSocketReceiveBuffer(int sockRcvBuf) {
        this.sockRcvBuf = sockRcvBuf;

        return this;
    }

    /**
     * See {@link #setSocketReceiveBuffer(int)}.
     *
     * @return Socket receive buffer size.
     */
    public int getSocketReceiveBuffer() {
        return sockRcvBuf;
    }

    /**
     * Sets send buffer size for sockets created or accepted by this SPI.
     * <p>
     * If not provided, default is {@link #DFLT_SOCK_BUF_SIZE}.
     *
     * @param sockSndBuf Socket send buffer size.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setSocketSendBuffer(int sockSndBuf) {
        this.sockSndBuf = sockSndBuf;

        return this;
    }

    /**
     * See {@link #setSocketSendBuffer(int)}.
     *
     * @return Socket send buffer size.
     */
    public int getSocketSendBuffer() {
        return sockSndBuf;
    }

    /**
     * Sets message queue limit for incoming and outgoing messages.
     * <p>
     * When set to positive number send queue is limited to the configured value.
     * {@code 0} disables the size limitations.
     * <p>
     * If not provided, default is {@link #DFLT_MSG_QUEUE_LIMIT}.
     *
     * @param msgQueueLimit Send queue size limit.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setMessageQueueLimit(int msgQueueLimit) {
        this.msgQueueLimit = msgQueueLimit;

        return this;
    }

    /**
     * Gets message queue limit for incoming and outgoing messages.
     *
     * @return Send queue size limit.
     */
    public int getMessageQueueLimit() {
        return msgQueueLimit;
    }

    /**
     * See {@link #setSlowClientQueueLimit(int)}.
     *
     * @return Slow client queue limit.
     */
    public int getSlowClientQueueLimit() {
        return slowClientQueueLimit;
    }

    /**
     * Sets slow client queue limit.
     * <p/>
     * When set to a positive number, communication SPI will monitor clients outbound message queue sizes and will drop
     * those clients whose queue exceeded this limit.
     * <p/>
     * This value should be set to less or equal value than {@link #getMessageQueueLimit()} which controls
     * message back-pressure for server nodes. The default value for this parameter is {@code 0}
     * which means {@code unlimited}.
     *
     * @param slowClientQueueLimit Slow client queue limit.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public TcpCommunicationSpi setSlowClientQueueLimit(int slowClientQueueLimit) {
        this.slowClientQueueLimit = slowClientQueueLimit;

        return this;
    }

    /** {@inheritDoc} */
    @Override public void setListener(CommunicationListener<Message> lsnr) {
        this.lsnr = lsnr;
    }

    /**
     * @return Listener.
     */
    public CommunicationListener getListener() {
        return lsnr;
    }

    /** {@inheritDoc} */
    @Override public int getSentMessagesCount() {
        // Listener could be not initialized yet, but discovery thread could try to aggregate metrics.
        if (metricsLsnr == null)
            return 0;

        return metricsLsnr.sentMessagesCount();
    }

    /** {@inheritDoc} */
    @Override public long getSentBytesCount() {
        // Listener could be not initialized yet, but discovery thread clould try to aggregate metrics.
        if (metricsLsnr == null)
            return 0;

        return metricsLsnr.sentBytesCount();
    }

    /** {@inheritDoc} */
    @Override public int getReceivedMessagesCount() {
        // Listener could be not initialized yet, but discovery thread could try to aggregate metrics.
        if (metricsLsnr == null)
            return 0;

        return metricsLsnr.receivedMessagesCount();
    }

    /** {@inheritDoc} */
    @Override public long getReceivedBytesCount() {
        // Listener could be not initialized yet, but discovery thread could try to aggregate metrics.
        if (metricsLsnr == null)
            return 0;

        return metricsLsnr.receivedBytesCount();
    }

    /**
     * Gets received messages counts (grouped by type).
     *
     * @return Map containing message types and respective counts.
     */
    public Map<String, Long> getReceivedMessagesByType() {
        return metricsLsnr.receivedMessagesByType();
    }

    /**
     * Gets received messages counts (grouped by node).
     *
     * @return Map containing sender nodes and respective counts.
     */
    public Map<UUID, Long> getReceivedMessagesByNode() {
        return metricsLsnr.receivedMessagesByNode();
    }

    /**
     * Gets sent messages counts (grouped by type).
     *
     * @return Map containing message types and respective counts.
     */
    public Map<String, Long> getSentMessagesByType() {
        return metricsLsnr.sentMessagesByType();
    }

    /**
     * Gets sent messages counts (grouped by node).
     *
     * @return Map containing receiver nodes and respective counts.
     */
    public Map<UUID, Long> getSentMessagesByNode() {
        return metricsLsnr.sentMessagesByNode();
    }

    /** {@inheritDoc} */
    @Override public int getOutboundMessagesQueueSize() {
        GridNioServer<Message> srv = nioSrvr;

        return srv != null ? srv.outboundMessagesQueueSize() : 0;
    }

    /** {@inheritDoc} */
    @Override public void resetMetrics() {
        metricsLsnr.resetMetrics();
    }

    /**
     * @param nodeId Target node ID.
     * @return Future.
     */
    public IgniteInternalFuture<String> dumpNodeStatistics(final UUID nodeId) {
        StringBuilder sb = new StringBuilder("Communication SPI statistics [rmtNode=").append(nodeId).append(']').append(U.nl());

        dumpInfo(sb, nodeId);

        GridNioServer<Message> nioSrvr = this.nioSrvr;

        if (nioSrvr != null) {
            sb.append("NIO sessions statistics:");

            IgnitePredicate<GridNioSession> p = new IgnitePredicate<GridNioSession>() {
                @Override public boolean apply(GridNioSession ses) {
                    ConnectionKey connId = ses.meta(CONN_IDX_META);

                    return connId != null && nodeId.equals(connId.nodeId());
                }
            };

            return nioSrvr.dumpStats(sb.toString(), p);
        }
        else {
            sb.append(U.nl()).append("GridNioServer is null.");

            return new GridFinishedFuture<>(sb.toString());
        }
    }

    /**
     * Dumps SPI per-connection stats to logs.
     */
    public void dumpStats() {
        final IgniteLogger log = this.diagnosticLog;

        if (log != null) {
            StringBuilder sb = new StringBuilder();

            dumpInfo(sb, null);

            U.warn(log, sb.toString());

            GridNioServer<Message> nioSrvr = this.nioSrvr;

            if (nioSrvr != null) {
                nioSrvr.dumpStats().listen(new CI1<IgniteInternalFuture<String>>() {
                    @Override public void apply(IgniteInternalFuture<String> fut) {
                        try {
                            U.warn(log, fut.get());
                        }
                        catch (Exception e) {
                            U.error(log, "Failed to dump NIO server statistics: " + e, e);
                        }
                    }
                });
            }
        }
    }

    /**
     * @param sb Message builder.
     * @param dstNodeId Target node ID.
     */
    private void dumpInfo(StringBuilder sb, UUID dstNodeId) {
        sb.append("Communication SPI recovery descriptors: ").append(U.nl());

        for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
            GridNioRecoveryDescriptor desc = entry.getValue();

            if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId()))
                continue;

            sb.append("    [key=").append(entry.getKey())
                .append(", msgsSent=").append(desc.sent())
                .append(", msgsAckedByRmt=").append(desc.acked())
                .append(", msgsRcvd=").append(desc.received())
                .append(", lastAcked=").append(desc.lastAcknowledged())
                .append(", reserveCnt=").append(desc.reserveCount())
                .append(", descIdHash=").append(System.identityHashCode(desc))
                .append(']').append(U.nl());
        }

        for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) {
            GridNioRecoveryDescriptor desc = entry.getValue();

            if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId()))
                continue;

            sb.append("    [key=").append(entry.getKey())
                .append(", msgsSent=").append(desc.sent())
                .append(", msgsAckedByRmt=").append(desc.acked())
                .append(", reserveCnt=").append(desc.reserveCount())
                .append(", connected=").append(desc.connected())
                .append(", reserved=").append(desc.reserved())
                .append(", descIdHash=").append(System.identityHashCode(desc))
                .append(']').append(U.nl());
        }

        for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) {
            GridNioRecoveryDescriptor desc = entry.getValue();

            if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId()))
                continue;

            sb.append("    [key=").append(entry.getKey())
                .append(", msgsRcvd=").append(desc.received())
                .append(", lastAcked=").append(desc.lastAcknowledged())
                .append(", reserveCnt=").append(desc.reserveCount())
                .append(", connected=").append(desc.connected())
                .append(", reserved=").append(desc.reserved())
                .append(", handshakeIdx=").append(desc.handshakeIndex())
                .append(", descIdHash=").append(System.identityHashCode(desc))
                .append(']').append(U.nl());
        }

        sb.append("Communication SPI clients: ").append(U.nl());

        for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet()) {
            UUID clientNodeId = entry.getKey();

            if (dstNodeId != null && !dstNodeId.equals(clientNodeId))
                continue;

            GridCommunicationClient[] clients0 = entry.getValue();

            for (GridCommunicationClient client : clients0) {
                if (client != null) {
                    sb.append("    [node=").append(clientNodeId)
                        .append(", client=").append(client)
                        .append(']').append(U.nl());
                }
            }
        }
    }

    /** {@inheritDoc} */
    @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
        initFailureDetectionTimeout();

        assertParameter(locPort > 1023, "locPort > 1023");
        assertParameter(locPort <= 0xffff, "locPort < 0xffff");
        assertParameter(locPortRange >= 0, "locPortRange >= 0");
        assertParameter(idleConnTimeout > 0, "idleConnTimeout > 0");
        assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0");
        assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
        assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
        assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
        assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
        assertParameter(connectionsPerNode > 0, "connectionsPerNode > 0");
        assertParameter(connectionsPerNode <= MAX_CONN_PER_NODE, "connectionsPerNode <= 1024");

        if (!failureDetectionTimeoutEnabled()) {
            assertParameter(reconCnt > 0, "reconnectCnt > 0");
            assertParameter(connTimeout >= 0, "connTimeout >= 0");
            assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
        }

        assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0");
        assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0");
        assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0");

        if (unackedMsgsBufSize > 0) {
            assertParameter(unackedMsgsBufSize >= msgQueueLimit * 5,
                "Specified 'unackedMsgsBufSize' is too low, it should be at least 'msgQueueLimit * 5'.");

            assertParameter(unackedMsgsBufSize >= ackSndThreshold * 5,
                "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
        }

        if (connectionsPerNode > 1)
            connPlc = new RoundRobinConnectionPolicy();
        else
            connPlc = new FirstConnectionPolicy();

        chConnPlc = new ConnectionPolicy() {
            /** Sequential connection index provider. */
            private final AtomicInteger chIdx = new AtomicInteger(MAX_CONN_PER_NODE + 1);

            @Override public int connectionIndex() {
                return chIdx.incrementAndGet();
            }
        };

        try {
            locHost = U.resolveLocalHost(locAddr);
        }
        catch (IOException e) {
            throw new IgniteSpiException("Failed to initialize local address: " + locAddr, e);
        }

        try {
            shmemSrv = resetShmemServer();
        }
        catch (IgniteCheckedException e) {
            U.warn(log, "Failed to start shared memory communication server.", e);
        }

        try {
            // This method potentially resets local port to the value
            // local node was bound to.
            nioSrvr = resetNioServer();
        }
        catch (IgniteCheckedException e) {
            throw new IgniteSpiException("Failed to initialize TCP server: " + locHost, e);
        }

        // Set local node attributes.
        try {
            IgniteBiTuple<Collection<String>, Collection<String>> addrs = U.resolveLocalAddresses(locHost);

            Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null :
                U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())), boundTcpPort);

            Map<String, Object> res = new HashMap<>(5);

            boolean setEmptyHostNamesAttr = !getBoolean(IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES, false) &&
                (!F.isEmpty(locAddr) && locHost.getHostAddress().equals(locAddr)) && !locHost.isAnyLocalAddress() &&
                !locHost.isLoopbackAddress();

            res.put(createSpiAttributeName(ATTR_ADDRS), addrs.get1());
            res.put(createSpiAttributeName(ATTR_HOST_NAMES), setEmptyHostNamesAttr ? emptyList() : addrs.get2());
            res.put(createSpiAttributeName(ATTR_PORT), boundTcpPort);
            res.put(createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort : null);
            res.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
            res.put(createSpiAttributeName(ATTR_PAIRED_CONN), usePairedConnections);

            return res;
        }
        catch (IOException | IgniteCheckedException e) {
            throw new IgniteSpiException("Failed to resolve local host to addresses: " + locHost, e);
        }
    }

    /**
     * @return Bound TCP server port.
     */
    public int boundPort() {
        return boundTcpPort;
    }

    /** {@inheritDoc} */
    @Override public void spiStart(String igniteInstanceName) throws IgniteSpiException {
        assert locHost != null;

        // Start SPI start stopwatch.
        startStopwatch();

        // Ack parameters.
        if (log.isDebugEnabled()) {
            log.debug(configInfo("locAddr", locAddr));
            log.debug(configInfo("locPort", locPort));
            log.debug(configInfo("locPortRange", locPortRange));
            log.debug(configInfo("idleConnTimeout", idleConnTimeout));
            log.debug(configInfo("directBuf", directBuf));
            log.debug(configInfo("directSendBuf", directSndBuf));
            log.debug(configInfo("selectorsCnt", selectorsCnt));
            log.debug(configInfo("tcpNoDelay", tcpNoDelay));
            log.debug(configInfo("sockSndBuf", sockSndBuf));
            log.debug(configInfo("sockRcvBuf", sockRcvBuf));
            log.debug(configInfo("shmemPort", shmemPort));
            log.debug(configInfo("msgQueueLimit", msgQueueLimit));
            log.debug(configInfo("connectionsPerNode", connectionsPerNode));

            if (failureDetectionTimeoutEnabled()) {
                log.debug(configInfo("connTimeout", connTimeout));
                log.debug(configInfo("maxConnTimeout", maxConnTimeout));
                log.debug(configInfo("reconCnt", reconCnt));
            }
            else
                log.debug(configInfo("failureDetectionTimeout", failureDetectionTimeout()));

            log.debug(configInfo("sockWriteTimeout", sockWriteTimeout));
            log.debug(configInfo("ackSndThreshold", ackSndThreshold));
            log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
        }

        if (!tcpNoDelay)
            U.quietAndWarn(log, "'TCP_NO_DELAY' for communication is off, which should be used with caution " +
                "since may produce significant delays with some scenarios.");

        if (slowClientQueueLimit > 0 && msgQueueLimit > 0 && slowClientQueueLimit >= msgQueueLimit) {
            U.quietAndWarn(log, "Slow client queue limit is set to a value greater than or equal to message " +
                "queue limit (slow client queue limit will have no effect) [msgQueueLimit=" + msgQueueLimit +
                ", slowClientQueueLimit=" + slowClientQueueLimit + ']');
        }

        if (msgQueueLimit == 0)
            U.quietAndWarn(log, "Message queue limit is set to 0 which may lead to " +
                "potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes " +
                "due to message queues growth on sender and receiver sides.");

        registerMBean(igniteInstanceName, new TcpCommunicationSpiMBeanImpl(this), TcpCommunicationSpiMBean.class);

        connectGate = new ConnectGateway();

        if (shmemSrv != null) {
            shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv);

            new IgniteThread(shmemAcceptWorker).start();
        }

        nioSrvr.start();

        commWorker = new CommunicationWorker(igniteInstanceName, log);

        new IgniteSpiThread(igniteInstanceName, commWorker.name(), log) {
            @Override protected void body() {
                commWorker.run();
            }
        }.start();

        // Ack start.
        if (log.isDebugEnabled())
            log.debug(startInfo());
    }

    /** {@inheritDoc} } */
    @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
        spiCtx.registerPort(boundTcpPort, IgnitePortProtocol.TCP);

        // SPI can start without shmem port.
        if (boundTcpShmemPort > 0)
            spiCtx.registerPort(boundTcpShmemPort, IgnitePortProtocol.TCP);

        spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);

        ctxInitLatch.countDown();

        metricsLsnr = new TcpCommunicationMetricsListener(ignite, spiCtx);
    }

    /** {@inheritDoc} */
    @Override public IgniteSpiContext getSpiContext() {
        if (ctxInitLatch.getCount() > 0) {
            if (log.isDebugEnabled())
                log.debug("Waiting for context initialization.");

            try {
                U.await(ctxInitLatch);

                if (log.isDebugEnabled())
                    log.debug("Context has been initialized.");
            }
            catch (IgniteInterruptedCheckedException e) {
                U.warn(log, "Thread has been interrupted while waiting for SPI context initialization.", e);
            }
        }

        return super.getSpiContext();
    }

    /**
     * Recreates tpcSrvr socket instance.
     *
     * @return Server instance.
     * @throws IgniteCheckedException Thrown if it's not possible to create server.
     */
    private GridNioServer<Message> resetNioServer() throws IgniteCheckedException {
        if (boundTcpPort >= 0)
            throw new IgniteCheckedException("Tcp NIO server was already created on port " + boundTcpPort);

        IgniteCheckedException lastEx = null;

        // If configured TCP port is busy, find first available in range.
        int lastPort = locPortRange == 0 ? locPort : locPort + locPortRange - 1;

        for (int port = locPort; port <= lastPort; port++) {
            try {
                MessageFactory msgFactory = new MessageFactory() {
                    private MessageFactory impl;

                    @Nullable @Override public Message create(short type) {
                        if (impl == null)
                            impl = getSpiContext().messageFactory();

                        assert impl != null;

                        return impl.create(type);
                    }
                };

                GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory() {
                    private IgniteSpiContext context;

                    private MessageFormatter formatter;

                    @Override public MessageReader reader(GridNioSession ses, MessageFactory msgFactory)
                        throws IgniteCheckedException {
                        final IgniteSpiContext ctx = TcpCommunicationSpi.super.getSpiContext();

                        if (formatter == null || context != ctx) {
                            context = ctx;

                            formatter = context.messageFormatter();
                        }

                        assert formatter != null;

                        ConnectionKey key = ses.meta(CONN_IDX_META);

                        return key != null ? formatter.reader(key.nodeId(), msgFactory) : null;
                    }
                };

                GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory() {
                    private IgniteSpiContext context;

                    private MessageFormatter formatter;

                    @Override public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException {
                        final IgniteSpiContext ctx = TcpCommunicationSpi.super.getSpiContext();

                        if (formatter == null || context != ctx) {
                            context = ctx;

                            formatter = context.messageFormatter();
                        }

                        assert formatter != null;

                        ConnectionKey key = ses.meta(CONN_IDX_META);

                        return key != null ? formatter.writer(key.nodeId()) : null;
                    }
                };

                GridDirectParser parser = new GridDirectParser(log.getLogger(GridDirectParser.class),
                    msgFactory,
                    readerFactory);

                IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() {
                    @Override public boolean apply(Message msg) {
                        return msg instanceof RecoveryLastReceivedMessage;
                    }
                };

                boolean clientMode = Boolean.TRUE.equals(ignite.configuration().isClientMode());

                IgniteBiInClosure<GridNioSession, Integer> queueSizeMonitor =
                    !clientMode && slowClientQueueLimit > 0 ?
                        new CI2<GridNioSession, Integer>() {
                            @Override public void apply(GridNioSession ses, Integer qSize) {
                                checkClientQueueSize(ses, qSize);
                            }
                        } :
                        null;

                GridNioFilter[] filters;

                if (isSslEnabled()) {
                    GridNioSslFilter sslFilter =
                        new GridNioSslFilter(ignite.configuration().getSslContextFactory().create(),
                            true, ByteOrder.LITTLE_ENDIAN, log);

                    sslFilter.directMode(true);

                    sslFilter.wantClientAuth(true);
                    sslFilter.needClientAuth(true);

                    filters = new GridNioFilter[] {
                        new GridNioTracerFilter(log, tracing),
                        new GridNioCodecFilter(parser, log, true),
                        new GridConnectionBytesVerifyFilter(log),
                        sslFilter
                    };
                }
                else
                    filters = new GridNioFilter[] {
                        new GridNioTracerFilter(log, tracing),
                        new GridNioCodecFilter(parser, log, true),
                        new GridConnectionBytesVerifyFilter(log)
                    };

                GridNioServer.Builder<Message> builder = GridNioServer.<Message>builder()
                    .address(locHost)
                    .port(port)
                    .listener(srvLsnr)
                    .logger(log)
                    .selectorCount(selectorsCnt)
                    .igniteInstanceName(igniteInstanceName)
                    .serverName("tcp-comm")
                    .tcpNoDelay(tcpNoDelay)
                    .directBuffer(directBuf)
                    .byteOrder(ByteOrder.LITTLE_ENDIAN)
                    .socketSendBufferSize(sockSndBuf)
                    .socketReceiveBufferSize(sockRcvBuf)
                    .sendQueueLimit(msgQueueLimit)
                    .directMode(true)
                    .writeTimeout(sockWriteTimeout)
                    .selectorSpins(selectorSpins)
                    .filters(filters)
                    .writerFactory(writerFactory)
                    .skipRecoveryPredicate(skipRecoveryPred)
                    .messageQueueSizeListener(queueSizeMonitor)
                    .tracing(tracing)
                    .readWriteSelectorsAssign(usePairedConnections);

                if (ignite instanceof IgniteEx) {
                    IgniteEx igniteEx = (IgniteEx)ignite;

                    builder.workerListener(igniteEx.context().workersRegistry())
                        .metricRegistry(igniteEx.context().metric().registry(COMMUNICATION_METRICS_GROUP_NAME));
                }

                GridNioServer<Message> srvr = builder.build();

                boundTcpPort = port;

                // Ack Port the TCP server was bound to.
                if (log.isInfoEnabled()) {
                    log.info("Successfully bound communication NIO server to TCP port " +
                        "[port=" + boundTcpPort +
                        ", locHost=" + locHost +
                        ", selectorsCnt=" + selectorsCnt +
                        ", selectorSpins=" + srvr.selectorSpins() +
                        ", pairedConn=" + usePairedConnections + ']');
                }

                srvr.idleTimeout(idleConnTimeout);

                return srvr;
            }
            catch (IgniteCheckedException e) {
                if (X.hasCause(e, SSLException.class))
                    throw new IgniteSpiException("Failed to create SSL context. SSL factory: "
                        + ignite.configuration().getSslContextFactory() + '.', e);

                lastEx = e;

                if (log.isDebugEnabled())
                    log.debug("Failed to bind to local port (will try next port within range) [port=" + port +
                        ", locHost=" + locHost + ']');

                onException("Failed to bind to local port (will try next port within range) [port=" + port +
                    ", locHost=" + locHost + ']', e);
            }
        }

        // If free port wasn't found.
        throw new IgniteCheckedException("Failed to bind to any port within range [startPort=" + locPort +
            ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
    }

    /**
     * Creates new shared memory communication server.
     *
     * @return Server.
     * @throws IgniteCheckedException If failed.
     */
    @Nullable private IpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException {
        if (boundTcpShmemPort >= 0)
            throw new IgniteCheckedException("Shared memory server was already created on port " + boundTcpShmemPort);

        if (shmemPort == -1 || U.isWindows())
            return null;

        IgniteCheckedException lastEx = null;

        // If configured TCP port is busy, find first available in range.
        for (int port = shmemPort; port < shmemPort + locPortRange; port++) {
            try {
                IgniteConfiguration cfg = ignite.configuration();

                IpcSharedMemoryServerEndpoint srv =
                    new IpcSharedMemoryServerEndpoint(log, cfg.getNodeId(), igniteInstanceName, cfg.getWorkDirectory());

                srv.setPort(port);

                srv.omitOutOfResourcesWarning(true);

                srv.start();

                boundTcpShmemPort = port;

                // Ack Port the TCP server was bound to.
                if (log.isInfoEnabled())
                    log.info("Successfully bound shared memory communication to TCP port [port=" + boundTcpShmemPort +
                        ", locHost=" + locHost + ']');

                return srv;
            }
            catch (IgniteCheckedException e) {
                lastEx = e;

                if (log.isDebugEnabled())
                    log.debug("Failed to bind to local port (will try next port within range) [port=" + port +
                        ", locHost=" + locHost + ']');
            }
        }

        // If free port wasn't found.
        throw new IgniteCheckedException("Failed to bind shared memory communication to any port within range [startPort=" +
            locPort + ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
    }

    /** {@inheritDoc} */
    @Override public void spiStop() throws IgniteSpiException {
        assert stopping;

        unregisterMBean();

        // Stop TCP server.
        if (nioSrvr != null)
            nioSrvr.stop();

        U.cancel(commWorker);
        U.join(commWorker, log);

        U.cancel(shmemAcceptWorker);
        U.join(shmemAcceptWorker, log);

        U.cancel(shmemWorkers);
        U.join(shmemWorkers, log);

        shmemWorkers.clear();

        // Force closing on stop (safety).
        for (GridCommunicationClient[] clients0 : clients.values()) {
            for (GridCommunicationClient client : clients0) {
                if (client != null)
                    client.forceClose();
            }
        }

        // Clear resources.
        nioSrvr = null;
        commWorker = null;

        boundTcpPort = -1;

        // Ack stop.
        if (log.isDebugEnabled())
            log.debug(stopInfo());
    }

    /** {@inheritDoc} */
    @Override protected void onContextDestroyed0() {
        stopping = true;

        if (ctxInitLatch.getCount() > 0)
            // Safety.
            ctxInitLatch.countDown();

        if (connectGate != null)
            connectGate.stopped();

        getSpiContext().deregisterPorts();

        getSpiContext().removeLocalEventListener(discoLsnr);
    }

    /** {@inheritDoc} */
    @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
        connectGate.disconnected(reconnectFut);

        for (GridCommunicationClient[] clients0 : clients.values()) {
            for (GridCommunicationClient client : clients0) {
                if (client != null)
                    client.forceClose();
            }
        }

        IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
            "Failed to connect client node disconnected.");

        for (GridFutureAdapter<GridCommunicationClient> clientFut : clientFuts.values())
            clientFut.onDone(err);

        recoveryDescs.clear();
        inRecDescs.clear();
        outRecDescs.clear();
    }

    /** {@inheritDoc} */
    @Override public void onClientReconnected(boolean clusterRestarted) {
        connectGate.reconnected();
    }

    /**
     * @param consistentId Consistent id of the node.
     * @param nodeId Left node ID.
     */
    void onNodeLeft(Object consistentId, UUID nodeId) {
        assert nodeId != null;

        metricsLsnr.onNodeLeft(consistentId);

        GridCommunicationClient[] clients0 = clients.remove(nodeId);

        if (clients0 != null) {
            for (GridCommunicationClient client : clients0) {
                if (client != null) {
                    if (log.isDebugEnabled())
                        log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId +
                            ", client=" + client + ']');

                    client.forceClose();
                }
            }
        }
    }

    /** {@inheritDoc} */
    @Override protected void checkConfigurationConsistency0(IgniteSpiContext spiCtx, ClusterNode node, boolean starting)
        throws IgniteSpiException {
        // These attributes are set on node startup in any case, so we MUST receive them.
        checkAttributePresence(node, createSpiAttributeName(ATTR_ADDRS));
        checkAttributePresence(node, createSpiAttributeName(ATTR_HOST_NAMES));
        checkAttributePresence(node, createSpiAttributeName(ATTR_PORT));
    }

    /**
     * Checks that node has specified attribute and prints warning if it does not.
     *
     * @param node Node to check.
     * @param attrName Name of the attribute.
     */
    private void checkAttributePresence(ClusterNode node, String attrName) {
        if (node.attribute(attrName) == null)
            U.warn(log, "Remote node has inconsistent configuration (required attribute was not found) " +
                "[attrName=" + attrName + ", nodeId=" + node.id() +
                "spiCls=" + U.getSimpleName(TcpCommunicationSpi.class) + ']');
    }

    /** {@inheritDoc} */
    @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
        sendMessage0(node, msg, null);
    }

    /**
     * @param nodes Nodes to check connection with.
     * @return Result future (each bit in result BitSet contains connection status to corresponding node).
     */
    public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) {
        TcpCommunicationConnectionCheckFuture fut = new TcpCommunicationConnectionCheckFuture(
            this,
            log.getLogger(TcpCommunicationConnectionCheckFuture.class),
            nioSrvr,
            nodes);

        long timeout = failureDetectionTimeoutEnabled() ? failureDetectionTimeout() : connTimeout;

        if (log.isInfoEnabled())
            log.info("Start check connection process [nodeCnt=" + nodes.size() + ", timeout=" + timeout + ']');

        fut.init(timeout);

        return new IgniteFutureImpl<>(fut);
    }

    /**
     * Sends given message to destination node. Note that characteristics of the
     * exchange such as durability, guaranteed delivery or error notification is
     * dependant on SPI implementation.
     *
     * @param node Destination node.
     * @param msg Message to send.
     * @param ackC Ack closure.
     * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message.
     *      Note that this is not guaranteed that failed communication will result
     *      in thrown exception as this is dependant on SPI implementation.
     */
    public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
        throws IgniteSpiException {
        sendMessage0(node, msg, ackC);
    }

    /**
     * @param node Destination node.
     * @param msg Message to send.
     * @param ackC Ack closure.
     * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message.
     *      Note that this is not guaranteed that failed communication will result
     *      in thrown exception as this is dependant on SPI implementation.
     */
    private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
        throws IgniteSpiException {
        assert node != null;
        assert msg != null;

        if (log.isTraceEnabled())
            log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']');

        if (isLocalNodeDisconnected()) {
            throw new IgniteSpiException("Failed to send a message to remote node because local node has " +
                "been disconnected [rmtNodeId=" + node.id() + ']');
        }

        ClusterNode locNode = getLocalNode();

        if (locNode == null)
            throw new IgniteSpiException("Local node has not been started or fully initialized " +
                "[isStopping=" + getSpiContext().isStopping() + ']');

        if (node.id().equals(locNode.id()))
            notifyListener(node.id(), msg, NOOP);
        else {
            GridCommunicationClient client = null;

            int connIdx = connPlc.connectionIndex();

            try {
                boolean retry;

                do {
                    client = reserveClient(node, connIdx);

                    UUID nodeId = null;

                    if (!client.async())
                        nodeId = node.id();

                    retry = client.sendMessage(nodeId, msg, ackC);

                    client.release();

                    if (retry) {
                        removeNodeClient(node.id(), client);

                        ClusterNode node0 = getSpiContext().node(node.id());

                        if (node0 == null)
                            throw new ClusterTopologyCheckedException("Failed to send message to remote node " +
                                "(node has left the grid): " + node.id());
                    }

                    client = null;
                }
                while (retry);
            }
            catch (Throwable t) {
                if (stopping)
                    throw new IgniteSpiException("Node is stopping.", t);

                log.error("Failed to send message to remote node [node=" + node + ", msg=" + msg + ']', t);

                if (t instanceof Error)
                    throw (Error)t;

                if (t instanceof RuntimeException)
                    throw (RuntimeException)t;

                throw new IgniteSpiException("Failed to send message to remote node: " + node, t);
            }
            finally {
                if (client != null && removeNodeClient(node.id(), client))
                    client.forceClose();
            }
        }
    }

    /**
     * @return {@code True} if local node in disconnected state.
     */
    private boolean isLocalNodeDisconnected() {
        boolean disconnected = false;

        if (ignite instanceof IgniteKernal)
            disconnected = ((IgniteKernal)ignite).context().clientDisconnected();

        return disconnected;
    }

    /**
     * @param nodeId Node ID.
     * @param rmvClient Client to remove.
     * @return {@code True} if client was removed.
     */
    private boolean removeNodeClient(UUID nodeId, GridCommunicationClient rmvClient) {
        for (;;) {
            GridCommunicationClient[] curClients = clients.get(nodeId);

            if (curClients == null || rmvClient.connectionIndex() >= curClients.length || curClients[rmvClient.connectionIndex()] != rmvClient)
                return false;

            GridCommunicationClient[] newClients = Arrays.copyOf(curClients, curClients.length);

            newClients[rmvClient.connectionIndex()] = null;

            if (clients.replace(nodeId, curClients, newClients))
                return true;
        }
    }

    /**
     * @param node Node.
     * @param connIdx Connection index.
     * @param addClient Client to add.
     */
    private void addNodeClient(ClusterNode node, int connIdx, GridCommunicationClient addClient) {
        assert connectionsPerNode > 0 : connectionsPerNode;
        assert connIdx == addClient.connectionIndex() : addClient;

        if (connIdx >= connectionsPerNode) {
            assert !usePairedConnections(node);

            return;
        }

        for (;;) {
            GridCommunicationClient[] curClients = clients.get(node.id());

            assert curClients == null || curClients[connIdx] == null : "Client already created [node=" + node.id() +
                ", connIdx=" + connIdx +
                ", client=" + addClient +
                ", oldClient=" + curClients[connIdx] + ']';

            GridCommunicationClient[] newClients;

            if (curClients == null) {
                newClients = new GridCommunicationClient[connectionsPerNode];
                newClients[connIdx] = addClient;

                if (clients.putIfAbsent(node.id(), newClients) == null)
                    break;
            }
            else {
                newClients = Arrays.copyOf(curClients, curClients.length);
                newClients[connIdx] = addClient;

                if (clients.replace(node.id(), curClients, newClients))
                    break;
            }
        }
    }

    /**
     * Returns existing or just created client to node.
     *
     * @param node Node to which client should be open.
     * @param connIdx Connection index.
     * @return The existing or just created client.
     * @throws IgniteCheckedException Thrown if any exception occurs.
     */
    private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
        assert node != null;
        assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx;

        UUID nodeId = node.id();

        while (true) {
            GridCommunicationClient[] curClients = clients.get(nodeId);

            GridCommunicationClient client = curClients != null && connIdx < curClients.length ?
                curClients[connIdx] : null;

            if (client == null) {
                if (stopping)
                    throw new IgniteSpiException("Node is stopping.");

                // Do not allow concurrent connects.
                GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();

                ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);

                GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);

                if (oldFut == null) {
                    try {
                        GridCommunicationClient[] curClients0 = clients.get(nodeId);

                        GridCommunicationClient client0 = curClients0 != null && connIdx < curClients0.length ?
                            curClients0[connIdx] : null;

                        if (client0 == null) {
                            client0 = createCommunicationClient(node, connIdx);

                            if (client0 != null) {
                                addNodeClient(node, connIdx, client0);

                                if (client0 instanceof GridTcpNioCommunicationClient) {
                                    GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);

                                    if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) {
                                        if (log.isDebugEnabled())
                                            log.debug("Session was closed after client creation, will retry " +
                                                "[node=" + node + ", client=" + client0 + ']');

                                        client0 = null;
                                    }
                                }
                            }
                            else {
                                U.sleep(200);

                                if (getSpiContext().node(node.id()) == null)
                                    throw new ClusterTopologyCheckedException("Failed to send message " +
                                        "(node left topology): " + node);
                            }
                        }

                        fut.onDone(client0);
                    }
                    catch (Throwable e) {
                        fut.onDone(e);

                        if (e instanceof IgniteTooManyOpenFilesException)
                            throw e;

                        if (e instanceof Error)
                            throw (Error)e;
                    }
                    finally {
                        clientFuts.remove(connKey, fut);
                    }
                }
                else
                    fut = oldFut;

                client = fut.get();

                if (client == null) {
                    if (isLocalNodeDisconnected())
                        throw new IgniteCheckedException("Unable to create TCP client due to local node disconnecting.");
                    else
                        continue;
                }

                if (getSpiContext().node(nodeId) == null) {
                    if (removeNodeClient(nodeId, client))
                        client.forceClose();

                    throw new IgniteSpiException("Destination node is not in topology: " + node.id());
                }
            }

            assert connIdx == client.connectionIndex() : client;

            if (client.reserve())
                return client;
            else
                // Client has just been closed by idle worker. Help it and try again.
                removeNodeClient(nodeId, client);
        }
    }

    /**
     * @param node Node to create client for.
     * @param connIdx Connection index.
     * @return Client.
     * @throws IgniteCheckedException If failed.
     */
    @Nullable private GridCommunicationClient createCommunicationClient(ClusterNode node, int connIdx)
        throws IgniteCheckedException {
        assert node != null;

        Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT));

        ClusterNode locNode = getSpiContext().localNode();

        if (locNode == null)
            throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)");

        if (log.isDebugEnabled())
            log.debug("Creating NIO client to node: " + node);

        // If remote node has shared memory server enabled and has the same set of MACs
        // then we are likely to run on the same host and shared memory communication could be tried.
        if (shmemPort != null && U.sameMacs(locNode, node)) {
            try {
                // https://issues.apache.org/jira/browse/IGNITE-11126 Rework failure detection logic.
                GridCommunicationClient client = createShmemClient(
                    node,
                    connIdx,
                    shmemPort);

                if (log.isDebugEnabled())
                    log.debug("Shmem client created: " + client);

                return client;
            }
            catch (IgniteCheckedException e) {
                if (e.hasCause(IpcOutOfSystemResourcesException.class))
                    // Has cause or is itself the IpcOutOfSystemResourcesException.
                    LT.warn(log, OUT_OF_RESOURCES_TCP_MSG);
                else if (getSpiContext().node(node.id()) != null)
                    LT.warn(log, e.getMessage());
                else if (log.isDebugEnabled())
                    log.debug("Failed to establish shared memory connection with local node (node has left): " +
                        node.id());
            }
        }

        final long start = System.currentTimeMillis();

        GridCommunicationClient client = createTcpClient(node, connIdx);

        final long time = System.currentTimeMillis() - start;

            if (time > CONNECTION_ESTABLISH_THRESHOLD_MS) {
                if (log.isInfoEnabled())
                    log.info("TCP client created [client=" + clientString(client, node) + ", duration=" + time + "ms]");
            }
            else if (log.isDebugEnabled())
                log.debug("TCP client created [client=" + clientString(client, node) + ", duration=" + time + "ms]");

        return client;
    }

    /**
     * Returns the string representation of client with protection from null client value. If the client if null,
     * string representation is built from cluster node.
     *
     * @param client communication client
     * @param node cluster node to which the client tried to establish a connection
     * @return string representation of client
     * @throws IgniteCheckedException if failed
     */
    private String clientString(GridCommunicationClient client, ClusterNode node) throws IgniteCheckedException {
        if (client == null) {
            assert node != null;

            StringJoiner joiner = new StringJoiner(", ");

            for (InetSocketAddress addr : nodeAddresses(node))
                joiner.add(addr.toString());

            return "null, node addrs=[" + joiner.toString() + "]";
        }
        else
            return client.toString();
    }

    /**
     * @param node Node.
     * @param port Port.
     * @param connIdx Connection index.
     * @return Client.
     * @throws IgniteCheckedException If failed.
     */
    @Nullable private GridCommunicationClient createShmemClient(ClusterNode node,
        int connIdx,
        Integer port) throws IgniteCheckedException {
        int attempt = 1;

        int connectAttempts = 1;

        long connTimeout0 = connTimeout;

        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this,
            !node.isClient());

        while (true) {
            GridCommunicationClient client;

            try {
                client = new GridShmemCommunicationClient(
                    connIdx,
                    metricsLsnr.metricRegistry(),
                    port,
                    timeoutHelper.nextTimeoutChunk(connTimeout),
                    log,
                    getSpiContext().messageFormatter());
            }
            catch (IgniteCheckedException e) {
                if (timeoutHelper.checkFailureTimeoutReached(e))
                    throw e;

                // Reconnect for the second time, if connection is not established.
                if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) {
                    connectAttempts++;

                    continue;
                }

                throw e;
            }

            try {
                safeShmemHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
            }
            catch (IgniteSpiOperationTimeoutException e) {
                client.forceClose();

                if (failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e)) {
                    if (log.isDebugEnabled())
                        log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
                            failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');

                    throw e;
                }

                assert !failureDetectionTimeoutEnabled();

                if (log.isDebugEnabled())
                    log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
                        ", err=" + e.getMessage() + ", client=" + client + ']');

                if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
                    if (log.isDebugEnabled())
                        log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
                            "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
                            ", attempt=" + attempt + ", reconCnt=" + reconCnt +
                            ", err=" + e.getMessage() + ", client=" + client + ']');

                    throw e;
                }
                else {
                    attempt++;

                    connTimeout0 *= 2;

                    continue;
                }
            }
            catch (IgniteCheckedException | RuntimeException | Error e) {
                if (log.isDebugEnabled())
                    log.debug(
                        "Caught exception (will close client) [err=" + e.getMessage() + ", client=" + client + ']');

                client.forceClose();

                throw e;
            }

            return client;
        }
    }

    /**
     * Checks client message queue size and initiates client drop if message queue size exceeds the configured limit.
     *
     * @param ses Node communication session.
     * @param msgQueueSize Message queue size.
     */
    private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) {
        if (slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit) {
            ConnectionKey id = ses.meta(CONN_IDX_META);

            if (id != null) {
                ClusterNode node = getSpiContext().node(id.nodeId());

                if (node != null && node.isClient()) {
                    String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " +
                        "the client will be dropped " +
                        "(consider changing 'slowClientQueueLimit' configuration property) " +
                        "[srvNode=" + getSpiContext().localNode().id() +
                        ", clientNode=" + node +
                        ", slowClientQueueLimit=" + slowClientQueueLimit + ']';

                    U.quietAndWarn(log, msg);

                    getSpiContext().failNode(id.nodeId(), msg);
                }
            }
        }
    }

    /**
     * @param node Node.
     * @return Node addresses.
     * @throws IgniteCheckedException If failed.
     */
    private Collection<InetSocketAddress> nodeAddresses(ClusterNode node) throws IgniteCheckedException {
        return nodeAddresses(node, filterReachableAddresses);
    }

    /**
     * @param node Node.
     * @param filterReachableAddresses Filter addresses flag.
     * @return Node addresses.
     * @throws IgniteCheckedException If node does not have addresses.
     */
    public Collection<InetSocketAddress> nodeAddresses(ClusterNode node, boolean filterReachableAddresses)
        throws IgniteCheckedException {
        Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
        Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
        Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
        Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));

        boolean isRmtAddrsExist = (!F.isEmpty(rmtAddrs0) && boundPort != null);
        boolean isExtAddrsExist = !F.isEmpty(extAddrs);

        if (!isRmtAddrsExist && !isExtAddrsExist)
            throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any " +
                "TCP communication addresses or mapped external addresses. Check configuration and make sure " +
                "that you use the same communication SPI on all nodes. Remote node id: " + node.id());

        LinkedHashSet<InetSocketAddress> addrs;

        // Try to connect first on bound addresses.
        if (isRmtAddrsExist) {
            List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));

            boolean sameHost = U.sameMacs(getSpiContext().localNode(), node);

            addrs0.sort(U.inetAddressesComparator(sameHost));

            addrs = new LinkedHashSet<>(addrs0);
        }
        else
            addrs = new LinkedHashSet<>();

        // Then on mapped external addresses.
        if (isExtAddrsExist)
            addrs.addAll(extAddrs);

        if (log.isDebugEnabled())
            log.debug("Addresses resolved from attributes [rmtNode=" + node.id() + ", addrs=" + addrs +
                ", isRmtAddrsExist=" + isRmtAddrsExist + ']');

        if (filterReachableAddresses) {
            Set<InetAddress> allInetAddrs = U.newHashSet(addrs.size());

            for (InetSocketAddress addr : addrs) {
                // Skip unresolved as addr.getAddress() can return null.
                if (!addr.isUnresolved())
                    allInetAddrs.add(addr.getAddress());
            }

            List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs);

            if (reachableInetAddrs.size() < allInetAddrs.size()) {
                LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size());

                List<InetSocketAddress> unreachableInetAddr = new ArrayList<>(allInetAddrs.size() - reachableInetAddrs.size());

                for (InetSocketAddress addr : addrs) {
                    if (reachableInetAddrs.contains(addr.getAddress()))
                        addrs0.add(addr);
                    else
                        unreachableInetAddr.add(addr);
                }

                addrs0.addAll(unreachableInetAddr);

                addrs = addrs0;
            }

            if (log.isDebugEnabled())
                log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs + ']');
        }

        return addrs;
    }

    /**
     * Establish TCP connection to remote node and returns client.
     *
     * @param node Remote node.
     * @param connIdx Connection index.
     * @return Client.
     * @throws IgniteCheckedException If failed.
     */
    protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
        GridNioSession ses = createNioSession(node, connIdx);

        return ses == null ?
            null : new GridTcpNioCommunicationClient(connIdx, ses, log);
    }

    /**
     * Returns the established TCP/IP connection between the current node and remote server. A handshake process of
     * negotiation between two communicating nodes will be performed before the {@link GridNioSession} created.
     * <p>
     *     The handshaking process contains of these steps:
     *
     *     <ol>
     *         <li>The local node opens a new {@link SocketChannel} in the <em>blocking</em> mode.</li>
     *         <li>The local node calls {@link SocketChannel#connect(SocketAddress)} to remote node.</li>
     *         <li>The remote GridNioAcceptWorker thread accepts new connection.</li>
     *         <li>The remote node sends back the {@link NodeIdMessage}.</li>
     *         <li>The local node reads NodeIdMessage from created channel.</li>
     *         <li>The local node sends the {@link HandshakeMessage2} to remote.</li>
     *         <li>The remote node processes {@link HandshakeMessage2} in {@link GridNioServerListener#onMessage(GridNioSession, Object)}.</li>
     *         <li>The remote node sends back the {@link RecoveryLastReceivedMessage}.</li>
     *     </ol>
     *
     *     The handshaking process ends.
     * </p>
     * <p>
     *     <em>Note.</em> The {@link HandshakeTimeoutObject} is created to control execution timeout during the
     *     whole handshaking process.
     * </p>
     *
     * @param node Remote node identifier to connect with.
     * @param connIdx Connection index based on configured {@link ConnectionPolicy}.
     * @return A {@link GridNioSession} connection representation.
     * @throws IgniteCheckedException If establish connection fails.
     */
    private GridNioSession createNioSession(ClusterNode node, int connIdx) throws IgniteCheckedException {
        Collection<InetSocketAddress> addrs = nodeAddresses(node);

        GridNioSession ses = null;
        IgniteCheckedException errs = null;

        long totalTimeout;

        if (failureDetectionTimeoutEnabled())
            totalTimeout = node.isClient() ? clientFailureDetectionTimeout() : failureDetectionTimeout();
        else {
            totalTimeout = ExponentialBackoffTimeoutStrategy.totalBackoffTimeout(
                connTimeout,
                maxConnTimeout,
                reconCnt
            );
        }

        for (InetSocketAddress addr : addrs) {
            if (addr.isUnresolved())
                continue;

            TimeoutStrategy connTimeoutStgy = new ExponentialBackoffTimeoutStrategy(
                totalTimeout,
                failureDetectionTimeoutEnabled() ? DFLT_INITIAL_TIMEOUT : connTimeout,
                maxConnTimeout
            );

            while (ses == null) { // Reconnection on handshake timeout.
                if (stopping)
                    throw new IgniteSpiException("Node is stopping.");

                if (isLocalNodeAddress(addr)) {
                    if (log.isDebugEnabled())
                        log.debug("Skipping local address [addr=" + addr +
                            ", locAddrs=" + node.attribute(createSpiAttributeName(ATTR_ADDRS)) +
                            ", node=" + node + ']');
                    break;
                }

                long timeout = 0;

                connectGate.enter();

                try {
                    if (getSpiContext().node(node.id()) == null)
                        throw new ClusterTopologyCheckedException("Failed to send message (node left topology): " + node);

                    SocketChannel ch = openSocketChannel();

                    ch.configureBlocking(true);

                    ch.socket().setTcpNoDelay(tcpNoDelay);
                    ch.socket().setKeepAlive(true);

                    if (sockRcvBuf > 0)
                        ch.socket().setReceiveBufferSize(sockRcvBuf);

                    if (sockSndBuf > 0)
                        ch.socket().setSendBufferSize(sockSndBuf);

                    ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);

                    GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);

                    assert recoveryDesc != null :
                        "Recovery descriptor not found [connKey=" + connKey + ", rmtNode=" + node.id() + ']';

                    if (!recoveryDesc.reserve()) {
                        U.closeQuiet(ch);

                        // Ensure the session is closed.
                        GridNioSession sesFromRecovery = recoveryDesc.session();

                        if (sesFromRecovery != null) {
                            while (sesFromRecovery.closeTime() == 0)
                                sesFromRecovery.close();
                        }

                        return null;
                    }

                    long rcvCnt;

                    Map<Integer, Object> meta = new HashMap<>();

                    GridSslMeta sslMeta = null;

                    try {
                        timeout = connTimeoutStgy.nextTimeout();

                        ch.socket().connect(addr, (int) timeout);

                        if (getSpiContext().node(node.id()) == null)
                            throw new ClusterTopologyCheckedException("Failed to send message (node left topology): " + node);

                        if (isSslEnabled()) {
                            meta.put(SSL_META.ordinal(), sslMeta = new GridSslMeta());

                            SSLEngine sslEngine = ignite.configuration().getSslContextFactory().create().createSSLEngine();

                            sslEngine.setUseClientMode(true);

                            sslMeta.sslEngine(sslEngine);
                        }

                        ClusterNode locNode = getLocalNode();

                        if (locNode == null)
                            throw new IgniteCheckedException("Local node has not been started or " +
                                "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');

                        timeout = connTimeoutStgy.nextTimeout(timeout);

                        rcvCnt = safeTcpHandshake(ch,
                            node.id(),
                            timeout,
                            sslMeta,
                            new HandshakeMessage2(locNode.id(),
                                recoveryDesc.incrementConnectCount(),
                                recoveryDesc.received(),
                                connIdx));

                        if (rcvCnt == ALREADY_CONNECTED)
                            return null;
                        else if (rcvCnt == NODE_STOPPING)
                            throw new ClusterTopologyCheckedException("Remote node started stop procedure: " + node.id());
                        else if (rcvCnt == UNKNOWN_NODE)
                            throw new ClusterTopologyCheckedException("Remote node does not observe current node " +
                                "in topology : " + node.id());
                        else if (rcvCnt == NEED_WAIT) {
                            //check that failure timeout will be reached after sleep(outOfTopDelay).
                            if (connTimeoutStgy.checkTimeout(DFLT_NEED_WAIT_DELAY)) {
                                U.warn(log, "Handshake NEED_WAIT timed out (will stop attempts to perform the handshake) " +
                                    "[node=" + node.id() +
                                    ", connTimeoutStgy=" + connTimeoutStgy +
                                    ", addr=" + addr +
                                    ", failureDetectionTimeoutEnabled=" + failureDetectionTimeoutEnabled() +
                                    ", timeout=" + timeout + ']');

                                throw new ClusterTopologyCheckedException("Failed to connect to node " +
                                    "(current or target node is out of topology on target node within timeout). " +
                                        "Make sure that each ComputeTask and cache Transaction has a timeout set " +
                                        "in order to prevent parties from waiting forever in case of network issues " +
                                        "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
                            }
                            else {
                                if (log.isDebugEnabled())
                                    log.debug("NEED_WAIT received, handshake after delay [node = "
                                        + node + ", outOfTopologyDelay = " + DFLT_NEED_WAIT_DELAY + "ms]");

                                U.sleep(DFLT_NEED_WAIT_DELAY);

                                continue;
                            }
                        }
                        else if (rcvCnt < 0)
                            throw new IgniteCheckedException("Unsupported negative receivedCount [rcvCnt=" + rcvCnt +
                                ", senderNode=" + node + ']');

                        recoveryDesc.onHandshake(rcvCnt);

                        meta.put(CONSISTENT_ID_META, node.consistentId());
                        meta.put(CONN_IDX_META, connKey);
                        meta.put(GridNioServer.RECOVERY_DESC_META_KEY, recoveryDesc);

                        ses = nioSrvr.createSession(ch, meta, false, null).get();
                    }
                    finally {
                        if (ses == null) {
                            U.closeQuiet(ch);

                            if (recoveryDesc != null)
                                recoveryDesc.release();
                        }
                    }
                }
                catch (IgniteSpiOperationTimeoutException e) { // Handshake is timed out.
                    if (ses != null) {
                        ses.close();

                        ses = null;
                    }

                    onException("Handshake timed out (will retry with increased timeout) [connTimeoutStrategy=" + connTimeoutStgy +
                        ", addr=" + addr + ']', e);

                    if (log.isDebugEnabled())
                        log.debug("Handshake timed out (will retry with increased timeout) [connTimeoutStrategy=" + connTimeoutStgy +
                                ", addr=" + addr + ", err=" + e + ']'
                        );

                    if (connTimeoutStgy.checkTimeout()) {
                        U.warn(log, "Handshake timed out (will stop attempts to perform the handshake) " +
                            "[node=" + node.id() + ", connTimeoutStrategy=" + connTimeoutStgy +
                            ", err=" + e.getMessage() + ", addr=" + addr +
                            ", failureDetectionTimeoutEnabled=" + failureDetectionTimeoutEnabled() +
                            ", timeout=" + timeout + ']');

                        String msg = "Failed to connect to node (is node still alive?). " +
                            "Make sure that each ComputeTask and cache Transaction has a timeout set " +
                            "in order to prevent parties from waiting forever in case of network issues " +
                            "[nodeId=" + node.id() + ", addrs=" + addrs + ']';

                        if (errs == null)
                            errs = new IgniteCheckedException(msg, e);
                        else
                            errs.addSuppressed(new IgniteCheckedException(msg, e));

                        break;
                    }
                }
                catch (ClusterTopologyCheckedException e) {
                    throw e;
                }
                catch (Exception e) {
                    // Most probably IO error on socket connect or handshake.
                    if (ses != null) {
                        ses.close();

                        ses = null;
                    }

                    onException("Client creation failed [addr=" + addr + ", err=" + e + ']', e);

                    if (log.isDebugEnabled())
                        log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');

                    if (X.hasCause(e, "Too many open files", SocketException.class))
                        throw new IgniteTooManyOpenFilesException(e);

                    // check if timeout occured in case of unrecoverable exception
                    if (connTimeoutStgy.checkTimeout()) {
                        U.warn(log, "Connection timed out (will stop attempts to perform the connect) " +
                                "[node=" + node.id() + ", connTimeoutStgy=" + connTimeoutStgy +
                                ", failureDetectionTimeoutEnabled=" + failureDetectionTimeoutEnabled() +
                                ", timeout=" + timeout +
                                ", err=" + e.getMessage() + ", addr=" + addr + ']');

                        String msg = "Failed to connect to node (is node still alive?). " +
                                "Make sure that each ComputeTask and cache Transaction has a timeout set " +
                                "in order to prevent parties from waiting forever in case of network issues " +
                                "[nodeId=" + node.id() + ", addrs=" + addrs + ']';

                        if (errs == null)
                            errs = new IgniteCheckedException(msg, e);
                        else
                            errs.addSuppressed(new IgniteCheckedException(msg, e));

                        break;
                    }

                    if (isRecoverableException(e))
                        U.sleep(DFLT_RECONNECT_DELAY);
                    else {
                        String msg = "Failed to connect to node due to unrecoverable exception (is node still alive?). " +
                                "Make sure that each ComputeTask and cache Transaction has a timeout set " +
                                "in order to prevent parties from waiting forever in case of network issues " +
                                "[nodeId=" + node.id() + ", addrs=" + addrs + ", err= " + e + ']';

                        if (errs == null)
                            errs = new IgniteCheckedException(msg, e);
                        else
                            errs.addSuppressed(new IgniteCheckedException(msg, e));

                        break;
                    }
                }
                finally {
                    connectGate.leave();
                }

                CommunicationWorker commWorker0 = commWorker;

                if (commWorker0 != null && commWorker0.runner() == Thread.currentThread())
                    commWorker0.updateHeartbeat();
            }

            if (ses != null)
                break;
        }

        if (ses == null)
            processSessionCreationError(node, addrs, errs == null ? new IgniteCheckedException("No session found") : errs);

        return ses;
    }

    /**
     * Opens a socket channel.
     *
     * @return A new socket channel.
     * @throws IOException If an I/O error occurs.
     */
    protected SocketChannel openSocketChannel() throws IOException {
        return SocketChannel.open();
    }

    /**
     * Closing connections to node.
     * NOTE: It is recommended only for tests.
     *
     * @param nodeId Node for which to close connections.
     * @throws IgniteCheckedException If occurs.
     */
    void closeConnections(UUID nodeId) throws IgniteCheckedException {
        GridCommunicationClient[] clients = this.clients.remove(nodeId);

        if (nonNull(clients)) {
            for (GridCommunicationClient client : clients)
                client.forceClose();
        }

        for (ConnectionKey connKey : clientFuts.keySet()) {
            if (!nodeId.equals(connKey.nodeId()))
                continue;

            GridFutureAdapter<GridCommunicationClient> fut = clientFuts.remove(connKey);

            if (nonNull(fut))
                fut.get().forceClose();
        }
    }

    /**
     * Check is passed  socket address belong to current node. This method should return true only if the passed
     * in address represent an address which will result in a connection to the local node.
     *
     * @param addr address to check.
     * @return true if passed address belongs to local node, otherwise false.
     */
    private boolean isLocalNodeAddress(InetSocketAddress addr) {
        return addr.getPort() == boundTcpPort
            && (locHost.equals(addr.getAddress())
                || addr.getAddress().isAnyLocalAddress()
                || (locHost.isAnyLocalAddress() && U.isLocalAddress(addr.getAddress())));
    }

    /**
     * Process errors if TCP/IP {@link GridNioSession} creation to remote node hasn't been performed.
     *
     * @param node Remote node.
     * @param addrs Remote node addresses.
     * @param errs TCP client creation errors.
     * @throws IgniteCheckedException If failed.
     */
    protected void processSessionCreationError(
        ClusterNode node,
        Collection<InetSocketAddress> addrs,
        IgniteCheckedException errs
    ) throws IgniteCheckedException {
        assert errs != null;

        boolean commErrResolve = false;

        IgniteSpiContext ctx = getSpiContext();

        if (isRecoverableException(errs) && ctx.communicationFailureResolveSupported()) {
            commErrResolve = true;

            ctx.resolveCommunicationFailure(node, errs);
        }

        if (!commErrResolve && enableForcibleNodeKill) {
            if (ctx.node(node.id()) != null
                && node.isClient()
                && !getLocalNode().isClient()
                && isRecoverableException(errs)
            ) {
                // Only server can fail client for now, as in TcpDiscovery resolveCommunicationFailure() is not supported.
                String msg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
                    "cluster [" + "rmtNode=" + node + ']';

                if (enableTroubleshootingLog)
                    U.error(log, msg, errs);
                else
                    U.warn(log, msg);

                ctx.failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
                    "rmtNode=" + node +
                    ", errs=" + errs +
                    ", connectErrs=" + X.getSuppressedList(errs) + ']');
            }
        }

        throw errs;
    }

    /**
     * @param errs Error.
     * @return {@code True} if error was caused by some connection IO error or IgniteCheckedException due to timeout.
     */
    private boolean isRecoverableException(Exception errs) {
        return X.hasCause(
            errs,
            IOException.class,
            HandshakeException.class,
            IgniteSpiOperationTimeoutException.class
        );
    }

    /** */
    private IgniteSpiOperationTimeoutException handshakeTimeoutException() {
        return new IgniteSpiOperationTimeoutException("Failed to perform handshake due to timeout " +
            "(consider increasing 'connectionTimeout' configuration property).");
    }

    /**
     * Performs handshake in timeout-safe way.
     *
     * @param client Client.
     * @param rmtNodeId Remote node.
     * @param timeout Timeout for handshake.
     * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
     */
    @SuppressWarnings("ThrowFromFinallyBlock")
    private void safeShmemHandshake(
        GridCommunicationClient client,
        UUID rmtNodeId,
        long timeout
    ) throws IgniteCheckedException {
        HandshakeTimeoutObject<GridCommunicationClient> obj = new HandshakeTimeoutObject<>(client,
            U.currentTimeMillis() + timeout);

        addTimeoutObject(obj);

        try {
            client.doHandshake(new HandshakeClosure(rmtNodeId));
        }
        finally {
            if (obj.cancel())
                removeTimeoutObject(obj);
            else
                throw handshakeTimeoutException();
        }
    }

    /**
     * Performs handshake in timeout-safe way.
     *
     * @param ch Socket channel.
     * @param rmtNodeId Remote node.
     * @param timeout Timeout for handshake.
     * @param sslMeta Session meta.
     * @param msg {@link HandshakeMessage} or {@link HandshakeMessage2} to send.
     * @return Handshake response.
     * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
     */
    @SuppressWarnings("ThrowFromFinallyBlock")
    private long safeTcpHandshake(
        SocketChannel ch,
        UUID rmtNodeId,
        long timeout,
        GridSslMeta sslMeta,
        HandshakeMessage msg
    ) throws IgniteCheckedException {
        HandshakeTimeoutObject obj = new HandshakeTimeoutObject<>(ch, U.currentTimeMillis() + timeout);

        addTimeoutObject(obj);

        long rcvCnt;

        try {
            BlockingSslHandler sslHnd = null;

            ByteBuffer buf;

            // Step 1. Get remote node response with the remote nodeId value.
            if (isSslEnabled()) {
                assert sslMeta != null;

                sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.LITTLE_ENDIAN, log);

                if (!sslHnd.handshake())
                    throw new HandshakeException("SSL handshake is not completed.");

                ByteBuffer handBuff = sslHnd.applicationBuffer();

                if (handBuff.remaining() >= DIRECT_TYPE_SIZE) {
                    short msgType = makeMessageType(handBuff.get(0), handBuff.get(1));

                    if (msgType == HANDSHAKE_WAIT_MSG_TYPE)
                        return NEED_WAIT;
                }

                if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
                    buf = ByteBuffer.allocate(1000);

                    int read = ch.read(buf);

                    if (read == -1)
                        throw new HandshakeException("Failed to read remote node ID (connection closed).");

                    buf.flip();

                    buf = sslHnd.decode(buf);

                    if (handBuff.remaining() >= DIRECT_TYPE_SIZE) {
                        short msgType = makeMessageType(handBuff.get(0), handBuff.get(1));

                        if (msgType == HANDSHAKE_WAIT_MSG_TYPE)
                            return NEED_WAIT;
                    }
                }
                else
                    buf = handBuff;
            }
            else {
                buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);

                for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
                    int read = ch.read(buf);

                    if (read == -1)
                        throw new HandshakeException("Failed to read remote node ID (connection closed).");

                    if (read >= DIRECT_TYPE_SIZE) {
                        short msgType = makeMessageType(buf.get(0), buf.get(1));

                        if (msgType == HANDSHAKE_WAIT_MSG_TYPE)
                            return NEED_WAIT;
                    }

                    i += read;
                }
            }

            UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);

            if (!rmtNodeId.equals(rmtNodeId0))
                throw new HandshakeException("Remote node ID is not as expected [expected=" + rmtNodeId +
                    ", rcvd=" + rmtNodeId0 + ']');
            else if (log.isDebugEnabled())
                log.debug("Received remote node ID: " + rmtNodeId0);

            if (isSslEnabled()) {
                assert sslHnd != null;

                U.writeFully(ch, sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
            }
            else
                U.writeFully(ch, ByteBuffer.wrap(U.IGNITE_HEADER));

            // Step 2. Prepare Handshake message to send to the remote node.
            if (log.isDebugEnabled())
                log.debug("Writing handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');

            buf = ByteBuffer.allocate(msg.getMessageSize());

            buf.order(ByteOrder.LITTLE_ENDIAN);

            boolean written = msg.writeTo(buf, null);

            assert written;

            buf.flip();

            if (isSslEnabled()) {
                assert sslHnd != null;

                U.writeFully(ch, sslHnd.encrypt(buf));
            }
            else
                U.writeFully(ch, buf);

            if (log.isDebugEnabled())
                log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');

            // Step 3. Waiting for response from the remote node with their receive count message.
            if (isSslEnabled()) {
                assert sslHnd != null;

                buf = ByteBuffer.allocate(1000);
                buf.order(ByteOrder.LITTLE_ENDIAN);

                ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
                decode.order(ByteOrder.LITTLE_ENDIAN);

                for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
                    int read = ch.read(buf);

                    if (read == -1)
                        throw new HandshakeException("Failed to read remote node recovery handshake " +
                            "(connection closed).");

                    buf.flip();

                    ByteBuffer decode0 = sslHnd.decode(buf);

                    i += decode0.remaining();

                    decode = appendAndResizeIfNeeded(decode, decode0);

                    buf.clear();
                }

                decode.flip();

                rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);

                if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
                    decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);

                    sslMeta.decodedBuffer(decode);
                }

                ByteBuffer inBuf = sslHnd.inputBuffer();

                if (inBuf.position() > 0)
                    sslMeta.encodedBuffer(inBuf);
            }
            else {
                buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);

                buf.order(ByteOrder.LITTLE_ENDIAN);

                for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
                    int read = ch.read(buf);

                    if (read == -1)
                        throw new HandshakeException("Failed to read remote node recovery handshake " +
                            "(connection closed).");

                    i += read;
                }

                rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
            }

            if (log.isDebugEnabled())
                log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');

            if (rcvCnt == -1) {
                if (log.isDebugEnabled())
                    log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
            }
        }
        catch (IOException e) {
            if (log.isDebugEnabled())
                log.debug("Failed to read from channel: " + e);

            throw new IgniteCheckedException("Failed to read from channel.", e);
        }
        finally {
            if (obj.cancel())
                removeTimeoutObject(obj);
            else
                throw handshakeTimeoutException();
        }

        return rcvCnt;
    }

    /**
     * @param sndId Sender ID.
     * @param msg Communication message.
     * @param msgC Closure to call when message processing finished.
     */
    protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) {
        CommunicationListener<Message> lsnr = this.lsnr;

        MTC.span().addLog(() -> "Communication listeners notified");

        if (lsnr != null)
            // Notify listener of a new message.
            lsnr.onMessage(sndId, msg, msgC);
        else if (log.isDebugEnabled())
            log.debug("Received communication message without any registered listeners (will ignore, " +
                "is node stopping?) [senderNodeId=" + sndId + ", msg=" + msg + ']');
    }

    /**
     * @param nodeId The remote node id.
     * @param channel The configured channel to notify listeners with.
     * @param initMsg Channel initialization message with additional channel params.
     */
    private void notifyChannelEvtListener(UUID nodeId, Channel channel, Message initMsg) {
        if (log.isDebugEnabled())
            log.debug("Notify appropriate listeners due to a new channel opened: " + channel);

        CommunicationListener<Message> lsnr0 = lsnr;

        if (lsnr0 instanceof CommunicationListenerEx)
            ((CommunicationListenerEx<Message>)lsnr0).onChannelOpened(nodeId, initMsg, channel);
    }

    /**
     * @param target Target buffer to append to.
     * @param src Source buffer to get data.
     * @return Original or expanded buffer.
     */
    private ByteBuffer appendAndResizeIfNeeded(ByteBuffer target, ByteBuffer src) {
        if (target.remaining() < src.remaining()) {
            int newSize = Math.max(target.capacity() * 2, target.capacity() + src.remaining());

            ByteBuffer tmp = ByteBuffer.allocate(newSize);

            tmp.order(target.order());

            target.flip();

            tmp.put(target);

            target = tmp;
        }

        target.put(src);

        return target;
    }

    /**
     * Stops service threads to simulate node failure.
     *
     * FOR TEST PURPOSES ONLY!!!
     */
    public void simulateNodeFailure() {
        if (nioSrvr != null)
            nioSrvr.stop();

        if (commWorker != null)
            U.interrupt(commWorker.runner());

        U.join(commWorker, log);

        for (GridCommunicationClient[] clients0 : clients.values()) {
            for (GridCommunicationClient client : clients0) {
                if (client != null)
                    client.forceClose();
            }
        }
    }

    /**
     * @param node Node.
     * @param key Connection key.
     * @return Recovery descriptor for outgoing connection.
     */
    private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
        if (usePairedConnections(node))
            return recoveryDescriptor(outRecDescs, true, node, key);
        else
            return recoveryDescriptor(recoveryDescs, false, node, key);
    }

    /**
     * @param node Node.
     * @param key Connection key.
     * @return Recovery descriptor for incoming connection.
     */
    private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
        if (usePairedConnections(node))
            return recoveryDescriptor(inRecDescs, true, node, key);
        else
            return recoveryDescriptor(recoveryDescs, false, node, key);
    }

    /**
     * @param node Node.
     * @return {@code True} if can use in/out connection pair for communication.
     */
    private boolean usePairedConnections(ClusterNode node) {
        if (usePairedConnections) {
            Boolean attr = node.attribute(createSpiAttributeName(ATTR_PAIRED_CONN));

            return attr != null && attr;
        }

        return false;
    }

    /**
     * @param key The connection key to cleanup descriptors on local node.
     */
    private void cleanupLocalNodeRecoveryDescriptor(ConnectionKey key) {
        ClusterNode node = getLocalNode();

        if (usePairedConnections(node)) {
            inRecDescs.remove(key);
            outRecDescs.remove(key);
        }
        else
            recoveryDescs.remove(key);
    }

    /**
     * @param recoveryDescs Descriptors map.
     * @param pairedConnections {@code True} if in/out connections pair is used for communication with node.
     * @param node Node.
     * @param key Connection key.
     * @return Recovery receive data for given node.
     */
    private GridNioRecoveryDescriptor recoveryDescriptor(
        ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs,
        boolean pairedConnections,
        ClusterNode node,
        ConnectionKey key) {
        GridNioRecoveryDescriptor recovery = recoveryDescs.get(key);

        if (recovery == null) {
            if (log.isDebugEnabled())
                log.debug("Missing recovery descriptor for the node (will create a new one) " +
                    "[locNodeId=" + getLocalNode().id() +
                    ", key=" + key + ", rmtNode=" + node + ']');

            int maxSize = Math.max(msgQueueLimit, ackSndThreshold);

            int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 128);

            GridNioRecoveryDescriptor old = recoveryDescs.putIfAbsent(key,
                recovery = new GridNioRecoveryDescriptor(pairedConnections, queueLimit, node, log));

            if (old != null) {
                recovery = old;

                if (log.isDebugEnabled())
                    log.debug("Will use existing recovery descriptor: " + recovery);
            }
            else {
                if (log.isDebugEnabled())
                    log.debug("Initialized recovery descriptor [desc=" + recovery + ", maxSize=" + maxSize +
                        ", queueLimit=" + queueLimit + ']');
            }
        }

        return recovery;
    }

    /**
     * @param msg Error message.
     * @param e Exception.
     */
    private void onException(String msg, Exception e) {
        getExceptionRegistry().onException(msg, e);
    }

    /**
     * @return Node ID message.
     */
    private NodeIdMessage nodeIdMessage() {
        final UUID locNodeId = (ignite instanceof IgniteEx) ? ((IgniteEx)ignite).context().localNodeId() :
            safeLocalNodeId();

        return new NodeIdMessage(locNodeId);
    }

    /**
     * @return Local node ID.
     */
    private UUID safeLocalNodeId() {
        ClusterNode locNode = getLocalNode();

        UUID id;

        if (locNode == null) {
            U.warn(log, "Local node is not started or fully initialized [isStopping=" +
                getSpiContext().isStopping() + ']');

            id = new UUID(0, 0);
        }
        else
            id = locNode.id();

        return id;
    }

    /** {@inheritDoc} */
    @Override public TcpCommunicationSpi setName(String name) {
        super.setName(name);

        return this;
    }

    /**
     * Checks whether remote nodes support {@link HandshakeWaitMessage}.
     *
     * @return {@code True} if remote nodes support {@link HandshakeWaitMessage}.
     */
    private boolean isHandshakeWaitSupported() {
        DiscoverySpi discoSpi = ignite().configuration().getDiscoverySpi();

        if (discoSpi instanceof IgniteDiscoverySpi)
            return ((IgniteDiscoverySpi)discoSpi).allNodesSupport(IgniteFeatures.TCP_COMMUNICATION_SPI_HANDSHAKE_WAIT_MESSAGE);
        else {
            Collection<ClusterNode> nodes = discoSpi.getRemoteNodes();

            return IgniteFeatures.allNodesSupports(nodes, IgniteFeatures.TCP_COMMUNICATION_SPI_HANDSHAKE_WAIT_MESSAGE);
        }
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(TcpCommunicationSpi.class, this);
    }

    /**
     * This worker takes responsibility to shut the server down when stopping,
     * No other thread shall stop passed server.
     */
    private class ShmemAcceptWorker extends GridWorker {
        /** */
        private final IpcSharedMemoryServerEndpoint srv;

        /**
         * @param srv Server.
         */
        ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) {
            super(igniteInstanceName, "shmem-communication-acceptor", TcpCommunicationSpi.this.log);

            this.srv = srv;
        }

        /** {@inheritDoc} */
        @Override protected void body() throws InterruptedException {
            try {
                while (!Thread.interrupted()) {
                    ShmemWorker e = new ShmemWorker(srv.accept());

                    shmemWorkers.add(e);

                    new IgniteThread(e).start();
                }
            }
            catch (IgniteCheckedException e) {
                if (!isCancelled())
                    U.error(log, "Shmem server failed.", e);
            }
            finally {
                srv.close();
            }
        }

        /** {@inheritDoc} */
        @Override public void cancel() {
            super.cancel();

            srv.close();
        }
    }

    /**
     * Write message type to output stream.
     *
     * @param os Output stream.
     * @param type Message type.
     * @throws IOException On error.
     */
    private static void writeMessageType(OutputStream os, short type) throws IOException {
        os.write((byte)(type & 0xFF));
        os.write((byte)((type >> 8) & 0xFF));
    }

    /**
     * Write message type to byte buffer.
     *
     * @param buf Byte buffer.
     * @param type Message type.
     */
    public static void writeMessageType(ByteBuffer buf, short type) {
        buf.put((byte)(type & 0xFF));
        buf.put((byte)((type >> 8) & 0xFF));
    }

    /**
     * Concatenates the two parameter bytes to form a message type value.
     *
     * @param b0 The first byte.
     * @param b1 The second byte.
     */
    public static short makeMessageType(byte b0, byte b1) {
        return (short)((b1 & 0xFF) << 8 | b0 & 0xFF);
    }

    /**
     * @param ignite Ignite.
     */
    private static WorkersRegistry getWorkersRegistry(Ignite ignite) {
        return ignite instanceof IgniteEx ? ((IgniteEx)ignite).context().workersRegistry() : null;
    }

    /**
     * @param remote Destination cluster node to communicate with.
     * @param initMsg Configuration channel attributes wrapped into the message.
     * @return The future, which will be finished on channel ready.
     * @throws IgniteSpiException If fails.
     */
    public IgniteInternalFuture<Channel> openChannel(
        ClusterNode remote,
        Message initMsg
    ) throws IgniteSpiException {
        assert !remote.isLocal() : remote;
        assert initMsg != null;
        assert chConnPlc != null;
        assert nodeSupports(remote, CHANNEL_COMMUNICATION) : "Node doesn't support direct connection over socket channel " +
                "[nodeId=" + remote.id() + ']';

        ConnectionKey key = new ConnectionKey(remote.id(), chConnPlc.connectionIndex());

        GridFutureAdapter<Channel> chFut = new GridFutureAdapter<>();

        connectGate.enter();

        try {
            GridNioSession ses = createNioSession(remote, key.connectionIndex());

            assert ses != null : "Session must be established [remoteId=" + remote.id() + ", key=" + key + ']';

            cleanupLocalNodeRecoveryDescriptor(key);
            ses.addMeta(CHANNEL_FUT_META, chFut);

            // Send configuration message over the created session.
            ses.send(initMsg)
                .listen(f -> {
                    if (f.error() != null) {
                        GridFutureAdapter<Channel> rq = ses.meta(CHANNEL_FUT_META);

                        assert rq != null;

                        rq.onDone(f.error());

                        ses.close();

                        return;
                    }

                    addTimeoutObject(new IgniteSpiTimeoutObject() {
                        @Override public IgniteUuid id() {
                            return IgniteUuid.randomUuid();
                        }

                        @Override public long endTime() {
                            return U.currentTimeMillis() + connTimeout;
                        }

                        @Override public void onTimeout() {
                            // Close session if request not complete yet.
                            GridFutureAdapter<Channel> rq = ses.meta(CHANNEL_FUT_META);

                            assert rq != null;

                            if (rq.onDone(handshakeTimeoutException()))
                                ses.close();
                        }
                    });
                });

            return chFut;
        }
        catch (IgniteCheckedException e) {
            throw new IgniteSpiException("Unable to create new channel connection to the remote node: " + remote, e);
        }
        finally {
            connectGate.leave();
        }
    }

    /**
     * @param connIdx Connection index to check.
     * @return {@code true} if connection index is related to the channel create request\response.
     */
    private boolean isChannelConnIdx(int connIdx) {
        return connIdx > MAX_CONN_PER_NODE;
    }

    /**
     *
     */
    private class DiscoveryListener implements GridLocalEventListener, HighPriorityListener {
        /** {@inheritDoc} */
        @Override public void onEvent(Event evt) {
            assert evt instanceof DiscoveryEvent : evt;
            assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;

            ClusterNode node = ((DiscoveryEvent)evt).eventNode();

            onNodeLeft(node.consistentId(), node.id());
        }

        /** {@inheritDoc} */
        @Override public int order() {
            return 0;
        }
    }

    /**
     *
     */
    private class ShmemWorker extends GridWorker {
        /** */
        private final IpcEndpoint endpoint;

        /**
         * @param endpoint Endpoint.
         */
        private ShmemWorker(IpcEndpoint endpoint) {
            super(igniteInstanceName, "shmem-worker", TcpCommunicationSpi.this.log);

            this.endpoint = endpoint;
        }

        /** {@inheritDoc} */
        @Override protected void body() throws InterruptedException {
            try {
                MessageFactory msgFactory = new MessageFactory() {
                    private MessageFactory impl;

                    @Nullable @Override public Message create(short type) {
                        if (impl == null)
                            impl = getSpiContext().messageFactory();

                        assert impl != null;

                        return impl.create(type);
                    }
                };

                GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory() {
                    private MessageFormatter formatter;

                    @Override public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException {
                        if (formatter == null)
                            formatter = getSpiContext().messageFormatter();

                        assert formatter != null;

                        ConnectionKey connKey = ses.meta(CONN_IDX_META);

                        return connKey != null ? formatter.writer(connKey.nodeId()) : null;
                    }
                };

                GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory() {
                    private MessageFormatter formatter;

                    @Override public MessageReader reader(GridNioSession ses, MessageFactory msgFactory)
                        throws IgniteCheckedException {
                        if (formatter == null)
                            formatter = getSpiContext().messageFormatter();

                        assert formatter != null;

                        ConnectionKey connKey = ses.meta(CONN_IDX_META);

                        return connKey != null ? formatter.reader(connKey.nodeId(), msgFactory) : null;
                    }
                };

                IpcToNioAdapter<Message> adapter = new IpcToNioAdapter<>(
                    metricsLsnr.metricRegistry(),
                    log,
                    endpoint,
                    srvLsnr,
                    writerFactory,
                    new GridNioTracerFilter(log, tracing),
                    new GridNioCodecFilter(
                        new GridDirectParser(log.getLogger(GridDirectParser.class), msgFactory, readerFactory),
                        log,
                        true),
                    new GridConnectionBytesVerifyFilter(log)
                );

                adapter.serve();
            }
            finally {
                shmemWorkers.remove(this);

                endpoint.close();
            }
        }

        /** {@inheritDoc} */
        @Override public void cancel() {
            super.cancel();

            endpoint.close();
        }

        /** {@inheritDoc} */
        @Override protected void cleanup() {
            super.cleanup();

            endpoint.close();
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(ShmemWorker.class, this);
        }
    }

    /**
     *
     */
    private class CommunicationWorker extends GridWorker {
        /** */
        private final BlockingQueue<DisconnectedSessionInfo> q = new LinkedBlockingQueue<>();

        /**
         * @param igniteInstanceName Ignite instance name.
         * @param log Logger.
         */
        private CommunicationWorker(String igniteInstanceName, IgniteLogger log) {
            super(igniteInstanceName, "tcp-comm-worker", log, getWorkersRegistry(ignite));
        }

        /** */
        @Override protected void body() throws InterruptedException {
            if (log.isDebugEnabled())
                log.debug("Tcp communication worker has been started.");

            Throwable err = null;

            try {
                while (!isCancelled()) {
                    DisconnectedSessionInfo disconnectData;

                    blockingSectionBegin();

                    try {
                        disconnectData = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
                    }
                    finally {
                        blockingSectionEnd();
                    }

                    if (disconnectData != null)
                        processDisconnect(disconnectData);
                    else
                        processIdle();

                    onIdle();
                }
            }
            catch (Throwable t) {
                if (!(t instanceof InterruptedException))
                    err = t;

                throw t;
            }
            finally {
                if (ignite instanceof IgniteEx) {
                    if (err == null && !stopping)
                        err = new IllegalStateException("Thread  " + getName() + " is terminated unexpectedly.");

                    if (err instanceof OutOfMemoryError)
                        ((IgniteEx)ignite).context().failure().process(new FailureContext(CRITICAL_ERROR, err));
                    else if (err != null)
                        ((IgniteEx)ignite).context().failure().process(
                            new FailureContext(SYSTEM_WORKER_TERMINATION, err));
                }
            }
        }

        /**
         *
         */
        private void processIdle() {
            cleanupRecovery();

            for (Map.Entry<UUID, GridCommunicationClient[]> e : clients.entrySet()) {
                UUID nodeId = e.getKey();

                for (GridCommunicationClient client : e.getValue()) {
                    if (client == null)
                        continue;

                    ClusterNode node = getSpiContext().node(nodeId);

                    if (node == null) {
                        if (log.isDebugEnabled())
                            log.debug("Forcing close of non-existent node connection: " + nodeId);

                        client.forceClose();

                        removeNodeClient(nodeId, client);

                        continue;
                    }

                    GridNioRecoveryDescriptor recovery = null;

                    if (!usePairedConnections(node) && client instanceof GridTcpNioCommunicationClient) {
                        recovery = recoveryDescs.get(new ConnectionKey(
                            node.id(), client.connectionIndex(), -1)
                        );

                        if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
                            RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());

                            if (log.isDebugEnabled())
                                log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
                                    ", rcvCnt=" + msg.received() + ']');

                            try {
                                nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);

                                recovery.lastAcknowledged(msg.received());
                            }
                            catch (IgniteCheckedException err) {
                                U.error(log, "Failed to send message: " + err, err);
                            }

                            continue;
                        }
                    }

                    long idleTime = client.getIdleTime();

                    if (idleTime >= idleConnTimeout) {
                        if (recovery == null && usePairedConnections(node))
                            recovery = outRecDescs.get(new ConnectionKey(
                                node.id(), client.connectionIndex(), -1)
                            );

                        if (recovery != null &&
                            recovery.nodeAlive(getSpiContext().node(nodeId)) &&
                            !recovery.messagesRequests().isEmpty()) {
                            if (log.isDebugEnabled())
                                log.debug("Node connection is idle, but there are unacknowledged messages, " +
                                    "will wait: " + nodeId);

                            continue;
                        }

                        if (log.isDebugEnabled())
                            log.debug("Closing idle node connection: " + nodeId);

                        if (client.close() || client.closed())
                            removeNodeClient(nodeId, client);
                    }
                }
            }

            for (GridNioSession ses : nioSrvr.sessions()) {
                GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();

                if (recovery != null && usePairedConnections(recovery.node())) {
                    assert ses.accepted() : ses;

                    sendAckOnTimeout(recovery, ses);
                }
            }
        }

        /**
         * @param recovery Recovery descriptor.
         * @param ses Session.
         */
        private void sendAckOnTimeout(GridNioRecoveryDescriptor recovery, GridNioSession ses) {
            if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
                RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());

                if (log.isDebugEnabled()) {
                    log.debug("Send recovery acknowledgement on timeout [rmtNode=" + recovery.node().id() +
                        ", rcvCnt=" + msg.received() +
                        ", lastAcked=" + recovery.lastAcknowledged() + ']');
                }

                try {
                    nioSrvr.sendSystem(ses, msg);

                    recovery.lastAcknowledged(msg.received());
                }
                catch (IgniteCheckedException e) {
                    U.error(log, "Failed to send message: " + e, e);
                }
            }
        }

        /**
         *
         */
        private void cleanupRecovery() {
            cleanupRecovery(recoveryDescs);
            cleanupRecovery(inRecDescs);
            cleanupRecovery(outRecDescs);
        }

        /**
         * @param recoveryDescs Recovery descriptors to cleanup.
         */
        private void cleanupRecovery(ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs) {
            Set<ConnectionKey> left = null;

            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
                if (left != null && left.contains(e.getKey()))
                    continue;

                GridNioRecoveryDescriptor recoveryDesc = e.getValue();

                if (!recoveryDesc.nodeAlive(getSpiContext().node(e.getKey().nodeId()))) {
                    if (left == null)
                        left = new HashSet<>();

                    left.add(e.getKey());
                }
            }

            if (left != null) {
                assert !left.isEmpty();

                for (ConnectionKey id : left) {
                    GridNioRecoveryDescriptor recoveryDesc = recoveryDescs.get(id);

                    if (recoveryDesc != null && recoveryDesc.onNodeLeft())
                        recoveryDescs.remove(id, recoveryDesc);
                }
            }
        }

        /**
         * @param sesInfo Disconnected session information.
         */
        private void processDisconnect(DisconnectedSessionInfo sesInfo) {
            GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;

            ClusterNode node = recoveryDesc.node();

            if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
                return;

            try {
                if (log.isDebugEnabled())
                    log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');

                GridCommunicationClient client = reserveClient(node, sesInfo.connIdx);

                client.release();
            }
            catch (ClusterTopologyCheckedException e) {
                if (log.isDebugEnabled())
                    log.debug("Recovery reconnect failed, node stopping [rmtNode=" + recoveryDesc.node().id() + ']');
            }
            catch (IgniteTooManyOpenFilesException e) {
                onException(e.getMessage(), e);

                throw e;
            }
            catch (IgniteCheckedException | IgniteException e) {
                try {
                    if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) {
                        if (log.isDebugEnabled())
                            log.debug("Recovery reconnect failed, will retry " +
                                "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');

                        addProcessDisconnectRequest(sesInfo);
                    }
                    else {
                        if (log.isDebugEnabled())
                            log.debug("Recovery reconnect failed, " +
                                "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');

                        onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
                            e);
                    }
                }
                catch (IgniteClientDisconnectedException ignored) {
                    if (log.isDebugEnabled())
                        log.debug("Failed to ping node, client disconnected.");
                }
            }
        }

        /**
         * @param sesInfo Disconnected session information.
         */
        void addProcessDisconnectRequest(DisconnectedSessionInfo sesInfo) {
            boolean add = q.add(sesInfo);

            assert add;
        }
    }

    /**
     *
     */
    private static class ConnectFuture extends GridFutureAdapter<GridCommunicationClient> {
        // No-op.
    }

    /**
     *
     */
    private static class HandshakeTimeoutObject<T> implements IgniteSpiTimeoutObject {
        /** */
        private final IgniteUuid id = IgniteUuid.randomUuid();

        /** */
        private final T obj;

        /** */
        private final long endTime;

        /** */
        private final AtomicBoolean done = new AtomicBoolean();

        /**
         * @param obj Client.
         * @param endTime End time.
         */
        private HandshakeTimeoutObject(T obj, long endTime) {
            assert obj != null;
            assert obj instanceof GridCommunicationClient || obj instanceof SelectableChannel;
            assert endTime > 0;

            this.obj = obj;
            this.endTime = endTime;
        }

        /**
         * @return {@code True} if object has not yet been timed out.
         */
        boolean cancel() {
            return done.compareAndSet(false, true);
        }

        /** {@inheritDoc} */
        @Override public void onTimeout() {
            if (done.compareAndSet(false, true)) {
                // Close socket - timeout occurred.
                if (obj instanceof GridCommunicationClient)
                    ((GridCommunicationClient)obj).forceClose();
                else
                    U.closeQuiet((AutoCloseable)obj);
            }
        }

        /** {@inheritDoc} */
        @Override public long endTime() {
            return endTime;
        }

        /** {@inheritDoc} */
        @Override public IgniteUuid id() {
            return id;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(HandshakeTimeoutObject.class, this);
        }
    }

    /**
     *
     */
    private class HandshakeClosure extends IgniteInClosure2X<InputStream, OutputStream> {
        /** */
        private static final long serialVersionUID = 0L;

        /** */
        private final UUID rmtNodeId;

        /**
         * @param rmtNodeId Remote node ID.
         */
        private HandshakeClosure(UUID rmtNodeId) {
            this.rmtNodeId = rmtNodeId;
        }

        /** {@inheritDoc} */
        @Override public void applyx(InputStream in, OutputStream out) throws IgniteCheckedException {
            try {
                // Handshake.
                byte[] b = new byte[NodeIdMessage.MESSAGE_FULL_SIZE];

                int n = 0;

                while (n < NodeIdMessage.MESSAGE_FULL_SIZE) {
                    int cnt = in.read(b, n, NodeIdMessage.MESSAGE_FULL_SIZE - n);

                    if (cnt < 0)
                        throw new IgniteCheckedException("Failed to get remote node ID (end of stream reached)");

                    n += cnt;
                }

                // First 4 bytes are for length.
                UUID id = U.bytesToUuid(b, Message.DIRECT_TYPE_SIZE);

                if (!rmtNodeId.equals(id))
                    throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId +
                        ", rcvd=" + id + ']');
                else if (log.isDebugEnabled())
                    log.debug("Received remote node ID: " + id);
            }
            catch (SocketTimeoutException e) {
                throw new IgniteCheckedException("Failed to perform handshake due to timeout (consider increasing " +
                    "'connectionTimeout' configuration property).", e);
            }
            catch (IOException e) {
                throw new IgniteCheckedException("Failed to perform handshake.", e);
            }

            try {
                ClusterNode localNode = getLocalNode();

                if (localNode == null)
                    throw new IgniteSpiException("Local node has not been started or fully initialized " +
                        "[isStopping=" + getSpiContext().isStopping() + ']');

                UUID id = localNode.id();

                NodeIdMessage msg = new NodeIdMessage(id);

                out.write(U.IGNITE_HEADER);
                writeMessageType(out, NODE_ID_MSG_TYPE);
                out.write(msg.nodeIdBytes());

                out.flush();

                if (log.isDebugEnabled())
                    log.debug("Sent local node ID [locNodeId=" + id + ", rmtNodeId=" + rmtNodeId + ']');
            }
            catch (IOException e) {
                throw new IgniteCheckedException("Failed to perform handshake.", e);
            }
        }
    }

    /**
     *
     */
    private static class ConnectGateway {
        /** */
        private GridSpinReadWriteLock lock = new GridSpinReadWriteLock();

        /** */
        private IgniteException err;

        /**
         *
         */
        void enter() {
            lock.readLock();

            if (err != null) {
                lock.readUnlock();

                throw err;
            }
        }

        /**
         * @return {@code True} if entered gateway.
         */
        boolean tryEnter() {
            lock.readLock();

            boolean res = err == null;

            if (!res)
                lock.readUnlock();

            return res;
        }

        /**
         *
         */
        void leave() {
            lock.readUnlock();
        }

        /**
         * @param reconnectFut Reconnect future.
         */
        void disconnected(IgniteFuture<?> reconnectFut) {
            lock.writeLock();

            err = new IgniteClientDisconnectedException(reconnectFut, "Failed to connect, client node disconnected.");

            lock.writeUnlock();
        }

        /**
         *
         */
        void reconnected() {
            lock.writeLock();

            try {
                if (err instanceof IgniteClientDisconnectedException)
                    err = null;
            }
            finally {
                lock.writeUnlock();
            }
        }

        /**
         *
         */
        void stopped() {
            lock.readLock();

            err = new IgniteException("Failed to connect, node stopped.");

            lock.readUnlock();
        }
    }

    /**
     *
     */
    private static class DisconnectedSessionInfo {
        /** */
        private final GridNioRecoveryDescriptor recoveryDesc;

        /** */
        private int connIdx;

        /**
         * @param recoveryDesc Recovery descriptor.
         * @param connIdx Connection index.
         */
        DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor recoveryDesc, int connIdx) {
            this.recoveryDesc = recoveryDesc;
            this.connIdx = connIdx;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(DisconnectedSessionInfo.class, this);
        }
    }

    /**
     *
     */
    interface ConnectionPolicy {
        /**
         * @return Thread connection index.
         */
        int connectionIndex();
    }

    /** */
    private static class FirstConnectionPolicy implements ConnectionPolicy {
        /** {@inheritDoc} */
        @Override public int connectionIndex() {
            return 0;
        }
    }

    /** */
    private class RoundRobinConnectionPolicy implements ConnectionPolicy {
        /** {@inheritDoc} */
        @Override public int connectionIndex() {
            return (int)(U.safeAbs(Thread.currentThread().getId()) % connectionsPerNode);
        }
    }

    /**
     * MBean implementation for TcpCommunicationSpi.
     */
    private class TcpCommunicationSpiMBeanImpl extends IgniteSpiMBeanAdapter implements TcpCommunicationSpiMBean {
        /** {@inheritDoc} */
        TcpCommunicationSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
            super(spiAdapter);
        }

        /** {@inheritDoc} */
        @Override public String getLocalAddress() {
            return TcpCommunicationSpi.this.getLocalAddress();
        }

        /** {@inheritDoc} */
        @Override public int getLocalPort() {
            return TcpCommunicationSpi.this.getLocalPort();
        }

        /** {@inheritDoc} */
        @Override public int getLocalPortRange() {
            return TcpCommunicationSpi.this.getLocalPortRange();
        }

        /** {@inheritDoc} */
        @Override public boolean isUsePairedConnections() {
            return TcpCommunicationSpi.this.isUsePairedConnections();
        }

        /** {@inheritDoc} */
        @Override public int getConnectionsPerNode() {
            return TcpCommunicationSpi.this.getConnectionsPerNode();
        }

        /** {@inheritDoc} */
        @Override public int getSharedMemoryPort() {
            return TcpCommunicationSpi.this.getSharedMemoryPort();
        }

        /** {@inheritDoc} */
        @Override public long getIdleConnectionTimeout() {
            return TcpCommunicationSpi.this.getIdleConnectionTimeout();
        }

        /** {@inheritDoc} */
        @Override public long getSocketWriteTimeout() {
            return TcpCommunicationSpi.this.getSocketWriteTimeout();
        }

        /** {@inheritDoc} */
        @Override public int getAckSendThreshold() {
            return TcpCommunicationSpi.this.getAckSendThreshold();
        }

        /** {@inheritDoc} */
        @Override public int getUnacknowledgedMessagesBufferSize() {
            return TcpCommunicationSpi.this.getUnacknowledgedMessagesBufferSize();
        }

        /** {@inheritDoc} */
        @Override public long getConnectTimeout() {
            return TcpCommunicationSpi.this.getConnectTimeout();
        }

        /** {@inheritDoc} */
        @Override public long getMaxConnectTimeout() {
            return TcpCommunicationSpi.this.getMaxConnectTimeout();
        }

        /** {@inheritDoc} */
        @Override public int getReconnectCount() {
            return TcpCommunicationSpi.this.getReconnectCount();
        }

        /** {@inheritDoc} */
        @Override public boolean isDirectBuffer() {
            return TcpCommunicationSpi.this.isDirectBuffer();
        }

        /** {@inheritDoc} */
        @Override public boolean isDirectSendBuffer() {
            return TcpCommunicationSpi.this.isDirectSendBuffer();
        }

        /** {@inheritDoc} */
        @Override public int getSelectorsCount() {
            return TcpCommunicationSpi.this.getSelectorsCount();
        }

        /** {@inheritDoc} */
        @Override public long getSelectorSpins() {
            return TcpCommunicationSpi.this.getSelectorSpins();
        }

        /** {@inheritDoc} */
        @Override public boolean isTcpNoDelay() {
            return TcpCommunicationSpi.this.isTcpNoDelay();
        }

        /** {@inheritDoc} */
        @Override public int getSocketReceiveBuffer() {
            return TcpCommunicationSpi.this.getSocketReceiveBuffer();
        }

        /** {@inheritDoc} */
        @Override public int getSocketSendBuffer() {
            return TcpCommunicationSpi.this.getSocketSendBuffer();
        }

        /** {@inheritDoc} */
        @Override public int getMessageQueueLimit() {
            return TcpCommunicationSpi.this.getMessageQueueLimit();
        }

        /** {@inheritDoc} */
        @Override public int getSlowClientQueueLimit() {
            return TcpCommunicationSpi.this.getSlowClientQueueLimit();
        }

        /** {@inheritDoc} */
        @Override public void dumpStats() {
            TcpCommunicationSpi.this.dumpStats();
        }

        /** {@inheritDoc} */
        @Override public int getSentMessagesCount() {
            return TcpCommunicationSpi.this.getSentMessagesCount();
        }

        /** {@inheritDoc} */
        @Override public long getSentBytesCount() {
            return TcpCommunicationSpi.this.getSentBytesCount();
        }

        /** {@inheritDoc} */
        @Override public int getReceivedMessagesCount() {
            return TcpCommunicationSpi.this.getReceivedMessagesCount();
        }

        /** {@inheritDoc} */
        @Override public long getReceivedBytesCount() {
            return TcpCommunicationSpi.this.getReceivedBytesCount();
        }

        /** {@inheritDoc} */
        @Override public Map<String, Long> getReceivedMessagesByType() {
            return TcpCommunicationSpi.this.getReceivedMessagesByType();
        }

        /** {@inheritDoc} */
        @Override public Map<UUID, Long> getReceivedMessagesByNode() {
            return TcpCommunicationSpi.this.getReceivedMessagesByNode();
        }

        /** {@inheritDoc} */
        @Override public Map<String, Long> getSentMessagesByType() {
            return TcpCommunicationSpi.this.getSentMessagesByType();
        }

        /** {@inheritDoc} */
        @Override public Map<UUID, Long> getSentMessagesByNode() {
            return TcpCommunicationSpi.this.getSentMessagesByNode();
        }

        /** {@inheritDoc} */
        @Override public int getOutboundMessagesQueueSize() {
            return TcpCommunicationSpi.this.getOutboundMessagesQueueSize();
        }
    }
}
