blob: a5ae5a984e67d465084dc723651e30d6cd849a3b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.discovery.tcp;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.events.*;
import org.apache.ignite.internal.processors.security.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.io.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.security.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.discovery.*;
import org.apache.ignite.spi.discovery.tcp.internal.*;
import org.apache.ignite.spi.discovery.tcp.messages.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.events.EventType.*;
import static org.apache.ignite.internal.IgniteNodeAttributes.*;
import static org.apache.ignite.spi.IgnitePortProtocol.*;
import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*;
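/**
* Server-mode implementation of TCP discovery: keeps the nodes ring, runs a TCP server for discovery
* connections, and processes ring messages in a dedicated message worker thread.
*/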
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
class ServerImpl extends TcpDiscoveryImpl {
/**
* Utility executor: 0 core threads, at most 1 thread, 2000 ms keep-alive for an idle thread, unbounded queue.
* NOTE(review): the tasks submitted here are not in this section of the file - document their purpose there.
*/
private final Executor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
/** Nodes ring: the local node (set in spiStart) plus the remote nodes known to this node. */
@GridToStringExclude
private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing();
/** Topology snapshots history keyed by topology version; guarded by {@code mux}, trimmed to {@code spi.topHistSize}. */
private final SortedMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
/** Socket reader threads; copied under {@code mux} before being interrupted and joined on stop. */
private final Collection<SocketReader> readers = new LinkedList<>();
/** TCP server for discovery SPI; its port initializes the local node and is registered with the SPI context. */
private TcpServer tcpSrvr;
/** Ring message worker; node-left, status-check, node-failed and custom-event messages are queued to it. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private RingMessageWorker msgWorker;
/** Client message workers keyed by client node ID (used to ping clients routed through this node). */
protected ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>();
/** Heartbeats sender thread (started in spiStart after the node has joined). */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private HeartbeatsSender hbsSnd;
/** Status check sender thread (started in spiStart after the node has joined). */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private CheckStatusSender chkStatusSnd;
/** IP finder cleaner thread; only started when the IP finder is shared. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private IpFinderCleaner ipFinderCleaner;
/** Statistics printer thread; only started when the statistics print frequency is positive and INFO logging is on. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private StatisticsPrinter statsPrinter;
/** Failed nodes (but still in topology); guarded by {@code mux} and excluded when resolving the coordinator. */
private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
/** Leaving nodes (but still in topology); guarded by {@code mux} and excluded when resolving the coordinator. */
private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
/** If a non-shared IP finder is used, shows whether the IP finder contains a local address. */
private boolean ipFinderHasLocAddr;
/** Addresses that did not respond to join requests (used to resolve concurrent start). */
private final Collection<SocketAddress> noResAddrs = new GridConcurrentHashSet<>();
/** Addresses from which incoming join requests were sent (used to resolve concurrent start). */
private final Collection<SocketAddress> fromAddrs = new GridConcurrentHashSet<>();
/** Coordinator's response to the join request (duplicate ID, auth failure, check failure or loopback problem). */
private final GridTuple<TcpDiscoveryAbstractMessage> joinRes = F.t1();
/** Mutex guarding spiState, topHist, failedNodes, leavingNodes and readers; also used for wait/notify on state changes. */
private final Object mux = new Object();
/** Discovery state; read and written under {@code mux}. */
protected TcpDiscoverySpiState spiState = DISCONNECTED;
/** Pings in progress, keyed by address, so concurrent pings of the same address share one future. */
private final ConcurrentMap<InetSocketAddress, IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>>> pingMap =
new ConcurrentHashMap8<>();
/**
* Creates the server-mode discovery implementation.
*
* @param adapter Owning TCP discovery SPI; passed to the {@code TcpDiscoveryImpl} constructor.
*/
ServerImpl(TcpDiscoverySpi adapter) {
super(adapter);
}
/**
 * Returns the name of the current discovery state, read while holding {@code mux}.
 *
 * @return SPI state name.
 */
@Override public String getSpiState() {
    String stateName;

    synchronized (mux) {
        stateName = spiState.name();
    }

    return stateName;
}
/**
 * Returns the number of messages waiting in the ring message worker queue.
 *
 * @return Queue size, or {@code 0} if the message worker has not been created yet
 *      (it is created in {@code spiStart}).
 */
@Override public int getMessageWorkerQueueSize() {
    // Read the field once: it is null until spiStart runs, and an unguarded access would throw NPE.
    RingMessageWorker worker = msgWorker;

    return worker != null ? worker.queueSize() : 0;
}
/**
 * Returns the ID of the current coordinator, excluding failed and leaving nodes.
 *
 * @return Coordinator node ID, or {@code null} if no coordinator can be resolved.
 */
@Nullable @Override public UUID getCoordinator() {
    TcpDiscoveryNode coordinator = resolveCoordinator();

    if (coordinator == null)
        return null;

    return coordinator.id();
}
/**
 * Looks up a node by ID.
 *
 * @param nodeId Node ID (must not be {@code null}).
 * @return The local node if {@code nodeId} is the local ID; otherwise the ring node with that ID
 *      if it is visible, or {@code null} if it is unknown or not yet visible.
 */
@Nullable @Override public ClusterNode getNode(UUID nodeId) {
    assert nodeId != null;

    UUID localId = getLocalNodeId();

    // The local node is returned directly, without consulting the ring.
    if (localId != null && localId.equals(nodeId))
        return locNode;

    TcpDiscoveryNode found = ring.node(nodeId);

    // Nodes present in the ring but not yet visible are hidden from callers.
    return found == null || found.visible() ? found : null;
}
/**
 * Returns the remote nodes of the ring that are currently visible.
 *
 * @return Visible remote nodes.
 */
@Override public Collection<ClusterNode> getRemoteNodes() {
    Collection<TcpDiscoveryNode> visible = ring.visibleRemoteNodes();

    return F.upcast(visible);
}
/**
* Starts the SPI. Steps, in order:
* <ol>
* <li>Start the ring message worker.</li>
* <li>Create the TCP server and initialize the local node with its port.</li>
* <li>Start the TCP server.</li>
* <li>Register the local address (shared IP finder) or validate the configured addresses (non-shared).</li>
* <li>Join the topology.</li>
* <li>Start the heartbeat, status-check and (shared IP finder only) IP-finder-cleaner threads.</li>
* </ol>
*
* @param gridName Grid name (not used by this method body).
* @throws IgniteSpiException If debug mode is on without INFO logging, if a non-shared IP finder
*      has no registered addresses, or if joining the topology fails.
*/
@Override public void spiStart(String gridName) throws IgniteSpiException {
synchronized (mux) {
spiState = DISCONNECTED;
}
// Debug mode writes to debugLog and requires INFO logging.
if (debugMode) {
if (!log.isInfoEnabled())
throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " +
"in debug mode.");
debugLog = new ConcurrentLinkedDeque<>();
U.quietAndWarn(log, "TCP discovery SPI is configured in debug mode.");
}
// Clear addresses collections.
fromAddrs.clear();
noResAddrs.clear();
msgWorker = new RingMessageWorker();
msgWorker.start();
// The server is created first so its port can be used to initialize the local node.
tcpSrvr = new TcpServer();
spi.initLocalNode(tcpSrvr.port, true);
locNode = spi.locNode;
// Start TCP server thread after local node is initialized.
tcpSrvr.start();
ring.localNode(locNode);
if (spi.ipFinder.isShared())
registerLocalNodeAddress();
else {
// A non-shared IP finder must be pre-populated; otherwise there is nobody to join.
if (F.isEmpty(spi.ipFinder.getRegisteredAddresses()))
throw new IgniteSpiException("Non-shared IP finder must have IP addresses specified in " +
"GridTcpDiscoveryIpFinder.getRegisteredAddresses() configuration property " +
"(specify list of IP addresses in configuration).");
ipFinderHasLocAddr = spi.ipFinderHasLocalAddress();
}
if (spi.getStatisticsPrintFrequency() > 0 && log.isInfoEnabled()) {
statsPrinter = new StatisticsPrinter();
statsPrinter.start();
}
// Join is bracketed by statistics callbacks to measure its duration.
spi.stats.onJoinStarted();
joinTopology();
spi.stats.onJoinFinished();
// Background threads are only started once the node is in the topology.
hbsSnd = new HeartbeatsSender();
hbsSnd.start();
chkStatusSnd = new CheckStatusSender();
chkStatusSnd.start();
if (spi.ipFinder.isShared()) {
ipFinderCleaner = new IpFinderCleaner();
ipFinderCleaner.start();
}
spi.printStartInfo();
}
/**
* Registers the discovery TCP server port with the SPI context under the TCP protocol.
*
* @param spiCtx SPI context.
* @throws IgniteSpiException If failed.
*/
@Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
spiCtx.registerPort(tcpSrvr.port, TCP);
}
/**
* Stops the SPI finally (not a disconnect): delegates to {@code spiStop0(false)}, which sends
* a node-left message before stopping the worker threads.
*
* @throws IgniteSpiException If failed.
*/
@Override public void spiStop() throws IgniteSpiException {
spiStop0(false);
}
/**
* Stops SPI finally or stops SPI for restart.
* <p>
* On a final stop, a node-left message is queued to the ring message worker. The method then
* waits up to {@code spi.netTimeout} ms for the state to become {@code LEFT}. In both modes all
* worker threads are interrupted and joined and the ring is cleared. On disconnect, an
* {@code EVT_NODE_FAILED} event is passed to the discovery listener (if set) for every
* previously visible remote node. Finally, the state is reset to {@code DISCONNECTED}.
*
* @param disconnect {@code True} if SPI is being disconnected.
* @throws IgniteSpiException If failed.
*/
private void spiStop0(boolean disconnect) throws IgniteSpiException {
if (log.isDebugEnabled()) {
if (disconnect)
log.debug("Disconnecting SPI.");
else
log.debug("Preparing to start local node stop procedure.");
}
if (disconnect) {
synchronized (mux) {
spiState = DISCONNECTING;
}
}
if (msgWorker != null && msgWorker.isAlive() && !disconnect) {
// Send node left message only if it is final stop.
msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id()));
synchronized (mux) {
// Bounded wait: recompute the remaining time after each wake-up (spurious or not).
long threshold = U.currentTimeMillis() + spi.netTimeout;
long timeout = spi.netTimeout;
while (spiState != LEFT && timeout > 0) {
try {
mux.wait(timeout);
timeout = threshold - U.currentTimeMillis();
}
catch (InterruptedException ignored) {
// Restore the interrupt flag and continue stopping without waiting further.
Thread.currentThread().interrupt();
break;
}
}
if (spiState == LEFT) {
if (log.isDebugEnabled())
log.debug("Verification for local node leave has been received from coordinator" +
" (continuing stop procedure).");
}
else if (log.isInfoEnabled()) {
log.info("No verification for local node leave has been received from coordinator" +
" (will stop node anyway).");
}
}
}
// Stop worker threads: interrupt each one and wait for it to finish.
U.interrupt(tcpSrvr);
U.join(tcpSrvr, log);
// Readers are copied under the mutex, then interrupted and joined outside it.
Collection<SocketReader> tmp;
synchronized (mux) {
tmp = U.arrayList(readers);
}
U.interrupt(tmp);
U.joinThreads(tmp, log);
U.interrupt(hbsSnd);
U.join(hbsSnd, log);
U.interrupt(chkStatusSnd);
U.join(chkStatusSnd, log);
U.interrupt(ipFinderCleaner);
U.join(ipFinderCleaner, log);
U.interrupt(msgWorker);
U.join(msgWorker, log);
U.interrupt(statsPrinter);
U.join(statsPrinter, log);
// Remote nodes are only captured on disconnect: they are reported as failed below.
Collection<TcpDiscoveryNode> rmts = null;
if (!disconnect)
spi.printStopInfo();
else {
spi.getSpiContext().deregisterPorts();
rmts = ring.visibleRemoteNodes();
}
long topVer = ring.topologyVersion();
ring.clear();
if (rmts != null && !rmts.isEmpty()) {
// This is restart/disconnection and remote nodes are not empty.
// We need to fire FAIL event for each.
DiscoverySpiListener lsnr = spi.lsnr;
if (lsnr != null) {
Collection<ClusterNode> processed = new HashSet<>();
for (TcpDiscoveryNode n : rmts) {
assert n.visible();
processed.add(n);
// Topology after this failure: remote nodes not yet reported as failed.
List<ClusterNode> top = U.arrayList(rmts, F.notIn(processed));
// Each failure gets its own, incremented topology version.
topVer++;
Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer,
Collections.unmodifiableList(top));
lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist, null);
}
}
}
printStatistics();
spi.stats.clear();
synchronized (mux) {
// Clear stored data.
leavingNodes.clear();
failedNodes.clear();
spiState = DISCONNECTED;
}
}
/**
 * Pings a node by ID.
 * <p>
 * The local node is always considered alive. An unknown or not-yet-visible node is reported as
 * unreachable. If pinging a server node fails, a status check message is queued to the ring
 * message worker.
 *
 * @param nodeId Node ID (must not be {@code null}).
 * @return {@code true} if the node responded to the ping.
 */
@Override public boolean pingNode(UUID nodeId) {
    assert nodeId != null;

    if (log.isDebugEnabled())
        log.debug("Pinging node [nodeId=" + nodeId + ']');

    // UUIDs must be compared by value; reference comparison (==) practically never matches.
    if (nodeId.equals(getLocalNodeId()))
        return true;

    TcpDiscoveryNode node = ring.node(nodeId);

    if (node == null || !node.visible())
        return false;

    boolean res = pingNode(node);

    if (!res && !node.isClient()) {
        LT.warn(log, null, "Failed to ping node (status check will be initiated): " + nodeId);

        msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, node.id()));
    }

    return res;
}
/**
 * Checks whether {@code node} is alive by pinging its addresses in order.
 * <p>
 * A client node is pinged through its router node. The router is also asked whether the client
 * is still connected to it.
 *
 * @param node Node to ping.
 * @return {@code true} if the first address that answers reports the expected node ID (and,
 *      for a client, that the client exists); {@code false} if no address answers.
 */
private boolean pingNode(TcpDiscoveryNode node) {
    assert node != null;

    if (node.id().equals(getLocalNodeId()))
        return true;

    TcpDiscoveryNode target = node;
    UUID clientNodeId = null;

    if (node.isClient()) {
        clientNodeId = node.id();

        target = ring.node(node.clientRouterNodeId());

        if (target == null || !target.visible())
            return false;
    }

    for (InetSocketAddress addr : spi.getNodeAddresses(target, U.sameMacs(locNode, target))) {
        try {
            // ID returned by the node should be the same as ID of the parameter for ping to succeed.
            IgniteBiTuple<UUID, Boolean> reply = pingNode(addr, clientNodeId);

            boolean alive = target.id().equals(reply.get1()) && (clientNodeId == null || reply.get2());

            if (alive)
                target.lastSuccessfulAddress(addr);

            // The first address that answers decides the outcome.
            return alive;
        }
        catch (IgniteCheckedException e) {
            String errMsg = "Failed to ping node [node=" + target + ", err=" + e.getMessage() + ']';

            if (log.isDebugEnabled())
                log.debug(errMsg);

            onException(errMsg, e);

            // Fall through to the next address.
        }
    }

    return false;
}
/**
 * Pings the node by its address to see if it's alive.
 * <p>
 * If {@code addr} is one of the local node addresses, no network I/O is performed. The local
 * node ID is returned, together with the result of pinging the client through its local
 * {@code ClientMessageWorker} when {@code clientNodeId} is given. Otherwise a ping request is
 * sent over a socket, with up to {@code spi.reconCnt} attempts. Concurrent pings of the same
 * address share one future through {@code pingMap}.
 *
 * @param addr Address of the node.
 * @param clientNodeId Client node ID.
 * @return ID of the remote node and "client exists" flag if node alive.
 * @throws IgniteCheckedException If an error occurs.
 */
private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId)
    throws IgniteCheckedException {
    assert addr != null;

    UUID locNodeId = getLocalNodeId();

    if (F.contains(spi.locNodeAddrs, addr)) {
        if (clientNodeId == null)
            return F.t(getLocalNodeId(), false);

        ClientMessageWorker clientWorker = clientMsgWorkers.get(clientNodeId);

        if (clientWorker == null)
            return F.t(getLocalNodeId(), false);

        boolean clientPingRes;

        try {
            clientPingRes = clientWorker.ping();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            throw new IgniteInterruptedCheckedException(e);
        }

        return F.t(getLocalNodeId(), clientPingRes);
    }

    GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>();

    IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut);

    // Another thread is already pinging this address: share its result.
    if (oldFut != null)
        return oldFut.get();

    Collection<Throwable> errs = null;

    try {
        // Resolved form of 'addr'. 'addr' itself is the pingMap key and must never be reassigned:
        // a resolved InetSocketAddress is not equal to its unresolved form, so the
        // pingMap.remove(addr, fut) call below would miss the entry. The completed future would then
        // be returned forever to every later ping of this address.
        InetSocketAddress sockAddr = addr;

        for (int i = 0; i < spi.reconCnt; i++) {
            Socket sock = null;

            try {
                if (sockAddr.isUnresolved())
                    sockAddr = new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()),
                        sockAddr.getPort());

                long tstamp = U.currentTimeMillis();

                sock = spi.openSocket(sockAddr);

                spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId));

                TcpDiscoveryPingResponse res = spi.readMessage(sock, null, spi.netTimeout);

                // The address leads back to the local node: stop retrying (the future fails in finally).
                if (locNodeId.equals(res.creatorNodeId())) {
                    if (log.isDebugEnabled())
                        log.debug("Ping response from local node: " + res);

                    break;
                }

                spi.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);

                IgniteBiTuple<UUID, Boolean> t = F.t(res.creatorNodeId(), res.clientExists());

                fut.onDone(t);

                return t;
            }
            catch (IOException | IgniteCheckedException e) {
                if (errs == null)
                    errs = new ArrayList<>();

                errs.add(e);
            }
            finally {
                U.closeQuiet(sock);
            }
        }
    }
    catch (Throwable t) {
        fut.onDone(t);

        if (t instanceof Error)
            throw t;

        throw U.cast(t);
    }
    finally {
        if (!fut.isDone())
            fut.onDone(U.exceptionWithSuppressed("Failed to ping node by address: " + addr, errs));

        // 'addr' is the exact key used in putIfAbsent above, so this removal always succeeds.
        boolean b = pingMap.remove(addr, fut);

        assert b;
    }

    return fut.get();
}
/**
* Disconnects the SPI for restart: delegates to {@code spiStop0(true)}. No node-left message is
* sent, and remaining remote nodes are reported to the listener as failed.
*
* @throws IgniteSpiException If failed.
*/
@Override public void disconnect() throws IgniteSpiException {
spiStop0(true);
}
/**
 * Marshals a custom discovery event and queues it to the ring message worker.
 *
 * @param evt Custom event to send.
 * @throws IgniteSpiException If the event cannot be marshalled.
 */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
    UUID senderId = getLocalNodeId();

    byte[] evtBytes;

    try {
        evtBytes = spi.marsh.marshal(evt);
    }
    catch (IgniteCheckedException e) {
        throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
    }

    msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(senderId, evt, evtBytes));
}
/**
 * Queues a node-failed message for the given node to the ring message worker.
 * Does nothing if the node is not in the ring.
 *
 * @param nodeId ID of the node to fail.
 * @param warning Optional warning attached to the node-failed message.
 */
@Override public void failNode(UUID nodeId, @Nullable String warning) {
    TcpDiscoveryNode node = ring.node(nodeId);

    if (node == null)
        return;

    TcpDiscoveryNodeFailedMessage failedMsg =
        new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), node.id(), node.internalOrder());

    failedMsg.warning(warning);

    msgWorker.addMessage(failedMsg);
}
/**
* Tries to join this node to topology.
* <p>
* If no join request can be sent (no resolvable addresses), the local node becomes the first node.
* It authenticates itself if a node authenticator is configured, then takes order 1 and topology
* version 1. Otherwise the method waits up to {@code spi.netTimeout} ms for the state to leave
* {@code CONNECTING}. On {@code CONNECTED} it finishes. On a rejection state it throws. On a
* timeout it logs a warning and repeats the join process.
*
* @throws IgniteSpiException If any error occurs.
*/
private void joinTopology() throws IgniteSpiException {
synchronized (mux) {
assert spiState == CONNECTING || spiState == DISCONNECTED;
spiState = CONNECTING;
}
// Read the original credentials before marshalCredentials replaces the attribute value.
SecurityCredentials locCred = (SecurityCredentials)locNode.getAttributes()
.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
// Marshal credentials for backward compatibility and security.
marshalCredentials(locNode);
while (true) {
if (!sendJoinRequestMessage()) {
// Nobody to join: this node starts the topology itself.
if (log.isDebugEnabled())
log.debug("Join request message has not been sent (local node is the first in the topology).");
if (spi.nodeAuth != null) {
// Authenticate local node.
try {
SecurityContext subj = spi.nodeAuth.authenticateNode(locNode, locCred);
if (subj == null)
throw new IgniteSpiException("Authentication failed for local node: " + locNode.id());
// Replace credentials with the marshalled security subject.
Map<String, Object> attrs = new HashMap<>(locNode.attributes());
attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, spi.marsh.marshal(subj));
attrs.remove(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
locNode.setAttributes(attrs);
}
catch (IgniteException | IgniteCheckedException e) {
throw new IgniteSpiException("Failed to authenticate local node (will shutdown local node).", e);
}
}
locNode.order(1);
locNode.internalOrder(1);
spi.gridStartTime = U.currentTimeMillis();
locNode.visible(true);
ring.clear();
ring.topologyVersion(1);
synchronized (mux) {
topHist.clear();
spiState = CONNECTED;
mux.notifyAll();
}
notifyDiscovery(EVT_NODE_JOINED, 1, locNode);
break;
}
if (log.isDebugEnabled())
log.debug("Join request message has been sent (waiting for coordinator response).");
synchronized (mux) {
// Bounded wait for a state change; the remaining time is recomputed after every wake-up.
long threshold = U.currentTimeMillis() + spi.netTimeout;
long timeout = spi.netTimeout;
while (spiState == CONNECTING && timeout > 0) {
try {
mux.wait(timeout);
timeout = threshold - U.currentTimeMillis();
}
catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
throw new IgniteSpiException("Thread has been interrupted.");
}
}
// For rejection states, joinRes holds the coordinator's response message.
if (spiState == CONNECTED)
break;
else if (spiState == DUPLICATE_ID)
throw spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)joinRes.get());
else if (spiState == AUTH_FAILED)
throw spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)joinRes.get());
else if (spiState == CHECK_FAILED)
throw spi.checkFailedError((TcpDiscoveryCheckFailedMessage)joinRes.get());
else if (spiState == LOOPBACK_PROBLEM) {
TcpDiscoveryLoopbackProblemMessage msg = (TcpDiscoveryLoopbackProblemMessage)joinRes.get();
boolean locHostLoopback = spi.locHost.isLoopbackAddress();
String firstNode = locHostLoopback ? "local" : "remote";
String secondNode = locHostLoopback ? "remote" : "local";
throw new IgniteSpiException("Failed to add node to topology because " + firstNode +
" node is configured to use loopback address, but " + secondNode + " node is not " +
"(consider changing 'localAddress' configuration parameter) " +
"[locNodeAddrs=" + U.addressesAsString(locNode) + ", rmtNodeAddrs=" +
U.addressesAsString(msg.addresses(), msg.hostNames()) + ']');
}
else
// Still CONNECTING after the timeout: warn and send the join request again.
LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " +
"Check remote nodes logs for possible error messages. " +
"Note that large topology may require significant time to start. " +
"Increase 'TcpDiscoverySpi.networkTimeout' configuration property " +
"if getting this message on the starting nodes [networkTimeout=" + spi.netTimeout + ']');
}
}
assert locNode.order() != 0;
assert locNode.internalOrder() != 0;
if (log.isDebugEnabled())
log.debug("Discovery SPI has been connected to topology with order: " + locNode.internalOrder());
}
/**
* Tries to send join request message to a random node presenting in topology.
* Address is provided by {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} and message is
* sent to first node connection succeeded to.
* <p>
* Resolved addresses are tried in order. {@code RES_WAIT} and some other responses trigger a
* 2-second sleep and another round. With a non-shared IP finder that lacks the local address,
* unresponsive rounds are also retried every 2 seconds until {@code spi.joinTimeout} (if positive)
* expires.
*
* @return {@code true} if send succeeded; {@code false} if there are no addresses to send to, or
*      no round required a retry.
* @throws IgniteSpiException If the thread is interrupted while sleeping, or the join timeout expires.
*/
@SuppressWarnings({"BusyWait"})
private boolean sendJoinRequestMessage() throws IgniteSpiException {
TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode,
spi.collectExchangeData(getLocalNodeId()));
// Time when it has been detected, that addresses from IP finder do not respond.
long noResStart = 0;
while (true) {
Collection<InetSocketAddress> addrs = spi.resolvedAddresses();
if (F.isEmpty(addrs))
return false;
boolean retry = false;
Collection<Exception> errs = new ArrayList<>();
for (InetSocketAddress addr : addrs) {
try {
Integer res = sendMessageDirectly(joinReq, addr);
assert res != null;
noResAddrs.remove(addr);
// Address is responsive, reset period start.
noResStart = 0;
switch (res) {
case RES_WAIT:
// Concurrent startup, try sending join request again or wait if no success.
retry = true;
break;
case RES_OK:
if (log.isDebugEnabled())
log.debug("Join request message has been sent to address [addr=" + addr +
", req=" + joinReq + ']');
// Join request sending succeeded, wait for response from topology.
return true;
default:
// Concurrent startup, try next node.
if (res == RES_CONTINUE_JOIN) {
// Retry unless this address has itself sent us a join request.
if (!fromAddrs.contains(addr))
retry = true;
}
else {
if (log.isDebugEnabled())
log.debug("Unexpected response to join request: " + res);
retry = true;
}
break;
}
}
catch (IgniteSpiException e) {
errs.add(e);
// NOTE(review): onException is only invoked when debug logging is enabled here,
// unlike pingNode/sendMessageDirectly, which call it unconditionally - confirm intent.
if (log.isDebugEnabled()) {
IOException ioe = X.cause(e, IOException.class);
log.debug("Failed to send join request message [addr=" + addr +
", msg=" + (ioe != null ? ioe.getMessage() : e.getMessage()) + ']');
onException("Failed to send join request message [addr=" + addr +
", msg=" + (ioe != null ? ioe.getMessage() : e.getMessage()) + ']', ioe);
}
noResAddrs.add(addr);
}
}
if (retry) {
if (log.isDebugEnabled())
log.debug("Concurrent discovery SPI start has been detected (local node should wait).");
try {
U.sleep(2000);
}
catch (IgniteInterruptedCheckedException e) {
throw new IgniteSpiException("Thread has been interrupted.", e);
}
}
else if (!spi.ipFinder.isShared() && !ipFinderHasLocAddr) {
// Non-shared finder without the local address: this node cannot start the topology
// itself, so keep retrying (bounded by joinTimeout if it is positive).
IgniteCheckedException e = null;
if (!errs.isEmpty()) {
e = new IgniteCheckedException("Multiple connection attempts failed.");
for (Exception err : errs)
e.addSuppressed(err);
}
if (e != null && X.hasCause(e, ConnectException.class))
LT.warn(log, null, "Failed to connect to any address from IP finder " +
"(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " +
addrs);
if (spi.joinTimeout > 0) {
if (noResStart == 0)
noResStart = U.currentTimeMillis();
else if (U.currentTimeMillis() - noResStart > spi.joinTimeout)
throw new IgniteSpiException(
"Failed to connect to any address from IP finder within join timeout " +
"(make sure IP finder addresses are correct, and operating system firewalls are disabled " +
"on all host machines, or consider increasing 'joinTimeout' configuration property): " +
addrs, e);
}
try {
U.sleep(2000);
}
catch (IgniteInterruptedCheckedException ex) {
throw new IgniteSpiException("Thread has been interrupted.", ex);
}
}
else
break;
}
return false;
}
/**
 * Establishes connection to an address, sends message and returns the response (if any).
 * <p>
 * Each of up to {@code spi.reconCnt} attempts opens a socket, performs a handshake, writes
 * {@code msg} and reads the receipt.
 * <ul>
 * <li>If the socket cannot be opened, one more connection attempt is made before giving up.</li>
 * <li>On socket timeouts the ack timeout is doubled until {@code checkAckTimeout} rejects it.</li>
 * <li>If the handshake answer comes from the local node itself, no more attempts are made.</li>
 * </ul>
 *
 * @param msg Message to send.
 * @param addr Address to send message to.
 * @return Receipt read by {@code spi.readReceipt}. If a join request was written on the last
 *      attempt but its receipt could not be read, {@code RES_OK} is returned.
 * @throws IgniteSpiException If an error occurs.
 */
@Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr)
    throws IgniteSpiException {
    assert msg != null;
    assert addr != null;

    Collection<Throwable> errs = null;

    long ackTimeout0 = spi.ackTimeout;

    int connectAttempts = 1;

    boolean joinReqSent = false;

    UUID locNodeId = getLocalNodeId();

    for (int i = 0; i < spi.reconCnt; i++) {
        // Need to set to false on each new iteration,
        // since remote node may leave in the middle of the first iteration.
        joinReqSent = false;

        boolean openSock = false;

        Socket sock = null;

        try {
            long tstamp = U.currentTimeMillis();

            sock = spi.openSocket(addr);

            openSock = true;

            // Handshake.
            spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));

            TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);

            if (locNodeId.equals(res.creatorNodeId())) {
                if (log.isDebugEnabled())
                    log.debug("Handshake response from local node: " + res);

                break;
            }

            spi.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);

            // Send message.
            tstamp = U.currentTimeMillis();

            spi.writeToSocket(sock, msg);

            spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);

            if (debugMode)
                debugLog("Message has been sent directly to address [msg=" + msg + ", addr=" + addr +
                    ", rmtNodeId=" + res.creatorNodeId() + ']');

            if (log.isDebugEnabled())
                log.debug("Message has been sent directly to address [msg=" + msg + ", addr=" + addr +
                    ", rmtNodeId=" + res.creatorNodeId() + ']');

            // Connection has been established, but
            // join request may not be unmarshalled on remote host.
            // E.g. due to class not found issue.
            joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;

            return spi.readReceipt(sock, ackTimeout0);
        }
        catch (ClassCastException e) {
            // This issue is rarely reproducible on AmazonEC2, but never
            // on dedicated machines.
            if (log.isDebugEnabled())
                U.error(log, "Class cast exception on direct send: " + addr, e);

            onException("Class cast exception on direct send: " + addr, e);

            if (errs == null)
                errs = new ArrayList<>();

            errs.add(e);
        }
        catch (IOException | IgniteCheckedException e) {
            if (log.isDebugEnabled())
                log.error("Exception on direct send: " + e.getMessage(), e);

            onException("Exception on direct send: " + e.getMessage(), e);

            if (errs == null)
                errs = new ArrayList<>();

            errs.add(e);

            if (!openSock) {
                // Reconnect for the second time, if connection is not established.
                if (connectAttempts < 2) {
                    connectAttempts++;

                    continue;
                }

                break; // Don't retry if we can not establish connection.
            }

            if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
                ackTimeout0 *= 2;

                if (!checkAckTimeout(ackTimeout0))
                    break;
            }
        }
        finally {
            U.closeQuiet(sock);
        }
    }

    if (joinReqSent) {
        // The log message previously claimed RES_WAIT although RES_OK is returned.
        if (log.isDebugEnabled())
            log.debug("Join request has been sent, but receipt has not been read (returning RES_OK).");

        // Topology will not include this node,
        // however, warning on timed out join will be output.
        return RES_OK;
    }

    throw new IgniteSpiException(
        "Failed to send message to address [addr=" + addr + ", msg=" + msg + ']',
        U.exceptionWithSuppressed("Failed to send message to address " +
            "[addr=" + addr + ", msg=" + msg + ']', errs));
}
/**
 * Replaces the node's security credentials attribute with its marshalled form, using the
 * discovery SPI marshaller. The node's attribute map is replaced with a modified copy.
 *
 * @param node Node whose credentials attribute is marshalled.
 * @throws IgniteSpiException If marshalling fails.
 */
private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException {
    try {
        // Copy via the security-unsafe getter so the credentials attribute is included.
        Map<String, Object> nodeAttrs = new HashMap<>(node.getAttributes());

        Object creds = nodeAttrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);

        nodeAttrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS, spi.marsh.marshal(creds));

        node.setAttributes(nodeAttrs);
    }
    catch (IgniteCheckedException e) {
        throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e);
    }
}
/**
 * Unmarshals the node's security credentials with the discovery SPI marshaller. The node's
 * attribute value is left untouched.
 *
 * @param node Node whose credentials are read.
 * @return Security credentials, or {@code null} if the node has no credentials attribute.
 * @throws IgniteSpiException If unmarshalling fails.
 */
private SecurityCredentials unmarshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException {
    try {
        byte[] serialized = (byte[])node.getAttributes().get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);

        if (serialized != null)
            return spi.marsh.unmarshal(serialized, null);
    }
    catch (IgniteCheckedException e) {
        throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e);
    }

    // No credentials attribute present.
    return null;
}
/**
 * Notifies the external discovery listener about an event.
 * <p>
 * The notification is only delivered if a listener is set, the node is visible, and the SPI is
 * {@code CONNECTED} or {@code DISCONNECTING}. Otherwise it is skipped (and logged at debug
 * level). On delivery, the current visible topology is recorded in the topology history.
 *
 * @param type Discovery event type. See {@link org.apache.ignite.events.DiscoveryEvent} for more details.
 * @param topVer Topology version.
 * @param node Remote node this event is connected with.
 */
private void notifyDiscovery(int type, long topVer, TcpDiscoveryNode node) {
    assert type > 0;
    assert node != null;

    DiscoverySpiListener lsnr = spi.lsnr;

    TcpDiscoverySpiState state = spiStateCopy();

    boolean stateAllowsNotify = state == CONNECTED || state == DISCONNECTING;

    if (lsnr == null || !node.visible() || !stateAllowsNotify) {
        if (log.isDebugEnabled())
            log.debug("Skipped discovery notification [node=" + node + ", spiState=" + state +
                ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']');

        return;
    }

    if (log.isDebugEnabled())
        log.debug("Discovery notification [node=" + node + ", spiState=" + state +
            ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']');

    Collection<ClusterNode> top = F.upcast(ring.visibleNodes());

    Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top);

    lsnr.onDiscovery(type, topVer, node, top, hist, null);
}
/**
 * Records a topology snapshot for {@code topVer} and evicts the oldest snapshots so that at most
 * {@code spi.topHistSize} entries remain. Runs under {@code mux}.
 *
 * @param topVer Topology version.
 * @param top Topology snapshot.
 * @return Copy of the updated history, or {@code null} if a snapshot for {@code topVer} was
 *      already recorded (the existing one is kept).
 */
@Nullable private Map<Long, Collection<ClusterNode>> updateTopologyHistory(long topVer, Collection<ClusterNode> top) {
    synchronized (mux) {
        boolean alreadyRecorded = topHist.containsKey(topVer);

        if (alreadyRecorded)
            return null;

        topHist.put(topVer, top);

        // Evict oldest versions first (the map is sorted by version).
        while (topHist.size() > spi.topHistSize) {
            Long oldest = topHist.firstKey();

            topHist.remove(oldest);
        }

        if (log.isDebugEnabled())
            log.debug("Added topology snapshot to history, topVer=" + topVer + ", historySize=" + topHist.size());

        Map<Long, Collection<ClusterNode>> snapshot = new TreeMap<>(topHist);

        return snapshot;
    }
}
/**
 * Checks whether the local node is the coordinator. Nodes that are leaving or failed (but are
 * still in topology) are excluded from the search. A positive answer is also reported to the
 * SPI statistics via {@code onBecomingCoordinator}.
 *
 * @return {@code true} if the SPI is connected and the local node is coordinator.
 */
private boolean isLocalNodeCoordinator() {
    synchronized (mux) {
        if (spiState != CONNECTED || !locNode.equals(resolveCoordinator()))
            return false;

        spi.stats.onBecomingCoordinator();

        return true;
    }
}
/**
 * Reads the discovery state while holding {@code mux}.
 *
 * @return Current SPI state.
 */
private TcpDiscoverySpiState spiStateCopy() {
    synchronized (mux) {
        return spiState;
    }
}
/**
* Resolves coordinator. Nodes that are leaving or failed (but are still in
* topology) are removed from search. Equivalent to {@code resolveCoordinator(null)}.
*
* @return Coordinator node or {@code null} if there are no coordinator
* (i.e. local node is the last one and is currently stopping).
*/
@Nullable private TcpDiscoveryNode resolveCoordinator() {
return resolveCoordinator(null);
}
/**
 * Resolves the coordinator from the ring while holding {@code mux}. Failed and leaving nodes are
 * always excluded, together with the optional {@code filter}.
 *
 * @param filter Nodes to exclude when resolving coordinator (optional).
 * @return Coordinator node or {@code null} if there are no coordinator
 *      (i.e. local node is the last one and is currently stopping).
 */
@Nullable private TcpDiscoveryNode resolveCoordinator(
    @Nullable Collection<TcpDiscoveryNode> filter) {
    synchronized (mux) {
        Collection<TcpDiscoveryNode> excluded = F.concat(false, failedNodes, leavingNodes);

        if (F.isEmpty(filter))
            return ring.coordinator(excluded);

        return ring.coordinator(F.concat(false, excluded, filter));
    }
}
/**
 * Logs SPI statistics at INFO level. Does nothing unless INFO logging is enabled and the
 * statistics print frequency is positive.
 */
private void printStatistics() {
    if (!log.isInfoEnabled() || spi.statsPrintFreq <= 0)
        return;

    int failedNodesSize;
    int leavingNodesSize;

    synchronized (mux) {
        failedNodesSize = failedNodes.size();
        leavingNodesSize = leavingNodes.size();
    }

    Runtime runtime = Runtime.getRuntime();

    TcpDiscoveryNode coord = resolveCoordinator();

    long mb = 1024L * 1024;

    log.info("Discovery SPI statistics [statistics=" + spi.stats + ", spiState=" + spiStateCopy() +
        ", coord=" + coord +
        ", topSize=" + ring.allNodes().size() +
        ", leavingNodesSize=" + leavingNodesSize + ", failedNodesSize=" + failedNodesSize +
        ", msgWorker.queue.size=" + (msgWorker != null ? msgWorker.queueSize() : "N/A") +
        ", lastUpdate=" + (locNode != null ? U.format(locNode.lastUpdateTime()) : "N/A") +
        ", heapFree=" + runtime.freeMemory() / mb +
        "M, heapTotal=" + runtime.maxMemory() / mb + "M]");
}
/**
 * Attaches topology data to a node-added message that is about to be delivered to the added node
 * itself. Other messages, or node-added messages for a different destination, are left unchanged.
 * <p>
 * The attached data is: the nodes that joined before the added node (lower internal order), the
 * given pending messages and discarded message ID, and a copy of the topology history.
 *
 * @param msg Message to prepare.
 * @param destNodeId Destination node ID.
 * @param msgs Messages to include.
 * @param discardMsgId Discarded message ID.
 */
private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID destNodeId,
    @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
    assert destNodeId != null;

    if (!(msg instanceof TcpDiscoveryNodeAddedMessage))
        return;

    TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg;

    TcpDiscoveryNode addedNode = nodeAddedMsg.node();

    if (!addedNode.id().equals(destNodeId))
        return;

    Collection<TcpDiscoveryNode> ringNodes = ring.allNodes();

    Collection<TcpDiscoveryNode> topToSnd = new ArrayList<>(ringNodes.size());

    for (TcpDiscoveryNode ringNode : ringNodes) {
        assert ringNode.internalOrder() != 0 : ringNode;

        // The message may be resent after failures/leaves: nodes added after this one
        // (greater internal order) get separate messages, so skip them here.
        if (ringNode.internalOrder() < addedNode.internalOrder())
            topToSnd.add(ringNode);
    }

    nodeAddedMsg.topology(topToSnd);
    nodeAddedMsg.messages(msgs, discardMsgId);

    Map<Long, Collection<ClusterNode>> histCopy;

    synchronized (mux) {
        histCopy = new TreeMap<>(topHist);
    }

    nodeAddedMsg.topologyHistory(histCopy);
}
/**
 * Removes the topology, topology history and pending messages that may have been attached to a
 * {@link TcpDiscoveryNodeAddedMessage}; any other message type is left untouched.
 *
 * @param msg Message to clear.
 */
private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) {
    if (!(msg instanceof TcpDiscoveryNodeAddedMessage))
        return;

    TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;

    // Nullify topology before registration.
    addedMsg.topology(null);
    addedMsg.topologyHistory(null);
    addedMsg.messages(null, null);
}
/**
 * {@inheritDoc}
 * <p>
 * Interrupts and joins the SPI service threads one by one: TCP server, heartbeat sender, status check
 * sender, IP finder cleaner, all socket readers (snapshot taken under {@code mux}), message worker and
 * statistics printer.
 */
@Override void simulateNodeFailure() {
    U.warn(log, "Simulating node failure: " + getLocalNodeId());

    // Order matters: these threads are stopped strictly one after another.
    for (Thread svc : new Thread[] {tcpSrvr, hbsSnd, chkStatusSnd, ipFinderCleaner}) {
        U.interrupt(svc);
        U.join(svc, log);
    }

    Collection<SocketReader> rdrSnapshot;

    synchronized (mux) {
        rdrSnapshot = U.arrayList(readers);
    }

    U.interrupt(rdrSnapshot);
    U.joinThreads(rdrSnapshot, log);

    for (Thread svc : new Thread[] {msgWorker, statsPrinter}) {
        U.interrupt(svc);
        U.join(svc, log);
    }
}
/**
 * Not supported by this implementation.
 *
 * @throws UnsupportedOperationException Always.
 */
@Override public void brakeConnection() {
    throw new UnsupportedOperationException();
}
/**
 * {@inheritDoc}
 * <p>
 * Returns the {@code msgWorker} thread.
 */
@Override protected IgniteSpiThread workerThread() {
    return msgWorker;
}
/**
 * <strong>FOR TEST ONLY!!!</strong>
 * <p>
 * Simulates situation when next node is still alive but is bypassed
 * since it has been excluded from the ring, possibly, due to short time
 * network problems.
 * <p>
 * Looks up the next ring node (skipping nodes already known as failed, under {@code mux}) and queues a
 * {@link TcpDiscoveryNodeFailedMessage} for it on the message worker. Does nothing when there is no
 * next node.
 * <p>
 * This method is intended for test purposes only.
 */
void forceNextNodeFailure() {
    U.warn(log, "Next node will be forcibly failed (if any).");

    TcpDiscoveryNode victim;

    synchronized (mux) {
        victim = ring.nextNode(failedNodes);
    }

    if (victim == null)
        return;

    TcpDiscoveryNodeFailedMessage failMsg =
        new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), victim.id(), victim.internalOrder());

    msgWorker.addMessage(failMsg);
}
/**
 * <strong>FOR TEST ONLY!!!</strong>
 * <p>
 * Exposes the nodes ring so tests can inspect it. This method is intended for test purposes only.
 *
 * @return The live nodes ring instance (not a copy).
 */
TcpDiscoveryNodesRing ring() {
    TcpDiscoveryNodesRing res = ring;

    return res;
}
/**
 * {@inheritDoc}
 * <p>
 * Only works when {@code debugMode} is enabled; otherwise a warning explaining how to enable it is logged.
 * The dump is assembled while holding {@code mux} and lists the local node, SPI state, internal thread
 * statuses, socket readers, in-memory debug log, leaving nodes, failed nodes and SPI statistics.
 *
 * @param log Logger to write the dump to (shadows the SPI's own logger field).
 */
@Override public void dumpDebugInfo(IgniteLogger log) {
    if (debugMode) {
        assert log.isInfoEnabled();

        synchronized (mux) {
            String nl = U.nl();

            StringBuilder sb = new StringBuilder(nl);

            sb.append(">>>").append(nl);
            sb.append(">>>").append("Dumping discovery SPI debug info.").append(nl);
            sb.append(">>>").append(nl);

            sb.append("Local node ID: ").append(getLocalNodeId()).append(nl).append(nl);
            sb.append("Local node: ").append(locNode).append(nl).append(nl);
            sb.append("SPI state: ").append(spiState).append(nl).append(nl);

            sb.append("Internal threads: ").append(nl);
            sb.append(" Message worker: ").append(threadStatus(msgWorker)).append(nl);
            sb.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(nl);
            sb.append(" HB sender: ").append(threadStatus(hbsSnd)).append(nl);
            sb.append(" IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(nl);
            sb.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(nl);
            sb.append(nl);

            sb.append("Socket readers: ").append(nl);

            for (SocketReader reader : readers)
                sb.append(" ").append(reader).append(nl);

            sb.append(nl);

            sb.append("In-memory log messages: ").append(nl);

            for (String line : debugLog)
                sb.append(" ").append(line).append(nl);

            sb.append(nl);

            sb.append("Leaving nodes: ").append(nl);

            for (TcpDiscoveryNode n : leavingNodes)
                sb.append(" ").append(n.id()).append(nl);

            sb.append(nl);

            sb.append("Failed nodes: ").append(nl);

            for (TcpDiscoveryNode n : failedNodes)
                sb.append(" ").append(n.id()).append(nl);

            sb.append(nl);

            sb.append("Stats: ").append(spi.stats).append(nl);

            U.quietAndInfo(log, sb.toString());
        }
    }
    else {
        U.quietAndWarn(log, "Failed to dump debug info (discovery SPI was not configured " +
            "in debug mode, consider setting 'debugMode' configuration property to 'true').");
    }
}
/**
 * Tells whether a message is worth recording in the in-memory debug log. Heartbeat, status check and
 * discard messages are excluded; every other message is recordable.
 *
 * @param msg Message.
 * @return {@code True} if recordable in debug mode.
 */
private boolean recordable(TcpDiscoveryAbstractMessage msg) {
    boolean serviceMsg = msg instanceof TcpDiscoveryHeartbeatMessage ||
        msg instanceof TcpDiscoveryStatusCheckMessage ||
        msg instanceof TcpDiscoveryDiscardMessage;

    return !serviceMsg;
}
/**
 * Checks if two given {@link SecurityPermissionSet} objects contain the same permissions.
 * <p>
 * The sets match when their {@code defaultAllowAll} flags are equal and their system, cache and task
 * permissions are equal regardless of ordering (compared via {@code F.eqNotOrdered}).
 *
 * @param locPerms The first set of permissions.
 * @param rmtPerms The second set of permissions.
 * @return {@code True} if given parameters contain the same permissions, {@code False} otherwise.
 */
private boolean permissionsEqual(SecurityPermissionSet locPerms, SecurityPermissionSet rmtPerms) {
    boolean locDfltAllow = locPerms.defaultAllowAll();
    boolean rmtDfltAllow = rmtPerms.defaultAllowAll();

    boolean samePerms =
        F.eqNotOrdered(rmtPerms.systemPermissions(), locPerms.systemPermissions()) &&
        F.eqNotOrdered(rmtPerms.cachePermissions(), locPerms.cachePermissions()) &&
        F.eqNotOrdered(rmtPerms.taskPermissions(), locPerms.taskPermissions());

    return locDfltAllow == rmtDfltAllow && samePerms;
}
/**
 * Strips both the node metrics and the cache metrics of the given node from a heartbeat message.
 *
 * @param msg Heartbeat message to modify.
 * @param nodeId ID of the node whose metrics are removed.
 */
private static void removeMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) {
    msg.removeMetrics(nodeId);

    msg.removeCacheMetrics(nodeId);
}
/**
 * {@inheritDoc}
 * <p>
 * Delegates to Ignite's reflective {@code S.toString} helper.
 */
@Override public String toString() {
    String str = S.toString(ServerImpl.class, this);

    return str;
}
/**
 * Thread that sends heartbeats.
 * <p>
 * It first waits (polling once per second) until the local node becomes coordinator. It then queues a
 * verified {@link TcpDiscoveryHeartbeatMessage} to the message worker every {@code spi.hbFreq}
 * milliseconds, until it is interrupted or the SPI leaves the {@code CONNECTED} state.
 */
private class HeartbeatsSender extends IgniteSpiThread {
    /**
     * Creates the heartbeat sender thread with the SPI thread priority.
     */
    private HeartbeatsSender() {
        super(spi.ignite().name(), "tcp-disco-hb-sender", log);

        setPriority(spi.threadPri);
    }

    /** {@inheritDoc} */
    @SuppressWarnings("BusyWait")
    @Override protected void body() throws InterruptedException {
        // Heartbeats are only issued once this node is the coordinator.
        while (!isLocalNodeCoordinator())
            Thread.sleep(1000);

        if (log.isDebugEnabled())
            log.debug("Heartbeats sender has been started.");

        while (!isInterrupted()) {
            boolean connected = spiStateCopy() == CONNECTED;

            if (!connected) {
                if (log.isDebugEnabled())
                    log.debug("Stopping heartbeats sender (SPI is not connected to topology).");

                return;
            }

            TcpDiscoveryHeartbeatMessage hb = new TcpDiscoveryHeartbeatMessage(getLocalNodeId());

            hb.verify(getLocalNodeId());

            msgWorker.addMessage(hb);

            Thread.sleep(spi.hbFreq);
        }
    }
}
/**
 * Thread that sends status check messages to next node if local node has not
 * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage})
 * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} *
 * {@link TcpDiscoverySpi#getHeartbeatFrequency()}.
 * <p>
 * A check is skipped when the local node was updated in the meantime or when the ring has no remote
 * nodes. The thread stops when interrupted or when the SPI leaves the {@code CONNECTED} state.
 */
private class CheckStatusSender extends IgniteSpiThread {
    /**
     * Creates the status check sender thread with the SPI thread priority.
     */
    private CheckStatusSender() {
        super(spi.ignite().name(), "tcp-disco-status-check-sender", log);

        setPriority(spi.threadPri);
    }

    /** {@inheritDoc} */
    @SuppressWarnings("BusyWait")
    @Override protected void body() throws InterruptedException {
        if (log.isDebugEnabled())
            log.debug("Status check sender has been started.");

        // A check becomes due after maxMissedHbs heartbeat periods without an update;
        // the extra 50 ms helps avoid false alarms.
        long checkTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50;

        long lastSent = 0;

        while (!isInterrupted()) {
            // Step 1: work out how long to wait before the next check is due.
            long lastUpd = locNode.lastUpdateTime();

            if (lastUpd > lastSent)
                lastSent = lastUpd;

            long sleepMs = (lastSent + checkTimeout) - U.currentTimeMillis();

            if (sleepMs > 0)
                Thread.sleep(sleepMs);

            // Step 2: stop once the SPI is no longer part of the topology.
            if (spiStateCopy() != CONNECTED) {
                if (log.isDebugEnabled())
                    log.debug("Stopping status check sender (SPI is not connected to topology).");

                return;
            }

            // Step 3: nothing to do if the node was updated meanwhile or there is nobody to check.
            boolean updated = locNode.lastUpdateTime() > lastSent;

            if (updated || !ring.hasRemoteNodes()) {
                if (log.isDebugEnabled())
                    log.debug("Skipping status check send " +
                        "[locNodeLastUpdate=" + U.format(locNode.lastUpdateTime()) +
                        ", hasRmts=" + ring.hasRemoteNodes() + ']');

                continue;
            }

            // Step 4: queue a status check message.
            lastSent = U.currentTimeMillis();

            msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
        }
    }
}
/**
 * Thread that cleans IP finder and keeps it in the correct state, unregistering
 * addresses of the nodes that have left the topology.
 * <p>
 * This thread should run only on coordinator node and will clean IP finder
 * if and only if {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()} is {@code true}.
 */
private class IpFinderCleaner extends IgniteSpiThread {
    /**
     * Constructor.
     */
    private IpFinderCleaner() {
        super(spi.ignite().name(), "tcp-disco-ip-finder-cleaner", log);

        setPriority(spi.threadPri);
    }

    /** {@inheritDoc} */
    @SuppressWarnings("BusyWait")
    @Override protected void body() throws InterruptedException {
        if (log.isDebugEnabled())
            log.debug("IP finder cleaner has been started.");

        while (!isInterrupted()) {
            Thread.sleep(spi.ipFinderCleanFreq);

            if (!isLocalNodeCoordinator())
                continue;

            if (spiStateCopy() != CONNECTED) {
                if (log.isDebugEnabled())
                    log.debug("Stopping IP finder cleaner (SPI is not connected to topology).");

                return;
            }

            if (spi.ipFinder.isShared())
                cleanIpFinder();
        }
    }

    /**
     * Cleans IP finder.
     * <p>
     * Takes one snapshot of the addresses of current non-client ring nodes and one of the addresses
     * registered in the IP finder, then:
     * <ul>
     *     <li>unregisters registered addresses that belong to no current node and do not answer a ping;</li>
     *     <li>re-registers addresses of current nodes that are missing from the IP finder.</li>
     * </ul>
     * Each candidate address is pinged at most once per invocation. {@link IgniteSpiException}s are
     * logged and swallowed.
     */
    private void cleanIpFinder() {
        assert spi.ipFinder.isShared();

        try {
            // Addresses that belong to nodes in topology. Materialized once so the ring and the node
            // address resolution are not re-evaluated on every membership check or iteration.
            Collection<InetSocketAddress> currAddrs = new ArrayList<>();

            for (TcpDiscoveryNode node : ring.allNodes()) {
                if (!node.isClient())
                    currAddrs.addAll(spi.getNodeAddresses(node));
            }

            Set<InetSocketAddress> currAddrsSet = new HashSet<>(currAddrs);

            // Addresses registered in IP finder.
            Collection<InetSocketAddress> regAddrs = spi.registeredAddresses();

            // Remove all addresses that belong to alive nodes, leave dead-node addresses.
            Collection<InetSocketAddress> rmvAddrs = new ArrayList<>();

            // Caches ping results so duplicate registered addresses are pinged only once.
            Map<InetSocketAddress, Boolean> pingResMap = new HashMap<>();

            for (InetSocketAddress addr : regAddrs) {
                if (currAddrsSet.contains(addr))
                    continue;

                Boolean alive = pingResMap.get(addr);

                if (alive == null) {
                    try {
                        alive = pingNode(addr, null).get1() != null;
                    }
                    catch (IgniteCheckedException e) {
                        if (log.isDebugEnabled())
                            log.debug("Failed to ping node [addr=" + addr +
                                ", err=" + e.getMessage() + ']');

                        alive = false;
                    }

                    pingResMap.put(addr, alive);
                }

                if (!alive)
                    rmvAddrs.add(addr);
            }

            // Unregister dead-nodes addresses.
            if (!rmvAddrs.isEmpty()) {
                spi.ipFinder.unregisterAddresses(rmvAddrs);

                if (log.isDebugEnabled())
                    log.debug("Unregistered addresses from IP finder: " + rmvAddrs);
            }

            // Addresses that were removed by mistake (e.g. on segmentation).
            Set<InetSocketAddress> regAddrsSet = new HashSet<>(regAddrs);

            Collection<InetSocketAddress> missingAddrs = new ArrayList<>();

            for (InetSocketAddress addr : currAddrs) {
                if (!regAddrsSet.contains(addr))
                    missingAddrs.add(addr);
            }

            // Re-register missing addresses.
            if (!missingAddrs.isEmpty()) {
                spi.ipFinder.registerAddresses(missingAddrs);

                if (log.isDebugEnabled())
                    log.debug("Registered missing addresses in IP finder: " + missingAddrs);
            }
        }
        catch (IgniteSpiException e) {
            LT.error(log, e, "Failed to clean IP finder up.");
        }
    }
}
/**
 * Discovery messages history used for client reconnect.
 * <p>
 * Retains the most recent {@link #MAX} messages accepted by {@code spi.ensured(msg)}, oldest first, so that
 * a reconnecting client can be sent what it missed.
 */
private class EnsuredMessageHistory {
    /** Maximum number of retained messages; the oldest ones are evicted first. */
    private static final int MAX = 1024;

    /** Retained messages, oldest first. */
    private final ArrayDeque<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);

    /**
     * Appends a message and evicts the oldest entries so that at most {@link #MAX} remain.
     *
     * @param msg Message to add; must satisfy {@code spi.ensured(msg)} (checked by assertion).
     */
    void add(TcpDiscoveryAbstractMessage msg) {
        assert spi.ensured(msg) : msg;

        msgs.addLast(msg);

        while (msgs.size() > MAX)
            msgs.pollFirst();
    }

    /**
     * Gets messages starting from provided ID (exclusive). If such
     * message is not found, {@code null} is returned (this indicates
     * a failure condition when it was already removed from queue).
     * <p>
     * When {@code lastMsgId} is {@code null}, the result instead starts at (and includes) the latest
     * {@link TcpDiscoveryNodeAddedMessage} for the given client, or is {@code null} if there is none.
     * When {@code lastMsgId} is not {@code null} and the history is empty, an empty list is returned.
     * Every returned node-added message is prepared for delivery to the client.
     *
     * @param lastMsgId Last message ID received on client. {@code Null} if client did not finish connect procedure.
     * @param node Client node.
     * @return Collection of messages.
     */
    @Nullable Collection<TcpDiscoveryAbstractMessage> messages(@Nullable IgniteUuid lastMsgId,
        TcpDiscoveryNode node)
    {
        assert node != null && node.isClient() : node;

        if (lastMsgId == null) {
            // Client connection failed before it received TcpDiscoveryNodeAddedMessage.
            List<TcpDiscoveryAbstractMessage> res = null;

            for (TcpDiscoveryAbstractMessage msg : msgs) {
                if (msg instanceof TcpDiscoveryNodeAddedMessage) {
                    // A later node-added message for the same client restarts the collected history.
                    if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id()))
                        res = new ArrayList<>(msgs.size());
                }

                if (res != null)
                    res.add(prepare(msg, node.id()));
            }

            if (log.isDebugEnabled()) {
                if (res == null)
                    log.debug("Failed to find node added message [node=" + node + ']');
                else
                    log.debug("Found node added message [node=" + node + ", hist=" + res + ']');
            }

            return res;
        }
        else {
            if (msgs.isEmpty())
                return Collections.emptyList();

            Collection<TcpDiscoveryAbstractMessage> cp = new ArrayList<>(msgs.size());

            boolean skip = true;

            for (TcpDiscoveryAbstractMessage msg : msgs) {
                if (skip) {
                    if (msg.id().equals(lastMsgId))
                        skip = false;
                }
                else
                    cp.add(prepare(msg, node.id()));
            }

            // Still skipping means lastMsgId was never found (already evicted or unknown).
            cp = !skip ? cp : null;

            if (log.isDebugEnabled()) {
                if (cp == null)
                    log.debug("Failed to find messages history [node=" + node + ", lastMsgId=" + lastMsgId + ']');
                else
                    log.debug("Found messages history [node=" + node + ", hist=" + cp + ']');
            }

            return cp;
        }
    }

    /**
     * Prepares a history message for delivery to a client: node-added messages get join-time data
     * attached via {@code prepareNodeAddedMessage}; other messages are returned unchanged.
     *
     * @param msg Message.
     * @param destNodeId Client node ID.
     * @return Prepared message (the same instance that was passed in).
     */
    private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) {
        if (msg instanceof TcpDiscoveryNodeAddedMessage)
            prepareNodeAddedMessage(msg, destNodeId, null, null);

        return msg;
    }
}
/**
 * Pending messages container.
 * <p>
 * Holds messages that were sent to the next node, plus the ID of the last message known to be
 * discarded ({@code discardId}).
 */
private static class PendingMessages {
    /** Size the queue is trimmed back to on {@link #add}. */
    private static final int MAX = 1024;

    /** Pending messages, oldest first. */
    private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);

    /** Discarded message ID. */
    private IgniteUuid discardId;

    /**
     * Appends a message, then evicts from the head while the queue holds more than {@link #MAX}
     * messages. Eviction stops right after the message whose ID equals {@code discardId} is evicted,
     * so the queue may temporarily exceed {@link #MAX}.
     * <p>
     * NOTE(review): once the discard marker itself has been evicted, later calls trim undiscarded
     * messages too, and {@code discardId} then refers to a message no longer in the queue.
     *
     * @param msg Message to add.
     */
    void add(TcpDiscoveryAbstractMessage msg) {
        msgs.add(msg);

        boolean markerEvicted = false;

        while (!markerEvicted && msgs.size() > MAX) {
            TcpDiscoveryAbstractMessage head = msgs.poll();

            assert head != null;

            markerEvicted = head.id().equals(discardId);
        }
    }

    /**
     * Replaces the queue contents with the given messages and sets the discard marker.
     *
     * @param msgs Messages to keep ({@code null} leaves the queue empty).
     * @param discardId Discarded message ID.
     */
    void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) {
        this.msgs.clear();

        if (msgs != null)
            this.msgs.addAll(msgs);

        this.discardId = discardId;
    }

    /**
     * Drops all pending messages and the discard marker.
     */
    void clear() {
        discardId = null;

        msgs.clear();
    }

    /**
     * Records that the message with the given ID, and every message before it, has been discarded.
     * Only the marker is updated; the queue itself is left untouched.
     *
     * @param id Discarded message ID.
     */
    void discard(IgniteUuid id) {
        discardId = id;
    }
}
/**
* Message worker thread for messages processing.
*/
private class RingMessageWorker extends MessageWorkerAdapter {
/** Next node: the ring node messages are currently forwarded to ({@code null} after a send failure). */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private TcpDiscoveryNode next;
/** Pending messages: sent messages kept so they can be resent after a failure. */
private final PendingMessages pendingMsgs = new PendingMessages();
/** Messages history used for client reconnect (ensured messages only). */
private final EnsuredMessageHistory msgHist = new EnsuredMessageHistory();
/** Last message that updated topology. */
private TcpDiscoveryAbstractMessage lastMsg;
/** Force pending messages send; reset after every send attempt in {@code sendMessageAcrossRing}. */
private boolean forceSndPending;
/** Socket connected to {@code next}, or {@code null} when no connection is established. */
private Socket sock;
/**
 * Creates the worker, passing {@code "tcp-disco-msg-worker"} to the {@code MessageWorkerAdapter}
 * constructor (presumably used as the thread name — confirm against the adapter).
 */
protected RingMessageWorker() {
super("tcp-disco-msg-worker");
}
/**
 * Dispatches a message to its type-specific handler. Afterwards, ensured messages are recorded in the
 * client reconnect history, and processing start/finish are reported to the SPI statistics.
 *
 * @param msg Message to process.
 */
@Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
    boolean dbg = log.isDebugEnabled();

    if (dbg || debugMode) {
        String trace = "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']';

        if (dbg)
            log.debug(trace);

        if (debugMode)
            debugLog(trace);
    }

    spi.stats.onMessageProcessingStarted(msg);

    // Order matters: client heartbeats are matched before regular heartbeats.
    if (msg instanceof TcpDiscoveryJoinRequestMessage)
        processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg);
    else if (msg instanceof TcpDiscoveryClientReconnectMessage)
        processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
    else if (msg instanceof TcpDiscoveryNodeAddedMessage)
        processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
    else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
        processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg);
    else if (msg instanceof TcpDiscoveryNodeLeftMessage)
        processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg);
    else if (msg instanceof TcpDiscoveryNodeFailedMessage)
        processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
    else if (msg instanceof TcpDiscoveryClientHeartbeatMessage)
        processClientHeartbeatMessage((TcpDiscoveryClientHeartbeatMessage)msg);
    else if (msg instanceof TcpDiscoveryHeartbeatMessage)
        processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
    else if (msg instanceof TcpDiscoveryStatusCheckMessage)
        processStatusCheckMessage((TcpDiscoveryStatusCheckMessage)msg);
    else if (msg instanceof TcpDiscoveryDiscardMessage)
        processDiscardMessage((TcpDiscoveryDiscardMessage)msg);
    else if (msg instanceof TcpDiscoveryCustomEventMessage)
        processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
    else if (msg instanceof TcpDiscoveryClientPingRequest)
        processClientPingRequest((TcpDiscoveryClientPingRequest)msg);
    else
        assert false : "Unknown message type: " + msg.getClass().getSimpleName();

    if (spi.ensured(msg))
        msgHist.add(msg);

    spi.stats.onMessageProcessingFinished(msg);
}
/**
 * Sends message across the ring.
 * <p>
 * First the message is passed to every {@code spi.sendMsgLsnrs} listener. If {@code redirectToClients(msg)}
 * holds, an unmarshalled copy is queued to every client message worker (the original is used if
 * marshalling fails). The message is then written to the next ring node, skipping nodes already known as
 * failed. Each of that node's addresses is tried up to {@code spi.reconCnt} times, and a handshake
 * re-establishes the connection when there is no open socket.
 * <p>
 * If this call has found failed nodes not yet recorded globally, or {@code forceSndPending} is set, the
 * not-yet-discarded pending messages are resent before {@code msg}. When the next node cannot be reached,
 * it is added to the local failed list and the following node is tried. A status check message about
 * that very node is dropped instead. If no next node exists but the ring still has remote nodes, the
 * message is re-queued to this worker. Finally, newly detected failed nodes are recorded under
 * {@code mux} and a {@link TcpDiscoveryNodeFailedMessage} is queued for each.
 *
 * @param msg Message to send
 */
@SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", "ContinueStatementWithLabel"})
private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) {
    assert msg != null;

    assert ring.hasRemoteNodes();

    for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs)
        msgLsnr.apply(msg);

    if (redirectToClients(msg)) {
        byte[] marshalledMsg = null;

        for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
            // Send a clone to client to avoid ConcurrentModificationException
            TcpDiscoveryAbstractMessage msgClone;

            try {
                if (marshalledMsg == null)
                    marshalledMsg = spi.marsh.marshal(msg);

                msgClone = spi.marsh.unmarshal(marshalledMsg, null);
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Failed to marshal message: " + msg, e);

                msgClone = msg;
            }

            clientMsgWorker.addMessage(msgClone);
        }
    }

    // Local copy of the failed nodes: nodes that fail during this call are added here first and
    // published to ServerImpl.this.failedNodes only at the end.
    Collection<TcpDiscoveryNode> failedNodes;

    TcpDiscoverySpiState state;

    synchronized (mux) {
        failedNodes = U.arrayList(ServerImpl.this.failedNodes);

        state = spiState;
    }

    Collection<Throwable> errs = null;

    boolean sent = false;

    boolean searchNext = true;

    UUID locNodeId = getLocalNodeId();

    while (true) {
        if (searchNext) {
            TcpDiscoveryNode newNext = ring.nextNode(failedNodes);

            if (newNext == null) {
                if (log.isDebugEnabled())
                    log.debug("No next node in topology.");

                if (debugMode)
                    debugLog("No next node in topology.");

                if (ring.hasRemoteNodes()) {
                    msg.senderNodeId(locNodeId);

                    addMessage(msg);
                }

                break;
            }

            if (!newNext.equals(next)) {
                if (log.isDebugEnabled())
                    log.debug("New next node [newNext=" + newNext + ", formerNext=" + next +
                        ", ring=" + ring + ", failedNodes=" + failedNodes + ']');

                if (debugMode)
                    debugLog("New next node [newNext=" + newNext + ", formerNext=" + next +
                        ", ring=" + ring + ", failedNodes=" + failedNodes + ']');

                // The connection belongs to the former next node.
                U.closeQuiet(sock);

                sock = null;

                next = newNext;
            }
            else if (log.isDebugEnabled())
                log.debug("Next node remains the same [nextId=" + next.id() +
                    ", nextOrder=" + next.internalOrder() + ']');
        }

        // Flag that shows whether next node exists and accepts incoming connections.
        boolean nextNodeExists = sock != null;

        final boolean sameHost = U.sameMacs(locNode, next);

        List<InetSocketAddress> locNodeAddrs = U.arrayList(locNode.socketAddresses());

        addr: for (InetSocketAddress addr : spi.getNodeAddresses(next, sameHost)) {
            long ackTimeout0 = spi.ackTimeout;

            if (locNodeAddrs.contains(addr)) {
                if (log.isDebugEnabled())
                    log.debug("Skip to send message to the local node (probably remote node has the same " +
                        "loopback address that local node): " + addr);

                continue;
            }

            for (int i = 0; i < spi.reconCnt; i++) {
                if (sock == null) {
                    nextNodeExists = false;

                    boolean success = false;

                    boolean openSock = false;

                    // Restore ring.
                    try {
                        long tstamp = U.currentTimeMillis();

                        sock = spi.openSocket(addr);

                        openSock = true;

                        // Handshake.
                        writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));

                        TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);

                        if (locNodeId.equals(res.creatorNodeId())) {
                            if (log.isDebugEnabled())
                                log.debug("Handshake response from local node: " + res);

                            U.closeQuiet(sock);

                            sock = null;

                            break;
                        }

                        spi.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);

                        UUID nextId = res.creatorNodeId();

                        long nextOrder = res.order();

                        if (!next.id().equals(nextId)) {
                            // Node with different ID has bounded to the same port.
                            if (log.isDebugEnabled())
                                log.debug("Failed to restore ring because next node ID received is not as " +
                                    "expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']');

                            if (debugMode)
                                debugLog("Failed to restore ring because next node ID received is not as " +
                                    "expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']');

                            break;
                        }
                        else {
                            // ID is as expected. Check node order.
                            if (nextOrder != next.internalOrder()) {
                                // Is next currently being added?
                                boolean nextNew = (msg instanceof TcpDiscoveryNodeAddedMessage &&
                                    ((TcpDiscoveryNodeAddedMessage)msg).node().id().equals(nextId));

                                if (!nextNew)
                                    nextNew = hasPendingAddMessage(nextId);

                                if (!nextNew) {
                                    if (log.isDebugEnabled())
                                        log.debug("Failed to restore ring because next node order received " +
                                            "is not as expected [expected=" + next.internalOrder() +
                                            ", rcvd=" + nextOrder + ", id=" + next.id() + ']');

                                    if (debugMode)
                                        debugLog("Failed to restore ring because next node order received " +
                                            "is not as expected [expected=" + next.internalOrder() +
                                            ", rcvd=" + nextOrder + ", id=" + next.id() + ']');

                                    break;
                                }
                            }

                            if (log.isDebugEnabled())
                                log.debug("Initialized connection with next node: " + next.id());

                            if (debugMode)
                                debugLog("Initialized connection with next node: " + next.id());

                            errs = null;

                            success = true;

                            next.lastSuccessfulAddress(addr);
                        }
                    }
                    catch (IOException | IgniteCheckedException e) {
                        if (errs == null)
                            errs = new ArrayList<>();

                        errs.add(e);

                        if (log.isDebugEnabled())
                            U.error(log, "Failed to connect to next node [msg=" + msg
                                + ", err=" + e.getMessage() + ']', e);

                        onException("Failed to connect to next node [msg=" + msg + ", err=" + e + ']', e);

                        if (!openSock)
                            break; // Don't retry if we can not establish connection.

                        if (e instanceof SocketTimeoutException ||
                            X.hasCause(e, SocketTimeoutException.class)) {
                            // Give a slow peer more time on the next attempt, within the allowed bound.
                            ackTimeout0 *= 2;

                            if (!checkAckTimeout(ackTimeout0))
                                break;
                        }

                        continue;
                    }
                    finally {
                        if (!success) {
                            U.closeQuiet(sock);

                            sock = null;
                        }
                        else
                            // Next node exists and accepts incoming messages.
                            nextNodeExists = true;
                    }
                }

                try {
                    boolean failure;

                    synchronized (mux) {
                        // More locally known failures than globally recorded ones means this call has
                        // detected new failures, so messages in flight may have been lost.
                        failure = ServerImpl.this.failedNodes.size() < failedNodes.size();
                    }

                    assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage;

                    if (failure || forceSndPending) {
                        if (log.isDebugEnabled())
                            log.debug("Pending messages will be sent [failure=" + failure +
                                ", forceSndPending=" + forceSndPending + ']');

                        if (debugMode)
                            debugLog("Pending messages will be sent [failure=" + failure +
                                ", forceSndPending=" + forceSndPending + ']');

                        // Messages up to and including the discard marker were already processed by the ring.
                        boolean skip = pendingMsgs.discardId != null;

                        for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
                            if (skip) {
                                if (pendingMsg.id().equals(pendingMsgs.discardId))
                                    skip = false;

                                continue;
                            }

                            long tstamp = U.currentTimeMillis();

                            prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
                                pendingMsgs.discardId);

                            try {
                                writeToSocket(sock, pendingMsg);
                            }
                            finally {
                                clearNodeAddedMessage(pendingMsg);
                            }

                            spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp);

                            int res = spi.readReceipt(sock, ackTimeout0);

                            if (log.isDebugEnabled())
                                log.debug("Pending message has been sent to next node [msg=" + msg.id() +
                                    ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
                                    ", res=" + res + ']');

                            if (debugMode)
                                debugLog("Pending message has been sent to next node [msg=" + msg.id() +
                                    ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
                                    ", res=" + res + ']');
                        }
                    }

                    prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);

                    try {
                        long tstamp = U.currentTimeMillis();

                        writeToSocket(sock, msg);

                        spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);

                        int res = spi.readReceipt(sock, ackTimeout0);

                        if (log.isDebugEnabled())
                            log.debug("Message has been sent to next node [msg=" + msg +
                                ", next=" + next.id() +
                                ", res=" + res + ']');

                        if (debugMode)
                            debugLog("Message has been sent to next node [msg=" + msg +
                                ", next=" + next.id() +
                                ", res=" + res + ']');
                    }
                    finally {
                        clearNodeAddedMessage(msg);
                    }

                    registerPendingMessage(msg);

                    sent = true;

                    break addr;
                }
                catch (IOException | IgniteCheckedException e) {
                    if (errs == null)
                        errs = new ArrayList<>();

                    errs.add(e);

                    if (log.isDebugEnabled())
                        U.error(log, "Failed to send message to next node [next=" + next.id() + ", msg=" + msg +
                            ", err=" + e + ']', e);

                    onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
                        e);

                    if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
                        ackTimeout0 *= 2;

                        if (!checkAckTimeout(ackTimeout0))
                            break;
                    }
                }
                finally {
                    forceSndPending = false;

                    if (!sent) {
                        U.closeQuiet(sock);

                        sock = null;

                        if (log.isDebugEnabled())
                            log.debug("Message has not been sent [next=" + next.id() + ", msg=" + msg +
                                ", i=" + i + ']');
                    }
                }
            } // Try to reconnect.
        } // Iterating node's addresses.

        if (!sent) {
            if (!failedNodes.contains(next)) {
                failedNodes.add(next);

                if (state == CONNECTED) {
                    Exception err = errs != null ?
                        U.exceptionWithSuppressed("Failed to send message to next node [msg=" + msg +
                            ", next=" + U.toShortString(next) + ']', errs) :
                        null;

                    // If node existed on connection initialization we should check
                    // whether it has not gone yet.
                    if (nextNodeExists && pingNode(next))
                        U.error(log, "Failed to send message to next node [msg=" + msg +
                            ", next=" + next + ']', err);
                    else if (log.isDebugEnabled())
                        log.debug("Failed to send message to next node [msg=" + msg + ", next=" + next +
                            ", errMsg=" + (err != null ? err.getMessage() : "N/A") + ']');
                }
            }

            if (msg instanceof TcpDiscoveryStatusCheckMessage) {
                TcpDiscoveryStatusCheckMessage msg0 = (TcpDiscoveryStatusCheckMessage)msg;

                if (next.id().equals(msg0.failedNodeId())) {
                    // Log before clearing 'next'; otherwise the message would always report next=null.
                    if (log.isDebugEnabled())
                        log.debug("Discarding status check since next node has indeed failed [next=" + next +
                            ", msg=" + msg + ']');

                    next = null;

                    // Discard status check message by exiting loop and handle failure.
                    break;
                }
            }

            next = null;

            searchNext = true;

            errs = null;
        }
        else
            break;
    }

    synchronized (mux) {
        // Keep only the failures detected during this call.
        failedNodes.removeAll(ServerImpl.this.failedNodes);
    }

    if (!failedNodes.isEmpty()) {
        if (state == CONNECTED) {
            if (!sent && log.isDebugEnabled())
                // Message has not been sent due to some problems.
                log.debug("Message has not been sent: " + msg);

            if (log.isDebugEnabled())
                log.debug("Detected failed nodes: " + failedNodes);
        }

        synchronized (mux) {
            ServerImpl.this.failedNodes.addAll(failedNodes);
        }

        for (TcpDiscoveryNode n : failedNodes)
            msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, n.id(), n.internalOrder()));

        LT.warn(log, null, "Local node has detected failed nodes and started cluster-wide procedure. " +
                "To speed up failure detection please see 'Failure Detection' section under javadoc" +
                " for 'TcpDiscoverySpi'");
    }
}
/**
 * Decides whether a message must also be delivered to client nodes: it has to be verified and its class
 * has to carry the {@link TcpDiscoveryRedirectToClient} annotation.
 *
 * @param msg Message.
 * @return Whether to redirect message to client nodes.
 */
private boolean redirectToClients(TcpDiscoveryAbstractMessage msg) {
    if (!msg.verified())
        return false;

    return U.getAnnotation(msg.getClass(), TcpDiscoveryRedirectToClient.class) != null;
}
/**
 * Adds a message to the pending queue and updates statistics, but only if {@code spi.ensured(msg)}
 * holds; other messages are ignored.
 *
 * @param msg Message to register.
 */
private void registerPendingMessage(TcpDiscoveryAbstractMessage msg) {
    assert msg != null;

    if (!spi.ensured(msg))
        return;

    pendingMsgs.add(msg);

    spi.stats.onPendingMessageRegistered();

    if (log.isDebugEnabled())
        log.debug("Pending message has been registered: " + msg.id());
}
/**
 * Checks whether pending messages queue contains unprocessed {@link TcpDiscoveryNodeAddedMessage} for
 * the node with {@code nodeId}.
 * <p>
 * A node-added message counts as unprocessed when its ID compares greater than {@code pendingMsgs.discardId}
 * (via {@code IgniteUuid.compareTo}; NOTE(review): relies on {@code compareTo(null)} being well-defined
 * while no message has been discarded yet — confirm).
 *
 * @param nodeId Node ID.
 * @return {@code true} if contains, {@code false} otherwise.
 */
private boolean hasPendingAddMessage(UUID nodeId) {
    for (TcpDiscoveryAbstractMessage pending : pendingMsgs.msgs) {
        if (!(pending instanceof TcpDiscoveryNodeAddedMessage))
            continue;

        TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)pending;

        boolean sameNode = addedMsg.node().id().equals(nodeId);

        if (sameNode && addedMsg.id().compareTo(pendingMsgs.discardId) > 0)
            return true;
    }

    return false;
}
/**
* Processes join request message.
*
* @param msg Join request message.
*/
private void processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg) {
assert msg != null;
TcpDiscoveryNode node = msg.node();
UUID locNodeId = getLocalNodeId();
if (!msg.client()) {
boolean rmtHostLoopback = node.socketAddresses().size() == 1 &&
node.socketAddresses().iterator().next().getAddress().isLoopbackAddress();
// This check is performed by the node joining node is connected to, but not by coordinator
// because loopback problem message is sent directly to the joining node which may be unavailable
// if coordinator resides on another host.
if (spi.locHost.isLoopbackAddress() != rmtHostLoopback) {
String firstNode = rmtHostLoopback ? "remote" : "local";
String secondNode = rmtHostLoopback ? "local" : "remote";
String errMsg = "Failed to add node to topology because " + firstNode +
" node is configured to use loopback address, but " + secondNode + " node is not " +
"(consider changing 'localAddress' configuration parameter) " +
"[locNodeAddrs=" + U.addressesAsString(locNode) +
", rmtNodeAddrs=" + U.addressesAsString(node) + ']';
LT.warn(log, null, errMsg);
// Always output in debug.
if (log.isDebugEnabled())
log.debug(errMsg);
try {
trySendMessageDirectly(node, new TcpDiscoveryLoopbackProblemMessage(
locNodeId, locNode.addresses(), locNode.hostNames()));
}
catch (IgniteSpiException e) {
if (log.isDebugEnabled())
log.debug("Failed to send loopback problem message to node " +
"[node=" + node + ", err=" + e.getMessage() + ']');
onException("Failed to send loopback problem message to node " +
"[node=" + node + ", err=" + e.getMessage() + ']', e);
}
// Ignore join request.
return;
}
}
if (isLocalNodeCoordinator()) {
TcpDiscoveryNode existingNode = ring.node(node.id());
if (existingNode != null) {
if (!node.socketAddresses().equals(existingNode.socketAddresses())) {
if (!pingNode(existingNode)) {
addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId,
existingNode.id(), existingNode.internalOrder()));
// Ignore this join request since existing node is about to fail
// and new node can continue.
return;
}
try {
trySendMessageDirectly(node, new TcpDiscoveryDuplicateIdMessage(locNodeId,
existingNode));
}
catch (IgniteSpiException e) {
if (log.isDebugEnabled())
log.debug("Failed to send duplicate ID message to node " +
"[node=" + node + ", existingNode=" + existingNode +
", err=" + e.getMessage() + ']');
onException("Failed to send duplicate ID message to node " +
"[node=" + node + ", existingNode=" + existingNode + ']', e);
}
// Output warning.
LT.warn(log, null, "Ignoring join request from node (duplicate ID) [node=" + node +
", existingNode=" + existingNode + ']');
// Ignore join request.
return;
}
if (msg.client()) {
TcpDiscoveryClientReconnectMessage reconMsg = new TcpDiscoveryClientReconnectMessage(node.id(),
node.clientRouterNodeId(),
null);
reconMsg.verify(getLocalNodeId());
Collection<TcpDiscoveryAbstractMessage> msgs = msgHist.messages(null, node);
if (msgs != null) {
reconMsg.pendingMessages(msgs);
reconMsg.success(true);
}
if (log.isDebugEnabled())
log.debug("Send reconnect message to already joined client " +
"[clientNode=" + existingNode + ", msg=" + reconMsg + ']');
if (getLocalNodeId().equals(node.clientRouterNodeId())) {
ClientMessageWorker wrk = clientMsgWorkers.get(node.id());
if (wrk != null)
wrk.addMessage(reconMsg);
else if (log.isDebugEnabled())
log.debug("Failed to find client message worker " +
"[clientNode=" + existingNode + ", msg=" + reconMsg + ']');
}
else {
if (ring.hasRemoteNodes())
sendMessageAcrossRing(reconMsg);
}
}
else if (log.isDebugEnabled())
log.debug("Ignoring join request message since node is already in topology: " + msg);
return;
}
if (spi.nodeAuth != null) {
// Authenticate node first.
try {
SecurityCredentials cred = unmarshalCredentials(node);
SecurityContext subj = spi.nodeAuth.authenticateNode(node, cred);
if (subj == null) {
// Node has not pass authentication.
LT.warn(log, null,
"Authentication failed [nodeId=" + node.id() +
", addrs=" + U.addressesAsString(node) + ']',
"Authentication failed [nodeId=" + U.id8(node.id()) + ", addrs=" +
U.addressesAsString(node) + ']');
// Always output in debug.
if (log.isDebugEnabled())
log.debug("Authentication failed [nodeId=" + node.id() + ", addrs=" +
U.addressesAsString(node));
try {
trySendMessageDirectly(node, new TcpDiscoveryAuthFailedMessage(locNodeId,
spi.locHost));
}
catch (IgniteSpiException e) {
if (log.isDebugEnabled())
log.debug("Failed to send unauthenticated message to node " +
"[node=" + node + ", err=" + e.getMessage() + ']');
onException("Failed to send unauthenticated message to node " +
"[node=" + node + ", err=" + e.getMessage() + ']', e);
}
// Ignore join request.
return;
}
else {
if (!(subj instanceof Serializable)) {
// Node has not pass authentication.
LT.warn(log, null,
"Authentication subject is not Serializable [nodeId=" + node.id() +
", addrs=" + U.addressesAsString(node) + ']',
"Authentication subject is not Serializable [nodeId=" + U.id8(node.id()) +
", addrs=" +
U.addressesAsString(node) + ']');
// Always output in debug.
if (log.isDebugEnabled())
log.debug("Authentication subject is not serializable [nodeId=" + node.id() +
", addrs=" + U.addressesAsString(node));
try {
trySendMessageDirectly(node, new TcpDiscoveryAuthFailedMessage(locNodeId,
spi.locHost));
}
catch (IgniteSpiException e) {
if (log.isDebugEnabled())
log.debug("Failed to send unauthenticated message to node " +
"[node=" + node + ", err=" + e.getMessage() + ']');
}
// Ignore join request.
return;
}
// Stick in authentication subject to node (use security-safe attributes for copy).
Map<String, Object> attrs = new HashMap<>(node.getAttributes());
attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, spi.marsh.marshal(subj));
node.setAttributes(attrs);
}
}
catch (IgniteException | IgniteCheckedException e) {
LT.error(log, e, "Authentication failed [nodeId=" + node.id() + ", addrs=" +
U.addressesAsString(node) + ']');
if (log.isDebugEnabled())
log.debug("Failed to authenticate node (will ignore join request) [node=" + node +
", err=" + e + ']');
onException("Failed to authenticate node (will ignore join request) [node=" + node +
", err=" + e + ']', e);
// Ignore join request.
return;
}
}
IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node);
if (err != null) {
boolean ping = node.id().equals(err.nodeId()) ? pingNode(node) : pingNode(err.nodeId());
if (!ping) {
if (log.isDebugEnabled())
log.debug("Conflicting node has already left, need to wait for event. " +
"Will ignore join request for now since it will be recent [req=" + msg +
", err=" + err.message() + ']');
// Ignore join request.
return;
}
LT.warn(log, null, err.message());
// Always output in debug.
if (log.isDebugEnabled())
log.debug(err.message());
try {
trySendMessageDirectly(node,
new TcpDiscoveryCheckFailedMessage(locNodeId, err.sendMessage()));
}
catch (IgniteSpiException e) {
if (log.isDebugEnabled())
log.debug("Failed to send hash ID resolver validation failed message to node " +
"[node=" + node + ", err=" + e.getMessage() + ']');
onException("Failed to send hash ID resolver validation failed message to node " +
"[node=" + node + ", err=" + e.getMessage() + ']', e);
}
// Ignore join request.
return;
}
String locMarsh = locNode.attribute(ATTR_MARSHALLER);
String rmtMarsh = node.attribute(ATTR_MARSHALLER);
if (!F.eq(locMarsh, rmtMarsh)) {
String errMsg = "Local node's marshaller differs from remote node's marshaller " +
"(to make sure all nodes in topology have identical marshaller, " +
"configure marshaller explicitly in configuration) " +
"[locMarshaller=" + locMarsh + ", rmtMarshaller=" + rmtMarsh +
", locNodeAddrs=" + U.addressesAsString(locNode) +
", rmtNodeAddrs=" + U.addressesAsString(node) +
", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';
LT.warn(log, null, errMsg);
// Always output in debug.
if (log.isDebugEnabled())
log.debug(errMsg);
try {
String sndMsg = "Local node's marshaller differs from remote node's marshaller " +
"(to make sure all nodes in topology have identical marshaller, " +
"configure marshaller explicitly in configuration) " +
"[locMarshaller=" + rmtMarsh + ", rmtMarshaller=" + locMarsh +
", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
", rmtNodeId=" + locNode.id() + ']';
trySendMessageDirectly(node,
new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg));
}
catch (IgniteSpiException e) {
if (log.isDebugEnabled())
log.debug("Failed to send marshaller check failed message to node " +
"[node=" + node + ", err=" + e.getMessage() + ']');
onException("Failed to send marshaller check failed message to node " +
"[node=" + node + ", err=" + e.getMessage() + ']', e);
}
// Ignore join request.
return;
}
// Handle join.
node.internalOrder(ring.nextNodeOrder());
if (log.isDebugEnabled())
log.debug("Internal order has been assigned to node: " + node);
TcpDiscoveryNodeAddedMessage nodeAddedMsg = new TcpDiscoveryNodeAddedMessage(locNodeId,
node, msg.discoveryData(), spi.gridStartTime);
nodeAddedMsg.client(msg.client());
processNodeAddedMessage(nodeAddedMsg);
}
else if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
}
/**
 * Tries to send a message to all node's available addresses.
 * <p>
 * Client nodes are never contacted directly: the message is routed through the client's
 * router node. If the local node is that router, the message is verified and handed to the
 * local client message worker instead of being sent over the network.
 *
 * @param node Node to send message to.
 * @param msg Message.
 * @throws IgniteSpiException Last failure if all attempts failed, or if the message cannot be
 *      routed to a client node (unknown router, router is a client, or no local client worker).
 */
private void trySendMessageDirectly(TcpDiscoveryNode node, TcpDiscoveryAbstractMessage msg)
    throws IgniteSpiException {
    if (node.isClient()) {
        TcpDiscoveryNode routerNode = ring.node(node.clientRouterNodeId());

        if (routerNode == null)
            throw new IgniteSpiException("Router node for client does not exist: " + node);

        if (routerNode.isClient())
            throw new IgniteSpiException("Router node is a client node: " + node);

        if (routerNode.id().equals(getLocalNodeId())) {
            ClientMessageWorker worker = clientMsgWorkers.get(node.id());

            // The client may have disconnected concurrently. Report this the same way as the
            // other routing failures, which callers catch, instead of throwing an NPE.
            if (worker == null)
                throw new IgniteSpiException("Client message worker not found for client node: " + node);

            msg.verify(getLocalNodeId()); // Client worker require verified messages.

            worker.addMessage(msg);

            return;
        }

        // The router is a server node (checked above), so this recursive call
        // goes straight to the network branch below.
        trySendMessageDirectly(routerNode, msg);

        return;
    }

    IgniteSpiException ex = null;

    for (InetSocketAddress addr : spi.getNodeAddresses(node, U.sameMacs(locNode, node))) {
        try {
            sendMessageDirectly(msg, addr);

            node.lastSuccessfulAddress(addr);

            // A success cancels failures recorded for earlier addresses.
            ex = null;

            break;
        }
        catch (IgniteSpiException e) {
            ex = e;
        }
    }

    if (ex != null)
        throw ex;
}
/**
 * Handles a client reconnect request travelling around the ring.
 * <p>
 * The router node verifies the request first and, when other nodes exist, passes it on.
 * Every node that knows the client updates its router and alive counter. The coordinator
 * also attaches the pending messages the client missed, or fails the client when they
 * cannot be restored. Finally the router delivers the result to the client worker, while
 * any other node forwards the message along the ring.
 *
 * @param msg Client reconnect message.
 */
private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
    final UUID selfId = getLocalNodeId();

    final boolean routedHere = selfId.equals(msg.routerNodeId());

    if (!msg.verified()) {
        // Only the router node ever handles the message before it is verified.
        assert routedHere;

        msg.verify(selfId);

        if (ring.hasRemoteNodes()) {
            sendMessageAcrossRing(msg);

            return;
        }
    }

    final UUID clientId = msg.creatorNodeId();

    final TcpDiscoveryNode client = ring.node(clientId);

    if (client == null) {
        if (log.isDebugEnabled())
            log.debug("Reconnecting client node is already failed [nodeId=" + clientId + ']');
    }
    else {
        assert client.isClient();

        client.clientRouterNodeId(msg.routerNodeId());
        client.aliveCheck(spi.maxMissedClientHbs);

        if (isLocalNodeCoordinator()) {
            Collection<TcpDiscoveryAbstractMessage> missed = msgHist.messages(msg.lastMessageId(), client);

            if (missed == null) {
                if (log.isDebugEnabled())
                    log.debug("Failing reconnecting client node because failed to restore pending " +
                        "messages [locNodeId=" + selfId + ", clientNodeId=" + clientId + ']');

                processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(selfId,
                    client.id(), client.internalOrder()));
            }
            else {
                msg.pendingMessages(missed);
                msg.success(true);

                if (log.isDebugEnabled())
                    log.debug("Accept client reconnect, restored pending messages " +
                        "[locNodeId=" + selfId + ", clientNodeId=" + clientId + ']');
            }
        }
    }

    if (!routedHere) {
        if (ring.hasRemoteNodes())
            sendMessageAcrossRing(msg);

        return;
    }

    ClientMessageWorker clientWrk = clientMsgWorkers.get(clientId);

    if (clientWrk != null)
        clientWrk.addMessage(msg);
    else if (log.isDebugEnabled())
        log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
            selfId + ", clientNodeId=" + clientId + ']');
}
/**
* Processes node added message.
* <p>
* On the coordinator, a message that is not yet verified gets verified, is applied locally and
* is sent across the ring. Once an already verified message reaches the coordinator again, it
* is turned into a {@link TcpDiscoveryNodeAddFinishedMessage} and a discard message is queued.
* Other nodes add the new node to their local ring (optionally re-checking authentication).
* The joining node itself restores the topology carried by the message.
*
* @param msg Node added message.
* @deprecated Due to current protocol node add process cannot be dropped in the middle of the ring,
* if new node auth fails due to config inconsistency. So, we need to finish add
* and only then initiate failure.
*/
@Deprecated
private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
assert msg != null;
TcpDiscoveryNode node = msg.node();
assert node != null;
// Drop the message if the new node's internal order is lower than the local node's.
if (node.internalOrder() < locNode.internalOrder()) {
if (log.isDebugEnabled())
log.debug("Discarding node added message since local node's order is greater " +
"[node=" + node + ", locNode=" + locNode + ", msg=" + msg + ']');
return;
}
UUID locNodeId = getLocalNodeId();
if (isLocalNodeCoordinator()) {
// An already verified message has come back to the coordinator:
// finish the add and queue a discard for this message.
if (msg.verified()) {
spi.stats.onRingMessageReceived(msg);
TcpDiscoveryNodeAddFinishedMessage addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(locNodeId,
node.id());
// Client nodes get discovery data and their attributes through the finish message.
if (node.isClient()) {
addFinishMsg.clientDiscoData(msg.oldNodesDiscoveryData());
addFinishMsg.clientNodeAttributes(node.attributes());
}
processNodeAddFinishedMessage(addFinishMsg);
addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
return;
}
// First visit on the coordinator: verify, then fall through to add locally and forward.
msg.verify(locNodeId);
}
else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) {
// Local node already has node from message in local topology.
// Just pass it to coordinator via the ring.
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
if (log.isDebugEnabled())
log.debug("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " +
"coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode="
+ locNode + ", msg=" + msg + ']');
if (debugMode)
debugLog("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " +
"coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode="
+ locNode + ", msg=" + msg + ']');
return;
}
// Verified message on any node other than the joining node: add the new node to the local ring.
if (msg.verified() && !locNodeId.equals(node.id())) {
// The new node must have a greater internal order than every node already in the ring.
if (node.internalOrder() <= ring.maxInternalOrder()) {
if (log.isDebugEnabled())
log.debug("Discarding node added message since new node's order is less than " +
"max order in ring [ring=" + ring + ", node=" + node + ", locNode=" + locNode +
", msg=" + msg + ']');
if (debugMode)
debugLog("Discarding node added message since new node's order is less than " +
"max order in ring [ring=" + ring + ", node=" + node + ", locNode=" + locNode +
", msg=" + msg + ']');
return;
}
// With global authentication, non-coordinator nodes authenticate the new node again and
// compare the resulting permissions with the subject the coordinator attached to the node.
if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) {
boolean authFailed = true;
try {
SecurityCredentials cred = unmarshalCredentials(node);
if (cred == null) {
if (log.isDebugEnabled())
log.debug(
"Skipping global authentication for node (security credentials not found, " +
"probably, due to coordinator has older version) " +
"[nodeId=" + node.id() +
", addrs=" + U.addressesAsString(node) +
", coord=" + ring.coordinator() + ']');
authFailed = false;
}
else {
SecurityContext subj = spi.nodeAuth.authenticateNode(node, cred);
// NOTE(review): if ATTR_SECURITY_SUBJECT is absent, coordSubj (or subj) may be null and the
// NullPointerException below is not caught by the multi-catch; confirm the attribute is always set.
SecurityContext coordSubj = spi.marsh.unmarshal(
node.<byte[]>attribute(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT),
U.gridClassLoader());
if (!permissionsEqual(coordSubj.subject().permissions(), subj.subject().permissions())) {
// Node has not pass authentication.
LT.warn(log, null,
"Authentication failed [nodeId=" + node.id() +
", addrs=" + U.addressesAsString(node) + ']',
"Authentication failed [nodeId=" + U.id8(node.id()) + ", addrs=" +
U.addressesAsString(node) + ']');
// Always output in debug.
if (log.isDebugEnabled())
log.debug("Authentication failed [nodeId=" + node.id() + ", addrs=" +
U.addressesAsString(node));
}
else
// Node will not be kicked out.
authFailed = false;
}
}
catch (IgniteException | IgniteCheckedException e) {
U.error(log, "Failed to verify node permissions consistency (will drop the node): " + node, e);
}
finally {
// On failure, tell the new node and queue a node failed message. The add itself still
// continues below (see the @deprecated note on this method).
if (authFailed) {
try {
trySendMessageDirectly(node, new TcpDiscoveryAuthFailedMessage(locNodeId,
spi.locHost));
}
catch (IgniteSpiException e) {
if (log.isDebugEnabled())
log.debug("Failed to send unauthenticated message to node " +
"[node=" + node + ", err=" + e.getMessage() + ']');
onException("Failed to send unauthenticated message to node " +
"[node=" + node + ", err=" + e.getMessage() + ']', e);
}
addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, node.id(),
node.internalOrder()));
}
}
}
// Client nodes start with a full budget of allowed missed heartbeats.
if (msg.client())
node.aliveCheck(spi.maxMissedClientHbs);
boolean topChanged = ring.add(node);
if (topChanged) {
assert !node.visible() : "Added visible node [node=" + node + ", locNode=" + locNode + ']';
// Apply the new node's discovery data locally and attach this node's data to the message.
Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
if (data != null)
spi.onExchange(node.id(), node.id(), data, U.gridClassLoader());
msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id()));
}
if (log.isDebugEnabled())
log.debug("Added node to local ring [added=" + topChanged + ", node=" + node +
", ring=" + ring + ']');
}
// The joining node receives its own verified message: initialize the local topology from it.
if (msg.verified() && locNodeId.equals(node.id())) {
// Discovery data.
Map<UUID, Map<Integer, byte[]>> dataMap;
synchronized (mux) {
// Applied only while CONNECTING and while the local internal order differs from the assigned one.
// NOTE(review): presumably restoreTopology() sets the local internal order, turning a repeat
// delivery into the "already processed" branch below — confirm.
if (spiState == CONNECTING && locNode.internalOrder() != node.internalOrder()) {
// Initialize topology.
Collection<TcpDiscoveryNode> top = msg.topology();
if (top != null && !top.isEmpty()) {
spi.gridStartTime = msg.gridStartTime();
for (TcpDiscoveryNode n : top) {
// Make all preceding nodes and local node visible.
n.visible(true);
}
locNode.setAttributes(node.attributes());
locNode.visible(true);
// Restore topology with all nodes visible.
ring.restoreTopology(top, node.internalOrder());
if (log.isDebugEnabled())
log.debug("Restored topology from node added message: " + ring);
dataMap = msg.oldNodesDiscoveryData();
topHist.clear();
topHist.putAll(msg.topologyHistory());
pendingMsgs.discard(msg.discardedMessageId());
// Clear data to minimize message size.
msg.messages(null, null);
msg.topology(null);
msg.topologyHistory(null);
msg.clearDiscoveryData();
}
else {
if (log.isDebugEnabled())
log.debug("Discarding node added message with empty topology: " + msg);
return;
}
}
else {
if (log.isDebugEnabled())
log.debug("Discarding node added message (this message has already been processed) " +
"[spiState=" + spiState +
", msg=" + msg +
", locNode=" + locNode + ']');
return;
}
}
// Notify outside of synchronized block.
if (dataMap != null) {
for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
spi.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader());
}
}
// Pass the message on to the next node in the ring.
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
}
/**
* Processes node add finished message.
* <p>
* The coordinator assigns the new topology version and verifies the message. Every node that
* sees the verified message assigns the node order (if not yet set) and makes the node visible.
* Connected remote nodes fire {@code EVT_NODE_JOINED}. The joining node itself switches from
* {@code CONNECTING} to {@code CONNECTED}.
*
* @param msg Node add finished message.
*/
private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
assert msg != null;
UUID nodeId = msg.nodeId();
assert nodeId != null;
TcpDiscoveryNode node = ring.node(nodeId);
if (node == null) {
if (log.isDebugEnabled())
log.debug("Discarding node add finished message since node is not found " +
"[msg=" + msg + ']');
return;
}
if (log.isDebugEnabled())
log.debug("Node to finish add: " + node);
boolean locNodeCoord = isLocalNodeCoordinator();
UUID locNodeId = getLocalNodeId();
if (locNodeCoord) {
// An already verified message has come back to the coordinator: queue a discard and stop.
if (msg.verified()) {
spi.stats.onRingMessageReceived(msg);
addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
return;
}
// A visible node with an assigned order has already been added.
if (node.visible() && node.order() != 0) {
if (log.isDebugEnabled())
log.debug("Discarding node add finished message since node has already been added " +
"[node=" + node + ", msg=" + msg + ']');
return;
}
else
msg.topologyVersion(ring.incrementTopologyVersion());
msg.verify(locNodeId);
}
long topVer = msg.topologyVersion();
// Becomes true only when this call makes the node visible for the first time.
boolean fireEvt = false;
if (msg.verified()) {
assert topVer > 0 : "Invalid topology version: " + msg;
if (node.order() == 0)
node.order(topVer);
if (!node.visible()) {
node.visible(true);
fireEvt = true;
}
}
// A remote node joined and the local node is already connected: update version and fire the event.
if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) {
spi.stats.onNodeJoined();
// Make sure that node with greater order will never get EVT_NODE_JOINED
// on node with less order.
assert node.internalOrder() > locNode.internalOrder() : "Invalid order [node=" + node +
", locNode=" + locNode + ", msg=" + msg + ", ring=" + ring + ']';
// Replaces an equal version object with the local instance.
// NOTE(review): presumably done to share a single instance — confirm.
if (spi.locNodeVer.equals(node.version()))
node.version(spi.locNodeVer);
// On the coordinator the version was already incremented above.
if (!locNodeCoord) {
boolean b = ring.topologyVersion(topVer);
assert b : "Topology version has not been updated: [ring=" + ring + ", msg=" + msg +
", lastMsg=" + lastMsg + ", spiState=" + spiStateCopy() + ']';
if (log.isDebugEnabled())
log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
lastMsg = msg;
}
notifyDiscovery(EVT_NODE_JOINED, topVer, node);
// Only the coordinator registers the new node's addresses in a shared IP finder.
try {
if (spi.ipFinder.isShared() && locNodeCoord)
spi.ipFinder.registerAddresses(node.socketAddresses());
}
catch (IgniteSpiException e) {
if (log.isDebugEnabled())
log.debug("Failed to register new node address [node=" + node +
", err=" + e.getMessage() + ']');
onException("Failed to register new node address [node=" + node +
", err=" + e.getMessage() + ']', e);
}
}
// The joining node received its own finish message: complete the join and wake up waiters on mux.
if (msg.verified() && locNodeId.equals(nodeId) && spiStateCopy() == CONNECTING) {
assert node != null;
assert topVer > 0 : "Invalid topology version: " + msg;
ring.topologyVersion(topVer);
node.order(topVer);
synchronized (mux) {
spiState = CONNECTED;
mux.notifyAll();
}
// Discovery manager must create local joined event before spiStart completes.
notifyDiscovery(EVT_NODE_JOINED, topVer, locNode);
}
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
}
/**
* Processes node left message.
* <p>
* If the local node is the leaving node, it moves {@code STOPPING -> LEFT} once the message is
* verified, has been received from another node, or no remote nodes remain. Any other node
* removes the leaving node from the ring (on verified messages), updates the topology version
* and fires {@code EVT_NODE_LEFT}.
*
* @param msg Node left message.
*/
private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) {
assert msg != null;
UUID locNodeId = getLocalNodeId();
UUID leavingNodeId = msg.creatorNodeId();
// Local node is the one leaving.
if (locNodeId.equals(leavingNodeId)) {
// No sender set. NOTE(review): presumably the message was just created locally rather than
// received from the ring — confirm.
if (msg.senderNodeId() == null) {
synchronized (mux) {
if (log.isDebugEnabled())
log.debug("Starting local node stop procedure.");
spiState = STOPPING;
mux.notifyAll();
}
}
if (msg.verified() || !ring.hasRemoteNodes() || msg.senderNodeId() != null) {
// Last node in the topology: remove own addresses from the shared IP finder.
if (spi.ipFinder.isShared() && !ring.hasRemoteNodes()) {
try {
spi.ipFinder.unregisterAddresses(locNode.socketAddresses());
}
catch (IgniteSpiException e) {
U.error(log, "Failed to unregister local node address from IP finder.", e);
}
}
synchronized (mux) {
if (spiState == STOPPING) {
spiState = LEFT;
mux.notifyAll();
}
}
return;
}
sendMessageAcrossRing(msg);
return;
}
if (ring.node(msg.senderNodeId()) == null) {
if (log.isDebugEnabled())
log.debug("Discarding node left message since sender node is not in topology: " + msg);
return;
}
TcpDiscoveryNode leavingNode = ring.node(leavingNodeId);
if (leavingNode != null) {
synchronized (mux) {
leavingNodes.add(leavingNode);
}
}
else {
if (log.isDebugEnabled())
log.debug("Discarding node left message since node was not found: " + msg);
return;
}
boolean locNodeCoord = isLocalNodeCoordinator();
if (locNodeCoord) {
// An already verified message has come back to the coordinator: queue a discard and stop.
if (msg.verified()) {
spi.stats.onRingMessageReceived(msg);
addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
return;
}
msg.verify(locNodeId);
}
if (msg.verified() && !locNodeId.equals(leavingNodeId)) {
TcpDiscoveryNode leftNode = ring.removeNode(leavingNodeId);
assert leftNode != null;
if (log.isDebugEnabled())
log.debug("Removed node from topology: " + leftNode);
long topVer;
// The coordinator assigns the new version; other nodes take it from the message.
if (locNodeCoord) {
topVer = ring.incrementTopologyVersion();
msg.topologyVersion(topVer);
}
else {
topVer = msg.topologyVersion();
assert topVer > 0 : "Topology version is empty for message: " + msg;
boolean b = ring.topologyVersion(topVer);
assert b : "Topology version has not been updated: [ring=" + ring + ", msg=" + msg +
", lastMsg=" + lastMsg + ", spiState=" + spiStateCopy() + ']';
if (log.isDebugEnabled())
log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
lastMsg = msg;
}
// A leaving client gets the message through its worker, which is unregistered here.
if (msg.client()) {
ClientMessageWorker wrk = clientMsgWorkers.remove(leavingNodeId);
if (wrk != null)
wrk.addMessage(msg);
}
// The leaving node is our next ring neighbour: send it the verified message over the current
// socket, then drop the connection and force pending messages to be sent to the new next node.
else if (leftNode.equals(next) && sock != null) {
try {
writeToSocket(sock, msg);
if (log.isDebugEnabled())
log.debug("Sent verified node left message to leaving node: " + msg);
}
catch (IgniteCheckedException | IOException e) {
if (log.isDebugEnabled())
log.debug("Failed to send verified node left message to leaving node [msg=" + msg +
", err=" + e.getMessage() + ']');
onException("Failed to send verified node left message to leaving node [msg=" + msg +
", err=" + e.getMessage() + ']', e);
}
finally {
forceSndPending = true;
next = null;
U.closeQuiet(sock);
}
}
spi.stats.onNodeLeft();
notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode);
synchronized (mux) {
failedNodes.remove(leftNode);
leavingNodes.remove(leftNode);
}
}
// forceSndPending is reset after this send attempt on every path.
if (ring.hasRemoteNodes()) {
try {
sendMessageAcrossRing(msg);
}
finally {
forceSndPending = false;
}
}
else {
forceSndPending = false;
if (log.isDebugEnabled())
log.debug("Unable to send message across the ring (topology has no remote nodes): " + msg);
U.closeQuiet(sock);
}
}
/**
* Processes node failed message.
* <p>
* The message is discarded if its sender is unknown or is itself about to fail, or if the
* failed node is unknown or has a different internal order. Otherwise the node is marked as
* failed. On verified messages it is removed from the ring, the topology version is updated,
* any local client worker for it is interrupted, and {@code EVT_NODE_FAILED} is fired.
*
* @param msg Node failed message.
*/
private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) {
assert msg != null;
UUID sndId = msg.senderNodeId();
// Ignore messages relayed by unknown nodes or by nodes that are themselves failing.
if (sndId != null) {
TcpDiscoveryNode sndNode = ring.node(sndId);
if (sndNode == null) {
if (log.isDebugEnabled())
log.debug("Discarding node failed message sent from unknown node: " + msg);
return;
}
else {
boolean contains;
synchronized (mux) {
contains = failedNodes.contains(sndNode);
}
if (contains) {
if (log.isDebugEnabled())
log.debug("Discarding node failed message sent from node which is about to fail: " + msg);
return;
}
}
}
UUID nodeId = msg.failedNodeId();
long order = msg.order();
TcpDiscoveryNode node = ring.node(nodeId);
// Same ID but a different internal order. NOTE(review): presumably the message refers to an
// earlier incarnation of the node — confirm.
if (node != null && node.internalOrder() != order) {
if (log.isDebugEnabled())
log.debug("Ignoring node failed message since node internal order does not match " +
"[msg=" + msg + ", node=" + node + ']');
return;
}
if (node != null) {
synchronized (mux) {
failedNodes.add(node);
}
}
else {
if (log.isDebugEnabled())
log.debug("Discarding node failed message since node was not found: " + msg);
return;
}
boolean locNodeCoord = isLocalNodeCoordinator();
UUID locNodeId = getLocalNodeId();
if (locNodeCoord) {
// An already verified message has come back to the coordinator: queue a discard and stop.
if (msg.verified()) {
spi.stats.onRingMessageReceived(msg);
addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
return;
}
msg.verify(locNodeId);
}
if (msg.verified()) {
node = ring.removeNode(nodeId);
assert node != null;
long topVer;
// The coordinator assigns the new version; other nodes take it from the message.
if (locNodeCoord) {
topVer = ring.incrementTopologyVersion();
msg.topologyVersion(topVer);
}
else {
topVer = msg.topologyVersion();
assert topVer > 0 : "Topology version is empty for message: " + msg;
boolean b = ring.topologyVersion(topVer);
assert b : "Topology version has not been updated: [ring=" + ring + ", msg=" + msg +
", lastMsg=" + lastMsg + ", spiState=" + spiStateCopy() + ']';
if (log.isDebugEnabled())
log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
lastMsg = msg;
}
// Clear tracking state and stop the local worker serving the node, if any.
synchronized (mux) {
failedNodes.remove(node);
leavingNodes.remove(node);
ClientMessageWorker worker = clientMsgWorkers.remove(node.id());
if (worker != null)
worker.interrupt();
}
if (msg.warning() != null && !msg.creatorNodeId().equals(getLocalNodeId())) {
ClusterNode creatorNode = ring.node(msg.creatorNodeId());
U.warn(log, "Received EVT_NODE_FAILED event with warning [" +
"nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : msg.creatorNodeId()) +
", msg=" + msg.warning() + ']');
}
notifyDiscovery(EVT_NODE_FAILED, topVer, node);
spi.stats.onNodeFailed();
}
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
else {
if (log.isDebugEnabled())
log.debug("Unable to send message across the ring (topology has no remote nodes): " + msg);
U.closeQuiet(sock);
}
}
/**
* Processes status check message.
* <p>
* A message with a suspected failed node is forwarded unless one of the discard conditions
* applies. Otherwise, a coordinator that did not create the message answers with
* {@code STATUS_OK} (creator in topology, via the ring) or {@code STATUS_RECON} (creator not in
* topology, sent directly). When the answer returns to the creator, {@code STATUS_RECON} fires
* {@code EVT_NODE_SEGMENTED}.
*
* @param msg Status check message.
*/
private void processStatusCheckMessage(TcpDiscoveryStatusCheckMessage msg) {
assert msg != null;
UUID locNodeId = getLocalNodeId();
if (msg.failedNodeId() != null) {
if (locNodeId.equals(msg.failedNodeId())) {
if (log.isDebugEnabled())
log.debug("Status check message discarded (suspect node is local node).");
return;
}
if (locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() != null) {
if (log.isDebugEnabled())
log.debug("Status check message discarded (local node is the sender of the status message).");
return;
}
if (isLocalNodeCoordinator() && ring.node(msg.creatorNodeId()) == null) {
if (log.isDebugEnabled())
log.debug("Status check message discarded (creator node is not in topology).");
return;
}
}
else {
if (isLocalNodeCoordinator() && !locNodeId.equals(msg.creatorNodeId())) {
// Local node is real coordinator, it should respond and discard message.
if (ring.node(msg.creatorNodeId()) != null) {
// Sender is in topology, send message via ring.
msg.status(STATUS_OK);
sendMessageAcrossRing(msg);
}
else {
// Sender is not in topology, it should reconnect.
msg.status(STATUS_RECON);
try {
trySendMessageDirectly(msg.creatorNode(), msg);
if (log.isDebugEnabled())
log.debug("Responded to status check message " +
"[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']');
}
catch (IgniteSpiException e) {
if (e.hasCause(SocketException.class)) {
if (log.isDebugEnabled()) {
log.debug("Failed to respond to status check message (connection refused) " +
"[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']');
}
onException("Failed to respond to status check message (connection refused) " +
"[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']', e);
}
else {
// Log as error only if the creator is still reachable by ping.
if (pingNode(msg.creatorNode())) {
// Node exists and accepts incoming connections.
U.error(log, "Failed to respond to status check message " +
"[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']', e);
}
else if (log.isDebugEnabled()) {
log.debug("Failed to respond to status check message (did the node stop?) " +
"[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']');
}
}
}
}
return;
}
// Own message without a sender, and the local node was updated within the last hbFreq ms.
if (locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null &&
U.currentTimeMillis() - locNode.lastUpdateTime() < spi.hbFreq) {
if (log.isDebugEnabled())
log.debug("Status check message discarded (local node receives updates).");
return;
}
if (locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null &&
spiStateCopy() != CONNECTED) {
if (log.isDebugEnabled())
log.debug("Status check message discarded (local node is not connected to topology).");
return;
}
// The creator received its own message back from the ring: act on the coordinator's answer.
if (locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() != null) {
if (spiStateCopy() != CONNECTED)
return;
if (msg.status() == STATUS_OK) {
if (log.isDebugEnabled())
log.debug("Received OK status response from coordinator: " + msg);
}
else if (msg.status() == STATUS_RECON) {
U.warn(log, "Node is out of topology (probably, due to short-time network problems).");
notifyDiscovery(EVT_NODE_SEGMENTED, ring.topologyVersion(), locNode);
return;
}
else if (log.isDebugEnabled())
log.debug("Status value was not updated in status response: " + msg);
// Discard the message.
return;
}
}
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
}
/**
* Processes regular heartbeat message.
* <p>
* Stale heartbeats are dropped: those from unknown creators, those from non-coordinators (on
* the coordinator), the node's own when it is no longer coordinator, and those that completed
* two passes. When connected, metrics carried by the message are applied to the ring nodes. On
* the first pass each node adds its own and its clients' metrics. On the second pass it removes
* its metrics and updates client alive counters.
*
* @param msg Heartbeat message.
*/
private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) {
assert msg != null;
assert !msg.client();
UUID locNodeId = getLocalNodeId();
if (ring.node(msg.creatorNodeId()) == null) {
if (log.isDebugEnabled())
log.debug("Discarding heartbeat message issued by unknown node [msg=" + msg +
", ring=" + ring + ']');
return;
}
if (isLocalNodeCoordinator() && !locNodeId.equals(msg.creatorNodeId())) {
if (log.isDebugEnabled())
log.debug("Discarding heartbeat message issued by non-coordinator node: " + msg);
return;
}
if (!isLocalNodeCoordinator() && locNodeId.equals(msg.creatorNodeId())) {
if (log.isDebugEnabled())
log.debug("Discarding heartbeat message issued by local node (node is no more coordinator): " +
msg);
return;
}
// Own message returned without local metrics (removed on the second pass).
if (locNodeId.equals(msg.creatorNodeId()) && !hasMetrics(msg, locNodeId) && msg.senderNodeId() != null) {
if (log.isDebugEnabled())
log.debug("Discarding heartbeat message that has made two passes: " + msg);
return;
}
long tstamp = U.currentTimeMillis();
// Apply every node's metrics (and its clients' metrics) carried by the message.
if (spiStateCopy() == CONNECTED) {
if (msg.hasMetrics()) {
for (Map.Entry<UUID, TcpDiscoveryHeartbeatMessage.MetricsSet> e : msg.metrics().entrySet()) {
UUID nodeId = e.getKey();
TcpDiscoveryHeartbeatMessage.MetricsSet metricsSet = e.getValue();
// NOTE(review): get(nodeId) may return null if this node has no cache metrics entry — confirm
// that setCacheMetrics(null) is acceptable downstream.
Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics() ?
msg.cacheMetrics().get(nodeId) : Collections.<Integer, CacheMetrics>emptyMap();
updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tstamp);
// NOTE(review): client nodes receive the cache metrics of their server node here — confirm intended.
for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
updateMetrics(t.get1(), t.get2(), cacheMetrics, tstamp);
}
}
}
if (ring.hasRemoteNodes()) {
if ((locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null ||
!hasMetrics(msg, locNodeId)) && spiStateCopy() == CONNECTED) {
// Message is on its first ring or just created on coordinator.
msg.setMetrics(locNodeId, spi.metricsProvider.metrics());
msg.setCacheMetrics(locNodeId, spi.metricsProvider.cacheMetrics());
// Report every locally served client; metrics are attached only when available.
for (Map.Entry<UUID, ClientMessageWorker> e : clientMsgWorkers.entrySet()) {
UUID nodeId = e.getKey();
ClusterMetrics metrics = e.getValue().metrics();
if (metrics != null)
msg.setClientMetrics(locNodeId, nodeId, metrics);
msg.addClientNodeId(nodeId);
}
}
else {
// Message is on its second ring.
removeMetrics(msg, locNodeId);
// Visible clients listed in the message get their alive counter reset; unlisted ones
// are decremented, and the coordinator fails a client whose counter reaches zero.
Collection<UUID> clientNodeIds = msg.clientNodeIds();
for (TcpDiscoveryNode clientNode : ring.clientNodes()) {
if (clientNode.visible()) {
if (clientNodeIds.contains(clientNode.id()))
clientNode.aliveCheck(spi.maxMissedClientHbs);
else {
int aliveCheck = clientNode.decrementAliveCheck();
if (aliveCheck == 0 && isLocalNodeCoordinator()) {
processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId,
clientNode.id(), clientNode.internalOrder()));
}
}
}
}
}
// Re-checked because processNodeFailedMessage above may have changed the ring.
if (ring.hasRemoteNodes())
sendMessageAcrossRing(msg);
}
else {
// Single-node topology: only refresh the local node.
locNode.lastUpdateTime(tstamp);
notifyDiscovery(EVT_NODE_METRICS_UPDATED, ring.topologyVersion(), locNode);
}
}
/**
 * Stores the metrics from a client heartbeat in the worker serving that client.
 * Heartbeats from clients without a local worker are only logged at debug level.
 *
 * @param msg Heartbeat message.
 */
private void processClientHeartbeatMessage(TcpDiscoveryClientHeartbeatMessage msg) {
    assert msg.client();

    ClientMessageWorker clientWrk = clientMsgWorkers.get(msg.creatorNodeId());

    if (clientWrk == null) {
        if (log.isDebugEnabled())
            log.debug("Received heartbeat message from unknown client node: " + msg);

        return;
    }

    clientWrk.metrics(msg.metrics());
}
/**
 * Applies node and cache metrics to a node in the ring, refreshes its last update time and
 * fires {@code EVT_NODE_METRICS_UPDATED}. Metrics for nodes not in the ring are ignored.
 *
 * @param nodeId Node ID.
 * @param metrics Metrics.
 * @param cacheMetrics Cache metrics.
 * @param tstamp Timestamp.
 */
private void updateMetrics(UUID nodeId,
    ClusterMetrics metrics,
    Map<Integer, CacheMetrics> cacheMetrics,
    long tstamp)
{
    assert nodeId != null;
    assert metrics != null;

    TcpDiscoveryNode target = ring.node(nodeId);

    if (target == null) {
        if (log.isDebugEnabled())
            log.debug("Received metrics from unknown node: " + nodeId);

        return;
    }

    target.setMetrics(metrics);
    target.setCacheMetrics(cacheMetrics);
    target.lastUpdateTime(tstamp);

    notifyDiscovery(EVT_NODE_METRICS_UPDATED, ring.topologyVersion(), target);
}
/**
 * Checks whether a heartbeat message carries node or cache metrics for the given node.
 *
 * @param msg Message.
 * @param nodeId Node ID to look up in the message.
 * @return {@code true} if the message holds node metrics or cache metrics for {@code nodeId}.
 */
private boolean hasMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) {
    if (msg.hasMetrics(nodeId))
        return true;

    return msg.hasCacheMetrics(nodeId);
}
/**
 * Processes discard message and discards previously registered pending messages.
 * <p>
 * The coordinator verifies the message on its first visit and drops it when it comes back
 * already verified by this coordinator. Every node that sees the verified message discards
 * the referenced pending message and forwards the discard along the ring.
 *
 * @param msg Discard message.
 */
@SuppressWarnings("StatementWithEmptyBody")
private void processDiscardMessage(TcpDiscoveryDiscardMessage msg) {
    assert msg != null;

    IgniteUuid discardId = msg.msgId();

    assert discardId != null;

    if (isLocalNodeCoordinator()) {
        UUID selfId = getLocalNodeId();

        // Already verified by this coordinator: the discard has completed the ring, stop here.
        if (selfId.equals(msg.verifierNodeId()))
            return;

        // Message is not verified or verified by former coordinator.
        msg.verify(selfId);
    }

    if (msg.verified())
        pendingMsgs.discard(discardId);

    if (ring.hasRemoteNodes())
        sendMessageAcrossRing(msg);
}
/**
 * Handles a ping request issued by a client node. The ping is executed asynchronously on the utility
 * pool, and a verified {@code TcpDiscoveryClientPingResponse} is queued to the requesting client's
 * message worker. If that worker no longer exists, the result is dropped.
 *
 * @param msg Client ping request.
 */
private void processClientPingRequest(final TcpDiscoveryClientPingRequest msg) {
    utilityPool.execute(new Runnable() {
        @Override public void run() {
            boolean alive = pingNode(msg.nodeToPing());

            ClientMessageWorker requester = clientMsgWorkers.get(msg.creatorNodeId());

            if (requester != null) {
                UUID locId = getLocalNodeId();

                TcpDiscoveryClientPingResponse pingRes =
                    new TcpDiscoveryClientPingResponse(locId, msg.nodeToPing(), alive);

                pingRes.verify(locId);

                requester.addMessage(pingRes);
            }
            else if (log.isDebugEnabled())
                log.debug("Ping request from dead client node, will be skipped: " + msg.creatorNodeId());
        }
    });
}
/**
 * Processes a custom discovery event message.
 * <p>
 * On a non-coordinator node, a verified message is delivered to the local listener, and the message
 * is forwarded along the ring when remote nodes exist.
 * <p>
 * On the coordinator, a message seen unverified is verified, stamped with the current topology
 * version and delivered to the local listener. It is then forwarded along the ring if remote nodes
 * exist. In every other case (the message was already verified, or there is nobody to forward it to),
 * the coordinator finishes the message. It queues the acknowledgement message produced by the custom
 * message (if any), then queues a discard message for the original.
 *
 * @param msg Custom event message.
 */
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
    if (!isLocalNodeCoordinator()) {
        if (msg.verified())
            notifyDiscoveryListener(msg);

        if (ring.hasRemoteNodes())
            sendMessageAcrossRing(msg);

        return;
    }

    boolean firstPass = !msg.verified();

    if (firstPass) {
        msg.verify(getLocalNodeId());
        msg.topologyVersion(ring.topologyVersion());

        notifyDiscoveryListener(msg);
    }

    if (firstPass && ring.hasRemoteNodes()) {
        sendMessageAcrossRing(msg);

        return;
    }

    spi.stats.onRingMessageReceived(msg);

    DiscoverySpiCustomMessage customMsg = null;

    try {
        customMsg = msg.message(spi.marsh);
    }
    catch (Throwable e) {
        U.error(log, "Failed to unmarshal discovery custom message.", e);
    }

    DiscoverySpiCustomMessage ack = customMsg == null ? null : customMsg.ackMessage();

    if (ack != null) {
        try {
            addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), ack, spi.marsh.marshal(ack)));
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to marshal discovery custom message.", e);
        }
    }

    addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id()));
}
/**
 * Delivers a custom event message to the discovery SPI listener. Delivery happens only when a
 * listener is set, the SPI state is {@code CONNECTED} or {@code DISCONNECTING`}, and the message
 * creator is still present in the ring. The listener receives a copy of the topology history taken
 * under {@code mux}, plus the snapshot for the message's topology version.
 * <p>
 * If the unmarshalled message reports {@code isMutable()}, it is marshalled again after the listener
 * returns and stored back into {@code msg} (presumably so that changes made by the listener are
 * propagated further). Any failure is logged and swallowed.
 *
 * @param msg Custom message.
 */
private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) {
    DiscoverySpiListener lsnr = spi.lsnr;

    TcpDiscoverySpiState curState = spiStateCopy();

    Map<Long, Collection<ClusterNode>> topHistCopy;

    synchronized (mux) {
        topHistCopy = new TreeMap<>(topHist);
    }

    Collection<ClusterNode> topSnapshot = topHistCopy.get(msg.topologyVersion());

    if (lsnr == null)
        return;

    if (curState != CONNECTED && curState != DISCONNECTING)
        return;

    TcpDiscoveryNode creator = ring.node(msg.creatorNodeId());

    if (creator == null)
        return;

    try {
        DiscoverySpiCustomMessage customMsg = msg.message(spi.marsh);

        lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
            msg.topologyVersion(),
            creator,
            topSnapshot,
            topHistCopy,
            customMsg);

        if (customMsg.isMutable())
            msg.message(customMsg, spi.marsh.marshal(customMsg));
    }
    catch (Throwable e) {
        U.error(log, "Failed to unmarshal discovery custom message.", e);
    }
}
}
/**
 * Thread that accepts incoming TCP connections.
 * <p>
 * Every accepted socket is handed to a newly started {@link SocketReader}, which is registered in
 * {@code readers}. From that moment the server is no longer responsible for the socket.
 */
private class TcpServer extends IgniteSpiThread {
    /** Socket TCP server listens to. */
    private ServerSocket srvrSock;

    /** Port to listen. */
    private int port;

    /**
     * Constructor. Binds the server socket to the first free port starting from {@code spi.locPort}.
     * Ports up to {@code spi.locPort + spi.locPortRange - 1} are tried. A port range of {@code 0}
     * means that only {@code spi.locPort} is tried.
     *
     * @throws IgniteSpiException If no port could be bound.
     */
    TcpServer() throws IgniteSpiException {
        super(spi.ignite().name(), "tcp-disco-srvr", log);

        setPriority(spi.threadPri);

        // Treat a zero range as "local port only". Without this, a zero range would skip the bind loop
        // entirely: srvrSock would stay null, success would be logged, and body() would fail with NPE.
        int lastPort = spi.locPortRange == 0 ? spi.locPort : spi.locPort + spi.locPortRange - 1;

        for (port = spi.locPort; port <= lastPort; port++) {
            try {
                srvrSock = new ServerSocket(port, 0, spi.locHost);

                break;
            }
            catch (IOException e) {
                if (port < lastPort) {
                    if (log.isDebugEnabled())
                        log.debug("Failed to bind to local port (will try next port within range) " +
                            "[port=" + port + ", localHost=" + spi.locHost + ']');

                    onException("Failed to bind to local port. " +
                        "[port=" + port + ", localHost=" + spi.locHost + ']', e);
                }
                else {
                    throw new IgniteSpiException("Failed to bind TCP server socket (possibly all ports in range " +
                        "are in use) [firstPort=" + spi.locPort + ", lastPort=" + lastPort +
                        ", addr=" + spi.locHost + ']', e);
                }
            }
        }

        // A negative port range makes no bind attempt at all; fail fast instead of starting a
        // server thread without a socket.
        if (srvrSock == null)
            throw new IgniteSpiException("Failed to bind TCP server socket (invalid local port range) " +
                "[locPort=" + spi.locPort + ", locPortRange=" + spi.locPortRange + ", addr=" + spi.locHost + ']');

        if (log.isInfoEnabled())
            log.info("Successfully bound to TCP port [port=" + port + ", localHost=" + spi.locHost + ']');
    }

    /**
     * Accepts connections until interrupted or until {@code accept()} fails. Each accepted socket is
     * handed to a new {@link SocketReader}, which is started while holding {@code mux}. The server
     * socket is closed on exit.
     */
    @Override protected void body() throws InterruptedException {
        try {
            while (!isInterrupted()) {
                Socket sock = srvrSock.accept();

                long tstamp = U.currentTimeMillis();

                if (log.isDebugEnabled())
                    log.debug("Accepted incoming connection from addr: " + sock.getInetAddress());

                SocketReader reader = new SocketReader(sock);

                synchronized (mux) {
                    readers.add(reader);

                    reader.start();
                }

                spi.stats.onServerSocketInitialized(U.currentTimeMillis() - tstamp);
            }
        }
        catch (IOException e) {
            if (log.isDebugEnabled())
                U.error(log, "Failed to accept TCP connection.", e);

            onException("Failed to accept TCP connection.", e);

            // An accept() failure caused by interrupt() closing the socket is expected and not reported.
            if (!isInterrupted()) {
                if (U.isMacInvalidArgumentError(e))
                    U.error(log, "Failed to accept TCP connection\n\t" + U.MAC_INVALID_ARG_MSG, e);
                else
                    U.error(log, "Failed to accept TCP connection.", e);
            }
        }
        finally {
            U.closeQuiet(srvrSock);
        }
    }

    /** {@inheritDoc} */
    @Override public void interrupt() {
        super.interrupt();

        // Closing the server socket unblocks a pending accept() call.
        U.close(srvrSock, log);
    }
}
/**
 * Thread that reads messages from the socket created for incoming connections.
 * <p>
 * {@link #body()} performs these steps in order:
 * <ol>
 *     <li>Validates the 4-byte magic header against {@code U.IGNITE_HEADER}.</li>
 *     <li>Reads one message. A ping request is answered, and then the connection is closed.</li>
 *     <li>Any other first message is treated as a handshake request. If the handshake comes from a
 *     client node, a {@link ClientMessageWorker} is registered in {@code clientMsgWorkers}.</li>
 *     <li>Reads messages in a loop until interrupted or an error occurs.</li>
 * </ol>
 * The socket is always closed when {@link #body()} exits.
 */
private class SocketReader extends IgniteSpiThread {
    /** Socket to read data from. */
    private final Socket sock;

    /**
     * ID of the remote node. It is taken from the handshake request's creator ID and stays null until
     * the handshake has been read. It is used for logging, for nodeAlive() checks and in toString().
     */
    private volatile UUID nodeId;

    /**
     * Constructor.
     *
     * @param sock Socket to read data from.
     */
    SocketReader(Socket sock) {
        super(spi.ignite().name(), "tcp-disco-sock-reader", log);

        this.sock = sock;

        setPriority(spi.threadPri);

        spi.stats.onSocketReaderCreated();
    }

    /** {@inheritDoc} */
    @Override protected void body() throws InterruptedException {
        UUID locNodeId = getLocalNodeId();

        // Set only when the remote side is a client whose worker was successfully registered.
        ClientMessageWorker clientMsgWrk = null;

        try {
            InputStream in;

            try {
                // Set socket options.
                sock.setKeepAlive(true);
                sock.setTcpNoDelay(true);

                // The magic header is read with spi.netTimeout; the original timeout is restored afterwards.
                int timeout = sock.getSoTimeout();

                sock.setSoTimeout((int)spi.netTimeout);

                for (IgniteInClosure<Socket> connLsnr : spi.incomeConnLsnrs)
                    connLsnr.apply(sock);

                in = new BufferedInputStream(sock.getInputStream());

                // Read exactly 4 bytes; end of stream before that means the peer is not a discovery node.
                byte[] buf = new byte[4];
                int read = 0;

                while (read < buf.length) {
                    int r = in.read(buf, read, buf.length - read);

                    if (r >= 0)
                        read += r;
                    else {
                        if (log.isDebugEnabled())
                            log.debug("Failed to read magic header (too few bytes received) " +
                                "[rmtAddr=" + sock.getRemoteSocketAddress() +
                                ", locAddr=" + sock.getLocalSocketAddress() + ']');

                        LT.warn(log, null, "Failed to read magic header (too few bytes received) [rmtAddr=" +
                            sock.getRemoteSocketAddress() + ", locAddr=" + sock.getLocalSocketAddress() + ']');

                        return;
                    }
                }

                if (!Arrays.equals(buf, U.IGNITE_HEADER)) {
                    if (log.isDebugEnabled())
                        log.debug("Unknown connection detected (is some other software connecting to " +
                            "this Ignite port?) " +
                            "[rmtAddr=" + sock.getRemoteSocketAddress() +
                            ", locAddr=" + sock.getLocalSocketAddress() + ']');

                    LT.warn(log, null, "Unknown connection detected (is some other software connecting to " +
                        "this Ignite port?) [rmtAddr=" + sock.getRemoteSocketAddress() +
                        ", locAddr=" + sock.getLocalSocketAddress() + ']');

                    return;
                }

                // Restore timeout.
                sock.setSoTimeout(timeout);

                TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout);

                // Ping.
                if (msg instanceof TcpDiscoveryPingRequest) {
                    if (!spi.isNodeStopping0()) {
                        TcpDiscoveryPingRequest req = (TcpDiscoveryPingRequest)msg;

                        TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId);

                        // If the ping asks about a client served by this node, also report whether that
                        // client answers its own ping (this may block up to spi.ackTimeout).
                        if (req.clientNodeId() != null) {
                            ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId());

                            if (clientWorker != null)
                                res.clientExists(clientWorker.ping());
                        }

                        spi.writeToSocket(sock, res);
                    }
                    else if (log.isDebugEnabled())
                        log.debug("Ignore ping request, node is stopping.");

                    // Ping connections are single-shot; the socket is closed in the finally block.
                    return;
                }

                // Handshake.
                // NOTE(review): a first message that is neither a ping nor a handshake request causes a
                // ClassCastException here, which none of the catch blocks below handle.
                TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg;

                UUID nodeId = req.creatorNodeId();

                this.nodeId = nodeId;

                TcpDiscoveryHandshakeResponse res =
                    new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());

                spi.writeToSocket(sock, res);

                // It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
                // the local node sends a handshake request message on the loopback address, so we get here.
                if (locNodeId.equals(nodeId)) {
                    assert !req.client();

                    if (log.isDebugEnabled())
                        log.debug("Handshake request from local node: " + req);

                    return;
                }

                if (req.client()) {
                    ClientMessageWorker clientMsgWrk0 = new ClientMessageWorker(sock, nodeId);

                    // Register the worker for this client. A stale, interrupted worker is replaced right away.
                    // A live worker gets up to 500 ms to finish; if it is still registered after that, this
                    // connection is rejected.
                    while (true) {
                        ClientMessageWorker old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0);

                        if (old == null)
                            break;

                        if (old.isInterrupted()) {
                            clientMsgWorkers.remove(nodeId, old);

                            continue;
                        }

                        old.join(500);

                        old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0);

                        if (old == null)
                            break;

                        if (log.isDebugEnabled())
                            log.debug("Already have client message worker, closing connection " +
                                "[locNodeId=" + locNodeId +
                                ", rmtNodeId=" + nodeId +
                                ", workerSock=" + old.sock +
                                ", sock=" + sock + ']');

                        return;
                    }

                    if (log.isDebugEnabled())
                        log.debug("Created client message worker [locNodeId=" + locNodeId +
                            ", rmtNodeId=" + nodeId + ", sock=" + sock + ']');

                    assert clientMsgWrk0 == clientMsgWorkers.get(nodeId);

                    // The worker is registered but not started; it is started once a join or reconnect
                    // request is accepted below.
                    clientMsgWrk = clientMsgWrk0;
                }

                if (log.isDebugEnabled())
                    log.debug("Initialized connection with remote node [nodeId=" + nodeId +
                        ", client=" + req.client() + ']');

                if (debugMode)
                    debugLog("Initialized connection with remote node [nodeId=" + nodeId +
                        ", client=" + req.client() + ']');
            }
            catch (IOException e) {
                if (log.isDebugEnabled())
                    U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);

                if (X.hasCause(e, ObjectStreamException.class) || !sock.isClosed()) {
                    if (U.isMacInvalidArgumentError(e))
                        LT.error(log, e, "Failed to initialize connection [sock=" + sock + "]\n\t" +
                            U.MAC_INVALID_ARG_MSG);
                    else {
                        U.error(
                            log,
                            "Failed to initialize connection (this can happen due to short time " +
                                "network problems and can be ignored if does not affect node discovery) " +
                                "[sock=" + sock + ']',
                            e);
                    }
                }

                onException("Caught exception on handshake [err=" + e + ", sock=" + sock + ']', e);

                return;
            }
            catch (IgniteCheckedException e) {
                if (log.isDebugEnabled())
                    U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);

                onException("Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);

                if (e.hasCause(SocketTimeoutException.class))
                    LT.warn(log, null, "Socket operation timed out on handshake " +
                        "(consider increasing 'networkTimeout' configuration property) " +
                        "[netTimeout=" + spi.netTimeout + ']');

                else if (e.hasCause(ClassNotFoundException.class))
                    LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
                        "(make sure same versions of all classes are available on all nodes) " +
                        "[rmtAddr=" + sock.getRemoteSocketAddress() +
                        ", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']');

                    // Always report marshalling problems.
                else if (e.hasCause(ObjectStreamException.class) ||
                    (!sock.isClosed() && !e.hasCause(IOException.class)))
                    LT.error(log, e, "Failed to initialize connection [sock=" + sock + ']');

                return;
            }

            // Main read loop. From here on, 'nodeId' is the field set during the handshake; the local
            // variable declared above is out of scope.
            while (!isInterrupted()) {
                try {
                    TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());

                    msg.senderNodeId(nodeId);

                    if (log.isDebugEnabled())
                        log.debug("Message has been received: " + msg);

                    spi.stats.onMessageReceived(msg);

                    if (debugMode && recordable(msg))
                        debugLog("Message has been received: " + msg);

                    if (msg instanceof TcpDiscoveryJoinRequestMessage) {
                        TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;

                        // Join requests already responded to fall through to the generic handling below.
                        if (!req.responded()) {
                            boolean ok = processJoinRequestMessage(req, clientMsgWrk);

                            if (clientMsgWrk != null && ok)
                                continue;
                            else
                                // Direct join request - no need to handle this socket anymore.
                                break;
                        }
                    }
                    else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
                        // Reconnect over a non-client connection falls through to the generic handling below.
                        if (clientMsgWrk != null) {
                            TcpDiscoverySpiState state = spiStateCopy();

                            if (state == CONNECTED) {
                                spi.writeToSocket(msg, sock, RES_OK);

                                if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
                                    clientMsgWrk.start();

                                msgWorker.addMessage(msg);

                                continue;
                            }
                            else {
                                spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN);

                                break;
                            }
                        }
                    }
                    // The four branches below share the same shape: acknowledge receipt; if this node is still
                    // CONNECTING, store the message in joinRes, switch spiState and wake up threads waiting on
                    // mux; otherwise ignore the message.
                    else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
                        // Send receipt back.
                        spi.writeToSocket(msg, sock, RES_OK);

                        boolean ignored = false;

                        TcpDiscoverySpiState state = null;

                        synchronized (mux) {
                            if (spiState == CONNECTING) {
                                joinRes.set(msg);

                                spiState = DUPLICATE_ID;

                                mux.notifyAll();
                            }
                            else {
                                ignored = true;

                                state = spiState;
                            }
                        }

                        if (ignored && log.isDebugEnabled())
                            log.debug("Duplicate ID message has been ignored [msg=" + msg +
                                ", spiState=" + state + ']');

                        continue;
                    }
                    else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
                        // Send receipt back.
                        spi.writeToSocket(msg, sock, RES_OK);

                        boolean ignored = false;

                        TcpDiscoverySpiState state = null;

                        synchronized (mux) {
                            if (spiState == CONNECTING) {
                                joinRes.set(msg);

                                spiState = AUTH_FAILED;

                                mux.notifyAll();
                            }
                            else {
                                ignored = true;

                                state = spiState;
                            }
                        }

                        if (ignored && log.isDebugEnabled())
                            log.debug("Auth failed message has been ignored [msg=" + msg +
                                ", spiState=" + state + ']');

                        continue;
                    }
                    else if (msg instanceof TcpDiscoveryCheckFailedMessage) {
                        // Send receipt back.
                        spi.writeToSocket(msg, sock, RES_OK);

                        boolean ignored = false;

                        TcpDiscoverySpiState state = null;

                        synchronized (mux) {
                            if (spiState == CONNECTING) {
                                joinRes.set(msg);

                                spiState = CHECK_FAILED;

                                mux.notifyAll();
                            }
                            else {
                                ignored = true;

                                state = spiState;
                            }
                        }

                        if (ignored && log.isDebugEnabled())
                            log.debug("Check failed message has been ignored [msg=" + msg +
                                ", spiState=" + state + ']');

                        continue;
                    }
                    else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) {
                        // Send receipt back.
                        spi.writeToSocket(msg, sock, RES_OK);

                        boolean ignored = false;

                        TcpDiscoverySpiState state = null;

                        synchronized (mux) {
                            if (spiState == CONNECTING) {
                                joinRes.set(msg);

                                spiState = LOOPBACK_PROBLEM;

                                mux.notifyAll();
                            }
                            else {
                                ignored = true;

                                state = spiState;
                            }
                        }

                        if (ignored && log.isDebugEnabled())
                            log.debug("Loopback problem message has been ignored [msg=" + msg +
                                ", spiState=" + state + ']');

                        continue;
                    }

                    // Answer from a client to a ping queued by ClientMessageWorker.ping(); completes its future.
                    if (msg instanceof TcpDiscoveryPingResponse) {
                        assert msg.client() : msg;

                        ClientMessageWorker clientWorker = clientMsgWorkers.get(msg.creatorNodeId());

                        if (clientWorker != null)
                            clientWorker.pingResult(true);

                        continue;
                    }

                    msgWorker.addMessage(msg);

                    // Send receipt back.
                    // Receipts are only written on non-client connections.
                    if (clientMsgWrk == null)
                        spi.writeToSocket(msg, sock, RES_OK);
                }
                catch (IgniteCheckedException e) {
                    if (log.isDebugEnabled())
                        U.error(log, "Caught exception on message read [sock=" + sock +
                            ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']', e);

                    onException("Caught exception on message read [sock=" + sock +
                        ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']', e);

                    if (isInterrupted() || sock.isClosed())
                        return;

                    if (e.hasCause(ClassNotFoundException.class))
                        LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
                            "(make sure same versions of all classes are available on all nodes) " +
                            "[rmtNodeId=" + nodeId +
                            ", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']');

                    // Always report marshalling errors.
                    boolean err = e.hasCause(ObjectStreamException.class) ||
                        (nodeAlive(nodeId) && spiStateCopy() == CONNECTED && !X.hasCause(e, IOException.class));

                    if (err)
                        LT.error(log, e, "Failed to read message [sock=" + sock + ", locNodeId=" + locNodeId +
                            ", rmtNodeId=" + nodeId + ']');

                    return;
                }
                catch (IOException e) {
                    if (log.isDebugEnabled())
                        U.error(log, "Caught exception on message read [sock=" + sock + ", locNodeId=" + locNodeId +
                            ", rmtNodeId=" + nodeId + ']', e);

                    if (isInterrupted() || sock.isClosed())
                        return;

                    // Always report marshalling errors (although it is strange here).
                    boolean err = X.hasCause(e, ObjectStreamException.class) ||
                        (nodeAlive(nodeId) && spiStateCopy() == CONNECTED);

                    if (err)
                        LT.error(log, e, "Failed to send receipt on message [sock=" + sock +
                            ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']');

                    onException("Caught exception on message read [sock=" + sock + ", locNodeId=" + locNodeId +
                        ", rmtNodeId=" + nodeId + ']', e);

                    return;
                }
            }
        }
        finally {
            // A client connection owns its worker: unregister it (only if it is still the registered one)
            // and stop it when the reader exits. 'nodeId' here is the field.
            if (clientMsgWrk != null) {
                if (log.isDebugEnabled())
                    log.debug("Client connection failed [sock=" + sock + ", locNodeId=" + locNodeId +
                        ", rmtNodeId=" + nodeId + ']');

                clientMsgWorkers.remove(nodeId, clientMsgWrk);

                U.interrupt(clientMsgWrk);
            }

            U.closeQuiet(sock);
        }
    }

    /**
     * @param nodeId Node ID.
     * @return {@code True} if node is in the ring and is not being removed from.
     */
    private boolean nodeAlive(UUID nodeId) {
        // Is node alive or about to be removed from the ring?
        TcpDiscoveryNode node = ring.node(nodeId);

        boolean nodeAlive = node != null && node.visible();

        if (nodeAlive) {
            // failedNodes and leavingNodes are read under mux.
            synchronized (mux) {
                nodeAlive = !F.transform(failedNodes, F.node2id()).contains(nodeId) &&
                    !F.transform(leavingNodes, F.node2id()).contains(nodeId);
            }
        }

        return nodeAlive;
    }

    /**
     * Answers a join request received on this socket.
     * <p>
     * If the SPI is {@code CONNECTED}, it replies {@code RES_OK}, marks the request responded, starts
     * the client worker (if any and not yet started) and queues the request for the ring worker.
     * Otherwise it replies {@code RES_WAIT} or {@code RES_CONTINUE_JOIN} and records the joining node's
     * socket addresses in {@code fromAddrs}.
     *
     * @param msg Join request message.
     * @param clientMsgWrk Client message worker to start.
     * @return Whether connection was successful.
     * @throws IOException If IO failed.
     */
    @SuppressWarnings({"IfMayBeConditional"})
    private boolean processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg,
        @Nullable ClientMessageWorker clientMsgWrk) throws IOException {
        assert msg != null;
        assert !msg.responded();

        TcpDiscoverySpiState state = spiStateCopy();

        if (state == CONNECTED) {
            spi.writeToSocket(msg, sock, RES_OK);

            if (log.isDebugEnabled())
                log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');

            msg.responded(true);

            if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
                clientMsgWrk.start();

            msgWorker.addMessage(msg);

            return true;
        }
        else {
            spi.stats.onMessageProcessingStarted(msg);

            Integer res;

            SocketAddress rmtAddr = sock.getRemoteSocketAddress();

            if (state == CONNECTING) {
                if (noResAddrs.contains(rmtAddr) ||
                    getLocalNodeId().compareTo(msg.creatorNodeId()) < 0)
                    // Remote node node has not responded to join request or loses UUID race.
                    res = RES_WAIT;
                else
                    // Remote node responded to join request and wins UUID race.
                    res = RES_CONTINUE_JOIN;
            }
            else
                // Local node is stopping. Remote node should try next one.
                res = RES_CONTINUE_JOIN;

            spi.writeToSocket(msg, sock, res);

            if (log.isDebugEnabled())
                log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');

            fromAddrs.addAll(msg.node().socketAddresses());

            spi.stats.onMessageProcessingFinished(msg);

            return false;
        }
    }

    /** {@inheritDoc} */
    @Override public void interrupt() {
        super.interrupt();

        // Closing the socket unblocks a reader waiting in a blocking read.
        U.closeQuiet(sock);
    }

    /** {@inheritDoc} */
    @Override protected void cleanup() {
        super.cleanup();

        U.closeQuiet(sock);

        synchronized (mux) {
            readers.remove(this);
        }

        spi.stats.onSocketReaderRemoved();
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return "Socket reader [id=" + getId() + ", name=" + getName() + ", nodeId=" + nodeId + ']';
    }
}
/**
 * SPI statistics printer: a background thread that calls {@code printStatistics()} every
 * {@code spi.statsPrintFreq} milliseconds until it is interrupted.
 */
private class StatisticsPrinter extends IgniteSpiThread {
    /**
     * Creates the printer thread. Expects a positive print frequency and INFO logging enabled
     * (both checked with assertions).
     */
    StatisticsPrinter() {
        super(spi.ignite().name(), "tcp-disco-stats-printer", log);

        assert spi.statsPrintFreq > 0;

        assert log.isInfoEnabled();

        setPriority(spi.threadPri);
    }

    /** {@inheritDoc} */
    @SuppressWarnings({"BusyWait"})
    @Override protected void body() throws InterruptedException {
        if (log.isDebugEnabled())
            log.debug("Statistics printer has been started.");

        for (;;) {
            if (isInterrupted())
                break;

            Thread.sleep(spi.statsPrintFreq);

            printStatistics();
        }
    }
}
/**
*/
private class ClientMessageWorker extends MessageWorkerAdapter {
/** Node ID. */
private final UUID clientNodeId;
/** Socket. */
private final Socket sock;
/** Current client metrics. */
private volatile ClusterMetrics metrics;
/** */
private final AtomicReference<GridFutureAdapter<Boolean>> pingFut = new AtomicReference<>();
/**
* @param sock Socket.
* @param clientNodeId Node ID.
*/
protected ClientMessageWorker(Socket sock, UUID clientNodeId) {
super("tcp-disco-client-message-worker");
this.sock = sock;
this.clientNodeId = clientNodeId;
}
/**
* @return Current client metrics.
*/
ClusterMetrics metrics() {
return metrics;
}
/**
* @param metrics New current client metrics.
*/
void metrics(ClusterMetrics metrics) {
this.metrics = metrics;
}
/** {@inheritDoc} */
@Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
try {
assert msg.verified() : msg;
if (log.isDebugEnabled())
log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
try {
prepareNodeAddedMessage(msg, clientNodeId, null, null);
writeToSocket(sock, msg);
}
finally {
clearNodeAddedMessage(msg);
}
}
catch (IgniteCheckedException | IOException e) {
if (log.isDebugEnabled())
U.error(log, "Client connection failed [sock=" + sock + ", locNodeId="
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']', e);
onException("Client connection failed [sock=" + sock + ", locNodeId="
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']', e);
clientMsgWorkers.remove(clientNodeId, this);
U.interrupt(this);
U.closeQuiet(sock);
}
}
/**
* @param res Ping result.
*/
public void pingResult(boolean res) {
GridFutureAdapter<Boolean> fut = pingFut.getAndSet(null);
if (fut != null)
fut.onDone(res);
}
/**
* @return Ping result.
* @throws InterruptedException If interrupted.
*/
public boolean ping() throws InterruptedException {
if (spi.isNodeStopping0())
return false;
GridFutureAdapter<Boolean> fut;
while (true) {
fut = pingFut.get();
if (fut != null)
break;
fut = new GridFutureAdapter<>();
if (pingFut.compareAndSet(null, fut)) {
TcpDiscoveryPingRequest pingReq = new TcpDiscoveryPingRequest(getLocalNodeId(), clientNodeId);
pingReq.verify(getLocalNodeId());
addMessage(pingReq);
break;
}
}
try {
return fut.get(spi.ackTimeout, TimeUnit.MILLISECONDS);
}
catch (IgniteInterruptedCheckedException ignored) {
throw new InterruptedException();
}
catch (IgniteFutureTimeoutCheckedException ignored) {
if (pingFut.compareAndSet(fut, null))
fut.onDone(false);
return false;
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Internal error: ping future cannot be done with exception", e);
}
}
/** {@inheritDoc} */
@Override protected void cleanup() {
super.cleanup();
pingResult(false);
U.closeQuiet(sock);
}
}
/**
 * Base class for message workers: a thread that takes discovery messages from a deque and passes
 * each one to {@link #processMessage(TcpDiscoveryAbstractMessage)}. High-priority messages are put at
 * the head of the deque; all others go to the tail.
 */
protected abstract class MessageWorkerAdapter extends IgniteSpiThread {
    /** Pre-allocated output stream (100K), reused by {@link #writeToSocket(Socket, TcpDiscoveryAbstractMessage)}. */
    private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024);

    /** Message queue. */
    private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>();

    /**
     * Backed interrupted flag: stays set after {@link #interrupt()} even if the thread's own interrupt
     * status is later cleared.
     */
    private volatile boolean interrupted;

    /**
     * @param name Thread name.
     */
    protected MessageWorkerAdapter(String name) {
        super(spi.ignite().name(), name, log);

        setPriority(spi.threadPri);
    }

    /** {@inheritDoc} */
    @Override protected void body() throws InterruptedException {
        if (log.isDebugEnabled())
            log.debug("Message worker started [locNodeId=" + getLocalNodeId() + ']');

        while (!isInterrupted()) {
            // Bounded wait so the interrupted flag is re-checked periodically.
            TcpDiscoveryAbstractMessage next = queue.poll(2000, TimeUnit.MILLISECONDS);

            if (next != null)
                processMessage(next);
        }
    }

    /** {@inheritDoc} */
    @Override public void interrupt() {
        interrupted = true;

        super.interrupt();
    }

    /** {@inheritDoc} */
    @Override public boolean isInterrupted() {
        return interrupted || super.isInterrupted();
    }

    /**
     * @return Current queue size.
     */
    int queueSize() {
        return queue.size();
    }

    /**
     * Adds message to queue: high-priority messages at the head, others at the tail.
     *
     * @param msg Message to add.
     */
    void addMessage(TcpDiscoveryAbstractMessage msg) {
        boolean urgent = msg.highPriority();

        if (!urgent)
            queue.add(msg);
        else
            queue.addFirst(msg);

        if (log.isDebugEnabled())
            log.debug("Message has been added to queue: " + msg);
    }

    /**
     * Handles a single message taken from the queue.
     *
     * @param msg Message.
     */
    protected abstract void processMessage(TcpDiscoveryAbstractMessage msg);

    /**
     * Writes a message to the socket using this worker's pre-allocated buffer, which is reset first.
     *
     * @param sock Socket.
     * @param msg Message.
     * @throws IOException If IO failed.
     * @throws IgniteCheckedException If marshalling failed.
     */
    protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
        throws IOException, IgniteCheckedException {
        bout.reset();

        spi.writeToSocket(sock, msg, bout);
    }
}
}