blob: 18d9ad8785dd1a9ec3601d0f32f9c8c82aa7029a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.communication.tcp;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgniteTooManyOpenFilesException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.NoopTracing;
import org.apache.ignite.internal.processors.tracing.SpanTags;
import org.apache.ignite.internal.processors.tracing.Tracing;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.ipc.IpcEndpoint;
import org.apache.ignite.internal.util.ipc.IpcToNioAdapter;
import org.apache.ignite.internal.util.ipc.shmem.IpcOutOfSystemResourcesException;
import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
import org.apache.ignite.internal.util.nio.GridDirectParser;
import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
import org.apache.ignite.internal.util.nio.GridNioFilter;
import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioServerListener;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.nio.GridSelectorNioSessionImpl;
import org.apache.ignite.internal.util.nio.GridNioTracerFilter;
import org.apache.ignite.internal.util.nio.GridShmemCommunicationClient;
import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.ExponentialBackoffTimeoutStrategy;
import org.apache.ignite.spi.IgnitePortProtocol;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiConsistencyChecked;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiMBeanAdapter;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.TimeoutStrategy;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.internal.CommunicationListenerEx;
import org.apache.ignite.spi.communication.tcp.internal.ConnectionKey;
import org.apache.ignite.spi.communication.tcp.internal.HandshakeException;
import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture;
import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationNodeConnectionCheckFuture;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage;
import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import static java.util.Collections.emptyList;
import static java.util.Objects.nonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable.traceName;
import static org.apache.ignite.internal.IgniteFeatures.CHANNEL_COMMUNICATION;
import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
import static org.apache.ignite.plugin.extensions.communication.Message.DIRECT_TYPE_SIZE;
import static org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.SES_FUT_META;
import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NEED_WAIT;
import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NODE_STOPPING;
import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.UNKNOWN_NODE;
/**
* <tt>TcpCommunicationSpi</tt> is default communication SPI which uses
* TCP/IP protocol and Java NIO to communicate with other nodes.
* <p>
* To enable communication with other nodes, this SPI adds {@link #ATTR_ADDRS}
* and {@link #ATTR_PORT} local node attributes (see {@link ClusterNode#attributes()}).
* <p>
* At startup, this SPI tries to start listening to local port specified by
* {@link #setLocalPort(int)} method. If local port is occupied, then SPI will
* automatically increment the port number until it can successfully bind for
* listening. {@link #setLocalPortRange(int)} configuration parameter controls
* maximum number of ports that SPI will try before it fails. Port range comes
* very handy when starting multiple grid nodes on the same machine or even
* in the same VM. In this case all nodes can be brought up without a single
* change in configuration.
* <p>
* This SPI caches connections to remote nodes so it does not have to reconnect every
* time a message is sent. By default, idle connections are kept active for
* {@link #DFLT_IDLE_CONN_TIMEOUT} period and then are closed. Use
* {@link #setIdleConnectionTimeout(long)} configuration parameter to configure
* your own idle connection timeout.
* <h1 class="header">Failure Detection</h1>
* Configuration defaults (see the Configuration section below and
* {@link IgniteConfiguration#getFailureDetectionTimeout()} for details) are chosen to make it possible for
* communication SPI to work reliably on most hardware and virtual deployments, but this has made failure detection
* time worse.
* <p>
* If failure detection needs to be tuned, it is highly recommended to do this using
* {@link IgniteConfiguration#setFailureDetectionTimeout(long)}. This failure timeout automatically controls the
* following parameters: {@link #getConnectTimeout()}, {@link #getMaxConnectTimeout()},
* {@link #getReconnectCount()}. If any of those parameters is set explicitly, then the failure timeout setting will be
* ignored.
* <p>
* If advanced failure detection settings are required and
* {@link IgniteConfiguration#getFailureDetectionTimeout()} is unsuitable, then various {@code TcpCommunicationSpi}
* configuration parameters may be used.
* <h1 class="header">Configuration</h1>
* <h2 class="header">Mandatory</h2>
* This SPI has no mandatory configuration parameters.
* <h2 class="header">Optional</h2>
* The following configuration parameters are optional:
* <ul>
* <li>Address resolver (see {@link #setAddressResolver(AddressResolver)})</li>
* <li>Node local IP address (see {@link #setLocalAddress(String)})</li>
* <li>Node local port number (see {@link #setLocalPort(int)})</li>
* <li>Local port range (see {@link #setLocalPortRange(int)})</li>
* <li>Use paired connections (see {@link #setUsePairedConnections(boolean)})</li>
* <li>Connections per node (see {@link #setConnectionsPerNode(int)})</li>
* <li>Shared memory port (see {@link #setSharedMemoryPort(int)})</li>
* <li>Idle connection timeout (see {@link #setIdleConnectionTimeout(long)})</li>
* <li>Direct or heap buffer allocation (see {@link #setDirectBuffer(boolean)})</li>
* <li>Direct or heap buffer allocation for sending (see {@link #setDirectSendBuffer(boolean)})</li>
* <li>Count of selectors and selector threads for NIO server (see {@link #setSelectorsCount(int)})</li>
* <li>Selector thread busy-loop iterations (see {@link #setSelectorSpins(long)})</li>
* <li>{@code TCP_NODELAY} socket option for sockets (see {@link #setTcpNoDelay(boolean)})</li>
* <li>Filter reachable addresses (see {@link #setFilterReachableAddresses(boolean)})</li>
* <li>Message queue limit (see {@link #setMessageQueueLimit(int)})</li>
* <li>Slow client queue limit (see {@link #setSlowClientQueueLimit(int)})</li>
* <li>Connect timeout (see {@link #setConnectTimeout(long)})</li>
* <li>Maximum connect timeout (see {@link #setMaxConnectTimeout(long)})</li>
* <li>Reconnect attempts count (see {@link #setReconnectCount(int)})</li>
* <li>Socket receive buffer size (see {@link #setSocketReceiveBuffer(int)})</li>
* <li>Socket send buffer size (see {@link #setSocketSendBuffer(int)})</li>
* <li>Socket write timeout (see {@link #setSocketWriteTimeout(long)})</li>
* <li>Number of received messages after which acknowledgment is sent (see {@link #setAckSendThreshold(int)})</li>
* <li>Maximum number of unacknowledged messages (see {@link #setUnacknowledgedMessagesBufferSize(int)})</li>
* </ul>
* <h2 class="header">Java Example</h2>
* TcpCommunicationSpi is used by default and should be explicitly configured
* only if some SPI configuration parameters need to be overridden.
* <pre name="code" class="java">
* TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
*
* // Override local port.
* commSpi.setLocalPort(4321);
*
* IgniteConfiguration cfg = new IgniteConfiguration();
*
* // Override default communication SPI.
* cfg.setCommunicationSpi(commSpi);
*
* // Start grid.
* Ignition.start(cfg);
* </pre>
* <h2 class="header">Spring Example</h2>
* TcpCommunicationSpi can be configured from Spring XML configuration file:
* <pre name="code" class="xml">
* &lt;bean id="grid.custom.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true"&gt;
* ...
* &lt;property name="communicationSpi"&gt;
* &lt;bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi"&gt;
* &lt;!-- Override local port. --&gt;
* &lt;property name="localPort" value="4321"/&gt;
* &lt;/bean&gt;
* &lt;/property&gt;
* ...
* &lt;/bean&gt;
* </pre>
* <p>
* <img src="http://ignite.apache.org/images/spring-small.png">
* <br>
* For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
* @see CommunicationSpi
*/
@IgniteSpiMultipleInstancesSupport(true)
@IgniteSpiConsistencyChecked(optional = false)
public class TcpCommunicationSpi extends IgniteSpiAdapter implements CommunicationSpi<Message> {
/** Time threshold (in milliseconds, per the name) above which connection establishment is considered too long and logged. */
private static final int CONNECTION_ESTABLISH_THRESHOLD_MS = 100;
/** IPC error message used when a shared memory segment cannot be allocated and TCP is used instead. */
public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " +
"(switching to TCP, may be slower).";
/** Node attribute that is mapped to node IP addresses (value is <tt>comm.tcp.addrs</tt>). */
public static final String ATTR_ADDRS = "comm.tcp.addrs";
/** Node attribute that is mapped to node host names (value is <tt>comm.tcp.host.names</tt>). */
public static final String ATTR_HOST_NAMES = "comm.tcp.host.names";
/** Node attribute that is mapped to node port number (value is <tt>comm.tcp.port</tt>). */
public static final String ATTR_PORT = "comm.tcp.port";
/** Node attribute that is mapped to node shared memory port number (value is <tt>comm.shmem.tcp.port</tt>). */
public static final String ATTR_SHMEM_PORT = "comm.shmem.tcp.port";
/** Node attribute that is mapped to node's external addresses (value is <tt>comm.tcp.ext-addrs</tt>). */
public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs";
/**
* Node attribute name <tt>comm.tcp.pairedConnection</tt>.
* NOTE(review): presumably carries the node's paired-connections setting — confirm.
*/
public static final String ATTR_PAIRED_CONN = "comm.tcp.pairedConnection";
/** Default port which node sets listener to (value is <tt>47100</tt>). */
public static final int DFLT_PORT = 47100;
/**
* Default port which node sets listener for shared memory connections (value is <tt>-1</tt>).
* NOTE(review): a negative port presumably means the shared memory endpoint is disabled by default — confirm.
*/
public static final int DFLT_SHMEM_PORT = -1;
/** Default idle connection timeout (value is <tt>10</tt>min). */
public static final long DFLT_IDLE_CONN_TIMEOUT = 10 * 60_000;
/** Default socket send and receive buffer size (value is <tt>32</tt>KB). */
public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
/** Default connection timeout (value is <tt>5000</tt>ms). */
public static final long DFLT_CONN_TIMEOUT = 5000;
/** Default maximum connection timeout (value is <tt>600,000</tt>ms). */
public static final long DFLT_MAX_CONN_TIMEOUT = 10 * 60 * 1000;
/** Default reconnect attempts count (value is <tt>10</tt>). */
public static final int DFLT_RECONNECT_CNT = 10;
/** Default message queue limit per connection (for incoming and outgoing messages); same as the NIO server send queue limit. */
public static final int DFLT_MSG_QUEUE_LIMIT = GridNioServer.DFLT_SEND_QUEUE_LIMIT;
/**
* Default count of selectors for TCP server equals to
* {@code "Math.max(4, Runtime.getRuntime().availableProcessors() / 2)"}.
*/
public static final int DFLT_SELECTORS_CNT = Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
/** Default initial connect/handshake timeout when failure detection is enabled. */
private static final int DFLT_INITIAL_TIMEOUT = 500;
/** Default initial delay used when the target node is still out of topology. */
private static final int DFLT_NEED_WAIT_DELAY = 200;
/** Default delay between reconnect attempts in case of temporary network issues. */
private static final int DFLT_RECONNECT_DELAY = 50;
/**
* Version since which a client is ready to wait to connect to a server (could be needed when the client tries to
* open a connection before it becomes visible to the server).
*/
private static final IgniteProductVersion VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT = IgniteProductVersion.fromString("2.1.4");
/** Connection index meta for session (holds the session's {@code ConnectionKey}). */
public static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
/** Node consistent id meta for session. */
public static final int CONSISTENT_ID_META = GridNioSessionMetaKey.nextUniqueKey();
/** Message tracker meta for session. */
private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
/** Channel meta used for establishing channel connections. */
private static final int CHANNEL_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
/**
* Default local port range (value is <tt>100</tt>).
* See {@link #setLocalPortRange(int)} for details.
*/
public static final int DFLT_PORT_RANGE = 100;
/** Default value for {@code TCP_NODELAY} socket option (value is <tt>true</tt>). */
public static final boolean DFLT_TCP_NODELAY = true;
/** Default value for {@code FILTER_REACHABLE_ADDRESSES} socket option (value is <tt>false</tt>). */
public static final boolean DFLT_FILTER_REACHABLE_ADDRESSES = false;
/** Default received messages threshold for sending ack. */
public static final int DFLT_ACK_SND_THRESHOLD = 32;
/** Default socket write timeout. */
public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000;
/** Default connections per node. */
public static final int DFLT_CONN_PER_NODE = 1;
/** Maximum {@link GridNioSession} connections per node. */
public static final int MAX_CONN_PER_NODE = 1024;
/** No-op runnable. */
private static final IgniteRunnable NOOP = () -> {};
/** Node ID message type. */
public static final short NODE_ID_MSG_TYPE = -1;
/** Recovery last received ID message type. */
public static final short RECOVERY_LAST_ID_MSG_TYPE = -2;
/** Handshake message type. */
public static final short HANDSHAKE_MSG_TYPE = -3;
/** Handshake wait message type. */
public static final short HANDSHAKE_WAIT_MSG_TYPE = -28;
/** Communication metrics group name. */
public static final String COMMUNICATION_METRICS_GROUP_NAME = MetricUtils.metricName("communication", "tcp");
/** Name of the metric counting all messages sent by the current node. */
public static final String SENT_MESSAGES_METRIC_NAME = "sentMessagesCount";
/** Description of the {@link #SENT_MESSAGES_METRIC_NAME} metric. */
public static final String SENT_MESSAGES_METRIC_DESC = "Total number of messages sent by current node";
/** Name of the metric counting all messages received by the current node. */
public static final String RECEIVED_MESSAGES_METRIC_NAME = "receivedMessagesCount";
/** Description of the {@link #RECEIVED_MESSAGES_METRIC_NAME} metric. */
public static final String RECEIVED_MESSAGES_METRIC_DESC = "Total number of messages received by current node";
/** Name of the metric counting sent messages per message type. */
public static final String SENT_MESSAGES_BY_TYPE_METRIC_NAME = "sentMessagesByType";
/** Description of the {@link #SENT_MESSAGES_BY_TYPE_METRIC_NAME} metric. */
public static final String SENT_MESSAGES_BY_TYPE_METRIC_DESC =
"Total number of messages with given type sent by current node";
/** Name of the metric counting received messages per message type. */
public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME = "receivedMessagesByType";
/** Description of the {@link #RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME} metric. */
public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC =
"Total number of messages with given type received by current node";
/** Name of the metric counting messages sent to each remote node. */
public static final String SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME = "sentMessagesToNode";
/** Description of the {@link #SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME} metric. */
public static final String SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC =
"Total number of messages sent by current node to the given node";
/** Name of the metric counting messages received from each remote node. */
public static final String RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME = "receivedMessagesFromNode";
/** Description of the {@link #RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME} metric. */
public static final String RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC =
"Total number of messages received by current node from the given node";
/**
* Gateway for incoming connections: a new accepted session whose first message arrives while
* {@code tryEnter()} fails is answered with {@code NODE_STOPPING} and closed.
*/
private ConnectGateway connectGate;
/** Connection index policy for regular (non-channel) connections; defaults to {@code FirstConnectionPolicy}. */
private ConnectionPolicy connPlc = new FirstConnectionPolicy();
/** Channel connection index provider. */
private ConnectionPolicy chConnPlc;
/** Value of the {@code IGNITE_ENABLE_FORCIBLE_NODE_KILL} system property, read when the SPI instance is created. */
private boolean enableForcibleNodeKill = getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
/** Value of the {@code IGNITE_TROUBLESHOOTING_LOGGER} system property, read when the SPI instance is created. */
private boolean enableTroubleshootingLog = getBoolean(IgniteSystemProperties.IGNITE_TROUBLESHOOTING_LOGGER);
/** NIO server listener handling session lifecycle events and incoming/outgoing messages. */
private final GridNioServerListener<Message> srvLsnr =
new GridNioServerListenerAdapter<Message>() {
/**
 * Reacts to a session write timeout: warns (through {@code LT}) that {@code socketWriteTimeout}
 * may need to be increased, logs the close at debug level and closes the session.
 *
 * @param ses Session whose write timed out.
 */
@Override public void onSessionWriteTimeout(GridNioSession ses) {
    // Common "[remoteAddr=..., writeTimeout=...]" suffix shared by both log records.
    String details = "[remoteAddr=" + ses.remoteAddress() + ", writeTimeout=" + sockWriteTimeout + ']';

    LT.warn(log, "Communication SPI session write timed out (consider increasing " +
        "'socketWriteTimeout' configuration property) " + details);

    if (log.isDebugEnabled())
        log.debug("Closing communication SPI session on write timeout " + details);

    ses.close();
}
/**
 * Logs a newly established session. For an accepted (incoming) session, this also sends the first
 * message to the peer. A {@link HandshakeWaitMessage} is sent when the local node is not in client
 * mode, {@code ctxInitLatch} has not yet been released, and handshake wait is supported. Otherwise
 * the local node ID message is sent.
 *
 * @param ses Newly connected session.
 */
@Override public void onConnected(GridNioSession ses) {
    if (!ses.accepted()) {
        if (log.isInfoEnabled())
            log.info("Established outgoing communication connection [locAddr=" + ses.localAddress() +
                ", rmtAddr=" + ses.remoteAddress() + ']');

        return;
    }

    if (log.isInfoEnabled())
        log.info("Accepted incoming communication connection [locAddr=" + ses.localAddress() +
            ", rmtAddr=" + ses.remoteAddress() + ']');

    try {
        boolean clientMode = Boolean.TRUE.equals(ignite().configuration().isClientMode());

        // Negation of "client || context ready || wait unsupported", evaluated in the same short-circuit order.
        boolean askToWait = !clientMode && ctxInitLatch.getCount() != 0 && isHandshakeWaitSupported();

        if (askToWait) {
            if (log.isDebugEnabled())
                log.debug("Sending handshake wait message to newly accepted session: " + ses);

            ses.sendNoFuture(new HandshakeWaitMessage(), null);
        }
        else {
            if (log.isDebugEnabled())
                log.debug("Sending local node ID to newly accepted session: " + ses);

            ses.sendNoFuture(nodeIdMessage(), null);
        }
    }
    catch (IgniteCheckedException e) {
        U.error(log, "Failed to send message: " + e, e);
    }
}
/**
 * Cleans up after a closed NIO session.
 * <p>
 * The TCP client bound to this session is closed and unregistered. Unless the SPI is stopping, the
 * session's outbound recovery descriptor is then inspected. If the remote node is no longer alive,
 * {@code onNodeLeft()} is called on it. If the node is alive and unacknowledged messages remain, a
 * reconnect request is queued to the communication worker. Finally the communication listener, if
 * any, is notified. Sessions without a connection key, or with a dummy one, are ignored.
 *
 * @param ses Closed session.
 * @param e Close reason, if any (not used by this handler).
 */
@Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
    ConnectionKey connKey = ses.meta(CONN_IDX_META);

    if (connKey == null || connKey.dummy())
        return;

    UUID rmtNodeId = connKey.nodeId();

    GridCommunicationClient[] nodeClients = clients.get(rmtNodeId);

    if (nodeClients != null) {
        for (int i = 0; i < nodeClients.length; i++) {
            GridCommunicationClient cand = nodeClients[i];

            boolean boundToSes = cand instanceof GridTcpNioCommunicationClient &&
                ((GridTcpNioCommunicationClient)cand).session() == ses;

            if (boundToSes) {
                cand.close();

                removeNodeClient(rmtNodeId, cand);
            }
        }
    }

    // Recovery handling is skipped entirely while the SPI is stopping.
    GridNioRecoveryDescriptor outDesc = stopping ? null : ses.outRecoveryDescriptor();

    if (outDesc != null) {
        if (!outDesc.nodeAlive(getSpiContext().node(rmtNodeId)))
            outDesc.onNodeLeft();
        else if (!outDesc.messagesRequests().isEmpty()) {
            if (log.isDebugEnabled())
                log.debug("Session was closed but there are unacknowledged messages, " +
                    "will try to reconnect [rmtNode=" + outDesc.node().id() + ']');

            commWorker.addProcessDisconnectRequest(
                new DisconnectedSessionInfo(outDesc, connKey.connectionIndex()));
        }
    }

    CommunicationListener<Message> lsnr0 = lsnr;

    if (lsnr0 != null)
        lsnr0.onDisconnected(rmtNodeId);
}
/**
* Handles the first message received on a session that has no connection key yet.
* <p>
* The sender node ID and connection key are taken from the message, which is either a {@link NodeIdMessage} or a
* {@link HandshakeMessage}. If the sender is not in the SPI context topology, the session receives
* {@code UNKNOWN_NODE} or {@code NEED_WAIT} and is closed once that reply is sent. Otherwise the consistent ID and
* connection key metas are attached to the session. The handshake is then processed according to the connection
* type:
* <ul>
* <li>channel: acknowledged;</li>
* <li>paired: the inbound recovery descriptor is reserved;</li>
* <li>regular: a client is registered or the connection is rejected with {@code ALREADY_CONNECTED}.</li>
* </ul>
*
* @param ses Session.
* @param msg First message received on the session.
*/
private void onFirstMessage(final GridNioSession ses, Message msg) {
UUID sndId;
ConnectionKey connKey;
if (msg instanceof NodeIdMessage) {
// A bare node ID message carries no connection index or connect count: use index 0 and count -1.
sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0);
connKey = new ConnectionKey(sndId, 0, -1);
}
else {
assert msg instanceof HandshakeMessage : msg;
HandshakeMessage msg0 = (HandshakeMessage)msg;
sndId = ((HandshakeMessage)msg).nodeId();
connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount());
}
if (log.isDebugEnabled())
log.debug("Remote node ID received: " + sndId);
final ClusterNode rmtNode = getSpiContext().node(sndId);
if (rmtNode == null) {
// Sender is not in the SPI context topology: decide whether it is unknown or should retry later.
DiscoverySpi discoverySpi = ignite().configuration().getDiscoverySpi();
boolean unknownNode = true;
if (discoverySpi instanceof TcpDiscoverySpi) {
TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi)discoverySpi;
ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId);
// A client known to discovery whose version supports waiting is told NEED_WAIT instead of UNKNOWN_NODE.
if (node0 != null) {
assert node0.isClient() : node0;
if (node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0)
unknownNode = false;
}
}
else if (discoverySpi instanceof IgniteDiscoverySpi)
unknownNode = !((IgniteDiscoverySpi)discoverySpi).knownNode(sndId);
// In both cases the session is closed once the reply has been sent.
if (unknownNode) {
U.warn(log, "Close incoming connection, unknown node [nodeId=" + sndId + ", ses=" + ses + ']');
ses.send(new RecoveryLastReceivedMessage(UNKNOWN_NODE)).listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
ses.close();
}
});
}
else {
ses.send(new RecoveryLastReceivedMessage(NEED_WAIT)).listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
ses.close();
}
});
}
return;
}
ses.addMeta(CONSISTENT_ID_META, rmtNode.consistentId());
// The session must not have had a connection key before its first message.
final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey);
assert old == null;
ClusterNode locNode = getSpiContext().localNode();
// NOTE(review): sessions without a remote address skip handshake processing; presumably these are
// shared memory sessions (which may send a NodeIdMessage) — confirm.
if (ses.remoteAddress() == null)
return;
assert msg instanceof HandshakeMessage : msg;
HandshakeMessage msg0 = (HandshakeMessage)msg;
if (log.isDebugEnabled())
log.debug("Received handshake message [locNodeId=" + locNode.id() + ", rmtNodeId=" + sndId +
", msg=" + msg0 + ']');
// Channel connection: just acknowledge with RecoveryLastReceivedMessage(0); no recovery descriptor is reserved here.
if (isChannelConnIdx(msg0.connectionIndex()))
ses.send(new RecoveryLastReceivedMessage(0));
else if (usePairedConnections(rmtNode)) {
// Paired connections: reserve the inbound recovery descriptor for this connection key.
final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey);
ConnectClosureNew c = new ConnectClosureNew(ses, recoveryDesc, rmtNode);
boolean reserve = recoveryDesc.tryReserve(msg0.connectCount(), c);
if (reserve)
connectedNew(recoveryDesc, ses, true);
else {
// NOTE(review): when not reserved and not failed, the closure presumably completes the
// connection later once the descriptor is released — confirm.
if (c.failed) {
ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
closeStaleConnections(connKey);
}
}
}
else {
// Regular (non-paired) connection.
assert connKey.connectionIndex() >= 0 : connKey;
GridCommunicationClient[] curClients = clients.get(sndId);
GridCommunicationClient oldClient =
curClients != null && connKey.connectionIndex() < curClients.length ?
curClients[connKey.connectionIndex()] :
null;
boolean hasShmemClient = false;
// An existing TCP client for this index means we are already connected: reject the incoming one.
if (oldClient != null) {
if (oldClient instanceof GridTcpNioCommunicationClient) {
if (log.isInfoEnabled())
log.info("Received incoming connection when already connected " +
"to this node, rejecting [locNode=" + locNode.id() +
", rmtNode=" + sndId + ']');
ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
closeStaleConnections(connKey);
return;
}
else {
assert oldClient instanceof GridShmemCommunicationClient;
hasShmemClient = true;
}
}
// Register a future for this key so that concurrent connection attempts can observe this one.
GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);
final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey);
if (oldFut == null) {
// Re-read the clients: one may have been created between the first check and putIfAbsent.
curClients = clients.get(sndId);
oldClient = curClients != null && connKey.connectionIndex() < curClients.length ?
curClients[connKey.connectionIndex()] : null;
if (oldClient != null) {
if (oldClient instanceof GridTcpNioCommunicationClient) {
assert oldClient.connectionIndex() == connKey.connectionIndex() : oldClient;
if (log.isInfoEnabled())
log.info("Received incoming connection when already connected " +
"to this node, rejecting [locNode=" + locNode.id() +
", rmtNode=" + sndId + ']');
ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
closeStaleConnections(connKey);
fut.onDone(oldClient);
return;
}
else {
assert oldClient instanceof GridShmemCommunicationClient;
hasShmemClient = true;
}
}
boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut));
if (log.isDebugEnabled())
log.debug("Received incoming connection from remote node " +
"[rmtNode=" + rmtNode.id() + ", reserved=" + reserved +
", recovery=" + recoveryDesc + ']');
if (reserved) {
try {
GridTcpNioCommunicationClient client =
connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
fut.onDone(client);
}
finally {
// Always unregister our future, even if establishing the client failed.
clientFuts.remove(connKey, fut);
}
}
}
else {
// Another connection attempt for this key is already in progress.
// Tie-break: if it is a local outgoing connect (ConnectFuture) and the local node has a lower
// topology order than the remote node, reject the incoming connection.
if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
if (log.isInfoEnabled()) {
log.info("Received incoming connection from remote node while " +
"connecting to this node, rejecting [locNode=" + locNode.id() +
", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() +
", rmtNodeOrder=" + rmtNode.order() + ']');
}
ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
}
else {
// The code below causes a race condition between shmem and TCP (see IGNITE-1294)
boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut));
if (reserved)
connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
}
}
}
}
/**
 * Closes accepted sessions that belong to the same remote node and connection index as
 * {@code connKey} but were opened with a lower connect count.
 *
 * @param connKey Key of the connection that supersedes older ones.
 */
