blob: b7e3cd544ddacbc22b053d3a8ac903678ff56a37 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.discovery.tcp;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.io.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.marshaller.jdk.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.discovery.*;
import org.apache.ignite.spi.discovery.tcp.internal.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.messages.*;
import org.jetbrains.annotations.*;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
/**
* Base class for TCP discovery SPIs.
*/
abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements DiscoverySpi {
/** Default discovery port (value is <tt>47500</tt>); also substituted for IP finder addresses registered with port 0 when the local node has no discovery port. */
public static final int DFLT_PORT = 47500;
/** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>); bounds socket connect and socket write time. */
public static final long DFLT_SOCK_TIMEOUT = 200;
/** Default timeout for receiving a message acknowledgement, in milliseconds (value is <tt>200ms</tt>). */
public static final long DFLT_ACK_TIMEOUT = 200;
/** Default network timeout in milliseconds (value is <tt>200ms</tt>). */
public static final long DFLT_NETWORK_TIMEOUT = 200;
/** Default priority of threads started by this SPI (value is <tt>10</tt>). */
public static final int DFLT_THREAD_PRI = 10;
/** Default heartbeat messages issuing frequency in milliseconds (value is <tt>300ms</tt>). */
public static final long DFLT_HEARTBEAT_FREQ = 300;
/** Default size of topology snapshots history; smaller values are rejected by {@code setTopHistorySize(int)}. */
public static final int DFLT_TOP_HISTORY_SIZE = 1000;
/** Response code: OK (presumably sent via {@code writeToSocket(Socket, int)} — confirm in subclasses). */
protected static final int RES_OK = 1;
/** Response code: CONTINUE JOIN. */
protected static final int RES_CONTINUE_JOIN = 100;
/** Response code: WAIT. */
protected static final int RES_WAIT = 200;
/** Local address configured by the user or injected from the Ignite configuration (first non-null value wins). */
protected String locAddr;
/** IP finder used to discover addresses of other nodes. */
protected TcpDiscoveryIpFinder ipFinder;
/** Socket operations timeout in milliseconds (connect and write). */
protected long sockTimeout = DFLT_SOCK_TIMEOUT;
/** Message acknowledgement timeout in milliseconds. */
protected long ackTimeout = DFLT_ACK_TIMEOUT;
/** Network timeout in milliseconds. */
protected long netTimeout = DFLT_NETWORK_TIMEOUT;
/** Thread priority for all threads started by SPI. */
protected int threadPri = DFLT_THREAD_PRI;
/** Heartbeat messages issuing frequency in milliseconds. */
protected long hbFreq = DFLT_HEARTBEAT_FREQ;
/** Size of topology snapshots history. */
protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
/** Grid discovery listener, set via {@code setListener(..)}; may be {@code null}. */
protected volatile DiscoverySpiListener lsnr;
/** Data exchange callback that receives unmarshalled discovery data (see {@code onExchange(..)}). */
protected DiscoverySpiDataExchange exchange;
/** Metrics provider. */
protected DiscoveryMetricsProvider metricsProvider;
/** Local node attributes; set once via {@code setNodeAttributes(..)}. */
protected Map<String, Object> locNodeAttrs;
/** Local node version; set once via {@code setNodeAttributes(..)}. */
protected IgniteProductVersion locNodeVer;
/** Local node. */
protected TcpDiscoveryNode locNode;
/** Local host address; outgoing sockets are bound to it in {@code openSocket(..)}. */
protected InetAddress locHost;
/** Internal and external addresses of local node; filtered out of {@code resolvedAddresses()}. */
protected Collection<InetSocketAddress> locNodeAddrs;
/** Worker that closes sockets whose writes exceed {@link #sockTimeout}. */
protected SocketTimeoutWorker sockTimeoutWorker;
/** Discovery state. */
protected TcpDiscoverySpiState spiState = DISCONNECTED;
/** Start time of the very first grid node. */
protected volatile long gridStartTime;
/** JDK marshaller used for discovery messages and discovery data. */
protected final Marshaller marsh = new JdkMarshaller();
/** Statistics (e.g. socket write timeouts and acknowledgement timeouts). */
protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics();
/** Logger. */
@LoggerResource
protected IgniteLogger log;
/**
 * Injects resources into this SPI. When an Ignite instance is supplied, its configured
 * local host is offered as this SPI's local address; {@link #setLocalAddress(String)}
 * ignores it if an address was already configured.
 *
 * @param ignite Ignite instance (may be {@code null}).
 */
@IgniteInstanceResource
@Override protected void injectResources(Ignite ignite) {
    super.injectResources(ignite);

    if (ignite == null)
        return;

    // Inject resource (does not override a user-provided value).
    setLocalAddress(ignite.configuration().getLocalHost());
}
/**
 * Sets the local host IP address that the discovery SPI uses.
 * <p>
 * If not provided, by default the first non-loopback address found will be used.
 * If no non-loopback address is available, {@link InetAddress#getLocalHost()} is used.
 * <p>
 * Once a non-{@code null} address has been stored, later calls (including resource
 * injection) leave it unchanged.
 *
 * @param locAddr IP address.
 */
@IgniteSpiConfiguration(optional = true)
public void setLocalAddress(String locAddr) {
    // Injection should not override value already set by Spring or user.
    boolean alreadyConfigured = this.locAddr != null;

    if (!alreadyConfigured)
        this.locAddr = locAddr;
}

/**
 * Returns the local address previously stored by {@link #setLocalAddress(String)}.
 *
 * @return Local address, or {@code null} if none was set.
 */
public String getLocalAddress() {
    String addr = locAddr;

    return addr;
}
/**
 * Returns the IP finder used for sharing and storing node addresses.
 *
 * @return IP finder, or {@code null} if none was configured.
 */
public TcpDiscoveryIpFinder getIpFinder() {
    TcpDiscoveryIpFinder finder = ipFinder;

    return finder;
}

/**
 * Configures the IP finder used for sharing and storing node addresses.
 * <p>
 * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default.
 *
 * @param ipFinder IP finder.
 */
@IgniteSpiConfiguration(optional = true)
public void setIpFinder(TcpDiscoveryIpFinder ipFinder) {
    this.ipFinder = ipFinder;
}
/**
 * Configures the socket operations timeout, in milliseconds. It limits both the time
 * spent connecting a socket and the time spent writing to it.
 * <p>
 * When running Ignite on Amazon EC2 this value should be set significantly higher
 * than the default (e.g. to {@code 30000}).
 * <p>
 * Defaults to {@link #DFLT_SOCK_TIMEOUT}.
 *
 * @param sockTimeout Socket connection timeout.
 */
@IgniteSpiConfiguration(optional = true)
public void setSocketTimeout(long sockTimeout) {
    this.sockTimeout = sockTimeout;
}

/**
 * Configures how long to wait for the acknowledgement of a sent message.
 * <p>
 * If no acknowledgement arrives within this time, the send is treated as failed and
 * the SPI retries it.
 * <p>
 * Defaults to {@link #DFLT_ACK_TIMEOUT}.
 *
 * @param ackTimeout Acknowledgement timeout.
 */
@IgniteSpiConfiguration(optional = true)
public void setAckTimeout(long ackTimeout) {
    this.ackTimeout = ackTimeout;
}

/**
 * Configures the maximum timeout applied to network operations.
 * <p>
 * Defaults to {@link #DFLT_NETWORK_TIMEOUT}.
 *
 * @param netTimeout Network timeout.
 */
@IgniteSpiConfiguration(optional = true)
public void setNetworkTimeout(long netTimeout) {
    this.netTimeout = netTimeout;
}

/**
 * Configures the priority given to every thread the SPI starts.
 * <p>
 * Defaults to {@link #DFLT_THREAD_PRI}.
 *
 * @param threadPri Thread priority.
 */
@IgniteSpiConfiguration(optional = true)
public void setThreadPriority(int threadPri) {
    this.threadPri = threadPri;
}

/**
 * Configures the interval between heartbeat messages, which the SPI periodically
 * sends to other nodes to report its state.
 * <p>
 * Defaults to {@link #DFLT_HEARTBEAT_FREQ}.
 *
 * @param hbFreq Heartbeat frequency in milliseconds.
 */
@IgniteSpiConfiguration(optional = true)
public void setHeartbeatFrequency(long hbFreq) {
    this.hbFreq = hbFreq;
}
/**
 * @return Size of topology snapshots history.
 */
public long getTopHistorySize() {
    return topHistSize;
}

/**
 * Configures the size of the topology snapshots history. Values below
 * {@link #DFLT_TOP_HISTORY_SIZE} are rejected: a warning is logged and the current
 * size is kept.
 *
 * @param topHistSize Size of topology snapshots history.
 */
@IgniteSpiConfiguration(optional = true)
public void setTopHistorySize(int topHistSize) {
    boolean acceptable = topHistSize >= DFLT_TOP_HISTORY_SIZE;

    if (acceptable) {
        this.topHistSize = topHistSize;

        return;
    }

    U.warn(log, "Topology history size should be greater than or equal to default size. " +
        "Specified size will not be set [curSize=" + this.topHistSize + ", specifiedSize=" + topHistSize +
        ", defaultSize=" + DFLT_TOP_HISTORY_SIZE + ']');
}
/**
 * {@inheritDoc}
 * <p>
 * Must be called at most once: both values are asserted to be unset beforehand.
 */
@Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
    assert locNodeAttrs == null;
    assert locNodeVer == null;

    boolean dbg = log.isDebugEnabled();

    if (dbg) {
        log.debug("Node attributes to set: " + attrs);
        log.debug("Node version to set: " + ver);
    }

    locNodeVer = ver;
    locNodeAttrs = attrs;
}
/**
 * {@inheritDoc}
 * <p>
 * Propagates the SPI context to the IP finder, which is expected to be configured by now.
 */
@Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
    super.onContextInitialized0(spiCtx);

    TcpDiscoveryIpFinder finder = ipFinder;

    finder.onSpiContextInitialized(spiCtx);
}

/**
 * {@inheritDoc}
 * <p>
 * Notifies the IP finder, if one is configured.
 */
@Override protected void onContextDestroyed0() {
    super.onContextDestroyed0();

    TcpDiscoveryIpFinder finder = ipFinder;

    if (finder == null)
        return;

    finder.onSpiContextDestroyed();
}
/** {@inheritDoc} */
@Override public ClusterNode getLocalNode() {
    TcpDiscoveryNode node = locNode;

    return node;
}

/** {@inheritDoc} */
@Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
    this.lsnr = lsnr;
}

/** {@inheritDoc} */
@Override public void setDataExchange(DiscoverySpiDataExchange exchange) {
    this.exchange = exchange;
}

/** {@inheritDoc} */
@Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
    this.metricsProvider = metricsProvider;
}

/**
 * {@inheritDoc}
 * <p>
 * Asserts that the grid start time has already been set (is non-zero).
 */
@Override public long getGridStartTime() {
    assert gridStartTime != 0;

    return gridStartTime;
}
/**
 * Opens a TCP socket to the given remote address and writes the Ignite header to it.
 * <p>
 * An unresolved address is resolved first. The socket is bound to {@link #locHost}
 * on an ephemeral port, has {@code TCP_NODELAY} enabled, and is connected with
 * {@link #sockTimeout} as the connect timeout. The header write is bounded by the
 * socket timeout worker (see {@code writeToSocket(Socket, byte[])}).
 * <p>
 * If any step after socket creation fails, the socket is closed before the exception
 * propagates, so a failed connect does not leak a file descriptor.
 *
 * @param sockAddr Remote address.
 * @return Opened socket with the Ignite header already written.
 * @throws IOException If resolution, bind, connect or header write failed.
 */
protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
    assert sockAddr != null;

    InetSocketAddress resolved = sockAddr.isUnresolved() ?
        new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr;

    InetAddress addr = resolved.getAddress();

    assert addr != null;

    Socket sock = new Socket();

    boolean success = false;

    try {
        sock.bind(new InetSocketAddress(locHost, 0));

        sock.setTcpNoDelay(true);

        sock.connect(resolved, (int)sockTimeout);

        writeToSocket(sock, U.IGNITE_HEADER);

        success = true;

        return sock;
    }
    finally {
        // Release the socket on any failure (checked or unchecked); it is never returned to the caller.
        if (!success)
            U.closeQuiet(sock);
    }
}
/**
 * Writes raw bytes to the socket, bounding the write by {@link #sockTimeout}.
 * <p>
 * A timeout object is registered with {@link #sockTimeoutWorker} before writing. If the
 * worker fires first, it closes the socket and this method reports a
 * {@link SocketTimeoutException}, unless the write itself already failed, in which case
 * that original {@link IOException} is rethrown instead.
 *
 * @param sock Socket.
 * @param data Raw data to write.
 * @throws IOException If IO failed or write timed out.
 */
@SuppressWarnings("ThrowFromFinallyBlock")
protected void writeToSocket(Socket sock, byte[] data) throws IOException {
    assert sock != null;
    assert data != null;

    long deadline = U.currentTimeMillis() + sockTimeout;

    SocketTimeoutObject timeoutGuard = new SocketTimeoutObject(sock, deadline);

    sockTimeoutWorker.addTimeoutObject(timeoutGuard);

    IOException writeErr = null;

    try {
        OutputStream os = sock.getOutputStream();

        os.write(data);

        os.flush();
    }
    catch (IOException e) {
        writeErr = e;
    }
    finally {
        // True if we got here before the timeout worker claimed the guard.
        boolean inTime = timeoutGuard.cancel();

        if (inTime)
            sockTimeoutWorker.removeTimeoutObject(timeoutGuard);

        // Throw original exception.
        if (writeErr != null)
            throw writeErr;

        if (!inTime)
            throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
    }
}
/**
 * Marshals a discovery message and writes it to the socket, using a freshly
 * allocated 8K marshalling buffer for this call.
 *
 * @param sock Socket.
 * @param msg Message.
 * @throws IOException If IO failed or write timed out.
 * @throws IgniteCheckedException If marshalling failed.
 */
protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, IgniteCheckedException {
    GridByteArrayOutputStream buf = new GridByteArrayOutputStream(8 * 1024); // 8K.

    writeToSocket(sock, msg, buf);
}
/**
 * Marshals a discovery message into {@code bout} and writes the bytes to the socket.
 * <p>
 * Marshalling happens before the timeout object is registered, so only the socket
 * write is bounded by {@link #sockTimeout}. {@code bout} is not reset here; callers
 * reusing a buffer must reset it themselves.
 *
 * @param sock Socket.
 * @param msg Message.
 * @param bout Byte array output stream that receives the marshalled message.
 * @throws IOException If IO failed or write timed out.
 * @throws IgniteCheckedException If marshalling failed.
 */
