blob: 70e2013d1065a0dd427384cf9390a3d97b437cdf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.managers.discovery;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteProductVersion;
import org.jetbrains.annotations.Nullable;
/**
 * Snapshot of the discovery topology for a single {@link AffinityTopologyVersion}.
 * <p>
 * Node lists, the node map, the cluster state and the version information are assigned once in the constructor
 * and are returned by the accessors as-is, without defensive copies. The only state this class modifies after
 * construction is the set of alive node IDs: entries are removed from it by the {@code updateAlives} methods,
 * and all "alive" checks consult that set.
 */
public class DiscoCache {
    /**
     * Casts a baseline node to a cluster node. The cast is unchecked, so it must only be applied to elements
     * that are known to be {@link ClusterNode} instances.
     */
    private static final C1<BaselineNode, ClusterNode> BASELINE_TO_CLUSTER = new C1<BaselineNode, ClusterNode>() {
        @Override public ClusterNode apply(BaselineNode baselineNode) {
            return (ClusterNode)baselineNode;
        }
    };

    /** Cluster state. */
    private final DiscoveryDataClusterState state;

    /** Local node. */
    private final ClusterNode loc;

    /** Remote nodes. */
    private final List<ClusterNode> rmtNodes;

    /** All nodes. */
    private final List<ClusterNode> allNodes;

    /** All server nodes. */
    private final List<ClusterNode> srvNodes;

    /** Daemon nodes. */
    private final List<ClusterNode> daemonNodes;

    /** Baseline nodes or {@code null} if baseline was not set. */
    private final List<? extends BaselineNode> baselineNodes;

    /** All remote nodes with at least one cache configured. */
    @GridToStringInclude
    private final List<ClusterNode> rmtNodesWithCaches;

    /** Cache nodes by cache ID. */
    @GridToStringInclude
    private final Map<Integer, List<ClusterNode>> allCacheNodes;

    /** Affinity nodes by cache group ID. */
    @GridToStringInclude
    private final Map<Integer, List<ClusterNode>> cacheGrpAffNodes;

    /** Node map (node ID to node). */
    final Map<UUID, ClusterNode> nodeMap;

    /** IDs of alive nodes. Filled from the constructor argument; entries are removed by {@code updateAlives}. */
    final Set<UUID> alives = new GridConcurrentHashSet<>();

    /** Minimum {@link IgniteProductVersion} across all nodes including client nodes. */
    private final IgniteProductVersion minNodeVer;

    /** Minimum {@link IgniteProductVersion} across alive server nodes. */
    private final IgniteProductVersion minSrvNodeVer;

    /** Topology version. */
    private final AffinityTopologyVersion topVer;

    /** Node ID to compacted consistent ID mapping, may be {@code null}. */
    final Map<UUID, Short> nodeIdToConsIdx;

    /** Compacted consistent ID to node ID mapping, may be {@code null}. */
    final Map<Short, UUID> consIdxToNodeId;

    /** Accepts baseline nodes that are {@link ClusterNode}s and whose ID is in {@link #alives}. */
    private final P1<BaselineNode> aliveBaselineNodePred;

    /** Accepts cluster nodes whose ID is in {@link #alives}. */
    private final P1<ClusterNode> aliveNodePred;

    /** MVCC coordinator. */
    private final MvccCoordinator mvccCrd;

