blob: db5a5f2362eb5aca87a697993a46622da0bd459d [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
/*
* Created on Feb 3, 2006
*
*/
package com.gemstone.gemfire.internal.cache.tier.sockets;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Map;
import java.util.Properties;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.internal.BridgePoolImpl;
import com.gemstone.gemfire.cache.client.internal.Connection;
import com.gemstone.gemfire.cache.client.internal.PutOp;
import com.gemstone.gemfire.cache.client.internal.QueueStateImpl.SequenceIdAndExpirationObject;
import com.gemstone.gemfire.cache.util.BridgeServer;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
import dunit.DistributedTestCase;
import dunit.DistributedTestCase.WaitCriterion;
/**
* @author asif
*
* Tests the functionality of operations of AbstractConnectionProxy & its
* derived classes.
*/
@Category(IntegrationTest.class)
public class ConnectionProxyJUnitTest
{
// Log text that setUp()/tearDown() wrap in <ExpectedException> add/remove
// markers (redundant-queue variant).
private static final String expectedRedundantErrorMsg = "Could not find any server to create redundant client queue on.";
// Log text that setUp()/tearDown() wrap in <ExpectedException> add/remove
// markers (primary-queue variant).
private static final String expectedPrimaryErrorMsg = "Could not find any server to create primary client queue on.";
// Distributed system connection opened in setUp() and disconnected in tearDown().
DistributedSystem system;
// Cache created on top of 'system' in setUp(); the tests add their bridge servers to it.
Cache cache;
// Client pool under test; tearDown() closes it when a test has assigned one.
BridgePoolImpl proxy = null;
// Tracking entry most recently looked up by a test; polled by verifyAckSend().
SequenceIdAndExpirationObject seo = null;
/**
 * Connects to a distributed system configured with multicast port "0" and an
 * empty locator list, creates the cache on top of it, and logs the
 * "ExpectedException action=add" markers for the two queue-creation error
 * messages (presumably consumed by log-scanning tooling so that these errors
 * are not reported as test failures).
 */
@Before
public void setUp() throws Exception
{
  final Properties config = new Properties();
  config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
  config.setProperty(DistributionConfig.LOCATORS_NAME, "");

  this.system = DistributedSystem.connect(config);
  this.cache = CacheFactory.create(this.system);

  // Both markers share the same opening/closing tags; only the message differs.
  final String openTag = "<ExpectedException action=add>";
  final String closeTag = "</ExpectedException>";
  this.system.getLogWriter().info(openTag + expectedPrimaryErrorMsg + closeTag);
  this.system.getLogWriter().info(openTag + expectedRedundantErrorMsg + closeTag);
}
/**
 * Releases everything a test may have created: closes the cache, logs the
 * "ExpectedException action=remove" markers matching the ones added in
 * {@link #setUp()}, disconnects from the distributed system and finally
 * closes the client pool if a test assigned one.
 * <p>
 * The original cleanup order is kept, but each step is now guarded so that
 * a failure in an earlier step (or a {@code setUp()} that failed half-way and
 * left {@code cache}/{@code system} unset) no longer prevents the remaining
 * resources from being released.
 */
@After
public void tearDown() throws Exception
{
  try {
    if (this.cache != null) {
      this.cache.close();
    }
    if (this.system != null) {
      final String removeExpectedPEM =
        "<ExpectedException action=remove>" + expectedPrimaryErrorMsg + "</ExpectedException>";
      final String removeExpectedREM =
        "<ExpectedException action=remove>" + expectedRedundantErrorMsg + "</ExpectedException>";
      system.getLogWriter().info(removeExpectedPEM);
      system.getLogWriter().info(removeExpectedREM);
    }
  }
  finally {
    try {
      if (this.system != null) {
        this.system.disconnect();
      }
    }
    finally {
      // Always attempted last, exactly as before, even if the steps above threw.
      if (proxy != null) {
        proxy.close();
      }
    }
  }
}
/**
 * This test verifies the behaviour of a client request when the listener on
 * the server blocks forever. This is done in the following steps:<br>
 * 1) create and start a bridge server<br>
 * 2) create the client pool and a region whose CacheListener blocks in
 * afterCreate (it waits on this test instance's monitor)<br>
 * 3) perform a PUT on the client using a Connection acquired from the pool<br>
 * 4) verify that the PUT throws an exception<br>
 * 5) verify that the exception arrives in under 5 seconds (the client is
 * configured with a readTimeout of 2000)<br>
 * <p>
 * The method is not annotated with {@code @Test}, so it is currently not run.
 * The blocked listener is released and the server is stopped in a
 * {@code finally} block so that a failing assertion neither leaves the
 * listener thread waiting nor leaks the server.
 *
 * @author Dinesh
 */
public void DISABLE_testListenerOnServerSitForever()
{
  int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
  Region testRegion = null;
  BridgeServer server = null;
  try {
    try {
      server = this.cache.addBridgeServer();
      server.setMaximumTimeBetweenPings(10000);
      server.setPort(port3);
      server.start();
    }
    catch (Exception e) {
      e.printStackTrace();
      fail("Failed to create server");
    }
    try {
      Properties props = new Properties();
      props.setProperty("retryAttempts", "1");
      props.setProperty("endpoints", "ep1=localhost:"+port3);
      props.setProperty("establishCallbackConnection", "false");
      props.setProperty("LBPolicy", "Sticky");
      props.setProperty("readTimeout", "2000");
      props.setProperty("socketBufferSize", "32768");
      props.setProperty("retryInterval", "10000");
      props.setProperty("connectionsPerServer", "2");
      props.setProperty("redundancyLevel", "-1");
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.DISTRIBUTED_ACK);
      factory.setCacheListener(new CacheListenerAdapter() {
        public void afterCreate(EntryEvent event)
        {
          // Block until the test method notifies this monitor.
          synchronized (ConnectionProxyJUnitTest.this) {
            try {
              ConnectionProxyJUnitTest.this.wait();
            }
            catch (InterruptedException e) {
              fail("interrupted");
            }
          }
        }
      });
      proxy = BridgePoolImpl.create(props, true/*useByBridgeWriter*/);
      RegionAttributes attrs = factory.create();
      testRegion = cache.createRegion("testregion", attrs);
    }
    catch (Exception ex) {
      ex.printStackTrace();
      fail("Failed to initialize client");
    }
    Connection conn = (proxy).acquireConnection();
    long t1 = 0;
    try {
      t1 = System.currentTimeMillis();
      EntryEventImpl event = new EntryEventImpl((Object)null);
      try {
        event.setEventId(new EventID(new byte[] { 1 },1,1));
        PutOp.execute(conn, proxy, testRegion.getFullPath(), "key1", "val1", event, null, false);
      } finally {
        event.release();
      }
      // fail() throws an AssertionError, which the catch (Exception) below does not intercept.
      fail("Test failed as exception was expected");
    }
    catch (Exception e) {
      long t2 = System.currentTimeMillis();
      long net = (t2 - t1);
      assertTrue(net / 1000 < 5);
    }
  }
  finally {
    try {
      // Release the listener even when an assertion above failed.
      synchronized (ConnectionProxyJUnitTest.this) {
        ConnectionProxyJUnitTest.this.notify();
      }
    }
    finally {
      if (server != null) {
        server.stop();
      }
    }
  }
}
/**
 * Tests that the DeadServerMonitor, when identifying an Endpoint as alive,
 * does not create a persistent Ping connection (i.e. sends a CLOSE protocol)
 * if the number of connections is zero.
 * <p>
 * The pool is created while no server is listening on the chosen port; two
 * connection attempts are made (any exception is only printed), the connected
 * server count is asserted to be 0, and then the server is started and the
 * test waits for the connected server count to become 1.
 *
 * @author Asif
 */
@Test
public void testDeadServerMonitorPingNature1() {
  int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
  try {
    Properties props = new Properties();
    props.setProperty("retryAttempts", "1");
    props.setProperty("endpoints", "ep1=localhost:"+port3);
    props.setProperty("establishCallbackConnection", "false");
    props.setProperty("LBPolicy", "Sticky");
    props.setProperty("readTimeout", "2000");
    props.setProperty("socketBufferSize", "32768");
    props.setProperty("retryInterval", "500");
    props.setProperty("connectionsPerServer", "1");
    proxy = BridgePoolImpl.create(props, true/*useByBridgeWriter*/);
  }
  catch (Exception ex) {
    ex.printStackTrace();
    fail("Failed to initialize client");
  }
  // The server is not running yet, so these attempts are allowed to throw.
  try {
    (proxy).acquireConnection();
  }
  catch (Exception ok) {
    ok.printStackTrace();
  }
  try {
    (proxy).acquireConnection();
  }
  catch (Exception ok) {
    ok.printStackTrace();
  }
  assertEquals(0, proxy.getConnectedServerCount());
  //start the server
  BridgeServer server = null;
  try {
    try {
      server = this.cache.addBridgeServer();
      server.setMaximumTimeBetweenPings(15000);
      server.setPort(port3);
      server.start();
    }
    catch (Exception e) {
      e.printStackTrace();
      fail("Failed to create server");
    }
    WaitCriterion ev = new WaitCriterion() {
      public boolean done() {
        return proxy.getConnectedServerCount() == 1;
      }
      public String description() {
        // Previously null; a real description makes a failed wait diagnosable.
        return "connected server count never became 1, last seen "
          + proxy.getConnectedServerCount();
      }
    };
    DistributedTestCase.waitForCriterion(ev, 90 * 1000, 200, true);
  }
  finally {
    if (server != null) {
      server.stop();
    }
  }
}
/**
 * Tests that the DeadServerMonitor, when identifying an Endpoint as alive,
 * creates a persistent Ping connection (i.e. sends a PING protocol) if the
 * number of connections is more than zero.
 * <p>
 * The pool is created while no server is listening on the chosen port, the
 * connected server count is asserted to be 0, and then the server is started
 * and the test waits for the connected server count to become 1.
 * <p>
 * NOTE(review): unlike the javadoc's "more than zero" wording suggests, this
 * method never calls acquireConnection() itself - confirm whether that is
 * intended.
 *
 * @author Asif
 */
@Test
public void testDeadServerMonitorPingNature2() {
  int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
  try {
    Properties props = new Properties();
    props.setProperty("retryAttempts", "1");
    props.setProperty("endpoints", "ep1=localhost:"+port3);
    props.setProperty("establishCallbackConnection", "false");
    props.setProperty("LBPolicy", "Sticky");
    props.setProperty("readTimeout", "2000");
    props.setProperty("socketBufferSize", "32768");
    props.setProperty("retryInterval", "500");
    props.setProperty("connectionsPerServer", "1");
    proxy = BridgePoolImpl.create(props, true/*useByBridgeWriter*/);
  }
  catch (Exception ex) {
    ex.printStackTrace();
    fail("Failed to initialize client");
  }
  //let LiveServerMonitor detect it as alive as the numConnection is more than zero
  assertEquals(0, proxy.getConnectedServerCount());
  //start the server
  BridgeServer server = null;
  try {
    try {
      server = this.cache.addBridgeServer();
      server.setMaximumTimeBetweenPings(15000);
      server.setPort(port3);
      server.start();
    }
    catch (Exception e) {
      e.printStackTrace();
      fail("Failed to create server");
    }
    WaitCriterion ev = new WaitCriterion() {
      public boolean done() {
        return proxy.getConnectedServerCount() == 1;
      }
      public String description() {
        // Previously null; a real description makes a failed wait diagnosable.
        return "connected server count never became 1, last seen "
          + proxy.getConnectedServerCount();
      }
    };
    DistributedTestCase.waitForCriterion(ev, 90 * 1000, 200, true);
  }
  finally {
    if (server != null) {
      server.stop();
    }
  }
}
/**
 * Verifies that a freshly created client pool exposes a non-null
 * threadId-to-sequenceId map.
 * <p>
 * The cleanup pauses for 500 ms before stopping the server. If that pause is
 * interrupted the server is still stopped first; only afterwards is the
 * interrupt status restored and the test failed, so an interrupt can no
 * longer leak a running server.
 */
@Test
public void testThreadIdToSequenceIdMapCreation()
{
  int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
  BridgeServer server = null;
  try {
    try {
      server = this.cache.addBridgeServer();
      server.setMaximumTimeBetweenPings(10000);
      server.setPort(port3);
      server.start();
    }
    catch (Exception e) {
      e.printStackTrace();
      fail("Failed to create server");
    }
    try {
      Properties props = new Properties();
      props.setProperty("endpoints", "ep1=localhost:" + port3);
      props.setProperty("establishCallbackConnection", "true");
      props.setProperty("redundancyLevel", "-1");
      proxy = BridgePoolImpl.create(props, true/*useByBridgeWriter*/);
      if (proxy.getThreadIdToSequenceIdMap() == null) {
        fail(" ThreadIdToSequenceIdMap is null. ");
      }
    }
    catch (Exception ex) {
      ex.printStackTrace();
      fail("Failed to initialize client");
    }
  }
  finally {
    if (server != null) {
      boolean interrupted = false;
      try {
        Thread.sleep(500);
      }
      catch (InterruptedException ie) {
        interrupted = true;
      }
      // Stop the server before reporting the interrupt so it is never left running.
      server.stop();
      if (interrupted) {
        Thread.currentThread().interrupt();
        fail("interrupted");
      }
    }
  }
}
/**
 * Registers one event id with the pool, waits (via verifyExpiry) until the
 * threadId-to-sequenceId map is empty again, and then checks that the very
 * same event id is no longer reported as a duplicate.
 */
@Test
public void testThreadIdToSequenceIdMapExpiryPositive()
{
  final int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
  BridgeServer bridgeServer = null;
  try {
    try {
      bridgeServer = this.cache.addBridgeServer();
      bridgeServer.setMaximumTimeBetweenPings(10000);
      bridgeServer.setPort(serverPort);
      bridgeServer.start();
    }
    catch (Exception startFailure) {
      startFailure.printStackTrace();
      fail("Failed to create server");
    }

    try {
      final Properties poolConfig = new Properties();
      poolConfig.setProperty("endpoints", "ep1=localhost:" + serverPort);
      poolConfig.setProperty("redundancyLevel", "-1");
      poolConfig.setProperty("clientAckInterval", "2000");
      poolConfig.setProperty("messageTrackingTimeout", "4000");
      poolConfig.setProperty("establishCallbackConnection", "true");
      proxy = BridgePoolImpl.create(poolConfig, true/*useByBridgeWriter*/);

      final EventID trackedEvent = new EventID(new byte[0], 1, 1);

      // First sighting of the id.
      assertFalse(" eid should not be duplicate as it is a new entry",
          proxy.verifyIfDuplicate(trackedEvent));

      verifyExpiry(60 * 1000);

      // Same id again, after the tracking map has emptied.
      assertFalse(" eid should not be duplicate as the previous entry should have expired ",
          proxy.verifyIfDuplicate(trackedEvent));
    }
    catch (Exception clientFailure) {
      clientFailure.printStackTrace();
      fail("Failed to initialize client");
    }
  }
  finally {
    if (null != bridgeServer) {
      bridgeServer.stop();
    }
  }
}
/**
 * Registers one event id with a pool configured with a messageTrackingTimeout
 * of 10000 and then waits for the same event id to be reported as a
 * duplicate, i.e. checks that the tracking entry is still present.
 */
@Test
public void testThreadIdToSequenceIdMapExpiryNegative()
{
  int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
  BridgeServer server = null;
  try {
    try {
      server = this.cache.addBridgeServer();
      server.setMaximumTimeBetweenPings(10000);
      server.setPort(port3);
      server.start();
    }
    catch (Exception e) {
      e.printStackTrace();
      fail("Failed to create server");
    }
    try {
      Properties props = new Properties();
      props.setProperty("endpoints", "ep1=localhost:"+port3);
      props.setProperty("establishCallbackConnection", "true");
      props.setProperty("redundancyLevel", "-1");
      props.setProperty("messageTrackingTimeout", "10000");
      proxy = BridgePoolImpl.create(props, true/*useByBridgeWriter*/);
      final EventID eid = new EventID(new byte[0],1,1);
      if (proxy.verifyIfDuplicate(eid)) {
        fail(" eid should not be duplicate as it is a new entry");
      }
      WaitCriterion ev = new WaitCriterion() {
        public boolean done() {
          return proxy.verifyIfDuplicate(eid);
        }
        public String description() {
          // Previously null; a real description makes a failed wait diagnosable.
          return "event id was never reported as a duplicate";
        }
      };
      DistributedTestCase.waitForCriterion(ev, 20 * 1000, 200, true);
    }
    catch (Exception ex) {
      ex.printStackTrace();
      fail("Failed to initialize client");
    }
  }
  finally {
    if (server != null) {
      server.stop();
    }
  }
}
/**
 * Registers 10000 distinct event ids with the pool, waits (via verifyExpiry)
 * until the threadId-to-sequenceId map is empty, and then checks that none of
 * those ids is reported as a duplicate any more.
 */
@Test
public void testThreadIdToSequenceIdMapConcurrency()
{
  final int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
  BridgeServer bridgeServer = null;
  try {
    try {
      bridgeServer = this.cache.addBridgeServer();
      bridgeServer.setMaximumTimeBetweenPings(10000);
      bridgeServer.setPort(serverPort);
      bridgeServer.start();
    }
    catch (Exception startFailure) {
      startFailure.printStackTrace();
      fail("Failed to create server");
    }

    try {
      final Properties poolConfig = new Properties();
      poolConfig.setProperty("endpoints", "ep1=localhost:" + serverPort);
      poolConfig.setProperty("redundancyLevel", "-1");
      poolConfig.setProperty("clientAckInterval", "2000");
      poolConfig.setProperty("messageTrackingTimeout", "5000");
      poolConfig.setProperty("establishCallbackConnection", "true");
      proxy = BridgePoolImpl.create(poolConfig, true/*useByBridgeWriter*/);

      final int EVENT_ID_COUNT = 10000; // why 10,000?
      final EventID[] events = new EventID[EVENT_ID_COUNT];

      // Phase 1: every id is new, so none may be flagged as a duplicate.
      for (int idx = 0; idx < EVENT_ID_COUNT; idx++) {
        events[idx] = new EventID(new byte[0], idx, idx);
        assertFalse(" eid can never be duplicate, it is being created for the first time! ",
            proxy.verifyIfDuplicate(events[idx]));
      }

      verifyExpiry(30 * 1000);

      // Phase 2: after the map emptied, the same ids must look new again.
      for (int idx = 0; idx < EVENT_ID_COUNT; idx++) {
        assertFalse(" eid can not be found to be duplicate since the entry should have expired! " + idx,
            proxy.verifyIfDuplicate(events[idx]));
      }
    }
    catch (Exception clientFailure) {
      clientFailure.printStackTrace();
      fail("Failed to initialize client");
    }
  }
  finally {
    if (null != bridgeServer) {
      bridgeServer.stop();
    }
  }
}
/**
 * Registers sequence id 5 for a thread and then checks that the lower
 * sequence ids 2 and 3 of the same thread are reported as duplicates, and
 * that the tracking map is not empty afterwards. The pool is closed at the
 * end of the happy path.
 */
@Test
public void testDuplicateSeqIdLesserThanCurrentSeqIdBeingIgnored()
{
  final int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
  BridgeServer bridgeServer = null;
  try {
    try {
      bridgeServer = this.cache.addBridgeServer();
      bridgeServer.setMaximumTimeBetweenPings(10000);
      bridgeServer.setPort(serverPort);
      bridgeServer.start();
    }
    catch (Exception startFailure) {
      startFailure.printStackTrace();
      fail("Failed to create server");
    }

    try {
      final Properties poolConfig = new Properties();
      poolConfig.setProperty("endpoints", "ep1=localhost:" + serverPort);
      poolConfig.setProperty("establishCallbackConnection", "true");
      poolConfig.setProperty("redundancyLevel", "-1");
      poolConfig.setProperty("messageTrackingTimeout", "100000");
      proxy = BridgePoolImpl.create(poolConfig, true/*useByBridgeWriter*/);

      // Highest sequence id seen for thread 1 becomes 5.
      final EventID highest = new EventID(new byte[0], 1, 5);
      assertFalse(" eid1 can never be duplicate, it is being created for the first time! ",
          proxy.verifyIfDuplicate(highest));

      final EventID lowerTwo = new EventID(new byte[0], 1, 2);
      assertTrue(" eid2 should be duplicate, seqId is less than highest (5)",
          proxy.verifyIfDuplicate(lowerTwo));

      final EventID lowerThree = new EventID(new byte[0], 1, 3);
      assertTrue(" eid3 should be duplicate, seqId is less than highest (5)",
          proxy.verifyIfDuplicate(lowerThree));

      assertFalse(proxy.getThreadIdToSequenceIdMap().isEmpty());
      proxy.close();
    }
    catch (Exception clientFailure) {
      clientFailure.printStackTrace();
      fail("Failed to initialize client");
    }
  }
  finally {
    if (null != bridgeServer) {
      bridgeServer.stop();
    }
  }
}
/**
 * Exercises duplicate detection for one thread id: a first sequence id (2)
 * and a greater one (3) are not duplicates, whereas the equal sequence id (3
 * again) and a lesser one (1) are.
 * <p>
 * The cleanup pauses for 500 ms before stopping the server. If that pause is
 * interrupted the server is still stopped first; only afterwards is the
 * interrupt status restored and the test failed, so an interrupt can no
 * longer leak a running server.
 */
@Test
public void testCleanCloseOfThreadIdToSeqId()
{
  int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
  BridgeServer server = null;
  try {
    try {
      server = this.cache.addBridgeServer();
      server.setMaximumTimeBetweenPings(10000);
      server.setPort(port3);
      server.start();
    }
    catch (Exception e) {
      e.printStackTrace();
      fail("Failed to create server");
    }
    try {
      Properties props = new Properties();
      props.setProperty("endpoints", "ep1=localhost:"+port3);
      props.setProperty("establishCallbackConnection", "true");
      props.setProperty("redundancyLevel", "-1");
      props.setProperty("messageTrackingTimeout", "100000");
      proxy = BridgePoolImpl.create(props, true/*useByBridgeWriter*/);
      EventID eid1 = new EventID(new byte[0],1,2);
      if (proxy.verifyIfDuplicate(eid1)) {
        fail(" eid can never be duplicate, it is being created for the first time! ");
      }
      EventID eid2 = new EventID(new byte[0],1,3);
      if (proxy.verifyIfDuplicate(eid2)) {
        fail(" eid can never be duplicate, since sequenceId is greater ");
      }
      // Second check of the same id: now the sequence id equals the recorded one.
      if (!proxy.verifyIfDuplicate(eid2)) {
        fail(" eid had to be a duplicate, since sequenceId is equal ");
      }
      EventID eid3 = new EventID(new byte[0],1,1);
      if (!proxy.verifyIfDuplicate(eid3)) {
        fail(" eid had to be a duplicate, since sequenceId is lesser ");
      }
    }
    catch (Exception ex) {
      ex.printStackTrace();
      fail("Failed to initialize client");
    }
  }
  finally {
    if (server != null) {
      boolean interrupted = false;
      try {
        Thread.sleep(500);
      }
      catch (InterruptedException ie) {
        interrupted = true;
      }
      // Stop the server before reporting the interrupt so it is never left running.
      server.stop();
      if (interrupted) {
        Thread.currentThread().interrupt();
        fail("interrupted");
      }
    }
  }
}
/**
 * Creates two client pools from the same properties and verifies that each
 * one has its own threadId-to-sequenceId map instance.
 * <p>
 * Both pools are local to this test (tearDown() only closes the
 * {@code proxy} field), so they are closed in a {@code finally} block to
 * make sure neither is leaked when creation of the second pool or the
 * assertion fails.
 */
@Test
public void testTwoClientsHavingDifferentThreadIdMaps()
{
  int port3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
  BridgeServer server = null;
  try {
    try {
      server = this.cache.addBridgeServer();
      server.setMaximumTimeBetweenPings(10000);
      server.setPort(port3);
      server.start();
    }
    catch (Exception e) {
      e.printStackTrace();
      fail("Failed to create server");
    }
    try {
      Properties props = new Properties();
      props.setProperty("endpoints", "ep1=localhost:"+port3);
      props.setProperty("establishCallbackConnection", "true");
      props.setProperty("redundancyLevel", "-1");
      props.setProperty("messageTrackingTimeout", "100000");
      BridgePoolImpl proxy1 = null;
      BridgePoolImpl proxy2 = null;
      try {
        proxy1 = BridgePoolImpl.create(props, true/*useByBridgeWriter*/);
        proxy2 = BridgePoolImpl.create(props, true/*useByBridgeWriter*/);
        Map<?, ?> map1 = proxy1.getThreadIdToSequenceIdMap();
        Map<?, ?> map2 = proxy2.getThreadIdToSequenceIdMap();
        // Identity comparison on purpose: the two pools must not share one map object.
        assertTrue(map1 != map2);
      }
      finally {
        // Close the proxies to remove them from the set of all proxies
        try {
          if (proxy1 != null) {
            proxy1.close();
          }
        }
        finally {
          if (proxy2 != null) {
            proxy2.close();
          }
        }
      }
    }
    catch (Exception ex) {
      ex.printStackTrace();
      fail("Failed to initialize client");
    }
  }
  finally {
    if (server != null) {
      server.stop();
    }
  }
}
/**
 * With redundancyLevel "1" and a clientAckInterval of 5000, checks that the
 * tracking entry of a newly registered event starts with its ack flag unset,
 * that the flag becomes set within the wait period, that the same holds for
 * a second event on the same thread id, and that the tracking map finally
 * empties.
 */
@Test
public void testPeriodicAckSendByClient()
{
  final int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
  BridgeServer bridgeServer = null;
  try {
    try {
      bridgeServer = this.cache.addBridgeServer();
      bridgeServer.setPort(serverPort);
      bridgeServer.start();
    }
    catch (Exception startFailure) {
      startFailure.printStackTrace();
      fail("Failed to create server");
    }

    try {
      final Properties poolConfig = new Properties();
      poolConfig.setProperty("endpoints", "ep1=localhost:" + serverPort);
      poolConfig.setProperty("establishCallbackConnection", "true");
      poolConfig.setProperty("redundancyLevel", "1");
      poolConfig.setProperty("readTimeout", "20000");
      poolConfig.setProperty("messageTrackingTimeout", "15000");
      poolConfig.setProperty("clientAckInterval", "5000");
      proxy = BridgePoolImpl.create(poolConfig, true/*useByBridgeWriter*/);

      // Key under which both events below are tracked (same membership bytes, thread id 1).
      final ThreadIdentifier trackingKey = new ThreadIdentifier(new byte[0], 1);

      EventID event = new EventID(new byte[0], 1, 1);
      assertFalse(" eid should not be duplicate as it is a new entry",
          proxy.verifyIfDuplicate(event));

      seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap().get(trackingKey);
      assertFalse(seo.getAckSend());

      // The client is expected to ack this entry to the server.
      seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap().get(trackingKey);
      verifyAckSend(60 * 1000, true);

      // A newer sequence id on the same thread id.
      event = new EventID(new byte[0], 1, 2);
      assertFalse(" eid should not be duplicate as it is a new entry",
          proxy.verifyIfDuplicate(event));

      seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap().get(trackingKey);
      assertFalse(seo.getAckSend());

      // A second ack is expected for the updated entry.
      seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap().get(trackingKey);
      verifyAckSend(6000, true);

      // Finally the entry has to disappear from the tracking map.
      verifyExpiry(15 * 1000);
    }
    catch (Exception testFailure) {
      testFailure.printStackTrace();
      fail("Test testPeriodicAckSendByClient Failed");
    }
  }
  finally {
    if (null != bridgeServer) {
      bridgeServer.stop();
    }
  }
}
/**
 * No ack is sent if the redundancy level is 0 (this test does not set the
 * "redundancyLevel" property - presumably the default is 0; confirm against
 * the pool's defaults).
 * <p>
 * Registers one event, checks that its tracking entry starts with the ack
 * flag unset, that the flag is still unset when polled, and that the
 * tracking map eventually empties.
 */
@Test
public void testNoAckSendByClient()
{
  int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
  BridgeServer server = null;
  try {
    try {
      server = this.cache.addBridgeServer();
      server.setPort(port);
      server.start();
    }
    catch (Exception e) {
      e.printStackTrace();
      fail("Failed to create server");
    }
    try {
      Properties props = new Properties();
      props.setProperty("endpoints", "ep1=localhost:"+port);
      props.setProperty("readTimeout", "20000");
      props.setProperty("messageTrackingTimeout", "8000");
      props.setProperty("clientAckInterval", "2000");
      props.setProperty("establishCallbackConnection", "true");
      proxy = BridgePoolImpl.create(props, true/*useByBridgeWriter*/);
      EventID eid = new EventID(new byte[0],1,1);
      if (proxy.verifyIfDuplicate(eid)) {
        fail(" eid should not be duplicate as it is a new entry");
      }
      seo = (SequenceIdAndExpirationObject)proxy.
        getThreadIdToSequenceIdMap().get(new ThreadIdentifier(new byte[0],1));
      assertFalse(seo.getAckSend());
      // should not send an ack as redundancy level = 0;
      seo = (SequenceIdAndExpirationObject)proxy.
        getThreadIdToSequenceIdMap().get(new ThreadIdentifier(new byte[0],1));
      verifyAckSend(30 * 1000, false);
      // should expire without sending an ack as redundancy level = 0.
      verifyExpiry(90 * 1000);
    }
    catch (Exception ex) {
      ex.printStackTrace();
      // Message used to name testPeriodicAckSendByClient (copy/paste slip).
      fail("Test testNoAckSendByClient Failed");
    }
  }
  finally {
    if (server != null) {
      server.stop();
    }
  }
}
/**
 * Waits until the ack flag of the entry currently held in {@code seo} equals
 * the expected value, handing the criterion to
 * DistributedTestCase.waitForCriterion together with the given wait time.
 *
 * @param timeToWait wait time passed on to waitForCriterion
 * @param expectedAckSend the ack flag value to wait for
 */
private void verifyAckSend(long timeToWait, final boolean expectedAckSend )
{
  final WaitCriterion ackFlagMatches = new WaitCriterion() {
    public String description() {
      return "ack flag never became " + expectedAckSend;
    }

    public boolean done() {
      return seo.getAckSend() == expectedAckSend;
    }
  };
  DistributedTestCase.waitForCriterion(ackFlagMatches, timeToWait, 1000, true);
}
/**
 * Waits until the pool's threadId-to-sequenceId map contains no entries,
 * handing the criterion to DistributedTestCase.waitForCriterion with twice
 * the given wait time.
 *
 * @param timeToWait base wait time; it is doubled before being passed on
 */
private void verifyExpiry(long timeToWait)
{
  final long doubledWait = timeToWait * 2;
  final WaitCriterion trackingMapEmpty = new WaitCriterion() {
    public String description() {
      return "Entry never expired";
    }

    public boolean done() {
      return proxy.getThreadIdToSequenceIdMap().size() == 0;
    }
  };
  DistributedTestCase.waitForCriterion(trackingMapEmpty, doubledWait, 200, true);
}
}