// blob: 525890dd32993495f608bea24690dae766896f65 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import com.google.common.collect.ImmutableSet;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CA;
import org.apache.ignite.internal.util.typedef.CAX;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
/**
* Multithreaded cache API tests.
*/
public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends GridCacheAbstractSelfTest {
/** Random generator used by {@link #random()} to pick key indexes. */
private static final Random RND = new Random();
/** Number of writer threads started by {@code runTest}. */
private static final int WRITE_THREAD_CNT = 3;
/** Number of reader threads started by {@code runTest}. */
private static final int READ_THREAD_CNT = 3;
/** Name passed for the writer threads. */
private static final String WRITE_THREAD_NAME = "write-thread";
/** Name passed for the reader threads. */
private static final String READ_THREAD_NAME = "read-thread";
/** Total number of entries ("key0" .. "key999") that the writers put in one test run. */
private static final int PUT_CNT = 1000;
/** Next key index to be put; shared by the writer threads and reset before every test. */
private final AtomicInteger cnt = new AtomicInteger();
/** Switched to {@code true} by a writer once an index greater than 10 has been put; readers idle until then. */
private final AtomicBoolean guard = new AtomicBoolean();
/** Indexes whose put has already completed; {@link #random()} only returns members of this set. */
private final Collection<Integer> set = new GridConcurrentHashSet<>();
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
    super.beforeTest();

    // Bring the state shared between writer and reader threads back to its initial values.
    set.clear();
    guard.set(false);
    cnt.set(0);
}
/**
 * Starts writer threads that put {@code "key" + i -> i} for every {@code i} below {@code PUT_CNT}
 * and reader threads that keep applying the given closure while the writers are running. After
 * both groups have finished, {@link #checkConsistency()} is called.
 *
 * @param c Test closure the reader threads apply to the cache.
 * @throws Exception In case of error.
 */
private void runTest(final IgniteInClosure<IgniteCache<String, Integer>> c) throws Exception {
    final IgniteInternalFuture<?> writersFut = GridTestUtils.runMultiThreadedAsync(new CAX() {
        @Override public void applyx() {
            // Each writer claims the next free index until all PUT_CNT indexes are taken.
            for (int idx = cnt.getAndIncrement(); idx < PUT_CNT; idx = cnt.getAndIncrement()) {
                jcache().put("key" + idx, idx);

                // Publish the index only after the put has returned.
                set.add(idx);

                // Readers are released once some index above 10 has been written.
                if (idx > 10)
                    guard.compareAndSet(false, true);
            }
        }
    }, WRITE_THREAD_CNT, WRITE_THREAD_NAME);

    IgniteInternalFuture<?> readersFut = GridTestUtils.runMultiThreadedAsync(new CA() {
        @Override public void apply() {
            IgniteCache<String, Integer> cache = jcache();

            // Spin until the writers are done, running the closure whenever the guard is open.
            while (!writersFut.isDone()) {
                if (!guard.get())
                    continue;

                c.apply(cache);
            }
        }
    }, READ_THREAD_CNT, READ_THREAD_NAME);

    writersFut.get();
    readersFut.get();

    checkConsistency();
}
/**
 * Checks that every entry returned by iterating {@code jcache()} has the same value when read
 * through {@code jcache(i)} for each grid index from 1 up to {@code gridCount()}.
 * <p>
 * JUnit assertions are used instead of the {@code assert} keyword so that the check is performed
 * even when JVM assertions are disabled and a failure reports the offending key and grid.
 */
private void checkConsistency() {
    for (Cache.Entry<String, Integer> e : jcache()) {
        // NOTE(review): the loop starts at 1, presumably because jcache() is the cache of grid 0 - confirm.
        for (int i = 1; i < gridCount(); i++) {
            Integer val = jcache(i).get(e.getKey());

            // Covers both cases of the comparison: both values null, or both non-null and equal.
            assertEquals("Inconsistent value [key=" + e.getKey() + ", grid=" + i + ']', e.getValue(), val);
        }
    }
}
/**
 * Picks a random index among those already recorded in {@link #set}, retrying until one is found.
 *
 * @return Random index from {@code [0, PUT_CNT)} that is contained in {@link #set}.
 */
