| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.ha; |
| |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.internal.lang.SystemPropertyHelper.GEODE_PREFIX; |
| import static org.apache.geode.internal.lang.SystemPropertyHelper.HA_REGION_QUEUE_EXPIRY_TIME_PROPERTY; |
| import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.hamcrest.CoreMatchers.is; |
| import static org.hamcrest.CoreMatchers.not; |
| import static org.hamcrest.CoreMatchers.notNullValue; |
| import static org.hamcrest.CoreMatchers.nullValue; |
| import static org.hamcrest.CoreMatchers.sameInstance; |
| import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CyclicBarrier; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.awaitility.core.ThrowingRunnable; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.contrib.java.lang.system.RestoreSystemProperties; |
| import org.junit.experimental.categories.Category; |
| import org.junit.rules.ErrorCollector; |
| import org.junit.rules.TestName; |
| import org.mockito.Mockito; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.CacheFactory; |
| import org.apache.geode.cache.CacheListener; |
| import org.apache.geode.cache.EntryEvent; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionExistsException; |
| import org.apache.geode.cache.util.CacheListenerAdapter; |
| import org.apache.geode.internal.cache.Conflatable; |
| import org.apache.geode.internal.cache.EventID; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.RegionQueue; |
| import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy; |
| import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl; |
| import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper; |
| import org.apache.geode.test.dunit.ThreadUtils; |
| import org.apache.geode.test.junit.categories.ClientSubscriptionTest; |
| |
| /** |
| * This is a test for the APIs of a HARegionQueue and verifies that the head, tail and size counters |
| * are updated properly. |
| * |
| * <p> |
| * TODO: use ExecutorServiceRule instead of raw threads (move CyclicBarrier into rule if needed) |
| */ |
| @Category({ClientSubscriptionTest.class}) |
| public class HARegionQueueJUnitTest { |
| |
  /** total number of threads doing put operations */
  private static final int TOTAL_PUT_THREADS = 10;

  // Static so it survives outside a single test-method frame; assigned by
  // testSafeConflationRemoval and cleared in tearDown(). NOTE(review): presumably also read
  // by the HARQTestClass hook defined elsewhere in this file — verify.
  private static HARegionQueue hrqForTestSafeConflationRemoval;
  // Message list captured by the background thread in the concurrent dispatcher/removal tests.
  private static List list1;

  // Fresh cache per test (created in setUp, closed in tearDown).
  protected InternalCache cache;
  private HARegionQueue haRegionQueue;

  @Rule
  public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();

  // Collects assertion failures raised on worker threads so they still fail the test.
  @Rule
  public ErrorCollector errorCollector = new ErrorCollector();

  @Rule
  public TestName testName = new TestName();
| |
  @Before
  public void setUp() throws Exception {
    // Each test gets a fresh cache; tests create their own HARegionQueue(s) on top of it.
    cache = createCache();
  }
| |
  @After
  public void tearDown() throws Exception {
    // Close the cache first, then clear the static queue reference so no state leaks
    // from testSafeConflationRemoval into subsequent tests.
    cache.close();
    hrqForTestSafeConflationRemoval = null;
  }
| |
| /** |
| * This test does the following :<br> |
| * 1)Create producer threads to add objects to queue <br> |
| * 2)Start all the producers so that they complete their puts <br> |
| * 3)Wait till all put-threads complete their job <br> |
| * 4)verify that the size of the queue is equal to the total number of puts done by all producers |
| */ |
| @Test |
| public void testQueuePutWithoutConflation() throws Exception { |
| haRegionQueue = createHARegionQueue(testName.getMethodName()); |
| int putPerProducer = 20; |
| createAndRunProducers(false, false, false, putPerProducer); |
| assertThat(haRegionQueue.size(), is(putPerProducer * TOTAL_PUT_THREADS)); |
| } |
| |
| /** |
| * This test does the following :<br> |
| * 1)Create producer threads to add objects to queue <br> |
| * 2)Start all the producers,all of them will do puts against same set of keys <br> |
| * 3)Wait till all put-threads complete their job <br> |
| * 4)verify that the size of the queue is equal to the total number of puts done by one thread (as |
| * rest of them will conflate) |
| */ |
| @Test |
| public void testQueuePutWithConflation() throws Exception { |
| haRegionQueue = createHARegionQueue(testName.getMethodName()); |
| int putPerProducer = 20; |
| createAndRunProducers(true, false, true, putPerProducer); |
| assertThat(haRegionQueue.size(), is(putPerProducer)); |
| } |
| |
| /** |
| * This test does the following :<br> |
| * 1)Create producer threads to add objects to queue <br> |
| * 2)Start all the producers,all of them will do puts against same set of ids <br> |
| * 3)Wait till all put-threads complete their job <br> |
| * 4)verify that the size of the queue is equal to the total number of puts done by one thread (as |
| * rest of them will be duplicates and hence will be replaced) |
| */ |
| @Test |
| public void testQueuePutWithDuplicates() throws Exception { |
| haRegionQueue = createHARegionQueue(testName.getMethodName()); |
| int putPerProducer = 20; |
| createAndRunProducers(false, false, true, putPerProducer); |
| assertThat(haRegionQueue.size(), is(putPerProducer * TOTAL_PUT_THREADS)); |
| } |
| |
| /* |
| * Test method for 'org.apache.geode.internal.cache.ha.HARegionQueue.addDispatchedMessage(Object)' |
| */ |
| @Test |
| public void testAddDispatchedMessageObject() throws Exception { |
| haRegionQueue = createHARegionQueue(testName.getMethodName()); |
| assertThat(HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty(), is(true)); |
| |
| haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 1), 1); |
| haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 2), 2); |
| |
| assertThat(!HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty(), is(true)); |
| } |
| |
| /** |
| * tests the blocking peek functionality of BlockingHARegionQueue |
| */ |
| @Test |
| public void testBlockQueue() throws Exception { |
| HARegionQueue regionQueue = HARegionQueue.getHARegionQueueInstance(testName.getMethodName(), |
| cache, HARegionQueue.BLOCKING_HA_QUEUE, false, disabledClock()); |
| Thread[] threads = new Thread[10]; |
| int threadsLength = threads.length; |
| CyclicBarrier barrier = new CyclicBarrier(threadsLength + 1); |
| |
| for (int i = 0; i < threadsLength; i++) { |
| threads[i] = new Thread() { |
| @Override |
| public void run() { |
| try { |
| barrier.await(); |
| long startTime = System.currentTimeMillis(); |
| Object obj = regionQueue.peek(); |
| if (obj == null) { |
| errorCollector.addError(new AssertionError( |
| "Failed : failed since object was null and was not expected to be null")); |
| } |
| long totalTime = System.currentTimeMillis() - startTime; |
| |
| if (totalTime < 2000) { |
| errorCollector.addError(new AssertionError( |
| " Failed : Expected time to be greater than 2000 but it is not so ")); |
| } |
| } catch (Exception e) { |
| errorCollector.addError(e); |
| } |
| } |
| }; |
| } |
| |
| for (Thread thread1 : threads) { |
| thread1.start(); |
| } |
| |
| barrier.await(); |
| |
| Thread.sleep(5000); |
| |
| EventID id = new EventID(new byte[] {1}, 1, 1); |
| regionQueue.put(new ConflatableObject("key", "value", id, false, testName.getMethodName())); |
| |
| long startTime = System.currentTimeMillis(); |
| for (Thread thread : threads) { |
| ThreadUtils.join(thread, 60 * 1000); |
| } |
| |
| long totalTime = System.currentTimeMillis() - startTime; |
| |
| if (totalTime >= 60000) { |
| fail(" Test taken too long "); |
| } |
| } |
| |
| /** |
| * tests whether expiry of entry in the region queue occurs as expected |
| */ |
| @Test |
| public void testExpiryPositive() throws Exception { |
| HARegionQueueAttributes haa = new HARegionQueueAttributes(); |
| haa.setExpiryTime(1); |
| |
| HARegionQueue regionQueue = createHARegionQueue(testName.getMethodName(), haa); |
| long start = System.currentTimeMillis(); |
| |
| regionQueue.put(new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, |
| testName.getMethodName())); |
| |
| Map map = (Map) regionQueue.getConflationMapForTesting().get(testName.getMethodName()); |
| waitAtLeast(1000, start, () -> { |
| assertThat(map, is(Collections.emptyMap())); |
| assertThat(regionQueue.getRegion().keys(), is(Collections.emptySet())); |
| }); |
| } |
| |
| /** |
| * tests whether expiry of a conflated entry in the region queue occurs as expected |
| */ |
| @Test |
| public void testExpiryPositiveWithConflation() throws Exception { |
| HARegionQueueAttributes haa = new HARegionQueueAttributes(); |
| haa.setExpiryTime(1); |
| |
| HARegionQueue regionQueue = createHARegionQueue(testName.getMethodName(), haa); |
| long start = System.currentTimeMillis(); |
| |
| regionQueue.put(new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, |
| testName.getMethodName())); |
| |
| regionQueue.put(new ConflatableObject("key", "newValue", new EventID(new byte[] {1}, 1, 2), |
| true, testName.getMethodName())); |
| |
| assertThat( |
| " Expected region size not to be zero since expiry time has not been exceeded but it is not so ", |
| !regionQueue.isEmpty(), is(true)); |
| assertThat( |
| " Expected the available id's size not to be zero since expiry time has not been exceeded but it is not so ", |
| !regionQueue.getAvailableIds().isEmpty(), is(true)); |
| assertThat( |
| " Expected conflation map size not to be zero since expiry time has not been exceeded but it is not so " |
| + ((Map) regionQueue.getConflationMapForTesting().get(testName.getMethodName())) |
| .get("key"), |
| ((Map) regionQueue.getConflationMapForTesting().get(testName.getMethodName())).get("key"), |
| not(sameInstance(null))); |
| assertThat( |
| " Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ", |
| !regionQueue.getEventsMapForTesting().isEmpty(), is(true)); |
| |
| waitAtLeast(1000, start, () -> { |
| assertThat(regionQueue.getRegion().keys(), is(Collections.emptySet())); |
| assertThat(regionQueue.getAvailableIds(), is(Collections.emptySet())); |
| assertThat(regionQueue.getConflationMapForTesting().get(testName.getMethodName()), |
| is(Collections.emptyMap())); |
| assertThat(regionQueue.getEventsMapForTesting(), is(Collections.emptyMap())); |
| }); |
| } |
| |
| /** |
| * tests a ThreadId not being expired if it was updated |
| */ |
| @Test |
| public void testNoExpiryOfThreadId() throws Exception { |
| HARegionQueueAttributes haa = new HARegionQueueAttributes(); |
| haa.setExpiryTime(45); |
| |
| HARegionQueue regionQueue = createHARegionQueue(testName.getMethodName(), haa); |
| EventID ev1 = new EventID(new byte[] {1}, 1, 1); |
| EventID ev2 = new EventID(new byte[] {1}, 1, 2); |
| Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, testName.getMethodName()); |
| Conflatable cf2 = new ConflatableObject("key", "value2", ev2, true, testName.getMethodName()); |
| |
| regionQueue.put(cf1); |
| long tailKey = regionQueue.tailKey.get(); |
| regionQueue.put(cf2); |
| |
| // Invalidate will trigger the expiration of the entry |
| // See HARegionQueue.createCacheListenerForHARegion |
| regionQueue.getRegion().invalidate(tailKey); |
| |
| assertThat( |
| " Expected region size not to be zero since expiry time has not been exceeded but it is not so ", |
| !regionQueue.isEmpty(), is(true)); |
| assertThat(" Expected the available id's size not to have counter 1 but it has ", |
| !regionQueue.getAvailableIds().contains(1L), is(true)); |
| assertThat(" Expected the available id's size to have counter 2 but it does not have ", |
| regionQueue.getAvailableIds().contains(2L), is(true)); |
| assertThat(" Expected eventID map not to have the first event, but it has", |
| !regionQueue.getCurrentCounterSet(ev1).contains(1L), is(true)); |
| assertThat(" Expected eventID map to have the second event, but it does not", |
| regionQueue.getCurrentCounterSet(ev2).contains(2L), is(true)); |
| } |
| |
| /** |
| * Tests a QueueRemovalMessage coming before a localPut(). The localPut() should result in no data |
| * being put in the queue |
| */ |
| @Test |
| public void testQRMComingBeforeLocalPut() throws Exception { |
| HARegionQueue regionQueue = createHARegionQueue(testName.getMethodName()); |
| EventID id = new EventID(new byte[] {1}, 1, 1); |
| |
| regionQueue.removeDispatchedEvents(id); |
| regionQueue.put(new ConflatableObject("key", "value", id, true, testName.getMethodName())); |
| |
| assertThat(" Expected key to be null since QRM for the message id had already arrived ", |
| !regionQueue.getRegion().containsKey(1L), is(true)); |
| } |
| |
| /** |
| * test verifies correct expiry of ThreadIdentifier in the HARQ if no corresponding put comes |
| */ |
| @Test |
| public void testOnlyQRMComing() throws Exception { |
| HARegionQueueAttributes harqAttr = new HARegionQueueAttributes(); |
| harqAttr.setExpiryTime(1); |
| |
| HARegionQueue regionQueue = createHARegionQueue(testName.getMethodName(), harqAttr); |
| EventID id = new EventID(new byte[] {1}, 1, 1); |
| long start = System.currentTimeMillis(); |
| |
| regionQueue.removeDispatchedEvents(id); |
| |
| assertThat(" Expected testingID to be present since only QRM achieved ", |
| regionQueue.getRegion().containsKey(new ThreadIdentifier(new byte[] {1}, 1)), is(true)); |
| |
| waitAtLeast(1000, start, |
| () -> assertThat( |
| " Expected testingID not to be present since it should have expired after 2.5 seconds", |
| !regionQueue.getRegion().containsKey(new ThreadIdentifier(new byte[] {1}, 1)), |
| is(true))); |
| } |
| |
| /** |
| * test all relevant data structures are updated on a local put |
| */ |
| @Test |
| public void testPutPath() throws Exception { |
| HARegionQueue regionQueue = createHARegionQueue(testName.getMethodName()); |
| Conflatable cf = new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, |
| testName.getMethodName()); |
| |
| regionQueue.put(cf); |
| |
| assertThat(" Expected region peek to return cf but it is not so ", regionQueue.peek(), is(cf)); |
| assertThat( |
| " Expected the available id's size not to be zero since expiry time has not been exceeded but it is not so ", |
| !regionQueue.getAvailableIds().isEmpty(), is(true)); |
| assertThat( |
| " Expected conflation map to have entry for this key since expiry time has not been exceeded but it is not so ", |
| ((Map) regionQueue.getConflationMapForTesting().get(testName.getMethodName())).get("key"), |
| is(1L)); |
| assertThat( |
| " Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ", |
| !regionQueue.getEventsMapForTesting().isEmpty(), is(true)); |
| } |
| |
| /** |
| * - adds 10 items - sets last dispatched as 5th - verify no data pertaining to the first five is |
| * there - verify the next five entries and their relevant data is present |
| */ |
| @Test |
| public void testQRMDispatch() throws Exception { |
| HARegionQueue regionQueue = createHARegionQueue(testName.getMethodName()); |
| Conflatable[] cf = new Conflatable[10]; |
| |
| // put 10 conflatable objects |
| for (int i = 0; i < 10; i++) { |
| cf[i] = new ConflatableObject("key" + i, "value", new EventID(new byte[] {1}, 1, i), true, |
| testName.getMethodName()); |
| regionQueue.put(cf[i]); |
| } |
| |
| // remove the first 5 by giving the right sequence id |
| regionQueue.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 4)); |
| |
| // verify 1-5 not in region |
| for (int i = 1; i < 6; i++) { |
| assertThat(!regionQueue.getRegion().containsKey((long) i), is(true)); |
| } |
| |
| // verify 6-10 still in region queue |
| for (int i = 6; i < 11; i++) { |
| assertThat(regionQueue.getRegion().containsKey((long) i), is(true)); |
| } |
| |
| // verify 1-5 not in conflation map |
| for (int i = 0; i < 5; i++) { |
| assertThat(!((Map) regionQueue.getConflationMapForTesting().get(testName.getMethodName())) |
| .containsKey("key" + i), is(true)); |
| } |
| |
| // verify 6-10 in conflation map |
| for (int i = 5; i < 10; i++) { |
| assertThat(((Map) regionQueue.getConflationMapForTesting().get(testName.getMethodName())) |
| .containsKey("key" + i), is(true)); |
| } |
| |
| EventID eid = new EventID(new byte[] {1}, 1, 6); |
| |
| // verify 1-5 not in eventMap |
| for (int i = 1; i < 6; i++) { |
| assertThat(!regionQueue.getCurrentCounterSet(eid).contains((long) i), is(true)); |
| } |
| |
| // verify 6-10 in event Map |
| for (int i = 6; i < 11; i++) { |
| assertThat(regionQueue.getCurrentCounterSet(eid).contains((long) i), is(true)); |
| } |
| |
| // verify 1-5 not in available Id's map |
| for (int i = 1; i < 6; i++) { |
| assertThat(!regionQueue.getAvailableIds().contains((long) i), is(true)); |
| } |
| |
| // verify 6-10 in available id's map |
| for (int i = 6; i < 11; i++) { |
| assertThat(regionQueue.getAvailableIds().contains((long) i), is(true)); |
| } |
| } |
| |
| /** |
| * - send Dispatch message for sequence id 7 - put from sequence id 1 - id 10 - verify data for |
| * 1-7 not there - verify data for 8-10 is there |
| */ |
| @Test |
| public void testQRMBeforePut() throws Exception { |
| HARegionQueue regionQueue = createHARegionQueue(testName.getMethodName()); |
| |
| EventID[] ids = new EventID[10]; |
| |
| for (int i = 0; i < 10; i++) { |
| ids[i] = new EventID(new byte[] {1}, 1, i); |
| } |
| |
| // first get the qrm message for the seventh id |
| regionQueue.removeDispatchedEvents(ids[6]); |
| Conflatable[] cf = new Conflatable[10]; |
| |
| // put 10 conflatable objects |
| for (int i = 0; i < 10; i++) { |
| cf[i] = new ConflatableObject("key" + i, "value", ids[i], true, testName.getMethodName()); |
| regionQueue.put(cf[i]); |
| } |
| |
| // verify 1-7 not in region |
| Set values = (Set) regionQueue.getRegion().values(); |
| |
| for (int i = 0; i < 7; i++) { |
| System.out.println(i); |
| assertThat(!values.contains(cf[i]), is(true)); |
| } |
| |
| // verify 8-10 still in region queue |
| for (int i = 7; i < 10; i++) { |
| System.out.println(i); |
| assertThat(values.contains(cf[i]), is(true)); |
| } |
| |
| // verify 1-8 not in conflation map |
| for (int i = 0; i < 7; i++) { |
| assertThat(!((Map) regionQueue.getConflationMapForTesting().get(testName.getMethodName())) |
| .containsKey("key" + i), is(true)); |
| } |
| |
| // verify 8-10 in conflation map |
| for (int i = 7; i < 10; i++) { |
| assertThat(((Map) regionQueue.getConflationMapForTesting().get(testName.getMethodName())) |
| .containsKey("key" + i), is(true)); |
| } |
| |
| EventID eid = new EventID(new byte[] {1}, 1, 6); |
| |
| // verify 1-7 not in eventMap |
| for (int i = 4; i < 11; i++) { |
| assertThat(!regionQueue.getCurrentCounterSet(eid).contains((long) i), is(true)); |
| } |
| |
| // verify 8-10 in event Map |
| for (int i = 1; i < 4; i++) { |
| assertThat(regionQueue.getCurrentCounterSet(eid).contains((long) i), is(true)); |
| } |
| |
| // verify 1-7 not in available Id's map |
| for (int i = 4; i < 11; i++) { |
| assertThat(!regionQueue.getAvailableIds().contains((long) i), is(true)); |
| } |
| |
| // verify 8-10 in available id's map |
| for (int i = 1; i < 4; i++) { |
| assertThat(regionQueue.getAvailableIds().contains((long) i), is(true)); |
| } |
| } |
| |
| /** |
| * test to verify conflation happens as expected |
| */ |
| @Test |
| public void testConflation() throws Exception { |
| HARegionQueue regionQueue = createHARegionQueue(testName.getMethodName()); |
| EventID ev1 = new EventID(new byte[] {1}, 1, 1); |
| EventID ev2 = new EventID(new byte[] {1}, 2, 2); |
| Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, testName.getMethodName()); |
| Conflatable cf2 = new ConflatableObject("key", "value2", ev2, true, testName.getMethodName()); |
| regionQueue.put(cf1); |
| |
| Map conflationMap = regionQueue.getConflationMapForTesting(); |
| assertThat(((Map) conflationMap.get(testName.getMethodName())).get("key"), is(1L)); |
| |
| regionQueue.put(cf2); |
| |
| // verify the conflation map has recorded the new key |
| assertThat(((Map) conflationMap.get(testName.getMethodName())).get("key"), is(2L)); |
| // the old key should not be present |
| assertThat(!regionQueue.getRegion().containsKey(1L), is(true)); |
| // available ids should not contain the old id (the old position) |
| assertThat(!regionQueue.getAvailableIds().contains(1L), is(true)); |
| // available id should have the new id (the new position) |
| assertThat(regionQueue.getAvailableIds().contains(2L), is(true)); |
| // events map should not contain the old position |
| assertThat(regionQueue.getCurrentCounterSet(ev1).isEmpty(), is(true)); |
| // events map should contain the new position |
| assertThat(regionQueue.getCurrentCounterSet(ev2).contains(2L), is(true)); |
| } |
| |
| /** |
| * Tests whether the QRM message removes the events correctly from the DACE & Conflation Map. The |
| * events which are of ID greater than that contained in QRM should stay |
| */ |
| @Test |
| public void testQRM() throws Exception { |
| RegionQueue regionqueue = createHARegionQueue(testName.getMethodName()); |
| |
| for (int i = 0; i < 10; ++i) { |
| regionqueue.put(new ConflatableObject("key" + (i + 1), "value", |
| new EventID(new byte[] {1}, 1, i + 1), true, testName.getMethodName())); |
| } |
| |
| EventID qrmID = new EventID(new byte[] {1}, 1, 5); |
| ((HARegionQueue) regionqueue).removeDispatchedEvents(qrmID); |
| Map conflationMap = ((HARegionQueue) regionqueue).getConflationMapForTesting(); |
| assertThat(((Map) conflationMap.get(testName.getMethodName())).size(), is(5)); |
| |
| Set availableIDs = ((HARegionQueue) regionqueue).getAvailableIds(); |
| Set counters = ((HARegionQueue) regionqueue).getCurrentCounterSet(qrmID); |
| |
| assertThat(availableIDs.size(), is(5)); |
| assertThat(counters.size(), is(5)); |
| |
| for (int i = 5; i < 10; ++i) { |
| assertThat(((Map) (conflationMap.get(testName.getMethodName()))).containsKey("key" + (i + 1)), |
| is(true)); |
| assertThat(availableIDs.contains((long) (i + 1)), is(true)); |
| assertThat(counters.contains((long) (i + 1)), is(true)); |
| } |
| |
| Region rgn = ((HARegionQueue) regionqueue).getRegion(); |
| assertThat(rgn.keySet().size(), is(6)); |
| } |
| |
| /** |
| * This test tests safe removal from the conflation map. i.e operations should only remove old |
| * values and not the latest value |
| */ |
| @Test |
| public void testSafeConflationRemoval() throws Exception { |
| hrqForTestSafeConflationRemoval = new HARQTestClass("testSafeConflationRemoval", cache); |
| Conflatable cf1 = new ConflatableObject("key1", "value", new EventID(new byte[] {1}, 1, 1), |
| true, "testSafeConflationRemoval"); |
| |
| hrqForTestSafeConflationRemoval.put(cf1); |
| hrqForTestSafeConflationRemoval.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 1)); |
| |
| Map map = (Map) hrqForTestSafeConflationRemoval.getConflationMapForTesting() |
| .get("testSafeConflationRemoval"); |
| |
| assertThat( |
| "Expected the counter to be 2 since it should not have been deleted but it is not so ", |
| map.get("key1"), is(2L)); |
| } |
| |
| /** |
| * This test tests remove operation is causing the insertion of sequence ID for existing |
| * ThreadIdentifier object and concurrently the QRM thread is iterating over the Map to form the |
| * Data Set for dispatch. There should not be any Data Loss |
| * |
| * In this test, first we add x number of events for unique thread id for the same region then we |
| * start two concurrent threads. One which does add to dispatched events map with sequence id's |
| * greater than x but the same ThreadIds as previous. The second thread does createMessageList |
| * (which removes from the dispatched events map) and stores the list |
| * |
| * After the two threads have operated, createMessageList is called again and the data is stored |
| * in another list. |
| * |
| * The data in the list is populated on a map against the threadId. |
| * |
| * It is then verified to see that all the sequence should be greater than x |
| */ |
| @Test |
| public void testConcurrentDispatcherAndRemovalForSameRegionSameThreadId() throws Exception { |
| long numberOfIterations = 1000; |
| HARegionQueue hrq = createHARegionQueue(testName.getMethodName()); |
| HARegionQueue.stopQRMThread(); |
| ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations]; |
| |
| for (int i = 0; i < numberOfIterations; i++) { |
| ids[i] = new ThreadIdentifier(new byte[] {1}, i); |
| hrq.addDispatchedMessage(ids[i], i); |
| } |
| |
| Thread thread1 = new Thread() { |
| @Override |
| public void run() { |
| try { |
| Thread.sleep(600); |
| } catch (InterruptedException e) { |
| errorCollector.addError(e); |
| } |
| list1 = HARegionQueue.createMessageListForTesting(); |
| } |
| }; |
| |
| Thread thread2 = new Thread() { |
| @Override |
| public void run() { |
| try { |
| Thread.sleep(480); |
| } catch (InterruptedException e) { |
| errorCollector.addError(e); |
| } |
| for (int i = 0; i < numberOfIterations; i++) { |
| hrq.addDispatchedMessage(ids[i], i + numberOfIterations); |
| } |
| } |
| }; |
| |
| thread1.start(); |
| thread2.start(); |
| ThreadUtils.join(thread1, 30 * 1000); |
| ThreadUtils.join(thread2, 30 * 1000); |
| List list2 = HARegionQueue.createMessageListForTesting(); |
| Iterator iterator = list1.iterator(); |
| boolean doOnce = false; |
| EventID id; |
| Map map = new HashMap(); |
| |
| while (iterator.hasNext()) { |
| if (!doOnce) { |
| iterator.next(); |
| iterator.next(); |
| doOnce = true; |
| } else { |
| id = (EventID) iterator.next(); |
| map.put(new Long(id.getThreadID()), id.getSequenceID()); |
| } |
| } |
| |
| iterator = list2.iterator(); |
| doOnce = false; |
| |
| while (iterator.hasNext()) { |
| if (!doOnce) { |
| iterator.next(); |
| iterator.next(); |
| doOnce = true; |
| } else { |
| id = (EventID) iterator.next(); |
| map.put(id.getThreadID(), id.getSequenceID()); |
| } |
| } |
| |
| iterator = map.values().iterator(); |
| Long max = numberOfIterations; |
| while (iterator.hasNext()) { |
| Long next = (Long) iterator.next(); |
| assertThat( |
| " Expected all the sequence ID's to be greater than " + max |
| + " but it is not so. Got sequence id " + next, |
| next.compareTo(max), greaterThanOrEqualTo(0)); |
| } |
| } |
| |
| /** |
| * This test remove operation is updating the sequence ID for a ThreadIdentifier and concurrently |
| * the QRM thread is iterating over the Map to form the Data Set. There should not be any DataLoss |
| * |
| * In this test, first we add x number of events for unique thread id for the same region then we |
| * start two concurrent threads. One which does add to dispatched events map with sequence id's |
| * greater than x and the second one which does createMessageList (which removes from the |
| * dispatched events map) and stores the list |
| * |
| * After the two threads have operated, createMessageList is called again and the data is stored |
| * in another list. |
| * |
| * The data in the list is populated on a map against the threadId. |
| * |
| * It is then verified to see that the map size should be 2x |
| */ |
| @Test |
| public void testConcurrentDispatcherAndRemovalForSameRegionDifferentThreadId() throws Exception { |
| int numberOfIterations = 1000; |
| HARegionQueue hrq = createHARegionQueue(testName.getMethodName()); |
| HARegionQueue.stopQRMThread(); |
| ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations]; |
| |
| for (int i = 0; i < numberOfIterations; i++) { |
| ids[i] = new ThreadIdentifier(new byte[] {1}, i); |
| hrq.addDispatchedMessage(ids[i], i); |
| } |
| |
| Thread thread1 = new Thread() { |
| @Override |
| public void run() { |
| try { |
| Thread.sleep(600); |
| } catch (InterruptedException e) { |
| errorCollector.addError(e); |
| } |
| list1 = HARegionQueue.createMessageListForTesting(); |
| } |
| }; |
| |
| Thread thread2 = new Thread() { |
| @Override |
| public void run() { |
| try { |
| Thread.sleep(480); |
| } catch (InterruptedException e) { |
| errorCollector.addError(e); |
| } |
| for (int i = 0; i < numberOfIterations; i++) { |
| ids[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations); |
| hrq.addDispatchedMessage(ids[i], i + numberOfIterations); |
| } |
| } |
| }; |
| |
| thread1.start(); |
| thread2.start(); |
| ThreadUtils.join(thread1, 30 * 1000); |
| ThreadUtils.join(thread2, 30 * 1000); |
| List list2 = HARegionQueue.createMessageListForTesting(); |
| Iterator iterator = list1.iterator(); |
| boolean doOnce = false; |
| EventID id; |
| Map map = new HashMap(); |
| |
| while (iterator.hasNext()) { |
| if (!doOnce) { |
| iterator.next(); |
| iterator.next(); |
| doOnce = true; |
| } else { |
| id = (EventID) iterator.next(); |
| map.put(id.getThreadID(), id.getSequenceID()); |
| } |
| } |
| |
| iterator = list2.iterator(); |
| doOnce = false; |
| |
| while (iterator.hasNext()) { |
| if (!doOnce) { |
| iterator.next(); |
| iterator.next(); |
| doOnce = true; |
| } else { |
| id = (EventID) iterator.next(); |
| map.put(id.getThreadID(), id.getSequenceID()); |
| } |
| } |
| assertThat( |
| " Expected the map size to be " + 2 * numberOfIterations + " but it is " + map.size(), |
| map.size(), is(2 * numberOfIterations)); |
| } |
| |
| /** |
| * This test tests remove operation is causing the insertion of a sequence ID for a new HA Region |
| * and concurrently the QRM thread is iterating over the Map to form the Data Set for dispatch. |
| * There should not be any Data Loss |
| * |
| * In this test, first we add x number of events for unique thread id for the 2 regions then we |
| * start two concurrent threads. One which does add to dispatched events map to 3 new regions |
| * |
| * the second thread does createMessageList (which removes from the dispatched events map) and |
| * stores the list |
| * |
| * After the two threads have operated, createMessageList is called again and the data is stored |
| * in another list. |
| * |
| * The data in the list is populated on a map against the threadId. |
| * |
| * It is then verified to see that a total of x entries are present in the map |
| */ |
| @Test |
| public void testConcurrentDispatcherAndRemovalForMultipleRegionsSameThreadId() throws Exception { |
| int numberOfIterations = 10000; |
| HARegionQueue hrq1 = createHARegionQueue(testName.getMethodName() + "-1"); |
| HARegionQueue hrq2 = createHARegionQueue(testName.getMethodName() + "-2"); |
| HARegionQueue hrq3 = createHARegionQueue(testName.getMethodName() + "-3"); |
| HARegionQueue hrq4 = createHARegionQueue(testName.getMethodName() + "-4"); |
| HARegionQueue hrq5 = createHARegionQueue(testName.getMethodName() + "-5"); |
| |
| HARegionQueue.stopQRMThread(); |
| |
| ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations]; |
| |
| for (int i = 0; i < numberOfIterations; i++) { |
| ids[i] = new ThreadIdentifier(new byte[] {1}, i); |
| hrq1.addDispatchedMessage(ids[i], i); |
| hrq2.addDispatchedMessage(ids[i], i); |
| |
| } |
| |
| Thread thread1 = new Thread() { |
| @Override |
| public void run() { |
| try { |
| Thread.sleep(600); |
| } catch (InterruptedException e) { |
| errorCollector.addError(e); |
| } |
| list1 = HARegionQueue.createMessageListForTesting(); |
| } |
| }; |
| |
| Thread thread2 = new Thread() { |
| @Override |
| public void run() { |
| try { |
| Thread.sleep(480); |
| } catch (InterruptedException e) { |
| errorCollector.addError(e); |
| } |
| for (int i = 0; i < numberOfIterations; i++) { |
| hrq3.addDispatchedMessage(ids[i], i); |
| hrq4.addDispatchedMessage(ids[i], i); |
| hrq5.addDispatchedMessage(ids[i], i); |
| } |
| } |
| }; |
| |
| thread1.start(); |
| thread2.start(); |
| ThreadUtils.join(thread1, 30 * 1000); |
| ThreadUtils.join(thread2, 30 * 1000); |
| List list2 = HARegionQueue.createMessageListForTesting(); |
| Iterator iterator = list1.iterator(); |
| boolean doOnce = true; |
| EventID id; |
| Map map = new HashMap(); |
| |
| while (iterator.hasNext()) { |
| if (!doOnce) { |
| iterator.next(); // read the total message size |
| doOnce = true; |
| } else { |
| iterator.next();// region name; |
| int size = (Integer) iterator.next(); |
| for (int i = 0; i < size; i++) { |
| id = (EventID) iterator.next(); |
| map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID()); |
| } |
| } |
| } |
| |
| iterator = list2.iterator(); |
| doOnce = true; |
| |
| while (iterator.hasNext()) { |
| if (!doOnce) { |
| iterator.next(); // read the total message size |
| doOnce = true; |
| } else { |
| iterator.next();// region name; |
| int size = (Integer) iterator.next(); |
| for (int i = 0; i < size; i++) { |
| id = (EventID) iterator.next(); |
| map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID()); |
| } |
| } |
| } |
| |
| assertThat(" Expected the map size to be " + numberOfIterations + " but it is " + map.size(), |
| map.size(), is(numberOfIterations)); |
| } |
| |
| /** |
   * This test verifies that a remove operation causing the insertion of a sequence ID for a new
   * ThreadIdentifier object (that is, the HA Region name already exists but the ThreadIdentifier
   * object is appearing for the first time), while the QRM thread concurrently iterates over the
   * map to form the data set for dispatch, does not cause any data loss.
| * |
| * In this test, first we add x number of events for unique thread id for the multiples regions |
| * then we start two concurrent threads. One which does add to dispatched events map with sequence |
| * id's greater than x and new ThreadIdentifiers |
| * |
| * the second thread does createMessageList (which removes from the dispatched events map) and |
| * stores the list |
| * |
| * After the two threads have operated, createMessageList is called again and the data is stored |
| * in another list. |
| * |
| * The data in the list is populated on a map against the threadId. |
| * |
| * It is then verified to see that the map size should be 2x * number of regions |
| */ |
| @Test |
| public void testConcurrentDispatcherAndRemovalForMultipleRegionsDifferentThreadId() |
| throws Exception { |
| int numberOfIterations = 1000; |
| HARegionQueue hrq1 = createHARegionQueue(testName.getMethodName() + "-1"); |
| HARegionQueue hrq2 = createHARegionQueue(testName.getMethodName() + "-2"); |
| HARegionQueue hrq3 = createHARegionQueue(testName.getMethodName() + "-3"); |
| HARegionQueue hrq4 = createHARegionQueue(testName.getMethodName() + "-4"); |
| HARegionQueue hrq5 = createHARegionQueue(testName.getMethodName() + "-5"); |
| |
| HARegionQueue.stopQRMThread(); |
| |
| ThreadIdentifier[] ids1 = new ThreadIdentifier[(int) numberOfIterations]; |
| ThreadIdentifier[] ids2 = new ThreadIdentifier[(int) numberOfIterations]; |
| ThreadIdentifier[] ids3 = new ThreadIdentifier[(int) numberOfIterations]; |
| ThreadIdentifier[] ids4 = new ThreadIdentifier[(int) numberOfIterations]; |
| ThreadIdentifier[] ids5 = new ThreadIdentifier[(int) numberOfIterations]; |
| |
| for (int i = 0; i < numberOfIterations; i++) { |
| ids1[i] = new ThreadIdentifier(new byte[] {1}, i); |
| ids2[i] = new ThreadIdentifier(new byte[] {2}, i); |
| ids3[i] = new ThreadIdentifier(new byte[] {3}, i); |
| ids4[i] = new ThreadIdentifier(new byte[] {4}, i); |
| ids5[i] = new ThreadIdentifier(new byte[] {5}, i); |
| hrq1.addDispatchedMessage(ids1[i], i); |
| hrq2.addDispatchedMessage(ids2[i], i); |
| hrq3.addDispatchedMessage(ids3[i], i); |
| hrq4.addDispatchedMessage(ids4[i], i); |
| hrq5.addDispatchedMessage(ids5[i], i); |
| } |
| |
| Thread thread1 = new Thread() { |
| @Override |
| public void run() { |
| try { |
| Thread.sleep(600); |
| } catch (InterruptedException e) { |
| errorCollector.addError(e); |
| } |
| list1 = HARegionQueue.createMessageListForTesting(); |
| } |
| }; |
| |
| Thread thread2 = new Thread() { |
| @Override |
| public void run() { |
| try { |
| Thread.sleep(480); |
| } catch (InterruptedException e) { |
| errorCollector.addError(e); |
| } |
| for (int i = 0; i < numberOfIterations; i++) { |
| ids1[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations); |
| ids2[i] = new ThreadIdentifier(new byte[] {2}, i + numberOfIterations); |
| ids3[i] = new ThreadIdentifier(new byte[] {3}, i + numberOfIterations); |
| ids4[i] = new ThreadIdentifier(new byte[] {4}, i + numberOfIterations); |
| ids5[i] = new ThreadIdentifier(new byte[] {5}, i + numberOfIterations); |
| |
| hrq1.addDispatchedMessage(ids1[i], i + numberOfIterations); |
| hrq2.addDispatchedMessage(ids2[i], i + numberOfIterations); |
| hrq3.addDispatchedMessage(ids3[i], i + numberOfIterations); |
| hrq4.addDispatchedMessage(ids4[i], i + numberOfIterations); |
| hrq5.addDispatchedMessage(ids5[i], i + numberOfIterations); |
| } |
| } |
| }; |
| |
| thread1.start(); |
| thread2.start(); |
| ThreadUtils.join(thread1, 30 * 1000); |
| ThreadUtils.join(thread2, 30 * 1000); |
| List list2 = HARegionQueue.createMessageListForTesting(); |
| Iterator iterator = list1.iterator(); |
| boolean doOnce = true; |
| EventID id = null; |
| Map map = new HashMap(); |
| |
| while (iterator.hasNext()) { |
| if (!doOnce) { |
| iterator.next(); // read the total message size |
| doOnce = true; |
| } else { |
| iterator.next(); // region name; |
| int size = (Integer) iterator.next(); |
| System.out.println(" size of list 1 iteration x " + size); |
| for (int i = 0; i < size; i++) { |
| id = (EventID) iterator.next(); |
| map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID()); |
| } |
| } |
| } |
| |
| iterator = list2.iterator(); |
| doOnce = true; |
| |
| while (iterator.hasNext()) { |
| if (!doOnce) { |
| iterator.next(); // read the total message size |
| doOnce = true; |
| } else { |
| iterator.next(); // region name; |
| int size = (Integer) iterator.next(); |
| System.out.println(" size of list 2 iteration x " + size); |
| for (int i = 0; i < size; i++) { |
| id = (EventID) iterator.next(); |
| map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID()); |
| } |
| } |
| } |
| |
| assertThat( |
| " Expected the map size to be " + numberOfIterations * 2 * 5 + " but it is " + map.size(), |
| map.size(), is(numberOfIterations * 2 * 5)); |
| } |
| |
| /** |
   * Concurrent peek on a blocking queue waiting for a put. If a concurrent take is also happening
   * such that the object is removed first, then the peek should block and not return null.
| */ |
  @Test
  public void testBlockingQueueForConcurrentPeekAndTake() throws Exception {
    TestBlockingHARegionQueue regionQueue =
        new TestBlockingHARegionQueue("testBlockQueueForConcurrentPeekAndTake", cache,
            disabledClock());
    Thread[] threads = new Thread[3];

    // Three peeker threads: each blocks in peek() on the (initially empty) queue.
    // Each asserts that peek() did not return null and that it blocked for at least
    // ~4s, i.e. until after the puts below.
    for (int i = 0; i < 3; i++) {
      threads[i] = new Thread() {
        @Override
        public void run() {
          try {
            long startTime = System.currentTimeMillis();
            Object obj = regionQueue.peek();
            if (obj == null) {
              errorCollector.addError(new AssertionError(
                  "Failed : failed since object was null and was not expected to be null"));
            }
            long totalTime = System.currentTimeMillis() - startTime;

            if (totalTime < 4000) {
              errorCollector.addError(new AssertionError(
                  "Failed : Expected time to be greater than 4000 but it is not so"));
            }
          } catch (Exception e) {
            errorCollector.addError(e);
          }
        }
      };
    }

    for (int k = 0; k < 3; k++) {
      threads[k].start();
    }

    // Give all peekers time to block before any data arrives
    Thread.sleep(4000);

    EventID id = new EventID(new byte[] {1}, 1, 1);
    EventID id1 = new EventID(new byte[] {1}, 1, 2);

    // takeFirst arranges for the first put to be taken concurrently (see
    // TestBlockingHARegionQueue); peek() must then keep blocking rather than return
    // null, until the second put below makes an element available.
    regionQueue.takeFirst = true;
    regionQueue.put(new ConflatableObject("key", "value", id, true, testName.getMethodName()));

    Thread.sleep(2000);

    regionQueue.put(new ConflatableObject("key1", "value1", id1, true, testName.getMethodName()));

    long startTime = System.currentTimeMillis();
    for (int k = 0; k < 3; k++) {
      ThreadUtils.join(threads[k], 180 * 1000);
    }

    // If joining consumed the whole timeout, the peekers likely never unblocked
    long totalTime = System.currentTimeMillis() - startTime;
    if (totalTime >= 180000) {
      fail(" Test taken too long ");
    }
  }
