blob: cb3e45d61f9532752fbdbd38acea02403732f248 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
/**
*
*/
public class GridCachePreloadingEvictionsSelfTest extends GridCommonAbstractTest {
/** */
private static final String VALUE = createValue();
/** */
public static final CachePeekMode[] ALL_PEEK_MODES = new CachePeekMode[] {CachePeekMode.ALL};
/** */
private final AtomicInteger idxGen = new AtomicInteger();
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
CacheConfiguration partCacheCfg = defaultCacheConfiguration();
partCacheCfg.setCacheMode(PARTITIONED);
partCacheCfg.setAffinity(new GridCacheModuloAffinityFunction(1, 1));
partCacheCfg.setWriteSynchronizationMode(FULL_SYNC);
partCacheCfg.setNearConfiguration(null);
partCacheCfg.setEvictionPolicy(null);
partCacheCfg.setRebalanceMode(ASYNC);
partCacheCfg.setAtomicityMode(TRANSACTIONAL);
// This test requires artificial slowing down of the preloading.
partCacheCfg.setRebalanceThrottle(2000);
cfg.setCacheConfiguration(partCacheCfg);
cfg.setUserAttributes(F.asMap(GridCacheModuloAffinityFunction.IDX_ATTR, idxGen.getAndIncrement()));
cfg.setNetworkTimeout(60000);
return cfg;
}
/**
* @throws Exception If failed.
*/
@Test
public void testEvictions() throws Exception {
try {
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.EVICTION);
final Ignite ignite1 = startGrid(1);
final IgniteCache<Integer, Object> cache1 = ignite1.cache(DEFAULT_CACHE_NAME);
for (int i = 0; i < 5000; i++)
cache1.put(i, VALUE + i);
info("Finished data population.");
final AtomicBoolean done = new AtomicBoolean();
final CountDownLatch startLatch = new CountDownLatch(1);
int oldSize = cache1.localSize(CachePeekMode.ALL);
IgniteInternalFuture fut = multithreadedAsync(
new Callable<Object>() {
@Nullable @Override public Object call() throws Exception {
startLatch.await();
info("Started evicting...");
for (int i = 0; i < 3000 && !done.get(); i++) {
Cache.Entry<Integer, Object> entry = cache1.getEntry(i);
if (entry != null)
ignite1.cache(DEFAULT_CACHE_NAME).localEvict(Collections.<Object>singleton(entry.getKey()));
else
info("Entry is null.");
}
info("Finished evicting.");
return null;
}
},
1);
ignite1.events().localListen(
new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
startLatch.countDown();
return true;
}
},
EVT_NODE_JOINED);
final Ignite ignite2 = startGrid(2);
done.set(true);
fut.get();
sleepUntilCashesEqualize(ignite1, ignite2, oldSize);
checkCachesConsistency(ignite1, ignite2);
oldSize = cache1.size(CachePeekMode.ALL);
info("Evicting on constant topology.");
for (int i = 0; i < 1000; i++) {
Cache.Entry<Integer, Object> entry = randomEntry(ignite1);
if (entry != null)
cache1.localEvict(Collections.singleton(entry.getKey()));
else
info("Entry is null.");
}
sleepUntilCashesEqualize(ignite1, ignite2, oldSize);
checkCachesConsistency(ignite1, ignite2);
}
finally {
stopAllGrids();
}
}
/**
* Waits until cache stabilizes on new value.
*
* @param ignite1 Grid 1.
* @param ignite2 Grid 2.
* @param oldSize Old size, stable size should be .
* @throws IgniteInterruptedCheckedException If interrupted.
*/
private void sleepUntilCashesEqualize(final Ignite ignite1, final Ignite ignite2, final int oldSize)
throws IgniteInterruptedCheckedException {
info("Sleeping...");
assertTrue(GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
int size1 = ignite1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.ONHEAP);
return size1 != oldSize && size1 == ignite2.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.ONHEAP);
}
}, getTestTimeout()));
info("Sleep finished.");
}
/**
* @param g Grid.
* @return Random entry from cache.
*/
@Nullable private Cache.Entry<Integer, Object> randomEntry(Ignite g) {
return g.<Integer, Object>cache(DEFAULT_CACHE_NAME).iterator().next();
}
/**
* @param ignite1 Grid 1.
* @param ignite2 Grid 2.
* @throws Exception If failed.
*/
private void checkCachesConsistency(Ignite ignite1, Ignite ignite2) throws Exception {
IgniteKernal g1 = (IgniteKernal)ignite1;
IgniteKernal g2 = (IgniteKernal)ignite2;
GridCacheAdapter<Integer, Object> cache1 = g1.internalCache(DEFAULT_CACHE_NAME);
GridCacheAdapter<Integer, Object> cache2 = g2.internalCache(DEFAULT_CACHE_NAME);
// Sleeping to allow the cache sizes to settle down.
U.sleep(3000);
info("Cache1 size: " + cache1.size(ALL_PEEK_MODES));
info("Cache2 size: " + cache2.size(ALL_PEEK_MODES));
assert cache1.size(ALL_PEEK_MODES) == cache2.size(ALL_PEEK_MODES) :
"Sizes do not match [s1=" + cache1.size(ALL_PEEK_MODES) + ", s2=" + cache2.size(ALL_PEEK_MODES) + ']';
for (Integer key : cache1.keySet()) {
Object e = cache1.localPeek(key, new CachePeekMode[] {CachePeekMode.ONHEAP});
if (e != null)
assert cache2.containsKey(key) : "Cache2 does not contain key: " + key;
}
}
/**
* @return Large value for test.
*/
private static String createValue() {
SB sb = new SB(1024);
for (int i = 0; i < 64; i++)
sb.a("val1");
return sb.toString();
}
}