blob: fab9be9e896d219ad7c4225de094a8c7e008b7e4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.junit.Assume;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
*
*/
public abstract class GridCacheValueConsistencyAbstractSelfTest extends GridCacheAbstractSelfTest {
/** Number of threads for test. */
private static final int THREAD_CNT = 16;
/** */
private String sizePropVal;
/** {@inheritDoc} */
@Override protected int gridCount() {
return 4;
}
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return 5 * 60_000;
}
/** {@inheritDoc} */
@Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
CacheConfiguration cCfg = super.cacheConfiguration(igniteInstanceName);
cCfg.setCacheMode(PARTITIONED);
cCfg.setAtomicityMode(atomicityMode());
cCfg.setNearConfiguration(nearConfiguration());
cCfg.setRebalanceMode(SYNC);
cCfg.setWriteSynchronizationMode(FULL_SYNC);
cCfg.setBackups(1);
return cCfg;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
// Need to increase value set in GridAbstractTest
sizePropVal = System.getProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE);
System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "100000");
super.beforeTestsStarted();
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);
if (nearEnabled())
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.NEAR_CACHE);
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();
System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, sizePropVal != null ? sizePropVal : "");
}
/**
* @return Distribution mode.
*/
@Override protected NearCacheConfiguration nearConfiguration() {
return null;
}
/**
* @return Consistency test iteration count.
*/
protected abstract int iterationCount();
/**
* @throws Exception If failed.
*/
@Test
public void testPutRemove() throws Exception {
awaitPartitionMapExchange();
IgniteCache<String, Integer> cache = jcache();
int keyCnt = 10;
for (int i = 0; i < keyCnt; i++)
cache.put("key" + i, i);
for (int g = 0; g < gridCount(); g++) {
IgniteCache<String, Integer> cache0 = jcache(g);
ClusterNode locNode = grid(g).localNode();
for (int i = 0; i < keyCnt; i++) {
String key = "key" + i;
if (ignite(0).affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key).contains(locNode)) {
info("Node is reported as affinity node for key [key=" + key + ", nodeId=" + locNode.id() + ']');
assertEquals((Integer)i, cache0.localPeek(key));
}
else {
info("Node is reported as NOT affinity node for key [key=" + key +
", nodeId=" + locNode.id() + ']');
if (nearEnabled() &&
((IgniteCacheProxy)cache).context().equals(((IgniteCacheProxy)cache0).context()))
assertEquals((Integer)i, cache0.localPeek(key));
else
assertNull(cache0.localPeek(key));
}
assertEquals((Integer)i, cache0.get(key));
}
}
info("Removing values from cache.");
for (int i = 0; i < keyCnt; i++)
assertEquals((Integer)i, cache.getAndRemove("key" + i));
for (int g = 0; g < gridCount(); g++) {
IgniteCache<String, Integer> cache0 = jcache(g);
for (int i = 0; i < keyCnt; i++) {
String key = "key" + i;
assertNull(cache0.localPeek(key));
assertNull(cache0.get(key));
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testPutRemoveAll() throws Exception {
awaitPartitionMapExchange();
IgniteCache<String, Integer> cache = jcache();
int keyCnt = 10;
for (int i = 0; i < keyCnt; i++) {
info("Putting value to cache: " + i);
cache.put("key" + i, i);
}
for (int g = 0; g < gridCount(); g++) {
IgniteCache<String, Integer> cache0 = jcache(g);
ClusterNode locNode = grid(g).localNode();
for (int i = 0; i < keyCnt; i++) {
String key = "key" + i;
if (ignite(0).affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key).contains(grid(g).localNode())) {
info("Node is reported as affinity node for key [key=" + key + ", nodeId=" + locNode.id() + ']');
assertEquals((Integer)i, cache0.localPeek(key));
}
else {
info("Node is reported as NOT affinity node for key [key=" + key +
", nodeId=" + locNode.id() + ']');
if (nearEnabled() &&
((IgniteCacheProxy)cache).context().equals(((IgniteCacheProxy)cache0).context()))
assertEquals((Integer)i, cache0.localPeek(key));
else
assertNull(cache0.localPeek(key));
}
assertEquals((Integer)i, cache0.get(key));
}
}
for (int g = 0; g < gridCount(); g++) {
info(">>>> Removing all values form cache: " + g);
jcache(g).removeAll();
}
info(">>>> Starting values check");
for (int g = 0; g < gridCount(); g++) {
IgniteCache<String, Integer> cache0 = jcache(g);
for (int i = 0; i < keyCnt; i++) {
String key = "key" + i;
assertNull(cache0.localPeek(key));
assertNull(cache0.get(key));
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testPutConsistencyMultithreaded() throws Exception {
Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-627", nearEnabled());
for (int i = 0; i < 20; i++) {
log.info("Iteration: " + i);
final int range = 100;
final int iterCnt = 100;
final AtomicInteger threadId = new AtomicInteger();
final AtomicInteger iters = new AtomicInteger();
multithreadedAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
Random rnd = new Random();
int g = threadId.getAndIncrement();
Ignite ignite = grid(g);
IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
log.info("Update thread: " + ignite.name());
Thread.currentThread().setName("UpdateThread-" + ignite.name());
Long val = (long)g;
while (true) {
int i = iters.getAndIncrement();
if (i >= iterCnt)
break;
int k = rnd.nextInt(range);
cache.put(k, val);
}
return null;
}
}, gridCount()).get();
checkConsistency(range);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testPutRemoveConsistencyMultithreaded() throws Exception {
Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-627", nearEnabled());
for (int i = 0; i < SF.applyLB(10, 2); i++) {
log.info("Iteration: " + i);
putRemoveConsistencyMultithreaded();
}
}
/**
* @throws Exception If failed.
*/
private void putRemoveConsistencyMultithreaded() throws Exception {
final int range = 10_000;
final int iterCnt = iterationCount();
final AtomicInteger iters = new AtomicInteger();
multithreadedAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
Random rnd = new Random();
while (true) {
int i = iters.getAndIncrement();
if (i >= iterCnt)
break;
int g = rnd.nextInt(gridCount());
Ignite ignite = grid(g);
IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
int k = rnd.nextInt(range);
boolean rmv = rnd.nextBoolean();
if (!rmv)
cache.put(k, Thread.currentThread().getId());
else
cache.remove(k);
if (i > 0 && i % 5000 == 0)
info("Completed: " + i);
}
return null;
}
}, THREAD_CNT).get();
checkConsistency(range);
}
/**
* @param range Key range.
*/
private void checkConsistency(int range) {
int present = 0;
int absent = 0;
Affinity<Integer> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
boolean invalidVal = false;
for (int i = 0; i < range; i++) {
Long firstVal = null;
for (int g = 0; g < gridCount(); g++) {
Ignite ignite = grid(g);
Long val = (Long)ignite.cache(DEFAULT_CACHE_NAME).localPeek(i);
if (firstVal == null && val != null)
firstVal = val;
if (val != null) {
if (!firstVal.equals(val)) {
invalidVal = true;
boolean primary = aff.isPrimary(ignite.cluster().localNode(), i);
boolean backup = aff.isBackup(ignite.cluster().localNode(), i);
log.error("Invalid value detected [key=" + i +
", val=" + val +
", firstVal=" + firstVal +
", node=" + g +
", primary=" + primary +
", backup=" + backup + ']');
log.error("All values: ");
printValues(aff, i);
break;
}
}
}
if (firstVal == null)
absent++;
else
present++;
}
assertFalse("Inconsistent value found.", invalidVal);
info("Finished check [present=" + present + ", absent=" + absent + ']');
info("Checking keySet consistency");
}
/**
* @param aff Affinity.
* @param key Key.
*/
private void printValues(Affinity<Integer> aff, int key) {
for (int g = 0; g < gridCount(); g++) {
Ignite ignite = grid(g);
boolean primary = aff.isPrimary(ignite.cluster().localNode(), key);
boolean backup = aff.isBackup(ignite.cluster().localNode(), key);
Object val = ignite.cache(DEFAULT_CACHE_NAME).localPeek(key);
log.error("Node value [key=" + key +
", val=" + val +
", node=" + g +
", primary=" + primary +
", backup=" + backup + ']');
}
}
}