blob: 915f84d46f99cd9405fe0f0411a94ce0367e8692 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.communication.tcp.internal;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteTooManyOpenFilesException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.ipc.shmem.IpcOutOfSystemResourcesException;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridShmemCommunicationClient;
import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.communication.tcp.AttributeNames;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationMetricsListener;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.communication.tcp.internal.shmem.SHMemHandshakeClosure;
import org.apache.ignite.spi.discovery.IgniteDiscoveryThread;
import org.apache.ignite.thread.IgniteThreadFactory;
import org.jetbrains.annotations.Nullable;
import static java.util.Objects.nonNull;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.DISABLED_CLIENT_PORT;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.OUT_OF_RESOURCES_TCP_MSG;
import static org.apache.ignite.spi.communication.tcp.internal.CommunicationTcpUtils.handshakeTimeoutException;
import static org.apache.ignite.spi.communication.tcp.internal.CommunicationTcpUtils.nodeAddresses;
import static org.apache.ignite.spi.communication.tcp.internal.CommunicationTcpUtils.usePairedConnections;
/**
* Registry of client connections.
*/
public class ConnectionClientPool {
/** Duration, in milliseconds, above which establishing a TCP client is logged at INFO level instead of DEBUG. */
private static final int CONNECTION_ESTABLISH_THRESHOLD_MS = 100;
/** Established clients keyed by remote node ID; each array is indexed by connection index and may contain {@code null} slots. */
private final ConcurrentMap<UUID, GridCommunicationClient[]> clients = GridConcurrentFactory.newMap();
/** Communication configuration (timeouts, connections per node, connection requestor, etc.). */
private final TcpCommunicationConfiguration cfg;
/** Names of the node attributes read by this pool (port, shared memory port, paired connection). */
private final AttributeNames attrs;
/** Logger. */
private final IgniteLogger log;
/** Metrics listener; may be replaced at runtime via {@link #metricsListener(TcpCommunicationMetricsListener)}. */
@Nullable
private volatile TcpCommunicationMetricsListener metricsLsnr;
/** Local node supplier. */
private final Supplier<ClusterNode> locNodeSupplier;
/** Resolves a node by its ID; a {@code null} result is treated as "node is not in topology". */
private final Function<UUID, ClusterNode> nodeGetter;
/** Message formatter supplier. */
private final Supplier<MessageFormatter> msgFormatterSupplier;
/** Workers registry; when not {@code null} it is used to update the heartbeat of a worker waiting for a connection. */
private final WorkersRegistry registry;
/** Tcp communication spi. */
private final TcpCommunicationSpi tcpCommSpi;
/** Cluster state provider. */
private final ClusterStateProvider clusterStateProvider;
/** Nio server wrapper, used to create TCP clients. */
private final GridNioServerWrapper nioSrvWrapper;
/** In-progress client connect futures keyed by connection key; used to prevent concurrent connects. */
private final ConcurrentMap<ConnectionKey, GridFutureAdapter<GridCommunicationClient>> clientFuts =
GridConcurrentFactory.newMap();
/** Stopping flag (set to {@code true} by {@link #stop()}). */
private volatile boolean stopping = false;
/** Scheduled executor service that closes the socket when the handshake timeout expires. */
private final ScheduledExecutorService handshakeTimeoutExecutorService;
/**
* Whether a client node may be forcibly failed when waiting for its inverse connection times out.
* Read once from {@link IgniteSystemProperties#IGNITE_ENABLE_FORCIBLE_NODE_KILL}.
*/
private boolean forcibleNodeKillEnabled = IgniteSystemProperties
.getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
/**
* @param cfg Config.
* @param attrs Attributes.
* @param log Logger.
* @param metricsLsnr Metrics listener.
* @param locNodeSupplier Local node supplier.
* @param nodeGetter Node getter.
* @param msgFormatterSupplier Message formatter supplier.
* @param registry Registry.
* @param tcpCommSpi Tcp communication spi.
* @param clusterStateProvider Cluster state provider.
* @param nioSrvWrapper Nio server wrapper.
* @param igniteInstanceName Ignite instance name.
*/
public ConnectionClientPool(
TcpCommunicationConfiguration cfg,
AttributeNames attrs,
IgniteLogger log,
TcpCommunicationMetricsListener metricsLsnr,
Supplier<ClusterNode> locNodeSupplier,
Function<UUID, ClusterNode> nodeGetter,
Supplier<MessageFormatter> msgFormatterSupplier,
WorkersRegistry registry,
TcpCommunicationSpi tcpCommSpi,
ClusterStateProvider clusterStateProvider,
GridNioServerWrapper nioSrvWrapper,
String igniteInstanceName
) {
this.cfg = cfg;
this.attrs = attrs;
this.log = log;
this.metricsLsnr = metricsLsnr;
this.locNodeSupplier = locNodeSupplier;
this.nodeGetter = nodeGetter;
this.msgFormatterSupplier = msgFormatterSupplier;
this.registry = registry;
this.tcpCommSpi = tcpCommSpi;
this.clusterStateProvider = clusterStateProvider;
this.nioSrvWrapper = nioSrvWrapper;
this.handshakeTimeoutExecutorService = newSingleThreadScheduledExecutor(
new IgniteThreadFactory(igniteInstanceName, "handshake-timeout-client")
);
}
/**
 * Marks the pool as stopping, fails every pending {@link ConnectionRequestFuture} and shuts down the
 * handshake timeout executor.
 */
public void stop() {
    stopping = true;

    clientFuts.values().forEach(pending -> {
        // There's no way it would be done by itself at this point.
        if (pending instanceof ConnectionRequestFuture)
            pending.onDone(new IgniteSpiException("SPI is being stopped."));
    });

    handshakeTimeoutExecutorService.shutdown();
}
/**
 * Returns existing or just created client to node, reserved for the caller.
 * <p>
 * If no client is registered for the given connection index, the calling thread either establishes the
 * connection itself (when it wins the race to register a connect future) or waits for the future registered
 * by a concurrent thread. While waiting, the heartbeat of the current worker (if any) is updated after each
 * elapsed wait interval. The loop repeats until a client is successfully reserved or an exception is thrown.
 *
 * @param node Node to which client should be open.
 * @param connIdx Connection index.
 * @return The existing or just created client.
 * @throws IgniteCheckedException Thrown if any exception occurs.
 */
public GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
    assert node != null;
    assert (connIdx >= 0 && connIdx < cfg.connectionsPerNode())
        || !(cfg.usePairedConnections() && usePairedConnections(node, attrs.pairedConnection())) : connIdx;

