blob: a0acb5c005de0140e51fdb184d6622f503817ee7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.communication.tcp;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.ipc.*;
import org.apache.ignite.internal.util.ipc.shmem.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.nio.*;
import org.apache.ignite.internal.util.nio.ssl.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.worker.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.communication.*;
import org.apache.ignite.thread.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
import javax.net.ssl.*;
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.events.EventType.*;
/**
* <tt>TcpCommunicationSpi</tt> is default communication SPI which uses
* TCP/IP protocol and Java NIO to communicate with other nodes.
* <p>
* To enable communication with other nodes, this SPI adds {@link #ATTR_ADDRS}
* and {@link #ATTR_PORT} local node attributes (see {@link ClusterNode#attributes()}).
* <p>
* At startup, this SPI tries to start listening to local port specified by
* {@link #setLocalPort(int)} method. If local port is occupied, then SPI will
* automatically increment the port number until it can successfully bind for
* listening. {@link #setLocalPortRange(int)} configuration parameter controls
* maximum number of ports that SPI will try before it fails. Port range comes
* very handy when starting multiple grid nodes on the same machine or even
* in the same VM. In this case all nodes can be brought up without a single
* change in configuration.
* <p>
* This SPI caches connections to remote nodes so it does not have to reconnect every
* time a message is sent. By default, idle connections are kept active for
* {@link #DFLT_IDLE_CONN_TIMEOUT} period and then are closed. Use
* {@link #setIdleConnectionTimeout(long)} configuration parameter to configure
* your own idle connection timeout.
* <p>
* <h1 class="header">Configuration</h1>
* <h2 class="header">Mandatory</h2>
* This SPI has no mandatory configuration parameters.
* <h2 class="header">Optional</h2>
* The following configuration parameters are optional:
* <ul>
* <li>Node local IP address (see {@link #setLocalAddress(String)})</li>
* <li>Node local port number (see {@link #setLocalPort(int)})</li>
* <li>Local port range (see {@link #setLocalPortRange(int)})</li>
* <li>Connection buffer flush frequency (see {@link #setConnectionBufferFlushFrequency(long)})</li>
* <li>Connection buffer size (see {@link #setConnectionBufferSize(int)})</li>
* <li>Idle connection timeout (see {@link #setIdleConnectionTimeout(long)})</li>
* <li>Direct or heap buffer allocation (see {@link #setDirectBuffer(boolean)})</li>
* <li>Direct or heap buffer allocation for sending (see {@link #setDirectSendBuffer(boolean)})</li>
* <li>Count of selectors and selector threads for NIO server (see {@link #setSelectorsCount(int)})</li>
* <li>{@code TCP_NODELAY} socket option for sockets (see {@link #setTcpNoDelay(boolean)})</li>
* <li>Message queue limit (see {@link #setMessageQueueLimit(int)})</li>
* <li>Minimum buffered message count (see {@link #setMinimumBufferedMessageCount(int)})</li>
* <li>Connect timeout (see {@link #setConnectTimeout(long)})</li>
* <li>Maximum connect timeout (see {@link #setMaxConnectTimeout(long)})</li>
* <li>Reconnect attempts count (see {@link #setReconnectCount(int)})</li>
* <li>Socket receive buffer size (see {@link #setSocketReceiveBuffer(int)})</li>
* <li>Socket send buffer size (see {@link #setSocketSendBuffer(int)})</li>
* <li>Socket write timeout (see {@link #setSocketWriteTimeout(long)})</li>
* <li>Number of received messages after which acknowledgment is sent (see {@link #setAckSendThreshold(int)})</li>
* <li>Maximum number of unacknowledged messages (see {@link #setUnacknowledgedMessagesBufferSize(int)})</li>
* </ul>
* <h2 class="header">Java Example</h2>
* TcpCommunicationSpi is used by default and should be explicitly configured
* only if some SPI configuration parameters need to be overridden.
* <pre name="code" class="java">
* TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
*
* // Override local port.
* commSpi.setLocalPort(4321);
*
* IgniteConfiguration cfg = new IgniteConfiguration();
*
* // Override default communication SPI.
* cfg.setCommunicationSpi(commSpi);
*
* // Start grid.
* Ignition.start(cfg);
* </pre>
* <h2 class="header">Spring Example</h2>
* TcpCommunicationSpi can be configured from Spring XML configuration file:
* <pre name="code" class="xml">
* &lt;bean id="grid.custom.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true"&gt;
* ...
* &lt;property name="communicationSpi"&gt;
* &lt;bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi"&gt;
* &lt;!-- Override local port. --&gt;
* &lt;property name="localPort" value="4321"/&gt;
* &lt;/bean&gt;
* &lt;/property&gt;
* ...
* &lt;/bean&gt;
* </pre>
* <p>
* <img src="http://ignite.incubator.apache.org/images/spring-small.png">
* <br>
* For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
* @see CommunicationSpi
*/
@IgniteSpiMultipleInstancesSupport(true)
@IgniteSpiConsistencyChecked(optional = false)
public class TcpCommunicationSpi extends IgniteSpiAdapter
implements CommunicationSpi<Message>, TcpCommunicationSpiMBean {
/**
* IPC error message: states that allocating a shared memory segment failed and that
* communication is switching to TCP (which may be slower).
*/
public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " +
"(switching to TCP, may be slower).";
/** Node attribute that is mapped to node IP addresses (value is <tt>comm.tcp.addrs</tt>). */
public static final String ATTR_ADDRS = "comm.tcp.addrs";
/** Node attribute that is mapped to node host names (value is <tt>comm.tcp.host.names</tt>). */
public static final String ATTR_HOST_NAMES = "comm.tcp.host.names";
/** Node attribute that is mapped to node port number (value is <tt>comm.tcp.port</tt>). */
public static final String ATTR_PORT = "comm.tcp.port";
/** Node attribute that is mapped to node shared memory port number (value is <tt>comm.shmem.tcp.port</tt>). */
public static final String ATTR_SHMEM_PORT = "comm.shmem.tcp.port";
/** Node attribute that is mapped to node's external addresses (value is <tt>comm.tcp.ext-addrs</tt>). */
public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs";
/** Default port which node sets listener to (value is <tt>47100</tt>). */
public static final int DFLT_PORT = 47100;
/** Default port which node sets listener for shared memory connections (value is <tt>48100</tt>). */
public static final int DFLT_SHMEM_PORT = 48100;
/** Default idle connection timeout (value is <tt>30000</tt>ms). */
public static final long DFLT_IDLE_CONN_TIMEOUT = 30000;
/** Default socket send and receive buffer size (value is <tt>32 * 1024</tt>). */
public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
/** Default connection timeout (value is <tt>5000</tt>ms). */
public static final long DFLT_CONN_TIMEOUT = 5000;
/** Default maximum connection timeout (value is <tt>600,000</tt>ms, i.e. 10 minutes). */
public static final long DFLT_MAX_CONN_TIMEOUT = 10 * 60 * 1000;
/** Default reconnect attempts count (value is <tt>10</tt>). */
public static final int DFLT_RECONNECT_CNT = 10;
/**
* Default message queue limit per connection (for incoming and outgoing messages).
* The value is taken from {@code GridNioServer.DFLT_SEND_QUEUE_LIMIT}.
*/
public static final int DFLT_MSG_QUEUE_LIMIT = GridNioServer.DFLT_SEND_QUEUE_LIMIT;
/**
* Default count of selectors for TCP server equals to
* {@code "Math.min(4, Runtime.getRuntime().availableProcessors())"}.
* The value is computed once, when this class is initialized.
*/
public static final int DFLT_SELECTORS_CNT = Math.min(4, Runtime.getRuntime().availableProcessors());
/** Session meta key under which the remote node ID ({@code UUID}) is stored. */
private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey();
/** Session meta key under which the session's {@code GridNioMessageTracker} is stored. */
private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
/**
* Default local port range (value is <tt>100</tt>).
* See {@link #setLocalPortRange(int)} for details.
*/
public static final int DFLT_PORT_RANGE = 100;
/** Default value for {@code TCP_NODELAY} socket option (value is <tt>true</tt>). */
public static final boolean DFLT_TCP_NODELAY = true;
/** Default received messages threshold for sending ack (value is <tt>16</tt>). */
public static final int DFLT_ACK_SND_THRESHOLD = 16;
/** Default socket write timeout (value is <tt>2000</tt>; presumably milliseconds - TODO confirm). */
public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000;
/**
* No-op runnable. The server listener passes it to {@code notifyListener(...)} in place of a
* message tracker when {@code msgQueueLimit} is not positive.
*/
private static final IgniteRunnable NOOP = new IgniteRunnable() {
@Override public void run() {
// No-op.
}
};
/** Node ID message type. */
public static final byte NODE_ID_MSG_TYPE = -1;
/** Recovery "last received ID" message type (presumably used by {@code RecoveryLastReceivedMessage} - TODO confirm). */
public static final byte RECOVERY_LAST_ID_MSG_TYPE = -2;
/** Handshake message type (presumably used by {@code HandshakeMessage} - TODO confirm). */
public static final byte HANDSHAKE_MSG_TYPE = -3;
/** Server listener. */
private final GridNioServerListener<Message> srvLsnr =
new GridNioServerListenerAdapter<Message>() {
/**
 * Reacts to a write timeout on a session: emits a warning that suggests raising
 * {@code socketWriteTimeout}, emits a debug record when debug logging is on,
 * and then closes the session.
 *
 * @param ses Session whose write timed out.
 */
@Override public void onSessionWriteTimeout(GridNioSession ses) {
    String warnMsg = "Communication SPI Session write timed out (consider increasing " +
        "'socketWriteTimeout' configuration property) [remoteAddr=" + ses.remoteAddress() +
        ", writeTimeout=" + sockWriteTimeout + ']';

    LT.warn(log, null, warnMsg);

    if (log.isDebugEnabled()) {
        String dbgMsg = "Closing communication SPI session on write timeout [remoteAddr=" +
            ses.remoteAddress() + ", writeTimeout=" + sockWriteTimeout + ']';

        log.debug(dbgMsg);
    }

    ses.close();
}
/**
 * Sends the local node ID message over a session for which {@code accepted()} is
 * {@code true}; any other session is left untouched.
 *
 * @param ses Newly connected session.
 */
@Override public void onConnected(GridNioSession ses) {
    if (!ses.accepted())
        return;

    if (log.isDebugEnabled())
        log.debug("Sending local node ID to newly accepted session: " + ses);

    ses.send(nodeIdMsg);
}
/**
 * Cleans up after a closed session.
 * <p>
 * Sessions without a node ID in their meta are ignored. Otherwise, if the registered
 * client for that node is a NIO client bound to this very session, the client is removed
 * and force-closed; unless the node is stopping, the session's recovery descriptor is then
 * either queued for reconnect (remote node alive and unacknowledged messages pending) or
 * told that the node has left. Finally the SPI listener, if any, is notified.
 *
 * @param ses Closed session.
 * @param e Optional exception (not used here).
 */
@Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
    UUID nodeId = ses.meta(NODE_ID_META);

    if (nodeId == null)
        return;

    GridCommunicationClient client = clients.get(nodeId);

    boolean boundToSes = client instanceof GridTcpNioCommunicationClient &&
        ((GridTcpNioCommunicationClient)client).session() == ses;

    if (boundToSes && clients.remove(nodeId, client)) {
        client.forceClose();

        if (!isNodeStopping()) {
            GridNioRecoveryDescriptor desc = ses.recoveryDescriptor();

            if (desc != null) {
                if (!desc.nodeAlive(getSpiContext().node(nodeId)))
                    desc.onNodeLeft();
                else if (!desc.messagesFutures().isEmpty()) {
                    if (log.isDebugEnabled())
                        log.debug("Session was closed but there are unacknowledged messages, " +
                            "will try to reconnect [rmtNode=" + desc.node().id() + ']');

                    commWorker.addReconnectRequest(desc);
                }
            }
        }
    }

    // Read the volatile listener once so the null check and the call see the same instance.
    CommunicationListener<Message> lsnr0 = lsnr;

    if (lsnr0 != null)
        lsnr0.onDisconnected(nodeId);
}
/**
* Handles a message received on a session.
* <p>
* If the session has no node ID in its meta yet, the message is treated as the first message of
* an incoming connection: the sender ID is extracted and stored on the session, the connection is
* rejected (by replying with {@code RecoveryLastReceivedMessage(-1)}) or accepted depending on
* already registered clients and connect futures, and on acceptance the recovery descriptor is
* reserved and the handshake is completed through {@code connected(...)}.
* <p>
* Otherwise the message is a regular one: the received counter is incremented, recovery
* acknowledgements are processed or sent, and the message is passed to {@code notifyListener(...)}.
*
* @param ses Session the message arrived on.
* @param msg Received message.
*/
@Override public void onMessage(GridNioSession ses, Message msg) {
UUID sndId = ses.meta(NODE_ID_META);
// No sender ID attached yet: this is the first message on the session and must identify the sender.
if (sndId == null) {
assert ses.accepted();
if (msg instanceof NodeIdMessage)
sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
else {
assert msg instanceof HandshakeMessage : msg;
sndId = ((HandshakeMessage)msg).nodeId();
}
if (log.isDebugEnabled())
log.debug("Remote node ID received: " + sndId);
// Remember the sender on the session so that subsequent messages take the 'else' branch below.
final UUID old = ses.addMeta(NODE_ID_META, sndId);
assert old == null;
final ClusterNode rmtNode = getSpiContext().node(sndId);
// Sender cannot be resolved through the SPI context: close the connection.
if (rmtNode == null) {
ses.close();
return;
}
ClusterNode locNode = getSpiContext().localNode();
if (ses.remoteAddress() == null)
return;
GridCommunicationClient oldClient = clients.get(sndId);
boolean hasShmemClient = false;
// A NIO client for this node already exists: reject by replying with -1.
// A shared memory client is tolerated, but then no NIO client is created for this session
// (see '!hasShmemClient' passed as 'createClient' below).
if (oldClient != null) {
if (oldClient instanceof GridTcpNioCommunicationClient) {
if (log.isDebugEnabled())
log.debug("Received incoming connection when already connected " +
"to this node, rejecting [locNode=" + locNode.id() +
", rmtNode=" + sndId + ']');
ses.send(new RecoveryLastReceivedMessage(-1));
return;
}
else {
assert oldClient instanceof GridShmemCommunicationClient;
hasShmemClient = true;
}
}
// Register a connect future for this node; a non-null 'oldFut' means one was already registered.
GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut);
// NOTE(review): a NodeIdMessage is accepted above, but from here on only a HandshakeMessage is
// supported - with assertions disabled the cast below would throw ClassCastException. Confirm
// that NodeIdMessage can never reach this point.
assert msg instanceof HandshakeMessage : msg;
HandshakeMessage msg0 = (HandshakeMessage)msg;
final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode);
if (oldFut == null) {
// Our future was registered. Re-read the clients map, since a client may have been added
// after the first check above.
oldClient = clients.get(sndId);
if (oldClient != null) {
if (oldClient instanceof GridTcpNioCommunicationClient) {
if (log.isDebugEnabled())
log.debug("Received incoming connection when already connected " +
"to this node, rejecting [locNode=" + locNode.id() +
", rmtNode=" + sndId + ']');
ses.send(new RecoveryLastReceivedMessage(-1));
return;
}
else {
assert oldClient instanceof GridShmemCommunicationClient;
hasShmemClient = true;
}
}
// If the reservation does not succeed right away, the ConnectClosure passed here is the one
// that completes 'fut' and removes it from 'clientFuts' when it is applied.
boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
if (log.isDebugEnabled())
log.debug("Received incoming connection from remote node " +
"[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']');
if (reserved) {
try {
GridTcpNioCommunicationClient client =
connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
fut.onDone(client);
}
finally {
// Always unregister our future, even if connected(...) throws.
clientFuts.remove(rmtNode.id(), fut);
}
}
}
else {
// Another future is already registered for this node. If it is a ConnectFuture and the local
// node has the lower order, the incoming connection is rejected.
if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
if (log.isDebugEnabled()) {
log.debug("Received incoming connection from remote node while " +
"connecting to this node, rejecting [locNode=" + locNode.id() +
", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() +
", rmtNodeOrder=" + rmtNode.order() + ']');
}
ses.send(new RecoveryLastReceivedMessage(-1));
}
else {
// 'fut' was not registered in 'clientFuts' on this path, so nothing has to be removed here.
boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
if (reserved) {
GridTcpNioCommunicationClient client =
connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
fut.onDone(client);
}
}
}
}
else {
// Sender is already known: regular message.
rcvdMsgsCnt.increment();
GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
if (recovery != null) {
if (msg instanceof RecoveryLastReceivedMessage) {
// Acknowledgement from the remote side: update the descriptor and stop; the message is
// not passed to the listener.
RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg;
if (log.isDebugEnabled())
log.debug("Received recovery acknowledgement [rmtNode=" + sndId +
", rcvCnt=" + msg0.received() + ']');
recovery.ackReceived(msg0.received());
return;
}
else {
long rcvCnt = recovery.onReceived();
// Acknowledge every 'ackSndThreshold'-th received message. 'ackSndThreshold' is a modulo
// divisor here, so a value of 0 would throw ArithmeticException.
if (rcvCnt % ackSndThreshold == 0) {
if (log.isDebugEnabled())
log.debug("Send recovery acknowledgement [rmtNode=" + sndId +
", rcvCnt=" + rcvCnt + ']');
nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt));
recovery.lastAcknowledged(rcvCnt);
}
}
}
IgniteRunnable c;
// With a positive queue limit a per-session message tracker (created on first use) is notified
// and passed to the listener; otherwise the shared no-op runnable is passed.
if (msgQueueLimit > 0) {
GridNioMessageTracker tracker = ses.meta(TRACKER_META);
if (tracker == null) {
GridNioMessageTracker old = ses.addMeta(TRACKER_META, tracker =
new GridNioMessageTracker(ses, msgQueueLimit));
assert old == null;
}
tracker.onMessageReceived();
c = tracker;
}
else
c = NOOP;
notifyListener(sndId, msg, c);
}
}
/**
 * Finishes the recovery handshake for a session: passes the received-count to the descriptor,
 * attaches the descriptor to the session, asks the NIO server to resend on that session,
 * optionally replies with the local received-count, marks the descriptor as connected and,
 * if requested, registers a new NIO communication client for the node.
 *
 * @param recovery Recovery descriptor.
 * @param ses Session.
 * @param node Node.
 * @param rcvCnt Number of received messages.
 * @param sndRes If {@code true} sends response for recovery handshake.
 * @param createClient If {@code true} creates NIO communication client.
 * @return Created client, or {@code null} if {@code createClient} is {@code false}.
 */
