blob: d2a508126c9b663e04533df409fd505a95a2eeb5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.configuration;
import java.util.BitSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.cluster.graph.BitSetIterator;
import org.apache.ignite.internal.cluster.graph.ClusterGraph;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
* Default Communication Failure Resolver.
*/
public class DefaultCommunicationFailureResolver implements CommunicationFailureResolver {
    /** Maximum number of server node IDs printed in the resolution log message. */
    private static final int MAX_LOGGED_NODE_IDS = 1000;

    /** */
    @LoggerResource
    private IgniteLogger log;

    /**
     * Resolves a communication failure.
     * <p>
     * Finds the largest part of the cluster whose server nodes are all able to connect to each other, keeps it
     * (together with client nodes connected to every server node of that part) and kills all other nodes
     * via {@code ctx.killNode(...)}. Does nothing if no alive server node was found or if all server nodes
     * are already fully connected.
     */
    @Override public void resolve(CommunicationFailureContext ctx) {
        ClusterPart largestCluster = findLargestConnectedCluster(ctx);

        if (largestCluster == null)
            return;

        if (log.isInfoEnabled()) {
            log.info("Communication problem resolver found fully connected independent cluster ["
                + "serverNodesCnt=" + largestCluster.srvNodesCnt + ", "
                + "clientNodesCnt=" + largestCluster.connectedClients.size() + ", "
                + "totalAliveNodes=" + ctx.topologySnapshot().size() + ", "
                + "serverNodesIds=" + clusterNodeIds(largestCluster.srvNodesSet, ctx.topologySnapshot(),
                MAX_LOGGED_NODE_IDS) + "]");
        }

        keepCluster(ctx, largestCluster);
    }

    /**
     * Finds largest part of the cluster where each node is able to connect to each other.
     *
     * @param ctx Communication failure context.
     * @return Largest part of the cluster nodes to keep, or {@code null} if there is nothing to resolve
     *      (no alive server nodes, or all alive server nodes are already fully connected).
     */
    @Nullable private ClusterPart findLargestConnectedCluster(CommunicationFailureContext ctx) {
        List<ClusterNode> srvNodes = ctx.topologySnapshot()
            .stream()
            .filter(node -> !node.isClient())
            .collect(Collectors.toList());

        // Exclude client nodes from analysis.
        ClusterGraph graph = new ClusterGraph(ctx, ClusterNode::isClient);

        List<BitSet> components = graph.findConnectedComponents();

        if (components.isEmpty()) {
            U.warn(log, "Unable to find at least one alive server node in the cluster " + ctx);

            return null;
        }

        if (components.size() == 1) {
            BitSet nodesSet = components.get(0);
            int nodeCnt = nodesSet.cardinality();

            boolean fullyConnected = graph.checkFullyConnected(nodesSet);

            if (fullyConnected && nodeCnt == srvNodes.size()) {
                U.warn(log, "All alive nodes are fully connected, this should be resolved automatically.");

                return null;
            }

            if (log.isInfoEnabled())
                log.info("Communication problem resolver detected partial lost for some connections inside cluster. "
                    + "Will keep largest set of healthy fully-connected nodes. Other nodes will be killed forcibly.");

            BitSet fullyConnectedPart = graph.findLargestFullyConnectedComponent(nodesSet);
            Set<ClusterNode> connectedClients = findConnectedClients(ctx, fullyConnectedPart);

            return new ClusterPart(fullyConnectedPart, connectedClients);
        }

        // If the cluster has split into several parts and at least 2 of those parts contain more than one
        // node, a split brain has happened. Note: BitSet.size() is the storage capacity in bits, so the
        // number of nodes in a component must be taken from cardinality().
        boolean isSplitBrain = components.size() > 1 &&
            components.stream().filter(cmp -> cmp.cardinality() > 1).count() > 1;

        if (isSplitBrain)
            U.warn(log, "Communication problem resolver detected split brain. "
                + "Cluster has splitted on " + components.size() + " independent parts. "
                + "Will keep only one largest fully-connected part. "
                + "Other nodes will be killed forcibly.");
        else
            U.warn(log, "Communication problem resolver detected full lost for some connections inside cluster. "
                + "Problem nodes will be found and killed forcibly.");

        // For each part of the split cluster extract its largest fully-connected component
        // and keep the best one (more server nodes first, then more connected clients).
        ClusterPart largestCluster = null;

        for (BitSet clusterPart : components) {
            BitSet fullyConnectedPart = graph.findLargestFullyConnectedComponent(clusterPart);
            Set<ClusterNode> connectedClients = findConnectedClients(ctx, fullyConnectedPart);

            ClusterPart curr = new ClusterPart(fullyConnectedPart, connectedClients);

            if (largestCluster == null || curr.compareTo(largestCluster) > 0)
                largestCluster = curr;
        }

        assert largestCluster != null
            : "Unable to find at least one alive independent cluster.";

        return largestCluster;
    }

