// blob: 26181ad5579690d4c9e64ec5421fa6996a1a87ff [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static java.lang.Thread.sleep;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
import static org.apache.geode.test.dunit.Host.getHost;
import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.io.IOException;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.ClientSession;
import org.apache.geode.cache.InterestRegistrationEvent;
import org.apache.geode.cache.InterestRegistrationListener;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
/**
* Written to test fix for Bug #47132
*/
@Category({ClientSubscriptionTest.class})
public class InterestRegrListenerDUnitTest extends JUnit4DistributedTestCase {
private Cache cache;
private DistributedSystem ds;
private CacheServer cacheServer;
private ClientCache clientCache;
private int cacheServerPort;
private String hostName;
private final Map<String, Integer> listnerMap = new HashMap<String, Integer>();
private static final String UNREGISTER_INTEREST = "UnregisterInterest";
private static final String REGISTER_INTEREST = "RegisterInterest";
private static final int DURABLE_CLIENT_TIMEOUT_TEST = 20;
private static InterestRegrListenerDUnitTest instance = new InterestRegrListenerDUnitTest();
@Override
public final void preSetUp() throws Exception {
disconnectAllFromDS();
}
public Cache createCache(Properties props) throws Exception {
ds = getSystem(props);
cache = CacheFactory.create(ds);
if (cache == null) {
throw new Exception("CacheFactory.create() returned null ");
}
return cache;
}
private void createServer() throws IOException {
cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
cacheServer = cache.addCacheServer();
cacheServer.setPort(cacheServerPort);
cacheServer.start();
hostName = InetAddress.getLocalHost().getHostAddress();
listnerMap.clear();
}
public int getCacheServerPort() {
return cacheServerPort;
}
public String getCacheServerHost() {
return hostName;
}
public void stopCacheServer() {
this.cacheServer.stop();
}
public void setUpServerVM() throws Exception {
Properties gemFireProps = new Properties();
createCache(gemFireProps);
RegionFactory factory = cache.createRegionFactory(RegionShortcut.REPLICATE);
Region r = factory.create("serverRegion");
r.put("serverkey", "servervalue");
}
public void doClientRegionPut() {
Region<String, String> region = clientCache.getRegion("serverRegion");
assertEquals("servervalue", region.get("serverkey"));
region.put("clientkey", "clientvalue");
assertEquals("clientvalue", region.get("clientkey"));
}
public void doServerRegionPut() {
Region<String, String> region = cache.getRegion("serverRegion");
assertEquals("servervalue", region.get("serverkey"));
assertEquals("clientvalue", region.get("clientkey"));
}
public void doClientRegionRegisterInterest(boolean isDurable) {
Region<String, String> region = clientCache.getRegion("serverRegion");
region.registerInterestRegex(".*", // everything
InterestResultPolicy.DEFAULT, true);
}
private void doExpressInterestOnServer(boolean isDurable) {
LogWriterUtils.getLogWriter()
.info("Total ClientSessions " + cacheServer.getAllClientSessions().size());
for (ClientSession c : this.cacheServer.getAllClientSessions()) {
c.registerInterestRegex(SEPARATOR + "serverRegion", ".*", isDurable);
}
}
private void doRegisterListener() {
InterestRegistrationListener listener = new InterestRegistrationListener() {
@Override
public void close() {}
@Override
public void afterUnregisterInterest(InterestRegistrationEvent event) {
Integer count = InterestRegrListenerDUnitTest.this.listnerMap.get(UNREGISTER_INTEREST);
int intCount = 0;
if (count != null)
intCount = count.intValue();
intCount++;
InterestRegrListenerDUnitTest.this.listnerMap.put(UNREGISTER_INTEREST, intCount);
LogWriterUtils.getLogWriter()
.info("InterestRegistrationListener afterUnregisterInterest for "
+ event.getRegionName() + " keys " + event.getKeysOfInterest() + "Count " + intCount
+ " Client : " + event.getClientSession().toString());
}
@Override
public void afterRegisterInterest(InterestRegistrationEvent event) {
Integer count = InterestRegrListenerDUnitTest.this.listnerMap.get(REGISTER_INTEREST);
int intCount = 0;
if (count != null)
intCount = count.intValue();
intCount++;
InterestRegrListenerDUnitTest.this.listnerMap.put(REGISTER_INTEREST, intCount);
LogWriterUtils.getLogWriter()
.info("InterestRegistrationListener afterRegisterInterest for " + event.getRegionName()
+ " keys " + event.getKeysOfInterest() + "Count " + intCount + " Client : "
+ event.getClientSession().toString());
}
};
LogWriterUtils.getLogWriter().info("Registered InterestRegistationLister");
this.cacheServer.registerInterestRegistrationListener(listener);
}
public void setUpClientVM(String host, int port, boolean isDurable, String vmID) {
Properties gemFireProps = new Properties();
if (isDurable) {
gemFireProps.put(DURABLE_CLIENT_ID, vmID);
gemFireProps.put(DURABLE_CLIENT_TIMEOUT, "" + DURABLE_CLIENT_TIMEOUT_TEST);
}
ClientCacheFactory clientCacheFactory = new ClientCacheFactory(gemFireProps);
clientCacheFactory.addPoolServer(host, port);
clientCacheFactory.setPoolSubscriptionEnabled(true);
clientCacheFactory.setPoolMinConnections(5);
clientCache = clientCacheFactory.create();
ClientRegionFactory<String, String> regionFactory =
clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY);
Region<String, String> region = regionFactory.create("serverRegion");
LogWriterUtils.getLogWriter()
.info("Client Cache is created in this vm connected to cacheServer " + host + ":" + port
+ " durable? " + isDurable + " with VMID=" + vmID + " region " + region.getFullPath()
+ " regionSize " + region.size());
assertNotNull(clientCache);
assertNotNull(region);
}
public static void setUpServerVMTask() throws Exception {
instance.setUpServerVM();
}
public static void createServerTask() throws Exception {
instance.createServer();
}
public static void setUpClientVMTask(String host, int port, boolean isDurable, String vmID)
throws Exception {
instance.setUpClientVM(host, port, isDurable, vmID);
}
public static Map<String, Integer> getListenerMapTask() throws Exception {
return instance.listnerMap;
}
public static void doClientRegionPutTask() {
instance.doClientRegionPut();
}
public static void doServerRegionPutTask() {
instance.doServerRegionPut();
}
public static void doExpressInterestOnServerTask(boolean isDurable) {
instance.doExpressInterestOnServer(isDurable);
}
public static void doRegisterListenerTask() {
instance.doRegisterListener();
}
public static Object[] getCacheServerEndPointTask() {
Object[] array = new Object[2];
array[0] = instance.getCacheServerHost();
array[1] = instance.getCacheServerPort();
return array;
}
public static void closeCacheTask() {
instance.cache.close();
}
public static void closeClientCacheTask(boolean keepAlive) {
instance.clientCache.close(keepAlive);
}
public static void doClientRegionRegisterInterestTask(boolean isDurable) {
instance.doClientRegionRegisterInterest(isDurable);
}
@Test
public void testDurableClientExit_ClientExpressedInterest() throws Exception {
final Host host = Host.getHost(0);
VM serverVM = host.getVM(0);
VM clientVM_1 = host.getVM(1);
VM clientVM_2 = host.getVM(2);
VM clientVM_3 = host.getVM(3);
serverVM.invoke(() -> InterestRegrListenerDUnitTest.setUpServerVMTask());
serverVM.invoke(() -> InterestRegrListenerDUnitTest.createServerTask());
Object array[] = (Object[]) serverVM
.invoke(() -> InterestRegrListenerDUnitTest.getCacheServerEndPointTask());
String hostName = (String) array[0];
int port = (Integer) array[1];
Object params[] = new Object[4];
params[0] = hostName;
params[1] = port;
params[2] = true;
params[3] = "VM_1";
LogWriterUtils.getLogWriter()
.info("Starting client1 with server endpoint <" + hostName + ">:" + port);
clientVM_1.invoke(InterestRegrListenerDUnitTest.class, "setUpClientVMTask", params);
params[3] = "VM_2";
LogWriterUtils.getLogWriter()
.info("Starting client2 with server endpoint <" + hostName + ">:" + port);
clientVM_2.invoke(InterestRegrListenerDUnitTest.class, "setUpClientVMTask", params);
params[3] = "VM_3";
LogWriterUtils.getLogWriter()
.info("Starting client2 with server endpoint <" + hostName + ">:" + port);
clientVM_3.invoke(InterestRegrListenerDUnitTest.class, "setUpClientVMTask", params);
params = new Object[1];
params[0] = true;
serverVM.invoke(() -> InterestRegrListenerDUnitTest.doRegisterListenerTask());
clientVM_1.invoke(InterestRegrListenerDUnitTest.class, "doClientRegionRegisterInterestTask",
params);
clientVM_2.invoke(InterestRegrListenerDUnitTest.class, "doClientRegionRegisterInterestTask",
params);
clientVM_3.invoke(InterestRegrListenerDUnitTest.class, "doClientRegionRegisterInterestTask",
params);
clientVM_1.invoke(() -> InterestRegrListenerDUnitTest.doClientRegionPutTask());
clientVM_2.invoke(() -> InterestRegrListenerDUnitTest.doClientRegionPutTask());
clientVM_3.invoke(() -> InterestRegrListenerDUnitTest.doClientRegionPutTask());
serverVM.invoke(() -> InterestRegrListenerDUnitTest.doServerRegionPutTask());
clientVM_1.invoke(() -> InterestRegrListenerDUnitTest.closeClientCacheTask(true));
clientVM_2.invoke(() -> InterestRegrListenerDUnitTest.closeClientCacheTask(true));
clientVM_3.invoke(() -> InterestRegrListenerDUnitTest.closeClientCacheTask(true));
Thread.sleep(2);
Map<String, Integer> listnerMap = (Map<String, Integer>) serverVM
.invoke(() -> InterestRegrListenerDUnitTest.getListenerMapTask());
LogWriterUtils.getLogWriter().info("Listener Map " + listnerMap);
int registerCount = getMapValueForKey(listnerMap, REGISTER_INTEREST);
int unregisterCount = getMapValueForKey(listnerMap, UNREGISTER_INTEREST);
assertEquals(3, registerCount);
assertEquals(0, unregisterCount);
LogWriterUtils.getLogWriter().info(
"Sleeping till durable client queue are expired and unregister event is called on to listener");
Thread.sleep((DURABLE_CLIENT_TIMEOUT_TEST + 5) * 1000);
listnerMap = (Map<String, Integer>) serverVM
.invoke(() -> InterestRegrListenerDUnitTest.getListenerMapTask());
LogWriterUtils.getLogWriter().info("Listener Map after sleeping " + listnerMap);
registerCount = getMapValueForKey(listnerMap, REGISTER_INTEREST);
unregisterCount = getMapValueForKey(listnerMap, UNREGISTER_INTEREST);
assertEquals(3, registerCount);
assertEquals(3, unregisterCount);
serverVM.invoke(() -> InterestRegrListenerDUnitTest.closeCacheTask());
}
@Test
public void testDurableClientExit_ServerExpressedInterest() throws Exception {
final Host host = Host.getHost(0);
VM serverVM = host.getVM(0);
VM clientVM_1 = host.getVM(1);
VM clientVM_2 = host.getVM(2);
VM clientVM_3 = host.getVM(3);
serverVM.invoke(() -> InterestRegrListenerDUnitTest.setUpServerVMTask());
serverVM.invoke(() -> InterestRegrListenerDUnitTest.createServerTask());
Object array[] = (Object[]) serverVM
.invoke(() -> InterestRegrListenerDUnitTest.getCacheServerEndPointTask());
String hostName = (String) array[0];
int port = (Integer) array[1];
Object params[] = new Object[4];
params[0] = hostName;
params[1] = port;
params[2] = true;
params[3] = "VM_1";
LogWriterUtils.getLogWriter()
.info("Starting client1 with server endpoint <" + hostName + ">:" + port);
clientVM_1.invoke(InterestRegrListenerDUnitTest.class, "setUpClientVMTask", params);
params[3] = "VM_2";
LogWriterUtils.getLogWriter()
.info("Starting client2 with server endpoint <" + hostName + ">:" + port);
clientVM_2.invoke(InterestRegrListenerDUnitTest.class, "setUpClientVMTask", params);
params[3] = "VM_3";
LogWriterUtils.getLogWriter()
.info("Starting client2 with server endpoint <" + hostName + ">:" + port);
clientVM_3.invoke(InterestRegrListenerDUnitTest.class, "setUpClientVMTask", params);
params = new Object[1];
params[0] = true;
serverVM.invoke(() -> InterestRegrListenerDUnitTest.doRegisterListenerTask());
serverVM.invoke(InterestRegrListenerDUnitTest.class, "doExpressInterestOnServerTask", params);
clientVM_1.invoke(() -> InterestRegrListenerDUnitTest.doClientRegionPutTask());
clientVM_2.invoke(() -> InterestRegrListenerDUnitTest.doClientRegionPutTask());
clientVM_3.invoke(() -> InterestRegrListenerDUnitTest.doClientRegionPutTask());
serverVM.invoke(() -> InterestRegrListenerDUnitTest.doServerRegionPutTask());
clientVM_1.invoke(() -> InterestRegrListenerDUnitTest.closeClientCacheTask(true));
clientVM_2.invoke(() -> InterestRegrListenerDUnitTest.closeClientCacheTask(true));
clientVM_3.invoke(() -> InterestRegrListenerDUnitTest.closeClientCacheTask(true));
Thread.sleep(2);
Map<String, Integer> listnerMap = (Map<String, Integer>) serverVM
.invoke(() -> InterestRegrListenerDUnitTest.getListenerMapTask());
LogWriterUtils.getLogWriter().info("Listener Map " + listnerMap);
int registerCount = getMapValueForKey(listnerMap, REGISTER_INTEREST);
int unregisterCount = getMapValueForKey(listnerMap, UNREGISTER_INTEREST);
assertEquals(3, registerCount);
assertEquals(0, unregisterCount);
LogWriterUtils.getLogWriter().info(
"Sleeping till durable client queue are expired and unregister event is called on to listener");
Thread.sleep((DURABLE_CLIENT_TIMEOUT_TEST + 5) * 1000);
listnerMap = (Map<String, Integer>) serverVM
.invoke(() -> InterestRegrListenerDUnitTest.getListenerMapTask());
LogWriterUtils.getLogWriter().info("Listener Map after sleeping " + listnerMap);
registerCount = getMapValueForKey(listnerMap, REGISTER_INTEREST);
unregisterCount = getMapValueForKey(listnerMap, UNREGISTER_INTEREST);
assertEquals(3, registerCount);
assertEquals(3, unregisterCount);
serverVM.invoke(() -> InterestRegrListenerDUnitTest.closeCacheTask());
}
@Test
public void testDurableClientExit_ServerExpressedInterest_NonDurableInterest() throws Exception {
final Host host = getHost(0);
final VM serverVM = host.getVM(0);
final VM clientVM_1 = host.getVM(1);
final VM clientVM_2 = host.getVM(2);
final VM clientVM_3 = host.getVM(3);
serverVM.invoke(() -> setUpServerVMTask());
serverVM.invoke(() -> createServerTask());
Object array[] = (Object[]) serverVM
.invoke(() -> getCacheServerEndPointTask());
String hostName = (String) array[0];
int port = (Integer) array[1];
Object params[] = new Object[4];
params[0] = hostName;
params[1] = port;
params[2] = true;
params[3] = "VM_1";
getLogWriter()
.info("Starting client1 with server endpoint <" + hostName + ">:" + port);
clientVM_1.invoke(InterestRegrListenerDUnitTest.class, "setUpClientVMTask", params);
params[3] = "VM_2";
getLogWriter()
.info("Starting client2 with server endpoint <" + hostName + ">:" + port);
clientVM_2.invoke(InterestRegrListenerDUnitTest.class, "setUpClientVMTask", params);
params[3] = "VM_3";
getLogWriter()
.info("Starting client2 with server endpoint <" + hostName + ">:" + port);
clientVM_3.invoke(InterestRegrListenerDUnitTest.class, "setUpClientVMTask", params);
params = new Object[1];
params[0] = false; // non-durable interest
serverVM.invoke(() -> doRegisterListenerTask());
serverVM.invoke(InterestRegrListenerDUnitTest.class, "doExpressInterestOnServerTask", params);
clientVM_1.invoke(() -> doClientRegionPutTask());
clientVM_2.invoke(() -> doClientRegionPutTask());
clientVM_3.invoke(() -> doClientRegionPutTask());
serverVM.invoke(() -> doServerRegionPutTask());
clientVM_1.invoke(() -> closeClientCacheTask(false));
clientVM_2.invoke(() -> closeClientCacheTask(false));
clientVM_3.invoke(() -> closeClientCacheTask(false));
sleep(2);
WaitCriterion wc = new WaitCriterion() {
int registerCount = 0;
int unregisterCount = 0;
@Override
public boolean done() {
Map<String, Integer> listnerMap = (Map<String, Integer>) serverVM
.invoke(() -> getListenerMapTask());
getLogWriter().info("Listener Map " + listnerMap);
registerCount = getMapValueForKey(listnerMap, REGISTER_INTEREST);
unregisterCount = getMapValueForKey(listnerMap, UNREGISTER_INTEREST);
if (registerCount == 3 && unregisterCount == 3) {
return true;
}
getLogWriter()
.info("Waiting for counts to each reach 3. Current registerCount=" + registerCount
+ "; unregisterCount=" + unregisterCount);
return false;
}
@Override
public String description() {
return "Waiting for counts to each reach 3. Current registerCount=" + registerCount
+ "; unregisterCount=" + unregisterCount;
}
};
GeodeAwaitility.await().untilAsserted(wc);
getLogWriter().info(
"Sleeping till durable client queue are expired and unregister event is called on to listener");
sleep((DURABLE_CLIENT_TIMEOUT_TEST + 5) * 1000);
serverVM.invoke(() -> closeCacheTask());
}
private int getMapValueForKey(Map<String, Integer> map, String key) {
if (map.containsKey(key))
return map.get(key).intValue();
else
return 0;
}
}