blob: 21d40bc15971d88f9c50ab0698c4768d0fcb95b4 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.tier.sockets;
import java.io.IOException;
import java.util.Iterator;
import java.util.Properties;
import junit.framework.Assert;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.InterestResultPolicy;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.Pool;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.Connection;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.client.internal.QueueConnectionImpl;
import com.gemstone.gemfire.cache.util.BridgeServer;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.BridgeServerImpl;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableRunnable;
import dunit.VM;
/**
*
* @author Yogesh Mahajan
* @author Suyog Bhokare
*
*/
public class InterestListEndpointDUnitTest extends DistributedTestCase
{
static VM server1 = null;
static VM server2 = null;
static VM client1 = null;
protected static Cache cache = null;
private static int PORT1;
private static int PORT2;
private static Connection conn1 ;
private static PoolImpl pool;
private static final String REGION_NAME = "InterestListEndpointDUnitTest_region";
private static final String k1 = "k1";
private static final String k2 = "k2";
private static final String client_k1 = "client-k1";
private static final String client_k2 = "client-k2";
private static final String server_k1 = "server-k1";
private static final String server_k2 = "server-k2";
static InterestListEndpointDUnitTest impl;
/** constructor */
public InterestListEndpointDUnitTest(String name) {
super(name);
}
public void setUp() throws Exception {
super.setUp();
disconnectAllFromDS();
pause(5000);
final Host host = Host.getHost(0);
server1 = host.getVM(0);
server2 = host.getVM(1);
client1 = host.getVM(2);
createImpl();
for (int i=0; i<4; i++) {
host.getVM(i).invoke(getClass(), "createImpl", null);
}
// create servers first
PORT1 = initServerCache(server1);
PORT2 = initServerCache(server2);
// then create client
pause(5000); // [bruce] avoid ConnectException
client1.invoke(impl.getClass(), "createClientCache", new Object[] {
getServerHostName(server1.getHost()), new Integer(PORT1),new Integer(PORT2)});
}
/** subclass support */
public static void createImpl() {
impl = new InterestListEndpointDUnitTest("temp");
}
private void createCache(Properties props) throws Exception
{
DistributedSystem ds = getSystem(props);
cache = CacheFactory.create(ds);
assertNotNull(cache);
}
/**
* tests wheteher upadets are sent to clients if put on server directly
*
*/
public void testDirectPutOnServer()
{
client1.invoke(impl.getClass(), "createEntriesK1andK2");
server1.invoke(impl.getClass(), "createEntriesK1andK2");
server2.invoke(impl.getClass(), "createEntriesK1andK2");
client1.invoke(impl.getClass(), "registerKey1");
//directly put on server
server1.invoke(impl.getClass(), "put");
client1.invoke(impl.getClass(), "verifyPut");
}
/**
* put on non interest list ep and verify updates
*
*/
public void testInterestListEndpoint()
{
client1.invoke(impl.getClass(), "createEntriesK1andK2");
server2.invoke(impl.getClass(), "createEntriesK1andK2"); // server
server1.invoke(impl.getClass(), "createEntriesK1andK2"); // server
client1.invoke(impl.getClass(), "registerKey1");
server1.invoke(impl.getClass(), "verifyIfNotInterestListEndpointAndThenPut");
server2.invoke(impl.getClass(), "verifyIfNotInterestListEndpointAndThenPut");
client1.invoke(impl.getClass(), "verifyPut");
}
public void testInterestListEndpointAfterFailover() throws Exception
{
final long maxWaitTime = 20000;
client1.invoke(impl.getClass(), "createEntriesK1andK2");
server2.invoke(impl.getClass(), "createEntriesK1andK2");
server1.invoke(impl.getClass(), "createEntriesK1andK2");
client1.invoke(impl.getClass(), "registerKey1");
boolean firstIsPrimary = isVm0Primary();
VM primary = firstIsPrimary? server1 : server2;
primary.invoke(impl.getClass(), "stopILEndpointServer");
pause(5000);
//Since the loadbalancing policy is roundrobin & there are two servers so
// do two dumb puts, which will ensure that fail over happens from the
// interest list end point in case Live ServerMonitor is not working
client1.invoke(new CacheSerializableRunnable("Ensure that the failover from ILEP occurs") {
public void run2() throws CacheException
{
Region r = cache.getRegion("/"+REGION_NAME);
String poolName = r.getAttributes().getPoolName();
assertNotNull(poolName);
final PoolImpl pool = (PoolImpl)PoolManager.find(poolName);
assertNotNull(pool);
pool.acquireConnection();
try {
r.put("ping", "pong1"); // Used in the case where we don't have a LiveServerMonitorThread
} catch (CacheWriterException itsOK) {}
try {
r.put("ping", "pong2"); // Used in the case where we don't have a LiveServerMonitorThread
} catch (CacheWriterException itsOK) {}
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return pool.getConnectedServerCount() != 2;
}
public String description() {
return null;
}
};
DistributedTestCase.waitForCriterion(ev, maxWaitTime, 200, true);
}
});
//put on stopped server
primary.invoke(impl.getClass(), "put");
client1.invoke(impl.getClass(), "verifyPut");
}
public static boolean isVm0Primary() throws Exception {
int port = ((Integer)client1.invoke(impl.getClass(), "getPrimaryPort")).intValue();
return port == PORT1;
}
public static int getPrimaryPort() throws Exception {
Region r1 = cache.getRegion("/" + REGION_NAME);
String poolName = r1.getAttributes().getPoolName();
assertNotNull(poolName);
pool = (PoolImpl)PoolManager.find(poolName);
assertNotNull(pool);
assertTrue(pool.getPrimaryName() != null);
return pool.getPrimaryPort();
}
public void testUpdaterThreadIsAliveForFailedEndPoint(){
client1.invoke(impl.getClass(), "acquirePoolConnection");
client1.invoke(impl.getClass(), "processException");
client1.invoke(impl.getClass(), "verifyUpdaterThreadIsAlive");
}
public static void acquirePoolConnection()
{
try {
Region r1 = cache.getRegion("/"+REGION_NAME);
assertNotNull(r1);
String poolName = r1.getAttributes().getPoolName();
assertNotNull(poolName);
pool = (PoolImpl)PoolManager.find(poolName);
assertNotNull(pool);
conn1 = pool.getPrimaryConnection();
assertNotNull(conn1);
}
catch (Exception ex) {
throw new RuntimeException("Exception while setting acquireConnections ", ex);
}
}
public static void processException()
{
try {
pool.processException(new IOException(),conn1);
}
catch (Exception ex) {
throw new RuntimeException("Exception while setting processException ", ex);
}
}
public static void verifyUpdaterThreadIsAlive() throws InterruptedException
{
QueueConnectionImpl conn2 = (QueueConnectionImpl) pool.getPrimaryConnection();
Assert.assertNotSame(conn1, conn2);
Assert.assertFalse(conn1.getServer().equals(conn2.getServer()));
assertNull(((QueueConnectionImpl)conn1).getUpdater());
assertTrue((conn2).getUpdater().isAlive());
}
public static void stopILEndpointServer()
{
try {
Cache c = CacheFactory.getAnyInstance();
assertEquals("More than one BridgeServer", 1, c.getBridgeServers().size());
BridgeServerImpl bs = (BridgeServerImpl) c.getBridgeServers().iterator().next();
assertNotNull(bs);
assertNotNull(bs.getAcceptor());
assertNotNull(bs.getAcceptor().getCacheClientNotifier());
Iterator iter_prox = bs.getAcceptor().getCacheClientNotifier().getClientProxies().iterator();
if (iter_prox.hasNext()) {
CacheClientProxy proxy = (CacheClientProxy)iter_prox.next();
//if (proxy._interestList._keysOfInterest.get("/"+REGION_NAME) != null) {
if(proxy.isPrimary()){
Iterator iter = cache.getBridgeServers().iterator();
if (iter.hasNext()) {
BridgeServer server = (BridgeServer)iter.next();
cache.getLogger().fine("stopping server " + server);
server.stop();
}
}
}
}
catch (Exception ex) {
throw new RuntimeException("Exception while setting stopServer ", ex);
}
}
public static void createEntriesK1andK2()
{
try {
Region r1 = cache.getRegion("/"+ REGION_NAME);
assertNotNull(r1);
if (!r1.containsKey(k1)) {
r1.create(k1, client_k1);
}
if (!r1.containsKey(k2)) {
r1.create(k2, client_k2);
}
if (r1.getAttributes().getPartitionAttributes() == null) {
assertEquals(r1.getEntry(k1).getValue(), client_k1);
assertEquals(r1.getEntry(k2).getValue(), client_k2);
}
}
catch (Exception ex) {
throw new RuntimeException("failed while createEntries()", ex);
}
}
public static void createClientCache(String host, Integer port1, Integer port2) throws Exception
{
CacheServerTestUtil.disableShufflingOfEndpoints();
Properties props = new Properties();
props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
props.setProperty(DistributionConfig.LOCATORS_NAME, "");
new InterestListEndpointDUnitTest("temp").createCache(props);
Pool p;
try {
p = PoolManager.createFactory()
.addServer(host, port1.intValue())
.addServer(host, port2.intValue())
.setSubscriptionEnabled(true)
.setSubscriptionRedundancy(-1)
.setMinConnections(6)
.setSocketBufferSize(32768)
.setReadTimeout(2000)
// .setRetryInterval(1000)
// .setRetryAttempts(5)
.create("InterestListEndpointDUnitTestPool");
} finally {
CacheServerTestUtil.enableShufflingOfEndpoints();
}
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setPoolName(p.getName());
RegionAttributes attrs = factory.create();
cache.createRegion(REGION_NAME, attrs);
}
protected int getMaxThreads() {
return 0;
}
private int initServerCache(VM server) {
Object[] args = new Object[] {new Integer(getMaxThreads())};
return ((Integer)server.invoke(InterestListEndpointDUnitTest.class,
"createServerCache",
args)).intValue();
}
public static Integer createServerCache(Integer maxThreads) throws Exception
{
new InterestListEndpointDUnitTest("temp").createCache(new Properties());
RegionAttributes attrs = impl.createServerCacheAttributes();
cache.createRegion(REGION_NAME, attrs);
BridgeServer server1 = cache.addBridgeServer();
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET) ;
server1.setPort(port);
server1.setMaxThreads(maxThreads.intValue());
server1.setNotifyBySubscription(true);
server1.start();
return new Integer(server1.getPort());
}
protected RegionAttributes createServerCacheAttributes()
{
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
return factory.create();
}
public static void put()
{
try {
Region r = cache.getRegion("/"+REGION_NAME);
assertNotNull(r);
r.put(k1, server_k1);
r.put(k2, server_k2);
}
catch (Exception ex) {
throw new RuntimeException("failed while region.put()", ex);
}
}
public static void verifyIfNotInterestListEndpointAndThenPut()
{
try {
Cache c = CacheFactory.getAnyInstance();
assertEquals("More than one BridgeServer", 1, c.getBridgeServers().size());
BridgeServerImpl bs = (BridgeServerImpl) c.getBridgeServers().iterator().next();
assertNotNull(bs);
assertNotNull(bs.getAcceptor());
assertNotNull(bs.getAcceptor().getCacheClientNotifier());
Iterator iter = bs.getAcceptor().getCacheClientNotifier().getClientProxies().iterator();
//only one server thats why if and not while
if (iter.hasNext()) {
CacheClientProxy proxy = (CacheClientProxy)iter.next();
//if (proxy._interestList._keysOfInterest.get("/"+ REGION_NAME) == null) {
if(! proxy.isPrimary()){
Region r = cache.getRegion("/"+REGION_NAME);
r.put(k1, server_k1);
r.put(k2, server_k2);
}
}
}
catch (Exception ex) {
fail("failed while verifyIfNotInterestListEndpointAndThenPut()", ex);
}
}
public static void registerKey1()
{
try {
Region r = cache.getRegion("/"+REGION_NAME);
assertNotNull(r);
r.registerInterest(k1, InterestResultPolicy.KEYS);
}
catch (Exception ex) {
fail("failed while region.registerInterest()", ex);
}
}
public static void verifyPut()
{
try {
final Region r = cache.getRegion("/"+ REGION_NAME);
assertNotNull(r);
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
Region.Entry e1 = r.getEntry(k1);
if (e1 == null) return false;
Region.Entry e2 = r.getEntry(k2);
if (e2 == null) return false;
Object v1 = e1.getValue();
if (!server_k1.equals(v1)) return false;
Object v2 = e2.getValue();
if (!client_k2.equals(v2)) return false;
// our state is ready for the assertions
return true;
}
public String description() {
Region.Entry e1 = r.getEntry(k1);
if (e1 == null) return "Entry for " + k1 + " is null";
Region.Entry e2 = r.getEntry(k2);
if (e2 == null) return "Entry for " + k2 + " is null";
Object v1 = e1.getValue();
if (!server_k1.equals(v1)) {
return "v1 supposed to be " + server_k1 + " but is " + v1;
}
Object v2 = e2.getValue();
if (!client_k2.equals(v2)) {
return "v2 supposed to be " + client_k2 + " but is " + v2;
}
return "Test missed a success";
}
};
DistributedTestCase.waitForCriterion(ev, 20 * 1000, 200, true);
//yes update
assertEquals(server_k1, r.getEntry(k1).getValue());
//no update
assertEquals(client_k2, r.getEntry(k2).getValue());
}
catch (Exception ex) {
throw new RuntimeException("failed while region.verifyPut()", ex);
}
}
public static void closeCache()
{
if (cache != null && !cache.isClosed()) {
cache.close();
cache.getDistributedSystem().disconnect();
}
}
public void tearDown2() throws Exception
{
// Close client cache first, then server caches
client1.invoke(impl.getClass(), "closeCache");
server2.invoke(impl.getClass(), "closeCache");
server1.invoke(impl.getClass(), "closeCache");
CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag();
cache = null;
invokeInEveryVM(new SerializableRunnable() { public void run() { cache = null; } });
}
}