blob: 177dabb349505944941566b586a2af289e9650d8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.communication.tcp.internal;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
/**
* Tcp Communication Connection Check Future.
*/
public class TcpCommunicationConnectionCheckFuture
extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject, GridLocalEventListener {
/** Session future. */
public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
/** */
private static final AtomicIntegerFieldUpdater<SingleAddressConnectFuture> connFutDoneUpdater =
AtomicIntegerFieldUpdater.newUpdater(SingleAddressConnectFuture.class, "done");
/** */
private static final AtomicIntegerFieldUpdater<MultipleAddressesConnectFuture> connResCntUpdater =
AtomicIntegerFieldUpdater.newUpdater(MultipleAddressesConnectFuture.class, "resCnt");
/** */
private final AtomicInteger resCntr = new AtomicInteger();
/** */
private final List<ClusterNode> nodes;
/** */
private volatile ConnectFuture[] futs;
/** */
private final GridNioServer nioSrvr;
/** */
private final TcpCommunicationSpi spi;
/** */
private final IgniteUuid timeoutObjId = IgniteUuid.randomUuid();
/** */
private final BitSet resBitSet;
/** */
private long endTime;
/** */
private final IgniteLogger log;
/**
* @param spi SPI instance.
* @param log Logger.
* @param nioSrvr NIO server.
* @param nodes Nodes to check.
*/
public TcpCommunicationConnectionCheckFuture(TcpCommunicationSpi spi,
IgniteLogger log,
GridNioServer nioSrvr,
List<ClusterNode> nodes)
{
this.spi = spi;
this.log = log;
this.nioSrvr = nioSrvr;
this.nodes = nodes;
resBitSet = new BitSet(nodes.size());
}
/**
* @param timeout Connect timeout.
*/
public void init(long timeout) {
ConnectFuture[] futs = new ConnectFuture[nodes.size()];
UUID locId = spi.getSpiContext().localNode().id();
for (int i = 0; i < nodes.size(); i++) {
ClusterNode node = nodes.get(i);
if (!node.id().equals(locId)) {
if (spi.getSpiContext().node(node.id()) == null) {
receivedConnectionStatus(i, false);
continue;
}
Collection<InetSocketAddress> addrs;
try {
addrs = spi.nodeAddresses(node, false);
}
catch (Exception e) {
U.error(log, "Failed to get node addresses: " + node, e);
receivedConnectionStatus(i, false);
continue;
}
if (addrs.size() == 1) {
SingleAddressConnectFuture fut = new SingleAddressConnectFuture(i);
fut.init(addrs.iterator().next(), node.consistentId(), node.id());
futs[i] = fut;
}
else {
MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i);
fut.init(addrs, node.consistentId(), node.id());
futs[i] = fut;
}
}
else
receivedConnectionStatus(i, true);
}
this.futs = futs;
spi.getSpiContext().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED);
if (!isDone()) {
endTime = System.currentTimeMillis() + timeout;
spi.getSpiContext().addTimeoutObject(this);
}
}
/**
* @param idx Node index.
* @param res Success flag.
*/
private void receivedConnectionStatus(int idx, boolean res) {
assert resCntr.get() < nodes.size();
synchronized (resBitSet) {
resBitSet.set(idx, res);
}
if (resCntr.incrementAndGet() == nodes.size())
onDone(resBitSet);
}
/**
* @param nodeIdx Node index.
* @return Node ID.
*/
private UUID nodeId(int nodeIdx) {
return nodes.get(nodeIdx).id();
}
/** {@inheritDoc} */
@Override public IgniteUuid id() {
return timeoutObjId;
}
/** {@inheritDoc} */
@Override public long endTime() {
return endTime;
}
/** {@inheritDoc} */
@Override public void onEvent(Event evt) {
if (isDone())
return;
assert evt instanceof DiscoveryEvent : evt;
assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
for (int i = 0; i < nodes.size(); i++) {
if (nodes.get(i).id().equals(nodeId)) {
ConnectFuture fut = futs[i];
if (fut != null)
fut.onNodeFailed();
return;
}
}
}
/** {@inheritDoc} */
@Override public void onTimeout() {
if (isDone())
return;
ConnectFuture[] futs = this.futs;
for (int i = 0; i < futs.length; i++) {
ConnectFuture fut = futs[i];
if (fut != null)
fut.onTimeout();
}
}
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable BitSet res, @Nullable Throwable err) {
if (super.onDone(res, err)) {
spi.getSpiContext().removeTimeoutObject(this);
spi.getSpiContext().removeLocalEventListener(this);
return true;
}
return false;
}
/**
*
*/
private interface ConnectFuture {
/**
*
*/
void onTimeout();
/**
*
*/
void onNodeFailed();
}
/**
*
*/
private class SingleAddressConnectFuture implements TcpCommunicationNodeConnectionCheckFuture, ConnectFuture {
/** */
final int nodeIdx;
/** */
volatile int done;
/** */
Map<Integer, Object> sesMeta;
/** */
private SocketChannel ch;
/**
* @param nodeIdx Node index.
*/
SingleAddressConnectFuture(int nodeIdx) {
this.nodeIdx = nodeIdx;
}
/**
* @param addr Node address.
* @param consistentId Consistent if of the node.
* @param rmtNodeId Id of node to open connection check session with.
*/
public void init(InetSocketAddress addr, Object consistentId, UUID rmtNodeId) {
boolean connect;
try {
ch = SocketChannel.open();
ch.configureBlocking(false);
ch.socket().setTcpNoDelay(true);
ch.socket().setKeepAlive(false);
connect = ch.connect(addr);
}
catch (Exception e) {
finish(false);
return;
}
if (!connect) {
sesMeta = new GridLeanMap<>(3);
// Set dummy key to identify connection-check outgoing connection.
ConnectionKey connKey = new ConnectionKey(rmtNodeId, -1, -1, true);
sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, connKey);
sesMeta.put(TcpCommunicationSpi.CONSISTENT_ID_META, consistentId);
sesMeta.put(SES_FUT_META, this);
nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
@Override public void apply(IgniteInternalFuture<GridNioSession> fut) {
if (fut.error() != null)
finish(false);
}
});
}
}
/**
*
*/
@SuppressWarnings("unchecked")
void cancel() {
if (finish(false))
nioSrvr.cancelConnect(ch, sesMeta);
}
/** {@inheritDoc} */
@Override public void onTimeout() {
cancel();
}
/** {@inheritDoc} */
@Override public void onConnected(UUID rmtNodeId) {
finish(nodeId(nodeIdx).equals(rmtNodeId));
}
/** {@inheritDoc} */
@Override public void onNodeFailed() {
cancel();
}
/**
* @param res Result.
* @return {@code True} if result was set by this call.
*/
public boolean finish(boolean res) {
if (connFutDoneUpdater.compareAndSet(this, 0, 1)) {
onStatusReceived(res);
return true;
}
return false;
}
/**
* @param res Result.
*/
void onStatusReceived(boolean res) {
receivedConnectionStatus(nodeIdx, res);
}
}
/**
*
*/
private class MultipleAddressesConnectFuture implements ConnectFuture {
/** */
volatile int resCnt;
/** */
volatile SingleAddressConnectFuture[] futs;
/** */
final int nodeIdx;
/**
* @param nodeIdx Node index.
*/
MultipleAddressesConnectFuture(int nodeIdx) {
this.nodeIdx = nodeIdx;
}
/** {@inheritDoc} */
@Override public void onNodeFailed() {
SingleAddressConnectFuture[] futs = this.futs;
for (int i = 0; i < futs.length; i++) {
ConnectFuture fut = futs[i];
if (fut != null)
fut.onNodeFailed();
}
}
/** {@inheritDoc} */
@Override public void onTimeout() {
SingleAddressConnectFuture[] futs = this.futs;
for (int i = 0; i < futs.length; i++) {
ConnectFuture fut = futs[i];
if (fut != null)
fut.onTimeout();
}
}
/**
* @param addrs Node addresses.
* @param consistentId Consistent if of the node.
* @param rmtNodeId Id of node to open connection check session with.
*/
void init(Collection<InetSocketAddress> addrs, Object consistentId, UUID rmtNodeId) {
SingleAddressConnectFuture[] futs = new SingleAddressConnectFuture[addrs.size()];
for (int i = 0; i < addrs.size(); i++) {
SingleAddressConnectFuture fut = new SingleAddressConnectFuture(nodeIdx) {
@Override void onStatusReceived(boolean res) {
receivedAddressStatus(res);
}
};
futs[i] = fut;
}
this.futs = futs;
int idx = 0;
for (InetSocketAddress addr : addrs) {
futs[idx++].init(addr, consistentId, rmtNodeId);
if (resCnt == Integer.MAX_VALUE)
return;
}
// Close race.
if (done())
cancelFutures();
}
/**
* @return {@code True}
*/
private boolean done() {
int resCnt0 = resCnt;
return resCnt0 == Integer.MAX_VALUE || resCnt0 == futs.length;
}
/**
*
*/
private void cancelFutures() {
SingleAddressConnectFuture[] futs = this.futs;
if (futs != null) {
for (int i = 0; i < futs.length; i++) {
SingleAddressConnectFuture fut = futs[i];
fut.cancel();
}
}
}
/**
* @param res Result.
*/
void receivedAddressStatus(boolean res) {
if (res) {
for (;;) {
int resCnt0 = resCnt;
if (resCnt0 == Integer.MAX_VALUE)
return;
if (connResCntUpdater.compareAndSet(this, resCnt0, Integer.MAX_VALUE)) {
receivedConnectionStatus(nodeIdx, true);
cancelFutures(); // Cancel others connects if they are still in progress.
return;
}
}
}
else {
for (;;) {
int resCnt0 = resCnt;
if (resCnt0 == Integer.MAX_VALUE)
return;
int resCnt1 = resCnt0 + 1;
if (connResCntUpdater.compareAndSet(this, resCnt0, resCnt1)) {
if (resCnt1 == futs.length)
receivedConnectionStatus(nodeIdx, false);
return;
}
}
}
}
}
}