blob: a9ce0ea9f049bd68d1390e2557665a447ced0cab [file] [log] [blame]
package com.gemstone.gemfire.cache.client;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.InterestResultPolicy;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache.server.ClientSubscriptionConfig;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.internal.AvailablePortHelper;
import com.gemstone.org.jgroups.util.Stack;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableCallable;
import dunit.SerializableRunnable;
import dunit.VM;
/**
* The ClientServerRegisterInterestsDUnitTest class is a test suite of test cases testing the interaction between a
* client and a server in a Register Interests scenario.
*
* @author John Blum
* @see dunit.DistributedTestCase
* @since 8.0
*/
public class ClientServerRegisterInterestsDUnitTest extends DistributedTestCase {
protected static final long WAIT_TIME_MILLISECONDS = TimeUnit.SECONDS.toMillis(5);
protected static final String LOCALHOST = "localhost";
private final AtomicInteger serverPort = new AtomicInteger(0);
private final Stack entryEvents = new Stack();
private VM gemfireServerVm;
public ClientServerRegisterInterestsDUnitTest(final String testName) {
super(testName);
}
@Override
public void setUp() throws Exception {
super.setUp();
disconnectAllFromDS();
setupGemFireCacheServer();
}
@Override
public void tearDown2() throws Exception {
super.tearDown2();
serverPort.set(0);
entryEvents.removeAll();
gemfireServerVm.invoke(new SerializableRunnable() {
@Override public void run() {
CacheFactory.getAnyInstance().close();
}
});
gemfireServerVm = null;
}
private void setupGemFireCacheServer() {
Host localhost = Host.getHost(0);
gemfireServerVm = localhost.getVM(0);
serverPort.set(AvailablePortHelper.getRandomAvailableTCPPort());
gemfireServerVm.invoke(new SerializableRunnable() {
@Override public void run() {
try {
Cache cache = new CacheFactory()
.set("name", "ClientServerRegisterInterestsTestGemFireServer")
.set("mcast-port", "0")
.set("log-file", "clientServerRegisterInterestsTest.log")
.set("log-level", "config")
//.set("jmx-manager", "true")
//.set("jmx-manager-http-port", "0")
//.set("jmx-manager-port", "1199")
//.set("jmx-manager-start", "true")
.create();
RegionFactory<String, String> regionFactory = cache.createRegionFactory();
regionFactory.setDataPolicy(DataPolicy.REPLICATE);
regionFactory.setKeyConstraint(String.class);
regionFactory.setValueConstraint(String.class);
Region<String, String> example = regionFactory.create("Example");
assertNotNull("The 'Example' Region was not properly configured and initialized!", example);
assertEquals("/Example", example.getFullPath());
assertEquals("Example", example.getName());
assertTrue(example.isEmpty());
example.put("1", "ONE");
assertFalse(example.isEmpty());
assertEquals(1, example.size());
CacheServer cacheServer = cache.addCacheServer();
cacheServer.setPort(serverPort.get());
cacheServer.setMaxConnections(10);
ClientSubscriptionConfig clientSubscriptionConfig = cacheServer.getClientSubscriptionConfig();
clientSubscriptionConfig.setCapacity(100);
clientSubscriptionConfig.setEvictionPolicy("entry");
cacheServer.start();
assertTrue("Cache Server is not running!", cacheServer.isRunning());
}
catch (UnknownHostException ignore) {
throw new RuntimeException(ignore);
}
catch (IOException e) {
throw new RuntimeException(String.format(
"Failed to start the GemFire Cache Server listening on port (%1$d) due to IO error!",
serverPort.get()), e);
}
}
});
}
private ClientCache setupGemFireClientCache() {
ClientCache clientCache = new ClientCacheFactory()
.set("durable-client-id", "TestDurableClientId")
.create();
PoolFactory poolFactory = PoolManager.createFactory();
poolFactory.setMaxConnections(10);
poolFactory.setMinConnections(1);
poolFactory.setReadTimeout(5000);
poolFactory.setSubscriptionEnabled(true);
poolFactory.addServer(LOCALHOST, serverPort.get());
Pool pool = poolFactory.create("serverConnectionPool");
assertNotNull("The 'serverConnectionPool' was not properly configured and initialized!", pool);
ClientRegionFactory<String, String> regionFactory = clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
regionFactory.addCacheListener(new TestEntryCacheListener());
regionFactory.setPoolName(pool.getName());
regionFactory.setKeyConstraint(String.class);
regionFactory.setValueConstraint(String.class);
Region<String, String> exampleCachingProxy = regionFactory.create("Example");
assertNotNull("The 'Example' Client Region was not properly configured and initialized", exampleCachingProxy);
clientCache.readyForEvents();
exampleCachingProxy.registerInterest("ALL_KEYS", InterestResultPolicy.DEFAULT, false, true);
return clientCache;
}
@SuppressWarnings("unchecked")
protected <K, V> V put(final String regionName, final K key, final V value) {
return (V) gemfireServerVm.invoke(new SerializableCallable() {
@Override public Object call() throws Exception {
Cache cache = CacheFactory.getAnyInstance();
cache.getRegion(regionName).put(key, value);
return cache.getRegion(regionName).get(key);
}
});
}
protected void waitOnEvent(final long waitTimeMilliseconds) {
final long timeout = (System.currentTimeMillis() + waitTimeMilliseconds);
while (entryEvents.empty() && (System.currentTimeMillis() < timeout)) {
synchronized (this) {
try {
TimeUnit.MILLISECONDS.timedWait(this, Math.min(500, waitTimeMilliseconds));
}
catch (InterruptedException ignore) {
}
}
}
}
public void testClientRegisterInterests() {
ClientCache clientCache = setupGemFireClientCache();
try {
Region<String, String> example = clientCache.getRegion("/Example");
assertNotNull("'Example' Region in Client Cache was not found!", example);
assertEquals(1, example.size());
assertTrue(example.containsKey("1"));
assertEquals("ONE", example.get("1"));
assertTrue(entryEvents.empty());
String value = put("/Example", "2", "TWO");
assertEquals("TWO", value);
waitOnEvent(WAIT_TIME_MILLISECONDS);
assertFalse(entryEvents.empty());
EntryEvent entryEvent = (EntryEvent) entryEvents.pop();
assertEquals("2", entryEvent.getKey());
assertEquals("TWO", entryEvent.getNewValue());
assertNull(entryEvent.getOldValue());
assertEquals(2, example.size());
assertTrue(example.containsKey("2"));
assertEquals("TWO", example.get("2"));
}
finally {
clientCache.close();
}
}
protected final class TestEntryCacheListener extends CacheListenerAdapter<String, String> {
@Override
public void afterCreate(final EntryEvent<String, String> event) {
super.afterCreate(event);
System.out.printf("Created entry with key (%1$s) and value (%2$s) in Region (%3$s)!", event.getKey(),
event.getNewValue(), event.getRegion().getName());
entryEvents.push(event);
}
}
}