blob: dd46246146b2610ab2c3c56ac9b100d63287454b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.affinity;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityCentralizedFunction;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.cluster.NodeOrderComparator;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.cluster.BaselineTopology;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
/**
* Affinity cached function.
*/
public class GridAffinityAssignmentCache {
/** Cleanup history size. */
private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 500);
/** Group name if specified or cache name. */
private final String cacheOrGrpName;
/** Group ID. */
private final int grpId;
/** Number of backups. */
private final int backups;
/** Affinity function. */
private final AffinityFunction aff;
/** */
private final IgnitePredicate<ClusterNode> nodeFilter;
/** Partitions count. */
private final int partsCnt;
/** Affinity calculation results cache: topology version => partition => nodes. */
private final ConcurrentNavigableMap<AffinityTopologyVersion, HistoryAffinityAssignment> affCache;
/** */
private List<List<ClusterNode>> idealAssignment;
/** */
private BaselineTopology baselineTopology;
/** */
private List<List<ClusterNode>> baselineAssignment;
/** Cache item corresponding to the head topology version. */
private final AtomicReference<GridAffinityAssignment> head;
/** Ready futures. */
private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> readyFuts = new ConcurrentSkipListMap<>();
/** Log. */
private final IgniteLogger log;
/** */
private final GridKernalContext ctx;
/** */
private final boolean locCache;
/** Node stop flag. */
private volatile IgniteCheckedException stopErr;
/** History size ignoring client events changes. */
private final AtomicInteger histSize = new AtomicInteger();
/** Full history size. */
private final AtomicInteger fullHistSize = new AtomicInteger();
/** */
private final Object similarAffKey;
/**
* Constructs affinity cached calculations.
*
* @param ctx Kernal context.
* @param cacheOrGrpName Cache or cache group name.
* @param grpId Group ID.
* @param aff Affinity function.
* @param nodeFilter Node filter.
* @param backups Number of backups.
* @param locCache Local cache flag.
*/
@SuppressWarnings("unchecked")
public GridAffinityAssignmentCache(GridKernalContext ctx,
String cacheOrGrpName,
int grpId,
AffinityFunction aff,
IgnitePredicate<ClusterNode> nodeFilter,
int backups,
boolean locCache)
{
assert ctx != null;
assert aff != null;
assert nodeFilter != null;
assert grpId != 0;
this.ctx = ctx;
this.aff = aff;
this.nodeFilter = nodeFilter;
this.cacheOrGrpName = cacheOrGrpName;
this.grpId = grpId;
this.backups = backups;
this.locCache = locCache;
log = ctx.log(GridAffinityAssignmentCache.class);
partsCnt = aff.partitions();
affCache = new ConcurrentSkipListMap<>();
head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
similarAffKey = ctx.affinity().similaryAffinityKey(aff, nodeFilter, backups, partsCnt);
assert similarAffKey != null;
}
/**
* @return Key to find caches with similar affinity.
*/
public Object similarAffinityKey() {
return similarAffKey;
}
/**
* @return Group name if it is specified, otherwise cache name.
*/
public String cacheOrGroupName() {
return cacheOrGrpName;
}
/**
* @return Cache group ID.
*/
public int groupId() {
return grpId;
}
/**
* Initializes affinity with given topology version and assignment.
*
* @param topVer Topology version.
* @param affAssignment Affinity assignment for topology version.
*/
public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) {
assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + ", last=" + lastVersion() + ']';
assert idealAssignment != null;
GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment);
affCache.put(topVer, new HistoryAffinityAssignment(assignment));
head.set(assignment);
for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
if (entry.getKey().compareTo(topVer) <= 0) {
if (log.isDebugEnabled())
log.debug("Completing topology ready future (initialized affinity) " +
"[locNodeId=" + ctx.localNodeId() + ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']');
entry.getValue().onDone(topVer);
}
}
onHistoryAdded(assignment);
}
/**
* @param assignment Assignment.
*/
public void idealAssignment(List<List<ClusterNode>> assignment) {
this.idealAssignment = assignment;
}
/**
* @return Assignment.
*/
@Nullable public List<List<ClusterNode>> idealAssignment() {
return idealAssignment;
}
/**
* @return {@code True} if affinity function has {@link AffinityCentralizedFunction} annotation.
*/
public boolean centralizedAffinityFunction() {
return U.hasAnnotation(aff, AffinityCentralizedFunction.class);
}
/**
* Kernal stop callback.
*
* @param err Error.
*/
public void cancelFutures(IgniteCheckedException err) {
stopErr = err;
for (AffinityReadyFuture fut : readyFuts.values())
fut.onDone(err);
}
/**
*
*/
public void onReconnected() {
idealAssignment = null;
affCache.clear();
head.set(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
stopErr = null;
}
/**
* Calculates affinity cache for given topology version.
*
* @param topVer Topology version to calculate affinity cache for.
* @param discoEvt Discovery event that caused this topology version change.
* @param discoCache Discovery cache.
* @return Affinity assignments.
*/
@SuppressWarnings("IfMayBeConditional")
public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, @Nullable DiscoveryEvent discoEvt,
@Nullable DiscoCache discoCache) {
if (log.isDebugEnabled())
log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
", discoEvt=" + discoEvt + ']');
List<List<ClusterNode>> prevAssignment = idealAssignment;
// Resolve nodes snapshot for specified topology version.
List<ClusterNode> sorted;
if (!locCache) {
sorted = new ArrayList<>(discoCache.cacheGroupAffinityNodes(groupId()));
Collections.sort(sorted, NodeOrderComparator.getInstance());
}
else
sorted = Collections.singletonList(ctx.discovery().localNode());
boolean hasBaseline = false;
boolean changedBaseline = false;
if (discoCache != null) {
hasBaseline = discoCache.state().baselineTopology() != null;
changedBaseline = !hasBaseline ? baselineTopology != null :
!discoCache.state().baselineTopology().equals(baselineTopology);
}
List<List<ClusterNode>> assignment;
if (prevAssignment != null && discoEvt != null) {
boolean affNode = CU.affinityNode(discoEvt.eventNode(), nodeFilter);
if (!affNode)
assignment = prevAssignment;
else if (hasBaseline && !changedBaseline) {
if (baselineAssignment == null)
baselineAssignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(
discoCache.state().baselineTopology().createBaselineView(sorted, nodeFilter),
prevAssignment, discoEvt, topVer, backups));
assignment = currentBaselineAssignment(topVer);
}
else if (hasBaseline && changedBaseline) {
baselineAssignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(
discoCache.state().baselineTopology().createBaselineView(sorted, nodeFilter),
prevAssignment, discoEvt, topVer, backups));
assignment = currentBaselineAssignment(topVer);
}
else
assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted, prevAssignment,
discoEvt, topVer, backups));
}
else {
if (hasBaseline) {
baselineAssignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(
discoCache.state().baselineTopology().createBaselineView(sorted, nodeFilter),
prevAssignment, discoEvt, topVer, backups));
assignment = currentBaselineAssignment(topVer);
}
else
assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted, prevAssignment,
discoEvt, topVer, backups));
}
assert assignment != null;
idealAssignment = assignment;
if (hasBaseline) {
baselineTopology = discoCache.state().baselineTopology();
assert baselineAssignment != null;
}
else {
baselineTopology = null;
baselineAssignment = null;
}
if (locCache)
initialize(topVer, assignment);
return assignment;
}
/**
* @param topVer Topology version.
* @return Baseline assignment with filtered out offline nodes.
*/
private List<List<ClusterNode>> currentBaselineAssignment(AffinityTopologyVersion topVer) {
Map<Object, ClusterNode> alives = new HashMap<>();
for (ClusterNode node : ctx.discovery().nodes(topVer)) {
if (!node.isClient() && !node.isDaemon())
alives.put(node.consistentId(), node);
}
List<List<ClusterNode>> result = new ArrayList<>(baselineAssignment.size());
for (int p = 0; p < baselineAssignment.size(); p++) {
List<ClusterNode> baselineMapping = baselineAssignment.get(p);
List<ClusterNode> currentMapping = null;
for (ClusterNode node : baselineMapping) {
ClusterNode aliveNode = alives.get(node.consistentId());
if (aliveNode != null) {
if (currentMapping == null)
currentMapping = new ArrayList<>();
currentMapping.add(aliveNode);
}
}
result.add(p, currentMapping != null ? currentMapping : Collections.<ClusterNode>emptyList());
}
return result;
}
/**
* Copies previous affinity assignment when discovery event does not cause affinity assignment changes
* (e.g. client node joins on leaves).
*
* @param evt Event.
* @param topVer Topology version.
*/
public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersion topVer) {
assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + ", last=" + lastVersion() + ']';
GridAffinityAssignment aff = head.get();
assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.primaryPartitions(evt.eventNode().id()).isEmpty() : evt;
assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.backupPartitions(evt.eventNode().id()).isEmpty() : evt;
GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, aff);
affCache.put(topVer, new HistoryAffinityAssignment(assignmentCpy));
head.set(assignmentCpy);
for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
if (entry.getKey().compareTo(topVer) <= 0) {
if (log.isDebugEnabled())
log.debug("Completing topology ready future (use previous affinity) " +
"[locNodeId=" + ctx.localNodeId() + ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']');
entry.getValue().onDone(topVer);
}
}
onHistoryAdded(assignmentCpy);
}
/**
* @return Last calculated affinity version.
*/
public AffinityTopologyVersion lastVersion() {
return head.get().topologyVersion();
}
/**
* @param topVer Topology version.
* @return Affinity assignment.
*/
public List<List<ClusterNode>> assignments(AffinityTopologyVersion topVer) {
AffinityAssignment aff = cachedAffinity(topVer);
return aff.assignment();
}
/**
* @param topVer Topology version.
* @return Affinity assignment.
*/
public List<List<ClusterNode>> readyAssignments(AffinityTopologyVersion topVer) {
AffinityAssignment aff = readyAffinity(topVer);
assert aff != null : "No ready affinity [grp=" + cacheOrGrpName + ", ver=" + topVer + ']';
return aff.assignment();
}
/**
* Gets future that will be completed after topology with version {@code topVer} is calculated.
*
* @param topVer Topology version to await for.
* @return Future that will be completed after affinity for topology version {@code topVer} is calculated.
*/
@Nullable public IgniteInternalFuture<AffinityTopologyVersion> readyFuture(AffinityTopologyVersion topVer) {
GridAffinityAssignment aff = head.get();
if (aff.topologyVersion().compareTo(topVer) >= 0) {
if (log.isDebugEnabled())
log.debug("Returning finished future for readyFuture [head=" + aff.topologyVersion() +
", topVer=" + topVer + ']');
return null;
}
GridFutureAdapter<AffinityTopologyVersion> fut = F.addIfAbsent(readyFuts, topVer,
new AffinityReadyFuture(topVer));
aff = head.get();
if (aff.topologyVersion().compareTo(topVer) >= 0) {
if (log.isDebugEnabled())
log.debug("Completing topology ready future right away [head=" + aff.topologyVersion() +
", topVer=" + topVer + ']');
fut.onDone(aff.topologyVersion());
}
else if (stopErr != null)
fut.onDone(stopErr);
return fut;
}
/**
* @return Partition count.
*/
public int partitions() {
return partsCnt;
}
/**
* Gets affinity nodes for specified partition.
*
* @param part Partition.
* @param topVer Topology version.
* @return Affinity nodes.
*/
public List<ClusterNode> nodes(int part, AffinityTopologyVersion topVer) {
// Resolve cached affinity nodes.
return cachedAffinity(topVer).get(part);
}
/**
* Get primary partitions for specified node ID.
*
* @param nodeId Node ID to get primary partitions for.
* @param topVer Topology version.
* @return Primary partitions for specified node ID.
*/
public Set<Integer> primaryPartitions(UUID nodeId, AffinityTopologyVersion topVer) {
return cachedAffinity(topVer).primaryPartitions(nodeId);
}
/**
* Get backup partitions for specified node ID.
*
* @param nodeId Node ID to get backup partitions for.
* @param topVer Topology version.
* @return Backup partitions for specified node ID.
*/
public Set<Integer> backupPartitions(UUID nodeId, AffinityTopologyVersion topVer) {
return cachedAffinity(topVer).backupPartitions(nodeId);
}
/**
* Dumps debug information.
*
* @return {@code True} if there are pending futures.
*/
public boolean dumpDebugInfo() {
if (!readyFuts.isEmpty()) {
U.warn(log, "First 3 pending affinity ready futures [grp=" + cacheOrGrpName +
", total=" + readyFuts.size() +
", lastVer=" + lastVersion() + "]:");
int cnt = 0;
for (AffinityReadyFuture fut : readyFuts.values()) {
U.warn(log, ">>> " + fut);
if (++cnt == 3)
break;
}
return true;
}
return false;
}
/**
* @param topVer Topology version.
* @return Assignment.
*/
public AffinityAssignment readyAffinity(AffinityTopologyVersion topVer) {
AffinityAssignment cache = head.get();
if (!cache.topologyVersion().equals(topVer)) {
cache = affCache.get(topVer);
if (cache == null) {
throw new IllegalStateException("Affinity for topology version is " +
"not initialized [locNode=" + ctx.discovery().localNode().id() +
", grp=" + cacheOrGrpName +
", topVer=" + topVer +
", head=" + head.get().topologyVersion() +
", history=" + affCache.keySet() +
']');
}
}
return cache;
}
/**
* Get cached affinity for specified topology version.
*
* @param topVer Topology version.
* @return Cached affinity.
*/
public AffinityAssignment cachedAffinity(AffinityTopologyVersion topVer) {
if (topVer.equals(AffinityTopologyVersion.NONE))
topVer = lastVersion();
else
awaitTopologyVersion(topVer);
assert topVer.topologyVersion() >= 0 : topVer;
AffinityAssignment cache = head.get();
if (!cache.topologyVersion().equals(topVer)) {
cache = affCache.get(topVer);
if (cache == null) {
throw new IllegalStateException("Getting affinity for topology version earlier than affinity is " +
"calculated [locNode=" + ctx.discovery().localNode() +
", grp=" + cacheOrGrpName +
", topVer=" + topVer +
", head=" + head.get().topologyVersion() +
", history=" + affCache.keySet() +
']');
}
}
assert cache.topologyVersion().equals(topVer) : "Invalid cached affinity: " + cache;
return cache;
}
/**
* @param part Partition.
* @param startVer Start version.
* @param endVer End version.
* @return {@code True} if primary changed or required affinity version not found in history.
*/
public boolean primaryChanged(int part, AffinityTopologyVersion startVer, AffinityTopologyVersion endVer) {
AffinityAssignment aff = affCache.get(startVer);
if (aff == null)
return false;
List<ClusterNode> nodes = aff.get(part);
if (nodes.isEmpty())
return true;
ClusterNode primary = nodes.get(0);
for (AffinityAssignment assignment : affCache.tailMap(startVer, false).values()) {
List<ClusterNode> nodes0 = assignment.assignment().get(part);
if (nodes0.isEmpty())
return true;
if (!nodes0.get(0).equals(primary))
return true;
if (assignment.topologyVersion().equals(endVer))
return false;
}
return true;
}
/**
* @param aff Affinity cache.
*/
public void init(GridAffinityAssignmentCache aff) {
assert aff.lastVersion().compareTo(lastVersion()) >= 0;
assert aff.idealAssignment() != null;
idealAssignment(aff.idealAssignment());
initialize(aff.lastVersion(), aff.assignments(aff.lastVersion()));
}
/**
* @param topVer Topology version to wait.
*/
private void awaitTopologyVersion(AffinityTopologyVersion topVer) {
GridAffinityAssignment aff = head.get();
if (aff.topologyVersion().compareTo(topVer) >= 0)
return;
try {
if (log.isDebugEnabled())
log.debug("Will wait for topology version [locNodeId=" + ctx.localNodeId() +
", topVer=" + topVer + ']');
IgniteInternalFuture<AffinityTopologyVersion> fut = readyFuture(topVer);
if (fut != null) {
Thread curTh = Thread.currentThread();
String threadName = curTh.getName();
try {
curTh.setName(threadName + " (waiting " + topVer + ")");
fut.get();
}
finally {
curTh.setName(threadName);
}
}
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to wait for affinity ready future for topology version: " + topVer,
e);
}
}
/**
* @param aff Added affinity assignment.
*/
private void onHistoryAdded(GridAffinityAssignment aff) {
int fullSize = fullHistSize.incrementAndGet();
int size;
if (aff.clientEventChange())
size = histSize.get();
else
size = histSize.incrementAndGet();
int rmvCnt = size - MAX_HIST_SIZE;
if (rmvCnt <= 0) {
if (fullSize > MAX_HIST_SIZE * 2)
rmvCnt = MAX_HIST_SIZE;
}
if (rmvCnt > 0) {
Iterator<HistoryAffinityAssignment> it = affCache.values().iterator();
while (it.hasNext() && rmvCnt > 0) {
AffinityAssignment aff0 = it.next();
it.remove();
rmvCnt--;
if (!aff0.clientEventChange())
histSize.decrementAndGet();
fullHistSize.decrementAndGet();
}
}
}
/**
* @return All initialized versions.
*/
public Collection<AffinityTopologyVersion> cachedVersions() {
return affCache.keySet();
}
/**
* Affinity ready future. Will remove itself from ready futures map.
*/
private class AffinityReadyFuture extends GridFutureAdapter<AffinityTopologyVersion> {
/** */
private AffinityTopologyVersion reqTopVer;
/**
*
* @param reqTopVer Required topology version.
*/
private AffinityReadyFuture(AffinityTopologyVersion reqTopVer) {
this.reqTopVer = reqTopVer;
}
/** {@inheritDoc} */
@Override public boolean onDone(AffinityTopologyVersion res, @Nullable Throwable err) {
assert res != null || err != null;
boolean done = super.onDone(res, err);
if (done)
readyFuts.remove(reqTopVer, this);
return done;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(AffinityReadyFuture.class, this);
}
}
}