private void closeStaleConnections(ConnectionKey connKey) {
    for (GridNioSession cand : nioSrvr.sessions()) {
        ConnectionKey candKey = cand.meta(CONN_IDX_META);

        if (!cand.accepted() || candKey == null)
            continue;

        boolean sameConn = candKey.nodeId().equals(connKey.nodeId()) &&
            candKey.connectionIndex() == connKey.connectionIndex();

        if (sameConn && candKey.connectCount() < connKey.connectCount())
            cand.close();
    }
}
/** {@inheritDoc} */
@Override public void onMessageSent(GridNioSession ses, Message msg) {
    Object rmtConsistentId = ses.meta(CONSISTENT_ID_META);

    // Sent-message metrics are recorded only for sessions that carry the remote consistent ID.
    if (rmtConsistentId == null)
        return;

    metricsLsnr.onMessageSent(msg, rmtConsistentId);
}
/**
 * Handles a message on a channel connection that has no pending channel future.
 * <p>
 * Cleans up the local recovery descriptor for {@code connKey} and sends {@code msg} back as the response.
 * If sending fails, the session is closed. Otherwise the NIO session is closed with the socket kept open,
 * and the underlying socket channel is passed to {@code notifyChannelEvtListener}.
 *
 * @param ses Session.
 * @param connKey Connection key.
 * @param msg Message sent back as the channel creation response and passed to the channel listener.
 */
private void onChannelCreate(
    GridSelectorNioSessionImpl ses,
    ConnectionKey connKey,
    Message msg
) {
    cleanupLocalNodeRecoveryDescriptor(connKey);

    ses.send(msg).listen(respFut -> {
        Throwable sndErr = respFut.error();

        if (sndErr != null) {
            U.error(log, "Fail to send channel creation response to the remote node. " +
                "Session will be closed [nodeId=" + connKey.nodeId() +
                ", idx=" + connKey.connectionIndex() + ']', sndErr);

            ses.close();

            return;
        }

        // Response is sent: close only the NIO session and keep the socket, which is handed over below.
        ses.closeSocketOnSessionClose(false);

        ses.close().listen(sesCloseFut -> {
            Throwable closeErr = sesCloseFut.error();

            if (closeErr != null) {
                U.error(log, "Nio session has not been properly closed " +
                    "[nodeId=" + connKey.nodeId() + ", idx=" + connKey.connectionIndex() + ']',
                    closeErr);

                U.closeQuiet(ses.key().channel());

                return;
            }

            notifyChannelEvtListener(connKey.nodeId(), ses.key().channel(), msg);
        });
    });
}
/**
* Handles a message received on a communication session.
* <p>
* A session without a connection key is an accepted session whose first message has not been handled yet.
* That message is processed by {@code onFirstMessage} while holding the connect gate. If the gate cannot be
* entered, the peer is answered with {@code NODE_STOPPING} and the session is closed once the reply is sent.
* <p>
* For sessions with a connection key:
* <ul>
*     <li>channel connections are handed over as raw socket channels;</li>
*     <li>{@code RecoveryLastReceivedMessage}s update the outgoing recovery descriptor;</li>
*     <li>regular messages are counted on the incoming recovery descriptor, acknowledged every
*     {@code ackSndThreshold} messages and passed to the listener.</li>
* </ul>
*
* @param ses Session the message was received on.
* @param msg Received message.
*/
@Override public void onMessage(final GridNioSession ses, Message msg) {
MTC.span().addLog(() -> "Communication received");
MTC.span().addTag(SpanTags.MESSAGE, () -> traceName(msg));
ConnectionKey connKey = ses.meta(CONN_IDX_META);
if (connKey == null) {
assert ses.accepted() : ses;
// First message of an accepted session: handle it only while holding the connect gate.
if (!connectGate.tryEnter()) {
if (log.isDebugEnabled())
log.debug("Close incoming connection, failed to enter gateway.");
// Gate not entered (presumably the SPI is stopping): reply NODE_STOPPING, close after the reply is sent.
ses.send(new RecoveryLastReceivedMessage(NODE_STOPPING)).listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
ses.close();
}
});
return;
}
try {
onFirstMessage(ses, msg);
}
finally {
connectGate.leave();
}
}
else {
Object consistentId = ses.meta(CONSISTENT_ID_META);
assert consistentId != null;
// Channel connections: with no pending channel future the message is a channel creation request.
// Otherwise, close the NIO session while keeping the socket, and complete the pending future
// with the underlying channel (or with the close error).
if (isChannelConnIdx(connKey.connectionIndex())) {
if (ses.meta(CHANNEL_FUT_META) == null)
onChannelCreate((GridSelectorNioSessionImpl)ses, connKey, msg);
else {
GridFutureAdapter<Channel> fut = ses.meta(CHANNEL_FUT_META);
GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
ses0.closeSocketOnSessionClose(false);
ses0.close().listen(f -> {
if (f.error() != null) {
fut.onDone(f.error());
return;
}
fut.onDone(ses0.key().channel());
});
}
return;
}
// Acknowledgement from the peer: pass its received count to the outgoing recovery descriptor.
// Not forwarded to the listener.
if (msg instanceof RecoveryLastReceivedMessage) {
metricsLsnr.onMessageReceived(msg, consistentId);
GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor();
if (recovery != null) {
RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg;
if (log.isDebugEnabled()) {
log.debug("Received recovery acknowledgement [rmtNode=" + connKey.nodeId() +
", connIdx=" + connKey.connectionIndex() +
", rcvCnt=" + msg0.received() + ']');
}
recovery.ackReceived(msg0.received());
}
return;
}
else {
GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
if (recovery != null) {
long rcvCnt = recovery.onReceived();
// Acknowledge every ackSndThreshold-th received message.
if (rcvCnt % ackSndThreshold == 0) {
if (log.isDebugEnabled()) {
log.debug("Send recovery acknowledgement [rmtNode=" + connKey.nodeId() +
", connIdx=" + connKey.connectionIndex() +
", rcvCnt=" + rcvCnt + ']');
}
ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));
recovery.lastAcknowledged(rcvCnt);
}
}
// A dummy connection is expected to carry only a NodeIdMessage. That message completes the
// connection-check future stored in the session, and the session is then closed; the listener is not notified.
else if (connKey.dummy()) {
assert msg instanceof NodeIdMessage : msg;
TcpCommunicationNodeConnectionCheckFuture fut = ses.meta(SES_FUT_META);
assert fut != null : msg;
fut.onConnected(U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0));
nioSrvr.closeFromWorkerThread(ses);
return;
}
}
metricsLsnr.onMessageReceived(msg, consistentId);
// With a positive queue limit, a per-session message tracker (created lazily) is passed to the listener
// as the callback; otherwise NOOP is passed.
IgniteRunnable c;
if (msgQueueLimit > 0) {
GridNioMessageTracker tracker = ses.meta(TRACKER_META);
if (tracker == null) {
GridNioMessageTracker old = ses.addMeta(TRACKER_META, tracker =
new GridNioMessageTracker(ses, msgQueueLimit));
assert old == null;
}
tracker.onMessageReceived();
c = tracker;
}
else
c = NOOP;
notifyListener(connKey.nodeId(), msg, c);
}
}
/** {@inheritDoc} */
@Override public void onFailure(FailureType failureType, Throwable failure) {
    // The failure processor is reachable only through an IgniteEx instance; otherwise nothing is done.
    if (!(ignite instanceof IgniteEx))
        return;

    IgniteEx igniteEx = (IgniteEx)ignite;

    igniteEx.context().failure().process(new FailureContext(failureType, failure));
}
/**
 * Completes establishing a connection on {@code ses}.
 * <p>
 * Steps, in order:
 * <ol>
 *     <li>applies {@code rcvCnt} to the recovery handshake;</li>
 *     <li>attaches {@code recovery} as both the inbound and the outbound descriptor of the session;</li>
 *     <li>calls {@code nioSrvr.resend(ses)};</li>
 *     <li>optionally replies with the local received count;</li>
 *     <li>marks the descriptor connected;</li>
 *     <li>if requested, creates and registers a NIO client.</li>
 * </ol>
 *
 * @param recovery Recovery descriptor.
 * @param ses Session.
 * @param node Node.
 * @param rcvCnt Number of received messages.
 * @param sndRes If {@code true} sends response for recovery handshake.
 * @param createClient If {@code true} creates NIO communication client.
 * @return Created client, or {@code null} when {@code createClient} is {@code false}.
 */
private GridTcpNioCommunicationClient connected(
    GridNioRecoveryDescriptor recovery,
    GridNioSession ses,
    ClusterNode node,
    long rcvCnt,
    boolean sndRes,
    boolean createClient) {
    ConnectionKey key = ses.meta(CONN_IDX_META);

    assert key != null && key.connectionIndex() >= 0 : key;
    assert !usePairedConnections(node);

    recovery.onHandshake(rcvCnt);

    ses.inRecoveryDescriptor(recovery);
    ses.outRecoveryDescriptor(recovery);

    nioSrvr.resend(ses);

    if (sndRes) {
        try {
            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
        }
        catch (IgniteCheckedException e) {
            // A failed response is only logged; the descriptor is still marked connected below.
            U.error(log, "Failed to send message: " + e, e);
        }
    }

    recovery.onConnected();

    if (!createClient)
        return null;

    int connIdx = key.connectionIndex();

    GridTcpNioCommunicationClient client = new GridTcpNioCommunicationClient(connIdx, ses, log);

    addNodeClient(node, connIdx, client);

    return client;
}
/**
 * Attaches {@code recovery} as the inbound recovery descriptor of {@code ses}, optionally sends the recovery
 * handshake response, and marks the descriptor connected.
 * <p>
 * If sending the response fails, the error is logged and the descriptor is NOT marked connected.
 *
 * @param recovery Recovery descriptor.
 * @param ses Session.
 * @param sndRes If {@code true} sends response for recovery handshake.
 */
private void connectedNew(
    GridNioRecoveryDescriptor recovery,
    GridNioSession ses,
    boolean sndRes) {
    ses.inRecoveryDescriptor(recovery);

    try {
        if (sndRes)
            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
    }
    catch (IgniteCheckedException e) {
        U.error(log, "Failed to send message: " + e, e);

        return;
    }

    recovery.onConnected();
}
/**
 * Closure applied with a {@code Boolean} outcome for an incoming session whose connection is completed
 * through {@code connectedNew(...)}. Presumably the outcome is a recovery descriptor reservation result,
 * as with {@code ConnectClosure}; confirm at the call site.
 * <p>
 * On {@code true}, sends the recovery handshake response. When that send completes successfully, the
 * descriptor is attached to the session; when it fails, the descriptor is released.
 * On {@code false}, replies with {@code ALREADY_CONNECTED}.
 * Errors thrown while submitting either message are only logged.
 */
class ConnectClosureNew implements IgniteInClosure<Boolean> {
    /** */
    private static final long serialVersionUID = 0L;

    /** Incoming session. */
    private final GridNioSession ses;

    /** Recovery descriptor. */
    private final GridNioRecoveryDescriptor recoveryDesc;

    /** Remote node. */
    private final ClusterNode rmtNode;

    /** {@code True} if the most recent {@link #apply(Boolean)} call was given {@code false}. */
    private boolean failed;

    /**
     * @param ses Incoming session.
     * @param recoveryDesc Recovery descriptor.
     * @param rmtNode Remote node.
     */
    ConnectClosureNew(GridNioSession ses,
        GridNioRecoveryDescriptor recoveryDesc,
        ClusterNode rmtNode) {
        this.ses = ses;
        this.recoveryDesc = recoveryDesc;
        this.rmtNode = rmtNode;
    }

    /** {@inheritDoc} */
    @Override public void apply(Boolean success) {
        failed = !success;

        try {
            if (!success) {
                nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(ALREADY_CONNECTED));

                return;
            }

            IgniteInClosure<IgniteInternalFuture<?>> onSent = this::onHandshakeSent;

            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), onSent);
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send message: " + e, e);
        }
    }

    /**
     * Finishes the connection once the handshake response send has completed.
     *
     * @param msgFut Future of the handshake response send.
     */
    private void onHandshakeSent(IgniteInternalFuture<?> msgFut) {
        try {
            msgFut.get();
        }
        catch (IgniteCheckedException e) {
            if (log.isDebugEnabled())
                log.debug("Failed to send recovery handshake " +
                    "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');

            recoveryDesc.release();

            return;
        }

        connectedNew(recoveryDesc, ses, false);
    }
}
/**
 * Closure passed to {@code recoveryDesc.tryReserve(...)} for an incoming connection and applied with the
 * reservation outcome.
 * <p>
 * On success, sends the recovery handshake response. When that send completes, the connection is finished
 * via {@code connected(...)} and {@link #fut} is completed with the created client (or {@code null}). If the
 * send future fails, the descriptor is released and {@link #fut} is completed with {@code null}.
 * <p>
 * If the reservation fails, or the response cannot even be submitted, {@link #fut} is completed with
 * {@code null}. In every case {@link #fut} is removed from {@code clientFuts}.
 */
@SuppressWarnings("PackageVisibleInnerClass")
class ConnectClosure implements IgniteInClosure<Boolean> {
    /** */
    private static final long serialVersionUID = 0L;

    /** Incoming session. */
    private final GridNioSession ses;

    /** Recovery descriptor. */
    private final GridNioRecoveryDescriptor recoveryDesc;

    /** Remote node. */
    private final ClusterNode rmtNode;

    /** Handshake message received from the remote node. */
    private final HandshakeMessage msg;

    /** Connect future, also registered in {@code clientFuts} under {@link #connKey}. */
    private final GridFutureAdapter<GridCommunicationClient> fut;

    /** If {@code true} creates NIO communication client. */
    private final boolean createClient;

    /** Connection key. */
    private final ConnectionKey connKey;

    /**
     * @param ses Incoming session.
     * @param recoveryDesc Recovery descriptor.
     * @param rmtNode Remote node.
     * @param connKey Connection key.
     * @param msg Handshake message.
     * @param createClient If {@code true} creates NIO communication client.
     * @param fut Connect future.
     */
    ConnectClosure(GridNioSession ses,
        GridNioRecoveryDescriptor recoveryDesc,
        ClusterNode rmtNode,
        ConnectionKey connKey,
        HandshakeMessage msg,
        boolean createClient,
        GridFutureAdapter<GridCommunicationClient> fut) {
        this.ses = ses;
        this.recoveryDesc = recoveryDesc;
        this.rmtNode = rmtNode;
        this.connKey = connKey;
        this.msg = msg;
        this.createClient = createClient;
        this.fut = fut;
    }

    /** {@inheritDoc} */
    @Override public void apply(Boolean success) {
        if (!success) {
            completeWithoutClient();

            return;
        }

        IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
            @Override public void apply(IgniteInternalFuture<?> msgFut) {
                try {
                    msgFut.get();

                    GridTcpNioCommunicationClient client =
                        connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient);

                    fut.onDone(client);
                }
                catch (IgniteCheckedException e) {
                    if (log.isDebugEnabled())
                        log.debug("Failed to send recovery handshake " +
                            "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');

                    recoveryDesc.release();

                    fut.onDone();
                }
                finally {
                    clientFuts.remove(connKey, fut);
                }
            }
        };

        try {
            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr);
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send message: " + e, e);

            // Complete the connect future here. Otherwise it stays incomplete in clientFuts, and anyone
            // waiting on it is never released. clientFuts.remove(key, value) is a no-op if lsnr already
            // removed it. Completing an already-completed GridFutureAdapter presumably has no effect.
            // NOTE(review): recoveryDesc is deliberately not released here. lsnr may already be registered
            // and may release it itself; confirm against GridNioServer.sendSystem.
            completeWithoutClient();
        }
    }

    /**
     * Completes {@link #fut} without a client and removes it from {@code clientFuts}.
     */
    private void completeWithoutClient() {
        try {
            fut.onDone();
        }
        finally {
            clientFuts.remove(connKey, fut);
        }
    }
}
};
/** Logger. */
@LoggerResource
private IgniteLogger log;
/** Logger for the {@code org.apache.ignite.internal.diagnostic} category. */
@LoggerResource(categoryName = "org.apache.ignite.internal.diagnostic")
private IgniteLogger diagnosticLog;
/** Local IP address. */
private String locAddr;
/** Local host address of this node. */
private volatile InetAddress locHost;
/** Local port which node uses. */
private int locPort = DFLT_PORT;
/** Local port range. */
private int locPortRange = DFLT_PORT_RANGE;
/** Local port which node uses to accept shared memory connections. */
private int shmemPort = DFLT_SHMEM_PORT;
/** Allocate direct buffer or heap buffer. */
private boolean directBuf = true;
/** Whether to use a direct (rather than heap) buffer for sending. */
private boolean directSndBuf;
/** Idle connection timeout. */
private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT;
/** Connect timeout. */
private long connTimeout = DFLT_CONN_TIMEOUT;
/** Maximum connect timeout. */
private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT;
/** Reconnect attempts count. */
private int reconCnt = DFLT_RECONNECT_CNT;
/** Socket send buffer. */
private int sockSndBuf = DFLT_SOCK_BUF_SIZE;
/** Socket receive buffer. */
private int sockRcvBuf = DFLT_SOCK_BUF_SIZE;
/** Message queue limit. */
private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT;
/** Slow client queue limit. */
private int slowClientQueueLimit;
/** NIO server. */
private GridNioServer<Message> nioSrvr;
/** Shared memory server. */
private IpcSharedMemoryServerEndpoint shmemSrv;
/** If {@code true}, separate connections are maintained for outgoing and incoming messages. */
private boolean usePairedConnections;
/** Number of connections to each remote node. */
private int connectionsPerNode = DFLT_CONN_PER_NODE;
/** {@code TCP_NODELAY} option value for created sockets. */
private boolean tcpNoDelay = DFLT_TCP_NODELAY;
/** Whether to filter reachable addresses when creating a TCP client. */
private boolean filterReachableAddresses = DFLT_FILTER_REACHABLE_ADDRESSES;
/** Number of received messages after which acknowledgment is sent. */
private int ackSndThreshold = DFLT_ACK_SND_THRESHOLD;
/** Maximum number of unacknowledged messages. */
private int unackedMsgsBufSize;
/** Socket write timeout. */
private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;
/** Recovery and idle clients handler. */
private CommunicationWorker commWorker;
/** Shared memory accept worker. */
private ShmemAcceptWorker shmemAcceptWorker;
/** Shared memory workers. */
private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque<>();
/** Clients by remote node ID; each array is indexed by connection index. */
private final ConcurrentMap<UUID, GridCommunicationClient[]> clients = GridConcurrentFactory.newMap();
/** SPI listener. */
private volatile CommunicationListener<Message> lsnr;
/** Bound port. */
private int boundTcpPort = -1;
/** Bound port for shared memory server. */
private int boundTcpShmemPort = -1;
/** Count of selectors to use in TCP server. */
private int selectorsCnt = DFLT_SELECTORS_CNT;
/**
* Defines how many non-blocking {@code selector.selectNow()} should be made before
* falling into {@code selector.select(long)} in NIO server. Long value. Initialized from the
* {@code IGNITE_SELECTOR_SPINS} system property, {@code 0} if it is not set.
* Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
*/
private long selectorSpins = IgniteSystemProperties.getLong("IGNITE_SELECTOR_SPINS", 0L);
/** Address resolver. */
private AddressResolver addrRslvr;
/** Context initialization latch. */
private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
/** Stopping flag (set to {@code true} when SPI gets stopping signal). */
private volatile boolean stopping;
/** Communication metrics listener; may be {@code null} until it is initialized. */
private TcpCommunicationMetricsListener metricsLsnr;
/** Client connect futures. */
private final ConcurrentMap<ConnectionKey, GridFutureAdapter<GridCommunicationClient>> clientFuts =
GridConcurrentFactory.newMap();
/** Recovery descriptors by connection key. */
private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap();
/** Outgoing recovery descriptors by connection key (NOTE(review): presumably used with paired connections — confirm). */
private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> outRecDescs = GridConcurrentFactory.newMap();
/** Incoming recovery descriptors by connection key (NOTE(review): presumably used with paired connections — confirm). */
private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> inRecDescs = GridConcurrentFactory.newMap();
/** Discovery event listener. */
private final GridLocalEventListener discoLsnr = new DiscoveryListener();
/** Tracing; assigned in {@code injectResources}. */
protected Tracing tracing;
/**
 * Checks whether the node configuration provides an SSL context factory.
 *
 * @return {@code True} if ssl enabled.
 */
