blob: e0d499e8fe08302cf2e15d53d2cf8323c6af508d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CachePartialUpdateException;
import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
/**
*
*/
public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbstractTest {
/** */
protected static final long DURATION = GridTestUtils.SF.applyLB(30_000, 7_000);
/** */
protected static final int GRID_CNT = 4;
/**
* @return Keys count for the test.
*/
protected int keysCount() {
return 2_000;
}
/**
* @param evict If {@code true} adds eviction policy.
* @param store If {@code true} adds cache store.
* @return Cache configuration.
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
protected CacheConfiguration cacheConfiguration(boolean evict, boolean store) throws Exception {
CacheConfiguration cfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
cfg.setAtomicityMode(atomicityMode());
cfg.setWriteSynchronizationMode(FULL_SYNC);
cfg.setBackups(1);
cfg.setRebalanceMode(SYNC);
if (evict) {
LruEvictionPolicy plc = new LruEvictionPolicy();
plc.setMaxSize(100);
cfg.setEvictionPolicy(plc);
cfg.setOnheapCacheEnabled(true);
}
if (store) {
cfg.setCacheStoreFactory(new TestStoreFactory());
cfg.setWriteThrough(true);
}
return cfg;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setIncludeEventTypes();
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
AtomicConfiguration acfg = new AtomicConfiguration();
acfg.setBackups(1);
cfg.setAtomicConfiguration(acfg);
cfg.setIncludeEventTypes(new int[0]);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
startGridsMultiThreaded(GRID_CNT);
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();
stopAllGrids();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
try {
checkInternalCleanup();
}
finally {
ignite(0).destroyCache(DEFAULT_CACHE_NAME);
}
}
/**
* @return Cache atomicity mode.
*/
protected abstract CacheAtomicityMode atomicityMode();
/**
* @throws Exception If failed.
*/
@org.junit.Test
public void testPut() throws Exception {
checkRetry(Test.PUT, false, false);
}
/**
* @throws Exception If failed.
*/
@org.junit.Test
public void testGetAndPut() throws Exception {
checkRetry(Test.GET_AND_PUT, false, false);
}
/**
* @throws Exception If failed.
*/
@org.junit.Test
public void testPutStoreEnabled() throws Exception {
checkRetry(Test.PUT, false, true);
}
/**
* @throws Exception If failed.
*/
@org.junit.Test
public void testPutAll() throws Exception {
checkRetry(Test.PUT_ALL, false, false);
}
/**
* @throws Exception If failed.
*/
@org.junit.Test
public void testPutAsync() throws Exception {
checkRetry(Test.PUT_ASYNC, false, false);
}
/**
* @throws Exception If failed.
*/
@org.junit.Test
public void testPutAsyncStoreEnabled() throws Exception {
checkRetry(Test.PUT_ASYNC, false, true);
}
/**
* @throws Exception If failed.
*/
@org.junit.Test
public void testInvoke() throws Exception {
checkRetry(Test.INVOKE, false, false);
}
/**
* @throws Exception If failed.
*/
@org.junit.Test
public void testInvokeAll() throws Exception {
checkRetry(Test.INVOKE_ALL, false, false);
}
/**
* @throws Exception If failed.
*/
@org.junit.Test
public void testInvokeAllEvict() throws Exception {
checkRetry(Test.INVOKE_ALL, true, false);
}
/**
* @param test Test type.
* @param evict If {@code true} uses eviction policy
* @param store If {@code true} uses cache with store.
* @throws Exception If failed.
*/
protected final void checkRetry(Test test, boolean evict, boolean store) throws Exception {
ignite(0).createCache(cacheConfiguration(evict, store));
final AtomicBoolean finished = new AtomicBoolean();
int keysCnt = keysCount();
IgniteInternalFuture<Object> fut = runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
Random rnd = new Random();
while (!finished.get()) {
stopGrid(3);
U.sleep(300);
startGrid(3);
if (rnd.nextBoolean()) // OPC possible only when there is no migration from one backup to another.
awaitPartitionMapExchange();
}
return null;
}
});
final IgniteCache<Integer, Integer> cache = ignite(0).cache(DEFAULT_CACHE_NAME);
int iter = 0;
try {
long stopTime = System.currentTimeMillis() + DURATION;
switch (test) {
case PUT: {
while (System.currentTimeMillis() < stopTime) {
Integer val = ++iter;
for (int i = 0; i < keysCnt; i++)
cache.put(i, val);
for (int i = 0; i < keysCnt; i++)
assertEquals(val, cache.get(i));
}
break;
}
case GET_AND_PUT: {
for (int i = 0; i < keysCnt; i++)
cache.put(i, 0);
while (System.currentTimeMillis() < stopTime) {
Integer expOld = iter;
Integer val = ++iter;
for (int i = 0; i < keysCnt; i++) {
Integer old = cache.getAndPut(i, val);
assertTrue("Unexpected old value [old=" + old + ", exp=" + expOld + ']',
expOld.equals(old) || val.equals(old));
}
for (int i = 0; i < keysCnt; i++)
assertEquals(val, cache.get(i));
}
break;
}
case TX_PUT: {
while (System.currentTimeMillis() < stopTime) {
final Integer val = ++iter;
Ignite ignite = ignite(0);
for (int i = 0; i < keysCnt; i++) {
final Integer key = i;
doInTransaction(ignite, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.put(key, val);
return null;
}
});
}
for (int i = 0; i < keysCnt; i++)
assertEquals(val, cache.get(i));
}
break;
}
case PUT_ALL: {
while (System.currentTimeMillis() < stopTime) {
Integer val = ++iter;
Map<Integer, Integer> map = new LinkedHashMap<>();
for (int i = 0; i < keysCnt; i++) {
map.put(i, val);
if (map.size() == 100 || i == keysCnt - 1) {
cache.putAll(map);
map.clear();
}
}
for (int i = 0; i < keysCnt; i++)
assertEquals(val, cache.get(i));
}
}
case PUT_ASYNC: {
while (System.currentTimeMillis() < stopTime) {
Integer val = ++iter;
for (int i = 0; i < keysCnt; i++)
cache.putAsync(i, val).get();
for (int i = 0; i < keysCnt; i++)
assertEquals(val, cache.getAsync(i).get());
}
break;
}
case INVOKE: {
while (System.currentTimeMillis() < stopTime) {
Integer val = ++iter;
Integer expOld = iter - 1;
for (int i = 0; i < keysCnt; i++) {
Integer old = cache.invoke(i, new SetEntryProcessor(val));
assertNotNull(old);
assertTrue(old.equals(expOld) || old.equals(val));
}
for (int i = 0; i < keysCnt; i++)
assertEquals(val, cache.get(i));
}
break;
}
case INVOKE_ALL: {
while (System.currentTimeMillis() < stopTime) {
Integer val = ++iter;
Integer expOld = iter - 1;
Set<Integer> keys = new LinkedHashSet<>();
for (int i = 0; i < keysCnt; i++) {
keys.add(i);
if (keys.size() == 100 || i == keysCnt - 1) {
Map<Integer, EntryProcessorResult<Integer>> resMap =
cache.invokeAll(keys, new SetEntryProcessor(val));
for (Integer key : keys) {
EntryProcessorResult<Integer> res = resMap.get(key);
assertNotNull(res);
Integer old = res.get();
assertTrue(old.equals(expOld) || old.equals(val));
}
assertEquals(keys.size(), resMap.size());
keys.clear();
}
}
for (int i = 0; i < keysCnt; i++)
assertEquals(val, cache.get(i));
}
break;
}
default:
assert false : test;
}
}
finally {
finished.set(true);
fut.get();
}
for (int i = 0; i < keysCnt; i++)
assertEquals((Integer)iter, cache.get(i));
}
/**
* @throws Exception If failed.
*/
private void checkInternalCleanup() throws Exception {
checkNoAtomicFutures();
checkOnePhaseCommitReturnValuesCleaned(GRID_CNT);
}
/**
* @throws Exception If failed.
*/
private void checkNoAtomicFutures() throws Exception {
for (int i = 0; i < GRID_CNT; i++) {
final IgniteKernal ignite = (IgniteKernal)grid(i);
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return ignite.context().cache().context().mvcc().atomicFuturesCount() == 0;
}
}, 5_000);
Collection<?> futs = ignite.context().cache().context().mvcc().atomicFutures();
assertTrue("Unexpected atomic futures: " + futs, futs.isEmpty());
}
}
/**
* @throws Exception If failed.
*/
@org.junit.Test
public void testFailsWithNoRetries() throws Exception {
checkFailsWithNoRetries(false);
}
/**
* @throws Exception If failed.
*/
@org.junit.Test
public void testFailsWithNoRetriesAsync() throws Exception {
checkFailsWithNoRetries(true);
}
/**
* @param async If {@code true} tests asynchronous put.
* @throws Exception If failed.
*/
private void checkFailsWithNoRetries(boolean async) throws Exception {
ignite(0).createCache(cacheConfiguration(false, false));
final AtomicBoolean finished = new AtomicBoolean();
IgniteInternalFuture<Object> fut = runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
while (!finished.get()) {
stopGrid(3);
startGrid(3);
awaitPartitionMapExchange();
}
return null;
}
});
try {
int keysCnt = keysCount();
boolean eThrown = false;
IgniteCache<Object, Object> cache = ignite(0).cache(DEFAULT_CACHE_NAME).withNoRetries();
long stopTime = System.currentTimeMillis() + 60_000;
while (System.currentTimeMillis() < stopTime) {
for (int i = 0; i < keysCnt; i++) {
try {
if (async)
cache.putAsync(i, i).get();
else
cache.put(i, i);
}
catch (Exception e) {
assertTrue("Invalid exception: " + e,
X.hasCause(e, ClusterTopologyCheckedException.class, CachePartialUpdateException.class));
eThrown = true;
break;
}
}
if (eThrown)
break;
}
assertTrue(eThrown);
finished.set(true);
fut.get();
}
finally {
finished.set(true);
}
}
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return 3 * 60 * 1000;
}
/**
*
*/
enum Test {
/** */
PUT,
/** */
GET_AND_PUT,
/** */
PUT_ALL,
/** */
PUT_ASYNC,
/** */
INVOKE,
/** */
INVOKE_ALL,
/** */
TX_PUT
}
/**
*
*/
class SetEntryProcessor implements CacheEntryProcessor<Integer, Integer, Integer> {
/** */
private Integer val;
/**
* @param val Value.
*/
public SetEntryProcessor(Integer val) {
this.val = val;
}
/** {@inheritDoc} */
@Override public Integer process(MutableEntry<Integer, Integer> e, Object... args) {
Integer old = e.getValue();
e.setValue(val);
return old == null ? 0 : old;
}
}
/** */
private static class TestStoreFactory implements Factory<CacheStore> {
/** {@inheritDoc} */
@Override public CacheStore create() {
return new TestCacheStore();
}
}
/**
*
*/
private static class TestCacheStore extends CacheStoreAdapter {
/** Store map. */
private static Map STORE_MAP = new ConcurrentHashMap();
/** {@inheritDoc} */
@Override public Object load(Object key) throws CacheLoaderException {
return STORE_MAP.get(key);
}
/** {@inheritDoc} */
@Override public void write(Cache.Entry entry) throws CacheWriterException {
STORE_MAP.put(entry.getKey(), entry.getValue());
}
/** {@inheritDoc} */
@Override public void delete(Object key) throws CacheWriterException {
STORE_MAP.remove(key);
}
}
}