private GridTcpNioCommunicationClient connected(
    GridNioRecoveryDescriptor recovery,
    GridNioSession ses,
    ClusterNode node,
    long rcvCnt,
    boolean sndRes,
    boolean createClient) {
    recovery.onHandshake(rcvCnt);

    ses.recoveryDescriptor(recovery);

    nioSrvr.resend(ses);

    if (sndRes) {
        RecoveryLastReceivedMessage ack = new RecoveryLastReceivedMessage(recovery.receivedCount());

        nioSrvr.sendSystem(ses, ack);
    }

    recovery.connected();

    if (!createClient)
        return null;

    GridTcpNioCommunicationClient newClient = new GridTcpNioCommunicationClient(ses, log);

    GridCommunicationClient prev = clients.putIfAbsent(node.id(), newClient);

    assert prev == null : "Client already created [node=" + node + ", client=" + newClient +
        ", oldClient=" + prev + ", recoveryDesc=" + recovery + ']';

    return newClient;
}
/**
 * Closure handed to {@code GridNioRecoveryDescriptor.tryReserve(...)} for an incoming connection.
 * <p>
 * When applied with {@code true} it sends the local received-count to the remote side and, once
 * that send completes, finishes the handshake through {@code connected(...)}. When applied with
 * {@code false}, or when the send fails, the connect future is completed without a client.
 * In every case the future is removed from {@code clientFuts} afterwards.
 */
@SuppressWarnings("PackageVisibleInnerClass")
class ConnectClosure implements IgniteInClosure<Boolean> {
    /** */
    private static final long serialVersionUID = 0L;

    /** Incoming session. */
    private final GridNioSession ses;

    /** Recovery descriptor of the remote node. */
    private final GridNioRecoveryDescriptor recoveryDesc;

    /** Remote node. */
    private final ClusterNode rmtNode;

    /** Handshake message received from the remote node. */
    private final HandshakeMessage msg;

    /** Connect future to complete. */
    private final GridFutureAdapter<GridCommunicationClient> fut;

    /** Whether a NIO communication client has to be created on success. */
    private final boolean createClient;

    /**
     * @param ses Incoming session.
     * @param recoveryDesc Recovery descriptor.
     * @param rmtNode Remote node.
     * @param msg Handshake message.
     * @param createClient If {@code true} creates NIO communication client.
     * @param fut Connect future.
     */
    ConnectClosure(GridNioSession ses,
        GridNioRecoveryDescriptor recoveryDesc,
        ClusterNode rmtNode,
        HandshakeMessage msg,
        boolean createClient,
        GridFutureAdapter<GridCommunicationClient> fut) {
        this.ses = ses;
        this.recoveryDesc = recoveryDesc;
        this.rmtNode = rmtNode;
        this.msg = msg;
        this.createClient = createClient;
        this.fut = fut;
    }

    /** {@inheritDoc} */
    @Override public void apply(Boolean success) {
        if (!success) {
            // Nothing was reserved: complete the future without a client and unregister it.
            try {
                fut.onDone();
            }
            finally {
                clientFuts.remove(rmtNode.id(), fut);
            }

            return;
        }

        nioSrvr.sendSystem(
            ses,
            new RecoveryLastReceivedMessage(recoveryDesc.receivedCount()),
            new IgniteInClosure<IgniteInternalFuture<?>>() {
                @Override public void apply(IgniteInternalFuture<?> sndFut) {
                    try {
                        // Throws if the handshake response could not be sent.
                        sndFut.get();

                        fut.onDone(connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient));
                    }
                    catch (IgniteCheckedException e) {
                        if (log.isDebugEnabled())
                            log.debug("Failed to send recovery handshake " +
                                "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');

                        recoveryDesc.release();

                        fut.onDone();
                    }
                    finally {
                        clientFuts.remove(rmtNode.id(), fut);
                    }
                }
            });
    }
}
};
/** Logger (annotated with {@code @LoggerResource} for injection). */
@LoggerResource
private IgniteLogger log;
/** Local IP address (see {@link #setLocalAddress(String)}). */
private String locAddr;
/** Complex variable that represents this node IP address. */
private volatile InetAddress locHost;
/** Local port which node uses. */
private int locPort = DFLT_PORT;
/** Local port range. */
private int locPortRange = DFLT_PORT_RANGE;
/** Local port which node uses to accept shared memory connections. */
private int shmemPort = DFLT_SHMEM_PORT;
/** Allocate direct buffer or heap buffer (see {@link #setDirectBuffer(boolean)}). */
private boolean directBuf = true;
/** Allocate direct buffer or heap buffer for sending (see {@link #setDirectSendBuffer(boolean)}). */
private boolean directSndBuf;
/** Idle connection timeout. */
private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT;
/** Connect timeout. */
private long connTimeout = DFLT_CONN_TIMEOUT;
/** Maximum connect timeout. */
private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT;
/** Reconnect attempts count. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private int reconCnt = DFLT_RECONNECT_CNT;
/** Socket send buffer. */
private int sockSndBuf = DFLT_SOCK_BUF_SIZE;
/** Socket receive buffer. */
private int sockRcvBuf = DFLT_SOCK_BUF_SIZE;
/** Message queue limit. */
private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT;
/** Slow client queue limit (no explicit initializer, so {@code 0} until assigned). */
private int slowClientQueueLimit;
/** NIO server. */
private GridNioServer<Message> nioSrvr;
/** Shared memory server. */
private IpcSharedMemoryServerEndpoint shmemSrv;
/** {@code TCP_NODELAY} option value for created sockets. */
private boolean tcpNoDelay = DFLT_TCP_NODELAY;
/** Number of received messages after which acknowledgment is sent (used as a modulo divisor by the server listener). */
private int ackSndThreshold = DFLT_ACK_SND_THRESHOLD;
/** Maximum number of unacknowledged messages (no explicit initializer, so {@code 0} until assigned). */
private int unackedMsgsBufSize;
/** Socket write timeout. */
private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;
/** Recovery and idle clients handler. */
private CommunicationWorker commWorker;
/** Shared memory accept worker. */
private ShmemAcceptWorker shmemAcceptWorker;
/** Shared memory workers. */
private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>();
/** Clients, keyed by remote node ID. */
private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();
/** SPI listener. */
private volatile CommunicationListener<Message> lsnr;
/** Bound port ({@code -1} until assigned). */
private int boundTcpPort = -1;
/** Bound port for shared memory server ({@code -1} until assigned). */
private int boundTcpShmemPort = -1;
/** Count of selectors to use in TCP server. */
private int selectorsCnt = DFLT_SELECTORS_CNT;
/** Address resolver. */
private AddressResolver addrRslvr;
/** Local node ID message (sent to newly accepted sessions by the server listener). */
private NodeIdMessage nodeIdMsg;
/** Received messages count. */
private final LongAdder8 rcvdMsgsCnt = new LongAdder8();
/** Sent messages count. */
private final LongAdder8 sentMsgsCnt = new LongAdder8();
/** Received bytes count. */
private final LongAdder8 rcvdBytesCnt = new LongAdder8();
/** Sent bytes count. */
private final LongAdder8 sentBytesCnt = new LongAdder8();
/** Context initialization latch. */
private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
/** Metrics listener: accumulates sent and received byte counts into the SPI counters. */
private final GridNioMetricsListener metricsLsnr = new GridNioMetricsListener() {
@Override public void onBytesSent(int bytesCnt) {
sentBytesCnt.add(bytesCnt);
}
@Override public void onBytesReceived(int bytesCnt) {
rcvdBytesCnt.add(bytesCnt);
}
};
/** Client connect futures, keyed by remote node ID. */
private final ConcurrentMap<UUID, GridFutureAdapter<GridCommunicationClient>> clientFuts =
GridConcurrentFactory.newMap();
/** Recovery descriptors, keyed by {@code ClientKey}. */
private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap();
/** Discovery listener: forwards the ID of the event node of a left/failed event to {@code onNodeLeft}. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
    @Override public void onEvent(Event evt) {
        assert evt instanceof DiscoveryEvent;
        assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;

        DiscoveryEvent discoEvt = (DiscoveryEvent)evt;

        onNodeLeft(discoEvt.eventNode().id());
    }
};
/**
 * Tells whether SSL is enabled, i.e. whether the Ignite configuration has an SSL context factory.
 *
 * @return {@code True} if ssl enabled.
 */