    if (locNodeSupplier.get().isClient()) {
        if (node.isClient()) {
            if (DISABLED_CLIENT_PORT.equals(node.attribute(attrs.port())))
                throw new IgniteSpiException("Cannot send message to the client node with no server socket opened.");
        }
    }

    UUID nodeId = node.id();

    if (log.isDebugEnabled())
        log.debug("The node client is going to reserve a connection [nodeId=" + node.id() + ", connIdx=" + connIdx + "]");

    while (true) {
        GridCommunicationClient[] curClients = clients.get(nodeId);

        GridCommunicationClient client = curClients != null && connIdx < curClients.length ?
            curClients[connIdx] : null;

        if (client == null) {
            if (stopping)
                throw new IgniteSpiException("Node is stopping.");

            // Do not allow concurrent connects.
            GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();

            ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);

            GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);

            if (oldFut == null) {
                try {
                    // Re-check under the registered future: a concurrent thread may have just added the client.
                    GridCommunicationClient[] curClients0 = clients.get(nodeId);

                    GridCommunicationClient client0 = curClients0 != null && connIdx < curClients0.length ?
                        curClients0[connIdx] : null;

                    if (client0 == null) {
                        client0 = createCommunicationClient(node, connIdx);

                        if (client0 != null) {
                            addNodeClient(node, connIdx, client0);

                            if (client0 instanceof GridTcpNioCommunicationClient) {
                                GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0);

                                if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) {
                                    if (log.isDebugEnabled()) {
                                        log.debug("Session was closed after client creation, will retry " +
                                            "[node=" + node + ", client=" + client0 + ']');
                                    }

                                    client0 = null;
                                }
                            }
                        }
                        else {
                            U.sleep(200);

                            if (nodeGetter.apply(node.id()) == null)
                                throw new ClusterTopologyCheckedException("Failed to send message " +
                                    "(node left topology): " + node);
                        }
                    }

