blob: f9c4a4dd0fdb2a5d0ec0eafc885c5fe30290c1d4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.discovery.tcp;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.discovery.*;
import org.apache.ignite.spi.discovery.tcp.internal.*;
import org.apache.ignite.spi.discovery.tcp.messages.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.events.EventType.*;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
/**
*
*/
class ClientImpl extends TcpDiscoveryImpl {
/** */
private static final Object JOIN_TIMEOUT = "JOIN_TIMEOUT";
/** */
private static final Object SPI_STOP = "SPI_STOP";
/** */
private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED";
/** Remote nodes. */
private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
/** Topology history. */
private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
/** Remote nodes. */
private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> pingFuts = new ConcurrentHashMap8<>();
/** Socket writer. */
private SocketWriter sockWriter;
/** */
private SocketReader sockReader;
/** */
private boolean segmented;
/** Last message ID. */
private volatile IgniteUuid lastMsgId;
/** Current topology version. */
private volatile long topVer;
/** Join error. Contains error what occurs on join process. */
private final AtomicReference<IgniteSpiException> joinErr = new AtomicReference<>();
/** Joined latch. */
private final CountDownLatch joinLatch = new CountDownLatch(1);
/** Left latch. */
private final CountDownLatch leaveLatch = new CountDownLatch(1);
/** */
private final Timer timer = new Timer("TcpDiscoverySpi.timer");
/** */
protected MessageWorker msgWorker;
/**
* @param adapter Adapter.
*/
ClientImpl(TcpDiscoverySpi adapter) {
super(adapter);
}
/** {@inheritDoc} */
@Override public void dumpDebugInfo(IgniteLogger log) {
StringBuilder b = new StringBuilder(U.nl());
b.append(">>>").append(U.nl());
b.append(">>>").append("Dumping discovery SPI debug info.").append(U.nl());
b.append(">>>").append(U.nl());
b.append("Local node ID: ").append(getLocalNodeId()).append(U.nl()).append(U.nl());
b.append("Local node: ").append(locNode).append(U.nl()).append(U.nl());
b.append("Internal threads: ").append(U.nl());
b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
b.append(" Socket reader: ").append(threadStatus(sockReader)).append(U.nl());
b.append(" Socket writer: ").append(threadStatus(sockWriter)).append(U.nl());
b.append(U.nl());
b.append("Nodes: ").append(U.nl());
for (ClusterNode node : allVisibleNodes())
b.append(" ").append(node.id()).append(U.nl());
b.append(U.nl());
b.append("Stats: ").append(spi.stats).append(U.nl());
U.quietAndInfo(log, b.toString());
}
/** {@inheritDoc} */
@Override public String getSpiState() {
if (sockWriter.isOnline())
return "connected";
return "disconnected";
}
/** {@inheritDoc} */
@Override public int getMessageWorkerQueueSize() {
return msgWorker.queueSize();
}
/** {@inheritDoc} */
@Override public UUID getCoordinator() {
return null;
}
/** {@inheritDoc} */
@Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
spi.initLocalNode(0, true);
locNode = spi.locNode;
sockWriter = new SocketWriter();
sockWriter.start();
sockReader = new SocketReader();
sockReader.start();
msgWorker = new MessageWorker();
msgWorker.start();
if (spi.ipFinder.isShared())
registerLocalNodeAddress();
try {
joinLatch.await();
IgniteSpiException err = joinErr.get();
if (err != null)
throw err;
}
catch (InterruptedException e) {
throw new IgniteSpiException("Thread has been interrupted.", e);
}
timer.schedule(new HeartbeatSender(), spi.hbFreq, spi.hbFreq);
spi.printStartInfo();
}
/** {@inheritDoc} */
@Override public void spiStop() throws IgniteSpiException {
timer.cancel();
if (msgWorker != null && msgWorker.isAlive()) { // Should always be alive
msgWorker.addMessage(SPI_STOP);
try {
if (!leaveLatch.await(spi.netTimeout, MILLISECONDS))
U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']');
}
catch (InterruptedException ignored) {
// No-op.
}
}
for (GridFutureAdapter<Boolean> fut : pingFuts.values())
fut.onDone(false);
rmtNodes.clear();
U.interrupt(msgWorker);
U.interrupt(sockWriter);
U.interrupt(sockReader);
U.join(msgWorker, log);
U.join(sockWriter, log);
U.join(sockReader, log);
spi.printStopInfo();
}
/** {@inheritDoc} */
@Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
// No-op.
}
/** {@inheritDoc} */
@Override public Collection<ClusterNode> getRemoteNodes() {
return U.arrayList(rmtNodes.values(), TcpDiscoveryNodesRing.VISIBLE_NODES);
}
/** {@inheritDoc} */
@Nullable @Override public ClusterNode getNode(UUID nodeId) {
if (getLocalNodeId().equals(nodeId))
return locNode;
TcpDiscoveryNode node = rmtNodes.get(nodeId);
return node != null && node.visible() ? node : null;
}
/** {@inheritDoc} */
@Override public boolean pingNode(@NotNull final UUID nodeId) {
if (nodeId.equals(getLocalNodeId()))
return true;
TcpDiscoveryNode node = rmtNodes.get(nodeId);
if (node == null || !node.visible())
return false;
GridFutureAdapter<Boolean> fut = pingFuts.get(nodeId);
if (fut == null) {
fut = new GridFutureAdapter<>();
GridFutureAdapter<Boolean> oldFut = pingFuts.putIfAbsent(nodeId, fut);
if (oldFut != null)
fut = oldFut;
else {
if (spi.getSpiContext().isStopping()) {
if (pingFuts.remove(nodeId, fut))
fut.onDone(false);
return false;
}
final GridFutureAdapter<Boolean> finalFut = fut;
timer.schedule(new TimerTask() {
@Override public void run() {
if (pingFuts.remove(nodeId, finalFut))
finalFut.onDone(false);
}
}, spi.netTimeout);
sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
}
}
try {
return fut.get();
}
catch (IgniteInterruptedCheckedException ignored) {
return false;
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException(e); // Should newer occur.
}
}
/** {@inheritDoc} */
@Override public void disconnect() throws IgniteSpiException {
U.interrupt(msgWorker);
U.interrupt(sockWriter);
U.interrupt(sockReader);
U.join(msgWorker, log);
U.join(sockWriter, log);
U.join(sockReader, log);
leaveLatch.countDown();
joinLatch.countDown();
spi.getSpiContext().deregisterPorts();
Collection<ClusterNode> rmts = getRemoteNodes();
// This is restart/disconnection and remote nodes are not empty.
// We need to fire FAIL event for each.
DiscoverySpiListener lsnr = spi.lsnr;
if (lsnr != null) {
for (ClusterNode n : rmts) {
rmtNodes.remove(n.id());
Collection<ClusterNode> top = updateTopologyHistory(topVer + 1, null);
lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, new TreeMap<>(topHist), null);
}
}
rmtNodes.clear();
}
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
if (segmented)
throw new IgniteException("Failed to send custom message: client is disconnected");
try {
sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
spi.marsh.marshal(evt)));
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
}
}
/** {@inheritDoc} */
@Override public void failNode(UUID nodeId, @Nullable String warning) {
TcpDiscoveryNode node = rmtNodes.get(nodeId);
if (node != null) {
TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
node.id(),
node.internalOrder());
msg.warning(warning);
msgWorker.addMessage(msg);
}
}
/**
* @param recon {@code True} if reconnects.
* @param timeout Timeout.
* @return Opened socket or {@code null} if timeout.
* @throws InterruptedException If interrupted.
* @throws IgniteSpiException If failed.
* @see TcpDiscoverySpi#joinTimeout
*/
@SuppressWarnings("BusyWait")
@Nullable private Socket joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException {
Collection<InetSocketAddress> addrs = null;
long startTime = U.currentTimeMillis();
// Marshal credentials for backward compatibility and security.
marshalCredentials(locNode);
while (true) {
if (Thread.currentThread().isInterrupted())
throw new InterruptedException();
while (addrs == null || addrs.isEmpty()) {
addrs = spi.resolvedAddresses();
if (!F.isEmpty(addrs)) {
if (log.isDebugEnabled())
log.debug("Resolved addresses from IP finder: " + addrs);
}
else {
if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
return null;
U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + spi.ipFinder);
Thread.sleep(2000);
}
}
Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
Iterator<InetSocketAddress> it = addrs.iterator();
while (it.hasNext()) {
if (Thread.currentThread().isInterrupted())
throw new InterruptedException();
InetSocketAddress addr = it.next();
T2<Socket, Integer> sockAndRes = sendJoinRequest(recon, addr);
if (sockAndRes == null) {
it.remove();
continue;
}
assert sockAndRes.get1() != null && sockAndRes.get2() != null : sockAndRes;
Socket sock = sockAndRes.get1();
switch (sockAndRes.get2()) {
case RES_OK:
return sock;
case RES_CONTINUE_JOIN:
case RES_WAIT:
U.closeQuiet(sock);
break;
default:
if (log.isDebugEnabled())
log.debug("Received unexpected response to join request: " + sockAndRes.get2());
U.closeQuiet(sock);
}
}
if (addrs.isEmpty()) {
if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
return null;
LT.warn(log, null, "Failed to connect to any address from IP finder (will retry to join topology " +
"in 2000ms): " + addrs0);
Thread.sleep(2000);
}
}
}
/**
* @param recon {@code True} if reconnects.
* @param addr Address.
* @return Socket and connect response.
*/
@Nullable private T2<Socket, Integer> sendJoinRequest(boolean recon, InetSocketAddress addr) {
assert addr != null;
if (log.isDebugEnabled())
log.debug("Send join request [addr=" + addr + ", reconnect=" + recon +
", locNodeId=" + getLocalNodeId() + ']');
Collection<Throwable> errs = null;
long ackTimeout0 = spi.ackTimeout;
int connectAttempts = 1;
UUID locNodeId = getLocalNodeId();
for (int i = 0; i < spi.reconCnt; i++) {
boolean openSock = false;
Socket sock = null;
try {
long tstamp = U.currentTimeMillis();
sock = spi.openSocket(addr);
openSock = true;
TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
req.client(true);
spi.writeToSocket(sock, req);
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
UUID rmtNodeId = res.creatorNodeId();
assert rmtNodeId != null;
assert !getLocalNodeId().equals(rmtNodeId);
spi.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
locNode.clientRouterNodeId(rmtNodeId);
tstamp = U.currentTimeMillis();
TcpDiscoveryAbstractMessage msg = recon ?
new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId) :
new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId()));
msg.client(true);
spi.writeToSocket(sock, msg);
spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
if (log.isDebugEnabled())
log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr +
", rmtNodeId=" + rmtNodeId + ']');
return new T2<>(sock, spi.readReceipt(sock, ackTimeout0));
}
catch (IOException | IgniteCheckedException e) {
U.closeQuiet(sock);
if (log.isDebugEnabled())
log.error("Exception on joining: " + e.getMessage(), e);
onException("Exception on joining: " + e.getMessage(), e);
if (errs == null)
errs = new ArrayList<>();
errs.add(e);
if (!openSock) {
// Reconnect for the second time, if connection is not established.
if (connectAttempts < 2) {
connectAttempts++;
continue;
}
break; // Don't retry if we can not establish connection.
}
if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
ackTimeout0 *= 2;
if (!checkAckTimeout(ackTimeout0))
break;
}
}
}
if (log.isDebugEnabled())
log.debug("Failed to join to address [addr=" + addr + ", recon=" + recon + ", errs=" + errs + ']');
return null;
}
/**
* Marshalls credentials with discovery SPI marshaller (will replace attribute value).
*
* @param node Node to marshall credentials for.
* @throws IgniteSpiException If marshalling failed.
*/
private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException {
try {
// Use security-unsafe getter.
Map<String, Object> attrs = new HashMap<>(node.getAttributes());
attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS,
spi.marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS)));
node.setAttributes(attrs);
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e);
}
}
/**
* @param topVer New topology version.
* @param msg Discovery message.
* @return Latest topology snapshot.
*/
private Collection<ClusterNode> updateTopologyHistory(long topVer, @Nullable TcpDiscoveryAbstractMessage msg) {
this.topVer = topVer;
if (!topHist.isEmpty() && topVer <= topHist.lastKey()) {
if (log.isDebugEnabled())
log.debug("Skip topology update since topology already updated [msg=" + msg +
", lastHistKey=" + topHist.lastKey() +
", topVer=" + topVer +
", locNode=" + locNode + ']');
Collection<ClusterNode> top = topHist.get(topVer);
assert top != null : msg;
return top;
}
NavigableSet<ClusterNode> allNodes = allVisibleNodes();
if (!topHist.containsKey(topVer)) {
assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
"lastVer=" + (topHist.isEmpty() ? null : topHist.lastKey()) +
", newVer=" + topVer +
", locNode=" + locNode +
", msg=" + msg;
topHist.put(topVer, allNodes);
if (topHist.size() > spi.topHistSize)
topHist.pollFirstEntry();
assert topHist.lastKey() == topVer;
assert topHist.size() <= spi.topHistSize;
}
return allNodes;
}
/**
* @return All nodes.
*/
private NavigableSet<ClusterNode> allVisibleNodes() {
NavigableSet<ClusterNode> allNodes = new TreeSet<>();
for (TcpDiscoveryNode node : rmtNodes.values()) {
if (node.visible())
allNodes.add(node);
}
allNodes.add(locNode);
return allNodes;
}
/** {@inheritDoc} */
@Override void simulateNodeFailure() {
U.warn(log, "Simulating client node failure: " + getLocalNodeId());
U.interrupt(sockWriter);
U.interrupt(msgWorker);
U.join(sockWriter, log);
U.join(msgWorker, log);
}
/** {@inheritDoc} */
@Override public void brakeConnection() {
U.closeQuiet(msgWorker.currSock);
}
/** {@inheritDoc} */
@Override protected IgniteSpiThread workerThread() {
return msgWorker;
}
/**
* FOR TEST PURPOSE ONLY!
*/
@SuppressWarnings("BusyWait")
public void waitForClientMessagePrecessed() {
Object last = msgWorker.queue.peekLast();
while (last != null && msgWorker.isAlive() && msgWorker.queue.contains(last)) {
try {
Thread.sleep(10);
}
catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}
/**
* @param err Error.
*/
private void joinError(IgniteSpiException err) {
assert err != null;
joinErr.compareAndSet(null, err);
joinLatch.countDown();
}
/**
* Heartbeat sender.
*/
private class HeartbeatSender extends TimerTask {
/** {@inheritDoc} */
@Override public void run() {
if (!spi.getSpiContext().isStopping() && sockWriter.isOnline()) {
TcpDiscoveryClientHeartbeatMessage msg = new TcpDiscoveryClientHeartbeatMessage(getLocalNodeId(),
spi.metricsProvider.metrics());
msg.client(true);
sockWriter.sendMessage(msg);
}
}
}
/**
* Socket reader.
*/
private class SocketReader extends IgniteSpiThread {
/** */
private final Object mux = new Object();
/** */
private Socket sock;
/** */
private UUID rmtNodeId;
/**
*/
protected SocketReader() {
super(spi.ignite().name(), "tcp-client-disco-sock-reader", log);
}
/**
* @param sock Socket.
* @param rmtNodeId Rmt node id.
*/
public void setSocket(Socket sock, UUID rmtNodeId) {
synchronized (mux) {
this.sock = sock;
this.rmtNodeId = rmtNodeId;
mux.notifyAll();
}
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
while (!isInterrupted()) {
Socket sock;
UUID rmtNodeId;
synchronized (mux) {
if (this.sock == null) {
mux.wait();
continue;
}
sock = this.sock;
rmtNodeId = this.rmtNodeId;
}
try {
InputStream in = new BufferedInputStream(sock.getInputStream());
sock.setKeepAlive(true);
sock.setTcpNoDelay(true);
while (!isInterrupted()) {
TcpDiscoveryAbstractMessage msg;
try {
msg = spi.marsh.unmarshal(in, U.gridClassLoader());
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
U.error(log, "Failed to read message [sock=" + sock + ", " +
"locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']', e);
IOException ioEx = X.cause(e, IOException.class);
if (ioEx != null)
throw ioEx;
ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class);
if (clsNotFoundEx != null)
LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
"(make sure same versions of all classes are available on all nodes) " +
"[rmtNodeId=" + rmtNodeId + ", err=" + clsNotFoundEx.getMessage() + ']');
else
LT.error(log, e, "Failed to read message [sock=" + sock + ", locNodeId=" +
getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']');
continue;
}
msg.senderNodeId(rmtNodeId);
if (log.isDebugEnabled())
log.debug("Message has been received: " + msg);
spi.stats.onMessageReceived(msg);
if (spi.ensured(msg) && joinLatch.getCount() == 0L)
lastMsgId = msg.id();
msgWorker.addMessage(msg);
}
}
catch (IOException e) {
msgWorker.addMessage(new SocketClosedMessage(sock));
if (log.isDebugEnabled())
U.error(log, "Connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ']', e);
}
finally {
U.closeQuiet(sock);
synchronized (mux) {
if (this.sock == sock) {
this.sock = null;
this.rmtNodeId = null;
}
}
}
}
}
}
/**
*
*/
private class SocketWriter extends IgniteSpiThread {
/** */
private final Object mux = new Object();
/** */
private Socket sock;
/** */
private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
/**
*
*/
protected SocketWriter() {
super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
}
/**
* @param msg Message.
*/
private void sendMessage(TcpDiscoveryAbstractMessage msg) {
synchronized (mux) {
queue.add(msg);
mux.notifyAll();
}
}
/**
* @param sock Socket.
*/
private void setSocket(Socket sock) {
synchronized (mux) {
this.sock = sock;
mux.notifyAll();
}
}
/**
* @return {@code True} if connection is alive.
*/
public boolean isOnline() {
synchronized (mux) {
return sock != null;
}
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
TcpDiscoveryAbstractMessage msg = null;
while (!Thread.currentThread().isInterrupted()) {
Socket sock;
synchronized (mux) {
sock = this.sock;
if (sock == null) {
mux.wait();
continue;
}
if (msg == null)
msg = queue.poll();
if (msg == null) {
mux.wait();
continue;
}
}
for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs)
msgLsnr.apply(msg);
try {
spi.writeToSocket(sock, msg);
msg = null;
}
catch (IOException e) {
if (log.isDebugEnabled())
U.error(log, "Failed to send node left message (will stop anyway) " +
"[sock=" + sock + ", msg=" + msg + ']', e);
U.closeQuiet(sock);
synchronized (mux) {
if (sock == this.sock)
this.sock = null; // Connection has dead.
}
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send message: " + msg, e);
msg = null;
}
}
}
}
/**
*
*/
private class Reconnector extends IgniteSpiThread {
/** */
private volatile Socket sock;
/** */
private boolean join;
/**
* @param join {@code True} if reconnects during join.
*/
protected Reconnector(boolean join) {
super(spi.ignite().name(), "tcp-client-disco-reconnector", log);
this.join = join;
}
/**
*
*/
public void cancel() {
interrupt();
U.closeQuiet(sock);
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
assert !segmented;
boolean success = false;
Exception err = null;
long timeout = join ? spi.joinTimeout : spi.netTimeout;
long startTime = U.currentTimeMillis();
try {
while (true) {
sock = joinTopology(true, timeout);
if (sock == null) {
if (join) {
joinError(new IgniteSpiException("Join process timed out, connection failed and " +
"failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
"[networkTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
}
else
U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
"configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
return;
}
if (isInterrupted())
throw new InterruptedException();
int oldTimeout = 0;
try {
oldTimeout = sock.getSoTimeout();
sock.setSoTimeout((int)spi.netTimeout);
InputStream in = new BufferedInputStream(sock.getInputStream());
sock.setKeepAlive(true);
sock.setTcpNoDelay(true);
List<TcpDiscoveryAbstractMessage> msgs = null;
while (!isInterrupted()) {
TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
if (msg instanceof TcpDiscoveryClientReconnectMessage) {
TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
if (res.creatorNodeId().equals(getLocalNodeId())) {
if (res.success()) {
msgWorker.addMessage(res);
if (msgs != null) {
for (TcpDiscoveryAbstractMessage msg0 : msgs)
msgWorker.addMessage(msg0);
}
success = true;
}
return;
}
}
else if (spi.ensured(msg)) {
if (msgs == null)
msgs = new ArrayList<>();
msgs.add(msg);
}
}
}
catch (IOException | IgniteCheckedException e) {
U.closeQuiet(sock);
if (log.isDebugEnabled())
log.error("Reconnect error [join=" + join + ", timeout=" + timeout + ']', e);
if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout) {
String msg = join ? "Failed to connect to cluster (consider increasing 'joinTimeout' " +
"configuration property) [joinTimeout=" + spi.joinTimeout + ", err=" + e + ']' :
"Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
"configuration property) [networkTimeout=" + spi.netTimeout + ", err=" + e + ']';
U.warn(log, msg);
throw e;
}
else
U.warn(log, "Failed to reconnect to cluster (will retry): " + e);
}
finally {
if (success)
sock.setSoTimeout(oldTimeout);
}
}
}
catch (IOException | IgniteCheckedException e) {
err = e;
U.error(log, "Failed to reconnect", e);
}
finally {
if (!success) {
U.closeQuiet(sock);
if (join)
joinError(new IgniteSpiException("Failed to connect to cluster, connection failed and failed " +
"to reconnect.", err));
else
msgWorker.addMessage(SPI_RECONNECT_FAILED);
}
}
}
}
/**
* Message worker.
*/
protected class MessageWorker extends IgniteSpiThread {
/** Message queue. */
private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>();
/** */
private Socket currSock;
/** Indicates that pending messages are currently processed. */
private boolean pending;
/** */
private Reconnector reconnector;
/**
*
*/
private MessageWorker() {
super(spi.ignite().name(), "tcp-client-disco-msg-worker", log);
}
/** {@inheritDoc} */
@SuppressWarnings("InfiniteLoopStatement")
@Override protected void body() throws InterruptedException {
spi.stats.onJoinStarted();
try {
final Socket sock = joinTopology(false, spi.joinTimeout);
if (sock == null) {
joinError(new IgniteSpiException("Join process timed out."));
return;
}
currSock = sock;
sockWriter.setSocket(sock);
if (spi.joinTimeout > 0) {
timer.schedule(new TimerTask() {
@Override public void run() {
if (joinLatch.getCount() > 0)
queue.add(JOIN_TIMEOUT);
}
}, spi.joinTimeout);
}
sockReader.setSocket(sock, locNode.clientRouterNodeId());
while (true) {
Object msg = queue.take();
if (msg == JOIN_TIMEOUT) {
if (joinLatch.getCount() > 0) {
joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
"join request (consider increasing 'joinTimeout' configuration property) " +
"[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
break;
}
}
else if (msg == SPI_STOP) {
assert spi.getSpiContext().isStopping();
if (currSock != null) {
TcpDiscoveryAbstractMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
leftMsg.client(true);
sockWriter.sendMessage(leftMsg);
}
else
leaveLatch.countDown();
}
else if (msg instanceof SocketClosedMessage) {
if (((SocketClosedMessage)msg).sock == currSock) {
currSock = null;
boolean join = joinLatch.getCount() > 0;
if (spi.getSpiContext().isStopping() || segmented) {
leaveLatch.countDown();
if (join) {
joinError(new IgniteSpiException("Failed to connect to cluster: socket closed."));
break;
}
}
else {
assert reconnector == null;
final Reconnector reconnector = new Reconnector(join);
this.reconnector = reconnector;
reconnector.start();
}
}
}
else if (msg == SPI_RECONNECT_FAILED) {
if (!segmented) {
segmented = true;
reconnector.cancel();
reconnector.join();
notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
}
}
else {
TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;
if (joinLatch.getCount() > 0) {
IgniteSpiException err = null;
if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage)
err = spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage)
err = spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage)
err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
if (err != null) {
joinError(err);
break;
}
}
processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg);
}
}
}
finally {
U.closeQuiet(currSock);
if (joinLatch.getCount() > 0)
joinError(new IgniteSpiException("Some error in join process.")); // This should not occur.
if (reconnector != null) {
reconnector.cancel();
reconnector.join();
}
}
}
/**
* @param msg Message.
*/
protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) {
assert msg != null;
assert msg.verified() || msg.senderNodeId() == null;
spi.stats.onMessageProcessingStarted(msg);
if (msg instanceof TcpDiscoveryNodeAddedMessage)
processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg);
else if (msg instanceof TcpDiscoveryNodeLeftMessage)
processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg);
else if (msg instanceof TcpDiscoveryNodeFailedMessage)
processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
else if (msg instanceof TcpDiscoveryHeartbeatMessage)
processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
else if (msg instanceof TcpDiscoveryClientReconnectMessage)
processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
else if (msg instanceof TcpDiscoveryCustomEventMessage)
processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
else if (msg instanceof TcpDiscoveryClientPingResponse)
processClientPingResponse((TcpDiscoveryClientPingResponse)msg);
else if (msg instanceof TcpDiscoveryPingRequest)
processPingRequest();
spi.stats.onMessageProcessingFinished(msg);
}
/**
* @param msg Message.
*/
private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
if (spi.getSpiContext().isStopping())
return;
TcpDiscoveryNode node = msg.node();
UUID newNodeId = node.id();
if (getLocalNodeId().equals(newNodeId)) {
if (joinLatch.getCount() > 0) {
Collection<TcpDiscoveryNode> top = msg.topology();
if (top != null) {
spi.gridStartTime = msg.gridStartTime();
for (TcpDiscoveryNode n : top) {
if (n.order() > 0)
n.visible(true);
rmtNodes.put(n.id(), n);
}
topHist.clear();
if (msg.topologyHistory() != null)
topHist.putAll(msg.topologyHistory());
}
else if (log.isDebugEnabled())
log.debug("Discarding node added message with empty topology: " + msg);
}
else if (log.isDebugEnabled())
log.debug("Discarding node added message (this message has already been processed) " +
"[msg=" + msg + ", locNode=" + locNode + ']');
}
else {
if (nodeAdded()) {
boolean topChanged = rmtNodes.putIfAbsent(newNodeId, node) == null;
if (topChanged) {
if (log.isDebugEnabled())
log.debug("Added new node to topology: " + node);
Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
if (data != null)
spi.onExchange(newNodeId, newNodeId, data, null);
}
}
else if (log.isDebugEnabled())
log.debug("Ignore topology message, local node not added to topology: " + msg);
}
}
/**
* @param msg Message.
*/
private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
if (spi.getSpiContext().isStopping())
return;
if (getLocalNodeId().equals(msg.nodeId())) {
if (joinLatch.getCount() > 0) {
Map<UUID, Map<Integer, byte[]>> dataMap = msg.clientDiscoData();
if (dataMap != null) {
for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
spi.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(), null);
}
locNode.setAttributes(msg.clientNodeAttributes());
locNode.visible(true);
long topVer = msg.topologyVersion();
locNode.order(topVer);
notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer, msg));
joinErr.set(null);;
joinLatch.countDown();
spi.stats.onJoinFinished();
}
else if (log.isDebugEnabled())
log.debug("Discarding node add finished message (this message has already been processed) " +
"[msg=" + msg + ", locNode=" + locNode + ']');
}
else {
if (nodeAdded()) {
TcpDiscoveryNode node = rmtNodes.get(msg.nodeId());
if (node == null) {
if (log.isDebugEnabled())
log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']');
return;
}
boolean evt = false;
long topVer = msg.topologyVersion();
assert topVer > 0 : msg;
if (!node.visible()) {
node.order(topVer);
node.visible(true);
if (spi.locNodeVer.equals(node.version()))
node.version(spi.locNodeVer);
evt = true;
}
else {
if (log.isDebugEnabled())
log.debug("Skip node join event, node already joined [msg=" + msg + ", node=" + node + ']');
assert node.order() == topVer : node;
}
Collection<ClusterNode> top = updateTopologyHistory(topVer, msg);
assert top != null && top.contains(node) : "Topology does not contain node [msg=" + msg +
", node=" + node + ", top=" + top + ']';
if (!pending && joinLatch.getCount() > 0) {
if (log.isDebugEnabled())
log.debug("Discarding node add finished message (join process is not finished): " + msg);
return;
}
if (evt) {
notifyDiscovery(EVT_NODE_JOINED, topVer, node, top);
spi.stats.onNodeJoined();
}
}
else if (log.isDebugEnabled())
log.debug("Ignore topology message, local node not added to topology: " + msg);
}
}
/**
* @param msg Message.
*/
private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) {
if (getLocalNodeId().equals(msg.creatorNodeId())) {
if (log.isDebugEnabled())
log.debug("Received node left message for local node: " + msg);
leaveLatch.countDown();
}
else {
if (spi.getSpiContext().isStopping())
return;
if (nodeAdded()) {
TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId());
if (node == null) {
if (log.isDebugEnabled())
log.debug("Discarding node left message since node is not found [msg=" + msg + ']');
return;
}
Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
if (!pending && joinLatch.getCount() > 0) {
if (log.isDebugEnabled())
log.debug("Discarding node left message (join process is not finished): " + msg);
return;
}
notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top);
spi.stats.onNodeLeft();
}
else if (log.isDebugEnabled())
log.debug("Ignore topology message, local node not added to topology: " + msg);
}
}
/**
* @return {@code True} if received node added message for local node.
*/
private boolean nodeAdded() {
return !topHist.isEmpty();
}
/**
* @param msg Message.
*/
private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) {
if (spi.getSpiContext().isStopping()) {
if (!getLocalNodeId().equals(msg.creatorNodeId()) && getLocalNodeId().equals(msg.failedNodeId())) {
if (leaveLatch.getCount() > 0) {
if (log.isDebugEnabled())
log.debug("Remote node fail this node while node is stopping [locNode=" + getLocalNodeId()
+ ", rmtNode=" + msg.creatorNodeId() + ']');
leaveLatch.countDown();
}
}
return;
}
if (!getLocalNodeId().equals(msg.creatorNodeId())) {
TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId());
if (node == null) {
if (log.isDebugEnabled())
log.debug("Discarding node failed message since node is not found [msg=" + msg + ']');
return;
}
Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
if (!pending && joinLatch.getCount() > 0) {
if (log.isDebugEnabled())
log.debug("Discarding node failed message (join process is not finished): " + msg);
return;
}
if (msg.warning() != null) {
ClusterNode creatorNode = rmtNodes.get(msg.creatorNodeId());
U.warn(log, "Received EVT_NODE_FAILED event with warning [" +
"nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : msg.creatorNodeId()) +
", msg=" + msg.warning() + ']');
}
notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top);
spi.stats.onNodeFailed();
}
}
/**
* @param msg Message.
*/
private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) {
if (spi.getSpiContext().isStopping())
return;
if (getLocalNodeId().equals(msg.creatorNodeId())) {
assert msg.senderNodeId() != null;
if (log.isDebugEnabled())
log.debug("Received heartbeat response: " + msg);
}
else {
long tstamp = U.currentTimeMillis();
if (msg.hasMetrics()) {
for (Map.Entry<UUID, TcpDiscoveryHeartbeatMessage.MetricsSet> e : msg.metrics().entrySet()) {
UUID nodeId = e.getKey();
TcpDiscoveryHeartbeatMessage.MetricsSet metricsSet = e.getValue();
Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics() ?
msg.cacheMetrics().get(nodeId) : Collections.<Integer, CacheMetrics>emptyMap();
updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tstamp);
for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
updateMetrics(t.get1(), t.get2(), cacheMetrics, tstamp);
}
}
}
}
/**
* @param msg Message.
*/
private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
if (spi.getSpiContext().isStopping())
return;
if (getLocalNodeId().equals(msg.creatorNodeId())) {
if (reconnector != null) {
assert msg.success() : msg;
currSock = reconnector.sock;
sockWriter.setSocket(currSock);
sockReader.setSocket(currSock, locNode.clientRouterNodeId());
reconnector = null;
pending = true;
try {
for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
if (log.isDebugEnabled())
log.debug("Process pending message on reconnect [msg=" + pendingMsg + ']');
processDiscoveryMessage(pendingMsg);
}
}
finally {
pending = false;
}
}
else {
if (joinLatch.getCount() > 0) {
if (msg.success()) {
for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
if (log.isDebugEnabled())
log.debug("Process pending message on connect [msg=" + pendingMsg + ']');
processDiscoveryMessage(pendingMsg);
}
assert joinLatch.getCount() == 0 : msg;
}
}
else if (log.isDebugEnabled())
log.debug("Discarding reconnect message, reconnect is completed: " + msg);
}
}
else if (log.isDebugEnabled())
log.debug("Discarding reconnect message for another client: " + msg);
}
/**
* @param msg Message.
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
if (msg.verified() && joinLatch.getCount() == 0) {
DiscoverySpiListener lsnr = spi.lsnr;
if (lsnr != null) {
UUID nodeId = msg.creatorNodeId();
TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);
if (node != null && node.visible()) {
try {
DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh);
notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj);
}
catch (Throwable e) {
U.error(log, "Failed to unmarshal discovery custom message.", e);
}
}
else if (log.isDebugEnabled())
log.debug("Received metrics from unknown node: " + nodeId);
}
}
}
/**
* @param msg Message.
*/
private void processClientPingResponse(TcpDiscoveryClientPingResponse msg) {
GridFutureAdapter<Boolean> fut = pingFuts.remove(msg.nodeToPing());
if (fut != null)
fut.onDone(msg.result());
}
/**
* Router want to ping this client.
*/
private void processPingRequest() {
TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(getLocalNodeId());
res.client(true);
sockWriter.sendMessage(res);
}
/**
* @param nodeId Node ID.
* @param metrics Metrics.
* @param cacheMetrics Cache metrics.
* @param tstamp Timestamp.
*/
private void updateMetrics(UUID nodeId,
ClusterMetrics metrics,
Map<Integer, CacheMetrics> cacheMetrics,
long tstamp)
{
assert nodeId != null;
assert metrics != null;
assert cacheMetrics != null;
TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);
if (node != null && node.visible()) {
node.setMetrics(metrics);
node.setCacheMetrics(cacheMetrics);
node.lastUpdateTime(tstamp);
notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes());
}
else if (log.isDebugEnabled())
log.debug("Received metrics from unknown node: " + nodeId);
}
/**
* @param type Event type.
* @param topVer Topology version.
* @param node Node.
* @param top Topology snapshot.
*/
private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top) {
notifyDiscovery(type, topVer, node, top, null);
}
/**
* @param type Event type.
* @param topVer Topology version.
* @param node Node.
* @param top Topology snapshot.
* @param data Optional custom message data.
*/
private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top,
@Nullable DiscoverySpiCustomMessage data) {
DiscoverySpiListener lsnr = spi.lsnr;
if (lsnr != null) {
if (log.isDebugEnabled())
log.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
", topVer=" + topVer + ']');
lsnr.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist), data);
}
else if (log.isDebugEnabled())
log.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
", topVer=" + topVer + ']');
}
/**
* @param msg Message.
*/
public void addMessage(Object msg) {
queue.add(msg);
}
/**
* @return Queue size.
*/
public int queueSize() {
return queue.size();
}
}
/**
*
*/
private static class SocketClosedMessage {
/** */
private final Socket sock;
/**
* @param sock Socket.
*/
private SocketClosedMessage(Socket sock) {
this.sock = sock;
}
}
}