@SuppressWarnings("ThrowFromFinallyBlock")
protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout)
    throws IOException, IgniteCheckedException {
    assert sock != null;
    assert msg != null;
    assert bout != null;

    // Marshal first so that only the raw write runs under the socket timeout.
    marsh.marshal(msg, bout);

    long deadline = U.currentTimeMillis() + sockTimeout;

    SocketTimeoutObject timeoutGuard = new SocketTimeoutObject(sock, deadline);

    sockTimeoutWorker.addTimeoutObject(timeoutGuard);

    IOException writeErr = null;

    try {
        OutputStream os = sock.getOutputStream();

        bout.writeTo(os);

        os.flush();
    }
    catch (IOException e) {
        writeErr = e;
    }
    finally {
        // True if we got here before the timeout worker claimed the guard.
        boolean inTime = timeoutGuard.cancel();

        if (inTime)
            sockTimeoutWorker.removeTimeoutObject(timeoutGuard);

        // Throw original exception.
        if (writeErr != null)
            throw writeErr;

        if (!inTime)
            throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
    }
}
/**
 * Writes a response code to the socket, bounding the write by {@link #sockTimeout}.
 * <p>
 * Only the low-order 8 bits of {@code res} are sent ({@link OutputStream#write(int)} semantics).
 * <p>
 * The output stream is obtained inside the guarded section, as in the other
 * {@code writeToSocket} overloads. If {@link Socket#getOutputStream()} fails (e.g. the
 * socket is closed or its output is shut down), the registered timeout object is still
 * cancelled and removed. It does not linger until expiry, where it would close the socket
 * and record a spurious write timeout.
 *
 * @param sock Socket.
 * @param res Integer response.
 * @throws IOException If IO failed or write timed out.
 */
@SuppressWarnings("ThrowFromFinallyBlock")
protected void writeToSocket(Socket sock, int res) throws IOException {
    assert sock != null;

    SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);

    sockTimeoutWorker.addTimeoutObject(obj);

    IOException err = null;

    try {
        OutputStream out = sock.getOutputStream();

        out.write(res);
        out.flush();
    }
    catch (IOException e) {
        err = e;
    }
    finally {
        // False means the timeout worker already fired and closed the socket.
        boolean cancelled = obj.cancel();

        if (cancelled)
            sockTimeoutWorker.removeTimeoutObject(obj);

        // Throw original exception.
        if (err != null)
            throw err;

        if (!cancelled)
            throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
    }
}
/**
 * Reads and unmarshals a message from the socket, limiting read time.
 * <p>
 * The socket's {@code SO_TIMEOUT} is temporarily set to {@code timeout} and quietly
 * restored afterwards. If the failure was caused by a {@link SocketTimeoutException},
 * a throttled warning is logged before the exception is rethrown.
 *
 * @param sock Socket.
 * @param in Input stream to read from (in case the socket stream was wrapped);
 *      if {@code null}, the socket's own input stream is used.
 * @param timeout Socket timeout for this operation, in milliseconds.
 * @return Message.
 * @throws IOException If IO failed or read timed out.
 * @throws IgniteCheckedException If unmarshalling failed.
 */
protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, IgniteCheckedException {
    assert sock != null;

    int oldTimeout = sock.getSoTimeout();

    try {
        sock.setSoTimeout((int)timeout);

        return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
    }
    catch (IOException | IgniteCheckedException e) {
        // Fixed: the parenthesis opened in this message was never closed.
        if (X.hasCause(e, SocketTimeoutException.class))
            LT.warn(log, null, "Timed out waiting for message to be read (most probably, the reason is " +
                "in long GC pauses on remote node). Current timeout: " + timeout + '.');

        throw e;
    }
    finally {
        // Quietly restore timeout.
        try {
            sock.setSoTimeout(oldTimeout);
        }
        catch (SocketException ignored) {
            // No-op.
        }
    }
}
/**
 * Reads a single-byte message delivery receipt from the socket.
 * <p>
 * The socket's {@code SO_TIMEOUT} is temporarily set to {@code timeout} and quietly
 * restored afterwards. On timeout, a throttled warning is logged and the ack-timeout
 * statistic is incremented before the exception is rethrown.
 *
 * @param sock Socket.
 * @param timeout Socket timeout for this operation.
 * @return Receipt byte (0..255).
 * @throws IOException If IO failed, read timed out, or the stream ended ({@link EOFException}).
 */