                    fut.onDone(client0);
                }
                catch (NodeUnreachableException e) {
                    log.warning(e.getMessage());

                    fut = handleUnreachableNodeException(node, connIdx, fut, e);
                }
                catch (Throwable e) {
                    if (e instanceof NodeUnreachableException)
                        throw e;

                    fut.onDone(e);

                    if (e instanceof IgniteTooManyOpenFilesException)
                        throw e;

                    if (e instanceof Error)
                        throw (Error)e;
                }
                finally {
                    clientFuts.remove(connKey, fut);
                }
            }
            else
                fut = oldFut;

            long clientReserveWaitTimeout = registry != null ? registry.getSystemWorkerBlockedTimeout() / 3
                : cfg.connectionTimeout() / 3;

            // Total time spent waiting on the future so far. Starts from zero so that the value logged
            // below is an actual duration rather than a wall-clock timestamp.
            long waitedMs = 0;

            // This cycle will eventually quit when future is completed by concurrent thread reserving client.
            while (true) {
                try {
                    client = fut.get(clientReserveWaitTimeout, TimeUnit.MILLISECONDS);

                    break;
                }
                catch (IgniteFutureTimeoutCheckedException ignored) {
                    // A full wait interval has elapsed without the future completing.
                    waitedMs += clientReserveWaitTimeout;

                    if (log.isDebugEnabled()) {
                        log.debug(
                            "Still waiting for reestablishing connection to node " +
                                "[nodeId=" + node.id() + ", waitingTime=" + waitedMs + "ms]"
                        );
                    }

                    if (registry != null) {
                        GridWorker wrkr = registry.worker(Thread.currentThread().getName());

                        if (wrkr != null)
                            wrkr.updateHeartbeat();
                    }
                }
            }

            if (client == null) {
                if (clusterStateProvider.isLocalNodeDisconnected())
                    throw new IgniteCheckedException("Unable to create TCP client due to local node disconnecting.");
                else
                    continue;
            }

            if (nodeGetter.apply(nodeId) == null) {
                if (removeNodeClient(nodeId, client))
                    client.forceClose();

                throw new IgniteSpiException("Destination node is not in topology: " + node.id());
            }
        }

        assert connIdx == client.connectionIndex() : client;

        if (client.reserve())
            return client;
        else
            // Client has just been closed by idle worker. Help it and try again.
            removeNodeClient(nodeId, client);
    }
}
/**
 * Handles {@link NodeUnreachableException}. This means that the method will try to trigger client itself to open connection.
 * The only possible way of doing this is to use {@link TcpCommunicationConfiguration#connectionRequestor()}'s trigger and wait.
 * Specifics of triggers implementation technically should be considered unknown, but for now it's not true and we
 * expect that {@link NodeUnreachableException} won't be thrown in {@link IgniteDiscoveryThread}.
 *
 * @param node Node to open connection to.
 * @param connIdx Connection index.
 * @param fut Current future for opening connection.
 * @param e Current exception.
 * @return New future that will return the client or error. {@code null} client is possible if newly opened
 *      connection has been closed by idle worker, at least that's what documentation says.
 * @throws IgniteCheckedException If triggering failed or trigger is not configured.
 */
