blob: 14c2b9e71cefba1bd0aa12c815b0bf2db44a83f2 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.ha;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.logging.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
import junit.framework.Assert;
import junit.framework.TestCase;
import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.internal.cache.Conflatable;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
import dunit.DistributedTestCase;
/**
* Test to verify Add operation to HARegion Queue with and without conflation.
*
* @author Yogesh Mahajan
* @author Dinesh Patel
*/
@Category(IntegrationTest.class)
public class HARQAddOperationJUnitTest
{
private static final Logger logger = LogService.getLogger();
/** The cache instance */
protected Cache cache = null;
/** Logger for this test */
protected LogWriter logWriter = null;
/** The <code>RegionQueue</code> instance */
private HARegionQueue rq = null;
// Keys and values shared by the put / conflation tests below.
protected final static String KEY1 = "Key-1";
protected final static String KEY2 = "Key-2";
protected final static String VALUE1 = "Value-1";
protected final static String VALUE2 = "Value-2";
// Set by worker threads on failure; checked by the spawning test after join.
protected boolean testFailed = false;
// Failure detail accumulated by worker threads (StringBuffer: appends are synchronized).
protected StringBuffer message = null;
// Ad-hoc barrier counter used by the concurrent peek/remove tests; guarded by 'this'.
protected int barrierCount = 0;
// Incremented by the expiry listener in testAddWithQRMAndExpiry; guarded by 'this'.
volatile static int expiryCount = 0;
/**
 * Create the cache in setup.
 *
 * @throws Exception -
 *           thrown if any exception occurs in setUp
 */
@Before
public void setUp() throws Exception
{
// A fresh cache per test; each test creates its own HARegionQueue on it.
this.cache = createCache();
this.logWriter = cache.getLogger();
}
/**
 * Close the cache in tear down.
 *
 * @throws Exception -
 *           thrown if any exception occurs in tearDown
 */
@After
public void tearDown() throws Exception
{
// Closing the cache also tears down any regions/queues the test created.
this.cache.close();
}
/**
 * Builds the stand-alone cache used by every test in this class.
 * Multicast is disabled (mcast-port 0) so the test runs in isolation.
 *
 * @return the new cache instance
 * @throws CacheException -
 *           thrown if any exception occurs in cache creation
 */
private Cache createCache() throws CacheException
{
  CacheFactory cacheFactory = new CacheFactory();
  cacheFactory.set("mcast-port", "0");
  return cacheFactory.create();
}
/**
 * Creates a non-blocking HA region-queue with the given name on the test cache.
 *
 * @param name name of the HA region-queue to create
 * @return HA region-queue object
 * @throws IOException
 * @throws ClassNotFoundException
 * @throws CacheException
 * @throws InterruptedException
 */
protected HARegionQueue createHARegionQueue(String name)
    throws IOException, ClassNotFoundException, CacheException, InterruptedException
{
  // The original body also built an AttributesFactory (REPLICATE /
  // DISTRIBUTED_ACK) that was never passed to the queue; that dead
  // configuration has been removed.
  return HARegionQueue.getHARegionQueueInstance(name, cache,
      HARegionQueue.NON_BLOCKING_HA_QUEUE, false);
}
/**
 * Creates a non-blocking HA region-queue with the given name and the supplied
 * queue attributes on the test cache.
 *
 * @param name name of the HA region-queue to create
 * @param attrs queue attributes (e.g. expiry time) to apply
 * @return HA region-queue object
 * @throws IOException
 * @throws ClassNotFoundException
 * @throws CacheException
 * @throws InterruptedException
 */
protected HARegionQueue createHARegionQueue(String name,
    HARegionQueueAttributes attrs) throws IOException, ClassNotFoundException, CacheException, InterruptedException
{
  return HARegionQueue.getHARegionQueueInstance(name, cache, attrs,
      HARegionQueue.NON_BLOCKING_HA_QUEUE, false);
}
/**
 * Add operation with conflation : 1) Add Key1 Val1 & then Key1 Val2. 2) At
 * the end of the second operation, the region should contain the entry corresponding
 * to value 2. 3) Available IDs, LastDispatchedWrapper Set & Conflation Map
 * should have size 1. 4) Conflation Map, LastDispatchedWrapper Set &
 * Available IDs should have the counter corresponding to the second operation.
 */
@Test
public void testQueueAddOperationWithConflation() throws Exception
{
this.logWriter
.info("HARegionQueueJUnitTest : testQueueAddOperationWithConflation BEGIN");
this.rq = createHARegionQueue("testQueueAddOperationWithConflation");
// Two events from the same member/thread (byte[]{1}, thread id 1) for the same key.
EventID id1 = new EventID(new byte[] { 1 }, 1, 1);
EventID id2 = new EventID(new byte[] { 1 }, 1, 2);
ConflatableObject c1 = new ConflatableObject(KEY1, VALUE1, id1, true,
"region1");
ConflatableObject c2 = new ConflatableObject(KEY1, VALUE2, id2, true,
"region1");
// Second put should conflate away the first (same key, conflation enabled).
this.rq.put(c1);
this.rq.put(c2);
// Per-region conflation map: key -> counter of the surviving event.
Map conflationMap = (Map)rq
.getConflationMapForTesting().get("region1");
assertEquals(1, conflationMap.size());
Long cntr = (Long)conflationMap.get(KEY1);
// The surviving counter must resolve to the latest value (VALUE2).
ConflatableObject retValue = (ConflatableObject)rq.getRegion().get(cntr);
assertEquals(VALUE2, retValue.getValueToConflate());
// Exactly one available id and one counter in the thread's counter set remain.
assertEquals(1, rq.getAvalaibleIds().size());
assertEquals(1, rq.getCurrentCounterSet(id1).size());
this.logWriter
.info("HARegionQueueJUnitTest : testQueueAddOperationWithConflation END");
}
/**
 * Add operation without conflation : 1) The region should contain an entry
 * mapping counter -> Conflatable object. 2) The events map should have size 1
 * with one ThreadIdentifier key and the LastDispatchedWrapper as value.
 * 3) That wrapper's counter set and the available-IDs set should each have
 * size 1. 4) After a second put by the same thread, both should have size 2.
 */
@Test
public void testQueueAddOperationWithoutConflation() throws Exception
{
  this.logWriter
      .info("HARegionQueueJUnitTest : testQueueAddOperationWithoutConflation BEGIN");
  // Fixed copy-paste: the queue is now named after this test
  // (was "testQueueAddOperationWithConflation").
  this.rq = createHARegionQueue("testQueueAddOperationWithoutConflation");
  EventID id1 = new EventID(new byte[] { 1 }, 1, 1);
  EventID id2 = new EventID(new byte[] { 1 }, 1, 2);
  ConflatableObject c1 = new ConflatableObject(KEY1, VALUE1, id1, false,
      "region1");
  ConflatableObject c2 = new ConflatableObject(KEY2, VALUE2, id2, false,
      "region1");
  this.rq.put(c1);
  // With conflation disabled no per-region conflation map must be created.
  assertNull(rq.getConflationMapForTesting().get("region1"));
  assertEquals(1, rq.getAvalaibleIds().size());
  assertEquals(1, rq.getCurrentCounterSet(id1).size());
  this.rq.put(c2);
  assertNull(rq.getConflationMapForTesting().get("region1"));
  assertEquals(2, rq.getAvalaibleIds().size());
  assertEquals(2, rq.getCurrentCounterSet(id1).size());
  // Walk the thread's counter set (put order) and verify both events survive.
  Iterator iter = rq.getCurrentCounterSet(id1).iterator();
  if (iter.hasNext()) {
    Long cntr = (Long)iter.next();
    ConflatableObject co = (ConflatableObject)this.rq.getRegion().get(cntr);
    assertEquals(KEY1, co.getKeyToConflate());
    assertEquals(VALUE1, co.getValueToConflate());
  }
  if (iter.hasNext()) {
    Long cntr = (Long)iter.next();
    ConflatableObject co = (ConflatableObject)this.rq.getRegion().get(cntr);
    assertEquals(KEY2, co.getKeyToConflate());
    assertEquals(VALUE2, co.getValueToConflate());
  }
  this.logWriter
      .info("HARegionQueueJUnitTest : testQueueAddOperationWithoutConflation END");
}
/**
 * Add operation followed by a take: region size, available IDs and the
 * thread's counter set should all drop to 0, while the events map keeps its
 * single entry (the lastDispatchedAndCurrentEvent wrapper for the
 * ThreadIdentifier).
 *
 * @throws Exception on failure
 */
@Test
public void testQueueAddTakeOperationWithoutConflation() throws Exception
{
  this.logWriter
      .info("HARegionQueueJUnitTest : testQueueAddTakeOperationWithoutConflation BEGIN");
  // Fixed copy-paste: the queue is now named after this test
  // (was "testQueueAddOperationWithConflation").
  this.rq = createHARegionQueue("testQueueAddTakeOperationWithoutConflation");
  EventID id = new EventID(new byte[] { 1 }, 1, 1);
  ConflatableObject obj = new ConflatableObject(KEY1, VALUE1, id, true,
      "region1");
  this.rq.put(obj);
  this.rq.take();
  // take() must remove the entry, its available id and its counter...
  assertNull(rq.getRegion().get(KEY1));
  assertEquals(0, this.rq.getAvalaibleIds().size());
  // ...but the ThreadIdentifier wrapper stays behind in the events map.
  Map eventsMap = this.rq.getEventsMapForTesting();
  assertEquals(1, eventsMap.size());
  assertEquals(0, rq.getCurrentCounterSet(id).size());
  this.logWriter
      .info("HARegionQueueJUnitTest : testQueueAddTakeOperationWithoutConflation END");
}
/**
 * (Add followed by take, with conflation): the add that creates the
 * LastDispatchedAndCurrentEvents object also puts the ThreadIdentifier into
 * the region with its sequence id, for expiry. After a take, expiry on the
 * idle ThreadIdentifier must remove it from both the region and the events
 * map.
 */
@Test
public void testExpiryOnThreadIdentifier() throws Exception
{
  // Exceptions now propagate to JUnit; the old catch-and-fail(" ... " + e)
  // discarded the stack trace.
  HARegionQueueAttributes attrs = new HARegionQueueAttributes();
  attrs.setExpiryTime(2);
  HARegionQueue regionqueue = createHARegionQueue("testing", attrs);
  // create the conflatable object
  EventID id = new EventID(new byte[] { 1 }, 1, 1);
  ConflatableObject obj = new ConflatableObject(KEY1, VALUE1, id, true,
      "region1");
  ThreadIdentifier threadId = new ThreadIdentifier(obj.getEventId()
      .getMembershipID(), obj.getEventId().getThreadID());
  regionqueue.put(obj);
  regionqueue.take();
  // Expiry time is 2s; wait well past it so the idle ThreadIdentifier expires.
  Thread.sleep(25000);
  assertFalse(
      "ThreadIdentifier did not remove itself through expiry.The region queue is of type="
          + regionqueue.getClass(), regionqueue.getRegion().containsKey(
          threadId));
  Map eventsMap = regionqueue.getEventsMapForTesting();
  assertNull(
      "expiry action on ThreadIdentifier did not remove itself from eventsMap",
      eventsMap.get(threadId));
}
/**
 * The add that creates the LastDispatchedAndCurrentEvents object also puts
 * the ThreadIdentifier into the region for expiry. While data from that
 * thread is still queued, expiry must NOT remove the wrapper from the events
 * map (it only resets the expiry). The queued data itself must expire; after
 * that, available IDs, region entries and the counter set are all empty.
 */
@Test
public void testNoExpiryOnThreadIdentifier() throws Exception
{
  // Exceptions now propagate to JUnit; the old catch-and-fail(" ... " + e)
  // discarded the stack trace.
  HARegionQueueAttributes hqa = new HARegionQueueAttributes();
  hqa.setExpiryTime(8);
  HARegionQueue regionqueue = createHARegionQueue("testing", hqa);
  EventID id1 = new EventID(new byte[] { 1 }, 1, 1);
  EventID id2 = new EventID(new byte[] { 1 }, 1, 2);
  ConflatableObject c1 = new ConflatableObject(KEY1, VALUE1, id1, true,
      "region1");
  ConflatableObject c2 = new ConflatableObject(KEY1, VALUE2, id2, true,
      "region1");
  ThreadIdentifier threadId = new ThreadIdentifier(c1.getEventId()
      .getMembershipID(), c1.getEventId().getThreadID());
  regionqueue.put(c1);
  Object o = regionqueue.take();
  assertNotNull(o);
  // wait for some time and put second object
  Thread.sleep(3000);
  regionqueue.put(c2);
  // wait for some more time so that c2 has not expired (expiry time is 8s)
  Thread.sleep(4000);
  Map eventsMap = regionqueue.getEventsMapForTesting();
  // verify that ThreadIdentifier does not remove itself as data is lying
  assertNotNull(
      "ThreadIdentifier removed itself through expiry even though data was lying in the queue",
      eventsMap.get(threadId));
  // wait for some more time to allow expiry on data
  Thread.sleep(16000);
  // After the expiry of the data: available-IDs size should be 0, the entry
  // removed from the region, and the counter set gone.
  assertEquals(0, regionqueue.getRegion().entries(false).size());
  assertEquals(0, regionqueue.getAvalaibleIds().size());
  assertNull(regionqueue.getCurrentCounterSet(id1));
}
/**
 * Multiple QRM arrivals for the same client thread id. The queue initially
 * holds events with sequence ids 1-10. A QRM with last sequence id 5 must
 * remove only events 1-5; a following QRM with id 10 removes the remainder.
 */
@Test
public void testMultipleQRMArrival() throws Exception
{
  // Fixed copy-paste: the queue is now named after this test
  // (was "testNoExpiryOnThreadIdentifier").
  HARegionQueue regionqueue = createHARegionQueue("testMultipleQRMArrival");
  EventID[] ids = new EventID[10];
  for (int i = 0; i < 10; i++) {
    ids[i] = new EventID(new byte[] { 1 }, 1, i + 1);
  }
  for (int i = 0; i < 10; i++) {
    regionqueue.put(new ConflatableObject("KEY " + i, "VALUE" + i, ids[i],
        true, "region1"));
  }
  // Available-id size should be 10 after putting ten entries.
  assertEquals(10, regionqueue.getAvalaibleIds().size());
  // QRM message for thread id 1 with last sequence id 5.
  regionqueue.removeDispatchedEvents(ids[4]);
  assertEquals(5, regionqueue.getAvalaibleIds().size());
  assertEquals(5, regionqueue.getCurrentCounterSet(ids[0]).size());
  // Every surviving event must have a sequence id greater than 5.
  Iterator iter = regionqueue.getCurrentCounterSet(ids[0]).iterator();
  while (iter.hasNext()) {
    Long cntr = (Long)iter.next();
    ConflatableObject co = (ConflatableObject)regionqueue.getRegion().get(
        cntr);
    assertTrue(co.getEventId().getSequenceID() > 5);
  }
  regionqueue.removeDispatchedEvents(ids[9]);
  assertEquals(0, regionqueue.getAvalaibleIds().size());
}
/**
 * Concurrent arrival of put & QRM message for a ThreadIdentifier coming for
 * the 1st time. The LastDispatchedObject gets operated first by the put thread;
 * the QRM thread should be blocked till the completion of the add operation. Thus
 * before the QRM thread acts, the object should be present in the
 * lastDispatchedSet & AvailableID. Then the QRM thread gets unblocked and
 * should remove from the available ID.
 */
@Test
public void testConcurrentPutAndQRM() throws Exception
{
testFailed = false;
message = new StringBuffer();
final HARegionQueue regionqueue = createHARegionQueue("testConcurrentPutAndQRM");
final EventID id1 = new EventID(new byte[] { 1 }, 1, 1);
final EventID id2 = new EventID(new byte[] { 1 }, 1, 2);
// t1 puts both events for the thread identifier.
Thread t1 = new Thread() {
public void run()
{
try {
regionqueue.put(new ConflatableObject(KEY1, VALUE1, id1, true,
"region1"));
regionqueue.put(new ConflatableObject(KEY2, VALUE2, id2, true,
"region1"));
}
catch (Exception e) {
message.append("Put in region queue failed");
testFailed = true;
}
}
};
// Boost the put thread so it tends to win the race against the QRM thread.
t1.setPriority(Thread.MAX_PRIORITY);
// t2 delivers the QRM (queue-removal message) up to sequence id 2.
Thread t2 = new Thread() {
public void run()
{
try {
regionqueue.removeDispatchedEvents(id2);
}
catch (Exception e) {
message.append("Removal by QRM in region queue failed");
testFailed = true;
}
}
};
t1.start();
t2.start();
DistributedTestCase.join(t1, 180 * 1000, null);
DistributedTestCase.join(t2, 180 * 1000, null);
if (testFailed)
fail("Test failed due to " + message);
// Whatever the interleaving, both events end up dispatched: nothing available,
// and the recorded last-dispatched sequence id is 2.
assertEquals(0, regionqueue.getAvalaibleIds().size());
assertEquals(2, regionqueue.getLastDispatchedSequenceId(id2));
}
/**
 * Concurrent arrival of a put & QRM message for a ThreadIdentifier coming for
 * the 1st time, with the QRM thread acting first: the put thread should see
 * that the last dispatched sequence is >= its own and remove the event from
 * the region without adding its ID anywhere.
 */
@Test
public void testConcurrentQRMAndPut() throws Exception
{
  testFailed = false;
  // Bug fix: 'message' was never initialized in this test, so a failure in
  // either worker thread NPE'd on message.append(...) instead of being recorded.
  message = new StringBuffer();
  final HARegionQueue regionqueue = createHARegionQueue("testConcurrentQRMAndPut");
  final EventID id1 = new EventID(new byte[] { 1 }, 1, 1);
  final EventID id2 = new EventID(new byte[] { 1 }, 1, 2);
  // t1 puts both events for the thread identifier.
  Thread t1 = new Thread() {
    public void run()
    {
      try {
        regionqueue.put(new ConflatableObject(KEY1, VALUE1, id1, true,
            "region1"));
        regionqueue.put(new ConflatableObject(KEY2, VALUE2, id2, true,
            "region1"));
      }
      catch (Exception e) {
        message.append("Put in region queue failed");
        testFailed = true;
      }
    }
  };
  // t2 delivers the QRM up to sequence id 2.
  Thread t2 = new Thread() {
    public void run()
    {
      try {
        regionqueue.removeDispatchedEvents(id2);
      }
      catch (Exception e) {
        message.append("Removal of Events by QRM in Region queue failed");
        testFailed = true;
      }
    }
  };
  // Boost the QRM thread so it tends to act before the puts.
  t2.setPriority(Thread.MAX_PRIORITY);
  t2.start();
  t1.start();
  DistributedTestCase.join(t1, 180 * 1000, null);
  DistributedTestCase.join(t2, 180 * 1000, null);
  if (testFailed)
    fail("Test failed due to " + message);
  // Either way, both events end up dispatched.
  assertEquals(0, regionqueue.getAvalaibleIds().size());
  assertEquals(2, regionqueue.getLastDispatchedSequenceId(id2));
}
/**
 * Two QRMs arriving out of order: the higher sequence number arrives before
 * the lower one. The lower sequence number must neither overwrite the
 * recorded id nor touch the IDs in the LinkedHashSet.
 *
 * @throws Exception on failure
 */
@Test
public void testEventMapPopulationForQRM() throws Exception
{
  HARegionQueue regionqueue = createHARegionQueue("testEventMapPopulationForQRM");
  EventID id1 = new EventID(new byte[] { 1 }, 1, 1);
  EventID id2 = new EventID(new byte[] { 1 }, 1, 2);
  this.logWriter.info("RemoveDispatched event for sequence id : "
      + id2.getSequenceID());
  regionqueue.removeDispatchedEvents(id2);
  this.logWriter.info("RemoveDispatched event for sequence id :"
      + id1.getSequenceID());
  // The stale (lower) QRM must be ignored.
  regionqueue.removeDispatchedEvents(id1);
  // Fixed argument order: JUnit's assertEquals is (message, expected, actual);
  // the original passed (message, actual, expected), producing misleading
  // failure output.
  assertEquals("Size of eventMap should be 1 but actual size "
      + regionqueue.getEventsMapForTesting(), 1, regionqueue
      .getEventsMapForTesting().size());
  this.logWriter.info("sequence id : "
      + regionqueue.getLastDispatchedSequenceId(id2));
  assertEquals(
      "Last dispatched sequence id should be 2 but actual sequence id is ",
      id2.getSequenceID(), regionqueue.getLastDispatchedSequenceId(id2));
  this.logWriter.info("testEventMapPopulationForQRM() completed successfully");
}
/**
 * Concurrent puts on different ThreadIdentifiers but the same key, with
 * conflation. Exactly one event survives: its counter must be present in the
 * available IDs, the region, the conflation map and its thread's counter set.
 * All conflated-away events must be absent from those structures; the
 * conflation map holds the surviving event's position.
 *
 * @throws Exception on failure
 */
@Test
public void testCleanUpForConflation() throws Exception
{
  this.logWriter
      .info("HARQAddOperationJUnitTest : testCleanUpForConflation BEGIN");
  testFailed = false;
  // Bug fix: 'message' was set to null here, so a put failure NPE'd on
  // message.append(...) in the worker's catch block instead of being recorded.
  message = new StringBuffer();
  final int numOfThreads = 10;
  final int numOfPuts = 567;
  final HARegionQueue regionqueue = createHARegionQueue("testCleanUpForConflation");
  this.logWriter
      .info("HARQAddOperationJUnitTest : testCleanUpForConflation after regionqueue create");
  // Concurrent put operations on different threadIDs, all for the same key.
  Thread[] threads = new Thread[numOfThreads];
  for (int i = 0; i < numOfThreads; i++) {
    final long ids = i;
    threads[i] = new Thread() {
      public void run()
      {
        for (int j = 0; j < numOfPuts; j++) {
          EventID id = new EventID(new byte[] { (byte)ids }, ids, j);
          try {
            regionqueue.put(new ConflatableObject(KEY1, id.getThreadID()
                + "VALUE" + j, id, true, "region1"));
          }
          catch (Exception ex) {
            testFailed = true;
            message.append("put failed for the threadId " + id.getThreadID());
          }
        }
      }
    };
  }
  for (int k = 0; k < numOfThreads; k++) {
    threads[k].start();
  }
  for (int k = 0; k < numOfThreads; k++) {
    DistributedTestCase.join(threads[k], 180 * 1000, null);
  }
  this.logWriter
      .info("HARQAddOperationJUnitTest : testCleanUpForConflation after join");
  if (testFailed)
    fail("Test failed due to " + message);
  assertEquals("size of the conflation map should be 1 but actual size is "
      + regionqueue.getConflationMapForTesting().size(), 1, regionqueue
      .getConflationMapForTesting().size());
  assertEquals("size of the event map should be " + numOfThreads
      + " but actual size " + regionqueue.getEventsMapForTesting().size(),
      numOfThreads, regionqueue.getEventsMapForTesting().size());
  assertEquals("size of availableids should 1 but actual size "
      + regionqueue.getAvalaibleIds().size(), 1, regionqueue
      .getAvalaibleIds().size());
  // Exactly one thread's counter set should still hold a counter.
  int count = 0;
  for (int i = 0; i < numOfThreads; i++) {
    if ((regionqueue.getCurrentCounterSet(new EventID(new byte[] { (byte)i },
        i, i))).size() > 0) {
      count++;
    }
  }
  assertEquals("size of the counter set is 1 but the actual size is "
      + count, 1, count);
  // The surviving event's counter must be the single one in its thread's set.
  Long position = null;
  if (regionqueue.getAvalaibleIds().size() == 1) {
    position = (Long)regionqueue.getAvalaibleIds().iterator().next();
  }
  ConflatableObject id = (ConflatableObject)regionqueue.getRegion().get(
      position);
  assertEquals(regionqueue.getCurrentCounterSet(id.getEventId()).size(), 1);
  this.logWriter
      .info("HARQAddOperationJUnitTest : testCleanUpForConflation END");
}
/**
 * 5 separate threads do 4 puts each (conflation disabled). A single thread
 * then peeks all 20 objects and invokes remove(), which must delete the
 * entries from the available IDs and from the counter sets contained in each
 * DACE.
 */
@Test
public void testPeekAndRemoveWithoutConflation() throws Exception
{
  testFailed = false;
  // Bug fix: 'message' was set to null here, so a put failure NPE'd on
  // message.append(...) in the worker's catch block instead of being recorded.
  message = new StringBuffer();
  final int numOfThreads = 5;
  final int numOfPuts = 4;
  final int batchSize = 20;
  final HARegionQueue regionqueue = createHARegionQueue("testPeekAndRemoveWithoutConflation");
  Thread[] threads = new Thread[numOfThreads];
  for (int i = 0; i < numOfThreads; i++) {
    final long ids = i;
    threads[i] = new Thread() {
      public void run()
      {
        for (int j = 0; j < numOfPuts; j++) {
          EventID id = new EventID(new byte[] { (byte)ids }, ids, j);
          try {
            regionqueue.put(new ConflatableObject(
                KEY1 + id.getThreadID() + j, id.getThreadID() + "VALUE" + j,
                id, false, "region1"));
          }
          catch (Exception ex) {
            testFailed = true;
            message.append("put failed for the threadId " + id.getThreadID());
          }
        }
      }
    };
  }
  for (int k = 0; k < numOfThreads; k++) {
    threads[k].start();
  }
  for (int k = 0; k < numOfThreads; k++) {
    DistributedTestCase.join(threads[k], 180 * 1000, null);
  }
  if (testFailed)
    fail("Test failed due to " + message);
  List pickObjects = regionqueue.peek(batchSize);
  assertEquals(batchSize, pickObjects.size());
  regionqueue.remove();
  for (int i = 0; i < numOfThreads; i++) {
    // Each thread's last dispatched sequence id is its final put (j == 3).
    assertEquals(3, regionqueue.getLastDispatchedSequenceId(new EventID(
        new byte[] { (byte)i }, i, 1)));
    assertEquals(0, regionqueue.getCurrentCounterSet(
        new EventID(new byte[] { (byte)i }, i, 1)).size());
  }
  assertEquals(0, regionqueue.getAvalaibleIds().size());
  this.logWriter
      .info("testPeekAndRemoveWithoutConflation() completed successfully");
}
/**
 * 5 separate threads do 4 puts each with conflation enabled on a per-thread
 * key, so each thread's 4 puts conflate down to a single event. A peek of the
 * full batch size must therefore return only numOfThreads objects, and
 * remove() must clear the available IDs, the DACE counter sets and the
 * per-region conflation map.
 */
@Test
public void testPeekAndRemoveWithConflation() throws Exception
{
  testFailed = false;
  // Bug fix: 'message' was set to null here, so a put failure NPE'd on
  // message.append(...) in the worker's catch block instead of being recorded.
  message = new StringBuffer();
  final int numOfThreads = 5;
  final int numOfPuts = 4;
  final int batchSize = numOfThreads * numOfPuts;
  final HARegionQueue regionqueue = createHARegionQueue("testPeekAndRemoveWithConflation");
  Thread[] threads = new Thread[numOfThreads];
  for (int i = 0; i < numOfThreads; i++) {
    final long ids = i;
    threads[i] = new Thread() {
      public void run()
      {
        for (int j = 0; j < numOfPuts; j++) {
          EventID id = new EventID(new byte[] { (byte)ids }, ids, j);
          try {
            regionqueue.put(new ConflatableObject(KEY1 + ids, id
                .getThreadID()
                + "VALUE" + j, id, true, "region1"));
          }
          catch (Exception ex) {
            testFailed = true;
            message.append("put failed for the threadId " + id.getThreadID());
          }
        }
      }
    };
  }
  for (int k = 0; k < numOfThreads; k++) {
    threads[k].start();
  }
  for (int k = 0; k < numOfThreads; k++) {
    DistributedTestCase.join(threads[k], 180 * 1000, null);
  }
  if (testFailed)
    fail("Test failed due to " + message);
  // Conflation leaves one event per thread, not batchSize events.
  List pickObject = regionqueue.peek(batchSize);
  assertEquals(numOfThreads, pickObject.size());
  regionqueue.remove();
  for (int i = 0; i < numOfThreads; i++) {
    assertEquals(0, regionqueue.getCurrentCounterSet(
        new EventID(new byte[] { (byte)i }, i, 1)).size());
  }
  assertEquals("size of availableIds map should be 0 ", 0, regionqueue
      .getAvalaibleIds().size());
  assertEquals("size of conflation map should be 0 ", 0, ((Map)regionqueue
      .getConflationMapForTesting().get("region1")).size());
  this.logWriter
      .info("testPeekAndRemoveWithConflation() completed successfully");
}
/**
 * 5 separate threads do 4 puts each. 4 threads then concurrently peek batch
 * sizes 5, 10, 15 & 20 respectively, rendezvous on an ad-hoc barrier, and all
 * call remove(). Together the removes must delete every entry from the
 * available IDs and the counter sets contained in each DACE.
 *
 * @throws Exception on failure
 */
@Test
public void testPeekForDiffBatchSizeAndRemoveAll() throws Exception
{
  testFailed = false;
  // Bug fix: 'message' was set to null here (and again before the peek phase),
  // so a worker failure NPE'd on message.append(...) instead of being recorded.
  message = new StringBuffer();
  barrierCount = 0;
  final int numOfThreads = 5;
  final int numOfPuts = 4;
  final HARegionQueue regionqueue = createHARegionQueue("testPeekForDiffBatchSizeAndRemoveAll");
  Thread[] threads = new Thread[numOfThreads];
  for (int i = 0; i < numOfThreads; i++) {
    final long ids = i;
    threads[i] = new Thread() {
      public void run()
      {
        for (int j = 0; j < numOfPuts; j++) {
          EventID id = new EventID(new byte[] { (byte)ids }, ids, j);
          try {
            regionqueue.put(new ConflatableObject(
                KEY1 + id.getThreadID() + j, id.getThreadID() + "VALUE" + j,
                id, false, "region1"));
          }
          catch (Exception ex) {
            testFailed = true;
            message.append("put failed for the threadId " + id.getThreadID());
          }
        }
      }
    };
  }
  for (int k = 0; k < numOfThreads; k++) {
    threads[k].start();
  }
  for (int k = 0; k < numOfThreads; k++) {
    DistributedTestCase.join(threads[k], 180 * 1000, null);
  }
  if (testFailed)
    fail("Test failed due to " + message);
  testFailed = false;
  message = new StringBuffer();
  Thread[] threads_peek_remove = new Thread[numOfPuts];
  for (int i = 1; i < (numOfPuts + 1); i++) {
    final int peakBatchSize = i * 5;
    threads_peek_remove[i - 1] = new Thread() {
      public void run()
      {
        try {
          List peakObjects = regionqueue.peek(peakBatchSize);
          assertEquals(peakBatchSize, peakObjects.size());
          // Ad-hoc barrier: the last of the 4 peekers releases the rest so
          // all remove() calls race each other.
          // NOTE(review): wait() is not guarded by a loop, so a spurious
          // wakeup could release a thread early - a CountDownLatch would be
          // more robust.
          synchronized (HARQAddOperationJUnitTest.this) {
            ++barrierCount;
            if (barrierCount == 4) {
              HARQAddOperationJUnitTest.this.notifyAll();
            }
            else {
              HARQAddOperationJUnitTest.this.wait();
            }
          }
          regionqueue.remove();
        }
        catch (Exception ex) {
          testFailed = true;
          ex.printStackTrace();
          message.append("Exception while performing peak operation "
              + ex.getStackTrace());
        }
      }
    };
  }
  for (int k = 0; k < numOfPuts; k++) {
    threads_peek_remove[k].start();
  }
  for (int k = 0; k < numOfPuts; k++) {
    DistributedTestCase.join(threads_peek_remove[k], 180 * 1000, null);
  }
  if (testFailed)
    fail("Test failed due to " + message);
  for (int i = 0; i < numOfThreads; i++) {
    // The batch-20 peeker saw every event, so each thread's last dispatched
    // sequence id is its final put (j == 3) and its counter set is empty.
    assertEquals(3, regionqueue.getLastDispatchedSequenceId(new EventID(
        new byte[] { (byte)i }, i, 1)));
    assertEquals(0, regionqueue.getCurrentCounterSet(
        new EventID(new byte[] { (byte)i }, i, 1)).size());
  }
  assertEquals(0, regionqueue.getAvalaibleIds().size());
  this.logWriter
      .info("testPeekForDiffBatchSizeAndRemoveAll() completed successfully");
}
/**
 * 5 separate threads do 4 puts each. 3 threads then concurrently peek batch
 * sizes 5, 10 and 15 respectively, rendezvous on an ad-hoc barrier, and all
 * call remove(). Only 15 of the 20 entries are covered by a peek, so 5
 * available IDs must remain afterwards.
 *
 * @throws Exception on failure
 */
@Test
public void testPeekForDiffBatchSizeAndRemoveSome() throws Exception
{
  testFailed = false;
  barrierCount = 0;
  // Bug fix: 'message' was set to null here (and again before the peek phase),
  // so a worker failure NPE'd on message.append(...) instead of being recorded.
  message = new StringBuffer();
  final int numOfThreads = 5;
  final int numOfPuts = 4;
  final HARegionQueue regionqueue = createHARegionQueue("testPeekForDiffBatchSizeAndRemoveSome");
  Thread[] threads = new Thread[numOfThreads];
  for (int i = 0; i < numOfThreads; i++) {
    final long ids = i;
    threads[i] = new Thread() {
      public void run()
      {
        for (int j = 0; j < numOfPuts; j++) {
          EventID id = new EventID(new byte[] { (byte)ids }, ids, j);
          try {
            regionqueue.put(new ConflatableObject(
                KEY1 + id.getThreadID() + j, id.getThreadID() + "VALUE" + j,
                id, false, "region1"));
          }
          catch (Exception ex) {
            testFailed = true;
            message.append("put failed for the threadId " + id.getThreadID());
          }
        }
      }
    };
  }
  for (int k = 0; k < numOfThreads; k++) {
    threads[k].start();
  }
  for (int k = 0; k < numOfThreads; k++) {
    DistributedTestCase.join(threads[k], 180 * 1000, null);
  }
  if (testFailed)
    fail("Test failed due to " + message);
  testFailed = false;
  message = new StringBuffer();
  Thread[] threads_peek_remove = new Thread[numOfPuts - 1];
  for (int i = 1; i < numOfPuts; i++) {
    final int peakBatchSize = i * 5;
    threads_peek_remove[i - 1] = new Thread() {
      public void run()
      {
        try {
          List peakObjects = regionqueue.peek(peakBatchSize);
          assertEquals(peakBatchSize, peakObjects.size());
          // Ad-hoc barrier: the last of the 3 peekers releases the rest so
          // all remove() calls race each other.
          // NOTE(review): wait() is not guarded by a loop, so a spurious
          // wakeup could release a thread early - a CountDownLatch would be
          // more robust.
          synchronized (HARQAddOperationJUnitTest.this) {
            ++barrierCount;
            if (barrierCount == 3) {
              HARQAddOperationJUnitTest.this.notifyAll();
            }
            else {
              HARQAddOperationJUnitTest.this.wait();
            }
          }
          regionqueue.remove();
        }
        catch (Exception ex) {
          testFailed = true;
          ex.printStackTrace();
          message.append("Exception while performing peak operation "
              + ex.getStackTrace());
        }
      }
    };
  }
  for (int k = 0; k < numOfPuts - 1; k++) {
    threads_peek_remove[k].start();
  }
  for (int k = 0; k < numOfPuts - 1; k++) {
    DistributedTestCase.join(threads_peek_remove[k], 180 * 1000, null);
  }
  if (testFailed)
    fail("Test failed due to " + message);
  // 15 of the 20 entries were peeked & removed; 5 must remain available.
  assertEquals(5, regionqueue.getAvalaibleIds().size());
  this.logWriter
      .info("testPeekForDiffBatchSizeAndRemoveSome() completed successfully");
}
/**
 * Add with QRM & expiry: add 10 conflatable objects (sequence ids 0-9), send
 * a QRM with last-dispatched id 4 and verify the ThreadIdentifier's recorded
 * sequence. Take the remaining 5 events so the sequence becomes 9. The first
 * expiry pass must NOT remove the wrapper (the originally recorded sequence
 * -1 no longer matches 9, so it only resets to 9); the second pass removes
 * the LastDispatchedWrapper entirely.
 */
@Test
public void testAddWithQRMAndExpiry() throws Exception
{
  // Exceptions now propagate to JUnit; the old catch-and-fail("... " + e)
  // discarded the stack trace (and had a typo "occured").
  HARegionQueueAttributes attrs = new HARegionQueueAttributes();
  attrs.setExpiryTime(10);
  // Test-only queue whose expiry listener signals this test each time a
  // ThreadIdentifier entry is expired.
  final HARegionQueue regionqueue = new HARegionQueue.TestOnlyHARegionQueue("testing", cache, attrs) {
    CacheListener createCacheListenerForHARegion()
    {
      return new CacheListenerAdapter() {
        public void afterInvalidate(EntryEvent event) {
          try {
            expireTheEventOrThreadIdentifier(event);
          }
          catch (CacheException ce) {
            logger.error("HAREgionQueue::createCacheListener::Exception in the expiry thread", ce);
          }
          if (event.getKey() instanceof ThreadIdentifier) {
            synchronized (HARQAddOperationJUnitTest.this) {
              expiryCount++;
              HARQAddOperationJUnitTest.this.notify();
            }
          }
        }
      };
    }
  };
  Conflatable[] cf = new Conflatable[10];
  // put 10 conflatable objects
  for (int i = 0; i < 10; i++) {
    cf[i] = new ConflatableObject("key" + i, "value", new EventID(
        new byte[] { 1 }, 1, i), true, "testing");
    regionqueue.put(cf[i]);
  }
  ThreadIdentifier tID = new ThreadIdentifier(new byte[] { 1 }, 1);
  // verify that the sequence-id for Thread-identifier is -1 (default value).
  assertEquals(Long.valueOf(-1), regionqueue.getRegion().get(tID));
  // remove the first 5 - (0-4 sequence IDs)
  regionqueue.removeDispatchedEvents(new EventID(new byte[] { 1 }, 1, 4));
  // verify that the last dispatched event was of sequence id 4
  assertEquals(4, regionqueue.getLastDispatchedSequenceId(new EventID(new byte[] { 1 }, 1, 1)));
  // verify counters 1-5 are gone from the region
  for (long i = 1; i < 6; i++) {
    assertTrue(!regionqueue.getRegion().containsKey(Long.valueOf(i)));
  }
  // verify counters 6-10 are still in the region queue
  for (long i = 6; i < 11; i++) {
    assertTrue(regionqueue.getRegion().containsKey(Long.valueOf(i)));
  }
  // Perform 5 take operations to remove the 5-9 sequence ids
  for (long i = 6; i < 11; i++) {
    regionqueue.take();
  }
  // verify that the last dispatched event was of sequence id 9
  assertEquals(9, regionqueue.getLastDispatchedSequenceId(new EventID(new byte[] { 1 }, 1, 1)));
  // verify that sequence ids 1-10 all are removed from the RQ
  for (long i = 1; i < 11; i++) {
    assertTrue(!regionqueue.getRegion().containsKey(Long.valueOf(i)));
  }
  // wait until the expiry thread has run once
  synchronized (HARQAddOperationJUnitTest.this) {
    if (0 == expiryCount) {
      HARQAddOperationJUnitTest.this.wait();
    }
    if (1 == expiryCount) {
      // first pass only resets the wrapper: still present, sequence now 9
      assertEquals(1, regionqueue.getEventsMapForTesting().size());
      assertEquals(Long.valueOf(9), regionqueue.getRegion().get(tID));
      // wait until the expiry thread has run again
      HARQAddOperationJUnitTest.this.wait();
    }
  }
  // the second pass removes the Thread-identifier entirely
  assertEquals(0, regionqueue.getEventsMapForTesting().size());
  assertNull(regionqueue.getRegion().get(tID));
}
/**
 * This test does the following:<br>
 * 1)Create a blocking HARegionQueue<br>
 * 2)Add some events to the queue with same ThreadIdentifier<br>
 * 3)Do take() operations to drain the queue<br>
 * 4)Verify that dispatchedMessagesMap is not null<br>
 * 5)Verify that size of the dispatchedMessagesMap is 1 as one regionqueue is
 * created in this test<br>
 * 6)Verify that the map contains an entry for the queue-region name<br>
 * 7)Verify that the size of wrapper-map is 1 as all events had same ThreadId<br>
 * 8)Verify that the sequenceId against the ThreadId in the wrapper-map is
 * same as that of the last event taken<br>
 *
 * @throws Exception -
 *           thrown if any exception occurs in test execution
 */
/**
 * Behaviour of take() has been changed for reliable messaging feature. Region queue take()
 * operation will no longer add to the Dispatch Message Map. Hence disabling the test - SUYOG
 */
// Disabled: the leading underscore and the missing @Test annotation keep JUnit from running it.
public void _testDispatchedMsgsMapUpdateOnTakes() throws Exception
{
this.logWriter
.info("HARQAddOperationJUnitTest : testDispatchedEventsMapUpdateOnTakes BEGIN");
String regionName = "testDispatchedEventsMapUpdateOnTakes";
HARegionQueue rq = createHARegionQueue(regionName);
Conflatable cf = null;
EventID id = null;
int totalEvents = 10;
// put 10 non-conflatable events, all from the same thread (thread id 1)
for (int i = 0; i < totalEvents; i++) {
id = new EventID(new byte[] { 1 }, 1, i);
cf = new ConflatableObject("key" + i, "value" + i, id, false, "testing");
rq.put(cf);
}
// drain the queue via take()
for (int i = 0; i < totalEvents; i++) {
rq.take();
}
Map dispatchedMsgMap = HARegionQueue.getDispatchedMessagesMapForTesting();
// verify that map is not null
assertNotNull("dispatchedMessagesMap found null", dispatchedMsgMap);
// size of the dispatchedMessagesMap should be 1 as one regionqueue is
// created in this test
assertEquals("size of dispatched msgs should be 1", 1, dispatchedMsgMap
.size());
// verify that the map contains an entry for the queue-region name
MapWrapper wrapper = (MapWrapper)dispatchedMsgMap.get(regionName);
assertNotNull(
"dispatchedMsgMap should contain an entry with queueregion name as key",
wrapper);
Map dispatchedData = wrapper.map;
assertEquals(
"size of wrapper-map should be 1 as all events had same ThreadId", 1,
dispatchedData.size());
// 'id' still holds the EventID of the last event put (and taken)
ThreadIdentifier tid = new ThreadIdentifier(new byte[] { 1 }, 1);
Long seqId = (Long)dispatchedData.get(tid);
assertEquals(
"sequenceId against the ThreadId in the wrapper-map should be that of the last event taken.",
id.getSequenceID(), seqId.longValue());
this.logWriter
.info("HARQAddOperationJUnitTest : testDispatchedEventsMapUpdateOnTakes END");
}
}