/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.affinity;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Random;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
/**
 * Tests for {@link GridAffinityProcessor}.
 * <p>
 * The fixture starts {@code NODES_CNT} grids with the {@code CACHE_NAME} cache configured, followed by
 * {@code NODES_CNT} client grids started while the cache flag is cleared. Subclasses supply the affinity
 * function under test through {@link #affinityFunction()}.
 */
@GridCommonTest(group = "Affinity Processor")
public abstract class GridAffinityProcessorAbstractSelfTest extends GridCommonAbstractTest {
    /** Number of grids started for tests. Should not be less than 2. */
    private static final int NODES_CNT = 3;

    /** Cache name. */
    private static final String CACHE_NAME = "cache";

    /** Number of keys mapped by {@link #testAffinityProcessor()}. */
    private static final int KEYS_CNT = 1000;

    /** Flag to start grid with cache. */
    private boolean withCache;

    /**
     * Builds the node configuration. When {@link #withCache} is set, a partitioned cache named
     * {@link #CACHE_NAME} with one backup and the affinity function returned by {@link #affinityFunction()}
     * is added to the configuration.
     *
     * @param igniteInstanceName Ignite instance name.
     * @return Node configuration.
     * @throws Exception If the parent configuration could not be created.
     */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);

        if (withCache) {
            CacheConfiguration cacheCfg = defaultCacheConfiguration();

            cacheCfg.setName(CACHE_NAME);
            cacheCfg.setCacheMode(PARTITIONED);
            cacheCfg.setBackups(1);
            cacheCfg.setAffinity(affinityFunction());

            cfg.setCacheConfiguration(cacheCfg);
        }

        return cfg;
    }

    /**
     * Creates affinity function for test.
     *
     * @return Affinity function.
     */
    protected abstract AffinityFunction affinityFunction();

    /**
     * Starts grids {@code 0 .. NODES_CNT - 1} with the cache flag set and client grids
     * {@code NODES_CNT .. 2 * NODES_CNT - 1} with the cache flag cleared.
     *
     * @throws Exception If any grid failed to start.
     */
    @Override protected void beforeTestsStarted() throws Exception {
        // Keep the guard in sync with the contract documented on NODES_CNT.
        assert NODES_CNT >= 2 : "NODES_CNT should not be less than 2: " + NODES_CNT;

        withCache = true;

        for (int i = 0; i < NODES_CNT; i++)
            startGrid(i);

        withCache = false;

        for (int i = NODES_CNT; i < 2 * NODES_CNT; i++)
            startClientGrid(i);
    }

    /**
     * Test affinity functions caching and clean up.
     * <p>
     * Maps the same keys through the affinity processor of a randomly chosen grid started with the cache,
     * through the affinity processor of a randomly chosen grid started without it, and through the cache
     * affinity itself, then checks that all three mappings agree.
     *
     * @throws Exception In case of any exception.
     */
    @Test
    public void testAffinityProcessor() throws Exception {
        Random rnd = new Random();

        int cacheNodeIdx = rnd.nextInt(NODES_CNT);
        int clientNodeIdx = NODES_CNT + rnd.nextInt(NODES_CNT);

        // The indices are random, so report them to make a failed run reproducible.
        info(">>> Comparing affinity mappings [cacheNodeIdx=" + cacheNodeIdx +
            ", clientNodeIdx=" + clientNodeIdx + ']');

        final IgniteKernal grid1 = (IgniteKernal)grid(cacheNodeIdx); // With cache.
        IgniteKernal grid2 = (IgniteKernal)grid(clientNodeIdx); // Without cache.

        assertEquals(NODES_CNT * 2, grid1.cluster().nodes().size());
        assertEquals(NODES_CNT * 2, grid2.cluster().nodes().size());

        IgniteCache<Integer, Integer> cache = grid2.cache(CACHE_NAME);

        assertNotNull(cache);

        GridAffinityProcessor affPrc1 = grid1.context().affinity();
        GridAffinityProcessor affPrc2 = grid2.context().affinity();

        // Create keys collection.
        Collection<Integer> keys = new ArrayList<>(KEYS_CNT);

        for (int i = 0; i < KEYS_CNT; i++)
            keys.add(i);

        //
        // Validate affinity functions collection updated on first call.
        //
        Map<ClusterNode, Collection<Integer>> node1Map = affPrc1.mapKeysToNodes(CACHE_NAME, keys);
        Map<ClusterNode, Collection<Integer>> node2Map = affPrc2.mapKeysToNodes(CACHE_NAME, keys);
        Map<ClusterNode, Collection<Integer>> cacheMap = affinity(cache).mapKeysToNodes(keys);

        assertEquals("Unexpected number of nodes in the first processor mapping",
            cacheMap.size(), node1Map.size());
        assertEquals("Unexpected number of nodes in the second processor mapping",
            cacheMap.size(), node2Map.size());

        for (Map.Entry<ClusterNode, Collection<Integer>> entry : cacheMap.entrySet()) {
            ClusterNode node = entry.getKey();
            Collection<Integer> mappedKeys = entry.getValue();

            assertSameKeys(node, mappedKeys, node1Map.get(node));
            assertSameKeys(node, mappedKeys, node2Map.get(node));
        }
    }

    /**
     * Checks that two key collections mapped to the same node contain the same elements, ignoring order.
     * A missing mapping is reported as an assertion failure rather than a {@link NullPointerException}.
     *
     * @param node Node the keys are mapped to (used in failure messages only).
     * @param exp Expected keys.
     * @param actual Actual keys, possibly {@code null} if the node is absent from the checked mapping.
     */
    private static void assertSameKeys(ClusterNode node, Collection<Integer> exp, Collection<Integer> actual) {
        assertNotNull("No keys mapped to node: " + node, actual);

        assertTrue("Keys mapped to node differ: " + node, exp.containsAll(actual) && actual.containsAll(exp));
    }

    /**
     * Test performance of affinity processor.
     * <p>
     * Invokes {@code mapKeyToNode} repeatedly and fails if the loop takes 25 seconds or longer.
     *
     * @throws Exception In case of any exception.
     */
    @Test
    public void testPerformance() throws Exception {
        IgniteKernal grid = (IgniteKernal)grid(0);
        GridAffinityProcessor aff = grid.context().affinity();

        int keysSize = 1000000;

        Collection<Integer> keys = new ArrayList<>(keysSize);

        for (int i = 0; i < keysSize; i++)
            keys.add(i);

        // nanoTime is monotonic, so the measurement is not distorted by wall-clock adjustments.
        long start = System.nanoTime();

        int iterations = 10000000;

        // NOTE(review): this passes the whole collection as a single key to mapKeyToNode and uses the
        // inherited DEFAULT_CACHE_NAME rather than CACHE_NAME, which is the only cache this class configures.
        // The log line below nevertheless reports mapping 'keysSize' keys. Confirm whether
        // mapKeysToNodes(CACHE_NAME, keys) was intended; switching would require re-tuning 'iterations'
        // and the time limit, so the call is intentionally left unchanged here.
        for (int i = 0; i < iterations; i++)
            aff.mapKeyToNode(DEFAULT_CACHE_NAME, keys);

        long diff = (System.nanoTime() - start) / 1_000_000L;

        info(">>> Map " + keysSize + " keys to " + grid.cluster().nodes().size() +
            " nodes " + iterations + " times in " + diff + "ms.");

        assertTrue("Affinity mapping took too long: " + diff + "ms", diff < 25000);
    }
}