| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package org.apache.ignite.internal.processors.cluster; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import org.apache.ignite.cluster.BaselineNode; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.internal.cluster.DetachedClusterNode; |
| import org.apache.ignite.internal.cluster.NodeOrderComparator; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| /** |
| * BaselineTopology represents a set of "database nodes" - nodes responsible for holding persistent-enabled caches |
| * and persisting their information to durable storage. |
| * |
| * Two major features BaselineTopology allows are: |
| * <ol> |
| * <li>Protection from conflicting updates.</li> |
| * <li>Cluster auto activation.</li> |
| * </ol> |
| * |
| * <h2>Protection from conflicting updates</h2> |
| * <p> |
| * Consider en example: there is a cluster of three nodes, A, B and C, all nodes store persistent data. |
| * |
| * Cluster history looks like this: |
| * [A,B,C] |
| * | \ |
| * (1)cluster segmentation, two parts get activated independently |
| * | | |
| * [A,B] [C] |
| * | | |
| * (2)updates to both parts |
| * |
| * After independent updates applied to both parts of cluster at point(2) node C should not be allowed to join |
| * [A,B] part. |
| * |
| * The following algorithm makes sure node C will never join [A,B] part back: |
| * <ol> |
| * <li> |
| * When the cluster is initially activated BaselineTopology is created; its property <b>branchingPntHash</b> |
| * is calculated based on consistent IDs of all nodes. |
| * </li> |
| * <li> |
| * At point(1) two parts are activated separately; BaselineTopology in each is updated: |
| * old <b>branchingPntHash</b> is moved to <b>branchingHistory</b> list; |
| * new one is calculated based on new set of nodes. |
| * </li> |
| * <li> |
| * If node C tries to join other part of cluster (e.g. after network connectivity repair) |
| * [A,B] cluster checks its <b>branchingPntHash</b> and <b>branchingHistory</b> |
| * to see if branching history of C's BaselineTopology has diverged from [A,B]. |
| * If divergence is detected [A,B] cluster refuses to let C into topology. |
| * </li> |
| * </ol> |
| * |
| * All new nodes joining a cluster should pass validation process before join; detailed algorithm of the process |
| * is described below. |
| * </p> |
| * |
| * <h3>Joining node validation algorithm</h3> |
| * Node joining a cluster firstly reads its local BaselineTopology from metastore and sends it to the cluster. |
| * |
| * <ol> |
| * <li> |
| * When BaselineTopology is set (e.g. on first activation) or recreated (e.g. with set-baseline command) |
| * its ID on all nodes is incremented by one. |
| * |
| * So when cluster receives a join request with BaselineTopology it firstly compares joining node BlT ID with |
| * local BlT ID. |
| * |
| * If joining node has a BaselineTopology with ID greater than one in cluster it means that BlT was changed |
| * more times there; therefore new node is not allowed to join the cluster. |
| * </li> |
| * <li> |
| * If user manually activates cluster when some of Baseline nodes are offline no new BlT is created. |
| * Instead current set of online nodes from BaselineTopology is used to update {@link BaselineTopology#branchingPntHash} |
| * property of current BaselineTopology. |
| * Old value of the property is moved to {@link BaselineTopology#branchingHist} list. |
| * |
| * If joining node and local BlT IDs are the same then cluster takes <b>branchingPntHash</b> of joining node |
| * and verifies that its local <b>branchingHist</b> contains that hash. |
| * |
| * If joining node hash is not presented in cluster branching history list |
| * it means that joining node was activated independently of currently running cluster; |
| * therefore new node is not allowed to join the cluster. |
| * |
| * If joining node hash is presented in the history, that it is safe to let the node join the cluster. |
| * </li> |
| * <li> |
| * When BaselineTopology is recreated (e.g. with set-baseline command) previous BaselineTopology is moved |
| * to BaselineHistory (consult source code of {@link GridClusterStateProcessor} for more details). |
| * |
| * If cluster sees that joining node BlT ID is less than cluster BlT ID it looks up for BaselineHistory item |
| * for new node ID. |
| * Having this BaselineHistory item cluster verifies that branching history of the item contains |
| * branching point hash of joining node |
| * (similar check as in the case above with only difference that joining node BlT is compared against |
| * BaselineHistory item instead of BaselineTopology). |
| * |
| * If new node branching point hash is found in the history than node is allowed to join; |
| * otherwise it is rejected. |
| * </li> |
| * </ol> |
| * |
| * <h2>Cluster auto activation</h2> |
| */ |
| public class BaselineTopology implements Serializable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Consistent ID comparator. */ |
| private static final Comparator<Object> CONSISTENT_ID_COMPARATOR = new Comparator<Object>() { |
| @Override public int compare(Object o1, Object o2) { |
| if (o1 instanceof Comparable && o2 instanceof Comparable && o1.getClass().equals(o2.getClass())) |
| return ((Comparable)o1).compareTo(o2); |
| |
| return o1.toString().compareTo(o2.toString()); |
| } |
| }; |
| |
| /** */ |
| private final int id; |
| |
| /** Key - node consistent ID, value - node attribute map. */ |
| private final Map<Object, Map<String, Object>> nodeMap; |
| |
| /** Compact ID to consistent ID mapping. */ |
| private final Map<Short, Object> compactIdMapping; |
| |
| /** Consistent ID to compact ID mapping. */ |
| private final Map<Object, Short> consistentIdMapping; |
| |
| /** */ |
| private BranchingPointType lastBranchingPointType; |
| |
| /** Branching point hash. */ |
| private long branchingPntHash; |
| |
| /** History of branching events. |
| * Each time when branching point hash changes previous value is added to this list. */ |
| private final List<Long> branchingHist; |
| |
| /** |
| * @param nodeMap Map of node consistent ID to it's attributes. |
| */ |
| private BaselineTopology(Map<Object, Map<String, Object>> nodeMap, int id) { |
| this.id = id; |
| |
| compactIdMapping = U.newHashMap(nodeMap.size()); |
| consistentIdMapping = U.newHashMap(nodeMap.size()); |
| |
| this.nodeMap = nodeMap; |
| |
| Set<Object> consistentIds = new TreeSet<>(CONSISTENT_ID_COMPARATOR); |
| |
| for (Object o : nodeMap.keySet()){ |
| branchingPntHash += (long)o.hashCode(); |
| |
| consistentIds.add(o); |
| } |
| |
| short compactId = 0; |
| |
| for (Object consistentId : consistentIds) { |
| compactIdMapping.put(compactId, consistentId); |
| |
| consistentIdMapping.put(consistentId, compactId++); |
| } |
| |
| lastBranchingPointType = BranchingPointType.NEW_BASELINE_TOPOLOGY; |
| |
| branchingHist = new ArrayList<>(); |
| |
| branchingHist.add(branchingPntHash); |
| } |
| |
| /** |
| * @return id of this BaselineTopology. |
| */ |
| public int id() { |
| return id; |
| } |
| |
| /** |
| * @return Set of consistent IDs. |
| */ |
| public Set<Object> consistentIds() { |
| return nodeMap.keySet(); |
| } |
| |
| /** |
| * @return Activation history. |
| */ |
| public List<Long> branchingHistory() { |
| return branchingHist; |
| } |
| |
| /** |
| * @return Compact IDs mapping. |
| */ |
| public Map<Short, Object> compactIdMapping() { |
| return compactIdMapping; |
| } |
| |
| /** |
| * @return Consistent IDs mapping. |
| */ |
| public Map<Object, Short> consistentIdMapping() { |
| return consistentIdMapping; |
| } |
| |
| /** |
| * @return Short consistent Id. |
| */ |
| public Short resolveShortConsistentId(Object constId){ |
| return consistentIdMapping.get(constId); |
| } |
| |
| /** |
| * @return Object consistent Id. |
| */ |
| public Object resolveConsistentId(Short constId){ |
| return compactIdMapping.get(constId); |
| } |
| |
| |
| /** |
| * @return Activation hash. |
| */ |
| public long branchingPointHash() { |
| return branchingPntHash; |
| } |
| |
| /** |
| * @param consId Node consistent ID. |
| * @return Node attributes map. |
| */ |
| public Map<String, Object> attributes(Object consId) { |
| return nodeMap.get(consId); |
| } |
| |
| /** |
| * |
| */ |
| public List<BaselineNode> currentBaseline() { |
| List<BaselineNode> res = new ArrayList<>(); |
| |
| for (Map.Entry<Object, Map<String, Object>> consIdAttrsEntry : nodeMap.entrySet()) |
| res.add(new DetachedClusterNode(consIdAttrsEntry.getKey(), consIdAttrsEntry.getValue())); |
| |
| return res; |
| } |
| |
| /** |
| * @param consId Consistent ID. |
| * @return Baseline node, if present in the baseline, or {@code null} if absent. |
| */ |
| public ClusterNode baselineNode(Object consId) { |
| Map<String, Object> attrs = nodeMap.get(consId); |
| |
| return attrs != null ? new DetachedClusterNode(consId, attrs) : null; |
| } |
| |
| /** |
| * @param aliveNodes Sorted list of currently alive nodes. |
| * @param nodeFilter Node filter. |
| * @return Sorted list of baseline topology nodes. |
| */ |
| public List<ClusterNode> createBaselineView( |
| List<ClusterNode> aliveNodes, |
| @Nullable IgnitePredicate<ClusterNode> nodeFilter) |
| { |
| List<ClusterNode> res = new ArrayList<>(nodeMap.size()); |
| |
| for (ClusterNode node : aliveNodes) { |
| if (nodeMap.containsKey(node.consistentId()) && (nodeFilter == null || CU.affinityNode(node, nodeFilter))) |
| res.add(node); |
| } |
| |
| assert res.size() <= nodeMap.size(); |
| |
| if (res.size() == nodeMap.size()) |
| return res; |
| |
| Map<Object, ClusterNode> consIdMap = new HashMap<>(); |
| |
| for (ClusterNode node : aliveNodes) { |
| if (nodeMap.containsKey(node.consistentId()) && (nodeFilter == null || CU.affinityNode(node, nodeFilter))) |
| consIdMap.put(node.consistentId(), node); |
| } |
| |
| for (Map.Entry<Object, Map<String, Object>> e : nodeMap.entrySet()) { |
| Object consId = e.getKey(); |
| |
| if (!consIdMap.containsKey(consId)) { |
| DetachedClusterNode node = new DetachedClusterNode(consId, e.getValue()); |
| |
| if (nodeFilter == null || CU.affinityNode(node, nodeFilter)) |
| consIdMap.put(consId, node); |
| } |
| } |
| |
| res = new ArrayList<>(); |
| |
| res.addAll(consIdMap.values()); |
| |
| Collections.sort(res, NodeOrderComparator.getInstance()); |
| |
| return res; |
| } |
| |
| /** |
| * @param presentedNodes Nodes present in cluster. |
| * @return {@code True} if current topology satisfies baseline. |
| */ |
| public boolean isSatisfied(@NotNull Collection<ClusterNode> presentedNodes) { |
| if (presentedNodes.size() < nodeMap.size()) |
| return false; |
| |
| Set<Object> presentedNodeIds = new HashSet<>(); |
| |
| for (ClusterNode node : presentedNodes) |
| presentedNodeIds.add(node.consistentId()); |
| |
| return presentedNodeIds.containsAll(nodeMap.keySet()); |
| } |
| |
| /** |
| * @return Size of the baseline topology. |
| */ |
| public int size() { |
| return nodeMap.size(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean equals(Object o) { |
| if (this == o) |
| return true; |
| if (o == null || getClass() != o.getClass()) |
| return false; |
| |
| BaselineTopology top = (BaselineTopology)o; |
| |
| return nodeMap != null ? nodeMap.keySet().equals(top.nodeMap.keySet()) : top.nodeMap == null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hashCode() { |
| return nodeMap != null ? nodeMap.hashCode() : 0; |
| } |
| |
| /** |
| * @param blt1 Baseline topology instance. |
| * @param blt2 Baseline topology instance. |
| * @return {@code True} if equals. |
| */ |
| public static boolean equals(BaselineTopology blt1, BaselineTopology blt2) { |
| if (blt1 == null && blt2 == null) |
| return true; |
| |
| if (blt1 == null ^ blt2 == null) |
| return false; |
| |
| return blt1.equals(blt2); |
| } |
| |
| /** |
| * @param nodes Nodes. |
| * @param id ID of BaselineTopology to build. |
| * @return Baseline topology consisting of given nodes. |
| */ |
| @Nullable public static BaselineTopology build(Collection<? extends BaselineNode> nodes, int id) { |
| if (nodes == null) |
| return null; |
| |
| Map<Object, Map<String, Object>> nodeMap = new HashMap<>(); |
| |
| for (BaselineNode node : nodes) |
| nodeMap.put(node.consistentId(), node.attributes()); |
| |
| return new BaselineTopology(nodeMap, id); |
| } |
| |
| /** |
| * @param blt BaselineTopology to check. |
| * @return {@code True} if current BaselineTopology is compatible (the same or a newer one) with passed in Blt. |
| */ |
| boolean isCompatibleWith(BaselineTopology blt) { |
| return blt == null || (branchingPntHash == blt.branchingPntHash) || branchingHist.contains(blt.branchingPntHash); |
| } |
| |
| /** |
| * @param nodes Nodes. |
| */ |
| boolean updateHistory(Collection<? extends BaselineNode> nodes) { |
| long newTopHash = calculateTopologyHash(nodes); |
| |
| lastBranchingPointType = BranchingPointType.CLUSTER_ACTIVATION; |
| |
| if (branchingPntHash != newTopHash) { |
| branchingPntHash = newTopHash; |
| |
| branchingHist.add(newTopHash); |
| |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Resets branching history of the current BaselineTopology. |
| * Current branching point hash and list of previous branching hashes are erased |
| * and replaced with the hash passed via method parameter. |
| * |
| * All nodes that were offline when reset happened (thus didn't reset history of their local BaselineTopologies) |
| * won't be allowed to join the cluster when started back. |
| * |
| * @param newBranchingPointHash New branching point hash. |
| */ |
| void resetBranchingHistory(long newBranchingPointHash) { |
| lastBranchingPointType = BranchingPointType.BRANCHING_HISTORY_RESET; |
| |
| branchingHist.clear(); |
| |
| branchingPntHash = newBranchingPointHash; |
| branchingHist.add(newBranchingPointHash); |
| } |
| |
| /** |
| * @param nodes Nodes. |
| */ |
| private long calculateTopologyHash(Collection<? extends BaselineNode> nodes) { |
| long res = 0; |
| |
| Set<Object> bltConsIds = nodeMap.keySet(); |
| |
| for (BaselineNode node : nodes) { |
| if (bltConsIds.contains(node.consistentId())) |
| res += (long) node.consistentId().hashCode(); |
| } |
| |
| return res; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return "BaselineTopology [id=" + id |
| + ", branchingHash=" + branchingPntHash |
| + ", branchingType='" + lastBranchingPointType + '\'' |
| + ", baselineNodes=" + nodeMap.keySet() + ']'; |
| } |
| } |