private GridFutureAdapter<GridCommunicationClient> handleUnreachableNodeException(
    ClusterNode node,
    int connIdx,
    GridFutureAdapter<GridCommunicationClient> fut,
    NodeUnreachableException e
) throws IgniteCheckedException {
    // Without a requestor there is no way to ask the remote node for an inverse connection.
    if (cfg.connectionRequestor() == null) {
        fut.onDone(e);

        throw new IgniteCheckedException(e);
    }

    ConnectFuture connectFut = (ConnectFuture)fut;

    final ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);

    ConnectionRequestFuture requestFut = new ConnectionRequestFuture();

    // Propagate the outcome of the request future to the original connect future and unregister it.
    requestFut.listen(done -> {
        try {
            connectFut.onDone(done.get());
        }
        catch (Throwable err) {
            connectFut.onDone(err);
        }
        finally {
            clientFuts.remove(connKey, requestFut);
        }
    });

    clientFuts.put(connKey, requestFut);

    fut = requestFut;

    try {
        cfg.connectionRequestor().request(node, connIdx);

        long waitTimeout;

        if (cfg.failureDetectionTimeoutEnabled())
            waitTimeout = cfg.failureDetectionTimeout();
        else
            waitTimeout = cfg.connectionTimeout();

        fut.get(waitTimeout);
    }
    catch (Throwable requestErr) {
        boolean timedOut = requestErr instanceof IgniteFutureTimeoutCheckedException;

        if (forcibleNodeKillEnabled && node.isClient() && timedOut)
            CommunicationTcpUtils.failNode(node, tcpCommSpi.getSpiContext(), requestErr, log);

        IgniteSpiException spiErr = new IgniteSpiException(e);

        spiErr.addSuppressed(requestErr);

        log.warning("Failed to wait for establishing inverse communication connection from node " + node, spiErr);

        fut.onDone(spiErr);

        throw spiErr;
    }

    return fut;
}
/**
 * Creates a communication client to the given node. A shared memory client is tried first when the remote
 * node advertises a shared memory port and has the same set of MACs as the local node; if that attempt
 * fails with an {@link IgniteCheckedException}, a TCP client is created instead.
 *
 * @param node Node to create client for.
 * @param connIdx Connection index.
 * @return Client.
 * @throws IgniteCheckedException If failed.
 */
@Nullable public GridCommunicationClient createCommunicationClient(ClusterNode node, int connIdx)
    throws IgniteCheckedException {
    assert node != null;

    Integer shmemPort = node.attribute(attrs.shmemPort());

    ClusterNode locNode = locNodeSupplier.get();

    if (locNode == null)
        throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)");

    if (log.isDebugEnabled())
        log.debug("Creating NIO client to node: " + node);

    // If remote node has shared memory server enabled and has the same set of MACs
    // then we are likely to run on the same host and shared memory communication could be tried.
    boolean tryShmem = shmemPort != null && U.sameMacs(locNode, node);

    if (tryShmem) {
        try {
            // https://issues.apache.org/jira/browse/IGNITE-11126 Rework failure detection logic.
            GridCommunicationClient shmemClient = createShmemClient(node, connIdx, shmemPort);

            if (log.isDebugEnabled())
                log.debug("Shmem client created: " + shmemClient);

            return shmemClient;
        }
        catch (IgniteCheckedException e) {
            // Has cause or is itself the IpcOutOfSystemResourcesException.
            if (e.hasCause(IpcOutOfSystemResourcesException.class))
                LT.warn(log, OUT_OF_RESOURCES_TCP_MSG);
            else if (nodeGetter.apply(node.id()) != null)
                LT.warn(log, e.getMessage());
            else if (log.isDebugEnabled()) {
                log.debug("Failed to establish shared memory connection with local node (node has left): " +
                    node.id());
            }
        }
    }

    final long startTs = System.currentTimeMillis();

    GridCommunicationClient tcpClient = nioSrvWrapper.createTcpClient(node, connIdx, true);

    final long duration = System.currentTimeMillis() - startTs;

    // Slow connects are reported at INFO level, the rest at DEBUG; the message is built only when it is logged.
    boolean slow = duration > CONNECTION_ESTABLISH_THRESHOLD_MS;

    if (slow ? log.isInfoEnabled() : log.isDebugEnabled()) {
        String msg = "TCP client created [client=" + clientString(tcpClient, node) + ", duration=" + duration + "ms]";

        if (slow)
            log.info(msg);
        else
            log.debug(msg);
    }

    return tcpClient;
}
/**
 * Returns the string representation of client with protection from null client value. If the client is null, the
 * string representation is built from the addresses of the cluster node.
 *
 * @param client Communication client.
 * @param node Cluster node to which the client tried to establish a connection.
 * @return String representation of client.
 * @throws IgniteCheckedException If failed.
 */
