/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cache.affinity.rendezvous;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.PrintStream;
import java.io.Serializable;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
/**
* Simple benchmarks, compatibility tests and distribution check utilities for affinity functions.
* Used to verify changes to the {@link RendezvousAffinityFunction}.
*/
public class RendezvousAffinityFunctionSimpleBenchmark extends GridCommonAbstractTest {
/** MAC prefix. */
private static final String MAC_PREF = "MAC";
/** Ignite. */
private static Ignite ignite;
/** Max experiments. */
private static final int MAX_EXPERIMENTS = 200;
/** Topology modification mode. */
private TopologyModificationMode mode = TopologyModificationMode.CHANGE_LAST_NODE;
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return 3 * 3600 * 1000;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
ignite = startGrid();
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();
ignite = null;
}
/**
* @param nodesCnt Count of nodes to generate.
* @return Nodes list.
*/
private List<ClusterNode> createBaseNodes(int nodesCnt) {
List<ClusterNode> nodes = new ArrayList<>(nodesCnt);
for (int i = 0; i < nodesCnt; i++) {
GridTestNode node = new GridTestNode(UUID.randomUUID());
// Every two consecutive nodes share a MAC prefix, so they count as same-host neighbours.
node.setAttribute(IgniteNodeAttributes.ATTR_MACS, MAC_PREF + i / 2);
nodes.add(node);
}
return nodes;
}
/**
* Modifies the topology: on even iterations a new node is added, on odd ones the last node is removed.
*
* @param nodes Topology.
* @param prevAssignment Previous affinity.
* @param iter Iteration number.
* @param backups Backups count.
* @return Affinity context.
*/
private GridAffinityFunctionContextImpl nodesModificationChangeLast(List<ClusterNode> nodes,
List<List<ClusterNode>> prevAssignment, int iter, int backups) {
DiscoveryEvent discoEvt = iter % 2 == 0 ? addNode(nodes, iter) : removeNode(nodes, nodes.size() - 1);
return new GridAffinityFunctionContextImpl(nodes,
prevAssignment, discoEvt, new AffinityTopologyVersion(nodes.size()), backups);
}
/**
* @param nodes Topology.
* @param idx Index of node to remove.
* @return Discovery event.
*/
@NotNull private DiscoveryEvent removeNode(List<ClusterNode> nodes, int idx) {
return new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_LEFT, nodes.remove(idx));
}
/**
* Modifies the topology: on even iterations a new node is added, on odd ones the first node is removed.
*
* @param nodes Topology.
* @param prevAssignment Previous affinity.
* @param iter Iteration number.
* @param backups Backups count.
* @return Affinity context.
*/
private GridAffinityFunctionContextImpl nodesModificationChangeFirst(List<ClusterNode> nodes,
List<List<ClusterNode>> prevAssignment, int iter, int backups) {
DiscoveryEvent discoEvt = iter % 2 == 0 ? addNode(nodes, iter) : removeNode(nodes, 0);
return new GridAffinityFunctionContextImpl(nodes,
prevAssignment, discoEvt, new AffinityTopologyVersion(nodes.size()), backups);
}
/**
* @param nodes Topology.
* @param iter Iteration number.
* @return Discovery event.
*/
@NotNull private DiscoveryEvent addNode(List<ClusterNode> nodes, int iter) {
GridTestNode node = new GridTestNode(UUID.randomUUID());
// Added nodes share a MAC prefix in groups, so they form same-host neighbours.
node.setAttribute(IgniteNodeAttributes.ATTR_MACS, MAC_PREF + "_add_" + iter / 4);
nodes.add(node);
return new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_JOINED, node);
}
/**
*
* @param aff Affinity function.
* @param nodes Topology.
* @param iter Iteration number.
* @param prevAssignment Previous affinity assignment.
* @param backups Backups count.
* @return Tuple with the affinity calculation time (in milliseconds) and the resulting assignment.
*/
private IgniteBiTuple<Long, List<List<ClusterNode>>> assignPartitions(AffinityFunction aff,
List<ClusterNode> nodes, List<List<ClusterNode>> prevAssignment, int backups, int iter) {
GridAffinityFunctionContextImpl ctx = null;
switch (mode) {
case CHANGE_LAST_NODE:
ctx = nodesModificationChangeLast(nodes, prevAssignment, iter, backups);
break;
case CHANGE_FIRST_NODE:
ctx = nodesModificationChangeFirst(nodes, prevAssignment, iter, backups);
break;
case ADD:
ctx = new GridAffinityFunctionContextImpl(nodes,
prevAssignment, addNode(nodes, iter), new AffinityTopologyVersion(nodes.size()), backups);
break;
case REMOVE_RANDOM:
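// Note: despite the name, this mode currently removes the last node rather than a random one.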
ctx = new GridAffinityFunctionContextImpl(nodes,
prevAssignment, removeNode(nodes, nodes.size() - 1),
new AffinityTopologyVersion(nodes.size()), backups);
break;
case NONE:
ctx = new GridAffinityFunctionContextImpl(nodes,
prevAssignment,
new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_JOINED, nodes.get(nodes.size() - 1)),
new AffinityTopologyVersion(nodes.size()), backups);
break;
}
long start = System.currentTimeMillis();
List<List<ClusterNode>> assignments = aff.assignPartitions(ctx);
return F.t(System.currentTimeMillis() - start, assignments);
}
/**
* @param lst List of measures.
* @return Average of measures.
*/
private double average(Collection<Long> lst) {
if (lst.isEmpty())
return 0;
long sum = 0;
for (long l : lst)
sum += l;
return (double)sum / lst.size();
}
/**
* @param lst List of measures.
* @param avg Average of the measures.
* @return Standard deviation (square root of the variance) of the measures.
*/
private double variance(Collection<Long> lst, double avg) {
if (lst.isEmpty())
return 0;
double sum = 0;
for (long l : lst)
sum += (l - avg) * (l - avg);
return Math.sqrt(sum / lst.size());
}
/**
* Builds a table with the partition counts per node:
*
* column 0 - primary partition counts,
* column 1 - backup #0 partition counts,
* etc.
*
* Rows correspond to the nodes.
*
* @param lst Affinity result.
* @param nodes Topology.
* @return Frequency distribution: counts of partitions on node.
*/
private static List<List<Integer>> freqDistribution(List<List<ClusterNode>> lst, Collection<ClusterNode> nodes) {
List<Map<ClusterNode, AtomicInteger>> nodeMaps = new ArrayList<>();
int backups = lst.get(0).size();
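// Each inner list is ordered primary first, then backups, so its size is the number of copies per partition.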
for (int i = 0; i < backups; ++i) {
Map<ClusterNode, AtomicInteger> map = new HashMap<>();
for (List<ClusterNode> l : lst) {
ClusterNode node = l.get(i);
if (!map.containsKey(node))
map.put(node, new AtomicInteger(1));
else
map.get(node).incrementAndGet();
}
nodeMaps.add(map);
}
List<List<Integer>> byNodes = new ArrayList<>(nodes.size());
for (ClusterNode node : nodes) {
List<Integer> byBackups = new ArrayList<>(backups);
for (int j = 0; j < backups; ++j) {
if (nodeMaps.get(j).get(node) == null)
byBackups.add(0);
else
byBackups.add(nodeMaps.get(j).get(node).get());
}
byNodes.add(byBackups);
}
return byNodes;
}
/**
* @param byNodes Frequency distribution.
* @param suffix Label suffix.
* @throws IOException On error.
*/
private void printDistribution(Collection<List<Integer>> byNodes, String suffix) throws IOException {
int nodes = byNodes.size();
try (PrintStream ps = new PrintStream(Files.newOutputStream(FileSystems.getDefault()
.getPath(String.format("%03d", nodes) + suffix)))) {
for (List<Integer> byNode : byNodes) {
for (int w : byNode)
ps.print(String.format("%05d ", w));
ps.println("");
}
}
}
/**
* Chi-square test of the observed distribution against the uniform distribution.
*
* @param byNodes Distribution.
* @param parts Partitions count.
* @param goldenNodeWeight Expected per-node weight under the uniform distribution.
* @return Chi-square statistic.
*/
private double chiSquare(List<List<Integer>> byNodes, int parts, double goldenNodeWeight) {
double sum = 0;
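// Pearson-style statistic: sum over nodes of (expected - observed)^2 / expected,
// where the observed weight is the node's fraction of primary partitions.
// Smaller values mean a more uniform distribution.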
for (List<Integer> byNode : byNodes) {
double w = (double)byNode.get(0) / parts;
sum += (goldenNodeWeight - w) * (goldenNodeWeight - w) / goldenNodeWeight;
}
return sum;
}
/**
* @throws IOException On error.
*/
@Test
public void testDistribution() throws IOException {
AffinityFunction aff0 = new RendezvousAffinityFunction(true, 1024);
AffinityFunction aff1 = new RendezvousAffinityFunctionOld(true, 1024);
GridTestUtils.setFieldValue(aff1, "ignite", ignite);
affinityDistribution(aff0, aff1);
}
/**
*
* @param aff0 Affinity function to compare.
* @param aff1 Affinity function to compare.
*/
private void affinityDistribution(AffinityFunction aff0, AffinityFunction aff1) {
int[] nodesCnts = {5, 64, 100, 128, 200, 256, 300, 400, 500, 600};
for (int nodesCnt : nodesCnts) {
List<ClusterNode> nodes0 = createBaseNodes(nodesCnt);
List<ClusterNode> nodes1 = createBaseNodes(nodesCnt);
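// The first call runs one topology-change iteration and its result is discarded;
// the second assignment is the one whose distribution is measured.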
assignPartitions(aff0, nodes0, null, 2, 0).get2();
List<List<ClusterNode>> lst0 = assignPartitions(aff0, nodes0, null, 2, 1).get2();
assignPartitions(aff1, nodes1, null, 2, 0).get2();
List<List<ClusterNode>> lst1 = assignPartitions(aff1, nodes1, null, 2, 1).get2();
List<List<Integer>> dist0 = freqDistribution(lst0, nodes0);
List<List<Integer>> dist1 = freqDistribution(lst1, nodes1);
info(String.format("Chi^2. Test %d nodes. %s: %f; %s: %f;",
nodesCnt,
aff0.getClass().getSimpleName(),
chiSquare(dist0, aff0.partitions(), 1.0 / nodesCnt),
aff1.getClass().getSimpleName(),
chiSquare(dist1, aff0.partitions(), 1.0 / nodesCnt)));
try {
printDistribution(dist0, "." + aff0.getClass().getSimpleName());
printDistribution(dist1, "." + aff1.getClass().getSimpleName());
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
/**
*
*/
@Test
public void testAffinityBenchmarkAdd() {
mode = TopologyModificationMode.ADD;
AffinityFunction aff0 = new RendezvousAffinityFunctionOld(true, 1024);
GridTestUtils.setFieldValue(aff0, "ignite", ignite);
affinityBenchmark(aff0, new RendezvousAffinityFunction(true, 1024));
}
/**
*
*/
@Test
public void testAffinityBenchmarkChangeLast() {
mode = TopologyModificationMode.CHANGE_LAST_NODE;
AffinityFunction aff0 = new RendezvousAffinityFunctionOld(true, 1024);
GridTestUtils.setFieldValue(aff0, "ignite", ignite);
affinityBenchmark(aff0, new RendezvousAffinityFunction(true, 1024));
}
/**
* @param aff0 Affinity function to compare.
* @param aff1 Affinity function to compare.
*/
private void affinityBenchmark(AffinityFunction aff0, AffinityFunction aff1) {
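// The duplicated leading size below likely serves as a warm-up run so JIT compilation does not skew the measured sizes.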
int[] nodesCnts = {100, 4, 100, 200, 300, 400, 500, 600};
final int backups = 2;
for (int nodesCnt : nodesCnts) {
List<ClusterNode> nodes0 = createBaseNodes(nodesCnt);
List<ClusterNode> nodes1 = createBaseNodes(nodesCnt);
List<Long> times0 = new ArrayList<>(MAX_EXPERIMENTS);
List<Long> times1 = new ArrayList<>(MAX_EXPERIMENTS);
List<List<ClusterNode>> prevAssignment =
assignPartitions(aff0, nodes0, null, backups, 0).get2();
for (int i = 0; i < MAX_EXPERIMENTS; ++i) {
IgniteBiTuple<Long, List<List<ClusterNode>>> aa
= assignPartitions(aff0, nodes0, prevAssignment, backups, i);
prevAssignment = aa.get2();
times0.add(aa.get1());
}
prevAssignment = assignPartitions(aff1, nodes1, null, backups, 0).get2();
for (int i = 0; i < MAX_EXPERIMENTS; ++i) {
IgniteBiTuple<Long, List<List<ClusterNode>>> aa
= assignPartitions(aff1, nodes1, prevAssignment, backups, i);
prevAssignment = aa.get2();
times1.add(aa.get1());
}
double avr0 = average(times0);
double var0 = variance(times0, avr0);
double avr1 = average(times1);
double var1 = variance(times1, avr1);
info(String.format("Test %d nodes. %s: %.1f ms +/- %.3f ms; %s: %.1f ms +/- %.3f ms;",
nodesCnt,
aff0.getClass().getSimpleName(),
avr0, var0,
aff1.getClass().getSimpleName(),
avr1, var1));
}
}
/**
*
* @param affOld Old affinity.
* @param affNew New affinity.
* @return Count of partitions to migrate.
*/
private int countPartitionsToMigrate(List<List<ClusterNode>> affOld, List<List<ClusterNode>> affNew) {
if (affOld == null || affNew == null)
return 0;
assertEquals(affOld.size(), affNew.size());
int diff = 0;
for (int i = 0; i < affOld.size(); ++i) {
Collection<ClusterNode> s0 = new HashSet<>(affOld.get(i));
Iterable<ClusterNode> s1 = new HashSet<>(affNew.get(i));
for (ClusterNode n : s1) {
if (!s0.contains(n))
++diff;
}
}
return diff;
}
/**
*
*/
@Test
public void testPartitionsMigrate() {
int[] nodesCnts = {2, 3, 10, 64, 100, 200, 300, 400, 500, 600};
final int backups = 2;
AffinityFunction aff0 = new RendezvousAffinityFunction(true, 256);
// TODO choose another affinity function to compare.
AffinityFunction aff1 = new RendezvousAffinityFunction(true, 256);
for (int nodesCnt : nodesCnts) {
List<ClusterNode> nodes0 = createBaseNodes(nodesCnt);
List<ClusterNode> nodes1 = createBaseNodes(nodesCnt);
List<List<ClusterNode>> affPrev = assignPartitions(aff0, nodes0, null, backups, 0).get2();
int diffCnt0 = 0;
for (int i = 0; i < MAX_EXPERIMENTS; ++i) {
List<List<ClusterNode>> affCur = assignPartitions(aff0, nodes0, affPrev, backups, i).get2();
diffCnt0 += countPartitionsToMigrate(affPrev, affCur);
affPrev = affCur;
}
affPrev = assignPartitions(aff1, nodes1, null, backups, 0).get2();
int diffCnt1 = 0;
for (int i = 0; i < MAX_EXPERIMENTS; ++i) {
List<List<ClusterNode>> affCur = assignPartitions(aff1, nodes1, affPrev, backups, i).get2();
diffCnt1 += countPartitionsToMigrate(affPrev, affCur);
affPrev = affCur;
}
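// Ideal number of copies to move when a single node changes: the node's share
// of partitions (parts / nodes) times the number of copies (primary + backups).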
double goldenChangeAffinity = (double)aff1.partitions() / nodesCnt * (backups + 1);
info(String.format("Test %d nodes. Golden: %.1f; %s: %.1f; %s: %.1f;",
nodesCnt, goldenChangeAffinity,
aff0.getClass().getSimpleName(),
(double)diffCnt0 / (MAX_EXPERIMENTS - 1),
aff1.getClass().getSimpleName(),
(double)diffCnt1 / (MAX_EXPERIMENTS - 1)));
}
}
/**
*
*/
@Test
public void testAffinityCompatibility() {
AffinityFunction aff0 = new RendezvousAffinityFunction(true, 1024);
// Use the full copy of the old implementation of the RendezvousAffinityFunction to check the compatibility.
AffinityFunction aff1 = new RendezvousAffinityFunctionOld(true, 1024);
GridTestUtils.setFieldValue(aff1, "ignite", ignite);
structuralCompatibility(aff0, aff1);
}
/**
* @param aff0 Affinity function to compare.
* @param aff1 Affinity function to compare.
*/
private void structuralCompatibility(AffinityFunction aff0, AffinityFunction aff1) {
int[] nodesCnts = {64, 100, 200, 300, 400, 500, 600};
final int backups = 2;
mode = TopologyModificationMode.NONE;
for (int nodesCnt : nodesCnts) {
List<ClusterNode> nodes = createBaseNodes(nodesCnt);
List<Integer> structure0 = structureOf(assignPartitions(aff0, nodes, null, backups, 0).get2());
List<Integer> structure1 = structureOf(assignPartitions(aff1, nodes, null, backups, 0).get2());
assertEquals(structure0, structure1);
}
}
/** @return Per-partition assignment sizes ({@code null} for a partition whose assignment is missing or contains a null node). */
private List<Integer> structureOf(List<List<ClusterNode>> assignment) {
List<Integer> res = new ArrayList<>();
for (List<ClusterNode> nodes : assignment)
res.add(nodes != null && !nodes.contains(null) ? nodes.size() : null);
return res;
}
/**
*
*/
private enum TopologyModificationMode {
/** Change the last node. */
CHANGE_LAST_NODE,
/** Change the first node. */
CHANGE_FIRST_NODE,
/** Add. */
ADD,
/** Remove random. */
REMOVE_RANDOM,
/** Do nothing. */
NONE
}
/**
* Full copy of the old implementation of the RendezvousAffinityFunction to check compatibility and performance.
*/
private static class RendezvousAffinityFunctionOld implements AffinityFunction, Externalizable {
/** */
private static final long serialVersionUID = 0L;
/** Default number of partitions. */
public static final int DFLT_PARTITION_COUNT = 1024;
/** Comparator. */
private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator();
/** Thread local message digest ({@link MessageDigest} is not thread-safe). */
private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() {
@Override protected MessageDigest initialValue() {
try {
return MessageDigest.getInstance("MD5");
}
catch (NoSuchAlgorithmException e) {
assert false : "Should have failed in constructor";
throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", e);
}
}
};
/** Number of partitions. */
private int parts;
/** Exclude neighbors flag. */
private boolean exclNeighbors;
/** Exclude neighbors warning. */
private transient boolean exclNeighborsWarn;
/** Optional backup filter. First node is primary, second node is a node being tested. */
private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter;
/** Optional affinity backups filter. The first node is a node being tested,
* the second is a list of nodes that are already assigned for a given partition (the first node in the list
* is primary). */
private IgniteBiPredicate<ClusterNode, List<ClusterNode>> affinityBackupFilter;
/** Ignite instance. */
@IgniteInstanceResource
private Ignite ignite;
/** Logger instance. */
@LoggerResource
private transient IgniteLogger log;
/**
* Empty constructor with all defaults.
*/
public RendezvousAffinityFunctionOld() {
this(false);
}
/**
* Initializes affinity with a flag to exclude same-host neighbors from being backups of each other
* and the default number of partitions.
* <p>
* Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*
* @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
* of each other.
*/
public RendezvousAffinityFunctionOld(boolean exclNeighbors) {
this(exclNeighbors, DFLT_PARTITION_COUNT);
}
/**
* Initializes affinity with a flag to exclude same-host neighbors from being backups of each other
* and the specified number of partitions.
* <p>
* Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*
* @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
* of each other.
* @param parts Total number of partitions.
*/
public RendezvousAffinityFunctionOld(boolean exclNeighbors, int parts) {
this(exclNeighbors, parts, null);
}
/**
* Initializes affinity with the specified number of partitions and an optional backup filter.
* <p>
* Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*
* @param parts Total number of partitions.
* @param backupFilter Optional back up filter for nodes. If provided, backups will be selected
* from all nodes that pass this filter. First argument for this filter is primary node, and second
* argument is node being tested.
* <p>
* Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*/
public RendezvousAffinityFunctionOld(int parts, @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
this(false, parts, backupFilter);
}
/**
* Private constructor.
*
* @param exclNeighbors Exclude neighbors flag.
* @param parts Partitions count.
* @param backupFilter Backup filter.
*/
private RendezvousAffinityFunctionOld(boolean exclNeighbors, int parts,
IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
A.ensure(parts > 0, "parts > 0");
this.exclNeighbors = exclNeighbors;
this.parts = parts;
this.backupFilter = backupFilter;
try {
MessageDigest.getInstance("MD5");
}
catch (NoSuchAlgorithmException e) {
throw new IgniteException("Failed to obtain MD5 message digest instance.", e);
}
}
/**
* Gets total number of key partitions. To ensure that all partitions are
* equally distributed across all nodes, please make sure that this
* number is significantly larger than the number of nodes. Also, partition
* size should be relatively small. Try to avoid having partitions with more
* than a quarter million keys.
* <p>
* Note that for fully replicated caches this method should always
* return {@code 1}.
*
* @return Total partition count.
*/
public int getPartitions() {
return parts;
}
/**
* Sets total number of partitions.
*
* @param parts Total number of partitions.
*/
public void setPartitions(int parts) {
A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, "parts <= " + CacheConfiguration.MAX_PARTITIONS_COUNT);
this.parts = parts;
}
/**
* Gets optional backup filter. If not {@code null}, backups will be selected
* from all nodes that pass this filter. First node passed to this filter is primary node,
* and second node is a node being tested.
* <p>
* Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*
* @return Optional backup filter.
*/
@Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() {
return backupFilter;
}
/**
* Sets optional backup filter. If provided, then backups will be selected from all
* nodes that pass this filter. First node being passed to this filter is primary node,
* and second node is a node being tested.
* <p>
* Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*
* @param backupFilter Optional backup filter.
* @deprecated Use {@code affinityBackupFilter} instead.
*/
@Deprecated
public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
this.backupFilter = backupFilter;
}
/**
* Gets optional backup filter. If not {@code null}, backups will be selected
* from all nodes that pass this filter. First node passed to this filter is a node being tested,
* and the second parameter is a list of nodes that are already assigned for a given partition (primary node is
* the first in the list).
* <p>
* Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*
* @return Optional backup filter.
*/
@Nullable public IgniteBiPredicate<ClusterNode, List<ClusterNode>> getAffinityBackupFilter() {
return affinityBackupFilter;
}
/**
* Sets optional backup filter. If provided, then backups will be selected from all
* nodes that pass this filter. First node being passed to this filter is a node being tested,
* and the second parameter is a list of nodes that are already assigned for a given partition (primary node is
* the first in the list).
* <p>
* Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*
* @param affinityBackupFilter Optional backup filter.
*/
public void setAffinityBackupFilter(@Nullable IgniteBiPredicate<ClusterNode,
List<ClusterNode>> affinityBackupFilter) {
this.affinityBackupFilter = affinityBackupFilter;
}
/**
* Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
* <p>
* Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*
* @return {@code True} if nodes residing on the same host may not act as backups of each other.
*/
public boolean isExcludeNeighbors() {
return exclNeighbors;
}
/**
* Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
* <p>
* Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*
* @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other.
*/
public void setExcludeNeighbors(boolean exclNeighbors) {
this.exclNeighbors = exclNeighbors;
}
/**
* Resolves node hash.
*
* @param node Cluster node.
* @return Node hash.
*/
public Object resolveNodeHash(ClusterNode node) {
return node.consistentId();
}
/**
* Returns collection of nodes (primary first) for specified partition.
*
* @param d Message digest.
* @param part Partition.
* @param nodes Nodes.
* @param nodesHash Serialized nodes hashes.
* @param backups Number of backups.
* @param neighborhoodCache Neighborhood.
* @return Assignment.
*/
public List<ClusterNode> assignPartition(MessageDigest d,
int part,
List<ClusterNode> nodes,
Map<ClusterNode, byte[]> nodesHash,
int backups,
@Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
if (nodes.size() <= 1)
return nodes;
if (d == null)
d = digest.get();
List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size());
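// Rendezvous hashing: every node gets a hash of the (partition, node) pair;
// sorting by hash yields a deterministic per-partition node ordering,
// whose head becomes the primary and whose tail supplies the backups.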
try {
for (int i = 0; i < nodes.size(); i++) {
ClusterNode node = nodes.get(i);
byte[] nodeHashBytes = nodesHash.get(node);
if (nodeHashBytes == null) {
Object nodeHash = resolveNodeHash(node);
byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
// Add 4 bytes for partition bytes.
nodeHashBytes = new byte[nodeHashBytes0.length + 4];
System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length);
nodesHash.put(node, nodeHashBytes);
}
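// Write the partition number into the 4 reserved leading bytes; the digest below covers partition + node hash.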
U.intToBytes(part, nodeHashBytes, 0);
d.reset();
byte[] bytes = d.digest(nodeHashBytes);
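// Assemble a little-endian long from the first 8 bytes of the digest.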
long hash =
(bytes[0] & 0xFFL)
| ((bytes[1] & 0xFFL) << 8)
| ((bytes[2] & 0xFFL) << 16)
| ((bytes[3] & 0xFFL) << 24)
| ((bytes[4] & 0xFFL) << 32)
| ((bytes[5] & 0xFFL) << 40)
| ((bytes[6] & 0xFFL) << 48)
| ((bytes[7] & 0xFFL) << 56);
lst.add(F.t(hash, node));
}
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
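// Sort ascending by hash; ties are broken by node id so the order is deterministic.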
Collections.sort(lst, COMPARATOR);
int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size());
List<ClusterNode> res = new ArrayList<>(primaryAndBackups);
ClusterNode primary = lst.get(0).get2();
res.add(primary);
// Select backups.
if (backups > 0) {
for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
ClusterNode node = next.get2();
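// Backup selection: neighbour exclusion takes priority; otherwise apply
// affinityBackupFilter, then backupFilter; with no filters every node qualifies.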
if (exclNeighbors) {
Collection<ClusterNode> allNeighbors = GridCacheUtils.neighborsForNodes(neighborhoodCache, res);
if (!allNeighbors.contains(node))
res.add(node);
}
else if (affinityBackupFilter != null && affinityBackupFilter.apply(node, res))
res.add(next.get2());
else if (backupFilter != null && backupFilter.apply(primary, node))
res.add(next.get2());
else if (affinityBackupFilter == null && backupFilter == null)
res.add(next.get2());
}
}
if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) {
// Iterate again in case no nodes pass the exclude-neighbors backup criteria.
for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
ClusterNode node = next.get2();
if (!res.contains(node))
res.add(next.get2());
}
if (!exclNeighborsWarn) {
LT.warn(log, "Affinity function excludeNeighbors property is ignored " +
"because the topology does not have enough nodes to assign backups.");
exclNeighborsWarn = true;
}
}
assert res.size() <= primaryAndBackups;
return res;
}
/** {@inheritDoc} */
@Override public void reset() {
// No-op.
}
/** {@inheritDoc} */
@Override public int partitions() {
return parts;
}
/** {@inheritDoc} */
@Override public int partition(Object key) {
if (key == null)
throw new IllegalArgumentException("Null key is passed for a partition calculation. " +
"Make sure that an affinity key that is used is initialized properly.");
return U.safeAbs(key.hashCode() % parts);
}
/** {@inheritDoc} */
@Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
List<List<ClusterNode>> assignments = new ArrayList<>(parts);
Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null;
MessageDigest d = digest.get();
List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size());
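// Node hash bytes are computed once and reused for every partition;
// only the 4-byte partition prefix is rewritten before each digest.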
for (int i = 0; i < parts; i++) {
List<ClusterNode> partAssignment = assignPartition(d,
i,
nodes,
nodesHash,
affCtx.backups(),
neighborhoodCache);
assignments.add(partAssignment);
}
return assignments;
}
/** {@inheritDoc} */
@Override public void removeNode(UUID nodeId) {
// No-op.
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(parts);
out.writeBoolean(exclNeighbors);
out.writeObject(backupFilter);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
parts = in.readInt();
exclNeighbors = in.readBoolean();
backupFilter = (IgniteBiPredicate<ClusterNode, ClusterNode>)in.readObject();
}
/**
*
*/
private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable {
/** */
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
@Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) {
return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 :
o1.get2().id().compareTo(o2.get2().id());
}
}
}
}