private boolean isSslEnabled() {
    IgniteConfiguration cfg = ignite.configuration();

    return cfg.getSslContextFactory() != null;
}
/**
 * Sets address resolver.
 * <p>
 * Only the first non-null assignment takes effect: once a resolver is stored, later calls
 * are ignored.
 *
 * @param addrRslvr Address resolver.
 */
@IgniteSpiConfiguration(optional = true)
public void setAddressResolver(AddressResolver addrRslvr) {
    // Injection should not override value already set by Spring or user.
    if (this.addrRslvr != null)
        return;

    this.addrRslvr = addrRslvr;
}
/**
 * Injects resources. After delegating to the superclass, takes the address resolver and the
 * local host from the Ignite configuration; neither replaces a value that is already set.
 *
 * @param ignite Ignite.
 */
@IgniteInstanceResource
@Override protected void injectResources(Ignite ignite) {
    super.injectResources(ignite);

    if (ignite == null)
        return;

    IgniteConfiguration cfg = ignite.configuration();

    setAddressResolver(cfg.getAddressResolver());
    setLocalAddress(cfg.getLocalHost());
}
/**
 * Sets local host address for socket binding. Note that one node could have
 * additional addresses beside the loopback one. This configuration
 * parameter is optional.
 * <p>
 * Only the first non-null assignment takes effect: once an address is stored, later calls
 * are ignored.
 *
 * @param locAddr IP address. Default value is any available local
 *      IP address.
 */
@IgniteSpiConfiguration(optional = true)
public void setLocalAddress(String locAddr) {
    // Injection should not override value already set by Spring or user.
    if (this.locAddr != null)
        return;

    this.locAddr = locAddr;
}

/** {@inheritDoc} */
@Override public String getLocalAddress() {
    return this.locAddr;
}
/**
* Sets local port for socket binding.
* <p>
* If this port is occupied, subsequent ports are tried as controlled by
* {@link #setLocalPortRange(int)}.
* <p>
* If not provided, default value is {@link #DFLT_PORT}.
*
* @param locPort Port number.
*/
@IgniteSpiConfiguration(optional = true)
public void setLocalPort(int locPort) {
this.locPort = locPort;
}
/** {@inheritDoc} */
@Override public int getLocalPort() {
return locPort;
}
/**
* Sets local port range for local host ports (value must be greater than or equal to <tt>0</tt>).
* If provided local port (see {@link #setLocalPort(int)}) is occupied,
* implementation will try to increment the port number for as long as it is less than
* initial value plus this range.
* <p>
* If port range value is <tt>0</tt>, then implementation will try to bind only to the port provided by
* {@link #setLocalPort(int)} method and fail if binding to this port did not succeed.
* <p>
* Local port range is very useful during development when more than one grid node needs to run
* on the same physical machine.
* <p>
* If not provided, default value is {@link #DFLT_PORT_RANGE}.
* <p>
* Note that this setter stores the value as is; it performs no range check itself.
*
* @param locPortRange New local port range.
*/
@IgniteSpiConfiguration(optional = true)
public void setLocalPortRange(int locPortRange) {
this.locPortRange = locPortRange;
}
/** {@inheritDoc} */
@Override public int getLocalPortRange() {
return locPortRange;
}
/**
* Sets local port to accept shared memory connections.
* <p>
* If set to {@code -1} shared memory communication will be disabled.
* <p>
* If not provided, default value is {@link #DFLT_SHMEM_PORT}.
*
* @param shmemPort Port number, or {@code -1} to disable shared memory communication.
*/
@IgniteSpiConfiguration(optional = true)
public void setSharedMemoryPort(int shmemPort) {
this.shmemPort = shmemPort;
}
/** {@inheritDoc} */
@Override public int getSharedMemoryPort() {
return shmemPort;
}
/**
* Sets maximum idle connection timeout upon which a connection
* to client will be closed.
* <p>
* If not provided, default value is {@link #DFLT_IDLE_CONN_TIMEOUT}
* (documented there as a value in milliseconds).
*
* @param idleConnTimeout Maximum idle connection time.
*/
@IgniteSpiConfiguration(optional = true)
public void setIdleConnectionTimeout(long idleConnTimeout) {
this.idleConnTimeout = idleConnTimeout;
}
/** {@inheritDoc} */
@Override public long getIdleConnectionTimeout() {
return idleConnTimeout;
}
/** {@inheritDoc} */
@Override public long getSocketWriteTimeout() {
return sockWriteTimeout;
}
/**
* Sets socket write timeout for TCP connection. If a message cannot be written to
* socket within this time then connection is closed and reconnect is attempted.
* <p>
* The server listener of this SPI logs a warning and closes the session when a write
* timeout is reported for it.
* <p>
* Defaults to {@link #DFLT_SOCK_WRITE_TIMEOUT}.
*
* @param sockWriteTimeout Socket write timeout for TCP connection.
*/
@IgniteSpiConfiguration(optional = true)
public void setSocketWriteTimeout(long sockWriteTimeout) {
this.sockWriteTimeout = sockWriteTimeout;
}
/** {@inheritDoc} */
@Override public int getAckSendThreshold() {
return ackSndThreshold;
}
/**
* Sets number of received messages per connection to node after which acknowledgment message is sent.
* <p>
* Defaults to {@link #DFLT_ACK_SND_THRESHOLD}.
* <p>
* NOTE(review): the server listener uses this value as a modulo divisor
* ({@code rcvCnt % ackSndThreshold}), so {@code 0} would cause an {@code ArithmeticException}
* on message receipt. This setter does not validate the value; confirm it is validated elsewhere.
*
* @param ackSndThreshold Number of received messages after which acknowledgment is sent.
*/
@IgniteSpiConfiguration(optional = true)
public void setAckSendThreshold(int ackSndThreshold) {
this.ackSndThreshold = ackSndThreshold;
}
/** {@inheritDoc} */
@Override public int getUnacknowledgedMessagesBufferSize() {
return unackedMsgsBufSize;
}
/**
* Sets maximum number of stored unacknowledged messages per connection to node.
* If number of unacknowledged messages exceeds this number then connection to node is
* closed and reconnect is attempted.
* <p>
* The backing field has no explicit initializer, so the value is {@code 0} unless it is
* assigned (presumably a default is derived elsewhere when left at {@code 0} - TODO confirm).
*
* @param unackedMsgsBufSize Maximum number of unacknowledged messages.
*/
@IgniteSpiConfiguration(optional = true)
public void setUnacknowledgedMessagesBufferSize(int unackedMsgsBufSize) {
this.unackedMsgsBufSize = unackedMsgsBufSize;
}
/**
* Formerly set the connection buffer size. This method is now a no-op: the passed value is ignored.
*
* @param connBufSize Connection buffer size (ignored).
* @see #setConnectionBufferFlushFrequency(long)
* @deprecated Not used any more.
*/
@Deprecated
@IgniteSpiConfiguration(optional = true)
public void setConnectionBufferSize(int connBufSize) {
// No-op.
}
/**
* {@inheritDoc}
* <p>
* This implementation always returns {@code 0}.
*/
@Deprecated
@Override public int getConnectionBufferSize() {
return 0;
}
/**
* {@inheritDoc}
* <p>
* This implementation is a no-op: the passed value is ignored.
*/
@Deprecated
@IgniteSpiConfiguration(optional = true)
@Override public void setConnectionBufferFlushFrequency(long connBufFlushFreq) {
// No-op.
}
/**
* {@inheritDoc}
* <p>
* This implementation always returns {@code 0}.
*/
@Deprecated
@Override public long getConnectionBufferFlushFrequency() {
return 0;
}
/**
* Sets connect timeout used when establishing connection
* with remote nodes.
* <p>
* {@code 0} is interpreted as infinite timeout.
* <p>
* If not provided, default value is {@link #DFLT_CONN_TIMEOUT}
* (documented there as a value in milliseconds).
*
* @param connTimeout Connect timeout.
* @see #setMaxConnectTimeout(long)
*/
@IgniteSpiConfiguration(optional = true)
public void setConnectTimeout(long connTimeout) {
this.connTimeout = connTimeout;
}
/** {@inheritDoc} */
@Override public long getConnectTimeout() {
return connTimeout;
}
/**
* Sets maximum connect timeout. If handshake is not established within connect timeout,
* then SPI tries to repeat handshake procedure with increased connect timeout.
* Connect timeout can grow until it reaches the maximum timeout value;
* once the maximum timeout value is reached, the handshake is considered failed.
* <p>
* {@code 0} is interpreted as infinite timeout.
* <p>
* If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}
* (documented there as a value in milliseconds).
*
* @param maxConnTimeout Maximum connect timeout.
* @see #setConnectTimeout(long)
*/
@IgniteSpiConfiguration(optional = true)
public void setMaxConnectTimeout(long maxConnTimeout) {
this.maxConnTimeout = maxConnTimeout;
}
/** {@inheritDoc} */
@Override public long getMaxConnectTimeout() {
return maxConnTimeout;
}
/**
* Sets maximum number of reconnect attempts used when establishing connection
* with remote nodes.
* <p>
* If not provided, default value is {@link #DFLT_RECONNECT_CNT}.
* <p>
* Note that this setter stores the value as is; it performs no range check itself.
*
* @param reconCnt Maximum number of reconnection attempts.
*/
@IgniteSpiConfiguration(optional = true)
public void setReconnectCount(int reconCnt) {
this.reconCnt = reconCnt;
}
/** {@inheritDoc} */
@Override public int getReconnectCount() {
return reconCnt;
}
/**
* Sets flag to allocate direct or heap buffer in SPI.
* If value is {@code true}, then SPI will use {@link ByteBuffer#allocateDirect(int)} call.
* Otherwise, SPI will use {@link ByteBuffer#allocate(int)} call.
* <p>
* If not provided, default value is {@code true}.
*
* @param directBuf Flag indicating whether to allocate direct ({@code true}) or heap ({@code false})
* buffers in SPI.
* @see #setDirectSendBuffer(boolean)
*/
@IgniteSpiConfiguration(optional = true)
public void setDirectBuffer(boolean directBuf) {
this.directBuf = directBuf;
}
/** {@inheritDoc} */
@Override public boolean isDirectBuffer() {
return directBuf;
}
/** {@inheritDoc} */
@Override public boolean isDirectSendBuffer() {
return directSndBuf;
}
/**
* Sets whether to use direct buffer for sending.
* <p>
* If not provided, default value is {@code false}.
*
* @param directSndBuf {@code True} to use direct buffers for send.
* @see #setDirectBuffer(boolean)
*/
@IgniteSpiConfiguration(optional = true)
public void setDirectSendBuffer(boolean directSndBuf) {
this.directSndBuf = directSndBuf;
}
/**
* Sets the count of selectors to be used in TCP server.
* <p>
* If not provided, default value is {@link #DFLT_SELECTORS_CNT}.
*
* @param selectorsCnt Selectors count.
*/
@IgniteSpiConfiguration(optional = true)
public void setSelectorsCount(int selectorsCnt) {
this.selectorsCnt = selectorsCnt;
}
/** {@inheritDoc} */
@Override public int getSelectorsCount() {
return selectorsCnt;
}
/**
 * Configures the {@code TCP_NODELAY} socket option; every socket is opened
 * with the value given here.
 * <p>
 * A value of {@code true} switches Nagle's algorithm off for the socket, which
 * lowers latency and delivery time for small messages.
 * <p>
 * Systems operating under heavy network load are advised to use {@code false}.
 * <p>
 * When this setter is not called the default is {@link #DFLT_TCP_NODELAY}.
 *
 * @param tcpNoDelay {@code True} to disable TCP delay.
 */
@IgniteSpiConfiguration(optional = true)
public void setTcpNoDelay(boolean tcpNoDelay) {
    this.tcpNoDelay = tcpNoDelay;
}
/** {@inheritDoc} */
@Override public boolean isTcpNoDelay() {
    // Flag stored by setTcpNoDelay(boolean).
    return this.tcpNoDelay;
}
/**
 * Configures the receive buffer size of sockets that this SPI creates or accepts.
 * <p>
 * When this setter is not called the default is {@link #DFLT_SOCK_BUF_SIZE}.
 *
 * @param sockRcvBuf Receive buffer size for sockets.
 */
@IgniteSpiConfiguration(optional = true)
public void setSocketReceiveBuffer(int sockRcvBuf) {
    this.sockRcvBuf = sockRcvBuf;
}
/** {@inheritDoc} */
@Override public int getSocketReceiveBuffer() {
    // Value stored by setSocketReceiveBuffer(int).
    return this.sockRcvBuf;
}
/**
 * Configures the send buffer size of sockets that this SPI creates or accepts.
 * <p>
 * When this setter is not called the default is {@link #DFLT_SOCK_BUF_SIZE}.
 *
 * @param sockSndBuf Send buffer size for sockets.
 */
