| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.spi.discovery.tcp; |
| |
| import java.io.BufferedInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InterruptedIOException; |
| import java.io.StreamCorruptedException; |
| import java.net.InetSocketAddress; |
| import java.net.Socket; |
| import java.net.SocketTimeoutException; |
| import java.util.ArrayDeque; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NavigableMap; |
| import java.util.NavigableSet; |
| import java.util.Queue; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.UUID; |
| import java.util.concurrent.BlockingDeque; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.LinkedBlockingDeque; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.atomic.AtomicReference; |
| import javax.net.ssl.SSLException; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteClientDisconnectedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteInterruptedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.cache.CacheMetrics; |
| import org.apache.ignite.cluster.ClusterMetrics; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.failure.FailureContext; |
| import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.IgniteFeatures; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.IgniteNodeAttributes; |
| import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; |
| import org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage; |
| import org.apache.ignite.internal.processors.tracing.Span; |
| import org.apache.ignite.internal.processors.tracing.SpanTags; |
| import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; |
| import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; |
| import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.T2; |
| import org.apache.ignite.internal.util.typedef.T3; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.LT; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.internal.util.worker.GridWorker; |
| import org.apache.ignite.internal.worker.WorkersRegistry; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.spi.IgniteSpiAdapter; |
| import org.apache.ignite.spi.IgniteSpiContext; |
| import org.apache.ignite.spi.IgniteSpiException; |
| import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; |
| import org.apache.ignite.spi.IgniteSpiThread; |
| import org.apache.ignite.spi.discovery.DiscoveryNotification; |
| import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; |
| import org.apache.ignite.spi.discovery.DiscoverySpiListener; |
| import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; |
| import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; |
| import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNodesRing; |
| import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientAckResponse; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; |
| import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryServerOnlyCustomEventMessage; |
| import org.apache.ignite.thread.IgniteThreadFactory; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY; |
| import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; |
| import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; |
| import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; |
| import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; |
| import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; |
| import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.CONNECTED; |
| import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.DISCONNECTED; |
| import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.SEGMENTED; |
| import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.STARTING; |
| import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.STOPPED; |
| import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_DISCO_FAILED_CLIENT_RECONNECT_DELAY; |
| import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL; |
| |
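| /** |
| * TCP discovery implementation used when the local node joins the topology as a client: |
| * handshake, join and reconnect messages are sent with the client flag set. |
| */ |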
| class ClientImpl extends TcpDiscoveryImpl { |
| /** Marker object queued to the message worker by {@link #spiStop()} to request a stop. */ |
| private static final Object SPI_STOP = "SPI_STOP"; |
| |
| /** Marker object for a failed reconnect; its consumer is outside this part of the file (presumably the message worker). */ |
| private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED"; |
| |
| /** Marker object queued to the message worker by {@link #reconnect()}. */ |
| private static final Object SPI_RECONNECT = "SPI_RECONNECT"; |
| |
| /** |
| * Reconnect throttle reset timeout, read from the {@code CLIENT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL} |
| * system property with {@code DFLT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL} as the default. |
| */ |
| private static final long CLIENT_THROTTLE_RECONNECT_RESET_TIMEOUT = IgniteSystemProperties.getLong( |
| IgniteSystemProperties.CLIENT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL, |
| DFLT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL |
| ); |
| |
| /** Remote nodes keyed by node ID. */ |
| private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap<>(); |
| |
| /** Delayed discovery data packets; they are filled and consumed outside this part of the file. */ |
| private final List<DiscoveryDataPacket> delayDiscoData = new ArrayList<>(); |
| |
| /** Topology history: topology version mapped to the node snapshot recorded for that version. */ |
| private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>(); |
| |
| /** Pending ping futures keyed by the ID of the node being pinged. */ |
| private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> pingFuts = new ConcurrentHashMap<>(); |
| |
| /** Socket writer; created and started in {@link #spiStart(String)}. */ |
| private SocketWriter sockWriter; |
| |
| /** Socket reader; created and started in {@link #spiStart(String)}. */ |
| private SocketReader sockReader; |
| |
| /** Current client state. */ |
| private volatile State state; |
| |
| /** Last message ID; sent to the router inside the client reconnect message. */ |
| private volatile IgniteUuid lastMsgId; |
| |
| /** Current topology version. */ |
| private volatile long topVer; |
| |
| /** Join error. Contains the error that occurred during the join process; checked by {@link #spiStart(String)}. */ |
| private final AtomicReference<IgniteSpiException> joinErr = new AtomicReference<>(); |
| |
| /** Joined latch; {@link #spiStart(String)} blocks on it. */ |
| private final CountDownLatch joinLatch = new CountDownLatch(1); |
| |
| /** Left latch; {@link #spiStop()} waits on it for up to {@code spi.netTimeout} milliseconds. */ |
| private final CountDownLatch leaveLatch = new CountDownLatch(1); |
| |
| /** Single-threaded scheduler used for the periodic metrics sender and for ping timeouts. */ |
| private final ScheduledExecutorService executorService; |
| |
| /** Message worker; created in {@link #spiStart(String)}, so it may still be {@code null} before that. */ |
| private MessageWorker msgWorker; |
| |
| /** Force fail message for local node. */ |
| private TcpDiscoveryNodeFailedMessage forceFailMsg; |
| |
| /** Presumably the number of joins performed by this client; it is updated outside this part of the file. */ |
| @GridToStringExclude |
| private int joinCnt; |
| |
| /** |
| * Creates the client-side discovery implementation together with its single-threaded scheduler. |
| * |
| * @param adapter Discovery SPI this implementation works for. |
| */ |
| ClientImpl(TcpDiscoverySpi adapter) { |
| super(adapter); |
| |
| // Fall back to a fixed name when neither the Ignite instance nor its name is available. |
| String instanceName = "client-node"; |
| |
| if (adapter.ignite() != null && adapter.ignite().name() != null) |
| instanceName = adapter.ignite().name(); |
| |
| IgniteThreadFactory threadFactory = new IgniteThreadFactory(instanceName, "tcp-discovery-exec"); |
| |
| executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * The dump lists the local node, the status of the internal threads, the IDs of all visible nodes |
| * and the SPI statistics. |
| */ |
| @Override public void dumpDebugInfo(IgniteLogger log) { |
| StringBuilder buf = new StringBuilder(U.nl()); |
| |
| // Banner. |
| buf.append(">>>").append(U.nl()) |
| .append(">>>").append("Dumping discovery SPI debug info.").append(U.nl()) |
| .append(">>>").append(U.nl()); |
| |
| // Local node identity. |
| buf.append("Local node ID: ").append(getLocalNodeId()).append(U.nl()).append(U.nl()) |
| .append("Local node: ").append(locNode).append(U.nl()).append(U.nl()); |
| |
| // Status of the internal threads. |
| buf.append("Internal threads: ").append(U.nl()) |
| .append(" Message worker: ").append(threadStatus(msgWorker.runner())).append(U.nl()) |
| .append(" Socket reader: ").append(threadStatus(sockReader)).append(U.nl()) |
| .append(" Socket writer: ").append(threadStatus(sockWriter)).append(U.nl()) |
| .append(U.nl()); |
| |
| // Visible nodes, one ID per line. |
| buf.append("Nodes: ").append(U.nl()); |
| |
| for (ClusterNode visibleNode : allVisibleNodes()) { |
| buf.append(" ").append(visibleNode.id()).append(U.nl()); |
| } |
| |
| buf.append(U.nl()) |
| .append("Stats: ").append(spi.stats).append(U.nl()); |
| |
| U.quietAndInfo(log, buf.toString()); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Prints the non-client remote nodes sorted by their order. |
| */ |
| @Override public void dumpRingStructure(IgniteLogger log) { |
| List<ClusterNode> srvNodes = new ArrayList<>(); |
| |
| for (ClusterNode rmtNode : getRemoteNodes()) { |
| if (!rmtNode.isClient()) |
| srvNodes.add(rmtNode); |
| } |
| |
| // Stable sort, same ordering key as before. |
| srvNodes.sort(Comparator.comparingLong(ClusterNode::order)); |
| |
| ClusterNode[] sorted = srvNodes.toArray(new ClusterNode[0]); |
| |
| U.quietAndInfo(log, Arrays.toString(sorted)); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Returns the value of the volatile topology version field. |
| */ |
| @Override public long getCurrentTopologyVersion() { |
| final long curTopVer = this.topVer; |
| |
| return curTopVer; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String getSpiState() { |
| |
| if (sockWriter.isOnline()) |
| return "connected"; |
| |
| return "disconnected"; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getMessageWorkerQueueSize() { |
| return msgWorker.queueSize(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * This implementation always returns {@code null}. |
| */ |
| @Override public UUID getCoordinator() { |
| return null; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Initializes the local node, marshals its security credentials, starts the socket writer, the socket reader |
| * and the message worker thread, schedules the periodic metrics sender and then blocks on the join latch. |
| * Once the latch is released, a recorded join error (if any) is rethrown. |
| * <p> |
| * NOTE(review): when the wait is interrupted, the {@link InterruptedException} is wrapped and the interrupt |
| * flag of the calling thread is not restored. |
| * |
| * @throws IgniteSpiException If a join error was recorded or the calling thread was interrupted while waiting. |
| */ |
| @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException { |
| spi.initLocalNode( |
| 0, |
| true); |
| |
| locNode = spi.locNode; |
| |
| // Marshal credentials for backward compatibility and security. |
| marshalCredentials(locNode); |
| |
| sockWriter = new SocketWriter(); |
| sockWriter.start(); |
| |
| sockReader = new SocketReader(); |
| sockReader.start(); |
| |
| if (spi.ipFinder.isShared() && spi.isForceServerMode()) |
| registerLocalNodeAddress(); |
| |
| msgWorker = new MessageWorker(log); |
| |
| new IgniteSpiThread(msgWorker.igniteInstanceName(), msgWorker.name(), log) { |
| @Override protected void body() { |
| msgWorker.run(); |
| } |
| }.start(); |
| |
| executorService.scheduleAtFixedRate(new MetricsSender(), spi.metricsUpdateFreq, spi.metricsUpdateFreq, MILLISECONDS); |
| |
| try { |
| joinLatch.await(); |
| |
| IgniteSpiException err = joinErr.get(); |
| |
| if (err != null) |
| throw err; |
| } |
| catch (InterruptedException e) { |
| throw new IgniteSpiException("Thread has been interrupted.", e); |
| } |
| |
| spi.printStartInfo(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * If the message worker is still running, queues the {@code SPI_STOP} marker to it and waits for the leave |
| * latch for up to {@code spi.netTimeout} milliseconds. Afterwards completes all pending ping futures with |
| * {@code false}, clears the remote nodes, interrupts and joins the message worker, socket writer and socket |
| * reader, and shuts the scheduler down. |
| * <p> |
| * NOTE(review): an interruption of the latch wait is ignored and the interrupt flag is not restored. |
| */ |
| @Override public void spiStop() throws IgniteSpiException { |
| if (msgWorker != null && !msgWorker.isDone()) { // Should always be alive |
| msgWorker.addMessage(SPI_STOP); |
| |
| try { |
| if (!leaveLatch.await(spi.netTimeout, MILLISECONDS)) |
| U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']'); |
| } |
| catch (InterruptedException ignored) { |
| // No-op. |
| } |
| } |
| |
| for (GridFutureAdapter<Boolean> fut : pingFuts.values()) |
| fut.onDone(false); |
| |
| rmtNodes.clear(); |
| |
| if (msgWorker != null) |
| U.interrupt(msgWorker.runner()); |
| |
| U.interrupt(sockWriter); |
| U.interrupt(sockReader); |
| |
| if (msgWorker != null) |
| U.join(msgWorker.runner(), log); |
| |
| U.join(sockWriter, log); |
| |
| // SocketReader may lose the interruption, so it is re-interrupted until the join succeeds. |
| while (!U.join(sockReader, log, 200)) |
| U.interrupt(sockReader); |
| |
| executorService.shutdownNow(); |
| |
| spi.printStopInfo(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * This implementation does nothing. |
| */ |
| @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { |
| // No-op: nothing to do when the SPI context is initialized. |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Returns a new list with the remote nodes that pass the {@code VISIBLE_NODES} filter. |
| */ |
| @Override public Collection<ClusterNode> getRemoteNodes() { |
| Collection<TcpDiscoveryNode> knownNodes = rmtNodes.values(); |
| |
| return U.arrayList(knownNodes, TcpDiscoveryNodesRing.VISIBLE_NODES); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * The check is performed over the remote nodes known to this client. |
| */ |
| @Override public boolean allNodesSupport(IgniteFeatures feature) { |
| final boolean supported = IgniteFeatures.allNodesSupports(upcast(rmtNodes.values()), feature); |
| |
| return supported; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * The local node is returned for the local node ID; a remote node is returned only if it is known and visible. |
| */ |
| @Nullable @Override public ClusterNode getNode(UUID nodeId) { |
| if (getLocalNodeId().equals(nodeId)) |
| return locNode; |
| |
| TcpDiscoveryNode rmtNode = rmtNodes.get(nodeId); |
| |
| // Unknown and not yet visible nodes are both reported as absent. |
| if (rmtNode == null || !rmtNode.visible()) |
| return null; |
| |
| return rmtNode; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * The local node is always reported alive; unknown or invisible nodes are reported dead. Otherwise concurrent |
| * callers share one future per target node. The caller that registers the future either resolves it right away |
| * (SPI context is stopping, or the client is stopped, segmented or disconnected) or sends a |
| * {@link TcpDiscoveryClientPingRequest} and schedules a task that runs after {@code spi.netTimeout} |
| * milliseconds and resolves the future if it is still registered. |
| * |
| * @param nodeId ID of the node to ping. |
| * @return Ping result; {@code false} also if the calling thread was interrupted while waiting for it. |
| * @throws IgniteSpiException If the ping future completed with an error (e.g. the client is disconnected). |
| */ |
| @Override public boolean pingNode(@NotNull final UUID nodeId) { |
| if (nodeId.equals(getLocalNodeId())) |
| return true; |
| |
| TcpDiscoveryNode node = rmtNodes.get(nodeId); |
| |
| if (node == null || !node.visible()) |
| return false; |
| |
| GridFutureAdapter<Boolean> fut = pingFuts.get(nodeId); |
| |
| if (fut == null) { |
| fut = new GridFutureAdapter<>(); |
| |
| GridFutureAdapter<Boolean> oldFut = pingFuts.putIfAbsent(nodeId, fut); |
| |
| if (oldFut != null) |
| fut = oldFut; |
| else { |
| State state = this.state; |
| |
| if (spi.getSpiContext().isStopping() || state == STOPPED || state == SEGMENTED) { |
| if (pingFuts.remove(nodeId, fut)) |
| fut.onDone(false); |
| |
| return false; |
| } |
| else if (state == DISCONNECTED) { |
| if (pingFuts.remove(nodeId, fut)) |
| fut.onDone(new IgniteClientDisconnectedCheckedException(null, |
| "Failed to ping node, client node disconnected.")); |
| } |
| else { |
| final GridFutureAdapter<Boolean> finalFut = fut; |
| |
| executorService.schedule(() -> { |
| if (pingFuts.remove(nodeId, finalFut)) { |
| if (ClientImpl.this.state == DISCONNECTED) |
| finalFut.onDone(new IgniteClientDisconnectedCheckedException(null, |
| "Failed to ping node, client node disconnected.")); |
| else |
| finalFut.onDone(false); |
| } |
| }, spi.netTimeout, MILLISECONDS); |
| |
| sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId)); |
| } |
| } |
| } |
| |
| try { |
| return fut.get(); |
| } |
| catch (IgniteInterruptedCheckedException ignored) { |
| return false; |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteSpiException(e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Interrupts and joins the message worker, socket writer and socket reader, releases the leave and join |
| * latches and deregisters the ports. If a discovery listener is set, every visible remote node is removed and |
| * reported through an {@code EVT_NODE_FAILED} notification, each one under a topology version incremented by |
| * one; the call waits for each notification to complete. Finally the remaining remote nodes are cleared. |
| */ |
| @Override public void disconnect() throws IgniteSpiException { |
| if (msgWorker != null) |
| U.interrupt(msgWorker.runner()); |
| |
| U.interrupt(sockWriter); |
| U.interrupt(sockReader); |
| |
| if (msgWorker != null) |
| U.join(msgWorker.runner(), log); |
| |
| U.join(sockWriter, log); |
| U.join(sockReader, log); |
| |
| leaveLatch.countDown(); |
| joinLatch.countDown(); |
| |
| spi.getSpiContext().deregisterPorts(); |
| |
| Collection<ClusterNode> rmts = getRemoteNodes(); |
| |
| // This is restart/disconnection and remote nodes are not empty. |
| // We need to fire FAIL event for each. |
| DiscoverySpiListener lsnr = spi.lsnr; |
| |
| if (lsnr != null) { |
| for (ClusterNode n : rmts) { |
| rmtNodes.remove(n.id()); |
| |
| Collection<ClusterNode> top = updateTopologyHistory(topVer + 1, null); |
| |
| lsnr.onDiscovery( |
| new DiscoveryNotification( |
| EVT_NODE_FAILED, topVer, n, top, new TreeMap<>(topHist), null, null |
| ) |
| ).get(); |
| } |
| } |
| |
| rmtNodes.clear(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Wraps the marshalled event into a discovery custom event message (the server-only flavour when the wrapped |
| * delegate is a {@link DiscoveryServerOnlyCustomMessage}), attaches a serialized root tracing span and passes |
| * the message to the socket writer. |
| * |
| * @param evt Custom event to send; it is cast to {@link CustomMessageWrapper}. |
| * @throws IgniteClientDisconnectedException If the client is disconnected. |
| * @throws IgniteException If the client is stopped, segmented or still starting. |
| * @throws IgniteSpiException If the event cannot be marshalled. |
| */ |
| @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) { |
| State state = this.state; |
| |
| if (state == DISCONNECTED) |
| throw new IgniteClientDisconnectedException(null, "Failed to send custom message: client is disconnected."); |
| |
| if (state == STOPPED || state == SEGMENTED || state == STARTING) { |
| // Locale-independent lowering keeps the message stable under locales with special case mappings. |
| throw new IgniteException("Failed to send custom message: client is " + |
| state.name().toLowerCase(java.util.Locale.ROOT) + "."); |
| } |
| |
| try { |
| TcpDiscoveryCustomEventMessage msg; |
| |
| if (((CustomMessageWrapper)evt).delegate() instanceof DiscoveryServerOnlyCustomMessage) |
| msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt, |
| U.marshal(spi.marshaller(), evt)); |
| else |
| msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, |
| U.marshal(spi.marshaller(), evt)); |
| |
| Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass())) |
| .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString()) |
| .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID), |
| () -> locNode.consistentId().toString()) |
| .addTag(SpanTags.MESSAGE_CLASS, () -> ((CustomMessageWrapper)evt).delegate().getClass().getSimpleName()) |
| .addLog(() -> "Created"); |
| |
| // This root span will be parent both from local and remote nodes. |
| msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan)); |
| |
| sockWriter.sendMessage(msg); |
| |
| rootSpan.addLog(() -> "Sent").end(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteSpiException("Failed to marshal custom event: " + evt, e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * For a known remote node a forced node-failed message carrying the warning is queued to the message worker; |
| * unknown node IDs are ignored. |
| */ |
| @Override public void failNode(UUID nodeId, @Nullable String warning) { |
| TcpDiscoveryNode failedNode = rmtNodes.get(nodeId); |
| |
| if (failedNode == null) |
| return; |
| |
| TcpDiscoveryNodeFailedMessage failMsg = new TcpDiscoveryNodeFailedMessage( |
| getLocalNodeId(), |
| failedNode.id(), |
| failedNode.internalOrder()); |
| |
| failMsg.warning(warning); |
| failMsg.force(true); |
| |
| msgWorker.addMessage(failMsg); |
| } |
| |
| /** |
| * Tries to join the topology: resolves addresses from the IP finder and sends a join (or reconnect) request to |
| * each of them until one answers {@code RES_OK} or the timeout expires. Addresses that could not be reached are |
| * dropped from the current list; the thread sleeps for the reconnect delay before retrying when a node asked to |
| * wait or when no address is left. |
| * |
| * @param prevAddr If reconnect is in progress, then previous address of the router the client was connected to |
| * and {@code null} otherwise. |
| * @param timeout Timeout in milliseconds; the timeout is checked only if the value is positive. |
| * @param beforeEachSleep Code to be run before each sleep span. |
| * @param afterEachSleep Code to be run after each sleep span. |
| * @return Opened socket stream together with the third element of the join response, or {@code null} if timeout. |
| * @throws InterruptedException If interrupted. |
| * @throws IgniteSpiException If failed. |
| * @see TcpDiscoverySpi#joinTimeout |
| */ |
| @Nullable private T2<SocketStream, Boolean> joinTopology( |
| InetSocketAddress prevAddr, |
| long timeout, |
| @Nullable Runnable beforeEachSleep, |
| @Nullable Runnable afterEachSleep |
| ) throws IgniteSpiException, InterruptedException { |
| List<InetSocketAddress> addrs = null; |
| |
| long startNanos = System.nanoTime(); |
| |
| while (true) { |
| if (Thread.currentThread().isInterrupted()) |
| throw new InterruptedException(); |
| |
| while (addrs == null || addrs.isEmpty()) { |
| addrs = new ArrayList<>(spi.resolvedAddresses()); |
| |
| if (!F.isEmpty(addrs)) { |
| if (log.isDebugEnabled()) |
| log.debug("Resolved addresses from IP finder: " + addrs); |
| } |
| else { |
| if (timeout > 0 && U.millisSinceNanos(startNanos) > timeout) |
| return null; |
| |
| LT.warn(log, "IP finder returned empty addresses list. " + |
| "Please check IP finder configuration" + |
| (spi.ipFinder instanceof TcpDiscoveryMulticastIpFinder ? |
| " and make sure multicast works on your network. " : ". ") + |
| "Will retry every " + spi.getReconnectDelay() + " ms. " + |
| "Change 'reconnectDelay' to configure the frequency of retries.", true); |
| |
| sleepEx(spi.getReconnectDelay(), beforeEachSleep, afterEachSleep); |
| } |
| } |
| |
| // Move the previous router to index 0: the loop below walks from the end, so it is tried last. |
| if (prevAddr != null) { |
| int idx = addrs.indexOf(prevAddr); |
| |
| if (idx != -1) |
| Collections.swap(addrs, idx, 0); |
| } |
| |
| Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs); |
| |
| boolean wait = false; |
| |
| for (int i = addrs.size() - 1; i >= 0; i--) { |
| if (Thread.currentThread().isInterrupted()) |
| throw new InterruptedException(); |
| |
| InetSocketAddress addr = addrs.get(i); |
| |
| boolean recon = prevAddr != null; |
| |
| T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr); |
| |
| if (sockAndRes == null) { |
| addrs.remove(i); |
| |
| continue; |
| } |
| |
| assert sockAndRes.get1() != null && sockAndRes.get2() != null : sockAndRes; |
| |
| Socket sock = sockAndRes.get1().socket(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Received response to join request [addr=" + addr + ", res=" + sockAndRes.get2() + ']'); |
| |
| switch (sockAndRes.get2()) { |
| case RES_OK: |
| return new T2<>(sockAndRes.get1(), sockAndRes.get3()); |
| |
| case RES_CONTINUE_JOIN: |
| case RES_WAIT: |
| wait = true; |
| |
| U.closeQuiet(sock); |
| |
| break; |
| |
| default: |
| if (log.isDebugEnabled()) |
| log.debug("Received unexpected response to join request: " + sockAndRes.get2()); |
| |
| U.closeQuiet(sock); |
| } |
| } |
| |
| if (timeout > 0 && U.millisSinceNanos(startNanos) > timeout) |
| return null; |
| |
| if (wait) { |
| if (log.isDebugEnabled()) |
| log.debug("Will wait before retry join."); |
| |
| sleepEx(spi.getReconnectDelay(), beforeEachSleep, afterEachSleep); |
| } |
| else if (addrs.isEmpty()) { |
| LT.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " + |
| "every " + spi.getReconnectDelay() + " ms; change 'reconnectDelay' to configure the frequency " + |
| "of retries): " + toOrderedList(addrs0), true); |
| |
| sleepEx(spi.getReconnectDelay(), beforeEachSleep, afterEachSleep); |
| } |
| } |
| } |
| |
| /** */ |
| private static void sleepEx(long millis, Runnable before, Runnable after) throws InterruptedException { |
| if (before != null) |
| before.run(); |
| |
| try { |
| Thread.sleep(millis); |
| } |
| finally { |
| if (after != null) |
| after.run(); |
| } |
| } |
| |
| /** |
| * Opens a socket to the given address, performs the client handshake and sends either a join request (first |
| * join) or a client reconnect message, then reads the receipt. |
| * <p> |
| * On failure the error is collected and the attempt is repeated on the same address unless one of the checks in |
| * the {@code catch} block stops it: SSL errors are retried until the third one, which is rethrown as an |
| * exception; stream corruption or a socket that could not be opened gets one extra connection attempt (the |
| * attempt counter is shared); the failure detection timeout, the reconnect count and the doubled acknowledgement |
| * timeout are checked as well. |
| * <p> |
| * NOTE(review): the join failure is logged through {@code log.error} although the call is guarded by |
| * {@code log.isDebugEnabled()}; confirm that this is intended. |
| * |
| * @param recon {@code True} if reconnects. |
| * @param addr Address. |
| * @return Socket, connect response and client acknowledge support flag, or {@code null} if the request could not |
| * be delivered to this address. |
| * @throws IgniteSpiException If a secure connection could not be established after the last SSL attempt. |
| */ |
| @Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon, |
| InetSocketAddress addr) { |
| assert addr != null; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Send join request [addr=" + addr + ", reconnect=" + recon + |
| ", locNodeId=" + getLocalNodeId() + ']'); |
| |
| Collection<Throwable> errs = null; |
| |
| long ackTimeout0 = spi.getAckTimeout(); |
| |
| int reconCnt = 0; |
| |
| int connectAttempts = 1; |
| |
| int sslConnectAttempts = 3; |
| |
| UUID locNodeId = getLocalNodeId(); |
| |
| IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); |
| |
| DiscoveryDataPacket discoveryData = null; |
| |
| while (true) { |
| boolean openSock = false; |
| |
| Socket sock = null; |
| |
| try { |
| long tsNanos = System.nanoTime(); |
| |
| sock = spi.openSocket(addr, timeoutHelper); |
| |
| openSock = true; |
| |
| TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId); |
| |
| req.client(true); |
| |
| spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); |
| |
| TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); |
| |
| UUID rmtNodeId = res.creatorNodeId(); |
| |
| assert rmtNodeId != null; |
| assert !getLocalNodeId().equals(rmtNodeId); |
| |
| locNode.clientRouterNodeId(rmtNodeId); |
| |
| tsNanos = System.nanoTime(); |
| |
| TcpDiscoveryAbstractMessage msg; |
| |
| if (!recon) { |
| TcpDiscoveryNode node = locNode; |
| |
| if (locNode.order() > 0) { |
| node = locNode.clientReconnectNode(spi.locNodeAttrs); |
| |
| marshalCredentials(node); |
| } |
| |
| if (discoveryData == null) { |
| DiscoveryDataPacket dataPacket = new DiscoveryDataPacket(getLocalNodeId()); |
| |
| dataPacket.joiningNodeClient(true); |
| |
| discoveryData = spi.collectExchangeData(dataPacket); |
| } |
| |
| TcpDiscoveryJoinRequestMessage joinReqMsg = new TcpDiscoveryJoinRequestMessage(node, discoveryData); |
| |
| TcpDiscoveryNode nodef = node; |
| |
| joinReqMsg.spanContainer().span( |
| tracing.create(TraceableMessagesTable.traceName(joinReqMsg.getClass())) |
| .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> nodef.id().toString()) |
| .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID), |
| () -> nodef.consistentId().toString()) |
| .addLog(() -> "Created") |
| .end() |
| ); |
| |
| msg = joinReqMsg; |
| |
| // During marshalling, SPI didn't know whether all nodes support compression as we didn't join yet. |
| // The only way to know is passing flag directly with handshake response. |
| if (!res.isDiscoveryDataPacketCompression()) |
| ((TcpDiscoveryJoinRequestMessage)msg).gridDiscoveryData().unzipData(log); |
| } |
| else |
| msg = new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId); |
| |
| msg.client(true); |
| |
| if (msg instanceof TraceableMessage) |
| tracing.messages().beforeSend((TraceableMessage) msg); |
| |
| spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); |
| |
| spi.stats.onMessageSent(msg, U.millisSinceNanos(tsNanos)); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr + |
| ", rmtNodeId=" + rmtNodeId + ']'); |
| |
| return new T3<>(new SocketStream(sock), |
| spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)), |
| res.clientAck()); |
| } |
| catch (IOException | IgniteCheckedException e) { |
| U.closeQuiet(sock); |
| |
| if (log.isDebugEnabled()) |
| log.error("Exception on joining: " + e.getMessage(), e); |
| |
| onException("Exception on joining: " + e.getMessage(), e); |
| |
| if (errs == null) |
| errs = new ArrayList<>(); |
| |
| errs.add(e); |
| |
| if (X.hasCause(e, SSLException.class)) { |
| if (--sslConnectAttempts == 0) |
| throw new IgniteSpiException("Unable to establish secure connection. " + |
| "Was remote cluster configured with SSL? [rmtAddr=" + addr + ", errMsg=\"" + e.getMessage() + "\"]", e); |
| |
| continue; |
| } |
| |
| if (X.hasCause(e, StreamCorruptedException.class)) { |
| // StreamCorruptedException could be caused by remote node failover |
| if (connectAttempts < 2) { |
| connectAttempts++; |
| |
| continue; |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Connect failed with StreamCorruptedException, skip address: " + addr); |
| |
| break; |
| } |
| |
| if (timeoutHelper.checkFailureTimeoutReached(e)) |
| break; |
| |
| if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount()) |
| break; |
| |
| if (!openSock) { |
| // Reconnect for the second time, if connection is not established. |
| if (connectAttempts < 2) { |
| connectAttempts++; |
| |
| continue; |
| } |
| |
| break; // Don't retry if we can not establish connection. |
| } |
| |
| if (!spi.failureDetectionTimeoutEnabled() && (e instanceof SocketTimeoutException || |
| X.hasCause(e, SocketTimeoutException.class))) { |
| ackTimeout0 *= 2; |
| |
| if (!checkAckTimeout(ackTimeout0)) |
| break; |
| } |
| } |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Failed to join to address [addr=" + addr + ", recon=" + recon + ", errs=" + errs + ']'); |
| |
| return null; |
| } |
| |
| /** |
| * Marshalls credentials with discovery SPI marshaller (will replace attribute value). |
| * |
| * @param node Node to marshall credentials for. |
| * @throws IgniteSpiException If marshalling failed. |
| */ |
| private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException { |
| try { |
| // Use security-unsafe getter. |
| Map<String, Object> attrs = new HashMap<>(node.getAttributes()); |
| |
| Object creds = attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS); |
| |
| assert !(creds instanceof byte[]); |
| |
| attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS, U.marshal(spi.marshaller(), creds)); |
| |
| node.setAttributes(attrs); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e); |
| } |
| } |
| |
| /** |
| * Records the new topology version and, if the history has no entry for it yet, stores the current snapshot of |
| * visible nodes under that version, evicting the oldest entry once the history grows beyond |
| * {@code spi.topHistSize}. If the version is not newer than the latest recorded one, nothing is stored and the |
| * snapshot already kept for that version is returned. |
| * |
| * @param topVer New topology version. |
| * @param msg Discovery message; it is used in log and assertion messages only. |
| * @return Latest topology snapshot. |
| */ |
| private Collection<ClusterNode> updateTopologyHistory(long topVer, @Nullable TcpDiscoveryAbstractMessage msg) { |
| this.topVer = topVer; |
| |
| if (!topHist.isEmpty() && topVer <= topHist.lastKey()) { |
| if (log.isDebugEnabled()) |
| log.debug("Skip topology update since topology already updated [msg=" + msg + |
| ", lastHistKey=" + topHist.lastKey() + |
| ", topVer=" + topVer + |
| ", locNode=" + locNode + ']'); |
| |
| Collection<ClusterNode> top = topHist.get(topVer); |
| |
| assert top != null : "Failed to find topology history [msg=" + msg + ", hist=" + topHist + ']'; |
| |
| return top; |
| } |
| |
| NavigableSet<ClusterNode> allNodes = allVisibleNodes(); |
| |
| if (!topHist.containsKey(topVer)) { |
| assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 : |
| "lastVer=" + (topHist.isEmpty() ? null : topHist.lastKey()) + |
| ", newVer=" + topVer + |
| ", locNode=" + locNode + |
| ", msg=" + msg; |
| |
| topHist.put(topVer, allNodes); |
| |
| if (topHist.size() > spi.topHistSize) |
| topHist.pollFirstEntry(); |
| |
| assert topHist.lastKey() == topVer; |
| assert topHist.size() <= spi.topHistSize; |
| } |
| |
| return allNodes; |
| } |
| |
| /** |
| * @return All nodes. |
| */ |
| private NavigableSet<ClusterNode> allVisibleNodes() { |
| NavigableSet<ClusterNode> allNodes = new TreeSet<>(); |
| |
| for (TcpDiscoveryNode node : rmtNodes.values()) { |
| if (node.visible()) |
| allNodes.add(node); |
| } |
| |
| allNodes.add(locNode); |
| |
| return allNodes; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override void simulateNodeFailure() { |
| U.warn(log, "Simulating client node failure: " + getLocalNodeId()); |
| |
| U.interrupt(sockWriter); |
| |
| if (msgWorker != null) |
| U.interrupt(msgWorker.runner()); |
| |
| U.join(sockWriter, log); |
| |
| if (msgWorker != null) |
| U.join(msgWorker.runner(), log); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void reconnect() throws IgniteSpiException { |
| msgWorker.addMessage(SPI_RECONNECT); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void brakeConnection() { |
| SocketStream sockStream = msgWorker.currSock; |
| |
| if (sockStream != null) |
| U.closeQuiet(sockStream.socket()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void checkRingLatency(int maxHops) { |
| TcpDiscoveryRingLatencyCheckMessage msg = new TcpDiscoveryRingLatencyCheckMessage(getLocalNodeId(), maxHops); |
| |
| if (log.isInfoEnabled()) |
| log.info("Latency check initiated: " + msg.id()); |
| |
| sockWriter.sendMessage(msg); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected Collection<IgniteSpiThread> threads() { |
| ArrayList<IgniteSpiThread> res = new ArrayList<>(); |
| |
| res.add(sockWriter); |
| |
| MessageWorker msgWorker0 = msgWorker; |
| |
| if (msgWorker0 != null) { |
| Thread runner = msgWorker0.runner(); |
| |
| if (runner instanceof IgniteSpiThread) |
| res.add((IgniteSpiThread)runner); |
| } |
| |
| return res; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void updateMetrics(UUID nodeId, |
| ClusterMetrics metrics, |
| Map<Integer, CacheMetrics> cacheMetrics, |
| long tsNanos) |
| { |
| boolean isLocDaemon = spi.locNode.isDaemon(); |
| |
| assert nodeId != null; |
| assert metrics != null; |
| assert isLocDaemon || cacheMetrics != null; |
| |
| TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId); |
| |
| if (node != null && node.visible()) { |
| node.setMetrics(metrics); |
| |
| if (!isLocDaemon) |
| node.setCacheMetrics(cacheMetrics); |
| |
| node.lastUpdateTimeNanos(tsNanos); |
| |
| msgWorker.notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes(), null); |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Received metrics from unknown node: " + nodeId); |
| } |
| |
| /** |
| * FOR TEST PURPOSE ONLY! |
| */ |
| @SuppressWarnings("BusyWait") |
| public void waitForClientMessagePrecessed() { |
| Object last = msgWorker.queue.peekLast(); |
| |
| while (last != null && !msgWorker.isDone() && msgWorker.queue.contains(last)) { |
| try { |
| Thread.sleep(10); |
| } |
| catch (InterruptedException ignored) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| /** |
| * @param err Error. |
| */ |
| private void joinError(IgniteSpiException err) { |
| assert err != null; |
| |
| joinErr.compareAndSet(null, err); |
| |
| joinLatch.countDown(); |
| } |
| |
| /** */ |
| private WorkersRegistry getWorkersRegistry() { |
| Ignite ignite = spi.ignite(); |
| |
| return ignite instanceof IgniteEx ? ((IgniteEx)ignite).context().workersRegistry() : null; |
| } |
| |
| /** |
| * Metrics sender. |
| */ |
| private class MetricsSender implements Runnable { |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| if (!spi.getSpiContext().isStopping() && sockWriter.isOnline()) { |
| TcpDiscoveryClientMetricsUpdateMessage msg = new TcpDiscoveryClientMetricsUpdateMessage( |
| getLocalNodeId(), |
| spi.metricsProvider.metrics()); |
| |
| msg.client(true); |
| |
| sockWriter.sendMessage(msg); |
| } |
| } |
| } |
| |
| /** |
| * Socket reader. |
| */ |
| private class SocketReader extends IgniteSpiThread { |
| /** */ |
| private final Object mux = new Object(); |
| |
| /** */ |
| private SocketStream sockStream; |
| |
| /** */ |
| private UUID rmtNodeId; |
| |
| /** */ |
| private CountDownLatch stopReadLatch; |
| |
| /** |
| */ |
| SocketReader() { |
| super(spi.ignite().name(), "tcp-client-disco-sock-reader-[]", log); |
| } |
| |
| /** |
| * @param sockStream Socket. |
| * @param rmtNodeId Rmt node id. |
| */ |
| void setSocket(SocketStream sockStream, UUID rmtNodeId) { |
| synchronized (mux) { |
| this.sockStream = sockStream; |
| |
| this.rmtNodeId = rmtNodeId; |
| |
| mux.notifyAll(); |
| } |
| } |
| |
| /** |
| * @throws InterruptedException If interrupted. |
| */ |
| private void forceStopRead() throws InterruptedException { |
| CountDownLatch stopReadLatch; |
| |
| synchronized (mux) { |
| SocketStream stream = sockStream; |
| |
| if (stream == null) |
| return; |
| |
| this.stopReadLatch = stopReadLatch = new CountDownLatch(1); |
| |
| U.closeQuiet(stream.socket()); |
| |
| this.sockStream = null; |
| this.rmtNodeId = null; |
| |
| mux.notifyAll(); |
| } |
| |
| stopReadLatch.await(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void body() throws InterruptedException { |
| while (!isInterrupted()) { |
| SocketStream sockStream; |
| UUID rmtNodeId; |
| |
| // Disconnected from router node. |
| U.enhanceThreadName(""); |
| |
| synchronized (mux) { |
| if (stopReadLatch != null) { |
| stopReadLatch.countDown(); |
| |
| stopReadLatch = null; |
| } |
| |
| if (this.sockStream == null) { |
| mux.wait(); |
| |
| continue; |
| } |
| |
| sockStream = this.sockStream; |
| rmtNodeId = this.rmtNodeId; |
| } |
| |
| Socket sock = sockStream.socket(); |
| |
| U.enhanceThreadName(U.id8(rmtNodeId) |
| + ' ' + sockStream.sock.getInetAddress().getHostAddress() |
| + ":" + sockStream.sock.getPort()); |
| |
| try { |
| InputStream in = sockStream.stream(); |
| |
| assert sock.getKeepAlive() && sock.getTcpNoDelay() : "Socket wasn't configured properly:" + |
| " KeepAlive " + sock.getKeepAlive() + |
| " TcpNoDelay " + sock.getTcpNoDelay(); |
| |
| while (!isInterrupted()) { |
| TcpDiscoveryAbstractMessage msg; |
| |
| try { |
| msg = U.unmarshal(spi.marshaller(), in, U.resolveClassLoader(spi.ignite().configuration())); |
| } |
| catch (IgniteCheckedException e) { |
| if (log.isDebugEnabled()) |
| U.error(log, "Failed to read message [sock=" + sock + ", " + |
| "locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']', e); |
| |
| // Exists possibility that exception raised on interruption. |
| if (X.hasCause(e, InterruptedException.class, InterruptedIOException.class, |
| IgniteInterruptedCheckedException.class, IgniteInterruptedException.class)) |
| interrupt(); |
| |
| IOException ioEx = X.cause(e, IOException.class); |
| |
| if (ioEx != null) |
| throw ioEx; |
| |
| ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class); |
| |
| if (clsNotFoundEx != null) |
| LT.warn(log, "Failed to read message due to ClassNotFoundException " + |
| "(make sure same versions of all classes are available on all nodes) " + |
| "[rmtNodeId=" + rmtNodeId + ", err=" + clsNotFoundEx.getMessage() + ']'); |
| else |
| LT.error(log, e, "Failed to read message [sock=" + sock + ", locNodeId=" + |
| getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']'); |
| |
| continue; |
| } |
| |
| msg.senderNodeId(rmtNodeId); |
| |
| DebugLogger debugLog = messageLogger(msg); |
| |
| if (debugLog.isDebugEnabled()) |
| debugLog.debug("Message has been received: " + msg); |
| |
| spi.stats.onMessageReceived(msg); |
| |
| boolean ack = msg instanceof TcpDiscoveryClientAckResponse; |
| |
| if (!ack) |
| msgWorker.addMessage(msg); |
| else |
| sockWriter.ackReceived((TcpDiscoveryClientAckResponse)msg); |
| } |
| } |
| catch (IOException e) { |
| msgWorker.addMessage(new SocketClosedMessage(sockStream)); |
| |
| if (log.isDebugEnabled()) |
| U.error(log, "Connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ']', e); |
| } |
| finally { |
| U.closeQuiet(sock); |
| |
| synchronized (mux) { |
| if (this.sockStream == sockStream) { |
| this.sockStream = null; |
| this.rmtNodeId = null; |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| private class SocketWriter extends IgniteSpiThread { |
| /** */ |
| private final Object mux = new Object(); |
| |
| /** */ |
| private Socket sock; |
| |
| /** */ |
| private boolean clientAck; |
| |
| /** */ |
| private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>(); |
| |
| /** */ |
| private final long sockTimeout; |
| |
| /** */ |
| private TcpDiscoveryAbstractMessage unackedMsg; |
| |
| /** */ |
| private CountDownLatch forceLeaveLatch; |
| |
| /** |
| * |
| */ |
| SocketWriter() { |
| super(spi.ignite().name(), "tcp-client-disco-sock-writer", log); |
| |
| sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : |
| spi.getSocketTimeout(); |
| } |
| |
| /** |
| * @param msg Message. |
| */ |
| private void sendMessage(TcpDiscoveryAbstractMessage msg) { |
| synchronized (mux) { |
| queue.add(msg); |
| |
| mux.notifyAll(); |
| } |
| } |
| |
| /** |
| * Sends {@link TcpDiscoveryNodeLeftMessage} and closes socket. |
| * |
| * @throws InterruptedException If interrupted. |
| */ |
| private void forceLeave() throws InterruptedException { |
| CountDownLatch forceLeaveLatch; |
| |
| synchronized (mux) { |
| // If writer was stopped. |
| if (sock == null) |
| return; |
| |
| this.forceLeaveLatch = forceLeaveLatch = new CountDownLatch(1); |
| |
| unackedMsg = null; |
| |
| mux.notifyAll(); |
| } |
| |
| forceLeaveLatch.await(); |
| } |
| |
| /** |
| * @param sock Socket. |
| * @param clientAck {@code True} is server supports client message acknowlede. |
| */ |
| private void setSocket(Socket sock, boolean clientAck) { |
| synchronized (mux) { |
| this.sock = sock; |
| |
| this.clientAck = clientAck; |
| |
| unackedMsg = null; |
| |
| mux.notifyAll(); |
| } |
| } |
| |
| /** |
| * @return {@code True} if connection is alive. |
| */ |
| public boolean isOnline() { |
| synchronized (mux) { |
| return sock != null; |
| } |
| } |
| |
| /** |
| * @param res Acknowledge response. |
| */ |
| void ackReceived(TcpDiscoveryClientAckResponse res) { |
| synchronized (mux) { |
| if (unackedMsg != null) { |
| assert unackedMsg.id().equals(res.messageId()) : unackedMsg; |
| |
| unackedMsg = null; |
| } |
| |
| mux.notifyAll(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void body() throws InterruptedException { |
| TcpDiscoveryAbstractMessage msg = null; |
| |
| while (!Thread.currentThread().isInterrupted()) { |
| Socket sock; |
| |
| synchronized (mux) { |
| sock = this.sock; |
| |
| if (sock == null) { |
| mux.wait(); |
| |
| continue; |
| } |
| |
| if (forceLeaveLatch != null) { |
| msg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId()); |
| |
| msg.client(true); |
| |
| try { |
| spi.writeToSocket( |
| sock, |
| msg, |
| sockTimeout); |
| } |
| catch (IOException | IgniteCheckedException e) { |
| if (log.isDebugEnabled()) { |
| log.debug("Failed to send TcpDiscoveryNodeLeftMessage on force leave [msg=" + msg + |
| ", err=" + e.getMessage() + ']'); |
| } |
| } |
| |
| U.closeQuiet(sock); |
| |
| this.sock = null; |
| |
| clear(); |
| |
| continue; |
| } |
| else { |
| if (msg == null) |
| msg = queue.poll(); |
| |
| if (msg == null) { |
| mux.wait(); |
| |
| continue; |
| } |
| } |
| } |
| |
| for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs) |
| msgLsnr.apply(msg); |
| |
| boolean ack = clientAck && !(msg instanceof TcpDiscoveryPingResponse); |
| |
| try { |
| if (ack) { |
| synchronized (mux) { |
| assert unackedMsg == null : "Unacked=" + unackedMsg + ", received=" + msg; |
| |
| unackedMsg = msg; |
| } |
| } |
| |
| spi.writeToSocket( |
| sock, |
| msg, |
| sockTimeout); |
| |
| IgniteUuid latencyCheckId = msg instanceof TcpDiscoveryRingLatencyCheckMessage ? |
| msg.id() : null; |
| |
| if (latencyCheckId != null && log.isInfoEnabled()) |
| log.info("Latency check message has been written to socket: " + latencyCheckId); |
| |
| msg = null; |
| |
| if (ack) { |
| long waitEndNanos = System.nanoTime() + U.millisToNanos(spi.failureDetectionTimeoutEnabled() ? |
| spi.failureDetectionTimeout() : spi.getAckTimeout()); |
| |
| TcpDiscoveryAbstractMessage unacked; |
| |
| synchronized (mux) { |
| long nowNanos = System.nanoTime(); |
| |
| while (unackedMsg != null && waitEndNanos - nowNanos > 0) { |
| mux.wait(U.nanosToMillis(waitEndNanos - nowNanos)); |
| |
| nowNanos = System.nanoTime(); |
| } |
| |
| unacked = unackedMsg; |
| |
| unackedMsg = null; |
| } |
| |
| if (unacked != null) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to get acknowledge for message, will try to reconnect " + |
| "[msg=" + unacked + |
| (spi.failureDetectionTimeoutEnabled() ? |
| ", failureDetectionTimeout=" + spi.failureDetectionTimeout() : |
| ", timeout=" + spi.getAckTimeout()) + ']'); |
| |
| throw new IOException("Failed to get acknowledge for message: " + unacked); |
| } |
| |
| if (latencyCheckId != null && log.isInfoEnabled()) |
| log.info("Latency check message has been acked: " + latencyCheckId); |
| } |
| } |
| catch (InterruptedException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("Client socket writer interrupted."); |
| |
| return; |
| } |
| catch (Exception e) { |
| if (spi.getSpiContext().isStopping()) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send message, node is stopping [msg=" + msg + ", err=" + e + ']'); |
| } |
| else |
| U.error(log, "Failed to send message: " + msg, e); |
| |
| U.closeQuiet(sock); |
| |
| synchronized (mux) { |
| if (sock == this.sock) |
| this.sock = null; // Connection has dead. |
| |
| clear(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| private void clear() { |
| assert Thread.holdsLock(mux); |
| |
| queue.clear(); |
| unackedMsg = null; |
| |
| CountDownLatch forceLeaveLatch = this.forceLeaveLatch; |
| |
| if (forceLeaveLatch != null) { |
| this.forceLeaveLatch = null; |
| |
| forceLeaveLatch.countDown(); |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| private class Reconnector extends IgniteSpiThread { |
| /** */ |
| private volatile SocketStream sockStream; |
| |
| /** */ |
| private boolean clientAck; |
| |
| /** */ |
| private final boolean join; |
| |
| /** */ |
| private final InetSocketAddress prevAddr; |
| |
| /** |
| * @param join {@code True} if reconnects during join. |
| * @param prevAddr Address of the node, that this client was previously connected to. |
| */ |
| protected Reconnector(boolean join, InetSocketAddress prevAddr) { |
| super(spi.ignite().name(), "tcp-client-disco-reconnector", log); |
| |
| this.join = join; |
| this.prevAddr = prevAddr; |
| } |
| |
| /** |
| * |
| */ |
| public void cancel() { |
| interrupt(); |
| |
| SocketStream sockStream = this.sockStream; |
| |
| if (sockStream != null) |
| U.closeQuiet(sockStream.socket()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void body() throws InterruptedException { |
| boolean success = false; |
| |
| Exception err = null; |
| |
| long timeout = join ? spi.joinTimeout : spi.netTimeout; |
| |
| long startNanos = System.nanoTime(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Started reconnect process [join=" + join + ", timeout=" + timeout + ']'); |
| |
| try { |
| while (true) { |
| T2<SocketStream, Boolean> joinRes = joinTopology(prevAddr, timeout, null, null); |
| |
| if (joinRes == null) { |
| if (join) { |
| joinError(new IgniteSpiException("Join process timed out, connection failed and " + |
| "failed to reconnect (consider increasing 'joinTimeout' configuration property) " + |
| "[joinTimeout=" + spi.joinTimeout + ']')); |
| } |
| else |
| U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout'" + |
| " configuration property) [networkTimeout=" + spi.netTimeout + ']'); |
| |
| return; |
| } |
| |
| sockStream = joinRes.get1(); |
| clientAck = joinRes.get2(); |
| |
| Socket sock = sockStream.socket(); |
| |
| if (isInterrupted()) |
| throw new InterruptedException(); |
| |
| int oldTimeout = 0; |
| |
| try { |
| oldTimeout = sock.getSoTimeout(); |
| |
| sock.setSoTimeout((int)spi.netTimeout); |
| |
| InputStream in = sockStream.stream(); |
| |
| assert sock.getKeepAlive() && sock.getTcpNoDelay() : "Socket wasn't configured properly:" + |
| " KeepAlive " + sock.getKeepAlive() + |
| " TcpNoDelay " + sock.getTcpNoDelay(); |
| |
| List<TcpDiscoveryAbstractMessage> msgs = null; |
| |
| while (!isInterrupted()) { |
| TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in, |
| U.resolveClassLoader(spi.ignite().configuration())); |
| |
| if (msg instanceof TcpDiscoveryClientReconnectMessage) { |
| TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg; |
| |
| if (res.creatorNodeId().equals(getLocalNodeId())) { |
| if (log.isDebugEnabled()) |
| log.debug("Received reconnect response [success=" + res.success() + |
| ", msg=" + msg + ']'); |
| |
| if (res.success()) { |
| msgWorker.addMessage(res); |
| |
| if (msgs != null) { |
| for (TcpDiscoveryAbstractMessage msg0 : msgs) |
| msgWorker.addMessage(msg0); |
| } |
| |
| success = true; |
| |
| return; |
| } |
| else |
| return; |
| } |
| } |
| else if (spi.ensured(msg)) { |
| if (msgs == null) |
| msgs = new ArrayList<>(); |
| |
| msgs.add(msg); |
| } |
| } |
| } |
| catch (IOException | IgniteCheckedException e) { |
| U.closeQuiet(sock); |
| |
| if (log.isDebugEnabled()) |
| log.error("Reconnect error [join=" + join + ", timeout=" + timeout + ']', e); |
| |
| if (timeout > 0 && U.millisSinceNanos(startNanos) > timeout) { |
| String msg = join ? "Failed to connect to cluster (consider increasing 'joinTimeout' " + |
| "configuration property) [joinTimeout=" + spi.joinTimeout + ", err=" + e + ']' : |
| "Failed to reconnect to cluster (consider increasing 'networkTimeout' " + |
| "configuration property) [networkTimeout=" + spi.netTimeout + ", err=" + e + ']'; |
| |
| U.warn(log, msg); |
| |
| throw e; |
| } |
| else |
| U.warn(log, "Failed to reconnect to cluster (will retry): " + e); |
| } |
| finally { |
| if (success) |
| sock.setSoTimeout(oldTimeout); |
| } |
| } |
| } |
| catch (IOException | IgniteCheckedException e) { |
| err = e; |
| |
| success = false; |
| |
| U.error(log, "Failed to reconnect", e); |
| } |
| finally { |
| if (!success) { |
| SocketStream sockStream = this.sockStream; |
| |
| if (sockStream != null) |
| U.closeQuiet(sockStream.socket()); |
| |
| if (join) |
| joinError(new IgniteSpiException("Failed to connect to cluster, connection failed and failed " + |
| "to reconnect.", err)); |
| else |
| msgWorker.addMessage(SPI_RECONNECT_FAILED); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Message worker. |
| */ |
| protected class MessageWorker extends GridWorker { |
| /** Message queue. */ |
| private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>(); |
| |
| /** */ |
| private SocketStream currSock; |
| |
| /** */ |
| private Reconnector reconnector; |
| |
| /** */ |
| private boolean nodeAdded; |
| |
| /** */ |
| private long lastReconnectTsNanos = -1; |
| |
| /** */ |
| private long currentReconnectDelay = -1; |
| |
/**
 * Creates the worker under the name {@code tcp-client-disco-msg-worker}, bound to the workers
 * registry returned by {@code getWorkersRegistry()}.
 *
 * @param log Logger.
 */
private MessageWorker(IgniteLogger log) {
    super(spi.ignite().name(), "tcp-client-disco-msg-worker", log, getWorkersRegistry());
}
| |
| /** {@inheritDoc} */ |
| @Override protected void body() throws InterruptedException { |
| state = STARTING; |
| |
| updateHeartbeat(); |
| |
| try { |
| tryJoin(); |
| |
| while (true) { |
| onIdle(); |
| |
| Object msg; |
| |
| blockingSectionBegin(); |
| |
| try { |
| msg = queue.take(); |
| } |
| finally { |
| blockingSectionEnd(); |
| } |
| |
| if (msg instanceof JoinTimeout) { |
| int joinCnt0 = ((JoinTimeout)msg).joinCnt; |
| |
| if (joinCnt == joinCnt0) { |
| if (state == STARTING) { |
| joinError(new IgniteSpiException("Join process timed out, did not receive response for " + |
| "join request (consider increasing 'joinTimeout' configuration property) " + |
| "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']')); |
| |
| break; |
| } |
| else if (state == DISCONNECTED) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to reconnect, local node segmented " + |
| "[joinTimeout=" + spi.joinTimeout + ']'); |
| |
| state = SEGMENTED; |
| |
| notifyDiscovery( |
| EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); |
| } |
| } |
| } |
| else if (msg == SPI_STOP) { |
| boolean connected = state == CONNECTED; |
| |
| state = STOPPED; |
| |
| assert spi.getSpiContext().isStopping(); |
| |
| if (connected && currSock != null) { |
| TcpDiscoveryNodeLeftMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId()); |
| |
| leftMsg.client(true); |
| |
| Span rootSpan = tracing.create(TraceableMessagesTable.traceName(leftMsg.getClass())) |
| .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> locNode.id().toString()) |
| .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID), |
| () -> locNode.consistentId().toString()) |
| .addLog(() -> "Created"); |
| |
| leftMsg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan)); |
| |
| sockWriter.sendMessage(leftMsg); |
| |
| rootSpan.addLog(() -> "Sent").end(); |
| } |
| else |
| leaveLatch.countDown(); |
| } |
| else if (msg == SPI_RECONNECT) { |
| if (state == CONNECTED) { |
| if (reconnector != null) { |
| reconnector.cancel(); |
| reconnector.join(); |
| |
| reconnector = null; |
| } |
| |
| sockWriter.forceLeave(); |
| sockReader.forceStopRead(); |
| |
| currSock = null; |
| |
| queue.clear(); |
| |
| onDisconnected(); |
| |
| UUID newId = UUID.randomUUID(); |
| |
| U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due " + |
| "to network problems [newId=" + newId + |
| ", prevId=" + locNode.id() + |
| ", locNode=" + locNode + ']'); |
| |
| locNode.onClientDisconnected(newId); |
| |
| throttleClientReconnect(); |
| |
| tryJoin(); |
| } |
| } |
| else if (msg instanceof TcpDiscoveryNodeFailedMessage && |
| ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) { |
| TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg; |
| |
| assert msg0.force() : msg0; |
| |
| forceFailMsg = msg0; |
| } |
| else if (msg instanceof SocketClosedMessage) { |
| if (((SocketClosedMessage)msg).sock == currSock) { |
| Socket sock = currSock.sock; |
| |
| InetSocketAddress prevAddr = new InetSocketAddress(sock.getInetAddress(), sock.getPort()); |
| |
| currSock = null; |
| |
| boolean join = joinLatch.getCount() > 0; |
| |
| if (spi.getSpiContext().isStopping() || state == SEGMENTED) { |
| leaveLatch.countDown(); |
| |
| if (join) { |
| joinError(new IgniteSpiException("Failed to connect to cluster: socket closed.")); |
| |
| break; |
| } |
| } |
| else { |
| if (forceFailMsg != null) { |
| if (log.isDebugEnabled()) { |
| log.debug("Connection closed, local node received force fail message, " + |
| "will not try to restore connection"); |
| } |
| |
| queue.addFirst(SPI_RECONNECT_FAILED); |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Connection closed, will try to restore connection."); |
| |
| assert reconnector == null; |
| |
| reconnector = new Reconnector(join, prevAddr); |
| reconnector.start(); |
| } |
| } |
| } |
| } |
| else if (msg == SPI_RECONNECT_FAILED) { |
| if (reconnector != null) { |
| reconnector.cancel(); |
| reconnector.join(); |
| |
| reconnector = null; |
| } |
| else |
| assert forceFailMsg != null; |
| |
| if (spi.isClientReconnectDisabled()) { |
| if (state != SEGMENTED && state != STOPPED) { |
| if (forceFailMsg != null) { |
| U.quietAndWarn(log, "Local node was dropped from cluster due to network problems " + |
| "[nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + |
| ", msg=" + forceFailMsg.warning() + ']'); |
| } |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Failed to restore closed connection, reconnect disabled, " + |
| "local node segmented [networkTimeout=" + spi.netTimeout + ']'); |
| } |
| |
| state = SEGMENTED; |
| |
| notifyDiscovery( |
| EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); |
| } |
| } |
| else { |
| if (state == STARTING || state == CONNECTED) { |
| if (log.isDebugEnabled()) { |
| log.debug("Failed to restore closed connection, will try to reconnect " + |
| "[networkTimeout=" + spi.netTimeout + |
| ", joinTimeout=" + spi.joinTimeout + |
| ", failMsg=" + forceFailMsg + ']'); |
| } |
| |
| onDisconnected(); |
| } |
| |
| UUID newId = UUID.randomUUID(); |
| |
| if (forceFailMsg != null) { |
| long delay = IgniteSystemProperties.getLong(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY, |
| DFLT_DISCO_FAILED_CLIENT_RECONNECT_DELAY); |
| |
| if (delay > 0) { |
| U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " + |
| "will try to reconnect with new id after " + delay + "ms (reconnect delay " + |
| "can be changed using IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY system " + |
| "property) [" + |
| "newId=" + newId + |
| ", prevId=" + locNode.id() + |
| ", locNode=" + locNode + |
| ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + |
| ", msg=" + forceFailMsg.warning() + ']'); |
| |
| Thread.sleep(delay); |
| } |
| else { |
| U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " + |
| "will try to reconnect with new id [" + |
| "newId=" + newId + |
| ", prevId=" + locNode.id() + |
| ", locNode=" + locNode + |
| ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + |
| ", msg=" + forceFailMsg.warning() + ']'); |
| } |
| |
| forceFailMsg = null; |
| } |
| else if (log.isInfoEnabled()) { |
| log.info("Client node disconnected from cluster, will try to reconnect with new id " + |
| "[newId=" + newId + ", prevId=" + locNode.id() + ", locNode=" + locNode + ']'); |
| } |
| |
| locNode.onClientDisconnected(newId); |
| |
| tryJoin(); |
| } |
| } |
| else { |
| TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg; |
| |
| if (joining()) { |
| IgniteSpiException err = null; |
| |
| if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage) |
| err = spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg); |
| else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage) |
| err = spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg); |
| //TODO: https://issues.apache.org/jira/browse/IGNITE-9829 |
| else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage) |
| err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg); |
| |
| if (err != null) { |
| if (state == DISCONNECTED) { |
| U.error(log, "Failed to reconnect, segment local node.", err); |
| |
| state = SEGMENTED; |
| |
| notifyDiscovery( |
| EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); |
| } |
| else |
| joinError(err); |
| |
| cancel(); |
| |
| break; |
| } |
| } |
| |
| processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg); |
| } |
| } |
| } |
| catch (InterruptedException ignored) { |
| Thread.currentThread().interrupt(); |
| } |
| catch (Throwable t) { |
| if (spi.ignite() instanceof IgniteEx) |
| ((IgniteEx)spi.ignite()).context().failure().process(new FailureContext(CRITICAL_ERROR, t)); |
| } |
| finally { |
| SocketStream currSock = this.currSock; |
| |
| if (currSock != null) |
| U.closeQuiet(currSock.socket()); |
| |
| if (joinLatch.getCount() > 0) |
| joinError(new IgniteSpiException("Some error in join process.")); // This should not occur. |
| |
| if (reconnector != null) { |
| reconnector.cancel(); |
| |
| reconnector.join(); |
| } |
| } |
| } |
| |
| /** |
| * Wait random delay before trying to reconnect. Delay will grow exponentially every time client is forced to |
| * reconnect, but only if all these reconnections happened in small period of time (2 minutes). Maximum delay |
| * could be configured with {@link IgniteSpiAdapter#clientFailureDetectionTimeout()}, default value is |
| * {@link IgniteConfiguration#DFLT_CLIENT_FAILURE_DETECTION_TIMEOUT}. |
| * |
| * @throws InterruptedException If thread is interrupted. |
| */ |
| private void throttleClientReconnect() throws InterruptedException { |
| if (currentReconnectDelay == -1 |
| || U.millisSinceNanos(lastReconnectTsNanos) > CLIENT_THROTTLE_RECONNECT_RESET_TIMEOUT |
| ) |
| currentReconnectDelay = 0; // Skip pause on first reconnect. |
| else if (currentReconnectDelay == 0) |
| currentReconnectDelay = 200; |
| else { |
| long maxDelay = spi.failureDetectionTimeoutEnabled() |
| ? spi.clientFailureDetectionTimeout() |
| : IgniteConfiguration.DFLT_CLIENT_FAILURE_DETECTION_TIMEOUT; |
| |
| currentReconnectDelay = Math.min(maxDelay, (int)(currentReconnectDelay * 1.5)); |
| } |
| |
| if (currentReconnectDelay != 0) { |
| ThreadLocalRandom random = ThreadLocalRandom.current(); |
| |
| Thread.sleep(random.nextLong(currentReconnectDelay / 2, currentReconnectDelay)); |
| } |
| |
| lastReconnectTsNanos = System.nanoTime(); |
| } |
| |
| /** |
| * |
| */ |
| private void onDisconnected() { |
| state = DISCONNECTED; |
| |
| nodeAdded = false; |
| |
| delayDiscoData.clear(); |
| |
| notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes(), null); |
| |
| IgniteClientDisconnectedCheckedException err = |
| new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " + |
| "client node disconnected."); |
| |
| for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) { |
| GridFutureAdapter<Boolean> fut = e.getValue(); |
| |
| if (pingFuts.remove(e.getKey(), fut)) |
| fut.onDone(err); |
| } |
| } |
| |
| /** |
| * @throws InterruptedException If interrupted. |
| */ |
| private void tryJoin() throws InterruptedException { |
| assert state == DISCONNECTED || state == STARTING : state; |
| |
| boolean join = state == STARTING; |
| |
| joinCnt++; |
| |
| T2<SocketStream, Boolean> joinRes; |
| |
| try { |
| joinRes = joinTopology(null, spi.joinTimeout, |
| new Runnable() { |
| @Override public void run() { |
| blockingSectionBegin(); |
| } |
| }, |
| new Runnable() { |
| @Override public void run() { |
| blockingSectionEnd(); |
| } |
| }); |
| } |
| catch (IgniteSpiException e) { |
| joinError(e); |
| |
| return; |
| } |
| |
| if (joinRes == null) { |
| if (join) |
| joinError(new IgniteSpiException("Join process timed out (timeout = " + spi.joinTimeout + ").")); |
| else { |
| state = SEGMENTED; |
| |
| notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); |
| } |
| |
| return; |
| } |
| |
| currSock = joinRes.get1(); |
| |
| sockWriter.setSocket(joinRes.get1().socket(), joinRes.get2()); |
| |
| if (spi.joinTimeout > 0) { |
| final int joinCnt0 = joinCnt; |
| |
| executorService.schedule(() -> { |
| queue.add(new JoinTimeout(joinCnt0)); |
| }, spi.joinTimeout, MILLISECONDS); |
| } |
| |
| sockReader.setSocket(joinRes.get1(), locNode.clientRouterNodeId()); |
| } |
| |
| /** |
| * @param msg Message. |
| */ |
| protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) { |
| assert msg != null; |
| assert msg.verified() || msg.senderNodeId() == null; |
| |
| spi.startMessageProcess(msg); |
| |
| spi.stats.onMessageProcessingStarted(msg); |
| |
| if (msg instanceof TraceableMessage) |
| tracing.messages().beforeSend((TraceableMessage) msg); |
| |
| if (msg instanceof TcpDiscoveryNodeAddedMessage) |
| processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); |
| else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) |
| processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg); |
| else if (msg instanceof TcpDiscoveryNodeLeftMessage) |
| processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg); |
| else if (msg instanceof TcpDiscoveryNodeFailedMessage) |
| processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg); |
| else if (msg instanceof TcpDiscoveryMetricsUpdateMessage) |
| processMetricsUpdateMessage((TcpDiscoveryMetricsUpdateMessage)msg); |
| else if (msg instanceof TcpDiscoveryClientReconnectMessage) |
| processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg); |
| else if (msg instanceof TcpDiscoveryCustomEventMessage) |
| processCustomMessage((TcpDiscoveryCustomEventMessage)msg); |
| else if (msg instanceof TcpDiscoveryClientPingResponse) |
| processClientPingResponse((TcpDiscoveryClientPingResponse)msg); |
| else if (msg instanceof TcpDiscoveryPingRequest) |
| processPingRequest(); |
| |
| spi.stats.onMessageProcessingFinished(msg); |
| |
| if (msg instanceof TraceableMessage) |
| tracing.messages().finishProcessing((TraceableMessage) msg); |
| |
| if (spi.ensured(msg) |
| && state == CONNECTED |
| && !(msg instanceof TcpDiscoveryClientReconnectMessage)) |
| lastMsgId = msg.id(); |
| } |
| |
| /** |
| * @return {@code True} if client in process of join. |
| */ |
| private boolean joining() { |
| ClientImpl.State state = ClientImpl.this.state; |
| |
| return state == STARTING || state == DISCONNECTED; |
| } |
| |
| /** |
| * @return {@code True} if client disconnected. |
| */ |
| private boolean disconnected() { |
| return state == DISCONNECTED; |
| } |
| |
| /** |
| * @param msg Message. |
| */ |
| private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { |
| if (spi.getSpiContext().isStopping()) |
| return; |
| |
| TcpDiscoveryNode node = msg.node(); |
| |
| UUID newNodeId = node.id(); |
| |
| if (getLocalNodeId().equals(newNodeId)) { |
| if (joining()) { |
| Collection<TcpDiscoveryNode> top = msg.topology(); |
| |
| if (top != null) { |
| spi.gridStartTime = msg.gridStartTime(); |
| |
| if (disconnected()) |
| rmtNodes.clear(); |
| |
| for (TcpDiscoveryNode n : top) { |
| if (n.order() > 0) |
| n.visible(true); |
| |
| rmtNodes.put(n.id(), n); |
| } |
| |
| topHist.clear(); |
| |
| nodeAdded = true; |
| |
| if (msg.topologyHistory() != null) |
| topHist.putAll(msg.topologyHistory()); |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Discarding node added message with empty topology: " + msg); |
| } |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Discarding node added message (this message has already been processed) " + |
| "[msg=" + msg + ", locNode=" + locNode + ']'); |
| } |
| else { |
| if (nodeAdded()) { |
| boolean topChanged = rmtNodes.putIfAbsent(newNodeId, node) == null; |
| |
| if (topChanged) { |
| if (log.isDebugEnabled()) |
| log.debug("Added new node to topology: " + node); |
| |
| DiscoveryDataPacket dataPacket = msg.gridDiscoveryData(); |
| |
| if (dataPacket != null && dataPacket.hasJoiningNodeData()) { |
| if (joining()) |
| delayDiscoData.add(dataPacket); |
| else |
| spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration())); |
| } |
| } |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Ignore topology message, local node not added to topology: " + msg); |
| } |
| } |
| } |
| |
| /** |
| * Handles a node add finished message. |
| * <p> |
| * For the local node, while a join is in progress, this completes the join: the client discovery |
| * data and any delayed discovery data are applied, the local node becomes visible and receives |
| * its order, the state switches to {@code CONNECTED}, discovery is notified and the join latch |
| * is released. For a remote node the node is made visible, the topology history is updated and, |
| * if the client is connected, an {@code EVT_NODE_JOINED} event is fired for a newly visible node. |
| * |
| * @param msg Node add finished message. |
| */ |
| private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) { |
| if (spi.getSpiContext().isStopping()) |
| return; |
| |
| if (log.isInfoEnabled()) { |
| // Log the remote node whose ID equals the local node's client router node ID. |
| for (ClusterNode node : getRemoteNodes()) { |
| if (node.id().equals(locNode.clientRouterNodeId())) { |
| log.info("Router node: " + node); |
| |
| break; |
| } |
| } |
| } |
| |
| if (getLocalNodeId().equals(msg.nodeId())) { |
| if (joining()) { |
| DiscoveryDataPacket dataContainer = msg.clientDiscoData(); |
| |
| if (dataContainer != null) |
| spi.onExchange(dataContainer, U.resolveClassLoader(spi.ignite().configuration())); |
| |
| // Apply discovery data that was queued while the local join was in progress. |
| if (!delayDiscoData.isEmpty()) { |
| for (DiscoveryDataPacket data : delayDiscoData) |
| spi.onExchange(data, U.resolveClassLoader(spi.ignite().configuration())); |
| |
| delayDiscoData.clear(); |
| } |
| |
| locNode.setAttributes(msg.clientNodeAttributes()); |
| locNode.visible(true); |
| |
| long topVer = msg.topologyVersion(); |
| |
| locNode.order(topVer); |
| |
| // Drop history entries at or above the version the local node joins with. |
| topHist.keySet().removeIf(ver -> ver >= topVer); |
| |
| Collection<ClusterNode> nodes = updateTopologyHistory(topVer, msg); |
| |
| // Remember whether this is a rejoin before the state is overwritten. |
| boolean disconnected = disconnected(); |
| |
| state = CONNECTED; |
| |
| notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, nodes, msg.spanContainer()); |
| |
| if (disconnected) { |
| notifyDiscovery(EVT_CLIENT_NODE_RECONNECTED, topVer, locNode, nodes, null); |
| |
| U.quietAndWarn(log, "Client node was reconnected after it was already considered " + |
| "failed by the server topology (this could happen after all servers restarted or due " + |
| "to a long network outage between the client and servers). All continuous queries and " + |
| "remote event listeners created by this client will be unsubscribed, consider " + |
| "listening to EVT_CLIENT_NODE_RECONNECTED event to restore them."); |
| } |
| |
| joinErr.set(null); |
| |
| joinLatch.countDown(); |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Discarding node add finished message (this message has already been processed) " + |
| "[msg=" + msg + ", locNode=" + locNode + ']'); |
| } |
| else { |
| if (nodeAdded()) { |
| TcpDiscoveryNode node = rmtNodes.get(msg.nodeId()); |
| |
| if (node == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']'); |
| |
| return; |
| } |
| |
| // Set only when the node becomes visible with this message. |
| boolean evt = false; |
| |
| long topVer = msg.topologyVersion(); |
| |
| assert topVer > 0 : msg; |
| |
| if (!node.visible()) { |
| node.order(topVer); |
| node.visible(true); |
| |
| // Reuse the local version instance when the remote version is equal to it. |
| if (spi.locNodeVer.equals(node.version())) |
| node.version(spi.locNodeVer); |
| |
| evt = true; |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Skip node join event, node already joined [msg=" + msg + ", node=" + node + ']'); |
| |
| assert node.order() == topVer : node; |
| } |
| |
| Collection<ClusterNode> top = updateTopologyHistory(topVer, msg); |
| |
| assert top != null && top.contains(node) : "Topology does not contain node [msg=" + msg + |
| ", node=" + node + ", top=" + top + ']'; |
| |
| // The history is updated above in any case; the event is fired only when connected. |
| if (state != CONNECTED) { |
| if (log.isDebugEnabled()) |
| log.debug("Discarding node add finished message (join process is not finished): " + msg); |
| |
| return; |
| } |
| |
| if (evt) { |
| notifyDiscovery(EVT_NODE_JOINED, topVer, node, top, msg.spanContainer()); |
| |
| spi.stats.onNodeJoined(); |
| } |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Ignore topology message, local node not added to topology: " + msg); |
| } |
| } |
| } |
| |
| /** |
| * @param msg Message. |
| */ |
| private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) { |
| if (getLocalNodeId().equals(msg.creatorNodeId())) { |
| if (log.isDebugEnabled()) |
| log.debug("Received node left message for local node: " + msg); |
| |
| leaveLatch.countDown(); |
| } |
| else { |
| if (spi.getSpiContext().isStopping()) |
| return; |
| |
| if (nodeAdded()) { |
| TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId()); |
| |
| if (node == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Discarding node left message since node is not found [msg=" + msg + ']'); |
| |
| return; |
| } |
| |
| Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg); |
| |
| if (state != CONNECTED) { |
| if (log.isDebugEnabled()) |
| log.debug("Discarding node left message (join process is not finished): " + msg); |
| |
| return; |
| } |
| |
| notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top, msg.spanContainer()); |
| |
| spi.stats.onNodeLeft(); |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Ignore topology message, local node not added to topology: " + msg); |
| } |
| } |
| } |
| |
| /** |
| * Returns the {@code nodeAdded} flag, which is raised when a node added message for the local |
| * node carrying a topology is processed. Handlers of remote topology messages check it and |
| * ignore the message while it is not set. |
| * |
| * @return {@code True} if received node added message for local node. |
| */ |
| private boolean nodeAdded() { |
| return nodeAdded; |
| } |
| |
| /** |
| * @param msg Message. |
| */ |
| private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) { |
| if (spi.getSpiContext().isStopping()) { |
| if (!getLocalNodeId().equals(msg.creatorNodeId()) && getLocalNodeId().equals(msg.failedNodeId())) { |
| if (leaveLatch.getCount() > 0) { |
| if (log.isDebugEnabled()) |
| log.debug("Remote node fail this node while node is stopping [locNode=" + getLocalNodeId() |
| + ", rmtNode=" + msg.creatorNodeId() + ']'); |
| |
| leaveLatch.countDown(); |
| } |
| } |
| |
| return; |
| } |
| |
| if (nodeAdded()) { |
| if (!getLocalNodeId().equals(msg.creatorNodeId())) { |
| TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId()); |
| |
| if (node == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Discarding node failed message since node is not found [msg=" + msg + ']'); |
| |
| return; |
| } |
| |
| Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg); |
| |
| if (state != CONNECTED) { |
| if (log.isDebugEnabled()) |
| log.debug("Discarding node failed message (join process is not finished): " + msg); |
| |
| return; |
| } |
| |
| if (msg.warning() != null) { |
| ClusterNode creatorNode = rmtNodes.get(msg.creatorNodeId()); |
| |
| U.warn(log, "Received EVT_NODE_FAILED event with warning [" + |
| "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : msg.creatorNodeId()) + |
| ", msg=" + msg.warning() + ']'); |
| } |
| |
| notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top, msg.spanContainer()); |
| |
| spi.stats.onNodeFailed(); |
| } |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Ignore topology message, local node not added to topology: " + msg); |
| } |
| } |
| |
| /** |
| * Handles a metrics update message. A message created by the local node is only logged as |
| * a metrics response; a message created by another node is passed on for metrics processing |
| * when it carries metrics. |
| * |
| * @param msg Metrics update message. |
| */ |
| private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) { |
| if (spi.getSpiContext().isStopping()) { |
| return; |
| } |
| |
| if (!getLocalNodeId().equals(msg.creatorNodeId())) { |
| if (msg.hasMetrics()) { |
| processMsgCacheMetrics(msg, System.nanoTime()); |
| } |
| |
| return; |
| } |
| |
| assert msg.senderNodeId() != null; |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Received metrics response: " + msg); |
| } |
| } |
| |
| /** |
| * Handles a client reconnect message created by the local node. When a reconnector is active, |
| * its socket becomes the current one and the pending messages carried by the message are |
| * processed. Without a reconnector, pending messages of a successful message are processed |
| * only while the join latch is still open. Messages of other clients are discarded. |
| * |
| * @param msg Client reconnect message. |
| */ |
| private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) { |
| if (spi.getSpiContext().isStopping()) { |
| return; |
| } |
| |
| if (!getLocalNodeId().equals(msg.creatorNodeId())) { |
| if (log.isDebugEnabled()) { |
| log.debug("Discarding reconnect message for another client: " + msg); |
| } |
| |
| return; |
| } |
| |
| if (reconnector != null) { |
| assert msg.success() : msg; |
| |
| // Switch both the writer and the reader to the socket opened by the reconnector. |
| currSock = reconnector.sockStream; |
| |
| sockWriter.setSocket(currSock.socket(), reconnector.clientAck); |
| sockReader.setSocket(currSock, locNode.clientRouterNodeId()); |
| |
| reconnector = null; |
| |
| for (TcpDiscoveryAbstractMessage missed : msg.pendingMessages()) { |
| if (log.isDebugEnabled()) { |
| log.debug("Process pending message on reconnect [msg=" + missed + ']'); |
| } |
| |
| processDiscoveryMessage(missed); |
| } |
| |
| return; |
| } |
| |
| if (joinLatch.getCount() <= 0) { |
| if (log.isDebugEnabled()) { |
| log.debug("Discarding reconnect message, reconnect is completed: " + msg); |
| } |
| |
| return; |
| } |
| |
| if (!msg.success()) { |
| return; |
| } |
| |
| for (TcpDiscoveryAbstractMessage missed : msg.pendingMessages()) { |
| if (log.isDebugEnabled()) { |
| log.debug("Process pending message on connect [msg=" + missed + ']'); |
| } |
| |
| processDiscoveryMessage(missed); |
| } |
| |
| // Processing the pending messages is expected to have released the join latch. |
| assert joinLatch.getCount() == 0 : msg; |
| } |
| |
| /** |
| * Handles a custom discovery message. While the client is connected and a discovery listener |
| * is set, the payload is unmarshalled and the listener is notified with an |
| * {@code EVT_DISCOVERY_CUSTOM_EVT} event on behalf of the node that created the message. |
| * Messages whose creator node is unknown or not visible are skipped. |
| * |
| * @param msg Custom event message. |
| */ |
| private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { |
| if (state != CONNECTED) |
| return; |
| |
| DiscoverySpiListener lsnr = spi.lsnr; |
| |
| if (lsnr == null) |
| return; |
| |
| UUID nodeId = msg.creatorNodeId(); |
| |
| TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId); |
| |
| if (node != null && node.visible()) { |
| try { |
| DiscoverySpiCustomMessage msgObj = msg.message(spi.marshaller(), |
| U.resolveClassLoader(spi.ignite().configuration())); |
| |
| notifyDiscovery( |
| EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj, msg.spanContainer()); |
| } |
| catch (Throwable e) { |
| // NOTE(review): this also catches failures thrown by the listener notification above, |
| // not only unmarshalling errors; the message text is kept as is. |
| U.error(log, "Failed to unmarshal discovery custom message.", e); |
| } |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Received custom message from unknown node: " + nodeId); |
| } |
| |
| /** |
| * @param msg Message. |
| */ |
| private void processClientPingResponse(TcpDiscoveryClientPingResponse msg) { |
| GridFutureAdapter<Boolean> fut = pingFuts.remove(msg.nodeToPing()); |
| |
| if (fut != null) |
| fut.onDone(msg.result()); |
| } |
| |
| /** |
| * Router want to ping this client. |
| */ |
| private void processPingRequest() { |
| TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(getLocalNodeId()); |
| |
| res.client(true); |
| |
| sockWriter.sendMessage(res); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param metrics Metrics. |
| * @param cacheMetrics Cache metrics. |
| * @param tsNanos Timestamp as returned by {@link System#nanoTime()}. |
| */ |
| private void updateMetrics(UUID nodeId, |
| ClusterMetrics metrics, |
| Map<Integer, CacheMetrics> cacheMetrics, |
| long tsNanos) |
| { |
| boolean isLocDaemon = spi.locNode.isDaemon(); |
| |
| assert nodeId != null; |
| assert metrics != null; |
| assert isLocDaemon || cacheMetrics != null; |
| |
| TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId); |
| |
| if (node != null && node.visible()) { |
| node.setMetrics(metrics); |
| |
| if (!isLocDaemon) |
| node.setCacheMetrics(cacheMetrics); |
| |
| node.lastUpdateTimeNanos(tsNanos); |
| |
| notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes(), null); |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Received metrics from unknown node: " + nodeId); |
| } |
| |
| /** |
| * @param type Event type. |
| * @param topVer Topology version. |
| * @param node Node. |
| * @param top Topology snapshot. |
| * @param spanContainer Span container. |
| */ |
| private void notifyDiscovery( |
| int type, |
| long topVer, |
| ClusterNode node, |
| Collection<ClusterNode> top, |
| SpanContainer spanContainer |
| ) { |
| notifyDiscovery(type, topVer, node, top, null, spanContainer); |
| } |
| |
| /** |
| * @param type Event type. |
| * @param topVer Topology version. |
| * @param node Node. |
| * @param top Topology snapshot. |
| * @param data Optional custom message data. |
| */ |
| private void notifyDiscovery( |
| int type, |
| long topVer, |
| ClusterNode node, |
| Collection<ClusterNode> top, |
| @Nullable DiscoverySpiCustomMessage data, |
| SpanContainer spanContainer |
| ) { |
| DiscoverySpiListener lsnr = spi.lsnr; |
| |
| DebugLogger debugLog = type == EVT_NODE_METRICS_UPDATED ? traceLog : ClientImpl.this.debugLog; |
| |
| if (lsnr != null) { |
| if (debugLog.isDebugEnabled()) |
| debugLog.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) + |
| ", topVer=" + topVer + ']'); |
| |
| lsnr.onDiscovery( |
| new DiscoveryNotification(type, topVer, node, top, new TreeMap<>(topHist), data, spanContainer) |
| ).get(); |
| } |
| else if (debugLog.isDebugEnabled()) |
| debugLog.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) + |
| ", topVer=" + topVer + ']'); |
| } |
| |
| /** |
| * @param msg Message. |
| */ |
| void addMessage(Object msg) { |
| //TODO: https://ggsystems.atlassian.net/browse/GG-22502 |
| if (msg instanceof TraceableMessage && msg instanceof TcpDiscoveryAbstractMessage) { |
| TraceableMessage tMsg = (TraceableMessage)msg; |
| |
| if (!((TcpDiscoveryAbstractMessage)msg).verified() && |
| tMsg.spanContainer().serializedSpanBytes() == null) { |
| Span rootSpan = tracing.create(TraceableMessagesTable.traceName(tMsg.getClass())) |
| .end(); |
| |
| // This root span will be parent both from local and remote nodes. |
| tMsg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan)); |
| } |
| } |
| |
| queue.add(msg); |
| } |
| |
| /** |
| * Returns the number of elements currently held in the queue that {@code addMessage} |
| * appends to. |
| * |
| * @return Queue size. |
| */ |
| int queueSize() { |
| return queue.size(); |
| } |
| } |
| |
| /** |
| * Queue marker for an elapsed join timeout. It carries the join counter value it was created |
| * with so that the consumer can compare it with the current one. |
| */ |
| private static class JoinTimeout { |
| /** Join counter value this timeout was created with; never changes after construction. */ |
| private final int joinCnt; |
| |
| /** |
| * @param joinCnt Join count to compare. |
| */ |
| private JoinTimeout(int joinCnt) { |
| this.joinCnt = joinCnt; |
| } |
| } |
| |
| /** |
| * Message object that carries the {@link SocketStream} it refers to. |
| * NOTE(review): judging by the name it signals that this socket was closed; confirm against |
| * the code that creates it. |
| */ |
| private static class SocketClosedMessage { |
| /** Socket stream this message refers to. */ |
| private final SocketStream sock; |
| |
| /** |
| * @param sock Socket. |
| */ |
| private SocketClosedMessage(SocketStream sock) { |
| this.sock = sock; |
| } |
| } |
| |
| /** |
| * Pairs a socket with a buffered view of its input stream. |
| */ |
| private static class SocketStream { |
| /** Underlying socket. */ |
| private final Socket sock; |
| |
| /** Buffered input stream of the socket. */ |
| private final InputStream in; |
| |
| /** |
| * @param sock Socket to wrap. |
| * @throws IOException If the socket input stream cannot be obtained. |
| */ |
| public SocketStream(Socket sock) throws IOException { |
| assert sock != null; |
| |
| final InputStream rawIn = sock.getInputStream(); |
| |
| this.sock = sock; |
| in = new BufferedInputStream(rawIn); |
| } |
| |
| /** |
| * @return Underlying socket. |
| */ |
| Socket socket() { |
| return sock; |
| } |
| |
| /** |
| * @return Buffered input stream of the socket. |
| */ |
| InputStream stream() { |
| return in; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return sock.toString(); |
| } |
| } |
| |
| /** |
| * Client states. |
| */ |
| enum State { |
| /** A join attempted in this state is treated as the first join: a timeout is reported as a join error. */ |
| STARTING, |
| |
| /** Set once the node add finished message for the local node has been processed. */ |
| CONNECTED, |
| |
| /** A join is pending again; completing it from this state also fires the client reconnected event. */ |
| DISCONNECTED, |
| |
| /** Set when a join attempted outside of {@code STARTING} yields no result; a segmented event is fired. */ |
| SEGMENTED, |
| |
| /** NOTE(review): presumably the client has been stopped; not assigned in the visible code, confirm. */ |
| STOPPED |
| } |
| } |