blob: 5671ac0072c13cc81d440290c2e0985f641fe3ae [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.client.internal.Connection;
import com.gemstone.gemfire.cache.client.internal.ServerRegionProxy;
import com.gemstone.gemfire.cache.util.BridgeServer;
import com.gemstone.gemfire.cache30.BridgeTestCase;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.internal.AvailablePort;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.VM;
/**
* This is the Bug test for the bug 36773. Skipped sequence id causes missing
* entry in client. The test perform following operations in sequence.
* 1. Create Server1 & Server2
* 2. Create Client1 & Client2
* 3. Perform put operation from the client1 on key1 & key2 such a way that
* put with lower sequence id reaches primary after the put with greater sequence id.
* 4. Check for the keys on Client2
*
* @author Girish Thombare
*
*/
public class HABug36773DUnitTest extends DistributedTestCase
{
VM server1 = null;
VM server2 = null;
VM client1 = null;
VM client2 = null;
private static int PORT1;
private static int PORT2;
private static final String REGION_NAME = "HABug36773DUnitTest_region";
static final String KEY1 = "key1";
static final String KEY2 = "key2";
static final String VALUE1 = "newVal1";
static final String VALUE2 = "newVal2";
static volatile boolean waitFlag = true;
protected static Cache cache = null;
/** constructor */
public HABug36773DUnitTest(String name) {
super(name);
}
public void setUp() throws Exception {
super.setUp();
disconnectAllFromDS();
final Host host = Host.getHost(0);
// Server1 VM
server1 = host.getVM(0);
// Server2 VM
server2 = host.getVM(1);
// Client 1 VM
client1 = host.getVM(2);
// client 2 VM
client2 = host.getVM(3);
PORT1 = ((Integer)server1.invoke(HABug36773DUnitTest.class,
"createServerCache")).intValue();
PORT2 = ((Integer)server2.invoke(HABug36773DUnitTest.class,
"createServerCache")).intValue();
client1.invoke(HABug36773DUnitTest.class, "disableShufflingOfEndpoints");
client2.invoke(HABug36773DUnitTest.class, "disableShufflingOfEndpoints");
client1.invoke(HABug36773DUnitTest.class, "createClientCache",
new Object[] { new Integer(PORT1), new Integer(PORT2) });
client2.invoke(HABug36773DUnitTest.class, "createClientCache",
new Object[] { new Integer(PORT1), new Integer(PORT2) });
}
private void createCache(Properties props) throws Exception
{
DistributedSystem ds = getSystem(props);
cache = CacheFactory.create(ds);
assertNotNull(cache);
}
public static void disableShufflingOfEndpoints()
{
System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true");
}
public void _testBug36773() throws Exception
{
//First create entries on both servers via the two client
client1.invoke(HABug36773DUnitTest.class, "createEntriesK1andK2");
client2.invoke(HABug36773DUnitTest.class, "createEntriesK1andK2");
client1.invoke(HABug36773DUnitTest.class, "registerKeysK1andK2");
client2.invoke(HABug36773DUnitTest.class, "registerKeysK1andK2");
server1.invoke(checkSizeRegion(2));
server2.invoke(checkSizeRegion(2));
client1.invoke(checkSizeRegion(2));
client2.invoke(checkSizeRegion(2));
server1.invoke(HABug36773DUnitTest.class, "waitOnTheKeyEntry");
client1.invoke(HABug36773DUnitTest.class,
"acquireConnectionsAndPut", new Object[] {new Integer(PORT2)});
client1.invoke(HABug36773DUnitTest.class,
"acquireConnectionsAndPut", new Object[] {new Integer(PORT1)});
client2.invoke(HABug36773DUnitTest.class, "verifyEntries", new Object[] {new String(KEY2), new String (VALUE2)});
server1.invoke(HABug36773DUnitTest.class, "notfiyThread");
client2.invoke(HABug36773DUnitTest.class, "verifyEntries", new Object[] {new String(KEY1), new String (VALUE1)});
}
public void testDummyForBug36773()
{
getLogWriter().info(" This is the dummy test for the Bug 36773");
}
private CacheSerializableRunnable checkSizeRegion(final int size)
{
CacheSerializableRunnable checkRegion = new CacheSerializableRunnable(
"checkSize") {
public void run2() throws CacheException
{
Region region = cache.getRegion(Region.SEPARATOR + REGION_NAME);
assertNotNull(region);
getLogWriter().info("Size of the region " + region.size());
assertEquals(size, region.size());
}
};
return checkRegion;
}
public static void acquireConnectionsAndPut(Integer portNumber)
{
try {
int port = portNumber.intValue();
Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
assertNotNull(r1);
PoolImpl pool = (PoolImpl)PoolManager.find(r1.getAttributes().getPoolName());
assertNotNull(pool);
Connection conn = pool.acquireConnection();
final Connection conn1;
ServerRegionProxy srp = new ServerRegionProxy(Region.SEPARATOR + REGION_NAME, pool);
if (conn.getServer().getPort() != port) {
conn1 = pool.acquireConnection(); // Ensure we have a server
// with the
// proper port
}
else {
conn1 = conn;
}
assertNotNull(conn1);
if (port == PORT2) {
assertEquals(PORT2, conn1.getServer().getPort());
srp.putOnForTestsOnly(conn1, KEY1, VALUE1, new EventID(new byte[] { 1 }, 2, 1), null);
}
else if (port == PORT1) {
assertEquals(PORT1, conn1.getServer().getPort());
srp.putOnForTestsOnly(conn1, KEY2, VALUE2, new EventID(new byte[] { 1 }, 2, 2), null);
}
else {
fail("Invalid ports ");
}
}
catch (Exception ex) {
ex.printStackTrace();
fail("while setting acquireConnections " + ex);
}
}
public static void verifyEntries(String KEY, String VALUE)
{
Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
assertNotNull(r1);
long maxWaitTime = 120000;
try {
long start = System.currentTimeMillis();
while (!r1.getEntry(KEY).getValue().equals(VALUE)) {
assertTrue("Waited over " + maxWaitTime + "entry to get updated",
(System.currentTimeMillis() - start) < maxWaitTime);
try {
Thread.yield();
Thread.sleep(700);
}
catch (InterruptedException ie) {
fail("Interrupted while waiting ", ie);
}
}
}
catch (Exception e) {
fail("Exception in trying to get due to " + e);
}
}
public static void createEntriesK1andK2()
{
try {
Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
assertNotNull(r1);
if (!r1.containsKey(KEY1)) {
r1.create(KEY1, "key-1");
}
if (!r1.containsKey(KEY2)) {
r1.create(KEY2, "key-2");
}
assertEquals(r1.getEntry(KEY1).getValue(), "key-1");
assertEquals(r1.getEntry(KEY2).getValue(), "key-2");
}
catch (Exception ex) {
fail("failed while createEntriesK1andK2()", ex);
}
}
public static void notfiyThread()
{
waitFlag=false;
}
public static void createClientCache(Integer port1, Integer port2)
throws Exception
{
PORT1 = port1.intValue();
PORT2 = port2.intValue();
Properties props = new Properties();
props.setProperty("mcast-port", "0");
props.setProperty("locators", "");
new HABug36773DUnitTest("temp").createCache(props);
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
BridgeTestCase.configureConnectionPool(factory, DistributedTestCase.getIPLiteral(), new int[] {PORT1,PORT2}, true, -1, 2, null);
RegionAttributes attrs = factory.create();
cache.createRegion(REGION_NAME, attrs);
}
public static Integer createServerCache()
throws Exception
{
new HABug36773DUnitTest("temp").createCache(new Properties());
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
factory.setEarlyAck(true);
RegionAttributes attrs = factory.create();
cache.createRegion(REGION_NAME, attrs);
BridgeServer server = cache.addBridgeServer();
assertNotNull(server);
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
server.setPort(port);
server.setNotifyBySubscription(true);
server.start();
return new Integer(server.getPort());
}
/**
*
* @param key Key in which client is interested
*/
public static void registerKeysK1andK2()
{
try {
Region r = cache.getRegion(Region.SEPARATOR + REGION_NAME);
assertNotNull(r);
List list = new ArrayList();
list.add(KEY1);
list.add(KEY2);
r.registerInterest(list);
}
catch (Exception ex) {
fail("failed while registering interest", ex);
}
}
public static void closeCache()
{
if (cache != null && !cache.isClosed()) {
cache.close();
cache.getDistributedSystem().disconnect();
}
}
public static void waitOnTheKeyEntry()
{
Thread thrd = new Thread() {
public void run()
{
LocalRegion region = (LocalRegion)cache.getRegion(Region.SEPARATOR
+ REGION_NAME);
RegionEntry regionEntry = region.basicGetEntry(KEY1);
synchronized (regionEntry) {
while (waitFlag) {
try {
Thread.sleep(500);
}
catch (InterruptedException e) {
fail("interrupted");
}
}
}
}
};
thrd.start();
}
public void tearDown2() throws Exception
{
//close client
client1.invoke(HABug36773DUnitTest.class, "closeCache");
client2.invoke(HABug36773DUnitTest.class, "closeCache");
//close server
server1.invoke(HABug36773DUnitTest.class, "closeCache");
server2.invoke(HABug36773DUnitTest.class, "closeCache");
}
}