| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.cache.ha; |
| |
| import java.util.Properties; |
| |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import static org.junit.Assert.*; |
| |
| import junit.framework.TestCase; |
| |
| import com.gemstone.gemfire.cache.Cache; |
| import com.gemstone.gemfire.cache.CacheFactory; |
| import com.gemstone.gemfire.distributed.DistributedSystem; |
| import com.gemstone.gemfire.internal.cache.EventID; |
| import com.gemstone.gemfire.internal.AvailablePort; |
| import com.gemstone.gemfire.test.junit.categories.IntegrationTest; |
| |
| import dunit.DistributedTestCase; |
| import dunit.DistributedTestCase.WaitCriterion; |
| |
| @Category(IntegrationTest.class) |
| public class BlockingHARegionJUnitTest |
| { |
| static Cache cache = null; |
| |
| @Before |
| public void setUp() throws Exception |
| { |
| Properties props = new Properties(); |
| props.setProperty("mcast-port", "0"); |
| if (cache != null) { |
| cache.close(); // fault tolerance |
| } |
| cache = CacheFactory.create(DistributedSystem |
| .connect(props)); |
| } |
| |
| /** |
| * This test has a scenario where the HAReqionQueue capacity is just 1. There will |
| * be two thread. One doing a 1000 puts and the other doing a 1000 takes. The validation |
| * for this test is that it should not encounter any exceptions |
| * |
| */ |
| @Test |
| public void testBoundedPuts() |
| { |
| try { |
| exceptionOccured = false; |
| HARegionQueueAttributes harqa = new HARegionQueueAttributes(); |
| harqa.setBlockingQueueCapacity(1); |
| HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance( |
| "BlockingHARegionJUnitTest_Region", cache, harqa, |
| HARegionQueue.BLOCKING_HA_QUEUE, false); |
| hrq.setPrimary(true);//fix for 40314 - capacity constraint is checked for primary only. |
| Thread thread1 = new DoPuts(hrq,1000); |
| Thread thread2 = new DoTake(hrq,1000); |
| |
| thread1.start(); |
| thread2.start(); |
| |
| DistributedTestCase.join(thread1, 30 * 1000, null); |
| DistributedTestCase.join(thread2, 30 * 1000, null); |
| |
| if (exceptionOccured) { |
| fail(" Test failed due to " + exceptionString); |
| } |
| |
| cache.close(); |
| |
| } |
| catch (Exception e) { |
| fail(" Test encountered an exception "+e); |
| } |
| |
| } |
| |
| /** |
| * This test tests whether puts are blocked. There are two threads. One which is going |
| * to do 2 puts and one which is going to do take a single take. The capacity of the |
| * region is just 1. The put thread is first started and it is then ensured that only one |
| * put has successfully made through and that the thread is still alive. Then the take thread |
| * is started. This will cause the region size to come down by one and the put thread waiting |
| * will go ahead and do the put. The thread should then die and the region size should be validated |
| * to reflect that. |
| * |
| */ |
| @Test |
| public void testPutBeingBlocked() |
| { |
| try { |
| exceptionOccured = false; |
| quitForLoop = false; |
| HARegionQueueAttributes harqa = new HARegionQueueAttributes(); |
| harqa.setBlockingQueueCapacity(1); |
| final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance( |
| "BlockingHARegionJUnitTest_Region", cache, harqa, |
| HARegionQueue.BLOCKING_HA_QUEUE, false); |
| hrq.setPrimary(true);//fix for 40314 - capacity constraint is checked for primary only. |
| final Thread thread1 = new DoPuts(hrq,2); |
| thread1.start(); |
| WaitCriterion ev = new WaitCriterion() { |
| public boolean done() { |
| return hrq.region.size() == 2; |
| } |
| public String description() { |
| return null; |
| } |
| }; |
| DistributedTestCase.waitForCriterion(ev, 1000, 200, true); |
| assertTrue(thread1.isAlive()); //thread should still be alive (in wait state) |
| |
| Thread thread2 = new DoTake(hrq,1); |
| thread2.start(); //start take thread |
| ev = new WaitCriterion() { |
| public boolean done() { |
| return hrq.region.size() == 3; |
| } |
| public String description() { |
| return null; |
| } |
| }; |
| //sleep. take will proceed and so will sleeping put |
| DistributedTestCase.waitForCriterion(ev, 3 * 1000, 200, true); |
| |
| // thread should have died since put should have proceeded |
| ev = new WaitCriterion() { |
| public boolean done() { |
| return !thread1.isAlive(); |
| } |
| public String description() { |
| return "thread1 still alive"; |
| } |
| }; |
| DistributedTestCase.waitForCriterion(ev, 30 * 1000, 1000, true); |
| |
| DistributedTestCase.join(thread1, 30 * 1000, null); // for completeness |
| DistributedTestCase.join(thread2, 30 * 1000, null); |
| if (exceptionOccured) { |
| fail(" Test failed due to " + exceptionString); |
| } |
| cache.close(); |
| } |
| catch (Exception e) { |
| fail(" Test encountered an exception "+e); |
| } |
| } |
| |
| |
| /** |
| * This test tests that the region capacity is never exceeded even in highly |
| * concurrent environments. The region capacity is set to 10000. Then 5 threads start doing |
| * put simultaneously. They will reach a state where the queue is full and they will all |
| * go in a wait state. the region size would be verified to be 20000 (10000 puts and 10000 DACE objects). |
| * then the threads are interrupted and made to quit the loop |
| * |
| */ |
| @Test |
| public void testConcurrentPutsNotExceedingLimit() |
| { |
| try { |
| exceptionOccured = false; |
| quitForLoop = false; |
| HARegionQueueAttributes harqa = new HARegionQueueAttributes(); |
| harqa.setBlockingQueueCapacity(10000); |
| final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance( |
| "BlockingHARegionJUnitTest_Region", cache, harqa, |
| HARegionQueue.BLOCKING_HA_QUEUE, false); |
| hrq.setPrimary(true);//fix for 40314 - capacity constraint is checked for primary only. |
| Thread thread1 = new DoPuts(hrq,20000,1); |
| Thread thread2 = new DoPuts(hrq,20000,2); |
| Thread thread3 = new DoPuts(hrq,20000,3); |
| Thread thread4 = new DoPuts(hrq,20000,4); |
| Thread thread5 = new DoPuts(hrq,20000,5); |
| |
| thread1.start(); |
| thread2.start(); |
| thread3.start(); |
| thread4.start(); |
| thread5.start(); |
| |
| WaitCriterion ev = new WaitCriterion() { |
| public boolean done() { |
| return hrq.region.size() == 20000; |
| } |
| public String description() { |
| return null; |
| } |
| }; |
| DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true); |
| |
| assertTrue(thread1.isAlive()); |
| assertTrue(thread2.isAlive()); |
| assertTrue(thread3.isAlive()); |
| assertTrue(thread4.isAlive()); |
| assertTrue(thread5.isAlive()); |
| |
| assertTrue(hrq.region.size()==20000); |
| |
| quitForLoop = true; |
| Thread.sleep(20000); |
| |
| thread1.interrupt(); |
| thread2.interrupt(); |
| thread3.interrupt(); |
| thread4.interrupt(); |
| thread5.interrupt(); |
| |
| Thread.sleep(2000); |
| |
| DistributedTestCase.join(thread1, 5 * 60 * 1000, null); |
| DistributedTestCase.join(thread2, 5 * 60 * 1000, null); |
| DistributedTestCase.join(thread3, 5 * 60 * 1000, null); |
| DistributedTestCase.join(thread4, 5 * 60 * 1000, null); |
| DistributedTestCase.join(thread5, 5 * 60 * 1000, null); |
| |
| cache.close(); |
| } |
| catch (Exception e) { |
| fail(" Test encountered an exception "+e); |
| } |
| } |
| |
| /** |
| * This test tests that the region capacity is never exceeded even in highly |
| * concurrent environments. The region capacity is set to 10000. Then 5 threads start doing |
| * put simultaneously. They will reach a state where the queue is full and they will all |
| * go in a wait state. the region size would be verified to be 20000 (10000 puts and 10000 DACE objects). |
| * then the threads are interrupted and made to quit the loop |
| * |
| *TODO: |
| * |
| */ |
| public void _testConcurrentPutsTakesNotExceedingLimit() |
| { |
| try { |
| exceptionOccured = false; |
| quitForLoop = false; |
| HARegionQueueAttributes harqa = new HARegionQueueAttributes(); |
| harqa.setBlockingQueueCapacity(10000); |
| final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance( |
| "BlockingHARegionJUnitTest_Region", cache, harqa, |
| HARegionQueue.BLOCKING_HA_QUEUE, false); |
| Thread thread1 = new DoPuts(hrq,40000,1); |
| Thread thread2 = new DoPuts(hrq,40000,2); |
| Thread thread3 = new DoPuts(hrq,40000,3); |
| Thread thread4 = new DoPuts(hrq,40000,4); |
| Thread thread5 = new DoPuts(hrq,40000,5); |
| |
| Thread thread6 = new DoTake(hrq,5000); |
| Thread thread7 = new DoTake(hrq,5000); |
| Thread thread8 = new DoTake(hrq,5000); |
| Thread thread9 = new DoTake(hrq,5000); |
| Thread thread10 = new DoTake(hrq,5000); |
| |
| thread1.start(); |
| thread2.start(); |
| thread3.start(); |
| thread4.start(); |
| thread5.start(); |
| |
| thread6.start(); |
| thread7.start(); |
| thread8.start(); |
| thread9.start(); |
| thread10.start(); |
| |
| DistributedTestCase.join(thread6, 30 * 1000, null); |
| DistributedTestCase.join(thread7, 30 * 1000, null); |
| DistributedTestCase.join(thread8, 30 * 1000, null); |
| DistributedTestCase.join(thread9, 30 * 1000, null); |
| DistributedTestCase.join(thread10, 30 * 1000, null); |
| |
| WaitCriterion ev = new WaitCriterion() { |
| public boolean done() { |
| return hrq.region.size() == 20000; |
| } |
| public String description() { |
| return null; |
| } |
| }; |
| DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true); |
| |
| assertTrue(thread1.isAlive()); |
| assertTrue(thread2.isAlive()); |
| assertTrue(thread3.isAlive()); |
| assertTrue(thread4.isAlive()); |
| assertTrue(thread5.isAlive()); |
| |
| assertTrue(hrq.region.size()==20000); |
| |
| quitForLoop = true; |
| |
| Thread.sleep(2000); |
| |
| thread1.interrupt(); |
| thread2.interrupt(); |
| thread3.interrupt(); |
| thread4.interrupt(); |
| thread5.interrupt(); |
| |
| Thread.sleep(2000); |
| |
| |
| DistributedTestCase.join(thread1, 30 * 1000, null); |
| DistributedTestCase.join(thread2, 30 * 1000, null); |
| DistributedTestCase.join(thread3, 30 * 1000, null); |
| DistributedTestCase.join(thread4, 30 * 1000, null); |
| DistributedTestCase.join(thread5, 30 * 1000, null); |
| |
| cache.close(); |
| } |
| catch (Exception e) { |
| fail(" Test encountered an exception "+e); |
| } |
| } |
| |
| |
| /** |
| * Tests the bug in HARegionQueue where the take side put permit is not being |
| * incremented when the event arriving at the queue which has optimistically |
| * decreased the put permit, is not incrementing the take permit if the event |
| * has a sequence ID less than the last dispatched sequence ID. This event is |
| * rightly rejected from entering the queue but the take permit also needs to |
| * increase & a notify issued |
| * |
| */ |
| @Test |
| public void testHARQMaxCapacity_Bug37627() |
| { |
| try { |
| exceptionOccured = false; |
| quitForLoop = false; |
| HARegionQueueAttributes harqa = new HARegionQueueAttributes(); |
| harqa.setBlockingQueueCapacity(1); |
| harqa.setExpiryTime(180); |
| final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance( |
| "BlockingHARegionJUnitTest_Region", cache, harqa, |
| HARegionQueue.BLOCKING_HA_QUEUE, false); |
| hrq.setPrimary(true);//fix for 40314 - capacity constraint is checked for primary only. |
| final EventID id1 = new EventID(new byte[] { 1 }, 1, 2); // violation |
| final EventID ignore = new EventID(new byte[] { 1 }, 1, 1); // |
| final EventID id2 = new EventID(new byte[] { 1 }, 1, 3); // |
| Thread t1 = new Thread() { |
| public void run() |
| { |
| try { |
| hrq.put(new ConflatableObject("key1", "value1", id1, false, |
| "region1")); |
| hrq.take(); |
| hrq.put(new ConflatableObject("key2", "value1", ignore, false, |
| "region1")); |
| hrq.put(new ConflatableObject("key3", "value1", id2, false, |
| "region1")); |
| } |
| catch (Exception e) { |
| exceptionString.append("First Put in region queue failed"); |
| exceptionOccured = true; |
| } |
| } |
| }; |
| t1.start(); |
| DistributedTestCase.join(t1, 20 * 1000, null); |
| if (exceptionOccured) { |
| fail(" Test failed due to " + exceptionString); |
| } |
| } |
| catch (Exception e) { |
| fail(" Test failed due to " + e); |
| } |
| finally { |
| if (cache != null) { |
| cache.close(); |
| } |
| } |
| |
| } |
| |
| |
| |
| /** boolean to record an exception occurence in another thread**/ |
| static volatile boolean exceptionOccured = false; |
| /** StringBuffer to store the exception**/ |
| static StringBuffer exceptionString = new StringBuffer(); |
| /** boolen to quit the for loop**/ |
| static volatile boolean quitForLoop = false; |
| |
| /** |
| * class which does specified number of puts on the queue |
| * @author mbid |
| * |
| */ |
| static class DoPuts extends Thread |
| { |
| HARegionQueue regionQueue = null; |
| final int numberOfPuts; |
| DoPuts(HARegionQueue haRegionQueue, int numberOfPuts) { |
| this.regionQueue = haRegionQueue; |
| this.numberOfPuts = numberOfPuts; |
| } |
| /** |
| * region id can be specified to generate Thread unique events |
| */ |
| int regionId = 0; |
| DoPuts(HARegionQueue haRegionQueue, int numberOfPuts, int regionId) { |
| this.regionQueue = haRegionQueue; |
| this.numberOfPuts = numberOfPuts; |
| this.regionId = regionId; |
| } |
| |
| public void run() |
| { |
| for (int i = 0; i < numberOfPuts; i++) { |
| try { |
| this.regionQueue.put(new ConflatableObject("" + i, "" + i, |
| new EventID(new byte[regionId], i, i), false, |
| "BlockingHARegionJUnitTest_Region")); |
| if(quitForLoop){ |
| break; |
| } |
| if (Thread.currentThread().isInterrupted()) { |
| break; |
| } |
| } |
| catch (Exception e) { |
| exceptionOccured = true; |
| exceptionString.append(" Exception occured due to " + e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * class which does a specified number of takes |
| * @author mbid |
| * |
| */ |
| static class DoTake extends Thread |
| { |
| final HARegionQueue regionQueue; |
| final int numberOfTakes; |
| |
| DoTake(HARegionQueue haRegionQueue, int numberOfTakes) { |
| this.regionQueue = haRegionQueue; |
| this.numberOfTakes = numberOfTakes; |
| } |
| |
| public void run() |
| { |
| for (int i = 0; i < numberOfTakes; i++) { |
| try { |
| assertNotNull(this.regionQueue.take()); |
| } |
| catch (Exception e) { |
| exceptionOccured = true; |
| exceptionString.append(" Exception occured due to " + e); |
| } |
| } |
| } |
| } |
| |
| } |