private boolean isSslEnabled() {
    boolean sslFactoryConfigured = ignite.configuration().getSslContextFactory() != null;

    return sslFactoryConfigured;
}
/**
 * Sets the address resolver.
 * <p>
 * Only the first non-{@code null} resolver is kept. Later calls, including resource injection, never replace
 * a resolver that is already set (for example by Spring or the user).
 *
 * @param addrRslvr Address resolver.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setAddressResolver(AddressResolver addrRslvr) {
    boolean alreadySet = this.addrRslvr != null;

    if (!alreadySet)
        this.addrRslvr = addrRslvr;

    return this;
}

/**
 * Gets the address resolver; see {@link #setAddressResolver(AddressResolver)}.
 *
 * @return Address resolver, or {@code null} if none has been set.
 */
public AddressResolver getAddressResolver() {
    return this.addrRslvr;
}
/**
 * Injects resources.
 * <p>
 * After the parent injection, a non-{@code null} {@code ignite} supplies the address resolver and local host
 * from its configuration (neither overrides an already set value) and the tracing facility. Tracing falls back
 * to {@code NoopTracing} when {@code ignite} is not an {@code IgniteEx}.
 *
 * @param ignite Ignite.
 */
@IgniteInstanceResource
@Override protected void injectResources(Ignite ignite) {
    super.injectResources(ignite);

    if (ignite == null)
        return;

    setAddressResolver(ignite.configuration().getAddressResolver());
    setLocalAddress(ignite.configuration().getLocalHost());

    if (ignite instanceof IgniteEx)
        tracing = ((IgniteEx)ignite).context().tracing();
    else
        tracing = new NoopTracing();
}
/**
 * Sets the local host address used for socket binding. A node may have additional addresses besides
 * the loopback one. This parameter is optional; by default any available local IP address is used.
 * <p>
 * Only the first non-{@code null} value is kept, so resource injection does not override an address
 * that is already set by Spring or the user.
 *
 * @param locAddr IP address.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setLocalAddress(String locAddr) {
    boolean alreadySet = this.locAddr != null;

    if (!alreadySet)
        this.locAddr = locAddr;

    return this;
}

/**
 * Gets the local address; see {@link #setLocalAddress(String)}.
 *
 * @return Grid node IP address.
 */
public String getLocalAddress() {
    return this.locAddr;
}
/**
 * Sets the local port used for socket binding.
 * <p>
 * Defaults to {@link #DFLT_PORT} when not configured.
 *
 * @param locPort Port number.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setLocalPort(int locPort) {
    this.locPort = locPort;

    return this;
}

/**
 * Gets the local port; see {@link #setLocalPort(int)}.
 *
 * @return Port number.
 */
public int getLocalPort() {
    return this.locPort;
}
/**
 * Sets the local port range (the value must be greater than or equal to <tt>0</tt>).
 * If the port given by {@link #setLocalPort(int)} is occupied, the implementation keeps incrementing the port
 * number while it stays below the initial port plus this range.
 * <p>
 * A range of <tt>0</tt> means only the port from {@link #setLocalPort(int)} is tried, and startup fails
 * if it cannot be bound.
 * <p>
 * A port range is handy during development, when several grid nodes run on the same physical machine.
 * <p>
 * Defaults to {@link #DFLT_PORT_RANGE} when not configured.
 *
 * @param locPortRange New local port range.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setLocalPortRange(int locPortRange) {
    this.locPortRange = locPortRange;

    return this;
}

/**
 * Gets the local port range; see {@link #setLocalPortRange(int)}.
 *
 * @return Local Port range.
 */
public int getLocalPortRange() {
    return this.locPortRange;
}
/**
 * Tells whether paired connections are used; see {@link #setUsePairedConnections(boolean)}.
 *
 * @return {@code true} to use paired connections and {@code false} otherwise.
 */
public boolean isUsePairedConnections() {
    return this.usePairedConnections;
}

/**
 * Chooses how connections are shared between outgoing and incoming messages.
 * <p>
 * With {@code true}, {@code TcpCommunicationSpi} keeps separate connections for outgoing and incoming
 * messages, so each remote node gets {@link #getConnectionsPerNode()} * 2 connections in total.
 * <p>
 * With {@code false}, each of the {@link #getConnectionsPerNode()} connections carries both directions, so each
 * remote node gets {@link #getConnectionsPerNode()} connections in total.
 * <p>
 * Default is {@code false}.
 *
 * @param usePairedConnections {@code true} to use paired connections and {@code false} otherwise.
 * @return {@code this} for chaining.
 * @see #getConnectionsPerNode()
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setUsePairedConnections(boolean usePairedConnections) {
    this.usePairedConnections = usePairedConnections;

    return this;
}
/**
 * Sets the number of connections to each remote node. If {@link #isUsePairedConnections()} is {@code true},
 * the number of connections is doubled: half carry incoming and half carry outgoing messages.
 *
 * @param maxConnectionsPerNode Number of connections per node.
 * @return {@code this} for chaining.
 * @see #isUsePairedConnections()
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setConnectionsPerNode(int maxConnectionsPerNode) {
    connectionsPerNode = maxConnectionsPerNode;

    return this;
}

/**
 * Gets the number of connections per node; see {@link #setConnectionsPerNode(int)}.
 *
 * @return Number of connections per node.
 */
public int getConnectionsPerNode() {
    return this.connectionsPerNode;
}
/**
 * Sets the local port that accepts shared memory connections.
 * <p>
 * A value of {@code -1} disables shared memory communication.
 * <p>
 * Defaults to {@link #DFLT_SHMEM_PORT} when not configured.
 *
 * @param shmemPort Port number.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setSharedMemoryPort(int shmemPort) {
    this.shmemPort = shmemPort;

    return this;
}

/**
 * Gets the shared memory port; see {@link #setSharedMemoryPort(int)}.
 *
 * @return Port number.
 */
public int getSharedMemoryPort() {
    return this.shmemPort;
}
/**
 * Sets the maximum idle time after which a client connection is closed.
 * <p>
 * Defaults to {@link #DFLT_IDLE_CONN_TIMEOUT} when not configured.
 *
 * @param idleConnTimeout Maximum idle connection time.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setIdleConnectionTimeout(long idleConnTimeout) {
    this.idleConnTimeout = idleConnTimeout;

    return this;
}

/**
 * Gets the idle connection timeout; see {@link #setIdleConnectionTimeout(long)}.
 *
 * @return Maximum idle connection time.
 */
public long getIdleConnectionTimeout() {
    return this.idleConnTimeout;
}
/**
 * Gets the socket write timeout; see {@link #setSocketWriteTimeout(long)}.
 *
 * @return Socket write timeout for TCP connections.
 */
public long getSocketWriteTimeout() {
    return this.sockWriteTimeout;
}

/**
 * Sets the socket write timeout for TCP connections. If a message cannot be written to the socket
 * within this time, the connection is closed and a reconnect is attempted.
 * <p>
 * Defaults to {@link #DFLT_SOCK_WRITE_TIMEOUT}.
 *
 * @param sockWriteTimeout Socket write timeout for TCP connection.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setSocketWriteTimeout(long sockWriteTimeout) {
    this.sockWriteTimeout = sockWriteTimeout;

    return this;
}
/**
 * Gets the acknowledgement send threshold; see {@link #setAckSendThreshold(int)}.
 *
 * @return Number of received messages after which acknowledgment is sent.
 */
public int getAckSendThreshold() {
    return this.ackSndThreshold;
}

/**
 * Sets how many messages received per connection trigger sending an acknowledgement message.
 * <p>
 * Defaults to {@link #DFLT_ACK_SND_THRESHOLD}.
 *
 * @param ackSndThreshold Number of received messages after which acknowledgment is sent.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setAckSendThreshold(int ackSndThreshold) {
    this.ackSndThreshold = ackSndThreshold;

    return this;
}
/**
 * Gets the unacknowledged messages buffer size; see {@link #setUnacknowledgedMessagesBufferSize(int)}.
 *
 * @return Maximum number of unacknowledged messages.
 */
public int getUnacknowledgedMessagesBufferSize() {
    return this.unackedMsgsBufSize;
}

/**
 * Sets the maximum number of unacknowledged messages stored per connection to a node.
 * Once this number is exceeded, the connection to the node is closed and a reconnect is attempted.
 *
 * @param unackedMsgsBufSize Maximum number of unacknowledged messages.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setUnacknowledgedMessagesBufferSize(int unackedMsgsBufSize) {
    this.unackedMsgsBufSize = unackedMsgsBufSize;

    return this;
}
/**
 * Sets the connect timeout used when establishing connections with remote nodes.
 * <p>
 * {@code 0} means an infinite timeout.
 * <p>
 * Defaults to {@link #DFLT_CONN_TIMEOUT} when not configured.
 * <p>
 * Setting this property explicitly disables {@link IgniteConfiguration#getFailureDetectionTimeout()}.
 *
 * @param connTimeout Connect timeout.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setConnectTimeout(long connTimeout) {
    this.connTimeout = connTimeout;

    // An explicit connect timeout takes precedence over the failure detection timeout.
    failureDetectionTimeoutEnabled(false);

    return this;
}

/**
 * Gets the connect timeout; see {@link #setConnectTimeout(long)}.
 *
 * @return Connect timeout.
 */
public long getConnectTimeout() {
    return this.connTimeout;
}
/**
 * Sets the maximum connect timeout. If a handshake is not established within the connect timeout,
 * the SPI repeats the handshake with an increased connect timeout. The timeout can grow up to this
 * maximum; once the maximum is reached, the handshake is considered failed.
 * <p>
 * {@code 0} means an infinite timeout.
 * <p>
 * Defaults to {@link #DFLT_MAX_CONN_TIMEOUT} when not configured.
 * <p>
 * Setting this property explicitly disables {@link IgniteConfiguration#getFailureDetectionTimeout()}.
 *
 * @param maxConnTimeout Maximum connect timeout.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setMaxConnectTimeout(long maxConnTimeout) {
    this.maxConnTimeout = maxConnTimeout;

    // An explicit timeout takes precedence over the failure detection timeout.
    failureDetectionTimeoutEnabled(false);

    return this;
}

/**
 * Gets the maximum connect timeout; see {@link #setMaxConnectTimeout(long)}.
 *
 * @return Maximum connect timeout.
 */
public long getMaxConnectTimeout() {
    return this.maxConnTimeout;
}
/**
 * Sets the maximum number of reconnect attempts used when establishing connections with remote nodes.
 * <p>
 * Defaults to {@link #DFLT_RECONNECT_CNT} when not configured.
 * <p>
 * Setting this property explicitly disables {@link IgniteConfiguration#getFailureDetectionTimeout()}.
 *
 * @param reconCnt Maximum number of reconnection attempts.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setReconnectCount(int reconCnt) {
    this.reconCnt = reconCnt;

    // An explicit reconnect count takes precedence over the failure detection timeout.
    failureDetectionTimeoutEnabled(false);

    return this;
}

/**
 * Gets the maximum number of reconnect attempts used when establishing connections with remote nodes.
 *
 * @return Reconnects count.
 */
public int getReconnectCount() {
    return this.reconCnt;
}
/**
 * Chooses between direct and heap buffers in the SPI. With {@code true} the SPI allocates buffers through
 * {@link ByteBuffer#allocateDirect(int)}; with {@code false} it uses {@link ByteBuffer#allocate(int)}.
 * <p>
 * Defaults to {@code true} when not configured.
 *
 * @param directBuf Flag indicates to allocate direct or heap buffer in SPI.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setDirectBuffer(boolean directBuf) {
    this.directBuf = directBuf;

    return this;
}

/**
 * Tells whether direct (rather than heap) buffers are used.
 *
 * @return Flag that indicates whether direct or heap allocated buffer is used.
 */
public boolean isDirectBuffer() {
    return this.directBuf;
}
/**
 * Tells whether a direct buffer is used for sending.
 *
 * @return {@code True} if direct buffers should be used.
 */
public boolean isDirectSendBuffer() {
    return this.directSndBuf;
}

/**
 * Chooses whether a direct buffer is used for sending.
 * <p>
 * Defaults to {@code false} when not configured.
 *
 * @param directSndBuf {@code True} to use direct buffers for send.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setDirectSendBuffer(boolean directSndBuf) {
    this.directSndBuf = directSndBuf;

    return this;
}
/**
 * Sets the number of selectors to be used in the TCP server.
 * <p/>
 * Defaults to {@link #DFLT_SELECTORS_CNT} when not configured.
 *
 * @param selectorsCnt Selectors count.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setSelectorsCount(int selectorsCnt) {
    this.selectorsCnt = selectorsCnt;

    return this;
}

/**
 * Gets the number of selectors; see {@link #setSelectorsCount(int)}.
 *
 * @return Count of selectors in TCP server.
 */
public int getSelectorsCount() {
    return this.selectorsCnt;
}
/**
 * Gets the selector spin count; see {@link #setSelectorSpins(long)}.
 *
 * @return Selector thread busy-loop iterations.
 */
public long getSelectorSpins() {
    return this.selectorSpins;
}

/**
 * Sets how many non-blocking {@code selector.selectNow()} calls the NIO server makes before falling back to
 * {@code selector.select(long)}. The initial value comes from the {@code IGNITE_SELECTOR_SPINS} system
 * property, or is {@code 0} when that property is not set. {@code Long.MAX_VALUE} makes selector threads
 * never block.
 *
 * @param selectorSpins Selector thread busy-loop iterations.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setSelectorSpins(long selectorSpins) {
    this.selectorSpins = selectorSpins;

    return this;
}
/**
 * Sets the {@code TCP_NODELAY} socket option applied to every opened socket.
 * <p>
 * {@code true} disables Nagle's algorithm, which lowers latency and delivery time for small messages.
 * <p>
 * On systems under heavy network load, {@code false} is usually the better choice.
 * <p>
 * Defaults to {@link #DFLT_TCP_NODELAY} when not configured.
 *
 * @param tcpNoDelay {@code True} to disable TCP delay.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setTcpNoDelay(boolean tcpNoDelay) {
    this.tcpNoDelay = tcpNoDelay;

    return this;
}

/**
 * Gets the {@code TCP_NODELAY} socket option value.
 *
 * @return {@code True} if TCP delay is disabled.
 */
public boolean isTcpNoDelay() {
    return this.tcpNoDelay;
}
/**
 * Tells whether reachable addresses are filtered when a TCP client is created.
 *
 * @return {@code True} if needed to filter reachable addresses.
 */
public boolean isFilterReachableAddresses() {
    return this.filterReachableAddresses;
}

/**
 * With {@code true}, enables filtering of reachable addresses when a TCP client is created.
 * <p>
 * It is usually advised to leave this set to {@code false}.
 * <p>
 * Defaults to {@link #DFLT_FILTER_REACHABLE_ADDRESSES} when not configured.
 *
 * @param filterReachableAddresses {@code True} to filter reachable addresses.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setFilterReachableAddresses(boolean filterReachableAddresses) {
    this.filterReachableAddresses = filterReachableAddresses;

    return this;
}
/**
 * Sets the receive buffer size for sockets this SPI creates or accepts.
 * <p>
 * Defaults to {@link #DFLT_SOCK_BUF_SIZE} when not configured.
 *
 * @param sockRcvBuf Socket receive buffer size.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setSocketReceiveBuffer(int sockRcvBuf) {
    this.sockRcvBuf = sockRcvBuf;

    return this;
}

/**
 * Gets the socket receive buffer size; see {@link #setSocketReceiveBuffer(int)}.
 *
 * @return Socket receive buffer size.
 */
public int getSocketReceiveBuffer() {
    return this.sockRcvBuf;
}
/**
 * Sets the send buffer size for sockets this SPI creates or accepts.
 * <p>
 * Defaults to {@link #DFLT_SOCK_BUF_SIZE} when not configured.
 *
 * @param sockSndBuf Socket send buffer size.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setSocketSendBuffer(int sockSndBuf) {
    this.sockSndBuf = sockSndBuf;

    return this;
}

/**
 * Gets the socket send buffer size; see {@link #setSocketSendBuffer(int)}.
 *
 * @return Socket send buffer size.
 */
public int getSocketSendBuffer() {
    return this.sockSndBuf;
}
/**
 * Sets the message queue limit for incoming and outgoing messages.
 * <p>
 * A positive value limits the send queue to that size; {@code 0} disables the limit.
 * <p>
 * Defaults to {@link #DFLT_MSG_QUEUE_LIMIT} when not configured.
 *
 * @param msgQueueLimit Send queue size limit.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setMessageQueueLimit(int msgQueueLimit) {
    this.msgQueueLimit = msgQueueLimit;

    return this;
}

/**
 * Gets the message queue limit for incoming and outgoing messages.
 *
 * @return Send queue size limit.
 */
public int getMessageQueueLimit() {
    return this.msgQueueLimit;
}
/**
 * Gets the slow client queue limit; see {@link #setSlowClientQueueLimit(int)}.
 *
 * @return Slow client queue limit.
 */
public int getSlowClientQueueLimit() {
    return this.slowClientQueueLimit;
}

/**
 * Sets the slow client queue limit.
 * <p/>
 * With a positive value, the communication SPI monitors the outbound message queue size of each client and
 * drops clients whose queue exceeds this limit.
 * <p/>
 * Keep this value less than or equal to {@link #getMessageQueueLimit()}, which controls message back-pressure
 * for server nodes. The default is {@code 0}, meaning {@code unlimited}.
 *
 * @param slowClientQueueLimit Slow client queue limit.
 * @return {@code this} for chaining.
 */
@IgniteSpiConfiguration(optional = true)
public TcpCommunicationSpi setSlowClientQueueLimit(int slowClientQueueLimit) {
    this.slowClientQueueLimit = slowClientQueueLimit;

    return this;
}
/** {@inheritDoc} */
@Override public void setListener(CommunicationListener<Message> lsnr) {
    this.lsnr = lsnr;
}

/**
 * Gets the listener installed via {@link #setListener(CommunicationListener)}.
 *
 * @return Listener, or {@code null} if none has been set.
 */
public CommunicationListener getListener() {
    return this.lsnr;
}
/** {@inheritDoc} */
@Override public int getSentMessagesCount() {
// Listener could be not initialized yet, but discovery thread could try to aggregate metrics.
if (metricsLsnr == null)
return 0;
return metricsLsnr.sentMessagesCount();
}
/** {@inheritDoc} */
@Override public long getSentBytesCount() {
// Listener could be not initialized yet, but discovery thread clould try to aggregate metrics.
if (metricsLsnr == null)
return 0;
return metricsLsnr.sentBytesCount();
}
/** {@inheritDoc} */
@Override public int getReceivedMessagesCount() {
    // The metrics listener may not be initialized yet while discovery already aggregates metrics;
    // report zero in that case.
    return metricsLsnr == null ? 0 : metricsLsnr.receivedMessagesCount();
}
/** {@inheritDoc} */
@Override public long getReceivedBytesCount() {
    // The metrics listener may not be initialized yet while discovery already aggregates metrics;
    // report zero in that case.
    return metricsLsnr == null ? 0 : metricsLsnr.receivedBytesCount();
}
/**
 * Gets received messages counts (grouped by type).
 *
 * @return Map containing message types and respective counts; empty if the metrics listener
 *      has not been created yet.
 */
public Map<String, Long> getReceivedMessagesByType() {
    // The metrics listener is only created on SPI context initialization, but this getter is reachable
    // earlier (e.g. through the MBean registered in spiStart()). Mirror the count getters and avoid an NPE.
    if (metricsLsnr == null)
        return new HashMap<>();

    return metricsLsnr.receivedMessagesByType();
}
/**
 * Gets received messages counts (grouped by node).
 *
 * @return Map containing sender nodes and respective counts; empty if the metrics listener
 *      has not been created yet.
 */
public Map<UUID, Long> getReceivedMessagesByNode() {
    // The metrics listener is only created on SPI context initialization, but this getter is reachable
    // earlier (e.g. through the MBean registered in spiStart()). Mirror the count getters and avoid an NPE.
    if (metricsLsnr == null)
        return new HashMap<>();

    return metricsLsnr.receivedMessagesByNode();
}
/**
 * Gets sent messages counts (grouped by type).
 *
 * @return Map containing message types and respective counts; empty if the metrics listener
 *      has not been created yet.
 */
public Map<String, Long> getSentMessagesByType() {
    // The metrics listener is only created on SPI context initialization, but this getter is reachable
    // earlier (e.g. through the MBean registered in spiStart()). Mirror the count getters and avoid an NPE.
    if (metricsLsnr == null)
        return new HashMap<>();

    return metricsLsnr.sentMessagesByType();
}
/**
 * Gets sent messages counts (grouped by node).
 *
 * @return Map containing receiver nodes and respective counts; empty if the metrics listener
 *      has not been created yet.
 */