| |
| /** |
| * Peek on a blocking queue is in progress & the single element is removed either by take or by |
| * QRM thread , the peek should block correctly. |
| */ |
| @Test |
| public void testBlockingQueueForTakeWhenPeekInProgress() throws Exception { |
| TestBlockingHARegionQueue regionQueue = |
| new TestBlockingHARegionQueue("testBlockQueueForTakeWhenPeekInProgress", cache, |
| disabledClock()); |
| Thread[] threads = new Thread[3]; |
| |
| for (int i = 0; i < 3; i++) { |
| threads[i] = new Thread() { |
| @Override |
| public void run() { |
| try { |
| long startTime = System.currentTimeMillis(); |
| Object obj = regionQueue.peek(); |
| if (obj == null) { |
| errorCollector.addError(new AssertionError( |
| "Failed : failed since object was null and was not expected to be null")); |
| } |
| long totalTime = System.currentTimeMillis() - startTime; |
| |
| if (totalTime < 4000) { |
| errorCollector.addError(new AssertionError( |
| "Failed : Expected time to be greater than 4000 but it is not so")); |
| } |
| } catch (Exception e) { |
| errorCollector.addError(e); |
| } |
| } |
| }; |
| } |
| |
| for (int k = 0; k < 3; k++) { |
| threads[k].start(); |
| } |
| |
| Thread.sleep(4000); |
| |
| EventID id = new EventID(new byte[] {1}, 1, 1); |
| EventID id1 = new EventID(new byte[] {1}, 1, 2); |
| |
| regionQueue.takeWhenPeekInProgress = true; |
| regionQueue.put(new ConflatableObject("key", "value", id, true, testName.getMethodName())); |
| |
| Thread.sleep(2000); |
| |
| regionQueue.put(new ConflatableObject("key1", "value1", id1, true, testName.getMethodName())); |
| |
| long startTime = System.currentTimeMillis(); |
| for (int k = 0; k < 3; k++) { |
| ThreadUtils.join(threads[k], 60 * 1000); |
| } |
| |
| long totalTime = System.currentTimeMillis() - startTime; |
| if (totalTime >= 60000) { |
| fail(" Test taken too long "); |
| } |
| } |
| |
| /** |
| * The basis of HARegionQueue's Add & remove operations etc , is that the event being added first |
| * goes into DACE , Region etc and finally it is published by adding into the available IDs Set. |
| * Similarly if an event is to be removed it should first be removed from availableIDs set & then |
| * from behind the scenes. It will be the responsibility of the thread removing from available IDs |
| * successfully which will remove the entry from all other places. Now if the expiry task makes |
| * the event from underlying null before removing from available IDs , there is a potential |
| * violation. This test will validate that behaviour |
| */ |
  @Test
  public void testConcurrentEventExpiryAndTake() throws Exception {
    // Handshake flags between the expiry listener below and the test thread:
    // expiryCalled -> listener has started expiring the entry;
    // allowExpiryToProceed -> test thread permits the listener to finish;
    // complete -> listener has finished expiry.
    AtomicBoolean complete = new AtomicBoolean(false);
    AtomicBoolean expiryCalled = new AtomicBoolean(false);
    AtomicBoolean allowExpiryToProceed = new AtomicBoolean(false);

    HARegionQueueAttributes haa = new HARegionQueueAttributes();
    haa.setExpiryTime(3);

    RegionQueue regionqueue =
        new TestOnlyHARegionQueue(testName.getMethodName(), cache, haa,
            disabledClock()) {
          @Override
          CacheListener createCacheListenerForHARegion() {

            return new CacheListenerAdapter() {

              @Override
              public void afterInvalidate(EntryEvent event) {

                // Only queue entries (Long positions) participate in the handshake
                if (event.getKey() instanceof Long) {
                  // Signal the test thread that expiry has started
                  synchronized (HARegionQueueJUnitTest.this) {
                    expiryCalled.set(true);
                    HARegionQueueJUnitTest.this.notifyAll();
                  }

                  Thread.yield();

                  // Park here, mid-expiry, until the test thread has attempted take()
                  synchronized (HARegionQueueJUnitTest.this) {
                    while (!allowExpiryToProceed.get()) {
                      try {
                        HARegionQueueJUnitTest.this.wait();
                      } catch (InterruptedException e) {
                        errorCollector.addError(e);
                        break;
                      }
                    }
                  }

                  try {
                    expireTheEventOrThreadIdentifier(event);
                  } catch (CacheException e) {
                    errorCollector.addError(e);
                  } finally {
                    // Always signal completion so the test thread cannot hang
                    synchronized (HARegionQueueJUnitTest.this) {
                      complete.set(true);
                      HARegionQueueJUnitTest.this.notifyAll();
                    }
                  }
                }
              }
            };
          }
        };

    EventID ev1 = new EventID(new byte[] {1}, 1, 1);
    Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, testName.getMethodName());
    regionqueue.put(cf1);

    // Wait until the expiry task has started and is parked mid-expiry
    synchronized (this) {
      while (!expiryCalled.get()) {
        wait();
      }
    }

    try {
      // With expiry in progress, the event must no longer be available: take()
      // should observe the removal and return null rather than a stale event.
      Object o = regionqueue.take();
      assertThat(o, nullValue());

    } catch (Exception e) {
      throw new AssertionError("Test failed due to exception ", e);

    } finally {
      // Unblock the parked expiry task regardless of the take() outcome
      synchronized (this) {
        allowExpiryToProceed.set(true);
        notifyAll();
      }
    }

    // Wait for expiry to run to completion before ending the test
    synchronized (this) {
      while (!complete.get()) {
        wait();
      }
    }
  }
