blob: 07d0b758d0c79d9025e786d49c6a615cf7666fc1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.junit.Before;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* Single node test for near cache.
*/
public class GridCacheNearOneNodeSelfTest extends GridCommonAbstractTest {
/**
 * Cache store shared by all tests of this class. It is handed to the cache configuration in
 * {@link #getConfiguration()} and cleared in {@link #afterTest()}.
 */
private static TestStore store = new TestStore();
/**
 * Runs before each test and delegates to {@link MvccFeatureChecker#skipIfNotSupported} for the
 * {@code NEAR_CACHE} feature.
 */
@Before
public void beforeGridCacheNearOneNodeSelfTest() {
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.NEAR_CACHE);
}
/**
 * Creates the test, passing {@code true} to the parent constructor (the inline comment labels
 * this argument as the "start grid" flag).
 */
public GridCacheNearOneNodeSelfTest() {
super(true /*start grid. */);
}
/**
 * {@inheritDoc}
 * <p>
 * Resets the shared store, removes every cache entry and then verifies that the DHT cache,
 * the near cache and the public cache view are all empty.
 */
@Override protected void afterTest() throws Exception {
    // Clear the store first, then wipe the cache itself.
    store.reset();

    IgniteCache<Object, Object> cache = jcache();

    cache.removeAll();

    String dhtMsg = "DHT entries: " + dht().entrySet();

    assertEquals(dhtMsg, 0, dht().size());

    String nearMsg = "Near entries: " + near().entrySet();

    assertEquals(nearMsg, 0, near().size());

    assertEquals(0, cache.size());
}
/**
 * {@inheritDoc}
 * <p>
 * Builds a configuration with TCP discovery over a VM IP finder and a single partitioned,
 * transactional cache that has a near cache, one backup, full-sync writes and the shared
 * {@link TestStore} attached with read-through, write-through and load-previous-value enabled.
 */
@SuppressWarnings("unchecked")
@Override protected IgniteConfiguration getConfiguration() throws Exception {
    MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.NEAR_CACHE);

    IgniteConfiguration igniteCfg = super.getConfiguration();

    // Discovery.
    TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();

    discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));

    igniteCfg.setDiscoverySpi(discoSpi);

    // Cache topology and consistency settings.
    CacheConfiguration ccfg = defaultCacheConfiguration();

    ccfg.setAtomicityMode(TRANSACTIONAL);
    ccfg.setCacheMode(PARTITIONED);
    ccfg.setBackups(1);
    ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
    ccfg.setNearConfiguration(new NearCacheConfiguration());

    // Store integration.
    ccfg.setCacheStoreFactory(singletonFactory(store));
    ccfg.setWriteThrough(true);
    ccfg.setReadThrough(true);
    ccfg.setLoadPreviousValue(true);

    igniteCfg.setCacheConfiguration(ccfg);

    return igniteCfg;
}
/**
 * Puts ten entries, removes one and then all of them, checking after every step that the DHT
 * cache, the near cache and the public cache view report the same number of entries.
 *
 * @throws Exception If failed.
 */
@Test
public void testRemove() throws Exception {
    final int keyCnt = 10;

    IgniteCache<Object, Object> cache = jcache();

    assertDhtNearAndCacheSize(cache, 0);

    for (int key = 0; key < keyCnt; key++)
        cache.put(key, Integer.toString(key));

    assertDhtNearAndCacheSize(cache, keyCnt);

    cache.remove(0);

    assertDhtNearAndCacheSize(cache, keyCnt - 1);

    cache.removeAll();

    assertDhtNearAndCacheSize(cache, 0);
}

/**
 * Asserts that the DHT cache, the near cache and the given public cache all have the expected size.
 *
 * @param cache Public cache view to check.
 * @param exp Expected number of entries.
 */
private void assertDhtNearAndCacheSize(IgniteCache<Object, Object> cache, int exp) {
    assertEquals("DHT entries: " + dht().entries(), exp, dht().size());
    assertEquals("Near entries: " + near().entries(), exp, near().size());
    assertEquals(exp, cache.size());
}
/**
 * Reads a key that this test never puts and checks that the value {@code "1"} is returned,
 * that the public cache and the DHT cache each report one entry, that the DHT cache holds the
 * value locally and that the store now has a value for the key.
 *
 * @throws Exception If failed.
 */
