blob: 865e4dcaa5be5693a072102d37805cd8ccd96848 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionRollbackException;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
/**
* Tests that transaction is invalidated in case of {@link IgniteTxHeuristicCheckedException}.
*/
public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAbstractSelfTest {
/** Index SPI throwing exception. */
private static TestStore store;
/** */
private static final int PRIMARY = 0;
/** */
private static final int BACKUP = 1;
/** */
private static final int NOT_PRIMARY_AND_BACKUP = 2;
/** */
private static Integer lastKey;
/** {@inheritDoc} */
@Override protected int gridCount() {
return 3;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.getTransactionConfiguration().setTxSerializableEnabled(true);
return cfg;
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);
CacheConfiguration ccfg = super.cacheConfiguration(igniteInstanceName);
ccfg.setCacheStoreFactory(singletonFactory(store));
ccfg.setReadThrough(true);
ccfg.setWriteThrough(true);
ccfg.setLoadPreviousValue(true);
return ccfg;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);
store = new TestStore();
super.beforeTestsStarted();
lastKey = 0;
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();
store = null;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
store.forceFail(false);
super.afterTest();
}
/**
* @throws Exception If failed.
*/
@Test
public void testPutNear() throws Exception {
checkPut(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
checkPut(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
}
/**
* @throws Exception If failed.
*/
@Test
public void testPutPrimary() throws Exception {
checkPut(true, keyForNode(grid(0).localNode(), PRIMARY));
checkPut(false, keyForNode(grid(0).localNode(), PRIMARY));
}
/**
* @throws Exception If failed.
*/
@Test
public void testPutBackup() throws Exception {
checkPut(true, keyForNode(grid(0).localNode(), BACKUP));
checkPut(false, keyForNode(grid(0).localNode(), BACKUP));
}
/**
* @throws Exception If failed.
*/
@Test
public void testPutAll() throws Exception {
checkPutAll(true, keyForNode(grid(0).localNode(), PRIMARY),
keyForNode(grid(0).localNode(), PRIMARY),
keyForNode(grid(0).localNode(), PRIMARY));
checkPutAll(false, keyForNode(grid(0).localNode(), PRIMARY),
keyForNode(grid(0).localNode(), PRIMARY),
keyForNode(grid(0).localNode(), PRIMARY));
if (gridCount() > 1) {
checkPutAll(true, keyForNode(grid(1).localNode(), PRIMARY),
keyForNode(grid(1).localNode(), PRIMARY),
keyForNode(grid(1).localNode(), PRIMARY));
checkPutAll(false, keyForNode(grid(1).localNode(), PRIMARY),
keyForNode(grid(1).localNode(), PRIMARY),
keyForNode(grid(1).localNode(), PRIMARY));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testRemoveNear() throws Exception {
checkRemove(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
checkRemove(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
}
/**
* @throws Exception If failed.
*/
@Test
public void testRemovePrimary() throws Exception {
checkRemove(false, keyForNode(grid(0).localNode(), PRIMARY));
checkRemove(true, keyForNode(grid(0).localNode(), PRIMARY));
}
/**
* @throws Exception If failed.
*/
@Test
public void testRemoveBackup() throws Exception {
checkRemove(false, keyForNode(grid(0).localNode(), BACKUP));
checkRemove(true, keyForNode(grid(0).localNode(), BACKUP));
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformNear() throws Exception {
checkTransform(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
checkTransform(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformPrimary() throws Exception {
checkTransform(false, keyForNode(grid(0).localNode(), PRIMARY));
checkTransform(true, keyForNode(grid(0).localNode(), PRIMARY));
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransformBackup() throws Exception {
checkTransform(false, keyForNode(grid(0).localNode(), BACKUP));
checkTransform(true, keyForNode(grid(0).localNode(), BACKUP));
}
/**
* @throws Exception If failed.
*/
@Test
public void testPutNearTx() throws Exception {
for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
for (TransactionIsolation isolation : TransactionIsolation.values()) {
checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testPutPrimaryTx() throws Exception {
for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
for (TransactionIsolation isolation : TransactionIsolation.values()) {
checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY));
checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY));
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testPutBackupTx() throws Exception {
for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
for (TransactionIsolation isolation : TransactionIsolation.values()) {
checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP));
checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP));
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testPutMultipleKeysTx() throws Exception {
for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
for (TransactionIsolation isolation : TransactionIsolation.values()) {
checkPutTx(true, concurrency, isolation,
keyForNode(grid(0).localNode(), PRIMARY),
keyForNode(grid(0).localNode(), PRIMARY),
keyForNode(grid(0).localNode(), PRIMARY));
checkPutTx(false, concurrency, isolation,
keyForNode(grid(0).localNode(), PRIMARY),
keyForNode(grid(0).localNode(), PRIMARY),
keyForNode(grid(0).localNode(), PRIMARY));
if (gridCount() > 1) {
checkPutTx(true, concurrency, isolation,
keyForNode(grid(1).localNode(), PRIMARY),
keyForNode(grid(1).localNode(), PRIMARY),
keyForNode(grid(1).localNode(), PRIMARY));
checkPutTx(false, concurrency, isolation,
keyForNode(grid(1).localNode(), PRIMARY),
keyForNode(grid(1).localNode(), PRIMARY),
keyForNode(grid(1).localNode(), PRIMARY));
}
}
}
}
/**
* @param putBefore If {@code true} then puts some value before executing failing operation.
* @param keys Keys.
* @param concurrency Transaction concurrency.
* @param isolation Transaction isolation.
* @throws Exception If failed.
*/
private void checkPutTx(boolean putBefore, TransactionConcurrency concurrency,
TransactionIsolation isolation, final Integer... keys) throws Exception {
assertTrue(keys.length > 0);
info("Test transaction [concurrency=" + concurrency + ", isolation=" + isolation + ']');
IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);
if (putBefore) {
store.forceFail(false);
info("Start transaction.");
try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) {
for (Integer key : keys) {
info("Put " + key);
cache.put(key, 1);
}
info("Commit.");
tx.commit();
}
}
// Execute get from all nodes to create readers for near cache.
for (int i = 0; i < gridCount(); i++) {
for (Integer key : keys)
grid(i).cache(DEFAULT_CACHE_NAME).get(key);
}
store.forceFail(true);
try {
info("Start transaction.");
try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) {
for (Integer key : keys) {
info("Put " + key);
cache.put(key, 2);
}
info("Commit.");
tx.commit();
}
fail("Transaction should fail.");
}
catch (IgniteException e) {
log.info("Expected exception: " + e);
}
for (Integer key : keys)
checkValue(key, putBefore);
}
/**
* @param key Key.
* @param putBefore If {@code true} expects non-null values.
* @throws Exception If failed.
*/
private void checkValue(final Integer key, boolean putBefore) throws Exception {
store.forceFail(false);
info("Check key: " + key);
for (int i = 0; i < gridCount(); i++) {
IgniteKernal grid = (IgniteKernal)grid(i);
GridCacheAdapter cache = grid.internalCache(DEFAULT_CACHE_NAME);
GridCacheMapEntry entry = cache.map().getEntry(cache.context(), cache.context().toCacheKeyObject(key));
log.info("Entry: " + entry);
if (entry != null) {
assertFalse("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', entry.lockedByAny());
assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore,
entry.hasValue());
assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore ? 1 : null,
entry.rawGet().value(cache.ctx.cacheObjectContext(), false));
}
if (cache.isNear()) {
entry = ((GridNearCacheAdapter)cache).dht().map().getEntry(cache.context(), cache.context().toCacheKeyObject(key));
log.info("Dht entry: " + entry);
if (entry != null) {
assertFalse("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', entry.lockedByAny());
assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore,
entry.hasValue());
assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore ? 1 : null,
entry.rawGet().value(cache.ctx.cacheObjectContext(), false));
}
}
}
for (int i = 0; i < gridCount(); i++)
assertEquals("Unexpected value for grid " + i, putBefore ? 1 : null, grid(i).cache(DEFAULT_CACHE_NAME).get(key));
}
/**
* @param putBefore If {@code true} then puts some value before executing failing operation.
* @param key Key.
* @throws Exception If failed.
*/
private void checkPut(boolean putBefore, final Integer key) throws Exception {
if (putBefore) {
store.forceFail(false);
info("Put key: " + key);
grid(0).cache(DEFAULT_CACHE_NAME).put(key, 1);
}
// Execute get from all nodes to create readers for near cache.
for (int i = 0; i < gridCount(); i++)
grid(i).cache(DEFAULT_CACHE_NAME).get(key);
store.forceFail(true);
info("Going to put: " + key);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
grid(0).cache(DEFAULT_CACHE_NAME).put(key, 2);
return null;
}
}, CacheWriterException.class, null);
checkValue(key, putBefore);
}
/**
* @param putBefore If {@code true} then puts some value before executing failing operation.
* @param key Key.
* @throws Exception If failed.
*/
private void checkTransform(boolean putBefore, final Integer key) throws Exception {
if (putBefore) {
store.forceFail(false);
info("Put key: " + key);
grid(0).cache(DEFAULT_CACHE_NAME).put(key, 1);
}
// Execute get from all nodes to create readers for near cache.
for (int i = 0; i < gridCount(); i++)
grid(i).cache(DEFAULT_CACHE_NAME).get(key);
store.forceFail(true);
info("Going to transform: " + key);
Throwable e = GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
grid(0).<Integer, Integer>cache(DEFAULT_CACHE_NAME).invoke(key, new EntryProcessor<Integer, Integer, Void>() {
@Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
e.setValue(2);
return null;
}
});
return null;
}
}, CacheWriterException.class, null);
assertTrue("Unexpected cause: " + e, e.getCause() instanceof TransactionRollbackException);
checkValue(key, putBefore);
}
/**
* @param putBefore If {@code true} then puts some value before executing failing operation.
* @param keys Keys.
* @throws Exception If failed.
*/
private void checkPutAll(boolean putBefore, Integer... keys) throws Exception {
assert keys.length > 1;
if (putBefore) {
store.forceFail(false);
Map<Integer, Integer> m = new HashMap<>();
for (Integer key : keys)
m.put(key, 1);
info("Put data: " + m);
grid(0).cache(DEFAULT_CACHE_NAME).putAll(m);
}
// Execute get from all nodes to create readers for near cache.
for (int i = 0; i < gridCount(); i++) {
for (Integer key : keys)
grid(i).cache(DEFAULT_CACHE_NAME).get(key);
}
store.forceFail(true);
final Map<Integer, Integer> m = new HashMap<>();
for (Integer key : keys)
m.put(key, 2);
info("Going to putAll: " + m);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
grid(0).cache(DEFAULT_CACHE_NAME).putAll(m);
return null;
}
}, CacheWriterException.class, null);
for (Integer key : m.keySet())
checkValue(key, putBefore);
}
/**
* @param putBefore If {@code true} then puts some value before executing failing operation.
* @param key Key.
* @throws Exception If failed.
*/
private void checkRemove(boolean putBefore, final Integer key) throws Exception {
if (putBefore) {
store.forceFail(false);
info("Put key: " + key);
grid(0).cache(DEFAULT_CACHE_NAME).put(key, 1);
}
// Execute get from all nodes to create readers for near cache.
for (int i = 0; i < gridCount(); i++)
grid(i).cache(DEFAULT_CACHE_NAME).get(key);
store.forceFail(true);
info("Going to remove: " + key);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
grid(0).cache(DEFAULT_CACHE_NAME).remove(key);
return null;
}
}, CacheWriterException.class, null);
checkValue(key, putBefore);
}
/**
* Generates key of a given type for given node.
*
* @param node Node.
* @param type Key type.
* @return Key.
*/
private Integer keyForNode(ClusterNode node, int type) {
IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);
if (cache.getConfiguration(CacheConfiguration.class).getCacheMode() == REPLICATED && type == NOT_PRIMARY_AND_BACKUP)
return ++lastKey;
for (int key = lastKey + 1; key < (lastKey + 10_000); key++) {
switch (type) {
case NOT_PRIMARY_AND_BACKUP: {
if (!affinity(cache).isPrimaryOrBackup(node, key)) {
lastKey = key;
return key;
}
break;
}
case PRIMARY: {
if (affinity(cache).isPrimary(node, key)) {
lastKey = key;
return key;
}
break;
}
case BACKUP: {
if (affinity(cache).isBackup(node, key)) {
lastKey = key;
return key;
}
break;
}
default:
fail();
}
}
throw new IllegalStateException("Failed to find key.");
}
/**
*
*/
private static class TestStore implements CacheStore<Object, Object> {
/** Fail flag. */
private volatile boolean fail;
/**
* @param fail Fail flag.
*/
public void forceFail(boolean fail) {
this.fail = fail;
}
/** {@inheritDoc} */
@Nullable @Override public Object load(Object key) {
return null;
}
/** {@inheritDoc} */
@Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... args) {
if (fail)
throw new CacheLoaderException("Store exception");
}
/** {@inheritDoc} */
@Override public Map<Object, Object> loadAll(Iterable<?> keys) throws CacheLoaderException {
return Collections.emptyMap();
}
/** {@inheritDoc} */
@Override public void write(javax.cache.Cache.Entry<?, ?> entry) {
if (fail)
throw new CacheWriterException("Store exception");
}
/** {@inheritDoc} */
@Override public void writeAll(Collection<javax.cache.Cache.Entry<?, ?>> entries) {
if (fail)
throw new CacheWriterException("Store exception");
}
/** {@inheritDoc} */
@Override public void delete(Object key) throws CacheWriterException {
if (fail)
throw new CacheWriterException("Store exception");
}
/** {@inheritDoc} */
@Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
if (fail)
throw new CacheWriterException("Store exception");
}
/** {@inheritDoc} */
@Override public void sessionEnd(boolean commit) {
if (fail && commit)
throw new CacheWriterException("Store exception");
}
}
}