// blob: d706a49379bed0ee351799783392cdc28da88586 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.ha;
import static java.lang.Thread.yield;
import static org.apache.geode.internal.cache.ha.HARegionQueue.NON_BLOCKING_HA_QUEUE;
import static org.apache.geode.internal.cache.ha.HARegionQueue.getHARegionQueueInstance;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.Assert.assertEquals;
import static org.apache.geode.test.dunit.Assert.assertNotNull;
import static org.apache.geode.test.dunit.Assert.assertNull;
import static org.apache.geode.test.dunit.Assert.assertTrue;
import static org.apache.geode.test.dunit.Assert.fail;
import static org.apache.geode.test.dunit.ThreadUtils.join;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.AdditionalAnswers;

import org.apache.geode.LogWriter;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.HARegion;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
/**
 * Distributed test that exercises HARegion and HARegionQueue across up to four dunit VMs.
 */
public class HARegionQueueDUnitTest extends JUnit4DistributedTestCase {
// Run flag: the cache listeners and RunOp loops in this file only keep working while it is true.
private static volatile boolean toCnt = true;
// Thread that ran the "CreateCache, HARegionQueue " runnable; other runnables wait for and join it.
private static volatile Thread createQueuesThread;
// Per-JVM cache, assigned by the static helper methods and cleared in preTearDown.
private static InternalCache cache = null;
// Per-JVM queue under test, assigned by the static helper methods and cleared in preTearDown.
private static HARegionQueue hrq = null;
// Worker threads started by the concurrency tests; cleared in preTearDown.
private static Thread[] opThreads;
// Handles to the four dunit VMs, resolved in postSetUp.
private VM vm0 = null;
private VM vm1 = null;
private VM vm3 = null;
private VM vm2 = null;
* get the VM's
public final void postSetUp() throws Exception {
final Host host = Host.getHost(0);
vm0 = host.getVM(0);
vm1 = host.getVM(1);
vm2 = host.getVM(2);
vm3 = host.getVM(3);
vm0.invoke(() -> HARegionQueueDUnitTest.toCnt = true);
vm1.invoke(() -> HARegionQueueDUnitTest.toCnt = true);
vm2.invoke(() -> HARegionQueueDUnitTest.toCnt = true);
vm3.invoke(() -> HARegionQueueDUnitTest.toCnt = true);
* close the cache in tearDown
public final void preTearDown() throws Exception {
vm0.invoke(() -> HARegionQueueDUnitTest.closeCache());
vm1.invoke(() -> HARegionQueueDUnitTest.closeCache());
vm2.invoke(() -> HARegionQueueDUnitTest.closeCache());
vm3.invoke(() -> HARegionQueueDUnitTest.closeCache());
cache = null;
hrq = null;
opThreads = null;
* create cache
private InternalCache createCache() throws CacheException {
Properties props = new Properties();
DistributedSystem ds = getSystem(props);
ds = getSystem(props);
InternalCache cache = null;
cache = (InternalCache) CacheFactory.create(ds);
if (cache == null) {
// TODO: never throw an anonymous inner class
throw new CacheException("CacheFactory.create() returned null ") {};
return cache;
* 1) Create mirrored HARegion region1 in VM1 and VM2 2) do a put in VM1 3) assert that the put
* has not propagated from VM1 to VM2 4) do a put in VM2 5) assert that the value in VM1 has not
* changed to due to put in VM2 6) assert put in VM2 was successful by doing a get
public void testLocalPut() throws Exception {
vm0.invoke(() -> HARegionQueueDUnitTest.createRegion());
vm1.invoke(() -> HARegionQueueDUnitTest.createRegion());
vm0.invoke(() -> HARegionQueueDUnitTest.putValue1());
vm1.invoke(() -> HARegionQueueDUnitTest.getNull());
vm1.invoke(() -> HARegionQueueDUnitTest.putValue2());
vm0.invoke(() -> HARegionQueueDUnitTest.getValue1());
vm1.invoke(() -> HARegionQueueDUnitTest.getValue2());
* 1) Create mirrored HARegion region1 in VM1 and VM2 2) do a put in VM1 3) assert that the put
* has not propagated from VM1 to VM2 4) do a put in VM2 5) assert that the value in VM1 has not
* changed to due to put in VM2 6) assert respective puts the VMs were successful by doing a get
* 7) localDestroy key in VM1 8) assert key has been destroyed in VM1 9) assert key has not been
* destroyed in VM2
public void testLocalDestroy() throws Exception {
vm0.invoke(() -> HARegionQueueDUnitTest.createRegion());
vm1.invoke(() -> HARegionQueueDUnitTest.createRegion());
vm0.invoke(() -> HARegionQueueDUnitTest.putValue1());
vm1.invoke(() -> HARegionQueueDUnitTest.getNull());
vm1.invoke(() -> HARegionQueueDUnitTest.putValue2());
vm0.invoke(() -> HARegionQueueDUnitTest.getValue1());
vm1.invoke(() -> HARegionQueueDUnitTest.getValue2());
vm0.invoke(() -> HARegionQueueDUnitTest.destroy());
vm0.invoke(() -> HARegionQueueDUnitTest.getNull());
vm1.invoke(() -> HARegionQueueDUnitTest.getValue2());
* 1) Create mirrored HARegion region1 in VM1 2) do a put in VM1 3) get the value in VM1 to assert
* put has happened successfully 4) Create mirrored HARegion region1 in VM2 5) do a get in VM2 to
* verify that value was got through GII 6) do a put in VM2 7) assert put in VM2 was successful
public void testGII() throws Exception {
vm0.invoke(() -> HARegionQueueDUnitTest.createRegion());
vm0.invoke(() -> HARegionQueueDUnitTest.putValue1());
vm0.invoke(() -> HARegionQueueDUnitTest.getValue1());
vm1.invoke(() -> HARegionQueueDUnitTest.createRegion());
vm1.invoke(() -> HARegionQueueDUnitTest.getValue1());
vm1.invoke(() -> HARegionQueueDUnitTest.putValue2());
vm1.invoke(() -> HARegionQueueDUnitTest.getValue2());
* 1) Create mirrored HARegion region1 in VM1 2) do a put in VM1 3) get the value in VM1 to assert
* put has happened successfully 4) Create mirrored HARegion region1 in VM2 5) do a get in VM2 to
* verify that value was got through GII 6) do a put in VM2 7) assert put in VM2 was successful
public void testQRM() throws Exception {
vm0.invoke(() -> HARegionQueueDUnitTest.createRegionQueue());
vm1.invoke(() -> HARegionQueueDUnitTest.createRegionQueue());
vm0.invoke(() -> HARegionQueueDUnitTest.verifyAddingDispatchMesgs());
vm1.invoke(() -> HARegionQueueDUnitTest.verifyDispatchedMessagesRemoved());
/**
 * Behaviour of take() has been changed for the reliable messaging feature. The region queue take()
 * operation will no longer add to the Dispatch Message Map. Hence disabling the test - SUYOG
 *
 * Test for #35988: HARegionQueue.take() is not functioning as expected
 */
@Ignore("TODO: this test was disabled")
public void testBugNo35988() throws Exception {
// Runnable that creates the cache and a non-blocking queue and enqueues a single event.
CacheSerializableRunnable createQueue =
new CacheSerializableRunnable("CreateCache, HARegionQueue and start thread") {
public void run2() throws CacheException {
HARegionQueueDUnitTest test = new HARegionQueueDUnitTest();
// TODO:ASIF: Because the QRM thread cannot run at a frequency below
// 1 second, we need to carefully evaluate what to do. Though
// in this case 1 second instead of 500 ms will work
// System.getProperties().put("QueueRemovalThreadWaitTime", new Long(500));
cache = test.createCache();
HARegionQueueAttributes hrqa = new HARegionQueueAttributes();
try {
hrq = HARegionQueue.getHARegionQueueInstance("testregion1", cache, hrqa,
HARegionQueue.NON_BLOCKING_HA_QUEUE, false);
// Enqueue one event (key 1, value 1, sequence id 1) for the take below
hrq.put(new ConflatableObject(new Long(1), new Long(1),
new EventID(new byte[] {0}, 1, 1), false, "dummy"));
} catch (Exception e) {
throw new AssertionError(e);
// NOTE(review): closing braces and any invocation of createQueue are not visible at this point;
// confirm against the complete file.
// vm0 removes the event with take(); the returned object is not inspected.
vm0.invoke(new CacheSerializableRunnable("takeFromVm0") {
public void run2() throws CacheException {
try {
Conflatable obj = (Conflatable) hrq.take();
} catch (Exception e) {
throw new AssertionError(e);
// vm1 defines a criterion that is satisfied once its queue reports size 0.
// NOTE(review): nothing visible here waits on the criterion - confirm against the complete file.
vm1.invoke(new CacheSerializableRunnable("checkInVm1") {
public void run2() throws CacheException {
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
yield(); // TODO is this necessary?
return hrq.size() == 0;
public String description() {
return null;
/** Create a client with 2 regions sharing a common writer. */
private static void createRegion() throws Exception {
// Creates this VM's cache and then sets up the HARegion "HARegionQueueDUnitTest_region" on top of
// a mocked HARegionQueue.
HARegionQueueDUnitTest test = new HARegionQueueDUnitTest();
cache = test.createCache();
AttributesFactory factory = new AttributesFactory();
// Mock the HARegionQueue and answer the input CachedDeserializable when updateHAEventWrapper is
// called
HARegionQueue harq = mock(HARegionQueue.class);
// NOTE(review): the stubbing below has no answer clause and no terminator in this copy of the
// file - confirm against the complete source.
when(harq.updateHAEventWrapper(any(), any(), any()))
// NOTE(review): the argument list below is cut off after 'harq' - confirm the remaining arguments.
HARegion.getInstance("HARegionQueueDUnitTest_region", (GemFireCacheImpl) cache, harq,
private static void createRegionQueue() throws Exception {
HARegionQueueDUnitTest test = new HARegionQueueDUnitTest();
cache = test.createCache();
* AttributesFactory factory = new AttributesFactory(); factory.setScope(Scope.DISTRIBUTED_ACK);
* factory.setDataPolicy(DataPolicy.REPLICATE);
hrq = HARegionQueue.getHARegionQueueInstance("HARegionQueueDUnitTest_region", cache,
HARegionQueue.NON_BLOCKING_HA_QUEUE, false);
EventID id1 = new EventID(new byte[] {1}, 1, 1);
EventID id2 = new EventID(new byte[] {1}, 1, 2);
ConflatableObject c1 =
new ConflatableObject("1", "1", id1, false, "HARegionQueueDUnitTest_region");
ConflatableObject c2 =
new ConflatableObject("2", "2", id2, false, "HARegionQueueDUnitTest_region");
private static void createRegionQueue2() throws Exception {
HARegionQueueDUnitTest test = new HARegionQueueDUnitTest();
cache = test.createCache();
* AttributesFactory factory = new AttributesFactory(); factory.setScope(Scope.DISTRIBUTED_ACK);
* factory.setDataPolicy(DataPolicy.REPLICATE);
HARegionQueueAttributes harqAttr = new HARegionQueueAttributes();
hrq = HARegionQueue.getHARegionQueueInstance("HARegionQueueDUnitTest_region", cache, harqAttr,
HARegionQueue.NON_BLOCKING_HA_QUEUE, false);
private static void clearRegion() {
try {
Iterator iterator = hrq.getRegion().keys().iterator();
while (iterator.hasNext()) {
} catch (Exception e) {
fail("Exception occurred while trying to destroy region", e);
private static void verifyAddingDispatchMesgs() {
hrq.addDispatchedMessage(new ThreadIdentifier(new byte[1], 1), 1);
private static void verifyDispatchedMessagesRemoved() {
// Checks the queue's region: the entry under key 0 is expected to disappear, while the entry
// under key 1 is expected to remain.
try {
final Region region = hrq.getRegion();
// Criterion that is satisfied once the entry under key 0 is gone
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
yield(); // TODO is this necessary?
return region.get(new Long(0)) == null;
public String description() {
return null;
// NOTE(review): nothing visible here waits on 'ev', and the two lines below look like the
// remains of a commented-out check - confirm against the complete file.
* if (region.get(new Long(0)) != null) { fail("Expected message to have been deleted but it
* is not deleted"); }
// The entry under key 1 must still be present
if (region.get(new Long(1)) == null) {
fail("Expected message not to have been deleted but it is deleted");
} catch (Exception e) {
fail("test failed due to an exception", e);
* close the cache
private static void closeCache() {
if (cache != null && !cache.isClosed()) {
* do puts on key-1
private static void putValue1() {
try {
Region r1 = cache.getRegion("/HARegionQueueDUnitTest_region");
r1.put("key-1", "value-1");
} catch (Exception ex) {
fail("failed while region.put()", ex);
private static void putConflatables() {
try {
Region r1 = hrq.getRegion();
for (int i = 1; i < 11; i++) {
r1.put(new Long(i), new ConflatableObject("key" + i, "value" + i,
new EventID(new byte[] {1}, 1, i), true, "HARegionQueueDUnitTest_region"));
} catch (Exception ex) {
fail("failed while region.put()", ex);
/** Verifies that the data has been populated correctly after GII. */
private static void verifyMapsAndData() {
// Checks the queue's region, conflation map, events map and counter set for the ten entries with
// keys 1 to 10, and afterwards checks that those entries are gone again.
try {
HARegion r1 = (HARegion) hrq.getRegion();
// region should not be null
assertNotNull(" Did not expect the HARegion to be null but it is", r1);
// it should have ten non null entries
for (int i = 1; i < 11; i++) {
assertNotNull(" Did not expect the entry to be null but it is", r1.get(new Long(i)));
// HARegionQueue should not be null
assertNotNull(" Did not expect the HARegionQueue to be null but it is", hrq);
Map conflationMap = hrq.getConflationMapForTesting();
// conflationMap size should be greater than 0
assertTrue(" Did not expect the conflationMap size to be 0 but it is",
conflationMap.size() > 0);
Map internalMap = (Map) conflationMap.get("HARegionQueueDUnitTest_region");
// internal map should not be null. it should be present
assertNotNull(" Did not expect the internalMap to be null but it is", internalMap);
// get and verify the entries in the conflation map: "key" + i must map to the Long i
for (int i = 1; i < 11; i++) {
assertTrue(" Did not expect the entry not to be equal but it is",
internalMap.get("key" + i).equals(new Long(i)));
Map eventMap = hrq.getEventsMapForTesting();
// DACE should not be null
assertNotNull(" Did not expect the result (DACE object) to be null but it is",
eventMap.get(new ThreadIdentifier(new byte[] {1}, 1)));
Set counterSet = hrq.getCurrentCounterSet(new EventID(new byte[] {1}, 1, 1));
assertTrue(" excpected the counter set size to be 10 but it is not so",
counterSet.size() == 10);
long i = 1;
Iterator iterator = counterSet.iterator();
// verify the order of the iteration. it should be 1 - 10. The underlying
// set is a LinkedHashSet
while (iterator.hasNext()) {
// NOTE(review): the expression below is truncated (the value being cast is missing) and 'i' is
// never advanced in the visible code - confirm against the complete file.
assertTrue(((Long) == i);
// The last dispatched sequence id should be -1 since no dispatch has
// been made
assertTrue(hrq.getLastDispatchedSequenceId(new EventID(new byte[] {1}, 1, 1)) == -1);
// sleep for 8.0 seconds. Everything should expire and everything should
// be null and empty
// NOTE(review): no sleep or wait is visible before the checks below - confirm.
for (int j = 1; j < 11; j++) {
assertNull("expected the entry to be null since expiry time exceeded but it is not so",
r1.get(new Long(j)));
internalMap = (Map) hrq.getConflationMapForTesting().get("HARegionQueueDUnitTest_region");
assertNotNull(" Did not expect the internalMap to be null but it is", internalMap);
// NOTE(review): the three string literals below have lost their surrounding assertion calls.
"internalMap (conflation) should have been emptry since expiry of all entries has been exceeded but it is not so",
"eventMap should have been emptry since expiry of all entries has been exceeded but it is not so",
"counter set should have been emptry since expiry of all entries has been exceeded but it is not so",
} catch (Exception ex) {
// NOTE(review): the message mentions region.put() although this method only reads - confirm.
fail("failed while region.put()", ex);
* do puts on key-1,value-2
private static void putValue2() {
try {
Region r1 = cache.getRegion("/HARegionQueueDUnitTest_region");
r1.put("key-1", "value-2");
} catch (Exception ex) {
fail("failed while region.put()", ex);
* do a get on region1
private static void getValue1() {
try {
Region r = cache.getRegion("/HARegionQueueDUnitTest_region");
if (!(r.get("key-1").equals("value-1"))) {
fail("expected value to be value-1 but it is not so");
} catch (Exception ex) {
fail("failed while region.get()", ex);
* do a get on region1
private static void getNull() {
try {
Region r = cache.getRegion("/HARegionQueueDUnitTest_region");
if (!(r.get("key-1") == (null))) {
fail("expected value to be null but it is not so");
} catch (Exception ex) {
fail("failed while region.get()", ex);
* do a get on region1
public static void getValue2() {
try {
Region r = cache.getRegion("/HARegionQueueDUnitTest_region");
if (!(r.get("key-1").equals("value-2"))) {
fail("expected value to be value-2 but it is not so");
} catch (Exception ex) {
fail("failed while region.get()", ex);
* destroy key-1
public static void destroy() {
try {
Region region1 = cache.getRegion("/HARegionQueueDUnitTest_region");
} catch (Exception e) {
fail("test failed due to exception in destroy", e);
* Tests the Non Blocking HARegionQueue by doing concurrent put /remove / take / peek , batch peek
* operations in multiple regions. The test will have take/remove occuring in all the VMs. This
* test is targetted to test for hang or exceptions in non blocking queue.
public void testConcurrentOperationsDunitTestOnNonBlockingQueue() throws Exception {
concurrentOperationsDunitTest(false, Scope.DISTRIBUTED_ACK);
* Tests the Non Blocking HARegionQueue by doing concurrent put /remove / take / peek , batch peek
* operations in multiple regions. The test will have take/remove occuring in all the VMs. This
* test is targetted to test for hang or exceptions in non blocking queue.
public void testConcurrentOperationsDunitTestOnNonBlockingQueueWithDNoAckRegion()
throws Exception {
concurrentOperationsDunitTest(false, Scope.DISTRIBUTED_NO_ACK);
* Tests the Blokcing HARegionQueue by doing concurrent put /remove / take / peek , batch peek
* operations in multiple regions. The test will have take/remove occuring in all the VMs. This
* test is targetted to test for hang or exceptions in blocking queue.
public void testConcurrentOperationsDunitTestOnBlockingQueue() throws Exception {
concurrentOperationsDunitTest(true, Scope.DISTRIBUTED_ACK);
private void concurrentOperationsDunitTest(final boolean createBlockingQueue,
final Scope rscope) {
// Shared driver for the concurrent-operation tests. createBlockingQueue selects the blocking or
// the non-blocking queue type. NOTE(review): no use of rscope is visible in this copy of the
// method - confirm against the complete file.
// Create Cache and HARegionQueue in all the 4 VMs.
CacheSerializableRunnable createRgnsAndQueues = new CacheSerializableRunnable(
"CreateCache, mirrored Region & HARegionQueue with a CacheListener") {
public void run2() throws CacheException {
HARegionQueueDUnitTest test = new HARegionQueueDUnitTest();
System.getProperties().put("QueueRemovalThreadWaitTime", "2000");
cache = test.createCache();
AttributesFactory factory = new AttributesFactory();
HARegionQueueAttributes hrqa = new HARegionQueueAttributes();
try {
// Same region name and attributes either way; only the queue type differs
if (createBlockingQueue) {
hrq = HARegionQueue.getHARegionQueueInstance("testregion1", cache, hrqa,
HARegionQueue.BLOCKING_HA_QUEUE, false);
} else {
hrq = HARegionQueue.getHARegionQueueInstance("testregion1", cache, hrqa,
HARegionQueue.NON_BLOCKING_HA_QUEUE, false);
} catch (Exception e) {
throw new AssertionError(e);
// Listener that wraps region creates/updates into conflatable events while toCnt is set
factory.addCacheListener(new CacheListenerAdapter() {
public void afterCreate(final EntryEvent event) {
if (toCnt) {
// Create events are built with conflation disabled (fourth argument false)
Conflatable conflatable = new ConflatableObject(event.getKey(), event.getNewValue(),
((EntryEventImpl) event).getEventId(), false, event.getRegion().getFullPath());
try {
// NOTE(review): the try body is empty here; the failure message below suggests a queue put
// belongs in it - confirm against the complete file.
} catch (Exception e) {
fail("The put operation in queue did not succeed due to exception =", e);
public void afterUpdate(final EntryEvent event) {
if (toCnt) {
// Update events are built with conflation enabled (fourth argument true)
Conflatable conflatable = new ConflatableObject(event.getKey(), event.getNewValue(),
((EntryEventImpl) event).getEventId(), true, event.getRegion().getFullPath());
try {
// NOTE(review): the try body is empty here as well - confirm against the complete file.
} catch (Exception e) {
fail("The put operation in queue did not succeed due to exception =", e);
cache.createRegion("test_region", factory.create());
// Runnable that builds ten worker threads: indices 0-3 PUT, 4-5 PEEK, 6-9 TAKE
CacheSerializableRunnable spawnThreadsAndperformOps =
new CacheSerializableRunnable("Spawn multiple threads which do various operations") {
public void run2() throws CacheException {
opThreads = new RunOp[4 + 2 + 2 + 2];
for (int i = 0; i < 4; ++i) {
opThreads[i] = new RunOp(RunOp.PUT, i);
for (int i = 4; i < 6; ++i) {
opThreads[i] = new RunOp(RunOp.PEEK, i);
for (int i = 6; i < 8; ++i) {
opThreads[i] = new RunOp(RunOp.TAKE, i);
// NOTE(review): this second group also uses TAKE, while RunOp.BATCH_PEEK is not assigned to any
// thread - confirm whether BATCH_PEEK was intended here.
for (int i = 8; i < 10; ++i) {
opThreads[i] = new RunOp(RunOp.TAKE, i);
// NOTE(review): the body of the loop below (presumably starting each thread) is not visible.
for (int i = 0; i < opThreads.length; ++i) {
// Callable that reports true only when every worker thread has performed at least one operation
SerializableCallable guaranteeOperationsOccured =
new SerializableCallable("Check Ops Occurred") {
public Object call() throws CacheException {
if (opThreads == null) {
return false;
for (int i = 0; i < opThreads.length; ++i) {
if (((RunOp) opThreads[i]).getNumOpsPerformed() == 0) {
return false;
return true;
// NOTE(review): the receivers of the four calls below are cut off in this copy of the file.
.untilAsserted(() -> assertTrue((Boolean) vm0.invoke(guaranteeOperationsOccured)));
.untilAsserted(() -> assertTrue((Boolean) vm1.invoke(guaranteeOperationsOccured)));
.untilAsserted(() -> assertTrue((Boolean) vm2.invoke(guaranteeOperationsOccured)));
.untilAsserted(() -> assertTrue((Boolean) vm3.invoke(guaranteeOperationsOccured)));
// In case of blocking HARegionQueue do some extra puts so that the
// blocking threads
// are exited
CacheSerializableRunnable toggleFlag =
new CacheSerializableRunnable("Toggle the flag to signal end of threads") {
public void run2() throws CacheException {
toCnt = false;
if (createBlockingQueue) {
try {
// 100 extra events on thread id 100 with sequence ids 0-99
for (int i = 0; i < 100; ++i) {
hrq.put(new ConflatableObject("1", "1", new EventID(new byte[] {1}, 100, i),
false, "/x"));
} catch (Exception e) {
throw new AssertionError(e);
// Runnable that fails if a worker thread is flagged as interrupted, otherwise joins it with a
// 30 second limit
CacheSerializableRunnable joinWithThreads =
new CacheSerializableRunnable("Join with the threads") {
public void run2() throws CacheException {
for (int i = 0; i < opThreads.length; ++i) {
if (opThreads[i].isInterrupted()) {
fail("Test failed because thread encountered exception");
ThreadUtils.join(opThreads[i], 30 * 1000);
/**
 * This is to test the bug which is caused when the HARegionQueue object has not been fully
 * constructed but, as the HARegion has been constructed, it becomes visible to the QRM Message
 * Thread.
 *
 * TODO: this test runs too long! Shorten run time. 1m 40s on new Mac.
 */
public void testNPEDueToHARegionQueueEscapeInConstructor() {
// NOTE(review): this copy of the method only shows the runnable definitions; the VM invocations
// that execute them are not visible - confirm against the complete file.
// changing EXPIRY_TIME to 5 doesn't change how long the test runs!
final int EXPIRY_TIME = 30; // test will run for this many seconds
// Create two HARegionQueue 's in the two VMs. The frequency of QRM thread
// should be high
// Check for NullPointeException in the other VM.
CacheSerializableRunnable createQueuesAndThread =
new CacheSerializableRunnable("CreateCache, HARegionQueue and start thread") {
public void run2() throws CacheException {
HARegionQueueDUnitTest test = new HARegionQueueDUnitTest();
// TODO:ASIF: Because the QRM thread cannot run at a frequency below
// 1 second, we need to carefully evaluate what to do.
// For this bug to appear, without the bugfix, QRM needs to run
// very fast.
// System.getProperties().put("QueueRemovalThreadWaitTime", new Long(10));
cache = test.createCache();
HARegionQueueAttributes hrqa = new HARegionQueueAttributes();
try {
hrq = HARegionQueue.getHARegionQueueInstance(
"testNPEDueToHARegionQueueEscapeInConstructor", cache, hrqa,
HARegionQueue.NON_BLOCKING_HA_QUEUE, false);
// changing OP_COUNT to 20 makes no difference in test time
final int OP_COUNT = 200;
// Enqueue OP_COUNT events, then define a single thread that takes OP_COUNT times
for (int i = 0; i < OP_COUNT; ++i) {
hrq.put(new ConflatableObject(new Long(i), new Long(i),
new EventID(new byte[] {0}, 1, i), false, "dummy"));
opThreads = new Thread[1];
opThreads[0] = new Thread(new Runnable() {
public void run() {
for (int i = 0; i < OP_COUNT; ++i) {
try {
Object o = hrq.take();
// NOTE(review): the handling of a null result is not visible here.
if (o == null) {
} catch (InterruptedException e) {
throw new AssertionError(e);
} catch (Exception e) {
throw new AssertionError(e);
// Runnable that records the executing thread and creates a queue with the same region name
CacheSerializableRunnable createQueues =
new CacheSerializableRunnable("CreateCache, HARegionQueue ") {
public void run2() throws CacheException {
// Published so that waitForCreateQueuesThread can find and join this thread
createQueuesThread = Thread.currentThread();
HARegionQueueDUnitTest test = new HARegionQueueDUnitTest();
// System.getProperties().put("QueueRemovalThreadWaitTime",
// new Long(120000));
cache = test.createCache();
HARegionQueueAttributes hrqa = new HARegionQueueAttributes();
try {
hrq = HARegionQueue.getHARegionQueueInstance(
"testNPEDueToHARegionQueueEscapeInConstructor", cache, hrqa,
HARegionQueue.NON_BLOCKING_HA_QUEUE, false);
} catch (Exception e) {
throw new AssertionError(e);
// Runnable that joins the thread recorded by createQueues, with a 300 second limit
CacheSerializableRunnable waitForCreateQueuesThread =
new CacheSerializableRunnable("joinCreateCache") {
public void run2() {
// Criterion that is satisfied once createQueuesThread has been published
// NOTE(review): nothing visible here waits on 'ev' before the join below - confirm.
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return createQueuesThread != null;
public String description() {
return null;
join(createQueuesThread, 300 * 1000);
// Runnable that fails if the taker thread is flagged as interrupted, otherwise joins it with a
// 30 second limit
CacheSerializableRunnable joinWithThread =
new CacheSerializableRunnable("CreateCache, HARegionQueue join with thread") {
public void run2() throws CacheException {
if (opThreads[0].isInterrupted()) {
fail("The test has failed as it encountered interrupts in puts & takes");
ThreadUtils.join(opThreads[0], 30 * 1000);
/**
 * Worker thread that keeps performing one kind of queue/region operation while {@code toCnt} is
 * set. Any exception is rethrown wrapped in an AssertionError.
 */
private static class RunOp extends Thread {
// Operation type codes accepted by the constructor
private static final int PUT = 1;
private static final int TAKE = 2;
private static final int PEEK = 3;
private static final int BATCH_PEEK = 4;
// Operation this thread performs (one of the codes above)
private int opType;
// Identifier used in the thread name and in the key written by PUT
private int threadID;
// NOTE(review): no increment of this counter is visible in this copy of run() - confirm.
private int numOpsPerformed = 0;
public RunOp(int opType, int id) {
super("ID=" + id + ",Op=" + opType);
this.opType = opType;
this.threadID = id;
public int getNumOpsPerformed() {
return numOpsPerformed;
public void run() {
Region rgn = cache.getRegion("test_region");
int counter = 0;
LogWriter logger = cache.getLogger();
Conflatable cnf;
try {
while (toCnt) {
// Thread.currentThread().getName() + " before doing operation of
// type= "+ this.opType);
// NOTE(review): no break statements and no BATCH_PEEK case label are visible in the switch
// below - confirm against the complete file.
switch (opType) {
case PUT:
// Writes "val0".."val9" in a cycle under this thread's own key
rgn.put("key" + threadID, "val" + counter++);
if (counter == 10)
counter = 0;
case TAKE:
cnf = (Conflatable) hrq.take();
if (logger.fineEnabled() && cnf != null) {
logger.fine("Object retrieved by take has key =" + cnf.getKeyToConflate()
+ " and value as" + cnf.getValueToConflate());
case PEEK:
cnf = (Conflatable) hrq.peek();
if (logger.fineEnabled() && cnf != null) {
logger.fine("Object retrieved by peek has key =" + cnf.getKeyToConflate()
+ " and value as" + cnf.getValueToConflate());
// Thread.currentThread().getName() + " before doing remove= "+
// this.opType);
// Batch peek with the arguments 3 and 2000
List confList = hrq.peek(3, 2000);
if (logger.fineEnabled() && confList != null) {
logger.fine("Object retrieved by batch peek are =" + confList);
// Thread.currentThread().getName() + " before doing remove= "+
// this.opType);
// Thread.currentThread().getName() + " after Operation of type= "+
// this.opType);
} catch (Exception e) {
throw new AssertionError(e);
/**
 * Checks that the data received by GII only gets expired after proper construction of the
 * HARegionQueue object.
 */
private static void createHARegionQueueandCheckExpiration() throws Exception {
// Creates this VM's cache and a queue named "HARegionQueueDUnitTest_region", then defines a
// criterion on the queue's available ids.
HARegionQueueDUnitTest test = new HARegionQueueDUnitTest();
cache = test.createCache();
HARegionQueueAttributes attrs = new HARegionQueueAttributes();
// NOTE(review): the argument list of the call below is cut off after 'attrs' - confirm the
// remaining arguments against the complete file.
hrq = getHARegionQueueInstance("HARegionQueueDUnitTest_region", cache, attrs,
// Criterion that is satisfied once the queue has no available ids left
// NOTE(review): nothing visible here waits on 'ev' - confirm.
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return hrq.getAvailableIds().size() == 0;
public String description() {
return null;
// assertIndexDetailsEquals(0, hrq.getAvailableIds().size());
public void testForDuplicateEvents() throws Exception {
vm0.invoke(() -> HARegionQueueDUnitTest.createRegionQueue());
vm1.invoke(() -> HARegionQueueDUnitTest.createRegionQueueandCheckDuplicates());
* HARegionQueue should not allow data with duplicate EventIds.
private static void createRegionQueueandCheckDuplicates() throws Exception {
HARegionQueueDUnitTest test = new HARegionQueueDUnitTest();
cache = test.createCache();
hrq = HARegionQueue.getHARegionQueueInstance("HARegionQueueDUnitTest_region", cache,
HARegionQueue.NON_BLOCKING_HA_QUEUE, false);
assertEquals(2, hrq.size());
EventID id1 = new EventID(new byte[] {1}, 1, 1);
EventID id2 = new EventID(new byte[] {1}, 1, 2);
ConflatableObject c1 =
new ConflatableObject("1", "1", id1, false, "HARegionQueueDUnitTest_region");
ConflatableObject c2 =
new ConflatableObject("2", "2", id2, false, "HARegionQueueDUnitTest_region");
// HARegion size should be 2 as data with same EventIDs is inserted into the queue
assertEquals(2, hrq.size());