blob: 384cb2aa0ab514c1f965bfca6a14c291dca81199 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.ha;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache30.BridgeTestCase;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.BridgeServerImpl;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.VM;
/**
*
* This test verifies that all entry operations (create,put,destroy,invalidate)
* which propagate from one server1 to another server2 do get delivered to the
* client connected to server2 (server2 is primary for client)
*
*/
public class OperationsPropagationDUnitTest extends DistributedTestCase
{
/**
* Server1
*/
VM server1 = null;
/**
* Server2
*/
VM server2 = null;
/**
* Client
*/
VM client1 = null;
/**
* Port of server1
*/
public static int PORT1;
/**
* Port of server2
*/
public static int PORT2;
/**
* Name of the region
*/
private static final String REGION_NAME = "OperationsPropagationDUnitTest_Region";
/**
* cache
*/
private static Cache cache = null;
/**
* Constructor
*
* @param name
*/
public OperationsPropagationDUnitTest(String name) {
super(name);
}
/**
* Create the server1, server2 (in the same DS) and client1 (which is
* connected only to server2
*/
public void setUp() throws Exception
{
super.setUp();
final Host host = Host.getHost(0);
// Server1 VM
server1 = host.getVM(0);
// Server2 VM
server2 = host.getVM(1);
// Client 1 VM
client1 = host.getVM(2);
PORT1 = ((Integer)server1.invoke(OperationsPropagationDUnitTest.class,
"createServerCache")).intValue();
PORT2 = ((Integer)server2.invoke(OperationsPropagationDUnitTest.class,
"createServerCache")).intValue();
client1.invoke(OperationsPropagationDUnitTest.class, "createClientCache",
new Object[] { getServerHostName(host), new Integer(PORT2) });
}
/**
* close the caches of the client and the servers
*/
public void tearDown2() throws Exception
{
super.tearDown2();
client1.invoke(OperationsPropagationDUnitTest.class, "closeCache");
server1.invoke(OperationsPropagationDUnitTest.class, "closeCache");
server2.invoke(OperationsPropagationDUnitTest.class, "closeCache");
}
/**
* closes the cache and disconnects the vm from teh distributed system
*
*/
public static void closeCache()
{
if (cache != null && !cache.isClosed()) {
cache.close();
cache.getDistributedSystem().disconnect();
}
}
/**
* connect to the DS and create a cache
*
* @param props
* @throws Exception
*/
private void createCache(Properties props) throws Exception
{
DistributedSystem ds = getSystem(props);
assertNotNull(ds);
ds.disconnect();
ds = getSystem(props);
cache = CacheFactory.create(ds);
assertNotNull(cache);
}
/**
* region to be created for the test
*/
protected static Region region = null;
/**
* Create the server
*
* @return
* @throws Exception
*/
public static Integer createServerCache() throws Exception
{
new OperationsPropagationDUnitTest("temp").createCache(new Properties());
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
RegionAttributes attrs = factory.create();
region = cache.createRegion(REGION_NAME, attrs);
BridgeServerImpl server = (BridgeServerImpl)cache.addBridgeServer();
assertNotNull(server);
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
server.setPort(port);
server.setNotifyBySubscription(true);
server.start();
return new Integer(server.getPort());
}
/**
* create the client and connect it to the server with the given port
*
* @param port2
* @throws Exception
*/
public static void createClientCache(String host, Integer port2) throws Exception
{
PORT2 = port2.intValue();
Properties props = new Properties();
props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
props.setProperty(DistributionConfig.LOCATORS_NAME, "");
new OperationsPropagationDUnitTest("temp").createCache(props);
props.setProperty("retryAttempts", "2");
props.setProperty("endpoints", "ep1="+host+":" + PORT2);
props.setProperty("redundancyLevel", "-1");
props.setProperty("establishCallbackConnection", "true");
props.setProperty("LBPolicy", "Sticky");
props.setProperty("readTimeout", "2000");
props.setProperty("socketBufferSize", "1000");
props.setProperty("retryInterval", "250");
props.setProperty("connectionsPerServer", "2");
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
BridgeTestCase.configureConnectionPool(factory, host, PORT2,-1, true, -1, 2, null);
RegionAttributes attrs = factory.create();
region = cache.createRegion(REGION_NAME, attrs);
assertNotNull(region);
region.registerInterest("ALL_KEYS");
}
public static final String CREATE_KEY = "createKey";
public static final String CREATE_VALUE = "createValue";
public static final String UPDATE_KEY = "updateKey";
public static final String UPDATE_VALUE1 = "updateValue1";
public static final String UPDATE_VALUE2 = "updateValue2";
public static final String INVALIDATE_KEY = "invalidateKey";
public static final String INVALIDATE_VALUE = "invalidateValue";
public static final String DESTROY_KEY = "destroyKey";
public static final String DESTROY_VALUE = "destroyValue";
public static final String PUTALL_KEY = "putAllKey";
public static final String PUTALL_VALUE = "putAllValue";
public static final String PUTALL_KEY2 = "putAllKey2";
public static final String PUTALL_VALUE2 = "putAllValue2";
/**
* This test: 1) First the initial keys and values 2) Verify that the initial
* keys and values have reached the client 3) Do the operations which we want
* to propagate (create, update, invalidate and destroy) 4) Verify the
* operations reached the client
* 5) Do a removeAll
* 6) Verify it reached the client
*
* @throws Exception
*/
public void testOperationsPropagation() throws Exception
{
server1.invoke(OperationsPropagationDUnitTest.class, "initialPutKeyValue");
client1.invoke(OperationsPropagationDUnitTest.class,
"assertKeyValuePresent");
server1.invoke(OperationsPropagationDUnitTest.class, "doOperations");
client1.invoke(OperationsPropagationDUnitTest.class,
"assertOperationsSucceeded");
server1.invoke(OperationsPropagationDUnitTest.class, "doRemoveAll");
client1.invoke(OperationsPropagationDUnitTest.class,
"assertRemoveAllSucceeded");
}
/**
* put the initial keys and values
*
*/
public static void initialPutKeyValue()
{
try {
region.put(UPDATE_KEY, UPDATE_VALUE1);
region.put(INVALIDATE_KEY, INVALIDATE_VALUE);
region.put(DESTROY_KEY, DESTROY_VALUE);
}
catch (Exception e) {
fail(" Test failed due to " + e, e);
}
}
/**
* do the operations which you want to propagate
*
*/
public static void doOperations()
{
try {
region.create(CREATE_KEY, CREATE_VALUE);
region.put(UPDATE_KEY, UPDATE_VALUE2);
region.invalidate(INVALIDATE_KEY);
region.destroy(DESTROY_KEY);
Map map = new HashMap();
map.put(PUTALL_KEY,PUTALL_VALUE);
map.put(PUTALL_KEY2,PUTALL_VALUE2);
region.putAll(map);
}
catch (Exception e) {
fail(" Test failed due to " + e, e);
}
}
/**
* assert the initial key values are present
*
*/
public static void assertKeyValuePresent()
{
try {
WaitCriterion wc = new WaitCriterion() {
String excuse;
public boolean done() {
Object val = region.get(UPDATE_KEY);
return UPDATE_VALUE1.equals(val);
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, 60 * 1000, 1000, true);
/*
* if (!(region.get(UPDATE_KEY).equals(UPDATE_VALUE1))) { fail(" Expected
* value to be " + UPDATE_VALUE1 + " but it is " +
* region.get(UPDATE_KEY)); }
*/
wc = new WaitCriterion() {
String excuse;
public boolean done() {
Object val = region.get(INVALIDATE_KEY);
return INVALIDATE_VALUE.equals(val);
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, 60 * 1000, 1000, true);
/*
* if (!(region.get(INVALIDATE_KEY).equals(INVALIDATE_VALUE))) { fail("
* Expected value to be " + INVALIDATE_VALUE + " but it is " +
* region.get(INVALIDATE_KEY)); }
*/
wc = new WaitCriterion() {
String excuse;
public boolean done() {
Object val = region.get(DESTROY_KEY);
return DESTROY_VALUE.equals(val);
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, 60 * 1000, 1000, true);
/*
* if (!(region.get(DESTROY_KEY).equals(DESTROY_VALUE))) { fail(" Expected
* value to be " + DESTROY_VALUE + " but it is " +
* region.get(DESTROY_KEY)); }
*/
}
catch (Exception e) {
fail(" Test failed due to " + e, e);
}
}
/**
* assert the operations reached the client successfully
*
*/
public static void assertOperationsSucceeded()
{
try {
//Thread.sleep(5000);
WaitCriterion wc = new WaitCriterion() {
String excuse;
public boolean done() {
Object val = region.get(CREATE_KEY);
return CREATE_VALUE.equals(val);
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, 60 * 1000, 1000, true);
/*if (!(region.get(CREATE_KEY).equals(CREATE_VALUE))) {
fail("CREATE operation did not propagate to client : Expected value to be "
+ CREATE_VALUE + " but it is " + region.get(CREATE_KEY));
}*/
wc = new WaitCriterion() {
String excuse;
public boolean done() {
Object val = region.get(UPDATE_KEY);
return UPDATE_VALUE2.equals(val);
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, 60 * 1000, 1000, true);
/*if (!(region.get(UPDATE_KEY).equals(UPDATE_VALUE2))) {
fail(" UPDATE operation did not propagate to Client : Expected value to be "
+ UPDATE_VALUE2 + " but it is " + region.get(UPDATE_KEY));
}*/
wc = new WaitCriterion() {
String excuse;
public boolean done() {
return !region.containsKey(DESTROY_KEY);
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, 60 * 1000, 1000, true);
/*if (region.containsKey(DESTROY_KEY)) {
fail(" DESTROY operation did not propagate to Client : Expected "
+ DESTROY_KEY + " not to be present but it is ");
}*/
wc = new WaitCriterion() {
String excuse;
public boolean done() {
Object val = region.get(INVALIDATE_KEY);
return val == null;
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, 60 * 1000, 1000, true);
/*if (!(region.get(INVALIDATE_KEY) == null)) {
fail(" INVALIDATE operation did not propagate to Client : Expected value to be null but it is "
+ region.get(INVALIDATE_KEY));
}*/
wc = new WaitCriterion() {
String excuse;
public boolean done() {
Object val = region.get(PUTALL_KEY);
return PUTALL_VALUE.equals(val);
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, 60 * 1000, 1000, true);
/*
* if (!(region.get(PUTALL_KEY).equals(PUTALL_VALUE))) { fail("PUTALL
* operation did not propagate to client : Expected value to be " +
* PUTALL_VALUE + " but it is " + region.get(PUTALL_KEY)); }
*/
wc = new WaitCriterion() {
String excuse;
public boolean done() {
Object val = region.get(PUTALL_KEY2);
return PUTALL_VALUE2.equals(val);
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, 60 * 1000, 1000, true);
}
catch (Exception e) {
fail(" Test failed due to " + e, e);
}
}
public static void doRemoveAll() {
region.removeAll(Arrays.asList(PUTALL_KEY, PUTALL_KEY2));
}
public static void assertRemoveAllSucceeded() {
WaitCriterion wc = new WaitCriterion() {
String excuse;
public boolean done() {
Object val = region.get(PUTALL_KEY);
return !region.containsKey(PUTALL_KEY) && !region.containsKey(PUTALL_KEY2);
}
public String description() {
return excuse;
}
};
DistributedTestCase.waitForCriterion(wc, 60 * 1000, 1000, true);
}
}