blob: 87cca07b8bd81def312c063c5987041561801736 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.eviction;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.configuration.Factory;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.eviction.EvictableEntry;
import org.apache.ignite.cache.eviction.EvictionPolicy;
import org.apache.ignite.cache.eviction.sorted.SortedEvictionPolicy;
import org.apache.ignite.cache.eviction.sorted.SortedEvictionPolicyFactory;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.GridAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* Sorted eviction policy tests.
*/
public class EvictionPolicyFailureHandlerTest extends GridCommonAbstractTest {
/** Test logger. */
private final ListeningTestLogger log = new ListeningTestLogger(false, GridAbstractTest.log);
/** Node failure. */
private AtomicBoolean nodeFailure = new AtomicBoolean(false);
/** Flag indicats that {@link ThrowableEvictionPolicy} should be used instead of {@link SortedEvictionPolicy}. */
private boolean oom;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setFailureHandler((ignite, failureCtx) -> {
nodeFailure.set(true);
return false;
});
SortedEvictionPolicyFactory plcFactory = new SortedEvictionPolicyFactory();
plcFactory.setMaxSize(3);
plcFactory.setBatchSize(10);
plcFactory.setMaxMemorySize(10);
cfg.setGridLogger(log);
cfg.setCacheConfiguration(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)
.setEvictionPolicyFactory(oom ? (Factory<ThrowableEvictionPolicy>)ThrowableEvictionPolicy::new : plcFactory)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setOnheapCacheEnabled(true));
return cfg;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
beforeTest();
stopAllGrids();
}
/**
* We expect that localPeek produces an exception, but the entry evict returns false because the transaction locks
* this entry. After transaction commit, the entry will be evicted.
*/
@Test
public void testCacheMapDoesNotContainsWrongEntityAfterTransaction() throws Exception {
LogListener lsnr = LogListener.matches(s -> s.contains("The cache entry cannot be touched"))
.times(1).build();
log.registerListener(lsnr);
IgniteEx node = startGrid(0);
IgniteEx client = startClientGrid(1);
GridCacheAdapter<Object, Object> cache = ((IgniteKernal)node).internalCache(DEFAULT_CACHE_NAME);
cache.put(1, 1);
CountDownLatch locPeekFinished = new CountDownLatch(1);
CountDownLatch txStarted = new CountDownLatch(1);
CountDownLatch txFinished = new CountDownLatch(1);
GridTestUtils.runAsync(() -> {
IgniteCache<Object, Object> cache1 = client.cache(DEFAULT_CACHE_NAME);
IgniteTransactions transactions = client.transactions();
try (Transaction tx = transactions.txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache1.put(2.1, 2.4);
txStarted.countDown();
locPeekFinished.await();
tx.commit();
}
catch (Exception ignore) {
}
txFinished.countDown();
}, "tx-thread");
txStarted.await();
try {
cache.localPeek(2.1, new CachePeekMode[] {CachePeekMode.ONHEAP});
}
catch (Exception ignore) {
}
locPeekFinished.countDown();
assertTrue(lsnr.check(10_000));
txFinished.await();
assertFalse(cache.map().entrySet(cache.context().cacheId()).stream()
.anyMatch(e -> new Double(2.1).equals(e.key().value(null, false)))
);
assertEquals(ACTIVE, node.cluster().state());
}
/**
* We skip {@link RuntimeException}, but {@link Error} should trigger {@link FailureHandler}.
*/
@Test
public void testErrorShouldCallErrorHandler() throws Exception {
oom = true;
IgniteEx node1 = startGrid(0);
IgniteEx node2 = startGrid(1);
GridCacheAdapter<Object, Object> cache = ((IgniteKernal)node2).internalCache(DEFAULT_CACHE_NAME);
Affinity<Object> affinity = cache.affinity();
try {
for (int i = 0; i < 1000; i++) {
if (affinity.isPrimary(node1.localNode(), i))
cache.put(i, 1);
}
}
catch (Throwable ignore) {
}
assertTrue(nodeFailure.get());
}
/**
*
*/
@Test
public void testFailureHandlerShouldNotCallOnRuntimeException() throws Exception {
IgniteEx node1 = startGrid(0);
IgniteEx node2 = startGrid(1);
GridCacheAdapter<Object, Object> cache = ((IgniteKernal)node2).internalCache(DEFAULT_CACHE_NAME);
Affinity<Object> affinity = cache.affinity();
for (int i = 0; i < 1000; i++) {
if (affinity.isPrimary(node1.localNode(), i))
cache.put(i, 1);
}
for (int i = 0; i < 1000; i++) {
if (affinity.isPrimary(node1.localNode(), (double)i))
cache.put((double)i, 1);
}
assertFalse(cache.map().entrySet(cache.context().cacheId()).stream()
.anyMatch(e -> e.key().value(null, false) instanceof Double)
);
assertFalse(nodeFailure.get());
}
/**
* Test eviction policy which always throws {@link OutOfMemoryError} when an entry is accessed.
*/
private static class ThrowableEvictionPolicy implements EvictionPolicy<Integer, Integer>, Serializable {
/** {@inheritDoc} */
@Override public synchronized void onEntryAccessed(boolean rmv, EvictableEntry<Integer, Integer> e) {
throw new OutOfMemoryError("Test");
}
}
}