    /**
     * Stores the given collections by reference; only {@code alives0} is copied into a set owned by this
     * instance.
     *
     * @param topVer Topology version.
     * @param state Current cluster state.
     * @param loc Local node.
     * @param mvccCrd MVCC coordinator node.
     * @param rmtNodes Remote nodes.
     * @param allNodes All nodes.
     * @param srvNodes Server nodes.
     * @param daemonNodes Daemon nodes.
     * @param rmtNodesWithCaches Remote nodes with at least one cache configured.
     * @param baselineNodes Baseline nodes or {@code null} if baseline was not set.
     * @param allCacheNodes Cache nodes by cache ID.
     * @param cacheGrpAffNodes Affinity nodes by cache group ID.
     * @param nodeMap Node map.
     * @param alives0 Alive nodes.
     * @param nodeIdToConsIdx Node ID to consistent ID mapping.
     * @param consIdxToNodeId Consistent ID to node ID mapping.
     * @param minNodeVer Minimum node version.
     * @param minSrvNodeVer Minimum server node version.
     */
    DiscoCache(
        AffinityTopologyVersion topVer,
        DiscoveryDataClusterState state,
        ClusterNode loc,
        MvccCoordinator mvccCrd,
        List<ClusterNode> rmtNodes,
        List<ClusterNode> allNodes,
        List<ClusterNode> srvNodes,
        List<ClusterNode> daemonNodes,
        List<ClusterNode> rmtNodesWithCaches,
        @Nullable List<? extends BaselineNode> baselineNodes,
        Map<Integer, List<ClusterNode>> allCacheNodes,
        Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
        Map<UUID, ClusterNode> nodeMap,
        Set<UUID> alives0,
        @Nullable Map<UUID, Short> nodeIdToConsIdx,
        @Nullable Map<Short, UUID> consIdxToNodeId,
        IgniteProductVersion minNodeVer,
        IgniteProductVersion minSrvNodeVer
    ) {
        this.topVer = topVer;
        this.state = state;
        this.loc = loc;
        this.mvccCrd = mvccCrd;
        this.rmtNodes = rmtNodes;
        this.allNodes = allNodes;
        this.srvNodes = srvNodes;
        this.daemonNodes = daemonNodes;
        this.rmtNodesWithCaches = rmtNodesWithCaches;
        this.baselineNodes = baselineNodes;
        this.allCacheNodes = allCacheNodes;
        this.cacheGrpAffNodes = cacheGrpAffNodes;
        this.nodeMap = nodeMap;

        // Copy rather than share: this instance removes entries from its own set later on.
        this.alives.addAll(alives0);

        this.minNodeVer = minNodeVer;
        this.minSrvNodeVer = minSrvNodeVer;
        this.nodeIdToConsIdx = nodeIdToConsIdx;
        this.consIdxToNodeId = consIdxToNodeId;

        aliveBaselineNodePred = new P1<BaselineNode>() {
            @Override public boolean apply(BaselineNode node) {
                // Baseline nodes that are not ClusterNode instances are never reported as alive.
                return node instanceof ClusterNode && alives.contains(((ClusterNode)node).id());
            }
        };

        aliveNodePred = new P1<ClusterNode>() {
            @Override public boolean apply(ClusterNode node) {
                return alives.contains(node.id());
            }
        };
    }

    /**
     * @return Mvcc coordinator node.
     */
    @Nullable public MvccCoordinator mvccCoordinator() {
        return mvccCrd;
    }

    /**
     * @return Topology version.
     */
    public AffinityTopologyVersion version() {
        return topVer;
    }

    /**
     * @return Minimum node version.
     */
    public IgniteProductVersion minimumNodeVersion() {
        return minNodeVer;
    }

    /**
     * @return Minimum server node version.
     */
    public IgniteProductVersion minimumServerNodeVersion() {
        return minSrvNodeVer;
    }

    /**
     * @return Current cluster state.
     */
    public DiscoveryDataClusterState state() {
        return state;
    }

    /** @return Local node. */
    public ClusterNode localNode() {
        return loc;
    }

    /** @return Remote nodes. */
    public List<ClusterNode> remoteNodes() {
        return rmtNodes;
    }

    /**
     * Returns a collection of baseline nodes.
     *
     * @return A collection of baseline nodes or {@code null} if baseline topology was not set.
     */
    @Nullable public List<? extends BaselineNode> baselineNodes() {
        return baselineNodes;
    }

    /**
     * @param nodeId Node ID to check.
     * @return {@code True} if baseline is not set or the node is in the baseline topology.
     */
    public boolean baselineNode(UUID nodeId) {
        return nodeIdToConsIdx == null || nodeIdToConsIdx.containsKey(nodeId);
    }

