blob: caddc85d9b63dee8d2d483449d301a7e1f2e5448 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.affinity;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
import static org.apache.ignite.internal.processors.affinity.GridAffinityUtils.affinityJob;
import static org.apache.ignite.internal.processors.affinity.GridAffinityUtils.unmarshall;
/**
* Data affinity processor.
*/
public class GridAffinityProcessor extends GridProcessorAdapter {
/** Affinity map cleanup delay (ms). */
private static final long AFFINITY_MAP_CLEAN_UP_DELAY = 3000;
/** Log. */
private final IgniteLogger log;
/** Affinity map. */
private final ConcurrentSkipListMap<AffinityAssignmentKey, IgniteInternalFuture<AffinityInfo>> affMap = new ConcurrentSkipListMap<>();
/** Listener. */
private final GridLocalEventListener lsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
int evtType = evt.type();
assert evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT;
if (affMap.isEmpty())
return; // Skip empty affinity map.
final DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
// Clean up affinity functions if such cache no more exists.
final Collection<String> caches = ctx.cache().cacheNames();
final Collection<AffinityAssignmentKey> rmv = new HashSet<>();
for (AffinityAssignmentKey key : affMap.keySet()) {
if (!caches.contains(key.cacheName) || key.topVer.topologyVersion() < discoEvt.topologyVersion() - 10)
rmv.add(key);
}
if (!rmv.isEmpty()) {
ctx.timeout().addTimeoutObject(
new GridTimeoutObjectAdapter(
IgniteUuid.fromUuid(ctx.localNodeId()),
AFFINITY_MAP_CLEAN_UP_DELAY) {
@Override public void onTimeout() {
affMap.keySet().removeAll(rmv);
}
});
}
}
};
/**
* @param ctx Context.
*/
public GridAffinityProcessor(GridKernalContext ctx) {
super(ctx);
log = ctx.log(GridAffinityProcessor.class);
}
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
ctx.event().addLocalEventListener(lsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
}
/** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
ctx.event().removeLocalEventListener(lsnr);
}
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
affMap.clear();
}
/**
* @param cacheName Cache name.
* @param key Key.
* @return Key partition.
* @throws IgniteCheckedException If failed.
*/
public int partition(String cacheName, Object key) throws IgniteCheckedException {
return partition(cacheName, key, null);
}
/**
* @param cacheName Cache name (needed only if {@code aff} is not provided.
* @param key Key.
* @param aff Affinity information.
* @return Key partition.
* @throws IgniteCheckedException If failed.
*/
public int partition(String cacheName, Object key, @Nullable AffinityInfo aff) throws IgniteCheckedException {
assert cacheName != null;
if (key instanceof KeyCacheObject) {
int part = ((KeyCacheObject)key).partition();
if (part >= 0)
return part;
}
return partition0(cacheName, key, aff);
}
/**
* @param cacheName Cache name (needed only if {@code aff} is not provided.
* @param key Key.
* @param aff Affinity.
* @return Key partition.
* @throws IgniteCheckedException If failed.
*/
public int partition0(String cacheName, Object key, @Nullable AffinityInfo aff) throws IgniteCheckedException {
assert cacheName != null;
if (aff == null) {
aff = affinityCache(cacheName);
if (aff == null)
throw new IgniteCheckedException("Failed to get cache affinity (cache was not started " +
"yet or cache was already stopped): " + cacheName);
}
return aff.affFunc.partition(aff.affinityKey(key));
}
/**
* Maps partition to a node.
*
* @param cacheName Cache name.
* @param partId partition.
* @param topVer Affinity topology version.
* @return Picked node.
* @throws IgniteCheckedException If failed.
*/
@Nullable public ClusterNode mapPartitionToNode(String cacheName, int partId, AffinityTopologyVersion topVer)
throws IgniteCheckedException {
assert cacheName != null;
AffinityInfo affInfo = affinityCache(cacheName, topVer);
return affInfo != null ? F.first(affInfo.assignment().get(partId)) : null;
}
/**
* Removes cached affinity instances with affinity topology versions less than {@code topVer}.
*
* @param topVer topology version.
*/
public void removeCachedAffinity(AffinityTopologyVersion topVer) {
assert topVer != null;
int oldSize = affMap.size();
Iterator<Map.Entry<AffinityAssignmentKey, IgniteInternalFuture<AffinityInfo>>> it =
affMap.headMap(new AffinityAssignmentKey(topVer)).entrySet().iterator();
while (it.hasNext()) {
Map.Entry<AffinityAssignmentKey, IgniteInternalFuture<AffinityInfo>> entry = it.next();
assert entry.getValue() != null;
if (!entry.getValue().isDone())
continue;
it.remove();
}
if (log.isDebugEnabled())
log.debug("Affinity cached values were cleared: " + (oldSize - affMap.size()));
}
/**
* Maps keys to nodes for given cache.
*
* @param cacheName Cache name.
* @param keys Keys to map.
* @return Map of nodes to keys.
* @throws IgniteCheckedException If failed.
*/
public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(String cacheName, @Nullable Collection<? extends K> keys)
throws IgniteCheckedException {
assert cacheName != null;
return keysToNodes(cacheName, keys);
}
/**
* Maps single key to a node.
*
* @param cacheName Cache name.
* @param key Key to map.
* @return Picked node.
* @throws IgniteCheckedException If failed.
*/
@Nullable public <K> ClusterNode mapKeyToNode(String cacheName, K key) throws IgniteCheckedException {
assert cacheName != null;
Map<ClusterNode, Collection<K>> map = keysToNodes(cacheName, F.asList(key));
return !F.isEmpty(map) ? F.first(map.keySet()) : null;
}
/**
* Maps single key to a node.
*
* @param cacheName Cache name.
* @param key Key to map.
* @return Picked node.
* @throws IgniteCheckedException If failed.
*/
@Nullable public <K> ClusterNode mapKeyToNode(String cacheName, K key, AffinityTopologyVersion topVer)
throws IgniteCheckedException {
assert cacheName != null;
Map<ClusterNode, Collection<K>> map = keysToNodes(cacheName, F.asList(key), topVer);
return map != null ? F.first(map.keySet()) : null;
}
/**
* Map single key to primary and backup nodes.
*
* @param cacheName Cache name.
* @param key Key to map.
* @param topVer Topology version.
* @return Affinity nodes, primary first.
* @throws IgniteCheckedException If failed.
*/
public <K> List<ClusterNode> mapKeyToPrimaryAndBackups(String cacheName,
K key,
AffinityTopologyVersion topVer
) throws IgniteCheckedException {
assert cacheName != null;
A.notNull(key, "key");
AffinityInfo affInfo = affinityCache(cacheName, topVer);
if (affInfo == null)
return Collections.emptyList();
int part = partition(cacheName, key, affInfo);
return affInfo.assignment.get(part);
}
/**
* Gets affinity key for cache key.
*
* @param cacheName Cache name.
* @param key Cache key.
* @return Affinity key.
* @throws IgniteCheckedException In case of error.
*/
@Nullable public Object affinityKey(String cacheName, @Nullable Object key) throws IgniteCheckedException {
assert cacheName != null;
if (key == null)
return null;
AffinityInfo affInfo = affinityCache(cacheName);
if (affInfo == null)
return null;
return affInfo.affinityKey(key);
}
/**
* @param cacheName Cache name.
* @return Cache affinity.
*/
public <K> CacheAffinityProxy<K> affinityProxy(String cacheName) {
CU.validateCacheName(cacheName);
return new CacheAffinityProxy<>(cacheName);
}
/**
* @param cacheName Cache name.
* @param keys Keys.
* @return Affinity map.
* @throws IgniteCheckedException If failed.
*/
private <K> Map<ClusterNode, Collection<K>> keysToNodes(@Nullable final String cacheName,
Collection<? extends K> keys) throws IgniteCheckedException {
return keysToNodes(cacheName, keys, null);
}
/**
* @param cacheName Cache name.
* @param keys Keys.
* @param topVer Topology version.
* @return Affinity map.
* @throws IgniteCheckedException If failed.
*/
private <K> Map<ClusterNode, Collection<K>> keysToNodes(@Nullable final String cacheName,
Collection<? extends K> keys, AffinityTopologyVersion topVer) throws IgniteCheckedException {
if (F.isEmpty(keys))
return Collections.emptyMap();
AffinityInfo affInfo = affinityCache(cacheName, topVer);
return affInfo != null ? affinityMap(affInfo, keys) : Collections.emptyMap();
}
/**
* @param cacheName Cache name.
* @return Affinity cache.
* @throws IgniteCheckedException In case of error.
*/
@Nullable private AffinityInfo affinityCache(final String cacheName)
throws IgniteCheckedException {
return affinityCache(cacheName, null);
}
/**
* @param cacheName Cache name.
* @param topVer Topology version.
* @return Affinity cache.
* @throws IgniteCheckedException In case of error.
*/
@Nullable private AffinityInfo affinityCache(final String cacheName, @Nullable AffinityTopologyVersion topVer)
throws IgniteCheckedException {
return affinityCacheFuture(cacheName, topVer).get();
}
/**
* @param cacheName Cache name.
* @param topVer Topology version.
* @return Affinity cache.
* @throws IgniteCheckedException In case of error.
*/
public IgniteInternalFuture<AffinityInfo> affinityCacheFuture(final String cacheName, @Nullable AffinityTopologyVersion topVer)
throws IgniteCheckedException {
assert cacheName != null;
if (topVer == null)
topVer = ctx.cache().context().exchange().readyAffinityVersion();
IgniteInternalFuture<AffinityInfo> locFetchFut = localAffinityInfo(cacheName, topVer);
if (locFetchFut != null)
return locFetchFut;
return remoteAffinityInfo(cacheName, topVer);
}
/**
* Tries to fetch affinity info based on local cache affinity info. If cache with the given name is not started
* locally, will return {@code null}.
*
* @param cacheName Cache name to fetch.
* @param topVer Topology version to use.
* @return Future with affinity info or {@code null} if cache is not started locally.
* @throws IgniteCheckedException If failed to start for local cache context initialization.
*/
private IgniteInternalFuture<AffinityInfo> localAffinityInfo(
String cacheName,
AffinityTopologyVersion topVer
) throws IgniteCheckedException {
assert topVer != null;
AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer);
IgniteInternalFuture<AffinityInfo> fut = affMap.get(key);
if (fut != null)
return fut;
GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
if (cache != null) {
GridCacheContext<Object, Object> cctx = cache.context();
cctx.awaitStarted();
AffinityAssignment assign0 = cctx.affinity().assignment(topVer);
try {
cctx.gate().enter();
}
catch (IllegalStateException ignored) {
return new GridFinishedFuture<>((AffinityInfo)null);
}
try {
// using legacy GridAffinityAssignment for compatibility.
AffinityInfo info = new AffinityInfo(
cctx.config().getAffinity(),
cctx.config().getAffinityMapper(),
new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment()),
cctx.cacheObjectContext()
);
GridFinishedFuture<AffinityInfo> fut0 = new GridFinishedFuture<>(info);
IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0);
if (old != null)
return old;
return fut0;
}
finally {
cctx.gate().leave();
}
}
return null;
}
/**
* Tries to fetch affinity from remote nodes. If there are no nodes with the cache with the given name started,
* the retured future will be completed with {@code null}.
*
* @param cacheName Cache name to fetch affinity.
* @param topVer Topology version to fetch affinity.
* @return Affinity assignment fetch future.
*/
private IgniteInternalFuture<AffinityInfo> remoteAffinityInfo(
String cacheName,
AffinityTopologyVersion topVer
) {
assert topVer != null;
AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer);
List<ClusterNode> cacheNodes = ctx.discovery().cacheNodes(cacheName, topVer);
DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheName);
if (desc == null || F.isEmpty(cacheNodes)) {
if (ctx.clientDisconnected())
return new GridFinishedFuture<>(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
"Failed to get affinity mapping, client disconnected."));
return new GridFinishedFuture<>((AffinityInfo)null);
}
AffinityFuture fut0 = new AffinityFuture(cacheName, topVer, cacheNodes);
IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0);
if (old != null)
return old;
fut0.getAffinityFromNextNode();
return fut0;
}
/**
*
*/
private class AffinityFuture extends GridFutureAdapter<AffinityInfo> {
/** */
private final String cacheName;
/** */
private final AffinityTopologyVersion topVer;
/** */
private final List<ClusterNode> cacheNodes;
/** */
private int nodeIdx;
/**
* @param cacheName Cache name.
* @param topVer Topology version.
* @param cacheNodes Cache nodes.
*/
AffinityFuture(String cacheName, AffinityTopologyVersion topVer, List<ClusterNode> cacheNodes) {
this.cacheName = cacheName;
this.topVer = topVer;
this.cacheNodes = cacheNodes;
}
/**
*
*/
void getAffinityFromNextNode() {
while (nodeIdx < cacheNodes.size()) {
final ClusterNode node = cacheNodes.get(nodeIdx);
nodeIdx++;
if (!ctx.discovery().alive(node.id()))
continue;
affinityInfoFromNode(cacheName, topVer, node).listen(new CI1<IgniteInternalFuture<AffinityInfo>>() {
@Override public void apply(IgniteInternalFuture<AffinityInfo> fut) {
try {
onDone(fut.get());
}
catch (IgniteCheckedException e) {
if (e instanceof ClusterTopologyCheckedException || X.hasCause(e, ClusterTopologyException.class)) {
if (log.isDebugEnabled())
log.debug("Failed to get affinity from node, node failed [cache=" + cacheName +
", node=" + node.id() + ", msg=" + e.getMessage() + ']');
getAffinityFromNextNode();
return;
}
if (log.isDebugEnabled())
log.debug("Failed to get affinity from node [cache=" + cacheName +
", node=" + node.id() + ", msg=" + e.getMessage() + ']');
onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + node.id(), e));
}
}
});
return;
}
onDone(new ClusterGroupEmptyCheckedException("Failed to get cache affinity, all cache nodes failed: " + cacheName));
}
}
/**
* Requests {@link AffinityFunction} and {@link AffinityKeyMapper} from remote node.
*
* @param cacheName Name of cache on which affinity is requested.
* @param topVer Topology version.
* @param n Node from which affinity is requested.
* @return Affinity future.
*/
private IgniteInternalFuture<AffinityInfo> affinityInfoFromNode(String cacheName, AffinityTopologyVersion topVer, ClusterNode n) {
IgniteInternalFuture<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>> fut = ctx.closure()
.callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/, 0, false);
return fut.chain(
new CX1<IgniteInternalFuture<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>>, AffinityInfo>() {
@Override public AffinityInfo applyx(
IgniteInternalFuture<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>> fut
) throws IgniteCheckedException {
GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> t = fut.get();
AffinityFunction f = (AffinityFunction)unmarshall(ctx, n.id(), t.get1());
AffinityKeyMapper m = (AffinityKeyMapper)unmarshall(ctx, n.id(), t.get2());
assert m != null;
// Bring to initial state.
f.reset();
m.reset();
CacheConfiguration ccfg = ctx.cache().cacheConfiguration(cacheName);
return new AffinityInfo(f, m, t.get3(), ctx.cacheObjects().contextForCache(ccfg));
}
});
}
/**
* @param aff Affinity function.
* @param keys Keys.
* @return Affinity map.
* @throws IgniteCheckedException If failed.
*/
private <K> Map<ClusterNode, Collection<K>> affinityMap(AffinityInfo aff, Collection<? extends K> keys)
throws IgniteCheckedException {
assert aff != null;
assert !F.isEmpty(keys);
try {
if (keys.size() == 1)
return Collections.singletonMap(primary(aff, F.first(keys)), (Collection<K>)keys);
Map<ClusterNode, Collection<K>> map = new GridLeanMap<>();
for (K k : keys) {
ClusterNode n = primary(aff, k);
Collection<K> mapped = map.get(n);
if (mapped == null)
map.put(n, mapped = new LinkedList<>());
mapped.add(k);
}
return map;
}
catch (IgniteException e) {
// Affinity calculation may lead to IgniteException if no cache nodes found for pair cacheName+topVer.
throw new IgniteCheckedException("Failed to get affinity map for keys: " + keys, e);
}
}
/**
* Get primary node for cached key.
*
* @param aff Affinity function.
* @param key Key to check.
* @return Primary node for given key.
* @throws IgniteCheckedException In case of error.
*/
private <K> ClusterNode primary(AffinityInfo aff, K key) throws IgniteCheckedException {
int part = aff.affFunc.partition(aff.affinityKey(key));
Collection<ClusterNode> nodes = aff.assignment.get(part);
if (F.isEmpty(nodes))
throw new IgniteCheckedException("Failed to get affinity nodes [aff=" + aff + ", key=" + key + ']');
return nodes.iterator().next();
}
/**
* @param aff Affinity function.
* @param nodeFilter Node class.
* @param backups Number of backups.
* @param parts Number of partitions.
* @return Key to find caches with similar affinity.
*/
public Object similaryAffinityKey(AffinityFunction aff,
IgnitePredicate<ClusterNode> nodeFilter,
int backups,
int parts) {
return new SimilarAffinityKey(aff.getClass(), nodeFilter.getClass(), backups, parts);
}
/** {@inheritDoc} */
@Override public void printMemoryStats() {
X.println(">>>");
X.println(">>> Affinity processor memory stats [igniteInstanceName=" + ctx.igniteInstanceName() + ']');
X.println(">>> affMapSize: " + affMap.size());
}
/**
*
*/
private static class AffinityInfo {
/** Affinity function. */
private AffinityFunction affFunc;
/** Mapper */
private AffinityKeyMapper mapper;
/** Assignment. */
private GridAffinityAssignment assignment;
/** */
private CacheObjectContext cacheObjCtx;
/**
* @param affFunc Affinity function.
* @param mapper Affinity key mapper.
* @param assignment Partition assignment.
* @param cacheObjCtx Cache objects context.
*/
private AffinityInfo(AffinityFunction affFunc,
AffinityKeyMapper mapper,
GridAffinityAssignment assignment,
CacheObjectContext cacheObjCtx) {
this.affFunc = affFunc;
this.mapper = mapper;
this.assignment = assignment;
this.cacheObjCtx = cacheObjCtx;
}
/**
* @param key Key.
* @return Affinity key.
*/
private Object affinityKey(Object key) {
if (key instanceof CacheObject && !(key instanceof BinaryObject))
key = ((CacheObject)key).value(cacheObjCtx, false);
return mapper.affinityKey(key);
}
/**
* @return Cache affinity function.
*/
private AffinityFunction affinityFunction() {
return affFunc;
}
/**
* @return Affinity assignment.
*/
private GridAffinityAssignment assignment() {
return assignment;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(AffinityInfo.class, this);
}
}
/**
*
*/
private static class AffinityAssignmentKey implements Comparable<AffinityAssignmentKey> {
/** */
private String cacheName;
/** */
private AffinityTopologyVersion topVer;
/**
* @param cacheName Cache name.
* @param topVer Topology version.
*/
private AffinityAssignmentKey(@NotNull String cacheName, @NotNull AffinityTopologyVersion topVer) {
this.cacheName = cacheName;
this.topVer = topVer;
}
/**
* Current constructor should be used only in removeCachedAffinity for creating of the special keys for removing.
*
* @param topVer Topology version.
*/
private AffinityAssignmentKey(@NotNull AffinityTopologyVersion topVer) {
this.topVer = topVer;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof AffinityAssignmentKey))
return false;
AffinityAssignmentKey that = (AffinityAssignmentKey)o;
return topVer.equals(that.topVer) && F.eq(cacheName, that.cacheName);
}
/** {@inheritDoc} */
@Override public int hashCode() {
int res = cacheName != null ? cacheName.hashCode() : 0;
res = 31 * res + topVer.hashCode();
return res;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(AffinityAssignmentKey.class, this);
}
/** {@inheritDoc} */
@Override public int compareTo(AffinityAssignmentKey o) {
assert o != null;
if (this == o)
return 0;
int res = this.topVer.compareTo(o.topVer);
// Key with null cache name must be less than any key with not null cache name for the same topVer.
if (res == 0) {
if (cacheName == null && o.cacheName != null)
return -1;
if (cacheName != null && o.cacheName == null)
return 1;
if (cacheName == null && o.cacheName == null)
return 0;
return cacheName.compareTo(o.cacheName);
}
return res;
}
}
/**
* Grid cache affinity.
*/
private class CacheAffinityProxy<K> implements Affinity<K> {
/** Cache name. */
private final String cacheName;
/**
* @param cacheName Cache name.
*/
public CacheAffinityProxy(String cacheName) {
this.cacheName = cacheName;
}
/** {@inheritDoc} */
@Override public int partitions() {
ctx.gateway().readLock();
try {
return cache().affinityFunction().partitions();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
ctx.gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public int partition(K key) {
ctx.gateway().readLock();
try {
return partition0(cacheName, key, cache());
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
ctx.gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public boolean isPrimary(ClusterNode n, K key) {
ctx.gateway().readLock();
try {
return cache().assignment().primaryPartitions(n.id()).contains(partition(key));
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
ctx.gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public boolean isBackup(ClusterNode n, K key) {
ctx.gateway().readLock();
try {
return cache().assignment().backupPartitions(n.id()).contains(partition(key));
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
ctx.gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public boolean isPrimaryOrBackup(ClusterNode n, K key) {
ctx.gateway().readLock();
try {
return isPrimary(n, key) || isBackup(n, key);
}
finally {
ctx.gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public int[] primaryPartitions(ClusterNode n) {
ctx.gateway().readLock();
try {
Set<Integer> parts = cache().assignment().primaryPartitions(n.id());
return U.toIntArray(parts);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
ctx.gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public int[] backupPartitions(ClusterNode n) {
ctx.gateway().readLock();
try {
Set<Integer> parts = cache().assignment().backupPartitions(n.id());
return U.toIntArray(parts);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
ctx.gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public int[] allPartitions(ClusterNode n) {
ctx.gateway().readLock();
try {
GridAffinityAssignment assignment = cache().assignment();
int[] primary = U.toIntArray(assignment.primaryPartitions(n.id()));
int[] backup = U.toIntArray(assignment.backupPartitions(n.id()));
return U.addAll(primary, backup);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
ctx.gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public Object affinityKey(K key) {
ctx.gateway().readLock();
try {
return cache().affinityKey(key);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
ctx.gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) {
ctx.gateway().readLock();
try {
if (F.isEmpty(keys))
return Collections.emptyMap();
AffinityInfo affInfo = cache();
return affinityMap(affInfo, keys);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
ctx.gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Nullable @Override public ClusterNode mapKeyToNode(K key) {
A.notNull(key, "key");
ctx.gateway().readLock();
try {
AffinityInfo affInfo = cache();
Map<ClusterNode, Collection<K>> map = affinityMap(affInfo, Collections.singletonList(key));
return F.first(map.keySet());
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
ctx.gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) {
ctx.gateway().readLock();
try {
AffinityInfo aff = cache();
return aff.assignment().get(GridAffinityProcessor.this.partition(cacheName, key, aff));
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
ctx.gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public ClusterNode mapPartitionToNode(int part) {
ctx.gateway().readLock();
try {
return F.first(cache().assignment().get(part));
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
ctx.gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts) {
ctx.gateway().readLock();
try {
Map<Integer, ClusterNode> map = new HashMap<>();
if (!F.isEmpty(parts)) {
AffinityInfo aff = cache();
for (int p : parts)
map.put(p, F.first(aff.assignment().get(p)));
}
return map;
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
ctx.gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) {
ctx.gateway().readLock();
try {
return cache().assignment().get(part);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
ctx.gateway().readUnlock();
}
}
/**
* @return Affinity info for current topology version.
* @throws IgniteCheckedException If failed.
*/
private AffinityInfo cache() throws IgniteCheckedException {
AffinityInfo aff = affinityCache(cacheName);
if (aff == null)
throw new IgniteException("Failed to find cache (cache was not started " +
"yet or cache was already stopped): " + cacheName);
return aff;
}
}
/**
*
*/
private static class SimilarAffinityKey {
/** */
private final int backups;
/** */
private final Class<?> affFuncCls;
/** */
private final Class<?> filterCls;
/** */
private final int partsCnt;
/** */
private final int hash;
/**
* @param affFuncCls Affinity function class.
* @param filterCls Node filter class.
* @param backups Number of backups.
* @param partsCnt Number of partitions.
*/
SimilarAffinityKey(Class<?> affFuncCls, Class<?> filterCls, int backups, int partsCnt) {
this.backups = backups;
this.affFuncCls = affFuncCls;
this.filterCls = filterCls;
this.partsCnt = partsCnt;
int hash = backups;
hash = 31 * hash + affFuncCls.hashCode();
hash = 31 * hash + filterCls.hashCode();
hash = 31 * hash + partsCnt;
this.hash = hash;
}
/** {@inheritDoc} */
@Override public int hashCode() {
return hash;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (o == this)
return true;
if (o == null || getClass() != o.getClass())
return false;
SimilarAffinityKey key = (SimilarAffinityKey)o;
return backups == key.backups &&
affFuncCls == key.affFuncCls &&
filterCls == key.filterCls &&
partsCnt == key.partsCnt;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SimilarAffinityKey.class, this);
}
}
}