@IgniteSpiConfiguration(optional = true)
public void setSocketSendBuffer(int sockSndBuf) {
    this.sockSndBuf = sockSndBuf;
}
/** {@inheritDoc} */
@Override public int getSocketSendBuffer() {
    // Value stored by setSocketSendBuffer(int).
    return this.sockSndBuf;
}
/**
 * Configures the message queue limit for incoming and outgoing messages.
 * <p>
 * A positive value caps the send queue at that size; {@code 0} switches the
 * size limitation off.
 * <p>
 * When this setter is not called the default is {@link #DFLT_MSG_QUEUE_LIMIT}.
 *
 * @param msgQueueLimit Limit on the send queue size.
 */
@IgniteSpiConfiguration(optional = true)
public void setMessageQueueLimit(int msgQueueLimit) {
    this.msgQueueLimit = msgQueueLimit;
}
/** {@inheritDoc} */
@Override public int getMessageQueueLimit() {
    // Value stored by setMessageQueueLimit(int).
    return this.msgQueueLimit;
}
/** {@inheritDoc} */
@Override public int getSlowClientQueueLimit() {
    // Value stored by setSlowClientQueueLimit(int).
    return this.slowClientQueueLimit;
}
/**
 * Sets slow client queue limit.
 * <p>
 * When set to a positive number, communication SPI will monitor clients outbound message queue sizes and will drop
 * those clients whose queue exceeded this limit.
 * <p>
 * Usually this value should be set to the same value as {@link #getMessageQueueLimit()} which controls
 * message back-pressure for server nodes. The default value for this parameter is {@code 0}
 * which means {@code unlimited}.
 *
 * @param slowClientQueueLimit Slow client queue limit.
 */
@IgniteSpiConfiguration(optional = true)
public void setSlowClientQueueLimit(int slowClientQueueLimit) {
    this.slowClientQueueLimit = slowClientQueueLimit;
}
/**
 * Formerly configured the minimum number of messages buffered by this SPI
 * before sending. The argument is ignored.
 *
 * @param minBufferedMsgCnt Minimum buffered message count (ignored).
 * @deprecated Not used any more.
 */
@IgniteSpiConfiguration(optional = true)
@Deprecated
public void setMinimumBufferedMessageCount(int minBufferedMsgCnt) {
    // Intentionally empty: the setting no longer has any effect.
}
/** {@inheritDoc} */
@Deprecated
@Override public int getMinimumBufferedMessageCount() {
    // The matching setter is a no-op, so a constant is reported.
    return 0;
}
/** {@inheritDoc} */
@Override public void setListener(CommunicationListener<Message> lsnr) {
    // Replaces whatever listener was stored before; no validation is performed here.
    this.lsnr = lsnr;
}
/**
 * Gives access to the listener most recently passed to {@code setListener}.
 *
 * @return Listener.
 */
public CommunicationListener getListener() {
    return this.lsnr;
}
/** {@inheritDoc} */
@Override public int getSentMessagesCount() {
    // Counter is incremented by sendMessage(...) after each successful send.
    return this.sentMsgsCnt.intValue();
}
/** {@inheritDoc} */
@Override public long getSentBytesCount() {
    // Current value of the sent-bytes counter.
    return this.sentBytesCnt.longValue();
}
/** {@inheritDoc} */
@Override public int getReceivedMessagesCount() {
    // Current value of the received-messages counter, narrowed to int.
    return this.rcvdMsgsCnt.intValue();
}
/** {@inheritDoc} */
@Override public long getReceivedBytesCount() {
    // Current value of the received-bytes counter.
    return this.rcvdBytesCnt.longValue();
}
/** {@inheritDoc} */
@Override public int getOutboundMessagesQueueSize() {
    // The server is created in getNodeAttributes() and set to null by spiStop(), so this getter
    // can be invoked while no server exists. Read the field once and report an empty queue then.
    GridNioServer<Message> srv = nioSrvr;

    return srv != null ? srv.outboundMessagesQueueSize() : 0;
}
/** {@inheritDoc} */
@Override public void resetMetrics() {
    // The counters' own 'reset' method is documented as not thread-safe,
    // so each counter is zeroed by adding the negation of its current sum.
    long sentMsgs = sentMsgsCnt.sum();
    sentMsgsCnt.add(-sentMsgs);

    long rcvdMsgs = rcvdMsgsCnt.sum();
    rcvdMsgsCnt.add(-rcvdMsgs);

    long sentBytes = sentBytesCnt.sum();
    sentBytesCnt.add(-sentBytes);

    long rcvdBytes = rcvdBytesCnt.sum();
    rcvdBytesCnt.add(-rcvdBytes);
}
/**
 * {@inheritDoc}
 * <p>
 * Besides building the attribute map this method validates the SPI configuration, resolves the
 * local host, and creates (but does not start) the shared memory and NIO servers. A shared memory
 * server failure is only logged as a warning; a NIO server failure aborts with an exception.
 *
 * @throws IgniteSpiException If a parameter is invalid, the local address cannot be resolved,
 *      the TCP server cannot be created or local addresses cannot be resolved.
 */
@Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
    nodeIdMsg = new NodeIdMessage(getLocalNodeId());

    assertParameter(locPort > 1023, "locPort > 1023");
    assertParameter(locPort <= 0xffff, "locPort <= 0xffff");
    assertParameter(locPortRange >= 0, "locPortRange >= 0");
    assertParameter(idleConnTimeout > 0, "idleConnTimeout > 0");
    assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0");
    assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
    assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
    assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
    assertParameter(reconCnt > 0, "reconnectCnt > 0");
    assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
    assertParameter(connTimeout >= 0, "connTimeout >= 0");
    assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
    assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0");
    assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0");
    assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0");

    if (unackedMsgsBufSize > 0) {
        // Multiply as long: an int product could overflow for large limits and let
        // an undersized buffer slip past the check.
        assertParameter(unackedMsgsBufSize >= msgQueueLimit * 5L,
            "Specified 'unackedMsgsBufSize' is too low, it should be at least 'msgQueueLimit * 5'.");

        assertParameter(unackedMsgsBufSize >= ackSndThreshold * 5L,
            "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'.");
    }

    try {
        locHost = U.resolveLocalHost(locAddr);
    }
    catch (IOException e) {
        throw new IgniteSpiException("Failed to initialize local address: " + locAddr, e);
    }

    try {
        shmemSrv = resetShmemServer();
    }
    catch (IgniteCheckedException e) {
        // Shared memory communication is optional: warn and continue without it.
        U.warn(log, "Failed to start shared memory communication server.", e);
    }

    try {
        // This method potentially resets local port to the value
        // local node was bound to.
        nioSrvr = resetNioServer();
    }
    catch (IgniteCheckedException e) {
        throw new IgniteSpiException("Failed to initialize TCP server: " + locHost, e);
    }

    // Set local node attributes.
    try {
        IgniteBiTuple<Collection<String>, Collection<String>> addrs = U.resolveLocalAddresses(locHost);

        // External addresses are published only when an address resolver is configured.
        Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null :
            U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())), boundTcpPort);

        return F.asMap(
            createSpiAttributeName(ATTR_ADDRS), addrs.get1(),
            createSpiAttributeName(ATTR_HOST_NAMES), addrs.get2(),
            createSpiAttributeName(ATTR_PORT), boundTcpPort,
            createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort : null,
            createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
    }
    catch (IOException | IgniteCheckedException e) {
        throw new IgniteSpiException("Failed to resolve local host to addresses: " + locHost, e);
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Relies on {@code getNodeAttributes()} having run already: it is the method that assigns
 * {@code locHost}, {@code shmemSrv} and {@code nioSrvr}, all of which are used here.
 * Logs the configuration, registers the MBean and starts the shared memory accept worker
 * (if a shared memory server exists), the NIO server and the communication worker, in that order.
 */
@Override public void spiStart(String gridName) throws IgniteSpiException {
assert locHost != null;
// Start SPI start stopwatch.
startStopwatch();
// Ack parameters.
if (log.isDebugEnabled()) {
log.debug(configInfo("locAddr", locAddr));
log.debug(configInfo("locPort", locPort));
log.debug(configInfo("locPortRange", locPortRange));
log.debug(configInfo("idleConnTimeout", idleConnTimeout));
log.debug(configInfo("directBuf", directBuf));
log.debug(configInfo("directSendBuf", directSndBuf));
log.debug(configInfo("selectorsCnt", selectorsCnt));
log.debug(configInfo("tcpNoDelay", tcpNoDelay));
log.debug(configInfo("sockSndBuf", sockSndBuf));
log.debug(configInfo("sockRcvBuf", sockRcvBuf));
log.debug(configInfo("shmemPort", shmemPort));
log.debug(configInfo("msgQueueLimit", msgQueueLimit));
log.debug(configInfo("connTimeout", connTimeout));
log.debug(configInfo("maxConnTimeout", maxConnTimeout));
log.debug(configInfo("reconCnt", reconCnt));
log.debug(configInfo("sockWriteTimeout", sockWriteTimeout));
log.debug(configInfo("ackSndThreshold", ackSndThreshold));
log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
}
if (!tcpNoDelay)
U.quietAndWarn(log, "'TCP_NO_DELAY' for communication is off, which should be used with caution " +
"since may produce significant delays with some scenarios.");
// Both limits are set and the slow client limit is not below the message queue limit.
// NOTE(review): the condition is '>=' while the message says "greater than" - confirm the intended wording.
if (slowClientQueueLimit > 0 && msgQueueLimit > 0 && slowClientQueueLimit >= msgQueueLimit) {
U.quietAndWarn(log, "Slow client queue limit is set to a value greater than message queue limit " +
"(slow client queue limit will have no effect) [msgQueueLimit=" + msgQueueLimit +
", slowClientQueueLimit=" + slowClientQueueLimit + ']');
}
registerMBean(gridName, this, TcpCommunicationSpiMBean.class);
// shmemSrv stays null when shared memory communication is disabled or failed to start.
if (shmemSrv != null) {
shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv);
new IgniteThread(shmemAcceptWorker).start();
}
nioSrvr.start();
commWorker = new CommunicationWorker();
commWorker.start();
// Ack start.
if (log.isDebugEnabled())
log.debug(startInfo());
}
/** {@inheritDoc} */
@Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
    spiCtx.registerPort(boundTcpPort, IgnitePortProtocol.TCP);

    // The SPI may run without a shared memory endpoint, so that port is registered only when bound.
    boolean shmemBound = boundTcpShmemPort > 0;

    if (shmemBound)
        spiCtx.registerPort(boundTcpShmemPort, IgnitePortProtocol.TCP);

    spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);

    // Releases threads blocked in getSpiContext() on this latch.
    ctxInitLatch.countDown();
}
/**
 * {@inheritDoc}
 * <p>
 * Blocks on the context initialization latch while it is still open. If the wait is
 * interrupted a warning is logged and the parent's context is returned anyway.
 */
@Override public IgniteSpiContext getSpiContext() {
    // Fast path: the latch has already been released.
    if (ctxInitLatch.getCount() <= 0)
        return super.getSpiContext();

    if (log.isDebugEnabled())
        log.debug("Waiting for context initialization.");

    try {
        U.await(ctxInitLatch);

        if (log.isDebugEnabled())
            log.debug("Context has been initialized.");
    }
    catch (IgniteInterruptedCheckedException e) {
        U.warn(log, "Thread has been interrupted while waiting for SPI context initialization.", e);
    }

    return super.getSpiContext();
}
/**
 * Creates the TCP NIO server, binding it to the first port that can be used.
 * <p>
 * Ports are tried starting from {@code locPort}. With a positive {@code locPortRange} the ports
 * {@code locPort .. locPort + locPortRange - 1} are tried; with {@code locPortRange == 0}
 * (which configuration validation accepts) only {@code locPort} itself is tried. On success
 * {@code boundTcpPort} is set to the chosen port. The returned server is built but not started.
 *
 * @return Server instance.
 * @throws IgniteCheckedException Thrown if the server was already created or if it's not possible
 *      to create server on any port of the range.
 */
