| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.ha; |
| |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.logging.log4j.Logger; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.LogWriter; |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.CacheFactory; |
| import org.apache.geode.cache.CacheListener; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.EntryEvent; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.util.CacheListenerAdapter; |
| import org.apache.geode.internal.cache.Conflatable; |
| import org.apache.geode.internal.cache.EventID; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.ha.HARegionQueue.MapWrapper; |
| import org.apache.geode.internal.logging.LogService; |
| import org.apache.geode.test.dunit.ThreadUtils; |
| import org.apache.geode.test.junit.categories.ClientSubscriptionTest; |
| |
| /** |
| * Test to verify Add operation to HARegion Queue with and without conflation. |
| */ |
| @Category({ClientSubscriptionTest.class}) |
| public class HARQAddOperationJUnitTest { |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** The cache instance */ |
| protected InternalCache cache = null; |
| |
| /** Logger for this test */ |
| protected LogWriter logWriter = null; |
| |
| /** The <code>RegionQueue</code> instance */ |
| private HARegionQueue rq = null; |
| |
| protected static final String KEY1 = "Key-1"; |
| |
| protected static final String KEY2 = "Key-2"; |
| |
| protected static final String VALUE1 = "Value-1"; |
| |
| protected static final String VALUE2 = "Value-2"; |
| |
| protected boolean testFailed = false; |
| |
| protected StringBuffer message = null; |
| |
| protected int barrierCount = 0; |
| |
| static volatile int expiryCount = 0; |
| |
| @Before |
| public void setUp() throws Exception { |
| this.cache = createCache(); |
| this.logWriter = cache.getLogger(); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| this.cache.close(); |
| } |
| |
| /** |
| * Creates the cache instance for the test |
| */ |
| private InternalCache createCache() throws CacheException { |
| return (InternalCache) new CacheFactory().set(MCAST_PORT, "0").create(); |
| } |
| |
| /** |
| * Creates HA region-queue object |
| */ |
| protected HARegionQueue createHARegionQueue(String name) |
| throws IOException, ClassNotFoundException, CacheException, InterruptedException { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setDataPolicy(DataPolicy.REPLICATE); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name, cache, |
| HARegionQueue.NON_BLOCKING_HA_QUEUE, false); |
| return regionqueue; |
| } |
| |
| /** |
| * Creates HA region-queue object |
| */ |
| protected HARegionQueue createHARegionQueue(String name, HARegionQueueAttributes attrs) |
| throws IOException, ClassNotFoundException, CacheException, InterruptedException { |
| HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name, cache, attrs, |
| HARegionQueue.NON_BLOCKING_HA_QUEUE, false); |
| return regionqueue; |
| } |
| |
| /** |
| * Add operation with conflation : 1) Add Key1 Val1 & then Key1 Val2. 2) At the end of second |
| * operation , Region should contain the entry correspodning to value 2 3) Available IDs , Last |
| * DispatchedWrapper Set & Conflation Map should have size 1. 4) Conflation Map , |
| * LastDispatchedWrapper Set & Available IDs should have counter corresponding to second operation |
| */ |
| @Test |
| public void testQueueAddOperationWithConflation() throws Exception { |
| this.logWriter.info("HARegionQueueJUnitTest : testQueueAddOperationWithConflation BEGIN"); |
| this.rq = createHARegionQueue("testQueueAddOperationWithConflation"); |
| EventID id1 = new EventID(new byte[] {1}, 1, 1); |
| EventID id2 = new EventID(new byte[] {1}, 1, 2); |
| ConflatableObject c1 = new ConflatableObject(KEY1, VALUE1, id1, true, "region1"); |
| ConflatableObject c2 = new ConflatableObject(KEY1, VALUE2, id2, true, "region1"); |
| this.rq.put(c1); |
| this.rq.put(c2); |
| Map conflationMap = (Map) rq.getConflationMapForTesting().get("region1"); |
| assertEquals(1, conflationMap.size()); |
| Long cntr = (Long) conflationMap.get(KEY1); |
| ConflatableObject retValue = (ConflatableObject) rq.getRegion().get(cntr); |
| assertEquals(VALUE2, retValue.getValueToConflate()); |
| assertEquals(1, rq.getAvailableIds().size()); |
| |
| assertEquals(1, rq.getCurrentCounterSet(id1).size()); |
| this.logWriter.info("HARegionQueueJUnitTest : testQueueAddOperationWithConflation END"); |
| } |
| |
| /** |
| * Add operation without conflation : 1) Region should have an entry containing counter vs |
| * Conflatable object. 2) Events Map should have size as 1 with one ThreadIdentifer objecta s key |
| * & Last DispatchedWrapper as value. 3) This wrapper should have a set with size 1. 4) The |
| * available IDs set shoudl have size 1. 5) Put another object by same thread. 6) The wrapper set |
| * & availableIs List should have size 2 . |
| */ |
| @Test |
| public void testQueueAddOperationWithoutConflation() throws Exception { |
| this.logWriter.info("HARegionQueueJUnitTest : testQueueAddOperationWithoutConflation BEGIN"); |
| this.rq = createHARegionQueue("testQueueAddOperationWithConflation"); |
| EventID id1 = new EventID(new byte[] {1}, 1, 1); |
| EventID id2 = new EventID(new byte[] {1}, 1, 2); |
| ConflatableObject c1 = new ConflatableObject(KEY1, VALUE1, id1, false, "region1"); |
| ConflatableObject c2 = new ConflatableObject(KEY2, VALUE2, id2, false, "region1"); |
| this.rq.put(c1); |
| |
| assertNull(rq.getConflationMapForTesting().get("region1")); |
| assertEquals(1, rq.getAvailableIds().size()); |
| |
| assertEquals(1, rq.getCurrentCounterSet(id1).size()); |
| |
| this.rq.put(c2); |
| assertNull(rq.getConflationMapForTesting().get("region1")); |
| assertEquals(2, rq.getAvailableIds().size()); |
| assertEquals(2, rq.getCurrentCounterSet(id1).size()); |
| |
| Iterator iter = rq.getCurrentCounterSet(id1).iterator(); |
| if (iter.hasNext()) { |
| Long cntr = (Long) iter.next(); |
| ConflatableObject co = (ConflatableObject) this.rq.getRegion().get(cntr); |
| assertEquals(KEY1, co.getKeyToConflate()); |
| assertEquals(VALUE1, co.getValueToConflate()); |
| } |
| if (iter.hasNext()) { |
| Long cntr = (Long) iter.next(); |
| ConflatableObject co = (ConflatableObject) this.rq.getRegion().get(cntr); |
| assertEquals(KEY2, co.getKeyToConflate()); |
| assertEquals(VALUE2, co.getValueToConflate()); |
| } |
| this.logWriter.info("HARegionQueueJUnitTest : testQueueAddOperationWithoutConflation END"); |
| } |
| |
| /** |
| * Add operation without conflation followed by a Take operation: RegionSize, available IDs , |
| * LastDispatchedWrapper's Set should have size 0. Events map containg should have size 1 ( |
| * corresponding to the lastDispatchedAndCurrentEvent Wrapper objcet) |
| */ |
| @Test |
| public void testQueueAddTakeOperationWithoutConflation() throws Exception { |
| this.logWriter |
| .info("HARegionQueueJUnitTest : testQueueAddTakeOperationWithoutConflation BEGIN"); |
| |
| this.rq = createHARegionQueue("testQueueAddOperationWithConflation"); |
| EventID id = new EventID(new byte[] {1}, 1, 1); |
| ConflatableObject obj = new ConflatableObject(KEY1, VALUE1, id, true, "region1"); |
| this.rq.put(obj); |
| this.rq.take(); |
| assertNull(rq.getRegion().get(KEY1)); |
| assertEquals(0, this.rq.getAvailableIds().size()); |
| Map eventsMap = this.rq.getEventsMapForTesting(); |
| assertEquals(1, eventsMap.size()); |
| assertEquals(0, rq.getCurrentCounterSet(id).size()); |
| this.logWriter.info("HARegionQueueJUnitTest : testQueueAddTakeOperationWithoutConflation END"); |
| } |
| |
| /** |
| * (Add opn followed by Take without conflation) :Add operation which creates the |
| * LastDispatchedAndCurrentEvents object should also add it to the Region with Threaddentifer as |
| * key & sequence as the value for Expiry. Perform a take operation. Validate that expiry on |
| * ThreadIdentifier removes itself from Events Map |
| */ |
| @Test |
| public void testExpiryOnThreadIdentifier() { |
| try { |
| HARegionQueueAttributes attrs = new HARegionQueueAttributes(); |
| attrs.setExpiryTime(2); |
| HARegionQueue regionqueue = createHARegionQueue("testing", attrs); |
| // create the conflatable object |
| EventID id = new EventID(new byte[] {1}, 1, 1); |
| ConflatableObject obj = new ConflatableObject(KEY1, VALUE1, id, true, "region1"); |
| |
| ThreadIdentifier threadId = |
| new ThreadIdentifier(obj.getEventId().getMembershipID(), obj.getEventId().getThreadID()); |
| regionqueue.put(obj); |
| regionqueue.take(); |
| Thread.sleep(25000); |
| assertFalse( |
| "ThreadIdentifier did not remove itself through expiry.The reqgion queue is of type=" |
| + regionqueue.getClass(), |
| regionqueue.getRegion().containsKey(threadId)); |
| Map eventsMap = regionqueue.getEventsMapForTesting(); |
| assertNull("expiry action on ThreadIdentifier did not remove itself from eventsMap", |
| eventsMap.get(threadId)); |
| |
| } catch (Exception e) { |
| fail(" test failed due to " + e); |
| } |
| } |
| |
| /** |
| * Add operation which creates the LastDispatchedAndCurrentEvents object should also add it to the |
| * Region with Threaddentifer as key & sequence as the value for Expiry. Validate that expiry on |
| * ThreadIdentifier does not removes itself from Events Map as data is lying & it resets the self |
| * expiry. Validate the data present in Queue experiences expiry. After the expiry of the data , |
| * AvaialbleIds size should be 0, entry removed from Region, LastDispatchedWrapperSet should have |
| * size 0. |
| */ |
| @Test |
| public void testNoExpiryOnThreadIdentifier() { |
| try { |
| HARegionQueueAttributes hqa = new HARegionQueueAttributes(); |
| hqa.setExpiryTime(8); |
| HARegionQueue regionqueue = createHARegionQueue("testing", hqa); |
| EventID id1 = new EventID(new byte[] {1}, 1, 1); |
| EventID id2 = new EventID(new byte[] {1}, 1, 2); |
| ConflatableObject c1 = new ConflatableObject(KEY1, VALUE1, id1, true, "region1"); |
| ConflatableObject c2 = new ConflatableObject(KEY1, VALUE2, id2, true, "region1"); |
| ThreadIdentifier threadId = |
| new ThreadIdentifier(c1.getEventId().getMembershipID(), c1.getEventId().getThreadID()); |
| |
| regionqueue.put(c1); |
| Object o = regionqueue.take(); |
| assertNotNull(o); |
| // wait for some time and put second object |
| Thread.sleep(3000); |
| regionqueue.put(c2); |
| // wait for some more time so that C2 has not expired |
| Thread.sleep(4000); |
| |
| Map eventsMap = regionqueue.getEventsMapForTesting(); |
| |
| // verify that ThreadIdentifier does not remove itself as data is |
| // lying |
| assertNotNull( |
| "ThreadIdentifier removed itself through expiry even though data was lying in the queue", |
| eventsMap.get(threadId)); |
| |
| // After the expiry of the data , AvaialbleIds size should be 0, |
| // entry |
| // removed from Region, LastDispatchedWrapperSet should have size 0. |
| await() |
| .untilAsserted(() -> assertEquals(0, regionqueue.getRegion().entrySet(false).size())); |
| assertEquals(0, regionqueue.getAvailableIds().size()); |
| assertNull(regionqueue.getCurrentCounterSet(id1)); |
| |
| } catch (Exception e) { |
| e.printStackTrace(); |
| fail(" test failed due to " + e); |
| } |
| } |
| |
| /** |
| * Multiple arrivals of QRM for the same thread id of the client.Intially queue contains objects |
| * from 1- 10. QRM with sequenceID 5 arrives It should remove only remove objects for 1- 5. Then |
| * sequenceID 10 come which should remove 5-10. |
| */ |
| @Test |
| public void testMultipleQRMArrival() throws Exception { |
| HARegionQueue regionqueue = createHARegionQueue("testNoExpiryOnThreadIdentifier"); |
| |
| EventID[] ids = new EventID[10]; |
| for (int i = 0; i < 10; i++) { |
| ids[i] = new EventID(new byte[] {1}, 1, i + 1); |
| } |
| for (int i = 0; i < 10; i++) { |
| regionqueue.put(new ConflatableObject("KEY " + i, "VALUE" + i, ids[i], true, "region1")); |
| } |
| |
| // Available id size should be == 10 after puting ten entries |
| assertEquals(10, regionqueue.getAvailableIds().size()); |
| |
| // QRM message for therad id 1 and last sequence id 5 |
| regionqueue.removeDispatchedEvents(ids[4]); |
| assertEquals(5, regionqueue.getAvailableIds().size()); |
| assertEquals(5, regionqueue.getCurrentCounterSet(ids[0]).size()); |
| |
| Iterator iter = regionqueue.getCurrentCounterSet(ids[0]).iterator(); |
| while (iter.hasNext()) { |
| Long cntr = (Long) iter.next(); |
| ConflatableObject co = (ConflatableObject) regionqueue.getRegion().get(cntr); |
| assertTrue(co.getEventId().getSequenceID() > 5); |
| } |
| |
| regionqueue.removeDispatchedEvents(ids[9]); |
| assertEquals(0, regionqueue.getAvailableIds().size()); |
| } |
| |
| /** |
| * Concurrent arrival of put & QRM messagge for a ThreadIdentifier coming for 1st time. The |
| * LastDispatchedObject gets operated first by the put thread , the QRM thread should be blocked |
| * till the completion of add operation. Thus before QRM thread acts , the object should be |
| * present in the lastDispatchedSet & AvailableID. Then the QRM thread gets unblocked , it should |
| * remove from the available ID. |
| */ |
| @Test |
| public void testConcurrentPutAndQRM() throws Exception { |
| testFailed = false; |
| message = new StringBuffer(); |
| final HARegionQueue regionqueue = createHARegionQueue("testConcurrentPutAndQRM"); |
| final EventID id1 = new EventID(new byte[] {1}, 1, 1); |
| final EventID id2 = new EventID(new byte[] {1}, 1, 2); |
| Thread t1 = new Thread() { |
| @Override |
| public void run() { |
| try { |
| regionqueue.put(new ConflatableObject(KEY1, VALUE1, id1, true, "region1")); |
| regionqueue.put(new ConflatableObject(KEY2, VALUE2, id2, true, "region1")); |
| } catch (Exception e) { |
| message.append("Put in region queue failed"); |
| testFailed = true; |
| } |
| } |
| }; |
| t1.setPriority(Thread.MAX_PRIORITY); |
| |
| Thread t2 = new Thread() { |
| @Override |
| public void run() { |
| try { |
| regionqueue.removeDispatchedEvents(id2); |
| } catch (Exception e) { |
| message.append("Removal by QRM in region queue failed"); |
| testFailed = true; |
| } |
| } |
| }; |
| |
| t1.start(); |
| t2.start(); |
| |
| ThreadUtils.join(t1, 180 * 1000); |
| ThreadUtils.join(t2, 180 * 1000); |
| |
| if (testFailed) |
| fail("Test failed due to " + message); |
| |
| assertEquals(0, regionqueue.getAvailableIds().size()); |
| assertEquals(2, regionqueue.getLastDispatchedSequenceId(id2)); |
| } |
| |
| /** |
| * Concurrent arrival of put & QRM messagge for a ThreadIdentifier coming for 1st time. The |
| * LastDispatchedObject gets operated first by the QRM thread , the Put thread should be blocked |
| * till the completion of QRM operation. Thus put thread should see that last Sequence is > than |
| * the current & hence the put operation shud remove from region without adding the ID anywhere. |
| */ |
| @Test |
| public void testConcurrentQRMAndPut() throws Exception { |
| testFailed = false; |
| final HARegionQueue regionqueue = createHARegionQueue("testConcurrentQRMAndPut"); |
| final EventID id1 = new EventID(new byte[] {1}, 1, 1); |
| final EventID id2 = new EventID(new byte[] {1}, 1, 2); |
| |
| Thread t1 = new Thread() { |
| @Override |
| public void run() { |
| try { |
| regionqueue.put(new ConflatableObject(KEY1, VALUE1, id1, true, "region1")); |
| regionqueue.put(new ConflatableObject(KEY2, VALUE2, id2, true, "region1")); |
| } catch (Exception e) { |
| message.append("Put in region queue failed"); |
| testFailed = true; |
| } |
| } |
| }; |
| |
| Thread t2 = new Thread() { |
| @Override |
| public void run() { |
| try { |
| regionqueue.removeDispatchedEvents(id2); |
| } catch (Exception e) { |
| message.append("Removal of Events by QRM in Region queue failed"); |
| testFailed = true; |
| } |
| } |
| }; |
| t2.setPriority(Thread.MAX_PRIORITY); |
| |
| t2.start(); |
| t1.start(); |
| |
| ThreadUtils.join(t1, 180 * 1000); |
| ThreadUtils.join(t2, 180 * 1000); |
| |
| if (testFailed) |
| fail("Test failed due to " + message); |
| |
| assertEquals(0, regionqueue.getAvailableIds().size()); |
| assertEquals(2, regionqueue.getLastDispatchedSequenceId(id2)); |
| } |
| |
| /** |
| * Two QRMs arriving such that higer sequence number arriving before lower sequence number. The |
| * lower squnce number should not set itself & also not do any checking on the IDs of the |
| * LinkedHashSet |
| */ |
| @Test |
| public void testEventMapPopulationForQRM() throws Exception { |
| HARegionQueue regionqueue = createHARegionQueue("testEventMapPopulationForQRM"); |
| EventID id1 = new EventID(new byte[] {1}, 1, 1); |
| EventID id2 = new EventID(new byte[] {1}, 1, 2); |
| |
| this.logWriter.info("RemoveDispatched event for sequence id : " + id2.getSequenceID()); |
| regionqueue.removeDispatchedEvents(id2); |
| this.logWriter.info("RemoveDispatched event for sequence id :" + id1.getSequenceID()); |
| regionqueue.removeDispatchedEvents(id1); |
| assertEquals( |
| "Size of eventMap should be 1 but actual size " + regionqueue.getEventsMapForTesting(), |
| regionqueue.getEventsMapForTesting().size(), 1); |
| this.logWriter.info("sequence id : " + regionqueue.getLastDispatchedSequenceId(id2)); |
| assertEquals("Last dispatched sequence id should be 2 but actual sequence id is ", |
| regionqueue.getLastDispatchedSequenceId(id2), id2.getSequenceID()); |
| this.logWriter.info("testEventMapPopulationForQRM() completed successfully"); |
| |
| } |
| |
| /** |
| * Concurrent arrival of put operations on different threadIDs but for same key . One should |
| * conflate the other. Whosoever confaltes , that ID should be present in the availableIDs list , |
| * Region , ConflationMap & its HashSet for that ThreadIdentifier. The ID which gets conflated |
| * should not be present in the availableID, Region & that ThreadIdentifier's HashSet . The |
| * conflation map should contain the Old IDs position. |
| */ |
| @Test |
| public void testCleanUpForConflation() throws Exception { |
| this.logWriter.info("HARQAddOperationJUnitTest : testCleanUpForConflation BEGIN"); |
| testFailed = false; |
| message = null; |
| final int numOfThreads = 10; |
| final int numOfPuts = 567; |
| final HARegionQueue regionqueue = createHARegionQueue("testCleanUpForConflation"); |
| |
| this.logWriter |
| .info("HARQAddOperationJUnitTest : testCleanUpForConflation after regionqueue create"); |
| /* |
| * doing concurrent put operations on different threadIDs but for the same key |
| */ |
| Thread[] threads = new Thread[10]; |
| for (int i = 0; i < numOfThreads; i++) { |
| final long ids = i; |
| threads[i] = new Thread() { |
| @Override |
| public void run() { |
| for (int j = 0; j < numOfPuts; j++) { |
| EventID id = new EventID(new byte[] {(byte) ids}, ids, j); |
| try { |
| regionqueue.put( |
| new ConflatableObject(KEY1, id.getThreadID() + "VALUE" + j, id, true, "region1")); |
| } catch (Exception ex) { |
| testFailed = true; |
| message.append("put failed for the threadId " + id.getThreadID()); |
| } |
| |
| } |
| |
| } |
| }; |
| } |
| |
| for (int k = 0; k < numOfThreads; k++) { |
| threads[k].start(); |
| } |
| |
| for (int k = 0; k < numOfThreads; k++) { |
| ThreadUtils.join(threads[k], 180 * 1000); |
| } |
| |
| this.logWriter.info("HARQAddOperationJUnitTest : testCleanUpForConflation after join"); |
| |
| if (testFailed) |
| fail("Test failed due to " + message); |
| |
| assertEquals( |
| "size of the conflation map should be 1 but actual size is " |
| + regionqueue.getConflationMapForTesting().size(), |
| 1, regionqueue.getConflationMapForTesting().size()); |
| assertEquals( |
| "size of the event map should be " + numOfThreads + " but actual size " |
| + regionqueue.getEventsMapForTesting().size(), |
| numOfThreads, regionqueue.getEventsMapForTesting().size()); |
| assertEquals( |
| "size of availableids should 1 but actual size " + regionqueue.getAvailableIds().size(), 1, |
| regionqueue.getAvailableIds().size()); |
| int count = 0; |
| for (int i = 0; i < numOfThreads; i++) { |
| if ((regionqueue.getCurrentCounterSet(new EventID(new byte[] {(byte) i}, i, i))).size() > 0) { |
| count++; |
| } |
| } |
| |
| assertEquals("size of the counter set is 1 but the actual size is " + count, 1, count); |
| |
| Long position = null; |
| if (regionqueue.getAvailableIds().size() == 1) { |
| position = (Long) regionqueue.getAvailableIds().iterator().next(); |
| } |
| ConflatableObject id = (ConflatableObject) regionqueue.getRegion().get(position); |
| assertEquals(regionqueue.getCurrentCounterSet(id.getEventId()).size(), 1); |
| this.logWriter.info("HARQAddOperationJUnitTest : testCleanUpForConflation END"); |
| } |
| |
| /** |
| * Test where a 5 separate threads do 4 puts each. A thread then peeks the 20 objects & then |
| * invokes remove. The remove should ensure that the entries are deleted from the available IDs & |
| * the Counters set contained in DACE. Conflation is disabled. |
| */ |
| @Test |
| public void testPeekAndRemoveWithoutConflation() throws Exception { |
| testFailed = false; |
| message = null; |
| final int numOfThreads = 5; |
| final int numOfPuts = 4; |
| final int batchSize = 20; |
| final HARegionQueue regionqueue = createHARegionQueue("testPeekAndRemoveWithoutConflation"); |
| Thread[] threads = new Thread[numOfThreads]; |
| for (int i = 0; i < numOfThreads; i++) { |
| final long ids = i; |
| threads[i] = new Thread() { |
| @Override |
| public void run() { |
| for (int j = 0; j < numOfPuts; j++) { |
| EventID id = new EventID(new byte[] {(byte) ids}, ids, j); |
| try { |
| regionqueue.put(new ConflatableObject(KEY1 + id.getThreadID() + j, |
| id.getThreadID() + "VALUE" + j, id, false, "region1")); |
| } catch (Exception ex) { |
| testFailed = true; |
| message.append("put failed for the threadId " + id.getThreadID()); |
| } |
| } |
| } |
| }; |
| } |
| |
| for (int k = 0; k < numOfThreads; k++) { |
| threads[k].start(); |
| } |
| |
| for (int k = 0; k < numOfThreads; k++) { |
| ThreadUtils.join(threads[k], 180 * 1000); |
| } |
| |
| if (testFailed) |
| fail("Test failed due to " + message); |
| |
| List pickObjects = regionqueue.peek(batchSize); |
| assertEquals(batchSize, pickObjects.size()); |
| regionqueue.remove(); |
| |
| for (int i = 0; i < numOfThreads; i++) { |
| assertEquals(3, |
| regionqueue.getLastDispatchedSequenceId(new EventID(new byte[] {(byte) i}, i, 1))); |
| assertEquals(0, |
| regionqueue.getCurrentCounterSet(new EventID(new byte[] {(byte) i}, i, 1)).size()); |
| } |
| |
| assertEquals(0, regionqueue.getAvailableIds().size()); |
| |
| this.logWriter.info("testPeekAndRemoveWithoutConflation() completed successfully"); |
| } |
| |
| /** |
| * Test where a 5 separate threads do 4 puts each. A thread then peeks the 20 objects & then |
| * invokes remove. The remove should ensure that the entries are deleted from the available IDs & |
| * the Counters set contained in DACE. Conflation is enabled |
| */ |
| @Test |
| public void testPeekAndRemoveWithConflation() throws Exception { |
| testFailed = false; |
| message = null; |
| final int numOfThreads = 5; |
| |
| final int numOfPuts = 4; |
| final int batchSize = numOfThreads * numOfPuts; |
| final HARegionQueue regionqueue = createHARegionQueue("testPeekAndRemoveWithConflation"); |
| Thread[] threads = new Thread[numOfThreads]; |
| for (int i = 0; i < numOfThreads; i++) { |
| final long ids = i; |
| threads[i] = new Thread() { |
| @Override |
| public void run() { |
| for (int j = 0; j < numOfPuts; j++) { |
| EventID id = new EventID(new byte[] {(byte) ids}, ids, j); |
| try { |
| regionqueue.put(new ConflatableObject(KEY1 + ids, id.getThreadID() + "VALUE" + j, id, |
| true, "region1")); |
| } catch (Exception ex) { |
| testFailed = true; |
| message.append("put failed for the threadId " + id.getThreadID()); |
| } |
| } |
| } |
| }; |
| } |
| |
| for (int k = 0; k < numOfThreads; k++) { |
| threads[k].start(); |
| } |
| |
| for (int k = 0; k < numOfThreads; k++) { |
| ThreadUtils.join(threads[k], 180 * 1000); |
| } |
| |
| if (testFailed) |
| fail("Test failed due to " + message); |
| |
| List pickObject = regionqueue.peek(batchSize); |
| assertEquals(numOfThreads, pickObject.size()); |
| regionqueue.remove(); |
| |
| for (int i = 0; i < numOfThreads; i++) { |
| // assertIndexDetailsEquals(numOfPuts, |
| // regionqueue.getLastDispatchedSequenceId(new EventID( |
| // new byte[] { (byte)i }, i, 1))); |
| assertEquals(0, |
| regionqueue.getCurrentCounterSet(new EventID(new byte[] {(byte) i}, i, 1)).size()); |
| } |
| |
| assertEquals("size of availableIds map should be 0 ", 0, regionqueue.getAvailableIds().size()); |
| assertEquals("size of conflation map should be 0 ", 0, |
| ((Map) regionqueue.getConflationMapForTesting().get("region1")).size()); |
| |
| this.logWriter.info("testPeekAndRemoveWithConflation() completed successfully"); |
| } |
| |
| /** |
| * Test where a 5 separate threads do 4 puts each. 4 threads then concurrently do a peek of batch |
| * size 5, 10 , 15 & 20 respectively. And all of them concurrently cal remove. The remove should |
| * ensure that the entries are deleted from the available IDs & the Counters set contained in |
| * DACE. |
| */ |
| @Test |
| public void testPeekForDiffBatchSizeAndRemoveAll() throws Exception { |
| testFailed = false; |
| message = null; |
| barrierCount = 0; |
| final int numOfThreads = 5; |
| final int numOfPuts = 4; |
| // final CountDownLatch mylatch = new CountDownLatch(4); |
| final HARegionQueue regionqueue = createHARegionQueue("testPeekForDiffBatchSizeAndRemoveAll"); |
| Thread[] threads = new Thread[numOfThreads]; |
| for (int i = 0; i < numOfThreads; i++) { |
| final long ids = i; |
| threads[i] = new Thread() { |
| @Override |
| public void run() { |
| for (int j = 0; j < numOfPuts; j++) { |
| EventID id = new EventID(new byte[] {(byte) ids}, ids, j); |
| try { |
| regionqueue.put(new ConflatableObject(KEY1 + id.getThreadID() + j, |
| id.getThreadID() + "VALUE" + j, id, false, "region1")); |
| } catch (Exception ex) { |
| testFailed = true; |
| message.append("put failed for the threadId " + id.getThreadID()); |
| } |
| } |
| } |
| }; |
| } |
| |
| for (int k = 0; k < numOfThreads; k++) { |
| threads[k].start(); |
| } |
| |
| for (int k = 0; k < numOfThreads; k++) { |
| ThreadUtils.join(threads[k], 180 * 1000); |
| } |
| |
| if (testFailed) |
| fail("Test failed due to " + message); |
| |
| testFailed = false; |
| message = null; |
| |
| Thread[] threads_peek_remove = new Thread[numOfPuts]; |
| for (int i = 1; i < (numOfPuts + 1); i++) { |
| final int peakBatchSize = i * 5; |
| threads_peek_remove[i - 1] = new Thread() { |
| |
| @Override |
| public void run() { |
| try { |
| List peakObjects = regionqueue.peek(peakBatchSize); |
| assertEquals(peakBatchSize, peakObjects.size()); |
| synchronized (HARQAddOperationJUnitTest.this) { |
| ++barrierCount; |
| if (barrierCount == 4) { |
| HARQAddOperationJUnitTest.this.notifyAll(); |
| } else { |
| HARQAddOperationJUnitTest.this.wait(); |
| } |
| } |
| regionqueue.remove(); |
| |
| } catch (Exception ex) { |
| testFailed = true; |
| ex.printStackTrace(); |
| message.append("Exception while performing peak operation " + ex.getStackTrace()); |
| |
| } |
| |
| } |
| |
| }; |
| } |
| |
| for (int k = 0; k < numOfPuts; k++) { |
| threads_peek_remove[k].start(); |
| } |
| |
| for (int k = 0; k < numOfPuts; k++) { |
| ThreadUtils.join(threads_peek_remove[k], 180 * 1000); |
| } |
| |
| if (testFailed) |
| fail("Test failed due to " + message); |
| |
| for (int i = 0; i < numOfThreads; i++) { |
| assertEquals(3, |
| regionqueue.getLastDispatchedSequenceId(new EventID(new byte[] {(byte) i}, i, 1))); |
| assertEquals(0, |
| regionqueue.getCurrentCounterSet(new EventID(new byte[] {(byte) i}, i, 1)).size()); |
| } |
| |
| assertEquals(0, regionqueue.getAvailableIds().size()); |
| |
| this.logWriter.info("testPeekForDiffBatchSizeAndRemoveAll() completed successfully"); |
| } |
| |
| /** |
| * Test where a 5 separate threads do 4 puts each. 3 threads then concurrently do a peek of batch |
| * size 5, 10 and 15 respectively. And all of them concurrently call remove. The remove should |
| * ensure that the entries are deleted from the available IDs & the Counters set contained in |
| * DACE. |
| */ |
| @Test |
| public void testPeekForDiffBatchSizeAndRemoveSome() throws Exception { |
| testFailed = false; |
| barrierCount = 0; |
| message = null; |
| final int numOfThreads = 5; |
| final int numOfPuts = 4; |
| final HARegionQueue regionqueue = createHARegionQueue("testPeekForDiffBatchSizeAndRemoveSome"); |
| Thread[] threads = new Thread[numOfThreads]; |
| for (int i = 0; i < numOfThreads; i++) { |
| final long ids = i; |
| threads[i] = new Thread() { |
| @Override |
| public void run() { |
| for (int j = 0; j < numOfPuts; j++) { |
| EventID id = new EventID(new byte[] {(byte) ids}, ids, j); |
| try { |
| regionqueue.put(new ConflatableObject(KEY1 + id.getThreadID() + j, |
| id.getThreadID() + "VALUE" + j, id, false, "region1")); |
| } catch (Exception ex) { |
| testFailed = true; |
| message.append("put failed for the threadId " + id.getThreadID()); |
| } |
| } |
| } |
| }; |
| } |
| |
| for (int k = 0; k < numOfThreads; k++) { |
| threads[k].start(); |
| } |
| |
| for (int k = 0; k < numOfThreads; k++) { |
| ThreadUtils.join(threads[k], 180 * 1000); |
| } |
| |
| if (testFailed) |
| fail("Test failed due to " + message); |
| |
| testFailed = false; |
| message = null; |
| Thread[] threads_peek_remove = new Thread[numOfPuts - 1]; |
| for (int i = 1; i < numOfPuts; i++) { |
| final int peakBatchSize = i * 5; |
| threads_peek_remove[i - 1] = new Thread() { |
| |
| @Override |
| public void run() { |
| try { |
| List peakObjects = regionqueue.peek(peakBatchSize); |
| assertEquals(peakBatchSize, peakObjects.size()); |
| synchronized (HARQAddOperationJUnitTest.this) { |
| ++barrierCount; |
| if (barrierCount == 3) { |
| HARQAddOperationJUnitTest.this.notifyAll(); |
| } else { |
| HARQAddOperationJUnitTest.this.wait(); |
| } |
| } |
| regionqueue.remove(); |
| |
| } catch (Exception ex) { |
| testFailed = true; |
| ex.printStackTrace(); |
| message.append("Exception while performing peak operation " + ex.getStackTrace()); |
| |
| } |
| |
| } |
| |
| }; |
| } |
| |
| for (int k = 0; k < numOfPuts - 1; k++) { |
| threads_peek_remove[k].start(); |
| } |
| |
| for (int k = 0; k < numOfPuts - 1; k++) { |
| ThreadUtils.join(threads_peek_remove[k], 180 * 1000); |
| } |
| |
| if (testFailed) |
| fail("Test failed due to " + message); |
| |
| assertEquals(5, regionqueue.getAvailableIds().size()); |
| |
| this.logWriter.info("testPeekForDiffBatchSizeAndRemoveSome() completed successfully"); |
| } |
| |
| /** |
| * Add with QRM & expiry : Add 10 conflatable objects (0-9). Send QRM LastDispatched as 4. |
| * Validate the sequence ID field of LastDispatchedWrapper object is updated to 4. Perform Take |
| * operations to remove next 5 to 9. The seqeunceId field should be 9. Allow ThreadIdenitifier to |
| * expire. The expiration should fail as the original sequenceId ( -1) does not match with 9. |
| * Validate reset with value 9 . The next expiry should remove the LastDisptachedWrapper |
| */ |
| @Test |
| public void testAddWithQRMAndExpiry() throws Exception { |
| try { |
| HARegionQueueAttributes attrs = new HARegionQueueAttributes(); |
| attrs.setExpiryTime(10); |
| final HARegionQueue regionqueue = |
| new HARegionQueue.TestOnlyHARegionQueue("testing", cache, attrs) { |
| @Override |
| CacheListener createCacheListenerForHARegion() { |
| return new CacheListenerAdapter() { |
| @Override |
| public void afterInvalidate(EntryEvent event) { |
| try { |
| expireTheEventOrThreadIdentifier(event); |
| } catch (CacheException ce) { |
| logger.error( |
| "HAREgionQueue::createCacheListener::Exception in the expiry thread", ce); |
| } |
| if (event.getKey() instanceof ThreadIdentifier) { |
| synchronized (HARQAddOperationJUnitTest.this) { |
| expiryCount++; |
| HARQAddOperationJUnitTest.this.notify(); |
| } |
| } |
| } |
| }; |
| } |
| }; |
| Conflatable[] cf = new Conflatable[10]; |
| // put 10 conflatable objects |
| for (int i = 0; i < 10; i++) { |
| cf[i] = new ConflatableObject("key" + i, "value", new EventID(new byte[] {1}, 1, i), true, |
| "testing"); |
| regionqueue.put(cf[i]); |
| } |
| ThreadIdentifier tID = new ThreadIdentifier(new byte[] {1}, 1); |
| // verify that the sequence-id for Thread-identifier is -1 (default value). |
| assertEquals(new Long(-1), regionqueue.getRegion().get(tID)); |
| |
| // remove the first 5 - (0-4 sequence IDs) |
| regionqueue.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 4)); |
| |
| // verify that the last dispatched event was of sequence id 4 |
| assertEquals(4, regionqueue.getLastDispatchedSequenceId(new EventID(new byte[] {1}, 1, 1))); |
| // verify 1-5 not in region |
| for (long i = 1; i < 6; i++) { |
| assertTrue(!regionqueue.getRegion().containsKey(new Long(i))); |
| } |
| // verify 6-10 still in region queue |
| for (long i = 6; i < 11; i++) { |
| assertTrue(regionqueue.getRegion().containsKey(new Long(i))); |
| } |
| |
| // Perform 5 take operations to remove next 5-9 sequence ids |
| for (long i = 6; i < 11; i++) { |
| regionqueue.take(); |
| } |
| |
| // verify that the last dispatched event was of sequence id 10 |
| assertEquals(9, regionqueue.getLastDispatchedSequenceId(new EventID(new byte[] {1}, 1, 1))); |
| // verify that sequence ids 1-10 all are removed from the RQ |
| for (long i = 1; i < 11; i++) { |
| assertTrue(!regionqueue.getRegion().containsKey(new Long(i))); |
| } |
| |
| // wait until expiry thread has run once |
| synchronized (HARQAddOperationJUnitTest.this) { |
| if (0 == expiryCount) { |
| HARQAddOperationJUnitTest.this.wait(); |
| } |
| if (1 == expiryCount) { |
| // verify that the Thread-identifier has not yet expired |
| assertEquals(1, regionqueue.getEventsMapForTesting().size()); |
| |
| // verify that the sequence-id for Thread-identifier is updated to 9 |
| assertEquals(new Long(9), regionqueue.getRegion().get(tID)); |
| |
| // wait until expiry thread has run again |
| HARQAddOperationJUnitTest.this.wait(); |
| } |
| } |
| |
| // verify that the Thread-identifier has expired |
| assertEquals(0, regionqueue.getEventsMapForTesting().size()); |
| // verify that the sequence-id for Thread-identifier is null |
| assertNull(regionqueue.getRegion().get(tID)); |
| } catch (Exception e) { |
| throw new AssertionError("Exception occurred in test due to", e); |
| } |
| } |
| |
| /* |
| * This test does the following:<br> 1)Create a blocking HARegionQueue<br> 2)Add some events to |
| * the queue with same ThreadIdentifier<br> 3)Do take() operations to drain the queue<br> 4)Verify |
| * that dispatchedMessagesMap is not null<br> 5)Verify that size of the dispatchedMessagesMap is 1 |
| * as one regionqueue is created in this test<br> 6)Verify that the map contains an entry for the |
| * queue-region name<br> 7)Verify that the size of wrapper-map is 1 as all events had same |
| * ThreadId<br> 8)Verify that the sequenceId against the ThreadId in the wrapper-map is same as |
| * that of the last event taken<br> |
| */ |
| |
| /** |
| * Behaviour of take() has been changed for relaible messaging feature. Region queue take() |
| * operation will no longer add to the Dispatch Message Map. Hence disabling the test - SUYOG |
| */ |
| @Ignore |
| @Test |
| public void testDispatchedMsgsMapUpdateOnTakes() throws Exception { |
| this.logWriter.info("HARQAddOperationJUnitTest : testDispatchedEventsMapUpdateOnTakes BEGIN"); |
| |
| String regionName = "testDispatchedEventsMapUpdateOnTakes"; |
| HARegionQueue rq = createHARegionQueue(regionName); |
| |
| Conflatable cf = null; |
| EventID id = null; |
| |
| int totalEvents = 10; |
| for (int i = 0; i < totalEvents; i++) { |
| id = new EventID(new byte[] {1}, 1, i); |
| cf = new ConflatableObject("key" + i, "value" + i, id, false, "testing"); |
| rq.put(cf); |
| } |
| |
| for (int i = 0; i < totalEvents; i++) { |
| rq.take(); |
| } |
| |
| Map dispatchedMsgMap = HARegionQueue.getDispatchedMessagesMapForTesting(); |
| // verify that map is not null |
| assertNotNull("dispatchedMessagesMap found null", dispatchedMsgMap); |
| |
| // size of the dispatchedMessagesMap should be 1 as one regionqueue is |
| // created in this test |
| assertEquals("size of dispatched msgs should be 1", 1, dispatchedMsgMap.size()); |
| |
| // verify that the map contains an entry for the queue-region name |
| MapWrapper wrapper = (MapWrapper) dispatchedMsgMap.get(regionName); |
| assertNotNull("dispatchedMsgMap should contain an entry with queueregion name as key", wrapper); |
| |
| Map dispatchedData = wrapper.map; |
| assertEquals("size of wrapper-map should be 1 as all events had same ThreadId", 1, |
| dispatchedData.size()); |
| |
| ThreadIdentifier tid = new ThreadIdentifier(new byte[] {1}, 1); |
| Long seqId = (Long) dispatchedData.get(tid); |
| |
| assertEquals( |
| "sequenceId against the ThreadId in the wrapper-map should be that of the last event taken.", |
| id.getSequenceID(), seqId.longValue()); |
| |
| this.logWriter.info("HARQAddOperationJUnitTest : testDispatchedEventsMapUpdateOnTakes END"); |
| } |
| } |