public Map<UUID, Long> getSentMessagesByNode() {
    // The metrics listener is only created on SPI context initialization, but this getter is reachable
    // earlier (e.g. through the MBean registered in spiStart()). Mirror the count getters and avoid an NPE.
    if (metricsLsnr == null)
        return new HashMap<>();

    return metricsLsnr.sentMessagesByNode();
}
/** {@inheritDoc} */
@Override public int getOutboundMessagesQueueSize() {
    // Read the field once so the null check and the call use the same server instance.
    GridNioServer<Message> srv = nioSrvr;

    if (srv == null)
        return 0;

    return srv.outboundMessagesQueueSize();
}
/** {@inheritDoc} */
@Override public void resetMetrics() {
    // The metrics listener is created on SPI context initialization; before that there is nothing
    // to reset, and dereferencing it would throw an NPE.
    if (metricsLsnr != null)
        metricsLsnr.resetMetrics();
}
/**
 * Collects communication statistics for a single remote node: its recovery descriptors, its cached
 * clients and, when the NIO server exists, statistics of the NIO sessions connected to that node.
 *
 * @param nodeId Target node ID.
 * @return Future completed with the statistics text.
 */
public IgniteInternalFuture<String> dumpNodeStatistics(final UUID nodeId) {
    StringBuilder sb = new StringBuilder("Communication SPI statistics [rmtNode=")
        .append(nodeId)
        .append(']')
        .append(U.nl());

    dumpInfo(sb, nodeId);

    GridNioServer<Message> srv = this.nioSrvr;

    if (srv == null) {
        sb.append(U.nl()).append("GridNioServer is null.");

        return new GridFinishedFuture<>(sb.toString());
    }

    sb.append("NIO sessions statistics:");

    // Keep only sessions whose connection key points to the requested node.
    IgnitePredicate<GridNioSession> sesFilter = new IgnitePredicate<GridNioSession>() {
        @Override public boolean apply(GridNioSession ses) {
            ConnectionKey key = ses.meta(CONN_IDX_META);

            return key != null && nodeId.equals(key.nodeId());
        }
    };

    return srv.dumpStats(sb.toString(), sesFilter);
}
/**
 * Writes SPI per-connection statistics to the diagnostic log, if one is configured.
 * NIO server statistics are appended asynchronously once they are collected.
 */
public void dumpStats() {
    final IgniteLogger log = this.diagnosticLog;

    // Nothing to do without a diagnostic logger.
    if (log == null)
        return;

    StringBuilder sb = new StringBuilder();

    dumpInfo(sb, null);

    U.warn(log, sb.toString());

    GridNioServer<Message> srv = this.nioSrvr;

    if (srv == null)
        return;

    srv.dumpStats().listen(new CI1<IgniteInternalFuture<String>>() {
        @Override public void apply(IgniteInternalFuture<String> fut) {
            try {
                U.warn(log, fut.get());
            }
            catch (Exception e) {
                U.error(log, "Failed to dump NIO server statistics: " + e, e);
            }
        }
    });
}
/**
 * Appends the state of recovery descriptors and cached communication clients to {@code sb}.
 *
 * @param sb Message builder.
 * @param dstNodeId Target node ID; when {@code null}, entries of all nodes are appended.
 */
private void dumpInfo(StringBuilder sb, UUID dstNodeId) {
    sb.append("Communication SPI recovery descriptors: ").append(U.nl());

    for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
        ConnectionKey key = e.getKey();

        if (dstNodeId == null || dstNodeId.equals(key.nodeId())) {
            GridNioRecoveryDescriptor d = e.getValue();

            sb.append(" [key=").append(key)
                .append(", msgsSent=").append(d.sent())
                .append(", msgsAckedByRmt=").append(d.acked())
                .append(", msgsRcvd=").append(d.received())
                .append(", lastAcked=").append(d.lastAcknowledged())
                .append(", reserveCnt=").append(d.reserveCount())
                .append(", descIdHash=").append(System.identityHashCode(d))
                .append(']').append(U.nl());
        }
    }

    // Outgoing descriptors: sent/acked side only.
    for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> e : outRecDescs.entrySet()) {
        ConnectionKey key = e.getKey();

        if (dstNodeId == null || dstNodeId.equals(key.nodeId())) {
            GridNioRecoveryDescriptor d = e.getValue();

            sb.append(" [key=").append(key)
                .append(", msgsSent=").append(d.sent())
                .append(", msgsAckedByRmt=").append(d.acked())
                .append(", reserveCnt=").append(d.reserveCount())
                .append(", connected=").append(d.connected())
                .append(", reserved=").append(d.reserved())
                .append(", descIdHash=").append(System.identityHashCode(d))
                .append(']').append(U.nl());
        }
    }

    // Incoming descriptors: received side only.
    for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> e : inRecDescs.entrySet()) {
        ConnectionKey key = e.getKey();

        if (dstNodeId == null || dstNodeId.equals(key.nodeId())) {
            GridNioRecoveryDescriptor d = e.getValue();

            sb.append(" [key=").append(key)
                .append(", msgsRcvd=").append(d.received())
                .append(", lastAcked=").append(d.lastAcknowledged())
                .append(", reserveCnt=").append(d.reserveCount())
                .append(", connected=").append(d.connected())
                .append(", reserved=").append(d.reserved())
                .append(", handshakeIdx=").append(d.handshakeIndex())
                .append(", descIdHash=").append(System.identityHashCode(d))
                .append(']').append(U.nl());
        }
    }

    sb.append("Communication SPI clients: ").append(U.nl());

    for (Map.Entry<UUID, GridCommunicationClient[]> e : clients.entrySet()) {
        UUID clientNodeId = e.getKey();

        if (dstNodeId != null && !dstNodeId.equals(clientNodeId))
            continue;

        // Slots of the per-node array may be empty.
        for (GridCommunicationClient c : e.getValue()) {
            if (c == null)
                continue;

            sb.append(" [node=").append(clientNodeId)
                .append(", client=").append(c)
                .append(']').append(U.nl());
        }
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Validates the SPI configuration, selects connection policies, resolves the local host, starts the
 * shared memory server (best effort) and the TCP NIO server, and returns the communication attributes
 * published for the local node (addresses, host names, bound ports, external addresses, paired-connection flag).
 */
@Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
    initFailureDetectionTimeout();

    assertParameter(locPort > 1023, "locPort > 1023");
    assertParameter(locPort <= 0xffff, "locPort <= 0xffff");
    assertParameter(locPortRange >= 0, "locPortRange >= 0");
    assertParameter(idleConnTimeout > 0, "idleConnTimeout > 0");
    assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0");
    assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
    assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
    assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
    assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
    assertParameter(connectionsPerNode > 0, "connectionsPerNode > 0");
    assertParameter(connectionsPerNode <= MAX_CONN_PER_NODE, "connectionsPerNode <= " + MAX_CONN_PER_NODE);

    // Explicit connection timeouts and reconnect count are only validated when
    // failure detection timeout is not in use.
    if (!failureDetectionTimeoutEnabled()) {
        assertParameter(reconCnt > 0, "reconnectCnt > 0");
        assertParameter(connTimeout >= 0, "connTimeout >= 0");
        assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
    }

    assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0");
    assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0");
    assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0");

    if (unackedMsgsBufSize > 0) {
        // Multiply in long arithmetic: a very large limit must not overflow int and slip past the check.
        assertParameter(unackedMsgsBufSize >= (long)msgQueueLimit * 5,
            "Specified 'unackedMsgsBufSize' is too low, it should be at least 'msgQueueLimit * 5'.");

        assertParameter(unackedMsgsBufSize >= (long)ackSndThreshold * 5,
            "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
    }

    if (connectionsPerNode > 1)
        connPlc = new RoundRobinConnectionPolicy();
    else
        connPlc = new FirstConnectionPolicy();

    // Channel connection indexes start above the regular per-node connection index range.
    chConnPlc = new ConnectionPolicy() {
        /** Sequential connection index provider. */
        private final AtomicInteger chIdx = new AtomicInteger(MAX_CONN_PER_NODE + 1);

        @Override public int connectionIndex() {
            return chIdx.incrementAndGet();
        }
    };

    try {
        locHost = U.resolveLocalHost(locAddr);
    }
    catch (IOException e) {
        throw new IgniteSpiException("Failed to initialize local address: " + locAddr, e);
    }

    // Shared memory is optional: a failure here is only logged.
    try {
        shmemSrv = resetShmemServer();
    }
    catch (IgniteCheckedException e) {
        U.warn(log, "Failed to start shared memory communication server.", e);
    }

    try {
        // This method potentially resets local port to the value
        // local node was bound to.
        nioSrvr = resetNioServer();
    }
    catch (IgniteCheckedException e) {
        throw new IgniteSpiException("Failed to initialize TCP server: " + locHost, e);
    }

    // Set local node attributes.
    try {
        IgniteBiTuple<Collection<String>, Collection<String>> addrs = U.resolveLocalAddresses(locHost);

        Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null :
            U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())), boundTcpPort);

        Map<String, Object> res = new HashMap<>(5);

        // Host names are published as empty when bound to one explicit, non-wildcard, non-loopback IP,
        // unless IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES is set.
        boolean setEmptyHostNamesAttr = !getBoolean(IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES, false) &&
            (!F.isEmpty(locAddr) && locHost.getHostAddress().equals(locAddr)) && !locHost.isAnyLocalAddress() &&
            !locHost.isLoopbackAddress();

        res.put(createSpiAttributeName(ATTR_ADDRS), addrs.get1());
        res.put(createSpiAttributeName(ATTR_HOST_NAMES), setEmptyHostNamesAttr ? emptyList() : addrs.get2());
        res.put(createSpiAttributeName(ATTR_PORT), boundTcpPort);
        res.put(createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort : null);
        res.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
        res.put(createSpiAttributeName(ATTR_PAIRED_CONN), usePairedConnections);

        return res;
    }
    catch (IOException | IgniteCheckedException e) {
        throw new IgniteSpiException("Failed to resolve local host to addresses: " + locHost, e);
    }
}
/**
 * Returns the TCP port the communication NIO server is bound to ({@code -1} after {@link #spiStop()}).
 *
 * @return Bound TCP server port.
 */
public int boundPort() {
    return this.boundTcpPort;
}
/**
 * {@inheritDoc}
 * <p>
 * Logs the effective configuration, warns about risky settings, registers the MBean and starts the
 * shared memory acceptor (if a shared memory server was created), the NIO server and the communication worker.
 */
@Override public void spiStart(String igniteInstanceName) throws IgniteSpiException {
    assert locHost != null;

    // Start SPI start stopwatch.
    startStopwatch();

    // Ack parameters.
    if (log.isDebugEnabled()) {
        log.debug(configInfo("locAddr", locAddr));
        log.debug(configInfo("locPort", locPort));
        log.debug(configInfo("locPortRange", locPortRange));
        log.debug(configInfo("idleConnTimeout", idleConnTimeout));
        log.debug(configInfo("directBuf", directBuf));
        log.debug(configInfo("directSendBuf", directSndBuf));
        log.debug(configInfo("selectorsCnt", selectorsCnt));
        log.debug(configInfo("tcpNoDelay", tcpNoDelay));
        log.debug(configInfo("sockSndBuf", sockSndBuf));
        log.debug(configInfo("sockRcvBuf", sockRcvBuf));
        log.debug(configInfo("shmemPort", shmemPort));
        log.debug(configInfo("msgQueueLimit", msgQueueLimit));
        log.debug(configInfo("connectionsPerNode", connectionsPerNode));

        // Log the timeout settings that are actually in effect: failure detection timeout when enabled,
        // otherwise the explicit connection timeouts and reconnect count (which getNodeAttributes()
        // validates only in that case).
        if (failureDetectionTimeoutEnabled())
            log.debug(configInfo("failureDetectionTimeout", failureDetectionTimeout()));
        else {
            log.debug(configInfo("connTimeout", connTimeout));
            log.debug(configInfo("maxConnTimeout", maxConnTimeout));
            log.debug(configInfo("reconCnt", reconCnt));
        }

        log.debug(configInfo("sockWriteTimeout", sockWriteTimeout));
        log.debug(configInfo("ackSndThreshold", ackSndThreshold));
        log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
    }

    if (!tcpNoDelay)
        U.quietAndWarn(log, "'TCP_NO_DELAY' for communication is off, which should be used with caution " +
            "since may produce significant delays with some scenarios.");

    if (slowClientQueueLimit > 0 && msgQueueLimit > 0 && slowClientQueueLimit >= msgQueueLimit) {
        U.quietAndWarn(log, "Slow client queue limit is set to a value greater than or equal to message " +
            "queue limit (slow client queue limit will have no effect) [msgQueueLimit=" + msgQueueLimit +
            ", slowClientQueueLimit=" + slowClientQueueLimit + ']');
    }

    if (msgQueueLimit == 0)
        U.quietAndWarn(log, "Message queue limit is set to 0 which may lead to " +
            "potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes " +
            "due to message queues growth on sender and receiver sides.");

    registerMBean(igniteInstanceName, new TcpCommunicationSpiMBeanImpl(this), TcpCommunicationSpiMBean.class);

    connectGate = new ConnectGateway();

    if (shmemSrv != null) {
        shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv);

        new IgniteThread(shmemAcceptWorker).start();
    }

    nioSrvr.start();

    commWorker = new CommunicationWorker(igniteInstanceName, log);

    new IgniteSpiThread(igniteInstanceName, commWorker.name(), log) {
        @Override protected void body() {
            commWorker.run();
        }
    }.start();

    // Ack start.
    if (log.isDebugEnabled())
        log.debug(startInfo());
}
/** {@inheritDoc} */
@Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
    spiCtx.registerPort(boundTcpPort, IgnitePortProtocol.TCP);

    // SPI can start without shmem port.
    if (boundTcpShmemPort > 0)
        spiCtx.registerPort(boundTcpShmemPort, IgnitePortProtocol.TCP);

    // Create the metrics listener before the discovery listener is registered and before threads waiting
    // in getSpiContext() are released. onNodeLeft() dereferences metricsLsnr, and the discovery listener
    // presumably dispatches to it, so metricsLsnr must never be observed as null.
    metricsLsnr = new TcpCommunicationMetricsListener(ignite, spiCtx);

    spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);

    ctxInitLatch.countDown();
}
/**
 * {@inheritDoc}
 * <p>
 * Blocks until {@code onContextInitialized0()} (or {@code onContextDestroyed0()}) releases the
 * initialization latch; if interrupted while waiting, logs a warning and returns the current context.
 */
@Override public IgniteSpiContext getSpiContext() {
    // Fast path: context is already initialized.
    if (ctxInitLatch.getCount() <= 0)
        return super.getSpiContext();

    if (log.isDebugEnabled())
        log.debug("Waiting for context initialization.");

    try {
        U.await(ctxInitLatch);

        if (log.isDebugEnabled())
            log.debug("Context has been initialized.");
    }
    catch (IgniteInterruptedCheckedException e) {
        U.warn(log, "Thread has been interrupted while waiting for SPI context initialization.", e);
    }

    return super.getSpiContext();
}
/**
* Creates the TCP communication NIO server, binding it to the first port that can be bound in
* {@code [locPort, locPort + locPortRange - 1]} (only {@code locPort} when {@code locPortRange == 0}).
* On success {@code boundTcpPort} is set to the bound port.
*
* @return Server instance.
* @throws IgniteCheckedException If the server was already created ({@code boundTcpPort >= 0}) or no port
*      in the range could be bound (the last bind failure is attached as the cause).
* @throws IgniteSpiException If server creation failed with an {@link SSLException} in the cause chain;
*      no further ports are tried in that case.
*/
private GridNioServer<Message> resetNioServer() throws IgniteCheckedException {
if (boundTcpPort >= 0)
throw new IgniteCheckedException("Tcp NIO server was already created on port " + boundTcpPort);
IgniteCheckedException lastEx = null;
// If configured TCP port is busy, find first available in range.
int lastPort = locPortRange == 0 ? locPort : locPort + locPortRange - 1;
for (int port = locPort; port <= lastPort; port++) {
try {
// The real message factory is fetched lazily on first use, because getSpiContext() blocks until the
// SPI context is initialized (presumably not yet the case while the server is being built).
MessageFactory msgFactory = new MessageFactory() {
private MessageFactory impl;
@Nullable @Override public Message create(short type) {
if (impl == null)
impl = getSpiContext().messageFactory();
assert impl != null;
return impl.create(type);
}
};
// Reader/writer factories use the base-class context directly (no waiting) and refresh the cached
// formatter whenever the context instance changes. Sessions without a ConnectionKey get null.
GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory() {
private IgniteSpiContext context;
private MessageFormatter formatter;
@Override public MessageReader reader(GridNioSession ses, MessageFactory msgFactory)
throws IgniteCheckedException {
final IgniteSpiContext ctx = TcpCommunicationSpi.super.getSpiContext();
if (formatter == null || context != ctx) {
context = ctx;
formatter = context.messageFormatter();
}
assert formatter != null;
ConnectionKey key = ses.meta(CONN_IDX_META);
return key != null ? formatter.reader(key.nodeId(), msgFactory) : null;
}
};
GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory() {
private IgniteSpiContext context;
private MessageFormatter formatter;
@Override public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException {
final IgniteSpiContext ctx = TcpCommunicationSpi.super.getSpiContext();
if (formatter == null || context != ctx) {
context = ctx;
formatter = context.messageFormatter();
}
assert formatter != null;
ConnectionKey key = ses.meta(CONN_IDX_META);
return key != null ? formatter.writer(key.nodeId()) : null;
}
};
GridDirectParser parser = new GridDirectParser(log.getLogger(GridDirectParser.class),
msgFactory,
readerFactory);
// Matches RecoveryLastReceivedMessage; handed to the server as its skip-recovery predicate.
IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() {
@Override public boolean apply(Message msg) {
return msg instanceof RecoveryLastReceivedMessage;
}
};
// Slow-client queue monitoring is only installed on server nodes with a positive slowClientQueueLimit.
boolean clientMode = Boolean.TRUE.equals(ignite.configuration().isClientMode());
IgniteBiInClosure<GridNioSession, Integer> queueSizeMonitor =
!clientMode && slowClientQueueLimit > 0 ?
new CI2<GridNioSession, Integer>() {
@Override public void apply(GridNioSession ses, Integer qSize) {
checkClientQueueSize(ses, qSize);
}
} :
null;
// Filter chain: tracer -> codec -> connection bytes verification [-> SSL, when enabled].
GridNioFilter[] filters;
if (isSslEnabled()) {
GridNioSslFilter sslFilter =
new GridNioSslFilter(ignite.configuration().getSslContextFactory().create(),
true, ByteOrder.LITTLE_ENDIAN, log);
sslFilter.directMode(true);
sslFilter.wantClientAuth(true);
sslFilter.needClientAuth(true);
filters = new GridNioFilter[] {
new GridNioTracerFilter(log, tracing),
new GridNioCodecFilter(parser, log, true),
new GridConnectionBytesVerifyFilter(log),
sslFilter
};
}
else
filters = new GridNioFilter[] {
new GridNioTracerFilter(log, tracing),
new GridNioCodecFilter(parser, log, true),
new GridConnectionBytesVerifyFilter(log)
};
GridNioServer.Builder<Message> builder = GridNioServer.<Message>builder()
.address(locHost)
.port(port)
.listener(srvLsnr)
.logger(log)
.selectorCount(selectorsCnt)
.igniteInstanceName(igniteInstanceName)
.serverName("tcp-comm")
.tcpNoDelay(tcpNoDelay)
.directBuffer(directBuf)
.byteOrder(ByteOrder.LITTLE_ENDIAN)
.socketSendBufferSize(sockSndBuf)
.socketReceiveBufferSize(sockRcvBuf)
.sendQueueLimit(msgQueueLimit)
.directMode(true)
.writeTimeout(sockWriteTimeout)
.selectorSpins(selectorSpins)
.filters(filters)
.writerFactory(writerFactory)
.skipRecoveryPredicate(skipRecoveryPred)
.messageQueueSizeListener(queueSizeMonitor)
.tracing(tracing)
.readWriteSelectorsAssign(usePairedConnections);
// Worker registry and communication metric registry are only available on a full IgniteEx instance.
if (ignite instanceof IgniteEx) {
IgniteEx igniteEx = (IgniteEx)ignite;
builder.workerListener(igniteEx.context().workersRegistry())
.metricRegistry(igniteEx.context().metric().registry(COMMUNICATION_METRICS_GROUP_NAME));
}
GridNioServer<Message> srvr = builder.build();
// Remember the port actually bound; getNodeAttributes() publishes it as the node's port attribute.
boundTcpPort = port;
// Ack Port the TCP server was bound to.
if (log.isInfoEnabled()) {
log.info("Successfully bound communication NIO server to TCP port " +
"[port=" + boundTcpPort +
", locHost=" + locHost +
", selectorsCnt=" + selectorsCnt +
", selectorSpins=" + srvr.selectorSpins() +
", pairedConn=" + usePairedConnections + ']');
}
srvr.idleTimeout(idleConnTimeout);
return srvr;
}
catch (IgniteCheckedException e) {
// SSL failures are not port-specific, so abort instead of trying the next port.
if (X.hasCause(e, SSLException.class))
throw new IgniteSpiException("Failed to create SSL context. SSL factory: "
+ ignite.configuration().getSslContextFactory() + '.', e);
lastEx = e;
if (log.isDebugEnabled())
log.debug("Failed to bind to local port (will try next port within range) [port=" + port +
", locHost=" + locHost + ']');
onException("Failed to bind to local port (will try next port within range) [port=" + port +
", locHost=" + locHost + ']', e);
}
}
// If free port wasn't found.
throw new IgniteCheckedException("Failed to bind to any port within range [startPort=" + locPort +
", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
}
/**
 * Creates new shared memory communication server, binding it to the first free port in
 * {@code [shmemPort, shmemPort + locPortRange - 1]} (only {@code shmemPort} when {@code locPortRange == 0},
 * consistent with the TCP server). On success {@code boundTcpShmemPort} is set to the bound port.
 *
 * @return Server, or {@code null} if shared memory is disabled ({@code shmemPort == -1}) or the OS is Windows.
 * @throws IgniteCheckedException If the server was already created or no port in the range could be bound.
 */
@Nullable private IpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException {
    if (boundTcpShmemPort >= 0)
        throw new IgniteCheckedException("Shared memory server was already created on port " + boundTcpShmemPort);

    if (shmemPort == -1 || U.isWindows())
        return null;

    IgniteCheckedException lastEx = null;

    // If configured port is busy, find first available in range. A zero range means "configured port only";
    // previously the loop ran zero times in that case and shared memory could never start.
    int lastPort = locPortRange == 0 ? shmemPort : shmemPort + locPortRange - 1;

    for (int port = shmemPort; port <= lastPort; port++) {
        try {
            IgniteConfiguration cfg = ignite.configuration();

            IpcSharedMemoryServerEndpoint srv =
                new IpcSharedMemoryServerEndpoint(log, cfg.getNodeId(), igniteInstanceName, cfg.getWorkDirectory());

            srv.setPort(port);

            srv.omitOutOfResourcesWarning(true);

            srv.start();

            boundTcpShmemPort = port;

            // Ack Port the TCP server was bound to.
            if (log.isInfoEnabled())
                log.info("Successfully bound shared memory communication to TCP port [port=" + boundTcpShmemPort +
                    ", locHost=" + locHost + ']');

            return srv;
        }
        catch (IgniteCheckedException e) {
            lastEx = e;

            if (log.isDebugEnabled())
                log.debug("Failed to bind to local port (will try next port within range) [port=" + port +
                    ", locHost=" + locHost + ']');
        }
    }

    // If free port wasn't found. Report the shared memory start port, not the TCP one.
    throw new IgniteCheckedException("Failed to bind shared memory communication to any port within range [startPort=" +
        shmemPort + ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
}
/**
 * {@inheritDoc}
 * <p>
 * Expects {@code onContextDestroyed0()} to have already set {@code stopping}. Stops the NIO server and all
 * workers, force-closes every cached client and resets the bound TCP port.
 */
@Override public void spiStop() throws IgniteSpiException {
    assert stopping;

    unregisterMBean();

    // Stop TCP server.
    if (nioSrvr != null)
        nioSrvr.stop();

    // Communication worker first, then the shared memory acceptor and its per-connection workers.
    U.cancel(commWorker);
    U.join(commWorker, log);

    U.cancel(shmemAcceptWorker);
    U.join(shmemAcceptWorker, log);

    U.cancel(shmemWorkers);
    U.join(shmemWorkers, log);

    shmemWorkers.clear();

    // Force closing on stop (safety).
    for (GridCommunicationClient[] nodeClients : clients.values()) {
        for (int i = 0; i < nodeClients.length; i++) {
            GridCommunicationClient c = nodeClients[i];

            if (c != null)
                c.forceClose();
        }
    }

    // Clear resources.
    nioSrvr = null;
    commWorker = null;

    boundTcpPort = -1;

    // Ack stop.
    if (log.isDebugEnabled())
        log.debug(stopInfo());
}
/** {@inheritDoc} */
@Override protected void onContextDestroyed0() {
    stopping = true;

    // Safety: release any thread still waiting in getSpiContext().
    if (ctxInitLatch.getCount() > 0)
        ctxInitLatch.countDown();

    if (connectGate != null)
        connectGate.stopped();

    // Undo what onContextInitialized0() registered.
    getSpiContext().deregisterPorts();
    getSpiContext().removeLocalEventListener(discoLsnr);
}
/**
 * {@inheritDoc}
 * <p>
 * Force-closes all cached clients, fails pending connection futures with
 * {@link IgniteClientDisconnectedCheckedException} and clears all recovery descriptors.
 */
@Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
    connectGate.disconnected(reconnectFut);

    for (GridCommunicationClient[] nodeClients : clients.values()) {
        for (int i = 0; i < nodeClients.length; i++) {
            GridCommunicationClient c = nodeClients[i];

            if (c != null)
                c.forceClose();
        }
    }

    IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
        "Failed to connect client node disconnected.");

    // Fail every in-flight connection attempt.
    for (GridFutureAdapter<GridCommunicationClient> pending : clientFuts.values())
        pending.onDone(err);

    recoveryDescs.clear();
    inRecDescs.clear();
    outRecDescs.clear();
}
/** {@inheritDoc} */
@Override public void onClientReconnected(boolean clusterRestarted) {
    // Counterpart of connectGate.disconnected() in onClientDisconnected().
    connectGate.reconnected();
}
/**
 * Handles a node leaving the topology: notifies the metrics listener (if it already exists), then
 * removes and force-closes every cached communication client to that node.
 *
 * @param consistentId Consistent id of the node.
 * @param nodeId Left node ID.
 */
