// blob: 7622469305f0b09cf50c8b3cbde9f2e0ab08cc3e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
/**
 * Tests DHT cache eviction for an entry that has near readers.
 * <p>
 * Every node is configured with a partitioned cache with one backup, on-heap caching with a FIFO
 * eviction policy (max size 10) and a near cache with its own FIFO eviction policy (max size 10).
 */
public class GridCacheDhtEvictionNearReadersSelfTest extends GridCommonAbstractTest {
    /** Number of grids started for the tests. */
    private static final int GRID_CNT = 4;

    /** Default constructor. */
    public GridCacheDhtEvictionNearReadersSelfTest() {
        super(false /* don't start grid. */);
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        CacheConfiguration cacheCfg = defaultCacheConfiguration();

        cacheCfg.setCacheMode(PARTITIONED);
        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
        cacheCfg.setRebalanceMode(SYNC);
        cacheCfg.setAtomicityMode(atomicityMode());
        cacheCfg.setBackups(1);

        // Eviction policy of the cache itself; on-heap caching is enabled together with it.
        FifoEvictionPolicy plc = new FifoEvictionPolicy();

        plc.setMaxSize(10);

        cacheCfg.setEvictionPolicy(plc);
        cacheCfg.setOnheapCacheEnabled(true);

        // Near cache gets its own, separate eviction policy instance.
        NearCacheConfiguration nearCfg = new NearCacheConfiguration();

        FifoEvictionPolicy nearPlc = new FifoEvictionPolicy();

        nearPlc.setMaxSize(10);

        nearCfg.setNearEvictionPolicy(nearPlc);

        cacheCfg.setNearConfiguration(nearCfg);

        cfg.setCacheConfiguration(cacheCfg);

        return cfg;
    }

    /**
     * @return Atomicity mode used for the cache configuration.
     */
    public CacheAtomicityMode atomicityMode() {
        return TRANSACTIONAL;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();

        // testReaders() needs three distinct nodes: primary, backup and a node that does not own the key.
        if (GRID_CNT < 3)
            throw new IgniteCheckedException("GRID_CNT must not be less than 3.");

        startGridsMultiThreaded(GRID_CNT);
    }

    /** {@inheritDoc} */
    @SuppressWarnings({"SizeReplaceableByIsEmpty"})
    @Override protected void beforeTest() throws Exception {
        // Both size() and isEmpty() are checked on every node before each test.
        for (int i = 0; i < GRID_CNT; i++) {
            assert near(grid(i)).size() == 0 : "Near cache size is not zero [idx=" + i + "]";
            assert dht(grid(i)).size() == 0 : "Dht cache size is not zero [idx=" + i + "]";

            assert near(grid(i)).isEmpty() : "Near cache is not empty [idx=" + i + "]";
            assert dht(grid(i)).isEmpty() : "Dht cache is not empty [idx=" + i + "]";
        }
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        // Clear through the near cache of every node, then verify that both near and dht caches are empty.
        for (int i = 0; i < GRID_CNT; i++) {
            near(grid(i)).removeAll();

            assert near(grid(i)).isEmpty() : "Near cache is not empty [idx=" + i + "]";
            assert dht(grid(i)).isEmpty() : "Dht cache is not empty [idx=" + i + "]";
        }
    }

    /**
     * @param g Grid.
     * @return Near cache (internal adapter of the default cache on the given grid).
     */
    @SuppressWarnings({"unchecked"})
    private GridNearCacheAdapter<Integer, String> near(Ignite g) {
        return (GridNearCacheAdapter)((IgniteKernal)g).internalCache(DEFAULT_CACHE_NAME);
    }

    /**
     * @param g Grid.
     * @return Dht cache backing the near cache of the given grid.
     */
    private GridDhtCacheAdapter<Integer, String> dht(Ignite g) {
        return near(g).dht();
    }