@Test
public void testReadThrough() throws Exception {
    IgniteCache<Integer, String> near = jcache();

    GridCacheAdapter<Integer, String> dht = dht();

    // The key is not put by this test; with read-through enabled the store's load() supplies it.
    String s = near.get(1);

    // Expected value goes first so that a failure message reads correctly.
    assertEquals("1", s);
    assertEquals(1, near.size());

    String d = localPeek(dht, 1);

    assertEquals("1", d);
    assertEquals(1, dht.size());

    assertTrue(store.hasValue(1));
}
/**
 * Test Optimistic repeatable read write-through.
 * <p>
 * Inside the transaction the written values must be readable through the public cache while
 * the DHT entry for key {@code 2} is either absent or has no raw value. After commit both the
 * public cache and the DHT cache must return the values and report two entries.
 *
 * @throws Exception If failed.
 */
@Test
public void testOptimisticTxWriteThrough() throws Exception {
    IgniteCache<Object, Object> near = jcache();

    GridCacheAdapter<Integer, String> dht = dht();

    try (Transaction tx = grid().transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
        near.put(2, "2");
        near.put(3, "3");

        assertEquals("2", near.get(2));
        assertEquals("3", near.get(3));

        GridCacheEntryEx entry = dht.peekEx(2);

        // Kept as a plain assert so the message (entry.toString()) is built only on failure.
        assert entry == null || entry.rawGet() == null : "Invalid entry: " + entry;

        tx.commit();
    }

    assertEquals("2", near.get(2));
    assertEquals("3", near.get(3));

    assertEquals("2", dht.get(2));
    assertEquals("3", dht.get(3));

    assertEquals(2, near.size());
    assertEquals(2, dht.size());
}
/**
 * While holding an explicit lock on key {@code 1}, puts two entries and checks that
 * {@code getAndPut} on the locked key returns the value written just before.
 *
 * @throws Exception If failed.
 */
@Test
public void testSingleLockPut() throws Exception {
    IgniteCache<Integer, String> cache = jcache();

    Lock keyLock = cache.lock(1);

    keyLock.lock();

    try {
        cache.put(1, "1");
        cache.put(2, "2");

        // The previous value of the locked key must be the one put above.
        String prev = cache.getAndPut(1, "3");

        assertNotNull(prev);
        assertEquals("1", prev);
    }
    finally {
        keyLock.unlock();
    }
}
/**
 * Under an explicit lock on key {@code 1}: puts a value, checks it is visible via on-heap local
 * peek, DHT peek and {@code get}, removes it, checks it is gone, and verifies the key is reported
 * as locally locked until the lock is released.
 *
 * @throws Exception If failed.
 */
@Test
public void testSingleLock() throws Exception {
    IgniteCache<Integer, String> cache = jcache();

    Lock keyLock = cache.lock(1);

    keyLock.lock();

    try {
        cache.put(1, "1");

        // The value is visible everywhere while the lock is held.
        assertEquals("1", cache.localPeek(1, CachePeekMode.ONHEAP));
        assertEquals("1", dhtPeek(1));
        assertEquals("1", cache.get(1));

        assertEquals("1", cache.getAndRemove(1));

        // After removal nothing is left on either side.
        assertNull(cache.localPeek(1, CachePeekMode.ONHEAP));
        assertNull(dhtPeek(1));

        assertTrue(cache.isLocalLocked(1, false));
        assertTrue(cache.isLocalLocked(1, true));
    }
    finally {
        keyLock.unlock();
    }

    // Lock is released now.
    assertFalse(cache.isLocalLocked(1, false));
    assertFalse(cache.isLocalLocked(1, true));
}
/**
 * Acquires the lock on key {@code 1} twice from the same thread. The key must stay reported as
 * locally locked after the inner unlock and must be reported as unlocked only after the outer
 * unlock. Put, get and remove are exercised while the lock is held.
 *
 * @throws Exception If failed.
 */
