blob: 53f2a4019fb7e984ad86351d6c8a4b0402ed0bea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cache.affinity;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
/**
* Base tests of {@link AffinityFunction} implementations with user provided backup filter.
*/
public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridCommonAbstractTest {
/** Split attribute name. */
protected static final String SPLIT_ATTRIBUTE_NAME = "split-attribute";
/** Split attribute value. */
private String splitAttrVal;
/** Attribute value for first node group. */
public static final String FIRST_NODE_GROUP = "A";
/** Backup count. */
protected int backups = 1;
/** Test backup filter. */
protected static final IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter =
new IgniteBiPredicate<ClusterNode, ClusterNode>() {
@Override public boolean apply(ClusterNode primary, ClusterNode backup) {
assert primary != null : "primary is null";
assert backup != null : "backup is null";
return !F.eq(primary.attribute(SPLIT_ATTRIBUTE_NAME), backup.attribute(SPLIT_ATTRIBUTE_NAME));
}
};
/** Test backup filter. */
protected static final IgniteBiPredicate<ClusterNode, List<ClusterNode>> affinityBackupFilter =
new IgniteBiPredicate<ClusterNode, List<ClusterNode>>() {
@Override public boolean apply(ClusterNode node, List<ClusterNode> assigned) {
assert node != null : "primary is null";
assert assigned != null : "backup is null";
Map<String, Integer> backupAssignedAttribute = getAttributeStatistic(assigned);
String nodeAttributeVal = node.attribute(SPLIT_ATTRIBUTE_NAME);
if (FIRST_NODE_GROUP.equals(nodeAttributeVal)
&& backupAssignedAttribute.get(FIRST_NODE_GROUP) < 2)
return true;
return backupAssignedAttribute.get(nodeAttributeVal).equals(0);
}
};
/**
* @param nodes List of cluster nodes.
* @return Statistic.
*/
@NotNull protected static Map<String, Integer> getAttributeStatistic(Collection<ClusterNode> nodes) {
Map<String, Integer> backupAssignedAttribute = new HashMap<>();
backupAssignedAttribute.put(FIRST_NODE_GROUP, 0);
backupAssignedAttribute.put("B", 0);
backupAssignedAttribute.put("C", 0);
for (ClusterNode assignedNode: nodes) {
if (assignedNode == null)
continue;
String val = assignedNode.attribute(SPLIT_ATTRIBUTE_NAME);
Integer cnt = backupAssignedAttribute.get(val);
backupAssignedAttribute.put(val, cnt + 1);
}
return backupAssignedAttribute;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
CacheConfiguration cacheCfg = defaultCacheConfiguration();
cacheCfg.setCacheMode(PARTITIONED);
cacheCfg.setBackups(backups);
if (backups < 2)
cacheCfg.setAffinity(affinityFunction());
else
cacheCfg.setAffinity(affinityFunctionWithAffinityBackupFilter(SPLIT_ATTRIBUTE_NAME));
cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
cacheCfg.setRebalanceMode(SYNC);
cacheCfg.setAtomicityMode(TRANSACTIONAL);
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setCacheConfiguration(cacheCfg);
cfg.setUserAttributes(F.asMap(SPLIT_ATTRIBUTE_NAME, splitAttrVal));
cfg.setConsistentId(igniteInstanceName);
return cfg;
}
/**
* @return Affinity function for test.
*/
protected abstract AffinityFunction affinityFunction();
/**
* @return Affinity function for test.
*/
protected abstract AffinityFunction affinityFunctionWithAffinityBackupFilter(String attributeName);
/**
* @throws Exception If failed.
*/
@Test
public void testPartitionDistribution() throws Exception {
backups = 1;
try {
for (int i = 0; i < 3; i++) {
splitAttrVal = "A";
startGrid(2 * i);
splitAttrVal = "B";
startGrid(2 * i + 1);
awaitPartitionMapExchange();
checkPartitions();
}
}
finally {
stopAllGrids();
}
}
/**
* @throws Exception If failed.
*/
@SuppressWarnings("ConstantConditions")
protected void checkPartitions() throws Exception {
AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
int partCnt = aff.partitions();
IgniteCache<Object, Object> cache = grid(0).cache(DEFAULT_CACHE_NAME);
for (int i = 0; i < partCnt; i++) {
Collection<ClusterNode> nodes = affinity(cache).mapKeyToPrimaryAndBackups(i);
assertEquals(2, nodes.size());
ClusterNode primary = F.first(nodes);
ClusterNode backup = F.last(nodes);
assertFalse(F.eq(primary.attribute(SPLIT_ATTRIBUTE_NAME), backup.attribute(SPLIT_ATTRIBUTE_NAME)));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testPartitionDistributionWithAffinityBackupFilter() throws Exception {
backups = 3;
try {
for (int i = 0; i < 2; i++) {
splitAttrVal = FIRST_NODE_GROUP;
startGrid(4 * i);
startGrid(4 * i + 3);
splitAttrVal = "B";
startGrid(4 * i + 1);
splitAttrVal = "C";
startGrid(4 * i + 2);
awaitPartitionMapExchange();
checkPartitionsWithAffinityBackupFilter();
}
}
finally {
stopAllGrids();
}
}
/**
* Start grid with split attribute value.
*
* @param gridIdx Grid index.
* @param splitAttrVal Split attribute value.
*/
protected IgniteEx startGrid(int gridIdx, String splitAttrVal) throws Exception {
this.splitAttrVal = splitAttrVal;
return startGrid(gridIdx);
}
/** Different affinityBackupFilters have different goals */
protected int expectedNodesForEachPartition() {
return backups + 1;
}
/**
* @throws Exception If failed.
*/
protected void checkPartitionsWithAffinityBackupFilter() throws Exception {
AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
int partCnt = aff.partitions();
IgniteCache<Object, Object> cache = grid(0).cache(DEFAULT_CACHE_NAME);
for (int i = 0; i < partCnt; i++) {
Collection<ClusterNode> nodes = affinity(cache).mapKeyToPrimaryAndBackups(i);
assertEquals(expectedNodesForEachPartition(), nodes.size());
Map<String, Integer> stat = getAttributeStatistic(nodes);
assertEquals(stat.get(FIRST_NODE_GROUP), new Integer(expectedNodesForEachPartition() - 2 ));
assertEquals(stat.get("B"), new Integer(1));
assertEquals(stat.get("C"), new Integer(1));
}
}
}