| |
| /** |
| * Tests the functionality of batch peek & remove with blocking & non blocking HARegionQueue |
| */ |
| @Test |
| public void testBatchPeekWithRemoveForNonBlockingQueue() throws Exception { |
| testBatchPeekWithRemove(false); |
| } |
| |
| /** |
| * Tests the functionality of batch peek & remove with blocking & non blocking HARegionQueue |
| */ |
| @Test |
| public void testBatchPeekWithRemoveForBlockingQueue() throws Exception { |
| testBatchPeekWithRemove(true); |
| } |
| |
| private void testBatchPeekWithRemove(boolean createBlockingQueue) |
| throws InterruptedException, IOException, ClassNotFoundException { |
| HARegionQueue regionQueue = createHARegionQueue(createBlockingQueue); |
| |
| for (int i = 0; i < 10; ++i) { |
| EventID ev1 = new EventID(new byte[] {1}, 1, i); |
| Conflatable cf1 = new ConflatableObject("key", "value", ev1, false, testName.getMethodName()); |
| regionQueue.put(cf1); |
| } |
| |
| List objs = regionQueue.peek(10, 5000); |
| assertThat(objs.size(), is(10)); |
| Iterator itr = objs.iterator(); |
| int j = 0; |
| |
| while (itr.hasNext()) { |
| Conflatable conf = (Conflatable) itr.next(); |
| assertThat(conf, notNullValue()); |
| assertThat("The sequence ID of the objects in the queue are not as expected", |
| conf.getEventId().getSequenceID(), is((long) j++)); |
| } |
| |
| regionQueue.remove(); |
| assertThat(regionQueue.size(), is(0)); |
| } |
| |
| private HARegionQueue createHARegionQueue(boolean createBlockingQueue) |
| throws InterruptedException, IOException, ClassNotFoundException { |
| HARegionQueueAttributes haa = new HARegionQueueAttributes(); |
| haa.setExpiryTime(300); |
| |
| if (createBlockingQueue) { |
| return HARegionQueue.getHARegionQueueInstance(testName.getMethodName(), cache, haa, |
| HARegionQueue.BLOCKING_HA_QUEUE, false, disabledClock()); |
| } else { |
| return HARegionQueue.getHARegionQueueInstance(testName.getMethodName(), cache, haa, |
| HARegionQueue.NON_BLOCKING_HA_QUEUE, false, disabledClock()); |
| } |
| } |
| |
| /** |
| * tests whether expiry of entry in the region queue occurs as expected using system property to |
| * set expiry |
| */ |
| @Test |
| public void testExpiryUsingSystemProperty() throws Exception { |
| System.setProperty(GEODE_PREFIX + HA_REGION_QUEUE_EXPIRY_TIME_PROPERTY, "1"); |
| |
| HARegionQueueAttributes haa = new HARegionQueueAttributes(); |
| HARegionQueue regionQueue = createHARegionQueue(testName.getMethodName(), haa); |
| long start = System.currentTimeMillis(); |
| |
| regionQueue.put(new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, |
| testName.getMethodName())); |
| |
| Map map = (Map) regionQueue.getConflationMapForTesting().get(testName.getMethodName()); |
| assertThat(!map.isEmpty(), is(true)); |
| |
| waitAtLeast(1000, start, () -> { |
| assertThat(map, is(Collections.emptyMap())); |
| assertThat(regionQueue.getRegion().keys(), is(Collections.emptySet())); |
| }); |
| } |
| |
| /** |
| * This tests whether the messageSyncInterval for QueueRemovalThread is refreshed properly when |
| * set/updated using cache's setter API |
| */ |
| @Test |
| public void testUpdateOfMessageSyncInterval() throws Exception { |
| int initialMessageSyncInterval = 5; |
| cache.setMessageSyncInterval(initialMessageSyncInterval); |
| createHARegionQueue(testName.getMethodName()); |
| |
| assertThat("messageSyncInterval not set properly", HARegionQueue.getMessageSyncInterval(), |
| is(initialMessageSyncInterval)); |
| |
| int updatedMessageSyncInterval = 10; |
| cache.setMessageSyncInterval(updatedMessageSyncInterval); |
| |
| await() |
| .untilAsserted(() -> assertThat("messageSyncInterval not updated.", |
| HARegionQueue.getMessageSyncInterval(), is(updatedMessageSyncInterval))); |
| } |
| |
| @Test |
| public void testPutEventInHARegion_Conflatable() throws Exception { |
| HARegionQueue regionQueue = createHARegionQueue(testName.getMethodName()); |
| |
| Conflatable expectedConflatable = |
| new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, |
| testName.getMethodName()); |
| |
| final long position = 0L; |
| Conflatable returnedConflatable = regionQueue.putEventInHARegion(expectedConflatable, position); |
| |
| Assert.assertEquals(expectedConflatable, returnedConflatable); |
| |
| Conflatable conflatableInRegion = (Conflatable) regionQueue.getRegion().get(position); |
| |
| Assert.assertEquals(expectedConflatable, conflatableInRegion); |
| } |
| |
  /**
   * putEventInHARegion with an HAEventWrapper that the HA container reports as newly added
   * should return that same wrapper and store it in the region at the given position.
   */
  @Test
  public void testPutEventInHARegion_HAEventWrapper_New() throws Exception {
    HARegionQueue regionQueue =
        createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
    HARegionQueue regionQueueSpy = Mockito.spy(regionQueue);

    HAEventWrapper newHAEventWrapper = new HAEventWrapper(mock(EventID.class));

    // Stub the container put to report the wrapper as the (new) resident instance
    doReturn(newHAEventWrapper).when(regionQueueSpy)
        .putEntryConditionallyIntoHAContainer(newHAEventWrapper);

    final long position = 0L;
    Conflatable returnedHAEventWrapper =
        regionQueueSpy.putEventInHARegion(newHAEventWrapper, position);

    Assert.assertEquals(newHAEventWrapper, returnedHAEventWrapper);

    HAEventWrapper haEventWrapperInRegion =
        (HAEventWrapper) regionQueueSpy.getRegion().get(position);

    Assert.assertEquals(newHAEventWrapper, haEventWrapperInRegion);
    verify(regionQueueSpy, times(1)).putEntryConditionallyIntoHAContainer(newHAEventWrapper);
  }