@Test
public void testSingleLockReentry() throws Exception {
    IgniteCache<Integer, String> cache = jcache();

    Lock keyLock = cache.lock(1);

    // Outer acquisition.
    keyLock.lock();

    try {
        cache.put(1, "1");

        assertEquals("1", cache.localPeek(1, CachePeekMode.ONHEAP));
        assertEquals("1", dhtPeek(1));

        assertTrue(cache.isLocalLocked(1, false));
        assertTrue(cache.isLocalLocked(1, true));

        // Inner (reentrant) acquisition.
        keyLock.lock();

        try {
            assertEquals("1", cache.get(1));
            assertEquals("1", cache.getAndRemove(1));

            assertNull(cache.localPeek(1, CachePeekMode.ONHEAP));
            assertNull(dhtPeek(1));

            assertTrue(cache.isLocalLocked(1, false));
            assertTrue(cache.isLocalLocked(1, true));
        }
        finally {
            keyLock.unlock();
        }

        // Still held by the outer acquisition.
        assertTrue(cache.isLocalLocked(1, false));
        assertTrue(cache.isLocalLocked(1, true));
    }
    finally {
        keyLock.unlock();
    }

    assertFalse(cache.isLocalLocked(1, false));
    assertFalse(cache.isLocalLocked(1, true));
}
/**
 * Puts a value, reads it inside a pessimistic repeatable-read transaction and checks that
 * before and after the transaction the DHT cache holds the value while the near cache has no
 * entry for the key.
 *
 * @throws Exception If failed.
 */
@Test
public void testTransactionSingleGet() throws Exception {
    IgniteCache<Object, Object> cache = jcache();

    cache.put(1, "val1");

    assertEquals("val1", dhtPeek(1));
    assertNull(near().peekEx(1));

    // try-with-resources closes the transaction even if the assertion inside fails.
    try (Transaction tx = grid().transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
        assertEquals("val1", cache.get(1));

        tx.commit();
    }

    assertEquals("val1", dhtPeek(1));
    assertNull(near().peekEx(1));
}
/**
 * Puts a value, then reads and removes it inside a pessimistic repeatable-read transaction and
 * checks that after commit neither the DHT cache nor the near cache holds the key.
 *
 * @throws Exception If failed.
 */
@Test
public void testTransactionSingleGetRemove() throws Exception {
    IgniteCache<Object, Object> cache = jcache();

    cache.put(1, "val1");

    assertEquals("val1", dhtPeek(1));
    assertNull(near().peekEx(1));

    // try-with-resources closes the transaction even if an assertion inside fails.
    try (Transaction tx = grid().transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
        assertEquals("val1", cache.get(1));
        assertTrue(cache.remove(1));

        tx.commit();
    }

    assertNull(dhtPeek(1));
    assertNull(near().peekEx(1));
}
/**
 * In-memory cache store backed by a concurrent map. While the create flag is set, loading a
 * missing key stores and returns the key's string form.
 */
private static class TestStore extends CacheStoreAdapter<Integer, String> {
    /** Stored values by key. */
    private final ConcurrentMap<Integer, String> map = new ConcurrentHashMap<>();

    /** Create flag: when {@code true}, {@link #load(Integer)} generates values for missing keys. */
    private volatile boolean create = true;

    /**
     * Clears all stored values and sets the create flag back to {@code true}.
     */
    void reset() {
        map.clear();

        create = true;
    }

    /** @return Create flag. */
    boolean isCreate() {
        return create;
    }

    /**
     * @param key Key.
     * @return Value stored for the key, or {@code null} if there is none.
     */
    String value(Integer key) {
        return map.get(key);
    }

    /**
     * @param key Key.
     * @return {@code True} if has value.
     */
    boolean hasValue(Integer key) {
        return map.containsKey(key);
    }

    /** @return {@code True} if empty. */
    boolean isEmpty() {
        return map.isEmpty();
    }

    /**
     * {@inheritDoc}
     * <p>
     * With the create flag cleared this is a plain lookup. Otherwise a missing key gets
     * {@code key.toString()} stored as its value, and the value present in the map is returned.
     */
    @Override public String load(Integer key) {
        if (!create)
            return map.get(key);

        String generated = key.toString();

        // putIfAbsent returns the existing value, or null if our generated one was stored.
        String existing = map.putIfAbsent(key, generated);

        return existing == null ? generated : existing;
    }

    /** {@inheritDoc} */
    @Override public void write(javax.cache.Cache.Entry<? extends Integer, ? extends String> e) {
        map.put(e.getKey(), e.getValue());
    }

    /** {@inheritDoc} */
    @Override public void delete(Object key) {
        map.remove(key);
    }
}
}