private int random() {
    while (true) {
        int candidate = RND.nextInt(PUT_CNT);

        if (set.contains(candidate))
            return candidate;
    }
}
/**
 * Builds the sorted set of cache keys {@code "key" + i} for the given index range.
 *
 * @param fromIncl Inclusive start of the range.
 * @param toExcl Exclusive stop of the range.
 * @return Range of keys.
 */
private Set<String> rangeKeys(int fromIncl, int toExcl) {
    Collection<Integer> idxs = F.range(fromIncl, toExcl);

    // Maps an index to the key under which the writers store it.
    C1<Integer, String> toKey = new C1<Integer, String>() {
        @Override public String apply(Integer idx) {
            return "key" + idx;
        }
    };

    Collection<String> keys = F.transform(idxs, toKey);

    return new TreeSet<>(keys);
}
/**
 * Checks {@code containsKey} concurrently with the writers: a key that has already been put must
 * be reported as present and a key that is never put must be reported as absent.
 * <p>
 * JUnit assertions are used so that the cache calls are executed and verified even when JVM
 * assertions are disabled.
 *
 * @throws Exception In case of error.
 */
@Test
public void testContainsKey() throws Exception {
    runTest(new CI1<IgniteCache<String, Integer>>() {
        @Override public void apply(IgniteCache<String, Integer> cache) {
            String key = "key" + random();

            assertTrue("Key is expected to be present: " + key, cache.containsKey(key));
            assertFalse("Key is expected to be absent: wrongKey", cache.containsKey("wrongKey"));
        }
    });
}
/**
 * Checks {@code get} concurrently with the writers: an already put key must map to its index and
 * a key that is never put must map to {@code null}.
 * <p>
 * JUnit assertions are used so that the checks run even when JVM assertions are disabled, and a
 * missing value is reported as an assertion failure rather than as a {@link NullPointerException}
 * caused by unboxing.
 *
 * @throws Exception In case of error.
 */
@Test
public void testGet() throws Exception {
    runTest(new CIX1<IgniteCache<String, Integer>>() {
        @Override public void applyx(IgniteCache<String, Integer> cache) {
            int rnd = random();

            // Boxed expected value keeps the comparison null-safe.
            assertEquals(Integer.valueOf(rnd), cache.get("key" + rnd));
            assertNull(cache.get("wrongKey"));
        }
    });
}
/**
 * Checks {@code get} through the {@code withAsync()} cache view concurrently with the writers:
 * the future of an already put key must hold its index and the future of a key that is never put
 * must hold {@code null}.
 * <p>
 * JUnit assertions are used so that the futures are always consumed and checked, even when JVM
 * assertions are disabled.
 *
 * @throws Exception In case of error.
 */
@Test
public void testGetAsyncOld() throws Exception {
    runTest(new CIX1<IgniteCache<String, Integer>>() {
        @Override public void applyx(IgniteCache<String, Integer> cache) {
            int rnd = random();

            IgniteCache<String, Integer> cacheAsync = cache.withAsync();

            cacheAsync.get("key" + rnd);

            // Boxed expected value keeps the comparison null-safe.
            assertEquals(Integer.valueOf(rnd), cacheAsync.<Integer>future().get());

            cacheAsync.get("wrongKey");

            assertNull(cacheAsync.future().get());
        }
    });
}
/**
 * Checks {@code getAsync} concurrently with the writers: the future of an already put key must
 * hold its index and the future of a key that is never put must hold {@code null}.
 * <p>
 * JUnit assertions are used so that the checks run even when JVM assertions are disabled, and a
 * missing value is reported as an assertion failure rather than as a {@link NullPointerException}
 * caused by unboxing.
 *
 * @throws Exception In case of error.
 */
@Test
public void testGetAsync() throws Exception {
    runTest(new CIX1<IgniteCache<String, Integer>>() {
        @Override public void applyx(IgniteCache<String, Integer> cache) {
            int rnd = random();

            // Boxed expected value keeps the comparison null-safe.
            assertEquals(Integer.valueOf(rnd), cache.getAsync("key" + rnd).get());
            assertNull(cache.getAsync("wrongKey").get());
        }
    });
}
/**
 * Checks {@code getAll} concurrently with the writers for two already put keys (which may
 * coincide): the result must contain exactly the distinct keys, each mapped to its index.
 * <p>
 * JUnit assertions are used so that the checks run even when JVM assertions are disabled, and a
 * missing entry is reported as an assertion failure rather than as a
 * {@link NullPointerException} caused by unboxing.
 *
 * @throws Exception In case of error.
 */