    /**
     * @param key Key.
     * @return Primary and backup nodes for the given key, as mapped by the affinity of grid 0.
     */
    private Collection<ClusterNode> keyNodes(Object key) {
        return grid(0).affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key);
    }

    /**
     * Not referenced by the tests currently present in this class.
     *
     * @param nodeId Node id.
     * @return Predicate for events belonging to specified node.
     */
    private IgnitePredicate<Event> nodeEvent(final UUID nodeId) {
        assert nodeId != null;

        return new P1<Event>() {
            @Override public boolean apply(Event e) {
                info("Predicate called [e.nodeId()=" + e.node().id() + ", nodeId=" + nodeId + ']');

                return e.node().id().equals(nodeId);
            }
        };
    }

    /**
     * Puts a value through the primary node, reads it from a node that does not own the key
     * (so that the value is loaded into that node's near cache and a reader is registered on
     * the primary entry), then evicts the key locally on the primary node and checks that the
     * entry is still present on the primary, on the backup and in the reader's near cache.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testReaders() throws Exception {
        Integer key = 1;

        // The first mapped node is treated as primary, the next one as backup.
        Collection<ClusterNode> nodes = new ArrayList<>(keyNodes(key));

        ClusterNode primary = F.first(nodes);

        assert primary != null : "No primary node for key [key=" + key + "]";

        nodes.remove(primary);

        ClusterNode backup = F.first(nodes);

        assert backup != null : "No backup node for key [key=" + key + "]";

        // Now calculate other node that doesn't own the key.
        nodes = new ArrayList<>(grid(0).cluster().nodes());

        nodes.remove(primary);
        nodes.remove(backup);

        ClusterNode other = F.first(nodes);

        assert other != null : "No node that does not own the key [key=" + key + "]";

        assert !F.eqNodes(primary, backup);
        assert !F.eqNodes(primary, other);
        assert !F.eqNodes(backup, other);

        info("Primary node: " + primary.id());
        info("Backup node: " + backup.id());
        info("Other node: " + other.id());

        GridNearCacheAdapter<Integer, String> nearPrimary = near(grid(primary));
        GridDhtCacheAdapter<Integer, String> dhtPrimary = dht(grid(primary));

        GridNearCacheAdapter<Integer, String> nearBackup = near(grid(backup));
        GridDhtCacheAdapter<Integer, String> dhtBackup = dht(grid(backup));

        GridNearCacheAdapter<Integer, String> nearOther = near(grid(other));
        GridDhtCacheAdapter<Integer, String> dhtOther = dht(grid(other));

        String val = "v1";

        // Put on primary node.
        nearPrimary.getAndPut(key, val);

        GridDhtCacheEntry entryPrimary = (GridDhtCacheEntry)dhtPrimary.peekEx(key);
        GridDhtCacheEntry entryBackup = (GridDhtCacheEntry)dhtBackup.peekEx(key);

        assert entryPrimary != null : "No dht entry on primary after put [key=" + key + "]";
        assert entryBackup != null : "No dht entry on backup after put [key=" + key + "]";

        // The non-owning node has not touched the key yet.
        assert nearOther.peekEx(key) == null;
        assert dhtOther.peekEx(key) == null;

        // Entry which has readers will not be evicted, so no EVT_CACHE_ENTRY_EVICTED events are awaited.

        // Get value on other node, it should be loaded to near cache.
        assertEquals(val, nearOther.repairableGet(key, true, false));

        entryPrimary = (GridDhtCacheEntry)dhtPrimary.peekEx(key);
        entryBackup = (GridDhtCacheEntry)dhtBackup.peekEx(key);

        assert entryPrimary != null : "No dht entry on primary after near read [key=" + key + "]";
        assert entryBackup != null : "No dht entry on backup after near read [key=" + key + "]";

        assertEquals(val, localPeek(nearOther, key));

        assertTrue("Primary entry has no readers after near read [key=" + key + "]",
            !entryPrimary.readers().isEmpty());

        // Request local eviction of the key on the primary node.
        // The entry has a reader, so it is expected to stay on primary, on backup and in the reader's near cache.
        grid(primary).cache(DEFAULT_CACHE_NAME).localEvict(Collections.<Object>singleton(key));

        assertNotNull(localPeek(dhtPrimary, key));
        assertNotNull(localPeek(nearPrimary, key));

        assertNotNull(localPeek(dhtBackup, key));
        assertNotNull(localPeek(nearBackup, key));

        assertNull(localPeek(dhtOther, key));
        assertNotNull(localPeek(nearOther, key));
    }
}