    /** @return All nodes. */
    public List<ClusterNode> allNodes() {
        return allNodes;
    }

    /** @return Server nodes. */
    public List<ClusterNode> serverNodes() {
        return srvNodes;
    }

    /** @return Daemon nodes. */
    public List<ClusterNode> daemonNodes() {
        return daemonNodes;
    }

    /**
     * @return Consistent id map UUID -> Short (compacted consistent id), or {@code null} if no mapping was
     *      provided on construction.
     */
    @Nullable public Map<UUID, Short> consistentIdMap() {
        return nodeIdToConsIdx;
    }

    /**
     * @return Consistent id map Short (compacted consistent id) -> UUID, or {@code null} if no mapping was
     *      provided on construction.
     */
    @Nullable public Map<Short, UUID> nodeIdMap() {
        return consIdxToNodeId;
    }

    /**
     * Gets all alive remote nodes that have at least one cache configured.
     *
     * @return Collection of nodes.
     */
    public Collection<ClusterNode> remoteAliveNodesWithCaches() {
        return F.view(rmtNodesWithCaches, aliveNodePred);
    }

    /**
     * Gets collection of server nodes that are currently in the alive set.
     *
     * @return Collection of nodes.
     */
    public Collection<ClusterNode> aliveServerNodes() {
        return F.view(serverNodes(), aliveNodePred);
    }

    /**
     * Returns a collection of live baseline nodes.
     *
     * @return A view of baseline nodes that are currently present in the cluster or {@code null} if baseline
     *      topology was not set.
     */
    @Nullable public Collection<ClusterNode> aliveBaselineNodes() {
        if (baselineNodes == null)
            return null;

        // NOTE(review): the cast in BASELINE_TO_CLUSTER is only safe if the view applies the predicate
        // before the transformer - confirm against F.viewReadOnly.
        return F.viewReadOnly(baselineNodes, BASELINE_TO_CLUSTER, aliveBaselineNodePred);
    }

    /**
     * @param node Node to check.
     * @return {@code True} if the node is in baseline or if baseline is not set.
     */
    public boolean baselineNode(ClusterNode node) {
        return nodeIdToConsIdx == null || nodeIdToConsIdx.get(node.id()) != null;
    }

    /**
     * Returns the first node of the server node list whose ID is in the alive set.
     *
     * @return Oldest alive server node or {@code null} if no server node is alive.
     */
    @SuppressWarnings("ForLoopReplaceableByForEach")
    @Nullable public ClusterNode oldestAliveServerNode() {
        // Avoid iterator allocation.
        for (int i = 0; i < srvNodes.size(); i++) {
            ClusterNode srv = srvNodes.get(i);

            if (alives.contains(srv.id()))
                return srv;
        }

        return null;
    }

    /**
     * Returns the first node of the server node list, regardless of whether it is alive.
     *
     * @return Oldest server node or {@code null} if there are no server nodes.
     */
    @Nullable public ClusterNode oldestServerNode() {
        return srvNodes.isEmpty() ? null : srvNodes.get(0);
    }

    /**
     * @param nodeId Node ID.
     * @return {@code True} if node is in alives list.
     */
    public boolean alive(UUID nodeId) {
        return alives.contains(nodeId);
    }

    /**
     * Gets all nodes that have cache with given name.
     *
     * @param cacheName Cache name.
     * @return Collection of nodes, empty if there is no entry for the cache.
     */
    public List<ClusterNode> cacheNodes(@Nullable String cacheName) {
        return cacheNodes(CU.cacheId(cacheName));
    }

    /**
     * Gets all nodes that have cache with given ID.
     *
     * @param cacheId Cache ID.
     * @return Collection of nodes, empty if there is no entry for the cache.
     */
    public List<ClusterNode> cacheNodes(Integer cacheId) {
        return emptyIfNull(allCacheNodes.get(cacheId));
    }

    /**
     * @param grpId Cache group ID.
     * @return All nodes that participate in affinity calculation, empty if there is no entry for the group.
     */
    public List<ClusterNode> cacheGroupAffinityNodes(int grpId) {
        return emptyIfNull(cacheGrpAffNodes.get(grpId));
    }

