blob: bb4eba91040c2fa76753ac646f7337b59e957d86 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.replicated.preloader;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Factory;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.CachePluginConfiguration;
import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assume;
import org.junit.Test;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
import static org.apache.ignite.events.EventType.EVTS_ALL;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
/**
* Tests for replicated cache preloader.
*/
@SuppressWarnings("unchecked")
public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
/** */
private CacheRebalanceMode preloadMode = ASYNC;
/** */
private int batchSize = 4096;
/** */
private volatile boolean extClassloadingAtCfg = false;
/** */
private volatile boolean useExtClassLoader = false;
/** Disable p2p. */
private volatile boolean disableP2p = false;
/** */
private static volatile CountDownLatch latch;
/** */
private static boolean cutromEvt = false;
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
stopAllGrids();
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setRebalanceThreadPoolSize(2);
cfg.setCacheConfiguration(cacheConfiguration(igniteInstanceName));
cfg.setDeploymentMode(CONTINUOUS);
cfg.setUserAttributes(F.asMap("EVEN", !igniteInstanceName.endsWith("0") && !igniteInstanceName.endsWith("2")));
MemoryEventStorageSpi spi = new MemoryEventStorageSpi();
spi.setExpireCount(50_000);
cfg.setEventStorageSpi(spi);
if (disableP2p)
cfg.setPeerClassLoadingEnabled(false);
if (getTestIgniteInstanceName(1).equals(igniteInstanceName) || useExtClassLoader ||
cfg.getMarshaller() instanceof BinaryMarshaller)
cfg.setClassLoader(getExternalClassLoader());
if (cutromEvt) {
int[] evts = new int[EVTS_ALL.length + 1];
evts[0] = Integer.MAX_VALUE - 1;
System.arraycopy(EVTS_ALL, 0, evts, 1, EVTS_ALL.length);
cfg.setIncludeEventTypes(evts);
}
cfg.setIncludeEventTypes(EventType.EVTS_ALL);
return cfg;
}
/**
* Gets cache configuration for grid with specified name.
*
* @param igniteInstanceName Ignite instance name.
* @return Cache configuration.
*/
CacheConfiguration cacheConfiguration(String igniteInstanceName) {
CacheConfiguration cacheCfg = defaultCacheConfiguration();
cacheCfg.setCacheMode(REPLICATED);
cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
cacheCfg.setRebalanceMode(preloadMode);
cacheCfg.setRebalanceBatchSize(batchSize);
if (extClassloadingAtCfg)
loadExternalClassesToCfg(cacheCfg);
return cacheCfg;
}
/**
*
* @param cacheCfg Configuration.
*/
private void loadExternalClassesToCfg(CacheConfiguration cacheCfg) {
try {
Object sf = getExternalClassLoader().
loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestStoreFactory").newInstance();
cacheCfg.setCacheStoreFactory((Factory)sf);
Object sslf = getExternalClassLoader().
loadClass("org.apache.ignite.tests.p2p.CacheDeploymentStoreSessionListenerFactory").newInstance();
cacheCfg.setCacheStoreSessionListenerFactories((Factory)sslf);
Object cpc = getExternalClassLoader().
loadClass("org.apache.ignite.tests.p2p.CacheDeploymentCachePluginConfiguration").newInstance();
cacheCfg.setPluginConfigurations((CachePluginConfiguration)cpc);
Object akm = getExternalClassLoader().
loadClass("org.apache.ignite.tests.p2p.CacheDeploymentAffinityKeyMapper").newInstance();
cacheCfg.setAffinityMapper((AffinityKeyMapper)akm);
Object pred = getExternalClassLoader().
loadClass("org.apache.ignite.tests.p2p.CacheDeploymentAlwaysTruePredicate2").newInstance();
cacheCfg.setNodeFilter((IgnitePredicate)pred);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testSingleNode() throws Exception {
preloadMode = SYNC;
try {
startGrid(1);
}
finally {
stopAllGrids();
}
}
/**
* @throws Exception If test failed.
*/
@Test
public void testIntegrity() throws Exception {
preloadMode = SYNC;
try {
Ignite g1 = startGrid(1);
GridCacheAdapter<Integer, String> cache1 = ((IgniteKernal)g1).internalCache(DEFAULT_CACHE_NAME);
// Cache rebalancing events should not be fired for this cache.
CacheConfiguration ccfg = cacheConfiguration(((IgniteKernal)g1).getInstanceName())
.setName(DEFAULT_CACHE_NAME + "_evts_disabled")
.setEventsDisabled(true);
g1.getOrCreateCache(ccfg);
cache1.getAndPut(1, "val1");
cache1.getAndPut(2, "val2");
GridCacheEntryEx e1 = cache1.entryEx(1);
assertNotNull(e1);
e1.unswap();
Ignite g2 = startGrid(2);
Collection<Event> evts = null;
for (int i = 0; i < 3; i++) {
evts = g2.events().localQuery(F.<Event>alwaysTrue(),
EVT_CACHE_REBALANCE_STARTED, EVT_CACHE_REBALANCE_STOPPED);
if (evts.size() != 2) {
info("Wrong events collection size (will retry in 1000 ms): " + evts.size());
Thread.sleep(1000);
}
else
break;
}
assertNotNull(evts);
assertEquals("Wrong events received: " + evts, 2, evts.size());
Iterator<Event> iter = evts.iterator();
assertEquals(EVT_CACHE_REBALANCE_STARTED, iter.next().type());
assertEquals(EVT_CACHE_REBALANCE_STOPPED, iter.next().type());
IgniteCache<Integer, String> cache2 = g2.cache(DEFAULT_CACHE_NAME);
assertEquals("val1", cache2.localPeek(1));
assertEquals("val2", cache2.localPeek(2));
GridCacheAdapter<Integer, String> cacheAdapter2 = ((IgniteKernal)g2).internalCache(DEFAULT_CACHE_NAME);
GridCacheEntryEx e2 = cacheAdapter2.entryEx(1);
assertNotNull(e2);
assertNotSame(e2, e1);
e2.unswap();
assertNotNull(e2.version());
assertEquals(e1.version(), e2.version());
}
finally {
stopAllGrids();
}
}
/**
* @throws Exception If test failed.
*/
@Test
public void testDeployment() throws Exception {
// TODO GG-11141.
if (true)
return;
preloadMode = SYNC;
try {
Ignite g1 = startGrid(1);
Ignite g2 = startGrid(2);
IgniteCache<Integer, Object> cache1 = g1.cache(DEFAULT_CACHE_NAME);
IgniteCache<Integer, Object> cache2 = g2.cache(DEFAULT_CACHE_NAME);
ClassLoader ldr = grid(1).configuration().getClassLoader();
Object v1 = ldr.loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestValue3").newInstance();
cache1.put(1, v1);
info("Stored value in cache1 [v=" + v1 + ", ldr=" + v1.getClass().getClassLoader() + ']');
Object v2 = cache2.get(1);
info("Read value from cache2 [v=" + v2 + ", ldr=" + v2.getClass().getClassLoader() + ']');
assert v2 != null;
assert v2.toString().equals(v1.toString());
assert !v2.getClass().getClassLoader().equals(getClass().getClassLoader());
assert v2.getClass().getClassLoader().getClass().getName().contains("GridDeploymentClassLoader") ||
grid(2).configuration().getMarshaller() instanceof BinaryMarshaller;
Object e1 = ldr.loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestEnumValue").getEnumConstants()[0];
cache1.put(2, e1);
Object e2 = cache2.get(2);
if (g1.configuration().getMarshaller() instanceof BinaryMarshaller) {
BinaryObject enumObj = (BinaryObject)cache2.withKeepBinary().get(2);
assertEquals(0, enumObj.enumOrdinal());
assertTrue(enumObj.type().isEnum());
assertTrue(enumObj instanceof BinaryEnumObjectImpl);
}
assert e2 != null;
assert e2.toString().equals(e1.toString());
assert !e2.getClass().getClassLoader().equals(getClass().getClassLoader());
assert e2.getClass().getClassLoader().getClass().getName().contains("GridDeploymentClassLoader") ||
grid(2).configuration().getMarshaller() instanceof BinaryMarshaller;
stopGrid(1);
Ignite g3 = startGrid(3);
IgniteCache<Integer, Object> cache3 = g3.cache(DEFAULT_CACHE_NAME);
Object v3 = cache3.localPeek(1, CachePeekMode.ONHEAP);
assert v3 != null;
info("Read value from cache3 [v=" + v3 + ", ldr=" + v3.getClass().getClassLoader() + ']');
assert v3 != null;
assert v3.toString().equals(v1.toString());
assert !v3.getClass().getClassLoader().equals(getClass().getClassLoader());
assert v3.getClass().getClassLoader().getClass().getName().contains("GridDeploymentClassLoader") ||
grid(3).configuration().getMarshaller() instanceof BinaryMarshaller;
}
finally {
stopAllGrids();
}
}
/**
* @throws Exception If test failed.
*/
@Test
public void testExternalClassesAtConfiguration() throws Exception {
try {
extClassloadingAtCfg = true;
useExtClassLoader = true;
Ignite g1 = startGrid(1);
Ignite g2 = startGrid(2); // Checks deserialization at node join.
Ignite g3 = startClientGrid(3);
IgniteCache<Integer, Object> cache1 = g1.cache(DEFAULT_CACHE_NAME);
IgniteCache<Integer, Object> cache2 = g2.cache(DEFAULT_CACHE_NAME);
IgniteCache<Integer, Object> cache3 = g3.cache(DEFAULT_CACHE_NAME);
final Class<CacheEntryListener> cls1 = (Class<CacheEntryListener>) getExternalClassLoader().
loadClass("org.apache.ignite.tests.p2p.CacheDeploymentCacheEntryListener");
final Class<CacheEntryEventSerializableFilter> cls2 = (Class<CacheEntryEventSerializableFilter>) getExternalClassLoader().
loadClass("org.apache.ignite.tests.p2p.CacheDeploymentCacheEntryEventSerializableFilter");
CacheEntryListenerConfiguration<Integer, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
new Factory<CacheEntryListener<Integer, Object>>() {
@Override public CacheEntryListener<Integer, Object> create() {
try {
return cls1.newInstance();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
},
new ClassFilterFactory(cls2),
true,
true
);
cache1.registerCacheEntryListener(lsnrCfg);
cache1.put(1, 1);
assertEquals(1, cache2.get(1));
assertEquals(1, cache3.get(1));
}
finally {
extClassloadingAtCfg = false;
useExtClassLoader = false;
}
}
/**
* @throws Exception If test failed.
*/
@Test
public void testExternalClassesAtConfigurationDynamicStart() throws Exception {
try {
extClassloadingAtCfg = false;
useExtClassLoader = true;
Ignite g1 = startGrid(1);
Ignite g2 = startGrid(2);
Ignite g3 = startClientGrid(3);
CacheConfiguration cfg = defaultCacheConfiguration();
loadExternalClassesToCfg(cfg);
cfg.setName("customStore");
IgniteCache<Integer, Object> cache1 = g1.createCache(cfg);
IgniteCache<Integer, Object> cache2 = g2.getOrCreateCache(cfg); // Checks deserialization at cache creation.
IgniteCache<Integer, Object> cache3 = g3.getOrCreateCache(cfg); // Checks deserialization at cache creation.
cache1.put(1, 1);
assertEquals(1, cache2.get(1));
assertEquals(1, cache3.get(1));
}
finally {
extClassloadingAtCfg = false;
useExtClassLoader = false;
}
}
/**
* @throws Exception If test failed.
*/
@Test
public void testExternalClassesAtConfigurationDynamicStart2() throws Exception {
try {
extClassloadingAtCfg = false;
useExtClassLoader = true;
Ignite g1 = startGrid(1);
Ignite g2 = startGrid(2);
Ignite g3 = startClientGrid(3);
CacheConfiguration cfg = defaultCacheConfiguration();
loadExternalClassesToCfg(cfg);
cfg.setName("customStore");
IgniteCache<Integer, Object> cache1 = g1.getOrCreateCache(cfg);
IgniteCache<Integer, Object> cache2 = g2.getOrCreateCache("customStore"); // Checks deserialization at cache creation.
IgniteCache<Integer, Object> cache3 = g3.getOrCreateCache("customStore"); // Checks deserialization at cache creation.
cache1.put(1, 1);
assertEquals(1, cache2.get(1));
assertEquals(1, cache3.get(1));
}
finally {
extClassloadingAtCfg = false;
useExtClassLoader = false;
}
}
/**
* @throws Exception If test failed.
*/
@Test
public void testExternalClassesAtMessage() throws Exception {
try {
useExtClassLoader = true;
disableP2p = true;
final Class cls = (Class)getExternalClassLoader().
loadClass("org.apache.ignite.tests.p2p.CacheDeploymentExternalizableTestValue");
Ignite g1 = startGrid(1);
startGrid(2);
IgniteMessaging rmtMsg = g1.message();
latch = new CountDownLatch(2);
rmtMsg.remoteListen("MyOrderedTopic", new MessageListener());
Object o = cls.newInstance();
o.toString();
rmtMsg.send("MyOrderedTopic", o);
rmtMsg.sendOrdered("MyOrderedTopic", o, 0);
latch.await();
// Custom topic.
final Class cls2 = (Class)getExternalClassLoader().
loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestEnumValue");
Object topic = cls2.getEnumConstants()[0];
latch = new CountDownLatch(2);
rmtMsg.remoteListen(topic, new MessageListener());
rmtMsg.send(topic, topic);
rmtMsg.sendOrdered(topic, topic, 0);
latch.await();
}
finally {
useExtClassLoader = false;
disableP2p = false;
}
}
/**
* @throws Exception If test failed.
*/
@Test
public void testExternalClassesAtEventP2pDisabled() throws Exception {
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_EVENTS);
testExternalClassesAtEvent0(true);
}
/**
* @throws Exception If test failed.
*/
@Test
public void testExternalClassesAtEvent() throws Exception {
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_EVENTS);
testExternalClassesAtEvent0(false);
}
/**
* @throws Exception If test failed.
*/
private void testExternalClassesAtEvent0(boolean p2p) throws Exception {
try {
useExtClassLoader = true;
cutromEvt = true;
if (p2p)
disableP2p = true;
final Class cls = (Class)getExternalClassLoader().
loadClass("org.apache.ignite.tests.p2p.CacheDeploymentExternalizableTestValue");
final Class cls2 = (Class)getExternalClassLoader().
loadClass("org.apache.ignite.tests.p2p.GridEventConsumeFilter");
Ignite g1 = startGrid(1);
startGrid(2);
latch = new CountDownLatch(3);
g1.events().localListen((IgnitePredicate)cls2.newInstance(), EVT_CACHE_OBJECT_PUT);
g1.events().localListen(new EventListener(), EVT_CACHE_OBJECT_PUT);
g1.events().remoteListen(null, (IgnitePredicate)cls2.newInstance(), EVT_CACHE_OBJECT_PUT);
g1.events().remoteListen(null, new EventListener(), EVT_CACHE_OBJECT_PUT);
g1.cache(DEFAULT_CACHE_NAME).put("1", cls.newInstance());
latch.await();
}
finally {
useExtClassLoader = false;
cutromEvt = false;
if (p2p)
disableP2p = false;
}
}
/**
* @throws Exception If test failed.
*/
@Test
public void testSync() throws Exception {
preloadMode = SYNC;
batchSize = 512;
try {
IgniteCache<Integer, String> cache1 = startGrid(1).cache(DEFAULT_CACHE_NAME);
int keyCnt = 1000;
for (int i = 0; i < keyCnt; i++)
cache1.put(i, "val" + i);
IgniteCache<Integer, String> cache2 = startGrid(2).cache(DEFAULT_CACHE_NAME);
assertEquals(keyCnt, cache2.localSize(CachePeekMode.ALL));
}
finally {
stopAllGrids();
}
}
/**
* @throws Exception If test failed.
*/
@Test
public void testAsync() throws Exception {
preloadMode = ASYNC;
batchSize = 256;
try {
IgniteCache<Integer, String> cache1 = startGrid(1).cache(DEFAULT_CACHE_NAME);
final int keyCnt = 2000;
for (int i = 0; i < keyCnt; i++)
cache1.put(i, "val" + i);
final IgniteCache<Integer, String> cache2 = startGrid(2).cache(DEFAULT_CACHE_NAME);
int size = cache2.localSize(CachePeekMode.ALL);
info("Size of cache2: " + size);
boolean awaitSize = GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return cache2.localSize(CachePeekMode.ALL) >= keyCnt;
}
}, getTestTimeout());
assertTrue("Actual cache size: " + cache2.localSize(CachePeekMode.ALL), awaitSize);
}
finally {
stopAllGrids();
}
}
/**
* @throws Exception If test failed.
*/
@Test
public void testBatchSize1() throws Exception {
preloadMode = SYNC;
batchSize = 1; // 1 byte but one entry should be in batch anyway.
try {
IgniteCache<Integer, String> cache1 = startGrid(1).cache(DEFAULT_CACHE_NAME);
int cnt = 100;
for (int i = 0; i < cnt; i++)
cache1.put(i, "val" + i);
IgniteCache<Integer, String> cache2 = startGrid(2).cache(DEFAULT_CACHE_NAME);
assertEquals(cnt, cache2.localSize(CachePeekMode.ALL));
}
finally {
stopAllGrids();
}
}
/**
* @throws Exception If test failed.
*/
@Test
public void testBatchSize1000() throws Exception {
preloadMode = SYNC;
batchSize = 1000; // 1000 bytes.
try {
IgniteCache<Integer, String> cache1 = startGrid(1).cache(DEFAULT_CACHE_NAME);
int cnt = 100;
for (int i = 0; i < cnt; i++)
cache1.put(i, "val" + i);
IgniteCache<Integer, String> cache2 = startGrid(2).cache(DEFAULT_CACHE_NAME);
assertEquals(cnt, cache2.localSize(CachePeekMode.ALL));
}
finally {
stopAllGrids();
}
}
/**
* @throws Exception If test failed.
*/
@Test
public void testBatchSize10000() throws Exception {
preloadMode = SYNC;
batchSize = 10000; // 10000 bytes.
try {
IgniteCache<Integer, String> cache1 = startGrid(1).cache(DEFAULT_CACHE_NAME);
int cnt = 100;
for (int i = 0; i < cnt; i++)
cache1.put(i, "val" + i);
IgniteCache<Integer, String> cache2 = startGrid(2).cache(DEFAULT_CACHE_NAME);
assertEquals(cnt, cache2.localSize(CachePeekMode.ALL));
}
finally {
stopGrid(1);
stopGrid(2);
}
}
/**
* @throws Exception If test failed.
*/
@Test
public void testMultipleNodes() throws Exception {
Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-10082", MvccFeatureChecker.forcedMvcc());
preloadMode = ASYNC;
batchSize = 256;
try {
int gridCnt = 4;
startGridsMultiThreaded(gridCnt);
info("Beginning data population...");
final int cnt = 2500;
Map<Integer, String> map = null;
for (int i = 0; i < cnt; i++) {
if (i % 100 == 0) {
if (map != null && !map.isEmpty()) {
grid(0).cache(DEFAULT_CACHE_NAME).putAll(map);
info("Put entries count: " + i);
}
map = new HashMap<>();
}
map.put(i, "val" + i);
}
if (map != null && !map.isEmpty())
grid(0).cache(DEFAULT_CACHE_NAME).putAll(map);
for (int gridIdx = 0; gridIdx < gridCnt; gridIdx++) {
assert grid(gridIdx).cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.ALL) == cnt :
"Actual size: " + grid(gridIdx).cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.ALL);
info("Cache size is OK for grid index: " + gridIdx);
}
final IgniteCache<Integer, String> lastCache = startGrid(gridCnt).cache(DEFAULT_CACHE_NAME);
// Let preloading start.
Thread.sleep(1000);
// Stop random initial node while preloading is in progress.
int idx = new Random().nextInt(gridCnt);
info("Stopping node with index: " + idx);
stopGrid(idx);
awaitPartitionMapExchange(true, true, null);
boolean awaitSize = GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return lastCache.localSize(CachePeekMode.ALL) >= cnt;
}
}, 20_000);
assertTrue("Actual cache size: " + lastCache.localSize(CachePeekMode.ALL), awaitSize);
}
finally {
stopAllGrids();
}
}
/**
* @throws Exception If test failed.
*/
@Test
public void testConcurrentStartSync() throws Exception {
preloadMode = SYNC;
batchSize = 10000;
try {
startGridsMultiThreaded(4);
}
finally {
stopAllGrids();
}
}
/**
* @throws Exception If test failed.
*/
@Test
public void testConcurrentStartAsync() throws Exception {
preloadMode = ASYNC;
batchSize = 10000;
try {
startGridsMultiThreaded(4);
}
finally {
stopAllGrids();
}
}
/**
*
*/
private static class MessageListener implements P2<UUID, Object> {
/**
* @param nodeId
* @param msg
* @return
*/
@Override public boolean apply(UUID nodeId, Object msg) {
System.out.println("Received message [msg=" + msg + ", from=" + nodeId + ']');
latch.countDown();
return true; // Return true to continue listening.
}
}
/** */
private static class EventListener implements IgnitePredicate<Event> {
/** {@inheritDoc} */
@Override public boolean apply(Event evt) {
System.out.println("Cache event: " + evt);
latch.countDown();
return true;
}
}
/**
*
*/
private static class ClassFilterFactory implements Factory<CacheEntryEventFilter<Integer, Object>> {
/** */
private Class<CacheEntryEventSerializableFilter> cls;
/**
* @param cls Class.
*/
public ClassFilterFactory(Class<CacheEntryEventSerializableFilter> cls) {
this.cls = cls;
}
/** {@inheritDoc} */
@Override public CacheEntryEventSerializableFilter<Integer, Object> create() {
try {
return cls.newInstance();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}