blob: 861193dc45fbc67413d936351ff012c23bfb5b53 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.ha;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.internal.cache.Conflatable;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
import dunit.DistributedTestCase;
/**
* Test runs all tests of HARQAddOperationJUnitTest using BlockingHARegionQueue
* instead of HARegionQueue
*
* @author Suyog Bhokare
*
*/
@Category(IntegrationTest.class)
public class BlockingHARQAddOperationJUnitTest extends
HARQAddOperationJUnitTest
{
/**
* Creates Blocking HA region-queue object
*
* @return Blocking HA region-queue object
* @throws IOException
* @throws ClassNotFoundException
* @throws CacheException
* @throws InterruptedException
*/
protected HARegionQueue createHARegionQueue(String name)
throws IOException, ClassNotFoundException, CacheException, InterruptedException
{
HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name,
cache, HARegionQueue.BLOCKING_HA_QUEUE, false);
return regionqueue;
}
/**
* Creates Blocking HA region-queue object
*
* @return Blocking HA region-queue object
* @throws IOException
* @throws ClassNotFoundException
* @throws CacheException
* @throws InterruptedException
*/
protected HARegionQueue createHARegionQueue(String name,
HARegionQueueAttributes attrs) throws IOException, ClassNotFoundException, CacheException, InterruptedException
{
HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name,
cache, attrs, HARegionQueue.BLOCKING_HA_QUEUE, false);
return regionqueue;
}
/**
* Tests the take() functionality of
* <code>BlockingHARegionQueue<code> with conflation disabled.
*
* @throws Exception
*/
@Test
public void testBlockingTakeConflationDisabled() throws Exception
{
this.logWriter
.info("HARQAddOperationJUnitTest : testBlockingTakeConflationDisabled BEGIN");
doBlockingTake(false);
this.logWriter
.info("HARQAddOperationJUnitTest : testBlockingTakeConflationDisabled END");
}
/**
* Tests the take() functionality of
* <code>BlockingHARegionQueue<code> with conflation enabled.
*
* @throws Exception
*
* @author Dinesh Patel
*/
@Test
public void testBlockingTakeConflationEnabled() throws Exception
{
this.logWriter
.info("HARQAddOperationJUnitTest : testBlockingTakeConflationEnabled BEGIN");
doBlockingTake(true);
this.logWriter
.info("HARQAddOperationJUnitTest : testBlockingTakeConflationEnabled END");
}
/**
* This method performs the following steps :<br>
* 1)Create a blocking queue and start a thread which does take() on it.
* 2)Verify after significant time that the thread is still alive as it should
* be blocked on take() since there are no events in the queue.<br>
* 3)Do a put into the queue and verify that the take thread returns with the
* same object.
*
* @param conflationEnabled -
* whether conflation is enabled or not
* @throws Exception
*
* @author Dinesh Patel
*/
public void doBlockingTake(boolean conflationEnabled) throws Exception
{
testFailed = false;
message = null;
final HARegionQueue rq = createHARegionQueue("testBlockingTake");
final List takenObjects = new ArrayList();
Thread takeThread = new Thread() {
public void run()
{
try {
takenObjects.add(rq.take());
}
catch (Exception e) {
testFailed = true;
message.append("Exception while performing take operation "
+ e.getStackTrace());
}
}
};
takeThread.start();
DistributedTestCase.staticPause(20 * 1000);
if (!takeThread.isAlive()) {
fail("take() thread died ");
}
EventID id1 = new EventID(new byte[] { 1 }, 1, 1);
ConflatableObject c1 = new ConflatableObject(KEY1, VALUE1, id1,
conflationEnabled, "region1");
rq.put(c1);
DistributedTestCase.join(takeThread, 20 * 1000, null);
assertEquals(1, takenObjects.size());
Conflatable obj = (Conflatable)takenObjects.get(0);
assertNotNull(obj);
assertEquals(id1, obj.getEventId());
if (testFailed)
fail("Test failed due to " + message);
}
/**
* This test performs the following steps :<br>
* 1)Create a blocking queue.<br>
* 2) Start two threads which does take() on it and add the return object to a
* list.<br>
* 3)Put two object into the queue. <br>
* 4)Verify both both take() threads return with an object by ensuring that
* the size of the list containing return objects is two.<br>
*
* @throws Exception
*
* @author Dinesh Patel
*/
@Test
public void testConcurrentBlockingTake() throws Exception
{
this.logWriter
.info("HARQAddOperationJUnitTest : testConcurrentBlockingTake BEGIN");
testFailed = false;
message = null;
final HARegionQueue rq = createHARegionQueue("testBlockingTake");
final List takenObjects = new Vector();
final int totalTakeThreads = 2;
Thread[] takeThreads = new Thread[totalTakeThreads];
for (int i = 0; i < totalTakeThreads; i++) {
takeThreads[i] = new Thread() {
public void run()
{
try {
takenObjects.add(rq.take());
}
catch (Exception e) {
testFailed = true;
message.append("Exception while performing take operation "
+ e.getStackTrace());
}
}
};
takeThreads[i].start();
}
Conflatable c = null;
EventID id = null;
for (int i = 0; i < totalTakeThreads; i++) {
id = new EventID(new byte[] { 1 }, 1, i);
c = new ConflatableObject("k" + i, "v" + i, id, true, "region1");
rq.put(c);
}
for (int i = 0; i < totalTakeThreads; i++) {
DistributedTestCase.join(takeThreads[i], 20 * 1000, null);
}
assertEquals(totalTakeThreads, takenObjects.size());
for (int i = 0; i < totalTakeThreads; i++) {
c = (Conflatable)takenObjects.get(i);
assertNotNull(c);
}
if (testFailed)
fail("Test failed due to " + message);
this.logWriter
.info("HARQAddOperationJUnitTest : testConcurrentBlockingTake END");
}
}