@Test
public void testGetAll() throws Exception {
    runTest(new CIX1<IgniteCache<String, Integer>>() {
        @Override public void applyx(IgniteCache<String, Integer> cache) {
            int rnd1 = random();
            int rnd2 = random();

            Map<String, Integer> map = cache.getAll(ImmutableSet.of("key" + rnd1, "key" + rnd2));

            // Both random indexes may be equal, in which case a single entry is expected.
            assertEquals(rnd1 != rnd2 ? 2 : 1, map.size());
            assertEquals(Integer.valueOf(rnd1), map.get("key" + rnd1));
            assertEquals(Integer.valueOf(rnd2), map.get("key" + rnd2));
        }
    });
}
/**
 * Checks {@code getAll} through the {@code withAsync()} cache view concurrently with the writers
 * for two already put keys (which may coincide): the resulting map must contain exactly the
 * distinct keys, each mapped to its index.
 * <p>
 * JUnit assertions are used so that the checks run even when JVM assertions are disabled, and a
 * missing entry is reported as an assertion failure rather than as a
 * {@link NullPointerException} caused by unboxing.
 *
 * @throws Exception In case of error.
 */
@Test
public void testGetAllAsyncOld() throws Exception {
    runTest(new CIX1<IgniteCache<String, Integer>>() {
        @Override public void applyx(IgniteCache<String, Integer> cache) {
            int rnd1 = random();
            int rnd2 = random();

            IgniteCache<String, Integer> cacheAsync = cache.withAsync();

            cacheAsync.getAll(ImmutableSet.of("key" + rnd1, "key" + rnd2));

            Map<String, Integer> map = cacheAsync.<Map<String, Integer>>future().get();

            // Both random indexes may be equal, in which case a single entry is expected.
            assertEquals(rnd1 != rnd2 ? 2 : 1, map.size());
            assertEquals(Integer.valueOf(rnd1), map.get("key" + rnd1));
            assertEquals(Integer.valueOf(rnd2), map.get("key" + rnd2));
        }
    });
}
/**
 * Checks {@code getAllAsync} concurrently with the writers for two already put keys (which may
 * coincide): the resulting map must contain exactly the distinct keys, each mapped to its index.
 * <p>
 * JUnit assertions are used so that the checks run even when JVM assertions are disabled, and a
 * missing entry is reported as an assertion failure rather than as a
 * {@link NullPointerException} caused by unboxing.
 *
 * @throws Exception In case of error.
 */
@Test
public void testGetAllAsync() throws Exception {
    runTest(new CIX1<IgniteCache<String, Integer>>() {
        @Override public void applyx(IgniteCache<String, Integer> cache) {
            int rnd1 = random();
            int rnd2 = random();

            Map<String, Integer> map = cache.getAllAsync(ImmutableSet.of("key" + rnd1, "key" + rnd2)).get();

            // Both random indexes may be equal, in which case a single entry is expected.
            assertEquals(rnd1 != rnd2 ? 2 : 1, map.size());
            assertEquals(Integer.valueOf(rnd1), map.get("key" + rnd1));
            assertEquals(Integer.valueOf(rnd2), map.get("key" + rnd2));
        }
    });
}
/**
 * Checks {@code getAndRemove} and the conditional {@code remove(key, val)} concurrently with the
 * writers and with other readers removing the same keys.
 * <p>
 * The removals used to sit inside {@code assert} statements, so with JVM assertions disabled
 * they were not even executed. JUnit assertions make the operations and their checks
 * unconditional.
 *
 * @throws Exception In case of error.
 */