    /**
     * @param id Node ID.
     * @return Node or {@code null} if the node map has no entry for the ID.
     */
    @Nullable public ClusterNode node(UUID id) {
        return nodeMap.get(id);
    }

    /**
     * Removes left node from alives lists.
     *
     * @param rmvd Removed node.
     */
    public void updateAlives(ClusterNode rmvd) {
        alives.remove(rmvd.id());
    }

    /**
     * Removes left nodes from cached alives lists.
     *
     * @param discovery Discovery manager.
     */
    public void updateAlives(GridDiscoveryManager discovery) {
        // NOTE(review): entries are removed while the set is being iterated; this relies on the concurrent
        // set implementation tolerating modification during iteration - confirm for GridConcurrentHashSet.
        for (UUID alive : alives) {
            if (!discovery.alive(alive))
                alives.remove(alive);
        }
    }

    /**
     * @param order Order.
     * @return Server node instance or {@code null} if no server node with the given order was found.
     */
    @Nullable public ClusterNode serverNodeByOrder(long order) {
        int idx = serverNodeBinarySearch(order);

        return idx >= 0 ? srvNodes.get(idx) : null;
    }

    /**
     * Binary search over the server node list by node order. The result is only meaningful if the list is
     * sorted by ascending order; this class does not verify that.
     *
     * @param order Node order.
     * @return Node index if found, otherwise {@code -(insertion point + 1)}.
     */
    private int serverNodeBinarySearch(long order) {
        int low = 0;
        int high = srvNodes.size() - 1;

        while (low <= high) {
            // Unsigned shift keeps the midpoint correct even if low + high overflows.
            int mid = (low + high) >>> 1;

            ClusterNode midVal = srvNodes.get(mid);

            int cmp = Long.compare(midVal.order(), order);

            if (cmp < 0)
                low = mid + 1;
            else if (cmp > 0)
                high = mid - 1;
            else
                return mid;
        }

        return -(low + 1);
    }

    /**
     * Returns {@code True} if all nodes have the given attribute and its value equals to {@code expVal}.
     *
     * @param <T> Attribute Type.
     * @param name Attribute name.
     * @param expVal Expected value. A {@code null} expected value never matches a node.
     * @return {@code True} if every node in the all-nodes list has the given attribute and its value equals to
     *      {@code expVal}; also {@code true} if that list is empty.
     */
    public <T> boolean checkAttribute(String name, T expVal) {
        for (ClusterNode node : allNodes) {
            T attr = node.attribute(name);

            // Guard against null expVal: a present attribute cannot equal null, so fail instead of throwing NPE.
            if (attr == null || expVal == null || !expVal.equals(attr))
                return false;
        }

        return true;
    }

    /**
     * @param nodes Cluster nodes.
     * @return The given list, or an empty list if it is {@code null}.
     */
    private List<ClusterNode> emptyIfNull(List<ClusterNode> nodes) {
        return nodes == null ? Collections.<ClusterNode>emptyList() : nodes;
    }

    /**
     * Creates a new instance that shares this instance's node collections and mappings and starts with a copy
     * of the current alive set.
     *
     * @param ver Topology version.
     * @param state Not {@code null} state if need override state, otherwise current state is used.
     * @return Copy of discovery cache with new version.
     */
    public DiscoCache copy(AffinityTopologyVersion ver, @Nullable DiscoveryDataClusterState state) {
        return new DiscoCache(
            ver,
            state == null ? this.state : state,
            loc,
            mvccCrd,
            rmtNodes,
            allNodes,
            srvNodes,
            daemonNodes,
            rmtNodesWithCaches,
            baselineNodes,
            allCacheNodes,
            cacheGrpAffNodes,
            nodeMap,
            alives,
            nodeIdToConsIdx,
            consIdxToNodeId,
            minNodeVer,
            minSrvNodeVer);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(DiscoCache.class, this);
    }
}