public String clientString(GridCommunicationClient client, ClusterNode node) throws IgniteCheckedException {
    if (client != null)
        return client.toString();

    assert node != null;

    StringJoiner addrs = new StringJoiner(", ", "null, node addrs=[", "]");

    for (InetSocketAddress addr : nodeAddresses(node, cfg.filterReachableAddresses(), attrs, locNodeSupplier))
        addrs.add(addr.toString());

    return addrs.toString();
}
/**
 * Creates a shared memory client and performs the handshake on it.
 * <p>
 * Client creation is retried once when it fails with a {@link ConnectException} cause. A timed out handshake
 * is retried with a doubled timeout until the reconnect count or the maximum connection timeout is reached
 * (only when failure detection timeout is disabled).
 *
 * @param node Node.
 * @param connIdx Connection index.
 * @param port Port.
 * @return Client.
 * @throws IgniteCheckedException If failed.
 */
@Nullable public GridCommunicationClient createShmemClient(
    ClusterNode node,
    int connIdx,
    Integer port
) throws IgniteCheckedException {
    int handshakeAttempt = 1;

    int connectTries = 1;

    long handshakeTimeout = cfg.connectionTimeout();

    IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(tcpCommSpi, !node.isClient());

    for (; ; ) {
        GridCommunicationClient client;

        try {
            client = new GridShmemCommunicationClient(
                connIdx,
                metricsLsnr.metricRegistry(),
                port,
                timeoutHelper.nextTimeoutChunk(cfg.connectionTimeout()),
                log,
                msgFormatterSupplier.get());
        }
        catch (IgniteCheckedException e) {
            // Reconnect for the second time, if connection is not established.
            boolean retryConnect = !timeoutHelper.checkFailureTimeoutReached(e)
                && connectTries < 2
                && X.hasCause(e, ConnectException.class);

            if (!retryConnect)
                throw e;

            connectTries++;

            continue;
        }

        try {
            safeShmemHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(handshakeTimeout));
        }
        catch (IgniteSpiOperationTimeoutException e) {
            client.forceClose();

            if (cfg.failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e)) {
                if (log.isDebugEnabled()) {
                    log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
                        cfg.failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
                }

                throw e;
            }

            assert !cfg.failureDetectionTimeoutEnabled();

            if (log.isDebugEnabled()) {
                log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + handshakeTimeout +
                    ", err=" + e.getMessage() + ", client=" + client + ']');
            }

            boolean giveUp = handshakeAttempt == cfg.reconCount() || handshakeTimeout > cfg.maxConnectionTimeout();

            if (giveUp) {
                if (log.isDebugEnabled()) {
                    log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
                        "[timeout=" + handshakeTimeout + ", maxConnTimeout=" + cfg.maxConnectionTimeout() +
                        ", attempt=" + handshakeAttempt + ", reconCnt=" + cfg.reconCount() +
                        ", err=" + e.getMessage() + ", client=" + client + ']');
                }

                throw e;
            }

            // Try again from scratch with a doubled handshake timeout.
            handshakeAttempt++;

            handshakeTimeout *= 2;

            continue;
        }
        catch (IgniteCheckedException | RuntimeException | Error e) {
            if (log.isDebugEnabled())
                log.debug("Caught exception (will close client) [err=" + e.getMessage() + ", client=" + client + ']');

            client.forceClose();

            throw e;
        }

        return client;
    }
}
/**
 * Registers the client in the slot of the node's client array that corresponds to the connection index.
 * Nothing is registered when the connection index is not less than the configured number of connections
 * per node.
 *
 * @param node Node.
 * @param connIdx Connection index.
 * @param addClient Client to add.
 */
