blob: cce74ffe159b32b67fd1fe29b797c9011e72b7da [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.tier.sockets;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.InterestResultPolicy;
import com.gemstone.gemfire.cache.MirrorType;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.Connection;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.client.internal.RegisterInterestTracker;
import com.gemstone.gemfire.cache.client.internal.ServerRegionProxy;
import com.gemstone.gemfire.cache.util.BridgeServer;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.ServerLocation;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.BridgeObserverAdapter;
import com.gemstone.gemfire.internal.cache.BridgeObserverHolder;
import com.gemstone.gemfire.internal.cache.BridgeServerImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.tier.InterestType;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.VM;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
/**
* Tests Interest Registration Functionality
*
*/
public class HAInterestBaseTest extends DistributedTestCase
{
private static int MAX_WAIT = 60000;
protected static Cache cache = null;
protected static PoolImpl pool = null ;
protected static Connection conn = null ;
protected static int PORT1 ;
protected static int PORT2 ;
protected static int PORT3 ;
protected static final String k1 = "k1";
protected static final String k2 = "k2";
protected static final String client_k1 = "client-k1";
protected static final String client_k2 = "client-k2";
protected static final String server_k1 = "server-k1";
protected static final String server_k2 = "server-k2";
protected static final String server_k1_updated = "server_k1_updated";
// To verify the beforeInterestRegistration is called or not
protected static boolean isBeforeRegistrationCallbackCalled = false;
// To verify the beforeInterestRecoeryCallbackCalled is called or not
protected static boolean isBeforeInterestRecoveryCallbackCalled = false;
//To verify the afterInterestRegistration is called or not
protected static boolean isAfterRegistrationCallbackCalled = false;
protected static final String REGION_NAME = "HAInterestBaseTest_region";
protected static Host host = null ;
protected static VM server1 = null;
protected static VM server2 = null;
protected static VM server3 = null;
protected volatile static boolean exceptionOccured = false;
/** constructor */
public HAInterestBaseTest(String name) {
super(name);
}
public void setUp() throws Exception
{
super.setUp();
host = Host.getHost(0);
server1 = host.getVM(0);
server2 = host.getVM(1);
server3 = host.getVM(2);
CacheServerTestUtil.disableShufflingOfEndpoints();
// start servers first
PORT1 = ((Integer) server1.invoke(HAInterestBaseTest.class, "createServerCache")).intValue();
PORT2 = ((Integer) server2.invoke(HAInterestBaseTest.class, "createServerCache")).intValue();
PORT3 = ((Integer) server3.invoke(HAInterestBaseTest.class, "createServerCache")).intValue();
exceptionOccured = false;
}
/**
* Return the current primary waiting for a primary to exist.
* @since 5.7
*/
public static VM getPrimaryVM() {
return getPrimaryVM(null);
}
/**
* Return the current primary waiting for a primary to exist and for it
* not to be the oldPrimary (if oldPrimary is NOT null).
* @since 5.7
*/
public static VM getPrimaryVM(final VM oldPrimary) {
WaitCriterion wc = new WaitCriterion() {
String excuse;
public boolean done() {
int primaryPort = pool.getPrimaryPort();
if (primaryPort == -1) {
return false;
}
// we have a primary
VM currentPrimary = getServerVM(primaryPort);
if (currentPrimary != oldPrimary) {
return true;
}
return false;
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, MAX_WAIT, 1000, true);
int primaryPort = pool.getPrimaryPort();
assertTrue(primaryPort != -1);
VM currentPrimary = getServerVM(primaryPort);
assertTrue(currentPrimary != oldPrimary);
return currentPrimary;
}
public static VM getBackupVM() {
return getBackupVM(null);
}
public static VM getBackupVM(VM stoppedBackup) {
VM currentPrimary = getPrimaryVM(null);
if (currentPrimary != server2 && server2 != stoppedBackup) {
return server2;
} else if (currentPrimary != server3 && server3 != stoppedBackup) {
return server3;
} else if (currentPrimary != server1 && server1 != stoppedBackup) {
return server1;
} else {
fail("expected currentPrimary " + currentPrimary + " to be "
+ server1 + ", or "
+ server2 + ", or "
+ server3);
return null;
}
}
/**
* Given a server vm (server1, server2, or server3) return its port.
* @since 5.7
*/
public static int getServerPort(VM vm) {
if (vm == server1) {
return PORT1;
} else if (vm == server2) {
return PORT2;
} else if (vm == server3) {
return PORT3;
} else {
fail("expected vm " + vm + " to be "
+ server1
+ ", or "
+ server2
+ ", or "
+ server3);
return -1;
}
}
/**
* Given a server port (PORT1, PORT2, or PORT3) return its vm.
* @since 5.7
*/
public static VM getServerVM(int port) {
if (port == PORT1) {
return server1;
} else if (port == PORT2) {
return server2;
} else if (port == PORT3) {
return server3;
} else {
fail("expected port " + port + " to be "
+ PORT1
+ ", or "
+ PORT2
+ ", or "
+ PORT3);
return null;
}
}
public static void verifyRefreshedEntriesFromServer()
{
final Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
assertNotNull(r1);
WaitCriterion wc = new WaitCriterion() {
String excuse;
public boolean done() {
Region.Entry re = r1.getEntry(k1);
if (re == null) return false;
Object val = re.getValue();
return client_k1.equals(val);
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, MAX_WAIT, 1000, true);
wc = new WaitCriterion() {
String excuse;
public boolean done() {
Region.Entry re = r1.getEntry(k2);
if (re == null) return false;
Object val = re.getValue();
return client_k2.equals(val);
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, MAX_WAIT, 1000, true);
// assertEquals(client_k1, r1.getEntry(k1).getValue());
// assertEquals(client_k2 ,r1.getEntry(k2).getValue());
}
public static void verifyDeadAndLiveServers(final int expectedDeadServers,
final int expectedLiveServers)
{
WaitCriterion wc = new WaitCriterion() {
String description;
public boolean done() {
return pool.getConnectedServerCount() == expectedLiveServers;
}
public String description() {
return description;
}
};
DistributedTestCase.waitForCriterion(wc, MAX_WAIT, 1000, true);
// while (proxy.getDeadServers().size() != expectedDeadServers) { // wait until condition is met
// assertTrue("Waited over " + maxWaitTime + "for dead servers to become : " + expectedDeadServers + " This issue can occur on Solaris as DSM thread get stuck in connectForServer() call, and hence not recovering any newly started server This may be beacuase of tcp_ip_abort_cinterval kernal level property on solaris which has 3 minutes as a default value",
// (System.currentTimeMillis() - start) < maxWaitTime);
// try {
// Thread.yield();
// synchronized(delayLock) {delayLock.wait(2000);}
// }
// catch (InterruptedException ie) {
// fail("Interrupted while waiting ", ie);
// }
// }
// start = System.currentTimeMillis();
}
public static void putK1andK2()
{
Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
assertNotNull(r1);
try {
r1.put(k1, server_k1);
r1.put(k2, server_k2);
}
catch (Exception e) {
fail("Test failed due to Exception in putK1andK2 ::" + e);
}
}
public static void setBridgeObserverForBeforeInterestRecoveryFailure()
{
PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = true;
BridgeObserverHolder.setInstance(new BridgeObserverAdapter() {
public void beforeInterestRecovery()
{
synchronized (HAInterestBaseTest.class) {
Thread t = new Thread (){
public void run(){
getBackupVM().invoke(HAInterestBaseTest.class, "startServer");
getPrimaryVM().invoke(HAInterestBaseTest.class, "stopServer");
}
};
t.start();
try {
DistributedTestCase.join(t, 30 * 1000, getLogWriter());
}catch(Exception ignore) {
exceptionOccured= true;
}
HAInterestBaseTest.isBeforeInterestRecoveryCallbackCalled = true;
HAInterestBaseTest.class.notify();
PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false;
}
}
});
}
public static void setBridgeObserverForBeforeInterestRecovery()
{
PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = true;
BridgeObserverHolder.setInstance(new BridgeObserverAdapter() {
public void beforeInterestRecovery()
{
synchronized (HAInterestBaseTest.class) {
Thread t = new Thread (){
public void run(){
Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
assertNotNull(r1);
try {
r1.put(k1, server_k1_updated);
} catch (Exception e) {
e.printStackTrace();
fail("Test Failed due to ..."+e);
}
}
};
t.start();
HAInterestBaseTest.isBeforeInterestRecoveryCallbackCalled = true;
HAInterestBaseTest.class.notify();
PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false;
}
}
});
}
public static void waitForBeforeInterestRecoveryCallBack()
{
assertNotNull(cache);
synchronized (HAInterestBaseTest.class) {
while (!isBeforeInterestRecoveryCallbackCalled) {
try {
HAInterestBaseTest.class.wait();
}
catch (InterruptedException e) {
fail("Test failed due to InterruptedException in waitForBeforeInterstRecovery()");
}
}
}
}
public static void setBridgeObserverForBeforeRegistration(final VM vm)
{
PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = true;
BridgeObserverHolder.setInstance(new BridgeObserverAdapter() {
public void beforeInterestRegistration()
{
synchronized (HAInterestBaseTest.class) {
vm.invoke(HAInterestBaseTest.class, "startServer");
HAInterestBaseTest.isBeforeRegistrationCallbackCalled = true;
HAInterestBaseTest.class.notify();
PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false;
}
}
});
}
public static void waitForBeforeRegistrationCallback()
{
assertNotNull(cache);
synchronized (HAInterestBaseTest.class) {
while (!isBeforeRegistrationCallbackCalled) {
try {
HAInterestBaseTest.class.wait();
}
catch (InterruptedException e) {
fail("Test failed due to InterruptedException in waitForBeforeRegistrationCallback()");
}
}
}
}
public static void setBridgeObserverForAfterRegistration(final VM vm)
{
PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = true;
BridgeObserverHolder.setInstance(new BridgeObserverAdapter() {
public void afterInterestRegistration()
{
synchronized (HAInterestBaseTest.class) {
vm.invoke(HAInterestBaseTest.class, "startServer");
HAInterestBaseTest.isAfterRegistrationCallbackCalled = true;
HAInterestBaseTest.class.notify();
PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false;
}
}
});
}
public static void waitForAfterRegistrationCallback()
{
assertNotNull(cache);
if (!isAfterRegistrationCallbackCalled) {
synchronized (HAInterestBaseTest.class) {
while (!isAfterRegistrationCallbackCalled) {
try {
HAInterestBaseTest.class.wait();
}
catch (InterruptedException e) {
fail("Test failed due to InterruptedException in waitForAfterRegistrationCallback()");
}
}
}
}
}
public static void unSetBridgeObserverForRegistrationCallback()
{
synchronized (HAInterestBaseTest.class) {
PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false;
PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false ;
HAInterestBaseTest.isBeforeRegistrationCallbackCalled = false;
HAInterestBaseTest.isAfterRegistrationCallbackCalled = false ;
}
}
public static void verifyDispatcherIsAlive()
{
try {
assertEquals("More than one BridgeServer", 1, cache.getBridgeServers()
.size());
WaitCriterion wc = new WaitCriterion() {
String excuse;
public boolean done() {
return cache.getBridgeServers().size() == 1;
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, MAX_WAIT, 1000, true);
BridgeServerImpl bs = (BridgeServerImpl)cache.getBridgeServers()
.iterator().next();
assertNotNull(bs);
assertNotNull(bs.getAcceptor());
assertNotNull(bs.getAcceptor().getCacheClientNotifier());
final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
wc = new WaitCriterion() {
String excuse;
public boolean done() {
return ccn.getClientProxies().size() > 0;
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, MAX_WAIT, 1000, true);
wc = new WaitCriterion() {
String excuse;
Iterator iter_prox;
CacheClientProxy proxy;
public boolean done() {
iter_prox = ccn.getClientProxies().iterator();
if(iter_prox.hasNext()) {
proxy = (CacheClientProxy)iter_prox.next();
return proxy._messageDispatcher.isAlive();
}
else {
return false;
}
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, MAX_WAIT, 1000, true);
}
catch (Exception ex) {
fail("while setting verifyDispatcherIsAlive " + ex);
}
}
public static void verifyDispatcherIsNotAlive()
{
try {
WaitCriterion wc = new WaitCriterion() {
String excuse;
public boolean done() {
return cache.getBridgeServers().size() == 1;
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, MAX_WAIT, 1000, true);
BridgeServerImpl bs = (BridgeServerImpl)cache.getBridgeServers()
.iterator().next();
assertNotNull(bs);
assertNotNull(bs.getAcceptor());
assertNotNull(bs.getAcceptor().getCacheClientNotifier());
final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
wc = new WaitCriterion() {
String excuse;
public boolean done() {
return ccn.getClientProxies().size() > 0;
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, MAX_WAIT, 1000, true);
Iterator iter_prox = ccn.getClientProxies().iterator();
if (iter_prox.hasNext()) {
CacheClientProxy proxy = (CacheClientProxy)iter_prox.next();
assertFalse("Dispatcher on secondary should not be alive",
proxy._messageDispatcher.isAlive());
}
}
catch (Exception ex) {
fail("while setting verifyDispatcherIsNotAlive " + ex);
}
}
public static void createEntriesK1andK2OnServer()
{
try {
Region r1 = cache.getRegion(Region.SEPARATOR+ REGION_NAME);
assertNotNull(r1);
if (!r1.containsKey(k1)) {
r1.create(k1, server_k1);
}
if (!r1.containsKey(k2)) {
r1.create(k2, server_k2);
}
assertEquals(r1.getEntry(k1).getValue(), server_k1);
assertEquals(r1.getEntry(k2).getValue(), server_k2);
}
catch (Exception ex) {
fail("failed while createEntries()", ex);
}
}
public static void createEntriesK1andK2()
{
try {
Region r1 = cache.getRegion(Region.SEPARATOR+ REGION_NAME);
assertNotNull(r1);
if (!r1.containsKey(k1)) {
r1.create(k1, client_k1);
}
if (!r1.containsKey(k2)) {
r1.create(k2, client_k2);
}
assertEquals(r1.getEntry(k1).getValue(), client_k1);
assertEquals(r1.getEntry(k2).getValue(), client_k2);
}
catch (Exception ex) {
fail("failed while createEntries()", ex);
}
}
public static void createServerEntriesK1andK2()
{
try {
Region r1 = cache.getRegion(Region.SEPARATOR+ REGION_NAME);
assertNotNull(r1);
if (!r1.containsKey(k1)) {
r1.create(k1, server_k1);
}
if (!r1.containsKey(k2)) {
r1.create(k2, server_k2);
}
assertEquals(r1.getEntry(k1).getValue(), server_k1);
assertEquals(r1.getEntry(k2).getValue(), server_k2);
}
catch (Exception ex) {
fail("failed while createEntries()", ex);
}
}
public static void registerK1AndK2()
{
try {
Region r = cache.getRegion(Region.SEPARATOR+REGION_NAME);
assertNotNull(r);
List list = new ArrayList();
list.add(k1);
list.add(k2);
r.registerInterest(list, InterestResultPolicy.KEYS_VALUES);
}
catch (Exception ex) {
ex.printStackTrace();
fail("failed while region.registerK1AndK2()", ex);
}
}
public static void reRegisterK1AndK2()
{
try {
Region r = cache.getRegion(Region.SEPARATOR+REGION_NAME);
assertNotNull(r);
List list = new ArrayList();
list.add(k1);
list.add(k2);
r.registerInterest(list);
}
catch (Exception ex) {
fail("failed while region.reRegisterK1AndK2()", ex);
}
}
public static void startServer()
{
try {
Cache c = CacheFactory.getAnyInstance();
assertEquals("More than one BridgeServer", 1, c.getBridgeServers().size());
BridgeServerImpl bs = (BridgeServerImpl) c.getBridgeServers().iterator().next();
assertNotNull(bs);
bs.start();
}
catch (Exception ex) {
fail("while startServer() " + ex);
}
}
public static void startServerAndPause()
{
try {
Cache c = CacheFactory.getAnyInstance();
assertEquals("More than one BridgeServer", 1, c.getBridgeServers().size());
BridgeServerImpl bs = (BridgeServerImpl) c.getBridgeServers().iterator().next();
assertNotNull(bs);
bs.start();
Thread.sleep(10000);
}
catch (Exception ex) {
fail("while startServer() " + ex);
}
}
public static void stopServer()
{
try {
assertEquals("More than one BridgeServer", 1, cache.getBridgeServers().size());
BridgeServerImpl bs = (BridgeServerImpl) cache.getBridgeServers().iterator().next();
assertNotNull(bs);
bs.stop();
}
catch (Exception ex) {
fail("while setting stopServer " + ex);
}
}
public static void stopPrimaryAndRegisterK1AndK2AndVerifyResponse()
{
try {
LocalRegion r = (LocalRegion)cache.getRegion(Region.SEPARATOR+REGION_NAME);
assertNotNull(r);
ServerRegionProxy srp = new ServerRegionProxy(r);
WaitCriterion wc = new WaitCriterion() {
public boolean done() {
return pool.getConnectedServerCount() == 3;
}
public String description() {
return "connected server count never became 3";
}
};
DistributedTestCase.waitForCriterion(wc, 30 * 1000, 1000, true);
// close primaryEP
getPrimaryVM().invoke(HAInterestBaseTest.class, "stopServer");
List list = new ArrayList();
list.add(k1);
list.add(k2);
List serverKeys = srp.registerInterest(list, InterestType.KEY,
InterestResultPolicy.KEYS, false,
r.getAttributes().getDataPolicy().ordinal);
assertNotNull(serverKeys);
List resultKeys =(List) serverKeys.get(0) ;
assertEquals(2,resultKeys.size());
assertTrue(resultKeys.contains(k1));
assertTrue(resultKeys.contains(k2));
}
catch (Exception ex) {
fail("failed while region.stopPrimaryAndRegisterK1AndK2AndVerifyResponse()", ex);
}
}
public static void stopPrimaryAndUnregisterRegisterK1()
{
try {
LocalRegion r = (LocalRegion)cache.getRegion(Region.SEPARATOR+REGION_NAME);
assertNotNull(r);
ServerRegionProxy srp = new ServerRegionProxy(r);
WaitCriterion wc = new WaitCriterion() {
public boolean done() {
return pool.getConnectedServerCount() == 3;
}
public String description() {
return "connected server count never became 3";
}
};
DistributedTestCase.waitForCriterion(wc, 30 * 1000, 1000, true);
// close primaryEP
getPrimaryVM().invoke(HAInterestBaseTest.class, "stopServer");
List list = new ArrayList();
list.add(k1);
srp.unregisterInterest(list, InterestType.KEY,false,false);
}
catch (Exception ex) {
fail("failed while region.stopPrimaryAndUnregisterRegisterK1()", ex);
}
}
public static void stopBothPrimaryAndSecondaryAndRegisterK1AndK2AndVerifyResponse()
{
try {
LocalRegion r = (LocalRegion)cache.getRegion(Region.SEPARATOR+REGION_NAME);
assertNotNull(r);
ServerRegionProxy srp = new ServerRegionProxy(r);
WaitCriterion wc = new WaitCriterion() {
public boolean done() {
return pool.getConnectedServerCount() == 3;
}
public String description() {
return "connected server count never became 3";
}
};
DistributedTestCase.waitForCriterion(wc, 30 * 1000, 1000, true);
// close primaryEP
VM backup = getBackupVM();
getPrimaryVM().invoke(HAInterestBaseTest.class, "stopServer");
//close secondary
backup.invoke(HAInterestBaseTest.class, "stopServer");
List list = new ArrayList();
list.add(k1);
list.add(k2);
List serverKeys = srp.registerInterest(list, InterestType.KEY,
InterestResultPolicy.KEYS, false,
r.getAttributes().getDataPolicy().ordinal);
assertNotNull(serverKeys);
List resultKeys =(List) serverKeys.get(0) ;
assertEquals(2,resultKeys.size());
assertTrue(resultKeys.contains(k1));
assertTrue(resultKeys.contains(k2));
}
catch (Exception ex) {
fail("failed while region.stopBothPrimaryAndSecondaryAndRegisterK1AndK2AndVerifyResponse()", ex);
}
}
/**
* returns the secondary that was stopped
*/
public static VM stopSecondaryAndRegisterK1AndK2AndVerifyResponse()
{
try {
LocalRegion r = (LocalRegion)cache.getRegion(Region.SEPARATOR+REGION_NAME);
assertNotNull(r);
ServerRegionProxy srp = new ServerRegionProxy(r);
WaitCriterion wc = new WaitCriterion() {
public boolean done() {
return pool.getConnectedServerCount() == 3;
}
public String description() {
return "Never got three connected servers";
}
};
DistributedTestCase.waitForCriterion(wc, 90 * 1000, 1000, true);
// close secondary EP
VM result = getBackupVM();
result.invoke(HAInterestBaseTest.class, "stopServer");
List list = new ArrayList();
list.add(k1);
list.add(k2);
List serverKeys = srp.registerInterest(list, InterestType.KEY,
InterestResultPolicy.KEYS,
false, r.getAttributes().getDataPolicy().ordinal);
assertNotNull(serverKeys);
List resultKeys =(List) serverKeys.get(0) ;
assertEquals(2,resultKeys.size());
assertTrue(resultKeys.contains(k1));
assertTrue(resultKeys.contains(k2));
return result;
}
catch (Exception ex) {
fail("failed while region.stopSecondaryAndRegisterK1AndK2AndVerifyResponse()", ex);
return null;
}
}
/**
* returns the secondary that was stopped
*/
public static VM stopSecondaryAndUNregisterK1()
{
try {
LocalRegion r = (LocalRegion)cache.getRegion(Region.SEPARATOR+REGION_NAME);
assertNotNull(r);
ServerRegionProxy srp = new ServerRegionProxy(r);
WaitCriterion wc = new WaitCriterion() {
public boolean done() {
return pool.getConnectedServerCount() == 3;
}
public String description() {
return "connected server count never became 3";
}
};
DistributedTestCase.waitForCriterion(wc, 30 * 1000, 1000, true);
// close secondary EP
VM result = getBackupVM();
result.invoke(HAInterestBaseTest.class, "stopServer");
List list = new ArrayList();
list.add(k1);
srp.unregisterInterest(list, InterestType.KEY,false,false);
return result;
}
catch (Exception ex) {
fail("failed while region.stopSecondaryAndUNregisterK1()", ex);
return null;
}
}
public static void registerK1AndK2OnPrimaryAndSecondaryAndVerifyResponse()
{
try {
ServerLocation primary = pool.getPrimary();
ServerLocation secondary = (ServerLocation)pool.getRedundants().get(0);
LocalRegion r = (LocalRegion)cache.getRegion(Region.SEPARATOR+REGION_NAME);
assertNotNull(r);
ServerRegionProxy srp = new ServerRegionProxy(r);
List list = new ArrayList();
list.add(k1);
list.add(k2);
//Primary server
List serverKeys1 = srp.registerInterestOn(primary, list,
InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes()
.getDataPolicy().ordinal);
assertNotNull(serverKeys1);
//expect serverKeys in response from primary
List resultKeys =(List) serverKeys1.get(0) ;
assertEquals(2,resultKeys.size());
assertTrue(resultKeys.contains(k1));
assertTrue(resultKeys.contains(k2));
//Secondary server
List serverKeys2 = srp.registerInterestOn(secondary, list,
InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes()
.getDataPolicy().ordinal);
// if the list is null then it is empty
if (serverKeys2 != null) {
// no serverKeys in response from secondary
assertTrue(serverKeys2.isEmpty());
}
}
catch (Exception ex) {
fail("failed while region.registerK1AndK2OnPrimaryAndSecondaryAndVerifyResponse()", ex);
}
}
public static void verifyInterestRegistration()
{
try {
WaitCriterion wc = new WaitCriterion() {
String excuse;
public boolean done() {
return cache.getBridgeServers().size() == 1;
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, MAX_WAIT, 1000, true);
BridgeServerImpl bs = (BridgeServerImpl)cache.getBridgeServers()
.iterator().next();
assertNotNull(bs);
assertNotNull(bs.getAcceptor());
assertNotNull(bs.getAcceptor().getCacheClientNotifier());
final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
wc = new WaitCriterion() {
String excuse;
public boolean done() {
return ccn.getClientProxies().size() > 0;
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, MAX_WAIT, 1000, true);
Iterator iter_prox = ccn.getClientProxies().iterator();
if (iter_prox.hasNext()) {
final CacheClientProxy ccp =(CacheClientProxy)iter_prox.next();
wc = new WaitCriterion() {
String excuse;
public boolean done() {
Set keysMap = (Set)ccp.cils[RegisterInterestTracker.interestListIndex]
.getProfile(Region.SEPARATOR + REGION_NAME)
.getKeysOfInterestFor(ccp.getProxyID());
return keysMap != null && keysMap.size() == 2;
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, MAX_WAIT, 1000, true);
Set keysMap = (Set)ccp.cils[RegisterInterestTracker.interestListIndex]
.getProfile( Region.SEPARATOR + REGION_NAME)
.getKeysOfInterestFor(ccp.getProxyID());
assertNotNull(keysMap);
assertEquals(2, keysMap.size());
assertTrue(keysMap.contains(k1));
assertTrue(keysMap.contains(k2));
}
}
catch (Exception ex) {
fail("while setting verifyInterestRegistration " + ex);
}
}
public static void verifyInterestUNRegistration()
{
try {
WaitCriterion wc = new WaitCriterion() {
String excuse;
public boolean done() {
return cache.getBridgeServers().size() == 1;
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, MAX_WAIT, 1000, true);
BridgeServerImpl bs = (BridgeServerImpl)cache.getBridgeServers()
.iterator().next();
assertNotNull(bs);
assertNotNull(bs.getAcceptor());
assertNotNull(bs.getAcceptor().getCacheClientNotifier());
final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
wc = new WaitCriterion() {
String excuse;
public boolean done() {
return ccn.getClientProxies().size() > 0;
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, MAX_WAIT, 1000, true);
Iterator iter_prox = ccn.getClientProxies().iterator();
if (iter_prox.hasNext()) {
final CacheClientProxy ccp =(CacheClientProxy)iter_prox.next();
wc = new WaitCriterion() {
String excuse;
public boolean done() {
Set keysMap = (Set)ccp.cils[RegisterInterestTracker.interestListIndex]
.getProfile(Region.SEPARATOR + REGION_NAME)
.getKeysOfInterestFor(ccp.getProxyID());
return keysMap != null;
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, MAX_WAIT, 1000, true);
Set keysMap = (Set)ccp.cils[RegisterInterestTracker.interestListIndex]
.getProfile(Region.SEPARATOR + REGION_NAME)
.getKeysOfInterestFor(ccp.getProxyID());
assertNotNull(keysMap);
assertEquals(1, keysMap.size());
assertFalse(keysMap.contains(k1));
assertTrue(keysMap.contains(k2));
}
}
catch (Exception ex) {
fail("while setting verifyInterestUNRegistration " + ex);
}
}
private void createCache(Properties props) throws Exception
{
DistributedSystem ds = getSystem(props);
assertNotNull(ds);
ds.disconnect();
ds = getSystem(props);
cache = CacheFactory.create(ds);
assertNotNull(cache);
}
public static void createClientPoolCache(String testName,String host) throws Exception
{
Properties props = new Properties();
props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
props.setProperty(DistributionConfig.LOCATORS_NAME, "");
new HAInterestBaseTest("temp").createCache(props);
CacheServerTestUtil.disableShufflingOfEndpoints();
PoolImpl p;
try {
p = (PoolImpl)PoolManager.createFactory()
.addServer(host, PORT1)
.addServer(host, PORT2)
.addServer(host, PORT3)
.setSubscriptionEnabled(true)
.setSubscriptionRedundancy(-1)
.setReadTimeout(1000)
.setPingInterval(1000)
// retryInterval should be more so that only registerInterste thread will initiate failover
// .setRetryInterval(20000)
.create("HAInterestBaseTestPool");
} finally {
CacheServerTestUtil.enableShufflingOfEndpoints();
}
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(true);
factory.setPoolName(p.getName());
cache.createRegion(REGION_NAME, factory.create());
pool = p;
conn = pool.acquireConnection();
assertNotNull(conn);
}
public static void createClientPoolCacheWithSmallRetryInterval(String testName,String host) throws Exception
{
Properties props = new Properties();
props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
props.setProperty(DistributionConfig.LOCATORS_NAME, "");
new HAInterestBaseTest("temp").createCache(props);
CacheServerTestUtil.disableShufflingOfEndpoints();
PoolImpl p;
try {
p = (PoolImpl)PoolManager.createFactory()
.addServer(host, PORT1)
.addServer(host, PORT2)
.setSubscriptionEnabled(true)
.setSubscriptionRedundancy(-1)
.setReadTimeout(1000)
.setSocketBufferSize(32768)
.setMinConnections(6)
.setPingInterval(200)
// .setRetryInterval(200)
// retryAttempts 3
.create("HAInterestBaseTestPool");
} finally {
CacheServerTestUtil.enableShufflingOfEndpoints();
}
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(true);
factory.setPoolName(p.getName());
cache.createRegion(REGION_NAME, factory.create());
pool = p;
conn = pool.acquireConnection();
assertNotNull(conn);
}
public static void createClientPoolCacheConnectionToSingleServer(String testName,String hostName) throws Exception
{
Properties props = new Properties();
props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
props.setProperty(DistributionConfig.LOCATORS_NAME, "");
new HAInterestBaseTest("temp").createCache(props);
PoolImpl p = (PoolImpl)PoolManager.createFactory()
.addServer(hostName, PORT1)
.setSubscriptionEnabled(true)
.setSubscriptionRedundancy(-1)
.setReadTimeout(1000)
// .setRetryInterval(20)
.create("HAInterestBaseTestPool");
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(true);
factory.setPoolName(p.getName());
cache.createRegion(REGION_NAME, factory.create());
pool = p;
conn = pool.acquireConnection();
assertNotNull(conn);
}
public static Integer createServerCache() throws Exception
{
new HAInterestBaseTest("temp").createCache(new Properties());
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEnableBridgeConflation(true);
factory.setMirrorType(MirrorType.KEYS_VALUES);
factory.setConcurrencyChecksEnabled(true);
cache.createRegion(REGION_NAME, factory.create());
BridgeServer server = cache.addBridgeServer();
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET) ;
server.setPort(port);
server.setMaximumTimeBetweenPings(180000);
// ensures updates to be sent instead of invalidations
server.setNotifyBySubscription(true);
server.start();
return new Integer(server.getPort());
}
public static Integer createServerCacheWithLocalRegion() throws Exception
{
new HAInterestBaseTest("temp").createCache(new Properties());
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(true);
RegionAttributes attrs = factory.create();
cache.createRegion(REGION_NAME, attrs);
BridgeServer server = cache.addBridgeServer();
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET) ;
server.setPort(port);
// ensures updates to be sent instead of invalidations
server.setNotifyBySubscription(true);
server.setMaximumTimeBetweenPings(180000);
server.start();
return new Integer(server.getPort());
}
public void tearDown2() throws Exception
{
super.tearDown2();
// close the clients first
closeCache();
// then close the servers
server1.invoke(HAInterestBaseTest.class, "closeCache");
server2.invoke(HAInterestBaseTest.class, "closeCache");
server3.invoke(HAInterestBaseTest.class, "closeCache");
CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag();
}
public static void closeCache()
{
PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false ;
PoolImpl.BEFORE_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG = false ;
PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false ;
PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false ;
HAInterestBaseTest.isAfterRegistrationCallbackCalled = false ;
HAInterestBaseTest.isBeforeInterestRecoveryCallbackCalled = false ;
HAInterestBaseTest.isBeforeRegistrationCallbackCalled = false ;
if (cache != null && !cache.isClosed()) {
cache.close();
cache.getDistributedSystem().disconnect();
}
}
}