protected int readReceipt(Socket sock, long timeout) throws IOException {
    assert sock != null;

    int prevTimeout = sock.getSoTimeout();

    try {
        sock.setSoTimeout((int)timeout);

        int receipt = sock.getInputStream().read();

        if (receipt != -1)
            return receipt;

        // Stream ended before a receipt arrived.
        throw new EOFException();
    }
    catch (SocketTimeoutException e) {
        LT.warn(log, null, "Timed out waiting for message delivery receipt (most probably, the reason is " +
            "in long GC pauses on remote node; consider tuning GC and increasing 'ackTimeout' " +
            "configuration property). Will retry to send message with increased timeout. " +
            "Current timeout: " + timeout + '.');

        stats.onAckTimeout();

        throw e;
    }
    finally {
        // Quietly restore timeout.
        try {
            sock.setSoTimeout(prevTimeout);
        }
        catch (SocketException ignored) {
            // No-op.
        }
    }
}
/**
 * Resolves the addresses registered in the IP finder and returns them in random order,
 * excluding this node's own addresses.
 * <p>
 * Fetching from the IP finder is retried every 2 seconds until it succeeds. An address
 * whose host cannot be resolved is logged and kept in its unresolved form.
 *
 * @return Shuffled addresses without local node addresses (potentially empty but never null).
 * @throws org.apache.ignite.spi.IgniteSpiException If the thread is interrupted while waiting to retry.
 */
protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException {
    Collection<InetSocketAddress> registered;

    // Get consistent addresses collection.
    for (;;) {
        try {
            registered = registeredAddresses();

            break;
        }
        catch (IgniteSpiException e) {
            LT.error(log, e, "Failed to get registered addresses from IP finder on start " +
                "(retrying every 2000 ms).");
        }

        try {
            U.sleep(2000);
        }
        catch (IgniteInterruptedCheckedException e) {
            throw new IgniteSpiException("Thread has been interrupted.", e);
        }
    }

    List<InetSocketAddress> res = new ArrayList<>();

    for (InetSocketAddress addr : registered) {
        assert addr != null;

        InetSocketAddress candidate;

        try {
            candidate = addr.isUnresolved() ?
                new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()) : addr;
        }
        catch (UnknownHostException ignored) {
            LT.warn(log, null, "Failed to resolve address from IP finder (host is unknown): " + addr);

            // Add address in any case.
            res.add(addr);

            continue;
        }

        boolean local = locNodeAddrs != null && locNodeAddrs.contains(candidate);

        if (!local)
            res.add(candidate);
    }

    // Shuffling fewer than two elements is a no-op.
    if (res.size() > 1)
        Collections.shuffle(res);

    return res;
}
/**
 * Returns the addresses registered in the IP finder. An address with port 0 is given
 * the local node's discovery port, or {@link #DFLT_PORT} if that port is 0.
 *
 * @return Registered addresses (a new mutable collection).
 * @throws org.apache.ignite.spi.IgniteSpiException If the IP finder fails.
 */
protected Collection<InetSocketAddress> registeredAddresses() throws IgniteSpiException {
    Collection<InetSocketAddress> res = new ArrayList<>();

    for (InetSocketAddress addr : ipFinder.getRegisteredAddresses()) {
        if (addr.getPort() != 0) {
            res.add(addr);

            continue;
        }

        // TcpDiscoveryNode.discoveryPort() returns a correct port for a server node and 0 for a client node.
        int port = locNode.discoveryPort() != 0 ? locNode.discoveryPort() : DFLT_PORT;

        InetSocketAddress withPort = addr.isUnresolved()
            ? new InetSocketAddress(addr.getHostName(), port)
            : new InetSocketAddress(addr.getAddress(), port);

        res.add(withPort);
    }

    return res;
}
/**
 * Builds the error reported when another node in the topology has the local node's ID.
 *
 * @param msg Duplicate ID message.
 * @return Error.
 */
protected IgniteSpiException duplicateIdError(TcpDiscoveryDuplicateIdMessage msg) {
    assert msg != null;

    String text = "Local node has the same ID as existing node in topology " +
        "(fix configuration and restart local node) [localNode=" + locNode +
        ", existingNode=" + msg.node() + ']';

    return new IgniteSpiException(text);
}

/**
 * Builds the error reported when authentication of the local node failed.
 *
 * @param msg Authentication failure message.
 * @return Error wrapping an {@link IgniteAuthenticationException}.
 */
protected IgniteSpiException authenticationFailedError(TcpDiscoveryAuthFailedMessage msg) {
    assert msg != null;

    String text = "Authentication failed [nodeId=" + msg.creatorNodeId() +
        ", addr=" + msg.address().getHostAddress() + ']';

    return new IgniteSpiException(new IgniteAuthenticationException(text));
}

/**
 * Builds the error reported when a join check failed. Version incompatibilities are
 * reported as {@link IgniteSpiVersionCheckException}.
 *
 * @param msg Check failure message.
 * @return Error.
 */