public void addNodeClient(ClusterNode node, int connIdx, GridCommunicationClient addClient) {
    assert cfg.connectionsPerNode() > 0 : cfg.connectionsPerNode();
    assert connIdx == addClient.connectionIndex() : addClient;

    if (log.isDebugEnabled()) {
        log.debug(
            "The node client is going to create a connection " +
                "[nodeId=" + node.id() + ", connIdx=" + connIdx + ", client=" + addClient + "]"
        );
    }

    if (connIdx >= cfg.connectionsPerNode()) {
        assert !(cfg.usePairedConnections() && usePairedConnections(node, attrs.pairedConnection()));

        return;
    }

    // Retry until the updated array is published without a concurrent modification.
    boolean published = false;

    while (!published) {
        GridCommunicationClient[] curClients = clients.get(node.id());

        assert curClients == null || curClients[connIdx] == null : "Client already created [node=" + node.id() +
            ", connIdx=" + connIdx +
            ", client=" + addClient +
            ", oldClient=" + curClients[connIdx] + ']';

        if (curClients == null) {
            GridCommunicationClient[] created = new GridCommunicationClient[cfg.connectionsPerNode()];

            created[connIdx] = addClient;

            published = clients.putIfAbsent(node.id(), created) == null;
        }
        else {
            GridCommunicationClient[] updated = curClients.clone();

            updated[connIdx] = addClient;

            if (log.isDebugEnabled())
                log.debug("The node client was replaced [nodeId=" + node.id() + ", connIdx=" + connIdx + ", client=" + addClient + "]");

            published = clients.replace(node.id(), curClients, updated);
        }
    }
}
/**
 * Clears the slot occupied by the given client in the node's client array, provided that the slot still
 * holds exactly this client instance.
 *
 * @param nodeId Node ID.
 * @param rmvClient Client to remove.
 * @return {@code True} if client was removed.
 */
public boolean removeNodeClient(UUID nodeId, GridCommunicationClient rmvClient) {
    if (log.isDebugEnabled())
        log.debug("The client was removed [nodeId=" + nodeId + ", client=" + rmvClient.toString() + "].");

    while (true) {
        GridCommunicationClient[] snapshot = clients.get(nodeId);

        if (snapshot == null)
            return false;

        int idx = rmvClient.connectionIndex();

        boolean registered = idx < snapshot.length && snapshot[idx] == rmvClient;

        if (!registered)
            return false;

        GridCommunicationClient[] updated = Arrays.copyOf(snapshot, snapshot.length);

        updated[idx] = null;

        // Retry if the array was replaced concurrently.
        if (clients.replace(nodeId, snapshot, updated))
            return true;
    }
}
/**
 * Closing connections to node.
 * NOTE: It is recommended only for tests.
 * <p>
 * Removes and force-closes every established client of the node, then removes every pending connect future
 * of the node, waits for its result and force-closes the resulting client, if any.
 *
 * @param nodeId Node for which to close connections.
 * @throws IgniteCheckedException If occurs.
 */
public void forceCloseConnection(UUID nodeId) throws IgniteCheckedException {
    if (log.isDebugEnabled())
        log.debug("The node client connections were closed [nodeId=" + nodeId + "]");

    GridCommunicationClient[] nodeClients = clients.remove(nodeId);

    if (nonNull(nodeClients)) {
        for (GridCommunicationClient client : nodeClients) {
            // The array has one slot per connection index; slots of connections that were never opened
            // or were already removed are null and must be skipped.
            if (nonNull(client))
                client.forceClose();
        }
    }

    for (ConnectionKey connKey : clientFuts.keySet()) {
        if (!nodeId.equals(connKey.nodeId()))
            continue;

        GridFutureAdapter<GridCommunicationClient> fut = clientFuts.remove(connKey);

        // The future may have been removed concurrently; its result may be null as well.
        if (nonNull(fut))
            Optional.ofNullable(fut.get()).ifPresent(GridCommunicationClient::forceClose);
    }
}
/**
 * Removes all clients registered for the node and force-closes each of them.
 *
 * @param nodeId Node id.
 */