void onNodeLeft(Object consistentId, UUID nodeId) {
    assert nodeId != null;

    // metricsLsnr is only assigned in onContextInitialized0(); a node-left notification arriving before
    // that must not fail with an NPE (there are no metrics to clean up yet).
    if (metricsLsnr != null)
        metricsLsnr.onNodeLeft(consistentId);

    GridCommunicationClient[] clients0 = clients.remove(nodeId);

    if (clients0 != null) {
        for (GridCommunicationClient client : clients0) {
            if (client != null) {
                if (log.isDebugEnabled())
                    log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId +
                        ", client=" + client + ']');

                client.forceClose();
            }
        }
    }
}
/** {@inheritDoc} */
@Override protected void checkConfigurationConsistency0(IgniteSpiContext spiCtx, ClusterNode node, boolean starting)
    throws IgniteSpiException {
    // Every node sets these attributes on startup, so their absence means an inconsistent configuration.
    String[] requiredAttrs = {ATTR_ADDRS, ATTR_HOST_NAMES, ATTR_PORT};

    for (String attr : requiredAttrs)
        checkAttributePresence(node, createSpiAttributeName(attr));
}
/**
 * Checks that node has specified attribute and prints warning if it does not.
 *
 * @param node Node to check.
 * @param attrName Name of the attribute.
 */
private void checkAttributePresence(ClusterNode node, String attrName) {
    if (node.attribute(attrName) == null)
        U.warn(log, "Remote node has inconsistent configuration (required attribute was not found) " +
            "[attrName=" + attrName + ", nodeId=" + node.id() +
            ", spiCls=" + U.getSimpleName(TcpCommunicationSpi.class) + ']');
}
/** {@inheritDoc} */
@Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
    // Plain SPI send: no acknowledgement closure.
    sendMessage0(node, msg, null);
}
/**
 * Starts an asynchronous connectivity check against the given nodes.
 *
 * @param nodes Nodes to check connection with.
 * @return Result future (each bit in result BitSet contains connection status to corresponding node).
 */
public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) {
    TcpCommunicationConnectionCheckFuture checkFut = new TcpCommunicationConnectionCheckFuture(
        this,
        log.getLogger(TcpCommunicationConnectionCheckFuture.class),
        nioSrvr,
        nodes);

    // Failure detection timeout, when enabled, takes precedence over the connection timeout.
    long timeout;

    if (failureDetectionTimeoutEnabled())
        timeout = failureDetectionTimeout();
    else
        timeout = connTimeout;

    if (log.isInfoEnabled())
        log.info("Start check connection process [nodeCnt=" + nodes.size() + ", timeout=" + timeout + ']');

    checkFut.init(timeout);

    return new IgniteFutureImpl<>(checkFut);
}
/**
 * Sends given message to destination node. Note that characteristics of the
 * exchange such as durability, guaranteed delivery or error notification are
 * dependent on SPI implementation.
 *
 * @param node Destination node.
 * @param msg Message to send.
 * @param ackC Ack closure.
 * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message.
 *      Note that it is not guaranteed that failed communication will result
 *      in thrown exception as this is dependent on SPI implementation.
 */
public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
    throws IgniteSpiException {
    // Delegate to the shared implementation, passing the acknowledgement closure through.
    sendMessage0(node, msg, ackC);
}
/**
* Sends the message. When the destination is the local node, the listener is notified directly.
* Otherwise a client is reserved for the connection index chosen by {@code connPlc} and the message is
* sent through it. If {@code client.sendMessage(...)} returns {@code true} (retry requested), the client is
* removed from the cache and the send is repeated with a freshly reserved client, as long as the
* destination node is still in topology.
*
* @param node Destination node.
* @param msg Message to send.
* @param ackC Ack closure.
* @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message.
* Note that this is not guaranteed that failed communication will result
* in thrown exception as this is dependant on SPI implementation.
*/
private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
throws IgniteSpiException {
assert node != null;
assert msg != null;
if (log.isTraceEnabled())
log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']');
if (isLocalNodeDisconnected()) {
throw new IgniteSpiException("Failed to send a message to remote node because local node has " +
"been disconnected [rmtNodeId=" + node.id() + ']');
}
ClusterNode locNode = getLocalNode();
if (locNode == null)
throw new IgniteSpiException("Local node has not been started or fully initialized " +
"[isStopping=" + getSpiContext().isStopping() + ']');
// Local destination: deliver straight to the listener, no network involved.
if (node.id().equals(locNode.id()))
notifyListener(node.id(), msg, NOOP);
else {
GridCommunicationClient client = null;
int connIdx = connPlc.connectionIndex();
try {
boolean retry;
do {
client = reserveClient(node, connIdx);
// Only non-async clients are given the destination node ID.
UUID nodeId = null;
if (!client.async())
nodeId = node.id();
retry = client.sendMessage(nodeId, msg, ackC);
client.release();
if (retry) {
// Drop the unusable client; the next iteration reserves a new one.
removeNodeClient(node.id(), client);
ClusterNode node0 = getSpiContext().node(node.id());
if (node0 == null)
throw new ClusterTopologyCheckedException("Failed to send message to remote node " +
"(node has left the grid): " + node.id());
}
// Cleared only after a normal iteration; a non-null value in finally means the loop exited abnormally.
client = null;
}
while (retry);
}
catch (Throwable t) {
if (stopping)
throw new IgniteSpiException("Node is stopping.", t);
log.error("Failed to send message to remote node [node=" + node + ", msg=" + msg + ']', t);
if (t instanceof Error)
throw (Error)t;
if (t instanceof RuntimeException)
throw (RuntimeException)t;
throw new IgniteSpiException("Failed to send message to remote node: " + node, t);
}
finally {
// Evict and close a client left over by an abnormal exit, unless it was already removed from the cache.
if (client != null && removeNodeClient(node.id(), client))
client.forceClose();
}
}
}
/**
 * Tells whether the local node is currently in disconnected state. Only an {@link IgniteKernal}
 * instance can report this; any other {@code ignite} implementation is treated as connected.
 *
 * @return {@code True} if local node in disconnected state.
 */
private boolean isLocalNodeDisconnected() {
    return ignite instanceof IgniteKernal && ((IgniteKernal)ignite).context().clientDisconnected();
}
/**
 * Atomically removes the given client from the per-node client array, if it is still cached there.
 * The array is never modified in place: a copy with the slot cleared replaces it via compare-and-set.
 *
 * @param nodeId Node ID.
 * @param rmvClient Client to remove.
 * @return {@code True} if client was removed.
 */
private boolean removeNodeClient(UUID nodeId, GridCommunicationClient rmvClient) {
    while (true) {
        GridCommunicationClient[] cur = clients.get(nodeId);

        if (cur == null)
            return false;

        int idx = rmvClient.connectionIndex();

        // The slot is missing or holds a different client: nothing to remove.
        if (idx >= cur.length || cur[idx] != rmvClient)
            return false;

        GridCommunicationClient[] upd = Arrays.copyOf(cur, cur.length);

        upd[idx] = null;

        if (clients.replace(nodeId, cur, upd))
            return true;

        // Lost a race with a concurrent update; re-read and retry.
    }
}
/**
 * Caches a newly created client in the per-node client array using copy-on-write and compare-and-set.
 * Clients whose connection index falls outside {@code [0, connectionsPerNode)} are not cached.
 *
 * @param node Node.
 * @param connIdx Connection index.
 * @param addClient Client to add.
 */
private void addNodeClient(ClusterNode node, int connIdx, GridCommunicationClient addClient) {
    assert connectionsPerNode > 0 : connectionsPerNode;
    assert connIdx == addClient.connectionIndex() : addClient;

    if (connIdx >= connectionsPerNode) {
        assert !usePairedConnections(node);

        return;
    }

    UUID nodeId = node.id();

    boolean installed;

    do {
        GridCommunicationClient[] cur = clients.get(nodeId);

        assert cur == null || cur[connIdx] == null : "Client already created [node=" + nodeId +
            ", connIdx=" + connIdx +
            ", client=" + addClient +
            ", oldClient=" + cur[connIdx] + ']';

        GridCommunicationClient[] upd = cur == null
            ? new GridCommunicationClient[connectionsPerNode]
            : Arrays.copyOf(cur, cur.length);

        upd[connIdx] = addClient;

        // First client for the node is published with putIfAbsent, later ones replace the previous array.
        if (cur == null)
            installed = clients.putIfAbsent(nodeId, upd) == null;
        else
            installed = clients.replace(nodeId, cur, upd);
    }
    while (!installed);
}
/**
* Returns existing or just created client to node, already reserved for the caller.
* <p>
* Only one thread creates a client for a given {@code (node, connIdx)} at a time: the creator installs a
* {@code ConnectFuture} in {@code clientFuts}, and concurrent callers wait on that future. The loop repeats
* when creation yields {@code null} (unless the local node is disconnected) or when the cached client
* cannot be reserved.
*
* @param node Node to which client should be open.
* @param connIdx Connection index.
* @return The existing or just created client.
* @throws IgniteCheckedException Thrown if any exception occurs.
*/
private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
assert node != null;
assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx;
UUID nodeId = node.id();
while (true) {
GridCommunicationClient[] curClients = clients.get(nodeId);
GridCommunicationClient client = curClients != null && connIdx < curClients.length ?
curClients[connIdx] : null;
if (client == null) {
if (stopping)
throw new IgniteSpiException("Node is stopping.");
// Do not allow concurrent connects.
GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();
ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);
if (oldFut == null) {
// This thread owns the connect attempt.
try {
// Re-check: another thread may have cached a client before our future was installed.
GridCommunicationClient[] curClients0 = clients.get(nodeId);
GridCommunicationClient client0 = curClients0 != null && connIdx < curClients0.length ?
curClients0[connIdx] : null;
if (client0 == null) {
client0 = createCommunicationClient(node, connIdx);
if (client0 != null) {
addNodeClient(node, connIdx, client0);
if (client0 instanceof GridTcpNioCommunicationClient) {
GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);
// Session already closed: discard the client so the outer loop retries.
if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) {
if (log.isDebugEnabled())
log.debug("Session was closed after client creation, will retry " +
"[node=" + node + ", client=" + client0 + ']');
client0 = null;
}
}
}
else {
// No client created: back off briefly and give up if the node has left.
U.sleep(200);
if (getSpiContext().node(node.id()) == null)
throw new ClusterTopologyCheckedException("Failed to send message " +
"(node left topology): " + node);
}
}
fut.onDone(client0);
}
catch (Throwable e) {
// Waiters see the failure through the future; this thread rethrows only the cases below,
// otherwise the error surfaces from fut.get() further down.
fut.onDone(e);
if (e instanceof IgniteTooManyOpenFilesException)
throw e;
if (e instanceof Error)
throw (Error)e;
}
finally {
clientFuts.remove(connKey, fut);
}
}
else
fut = oldFut;
client = fut.get();
// A null result means "try again", unless the local node is disconnecting.
if (client == null) {
if (isLocalNodeDisconnected())
throw new IgniteCheckedException("Unable to create TCP client due to local node disconnecting.");
else
continue;
}
if (getSpiContext().node(nodeId) == null) {
if (removeNodeClient(nodeId, client))
client.forceClose();
throw new IgniteSpiException("Destination node is not in topology: " + node.id());
}
}
assert connIdx == client.connectionIndex() : client;
if (client.reserve())
return client;
else
// Client has just been closed by idle worker. Help it and try again.
removeNodeClient(nodeId, client);
}
}
/**
 * Creates a client to the given node. A shared memory client is attempted first when the remote node
 * advertises a shared memory port and has the same set of MACs; otherwise, or if that attempt fails,
 * a TCP client is created. Slow TCP connects (over {@code CONNECTION_ESTABLISH_THRESHOLD_MS}) are logged at INFO.
 *
 * @param node Node to create client for.
 * @param connIdx Connection index.
 * @return Client.
 * @throws IgniteCheckedException If failed.
 */
@Nullable private GridCommunicationClient createCommunicationClient(ClusterNode node, int connIdx)
    throws IgniteCheckedException {
    assert node != null;

    Integer rmtShmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT));

    ClusterNode locNode = getSpiContext().localNode();

    if (locNode == null)
        throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)");

    if (log.isDebugEnabled())
        log.debug("Creating NIO client to node: " + node);

    // Same MAC set + remote shared memory server: the nodes are likely on the same host,
    // so shared memory communication is worth trying before TCP.
    boolean tryShmem = rmtShmemPort != null && U.sameMacs(locNode, node);

    if (tryShmem) {
        try {
            // https://issues.apache.org/jira/browse/IGNITE-11126 Rework failure detection logic.
            GridCommunicationClient shmemClient = createShmemClient(node, connIdx, rmtShmemPort);

            if (log.isDebugEnabled())
                log.debug("Shmem client created: " + shmemClient);

            return shmemClient;
        }
        catch (IgniteCheckedException e) {
            // Fall through to TCP after reporting why shared memory failed.
            if (e.hasCause(IpcOutOfSystemResourcesException.class))
                // Has cause or is itself the IpcOutOfSystemResourcesException.
                LT.warn(log, OUT_OF_RESOURCES_TCP_MSG);
            else if (getSpiContext().node(node.id()) != null)
                LT.warn(log, e.getMessage());
            else if (log.isDebugEnabled())
                log.debug("Failed to establish shared memory connection with local node (node has left): " +
                    node.id());
        }
    }

    long startTs = System.currentTimeMillis();

    GridCommunicationClient tcpClient = createTcpClient(node, connIdx);

    long elapsed = System.currentTimeMillis() - startTs;

    boolean slow = elapsed > CONNECTION_ESTABLISH_THRESHOLD_MS;

    if (slow ? log.isInfoEnabled() : log.isDebugEnabled()) {
        String msg = "TCP client created [client=" + clientString(tcpClient, node) + ", duration=" + elapsed + "ms]";

        if (slow)
            log.info(msg);
        else
            log.debug(msg);
    }

    return tcpClient;
}
/**
 * Builds a printable description of a communication client that is safe to use when the client is {@code null}.
 * For a {@code null} client the description lists the addresses resolved for the target node instead.
 *
 * @param client Communication client, may be {@code null}.
 * @param node Cluster node to which the client tried to establish a connection.
 * @return String representation of the client.
 * @throws IgniteCheckedException If node addresses could not be resolved.
 */
private String clientString(GridCommunicationClient client, ClusterNode node) throws IgniteCheckedException {
    if (client != null)
        return client.toString();

    assert node != null;

    StringBuilder sb = new StringBuilder("null, node addrs=[");

    boolean first = true;

    for (InetSocketAddress addr : nodeAddresses(node)) {
        if (!first)
            sb.append(", ");

        sb.append(addr.toString());

        first = false;
    }

    return sb.append("]").toString();
}
/**
* Creates a shared memory client to the given node and performs the communication handshake over it.
* <p>
* If client creation fails with a {@link ConnectException} cause (and the failure timeout is not yet reached),
* creation is retried once. If the handshake times out while failure detection timeout is disabled, the
* handshake is retried with a doubled timeout until {@code reconCnt} attempts are made or the timeout
* exceeds {@code maxConnTimeout}. Any other handshake failure closes the client and is rethrown.
*
* @param node Node.
* @param connIdx Connection index.
* @param port Shared memory port of the remote node.
* @return Client.
* @throws IgniteCheckedException If failed.
*/
@Nullable private GridCommunicationClient createShmemClient(ClusterNode node,
int connIdx,
Integer port) throws IgniteCheckedException {
// Handshake attempt counter, compared against reconCnt below.
int attempt = 1;
// Client creation attempts; creation is retried once on ConnectException.
int connectAttempts = 1;
// Handshake timeout; doubled after each handshake timeout that is retried.
long connTimeout0 = connTimeout;
// NOTE(review): the second argument presumably enables failure-detection-timeout accounting
// only for server nodes — confirm against IgniteSpiOperationTimeoutHelper.
IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this,
!node.isClient());
while (true) {
GridCommunicationClient client;
try {
client = new GridShmemCommunicationClient(
connIdx,
metricsLsnr.metricRegistry(),
port,
timeoutHelper.nextTimeoutChunk(connTimeout),
log,
getSpiContext().messageFormatter());
}
catch (IgniteCheckedException e) {
if (timeoutHelper.checkFailureTimeoutReached(e))
throw e;
// Reconnect for the second time, if connection is not established.
if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) {
connectAttempts++;
continue;
}
throw e;
}
try {
safeShmemHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
}
catch (IgniteSpiOperationTimeoutException e) {
// The handshake timed out: the client is unusable either way.
client.forceClose();
if (failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e)) {
if (log.isDebugEnabled())
log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
throw e;
}
// Below this point only the connTimeout/reconCnt based retry policy applies.
assert !failureDetectionTimeoutEnabled();
if (log.isDebugEnabled())
log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
", err=" + e.getMessage() + ", client=" + client + ']');
if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
if (log.isDebugEnabled())
log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
"[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
", attempt=" + attempt + ", reconCnt=" + reconCnt +
", err=" + e.getMessage() + ", client=" + client + ']');
throw e;
}
else {
// Retry the whole creation + handshake with a doubled handshake timeout.
attempt++;
connTimeout0 *= 2;
continue;
}
}
catch (IgniteCheckedException | RuntimeException | Error e) {
if (log.isDebugEnabled())
log.debug(
"Caught exception (will close client) [err=" + e.getMessage() + ", client=" + client + ']');
client.forceClose();
throw e;
}
return client;
}
}
/**
 * Drops a slow client node whose outbound message queue has grown beyond {@code slowClientQueueLimit}.
 * Does nothing when the limit is disabled (non-positive), the session has no connection key,
 * or the remote node is unknown or is not a client node.
 *
 * @param ses Node communication session.
 * @param msgQueueSize Current size of the session's outbound message queue.
 */
private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) {
    boolean limitExceeded = slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit;

    if (!limitExceeded)
        return;

    ConnectionKey connKey = ses.meta(CONN_IDX_META);

    if (connKey == null)
        return;

    ClusterNode rmtNode = getSpiContext().node(connKey.nodeId());

    // Only client nodes are dropped for being slow.
    if (rmtNode == null || !rmtNode.isClient())
        return;

    String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " +
        "the client will be dropped " +
        "(consider changing 'slowClientQueueLimit' configuration property) " +
        "[srvNode=" + getSpiContext().localNode().id() +
        ", clientNode=" + rmtNode +
        ", slowClientQueueLimit=" + slowClientQueueLimit + ']';

    U.quietAndWarn(log, msg);

    getSpiContext().failNode(connKey.nodeId(), msg);
}
/**
 * Resolves the communication addresses of the given node using the SPI-wide
 * {@code filterReachableAddresses} setting.
 *
 * @param node Node.
 * @return Node addresses.
 * @throws IgniteCheckedException If the node does not publish any addresses.
 */
private Collection<InetSocketAddress> nodeAddresses(ClusterNode node) throws IgniteCheckedException {
    boolean filter = filterReachableAddresses;

    return nodeAddresses(node, filter);
}
/**
 * Collects the addresses to use when connecting to the given node.
 * <p>
 * Bound addresses (published addresses plus bound port) come first, sorted by
 * {@code U.inetAddressesComparator}. They are followed by mapped external addresses. When
 * {@code filterReachableAddresses} is set and some addresses are found unreachable, the reachable
 * addresses are moved ahead of the unreachable ones (relative order is otherwise kept).
 *
 * @param node Node.
 * @param filterReachableAddresses Whether to reorder addresses by reachability.
 * @return Node addresses in connection order.
 * @throws IgniteCheckedException If node does not have addresses.
 */
