// blob: f2457c75f71c51a204eadc830bd63e7d3a914323 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.processors.cache.GridCacheGenericTestStore;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.junit.Before;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
/**
 * Tests that entry-processor ({@code invoke}) updates and removes performed inside a transaction are written
 * through to the cache store.
 * <p>
 * {@link #GRID_CNT} nodes are started and a new {@link GridCacheGenericTestStore} is created on every
 * {@link #getConfiguration(String)} call. Transactions are started on node {@code 0} and the checks are made
 * against the first store in {@link #stores}. Subclasses decide through {@link #batchUpdate()} whether the
 * store is expected to receive one batch call or one call per key.
 */
public abstract class GridCacheAbstractTransformWriteThroughSelfTest extends GridCommonAbstractTest {
    /** Grid count. */
    protected static final int GRID_CNT = 3;

    /** Update operation. */
    protected static final int OP_UPDATE = 0;

    /** Delete operation. */
    protected static final int OP_DELETE = 1;

    /*
     * NOTE(review): NEAR_NODE, PRIMARY_NODE and BACKUP_NODE all have the value 0. As a result the first branch of
     * keysForType() matches for every test in this class, and the "Primary"/"Backup" tests run with the same
     * kind of keys as the "Near" tests. The values are intentionally left untouched here because giving them
     * distinct values changes which keys are exercised and may require different assertions - confirm the
     * intended behavior before changing them.
     */

    /** Near node constant. */
    protected static final int NEAR_NODE = 0;

    /** Primary node constant. */
    protected static final int PRIMARY_NODE = 0;

    /** Backup node constant. */
    protected static final int BACKUP_NODE = 0;

    /** Keys number. */
    public static final int KEYS_CNT = 30;