private GridNioServer<Message> resetNioServer() throws IgniteCheckedException {
    if (boundTcpPort >= 0)
        throw new IgniteCheckedException("Tcp NIO server was already created on port " + boundTcpPort);

    IgniteCheckedException lastEx = null;

    // A zero range means "no range": exactly one attempt on the configured port.
    int lastPort = locPortRange == 0 ? locPort : locPort + locPortRange - 1;

    // If configured TCP port is busy, find first available in range.
    for (int port = locPort; port <= lastPort; port++) {
        try {
            // Factory and formatter are resolved lazily through getSpiContext(), which is
            // called on first use rather than while the server is being built.
            MessageFactory msgFactory = new MessageFactory() {
                private MessageFactory impl;

                @Nullable @Override public Message create(byte type) {
                    if (impl == null)
                        impl = getSpiContext().messageFactory();

                    assert impl != null;

                    return impl.create(type);
                }
            };

            MessageFormatter msgFormatter = new MessageFormatter() {
                private MessageFormatter impl;

                @Override public MessageWriter writer() {
                    if (impl == null)
                        impl = getSpiContext().messageFormatter();

                    assert impl != null;

                    return impl.writer();
                }

                @Override public MessageReader reader(MessageFactory factory) {
                    if (impl == null)
                        impl = getSpiContext().messageFormatter();

                    assert impl != null;

                    return impl.reader(factory);
                }
            };

            GridDirectParser parser = new GridDirectParser(msgFactory, msgFormatter);

            IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() {
                @Override public boolean apply(Message msg) {
                    return msg instanceof RecoveryLastReceivedMessage;
                }
            };

            boolean clientMode = Boolean.TRUE.equals(ignite.configuration().isClientMode());

            // Queue size monitoring is installed only on non-client nodes with a positive slow client limit.
            IgniteBiInClosure<GridNioSession, Integer> queueSizeMonitor =
                !clientMode && slowClientQueueLimit > 0 ?
                    new CI2<GridNioSession, Integer>() {
                        @Override public void apply(GridNioSession ses, Integer qSize) {
                            checkClientQueueSize(ses, qSize);
                        }
                    } :
                    null;

            GridNioFilter[] filters;

            if (isSslEnabled()) {
                GridNioSslFilter sslFilter =
                    new GridNioSslFilter(ignite.configuration().getSslContextFactory().create(),
                        true, ByteOrder.nativeOrder(), log);

                sslFilter.directMode(true);

                sslFilter.wantClientAuth(true);
                sslFilter.needClientAuth(true);

                filters = new GridNioFilter[] {
                    new GridNioCodecFilter(parser, log, true),
                    new GridConnectionBytesVerifyFilter(log),
                    sslFilter
                };
            }
            else
                filters = new GridNioFilter[] {
                    new GridNioCodecFilter(parser, log, true),
                    new GridConnectionBytesVerifyFilter(log)
                };

            GridNioServer<Message> srvr =
                GridNioServer.<Message>builder()
                    .address(locHost)
                    .port(port)
                    .listener(srvLsnr)
                    .logger(log)
                    .selectorCount(selectorsCnt)
                    .gridName(gridName)
                    .tcpNoDelay(tcpNoDelay)
                    .directBuffer(directBuf)
                    .byteOrder(ByteOrder.nativeOrder())
                    .socketSendBufferSize(sockSndBuf)
                    .socketReceiveBufferSize(sockRcvBuf)
                    .sendQueueLimit(msgQueueLimit)
                    .directMode(true)
                    .metricsListener(metricsLsnr)
                    .writeTimeout(sockWriteTimeout)
                    .filters(filters)
                    .messageFormatter(msgFormatter)
                    .skipRecoveryPredicate(skipRecoveryPred)
                    .messageQueueSizeListener(queueSizeMonitor)
                    .build();

            boundTcpPort = port;

            // Ack Port the TCP server was bound to.
            if (log.isInfoEnabled())
                log.info("Successfully bound to TCP port [port=" + boundTcpPort +
                    ", locHost=" + locHost + ']');

            srvr.idleTimeout(idleConnTimeout);

            return srvr;
        }
        catch (IgniteCheckedException e) {
            // An SSL problem will not be cured by another port, so fail immediately.
            if (X.hasCause(e, SSLException.class))
                throw new IgniteSpiException("Failed to create SSL context. SSL factory: "
                    + ignite.configuration().getSslContextFactory() + '.', e);

            lastEx = e;

            if (log.isDebugEnabled())
                log.debug("Failed to bind to local port (will try next port within range) [port=" + port +
                    ", locHost=" + locHost + ']');

            onException("Failed to bind to local port (will try next port within range) [port=" + port +
                ", locHost=" + locHost + ']', e);
        }
    }

    // If free port wasn't found.
    throw new IgniteCheckedException("Failed to bind to any port within range [startPort=" + locPort +
        ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
}
/**
 * Creates and starts a new shared memory communication server.
 * <p>
 * Returns {@code null} without trying anything when {@code shmemPort} is {@code -1} or when
 * running on Windows. Otherwise ports are tried starting from {@code shmemPort}: with a positive
 * {@code locPortRange} the ports {@code shmemPort .. shmemPort + locPortRange - 1}, with
 * {@code locPortRange == 0} only {@code shmemPort} itself. On success {@code boundTcpShmemPort}
 * is set to the chosen port.
 *
 * @return Server, or {@code null} if shared memory communication is not used.
 * @throws IgniteCheckedException If the server was already created or no port of the range could be bound.
 */
@Nullable private IpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException {
    if (boundTcpShmemPort >= 0)
        throw new IgniteCheckedException("Shared memory server was already created on port " + boundTcpShmemPort);

    if (shmemPort == -1 || U.isWindows())
        return null;

    IgniteCheckedException lastEx = null;

    // A zero range means "no range": exactly one attempt on the configured port.
    int lastPort = locPortRange == 0 ? shmemPort : shmemPort + locPortRange - 1;

    // If configured TCP port is busy, find first available in range.
    for (int port = shmemPort; port <= lastPort; port++) {
        try {
            IpcSharedMemoryServerEndpoint srv =
                new IpcSharedMemoryServerEndpoint(log, ignite.configuration().getNodeId(), gridName);

            srv.setPort(port);

            srv.omitOutOfResourcesWarning(true);

            srv.start();

            boundTcpShmemPort = port;

            // Ack Port the TCP server was bound to.
            if (log.isInfoEnabled())
                log.info("Successfully bound shared memory communication to TCP port [port=" + boundTcpShmemPort +
                    ", locHost=" + locHost + ']');

            return srv;
        }
        catch (IgniteCheckedException e) {
            lastEx = e;

            if (log.isDebugEnabled())
                log.debug("Failed to bind to local port (will try next port within range) [port=" + port +
                    ", locHost=" + locHost + ']');
        }
    }

    // If free port wasn't found. The scan starts at shmemPort, so that is the start port to report.
    throw new IgniteCheckedException("Failed to bind shared memory communication to any port within range [startPort=" +
        shmemPort + ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
}
/**
 * {@inheritDoc}
 * <p>
 * Unregisters the MBean, stops the NIO server (if one was created), stops and joins the
 * communication worker and the shared memory workers, force-closes every remaining client,
 * and finally clears {@code nioSrvr}, {@code commWorker} and {@code boundTcpPort}.
 */
@Override public void spiStop() throws IgniteSpiException {
assert isNodeStopping();
unregisterMBean();
// Stop TCP server.
if (nioSrvr != null)
nioSrvr.stop();
U.interrupt(commWorker);
U.join(commWorker, log);
U.cancel(shmemAcceptWorker);
U.join(shmemAcceptWorker, log);
U.cancel(shmemWorkers);
U.join(shmemWorkers, log);
shmemWorkers.clear();
// Force closing on stop (safety).
for (GridCommunicationClient client : clients.values())
client.forceClose();
// Clear resources.
// NOTE(review): boundTcpShmemPort is not reset here although boundTcpPort is - confirm whether that is intended.
nioSrvr = null;
commWorker = null;
boundTcpPort = -1;
// Ack stop.
if (log.isDebugEnabled())
log.debug(stopInfo());
}
/**
 * {@inheritDoc}
 * <p>
 * Opens the context initialization latch if it is still closed, force-closes all clients,
 * then deregisters ports and removes the discovery listener through the SPI context.
 */
@Override protected void onContextDestroyed0() {
    // Safety: make sure nobody stays blocked in getSpiContext().
    boolean latchStillClosed = ctxInitLatch.getCount() > 0;

    if (latchStillClosed)
        ctxInitLatch.countDown();

    // Force closing.
    for (GridCommunicationClient c : clients.values())
        c.forceClose();

    getSpiContext().deregisterPorts();

    getSpiContext().removeLocalEventListener(discoLsnr);
}
/**
 * Force-closes and unregisters the client associated with a node that is gone.
 * Does nothing when no client is registered for the node.
 *
 * @param nodeId Left node ID.
 */
void onNodeLeft(UUID nodeId) {
    assert nodeId != null;

    GridCommunicationClient leftClient = clients.get(nodeId);

    if (leftClient == null)
        return;

    if (log.isDebugEnabled())
        log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId +
            ", client=" + leftClient + ']');

    leftClient.forceClose();

    // Conditional remove: only this exact client instance is dropped from the map.
    clients.remove(nodeId, leftClient);
}
/** {@inheritDoc} */
@Override protected void checkConfigurationConsistency0(IgniteSpiContext spiCtx, ClusterNode node, boolean starting)
    throws IgniteSpiException {
    // These attributes are set on node startup in any case, so we MUST receive them.
    String[] requiredAttrs = {ATTR_ADDRS, ATTR_HOST_NAMES, ATTR_PORT};

    for (String attr : requiredAttrs)
        checkAttributePresence(node, createSpiAttributeName(attr));
}
/**
 * Checks that node has specified attribute and prints warning if it does not.
 * Nothing is thrown; a missing attribute only produces a log entry.
 *
 * @param node Node to check.
 * @param attrName Name of the attribute.
 */
private void checkAttributePresence(ClusterNode node, String attrName) {
    if (node.attribute(attrName) == null)
        // The ", " before "spiCls=" keeps the node ID and the SPI class name apart in the log line.
        U.warn(log, "Remote node has inconsistent configuration (required attribute was not found) " +
            "[attrName=" + attrName + ", nodeId=" + node.id() +
            ", spiCls=" + U.getSimpleName(TcpCommunicationSpi.class) + ']');
}
/**
 * {@inheritDoc}
 * <p>
 * A message addressed to the local node is handed straight to the listener. For a remote node
 * a client is reserved, the message is sent and the client is released; while the client asks
 * for a retry the cycle repeats, as long as the node is still known to the SPI context.
 *
 * @throws IgniteSpiException If sending failed or the destination node has left.
 */
@Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
assert node != null;
assert msg != null;
if (log.isTraceEnabled())
log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']');
UUID locNodeId = getLocalNodeId();
if (node.id().equals(locNodeId))
notifyListener(locNodeId, msg, NOOP);
else {
GridCommunicationClient client = null;
try {
boolean retry;
do {
client = reserveClient(node);
// Node ID is passed along only for non-async clients talking to a node of a different version.
UUID nodeId = null;
if (!client.async() && !getSpiContext().localNode().version().equals(node.version()))
nodeId = node.id();
retry = client.sendMessage(nodeId, msg);
client.release();
// Null marks the client as cleanly released, so the finally block leaves it alone.
client = null;
if (!retry)
sentMsgsCnt.increment();
else {
// Retrying only makes sense while the node is still present.
ClusterNode node0 = getSpiContext().node(node.id());
if (node0 == null)
throw new IgniteCheckedException("Failed to send message to remote node " +
"(node has left the grid): " + node.id());
}
}
while (retry);
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
}
finally {
// Non-null here means the send failed before release(): drop the client and close it.
if (client != null && clients.remove(node.id(), client))
client.forceClose();
}
}
}
/**
 * Returns existing or just created client to node, reserved for the caller.
 * <p>
 * Loops until a client is successfully reserved. When no client is registered, concurrent
 * connection attempts to the same node are funneled through a single future in
 * {@code clientFuts}: the thread that installs the future creates the client, other threads
 * wait on that future.
 *
 * @param node Node to which client should be open.
 * @return The existing or just created client.
 * @throws IgniteCheckedException Thrown if any exception occurs.
 */
private GridCommunicationClient reserveClient(ClusterNode node) throws IgniteCheckedException {
assert node != null;
UUID nodeId = node.id();
while (true) {
GridCommunicationClient client = clients.get(nodeId);
if (client == null) {
if (isNodeStopping())
throw new IgniteSpiException("Node is stopping.");
// Do not allow concurrent connects.
GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();
GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(nodeId, fut);
// Null means this thread installed the future and is responsible for connecting.
if (oldFut == null) {
try {
// Re-check: a client may have been registered since the first lookup.
GridCommunicationClient client0 = clients.get(nodeId);
if (client0 == null) {
client0 = createNioClient(node);
if (client0 != null) {
GridCommunicationClient old = clients.put(nodeId, client0);
assert old == null : "Client already created " +
"[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']';
if (client0 instanceof GridTcpNioCommunicationClient) {
GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);
// A positive close time means the session is already closed: unregister and retry.
if (tcpClient.session().closeTime() > 0 && clients.remove(nodeId, client0)) {
if (log.isDebugEnabled())
log.debug("Session was closed after client creation, will retry " +
"[node=" + node + ", client=" + client0 + ']');
client0 = null;
}
}
}
else
// No client could be created this time; pause before the outer loop retries.
U.sleep(200);
}
fut.onDone(client0);
}
catch (Throwable e) {
// The failure is stored in the future; non-Error throwables resurface from fut.get() below.
fut.onDone(e);
if (e instanceof Error)
throw (Error)e;
}
finally {
clientFuts.remove(nodeId, fut);
}
}
else
fut = oldFut;
client = fut.get();
if (client == null)
continue;
if (getSpiContext().node(nodeId) == null) {
if (clients.remove(nodeId, client))
client.forceClose();
throw new IgniteSpiException("Destination node is not in topology: " + node.id());
}
}
if (client.reserve())
return client;
else
// Client has just been closed by idle worker. Help it and try again.
clients.remove(nodeId, client);
}
}
/**
 * Creates a communication client for the given node. Shared memory is tried first when the
 * remote node publishes a shared memory port and has the same MACs as the local node;
 * in every other case, and when the shared memory attempt fails, a TCP client is created.
 *
 * @param node Node to create client for.
 * @return Client.
 * @throws IgniteCheckedException If failed.
 */