public Collection<InetSocketAddress> nodeAddresses(ClusterNode node, boolean filterReachableAddresses)
    throws IgniteCheckedException {
    Collection<String> rmtAddrs = node.attribute(createSpiAttributeName(ATTR_ADDRS));
    Collection<String> rmtHostNames = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
    Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
    Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));

    boolean hasBoundAddrs = !F.isEmpty(rmtAddrs) && boundPort != null;
    boolean hasExtAddrs = !F.isEmpty(extAddrs);

    if (!(hasBoundAddrs || hasExtAddrs))
        throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any " +
            "TCP communication addresses or mapped external addresses. Check configuration and make sure " +
            "that you use the same communication SPI on all nodes. Remote node id: " + node.id());

    LinkedHashSet<InetSocketAddress> addrs = new LinkedHashSet<>();

    // Bound addresses are tried first.
    if (hasBoundAddrs) {
        List<InetSocketAddress> sorted = new ArrayList<>(U.toSocketAddresses(rmtAddrs, rmtHostNames, boundPort));

        sorted.sort(U.inetAddressesComparator(U.sameMacs(getSpiContext().localNode(), node)));

        addrs.addAll(sorted);
    }

    // Mapped external addresses are tried next.
    if (hasExtAddrs)
        addrs.addAll(extAddrs);

    if (log.isDebugEnabled())
        log.debug("Addresses resolved from attributes [rmtNode=" + node.id() + ", addrs=" + addrs +
            ", isRmtAddrsExist=" + hasBoundAddrs + ']');

    if (!filterReachableAddresses)
        return addrs;

    Set<InetAddress> inetAddrs = U.newHashSet(addrs.size());

    for (InetSocketAddress sockAddr : addrs) {
        // getAddress() can return null for unresolved addresses.
        if (sockAddr.isUnresolved())
            continue;

        inetAddrs.add(sockAddr.getAddress());
    }

    List<InetAddress> reachable = U.filterReachable(inetAddrs);

    if (reachable.size() < inetAddrs.size()) {
        LinkedHashSet<InetSocketAddress> reordered = U.newLinkedHashSet(addrs.size());
        List<InetSocketAddress> unreachable = new ArrayList<>(inetAddrs.size() - reachable.size());

        for (InetSocketAddress sockAddr : addrs) {
            if (reachable.contains(sockAddr.getAddress()))
                reordered.add(sockAddr);
            else
                unreachable.add(sockAddr);
        }

        reordered.addAll(unreachable);

        addrs = reordered;
    }

    if (log.isDebugEnabled())
        log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs + ']');

    return addrs;
}
/**
 * Opens a TCP NIO session to the remote node and wraps it into a communication client.
 *
 * @param node Remote node.
 * @param connIdx Connection index.
 * @return Client, or {@code null} if no session was created.
 * @throws IgniteCheckedException If failed.
 */
protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
    GridNioSession ses = createNioSession(node, connIdx);

    if (ses == null)
        return null;

    return new GridTcpNioCommunicationClient(connIdx, ses, log);
}
/**
* Returns the established TCP/IP connection between the current node and remote server. A handshake process of
* negotiation between two communicating nodes will be performed before the {@link GridNioSession} created.
* <p>
* The handshaking process consists of these steps:
*
* <ol>
* <li>The local node opens a new {@link SocketChannel} in the <em>blocking</em> mode.</li>
* <li>The local node calls {@link SocketChannel#connect(SocketAddress)} to remote node.</li>
* <li>The remote GridNioAcceptWorker thread accepts new connection.</li>
* <li>The remote node sends back the {@link NodeIdMessage}.</li>
* <li>The local node reads NodeIdMessage from created channel.</li>
* <li>The local node sends the {@link HandshakeMessage2} to remote.</li>
* <li>The remote node processes {@link HandshakeMessage2} in {@link GridNioServerListener#onMessage(GridNioSession, Object)}.</li>
* <li>The remote node sends back the {@link RecoveryLastReceivedMessage}.</li>
* </ol>
*
* The handshaking process ends.
* </p>
* <p>
* Addresses are tried in the order returned by {@link #nodeAddresses(ClusterNode)}. Each address gets its own
* exponential backoff timeout strategy bounded by the total timeout (failure detection timeout if enabled,
* otherwise the total backoff derived from {@code connTimeout}, {@code maxConnTimeout} and {@code reconCnt}).
* Errors collected from all addresses are passed to {@link #processSessionCreationError} if no session is created.
* </p>
* <p>
* <em>Note.</em> The {@link HandshakeTimeoutObject} is created to control execution timeout during the
* whole handshaking process.
* </p>
*
* @param node Remote node to connect with.
* @param connIdx Connection index based on configured {@link ConnectionPolicy}.
* @return A {@link GridNioSession} connection representation, or {@code null} if the outgoing recovery
*      descriptor could not be reserved or the remote node reported {@code ALREADY_CONNECTED}.
* @throws IgniteCheckedException If establish connection fails.
*/
private GridNioSession createNioSession(ClusterNode node, int connIdx) throws IgniteCheckedException {
Collection<InetSocketAddress> addrs = nodeAddresses(node);
GridNioSession ses = null;
// Errors from all addresses; the first one is the primary, later ones are added as suppressed.
IgniteCheckedException errs = null;
long totalTimeout;
if (failureDetectionTimeoutEnabled())
totalTimeout = node.isClient() ? clientFailureDetectionTimeout() : failureDetectionTimeout();
else {
totalTimeout = ExponentialBackoffTimeoutStrategy.totalBackoffTimeout(
connTimeout,
maxConnTimeout,
reconCnt
);
}
for (InetSocketAddress addr : addrs) {
if (addr.isUnresolved())
continue;
// A fresh timeout budget for every address.
TimeoutStrategy connTimeoutStgy = new ExponentialBackoffTimeoutStrategy(
totalTimeout,
failureDetectionTimeoutEnabled() ? DFLT_INITIAL_TIMEOUT : connTimeout,
maxConnTimeout
);
while (ses == null) { // Reconnection on handshake timeout.
if (stopping)
throw new IgniteSpiException("Node is stopping.");
if (isLocalNodeAddress(addr)) {
if (log.isDebugEnabled())
log.debug("Skipping local address [addr=" + addr +
", locAddrs=" + node.attribute(createSpiAttributeName(ATTR_ADDRS)) +
", node=" + node + ']');
break;
}
long timeout = 0;
connectGate.enter();
try {
if (getSpiContext().node(node.id()) == null)
throw new ClusterTopologyCheckedException("Failed to send message (node left topology): " + node);
SocketChannel ch = openSocketChannel();
// The handshake below is performed in blocking mode.
ch.configureBlocking(true);
ch.socket().setTcpNoDelay(tcpNoDelay);
ch.socket().setKeepAlive(true);
if (sockRcvBuf > 0)
ch.socket().setReceiveBufferSize(sockRcvBuf);
if (sockSndBuf > 0)
ch.socket().setSendBufferSize(sockSndBuf);
ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
assert recoveryDesc != null :
"Recovery descriptor not found [connKey=" + connKey + ", rmtNode=" + node.id() + ']';
// The descriptor could not be reserved: drop the new channel and give up on this attempt.
if (!recoveryDesc.reserve()) {
U.closeQuiet(ch);
// Ensure the session is closed.
GridNioSession sesFromRecovery = recoveryDesc.session();
if (sesFromRecovery != null) {
while (sesFromRecovery.closeTime() == 0)
sesFromRecovery.close();
}
return null;
}
long rcvCnt;
Map<Integer, Object> meta = new HashMap<>();
GridSslMeta sslMeta = null;
try {
timeout = connTimeoutStgy.nextTimeout();
ch.socket().connect(addr, (int) timeout);
if (getSpiContext().node(node.id()) == null)
throw new ClusterTopologyCheckedException("Failed to send message (node left topology): " + node);
if (isSslEnabled()) {
meta.put(SSL_META.ordinal(), sslMeta = new GridSslMeta());
SSLEngine sslEngine = ignite.configuration().getSslContextFactory().create().createSSLEngine();
sslEngine.setUseClientMode(true);
sslMeta.sslEngine(sslEngine);
}
ClusterNode locNode = getLocalNode();
if (locNode == null)
throw new IgniteCheckedException("Local node has not been started or " +
"fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
timeout = connTimeoutStgy.nextTimeout(timeout);
rcvCnt = safeTcpHandshake(ch,
node.id(),
timeout,
sslMeta,
new HandshakeMessage2(locNode.id(),
recoveryDesc.incrementConnectCount(),
recoveryDesc.received(),
connIdx));
// Negative values are special handshake responses rather than received counts.
if (rcvCnt == ALREADY_CONNECTED)
return null;
else if (rcvCnt == NODE_STOPPING)
throw new ClusterTopologyCheckedException("Remote node started stop procedure: " + node.id());
else if (rcvCnt == UNKNOWN_NODE)
throw new ClusterTopologyCheckedException("Remote node does not observe current node " +
"in topology : " + node.id());
else if (rcvCnt == NEED_WAIT) {
//check that failure timeout will be reached after sleep(outOfTopDelay).
if (connTimeoutStgy.checkTimeout(DFLT_NEED_WAIT_DELAY)) {
U.warn(log, "Handshake NEED_WAIT timed out (will stop attempts to perform the handshake) " +
"[node=" + node.id() +
", connTimeoutStgy=" + connTimeoutStgy +
", addr=" + addr +
", failureDetectionTimeoutEnabled=" + failureDetectionTimeoutEnabled() +
", timeout=" + timeout + ']');
throw new ClusterTopologyCheckedException("Failed to connect to node " +
"(current or target node is out of topology on target node within timeout). " +
"Make sure that each ComputeTask and cache Transaction has a timeout set " +
"in order to prevent parties from waiting forever in case of network issues " +
"[nodeId=" + node.id() + ", addrs=" + addrs + ']');
}
else {
if (log.isDebugEnabled())
log.debug("NEED_WAIT received, handshake after delay [node = "
+ node + ", outOfTopologyDelay = " + DFLT_NEED_WAIT_DELAY + "ms]");
U.sleep(DFLT_NEED_WAIT_DELAY);
// Retry the same address; the finally blocks close the channel and release the descriptor first.
continue;
}
}
else if (rcvCnt < 0)
throw new IgniteCheckedException("Unsupported negative receivedCount [rcvCnt=" + rcvCnt +
", senderNode=" + node + ']');
recoveryDesc.onHandshake(rcvCnt);
meta.put(CONSISTENT_ID_META, node.consistentId());
meta.put(CONN_IDX_META, connKey);
meta.put(GridNioServer.RECOVERY_DESC_META_KEY, recoveryDesc);
ses = nioSrvr.createSession(ch, meta, false, null).get();
}
finally {
// Any path that did not produce a session must not leak the channel or keep the descriptor reserved.
if (ses == null) {
U.closeQuiet(ch);
if (recoveryDesc != null)
recoveryDesc.release();
}
}
}
catch (IgniteSpiOperationTimeoutException e) { // Handshake is timed out.
if (ses != null) {
ses.close();
ses = null;
}
onException("Handshake timed out (will retry with increased timeout) [connTimeoutStrategy=" + connTimeoutStgy +
", addr=" + addr + ']', e);
if (log.isDebugEnabled())
log.debug("Handshake timed out (will retry with increased timeout) [connTimeoutStrategy=" + connTimeoutStgy +
", addr=" + addr + ", err=" + e + ']'
);
// Budget for this address is exhausted: record the error and move on to the next address.
if (connTimeoutStgy.checkTimeout()) {
U.warn(log, "Handshake timed out (will stop attempts to perform the handshake) " +
"[node=" + node.id() + ", connTimeoutStrategy=" + connTimeoutStgy +
", err=" + e.getMessage() + ", addr=" + addr +
", failureDetectionTimeoutEnabled=" + failureDetectionTimeoutEnabled() +
", timeout=" + timeout + ']');
String msg = "Failed to connect to node (is node still alive?). " +
"Make sure that each ComputeTask and cache Transaction has a timeout set " +
"in order to prevent parties from waiting forever in case of network issues " +
"[nodeId=" + node.id() + ", addrs=" + addrs + ']';
if (errs == null)
errs = new IgniteCheckedException(msg, e);
else
errs.addSuppressed(new IgniteCheckedException(msg, e));
break;
}
}
catch (ClusterTopologyCheckedException e) {
// Topology problems are not retried on other addresses.
throw e;
}
catch (Exception e) {
// Most probably IO error on socket connect or handshake.
if (ses != null) {
ses.close();
ses = null;
}
onException("Client creation failed [addr=" + addr + ", err=" + e + ']', e);
if (log.isDebugEnabled())
log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
if (X.hasCause(e, "Too many open files", SocketException.class))
throw new IgniteTooManyOpenFilesException(e);
// check if timeout occured in case of unrecoverable exception
if (connTimeoutStgy.checkTimeout()) {
U.warn(log, "Connection timed out (will stop attempts to perform the connect) " +
"[node=" + node.id() + ", connTimeoutStgy=" + connTimeoutStgy +
", failureDetectionTimeoutEnabled=" + failureDetectionTimeoutEnabled() +
", timeout=" + timeout +
", err=" + e.getMessage() + ", addr=" + addr + ']');
String msg = "Failed to connect to node (is node still alive?). " +
"Make sure that each ComputeTask and cache Transaction has a timeout set " +
"in order to prevent parties from waiting forever in case of network issues " +
"[nodeId=" + node.id() + ", addrs=" + addrs + ']';
if (errs == null)
errs = new IgniteCheckedException(msg, e);
else
errs.addSuppressed(new IgniteCheckedException(msg, e));
break;
}
// Recoverable errors are retried on the same address after a delay; others move on to the next address.
if (isRecoverableException(e))
U.sleep(DFLT_RECONNECT_DELAY);
else {
String msg = "Failed to connect to node due to unrecoverable exception (is node still alive?). " +
"Make sure that each ComputeTask and cache Transaction has a timeout set " +
"in order to prevent parties from waiting forever in case of network issues " +
"[nodeId=" + node.id() + ", addrs=" + addrs + ", err= " + e + ']';
if (errs == null)
errs = new IgniteCheckedException(msg, e);
else
errs.addSuppressed(new IgniteCheckedException(msg, e));
break;
}
}
finally {
connectGate.leave();
}
// When running on the communication worker thread, report progress so the worker is not considered stuck.
CommunicationWorker commWorker0 = commWorker;
if (commWorker0 != null && commWorker0.runner() == Thread.currentThread())
commWorker0.updateHeartbeat();
}
if (ses != null)
break;
}
// processSessionCreationError always throws.
if (ses == null)
processSessionCreationError(node, addrs, errs == null ? new IgniteCheckedException("No session found") : errs);
return ses;
}
/**
 * Opens a new, unconnected socket channel. Overridable so that subclasses (e.g. tests) can substitute
 * their own channel implementation.
 *
 * @return A new socket channel.
 * @throws IOException If an I/O error occurs.
 */
protected SocketChannel openSocketChannel() throws IOException {
    SocketChannel ch = SocketChannel.open();

    return ch;
}
/**
 * Closes all connections to the node: forcibly closes its registered clients and the clients of any pending
 * client-creation futures for that node.
 * NOTE: It is recommended only for tests.
 *
 * @param nodeId Node for which to close connections.
 * @throws IgniteCheckedException If a pending client-creation future completed with an error.
 */
void closeConnections(UUID nodeId) throws IgniteCheckedException {
    GridCommunicationClient[] clients = this.clients.remove(nodeId);

    if (nonNull(clients)) {
        for (GridCommunicationClient client : clients) {
            // Slots of the per-node client array may be empty (null).
            if (nonNull(client))
                client.forceClose();
        }
    }

    for (ConnectionKey connKey : clientFuts.keySet()) {
        if (!nodeId.equals(connKey.nodeId()))
            continue;

        GridFutureAdapter<GridCommunicationClient> fut = clientFuts.remove(connKey);

        if (nonNull(fut)) {
            // A client-creation future may complete with a null client.
            GridCommunicationClient client = fut.get();

            if (nonNull(client))
                client.forceClose();
        }
    }
}
/**
 * Checks whether connecting to the given socket address would reach the local node: the port must be the
 * bound TCP port and the host must be the local host, a wildcard address, or (when the local host is itself a
 * wildcard) any local interface address.
 *
 * @param addr Address to check.
 * @return {@code true} if the address belongs to the local node, {@code false} otherwise.
 */
private boolean isLocalNodeAddress(InetSocketAddress addr) {
    if (addr.getPort() != boundTcpPort)
        return false;

    InetAddress inetAddr = addr.getAddress();

    if (locHost.equals(inetAddr) || inetAddr.isAnyLocalAddress())
        return true;

    return locHost.isAnyLocalAddress() && U.isLocalAddress(inetAddr);
}
/**
 * Handles the failure to create a TCP/IP {@link GridNioSession} to a remote node and always rethrows the error.
 * <p>
 * For recoverable errors, communication failure resolution is delegated to the SPI context when it is supported.
 * Otherwise, if forcible node kill is enabled, a server node drops an unreachable client node from the cluster.
 *
 * @param node Remote node.
 * @param addrs Remote node addresses.
 * @param errs TCP client creation errors.
 * @throws IgniteCheckedException Always: {@code errs} is rethrown.
 */
protected void processSessionCreationError(
    ClusterNode node,
    Collection<InetSocketAddress> addrs,
    IgniteCheckedException errs
) throws IgniteCheckedException {
    assert errs != null;

    IgniteSpiContext ctx = getSpiContext();

    boolean commErrResolve = isRecoverableException(errs) && ctx.communicationFailureResolveSupported();

    if (commErrResolve)
        ctx.resolveCommunicationFailure(node, errs);
    else if (enableForcibleNodeKill) {
        boolean dropClient = ctx.node(node.id()) != null
            && node.isClient()
            && !getLocalNode().isClient()
            && isRecoverableException(errs);

        if (dropClient) {
            // Only server can fail client for now, as in TcpDiscovery resolveCommunicationFailure() is not supported.
            String msg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
                "cluster [" + "rmtNode=" + node + ']';

            if (enableTroubleshootingLog)
                U.error(log, msg, errs);
            else
                U.warn(log, msg);

            ctx.failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
                "rmtNode=" + node +
                ", errs=" + errs +
                ", connectErrs=" + X.getSuppressedList(errs) + ']');
        }
    }

    throw errs;
}
/**
 * Tells whether a connection error may go away on retry.
 *
 * @param errs Error.
 * @return {@code True} if the error has an {@link IOException}, a {@link HandshakeException} or an
 *      {@link IgniteSpiOperationTimeoutException} among its causes.
 */
private boolean isRecoverableException(Exception errs) {
    return X.hasCause(errs, IOException.class, HandshakeException.class, IgniteSpiOperationTimeoutException.class);
}
/**
 * @return A new exception describing a handshake that did not complete within the timeout.
 */
private IgniteSpiOperationTimeoutException handshakeTimeoutException() {
    String msg = "Failed to perform handshake due to timeout " +
        "(consider increasing 'connectionTimeout' configuration property).";

    return new IgniteSpiOperationTimeoutException(msg);
}
/**
 * Performs the handshake over a shared memory client under a timeout guard. A timeout object is registered
 * before the handshake. If it cannot be cancelled afterwards (it has already fired), a handshake timeout
 * exception is thrown instead of any result of the handshake itself.
 *
 * @param client Client.
 * @param rmtNodeId Remote node ID.
 * @param timeout Timeout for handshake.
 * @throws IgniteCheckedException If handshake failed or wasn't completed within timeout.
 */
@SuppressWarnings("ThrowFromFinallyBlock")
private void safeShmemHandshake(
    GridCommunicationClient client,
    UUID rmtNodeId,
    long timeout
) throws IgniteCheckedException {
    long deadline = U.currentTimeMillis() + timeout;

    HandshakeTimeoutObject<GridCommunicationClient> timeoutObj = new HandshakeTimeoutObject<>(client, deadline);

    addTimeoutObject(timeoutObj);

    try {
        client.doHandshake(new HandshakeClosure(rmtNodeId));
    }
    finally {
        if (!timeoutObj.cancel())
            throw handshakeTimeoutException();

        removeTimeoutObject(timeoutObj);
    }
}
/**
 * Performs the TCP handshake in a timeout-safe way.
 * <p>
 * Steps:
 * <ol>
 * <li>Read the remote node ID message (optionally over SSL) and verify it matches {@code rmtNodeId}.
 * If the remote node instead replies with a handshake wait message, {@code NEED_WAIT} is returned.</li>
 * <li>Send the Ignite header and the handshake message.</li>
 * <li>Read the remote node's received-count reply.</li>
 * </ol>
 * A timeout object is registered for the whole exchange. If it has fired by the end, a handshake timeout
 * exception is thrown instead of the result.
 *
 * @param ch Socket channel.
 * @param rmtNodeId Remote node.
 * @param timeout Timeout for handshake.
 * @param sslMeta Session meta.
 * @param msg {@link HandshakeMessage} or {@link HandshakeMessage2} to send.
 * @return Received count reported by the remote node, or {@code NEED_WAIT} if it asked to wait.
 * @throws IgniteCheckedException If handshake failed or wasn't completed within timeout.
 */