protected IgniteSpiException checkFailedError(TcpDiscoveryCheckFailedMessage msg) {
    assert msg != null;

    if (versionCheckFailed(msg))
        return new IgniteSpiVersionCheckException(msg.error());

    return new IgniteSpiException(msg.error());
}
/**
 * Tells whether delivery of the message is ensured, i.e. whether its class carries
 * the {@link TcpDiscoveryEnsureDelivery} annotation.
 *
 * @param msg Message.
 * @return Whether delivery of the message is ensured.
 */
protected boolean ensured(TcpDiscoveryAbstractMessage msg) {
    Class<?> msgCls = msg.getClass();

    return U.getAnnotation(msgCls, TcpDiscoveryEnsureDelivery.class) != null;
}

/**
 * @param msg Failed message.
 * @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise.
 * @deprecated Parsing of error message was used for preserving backward compatibility. We should remove it
 * and create separate message for failed version check with next major release.
 */
@Deprecated
private static boolean versionCheckFailed(TcpDiscoveryCheckFailedMessage msg) {
    String err = msg.error();

    return err.contains("versions are not compatible");
}
/**
 * Unmarshals per-component discovery data and forwards it to the data exchange.
 * Entries that fail to unmarshal are logged and left out; the rest are still delivered.
 *
 * @param joiningNodeID Joining node ID.
 * @param nodeId Remote node ID for which data is provided.
 * @param data Collection of marshalled discovery data objects from different components.
 * @param clsLdr Class loader for discovery data unmarshalling.
 */
protected void onExchange(UUID joiningNodeID,
    UUID nodeId,
    Map<Integer, byte[]> data,
    ClassLoader clsLdr)
{
    Map<Integer, Serializable> unmarshalled = U.newHashMap(data.size());

    for (Map.Entry<Integer, byte[]> e : data.entrySet()) {
        Integer compId = e.getKey();

        try {
            Serializable compData = marsh.unmarshal(e.getValue(), clsLdr);

            unmarshalled.put(compId, compData);
        }
        catch (IgniteCheckedException ex) {
            U.error(log, "Failed to unmarshal discovery data for component: " + compId, ex);
        }
    }

    exchange.onExchange(joiningNodeID, nodeId, unmarshalled);
}
/**
 * Handles socket write timeouts.
 * <p>
 * Writers register a {@link SocketTimeoutObject} before a write and cancel it afterwards.
 * Objects are kept sorted by end time (ties broken by ID). When an object's end time has
 * passed, the worker removes it. If it had not been cancelled yet, its socket is closed
 * (see {@link SocketTimeoutObject#onTimeout()}), a throttled warning is logged, and the
 * socket timeout statistic is incremented.
 */
protected class SocketTimeoutWorker extends IgniteSpiThread {
    /** Time-based sorted set for timeout objects (by end time, then by ID). */
    private final GridConcurrentSkipListSet<SocketTimeoutObject> timeoutObjs =
        new GridConcurrentSkipListSet<>(new Comparator<SocketTimeoutObject>() {
            @Override public int compare(SocketTimeoutObject o1, SocketTimeoutObject o2) {
                int res = Long.compare(o1.endTime(), o2.endTime());

                // IDs are unique, so distinct objects never compare as equal.
                return res != 0 ? res : Long.compare(o1.id(), o2.id());
            }
        });

    /** Mutex used to wake the worker when a new earliest object is added. */
    private final Object mux0 = new Object();

    /**
     * Creates the worker with the SPI's thread priority.
     */
    SocketTimeoutWorker() {
        super(gridName, "tcp-disco-sock-timeout-worker", log);

        setPriority(threadPri);
    }

    /**
     * Registers a timeout object. If it becomes the earliest one, the worker is woken up
     * so it can shorten its wait.
     *
     * @param timeoutObj Timeout object to add.
     */
    @SuppressWarnings({"NakedNotify"})
    public void addTimeoutObject(SocketTimeoutObject timeoutObj) {
        assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE;

        timeoutObjs.add(timeoutObj);

        if (timeoutObjs.firstx() == timeoutObj) {
            synchronized (mux0) {
                mux0.notifyAll();
            }
        }
    }

    /**
     * @param timeoutObj Timeout object to remove.
     */
    public void removeTimeoutObject(SocketTimeoutObject timeoutObj) {
        assert timeoutObj != null;

        timeoutObjs.remove(timeoutObj);
    }

