blob: 8658ac1356913e156e242fcb0f11f263a7708589 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC;
/**
* The test validates assignment after nodes restart with enabled persistence.
*/
public class IgnitePdsCacheAssignmentNodeRestartsTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setConsistentId(igniteInstanceName);
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setPageSize(1024)
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(true)
.setInitialSize(50L * 1024 * 1024)
.setMaxSize(50L * 1024 * 1024)
)
.setWalMode(WALMode.LOG_ONLY)
);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
super.afterTest();
}
/**
* @param name Name.
* @param atomicityMode Atomicity mode.
* @param cacheMode Cache mode.
* @param backups Backups.
* @param grp Group.
* @return Cache configuration.
*/
private CacheConfiguration cacheConfiguration(String name,
CacheAtomicityMode atomicityMode,
CacheMode cacheMode,
int backups,
String grp) {
CacheConfiguration ccfg = new CacheConfiguration(name);
ccfg.setAtomicityMode(atomicityMode);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setCacheMode(cacheMode);
ccfg.setGroupName(grp);
ccfg.setAffinity(new RendezvousAffinityFunction(false, 128));
if (cacheMode == PARTITIONED)
ccfg.setBackups(backups);
return ccfg;
}
/**
* @throws Exception If failed.
*/
@Test
public void testAssignmentAfterRestarts() throws Exception {
try {
System.setProperty(IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC, "true");
final int gridsCnt = 5;
final int groupsCnt = 2;
final IgniteEx node = (IgniteEx)startGridsMultiThreaded(gridsCnt);
final List<CacheConfiguration> cfgs = Arrays.asList(
cacheConfiguration("g1c1", TRANSACTIONAL, PARTITIONED, gridsCnt, "testGrp1"),
cacheConfiguration("g1c2", TRANSACTIONAL, PARTITIONED, gridsCnt, "testGrp1"),
cacheConfiguration("g2c1", TRANSACTIONAL, PARTITIONED, gridsCnt, "testGrp2"),
cacheConfiguration("g2c2", TRANSACTIONAL, PARTITIONED, gridsCnt, "testGrp2"));
node.getOrCreateCaches(cfgs);
validateDepIds(groupsCnt);
stopAllGrids();
IgniteEx node2 = (IgniteEx)startGridsMultiThreaded(gridsCnt);
validateDepIds(groupsCnt); // Deployment ids must be the same on all nodes.
final int restartIdxFrom = 2;
final AtomicInteger idx = new AtomicInteger(restartIdxFrom);
IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
int nodeIdx = idx.getAndIncrement();
stopGrid(nodeIdx);
return null;
}
}, gridsCnt - restartIdxFrom, "stop-node");
fut.get();
awaitPartitionMapExchange();
checkAffinity();
idx.set(restartIdxFrom);
fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
int nodeIdx = idx.getAndIncrement();
startGrid(nodeIdx);
return null;
}
}, gridsCnt - restartIdxFrom, "start-node");
fut.get();
awaitPartitionMapExchange();
AffinityTopologyVersion topVer = node2.context().cache().context().exchange().readyAffinityVersion();
log.info("Using version: " + topVer);
checkAffinity();
}
finally {
System.clearProperty(IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC);
}
}
/**
* @param grpCnt Group count.
*/
private void validateDepIds(int grpCnt) {
Map<Integer, IgniteUuid> depIds = new HashMap<>();
for (Ignite ignite : G.allGrids()) {
final Map<Integer, CacheGroupDescriptor> descMap = ((IgniteEx)ignite).context().cache().cacheGroupDescriptors();
for (Map.Entry<Integer, CacheGroupDescriptor> entry : descMap.entrySet()) {
final IgniteUuid u = entry.getValue().deploymentId();
final IgniteUuid u0 = depIds.get(entry.getKey());
if (u0 == null)
depIds.put(entry.getKey(), u);
else
assertEquals("Descriptors do not match", u0, u);
}
}
assertEquals(grpCnt + 1, depIds.size());
}
/**
* @throws Exception If failed.
*/
private void checkAffinity() throws Exception {
List<Ignite> nodes = G.allGrids();
ClusterNode crdNode = null;
for (Ignite node : nodes) {
ClusterNode locNode = node.cluster().localNode();
if (crdNode == null || locNode.order() < crdNode.order())
crdNode = locNode;
}
AffinityTopologyVersion topVer = ((IgniteKernal)grid(crdNode)).
context().cache().context().exchange().readyAffinityVersion();
Map<String, List<List<ClusterNode>>> affMap = new HashMap<>();
for (Ignite node : nodes) {
IgniteKernal node0 = (IgniteKernal)node;
for (IgniteInternalCache cache : node0.context().cache().caches()) {
List<List<ClusterNode>> aff = affMap.get(cache.name());
List<List<ClusterNode>> aff0 = cache.context().affinity().assignments(topVer);
if (aff != null)
assertEquals(aff, aff0);
else
affMap.put(cache.name(), aff0);
}
}
}
}