/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.spi.discovery.tcp;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.StreamCorruptedException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Queue;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.processors.tracing.SpanTags;
import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage;
import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.discovery.DiscoveryNotification;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNodesRing;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientAckResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryServerOnlyCustomEventMessage;
import org.apache.ignite.thread.IgniteThreadFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.CONNECTED;
import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.DISCONNECTED;
import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.SEGMENTED;
import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.STARTING;
import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.STOPPED;
import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_DISCO_FAILED_CLIENT_RECONNECT_DELAY;
import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL;

/**
 *
 */
class ClientImpl extends TcpDiscoveryImpl {
    /** */
    private static final Object SPI_STOP = "SPI_STOP";

    /** */
    private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED";

    /** */
    private static final Object SPI_RECONNECT = "SPI_RECONNECT";

    /** */
    private static final long CLIENT_THROTTLE_RECONNECT_RESET_TIMEOUT = IgniteSystemProperties.getLong(
        IgniteSystemProperties.CLIENT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL,
        DFLT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL
    );

    /** Remote nodes. */
    private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap<>();

    /** */
    private final List<DiscoveryDataPacket> delayDiscoData = new ArrayList<>();

    /** Topology history. */
    private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();

    /** Remote nodes. */
    private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> pingFuts = new ConcurrentHashMap<>();

    /** Socket writer. */
    private SocketWriter sockWriter;

    /** */
    private SocketReader sockReader;

    /** */
    private volatile State state;

    /** Last message ID. */
    private volatile IgniteUuid lastMsgId;

    /** Current topology version. */
    private volatile long topVer;

    /** Join error. Contains error what occurs on join process. */
    private final AtomicReference<IgniteSpiException> joinErr = new AtomicReference<>();

    /** Joined latch. */
    private final CountDownLatch joinLatch = new CountDownLatch(1);

    /** Left latch. */
    private final CountDownLatch leaveLatch = new CountDownLatch(1);

    /** */
    private final ScheduledExecutorService executorService;

    /** */
    private MessageWorker msgWorker;

    /** Force fail message for local node. */
    private TcpDiscoveryNodeFailedMessage forceFailMsg;

    /** */
    @GridToStringExclude
    private int joinCnt;

    /**
     * @param adapter Adapter.
     */
    ClientImpl(TcpDiscoverySpi adapter) {
        super(adapter);

        String instanceName = adapter.ignite() == null || adapter.ignite().name() == null
            ? "client-node" : adapter.ignite().name();

        executorService = Executors.newSingleThreadScheduledExecutor(
            new IgniteThreadFactory(instanceName, "tcp-discovery-exec"));
    }

    /** {@inheritDoc} */
    @Override public void dumpDebugInfo(IgniteLogger log) {
        StringBuilder b = new StringBuilder(U.nl());

        b.append(">>>").append(U.nl());
        b.append(">>>").append("Dumping discovery SPI debug info.").append(U.nl());
        b.append(">>>").append(U.nl());

        b.append("Local node ID: ").append(getLocalNodeId()).append(U.nl()).append(U.nl());
        b.append("Local node: ").append(locNode).append(U.nl()).append(U.nl());

        b.append("Internal threads: ").append(U.nl());

        b.append("    Message worker: ").append(threadStatus(msgWorker.runner())).append(U.nl());
        b.append("    Socket reader: ").append(threadStatus(sockReader)).append(U.nl());
        b.append("    Socket writer: ").append(threadStatus(sockWriter)).append(U.nl());

        b.append(U.nl());

        b.append("Nodes: ").append(U.nl());

        for (ClusterNode node : allVisibleNodes())
            b.append("    ").append(node.id()).append(U.nl());

        b.append(U.nl());

        b.append("Stats: ").append(spi.stats).append(U.nl());

        U.quietAndInfo(log, b.toString());
    }

    /** {@inheritDoc} */
    @Override public void dumpRingStructure(IgniteLogger log) {
        ClusterNode[] serverNodes = getRemoteNodes().stream()
                .filter(node -> !node.isClient())
                .sorted(Comparator.comparingLong(ClusterNode::order))
                .toArray(ClusterNode[]::new);

        U.quietAndInfo(log, Arrays.toString(serverNodes));
    }

    /** {@inheritDoc} */
    @Override public long getCurrentTopologyVersion() {
        return topVer;
    }

    /** {@inheritDoc} */
    @Override public String getSpiState() {

        if (sockWriter.isOnline())
            return "connected";

        return "disconnected";
    }

    /** {@inheritDoc} */
    @Override public int getMessageWorkerQueueSize() {
        return msgWorker.queueSize();
    }

    /** {@inheritDoc} */
    @Override public UUID getCoordinator() {
        return null;
    }

    /** {@inheritDoc} */
    @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException {
        spi.initLocalNode(
            0,
            true);

        locNode = spi.locNode;

        // Marshal credentials for backward compatibility and security.
        marshalCredentials(locNode);

        sockWriter = new SocketWriter();
        sockWriter.start();

        sockReader = new SocketReader();
        sockReader.start();

        if (spi.ipFinder.isShared() && spi.isForceServerMode())
            registerLocalNodeAddress();

        msgWorker = new MessageWorker(log);

        new IgniteSpiThread(msgWorker.igniteInstanceName(), msgWorker.name(), log) {
            @Override protected void body() {
                msgWorker.run();
            }
        }.start();

        executorService.scheduleAtFixedRate(new MetricsSender(), spi.metricsUpdateFreq, spi.metricsUpdateFreq, MILLISECONDS);

        try {
            joinLatch.await();

            IgniteSpiException err = joinErr.get();

            if (err != null)
                throw err;
        }
        catch (InterruptedException e) {
            throw new IgniteSpiException("Thread has been interrupted.", e);
        }

        spi.printStartInfo();
    }

    /** {@inheritDoc} */
    @Override public void spiStop() throws IgniteSpiException {
        if (msgWorker != null && !msgWorker.isDone()) { // Should always be alive
            msgWorker.addMessage(SPI_STOP);

            try {
                if (!leaveLatch.await(spi.netTimeout, MILLISECONDS))
                    U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']');
            }
            catch (InterruptedException ignored) {
                // No-op.
            }
        }

        for (GridFutureAdapter<Boolean> fut : pingFuts.values())
            fut.onDone(false);

        rmtNodes.clear();

        if (msgWorker != null)
            U.interrupt(msgWorker.runner());

        U.interrupt(sockWriter);
        U.interrupt(sockReader);

        if (msgWorker != null)
            U.join(msgWorker.runner(), log);

        U.join(sockWriter, log);

        // SocketReader may loose interruption, this hack is made to overcome that case.
        while (!U.join(sockReader, log, 200))
            U.interrupt(sockReader);

        executorService.shutdownNow();