@Nullable protected GridCommunicationClient createNioClient(ClusterNode node) throws IgniteCheckedException {
    assert node != null;

    Integer rmtShmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT));

    ClusterNode locNode = getSpiContext().localNode();

    if (locNode == null)
        throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)");

    // If remote node has shared memory server enabled and has the same set of MACs
    // then we are likely to run on the same host and shared memory communication could be tried.
    boolean tryShmem = rmtShmemPort != null && U.sameMacs(locNode, node);

    if (!tryShmem)
        return createTcpClient(node);

    try {
        return createShmemClient(node, rmtShmemPort);
    }
    catch (IgniteCheckedException e) {
        if (e.hasCause(IpcOutOfSystemResourcesException.class))
            // Has cause or is itself the IpcOutOfSystemResourcesException.
            LT.warn(log, null, OUT_OF_RESOURCES_TCP_MSG);
        else if (getSpiContext().node(node.id()) != null)
            LT.warn(log, null, e.getMessage());
        else if (log.isDebugEnabled())
            log.debug("Failed to establish shared memory connection with local node (node has left): " +
                node.id());
    }

    // Shared memory did not work out: fall back to TCP.
    return createTcpClient(node);
}
/**
 * Creates a shared memory client for the given node and performs the handshake on it.
 * <p>
 * Client creation is retried once when it fails with a {@link ConnectException} cause.
 * A handshake timeout is retried with a doubled timeout until either {@code reconCnt}
 * attempts were made or the timeout has grown beyond {@code maxConnTimeout}.
 *
 * @param node Node.
 * @param port Port.
 * @return Client.
 * @throws IgniteCheckedException If failed.
 */
@Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, Integer port) throws IgniteCheckedException {
int attempt = 1;
int connectAttempts = 1;
long connTimeout0 = connTimeout;
while (true) {
GridCommunicationClient client;
try {
// NOTE(review): the constructor gets the base 'connTimeout'; only the handshake below uses the
// growing 'connTimeout0' - confirm this is intended.
client = new GridShmemCommunicationClient(metricsLsnr,
port,
connTimeout,
log,
getSpiContext().messageFormatter());
}
catch (IgniteCheckedException e) {
// Reconnect for the second time, if connection is not established.
if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) {
connectAttempts++;
continue;
}
throw e;
}
try {
safeHandshake(client, null, node.id(), connTimeout0, null);
}
catch (HandshakeTimeoutException e) {
if (log.isDebugEnabled())
log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
", err=" + e.getMessage() + ", client=" + client + ']');
client.forceClose();
// Give up once the attempt budget is used or the timeout exceeded its cap.
if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
if (log.isDebugEnabled())
log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
"[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
", attempt=" + attempt + ", reconCnt=" + reconCnt +
", err=" + e.getMessage() + ", client=" + client + ']');
throw e;
}
else {
attempt++;
connTimeout0 *= 2;
continue;
}
}
catch (IgniteCheckedException | RuntimeException | Error e) {
if (log.isDebugEnabled())
log.debug(
"Caught exception (will close client) [err=" + e.getMessage() + ", client=" + client + ']');
client.forceClose();
throw e;
}
return client;
}
}
/**
 * Compares a session's outbound message queue size with {@code slowClientQueueLimit} and,
 * when the limit is positive and exceeded by a client node, warns and asks the SPI context
 * to fail that node.
 *
 * @param ses Node communication session.
 * @param msgQueueSize Message queue size.
 */
private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) {
    // Limit disabled or not exceeded: nothing to do.
    if (slowClientQueueLimit <= 0 || msgQueueSize <= slowClientQueueLimit)
        return;

    UUID id = ses.meta(NODE_ID_META);

    if (id == null)
        return;

    ClusterNode node = getSpiContext().node(id);

    // Only nodes that are still known and are clients get dropped.
    if (node == null || !node.isClient())
        return;

    String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " +
        "the client will be dropped " +
        "(consider changing 'slowClientQueueLimit' configuration property) " +
        "[srvNode=" + getSpiContext().localNode().id() +
        ", clientNode=" + node +
        ", slowClientQueueLimit=" + slowClientQueueLimit + ']';

    U.quietAndWarn(log, msg);

    getSpiContext().failNode(id, msg);
}
/**
 * Establish TCP connection to remote node and returns client.
 * <p>
 * Candidate addresses are the node's bound addresses (sorted by
 * {@code U.inetAddressesComparator}) followed by its mapped external addresses. Each address
 * is tried in turn; for one address a handshake timeout is retried with a doubled timeout
 * until {@code reconCnt} attempts were made or the timeout exceeds {@code maxConnTimeout}.
 * One extra reconnect is allowed, for the whole call, after a {@link ConnectException}.
 * Failures are collected as suppressed exceptions of a single error that is thrown when
 * no address worked.
 *
 * @param node Remote node.
 * @return Client, or {@code null} if the recovery descriptor could not be reserved or the
 *      handshake returned {@code -1}.
 * @throws IgniteCheckedException If failed.
 */
protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException {
Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS));
Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
boolean isRmtAddrsExist = (!F.isEmpty(rmtAddrs0) && boundPort != null);
boolean isExtAddrsExist = !F.isEmpty(extAddrs);
if (!isRmtAddrsExist && !isExtAddrsExist)
throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any " +
"TCP communication addresses or mapped external addresses. Check configuration and make sure " +
"that you use the same communication SPI on all nodes. Remote node id: " + node.id());
// Insertion-ordered set: keeps the connection order and drops duplicate addresses.
LinkedHashSet<InetSocketAddress> addrs;
// Try to connect first on bound addresses.
if (isRmtAddrsExist) {
List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
boolean sameHost = U.sameMacs(getSpiContext().localNode(), node);
Collections.sort(addrs0, U.inetAddressesComparator(sameHost));
addrs = new LinkedHashSet<>(addrs0);
}
else
addrs = new LinkedHashSet<>();
// Then on mapped external addresses.
if (isExtAddrsExist)
addrs.addAll(extAddrs);
boolean conn = false;
GridCommunicationClient client = null;
IgniteCheckedException errs = null;
// Shared by all addresses: the extra reconnect after ConnectException happens at most once per call.
int connectAttempts = 1;
for (InetSocketAddress addr : addrs) {
long connTimeout0 = connTimeout;
int attempt = 1;
while (!conn) { // Reconnection on handshake timeout.
try {
SocketChannel ch = SocketChannel.open();
ch.configureBlocking(true);
ch.socket().setTcpNoDelay(tcpNoDelay);
ch.socket().setKeepAlive(true);
// Zero leaves the socket's own buffer sizes untouched.
if (sockRcvBuf > 0)
ch.socket().setReceiveBufferSize(sockRcvBuf);
if (sockSndBuf > 0)
ch.socket().setSendBufferSize(sockSndBuf);
GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(node);
if (!recoveryDesc.reserve()) {
U.closeQuiet(ch);
return null;
}
// Stays -1 until the handshake succeeds; the finally block uses it to detect failure.
long rcvCnt = -1;
SSLEngine sslEngine = null;
try {
// NOTE(review): connect uses the base 'connTimeout', not the growing 'connTimeout0'. Also, if
// connect throws, 'ch' is not closed anywhere in this method - confirm it is closed elsewhere.
ch.socket().connect(addr, (int)connTimeout);
if (isSslEnabled()) {
sslEngine = ignite.configuration().getSslContextFactory().create().createSSLEngine();
sslEngine.setUseClientMode(true);
}
rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0, sslEngine);
if (rcvCnt == -1)
return null;
}
finally {
if (recoveryDesc != null && rcvCnt == -1)
recoveryDesc.release();
}
try {
Map<Integer, Object> meta = new HashMap<>();
meta.put(NODE_ID_META, node.id());
if (isSslEnabled()) {
assert sslEngine != null;
meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), sslEngine);
}
if (recoveryDesc != null) {
recoveryDesc.onHandshake(rcvCnt);
// NOTE(review): -1 is the meta key under which the recovery descriptor is passed - confirm against GridNioServer.
meta.put(-1, recoveryDesc);
}
GridNioSession ses = nioSrvr.createSession(ch, meta).get();
client = new GridTcpNioCommunicationClient(ses, log);
conn = true;
}
finally {
// Session creation failed: give the recovery descriptor back.
if (!conn) {
if (recoveryDesc != null)
recoveryDesc.release();
}
}
}
catch (HandshakeTimeoutException e) {
if (client != null) {
client.forceClose();
client = null;
}
onException("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
", addr=" + addr + ']', e);
if (log.isDebugEnabled())
log.debug(
"Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
", addr=" + addr + ", err=" + e + ']');
// Give up on this address once the attempt budget is used or the timeout exceeded its cap.
if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
if (log.isDebugEnabled())
log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
"[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
", attempt=" + attempt + ", reconCnt=" + reconCnt +
", err=" + e.getMessage() + ", addr=" + addr + ']');
if (errs == null)
errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
"Make sure that each GridComputeTask and GridCacheTransaction has a timeout set " +
"in order to prevent parties from waiting forever in case of network issues " +
"[nodeId=" + node.id() + ", addrs=" + addrs + ']');
errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
break;
}
else {
attempt++;
connTimeout0 *= 2;
// Continue loop.
}
}
catch (Exception e) {
if (client != null) {
client.forceClose();
client = null;
}
onException("Client creation failed [addr=" + addr + ", err=" + e + ']', e);
if (log.isDebugEnabled())
log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
if (X.hasCause(e, SocketTimeoutException.class))
LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " +
"configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']');
if (errs == null)
errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
"Make sure that each GridComputeTask and GridCacheTransaction has a timeout set " +
"in order to prevent parties from waiting forever in case of network issues " +
"[nodeId=" + node.id() + ", addrs=" + addrs + ']');
errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
// Reconnect for the second time, if connection is not established.
if (connectAttempts < 2 &&
(e instanceof ConnectException || X.hasCause(e, ConnectException.class))) {
connectAttempts++;
continue;
}
break;
}
}
if (conn)
break;
}
if (client == null) {
assert errs != null;
if (X.hasCause(errs, ConnectException.class))
LT.warn(log, null, "Failed to connect to a remote node " +
"(make sure that destination node is alive and " +
"operating system firewall is disabled on local and remote hosts) " +
"[addrs=" + addrs + ']');
throw errs;
}
if (log.isDebugEnabled())
log.debug("Created client: " + client);
return client;
}
/**
* Performs handshake with a remote node in timeout-safe way.
* <p>
* A {@link HandshakeTimeoutObject} is registered for the duration of the call. If the client is a
* {@link GridCommunicationClient}, the handshake is delegated to it via a {@link HandshakeClosure}.
* Otherwise the client is treated as a {@link SocketChannel} and the handshake is done inline:
* the remote node ID is read and verified, {@code U.IGNITE_HEADER} is written, and then either a
* {@link HandshakeMessage} (when {@code recovery} is given) or the local node ID message is sent.
* With a recovery descriptor the method additionally waits for the remote node's response and
* extracts the received-messages counter from it.
*
* @param client Client, either a {@link GridCommunicationClient} or a {@link SocketChannel}.
* @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
* @param rmtNodeId Expected ID of the remote node.
* @param timeout Timeout for handshake in milliseconds (added to the current time to get the deadline).
* @param ssl SSL engine if cryptography is used, otherwise {@code null}.
* @throws IgniteCheckedException If handshake failed or wasn't completed within timeout.
* @return Counter read from the remote node's recovery handshake response ({@code -1} is logged as
*      "connection rejected"), or {@code 0} if no recovery handshake response was read.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
private <T> long safeHandshake(
T client,
@Nullable GridNioRecoveryDescriptor recovery,
UUID rmtNodeId,
long timeout,
@Nullable SSLEngine ssl
) throws IgniteCheckedException {
// Deadline is absolute: current time plus the given timeout.
HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
// Once registered, the timeout object's onTimeout() force-closes the client/channel.
addTimeoutObject(obj);
long rcvCnt = 0;
try {
if (client instanceof GridCommunicationClient)
// Client performs the stream-based handshake itself using the closure.
((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
else {
SocketChannel ch = (SocketChannel)client;
// Channel is closed in the inner finally block unless this flag is set.
boolean success = false;
try {
BlockingSslHandler sslHnd = null;
ByteBuffer buf;
// Step 1: obtain the remote node ID message (17 bytes are expected).
if (isSslEnabled()) {
sslHnd = new BlockingSslHandler(ssl, ch, directBuf, ByteOrder.nativeOrder(), log);
if (!sslHnd.handshake())
throw new IgniteCheckedException("SSL handshake is not completed.");
// Use the handler's application buffer if it already holds at least 17 bytes.
ByteBuffer handBuff = sslHnd.applicationBuffer();
if (handBuff.remaining() < 17) {
// NOTE(review): only one channel read is done here; this assumes the whole
// encrypted node ID message arrives in that single read - confirm.
buf = ByteBuffer.allocate(1000);
int read = ch.read(buf);
if (read == -1)
throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
buf.flip();
buf = sslHnd.decode(buf);
}
else
buf = handBuff;
}
else {
// Plain channel: keep reading until all 17 bytes have been received.
buf = ByteBuffer.allocate(17);
for (int i = 0; i < 17; ) {
int read = ch.read(buf);
if (read == -1)
throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
i += read;
}
}
// The UUID is parsed from offset 1 of the backing array, i.e. the first byte is skipped
// (presumably the message type byte - confirm against the sender).
UUID rmtNodeId0 = U.bytesToUuid(buf.array(), 1);
if (!rmtNodeId.equals(rmtNodeId0))
throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId +
", rcvd=" + rmtNodeId0 + ']');
else if (log.isDebugEnabled())
log.debug("Received remote node ID: " + rmtNodeId0);
// Step 2: send the Ignite header (encrypted when SSL is enabled).
if (isSslEnabled() ) {
assert sslHnd != null;
ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
}
else
ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
// Step 3: send either the recovery handshake message or the plain local node ID message.
if (recovery != null) {
HandshakeMessage msg = new HandshakeMessage(getLocalNodeId(),
recovery.incrementConnectCount(),
recovery.receivedCount());
if (log.isDebugEnabled())
log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
// 33 bytes is the minimum buffer space HandshakeMessage.writeTo() requires.
buf = ByteBuffer.allocate(33);
buf.order(ByteOrder.nativeOrder());
boolean written = msg.writeTo(buf, getSpiContext().messageFormatter().writer());
assert written;
buf.flip();
if (isSslEnabled()) {
assert sslHnd != null;
ch.write(sslHnd.encrypt(buf));
}
else
ch.write(buf);
}
else {
if (isSslEnabled()) {
assert sslHnd != null;
ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType)));
}
else
ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType));
}
// Step 4 (recovery only): wait for the 9-byte response and read the counter at index 1.
if (recovery != null) {
if (log.isDebugEnabled())
log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
if (isSslEnabled()) {
assert sslHnd != null;
buf = ByteBuffer.allocate(1000);
ByteBuffer decode = null;
buf.order(ByteOrder.nativeOrder());
// Loop until the decoded output reported by the SSL handler adds up to at least 9 bytes.
for (int i = 0; i < 9; ) {
int read = ch.read(buf);
if (read == -1)
throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
"(connection closed).");
buf.flip();
decode = sslHnd.decode(buf);
i += decode.remaining();
buf.flip();
buf.compact();
}
rcvCnt = decode.getLong(1);
}
else {
buf = ByteBuffer.allocate(9);
buf.order(ByteOrder.nativeOrder());
for (int i = 0; i < 9; ) {
int read = ch.read(buf);
if (read == -1)
throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
"(connection closed).");
i += read;
}
rcvCnt = buf.getLong(1);
}
if (log.isDebugEnabled())
log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
// -1 means the remote side rejected the connection: 'success' stays false,
// so the channel is closed below and -1 is returned to the caller.
if (rcvCnt == -1) {
if (log.isDebugEnabled())
log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
}
else
success = true;
}
else
success = true;
}
catch (IOException e) {
if (log.isDebugEnabled())
log.debug("Failed to read from channel: " + e);
throw new IgniteCheckedException("Failed to read from channel.", e);
}
finally {
if (!success)
U.closeQuiet(ch);
}
}
}
finally {
// cancel() returns true only if the timeout has not fired yet.
boolean cancelled = obj.cancel();
if (cancelled)
removeTimeoutObject(obj);
// Ignoring whatever happened after timeout - reporting only timeout event.
// Throwing from this finally block replaces any exception raised in the try block above.
if (!cancelled)
throw new HandshakeTimeoutException("Failed to perform handshake due to timeout (consider increasing " +
"'connectionTimeout' configuration property).");
}
return rcvCnt;
}
/**
 * Hands a received message over to the registered communication listener.
 * When no listener is registered the message is ignored; a debug record is written
 * if debug logging is enabled.
 *
 * @param sndId Sender ID.
 * @param msg Communication message.
 * @param msgC Closure to call when message processing finished.
 */
protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) {
    // Read the field once so the null check and the call use the same listener instance.
    CommunicationListener<Message> lsnr0 = lsnr;

    if (lsnr0 == null) {
        if (log.isDebugEnabled())
            log.debug("Received communication message without any registered listeners (will ignore, " +
                "is node stopping?) [senderNodeId=" + sndId + ", msg=" + msg + ']');

        return;
    }

    // Notify listener of a new message.
    lsnr0.onMessage(sndId, msg, msgC);
}
/**
 * Simulates node failure: stops the NIO server (if it was created), interrupts and joins
 * the communication worker, then force-closes every known client.
 *
 * FOR TEST PURPOSES ONLY!!!
 */
void simulateNodeFailure() {
    if (nioSrvr != null) {
        nioSrvr.stop();
    }

    // Stop the background worker and wait for it to finish.
    U.interrupt(commWorker);
    U.join(commWorker, log);

    for (GridCommunicationClient c : clients.values()) {
        c.forceClose();
    }
}
/**
 * Returns the recovery descriptor registered for the given node (keyed by node ID and order),
 * creating and registering a new one if none exists yet.
 *
 * @param node Node.
 * @return Recovery receive data for given node.
 */
private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node) {
    ClientKey key = new ClientKey(node.id(), node.order());

    GridNioRecoveryDescriptor existing = recoveryDescs.get(key);

    if (existing != null)
        return existing;

    // Explicit buffer size wins; otherwise derive the limit from the larger of the two thresholds.
    int maxSize = Math.max(msgQueueLimit, ackSndThreshold);

    int queueLimit;

    if (unackedMsgsBufSize != 0)
        queueLimit = unackedMsgsBufSize;
    else
        queueLimit = maxSize * 5;

    GridNioRecoveryDescriptor created = new GridNioRecoveryDescriptor(queueLimit, node, log);

    // Another thread may have registered a descriptor in the meantime - prefer that one.
    GridNioRecoveryDescriptor raced = recoveryDescs.putIfAbsent(key, created);

    return raced != null ? raced : created;
}
/**
* Reports an exception together with a descriptive message to the exception registry.
*
* @param msg Error message.
* @param e Exception.
*/
private void onException(String msg, Exception e) {
// Delegates to the registry returned by getExceptionRegistry().
getExceptionRegistry().onException(msg, e);
}
/** {@inheritDoc} */
@Override public String toString() {
// The string form is built by S.toString for the TcpCommunicationSpi class.
return S.toString(TcpCommunicationSpi.class, this);
}
/**
 * Immutable map key made of a node ID and a node order.
 * <p>
 * Both fields are {@code final}: instances are used as hash keys, so the values that
 * {@link #equals(Object)} and {@link #hashCode()} depend on must never change after construction.
 */
private static class ClientKey {
    /** Node ID. */
    private final UUID nodeId;

    /** Node order. */
    private final long order;

    /**
     * @param nodeId Node ID.
     * @param order Node order.
     */
    private ClientKey(UUID nodeId, long order) {
        this.nodeId = nodeId;
        this.order = order;
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object obj) {
        if (this == obj)
            return true;

        if (obj == null || getClass() != obj.getClass())
            return false;

        ClientKey other = (ClientKey)obj;

        return order == other.order && nodeId.equals(other.nodeId);
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        int res = nodeId.hashCode();

        // Fold the 64-bit order into 32 bits.
        res = 31 * res + (int)(order ^ (order >>> 32));

        return res;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(ClientKey.class, this);
    }
}
/**
* Internal exception class for proper timeout handling.
* It is thrown by {@code safeHandshake} when the handshake timeout object has already fired.
*/
private static class HandshakeTimeoutException extends IgniteCheckedException {
/** Serial version UID. */
private static final long serialVersionUID = 0L;
/**
* Creates the exception with the given message.
*
* @param msg Message.
*/
HandshakeTimeoutException(String msg) {
super(msg);
}
}
/**
* This worker takes responsibility to shut the server down when stopping,
* No other thread shall stop passed server.
*/
private class ShmemAcceptWorker extends GridWorker {
/** */
private final IpcSharedMemoryServerEndpoint srv;
/**
* @param srv Server.
*/
ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) {
super(gridName, "shmem-communication-acceptor", TcpCommunicationSpi.this.log);
this.srv = srv;
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
try {
while (!Thread.interrupted()) {
ShmemWorker e = new ShmemWorker(srv.accept());
shmemWorkers.add(e);
new IgniteThread(e).start();
}
}
catch (IgniteCheckedException e) {
if (!isCancelled())
U.error(log, "Shmem server failed.", e);
}
finally {
srv.close();
}
}
/** {@inheritDoc} */
@Override public void cancel() {
super.cancel();
srv.close();
}
}
/**
 * Worker that serves a single IPC endpoint through an {@link IpcToNioAdapter}.
 */
private class ShmemWorker extends GridWorker {
    /** Endpoint served by this worker. */
    private final IpcEndpoint endpoint;

    /**
     * @param endpoint Endpoint.
     */
    private ShmemWorker(IpcEndpoint endpoint) {
        super(gridName, "shmem-worker", TcpCommunicationSpi.this.log);

        this.endpoint = endpoint;
    }

    /** {@inheritDoc} */
    @Override protected void body() throws InterruptedException {
        try {
            // Message factory that looks up the SPI context's factory on first use.
            MessageFactory lazyFactory = new MessageFactory() {
                private MessageFactory target;

                @Nullable @Override public Message create(byte type) {
                    if (target == null)
                        target = getSpiContext().messageFactory();

                    assert target != null;

                    return target.create(type);
                }
            };

            // Message formatter that looks up the SPI context's formatter on first use.
            MessageFormatter lazyFormatter = new MessageFormatter() {
                private MessageFormatter target;

                @Override public MessageWriter writer() {
                    return resolve().writer();
                }

                @Override public MessageReader reader(MessageFactory factory) {
                    return resolve().reader(factory);
                }

                /** Returns the real formatter, fetching it from the SPI context if not cached yet. */
                private MessageFormatter resolve() {
                    if (target == null)
                        target = getSpiContext().messageFormatter();

                    assert target != null;

                    return target;
                }
            };

            GridNioCodecFilter codecFilter =
                new GridNioCodecFilter(new GridDirectParser(lazyFactory, lazyFormatter), log, true);

            GridConnectionBytesVerifyFilter verifyFilter = new GridConnectionBytesVerifyFilter(log);

            IpcToNioAdapter<Message> adapter = new IpcToNioAdapter<>(
                metricsLsnr,
                log,
                endpoint,
                srvLsnr,
                lazyFormatter,
                codecFilter,
                verifyFilter
            );

            adapter.serve();
        }
        finally {
            // Unregister this worker and release the endpoint however serving ended.
            shmemWorkers.remove(this);

            endpoint.close();
        }
    }

    /** {@inheritDoc} */
    @Override public void cancel() {
        super.cancel();

        endpoint.close();
    }

    /** {@inheritDoc} */
    @Override protected void cleanup() {
        super.cleanup();

        endpoint.close();
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(ShmemWorker.class, this);
    }
}
/**
 * Background thread that serves reconnect requests for recovery descriptors and, whenever no
 * request arrives within the idle connection timeout, performs idle processing of clients.
 */
private class CommunicationWorker extends IgniteSpiThread {
    /** Recovery descriptors waiting for a reconnect attempt. */
    private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();

    /**
     * Creates the worker thread.
     */
    private CommunicationWorker() {
        super(gridName, "tcp-comm-worker", log);
    }