| |
  /**
   * putEventInHARegion with an HAEventWrapper whose event already exists in the HA container
   * should return, and store in the region, the existing resident wrapper instead of the new one.
   */
  @Test
  public void testPutEventInHARegion_HAEventWrapper_EntryAlreadyExisted() throws Exception {
    HARegionQueue regionQueue =
        createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
    HARegionQueue regionQueueSpy = Mockito.spy(regionQueue);

    // Mock out an existing entry and increment ref count as if it had already been added to the HA
    // container
    HAEventWrapper existingHAEventWrapper = new HAEventWrapper(mock(EventID.class));
    HAEventWrapper newHAEventWrapper = new HAEventWrapper(mock(EventID.class));

    // The container put reports the pre-existing wrapper as the resident instance
    doReturn(existingHAEventWrapper).when(regionQueueSpy)
        .putEntryConditionallyIntoHAContainer(newHAEventWrapper);

    final long position = 0L;
    Conflatable returnedHAEventWrapper =
        regionQueueSpy.putEventInHARegion(newHAEventWrapper, position);

    Assert.assertEquals(existingHAEventWrapper, returnedHAEventWrapper);

    HAEventWrapper haEventWrapperInRegion =
        (HAEventWrapper) regionQueueSpy.getRegion().get(position);

    Assert.assertEquals(existingHAEventWrapper, haEventWrapperInRegion);
    verify(regionQueueSpy, times(1)).putEntryConditionallyIntoHAContainer(newHAEventWrapper);
  }