        spi.printStopInfo();
    }

    /** {@inheritDoc} */
    @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public Collection<ClusterNode> getRemoteNodes() {
        return U.arrayList(rmtNodes.values(), TcpDiscoveryNodesRing.VISIBLE_NODES);
    }

    /** {@inheritDoc} */
    @Override public boolean allNodesSupport(IgniteFeatures feature) {
        return IgniteFeatures.allNodesSupports(upcast(rmtNodes.values()), feature);
    }

    /** {@inheritDoc} */
    @Nullable @Override public ClusterNode getNode(UUID nodeId) {
        if (getLocalNodeId().equals(nodeId))
            return locNode;

        TcpDiscoveryNode node = rmtNodes.get(nodeId);

        return node != null && node.visible() ? node : null;
    }

    /** {@inheritDoc} */
    @Override public boolean pingNode(@NotNull final UUID nodeId) {
        if (nodeId.equals(getLocalNodeId()))
            return true;

        TcpDiscoveryNode node = rmtNodes.get(nodeId);

        if (node == null || !node.visible())
            return false;

        GridFutureAdapter<Boolean> fut = pingFuts.get(nodeId);

        if (fut == null) {
            fut = new GridFutureAdapter<>();

            GridFutureAdapter<Boolean> oldFut = pingFuts.putIfAbsent(nodeId, fut);

            if (oldFut != null)
                fut = oldFut;
            else {
                State state = this.state;

                if (spi.getSpiContext().isStopping() || state == STOPPED || state == SEGMENTED) {
                    if (pingFuts.remove(nodeId, fut))
                        fut.onDone(false);

                    return false;
                }
                else if (state == DISCONNECTED) {
                    if (pingFuts.remove(nodeId, fut))
                        fut.onDone(new IgniteClientDisconnectedCheckedException(null,
                            "Failed to ping node, client node disconnected."));
                }
                else {
                    final GridFutureAdapter<Boolean> finalFut = fut;

                    executorService.schedule(() -> {
                        if (pingFuts.remove(nodeId, finalFut)) {
                            if (ClientImpl.this.state == DISCONNECTED)
                                finalFut.onDone(new IgniteClientDisconnectedCheckedException(null,
                                    "Failed to ping node, client node disconnected."));
                            else
                                finalFut.onDone(false);
                        }
                    }, spi.netTimeout, MILLISECONDS);

                    sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
                }
            }
        }

        try {
            return fut.get();
        }
        catch (IgniteInterruptedCheckedException ignored) {
            return false;
        }
        catch (IgniteCheckedException e) {
            throw new IgniteSpiException(e);
        }
    }

    /** {@inheritDoc} */
    @Override public void disconnect() throws IgniteSpiException {
        if (msgWorker != null)
            U.interrupt(msgWorker.runner());

        U.interrupt(sockWriter);
        U.interrupt(sockReader);

        if (msgWorker != null)
            U.join(msgWorker.runner(), log);

        U.join(sockWriter, log);
        U.join(sockReader, log);

        leaveLatch.countDown();
        joinLatch.countDown();

        spi.getSpiContext().deregisterPorts();

        Collection<ClusterNode> rmts = getRemoteNodes();

        // This is restart/disconnection and remote nodes are not empty.
        // We need to fire FAIL event for each.
        DiscoverySpiListener lsnr = spi.lsnr;

        if (lsnr != null) {
            for (ClusterNode n : rmts) {
                rmtNodes.remove(n.id());

                Collection<ClusterNode> top = updateTopologyHistory(topVer + 1, null);

                lsnr.onDiscovery(
                    new DiscoveryNotification(
                        EVT_NODE_FAILED, topVer, n, top, new TreeMap<>(topHist), null, null
                    )
                ).get();
            }
        }

        rmtNodes.clear();
    }

    /** {@inheritDoc} */
    @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
        State state = this.state;

        if (state == SEGMENTED)
            throw new IgniteException("Failed to send custom message: client is segmented.");

        if (state == DISCONNECTED)
            throw new IgniteClientDisconnectedException(null, "Failed to send custom message: client is disconnected.");

        try {
            TcpDiscoveryCustomEventMessage msg;

            if (((CustomMessageWrapper)evt).delegate() instanceof DiscoveryServerOnlyCustomMessage)
                msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt,
                    U.marshal(spi.marshaller(), evt));
            else
                msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
                    U.marshal(spi.marshaller(), evt));

            Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
                .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString())
                .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
                    () -> locNode.consistentId().toString())
                .addTag(SpanTags.MESSAGE_CLASS, () -> ((CustomMessageWrapper)evt).delegate().getClass().getSimpleName())
                .addLog(() -> "Created");

            // This root span will be parent both from local and remote nodes.
            msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));

            sockWriter.sendMessage(msg);

            rootSpan.addLog(() -> "Sent").end();
        }
        catch (IgniteCheckedException e) {
            throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
        }
    }

    /** {@inheritDoc} */
    @Override public void failNode(UUID nodeId, @Nullable String warning) {
        TcpDiscoveryNode node = rmtNodes.get(nodeId);

        if (node != null) {
            TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
                node.id(),
                node.internalOrder());

            msg.warning(warning);

            msg.force(true);

            msgWorker.addMessage(msg);
        }
    }

    /**
     * @param prevAddr If reconnect is in progress, then previous address of the router the client was connected to
     *      and {@code null} otherwise.
     * @param timeout Timeout.
     * @param beforeEachSleep code to be run before each sleep span.
     * @param afterEachSleep code to be run before each sleep span.
     * @return Opened socket or {@code null} if timeout.
     * @throws InterruptedException If interrupted.
     * @throws IgniteSpiException If failed.
     * @see TcpDiscoverySpi#joinTimeout
     */
    @Nullable private T2<SocketStream, Boolean> joinTopology(
        InetSocketAddress prevAddr,
        long timeout,
        @Nullable Runnable beforeEachSleep,
        @Nullable Runnable afterEachSleep
    ) throws IgniteSpiException, InterruptedException {
        List<InetSocketAddress> addrs = null;

        long startNanos = System.nanoTime();

        while (true) {
            if (Thread.currentThread().isInterrupted())
                throw new InterruptedException();

            while (addrs == null || addrs.isEmpty()) {
                addrs = new ArrayList<>(spi.resolvedAddresses());

                if (!F.isEmpty(addrs)) {
                    if (log.isDebugEnabled())
                        log.debug("Resolved addresses from IP finder: " + addrs);
                }
                else {
                    if (timeout > 0 && U.millisSinceNanos(startNanos) > timeout)
                        return null;

                    LT.warn(log, "IP finder returned empty addresses list. " +
                            "Please check IP finder configuration" +
                            (spi.ipFinder instanceof TcpDiscoveryMulticastIpFinder ?
                                " and make sure multicast works on your network. " : ". ") +
                            "Will retry every " + spi.getReconnectDelay() + " ms. " +
                            "Change 'reconnectDelay' to configure the frequency of retries.", true);

                    sleepEx(spi.getReconnectDelay(), beforeEachSleep, afterEachSleep);
                }
            }

            // Process failed node last.
            if (prevAddr != null) {
                int idx = addrs.indexOf(prevAddr);

                if (idx != -1)
                    Collections.swap(addrs, idx, 0);
            }

            Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);

            boolean wait = false;

            for (int i = addrs.size() - 1; i >= 0; i--) {
                if (Thread.currentThread().isInterrupted())
                    throw new InterruptedException();

                InetSocketAddress addr = addrs.get(i);

                boolean recon = prevAddr != null;

                T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);

                if (sockAndRes == null) {
                    addrs.remove(i);

                    continue;
                }

                assert sockAndRes.get1() != null && sockAndRes.get2() != null : sockAndRes;

                Socket sock = sockAndRes.get1().socket();

                if (log.isDebugEnabled())
                    log.debug("Received response to join request [addr=" + addr + ", res=" + sockAndRes.get2() + ']');

                switch (sockAndRes.get2()) {
                    case RES_OK:
                        return new T2<>(sockAndRes.get1(), sockAndRes.get3());

                    case RES_CONTINUE_JOIN:
                    case RES_WAIT:
                        wait = true;

                        U.closeQuiet(sock);

                        break;

                    default:
                        if (log.isDebugEnabled())
                            log.debug("Received unexpected response to join request: " + sockAndRes.get2());

                        U.closeQuiet(sock);
                }
            }

            if (timeout > 0 && U.millisSinceNanos(startNanos) > timeout)
                return null;

            if (wait) {
                if (log.isDebugEnabled())
                    log.debug("Will wait before retry join.");

                sleepEx(spi.getReconnectDelay(), beforeEachSleep, afterEachSleep);
            }
            else if (addrs.isEmpty()) {
                LT.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
                    "every " + spi.getReconnectDelay() + " ms; change 'reconnectDelay' to configure the frequency " +
                    "of retries): " + toOrderedList(addrs0), true);

                sleepEx(spi.getReconnectDelay(), beforeEachSleep, afterEachSleep);
            }
        }
    }

    /** */
    private static void sleepEx(long millis, Runnable before, Runnable after) throws InterruptedException {
        if (before != null)
            before.run();

        try {
            Thread.sleep(millis);
        }
        finally {
            if (after != null)
                after.run();
        }
    }

    /**
     * @param recon {@code True} if reconnects.
     * @param addr Address.
     * @return Socket, connect response and client acknowledge support flag.
     */
    @Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon,
        InetSocketAddress addr) {
        assert addr != null;

        if (log.isDebugEnabled())
            log.debug("Send join request [addr=" + addr + ", reconnect=" + recon +
                ", locNodeId=" + getLocalNodeId() + ']');

        Collection<Throwable> errs = null;

        long ackTimeout0 = spi.getAckTimeout();

        int reconCnt = 0;

        int connectAttempts = 1;

        int sslConnectAttempts = 3;

        UUID locNodeId = getLocalNodeId();

        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true);

        DiscoveryDataPacket discoveryData = null;

        while (true) {
            boolean openSock = false;

            Socket sock = null;

            try {
                long tsNanos = System.nanoTime();

                sock = spi.openSocket(addr, timeoutHelper);

                openSock = true;

                TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);

                req.client(true);

                spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));

                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);

                UUID rmtNodeId = res.creatorNodeId();

                assert rmtNodeId != null;
                assert !getLocalNodeId().equals(rmtNodeId);

                locNode.clientRouterNodeId(rmtNodeId);

                tsNanos = System.nanoTime();

                TcpDiscoveryAbstractMessage msg;

                if (!recon) {
                    TcpDiscoveryNode node = locNode;

                    if (locNode.order() > 0) {
                        node = locNode.clientReconnectNode(spi.locNodeAttrs);

                        marshalCredentials(node);
                    }

                    if (discoveryData == null) {
                        DiscoveryDataPacket dataPacket = new DiscoveryDataPacket(getLocalNodeId());

                        dataPacket.joiningNodeClient(true);

                        discoveryData = spi.collectExchangeData(dataPacket);
                    }

                    TcpDiscoveryJoinRequestMessage joinReqMsg = new TcpDiscoveryJoinRequestMessage(node, discoveryData);

                    TcpDiscoveryNode nodef = node;

                    joinReqMsg.spanContainer().span(
                        tracing.create(TraceableMessagesTable.traceName(joinReqMsg.getClass()))
                            .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> nodef.id().toString())
                            .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
                                () -> nodef.consistentId().toString())
                            .addLog(() -> "Created")
                            .end()
                    );

                    msg = joinReqMsg;

                    // During marshalling, SPI didn't know whether all nodes support compression as we didn't join yet.
                    // The only way to know is passing flag directly with handshake response.
                    if (!res.isDiscoveryDataPacketCompression())
                        ((TcpDiscoveryJoinRequestMessage)msg).gridDiscoveryData().unzipData(log);
                }
                else
                    msg = new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId);

                msg.client(true);

                if (msg instanceof TraceableMessage)
                    tracing.messages().beforeSend((TraceableMessage) msg);

                spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));

                spi.stats.onMessageSent(msg, U.millisSinceNanos(tsNanos));

                if (log.isDebugEnabled())
                    log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr +
                        ", rmtNodeId=" + rmtNodeId + ']');

                return new T3<>(new SocketStream(sock),
                    spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)),
                    res.clientAck());
            }
            catch (IOException | IgniteCheckedException e) {
                U.closeQuiet(sock);

                if (log.isDebugEnabled())
                    log.error("Exception on joining: " + e.getMessage(), e);

                onException("Exception on joining: " + e.getMessage(), e);

                if (errs == null)
                    errs = new ArrayList<>();

                errs.add(e);

                if (X.hasCause(e, SSLException.class)) {
                    if (--sslConnectAttempts == 0)
                        throw new IgniteSpiException("Unable to establish secure connection. " +
                            "Was remote cluster configured with SSL? [rmtAddr=" + addr + ", errMsg=\"" + e.getMessage() + "\"]", e);

                    continue;
                }

                if (X.hasCause(e, StreamCorruptedException.class)) {
                    // StreamCorruptedException could be caused by remote node failover
                    if (connectAttempts < 2) {
                        connectAttempts++;

                        continue;
                    }

                    if (log.isDebugEnabled())
                        log.debug("Connect failed with StreamCorruptedException, skip address: " + addr);

                    break;
                }

                if (timeoutHelper.checkFailureTimeoutReached(e))
                    break;

                if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
                    break;

                if (!openSock) {
                    // Reconnect for the second time, if connection is not established.
                    if (connectAttempts < 2) {
                        connectAttempts++;

                        continue;
                    }

                    break; // Don't retry if we can not establish connection.
                }

                if (!spi.failureDetectionTimeoutEnabled() && (e instanceof SocketTimeoutException ||
                    X.hasCause(e, SocketTimeoutException.class))) {
                    ackTimeout0 *= 2;

                    if (!checkAckTimeout(ackTimeout0))
                        break;
                }
            }
        }

        if (log.isDebugEnabled())
            log.debug("Failed to join to address [addr=" + addr + ", recon=" + recon + ", errs=" + errs + ']');

        return null;
    }

    /**
     * Marshalls credentials with discovery SPI marshaller (will replace attribute value).
     *
     * @param node Node to marshall credentials for.
     * @throws IgniteSpiException If marshalling failed.
     */
    private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException {
        try {
            // Use security-unsafe getter.
            Map<String, Object> attrs = new HashMap<>(node.getAttributes());

            Object creds = attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);

            assert !(creds instanceof byte[]);

            attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS, U.marshal(spi.marshaller(), creds));

            node.setAttributes(attrs);
        }
        catch (IgniteCheckedException e) {
            throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e);
        }
    }

    /**
     * @param topVer New topology version.
     * @param msg Discovery message.
     * @return Latest topology snapshot.
     */
    private Collection<ClusterNode> updateTopologyHistory(long topVer, @Nullable TcpDiscoveryAbstractMessage msg) {
        this.topVer = topVer;

        if (!topHist.isEmpty() && topVer <= topHist.lastKey()) {
            if (log.isDebugEnabled())
                log.debug("Skip topology update since topology already updated [msg=" + msg +
                    ", lastHistKey=" + topHist.lastKey() +
                    ", topVer=" + topVer +
                    ", locNode=" + locNode + ']');

            Collection<ClusterNode> top = topHist.get(topVer);

            assert top != null : "Failed to find topology history [msg=" + msg + ", hist=" + topHist + ']';

            return top;
        }

        NavigableSet<ClusterNode> allNodes = allVisibleNodes();

        if (!topHist.containsKey(topVer)) {
            assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
                "lastVer=" + (topHist.isEmpty() ? null : topHist.lastKey()) +
                ", newVer=" + topVer +
                ", locNode=" + locNode +
                ", msg=" + msg;

            topHist.put(topVer, allNodes);

            if (topHist.size() > spi.topHistSize)
                topHist.pollFirstEntry();

            assert topHist.lastKey() == topVer;
            assert topHist.size() <= spi.topHistSize;
        }

        return allNodes;
    }

    /**
     * @return All nodes.
     */
    private NavigableSet<ClusterNode> allVisibleNodes() {
        NavigableSet<ClusterNode> allNodes = new TreeSet<>();

        for (TcpDiscoveryNode node : rmtNodes.values()) {
            if (node.visible())
                allNodes.add(node);
        }

        allNodes.add(locNode);

        return allNodes;
    }

    /** {@inheritDoc} */
    @Override void simulateNodeFailure() {
        U.warn(log, "Simulating client node failure: " + getLocalNodeId());

        U.interrupt(sockWriter);

        if (msgWorker != null)
            U.interrupt(msgWorker.runner());

        U.join(sockWriter, log);

        if (msgWorker != null)
            U.join(msgWorker.runner(), log);
    }

    /** {@inheritDoc} */
    @Override public void reconnect() throws IgniteSpiException {
        msgWorker.addMessage(SPI_RECONNECT);
    }

    /** {@inheritDoc} */
    @Override public void brakeConnection() {
        SocketStream sockStream = msgWorker.currSock;

        if (sockStream != null)
            U.closeQuiet(sockStream.socket());
    }

    /** {@inheritDoc} */
    @Override public void checkRingLatency(int maxHops) {
        TcpDiscoveryRingLatencyCheckMessage msg = new TcpDiscoveryRingLatencyCheckMessage(getLocalNodeId(), maxHops);

        if (log.isInfoEnabled())
            log.info("Latency check initiated: " + msg.id());

        sockWriter.sendMessage(msg);
    }

    /** {@inheritDoc} */
    @Override protected Collection<IgniteSpiThread> threads() {
        ArrayList<IgniteSpiThread> res = new ArrayList<>();

        res.add(sockWriter);

        MessageWorker msgWorker0 = msgWorker;

        if (msgWorker0 != null) {
            Thread runner = msgWorker0.runner();

            if (runner instanceof IgniteSpiThread)
                res.add((IgniteSpiThread)runner);
        }

        return res;
    }

    /** {@inheritDoc} */
    @Override public void updateMetrics(UUID nodeId,
        ClusterMetrics metrics,
        Map<Integer, CacheMetrics> cacheMetrics,
        long tsNanos)
    {
        boolean isLocDaemon = spi.locNode.isDaemon();

        assert nodeId != null;
        assert metrics != null;
        assert isLocDaemon || cacheMetrics != null;

        TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);

        if (node != null && node.visible()) {
            node.setMetrics(metrics);

            if (!isLocDaemon)
                node.setCacheMetrics(cacheMetrics);

            node.lastUpdateTimeNanos(tsNanos);

            msgWorker.notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes(), null);
        }
        else if (log.isDebugEnabled())
            log.debug("Received metrics from unknown node: " + nodeId);
    }

    /**
     * FOR TEST PURPOSE ONLY!
     */
    @SuppressWarnings("BusyWait")
    public void waitForClientMessagePrecessed() {
        Object last = msgWorker.queue.peekLast();

        while (last != null && !msgWorker.isDone() && msgWorker.queue.contains(last)) {
            try {
                Thread.sleep(10);
            }
            catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @param err Error.
     */
    private void joinError(IgniteSpiException err) {
        assert err != null;

        joinErr.compareAndSet(null, err);

        joinLatch.countDown();
    }

    /** */
    private WorkersRegistry getWorkersRegistry() {
        Ignite ignite = spi.ignite();

        return ignite instanceof IgniteEx ? ((IgniteEx)ignite).context().workersRegistry() : null;
    }

    /**
     * Metrics sender.
     */
    private class MetricsSender implements Runnable {
        /** {@inheritDoc} */
        @Override public void run() {
            if (!spi.getSpiContext().isStopping() && sockWriter.isOnline()) {
                TcpDiscoveryClientMetricsUpdateMessage msg = new TcpDiscoveryClientMetricsUpdateMessage(
                    getLocalNodeId(),
                    spi.metricsProvider.metrics());

                msg.client(true);

                sockWriter.sendMessage(msg);
            }
        }
    }

    /**
     * Socket reader.
     */
    private class SocketReader extends IgniteSpiThread {
        /** */
        private final Object mux = new Object();

        /** */
        private SocketStream sockStream;

        /** */
        private UUID rmtNodeId;

        /** */
        private CountDownLatch stopReadLatch;

        /**
         */
        SocketReader() {
            super(spi.ignite().name(), "tcp-client-disco-sock-reader-[]", log);
        }

        /**
         * @param sockStream Socket.
         * @param rmtNodeId Rmt node id.
         */
        void setSocket(SocketStream sockStream, UUID rmtNodeId) {
            synchronized (mux) {
                this.sockStream = sockStream;

                this.rmtNodeId = rmtNodeId;

                mux.notifyAll();
            }
        }

        /**
         * @throws InterruptedException If interrupted.
         */
        private void forceStopRead() throws InterruptedException {
            CountDownLatch stopReadLatch;

            synchronized (mux) {
                SocketStream stream = sockStream;

                if (stream == null)
                    return;

                this.stopReadLatch = stopReadLatch = new CountDownLatch(1);

                U.closeQuiet(stream.socket());

                this.sockStream = null;
                this.rmtNodeId = null;

                mux.notifyAll();
            }

            stopReadLatch.await();
        }

        /** {@inheritDoc} */
        @Override protected void body() throws InterruptedException {
            while (!isInterrupted()) {
                SocketStream sockStream;
                UUID rmtNodeId;

                // Disconnected from router node.
                U.enhanceThreadName("");

                synchronized (mux) {
                    if (stopReadLatch != null) {
                        stopReadLatch.countDown();

                        stopReadLatch = null;
                    }

                    if (this.sockStream == null) {
                        mux.wait();

                        continue;
                    }

                    sockStream = this.sockStream;
                    rmtNodeId = this.rmtNodeId;
                }

                Socket sock = sockStream.socket();

                U.enhanceThreadName(U.id8(rmtNodeId)
                    + ' ' + sockStream.sock.getInetAddress().getHostAddress()
                    + ":" + sockStream.sock.getPort());

                try {
                    InputStream in = sockStream.stream();

                    assert sock.getKeepAlive() && sock.getTcpNoDelay() : "Socket wasn't configured properly:" +
                        " KeepAlive " + sock.getKeepAlive() +
                        " TcpNoDelay " + sock.getTcpNoDelay();

                    while (!isInterrupted()) {
                        TcpDiscoveryAbstractMessage msg;

                        try {
                            msg = U.unmarshal(spi.marshaller(), in, U.resolveClassLoader(spi.ignite().configuration()));
                        }
                        catch (IgniteCheckedException e) {
                            if (log.isDebugEnabled())
                                U.error(log, "Failed to read message [sock=" + sock + ", " +
                                    "locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']', e);

                            // Exists possibility that exception raised on interruption.
                            if (X.hasCause(e, InterruptedException.class, InterruptedIOException.class,
                                IgniteInterruptedCheckedException.class, IgniteInterruptedException.class))
                                interrupt();

                            IOException ioEx = X.cause(e, IOException.class);

                            if (ioEx != null)
                                throw ioEx;

                            ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class);

                            if (clsNotFoundEx != null)
                                LT.warn(log, "Failed to read message due to ClassNotFoundException " +
                                    "(make sure same versions of all classes are available on all nodes) " +
                                    "[rmtNodeId=" + rmtNodeId + ", err=" + clsNotFoundEx.getMessage() + ']');
                            else
                                LT.error(log, e, "Failed to read message [sock=" + sock + ", locNodeId=" +
                                    getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']');

                            continue;
                        }

                        msg.senderNodeId(rmtNodeId);

                        DebugLogger debugLog = messageLogger(msg);

                        if (debugLog.isDebugEnabled())
                            debugLog.debug("Message has been received: " + msg);

                        spi.stats.onMessageReceived(msg);

                        boolean ack = msg instanceof TcpDiscoveryClientAckResponse;

                        if (!ack)
                            msgWorker.addMessage(msg);
                        else
                            sockWriter.ackReceived((TcpDiscoveryClientAckResponse)msg);
                    }
                }
                catch (IOException e) {
                    msgWorker.addMessage(new SocketClosedMessage(sockStream));

                    if (log.isDebugEnabled())
                        U.error(log, "Connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ']', e);
                }
                finally {
                    U.closeQuiet(sock);

                    synchronized (mux) {
                        if (this.sockStream == sockStream) {
                            this.sockStream = null;
                            this.rmtNodeId = null;
                        }
                    }
                }
            }
        }
    }

    /**
     *
     */
    private class SocketWriter extends IgniteSpiThread {
        /** */
        private final Object mux = new Object();

        /** */
        private Socket sock;

        /** */
        private boolean clientAck;

        /** */
        private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();

        /** */
        private final long sockTimeout;

        /** */
        private TcpDiscoveryAbstractMessage unackedMsg;

        /** */
        private CountDownLatch forceLeaveLatch;

        /**
         *
         */
        SocketWriter() {
            super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);

            sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
                spi.getSocketTimeout();
        }

        /**
         * @param msg Message.
         */
        private void sendMessage(TcpDiscoveryAbstractMessage msg) {
            synchronized (mux) {
                queue.add(msg);

                mux.notifyAll();
            }
        }

        /**
         * Sends {@link TcpDiscoveryNodeLeftMessage} and closes socket.
         *
         * @throws InterruptedException If interrupted.
         */
        private void forceLeave() throws InterruptedException {
            CountDownLatch forceLeaveLatch;

            synchronized (mux) {
                // If writer was stopped.
                if (sock == null)
                    return;

                this.forceLeaveLatch = forceLeaveLatch = new CountDownLatch(1);

                unackedMsg = null;

                mux.notifyAll();
            }

            forceLeaveLatch.await();
        }

        /**
         * @param sock Socket.
         * @param clientAck {@code True} is server supports client message acknowlede.
         */
        private void setSocket(Socket sock, boolean clientAck) {
            synchronized (mux) {
                this.sock = sock;

                this.clientAck = clientAck;

                unackedMsg = null;

                mux.notifyAll();
            }
        }

        /**
         * @return {@code True} if connection is alive.
         */
        public boolean isOnline() {
            synchronized (mux) {
                return sock != null;
            }
        }

        /**
         * @param res Acknowledge response.
         */
        void ackReceived(TcpDiscoveryClientAckResponse res) {
            synchronized (mux) {
                if (unackedMsg != null) {
                    assert unackedMsg.id().equals(res.messageId()) : unackedMsg;

                    unackedMsg = null;
                }

                mux.notifyAll();
            }
        }

        /** {@inheritDoc} */
        @Override protected void body() throws InterruptedException {
            TcpDiscoveryAbstractMessage msg = null;

            while (!Thread.currentThread().isInterrupted()) {
                Socket sock;

                synchronized (mux) {
                    sock = this.sock;

                    if (sock == null) {
                        mux.wait();

                        continue;
                    }

                    if (forceLeaveLatch != null) {
                        msg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());

                        msg.client(true);

                        try {
                            spi.writeToSocket(
                                sock,
                                msg,
                                sockTimeout);
                        }
                        catch (IOException | IgniteCheckedException e) {
                            if (log.isDebugEnabled()) {
                                log.debug("Failed to send TcpDiscoveryNodeLeftMessage on force leave [msg=" + msg +
                                    ", err=" + e.getMessage() + ']');
                            }
                        }

                        U.closeQuiet(sock);

                        this.sock = null;

                        clear();

                        continue;
                    }
                    else {
                        if (msg == null)
                            msg = queue.poll();

                        if (msg == null) {
                            mux.wait();

                            continue;
                        }
                    }
                }

                for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs)
                    msgLsnr.apply(msg);

                boolean ack = clientAck && !(msg instanceof TcpDiscoveryPingResponse);

                try {
                    if (ack) {
                        synchronized (mux) {
                            assert unackedMsg == null : "Unacked=" + unackedMsg + ", received=" + msg;

                            unackedMsg = msg;
                        }
                    }

                    spi.writeToSocket(
                        sock,
                        msg,
                        sockTimeout);

                    IgniteUuid latencyCheckId = msg instanceof TcpDiscoveryRingLatencyCheckMessage ?
                        msg.id() : null;

                    if (latencyCheckId != null && log.isInfoEnabled())
                        log.info("Latency check message has been written to socket: " + latencyCheckId);

                    msg = null;

                    if (ack) {
                        long waitEndNanos = System.nanoTime() + U.millisToNanos(spi.failureDetectionTimeoutEnabled() ?
                            spi.failureDetectionTimeout() : spi.getAckTimeout());

                        TcpDiscoveryAbstractMessage unacked;

                        synchronized (mux) {
                            long nowNanos = System.nanoTime();

                            while (unackedMsg != null && waitEndNanos - nowNanos > 0) {
                                mux.wait(U.nanosToMillis(waitEndNanos - nowNanos));

                                nowNanos = System.nanoTime();
                            }

                            unacked = unackedMsg;

                            unackedMsg = null;
                        }

                        if (unacked != null) {
                            if (log.isDebugEnabled())
                                log.debug("Failed to get acknowledge for message, will try to reconnect " +
                                "[msg=" + unacked +
                                (spi.failureDetectionTimeoutEnabled() ?
                                ", failureDetectionTimeout=" + spi.failureDetectionTimeout() :
                                ", timeout=" + spi.getAckTimeout()) + ']');

                            throw new IOException("Failed to get acknowledge for message: " + unacked);
                        }

                        if (latencyCheckId != null && log.isInfoEnabled())
                            log.info("Latency check message has been acked: " + latencyCheckId);
                    }
                }
                catch (InterruptedException ignored) {
                    if (log.isDebugEnabled())
                        log.debug("Client socket writer interrupted.");

                    return;
                }
                catch (Exception e) {
                    if (spi.getSpiContext().isStopping()) {
                        if (log.isDebugEnabled())
                            log.debug("Failed to send message, node is stopping [msg=" + msg + ", err=" + e + ']');
                    }
                    else
                        U.error(log, "Failed to send message: " + msg, e);

                    U.closeQuiet(sock);

                    synchronized (mux) {
                        if (sock == this.sock)
                            this.sock = null; // Connection has dead.

                        clear();
                    }
                }
            }
        }

        /**
         *
         */
        private void clear() {
            assert Thread.holdsLock(mux);

            queue.clear();
            unackedMsg = null;

            CountDownLatch forceLeaveLatch = this.forceLeaveLatch;

            if (forceLeaveLatch != null) {
                this.forceLeaveLatch = null;

                forceLeaveLatch.countDown();
            }
        }
    }

    /**
     *
     */
    private class Reconnector extends IgniteSpiThread {
        /** */
        private volatile SocketStream sockStream;

        /** */
        private boolean clientAck;

        /** */
        private final boolean join;

        /** */
        private final InetSocketAddress prevAddr;

        /**
         * @param join {@code True} if reconnects during join.
         * @param prevAddr Address of the node, that this client was previously connected to.
         */
        protected Reconnector(boolean join, InetSocketAddress prevAddr) {
            super(spi.ignite().name(), "tcp-client-disco-reconnector", log);

            this.join = join;
            this.prevAddr = prevAddr;
        }

        /**
         *
         */
        public void cancel() {
            interrupt();

            SocketStream sockStream = this.sockStream;

            if (sockStream != null)
                U.closeQuiet(sockStream.socket());
        }

        /** {@inheritDoc} */
        @Override protected void body() throws InterruptedException {
            boolean success = false;

            Exception err = null;

            long timeout = join ? spi.joinTimeout : spi.netTimeout;

            long startNanos = System.nanoTime();

            if (log.isDebugEnabled())
                log.debug("Started reconnect process [join=" + join + ", timeout=" + timeout + ']');

            try {
                while (true) {
                    T2<SocketStream, Boolean> joinRes = joinTopology(prevAddr, timeout, null, null);

                    if (joinRes == null) {
                        if (join) {
                            joinError(new IgniteSpiException("Join process timed out, connection failed and " +
                                "failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
                                "[joinTimeout=" + spi.joinTimeout + ']'));
                        }
                        else
                            U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout'" +
                                " configuration property) [networkTimeout=" + spi.netTimeout + ']');

                        return;
                    }

                    sockStream = joinRes.get1();
                    clientAck = joinRes.get2();

                    Socket sock = sockStream.socket();

                    if (isInterrupted())
                        throw new InterruptedException();

                    int oldTimeout = 0;

                    try {
                        oldTimeout = sock.getSoTimeout();

                        sock.setSoTimeout((int)spi.netTimeout);

                        InputStream in = sockStream.stream();

                        assert sock.getKeepAlive() && sock.getTcpNoDelay() : "Socket wasn't configured properly:" +
                            " KeepAlive " + sock.getKeepAlive() +
                            " TcpNoDelay " + sock.getTcpNoDelay();

                        List<TcpDiscoveryAbstractMessage> msgs = null;

                        while (!isInterrupted()) {
                            TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in,
                                U.resolveClassLoader(spi.ignite().configuration()));

                            if (msg instanceof TcpDiscoveryClientReconnectMessage) {
                                TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;

                                if (res.creatorNodeId().equals(getLocalNodeId())) {
                                    if (log.isDebugEnabled())
                                        log.debug("Received reconnect response [success=" + res.success() +
                                            ", msg=" + msg + ']');

                                    if (res.success()) {
                                        msgWorker.addMessage(res);

                                        if (msgs != null) {
                                            for (TcpDiscoveryAbstractMessage msg0 : msgs)
                                                msgWorker.addMessage(msg0);
                                        }

                                        success = true;

                                        return;
                                    }
                                    else
                                        return;
                                }
                            }
                            else if (spi.ensured(msg)) {
                                if (msgs == null)
                                    msgs = new ArrayList<>();

                                msgs.add(msg);
                            }
                        }
                    }
                    catch (IOException | IgniteCheckedException e) {
                        U.closeQuiet(sock);

                        if (log.isDebugEnabled())
                            log.error("Reconnect error [join=" + join + ", timeout=" + timeout + ']', e);

                        if (timeout > 0 && U.millisSinceNanos(startNanos) > timeout) {
                            String msg = join ? "Failed to connect to cluster (consider increasing 'joinTimeout' " +
                                "configuration  property) [joinTimeout=" + spi.joinTimeout + ", err=" + e + ']' :
                                "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
                                    "configuration  property) [networkTimeout=" + spi.netTimeout + ", err=" + e + ']';

                            U.warn(log, msg);

                            throw e;
                        }
                        else
                            U.warn(log, "Failed to reconnect to cluster (will retry): " + e);
                    }
                    finally {
                        if (success)
                            sock.setSoTimeout(oldTimeout);
                    }
                }
            }
            catch (IOException | IgniteCheckedException e) {
                err = e;

                success = false;

                U.error(log, "Failed to reconnect", e);
            }
            finally {
                if (!success) {
                    SocketStream sockStream = this.sockStream;

                    if (sockStream != null)
                        U.closeQuiet(sockStream.socket());

                    if (join)
                        joinError(new IgniteSpiException("Failed to connect to cluster, connection failed and failed " +
                            "to reconnect.", err));
                    else
                        msgWorker.addMessage(SPI_RECONNECT_FAILED);
                }
            }
        }
    }

    /**
     * Message worker.
     */
    protected class MessageWorker extends GridWorker {
        /** Message queue. */
        private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>();

        /** */
        private SocketStream currSock;

        /** */
        private Reconnector reconnector;

        /** */
        private boolean nodeAdded;

        /** */
        private long lastReconnectTsNanos = -1;

        /** */
        private long currentReconnectDelay = -1;

        /**
         * @param log Logger.
         */
        private MessageWorker(IgniteLogger log) {
            super(spi.ignite().name(), "tcp-client-disco-msg-worker", log, getWorkersRegistry());
        }

        /** {@inheritDoc} */
        @Override protected void body() throws InterruptedException {
            state = STARTING;

            updateHeartbeat();

            try {
                tryJoin();

                while (true) {
                    onIdle();

                    Object msg;

                    blockingSectionBegin();

                    try {
                        msg = queue.take();
                    }
                    finally {
                        blockingSectionEnd();
                    }

                    if (msg instanceof JoinTimeout) {
                        int joinCnt0 = ((JoinTimeout)msg).joinCnt;

                        if (joinCnt == joinCnt0) {
                            if (state == STARTING) {
                                joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
                                    "join request (consider increasing 'joinTimeout' configuration property) " +
                                    "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']'));

                                break;
                            }
                            else if (state == DISCONNECTED) {
                                if (log.isDebugEnabled())
                                    log.debug("Failed to reconnect, local node segmented " +
                                        "[joinTimeout=" + spi.joinTimeout + ']');

                                state = SEGMENTED;

                                notifyDiscovery(
                                    EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null);
                            }
                        }
                    }
                    else if (msg == SPI_STOP) {
                        boolean connected = state == CONNECTED;

                        state = STOPPED;

                        assert spi.getSpiContext().isStopping();

                        if (connected && currSock != null) {
                            TcpDiscoveryNodeLeftMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());

                            leftMsg.client(true);

                            Span rootSpan = tracing.create(TraceableMessagesTable.traceName(leftMsg.getClass()))
                                .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> locNode.id().toString())
                                .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
                                    () -> locNode.consistentId().toString())
                                .addLog(() -> "Created");

                            leftMsg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));

                            sockWriter.sendMessage(leftMsg);

                            rootSpan.addLog(() -> "Sent").end();
                        }
                        else
                            leaveLatch.countDown();
                    }
                    else if (msg == SPI_RECONNECT) {
                        if (state == CONNECTED) {
                            if (reconnector != null) {
                                reconnector.cancel();
                                reconnector.join();

                                reconnector = null;
                            }

                            sockWriter.forceLeave();
                            sockReader.forceStopRead();

                            currSock = null;

                            queue.clear();

                            onDisconnected();

                            UUID newId = UUID.randomUUID();

                            U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due " +
                                "to network problems [newId=" + newId +
                                ", prevId=" + locNode.id() +
                                ", locNode=" + locNode + ']');

                            locNode.onClientDisconnected(newId);

                            throttleClientReconnect();

                            tryJoin();
                        }
                    }
                    else if (msg instanceof TcpDiscoveryNodeFailedMessage &&
                        ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) {
                        TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg;

                        assert msg0.force() : msg0;

                        forceFailMsg = msg0;
                    }
                    else if (msg instanceof SocketClosedMessage) {
                        if (((SocketClosedMessage)msg).sock == currSock) {
                            Socket sock = currSock.sock;

                            InetSocketAddress prevAddr = new InetSocketAddress(sock.getInetAddress(), sock.getPort());

                            currSock = null;

                            boolean join = joinLatch.getCount() > 0;

                            if (spi.getSpiContext().isStopping() || state == SEGMENTED) {
                                leaveLatch.countDown();

                                if (join) {
                                    joinError(new IgniteSpiException("Failed to connect to cluster: socket closed."));

                                    break;
                                }
                            }
                            else {
                                if (forceFailMsg != null) {
                                    if (log.isDebugEnabled()) {
                                        log.debug("Connection closed, local node received force fail message, " +
                                            "will not try to restore connection");
                                    }

                                    queue.addFirst(SPI_RECONNECT_FAILED);
                                }
                                else {
                                    if (log.isDebugEnabled())
                                        log.debug("Connection closed, will try to restore connection.");

                                    assert reconnector == null;

                                    reconnector = new Reconnector(join, prevAddr);
                                    reconnector.start();
                                }
                            }
                        }
                    }
                    else if (msg == SPI_RECONNECT_FAILED) {
                        if (reconnector != null) {
                            reconnector.cancel();
                            reconnector.join();

                            reconnector = null;
                        }
                        else
                            assert forceFailMsg != null;

                        if (spi.isClientReconnectDisabled()) {
                            if (state != SEGMENTED && state != STOPPED) {
                                if (forceFailMsg != null) {
                                    U.quietAndWarn(log, "Local node was dropped from cluster due to network problems " +
                                        "[nodeInitiatedFail=" + forceFailMsg.creatorNodeId() +
                                        ", msg=" + forceFailMsg.warning() + ']');
                                }

                                if (log.isDebugEnabled()) {
                                    log.debug("Failed to restore closed connection, reconnect disabled, " +
                                        "local node segmented [networkTimeout=" + spi.netTimeout + ']');
                                }

                                state = SEGMENTED;

                                notifyDiscovery(
                                    EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null);
                            }
                        }
                        else {
                            if (state == STARTING || state == CONNECTED) {
                                if (log.isDebugEnabled()) {
                                    log.debug("Failed to restore closed connection, will try to reconnect " +
                                        "[networkTimeout=" + spi.netTimeout +
                                        ", joinTimeout=" + spi.joinTimeout +
                                        ", failMsg=" + forceFailMsg + ']');
                                }

                                onDisconnected();
                            }

                            UUID newId = UUID.randomUUID();

                            if (forceFailMsg != null) {
                                long delay = IgniteSystemProperties.getLong(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY,
                                    DFLT_DISCO_FAILED_CLIENT_RECONNECT_DELAY);

                                if (delay > 0) {
                                    U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " +
                                        "will try to reconnect with new id after " + delay + "ms (reconnect delay " +
                                        "can be changed using IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY system " +
                                        "property) [" +
                                        "newId=" + newId +
                                        ", prevId=" + locNode.id() +
                                        ", locNode=" + locNode +
                                        ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() +
                                        ", msg=" + forceFailMsg.warning() + ']');

                                    Thread.sleep(delay);
                                }
                                else {
                                    U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " +
                                        "will try to reconnect with new id [" +
                                        "newId=" + newId +
                                        ", prevId=" + locNode.id() +
                                        ", locNode=" + locNode +
                                        ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() +
                                        ", msg=" + forceFailMsg.warning() + ']');
                                }

                                forceFailMsg = null;
                            }
                            else if (log.isInfoEnabled()) {
                                log.info("Client node disconnected from cluster, will try to reconnect with new id " +
                                    "[newId=" + newId + ", prevId=" + locNode.id() + ", locNode=" + locNode + ']');
                            }

                            locNode.onClientDisconnected(newId);

                            tryJoin();
                        }
                    }
                    else {
                        TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;

                        if (joining()) {
                            IgniteSpiException err = null;

                            if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage)
                                err = spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
                            else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage)
                                err = spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
                            //TODO: https://issues.apache.org/jira/browse/IGNITE-9829
                            else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage)
                                err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);

                            if (err != null) {
                                if (state == DISCONNECTED) {
                                    U.error(log, "Failed to reconnect, segment local node.", err);

                                    state = SEGMENTED;

                                    notifyDiscovery(
                                        EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null);
                                }
                                else
                                    joinError(err);

                                cancel();

                                break;
                            }
                        }

                        processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg);
                    }
                }
            }
            catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            catch (Throwable t) {
                if (spi.ignite() instanceof IgniteEx)
                    ((IgniteEx)spi.ignite()).context().failure().process(new FailureContext(CRITICAL_ERROR, t));
            }
            finally {
                SocketStream currSock = this.currSock;

                if (currSock != null)
                    U.closeQuiet(currSock.socket());

                if (joinLatch.getCount() > 0)
                    joinError(new IgniteSpiException("Some error in join process.")); // This should not occur.

                if (reconnector != null) {
                    reconnector.cancel();

                    reconnector.join();
                }
            }
        }

        /**
         * Wait random delay before trying to reconnect. Delay will grow exponentially every time client is forced to
         * reconnect, but only if all these reconnections happened in small period of time (2 minutes). Maximum delay
         * could be configured with {@link IgniteSpiAdapter#clientFailureDetectionTimeout()}, default value is
         * {@link IgniteConfiguration#DFLT_CLIENT_FAILURE_DETECTION_TIMEOUT}.
         *
         * @throws InterruptedException If thread is interrupted.
         */
        private void throttleClientReconnect() throws InterruptedException {
            if (currentReconnectDelay == -1
                || U.millisSinceNanos(lastReconnectTsNanos) > CLIENT_THROTTLE_RECONNECT_RESET_TIMEOUT
            )
                currentReconnectDelay = 0; // Skip pause on first reconnect.
            else if (currentReconnectDelay == 0)
                currentReconnectDelay = 200;
            else {
                long maxDelay = spi.failureDetectionTimeoutEnabled()
                    ? spi.clientFailureDetectionTimeout()
                    : IgniteConfiguration.DFLT_CLIENT_FAILURE_DETECTION_TIMEOUT;

                currentReconnectDelay = Math.min(maxDelay, (int)(currentReconnectDelay * 1.5));
            }

            if (currentReconnectDelay != 0) {
                ThreadLocalRandom random = ThreadLocalRandom.current();

                Thread.sleep(random.nextLong(currentReconnectDelay / 2, currentReconnectDelay));
            }

            lastReconnectTsNanos = System.nanoTime();
        }

        /**
         *
         */
        private void onDisconnected() {
            state = DISCONNECTED;

            nodeAdded = false;

            delayDiscoData.clear();

            notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes(), null);

            IgniteClientDisconnectedCheckedException err =
                new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
                    "client node disconnected.");

            for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
                GridFutureAdapter<Boolean> fut = e.getValue();

                if (pingFuts.remove(e.getKey(), fut))
                    fut.onDone(err);
            }
        }

        /**
         * @throws InterruptedException If interrupted.
         */
        private void tryJoin() throws InterruptedException {
            assert state == DISCONNECTED || state == STARTING : state;

            boolean join = state == STARTING;

            joinCnt++;

            T2<SocketStream, Boolean> joinRes;

            try {
                joinRes = joinTopology(null, spi.joinTimeout,
                    new Runnable() {
                        @Override public void run() {
                            blockingSectionBegin();
                        }
                    },
                    new Runnable() {
                        @Override public void run() {
                            blockingSectionEnd();
                        }
                    });
            }
            catch (IgniteSpiException e) {
                joinError(e);

                return;
            }

            if (joinRes == null) {
                if (join)
                    joinError(new IgniteSpiException("Join process timed out (timeout = " + spi.joinTimeout + ")."));
                else {
                    state = SEGMENTED;

                    notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null);
                }

                return;
            }

            currSock = joinRes.get1();

            sockWriter.setSocket(joinRes.get1().socket(), joinRes.get2());

            if (spi.joinTimeout > 0) {
                final int joinCnt0 = joinCnt;

                executorService.schedule(() -> {
                        queue.add(new JoinTimeout(joinCnt0));
                    }, spi.joinTimeout, MILLISECONDS);
            }

            sockReader.setSocket(joinRes.get1(), locNode.clientRouterNodeId());
        }

        /**
         * @param msg Message.
         */
        protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) {
            assert msg != null;
            assert msg.verified() || msg.senderNodeId() == null;

            spi.startMessageProcess(msg);

            spi.stats.onMessageProcessingStarted(msg);

            if (msg instanceof TraceableMessage)
                tracing.messages().beforeSend((TraceableMessage) msg);

            if (msg instanceof TcpDiscoveryNodeAddedMessage)
                processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
            else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
                processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg);
            else if (msg instanceof TcpDiscoveryNodeLeftMessage)
                processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg);
            else if (msg instanceof TcpDiscoveryNodeFailedMessage)
                processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
            else if (msg instanceof TcpDiscoveryMetricsUpdateMessage)
                processMetricsUpdateMessage((TcpDiscoveryMetricsUpdateMessage)msg);
            else if (msg instanceof TcpDiscoveryClientReconnectMessage)
                processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
            else if (msg instanceof TcpDiscoveryCustomEventMessage)
                processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
            else if (msg instanceof TcpDiscoveryClientPingResponse)
                processClientPingResponse((TcpDiscoveryClientPingResponse)msg);
            else if (msg instanceof TcpDiscoveryPingRequest)
                processPingRequest();

            spi.stats.onMessageProcessingFinished(msg);

            if (msg instanceof TraceableMessage)
                tracing.messages().finishProcessing((TraceableMessage) msg);

            if (spi.ensured(msg)
                    && state == CONNECTED
                    && !(msg instanceof TcpDiscoveryClientReconnectMessage))
                lastMsgId = msg.id();
        }

        /**
         * @return {@code True} if client in process of join.
         */
        private boolean joining() {
            ClientImpl.State state = ClientImpl.this.state;

            return state == STARTING || state == DISCONNECTED;
        }

        /**
         * @return {@code True} if client disconnected.
         */
        private boolean disconnected() {
            return state == DISCONNECTED;
        }

        /**
         * @param msg Message.
         */
        private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
            if (spi.getSpiContext().isStopping())
                return;

            TcpDiscoveryNode node = msg.node();

            UUID newNodeId = node.id();

            if (getLocalNodeId().equals(newNodeId)) {
                if (joining()) {
                    Collection<TcpDiscoveryNode> top = msg.topology();

                    if (top != null) {
                        spi.gridStartTime = msg.gridStartTime();

                        if (disconnected())
                            rmtNodes.clear();

                        for (TcpDiscoveryNode n : top) {
                            if (n.order() > 0)
                                n.visible(true);

                            rmtNodes.put(n.id(), n);
                        }

                        topHist.clear();

                        nodeAdded = true;

                        if (msg.topologyHistory() != null)
                            topHist.putAll(msg.topologyHistory());
                    }
                    else {
                        if (log.isDebugEnabled())
                            log.debug("Discarding node added message with empty topology: " + msg);
                    }
                }
                else if (log.isDebugEnabled())
                    log.debug("Discarding node added message (this message has already been processed) " +
                        "[msg=" + msg + ", locNode=" + locNode + ']');
            }
            else {
                if (nodeAdded()) {
                    boolean topChanged = rmtNodes.putIfAbsent(newNodeId, node) == null;

                    if (topChanged) {
                        if (log.isDebugEnabled())
                            log.debug("Added new node to topology: " + node);

                        DiscoveryDataPacket dataPacket = msg.gridDiscoveryData();

                        if (dataPacket != null && dataPacket.hasJoiningNodeData()) {
                            if (joining())
                                delayDiscoData.add(dataPacket);
                            else
                                spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration()));
                        }
                    }
                }
                else {
                    if (log.isDebugEnabled())
                        log.debug("Ignore topology message, local node not added to topology: " + msg);
                }
            }
        }

        /**
         * @param msg Message.
         */
        private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
            if (spi.getSpiContext().isStopping())
                return;

            if (log.isInfoEnabled()) {
                for (ClusterNode node : getRemoteNodes()) {
                    if (node.id().equals(locNode.clientRouterNodeId())) {
                        if (log.isInfoEnabled())
                            log.info("Router node: " + node);

                        break;
                    }
                }
            }

            if (getLocalNodeId().equals(msg.nodeId())) {
                if (joining()) {
                    DiscoveryDataPacket dataContainer = msg.clientDiscoData();

                    if (dataContainer != null)
                        spi.onExchange(dataContainer, U.resolveClassLoader(spi.ignite().configuration()));

                    if (!delayDiscoData.isEmpty()) {
                        for (DiscoveryDataPacket data : delayDiscoData)
                            spi.onExchange(data, U.resolveClassLoader(spi.ignite().configuration()));

                        delayDiscoData.clear();
                    }

                    locNode.setAttributes(msg.clientNodeAttributes());
                    locNode.visible(true);

                    long topVer = msg.topologyVersion();

                    locNode.order(topVer);

                    for (Iterator<Long> it = topHist.keySet().iterator(); it.hasNext();) {
                        if (it.next() >= topVer)
                            it.remove();
                    }

                    Collection<ClusterNode> nodes = updateTopologyHistory(topVer, msg);

                    notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, nodes, msg.spanContainer());

                    boolean disconnected = disconnected();

                    state = CONNECTED;

                    if (disconnected) {
                        notifyDiscovery(EVT_CLIENT_NODE_RECONNECTED, topVer, locNode, nodes, null);

                        U.quietAndWarn(log, "Client node was reconnected after it was already considered " +
                            "failed by the server topology (this could happen after all servers restarted or due " +
                            "to a long network outage between the client and servers). All continuous queries and " +
                            "remote event listeners created by this client will be unsubscribed, consider " +
                            "listening to EVT_CLIENT_NODE_RECONNECTED event to restore them.");
                    }

                    joinErr.set(null);

                    joinLatch.countDown();
                }
                else if (log.isDebugEnabled())
                    log.debug("Discarding node add finished message (this message has already been processed) " +
                        "[msg=" + msg + ", locNode=" + locNode + ']');
            }
            else {
                if (nodeAdded()) {
                    TcpDiscoveryNode node = rmtNodes.get(msg.nodeId());

                    if (node == null) {
                        if (log.isDebugEnabled())
                            log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']');

                        return;
                    }

                    boolean evt = false;

                    long topVer = msg.topologyVersion();

                    assert topVer > 0 : msg;

                    if (!node.visible()) {
                        node.order(topVer);
                        node.visible(true);

                        if (spi.locNodeVer.equals(node.version()))
                            node.version(spi.locNodeVer);

                        evt = true;
                    }
                    else {
                        if (log.isDebugEnabled())
                            log.debug("Skip node join event, node already joined [msg=" + msg + ", node=" + node + ']');

                        assert node.order() == topVer : node;
                    }

                    Collection<ClusterNode> top = updateTopologyHistory(topVer, msg);

                    assert top != null && top.contains(node) : "Topology does not contain node [msg=" + msg +
                        ", node=" + node + ", top=" + top + ']';

                    if (state != CONNECTED) {
                        if (log.isDebugEnabled())
                            log.debug("Discarding node add finished message (join process is not finished): " + msg);

                        return;
                    }

                    if (evt) {
                        notifyDiscovery(EVT_NODE_JOINED, topVer, node, top, msg.spanContainer());

                        spi.stats.onNodeJoined();
                    }
                }
                else {
                    if (log.isDebugEnabled())
                        log.debug("Ignore topology message, local node not added to topology: " + msg);
                }
            }
        }

        /**
         * @param msg Message.
         */
        private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) {
            if (getLocalNodeId().equals(msg.creatorNodeId())) {
                if (log.isDebugEnabled())
                    log.debug("Received node left message for local node: " + msg);

                leaveLatch.countDown();
            }
            else {
                if (spi.getSpiContext().isStopping())
                    return;

                if (nodeAdded()) {
                    TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId());

                    if (node == null) {
                        if (log.isDebugEnabled())
                            log.debug("Discarding node left message since node is not found [msg=" + msg + ']');

                        return;
                    }

                    Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);

                    if (state != CONNECTED) {
                        if (log.isDebugEnabled())
                            log.debug("Discarding node left message (join process is not finished): " + msg);

                        return;
                    }

                    notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top, msg.spanContainer());

                    spi.stats.onNodeLeft();
                }
                else {
                    if (log.isDebugEnabled())
                        log.debug("Ignore topology message, local node not added to topology: " + msg);
                }
            }
        }

        /**
         * @return {@code True} if received node added message for local node.
         */
        private boolean nodeAdded() {
            return nodeAdded;
        }

        /**
         * @param msg Message.
         */
        private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) {
            if (spi.getSpiContext().isStopping()) {
                if (!getLocalNodeId().equals(msg.creatorNodeId()) && getLocalNodeId().equals(msg.failedNodeId())) {
                    if (leaveLatch.getCount() > 0) {
                        if (log.isDebugEnabled())
                            log.debug("Remote node fail this node while node is stopping [locNode=" + getLocalNodeId()
                                + ", rmtNode=" + msg.creatorNodeId() + ']');

                        leaveLatch.countDown();
                    }
                }

                return;
            }

            if (nodeAdded()) {
                if (!getLocalNodeId().equals(msg.creatorNodeId())) {
                    TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId());

                    if (node == null) {
                        if (log.isDebugEnabled())
                            log.debug("Discarding node failed message since node is not found [msg=" + msg + ']');

                        return;
                    }

                    Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);

                    if (state != CONNECTED) {
                        if (log.isDebugEnabled())
                            log.debug("Discarding node failed message (join process is not finished): " + msg);

                        return;
                    }

                    if (msg.warning() != null) {
                        ClusterNode creatorNode = rmtNodes.get(msg.creatorNodeId());

                        U.warn(log, "Received EVT_NODE_FAILED event with warning [" +
                            "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : msg.creatorNodeId()) +
                            ", msg=" + msg.warning() + ']');
                    }

                    notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top, msg.spanContainer());

                    spi.stats.onNodeFailed();
                }
            }
            else {
                if (log.isDebugEnabled())
                    log.debug("Ignore topology message, local node not added to topology: " + msg);
            }
        }

        /**
         * @param msg Message.
         */
        private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) {
            if (spi.getSpiContext().isStopping())
                return;

            if (getLocalNodeId().equals(msg.creatorNodeId())) {
                assert msg.senderNodeId() != null;

                if (log.isDebugEnabled())
                    log.debug("Received metrics response: " + msg);
            }
            else {
                if (msg.hasMetrics())
                    processMsgCacheMetrics(msg, System.nanoTime());
            }
        }

        /**
         * @param msg Message.
         */
        private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
            if (spi.getSpiContext().isStopping())
                return;

            if (getLocalNodeId().equals(msg.creatorNodeId())) {
                if (reconnector != null) {
                    assert msg.success() : msg;

                    currSock = reconnector.sockStream;

                    sockWriter.setSocket(currSock.socket(), reconnector.clientAck);
                    sockReader.setSocket(currSock, locNode.clientRouterNodeId());

                    reconnector = null;

                    for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
                        if (log.isDebugEnabled())
                            log.debug("Process pending message on reconnect [msg=" + pendingMsg + ']');

                        processDiscoveryMessage(pendingMsg);
                    }
                }
                else {
                    if (joinLatch.getCount() > 0) {
                        if (msg.success()) {
                            for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
                                if (log.isDebugEnabled())
                                    log.debug("Process pending message on connect [msg=" + pendingMsg + ']');

                                processDiscoveryMessage(pendingMsg);
                            }

                            assert joinLatch.getCount() == 0 : msg;
                        }
                    }
                    else if (log.isDebugEnabled())
                        log.debug("Discarding reconnect message, reconnect is completed: " + msg);
                }
            }
            else if (log.isDebugEnabled())
                log.debug("Discarding reconnect message for another client: " + msg);
        }

        /**
         * @param msg Message.
         */
        private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
            if (state == CONNECTED) {
                DiscoverySpiListener lsnr = spi.lsnr;

                if (lsnr != null) {
                    UUID nodeId = msg.creatorNodeId();

                    TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);

                    if (node != null && node.visible()) {
                        try {
                            DiscoverySpiCustomMessage msgObj = msg.message(spi.marshaller(),
                                U.resolveClassLoader(spi.ignite().configuration()));

                            notifyDiscovery(
                                EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj, msg.spanContainer());
                        }
                        catch (Throwable e) {
                            U.error(log, "Failed to unmarshal discovery custom message.", e);
                        }
                    }
                    else if (log.isDebugEnabled())
                        log.debug("Received metrics from unknown node: " + nodeId);
                }
            }
        }

        /**
         * @param msg Message.
         */
        private void processClientPingResponse(TcpDiscoveryClientPingResponse msg) {
            GridFutureAdapter<Boolean> fut = pingFuts.remove(msg.nodeToPing());

            if (fut != null)
                fut.onDone(msg.result());
        }

        /**
         * Router want to ping this client.
         */
        private void processPingRequest() {
            TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(getLocalNodeId());

            res.client(true);

            sockWriter.sendMessage(res);
        }

        /**
         * @param nodeId Node ID.
         * @param metrics Metrics.
         * @param cacheMetrics Cache metrics.
         * @param tsNanos Timestamp as returned by {@link System#nanoTime()}.
         */
        private void updateMetrics(UUID nodeId,
            ClusterMetrics metrics,
            Map<Integer, CacheMetrics> cacheMetrics,
            long tsNanos)
        {
            boolean isLocDaemon = spi.locNode.isDaemon();

            assert nodeId != null;
            assert metrics != null;
            assert isLocDaemon || cacheMetrics != null;

            TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);

            if (node != null && node.visible()) {
                node.setMetrics(metrics);

                if (!isLocDaemon)
                    node.setCacheMetrics(cacheMetrics);

                node.lastUpdateTimeNanos(tsNanos);

                notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes(), null);
            }
            else if (log.isDebugEnabled())
                log.debug("Received metrics from unknown node: " + nodeId);
        }

        /**
         * @param type Event type.
         * @param topVer Topology version.
         * @param node Node.
         * @param top Topology snapshot.
         * @param spanContainer Span container.
         */
        private void notifyDiscovery(
            int type,
            long topVer,
            ClusterNode node,
            Collection<ClusterNode> top,
            SpanContainer spanContainer
        ) {
            notifyDiscovery(type, topVer, node, top, null, spanContainer);
        }

        /**
         * @param type Event type.
         * @param topVer Topology version.
         * @param node Node.
         * @param top Topology snapshot.
         * @param data Optional custom message data.
         */
        private void notifyDiscovery(
            int type,
            long topVer,
            ClusterNode node,
            Collection<ClusterNode> top,
            @Nullable DiscoverySpiCustomMessage data,
            SpanContainer spanContainer
        ) {
            DiscoverySpiListener lsnr = spi.lsnr;

            DebugLogger debugLog = type == EVT_NODE_METRICS_UPDATED ? traceLog : ClientImpl.this.debugLog;

            if (lsnr != null) {
                if (debugLog.isDebugEnabled())
                    debugLog.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
                        ", topVer=" + topVer + ']');

                lsnr.onDiscovery(
                    new DiscoveryNotification(type, topVer, node, top, new TreeMap<>(topHist), data, spanContainer)
                ).get();
            }
            else if (debugLog.isDebugEnabled())
                debugLog.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
                    ", topVer=" + topVer + ']');
        }

        /**
         * @param msg Message.
         */
        void addMessage(Object msg) {
            //TODO: https://ggsystems.atlassian.net/browse/GG-22502
            if (msg instanceof TraceableMessage && msg instanceof TcpDiscoveryAbstractMessage) {
                TraceableMessage tMsg = (TraceableMessage)msg;

                if (!((TcpDiscoveryAbstractMessage)msg).verified() &&
                    tMsg.spanContainer().serializedSpanBytes() == null) {
                    Span rootSpan = tracing.create(TraceableMessagesTable.traceName(tMsg.getClass()))
                        .end();

                    // This root span will be parent both from local and remote nodes.
                    tMsg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
                }
            }

            queue.add(msg);
        }

        /**
         * @return Queue size.
         */
        int queueSize() {
            return queue.size();
        }
    }

    /**
     */
    private static class JoinTimeout {
        /** */
        private int joinCnt;

        /**
         * @param joinCnt Join count to compare.
         */
        private JoinTimeout(int joinCnt) {
            this.joinCnt = joinCnt;
        }
    }

    /**
     *
     */
    private static class SocketClosedMessage {
        /** */
        private final SocketStream sock;

        /**
         * @param sock Socket.
         */
        private SocketClosedMessage(SocketStream sock) {
            this.sock = sock;
        }
    }

    /**
     *
     */
    private static class SocketStream {
        /** */
        private final Socket sock;

        /** */
        private final InputStream in;

        /**
         * @param sock Socket.
         * @throws IOException If failed to create stream.
         */
        public SocketStream(Socket sock) throws IOException {
            assert sock != null;

            this.sock = sock;

            this.in = new BufferedInputStream(sock.getInputStream());
        }

        /**
         * @return Socket.
         */
        Socket socket() {
            return sock;

        }

        /**
         * @return Socket input stream.
         */
        InputStream stream() {
            return in;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return sock.toString();
        }
    }

    /**
     *
     */
    enum State {
        /** */
        STARTING,

        /** */
        CONNECTED,

        /** */
        DISCONNECTED,

        /** */
        SEGMENTED,

        /** */
        STOPPED
    }
}
