blob: 0c52154537264878caa2b9ed5ba6195abfc4e01c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
/**
*
*/
public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractTest {
/** */
private static final String AFF1_CACHE1 = "a1c1";
/** */
private static final String AFF1_CACHE2 = "a1c2";
/** */
private static final String AFF3_CACHE1 = "a3c1";
/** */
private static final String AFF4_FILTER_CACHE1 = "a4c1";
/** */
private static final String AFF4_FILTER_CACHE2 = "a4c2";
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setUserAttributes(Collections.singletonMap("name", igniteInstanceName));
TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
commSpi.record(new IgniteBiPredicate<ClusterNode, Message>() {
@Override public boolean apply(ClusterNode node, Message msg) {
return (msg.getClass() == GridDhtPartitionsSingleMessage.class ||
msg.getClass() == GridDhtPartitionsFullMessage.class) &&
((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null;
}
});
cfg.setCommunicationSpi(commSpi);
List<CacheConfiguration> ccfgs = new ArrayList<>();
{
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setName(AFF1_CACHE1);
ccfg.setAffinity(new RendezvousAffinityFunction(false, 512));
ccfgs.add(ccfg);
}
{
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setName(AFF1_CACHE2);
ccfg.setAffinity(new RendezvousAffinityFunction(false, 512));
ccfgs.add(ccfg);
}
{
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setName(AFF3_CACHE1);
ccfg.setBackups(3);
RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 64);
ccfg.setAffinity(aff);
ccfgs.add(ccfg);
}
{
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setName(AFF4_FILTER_CACHE1);
ccfg.setNodeFilter(new TestNodeFilter());
ccfg.setAffinity(new RendezvousAffinityFunction());
ccfgs.add(ccfg);
}
{
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setName(AFF4_FILTER_CACHE2);
ccfg.setNodeFilter(new TestNodeFilter());
ccfg.setAffinity(new RendezvousAffinityFunction());
ccfgs.add(ccfg);
}
cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
startGrid(0);
}
/**
* @throws Exception If failed.
*/
@Test
public void testExchangeMessages() throws Exception {
ignite(0);
final int SRVS = 4;
for (int i = 1; i < SRVS; i++) {
startGrid(i);
awaitPartitionMapExchange();
checkMessages(0, true);
}
startClientGrid(SRVS);
awaitPartitionMapExchange();
checkMessages(0, false);
}
/**
* @param crdIdx Coordinator node index.
* @param checkSingle {@code True} if need check single messages.
*/
private void checkMessages(int crdIdx, boolean checkSingle) {
checkFullMessages(crdIdx);
if (checkSingle)
checkSingleMessages(crdIdx);
}
/**
* @param crdIdx Coordinator node index.
*/
private void checkFullMessages(int crdIdx) {
TestRecordingCommunicationSpi commSpi0 =
(TestRecordingCommunicationSpi)ignite(crdIdx).configuration().getCommunicationSpi();
List<Object> msgs = commSpi0.recordedMessages(false);
assertTrue(!msgs.isEmpty());
for (Object msg : msgs) {
assertTrue("Unexpected messages: " + msg, msg instanceof GridDhtPartitionsFullMessage);
checkFullMessage((GridDhtPartitionsFullMessage)msg);
}
}
/**
* @param crdIdx Coordinator node index.
*/
private void checkSingleMessages(int crdIdx) {
int cnt = 0;
for (Ignite ignite : Ignition.allGrids()) {
if (getTestIgniteInstanceName(crdIdx).equals(ignite.name()) || ignite.configuration().isClientMode())
continue;
TestRecordingCommunicationSpi commSpi0 =
(TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
List<Object> msgs = commSpi0.recordedMessages(false);
assertTrue(!msgs.isEmpty());
for (Object msg : msgs) {
assertTrue("Unexpected messages: " + msg, msg instanceof GridDhtPartitionsSingleMessage);
checkSingleMessage((GridDhtPartitionsSingleMessage)msg);
}
cnt++;
}
assertTrue(cnt > 0);
}
/**
* @param msg Message.
*/
private void checkFullMessage(GridDhtPartitionsFullMessage msg) {
Map<Integer, Integer> dupPartsData = getFieldValue(msg, "dupPartsData");
assertNotNull(dupPartsData);
checkFullMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg);
checkFullMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, msg);
assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1)));
Map<Integer, CachePartitionFullCountersMap> partCntrs =
getFieldValue(getFieldValue(msg, "partCntrs2"), "map");
if (partCntrs != null) {
for (CachePartitionFullCountersMap cntrs : partCntrs.values()) {
long[] initialUpdCntrs = getFieldValue(cntrs, "initialUpdCntrs");
long[] updCntrs = getFieldValue(cntrs, "updCntrs");
for (int i = 0; i < initialUpdCntrs.length; i++) {
assertEquals(0, initialUpdCntrs[i]);
assertEquals(0, updCntrs[i]);
}
}
}
}
/**
* @param msg Message.
*/
private void checkSingleMessage(GridDhtPartitionsSingleMessage msg) {
Map<Integer, Integer> dupPartsData = getFieldValue(msg, "dupPartsData");
assertNotNull(dupPartsData);
checkSingleMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg);
checkSingleMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, msg);
assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1)));
Map<Integer, CachePartitionPartialCountersMap> partCntrs = getFieldValue(msg, "partCntrs");
if (partCntrs != null) {
for (CachePartitionPartialCountersMap cntrs : partCntrs.values())
assertEquals(0, cntrs.size());
}
}
/**
* @param cache1 Cache 1.
* @param cache2 Cache 2.
* @param dupPartsData Duplicated data map.
* @param msg Message.
*/
private void checkFullMessage(String cache1,
String cache2,
Map<Integer, Integer> dupPartsData,
GridDhtPartitionsFullMessage msg
) {
int cache1Grp = groupIdForCache(ignite(0), cache1);
int cache2Grp = groupIdForCache(ignite(0), cache2);
Integer grpId;
Integer dupGrpId;
if (dupPartsData.containsKey(cache1Grp)) {
grpId = cache1Grp;
dupGrpId = cache2Grp;
}
else {
grpId = cache2Grp;
dupGrpId = cache1Grp;
}
assertTrue(dupPartsData.containsKey(grpId));
assertEquals(dupGrpId, dupPartsData.get(grpId));
assertFalse(dupPartsData.containsKey(dupGrpId));
Map<Integer, GridDhtPartitionFullMap> parts = msg.partitions();
GridDhtPartitionFullMap emptyFullMap = parts.get(grpId);
for (GridDhtPartitionMap map : emptyFullMap.values())
assertEquals(0, map.map().size());
GridDhtPartitionFullMap fullMap = parts.get(dupGrpId);
for (GridDhtPartitionMap map : fullMap.values())
assertFalse(map.map().isEmpty());
}
/**
* @param cache1 Cache 1.
* @param cache2 Cache 2.
* @param dupPartsData Duplicated data map.
* @param msg Message.
*/
private void checkSingleMessage(String cache1,
String cache2,
Map<Integer, Integer> dupPartsData,
GridDhtPartitionsSingleMessage msg
) {
if (!F.isEmpty(msg.cacheGroupsAffinityRequest())) {
for (GridDhtPartitionMap map : msg.partitions().values())
assertTrue(F.isEmpty(map.map()));
return;
}
int cache1Grp = groupIdForCache(ignite(0), cache1);
int cache2Grp = groupIdForCache(ignite(0), cache2);
Integer grpId;
Integer dupGrpId;
if (dupPartsData.containsKey(cache1Grp)) {
grpId = cache1Grp;
dupGrpId = cache2Grp;
}
else {
grpId = cache2Grp;
dupGrpId = cache1Grp;
}
assertTrue(dupPartsData.containsKey(grpId));
assertEquals(dupGrpId, dupPartsData.get(grpId));
assertFalse(dupPartsData.containsKey(dupGrpId));
Map<Integer, GridDhtPartitionMap> parts = msg.partitions();
GridDhtPartitionMap emptyMap = parts.get(grpId);
assertEquals(0, emptyMap.map().size());
GridDhtPartitionMap map = parts.get(dupGrpId);
assertFalse(map.map().isEmpty());
}
/**
*
*/
private static class TestNodeFilter implements IgnitePredicate<ClusterNode> {
/** {@inheritDoc} */
@Override public boolean apply(ClusterNode node) {
// Do not start cache on coordinator.
return !((String)node.attribute("name")).endsWith("0");
}
}
}