| |
  /**
   * While the queue is not yet initialized, putEventInHARegion must store the wrapper in the
   * region directly without touching the HA container.
   */
  @Test
  public void testPutEventInHARegion_HAEventWrapper_QueueNotInitialized() throws Exception {
    HARegionQueue regionQueue =
        createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);

    // Mock that the regionQueue is not yet initialized
    regionQueue.initialized.set(false);

    HARegionQueue regionQueueSpy = Mockito.spy(regionQueue);
    // Mock out an existing entry and increment ref count as if it had already been added to the HA
    // container
    HAEventWrapper expectedHAEventWrapper = new HAEventWrapper(mock(EventID.class));

    final long position = 0L;
    Conflatable returnedHAEventWrapper =
        regionQueueSpy.putEventInHARegion(expectedHAEventWrapper, position);

    Assert.assertEquals(expectedHAEventWrapper, returnedHAEventWrapper);

    HAEventWrapper haEventWrapperInRegion =
        (HAEventWrapper) regionQueueSpy.getRegion().get(position);

    Assert.assertEquals(expectedHAEventWrapper, haEventWrapperInRegion);

    // putEntryConditionallyIntoHAContainer should not be called if the queue isn't yet initialized
    verify(regionQueueSpy, times(0)).putEntryConditionallyIntoHAContainer(expectedHAEventWrapper);
  }
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testPutEventInHARegion_HAEventWrapper_NullClientUpdateMessage() throws Exception { |
| HARegionQueue regionQueue = |
| createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE); |
| |
| HAEventWrapper haEventWrapperWithNullCUMI = new HAEventWrapper(mock(EventID.class)); |
| haEventWrapperWithNullCUMI.setClientUpdateMessage(null); |
| |
| final long position = 0L; |
| regionQueue.putEventInHARegion(haEventWrapperWithNullCUMI, 0L); |
| } |
| |
| @Test |
| public void testPutEntryConditionallyIntoHAContainer_MultipleThreads_SameWrapperInstanceAndCorrectRefCount() |
| throws Exception { |
| HARegionQueue regionQueue = |
| createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE); |
| |
| HAEventWrapper mockHaEventWrapper = mock(HAEventWrapper.class); |
| doReturn(true).when(mockHaEventWrapper).getPutInProgress(); |
| |
| ClientUpdateMessageImpl mockClientUpdateMessage = mock(ClientUpdateMessageImpl.class); |
| doReturn(mockClientUpdateMessage).when(mockHaEventWrapper).getClientUpdateMessage(); |
| |
| int numClients = 100; |
| ExecutorService executorService = Executors.newFixedThreadPool(numClients); |
| |
| Collection<Callable<Conflatable>> concurrentPuts = |
| Collections.nCopies(numClients, (Callable<Conflatable>) () -> regionQueue |
| .putEntryConditionallyIntoHAContainer(mockHaEventWrapper)); |
| |
| List<Future<Conflatable>> futures = executorService.invokeAll(concurrentPuts); |
| |
| List<Conflatable> conflatables = new ArrayList<>(); |
| |
| for (Future future : futures) { |
| conflatables.add((Conflatable) future.get()); |
| } |
| |
| boolean areAllConflatablesEqual = conflatables.stream().allMatch(conflatables.get(0)::equals); |
| |
| Assert.assertTrue(areAllConflatablesEqual); |
| verify(mockHaEventWrapper, times(numClients)).incAndGetReferenceCount(); |
| Assert.assertEquals(regionQueue.haContainer.get(mockHaEventWrapper), mockClientUpdateMessage); |
| } |
| |
  /**
   * Re-putting a wrapper that already resides in the HA container should merge the new
   * wrapper's client CQs and interest list into the resident ClientUpdateMessage:
   * the updates-interest list on the second put, the invalidates-interest list once the
   * client is switched to invalidates-only.
   */
  @Test
  public void testPutEntryConditionallyIntoHAContainer_AddCQAndInterestList() throws Exception {
    final String haRegionName = testName.getMethodName();

    HARegionQueue regionQueue =
        createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE);

    ClientProxyMembershipID mockClientProxyMembershipId = mock(ClientProxyMembershipID.class);
    CacheClientProxy mockCacheClientProxy = mock(CacheClientProxy.class);

    // Register a proxy for this region so the container can resolve the client id
    doReturn(mockClientProxyMembershipId).when(mockCacheClientProxy).getProxyID();
    ((HAContainerWrapper) regionQueue.haContainer).putProxy(haRegionName, mockCacheClientProxy);

    ClientUpdateMessageImpl.ClientCqConcurrentMap mockClientCqConcurrentMap =
        mock(ClientUpdateMessageImpl.ClientCqConcurrentMap.class);
    ClientUpdateMessageImpl.CqNameToOp mockCqNameToOp =
        mock(ClientUpdateMessageImpl.CqNameToOp.class);

    doReturn(mockCqNameToOp).when(mockClientCqConcurrentMap).get(mockClientProxyMembershipId);

    ClientUpdateMessageImpl mockClientUpdateMessage = mock(ClientUpdateMessageImpl.class);
    doReturn(true).when(mockClientUpdateMessage)
        .isClientInterestedInUpdates(mockClientProxyMembershipId);

    HAEventWrapper mockHAEventWrapper = mock(HAEventWrapper.class);
    doReturn(mockClientUpdateMessage).when(mockHAEventWrapper).getClientUpdateMessage();
    doReturn(mockClientCqConcurrentMap).when(mockHAEventWrapper).getClientCqs();

    // Mock that a put is in progress so we don't null out the
    // ClientUpdateMessage member on the HAEventWrapper
    // NOTE(review): mockHAEventWrapper is a Mockito mock, so this call is a stubbed no-op;
    // if "put in progress" must actually be observable, this likely needs
    // doReturn(true).when(mockHAEventWrapper).getPutInProgress() instead — verify.
    mockHAEventWrapper.incrementPutInProgressCounter("test");

    // TODO: Why don't we add CQs and Interest when we initially add the
    // wrapper to the container?
    regionQueue.putEntryConditionallyIntoHAContainer(mockHAEventWrapper);
    regionQueue.putEntryConditionallyIntoHAContainer(mockHAEventWrapper);

    verify(mockClientUpdateMessage, times(1)).addClientCqs(mockClientProxyMembershipId,
        mockCqNameToOp);
    verify(mockClientUpdateMessage, times(1)).addClientInterestList(mockClientProxyMembershipId,
        true);

    // Mock that the ClientUpdateMessage is only interested in invalidates, then do another put
    doReturn(false).when(mockClientUpdateMessage)
        .isClientInterestedInUpdates(mockClientProxyMembershipId);
    doReturn(true).when(mockClientUpdateMessage)
        .isClientInterestedInInvalidates(mockClientProxyMembershipId);

    regionQueue.putEntryConditionallyIntoHAContainer(mockHAEventWrapper);

    verify(mockClientUpdateMessage, times(1)).addClientInterestList(mockClientProxyMembershipId,
        false);
  }