@SuppressWarnings("ThrowFromFinallyBlock")
private long safeTcpHandshake(
    SocketChannel ch,
    UUID rmtNodeId,
    long timeout,
    GridSslMeta sslMeta,
    HandshakeMessage msg
) throws IgniteCheckedException {
    HandshakeTimeoutObject<SocketChannel> obj = new HandshakeTimeoutObject<>(ch, U.currentTimeMillis() + timeout);

    addTimeoutObject(obj);

    long rcvCnt;

    try {
        BlockingSslHandler sslHnd = null;

        ByteBuffer buf;

        // Step 1. Get remote node response with the remote nodeId value.
        if (isSslEnabled()) {
            assert sslMeta != null;

            sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.LITTLE_ENDIAN, log);

            if (!sslHnd.handshake())
                throw new HandshakeException("SSL handshake is not completed.");

            ByteBuffer handBuff = sslHnd.applicationBuffer();

            if (handBuff.remaining() >= DIRECT_TYPE_SIZE) {
                short msgType = makeMessageType(handBuff.get(0), handBuff.get(1));

                if (msgType == HANDSHAKE_WAIT_MSG_TYPE)
                    return NEED_WAIT;
            }

            if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
                buf = ByteBuffer.allocate(1000);

                int read = ch.read(buf);

                if (read == -1)
                    throw new HandshakeException("Failed to read remote node ID (connection closed).");

                buf.flip();

                buf = sslHnd.decode(buf);

                // Inspect the freshly decoded buffer: the handler may have replaced the buffer
                // previously returned as handBuff.
                if (buf.remaining() >= DIRECT_TYPE_SIZE) {
                    short msgType = makeMessageType(buf.get(0), buf.get(1));

                    if (msgType == HANDSHAKE_WAIT_MSG_TYPE)
                        return NEED_WAIT;
                }
            }
            else
                buf = handBuff;
        }
        else {
            buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);

            for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
                int read = ch.read(buf);

                if (read == -1)
                    throw new HandshakeException("Failed to read remote node ID (connection closed).");

                i += read;

                // Check the message type as soon as its bytes are available, even if they arrived in separate reads.
                if (i >= DIRECT_TYPE_SIZE) {
                    short msgType = makeMessageType(buf.get(0), buf.get(1));

                    if (msgType == HANDSHAKE_WAIT_MSG_TYPE)
                        return NEED_WAIT;
                }
            }
        }

        UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);

        if (!rmtNodeId.equals(rmtNodeId0))
            throw new HandshakeException("Remote node ID is not as expected [expected=" + rmtNodeId +
                ", rcvd=" + rmtNodeId0 + ']');
        else if (log.isDebugEnabled())
            log.debug("Received remote node ID: " + rmtNodeId0);

        if (isSslEnabled()) {
            assert sslHnd != null;

            U.writeFully(ch, sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
        }
        else
            U.writeFully(ch, ByteBuffer.wrap(U.IGNITE_HEADER));

        // Step 2. Prepare Handshake message to send to the remote node.
        if (log.isDebugEnabled())
            log.debug("Writing handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');

        buf = ByteBuffer.allocate(msg.getMessageSize());

        buf.order(ByteOrder.LITTLE_ENDIAN);

        boolean written = msg.writeTo(buf, null);

        assert written;

        buf.flip();

        if (isSslEnabled()) {
            assert sslHnd != null;

            U.writeFully(ch, sslHnd.encrypt(buf));
        }
        else
            U.writeFully(ch, buf);

        if (log.isDebugEnabled())
            log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');

        // Step 3. Waiting for response from the remote node with their receive count message.
        if (isSslEnabled()) {
            assert sslHnd != null;

            buf = ByteBuffer.allocate(1000);
            buf.order(ByteOrder.LITTLE_ENDIAN);

            ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
            decode.order(ByteOrder.LITTLE_ENDIAN);

            for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
                int read = ch.read(buf);

                if (read == -1)
                    throw new HandshakeException("Failed to read remote node recovery handshake " +
                        "(connection closed).");

                buf.flip();

                ByteBuffer decode0 = sslHnd.decode(buf);

                i += decode0.remaining();

                decode = appendAndResizeIfNeeded(decode, decode0);

                buf.clear();
            }

            decode.flip();

            rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);

            // Keep any bytes decoded past the reply so the NIO session can consume them.
            if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
                decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);

                sslMeta.decodedBuffer(decode);
            }

            ByteBuffer inBuf = sslHnd.inputBuffer();

            if (inBuf.position() > 0)
                sslMeta.encodedBuffer(inBuf);
        }
        else {
            buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);

            buf.order(ByteOrder.LITTLE_ENDIAN);

            for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
                int read = ch.read(buf);

                if (read == -1)
                    throw new HandshakeException("Failed to read remote node recovery handshake " +
                        "(connection closed).");

                i += read;
            }

            rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
        }

        if (log.isDebugEnabled())
            log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');

        if (rcvCnt == -1) {
            if (log.isDebugEnabled())
                log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
        }
    }
    catch (IOException e) {
        if (log.isDebugEnabled())
            log.debug("Failed to read from channel: " + e);

        throw new IgniteCheckedException("Failed to read from channel.", e);
    }
    finally {
        // If the timeout object already fired, report a timeout regardless of how the exchange ended.
        if (obj.cancel())
            removeTimeoutObject(obj);
        else
            throw handshakeTimeoutException();
    }

    return rcvCnt;
}
/**
 * Delivers a received message to the registered communication listener, if any.
 *
 * @param sndId Sender ID.
 * @param msg Communication message.
 * @param msgC Closure to call when message processing finished.
 */
protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) {
    CommunicationListener<Message> curLsnr = lsnr;

    MTC.span().addLog(() -> "Communication listeners notified");

    if (curLsnr == null) {
        if (log.isDebugEnabled())
            log.debug("Received communication message without any registered listeners (will ignore, " +
                "is node stopping?) [senderNodeId=" + sndId + ", msg=" + msg + ']');

        return;
    }

    // Notify listener of a new message.
    curLsnr.onMessage(sndId, msg, msgC);
}
/**
 * Notifies the listener about a newly opened channel. Only listeners implementing
 * {@link CommunicationListenerEx} receive the notification.
 *
 * @param nodeId The remote node id.
 * @param channel The configured channel to notify listeners with.
 * @param initMsg Channel initialization message with additional channel params.
 */
private void notifyChannelEvtListener(UUID nodeId, Channel channel, Message initMsg) {
    if (log.isDebugEnabled())
        log.debug("Notify appropriate listeners due to a new channel opened: " + channel);

    CommunicationListener<Message> curLsnr = lsnr;

    if (!(curLsnr instanceof CommunicationListenerEx))
        return;

    CommunicationListenerEx<Message> lsnrEx = (CommunicationListenerEx<Message>)curLsnr;

    lsnrEx.onChannelOpened(nodeId, initMsg, channel);
}
/**
 * Appends the remaining bytes of {@code src} to {@code target}. If {@code target} lacks room, a new heap buffer
 * with the same byte order is allocated: its capacity is the larger of twice the old capacity and the old
 * capacity plus the bytes to append. The bytes already written to {@code target} are copied into it first.
 *
 * @param target Target buffer to append to (in write mode).
 * @param src Source buffer to get data.
 * @return {@code target} itself, or the expanded replacement buffer.
 */
private ByteBuffer appendAndResizeIfNeeded(ByteBuffer target, ByteBuffer src) {
    ByteBuffer dst = target;

    int needed = src.remaining();

    if (needed > dst.remaining()) {
        int cap = dst.capacity();

        ByteBuffer grown = ByteBuffer.allocate(Math.max(cap + needed, cap * 2)).order(dst.order());

        // Switch the old buffer to read mode to copy what has been written so far.
        dst.flip();
        grown.put(dst);

        dst = grown;
    }

    dst.put(src);

    return dst;
}
/**
 * Stops service threads to simulate node failure: stops the NIO server, interrupts and joins the
 * communication worker, and force-closes every registered client.
 *
 * FOR TEST PURPOSES ONLY!!!
 */
public void simulateNodeFailure() {
    if (nioSrvr != null)
        nioSrvr.stop();

    // Read the worker field once so the null check and the dereference see the same instance.
    CommunicationWorker commWorker0 = commWorker;

    if (commWorker0 != null)
        U.interrupt(commWorker0.runner());

    U.join(commWorker0, log);

    for (GridCommunicationClient[] clients0 : clients.values()) {
        for (GridCommunicationClient client : clients0) {
            if (client != null)
                client.forceClose();
        }
    }
}
/**
 * Looks up (creating if missing) the recovery descriptor for an outgoing connection. Paired connections use a
 * dedicated map of outgoing descriptors; otherwise the shared descriptor map is used.
 *
 * @param node Node.
 * @param key Connection key.
 * @return Recovery descriptor for outgoing connection.
 */
private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
    boolean paired = usePairedConnections(node);

    return recoveryDescriptor(paired ? outRecDescs : recoveryDescs, paired, node, key);
}
/**
 * Looks up (creating if missing) the recovery descriptor for an incoming connection. Paired connections use a
 * dedicated map of incoming descriptors; otherwise the shared descriptor map is used.
 *
 * @param node Node.
 * @param key Connection key.
 * @return Recovery descriptor for incoming connection.
 */
private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node, ConnectionKey key) {
    boolean paired = usePairedConnections(node);

    return recoveryDescriptor(paired ? inRecDescs : recoveryDescs, paired, node, key);
}
/**
 * Checks whether paired in/out connections can be used with the node. This requires paired connections to be
 * enabled locally and the node to advertise the paired-connection attribute as {@code true}.
 *
 * @param node Node.
 * @return {@code True} if can use in/out connection pair for communication.
 */
private boolean usePairedConnections(ClusterNode node) {
    if (!usePairedConnections)
        return false;

    Boolean attr = node.attribute(createSpiAttributeName(ATTR_PAIRED_CONN));

    return Boolean.TRUE.equals(attr);
}
/**
 * Removes the recovery descriptors registered under the key, choosing the maps according to whether the local
 * node uses paired connections.
 *
 * @param key The connection key to cleanup descriptors on local node.
 */
private void cleanupLocalNodeRecoveryDescriptor(ConnectionKey key) {
    if (!usePairedConnections(getLocalNode())) {
        recoveryDescs.remove(key);

        return;
    }

    inRecDescs.remove(key);
    outRecDescs.remove(key);
}
/**
 * Returns the recovery descriptor stored under the key, atomically creating and registering a new one if absent.
 * A new descriptor's queue limit is {@code unackedMsgsBufSize} when non-zero, otherwise
 * {@code 128 * max(msgQueueLimit, ackSndThreshold)}.
 *
 * @param recoveryDescs Descriptors map.
 * @param pairedConnections {@code True} if in/out connections pair is used for communication with node.
 * @param node Node.
 * @param key Connection key.
 * @return Recovery receive data for given node.
 */
private GridNioRecoveryDescriptor recoveryDescriptor(
    ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs,
    boolean pairedConnections,
    ClusterNode node,
    ConnectionKey key) {
    GridNioRecoveryDescriptor existing = recoveryDescs.get(key);

    if (existing != null)
        return existing;

    if (log.isDebugEnabled())
        log.debug("Missing recovery descriptor for the node (will create a new one) " +
            "[locNodeId=" + getLocalNode().id() +
            ", key=" + key + ", rmtNode=" + node + ']');

    int maxSize = Math.max(msgQueueLimit, ackSndThreshold);

    int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 128);

    GridNioRecoveryDescriptor created = new GridNioRecoveryDescriptor(pairedConnections, queueLimit, node, log);

    GridNioRecoveryDescriptor prev = recoveryDescs.putIfAbsent(key, created);

    if (prev == null) {
        if (log.isDebugEnabled())
            log.debug("Initialized recovery descriptor [desc=" + created + ", maxSize=" + maxSize +
                ", queueLimit=" + queueLimit + ']');

        return created;
    }

    // Another thread registered a descriptor concurrently; use that one.
    if (log.isDebugEnabled())
        log.debug("Will use existing recovery descriptor: " + prev);

    return prev;
}
/**
 * Records an exception in the SPI exception registry.
 *
 * @param msg Error message.
 * @param e Exception.
 */
private void onException(String msg, Exception e) {
    this.getExceptionRegistry().onException(msg, e);
}
/**
 * Builds a node ID message for the local node. The ID comes from the kernal context when available,
 * otherwise from {@link #safeLocalNodeId()}.
 *
 * @return Node ID message.
 */
private NodeIdMessage nodeIdMessage() {
    UUID locNodeId;

    if (ignite instanceof IgniteEx)
        locNodeId = ((IgniteEx)ignite).context().localNodeId();
    else
        locNodeId = safeLocalNodeId();

    return new NodeIdMessage(locNodeId);
}
/**
 * Returns the local node ID. If the local node is not available yet, logs a warning and returns
 * the all-zero UUID instead of failing.
 *
 * @return Local node ID, or {@code new UUID(0, 0)} if the local node is unavailable.
 */
private UUID safeLocalNodeId() {
    ClusterNode locNode = getLocalNode();

    if (locNode != null)
        return locNode.id();

    U.warn(log, "Local node is not started or fully initialized [isStopping=" +
        getSpiContext().isStopping() + ']');

    return new UUID(0, 0);
}
/**
 * {@inheritDoc}
 * <p>
 * Overridden only to narrow the return type so calls can be chained fluently.
 */
@Override public TcpCommunicationSpi setName(String name) {
    super.setName(name);

    return this;
}
/**
 * Checks whether remote nodes support {@link HandshakeWaitMessage}. An {@link IgniteDiscoverySpi} is asked
 * directly; for any other discovery SPI the features of its remote nodes are inspected.
 *
 * @return {@code True} if remote nodes support {@link HandshakeWaitMessage}.
 */
private boolean isHandshakeWaitSupported() {
    DiscoverySpi discoSpi = ignite().configuration().getDiscoverySpi();

    if (!(discoSpi instanceof IgniteDiscoverySpi)) {
        return IgniteFeatures.allNodesSupports(discoSpi.getRemoteNodes(),
            IgniteFeatures.TCP_COMMUNICATION_SPI_HANDSHAKE_WAIT_MESSAGE);
    }

    IgniteDiscoverySpi igniteDiscoSpi = (IgniteDiscoverySpi)discoSpi;

    return igniteDiscoSpi.allNodesSupport(IgniteFeatures.TCP_COMMUNICATION_SPI_HANDSHAKE_WAIT_MESSAGE);
}
/**
 * {@inheritDoc}
 * <p>
 * Delegates to the reflective {@code S.toString} helper.
 */
@Override public String toString() {
    return S.toString(TcpCommunicationSpi.class, this);
}
/**
 * Accepts shared-memory connections and starts a dedicated {@link ShmemWorker} thread for each of them.
 * This worker is responsible for shutting the passed server down when stopping; no other component
 * shall stop it. The server is closed on cancellation and whenever the accept loop exits.
 */
private class ShmemAcceptWorker extends GridWorker {
    /** Shared-memory server endpoint owned by this worker. */
    private final IpcSharedMemoryServerEndpoint srv;

    /**
     * @param srv Server endpoint to accept connections on.
     */
    ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) {
        super(igniteInstanceName, "shmem-communication-acceptor", TcpCommunicationSpi.this.log);

        this.srv = srv;
    }

    /** {@inheritDoc} */
    @Override protected void body() throws InterruptedException {
        try {
            while (!Thread.interrupted()) {
                ShmemWorker worker = new ShmemWorker(srv.accept());

                shmemWorkers.add(worker);

                new IgniteThread(worker).start();
            }
        }
        catch (IgniteCheckedException e) {
            // Accept failures are expected once the worker was cancelled (the server gets closed).
            boolean unexpected = !isCancelled();

            if (unexpected)
                U.error(log, "Shmem server failed.", e);
        }
        finally {
            srv.close();
        }
    }

    /** {@inheritDoc} */
    @Override public void cancel() {
        super.cancel();

        // Closing the server unblocks a pending accept().
        srv.close();
    }
}
/**
 * Writes a message type to an output stream as two bytes, low-order byte first.
 *
 * @param os Output stream.
 * @param type Message type.
 * @throws IOException On error.
 */
private static void writeMessageType(OutputStream os, short type) throws IOException {
    int lo = type & 0xFF;
    int hi = (type >> 8) & 0xFF;

    // OutputStream.write(int) emits only the low-order 8 bits of its argument.
    os.write(lo);
    os.write(hi);
}
/**
 * Writes a message type to a byte buffer as two bytes, low-order byte first, advancing its position by 2.
 * The inverse operation is {@link #makeMessageType(byte, byte)}.
 *
 * @param buf Byte buffer.
 * @param type Message type.
 */
public static void writeMessageType(ByteBuffer buf, short type) {
    byte lo = (byte)type;
    byte hi = (byte)(type >>> 8);

    buf.put(lo);
    buf.put(hi);
}
/**
 * Combines two bytes into a message type: {@code b0} is the low-order byte and {@code b1} the high-order
 * byte, matching the layout produced by {@code writeMessageType}.
 *
 * @param b0 The first (low-order) byte.
 * @param b1 The second (high-order) byte.
 * @return Message type assembled from the two bytes.
 */
public static short makeMessageType(byte b0, byte b1) {
    int lo = b0 & 0xFF;
    int hi = b1 & 0xFF;

    return (short)((hi << 8) | lo);
}
/**
 * Looks up the workers registry of the given Ignite instance.
 *
 * @param ignite Ignite.
 * @return Workers registry, or {@code null} if {@code ignite} is not an {@link IgniteEx}.
 */
private static WorkersRegistry getWorkersRegistry(Ignite ignite) {
    if (!(ignite instanceof IgniteEx))
        return null;

    return ((IgniteEx)ignite).context().workersRegistry();
}
/**
 * Opens a dedicated channel connection to a remote node.
 * <p>
 * A new NIO session is created for the channel connection index, any local recovery descriptor for that key
 * is dropped, and the returned future is attached to the session under {@code CHANNEL_FUT_META}. The initial
 * message is then sent. If sending fails, the future is completed with the error and the session is closed.
 * Otherwise a timeout of {@code connTimeout} ms is armed. If the future is still incomplete when that timeout
 * fires, it is completed with a handshake timeout error and the session is closed.
 *
 * @param remote Destination cluster node to communicate with.
 * @param initMsg Configuration channel attributes wrapped into the message.
 * @return The future, which will be finished on channel ready.
 * @throws IgniteSpiException If fails.
 */
public IgniteInternalFuture<Channel> openChannel(
    ClusterNode remote,
    Message initMsg
) throws IgniteSpiException {
    assert !remote.isLocal() : remote;
    assert initMsg != null;
    assert chConnPlc != null;
    assert nodeSupports(remote, CHANNEL_COMMUNICATION) : "Node doesn't support direct connection over socket channel " +
        "[nodeId=" + remote.id() + ']';

    ConnectionKey key = new ConnectionKey(remote.id(), chConnPlc.connectionIndex());

    GridFutureAdapter<Channel> chFut = new GridFutureAdapter<>();

    connectGate.enter();

    try {
        GridNioSession ses = createNioSession(remote, key.connectionIndex());

        assert ses != null : "Session must be established [remoteId=" + remote.id() + ", key=" + key + ']';

        cleanupLocalNodeRecoveryDescriptor(key);

        ses.addMeta(CHANNEL_FUT_META, chFut);

        // Send configuration message over the created session.
        ses.send(initMsg)
            .listen(f -> {
                if (f.error() != null) {
                    GridFutureAdapter<Channel> rq = ses.meta(CHANNEL_FUT_META);

                    assert rq != null;

                    rq.onDone(f.error());

                    ses.close();

                    return;
                }

                // The deadline must be fixed when the timer is armed. Recomputing "now + connTimeout" on every
                // endTime() call yields a deadline that is always in the future, so the timeout would never
                // expire. The ID is fixed as well so the timeout object keeps a stable identity.
                long rawDeadline = U.currentTimeMillis() + connTimeout;
                // Guard against overflow for very large timeouts.
                long deadline = rawDeadline < 0 ? Long.MAX_VALUE : rawDeadline;
                IgniteUuid timeoutId = IgniteUuid.randomUuid();

                addTimeoutObject(new IgniteSpiTimeoutObject() {
                    @Override public IgniteUuid id() {
                        return timeoutId;
                    }

                    @Override public long endTime() {
                        return deadline;
                    }

                    @Override public void onTimeout() {
                        // Close session if request not complete yet.
                        GridFutureAdapter<Channel> rq = ses.meta(CHANNEL_FUT_META);

                        assert rq != null;

                        if (rq.onDone(handshakeTimeoutException()))
                            ses.close();
                    }
                });
            });

        return chFut;
    }
    catch (IgniteCheckedException e) {
        throw new IgniteSpiException("Unable to create new channel connection to the remote node: " + remote, e);
    }
    finally {
        connectGate.leave();
    }
}
/**
 * Channel connections use indexes above the regular per-node connection range.
 *
 * @param connIdx Connection index to check.
 * @return {@code true} if connection index is related to the channel create request\response.
 */
private boolean isChannelConnIdx(int connIdx) {
    return MAX_CONN_PER_NODE < connIdx;
}
/**
 * High-priority discovery listener that forwards node-left and node-failed events to {@code onNodeLeft}.
 */
private class DiscoveryListener implements GridLocalEventListener, HighPriorityListener {
    /** {@inheritDoc} */
    @Override public void onEvent(Event evt) {
        assert evt instanceof DiscoveryEvent : evt;
        assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;

        DiscoveryEvent discoEvt = (DiscoveryEvent)evt;

        ClusterNode leftNode = discoEvt.eventNode();

        onNodeLeft(leftNode.consistentId(), leftNode.id());
    }

    /** {@inheritDoc} */
    @Override public int order() {
        return 0;
    }
}
/**
*
*/
private class ShmemWorker extends GridWorker {
/** */
private final IpcEndpoint endpoint;
/**
* @param endpoint Endpoint.
*/
private ShmemWorker(IpcEndpoint endpoint) {
super(igniteInstanceName, "shmem-worker", TcpCommunicationSpi.this.log);
this.endpoint = endpoint;
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
try {
MessageFactory msgFactory = new MessageFactory() {
private MessageFactory impl;
@Nullable @Override public Message create(short type) {
if (impl == null)
impl = getSpiContext().messageFactory();
assert impl != null;
return impl.create(type);
}
};
GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory() {
private MessageFormatter formatter;
@Override public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException {
if (formatter == null)
formatter = getSpiContext().messageFormatter();
assert formatter != null;
ConnectionKey connKey = ses.meta(CONN_IDX_META);
return connKey != null ? formatter.writer(connKey.nodeId()) : null;
}
};
GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory() {
private MessageFormatter formatter;
@Override public MessageReader reader(GridNioSession ses, MessageFactory msgFactory)
throws IgniteCheckedException {
if (formatter == null)
formatter = getSpiContext().messageFormatter();
assert formatter != null;
ConnectionKey connKey = ses.meta(CONN_IDX_META);
return connKey != null ? formatter.reader(connKey.nodeId(), msgFactory) : null;
}
};
IpcToNioAdapter<Message> adapter = new IpcToNioAdapter<>(
metricsLsnr.metricRegistry(),
log,
endpoint,
srvLsnr,
writerFactory,
new GridNioTracerFilter(log, tracing),
new GridNioCodecFilter(
new GridDirectParser(log.getLogger(GridDirectParser.class), msgFactory, readerFactory),
log,
true),
new GridConnectionBytesVerifyFilter(log)
);
adapter.serve();
}
finally {
shmemWorkers.remove(this);
endpoint.close();
}
}
/** {@inheritDoc} */
@Override public void cancel() {
super.cancel();
endpoint.close();
}
/** {@inheritDoc} */
@Override protected void cleanup() {
super.cleanup();
endpoint.close();
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ShmemWorker.class, this);
}
}
/**
 * Background worker that reconnects sessions queued through
 * {@link #addProcessDisconnectRequest(DisconnectedSessionInfo)}. When no request arrives within
 * {@code idleConnTimeout} ms, it performs idle housekeeping instead ({@link #processIdle()}).
 */
private class CommunicationWorker extends GridWorker {
    /** Disconnected sessions waiting for a recovery reconnect. */
    private final BlockingQueue<DisconnectedSessionInfo> q = new LinkedBlockingQueue<>();

    /**
     * @param igniteInstanceName Ignite instance name.
     * @param log Logger.
     */
    private CommunicationWorker(String igniteInstanceName, IgniteLogger log) {
        super(igniteInstanceName, "tcp-comm-worker", log, getWorkersRegistry(ignite));
    }