@Test
public void testRemove() throws Exception {
    runTest(new CIX1<IgniteCache<String, Integer>>() {
        @Override public void applyx(IgniteCache<String, Integer> cache) {
            int rnd1 = random();
            int rnd2 = random();

            assertNull(cache.getAndRemove("wrongKey"));

            // -1 is never stored by the writers, so the conditional remove must not succeed.
            assertFalse(cache.remove("key" + rnd1, -1));

            Integer v1 = cache.localPeek("key" + rnd1, CachePeekMode.ONHEAP);
            Integer v2 = cache.localPeek("key" + rnd2, CachePeekMode.ONHEAP);

            // A peeked value is either absent or the index stored by the writer.
            assertTrue("Unexpected value [key=key" + rnd1 + ", val=" + v1 + ']', v1 == null || v1 == rnd1);
            assertTrue("Unexpected value [key=key" + rnd2 + ", val=" + v2 + ']', v2 == null || v2 == rnd2);

            v1 = cache.getAndRemove("key" + rnd1);

            assertNull(cache.localPeek("key" + rnd1, CachePeekMode.ONHEAP));
            assertTrue("Unexpected removed value [key=key" + rnd1 + ", val=" + v1 + ']',
                v1 == null || v1 == rnd1);

            assertNull(cache.getAndRemove("key" + rnd1));

            cache.remove("key" + rnd2, rnd2);

            assertNull(cache.localPeek("key" + rnd2, CachePeekMode.ONHEAP));
        }
    });
}
/**
 * Checks {@code getAndRemove} and the conditional {@code remove(key, val)} through the
 * {@code withAsync()} cache view, concurrently with the writers and with other readers removing
 * the same keys.
 * <p>
 * Two changes compared to the previous version: the removals of existing keys now also go
 * through the {@code withAsync()} view (they used to be routed through the
 * {@code getAndRemoveAsync}/{@code removeAsync} helpers, which are covered by
 * {@code testRemoveAsync}), and JUnit assertions replace {@code assert} statements so that
 * the futures are consumed and checked even when JVM assertions are disabled.
 *
 * @throws Exception In case of error.
 */
@Test
public void testRemoveAsyncOld() throws Exception {
    runTest(new CIX1<IgniteCache<String, Integer>>() {
        @Override public void applyx(IgniteCache<String, Integer> cache) {
            int rnd1 = random();
            int rnd2 = random();

            IgniteCache<String, Integer> cacheAsync = cache.withAsync();

            cacheAsync.getAndRemove("wrongKey");

            assertNull(cacheAsync.future().get());

            // -1 is never stored by the writers, so the conditional remove must not succeed.
            cacheAsync.remove("key" + rnd1, -1);

            assertFalse(cacheAsync.<Boolean>future().get());

            Integer v1 = cache.localPeek("key" + rnd1, CachePeekMode.ONHEAP);
            Integer v2 = cache.localPeek("key" + rnd2, CachePeekMode.ONHEAP);

            // A peeked value is either absent or the index stored by the writer.
            assertTrue("Unexpected value [key=key" + rnd1 + ", val=" + v1 + ']', v1 == null || v1 == rnd1);
            assertTrue("Unexpected value [key=key" + rnd2 + ", val=" + v2 + ']', v2 == null || v2 == rnd2);

            cacheAsync.getAndRemove("key" + rnd1);

            v1 = cacheAsync.<Integer>future().get();

            assertNull(cache.localPeek("key" + rnd1, CachePeekMode.ONHEAP));
            assertTrue("Unexpected removed value [key=key" + rnd1 + ", val=" + v1 + ']',
                v1 == null || v1 == rnd1);

            assertNull(cache.getAndRemove("key" + rnd1));

            cacheAsync.remove("key" + rnd2, rnd2);

            // Wait for the removal to complete before peeking.
            cacheAsync.future().get();

            assertNull(cache.localPeek("key" + rnd2, CachePeekMode.ONHEAP));
        }
    });
}
/**
 * Checks {@code getAndRemoveAsync} and the conditional {@code removeAsync(key, val)} concurrently
 * with the writers and with other readers removing the same keys.
 * <p>
 * The removals used to sit inside {@code assert} statements, so with JVM assertions disabled
 * they were not even executed. JUnit assertions make the operations and their checks
 * unconditional.
 *
 * @throws Exception In case of error.
 */
@Test
public void testRemoveAsync() throws Exception {
    runTest(new CIX1<IgniteCache<String, Integer>>() {
        @Override public void applyx(IgniteCache<String, Integer> cache) {
            int rnd1 = random();
            int rnd2 = random();

            assertNull(cache.getAndRemoveAsync("wrongKey").get());

            // -1 is never stored by the writers, so the conditional remove must not succeed.
            assertFalse(cache.removeAsync("key" + rnd1, -1).get());

            Integer v1 = cache.localPeek("key" + rnd1, CachePeekMode.ONHEAP);
            Integer v2 = cache.localPeek("key" + rnd2, CachePeekMode.ONHEAP);

            // A peeked value is either absent or the index stored by the writer.
            assertTrue("Unexpected value [key=key" + rnd1 + ", val=" + v1 + ']', v1 == null || v1 == rnd1);
            assertTrue("Unexpected value [key=key" + rnd2 + ", val=" + v2 + ']', v2 == null || v2 == rnd2);

            v1 = removeAsync(cache, "key" + rnd1);

            assertNull(cache.localPeek("key" + rnd1, CachePeekMode.ONHEAP));
            assertTrue("Unexpected removed value [key=key" + rnd1 + ", val=" + v1 + ']',
                v1 == null || v1 == rnd1);

            assertNull(cache.getAndRemove("key" + rnd1));

            removeAsync(cache, "key" + rnd2, rnd2);

            assertNull(cache.localPeek("key" + rnd2, CachePeekMode.ONHEAP));
        }
    });
}
/**
 * Removes all keys below a random index with {@code removeAll} while the writers are running and
 * checks that none of the keys recorded as put before the removal can be peeked afterwards.
 *
 * @throws Exception In case of error.
 */
