| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.spi.communication.tcp; |
| |
| import org.apache.ignite.*; |
| import org.apache.ignite.cluster.*; |
| import org.apache.ignite.configuration.*; |
| import org.apache.ignite.events.*; |
| import org.apache.ignite.internal.*; |
| import org.apache.ignite.internal.managers.eventstorage.*; |
| import org.apache.ignite.internal.util.*; |
| import org.apache.ignite.internal.util.future.*; |
| import org.apache.ignite.internal.util.ipc.*; |
| import org.apache.ignite.internal.util.ipc.shmem.*; |
| import org.apache.ignite.internal.util.lang.*; |
| import org.apache.ignite.internal.util.nio.*; |
| import org.apache.ignite.internal.util.nio.ssl.*; |
| import org.apache.ignite.internal.util.typedef.*; |
| import org.apache.ignite.internal.util.typedef.internal.*; |
| import org.apache.ignite.internal.util.worker.*; |
| import org.apache.ignite.lang.*; |
| import org.apache.ignite.plugin.extensions.communication.*; |
| import org.apache.ignite.resources.*; |
| import org.apache.ignite.spi.*; |
| import org.apache.ignite.spi.communication.*; |
| import org.apache.ignite.thread.*; |
| import org.jetbrains.annotations.*; |
| import org.jsr166.*; |
| |
| import javax.net.ssl.*; |
| import java.io.*; |
| import java.net.*; |
| import java.nio.*; |
| import java.nio.channels.*; |
| import java.nio.channels.spi.*; |
| import java.util.*; |
| import java.util.concurrent.*; |
| import java.util.concurrent.atomic.*; |
| |
| import static org.apache.ignite.events.EventType.*; |
| |
| /** |
| * <tt>TcpCommunicationSpi</tt> is default communication SPI which uses |
| * TCP/IP protocol and Java NIO to communicate with other nodes. |
| * <p> |
| * To enable communication with other nodes, this SPI adds {@link #ATTR_ADDRS} |
| * and {@link #ATTR_PORT} local node attributes (see {@link ClusterNode#attributes()}. |
| * <p> |
| * At startup, this SPI tries to start listening to local port specified by |
| * {@link #setLocalPort(int)} method. If local port is occupied, then SPI will |
| * automatically increment the port number until it can successfully bind for |
| * listening. {@link #setLocalPortRange(int)} configuration parameter controls |
| * maximum number of ports that SPI will try before it fails. Port range comes |
| * very handy when starting multiple grid nodes on the same machine or even |
| * in the same VM. In this case all nodes can be brought up without a single |
| * change in configuration. |
| * <p> |
| * This SPI caches connections to remote nodes so it does not have to reconnect every |
| * time a message is sent. By default, idle connections are kept active for |
| * {@link #DFLT_IDLE_CONN_TIMEOUT} period and then are closed. Use |
| * {@link #setIdleConnectionTimeout(long)} configuration parameter to configure |
| * you own idle connection timeout. |
| * <p> |
| * <h1 class="header">Configuration</h1> |
| * <h2 class="header">Mandatory</h2> |
| * This SPI has no mandatory configuration parameters. |
| * <h2 class="header">Optional</h2> |
| * The following configuration parameters are optional: |
| * <ul> |
| * <li>Node local IP address (see {@link #setLocalAddress(String)})</li> |
| * <li>Node local port number (see {@link #setLocalPort(int)})</li> |
| * <li>Local port range (see {@link #setLocalPortRange(int)}</li> |
| * <li>Connection buffer flush frequency (see {@link #setConnectionBufferFlushFrequency(long)})</li> |
| * <li>Connection buffer size (see {@link #setConnectionBufferSize(int)})</li> |
| * <li>Idle connection timeout (see {@link #setIdleConnectionTimeout(long)})</li> |
| * <li>Direct or heap buffer allocation (see {@link #setDirectBuffer(boolean)})</li> |
| * <li>Direct or heap buffer allocation for sending (see {@link #setDirectSendBuffer(boolean)})</li> |
| * <li>Count of selectors and selector threads for NIO server (see {@link #setSelectorsCount(int)})</li> |
| * <li>{@code TCP_NODELAY} socket option for sockets (see {@link #setTcpNoDelay(boolean)})</li> |
| * <li>Message queue limit (see {@link #setMessageQueueLimit(int)})</li> |
| * <li>Minimum buffered message count (see {@link #setMinimumBufferedMessageCount(int)})</li> |
| * <li>Connect timeout (see {@link #setConnectTimeout(long)})</li> |
| * <li>Maximum connect timeout (see {@link #setMaxConnectTimeout(long)})</li> |
| * <li>Reconnect attempts count (see {@link #setReconnectCount(int)})</li> |
| * <li>Socket receive buffer size (see {@link #setSocketReceiveBuffer(int)})</li> |
| * <li>Socket send buffer size (see {@link #setSocketSendBuffer(int)})</li> |
| * <li>Socket write timeout (see {@link #setSocketWriteTimeout(long)})</li> |
| * <li>Number of received messages after which acknowledgment is sent (see {@link #setAckSendThreshold(int)})</li> |
| * <li>Maximum number of unacknowledged messages (see {@link #setUnacknowledgedMessagesBufferSize(int)})</li> |
| * </ul> |
| * <h2 class="header">Java Example</h2> |
| * TcpCommunicationSpi is used by default and should be explicitly configured |
| * only if some SPI configuration parameters need to be overridden. |
| * <pre name="code" class="java"> |
| * TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); |
| * |
| * // Override local port. |
| * commSpi.setLocalPort(4321); |
| * |
| * IgniteConfiguration cfg = new IgniteConfiguration(); |
| * |
| * // Override default communication SPI. |
| * cfg.setCommunicationSpi(commSpi); |
| * |
| * // Start grid. |
| * Ignition.start(cfg); |
| * </pre> |
| * <h2 class="header">Spring Example</h2> |
| * TcpCommunicationSpi can be configured from Spring XML configuration file: |
| * <pre name="code" class="xml"> |
| * <bean id="grid.custom.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true"> |
| * ... |
| * <property name="communicationSpi"> |
| * <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi"> |
| * <!-- Override local port. --> |
| * <property name="localPort" value="4321"/> |
| * </bean> |
| * </property> |
| * ... |
| * </bean> |
| * </pre> |
| * <p> |
| * <img src="http://ignite.incubator.apache.org/images/spring-small.png"> |
| * <br> |
| * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> |
| * @see CommunicationSpi |
| */ |
| @IgniteSpiMultipleInstancesSupport(true) |
| @IgniteSpiConsistencyChecked(optional = false) |
| public class TcpCommunicationSpi extends IgniteSpiAdapter |
| implements CommunicationSpi<Message>, TcpCommunicationSpiMBean { |
| /** IPC error message. */ |
| public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " + |
| "(switching to TCP, may be slower)."; |
| |
| /** Node attribute that is mapped to node IP addresses (value is <tt>comm.tcp.addrs</tt>). */ |
| public static final String ATTR_ADDRS = "comm.tcp.addrs"; |
| |
| /** Node attribute that is mapped to node host names (value is <tt>comm.tcp.host.names</tt>). */ |
| public static final String ATTR_HOST_NAMES = "comm.tcp.host.names"; |
| |
| /** Node attribute that is mapped to node port number (value is <tt>comm.tcp.port</tt>). */ |
| public static final String ATTR_PORT = "comm.tcp.port"; |
| |
| /** Node attribute that is mapped to node port number (value is <tt>comm.shmem.tcp.port</tt>). */ |
| public static final String ATTR_SHMEM_PORT = "comm.shmem.tcp.port"; |
| |
| /** Node attribute that is mapped to node's external addresses (value is <tt>comm.tcp.ext-addrs</tt>). */ |
| public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs"; |
| |
| /** Default port which node sets listener to (value is <tt>47100</tt>). */ |
| public static final int DFLT_PORT = 47100; |
| |
| /** Default port which node sets listener for shared memory connections (value is <tt>48100</tt>). */ |
| public static final int DFLT_SHMEM_PORT = 48100; |
| |
| /** Default idle connection timeout (value is <tt>30000</tt>ms). */ |
| public static final long DFLT_IDLE_CONN_TIMEOUT = 30000; |
| |
| /** Default socket send and receive buffer size. */ |
| public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024; |
| |
| /** Default connection timeout (value is <tt>5000</tt>ms). */ |
| public static final long DFLT_CONN_TIMEOUT = 5000; |
| |
| /** Default Maximum connection timeout (value is <tt>600,000</tt>ms). */ |
| public static final long DFLT_MAX_CONN_TIMEOUT = 10 * 60 * 1000; |
| |
| /** Default reconnect attempts count (value is <tt>10</tt>). */ |
| public static final int DFLT_RECONNECT_CNT = 10; |
| |
| /** Default message queue limit per connection (for incoming and outgoing . */ |
| public static final int DFLT_MSG_QUEUE_LIMIT = GridNioServer.DFLT_SEND_QUEUE_LIMIT; |
| |
| /** |
| * Default count of selectors for TCP server equals to |
| * {@code "Math.min(4, Runtime.getRuntime().availableProcessors())"}. |
| */ |
| public static final int DFLT_SELECTORS_CNT = Math.min(4, Runtime.getRuntime().availableProcessors()); |
| |
| /** Node ID meta for session. */ |
| private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey(); |
| |
| /** Message tracker meta for session. */ |
| private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey(); |
| |
| /** |
| * Default local port range (value is <tt>100</tt>). |
| * See {@link #setLocalPortRange(int)} for details. |
| */ |
| public static final int DFLT_PORT_RANGE = 100; |
| |
| /** Default value for {@code TCP_NODELAY} socket option (value is <tt>true</tt>). */ |
| public static final boolean DFLT_TCP_NODELAY = true; |
| |
| /** Default received messages threshold for sending ack. */ |
| public static final int DFLT_ACK_SND_THRESHOLD = 16; |
| |
| /** Default socket write timeout. */ |
| public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000; |
| |
| /** No-op runnable. */ |
| private static final IgniteRunnable NOOP = new IgniteRunnable() { |
| @Override public void run() { |
| // No-op. |
| } |
| }; |
| |
| /** Node ID message type. */ |
| public static final byte NODE_ID_MSG_TYPE = -1; |
| |
| /** */ |
| public static final byte RECOVERY_LAST_ID_MSG_TYPE = -2; |
| |
| /** */ |
| public static final byte HANDSHAKE_MSG_TYPE = -3; |
| |
| /** Server listener. */ |
| private final GridNioServerListener<Message> srvLsnr = |
| new GridNioServerListenerAdapter<Message>() { |
| @Override public void onSessionWriteTimeout(GridNioSession ses) { |
| LT.warn(log, null, "Communication SPI Session write timed out (consider increasing " + |
| "'socketWriteTimeout' " + "configuration property) [remoteAddr=" + ses.remoteAddress() + |
| ", writeTimeout=" + sockWriteTimeout + ']'); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Closing communication SPI session on write timeout [remoteAddr=" + ses.remoteAddress() + |
| ", writeTimeout=" + sockWriteTimeout + ']'); |
| |
| ses.close(); |
| } |
| |
| @Override public void onConnected(GridNioSession ses) { |
| if (ses.accepted()) { |
| if (log.isDebugEnabled()) |
| log.debug("Sending local node ID to newly accepted session: " + ses); |
| |
| ses.send(nodeIdMsg); |
| } |
| } |
| |
| @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { |
| UUID id = ses.meta(NODE_ID_META); |
| |
| if (id != null) { |
| GridCommunicationClient rmv = clients.get(id); |
| |
| if (rmv instanceof GridTcpNioCommunicationClient && |
| ((GridTcpNioCommunicationClient)rmv).session() == ses && |
| clients.remove(id, rmv)) { |
| rmv.forceClose(); |
| |
| if (!isNodeStopping()) { |
| GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor(); |
| |
| if (recoveryData != null) { |
| if (recoveryData.nodeAlive(getSpiContext().node(id))) { |
| if (!recoveryData.messagesFutures().isEmpty()) { |
| if (log.isDebugEnabled()) |
| log.debug("Session was closed but there are unacknowledged messages, " + |
| "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']'); |
| |
| commWorker.addReconnectRequest(recoveryData); |
| } |
| } |
| else |
| recoveryData.onNodeLeft(); |
| } |
| } |
| } |
| |
| CommunicationListener<Message> lsnr0 = lsnr; |
| |
| if (lsnr0 != null) |
| lsnr0.onDisconnected(id); |
| } |
| } |
| |
| @Override public void onMessage(GridNioSession ses, Message msg) { |
| UUID sndId = ses.meta(NODE_ID_META); |
| |
| if (sndId == null) { |
| assert ses.accepted(); |
| |
| if (msg instanceof NodeIdMessage) |
| sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0); |
| else { |
| assert msg instanceof HandshakeMessage : msg; |
| |
| sndId = ((HandshakeMessage)msg).nodeId(); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Remote node ID received: " + sndId); |
| |
| final UUID old = ses.addMeta(NODE_ID_META, sndId); |
| |
| assert old == null; |
| |
| final ClusterNode rmtNode = getSpiContext().node(sndId); |
| |
| if (rmtNode == null) { |
| ses.close(); |
| |
| return; |
| } |
| |
| ClusterNode locNode = getSpiContext().localNode(); |
| |
| if (ses.remoteAddress() == null) |
| return; |
| |
| GridCommunicationClient oldClient = clients.get(sndId); |
| |
| boolean hasShmemClient = false; |
| |
| if (oldClient != null) { |
| if (oldClient instanceof GridTcpNioCommunicationClient) { |
| if (log.isDebugEnabled()) |
| log.debug("Received incoming connection when already connected " + |
| "to this node, rejecting [locNode=" + locNode.id() + |
| ", rmtNode=" + sndId + ']'); |
| |
| ses.send(new RecoveryLastReceivedMessage(-1)); |
| |
| return; |
| } |
| else { |
| assert oldClient instanceof GridShmemCommunicationClient; |
| |
| hasShmemClient = true; |
| } |
| } |
| |
| GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>(); |
| |
| GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut); |
| |
| assert msg instanceof HandshakeMessage : msg; |
| |
| HandshakeMessage msg0 = (HandshakeMessage)msg; |
| |
| final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode); |
| |
| if (oldFut == null) { |
| oldClient = clients.get(sndId); |
| |
| if (oldClient != null) { |
| if (oldClient instanceof GridTcpNioCommunicationClient) { |
| if (log.isDebugEnabled()) |
| log.debug("Received incoming connection when already connected " + |
| "to this node, rejecting [locNode=" + locNode.id() + |
| ", rmtNode=" + sndId + ']'); |
| |
| ses.send(new RecoveryLastReceivedMessage(-1)); |
| |
| return; |
| } |
| else { |
| assert oldClient instanceof GridShmemCommunicationClient; |
| |
| hasShmemClient = true; |
| } |
| } |
| |
| boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), |
| new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Received incoming connection from remote node " + |
| "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']'); |
| |
| if (reserved) { |
| try { |
| GridTcpNioCommunicationClient client = |
| connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); |
| |
| fut.onDone(client); |
| } |
| finally { |
| clientFuts.remove(rmtNode.id(), fut); |
| } |
| } |
| } |
| else { |
| if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) { |
| if (log.isDebugEnabled()) { |
| log.debug("Received incoming connection from remote node while " + |
| "connecting to this node, rejecting [locNode=" + locNode.id() + |
| ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() + |
| ", rmtNodeOrder=" + rmtNode.order() + ']'); |
| } |
| |
| ses.send(new RecoveryLastReceivedMessage(-1)); |
| } |
| else { |
| boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), |
| new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); |
| |
| if (reserved) { |
| GridTcpNioCommunicationClient client = |
| connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); |
| |
| fut.onDone(client); |
| } |
| } |
| } |
| } |
| else { |
| rcvdMsgsCnt.increment(); |
| |
| GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor(); |
| |
| if (recovery != null) { |
| if (msg instanceof RecoveryLastReceivedMessage) { |
| RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Received recovery acknowledgement [rmtNode=" + sndId + |
| ", rcvCnt=" + msg0.received() + ']'); |
| |
| recovery.ackReceived(msg0.received()); |
| |
| return; |
| } |
| else { |
| long rcvCnt = recovery.onReceived(); |
| |
| if (rcvCnt % ackSndThreshold == 0) { |
| if (log.isDebugEnabled()) |
| log.debug("Send recovery acknowledgement [rmtNode=" + sndId + |
| ", rcvCnt=" + rcvCnt + ']'); |
| |
| nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt)); |
| |
| recovery.lastAcknowledged(rcvCnt); |
| } |
| } |
| } |
| |
| IgniteRunnable c; |
| |
| if (msgQueueLimit > 0) { |
| GridNioMessageTracker tracker = ses.meta(TRACKER_META); |
| |
| if (tracker == null) { |
| GridNioMessageTracker old = ses.addMeta(TRACKER_META, tracker = |
| new GridNioMessageTracker(ses, msgQueueLimit)); |
| |
| assert old == null; |
| } |
| |
| tracker.onMessageReceived(); |
| |
| c = tracker; |
| } |
| else |
| c = NOOP; |
| |
| notifyListener(sndId, msg, c); |
| } |
| } |
| |
| /** |
| * @param recovery Recovery descriptor. |
| * @param ses Session. |
| * @param node Node. |
| * @param rcvCnt Number of received messages.. |
| * @param sndRes If {@code true} sends response for recovery handshake. |
| * @param createClient If {@code true} creates NIO communication client. |
| * @return Client. |
| */ |
| private GridTcpNioCommunicationClient connected( |
| GridNioRecoveryDescriptor recovery, |
| GridNioSession ses, |
| ClusterNode node, |
| long rcvCnt, |
| boolean sndRes, |
| boolean createClient) { |
| recovery.onHandshake(rcvCnt); |
| |
| ses.recoveryDescriptor(recovery); |
| |
| nioSrvr.resend(ses); |
| |
| if (sndRes) |
| nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.receivedCount())); |
| |
| recovery.connected(); |
| |
| GridTcpNioCommunicationClient client = null; |
| |
| if (createClient) { |
| client = new GridTcpNioCommunicationClient(ses, log); |
| |
| GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client); |
| |
| assert oldClient == null : "Client already created [node=" + node + ", client=" + client + |
| ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']'; |
| } |
| |
| return client; |
| } |
| |
| /** |
| * |
| */ |
| @SuppressWarnings("PackageVisibleInnerClass") |
| class ConnectClosure implements IgniteInClosure<Boolean> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private final GridNioSession ses; |
| |
| /** */ |
| private final GridNioRecoveryDescriptor recoveryDesc; |
| |
| /** */ |
| private final ClusterNode rmtNode; |
| |
| /** */ |
| private final HandshakeMessage msg; |
| |
| /** */ |
| private final GridFutureAdapter<GridCommunicationClient> fut; |
| |
| /** */ |
| private final boolean createClient; |
| |
| /** |
| * @param ses Incoming session. |
| * @param recoveryDesc Recovery descriptor. |
| * @param rmtNode Remote node. |
| * @param msg Handshake message. |
| * @param createClient If {@code true} creates NIO communication client.. |
| * @param fut Connect future. |
| */ |
| ConnectClosure(GridNioSession ses, |
| GridNioRecoveryDescriptor recoveryDesc, |
| ClusterNode rmtNode, |
| HandshakeMessage msg, |
| boolean createClient, |
| GridFutureAdapter<GridCommunicationClient> fut) { |
| this.ses = ses; |
| this.recoveryDesc = recoveryDesc; |
| this.rmtNode = rmtNode; |
| this.msg = msg; |
| this.createClient = createClient; |
| this.fut = fut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void apply(Boolean success) { |
| if (success) { |
| IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> msgFut) { |
| try { |
| msgFut.get(); |
| |
| GridTcpNioCommunicationClient client = |
| connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient); |
| |
| fut.onDone(client); |
| } |
| catch (IgniteCheckedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send recovery handshake " + |
| "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); |
| |
| recoveryDesc.release(); |
| |
| fut.onDone(); |
| } |
| finally { |
| clientFuts.remove(rmtNode.id(), fut); |
| } |
| } |
| }; |
| |
| nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.receivedCount()), lsnr); |
| } |
| else { |
| try { |
| fut.onDone(); |
| } |
| finally { |
| clientFuts.remove(rmtNode.id(), fut); |
| } |
| } |
| } |
| } |
| }; |
| |
| /** Logger. */ |
| @LoggerResource |
| private IgniteLogger log; |
| |
| /** Local IP address. */ |
| private String locAddr; |
| |
| /** Complex variable that represents this node IP address. */ |
| private volatile InetAddress locHost; |
| |
| /** Local port which node uses. */ |
| private int locPort = DFLT_PORT; |
| |
| /** Local port range. */ |
| private int locPortRange = DFLT_PORT_RANGE; |
| |
| /** Local port which node uses to accept shared memory connections. */ |
| private int shmemPort = DFLT_SHMEM_PORT; |
| |
| /** Allocate direct buffer or heap buffer. */ |
| private boolean directBuf = true; |
| |
| /** Allocate direct buffer or heap buffer. */ |
| private boolean directSndBuf; |
| |
| /** Idle connection timeout. */ |
| private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT; |
| |
| /** Connect timeout. */ |
| private long connTimeout = DFLT_CONN_TIMEOUT; |
| |
| /** Maximum connect timeout. */ |
| private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT; |
| |
| /** Reconnect attempts count. */ |
| @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) |
| private int reconCnt = DFLT_RECONNECT_CNT; |
| |
| /** Socket send buffer. */ |
| private int sockSndBuf = DFLT_SOCK_BUF_SIZE; |
| |
| /** Socket receive buffer. */ |
| private int sockRcvBuf = DFLT_SOCK_BUF_SIZE; |
| |
| /** Message queue limit. */ |
| private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT; |
| |
| /** Slow client queue limit. */ |
| private int slowClientQueueLimit; |
| |
| /** NIO server. */ |
| private GridNioServer<Message> nioSrvr; |
| |
| /** Shared memory server. */ |
| private IpcSharedMemoryServerEndpoint shmemSrv; |
| |
| /** {@code TCP_NODELAY} option value for created sockets. */ |
| private boolean tcpNoDelay = DFLT_TCP_NODELAY; |
| |
| /** Number of received messages after which acknowledgment is sent. */ |
| private int ackSndThreshold = DFLT_ACK_SND_THRESHOLD; |
| |
| /** Maximum number of unacknowledged messages. */ |
| private int unackedMsgsBufSize; |
| |
| /** Socket write timeout. */ |
| private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT; |
| |
| /** Recovery and idle clients handler. */ |
| private CommunicationWorker commWorker; |
| |
| /** Shared memory accept worker. */ |
| private ShmemAcceptWorker shmemAcceptWorker; |
| |
| /** Shared memory workers. */ |
| private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>(); |
| |
| /** Clients. */ |
| private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap(); |
| |
| /** SPI listener. */ |
| private volatile CommunicationListener<Message> lsnr; |
| |
| /** Bound port. */ |
| private int boundTcpPort = -1; |
| |
| /** Bound port for shared memory server. */ |
| private int boundTcpShmemPort = -1; |
| |
| /** Count of selectors to use in TCP server. */ |
| private int selectorsCnt = DFLT_SELECTORS_CNT; |
| |
| /** Address resolver. */ |
| private AddressResolver addrRslvr; |
| |
| /** Local node ID message. */ |
| private NodeIdMessage nodeIdMsg; |
| |
| /** Received messages count. */ |
| private final LongAdder8 rcvdMsgsCnt = new LongAdder8(); |
| |
| /** Sent messages count.*/ |
| private final LongAdder8 sentMsgsCnt = new LongAdder8(); |
| |
| /** Received bytes count. */ |
| private final LongAdder8 rcvdBytesCnt = new LongAdder8(); |
| |
| /** Sent bytes count.*/ |
| private final LongAdder8 sentBytesCnt = new LongAdder8(); |
| |
| /** Context initialization latch. */ |
| private final CountDownLatch ctxInitLatch = new CountDownLatch(1); |
| |
| /** metrics listener. */ |
| private final GridNioMetricsListener metricsLsnr = new GridNioMetricsListener() { |
| @Override public void onBytesSent(int bytesCnt) { |
| sentBytesCnt.add(bytesCnt); |
| } |
| |
| @Override public void onBytesReceived(int bytesCnt) { |
| rcvdBytesCnt.add(bytesCnt); |
| } |
| }; |
| |
| /** Client connect futures. */ |
| private final ConcurrentMap<UUID, GridFutureAdapter<GridCommunicationClient>> clientFuts = |
| GridConcurrentFactory.newMap(); |
| |
| /** */ |
| private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap(); |
| |
| /** Discovery listener. */ |
| private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { |
| @Override public void onEvent(Event evt) { |
| assert evt instanceof DiscoveryEvent; |
| assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; |
| |
| onNodeLeft(((DiscoveryEvent)evt).eventNode().id()); |
| } |
| }; |
| |
| /** |
| * @return {@code True} if ssl enabled. |
| */ |
| private boolean isSslEnabled() { |
| return ignite.configuration().getSslContextFactory() != null; |
| } |
| |
| /** |
| * Sets address resolver. |
| * |
| * @param addrRslvr Address resolver. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setAddressResolver(AddressResolver addrRslvr) { |
| // Injection should not override value already set by Spring or user. |
| if (this.addrRslvr == null) |
| this.addrRslvr = addrRslvr; |
| } |
| |
| /** |
| * Injects resources. |
| * |
| * @param ignite Ignite. |
| */ |
| @IgniteInstanceResource |
| @Override protected void injectResources(Ignite ignite) { |
| super.injectResources(ignite); |
| |
| if (ignite != null) { |
| setAddressResolver(ignite.configuration().getAddressResolver()); |
| setLocalAddress(ignite.configuration().getLocalHost()); |
| } |
| } |
| |
| /** |
| * Sets local host address for socket binding. Note that one node could have |
| * additional addresses beside the loopback one. This configuration |
| * parameter is optional. |
| * |
| * @param locAddr IP address. Default value is any available local |
| * IP address. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setLocalAddress(String locAddr) { |
| // Injection should not override value already set by Spring or user. |
| if (this.locAddr == null) |
| this.locAddr = locAddr; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String getLocalAddress() { |
| return locAddr; |
| } |
| |
| /** |
| * Sets local port for socket binding. |
| * <p> |
| * If not provided, default value is {@link #DFLT_PORT}. |
| * |
| * @param locPort Port number. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setLocalPort(int locPort) { |
| this.locPort = locPort; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getLocalPort() { |
| return locPort; |
| } |
| |
| /** |
| * Sets local port range for local host ports (value must greater than or equal to <tt>0</tt>). |
| * If provided local port (see {@link #setLocalPort(int)}} is occupied, |
| * implementation will try to increment the port number for as long as it is less than |
| * initial value plus this range. |
| * <p> |
| * If port range value is <tt>0</tt>, then implementation will try bind only to the port provided by |
| * {@link #setLocalPort(int)} method and fail if binding to this port did not succeed. |
| * <p> |
| * Local port range is very useful during development when more than one grid nodes need to run |
| * on the same physical machine. |
| * <p> |
| * If not provided, default value is {@link #DFLT_PORT_RANGE}. |
| * |
| * @param locPortRange New local port range. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setLocalPortRange(int locPortRange) { |
| this.locPortRange = locPortRange; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getLocalPortRange() { |
| return locPortRange; |
| } |
| |
| /** |
| * Sets local port to accept shared memory connections. |
| * <p> |
| * If set to {@code -1} shared memory communication will be disabled. |
| * <p> |
| * If not provided, default value is {@link #DFLT_SHMEM_PORT}. |
| * |
| * @param shmemPort Port number. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setSharedMemoryPort(int shmemPort) { |
| this.shmemPort = shmemPort; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getSharedMemoryPort() { |
| return shmemPort; |
| } |
| |
| /** |
| * Sets maximum idle connection timeout upon which a connection |
| * to client will be closed. |
| * <p> |
| * If not provided, default value is {@link #DFLT_IDLE_CONN_TIMEOUT}. |
| * |
| * @param idleConnTimeout Maximum idle connection time. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setIdleConnectionTimeout(long idleConnTimeout) { |
| this.idleConnTimeout = idleConnTimeout; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long getIdleConnectionTimeout() { |
| return idleConnTimeout; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long getSocketWriteTimeout() { |
| return sockWriteTimeout; |
| } |
| |
| /** |
| * Sets socket write timeout for TCP connection. If message can not be written to |
| * socket within this time then connection is closed and reconnect is attempted. |
| * <p> |
| * Default to {@link #DFLT_SOCK_WRITE_TIMEOUT}. |
| * |
| * @param sockWriteTimeout Socket write timeout for TCP connection. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setSocketWriteTimeout(long sockWriteTimeout) { |
| this.sockWriteTimeout = sockWriteTimeout; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getAckSendThreshold() { |
| return ackSndThreshold; |
| } |
| |
| /** |
| * Sets number of received messages per connection to node after which acknowledgment message is sent. |
| * <p> |
| * Default to {@link #DFLT_ACK_SND_THRESHOLD}. |
| * |
| * @param ackSndThreshold Number of received messages after which acknowledgment is sent. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setAckSendThreshold(int ackSndThreshold) { |
| this.ackSndThreshold = ackSndThreshold; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getUnacknowledgedMessagesBufferSize() { |
| return unackedMsgsBufSize; |
| } |
| |
| /** |
| * Sets maximum number of stored unacknowledged messages per connection to node. |
| * If number of unacknowledged messages exceeds this number then connection to node is |
| * closed and reconnect is attempted. |
| * |
| * @param unackedMsgsBufSize Maximum number of unacknowledged messages. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setUnacknowledgedMessagesBufferSize(int unackedMsgsBufSize) { |
| this.unackedMsgsBufSize = unackedMsgsBufSize; |
| } |
| |
| /** |
| * Sets connection buffer size. If set to {@code 0} connection buffer is disabled. |
| * |
| * @param connBufSize Connection buffer size. |
| * @see #setConnectionBufferFlushFrequency(long) |
| * @deprecated Not used any more. |
| */ |
| @Deprecated |
| @IgniteSpiConfiguration(optional = true) |
| public void setConnectionBufferSize(int connBufSize) { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Deprecated |
| @Override public int getConnectionBufferSize() { |
| return 0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Deprecated |
| @IgniteSpiConfiguration(optional = true) |
| @Override public void setConnectionBufferFlushFrequency(long connBufFlushFreq) { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Deprecated |
| @Override public long getConnectionBufferFlushFrequency() { |
| return 0; |
| } |
| |
| /** |
| * Sets connect timeout used when establishing connection |
| * with remote nodes. |
| * <p> |
| * {@code 0} is interpreted as infinite timeout. |
| * <p> |
| * If not provided, default value is {@link #DFLT_CONN_TIMEOUT}. |
| * |
| * @param connTimeout Connect timeout. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setConnectTimeout(long connTimeout) { |
| this.connTimeout = connTimeout; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long getConnectTimeout() { |
| return connTimeout; |
| } |
| |
| /** |
| * Sets maximum connect timeout. If handshake is not established within connect timeout, |
| * then SPI tries to repeat handshake procedure with increased connect timeout. |
| * Connect timeout can grow till maximum timeout value, |
| * if maximum timeout value is reached then the handshake is considered as failed. |
| * <p> |
| * {@code 0} is interpreted as infinite timeout. |
| * <p> |
| * If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}. |
| * |
| * @param maxConnTimeout Maximum connect timeout. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setMaxConnectTimeout(long maxConnTimeout) { |
| this.maxConnTimeout = maxConnTimeout; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long getMaxConnectTimeout() { |
| return maxConnTimeout; |
| } |
| |
| /** |
| * Sets maximum number of reconnect attempts used when establishing connection |
| * with remote nodes. |
| * <p> |
| * If not provided, default value is {@link #DFLT_RECONNECT_CNT}. |
| * |
| * @param reconCnt Maximum number of reconnection attempts. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setReconnectCount(int reconCnt) { |
| this.reconCnt = reconCnt; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getReconnectCount() { |
| return reconCnt; |
| } |
| |
| /** |
| * Sets flag to allocate direct or heap buffer in SPI. |
| * If value is {@code true}, then SPI will use {@link ByteBuffer#allocateDirect(int)} call. |
| * Otherwise, SPI will use {@link ByteBuffer#allocate(int)} call. |
| * <p> |
| * If not provided, default value is {@code true}. |
| * |
| * @param directBuf Flag indicates to allocate direct or heap buffer in SPI. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setDirectBuffer(boolean directBuf) { |
| this.directBuf = directBuf; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isDirectBuffer() { |
| return directBuf; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isDirectSendBuffer() { |
| return directSndBuf; |
| } |
| |
| /** |
| * Sets whether to use direct buffer for sending. |
| * <p> |
| * If not provided default is {@code false}. |
| * |
| * @param directSndBuf {@code True} to use direct buffers for send. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setDirectSendBuffer(boolean directSndBuf) { |
| this.directSndBuf = directSndBuf; |
| } |
| |
| /** |
| * Sets the count of selectors te be used in TCP server. |
| * <p/> |
| * If not provided, default value is {@link #DFLT_SELECTORS_CNT}. |
| * |
| * @param selectorsCnt Selectors count. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setSelectorsCount(int selectorsCnt) { |
| this.selectorsCnt = selectorsCnt; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getSelectorsCount() { |
| return selectorsCnt; |
| } |
| |
| /** |
| * Sets value for {@code TCP_NODELAY} socket option. Each |
| * socket will be opened using provided value. |
| * <p> |
| * Setting this option to {@code true} disables Nagle's algorithm |
| * for socket decreasing latency and delivery time for small messages. |
| * <p> |
| * For systems that work under heavy network load it is advisable to |
| * set this value to {@code false}. |
| * <p> |
| * If not provided, default value is {@link #DFLT_TCP_NODELAY}. |
| * |
| * @param tcpNoDelay {@code True} to disable TCP delay. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setTcpNoDelay(boolean tcpNoDelay) { |
| this.tcpNoDelay = tcpNoDelay; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isTcpNoDelay() { |
| return tcpNoDelay; |
| } |
| |
| /** |
| * Sets receive buffer size for sockets created or accepted by this SPI. |
| * <p> |
| * If not provided, default is {@link #DFLT_SOCK_BUF_SIZE}. |
| * |
| * @param sockRcvBuf Socket receive buffer size. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setSocketReceiveBuffer(int sockRcvBuf) { |
| this.sockRcvBuf = sockRcvBuf; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getSocketReceiveBuffer() { |
| return sockRcvBuf; |
| } |
| |
| /** |
| * Sets send buffer size for sockets created or accepted by this SPI. |
| * <p> |
| * If not provided, default is {@link #DFLT_SOCK_BUF_SIZE}. |
| * |
| * @param sockSndBuf Socket send buffer size. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setSocketSendBuffer(int sockSndBuf) { |
| this.sockSndBuf = sockSndBuf; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getSocketSendBuffer() { |
| return sockSndBuf; |
| } |
| |
| /** |
| * Sets message queue limit for incoming and outgoing messages. |
| * <p> |
| * When set to positive number send queue is limited to the configured value. |
| * {@code 0} disables the size limitations. |
| * <p> |
| * If not provided, default is {@link #DFLT_MSG_QUEUE_LIMIT}. |
| * |
| * @param msgQueueLimit Send queue size limit. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| public void setMessageQueueLimit(int msgQueueLimit) { |
| this.msgQueueLimit = msgQueueLimit; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getMessageQueueLimit() { |
| return msgQueueLimit; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getSlowClientQueueLimit() { |
| return slowClientQueueLimit; |
| } |
| |
| /** |
| * Sets slow client queue limit. |
| * <p/> |
| * When set to a positive number, communication SPI will monitor clients outbound message queue sizes and will drop |
| * those clients whose queue exceeded this limit. |
| * <p/> |
| * Usually this value should be set to the same value as {@link #getMessageQueueLimit()} which controls |
| * message back-pressure for server nodes. The default value for this parameter is {@code 0} |
| * which means {@code unlimited}. |
| * |
| * @param slowClientQueueLimit Slow client queue limit. |
| */ |
| public void setSlowClientQueueLimit(int slowClientQueueLimit) { |
| this.slowClientQueueLimit = slowClientQueueLimit; |
| } |
| |
| /** |
| * Sets the minimum number of messages for this SPI, that are buffered |
| * prior to sending. |
| * |
| * @param minBufferedMsgCnt Minimum buffered message count. |
| * @deprecated Not used any more. |
| */ |
| @IgniteSpiConfiguration(optional = true) |
| @Deprecated |
| public void setMinimumBufferedMessageCount(int minBufferedMsgCnt) { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Deprecated |
| @Override public int getMinimumBufferedMessageCount() { |
| return 0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setListener(CommunicationListener<Message> lsnr) { |
| this.lsnr = lsnr; |
| } |
| |
| /** |
| * @return Listener. |
| */ |
| public CommunicationListener getListener() { |
| return lsnr; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getSentMessagesCount() { |
| return sentMsgsCnt.intValue(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long getSentBytesCount() { |
| return sentBytesCnt.longValue(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getReceivedMessagesCount() { |
| return rcvdMsgsCnt.intValue(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long getReceivedBytesCount() { |
| return rcvdBytesCnt.longValue(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getOutboundMessagesQueueSize() { |
| return nioSrvr.outboundMessagesQueueSize(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void resetMetrics() { |
| // Can't use 'reset' method because it is not thread-safe |
| // according to javadoc. |
| sentMsgsCnt.add(-sentMsgsCnt.sum()); |
| rcvdMsgsCnt.add(-rcvdMsgsCnt.sum()); |
| sentBytesCnt.add(-sentBytesCnt.sum()); |
| rcvdBytesCnt.add(-rcvdBytesCnt.sum()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException { |
| nodeIdMsg = new NodeIdMessage(getLocalNodeId()); |
| |
| assertParameter(locPort > 1023, "locPort > 1023"); |
| assertParameter(locPort <= 0xffff, "locPort < 0xffff"); |
| assertParameter(locPortRange >= 0, "locPortRange >= 0"); |
| assertParameter(idleConnTimeout > 0, "idleConnTimeout > 0"); |
| assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0"); |
| assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0"); |
| assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0"); |
| assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1"); |
| assertParameter(reconCnt > 0, "reconnectCnt > 0"); |
| assertParameter(selectorsCnt > 0, "selectorsCnt > 0"); |
| assertParameter(connTimeout >= 0, "connTimeout >= 0"); |
| assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout"); |
| assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0"); |
| assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0"); |
| assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0"); |
| |
| if (unackedMsgsBufSize > 0) { |
| assertParameter(unackedMsgsBufSize >= msgQueueLimit * 5, |
| "Specified 'unackedMsgsBufSize' is too low, it should be at least 'msgQueueLimit * 5'."); |
| |
| assertParameter(unackedMsgsBufSize >= ackSndThreshold * 5, |
| "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'."); |
| } |
| |
| try { |
| locHost = U.resolveLocalHost(locAddr); |
| } |
| catch (IOException e) { |
| throw new IgniteSpiException("Failed to initialize local address: " + locAddr, e); |
| } |
| |
| try { |
| shmemSrv = resetShmemServer(); |
| } |
| catch (IgniteCheckedException e) { |
| U.warn(log, "Failed to start shared memory communication server.", e); |
| } |
| |
| try { |
| // This method potentially resets local port to the value |
| // local node was bound to. |
| nioSrvr = resetNioServer(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteSpiException("Failed to initialize TCP server: " + locHost, e); |
| } |
| |
| // Set local node attributes. |
| try { |
| IgniteBiTuple<Collection<String>, Collection<String>> addrs = U.resolveLocalAddresses(locHost); |
| |
| Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null : |
| U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())), boundTcpPort); |
| |
| return F.asMap( |
| createSpiAttributeName(ATTR_ADDRS), addrs.get1(), |
| createSpiAttributeName(ATTR_HOST_NAMES), addrs.get2(), |
| createSpiAttributeName(ATTR_PORT), boundTcpPort, |
| createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort : null, |
| createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs); |
| } |
| catch (IOException | IgniteCheckedException e) { |
| throw new IgniteSpiException("Failed to resolve local host to addresses: " + locHost, e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void spiStart(String gridName) throws IgniteSpiException { |
| assert locHost != null; |
| |
| // Start SPI start stopwatch. |
| startStopwatch(); |
| |
| // Ack parameters. |
| if (log.isDebugEnabled()) { |
| log.debug(configInfo("locAddr", locAddr)); |
| log.debug(configInfo("locPort", locPort)); |
| log.debug(configInfo("locPortRange", locPortRange)); |
| log.debug(configInfo("idleConnTimeout", idleConnTimeout)); |
| log.debug(configInfo("directBuf", directBuf)); |
| log.debug(configInfo("directSendBuf", directSndBuf)); |
| log.debug(configInfo("selectorsCnt", selectorsCnt)); |
| log.debug(configInfo("tcpNoDelay", tcpNoDelay)); |
| log.debug(configInfo("sockSndBuf", sockSndBuf)); |
| log.debug(configInfo("sockRcvBuf", sockRcvBuf)); |
| log.debug(configInfo("shmemPort", shmemPort)); |
| log.debug(configInfo("msgQueueLimit", msgQueueLimit)); |
| log.debug(configInfo("connTimeout", connTimeout)); |
| log.debug(configInfo("maxConnTimeout", maxConnTimeout)); |
| log.debug(configInfo("reconCnt", reconCnt)); |
| log.debug(configInfo("sockWriteTimeout", sockWriteTimeout)); |
| log.debug(configInfo("ackSndThreshold", ackSndThreshold)); |
| log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize)); |
| } |
| |
| if (!tcpNoDelay) |
| U.quietAndWarn(log, "'TCP_NO_DELAY' for communication is off, which should be used with caution " + |
| "since may produce significant delays with some scenarios."); |
| |
| if (slowClientQueueLimit > 0 && msgQueueLimit > 0 && slowClientQueueLimit >= msgQueueLimit) { |
| U.quietAndWarn(log, "Slow client queue limit is set to a value greater than message queue limit " + |
| "(slow client queue limit will have no effect) [msgQueueLimit=" + msgQueueLimit + |
| ", slowClientQueueLimit=" + slowClientQueueLimit + ']'); |
| } |
| |
| registerMBean(gridName, this, TcpCommunicationSpiMBean.class); |
| |
| if (shmemSrv != null) { |
| shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv); |
| |
| new IgniteThread(shmemAcceptWorker).start(); |
| } |
| |
| nioSrvr.start(); |
| |
| commWorker = new CommunicationWorker(); |
| |
| commWorker.start(); |
| |
| // Ack start. |
| if (log.isDebugEnabled()) |
| log.debug(startInfo()); |
| } |
| |
| /** {@inheritDoc} }*/ |
| @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { |
| spiCtx.registerPort(boundTcpPort, IgnitePortProtocol.TCP); |
| |
| // SPI can start without shmem port. |
| if (boundTcpShmemPort > 0) |
| spiCtx.registerPort(boundTcpShmemPort, IgnitePortProtocol.TCP); |
| |
| spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); |
| |
| ctxInitLatch.countDown(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteSpiContext getSpiContext() { |
| if (ctxInitLatch.getCount() > 0) { |
| if (log.isDebugEnabled()) |
| log.debug("Waiting for context initialization."); |
| |
| try { |
| U.await(ctxInitLatch); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Context has been initialized."); |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| U.warn(log, "Thread has been interrupted while waiting for SPI context initialization.", e); |
| } |
| } |
| |
| return super.getSpiContext(); |
| } |
| |
| /** |
| * Recreates tpcSrvr socket instance. |
| * |
| * @return Server instance. |
| * @throws IgniteCheckedException Thrown if it's not possible to create server. |
| */ |
| private GridNioServer<Message> resetNioServer() throws IgniteCheckedException { |
| if (boundTcpPort >= 0) |
| throw new IgniteCheckedException("Tcp NIO server was already created on port " + boundTcpPort); |
| |
| IgniteCheckedException lastEx = null; |
| |
| // If configured TCP port is busy, find first available in range. |
| for (int port = locPort; port < locPort + locPortRange; port++) { |
| try { |
| MessageFactory msgFactory = new MessageFactory() { |
| private MessageFactory impl; |
| |
| @Nullable @Override public Message create(byte type) { |
| if (impl == null) |
| impl = getSpiContext().messageFactory(); |
| |
| assert impl != null; |
| |
| return impl.create(type); |
| } |
| }; |
| |
| MessageFormatter msgFormatter = new MessageFormatter() { |
| private MessageFormatter impl; |
| |
| @Override public MessageWriter writer() { |
| if (impl == null) |
| impl = getSpiContext().messageFormatter(); |
| |
| assert impl != null; |
| |
| return impl.writer(); |
| } |
| |
| @Override public MessageReader reader(MessageFactory factory) { |
| if (impl == null) |
| impl = getSpiContext().messageFormatter(); |
| |
| assert impl != null; |
| |
| return impl.reader(factory); |
| } |
| }; |
| |
| GridDirectParser parser = new GridDirectParser(msgFactory, msgFormatter); |
| |
| IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() { |
| @Override public boolean apply(Message msg) { |
| return msg instanceof RecoveryLastReceivedMessage; |
| } |
| }; |
| |
| boolean clientMode = Boolean.TRUE.equals(ignite.configuration().isClientMode()); |
| |
| IgniteBiInClosure<GridNioSession, Integer> queueSizeMonitor = |
| !clientMode && slowClientQueueLimit > 0 ? |
| new CI2<GridNioSession, Integer>() { |
| @Override public void apply(GridNioSession ses, Integer qSize) { |
| checkClientQueueSize(ses, qSize); |
| } |
| } : |
| null; |
| |
| GridNioFilter[] filters; |
| |
| if (isSslEnabled()) { |
| GridNioSslFilter sslFilter = |
| new GridNioSslFilter(ignite.configuration().getSslContextFactory().create(), |
| true, ByteOrder.nativeOrder(), log); |
| |
| sslFilter.directMode(true); |
| |
| sslFilter.wantClientAuth(true); |
| sslFilter.needClientAuth(true); |
| |
| filters = new GridNioFilter[] { |
| new GridNioCodecFilter(parser, log, true), |
| new GridConnectionBytesVerifyFilter(log), |
| sslFilter |
| }; |
| } |
| else |
| filters = new GridNioFilter[] { |
| new GridNioCodecFilter(parser, log, true), |
| new GridConnectionBytesVerifyFilter(log) |
| }; |
| |
| GridNioServer<Message> srvr = |
| GridNioServer.<Message>builder() |
| .address(locHost) |
| .port(port) |
| .listener(srvLsnr) |
| .logger(log) |
| .selectorCount(selectorsCnt) |
| .gridName(gridName) |
| .tcpNoDelay(tcpNoDelay) |
| .directBuffer(directBuf) |
| .byteOrder(ByteOrder.nativeOrder()) |
| .socketSendBufferSize(sockSndBuf) |
| .socketReceiveBufferSize(sockRcvBuf) |
| .sendQueueLimit(msgQueueLimit) |
| .directMode(true) |
| .metricsListener(metricsLsnr) |
| .writeTimeout(sockWriteTimeout) |
| .filters(filters) |
| .messageFormatter(msgFormatter) |
| .skipRecoveryPredicate(skipRecoveryPred) |
| .messageQueueSizeListener(queueSizeMonitor) |
| .build(); |
| |
| boundTcpPort = port; |
| |
| // Ack Port the TCP server was bound to. |
| if (log.isInfoEnabled()) |
| log.info("Successfully bound to TCP port [port=" + boundTcpPort + |
| ", locHost=" + locHost + ']'); |
| |
| srvr.idleTimeout(idleConnTimeout); |
| |
| return srvr; |
| } |
| catch (IgniteCheckedException e) { |
| if (X.hasCause(e, SSLException.class)) |
| throw new IgniteSpiException("Failed to create SSL context. SSL factory: " |
| + ignite.configuration().getSslContextFactory() + '.', e); |
| |
| lastEx = e; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Failed to bind to local port (will try next port within range) [port=" + port + |
| ", locHost=" + locHost + ']'); |
| |
| onException("Failed to bind to local port (will try next port within range) [port=" + port + |
| ", locHost=" + locHost + ']', e); |
| } |
| } |
| |
| // If free port wasn't found. |
| throw new IgniteCheckedException("Failed to bind to any port within range [startPort=" + locPort + |
| ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx); |
| } |
| |
| /** |
| * Creates new shared memory communication server. |
| * @return Server. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Nullable private IpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException { |
| if (boundTcpShmemPort >= 0) |
| throw new IgniteCheckedException("Shared memory server was already created on port " + boundTcpShmemPort); |
| |
| if (shmemPort == -1 || U.isWindows()) |
| return null; |
| |
| IgniteCheckedException lastEx = null; |
| |
| // If configured TCP port is busy, find first available in range. |
| for (int port = shmemPort; port < shmemPort + locPortRange; port++) { |
| try { |
| IpcSharedMemoryServerEndpoint srv = |
| new IpcSharedMemoryServerEndpoint(log, ignite.configuration().getNodeId(), gridName); |
| |
| srv.setPort(port); |
| |
| srv.omitOutOfResourcesWarning(true); |
| |
| srv.start(); |
| |
| boundTcpShmemPort = port; |
| |
| // Ack Port the TCP server was bound to. |
| if (log.isInfoEnabled()) |
| log.info("Successfully bound shared memory communication to TCP port [port=" + boundTcpShmemPort + |
| ", locHost=" + locHost + ']'); |
| |
| return srv; |
| } |
| catch (IgniteCheckedException e) { |
| lastEx = e; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Failed to bind to local port (will try next port within range) [port=" + port + |
| ", locHost=" + locHost + ']'); |
| } |
| } |
| |
| // If free port wasn't found. |
| throw new IgniteCheckedException("Failed to bind shared memory communication to any port within range [startPort=" + |
| locPort + ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void spiStop() throws IgniteSpiException { |
| assert isNodeStopping(); |
| |
| unregisterMBean(); |
| |
| // Stop TCP server. |
| if (nioSrvr != null) |
| nioSrvr.stop(); |
| |
| U.interrupt(commWorker); |
| U.join(commWorker, log); |
| |
| U.cancel(shmemAcceptWorker); |
| U.join(shmemAcceptWorker, log); |
| |
| U.cancel(shmemWorkers); |
| U.join(shmemWorkers, log); |
| |
| shmemWorkers.clear(); |
| |
| // Force closing on stop (safety). |
| for (GridCommunicationClient client : clients.values()) |
| client.forceClose(); |
| |
| // Clear resources. |
| nioSrvr = null; |
| commWorker = null; |
| |
| boundTcpPort = -1; |
| |
| // Ack stop. |
| if (log.isDebugEnabled()) |
| log.debug(stopInfo()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void onContextDestroyed0() { |
| if (ctxInitLatch.getCount() > 0) |
| // Safety. |
| ctxInitLatch.countDown(); |
| |
| // Force closing. |
| for (GridCommunicationClient client : clients.values()) |
| client.forceClose(); |
| |
| getSpiContext().deregisterPorts(); |
| |
| getSpiContext().removeLocalEventListener(discoLsnr); |
| } |
| |
| /** |
| * @param nodeId Left node ID. |
| */ |
| void onNodeLeft(UUID nodeId) { |
| assert nodeId != null; |
| |
| GridCommunicationClient client = clients.get(nodeId); |
| |
| if (client != null) { |
| if (log.isDebugEnabled()) |
| log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId + |
| ", client=" + client + ']'); |
| |
| client.forceClose(); |
| |
| clients.remove(nodeId, client); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void checkConfigurationConsistency0(IgniteSpiContext spiCtx, ClusterNode node, boolean starting) |
| throws IgniteSpiException { |
| // These attributes are set on node startup in any case, so we MUST receive them. |
| checkAttributePresence(node, createSpiAttributeName(ATTR_ADDRS)); |
| checkAttributePresence(node, createSpiAttributeName(ATTR_HOST_NAMES)); |
| checkAttributePresence(node, createSpiAttributeName(ATTR_PORT)); |
| } |
| |
| /** |
| * Checks that node has specified attribute and prints warning if it does not. |
| * |
| * @param node Node to check. |
| * @param attrName Name of the attribute. |
| */ |
| private void checkAttributePresence(ClusterNode node, String attrName) { |
| if (node.attribute(attrName) == null) |
| U.warn(log, "Remote node has inconsistent configuration (required attribute was not found) " + |
| "[attrName=" + attrName + ", nodeId=" + node.id() + |
| "spiCls=" + U.getSimpleName(TcpCommunicationSpi.class) + ']'); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { |
| assert node != null; |
| assert msg != null; |
| |
| if (log.isTraceEnabled()) |
| log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']'); |
| |
| UUID locNodeId = getLocalNodeId(); |
| |
| if (node.id().equals(locNodeId)) |
| notifyListener(locNodeId, msg, NOOP); |
| else { |
| GridCommunicationClient client = null; |
| |
| try { |
| boolean retry; |
| |
| do { |
| client = reserveClient(node); |
| |
| UUID nodeId = null; |
| |
| if (!client.async() && !getSpiContext().localNode().version().equals(node.version())) |
| nodeId = node.id(); |
| |
| retry = client.sendMessage(nodeId, msg); |
| |
| client.release(); |
| |
| client = null; |
| |
| if (!retry) |
| sentMsgsCnt.increment(); |
| else { |
| ClusterNode node0 = getSpiContext().node(node.id()); |
| |
| if (node0 == null) |
| throw new IgniteCheckedException("Failed to send message to remote node " + |
| "(node has left the grid): " + node.id()); |
| } |
| } |
| while (retry); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteSpiException("Failed to send message to remote node: " + node, e); |
| } |
| finally { |
| if (client != null && clients.remove(node.id(), client)) |
| client.forceClose(); |
| } |
| } |
| } |
| |
| /** |
| * Returns existing or just created client to node. |
| * |
| * @param node Node to which client should be open. |
| * @return The existing or just created client. |
| * @throws IgniteCheckedException Thrown if any exception occurs. |
| */ |
| private GridCommunicationClient reserveClient(ClusterNode node) throws IgniteCheckedException { |
| assert node != null; |
| |
| UUID nodeId = node.id(); |
| |
| while (true) { |
| GridCommunicationClient client = clients.get(nodeId); |
| |
| if (client == null) { |
| if (isNodeStopping()) |
| throw new IgniteSpiException("Node is stopping."); |
| |
| // Do not allow concurrent connects. |
| GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture(); |
| |
| GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(nodeId, fut); |
| |
| if (oldFut == null) { |
| try { |
| GridCommunicationClient client0 = clients.get(nodeId); |
| |
| if (client0 == null) { |
| client0 = createNioClient(node); |
| |
| if (client0 != null) { |
| GridCommunicationClient old = clients.put(nodeId, client0); |
| |
| assert old == null : "Client already created " + |
| "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']'; |
| |
| if (client0 instanceof GridTcpNioCommunicationClient) { |
| GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0); |
| |
| if (tcpClient.session().closeTime() > 0 && clients.remove(nodeId, client0)) { |
| if (log.isDebugEnabled()) |
| log.debug("Session was closed after client creation, will retry " + |
| "[node=" + node + ", client=" + client0 + ']'); |
| |
| client0 = null; |
| } |
| } |
| } |
| else |
| U.sleep(200); |
| } |
| |
| fut.onDone(client0); |
| } |
| catch (Throwable e) { |
| fut.onDone(e); |
| |
| if (e instanceof Error) |
| throw (Error)e; |
| } |
| finally { |
| clientFuts.remove(nodeId, fut); |
| } |
| } |
| else |
| fut = oldFut; |
| |
| client = fut.get(); |
| |
| if (client == null) |
| continue; |
| |
| if (getSpiContext().node(nodeId) == null) { |
| if (clients.remove(nodeId, client)) |
| client.forceClose(); |
| |
| throw new IgniteSpiException("Destination node is not in topology: " + node.id()); |
| } |
| } |
| |
| if (client.reserve()) |
| return client; |
| else |
| // Client has just been closed by idle worker. Help it and try again. |
| clients.remove(nodeId, client); |
| } |
| } |
| |
| /** |
| * @param node Node to create client for. |
| * @return Client. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Nullable protected GridCommunicationClient createNioClient(ClusterNode node) throws IgniteCheckedException { |
| assert node != null; |
| |
| Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT)); |
| |
| ClusterNode locNode = getSpiContext().localNode(); |
| |
| if (locNode == null) |
| throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)"); |
| |
| // If remote node has shared memory server enabled and has the same set of MACs |
| // then we are likely to run on the same host and shared memory communication could be tried. |
| if (shmemPort != null && U.sameMacs(locNode, node)) { |
| try { |
| return createShmemClient(node, shmemPort); |
| } |
| catch (IgniteCheckedException e) { |
| if (e.hasCause(IpcOutOfSystemResourcesException.class)) |
| // Has cause or is itself the IpcOutOfSystemResourcesException. |
| LT.warn(log, null, OUT_OF_RESOURCES_TCP_MSG); |
| else if (getSpiContext().node(node.id()) != null) |
| LT.warn(log, null, e.getMessage()); |
| else if (log.isDebugEnabled()) |
| log.debug("Failed to establish shared memory connection with local node (node has left): " + |
| node.id()); |
| } |
| } |
| |
| return createTcpClient(node); |
| } |
| |
| /** |
| * @param node Node. |
| * @param port Port. |
| * @return Client. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, Integer port) throws IgniteCheckedException { |
| int attempt = 1; |
| |
| int connectAttempts = 1; |
| |
| long connTimeout0 = connTimeout; |
| |
| while (true) { |
| GridCommunicationClient client; |
| |
| try { |
| client = new GridShmemCommunicationClient(metricsLsnr, |
| port, |
| connTimeout, |
| log, |
| getSpiContext().messageFormatter()); |
| } |
| catch (IgniteCheckedException e) { |
| // Reconnect for the second time, if connection is not established. |
| if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) { |
| connectAttempts++; |
| |
| continue; |
| } |
| |
| throw e; |
| } |
| |
| try { |
| safeHandshake(client, null, node.id(), connTimeout0, null); |
| } |
| catch (HandshakeTimeoutException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 + |
| ", err=" + e.getMessage() + ", client=" + client + ']'); |
| |
| client.forceClose(); |
| |
| if (attempt == reconCnt || connTimeout0 > maxConnTimeout) { |
| if (log.isDebugEnabled()) |
| log.debug("Handshake timedout (will stop attempts to perform the handshake) " + |
| "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout + |
| ", attempt=" + attempt + ", reconCnt=" + reconCnt + |
| ", err=" + e.getMessage() + ", client=" + client + ']'); |
| |
| throw e; |
| } |
| else { |
| attempt++; |
| |
| connTimeout0 *= 2; |
| |
| continue; |
| } |
| } |
| catch (IgniteCheckedException | RuntimeException | Error e) { |
| if (log.isDebugEnabled()) |
| log.debug( |
| "Caught exception (will close client) [err=" + e.getMessage() + ", client=" + client + ']'); |
| |
| client.forceClose(); |
| |
| throw e; |
| } |
| |
| return client; |
| } |
| } |
| |
| /** |
| * Checks client message queue size and initiates client drop if message queue size exceeds the configured limit. |
| * |
| * @param ses Node communication session. |
| * @param msgQueueSize Message queue size. |
| */ |
| private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) { |
| if (slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit) { |
| UUID id = ses.meta(NODE_ID_META); |
| |
| if (id != null) { |
| ClusterNode node = getSpiContext().node(id); |
| |
| if (node != null && node.isClient()) { |
| String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " + |
| "the client will be dropped " + |
| "(consider changing 'slowClientQueueLimit' configuration property) " + |
| "[srvNode=" + getSpiContext().localNode().id() + |
| ", clientNode=" + node + |
| ", slowClientQueueLimit=" + slowClientQueueLimit + ']'; |
| |
| U.quietAndWarn( |
| log, |
| msg); |
| |
| getSpiContext().failNode(id, msg); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Establish TCP connection to remote node and returns client. |
| * |
| * @param node Remote node. |
| * @return Client. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException { |
| Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS)); |
| Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES)); |
| Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT)); |
| Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS)); |
| |
| boolean isRmtAddrsExist = (!F.isEmpty(rmtAddrs0) && boundPort != null); |
| boolean isExtAddrsExist = !F.isEmpty(extAddrs); |
| |
| if (!isRmtAddrsExist && !isExtAddrsExist) |
| throw new IgniteCheckedException("Failed to send message to the destination node. Node doesn't have any " + |
| "TCP communication addresses or mapped external addresses. Check configuration and make sure " + |
| "that you use the same communication SPI on all nodes. Remote node id: " + node.id()); |
| |
| LinkedHashSet<InetSocketAddress> addrs; |
| |
| // Try to connect first on bound addresses. |
| if (isRmtAddrsExist) { |
| List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort)); |
| |
| boolean sameHost = U.sameMacs(getSpiContext().localNode(), node); |
| |
| Collections.sort(addrs0, U.inetAddressesComparator(sameHost)); |
| |
| addrs = new LinkedHashSet<>(addrs0); |
| } |
| else |
| addrs = new LinkedHashSet<>(); |
| |
| // Then on mapped external addresses. |
| if (isExtAddrsExist) |
| addrs.addAll(extAddrs); |
| |
| boolean conn = false; |
| GridCommunicationClient client = null; |
| IgniteCheckedException errs = null; |
| |
| int connectAttempts = 1; |
| |
| for (InetSocketAddress addr : addrs) { |
| long connTimeout0 = connTimeout; |
| |
| int attempt = 1; |
| |
| while (!conn) { // Reconnection on handshake timeout. |
| try { |
| SocketChannel ch = SocketChannel.open(); |
| |
| ch.configureBlocking(true); |
| |
| ch.socket().setTcpNoDelay(tcpNoDelay); |
| ch.socket().setKeepAlive(true); |
| |
| if (sockRcvBuf > 0) |
| ch.socket().setReceiveBufferSize(sockRcvBuf); |
| |
| if (sockSndBuf > 0) |
| ch.socket().setSendBufferSize(sockSndBuf); |
| |
| GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(node); |
| |
| if (!recoveryDesc.reserve()) { |
| U.closeQuiet(ch); |
| |
| return null; |
| } |
| |
| long rcvCnt = -1; |
| |
| SSLEngine sslEngine = null; |
| |
| try { |
| ch.socket().connect(addr, (int)connTimeout); |
| |
| if (isSslEnabled()) { |
| sslEngine = ignite.configuration().getSslContextFactory().create().createSSLEngine(); |
| |
| sslEngine.setUseClientMode(true); |
| } |
| |
| rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0, sslEngine); |
| |
| if (rcvCnt == -1) |
| return null; |
| } |
| finally { |
| if (recoveryDesc != null && rcvCnt == -1) |
| recoveryDesc.release(); |
| } |
| |
| try { |
| Map<Integer, Object> meta = new HashMap<>(); |
| |
| meta.put(NODE_ID_META, node.id()); |
| |
| if (isSslEnabled()) { |
| assert sslEngine != null; |
| |
| meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), sslEngine); |
| } |
| if (recoveryDesc != null) { |
| recoveryDesc.onHandshake(rcvCnt); |
| |
| meta.put(-1, recoveryDesc); |
| } |
| |
| GridNioSession ses = nioSrvr.createSession(ch, meta).get(); |
| |
| client = new GridTcpNioCommunicationClient(ses, log); |
| |
| conn = true; |
| } |
| finally { |
| if (!conn) { |
| if (recoveryDesc != null) |
| recoveryDesc.release(); |
| } |
| } |
| } |
| catch (HandshakeTimeoutException e) { |
| if (client != null) { |
| client.forceClose(); |
| |
| client = null; |
| } |
| |
| onException("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 + |
| ", addr=" + addr + ']', e); |
| |
| if (log.isDebugEnabled()) |
| log.debug( |
| "Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 + |
| ", addr=" + addr + ", err=" + e + ']'); |
| |
| if (attempt == reconCnt || connTimeout0 > maxConnTimeout) { |
| if (log.isDebugEnabled()) |
| log.debug("Handshake timedout (will stop attempts to perform the handshake) " + |
| "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout + |
| ", attempt=" + attempt + ", reconCnt=" + reconCnt + |
| ", err=" + e.getMessage() + ", addr=" + addr + ']'); |
| |
| if (errs == null) |
| errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " + |
| "Make sure that each GridComputeTask and GridCacheTransaction has a timeout set " + |
| "in order to prevent parties from waiting forever in case of network issues " + |
| "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); |
| |
| errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e)); |
| |
| break; |
| } |
| else { |
| attempt++; |
| |
| connTimeout0 *= 2; |
| |
| // Continue loop. |
| } |
| } |
| catch (Exception e) { |
| if (client != null) { |
| client.forceClose(); |
| |
| client = null; |
| } |
| |
| onException("Client creation failed [addr=" + addr + ", err=" + e + ']', e); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']'); |
| |
| if (X.hasCause(e, SocketTimeoutException.class)) |
| LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " + |
| "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']'); |
| |
| if (errs == null) |
| errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " + |
| "Make sure that each GridComputeTask and GridCacheTransaction has a timeout set " + |
| "in order to prevent parties from waiting forever in case of network issues " + |
| "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); |
| |
| errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e)); |
| |
| // Reconnect for the second time, if connection is not established. |
| if (connectAttempts < 2 && |
| (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) { |
| connectAttempts++; |
| |
| continue; |
| } |
| |
| break; |
| } |
| } |
| |
| if (conn) |
| break; |
| } |
| |
| if (client == null) { |
| assert errs != null; |
| |
| if (X.hasCause(errs, ConnectException.class)) |
| LT.warn(log, null, "Failed to connect to a remote node " + |
| "(make sure that destination node is alive and " + |
| "operating system firewall is disabled on local and remote hosts) " + |
| "[addrs=" + addrs + ']'); |
| |
| throw errs; |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Created client: " + client); |
| |
| return client; |
| } |
| |
| /** |
| * Performs handshake in timeout-safe way. |
| * |
| * @param client Client. |
| * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}. |
| * @param rmtNodeId Remote node. |
| * @param timeout Timeout for handshake. |
| * @param ssl SSL engine if used cryptography, otherwise {@code null}. |
| * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout. |
| * @return Handshake response. |
| */ |
| @SuppressWarnings("ThrowFromFinallyBlock") |
| private <T> long safeHandshake( |
| T client, |
| @Nullable GridNioRecoveryDescriptor recovery, |
| UUID rmtNodeId, |
| long timeout, |
| @Nullable SSLEngine ssl |
| ) throws IgniteCheckedException { |
| HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout); |
| |
| addTimeoutObject(obj); |
| |
| long rcvCnt = 0; |
| |
| try { |
| if (client instanceof GridCommunicationClient) |
| ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId)); |
| else { |
| SocketChannel ch = (SocketChannel)client; |
| |
| boolean success = false; |
| |
| try { |
| BlockingSslHandler sslHnd = null; |
| |
| ByteBuffer buf; |
| |
| if (isSslEnabled()) { |
| sslHnd = new BlockingSslHandler(ssl, ch, directBuf, ByteOrder.nativeOrder(), log); |
| |
| if (!sslHnd.handshake()) |
| throw new IgniteCheckedException("SSL handshake is not completed."); |
| |
| ByteBuffer handBuff = sslHnd.applicationBuffer(); |
| |
| if (handBuff.remaining() < 17) { |
| buf = ByteBuffer.allocate(1000); |
| |
| int read = ch.read(buf); |
| |
| if (read == -1) |
| throw new IgniteCheckedException("Failed to read remote node ID (connection closed)."); |
| |
| buf.flip(); |
| |
| buf = sslHnd.decode(buf); |
| } |
| else |
| buf = handBuff; |
| } |
| else { |
| buf = ByteBuffer.allocate(17); |
| |
| for (int i = 0; i < 17; ) { |
| int read = ch.read(buf); |
| |
| if (read == -1) |
| throw new IgniteCheckedException("Failed to read remote node ID (connection closed)."); |
| |
| i += read; |
| } |
| } |
| |
| UUID rmtNodeId0 = U.bytesToUuid(buf.array(), 1); |
| |
| if (!rmtNodeId.equals(rmtNodeId0)) |
| throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId + |
| ", rcvd=" + rmtNodeId0 + ']'); |
| else if (log.isDebugEnabled()) |
| log.debug("Received remote node ID: " + rmtNodeId0); |
| |
| if (isSslEnabled() ) { |
| assert sslHnd != null; |
| |
| ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER))); |
| } |
| else |
| ch.write(ByteBuffer.wrap(U.IGNITE_HEADER)); |
| |
| if (recovery != null) { |
| HandshakeMessage msg = new HandshakeMessage(getLocalNodeId(), |
| recovery.incrementConnectCount(), |
| recovery.receivedCount()); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']'); |
| |
| buf = ByteBuffer.allocate(33); |
| |
| buf.order(ByteOrder.nativeOrder()); |
| |
| boolean written = msg.writeTo(buf, getSpiContext().messageFormatter().writer()); |
| |
| assert written; |
| |
| buf.flip(); |
| |
| if (isSslEnabled()) { |
| assert sslHnd != null; |
| |
| ch.write(sslHnd.encrypt(buf)); |
| } |
| else |
| ch.write(buf); |
| } |
| else { |
| if (isSslEnabled()) { |
| assert sslHnd != null; |
| |
| ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType))); |
| } |
| else |
| ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType)); |
| } |
| |
| if (recovery != null) { |
| if (log.isDebugEnabled()) |
| log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']'); |
| |
| if (isSslEnabled()) { |
| assert sslHnd != null; |
| |
| buf = ByteBuffer.allocate(1000); |
| |
| ByteBuffer decode = null; |
| |
| buf.order(ByteOrder.nativeOrder()); |
| |
| for (int i = 0; i < 9; ) { |
| int read = ch.read(buf); |
| |
| if (read == -1) |
| throw new IgniteCheckedException("Failed to read remote node recovery handshake " + |
| "(connection closed)."); |
| |
| buf.flip(); |
| |
| decode = sslHnd.decode(buf); |
| |
| i += decode.remaining(); |
| |
| buf.flip(); |
| buf.compact(); |
| } |
| |
| rcvCnt = decode.getLong(1); |
| } |
| else { |
| buf = ByteBuffer.allocate(9); |
| |
| buf.order(ByteOrder.nativeOrder()); |
| |
| for (int i = 0; i < 9; ) { |
| int read = ch.read(buf); |
| |
| if (read == -1) |
| throw new IgniteCheckedException("Failed to read remote node recovery handshake " + |
| "(connection closed)."); |
| |
| i += read; |
| } |
| |
| rcvCnt = buf.getLong(1); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']'); |
| |
| if (rcvCnt == -1) { |
| if (log.isDebugEnabled()) |
| log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']'); |
| } |
| else |
| success = true; |
| } |
| else |
| success = true; |
| } |
| catch (IOException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to read from channel: " + e); |
| |
| throw new IgniteCheckedException("Failed to read from channel.", e); |
| } |
| finally { |
| if (!success) |
| U.closeQuiet(ch); |
| } |
| } |
| } |
| finally { |
| boolean cancelled = obj.cancel(); |
| |
| if (cancelled) |
| removeTimeoutObject(obj); |
| |
| // Ignoring whatever happened after timeout - reporting only timeout event. |
| if (!cancelled) |
| throw new HandshakeTimeoutException("Failed to perform handshake due to timeout (consider increasing " + |
| "'connectionTimeout' configuration property)."); |
| } |
| |
| return rcvCnt; |
| } |
| |
| /** |
| * @param sndId Sender ID. |
| * @param msg Communication message. |
| * @param msgC Closure to call when message processing finished. |
| */ |
| protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) { |
| CommunicationListener<Message> lsnr = this.lsnr; |
| |
| if (lsnr != null) |
| // Notify listener of a new message. |
| lsnr.onMessage(sndId, msg, msgC); |
| else if (log.isDebugEnabled()) |
| log.debug("Received communication message without any registered listeners (will ignore, " + |
| "is node stopping?) [senderNodeId=" + sndId + ", msg=" + msg + ']'); |
| } |
| |
| /** |
| * Stops service threads to simulate node failure. |
| * |
| * FOR TEST PURPOSES ONLY!!! |
| */ |
| void simulateNodeFailure() { |
| if (nioSrvr != null) |
| nioSrvr.stop(); |
| |
| U.interrupt(commWorker); |
| |
| U.join(commWorker, log); |
| |
| for (GridCommunicationClient client : clients.values()) |
| client.forceClose(); |
| } |
| |
| /** |
| * @param node Node. |
| * @return Recovery receive data for given node. |
| */ |
| private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node) { |
| ClientKey id = new ClientKey(node.id(), node.order()); |
| |
| GridNioRecoveryDescriptor recovery = recoveryDescs.get(id); |
| |
| if (recovery == null) { |
| int maxSize = Math.max(msgQueueLimit, ackSndThreshold); |
| |
| int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 5); |
| |
| GridNioRecoveryDescriptor old = |
| recoveryDescs.putIfAbsent(id, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log)); |
| |
| if (old != null) |
| recovery = old; |
| } |
| |
| return recovery; |
| } |
| |
| /** |
| * @param msg Error message. |
| * @param e Exception. |
| */ |
| private void onException(String msg, Exception e) { |
| getExceptionRegistry().onException(msg, e); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(TcpCommunicationSpi.class, this); |
| } |
| |
| /** |
| * |
| */ |
| private static class ClientKey { |
| /** */ |
| private UUID nodeId; |
| |
| /** */ |
| private long order; |
| |
| /** |
| * @param nodeId Node ID. |
| * @param order Node order. |
| */ |
| private ClientKey(UUID nodeId, long order) { |
| this.nodeId = nodeId; |
| this.order = order; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean equals(Object obj) { |
| if (this == obj) |
| return true; |
| |
| if (obj == null || getClass() != obj.getClass()) |
| return false; |
| |
| ClientKey other = (ClientKey)obj; |
| |
| return order == other.order && nodeId.equals(other.nodeId); |
| |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hashCode() { |
| int res = nodeId.hashCode(); |
| |
| res = 31 * res + (int)(order ^ (order >>> 32)); |
| |
| return res; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(ClientKey.class, this); |
| } |
| } |
| |
| /** Internal exception class for proper timeout handling. */ |
| private static class HandshakeTimeoutException extends IgniteCheckedException { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * @param msg Message. |
| */ |
| HandshakeTimeoutException(String msg) { |
| super(msg); |
| } |
| } |
| |
| /** |
| * This worker takes responsibility to shut the server down when stopping, |
| * No other thread shall stop passed server. |
| */ |
| private class ShmemAcceptWorker extends GridWorker { |
| /** */ |
| private final IpcSharedMemoryServerEndpoint srv; |
| |
| /** |
| * @param srv Server. |
| */ |
| ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) { |
| super(gridName, "shmem-communication-acceptor", TcpCommunicationSpi.this.log); |
| |
| this.srv = srv; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void body() throws InterruptedException { |
| try { |
| while (!Thread.interrupted()) { |
| ShmemWorker e = new ShmemWorker(srv.accept()); |
| |
| shmemWorkers.add(e); |
| |
| new IgniteThread(e).start(); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| if (!isCancelled()) |
| U.error(log, "Shmem server failed.", e); |
| } |
| finally { |
| srv.close(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void cancel() { |
| super.cancel(); |
| |
| srv.close(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private class ShmemWorker extends GridWorker { |
| /** */ |
| private final IpcEndpoint endpoint; |
| |
| /** |
| * @param endpoint Endpoint. |
| */ |
| private ShmemWorker(IpcEndpoint endpoint) { |
| super(gridName, "shmem-worker", TcpCommunicationSpi.this.log); |
| |
| this.endpoint = endpoint; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void body() throws InterruptedException { |
| try { |
| MessageFactory msgFactory = new MessageFactory() { |
| private MessageFactory impl; |
| |
| @Nullable @Override public Message create(byte type) { |
| if (impl == null) |
| impl = getSpiContext().messageFactory(); |
| |
| assert impl != null; |
| |
| return impl.create(type); |
| } |
| }; |
| |
| MessageFormatter msgFormatter = new MessageFormatter() { |
| private MessageFormatter impl; |
| |
| @Override public MessageWriter writer() { |
| if (impl == null) |
| impl = getSpiContext().messageFormatter(); |
| |
| assert impl != null; |
| |
| return impl.writer(); |
| } |
| |
| @Override public MessageReader reader(MessageFactory factory) { |
| if (impl == null) |
| impl = getSpiContext().messageFormatter(); |
| |
| assert impl != null; |
| |
| return impl.reader(factory); |
| } |
| }; |
| |
| IpcToNioAdapter<Message> adapter = new IpcToNioAdapter<>( |
| metricsLsnr, |
| log, |
| endpoint, |
| srvLsnr, |
| msgFormatter, |
| new GridNioCodecFilter(new GridDirectParser(msgFactory, msgFormatter), log, true), |
| new GridConnectionBytesVerifyFilter(log) |
| ); |
| |
| adapter.serve(); |
| } |
| finally { |
| shmemWorkers.remove(this); |
| |
| endpoint.close(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void cancel() { |
| super.cancel(); |
| |
| endpoint.close(); |
| } |
| |
| /** @{@inheritDoc} */ |
| @Override protected void cleanup() { |
| super.cleanup(); |
| |
| endpoint.close(); |
| } |
| |
| /** @{@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(ShmemWorker.class, this); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private class CommunicationWorker extends IgniteSpiThread { |
| /** */ |
| private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>(); |
| |
| /** |
| * |
| */ |
| private CommunicationWorker() { |
| super(gridName, "tcp-comm-worker", log); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void body() throws InterruptedException { |
| if (log.isDebugEnabled()) |
| log.debug("Tcp communication worker has been started."); |
| |
| while (!isInterrupted()) { |
| GridNioRecoveryDescriptor recoveryDesc = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS); |
| |
| if (recoveryDesc != null) |
| processRecovery(recoveryDesc); |
| else |
| processIdle(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private void processIdle() { |
| cleanupRecovery(); |
| |
| for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) { |
| UUID nodeId = e.getKey(); |
| |
| GridCommunicationClient client = e.getValue(); |
| |
| ClusterNode node = getSpiContext().node(nodeId); |
| |
| if (node == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Forcing close of non-existent node connection: " + nodeId); |
| |
| client.forceClose(); |
| |
| clients.remove(nodeId, client); |
| |
| continue; |
| } |
| |
| GridNioRecoveryDescriptor recovery = null; |
| |
| if (client instanceof GridTcpNioCommunicationClient) { |
| recovery = recoveryDescs.get(new ClientKey(node.id(), node.order())); |
| |
| if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { |
| RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + |
| ", rcvCnt=" + msg.received() + ']'); |
| |
| nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); |
| |
| recovery.lastAcknowledged(msg.received()); |
| |
| continue; |
| } |
| } |
| |
| long idleTime = client.getIdleTime(); |
| |
| if (idleTime >= idleConnTimeout) { |
| if (recovery != null && |
| recovery.nodeAlive(getSpiContext().node(nodeId)) && |
| !recovery.messagesFutures().isEmpty()) { |
| if (log.isDebugEnabled()) |
| log.debug("Node connection is idle, but there are unacknowledged messages, " + |
| "will wait: " + nodeId); |
| |
| continue; |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Closing idle node connection: " + nodeId); |
| |
| if (client.close() || client.closed()) |
| clients.remove(nodeId, client); |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| private void cleanupRecovery() { |
| Set<ClientKey> left = null; |
| |
| for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) { |
| if (left != null && left.contains(e.getKey())) |
| continue; |
| |
| GridNioRecoveryDescriptor recoverySnd = e.getValue(); |
| |
| if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) { |
| if (left == null) |
| left = new HashSet<>(); |
| |
| left.add(e.getKey()); |
| } |
| } |
| |
| if (left != null) { |
| assert !left.isEmpty(); |
| |
| for (ClientKey id : left) { |
| GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id); |
| |
| if (recoverySnd != null) |
| recoverySnd.onNodeLeft(); |
| } |
| } |
| } |
| |
| /** |
| * @param recoveryDesc Recovery descriptor. |
| */ |
| private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) { |
| ClusterNode node = recoveryDesc.node(); |
| |
| if (clients.containsKey(node.id()) || |
| !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) || |
| !getSpiContext().pingNode(node.id())) |
| return; |
| |
| try { |
| if (log.isDebugEnabled()) |
| log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); |
| |
| GridCommunicationClient client = reserveClient(node); |
| |
| client.release(); |
| } |
| catch (IgniteCheckedException | IgniteException e) { |
| if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) { |
| if (log.isDebugEnabled()) |
| log.debug("Recovery reconnect failed, will retry " + |
| "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); |
| |
| addReconnectRequest(recoveryDesc); |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Recovery reconnect failed, " + |
| "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); |
| |
| onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", |
| e); |
| } |
| } |
| } |
| |
| /** |
| * @param recoverySnd Recovery send data. |
| */ |
| void addReconnectRequest(GridNioRecoveryDescriptor recoverySnd) { |
| boolean add = q.add(recoverySnd); |
| |
| assert add; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class ConnectFuture extends GridFutureAdapter<GridCommunicationClient> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| // No-op. |
| } |
| |
| /** |
| * |
| */ |
| private static class HandshakeTimeoutObject<T> implements IgniteSpiTimeoutObject { |
| /** */ |
| private final IgniteUuid id = IgniteUuid.randomUuid(); |
| |
| /** */ |
| private final T obj; |
| |
| /** */ |
| private final long endTime; |
| |
| /** */ |
| private final AtomicBoolean done = new AtomicBoolean(); |
| |
| /** |
| * @param obj Client. |
| * @param endTime End time. |
| */ |
| private HandshakeTimeoutObject(T obj, long endTime) { |
| assert obj != null; |
| assert obj instanceof GridCommunicationClient || obj instanceof SelectableChannel; |
| assert endTime > 0; |
| |
| this.obj = obj; |
| this.endTime = endTime; |
| } |
| |
| /** |
| * @return {@code True} if object has not yet been timed out. |
| */ |
| boolean cancel() { |
| return done.compareAndSet(false, true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onTimeout() { |
| if (done.compareAndSet(false, true)) { |
| // Close socket - timeout occurred. |
| if (obj instanceof GridCommunicationClient) |
| ((GridCommunicationClient)obj).forceClose(); |
| else |
| U.closeQuiet((AbstractInterruptibleChannel)obj); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long endTime() { |
| return endTime; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteUuid id() { |
| return id; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(HandshakeTimeoutObject.class, this); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private class HandshakeClosure extends IgniteInClosure2X<InputStream, OutputStream> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private final UUID rmtNodeId; |
| |
| /** |
| * @param rmtNodeId Remote node ID. |
| */ |
| private HandshakeClosure(UUID rmtNodeId) { |
| this.rmtNodeId = rmtNodeId; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("ThrowFromFinallyBlock") |
| @Override public void applyx(InputStream in, OutputStream out) throws IgniteCheckedException { |
| try { |
| // Handshake. |
| byte[] b = new byte[17]; |
| |
| int n = 0; |
| |
| while (n < 17) { |
| int cnt = in.read(b, n, 17 - n); |
| |
| if (cnt < 0) |
| throw new IgniteCheckedException("Failed to get remote node ID (end of stream reached)"); |
| |
| n += cnt; |
| } |
| |
| // First 4 bytes are for length. |
| UUID id = U.bytesToUuid(b, 1); |
| |
| if (!rmtNodeId.equals(id)) |
| throw new IgniteCheckedException("Remote node ID is not as expected [expected=" + rmtNodeId + |
| ", rcvd=" + id + ']'); |
| else if (log.isDebugEnabled()) |
| log.debug("Received remote node ID: " + id); |
| } |
| catch (SocketTimeoutException e) { |
| throw new IgniteCheckedException("Failed to perform handshake due to timeout (consider increasing " + |
| "'connectionTimeout' configuration property).", e); |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException("Failed to perform handshake.", e); |
| } |
| |
| try { |
| out.write(U.IGNITE_HEADER); |
| out.write(NODE_ID_MSG_TYPE); |
| out.write(nodeIdMsg.nodeIdBytes); |
| |
| out.flush(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Sent local node ID [locNodeId=" + getLocalNodeId() + ", rmtNodeId=" |
| + rmtNodeId + ']'); |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException("Failed to perform handshake.", e); |
| } |
| } |
| } |
| |
| /** |
| * Handshake message. |
| */ |
| @SuppressWarnings("PublicInnerClass") |
| public static class HandshakeMessage implements Message { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private UUID nodeId; |
| |
| /** */ |
| private long rcvCnt; |
| |
| /** */ |
| private long connectCnt; |
| |
| /** |
| * Default constructor required by {@link Message}. |
| */ |
| public HandshakeMessage() { |
| // No-op. |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param connectCnt Connect count. |
| * @param rcvCnt Number of received messages. |
| */ |
| public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) { |
| assert nodeId != null; |
| assert rcvCnt >= 0 : rcvCnt; |
| |
| this.nodeId = nodeId; |
| this.connectCnt = connectCnt; |
| this.rcvCnt = rcvCnt; |
| } |
| |
| /** |
| * @return Connect count. |
| */ |
| public long connectCount() { |
| return connectCnt; |
| } |
| |
| /** |
| * @return Number of received messages. |
| */ |
| public long received() { |
| return rcvCnt; |
| } |
| |
| /** |
| * @return Node ID. |
| */ |
| public UUID nodeId() { |
| return nodeId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { |
| if (buf.remaining() < 33) |
| return false; |
| |
| buf.put(HANDSHAKE_MSG_TYPE); |
| |
| byte[] bytes = U.uuidToBytes(nodeId); |
| |
| assert bytes.length == 16 : bytes.length; |
| |
| buf.put(bytes); |
| |
| buf.putLong(rcvCnt); |
| |
| buf.putLong(connectCnt); |
| |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { |
| if (buf.remaining() < 32) |
| return false; |
| |
| byte[] nodeIdBytes = new byte[16]; |
| |
| buf.get(nodeIdBytes); |
| |
| nodeId = U.bytesToUuid(nodeIdBytes, 0); |
| |
| rcvCnt = buf.getLong(); |
| |
| connectCnt = buf.getLong(); |
| |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public byte directType() { |
| return HANDSHAKE_MSG_TYPE; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public byte fieldsCount() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(HandshakeMessage.class, this); |
| } |
| } |
| |
| /** |
| * Recovery acknowledgment message. |
| */ |
| @SuppressWarnings("PublicInnerClass") |
| public static class RecoveryLastReceivedMessage implements Message { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private long rcvCnt; |
| |
| /** |
| * Default constructor required by {@link Message}. |
| */ |
| public RecoveryLastReceivedMessage() { |
| // No-op. |
| } |
| |
| /** |
| * @param rcvCnt Number of received messages. |
| */ |
| public RecoveryLastReceivedMessage(long rcvCnt) { |
| this.rcvCnt = rcvCnt; |
| } |
| |
| /** |
| * @return Number of received messages. |
| */ |
| public long received() { |
| return rcvCnt; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { |
| if (buf.remaining() < 9) |
| return false; |
| |
| buf.put(RECOVERY_LAST_ID_MSG_TYPE); |
| |
| buf.putLong(rcvCnt); |
| |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { |
| if (buf.remaining() < 8) |
| return false; |
| |
| rcvCnt = buf.getLong(); |
| |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public byte directType() { |
| return RECOVERY_LAST_ID_MSG_TYPE; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public byte fieldsCount() { |
| return 0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(RecoveryLastReceivedMessage.class, this); |
| } |
| } |
| |
| /** |
| * Node ID message. |
| */ |
| @SuppressWarnings("PublicInnerClass") |
| public static class NodeIdMessage implements Message { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private byte[] nodeIdBytes; |
| |
| /** */ |
| private byte[] nodeIdBytesWithType; |
| |
| /** */ |
| public NodeIdMessage() { |
| // No-op. |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| */ |
| private NodeIdMessage(UUID nodeId) { |
| nodeIdBytes = U.uuidToBytes(nodeId); |
| |
| nodeIdBytesWithType = new byte[nodeIdBytes.length + 1]; |
| |
| nodeIdBytesWithType[0] = NODE_ID_MSG_TYPE; |
| |
| System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 1, nodeIdBytes.length); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { |
| assert nodeIdBytes.length == 16; |
| |
| if (buf.remaining() < 17) |
| return false; |
| |
| buf.put(NODE_ID_MSG_TYPE); |
| buf.put(nodeIdBytes); |
| |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { |
| if (buf.remaining() < 16) |
| return false; |
| |
| nodeIdBytes = new byte[16]; |
| |
| buf.get(nodeIdBytes); |
| |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public byte directType() { |
| return NODE_ID_MSG_TYPE; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public byte fieldsCount() { |
| return 0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(NodeIdMessage.class, this); |
| } |
| } |
| } |