    /**
     * Keeps server cluster nodes present in given {@code clusterPart.srvNodesSet}.
     * Client nodes which have connections to all of those server nodes are also kept.
     * Other nodes are killed forcibly: server nodes first, then client nodes.
     * <p>
     * Bit indices in {@code srvNodesSet} are treated as positions in {@code ctx.topologySnapshot()}
     * (NOTE(review): assumes {@code ClusterGraph} indexes nodes by topology snapshot position — confirm).
     *
     * @param ctx Communication failure context.
     * @param clusterPart Set of nodes that need to be kept in the cluster.
     */
    private void keepCluster(CommunicationFailureContext ctx, ClusterPart clusterPart) {
        List<ClusterNode> allNodes = ctx.topologySnapshot();

        // Kill server nodes.
        for (int idx = 0; idx < allNodes.size(); idx++) {
            ClusterNode node = allNodes.get(idx);

            // Client nodes will be processed separately.
            if (node.isClient())
                continue;

            if (!clusterPart.srvNodesSet.get(idx))
                ctx.killNode(node);
        }

        // Kill client nodes unable to connect to the kept part of the cluster.
        for (ClusterNode node : allNodes) {
            if (node.isClient() && !clusterPart.connectedClients.contains(node))
                ctx.killNode(node);
        }
    }

    /**
     * Finds set of the client nodes which are able to connect (in both directions) to every server node
     * of the given set {@code srvNodesSet}.
     *
     * @param ctx Communication failure context.
     * @param srvNodesSet Server nodes set (bit indices are positions in {@code ctx.topologySnapshot()}).
     * @return Set of client nodes.
     */
    private Set<ClusterNode> findConnectedClients(CommunicationFailureContext ctx, BitSet srvNodesSet) {
        Set<ClusterNode> connectedClients = new HashSet<>();

        List<ClusterNode> allNodes = ctx.topologySnapshot();

        for (ClusterNode node : allNodes) {
            if (!node.isClient())
                continue;

            boolean hasConnections = true;

            Iterator<Integer> it = new BitSetIterator(srvNodesSet);

            while (it.hasNext()) {
                int srvNodeIdx = it.next();
                ClusterNode srvNode = allNodes.get(srvNodeIdx);

                // Connectivity is required in both directions.
                if (!ctx.connectionAvailable(node, srvNode) || !ctx.connectionAvailable(srvNode, node)) {
                    hasConnections = false;

                    break;
                }
            }

            if (hasConnections)
                connectedClients.add(node);
        }

        return connectedClients;
    }

    /**
     * Class representing part of cluster.
     * <p>
     * Ordering: a part with more server nodes is greater; on a tie, the part with more connected
     * clients is greater.
     */
    private static class ClusterPart implements Comparable<ClusterPart> {
        /** Server nodes count. */
        final int srvNodesCnt;

        /** Server nodes set. */
        final BitSet srvNodesSet;

        /** Set of client nodes that are able to connect to every server node of this part. */
        final Set<ClusterNode> connectedClients;

        /**
         * Constructor.
         *
         * @param srvNodesSet Server nodes set.
         * @param connectedClients Set of client nodes.
         */
        public ClusterPart(BitSet srvNodesSet, Set<ClusterNode> connectedClients) {
            this.srvNodesSet = srvNodesSet;
            this.srvNodesCnt = srvNodesSet.cardinality();
            this.connectedClients = connectedClients;
        }

        /** {@inheritDoc} */
        @Override public int compareTo(@NotNull ClusterPart o) {
            int srvNodesCmp = Integer.compare(srvNodesCnt, o.srvNodesCnt);

            if (srvNodesCmp != 0)
                return srvNodesCmp;

            return Integer.compare(connectedClients.size(), o.connectedClients.size());
        }
    }

    /**
     * Builds a bracketed, comma-separated string of the IDs of the nodes whose positions are set in
     * {@code cluster}. At most {@code limit} IDs are printed; if more exist, the list ends with {@code "..."}.
     *
     * @param cluster Cluster nodes mask (bit indices are positions in {@code nodes}).
     * @param nodes Nodes.
     * @param limit Maximum number of IDs to print.
     * @return Cluster node IDs string, e.g. {@code "[id1, id2, ...]"}; {@code "[]"} for an empty mask.
     */
    private static String clusterNodeIds(BitSet cluster, List<ClusterNode> nodes, int limit) {
        StringBuilder builder = new StringBuilder("[");

        int cnt = 0;

        for (int idx = cluster.nextSetBit(0); idx >= 0; idx = cluster.nextSetBit(idx + 1)) {
            if (cnt > 0)
                builder.append(", ");

            // Truncate once the limit is reached instead of printing every remaining ID.
            if (cnt >= limit) {
                builder.append("...");

                break;
            }

            builder.append(nodes.get(idx).id());

            cnt++;
        }

        return builder.append(']').toString();
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(DefaultCommunicationFailureResolver.class, this);
    }
}