blob: 1d55b54b069c63ee00dbb6a3a7a04cef053e684b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.discovery.tcp;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteAuthenticationException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.IgniteSpiVersionCheckException;
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryStatistics;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryEnsureDelivery;
import org.jetbrains.annotations.Nullable;
/**
* Discovery SPI implementation that uses TCP/IP for node discovery.
* <p>
* Nodes are organized in ring. So almost all network exchange (except few cases) is
* done across it.
* <p>
* If node is configured as client node (see {@link IgniteConfiguration#clientMode})
* TcpDiscoverySpi starts in client mode as well. In this case node does not take its place in the ring,
* but it connects to random node in the ring (IP taken from IP finder configured) and
* use it as a router for discovery traffic.
* Therefore slow client node or its shutdown will not affect whole cluster. If TcpDiscoverySpi
* needs to be started in server mode regardless of {@link IgniteConfiguration#clientMode},
* {@link #forceSrvMode} should be set to true.
* <p>
* At startup SPI tries to send messages to random IP taken from
* {@link TcpDiscoveryIpFinder} about self start (stops when send succeeds)
* and then this info goes to coordinator. When coordinator processes join request
* and issues node added messages and all other nodes then receive info about new node.
* <h1 class="header">Failure Detection</h1>
* Configuration defaults (see Configuration section below and
* {@link IgniteConfiguration#getFailureDetectionTimeout()}) for details) are chosen to make possible for discovery
* SPI work reliably on most of hardware and virtual deployments, but this has made failure detection time worse.
* <p>
* If it's needed to tune failure detection then it's highly recommended to do this using
* {@link IgniteConfiguration#setFailureDetectionTimeout(long)}. This failure timeout automatically controls the
* following parameters: {@link #getSocketTimeout()}, {@link #getAckTimeout()}, {@link #getMaxAckTimeout()},
* {@link #getReconnectCount()}. If any of those parameters is set explicitly, then the failure timeout setting will be
* ignored. As an example, for stable low-latency networks the failure detection timeout may be set to ~120 ms.
* <p>
* If it's required to perform advanced settings of failure detection and
* {@link IgniteConfiguration#getFailureDetectionTimeout()} is unsuitable then various {@code TcpDiscoverySpi}
* configuration parameters may be used. As an example, for stable low-latency networks the following more aggressive
* settings are recommended (which allows failure detection time ~200ms):
* <ul>
* <li>Heartbeat frequency (see {@link #setHeartbeatFrequency(long)}) - 100ms</li>
* <li>Socket timeout (see {@link #setSocketTimeout(long)}) - 200ms</li>
* <li>Message acknowledgement timeout (see {@link #setAckTimeout(long)}) - 50ms</li>
* </ul>
* <h1 class="header">Configuration</h1>
* <h2 class="header">Mandatory</h2>
* There are no mandatory configuration parameters.
* <h2 class="header">Optional</h2>
* The following configuration parameters are optional:
* <ul>
* <li>IP finder to share info about nodes IP addresses
* (see {@link #setIpFinder(TcpDiscoveryIpFinder)}).
* See the following IP finder implementations for details on configuration:
* <ul>
* <li>{@link TcpDiscoverySharedFsIpFinder}</li>
* <li>{@ignitelink org.apache.ignite.spi.discovery.tcp.ipfinder.s3.TcpDiscoveryS3IpFinder}</li>
* <li>{@link TcpDiscoveryJdbcIpFinder}</li>
* <li>{@link TcpDiscoveryVmIpFinder}</li>
* <li>{@link TcpDiscoveryMulticastIpFinder} - default</li>
* </ul>
* </li>
* </ul>
* <ul>
* </li>
* <li>Local address (see {@link #setLocalAddress(String)})</li>
* <li>Local port to bind to (see {@link #setLocalPort(int)})</li>
* <li>Local port range to try binding to if previous ports are in use
* (see {@link #setLocalPortRange(int)})</li>
* <li>Heartbeat frequency (see {@link #setHeartbeatFrequency(long)})</li>
* <li>Max missed heartbeats (see {@link #setMaxMissedHeartbeats(int)})</li>
* <li>Number of times node tries to (re)establish connection to another node
* (see {@link #setReconnectCount(int)})</li>
* <li>Network timeout (see {@link #setNetworkTimeout(long)})</li>
* <li>Socket timeout (see {@link #setSocketTimeout(long)})</li>
* <li>Message acknowledgement timeout (see {@link #setAckTimeout(long)})</li>
* <li>Maximum message acknowledgement timeout (see {@link #setMaxAckTimeout(long)})</li>
* <li>Join timeout (see {@link #setJoinTimeout(long)})</li>
* <li>Thread priority for threads started by SPI (see {@link #setThreadPriority(int)})</li>
* <li>IP finder clean frequency (see {@link #setIpFinderCleanFrequency(long)})</li>
* <li>Statistics print frequency (see {@link #setStatisticsPrintFrequency(long)}</li>
* <li>Force server mode (see {@link #setForceServerMode(boolean)}</li>
* </ul>
* <h2 class="header">Java Example</h2>
* <pre name="code" class="java">
* TcpDiscoverySpi spi = new TcpDiscoverySpi();
*
* TcpDiscoveryVmIpFinder finder =
* new GridTcpDiscoveryVmIpFinder();
*
* spi.setIpFinder(finder);
*
* IgniteConfiguration cfg = new IgniteConfiguration();
*
* // Override default discovery SPI.
* cfg.setDiscoverySpi(spi);
*
* // Start grid.
* Ignition.start(cfg);
* </pre>
* <h2 class="header">Spring Example</h2>
* TcpDiscoverySpi can be configured from Spring XML configuration file:
* <pre name="code" class="xml">
* &lt;bean id="grid.custom.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true"&gt;
* ...
* &lt;property name="discoverySpi"&gt;
* &lt;bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"&gt;
* &lt;property name="ipFinder"&gt;
* &lt;bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder" /&gt;
* &lt;/property&gt;
* &lt;/bean&gt;
* &lt;/property&gt;
* ...
* &lt;/bean&gt;
* </pre>
* <p>
* <img src="http://ignite.apache.org/images/spring-small.png">
* <br>
* For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
* @see DiscoverySpi
*/
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
@IgniteSpiMultipleInstancesSupport(true)
@DiscoverySpiOrderSupport(true)
@DiscoverySpiHistorySupport(true)
public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean {
/** Failure detection timeout feature major version. */
final static byte FAILURE_DETECTION_MAJOR_VER = 1;
/** Failure detection timeout feature minor version. */
final static byte FAILURE_DETECTION_MINOR_VER = 4;
/** Failure detection timeout feature maintainance version. */
final static byte FAILURE_DETECTION_MAINT_VER = 1;
/** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
/** Default local port range (value is <tt>100</tt>). */
public static final int DFLT_PORT_RANGE = 100;
/** Default port to listen (value is <tt>47500</tt>). */
public static final int DFLT_PORT = 47500;
/** Default timeout for joining topology (value is <tt>0</tt>). */
public static final long DFLT_JOIN_TIMEOUT = 0;
/** Default network timeout in milliseconds (value is <tt>5000ms</tt>). */
public static final long DFLT_NETWORK_TIMEOUT = 5000;
/** Default value for thread priority (value is <tt>10</tt>). */
public static final int DFLT_THREAD_PRI = 10;
/** Default heartbeat messages issuing frequency (value is <tt>2000ms</tt>). */
public static final long DFLT_HEARTBEAT_FREQ = 2000;
/** Default size of topology snapshots history. */
public static final int DFLT_TOP_HISTORY_SIZE = 1000;
/** Default socket operations timeout in milliseconds (value is <tt>5000ms</tt>). */
public static final long DFLT_SOCK_TIMEOUT = 5000;
/** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5000ms</tt>). */
public static final long DFLT_ACK_TIMEOUT = 5000;
/** Default socket operations timeout in milliseconds (value is <tt>5000ms</tt>). */
public static final long DFLT_SOCK_TIMEOUT_CLIENT = 5000;
/** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5000ms</tt>). */
public static final long DFLT_ACK_TIMEOUT_CLIENT = 5000;
/** Default reconnect attempts count (value is <tt>10</tt>). */
public static final int DFLT_RECONNECT_CNT = 10;
/** Default max heartbeats count node can miss without initiating status check (value is <tt>1</tt>). */
public static final int DFLT_MAX_MISSED_HEARTBEATS = 1;
/** Default max heartbeats count node can miss without failing client node (value is <tt>5</tt>). */
public static final int DFLT_MAX_MISSED_CLIENT_HEARTBEATS = 5;
/** Default IP finder clean frequency in milliseconds (value is <tt>60,000ms</tt>). */
public static final long DFLT_IP_FINDER_CLEAN_FREQ = 60 * 1000;
/** Default statistics print frequency in milliseconds (value is <tt>0ms</tt>). */
public static final long DFLT_STATS_PRINT_FREQ = 0;
/** Maximum ack timeout value for receiving message acknowledgement in milliseconds (value is <tt>600,000ms</tt>). */
public static final long DFLT_MAX_ACK_TIMEOUT = 10 * 60 * 1000;
/** Local address. */
protected String locAddr;
/** Address resolver. */
private AddressResolver addrRslvr;
/** IP finder. */
protected TcpDiscoveryIpFinder ipFinder;
/** Socket operations timeout. */
private long sockTimeout; // Must be initialized in the constructor of child class.
/** Message acknowledgement timeout. */
private long ackTimeout; // Must be initialized in the constructor of child class.
/** Network timeout. */
protected long netTimeout = DFLT_NETWORK_TIMEOUT;
/** Join timeout. */
@SuppressWarnings("RedundantFieldInitialization")
protected long joinTimeout = DFLT_JOIN_TIMEOUT;
/** Thread priority for all threads started by SPI. */
protected int threadPri = DFLT_THREAD_PRI;
/** Heartbeat messages issuing frequency. */
protected long hbFreq = DFLT_HEARTBEAT_FREQ;
/** Size of topology snapshots history. */
protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
/** Grid discovery listener. */
protected volatile DiscoverySpiListener lsnr;
/** Data exchange. */
protected DiscoverySpiDataExchange exchange;
/** Metrics provider. */
protected DiscoveryMetricsProvider metricsProvider;
/** Local node attributes. */
protected Map<String, Object> locNodeAttrs;
/** Local node version. */
protected IgniteProductVersion locNodeVer;
/** Local node. */
protected TcpDiscoveryNode locNode;
/** */
protected UUID cfgNodeId;
/** Local host. */
protected InetAddress locHost;
/** Internal and external addresses of local node. */
protected Collection<InetSocketAddress> locNodeAddrs;
/** Start time of the very first grid node. */
protected volatile long gridStartTime;
/** Marshaller. */
protected final Marshaller marsh = new JdkMarshaller();
/** Statistics. */
protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics();
/** Local port which node uses. */
protected int locPort = DFLT_PORT;
/** Local port range. */
protected int locPortRange = DFLT_PORT_RANGE;
/** Reconnect attempts count. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private int reconCnt = DFLT_RECONNECT_CNT;
/** Statistics print frequency. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"})
protected long statsPrintFreq = DFLT_STATS_PRINT_FREQ;
/** Maximum message acknowledgement timeout. */
private long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
/** Max heartbeats count node can miss without initiating status check. */
protected int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS;
/** Max heartbeats count node can miss without failing client node. */
protected int maxMissedClientHbs = DFLT_MAX_MISSED_CLIENT_HEARTBEATS;
/** IP finder clean frequency. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
protected long ipFinderCleanFreq = DFLT_IP_FINDER_CLEAN_FREQ;
/** Node authenticator. */
protected DiscoverySpiNodeAuthenticator nodeAuth;
/** SSL server socket factory. */
protected SSLServerSocketFactory sslSrvSockFactory;
/** SSL socket factory. */
protected SSLSocketFactory sslSockFactory;
/** Context initialization latch. */
@GridToStringExclude
private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
/** */
protected final CopyOnWriteArrayList<IgniteInClosure<TcpDiscoveryAbstractMessage>> sndMsgLsnrs =
new CopyOnWriteArrayList<>();
/** */
protected final CopyOnWriteArrayList<IgniteInClosure<Socket>> incomeConnLsnrs =
new CopyOnWriteArrayList<>();
/** Logger. */
@LoggerResource
protected IgniteLogger log;
/** */
protected TcpDiscoveryImpl impl;
/** */
private boolean forceSrvMode;
/** */
private boolean clientReconnectDisabled;
/** {@inheritDoc} */
@Override public String getSpiState() {
return impl.getSpiState();
}
/** {@inheritDoc} */
@Override public int getMessageWorkerQueueSize() {
return impl.getMessageWorkerQueueSize();
}
/** {@inheritDoc} */
@Nullable @Override public UUID getCoordinator() {
return impl.getCoordinator();
}
/** {@inheritDoc} */
@Override public Collection<ClusterNode> getRemoteNodes() {
return impl.getRemoteNodes();
}
/** {@inheritDoc} */
@Nullable @Override public ClusterNode getNode(UUID nodeId) {
return impl.getNode(nodeId);
}
/** {@inheritDoc} */
@Override public boolean pingNode(UUID nodeId) {
return impl.pingNode(nodeId);
}
/** {@inheritDoc} */
@Override public void disconnect() throws IgniteSpiException {
impl.disconnect();
}
/** {@inheritDoc} */
@Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
nodeAuth = auth;
}
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
impl.sendCustomEvent(msg);
}
/** {@inheritDoc} */
@Override public void failNode(UUID nodeId, @Nullable String warning) {
impl.failNode(nodeId, warning);
}
/** {@inheritDoc} */
@Override public void dumpDebugInfo() {
impl.dumpDebugInfo(log);
}
/** {@inheritDoc} */
@Override public boolean isClientMode() {
if (impl == null)
throw new IllegalStateException("TcpDiscoverySpi has not started.");
return impl instanceof ClientImpl;
}
/**
* If {@code true} TcpDiscoverySpi will started in server mode regardless
* of {@link IgniteConfiguration#isClientMode()}
*
* @return forceServerMode flag.
*/
public boolean isForceServerMode() {
return forceSrvMode;
}
/**
* Sets force server mode flag.
* <p>
* If {@code true} TcpDiscoverySpi is started in server mode regardless
* of {@link IgniteConfiguration#isClientMode()}.
*
* @param forceSrvMode forceServerMode flag.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setForceServerMode(boolean forceSrvMode) {
this.forceSrvMode = forceSrvMode;
return this;
}
/**
* If {@code true} client does not try to reconnect after
* server detected client node failure.
*
* @return Client reconnect disabled flag.
*/
public boolean isClientReconnectDisabled() {
return clientReconnectDisabled;
}
/**
* Sets client reconnect disabled flag.
* <p>
* If {@code true} client does not try to reconnect after
* server detected client node failure.
*
* @param clientReconnectDisabled Client reconnect disabled flag.
*/
@IgniteSpiConfiguration(optional = true)
public void setClientReconnectDisabled(boolean clientReconnectDisabled) {
this.clientReconnectDisabled = clientReconnectDisabled;
}
/**
* Inject resources
*
* @param ignite Ignite.
*/
@IgniteInstanceResource
@Override protected void injectResources(Ignite ignite) {
super.injectResources(ignite);
// Inject resource.
if (ignite != null) {
setLocalAddress(ignite.configuration().getLocalHost());
setAddressResolver(ignite.configuration().getAddressResolver());
}
}
/**
* Sets local host IP address that discovery SPI uses.
* <p>
* If not provided, by default a first found non-loopback address
* will be used. If there is no non-loopback address available,
* then {@link InetAddress#getLocalHost()} will be used.
*
* @param locAddr IP address.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setLocalAddress(String locAddr) {
// Injection should not override value already set by Spring or user.
if (this.locAddr == null)
this.locAddr = locAddr;
return this;
}
/**
* Gets local address that was set to SPI with {@link #setLocalAddress(String)} method.
*
* @return local address.
*/
public String getLocalAddress() {
return locAddr;
}
/**
* Sets address resolver.
*
* @param addrRslvr Address resolver.
*/
@IgniteSpiConfiguration(optional = true)
public void setAddressResolver(AddressResolver addrRslvr) {
// Injection should not override value already set by Spring or user.
if (this.addrRslvr == null)
this.addrRslvr = addrRslvr;
}
/**
* Gets address resolver.
*
* @return Address resolver.
*/
public AddressResolver getAddressResolver() {
return addrRslvr;
}
/** {@inheritDoc} */
@Override public int getReconnectCount() {
return reconCnt;
}
/**
* Number of times node tries to (re)establish connection to another node.
* <p>
* Note that SPI implementation will increase {@link #ackTimeout} by factor 2
* on every retry.
* <p>
* If not specified, default is {@link #DFLT_RECONNECT_CNT}.
* <p>
* When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
*
* @param reconCnt Number of retries during message sending.
* @see #setAckTimeout(long)
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setReconnectCount(int reconCnt) {
this.reconCnt = reconCnt;
failureDetectionTimeoutEnabled(false);
return this;
}
/** {@inheritDoc} */
@Override public long getMaxAckTimeout() {
return maxAckTimeout;
}
/**
* Sets maximum timeout for receiving acknowledgement for sent message.
* <p>
* If acknowledgement is not received within this timeout, sending is considered as failed
* and SPI tries to repeat message sending. Every time SPI retries messing sending, ack
* timeout will be increased. If no acknowledgement is received and {@code maxAckTimeout}
* is reached, then the process of message sending is considered as failed.
* <p>
* If not specified, default is {@link #DFLT_MAX_ACK_TIMEOUT}.
* <p>
* Affected server nodes only.
* <p>
* When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
*
* @param maxAckTimeout Maximum acknowledgement timeout.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setMaxAckTimeout(long maxAckTimeout) {
this.maxAckTimeout = maxAckTimeout;
failureDetectionTimeoutEnabled(false);
return this;
}
/** {@inheritDoc} */
@Override public int getLocalPort() {
TcpDiscoveryNode locNode0 = locNode;
return locNode0 != null ? locNode0.discoveryPort() : 0;
}
/**
* Sets local port to listen to.
* <p>
* If not specified, default is {@link #DFLT_PORT}.
* <p>
* Affected server nodes only.
*
* @param locPort Local port to bind.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setLocalPort(int locPort) {
this.locPort = locPort;
return this;
}
/** {@inheritDoc} */
@Override public int getLocalPortRange() {
return locPortRange;
}
/**
* Range for local ports. Local node will try to bind on first available port
* starting from {@link #getLocalPort()} up until
* <tt>{@link #getLocalPort()} {@code + locPortRange}</tt>.
* <p>
* If not specified, default is {@link #DFLT_PORT_RANGE}.
* <p>
* Affected server nodes only.
*
* @param locPortRange Local port range to bind.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setLocalPortRange(int locPortRange) {
this.locPortRange = locPortRange;
return this;
}
/** {@inheritDoc} */
@Override public int getMaxMissedHeartbeats() {
return maxMissedHbs;
}
/**
* Sets max heartbeats count node can miss without initiating status check.
* <p>
* If not provided, default value is {@link #DFLT_MAX_MISSED_HEARTBEATS}.
* <p>
* Affected server nodes only.
*
* @param maxMissedHbs Max missed heartbeats.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setMaxMissedHeartbeats(int maxMissedHbs) {
this.maxMissedHbs = maxMissedHbs;
return this;
}
/** {@inheritDoc} */
@Override public int getMaxMissedClientHeartbeats() {
return maxMissedClientHbs;
}
/**
* Sets max heartbeats count node can miss without failing client node.
* <p>
* If not provided, default value is {@link #DFLT_MAX_MISSED_CLIENT_HEARTBEATS}.
*
* @param maxMissedClientHbs Max missed client heartbeats.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setMaxMissedClientHeartbeats(int maxMissedClientHbs) {
this.maxMissedClientHbs = maxMissedClientHbs;
return this;
}
/** {@inheritDoc} */
@Override public long getStatisticsPrintFrequency() {
return statsPrintFreq;
}
/**
* Sets statistics print frequency.
* <p>
* If not set default value is {@link #DFLT_STATS_PRINT_FREQ}.
* 0 indicates that no print is required. If value is greater than 0 and log is
* not quiet then statistics are printed out with INFO level.
* <p>
* This may be very helpful for tracing topology problems.
*
* @param statsPrintFreq Statistics print frequency in milliseconds.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setStatisticsPrintFrequency(long statsPrintFreq) {
this.statsPrintFreq = statsPrintFreq;
return this;
}
/** {@inheritDoc} */
@Override public long getIpFinderCleanFrequency() {
return ipFinderCleanFreq;
}
/**
* Sets IP finder clean frequency in milliseconds.
* <p>
* If not provided, default value is {@link #DFLT_IP_FINDER_CLEAN_FREQ}
* <p>
* Affected server nodes only.
*
* @param ipFinderCleanFreq IP finder clean frequency.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setIpFinderCleanFrequency(long ipFinderCleanFreq) {
this.ipFinderCleanFreq = ipFinderCleanFreq;
return this;
}
/**
* Gets IP finder for IP addresses sharing and storing.
*
* @return IP finder for IP addresses sharing and storing.
*/
public TcpDiscoveryIpFinder getIpFinder() {
return ipFinder;
}
/**
* Sets IP finder for IP addresses sharing and storing.
* <p>
* If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will
* be used by default.
*
* @param ipFinder IP finder.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setIpFinder(TcpDiscoveryIpFinder ipFinder) {
this.ipFinder = ipFinder;
return this;
}
/**
* Sets socket operations timeout. This timeout is used to limit connection time and
* write-to-socket time.
* <p>
* Note that when running Ignite on Amazon EC2, socket timeout must be set to a value
* significantly greater than the default (e.g. to {@code 30000}).
* <p>
* If not specified, default is {@link #DFLT_SOCK_TIMEOUT} or {@link #DFLT_SOCK_TIMEOUT_CLIENT}.
* <p>
* When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
*
* @param sockTimeout Socket connection timeout.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setSocketTimeout(long sockTimeout) {
this.sockTimeout = sockTimeout;
failureDetectionTimeoutEnabled(false);
return this;
}
/**
* Sets timeout for receiving acknowledgement for sent message.
* <p>
* If acknowledgement is not received within this timeout, sending is considered as failed
* and SPI tries to repeat message sending.
* <p>
* If not specified, default is {@link #DFLT_ACK_TIMEOUT} or {@link #DFLT_ACK_TIMEOUT_CLIENT}.
* <p>
* When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
*
* @param ackTimeout Acknowledgement timeout.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setAckTimeout(long ackTimeout) {
this.ackTimeout = ackTimeout;
failureDetectionTimeoutEnabled(false);
return this;
}
/**
* Sets maximum network timeout to use for network operations.
* <p>
* If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}.
*
* @param netTimeout Network timeout.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setNetworkTimeout(long netTimeout) {
this.netTimeout = netTimeout;
return this;
}
/** {@inheritDoc} */
@Override public long getJoinTimeout() {
return joinTimeout;
}
/**
* Sets join timeout.
* <p>
* If non-shared IP finder is used and node fails to connect to
* any address from IP finder, node keeps trying to join within this
* timeout. If all addresses are still unresponsive, exception is thrown
* and node startup fails.
* <p>
* If not specified, default is {@link #DFLT_JOIN_TIMEOUT}.
*
* @param joinTimeout Join timeout ({@code 0} means wait forever).
*
* @see TcpDiscoveryIpFinder#isShared()
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setJoinTimeout(long joinTimeout) {
this.joinTimeout = joinTimeout;
return this;
}
/**
* Sets thread priority. All threads within SPI will be started with it.
* <p>
* If not provided, default value is {@link #DFLT_THREAD_PRI}
*
* @param threadPri Thread priority.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setThreadPriority(int threadPri) {
this.threadPri = threadPri;
return this;
}
/**
* Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages
* in configurable time interval to other nodes to notify them about its state.
* <p>
* If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}.
*
* @param hbFreq Heartbeat frequency in milliseconds.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setHeartbeatFrequency(long hbFreq) {
this.hbFreq = hbFreq;
return this;
}
/**
* @return Size of topology snapshots history.
*/
public long getTopHistorySize() {
return topHistSize;
}
/**
* Sets size of topology snapshots history. Specified size should be greater than or equal to default size
* {@link #DFLT_TOP_HISTORY_SIZE}.
*
* @param topHistSize Size of topology snapshots history.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoverySpi setTopHistorySize(int topHistSize) {
if (topHistSize < DFLT_TOP_HISTORY_SIZE) {
U.warn(log, "Topology history size should be greater than or equal to default size. " +
"Specified size will not be set [curSize=" + this.topHistSize + ", specifiedSize=" + topHistSize +
", defaultSize=" + DFLT_TOP_HISTORY_SIZE + ']');
return this;
}
this.topHistSize = topHistSize;
return this;
}
/** {@inheritDoc} */
@Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
assert locNodeAttrs == null;
assert locNodeVer == null;
if (log.isDebugEnabled()) {
log.debug("Node attributes to set: " + attrs);
log.debug("Node version to set: " + ver);
}
locNodeAttrs = attrs;
locNodeVer = ver;
}
/**
* @param srvPort Server port.
* @param addExtAddrAttr If {@code true} adds {@link #ATTR_EXT_ADDRS} attribute.
*/
protected void initLocalNode(int srvPort, boolean addExtAddrAttr) {
// Init local node.
IgniteBiTuple<Collection<String>, Collection<String>> addrs;
try {
addrs = U.resolveLocalAddresses(locHost);
}
catch (IOException | IgniteCheckedException e) {
throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost, e);
}
locNode = new TcpDiscoveryNode(
ignite.configuration().getNodeId(),
addrs.get1(),
addrs.get2(),
srvPort,
metricsProvider,
locNodeVer,
ignite.configuration().getConsistentId());
if (addExtAddrAttr) {
Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null :
U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())),
locNode.discoveryPort());
locNodeAddrs = new LinkedHashSet<>();
locNodeAddrs.addAll(locNode.socketAddresses());
if (extAddrs != null) {
locNodeAttrs.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
locNodeAddrs.addAll(extAddrs);
}
}
locNode.setAttributes(locNodeAttrs);
locNode.local(true);
if (log.isDebugEnabled())
log.debug("Local node initialized: " + locNode);
}
/**
* @param node Node.
* @return {@link LinkedHashSet} of internal and external addresses of provided node.
* Internal addresses placed before external addresses.
*/
@SuppressWarnings("TypeMayBeWeakened")
LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node) {
LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(node.socketAddresses());
Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
if (extAddrs != null)
res.addAll(extAddrs);
return res;
}
/**
* @param node Node.
* @param sameHost Same host flag.
* @return {@link LinkedHashSet} of internal and external addresses of provided node.
* Internal addresses placed before external addresses.
* Internal addresses will be sorted with {@code inetAddressesComparator(sameHost)}.
*/
@SuppressWarnings("TypeMayBeWeakened")
LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node, boolean sameHost) {
List<InetSocketAddress> addrs = U.arrayList(node.socketAddresses());
Collections.sort(addrs, U.inetAddressesComparator(sameHost));
LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>();
InetSocketAddress lastAddr = node.lastSuccessfulAddress();
if (lastAddr != null)
res.add(lastAddr);
res.addAll(addrs);
Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
if (extAddrs != null)
res.addAll(extAddrs);
return res;
}
/** {@inheritDoc} */
@Override public Collection<Object> injectables() {
return F.<Object>asList(ipFinder);
}
/** {@inheritDoc} */
@Override public long getSocketTimeout() {
return sockTimeout;
}
/** {@inheritDoc} */
@Override public long getAckTimeout() {
return ackTimeout;
}
/** {@inheritDoc} */
@Override public long getNetworkTimeout() {
return netTimeout;
}
/** {@inheritDoc} */
@Override public int getThreadPriority() {
return threadPri;
}
/** {@inheritDoc} */
@Override public long getHeartbeatFrequency() {
return hbFreq;
}
/** {@inheritDoc} */
@Override public String getIpFinderFormatted() {
return ipFinder.toString();
}
/** {@inheritDoc} */
@Override public long getNodesJoined() {
return stats.joinedNodesCount();
}
/** {@inheritDoc} */
@Override public long getNodesLeft() {
return stats.leftNodesCount();
}
/** {@inheritDoc} */
@Override public long getNodesFailed() {
return stats.failedNodesCount();
}
/** {@inheritDoc} */
@Override public long getPendingMessagesRegistered() {
return stats.pendingMessagesRegistered();
}
/** {@inheritDoc} */
@Override public long getPendingMessagesDiscarded() {
return stats.pendingMessagesDiscarded();
}
/** {@inheritDoc} */
@Override public long getAvgMessageProcessingTime() {
return stats.avgMessageProcessingTime();
}
/** {@inheritDoc} */
@Override public long getMaxMessageProcessingTime() {
return stats.maxMessageProcessingTime();
}
/** {@inheritDoc} */
@Override public int getTotalReceivedMessages() {
return stats.totalReceivedMessages();
}
/** {@inheritDoc} */
@Override public Map<String, Integer> getReceivedMessages() {
return stats.receivedMessages();
}
/** {@inheritDoc} */
@Override public int getTotalProcessedMessages() {
return stats.totalProcessedMessages();
}
/** {@inheritDoc} */
@Override public Map<String, Integer> getProcessedMessages() {
return stats.processedMessages();
}
/** {@inheritDoc} */
@Override public long getCoordinatorSinceTimestamp() {
return stats.coordinatorSinceTimestamp();
}
/** {@inheritDoc} */
@Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
super.onContextInitialized0(spiCtx);
ctxInitLatch.countDown();
ipFinder.onSpiContextInitialized(spiCtx);
impl.onContextInitialized0(spiCtx);
}
/** {@inheritDoc} */
@Override protected void onContextDestroyed0() {
super.onContextDestroyed0();
if (ctxInitLatch.getCount() > 0)
// Safety.
ctxInitLatch.countDown();
if (ipFinder != null)
ipFinder.onSpiContextDestroyed();
getSpiContext().deregisterPorts();
}
/** {@inheritDoc} */
@Override public IgniteSpiContext getSpiContext() {
if (ctxInitLatch.getCount() > 0) {
if (log.isDebugEnabled())
log.debug("Waiting for context initialization.");
try {
U.await(ctxInitLatch);
if (log.isDebugEnabled())
log.debug("Context has been initialized.");
}
catch (IgniteInterruptedCheckedException e) {
U.warn(log, "Thread has been interrupted while waiting for SPI context initialization.", e);
}
}
return super.getSpiContext();
}
/** {@inheritDoc} */
@Override public ClusterNode getLocalNode() {
return locNode;
}
/** {@inheritDoc} */
@Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
this.lsnr = lsnr;
}
/** {@inheritDoc} */
@Override public TcpDiscoverySpi setDataExchange(DiscoverySpiDataExchange exchange) {
this.exchange = exchange;
return this;
}
/** {@inheritDoc} */
@Override public TcpDiscoverySpi setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
this.metricsProvider = metricsProvider;
return this;
}
/** {@inheritDoc} */
@Override public long getGridStartTime() {
assert gridStartTime != 0;
return gridStartTime;
}
/**
* @param sockAddr Remote address.
* @param timeoutHelper Timeout helper.
* @return Opened socket.
* @throws IOException If failed.
* @throws IgniteSpiOperationTimeoutException In case of timeout.
*/
protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
throws IOException, IgniteSpiOperationTimeoutException {
return openSocket(createSocket(), sockAddr, timeoutHelper);
}
/**
* Connects to remote address sending {@code U.IGNITE_HEADER} when connection is established.
*
* @param sock Socket bound to a local host address.
* @param remAddr Remote address.
* @param timeoutHelper Timeout helper.
* @return Connected socket.
* @throws IOException If failed.
* @throws IgniteSpiOperationTimeoutException In case of timeout.
*/
protected Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
throws IOException, IgniteSpiOperationTimeoutException {
assert remAddr != null;
InetSocketAddress resolved = remAddr.isUnresolved() ?
new InetSocketAddress(InetAddress.getByName(remAddr.getHostName()), remAddr.getPort()) : remAddr;
InetAddress addr = resolved.getAddress();
assert addr != null;
sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout));
writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
return sock;
}
/**
* Creates socket binding it to a local host address. This operation is not blocking.
*
* @return Created socket.
* @throws IOException If failed.
*/
Socket createSocket() throws IOException {
Socket sock;
if (isSslEnabled())
sock = sslSockFactory.createSocket();
else
sock = new Socket();
sock.bind(new InetSocketAddress(locHost, 0));
sock.setTcpNoDelay(true);
return sock;
}
/**
* Writes message to the socket.
*
* @param sock Socket.
* @param data Raw data to write.
* @param timeout Socket write timeout.
* @throws IOException If IO failed or write timed out.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
private void writeToSocket(Socket sock, byte[] data, long timeout) throws IOException {
assert sock != null;
assert data != null;
SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
addTimeoutObject(obj);
IOException err = null;
try {
OutputStream out = sock.getOutputStream();
out.write(data);
out.flush();
}
catch (IOException e) {
err = e;
}
finally {
boolean cancelled = obj.cancel();
if (cancelled)
removeTimeoutObject(obj);
// Throw original exception.
if (err != null)
throw err;
if (!cancelled)
throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
}
}
/**
* Writes message to the socket.
*
* @param sock Socket.
* @param msg Message.
* @param timeout Timeout.
* @throws IOException If IO failed or write timed out.
* @throws IgniteCheckedException If marshalling failed.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
protected void writeToSocket(Socket sock,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
assert sock != null;
assert msg != null;
SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
addTimeoutObject(obj);
IOException err = null;
try {
marsh.marshal(msg, new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize()));
}
catch (IOException e) {
err = e;
}
finally {
boolean cancelled = obj.cancel();
if (cancelled)
removeTimeoutObject(obj);
// Throw original exception.
if (err != null)
throw err;
if (!cancelled)
throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
}
}
/**
* Writes response to the socket.
*
* @param msg Received message.
* @param sock Socket.
* @param res Integer response.
* @param timeout Socket timeout.
* @throws IOException If IO failed or write timed out.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
throws IOException {
assert sock != null;
SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
addTimeoutObject(obj);
OutputStream out = sock.getOutputStream();
IOException err = null;
try {
out.write(res);
out.flush();
}
catch (IOException e) {
err = e;
}
finally {
boolean cancelled = obj.cancel();
if (cancelled)
removeTimeoutObject(obj);
// Throw original exception.
if (err != null)
throw err;
if (!cancelled)
throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
}
}
/**
* Reads message from the socket limiting read time.
*
* @param sock Socket.
* @param in Input stream (in case socket stream was wrapped).
* @param timeout Socket timeout for this operation.
* @return Message.
* @throws IOException If IO failed or read timed out.
* @throws IgniteCheckedException If unmarshalling failed.
*/
protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException,
IgniteCheckedException {
assert sock != null;
int oldTimeout = sock.getSoTimeout();
try {
sock.setSoTimeout((int)timeout);
T res = marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
return res;
}
catch (IOException | IgniteCheckedException e) {
if (X.hasCause(e, SocketTimeoutException.class))
LT.warn(log, null, "Timed out waiting for message to be read (most probably, the reason is " +
"in long GC pauses on remote node) [curTimeout=" + timeout + ']');
throw e;
}
finally {
// Quietly restore timeout.
try {
sock.setSoTimeout(oldTimeout);
}
catch (SocketException ignored) {
// No-op.
}
}
}
/**
* Reads message delivery receipt from the socket.
*
* @param sock Socket.
* @param timeout Socket timeout for this operation.
* @return Receipt.
* @throws IOException If IO failed or read timed out.
*/
protected int readReceipt(Socket sock, long timeout) throws IOException {
assert sock != null;
int oldTimeout = sock.getSoTimeout();
try {
sock.setSoTimeout((int)timeout);
int res = sock.getInputStream().read();
if (res == -1)
throw new EOFException();
return res;
}
catch (SocketTimeoutException e) {
LT.warn(log, null, "Timed out waiting for message delivery receipt (most probably, the reason is " +
"in long GC pauses on remote node; consider tuning GC and increasing 'ackTimeout' " +
"configuration property). Will retry to send message with increased timeout. " +
"Current timeout: " + timeout + '.');
stats.onAckTimeout();
throw e;
}
finally {
// Quietly restore timeout.
try {
sock.setSoTimeout(oldTimeout);
}
catch (SocketException ignored) {
// No-op.
}
}
}
/**
* Resolves addresses registered in the IP finder, removes duplicates and local host
* address and returns the collection of.
*
* @return Resolved addresses without duplicates and local address (potentially
* empty but never null).
* @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
*/
protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException {
List<InetSocketAddress> res = new ArrayList<>();
Collection<InetSocketAddress> addrs;
// Get consistent addresses collection.
while (true) {
try {
addrs = registeredAddresses();
break;
}
catch (IgniteSpiException e) {
LT.error(log, e, "Failed to get registered addresses from IP finder on start " +
"(retrying every 2000 ms).");
}
try {
U.sleep(2000);
}
catch (IgniteInterruptedCheckedException e) {
throw new IgniteSpiException("Thread has been interrupted.", e);
}
}
for (InetSocketAddress addr : addrs) {
assert addr != null;
try {
InetSocketAddress resolved = addr.isUnresolved() ?
new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()) : addr;
if (locNodeAddrs == null || !locNodeAddrs.contains(resolved))
res.add(resolved);
}
catch (UnknownHostException ignored) {
LT.warn(log, null, "Failed to resolve address from IP finder (host is unknown): " + addr);
// Add address in any case.
res.add(addr);
}
}
if (!res.isEmpty())
Collections.shuffle(res);
return res;
}
/**
* Gets addresses registered in the IP finder, initializes addresses having no
* port (or 0 port) with {@link #DFLT_PORT}.
*
* @return Registered addresses.
* @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
*/
protected Collection<InetSocketAddress> registeredAddresses() throws IgniteSpiException {
Collection<InetSocketAddress> res = new ArrayList<>();
for (InetSocketAddress addr : ipFinder.getRegisteredAddresses()) {
if (addr.getPort() == 0) {
// TcpDiscoveryNode.discoveryPort() returns an correct port for a server node and 0 for client node.
int port = locNode.discoveryPort() != 0 ? locNode.discoveryPort() : DFLT_PORT;
addr = addr.isUnresolved() ? new InetSocketAddress(addr.getHostName(), port) :
new InetSocketAddress(addr.getAddress(), port);
}
res.add(addr);
}
return res;
}
/**
* @param msg Message.
* @return Error.
*/
protected IgniteSpiException duplicateIdError(TcpDiscoveryDuplicateIdMessage msg) {
assert msg != null;
return new IgniteSpiException("Local node has the same ID as existing node in topology " +
"(fix configuration and restart local node) [localNode=" + locNode +
", existingNode=" + msg.node() + ']');
}
/**
* @param msg Message.
* @return Error.
*/
protected IgniteSpiException authenticationFailedError(TcpDiscoveryAuthFailedMessage msg) {
assert msg != null;
return new IgniteSpiException(new IgniteAuthenticationException("Authentication failed [nodeId=" +
msg.creatorNodeId() + ", addr=" + msg.address().getHostAddress() + ']'));
}
/**
* @param msg Message.
* @return Error.
*/
protected IgniteSpiException checkFailedError(TcpDiscoveryCheckFailedMessage msg) {
assert msg != null;
return versionCheckFailed(msg) ? new IgniteSpiVersionCheckException(msg.error()) :
new IgniteSpiException(msg.error());
}
/**
* @param msg Message.
* @return Whether delivery of the message is ensured.
*/
protected boolean ensured(TcpDiscoveryAbstractMessage msg) {
return U.getAnnotation(msg.getClass(), TcpDiscoveryEnsureDelivery.class) != null;
}
/**
* @param msg Failed message.
* @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise.
* @deprecated Parsing of error message was used for preserving backward compatibility. We should remove it
* and create separate message for failed version check with next major release.
*/
@Deprecated
private static boolean versionCheckFailed(TcpDiscoveryCheckFailedMessage msg) {
return msg.error().contains("versions are not compatible");
}
/**
* @param nodeId Node ID.
* @return Marshalled exchange data.
*/
protected Map<Integer, byte[]> collectExchangeData(UUID nodeId) {
if (locNode.isDaemon())
return Collections.emptyMap();
Map<Integer, Serializable> data = exchange.collect(nodeId);
assert data != null;
Map<Integer, byte[]> data0 = U.newHashMap(data.size());
for (Map.Entry<Integer, Serializable> entry : data.entrySet()) {
try {
byte[] bytes = marsh.marshal(entry.getValue());
data0.put(entry.getKey(), bytes);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal discovery data " +
"[comp=" + entry.getKey() + ", data=" + entry.getValue() + ']', e);
}
}
return data0;
}
/**
* @param joiningNodeID Joining node ID.
* @param nodeId Remote node ID for which data is provided.
* @param data Collection of marshalled discovery data objects from different components.
* @param clsLdr Class loader for discovery data unmarshalling.
*/
protected void onExchange(UUID joiningNodeID,
UUID nodeId,
Map<Integer, byte[]> data,
ClassLoader clsLdr)
{
if (locNode.isDaemon())
return;
Map<Integer, Serializable> data0 = U.newHashMap(data.size());
for (Map.Entry<Integer, byte[]> entry : data.entrySet()) {
try {
Serializable compData = marsh.unmarshal(entry.getValue(), clsLdr);
data0.put(entry.getKey(), compData);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal discovery data for component: " + entry.getKey(), e);
}
}
exchange.onExchange(joiningNodeID, nodeId, data0);
}
/** {@inheritDoc} */
@Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
initFailureDetectionTimeout();
if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {
if (ackTimeout == 0)
ackTimeout = DFLT_ACK_TIMEOUT_CLIENT;
if (sockTimeout == 0)
sockTimeout = DFLT_SOCK_TIMEOUT_CLIENT;
impl = new ClientImpl(this);
ctxInitLatch.countDown();
}
else {
if (ackTimeout == 0)
ackTimeout = DFLT_ACK_TIMEOUT;
if (sockTimeout == 0)
sockTimeout = DFLT_SOCK_TIMEOUT;
impl = new ServerImpl(this);
}
if (!failureDetectionTimeoutEnabled()) {
assertParameter(sockTimeout > 0, "sockTimeout > 0");
assertParameter(ackTimeout > 0, "ackTimeout > 0");
assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
assertParameter(reconCnt > 0, "reconnectCnt > 0");
}
assertParameter(netTimeout > 0, "networkTimeout > 0");
assertParameter(ipFinder != null, "ipFinder != null");
assertParameter(hbFreq > 0, "heartbeatFreq > 0");
assertParameter(ipFinderCleanFreq > 0, "ipFinderCleanFreq > 0");
assertParameter(locPort > 1023, "localPort > 1023");
assertParameter(locPortRange >= 0, "localPortRange >= 0");
assertParameter(locPort + locPortRange <= 0xffff, "locPort + locPortRange <= 0xffff");
assertParameter(maxMissedHbs > 0, "maxMissedHeartbeats > 0");
assertParameter(maxMissedClientHbs > 0, "maxMissedClientHeartbeats > 0");
assertParameter(threadPri > 0, "threadPri > 0");
assertParameter(statsPrintFreq >= 0, "statsPrintFreq >= 0");
if (isSslEnabled()) {
try {
SSLContext sslCtx = ignite().configuration().getSslContextFactory().create();
sslSockFactory = sslCtx.getSocketFactory();
sslSrvSockFactory = sslCtx.getServerSocketFactory();
}
catch (IgniteException e) {
throw new IgniteSpiException("Failed to create SSL context. SSL factory: "
+ ignite.configuration().getSslContextFactory(), e);
}
}
try {
locHost = U.resolveLocalHost(locAddr);
}
catch (IOException e) {
throw new IgniteSpiException("Unknown local address: " + locAddr, e);
}
if (log.isDebugEnabled()) {
log.debug(configInfo("localHost", locHost.getHostAddress()));
log.debug(configInfo("localPort", locPort));
log.debug(configInfo("localPortRange", locPortRange));
log.debug(configInfo("threadPri", threadPri));
if (!failureDetectionTimeoutEnabled()) {
log.debug("Failure detection timeout is ignored because at least one of the parameters from this list" +
" has been set explicitly: 'sockTimeout', 'ackTimeout', 'maxAckTimeout', 'reconnectCount'.");
log.debug(configInfo("networkTimeout", netTimeout));
log.debug(configInfo("sockTimeout", sockTimeout));
log.debug(configInfo("ackTimeout", ackTimeout));
log.debug(configInfo("maxAckTimeout", maxAckTimeout));
log.debug(configInfo("reconnectCount", reconCnt));
}
else
log.debug(configInfo("failureDetectionTimeout", failureDetectionTimeout()));
log.debug(configInfo("ipFinder", ipFinder));
log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq));
log.debug(configInfo("heartbeatFreq", hbFreq));
log.debug(configInfo("maxMissedHeartbeats", maxMissedHbs));
log.debug(configInfo("statsPrintFreq", statsPrintFreq));
}
// Warn on odd network timeout.
if (netTimeout < 3000)
U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout);
registerMBean(gridName, this, TcpDiscoverySpiMBean.class);
if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) {
TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder);
if (mcastIpFinder.getLocalAddress() == null)
mcastIpFinder.setLocalAddress(locAddr);
}
cfgNodeId = ignite.configuration().getNodeId();
impl.spiStart(gridName);
}
/** {@inheritDoc} */
@Override public void spiStop() throws IgniteSpiException {
if (ctxInitLatch.getCount() > 0)
// Safety.
ctxInitLatch.countDown();
if (ipFinder != null) {
try {
ipFinder.close();
}
catch (Exception e) {
log.error("Failed to close ipFinder", e);
}
}
unregisterMBean();
if (impl != null)
impl.spiStop();
}
/**
*
*/
void printStartInfo() {
if (log.isDebugEnabled())
log.debug(startInfo());
}
/**
*
*/
void printStopInfo() {
if (log.isDebugEnabled())
log.debug(stopInfo());
}
/**
* @return Ignite instance.
*/
Ignite ignite() {
return ignite;
}
/**
* @return {@code True} if node is stopping.
*/
boolean isNodeStopping0() {
return isNodeStopping();
}
/**
* @throws IgniteSpiException If any error occurs.
* @return {@code true} if IP finder contains local address.
*/
boolean ipFinderHasLocalAddress() throws IgniteSpiException {
for (InetSocketAddress locAddr : locNodeAddrs) {
for (InetSocketAddress addr : registeredAddresses())
try {
int port = addr.getPort();
InetSocketAddress resolved = addr.isUnresolved() ?
new InetSocketAddress(InetAddress.getByName(addr.getHostName()), port) :
new InetSocketAddress(addr.getAddress(), port);
if (resolved.equals(locAddr))
return true;
}
catch (UnknownHostException e) {
getExceptionRegistry().onException(e.getMessage(), e);
}
}
return false;
}
/**
* @return {@code True} if ssl enabled.
*/
boolean isSslEnabled() {
return ignite().configuration().getSslContextFactory() != null;
}
/**
* <strong>FOR TEST ONLY!!!</strong>
*/
public int clientWorkerCount() {
return ((ServerImpl)impl).clientMsgWorkers.size();
}
/**
* <strong>FOR TEST ONLY!!!</strong>
*/
void forceNextNodeFailure() {
((ServerImpl)impl).forceNextNodeFailure();
}
/**
* <strong>FOR TEST ONLY!!!</strong>
*/
public void addSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> lsnr) {
sndMsgLsnrs.add(lsnr);
}
/**
* <strong>FOR TEST ONLY!!!</strong>
*/
public void removeSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> lsnr) {
sndMsgLsnrs.remove(lsnr);
}
/**
* <strong>FOR TEST ONLY!!!</strong>
*/
public void addIncomeConnectionListener(IgniteInClosure<Socket> lsnr) {
incomeConnLsnrs.add(lsnr);
}
/**
* <strong>FOR TEST ONLY!!!</strong>
*/
public void removeIncomeConnectionListener(IgniteInClosure<Socket> lsnr) {
incomeConnLsnrs.remove(lsnr);
}
/**
* FOR TEST PURPOSE ONLY!
*/
public void waitForClientMessagePrecessed() {
if (impl instanceof ClientImpl)
((ClientImpl)impl).waitForClientMessagePrecessed();
}
/**
* <strong>FOR TEST ONLY!!!</strong>
* <p>
* Simulates this node failure by stopping service threads. So, node will become
* unresponsive.
* <p>
* This method is intended for test purposes only.
*/
protected void simulateNodeFailure() {
impl.simulateNodeFailure();
}
/**
* FOR TEST PURPOSE ONLY!
*/
public void brakeConnection() {
impl.brakeConnection();
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoverySpi.class, this);
}
/**
* Socket timeout object.
*/
private class SocketTimeoutObject implements IgniteSpiTimeoutObject {
/** */
private final IgniteUuid id = IgniteUuid.randomUuid();
/** */
private final Socket sock;
/** */
private final long endTime;
/** */
private final AtomicBoolean done = new AtomicBoolean();
/**
* @param sock Socket.
* @param endTime End time.
*/
SocketTimeoutObject(Socket sock, long endTime) {
assert sock != null;
assert endTime > 0;
this.sock = sock;
this.endTime = endTime;
}
/**
* @return {@code True} if object has not yet been processed.
*/
boolean cancel() {
return done.compareAndSet(false, true);
}
/** {@inheritDoc} */
@Override public void onTimeout() {
if (done.compareAndSet(false, true)) {
// Close socket - timeout occurred.
U.closeQuiet(sock);
LT.warn(log, null, "Socket write has timed out (consider increasing " +
(failureDetectionTimeoutEnabled() ?
"'IgniteConfiguration.failureDetectionTimeout' configuration property) [" +
"failureDetectionTimeout=" + failureDetectionTimeout() + ']' :
"'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']'));
stats.onSocketTimeout();
}
}
/** {@inheritDoc} */
@Override public long endTime() {
return endTime;
}
/** {@inheritDoc} */
@Override public IgniteUuid id() {
return id;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SocketTimeoutObject.class, this);
}
}
}