    /** Entry processor that sets a missing value to {@code 1} and increments an existing value by one. */
    private static final EntryProcessor<String, Integer, Void> INCR_CLOS = new EntryProcessor<String, Integer, Void>() {
        @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
            e.setValue(e.exists() ? e.getValue() + 1 : 1);

            return null;
        }
    };

    /** Entry processor that removes the entry. */
    private static final EntryProcessor<String, Integer, Void> RMV_CLOS = new EntryProcessor<String, Integer, Void>() {
        @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
            e.remove();

            return null;
        }
    };

    /** Test stores, in the order in which {@link #getConfiguration(String)} created them. */
    private static final List<GridCacheGenericTestStore<String, Integer>> stores = new ArrayList<>(GRID_CNT);

    /**
     * @return {@code True} if batch update is enabled.
     */
    protected abstract boolean batchUpdate();

    /**
     * Builds a node configuration with a partitioned, transactional cache that has one backup, a near cache,
     * read-through and write-through enabled, and a freshly created test store.
     * <p>
     * Every call appends the new store to {@link #stores}.
     *
     * {@inheritDoc}
     */
    @SuppressWarnings("unchecked")
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);

        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        GridCacheGenericTestStore<String, Integer> store = new GridCacheGenericTestStore<>();

        stores.add(store);

        CacheConfiguration cacheCfg = defaultCacheConfiguration();

        cacheCfg.setCacheMode(PARTITIONED);
        cacheCfg.setBackups(1);
        cacheCfg.setAtomicityMode(TRANSACTIONAL);
        cacheCfg.setNearConfiguration(new NearCacheConfiguration());

        cacheCfg.setCacheStoreFactory(singletonFactory(store));
        cacheCfg.setReadThrough(true);
        cacheCfg.setWriteThrough(true);
        cacheCfg.setLoadPreviousValue(true);

        cfg.setCacheConfiguration(cacheCfg);

        return cfg;
    }

    /**
     * Runs the {@link MvccFeatureChecker} check for the {@code CACHE_STORE} feature before every test.
     * <p>
     * NOTE(review): the method name looks like it was carried over from another test class. It is kept as is
     * because the method is public - confirm before renaming.
     */
    @Before
    public void beforeCacheStoreListenerRWThroughDisabledTransactionalCacheTest() {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();

        for (int i = 0; i < GRID_CNT; i++)
            startGrid(i);
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        // The list is static, so drop the stores to keep them from leaking into other test classes.
        stores.clear();

        // Delegate to the parent like the other lifecycle callbacks of this class do.
        super.afterTestsStopped();
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        super.beforeTest();

        for (GridCacheGenericTestStore<String, Integer> store : stores)
            store.reset();
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformOptimisticNearUpdate() throws Exception {
        checkTransform(OPTIMISTIC, NEAR_NODE, OP_UPDATE);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformOptimisticPrimaryUpdate() throws Exception {
        checkTransform(OPTIMISTIC, PRIMARY_NODE, OP_UPDATE);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformOptimisticBackupUpdate() throws Exception {
        checkTransform(OPTIMISTIC, BACKUP_NODE, OP_UPDATE);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformOptimisticNearDelete() throws Exception {
        checkTransform(OPTIMISTIC, NEAR_NODE, OP_DELETE);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformOptimisticPrimaryDelete() throws Exception {
        checkTransform(OPTIMISTIC, PRIMARY_NODE, OP_DELETE);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformOptimisticBackupDelete() throws Exception {
        checkTransform(OPTIMISTIC, BACKUP_NODE, OP_DELETE);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformPessimisticNearUpdate() throws Exception {
        checkTransform(PESSIMISTIC, NEAR_NODE, OP_UPDATE);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformPessimisticPrimaryUpdate() throws Exception {
        checkTransform(PESSIMISTIC, PRIMARY_NODE, OP_UPDATE);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformPessimisticBackupUpdate() throws Exception {
        checkTransform(PESSIMISTIC, BACKUP_NODE, OP_UPDATE);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformPessimisticNearDelete() throws Exception {
        checkTransform(PESSIMISTIC, NEAR_NODE, OP_DELETE);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformPessimisticPrimaryDelete() throws Exception {
        checkTransform(PESSIMISTIC, PRIMARY_NODE, OP_DELETE);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformPessimisticBackupDelete() throws Exception {
        checkTransform(PESSIMISTIC, BACKUP_NODE, OP_DELETE);
    }

    /**
     * Puts {@code 1} for every generated key, then applies an entry processor to all keys inside a single
     * {@code READ_COMMITTED} transaction started on node {@code 0} and checks what reached the first store.
     *
     * @param concurrency Transaction concurrency mode.
     * @param nodeType Node type used to select keys, one of {@link #NEAR_NODE}, {@link #PRIMARY_NODE},
     *      {@link #BACKUP_NODE}.
     * @param op Operation: {@link #OP_UPDATE} increments the values, any other value removes the entries.
     * @throws Exception If failed.
     */
    protected void checkTransform(TransactionConcurrency concurrency, int nodeType, int op) throws Exception {
        IgniteCache<String, Integer> cache = jcache(0);

        Collection<String> keys = keysForType(nodeType);

        for (String key : keys)
            cache.put(key, 1);

        GridCacheGenericTestStore<String, Integer> nearStore = stores.get(0);

        // Reset after the initial puts so that the checks below reflect only the transaction.
        nearStore.reset();

        for (String key : keys)
            cache.localClear(key);

        info(">>> Starting transform transaction");

        EntryProcessor<String, Integer, Void> proc = op == OP_UPDATE ? INCR_CLOS : RMV_CLOS;

        try (Transaction tx = ignite(0).transactions().txStart(concurrency, READ_COMMITTED)) {
            for (String key : keys)
                cache.invoke(key, proc);

            tx.commit();
        }

        checkStoreCalls(nearStore, op, keys.size());
        checkStoreContents(nearStore, op, keys);
    }

    /**
     * Checks how many single-key and batch write calls the store received.
     *
     * @param store Store to check.
     * @param op Operation that was executed.
     * @param keysCnt Number of keys touched by the transaction.
     */
    private void checkStoreCalls(GridCacheGenericTestStore<String, Integer> store, int op, int keysCnt) {
        boolean update = op == OP_UPDATE;

        if (batchUpdate()) {
            // Batch mode: no single-key calls, exactly one batch call of the matching kind.
            assertEquals(0, store.getPutCount());
            assertEquals(0, store.getRemoveCount());

            if (update)
                assertEquals(1, store.getPutAllCount());
            else
                assertEquals(1, store.getRemoveAllCount());
        }
        else {
            // Single mode: no batch calls, one single-key call per key.
            assertEquals(0, store.getPutAllCount());
            assertEquals(0, store.getRemoveAllCount());

            if (update)
                assertEquals(keysCnt, store.getPutCount());
            else
                assertEquals(keysCnt, store.getRemoveCount());
        }
    }

    /**
     * Checks the values held by the store: {@code 2} for every key after an update, no value after a remove.
     *
     * @param store Store to check.
     * @param op Operation that was executed.
     * @param keys Keys touched by the transaction.
     */
    private void checkStoreContents(GridCacheGenericTestStore<String, Integer> store, int op,
        Collection<String> keys) {
        if (op == OP_UPDATE) {
            for (String key : keys)
                assertEquals((Integer)2, store.getMap().get(key));
        }
        else {
            for (String key : keys)
                assertNull(store.getMap().get(key));
        }
    }

    /**
     * Generates {@link #KEYS_CNT} keys by trying {@code "0"}, {@code "1"}, ... and keeping those whose
     * affinity relation to node {@code 0} matches the requested node type.
     *
     * @param nodeType Node type to generate keys for.
     * @return Collection of keys.
     * @throws IllegalArgumentException If the node type is not one of the known constants.
     */
    private Collection<String> keysForType(int nodeType) {
        Collection<String> keys = new ArrayList<>(KEYS_CNT);

        // Loop-invariant, so it is resolved once instead of on every iteration.
        Affinity<Object> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);

        for (int numKey = 0; keys.size() < KEYS_CNT; numKey++) {
            String key = String.valueOf(numKey);

            if (nodeType == NEAR_NODE) {
                if (!aff.isPrimaryOrBackup(grid(0).localNode(), key))
                    keys.add(key);
            }
            else if (nodeType == PRIMARY_NODE) {
                if (aff.isPrimary(grid(0).localNode(), key))
                    keys.add(key);
            }
            else if (nodeType == BACKUP_NODE) {
                if (aff.isBackup(grid(0).localNode(), key))
                    keys.add(key);
            }
            else {
                // No key would ever be accepted, so fail fast instead of looping forever.
                throw new IllegalArgumentException("Unknown node type: " + nodeType);
            }
        }

        return keys;
    }
}