    /**
     * Main loop: waits up to {@code idleConnTimeout} ms for a disconnect request and either processes it or
     * runs idle housekeeping. When the loop ends and {@code ignite} is an {@link IgniteEx}, a failure is reported:
     * {@code CRITICAL_ERROR} for {@link OutOfMemoryError}, otherwise {@code SYSTEM_WORKER_TERMINATION}.
     * A failure is reported for any non-interrupt throwable, and for any exit while the SPI is not stopping.
     */
    @Override protected void body() throws InterruptedException {
        if (log.isDebugEnabled())
            log.debug("Tcp communication worker has been started.");

        Throwable err = null;

        try {
            while (!isCancelled()) {
                DisconnectedSessionInfo disconnectData;

                blockingSectionBegin();

                try {
                    disconnectData = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
                }
                finally {
                    blockingSectionEnd();
                }

                if (disconnectData != null)
                    processDisconnect(disconnectData);
                else
                    processIdle();

                onIdle();
            }
        }
        catch (Throwable t) {
            if (!(t instanceof InterruptedException))
                err = t;

            throw t;
        }
        finally {
            if (ignite instanceof IgniteEx) {
                // Leaving the loop while the SPI is not stopping is itself a failure.
                if (err == null && !stopping)
                    err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly.");

                if (err instanceof OutOfMemoryError)
                    ((IgniteEx)ignite).context().failure().process(new FailureContext(CRITICAL_ERROR, err));
                else if (err != null)
                    ((IgniteEx)ignite).context().failure().process(
                        new FailureContext(SYSTEM_WORKER_TERMINATION, err));
            }
        }
    }

    /**
     * Idle housekeeping:
     * <ul>
     *     <li>drops recovery descriptors of nodes that are no longer alive;</li>
     *     <li>force-closes clients of nodes missing from the topology;</li>
     *     <li>for non-paired connections, sends a pending recovery acknowledgement if one is due;</li>
     *     <li>closes clients idle for at least {@code idleConnTimeout} unless unacknowledged messages remain;</li>
     *     <li>for paired connections, sends pending acknowledgements over accepted sessions.</li>
     * </ul>
     */
    private void processIdle() {
        cleanupRecovery();

        for (Map.Entry<UUID, GridCommunicationClient[]> e : clients.entrySet()) {
            UUID nodeId = e.getKey();

            for (GridCommunicationClient client : e.getValue()) {
                if (client == null)
                    continue;

                ClusterNode node = getSpiContext().node(nodeId);

                if (node == null) {
                    if (log.isDebugEnabled())
                        log.debug("Forcing close of non-existent node connection: " + nodeId);

                    client.forceClose();

                    removeNodeClient(nodeId, client);

                    continue;
                }

                GridNioRecoveryDescriptor recovery = null;

                if (!usePairedConnections(node) && client instanceof GridTcpNioCommunicationClient) {
                    recovery = recoveryDescs.get(new ConnectionKey(
                        node.id(), client.connectionIndex(), -1)
                    );

                    if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
                        RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());

                        if (log.isDebugEnabled())
                            log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
                                ", rcvCnt=" + msg.received() + ']');

                        try {
                            nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);

                            recovery.lastAcknowledged(msg.received());
                        }
                        catch (IgniteCheckedException err) {
                            U.error(log, "Failed to send message: " + err, err);
                        }

                        // An acknowledgement was due: skip the idle-close check for this client this round.
                        continue;
                    }
                }

                long idleTime = client.getIdleTime();

                if (idleTime >= idleConnTimeout) {
                    if (recovery == null && usePairedConnections(node))
                        recovery = outRecDescs.get(new ConnectionKey(
                            node.id(), client.connectionIndex(), -1)
                        );

                    if (recovery != null &&
                        recovery.nodeAlive(getSpiContext().node(nodeId)) &&
                        !recovery.messagesRequests().isEmpty()) {
                        if (log.isDebugEnabled())
                            log.debug("Node connection is idle, but there are unacknowledged messages, " +
                                "will wait: " + nodeId);

                        continue;
                    }

                    if (log.isDebugEnabled())
                        log.debug("Closing idle node connection: " + nodeId);

                    if (client.close() || client.closed())
                        removeNodeClient(nodeId, client);
                }
            }
        }

        for (GridNioSession ses : nioSrvr.sessions()) {
            GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();

            if (recovery != null && usePairedConnections(recovery.node())) {
                assert ses.accepted() : ses;

                sendAckOnTimeout(recovery, ses);
            }
        }
    }

    /**
     * Sends a recovery acknowledgement if the received count differs from the last acknowledged one.
     * Send failures are logged, not propagated.
     *
     * @param recovery Recovery descriptor.
     * @param ses Session.
     */
    private void sendAckOnTimeout(GridNioRecoveryDescriptor recovery, GridNioSession ses) {
        if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
            RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());

            if (log.isDebugEnabled()) {
                log.debug("Send recovery acknowledgement on timeout [rmtNode=" + recovery.node().id() +
                    ", rcvCnt=" + msg.received() +
                    ", lastAcked=" + recovery.lastAcknowledged() + ']');
            }

            try {
                nioSrvr.sendSystem(ses, msg);

                recovery.lastAcknowledged(msg.received());
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Failed to send message: " + e, e);
            }
        }
    }

    /**
     * Removes descriptors of nodes that are no longer alive from all three recovery maps.
     */
    private void cleanupRecovery() {
        cleanupRecovery(recoveryDescs);
        cleanupRecovery(inRecDescs);
        cleanupRecovery(outRecDescs);
    }

    /**
     * Collects the keys whose descriptors report the node as not alive. Each such descriptor is then removed,
     * but only if it is still mapped to the same key and its {@code onNodeLeft()} returns {@code true}.
     *
     * @param recoveryDescs Recovery descriptors to cleanup.
     */
    private void cleanupRecovery(ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs) {
        Set<ConnectionKey> left = null;

        // Map iteration visits every key at most once, so no de-duplication against 'left' is needed.
        for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
            GridNioRecoveryDescriptor recoveryDesc = e.getValue();

            if (!recoveryDesc.nodeAlive(getSpiContext().node(e.getKey().nodeId()))) {
                if (left == null)
                    left = new HashSet<>();

                left.add(e.getKey());
            }
        }

        if (left != null) {
            assert !left.isEmpty();

            for (ConnectionKey id : left) {
                GridNioRecoveryDescriptor recoveryDesc = recoveryDescs.get(id);

                if (recoveryDesc != null && recoveryDesc.onNodeLeft())
                    recoveryDescs.remove(id, recoveryDesc);
            }
        }
    }

    /**
     * Tries to re-establish the connection for a disconnected session, if its node is still alive.
     * On a generic failure the request is re-queued while the node is alive and answers a ping; otherwise
     * the failure is recorded. Too-many-open-files errors are recorded and rethrown.
     *
     * @param sesInfo Disconnected session information.
     */
    private void processDisconnect(DisconnectedSessionInfo sesInfo) {
        GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;

        ClusterNode node = recoveryDesc.node();

        if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
            return;

        try {
            if (log.isDebugEnabled())
                log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');

            // Reserving a client (re)establishes the connection; it is released right away.
            GridCommunicationClient client = reserveClient(node, sesInfo.connIdx);

            client.release();
        }
        catch (ClusterTopologyCheckedException e) {
            if (log.isDebugEnabled())
                log.debug("Recovery reconnect failed, node stopping [rmtNode=" + recoveryDesc.node().id() + ']');
        }
        catch (IgniteTooManyOpenFilesException e) {
            onException(e.getMessage(), e);

            throw e;
        }
        catch (IgniteCheckedException | IgniteException e) {
            try {
                if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) {
                    if (log.isDebugEnabled())
                        log.debug("Recovery reconnect failed, will retry " +
                            "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');

                    addProcessDisconnectRequest(sesInfo);
                }
                else {
                    if (log.isDebugEnabled())
                        log.debug("Recovery reconnect failed, " +
                            "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');

                    onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
                        e);
                }
            }
            catch (IgniteClientDisconnectedException ignored) {
                if (log.isDebugEnabled())
                    log.debug("Failed to ping node, client disconnected.");
            }
        }
    }

    /**
     * Queues a disconnected session for a reconnect attempt by this worker.
     *
     * @param sesInfo Disconnected session information.
     */
    void addProcessDisconnectRequest(DisconnectedSessionInfo sesInfo) {
        boolean add = q.add(sesInfo);

        assert add;
    }
}
/**
 * Future completed with a communication client once a connection attempt finishes.
 * Declared as a distinct type only; it adds no behavior to {@link GridFutureAdapter}.
 */
private static class ConnectFuture extends GridFutureAdapter<GridCommunicationClient> {
    // Intentionally empty.
}
/**
 * Timeout guarding a handshake. Whichever of {@link #cancel()} and {@link #onTimeout()} runs first wins.
 * If the timeout wins, the guarded client is force-closed, or the guarded channel is closed quietly.
 *
 * @param <T> Guarded object type: a {@link GridCommunicationClient} or a {@link SelectableChannel}.
 */
private static class HandshakeTimeoutObject<T> implements IgniteSpiTimeoutObject {
    /** Timeout object ID. */
    private final IgniteUuid id = IgniteUuid.randomUuid();

    /** Guarded client or channel. */
    private final T obj;

    /** Deadline. */
    private final long endTime;

    /** Set by whichever of cancel/timeout happens first. */
    private final AtomicBoolean done = new AtomicBoolean();

    /**
     * @param obj Client or channel to close on timeout.
     * @param endTime End time.
     */
    private HandshakeTimeoutObject(T obj, long endTime) {
        assert obj != null;
        assert obj instanceof GridCommunicationClient || obj instanceof SelectableChannel;
        assert endTime > 0;

        this.obj = obj;
        this.endTime = endTime;
    }

    /**
     * @return {@code True} if object has not yet been timed out.
     */
    boolean cancel() {
        return done.compareAndSet(false, true);
    }

    /** {@inheritDoc} */
    @Override public void onTimeout() {
        // Lost the race to cancel(): nothing to do.
        if (!done.compareAndSet(false, true))
            return;

        // Close socket - timeout occurred.
        if (obj instanceof GridCommunicationClient) {
            GridCommunicationClient client = (GridCommunicationClient)obj;

            client.forceClose();
        }
        else
            U.closeQuiet((AutoCloseable)obj);
    }

    /** {@inheritDoc} */
    @Override public long endTime() {
        return endTime;
    }

    /** {@inheritDoc} */
    @Override public IgniteUuid id() {
        return id;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(HandshakeTimeoutObject.class, this);
    }
}
/**
 * Node-ID handshake over blocking streams. The closure first reads a {@link NodeIdMessage#MESSAGE_FULL_SIZE}-byte
 * message and checks that the node ID it carries matches the expected remote node. It then writes the Ignite
 * header, the {@code NODE_ID_MSG_TYPE} message type and the local node ID bytes.
 */
private class HandshakeClosure extends IgniteInClosure2X<InputStream, OutputStream> {
    /** */
    private static final long serialVersionUID = 0L;

    /** Expected remote node ID. */
    private final UUID rmtNodeId;

    /**
     * @param rmtNodeId Remote node ID.
     */
    private HandshakeClosure(UUID rmtNodeId) {
        this.rmtNodeId = rmtNodeId;
    }

    /**
     * @param in Stream to read the remote node ID message from.
     * @param out Stream to write the local node ID message to.
     * @throws IgniteCheckedException If the stream ends early, the remote node ID does not match,
     *      or an I/O error (including a read timeout) occurs.
     */
    @Override public void applyx(InputStream in, OutputStream out) throws IgniteCheckedException {
        try {
            // Handshake.
            byte[] b = new byte[NodeIdMessage.MESSAGE_FULL_SIZE];

            int n = 0;

            // read() may return fewer bytes than requested: loop until the whole message is in.
            while (n < NodeIdMessage.MESSAGE_FULL_SIZE) {
                int cnt = in.read(b, n, NodeIdMessage.MESSAGE_FULL_SIZE - n);

                if (cnt < 0)
                    throw new IgniteCheckedException("Failed to get remote node ID (end of stream reached)");

                n += cnt;
            }

            // The message starts with its type (Message.DIRECT_TYPE_SIZE bytes); the node ID follows it.
            UUID id = U.bytesToUuid(b, Message.DIRECT_TYPE_SIZE);

            if (!rmtNodeId.equals(id))
                throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId +
                    ", rcvd=" + id + ']');
            else if (log.isDebugEnabled())
                log.debug("Received remote node ID: " + id);
        }
        catch (SocketTimeoutException e) {
            throw new IgniteCheckedException("Failed to perform handshake due to timeout (consider increasing " +
                "'connectionTimeout' configuration property).", e);
        }
        catch (IOException e) {
            throw new IgniteCheckedException("Failed to perform handshake.", e);
        }

        try {
            ClusterNode localNode = getLocalNode();

            if (localNode == null)
                throw new IgniteSpiException("Local node has not been started or fully initialized " +
                    "[isStopping=" + getSpiContext().isStopping() + ']');

            UUID id = localNode.id();

            NodeIdMessage msg = new NodeIdMessage(id);

            out.write(U.IGNITE_HEADER);
            writeMessageType(out, NODE_ID_MSG_TYPE);
            out.write(msg.nodeIdBytes());

            out.flush();

            if (log.isDebugEnabled())
                log.debug("Sent local node ID [locNodeId=" + id + ", rmtNodeId=" + rmtNodeId + ']');
        }
        catch (IOException e) {
            throw new IgniteCheckedException("Failed to perform handshake.", e);
        }
    }
}
/**
 * Gate for connection attempts. Connecting threads hold the read lock between {@link #enter()} (or a successful
 * {@link #tryEnter()}) and {@link #leave()}. While an error is set, {@link #enter()} throws it and
 * {@link #tryEnter()} returns {@code false}. {@link #disconnected} and {@link #reconnected} change the error
 * under the write lock.
 */
private static class ConnectGateway {
    /** */
    private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock();

    /** Error reported to threads entering the gateway; {@code null} while connecting is allowed. */
    private IgniteException err;

    /**
     * Enters the gateway. On success the caller must call {@link #leave()}.
     *
     * @throws IgniteException The stored error, if the gateway is closed.
     */
    void enter() {
        lock.readLock();

        if (err != null) {
            lock.readUnlock();

            throw err;
        }
    }

    /**
     * @return {@code True} if entered gateway (the caller must then call {@link #leave()}).
     */
    boolean tryEnter() {
        lock.readLock();

        boolean res = err == null;

        if (!res)
            lock.readUnlock();

        return res;
    }

    /**
     * Leaves the gateway entered via {@link #enter()} or {@link #tryEnter()}.
     */
    void leave() {
        lock.readUnlock();
    }

    /**
     * Closes the gateway with a client-disconnected error.
     *
     * @param reconnectFut Reconnect future.
     */
    void disconnected(IgniteFuture<?> reconnectFut) {
        lock.writeLock();

        // Release the lock even if building the error fails, otherwise every later enter() would block.
        try {
            err = new IgniteClientDisconnectedException(reconnectFut, "Failed to connect, client node disconnected.");
        }
        finally {
            lock.writeUnlock();
        }
    }

    /**
     * Re-opens the gateway if it was closed by {@link #disconnected(IgniteFuture)}.
     */
    void reconnected() {
        lock.writeLock();

        try {
            if (err instanceof IgniteClientDisconnectedException)
                err = null;
        }
        finally {
            lock.writeUnlock();
        }
    }

    /**
     * Closes the gateway permanently with a node-stopped error.
     */
    void stopped() {
        // NOTE(review): read lock rather than write lock. This still excludes disconnected()/reconnected(), which
        // take the write lock, and presumably is deliberate so that stopping does not wait for threads currently
        // inside the gateway - confirm before changing.
        lock.readLock();

        try {
            err = new IgniteException("Failed to connect, node stopped.");
        }
        finally {
            lock.readUnlock();
        }
    }
}
/**
 * Reconnect request: the recovery descriptor of a disconnected session plus its connection index.
 */
private static class DisconnectedSessionInfo {
    /** Recovery descriptor of the disconnected session. */
    private final GridNioRecoveryDescriptor recoveryDesc;

    /** Connection index to reconnect. */
    private int connIdx;

    /**
     * @param recoveryDesc Recovery descriptor.
     * @param connIdx Connection index.
     */
    DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor recoveryDesc, int connIdx) {
        this.connIdx = connIdx;
        this.recoveryDesc = recoveryDesc;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(DisconnectedSessionInfo.class, this);
    }
}
/**
 * Strategy choosing which of the per-node connections the calling thread uses.
 */
interface ConnectionPolicy {
    /**
     * @return Connection index for the calling thread.
     */
    int connectionIndex();
}
/** Policy that always selects connection index {@code 0}. */
private static class FirstConnectionPolicy implements ConnectionPolicy {
    /** {@inheritDoc} */
    @Override public int connectionIndex() {
        final int first = 0;

        return first;
    }
}
/** Policy that spreads threads across {@code connectionsPerNode} connections by thread ID. */
private class RoundRobinConnectionPolicy implements ConnectionPolicy {
    /** {@inheritDoc} */
    @Override public int connectionIndex() {
        long threadId = U.safeAbs(Thread.currentThread().getId());

        return (int)(threadId % connectionsPerNode);
    }
}
/**
 * MBean implementation for TcpCommunicationSpi. Every attribute and operation delegates to the enclosing SPI.
 */
private class TcpCommunicationSpiMBeanImpl extends IgniteSpiMBeanAdapter implements TcpCommunicationSpiMBean {
    /**
     * @param spiAdapter SPI adapter passed to {@link IgniteSpiMBeanAdapter}.
     */
    TcpCommunicationSpiMBeanImpl(IgniteSpiAdapter spiAdapter) {
        super(spiAdapter);
    }

    /** {@inheritDoc} */
    @Override public String getLocalAddress() {
        return TcpCommunicationSpi.this.getLocalAddress();
    }

    /** {@inheritDoc} */
    @Override public int getLocalPort() {
        return TcpCommunicationSpi.this.getLocalPort();
    }

    /** {@inheritDoc} */
    @Override public int getLocalPortRange() {
        return TcpCommunicationSpi.this.getLocalPortRange();
    }

    /** {@inheritDoc} */
    @Override public boolean isUsePairedConnections() {
        return TcpCommunicationSpi.this.isUsePairedConnections();
    }

    /** {@inheritDoc} */
    @Override public int getConnectionsPerNode() {
        return TcpCommunicationSpi.this.getConnectionsPerNode();
    }

    /** {@inheritDoc} */
    @Override public int getSharedMemoryPort() {
        return TcpCommunicationSpi.this.getSharedMemoryPort();
    }

    /** {@inheritDoc} */
    @Override public long getIdleConnectionTimeout() {
        return TcpCommunicationSpi.this.getIdleConnectionTimeout();
    }

    /** {@inheritDoc} */
    @Override public long getSocketWriteTimeout() {
        return TcpCommunicationSpi.this.getSocketWriteTimeout();
    }

    /** {@inheritDoc} */
    @Override public int getAckSendThreshold() {
        return TcpCommunicationSpi.this.getAckSendThreshold();
    }

    /** {@inheritDoc} */
    @Override public int getUnacknowledgedMessagesBufferSize() {
        return TcpCommunicationSpi.this.getUnacknowledgedMessagesBufferSize();
    }

    /** {@inheritDoc} */
    @Override public long getConnectTimeout() {
        return TcpCommunicationSpi.this.getConnectTimeout();
    }

    /** {@inheritDoc} */
    @Override public long getMaxConnectTimeout() {
        return TcpCommunicationSpi.this.getMaxConnectTimeout();
    }

    /** {@inheritDoc} */
    @Override public int getReconnectCount() {
        return TcpCommunicationSpi.this.getReconnectCount();
    }

    /** {@inheritDoc} */
    @Override public boolean isDirectBuffer() {
        return TcpCommunicationSpi.this.isDirectBuffer();
    }

    /** {@inheritDoc} */
    @Override public boolean isDirectSendBuffer() {
        return TcpCommunicationSpi.this.isDirectSendBuffer();
    }

    /** {@inheritDoc} */
    @Override public int getSelectorsCount() {
        return TcpCommunicationSpi.this.getSelectorsCount();
    }

    /** {@inheritDoc} */
    @Override public long getSelectorSpins() {
        return TcpCommunicationSpi.this.getSelectorSpins();
    }

    /** {@inheritDoc} */
    @Override public boolean isTcpNoDelay() {
        return TcpCommunicationSpi.this.isTcpNoDelay();
    }

    /** {@inheritDoc} */
    @Override public int getSocketReceiveBuffer() {
        return TcpCommunicationSpi.this.getSocketReceiveBuffer();
    }

    /** {@inheritDoc} */
    @Override public int getSocketSendBuffer() {
        return TcpCommunicationSpi.this.getSocketSendBuffer();
    }

    /** {@inheritDoc} */
    @Override public int getMessageQueueLimit() {
        return TcpCommunicationSpi.this.getMessageQueueLimit();
    }

    /** {@inheritDoc} */
    @Override public int getSlowClientQueueLimit() {
        return TcpCommunicationSpi.this.getSlowClientQueueLimit();
    }

    /** {@inheritDoc} */
    @Override public void dumpStats() {
        TcpCommunicationSpi.this.dumpStats();
    }

    /** {@inheritDoc} */
    @Override public int getSentMessagesCount() {
        return TcpCommunicationSpi.this.getSentMessagesCount();
    }

    /** {@inheritDoc} */
    @Override public long getSentBytesCount() {
        return TcpCommunicationSpi.this.getSentBytesCount();
    }

    /** {@inheritDoc} */
    @Override public int getReceivedMessagesCount() {
        return TcpCommunicationSpi.this.getReceivedMessagesCount();
    }

    /** {@inheritDoc} */
    @Override public long getReceivedBytesCount() {
        return TcpCommunicationSpi.this.getReceivedBytesCount();
    }

    /** {@inheritDoc} */
    @Override public Map<String, Long> getReceivedMessagesByType() {
        return TcpCommunicationSpi.this.getReceivedMessagesByType();
    }

    /** {@inheritDoc} */
    @Override public Map<UUID, Long> getReceivedMessagesByNode() {
        return TcpCommunicationSpi.this.getReceivedMessagesByNode();
    }

    /** {@inheritDoc} */
    @Override public Map<String, Long> getSentMessagesByType() {
        return TcpCommunicationSpi.this.getSentMessagesByType();
    }

    /** {@inheritDoc} */
    @Override public Map<UUID, Long> getSentMessagesByNode() {
        return TcpCommunicationSpi.this.getSentMessagesByNode();
    }

    /** {@inheritDoc} */
    @Override public int getOutboundMessagesQueueSize() {
        return TcpCommunicationSpi.this.getOutboundMessagesQueueSize();
    }
}
}