    /** {@inheritDoc} */
    @Override protected void body() throws InterruptedException {
        if (log.isDebugEnabled())
            log.debug("Tcp communication worker has been started.");

        while (!isInterrupted()) {
            // Wait for a reconnect request for at most the idle connection timeout.
            GridNioRecoveryDescriptor next = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);

            if (next == null)
                processIdle();
            else
                processRecovery(next);
        }
    }

    /**
     * Cleans up recovery descriptors and then walks all clients: closes connections to nodes
     * that no longer exist, sends pending recovery acknowledgements and closes idle connections.
     */
    private void processIdle() {
        cleanupRecovery();

        for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) {
            UUID nodeId = entry.getKey();

            GridCommunicationClient client = entry.getValue();

            ClusterNode node = getSpiContext().node(nodeId);

            if (node == null) {
                if (log.isDebugEnabled())
                    log.debug("Forcing close of non-existent node connection: " + nodeId);

                client.forceClose();

                clients.remove(nodeId, client);

                continue;
            }

            GridNioRecoveryDescriptor recovery = null;

            if (client instanceof GridTcpNioCommunicationClient) {
                recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));

                boolean ackPending = recovery != null && recovery.lastAcknowledged() != recovery.received();

                if (ackPending) {
                    RecoveryLastReceivedMessage ack = new RecoveryLastReceivedMessage(recovery.received());

                    if (log.isDebugEnabled())
                        log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
                            ", rcvCnt=" + ack.received() + ']');

                    nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), ack);

                    recovery.lastAcknowledged(ack.received());

                    continue;
                }
            }

            // Connection was used recently enough - nothing to do for it.
            if (client.getIdleTime() < idleConnTimeout)
                continue;

            boolean hasUnacked = recovery != null &&
                recovery.nodeAlive(getSpiContext().node(nodeId)) &&
                !recovery.messagesFutures().isEmpty();

            if (hasUnacked) {
                if (log.isDebugEnabled())
                    log.debug("Node connection is idle, but there are unacknowledged messages, " +
                        "will wait: " + nodeId);

                continue;
            }

            if (log.isDebugEnabled())
                log.debug("Closing idle node connection: " + nodeId);

            if (client.close() || client.closed())
                clients.remove(nodeId, client);
        }
    }

    /**
     * Removes recovery descriptors whose node is no longer alive and notifies each removed
     * descriptor that its node has left.
     */
    private void cleanupRecovery() {
        // Allocated lazily: stays null while every descriptor's node is alive.
        Set<ClientKey> gone = null;

        for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
            ClientKey key = entry.getKey();

            if (gone != null && gone.contains(key))
                continue;

            GridNioRecoveryDescriptor desc = entry.getValue();

            if (desc.nodeAlive(getSpiContext().node(desc.node().id())))
                continue;

            if (gone == null)
                gone = new HashSet<>();

            gone.add(key);
        }

        if (gone == null)
            return;

        assert !gone.isEmpty();

        for (ClientKey key : gone) {
            GridNioRecoveryDescriptor removed = recoveryDescs.remove(key);

            if (removed != null)
                removed.onNodeLeft();
        }
    }

    /**
     * Tries to re-establish a client for the descriptor's node. Nothing is done if a client
     * already exists, the node is not alive or the node does not respond to ping. On failure the
     * request is re-queued while the node is alive, otherwise the failure is reported.
     *
     * @param recoveryDesc Recovery descriptor.
     */
    private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) {
        ClusterNode node = recoveryDesc.node();

        if (clients.containsKey(node.id()))
            return;

        if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
            return;

        if (!getSpiContext().pingNode(node.id()))
            return;

        try {
            if (log.isDebugEnabled())
                log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');

            // Reserving creates the client; it is released right away.
            GridCommunicationClient reserved = reserveClient(node);

            reserved.release();
        }
        catch (IgniteCheckedException | IgniteException e) {
            boolean alive = recoveryDesc.nodeAlive(getSpiContext().node(node.id()));

            if (!alive) {
                if (log.isDebugEnabled())
                    log.debug("Recovery reconnect failed, " +
                        "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');

                onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
                    e);

                return;
            }

            if (log.isDebugEnabled())
                log.debug("Recovery reconnect failed, will retry " +
                    "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');

            addReconnectRequest(recoveryDesc);
        }
    }

    /**
     * Queues a reconnect request for the given descriptor.
     *
     * @param recoverySnd Recovery send data.
     */
    void addReconnectRequest(GridNioRecoveryDescriptor recoverySnd) {
        boolean queued = q.add(recoverySnd);

        assert queued;
    }
}
/**
* Future whose result is a {@link GridCommunicationClient}.
* The class declares nothing beyond what {@link GridFutureAdapter} provides.
*/
private static class ConnectFuture extends GridFutureAdapter<GridCommunicationClient> {
/** Serial version UID. */
private static final long serialVersionUID = 0L;
// No-op.
}
/**
 * Timeout object guarding a handshake: when the timeout fires before {@link #cancel()} is
 * called, the guarded client or channel is closed.
 */
private static class HandshakeTimeoutObject<T> implements IgniteSpiTimeoutObject {
    /** Timeout object ID. */
    private final IgniteUuid id = IgniteUuid.randomUuid();

    /** Guarded object: a communication client or a selectable channel. */
    private final T obj;

    /** Time at which the timeout expires. */
    private final long endTime;

    /** Set by whichever of {@link #cancel()} and {@link #onTimeout()} happens first. */
    private final AtomicBoolean done = new AtomicBoolean();

    /**
     * @param obj Client.
     * @param endTime End time.
     */
    private HandshakeTimeoutObject(T obj, long endTime) {
        assert obj != null;
        assert obj instanceof GridCommunicationClient || obj instanceof SelectableChannel;
        assert endTime > 0;

        this.obj = obj;
        this.endTime = endTime;
    }

    /**
     * @return {@code True} if object has not yet been timed out.
     */
    boolean cancel() {
        return done.compareAndSet(false, true);
    }

    /** {@inheritDoc} */
    @Override public void onTimeout() {
        // Already cancelled (or already timed out) - nothing to close.
        if (!done.compareAndSet(false, true))
            return;

        // Close socket - timeout occurred.
        if (obj instanceof GridCommunicationClient) {
            GridCommunicationClient client = (GridCommunicationClient)obj;

            client.forceClose();
        }
        else
            U.closeQuiet((AbstractInterruptibleChannel)obj);
    }

    /** {@inheritDoc} */
    @Override public long endTime() {
        return endTime;
    }

    /** {@inheritDoc} */
    @Override public IgniteUuid id() {
        return id;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(HandshakeTimeoutObject.class, this);
    }
}
/**
 * Stream-based handshake: reads and verifies the remote node ID from the input stream,
 * then writes the Ignite header and the local node ID message to the output stream.
 */
private class HandshakeClosure extends IgniteInClosure2X<InputStream, OutputStream> {
    /** Serial version UID. */
    private static final long serialVersionUID = 0L;

    /** Expected remote node ID. */
    private final UUID rmtNodeId;

    /**
     * @param rmtNodeId Remote node ID.
     */
    private HandshakeClosure(UUID rmtNodeId) {
        this.rmtNodeId = rmtNodeId;
    }

    /** {@inheritDoc} */
    @SuppressWarnings("ThrowFromFinallyBlock")
    @Override public void applyx(InputStream in, OutputStream out) throws IgniteCheckedException {
        receiveRemoteNodeId(in);

        sendLocalNodeId(out);
    }

    /**
     * Reads 17 bytes from the stream and checks that the node ID they carry matches the expected one.
     *
     * @param in Input stream.
     * @throws IgniteCheckedException If the stream ended, reading failed or timed out, or the ID differs.
     */
    private void receiveRemoteNodeId(InputStream in) throws IgniteCheckedException {
        try {
            // Handshake.
            byte[] data = new byte[17];

            for (int off = 0; off < 17; ) {
                int cnt = in.read(data, off, 17 - off);

                if (cnt < 0)
                    throw new IgniteCheckedException("Failed to get remote node ID (end of stream reached)");

                off += cnt;
            }

            // The node ID is parsed starting at offset 1, i.e. the first byte is skipped.
            UUID id = U.bytesToUuid(data, 1);

            if (!rmtNodeId.equals(id))
                throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId +
                    ", rcvd=" + id + ']');

            if (log.isDebugEnabled())
                log.debug("Received remote node ID: " + id);
        }
        catch (SocketTimeoutException e) {
            throw new IgniteCheckedException("Failed to perform handshake due to timeout (consider increasing " +
                "'connectionTimeout' configuration property).", e);
        }
        catch (IOException e) {
            throw new IgniteCheckedException("Failed to perform handshake.", e);
        }
    }

    /**
     * Writes the Ignite header, the node ID message type and the local node ID bytes, then flushes.
     *
     * @param out Output stream.
     * @throws IgniteCheckedException If writing failed.
     */
    private void sendLocalNodeId(OutputStream out) throws IgniteCheckedException {
        try {
            out.write(U.IGNITE_HEADER);
            out.write(NODE_ID_MSG_TYPE);
            out.write(nodeIdMsg.nodeIdBytes);

            out.flush();

            if (log.isDebugEnabled())
                log.debug("Sent local node ID [locNodeId=" + getLocalNodeId() + ", rmtNodeId="
                    + rmtNodeId + ']');
        }
        catch (IOException e) {
            throw new IgniteCheckedException("Failed to perform handshake.", e);
        }
    }
}
/**
 * Handshake message carrying the sender's node ID, its received-messages counter and
 * its connect counter. Written as 33 bytes: one type byte, 16 node ID bytes and two longs.
 */
@SuppressWarnings("PublicInnerClass")
public static class HandshakeMessage implements Message {
    /** Serial version UID. */
    private static final long serialVersionUID = 0L;

    /** Node ID. */
    private UUID nodeId;

    /** Number of received messages. */
    private long rcvCnt;

    /** Connect count. */
    private long connectCnt;

    /**
     * Default constructor required by {@link Message}.
     */
    public HandshakeMessage() {
        // No-op.
    }

    /**
     * @param nodeId Node ID.
     * @param connectCnt Connect count.
     * @param rcvCnt Number of received messages.
     */
    public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) {
        assert nodeId != null;
        assert rcvCnt >= 0 : rcvCnt;

        this.nodeId = nodeId;
        this.connectCnt = connectCnt;
        this.rcvCnt = rcvCnt;
    }

    /**
     * @return Connect count.
     */
    public long connectCount() {
        return connectCnt;
    }

    /**
     * @return Number of received messages.
     */
    public long received() {
        return rcvCnt;
    }

    /**
     * @return Node ID.
     */
    public UUID nodeId() {
        return nodeId;
    }

    /** {@inheritDoc} */
    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
        // The message is written only if all 33 bytes fit.
        if (buf.remaining() >= 33) {
            buf.put(HANDSHAKE_MSG_TYPE);

            byte[] idBytes = U.uuidToBytes(nodeId);

            assert idBytes.length == 16 : idBytes.length;

            buf.put(idBytes);

            buf.putLong(rcvCnt);

            buf.putLong(connectCnt);

            return true;
        }

        return false;
    }

    /** {@inheritDoc} */
    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
        // 32 bytes are consumed here: 16 node ID bytes and two longs (no type byte is read).
        if (buf.remaining() >= 32) {
            byte[] idBytes = new byte[16];

            buf.get(idBytes);

            nodeId = U.bytesToUuid(idBytes, 0);

            rcvCnt = buf.getLong();

            connectCnt = buf.getLong();

            return true;
        }

        return false;
    }

    /** {@inheritDoc} */
    @Override public byte directType() {
        return HANDSHAKE_MSG_TYPE;
    }

    /** {@inheritDoc} */
    @Override public byte fieldsCount() {
        throw new UnsupportedOperationException();
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(HandshakeMessage.class, this);
    }
}
/**
 * Recovery acknowledgment message carrying the number of received messages.
 * Written as 9 bytes: one type byte followed by the counter.
 */
@SuppressWarnings("PublicInnerClass")
public static class RecoveryLastReceivedMessage implements Message {
    /** Serial version UID. */
    private static final long serialVersionUID = 0L;

    /** Number of received messages. */
    private long rcvCnt;

    /**
     * Default constructor required by {@link Message}.
     */
    public RecoveryLastReceivedMessage() {
        // No-op.
    }

    /**
     * @param rcvCnt Number of received messages.
     */
    public RecoveryLastReceivedMessage(long rcvCnt) {
        this.rcvCnt = rcvCnt;
    }

    /**
     * @return Number of received messages.
     */
    public long received() {
        return rcvCnt;
    }

    /** {@inheritDoc} */
    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
        // The message is written only if all 9 bytes fit.
        boolean fits = buf.remaining() >= 9;

        if (fits) {
            buf.put(RECOVERY_LAST_ID_MSG_TYPE);

            buf.putLong(rcvCnt);
        }

        return fits;
    }

    /** {@inheritDoc} */
    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
        // Only the 8-byte counter is read here (no type byte).
        boolean available = buf.remaining() >= 8;

        if (available)
            rcvCnt = buf.getLong();

        return available;
    }

    /** {@inheritDoc} */
    @Override public byte directType() {
        return RECOVERY_LAST_ID_MSG_TYPE;
    }

    /** {@inheritDoc} */
    @Override public byte fieldsCount() {
        return 0;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(RecoveryLastReceivedMessage.class, this);
    }
}
/**
 * Node ID message. Written as 17 bytes: one type byte followed by the 16 node ID bytes.
 */
@SuppressWarnings("PublicInnerClass")
public static class NodeIdMessage implements Message {
    /** Serial version UID. */
    private static final long serialVersionUID = 0L;

    /** Node ID bytes. */
    private byte[] nodeIdBytes;

    /** Message type byte followed by the node ID bytes; filled in only by the node ID constructor. */
    private byte[] nodeIdBytesWithType;

    /** Default constructor. */
    public NodeIdMessage() {
        // No-op.
    }

    /**
     * @param nodeId Node ID.
     */
    private NodeIdMessage(UUID nodeId) {
        byte[] raw = U.uuidToBytes(nodeId);

        // Same bytes prefixed with the message type.
        byte[] typed = new byte[raw.length + 1];

        typed[0] = NODE_ID_MSG_TYPE;

        System.arraycopy(raw, 0, typed, 1, raw.length);

        nodeIdBytes = raw;
        nodeIdBytesWithType = typed;
    }

    /** {@inheritDoc} */
    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
        assert nodeIdBytes.length == 16;

        // The message is written only if all 17 bytes fit.
        if (buf.remaining() >= 17) {
            buf.put(NODE_ID_MSG_TYPE);

            buf.put(nodeIdBytes);

            return true;
        }

        return false;
    }

    /** {@inheritDoc} */
    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
        // Only the 16 node ID bytes are read here (no type byte).
        if (buf.remaining() >= 16) {
            nodeIdBytes = new byte[16];

            buf.get(nodeIdBytes);

            return true;
        }

        return false;
    }

    /** {@inheritDoc} */
    @Override public byte directType() {
        return NODE_ID_MSG_TYPE;
    }

    /** {@inheritDoc} */
    @Override public byte fieldsCount() {
        return 0;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(NodeIdMessage.class, this);
    }
}
}