    /** {@inheritDoc} */
    @Override protected void body() throws InterruptedException {
        if (log.isDebugEnabled())
            log.debug("Socket timeout worker has been started.");

        while (!isInterrupted()) {
            long now = U.currentTimeMillis();

            // Expire every object whose end time has passed; the set is sorted, so stop at the first live one.
            for (Iterator<SocketTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) {
                SocketTimeoutObject timeoutObj = iter.next();

                if (timeoutObj.endTime() <= now) {
                    iter.remove();

                    // onTimeout() returns false if the writer already cancelled the object.
                    if (timeoutObj.onTimeout()) {
                        LT.warn(log, null, "Socket write has timed out (consider increasing " +
                            "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']');

                        stats.onSocketTimeout();
                    }
                }
                else
                    break;
            }

            synchronized (mux0) {
                while (true) {
                    // Access of the first element must be inside of
                    // synchronization block, so we don't miss out
                    // on thread notification events sent from
                    // 'addTimeoutObject(..)' method.
                    SocketTimeoutObject first = timeoutObjs.firstx();

                    if (first != null) {
                        long waitTime = first.endTime() - U.currentTimeMillis();

                        if (waitTime > 0)
                            mux0.wait(waitTime);
                        else
                            break;
                    }
                    else
                        mux0.wait(5000);
                }
            }
        }
    }
}
/**
 * Write-timeout guard for a single socket.
 * <p>
 * Exactly one of {@link #cancel()} (called by the writer) and {@link #onTimeout()}
 * (called by the timeout worker) wins; the winner is the one that flips {@code done}.
 * Only a winning {@link #onTimeout()} closes the socket.
 */
private static class SocketTimeoutObject {
    /** Source of unique IDs, used to order objects with equal end times. */
    private static final AtomicLong idGen = new AtomicLong();

    /** Unique ID of this object. */
    private final long id = idGen.incrementAndGet();

    /** Socket to close when the timeout fires. */
    private final Socket sock;

    /** End time (callers pass {@code U.currentTimeMillis()} plus the socket timeout). */
    private final long endTime;

    /** Set once by whichever of {@link #cancel()} / {@link #onTimeout()} runs first. */
    private final AtomicBoolean done = new AtomicBoolean();

    /**
     * @param sock Socket.
     * @param endTime End time, must be positive.
     */
    SocketTimeoutObject(Socket sock, long endTime) {
        assert sock != null;
        assert endTime > 0;

        this.endTime = endTime;
        this.sock = sock;
    }

    /**
     * @return {@code True} if this call claimed the object, i.e. the timeout had not fired yet.
     */
    boolean cancel() {
        return claim();
    }

    /**
     * Closes the socket unless the object was already cancelled.
     *
     * @return {@code True} if this call claimed the object and closed the socket.
     */
    boolean onTimeout() {
        if (!claim())
            return false;

        // Close socket - timeout occurred.
        U.closeQuiet(sock);

        return true;
    }

    /**
     * @return {@code True} if the {@code done} flag was flipped by this call.
     */
    private boolean claim() {
        return done.compareAndSet(false, true);
    }

    /**
     * @return End time.
     */
    long endTime() {
        return endTime;
    }

    /**
     * @return ID.
     */
    long id() {
        return id;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(SocketTimeoutObject.class, this);
    }
}
/**
 * Base class for threads that process discovery messages from a queue.
 * <p>
 * Heartbeat messages are placed at the head of the queue; all other messages are
 * appended. The worker polls the queue with a 2 second timeout so that it notices
 * interruption even when no messages arrive.
 */
protected abstract class MessageWorkerAdapter extends IgniteSpiThread {
    /** Pre-allocated output stream (100K), reused by {@link #writeToSocket(Socket, TcpDiscoveryAbstractMessage)}. */
    private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024);

    /** Message queue. */
    private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>();

    /** Interrupted flag kept in addition to the thread's own interrupt status. */
    private volatile boolean interrupted;

    /**
     * @param name Thread name.
     */
    protected MessageWorkerAdapter(String name) {
        super(gridName, name, log);

        setPriority(threadPri);
    }

    /** {@inheritDoc} */
    @Override protected void body() throws InterruptedException {
        if (log.isDebugEnabled())
            log.debug("Message worker started [locNodeId=" + getLocalNodeId() + ']');

        while (!isInterrupted()) {
            TcpDiscoveryAbstractMessage next = queue.poll(2000, TimeUnit.MILLISECONDS);

            if (next != null)
                processMessage(next);
        }
    }

    /** {@inheritDoc} */
    @Override public void interrupt() {
        interrupted = true;

        super.interrupt();
    }

    /** {@inheritDoc} */
    @Override public boolean isInterrupted() {
        if (interrupted)
            return true;

        return super.isInterrupted();
    }

    /**
     * @return Current queue size.
     */
    int queueSize() {
        return queue.size();
    }

    /**
     * Enqueues a message; heartbeats go to the front, everything else to the back.
     *
     * @param msg Message to add.
     */
    void addMessage(TcpDiscoveryAbstractMessage msg) {
        assert msg != null;

        boolean heartbeat = msg instanceof TcpDiscoveryHeartbeatMessage;

        if (heartbeat)
            queue.addFirst(msg);
        else
            queue.add(msg);

        if (log.isDebugEnabled())
            log.debug("Message has been added to queue: " + msg);
    }

    /**
     * Processes a single dequeued message.
     *
     * @param msg Message.
     */
    protected abstract void processMessage(TcpDiscoveryAbstractMessage msg);