| |
| @Test |
| public void testDecAndRemoveFromHAContainer_WrapperInContainer() throws Exception { |
| final String haRegionName = testName.getMethodName(); |
| |
| HARegionQueue regionQueue = |
| createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE); |
| |
| HAContainerWrapper mockHAContainer = mock(HAContainerWrapper.class); |
| HAEventWrapper mockHAEventWrapper = mock(HAEventWrapper.class); |
| |
| doReturn(mockHAEventWrapper).when(mockHAContainer).getKey(mockHAEventWrapper); |
| |
| regionQueue.haContainer = mockHAContainer; |
| |
| regionQueue.decAndRemoveFromHAContainer(mockHAEventWrapper); |
| |
| verify(mockHAContainer, times(1)).remove(mockHAEventWrapper); |
| } |
| |
| @Test |
| public void testDecAndRemoveFromHAContainer_RemoteWrapperNotInContainer_Removed() |
| throws Exception { |
| final String haRegionName = testName.getMethodName(); |
| |
| HARegionQueue regionQueue = |
| createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE); |
| |
| HAContainerWrapper mockHAContainer = mock(HAContainerWrapper.class); |
| HAEventWrapper mockHAEventWrapperInContainer = mock(HAEventWrapper.class); |
| HAEventWrapper mockRemoteHAEventWrapper = mock(HAEventWrapper.class); |
| |
| doReturn(mockHAEventWrapperInContainer).when(mockHAContainer).getKey(mockRemoteHAEventWrapper); |
| |
| regionQueue.haContainer = mockHAContainer; |
| |
| regionQueue.decAndRemoveFromHAContainer(mockRemoteHAEventWrapper); |
| |
| verify(mockHAContainer, times(1)).remove(mockHAEventWrapperInContainer); |
| } |
| |
| @Test |
| public void testDecAndRemoveFromHAContainer_DecrementedButNotRemoved() throws Exception { |
| final String haRegionName = testName.getMethodName(); |
| |
| HARegionQueue regionQueue = |
| createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE); |
| |
| HAContainerWrapper mockHAContainer = mock(HAContainerWrapper.class); |
| HAEventWrapper mockHAEventWrapper = mock(HAEventWrapper.class); |
| |
| doReturn(mockHAEventWrapper).when(mockHAContainer).getKey(mockHAEventWrapper); |
| doReturn(1L).when(mockHAEventWrapper).decAndGetReferenceCount(); |
| |
| regionQueue.haContainer = mockHAContainer; |
| |
| regionQueue.decAndRemoveFromHAContainer(mockHAEventWrapper); |
| |
| verify(mockHAEventWrapper, times(1)).decAndGetReferenceCount(); |
| verify(mockHAContainer, times(0)).remove(mockHAEventWrapper); |
| } |
| |
| @Test |
| public void testDecAndRemoveFromHAContainer_AlreadyRemoved() throws Exception { |
| final String haRegionName = testName.getMethodName(); |
| |
| HARegionQueue regionQueue = |
| createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE); |
| |
| HAContainerWrapper mockHAContainer = mock(HAContainerWrapper.class); |
| HAEventWrapper mockHAEventWrapperInContainer = mock(HAEventWrapper.class); |
| HAEventWrapper mockRemoteHAEventWrapper = mock(HAEventWrapper.class); |
| |
| doReturn(null).when(mockHAContainer).getKey(mockRemoteHAEventWrapper); |
| |
| regionQueue.haContainer = mockHAContainer; |
| |
| regionQueue.decAndRemoveFromHAContainer(mockRemoteHAEventWrapper); |
| |
| verify(mockHAContainer, times(0)).remove(mockHAEventWrapperInContainer); |
| verify(mockHAEventWrapperInContainer, times(0)).decAndGetReferenceCount(); |
| } |
| |
  /**
   * If the resident key instance changes between two getKey lookups (another thread replaced
   * it), the decrement and removal must target the new resident wrapper, not the stale one.
   */
  @Test
  public void testDecAndRemoveFromHAContainer_RefChangedAfterGettingKey() throws Exception {
    final String haRegionName = testName.getMethodName();

    HARegionQueue regionQueue =
        createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE);

    HAContainerWrapper mockHAContainer = mock(HAContainerWrapper.class);
    HAEventWrapper mockOriginalHAEventWrapperInContainer = mock(HAEventWrapper.class);
    HAEventWrapper mockNewHAEventWrapperInContainer = mock(HAEventWrapper.class);
    HAEventWrapper mockRemoteHAEventWrapper = mock(HAEventWrapper.class);

    // First call will return original wrapper in container, then second call will return a new one
    // to simulate the key being replaced by a new one in a different thread
    doReturn(mockOriginalHAEventWrapperInContainer)
        .doReturn(mockNewHAEventWrapperInContainer)
        .when(mockHAContainer).getKey(mockRemoteHAEventWrapper);

    regionQueue.haContainer = mockHAContainer;

    regionQueue.decAndRemoveFromHAContainer(mockRemoteHAEventWrapper);

    // Only the current resident wrapper may be decremented and removed
    verify(mockHAContainer, times(0)).remove(mockOriginalHAEventWrapperInContainer);
    verify(mockOriginalHAEventWrapperInContainer, times(0)).decAndGetReferenceCount();
    verify(mockHAContainer, times(1)).remove(mockNewHAEventWrapperInContainer);
    verify(mockNewHAEventWrapperInContainer, times(1)).decAndGetReferenceCount();
  }
