| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache; |
| |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.UUID; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.binary.BinaryObject; |
| import org.apache.ignite.cache.affinity.AffinityFunction; |
| import org.apache.ignite.cache.affinity.AffinityKeyMapper; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.processors.affinity.AffinityAssignment; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.jetbrains.annotations.Nullable; |
| |
| /** |
| * Cache affinity manager. |
| */ |
| public class GridCacheAffinityManager extends GridCacheManagerAdapter { |
| /** */ |
| private static final AffinityTopologyVersion LOC_CACHE_TOP_VER = new AffinityTopologyVersion(1); |
| |
| /** */ |
| private static final String FAILED_TO_FIND_CACHE_ERR_MSG = "Failed to find cache (cache was not started " + |
| "yet or cache was already stopped): "; |
| |
| /** Affinity cached function. */ |
| private GridAffinityAssignmentCache aff; |
| |
| /** */ |
| private AffinityFunction affFunction; |
| |
| /** */ |
| private AffinityKeyMapper affMapper; |
| |
| /** {@inheritDoc} */ |
| @Override public void start0() throws IgniteCheckedException { |
| affFunction = cctx.config().getAffinity(); |
| affMapper = cctx.config().getAffinityMapper(); |
| |
| aff = cctx.group().affinity(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void onKernalStart0() throws IgniteCheckedException { |
| if (cctx.isLocal()) |
| // No discovery event needed for local affinity. |
| aff.calculate(LOC_CACHE_TOP_VER, null, null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void stop0(boolean cancel, boolean destroy) { |
| aff = null; |
| } |
| |
| /** |
| * Gets affinity ready future, a future that will be completed after affinity with given |
| * topology version is calculated. |
| * |
| * @param topVer Topology version to wait. |
| * @return Affinity ready future. |
| */ |
| public IgniteInternalFuture<AffinityTopologyVersion> affinityReadyFuture(long topVer) { |
| return affinityReadyFuture(new AffinityTopologyVersion(topVer)); |
| } |
| |
| /** |
| * Gets affinity ready future, a future that will be completed after affinity with given |
| * topology version is calculated. |
| * |
| * @param topVer Topology version to wait. |
| * @return Affinity ready future. |
| */ |
| public IgniteInternalFuture<AffinityTopologyVersion> affinityReadyFuture(AffinityTopologyVersion topVer) { |
| assert !cctx.isLocal(); |
| |
| IgniteInternalFuture<AffinityTopologyVersion> fut = aff.readyFuture(topVer); |
| |
| return fut != null ? fut : new GridFinishedFuture<>(aff.lastVersion()); |
| } |
| |
| /** |
| * Gets affinity ready future that will be completed after affinity with given topology version is calculated. |
| * Will return {@code null} if topology with given version is ready by the moment method is invoked. |
| * |
| * @param topVer Topology version to wait. |
| * @return Affinity ready future or {@code null}. |
| */ |
| @Nullable public IgniteInternalFuture<AffinityTopologyVersion> affinityReadyFuturex(AffinityTopologyVersion topVer) { |
| assert !cctx.isLocal(); |
| |
| return aff.readyFuture(topVer); |
| } |
| |
| /** |
| * @param topVer Topology version. |
| * @return Affinity assignments. |
| */ |
| public List<List<ClusterNode>> assignments(AffinityTopologyVersion topVer) { |
| if (cctx.isLocal()) |
| topVer = LOC_CACHE_TOP_VER; |
| |
| GridAffinityAssignmentCache aff0 = aff; |
| |
| if (aff0 == null) |
| throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name()); |
| |
| return aff0.assignments(topVer); |
| } |
| |
| /** |
| * @return Assignment. |
| */ |
| public List<List<ClusterNode>> idealAssignment() { |
| assert !cctx.isLocal(); |
| |
| return aff.idealAssignmentRaw(); |
| } |
| |
| /** |
| * @return Partition count. |
| */ |
| public int partitions() { |
| GridAffinityAssignmentCache aff0 = aff; |
| |
| if (aff0 == null) |
| throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name()); |
| |
| return aff0.partitions(); |
| } |
| |
| /** |
| * @param key Key. |
| * @return Partition. |
| */ |
| public int partition(Object key) { |
| return partition(key, true); |
| } |
| |
| /** |
| * NOTE: Use this method always when you need to calculate partition id for |
| * a key provided by user. It's required since we should apply affinity mapper |
| * logic in order to find a key that will eventually be passed to affinity function. |
| * |
| * @param key Key. |
| * @param useKeyPart If {@code true} can use pre-calculated partition stored in KeyCacheObject. |
| * @return Partition. |
| */ |
| public int partition(Object key, boolean useKeyPart) { |
| GridAffinityAssignmentCache aff0 = aff; |
| |
| if (aff0 == null) |
| throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name()); |
| |
| if (useKeyPart && (key instanceof KeyCacheObject)) { |
| int part = ((KeyCacheObject)key).partition(); |
| |
| if (part != -1) |
| return part; |
| } |
| |
| return affFunction.partition(affinityKey(key)); |
| } |
| |
| /** |
| * If Key is {@link GridCacheInternal GridCacheInternal} entry when won't passed into user's mapper and |
| * will use {@link GridCacheDefaultAffinityKeyMapper default}. |
| * |
| * @param key Key. |
| * @return Affinity key. |
| */ |
| public Object affinityKey(Object key) { |
| CacheObjectContext coCtx = cctx.cacheObjectContext(); |
| |
| if (key instanceof CacheObject && !(key instanceof BinaryObject)) |
| key = ((CacheObject)key).value(coCtx, false); |
| |
| return (key instanceof GridCacheInternal ? coCtx.defaultAffMapper() : affMapper).affinityKey(key); |
| } |
| |
| /** |
| * @param key Key. |
| * @param topVer Topology version. |
| * @return Affinity nodes. |
| */ |
| public List<ClusterNode> nodesByKey(Object key, AffinityTopologyVersion topVer) { |
| return nodesByPartition(partition(key), topVer); |
| } |
| |
| /** |
| * @param part Partition. |
| * @param topVer Topology version. |
| * @return Affinity nodes. |
| */ |
| public List<ClusterNode> nodesByPartition(int part, AffinityTopologyVersion topVer) { |
| if (cctx.isLocal()) |
| topVer = LOC_CACHE_TOP_VER; |
| |
| GridAffinityAssignmentCache aff0 = aff; |
| |
| if (aff0 == null) |
| throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name()); |
| |
| return aff0.nodes(part, topVer); |
| } |
| |
| /** |
| * Get affinity assignment for the given topology version. |
| * |
| * @param topVer Topology version. |
| * @return Affinity assignment. |
| */ |
| public AffinityAssignment assignment(AffinityTopologyVersion topVer) { |
| return assignment(topVer, cctx.shared().exchange().lastAffinityChangedTopologyVersion(topVer)); |
| } |
| |
| /** |
| * Get affinity assignment for the given topology version. |
| * |
| * @param topVer Topology version. |
| * @return Affinity assignment. |
| */ |
| public AffinityAssignment assignment(AffinityTopologyVersion topVer, AffinityTopologyVersion lastAffChangedTopVer) { |
| if (cctx.isLocal()) |
| topVer = lastAffChangedTopVer = LOC_CACHE_TOP_VER; |
| |
| GridAffinityAssignmentCache aff0 = aff; |
| |
| if (aff0 == null) |
| throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name()); |
| |
| return aff0.cachedAffinity(topVer, lastAffChangedTopVer); |
| } |
| |
| /** |
| * @param key Key to check. |
| * @param topVer Topology version. |
| * @return Primary node for given key. |
| */ |
| @Nullable public ClusterNode primaryByKey(Object key, AffinityTopologyVersion topVer) { |
| return primaryByPartition(partition(key), topVer); |
| } |
| |
| /** |
| * @param part Partition. |
| * @param topVer Topology version. |
| * @return Primary node for given key. |
| */ |
| @Nullable public ClusterNode primaryByPartition(int part, AffinityTopologyVersion topVer) { |
| List<ClusterNode> nodes = nodesByPartition(part, topVer); |
| |
| if (nodes.isEmpty()) |
| return null; |
| |
| return nodes.get(0); |
| } |
| |
| /** |
| * @param n Node to check. |
| * @param key Key to check. |
| * @param topVer Topology version. |
| * @return {@code True} if checked node is primary for given key. |
| */ |
| public boolean primaryByKey(ClusterNode n, Object key, AffinityTopologyVersion topVer) { |
| return F.eq(primaryByKey(key, topVer), n); |
| } |
| |
| /** |
| * @param n Node to check. |
| * @param part Partition. |
| * @param topVer Topology version. |
| * @return {@code True} if checked node is primary for given partition. |
| */ |
| public boolean primaryByPartition(ClusterNode n, int part, AffinityTopologyVersion topVer) { |
| return F.eq(primaryByPartition(part, topVer), n); |
| } |
| |
| /** |
| * @param key Key to check. |
| * @param topVer Topology version. |
| * @return Backup nodes. |
| */ |
| public Collection<ClusterNode> backupsByKey(Object key, AffinityTopologyVersion topVer) { |
| return backupsByPartition(partition(key), topVer); |
| } |
| |
| /** |
| * @param part Partition. |
| * @param topVer Topology version. |
| * @return Backup nodes. |
| */ |
| private Collection<ClusterNode> backupsByPartition(int part, AffinityTopologyVersion topVer) { |
| List<ClusterNode> nodes = nodesByPartition(part, topVer); |
| |
| assert !F.isEmpty(nodes); |
| |
| if (nodes.size() == 1) |
| return Collections.emptyList(); |
| |
| return F.view(nodes, F.notEqualTo(nodes.get(0))); |
| } |
| |
| /** |
| * @param n Node to check. |
| * @param part Partition. |
| * @param topVer Topology version. |
| * @return {@code True} if checked node is a backup node for given partition. |
| */ |
| public boolean backupByPartition(ClusterNode n, int part, AffinityTopologyVersion topVer) { |
| List<ClusterNode> nodes = nodesByPartition(part, topVer); |
| |
| assert !F.isEmpty(nodes); |
| |
| return nodes.indexOf(n) > 0; |
| } |
| |
| /** |
| * @param key Key to check. |
| * @param topVer Topology version. |
| * @return {@code true} if given key belongs to local node. |
| */ |
| public boolean keyLocalNode(Object key, AffinityTopologyVersion topVer) { |
| return partitionLocalNode(partition(key), topVer); |
| } |
| |
| /** |
| * @param part Partition number to check. |
| * @param topVer Topology version. |
| * @return {@code true} if given partition belongs to local node. |
| */ |
| public boolean partitionLocalNode(int part, AffinityTopologyVersion topVer) { |
| assert part >= 0 : "Invalid partition: " + part; |
| |
| return nodesByPartition(part, topVer).contains(cctx.localNode()); |
| } |
| |
| /** |
| * @param node Node. |
| * @param part Partition number to check. |
| * @param topVer Topology version. |
| * @return {@code true} if given partition belongs to specified node. |
| */ |
| public boolean partitionBelongs(ClusterNode node, int part, AffinityTopologyVersion topVer) { |
| assert node != null; |
| assert part >= 0 : "Invalid partition: " + part; |
| |
| return nodesByPartition(part, topVer).contains(node); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param topVer Topology version to calculate affinity. |
| * @return Partitions for which given node is primary. |
| */ |
| public Set<Integer> primaryPartitions(UUID nodeId, AffinityTopologyVersion topVer) { |
| if (cctx.isLocal()) |
| topVer = LOC_CACHE_TOP_VER; |
| |
| GridAffinityAssignmentCache aff0 = aff; |
| |
| if (aff0 == null) |
| throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name()); |
| |
| return aff0.primaryPartitions(nodeId, topVer); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param topVer Topology version to calculate affinity. |
| * @return Partitions for which given node is backup. |
| */ |
| public Set<Integer> backupPartitions(UUID nodeId, AffinityTopologyVersion topVer) { |
| if (cctx.isLocal()) |
| topVer = LOC_CACHE_TOP_VER; |
| |
| GridAffinityAssignmentCache aff0 = aff; |
| |
| if (aff0 == null) |
| throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name()); |
| |
| return aff0.backupPartitions(nodeId, topVer); |
| } |
| |
| /** |
| * @return Affinity-ready topology version. |
| */ |
| public AffinityTopologyVersion affinityTopologyVersion() { |
| GridAffinityAssignmentCache aff0 = aff; |
| |
| if (aff0 == null) |
| throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name()); |
| |
| return aff0.lastVersion(); |
| } |
| |
| /** |
| * @param part Partition. |
| * @param startVer Start version. |
| * @param endVer End version. |
| * @return {@code True} if primary changed or required affinity version not found in history. |
| */ |
| public boolean primaryChanged(int part, AffinityTopologyVersion startVer, AffinityTopologyVersion endVer) { |
| assert !cctx.isLocal() : cctx.name(); |
| |
| GridAffinityAssignmentCache aff0 = aff; |
| |
| if (aff0 == null) |
| throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name()); |
| |
| return aff0.primaryChanged(part, startVer, endVer); |
| } |
| } |