public void onNodeLeft(UUID nodeId) {
    GridCommunicationClient[] removed = clients.remove(nodeId);

    if (removed == null)
        return;

    for (GridCommunicationClient client : removed) {
        // Empty slots have nothing to close.
        if (client == null)
            continue;

        if (log.isDebugEnabled()) {
            log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId +
                ", client=" + client + ']');
        }

        client.forceClose();
    }
}
/**
* Returns the client array currently registered for the node. The array is the one stored in the pool
* (not a copy) and may contain {@code null} slots.
*
* @param id Id.
* @return Clients of the node or {@code null} if none are registered.
*/
public GridCommunicationClient[] clientFor(UUID id) {
return clients.get(id);
}
/**
* Clients entries.
*
* @return Entry set view of the node ID to clients map held by this pool.
*/
public Iterable<? extends Map.Entry<UUID, GridCommunicationClient[]>> entrySet() {
return clients.entrySet();
}
/**
* Removes the connect future registered for the key, but only if it is still mapped to the given future.
*
* @param connKey Connection key.
* @param fut Future.
*/
public void removeFut(ConnectionKey connKey, GridFutureAdapter<GridCommunicationClient> fut) {
clientFuts.remove(connKey, fut);
}
/**
* Returns the connect future registered for the key.
*
* @param connKey Connection key.
* @return Registered future or {@code null} if there is none.
*/
public GridFutureAdapter<GridCommunicationClient> getFut(ConnectionKey connKey) {
return clientFuts.get(connKey);
}
/**
* Registers the connect future for the key unless another future is already registered for it.
*
* @param key Key.
* @param fut Future.
* @return Previously registered future, or {@code null} if the given future has been registered.
*/
public GridFutureAdapter<GridCommunicationClient> putIfAbsentFut(
ConnectionKey key,
GridFutureAdapter<GridCommunicationClient> fut
) {
return clientFuts.putIfAbsent(key, fut);
}
/**
 * Close all connections of this instance. Clients stay registered in the pool; only their connections
 * are force-closed.
 */
public void forceClose() {
    for (GridCommunicationClient[] nodeClients : clients.values()) {
        for (GridCommunicationClient client : nodeClients) {
            // Skip slots of connections that are not open.
            if (client == null)
                continue;

            client.forceClose();
        }
    }
}
/**
 * Completes every registered connect future with the given error.
 *
 * @param err Err.
 */
public void completeFutures(IgniteClientDisconnectedCheckedException err) {
    clientFuts.values().forEach(pending -> pending.onDone(err));
}
/**
* Performs handshake in timeout-safe way.
* <p>
* A {@link HandshakeTimeoutObject} for the client is scheduled on the handshake timeout executor before the
* handshake starts. After the handshake call returns or throws, the timeout object is cancelled; if the
* cancellation fails, a handshake timeout exception is thrown from the {@code finally} block, which replaces
* any exception raised by the handshake itself.
*
* @param client Client.
* @param rmtNodeId Remote node.
* @param timeout Timeout for handshake, in milliseconds.
* @throws IgniteCheckedException If handshake failed or wasn't completed within timeout.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
private void safeShmemHandshake(
GridCommunicationClient client,
UUID rmtNodeId,
long timeout
) throws IgniteCheckedException {
HandshakeTimeoutObject obj = new HandshakeTimeoutObject(client);
// Arm the timeout task before the handshake starts.
handshakeTimeoutExecutorService.schedule(obj, timeout, TimeUnit.MILLISECONDS);
try {
client.doHandshake(new SHMemHandshakeClosure(log, rmtNodeId, clusterStateProvider, locNodeSupplier));
}
finally {
// NOTE(review): cancel() presumably returns false when the timeout task has already fired - confirm
// against HandshakeTimeoutObject.
if (!obj.cancel())
throw handshakeTimeoutException();
}
}
/**
* Replaces the metrics listener used by this pool.
*
* @param metricsLsnr New statistics; may be {@code null}.
*/
public void metricsListener(@Nullable TcpCommunicationMetricsListener metricsLsnr) {
this.metricsLsnr = metricsLsnr;
}
}