| |
| @Test |
| public void testDecAndRemoveFromHAContainer_MultipleThreadsDecrementing() throws Exception { |
| HARegionQueue regionQueue = |
| createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE); |
| |
| HAEventWrapper mockHAEventWrapper = mock(HAEventWrapper.class); |
| |
| HAContainerWrapper mockHAContainer = mock(HAContainerWrapper.class); |
| doReturn(mockHAEventWrapper).when(mockHAContainer).getKey(mockHAEventWrapper); |
| regionQueue.haContainer = mockHAContainer; |
| |
| final int numClients = 100; |
| |
| doAnswer(new Answer() { |
| private long mockRefCount = numClients; |
| |
| @Override |
| public Object answer(InvocationOnMock invocation) throws Throwable { |
| return --mockRefCount; |
| } |
| }).when(mockHAEventWrapper).decAndGetReferenceCount(); |
| |
| ExecutorService executorService = Executors.newFixedThreadPool(numClients); |
| |
| Collection<Callable<Void>> concurrentDecAndRemoves = |
| Collections.nCopies(numClients, (Callable<Void>) () -> { |
| regionQueue |
| .decAndRemoveFromHAContainer(mockHAEventWrapper); |
| return null; |
| }); |
| |
| List<Future<Void>> futures = executorService.invokeAll(concurrentDecAndRemoves); |
| |
| for (Future<Void> future : futures) { |
| future.get(); |
| } |
| |
| verify(mockHAEventWrapper, times(numClients)).decAndGetReferenceCount(); |
| verify(mockHAContainer, times(1)).remove(mockHAEventWrapper); |
| } |
| |
  @Test
  public void testPutEntryConditionallyIntoHAContainerUpdatesInterestList() throws Exception {
    final String haRegionName = testName.getMethodName();

    HARegionQueue regionQueue =
        createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE);

    ClientProxyMembershipID mockClientProxyMembershipId = mock(ClientProxyMembershipID.class);
    CacheClientProxy mockCacheClientProxy = mock(CacheClientProxy.class);

    // Register a proxy for this region so the container has a client to update.
    doReturn(mockClientProxyMembershipId).when(mockCacheClientProxy).getProxyID();
    ((HAContainerWrapper) regionQueue.haContainer).putProxy(haRegionName, mockCacheClientProxy);

    EventID mockEventID = mock(EventID.class);
    ClientUpdateMessageImpl mockClientUpdateMessage = mock(ClientUpdateMessageImpl.class);
    // NOTE(review): calling setEventIdentifier() on a Mockito mock is a no-op — no state is
    // stored and nothing is stubbed. Presumably intended to associate the event id with the
    // message; confirm whether doReturn(mockEventID).when(...).getEventIdentifier() was meant.
    mockClientUpdateMessage.setEventIdentifier(mockEventID);

    doReturn(true).when(mockClientUpdateMessage)
        .isClientInterestedInUpdates(mockClientProxyMembershipId);

    HAEventWrapper originalHAEventWrapper = new HAEventWrapper(mockEventID);
    originalHAEventWrapper.setClientUpdateMessage(mockClientUpdateMessage);

    // allow putInProgress to be false (so we null out the msg field in the wrapper)
    regionQueue.putEntryConditionallyIntoHAContainer(originalHAEventWrapper);

    // the initial haContainer.putIfAbsent() doesn't need to invoke addClientInterestList
    // as it is already part of the original message
    verify(mockClientUpdateMessage, times(0)).addClientInterestList(mockClientProxyMembershipId,
        true);

    // create a new wrapper with the same id and message
    HAEventWrapper newHAEventWrapper = new HAEventWrapper(mockEventID);
    newHAEventWrapper.setClientUpdateMessage(mockClientUpdateMessage);

    regionQueue.putEntryConditionallyIntoHAContainer(newHAEventWrapper);

    // Verify that the original haContainerEntry gets the updated clientInterestList
    verify(mockClientUpdateMessage, times(1)).addClientInterestList(mockClientProxyMembershipId,
        true);
  }