    /**
     * Writes a message using this worker's pre-allocated buffer, which is reset first.
     *
     * @param sock Socket.
     * @param msg Message.
     * @throws IOException If IO failed.
     * @throws IgniteCheckedException If marshalling failed.
     */
    protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
        throws IOException, IgniteCheckedException {
        bout.reset();

        TcpDiscoverySpiAdapter.this.writeToSocket(sock, msg, bout);
    }
}
/**
 * Opens sockets to several addresses in the background and hands out the outcomes in
 * completion order.
 * <p>
 * One task is submitted per address; each task tries {@link #openSocket(InetSocketAddress)}
 * up to {@code retryCnt} times. Results are consumed with {@link #next()}. {@link #close()}
 * stops outstanding work and, in a daemon thread, closes sockets that were opened but
 * never taken.
 * <p>
 * NOTE(review): {@code connInProgress} is a plain field with no synchronization, so
 * {@link #next()} and {@link #close()} presumably must be called from a single thread — confirm.
 */
protected class SocketMultiConnector implements AutoCloseable {
    /** Number of submitted tasks whose results have not been taken yet. */
    private int connInProgress;

    /** Executor running the connection tasks. */
    private final ExecutorService executor;

    /** Completion service delivering task results in completion order. */
    private final CompletionService<GridTuple3<InetSocketAddress, Socket, Exception>> completionSrvc;

    /**
     * @param addrs Addresses to connect to (may be empty).
     * @param retryCnt Maximum number of connection attempts per address.
     */
    public SocketMultiConnector(Collection<InetSocketAddress> addrs, final int retryCnt) {
        connInProgress = addrs.size();

        // The pool size used to be Math.min(1, addrs.size()). That is 0 for an empty collection,
        // which makes newFixedThreadPool() throw IllegalArgumentException, and 1 otherwise.
        // Keep the effective single worker thread but never request an empty pool.
        // NOTE(review): a single thread means addresses are tried sequentially; if parallel
        // connects were intended, a bounded Math.max(1, ...) pool size should be considered.
        executor = Executors.newFixedThreadPool(1);

        completionSrvc = new ExecutorCompletionService<>(executor);

        for (final InetSocketAddress addr : addrs) {
            completionSrvc.submit(new Callable<GridTuple3<InetSocketAddress, Socket, Exception>>() {
                /**
                 * @return Tuple of (address, opened socket or {@code null}, last connection error or
                 *      {@code null}). The error of an earlier attempt is kept even if a later retry
                 *      succeeded, so callers should check the socket first. Returns {@code null}
                 *      if the worker thread is interrupted before an attempt (e.g. by {@link #close()}).
                 */
                @Override public GridTuple3<InetSocketAddress, Socket, Exception> call() {
                    Exception ex = null;
                    Socket sock = null;

                    for (int i = 0; i < retryCnt; i++) {
                        if (Thread.currentThread().isInterrupted())
                            return null; // Executor is shutdown.

                        try {
                            sock = openSocket(addr);

                            break;
                        }
                        catch (Exception e) {
                            ex = e;
                        }
                    }

                    return new GridTuple3<>(addr, sock, ex);
                }
            });
        }
    }

    /**
     * Waits for the next finished connection task.
     *
     * @return Result of the next finished task, or {@code null} once every submitted task's
     *      result has been taken. A task interrupted before its first attempt also yields {@code null}.
     * @throws IgniteSpiException If the calling thread was interrupted (the interrupt flag is
     *      restored) or the task failed with an exception.
     */
    @Nullable public GridTuple3<InetSocketAddress, Socket, Exception> next() {
        if (connInProgress == 0)
            return null;

        try {
            Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut = completionSrvc.take();

            connInProgress--;

            return fut.get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            throw new IgniteSpiException("Thread has been interrupted.", e);
        }
        catch (ExecutionException e) {
            throw new IgniteSpiException(e);
        }
    }

    /**
     * Stops the executor. Tasks that never started are dropped. If other results are still
     * outstanding, a daemon thread waits (up to 5 minutes) for running tasks to finish and
     * quietly closes any sockets they opened that were not taken by {@link #next()}.
     */
    @Override public void close() {
        List<Runnable> unstartedTasks = executor.shutdownNow();

        connInProgress -= unstartedTasks.size();

        if (connInProgress > 0) {
            Thread thread = new Thread(new Runnable() {
                @Override public void run() {
                    try {
                        executor.awaitTermination(5, TimeUnit.MINUTES);

                        Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut;

                        while ((fut = completionSrvc.poll()) != null) {
                            try {
                                GridTuple3<InetSocketAddress, Socket, Exception> tuple3 = fut.get();

                                if (tuple3 != null)
                                    IgniteUtils.closeQuiet(tuple3.get2());
                            }
                            catch (ExecutionException ignore) {
                                // Task failed without producing a result, so there is no socket to close.
                            }
                        }
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();

                        throw new RuntimeException(e);
                    }
                }
            });

            thread.setDaemon(true);

            thread.start();
        }
    }
}
}