@Test
public void testRemoveAll() throws Exception {
    runTest(new CIX1<IgniteCache<String, Integer>>() {
        @Override public void applyx(IgniteCache<String, Integer> cache) {
            int bound = random();

            // Snapshot of the indexes recorded as put before the removal starts.
            Set<Integer> putBefore = new HashSet<>(set);

            Set<String> keys = rangeKeys(0, bound);

            cache.removeAll(keys);

            for (int idx = 0; idx < bound; idx++) {
                // Indexes missing from the snapshot may be written after the removal; skip them.
                if (!putBefore.contains(idx))
                    continue;

                assertNull(cache.localPeek("key" + idx));
            }
        }
    });
}
/**
 * Removes all keys below a random index with {@code removeAll} on the {@code withAsync()} cache
 * view while the writers are running, waits for the future and checks that none of the keys
 * recorded as put before the removal can be peeked afterwards.
 *
 * @throws Exception In case of error.
 */
@Test
public void testRemoveAllAsyncOld() throws Exception {
    runTest(new CIX1<IgniteCache<String, Integer>>() {
        @Override public void applyx(IgniteCache<String, Integer> cache) {
            int bound = random();

            IgniteCache<String, Integer> asyncView = cache.withAsync();

            // Snapshot of the indexes recorded as put before the removal starts.
            Set<Integer> putBefore = new HashSet<>(set);

            Set<String> keys = rangeKeys(0, bound);

            asyncView.removeAll(keys);

            // Block until the asynchronous removal has completed.
            asyncView.future().get();

            for (int idx = 0; idx < bound; idx++) {
                // Indexes missing from the snapshot may be written after the removal; skip them.
                if (!putBefore.contains(idx))
                    continue;

                assertNull(cache.localPeek("key" + idx));
            }
        }
    });
}
/**
 * Removes all keys below a random index with {@code removeAllAsync} while the writers are
 * running, waits for the future and checks that none of the keys recorded as put before the
 * removal can be peeked afterwards.
 *
 * @throws Exception In case of error.
 */
@Test
public void testRemoveAllAsync() throws Exception {
    runTest(new CIX1<IgniteCache<String, Integer>>() {
        @Override public void applyx(IgniteCache<String, Integer> cache) {
            int bound = random();

            // Snapshot of the indexes recorded as put before the removal starts.
            Set<Integer> putBefore = new HashSet<>(set);

            Set<String> keys = rangeKeys(0, bound);

            // Block until the asynchronous removal has completed.
            cache.removeAllAsync(keys).get();

            for (int idx = 0; idx < bound; idx++) {
                // Indexes missing from the snapshot may be written after the removal; skip them.
                if (!putBefore.contains(idx))
                    continue;

                assertNull(cache.localPeek("key" + idx));
            }
        }
    });
}
/**
 * Calls {@code getAndRemoveAsync} for the key and waits for the returned future.
 *
 * @param cache Cache.
 * @param key Key.
 * @return Removed value.
 */
private <K, V> V removeAsync(IgniteCache<K, V> cache, K key) {
    V prev = cache.getAndRemoveAsync(key).get();

    return prev;
}
/**
 * Calls the conditional {@code removeAsync(key, val)} and waits for the returned future.
 *
 * @param cache Cache.
 * @param key Key.
 * @param val Value.
 * @return Remove result.
 */
private <K, V> boolean removeAsync(IgniteCache<K, V> cache, K key, V val) {
    boolean rmvd = cache.removeAsync(key, val).get();

    return rmvd;
}
}