| |
| /** |
| * Wait until a given runnable stops throwing exceptions. It should take at least |
| * minimumElapsedTime after the supplied start time to happen. |
| * |
| * This is useful for validating that an entry doesn't expire until a certain amount of time has |
| * passed |
| */ |
| private void waitAtLeast(final int minimumElapsedTime, final long start, |
| final ThrowingRunnable runnable) { |
| await().untilAsserted(runnable); |
| long elapsed = System.currentTimeMillis() - start; |
| assertThat(elapsed >= minimumElapsedTime, is(true)); |
| } |
| |
| /** |
| * Creates and runs the put threads which will create the conflatable objects and add them to the |
| * queue |
| * |
| * @param generateSameKeys - if all the producers need to put objects with same set of keys |
| * (needed for conflation testing) |
| * @param generateSameIds - if all the producers need to put objects with same set of ids (needed |
| * for duplicates testing) |
| * @param conflationEnabled - true if all producers need to put objects with conflation enabled, |
| * false otherwise. |
| * @param putPerProducer - number of objects offered to the queue by each producer |
| */ |
| private void createAndRunProducers(boolean generateSameKeys, boolean generateSameIds, |
| boolean conflationEnabled, int putPerProducer) { |
| Producer[] putThreads = new Producer[TOTAL_PUT_THREADS]; |
| |
| // Create the put-threads, each generating same/different set of ids/keys as |
| // per the parameters |
| for (int i = 0; i < TOTAL_PUT_THREADS; i++) { |
| String keyPrefix; |
| long startId; |
| if (generateSameKeys) { |
| keyPrefix = "key"; |
| } else { |
| keyPrefix = i + "key"; |
| } |
| if (generateSameIds) { |
| startId = 1; |
| } else { |
| startId = i * 100000; |
| } |
| putThreads[i] = |
| new Producer("Producer-" + i, keyPrefix, startId, putPerProducer, conflationEnabled); |
| } |
| |
| // start the put-threads |
| for (int i = 0; i < TOTAL_PUT_THREADS; i++) { |
| putThreads[i].start(); |
| } |
| |
| // call join on the put-threads so that this thread waits till they complete |
| // before doing verification |
| for (int i = 0; i < TOTAL_PUT_THREADS; i++) { |
| ThreadUtils.join(putThreads[i], 30 * 1000); |
| } |
| } |
| |
| /** |
| * Creates the cache instance for the test |
| */ |
| private InternalCache createCache() throws RegionExistsException { |
| return (InternalCache) new CacheFactory().set(MCAST_PORT, "0").create(); |
| } |
| |
| protected int queueType() { |
| return HARegionQueue.NON_BLOCKING_HA_QUEUE; |
| } |
| |
| /** |
| * Creates HA region-queue object |
| */ |
| private HARegionQueue createHARegionQueue(String name) |
| throws IOException, ClassNotFoundException, CacheException, InterruptedException { |
| return HARegionQueue.getHARegionQueueInstance(name, cache, queueType(), false, disabledClock()); |
| } |
| |
| /** |
| * Creates HA region-queue object with specified queue type |
| */ |
| private HARegionQueue createHARegionQueue(String name, int queueType) |
| throws IOException, ClassNotFoundException, CacheException, InterruptedException { |
| return HARegionQueue.getHARegionQueueInstance(name, cache, queueType, false, disabledClock()); |
| } |
| |
| /** |
| * Creates region-queue object with specified HARegionQueueAttributes |
| */ |
| HARegionQueue createHARegionQueue(String name, HARegionQueueAttributes attrs) |
| throws IOException, ClassNotFoundException, CacheException, InterruptedException { |
| return HARegionQueue.getHARegionQueueInstance(name, cache, attrs, queueType(), false, |
| disabledClock()); |
| } |
| |
| /** |
| * Used to override the remove method for testSafeConflationRemoval |
| */ |
| static class ConcHashMap extends ConcurrentHashMap implements ConcurrentMap { |
| |
| @Override |
| public boolean remove(Object arg0, Object arg1) { |
| Conflatable cf2 = new ConflatableObject("key1", "value2", new EventID(new byte[] {1}, 1, 2), |
| true, "testSafeConflationRemoval"); |
| try { |
| hrqForTestSafeConflationRemoval.put(cf2); |
| } catch (Exception e) { |
| throw new AssertionError("Exception occurred in trying to put ", e); |
| } |
| return super.remove(arg0, arg1); |
| } |
| } |
| |
| /** |
| * Extends HARegionQueue for testing purposes. used by testSafeConflationRemoval |
| */ |
| static class HARQTestClass extends TestOnlyHARegionQueue { |
| |
| HARQTestClass(String regionName, InternalCache cache) |
| throws IOException, ClassNotFoundException, CacheException, InterruptedException { |
| super(regionName, cache, disabledClock()); |
| } |
| |
| @Override |
| ConcurrentMap createConcurrentMap() { |
| return new ConcHashMap(); |
| } |
| } |
| |
| /** |
| * Thread to perform PUTs into the queue |
| */ |
| class Producer extends Thread { |
| |
| /** sleep between successive puts */ |
| private static final long sleepTime = 10; |
| |
| /** total number of puts by this thread */ |
| private long totalPuts = 0; |
| |
| /** prefix to keys of all objects put by this thread */ |
| private final String keyPrefix; |
| |
| /** startingId for sequence-ids of all objects put by this thread */ |
| private final long startingId; |
| |
| /** name of this producer thread */ |
| private String producerName; |
| |
| /** |
| * boolean to indicate whether this thread should create conflation enabled entries |
| */ |
| private final boolean createConflatables; |
| |
| /** |
| * @param name - name for this thread |
| * @param keyPrefix - prefix to keys of all objects put by this thread |
| * @param startingId - startingId for sequence-ids of all objects put by this thread |
| * @param totalPuts total number of puts by this thread |
| * @param createConflatableEvents - boolean to indicate whether this thread should create |
| * conflation enabled entries |
| */ |
| Producer(String name, String keyPrefix, long startingId, long totalPuts, |
| boolean createConflatableEvents) { |
| super(name); |
| producerName = name; |
| this.keyPrefix = keyPrefix; |
| this.startingId = startingId; |
| this.totalPuts = totalPuts; |
| createConflatables = createConflatableEvents; |
| setDaemon(true); |
| } |
| |
| /** Create Conflatable objects and put them into the Queue. */ |
| @Override |
| public void run() { |
| if (producerName == null) { |
| producerName = Thread.currentThread().getName(); |
| } |
| for (long i = 0; i < totalPuts; i++) { |
| try { |
| String regionName = "test"; |
| ConflatableObject event = new ConflatableObject(keyPrefix + i, "val" + i, |
| new EventID(new byte[] {1}, startingId, startingId + i), createConflatables, |
| regionName); |
| |
| haRegionQueue.put(event); |
| Thread.sleep(sleepTime); |
| |
| } catch (Exception e) { |
| errorCollector.addError(e); |
| break; |
| } |
| } |
| } |
| } |
| |
| } |